You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by mb...@apache.org on 2015/04/05 10:53:06 UTC

[1/3] flink git commit: [FLINK-1808] [streaming] Send barrier requests only when the execution graph is running

Repository: flink
Updated Branches:
  refs/heads/master f36eb54ee -> 1da4b6437


[FLINK-1808] [streaming] Send barrier requests only when the execution graph is running

Closes #551


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/1da4b643
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/1da4b643
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/1da4b643

Branch: refs/heads/master
Commit: 1da4b6437e33f794dd3603f029010f0cc70607d1
Parents: 1cf49e9
Author: Paris Carbone <se...@gmail.com>
Authored: Tue Mar 31 13:51:07 2015 +0200
Committer: mbalassi <mb...@apache.org>
Committed: Sun Apr 5 10:51:20 2015 +0200

----------------------------------------------------------------------
 .../runtime/jobmanager/StreamCheckpointCoordinator.scala     | 8 +++++---
 1 file changed, 5 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/1da4b643/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/StreamCheckpointCoordinator.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/StreamCheckpointCoordinator.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/StreamCheckpointCoordinator.scala
index d9a3421..f42d08ab 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/StreamCheckpointCoordinator.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/StreamCheckpointCoordinator.scala
@@ -23,7 +23,6 @@ import java.lang.Long
 import akka.actor._
 import org.apache.flink.api.common.JobID
 import org.apache.flink.runtime.ActorLogMessages
-import org.apache.flink.runtime.execution.ExecutionState.RUNNING
 import org.apache.flink.runtime.executiongraph.{ExecutionAttemptID, ExecutionGraph, ExecutionVertex}
 import org.apache.flink.runtime.jobgraph.JobStatus._
 import org.apache.flink.runtime.jobgraph.JobVertexID
@@ -84,13 +83,16 @@ class StreamCheckpointCoordinator(val executionGraph: ExecutionGraph,
         case FAILED | CANCELED | FINISHED =>
           log.info("Stopping monitor for terminated job {}", executionGraph.getJobID)
           self ! PoisonPill
-        case _ =>
+        case RUNNING =>
           curId += 1
           log.debug("Sending Barrier to vertices of Job " + executionGraph.getJobName)
           vertices.filter(v => v.getJobVertex.getJobVertex.isInputVertex &&
                   v.getExecutionState == RUNNING).foreach(vertex
           => vertex.getCurrentAssignedResource.getInstance.getTaskManager
                     ! BarrierReq(vertex.getCurrentExecutionAttempt.getAttemptId,curId))
+        case _ =>
+          log.debug("Omitting sending barrier since graph is in {} state for job {}",
+            executionGraph.getState, executionGraph.getJobID)
       }
       
     case StateBarrierAck(jobID, jobVertexID, instanceID, checkpointID, opState) =>
@@ -112,7 +114,7 @@ class StreamCheckpointCoordinator(val executionGraph: ExecutionGraph,
       ackId = if(!keysToKeep.isEmpty) keysToKeep.max else ackId
       acks.keys.foreach(x => acks = acks.updated(x,acks(x).filter(_ >= ackId)))
       states = states.filterKeys(_._3 >= ackId)
-      log.debug("[FT-MONITOR] Last global barrier is " + ackId)
+      log.debug("Last global barrier is " + ackId)
       executionGraph.loadOperatorStates(states)
       
   }


[3/3] flink git commit: Fix the constructor comments with correct parameter in DefaultMemoryManager

Posted by mb...@apache.org.
Fix the constructor comments with correct parameter in DefaultMemoryManager

Closes #568


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/909cee28
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/909cee28
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/909cee28

Branch: refs/heads/master
Commit: 909cee281292be7408b1b9f0c25cfbd6cf185b4a
Parents: f36eb54
Author: hongsibao <ho...@huawei.com>
Authored: Sat Apr 4 10:28:59 2015 +0800
Committer: mbalassi <mb...@apache.org>
Committed: Sun Apr 5 10:51:20 2015 +0200

----------------------------------------------------------------------
 .../apache/flink/runtime/memorymanager/DefaultMemoryManager.java   | 2 ++
 1 file changed, 2 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/909cee28/flink-runtime/src/main/java/org/apache/flink/runtime/memorymanager/DefaultMemoryManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/memorymanager/DefaultMemoryManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/memorymanager/DefaultMemoryManager.java
index 5f84b23..cd677ac 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/memorymanager/DefaultMemoryManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/memorymanager/DefaultMemoryManager.java
@@ -84,6 +84,7 @@ public class DefaultMemoryManager implements MemoryManager {
 	 * Creates a memory manager with the given capacity, using the default page size.
 	 * 
 	 * @param memorySize The total size of the memory to be managed by this memory manager.
+	 * @param numberOfSlots The number of slots of the task manager.
 	 */
 	public DefaultMemoryManager(long memorySize, int numberOfSlots) {
 		this(memorySize, numberOfSlots, DEFAULT_PAGE_SIZE);
@@ -93,6 +94,7 @@ public class DefaultMemoryManager implements MemoryManager {
 	 * Creates a memory manager with the given capacity and given page size.
 	 * 
 	 * @param memorySize The total size of the memory to be managed by this memory manager.
+	 * @param numberOfSlots The number of slots of the task manager.
 	 * @param pageSize The size of the pages handed out by the memory manager.
 	 */
 	public DefaultMemoryManager(long memorySize, int numberOfSlots, int pageSize) {


[2/3] flink git commit: [FLINK-1824] [streaming] Support added for missing types in DataStream api

Posted by mb...@apache.org.
[FLINK-1824] [streaming] Support added for missing types in DataStream api

Closes #567


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/1cf49e90
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/1cf49e90
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/1cf49e90

Branch: refs/heads/master
Commit: 1cf49e90cddea610b4cfa3623a53ced66aadfc2d
Parents: 909cee2
Author: Gyula Fora <gy...@apache.org>
Authored: Fri Apr 3 20:03:58 2015 +0200
Committer: mbalassi <mb...@apache.org>
Committed: Sun Apr 5 10:51:20 2015 +0200

----------------------------------------------------------------------
 .../flink/api/java/typeutils/TypeExtractor.java |   2 +-
 .../apache/flink/streaming/api/StreamGraph.java |  17 +-
 .../api/datastream/ConnectedDataStream.java     |  30 +--
 .../streaming/api/datastream/DataStream.java    |  90 ++++++---
 .../api/datastream/GroupedDataStream.java       |  28 +--
 .../datastream/SingleOutputStreamOperator.java  |   3 +-
 .../flink/streaming/api/TypeFillTest.java       | 181 +++++++++++++++++++
 7 files changed, 295 insertions(+), 56 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/1cf49e90/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
index ae6063a..0fec67f 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
@@ -272,7 +272,7 @@ public class TypeExtractor {
 	}
 	
 	@SuppressWarnings("unchecked")
-	private static <IN1, IN2, OUT> TypeInformation<OUT> getBinaryOperatorReturnType(Function function, Class<?> baseClass,
+	public static <IN1, IN2, OUT> TypeInformation<OUT> getBinaryOperatorReturnType(Function function, Class<?> baseClass,
 			boolean hasIterables, boolean hasCollector, TypeInformation<IN1> in1Type, TypeInformation<IN2> in2Type,
 			String functionName, boolean allowMissing)
 	{

http://git-wip-us.apache.org/repos/asf/flink/blob/1cf49e90/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamGraph.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamGraph.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamGraph.java
index 970ce49..351dec9 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamGraph.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamGraph.java
@@ -34,12 +34,13 @@ import java.util.Set;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.io.InputFormat;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.MissingTypeInfo;
 import org.apache.flink.optimizer.plan.StreamingPlan;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.streaming.api.collector.selector.OutputSelector;
-import org.apache.flink.streaming.api.collector.selector.OutputSelectorWrapperFactory;
 import org.apache.flink.streaming.api.collector.selector.OutputSelectorWrapper;
+import org.apache.flink.streaming.api.collector.selector.OutputSelectorWrapperFactory;
 import org.apache.flink.streaming.api.invokable.StreamInvokable;
 import org.apache.flink.streaming.api.invokable.operator.co.CoInvokable;
 import org.apache.flink.streaming.api.streamrecord.StreamRecordSerializer;
@@ -169,7 +170,9 @@ public class StreamGraph extends StreamingPlan {
 
 		StreamRecordSerializer<IN> inSerializer = inTypeInfo != null ? new StreamRecordSerializer<IN>(
 				inTypeInfo, executionConfig) : null;
-		StreamRecordSerializer<OUT> outSerializer = outTypeInfo != null ? new StreamRecordSerializer<OUT>(
+
+		StreamRecordSerializer<OUT> outSerializer = (outTypeInfo != null)
+				&& !(outTypeInfo instanceof MissingTypeInfo) ? new StreamRecordSerializer<OUT>(
 				outTypeInfo, executionConfig) : null;
 
 		addTypeSerializers(vertexID, inSerializer, null, outSerializer, null);
@@ -215,7 +218,8 @@ public class StreamGraph extends StreamingPlan {
 		setSerializersFrom(iterationHead, vertexID);
 
 		int outpartitionerIndexToCopy = edges.getInEdgeIndices(iterationHead).get(0);
-		StreamPartitioner<?> outputPartitioner = edges.getOutEdges(outpartitionerIndexToCopy).get(0).getPartitioner();
+		StreamPartitioner<?> outputPartitioner = edges.getOutEdges(outpartitionerIndexToCopy)
+				.get(0).getPartitioner();
 
 		setEdge(vertexID, iterationHead, outputPartitioner, 0, new ArrayList<String>());
 
@@ -273,9 +277,12 @@ public class StreamGraph extends StreamingPlan {
 
 		addVertex(vertexID, CoStreamVertex.class, taskInvokableObject, operatorName, parallelism);
 
+		StreamRecordSerializer<OUT> outSerializer = (outTypeInfo != null)
+				&& !(outTypeInfo instanceof MissingTypeInfo) ? new StreamRecordSerializer<OUT>(
+				outTypeInfo, executionConfig) : null;
+
 		addTypeSerializers(vertexID, new StreamRecordSerializer<IN1>(in1TypeInfo, executionConfig),
-				new StreamRecordSerializer<IN2>(in2TypeInfo, executionConfig),
-				new StreamRecordSerializer<OUT>(outTypeInfo, executionConfig), null);
+				new StreamRecordSerializer<IN2>(in2TypeInfo, executionConfig), outSerializer, null);
 
 		if (LOG.isDebugEnabled()) {
 			LOG.debug("CO-TASK: {}", vertexID);

http://git-wip-us.apache.org/repos/asf/flink/blob/1cf49e90/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedDataStream.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedDataStream.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedDataStream.java
index a9ae77a..e867359 100755
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedDataStream.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedDataStream.java
@@ -20,6 +20,7 @@ package org.apache.flink.streaming.api.datastream;
 import org.apache.flink.api.common.functions.RichFlatMapFunction;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.ClosureCleaner;
+import org.apache.flink.api.java.Utils;
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
 import org.apache.flink.streaming.api.StreamGraph;
@@ -241,8 +242,10 @@ public class ConnectedDataStream<IN1, IN2> {
 	 * @return The transformed {@link DataStream}
 	 */
 	public <OUT> SingleOutputStreamOperator<OUT, ?> map(CoMapFunction<IN1, IN2, OUT> coMapper) {
-		TypeInformation<OUT> outTypeInfo = TypeExtractor.createTypeInfo(CoMapFunction.class,
-				coMapper.getClass(), 2, null, null);
+
+		TypeInformation<OUT> outTypeInfo = TypeExtractor.getBinaryOperatorReturnType(coMapper,
+				CoMapFunction.class, false, true, getInputType1(), getInputType2(),
+				Utils.getCallLocationName(), true);
 
 		return addCoFunction("Co-Map", outTypeInfo, new CoMapInvokable<IN1, IN2, OUT>(
 				clean(coMapper)));
@@ -266,8 +269,10 @@ public class ConnectedDataStream<IN1, IN2> {
 	 */
 	public <OUT> SingleOutputStreamOperator<OUT, ?> flatMap(
 			CoFlatMapFunction<IN1, IN2, OUT> coFlatMapper) {
-		TypeInformation<OUT> outTypeInfo = TypeExtractor.createTypeInfo(CoFlatMapFunction.class,
-				coFlatMapper.getClass(), 2, null, null);
+
+		TypeInformation<OUT> outTypeInfo = TypeExtractor.getBinaryOperatorReturnType(coFlatMapper,
+				CoFlatMapFunction.class, false, true, getInputType1(), getInputType2(),
+				Utils.getCallLocationName(), true);
 
 		return addCoFunction("Co-Flat Map", outTypeInfo, new CoFlatMapInvokable<IN1, IN2, OUT>(
 				clean(coFlatMapper)));
@@ -291,8 +296,9 @@ public class ConnectedDataStream<IN1, IN2> {
 	 */
 	public <OUT> SingleOutputStreamOperator<OUT, ?> reduce(CoReduceFunction<IN1, IN2, OUT> coReducer) {
 
-		TypeInformation<OUT> outTypeInfo = TypeExtractor.createTypeInfo(CoReduceFunction.class,
-				coReducer.getClass(), 2, null, null);
+		TypeInformation<OUT> outTypeInfo = TypeExtractor.getBinaryOperatorReturnType(coReducer,
+				CoReduceFunction.class, false, true, getInputType1(), getInputType2(),
+				Utils.getCallLocationName(), true);
 
 		return addCoFunction("Co-Reduce", outTypeInfo, getReduceInvokable(clean(coReducer)));
 
@@ -357,9 +363,10 @@ public class ConnectedDataStream<IN1, IN2> {
 		if (slideInterval < 1) {
 			throw new IllegalArgumentException("Slide interval must be positive");
 		}
-
-		TypeInformation<OUT> outTypeInfo = TypeExtractor.createTypeInfo(CoWindowFunction.class,
-				coWindowFunction.getClass(), 2, null, null);
+		
+		TypeInformation<OUT> outTypeInfo = TypeExtractor.getBinaryOperatorReturnType(coWindowFunction,
+				CoWindowFunction.class, false, true, getInputType1(), getInputType2(),
+				Utils.getCallLocationName(), true);
 
 		return addCoFunction("Co-Window", outTypeInfo, new CoWindowInvokable<IN1, IN2, OUT>(
 				clean(coWindowFunction), windowSize, slideInterval, timestamp1, timestamp2));
@@ -402,9 +409,8 @@ public class ConnectedDataStream<IN1, IN2> {
 		SingleOutputStreamOperator<OUT, ?> returnStream = new SingleOutputStreamOperator(
 				environment, functionName, outTypeInfo, functionInvokable);
 
-		dataStream1.streamGraph.addCoTask(returnStream.getId(), functionInvokable,
-				getInputType1(), getInputType2(), outTypeInfo, functionName,
-				environment.getParallelism());
+		dataStream1.streamGraph.addCoTask(returnStream.getId(), functionInvokable, getInputType1(),
+				getInputType2(), outTypeInfo, functionName, environment.getParallelism());
 
 		dataStream1.connectGraph(dataStream1, returnStream.getId(), 1);
 		dataStream1.connectGraph(dataStream2, returnStream.getId(), 2);

http://git-wip-us.apache.org/repos/asf/flink/blob/1cf49e90/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
index cab6271..27c831f 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
@@ -25,6 +25,7 @@ import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.functions.FilterFunction;
 import org.apache.flink.api.common.functions.FlatMapFunction;
 import org.apache.flink.api.common.functions.FoldFunction;
+import org.apache.flink.api.common.functions.InvalidTypesException;
 import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.common.functions.ReduceFunction;
 import org.apache.flink.api.common.functions.RichFilterFunction;
@@ -38,11 +39,13 @@ import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.ClosureCleaner;
+import org.apache.flink.api.java.Utils;
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.api.java.io.CsvOutputFormat;
 import org.apache.flink.api.java.io.TextOutputFormat;
 import org.apache.flink.api.java.operators.Keys;
 import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.api.java.typeutils.MissingTypeInfo;
 import org.apache.flink.api.java.typeutils.TupleTypeInfo;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
 import org.apache.flink.core.fs.FileSystem.WriteMode;
@@ -102,7 +105,6 @@ public class DataStream<OUT> {
 	protected static Integer counter = 0;
 	protected final StreamExecutionEnvironment environment;
 	protected final Integer id;
-	protected final String type;
 	protected int parallelism;
 	protected List<String> userDefinedNames;
 	protected StreamPartitioner<OUT> partitioner;
@@ -111,6 +113,7 @@ public class DataStream<OUT> {
 	protected List<DataStream<OUT>> mergedStreams;
 
 	protected final StreamGraph streamGraph;
+	private boolean typeUsed;
 
 	/**
 	 * Create a new {@link DataStream} in the given execution environment with
@@ -131,7 +134,6 @@ public class DataStream<OUT> {
 
 		counter++;
 		this.id = counter;
-		this.type = operatorType;
 		this.environment = environment;
 		this.parallelism = environment.getParallelism();
 		this.streamGraph = environment.getStreamGraph();
@@ -151,7 +153,6 @@ public class DataStream<OUT> {
 	public DataStream(DataStream<OUT> dataStream) {
 		this.environment = dataStream.environment;
 		this.id = dataStream.id;
-		this.type = dataStream.type;
 		this.parallelism = dataStream.parallelism;
 		this.userDefinedNames = new ArrayList<String>(dataStream.userDefinedNames);
 		this.partitioner = dataStream.partitioner;
@@ -192,9 +193,45 @@ public class DataStream<OUT> {
 	 */
 	@SuppressWarnings("unchecked")
 	public TypeInformation<OUT> getType() {
+		if (typeInfo instanceof MissingTypeInfo) {
+			MissingTypeInfo typeInfo = (MissingTypeInfo) this.typeInfo;
+			throw new InvalidTypesException(
+					"The return type of function '"
+							+ typeInfo.getFunctionName()
+							+ "' could not be determined automatically, due to type erasure. "
+							+ "You can give type information hints by using the returns(...) method on the result of "
+							+ "the transformation call, or by letting your function implement the 'ResultTypeQueryable' "
+							+ "interface.", typeInfo.getTypeException());
+		}
+		typeUsed = true;
 		return this.typeInfo;
 	}
 
+	/**
+	 * Tries to fill in the type information. Type information can be filled in
+	 * later when the program uses a type hint. This method checks whether the
+	 * type information has ever been accessed before and does not allow
+	 * modifications if the type was accessed already. This ensures consistency
+	 * by making sure different parts of the operation do not assume different
+	 * type information.
+	 * 
+	 * @param typeInfo
+	 *            The type information to fill in.
+	 * 
+	 * @throws IllegalStateException
+	 *             Thrown, if the type information has been accessed before.
+	 */
+	protected void fillInType(TypeInformation<OUT> typeInfo) {
+		if (typeUsed) {
+			throw new IllegalStateException(
+					"TypeInformation cannot be filled in for the type after it has been used. "
+							+ "Please make sure that the type info hints are the first call after the transformation function, "
+							+ "before any access to types or semantic properties, etc.");
+		}
+		streamGraph.setOutType(id, typeInfo);
+		this.typeInfo = typeInfo;
+	}
+
 	public <F> F clean(F f) {
 		if (getExecutionEnvironment().getConfig().isClosureCleanerEnabled()) {
 			ClosureCleaner.clean(f, true);
@@ -234,8 +271,8 @@ public class DataStream<OUT> {
 
 	/**
 	 * Operator used for directing tuples to specific named outputs using an
-	 * {@link org.apache.flink.streaming.api.collector.selector.OutputSelector}. Calling
-	 * this method on an operator creates a new {@link SplitDataStream}.
+	 * {@link org.apache.flink.streaming.api.collector.selector.OutputSelector}.
+	 * Calling this method on an operator creates a new {@link SplitDataStream}.
 	 * 
 	 * @param outputSelector
 	 *            The user defined
@@ -471,7 +508,8 @@ public class DataStream<OUT> {
 	 */
 	public <R> SingleOutputStreamOperator<R, ?> map(MapFunction<OUT, R> mapper) {
 
-		TypeInformation<R> outType = TypeExtractor.getMapReturnTypes(clean(mapper), getType());
+		TypeInformation<R> outType = TypeExtractor.getMapReturnTypes(clean(mapper), getType(),
+				Utils.getCallLocationName(), true);
 
 		return transform("Map", outType, new MapInvokable<OUT, R>(clean(mapper)));
 	}
@@ -495,7 +533,7 @@ public class DataStream<OUT> {
 	public <R> SingleOutputStreamOperator<R, ?> flatMap(FlatMapFunction<OUT, R> flatMapper) {
 
 		TypeInformation<R> outType = TypeExtractor.getFlatMapReturnTypes(clean(flatMapper),
-				getType());
+				getType(), Utils.getCallLocationName(), true);
 
 		return transform("Flat Map", outType, new FlatMapInvokable<OUT, R>(clean(flatMapper)));
 
@@ -521,20 +559,22 @@ public class DataStream<OUT> {
 
 	/**
 	 * Applies a fold transformation on the data stream. The returned stream
-	 * contains all the intermediate values of the fold transformation. The
-	 * user can also extend the {@link RichFoldFunction} to gain access to
-	 * other features provided by the {@link org.apache.flink.api.common.functions.RichFunction}
-	 * interface
-	 *
+	 * contains all the intermediate values of the fold transformation. The user
+	 * can also extend the {@link RichFoldFunction} to gain access to other
+	 * features provided by the
+	 * {@link org.apache.flink.api.common.functions.RichFunction} interface.
+	 * 
 	 * @param folder
-	 *          The {@link FoldFunction} that will be called for every element
-	 *          of the input values.
+	 *            The {@link FoldFunction} that will be called for every element
+	 *            of the input values.
 	 * @return The transformed DataStream
 	 */
 	public <R> SingleOutputStreamOperator<R, ?> fold(R initialValue, FoldFunction<OUT, R> folder) {
-		TypeInformation<R> outType = TypeExtractor.getFoldReturnTypes(clean(folder), getType());
+		TypeInformation<R> outType = TypeExtractor.getFoldReturnTypes(clean(folder), getType(),
+				Utils.getCallLocationName(), false);
 
-		return transform("Fold", outType, new StreamFoldInvokable<OUT, R>(clean(folder), initialValue, outType));
+		return transform("Fold", outType, new StreamFoldInvokable<OUT, R>(clean(folder),
+				initialValue, outType));
 	}
 
 	/**
@@ -1111,15 +1151,19 @@ public class DataStream<OUT> {
 	}
 
 	/**
-	 * Writes the DataStream to a socket as a byte array. The format of the output is
-	 * specified by a {@link SerializationSchema}.
-	 *
-	 * @param hostName host of the socket
-	 * @param port port of the socket
-	 * @param schema schema for serialization
+	 * Writes the DataStream to a socket as a byte array. The format of the
+	 * output is specified by a {@link SerializationSchema}.
+	 * 
+	 * @param hostName
+	 *            host of the socket
+	 * @param port
+	 *            port of the socket
+	 * @param schema
+	 *            schema for serialization
 	 * @return the closed DataStream
 	 */
-	public DataStreamSink<OUT> writeToSocket(String hostName, int port, SerializationSchema<OUT, byte[]> schema){
+	public DataStreamSink<OUT> writeToSocket(String hostName, int port,
+			SerializationSchema<OUT, byte[]> schema) {
 		DataStreamSink<OUT> returnStream = addSink(new SocketClientSink<OUT>(hostName, port, schema));
 		return returnStream;
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/1cf49e90/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/GroupedDataStream.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/GroupedDataStream.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/GroupedDataStream.java
index b565931..034425d 100755
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/GroupedDataStream.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/GroupedDataStream.java
@@ -21,6 +21,7 @@ import org.apache.flink.api.common.functions.FoldFunction;
 import org.apache.flink.api.common.functions.ReduceFunction;
 import org.apache.flink.api.common.functions.RichReduceFunction;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.Utils;
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
 import org.apache.flink.streaming.api.function.aggregation.AggregationFunction;
@@ -70,21 +71,21 @@ public class GroupedDataStream<OUT> extends DataStream<OUT> {
 	 */
 	@Override
 	public SingleOutputStreamOperator<OUT, ?> reduce(ReduceFunction<OUT> reducer) {
-		return transform("Grouped Reduce", getType(), new GroupedReduceInvokable<OUT>(clean(reducer),
-				keySelector));
+		return transform("Grouped Reduce", getType(), new GroupedReduceInvokable<OUT>(
+				clean(reducer), keySelector));
 	}
 
 	/**
 	 * Applies a fold transformation on the grouped data stream grouped on by
 	 * the given key position. The {@link FoldFunction} will receive input
 	 * values based on the key value. Only input values with the same key will
-	 * go to the same folder.The user can also extend
-	 * {@link RichFoldFunction} to gain access to other features provided by
-	 * the {@link RichFuntion} interface.
-	 *
+	 * go to the same folder. The user can also extend {@link RichFoldFunction}
+	 * to gain access to other features provided by the {@link RichFunction}
+	 * interface.
+	 * 
 	 * @param folder
-	 *            The {@link FoldFunction} that will be called for every
-	 *            element of the input values with the same key.
+	 *            The {@link FoldFunction} that will be called for every element
+	 *            of the input values with the same key.
 	 * @param initialValue
 	 *            The initialValue passed to the folders for each key.
 	 * @return The transformed DataStream.
@@ -93,10 +94,11 @@ public class GroupedDataStream<OUT> extends DataStream<OUT> {
 	@Override
 	public <R> SingleOutputStreamOperator<R, ?> fold(R initialValue, FoldFunction<OUT, R> folder) {
 
-		TypeInformation<R> outType = TypeExtractor.getFoldReturnTypes(clean(folder), getType());
+		TypeInformation<R> outType = TypeExtractor.getFoldReturnTypes(clean(folder), getType(),
+				Utils.getCallLocationName(), false);
 
-		return transform("Grouped Fold", outType, new GroupedFoldInvokable<OUT, R>(clean(folder), keySelector,
-				initialValue, outType));
+		return transform("Grouped Fold", outType, new GroupedFoldInvokable<OUT, R>(clean(folder),
+				keySelector, initialValue, outType));
 	}
 
 	/**
@@ -215,8 +217,8 @@ public class GroupedDataStream<OUT> extends DataStream<OUT> {
 		GroupedReduceInvokable<OUT> invokable = new GroupedReduceInvokable<OUT>(clean(aggregate),
 				keySelector);
 
-		SingleOutputStreamOperator<OUT, ?> returnStream = transform("Grouped Aggregation", getType(),
-				invokable);
+		SingleOutputStreamOperator<OUT, ?> returnStream = transform("Grouped Aggregation",
+				getType(), invokable);
 
 		return returnStream;
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/1cf49e90/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java
index 0a3b29f..7b6a367 100755
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java
@@ -194,8 +194,7 @@ public class SingleOutputStreamOperator<OUT, O extends SingleOutputStreamOperato
 		if (typeInfo == null) {
 			throw new IllegalArgumentException("Type information must not be null.");
 		}
-		streamGraph.setOutType(id, typeInfo);
-		this.typeInfo = typeInfo;
+		fillInType(typeInfo);
 		@SuppressWarnings("unchecked")
 		O returnType = (O) this;
 		return returnType;

http://git-wip-us.apache.org/repos/asf/flink/blob/1cf49e90/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/TypeFillTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/TypeFillTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/TypeFillTest.java
new file mode 100644
index 0000000..9a19dde
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/TypeFillTest.java
@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+import java.util.List;
+
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.function.co.CoFlatMapFunction;
+import org.apache.flink.streaming.api.function.co.CoMapFunction;
+import org.apache.flink.streaming.api.function.co.CoReduceFunction;
+import org.apache.flink.streaming.api.function.co.CoWindowFunction;
+import org.apache.flink.util.Collector;
+import org.junit.Test;
+
+@SuppressWarnings("serial")
+public class TypeFillTest {
+	// One scenario: chains built from the generic stubs below must throw unless returns(...) is called on them.
+	@Test
+	public void test() {
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		// Long source shared by every chain below.
+		DataStream<Long> source = env.generateSequence(1, 10);
+		// Part 1: no returns(...) call. Each chain must throw (presumably the output type cannot be inferred - confirm).
+		try {
+			source.map(new TestMap<Long, Long>()).print();
+			fail();
+		} catch (Exception e) { // expected; the AssertionError raised by fail() is an Error, so it is not caught here
+		}
+		try {
+			source.flatMap(new TestFlatMap<Long, Long>()).print();
+			fail();
+		} catch (Exception e) { // expected
+		}
+		try {
+			source.connect(source).map(new TestCoMap<Long, Long, Integer>()).print();
+			fail();
+		} catch (Exception e) { // expected
+		}
+		try {
+			source.connect(source).flatMap(new TestCoFlatMap<Long, Long, Integer>()).print();
+			fail();
+		} catch (Exception e) { // expected
+		}
+		try {
+			source.connect(source).reduce(new TestCoReduce<Long, Long, Integer>()).print();
+			fail();
+		} catch (Exception e) { // expected
+		}
+		try {
+			source.connect(source).windowReduce(new TestCoWindow<Long, Long, String>(), 10, 100)
+					.print();
+			fail();
+		} catch (Exception e) { // expected
+		}
+		// Part 2: same chains with returns(...) given a Class, a type-name String or a BasicTypeInfo; no try, so any throw fails the test.
+		source.map(new TestMap<Long, Long>()).returns(Long.class).print();
+		source.flatMap(new TestFlatMap<Long, Long>()).returns("Long").print();
+		source.connect(source).map(new TestCoMap<Long, Long, Integer>()).returns("Integer").print();
+		source.connect(source).flatMap(new TestCoFlatMap<Long, Long, Integer>())
+				.returns(BasicTypeInfo.INT_TYPE_INFO).print();
+		source.connect(source).reduce(new TestCoReduce<Long, Long, Integer>())
+				.returns(Integer.class).print();
+		source.connect(source).windowReduce(new TestCoWindow<Long, Long, String>(), 10, 100)
+				.returns("String").print();
+		// After returns(Long.class), getType() must equal BasicTypeInfo.LONG_TYPE_INFO.
+		assertEquals(BasicTypeInfo.LONG_TYPE_INFO,
+				source.map(new TestMap<Long, Long>()).returns(Long.class).getType());
+		// Part 3: anonymous MapFunction with concrete type arguments; built and printed outside any try, so it must not throw.
+		SingleOutputStreamOperator<String, ?> map = source.map(new MapFunction<Long, String>() {
+
+			@Override
+			public String map(Long value) throws Exception {
+				return null;
+			}
+		});
+		// A returns("String") call made after print() must throw (presumably the type is already fixed by then - confirm).
+		map.print();
+		try {
+			map.returns("String");
+			fail();
+		} catch (Exception e) { // expected
+		}
+
+	}
+	// Generic MapFunction stub: map always returns null.
+	private class TestMap<T, O> implements MapFunction<T, O> {
+		@Override
+		public O map(T value) throws Exception {
+			return null;
+		}
+	}
+	// Generic FlatMapFunction stub: flatMap has an empty body and emits nothing.
+	private class TestFlatMap<T, O> implements FlatMapFunction<T, O> {
+		@Override
+		public void flatMap(T value, Collector<O> out) throws Exception {
+		}
+	}
+	// Generic CoMapFunction stub: map1 and map2 both return null.
+	private class TestCoMap<IN1, IN2, OUT> implements CoMapFunction<IN1, IN2, OUT> {
+
+		@Override
+		public OUT map1(IN1 value) {
+			return null;
+		}
+
+		@Override
+		public OUT map2(IN2 value) {
+			return null;
+		}
+
+	}
+	// Generic CoFlatMapFunction stub: flatMap1 and flatMap2 have empty bodies and emit nothing.
+	private class TestCoFlatMap<IN1, IN2, OUT> implements CoFlatMapFunction<IN1, IN2, OUT> {
+
+		@Override
+		public void flatMap1(IN1 value, Collector<OUT> out) throws Exception {
+		}
+
+		@Override
+		public void flatMap2(IN2 value, Collector<OUT> out) throws Exception {
+		}
+
+	}
+	// Generic CoReduceFunction stub: reduce1, reduce2, map1 and map2 all return null.
+	private class TestCoReduce<IN1, IN2, OUT> implements CoReduceFunction<IN1, IN2, OUT> {
+
+		@Override
+		public IN1 reduce1(IN1 value1, IN1 value2) {
+			return null;
+		}
+
+		@Override
+		public IN2 reduce2(IN2 value1, IN2 value2) {
+			return null;
+		}
+
+		@Override
+		public OUT map1(IN1 value) {
+			return null;
+		}
+
+		@Override
+		public OUT map2(IN2 value) {
+			return null;
+		}
+
+	}
+	// Generic CoWindowFunction stub: coWindow has an empty body and emits nothing.
+	private class TestCoWindow<IN1, IN2, OUT> implements CoWindowFunction<IN1, IN2, OUT> {
+
+		@Override
+		public void coWindow(List<IN1> first, List<IN2> second, Collector<OUT> out)
+				throws Exception {
+		}
+
+	}
+
+}