You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by mb...@apache.org on 2014/11/13 16:06:14 UTC

[1/5] incubator-flink git commit: [streaming] Removed obsolete Join example

Repository: incubator-flink
Updated Branches:
  refs/heads/master 818ebda0f -> d332d6c31


[streaming] Removed obsolete Join example


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/537d6f6d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/537d6f6d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/537d6f6d

Branch: refs/heads/master
Commit: 537d6f6da529d534f9c523c0b208861b381e3905
Parents: 818ebda
Author: Marton Balassi <mb...@apache.org>
Authored: Mon Nov 10 14:15:05 2014 +0100
Committer: mbalassi <mb...@apache.org>
Committed: Thu Nov 13 15:24:03 2014 +0100

----------------------------------------------------------------------
 .../streaming/examples/join/GradeSource.java    | 45 -----------
 .../streaming/examples/join/JoinLocal.java      | 53 ------------
 .../flink/streaming/examples/join/JoinTask.java | 84 --------------------
 .../streaming/examples/join/SalarySource.java   | 45 -----------
 4 files changed, 227 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/537d6f6d/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/join/GradeSource.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/join/GradeSource.java b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/join/GradeSource.java
deleted file mode 100644
index 93bcdab..0000000
--- a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/join/GradeSource.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.examples.join;
-
-import java.util.Random;
-
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.streaming.api.function.source.SourceFunction;
-import org.apache.flink.util.Collector;
-
-public class GradeSource implements SourceFunction<Tuple2<String, Integer>> {
-
-	private static final long serialVersionUID = -5897483980082089771L;
-
-	private String[] names = { "tom", "jerry", "alice", "bob", "john", "grace", "sasa", "lawrance",
-			"andrew", "jean", "richard", "smith", "gorge", "black", "peter" };
-	private Random rand = new Random();
-	private Tuple2<String, Integer> outTuple = new Tuple2<String, Integer>();
-
-	@Override
-	public void invoke(Collector<Tuple2<String, Integer>> out) throws Exception {
-		// Continuously emit tuples with random names and integers (grades).
-		while (true) {
-			outTuple.f0 = names[rand.nextInt(names.length)];
-			outTuple.f1 = rand.nextInt(5) + 1;
-
-			out.collect(outTuple);
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/537d6f6d/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/join/JoinLocal.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/join/JoinLocal.java b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/join/JoinLocal.java
deleted file mode 100644
index 717ad8e..0000000
--- a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/join/JoinLocal.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.examples.join;
-
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-
-public class JoinLocal {
-
-	private static final int PARALLELISM = 1;
-	private static final int SOURCE_PARALLELISM = 1;
-
-	// This example will join two streams. One which emits people's grades and
-	// one which emits people's salaries.
-
-	public static void main(String[] args) throws Exception {
-
-		StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(
-				PARALLELISM).setBufferTimeout(100);
-
-		DataStream<Tuple2<String, Integer>> grades = env.addSource(new GradeSource(),
-				SOURCE_PARALLELISM);
-		
-		DataStream<Tuple2<String, Integer>> salaries = env.addSource(new SalarySource(),
-				SOURCE_PARALLELISM);
-
-		DataStream<Tuple3<String, Integer, Integer>> joinedStream = grades.connect(salaries)
-				.flatMap(new JoinTask());
-		
-		System.out.println("(NAME, GRADE, SALARY)");
-		joinedStream.print();
-
-		env.execute();
-
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/537d6f6d/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/join/JoinTask.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/join/JoinTask.java b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/join/JoinTask.java
deleted file mode 100644
index 35e18f4..0000000
--- a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/join/JoinTask.java
+++ /dev/null
@@ -1,84 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.examples.join;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.streaming.api.function.co.RichCoFlatMapFunction;
-import org.apache.flink.util.Collector;
-
-//Joins the input value with the already known values. If it is a grade
-// then with the salaries, if it is a salary then with the grades. Also
-// stores the new element.
-public class JoinTask extends
-		RichCoFlatMapFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple3<String, Integer, Integer>> {
-	private static final long serialVersionUID = 1L;
-
-	private HashMap<String, ArrayList<Integer>> gradeHashmap;
-	private HashMap<String, ArrayList<Integer>> salaryHashmap;
-	private String name;
-
-	public JoinTask() {
-		gradeHashmap = new HashMap<String, ArrayList<Integer>>();
-		salaryHashmap = new HashMap<String, ArrayList<Integer>>();
-		name = new String();
-	}
-
-	Tuple3<String, Integer, Integer> outputTuple = new Tuple3<String, Integer, Integer>();
-
-	// GRADES
-	@Override
-	public void flatMap1(Tuple2<String, Integer> value,
-			Collector<Tuple3<String, Integer, Integer>> out) {
-		name = value.f0;
-		outputTuple.f0 = name;
-		outputTuple.f1 = value.f1;
-		if (salaryHashmap.containsKey(name)) {
-			for (Integer salary : salaryHashmap.get(name)) {
-				outputTuple.f2 = salary;
-				out.collect(outputTuple);
-			}
-		}
-		if (!gradeHashmap.containsKey(name)) {
-			gradeHashmap.put(name, new ArrayList<Integer>());
-		}
-		gradeHashmap.get(name).add(value.f1);
-	}
-
-	// SALARIES
-	@Override
-	public void flatMap2(Tuple2<String, Integer> value,
-			Collector<Tuple3<String, Integer, Integer>> out) {
-		name = value.f0;
-		outputTuple.f0 = name;
-		outputTuple.f2 = value.f1;
-		if (gradeHashmap.containsKey(name)) {
-			for (Integer grade : gradeHashmap.get(name)) {
-				outputTuple.f1 = grade;
-				out.collect(outputTuple);
-			}
-		}
-		if (!salaryHashmap.containsKey(name)) {
-			salaryHashmap.put(name, new ArrayList<Integer>());
-		}
-		salaryHashmap.get(name).add(value.f1);
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/537d6f6d/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/join/SalarySource.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/join/SalarySource.java b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/join/SalarySource.java
deleted file mode 100644
index 988935f..0000000
--- a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/join/SalarySource.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.examples.join;
-
-import java.util.Random;
-
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.streaming.api.function.source.SourceFunction;
-import org.apache.flink.util.Collector;
-
-public class SalarySource implements SourceFunction<Tuple2<String, Integer>> {
-
-	private static final long serialVersionUID = 1L;
-
-	private String[] names = { "tom", "jerry", "alice", "bob", "john", "grace", "sasa", "lawrance",
-			"andrew", "jean", "richard", "smith", "gorge", "black", "peter" };
-	private Random rand = new Random();
-	private Tuple2<String, Integer> outTuple = new Tuple2<String, Integer>();
-
-	@Override
-	public void invoke(Collector<Tuple2<String, Integer>> out) throws Exception {
-		// Continuously emit tuples with random names and integers (salaries).
-		while (true) {
-
-			outTuple.f0 = names[rand.nextInt(names.length)];
-			outTuple.f1 = rand.nextInt(10000);
-			out.collect(outTuple);
-		}
-	}
-}


[3/5] incubator-flink git commit: [streaming] Example cleanup + windowJoin fix

Posted by mb...@apache.org.
[streaming] Example cleanup + windowJoin fix


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/e7ce9567
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/e7ce9567
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/e7ce9567

Branch: refs/heads/master
Commit: e7ce9567c57ffc59e5a7588445b6b891bc58e5f8
Parents: 6867f9b
Author: Gyula Fora <gy...@apache.org>
Authored: Wed Nov 12 12:32:32 2014 +0100
Committer: mbalassi <mb...@apache.org>
Committed: Thu Nov 13 15:24:04 2014 +0100

----------------------------------------------------------------------
 .../api/datastream/ConnectedDataStream.java     |   6 +
 .../examples/basictopology/BasicTopology.java   |  98 ++++++++-------
 .../examples/cellinfo/CellInfoLocal.java        | 123 ------------------
 .../examples/cellinfo/IWorkerEngine.java        |  23 ----
 .../flink/streaming/examples/cellinfo/Util.java |  28 -----
 .../examples/cellinfo/WorkerEngineBin.java      |  97 --------------
 .../examples/cellinfo/WorkerEngineExact.java    |  76 -----------
 .../examples/iteration/IterateExample.java      |  80 ++++++++----
 .../ml/IncrementalLearningSkeleton.java         |  15 ++-
 .../examples/twitter/TwitterStream.java         |   5 +-
 .../examples/window/join/GradeSource.java       |  14 +--
 .../examples/window/join/SalarySource.java      |  14 +--
 .../examples/window/join/WindowJoinLocal.java   |  25 ++--
 .../examples/window/join/WindowJoinTask.java    | 126 -------------------
 14 files changed, 148 insertions(+), 582 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e7ce9567/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedDataStream.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedDataStream.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedDataStream.java
index 23c420c..6556550 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedDataStream.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedDataStream.java
@@ -580,6 +580,9 @@ public class ConnectedDataStream<IN1, IN2> {
 	SingleOutputStreamOperator<Tuple2<IN1, IN2>, ?> windowJoin(long windowSize, long slideInterval,
 			TimeStamp<IN1> timestamp1, TimeStamp<IN2> timestamp2, int fieldIn1, int fieldIn2) {
 
+		dataStream1 = dataStream1.groupBy(fieldIn1);
+		dataStream2 = dataStream2.groupBy(fieldIn2);
+
 		JoinWindowFunction<IN1, IN2> joinWindowFunction = new JoinWindowFunction<IN1, IN2>(
 				dataStream1.getOutputType(), dataStream2.getOutputType(), fieldIn1, fieldIn2);
 
@@ -590,6 +593,9 @@ public class ConnectedDataStream<IN1, IN2> {
 	SingleOutputStreamOperator<Tuple2<IN1, IN2>, ?> windowJoin(long windowSize, long slideInterval,
 			TimeStamp<IN1> timestamp1, TimeStamp<IN2> timestamp2, String fieldIn1, String fieldIn2) {
 
+		dataStream1 = dataStream1.groupBy(fieldIn1);
+		dataStream2 = dataStream2.groupBy(fieldIn2);
+
 		JoinWindowFunction<IN1, IN2> joinWindowFunction = new JoinWindowFunction<IN1, IN2>(
 				dataStream1.getOutputType(), dataStream2.getOutputType(), fieldIn1, fieldIn2);
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e7ce9567/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/basictopology/BasicTopology.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/basictopology/BasicTopology.java b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/basictopology/BasicTopology.java
index 2b1ac4f..9379ff2 100755
--- a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/basictopology/BasicTopology.java
+++ b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/basictopology/BasicTopology.java
@@ -1,49 +1,61 @@
 /*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
  * limitations under the License.
- */
-
-package org.apache.flink.streaming.examples.basictopology;
-
+ */
+
+package org.apache.flink.streaming.examples.basictopology;
+
 import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.examples.java.wordcount.util.WordCountData;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-
-public class BasicTopology {
-
-	public static class IdentityMap implements MapFunction<String, String> {
-		private static final long serialVersionUID = 1L;
-		// map to the same value
-		@Override
-		public String map(String value) throws Exception {
-			return value;
-		}
-
-	}
-
-	private static final int PARALLELISM = 1;
-
-	public static void main(String[] args) throws Exception {
-		StreamExecutionEnvironment env = StreamExecutionEnvironment
-				.createLocalEnvironment(PARALLELISM);
-
-		DataStream<String> stream = env.fromElements(WordCountData.WORDS).map(new IdentityMap());
-
-		stream.print();
-
-		env.execute();
-	}
-}
+
+/**
+ * 
+ * Very basic Flink streaming topology for local testing.
+ *
+ */
+public class BasicTopology {
+
+	private static final int PARALLELISM = 1;
+
+	public static void main(String[] args) throws Exception {
+
+		// We create a new Local environment. The program will be executed using
+		// a new mini-cluster that is set up at execution.
+		StreamExecutionEnvironment env = StreamExecutionEnvironment
+				.createLocalEnvironment(PARALLELISM);
+
+		// We create a new data stream from a collection of words then apply a
+		// simple map.
+		DataStream<String> stream = env.fromElements(WordCountData.WORDS).map(new IdentityMap());
+
+		// Print the results
+		stream.print();
+
+		env.execute();
+	}
+
+	public static class IdentityMap implements MapFunction<String, String> {
+		private static final long serialVersionUID = 1L;
+
+		// map to the same value
+		@Override
+		public String map(String value) throws Exception {
+			return value;
+		}
+
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e7ce9567/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/cellinfo/CellInfoLocal.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/cellinfo/CellInfoLocal.java b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/cellinfo/CellInfoLocal.java
deleted file mode 100644
index 1dcb5e6..0000000
--- a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/cellinfo/CellInfoLocal.java
+++ /dev/null
@@ -1,123 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.examples.cellinfo;
-
-import java.util.Random;
-
-import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.function.co.RichCoMapFunction;
-import org.apache.flink.streaming.api.function.source.SourceFunction;
-import org.apache.flink.util.Collector;
-
-public class CellInfoLocal {
-
-	private final static int CELL_COUNT = 10;
-	private final static int LAST_MILLIS = 1000;
-	private final static int PARALLELISM = 1;
-	private final static int SOURCE_PARALLELISM = 1;
-	private final static int QUERY_SLEEP_TIME = 1000;
-	private final static int QUERY_COUNT = 10;
-	private final static int INFO_SLEEP_TIME = 100;
-	private final static int INFO_COUNT = 100;
-
-	public final static class InfoSource implements SourceFunction<Tuple3<Integer, Long, Integer>> {
-		private static final long serialVersionUID = 1L;
-
-		private static Random rand = new Random();
-		private Tuple3<Integer, Long, Integer> tuple = new Tuple3<Integer, Long, Integer>(0, 0L, 0);
-
-		@Override
-		public void invoke(Collector<Tuple3<Integer, Long, Integer>> out) throws Exception {
-			for (int i = 0; i < INFO_COUNT; i++) {
-				Thread.sleep(INFO_SLEEP_TIME);
-				tuple.f0 = rand.nextInt(CELL_COUNT);
-				tuple.f1 = System.currentTimeMillis();
-
-				out.collect(tuple);
-			}
-		}
-	}
-
-	private final static class QuerySource implements
-			SourceFunction<Tuple3<Integer, Long, Integer>> {
-		private static final long serialVersionUID = 1L;
-
-		private static Random rand = new Random();
-		Tuple3<Integer, Long, Integer> tuple = new Tuple3<Integer, Long, Integer>(0, 0L,
-				LAST_MILLIS);
-
-		@Override
-		public void invoke(Collector<Tuple3<Integer, Long, Integer>> collector) throws Exception {
-			for (int i = 0; i < QUERY_COUNT; i++) {
-				Thread.sleep(QUERY_SLEEP_TIME);
-				tuple.f0 = rand.nextInt(CELL_COUNT);
-				tuple.f1 = System.currentTimeMillis();
-				collector.collect(tuple);
-			}
-		}
-	}
-
-	private final static class CellTask	extends
-			RichCoMapFunction<Tuple3<Integer, Long, Integer>, Tuple3<Integer, Long, Integer>, String> {
-		private static final long serialVersionUID = 1L;
-
-		private WorkerEngineExact engine;
-		private Integer cellID;
-		private Long timeStamp;
-		private Integer lastMillis;
-
-		public CellTask() {
-			engine = new WorkerEngineExact(CELL_COUNT, LAST_MILLIS, System.currentTimeMillis());
-		}
-
-		// INFO
-		@Override
-		public String map1(Tuple3<Integer, Long, Integer> value) {
-			cellID = value.f0;
-			timeStamp = value.f1;
-			engine.put(cellID, timeStamp);
-			return "INFO:\t" + cellID + " @ " + timeStamp;
-		}
-
-		// QUERY
-		@Override
-		public String map2(Tuple3<Integer, Long, Integer> value) {
-			cellID = value.f0;
-			timeStamp = value.f1;
-			lastMillis = value.f2;
-			return "QUERY:\t" + cellID + ": " + engine.get(timeStamp, lastMillis, cellID);
-		}
-	}
-
-	// Example for connecting data streams
-	public static void main(String[] args) throws Exception {
-		StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(
-				PARALLELISM).setBufferTimeout(100);
-
-		DataStream<Tuple3<Integer, Long, Integer>> querySource = env.addSource(new QuerySource(),
-				SOURCE_PARALLELISM).partitionBy(0);
-
-		DataStream<String> stream = env.addSource(new InfoSource(), SOURCE_PARALLELISM)
-				.partitionBy(0).connect(querySource).map(new CellTask());
-		stream.print();
-
-		env.execute();
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e7ce9567/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/cellinfo/IWorkerEngine.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/cellinfo/IWorkerEngine.java b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/cellinfo/IWorkerEngine.java
deleted file mode 100644
index b2b2264..0000000
--- a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/cellinfo/IWorkerEngine.java
+++ /dev/null
@@ -1,23 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.examples.cellinfo;
-
-public interface IWorkerEngine {
-	public int get(long timeStamp, long lastMillis, int cellId);
-	public void put(int cellId, long timeStamp);
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e7ce9567/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/cellinfo/Util.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/cellinfo/Util.java b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/cellinfo/Util.java
deleted file mode 100644
index 7463c6c..0000000
--- a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/cellinfo/Util.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.examples.cellinfo;
-
-public class Util {
-	public static int mod(int x, int y) {
-		int result = x % y;
-		if (result < 0) {
-			result += y;
-		}
-		return result;
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e7ce9567/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/cellinfo/WorkerEngineBin.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/cellinfo/WorkerEngineBin.java b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/cellinfo/WorkerEngineBin.java
deleted file mode 100644
index b6097f5..0000000
--- a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/cellinfo/WorkerEngineBin.java
+++ /dev/null
@@ -1,97 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.examples.cellinfo;
-
-import org.apache.flink.streaming.examples.cellinfo.Util;
-
-public class WorkerEngineBin implements java.io.Serializable, IWorkerEngine {
-
-	private static final long serialVersionUID = 1L;
-	private long unitLength_;
-	private long lastTimeUpdated_;
-	private int[][] counters_;
-	private int pointer_;
-
-	public WorkerEngineBin(long unitLength, int numOfCells, int bufferInterval, long currentTime) {
-		lastTimeUpdated_ = currentTime / unitLength * unitLength;
-		unitLength_ = unitLength;
-		counters_ = new int[(int) (bufferInterval / unitLength) + 1][numOfCells];
-	}
-
-	private int getCell(int interval, int cell) {
-		return counters_[Util.mod((interval + pointer_), counters_.length)][cell];
-	}
-
-	private void incrCell(int interval, int cell) {
-		++counters_[Util.mod((interval + pointer_), counters_.length)][cell];
-	}
-
-	private void toZero(int interval) {
-		for (int cell = 0; cell < counters_[0].length; ++cell) {
-			counters_[Util.mod((interval + pointer_), counters_.length)][cell] = 0;
-		}
-	}
-
-	private void shift(int shiftBy) {
-		if (shiftBy > 0 && shiftBy < counters_.length) {
-			pointer_ = Util.mod((pointer_ - shiftBy), counters_.length);
-			for (int i = 0; i < shiftBy; ++i) {
-				toZero(i);
-			}
-		} else if (shiftBy >= counters_.length) {
-			pointer_ = 0;
-			for (int i = 0; i < counters_.length; ++i) {
-				toZero(i);
-			}
-		}
-	}
-
-	public int get(long timeStamp, long lastMillis, int cellId) {
-		int shift = refresh(timeStamp);
-		int numOfLastIntervals = (int) (lastMillis / unitLength_);
-		if (shift >= counters_.length || numOfLastIntervals >= counters_.length) {
-			return -1;
-		}
-		int sum = 0;
-		for (int i = shift + 1; i < shift + numOfLastIntervals + 1; ++i) {
-			sum += getCell(i, cellId);
-		}
-		return sum;
-	}
-
-	private int refresh(long timeStamp) {
-		int shiftBy = (int) ((timeStamp - lastTimeUpdated_) / unitLength_);
-		shift(shiftBy);
-		int retVal;
-		if (shiftBy > 0) {
-			lastTimeUpdated_ = timeStamp / unitLength_ * unitLength_;
-			retVal = 0;
-		} else {
-			retVal = -shiftBy;
-		}
-		return retVal;
-	}
-
-	public void put(int cellId, long timeStamp) {
-		int shift = refresh(timeStamp);
-		if (shift >= counters_.length) {
-			return;
-		}
-		incrCell(shift, cellId);
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e7ce9567/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/cellinfo/WorkerEngineExact.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/cellinfo/WorkerEngineExact.java b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/cellinfo/WorkerEngineExact.java
deleted file mode 100644
index 8a9bcda..0000000
--- a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/cellinfo/WorkerEngineExact.java
+++ /dev/null
@@ -1,76 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.examples.cellinfo;
-
-import java.util.Iterator;
-import java.util.Map;
-import java.util.TreeMap;
-
-public class WorkerEngineExact implements java.io.Serializable, IWorkerEngine {
-	private static final long serialVersionUID = 1L;
-	private long lastTimeUpdated_;
-	private long bufferInterval_;
-	private TreeMap<Long, Integer>[] counters_;
-
-	@SuppressWarnings("unchecked")
-	public WorkerEngineExact(int numOfCells, int bufferInterval, long currentTime) {
-		lastTimeUpdated_ = currentTime;
-		bufferInterval_ = bufferInterval;
-		counters_ = new TreeMap[numOfCells];
-		for (int i = 0; i < numOfCells; ++i) {
-			counters_[i] = new TreeMap<Long, Integer>();
-		}
-	}
-
-	public int get(long timeStamp, long lastMillis, int cellId) {
-		refresh(timeStamp);
-		Map<Long, Integer> subMap = counters_[cellId].subMap(timeStamp - lastMillis, true, timeStamp, false);
-		int retVal = 0;
-		for (Map.Entry<Long, Integer> entry : subMap.entrySet()) {
-			retVal += entry.getValue();
-		}
-		return retVal;
-	}
-
-	public void put(int cellId, long timeStamp) {
-		refresh(timeStamp);
-		TreeMap<Long, Integer> map = counters_[cellId];
-		if (map.containsKey(timeStamp)) {
-			map.put(timeStamp, map.get(timeStamp) + 1);
-		} else {
-			map.put(timeStamp, 1);
-		}
-	}
-
-	public void refresh(long timeStamp) {
-		if (timeStamp - lastTimeUpdated_ > bufferInterval_) {
-			for (int i = 0; i < counters_.length; ++i) {
-				for (Iterator<Map.Entry<Long, Integer>> it = counters_[i].entrySet().iterator(); it.hasNext();) {
-					Map.Entry<Long, Integer> entry = it.next();
-					long time = entry.getKey();
-					if (timeStamp - time > bufferInterval_) {
-						it.remove();
-					} else {
-						break;
-					}
-				}
-			}
-			lastTimeUpdated_ = timeStamp;
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e7ce9567/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/iteration/IterateExample.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/iteration/IterateExample.java b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/iteration/IterateExample.java
index e2094fb..a011837 100644
--- a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/iteration/IterateExample.java
+++ b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/iteration/IterateExample.java
@@ -29,13 +29,63 @@ import org.apache.flink.streaming.api.datastream.IterativeDataStream;
 import org.apache.flink.streaming.api.datastream.SplitDataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 
+/**
+ * Example illustrating iterations in Flink streaming programs. The program will
+ * sum up random numbers and counts how many additions it needs to reach a
+ * specific threshold in an iterative streaming fashion.
+ *
+ */
 public class IterateExample {
 
+	public static void main(String[] args) throws Exception {
+
+		// Set up our input for the stream of (0,0)s
+		List<Tuple2<Double, Integer>> input = new ArrayList<Tuple2<Double, Integer>>();
+		for (int i = 0; i < 1000; i++) {
+			input.add(new Tuple2<Double, Integer>(0., 0));
+		}
+
+		// Obtain execution environment and use setBufferTimeout(0) to enable
+		// continuous flushing of the output buffers (lowest latency).
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment()
+				.setBufferTimeout(0);
+
+		// Create an iterative datastream from the input.
+		IterativeDataStream<Tuple2<Double, Integer>> it = env.fromCollection(input).shuffle()
+				.iterate();
+
+		// We make sure that the iteration terminates if no new data received in
+		// the stream for 5 seconds
+		it.setMaxWaitTime(5000);
+
+		// We apply the stepfunction to add a new random value to the tuple and
+		// increment the counter, then we split the output with our
+		// outputselector.
+		SplitDataStream<Tuple2<Double, Integer>> step = it.map(new Step()).shuffle()
+				.split(new MySelector());
+
+		// We close the iteration be selecting the tuples that was directed to
+		// 'iterate' in the outputselector
+		it.closeWith(step.select("iterate"));
+
+		// To produce the final output we select he tuples directed to 'output'
+		// than project it to the second field
+		step.select("output").project(1).types(Integer.class).print();
+
+		// Execute the streaming program
+		env.execute();
+	}
+
+	/**
+	 * Iteration step function which takes an input (Double , Integer) and
+	 * produces an output (Double+random, Integer +1)
+	 *
+	 */
 	public static class Step implements
 			MapFunction<Tuple2<Double, Integer>, Tuple2<Double, Integer>> {
 
 		private static final long serialVersionUID = 1L;
-	
+
 		Random rnd;
 
 		public Step() {
@@ -50,13 +100,16 @@ public class IterateExample {
 
 	}
 
+	/**
+	 * s OutputSelector test which tuple needed to be iterated again.
+	 */
 	public static class MySelector extends OutputSelector<Tuple2<Double, Integer>> {
 
 		private static final long serialVersionUID = 1L;
 
 		@Override
 		public void select(Tuple2<Double, Integer> value, Collection<String> outputs) {
-			if (value.f0 > 100) {
+			if (value.f0 > 200) {
 				outputs.add("output");
 			} else {
 				outputs.add("iterate");
@@ -64,27 +117,4 @@ public class IterateExample {
 		}
 
 	}
-	
-
-	public static void main(String[] args) throws Exception {
-
-		List<Tuple2<Double, Integer>> input = new ArrayList<Tuple2<Double, Integer>>();
-		for (int i = 0; i < 100; i++) {
-			input.add(new Tuple2<Double, Integer>(0., 0));
-		}
-
-		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment()
-				.setBufferTimeout(1);
-
-		IterativeDataStream<Tuple2<Double, Integer>> it = env.fromCollection(input).iterate()
-				.setMaxWaitTime(3000);
-		
-		SplitDataStream<Tuple2<Double,Integer>> step = it.map(new Step()).shuffle().split(new MySelector());
-		
-		it.closeWith(step.select("iterate"));
-		
-		step.select("output").project(1).types(Integer.class).print();
-
-		env.execute();
-	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e7ce9567/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/ml/IncrementalLearningSkeleton.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/ml/IncrementalLearningSkeleton.java b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/ml/IncrementalLearningSkeleton.java
index 9fe1af4..d2f9e6e 100755
--- a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/ml/IncrementalLearningSkeleton.java
+++ b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/ml/IncrementalLearningSkeleton.java
@@ -39,7 +39,7 @@ public class IncrementalLearningSkeleton {
 
 		// Method for pulling new data for prediction
 		private Integer getNewData() throws InterruptedException {
-			Thread.sleep(1000);
+			Thread.sleep(100);
 			return 1;
 		}
 	}
@@ -59,7 +59,7 @@ public class IncrementalLearningSkeleton {
 
 		// Method for pulling new training data
 		private Integer getTrainingData() throws InterruptedException {
-			Thread.sleep(1000);
+			Thread.sleep(100);
 			return 1;
 
 		}
@@ -114,13 +114,13 @@ public class IncrementalLearningSkeleton {
 
 	}
 
-	private static final int PARALLELISM = 1;
-	private static final int SOURCE_PARALLELISM = 1;
+	private static int SOURCE_PARALLELISM = 1;
 
 	public static void main(String[] args) throws Exception {
 
-		StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(
-				PARALLELISM).setBufferTimeout(1000);
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+		IncrementalLearningSkeleton.SOURCE_PARALLELISM = env.getDegreeOfParallelism();
 
 		// Build new model on every second of new data
 		DataStream<Double[]> model = env.addSource(new TrainingDataSource(), SOURCE_PARALLELISM)
@@ -130,11 +130,10 @@ public class IncrementalLearningSkeleton {
 		DataStream<Integer> prediction = env.addSource(new NewDataSource(), SOURCE_PARALLELISM)
 				.connect(model).map(new Predictor());
 
+		// We pring the output
 		prediction.print();
 
 		env.execute();
 	}
 
-
-
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e7ce9567/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/twitter/TwitterStream.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/twitter/TwitterStream.java b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/twitter/TwitterStream.java
index 5d8022a..ef1d84b 100644
--- a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/twitter/TwitterStream.java
+++ b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/twitter/TwitterStream.java
@@ -32,7 +32,7 @@ import org.apache.sling.commons.json.JSONException;
 
 /**
  * Implements the "TwitterStream" program that computes a most used word occurrence
- * histogram over JSON files in a streaming fashion.
+ * over JSON files in a streaming fashion.
  * 
  * <p>
  * The input is a JSON text file with lines separated by newline characters.
@@ -52,7 +52,6 @@ import org.apache.sling.commons.json.JSONException;
  * 
  */
 public class TwitterStream {
-	private static final int PARALLELISM = 1;
 	
 	// *************************************************************************
 	// PROGRAM
@@ -65,7 +64,7 @@ public class TwitterStream {
 
 		// set up the execution environment
 		StreamExecutionEnvironment env = StreamExecutionEnvironment
-				.createLocalEnvironment(PARALLELISM);
+				.getExecutionEnvironment();
 
 		env.setBufferTimeout(1000);
 		

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e7ce9567/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/window/join/GradeSource.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/window/join/GradeSource.java b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/window/join/GradeSource.java
index c895c5b..d877f91 100644
--- a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/window/join/GradeSource.java
+++ b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/window/join/GradeSource.java
@@ -19,29 +19,27 @@ package org.apache.flink.streaming.examples.window.join;
 
 import java.util.Random;
 
-import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.streaming.api.function.source.SourceFunction;
 import org.apache.flink.util.Collector;
 
-public class GradeSource implements SourceFunction<Tuple3<String, Integer, Long>> {
+public class GradeSource implements SourceFunction<Tuple2<String, Integer>> {
 
 	private static final long serialVersionUID = -5897483980082089771L;
 
 	private String[] names = { "tom", "jerry", "alice", "bob", "john", "grace", "sasa", "lawrance",
-			"andrew", "jean", "richard", "smith", "gorge", "black", "peter" };
+			"andrew", "jean", "richard", "smith", "gorge", "black", "peter" ,"mark", "eric"};
 	private Random rand = new Random();
-	private Tuple3<String, Integer, Long> outTuple = new Tuple3<String, Integer, Long>();
-	private Long progress = 0L;
+	private Tuple2<String, Integer> outTuple = new Tuple2<String, Integer>();
 
 	@Override
-	public void invoke(Collector<Tuple3<String, Integer, Long>> out) throws Exception {
+	public void invoke(Collector<Tuple2<String, Integer>> out) throws Exception {
 		// Continuously emit tuples with random names and integers (grades).
 		while (true) {
 			outTuple.f0 = names[rand.nextInt(names.length)];
 			outTuple.f1 = rand.nextInt(5) + 1;
-			outTuple.f2 = progress;
 			out.collect(outTuple);
-			progress += 1;
+			Thread.sleep(rand.nextInt(10) + 1);
 		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e7ce9567/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/window/join/SalarySource.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/window/join/SalarySource.java b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/window/join/SalarySource.java
index 641a738..0da48e5 100644
--- a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/window/join/SalarySource.java
+++ b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/window/join/SalarySource.java
@@ -19,29 +19,27 @@ package org.apache.flink.streaming.examples.window.join;
 
 import java.util.Random;
 
-import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.streaming.api.function.source.SourceFunction;
 import org.apache.flink.util.Collector;
 
-public class SalarySource implements SourceFunction<Tuple3<String, Integer, Long>> {
+public class SalarySource implements SourceFunction<Tuple2<String, Integer>> {
 
 	private static final long serialVersionUID = 6670933703432267728L;
 
 	private String[] names = { "tom", "jerry", "alice", "bob", "john", "grace", "sasa", "lawrance",
-			"andrew", "jean", "richard", "smith", "gorge", "black", "peter" };
+			"andrew", "jean", "richard", "smith", "gorge", "black", "peter", "mark", "eric" };
 	private Random rand = new Random();
-	private Tuple3<String, Integer, Long> outTuple = new Tuple3<String, Integer, Long>();
-	private Long progress = 0L;
+	private Tuple2<String, Integer> outTuple = new Tuple2<String, Integer>();
 
 	@Override
-	public void invoke(Collector<Tuple3<String, Integer, Long>> out) throws Exception {
+	public void invoke(Collector<Tuple2<String, Integer>> out) throws Exception {
 		// Continuously emit tuples with random names and integers (salaries).
 		while (true) {
 			outTuple.f0 = names[rand.nextInt(names.length)];
 			outTuple.f1 = rand.nextInt(10000);
-			outTuple.f2 = progress;
 			out.collect(outTuple);
-			progress += 1;
+			Thread.sleep(rand.nextInt(10) + 1);
 		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e7ce9567/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/window/join/WindowJoinLocal.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/window/join/WindowJoinLocal.java b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/window/join/WindowJoinLocal.java
index 211daf6..695cf5d 100644
--- a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/window/join/WindowJoinLocal.java
+++ b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/window/join/WindowJoinLocal.java
@@ -17,33 +17,30 @@
 
 package org.apache.flink.streaming.examples.window.join;
 
-import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 
 public class WindowJoinLocal {
 
-	private static final int PARALLELISM = 4;
-	private static final int SOURCE_PARALLELISM = 2;
-
 	// This example will join two streams with a sliding window. One which emits
 	// people's grades and one which emits people's salaries.
 
 	public static void main(String[] args) throws Exception {
 
-		StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(
-				PARALLELISM).setBufferTimeout(100);
-
-		DataStream<Tuple3<String, Integer, Long>> grades = env.addSource(new GradeSource(),
-				SOURCE_PARALLELISM);
+		// Obtain execution environment
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 
-		DataStream<Tuple3<String, Integer, Long>> salaries = env.addSource(new SalarySource(),
-				SOURCE_PARALLELISM);
+		// Connect to the data sources for grades and salaries
+		DataStream<Tuple2<String, Integer>> grades = env.addSource(new GradeSource());
+		DataStream<Tuple2<String, Integer>> salaries = env.addSource(new SalarySource());
 
-		DataStream<Tuple3<String, Integer, Integer>> joinedStream = grades.connect(salaries)
-				.flatMap(new WindowJoinTask());
+		// Apply a temporal join over the two stream based on the names in one
+		// second windows
+		DataStream<Tuple2<Tuple2<String, Integer>, Tuple2<String, Integer>>> joinedStream = grades
+				.windowJoin(salaries, 1000, 1000, 0, 0);
 
-		System.out.println("(NAME, GRADE, SALARY)");
+		// Print the results
 		joinedStream.print();
 
 		env.execute();

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e7ce9567/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/window/join/WindowJoinTask.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/window/join/WindowJoinTask.java b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/window/join/WindowJoinTask.java
deleted file mode 100644
index 524ffbd..0000000
--- a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/window/join/WindowJoinTask.java
+++ /dev/null
@@ -1,126 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.examples.window.join;
-
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.LinkedList;
-
-import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.streaming.api.function.co.RichCoFlatMapFunction;
-import org.apache.flink.util.Collector;
-
-public class WindowJoinTask extends
-		RichCoFlatMapFunction<Tuple3<String, Integer, Long>, Tuple3<String, Integer, Long>, Tuple3<String, Integer, Integer>> {
-
-	class SalaryProgress {
-		public SalaryProgress(Integer salary, Long progress) {
-			this.salary = salary;
-			this.progress = progress;
-		}
-
-		Integer salary;
-		Long progress;
-	}
-
-	class GradeProgress {
-		public GradeProgress(Integer grade, Long progress) {
-			this.grade = grade;
-			this.progress = progress;
-		}
-
-		Integer grade;
-		Long progress;
-	}
-
-	private static final long serialVersionUID = 749913336259789039L;
-	private int windowSize = 100;
-	private HashMap<String, LinkedList<GradeProgress>> gradeHashmap;
-	private HashMap<String, LinkedList<SalaryProgress>> salaryHashmap;
-	private String name;
-	private Long progress;
-	
-	public WindowJoinTask() {
-		gradeHashmap = new HashMap<String, LinkedList<GradeProgress>>();
-		salaryHashmap = new HashMap<String, LinkedList<SalaryProgress>>();
-		name = new String();
-		progress = 0L;
-	}
-
-	Tuple3<String, Integer, Integer> outputTuple = new Tuple3<String, Integer, Integer>();
-	
-	// Joins the input value (grade) with the already known values (salaries) on
-	// a given interval.
-	// Also stores the new element.
-	@Override
-	public void flatMap1(Tuple3<String, Integer, Long> value,
-			Collector<Tuple3<String, Integer, Integer>> out) throws Exception {
-		name = value.f0;
-		progress = value.f2;
-
-		outputTuple.f0 = name;
-		outputTuple.f1 = value.f1;
-		
-		if (salaryHashmap.containsKey(name)) {
-			Iterator<SalaryProgress> iterator = salaryHashmap.get(name).iterator();
-			while (iterator.hasNext()) {
-				SalaryProgress entry = iterator.next();
-				if (progress - entry.progress > windowSize) {
-					iterator.remove(); 
-				} else {
-					outputTuple.f2 = entry.salary;
-					out.collect(outputTuple);
-				}
-			}
-		}
-		if (!gradeHashmap.containsKey(name)) {
-			gradeHashmap.put(name, new LinkedList<GradeProgress>());
-		}
-		gradeHashmap.get(name).add(new GradeProgress(value.f1, progress));
-	}
-
-	// Joins the input value (salary) with the already known values (grades) on
-	// a given interval.
-	// Also stores the new element.
-	@Override
-	public void flatMap2(Tuple3<String, Integer, Long> value,
-			Collector<Tuple3<String, Integer, Integer>> out) throws Exception {
-		name = value.f0;
-		progress = value.f2;
-
-		outputTuple.f0 = name;
-		outputTuple.f2 = value.f1;
-		
-		if (gradeHashmap.containsKey(name)) {
-			Iterator<GradeProgress> iterator = gradeHashmap.get(name).iterator();
-			while (iterator.hasNext()) {
-				GradeProgress entry = iterator.next();
-				if (progress - entry.progress > windowSize) {
-					iterator.remove();
-				} else {
-					outputTuple.f1 = entry.grade;
-					out.collect(outputTuple);
-				}
-			}
-		}
-		if (!salaryHashmap.containsKey(name)) {
-			salaryHashmap.put(name, new LinkedList<SalaryProgress>());
-		}
-		salaryHashmap.get(name).add(new SalaryProgress(value.f1, progress));
-	}
-}


[2/5] incubator-flink git commit: [streaming] StreamExecutionEnvironment rework + user class loader fix for cluster deployment

Posted by mb...@apache.org.
[streaming] StreamExecutionEnvironment rework + user class loader fix for cluster deployment


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/6867f9b9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/6867f9b9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/6867f9b9

Branch: refs/heads/master
Commit: 6867f9b93ec1ad9a627450c4fbd0b5ff98ef6148
Parents: c6dd9b1
Author: Gyula Fora <gy...@apache.org>
Authored: Wed Nov 12 01:11:36 2014 +0100
Committer: mbalassi <mb...@apache.org>
Committed: Thu Nov 13 15:24:04 2014 +0100

----------------------------------------------------------------------
 .../flink/streaming/api/JobGraphBuilder.java    | 18 -----
 .../flink/streaming/api/StreamConfig.java       | 73 +++++++++++--------
 .../api/environment/LocalStreamEnvironment.java | 17 ++---
 .../environment/RemoteStreamEnvironment.java    | 40 ++++++-----
 .../environment/StreamContextEnvironment.java   | 75 ++++++++++++++++++++
 .../environment/StreamExecutionEnvironment.java | 69 +++++-------------
 .../api/streamvertex/CoStreamVertex.java        |  9 ++-
 .../api/streamvertex/InputHandler.java          |  7 +-
 .../api/streamvertex/OutputHandler.java         |  8 ++-
 .../api/streamvertex/StreamVertex.java          |  9 ++-
 .../flink/streaming/util/ClusterUtil.java       | 40 +----------
 .../api/streamvertex/StreamVertexTest.java      |  5 --
 .../examples/iteration/IterateExample.java      |  4 +-
 .../client/program/ContextEnvironment.java      |  4 ++
 .../flink/client/program/JobWithJars.java       |  2 +-
 15 files changed, 193 insertions(+), 187 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/6867f9b9/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/JobGraphBuilder.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/JobGraphBuilder.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/JobGraphBuilder.java
index 5a8fd22..df59be1 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/JobGraphBuilder.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/JobGraphBuilder.java
@@ -80,8 +80,6 @@ public class JobGraphBuilder {
 	private Map<String, Long> iterationWaitTime;
 	private Map<String, Map<String, OperatorState<?>>> operatorStates;
 
-	private int degreeOfParallelism;
-	private int executionParallelism;
 
 	/**
 	 * Creates an new {@link JobGraph} with the given name. A JobGraph is a DAG
@@ -120,22 +118,6 @@ public class JobGraphBuilder {
 		}
 	}
 
-	public int getDefaultParallelism() {
-		return degreeOfParallelism;
-	}
-
-	public void setDefaultParallelism(int defaultParallelism) {
-		this.degreeOfParallelism = defaultParallelism;
-	}
-
-	public int getExecutionParallelism() {
-		return executionParallelism;
-	}
-
-	public void setExecutionParallelism(int executionParallelism) {
-		this.executionParallelism = executionParallelism;
-	}
-
 	/**
 	 * Adds a vertex to the streaming JobGraph with the given parameters
 	 * 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/6867f9b9/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamConfig.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamConfig.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamConfig.java
index 7da6265..3dba376 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamConfig.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamConfig.java
@@ -34,6 +34,7 @@ import org.apache.flink.streaming.partitioner.ShufflePartitioner;
 import org.apache.flink.streaming.partitioner.StreamPartitioner;
 import org.apache.flink.streaming.state.OperatorState;
 import org.apache.flink.streaming.util.serialization.TypeWrapper;
+import org.apache.flink.util.InstantiationUtil;
 
 public class StreamConfig {
 	private static final String INPUT_TYPE = "inputType_";
@@ -98,20 +99,20 @@ public class StreamConfig {
 		setTypeWrapper(TYPE_WRAPPER_OUT_2, typeWrapper);
 	}
 
-	public <T> TypeInformation<T> getTypeInfoIn1() {
-		return getTypeInfo(TYPE_WRAPPER_IN_1);
+	public <T> TypeInformation<T> getTypeInfoIn1(ClassLoader cl) {
+		return getTypeInfo(TYPE_WRAPPER_IN_1, cl);
 	}
 
-	public <T> TypeInformation<T> getTypeInfoIn2() {
-		return getTypeInfo(TYPE_WRAPPER_IN_2);
+	public <T> TypeInformation<T> getTypeInfoIn2(ClassLoader cl) {
+		return getTypeInfo(TYPE_WRAPPER_IN_2, cl);
 	}
 
-	public <T> TypeInformation<T> getTypeInfoOut1() {
-		return getTypeInfo(TYPE_WRAPPER_OUT_1);
+	public <T> TypeInformation<T> getTypeInfoOut1(ClassLoader cl) {
+		return getTypeInfo(TYPE_WRAPPER_OUT_1, cl);
 	}
 
-	public <T> TypeInformation<T> getTypeInfoOut2() {
-		return getTypeInfo(TYPE_WRAPPER_OUT_2);
+	public <T> TypeInformation<T> getTypeInfoOut2(ClassLoader cl) {
+		return getTypeInfo(TYPE_WRAPPER_OUT_2, cl);
 	}
 
 	private void setTypeWrapper(String key, TypeWrapper<?> typeWrapper) {
@@ -119,18 +120,17 @@ public class StreamConfig {
 	}
 
 	@SuppressWarnings("unchecked")
-	private <T> TypeInformation<T> getTypeInfo(String key) {
-		byte[] serializedWrapper = config.getBytes(key, null);
+	private <T> TypeInformation<T> getTypeInfo(String key, ClassLoader cl) {
 
-		if (serializedWrapper == null) {
-			throw new RuntimeException("TypeSerializationWrapper must be set");
+		TypeWrapper<T> typeWrapper;
+		try {
+			typeWrapper = (TypeWrapper<T>) InstantiationUtil.readObjectFromConfig(this.config, key,
+					cl);
+		} catch (Exception e) {
+			throw new RuntimeException("Cannot load typeinfo");
 		}
-
-		TypeWrapper<T> typeWrapper = (TypeWrapper<T>) SerializationUtils
-				.deserialize(serializedWrapper);
 		if (typeWrapper != null) {
 			return typeWrapper.getTypeInfo();
-
 		} else {
 			return null;
 		}
@@ -166,9 +166,10 @@ public class StreamConfig {
 		}
 	}
 
-	public <T> T getUserInvokable() {
+	@SuppressWarnings({ "unchecked" })
+	public <T> T getUserInvokable(ClassLoader cl) {
 		try {
-			return SerializationUtils.deserialize(config.getBytes(SERIALIZEDUDF, null));
+			return (T) InstantiationUtil.readObjectFromConfig(this.config, SERIALIZEDUDF, cl);
 		} catch (Exception e) {
 			throw new StreamVertexException("Cannot instantiate user function", e);
 		}
@@ -189,10 +190,10 @@ public class StreamConfig {
 		}
 	}
 
-	public Object getFunction() {
+	public Object getFunction(ClassLoader cl) {
 		try {
-			return SerializationUtils.deserialize(config.getBytes(FUNCTION, null));
-		} catch (SerializationException e) {
+			return InstantiationUtil.readObjectFromConfig(this.config, FUNCTION, cl);
+		} catch (Exception e) {
 			throw new RuntimeException("Cannot deserialize invokable object", e);
 		}
 	}
@@ -216,9 +217,11 @@ public class StreamConfig {
 		}
 	}
 
-	public <T> OutputSelector<T> getOutputSelector() {
+	@SuppressWarnings("unchecked")
+	public <T> OutputSelector<T> getOutputSelector(ClassLoader cl) {
 		try {
-			return SerializationUtils.deserialize(config.getBytes(OUTPUT_SELECTOR, null));
+			return (OutputSelector<T>) InstantiationUtil.readObjectFromConfig(this.config,
+					OUTPUT_SELECTOR, cl);
 		} catch (Exception e) {
 			throw new StreamVertexException("Cannot deserialize and instantiate OutputSelector", e);
 		}
@@ -254,10 +257,16 @@ public class StreamConfig {
 				SerializationUtils.serialize(partitionerObject));
 	}
 
-	public <T> StreamPartitioner<T> getPartitioner(int outputIndex) throws ClassNotFoundException,
-			IOException {
-		return SerializationUtils.deserialize(config.getBytes(PARTITIONER_OBJECT + outputIndex,
-				SerializationUtils.serialize(new ShufflePartitioner<T>())));
+	public <T> StreamPartitioner<T> getPartitioner(ClassLoader cl, int outputIndex)
+			throws ClassNotFoundException, IOException {
+		@SuppressWarnings("unchecked")
+		StreamPartitioner<T> partitioner = (StreamPartitioner<T>) InstantiationUtil
+				.readObjectFromConfig(this.config, PARTITIONER_OBJECT + outputIndex, cl);
+		if (partitioner != null) {
+			return partitioner;
+		} else {
+			return new ShufflePartitioner<T>();
+		}
 	}
 
 	public void setSelectAll(int outputIndex, Boolean selectAll) {
@@ -323,8 +332,14 @@ public class StreamConfig {
 		config.setBytes(OPERATOR_STATES, SerializationUtils.serialize((Serializable) states));
 	}
 
-	public Map<String, OperatorState<?>> getOperatorStates() {
-		return SerializationUtils.deserialize(config.getBytes(OPERATOR_STATES, null));
+	@SuppressWarnings("unchecked")
+	public Map<String, OperatorState<?>> getOperatorStates(ClassLoader cl) {
+		try {
+			return (Map<String, OperatorState<?>>) InstantiationUtil.readObjectFromConfig(
+					this.config, OPERATOR_STATES, cl);
+		} catch (Exception e) {
+			throw new RuntimeException("Could not load operator state");
+		}
 	}
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/6867f9b9/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/LocalStreamEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/LocalStreamEnvironment.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/LocalStreamEnvironment.java
index 5c0f555..505f0e7 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/LocalStreamEnvironment.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/LocalStreamEnvironment.java
@@ -21,13 +21,15 @@ import org.apache.flink.streaming.util.ClusterUtil;
 
 public class LocalStreamEnvironment extends StreamExecutionEnvironment {
 
+	protected static ClassLoader userClassLoader;
+
 	/**
 	 * Executes the JobGraph of the on a mini cluster of CLusterUtil with a
 	 * default name.
 	 */
 	@Override
 	public void execute() throws Exception {
-		ClusterUtil.runOnMiniCluster(this.jobGraphBuilder.getJobGraph(), getExecutionParallelism());
+		ClusterUtil.runOnMiniCluster(this.jobGraphBuilder.getJobGraph(), getDegreeOfParallelism());
 	}
 
 	/**
@@ -39,19 +41,12 @@ public class LocalStreamEnvironment extends StreamExecutionEnvironment {
 	 */
 	@Override
 	public void execute(String jobName) throws Exception {
-		if (localExecutionIsAllowed()) {
-			ClusterUtil.runOnMiniCluster(this.jobGraphBuilder.getJobGraph(jobName),
-					getExecutionParallelism());
-		} else {
-			ClusterUtil.runOnLocalCluster(this.jobGraphBuilder.getJobGraph(jobName),
-					getExecutionParallelism());
-		}
-
+		ClusterUtil.runOnMiniCluster(this.jobGraphBuilder.getJobGraph(jobName),
+				getDegreeOfParallelism());
 	}
 
 	public void executeTest(long memorySize) throws Exception {
-		ClusterUtil.runOnMiniCluster(this.jobGraphBuilder.getJobGraph(), getExecutionParallelism(),
+		ClusterUtil.runOnMiniCluster(this.jobGraphBuilder.getJobGraph(), getDegreeOfParallelism(),
 				memorySize);
 	}
-
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/6867f9b9/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java
index 864e18d..d833c8e 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java
@@ -20,9 +20,8 @@ package org.apache.flink.streaming.api.environment;
 import java.io.File;
 import java.io.IOException;
 import java.net.InetSocketAddress;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import java.util.ArrayList;
+import java.util.List;
 
 import org.apache.flink.client.program.Client;
 import org.apache.flink.client.program.JobWithJars;
@@ -30,13 +29,15 @@ import org.apache.flink.client.program.ProgramInvocationException;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class RemoteStreamEnvironment extends StreamExecutionEnvironment {
 	private static final Logger LOG = LoggerFactory.getLogger(RemoteStreamEnvironment.class);
 
 	private String host;
 	private int port;
-	private String[] jarFiles;
+	private List<File> jarFiles;
 
 	/**
 	 * Creates a new RemoteStreamEnvironment that points to the master
@@ -65,19 +66,28 @@ public class RemoteStreamEnvironment extends StreamExecutionEnvironment {
 
 		this.host = host;
 		this.port = port;
-		this.jarFiles = jarFiles;
+		this.jarFiles = new ArrayList<File>();
+		for (int i = 0; i < jarFiles.length; i++) {
+			File file = new File(jarFiles[i]);
+			try {
+				JobWithJars.checkJarFile(file);
+			} catch (IOException e) {
+				throw new RuntimeException("Problem with jar file " + file.getAbsolutePath(), e);
+			}
+			this.jarFiles.add(file);
+		}
 	}
 
 	@Override
 	public void execute() {
-		
+
 		JobGraph jobGraph = jobGraphBuilder.getJobGraph();
 		executeRemotely(jobGraph);
 	}
-	
+
 	@Override
 	public void execute(String jobName) {
-		
+
 		JobGraph jobGraph = jobGraphBuilder.getJobGraph(jobName);
 		executeRemotely(jobGraph);
 	}
@@ -85,25 +95,21 @@ public class RemoteStreamEnvironment extends StreamExecutionEnvironment {
 	/**
 	 * Executes the remote job.
 	 * 
-	 * @param jobGraph jobGraph to execute
+	 * @param jobGraph
+	 *            jobGraph to execute
 	 */
 	private void executeRemotely(JobGraph jobGraph) {
 		if (LOG.isInfoEnabled()) {
 			LOG.info("Running remotely at {}:{}", host, port);
 		}
 
-		for (int i = 0; i < jarFiles.length; i++) {
-			File file = new File(jarFiles[i]);
-			try {
-				JobWithJars.checkJarFile(file);
-			} catch (IOException e) {
-				throw new RuntimeException("Problem with jar file " + file.getAbsolutePath(), e);
-			}
+		for (File file : jarFiles) {
 			jobGraph.addJar(new Path(file.getAbsolutePath()));
 		}
 
 		Configuration configuration = jobGraph.getJobConfiguration();
-		Client client = new Client(new InetSocketAddress(host, port), configuration, getClass().getClassLoader());
+		Client client = new Client(new InetSocketAddress(host, port), configuration,
+				JobWithJars.buildUserCodeClassLoader(jarFiles, JobWithJars.class.getClassLoader()));
 
 		try {
 			client.run(jobGraph, true);

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/6867f9b9/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamContextEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamContextEnvironment.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamContextEnvironment.java
new file mode 100644
index 0000000..c157435
--- /dev/null
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamContextEnvironment.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.environment;
+
+import java.io.File;
+import java.util.List;
+
+import org.apache.flink.client.program.Client;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.GlobalConfiguration;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+
+public class StreamContextEnvironment extends StreamExecutionEnvironment {
+
+	protected static ClassLoader userClassLoader;
+	protected List<File> jars;
+	protected Client client;
+
+	protected StreamContextEnvironment(Client client, List<File> jars, int dop) {
+		this.client = client;
+		this.jars = jars;
+		if (dop > 0) {
+			setDegreeOfParallelism(dop);
+		} else {
+			setDegreeOfParallelism(GlobalConfiguration.getInteger(
+					ConfigConstants.DEFAULT_PARALLELIZATION_DEGREE_KEY,
+					ConfigConstants.DEFAULT_PARALLELIZATION_DEGREE));
+		}
+	}
+
+	@Override
+	public void execute() throws Exception {
+		execute(null);
+	}
+
+	@Override
+	public void execute(String jobName) throws Exception {
+
+		JobGraph jobGraph;
+		if (jobName == null) {
+			jobGraph = this.jobGraphBuilder.getJobGraph();
+		} else {
+			jobGraph = this.jobGraphBuilder.getJobGraph(jobName);
+		}
+
+		for (File file : jars) {
+			jobGraph.addJar(new Path(file.getAbsolutePath()));
+		}
+
+		try {
+			client.run(jobGraph, true);
+
+		} catch (Exception e) {
+			throw e;
+		}
+
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/6867f9b9/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
index 0d00db3..600a87a 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
@@ -20,10 +20,13 @@ package org.apache.flink.streaming.api.environment;
 import java.io.File;
 import java.io.Serializable;
 import java.util.Collection;
+import java.util.List;
 
 import org.apache.commons.lang3.SerializationException;
 import org.apache.commons.lang3.SerializationUtils;
 import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.client.program.Client;
+import org.apache.flink.client.program.ContextEnvironment;
 import org.apache.flink.streaming.api.JobGraphBuilder;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.DataStreamSource;
@@ -44,21 +47,10 @@ import org.apache.flink.streaming.util.serialization.TypeWrapper;
  */
 public abstract class StreamExecutionEnvironment {
 
-	/**
-	 * The environment of the context (local by default, cluster if invoked
-	 * through command line)
-	 */
-	private static StreamExecutionEnvironment contextEnvironment;
-
-	/** flag to disable local executor when using the ContextEnvironment */
-	private static boolean allowLocalExecution = true;
-
 	private static int defaultLocalDop = Runtime.getRuntime().availableProcessors();
 
 	private int degreeOfParallelism = 1;
 
-	private int executionParallelism = -1;
-
 	private long bufferTimeout = 100;
 
 	protected JobGraphBuilder jobGraphBuilder;
@@ -74,10 +66,6 @@ public abstract class StreamExecutionEnvironment {
 		jobGraphBuilder = new JobGraphBuilder();
 	}
 
-	public int getExecutionParallelism() {
-		return executionParallelism == -1 ? degreeOfParallelism : executionParallelism;
-	}
-
 	/**
 	 * Gets the degree of parallelism with which operation are executed by
 	 * default. Operations can individually override this value to use a
@@ -143,21 +131,6 @@ public abstract class StreamExecutionEnvironment {
 		return this.bufferTimeout;
 	}
 
-	/**
-	 * Sets the number of hardware contexts (CPU cores / threads) used when
-	 * executed in {@link LocalStreamEnvironment}.
-	 * 
-	 * @param degreeOfParallelism
-	 *            The degree of parallelism in local environment
-	 */
-	public void setExecutionParallelism(int degreeOfParallelism) {
-		if (degreeOfParallelism < 1) {
-			throw new IllegalArgumentException("Degree of parallelism must be at least one.");
-		}
-
-		this.executionParallelism = degreeOfParallelism;
-	}
-
 	// --------------------------------------------------------------------------------------------
 	// Data stream creations
 	// --------------------------------------------------------------------------------------------
@@ -351,8 +324,19 @@ public abstract class StreamExecutionEnvironment {
 	 *         executed.
 	 */
 	public static StreamExecutionEnvironment getExecutionEnvironment() {
-		allowLocalExecution = ExecutionEnvironment.localExecutionIsAllowed();
-		return createLocalEnvironment();
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		if (env instanceof ContextEnvironment) {
+			ContextEnvironment ctx = (ContextEnvironment) env;
+			return createContextEnvironment(ctx.getClient(), ctx.getJars(),
+					ctx.getDegreeOfParallelism());
+		} else {
+			return createLocalEnvironment();
+		}
+	}
+
+	private static StreamExecutionEnvironment createContextEnvironment(Client client,
+			List<File> jars, int dop) {
+		return new StreamContextEnvironment(client, jars, dop);
 	}
 
 	/**
@@ -440,27 +424,6 @@ public abstract class StreamExecutionEnvironment {
 		return rec;
 	}
 
-	// --------------------------------------------------------------------------------------------
-	// Methods to control the context and local environments for execution from
-	// packaged programs
-	// --------------------------------------------------------------------------------------------
-
-	protected static void initializeContextEnvironment(StreamExecutionEnvironment ctx) {
-		contextEnvironment = ctx;
-	}
-
-	protected static boolean isContextEnvironmentSet() {
-		return contextEnvironment != null;
-	}
-
-	protected static void disableLocalExecution() {
-		allowLocalExecution = false;
-	}
-
-	public static boolean localExecutionIsAllowed() {
-		return allowLocalExecution;
-	}
-
 	/**
 	 * Triggers the program execution. The environment will execute all parts of
 	 * the program that have resulted in a "sink" operation. Sink operations are

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/6867f9b9/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/CoStreamVertex.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/CoStreamVertex.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/CoStreamVertex.java
index 5a6519d..2464ff2 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/CoStreamVertex.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/CoStreamVertex.java
@@ -29,8 +29,7 @@ import org.apache.flink.streaming.io.CoReaderIterator;
 import org.apache.flink.streaming.io.CoRecordReader;
 import org.apache.flink.util.MutableObjectIterator;
 
-public class CoStreamVertex<IN1, IN2, OUT> extends
-		StreamVertex<IN1,OUT> {
+public class CoStreamVertex<IN1, IN2, OUT> extends StreamVertex<IN1, OUT> {
 
 	private OutputHandler<OUT> outputHandler;
 
@@ -53,10 +52,10 @@ public class CoStreamVertex<IN1, IN2, OUT> extends
 	}
 
 	private void setDeserializers() {
-		TypeInformation<IN1> inputTypeInfo1 = configuration.getTypeInfoIn1();
+		TypeInformation<IN1> inputTypeInfo1 = configuration.getTypeInfoIn1(userClassLoader);
 		inputDeserializer1 = new StreamRecordSerializer<IN1>(inputTypeInfo1);
 
-		TypeInformation<IN2> inputTypeInfo2 = configuration.getTypeInfoIn2();
+		TypeInformation<IN2> inputTypeInfo2 = configuration.getTypeInfoIn2(userClassLoader);
 		inputDeserializer2 = new StreamRecordSerializer<IN2>(inputTypeInfo2);
 	}
 
@@ -72,7 +71,7 @@ public class CoStreamVertex<IN1, IN2, OUT> extends
 
 	@Override
 	protected void setInvokable() {
-		userInvokable = configuration.getUserInvokable();
+		userInvokable = configuration.getUserInvokable(userClassLoader);
 		userInvokable.initialize(outputHandler.getCollector(), coIter, inputDeserializer1,
 				inputDeserializer2, isMutable);
 	}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/6867f9b9/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/InputHandler.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/InputHandler.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/InputHandler.java
index 17d2ae5..9d65a21 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/InputHandler.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/InputHandler.java
@@ -35,10 +35,10 @@ public class InputHandler<IN> {
 	private MutableObjectIterator<StreamRecord<IN>> inputIter;
 	private MutableReader<IOReadableWritable> inputs;
 
-	private StreamVertex<IN,?> streamVertex;
+	private StreamVertex<IN, ?> streamVertex;
 	private StreamConfig configuration;
 
-	public InputHandler(StreamVertex<IN,?> streamComponent) {
+	public InputHandler(StreamVertex<IN, ?> streamComponent) {
 		this.streamVertex = streamComponent;
 		this.configuration = new StreamConfig(streamComponent.getTaskConfiguration());
 		try {
@@ -75,7 +75,8 @@ public class InputHandler<IN> {
 	}
 
 	private void setDeserializer() {
-		TypeInformation<IN> inTupleTypeInfo = configuration.getTypeInfoIn1();
+		TypeInformation<IN> inTupleTypeInfo = configuration
+				.getTypeInfoIn1(streamVertex.userClassLoader);
 		if (inTupleTypeInfo != null) {
 			inputSerializer = new StreamRecordSerializer<IN>(inTupleTypeInfo);
 		}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/6867f9b9/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/OutputHandler.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/OutputHandler.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/OutputHandler.java
index 8b72195..d8eb146 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/OutputHandler.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/OutputHandler.java
@@ -81,7 +81,8 @@ public class OutputHandler<OUT> {
 
 	private StreamCollector<OUT> setCollector() {
 		if (streamVertex.configuration.getDirectedEmit()) {
-			OutputSelector<OUT> outputSelector = streamVertex.configuration.getOutputSelector();
+			OutputSelector<OUT> outputSelector = streamVertex.configuration
+					.getOutputSelector(streamVertex.userClassLoader);
 
 			collector = new DirectedStreamCollector<OUT>(streamVertex.getInstanceID(),
 					outSerializationDelegate, outputSelector);
@@ -97,7 +98,7 @@ public class OutputHandler<OUT> {
 	}
 
 	void setSerializers() {
-		outTypeInfo = configuration.getTypeInfoOut1();
+		outTypeInfo = configuration.getTypeInfoOut1(streamVertex.userClassLoader);
 		if (outTypeInfo != null) {
 			outSerializer = new StreamRecordSerializer<OUT>(outTypeInfo);
 			outSerializationDelegate = new SerializationDelegate<StreamRecord<OUT>>(outSerializer);
@@ -110,7 +111,8 @@ public class OutputHandler<OUT> {
 		StreamPartitioner<OUT> outputPartitioner = null;
 
 		try {
-			outputPartitioner = configuration.getPartitioner(outputNumber);
+			outputPartitioner = configuration.getPartitioner(streamVertex.userClassLoader,
+					outputNumber);
 
 		} catch (Exception e) {
 			throw new StreamVertexException("Cannot deserialize partitioner for "

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/6867f9b9/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamVertex.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamVertex.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamVertex.java
index a8ec98f..2db0d8b 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamVertex.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamVertex.java
@@ -44,6 +44,8 @@ public class StreamVertex<IN, OUT> extends AbstractInvokable {
 	private StreamingRuntimeContext context;
 	private Map<String, OperatorState<?>> states;
 
+	protected ClassLoader userClassLoader;
+
 	public StreamVertex() {
 		userInvokable = null;
 		numTasks = newVertex();
@@ -63,12 +65,13 @@ public class StreamVertex<IN, OUT> extends AbstractInvokable {
 	}
 
 	protected void initialize() {
+		this.userClassLoader = getUserCodeClassLoader();
 		this.configuration = new StreamConfig(getTaskConfiguration());
 		this.name = configuration.getVertexName();
 		this.isMutable = configuration.getMutability();
 		this.functionName = configuration.getFunctionName();
-		this.function = configuration.getFunction();
-		this.states = configuration.getOperatorStates();
+		this.function = configuration.getFunction(userClassLoader);
+		this.states = configuration.getOperatorStates(userClassLoader);
 		this.context = createRuntimeContext(name, this.states);
 	}
 
@@ -85,7 +88,7 @@ public class StreamVertex<IN, OUT> extends AbstractInvokable {
 	}
 
 	protected void setInvokable() {
-		userInvokable = configuration.getUserInvokable();
+		userInvokable = configuration.getUserInvokable(userClassLoader);
 		userInvokable.initialize(outputHandler.getCollector(), inputHandler.getInputIter(),
 				inputHandler.getInputSerializer(), isMutable);
 	}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/6867f9b9/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/ClusterUtil.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/ClusterUtil.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/ClusterUtil.java
index 278cb5a..ebe383d 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/ClusterUtil.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/ClusterUtil.java
@@ -19,10 +19,8 @@ package org.apache.flink.streaming.util;
 
 import java.net.InetSocketAddress;
 
-import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.client.minicluster.NepheleMiniCluster;
 import org.apache.flink.client.program.Client;
-import org.apache.flink.client.program.ContextEnvironment;
 import org.apache.flink.client.program.ProgramInvocationException;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.jobgraph.JobGraph;
@@ -72,44 +70,12 @@ public class ClusterUtil {
 		} catch (Exception e) {
 			throw e;
 		} finally {
-			try {
-				exec.stop();
-			} catch (Throwable t) {
-			}
+			exec.stop();
 		}
 	}
 
-	public static void runOnLocalCluster(JobGraph jobGraph, int degreeOfPrallelism)
-			throws Exception {
-
-		if (LOG.isInfoEnabled()) {
-			LOG.info("Running on mini cluster");
-		}
-
-		try {
-
-			Client client = ((ContextEnvironment) ExecutionEnvironment.getExecutionEnvironment())
-					.getClient();
-
-			client.run(jobGraph, true);
-		} catch (ProgramInvocationException e) {
-			if (e.getMessage().contains("GraphConversionException")) {
-				throw new Exception(CANNOT_EXECUTE_EMPTY_JOB, e);
-			} else {
-				throw e;
-			}
-		} catch (Exception e) {
-			throw e;
-		} finally {
-			try {
-			} catch (Throwable t) {
-			}
-		}
-	}
-
-	public static void runOnMiniCluster(JobGraph jobGraph, int numberOfTaskTrackers)
-			throws Exception {
-		runOnMiniCluster(jobGraph, numberOfTaskTrackers, -1);
+	public static void runOnMiniCluster(JobGraph jobGraph, int numOfSlots) throws Exception {
+		runOnMiniCluster(jobGraph, numOfSlots, -1);
 	}
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/6867f9b9/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/streamvertex/StreamVertexTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/streamvertex/StreamVertexTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/streamvertex/StreamVertexTest.java
index e01809d..765de9c 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/streamvertex/StreamVertexTest.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/streamvertex/StreamVertexTest.java
@@ -112,11 +112,6 @@ public class StreamVertexTest {
 		}
 
 		try {
-			env.setExecutionParallelism(-10);
-			fail();
-		} catch (IllegalArgumentException e) {
-		}
-		try {
 			env.generateSequence(1, 10).project(2);
 			fail();
 		} catch (RuntimeException e) {

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/6867f9b9/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/iteration/IterateExample.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/iteration/IterateExample.java b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/iteration/IterateExample.java
index 82d81f4..e2094fb 100644
--- a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/iteration/IterateExample.java
+++ b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/iteration/IterateExample.java
@@ -73,13 +73,13 @@ public class IterateExample {
 			input.add(new Tuple2<Double, Integer>(0., 0));
 		}
 
-		StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(2)
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment()
 				.setBufferTimeout(1);
 
 		IterativeDataStream<Tuple2<Double, Integer>> it = env.fromCollection(input).iterate()
 				.setMaxWaitTime(3000);
 		
-		SplitDataStream<Tuple2<Double,Integer>> step = it.map(new Step()).shuffle().setParallelism(2).split(new MySelector());
+		SplitDataStream<Tuple2<Double,Integer>> step = it.map(new Step()).shuffle().split(new MySelector());
 		
 		it.closeWith(step.select("iterate"));
 		

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/6867f9b9/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironment.java b/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironment.java
index 4f91514..89b301a 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironment.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironment.java
@@ -83,4 +83,8 @@ public class ContextEnvironment extends ExecutionEnvironment {
 	public Client getClient() {
 		return this.client;
 	}
+	
+	public List<File> getJars(){
+		return jarFilesToAttach;
+	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/6867f9b9/flink-clients/src/main/java/org/apache/flink/client/program/JobWithJars.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/JobWithJars.java b/flink-clients/src/main/java/org/apache/flink/client/program/JobWithJars.java
index b8151da..b86487f 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/program/JobWithJars.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/program/JobWithJars.java
@@ -102,7 +102,7 @@ public class JobWithJars {
 		// TODO: Check if proper JAR file
 	}
 	
-	static ClassLoader buildUserCodeClassLoader(List<File> jars, ClassLoader parent) {
+	public static ClassLoader buildUserCodeClassLoader(List<File> jars, ClassLoader parent) {
 		
 		URL[] urls = new URL[jars.size()];
 		try {


[5/5] incubator-flink git commit: [FLINK-1204] [streaming] Individual, self-contained packaging for streaming examples

Posted by mb...@apache.org.
[FLINK-1204] [streaming] Individual, self-contained packaging for streaming examples


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/c6dd9b10
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/c6dd9b10
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/c6dd9b10

Branch: refs/heads/master
Commit: c6dd9b104dad2fdf35d3bbca27779060792dc877
Parents: 537d6f6
Author: Marton Balassi <mb...@apache.org>
Authored: Mon Nov 10 14:56:45 2014 +0100
Committer: mbalassi <mb...@apache.org>
Committed: Thu Nov 13 15:24:04 2014 +0100

----------------------------------------------------------------------
 .../flink-streaming-examples/pom.xml            | 210 ++++++++++++++++++-
 1 file changed, 209 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c6dd9b10/flink-addons/flink-streaming/flink-streaming-examples/pom.xml
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-examples/pom.xml b/flink-addons/flink-streaming/flink-streaming-examples/pom.xml
index 43a7430..f1480de 100644
--- a/flink-addons/flink-streaming/flink-streaming-examples/pom.xml
+++ b/flink-addons/flink-streaming/flink-streaming-examples/pom.xml
@@ -56,14 +56,222 @@ under the License.
 
 	<build>
 		<plugins>
+			
+			<!-- get default data from flink-java-examples package -->
+			<plugin>
+         			<groupId>org.apache.maven.plugins</groupId>
+         			<artifactId>maven-dependency-plugin</artifactId>
+         			<version>2.9</version>
+         			<executions>
+           				<execution>
+             					<id>unpack</id>
+             					<phase>package</phase>
+             					<goals>
+               						<goal>unpack</goal>
+             					</goals>
+             					<configuration>
+               						<artifactItems>
+                 						<artifactItem>
+                   							<groupId>org.apache.flink</groupId>
+                   							<artifactId>flink-java-examples</artifactId>
+                   							<version>${project.version}</version>
+                   							<type>jar</type>
+                   							<overWrite>false</overWrite>
+                   							<outputDirectory>${project.build.directory}/classes</outputDirectory>
+                   							<includes>org/apache/flink/examples/java/wordcount/util/WordCountData.class</includes>
+						                </artifactItem>
+               						</artifactItems>
+             					</configuration>
+           				</execution>
+         			</executions>
+       			</plugin>
+
+			<!-- self-contained jars for each example -->
 			<plugin>
 				<groupId>org.apache.maven.plugins</groupId>
 				<artifactId>maven-jar-plugin</artifactId>
+				
 				<executions>
+					<!-- Basic -->
+					<execution>
+						<id>Basic</id>
+						<phase>package</phase>
+						<goals>
+							<goal>jar</goal>
+						</goals>
+						<configuration>
+							<classifier>Basic</classifier>
+
+							<archive>
+								<manifestEntries>
+									<program-class>org.apache.flink.streaming.examples.basictopology.BasicTopology</program-class>
+								</manifestEntries>
+							</archive>
+
+							<includes>
+								<include>org/apache/flink/streaming/examples/basictopology/BasicTopology.class</include>
+								<include>org/apache/flink/examples/java/wordcount/util/WordCountData.class</include>				
+							</includes>
+						</configuration>
+					</execution>
+
+					<!-- CellInfo -->
 					<execution>
+						<id>CellInfo</id>
+						<phase>package</phase>
 						<goals>
-							<goal>test-jar</goal>
+							<goal>jar</goal>
 						</goals>
+						<configuration>
+							<classifier>CellInfo</classifier>
+
+							<archive>
+								<manifestEntries>
+									<program-class>org.apache.flink.streaming.examples.cellinfo.CellInfoLocal</program-class>
+								</manifestEntries>
+							</archive>
+
+							<includes>
+								<include>org/apache/flink/streaming/examples/cellinfo/*.class</include>			
+							</includes>
+						</configuration>
+					</execution>
+
+					<!-- Iteration -->
+					<execution>
+						<id>Iteration</id>
+						<phase>package</phase>
+						<goals>
+							<goal>jar</goal>
+						</goals>
+						<configuration>
+							<classifier>Iteration</classifier>
+
+							<archive>
+								<manifestEntries>
+									<program-class>org.apache.flink.streaming.examples.iteration.IterateExample</program-class>
+								</manifestEntries>
+							</archive>
+
+							<includes>
+								<include>org/apache/flink/streaming/examples/iteration/*.class</include>			
+							</includes>
+						</configuration>
+					</execution>
+
+					<!-- IncrementalLearning -->
+					<execution>
+						<id>IncrementalLearning</id>
+						<phase>package</phase>
+						<goals>
+							<goal>jar</goal>
+						</goals>
+						<configuration>
+							<classifier>IncrementalLearning</classifier>
+
+							<archive>
+								<manifestEntries>
+									<program-class>org.apache.flink.streaming.examples.ml.IncrementalLearningSkeleton</program-class>
+								</manifestEntries>
+							</archive>
+
+							<includes>
+								<include>org/apache/flink/streaming/examples/ml/*.class</include>			
+							</includes>
+						</configuration>
+					</execution>
+
+					<!-- Twitter -->
+					<execution>
+						<id>Twitter</id>
+						<phase>package</phase>
+						<goals>
+							<goal>jar</goal>
+						</goals>
+						<configuration>
+							<classifier>Twitter</classifier>
+
+							<archive>
+								<manifestEntries>
+									<program-class>org.apache.flink.streaming.examples.twitter.TwitterStream</program-class>
+								</manifestEntries>
+							</archive>
+
+							<includes>
+								<include>org/apache/flink/streaming/examples/twitter/*.class</include>
+								<include>org/apache/flink/streaming/examples/twitter/util/*.class</include>						
+							</includes>
+						</configuration>
+					</execution>
+
+					<!-- WindowJoin -->
+					<execution>
+						<id>WindowJoin</id>
+						<phase>package</phase>
+						<goals>
+							<goal>jar</goal>
+						</goals>
+						<configuration>
+							<classifier>WindowJoin</classifier>
+
+							<archive>
+								<manifestEntries>
+									<program-class>org.apache.flink.streaming.examples.window.join.WindowJoinLocal</program-class>
+								</manifestEntries>
+							</archive>
+
+							<includes>
+								<include>org/apache/flink/streaming/examples/window/join/*.class</include>			
+							</includes>
+						</configuration>
+					</execution>
+
+					<!-- WordCountPOJO -->
+					<execution>
+						<id>WordCountPOJO</id>
+						<phase>package</phase>
+						<goals>
+							<goal>jar</goal>
+						</goals>
+						<configuration>
+							<classifier>WordCountPOJO</classifier>
+
+							<archive>
+								<manifestEntries>
+									<program-class>org.apache.flink.streaming.examples.wordcount.PojoWordCount</program-class>
+								</manifestEntries>
+							</archive>
+
+							<includes>
+								<include>org/apache/flink/streaming/examples/wordcount/PojoWordCount.class</include>
+								<include>org/apache/flink/streaming/examples/wordcount/PojoWordCount$*.class</include>
+								<include>org/apache/flink/examples/java/wordcount/util/WordCountData.class</include>			
+							</includes>
+						</configuration>
+					</execution>
+
+					<!-- WordCount -->
+					<execution>
+						<id>WordCount</id>
+						<phase>package</phase>
+						<goals>
+							<goal>jar</goal>
+						</goals>
+						<configuration>
+							<classifier>WordCount</classifier>
+
+							<archive>
+								<manifestEntries>
+									<program-class>org.apache.flink.streaming.examples.wordcount.WordCount</program-class>
+								</manifestEntries>
+							</archive>
+
+							<includes>
+								<include>org/apache/flink/streaming/examples/wordcount/WordCount.class</include>
+								<include>org/apache/flink/streaming/examples/wordcount/WordCount$*.class</include>
+								<include>org/apache/flink/examples/java/wordcount/util/WordCountData.class</include>				
+							</includes>
+						</configuration>
 					</execution>
 				</executions>
 			</plugin>


[4/5] incubator-flink git commit: [streaming] Examples refactor and packaging update

Posted by mb...@apache.org.
[streaming] Examples refactor and packaging update


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/d332d6c3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/d332d6c3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/d332d6c3

Branch: refs/heads/master
Commit: d332d6c310a9be12b61065083218d7e4fdec1ab1
Parents: e7ce956
Author: mbalassi <mb...@apache.org>
Authored: Wed Nov 12 23:23:15 2014 +0100
Committer: mbalassi <mb...@apache.org>
Committed: Thu Nov 13 15:24:04 2014 +0100

----------------------------------------------------------------------
 .../flink-streaming-examples/pom.xml            |  54 +-----
 .../examples/basictopology/BasicTopology.java   |  61 ------
 .../examples/iteration/IterateExample.java      | 106 ++++++++---
 .../ml/IncrementalLearningSkeleton.java         | 132 +++++++++----
 .../examples/twitter/TwitterStream.java         |  98 +++++-----
 .../examples/window/join/GradeSource.java       |  45 -----
 .../examples/window/join/SalarySource.java      |  45 -----
 .../examples/window/join/WindowJoin.java        | 162 ++++++++++++++++
 .../examples/window/join/WindowJoinLocal.java   |  49 -----
 .../examples/wordcount/PojoExample.java         | 185 +++++++++++++++++++
 .../examples/wordcount/PojoWordCount.java       | 169 -----------------
 .../streaming/examples/wordcount/WordCount.java |  13 +-
 12 files changed, 586 insertions(+), 533 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d332d6c3/flink-addons/flink-streaming/flink-streaming-examples/pom.xml
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-examples/pom.xml b/flink-addons/flink-streaming/flink-streaming-examples/pom.xml
index f1480de..20e1500 100644
--- a/flink-addons/flink-streaming/flink-streaming-examples/pom.xml
+++ b/flink-addons/flink-streaming/flink-streaming-examples/pom.xml
@@ -65,7 +65,7 @@ under the License.
          			<executions>
            				<execution>
              					<id>unpack</id>
-             					<phase>package</phase>
+             					<phase>prepare-package</phase>
              					<goals>
                						<goal>unpack</goal>
              					</goals>
@@ -92,50 +92,6 @@ under the License.
 				<artifactId>maven-jar-plugin</artifactId>
 				
 				<executions>
-					<!-- Basic -->
-					<execution>
-						<id>Basic</id>
-						<phase>package</phase>
-						<goals>
-							<goal>jar</goal>
-						</goals>
-						<configuration>
-							<classifier>Basic</classifier>
-
-							<archive>
-								<manifestEntries>
-									<program-class>org.apache.flink.streaming.examples.basictopology.BasicTopology</program-class>
-								</manifestEntries>
-							</archive>
-
-							<includes>
-								<include>org/apache/flink/streaming/examples/basictopology/BasicTopology.class</include>
-								<include>org/apache/flink/examples/java/wordcount/util/WordCountData.class</include>				
-							</includes>
-						</configuration>
-					</execution>
-
-					<!-- CellInfo -->
-					<execution>
-						<id>CellInfo</id>
-						<phase>package</phase>
-						<goals>
-							<goal>jar</goal>
-						</goals>
-						<configuration>
-							<classifier>CellInfo</classifier>
-
-							<archive>
-								<manifestEntries>
-									<program-class>org.apache.flink.streaming.examples.cellinfo.CellInfoLocal</program-class>
-								</manifestEntries>
-							</archive>
-
-							<includes>
-								<include>org/apache/flink/streaming/examples/cellinfo/*.class</include>			
-							</includes>
-						</configuration>
-					</execution>
 
 					<!-- Iteration -->
 					<execution>
@@ -216,7 +172,7 @@ under the License.
 
 							<archive>
 								<manifestEntries>
-									<program-class>org.apache.flink.streaming.examples.window.join.WindowJoinLocal</program-class>
+									<program-class>org.apache.flink.streaming.examples.window.join.WindowJoin</program-class>
 								</manifestEntries>
 							</archive>
 
@@ -238,13 +194,13 @@ under the License.
 
 							<archive>
 								<manifestEntries>
-									<program-class>org.apache.flink.streaming.examples.wordcount.PojoWordCount</program-class>
+									<program-class>org.apache.flink.streaming.examples.wordcount.PojoExample</program-class>
 								</manifestEntries>
 							</archive>
 
 							<includes>
-								<include>org/apache/flink/streaming/examples/wordcount/PojoWordCount.class</include>
-								<include>org/apache/flink/streaming/examples/wordcount/PojoWordCount$*.class</include>
+								<include>org/apache/flink/streaming/examples/wordcount/PojoExample.class</include>
+								<include>org/apache/flink/streaming/examples/wordcount/PojoExample$*.class</include>
 								<include>org/apache/flink/examples/java/wordcount/util/WordCountData.class</include>			
 							</includes>
 						</configuration>

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d332d6c3/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/basictopology/BasicTopology.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/basictopology/BasicTopology.java b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/basictopology/BasicTopology.java
deleted file mode 100755
index 9379ff2..0000000
--- a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/basictopology/BasicTopology.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.examples.basictopology;
-
-import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.examples.java.wordcount.util.WordCountData;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-
-/**
- * 
- * Very basic Flink streaming topology for local testing.
- *
- */
-public class BasicTopology {
-
-	private static final int PARALLELISM = 1;
-
-	public static void main(String[] args) throws Exception {
-
-		// We create a new Local environment. The program will be executed using
-		// a new mini-cluster that is set up at execution.
-		StreamExecutionEnvironment env = StreamExecutionEnvironment
-				.createLocalEnvironment(PARALLELISM);
-
-		// We create a new data stream from a collection of words then apply a
-		// simple map.
-		DataStream<String> stream = env.fromElements(WordCountData.WORDS).map(new IdentityMap());
-
-		// Print the results
-		stream.print();
-
-		env.execute();
-	}
-
-	public static class IdentityMap implements MapFunction<String, String> {
-		private static final long serialVersionUID = 1L;
-
-		// map to the same value
-		@Override
-		public String map(String value) throws Exception {
-			return value;
-		}
-
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d332d6c3/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/iteration/IterateExample.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/iteration/IterateExample.java b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/iteration/IterateExample.java
index a011837..f574718 100644
--- a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/iteration/IterateExample.java
+++ b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/iteration/IterateExample.java
@@ -23,70 +23,97 @@ import java.util.List;
 import java.util.Random;
 
 import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.tuple.Tuple1;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.streaming.api.collector.OutputSelector;
+import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.IterativeDataStream;
 import org.apache.flink.streaming.api.datastream.SplitDataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 
 /**
- * Example illustrating iterations in Flink streaming programs. The program will
- * sum up random numbers and counts how many additions it needs to reach a
- * specific threshold in an iterative streaming fashion.
- *
+ * Example illustrating iterations in Flink streaming.
+ * 
+ * <p>
+ * The program sums up random numbers and counts additions it performs to reach
+ * a specific threshold in an iterative streaming fashion.
+ * </p>
+ * 
+ * <p>
+ * This example shows how to use:
+ * <ul>
+ * <li>streaming iterations,
+ * <li>buffer timeout to enhance latency,
+ * <li>directed outputs.
+ * </ul>
  */
 public class IterateExample {
 
+	// *************************************************************************
+	// PROGRAM
+	// *************************************************************************
+
 	public static void main(String[] args) throws Exception {
 
-		// Set up our input for the stream of (0,0)s
+		if (!parseParameters(args)) {
+			return;
+		}
+
+		// set up input for the stream of (0,0) pairs
 		List<Tuple2<Double, Integer>> input = new ArrayList<Tuple2<Double, Integer>>();
 		for (int i = 0; i < 1000; i++) {
 			input.add(new Tuple2<Double, Integer>(0., 0));
 		}
 
-		// Obtain execution environment and use setBufferTimeout(0) to enable
-		// continuous flushing of the output buffers (lowest latency).
+		// obtain execution environment and set setBufferTimeout(0) to enable
+		// continuous flushing of the output buffers (lowest latency)
 		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment()
 				.setBufferTimeout(0);
 
-		// Create an iterative datastream from the input.
+		// create an iterative data stream from the input
 		IterativeDataStream<Tuple2<Double, Integer>> it = env.fromCollection(input).shuffle()
 				.iterate();
 
-		// We make sure that the iteration terminates if no new data received in
-		// the stream for 5 seconds
+		// trigger iteration termination if no new data received for 5 seconds
 		it.setMaxWaitTime(5000);
 
-		// We apply the stepfunction to add a new random value to the tuple and
-		// increment the counter, then we split the output with our
-		// outputselector.
+		// apply the step function to add new random value to the tuple and to
+		// increment the counter and split the output with the output selector
 		SplitDataStream<Tuple2<Double, Integer>> step = it.map(new Step()).shuffle()
 				.split(new MySelector());
 
-		// We close the iteration be selecting the tuples that was directed to
-		// 'iterate' in the outputselector
+		// close the iteration by selecting the tuples that were directed to the
+		// 'iterate' channel in the output selector
 		it.closeWith(step.select("iterate"));
 
-		// To produce the final output we select he tuples directed to 'output'
-		// than project it to the second field
-		step.select("output").project(1).types(Integer.class).print();
+		// to produce the final output select the tuples directed to the
+		// 'output' channel then project it to the desired second field
+
+		DataStream<Tuple1<Integer>> numbers = step.select("output").project(1).types(Integer.class);
 
-		// Execute the streaming program
-		env.execute();
+		// emit result
+		if (fileOutput) {
+			numbers.writeAsText(outputPath, 1);
+		} else {
+			numbers.print();
+		}
+
+		// execute the program
+		env.execute("Streaming Iteration Example");
 	}
 
+	// *************************************************************************
+	// USER FUNCTIONS
+	// *************************************************************************
+
 	/**
 	 * Iteration step function which takes an input (Double , Integer) and
-	 * produces an output (Double+random, Integer +1)
-	 *
+	 * produces an output (Double + random, Integer + 1).
 	 */
 	public static class Step implements
 			MapFunction<Tuple2<Double, Integer>, Tuple2<Double, Integer>> {
-
 		private static final long serialVersionUID = 1L;
-
-		Random rnd;
+		private Random rnd;
 
 		public Step() {
 			rnd = new Random();
@@ -101,10 +128,9 @@ public class IterateExample {
 	}
 
 	/**
-	 * s OutputSelector test which tuple needed to be iterated again.
+	 * OutputSelector testing which tuple needs to be iterated again.
 	 */
 	public static class MySelector extends OutputSelector<Tuple2<Double, Integer>> {
-
 		private static final long serialVersionUID = 1L;
 
 		@Override
@@ -117,4 +143,30 @@ public class IterateExample {
 		}
 
 	}
+
+	// *************************************************************************
+	// UTIL METHODS
+	// *************************************************************************
+
+	private static boolean fileOutput = false;
+	private static String outputPath;
+
+	private static boolean parseParameters(String[] args) {
+
+		if (args.length > 0) {
+			// parse input arguments
+			fileOutput = true;
+			if (args.length == 1) {
+				outputPath = args[0];
+			} else {
+				System.err.println("Usage: IterateExample <result path>");
+				return false;
+			}
+		} else {
+			System.out.println("Executing IterateExample with generated data.");
+			System.out.println("  Provide parameter to write to file.");
+			System.out.println("  Usage: IterateExample <result path>");
+		}
+		return true;
+	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d332d6c3/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/ml/IncrementalLearningSkeleton.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/ml/IncrementalLearningSkeleton.java b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/ml/IncrementalLearningSkeleton.java
index d2f9e6e..2def142 100755
--- a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/ml/IncrementalLearningSkeleton.java
+++ b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/ml/IncrementalLearningSkeleton.java
@@ -24,11 +24,69 @@ import org.apache.flink.streaming.api.function.co.CoMapFunction;
 import org.apache.flink.streaming.api.function.source.SourceFunction;
 import org.apache.flink.util.Collector;
 
+/**
+ * Skeleton for incremental machine learning algorithm consisting of a
+ * pre-computed model, which gets updated for the new inputs and new input data
+ * for which the job provides predictions.
+ * 
+ * <p>
+ * This may serve as a base of a number of algorithms, e.g. updating an
+ * incremental Alternating Least Squares model while also providing the
+ * predictions.
+ * </p>
+ * 
+ * <p>
+ * This example shows how to use:
+ * <ul>
+ * <li>Connected streams
+ * <li>CoFunctions
+ * <li>Tuple data types
+ * </ul>
+ */
 public class IncrementalLearningSkeleton {
 
-	// Source for feeding new data for prediction
+	// *************************************************************************
+	// PROGRAM
+	// *************************************************************************
+
+	public static void main(String[] args) throws Exception {
+
+		if (!parseParameters(args)) {
+			return;
+		}
+
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+		// build new model on every second of new data
+		DataStream<Double[]> model = env.addSource(new TrainingDataSource()).window(5000)
+				.reduceGroup(new PartialModelBuilder());
+
+		// use partial model for prediction
+		DataStream<Integer> prediction = env.addSource(new NewDataSource()).connect(model)
+				.map(new Predictor());
+
+		// emit result
+		if (fileOutput) {
+			prediction.writeAsText(outputPath, 1);
+		} else {
+			prediction.print();
+		}
+
+		// execute program
+		env.execute("Streaming Incremental Learning");
+	}
+
+	// *************************************************************************
+	// USER FUNCTIONS
+	// *************************************************************************
+
+	/**
+	 * Feeds new data for prediction. By default it is implemented as constantly
+	 * emitting the Integer 1 in a loop.
+	 */
 	public static class NewDataSource implements SourceFunction<Integer> {
 		private static final long serialVersionUID = 1L;
+		private static final int NEW_DATA_SLEEP_TIME = 1000;
 
 		@Override
 		public void invoke(Collector<Integer> collector) throws Exception {
@@ -37,39 +95,41 @@ public class IncrementalLearningSkeleton {
 			}
 		}
 
-		// Method for pulling new data for prediction
 		private Integer getNewData() throws InterruptedException {
-			Thread.sleep(100);
+			Thread.sleep(NEW_DATA_SLEEP_TIME);
 			return 1;
 		}
 	}
 
-	// Source for feeding new training data for partial model building
+	/**
+	 * Feeds new training data for the partial model builder. By default it is
+	 * implemented as constantly emitting the Integer 1 in a loop.
+	 */
 	public static class TrainingDataSource implements SourceFunction<Integer> {
 		private static final long serialVersionUID = 1L;
+		private static final int TRAINING_DATA_SLEEP_TIME = 10;
 
 		@Override
 		public void invoke(Collector<Integer> collector) throws Exception {
-
 			while (true) {
 				collector.collect(getTrainingData());
 			}
 
 		}
 
-		// Method for pulling new training data
 		private Integer getTrainingData() throws InterruptedException {
-			Thread.sleep(100);
+			Thread.sleep(TRAINING_DATA_SLEEP_TIME);
 			return 1;
 
 		}
 	}
 
-	// Task for building up-to-date partial models on new training data
+	/**
+	 * Builds up-to-date partial models on new training data.
+	 */
 	public static class PartialModelBuilder implements GroupReduceFunction<Integer, Double[]> {
 		private static final long serialVersionUID = 1L;
 
-		// Method for building partial model on the grouped training data
 		protected Double[] buildPartialModel(Iterable<Integer> values) {
 			return new Double[] { 1. };
 		}
@@ -80,8 +140,15 @@ public class IncrementalLearningSkeleton {
 		}
 	}
 
-	// Task for performing prediction using the model produced in
-	// batch-processing and the up-to-date partial model
+	/**
+	 * Creates prediction using the model produced in batch-processing and the
+	 * up-to-date partial model.
+	 * 
+	 * <p>
+	 * By defaults emits the Integer 0 for every prediction and the Integer 1
+	 * for every model update.
+	 * </p>
+	 */
 	public static class Predictor implements CoMapFunction<Integer, Double[], Integer> {
 		private static final long serialVersionUID = 1L;
 
@@ -102,38 +169,41 @@ public class IncrementalLearningSkeleton {
 			return 1;
 		}
 
-		// Pulls model built with batch-job on the old training data
+		// pulls model built with batch-job on the old training data
 		protected Double[] getBatchModel() {
 			return new Double[] { 0. };
 		}
 
-		// Performs prediction using the two models
+		// performs prediction using the two models
 		protected Integer predict(Integer inTuple) {
 			return 0;
 		}
 
 	}
 
-	private static int SOURCE_PARALLELISM = 1;
-
-	public static void main(String[] args) throws Exception {
-
-		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-
-		IncrementalLearningSkeleton.SOURCE_PARALLELISM = env.getDegreeOfParallelism();
-
-		// Build new model on every second of new data
-		DataStream<Double[]> model = env.addSource(new TrainingDataSource(), SOURCE_PARALLELISM)
-				.window(5000).reduceGroup(new PartialModelBuilder());
+	// *************************************************************************
+	// UTIL METHODS
+	// *************************************************************************
 
-		// Use partial model for prediction
-		DataStream<Integer> prediction = env.addSource(new NewDataSource(), SOURCE_PARALLELISM)
-				.connect(model).map(new Predictor());
+	private static boolean fileOutput = false;
+	private static String outputPath;
 
-		// We pring the output
-		prediction.print();
+	private static boolean parseParameters(String[] args) {
 
-		env.execute();
+		if (args.length > 0) {
+			// parse input arguments
+			fileOutput = true;
+			if (args.length == 1) {
+				outputPath = args[0];
+			} else {
+				System.err.println("Usage: IncrementalLearningSkeleton <result path>");
+				return false;
+			}
+		} else {
+			System.out.println("Executing IncrementalLearningSkeleton with generated data.");
+			System.out.println("  Provide parameter to write to file.");
+			System.out.println("  Usage: IncrementalLearningSkeleton <result path>");
+		}
+		return true;
 	}
-
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d332d6c3/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/twitter/TwitterStream.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/twitter/TwitterStream.java b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/twitter/TwitterStream.java
index ef1d84b..08aa5cb 100644
--- a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/twitter/TwitterStream.java
+++ b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/twitter/TwitterStream.java
@@ -25,14 +25,13 @@ import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.connectors.json.JSONParseFlatMap;
-import org.apache.flink.streaming.connectors.twitter.TwitterSource;
 import org.apache.flink.streaming.examples.twitter.util.TwitterStreamData;
 import org.apache.flink.util.Collector;
 import org.apache.sling.commons.json.JSONException;
 
 /**
- * Implements the "TwitterStream" program that computes a most used word occurrence
- * over JSON files in a streaming fashion.
+ * Implements the "TwitterStream" program that computes a most used word
+ * occurrence over JSON files in a streaming fashion.
  * 
  * <p>
  * The input is a JSON text file with lines separated by newline characters.
@@ -45,91 +44,89 @@ import org.apache.sling.commons.json.JSONException;
  * <p>
  * This example shows how to:
  * <ul>
- * <li>write a simple Flink Streaming program.
- * <li>use Tuple data types.
- * <li>write and use user-defined functions.
+ * <li>acquire external data,
+ * <li>use in-line defined functions,
+ * <li>handle flattened stream inputs.
  * </ul>
  * 
  */
 public class TwitterStream {
-	
+
 	// *************************************************************************
 	// PROGRAM
 	// *************************************************************************
-	
+
 	public static void main(String[] args) throws Exception {
 		if (!parseParameters(args)) {
 			return;
 		}
 
 		// set up the execution environment
-		StreamExecutionEnvironment env = StreamExecutionEnvironment
-				.getExecutionEnvironment();
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 
 		env.setBufferTimeout(1000);
-		
+
 		// get input data
 		DataStream<String> streamSource = getTextDataStream(env);
 
-		DataStream<Tuple2<String, Integer>> dataStream = streamSource
-				// selecting english tweets and split to words
-				.flatMap(new SelectEnglishAndTokenizeFlatMap())
-				.partitionBy(0)
+		DataStream<Tuple2<String, Integer>> tweets = streamSource
+		// selecting English tweets and splitting to words
+				.flatMap(new SelectEnglishAndTokenizeFlatMap()).partitionBy(0)
 				// returning (word, 1)
 				.map(new MapFunction<String, Tuple2<String, Integer>>() {
 					private static final long serialVersionUID = 1L;
 
 					@Override
-					public Tuple2<String, Integer> map(String value)
-							throws Exception {
+					public Tuple2<String, Integer> map(String value) throws Exception {
 						return new Tuple2<String, Integer>(value, 1);
 					}
 				})
 				// group by words and sum their occurence
-				.groupBy(0)
-				.sum(1)
-				// select maximum occurenced word
+				.groupBy(0).sum(1)
+				// select word with maximum occurence
 				.flatMap(new SelectMaxOccurence());
 
 		// emit result
-		dataStream.print();
+		if (fileOutput) {
+			tweets.writeAsText(outputPath, 1);
+		} else {
+			tweets.print();
+		}
 
 		// execute program
-		env.execute();
+		env.execute("Twitter Streaming Example");
 	}
-	
+
 	// *************************************************************************
 	// USER FUNCTIONS
 	// *************************************************************************
 
 	/**
-	 * Make sentence from english tweets.
+	 * Makes sentences from English tweets.
 	 * 
-	 * Implements the string tokenizer that splits sentences into words as a
+	 * <p>
+	 * Implements a string tokenizer that splits sentences into words as a
 	 * user-defined FlatMapFunction. The function takes a line (String) and
 	 * splits it into multiple pairs in the form of "(word,1)" (Tuple2<String,
 	 * Integer>).
+	 * </p>
 	 */
-	public static class SelectEnglishAndTokenizeFlatMap extends
-			JSONParseFlatMap<String, String> {
+	public static class SelectEnglishAndTokenizeFlatMap extends JSONParseFlatMap<String, String> {
 		private static final long serialVersionUID = 1L;
 
 		/**
 		 * Select the language from the incoming JSON text
 		 */
 		@Override
-		public void flatMap(String value, Collector<String> out)
-				throws Exception {
+		public void flatMap(String value, Collector<String> out) throws Exception {
 			try {
 				if (getString(value, "lang").equals("en")) {
 					// message of tweet
-					StringTokenizer tokenizer = new StringTokenizer(getString(
-							value, "text"));
+					StringTokenizer tokenizer = new StringTokenizer(getString(value, "text"));
 
 					// split the message
 					while (tokenizer.hasMoreTokens()) {
-						String result = tokenizer.nextToken().replaceAll(
-								"\\s*", "");
+						String result = tokenizer.nextToken().replaceAll("\\s*", "");
 
 						if (result != null && !result.equals("")) {
 							out.collect(result);
@@ -143,10 +140,9 @@ public class TwitterStream {
 	}
 
 	/**
-	 * 
-	 * Implements a user-defined FlatMapFunction that check if the word's current occurence
-	 * is higher than the maximum occurence. If it is, return with the word and change the maximum.
-	 *
+	 * Implements a user-defined FlatMapFunction that checks if the current
+	 * occurence is higher than the maximum occurence. If so, returns the word
+	 * and changes the maximum.
 	 */
 	public static class SelectMaxOccurence implements
 			FlatMapFunction<Tuple2<String, Integer>, Tuple2<String, Integer>> {
@@ -158,29 +154,32 @@ public class TwitterStream {
 		}
 
 		@Override
-		public void flatMap(Tuple2<String, Integer> value,
-				Collector<Tuple2<String, Integer>> out) throws Exception {
+		public void flatMap(Tuple2<String, Integer> value, Collector<Tuple2<String, Integer>> out)
+				throws Exception {
 			if ((Integer) value.getField(1) >= maximum) {
 				out.collect(value);
 				maximum = (Integer) value.getField(1);
 			}
 		}
 	}
-	
+
 	// *************************************************************************
 	// UTIL METHODS
 	// *************************************************************************
 
-	private static boolean fromFile = false;
-	private static String path;
+	private static boolean fileOutput = false;
+	private static String textPath;
+	private static String outputPath;
 
 	private static boolean parseParameters(String[] args) {
 		if (args.length > 0) {
-			if (args.length == 1) {
-				fromFile = true;
-				path = args[0];
+			// parse input arguments
+			fileOutput = true;
+			if (args.length == 2) {
+				textPath = args[0];
+				outputPath = args[1];
 			} else {
-				System.err.println("USAGE:\nTwitterStream <pathToPropertiesFile>");
+				System.err.println("USAGE:\nTwitterStream <pathToPropertiesFile> <result path>");
 				return false;
 			}
 		} else {
@@ -191,11 +190,10 @@ public class TwitterStream {
 		return true;
 	}
 
-	private static DataStream<String> getTextDataStream(
-			StreamExecutionEnvironment env) {
-		if (fromFile) {
+	private static DataStream<String> getTextDataStream(StreamExecutionEnvironment env) {
+		if (fileOutput) {
 			// read the text file from given input path
-			return env.addSource(new TwitterSource(path));
+			return env.readTextFile(textPath);
 		} else {
 			// get default test text data
 			return env.fromElements(TwitterStreamData.TEXTS);

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d332d6c3/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/window/join/GradeSource.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/window/join/GradeSource.java b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/window/join/GradeSource.java
deleted file mode 100644
index d877f91..0000000
--- a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/window/join/GradeSource.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.examples.window.join;
-
-import java.util.Random;
-
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.streaming.api.function.source.SourceFunction;
-import org.apache.flink.util.Collector;
-
-public class GradeSource implements SourceFunction<Tuple2<String, Integer>> {
-
-	private static final long serialVersionUID = -5897483980082089771L;
-
-	private String[] names = { "tom", "jerry", "alice", "bob", "john", "grace", "sasa", "lawrance",
-			"andrew", "jean", "richard", "smith", "gorge", "black", "peter" ,"mark", "eric"};
-	private Random rand = new Random();
-	private Tuple2<String, Integer> outTuple = new Tuple2<String, Integer>();
-
-	@Override
-	public void invoke(Collector<Tuple2<String, Integer>> out) throws Exception {
-		// Continuously emit tuples with random names and integers (grades).
-		while (true) {
-			outTuple.f0 = names[rand.nextInt(names.length)];
-			outTuple.f1 = rand.nextInt(5) + 1;
-			out.collect(outTuple);
-			Thread.sleep(rand.nextInt(10) + 1);
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d332d6c3/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/window/join/SalarySource.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/window/join/SalarySource.java b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/window/join/SalarySource.java
deleted file mode 100644
index 0da48e5..0000000
--- a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/window/join/SalarySource.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.examples.window.join;
-
-import java.util.Random;
-
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.streaming.api.function.source.SourceFunction;
-import org.apache.flink.util.Collector;
-
-public class SalarySource implements SourceFunction<Tuple2<String, Integer>> {
-
-	private static final long serialVersionUID = 6670933703432267728L;
-
-	private String[] names = { "tom", "jerry", "alice", "bob", "john", "grace", "sasa", "lawrance",
-			"andrew", "jean", "richard", "smith", "gorge", "black", "peter", "mark", "eric" };
-	private Random rand = new Random();
-	private Tuple2<String, Integer> outTuple = new Tuple2<String, Integer>();
-
-	@Override
-	public void invoke(Collector<Tuple2<String, Integer>> out) throws Exception {
-		// Continuously emit tuples with random names and integers (salaries).
-		while (true) {
-			outTuple.f0 = names[rand.nextInt(names.length)];
-			outTuple.f1 = rand.nextInt(10000);
-			out.collect(outTuple);
-			Thread.sleep(rand.nextInt(10) + 1);
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d332d6c3/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/window/join/WindowJoin.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/window/join/WindowJoin.java b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/window/join/WindowJoin.java
new file mode 100644
index 0000000..6d8656c
--- /dev/null
+++ b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/window/join/WindowJoin.java
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.examples.window.join;
+
+import java.util.Random;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.function.source.SourceFunction;
+import org.apache.flink.util.Collector;
+
+/**
+ * Example illustrating join over sliding windows of streams in Flink.
+ * 
+ * <p>
+ * his example will join two streams with a sliding window. One which emits
+ * grades and one which emits salaries of people.
+ * </p>
+ *
+ * <p>
+ * This example shows how to:
+ * <ul>
+ * <li>do windowed joins,
+ * <li>use tuple data types,
+ * <li>write a simple streaming program.
+ */
+public class WindowJoin {
+
+	// *************************************************************************
+	// PROGRAM
+	// *************************************************************************
+
+	public static void main(String[] args) throws Exception {
+
+		if (!parseParameters(args)) {
+			return;
+		}
+
+		// obtain execution environment
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+		// connect to the data sources for grades and salaries
+		DataStream<Tuple2<String, Integer>> grades = env.addSource(new GradeSource());
+		DataStream<Tuple2<String, Integer>> salaries = env.addSource(new SalarySource());
+
+		// apply a temporal join over the two stream based on the names over one
+		// second windows
+		DataStream<Tuple2<Tuple2<String, Integer>, Tuple2<String, Integer>>> joinedStream = grades
+				.windowJoin(salaries, 1000, 1000, 0, 0);
+
+		// emit result
+		if (fileOutput) {
+			joinedStream.writeAsText(outputPath, 1);
+		} else {
+			joinedStream.print();
+		}
+
+		// execute program
+		env.execute("Windowed Join Example");
+	}
+
+	// *************************************************************************
+	// USER FUNCTIONS
+	// *************************************************************************
+
+	private final static String[] names = { "tom", "jerry", "alice", "bob", "john", "grace" };
+	private final static int GRADE_COUNT = 5;
+	private final static int SALARY_MAX = 10000;
+	private final static int SLEEP_TIME = 10;
+
+	/**
+	 * Continuously emit tuples with random names and integers (grades).
+	 */
+	public static class GradeSource implements SourceFunction<Tuple2<String, Integer>> {
+		private static final long serialVersionUID = 1L;
+
+		private Random rand;
+		private Tuple2<String, Integer> outTuple;
+
+		public GradeSource() {
+			rand = new Random();
+			outTuple = new Tuple2<String, Integer>();
+		}
+
+		@Override
+		public void invoke(Collector<Tuple2<String, Integer>> out) throws Exception {
+			while (true) {
+				outTuple.f0 = names[rand.nextInt(names.length)];
+				outTuple.f1 = rand.nextInt(GRADE_COUNT) + 1;
+				out.collect(outTuple);
+				Thread.sleep(rand.nextInt(SLEEP_TIME) + 1);
+			}
+		}
+	}
+
+	/**
+	 * Continuously emit tuples with random names and integers (salaries).
+	 */
+	public static class SalarySource implements SourceFunction<Tuple2<String, Integer>> {
+		private static final long serialVersionUID = 1L;
+
+		private Random rand;
+		private Tuple2<String, Integer> outTuple;
+
+		public SalarySource() {
+			rand = new Random();
+			outTuple = new Tuple2<String, Integer>();
+		}
+
+		@Override
+		public void invoke(Collector<Tuple2<String, Integer>> out) throws Exception {
+			while (true) {
+				outTuple.f0 = names[rand.nextInt(names.length)];
+				outTuple.f1 = rand.nextInt(SALARY_MAX) + 1;
+				out.collect(outTuple);
+				Thread.sleep(rand.nextInt(SLEEP_TIME) + 1);
+			}
+		}
+	}
+
+	// *************************************************************************
+	// UTIL METHODS
+	// *************************************************************************
+
+	private static boolean fileOutput = false;
+	private static String outputPath;
+
+	private static boolean parseParameters(String[] args) {
+
+		if (args.length > 0) {
+			// parse input arguments
+			fileOutput = true;
+			if (args.length == 1) {
+				outputPath = args[0];
+			} else {
+				System.err.println("Usage: WindowJoin <result path>");
+				return false;
+			}
+		} else {
+			System.out.println("Executing WindowJoin with generated data.");
+			System.out.println("  Provide parameter to write to file.");
+			System.out.println("  Usage: WindowJoin <result path>");
+		}
+		return true;
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d332d6c3/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/window/join/WindowJoinLocal.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/window/join/WindowJoinLocal.java b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/window/join/WindowJoinLocal.java
deleted file mode 100644
index 695cf5d..0000000
--- a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/window/join/WindowJoinLocal.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.examples.window.join;
-
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-
-public class WindowJoinLocal {
-
-	// This example will join two streams with a sliding window. One which emits
-	// people's grades and one which emits people's salaries.
-
-	public static void main(String[] args) throws Exception {
-
-		// Obtain execution environment
-		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-
-		// Connect to the data sources for grades and salaries
-		DataStream<Tuple2<String, Integer>> grades = env.addSource(new GradeSource());
-		DataStream<Tuple2<String, Integer>> salaries = env.addSource(new SalarySource());
-
-		// Apply a temporal join over the two stream based on the names in one
-		// second windows
-		DataStream<Tuple2<Tuple2<String, Integer>, Tuple2<String, Integer>>> joinedStream = grades
-				.windowJoin(salaries, 1000, 1000, 0, 0);
-
-		// Print the results
-		joinedStream.print();
-
-		env.execute();
-
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d332d6c3/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/wordcount/PojoExample.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/wordcount/PojoExample.java b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/wordcount/PojoExample.java
new file mode 100644
index 0000000..f377863
--- /dev/null
+++ b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/wordcount/PojoExample.java
@@ -0,0 +1,185 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.examples.wordcount;
+
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.examples.java.wordcount.util.WordCountData;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.util.Collector;
+
+/**
+ * This example shows an implementation of WordCount without using the Tuple2
+ * type, but a custom class.
+ * 
+ * <p>
+ * Usage: <code>WordCount &lt;text path&gt; &lt;result path&gt;</code><br>
+ * If no parameters are provided, the program is run with default data from
+ * {@link WordCountData}.
+ * 
+ * <p>
+ * This example shows how to:
+ * <ul>
+ * <li>use POJO data types,
+ * <li>write a simple Flink program,
+ * <li>write and use user-defined functions. 
+ * </ul>
+ */
+public class PojoExample {
+	
+	// *************************************************************************
+	// PROGRAM
+	// *************************************************************************
+
+	public static void main(String[] args) throws Exception {
+
+		if (!parseParameters(args)) {
+			return;
+		}
+
+		// set up the execution environment
+		final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+		// get input data
+		DataStream<String> text = getTextDataStream(env);
+
+		DataStream<Word> counts =
+		// split up the lines into Word objects
+		text.flatMap(new Tokenizer())
+		// group by the field word and sum up the frequency
+				.groupBy("word").sum("frequency");
+
+		if (fileOutput) {
+			counts.writeAsText(outputPath);
+		} else {
+			counts.print();
+		}
+
+		// execute program
+		env.execute("WordCount Pojo Example");
+	}
+
+	// *************************************************************************
+	// DATA TYPES
+	// *************************************************************************
+
+	/**
+	 * This is the POJO (Plain Old Java Object) that is being used for all the
+	 * operations. As long as all fields are public or have a getter/setter, the
+	 * system can handle them
+	 */
+	public static class Word {
+
+		private String word;
+		private Integer frequency;
+
+		public Word() {
+		}
+
+		public Word(String word, int i) {
+			this.word = word;
+			this.frequency = i;
+		}
+
+		public String getWord() {
+			return word;
+		}
+
+		public void setWord(String word) {
+			this.word = word;
+		}
+
+		public Integer getFrequency() {
+			return frequency;
+		}
+
+		public void setFrequency(Integer frequency) {
+			this.frequency = frequency;
+		}
+
+		@Override
+		public String toString() {
+			return "(" + word + ", " + frequency + ")";
+		}
+	}
+
+	// *************************************************************************
+	// USER FUNCTIONS
+	// *************************************************************************
+
+	/**
+	 * Implements the string tokenizer that splits sentences into words as a
+	 * user-defined FlatMapFunction. The function takes a line (String) and
+	 * splits it into multiple pairs in the form of "(word,1)" (Tuple2<String,
+	 * Integer>).
+	 */
+	public static final class Tokenizer implements FlatMapFunction<String, Word> {
+		private static final long serialVersionUID = 1L;
+
+		@Override
+		public void flatMap(String value, Collector<Word> out) {
+			// normalize and split the line
+			String[] tokens = value.toLowerCase().split("\\W+");
+
+			// emit the pairs
+			for (String token : tokens) {
+				if (token.length() > 0) {
+					out.collect(new Word(token, 1));
+				}
+			}
+		}
+	}
+
+	// *************************************************************************
+	// UTIL METHODS
+	// *************************************************************************
+
+	private static boolean fileOutput = false;
+	private static String textPath;
+	private static String outputPath;
+
+	private static boolean parseParameters(String[] args) {
+
+		if (args.length > 0) {
+			// parse input arguments
+			fileOutput = true;
+			if (args.length == 2) {
+				textPath = args[0];
+				outputPath = args[1];
+			} else {
+				System.err.println("Usage: PojoExample <text path> <result path>");
+				return false;
+			}
+		} else {
+			System.out.println("Executing PojoExample example with built-in default data.");
+			System.out.println("  Provide parameters to read input data from a file.");
+			System.out.println("  Usage: PojoExample <text path> <result path>");
+		}
+		return true;
+	}
+
+	private static DataStream<String> getTextDataStream(StreamExecutionEnvironment env) {
+		if (fileOutput) {
+			// read the text file from given input path
+			return env.readTextFile(textPath);
+		} else {
+			// get default test text data
+			return env.fromElements(WordCountData.WORDS);
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d332d6c3/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/wordcount/PojoWordCount.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/wordcount/PojoWordCount.java b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/wordcount/PojoWordCount.java
deleted file mode 100644
index e95f042..0000000
--- a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/wordcount/PojoWordCount.java
+++ /dev/null
@@ -1,169 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.examples.wordcount;
-
-import org.apache.flink.api.common.functions.FlatMapFunction;
-import org.apache.flink.examples.java.wordcount.util.WordCountData;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.util.Collector;
-
-/**
- * This example shows an implementation of Wordcount without using the Tuple2
- * type, but a custom class.
- *
- */
-public class PojoWordCount {
-
-	/**
-	 * This is the POJO (Plain Old Java Object) that is being used for all the
-	 * operations. As long as all fields are public or have a getter/setter, the
-	 * system can handle them
-	 */
-	public static class Word {
-		// fields
-		private String word;
-		private Integer frequency;
-
-		// constructors
-		public Word() {
-		}
-
-		public Word(String word, int i) {
-			this.word = word;
-			this.frequency = i;
-		}
-
-		// getters setters
-		public String getWord() {
-			return word;
-		}
-
-		public void setWord(String word) {
-			this.word = word;
-		}
-
-		public Integer getFrequency() {
-			return frequency;
-		}
-
-		public void setFrequency(Integer frequency) {
-			this.frequency = frequency;
-		}
-
-		// to String
-		@Override
-		public String toString() {
-			return "Word=" + word + " freq=" + frequency;
-		}
-	}
-
-	public static void main(String[] args) throws Exception {
-
-		if (!parseParameters(args)) {
-			return;
-		}
-
-		// set up the execution environment
-		final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-
-		// get input data
-		DataStream<String> text = getTextDataStream(env);
-
-		DataStream<Word> counts =
-		// split up the lines into Word objects
-		text.flatMap(new Tokenizer())
-		// group by the field word and sum up the frequency
-		.groupBy("word")
-		.sum("frequency");
-
-		if (fileOutput) {
-			counts.writeAsText(outputPath);
-		} else {
-			counts.print();
-		}
-
-		// execute program
-		env.execute("WordCount-Pojo Example");
-	}
-
-	// *************************************************************************
-	// USER FUNCTIONS
-	// *************************************************************************
-
-	/**
-	 * Implements the string tokenizer that splits sentences into words as a
-	 * user-defined FlatMapFunction. The function takes a line (String) and
-	 * splits it into multiple pairs in the form of "(word,1)" (Tuple2<String,
-	 * Integer>).
-	 */
-	public static final class Tokenizer implements FlatMapFunction<String, Word> {
-		private static final long serialVersionUID = 1L;
-
-		@Override
-		public void flatMap(String value, Collector<Word> out) {
-			// normalize and split the line
-			String[] tokens = value.toLowerCase().split("\\W+");
-
-			// emit the pairs
-			for (String token : tokens) {
-				if (token.length() > 0) {
-					out.collect(new Word(token, 1));
-				}
-			}
-		}
-	}
-
-	// *************************************************************************
-	// UTIL METHODS
-	// *************************************************************************
-
-	private static boolean fileOutput = false;
-	private static String textPath;
-	private static String outputPath;
-
-	private static boolean parseParameters(String[] args) {
-
-		if (args.length > 0) {
-			// parse input arguments
-			fileOutput = true;
-			if (args.length == 2) {
-				textPath = args[0];
-				outputPath = args[1];
-			} else {
-				System.err.println("Usage: WordCount <text path> <result path>");
-				return false;
-			}
-		} else {
-			System.out.println("Executing WordCount example with built-in default data.");
-			System.out.println("  Provide parameters to read input data from a file.");
-			System.out.println("  Usage: WordCount <text path> <result path>");
-		}
-		return true;
-	}
-
-	private static DataStream<String> getTextDataStream(StreamExecutionEnvironment env) {
-		if (fileOutput) {
-			// read the text file from given input path
-			return env.readTextFile(textPath);
-		} else {
-			// get default test text data
-			return env.fromElements(WordCountData.WORDS);
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d332d6c3/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/wordcount/WordCount.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/wordcount/WordCount.java b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/wordcount/WordCount.java
index e07dfe5..085fe5f 100644
--- a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/wordcount/WordCount.java
+++ b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/wordcount/WordCount.java
@@ -41,8 +41,8 @@ import org.apache.flink.util.Collector;
  * <p>
  * This example shows how to:
  * <ul>
- * <li>write a simple Flink Streaming program.
- * <li>use Tuple data types.
+ * <li>write a simple Flink Streaming program,
+ * <li>use tuple data types,
  * <li>write and use user-defined functions.
  * </ul>
  * 
@@ -66,11 +66,10 @@ public class WordCount {
 		DataStream<String> text = getTextDataStream(env);
 
 		DataStream<Tuple2<String, Integer>> counts =
-				// split up the lines in pairs (2-tuples) containing: (word,1)
-				text.flatMap(new Tokenizer())
-				// group by the tuple field "0" and sum up tuple field "1"
-				.groupBy(0)
-				.sum(1);
+		// split up the lines in pairs (2-tuples) containing: (word,1)
+		text.flatMap(new Tokenizer())
+		// group by the tuple field "0" and sum up tuple field "1"
+				.groupBy(0).sum(1);
 
 		// emit result
 		if (fileOutput) {