You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2014/08/18 19:25:53 UTC

[16/51] [abbrv] git commit: [streaming] StreamRecordWriter added for automatic output flushing settings

[streaming] StreamRecordWriter added for automatic output flushing settings


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/a2c4137f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/a2c4137f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/a2c4137f

Branch: refs/heads/master
Commit: a2c4137feb4c8ad6e0f0b34c4b0062b577af0150
Parents: be459ae
Author: gyfora <gy...@gmail.com>
Authored: Thu Jul 24 11:10:56 2014 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Mon Aug 18 16:20:17 2014 +0200

----------------------------------------------------------------------
 .../apache/flink/streaming/api/DataStream.java  |  13 +
 .../flink/streaming/api/JobGraphBuilder.java    | 321 +++++++++----------
 .../api/StreamExecutionEnvironment.java         |   4 +
 .../AbstractStreamComponent.java                |  41 ++-
 .../api/streamcomponent/StreamRecordWriter.java | 115 +++++++
 .../apache/flink/streaming/api/PrintTest.java   |  47 +--
 6 files changed, 303 insertions(+), 238 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/a2c4137f/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/DataStream.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/DataStream.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/DataStream.java
index 27e4d89..d965bf2 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/DataStream.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/DataStream.java
@@ -149,6 +149,19 @@ public class DataStream<T extends Tuple> {
 	}
 
 	/**
+	 * Sets the maximum time (ms) allowed between two flushes of the output
+	 * buffer. By default the output buffers flush only when they are full.
+	 * 
+	 * @param timeoutMillis
+	 *            The maximum time between two output flushes.
+	 * @return The DataStream with buffer timeout set.
+	 */
+	public DataStream<T> setBufferTimeout(long timeoutMillis) {
+		environment.setBufferTimeout(this, timeoutMillis);
+		return this;
+	}
+
+	/**
 	 * Sets the degree of parallelism for this operator. The degree must be 1 or
 	 * more.
 	 * 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/a2c4137f/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/JobGraphBuilder.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/JobGraphBuilder.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/JobGraphBuilder.java
index 64fdc03..7a10246 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/JobGraphBuilder.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/JobGraphBuilder.java
@@ -19,19 +19,13 @@
 
 package org.apache.flink.streaming.api;
 
-import java.io.ByteArrayOutputStream;
-import java.io.ObjectOutputStream;
-import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
-import org.apache.commons.lang3.SerializationUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.flink.api.java.tuple.Tuple;
-import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.io.network.channels.ChannelType;
 import org.apache.flink.runtime.jobgraph.AbstractJobVertex;
 import org.apache.flink.runtime.jobgraph.DistributionPattern;
@@ -42,9 +36,9 @@ import org.apache.flink.runtime.jobgraph.JobOutputVertex;
 import org.apache.flink.runtime.jobgraph.JobTaskVertex;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.streaming.api.collector.OutputSelector;
+import org.apache.flink.streaming.api.invokable.SinkInvokable;
+import org.apache.flink.streaming.api.invokable.SourceInvokable;
 import org.apache.flink.streaming.api.invokable.StreamComponentInvokable;
-import org.apache.flink.streaming.api.invokable.UserSinkInvokable;
-import org.apache.flink.streaming.api.invokable.UserSourceInvokable;
 import org.apache.flink.streaming.api.invokable.UserTaskInvokable;
 import org.apache.flink.streaming.api.invokable.operator.co.CoInvokable;
 import org.apache.flink.streaming.api.streamcomponent.CoStreamTask;
@@ -55,9 +49,10 @@ import org.apache.flink.streaming.api.streamcomponent.StreamSource;
 import org.apache.flink.streaming.api.streamcomponent.StreamTask;
 import org.apache.flink.streaming.partitioner.ForwardPartitioner;
 import org.apache.flink.streaming.partitioner.StreamPartitioner;
+import org.apache.flink.streaming.util.serialization.TypeSerializerWrapper;
 
 /**
- * Object for building Flink stream processing job graphs
+ * Object for building Apache Flink stream processing job graphs
  */
 public class JobGraphBuilder {
 
@@ -67,19 +62,27 @@ public class JobGraphBuilder {
 	// Graph attributes
 	private Map<String, AbstractJobVertex> components;
 	private Map<String, Integer> componentParallelism;
-	private Map<String, ArrayList<String>> outEdgeList;
-	private Map<String, ArrayList<Integer>> outEdgeType;
+	private Map<String, Long> bufferTimeout;
+	private Map<String, List<String>> outEdgeList;
+	private Map<String, List<Integer>> outEdgeType;
+	private Map<String, List<List<String>>> outEdgeNames;
+	private Map<String, Boolean> mutability;
 	private Map<String, List<String>> inEdgeList;
-	private Map<String, List<StreamPartitioner<? extends Tuple>>> connectionTypes;
-	private Map<String, String> userDefinedNames;
+	private Map<String, List<StreamPartitioner<?>>> connectionTypes;
 	private Map<String, String> operatorNames;
-	private Map<String, StreamComponentInvokable> invokableObjects;
+	private Map<String, StreamComponentInvokable<?>> invokableObjects;
+	private Map<String, TypeSerializerWrapper<?, ?, ?>> typeWrappers;
 	private Map<String, byte[]> serializedFunctions;
 	private Map<String, byte[]> outputSelectors;
 	private Map<String, Class<? extends AbstractInvokable>> componentClasses;
 	private Map<String, String> iterationIds;
-	private Map<String, String> iterationHeadNames;
+	private Map<String, String> iterationIDtoSourceName;
+	private Map<String, String> iterationIDtoSinkName;
 	private Map<String, Integer> iterationTailCount;
+	private Map<String, Long> iterationWaitTime;
+
+	private int degreeOfParallelism;
+	private int executionParallelism;
 
 	private String maxParallelismVertexName;
 	private int maxParallelism;
@@ -98,19 +101,24 @@ public class JobGraphBuilder {
 
 		components = new HashMap<String, AbstractJobVertex>();
 		componentParallelism = new HashMap<String, Integer>();
-		outEdgeList = new HashMap<String, ArrayList<String>>();
-		outEdgeType = new HashMap<String, ArrayList<Integer>>();
+		bufferTimeout = new HashMap<String, Long>();
+		outEdgeList = new HashMap<String, List<String>>();
+		outEdgeType = new HashMap<String, List<Integer>>();
+		outEdgeNames = new HashMap<String, List<List<String>>>();
+		mutability = new HashMap<String, Boolean>();
 		inEdgeList = new HashMap<String, List<String>>();
-		connectionTypes = new HashMap<String, List<StreamPartitioner<? extends Tuple>>>();
-		userDefinedNames = new HashMap<String, String>();
+		connectionTypes = new HashMap<String, List<StreamPartitioner<?>>>();
 		operatorNames = new HashMap<String, String>();
-		invokableObjects = new HashMap<String, StreamComponentInvokable>();
+		invokableObjects = new HashMap<String, StreamComponentInvokable<?>>();
+		typeWrappers = new HashMap<String, TypeSerializerWrapper<?, ?, ?>>();
 		serializedFunctions = new HashMap<String, byte[]>();
 		outputSelectors = new HashMap<String, byte[]>();
 		componentClasses = new HashMap<String, Class<? extends AbstractInvokable>>();
 		iterationIds = new HashMap<String, String>();
-		iterationHeadNames = new HashMap<String, String>();
+		iterationIDtoSourceName = new HashMap<String, String>();
+		iterationIDtoSinkName = new HashMap<String, String>();
 		iterationTailCount = new HashMap<String, Integer>();
+		iterationWaitTime = new HashMap<String, Long>();
 
 		maxParallelismVertexName = "";
 		maxParallelism = 0;
@@ -119,6 +127,22 @@ public class JobGraphBuilder {
 		}
 	}
 
+	public int getDefaultParallelism() {
+		return degreeOfParallelism;
+	}
+
+	public void setDefaultParallelism(int defaultParallelism) {
+		this.degreeOfParallelism = defaultParallelism;
+	}
+
+	public int getExecutionParallelism() {
+		return executionParallelism;
+	}
+
+	public void setExecutionParallelism(int executionParallelism) {
+		this.executionParallelism = executionParallelism;
+	}
+
 	/**
 	 * Adds source to the JobGraph with the given parameters
 	 * 
@@ -133,11 +157,11 @@ public class JobGraphBuilder {
 	 * @param parallelism
 	 *            Number of parallel instances created
 	 */
-	public void addSource(String componentName,
-			UserSourceInvokable<? extends Tuple> InvokableObject, String operatorName,
+	public void addSource(String componentName, SourceInvokable<?> InvokableObject,
+			TypeSerializerWrapper<?, ?, ?> typeWrapper, String operatorName,
 			byte[] serializedFunction, int parallelism) {
 
-		addComponent(componentName, StreamSource.class, InvokableObject, operatorName,
+		addComponent(componentName, StreamSource.class, typeWrapper, InvokableObject, operatorName,
 				serializedFunction, parallelism);
 
 		if (LOG.isDebugEnabled()) {
@@ -157,18 +181,24 @@ public class JobGraphBuilder {
 	 *            ID of iteration for multiple iterations
 	 * @param parallelism
 	 *            Number of parallel instances created
+	 * @param waitTime
+	 *            Max wait time for next record
 	 */
 	public void addIterationSource(String componentName, String iterationHead, String iterationID,
-			int parallelism) {
+			int parallelism, long waitTime) {
 
-		addComponent(componentName, StreamIterationSource.class, null, null, null, parallelism);
+		addComponent(componentName, StreamIterationSource.class, null, null, null, null,
+				parallelism);
 		iterationIds.put(componentName, iterationID);
-		iterationHeadNames.put(iterationID, componentName);
+		iterationIDtoSourceName.put(iterationID, componentName);
 
 		setBytesFrom(iterationHead, componentName);
 
 		setEdge(componentName, iterationHead,
-				connectionTypes.get(inEdgeList.get(iterationHead).get(0)).get(0), 0);
+				connectionTypes.get(inEdgeList.get(iterationHead).get(0)).get(0), 0,
+				new ArrayList<String>());
+
+		iterationWaitTime.put(iterationIDtoSourceName.get(iterationID), waitTime);
 
 		if (LOG.isDebugEnabled()) {
 			LOG.debug("ITERATION SOURCE: " + componentName);
@@ -178,8 +208,8 @@ public class JobGraphBuilder {
 	/**
 	 * Adds a task to the JobGraph with the given parameters
 	 * 
-	 * @param componentName
-	 *            Name of the component
+	 * @param componentName
+	 *            Name of the component
 	 * @param taskInvokableObject
 	 *            User defined operator
 	 * @param operatorName
@@ -189,24 +219,26 @@ public class JobGraphBuilder {
 	 * @param parallelism
 	 *            Number of parallel instances created
 	 */
-	public <IN extends Tuple, OUT extends Tuple> void addTask(String componentName,
-			UserTaskInvokable<IN, OUT> taskInvokableObject, String operatorName,
+	public <IN, OUT> void addTask(String componentName,
+			UserTaskInvokable<IN, OUT> taskInvokableObject,
+			TypeSerializerWrapper<?, ?, ?> typeWrapper, String operatorName,
 			byte[] serializedFunction, int parallelism) {
 
-		addComponent(componentName, StreamTask.class, taskInvokableObject, operatorName,
-				serializedFunction, parallelism);
+		addComponent(componentName, StreamTask.class, typeWrapper, taskInvokableObject,
+				operatorName, serializedFunction, parallelism);
 
 		if (LOG.isDebugEnabled()) {
 			LOG.debug("TASK: " + componentName);
 		}
 	}
 
-	public <IN1 extends Tuple, IN2 extends Tuple, OUT extends Tuple> void addCoTask(
-			String componentName, CoInvokable<IN1, IN2, OUT> taskInvokableObject,
-			String operatorName, byte[] serializedFunction, int parallelism) {
+	public <IN1, IN2, OUT> void addCoTask(String componentName,
+			CoInvokable<IN1, IN2, OUT> taskInvokableObject,
+			TypeSerializerWrapper<?, ?, ?> typeWrapper, String operatorName,
+			byte[] serializedFunction, int parallelism) {
 
-		addComponent(componentName, CoStreamTask.class, taskInvokableObject, operatorName,
-				serializedFunction, parallelism);
+		addComponent(componentName, CoStreamTask.class, typeWrapper, taskInvokableObject,
+				operatorName, serializedFunction, parallelism);
 
 		if (LOG.isDebugEnabled()) {
 			LOG.debug("CO-TASK: " + componentName);
@@ -227,10 +259,11 @@ public class JobGraphBuilder {
 	 * @param parallelism
 	 *            Number of parallel instances created
 	 */
-	public void addSink(String componentName, UserSinkInvokable<? extends Tuple> InvokableObject,
-			String operatorName, byte[] serializedFunction, int parallelism) {
+	public void addSink(String componentName, SinkInvokable<?> InvokableObject,
+			TypeSerializerWrapper<?, ?, ?> typeWrapper, String operatorName,
+			byte[] serializedFunction, int parallelism) {
 
-		addComponent(componentName, StreamSink.class, InvokableObject, operatorName,
+		addComponent(componentName, StreamSink.class, typeWrapper, InvokableObject, operatorName,
 				serializedFunction, parallelism);
 
 		if (LOG.isDebugEnabled()) {
@@ -254,19 +287,17 @@ public class JobGraphBuilder {
 	 *            Number of parallel instances created
 	 * @param directName
 	 *            Id of the output direction
+	 * @param waitTime
+	 *            Max waiting time for next record
 	 */
 	public void addIterationSink(String componentName, String iterationTail, String iterationID,
-			int parallelism, String directName) {
+			int parallelism, long waitTime) {
 
-		addComponent(componentName, StreamIterationSink.class, null, null, null, parallelism);
+		addComponent(componentName, StreamIterationSink.class, null, null, null, null, parallelism);
 		iterationIds.put(componentName, iterationID);
+		iterationIDtoSinkName.put(iterationID, componentName);
 		setBytesFrom(iterationTail, componentName);
-
-		if (directName != null) {
-			setUserDefinedName(componentName, directName);
-		} else {
-			setUserDefinedName(componentName, "iterate");
-		}
+		iterationWaitTime.put(iterationIDtoSinkName.get(iterationID), waitTime);
 
 		if (LOG.isDebugEnabled()) {
 			LOG.debug("ITERATION SINK: " + componentName);
@@ -281,6 +312,8 @@ public class JobGraphBuilder {
 	 *            Name of the component
 	 * @param componentClass
 	 *            The class of the vertex
+	 * @param typeWrapper
+	 *            Wrapper of the types for serialization
 	 * @param invokableObject
 	 *            The user defined invokable object
 	 * @param operatorName
@@ -292,18 +325,22 @@ public class JobGraphBuilder {
 	 */
 	private void addComponent(String componentName,
 			Class<? extends AbstractInvokable> componentClass,
-			StreamComponentInvokable invokableObject, String operatorName,
+			TypeSerializerWrapper<?, ?, ?> typeWrapper,
+			StreamComponentInvokable<?> invokableObject, String operatorName,
 			byte[] serializedFunction, int parallelism) {
 
 		componentClasses.put(componentName, componentClass);
+		typeWrappers.put(componentName, typeWrapper);
 		setParallelism(componentName, parallelism);
+		mutability.put(componentName, false);
 		invokableObjects.put(componentName, invokableObject);
 		operatorNames.put(componentName, operatorName);
 		serializedFunctions.put(componentName, serializedFunction);
 		outEdgeList.put(componentName, new ArrayList<String>());
 		outEdgeType.put(componentName, new ArrayList<Integer>());
+		outEdgeNames.put(componentName, new ArrayList<List<String>>());
 		inEdgeList.put(componentName, new ArrayList<String>());
-		connectionTypes.put(componentName, new ArrayList<StreamPartitioner<? extends Tuple>>());
+		connectionTypes.put(componentName, new ArrayList<StreamPartitioner<?>>());
 		iterationTailCount.put(componentName, 0);
 	}
 
@@ -318,12 +355,11 @@ public class JobGraphBuilder {
 
 		// Get vertex attributes
 		Class<? extends AbstractInvokable> componentClass = componentClasses.get(componentName);
-		StreamComponentInvokable invokableObject = invokableObjects.get(componentName);
+		StreamComponentInvokable<?> invokableObject = invokableObjects.get(componentName);
 		String operatorName = operatorNames.get(componentName);
 		byte[] serializedFunction = serializedFunctions.get(componentName);
 		int parallelism = componentParallelism.get(componentName);
 		byte[] outputSelector = outputSelectors.get(componentName);
-		String userDefinedName = userDefinedNames.get(componentName);
 
 		// Create vertex object
 		AbstractJobVertex component = null;
@@ -336,6 +372,8 @@ public class JobGraphBuilder {
 		} else if (componentClass.equals(StreamSink.class)
 				|| componentClass.equals(StreamIterationSink.class)) {
 			component = new JobOutputVertex(componentName, this.jobGraph);
+		} else {
+			throw new RuntimeException("Unsupported component class");
 		}
 
 		component.setInvokableClass(componentClass);
@@ -344,31 +382,21 @@ public class JobGraphBuilder {
 			LOG.debug("Parallelism set: " + parallelism + " for " + componentName);
 		}
 
-		Configuration config = component.getConfiguration();
+		StreamConfig config = new StreamConfig(component.getConfiguration());
 
+		config.setMutability(mutability.get(componentName));
+		config.setBufferTimeout(bufferTimeout.get(componentName));
+		config.setTypeWrapper(typeWrappers.get(componentName));
 		// Set vertex config
-		if (invokableObject != null) {
-			config.setClass("userfunction", invokableObject.getClass());
-			addSerializedObject(invokableObject, config);
-		}
-		config.setString("componentName", componentName);
-		if (serializedFunction != null) {
-			config.setBytes("operator", serializedFunction);
-			config.setString("operatorName", operatorName);
-		}
-
-		if (userDefinedName != null) {
-			config.setString("userDefinedName", userDefinedName);
-		}
-
-		if (outputSelector != null) {
-			config.setBoolean("directedEmit", true);
-			config.setBytes("outputSelector", outputSelector);
-		}
+		config.setUserInvokable(invokableObject);
+		config.setComponentName(componentName);
+		config.setFunction(serializedFunction, operatorName);
+		config.setOutputSelector(outputSelector);
 
 		if (componentClass.equals(StreamIterationSource.class)
 				|| componentClass.equals(StreamIterationSink.class)) {
-			config.setString("iteration-id", iterationIds.get(componentName));
+			config.setIterationId(iterationIds.get(componentName));
+			config.setIterationWaitTime(iterationWaitTime.get(componentName));
 		}
 
 		components.put(componentName, component);
@@ -380,51 +408,6 @@ public class JobGraphBuilder {
 	}
 
 	/**
-	 * Adds serialized invokable object to the JobVertex configuration
-	 * 
-	 * @param invokableObject
-	 *            Invokable object to serialize
-	 * @param config
-	 *            JobVertex configuration to which the serialized invokable will
-	 *            be added
-	 */
-	private void addSerializedObject(Serializable invokableObject, Configuration config) {
-
-		ByteArrayOutputStream baos = null;
-		ObjectOutputStream oos = null;
-		try {
-			baos = new ByteArrayOutputStream();
-
-			oos = new ObjectOutputStream(baos);
-
-			oos.writeObject(invokableObject);
-
-			config.setBytes("serializedudf", baos.toByteArray());
-		} catch (Exception e) {
-			throw new RuntimeException("Cannot serialize invokable object "
-					+ invokableObject.getClass(), e);
-		}
-
-	}
-
-	/**
-	 * Sets the user defined name for the selected component
-	 * 
-	 * @param componentName
-	 *            Name of the component for which the user defined name will be
-	 *            set
-	 * @param userDefinedName
-	 *            User defined name to set for the component
-	 */
-	public void setUserDefinedName(String componentName, String userDefinedName) {
-		userDefinedNames.put(componentName, userDefinedName);
-
-		if (LOG.isDebugEnabled()) {
-			LOG.debug("Name set: " + userDefinedName + " for " + componentName);
-		}
-	}
-
-	/**
 	 * Sets the number of parallel instances created for the given component.
 	 * 
 	 * @param componentName
@@ -436,6 +419,14 @@ public class JobGraphBuilder {
 		componentParallelism.put(componentName, parallelism);
 	}
 
+	public void setMutability(String componentName, boolean isMutable) {
+		mutability.put(componentName, isMutable);
+	}
+
+	public void setBufferTimeout(String componentName, long bufferTimeout) {
+		this.bufferTimeout.put(componentName, bufferTimeout);
+	}
+
 	/**
 	 * Connects two vertices in the JobGraph using the selected partitioner
 	 * settings
@@ -448,13 +439,16 @@ public class JobGraphBuilder {
 	 *            Partitioner object
 	 * @param typeNumber
 	 *            Number of the type (used at co-functions)
+	 * @param outputNames
+	 *            User defined names of the out edge
 	 */
 	public void setEdge(String upStreamComponentName, String downStreamComponentName,
-			StreamPartitioner<? extends Tuple> partitionerObject, int typeNumber) {
+			StreamPartitioner<?> partitionerObject, int typeNumber, List<String> outputNames) {
 		outEdgeList.get(upStreamComponentName).add(downStreamComponentName);
 		outEdgeType.get(upStreamComponentName).add(typeNumber);
 		inEdgeList.get(downStreamComponentName).add(upStreamComponentName);
 		connectionTypes.get(upStreamComponentName).add(partitionerObject);
+		outEdgeNames.get(upStreamComponentName).add(outputNames);
 	}
 
 	/**
@@ -468,13 +462,13 @@ public class JobGraphBuilder {
 	 * @param partitionerObject
 	 *            The partitioner
 	 */
-	private <T extends Tuple> void connect(String upStreamComponentName,
-			String downStreamComponentName, StreamPartitioner<T> partitionerObject) {
+	private <T> void connect(String upStreamComponentName, String downStreamComponentName,
+			StreamPartitioner<T> partitionerObject) {
 
 		AbstractJobVertex upStreamComponent = components.get(upStreamComponentName);
 		AbstractJobVertex downStreamComponent = components.get(downStreamComponentName);
 
-		Configuration config = upStreamComponent.getConfiguration();
+		StreamConfig config = new StreamConfig(upStreamComponent.getConfiguration());
 
 		try {
 			if (partitionerObject.getClass().equals(ForwardPartitioner.class)) {
@@ -497,49 +491,25 @@ public class JobGraphBuilder {
 
 		int outputIndex = upStreamComponent.getNumberOfForwardConnections() - 1;
 
-		putOutputNameToConfig(upStreamComponentName, downStreamComponentName, outputIndex, config);
-
-		config.setBytes("partitionerObject_" + outputIndex,
-				SerializationUtils.serialize(partitionerObject));
-
-		config.setInteger("numOfOutputs_" + outputIndex,
+		config.setOutputName(outputIndex, outEdgeNames.get(upStreamComponentName).get(outputIndex));
+		config.setPartitioner(outputIndex, partitionerObject);
+		config.setNumberOfOutputChannels(outputIndex,
 				componentParallelism.get(downStreamComponentName));
-
 	}
 
 	/**
-	 * Sets the user defined name for an output edge in the graph
-	 * 
-	 * @param upStreamComponentName
-	 *            The name of the component to which the output name will be set
-	 * @param downStreamComponentName
-	 *            The name of the component representing the output
-	 * @param index
-	 *            Index of the output channel
-	 * @param config
-	 *            Config of the upstream component
-	 */
-	private void putOutputNameToConfig(String upStreamComponentName,
-			String downStreamComponentName, int index, Configuration config) {
-
-		String outputName = userDefinedNames.get(downStreamComponentName);
-		if (outputName != null) {
-			config.setString("outputName_" + (index), outputName);
-		}
-	}
-
-	/**
-	 * Sets the parallelism of the iteration head of the given iteration id to
-	 * the parallelism given.
+	 * Sets the parallelism and buffer timeout of the iteration head of the
+	 * given iteration id to those of the given iteration tail.
 	 * 
 	 * @param iterationID
 	 *            ID of the iteration
-	 * @param parallelism
-	 *            Parallelism to set, typically the parallelism of the iteration
-	 *            tail.
+	 * @param iterationTail
+	 *            ID of the iteration tail
 	 */
-	public void setIterationSourceParallelism(String iterationID, int parallelism) {
-		setParallelism(iterationHeadNames.get(iterationID), parallelism);
+	public void setIterationSourceSettings(String iterationID, String iterationTail) {
+		setParallelism(iterationIDtoSourceName.get(iterationID),
+				componentParallelism.get(iterationTail));
+		setBufferTimeout(iterationIDtoSourceName.get(iterationID), bufferTimeout.get(iterationTail));
 	}
 
 	/**
@@ -552,8 +522,7 @@ public class JobGraphBuilder {
 	 * @param serializedOutputSelector
 	 *            Byte array representing the serialized output selector.
 	 */
-	public <T extends Tuple> void setOutputSelector(String componentName,
-			byte[] serializedOutputSelector) {
+	public <T> void setOutputSelector(String componentName, byte[] serializedOutputSelector) {
 		outputSelectors.put(componentName, serializedOutputSelector);
 
 		if (LOG.isDebugEnabled()) {
@@ -563,7 +532,8 @@ public class JobGraphBuilder {
 	}
 
 	/**
-	 * Sets udf operator from one component to another, used with some sinks.
+	 * Sets udf operator and TypeSerializerWrapper from one component to
+	 * another, used with some sinks.
 	 * 
 	 * @param from
 	 *            from
@@ -574,7 +544,7 @@ public class JobGraphBuilder {
 
 		operatorNames.put(to, operatorNames.get(from));
 		serializedFunctions.put(to, serializedFunctions.get(from));
-
+		typeWrappers.put(to, typeWrappers.get(from));
 	}
 
 	/**
@@ -600,7 +570,7 @@ public class JobGraphBuilder {
 		AbstractJobVertex maxParallelismVertex = components.get(maxParallelismVertexName);
 
 		for (String componentName : components.keySet()) {
-			if (componentName != maxParallelismVertexName) {
+			if (!componentName.equals(maxParallelismVertexName)) {
 				components.get(componentName).setVertexToShareInstancesWith(maxParallelismVertex);
 			}
 		}
@@ -612,8 +582,8 @@ public class JobGraphBuilder {
 	 */
 	private void setNumberOfJobInputs() {
 		for (AbstractJobVertex component : components.values()) {
-			component.getConfiguration().setInteger("numberOfInputs",
-					component.getNumberOfBackwardConnections());
+			(new StreamConfig(component.getConfiguration())).setNumberOfInputs(component
+					.getNumberOfBackwardConnections());
 		}
 	}
 
@@ -623,8 +593,8 @@ public class JobGraphBuilder {
 	 */
 	private void setNumberOfJobOutputs() {
 		for (AbstractJobVertex component : components.values()) {
-			component.getConfiguration().setInteger("numberOfOutputs",
-					component.getNumberOfForwardConnections());
+			(new StreamConfig(component.getConfiguration())).setNumberOfOutputs(component
+					.getNumberOfForwardConnections());
 		}
 	}
 
@@ -641,16 +611,16 @@ public class JobGraphBuilder {
 		for (String upStreamComponentName : outEdgeList.keySet()) {
 			int i = 0;
 
-			ArrayList<Integer> outEdgeTypeList = outEdgeType.get(upStreamComponentName);
+			List<Integer> outEdgeTypeList = outEdgeType.get(upStreamComponentName);
 
 			for (String downStreamComponentName : outEdgeList.get(upStreamComponentName)) {
-				Configuration downStreamComponentConfig = components.get(downStreamComponentName)
-						.getConfiguration();
+				StreamConfig downStreamComponentConfig = new StreamConfig(components.get(
+						downStreamComponentName).getConfiguration());
 
-				int inputNumber = downStreamComponentConfig.getInteger("numberOfInputs", 0);
-				downStreamComponentConfig.setInteger("inputType_" + inputNumber++,
-						outEdgeTypeList.get(i));
-				downStreamComponentConfig.setInteger("numberOfInputs", inputNumber);
+				int inputNumber = downStreamComponentConfig.getNumberOfInputs();
+
+				downStreamComponentConfig.setInputType(inputNumber++, outEdgeTypeList.get(i));
+				downStreamComponentConfig.setNumberOfInputs(inputNumber);
 
 				connect(upStreamComponentName, downStreamComponentName,
 						connectionTypes.get(upStreamComponentName).get(i));
@@ -661,7 +631,6 @@ public class JobGraphBuilder {
 		setAutomaticInstanceSharing();
 		setNumberOfJobInputs();
 		setNumberOfJobOutputs();
-
 	}
 
 	/**
@@ -674,4 +643,4 @@ public class JobGraphBuilder {
 		return jobGraph;
 	}
 
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/a2c4137f/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamExecutionEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamExecutionEnvironment.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamExecutionEnvironment.java
index 35cfc24..f56614d 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamExecutionEnvironment.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamExecutionEnvironment.java
@@ -127,6 +127,10 @@ public abstract class StreamExecutionEnvironment {
 	protected void setMutability(DataStream<?> stream, boolean isMutable) {
 		jobGraphBuilder.setMutability(stream.getId(), isMutable);
 	}
+	
+	protected void setBufferTimeout(DataStream<?> stream, long bufferTimeout) {
+		jobGraphBuilder.setBufferTimeout(stream.getId(), bufferTimeout);
+	}
 
 	/**
 	 * Sets the number of hardware contexts (CPU cores / threads) used when

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/a2c4137f/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/AbstractStreamComponent.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/AbstractStreamComponent.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/AbstractStreamComponent.java
index 1a51492..8afbddf 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/AbstractStreamComponent.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/AbstractStreamComponent.java
@@ -48,7 +48,7 @@ import org.apache.flink.util.Collector;
 import org.apache.flink.util.MutableObjectIterator;
 
 public abstract class AbstractStreamComponent<OUT extends Tuple> extends AbstractInvokable {
-	
+
 	private static final Log LOG = LogFactory.getLog(AbstractStreamComponent.class);
 
 	protected TupleTypeInfo<OUT> outTupleTypeInfo = null;
@@ -89,7 +89,6 @@ public abstract class AbstractStreamComponent<OUT extends Tuple> extends Abstrac
 		}
 		return collector;
 	}
-	
 
 	@SuppressWarnings({ "rawtypes", "unchecked" })
 	protected void setSerializer(Object function, Class<?> clazz, int typeParameter) {
@@ -99,9 +98,9 @@ public abstract class AbstractStreamComponent<OUT extends Tuple> extends Abstrac
 		outTupleSerializer = new StreamRecordSerializer(outTupleTypeInfo.createSerializer());
 		outSerializationDelegate = new SerializationDelegate<StreamRecord<OUT>>(outTupleSerializer);
 	}
-	
 
-	protected void setConfigOutputs(List<RecordWriter<SerializationDelegate<StreamRecord<OUT>>>> outputs) {
+	protected void setConfigOutputs(
+			List<RecordWriter<SerializationDelegate<StreamRecord<OUT>>>> outputs) {
 
 		int numberOfOutputs = configuration.getInteger("numberOfOutputs", 0);
 
@@ -109,7 +108,7 @@ public abstract class AbstractStreamComponent<OUT extends Tuple> extends Abstrac
 			setPartitioner(i, outputs);
 		}
 	}
-	
+
 	private void setPartitioner(int outputNumber,
 			List<RecordWriter<SerializationDelegate<StreamRecord<OUT>>>> outputs) {
 
@@ -120,8 +119,18 @@ public abstract class AbstractStreamComponent<OUT extends Tuple> extends Abstrac
 		try {
 			outputPartitioner = deserializeObject(serializedPartitioner);
 
-			RecordWriter<SerializationDelegate<StreamRecord<OUT>>> output = new RecordWriter<SerializationDelegate<StreamRecord<OUT>>>(
-					this, outputPartitioner);
+			RecordWriter<SerializationDelegate<StreamRecord<OUT>>> output;
+
+			long bufferTimeout = configuration.getLong("bufferTimeout", 0);
+
+			if (bufferTimeout > 0) {
+				output = new StreamRecordWriter<SerializationDelegate<StreamRecord<OUT>>>(this,
+						outputPartitioner, bufferTimeout);
+			} else {
+				output = new RecordWriter<SerializationDelegate<StreamRecord<OUT>>>(this,
+						outputPartitioner);
+			}
+
 			outputs.add(output);
 			String outputName = configuration.getString("outputName_" + outputNumber, null);
 
@@ -134,12 +143,12 @@ public abstract class AbstractStreamComponent<OUT extends Tuple> extends Abstrac
 						+ " with " + outputNumber + " outputs");
 			}
 		} catch (Exception e) {
-			throw new StreamComponentException("Cannot deserialize " + outputPartitioner.getClass().getSimpleName() + " of " + 
-					name + " with " + outputNumber
-					+ " outputs", e);
+			throw new StreamComponentException("Cannot deserialize "
+					+ outputPartitioner.getClass().getSimpleName() + " of " + name + " with "
+					+ outputNumber + " outputs", e);
 		}
 	}
-	
+
 	/**
 	 * Reads and creates a StreamComponent from the config.
 	 * 
@@ -166,9 +175,9 @@ public abstract class AbstractStreamComponent<OUT extends Tuple> extends Abstrac
 
 		return userFunction;
 	}
-	
-	protected <IN extends Tuple> MutableObjectIterator<StreamRecord<IN>> createInputIterator(MutableReader<?> inputReader,
-			TypeSerializer<?> serializer) {
+
+	protected <IN extends Tuple> MutableObjectIterator<StreamRecord<IN>> createInputIterator(
+			MutableReader<?> inputReader, TypeSerializer<?> serializer) {
 
 		// generic data type serialization
 		@SuppressWarnings("unchecked")
@@ -177,13 +186,13 @@ public abstract class AbstractStreamComponent<OUT extends Tuple> extends Abstrac
 		final MutableObjectIterator<StreamRecord<IN>> iter = new ReaderIterator(reader, serializer);
 		return iter;
 	}
-	
+
 	@SuppressWarnings("unchecked")
 	protected static <T> T deserializeObject(byte[] serializedObject) throws IOException,
 			ClassNotFoundException {
 		return (T) SerializationUtils.deserialize(serializedObject);
 	}
-	
+
 	protected abstract void setInvokable();
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/a2c4137f/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamRecordWriter.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamRecordWriter.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamRecordWriter.java
new file mode 100755
index 0000000..a89935a
--- /dev/null
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamRecordWriter.java
@@ -0,0 +1,115 @@
+package org.apache.flink.streaming.api.streamcomponent;
+
+import java.io.IOException;
+
+import org.apache.flink.core.io.IOReadableWritable;
+import org.apache.flink.runtime.io.network.Buffer;
+import org.apache.flink.runtime.io.network.api.ChannelSelector;
+import org.apache.flink.runtime.io.network.api.RecordWriter;
+import org.apache.flink.runtime.io.network.api.RoundRobinChannelSelector;
+import org.apache.flink.runtime.io.network.bufferprovider.BufferProvider;
+import org.apache.flink.runtime.io.network.serialization.RecordSerializer;
+import org.apache.flink.runtime.io.network.serialization.SpanningRecordSerializer;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+
+public class StreamRecordWriter<T extends IOReadableWritable> extends RecordWriter<T> {
+
+	private final BufferProvider bufferPool;
+
+	private final ChannelSelector<T> channelSelector;
+
+	private int numChannels;
+
+	private long timeout;
+
+	/** RecordSerializer per outgoing channel */
+	private RecordSerializer<T>[] serializers;
+
+	// -----------------------------------------------------------------------------------------------------------------
+
+	public StreamRecordWriter(AbstractInvokable invokable) {
+		this(invokable, new RoundRobinChannelSelector<T>(), 1000);
+	}
+
+	public StreamRecordWriter(AbstractInvokable invokable, ChannelSelector<T> channelSelector) {
+		this(invokable, channelSelector, 1000);
+	}
+
+	public StreamRecordWriter(AbstractInvokable invokable, ChannelSelector<T> channelSelector,
+			long timeout) {
+		// initialize the gate
+		super(invokable);
+
+		this.timeout = timeout;
+		this.bufferPool = invokable.getEnvironment().getOutputBufferProvider();
+		this.channelSelector = channelSelector;
+	}
+
+	// -----------------------------------------------------------------------------------------------------------------
+
+	@SuppressWarnings("unchecked")
+	@Override
+	public void initializeSerializers() {
+		this.numChannels = this.outputGate.getNumChannels();
+		this.serializers = new RecordSerializer[numChannels];
+		for (int i = 0; i < this.numChannels; i++) {
+			this.serializers[i] = new SpanningRecordSerializer<T>();
+		}
+		(new OutputFlusher()).start();
+	}
+
+	@Override
+	public void emit(final T record) throws IOException, InterruptedException {
+		for (int targetChannel : this.channelSelector.selectChannels(record, this.numChannels)) {
+			// serialize with corresponding serializer and send full buffer
+
+			RecordSerializer<T> serializer = this.serializers[targetChannel];
+
+			synchronized (serializer) {
+				RecordSerializer.SerializationResult result = serializer.addRecord(record);
+				while (result.isFullBuffer()) {
+					Buffer buffer = serializer.getCurrentBuffer();
+					if (buffer != null) {
+						sendBuffer(buffer, targetChannel);
+					}
+
+					buffer = this.bufferPool.requestBufferBlocking(this.bufferPool.getBufferSize());
+					result = serializer.setNextBuffer(buffer);
+				}
+			}
+		}
+	}
+
+	@Override
+	public void flush() throws IOException, InterruptedException {
+		for (int targetChannel = 0; targetChannel < this.numChannels; targetChannel++) {
+			RecordSerializer<T> serializer = this.serializers[targetChannel];
+			synchronized (serializer) {
+				Buffer buffer = serializer.getCurrentBuffer();
+				if (buffer != null) {
+					sendBuffer(buffer, targetChannel);
+				}
+
+				serializer.clear();
+			}
+
+		}
+	}
+
+	private class OutputFlusher extends Thread {
+
+		@Override
+		public void run() {
+			while (!outputGate.isClosed()) {
+				try {
+					Thread.sleep(timeout);
+					flush();
+				} catch (Exception e) {
+					e.printStackTrace();
+				}
+
+			}
+		}
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/a2c4137f/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/PrintTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/PrintTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/PrintTest.java
index 1c43a66..67dce9d 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/PrintTest.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/PrintTest.java
@@ -19,57 +19,15 @@
 
 package org.apache.flink.streaming.api;
 
-import org.apache.flink.api.java.functions.FlatMapFunction;
-import org.apache.flink.api.java.tuple.Tuple1;
-import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.streaming.util.LogUtils;
-import org.apache.flink.util.Collector;
 import org.apache.log4j.Level;
 import org.junit.Test;
 
 public class PrintTest {
 
-	public static final class MyFlatMap extends
-			FlatMapFunction<Tuple2<Integer, String>, Tuple2<Integer, String>> {
-
-		private static final long serialVersionUID = 1L;
-
-		@Override
-		public void flatMap(Tuple2<Integer, String> value, Collector<Tuple2<Integer, String>> out)
-				throws Exception {
-			out.collect(new Tuple2<Integer, String>(value.f0 * value.f0, value.f1));
-
-		}
-
-	}
-
+	
 	private static final long MEMORYSIZE = 32;
 
-	public static final class Increment extends FlatMapFunction<Tuple1<Integer>, Tuple1<Integer>> {
-
-		private static final long serialVersionUID = 1L;
-
-		@Override
-		public void flatMap(Tuple1<Integer> value, Collector<Tuple1<Integer>> out) throws Exception {
-			if (value.f0 < 5) {
-				out.collect(new Tuple1<Integer>(value.f0 + 1));
-			}
-
-		}
-
-	}
-
-	public static final class Forward extends FlatMapFunction<Tuple1<Integer>, Tuple1<Integer>> {
-
-		private static final long serialVersionUID = 1L;
-
-		@Override
-		public void flatMap(Tuple1<Integer> value, Collector<Tuple1<Integer>> out) throws Exception {
-			out.collect(value);
-
-		}
-
-	}
 
 	@Test
 	public void test() throws Exception {
@@ -82,7 +40,4 @@ public class PrintTest {
 
 	}
 
-	
-
-	
 }