Posted to commits@flink.apache.org by mb...@apache.org on 2014/11/13 16:06:17 UTC

[4/5] incubator-flink git commit: [streaming] Examples refactor and packaging update

[streaming] Examples refactor and packaging update


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/d332d6c3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/d332d6c3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/d332d6c3

Branch: refs/heads/master
Commit: d332d6c310a9be12b61065083218d7e4fdec1ab1
Parents: e7ce956
Author: mbalassi <mb...@apache.org>
Authored: Wed Nov 12 23:23:15 2014 +0100
Committer: mbalassi <mb...@apache.org>
Committed: Thu Nov 13 15:24:04 2014 +0100

----------------------------------------------------------------------
 .../flink-streaming-examples/pom.xml            |  54 +-----
 .../examples/basictopology/BasicTopology.java   |  61 ------
 .../examples/iteration/IterateExample.java      | 106 ++++++++---
 .../ml/IncrementalLearningSkeleton.java         | 132 +++++++++----
 .../examples/twitter/TwitterStream.java         |  98 +++++-----
 .../examples/window/join/GradeSource.java       |  45 -----
 .../examples/window/join/SalarySource.java      |  45 -----
 .../examples/window/join/WindowJoin.java        | 162 ++++++++++++++++
 .../examples/window/join/WindowJoinLocal.java   |  49 -----
 .../examples/wordcount/PojoExample.java         | 185 +++++++++++++++++++
 .../examples/wordcount/PojoWordCount.java       | 169 -----------------
 .../streaming/examples/wordcount/WordCount.java |  13 +-
 12 files changed, 586 insertions(+), 533 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d332d6c3/flink-addons/flink-streaming/flink-streaming-examples/pom.xml
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-examples/pom.xml b/flink-addons/flink-streaming/flink-streaming-examples/pom.xml
index f1480de..20e1500 100644
--- a/flink-addons/flink-streaming/flink-streaming-examples/pom.xml
+++ b/flink-addons/flink-streaming/flink-streaming-examples/pom.xml
@@ -65,7 +65,7 @@ under the License.
          			<executions>
            				<execution>
              					<id>unpack</id>
-             					<phase>package</phase>
+             					<phase>prepare-package</phase>
              					<goals>
                						<goal>unpack</goal>
              					</goals>
@@ -92,50 +92,6 @@ under the License.
 				<artifactId>maven-jar-plugin</artifactId>
 				
 				<executions>
-					<!-- Basic -->
-					<execution>
-						<id>Basic</id>
-						<phase>package</phase>
-						<goals>
-							<goal>jar</goal>
-						</goals>
-						<configuration>
-							<classifier>Basic</classifier>
-
-							<archive>
-								<manifestEntries>
-									<program-class>org.apache.flink.streaming.examples.basictopology.BasicTopology</program-class>
-								</manifestEntries>
-							</archive>
-
-							<includes>
-								<include>org/apache/flink/streaming/examples/basictopology/BasicTopology.class</include>
-								<include>org/apache/flink/examples/java/wordcount/util/WordCountData.class</include>				
-							</includes>
-						</configuration>
-					</execution>
-
-					<!-- CellInfo -->
-					<execution>
-						<id>CellInfo</id>
-						<phase>package</phase>
-						<goals>
-							<goal>jar</goal>
-						</goals>
-						<configuration>
-							<classifier>CellInfo</classifier>
-
-							<archive>
-								<manifestEntries>
-									<program-class>org.apache.flink.streaming.examples.cellinfo.CellInfoLocal</program-class>
-								</manifestEntries>
-							</archive>
-
-							<includes>
-								<include>org/apache/flink/streaming/examples/cellinfo/*.class</include>			
-							</includes>
-						</configuration>
-					</execution>
 
 					<!-- Iteration -->
 					<execution>
@@ -216,7 +172,7 @@ under the License.
 
 							<archive>
 								<manifestEntries>
-									<program-class>org.apache.flink.streaming.examples.window.join.WindowJoinLocal</program-class>
+									<program-class>org.apache.flink.streaming.examples.window.join.WindowJoin</program-class>
 								</manifestEntries>
 							</archive>
 
@@ -238,13 +194,13 @@ under the License.
 
 							<archive>
 								<manifestEntries>
-									<program-class>org.apache.flink.streaming.examples.wordcount.PojoWordCount</program-class>
+									<program-class>org.apache.flink.streaming.examples.wordcount.PojoExample</program-class>
 								</manifestEntries>
 							</archive>
 
 							<includes>
-								<include>org/apache/flink/streaming/examples/wordcount/PojoWordCount.class</include>
-								<include>org/apache/flink/streaming/examples/wordcount/PojoWordCount$*.class</include>
+								<include>org/apache/flink/streaming/examples/wordcount/PojoExample.class</include>
+								<include>org/apache/flink/streaming/examples/wordcount/PojoExample$*.class</include>
 								<include>org/apache/flink/examples/java/wordcount/util/WordCountData.class</include>			
 							</includes>
 						</configuration>

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d332d6c3/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/basictopology/BasicTopology.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/basictopology/BasicTopology.java b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/basictopology/BasicTopology.java
deleted file mode 100755
index 9379ff2..0000000
--- a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/basictopology/BasicTopology.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.examples.basictopology;
-
-import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.examples.java.wordcount.util.WordCountData;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-
-/**
- * 
- * Very basic Flink streaming topology for local testing.
- *
- */
-public class BasicTopology {
-
-	private static final int PARALLELISM = 1;
-
-	public static void main(String[] args) throws Exception {
-
-		// We create a new Local environment. The program will be executed using
-		// a new mini-cluster that is set up at execution.
-		StreamExecutionEnvironment env = StreamExecutionEnvironment
-				.createLocalEnvironment(PARALLELISM);
-
-		// We create a new data stream from a collection of words then apply a
-		// simple map.
-		DataStream<String> stream = env.fromElements(WordCountData.WORDS).map(new IdentityMap());
-
-		// Print the results
-		stream.print();
-
-		env.execute();
-	}
-
-	public static class IdentityMap implements MapFunction<String, String> {
-		private static final long serialVersionUID = 1L;
-
-		// map to the same value
-		@Override
-		public String map(String value) throws Exception {
-			return value;
-		}
-
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d332d6c3/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/iteration/IterateExample.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/iteration/IterateExample.java b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/iteration/IterateExample.java
index a011837..f574718 100644
--- a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/iteration/IterateExample.java
+++ b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/iteration/IterateExample.java
@@ -23,70 +23,97 @@ import java.util.List;
 import java.util.Random;
 
 import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.tuple.Tuple1;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.streaming.api.collector.OutputSelector;
+import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.IterativeDataStream;
 import org.apache.flink.streaming.api.datastream.SplitDataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 
 /**
- * Example illustrating iterations in Flink streaming programs. The program will
- * sum up random numbers and counts how many additions it needs to reach a
- * specific threshold in an iterative streaming fashion.
- *
+ * Example illustrating iterations in Flink streaming.
+ * 
+ * <p>
+ * The program sums up random numbers and counts the additions it performs to
+ * reach a specific threshold in an iterative streaming fashion.
+ * </p>
+ * 
+ * <p>
+ * This example shows how to use:
+ * <ul>
+ * <li>streaming iterations,
+ * <li>buffer timeouts to improve latency,
+ * <li>directed outputs.
+ * </ul>
  */
 public class IterateExample {
 
+	// *************************************************************************
+	// PROGRAM
+	// *************************************************************************
+
 	public static void main(String[] args) throws Exception {
 
-		// Set up our input for the stream of (0,0)s
+		if (!parseParameters(args)) {
+			return;
+		}
+
+		// set up input for the stream of (0,0) pairs
 		List<Tuple2<Double, Integer>> input = new ArrayList<Tuple2<Double, Integer>>();
 		for (int i = 0; i < 1000; i++) {
 			input.add(new Tuple2<Double, Integer>(0., 0));
 		}
 
-		// Obtain execution environment and use setBufferTimeout(0) to enable
-		// continuous flushing of the output buffers (lowest latency).
+		// obtain the execution environment and use setBufferTimeout(0) to enable
+		// continuous flushing of the output buffers (lowest latency)
 		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment()
 				.setBufferTimeout(0);
 
-		// Create an iterative datastream from the input.
+		// create an iterative data stream from the input
 		IterativeDataStream<Tuple2<Double, Integer>> it = env.fromCollection(input).shuffle()
 				.iterate();
 
-		// We make sure that the iteration terminates if no new data received in
-		// the stream for 5 seconds
+		// trigger iteration termination if no new data is received for 5 seconds
 		it.setMaxWaitTime(5000);
 
-		// We apply the stepfunction to add a new random value to the tuple and
-		// increment the counter, then we split the output with our
-		// outputselector.
+		// apply the step function to add a new random value to the tuple and to
+		// increment the counter, then split the output with the output selector
 		SplitDataStream<Tuple2<Double, Integer>> step = it.map(new Step()).shuffle()
 				.split(new MySelector());
 
-		// We close the iteration be selecting the tuples that was directed to
-		// 'iterate' in the outputselector
+		// close the iteration by selecting the tuples that were directed to the
+		// 'iterate' channel in the output selector
 		it.closeWith(step.select("iterate"));
 
-		// To produce the final output we select he tuples directed to 'output'
-		// than project it to the second field
-		step.select("output").project(1).types(Integer.class).print();
+		// to produce the final output select the tuples directed to the
+		// 'output' channel, then project them to the second field
+
+		DataStream<Tuple1<Integer>> numbers = step.select("output").project(1).types(Integer.class);
 
-		// Execute the streaming program
-		env.execute();
+		// emit result
+		if (fileOutput) {
+			numbers.writeAsText(outputPath, 1);
+		} else {
+			numbers.print();
+		}
+
+		// execute the program
+		env.execute("Streaming Iteration Example");
 	}
 
+	// *************************************************************************
+	// USER FUNCTIONS
+	// *************************************************************************
+
 	/**
 	 * Iteration step function which takes an input (Double , Integer) and
-	 * produces an output (Double+random, Integer +1)
-	 *
+	 * produces an output (Double + random, Integer + 1).
 	 */
 	public static class Step implements
 			MapFunction<Tuple2<Double, Integer>, Tuple2<Double, Integer>> {
-
 		private static final long serialVersionUID = 1L;
-
-		Random rnd;
+		private Random rnd;
 
 		public Step() {
 			rnd = new Random();
@@ -101,10 +128,9 @@ public class IterateExample {
 	}
 
 	/**
-	 * s OutputSelector test which tuple needed to be iterated again.
+	 * OutputSelector testing which tuple needs to be iterated again.
 	 */
 	public static class MySelector extends OutputSelector<Tuple2<Double, Integer>> {
-
 		private static final long serialVersionUID = 1L;
 
 		@Override
@@ -117,4 +143,30 @@ public class IterateExample {
 		}
 
 	}
+
+	// *************************************************************************
+	// UTIL METHODS
+	// *************************************************************************
+
+	private static boolean fileOutput = false;
+	private static String outputPath;
+
+	private static boolean parseParameters(String[] args) {
+
+		if (args.length > 0) {
+			// parse input arguments
+			fileOutput = true;
+			if (args.length == 1) {
+				outputPath = args[0];
+			} else {
+				System.err.println("Usage: IterateExample <result path>");
+				return false;
+			}
+		} else {
+			System.out.println("Executing IterateExample with generated data.");
+			System.out.println("  Provide parameter to write to file.");
+			System.out.println("  Usage: IterateExample <result path>");
+		}
+		return true;
+	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d332d6c3/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/ml/IncrementalLearningSkeleton.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/ml/IncrementalLearningSkeleton.java b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/ml/IncrementalLearningSkeleton.java
index d2f9e6e..2def142 100755
--- a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/ml/IncrementalLearningSkeleton.java
+++ b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/ml/IncrementalLearningSkeleton.java
@@ -24,11 +24,69 @@ import org.apache.flink.streaming.api.function.co.CoMapFunction;
 import org.apache.flink.streaming.api.function.source.SourceFunction;
 import org.apache.flink.util.Collector;
 
+/**
+ * Skeleton for an incremental machine learning algorithm consisting of a
+ * pre-computed model, which gets updated with the new inputs, and new input
+ * data for which the job provides predictions.
+ * 
+ * <p>
+ * This may serve as a base for a number of algorithms, e.g. updating an
+ * incremental Alternating Least Squares model while also providing the
+ * predictions.
+ * </p>
+ * 
+ * <p>
+ * This example shows how to use:
+ * <ul>
+ * <li>Connected streams
+ * <li>CoFunctions
+ * <li>Tuple data types
+ * </ul>
+ */
 public class IncrementalLearningSkeleton {
 
-	// Source for feeding new data for prediction
+	// *************************************************************************
+	// PROGRAM
+	// *************************************************************************
+
+	public static void main(String[] args) throws Exception {
+
+		if (!parseParameters(args)) {
+			return;
+		}
+
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+		// build new model on every second of new data
+		DataStream<Double[]> model = env.addSource(new TrainingDataSource()).window(5000)
+				.reduceGroup(new PartialModelBuilder());
+
+		// use partial model for prediction
+		DataStream<Integer> prediction = env.addSource(new NewDataSource()).connect(model)
+				.map(new Predictor());
+
+		// emit result
+		if (fileOutput) {
+			prediction.writeAsText(outputPath, 1);
+		} else {
+			prediction.print();
+		}
+
+		// execute program
+		env.execute("Streaming Incremental Learning");
+	}
+
+	// *************************************************************************
+	// USER FUNCTIONS
+	// *************************************************************************
+
+	/**
+	 * Feeds new data for prediction. By default it is implemented as constantly
+	 * emitting the Integer 1 in a loop.
+	 */
 	public static class NewDataSource implements SourceFunction<Integer> {
 		private static final long serialVersionUID = 1L;
+		private static final int NEW_DATA_SLEEP_TIME = 1000;
 
 		@Override
 		public void invoke(Collector<Integer> collector) throws Exception {
@@ -37,39 +95,41 @@ public class IncrementalLearningSkeleton {
 			}
 		}
 
-		// Method for pulling new data for prediction
 		private Integer getNewData() throws InterruptedException {
-			Thread.sleep(100);
+			Thread.sleep(NEW_DATA_SLEEP_TIME);
 			return 1;
 		}
 	}
 
-	// Source for feeding new training data for partial model building
+	/**
+	 * Feeds new training data for the partial model builder. By default it is
+	 * implemented as constantly emitting the Integer 1 in a loop.
+	 */
 	public static class TrainingDataSource implements SourceFunction<Integer> {
 		private static final long serialVersionUID = 1L;
+		private static final int TRAINING_DATA_SLEEP_TIME = 10;
 
 		@Override
 		public void invoke(Collector<Integer> collector) throws Exception {
-
 			while (true) {
 				collector.collect(getTrainingData());
 			}
 
 		}
 
-		// Method for pulling new training data
 		private Integer getTrainingData() throws InterruptedException {
-			Thread.sleep(100);
+			Thread.sleep(TRAINING_DATA_SLEEP_TIME);
 			return 1;
 
 		}
 	}
 
-	// Task for building up-to-date partial models on new training data
+	/**
+	 * Builds up-to-date partial models on new training data.
+	 */
 	public static class PartialModelBuilder implements GroupReduceFunction<Integer, Double[]> {
 		private static final long serialVersionUID = 1L;
 
-		// Method for building partial model on the grouped training data
 		protected Double[] buildPartialModel(Iterable<Integer> values) {
 			return new Double[] { 1. };
 		}
@@ -80,8 +140,15 @@ public class IncrementalLearningSkeleton {
 		}
 	}
 
-	// Task for performing prediction using the model produced in
-	// batch-processing and the up-to-date partial model
+	/**
+	 * Creates predictions using the model produced in batch-processing and the
+	 * up-to-date partial model.
+	 * 
+	 * <p>
+	 * By default it emits the Integer 0 for every prediction and the Integer 1
+	 * for every model update.
+	 * </p>
+	 */
 	public static class Predictor implements CoMapFunction<Integer, Double[], Integer> {
 		private static final long serialVersionUID = 1L;
 
@@ -102,38 +169,41 @@ public class IncrementalLearningSkeleton {
 			return 1;
 		}
 
-		// Pulls model built with batch-job on the old training data
+		// pulls the model built by the batch job on the old training data
 		protected Double[] getBatchModel() {
 			return new Double[] { 0. };
 		}
 
-		// Performs prediction using the two models
+		// performs prediction using the two models
 		protected Integer predict(Integer inTuple) {
 			return 0;
 		}
 
 	}
 
-	private static int SOURCE_PARALLELISM = 1;
-
-	public static void main(String[] args) throws Exception {
-
-		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-
-		IncrementalLearningSkeleton.SOURCE_PARALLELISM = env.getDegreeOfParallelism();
-
-		// Build new model on every second of new data
-		DataStream<Double[]> model = env.addSource(new TrainingDataSource(), SOURCE_PARALLELISM)
-				.window(5000).reduceGroup(new PartialModelBuilder());
+	// *************************************************************************
+	// UTIL METHODS
+	// *************************************************************************
 
-		// Use partial model for prediction
-		DataStream<Integer> prediction = env.addSource(new NewDataSource(), SOURCE_PARALLELISM)
-				.connect(model).map(new Predictor());
+	private static boolean fileOutput = false;
+	private static String outputPath;
 
-		// We pring the output
-		prediction.print();
+	private static boolean parseParameters(String[] args) {
 
-		env.execute();
+		if (args.length > 0) {
+			// parse input arguments
+			fileOutput = true;
+			if (args.length == 1) {
+				outputPath = args[0];
+			} else {
+				System.err.println("Usage: IncrementalLearningSkeleton <result path>");
+				return false;
+			}
+		} else {
+			System.out.println("Executing IncrementalLearningSkeleton with generated data.");
+			System.out.println("  Provide parameter to write to file.");
+			System.out.println("  Usage: IncrementalLearningSkeleton <result path>");
+		}
+		return true;
 	}
-
 }
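
The connected-streams wiring above can be hard to follow from the hunks alone, so here is a condensed sketch of a CoMapFunction in the shape this skeleton uses: map1 serves the first input (new data, producing a prediction), map2 the second input (freshly built partial models). The stored-model field and the literal return values are assumptions modelled on the javadoc ("emits the Integer 0 for every prediction and the Integer 1 for every model update"); the commit's own Predictor also keeps the batch model, which is omitted here.

    // sketch only -- condensed from the Predictor described above
    public static class PredictorSketch implements CoMapFunction<Integer, Double[], Integer> {
        private static final long serialVersionUID = 1L;

        // latest partial model received on the second input (assumed field)
        private Double[] partialModel = new Double[] { 0. };

        // first input: new data points -> emit a prediction (the skeleton emits 0);
        // a real implementation would combine partialModel with the batch model here
        @Override
        public Integer map1(Integer value) {
            return 0;
        }

        // second input: updated partial models -> store and signal an update (1)
        @Override
        public Integer map2(Double[] model) {
            partialModel = model;
            return 1;
        }
    }

It is wired up exactly as in the new main method: env.addSource(new NewDataSource()).connect(model).map(new Predictor()).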

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d332d6c3/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/twitter/TwitterStream.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/twitter/TwitterStream.java b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/twitter/TwitterStream.java
index ef1d84b..08aa5cb 100644
--- a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/twitter/TwitterStream.java
+++ b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/twitter/TwitterStream.java
@@ -25,14 +25,13 @@ import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.connectors.json.JSONParseFlatMap;
-import org.apache.flink.streaming.connectors.twitter.TwitterSource;
 import org.apache.flink.streaming.examples.twitter.util.TwitterStreamData;
 import org.apache.flink.util.Collector;
 import org.apache.sling.commons.json.JSONException;
 
 /**
- * Implements the "TwitterStream" program that computes a most used word occurrence
- * over JSON files in a streaming fashion.
+ * Implements the "TwitterStream" program that computes the most frequent word
+ * occurrence over JSON files in a streaming fashion.
  * 
  * <p>
  * The input is a JSON text file with lines separated by newline characters.
@@ -45,91 +44,89 @@ import org.apache.sling.commons.json.JSONException;
  * <p>
  * This example shows how to:
  * <ul>
- * <li>write a simple Flink Streaming program.
- * <li>use Tuple data types.
- * <li>write and use user-defined functions.
+ * <li>acquire external data,
+ * <li>use in-line defined functions,
+ * <li>handle flattened stream inputs.
  * </ul>
  * 
  */
 public class TwitterStream {
-	
+
 	// *************************************************************************
 	// PROGRAM
 	// *************************************************************************
-	
+
 	public static void main(String[] args) throws Exception {
 		if (!parseParameters(args)) {
 			return;
 		}
 
 		// set up the execution environment
-		StreamExecutionEnvironment env = StreamExecutionEnvironment
-				.getExecutionEnvironment();
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 
 		env.setBufferTimeout(1000);
-		
+
 		// get input data
 		DataStream<String> streamSource = getTextDataStream(env);
 
-		DataStream<Tuple2<String, Integer>> dataStream = streamSource
-				// selecting english tweets and split to words
-				.flatMap(new SelectEnglishAndTokenizeFlatMap())
-				.partitionBy(0)
+		DataStream<Tuple2<String, Integer>> tweets = streamSource
+		// selecting English tweets and splitting them into words
+				.flatMap(new SelectEnglishAndTokenizeFlatMap()).partitionBy(0)
 				// returning (word, 1)
 				.map(new MapFunction<String, Tuple2<String, Integer>>() {
 					private static final long serialVersionUID = 1L;
 
 					@Override
-					public Tuple2<String, Integer> map(String value)
-							throws Exception {
+					public Tuple2<String, Integer> map(String value) throws Exception {
 						return new Tuple2<String, Integer>(value, 1);
 					}
 				})
 				// group by words and sum their occurence
-				.groupBy(0)
-				.sum(1)
-				// select maximum occurenced word
+				.groupBy(0).sum(1)
+				// select word with maximum occurrence
 				.flatMap(new SelectMaxOccurence());
 
 		// emit result
-		dataStream.print();
+		if (fileOutput) {
+			tweets.writeAsText(outputPath, 1);
+		} else {
+			tweets.print();
+		}
 
 		// execute program
-		env.execute();
+		env.execute("Twitter Streaming Example");
 	}
-	
+
 	// *************************************************************************
 	// USER FUNCTIONS
 	// *************************************************************************
 
 	/**
-	 * Make sentence from english tweets.
+	 * Makes sentences from English tweets.
 	 * 
-	 * Implements the string tokenizer that splits sentences into words as a
+	 * <p>
+	 * Implements a string tokenizer that splits sentences into words as a
 	 * user-defined FlatMapFunction. The function takes a line (String) and
 	 * splits it into multiple pairs in the form of "(word,1)" (Tuple2<String,
 	 * Integer>).
+	 * </p>
 	 */
-	public static class SelectEnglishAndTokenizeFlatMap extends
-			JSONParseFlatMap<String, String> {
+	public static class SelectEnglishAndTokenizeFlatMap extends JSONParseFlatMap<String, String> {
 		private static final long serialVersionUID = 1L;
 
 		/**
 		 * Select the language from the incoming JSON text
 		 */
 		@Override
-		public void flatMap(String value, Collector<String> out)
-				throws Exception {
+		public void flatMap(String value, Collector<String> out) throws Exception {
 			try {
 				if (getString(value, "lang").equals("en")) {
 					// message of tweet
-					StringTokenizer tokenizer = new StringTokenizer(getString(
-							value, "text"));
+					StringTokenizer tokenizer = new StringTokenizer(getString(value, "text"));
 
 					// split the message
 					while (tokenizer.hasMoreTokens()) {
-						String result = tokenizer.nextToken().replaceAll(
-								"\\s*", "");
+						String result = tokenizer.nextToken().replaceAll("\\s*", "");
 
 						if (result != null && !result.equals("")) {
 							out.collect(result);
@@ -143,10 +140,9 @@ public class TwitterStream {
 	}
 
 	/**
-	 * 
-	 * Implements a user-defined FlatMapFunction that check if the word's current occurence
-	 * is higher than the maximum occurence. If it is, return with the word and change the maximum.
-	 *
+	 * Implements a user-defined FlatMapFunction that checks if the current
+	 * occurrence is higher than the maximum occurrence. If so, it emits the word
+	 * and updates the maximum.
 	 */
 	public static class SelectMaxOccurence implements
 			FlatMapFunction<Tuple2<String, Integer>, Tuple2<String, Integer>> {
@@ -158,29 +154,32 @@ public class TwitterStream {
 		}
 
 		@Override
-		public void flatMap(Tuple2<String, Integer> value,
-				Collector<Tuple2<String, Integer>> out) throws Exception {
+		public void flatMap(Tuple2<String, Integer> value, Collector<Tuple2<String, Integer>> out)
+				throws Exception {
 			if ((Integer) value.getField(1) >= maximum) {
 				out.collect(value);
 				maximum = (Integer) value.getField(1);
 			}
 		}
 	}
-	
+
 	// *************************************************************************
 	// UTIL METHODS
 	// *************************************************************************
 
-	private static boolean fromFile = false;
-	private static String path;
+	private static boolean fileOutput = false;
+	private static String textPath;
+	private static String outputPath;
 
 	private static boolean parseParameters(String[] args) {
 		if (args.length > 0) {
-			if (args.length == 1) {
-				fromFile = true;
-				path = args[0];
+			// parse input arguments
+			fileOutput = true;
+			if (args.length == 2) {
+				textPath = args[0];
+				outputPath = args[1];
 			} else {
-				System.err.println("USAGE:\nTwitterStream <pathToPropertiesFile>");
+				System.err.println("USAGE:\nTwitterStream <pathToPropertiesFile> <result path>");
 				return false;
 			}
 		} else {
@@ -191,11 +190,10 @@ public class TwitterStream {
 		return true;
 	}
 
-	private static DataStream<String> getTextDataStream(
-			StreamExecutionEnvironment env) {
-		if (fromFile) {
+	private static DataStream<String> getTextDataStream(StreamExecutionEnvironment env) {
+		if (fileOutput) {
 			// read the text file from given input path
-			return env.addSource(new TwitterSource(path));
+			return env.readTextFile(textPath);
 		} else {
 			// get default test text data
 			return env.fromElements(TwitterStreamData.TEXTS);

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d332d6c3/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/window/join/GradeSource.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/window/join/GradeSource.java b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/window/join/GradeSource.java
deleted file mode 100644
index d877f91..0000000
--- a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/window/join/GradeSource.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.examples.window.join;
-
-import java.util.Random;
-
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.streaming.api.function.source.SourceFunction;
-import org.apache.flink.util.Collector;
-
-public class GradeSource implements SourceFunction<Tuple2<String, Integer>> {
-
-	private static final long serialVersionUID = -5897483980082089771L;
-
-	private String[] names = { "tom", "jerry", "alice", "bob", "john", "grace", "sasa", "lawrance",
-			"andrew", "jean", "richard", "smith", "gorge", "black", "peter" ,"mark", "eric"};
-	private Random rand = new Random();
-	private Tuple2<String, Integer> outTuple = new Tuple2<String, Integer>();
-
-	@Override
-	public void invoke(Collector<Tuple2<String, Integer>> out) throws Exception {
-		// Continuously emit tuples with random names and integers (grades).
-		while (true) {
-			outTuple.f0 = names[rand.nextInt(names.length)];
-			outTuple.f1 = rand.nextInt(5) + 1;
-			out.collect(outTuple);
-			Thread.sleep(rand.nextInt(10) + 1);
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d332d6c3/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/window/join/SalarySource.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/window/join/SalarySource.java b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/window/join/SalarySource.java
deleted file mode 100644
index 0da48e5..0000000
--- a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/window/join/SalarySource.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.examples.window.join;
-
-import java.util.Random;
-
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.streaming.api.function.source.SourceFunction;
-import org.apache.flink.util.Collector;
-
-public class SalarySource implements SourceFunction<Tuple2<String, Integer>> {
-
-	private static final long serialVersionUID = 6670933703432267728L;
-
-	private String[] names = { "tom", "jerry", "alice", "bob", "john", "grace", "sasa", "lawrance",
-			"andrew", "jean", "richard", "smith", "gorge", "black", "peter", "mark", "eric" };
-	private Random rand = new Random();
-	private Tuple2<String, Integer> outTuple = new Tuple2<String, Integer>();
-
-	@Override
-	public void invoke(Collector<Tuple2<String, Integer>> out) throws Exception {
-		// Continuously emit tuples with random names and integers (salaries).
-		while (true) {
-			outTuple.f0 = names[rand.nextInt(names.length)];
-			outTuple.f1 = rand.nextInt(10000);
-			out.collect(outTuple);
-			Thread.sleep(rand.nextInt(10) + 1);
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d332d6c3/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/window/join/WindowJoin.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/window/join/WindowJoin.java b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/window/join/WindowJoin.java
new file mode 100644
index 0000000..6d8656c
--- /dev/null
+++ b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/window/join/WindowJoin.java
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.examples.window.join;
+
+import java.util.Random;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.function.source.SourceFunction;
+import org.apache.flink.util.Collector;
+
+/**
+ * Example illustrating join over sliding windows of streams in Flink.
+ * 
+ * <p>
+ * This example will join two streams with a sliding window: one which emits
+ * grades and one which emits salaries of people.
+ * </p>
+ *
+ * <p>
+ * This example shows how to:
+ * <ul>
+ * <li>do windowed joins,
+ * <li>use tuple data types,
+ * <li>write a simple streaming program.
+ */
+public class WindowJoin {
+
+	// *************************************************************************
+	// PROGRAM
+	// *************************************************************************
+
+	public static void main(String[] args) throws Exception {
+
+		if (!parseParameters(args)) {
+			return;
+		}
+
+		// obtain execution environment
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+		// connect to the data sources for grades and salaries
+		DataStream<Tuple2<String, Integer>> grades = env.addSource(new GradeSource());
+		DataStream<Tuple2<String, Integer>> salaries = env.addSource(new SalarySource());
+
+		// apply a temporal join over the two streams based on the names over
+		// one-second windows
+		DataStream<Tuple2<Tuple2<String, Integer>, Tuple2<String, Integer>>> joinedStream = grades
+				.windowJoin(salaries, 1000, 1000, 0, 0);
+
+		// emit result
+		if (fileOutput) {
+			joinedStream.writeAsText(outputPath, 1);
+		} else {
+			joinedStream.print();
+		}
+
+		// execute program
+		env.execute("Windowed Join Example");
+	}
+
+	// *************************************************************************
+	// USER FUNCTIONS
+	// *************************************************************************
+
+	private final static String[] names = { "tom", "jerry", "alice", "bob", "john", "grace" };
+	private final static int GRADE_COUNT = 5;
+	private final static int SALARY_MAX = 10000;
+	private final static int SLEEP_TIME = 10;
+
+	/**
+	 * Continuously emit tuples with random names and integers (grades).
+	 */
+	public static class GradeSource implements SourceFunction<Tuple2<String, Integer>> {
+		private static final long serialVersionUID = 1L;
+
+		private Random rand;
+		private Tuple2<String, Integer> outTuple;
+
+		public GradeSource() {
+			rand = new Random();
+			outTuple = new Tuple2<String, Integer>();
+		}
+
+		@Override
+		public void invoke(Collector<Tuple2<String, Integer>> out) throws Exception {
+			while (true) {
+				outTuple.f0 = names[rand.nextInt(names.length)];
+				outTuple.f1 = rand.nextInt(GRADE_COUNT) + 1;
+				out.collect(outTuple);
+				Thread.sleep(rand.nextInt(SLEEP_TIME) + 1);
+			}
+		}
+	}
+
+	/**
+	 * Continuously emit tuples with random names and integers (salaries).
+	 */
+	public static class SalarySource implements SourceFunction<Tuple2<String, Integer>> {
+		private static final long serialVersionUID = 1L;
+
+		private Random rand;
+		private Tuple2<String, Integer> outTuple;
+
+		public SalarySource() {
+			rand = new Random();
+			outTuple = new Tuple2<String, Integer>();
+		}
+
+		@Override
+		public void invoke(Collector<Tuple2<String, Integer>> out) throws Exception {
+			while (true) {
+				outTuple.f0 = names[rand.nextInt(names.length)];
+				outTuple.f1 = rand.nextInt(SALARY_MAX) + 1;
+				out.collect(outTuple);
+				Thread.sleep(rand.nextInt(SLEEP_TIME) + 1);
+			}
+		}
+	}
+
+	// *************************************************************************
+	// UTIL METHODS
+	// *************************************************************************
+
+	private static boolean fileOutput = false;
+	private static String outputPath;
+
+	private static boolean parseParameters(String[] args) {
+
+		if (args.length > 0) {
+			// parse input arguments
+			fileOutput = true;
+			if (args.length == 1) {
+				outputPath = args[0];
+			} else {
+				System.err.println("Usage: WindowJoin <result path>");
+				return false;
+			}
+		} else {
+			System.out.println("Executing WindowJoin with generated data.");
+			System.out.println("  Provide parameter to write to file.");
+			System.out.println("  Usage: WindowJoin <result path>");
+		}
+		return true;
+	}
+}
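
A note on the join call in the new main method: the hunk only documents it as a join "based on the names over one-second windows", so the parameter roles spelled out below are my interpretation rather than something stated by the commit.

    // same call as above, with assumed parameter meanings as comments
    DataStream<Tuple2<Tuple2<String, Integer>, Tuple2<String, Integer>>> joined =
            grades.windowJoin(salaries,
                    1000, // window length on the grades stream, in milliseconds (assumed)
                    1000, // window length on the salaries stream, in milliseconds (assumed)
                    0,    // join key position in the grade tuples: the name field
                    0);   // join key position in the salary tuples: the name field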

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d332d6c3/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/window/join/WindowJoinLocal.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/window/join/WindowJoinLocal.java b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/window/join/WindowJoinLocal.java
deleted file mode 100644
index 695cf5d..0000000
--- a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/window/join/WindowJoinLocal.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.examples.window.join;
-
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-
-public class WindowJoinLocal {
-
-	// This example will join two streams with a sliding window. One which emits
-	// people's grades and one which emits people's salaries.
-
-	public static void main(String[] args) throws Exception {
-
-		// Obtain execution environment
-		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-
-		// Connect to the data sources for grades and salaries
-		DataStream<Tuple2<String, Integer>> grades = env.addSource(new GradeSource());
-		DataStream<Tuple2<String, Integer>> salaries = env.addSource(new SalarySource());
-
-		// Apply a temporal join over the two stream based on the names in one
-		// second windows
-		DataStream<Tuple2<Tuple2<String, Integer>, Tuple2<String, Integer>>> joinedStream = grades
-				.windowJoin(salaries, 1000, 1000, 0, 0);
-
-		// Print the results
-		joinedStream.print();
-
-		env.execute();
-
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d332d6c3/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/wordcount/PojoExample.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/wordcount/PojoExample.java b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/wordcount/PojoExample.java
new file mode 100644
index 0000000..f377863
--- /dev/null
+++ b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/wordcount/PojoExample.java
@@ -0,0 +1,185 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.examples.wordcount;
+
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.examples.java.wordcount.util.WordCountData;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.util.Collector;
+
+/**
+ * This example shows an implementation of WordCount without using the Tuple2
+ * type, but a custom class.
+ * 
+ * <p>
+ * Usage: <code>PojoExample &lt;text path&gt; &lt;result path&gt;</code><br>
+ * If no parameters are provided, the program is run with default data from
+ * {@link WordCountData}.
+ * 
+ * <p>
+ * This example shows how to:
+ * <ul>
+ * <li>use POJO data types,
+ * <li>write a simple Flink program,
+ * <li>write and use user-defined functions. 
+ * </ul>
+ */
+public class PojoExample {
+	
+	// *************************************************************************
+	// PROGRAM
+	// *************************************************************************
+
+	public static void main(String[] args) throws Exception {
+
+		if (!parseParameters(args)) {
+			return;
+		}
+
+		// set up the execution environment
+		final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+		// get input data
+		DataStream<String> text = getTextDataStream(env);
+
+		DataStream<Word> counts =
+		// split up the lines into Word objects
+		text.flatMap(new Tokenizer())
+		// group by the field word and sum up the frequency
+				.groupBy("word").sum("frequency");
+
+		if (fileOutput) {
+			counts.writeAsText(outputPath);
+		} else {
+			counts.print();
+		}
+
+		// execute program
+		env.execute("WordCount Pojo Example");
+	}
+
+	// *************************************************************************
+	// DATA TYPES
+	// *************************************************************************
+
+	/**
+	 * This is the POJO (Plain Old Java Object) that is being used for all the
+	 * operations. As long as all fields are public or have a getter/setter, the
+	 * system can handle them.
+	 */
+	public static class Word {
+
+		private String word;
+		private Integer frequency;
+
+		public Word() {
+		}
+
+		public Word(String word, int i) {
+			this.word = word;
+			this.frequency = i;
+		}
+
+		public String getWord() {
+			return word;
+		}
+
+		public void setWord(String word) {
+			this.word = word;
+		}
+
+		public Integer getFrequency() {
+			return frequency;
+		}
+
+		public void setFrequency(Integer frequency) {
+			this.frequency = frequency;
+		}
+
+		@Override
+		public String toString() {
+			return "(" + word + ", " + frequency + ")";
+		}
+	}
+
+	// *************************************************************************
+	// USER FUNCTIONS
+	// *************************************************************************
+
+	/**
+	 * Implements the string tokenizer that splits sentences into words as a
+	 * user-defined FlatMapFunction. The function takes a line (String) and
+	 * splits it into multiple pairs in the form of "(word,1)" (Tuple2<String,
+	 * Integer>).
+	 */
+	public static final class Tokenizer implements FlatMapFunction<String, Word> {
+		private static final long serialVersionUID = 1L;
+
+		@Override
+		public void flatMap(String value, Collector<Word> out) {
+			// normalize and split the line
+			String[] tokens = value.toLowerCase().split("\\W+");
+
+			// emit the pairs
+			for (String token : tokens) {
+				if (token.length() > 0) {
+					out.collect(new Word(token, 1));
+				}
+			}
+		}
+	}
+
+	// *************************************************************************
+	// UTIL METHODS
+	// *************************************************************************
+
+	private static boolean fileOutput = false;
+	private static String textPath;
+	private static String outputPath;
+
+	private static boolean parseParameters(String[] args) {
+
+		if (args.length > 0) {
+			// parse input arguments
+			fileOutput = true;
+			if (args.length == 2) {
+				textPath = args[0];
+				outputPath = args[1];
+			} else {
+				System.err.println("Usage: PojoExample <text path> <result path>");
+				return false;
+			}
+		} else {
+			System.out.println("Executing PojoExample with built-in default data.");
+			System.out.println("  Provide parameters to read input data from a file.");
+			System.out.println("  Usage: PojoExample <text path> <result path>");
+		}
+		return true;
+	}
+
+	private static DataStream<String> getTextDataStream(StreamExecutionEnvironment env) {
+		if (fileOutput) {
+			// read the text file from given input path
+			return env.readTextFile(textPath);
+		} else {
+			// get default test text data
+			return env.fromElements(WordCountData.WORDS);
+		}
+	}
+}
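
Compared with the tuple-based WordCount touched at the end of this mail, the only substantive difference in the POJO variant is how the grouping key is specified. Both lines below are taken from the respective main methods in this commit (reflowed for readability): field names drive the grouping for the POJO, positions for the tuple.

    // PojoExample: group and aggregate by POJO field name (getters/setters required)
    DataStream<Word> counts = text.flatMap(new Tokenizer()).groupBy("word").sum("frequency");

    // WordCount: group and aggregate by tuple position
    DataStream<Tuple2<String, Integer>> counts = text.flatMap(new Tokenizer()).groupBy(0).sum(1);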

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d332d6c3/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/wordcount/PojoWordCount.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/wordcount/PojoWordCount.java b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/wordcount/PojoWordCount.java
deleted file mode 100644
index e95f042..0000000
--- a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/wordcount/PojoWordCount.java
+++ /dev/null
@@ -1,169 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.examples.wordcount;
-
-import org.apache.flink.api.common.functions.FlatMapFunction;
-import org.apache.flink.examples.java.wordcount.util.WordCountData;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.util.Collector;
-
-/**
- * This example shows an implementation of Wordcount without using the Tuple2
- * type, but a custom class.
- *
- */
-public class PojoWordCount {
-
-	/**
-	 * This is the POJO (Plain Old Java Object) that is being used for all the
-	 * operations. As long as all fields are public or have a getter/setter, the
-	 * system can handle them
-	 */
-	public static class Word {
-		// fields
-		private String word;
-		private Integer frequency;
-
-		// constructors
-		public Word() {
-		}
-
-		public Word(String word, int i) {
-			this.word = word;
-			this.frequency = i;
-		}
-
-		// getters setters
-		public String getWord() {
-			return word;
-		}
-
-		public void setWord(String word) {
-			this.word = word;
-		}
-
-		public Integer getFrequency() {
-			return frequency;
-		}
-
-		public void setFrequency(Integer frequency) {
-			this.frequency = frequency;
-		}
-
-		// to String
-		@Override
-		public String toString() {
-			return "Word=" + word + " freq=" + frequency;
-		}
-	}
-
-	public static void main(String[] args) throws Exception {
-
-		if (!parseParameters(args)) {
-			return;
-		}
-
-		// set up the execution environment
-		final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-
-		// get input data
-		DataStream<String> text = getTextDataStream(env);
-
-		DataStream<Word> counts =
-		// split up the lines into Word objects
-		text.flatMap(new Tokenizer())
-		// group by the field word and sum up the frequency
-		.groupBy("word")
-		.sum("frequency");
-
-		if (fileOutput) {
-			counts.writeAsText(outputPath);
-		} else {
-			counts.print();
-		}
-
-		// execute program
-		env.execute("WordCount-Pojo Example");
-	}
-
-	// *************************************************************************
-	// USER FUNCTIONS
-	// *************************************************************************
-
-	/**
-	 * Implements the string tokenizer that splits sentences into words as a
-	 * user-defined FlatMapFunction. The function takes a line (String) and
-	 * splits it into multiple pairs in the form of "(word,1)" (Tuple2<String,
-	 * Integer>).
-	 */
-	public static final class Tokenizer implements FlatMapFunction<String, Word> {
-		private static final long serialVersionUID = 1L;
-
-		@Override
-		public void flatMap(String value, Collector<Word> out) {
-			// normalize and split the line
-			String[] tokens = value.toLowerCase().split("\\W+");
-
-			// emit the pairs
-			for (String token : tokens) {
-				if (token.length() > 0) {
-					out.collect(new Word(token, 1));
-				}
-			}
-		}
-	}
-
-	// *************************************************************************
-	// UTIL METHODS
-	// *************************************************************************
-
-	private static boolean fileOutput = false;
-	private static String textPath;
-	private static String outputPath;
-
-	private static boolean parseParameters(String[] args) {
-
-		if (args.length > 0) {
-			// parse input arguments
-			fileOutput = true;
-			if (args.length == 2) {
-				textPath = args[0];
-				outputPath = args[1];
-			} else {
-				System.err.println("Usage: WordCount <text path> <result path>");
-				return false;
-			}
-		} else {
-			System.out.println("Executing WordCount example with built-in default data.");
-			System.out.println("  Provide parameters to read input data from a file.");
-			System.out.println("  Usage: WordCount <text path> <result path>");
-		}
-		return true;
-	}
-
-	private static DataStream<String> getTextDataStream(StreamExecutionEnvironment env) {
-		if (fileOutput) {
-			// read the text file from given input path
-			return env.readTextFile(textPath);
-		} else {
-			// get default test text data
-			return env.fromElements(WordCountData.WORDS);
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d332d6c3/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/wordcount/WordCount.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/wordcount/WordCount.java b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/wordcount/WordCount.java
index e07dfe5..085fe5f 100644
--- a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/wordcount/WordCount.java
+++ b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/wordcount/WordCount.java
@@ -41,8 +41,8 @@ import org.apache.flink.util.Collector;
  * <p>
  * This example shows how to:
  * <ul>
- * <li>write a simple Flink Streaming program.
- * <li>use Tuple data types.
+ * <li>write a simple Flink Streaming program,
+ * <li>use tuple data types,
  * <li>write and use user-defined functions.
  * </ul>
  * 
@@ -66,11 +66,10 @@ public class WordCount {
 		DataStream<String> text = getTextDataStream(env);
 
 		DataStream<Tuple2<String, Integer>> counts =
-				// split up the lines in pairs (2-tuples) containing: (word,1)
-				text.flatMap(new Tokenizer())
-				// group by the tuple field "0" and sum up tuple field "1"
-				.groupBy(0)
-				.sum(1);
+		// split up the lines in pairs (2-tuples) containing: (word,1)
+		text.flatMap(new Tokenizer())
+		// group by the tuple field "0" and sum up tuple field "1"
+				.groupBy(0).sum(1);
 
 		// emit result
 		if (fileOutput) {