Posted to commits@flink.apache.org by mb...@apache.org on 2014/09/20 15:10:45 UTC

[02/18] git commit: [streaming] Added TypeInfo to DataStream

[streaming] Added TypeInfo to DataStream


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/4d73f51c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/4d73f51c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/4d73f51c

Branch: refs/heads/master
Commit: 4d73f51c5acdabac98557fd53933d4b7ad42f98a
Parents: 0c8f1da
Author: mbalassi <ba...@gmail.com>
Authored: Fri Sep 5 14:41:53 2014 +0200
Committer: mbalassi <ba...@gmail.com>
Committed: Sat Sep 20 13:42:04 2014 +0200

----------------------------------------------------------------------
 .../api/datastream/ConnectedDataStream.java     |  21 +-
 .../streaming/api/datastream/DataStream.java    |  25 +-
 .../api/datastream/DataStreamSink.java          |   5 +-
 .../api/datastream/DataStreamSource.java        |   5 +-
 .../api/datastream/GroupedDataStream.java       |  26 +-
 .../api/datastream/IterativeDataStream.java     |   2 +-
 .../api/datastream/MergedDataStream.java        |   6 +-
 .../datastream/SingleOutputStreamOperator.java  | 292 ++++++++++---------
 .../api/datastream/SplitDataStream.java         |  14 +-
 .../environment/StreamExecutionEnvironment.java |  30 +-
 .../api/collector/DirectedOutputTest.java       |  19 +-
 .../api/invokable/operator/CoFlatMapTest.java   |  11 +-
 12 files changed, 270 insertions(+), 186 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4d73f51c/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedDataStream.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedDataStream.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedDataStream.java
index 4f17ceb..e00919f 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedDataStream.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedDataStream.java
@@ -35,6 +35,7 @@ import org.apache.flink.streaming.api.invokable.operator.co.CoMapInvokable;
 import org.apache.flink.streaming.api.invokable.operator.co.CoReduceInvokable;
 import org.apache.flink.streaming.util.serialization.FunctionTypeWrapper;
 import org.apache.flink.streaming.util.serialization.TypeSerializerWrapper;
+import org.apache.flink.types.TypeInformation;
 
 /**
  * The ConnectedDataStream represents a stream for two different data types. It
@@ -58,7 +59,7 @@ public class ConnectedDataStream<IN1, IN2> {
 		this.jobGraphBuilder = jobGraphBuilder;
 		this.environment = environment;
 		this.input1 = input1.copy();
-		this.input2 = input2.copy();
+		this.input2 = input2.copy();		
 	}
 
 	/**
@@ -80,6 +81,22 @@ public class ConnectedDataStream<IN1, IN2> {
 	}
 
 	/**
+	 * Gets the type of the first input
+	 * @return The type of the first input
+	 */
+	public TypeInformation<IN1> getInputType1() {
+		return input1.getOutputType();
+	}
+	
+	/**
+	 * Gets the type of the second input
+	 * @return The type of the second input
+	 */
+	public TypeInformation<IN2> getInputType2() {
+		return input2.getOutputType();
+	}
+	
+	/**
 	 * GroupBy operation for connected data stream. Groups the elements of
 	 * input1 and input2 according to keyPosition1 and keyPosition2. Used for
 	 * applying function on grouped data streams for example
@@ -189,7 +206,7 @@ public class ConnectedDataStream<IN1, IN2> {
 
 		@SuppressWarnings({ "unchecked", "rawtypes" })
 		SingleOutputStreamOperator<OUT, ?> returnStream = new SingleOutputStreamOperator(
-				environment, functionName);
+				environment, functionName, outTypeWrapper);
 
 		try {
 			input1.jobGraphBuilder.addCoTask(returnStream.getId(), functionInvokable,

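The two new accessors above expose the TypeInformation of each input side of a ConnectedDataStream. A minimal sketch of how they might be used, assuming a local environment and that connect() pairs two streams into a ConnectedDataStream as in CoFlatMapTest further down (class and variable names are illustrative only):

    import org.apache.flink.streaming.api.datastream.ConnectedDataStream;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.types.TypeInformation;

    public class ConnectedTypeInfoSketch {
        public static void main(String[] args) {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1);

            DataStream<Integer> first = env.fromElements(1, 2, 3);
            DataStream<Integer> second = env.fromElements(4, 5);

            // connect() keeps both inputs in a single ConnectedDataStream
            ConnectedDataStream<Integer, Integer> connected = first.connect(second);

            // The new accessors return the TypeInformation of each input side.
            TypeInformation<Integer> in1 = connected.getInputType1();
            TypeInformation<Integer> in2 = connected.getInputType2();
            System.out.println(in1 + " / " + in2);
        }
    }
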
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4d73f51c/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
index 9375762..28d07d6 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
@@ -89,6 +89,7 @@ public abstract class DataStream<OUT> {
 	protected List<String> userDefinedNames;
 	protected boolean selectAll;
 	protected StreamPartitioner<OUT> partitioner;
+	protected TypeSerializerWrapper<OUT> outTypeWrapper;
 
 	protected final JobGraphBuilder jobGraphBuilder;
 
@@ -100,8 +101,11 @@ public abstract class DataStream<OUT> {
 	 *            StreamExecutionEnvironment
 	 * @param operatorType
 	 *            The type of the operator in the component
+	 * @param outTypeWrapper
+	 *            Type of the output
 	 */
-	public DataStream(StreamExecutionEnvironment environment, String operatorType) {
+	public DataStream(StreamExecutionEnvironment environment, String operatorType,
+			TypeSerializerWrapper<OUT> outTypeWrapper) {
 		if (environment == null) {
 			throw new NullPointerException("context is null");
 		}
@@ -114,7 +118,7 @@ public abstract class DataStream<OUT> {
 		this.userDefinedNames = new ArrayList<String>();
 		this.selectAll = false;
 		this.partitioner = new ForwardPartitioner<OUT>();
-
+		this.outTypeWrapper = outTypeWrapper;
 	}
 
 	/**
@@ -131,7 +135,7 @@ public abstract class DataStream<OUT> {
 		this.selectAll = dataStream.selectAll;
 		this.partitioner = dataStream.partitioner;
 		this.jobGraphBuilder = dataStream.jobGraphBuilder;
-
+		this.outTypeWrapper = dataStream.outTypeWrapper;
 	}
 
 	/**
@@ -160,6 +164,15 @@ public abstract class DataStream<OUT> {
 	}
 
 	/**
+	 * Gets the output type.
+	 * 
+	 * @return The output type.
+	 */
+	public TypeInformation<OUT> getOutputType() {
+		return this.outTypeWrapper.getTypeInfo();
+	}
+	
+	/**
 	 * Creates a new {@link MergedDataStream} by merging {@link DataStream}
 	 * outputs of the same type with each other. The DataStreams merged using
 	 * this operator will be transformed simultaneously.
@@ -890,7 +903,7 @@ public abstract class DataStream<OUT> {
 
 	protected <R> DataStream<OUT> addIterationSource(String iterationID, long waitTime) {
 
-		DataStream<R> returnStream = new DataStreamSource<R>(environment, "iterationSource");
+		DataStream<R> returnStream = new DataStreamSource<R>(environment, "iterationSource", null);
 
 		jobGraphBuilder.addIterationSource(returnStream.getId(), this.getId(), iterationID,
 				degreeOfParallelism, waitTime);
@@ -919,7 +932,7 @@ public abstract class DataStream<OUT> {
 		DataStream<OUT> inputStream = this.copy();
 		@SuppressWarnings({ "unchecked", "rawtypes" })
 		SingleOutputStreamOperator<R, ?> returnStream = new SingleOutputStreamOperator(environment,
-				functionName);
+				functionName, outTypeWrapper);
 
 		try {
 			jobGraphBuilder.addTask(returnStream.getId(), functionInvokable, inTypeWrapper,
@@ -1001,7 +1014,7 @@ public abstract class DataStream<OUT> {
 
 	private DataStreamSink<OUT> addSink(DataStream<OUT> inputStream,
 			SinkFunction<OUT> sinkFunction, TypeSerializerWrapper<OUT> typeWrapper) {
-		DataStreamSink<OUT> returnStream = new DataStreamSink<OUT>(environment, "sink");
+		DataStreamSink<OUT> returnStream = new DataStreamSink<OUT>(environment, "sink", outTypeWrapper);
 
 		try {
 			jobGraphBuilder.addSink(returnStream.getId(), new SinkInvokable<OUT>(sinkFunction),

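With the outTypeWrapper field threaded through the constructors, every DataStream can now report its output type via getOutputType(). A minimal sketch of reading it back from a source, using the same imports and local environment as the sketch after the ConnectedDataStream diff (variable names are illustrative only):

    StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1);

    // fromElements() wraps the first element in an ObjectTypeWrapper and hands it
    // to the DataStreamSource constructor, so the type is available right away.
    DataStream<String> words = env.fromElements("flink", "streaming");

    TypeInformation<String> outType = words.getOutputType();
    System.out.println("output type: " + outType);
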
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4d73f51c/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamSink.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamSink.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamSink.java
index 71e88d8..f51e0e6 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamSink.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamSink.java
@@ -18,6 +18,7 @@
 package org.apache.flink.streaming.api.datastream;
 
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.util.serialization.TypeSerializerWrapper;
 
 /**
  * Represents the end of a DataStream.
@@ -27,8 +28,8 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
  */
 public class DataStreamSink<IN> extends SingleOutputStreamOperator<IN, DataStreamSink<IN>> {
 
-	protected DataStreamSink(StreamExecutionEnvironment environment, String operatorType) {
-		super(environment, operatorType);
+	protected DataStreamSink(StreamExecutionEnvironment environment, String operatorType, TypeSerializerWrapper<IN> outTypeWrapper) {
+		super(environment, operatorType, outTypeWrapper);
 	}
 
 	protected DataStreamSink(DataStream<IN> dataStream) {

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4d73f51c/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamSource.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamSource.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamSource.java
index 2a05e27..29389ae 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamSource.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamSource.java
@@ -18,6 +18,7 @@
 package org.apache.flink.streaming.api.datastream;
 
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.util.serialization.TypeSerializerWrapper;
 
 /**
  * The DataStreamSource represents the starting point of a DataStream.
@@ -27,8 +28,8 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
  */
 public class DataStreamSource<OUT> extends SingleOutputStreamOperator<OUT, DataStreamSource<OUT>> {
 
-	public DataStreamSource(StreamExecutionEnvironment environment, String operatorType) {
-		super(environment, operatorType);
+	public DataStreamSource(StreamExecutionEnvironment environment, String operatorType, TypeSerializerWrapper<OUT> outTypeWrapper) {
+		super(environment, operatorType, outTypeWrapper);
 	}
 
 	public DataStreamSource(DataStream<OUT> dataStream) {

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4d73f51c/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/GroupedDataStream.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/GroupedDataStream.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/GroupedDataStream.java
index 94d6c8d..2e1ed57 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/GroupedDataStream.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/GroupedDataStream.java
@@ -31,8 +31,7 @@ import org.apache.flink.streaming.api.invokable.operator.WindowGroupReduceInvoka
 import org.apache.flink.streaming.api.invokable.util.DefaultTimestamp;
 import org.apache.flink.streaming.api.invokable.util.Timestamp;
 import org.apache.flink.streaming.util.serialization.FunctionTypeWrapper;
-
-//import org.apache.jasper.compiler.Node.ParamsAction;
+import org.apache.flink.types.TypeInformation;
 
 /**
  * A GroupedDataStream represents a data stream which has been partitioned by
@@ -53,12 +52,21 @@ public class GroupedDataStream<OUT> {
 	}
 
 	/**
-	 * Applies a reduce transformation on the grouped data stream grouped by the
-	 * given key position. The {@link ReduceFunction} will receive input values
-	 * based on the key value. Only input values with the same key will go to
-	 * the same reducer.The user can also extend {@link RichReduceFunction} to
-	 * gain access to other features provided by the {@link RichFuntion}
-	 * interface.
+	 * Gets the output type.
+	 * 
+	 * @return The output type.
+	 */
+	public TypeInformation<OUT> getOutputType() {
+		return dataStream.getOutputType();
+	}
+	
+	/**
+	 * Applies a reduce transformation on the grouped data stream grouped on by
+	 * the given key position. The {@link ReduceFunction} will receive input
+	 * values based on the key value. Only input values with the same key will
+	 * go to the same reducer.The user can also extend
+	 * {@link RichReduceFunction} to gain access to other features provided by
+	 * the {@link RichFuntion} interface.
 	 * 
 	 * @param reducer
 	 *            The {@link ReduceFunction} that will be called for every
@@ -70,7 +78,7 @@ public class GroupedDataStream<OUT> {
 				ReduceFunction.class, 0), new FunctionTypeWrapper<OUT>(reducer,
 				ReduceFunction.class, 0), new GroupReduceInvokable<OUT>(reducer, keyPosition));
 	}
-
+	
 	/**
 	 * Applies a group reduce transformation on preset chunks of the grouped
 	 * data stream. The {@link GroupReduceFunction} will receive input values

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4d73f51c/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/IterativeDataStream.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/IterativeDataStream.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/IterativeDataStream.java
index 16362ba..573dffd 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/IterativeDataStream.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/IterativeDataStream.java
@@ -80,7 +80,7 @@ public class IterativeDataStream<IN> extends
 	 * 
 	 */
 	public <R> DataStream<IN> closeWith(DataStream<IN> iterationTail, String iterationName) {
-		DataStream<R> returnStream = new DataStreamSink<R>(environment, "iterationSink");
+		DataStream<R> returnStream = new DataStreamSink<R>(environment, "iterationSink", null);
 
 		jobGraphBuilder.addIterationSink(returnStream.getId(), iterationTail.getId(),
 				iterationID.toString(), iterationTail.getParallelism(), waitTime);

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4d73f51c/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/MergedDataStream.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/MergedDataStream.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/MergedDataStream.java
index 045af4f..c1618b2 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/MergedDataStream.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/MergedDataStream.java
@@ -22,6 +22,7 @@ import java.util.List;
 
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.partitioner.StreamPartitioner;
+import org.apache.flink.streaming.util.serialization.TypeSerializerWrapper;
 
 /**
  * The MergedDataStream represents a DataStream which consists of merged outputs
@@ -35,8 +36,9 @@ public class MergedDataStream<OUT> extends DataStream<OUT> {
 
 	protected List<DataStream<OUT>> mergedStreams;
 
-	protected MergedDataStream(StreamExecutionEnvironment environment, String operatorType) {
-		super(environment, operatorType);
+	protected MergedDataStream(StreamExecutionEnvironment environment, String operatorType,
+			TypeSerializerWrapper<OUT> outTypeWrapper) {
+		super(environment, operatorType, outTypeWrapper);
 		this.mergedStreams = new ArrayList<DataStream<OUT>>();
 		this.mergedStreams.add(this);
 	}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4d73f51c/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java
index 1f01feb..b2fe551 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java
@@ -1,147 +1,149 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
  * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.datastream;
-
-import org.apache.commons.lang3.SerializationException;
-import org.apache.commons.lang3.SerializationUtils;
-import org.apache.flink.streaming.api.collector.OutputSelector;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-
-/**
- * The SingleOutputStreamOperator represents a user defined transformation
- * applied on a {@link DataStream} with one predefined output type.
- *
- * @param <OUT>
- *            Output type of the operator.
- * @param <O>
- *            Type of the operator.
- */
-public class SingleOutputStreamOperator<OUT, O extends SingleOutputStreamOperator<OUT, O>> extends
-		DataStream<OUT> {
-
-	protected SingleOutputStreamOperator(StreamExecutionEnvironment environment, String operatorType) {
-		super(environment, operatorType);
-		setBufferTimeout(environment.getBufferTimeout());
-	}
-
-	protected SingleOutputStreamOperator(DataStream<OUT> dataStream) {
-		super(dataStream);
-	}
-
-	/**
-	 * Sets the degree of parallelism for this operator. The degree must be 1 or
-	 * more.
-	 * 
-	 * @param dop
-	 *            The degree of parallelism for this operator.
-	 * @return The operator with set degree of parallelism.
-	 */
-	public SingleOutputStreamOperator<OUT, O> setParallelism(int dop) {
-		if (dop < 1) {
-			throw new IllegalArgumentException("The parallelism of an operator must be at least 1.");
-		}
-		this.degreeOfParallelism = dop;
-
-		jobGraphBuilder.setParallelism(id, degreeOfParallelism);
-
-		return this;
-	}
-
-	/**
-	 * Sets the mutability of the operator. If the operator is set to mutable,
-	 * the tuples received in the user defined functions, will be reused after
-	 * the function call. Setting an operator to mutable reduces garbage
-	 * collection overhead and thus increases scalability. Please note that if a
-	 * {@link DataStream#batchReduce} or {@link DataStream#windowReduce} is used
-	 * as mutable, the user can only iterate through the iterator once in every
-	 * invoke.
-	 * 
-	 * @param isMutable
-	 *            The mutability of the operator.
-	 * @return The operator with mutability set.
-	 */
-	public SingleOutputStreamOperator<OUT, O> setMutability(boolean isMutable) {
-		jobGraphBuilder.setMutability(id, isMutable);
-		return this;
-	}
-
-	/**
-	 * Sets the maximum time frequency (ms) for the flushing of the output
-	 * buffer. By default the output buffers flush only when they are full.
-	 * 
-	 * @param timeoutMillis
-	 *            The maximum time between two output flushes.
-	 * @return The operator with buffer timeout set.
-	 */
-	public SingleOutputStreamOperator<OUT, O> setBufferTimeout(long timeoutMillis) {
-		jobGraphBuilder.setBufferTimeout(id, timeoutMillis);
-		return this;
-	}
-
-	/**
-	 * Operator used for directing tuples to specific named outputs using an
-	 * {@link OutputSelector}. Calling this method on an operator creates a new
-	 * {@link SplitDataStream}.
-	 * 
-	 * @param outputSelector
-	 *            The user defined {@link OutputSelector} for directing the
-	 *            tuples.
-	 * @return The {@link SplitDataStream}
-	 */
-	public SplitDataStream<OUT> split(OutputSelector<OUT> outputSelector) {
-		try {
-			jobGraphBuilder.setOutputSelector(id, SerializationUtils.serialize(outputSelector));
-
-		} catch (SerializationException e) {
-			throw new RuntimeException("Cannot serialize OutputSelector");
-		}
-
-		return new SplitDataStream<OUT>(this);
-	}
-
-	@SuppressWarnings("unchecked")
-	public SingleOutputStreamOperator<OUT, O> partitionBy(int keyposition) {
-		return (SingleOutputStreamOperator<OUT, O>) super.partitionBy(keyposition);
-	}
-
-	@SuppressWarnings("unchecked")
-	public SingleOutputStreamOperator<OUT, O> broadcast() {
-		return (SingleOutputStreamOperator<OUT, O>) super.broadcast();
-	}
-
-	@SuppressWarnings("unchecked")
-	public SingleOutputStreamOperator<OUT, O> shuffle() {
-		return (SingleOutputStreamOperator<OUT, O>) super.shuffle();
-	}
-
-	@SuppressWarnings("unchecked")
-	public SingleOutputStreamOperator<OUT, O> forward() {
-		return (SingleOutputStreamOperator<OUT, O>) super.forward();
-	}
-
-	@SuppressWarnings("unchecked")
-	public SingleOutputStreamOperator<OUT, O> distribute() {
-		return (SingleOutputStreamOperator<OUT, O>) super.distribute();
-	}
-
-	@Override
-	protected SingleOutputStreamOperator<OUT, O> copy() {
-		return new SingleOutputStreamOperator<OUT, O>(this);
-	}
-
-}
+ */
+
+package org.apache.flink.streaming.api.datastream;
+
+import org.apache.commons.lang3.SerializationException;
+import org.apache.commons.lang3.SerializationUtils;
+import org.apache.flink.streaming.api.collector.OutputSelector;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.util.serialization.TypeSerializerWrapper;
+
+/**
+ * The SingleOutputStreamOperator represents a user defined transformation
+ * applied on a {@link DataStream} with one predefined output type.
+ *
+ * @param <OUT>
+ *            Output type of the operator.
+ * @param <O>
+ *            Type of the operator.
+ */
+public class SingleOutputStreamOperator<OUT, O extends SingleOutputStreamOperator<OUT, O>> extends
+		DataStream<OUT> {
+
+	protected SingleOutputStreamOperator(StreamExecutionEnvironment environment,
+			String operatorType, TypeSerializerWrapper<OUT> outTypeWrapper) {
+		super(environment, operatorType, outTypeWrapper);
+		setBufferTimeout(environment.getBufferTimeout());
+	}
+
+	protected SingleOutputStreamOperator(DataStream<OUT> dataStream) {
+		super(dataStream);
+	}
+
+	/**
+	 * Sets the degree of parallelism for this operator. The degree must be 1 or
+	 * more.
+	 * 
+	 * @param dop
+	 *            The degree of parallelism for this operator.
+	 * @return The operator with set degree of parallelism.
+	 */
+	public SingleOutputStreamOperator<OUT, O> setParallelism(int dop) {
+		if (dop < 1) {
+			throw new IllegalArgumentException("The parallelism of an operator must be at least 1.");
+		}
+		this.degreeOfParallelism = dop;
+
+		jobGraphBuilder.setParallelism(id, degreeOfParallelism);
+
+		return this;
+	}
+
+	/**
+	 * Sets the mutability of the operator. If the operator is set to mutable,
+	 * the tuples received in the user defined functions, will be reused after
+	 * the function call. Setting an operator to mutable reduces garbage
+	 * collection overhead and thus increases scalability. Please note that if a
+	 * {@link DataStream#batchReduce} or {@link DataStream#windowReduce} is used
+	 * as mutable, the user can only iterate through the iterator once in every
+	 * invoke.
+	 * 
+	 * @param isMutable
+	 *            The mutability of the operator.
+	 * @return The operator with mutability set.
+	 */
+	public SingleOutputStreamOperator<OUT, O> setMutability(boolean isMutable) {
+		jobGraphBuilder.setMutability(id, isMutable);
+		return this;
+	}
+
+	/**
+	 * Sets the maximum time frequency (ms) for the flushing of the output
+	 * buffer. By default the output buffers flush only when they are full.
+	 * 
+	 * @param timeoutMillis
+	 *            The maximum time between two output flushes.
+	 * @return The operator with buffer timeout set.
+	 */
+	public SingleOutputStreamOperator<OUT, O> setBufferTimeout(long timeoutMillis) {
+		jobGraphBuilder.setBufferTimeout(id, timeoutMillis);
+		return this;
+	}
+
+	/**
+	 * Operator used for directing tuples to specific named outputs using an
+	 * {@link OutputSelector}. Calling this method on an operator creates a new
+	 * {@link SplitDataStream}.
+	 * 
+	 * @param outputSelector
+	 *            The user defined {@link OutputSelector} for directing the
+	 *            tuples.
+	 * @return The {@link SplitDataStream}
+	 */
+	public SplitDataStream<OUT> split(OutputSelector<OUT> outputSelector) {
+		try {
+			jobGraphBuilder.setOutputSelector(id, SerializationUtils.serialize(outputSelector));
+
+		} catch (SerializationException e) {
+			throw new RuntimeException("Cannot serialize OutputSelector");
+		}
+
+		return new SplitDataStream<OUT>(this);
+	}
+
+	@SuppressWarnings("unchecked")
+	public SingleOutputStreamOperator<OUT, O> partitionBy(int keyposition) {
+		return (SingleOutputStreamOperator<OUT, O>) super.partitionBy(keyposition);
+	}
+
+	@SuppressWarnings("unchecked")
+	public SingleOutputStreamOperator<OUT, O> broadcast() {
+		return (SingleOutputStreamOperator<OUT, O>) super.broadcast();
+	}
+
+	@SuppressWarnings("unchecked")
+	public SingleOutputStreamOperator<OUT, O> shuffle() {
+		return (SingleOutputStreamOperator<OUT, O>) super.shuffle();
+	}
+
+	@SuppressWarnings("unchecked")
+	public SingleOutputStreamOperator<OUT, O> forward() {
+		return (SingleOutputStreamOperator<OUT, O>) super.forward();
+	}
+
+	@SuppressWarnings("unchecked")
+	public SingleOutputStreamOperator<OUT, O> distribute() {
+		return (SingleOutputStreamOperator<OUT, O>) super.distribute();
+	}
+
+	@Override
+	protected SingleOutputStreamOperator<OUT, O> copy() {
+		return new SingleOutputStreamOperator<OUT, O>(this);
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4d73f51c/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SplitDataStream.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SplitDataStream.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SplitDataStream.java
index 0ddb4f0..838f228 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SplitDataStream.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SplitDataStream.java
@@ -18,8 +18,9 @@
 package org.apache.flink.streaming.api.datastream;
 
 import java.util.Arrays;
-
+
 import org.apache.flink.streaming.api.collector.OutputSelector;
+import org.apache.flink.types.TypeInformation;
 
 /**
  * The SplitDataStream represents an operator that has been split using an
@@ -36,7 +37,16 @@ public class SplitDataStream<OUT> {
 	protected SplitDataStream(DataStream<OUT> dataStream) {
 		this.dataStream = dataStream.copy();
 	}
-
+
+	/**
+	 * Gets the output type.
+	 * 
+	 * @return The output type.
+	 */
+	public TypeInformation<OUT> getOutputType() {
+		return dataStream.getOutputType();
+	}
+	
 	/**
 	 * Sets the output names for which the next operator will receive values.
 	 * 

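SplitDataStream simply delegates getOutputType() to the stream it wraps, so the output type survives a split(). A sketch modelled on DirectedOutputTest below; EvenOddSelector stands in for a user-defined OutputSelector<Long> such as MyOutputSelector in that test and is not defined here:

    StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1);

    // EvenOddSelector is a hypothetical OutputSelector<Long> implementation.
    SplitDataStream<Long> split = env.generateSequence(1, 10).split(new EvenOddSelector());

    // Same TypeInformation as the stream the split was created from.
    TypeInformation<Long> type = split.getOutputType();
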
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4d73f51c/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
index 7d983ad..6187c99 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
@@ -35,6 +35,7 @@ import org.apache.flink.streaming.api.function.source.SourceFunction;
 import org.apache.flink.streaming.api.invokable.SourceInvokable;
 import org.apache.flink.streaming.util.serialization.FunctionTypeWrapper;
 import org.apache.flink.streaming.util.serialization.ObjectTypeWrapper;
+import org.apache.flink.streaming.util.serialization.TypeSerializerWrapper;
 
 /**
  * {@link ExecutionEnvironment} for streaming jobs. An instance of it is
@@ -189,7 +190,6 @@ public abstract class StreamExecutionEnvironment {
 		return addSource(new FileStreamFunction(filePath), parallelism);
 	}
 
-
 	private static void checkIfFileExists(String filePath) {
 		File file = new File(filePath);
 		if (!file.exists()) {
@@ -199,12 +199,12 @@ public abstract class StreamExecutionEnvironment {
 		if (!file.canRead()) {
 			throw new IllegalArgumentException("Cannot read file: " + filePath);
 		}
-		
+
 		if (file.isDirectory()) {
 			throw new IllegalArgumentException("Given path is a directory: " + filePath);
 		}
 	}
-	
+
 	/**
 	 * Creates a new DataStream that contains the given elements. The elements
 	 * must all be of the same type, for example, all of the String or Integer.
@@ -219,18 +219,19 @@ public abstract class StreamExecutionEnvironment {
 	 * @return The DataStream representing the elements.
 	 */
 	public <OUT extends Serializable> DataStreamSource<OUT> fromElements(OUT... data) {
-		DataStreamSource<OUT> returnStream = new DataStreamSource<OUT>(this, "elements");
-
 		if (data.length == 0) {
 			throw new IllegalArgumentException(
 					"fromElements needs at least one element as argument");
 		}
 
+		TypeSerializerWrapper<OUT> outTypeWrapper = new ObjectTypeWrapper<OUT>(data[0]);
+		DataStreamSource<OUT> returnStream = new DataStreamSource<OUT>(this, "elements",
+				outTypeWrapper);
+
 		try {
 			SourceFunction<OUT> function = new FromElementsFunction<OUT>(data);
 			jobGraphBuilder.addSource(returnStream.getId(), new SourceInvokable<OUT>(function),
-					new ObjectTypeWrapper<OUT>(data[0]), "source",
-					SerializationUtils.serialize(function), 1);
+					outTypeWrapper, "source", SerializationUtils.serialize(function), 1);
 		} catch (SerializationException e) {
 			throw new RuntimeException("Cannot serialize elements");
 		}
@@ -250,8 +251,6 @@ public abstract class StreamExecutionEnvironment {
 	 * @return The DataStream representing the elements.
 	 */
 	public <OUT extends Serializable> DataStreamSource<OUT> fromCollection(Collection<OUT> data) {
-		DataStreamSource<OUT> returnStream = new DataStreamSource<OUT>(this, "elements");
-
 		if (data == null) {
 			throw new NullPointerException("Collection must not be null");
 		}
@@ -260,6 +259,11 @@ public abstract class StreamExecutionEnvironment {
 			throw new IllegalArgumentException("Collection must not be empty");
 		}
 
+		TypeSerializerWrapper<OUT> outTypeWrapper = new ObjectTypeWrapper<OUT>(data.iterator()
+				.next());
+		DataStreamSource<OUT> returnStream = new DataStreamSource<OUT>(this, "elements",
+				outTypeWrapper);
+
 		try {
 			SourceFunction<OUT> function = new FromElementsFunction<OUT>(data);
 
@@ -301,12 +305,14 @@ public abstract class StreamExecutionEnvironment {
 	 * @return the data stream constructed
 	 */
 	public <OUT> DataStreamSource<OUT> addSource(SourceFunction<OUT> function, int parallelism) {
-		DataStreamSource<OUT> returnStream = new DataStreamSource<OUT>(this, "source");
+		TypeSerializerWrapper<OUT> outTypeWrapper = new FunctionTypeWrapper<OUT>(function,
+				SourceFunction.class, 0);
+		DataStreamSource<OUT> returnStream = new DataStreamSource<OUT>(this, "source",
+				outTypeWrapper);
 
 		try {
 			jobGraphBuilder.addSource(returnStream.getId(), new SourceInvokable<OUT>(function),
-					new FunctionTypeWrapper<OUT>(function, SourceFunction.class, 0), "source",
-					SerializationUtils.serialize(function), parallelism);
+					outTypeWrapper, "source", SerializationUtils.serialize(function), parallelism);
 		} catch (SerializationException e) {
 			throw new RuntimeException("Cannot serialize SourceFunction");
 		}

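In fromElements() and fromCollection() the type wrapper is now derived from the first element before the DataStreamSource is constructed, which is why the null and emptiness checks were moved ahead of the stream creation. A short sketch of the resulting behaviour (illustrative only, assuming java.util.Arrays and java.util.ArrayList are imported alongside the streaming API):

    StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1);

    DataStream<Integer> fromArray = env.fromElements(1, 2, 3);
    DataStream<Integer> fromList = env.fromCollection(Arrays.asList(4, 5, 6));

    try {
        // No first element to infer the type from, so this is rejected
        // before any DataStreamSource is built.
        env.fromCollection(new ArrayList<Integer>());
    } catch (IllegalArgumentException e) {
        // "Collection must not be empty"
    }
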
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4d73f51c/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/collector/DirectedOutputTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/collector/DirectedOutputTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/collector/DirectedOutputTest.java
index fdf9db3..e9d3994 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/collector/DirectedOutputTest.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/collector/DirectedOutputTest.java
@@ -1,3 +1,20 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
 package org.apache.flink.streaming.api.collector;
 
 import static org.junit.Assert.assertEquals;
@@ -82,7 +99,7 @@ public class DirectedOutputTest {
 	public void outputSelectorTest() throws Exception {
 
 		LocalStreamEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1);
-
+		
 		SplitDataStream<Long> source = env.generateSequence(1, 10).split(new MyOutputSelector());
 		source.select(EVEN).addSink(new ListSink(EVEN));
 		source.select(ODD, TEN).addSink(new ListSink(ODD_AND_TEN));

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4d73f51c/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoFlatMapTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoFlatMapTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoFlatMapTest.java
index e7f12bd..0c4bad1 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoFlatMapTest.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoFlatMapTest.java
@@ -72,12 +72,19 @@ public class CoFlatMapTest implements Serializable {
 
 		DataStream<Integer> ds1 = env.fromElements(1, 3, 5);
 		DataStream<Integer> ds2 = env.fromElements(2, 4).merge(ds1);
-
+		
 		try {
 			ds1.forward().merge(ds2);
 			fail();
 		} catch (RuntimeException e) {
-			// good
+			// expected
+		}
+		
+		try {
+			env.fromElements(10, 11).connect(ds2);
+			fail();
+		} catch (RuntimeException e) {
+			// expected
 		}
 	}
 }