You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2014/08/18 19:25:52 UTC

[15/51] [abbrv] git commit: [streaming] Operator invokable refactor

[streaming] Operator invokable refactor


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/be459aec
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/be459aec
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/be459aec

Branch: refs/heads/master
Commit: be459aece580ea0f4ea9e028cf29b15d7d7f33f4
Parents: 1fccb10
Author: gyfora <gy...@gmail.com>
Authored: Wed Jul 23 14:53:27 2014 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Mon Aug 18 16:19:18 2014 +0200

----------------------------------------------------------------------
 .../apache/flink/streaming/api/DataStream.java  |  31 +++-
 .../streaming/api/invokable/SinkInvokable.java  |  21 +--
 .../api/invokable/StreamRecordInvokable.java    |  23 ++-
 .../operator/BatchReduceInvokable.java          | 156 ++++---------------
 .../api/invokable/operator/FilterInvokable.java |  26 ++--
 .../invokable/operator/FlatMapInvokable.java    |  22 +--
 .../api/invokable/operator/MapInvokable.java    |  21 +--
 .../operator/StreamReduceInvokable.java         |  32 ++++
 .../operator/WindowReduceInvokable.java         | 125 +++++++++++++++
 9 files changed, 276 insertions(+), 181 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/be459aec/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/DataStream.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/DataStream.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/DataStream.java
index d32aa18..27e4d89 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/DataStream.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/DataStream.java
@@ -36,6 +36,7 @@ import org.apache.flink.streaming.api.invokable.operator.BatchReduceInvokable;
 import org.apache.flink.streaming.api.invokable.operator.FilterInvokable;
 import org.apache.flink.streaming.api.invokable.operator.FlatMapInvokable;
 import org.apache.flink.streaming.api.invokable.operator.MapInvokable;
+import org.apache.flink.streaming.api.invokable.operator.WindowReduceInvokable;
 import org.apache.flink.streaming.api.invokable.operator.co.CoMapInvokable;
 import org.apache.flink.streaming.partitioner.BroadcastPartitioner;
 import org.apache.flink.streaming.partitioner.DistributePartitioner;
@@ -292,7 +293,7 @@ public class DataStream<T extends Tuple> {
 	public DataStream<T> forward() {
 		return setConnectionType(new ForwardPartitioner<T>());
 	}
-	
+
 	/**
 	 * Sets the partitioning of the {@link DataStream} so that the output tuples
 	 * are distributed evenly to the next component.
@@ -332,6 +333,27 @@ public class DataStream<T extends Tuple> {
 	}
 
 	/**
+	 * Applies a CoMap transformation on two separate {@link DataStream}s. The
+	 * transformation calls a {@link CoMapFunction#map1(Tuple)} for each element
+	 * of the first DataStream (on which .coMapWith was called) and
+	 * {@link CoMapFunction#map2(Tuple)} for each element of the second
+	 * DataStream. Each CoMapFunction call returns exactly one element.
+	 * 
+	 * @param coMapper
+	 *            The CoMapFunction used to jointly transform the two input
+	 *            DataStreams
+	 * @param otherStream
+	 *            The DataStream that will be transformed with
+	 *            {@link CoMapFunction#map2(Tuple)}
+	 * @return The transformed DataStream
+	 */
+	public <T2 extends Tuple, R extends Tuple> DataStream<R> coMapWith(
+			CoMapFunction<T, T2, R> coMapper, DataStream<T2> otherStream) {
+		return environment.addCoFunction("coMap", new DataStream<T>(this), new DataStream<T2>(
+				otherStream), coMapper, new CoMapInvokable<T, T2, R>(coMapper));
+	}
+
+	/**
 	 * Applies a FlatMap transformation on a {@link DataStream}. The
 	 * transformation calls a FlatMapFunction for each element of the
 	 * DataStream. Each FlatMapFunction call can return any number of elements
@@ -387,11 +409,6 @@ public class DataStream<T extends Tuple> {
 				new BatchReduceInvokable<T, R>(reducer, batchSize));
 	}
 
-	public <T2 extends Tuple, R extends Tuple> DataStream<R> coMapWith(CoMapFunction<T, T2, R> coMapper, DataStream<T2> otherStream) {
-		return environment.addCoFunction("coMap", new DataStream<T>(this), new DataStream<T2>(otherStream), coMapper, new CoMapInvokable<T, T2, R>(coMapper));
-	}
-	
-	
 	/**
 	 * Applies a reduce transformation on preset "time" chunks of the
 	 * DataStream. The transformation calls a {@link GroupReduceFunction} on
@@ -411,7 +428,7 @@ public class DataStream<T extends Tuple> {
 	public <R extends Tuple> StreamOperator<T, R> windowReduce(GroupReduceFunction<T, R> reducer,
 			long windowSize) {
 		return environment.addFunction("batchReduce", new DataStream<T>(this), reducer,
-				new BatchReduceInvokable<T, R>(reducer, windowSize));
+				new WindowReduceInvokable<T, R>(reducer, windowSize));
 	}
 
 	/**

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/be459aec/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/SinkInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/SinkInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/SinkInvokable.java
index 3c14490..81cfa81 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/SinkInvokable.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/SinkInvokable.java
@@ -32,16 +32,17 @@ public class SinkInvokable<IN extends Tuple> extends StreamRecordInvokable<IN, I
 	}
 
 	@Override
-	public void invoke() throws Exception {
-		if (this.isMutable) {
-			while (recordIterator.next(reuse) != null) {
-				sinkFunction.invoke((IN) reuse.getTuple());
-			}
-		} else {
-			while (recordIterator.next(reuse) != null) {
-				sinkFunction.invoke((IN) reuse.getTuple());
-				resetReuse();
-			}
+	protected void immutableInvoke() throws Exception {
+		while (recordIterator.next(reuse) != null) {
+			sinkFunction.invoke((IN) reuse.getTuple());
+			resetReuse();
+		}
+	}
+
+	@Override
+	protected void mutableInvoke() throws Exception {
+		while (recordIterator.next(reuse) != null) {
+			sinkFunction.invoke((IN) reuse.getTuple());
 		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/be459aec/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/StreamRecordInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/StreamRecordInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/StreamRecordInvokable.java
index 903372b..6beec27 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/StreamRecordInvokable.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/StreamRecordInvokable.java
@@ -19,6 +19,8 @@
 
 package org.apache.flink.streaming.api.invokable;
 
+import java.io.IOException;
+
 import org.apache.flink.api.java.tuple.Tuple;
 import org.apache.flink.streaming.api.streamrecord.StreamRecord;
 import org.apache.flink.streaming.api.streamrecord.StreamRecordSerializer;
@@ -49,6 +51,25 @@ public abstract class StreamRecordInvokable<IN extends Tuple, OUT extends Tuple>
 	protected void resetReuse() {
 		this.reuse = serializer.createInstance();
 	}
+	
+	protected StreamRecord<IN> loadNextRecord() {
+		try {
+			reuse = recordIterator.next(reuse);
+		} catch (IOException e) {
+			e.printStackTrace();
+		}
+		return reuse;
+	}
+
+	protected abstract void immutableInvoke() throws Exception;
 
-	public abstract void invoke() throws Exception;
+	protected abstract void mutableInvoke() throws Exception;
+
+	public void invoke() throws Exception {
+		if (this.isMutable) {
+			mutableInvoke();
+		} else {
+			immutableInvoke();
+		}
+	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/be459aec/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/BatchReduceInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/BatchReduceInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/BatchReduceInvokable.java
old mode 100644
new mode 100755
index 7684f70..4aa540c
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/BatchReduceInvokable.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/BatchReduceInvokable.java
@@ -19,109 +19,53 @@
 
 package org.apache.flink.streaming.api.invokable.operator;
 
-import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 
 import org.apache.flink.api.java.functions.GroupReduceFunction;
 import org.apache.flink.api.java.tuple.Tuple;
-import org.apache.flink.streaming.api.invokable.UserTaskInvokable;
-import org.apache.flink.streaming.api.streamrecord.StreamRecord;
 
 public class BatchReduceInvokable<IN extends Tuple, OUT extends Tuple> extends
-		UserTaskInvokable<IN, OUT> {
+		StreamReduceInvokable<IN, OUT> {
 	private static final long serialVersionUID = 1L;
 	private int batchSize;
-	private long windowSize;
-	volatile boolean isRunning;
-	boolean window;
-
-	private GroupReduceFunction<IN, OUT> reducer;
 
 	public BatchReduceInvokable(GroupReduceFunction<IN, OUT> reduceFunction, int batchSize) {
 		this.reducer = reduceFunction;
 		this.batchSize = batchSize;
 	}
 
-	public BatchReduceInvokable(GroupReduceFunction<IN, OUT> reduceFunction, long windowSize) {
-		this.reducer = reduceFunction;
-		this.windowSize = windowSize;
-		this.window = true;
-	}
-
-	private StreamRecord<IN> loadNextRecord() {
-		try {
-			reuse = recordIterator.next(reuse);
-		} catch (IOException e) {
-			e.printStackTrace();
-		}
-		return reuse;
-	}
-
 	@Override
-	public void invoke() throws Exception {
-		if (this.isMutable) {
-			mutableInvoke();
-		} else {
-			immutableInvoke();
-		}
-	}
-
-	private void immutableInvoke() throws Exception {
+	protected void immutableInvoke() throws Exception {
 		List<IN> tupleBatch = new ArrayList<IN>();
 		boolean batchStart;
-
-		if (window) {
-			long startTime = System.currentTimeMillis();
-			while (loadNextRecord() != null) {
-				batchStart = true;
-				do {
-					if (batchStart) {
-						batchStart = false;
-					} else {
-						reuse = loadNextRecord();
-						if (reuse == null) {
-							break;
-						}
+		int counter = 0;
+
+		while (loadNextRecord() != null) {
+			batchStart = true;
+			do {
+				if (batchStart) {
+					batchStart = false;
+				} else {
+					reuse = loadNextRecord();
+					if (reuse == null) {
+						break;
 					}
-					tupleBatch.add(reuse.getTuple());
-					resetReuse();
-				} while (System.currentTimeMillis() - startTime < windowSize);
-				reducer.reduce(tupleBatch.iterator(), collector);
-				tupleBatch.clear();
-				startTime = System.currentTimeMillis();
-			}
-		} else {
-			int counter = 0;
-			while (loadNextRecord() != null) {
-				batchStart = true;
-				do {
-					if (batchStart) {
-						batchStart = false;
-					} else {
-						reuse = loadNextRecord();
-						if (reuse == null) {
-							break;
-						}
-					}
-					counter++;
-					tupleBatch.add(reuse.getTuple());
-					resetReuse();
-				} while (counter < batchSize);
-				reducer.reduce(tupleBatch.iterator(), collector);
-				tupleBatch.clear();
-				counter = 0;
-			}
+				}
+				counter++;
+				tupleBatch.add(reuse.getTuple());
+				resetReuse();
+			} while (counter < batchSize);
+			reducer.reduce(tupleBatch.iterator(), collector);
+			tupleBatch.clear();
+			counter = 0;
 		}
+
 	}
 
-	private void mutableInvoke() throws Exception {
-		BatchIterator<IN> userIterator;
-		if (window) {
-			userIterator = new WindowIterator();
-		} else {
-			userIterator = new CounterIterator();
-		}
+	@Override
+	protected void mutableInvoke() throws Exception {
+		BatchIterator<IN> userIterator = new CounterIterator();
 
 		do {
 			if (userIterator.hasNext()) {
@@ -131,7 +75,7 @@ public class BatchReduceInvokable<IN extends Tuple, OUT extends Tuple> extends
 		} while (reuse != null);
 	}
 
-	public class CounterIterator implements BatchIterator<IN> {
+	private class CounterIterator implements BatchIterator<IN> {
 		private int counter;
 		private boolean loadedNext;
 
@@ -179,52 +123,4 @@ public class BatchReduceInvokable<IN extends Tuple, OUT extends Tuple> extends
 
 	}
 
-	public class WindowIterator implements BatchIterator<IN> {
-
-		volatile boolean iterate;
-		private boolean loadedNext;
-		private long startTime;
-
-		public WindowIterator() {
-			startTime = System.currentTimeMillis();
-		}
-
-		@Override
-		public boolean hasNext() {
-			if (System.currentTimeMillis() - startTime > windowSize) {
-				return false;
-			} else if (!loadedNext) {
-				loadNextRecord();
-				loadedNext = true;
-			}
-			return (reuse != null);
-		}
-
-		@Override
-		public IN next() {
-			if (hasNext()) {
-				loadedNext = false;
-				return reuse.getTuple();
-			} else {
-				loadedNext = false;
-				return reuse.getTuple();
-			}
-		}
-
-		public void reset() {
-			while (System.currentTimeMillis() - startTime < windowSize) {
-				loadNextRecord();
-			}
-			loadNextRecord();
-			loadedNext = true;
-			startTime = System.currentTimeMillis();
-		}
-
-		@Override
-		public void remove() {
-
-		}
-
-	}
-
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/be459aec/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/FilterInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/FilterInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/FilterInvokable.java
index ac79764..edeb79a 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/FilterInvokable.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/FilterInvokable.java
@@ -34,21 +34,21 @@ public class FilterInvokable<IN extends Tuple> extends UserTaskInvokable<IN, IN>
 	}
 
 	@Override
-	public void invoke() throws Exception {
-		if (this.isMutable) {
-			while (recordIterator.next(reuse) != null) {
-				if (filterFunction.filter(reuse.getTuple())) {
-					collector.collect(reuse.getTuple());
-				}
-			}
-		} else {
-			while (recordIterator.next(reuse) != null) {
-				if (filterFunction.filter(reuse.getTuple())) {
-					collector.collect(reuse.getTuple());
-				}
-				resetReuse();
+	protected void immutableInvoke() throws Exception {
+		while (recordIterator.next(reuse) != null) {
+			if (filterFunction.filter(reuse.getTuple())) {
+				collector.collect(reuse.getTuple());
 			}
+			resetReuse();
 		}
+	}
 
+	@Override
+	protected void mutableInvoke() throws Exception {
+		while (recordIterator.next(reuse) != null) {
+			if (filterFunction.filter(reuse.getTuple())) {
+				collector.collect(reuse.getTuple());
+			}
+		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/be459aec/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/FlatMapInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/FlatMapInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/FlatMapInvokable.java
index 33bda80..279b160 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/FlatMapInvokable.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/FlatMapInvokable.java
@@ -33,16 +33,18 @@ public class FlatMapInvokable<IN extends Tuple, OUT extends Tuple> extends
 		this.flatMapper = flatMapper;
 	}
 
-	public void invoke() throws Exception {
-		if (this.isMutable) {
-			while (recordIterator.next(reuse) != null) {
-				flatMapper.flatMap(reuse.getTuple(), collector);
-			}
-		} else {
-			while (recordIterator.next(reuse) != null) {
-				flatMapper.flatMap(reuse.getTuple(), collector);
-				resetReuse();
-			}
+	@Override
+	protected void immutableInvoke() throws Exception {
+		while (recordIterator.next(reuse) != null) {
+			flatMapper.flatMap(reuse.getTuple(), collector);
+			resetReuse();
+		}
+	}
+
+	@Override
+	protected void mutableInvoke() throws Exception {
+		while (recordIterator.next(reuse) != null) {
+			flatMapper.flatMap(reuse.getTuple(), collector);
 		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/be459aec/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/MapInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/MapInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/MapInvokable.java
index ff29d15..3c56b6f 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/MapInvokable.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/MapInvokable.java
@@ -33,16 +33,17 @@ public class MapInvokable<IN extends Tuple, OUT extends Tuple> extends UserTaskI
 	}
 
 	@Override
-	public void invoke() throws Exception {
-		if (this.isMutable) {
-			while (recordIterator.next(reuse) != null) {
-				collector.collect(mapper.map(reuse.getTuple()));
-			}
-		} else {
-			while (recordIterator.next(reuse) != null) {
-				collector.collect(mapper.map(reuse.getTuple()));
-				resetReuse();
-			}
+	protected void immutableInvoke() throws Exception {
+		while (recordIterator.next(reuse) != null) {
+			collector.collect(mapper.map(reuse.getTuple()));
+			resetReuse();
+		}
+	}
+
+	@Override
+	protected void mutableInvoke() throws Exception {
+		while (recordIterator.next(reuse) != null) {
+			collector.collect(mapper.map(reuse.getTuple()));
 		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/be459aec/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/StreamReduceInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/StreamReduceInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/StreamReduceInvokable.java
new file mode 100644
index 0000000..e881d57
--- /dev/null
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/StreamReduceInvokable.java
@@ -0,0 +1,32 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.flink.streaming.api.invokable.operator;
+
+import org.apache.flink.api.java.functions.GroupReduceFunction;
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.streaming.api.invokable.UserTaskInvokable;
+
+public abstract class StreamReduceInvokable<IN extends Tuple, OUT extends Tuple> extends
+		UserTaskInvokable<IN, OUT> {
+	private static final long serialVersionUID = 1L;
+	protected GroupReduceFunction<IN, OUT> reducer;
+
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/be459aec/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/WindowReduceInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/WindowReduceInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/WindowReduceInvokable.java
new file mode 100755
index 0000000..67c15dc
--- /dev/null
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/WindowReduceInvokable.java
@@ -0,0 +1,125 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.flink.streaming.api.invokable.operator;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.flink.api.java.functions.GroupReduceFunction;
+import org.apache.flink.api.java.tuple.Tuple;
+
+public class WindowReduceInvokable<IN extends Tuple, OUT extends Tuple> extends
+		StreamReduceInvokable<IN, OUT> {
+	private static final long serialVersionUID = 1L;
+	private long windowSize;
+	volatile boolean isRunning;
+	boolean window;
+
+	public WindowReduceInvokable(GroupReduceFunction<IN, OUT> reduceFunction, long windowSize) {
+		this.reducer = reduceFunction;
+		this.windowSize = windowSize;
+		this.window = true;
+	}
+
+	protected void immutableInvoke() throws Exception {
+		List<IN> tupleBatch = new ArrayList<IN>();
+		boolean batchStart;
+
+		long startTime = System.currentTimeMillis();
+		while (loadNextRecord() != null) {
+			batchStart = true;
+			do {
+				if (batchStart) {
+					batchStart = false;
+				} else {
+					reuse = loadNextRecord();
+					if (reuse == null) {
+						break;
+					}
+				}
+				tupleBatch.add(reuse.getTuple());
+				resetReuse();
+			} while (System.currentTimeMillis() - startTime < windowSize);
+			reducer.reduce(tupleBatch.iterator(), collector);
+			tupleBatch.clear();
+			startTime = System.currentTimeMillis();
+		}
+
+	}
+
+	protected void mutableInvoke() throws Exception {
+		BatchIterator<IN> userIterator = new WindowIterator();
+
+		do {
+			if (userIterator.hasNext()) {
+				reducer.reduce(userIterator, collector);
+				userIterator.reset();
+			}
+		} while (reuse != null);
+	}
+
+	private class WindowIterator implements BatchIterator<IN> {
+
+		private boolean loadedNext;
+		private long startTime;
+
+		public WindowIterator() {
+			startTime = System.currentTimeMillis();
+		}
+
+		@Override
+		public boolean hasNext() {
+			if (System.currentTimeMillis() - startTime > windowSize) {
+				return false;
+			} else if (!loadedNext) {
+				loadNextRecord();
+				loadedNext = true;
+			}
+			return (reuse != null);
+		}
+
+		@Override
+		public IN next() {
+			if (hasNext()) {
+				loadedNext = false;
+				return reuse.getTuple();
+			} else {
+				loadedNext = false;
+				return reuse.getTuple();
+			}
+		}
+
+		public void reset() {
+			while (System.currentTimeMillis() - startTime < windowSize) {
+				loadNextRecord();
+			}
+			loadNextRecord();
+			loadedNext = true;
+			startTime = System.currentTimeMillis();
+		}
+
+		@Override
+		public void remove() {
+
+		}
+
+	}
+
+}
\ No newline at end of file