You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2014/08/18 19:25:45 UTC

[08/51] [abbrv] git commit: [streaming] Added CoFunctions for two type inputs

[streaming] Added CoFunctions for two type inputs


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/f436690f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/f436690f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/f436690f

Branch: refs/heads/master
Commit: f436690fb4ec4d9b5dc7342b5124b6fb0e855d76
Parents: 6e52195
Author: ghermann <re...@gmail.com>
Authored: Tue Jul 22 11:28:26 2014 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Mon Aug 18 16:19:14 2014 +0200

----------------------------------------------------------------------
 .../apache/flink/streaming/api/DataStream.java  |   7 +
 .../flink/streaming/api/JobGraphBuilder.java    |  95 ++++++---
 .../api/StreamExecutionEnvironment.java         |  43 +++-
 .../api/function/co/CoMapFunction.java          |  29 +++
 .../api/invokable/operator/co/CoInvokable.java  |  55 ++++++
 .../invokable/operator/co/CoMapInvokable.java   |  53 +++++
 .../AbstractStreamComponent.java                | 144 +++-----------
 .../api/streamcomponent/CoStreamTask.java       | 198 +++++++++++++++++++
 .../SingleInputAbstractStreamComponent.java     | 129 ++++++++++++
 .../streamcomponent/StreamIterationSink.java    |   2 +-
 .../streamcomponent/StreamIterationSource.java  |   2 +-
 .../api/streamcomponent/StreamSink.java         |   2 +-
 .../api/streamcomponent/StreamSource.java       |   2 +-
 .../api/streamcomponent/StreamTask.java         |   2 +-
 .../api/invokable/operator/CoMapTest.java       |  77 ++++++++
 15 files changed, 675 insertions(+), 165 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/f436690f/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/DataStream.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/DataStream.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/DataStream.java
index 4356795..6ef2faf 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/DataStream.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/DataStream.java
@@ -29,6 +29,7 @@ import org.apache.flink.api.java.functions.MapFunction;
 import org.apache.flink.api.java.tuple.Tuple;
 import org.apache.flink.streaming.api.StreamExecutionEnvironment.ConnectionType;
 import org.apache.flink.streaming.api.collector.OutputSelector;
+import org.apache.flink.streaming.api.function.co.CoMapFunction;
 import org.apache.flink.streaming.api.function.sink.SinkFunction;
 import org.apache.flink.streaming.api.function.sink.WriteFormatAsCsv;
 import org.apache.flink.streaming.api.function.sink.WriteFormatAsText;
@@ -36,6 +37,7 @@ import org.apache.flink.streaming.api.invokable.operator.BatchReduceInvokable;
 import org.apache.flink.streaming.api.invokable.operator.FilterInvokable;
 import org.apache.flink.streaming.api.invokable.operator.FlatMapInvokable;
 import org.apache.flink.streaming.api.invokable.operator.MapInvokable;
+import org.apache.flink.streaming.api.invokable.operator.co.CoMapInvokable;
 
 /**
  * A DataStream represents a stream of elements of the same type. A DataStream
@@ -387,6 +389,11 @@ public class DataStream<T extends Tuple> {
 				new BatchReduceInvokable<T, R>(reducer, batchSize));
 	}
 
+	public <T2 extends Tuple, R extends Tuple> DataStream<R> coMapWith(CoMapFunction<T, T2, R> coMapper, DataStream<T2> otherStream) {
+		return environment.addCoFunction("coMap", new DataStream<T>(this), new DataStream<T2>(otherStream), coMapper, new CoMapInvokable<T, T2, R>(coMapper));
+	}
+	
+	
 	/**
 	 * Applies a reduce transformation on preset "time" chunks of the
 	 * DataStream. The transformation calls a {@link GroupReduceFunction} on

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/f436690f/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/JobGraphBuilder.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/JobGraphBuilder.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/JobGraphBuilder.java
index 2b3ee5a..022fcd8 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/JobGraphBuilder.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/JobGraphBuilder.java
@@ -41,12 +41,13 @@ import org.apache.flink.runtime.jobgraph.JobInputVertex;
 import org.apache.flink.runtime.jobgraph.JobOutputVertex;
 import org.apache.flink.runtime.jobgraph.JobTaskVertex;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
-import org.apache.flink.runtime.operators.util.TaskConfig;
 import org.apache.flink.streaming.api.collector.OutputSelector;
-import org.apache.flink.streaming.api.invokable.SinkInvokable;
 import org.apache.flink.streaming.api.invokable.StreamComponentInvokable;
+import org.apache.flink.streaming.api.invokable.UserSinkInvokable;
 import org.apache.flink.streaming.api.invokable.UserSourceInvokable;
 import org.apache.flink.streaming.api.invokable.UserTaskInvokable;
+import org.apache.flink.streaming.api.invokable.operator.co.CoInvokable;
+import org.apache.flink.streaming.api.streamcomponent.CoStreamTask;
 import org.apache.flink.streaming.api.streamcomponent.StreamIterationSink;
 import org.apache.flink.streaming.api.streamcomponent.StreamIterationSource;
 import org.apache.flink.streaming.api.streamcomponent.StreamSink;
@@ -70,8 +71,8 @@ public class JobGraphBuilder {
 	// Graph attributes
 	private Map<String, AbstractJobVertex> components;
 	private Map<String, Integer> componentParallelism;
-	private Map<String, Boolean> mutability;
-	private Map<String, List<String>> outEdgeList;
+	private Map<String, ArrayList<String>> outEdgeList;
+	private Map<String, ArrayList<Integer>> outEdgeType;
 	private Map<String, List<String>> inEdgeList;
 	private Map<String, List<StreamPartitioner<? extends Tuple>>> connectionTypes;
 	private Map<String, String> userDefinedNames;
@@ -101,8 +102,8 @@ public class JobGraphBuilder {
 
 		components = new HashMap<String, AbstractJobVertex>();
 		componentParallelism = new HashMap<String, Integer>();
-		mutability = new HashMap<String, Boolean>();
-		outEdgeList = new HashMap<String, List<String>>();
+		outEdgeList = new HashMap<String, ArrayList<String>>();
+		outEdgeType = new HashMap<String, ArrayList<Integer>>();
 		inEdgeList = new HashMap<String, List<String>>();
 		connectionTypes = new HashMap<String, List<StreamPartitioner<? extends Tuple>>>();
 		userDefinedNames = new HashMap<String, String>();
@@ -171,7 +172,7 @@ public class JobGraphBuilder {
 		setBytesFrom(iterationHead, componentName);
 
 		setEdge(componentName, iterationHead,
-				connectionTypes.get(inEdgeList.get(iterationHead).get(0)).get(0));
+				connectionTypes.get(inEdgeList.get(iterationHead).get(0)).get(0), 0);
 
 		if (LOG.isDebugEnabled()) {
 			LOG.debug("ITERATION SOURCE: " + componentName);
@@ -204,6 +205,18 @@ public class JobGraphBuilder {
 		}
 	}
 
+	public <IN1 extends Tuple, IN2 extends Tuple, OUT extends Tuple> void addCoTask(
+			String componentName, CoInvokable<IN1, IN2, OUT> taskInvokableObject,
+			String operatorName, byte[] serializedFunction, int parallelism) {
+
+		addComponent(componentName, CoStreamTask.class, taskInvokableObject, operatorName,
+				serializedFunction, parallelism);
+
+		if (LOG.isDebugEnabled()) {
+			LOG.debug("CO-TASK: " + componentName);
+		}
+	}
+
 	/**
 	 * Adds sink to the JobGraph with the given parameters
 	 * 
@@ -218,7 +231,7 @@ public class JobGraphBuilder {
 	 * @param parallelism
 	 *            Number of parallel instances created
 	 */
-	public void addSink(String componentName, SinkInvokable<? extends Tuple> InvokableObject,
+	public void addSink(String componentName, UserSinkInvokable<? extends Tuple> InvokableObject,
 			String operatorName, byte[] serializedFunction, int parallelism) {
 
 		addComponent(componentName, StreamSink.class, InvokableObject, operatorName,
@@ -288,11 +301,11 @@ public class JobGraphBuilder {
 
 		componentClasses.put(componentName, componentClass);
 		setParallelism(componentName, parallelism);
-		mutability.put(componentName, false);
 		invokableObjects.put(componentName, invokableObject);
 		operatorNames.put(componentName, operatorName);
 		serializedFunctions.put(componentName, serializedFunction);
 		outEdgeList.put(componentName, new ArrayList<String>());
+		outEdgeType.put(componentName, new ArrayList<Integer>());
 		inEdgeList.put(componentName, new ArrayList<String>());
 		connectionTypes.put(componentName, new ArrayList<StreamPartitioner<? extends Tuple>>());
 		iterationTailCount.put(componentName, 0);
@@ -321,7 +334,8 @@ public class JobGraphBuilder {
 		if (componentClass.equals(StreamSource.class)
 				|| componentClass.equals(StreamIterationSource.class)) {
 			component = new JobInputVertex(componentName, this.jobGraph);
-		} else if (componentClass.equals(StreamTask.class)) {
+		} else if (componentClass.equals(StreamTask.class)
+				|| componentClass.equals(CoStreamTask.class)) {
 			component = new JobTaskVertex(componentName, this.jobGraph);
 		} else if (componentClass.equals(StreamSink.class)
 				|| componentClass.equals(StreamIterationSink.class)) {
@@ -334,9 +348,7 @@ public class JobGraphBuilder {
 			LOG.debug("Parallelism set: " + parallelism + " for " + componentName);
 		}
 
-		Configuration config = new TaskConfig(component.getConfiguration()).getConfiguration();
-
-		config.setBoolean("isMutable", mutability.get(componentName));
+		Configuration config = component.getConfiguration();
 
 		// Set vertex config
 		if (invokableObject != null) {
@@ -428,10 +440,6 @@ public class JobGraphBuilder {
 		componentParallelism.put(componentName, parallelism);
 	}
 
-	public void setMutability(String componentName, boolean isMutable) {
-		mutability.put(componentName, isMutable);
-	}
-
 	/**
 	 * Connects two vertices in the JobGraph using the selected partitioner
 	 * settings
@@ -442,10 +450,13 @@ public class JobGraphBuilder {
 	 *            Name of the downstream(input) vertex
 	 * @param partitionerObject
 	 *            Partitioner object
+	 * @param typeNumber
+	 *            Input type of the edge: 0 for single-input operators, 1 or 2 for co-function inputs
 	 */
 	public void setEdge(String upStreamComponentName, String downStreamComponentName,
-			StreamPartitioner<? extends Tuple> partitionerObject) {
+			StreamPartitioner<? extends Tuple> partitionerObject, int typeNumber) {
 		outEdgeList.get(upStreamComponentName).add(downStreamComponentName);
+		outEdgeType.get(upStreamComponentName).add(typeNumber);
 		inEdgeList.get(downStreamComponentName).add(upStreamComponentName);
 		connectionTypes.get(upStreamComponentName).add(partitionerObject);
 	}
@@ -463,10 +474,13 @@ public class JobGraphBuilder {
 	 * @param downStreamComponentName
 	 *            Name of the downstream component, that will receive the
 	 *            records
+	 * @param typeNumber
+	 *            Input type of the edge: 0 for single-input operators, 1 or 2 for co-function inputs
 	 */
 	public <T extends Tuple> void broadcastConnect(DataStream<T> inputStream,
-			String upStreamComponentName, String downStreamComponentName) {
-		setEdge(upStreamComponentName, downStreamComponentName, new BroadcastPartitioner<T>());
+			String upStreamComponentName, String downStreamComponentName, int typeNumber) {
+		setEdge(upStreamComponentName, downStreamComponentName, new BroadcastPartitioner<T>(),
+				typeNumber);
 	}
 
 	/**
@@ -485,12 +499,15 @@ public class JobGraphBuilder {
 	 *            records
 	 * @param keyPosition
 	 *            Position of key in the tuple
+	 * @param typeNumber
+	 *            Input type of the edge: 0 for single-input operators, 1 or 2 for co-function inputs
 	 */
 	public <T extends Tuple> void fieldsConnect(DataStream<T> inputStream,
-			String upStreamComponentName, String downStreamComponentName, int keyPosition) {
+			String upStreamComponentName, String downStreamComponentName, int keyPosition,
+			int typeNumber) {
 
 		setEdge(upStreamComponentName, downStreamComponentName, new FieldsPartitioner<T>(
-				keyPosition));
+				keyPosition), typeNumber);
 	}
 
 	/**
@@ -505,10 +522,13 @@ public class JobGraphBuilder {
 	 *            Name of the upstream component, that will emit the tuples
 	 * @param downStreamComponentName
 	 *            Name of the downstream component, that will receive the tuples
+	 * @param typeNumber
+	 *            Input type of the edge: 0 for single-input operators, 1 or 2 for co-function inputs
 	 */
 	public <T extends Tuple> void globalConnect(DataStream<T> inputStream,
-			String upStreamComponentName, String downStreamComponentName) {
-		setEdge(upStreamComponentName, downStreamComponentName, new GlobalPartitioner<T>());
+			String upStreamComponentName, String downStreamComponentName, int typeNumber) {
+		setEdge(upStreamComponentName, downStreamComponentName, new GlobalPartitioner<T>(),
+				typeNumber);
 	}
 
 	/**
@@ -523,10 +543,13 @@ public class JobGraphBuilder {
 	 *            Name of the upstream component, that will emit the tuples
 	 * @param downStreamComponentName
 	 *            Name of the downstream component, that will receive the tuples
+	 * @param typeNumber
+	 *            Input type of the edge: 0 for single-input operators, 1 or 2 for co-function inputs
 	 */
 	public <T extends Tuple> void shuffleConnect(DataStream<T> inputStream,
-			String upStreamComponentName, String downStreamComponentName) {
-		setEdge(upStreamComponentName, downStreamComponentName, new ShufflePartitioner<T>());
+			String upStreamComponentName, String downStreamComponentName, int typeNumber) {
+		setEdge(upStreamComponentName, downStreamComponentName, new ShufflePartitioner<T>(),
+				typeNumber);
 	}
 
 	/**
@@ -542,10 +565,13 @@ public class JobGraphBuilder {
 	 *            Name of the upstream component, that will emit the tuples
 	 * @param downStreamComponentName
 	 *            Name of the downstream component, that will receive the tuples
+	 * @param typeNumber
+	 *            Input type of the edge: 0 for single-input operators, 1 or 2 for co-function inputs
 	 */
 	public <T extends Tuple> void forwardConnect(DataStream<T> inputStream,
-			String upStreamComponentName, String downStreamComponentName) {
-		setEdge(upStreamComponentName, downStreamComponentName, new ForwardPartitioner<T>());
+			String upStreamComponentName, String downStreamComponentName, int typeNumber) {
+		setEdge(upStreamComponentName, downStreamComponentName, new ForwardPartitioner<T>(),
+				typeNumber);
 	}
 
 	/**
@@ -565,8 +591,7 @@ public class JobGraphBuilder {
 		AbstractJobVertex upStreamComponent = components.get(upStreamComponentName);
 		AbstractJobVertex downStreamComponent = components.get(downStreamComponentName);
 
-		Configuration config = new TaskConfig(upStreamComponent.getConfiguration())
-				.getConfiguration();
+		Configuration config = upStreamComponent.getConfiguration();
 
 		try {
 			if (partitionerObject.getClass().equals(ForwardPartitioner.class)) {
@@ -729,15 +754,21 @@ public class JobGraphBuilder {
 		for (String componentName : outEdgeList.keySet()) {
 			createVertex(componentName);
 		}
-
+		int inputNumber = 0;
 		for (String upStreamComponentName : outEdgeList.keySet()) {
+			
 			int i = 0;
+			ArrayList<Integer> outEdgeTypeList = outEdgeType.get(upStreamComponentName);
+
 			for (String downStreamComponentName : outEdgeList.get(upStreamComponentName)) {
+				Configuration downStreamComponentConfig = components.get(downStreamComponentName)
+						.getConfiguration();
+				downStreamComponentConfig.setInteger("inputType_" + inputNumber++, outEdgeTypeList.get(i));
+
 				connect(upStreamComponentName, downStreamComponentName,
 						connectionTypes.get(upStreamComponentName).get(i));
 				i++;
 			}
-
 		}
 
 		setAutomaticInstanceSharing();

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/f436690f/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamExecutionEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamExecutionEnvironment.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamExecutionEnvironment.java
index c1eca4a..9b1a16a 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamExecutionEnvironment.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamExecutionEnvironment.java
@@ -42,6 +42,7 @@ import org.apache.flink.streaming.api.function.source.GenSequenceFunction;
 import org.apache.flink.streaming.api.function.source.SourceFunction;
 import org.apache.flink.streaming.api.invokable.SinkInvokable;
 import org.apache.flink.streaming.api.invokable.UserTaskInvokable;
+import org.apache.flink.streaming.api.invokable.operator.co.CoInvokable;
 
 /**
  * {@link ExecutionEnvironment} for streaming jobs. An instance of it is
@@ -310,7 +311,7 @@ public abstract class StreamExecutionEnvironment {
 			throw new RuntimeException("Cannot serialize user defined function");
 		}
 
-		connectGraph(inputStream, returnStream.getId());
+		connectGraph(inputStream, returnStream.getId(), 0);
 
 		if (inputStream.iterationflag) {
 			returnStream.addIterationSource(inputStream.iterationID.toString());
@@ -319,6 +320,30 @@ public abstract class StreamExecutionEnvironment {
 
 		return returnStream;
 	}
+	
+	protected <T1 extends Tuple, T2 extends Tuple, R extends Tuple> DataStream<R> addCoFunction(String functionName, DataStream<T1> inputStream1, DataStream<T2> inputStream2, final AbstractFunction function,
+			CoInvokable<T1, T2, R> functionInvokable) {
+		
+		DataStream<R> returnStream = new DataStream<R>(this, functionName);
+		
+		try {
+			jobGraphBuilder.addCoTask(returnStream.getId(), functionInvokable, functionName,
+					SerializationUtils.serialize(function), degreeOfParallelism);
+		} catch (SerializationException e) {
+			throw new RuntimeException("Cannot serialize user defined function");
+		}
+
+		connectGraph(inputStream1, returnStream.getId(), 1);
+		connectGraph(inputStream2, returnStream.getId(), 2);
+
+		// TODO consider iteration
+//		if (inputStream.iterationflag) {
+//			returnStream.addIterationSource(inputStream.iterationID.toString());
+//			inputStream.iterationflag = false;
+//		}
+
+		return returnStream; 
+	}
 
 	protected <T extends Tuple, R extends Tuple> void addIterationSource(DataStream<T> inputStream,
 			String iterationID) {
@@ -340,7 +365,7 @@ public abstract class StreamExecutionEnvironment {
 
 		for (int i = 0; i < inputStream.connectIDs.size(); i++) {
 			String input = inputStream.connectIDs.get(i);
-			jobGraphBuilder.forwardConnect(inputStream, input, returnStream.getId());
+			jobGraphBuilder.forwardConnect(inputStream, input, returnStream.getId(), 0);
 
 		}
 	}
@@ -368,7 +393,7 @@ public abstract class StreamExecutionEnvironment {
 			throw new RuntimeException("Cannot serialize SinkFunction");
 		}
 
-		connectGraph(inputStream, returnStream.getId());
+		connectGraph(inputStream, returnStream.getId(), 0);
 
 		return returnStream;
 	}
@@ -523,8 +548,10 @@ public abstract class StreamExecutionEnvironment {
 	 *            ID of the output
 	 * @param <T>
 	 *            type of the input stream
+	 * @param typeNumber
+	 *            Input type of the connection: 0 for single-input operators, 1 or 2 for co-function inputs
 	 */
-	private <T extends Tuple> void connectGraph(DataStream<T> inputStream, String outputID) {
+	private <T extends Tuple> void connectGraph(DataStream<T> inputStream, String outputID, int typeNumber) {
 
 		for (int i = 0; i < inputStream.connectIDs.size(); i++) {
 			ConnectionType type = inputStream.ctypes.get(i);
@@ -533,16 +560,16 @@ public abstract class StreamExecutionEnvironment {
 
 			switch (type) {
 			case SHUFFLE:
-				jobGraphBuilder.shuffleConnect(inputStream, input, outputID);
+				jobGraphBuilder.shuffleConnect(inputStream, input, outputID, typeNumber);
 				break;
 			case BROADCAST:
-				jobGraphBuilder.broadcastConnect(inputStream, input, outputID);
+				jobGraphBuilder.broadcastConnect(inputStream, input, outputID, typeNumber);
 				break;
 			case FIELD:
-				jobGraphBuilder.fieldsConnect(inputStream, input, outputID, param);
+				jobGraphBuilder.fieldsConnect(inputStream, input, outputID, param, typeNumber);
 				break;
 			case FORWARD:
-				jobGraphBuilder.forwardConnect(inputStream, input, outputID);
+				jobGraphBuilder.forwardConnect(inputStream, input, outputID, typeNumber);
 				break;
 			}
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/f436690f/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/co/CoMapFunction.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/co/CoMapFunction.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/co/CoMapFunction.java
new file mode 100644
index 0000000..5885cbf
--- /dev/null
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/co/CoMapFunction.java
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.function.co;
+
+import org.apache.flink.api.common.functions.AbstractFunction;
+import org.apache.flink.api.java.tuple.Tuple;
+
+public abstract class CoMapFunction<IN1 extends Tuple, IN2 extends Tuple, OUT extends Tuple> extends AbstractFunction {
+	private static final long serialVersionUID = 1L;
+	
+	public abstract OUT map1(IN1 value);
+	public abstract OUT map2(IN2 value);
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/f436690f/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoInvokable.java
new file mode 100644
index 0000000..733b61e
--- /dev/null
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoInvokable.java
@@ -0,0 +1,55 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.flink.streaming.api.invokable.operator.co;
+
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.streaming.api.invokable.StreamComponentInvokable;
+import org.apache.flink.streaming.api.streamrecord.StreamRecord;
+import org.apache.flink.streaming.api.streamrecord.StreamRecordSerializer;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.MutableObjectIterator;
+
+public abstract class CoInvokable<IN1 extends Tuple, IN2 extends Tuple, OUT extends Tuple> extends
+		StreamComponentInvokable {
+
+	private static final long serialVersionUID = 1L;
+
+	protected Collector<OUT> collector;
+	protected MutableObjectIterator<StreamRecord<IN1>> recordIterator1;
+	protected MutableObjectIterator<StreamRecord<IN2>> recordIterator2;
+	protected StreamRecord<IN1> reuse1;
+	protected StreamRecord<IN2> reuse2;
+
+	public void initialize(Collector<OUT> collector,
+			MutableObjectIterator<StreamRecord<IN1>> recordIterator1,
+			StreamRecordSerializer<IN1> serializer1,
+			MutableObjectIterator<StreamRecord<IN2>> recordIterator2,
+			StreamRecordSerializer<IN2> serializer2) {
+		this.collector = collector;
+	
+		this.recordIterator1 = recordIterator1;
+		this.reuse1 = serializer1.createInstance();
+		
+		this.recordIterator2 = recordIterator2;
+		this.reuse2 = serializer2.createInstance();
+	}
+
+	public abstract void invoke() throws Exception;
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/f436690f/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoMapInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoMapInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoMapInvokable.java
new file mode 100644
index 0000000..4a30425
--- /dev/null
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoMapInvokable.java
@@ -0,0 +1,53 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.flink.streaming.api.invokable.operator.co;
+
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.streaming.api.function.co.CoMapFunction;
+
+public class CoMapInvokable<IN1 extends Tuple, IN2 extends Tuple, OUT extends Tuple> extends
+		CoInvokable<IN1, IN2, OUT> {
+	private static final long serialVersionUID = 1L;
+
+	private CoMapFunction<IN1, IN2, OUT> mapper;
+
+	public CoMapInvokable(CoMapFunction<IN1, IN2, OUT> mapper) {
+		this.mapper = mapper;
+	}
+
+	// TODO rework this as UnionRecordReader
+	@Override
+	public void invoke() throws Exception {
+		boolean noMoreRecordOnInput1;
+		boolean noMoreRecordOnInput2;
+		
+		do {
+			noMoreRecordOnInput1 = recordIterator1.next(reuse1) == null;
+			if (!noMoreRecordOnInput1) {
+				collector.collect(mapper.map1(reuse1.getTuple()));
+			}
+			
+			noMoreRecordOnInput2 = recordIterator2.next(reuse2) == null;
+			if (!noMoreRecordOnInput2) {
+				collector.collect(mapper.map2(reuse2.getTuple()));
+			}
+		} while (!noMoreRecordOnInput1 && !noMoreRecordOnInput2);
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/f436690f/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/AbstractStreamComponent.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/AbstractStreamComponent.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/AbstractStreamComponent.java
index f90caf8..1a51492 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/AbstractStreamComponent.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/AbstractStreamComponent.java
@@ -19,28 +19,18 @@
 
 package org.apache.flink.streaming.api.streamcomponent;
 
-import java.io.ByteArrayInputStream;
 import java.io.IOException;
-import java.io.ObjectInputStream;
 import java.util.List;
 
 import org.apache.commons.lang.SerializationUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.flink.api.common.functions.AbstractFunction;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.java.functions.FilterFunction;
-import org.apache.flink.api.java.functions.FlatMapFunction;
-import org.apache.flink.api.java.functions.GroupReduceFunction;
-import org.apache.flink.api.java.functions.MapFunction;
 import org.apache.flink.api.java.tuple.Tuple;
 import org.apache.flink.api.java.typeutils.TupleTypeInfo;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.core.io.IOReadableWritable;
 import org.apache.flink.runtime.io.network.api.MutableReader;
-import org.apache.flink.runtime.io.network.api.MutableRecordReader;
-import org.apache.flink.runtime.io.network.api.MutableUnionRecordReader;
 import org.apache.flink.runtime.io.network.api.RecordWriter;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.runtime.operators.util.ReaderIterator;
@@ -49,9 +39,7 @@ import org.apache.flink.runtime.plugable.SerializationDelegate;
 import org.apache.flink.streaming.api.collector.DirectedStreamCollector;
 import org.apache.flink.streaming.api.collector.OutputSelector;
 import org.apache.flink.streaming.api.collector.StreamCollector;
-import org.apache.flink.streaming.api.function.sink.SinkFunction;
 import org.apache.flink.streaming.api.invokable.StreamComponentInvokable;
-import org.apache.flink.streaming.api.invokable.UserSourceInvokable;
 import org.apache.flink.streaming.api.streamrecord.StreamRecord;
 import org.apache.flink.streaming.api.streamrecord.StreamRecordSerializer;
 import org.apache.flink.streaming.partitioner.ShufflePartitioner;
@@ -59,14 +47,10 @@ import org.apache.flink.streaming.partitioner.StreamPartitioner;
 import org.apache.flink.util.Collector;
 import org.apache.flink.util.MutableObjectIterator;
 
-public abstract class AbstractStreamComponent<IN extends Tuple, OUT extends Tuple> extends
-		AbstractInvokable {
+public abstract class AbstractStreamComponent<OUT extends Tuple> extends AbstractInvokable {
+	
 	private static final Log LOG = LogFactory.getLog(AbstractStreamComponent.class);
 
-	protected TupleTypeInfo<IN> inTupleTypeInfo = null;
-	protected StreamRecordSerializer<IN> inTupleSerializer = null;
-	protected DeserializationDelegate<StreamRecord<IN>> inDeserializationDelegate = null;
-
 	protected TupleTypeInfo<OUT> outTupleTypeInfo = null;
 	protected StreamRecordSerializer<OUT> outTupleSerializer = null;
 	protected SerializationDelegate<StreamRecord<OUT>> outSerializationDelegate = null;
@@ -105,109 +89,17 @@ public abstract class AbstractStreamComponent<IN extends Tuple, OUT extends Tupl
 		}
 		return collector;
 	}
-
-	protected void setSerializers() {
-		byte[] operatorBytes = configuration.getBytes("operator", null);
-		String operatorName = configuration.getString("operatorName", "");
-
-		Object function = null;
-		try {
-			ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(operatorBytes));
-			function = in.readObject();
-
-			if (operatorName.equals("flatMap")) {
-				setSerializerDeserializer(function, FlatMapFunction.class);
-			} else if (operatorName.equals("map")) {
-				setSerializerDeserializer(function, MapFunction.class);
-			} else if (operatorName.equals("batchReduce")) {
-				setSerializerDeserializer(function, GroupReduceFunction.class);
-			} else if (operatorName.equals("filter")) {
-				setDeserializer(function, FilterFunction.class);
-				setSerializer(function, FilterFunction.class, 0);
-			} else if (operatorName.equals("sink")) {
-				setDeserializer(function, SinkFunction.class);
-			} else if (operatorName.equals("source")) {
-				setSerializer(function, UserSourceInvokable.class, 0);
-			} else if (operatorName.equals("elements")) {
-				outTupleTypeInfo = new TupleTypeInfo<OUT>(TypeExtractor.getForObject(function));
-
-				outTupleSerializer = new StreamRecordSerializer<OUT>(outTupleTypeInfo.createSerializer());
-				outSerializationDelegate = new SerializationDelegate<StreamRecord<OUT>>(
-						outTupleSerializer);
-			} else {
-				throw new Exception("Wrong operator name!");
-			}
-
-		} catch (Exception e) {
-			throw new StreamComponentException(e);
-			// throw new StreamComponentException("Nonsupported object (named "
-			// + operatorName
-			// + ") passed as operator");
-		}
-	}
-
-	private void setSerializerDeserializer(Object function, Class<? extends AbstractFunction> clazz) {
-		setDeserializer(function, clazz);
-		setSerializer(function, clazz, 1);
-	}
-
-	@SuppressWarnings({ "unchecked", "rawtypes" })
-	private void setDeserializer(Object function, Class<? extends AbstractFunction> clazz) {
-		inTupleTypeInfo = (TupleTypeInfo) TypeExtractor.createTypeInfo(clazz, function.getClass(),
-				0, null, null);
-
-		inTupleSerializer = new StreamRecordSerializer(inTupleTypeInfo.createSerializer());
-		inDeserializationDelegate = new DeserializationDelegate<StreamRecord<IN>>(inTupleSerializer);
-	}
+	
 
 	@SuppressWarnings({ "rawtypes", "unchecked" })
-	private void setSerializer(Object function, Class<?> clazz, int typeParameter) {
+	protected void setSerializer(Object function, Class<?> clazz, int typeParameter) {
 		outTupleTypeInfo = (TupleTypeInfo) TypeExtractor.createTypeInfo(clazz, function.getClass(),
 				typeParameter, null, null);
 
 		outTupleSerializer = new StreamRecordSerializer(outTupleTypeInfo.createSerializer());
 		outSerializationDelegate = new SerializationDelegate<StreamRecord<OUT>>(outTupleSerializer);
 	}
-
-	@SuppressWarnings("unchecked")
-	protected void setSinkSerializer() {
-		if (outSerializationDelegate != null) {
-			inTupleTypeInfo = (TupleTypeInfo<IN>) outTupleTypeInfo;
-
-			inTupleSerializer = new StreamRecordSerializer<IN>(inTupleTypeInfo.createSerializer());
-			inDeserializationDelegate = new DeserializationDelegate<StreamRecord<IN>>(inTupleSerializer);
-		}
-	}
-
-	@SuppressWarnings("unchecked")
-	protected MutableReader<IOReadableWritable> getConfigInputs() throws StreamComponentException {
-		int numberOfInputs = configuration.getInteger("numberOfInputs", 0);
-
-		if (numberOfInputs < 2) {
-
-			return new MutableRecordReader<IOReadableWritable>(this);
-
-		} else {
-			MutableRecordReader<IOReadableWritable>[] recordReaders = (MutableRecordReader<IOReadableWritable>[]) new MutableRecordReader<?>[numberOfInputs];
-
-			for (int i = 0; i < numberOfInputs; i++) {
-				recordReaders[i] = new MutableRecordReader<IOReadableWritable>(this);
-			}
-			return new MutableUnionRecordReader<IOReadableWritable>(recordReaders);
-		}
-	}
-
-	protected MutableObjectIterator<StreamRecord<IN>> createInputIterator(MutableReader<?> inputReader,
-			TypeSerializer<?> serializer) {
-
-		// generic data type serialization
-		@SuppressWarnings("unchecked")
-		MutableReader<DeserializationDelegate<?>> reader = (MutableReader<DeserializationDelegate<?>>) inputReader;
-		@SuppressWarnings({ "unchecked", "rawtypes" })
-		final MutableObjectIterator<StreamRecord<IN>> iter = new ReaderIterator(reader, serializer);
-		return iter;
-
-	}
+	
 
 	protected void setConfigOutputs(List<RecordWriter<SerializationDelegate<StreamRecord<OUT>>>> outputs) {
 
@@ -217,7 +109,7 @@ public abstract class AbstractStreamComponent<IN extends Tuple, OUT extends Tupl
 			setPartitioner(i, outputs);
 		}
 	}
-
+	
 	private void setPartitioner(int outputNumber,
 			List<RecordWriter<SerializationDelegate<StreamRecord<OUT>>>> outputs) {
 
@@ -242,12 +134,12 @@ public abstract class AbstractStreamComponent<IN extends Tuple, OUT extends Tupl
 						+ " with " + outputNumber + " outputs");
 			}
 		} catch (Exception e) {
-			throw new StreamComponentException("Cannot deserialize partitioner "
-					+ outputPartitioner.getClass().getSimpleName() + " with " + outputNumber
-					+ " outputs");
+			throw new StreamComponentException("Cannot deserialize " + outputPartitioner.getClass().getSimpleName() + " of " + 
+					name + " with " + outputNumber
+					+ " outputs", e);
 		}
 	}
-
+	
 	/**
 	 * Reads and creates a StreamComponent from the config.
 	 * 
@@ -274,12 +166,24 @@ public abstract class AbstractStreamComponent<IN extends Tuple, OUT extends Tupl
 
 		return userFunction;
 	}
+	
+	protected <IN extends Tuple> MutableObjectIterator<StreamRecord<IN>> createInputIterator(MutableReader<?> inputReader,
+			TypeSerializer<?> serializer) {
 
+		// generic data type serialization
+		@SuppressWarnings("unchecked")
+		MutableReader<DeserializationDelegate<?>> reader = (MutableReader<DeserializationDelegate<?>>) inputReader;
+		@SuppressWarnings({ "unchecked", "rawtypes" })
+		final MutableObjectIterator<StreamRecord<IN>> iter = new ReaderIterator(reader, serializer);
+		return iter;
+	}
+	
 	@SuppressWarnings("unchecked")
-	private static <T> T deserializeObject(byte[] serializedObject) throws IOException,
+	protected static <T> T deserializeObject(byte[] serializedObject) throws IOException,
 			ClassNotFoundException {
 		return (T) SerializationUtils.deserialize(serializedObject);
 	}
-
+	
 	protected abstract void setInvokable();
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/f436690f/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/CoStreamTask.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/CoStreamTask.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/CoStreamTask.java
new file mode 100644
index 0000000..4c1cf42
--- /dev/null
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/CoStreamTask.java
@@ -0,0 +1,198 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.flink.streaming.api.streamcomponent;
+
+import java.io.ByteArrayInputStream;
+import java.io.ObjectInputStream;
+import java.util.ArrayList;
+import java.util.LinkedList;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.flink.api.common.functions.AbstractFunction;
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.core.io.IOReadableWritable;
+import org.apache.flink.runtime.io.network.api.MutableReader;
+import org.apache.flink.runtime.io.network.api.MutableRecordReader;
+import org.apache.flink.runtime.io.network.api.MutableUnionRecordReader;
+import org.apache.flink.runtime.io.network.api.RecordWriter;
+import org.apache.flink.runtime.plugable.SerializationDelegate;
+import org.apache.flink.streaming.api.function.co.CoMapFunction;
+import org.apache.flink.streaming.api.invokable.operator.co.CoInvokable;
+import org.apache.flink.streaming.api.invokable.operator.co.CoMapInvokable;
+import org.apache.flink.streaming.api.streamrecord.StreamRecord;
+import org.apache.flink.streaming.api.streamrecord.StreamRecordSerializer;
+import org.apache.flink.util.MutableObjectIterator;
+
+public class CoStreamTask<IN1 extends Tuple, IN2 extends Tuple, OUT extends Tuple> extends
+		AbstractStreamComponent<OUT> {
+	private static final Log LOG = LogFactory.getLog(CoStreamTask.class);
+
+	protected StreamRecordSerializer<IN1> inTupleSerializer1 = null;
+	protected StreamRecordSerializer<IN2> inTupleSerializer2 = null;
+
+	private MutableReader<IOReadableWritable> inputs1;
+	private MutableReader<IOReadableWritable> inputs2;
+	MutableObjectIterator<StreamRecord<IN1>> inputIter1;
+	MutableObjectIterator<StreamRecord<IN2>> inputIter2;
+
+	private List<RecordWriter<SerializationDelegate<StreamRecord<OUT>>>> outputs;
+	private CoInvokable<IN1, IN2, OUT> userFunction;
+	private int[] numberOfOutputChannels;
+	private static int numTasks;
+
+	/**
+	 * Creates the task with an empty output list and no user function, and
+	 * takes its instance id from the value returned by newComponent().
+	 */
+	public CoStreamTask() {
+		userFunction = null;
+		outputs = new LinkedList<RecordWriter<SerializationDelegate<StreamRecord<OUT>>>>();
+
+		// The freshly obtained id is also kept in the static numTasks field.
+		instanceID = numTasks = newComponent();
+	}
+
+	/**
+	 * Reads the serialized operator from the configuration and derives the
+	 * output serializer and both input serializers from it.
+	 * 
+	 * Only the "coMap" operator is supported by this two-input task.
+	 * 
+	 * @throws StreamComponentException
+	 *             if the operator cannot be deserialized or the configured
+	 *             operator name is not "coMap"
+	 */
+	protected void setSerializers() {
+		byte[] operatorBytes = configuration.getBytes("operator", null);
+		String operatorName = configuration.getString("operatorName", "");
+
+		ObjectInputStream in = null;
+		try {
+			in = new ObjectInputStream(new ByteArrayInputStream(operatorBytes));
+			Object function = in.readObject();
+
+			if (operatorName.equals("coMap")) {
+				setSerializer(function, CoMapFunction.class, 2);
+				setDeserializers(function, CoMapFunction.class);
+			} else {
+				// Name the offending operator, matching the message used by
+				// SingleInputAbstractStreamComponent.
+				throw new Exception("Wrong operator name: " + operatorName);
+			}
+
+		} catch (Exception e) {
+			throw new StreamComponentException(e);
+		} finally {
+			if (in != null) {
+				try {
+					in.close();
+				} catch (java.io.IOException ignored) {
+					// Closing an in-memory stream; a failure here must not
+					// mask the outcome of the try block.
+				}
+			}
+		}
+	}
+
+	/**
+	 * Builds the serializers for both inputs from type parameters 0 and 1 of
+	 * the given function class, as resolved on the concrete function object.
+	 */
+	@SuppressWarnings({ "unchecked", "rawtypes" })
+	private void setDeserializers(Object function, Class<? extends AbstractFunction> clazz) {
+
+		// First input: type parameter 0.
+		TupleTypeInfo firstTypeInfo = (TupleTypeInfo) TypeExtractor.createTypeInfo(clazz,
+				function.getClass(), 0, null, null);
+		inTupleSerializer1 = new StreamRecordSerializer(firstTypeInfo.createSerializer());
+
+		// Second input: type parameter 1.
+		TupleTypeInfo secondTypeInfo = (TupleTypeInfo) TypeExtractor.createTypeInfo(clazz,
+				function.getClass(), 1, null, null);
+		inTupleSerializer2 = new StreamRecordSerializer(secondTypeInfo.createSerializer());
+	}
+
+	/**
+	 * Sets up the task: serializers, collector, the two input iterators, the
+	 * outputs with their channel counts, and finally the invokable.
+	 */
+	@Override
+	public void registerInputOutput() {
+		initialize();
+
+		setSerializers();
+		setCollector();
+
+		// A single call fills both inputs1 and inputs2.
+		setConfigInputs();
+		inputIter1 = createInputIterator(inputs1, inTupleSerializer1);
+		inputIter2 = createInputIterator(inputs2, inTupleSerializer2);
+
+		setConfigOutputs(outputs);
+
+		final int outputCount = outputs.size();
+		numberOfOutputChannels = new int[outputCount];
+		for (int output = 0; output < outputCount; output++) {
+			numberOfOutputChannels[output] = configuration.getInteger("channels_" + output, 0);
+		}
+
+		setInvokable();
+	}
+
+	/**
+	 * Instantiates the invokable configured under "userfunction"
+	 * (CoMapInvokable is passed as the default) and hands it the collector
+	 * together with both input iterators and their serializers.
+	 */
+	@SuppressWarnings({ "rawtypes", "unchecked" })
+	@Override
+	protected void setInvokable() {
+		Class<? extends CoInvokable> invokableClass = configuration.getClass("userfunction",
+				CoMapInvokable.class, CoInvokable.class);
+
+		CoInvokable<IN1, IN2, OUT> invokable = (CoInvokable<IN1, IN2, OUT>) getInvokable(invokableClass);
+		userFunction = invokable;
+
+		invokable.initialize(collector, inputIter1, inTupleSerializer1, inputIter2,
+				inTupleSerializer2);
+	}
+
+	/**
+	 * Creates one record reader per configured input and sorts it into the
+	 * first or second input group according to its "inputType_i" setting
+	 * (1 or 2), then stores the resulting readers in inputs1 and inputs2.
+	 * 
+	 * @throws RuntimeException
+	 *             if an input has a type number other than 1 or 2
+	 */
+	protected void setConfigInputs() throws StreamComponentException {
+		ArrayList<MutableRecordReader<IOReadableWritable>> firstReaders = new ArrayList<MutableRecordReader<IOReadableWritable>>();
+		ArrayList<MutableRecordReader<IOReadableWritable>> secondReaders = new ArrayList<MutableRecordReader<IOReadableWritable>>();
+
+		int numberOfInputs = configuration.getInteger("numberOfInputs", 0);
+		for (int input = 0; input < numberOfInputs; input++) {
+			int inputType = configuration.getInteger("inputType_" + input, 0);
+
+			if (inputType == 1) {
+				firstReaders.add(new MutableRecordReader<IOReadableWritable>(this));
+			} else if (inputType == 2) {
+				secondReaders.add(new MutableRecordReader<IOReadableWritable>(this));
+			} else {
+				throw new RuntimeException("Invalid input type number: " + inputType);
+			}
+		}
+
+		inputs1 = getInputs(firstReaders);
+		inputs2 = getInputs(secondReaders);
+	}
+
+	/**
+	 * Combines the readers of one input group into a single reader.
+	 * 
+	 * @param inputList
+	 *            the record readers collected for one input
+	 * @return the only reader if there is exactly one, a union reader over
+	 *         all of them if there are several, or null if the list is empty
+	 */
+	@SuppressWarnings({ "unchecked", "rawtypes" })
+	private MutableReader<IOReadableWritable> getInputs(
+			ArrayList<MutableRecordReader<IOReadableWritable>> inputList) {
+		if (inputList.size() == 1) {
+			return inputList.get(0);
+		} else if (inputList.size() > 1) {
+			// The no-arg toArray() returns an Object[], and casting that to
+			// MutableRecordReader[] fails with a ClassCastException at
+			// runtime. Passing a typed array yields an array of the right type.
+			MutableRecordReader<IOReadableWritable>[] readers = inputList
+					.toArray(new MutableRecordReader[inputList.size()]);
+			return new MutableUnionRecordReader<IOReadableWritable>(readers);
+		}
+		return null;
+	}
+
+	/**
+	 * Runs the task: initializes the serializers of every output, invokes the
+	 * user function, and flushes every output afterwards.
+	 */
+	@Override
+	public void invoke() throws Exception {
+		if (LOG.isDebugEnabled()) {
+			LOG.debug("TASK " + name + " invoked with instance id " + instanceID);
+		}
+
+		// Outputs must be ready before the user function starts emitting.
+		for (RecordWriter<SerializationDelegate<StreamRecord<OUT>>> writer : outputs) {
+			writer.initializeSerializers();
+		}
+
+		userFunction.invoke();
+
+		if (LOG.isDebugEnabled()) {
+			LOG.debug("TASK " + name + " invoke finished with instance id " + instanceID);
+		}
+
+		// Flush each writer once the user function has returned.
+		for (RecordWriter<SerializationDelegate<StreamRecord<OUT>>> writer : outputs) {
+			writer.flush();
+		}
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/f436690f/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/SingleInputAbstractStreamComponent.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/SingleInputAbstractStreamComponent.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/SingleInputAbstractStreamComponent.java
new file mode 100644
index 0000000..b49620f
--- /dev/null
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/SingleInputAbstractStreamComponent.java
@@ -0,0 +1,129 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.flink.streaming.api.streamcomponent;
+
+import java.io.ByteArrayInputStream;
+import java.io.ObjectInputStream;
+
+import org.apache.flink.api.common.functions.AbstractFunction;
+import org.apache.flink.api.java.functions.FilterFunction;
+import org.apache.flink.api.java.functions.FlatMapFunction;
+import org.apache.flink.api.java.functions.GroupReduceFunction;
+import org.apache.flink.api.java.functions.MapFunction;
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.core.io.IOReadableWritable;
+import org.apache.flink.runtime.io.network.api.MutableReader;
+import org.apache.flink.runtime.io.network.api.MutableRecordReader;
+import org.apache.flink.runtime.io.network.api.MutableUnionRecordReader;
+import org.apache.flink.runtime.plugable.SerializationDelegate;
+import org.apache.flink.streaming.api.function.co.CoMapFunction;
+import org.apache.flink.streaming.api.function.sink.SinkFunction;
+import org.apache.flink.streaming.api.invokable.UserSourceInvokable;
+import org.apache.flink.streaming.api.streamrecord.StreamRecord;
+import org.apache.flink.streaming.api.streamrecord.StreamRecordSerializer;
+
+public abstract class SingleInputAbstractStreamComponent<IN extends Tuple, OUT extends Tuple> extends
+		AbstractStreamComponent<OUT> {
+
+	protected StreamRecordSerializer<IN> inTupleSerializer = null;
+
+	/**
+	 * Deserializes the operator stored in the configuration and, depending on
+	 * the configured operator name, sets up the input and/or output
+	 * serializers of this component.
+	 * 
+	 * @throws StreamComponentException
+	 *             wrapping any failure, including an unknown operator name
+	 */
+	protected void setSerializers() {
+		String operatorName = configuration.getString("operatorName", "");
+		byte[] operatorBytes = configuration.getBytes("operator", null);
+
+		try {
+			ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(operatorBytes));
+			Object function = in.readObject();
+
+			if (operatorName.equals("flatMap")) {
+				setSerializerDeserializer(function, FlatMapFunction.class);
+			} else if (operatorName.equals("map")) {
+				setSerializerDeserializer(function, MapFunction.class);
+			} else if (operatorName.equals("batchReduce")) {
+				setSerializerDeserializer(function, GroupReduceFunction.class);
+			} else if (operatorName.equals("filter")) {
+				// A filter emits its input type, so both sides use parameter 0.
+				setDeserializer(function, FilterFunction.class);
+				setSerializer(function, FilterFunction.class, 0);
+			} else if (operatorName.equals("sink")) {
+				// Input side only.
+				setDeserializer(function, SinkFunction.class);
+			} else if (operatorName.equals("source")) {
+				// Output side only.
+				setSerializer(function, UserSourceInvokable.class, 0);
+			} else if (operatorName.equals("coMap")) {
+				// Only the output serializer is set here; no input
+				// deserializer is created for a coMap in this component.
+				setSerializer(function, CoMapFunction.class, 2);
+			} else if (operatorName.equals("elements")) {
+				// The output type is taken from the deserialized object itself.
+				outTupleTypeInfo = new TupleTypeInfo<OUT>(TypeExtractor.getForObject(function));
+				outTupleSerializer = new StreamRecordSerializer<OUT>(outTupleTypeInfo.createSerializer());
+				outSerializationDelegate = new SerializationDelegate<StreamRecord<OUT>>(
+						outTupleSerializer);
+			} else {
+				throw new Exception("Wrong operator name: " + operatorName);
+			}
+
+		} catch (Exception e) {
+			throw new StreamComponentException(e);
+		}
+	}
+
+	/**
+	 * Sets up both sides for a function class: the input serializer via
+	 * setDeserializer, and the output serializer from type parameter 1.
+	 */
+	private void setSerializerDeserializer(Object function, Class<? extends AbstractFunction> clazz) {
+		// Input side first, then output side.
+		setDeserializer(function, clazz);
+		final int outputTypeParameter = 1;
+		setSerializer(function, clazz, outputTypeParameter);
+	}
+
+	/**
+	 * Creates the input serializer from type parameter 0 of the given
+	 * function class, as resolved on the concrete function object.
+	 */
+	@SuppressWarnings({ "unchecked", "rawtypes" })
+	private void setDeserializer(Object function, Class<? extends AbstractFunction> clazz) {
+		TupleTypeInfo<IN> inputTypeInfo = (TupleTypeInfo) TypeExtractor.createTypeInfo(clazz,
+				function.getClass(), 0, null, null);
+		inTupleSerializer = new StreamRecordSerializer(inputTypeInfo.createSerializer());
+	}
+	
+	/**
+	 * Reuses the output type information as input type information when an
+	 * output serialization delegate has been set; otherwise does nothing.
+	 */
+	@SuppressWarnings("unchecked")
+	protected void setSinkSerializer() {
+		if (outSerializationDelegate == null) {
+			return;
+		}
+
+		TupleTypeInfo<IN> sinkTypeInfo = (TupleTypeInfo<IN>) outTupleTypeInfo;
+		inTupleSerializer = new StreamRecordSerializer<IN>(sinkTypeInfo.createSerializer());
+	}
+
+	/**
+	 * Creates the input reader of this component from the configured
+	 * "numberOfInputs" value.
+	 * 
+	 * @return a single record reader when fewer than two inputs are
+	 *         configured, otherwise a union reader over one record reader per
+	 *         input
+	 */
+	@SuppressWarnings("unchecked")
+	protected MutableReader<IOReadableWritable> getConfigInputs() throws StreamComponentException {
+		final int inputCount = configuration.getInteger("numberOfInputs", 0);
+
+		// Zero or one input: a plain reader is enough.
+		if (inputCount < 2) {
+			return new MutableRecordReader<IOReadableWritable>(this);
+		}
+
+		MutableRecordReader<IOReadableWritable>[] readers = (MutableRecordReader<IOReadableWritable>[]) new MutableRecordReader<?>[inputCount];
+		for (int input = 0; input < inputCount; input++) {
+			readers[input] = new MutableRecordReader<IOReadableWritable>(this);
+		}
+		return new MutableUnionRecordReader<IOReadableWritable>(readers);
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/f436690f/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamIterationSink.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamIterationSink.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamIterationSink.java
index b92e031..1b25285 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamIterationSink.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamIterationSink.java
@@ -30,7 +30,7 @@ import org.apache.flink.streaming.api.streamrecord.StreamRecord;
 import org.apache.flink.util.MutableObjectIterator;
 import org.apache.flink.util.StringUtils;
 
-public class StreamIterationSink<IN extends Tuple> extends AbstractStreamComponent<IN, IN> {
+public class StreamIterationSink<IN extends Tuple> extends SingleInputAbstractStreamComponent<IN, IN> {
 
 	private static final Log LOG = LogFactory.getLog(StreamIterationSink.class);
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/f436690f/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamIterationSource.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamIterationSource.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamIterationSource.java
index 9f58842..f880470 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamIterationSource.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamIterationSource.java
@@ -32,7 +32,7 @@ import org.apache.flink.runtime.io.network.api.RecordWriter;
 import org.apache.flink.runtime.plugable.SerializationDelegate;
 import org.apache.flink.streaming.api.streamrecord.StreamRecord;
 
-public class StreamIterationSource<OUT extends Tuple> extends AbstractStreamComponent<Tuple, OUT> {
+public class StreamIterationSource<OUT extends Tuple> extends SingleInputAbstractStreamComponent<Tuple, OUT> {
 
 	private static final Log LOG = LogFactory.getLog(StreamIterationSource.class);
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/f436690f/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamSink.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamSink.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamSink.java
index 1433ebc..5e3457c 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamSink.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamSink.java
@@ -28,7 +28,7 @@ import org.apache.flink.streaming.api.invokable.StreamRecordInvokable;
 import org.apache.flink.streaming.api.streamrecord.StreamRecord;
 import org.apache.flink.util.MutableObjectIterator;
 
-public class StreamSink<IN extends Tuple> extends AbstractStreamComponent<IN, IN> {
+public class StreamSink<IN extends Tuple> extends SingleInputAbstractStreamComponent<IN, IN> {
 
 	private static final Log LOG = LogFactory.getLog(StreamSink.class);
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/f436690f/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamSource.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamSource.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamSource.java
index 108916a..12c9ba3 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamSource.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamSource.java
@@ -30,7 +30,7 @@ import org.apache.flink.runtime.plugable.SerializationDelegate;
 import org.apache.flink.streaming.api.invokable.UserSourceInvokable;
 import org.apache.flink.streaming.api.streamrecord.StreamRecord;
 
-public class StreamSource<OUT extends Tuple> extends AbstractStreamComponent<Tuple, OUT> {
+public class StreamSource<OUT extends Tuple> extends SingleInputAbstractStreamComponent<Tuple, OUT> {
 
 	private static final Log LOG = LogFactory.getLog(StreamSource.class);
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/f436690f/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamTask.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamTask.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamTask.java
index 7da19aa..5032446 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamTask.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamTask.java
@@ -35,7 +35,7 @@ import org.apache.flink.streaming.api.streamrecord.StreamRecord;
 import org.apache.flink.util.MutableObjectIterator;
 
 public class StreamTask<IN extends Tuple, OUT extends Tuple> extends
-		AbstractStreamComponent<IN, OUT> {
+		SingleInputAbstractStreamComponent<IN, OUT> {
 
 	private static final Log LOG = LogFactory.getLog(StreamTask.class);
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/f436690f/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoMapTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoMapTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoMapTest.java
new file mode 100644
index 0000000..00ae3b9
--- /dev/null
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoMapTest.java
@@ -0,0 +1,77 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.flink.streaming.api.invokable.operator;
+
+import java.io.Serializable;
+import java.util.HashSet;
+import java.util.Set;
+
+import org.apache.flink.api.java.tuple.Tuple1;
+import org.apache.flink.streaming.api.DataStream;
+import org.apache.flink.streaming.api.LocalStreamEnvironment;
+import org.apache.flink.streaming.api.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.function.co.CoMapFunction;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class CoMapTest implements Serializable {
+	private static final long serialVersionUID = 1L;
+	
+	private static Set<String> result = new HashSet<String>();
+	private static Set<String> expected = new HashSet<String>();
+
+	/**
+	 * Feeds a String stream and an Integer stream into one CoMapFunction and
+	 * checks that every element of both streams reached its map method.
+	 */
+	@Test
+	public void test() {
+		expected.add("a");
+		expected.add("b");
+		expected.add("c");
+		expected.add("1");
+		expected.add("2");
+		expected.add("3");
+		expected.add("4");
+		
+		LocalStreamEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1);
+		
+		DataStream<Tuple1<Integer>> ds1 = env.fromElements(1, 2, 3, 4);
+
+		@SuppressWarnings("unused")
+		DataStream<Tuple1<Boolean>> ds2 = env.fromElements("a", "b", "c").coMapWith(new CoMapFunction<Tuple1<String>, Tuple1<Integer>, Tuple1<Boolean>>() {
+			private static final long serialVersionUID = 1L;
+
+			@Override
+			public Tuple1<Boolean> map1(Tuple1<String> value) {
+				System.out.println("1: " + value);
+				result.add(value.f0);
+				return new Tuple1<Boolean>(true);
+			}
+
+			@Override
+			public Tuple1<Boolean> map2(Tuple1<Integer> value) {
+				System.out.println("2: " +value);
+				result.add(value.f0.toString());
+				return new Tuple1<Boolean>(false);
+			}
+		}, ds1)
+		.print();
+		
+		env.executeTest(32);
+		// Compare the sets themselves. HashSet iteration order depends on
+		// insertion order when elements share a bucket, and the two inputs
+		// may be processed in any interleaving, so comparing toArray()
+		// results could fail even when both sets hold the same elements.
+		Assert.assertEquals(expected, result);
+	}
+}