You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2014/08/29 21:03:59 UTC

[26/28] git commit: [streaming] StreamInvokable refactor and javadoc update + StreamRecordSerializer update

[streaming] StreamInvokable refactor and javadoc update + StreamRecordSerializer update


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/723cb27c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/723cb27c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/723cb27c

Branch: refs/heads/master
Commit: 723cb27c3402d7b2524fadc0697d6bc3a175d58e
Parents: 0731f77
Author: gyfora <gy...@gmail.com>
Authored: Wed Aug 27 17:57:00 2014 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Fri Aug 29 21:01:57 2014 +0200

----------------------------------------------------------------------
 .../flink/streaming/api/JobGraphBuilder.java    |  14 +--
 .../flink/streaming/api/StreamConfig.java       |   4 +-
 .../api/collector/DirectedStreamCollector.java  |   2 +-
 .../api/collector/StreamCollector.java          |   2 +-
 .../streaming/api/datastream/DataStream.java    |   4 +-
 .../streaming/api/invokable/SinkInvokable.java  |   2 +-
 .../api/invokable/SourceInvokable.java          |   2 +-
 .../api/invokable/StreamComponentInvokable.java |  68 -----------
 .../api/invokable/StreamInvokable.java          |  79 ++++++++++++
 .../api/invokable/StreamOperatorInvokable.java  | 119 +++++++++++++++++++
 .../api/invokable/StreamRecordInvokable.java    |  82 -------------
 .../api/invokable/UserTaskInvokable.java        |  32 -----
 .../operator/BatchReduceInvokable.java          |   4 +-
 .../api/invokable/operator/FilterInvokable.java |   4 +-
 .../invokable/operator/FlatMapInvokable.java    |   4 +-
 .../api/invokable/operator/MapInvokable.java    |   4 +-
 .../operator/StreamReduceInvokable.java         |   4 +-
 .../api/invokable/operator/co/CoInvokable.java  |   4 +-
 .../AbstractStreamComponent.java                |   4 +-
 .../api/streamcomponent/OutputHandler.java      |   4 +-
 .../api/streamcomponent/StreamSink.java         |   4 +-
 .../api/streamcomponent/StreamTask.java         |   4 +-
 .../api/streamrecord/StreamRecord.java          |  14 ++-
 .../streamrecord/StreamRecordSerializer.java    |  38 +-----
 .../flink/streaming/util/MockInvokable.java     |   4 +-
 25 files changed, 253 insertions(+), 253 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/723cb27c/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/JobGraphBuilder.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/JobGraphBuilder.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/JobGraphBuilder.java
index e41abd6..77c860b 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/JobGraphBuilder.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/JobGraphBuilder.java
@@ -36,8 +36,8 @@ import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.streaming.api.collector.OutputSelector;
 import org.apache.flink.streaming.api.invokable.SinkInvokable;
 import org.apache.flink.streaming.api.invokable.SourceInvokable;
-import org.apache.flink.streaming.api.invokable.StreamComponentInvokable;
-import org.apache.flink.streaming.api.invokable.UserTaskInvokable;
+import org.apache.flink.streaming.api.invokable.StreamInvokable;
+import org.apache.flink.streaming.api.invokable.StreamOperatorInvokable;
 import org.apache.flink.streaming.api.invokable.operator.co.CoInvokable;
 import org.apache.flink.streaming.api.streamcomponent.CoStreamTask;
 import org.apache.flink.streaming.api.streamcomponent.StreamIterationSink;
@@ -68,7 +68,7 @@ public class JobGraphBuilder {
 	private Map<String, List<String>> inEdgeList;
 	private Map<String, List<StreamPartitioner<?>>> connectionTypes;
 	private Map<String, String> operatorNames;
-	private Map<String, StreamComponentInvokable<?>> invokableObjects;
+	private Map<String, StreamInvokable<?>> invokableObjects;
 	private Map<String, TypeSerializerWrapper<?>> typeWrapperIn1;
 	private Map<String, TypeSerializerWrapper<?>> typeWrapperIn2;
 	private Map<String, TypeSerializerWrapper<?>> typeWrapperOut1;
@@ -110,7 +110,7 @@ public class JobGraphBuilder {
 		inEdgeList = new HashMap<String, List<String>>();
 		connectionTypes = new HashMap<String, List<StreamPartitioner<?>>>();
 		operatorNames = new HashMap<String, String>();
-		invokableObjects = new HashMap<String, StreamComponentInvokable<?>>();
+		invokableObjects = new HashMap<String, StreamInvokable<?>>();
 		typeWrapperIn1 = new HashMap<String, TypeSerializerWrapper<?>>();
 		typeWrapperIn2 = new HashMap<String, TypeSerializerWrapper<?>>();
 		typeWrapperOut1 = new HashMap<String, TypeSerializerWrapper<?>>();
@@ -232,7 +232,7 @@ public class JobGraphBuilder {
 	 *            Number of parallel instances created
 	 */
 	public <IN, OUT> void addTask(String componentName,
-			UserTaskInvokable<IN, OUT> taskInvokableObject,
+			StreamOperatorInvokable<IN, OUT> taskInvokableObject,
 			TypeSerializerWrapper<?> inTypeWrapper,
 			TypeSerializerWrapper<?> outTypeWrapper, String operatorName,
 			byte[] serializedFunction, int parallelism) {
@@ -348,7 +348,7 @@ public class JobGraphBuilder {
 	 */
 	private void addComponent(String componentName,
 			Class<? extends AbstractInvokable> componentClass,
-			StreamComponentInvokable<?> invokableObject, String operatorName,
+			StreamInvokable<?> invokableObject, String operatorName,
 			byte[] serializedFunction, int parallelism) {
 
 		componentClasses.put(componentName, componentClass);
@@ -387,7 +387,7 @@ public class JobGraphBuilder {
 		// Get vertex attributes
 		Class<? extends AbstractInvokable> componentClass = componentClasses
 				.get(componentName);
-		StreamComponentInvokable<?> invokableObject = invokableObjects
+		StreamInvokable<?> invokableObject = invokableObjects
 				.get(componentName);
 		String operatorName = operatorNames.get(componentName);
 		byte[] serializedFunction = serializedFunctions.get(componentName);

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/723cb27c/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamConfig.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamConfig.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamConfig.java
index 51dc4e3..445020a 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamConfig.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamConfig.java
@@ -26,7 +26,7 @@ import org.apache.commons.lang3.SerializationUtils;
 import org.apache.flink.api.common.functions.AbstractRichFunction;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.collector.OutputSelector;
-import org.apache.flink.streaming.api.invokable.StreamComponentInvokable;
+import org.apache.flink.streaming.api.invokable.StreamInvokable;
 import org.apache.flink.streaming.api.streamcomponent.StreamComponentException;
 import org.apache.flink.streaming.partitioner.ShufflePartitioner;
 import org.apache.flink.streaming.partitioner.StreamPartitioner;
@@ -144,7 +144,7 @@ public class StreamConfig {
 		return config.getLong(BUFFER_TIMEOUT, DEFAULT_TIMEOUT);
 	}
 
-	public void setUserInvokable(StreamComponentInvokable<?> invokableObject) {
+	public void setUserInvokable(StreamInvokable<?> invokableObject) {
 		if (invokableObject != null) {
 			config.setClass(USER_FUNCTION, invokableObject.getClass());
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/723cb27c/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/DirectedStreamCollector.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/DirectedStreamCollector.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/DirectedStreamCollector.java
index 82f3c50..162c5df 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/DirectedStreamCollector.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/DirectedStreamCollector.java
@@ -82,7 +82,7 @@ public class DirectedStreamCollector<OUT> extends StreamCollector<OUT> {
 	 */
 	private void emit(StreamRecord<OUT> streamRecord) {
 		Collection<String> outputNames = outputSelector.getOutputs(streamRecord.getObject());
-		streamRecord.setId(channelID);
+		streamRecord.newId(channelID);
 		serializationDelegate.setInstance(streamRecord);
 		emitted.clear();
 		for (String outputName : outputNames) {

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/723cb27c/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/StreamCollector.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/StreamCollector.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/StreamCollector.java
index cd29b01..65098d4 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/StreamCollector.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/StreamCollector.java
@@ -117,7 +117,7 @@ public class StreamCollector<OUT> implements Collector<OUT> {
 	 *            StreamRecord to emit.
 	 */
 	private void emit(StreamRecord<OUT> streamRecord) {
-		streamRecord.setId(channelID);
+		streamRecord.newId(channelID);
 		serializationDelegate.setInstance(streamRecord);
 		for (RecordWriter<SerializationDelegate<StreamRecord<OUT>>> output : outputs) {
 			try {

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/723cb27c/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
index 5d5909c..ead9c35 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
@@ -43,7 +43,7 @@ import org.apache.flink.streaming.api.function.sink.WriteFormatAsText;
 import org.apache.flink.streaming.api.function.sink.WriteSinkFunctionByBatches;
 import org.apache.flink.streaming.api.function.sink.WriteSinkFunctionByMillis;
 import org.apache.flink.streaming.api.invokable.SinkInvokable;
-import org.apache.flink.streaming.api.invokable.UserTaskInvokable;
+import org.apache.flink.streaming.api.invokable.StreamOperatorInvokable;
 import org.apache.flink.streaming.api.invokable.operator.BatchReduceInvokable;
 import org.apache.flink.streaming.api.invokable.operator.FilterInvokable;
 import org.apache.flink.streaming.api.invokable.operator.FlatMapInvokable;
@@ -836,7 +836,7 @@ public abstract class DataStream<OUT> {
 			String functionName, final Function function,
 			TypeSerializerWrapper<OUT> inTypeWrapper,
 			TypeSerializerWrapper<R> outTypeWrapper,
-			UserTaskInvokable<OUT, R> functionInvokable) {
+			StreamOperatorInvokable<OUT, R> functionInvokable) {
 
 		DataStream<OUT> inputStream = this.copy();
 		@SuppressWarnings({ "unchecked", "rawtypes" })

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/723cb27c/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/SinkInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/SinkInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/SinkInvokable.java
index 1353a5a..75a0c34 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/SinkInvokable.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/SinkInvokable.java
@@ -19,7 +19,7 @@ package org.apache.flink.streaming.api.invokable;
 
 import org.apache.flink.streaming.api.function.sink.SinkFunction;
 
-public class SinkInvokable<IN> extends StreamRecordInvokable<IN, IN> {
+public class SinkInvokable<IN> extends StreamOperatorInvokable<IN, IN> {
 	private static final long serialVersionUID = 1L;
 
 	private SinkFunction<IN> sinkFunction;

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/723cb27c/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/SourceInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/SourceInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/SourceInvokable.java
index 6411878..ad75157 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/SourceInvokable.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/SourceInvokable.java
@@ -21,7 +21,7 @@ import java.io.Serializable;
 
 import org.apache.flink.streaming.api.function.source.SourceFunction;
 
-public class SourceInvokable<OUT> extends StreamComponentInvokable<OUT> implements Serializable {
+public class SourceInvokable<OUT> extends StreamInvokable<OUT> implements Serializable {
 
 	private static final long serialVersionUID = 1L;
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/723cb27c/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/StreamComponentInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/StreamComponentInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/StreamComponentInvokable.java
deleted file mode 100644
index 5d9d5e5..0000000
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/StreamComponentInvokable.java
+++ /dev/null
@@ -1,68 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.invokable;
-
-import java.io.Serializable;
-
-import org.apache.flink.api.common.functions.AbstractRichFunction;
-import org.apache.flink.api.common.functions.Function;
-import org.apache.flink.api.common.functions.RichFunction;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.util.Collector;
-
-public abstract class StreamComponentInvokable<OUT> extends AbstractRichFunction implements
-		Serializable {
-
-	private static final long serialVersionUID = 1L;
-
-	@SuppressWarnings("unused")
-	private String componentName;
-	@SuppressWarnings("unused")
-	private int channelID;
-	protected Collector<OUT> collector;
-	protected Function userFunction;
-
-	public StreamComponentInvokable(Function userFunction) {
-		this.userFunction = userFunction;
-	}
-
-	public void setCollector(Collector<OUT> collector) {
-		this.collector = collector;
-	}
-
-	public void setAttributes(String componentName, int channelID) {
-		this.componentName = componentName;
-		this.channelID = channelID;
-	}
-
-	@Override
-	public void open(Configuration parameters) throws Exception {
-		if (userFunction instanceof RichFunction) {
-			((RichFunction) userFunction).open(parameters);
-		}
-	}
-
-	@Override
-	public void close() throws Exception {
-		if (userFunction instanceof RichFunction) {
-			((RichFunction) userFunction).close();
-		}
-	}
-	
-	public abstract void invoke() throws Exception;
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/723cb27c/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/StreamInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/StreamInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/StreamInvokable.java
new file mode 100644
index 0000000..9a6f2cc
--- /dev/null
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/StreamInvokable.java
@@ -0,0 +1,79 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.invokable;
+
+import java.io.Serializable;
+
+import org.apache.flink.api.common.functions.Function;
+import org.apache.flink.api.common.functions.RichFunction;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.util.Collector;
+
+/**
+ * The StreamInvokable represents the base class for all invokables in
+ * the streaming topology.
+ *
+ * @param <OUT>
+ *            The output type of the invokable
+ */
+public abstract class StreamInvokable<OUT> implements Serializable {
+
+	private static final long serialVersionUID = 1L;
+
+	protected Collector<OUT> collector;
+	protected Function userFunction;
+
+	public StreamInvokable(Function userFunction) {
+		this.userFunction = userFunction;
+	}
+
+	public void setCollector(Collector<OUT> collector) {
+		this.collector = collector;
+	}
+
+	/**
+	 * Open method to be used if the user defined function extends the
+	 * RichFunction class
+	 * 
+	 * @param parameters
+	 *            The configuration parameters for the operator
+	 */
+	public void open(Configuration parameters) throws Exception {
+		if (userFunction instanceof RichFunction) {
+			((RichFunction) userFunction).open(parameters);
+		}
+	}
+
+	/**
+	 * Close method to be used if the user defined function extends the
+	 * RichFunction class
+	 * 
+	 */
+	public void close() throws Exception {
+		if (userFunction instanceof RichFunction) {
+			((RichFunction) userFunction).close();
+		}
+	}
+
+	/**
+	 * The method that will be called once when the operator is created, the
+	 * working mechanics of the operator should be implemented here
+	 * 
+	 */
+	public abstract void invoke() throws Exception;
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/723cb27c/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/StreamOperatorInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/StreamOperatorInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/StreamOperatorInvokable.java
new file mode 100644
index 0000000..d92d1f0
--- /dev/null
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/StreamOperatorInvokable.java
@@ -0,0 +1,119 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.invokable;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.flink.api.common.functions.Function;
+import org.apache.flink.streaming.api.streamrecord.StreamRecord;
+import org.apache.flink.streaming.api.streamrecord.StreamRecordSerializer;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.MutableObjectIterator;
+import org.apache.flink.util.StringUtils;
+
+/**
+ * The StreamOperatorInvokable represents the base class for all operators in
+ * the streaming topology.
+ *
+ * @param <IN>
+ *            Input type of the operator
+ * @param <OUT>
+ *            Output type of the operator
+ */
+public abstract class StreamOperatorInvokable<IN, OUT> extends StreamInvokable<OUT> {
+
+	public StreamOperatorInvokable(Function userFunction) {
+		super(userFunction);
+	}
+
+	private static final long serialVersionUID = 1L;
+	private static final Log LOG = LogFactory.getLog(StreamInvokable.class);
+
+	protected MutableObjectIterator<StreamRecord<IN>> recordIterator;
+	protected StreamRecordSerializer<IN> serializer;
+	protected StreamRecord<IN> reuse;
+	protected boolean isMutable;
+
+	/**
+	 * Initializes the {@link StreamOperatorInvokable} for input and output
+	 * handling
+	 * 
+	 * @param collector
+	 *            Collector object for collecting the outputs for the operator
+	 * @param recordIterator
+	 *            Iterator for reading in the input records
+	 * @param serializer
+	 *            Serializer used to deserialize inputs
+	 * @param isMutable
+	 *            Mutability setting for the operator
+	 */
+	public void initialize(Collector<OUT> collector,
+			MutableObjectIterator<StreamRecord<IN>> recordIterator,
+			StreamRecordSerializer<IN> serializer, boolean isMutable) {
+		setCollector(collector);
+		this.recordIterator = recordIterator;
+		this.serializer = serializer;
+		this.reuse = serializer.createInstance();
+		this.isMutable = isMutable;
+	}
+
+	/**
+	 * Re-initializes the object in which the next input record will be read in
+	 */
+	protected void resetReuse() {
+		this.reuse = serializer.createInstance();
+	}
+
+	/**
+	 * Method that will be called if the mutability setting is set to immutable
+	 */
+	protected abstract void immutableInvoke() throws Exception;
+
+	/**
+	 * Method that will be called if the mutability setting is set to mutable
+	 */
+	protected abstract void mutableInvoke() throws Exception;
+
+	/**
+	 * The call of the user implemented function should be implemented here
+	 */
+	protected abstract void callUserFunction() throws Exception;
+
+	/**
+	 * Method for logging exceptions thrown during the user function call
+	 */
+	protected void callUserFunctionAndLogException() {
+		try {
+			callUserFunction();
+		} catch (Exception e) {
+			if (LOG.isErrorEnabled()) {
+				LOG.error(String.format("Calling user function failed due to: %s",
+						StringUtils.stringifyException(e)));
+			}
+		}
+	}
+
+	@Override
+	public void invoke() throws Exception {
+		if (this.isMutable) {
+			mutableInvoke();
+		} else {
+			immutableInvoke();
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/723cb27c/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/StreamRecordInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/StreamRecordInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/StreamRecordInvokable.java
deleted file mode 100644
index c5a6f0f..0000000
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/StreamRecordInvokable.java
+++ /dev/null
@@ -1,82 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.invokable;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.flink.api.common.functions.Function;
-import org.apache.flink.streaming.api.streamrecord.StreamRecord;
-import org.apache.flink.streaming.api.streamrecord.StreamRecordSerializer;
-import org.apache.flink.util.Collector;
-import org.apache.flink.util.MutableObjectIterator;
-import org.apache.flink.util.StringUtils;
-
-public abstract class StreamRecordInvokable<IN, OUT> extends StreamComponentInvokable<OUT> {
-
-	public StreamRecordInvokable(Function userFunction) {
-		super(userFunction);
-	}
-
-	private static final long serialVersionUID = 1L;
-	private static final Log LOG = LogFactory.getLog(StreamComponentInvokable.class);
-
-	protected MutableObjectIterator<StreamRecord<IN>> recordIterator;
-	StreamRecordSerializer<IN> serializer;
-	protected StreamRecord<IN> reuse;
-	protected boolean isMutable;
-
-	public void initialize(Collector<OUT> collector,
-			MutableObjectIterator<StreamRecord<IN>> recordIterator,
-			StreamRecordSerializer<IN> serializer, boolean isMutable) {
-		setCollector(collector);
-		this.recordIterator = recordIterator;
-		this.serializer = serializer;
-		this.reuse = serializer.createInstance();
-		this.isMutable = isMutable;
-	}
-
-	protected void resetReuse() {
-		this.reuse = serializer.createInstance();
-	}
-
-	protected abstract void immutableInvoke() throws Exception;
-
-	protected abstract void mutableInvoke() throws Exception;
-
-	protected abstract void callUserFunction() throws Exception;
-
-	protected void callUserFunctionAndLogException() {
-		try {
-			callUserFunction();
-		} catch (Exception e) {
-			if (LOG.isErrorEnabled()) {
-				LOG.error(String.format("Calling user function failed due to: %s",
-						StringUtils.stringifyException(e)));
-			}
-		}
-	}
-
-	@Override
-	public void invoke() throws Exception {
-		if (this.isMutable) {
-			mutableInvoke();
-		} else {
-			immutableInvoke();
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/723cb27c/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/UserTaskInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/UserTaskInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/UserTaskInvokable.java
deleted file mode 100644
index e374773..0000000
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/UserTaskInvokable.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.invokable;
-
-import java.io.Serializable;
-
-import org.apache.flink.api.common.functions.Function;
-
-public abstract class UserTaskInvokable<IN, OUT> extends
-		StreamRecordInvokable<IN, OUT> implements Serializable {
-
-	public UserTaskInvokable(Function userFunction) {
-		super(userFunction);
-	}
-
-	private static final long serialVersionUID = 1L;
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/723cb27c/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/BatchReduceInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/BatchReduceInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/BatchReduceInvokable.java
index df07675..693b1c8 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/BatchReduceInvokable.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/BatchReduceInvokable.java
@@ -24,11 +24,11 @@ import java.util.Iterator;
 import org.apache.commons.math.util.MathUtils;
 import org.apache.flink.api.common.functions.GroupReduceFunction;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.api.invokable.UserTaskInvokable;
+import org.apache.flink.streaming.api.invokable.StreamOperatorInvokable;
 import org.apache.flink.streaming.api.streamrecord.StreamRecord;
 import org.apache.flink.streaming.state.SlidingWindowState;
 
-public class BatchReduceInvokable<IN, OUT> extends UserTaskInvokable<IN, OUT> {
+public class BatchReduceInvokable<IN, OUT> extends StreamOperatorInvokable<IN, OUT> {
 
 	private static final long serialVersionUID = 1L;
 	protected GroupReduceFunction<IN, OUT> reducer;

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/723cb27c/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/FilterInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/FilterInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/FilterInvokable.java
index df7d62b..6b31037 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/FilterInvokable.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/FilterInvokable.java
@@ -18,9 +18,9 @@
 package org.apache.flink.streaming.api.invokable.operator;
 
 import org.apache.flink.api.common.functions.FilterFunction;
-import org.apache.flink.streaming.api.invokable.UserTaskInvokable;
+import org.apache.flink.streaming.api.invokable.StreamOperatorInvokable;
 
-public class FilterInvokable<IN> extends UserTaskInvokable<IN, IN> {
+public class FilterInvokable<IN> extends StreamOperatorInvokable<IN, IN> {
 
 	private static final long serialVersionUID = 1L;
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/723cb27c/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/FlatMapInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/FlatMapInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/FlatMapInvokable.java
index 2cabc88..c7c3e1e 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/FlatMapInvokable.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/FlatMapInvokable.java
@@ -18,9 +18,9 @@
 package org.apache.flink.streaming.api.invokable.operator;
 
 import org.apache.flink.api.common.functions.FlatMapFunction;
-import org.apache.flink.streaming.api.invokable.UserTaskInvokable;
+import org.apache.flink.streaming.api.invokable.StreamOperatorInvokable;
 
-public class FlatMapInvokable<IN, OUT> extends UserTaskInvokable<IN, OUT> {
+public class FlatMapInvokable<IN, OUT> extends StreamOperatorInvokable<IN, OUT> {
 	private static final long serialVersionUID = 1L;
 
 	private FlatMapFunction<IN, OUT> flatMapper;

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/723cb27c/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/MapInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/MapInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/MapInvokable.java
index 73dd350..7c0352e 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/MapInvokable.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/MapInvokable.java
@@ -18,9 +18,9 @@
 package org.apache.flink.streaming.api.invokable.operator;
 
 import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.streaming.api.invokable.UserTaskInvokable;
+import org.apache.flink.streaming.api.invokable.StreamOperatorInvokable;
 
-public class MapInvokable<IN, OUT> extends UserTaskInvokable<IN, OUT> {
+public class MapInvokable<IN, OUT> extends StreamOperatorInvokable<IN, OUT> {
 	private static final long serialVersionUID = 1L;
 
 	private MapFunction<IN, OUT> mapper;

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/723cb27c/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/StreamReduceInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/StreamReduceInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/StreamReduceInvokable.java
index 52039be..8b76d49 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/StreamReduceInvokable.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/StreamReduceInvokable.java
@@ -18,9 +18,9 @@
 package org.apache.flink.streaming.api.invokable.operator;
 
 import org.apache.flink.api.common.functions.ReduceFunction;
-import org.apache.flink.streaming.api.invokable.UserTaskInvokable;
+import org.apache.flink.streaming.api.invokable.StreamOperatorInvokable;
 
-public class StreamReduceInvokable<IN> extends UserTaskInvokable<IN, IN> {
+public class StreamReduceInvokable<IN> extends StreamOperatorInvokable<IN, IN> {
 	private static final long serialVersionUID = 1L;
 
 	protected ReduceFunction<IN> reducer;

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/723cb27c/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoInvokable.java
index 7baf687..585bd72 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoInvokable.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoInvokable.java
@@ -18,13 +18,13 @@
 package org.apache.flink.streaming.api.invokable.operator.co;
 
 import org.apache.flink.api.common.functions.Function;
-import org.apache.flink.streaming.api.invokable.StreamComponentInvokable;
+import org.apache.flink.streaming.api.invokable.StreamInvokable;
 import org.apache.flink.streaming.api.streamrecord.StreamRecord;
 import org.apache.flink.streaming.api.streamrecord.StreamRecordSerializer;
 import org.apache.flink.streaming.io.CoReaderIterator;
 import org.apache.flink.util.Collector;
 
-public abstract class CoInvokable<IN1, IN2, OUT> extends StreamComponentInvokable<OUT> {
+public abstract class CoInvokable<IN1, IN2, OUT> extends StreamInvokable<OUT> {
 
 	public CoInvokable(Function userFunction) {
 		super(userFunction);

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/723cb27c/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/AbstractStreamComponent.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/AbstractStreamComponent.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/AbstractStreamComponent.java
index 88b8db1..a7f70ec 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/AbstractStreamComponent.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/AbstractStreamComponent.java
@@ -19,7 +19,7 @@ package org.apache.flink.streaming.api.streamcomponent;
 
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.streaming.api.StreamConfig;
-import org.apache.flink.streaming.api.invokable.StreamComponentInvokable;
+import org.apache.flink.streaming.api.invokable.StreamInvokable;
 
 public abstract class AbstractStreamComponent extends AbstractInvokable {
 
@@ -51,7 +51,7 @@ public abstract class AbstractStreamComponent extends AbstractInvokable {
 		this.function = configuration.getFunction();
 	}
 
-	protected <T> void invokeUserFunction(StreamComponentInvokable<T> userInvokable) throws Exception {
+	protected <T> void invokeUserFunction(StreamInvokable<T> userInvokable) throws Exception {
 		userInvokable.open(getTaskConfiguration());
 		userInvokable.invoke();
 		userInvokable.close();

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/723cb27c/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/OutputHandler.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/OutputHandler.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/OutputHandler.java
index 71fb015..e19eeaa 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/OutputHandler.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/OutputHandler.java
@@ -29,7 +29,7 @@ import org.apache.flink.streaming.api.StreamConfig;
 import org.apache.flink.streaming.api.collector.DirectedStreamCollector;
 import org.apache.flink.streaming.api.collector.OutputSelector;
 import org.apache.flink.streaming.api.collector.StreamCollector;
-import org.apache.flink.streaming.api.invokable.StreamComponentInvokable;
+import org.apache.flink.streaming.api.invokable.StreamInvokable;
 import org.apache.flink.streaming.api.streamrecord.StreamRecord;
 import org.apache.flink.streaming.api.streamrecord.StreamRecordSerializer;
 import org.apache.flink.streaming.io.StreamRecordWriter;
@@ -153,7 +153,7 @@ public class OutputHandler<OUT> {
 	long startTime;
 
 	public void invokeUserFunction(String componentTypeName,
-			StreamComponentInvokable<OUT> userInvokable) throws IOException, InterruptedException {
+			StreamInvokable<OUT> userInvokable) throws IOException, InterruptedException {
 		if (LOG.isDebugEnabled()) {
 			LOG.debug(componentTypeName + " " + streamComponent.getName()
 					+ " invoked with instance id " + streamComponent.getInstanceID());

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/723cb27c/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamSink.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamSink.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamSink.java
index f98e891..145b709 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamSink.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamSink.java
@@ -19,7 +19,7 @@ package org.apache.flink.streaming.api.streamcomponent;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.flink.streaming.api.invokable.StreamRecordInvokable;
+import org.apache.flink.streaming.api.invokable.StreamOperatorInvokable;
 
 public class StreamSink<IN> extends AbstractStreamComponent {
 
@@ -27,7 +27,7 @@ public class StreamSink<IN> extends AbstractStreamComponent {
 
 	private InputHandler<IN> inputHandler;
 	
-	private StreamRecordInvokable<IN, IN> userInvokable;
+	private StreamOperatorInvokable<IN, IN> userInvokable;
 
 	public StreamSink() {
 		userInvokable = null;

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/723cb27c/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamTask.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamTask.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamTask.java
index 898720e..55b2f98 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamTask.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamTask.java
@@ -18,14 +18,14 @@
 package org.apache.flink.streaming.api.streamcomponent;
 
 import org.apache.flink.api.java.tuple.Tuple;
-import org.apache.flink.streaming.api.invokable.StreamRecordInvokable;
+import org.apache.flink.streaming.api.invokable.StreamOperatorInvokable;
 
 public class StreamTask<IN extends Tuple, OUT extends Tuple> extends AbstractStreamComponent {
 
 	private InputHandler<IN> inputHandler;
 	private OutputHandler<OUT> outputHandler;
 
-	private StreamRecordInvokable<IN, OUT> userInvokable;
+	private StreamOperatorInvokable<IN, OUT> userInvokable;
 	
 	private static int numTasks;
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/723cb27c/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamrecord/StreamRecord.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamrecord/StreamRecord.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamrecord/StreamRecord.java
index ef19755..2d02949 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamrecord/StreamRecord.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamrecord/StreamRecord.java
@@ -33,7 +33,7 @@ public class StreamRecord<T> implements Serializable {
 	public boolean isTuple;
 
 	/**
-	 * Creates an empty StreamRecord and initializes an empty ID
+	 * Creates an empty StreamRecord
 	 */
 	public StreamRecord() {
 		uid = new UID();
@@ -53,12 +53,22 @@ public class StreamRecord<T> implements Serializable {
 	 *            ID of the emitting task
 	 * @return The StreamRecord object
 	 */
-	public StreamRecord<T> setId(int channelID) {
+	public StreamRecord<T> newId(int channelID) {
 		uid = new UID(channelID);
 		return this;
 	}
 
 	/**
+	 * Sets the ID of the StreamRecord
+	 * 
+	 * @param id
+	 *            id to set
+	 */
+	public void setId(UID id) {
+		this.uid = id;
+	}
+
+	/**
 	 * Gets the wrapped object from the StreamRecord
 	 * 
 	 * @return The object wrapped

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/723cb27c/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamrecord/StreamRecordSerializer.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamrecord/StreamRecordSerializer.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamrecord/StreamRecordSerializer.java
index 66cb0bd..9395293 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamrecord/StreamRecordSerializer.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamrecord/StreamRecordSerializer.java
@@ -50,8 +50,7 @@ public final class StreamRecordSerializer<T> extends TypeSerializer<StreamRecord
 	@Override
 	public StreamRecord<T> createInstance() {
 		try {
-			@SuppressWarnings("unchecked")
-			StreamRecord<T> t = StreamRecord.class.newInstance();
+			StreamRecord<T> t = new StreamRecord<T>();
 			t.isTuple = isTuple;
 			t.setObject(typeSerializer.createInstance());
 			return t;
@@ -62,15 +61,10 @@ public final class StreamRecordSerializer<T> extends TypeSerializer<StreamRecord
 
 	@Override
 	public StreamRecord<T> copy(StreamRecord<T> from, StreamRecord<T> reuse) {
-
-		return null;
-		// for (int i = 0; i < arity; i++) {
-		// Object copy = fieldSerializers[i].copy(from.getField(i),
-		// reuse.getField(i));
-		// reuse.setField(copy, i);
-		// }
-		//
-		// return reuse;
+		reuse.isTuple = from.isTuple;
+		reuse.setId(from.getId().copy());
+		reuse.setObject(typeSerializer.copy(from.getObject(), reuse.getObject()));
+		return reuse;
 	}
 
 	@Override
@@ -94,27 +88,7 @@ public final class StreamRecordSerializer<T> extends TypeSerializer<StreamRecord
 
 	@Override
 	public void copy(DataInputView source, DataOutputView target) throws IOException {
-
+		//Needs to be implemented
 	}
 
-	// @Override
-	// public int hashCode() {
-	// int hashCode = arity * 47;
-	// for (TypeSerializer<?> ser : this.fieldSerializers) {
-	// hashCode = (hashCode << 7) | (hashCode >>> -7);
-	// hashCode += ser.hashCode();
-	// }
-	// return hashCode;
-	// }
-
-	// @Override
-	// public boolean equals(Object obj) {
-	// if (obj != null && obj instanceof StreamRecordSerializer) {
-	// StreamRecordSerializer otherTS = (StreamRecordSerializer) obj;
-	// return (otherTS.tupleClass == this.tupleClass)
-	// && Arrays.deepEquals(this.fieldSerializers, otherTS.fieldSerializers);
-	// } else {
-	// return false;
-	// }
-	// }
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/723cb27c/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockInvokable.java
index dd8b029..47a64ac 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockInvokable.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockInvokable.java
@@ -24,7 +24,7 @@ import java.util.Iterator;
 import java.util.List;
 
 import org.apache.flink.api.java.typeutils.TypeExtractor;
-import org.apache.flink.streaming.api.invokable.UserTaskInvokable;
+import org.apache.flink.streaming.api.invokable.StreamOperatorInvokable;
 import org.apache.flink.streaming.api.streamrecord.StreamRecord;
 import org.apache.flink.streaming.api.streamrecord.StreamRecordSerializer;
 import org.apache.flink.types.TypeInformation;
@@ -88,7 +88,7 @@ public class MockInvokable<IN, OUT> {
 		return iterator;
 	}
 
-	public static <IN, OUT> List<OUT> createAndExecute(UserTaskInvokable<IN, OUT> invokable, List<IN> inputs) {
+	public static <IN, OUT> List<OUT> createAndExecute(StreamOperatorInvokable<IN, OUT> invokable, List<IN> inputs) {
 		MockInvokable<IN, OUT> mock = new MockInvokable<IN, OUT>(inputs);
 		invokable.initialize(mock.getCollector(), mock.getIterator(), mock.getInDeserializer(), false);
 		try {