You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2014/08/18 19:26:23 UTC

[46/51] [abbrv] git commit: [streaming] Exception handling update & minor refactor

[streaming] Exception handling update & minor refactor


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/35cf874c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/35cf874c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/35cf874c

Branch: refs/heads/master
Commit: 35cf874c36fc522e9fae6af4625901ee3c3bc9a3
Parents: e6766fd
Author: ghermann <re...@gmail.com>
Authored: Mon Aug 11 19:56:35 2014 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Mon Aug 18 16:23:43 2014 +0200

----------------------------------------------------------------------
 .../streaming/connectors/rabbitmq/RMQSink.java  |  2 +-
 .../flink/streaming/api/StreamConfig.java       | 10 ---
 .../api/collector/DirectedStreamCollector.java  |  6 +-
 .../environment/RemoteStreamEnvironment.java    |  6 +-
 .../streaming/api/invokable/SinkInvokable.java  |  9 ++-
 .../api/invokable/SourceInvokable.java          |  1 +
 .../api/invokable/StreamComponentInvokable.java |  3 +-
 .../api/invokable/StreamRecordInvokable.java    | 20 ++++-
 .../operator/BatchGroupReduceInvokable.java     | 22 ++++--
 .../operator/BatchReduceInvokable.java          |  1 +
 .../api/invokable/operator/FilterInvokable.java | 12 ++-
 .../invokable/operator/FlatMapInvokable.java    |  9 ++-
 .../operator/GroupReduceInvokable.java          | 15 +++-
 .../api/invokable/operator/MapInvokable.java    |  9 ++-
 .../operator/StreamReduceInvokable.java         |  9 ++-
 .../operator/WindowGroupReduceInvokable.java    | 17 +++-
 .../api/invokable/operator/co/CoInvokable.java  |  2 -
 .../AbstractStreamComponent.java                | 82 ++++++++++++++++----
 .../api/streamcomponent/CoStreamTask.java       | 33 +-------
 .../streamcomponent/StreamIterationSource.java  | 20 ++---
 .../api/streamcomponent/StreamRecordWriter.java | 23 +++---
 .../api/streamcomponent/StreamSource.java       | 37 +--------
 .../api/streamcomponent/StreamTask.java         | 38 ++-------
 .../flink/streaming/state/StateManager.java     | 15 ++--
 .../flink/streaming/util/ClusterUtil.java       | 14 ++--
 .../flink/streaming/util/TestDataUtil.java      | 20 ++---
 26 files changed, 229 insertions(+), 206 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/35cf874c/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSink.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSink.java b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSink.java
index 12b6341..4a89243 100644
--- a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSink.java
+++ b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSink.java
@@ -60,7 +60,7 @@ public abstract class RMQSink<IN> implements SinkFunction<IN> {
 			channel = connection.createChannel();
 
 		} catch (IOException e) {
-			e.printStackTrace();
+			throw new RuntimeException(e);
 		}
 
 		initDone = true;

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/35cf874c/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamConfig.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamConfig.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamConfig.java
index 3d124c2..4dd4cc1 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamConfig.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamConfig.java
@@ -119,12 +119,6 @@ public class StreamConfig {
 		}
 	}
 
-	// @SuppressWarnings("unchecked")
-	// public <T extends StreamComponentInvokable> Class<? extends T>
-	// getUserInvokableClass() {
-	// return (Class<? extends T>) config.getClass(USER_FUNCTION, null);
-	// }
-
 	public <T> StreamComponentInvokable<T> getUserInvokableObject() {
 		try {
 			return deserializeObject(config.getBytes(SERIALIZEDUDF, null));
@@ -156,10 +150,6 @@ public class StreamConfig {
 		}
 	}
 
-	// public void setFunctionName(String functionName) {
-	// config.setString(FUNCTION_NAME, functionName);
-	// }
-
 	public String getFunctionName() {
 		return config.getString(FUNCTION_NAME, "");
 	}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/35cf874c/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/DirectedStreamCollector.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/DirectedStreamCollector.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/DirectedStreamCollector.java
index 285a7b4..9750d44 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/DirectedStreamCollector.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/DirectedStreamCollector.java
@@ -40,7 +40,7 @@ import org.apache.flink.util.StringUtils;
 public class DirectedStreamCollector<OUT> extends StreamCollector<OUT> {
 
 	OutputSelector<OUT> outputSelector;
-	private static final Log log = LogFactory.getLog(DirectedStreamCollector.class);
+	private static final Log LOG = LogFactory.getLog(DirectedStreamCollector.class);
 	private List<RecordWriter<SerializationDelegate<StreamRecord<OUT>>>> emitted;
 
 	/**
@@ -97,8 +97,8 @@ public class DirectedStreamCollector<OUT> extends StreamCollector<OUT> {
 					}
 				}
 			} catch (Exception e) {
-				if (log.isErrorEnabled()) {
-					log.error(String.format("Emit to %s failed due to: %s", outputName,
+				if (LOG.isErrorEnabled()) {
+					LOG.error(String.format("Emit to %s failed due to: %s", outputName,
 							StringUtils.stringifyException(e)));
 				}
 			}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/35cf874c/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java
index 19a2d48..ec6bc6f 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java
@@ -34,7 +34,7 @@ import org.apache.flink.core.fs.Path;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 
 public class RemoteStreamEnvironment extends StreamExecutionEnvironment {
-	private static final Log log = LogFactory.getLog(RemoteStreamEnvironment.class);
+	private static final Log LOG = LogFactory.getLog(RemoteStreamEnvironment.class);
 
 	private String host;
 	private int port;
@@ -72,8 +72,8 @@ public class RemoteStreamEnvironment extends StreamExecutionEnvironment {
 
 	@Override
 	public void execute() {
-		if (log.isInfoEnabled()) {
-			log.info("Running remotely at " + host + ":" + port);
+		if (LOG.isInfoEnabled()) {
+			LOG.info("Running remotely at " + host + ":" + port);
 		}
 
 		JobGraph jobGraph = jobGraphBuilder.getJobGraph();

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/35cf874c/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/SinkInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/SinkInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/SinkInvokable.java
index 887df8b..9219f80 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/SinkInvokable.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/SinkInvokable.java
@@ -34,7 +34,7 @@ public class SinkInvokable<IN> extends StreamRecordInvokable<IN, IN> {
 	@Override
 	protected void immutableInvoke() throws Exception {
 		while ((reuse = recordIterator.next(reuse)) != null) {
-			sinkFunction.invoke((IN) reuse.getObject());
+			callUserFunctionAndLogException();
 			resetReuse();
 		}
 	}
@@ -42,8 +42,13 @@ public class SinkInvokable<IN> extends StreamRecordInvokable<IN, IN> {
 	@Override
 	protected void mutableInvoke() throws Exception {
 		while ((reuse = recordIterator.next(reuse)) != null) {
-			sinkFunction.invoke((IN) reuse.getObject());
+			callUserFunctionAndLogException();
 		}
 	}
 
+	@Override
+	protected void callUserFunction() throws Exception {
+		sinkFunction.invoke((IN) reuse.getObject());		
+	}
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/35cf874c/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/SourceInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/SourceInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/SourceInvokable.java
index a4be1e8..666427f 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/SourceInvokable.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/SourceInvokable.java
@@ -35,6 +35,7 @@ public class SourceInvokable<OUT> extends StreamComponentInvokable<OUT> implemen
 		this.sourceFunction = sourceFunction;
 	}
 
+	@Override
 	public void invoke() throws Exception {
 		sourceFunction.invoke(collector);
 	}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/35cf874c/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/StreamComponentInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/StreamComponentInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/StreamComponentInvokable.java
index ed718f1..4b49252 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/StreamComponentInvokable.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/StreamComponentInvokable.java
@@ -65,5 +65,6 @@ public abstract class StreamComponentInvokable<OUT> extends AbstractRichFunction
 			((RichFunction) userFunction).close();
 		}
 	}
-
+	
+	public abstract void invoke() throws Exception;
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/35cf874c/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/StreamRecordInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/StreamRecordInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/StreamRecordInvokable.java
index 27dc05a..d4fc92c 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/StreamRecordInvokable.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/StreamRecordInvokable.java
@@ -21,11 +21,14 @@ package org.apache.flink.streaming.api.invokable;
 
 import java.io.IOException;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.flink.api.common.functions.Function;
 import org.apache.flink.streaming.api.streamrecord.StreamRecord;
 import org.apache.flink.streaming.api.streamrecord.StreamRecordSerializer;
 import org.apache.flink.util.Collector;
 import org.apache.flink.util.MutableObjectIterator;
+import org.apache.flink.util.StringUtils;
 
 public abstract class StreamRecordInvokable<IN, OUT> extends
 		StreamComponentInvokable<OUT> {
@@ -35,6 +38,7 @@ public abstract class StreamRecordInvokable<IN, OUT> extends
 	}
 
 	private static final long serialVersionUID = 1L;
+	private static final Log LOG = LogFactory.getLog(StreamComponentInvokable.class);
 
 	protected MutableObjectIterator<StreamRecord<IN>> recordIterator;
 	StreamRecordSerializer<IN> serializer;
@@ -59,7 +63,7 @@ public abstract class StreamRecordInvokable<IN, OUT> extends
 		try {
 			reuse = recordIterator.next(reuse);
 		} catch (IOException e) {
-			e.printStackTrace();
+			throw new RuntimeException(e);
 		}
 		return reuse;
 	}
@@ -68,6 +72,20 @@ public abstract class StreamRecordInvokable<IN, OUT> extends
 
 	protected abstract void mutableInvoke() throws Exception;
 
+	protected abstract void callUserFunction() throws Exception;
+	
+	protected void callUserFunctionAndLogException() {
+		try {
+			callUserFunction();
+		} catch (Exception e) {
+			if (LOG.isErrorEnabled()) {
+				LOG.error(String.format("Calling user function failed due to: %s",
+						StringUtils.stringifyException(e)));
+			}
+		}
+	}
+	
+	@Override
 	public void invoke() throws Exception {
 		if (this.isMutable) {
 			mutableInvoke();

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/35cf874c/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/BatchGroupReduceInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/BatchGroupReduceInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/BatchGroupReduceInvokable.java
index c64bd33..327ddaf 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/BatchGroupReduceInvokable.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/BatchGroupReduceInvokable.java
@@ -27,6 +27,8 @@ import org.apache.flink.streaming.state.MutableTableState;
 
 public class BatchGroupReduceInvokable<IN> extends BatchReduceInvokable<IN, IN> {
 
+	private static final long serialVersionUID = 1L;
+
 	int keyPosition;
 	protected ReduceFunction<IN> reducer;
 	private Iterator<StreamRecord<IN>> iterator;
@@ -40,18 +42,22 @@ public class BatchGroupReduceInvokable<IN> extends BatchReduceInvokable<IN, IN>
 		values = new MutableTableState<Object, IN>();
 	}
 
+	private IN reduced;
+	private IN nextValue;
+	private IN currentValue;
+
 	@Override
-	protected void reduce() throws Exception {
+	protected void reduce() {
 		iterator = state.getStreamRecordIterator();
 		while (iterator.hasNext()) {
 			StreamRecord<IN> nextRecord = iterator.next();
 
-			IN nextValue = nextRecord.getObject();
+			nextValue = nextRecord.getObject();
 			Object key = nextRecord.getField(keyPosition);
 
-			IN currentValue = values.get(key);
+			currentValue = values.get(key);
 			if (currentValue != null) {
-				IN reduced = reducer.reduce(currentValue, nextValue);
+				callUserFunctionAndLogException();
 				values.put(key, reduced);
 				collector.collect(reduced);
 			} else {
@@ -59,12 +65,12 @@ public class BatchGroupReduceInvokable<IN> extends BatchReduceInvokable<IN, IN>
 				collector.collect(nextValue);
 			}
 		}
-		System.out.println(values);
 		values.clear();
-		System.out.println(values);
-
 	}
 
-	private static final long serialVersionUID = 1L;
+	@Override
+	protected void callUserFunction() throws Exception {
+		reduced = reducer.reduce(currentValue, nextValue);
+	}
 
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/35cf874c/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/BatchReduceInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/BatchReduceInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/BatchReduceInvokable.java
index 3d1d813..a6516f8 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/BatchReduceInvokable.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/BatchReduceInvokable.java
@@ -71,4 +71,5 @@ public class BatchReduceInvokable<IN, OUT> extends StreamReduceInvokable<IN, OUT
 		}
 	}
 
+
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/35cf874c/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/FilterInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/FilterInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/FilterInvokable.java
index 388920c..9e0edd7 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/FilterInvokable.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/FilterInvokable.java
@@ -33,10 +33,13 @@ public class FilterInvokable<IN> extends UserTaskInvokable<IN, IN> {
 		this.filterFunction = filterFunction;
 	}
 
+	private boolean canCollect;
+	
 	@Override
 	protected void immutableInvoke() throws Exception {
 		while ((reuse = recordIterator.next(reuse)) != null) {
-			if (filterFunction.filter(reuse.getObject())) {
+			callUserFunctionAndLogException();
+			if (canCollect) {
 				collector.collect(reuse.getObject());
 			}
 			resetReuse();
@@ -46,10 +49,15 @@ public class FilterInvokable<IN> extends UserTaskInvokable<IN, IN> {
 	@Override
 	protected void mutableInvoke() throws Exception {
 		while ((reuse = recordIterator.next(reuse)) != null) {
-			if (filterFunction.filter(reuse.getObject())) {
+			callUserFunctionAndLogException();
+			if (canCollect) {
 				collector.collect(reuse.getObject());
 			}
 		}
 	}
 
+	@Override
+	protected void callUserFunction() throws Exception {
+		canCollect = filterFunction.filter(reuse.getObject());
+	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/35cf874c/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/FlatMapInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/FlatMapInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/FlatMapInvokable.java
index 4cb4712..0b4b4d1 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/FlatMapInvokable.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/FlatMapInvokable.java
@@ -35,7 +35,7 @@ public class FlatMapInvokable<IN, OUT> extends UserTaskInvokable<IN, OUT> {
 	@Override
 	protected void immutableInvoke() throws Exception {
 		while ((reuse = recordIterator.next(reuse)) != null) {
-			flatMapper.flatMap(reuse.getObject(), collector);
+			callUserFunctionAndLogException();
 			resetReuse();
 		}
 	}
@@ -43,8 +43,13 @@ public class FlatMapInvokable<IN, OUT> extends UserTaskInvokable<IN, OUT> {
 	@Override
 	protected void mutableInvoke() throws Exception {
 		while ((reuse = recordIterator.next(reuse)) != null) {
-			flatMapper.flatMap(reuse.getObject(), collector);
+			callUserFunctionAndLogException();
 		}
 	}
 
+	@Override
+	protected void callUserFunction() throws Exception {
+		flatMapper.flatMap(reuse.getObject(), collector);
+	}
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/35cf874c/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/GroupReduceInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/GroupReduceInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/GroupReduceInvokable.java
index 277078b..43ab483 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/GroupReduceInvokable.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/GroupReduceInvokable.java
@@ -52,12 +52,16 @@ public class GroupReduceInvokable<IN> extends UserTaskInvokable<IN, IN> {
 		}
 	}
 
+	private IN reduced;
+	private IN nextValue;
+	private IN currentValue;
+	
 	private void reduce() throws Exception {
 		Object key = reuse.getField(keyPosition);
-		IN currentValue = values.get(key);
-		IN nextValue = reuse.getObject();
+		currentValue = values.get(key);
+		nextValue = reuse.getObject();
 		if (currentValue != null) {
-			IN reduced = reducer.reduce(currentValue, nextValue);
+			callUserFunctionAndLogException();
 			values.put(key, reduced);
 			collector.collect(reduced);
 		} else {
@@ -66,4 +70,9 @@ public class GroupReduceInvokable<IN> extends UserTaskInvokable<IN, IN> {
 		}
 	}
 
+	@Override
+	protected void callUserFunction() throws Exception {
+		reduced = reducer.reduce(currentValue, nextValue);
+	}
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/35cf874c/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/MapInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/MapInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/MapInvokable.java
index 53e85e0..2e501ca 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/MapInvokable.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/MapInvokable.java
@@ -35,7 +35,7 @@ public class MapInvokable<IN, OUT> extends UserTaskInvokable<IN, OUT> {
 	@Override
 	protected void immutableInvoke() throws Exception {
 		while ((reuse = recordIterator.next(reuse)) != null) {
-			collector.collect(mapper.map(reuse.getObject()));
+			callUserFunctionAndLogException();
 			resetReuse();
 		}
 	}
@@ -43,7 +43,12 @@ public class MapInvokable<IN, OUT> extends UserTaskInvokable<IN, OUT> {
 	@Override
 	protected void mutableInvoke() throws Exception {
 		while ((reuse = recordIterator.next(reuse)) != null) {
-			collector.collect(mapper.map(reuse.getObject()));
+			callUserFunctionAndLogException();
 		}
 	}
+
+	@Override
+	protected void callUserFunction() throws Exception {
+		collector.collect(mapper.map(reuse.getObject()));
+	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/35cf874c/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/StreamReduceInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/StreamReduceInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/StreamReduceInvokable.java
index f6515dc..c73bb95 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/StreamReduceInvokable.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/StreamReduceInvokable.java
@@ -71,11 +71,16 @@ public abstract class StreamReduceInvokable<IN, OUT> extends UserTaskInvokable<I
 		}
 	}
 
-	protected void reduce() throws Exception {
+	protected void reduce() {
 		userIterator = state.getIterator();
+		callUserFunctionAndLogException();
+	}
+	
+	@Override
+	protected void callUserFunction() throws Exception {
 		reducer.reduce(userIterable, collector);
 	}
-
+	
 	private void collectOneUnit() {
 		ArrayList<StreamRecord<IN>> list;
 		list = new ArrayList<StreamRecord<IN>>(listSize);

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/35cf874c/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/WindowGroupReduceInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/WindowGroupReduceInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/WindowGroupReduceInvokable.java
index 065df25..ac57220 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/WindowGroupReduceInvokable.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/WindowGroupReduceInvokable.java
@@ -40,18 +40,22 @@ public class WindowGroupReduceInvokable<IN> extends WindowReduceInvokable<IN, IN
 		values = new MutableTableState<Object, IN>();
 	}
 
+	private IN reduced;
+	private IN nextValue;
+	private IN currentValue;
+	
 	@Override
-	protected void reduce() throws Exception {
+	protected void reduce() {
 		iterator = state.getStreamRecordIterator();
 		while (iterator.hasNext()) {
 			StreamRecord<IN> nextRecord = iterator.next();
 
-			IN nextValue = nextRecord.getObject();
+			nextValue = nextRecord.getObject();
 			Object key = nextRecord.getField(keyPosition);
 
-			IN currentValue = values.get(key);
+			currentValue = values.get(key);
 			if (currentValue != null) {
-				IN reduced = reducer.reduce(currentValue, nextValue);
+				callUserFunctionAndLogException();
 				values.put(key, reduced);
 				collector.collect(reduced);
 			} else {
@@ -62,6 +66,11 @@ public class WindowGroupReduceInvokable<IN> extends WindowReduceInvokable<IN, IN
 		values.clear();
 	}
 
+	@Override
+	protected void callUserFunction() throws Exception {
+		reduced = reducer.reduce(currentValue, nextValue);
+	}
+	
 	private static final long serialVersionUID = 1L;
 
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/35cf874c/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoInvokable.java
index 1e8623c..8e002ee 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoInvokable.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoInvokable.java
@@ -65,6 +65,4 @@ public abstract class CoInvokable<IN1, IN2, OUT> extends
 		this.reuse1 = serializer1.createInstance();
 		this.reuse2 = serializer2.createInstance();
 	}
-
-	public abstract void invoke() throws Exception;
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/35cf874c/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/AbstractStreamComponent.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/AbstractStreamComponent.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/AbstractStreamComponent.java
index c973894..9171a00 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/AbstractStreamComponent.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/AbstractStreamComponent.java
@@ -20,6 +20,7 @@
 package org.apache.flink.streaming.api.streamcomponent;
 
 import java.io.IOException;
+import java.util.LinkedList;
 import java.util.List;
 
 import org.apache.commons.lang.SerializationUtils;
@@ -54,7 +55,8 @@ public abstract class AbstractStreamComponent<OUT> extends AbstractInvokable {
 	protected TypeInformation<OUT> outTypeInfo = null;
 	protected StreamRecordSerializer<OUT> outSerializer = null;
 	protected SerializationDelegate<StreamRecord<OUT>> outSerializationDelegate = null;
-
+	protected OutputHandler outputHandler = createEmptyOutputHandler();
+	
 	protected StreamConfig configuration;
 	protected TypeSerializerWrapper<?, ?, OUT> typeWrapper;
 	protected StreamCollector<OUT> collector;
@@ -79,6 +81,71 @@ public abstract class AbstractStreamComponent<OUT> extends AbstractInvokable {
 		setCollector();
 	}
 
+	protected class OutputHandler {
+		private List<RecordWriter<SerializationDelegate<StreamRecord<OUT>>>> outputs;
+		
+		public OutputHandler() {
+			this.outputs = new LinkedList<RecordWriter<SerializationDelegate<StreamRecord<OUT>>>>();
+		}
+
+		public List<RecordWriter<SerializationDelegate<StreamRecord<OUT>>>> getOutputs() {
+			return outputs;
+		}
+		
+		public void setConfigOutputs() {
+			setSerializers();
+			setCollector();
+
+			int numberOfOutputs = configuration.getNumberOfOutputs();
+			bufferTimeout = configuration.getBufferTimeout();
+
+			for (int i = 0; i < numberOfOutputs; i++) {
+				setPartitioner(i, outputs);
+			}
+		}
+		
+		public void flushOutputs() throws IOException, InterruptedException {
+			for (RecordWriter<SerializationDelegate<StreamRecord<OUT>>> output : outputs) {
+				output.flush();
+			}
+		}
+		
+		public void initializeOutputSerializers() {
+			for (RecordWriter<SerializationDelegate<StreamRecord<OUT>>> output : outputHandler.getOutputs()) {
+				output.initializeSerializers();
+			}
+		}
+		
+		public void invokeUserFunction(String componentTypeName, StreamComponentInvokable<OUT> userInvokable) throws IOException, InterruptedException {
+			if (LOG.isDebugEnabled()) {
+				LOG.debug(componentTypeName + " " + name + " invoked with instance id "
+						+ instanceID);
+			}
+
+			initializeOutputSerializers();
+
+			try {
+				userInvokable.open(getTaskConfiguration());
+				userInvokable.invoke();
+				userInvokable.close();
+			} catch (Exception e) {
+				flushOutputs();
+				throw new RuntimeException(e);
+			}
+
+			if (LOG.isDebugEnabled()) {
+				LOG.debug(componentTypeName + " " + name + " invoke finished with instance id "
+						+ instanceID);
+			}
+
+			flushOutputs();
+		}
+	}
+	
+	private OutputHandler createEmptyOutputHandler() {
+		return new OutputHandler();
+	}
+	
 	protected void initialize() {
 		this.configuration = new StreamConfig(getTaskConfiguration());
 		this.name = configuration.getComponentName();
@@ -111,19 +178,6 @@ public abstract class AbstractStreamComponent<OUT> extends AbstractInvokable {
 		outSerializationDelegate.setInstance(outSerializer.createInstance());
 	}
 
-	protected void setConfigOutputs(
-			List<RecordWriter<SerializationDelegate<StreamRecord<OUT>>>> outputs) {
-		setSerializers();
-		setCollector();
-
-		int numberOfOutputs = configuration.getNumberOfOutputs();
-		bufferTimeout = configuration.getBufferTimeout();
-
-		for (int i = 0; i < numberOfOutputs; i++) {
-			setPartitioner(i, outputs);
-		}
-	}
-
 	private void setPartitioner(int outputNumber,
 			List<RecordWriter<SerializationDelegate<StreamRecord<OUT>>>> outputs) {
 		StreamPartitioner<OUT> outputPartitioner = null;

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/35cf874c/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/CoStreamTask.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/CoStreamTask.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/CoStreamTask.java
index c06e664..a595cfa 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/CoStreamTask.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/CoStreamTask.java
@@ -20,19 +20,13 @@
 package org.apache.flink.streaming.api.streamcomponent;
 
 import java.util.ArrayList;
-import java.util.LinkedList;
-import java.util.List;
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
 import org.apache.flink.api.common.functions.Function;
 import org.apache.flink.api.java.tuple.Tuple;
 import org.apache.flink.core.io.IOReadableWritable;
 import org.apache.flink.runtime.io.network.api.MutableReader;
 import org.apache.flink.runtime.io.network.api.MutableRecordReader;
 import org.apache.flink.runtime.io.network.api.MutableUnionRecordReader;
-import org.apache.flink.runtime.io.network.api.RecordWriter;
-import org.apache.flink.runtime.plugable.SerializationDelegate;
 import org.apache.flink.streaming.api.function.co.CoMapFunction;
 import org.apache.flink.streaming.api.invokable.operator.co.CoInvokable;
 import org.apache.flink.streaming.api.streamrecord.StreamRecord;
@@ -42,7 +36,6 @@ import org.apache.flink.util.MutableObjectIterator;
 
 public class CoStreamTask<IN1 extends Tuple, IN2 extends Tuple, OUT extends Tuple> extends
 		AbstractStreamComponent<OUT> {
-	private static final Log LOG = LogFactory.getLog(CoStreamTask.class);
 
 	protected StreamRecordSerializer<IN1> inputDeserializer1 = null;
 	protected StreamRecordSerializer<IN2> inputDeserializer2 = null;
@@ -52,13 +45,11 @@ public class CoStreamTask<IN1 extends Tuple, IN2 extends Tuple, OUT extends Tupl
 	MutableObjectIterator<StreamRecord<IN1>> inputIter1;
 	MutableObjectIterator<StreamRecord<IN2>> inputIter2;
 
-	private List<RecordWriter<SerializationDelegate<StreamRecord<OUT>>>> outputs;
 	private CoInvokable<IN1, IN2, OUT> userInvokable;
 	private static int numTasks;
 
 	public CoStreamTask() {
-
-		outputs = new LinkedList<RecordWriter<SerializationDelegate<StreamRecord<OUT>>>>();
+		outputHandler = new OutputHandler();
 		userInvokable = null;
 		numTasks = newComponent();
 		instanceID = numTasks;
@@ -94,7 +85,7 @@ public class CoStreamTask<IN1 extends Tuple, IN2 extends Tuple, OUT extends Tupl
 
 	@Override
 	public void setInputsOutputs() {
-		setConfigOutputs(outputs);
+		outputHandler.setConfigOutputs();
 		setConfigInputs();
 
 		inputIter1 = createInputIterator(inputs1, inputDeserializer1);
@@ -148,25 +139,7 @@ public class CoStreamTask<IN1 extends Tuple, IN2 extends Tuple, OUT extends Tupl
 
 	@Override
 	public void invoke() throws Exception {
-		if (LOG.isDebugEnabled()) {
-			LOG.debug("TASK " + name + " invoked with instance id " + instanceID);
-		}
-
-		for (RecordWriter<SerializationDelegate<StreamRecord<OUT>>> output : outputs) {
-			output.initializeSerializers();
-		}
-
-		userInvokable.open(getTaskConfiguration());
-		userInvokable.invoke();
-		userInvokable.close();
-
-		if (LOG.isDebugEnabled()) {
-			LOG.debug("TASK " + name + " invoke finished with instance id " + instanceID);
-		}
-
-		for (RecordWriter<SerializationDelegate<StreamRecord<OUT>>> output : outputs) {
-			output.flush();
-		}
+		outputHandler.invokeUserFunction("CO-TASK", userInvokable);
 	}
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/35cf874c/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamIterationSource.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamIterationSource.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamIterationSource.java
index 2514eb0..8f32dd7 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamIterationSource.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamIterationSource.java
@@ -19,8 +19,6 @@
 
 package org.apache.flink.streaming.api.streamcomponent;
 
-import java.util.LinkedList;
-import java.util.List;
 import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.TimeUnit;
@@ -37,7 +35,6 @@ public class StreamIterationSource<OUT extends Tuple> extends
 
 	private static final Log LOG = LogFactory.getLog(StreamIterationSource.class);
 
-	private List<RecordWriter<SerializationDelegate<StreamRecord<OUT>>>> outputs;
 	private static int numSources;
 	private String iterationId;
 	@SuppressWarnings("rawtypes")
@@ -48,7 +45,7 @@ public class StreamIterationSource<OUT extends Tuple> extends
 	@SuppressWarnings("rawtypes")
 	public StreamIterationSource() {
 
-		outputs = new LinkedList<RecordWriter<SerializationDelegate<StreamRecord<OUT>>>>();
+		outputHandler = new OutputHandler();
 		numSources = newComponent();
 		instanceID = numSources;
 		dataChannel = new ArrayBlockingQueue<StreamRecord>(1);
@@ -57,9 +54,8 @@ public class StreamIterationSource<OUT extends Tuple> extends
 	@Override
 	public void setInputsOutputs() {
 		try {
-			setConfigOutputs(outputs);
+			outputHandler.setConfigOutputs();
 		} catch (StreamComponentException e) {
-			e.printStackTrace();
 			throw new StreamComponentException("Cannot register outputs", e);
 		}
 
@@ -82,9 +78,8 @@ public class StreamIterationSource<OUT extends Tuple> extends
 			LOG.debug("SOURCE " + name + " invoked with instance id " + instanceID);
 		}
 
-		for (RecordWriter<SerializationDelegate<StreamRecord<OUT>>> output : outputs) {
-			output.initializeSerializers();
-		}
+		outputHandler.initializeOutputSerializers();
+		
 		StreamRecord<OUT> nextRecord;
 
 		while (true) {
@@ -96,16 +91,13 @@ public class StreamIterationSource<OUT extends Tuple> extends
 			if (nextRecord == null) {
 				break;
 			}
-			for (RecordWriter<SerializationDelegate<StreamRecord<OUT>>> output : outputs) {
+			for (RecordWriter<SerializationDelegate<StreamRecord<OUT>>> output : outputHandler.getOutputs()) {
 				outSerializationDelegate.setInstance(nextRecord);
 				output.emit(outSerializationDelegate);
 			}
 		}
 
-		for (RecordWriter<SerializationDelegate<StreamRecord<OUT>>> output : outputs) {
-			output.flush();
-		}
-
+		outputHandler.flushOutputs();
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/35cf874c/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamRecordWriter.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamRecordWriter.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamRecordWriter.java
index a89935a..f695cb1 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamRecordWriter.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamRecordWriter.java
@@ -12,7 +12,8 @@ import org.apache.flink.runtime.io.network.serialization.RecordSerializer;
 import org.apache.flink.runtime.io.network.serialization.SpanningRecordSerializer;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 
-public class StreamRecordWriter<T extends IOReadableWritable> extends RecordWriter<T> {
+public class StreamRecordWriter<T extends IOReadableWritable> extends
+		RecordWriter<T> {
 
 	private final BufferProvider bufferPool;
 
@@ -31,12 +32,13 @@ public class StreamRecordWriter<T extends IOReadableWritable> extends RecordWrit
 		this(invokable, new RoundRobinChannelSelector<T>(), 1000);
 	}
 
-	public StreamRecordWriter(AbstractInvokable invokable, ChannelSelector<T> channelSelector) {
+	public StreamRecordWriter(AbstractInvokable invokable,
+			ChannelSelector<T> channelSelector) {
 		this(invokable, channelSelector, 1000);
 	}
 
-	public StreamRecordWriter(AbstractInvokable invokable, ChannelSelector<T> channelSelector,
-			long timeout) {
+	public StreamRecordWriter(AbstractInvokable invokable,
+			ChannelSelector<T> channelSelector, long timeout) {
 		// initialize the gate
 		super(invokable);
 
@@ -60,20 +62,24 @@ public class StreamRecordWriter<T extends IOReadableWritable> extends RecordWrit
 
 	@Override
 	public void emit(final T record) throws IOException, InterruptedException {
-		for (int targetChannel : this.channelSelector.selectChannels(record, this.numChannels)) {
+		for (int targetChannel : this.channelSelector.selectChannels(record,
+				this.numChannels)) {
 			// serialize with corresponding serializer and send full buffer
 
 			RecordSerializer<T> serializer = this.serializers[targetChannel];
 
 			synchronized (serializer) {
-				RecordSerializer.SerializationResult result = serializer.addRecord(record);
+				RecordSerializer.SerializationResult result = serializer
+						.addRecord(record);
 				while (result.isFullBuffer()) {
 					Buffer buffer = serializer.getCurrentBuffer();
 					if (buffer != null) {
 						sendBuffer(buffer, targetChannel);
 					}
 
-					buffer = this.bufferPool.requestBufferBlocking(this.bufferPool.getBufferSize());
+					buffer = this.bufferPool
+							.requestBufferBlocking(this.bufferPool
+									.getBufferSize());
 					result = serializer.setNextBuffer(buffer);
 				}
 			}
@@ -105,9 +111,8 @@ public class StreamRecordWriter<T extends IOReadableWritable> extends RecordWrit
 					Thread.sleep(timeout);
 					flush();
 				} catch (Exception e) {
-					e.printStackTrace();
+					throw new RuntimeException(e);
 				}
-
 			}
 		}
 	}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/35cf874c/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamSource.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamSource.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamSource.java
index 6644d6f..494dfc1 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamSource.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamSource.java
@@ -19,28 +19,16 @@
 
 package org.apache.flink.streaming.api.streamcomponent;
 
-import java.util.LinkedList;
-import java.util.List;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
 import org.apache.flink.api.java.tuple.Tuple;
-import org.apache.flink.runtime.io.network.api.RecordWriter;
-import org.apache.flink.runtime.plugable.SerializationDelegate;
 import org.apache.flink.streaming.api.invokable.SourceInvokable;
-import org.apache.flink.streaming.api.streamrecord.StreamRecord;
 
 public class StreamSource<OUT extends Tuple> extends SingleInputAbstractStreamComponent<Tuple, OUT> {
 
-	private static final Log LOG = LogFactory.getLog(StreamSource.class);
-
-	private List<RecordWriter<SerializationDelegate<StreamRecord<OUT>>>> outputs;
 	private SourceInvokable<OUT> userInvokable;
 	private static int numSources;
 
 	public StreamSource() {
-
-		outputs = new LinkedList<RecordWriter<SerializationDelegate<StreamRecord<OUT>>>>();
+		outputHandler = new OutputHandler();
 		userInvokable = null;
 		numSources = newComponent();
 		instanceID = numSources;
@@ -49,7 +37,7 @@ public class StreamSource<OUT extends Tuple> extends SingleInputAbstractStreamCo
 	@Override
 	public void setInputsOutputs() {
 		try {
-			setConfigOutputs(outputs);
+			outputHandler.setConfigOutputs();
 		} catch (StreamComponentException e) {
 			throw new StreamComponentException("Cannot register outputs for "
 					+ getClass().getSimpleName(), e);
@@ -59,31 +47,12 @@ public class StreamSource<OUT extends Tuple> extends SingleInputAbstractStreamCo
 	@Override
 	protected void setInvokable() {
 		userInvokable = getInvokable();
-		// setCollector();
 		userInvokable.setCollector(collector);
 	}
 
 	@Override
 	public void invoke() throws Exception {
-		if (LOG.isDebugEnabled()) {
-			LOG.debug("SOURCE " + name + " invoked with instance id " + instanceID);
-		}
-
-		for (RecordWriter<SerializationDelegate<StreamRecord<OUT>>> output : outputs) {
-			output.initializeSerializers();
-		}
-
-		userInvokable.open(getTaskConfiguration());
-		userInvokable.invoke();
-		userInvokable.close();
-
-		if (LOG.isDebugEnabled()) {
-			LOG.debug("SOURCE " + name + " invoke finished with instance id " + instanceID);
-		}
-
-		for (RecordWriter<SerializationDelegate<StreamRecord<OUT>>> output : outputs) {
-			output.flush();
-		}
+		outputHandler.invokeUserFunction("SOURCE", userInvokable);
 	}
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/35cf874c/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamTask.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamTask.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamTask.java
index f809dae..32c10b6 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamTask.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamTask.java
@@ -19,28 +19,17 @@
 
 package org.apache.flink.streaming.api.streamcomponent;
 
-import java.util.LinkedList;
-import java.util.List;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
 import org.apache.flink.api.java.tuple.Tuple;
-import org.apache.flink.runtime.io.network.api.RecordWriter;
-import org.apache.flink.runtime.plugable.SerializationDelegate;
 import org.apache.flink.streaming.api.invokable.StreamRecordInvokable;
-import org.apache.flink.streaming.api.streamrecord.StreamRecord;
 
 public class StreamTask<IN extends Tuple, OUT extends Tuple> extends
 		SingleInputAbstractStreamComponent<IN, OUT> {
 
-	private static final Log LOG = LogFactory.getLog(StreamTask.class);
-
-	private List<RecordWriter<SerializationDelegate<StreamRecord<OUT>>>> outputs;
 	private StreamRecordInvokable<IN, OUT> userInvokable;
 	private static int numTasks;
 
 	public StreamTask() {
-		outputs = new LinkedList<RecordWriter<SerializationDelegate<StreamRecord<OUT>>>>();
+		outputHandler = new OutputHandler();
 		userInvokable = null;
 		numTasks = newComponent();
 		instanceID = numTasks;
@@ -49,7 +38,7 @@ public class StreamTask<IN extends Tuple, OUT extends Tuple> extends
 	@Override
 	public void setInputsOutputs() {
 		setConfigInputs();
-		setConfigOutputs(outputs);
+		outputHandler.setConfigOutputs();
 
 		inputIter = createInputIterator(inputs, inputSerializer);
 	}
@@ -57,29 +46,12 @@ public class StreamTask<IN extends Tuple, OUT extends Tuple> extends
 	@Override
 	protected void setInvokable() {
 		userInvokable = getInvokable();
-		userInvokable.initialize(collector, inputIter, inputSerializer, isMutable);
+		userInvokable.initialize(collector, inputIter, inputSerializer,
+				isMutable);
 	}
 
 	@Override
 	public void invoke() throws Exception {
-		if (LOG.isDebugEnabled()) {
-			LOG.debug("TASK " + name + " invoked with instance id " + instanceID);
-		}
-
-		for (RecordWriter<SerializationDelegate<StreamRecord<OUT>>> output : outputs) {
-			output.initializeSerializers();
-		}
-
-		userInvokable.open(getTaskConfiguration());
-		userInvokable.invoke();
-		userInvokable.close();
-
-		if (LOG.isDebugEnabled()) {
-			LOG.debug("TASK " + name + " invoke finished with instance id " + instanceID);
-		}
-
-		for (RecordWriter<SerializationDelegate<StreamRecord<OUT>>> output : outputs) {
-			output.flush();
-		}
+		outputHandler.invokeUserFunction("TASK", userInvokable);
 	}
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/35cf874c/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/state/StateManager.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/state/StateManager.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/state/StateManager.java
index 46089f9..17d911a 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/state/StateManager.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/state/StateManager.java
@@ -47,15 +47,12 @@ public class StateManager implements Runnable, Serializable {
 		ObjectInputStream ois = null;
 		try {
 			ois = new ObjectInputStream(new FileInputStream(filename));
-		} catch (Exception e) {
-			e.printStackTrace();
-		}
-		for (Object state : stateList) {
-			try {
+			for (Object state : stateList) {
 				state = ois.readObject();
-			} catch (Exception e) {
-				e.printStackTrace();
 			}
+			ois.close();
+		} catch (Exception e) {
+			throw new RuntimeException(e);
 		}
 	}
 
@@ -67,7 +64,7 @@ public class StateManager implements Runnable, Serializable {
 		try {
 			oos = new ObjectOutputStream(new FileOutputStream(filename));
 		} catch (Exception e) {
-			e.printStackTrace();
+			throw new RuntimeException(e);
 		}
 		// take snapshot of every registered state.
 		while (true) {
@@ -78,7 +75,7 @@ public class StateManager implements Runnable, Serializable {
 					oos.flush();
 				}
 			} catch (Exception e) {
-				e.printStackTrace();
+				throw new RuntimeException(e);
 			}
 		}
 	}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/35cf874c/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/ClusterUtil.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/ClusterUtil.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/ClusterUtil.java
index 3ea5b39..fc5978e 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/ClusterUtil.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/ClusterUtil.java
@@ -30,7 +30,7 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 
 public class ClusterUtil {
-	private static final Log log = LogFactory.getLog(ClusterUtil.class);
+	private static final Log LOG = LogFactory.getLog(ClusterUtil.class);
 
 	/**
 	 * Executes the given JobGraph locally, on a NepheleMiniCluster
@@ -51,8 +51,8 @@ public class ClusterUtil {
 		exec.setNumTaskTracker(numberOfTaskTrackers);
 		Client client = new Client(new InetSocketAddress("localhost", 6498), configuration);
 		
-		if (log.isInfoEnabled()) {
-			log.info("Running on mini cluster");
+		if (LOG.isInfoEnabled()) {
+			LOG.info("Running on mini cluster");
 		}
 		
 		try {
@@ -62,7 +62,7 @@ public class ClusterUtil {
 
 			exec.stop();
 		} catch (Exception e) {
-			throw new RuntimeException(e.getMessage());
+			throw new RuntimeException(e);
 		}
 	}
 
@@ -71,8 +71,8 @@ public class ClusterUtil {
 	}
 
 	public static void runOnLocalCluster(JobGraph jobGraph, String IP, int port) {
-		if (log.isInfoEnabled()) {
-			log.info("Running on mini cluster");
+		if (LOG.isInfoEnabled()) {
+			LOG.info("Running on mini cluster");
 		}
 		
 		Configuration configuration = jobGraph.getJobConfiguration();
@@ -82,7 +82,7 @@ public class ClusterUtil {
 		try {
 			client.run(jobGraph, true);
 		} catch (ProgramInvocationException e) {
-			throw new RuntimeException(e.getMessage());
+			throw new RuntimeException(e);
 		}
 	}
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/35cf874c/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/TestDataUtil.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/TestDataUtil.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/TestDataUtil.java
index 44e0aa7..b9e3c4e 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/TestDataUtil.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/TestDataUtil.java
@@ -39,7 +39,7 @@ public class TestDataUtil {
 
 	// TODO: Exception handling
 	// TODO: check checksum after download
-	private static final Log log = LogFactory.getLog(TestDataUtil.class);
+	private static final Log LOG = LogFactory.getLog(TestDataUtil.class);
 	public static final String testDataDir = "src/test/resources/testdata/";
 	public static final String testRepoUrl = "http://info.ilab.sztaki.hu/~mbalassi/flink-streaming/testdata/";
 	public static final String testChekSumDir = "src/test/resources/testdata_checksum/";
@@ -67,8 +67,8 @@ public class TestDataUtil {
 		}
 
 		if (file.exists()) {
-			if (log.isInfoEnabled()) {
-				log.info(fileName + " already exists.");
+			if (LOG.isInfoEnabled()) {
+				LOG.info(fileName + " already exists.");
 			}
 
 			try {
@@ -78,16 +78,16 @@ public class TestDataUtil {
 						+ file.getAbsolutePath(), e);
 			}
 			if (!checkSumActaul.equals(checkSumDesired)) {
-				if (log.isInfoEnabled()) {
-					log.info("Checksum is incorrect.");
-					log.info("Downloading file.");
+				if (LOG.isInfoEnabled()) {
+					LOG.info("Checksum is incorrect.");
+					LOG.info("Downloading file.");
 				}
 				download(fileName);
 			}
 		} else {
-			if (log.isInfoEnabled()) {
-				log.info("File does not exist.");
-				log.info("Downloading file.");
+			if (LOG.isInfoEnabled()) {
+				LOG.info("File does not exist.");
+				LOG.info("Downloading file.");
 			}
 			download(fileName);
 		}
@@ -95,7 +95,7 @@ public class TestDataUtil {
 	}
 
 	public static void download(String fileName) {
-		log.info("downloading " + fileName);
+		LOG.info("downloading " + fileName);
 
 		try {
 			URL website = new URL(testRepoUrl + fileName);