You are viewing a plain text version of this content. The canonical link was not preserved in this plain text version.
Posted to commits@flink.apache.org by gy...@apache.org on 2015/03/20 21:23:03 UTC

[1/2] flink git commit: [FLINK-1765] [streaming] GroupedReduceInvokable chaining fix + minor FilterInvokable fix

Repository: flink
Updated Branches:
  refs/heads/master 3838dd19c -> ff60ecfb2


[FLINK-1765] [streaming] GroupedReduceInvokable chaining fix + minor FilterInvokable fix


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/0619c052
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/0619c052
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/0619c052

Branch: refs/heads/master
Commit: 0619c0526387f4cc7d81537d17ae42ee6196edbd
Parents: 3838dd1
Author: Gyula Fora <gy...@apache.org>
Authored: Fri Mar 20 20:00:54 2015 +0100
Committer: Gyula Fora <gy...@apache.org>
Committed: Fri Mar 20 20:00:54 2015 +0100

----------------------------------------------------------------------
 .../api/invokable/operator/FilterInvokable.java  |  2 +-
 .../operator/GroupedReduceInvokable.java         | 19 ++++++-------------
 .../api/invokable/operator/ProjectInvokable.java |  2 +-
 .../invokable/operator/StreamFoldInvokable.java  |  2 +-
 .../operator/StreamReduceInvokable.java          | 16 ++++------------
 5 files changed, 13 insertions(+), 28 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/0619c052/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/FilterInvokable.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/FilterInvokable.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/FilterInvokable.java
index 610fa53..c4b01a9 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/FilterInvokable.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/FilterInvokable.java
@@ -41,7 +41,7 @@ public class FilterInvokable<IN> extends ChainableInvokable<IN, IN> {
 
 	@Override
 	protected void callUserFunction() throws Exception {
-		collect = filterFunction.filter(copy(nextObject));
+		collect = filterFunction.filter(nextObject);
 		if (collect) {
 			collector.collect(nextObject);
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/0619c052/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/GroupedReduceInvokable.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/GroupedReduceInvokable.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/GroupedReduceInvokable.java
index c2177fa..72f52ea 100755
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/GroupedReduceInvokable.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/GroupedReduceInvokable.java
@@ -28,7 +28,6 @@ public class GroupedReduceInvokable<IN> extends StreamReduceInvokable<IN> {
 
 	private KeySelector<IN, ?> keySelector;
 	private Map<Object, IN> values;
-	private IN reduced;
 
 	public GroupedReduceInvokable(ReduceFunction<IN> reducer, KeySelector<IN, ?> keySelector) {
 		super(reducer);
@@ -37,23 +36,17 @@ public class GroupedReduceInvokable<IN> extends StreamReduceInvokable<IN> {
 	}
 
 	@Override
-	protected void reduce() throws Exception {
-		Object key = nextRecord.getKey(keySelector);
-		currentValue = values.get(key);
-		nextValue = nextObject;
+	protected void callUserFunction() throws Exception {
+		Object key = keySelector.getKey(nextObject);
+		IN currentValue = values.get(key);
 		if (currentValue != null) {
-			callUserFunctionAndLogException();
+			IN reduced = reducer.reduce(copy(currentValue), nextObject);
 			values.put(key, reduced);
 			collector.collect(reduced);
 		} else {
-			values.put(key, nextValue);
-			collector.collect(nextValue);
+			values.put(key, nextObject);
+			collector.collect(nextObject);
 		}
 	}
 
-	@Override
-	protected void callUserFunction() throws Exception {
-		reduced = reducer.reduce(currentValue, nextValue);
-	}
-
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/0619c052/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/ProjectInvokable.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/ProjectInvokable.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/ProjectInvokable.java
index 8f57fe7..0626ff9 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/ProjectInvokable.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/ProjectInvokable.java
@@ -64,7 +64,7 @@ public class ProjectInvokable<IN, OUT extends Tuple> extends ChainableInvokable<
 	@Override
 	public void collect(IN record) {
 		if (isRunning) {
-			nextObject = copy(record);
+			nextObject = copyInput(record);
 			callUserFunctionAndLogException();
 		}
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/0619c052/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/StreamFoldInvokable.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/StreamFoldInvokable.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/StreamFoldInvokable.java
index 07ed022..321acbe 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/StreamFoldInvokable.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/StreamFoldInvokable.java
@@ -61,7 +61,7 @@ public class StreamFoldInvokable<IN, OUT> extends ChainableInvokable<IN, OUT> {
 	@Override
 	public void collect(IN record) {
 		if (isRunning) {
-			nextObject = copy(record);
+			nextObject = copyInput(record);
 			callUserFunctionAndLogException();
 		}
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/0619c052/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/StreamReduceInvokable.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/StreamReduceInvokable.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/StreamReduceInvokable.java
index fe58105..ac05d06 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/StreamReduceInvokable.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/StreamReduceInvokable.java
@@ -24,8 +24,7 @@ public class StreamReduceInvokable<IN> extends ChainableInvokable<IN, IN> {
 	private static final long serialVersionUID = 1L;
 
 	protected ReduceFunction<IN> reducer;
-	protected IN currentValue;
-	protected IN nextValue;
+	private IN currentValue;
 
 	public StreamReduceInvokable(ReduceFunction<IN> reducer) {
 		super(reducer);
@@ -36,24 +35,17 @@ public class StreamReduceInvokable<IN> extends ChainableInvokable<IN, IN> {
 	@Override
 	public void invoke() throws Exception {
 		while (isRunning && readNext() != null) {
-			reduce();
+			callUserFunctionAndLogException();
 		}
 	}
 
-	protected void reduce() throws Exception {
-		callUserFunctionAndLogException();
-
-	}
-
 	@Override
 	protected void callUserFunction() throws Exception {
 
-		nextValue = nextObject;
-
 		if (currentValue != null) {
-			currentValue = reducer.reduce(copy(currentValue), nextValue);
+			currentValue = reducer.reduce(copy(currentValue), nextObject);
 		} else {
-			currentValue = nextValue;
+			currentValue = nextObject;
 
 		}
 		collector.collect(currentValue);


[2/2] flink git commit: [streaming] ChainableInvokable refactor

Posted by gy...@apache.org.
[streaming] ChainableInvokable refactor


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/ff60ecfb
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/ff60ecfb
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/ff60ecfb

Branch: refs/heads/master
Commit: ff60ecfb26e6542659c73ce6e2bd3ce911caaea6
Parents: 0619c05
Author: Gyula Fora <gy...@apache.org>
Authored: Fri Mar 20 20:29:10 2015 +0100
Committer: Gyula Fora <gy...@apache.org>
Committed: Fri Mar 20 20:29:10 2015 +0100

----------------------------------------------------------------------
 .../api/datastream/DiscretizedStream.java       | 11 ++++-----
 .../api/datastream/WindowedDataStream.java      |  3 +--
 .../api/invokable/ChainableInvokable.java       |  8 +++++++
 .../streaming/api/invokable/SinkInvokable.java  |  6 -----
 .../api/invokable/operator/FilterInvokable.java |  8 -------
 .../invokable/operator/FlatMapInvokable.java    |  8 -------
 .../operator/GroupedFoldInvokable.java          | 15 ++++--------
 .../api/invokable/operator/MapInvokable.java    |  7 ------
 .../invokable/operator/ProjectInvokable.java    |  9 +-------
 .../invokable/operator/StreamFoldInvokable.java | 24 ++++----------------
 .../operator/StreamReduceInvokable.java         |  8 -------
 .../windowing/WindowBufferInvokable.java        |  9 +-------
 .../operator/windowing/WindowFlattener.java     |  9 +-------
 .../operator/windowing/WindowFolder.java        |  1 +
 .../operator/windowing/WindowMapper.java        |  3 ++-
 .../operator/windowing/WindowMerger.java        |  9 +-------
 .../operator/windowing/WindowPartitioner.java   | 10 ++------
 .../operator/windowing/WindowReducer.java       |  1 +
 18 files changed, 32 insertions(+), 117 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/ff60ecfb/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DiscretizedStream.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DiscretizedStream.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DiscretizedStream.java
index 7597b47..7832777 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DiscretizedStream.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DiscretizedStream.java
@@ -196,13 +196,10 @@ public class DiscretizedStream<OUT> extends WindowedDataStream<OUT> {
 
 	@SuppressWarnings({ "unchecked", "rawtypes" })
 	private DataStream<Tuple2<Integer, Integer>> extractPartsByID(DiscretizedStream<OUT> input) {
-		return input.discretizedStream
-				.transform(
-						"ExtractParts",
-						new TupleTypeInfo(Tuple2.class, BasicTypeInfo.INT_TYPE_INFO,
-								BasicTypeInfo.INT_TYPE_INFO),
-						new FlatMapInvokable<StreamWindow<OUT>, Tuple2<Integer, Integer>>(
-								new WindowPartExtractor<OUT>()).withoutInputCopy());
+		return input.discretizedStream.transform("ExtractParts", new TupleTypeInfo(Tuple2.class,
+				BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO),
+				new FlatMapInvokable<StreamWindow<OUT>, Tuple2<Integer, Integer>>(
+						new WindowPartExtractor<OUT>()).withoutInputCopy());
 	}
 
 	private DiscretizedStream<OUT> partition(WindowTransformation transformation) {

http://git-wip-us.apache.org/repos/asf/flink/blob/ff60ecfb/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedDataStream.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedDataStream.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedDataStream.java
index 73cbdfd..efbbda9 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedDataStream.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedDataStream.java
@@ -441,8 +441,7 @@ public class WindowedDataStream<OUT> {
 		// discretized stream, we also pass the type of the windowbuffer
 		DiscretizedStream<OUT> discretized = discretize(transformation, windowBuffer);
 
-		return discretized
-				.timeReduce(reduceFunction, windowBuffer instanceof PreAggregator);
+		return discretized.timeReduce(reduceFunction, windowBuffer instanceof PreAggregator);
 
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/ff60ecfb/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/ChainableInvokable.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/ChainableInvokable.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/ChainableInvokable.java
index 470fc81..4e09a98 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/ChainableInvokable.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/ChainableInvokable.java
@@ -46,4 +46,12 @@ public abstract class ChainableInvokable<IN, OUT> extends StreamInvokable<IN, OU
 	protected IN copyInput(IN input) {
 		return copyInput ? copy(input) : input;
 	}
+
+	@Override
+	public void collect(IN record) {
+		if (isRunning) {
+			nextObject = copyInput(record);
+			callUserFunctionAndLogException();
+		}
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/ff60ecfb/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/SinkInvokable.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/SinkInvokable.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/SinkInvokable.java
index 2c6b6e6..01a295a 100755
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/SinkInvokable.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/SinkInvokable.java
@@ -42,12 +42,6 @@ public class SinkInvokable<IN> extends ChainableInvokable<IN, IN> {
 	}
 
 	@Override
-	public void collect(IN record) {
-		nextObject = copyInput(record);
-		callUserFunctionAndLogException();
-	}
-
-	@Override
 	public void cancel() {
 		super.cancel();
 		sinkFunction.cancel();

http://git-wip-us.apache.org/repos/asf/flink/blob/ff60ecfb/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/FilterInvokable.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/FilterInvokable.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/FilterInvokable.java
index c4b01a9..00d432b 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/FilterInvokable.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/FilterInvokable.java
@@ -46,12 +46,4 @@ public class FilterInvokable<IN> extends ChainableInvokable<IN, IN> {
 			collector.collect(nextObject);
 		}
 	}
-
-	@Override
-	public void collect(IN record) {
-		if (isRunning) {
-			nextObject = copyInput(record);
-			callUserFunctionAndLogException();
-		}
-	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/ff60ecfb/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/FlatMapInvokable.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/FlatMapInvokable.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/FlatMapInvokable.java
index 436cf4e..dfead14 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/FlatMapInvokable.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/FlatMapInvokable.java
@@ -42,12 +42,4 @@ public class FlatMapInvokable<IN, OUT> extends ChainableInvokable<IN, OUT> {
 		flatMapper.flatMap(nextObject, collector);
 	}
 
-	@Override
-	public void collect(IN record) {
-		if (isRunning) {
-			nextObject = copyInput(record);
-			callUserFunctionAndLogException();
-		}
-	}
-
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/ff60ecfb/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/GroupedFoldInvokable.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/GroupedFoldInvokable.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/GroupedFoldInvokable.java
index d4b5f7e..4a0f4f5 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/GroupedFoldInvokable.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/GroupedFoldInvokable.java
@@ -29,7 +29,6 @@ public class GroupedFoldInvokable<IN, OUT> extends StreamFoldInvokable<IN, OUT>
 
 	private KeySelector<IN, ?> keySelector;
 	private Map<Object, OUT> values;
-	private OUT folded;
 	private OUT initialValue;
 
 	public GroupedFoldInvokable(FoldFunction<IN, OUT> folder, KeySelector<IN, ?> keySelector,
@@ -41,24 +40,18 @@ public class GroupedFoldInvokable<IN, OUT> extends StreamFoldInvokable<IN, OUT>
 	}
 
 	@Override
-	protected void fold() throws Exception {
+	protected void callUserFunction() throws Exception {
 		Object key = nextRecord.getKey(keySelector);
-		accumulator = values.get(key);
-		nextValue = nextObject;
+		OUT accumulator = values.get(key);
 		if (accumulator != null) {
-			callUserFunctionAndLogException();
+			OUT folded = folder.fold(outTypeSerializer.copy(accumulator), nextObject);
 			values.put(key, folded);
 			collector.collect(folded);
 		} else {
-			OUT first = folded = folder.fold(outTypeSerializer.copy(initialValue), nextValue);
+			OUT first = folder.fold(outTypeSerializer.copy(initialValue), nextObject);
 			values.put(key, first);
 			collector.collect(first);
 		}
 	}
 
-	@Override
-	protected void callUserFunction() throws Exception {
-		folded = folder.fold(outTypeSerializer.copy(accumulator), nextValue);
-	}
-
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/ff60ecfb/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/MapInvokable.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/MapInvokable.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/MapInvokable.java
index 9647144..53cb825 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/MapInvokable.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/MapInvokable.java
@@ -42,11 +42,4 @@ public class MapInvokable<IN, OUT> extends ChainableInvokable<IN, OUT> {
 		collector.collect(mapper.map(nextObject));
 	}
 
-	@Override
-	public void collect(IN record) {
-		if (isRunning) {
-			nextObject = copyInput(record);
-			callUserFunctionAndLogException();
-		}
-	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/ff60ecfb/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/ProjectInvokable.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/ProjectInvokable.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/ProjectInvokable.java
index 0626ff9..bc58188 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/ProjectInvokable.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/ProjectInvokable.java
@@ -60,12 +60,5 @@ public class ProjectInvokable<IN, OUT extends Tuple> extends ChainableInvokable<
 		this.outTypeSerializer = outTypeInformation.createSerializer(executionConfig);
 		outTuple = outTypeSerializer.createInstance();
 	}
-
-	@Override
-	public void collect(IN record) {
-		if (isRunning) {
-			nextObject = copyInput(record);
-			callUserFunctionAndLogException();
-		}
-	}
+	
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/ff60ecfb/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/StreamFoldInvokable.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/StreamFoldInvokable.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/StreamFoldInvokable.java
index 321acbe..1353c01 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/StreamFoldInvokable.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/StreamFoldInvokable.java
@@ -26,11 +26,11 @@ public class StreamFoldInvokable<IN, OUT> extends ChainableInvokable<IN, OUT> {
 	private static final long serialVersionUID = 1L;
 
 	protected FoldFunction<IN, OUT> folder;
-	protected OUT accumulator;
-	protected IN nextValue;
+	private OUT accumulator;
 	protected TypeSerializer<OUT> outTypeSerializer;
 
-	public StreamFoldInvokable(FoldFunction<IN, OUT> folder, OUT initialValue, TypeInformation<OUT> outTypeInformation) {
+	public StreamFoldInvokable(FoldFunction<IN, OUT> folder, OUT initialValue,
+			TypeInformation<OUT> outTypeInformation) {
 		super(folder);
 		this.folder = folder;
 		this.accumulator = initialValue;
@@ -40,29 +40,15 @@ public class StreamFoldInvokable<IN, OUT> extends ChainableInvokable<IN, OUT> {
 	@Override
 	public void invoke() throws Exception {
 		while (isRunning && readNext() != null) {
-			fold();
+			callUserFunctionAndLogException();
 		}
 	}
 
-	protected void fold() throws Exception {
-		callUserFunctionAndLogException();
-
-	}
-
 	@Override
 	protected void callUserFunction() throws Exception {
 
-		nextValue = nextObject;
-		accumulator = folder.fold(outTypeSerializer.copy(accumulator), nextValue);
+		accumulator = folder.fold(outTypeSerializer.copy(accumulator), nextObject);
 		collector.collect(accumulator);
 
 	}
-
-	@Override
-	public void collect(IN record) {
-		if (isRunning) {
-			nextObject = copyInput(record);
-			callUserFunctionAndLogException();
-		}
-	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/ff60ecfb/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/StreamReduceInvokable.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/StreamReduceInvokable.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/StreamReduceInvokable.java
index ac05d06..f0f378d 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/StreamReduceInvokable.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/StreamReduceInvokable.java
@@ -52,12 +52,4 @@ public class StreamReduceInvokable<IN> extends ChainableInvokable<IN, IN> {
 
 	}
 
-	@Override
-	public void collect(IN record) {
-		if (isRunning) {
-			nextObject = copyInput(record);
-			callUserFunctionAndLogException();
-		}
-	}
-
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/ff60ecfb/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowBufferInvokable.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowBufferInvokable.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowBufferInvokable.java
index 475611f..fbd8258 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowBufferInvokable.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowBufferInvokable.java
@@ -32,6 +32,7 @@ public class WindowBufferInvokable<T> extends ChainableInvokable<WindowEvent<T>,
 	public WindowBufferInvokable(WindowBuffer<T> buffer) {
 		super(null);
 		this.buffer = buffer;
+		withoutInputCopy();
 	}
 
 	private static final long serialVersionUID = 1L;
@@ -63,12 +64,4 @@ public class WindowBufferInvokable<T> extends ChainableInvokable<WindowEvent<T>,
 		handleWindowEvent(windowEvent, buffer);
 	}
 
-	@Override
-	public void collect(WindowEvent<T> record) {
-		if (isRunning) {
-			nextObject = record;
-			callUserFunctionAndLogException();
-		}
-	}
-
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/ff60ecfb/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowFlattener.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowFlattener.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowFlattener.java
index 0ff4724..4aff6c1 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowFlattener.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowFlattener.java
@@ -28,6 +28,7 @@ public class WindowFlattener<T> extends ChainableInvokable<StreamWindow<T>, T> {
 
 	public WindowFlattener() {
 		super(null);
+		withoutInputCopy();
 	}
 
 	private static final long serialVersionUID = 1L;
@@ -46,12 +47,4 @@ public class WindowFlattener<T> extends ChainableInvokable<StreamWindow<T>, T> {
 		}
 	}
 
-	@Override
-	public void collect(StreamWindow<T> record) {
-		if (isRunning) {
-			nextObject = record;
-			callUserFunctionAndLogException();
-		}
-	}
-
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/ff60ecfb/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowFolder.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowFolder.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowFolder.java
index 162cb3c..aa398c5 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowFolder.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowFolder.java
@@ -35,6 +35,7 @@ public class WindowFolder<IN, OUT> extends MapInvokable<StreamWindow<IN>, Stream
 	public WindowFolder(FoldFunction<IN, OUT> folder, OUT initialValue) {
 		super(new WindowFoldFunction<IN, OUT>(folder, initialValue));
 		this.folder = folder;
+		withoutInputCopy();
 	}
 
 	private static class WindowFoldFunction<IN, OUT> implements

http://git-wip-us.apache.org/repos/asf/flink/blob/ff60ecfb/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowMapper.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowMapper.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowMapper.java
index 9578a70..a065f4e 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowMapper.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowMapper.java
@@ -36,6 +36,7 @@ public class WindowMapper<IN, OUT> extends MapInvokable<StreamWindow<IN>, Stream
 	public WindowMapper(WindowMapFunction<IN, OUT> mapper) {
 		super(new WindowMap<IN, OUT>(mapper));
 		this.mapper = mapper;
+		withoutInputCopy();
 	}
 
 	private static class WindowMap<T, R> implements MapFunction<StreamWindow<T>, StreamWindow<R>> {
@@ -50,7 +51,7 @@ public class WindowMapper<IN, OUT> extends MapInvokable<StreamWindow<IN>, Stream
 		@Override
 		public StreamWindow<R> map(StreamWindow<T> window) throws Exception {
 			StreamWindow<R> outputWindow = new StreamWindow<R>(window.windowID);
-			
+
 			outputWindow.numberOfParts = window.numberOfParts;
 
 			mapper.mapWindow(window, outputWindow);

http://git-wip-us.apache.org/repos/asf/flink/blob/ff60ecfb/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowMerger.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowMerger.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowMerger.java
index f425255..4c112d2 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowMerger.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowMerger.java
@@ -35,6 +35,7 @@ public class WindowMerger<T> extends ChainableInvokable<StreamWindow<T>, StreamW
 	public WindowMerger() {
 		super(null);
 		this.windows = new HashMap<Integer, StreamWindow<T>>();
+		withoutInputCopy();
 	}
 
 	private static final long serialVersionUID = 1L;
@@ -66,12 +67,4 @@ public class WindowMerger<T> extends ChainableInvokable<StreamWindow<T>, StreamW
 			windows.put(nextWindow.windowID, current);
 		}
 	}
-
-	@Override
-	public void collect(StreamWindow<T> record) {
-		if (isRunning) {
-			nextObject = record;
-			callUserFunctionAndLogException();
-		}
-	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/ff60ecfb/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowPartitioner.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowPartitioner.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowPartitioner.java
index 0a28d99..9672b0f 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowPartitioner.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowPartitioner.java
@@ -33,11 +33,13 @@ public class WindowPartitioner<T> extends ChainableInvokable<StreamWindow<T>, St
 	public WindowPartitioner(KeySelector<T, ?> keySelector) {
 		super(null);
 		this.keySelector = keySelector;
+		withoutInputCopy();
 	}
 
 	public WindowPartitioner(int numberOfSplits) {
 		super(null);
 		this.numberOfSplits = numberOfSplits;
+		withoutInputCopy();
 	}
 
 	private static final long serialVersionUID = 1L;
@@ -69,12 +71,4 @@ public class WindowPartitioner<T> extends ChainableInvokable<StreamWindow<T>, St
 
 		}
 	}
-
-	@Override
-	public void collect(StreamWindow<T> record) {
-		if (isRunning) {
-			nextObject = record;
-			callUserFunctionAndLogException();
-		}
-	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/ff60ecfb/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowReducer.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowReducer.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowReducer.java
index d7182de..67d42b5 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowReducer.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowReducer.java
@@ -36,6 +36,7 @@ public class WindowReducer<IN> extends MapInvokable<StreamWindow<IN>, StreamWind
 	public WindowReducer(ReduceFunction<IN> reducer) {
 		super(new WindowReduceFunction<IN>(reducer));
 		this.reducer = reducer;
+		withoutInputCopy();
 	}
 
 	private static class WindowReduceFunction<T> implements