You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by mb...@apache.org on 2014/11/13 16:06:16 UTC

[3/5] incubator-flink git commit: [streaming] Example cleanup + windowJoin fix

[streaming] Example cleanup + windowJoin fix


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/e7ce9567
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/e7ce9567
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/e7ce9567

Branch: refs/heads/master
Commit: e7ce9567c57ffc59e5a7588445b6b891bc58e5f8
Parents: 6867f9b
Author: Gyula Fora <gy...@apache.org>
Authored: Wed Nov 12 12:32:32 2014 +0100
Committer: mbalassi <mb...@apache.org>
Committed: Thu Nov 13 15:24:04 2014 +0100

----------------------------------------------------------------------
 .../api/datastream/ConnectedDataStream.java     |   6 +
 .../examples/basictopology/BasicTopology.java   |  98 ++++++++-------
 .../examples/cellinfo/CellInfoLocal.java        | 123 ------------------
 .../examples/cellinfo/IWorkerEngine.java        |  23 ----
 .../flink/streaming/examples/cellinfo/Util.java |  28 -----
 .../examples/cellinfo/WorkerEngineBin.java      |  97 --------------
 .../examples/cellinfo/WorkerEngineExact.java    |  76 -----------
 .../examples/iteration/IterateExample.java      |  80 ++++++++----
 .../ml/IncrementalLearningSkeleton.java         |  15 ++-
 .../examples/twitter/TwitterStream.java         |   5 +-
 .../examples/window/join/GradeSource.java       |  14 +--
 .../examples/window/join/SalarySource.java      |  14 +--
 .../examples/window/join/WindowJoinLocal.java   |  25 ++--
 .../examples/window/join/WindowJoinTask.java    | 126 -------------------
 14 files changed, 148 insertions(+), 582 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e7ce9567/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedDataStream.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedDataStream.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedDataStream.java
index 23c420c..6556550 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedDataStream.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedDataStream.java
@@ -580,6 +580,9 @@ public class ConnectedDataStream<IN1, IN2> {
 	SingleOutputStreamOperator<Tuple2<IN1, IN2>, ?> windowJoin(long windowSize, long slideInterval,
 			TimeStamp<IN1> timestamp1, TimeStamp<IN2> timestamp2, int fieldIn1, int fieldIn2) {
 
+		dataStream1 = dataStream1.groupBy(fieldIn1);
+		dataStream2 = dataStream2.groupBy(fieldIn2);
+
 		JoinWindowFunction<IN1, IN2> joinWindowFunction = new JoinWindowFunction<IN1, IN2>(
 				dataStream1.getOutputType(), dataStream2.getOutputType(), fieldIn1, fieldIn2);
 
@@ -590,6 +593,9 @@ public class ConnectedDataStream<IN1, IN2> {
 	SingleOutputStreamOperator<Tuple2<IN1, IN2>, ?> windowJoin(long windowSize, long slideInterval,
 			TimeStamp<IN1> timestamp1, TimeStamp<IN2> timestamp2, String fieldIn1, String fieldIn2) {
 
+		dataStream1 = dataStream1.groupBy(fieldIn1);
+		dataStream2 = dataStream2.groupBy(fieldIn2);
+
 		JoinWindowFunction<IN1, IN2> joinWindowFunction = new JoinWindowFunction<IN1, IN2>(
 				dataStream1.getOutputType(), dataStream2.getOutputType(), fieldIn1, fieldIn2);
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e7ce9567/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/basictopology/BasicTopology.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/basictopology/BasicTopology.java b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/basictopology/BasicTopology.java
index 2b1ac4f..9379ff2 100755
--- a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/basictopology/BasicTopology.java
+++ b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/basictopology/BasicTopology.java
@@ -1,49 +1,61 @@
 /*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
  * limitations under the License.
- */
-
-package org.apache.flink.streaming.examples.basictopology;
-
+ */
+
+package org.apache.flink.streaming.examples.basictopology;
+
 import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.examples.java.wordcount.util.WordCountData;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-
-public class BasicTopology {
-
-	public static class IdentityMap implements MapFunction<String, String> {
-		private static final long serialVersionUID = 1L;
-		// map to the same value
-		@Override
-		public String map(String value) throws Exception {
-			return value;
-		}
-
-	}
-
-	private static final int PARALLELISM = 1;
-
-	public static void main(String[] args) throws Exception {
-		StreamExecutionEnvironment env = StreamExecutionEnvironment
-				.createLocalEnvironment(PARALLELISM);
-
-		DataStream<String> stream = env.fromElements(WordCountData.WORDS).map(new IdentityMap());
-
-		stream.print();
-
-		env.execute();
-	}
-}
+
+/**
+ * 
+ * Very basic Flink streaming topology for local testing.
+ *
+ */
+public class BasicTopology {
+
+	private static final int PARALLELISM = 1;
+
+	public static void main(String[] args) throws Exception {
+
+		// We create a new Local environment. The program will be executed using
+		// a new mini-cluster that is set up at execution.
+		StreamExecutionEnvironment env = StreamExecutionEnvironment
+				.createLocalEnvironment(PARALLELISM);
+
+		// We create a new data stream from a collection of words then apply a
+		// simple map.
+		DataStream<String> stream = env.fromElements(WordCountData.WORDS).map(new IdentityMap());
+
+		// Print the results
+		stream.print();
+
+		env.execute();
+	}
+
+	public static class IdentityMap implements MapFunction<String, String> {
+		private static final long serialVersionUID = 1L;
+
+		// map to the same value
+		@Override
+		public String map(String value) throws Exception {
+			return value;
+		}
+
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e7ce9567/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/cellinfo/CellInfoLocal.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/cellinfo/CellInfoLocal.java b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/cellinfo/CellInfoLocal.java
deleted file mode 100644
index 1dcb5e6..0000000
--- a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/cellinfo/CellInfoLocal.java
+++ /dev/null
@@ -1,123 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.examples.cellinfo;
-
-import java.util.Random;
-
-import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.function.co.RichCoMapFunction;
-import org.apache.flink.streaming.api.function.source.SourceFunction;
-import org.apache.flink.util.Collector;
-
-public class CellInfoLocal {
-
-	private final static int CELL_COUNT = 10;
-	private final static int LAST_MILLIS = 1000;
-	private final static int PARALLELISM = 1;
-	private final static int SOURCE_PARALLELISM = 1;
-	private final static int QUERY_SLEEP_TIME = 1000;
-	private final static int QUERY_COUNT = 10;
-	private final static int INFO_SLEEP_TIME = 100;
-	private final static int INFO_COUNT = 100;
-
-	public final static class InfoSource implements SourceFunction<Tuple3<Integer, Long, Integer>> {
-		private static final long serialVersionUID = 1L;
-
-		private static Random rand = new Random();
-		private Tuple3<Integer, Long, Integer> tuple = new Tuple3<Integer, Long, Integer>(0, 0L, 0);
-
-		@Override
-		public void invoke(Collector<Tuple3<Integer, Long, Integer>> out) throws Exception {
-			for (int i = 0; i < INFO_COUNT; i++) {
-				Thread.sleep(INFO_SLEEP_TIME);
-				tuple.f0 = rand.nextInt(CELL_COUNT);
-				tuple.f1 = System.currentTimeMillis();
-
-				out.collect(tuple);
-			}
-		}
-	}
-
-	private final static class QuerySource implements
-			SourceFunction<Tuple3<Integer, Long, Integer>> {
-		private static final long serialVersionUID = 1L;
-
-		private static Random rand = new Random();
-		Tuple3<Integer, Long, Integer> tuple = new Tuple3<Integer, Long, Integer>(0, 0L,
-				LAST_MILLIS);
-
-		@Override
-		public void invoke(Collector<Tuple3<Integer, Long, Integer>> collector) throws Exception {
-			for (int i = 0; i < QUERY_COUNT; i++) {
-				Thread.sleep(QUERY_SLEEP_TIME);
-				tuple.f0 = rand.nextInt(CELL_COUNT);
-				tuple.f1 = System.currentTimeMillis();
-				collector.collect(tuple);
-			}
-		}
-	}
-
-	private final static class CellTask	extends
-			RichCoMapFunction<Tuple3<Integer, Long, Integer>, Tuple3<Integer, Long, Integer>, String> {
-		private static final long serialVersionUID = 1L;
-
-		private WorkerEngineExact engine;
-		private Integer cellID;
-		private Long timeStamp;
-		private Integer lastMillis;
-
-		public CellTask() {
-			engine = new WorkerEngineExact(CELL_COUNT, LAST_MILLIS, System.currentTimeMillis());
-		}
-
-		// INFO
-		@Override
-		public String map1(Tuple3<Integer, Long, Integer> value) {
-			cellID = value.f0;
-			timeStamp = value.f1;
-			engine.put(cellID, timeStamp);
-			return "INFO:\t" + cellID + " @ " + timeStamp;
-		}
-
-		// QUERY
-		@Override
-		public String map2(Tuple3<Integer, Long, Integer> value) {
-			cellID = value.f0;
-			timeStamp = value.f1;
-			lastMillis = value.f2;
-			return "QUERY:\t" + cellID + ": " + engine.get(timeStamp, lastMillis, cellID);
-		}
-	}
-
-	// Example for connecting data streams
-	public static void main(String[] args) throws Exception {
-		StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(
-				PARALLELISM).setBufferTimeout(100);
-
-		DataStream<Tuple3<Integer, Long, Integer>> querySource = env.addSource(new QuerySource(),
-				SOURCE_PARALLELISM).partitionBy(0);
-
-		DataStream<String> stream = env.addSource(new InfoSource(), SOURCE_PARALLELISM)
-				.partitionBy(0).connect(querySource).map(new CellTask());
-		stream.print();
-
-		env.execute();
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e7ce9567/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/cellinfo/IWorkerEngine.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/cellinfo/IWorkerEngine.java b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/cellinfo/IWorkerEngine.java
deleted file mode 100644
index b2b2264..0000000
--- a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/cellinfo/IWorkerEngine.java
+++ /dev/null
@@ -1,23 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.examples.cellinfo;
-
-public interface IWorkerEngine {
-	public int get(long timeStamp, long lastMillis, int cellId);
-	public void put(int cellId, long timeStamp);
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e7ce9567/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/cellinfo/Util.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/cellinfo/Util.java b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/cellinfo/Util.java
deleted file mode 100644
index 7463c6c..0000000
--- a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/cellinfo/Util.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.examples.cellinfo;
-
-public class Util {
-	public static int mod(int x, int y) {
-		int result = x % y;
-		if (result < 0) {
-			result += y;
-		}
-		return result;
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e7ce9567/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/cellinfo/WorkerEngineBin.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/cellinfo/WorkerEngineBin.java b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/cellinfo/WorkerEngineBin.java
deleted file mode 100644
index b6097f5..0000000
--- a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/cellinfo/WorkerEngineBin.java
+++ /dev/null
@@ -1,97 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.examples.cellinfo;
-
-import org.apache.flink.streaming.examples.cellinfo.Util;
-
-public class WorkerEngineBin implements java.io.Serializable, IWorkerEngine {
-
-	private static final long serialVersionUID = 1L;
-	private long unitLength_;
-	private long lastTimeUpdated_;
-	private int[][] counters_;
-	private int pointer_;
-
-	public WorkerEngineBin(long unitLength, int numOfCells, int bufferInterval, long currentTime) {
-		lastTimeUpdated_ = currentTime / unitLength * unitLength;
-		unitLength_ = unitLength;
-		counters_ = new int[(int) (bufferInterval / unitLength) + 1][numOfCells];
-	}
-
-	private int getCell(int interval, int cell) {
-		return counters_[Util.mod((interval + pointer_), counters_.length)][cell];
-	}
-
-	private void incrCell(int interval, int cell) {
-		++counters_[Util.mod((interval + pointer_), counters_.length)][cell];
-	}
-
-	private void toZero(int interval) {
-		for (int cell = 0; cell < counters_[0].length; ++cell) {
-			counters_[Util.mod((interval + pointer_), counters_.length)][cell] = 0;
-		}
-	}
-
-	private void shift(int shiftBy) {
-		if (shiftBy > 0 && shiftBy < counters_.length) {
-			pointer_ = Util.mod((pointer_ - shiftBy), counters_.length);
-			for (int i = 0; i < shiftBy; ++i) {
-				toZero(i);
-			}
-		} else if (shiftBy >= counters_.length) {
-			pointer_ = 0;
-			for (int i = 0; i < counters_.length; ++i) {
-				toZero(i);
-			}
-		}
-	}
-
-	public int get(long timeStamp, long lastMillis, int cellId) {
-		int shift = refresh(timeStamp);
-		int numOfLastIntervals = (int) (lastMillis / unitLength_);
-		if (shift >= counters_.length || numOfLastIntervals >= counters_.length) {
-			return -1;
-		}
-		int sum = 0;
-		for (int i = shift + 1; i < shift + numOfLastIntervals + 1; ++i) {
-			sum += getCell(i, cellId);
-		}
-		return sum;
-	}
-
-	private int refresh(long timeStamp) {
-		int shiftBy = (int) ((timeStamp - lastTimeUpdated_) / unitLength_);
-		shift(shiftBy);
-		int retVal;
-		if (shiftBy > 0) {
-			lastTimeUpdated_ = timeStamp / unitLength_ * unitLength_;
-			retVal = 0;
-		} else {
-			retVal = -shiftBy;
-		}
-		return retVal;
-	}
-
-	public void put(int cellId, long timeStamp) {
-		int shift = refresh(timeStamp);
-		if (shift >= counters_.length) {
-			return;
-		}
-		incrCell(shift, cellId);
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e7ce9567/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/cellinfo/WorkerEngineExact.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/cellinfo/WorkerEngineExact.java b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/cellinfo/WorkerEngineExact.java
deleted file mode 100644
index 8a9bcda..0000000
--- a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/cellinfo/WorkerEngineExact.java
+++ /dev/null
@@ -1,76 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.examples.cellinfo;
-
-import java.util.Iterator;
-import java.util.Map;
-import java.util.TreeMap;
-
-public class WorkerEngineExact implements java.io.Serializable, IWorkerEngine {
-	private static final long serialVersionUID = 1L;
-	private long lastTimeUpdated_;
-	private long bufferInterval_;
-	private TreeMap<Long, Integer>[] counters_;
-
-	@SuppressWarnings("unchecked")
-	public WorkerEngineExact(int numOfCells, int bufferInterval, long currentTime) {
-		lastTimeUpdated_ = currentTime;
-		bufferInterval_ = bufferInterval;
-		counters_ = new TreeMap[numOfCells];
-		for (int i = 0; i < numOfCells; ++i) {
-			counters_[i] = new TreeMap<Long, Integer>();
-		}
-	}
-
-	public int get(long timeStamp, long lastMillis, int cellId) {
-		refresh(timeStamp);
-		Map<Long, Integer> subMap = counters_[cellId].subMap(timeStamp - lastMillis, true, timeStamp, false);
-		int retVal = 0;
-		for (Map.Entry<Long, Integer> entry : subMap.entrySet()) {
-			retVal += entry.getValue();
-		}
-		return retVal;
-	}
-
-	public void put(int cellId, long timeStamp) {
-		refresh(timeStamp);
-		TreeMap<Long, Integer> map = counters_[cellId];
-		if (map.containsKey(timeStamp)) {
-			map.put(timeStamp, map.get(timeStamp) + 1);
-		} else {
-			map.put(timeStamp, 1);
-		}
-	}
-
-	public void refresh(long timeStamp) {
-		if (timeStamp - lastTimeUpdated_ > bufferInterval_) {
-			for (int i = 0; i < counters_.length; ++i) {
-				for (Iterator<Map.Entry<Long, Integer>> it = counters_[i].entrySet().iterator(); it.hasNext();) {
-					Map.Entry<Long, Integer> entry = it.next();
-					long time = entry.getKey();
-					if (timeStamp - time > bufferInterval_) {
-						it.remove();
-					} else {
-						break;
-					}
-				}
-			}
-			lastTimeUpdated_ = timeStamp;
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e7ce9567/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/iteration/IterateExample.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/iteration/IterateExample.java b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/iteration/IterateExample.java
index e2094fb..a011837 100644
--- a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/iteration/IterateExample.java
+++ b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/iteration/IterateExample.java
@@ -29,13 +29,63 @@ import org.apache.flink.streaming.api.datastream.IterativeDataStream;
 import org.apache.flink.streaming.api.datastream.SplitDataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 
+/**
+ * Example illustrating iterations in Flink streaming programs. The program
+ * sums up random numbers and counts how many additions it needs to reach a
+ * specific threshold in an iterative streaming fashion.
+ *
+ */
 public class IterateExample {
 
+	public static void main(String[] args) throws Exception {
+
+		// Set up our input for the stream of (0,0)s
+		List<Tuple2<Double, Integer>> input = new ArrayList<Tuple2<Double, Integer>>();
+		for (int i = 0; i < 1000; i++) {
+			input.add(new Tuple2<Double, Integer>(0., 0));
+		}
+
+		// Obtain execution environment and use setBufferTimeout(0) to enable
+		// continuous flushing of the output buffers (lowest latency).
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment()
+				.setBufferTimeout(0);
+
+		// Create an iterative datastream from the input.
+		IterativeDataStream<Tuple2<Double, Integer>> it = env.fromCollection(input).shuffle()
+				.iterate();
+
+		// We make sure that the iteration terminates if no new data is received
+		// in the stream for 5 seconds
+		it.setMaxWaitTime(5000);
+
+		// We apply the step function to add a new random value to the tuple and
+		// increment the counter, then we split the output with our
+		// output selector.
+		SplitDataStream<Tuple2<Double, Integer>> step = it.map(new Step()).shuffle()
+				.split(new MySelector());
+
+		// We close the iteration by selecting the tuples that were directed to
+		// 'iterate' in the output selector
+		it.closeWith(step.select("iterate"));
+
+		// To produce the final output we select the tuples directed to 'output',
+		// then project them to the second field
+		step.select("output").project(1).types(Integer.class).print();
+
+		// Execute the streaming program
+		env.execute();
+	}
+
+	/**
+	 * Iteration step function which takes an input (Double, Integer) and
+	 * produces an output (Double + random, Integer + 1)
+	 *
+	 */
 	public static class Step implements
 			MapFunction<Tuple2<Double, Integer>, Tuple2<Double, Integer>> {
 
 		private static final long serialVersionUID = 1L;
-	
+
 		Random rnd;
 
 		public Step() {
@@ -50,13 +100,16 @@ public class IterateExample {
 
 	}
 
+	/**
+	 * OutputSelector that tests which tuples need to be iterated again.
+	 */
 	public static class MySelector extends OutputSelector<Tuple2<Double, Integer>> {
 
 		private static final long serialVersionUID = 1L;
 
 		@Override
 		public void select(Tuple2<Double, Integer> value, Collection<String> outputs) {
-			if (value.f0 > 100) {
+			if (value.f0 > 200) {
 				outputs.add("output");
 			} else {
 				outputs.add("iterate");
@@ -64,27 +117,4 @@ public class IterateExample {
 		}
 
 	}
-	
-
-	public static void main(String[] args) throws Exception {
-
-		List<Tuple2<Double, Integer>> input = new ArrayList<Tuple2<Double, Integer>>();
-		for (int i = 0; i < 100; i++) {
-			input.add(new Tuple2<Double, Integer>(0., 0));
-		}
-
-		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment()
-				.setBufferTimeout(1);
-
-		IterativeDataStream<Tuple2<Double, Integer>> it = env.fromCollection(input).iterate()
-				.setMaxWaitTime(3000);
-		
-		SplitDataStream<Tuple2<Double,Integer>> step = it.map(new Step()).shuffle().split(new MySelector());
-		
-		it.closeWith(step.select("iterate"));
-		
-		step.select("output").project(1).types(Integer.class).print();
-
-		env.execute();
-	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e7ce9567/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/ml/IncrementalLearningSkeleton.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/ml/IncrementalLearningSkeleton.java b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/ml/IncrementalLearningSkeleton.java
index 9fe1af4..d2f9e6e 100755
--- a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/ml/IncrementalLearningSkeleton.java
+++ b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/ml/IncrementalLearningSkeleton.java
@@ -39,7 +39,7 @@ public class IncrementalLearningSkeleton {
 
 		// Method for pulling new data for prediction
 		private Integer getNewData() throws InterruptedException {
-			Thread.sleep(1000);
+			Thread.sleep(100);
 			return 1;
 		}
 	}
@@ -59,7 +59,7 @@ public class IncrementalLearningSkeleton {
 
 		// Method for pulling new training data
 		private Integer getTrainingData() throws InterruptedException {
-			Thread.sleep(1000);
+			Thread.sleep(100);
 			return 1;
 
 		}
@@ -114,13 +114,13 @@ public class IncrementalLearningSkeleton {
 
 	}
 
-	private static final int PARALLELISM = 1;
-	private static final int SOURCE_PARALLELISM = 1;
+	private static int SOURCE_PARALLELISM = 1;
 
 	public static void main(String[] args) throws Exception {
 
-		StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(
-				PARALLELISM).setBufferTimeout(1000);
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+		IncrementalLearningSkeleton.SOURCE_PARALLELISM = env.getDegreeOfParallelism();
 
 		// Build new model on every second of new data
 		DataStream<Double[]> model = env.addSource(new TrainingDataSource(), SOURCE_PARALLELISM)
@@ -130,11 +130,10 @@ public class IncrementalLearningSkeleton {
 		DataStream<Integer> prediction = env.addSource(new NewDataSource(), SOURCE_PARALLELISM)
 				.connect(model).map(new Predictor());
 
+		// We print the output
 		prediction.print();
 
 		env.execute();
 	}
 
-
-
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e7ce9567/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/twitter/TwitterStream.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/twitter/TwitterStream.java b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/twitter/TwitterStream.java
index 5d8022a..ef1d84b 100644
--- a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/twitter/TwitterStream.java
+++ b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/twitter/TwitterStream.java
@@ -32,7 +32,7 @@ import org.apache.sling.commons.json.JSONException;
 
 /**
  * Implements the "TwitterStream" program that computes a most used word occurrence
- * histogram over JSON files in a streaming fashion.
+ * over JSON files in a streaming fashion.
  * 
  * <p>
  * The input is a JSON text file with lines separated by newline characters.
@@ -52,7 +52,6 @@ import org.apache.sling.commons.json.JSONException;
  * 
  */
 public class TwitterStream {
-	private static final int PARALLELISM = 1;
 	
 	// *************************************************************************
 	// PROGRAM
@@ -65,7 +64,7 @@ public class TwitterStream {
 
 		// set up the execution environment
 		StreamExecutionEnvironment env = StreamExecutionEnvironment
-				.createLocalEnvironment(PARALLELISM);
+				.getExecutionEnvironment();
 
 		env.setBufferTimeout(1000);
 		

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e7ce9567/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/window/join/GradeSource.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/window/join/GradeSource.java b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/window/join/GradeSource.java
index c895c5b..d877f91 100644
--- a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/window/join/GradeSource.java
+++ b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/window/join/GradeSource.java
@@ -19,29 +19,27 @@ package org.apache.flink.streaming.examples.window.join;
 
 import java.util.Random;
 
-import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.streaming.api.function.source.SourceFunction;
 import org.apache.flink.util.Collector;
 
-public class GradeSource implements SourceFunction<Tuple3<String, Integer, Long>> {
+public class GradeSource implements SourceFunction<Tuple2<String, Integer>> {
 
 	private static final long serialVersionUID = -5897483980082089771L;
 
 	private String[] names = { "tom", "jerry", "alice", "bob", "john", "grace", "sasa", "lawrance",
-			"andrew", "jean", "richard", "smith", "gorge", "black", "peter" };
+			"andrew", "jean", "richard", "smith", "gorge", "black", "peter" ,"mark", "eric"};
 	private Random rand = new Random();
-	private Tuple3<String, Integer, Long> outTuple = new Tuple3<String, Integer, Long>();
-	private Long progress = 0L;
+	private Tuple2<String, Integer> outTuple = new Tuple2<String, Integer>();
 
 	@Override
-	public void invoke(Collector<Tuple3<String, Integer, Long>> out) throws Exception {
+	public void invoke(Collector<Tuple2<String, Integer>> out) throws Exception {
 		// Continuously emit tuples with random names and integers (grades).
 		while (true) {
 			outTuple.f0 = names[rand.nextInt(names.length)];
 			outTuple.f1 = rand.nextInt(5) + 1;
-			outTuple.f2 = progress;
 			out.collect(outTuple);
-			progress += 1;
+			Thread.sleep(rand.nextInt(10) + 1);
 		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e7ce9567/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/window/join/SalarySource.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/window/join/SalarySource.java b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/window/join/SalarySource.java
index 641a738..0da48e5 100644
--- a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/window/join/SalarySource.java
+++ b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/window/join/SalarySource.java
@@ -19,29 +19,27 @@ package org.apache.flink.streaming.examples.window.join;
 
 import java.util.Random;
 
-import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.streaming.api.function.source.SourceFunction;
 import org.apache.flink.util.Collector;
 
-public class SalarySource implements SourceFunction<Tuple3<String, Integer, Long>> {
+public class SalarySource implements SourceFunction<Tuple2<String, Integer>> {
 
 	private static final long serialVersionUID = 6670933703432267728L;
 
 	private String[] names = { "tom", "jerry", "alice", "bob", "john", "grace", "sasa", "lawrance",
-			"andrew", "jean", "richard", "smith", "gorge", "black", "peter" };
+			"andrew", "jean", "richard", "smith", "gorge", "black", "peter", "mark", "eric" };
 	private Random rand = new Random();
-	private Tuple3<String, Integer, Long> outTuple = new Tuple3<String, Integer, Long>();
-	private Long progress = 0L;
+	private Tuple2<String, Integer> outTuple = new Tuple2<String, Integer>();
 
 	@Override
-	public void invoke(Collector<Tuple3<String, Integer, Long>> out) throws Exception {
+	public void invoke(Collector<Tuple2<String, Integer>> out) throws Exception {
 		// Continuously emit tuples with random names and integers (salaries).
 		while (true) {
 			outTuple.f0 = names[rand.nextInt(names.length)];
 			outTuple.f1 = rand.nextInt(10000);
-			outTuple.f2 = progress;
 			out.collect(outTuple);
-			progress += 1;
+			Thread.sleep(rand.nextInt(10) + 1);
 		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e7ce9567/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/window/join/WindowJoinLocal.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/window/join/WindowJoinLocal.java b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/window/join/WindowJoinLocal.java
index 211daf6..695cf5d 100644
--- a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/window/join/WindowJoinLocal.java
+++ b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/window/join/WindowJoinLocal.java
@@ -17,33 +17,30 @@
 
 package org.apache.flink.streaming.examples.window.join;
 
-import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 
 public class WindowJoinLocal {
 
-	private static final int PARALLELISM = 4;
-	private static final int SOURCE_PARALLELISM = 2;
-
 	// This example will join two streams with a sliding window. One which emits
 	// people's grades and one which emits people's salaries.
 
 	public static void main(String[] args) throws Exception {
 
-		StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(
-				PARALLELISM).setBufferTimeout(100);
-
-		DataStream<Tuple3<String, Integer, Long>> grades = env.addSource(new GradeSource(),
-				SOURCE_PARALLELISM);
+		// Obtain execution environment
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 
-		DataStream<Tuple3<String, Integer, Long>> salaries = env.addSource(new SalarySource(),
-				SOURCE_PARALLELISM);
+		// Connect to the data sources for grades and salaries
+		DataStream<Tuple2<String, Integer>> grades = env.addSource(new GradeSource());
+		DataStream<Tuple2<String, Integer>> salaries = env.addSource(new SalarySource());
 
-		DataStream<Tuple3<String, Integer, Integer>> joinedStream = grades.connect(salaries)
-				.flatMap(new WindowJoinTask());
+		// Apply a temporal join over the two streams based on the names in
+		// one-second windows
+		DataStream<Tuple2<Tuple2<String, Integer>, Tuple2<String, Integer>>> joinedStream = grades
+				.windowJoin(salaries, 1000, 1000, 0, 0);
 
-		System.out.println("(NAME, GRADE, SALARY)");
+		// Print the results
 		joinedStream.print();
 
 		env.execute();

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e7ce9567/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/window/join/WindowJoinTask.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/window/join/WindowJoinTask.java b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/window/join/WindowJoinTask.java
deleted file mode 100644
index 524ffbd..0000000
--- a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/window/join/WindowJoinTask.java
+++ /dev/null
@@ -1,126 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.examples.window.join;
-
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.LinkedList;
-
-import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.streaming.api.function.co.RichCoFlatMapFunction;
-import org.apache.flink.util.Collector;
-
-public class WindowJoinTask extends
-		RichCoFlatMapFunction<Tuple3<String, Integer, Long>, Tuple3<String, Integer, Long>, Tuple3<String, Integer, Integer>> {
-
-	class SalaryProgress {
-		public SalaryProgress(Integer salary, Long progress) {
-			this.salary = salary;
-			this.progress = progress;
-		}
-
-		Integer salary;
-		Long progress;
-	}
-
-	class GradeProgress {
-		public GradeProgress(Integer grade, Long progress) {
-			this.grade = grade;
-			this.progress = progress;
-		}
-
-		Integer grade;
-		Long progress;
-	}
-
-	private static final long serialVersionUID = 749913336259789039L;
-	private int windowSize = 100;
-	private HashMap<String, LinkedList<GradeProgress>> gradeHashmap;
-	private HashMap<String, LinkedList<SalaryProgress>> salaryHashmap;
-	private String name;
-	private Long progress;
-	
-	public WindowJoinTask() {
-		gradeHashmap = new HashMap<String, LinkedList<GradeProgress>>();
-		salaryHashmap = new HashMap<String, LinkedList<SalaryProgress>>();
-		name = new String();
-		progress = 0L;
-	}
-
-	Tuple3<String, Integer, Integer> outputTuple = new Tuple3<String, Integer, Integer>();
-	
-	// Joins the input value (grade) with the already known values (salaries) on
-	// a given interval.
-	// Also stores the new element.
-	@Override
-	public void flatMap1(Tuple3<String, Integer, Long> value,
-			Collector<Tuple3<String, Integer, Integer>> out) throws Exception {
-		name = value.f0;
-		progress = value.f2;
-
-		outputTuple.f0 = name;
-		outputTuple.f1 = value.f1;
-		
-		if (salaryHashmap.containsKey(name)) {
-			Iterator<SalaryProgress> iterator = salaryHashmap.get(name).iterator();
-			while (iterator.hasNext()) {
-				SalaryProgress entry = iterator.next();
-				if (progress - entry.progress > windowSize) {
-					iterator.remove(); 
-				} else {
-					outputTuple.f2 = entry.salary;
-					out.collect(outputTuple);
-				}
-			}
-		}
-		if (!gradeHashmap.containsKey(name)) {
-			gradeHashmap.put(name, new LinkedList<GradeProgress>());
-		}
-		gradeHashmap.get(name).add(new GradeProgress(value.f1, progress));
-	}
-
-	// Joins the input value (salary) with the already known values (grades) on
-	// a given interval.
-	// Also stores the new element.
-	@Override
-	public void flatMap2(Tuple3<String, Integer, Long> value,
-			Collector<Tuple3<String, Integer, Integer>> out) throws Exception {
-		name = value.f0;
-		progress = value.f2;
-
-		outputTuple.f0 = name;
-		outputTuple.f2 = value.f1;
-		
-		if (gradeHashmap.containsKey(name)) {
-			Iterator<GradeProgress> iterator = gradeHashmap.get(name).iterator();
-			while (iterator.hasNext()) {
-				GradeProgress entry = iterator.next();
-				if (progress - entry.progress > windowSize) {
-					iterator.remove();
-				} else {
-					outputTuple.f1 = entry.grade;
-					out.collect(outputTuple);
-				}
-			}
-		}
-		if (!salaryHashmap.containsKey(name)) {
-			salaryHashmap.put(name, new LinkedList<SalaryProgress>());
-		}
-		salaryHashmap.get(name).add(new SalaryProgress(value.f1, progress));
-	}
-}