Posted to commits@flink.apache.org by mb...@apache.org on 2014/09/24 21:51:39 UTC

[06/12] [streaming] Streaming jobgraph and vertex refactor to match recent runtime changes
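
For orientation, here is a minimal, self-contained Java sketch of the class shape this refactor introduces. It uses illustrative stand-in names (ToyStreamVertex, ToyIterationHead), not the actual Flink runtime classes, and only mirrors the structure visible in the diffs below: the former AbstractStreamComponent subclasses are consolidated into a single generic StreamVertex<IN, OUT> base with specialised vertices such as StreamIterationHead, StreamIterationTail and CoStreamVertex.

// Illustrative sketch only -- stand-in names, not the Flink streaming runtime API.
public class VertexRefactorSketch {

	/** Toy stand-in for the new generic vertex: one input type, one output type. */
	static class ToyStreamVertex<IN, OUT> {
		protected int instanceID;

		/** Mirrors registerInputOutput(): wire inputs/outputs, then bind the invokable. */
		public void registerInputOutput() {
			setInputsOutputs();
			setInvokable();
		}

		protected void setInputsOutputs() { /* create input/output handlers */ }

		protected void setInvokable() { /* bind the user function */ }

		public void invoke() throws Exception { /* run the user function */ }
	}

	/** Toy iteration head: produces records only, so IN == OUT. */
	static class ToyIterationHead<T> extends ToyStreamVertex<T, T> {
		@Override
		protected void setInvokable() { /* no user invokable; records come from the feedback queue */ }
	}

	public static void main(String[] args) throws Exception {
		ToyStreamVertex<String, String> vertex = new ToyIterationHead<>();
		vertex.registerInputOutput();
		vertex.invoke();
	}
}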

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/73371101/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/InputHandler.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/InputHandler.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/InputHandler.java
deleted file mode 100644
index 0477afa..0000000
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/InputHandler.java
+++ /dev/null
@@ -1,104 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.streamcomponent;
-
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.core.io.IOReadableWritable;
-import org.apache.flink.runtime.io.network.api.MutableReader;
-import org.apache.flink.runtime.io.network.api.MutableRecordReader;
-import org.apache.flink.runtime.io.network.api.MutableUnionRecordReader;
-import org.apache.flink.runtime.operators.util.ReaderIterator;
-import org.apache.flink.runtime.plugable.DeserializationDelegate;
-import org.apache.flink.streaming.api.StreamConfig;
-import org.apache.flink.streaming.api.streamrecord.StreamRecord;
-import org.apache.flink.streaming.api.streamrecord.StreamRecordSerializer;
-import org.apache.flink.util.MutableObjectIterator;
-
-public class InputHandler<IN> {
-	private StreamRecordSerializer<IN> inputSerializer = null;
-	private MutableObjectIterator<StreamRecord<IN>> inputIter;
-	private MutableReader<IOReadableWritable> inputs;
-
-	private AbstractStreamComponent streamComponent;
-	private StreamConfig configuration;
-
-	public InputHandler(AbstractStreamComponent streamComponent) {
-		this.streamComponent = streamComponent;
-		this.configuration = new StreamConfig(streamComponent.getTaskConfiguration());
-		try {
-			setConfigInputs();
-		} catch (Exception e) {
-			throw new StreamComponentException("Cannot register inputs for "
-					+ getClass().getSimpleName(), e);
-		}
-
-	}
-
-	@SuppressWarnings("unchecked")
-	protected void setConfigInputs() throws StreamComponentException {
-		setDeserializer();
-
-		int numberOfInputs = configuration.getNumberOfInputs();
-
-		if (numberOfInputs < 2) {
-
-			inputs = new MutableRecordReader<IOReadableWritable>(streamComponent);
-
-		} else {
-			MutableRecordReader<IOReadableWritable>[] recordReaders = (MutableRecordReader<IOReadableWritable>[]) new MutableRecordReader<?>[numberOfInputs];
-
-			for (int i = 0; i < numberOfInputs; i++) {
-				recordReaders[i] = new MutableRecordReader<IOReadableWritable>(streamComponent);
-			}
-			inputs = new MutableUnionRecordReader<IOReadableWritable>(recordReaders);
-		}
-
-		inputIter = createInputIterator();
-	}
-
-	private void setDeserializer() {
-		TypeInformation<IN> inTupleTypeInfo = configuration.getTypeInfoIn1();
-		inputSerializer = new StreamRecordSerializer<IN>(inTupleTypeInfo);
-	}
-
-	private MutableObjectIterator<StreamRecord<IN>> createInputIterator() {
-		@SuppressWarnings({ "unchecked", "rawtypes" })
-		final MutableObjectIterator<StreamRecord<IN>> iter = new ReaderIterator(inputs, inputSerializer);
-		return iter;
-	}
-
-	protected static <T> MutableObjectIterator<StreamRecord<T>> staticCreateInputIterator(
-			MutableReader<?> inputReader, TypeSerializer<?> serializer) {
-
-		// generic data type serialization
-		@SuppressWarnings("unchecked")
-		MutableReader<DeserializationDelegate<?>> reader = (MutableReader<DeserializationDelegate<?>>) inputReader;
-		@SuppressWarnings({ "unchecked", "rawtypes" })
-		final MutableObjectIterator<StreamRecord<T>> iter = new ReaderIterator(reader, serializer);
-		return iter;
-	}
-
-	public StreamRecordSerializer<IN> getInputSerializer() {
-		return inputSerializer;
-	}
-
-	public MutableObjectIterator<StreamRecord<IN>> getInputIter() {
-		return inputIter;
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/73371101/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/OutputHandler.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/OutputHandler.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/OutputHandler.java
deleted file mode 100644
index dabb871..0000000
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/OutputHandler.java
+++ /dev/null
@@ -1,179 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.streamcomponent;
-
-import java.io.IOException;
-import java.util.LinkedList;
-import java.util.List;
-
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.runtime.io.network.api.RecordWriter;
-import org.apache.flink.runtime.plugable.SerializationDelegate;
-import org.apache.flink.streaming.api.StreamConfig;
-import org.apache.flink.streaming.api.collector.DirectedStreamCollector;
-import org.apache.flink.streaming.api.collector.OutputSelector;
-import org.apache.flink.streaming.api.collector.StreamCollector;
-import org.apache.flink.streaming.api.invokable.StreamInvokable;
-import org.apache.flink.streaming.api.streamrecord.StreamRecord;
-import org.apache.flink.streaming.api.streamrecord.StreamRecordSerializer;
-import org.apache.flink.streaming.io.StreamRecordWriter;
-import org.apache.flink.streaming.partitioner.StreamPartitioner;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class OutputHandler<OUT> {
-	private static final Logger LOG = LoggerFactory.getLogger(OutputHandler.class);
-
-	private AbstractStreamComponent streamComponent;
-	private StreamConfig configuration;
-
-	private List<RecordWriter<SerializationDelegate<StreamRecord<OUT>>>> outputs;
-	private StreamCollector<OUT> collector;
-	private long bufferTimeout;
-
-	TypeInformation<OUT> outTypeInfo = null;
-	StreamRecordSerializer<OUT> outSerializer = null;
-	SerializationDelegate<StreamRecord<OUT>> outSerializationDelegate = null;
-
-	public OutputHandler(AbstractStreamComponent streamComponent) {
-		this.streamComponent = streamComponent;
-		this.outputs = new LinkedList<RecordWriter<SerializationDelegate<StreamRecord<OUT>>>>();
-		this.configuration = new StreamConfig(streamComponent.getTaskConfiguration());
-
-		try {
-			setConfigOutputs();
-		} catch (StreamComponentException e) {
-			throw new StreamComponentException("Cannot register outputs for "
-					+ streamComponent.getClass().getSimpleName(), e);
-		}
-	}
-
-	public List<RecordWriter<SerializationDelegate<StreamRecord<OUT>>>> getOutputs() {
-		return outputs;
-	}
-
-	private void setConfigOutputs() {
-		setSerializers();
-		setCollector();
-
-		int numberOfOutputs = configuration.getNumberOfOutputs();
-		bufferTimeout = configuration.getBufferTimeout();
-
-		for (int i = 0; i < numberOfOutputs; i++) {
-			setPartitioner(i, outputs);
-		}
-	}
-
-	private StreamCollector<OUT> setCollector() {
-		if (streamComponent.configuration.getDirectedEmit()) {
-			OutputSelector<OUT> outputSelector = streamComponent.configuration.getOutputSelector();
-
-			collector = new DirectedStreamCollector<OUT>(streamComponent.getInstanceID(),
-					outSerializationDelegate, outputSelector);
-		} else {
-			collector = new StreamCollector<OUT>(streamComponent.getInstanceID(),
-					outSerializationDelegate);
-		}
-		return collector;
-	}
-
-	public StreamCollector<OUT> getCollector() {
-		return collector;
-	}
-
-	void setSerializers() {
-		outTypeInfo = configuration.getTypeInfoOut1();
-		outSerializer = new StreamRecordSerializer<OUT>(outTypeInfo);
-		outSerializationDelegate = new SerializationDelegate<StreamRecord<OUT>>(outSerializer);
-		outSerializationDelegate.setInstance(outSerializer.createInstance());
-	}
-
-	void setPartitioner(int outputNumber,
-			List<RecordWriter<SerializationDelegate<StreamRecord<OUT>>>> outputs) {
-		StreamPartitioner<OUT> outputPartitioner = null;
-
-		try {
-			outputPartitioner = configuration.getPartitioner(outputNumber);
-
-		} catch (Exception e) {
-			throw new StreamComponentException("Cannot deserialize partitioner for "
-					+ streamComponent.getName() + " with " + outputNumber + " outputs", e);
-		}
-
-		RecordWriter<SerializationDelegate<StreamRecord<OUT>>> output;
-
-		if (bufferTimeout > 0) {
-			output = new StreamRecordWriter<SerializationDelegate<StreamRecord<OUT>>>(
-					streamComponent, outputPartitioner, bufferTimeout);
-		} else {
-			output = new RecordWriter<SerializationDelegate<StreamRecord<OUT>>>(streamComponent,
-					outputPartitioner);
-		}
-
-		outputs.add(output);
-		List<String> outputName = configuration.getOutputName(outputNumber);
-		boolean isSelectAllOutput = configuration.getSelectAll(outputNumber);
-
-		if (collector != null) {
-			collector.addOutput(output, outputName, isSelectAllOutput);
-		}
-
-		if (LOG.isTraceEnabled()) {
-			LOG.trace("Partitioner set: {} with {} outputs", outputPartitioner.getClass()
-					.getSimpleName(), outputNumber);
-		}
-	}
-
-	public void flushOutputs() throws IOException, InterruptedException {
-		for (RecordWriter<SerializationDelegate<StreamRecord<OUT>>> output : outputs) {
-			output.flush();
-		}
-	}
-
-	public void initializeOutputSerializers() {
-		for (RecordWriter<SerializationDelegate<StreamRecord<OUT>>> output : outputs) {
-			output.initializeSerializers();
-		}
-	}
-
-	long startTime;
-
-	public void invokeUserFunction(String componentTypeName, StreamInvokable<OUT> userInvokable)
-			throws IOException, InterruptedException {
-		if (LOG.isDebugEnabled()) {
-			LOG.debug("{} {} invoked with instance id {}", componentTypeName,
-					streamComponent.getName(), streamComponent.getInstanceID());
-		}
-
-		initializeOutputSerializers();
-
-		try {
-			streamComponent.invokeUserFunction(userInvokable);
-		} catch (Exception e) {
-			flushOutputs();
-			throw new RuntimeException(e);
-		}
-
-		if (LOG.isDebugEnabled()) {
-			LOG.debug("{} {} invoke finished instance id {}", componentTypeName,
-					streamComponent.getName(), streamComponent.getInstanceID());
-		}
-
-		flushOutputs();
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/73371101/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamComponentException.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamComponentException.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamComponentException.java
deleted file mode 100644
index 240c9ba..0000000
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamComponentException.java
+++ /dev/null
@@ -1,68 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.streamcomponent;
-
-/**
- * An exception that is thrown by the stream components when encountering an
- * illegal condition.
- */
-public class StreamComponentException extends RuntimeException {
-
-	/**
-	 * Serial version UID for serialization interoperability.
-	 */
-	private static final long serialVersionUID = 8392043527067472439L;
-
-	/**
-	 * Creates a compiler exception with no message and no cause.
-	 */
-	public StreamComponentException() {
-	}
-
-	/**
-	 * Creates a compiler exception with the given message and no cause.
-	 * 
-	 * @param message
-	 *            The message for the exception.
-	 */
-	public StreamComponentException(String message) {
-		super(message);
-	}
-
-	/**
-	 * Creates a compiler exception with the given cause and no message.
-	 * 
-	 * @param cause
-	 *            The <tt>Throwable</tt> that caused this exception.
-	 */
-	public StreamComponentException(Throwable cause) {
-		super(cause);
-	}
-
-	/**
-	 * Creates a compiler exception with the given message and cause.
-	 * 
-	 * @param message
-	 *            The message for the exception.
-	 * @param cause
-	 *            The <tt>Throwable</tt> that caused this exception.
-	 */
-	public StreamComponentException(String message, Throwable cause) {
-		super(message, cause);
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/73371101/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamIterationSink.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamIterationSink.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamIterationSink.java
deleted file mode 100755
index 32fc5f9..0000000
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamIterationSink.java
+++ /dev/null
@@ -1,104 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.streamcomponent;
-
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.TimeUnit;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.apache.flink.api.java.tuple.Tuple;
-import org.apache.flink.streaming.api.streamrecord.StreamRecord;
-import org.apache.flink.streaming.io.BlockingQueueBroker;
-import org.apache.flink.util.StringUtils;
-
-public class StreamIterationSink<IN extends Tuple> extends AbstractStreamComponent {
-
-	private static final Logger LOG = LoggerFactory.getLogger(StreamIterationSink.class);
-
-	private InputHandler<IN> inputHandler;
-
-	private String iterationId;
-	@SuppressWarnings("rawtypes")
-	private BlockingQueue<StreamRecord> dataChannel;
-	private long iterationWaitTime;
-	private boolean shouldWait;
-
-	public StreamIterationSink() {
-	}
-
-	@Override
-	public void setInputsOutputs() {
-		try {
-			inputHandler = new InputHandler<IN>(this);
-
-			iterationId = configuration.getIterationId();
-			iterationWaitTime = configuration.getIterationWaitTime();
-			shouldWait = iterationWaitTime > 0;
-			dataChannel = BlockingQueueBroker.instance().get(iterationId);
-		} catch (Exception e) {
-			throw new StreamComponentException(String.format(
-					"Cannot register inputs of StreamIterationSink %s", iterationId), e);
-		}
-	}
-
-	@Override
-	public void invoke() throws Exception {
-		if (LOG.isDebugEnabled()) {
-			LOG.debug("SINK {} invoked", getName());
-		}
-
-		forwardRecords();
-
-		if (LOG.isDebugEnabled()) {
-			LOG.debug("SINK {} invoke finished", getName());
-		}
-	}
-
-	protected void forwardRecords() throws Exception {
-		StreamRecord<IN> reuse = inputHandler.getInputSerializer().createInstance();
-		while ((reuse = inputHandler.getInputIter().next(reuse)) != null) {
-			if (!pushToQueue(reuse)) {
-				break;
-			}
-			// TODO: Fix object reuse for iteration
-			reuse = inputHandler.getInputSerializer().createInstance();
-		}
-	}
-
-	private boolean pushToQueue(StreamRecord<IN> record) {
-		try {
-			if (shouldWait) {
-				return dataChannel.offer(record, iterationWaitTime, TimeUnit.MILLISECONDS);
-			} else {
-				dataChannel.put(record);
-				return true;
-			}
-		} catch (InterruptedException e) {
-			if (LOG.isErrorEnabled()) {
-				LOG.error("Pushing back record at iteration %s failed due to: {}", iterationId,
-						StringUtils.stringifyException(e));
-			}
-			return false;
-		}
-	}
-
-	@Override
-	protected void setInvokable() {
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/73371101/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamIterationSource.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamIterationSource.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamIterationSource.java
deleted file mode 100755
index ce5687a..0000000
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamIterationSource.java
+++ /dev/null
@@ -1,100 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.streamcomponent;
-
-import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.TimeUnit;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.apache.flink.api.java.tuple.Tuple;
-import org.apache.flink.runtime.io.network.api.RecordWriter;
-import org.apache.flink.runtime.plugable.SerializationDelegate;
-import org.apache.flink.streaming.api.streamrecord.StreamRecord;
-import org.apache.flink.streaming.io.BlockingQueueBroker;
-
-public class StreamIterationSource<OUT extends Tuple> extends AbstractStreamComponent {
-
-	private static final Logger LOG = LoggerFactory.getLogger(StreamIterationSource.class);
-
-	private OutputHandler<OUT> outputHandler;
-
-	private static int numSources;
-	private String iterationId;
-	@SuppressWarnings("rawtypes")
-	private BlockingQueue<StreamRecord> dataChannel;
-	private long iterationWaitTime;
-	private boolean shouldWait;
-
-	@SuppressWarnings("rawtypes")
-	public StreamIterationSource() {
-		numSources = newComponent();
-		instanceID = numSources;
-		dataChannel = new ArrayBlockingQueue<StreamRecord>(1);
-	}
-
-	@Override
-	public void setInputsOutputs() {
-		outputHandler = new OutputHandler<OUT>(this);
-
-		iterationId = configuration.getIterationId();
-		iterationWaitTime = configuration.getIterationWaitTime();
-		shouldWait = iterationWaitTime > 0;
-
-		try {
-			BlockingQueueBroker.instance().handIn(iterationId, dataChannel);
-		} catch (Exception e) {
-
-		}
-	}
-
-	@SuppressWarnings("unchecked")
-	@Override
-	public void invoke() throws Exception {
-		if (LOG.isDebugEnabled()) {
-			LOG.debug("SOURCE {} invoked with instance id {}", getName(), getInstanceID());
-		}
-
-		outputHandler.initializeOutputSerializers();
-
-		StreamRecord<OUT> nextRecord;
-
-		while (true) {
-			if (shouldWait) {
-				nextRecord = dataChannel.poll(iterationWaitTime, TimeUnit.MILLISECONDS);
-			} else {
-				nextRecord = dataChannel.take();
-			}
-			if (nextRecord == null) {
-				break;
-			}
-			for (RecordWriter<SerializationDelegate<StreamRecord<OUT>>> output : outputHandler
-					.getOutputs()) {
-				outputHandler.outSerializationDelegate.setInstance(nextRecord);
-				output.emit(outputHandler.outSerializationDelegate);
-			}
-		}
-
-		outputHandler.flushOutputs();
-	}
-
-	@Override
-	protected void setInvokable() {
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/73371101/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamSink.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamSink.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamSink.java
deleted file mode 100644
index e2982bf..0000000
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamSink.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.streamcomponent;
-
-import org.apache.flink.streaming.api.invokable.StreamOperatorInvokable;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class StreamSink<IN> extends AbstractStreamComponent {
-
-	private static final Logger LOG = LoggerFactory.getLogger(StreamSink.class);
-
-	private InputHandler<IN> inputHandler;
-	
-	private StreamOperatorInvokable<IN, IN> userInvokable;
-
-	public StreamSink() {
-		userInvokable = null;
-	}
-
-	@Override
-	public void setInputsOutputs() {
-		inputHandler = new InputHandler<IN>(this);
-	}
-
-	@Override
-	protected void setInvokable() {
-		userInvokable = configuration.getUserInvokable();
-		userInvokable.initialize(null, inputHandler.getInputIter(), inputHandler.getInputSerializer(),
-				isMutable);
-	}
-
-	@Override
-	public void invoke() throws Exception {
-		if (LOG.isDebugEnabled()) {
-			LOG.debug("SINK {} invoked", getName());
-		}
-
-		invokeUserFunction(userInvokable);
-
-		if (LOG.isDebugEnabled()) {
-			LOG.debug("SINK {} invoke finished", getName());
-		}
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/73371101/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamSource.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamSource.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamSource.java
deleted file mode 100644
index 11f372a..0000000
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamSource.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.streamcomponent;
-
-import org.apache.flink.api.java.tuple.Tuple;
-import org.apache.flink.streaming.api.invokable.SourceInvokable;
-
-public class StreamSource<OUT extends Tuple> extends AbstractStreamComponent {
-
-	protected OutputHandler<OUT> outputHandler;
-
-	private SourceInvokable<OUT> sourceInvokable;
-	
-	private static int numSources;
-
-	public StreamSource() {
-		sourceInvokable = null;
-		numSources = newComponent();
-		instanceID = numSources;
-	}
-
-	@Override
-	public void setInputsOutputs() {
-		outputHandler = new OutputHandler<OUT>(this);
-	}
-
-	@Override
-	protected void setInvokable() {
-		sourceInvokable = configuration.getUserInvokable();
-		sourceInvokable.setCollector(outputHandler.getCollector());
-	}
-
-	@Override
-	public void invoke() throws Exception {
-		outputHandler.invokeUserFunction("SOURCE", sourceInvokable);
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/73371101/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamTask.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamTask.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamTask.java
deleted file mode 100644
index 6824d09..0000000
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamTask.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.streamcomponent;
-
-import org.apache.flink.api.java.tuple.Tuple;
-import org.apache.flink.streaming.api.invokable.StreamOperatorInvokable;
-
-public class StreamTask<IN extends Tuple, OUT extends Tuple> extends AbstractStreamComponent {
-
-	private InputHandler<IN> inputHandler;
-	private OutputHandler<OUT> outputHandler;
-
-	private StreamOperatorInvokable<IN, OUT> userInvokable;
-	
-	private static int numTasks;
-
-	public StreamTask() {
-		userInvokable = null;
-		numTasks = newComponent();
-		instanceID = numTasks;
-	}
-
-	@Override
-	public void setInputsOutputs() {
-		inputHandler = new InputHandler<IN>(this);
-		outputHandler = new OutputHandler<OUT>(this);
-	}
-
-	@Override
-	protected void setInvokable() {
-		userInvokable = configuration.getUserInvokable();
-		userInvokable.initialize(outputHandler.getCollector(), inputHandler.getInputIter(),
-				inputHandler.getInputSerializer(), isMutable);
-	}
-
-	@Override
-	public void invoke() throws Exception {
-		outputHandler.invokeUserFunction("TASK", userInvokable);
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/73371101/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/CoStreamVertex.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/CoStreamVertex.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/CoStreamVertex.java
new file mode 100644
index 0000000..5a6519d
--- /dev/null
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/CoStreamVertex.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.streamvertex;
+
+import java.util.ArrayList;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.runtime.io.network.api.MutableRecordReader;
+import org.apache.flink.runtime.plugable.DeserializationDelegate;
+import org.apache.flink.streaming.api.invokable.operator.co.CoInvokable;
+import org.apache.flink.streaming.api.streamrecord.StreamRecord;
+import org.apache.flink.streaming.api.streamrecord.StreamRecordSerializer;
+import org.apache.flink.streaming.io.CoReaderIterator;
+import org.apache.flink.streaming.io.CoRecordReader;
+import org.apache.flink.util.MutableObjectIterator;
+
+public class CoStreamVertex<IN1, IN2, OUT> extends
+		StreamVertex<IN1,OUT> {
+
+	private OutputHandler<OUT> outputHandler;
+
+	protected StreamRecordSerializer<IN1> inputDeserializer1 = null;
+	protected StreamRecordSerializer<IN2> inputDeserializer2 = null;
+
+	MutableObjectIterator<StreamRecord<IN1>> inputIter1;
+	MutableObjectIterator<StreamRecord<IN2>> inputIter2;
+
+	CoRecordReader<DeserializationDelegate<StreamRecord<IN1>>, DeserializationDelegate<StreamRecord<IN2>>> coReader;
+	CoReaderIterator<StreamRecord<IN1>, StreamRecord<IN2>> coIter;
+
+	private CoInvokable<IN1, IN2, OUT> userInvokable;
+	private static int numTasks;
+
+	public CoStreamVertex() {
+		userInvokable = null;
+		numTasks = newVertex();
+		instanceID = numTasks;
+	}
+
+	private void setDeserializers() {
+		TypeInformation<IN1> inputTypeInfo1 = configuration.getTypeInfoIn1();
+		inputDeserializer1 = new StreamRecordSerializer<IN1>(inputTypeInfo1);
+
+		TypeInformation<IN2> inputTypeInfo2 = configuration.getTypeInfoIn2();
+		inputDeserializer2 = new StreamRecordSerializer<IN2>(inputTypeInfo2);
+	}
+
+	@Override
+	public void setInputsOutputs() {
+		outputHandler = new OutputHandler<OUT>(this);
+
+		setConfigInputs();
+
+		coIter = new CoReaderIterator<StreamRecord<IN1>, StreamRecord<IN2>>(coReader,
+				inputDeserializer1, inputDeserializer2);
+	}
+
+	@Override
+	protected void setInvokable() {
+		userInvokable = configuration.getUserInvokable();
+		userInvokable.initialize(outputHandler.getCollector(), coIter, inputDeserializer1,
+				inputDeserializer2, isMutable);
+	}
+
+	protected void setConfigInputs() throws StreamVertexException {
+		setDeserializers();
+
+		int numberOfInputs = configuration.getNumberOfInputs();
+
+		ArrayList<MutableRecordReader<DeserializationDelegate<StreamRecord<IN1>>>> inputList1 = new ArrayList<MutableRecordReader<DeserializationDelegate<StreamRecord<IN1>>>>();
+		ArrayList<MutableRecordReader<DeserializationDelegate<StreamRecord<IN2>>>> inputList2 = new ArrayList<MutableRecordReader<DeserializationDelegate<StreamRecord<IN2>>>>();
+
+		for (int i = 0; i < numberOfInputs; i++) {
+			int inputType = configuration.getInputType(i);
+			switch (inputType) {
+			case 1:
+				inputList1.add(new MutableRecordReader<DeserializationDelegate<StreamRecord<IN1>>>(
+						this));
+				break;
+			case 2:
+				inputList2.add(new MutableRecordReader<DeserializationDelegate<StreamRecord<IN2>>>(
+						this));
+				break;
+			default:
+				throw new RuntimeException("Invalid input type number: " + inputType);
+			}
+		}
+
+		coReader = new CoRecordReader<DeserializationDelegate<StreamRecord<IN1>>, DeserializationDelegate<StreamRecord<IN2>>>(
+				inputList1, inputList2);
+	}
+
+	@Override
+	public void invoke() throws Exception {
+		outputHandler.invokeUserFunction("CO-TASK", userInvokable);
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/73371101/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/InputHandler.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/InputHandler.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/InputHandler.java
new file mode 100644
index 0000000..17d2ae5
--- /dev/null
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/InputHandler.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.streamvertex;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.io.IOReadableWritable;
+import org.apache.flink.runtime.io.network.api.MutableReader;
+import org.apache.flink.runtime.io.network.api.MutableRecordReader;
+import org.apache.flink.runtime.io.network.api.MutableUnionRecordReader;
+import org.apache.flink.runtime.operators.util.ReaderIterator;
+import org.apache.flink.runtime.plugable.DeserializationDelegate;
+import org.apache.flink.streaming.api.StreamConfig;
+import org.apache.flink.streaming.api.streamrecord.StreamRecord;
+import org.apache.flink.streaming.api.streamrecord.StreamRecordSerializer;
+import org.apache.flink.util.MutableObjectIterator;
+
+public class InputHandler<IN> {
+	private StreamRecordSerializer<IN> inputSerializer = null;
+	private MutableObjectIterator<StreamRecord<IN>> inputIter;
+	private MutableReader<IOReadableWritable> inputs;
+
+	private StreamVertex<IN,?> streamVertex;
+	private StreamConfig configuration;
+
+	public InputHandler(StreamVertex<IN,?> streamComponent) {
+		this.streamVertex = streamComponent;
+		this.configuration = new StreamConfig(streamComponent.getTaskConfiguration());
+		try {
+			setConfigInputs();
+		} catch (Exception e) {
+			throw new StreamVertexException("Cannot register inputs for "
+					+ getClass().getSimpleName(), e);
+		}
+
+	}
+
+	@SuppressWarnings("unchecked")
+	protected void setConfigInputs() throws StreamVertexException {
+		setDeserializer();
+
+		int numberOfInputs = configuration.getNumberOfInputs();
+		if (numberOfInputs > 0) {
+
+			if (numberOfInputs < 2) {
+
+				inputs = new MutableRecordReader<IOReadableWritable>(streamVertex);
+
+			} else {
+				MutableRecordReader<IOReadableWritable>[] recordReaders = (MutableRecordReader<IOReadableWritable>[]) new MutableRecordReader<?>[numberOfInputs];
+
+				for (int i = 0; i < numberOfInputs; i++) {
+					recordReaders[i] = new MutableRecordReader<IOReadableWritable>(streamVertex);
+				}
+				inputs = new MutableUnionRecordReader<IOReadableWritable>(recordReaders);
+			}
+
+			inputIter = createInputIterator();
+		}
+	}
+
+	private void setDeserializer() {
+		TypeInformation<IN> inTupleTypeInfo = configuration.getTypeInfoIn1();
+		if (inTupleTypeInfo != null) {
+			inputSerializer = new StreamRecordSerializer<IN>(inTupleTypeInfo);
+		}
+	}
+
+	private MutableObjectIterator<StreamRecord<IN>> createInputIterator() {
+		@SuppressWarnings({ "unchecked", "rawtypes" })
+		final MutableObjectIterator<StreamRecord<IN>> iter = new ReaderIterator(inputs,
+				inputSerializer);
+		return iter;
+	}
+
+	protected static <T> MutableObjectIterator<StreamRecord<T>> staticCreateInputIterator(
+			MutableReader<?> inputReader, TypeSerializer<?> serializer) {
+
+		// generic data type serialization
+		@SuppressWarnings("unchecked")
+		MutableReader<DeserializationDelegate<?>> reader = (MutableReader<DeserializationDelegate<?>>) inputReader;
+		@SuppressWarnings({ "unchecked", "rawtypes" })
+		final MutableObjectIterator<StreamRecord<T>> iter = new ReaderIterator(reader, serializer);
+		return iter;
+	}
+
+	public StreamRecordSerializer<IN> getInputSerializer() {
+		return inputSerializer;
+	}
+
+	public MutableObjectIterator<StreamRecord<IN>> getInputIter() {
+		return inputIter;
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/73371101/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/OutputHandler.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/OutputHandler.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/OutputHandler.java
new file mode 100644
index 0000000..d3f75dd
--- /dev/null
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/OutputHandler.java
@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.streamvertex;
+
+import java.io.IOException;
+import java.util.LinkedList;
+import java.util.List;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.runtime.io.network.api.RecordWriter;
+import org.apache.flink.runtime.plugable.SerializationDelegate;
+import org.apache.flink.streaming.api.StreamConfig;
+import org.apache.flink.streaming.api.collector.DirectedStreamCollector;
+import org.apache.flink.streaming.api.collector.OutputSelector;
+import org.apache.flink.streaming.api.collector.StreamCollector;
+import org.apache.flink.streaming.api.invokable.StreamInvokable;
+import org.apache.flink.streaming.api.streamrecord.StreamRecord;
+import org.apache.flink.streaming.api.streamrecord.StreamRecordSerializer;
+import org.apache.flink.streaming.io.StreamRecordWriter;
+import org.apache.flink.streaming.partitioner.StreamPartitioner;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class OutputHandler<OUT> {
+	private static final Logger LOG = LoggerFactory.getLogger(OutputHandler.class);
+
+	private StreamVertex<?,OUT> streamVertex;
+	private StreamConfig configuration;
+
+	private List<RecordWriter<SerializationDelegate<StreamRecord<OUT>>>> outputs;
+	private StreamCollector<OUT> collector;
+	private long bufferTimeout;
+
+	TypeInformation<OUT> outTypeInfo = null;
+	StreamRecordSerializer<OUT> outSerializer = null;
+	SerializationDelegate<StreamRecord<OUT>> outSerializationDelegate = null;
+
+	public OutputHandler(StreamVertex<?,OUT> streamComponent) {
+		this.streamVertex = streamComponent;
+		this.outputs = new LinkedList<RecordWriter<SerializationDelegate<StreamRecord<OUT>>>>();
+		this.configuration = new StreamConfig(streamComponent.getTaskConfiguration());
+
+		try {
+			setConfigOutputs();
+		} catch (StreamVertexException e) {
+			throw new StreamVertexException("Cannot register outputs for "
+					+ streamComponent.getClass().getSimpleName(), e);
+		}
+	}
+
+	public List<RecordWriter<SerializationDelegate<StreamRecord<OUT>>>> getOutputs() {
+		return outputs;
+	}
+
+	private void setConfigOutputs() {
+		setSerializers();
+		setCollector();
+
+		int numberOfOutputs = configuration.getNumberOfOutputs();
+		bufferTimeout = configuration.getBufferTimeout();
+
+		for (int i = 0; i < numberOfOutputs; i++) {
+			setPartitioner(i, outputs);
+		}
+	}
+
+	private StreamCollector<OUT> setCollector() {
+		if (streamVertex.configuration.getDirectedEmit()) {
+			OutputSelector<OUT> outputSelector = streamVertex.configuration.getOutputSelector();
+
+			collector = new DirectedStreamCollector<OUT>(streamVertex.getInstanceID(),
+					outSerializationDelegate, outputSelector);
+		} else {
+			collector = new StreamCollector<OUT>(streamVertex.getInstanceID(),
+					outSerializationDelegate);
+		}
+		return collector;
+	}
+
+	public StreamCollector<OUT> getCollector() {
+		return collector;
+	}
+
+	void setSerializers() {
+		outTypeInfo = configuration.getTypeInfoOut1();
+		if (outTypeInfo != null) {
+			outSerializer = new StreamRecordSerializer<OUT>(outTypeInfo);
+			outSerializationDelegate = new SerializationDelegate<StreamRecord<OUT>>(outSerializer);
+			outSerializationDelegate.setInstance(outSerializer.createInstance());
+		}
+	}
+
+	void setPartitioner(int outputNumber,
+			List<RecordWriter<SerializationDelegate<StreamRecord<OUT>>>> outputs) {
+		StreamPartitioner<OUT> outputPartitioner = null;
+
+		try {
+			outputPartitioner = configuration.getPartitioner(outputNumber);
+
+		} catch (Exception e) {
+			throw new StreamVertexException("Cannot deserialize partitioner for "
+					+ streamVertex.getName() + " with " + outputNumber + " outputs", e);
+		}
+
+		RecordWriter<SerializationDelegate<StreamRecord<OUT>>> output;
+
+		if (bufferTimeout > 0) {
+			output = new StreamRecordWriter<SerializationDelegate<StreamRecord<OUT>>>(
+					streamVertex, outputPartitioner, bufferTimeout);
+		} else {
+			output = new RecordWriter<SerializationDelegate<StreamRecord<OUT>>>(streamVertex,
+					outputPartitioner);
+		}
+
+		outputs.add(output);
+		List<String> outputName = configuration.getOutputName(outputNumber);
+		boolean isSelectAllOutput = configuration.getSelectAll(outputNumber);
+
+		if (collector != null) {
+			collector.addOutput(output, outputName, isSelectAllOutput);
+		}
+
+		if (LOG.isTraceEnabled()) {
+			LOG.trace("Partitioner set: {} with {} outputs", outputPartitioner.getClass()
+					.getSimpleName(), outputNumber);
+		}
+	}
+
+	public void flushOutputs() throws IOException, InterruptedException {
+		for (RecordWriter<SerializationDelegate<StreamRecord<OUT>>> output : outputs) {
+			output.flush();
+		}
+	}
+
+	public void initializeOutputSerializers() {
+		for (RecordWriter<SerializationDelegate<StreamRecord<OUT>>> output : outputs) {
+			output.initializeSerializers();
+		}
+	}
+
+	long startTime;
+
+	public void invokeUserFunction(String componentTypeName, StreamInvokable<?,OUT> userInvokable)
+			throws IOException, InterruptedException {
+		if (LOG.isDebugEnabled()) {
+			LOG.debug("{} {} invoked with instance id {}", componentTypeName,
+					streamVertex.getName(), streamVertex.getInstanceID());
+		}
+
+		initializeOutputSerializers();
+
+		try {
+			streamVertex.invokeUserFunction(userInvokable);
+		} catch (Exception e) {
+			flushOutputs();
+			throw new RuntimeException(e);
+		}
+
+		if (LOG.isDebugEnabled()) {
+			LOG.debug("{} {} invoke finished instance id {}", componentTypeName,
+					streamVertex.getName(), streamVertex.getInstanceID());
+		}
+
+		flushOutputs();
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/73371101/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamIterationHead.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamIterationHead.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamIterationHead.java
new file mode 100755
index 0000000..4dfecb1
--- /dev/null
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamIterationHead.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.streamvertex;
+
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.runtime.io.network.api.RecordWriter;
+import org.apache.flink.runtime.plugable.SerializationDelegate;
+import org.apache.flink.streaming.api.streamrecord.StreamRecord;
+import org.apache.flink.streaming.io.BlockingQueueBroker;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class StreamIterationHead<OUT extends Tuple> extends StreamVertex<OUT,OUT> {
+
+	private static final Logger LOG = LoggerFactory.getLogger(StreamIterationHead.class);
+
+	private OutputHandler<OUT> outputHandler;
+
+	private static int numSources;
+	private String iterationId;
+	@SuppressWarnings("rawtypes")
+	private BlockingQueue<StreamRecord> dataChannel;
+	private long iterationWaitTime;
+	private boolean shouldWait;
+
+	@SuppressWarnings("rawtypes")
+	public StreamIterationHead() {
+		numSources = newVertex();
+		instanceID = numSources;
+		dataChannel = new ArrayBlockingQueue<StreamRecord>(1);
+	}
+
+	@Override
+	public void setInputsOutputs() {
+		outputHandler = new OutputHandler<OUT>(this);
+
+		iterationId = configuration.getIterationId();
+		iterationWaitTime = configuration.getIterationWaitTime();
+		shouldWait = iterationWaitTime > 0;
+
+		try {
+			BlockingQueueBroker.instance().handIn(iterationId, dataChannel);
+		} catch (Exception e) {
+
+		}
+	}
+
+	@SuppressWarnings("unchecked")
+	@Override
+	public void invoke() throws Exception {
+		if (LOG.isDebugEnabled()) {
+			LOG.debug("SOURCE {} invoked with instance id {}", getName(), getInstanceID());
+		}
+
+		outputHandler.initializeOutputSerializers();
+
+		StreamRecord<OUT> nextRecord;
+
+		while (true) {
+			if (shouldWait) {
+				nextRecord = dataChannel.poll(iterationWaitTime, TimeUnit.MILLISECONDS);
+			} else {
+				nextRecord = dataChannel.take();
+			}
+			if (nextRecord == null) {
+				break;
+			}
+			for (RecordWriter<SerializationDelegate<StreamRecord<OUT>>> output : outputHandler
+					.getOutputs()) {
+				outputHandler.outSerializationDelegate.setInstance(nextRecord);
+				output.emit(outputHandler.outSerializationDelegate);
+			}
+		}
+
+		outputHandler.flushOutputs();
+	}
+
+	@Override
+	protected void setInvokable() {
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/73371101/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamIterationTail.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamIterationTail.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamIterationTail.java
new file mode 100755
index 0000000..b603686
--- /dev/null
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamIterationTail.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.streamvertex;
+
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.streaming.api.streamrecord.StreamRecord;
+import org.apache.flink.streaming.io.BlockingQueueBroker;
+import org.apache.flink.util.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class StreamIterationTail<IN extends Tuple> extends StreamVertex<IN,IN> {
+
+	private static final Logger LOG = LoggerFactory.getLogger(StreamIterationTail.class);
+
+	private InputHandler<IN> inputHandler;
+
+	private String iterationId;
+	@SuppressWarnings("rawtypes")
+	private BlockingQueue<StreamRecord> dataChannel;
+	private long iterationWaitTime;
+	private boolean shouldWait;
+
+	public StreamIterationTail() {
+	}
+
+	@Override
+	public void setInputsOutputs() {
+		try {
+			inputHandler = new InputHandler<IN>(this);
+
+			iterationId = configuration.getIterationId();
+			iterationWaitTime = configuration.getIterationWaitTime();
+			shouldWait = iterationWaitTime > 0;
+			dataChannel = BlockingQueueBroker.instance().get(iterationId);
+		} catch (Exception e) {
+			throw new StreamVertexException(String.format(
+					"Cannot register inputs of StreamIterationSink %s", iterationId), e);
+		}
+	}
+
+	@Override
+	public void invoke() throws Exception {
+		if (LOG.isDebugEnabled()) {
+			LOG.debug("SINK {} invoked", getName());
+		}
+
+		forwardRecords();
+
+		if (LOG.isDebugEnabled()) {
+			LOG.debug("SINK {} invoke finished", getName());
+		}
+	}
+
+	protected void forwardRecords() throws Exception {
+		StreamRecord<IN> reuse = inputHandler.getInputSerializer().createInstance();
+		while ((reuse = inputHandler.getInputIter().next(reuse)) != null) {
+			if (!pushToQueue(reuse)) {
+				break;
+			}
+			// TODO: Fix object reuse for iteration
+			reuse = inputHandler.getInputSerializer().createInstance();
+		}
+	}
+
+	private boolean pushToQueue(StreamRecord<IN> record) {
+		try {
+			if (shouldWait) {
+				return dataChannel.offer(record, iterationWaitTime, TimeUnit.MILLISECONDS);
+			} else {
+				dataChannel.put(record);
+				return true;
+			}
+		} catch (InterruptedException e) {
+			if (LOG.isErrorEnabled()) {
+				LOG.error("Pushing back record at iteration %s failed due to: {}", iterationId,
+						StringUtils.stringifyException(e));
+			}
+			return false;
+		}
+	}
+
+	@Override
+	protected void setInvokable() {
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/73371101/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamVertex.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamVertex.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamVertex.java
new file mode 100644
index 0000000..d05339a
--- /dev/null
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamVertex.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.streamvertex;
+
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.streaming.api.StreamConfig;
+import org.apache.flink.streaming.api.invokable.StreamInvokable;
+
+public class StreamVertex<IN, OUT> extends AbstractInvokable {
+	
+	private static int numTasks;
+	
+	protected StreamConfig configuration;
+	protected int instanceID;
+	protected String name;
+	private static int numVertices = 0;
+	protected boolean isMutable;
+	protected Object function;
+	protected String functionName;
+	
+	private InputHandler<IN> inputHandler;
+	private OutputHandler<OUT> outputHandler;
+	private StreamInvokable<IN, OUT> userInvokable;
+	
+	public StreamVertex() {
+		userInvokable = null;
+		numTasks = newVertex();
+		instanceID = numTasks;
+	}
+
+	protected static int newVertex() {
+		numVertices++;
+		return numVertices;
+	}
+
+	@Override
+	public void registerInputOutput() {
+		initialize();
+		setInputsOutputs();
+		setInvokable();
+	}
+	
+	protected void initialize() {
+		this.configuration = new StreamConfig(getTaskConfiguration());
+		this.name = configuration.getVertexName();
+		this.isMutable = configuration.getMutability();
+		this.functionName = configuration.getFunctionName();
+		this.function = configuration.getFunction();
+	}
+
+	protected <T> void invokeUserFunction(StreamInvokable<?,T> userInvokable) throws Exception {
+		userInvokable.open(getTaskConfiguration());
+		userInvokable.invoke();
+		userInvokable.close();
+	}
+	
+
+	public void setInputsOutputs() {
+		inputHandler = new InputHandler<IN>(this);
+		outputHandler = new OutputHandler<OUT>(this);
+	}
+
+	protected void setInvokable() {
+		userInvokable = configuration.getUserInvokable();
+		userInvokable.initialize(outputHandler.getCollector(), inputHandler.getInputIter(),
+				inputHandler.getInputSerializer(), isMutable);
+	}
+	
+	public String getName() {
+		return name;
+	}
+
+	public int getInstanceID() {
+		return instanceID;
+	}
+
+	@Override
+	public void invoke() throws Exception {
+		outputHandler.invokeUserFunction("TASK", userInvokable);
+	}
+}

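StreamVertex is essentially a template method: registerInputOutput() runs initialize(), setInputsOutputs() and setInvokable() once at setup time, and invoke() later drives the prepared user code. Head and tail vertices plug in by overriding individual hooks, as StreamIterationTail does above with a custom setInputsOutputs() and an empty setInvokable(). The following toy, self-contained sketch only illustrates that call order; none of its names are Flink API.

// Toy illustration of the registerInputOutput()/invoke() lifecycle split:
// the setup hooks run once up front, invoke() only drives the prepared work.
public class LifecycleSketch {

	static class Vertex {
		protected String name;

		public final void registerInputOutput() {
			initialize();
			setInputsOutputs();
			setInvokable();
		}

		protected void initialize() {
			name = "vertex";
		}

		protected void setInputsOutputs() {
			System.out.println(name + ": wiring readers and writers");
		}

		protected void setInvokable() {
			System.out.println(name + ": creating the user invokable");
		}

		public void invoke() {
			System.out.println(name + ": running the user function");
		}
	}

	// An iteration-tail-style subclass: custom input wiring, no invokable.
	static class TailVertex extends Vertex {
		@Override
		protected void setInputsOutputs() {
			System.out.println(name + ": connecting to the feedback queue");
		}

		@Override
		protected void setInvokable() {
			// intentionally empty, like StreamIterationTail.setInvokable()
		}
	}

	public static void main(String[] args) {
		Vertex tail = new TailVertex();
		tail.registerInputOutput();
		tail.invoke();
	}
}
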
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/73371101/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamVertexException.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamVertexException.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamVertexException.java
new file mode 100644
index 0000000..ed8b91e
--- /dev/null
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamVertexException.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.streamvertex;
+
+/**
+ * An exception that is thrown by the stream vertices when encountering an
+ * illegal condition.
+ */
+public class StreamVertexException extends RuntimeException {
+
+	/**
+	 * Serial version UID for serialization interoperability.
+	 */
+	private static final long serialVersionUID = 8392043527067472439L;
+
+	/**
+	 * Creates a stream vertex exception with no message and no cause.
+	 */
+	public StreamVertexException() {
+	}
+
+	/**
+	 * Creates a stream vertex exception with the given message and no cause.
+	 * 
+	 * @param message
+	 *            The message for the exception.
+	 */
+	public StreamVertexException(String message) {
+		super(message);
+	}
+
+	/**
+	 * Creates a stream vertex exception with the given cause and no message.
+	 * 
+	 * @param cause
+	 *            The <tt>Throwable</tt> that caused this exception.
+	 */
+	public StreamVertexException(Throwable cause) {
+		super(cause);
+	}
+
+	/**
+	 * Creates a stream vertex exception with the given message and cause.
+	 * 
+	 * @param message
+	 *            The message for the exception.
+	 * @param cause
+	 *            The <tt>Throwable</tt> that caused this exception.
+	 */
+	public StreamVertexException(String message, Throwable cause) {
+		super(message, cause);
+	}
+}

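These constructors are used throughout the new vertex classes in a plain wrap-and-rethrow style, as in StreamIterationTail.setInputsOutputs() earlier in this patch. A compact sketch of that pattern follows; wireInputs(), the iteration id and the locally redeclared exception class are stand-ins so the snippet compiles on its own.

// Self-contained sketch of the wrap-and-rethrow pattern; the nested exception
// class only mirrors the real StreamVertexException so the example compiles alone.
public class WrapFailureSketch {

	static class StreamVertexException extends RuntimeException {
		private static final long serialVersionUID = 1L;

		public StreamVertexException(String message, Throwable cause) {
			super(message, cause);
		}
	}

	static void wireInputs() {
		throw new IllegalStateException("no reader configured");
	}

	public static void main(String[] args) {
		String iterationId = "iteration-0";
		try {
			wireInputs();
		} catch (Exception e) {
			// Same shape as the setup code in StreamIterationTail above.
			RuntimeException wrapped = new StreamVertexException(String.format(
					"Cannot register inputs of StreamIterationTail %s", iterationId), e);
			System.out.println(wrapped.getMessage() + " (caused by " + wrapped.getCause() + ")");
		}
	}
}
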
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/73371101/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/collector/StreamCollectorTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/collector/StreamCollectorTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/collector/StreamCollectorTest.java
index ae52e67..535e109 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/collector/StreamCollectorTest.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/collector/StreamCollectorTest.java
@@ -18,13 +18,13 @@
 package org.apache.flink.streaming.api.collector;
 
 import static org.junit.Assert.assertArrayEquals;
-
+
 import java.util.ArrayList;
-
+
 import org.apache.flink.api.java.tuple.Tuple1;
 import org.apache.flink.runtime.plugable.SerializationDelegate;
-import org.apache.flink.streaming.api.streamcomponent.MockRecordWriter;
 import org.apache.flink.streaming.api.streamrecord.StreamRecord;
+import org.apache.flink.streaming.api.streamvertex.MockRecordWriter;
 import org.apache.flink.streaming.util.MockRecordWriterFactory;
 import org.junit.Test;
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/73371101/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/streamcomponent/MockRecordWriter.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/streamcomponent/MockRecordWriter.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/streamcomponent/MockRecordWriter.java
deleted file mode 100755
index 9c5c43f..0000000
--- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/streamcomponent/MockRecordWriter.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.streamcomponent;
-
-import java.util.ArrayList;
-
-import org.apache.flink.streaming.api.streamrecord.StreamRecord;
-import org.apache.flink.api.java.tuple.Tuple1;
-import org.apache.flink.runtime.operators.DataSourceTask;
-import org.apache.flink.runtime.plugable.SerializationDelegate;
-import org.apache.flink.runtime.io.network.api.RecordWriter;
-
-public class MockRecordWriter extends RecordWriter<SerializationDelegate<StreamRecord<Tuple1<Integer>>>> {
-
-	public ArrayList<Integer> emittedRecords;
-
-	public MockRecordWriter(DataSourceTask<?> inputBase, Class<StreamRecord<Tuple1<Integer>>> outputClass) {
-		super(inputBase);
-	}
-
-	public boolean initList() {
-		emittedRecords = new ArrayList<Integer>();
-		return true;
-	}
-	
-	@Override
-	public void emit(SerializationDelegate<StreamRecord<Tuple1<Integer>>> record) {
-		emittedRecords.add(record.getInstance().getObject().f0);
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/73371101/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/streamcomponent/StreamComponentTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/streamcomponent/StreamComponentTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/streamcomponent/StreamComponentTest.java
deleted file mode 100644
index cc341dc..0000000
--- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/streamcomponent/StreamComponentTest.java
+++ /dev/null
@@ -1,197 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.streamcomponent;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.fail;
-
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Map;
-
-import org.apache.flink.api.common.functions.RichMapFunction;
-import org.apache.flink.api.java.tuple.Tuple1;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.environment.LocalStreamEnvironment;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.function.co.CoMapFunction;
-import org.apache.flink.streaming.api.function.sink.SinkFunction;
-import org.apache.flink.streaming.api.function.source.SourceFunction;
-import org.apache.flink.util.Collector;
-import org.junit.Test;
-
-public class StreamComponentTest {
-
-	private static Map<Integer, Integer> data = new HashMap<Integer, Integer>();
-
-	public static class MySource implements SourceFunction<Tuple1<Integer>> {
-		private static final long serialVersionUID = 1L;
-
-		private Tuple1<Integer> tuple = new Tuple1<Integer>(0);
-
-		@Override
-		public void invoke(Collector<Tuple1<Integer>> collector) throws Exception {
-			for (int i = 0; i < 10; i++) {
-				tuple.f0 = i;
-				collector.collect(tuple);
-			}
-		}
-	}
-
-	public static class MyTask extends RichMapFunction<Tuple1<Integer>, Tuple2<Integer, Integer>> {
-		private static final long serialVersionUID = 1L;
-
-		@Override
-		public Tuple2<Integer, Integer> map(Tuple1<Integer> value) throws Exception {
-			Integer i = value.f0;
-			return new Tuple2<Integer, Integer>(i, i + 1);
-		}
-	}
-
-	public static class MySink implements SinkFunction<Tuple2<Integer, Integer>> {
-		private static final long serialVersionUID = 1L;
-
-		@Override
-		public void invoke(Tuple2<Integer, Integer> tuple) {
-			Integer k = tuple.getField(0);
-			Integer v = tuple.getField(1);
-			data.put(k, v);
-		}
-	}
-
-	@SuppressWarnings("unused")
-	private static final int PARALLELISM = 1;
-	private static final int SOURCE_PARALELISM = 1;
-	private static final long MEMORYSIZE = 32;
-
-//	@Test
-	public void wrongJobGraph() {
-		LocalStreamEnvironment env = StreamExecutionEnvironment
-				.createLocalEnvironment(SOURCE_PARALELISM);
-
-		try {
-			env.execute();
-			fail();
-		} catch (Exception e) {
-		}
-
-		env.fromCollection(Arrays.asList("a", "b"));
-
-		try {
-			env.execute();
-			fail();
-		} catch (Exception e) {
-		}
-
-		try {
-			env.fromCollection(null);
-			fail();
-		} catch (NullPointerException e) {
-		}
-
-		try {
-			env.fromElements();
-			fail();
-		} catch (IllegalArgumentException e) {
-		}
-
-		try {
-			env.generateSequence(-10, -30);
-			fail();
-		} catch (IllegalArgumentException e) {
-		}
-
-		try {
-			env.setBufferTimeout(-10);
-			fail();
-		} catch (IllegalArgumentException e) {
-		}
-
-		try {
-			env.setExecutionParallelism(-10);
-			fail();
-		} catch (IllegalArgumentException e) {
-		}
-
-		try {
-			env.readTextFile("random/path/that/is/not/valid");
-			fail();
-		} catch (IllegalArgumentException e) {
-		}
-	}
-
-	private static class CoMap implements CoMapFunction<String, Long, String> {
-		private static final long serialVersionUID = 1L;
-
-		@Override
-		public String map1(String value) {
-			return value;
-		}
-
-		@Override
-		public String map2(Long value) {
-			return value.toString();
-		}
-	}
-
-	static HashSet<String> resultSet;
-
-	private static class SetSink implements SinkFunction<String> {
-		private static final long serialVersionUID = 1L;
-
-		@Override
-		public void invoke(String value) {
-			resultSet.add(value);
-		}
-	}
-
-	@Test
-	public void coTest() throws Exception {
-		LocalStreamEnvironment env = StreamExecutionEnvironment
-				.createLocalEnvironment(SOURCE_PARALELISM);
-
-		DataStream<String> fromStringElements = env.fromElements("aa", "bb", "cc");
-		DataStream<Long> generatedSequence = env.generateSequence(0, 3);
-
-		fromStringElements.connect(generatedSequence).map(new CoMap()).addSink(new SetSink());
-
-		resultSet = new HashSet<String>();
-		env.execute();
-
-		HashSet<String> expectedSet = new HashSet<String>(Arrays.asList("aa", "bb", "cc", "0", "1",
-				"2", "3"));
-		assertEquals(expectedSet, resultSet);
-	}
-
-	@Test
-	public void runStream() throws Exception {
-		LocalStreamEnvironment env = StreamExecutionEnvironment
-				.createLocalEnvironment(SOURCE_PARALELISM);
-
-		env.addSource(new MySource(), SOURCE_PARALELISM).map(new MyTask()).addSink(new MySink());
-
-		env.executeTest(MEMORYSIZE);
-		assertEquals(10, data.keySet().size());
-
-		for (Integer k : data.keySet()) {
-			assertEquals((Integer) (k + 1), data.get(k));
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/73371101/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/streamvertex/MockRecordWriter.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/streamvertex/MockRecordWriter.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/streamvertex/MockRecordWriter.java
new file mode 100755
index 0000000..e91dc8b
--- /dev/null
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/streamvertex/MockRecordWriter.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.streamvertex;
+
+import java.util.ArrayList;
+
+import org.apache.flink.streaming.api.streamrecord.StreamRecord;
+import org.apache.flink.api.java.tuple.Tuple1;
+import org.apache.flink.runtime.operators.DataSourceTask;
+import org.apache.flink.runtime.plugable.SerializationDelegate;
+import org.apache.flink.runtime.io.network.api.RecordWriter;
+
+public class MockRecordWriter extends RecordWriter<SerializationDelegate<StreamRecord<Tuple1<Integer>>>> {
+
+	public ArrayList<Integer> emittedRecords;
+
+	public MockRecordWriter(DataSourceTask<?> inputBase, Class<StreamRecord<Tuple1<Integer>>> outputClass) {
+		super(inputBase);
+	}
+
+	public boolean initList() {
+		emittedRecords = new ArrayList<Integer>();
+		return true;
+	}
+	
+	@Override
+	public void emit(SerializationDelegate<StreamRecord<Tuple1<Integer>>> record) {
+		emittedRecords.add(record.getInstance().getObject().f0);
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/73371101/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/streamvertex/StreamVertexTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/streamvertex/StreamVertexTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/streamvertex/StreamVertexTest.java
new file mode 100644
index 0000000..f470c76
--- /dev/null
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/streamvertex/StreamVertexTest.java
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.streamvertex;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+
+import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.api.java.tuple.Tuple1;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.LocalStreamEnvironment;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.function.co.CoMapFunction;
+import org.apache.flink.streaming.api.function.sink.SinkFunction;
+import org.apache.flink.streaming.api.function.source.SourceFunction;
+import org.apache.flink.util.Collector;
+import org.junit.Test;
+
+public class StreamVertexTest {
+
+	private static Map<Integer, Integer> data = new HashMap<Integer, Integer>();
+
+	public static class MySource implements SourceFunction<Tuple1<Integer>> {
+		private static final long serialVersionUID = 1L;
+
+		private Tuple1<Integer> tuple = new Tuple1<Integer>(0);
+
+		@Override
+		public void invoke(Collector<Tuple1<Integer>> collector) throws Exception {
+			for (int i = 0; i < 10; i++) {
+				tuple.f0 = i;
+				collector.collect(tuple);
+			}
+		}
+	}
+
+	public static class MyTask extends RichMapFunction<Tuple1<Integer>, Tuple2<Integer, Integer>> {
+		private static final long serialVersionUID = 1L;
+
+		@Override
+		public Tuple2<Integer, Integer> map(Tuple1<Integer> value) throws Exception {
+			Integer i = value.f0;
+			return new Tuple2<Integer, Integer>(i, i + 1);
+		}
+	}
+
+	public static class MySink implements SinkFunction<Tuple2<Integer, Integer>> {
+		private static final long serialVersionUID = 1L;
+
+		@Override
+		public void invoke(Tuple2<Integer, Integer> tuple) {
+			Integer k = tuple.getField(0);
+			Integer v = tuple.getField(1);
+			data.put(k, v);
+		}
+	}
+
+	@SuppressWarnings("unused")
+	private static final int PARALLELISM = 1;
+	private static final int SOURCE_PARALLELISM = 1;
+	private static final long MEMORYSIZE = 32;
+
+	@Test
+	public void wrongJobGraph() {
+		LocalStreamEnvironment env = StreamExecutionEnvironment
+				.createLocalEnvironment(SOURCE_PARALLELISM);
+
+
+		try {
+			env.fromCollection(null);
+			fail();
+		} catch (NullPointerException e) {
+		}
+
+		try {
+			env.fromElements();
+			fail();
+		} catch (IllegalArgumentException e) {
+		}
+
+		try {
+			env.generateSequence(-10, -30);
+			fail();
+		} catch (IllegalArgumentException e) {
+		}
+
+		try {
+			env.setBufferTimeout(-10);
+			fail();
+		} catch (IllegalArgumentException e) {
+		}
+
+		try {
+			env.setExecutionParallelism(-10);
+			fail();
+		} catch (IllegalArgumentException e) {
+		}
+
+		try {
+			env.readTextFile("random/path/that/is/not/valid");
+			fail();
+		} catch (IllegalArgumentException e) {
+		}
+	}
+
+	private static class CoMap implements CoMapFunction<String, Long, String> {
+		private static final long serialVersionUID = 1L;
+
+		@Override
+		public String map1(String value) {
+			return value;
+		}
+
+		@Override
+		public String map2(Long value) {
+			return value.toString();
+		}
+	}
+
+	static HashSet<String> resultSet;
+
+	private static class SetSink implements SinkFunction<String> {
+		private static final long serialVersionUID = 1L;
+
+		@Override
+		public void invoke(String value) {
+			resultSet.add(value);
+		}
+	}
+
+	@Test
+	public void coTest() throws Exception {
+		LocalStreamEnvironment env = StreamExecutionEnvironment
+				.createLocalEnvironment(SOURCE_PARALLELISM);
+
+		DataStream<String> fromStringElements = env.fromElements("aa", "bb", "cc");
+		DataStream<Long> generatedSequence = env.generateSequence(0, 3);
+
+		fromStringElements.connect(generatedSequence).map(new CoMap()).addSink(new SetSink());
+
+		resultSet = new HashSet<String>();
+		env.execute();
+
+		HashSet<String> expectedSet = new HashSet<String>(Arrays.asList("aa", "bb", "cc", "0", "1",
+				"2", "3"));
+		assertEquals(expectedSet, resultSet);
+	}
+
+	@Test
+	public void runStream() throws Exception {
+		LocalStreamEnvironment env = StreamExecutionEnvironment
+				.createLocalEnvironment(SOURCE_PARALLELISM);
+
+		env.addSource(new MySource(), SOURCE_PARALLELISM).map(new MyTask()).addSink(new MySink());
+
+		env.executeTest(MEMORYSIZE);
+		assertEquals(10, data.keySet().size());
+
+		for (Integer k : data.keySet()) {
+			assertEquals((Integer) (k + 1), data.get(k));
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/73371101/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockInvokable.java
index 07314ea..c06f53a 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockInvokable.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockInvokable.java
@@ -25,7 +25,7 @@ import java.util.List;
 
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
-import org.apache.flink.streaming.api.invokable.StreamOperatorInvokable;
+import org.apache.flink.streaming.api.invokable.StreamInvokable;
 import org.apache.flink.streaming.api.streamrecord.StreamRecord;
 import org.apache.flink.streaming.api.streamrecord.StreamRecordSerializer;
 import org.apache.flink.util.Collector;
@@ -88,7 +88,7 @@ public class MockInvokable<IN, OUT> {
 		return iterator;
 	}
 
-	public static <IN, OUT> List<OUT> createAndExecute(StreamOperatorInvokable<IN, OUT> invokable, List<IN> inputs) {
+	public static <IN, OUT> List<OUT> createAndExecute(StreamInvokable<IN, OUT> invokable, List<IN> inputs) {
 		MockInvokable<IN, OUT> mock = new MockInvokable<IN, OUT>(inputs);
 		invokable.initialize(mock.getCollector(), mock.getIterator(), mock.getInDeserializer(), false);
 		try {

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/73371101/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockRecordWriterFactory.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockRecordWriterFactory.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockRecordWriterFactory.java
index 3baa08d..88673a3 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockRecordWriterFactory.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockRecordWriterFactory.java
@@ -21,7 +21,7 @@ import static org.mockito.Mockito.doCallRealMethod;
 import static org.mockito.Mockito.mock;
 
 import org.apache.flink.runtime.plugable.SerializationDelegate;
-import org.apache.flink.streaming.api.streamcomponent.MockRecordWriter;
+import org.apache.flink.streaming.api.streamvertex.MockRecordWriter;
 import org.mockito.Mockito;
 
 public class MockRecordWriterFactory {

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/73371101/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/window/join/WindowJoinLocal.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/window/join/WindowJoinLocal.java b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/window/join/WindowJoinLocal.java
index 4232398..211daf6 100644
--- a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/window/join/WindowJoinLocal.java
+++ b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/window/join/WindowJoinLocal.java
@@ -23,8 +23,8 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 
 public class WindowJoinLocal {
 
-	private static final int PARALLELISM = 1;
-	private static final int SOURCE_PARALLELISM = 1;
+	private static final int PARALLELISM = 4;
+	private static final int SOURCE_PARALLELISM = 2;
 
 	// This example will join two streams with a sliding window. One which emits
 	// people's grades and one which emits people's salaries.