You are viewing a plain-text version of this content. The canonical link to the original archived message was not preserved in this plain-text version.
Posted to commits@flink.apache.org by gy...@apache.org on 2015/04/22 16:04:59 UTC

flink git commit: [FLINK-1909] [streaming] Type handling refactor for sources + scala api

Repository: flink
Updated Branches:
  refs/heads/master bad77a365 -> 6df1dd2cc


[FLINK-1909] [streaming] Type handling refactor for sources + scala api


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/6df1dd2c
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/6df1dd2c
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/6df1dd2c

Branch: refs/heads/master
Commit: 6df1dd2cc848d0a691a98309a3bb760f9a998673
Parents: bad77a3
Author: Gyula Fora <gy...@apache.org>
Authored: Sat Apr 18 18:31:21 2015 +0200
Committer: Gyula Fora <gy...@apache.org>
Committed: Wed Apr 22 15:50:09 2015 +0200

----------------------------------------------------------------------
 .../api/datastream/ConnectedDataStream.java     | 28 ++++----
 .../streaming/api/datastream/DataStream.java    |  2 +-
 .../api/datastream/DiscretizedStream.java       |  2 +-
 .../environment/StreamExecutionEnvironment.java | 67 +++++++-------------
 .../flink/streaming/api/TypeFillTest.java       | 21 ++++++
 .../api/scala/ConnectedDataStream.scala         | 45 +++++++------
 .../flink/streaming/api/scala/DataStream.scala  | 30 ++++-----
 .../api/scala/StreamExecutionEnvironment.scala  |  4 +-
 8 files changed, 99 insertions(+), 100 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/6df1dd2c/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedDataStream.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedDataStream.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedDataStream.java
index dd4a84b..46a4cfc 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedDataStream.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedDataStream.java
@@ -123,7 +123,7 @@ public class ConnectedDataStream<IN1, IN2> {
 	 * 
 	 * @return The type of the first input
 	 */
-	public TypeInformation<IN1> getInputType1() {
+	public TypeInformation<IN1> getType1() {
 		return dataStream1.getType();
 	}
 
@@ -132,7 +132,7 @@ public class ConnectedDataStream<IN1, IN2> {
 	 * 
 	 * @return The type of the second input
 	 */
-	public TypeInformation<IN2> getInputType2() {
+	public TypeInformation<IN2> getType2() {
 		return dataStream2.getType();
 	}
 
@@ -244,10 +244,10 @@ public class ConnectedDataStream<IN1, IN2> {
 	public <OUT> SingleOutputStreamOperator<OUT, ?> map(CoMapFunction<IN1, IN2, OUT> coMapper) {
 
 		TypeInformation<OUT> outTypeInfo = TypeExtractor.getBinaryOperatorReturnType(coMapper,
-				CoMapFunction.class, false, true, getInputType1(), getInputType2(),
+				CoMapFunction.class, false, true, getType1(), getType2(),
 				Utils.getCallLocationName(), true);
 
-		return addCoFunction("Co-Map", outTypeInfo, new CoStreamMap<IN1, IN2, OUT>(
+		return transform("Co-Map", outTypeInfo, new CoStreamMap<IN1, IN2, OUT>(
 				clean(coMapper)));
 
 	}
@@ -271,10 +271,10 @@ public class ConnectedDataStream<IN1, IN2> {
 			CoFlatMapFunction<IN1, IN2, OUT> coFlatMapper) {
 
 		TypeInformation<OUT> outTypeInfo = TypeExtractor.getBinaryOperatorReturnType(coFlatMapper,
-				CoFlatMapFunction.class, false, true, getInputType1(), getInputType2(),
+				CoFlatMapFunction.class, false, true, getType1(), getType2(),
 				Utils.getCallLocationName(), true);
 
-		return addCoFunction("Co-Flat Map", outTypeInfo, new CoStreamFlatMap<IN1, IN2, OUT>(
+		return transform("Co-Flat Map", outTypeInfo, new CoStreamFlatMap<IN1, IN2, OUT>(
 				clean(coFlatMapper)));
 	}
 
@@ -297,10 +297,10 @@ public class ConnectedDataStream<IN1, IN2> {
 	public <OUT> SingleOutputStreamOperator<OUT, ?> reduce(CoReduceFunction<IN1, IN2, OUT> coReducer) {
 
 		TypeInformation<OUT> outTypeInfo = TypeExtractor.getBinaryOperatorReturnType(coReducer,
-				CoReduceFunction.class, false, true, getInputType1(), getInputType2(),
+				CoReduceFunction.class, false, true, getType1(), getType2(),
 				Utils.getCallLocationName(), true);
 
-		return addCoFunction("Co-Reduce", outTypeInfo, getReduceOperator(clean(coReducer)));
+		return transform("Co-Reduce", outTypeInfo, getReduceOperator(clean(coReducer)));
 
 	}
 
@@ -365,10 +365,10 @@ public class ConnectedDataStream<IN1, IN2> {
 		}
 		
 		TypeInformation<OUT> outTypeInfo = TypeExtractor.getBinaryOperatorReturnType(coWindowFunction,
-				CoWindowFunction.class, false, true, getInputType1(), getInputType2(),
+				CoWindowFunction.class, false, true, getType1(), getType2(),
 				Utils.getCallLocationName(), true);
 
-		return addCoFunction("Co-Window", outTypeInfo, new CoStreamWindow<IN1, IN2, OUT>(
+		return transform("Co-Window", outTypeInfo, new CoStreamWindow<IN1, IN2, OUT>(
 				clean(coWindowFunction), windowSize, slideInterval, timestamp1, timestamp2));
 
 	}
@@ -397,20 +397,20 @@ public class ConnectedDataStream<IN1, IN2> {
 			throw new IllegalArgumentException("Slide interval must be positive");
 		}
 
-		return addCoFunction("Co-Window", outTypeInfo, new CoStreamWindow<IN1, IN2, OUT>(
+		return transform("Co-Window", outTypeInfo, new CoStreamWindow<IN1, IN2, OUT>(
 				clean(coWindowFunction), windowSize, slideInterval, timestamp1, timestamp2));
 
 	}
 
-	public <OUT> SingleOutputStreamOperator<OUT, ?> addCoFunction(String functionName,
+	public <OUT> SingleOutputStreamOperator<OUT, ?> transform(String functionName,
 			TypeInformation<OUT> outTypeInfo, CoStreamOperator<IN1, IN2, OUT> operator) {
 
 		@SuppressWarnings({ "unchecked", "rawtypes" })
 		SingleOutputStreamOperator<OUT, ?> returnStream = new SingleOutputStreamOperator(
 				environment, functionName, outTypeInfo, operator);
 
-		dataStream1.streamGraph.addCoOperator(returnStream.getId(), operator, getInputType1(),
-				getInputType2(), outTypeInfo, functionName);
+		dataStream1.streamGraph.addCoOperator(returnStream.getId(), operator, getType1(),
+				getType2(), outTypeInfo, functionName);
 
 		dataStream1.connectGraph(dataStream1, returnStream.getId(), 1);
 		dataStream1.connectGraph(dataStream2, returnStream.getId(), 2);

http://git-wip-us.apache.org/repos/asf/flink/blob/6df1dd2c/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
index 94aca48..f4d4965 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
@@ -93,7 +93,7 @@ import org.apache.flink.streaming.util.serialization.SerializationSchema;
  * <ul>
  * <li>{@link DataStream#map},</li>
  * <li>{@link DataStream#filter}, or</li>
- * <li>{@link DataStream#aggregate}.</li>
+ * <li>{@link DataStream#sum}.</li>
  * </ul>
  * 
  * @param <OUT>

http://git-wip-us.apache.org/repos/asf/flink/blob/6df1dd2c/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DiscretizedStream.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DiscretizedStream.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DiscretizedStream.java
index c6ee36d..53c35e0 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DiscretizedStream.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DiscretizedStream.java
@@ -133,7 +133,7 @@ public class DiscretizedStream<OUT> extends WindowedDataStream<OUT> {
 		return reduced.discretizedStream
 				.groupBy(new WindowKey<OUT>())
 				.connect(numOfParts.groupBy(0))
-				.addCoFunction(
+				.transform(
 						"CoFlatMap",
 						reduced.discretizedStream.getType(),
 						new CoStreamFlatMap<StreamWindow<OUT>, Tuple2<Integer, Integer>, StreamWindow<OUT>>(

http://git-wip-us.apache.org/repos/asf/flink/blob/6df1dd2c/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
index 988fdfb..3e935f5 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
@@ -22,11 +22,10 @@ import java.io.Serializable;
 import java.util.Collection;
 import java.util.List;
 
-import com.esotericsoftware.kryo.Serializer;
-
 import org.apache.commons.lang3.Validate;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.common.functions.InvalidTypesException;
 import org.apache.flink.api.common.io.InputFormat;
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
@@ -34,6 +33,7 @@ import org.apache.flink.api.java.ClosureCleaner;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.io.TextInputFormat;
 import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.api.java.typeutils.MissingTypeInfo;
 import org.apache.flink.api.java.typeutils.PojoTypeInfo;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
 import org.apache.flink.client.program.Client;
@@ -44,6 +44,7 @@ import org.apache.flink.core.fs.Path;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.DataStreamSource;
 import org.apache.flink.streaming.api.functions.source.FileMonitoringFunction;
+import org.apache.flink.streaming.api.functions.source.FileMonitoringFunction.WatchType;
 import org.apache.flink.streaming.api.functions.source.FileReadFunction;
 import org.apache.flink.streaming.api.functions.source.FileSourceFunction;
 import org.apache.flink.streaming.api.functions.source.FromElementsFunction;
@@ -53,11 +54,12 @@ import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction;
 import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
 import org.apache.flink.streaming.api.functions.source.SocketTextStreamFunction;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
-import org.apache.flink.streaming.api.functions.source.FileMonitoringFunction.WatchType;
 import org.apache.flink.streaming.api.graph.StreamGraph;
 import org.apache.flink.streaming.api.operators.StreamOperator;
 import org.apache.flink.streaming.api.operators.StreamSource;
 
+import com.esotericsoftware.kryo.Serializer;
+
 /**
  * {@link ExecutionEnvironment} for streaming jobs. An instance of it is
  * necessary to construct streaming topologies.
@@ -420,7 +422,7 @@ public abstract class StreamExecutionEnvironment {
 	public DataStream<String> readFileStream(String filePath, long intervalMillis,
 			WatchType watchType) {
 		DataStream<Tuple3<String, Long, Long>> source = addSource(new FileMonitoringFunction(
-				filePath, intervalMillis, watchType), null, "File Stream");
+				filePath, intervalMillis, watchType), "File Stream");
 
 		return source.flatMap(new FileReadFunction());
 	}
@@ -448,7 +450,7 @@ public abstract class StreamExecutionEnvironment {
 
 		SourceFunction<OUT> function = new FromElementsFunction<OUT>(data);
 
-		return addSource(function, outTypeInfo, "Elements source");
+		return addSource(function, "Elements source").returns(outTypeInfo);
 	}
 
 	/**
@@ -475,7 +477,7 @@ public abstract class StreamExecutionEnvironment {
 		TypeInformation<OUT> outTypeInfo = TypeExtractor.getForObject(data.iterator().next());
 		SourceFunction<OUT> function = new FromElementsFunction<OUT>(data);
 
-		return addSource(function, outTypeInfo, "Collection Source");
+		return addSource(function, "Collection Source").returns(outTypeInfo);
 	}
 
 	/**
@@ -508,7 +510,7 @@ public abstract class StreamExecutionEnvironment {
 	 */
 	public DataStreamSource<String> socketTextStream(String hostname, int port, char delimiter,
 			long maxRetry) {
-		return addSource(new SocketTextStreamFunction(hostname, port, delimiter, maxRetry), null,
+		return addSource(new SocketTextStreamFunction(hostname, port, delimiter, maxRetry),
 				"Socket Stream");
 	}
 
@@ -560,13 +562,13 @@ public abstract class StreamExecutionEnvironment {
 		if (from > to) {
 			throw new IllegalArgumentException("Start of sequence must not be greater than the end");
 		}
-		return addSource(new GenSequenceFunction(from, to), null, "Sequence Source");
+		return addSource(new GenSequenceFunction(from, to), "Sequence Source");
 	}
 
 	private DataStreamSource<String> addFileSource(InputFormat<String, ?> inputFormat,
 			TypeInformation<String> typeInfo) {
 		FileSourceFunction function = new FileSourceFunction(inputFormat, typeInfo);
-		DataStreamSource<String> returnStream = addSource(function, null, "File Source");
+		DataStreamSource<String> returnStream = addSource(function, "File Source");
 		streamGraph.setInputFormat(returnStream.getId(), inputFormat);
 		return returnStream;
 	}
@@ -588,31 +590,7 @@ public abstract class StreamExecutionEnvironment {
 	 * @return the data stream constructed
 	 */
 	public <OUT> DataStreamSource<OUT> addSource(SourceFunction<OUT> function) {
-		return addSource(function, null);
-	}
-
-	/**
-	 * Ads a data source with a custom type information thus opening a
-	 * {@link DataStream}. Only in very special cases does the user need to
-	 * support type information. Otherwise use
-	 * {@link #addSource(SourceFunction)} </p> By default sources have a
-	 * parallelism of 1. To enable parallel execution, the user defined source
-	 * should implement {@link ParallelSourceFunction} or extend
-	 * {@link RichParallelSourceFunction}. In these cases the resulting source
-	 * will have the parallelism of the environment. To change this afterwards
-	 * call {@link DataStreamSource#setParallelism(int)}
-	 * 
-	 * @param function
-	 *            the user defined function
-	 * @param outTypeInfo
-	 *            the user defined type information for the stream
-	 * @param <OUT>
-	 *            type of the returned stream
-	 * @return the data stream constructed
-	 */
-	public <OUT> DataStreamSource<OUT> addSource(SourceFunction<OUT> function,
-			TypeInformation<OUT> outTypeInfo) {
-		return addSource(function, outTypeInfo, "Custom Source");
+		return addSource(function, "Custom source");
 	}
 
 	/**
@@ -623,8 +601,6 @@ public abstract class StreamExecutionEnvironment {
 	 * 
 	 * @param function
 	 *            the user defined function
-	 * @param outTypeInfo
-	 *            the user defined type information for the stream
 	 * @param sourceName
 	 *            Name of the data source
 	 * @param <OUT>
@@ -632,15 +608,18 @@ public abstract class StreamExecutionEnvironment {
 	 * @return the data stream constructed
 	 */
 	@SuppressWarnings("unchecked")
-	private <OUT> DataStreamSource<OUT> addSource(SourceFunction<OUT> function,
-			TypeInformation<OUT> outTypeInfo, String sourceName) {
+	private <OUT> DataStreamSource<OUT> addSource(SourceFunction<OUT> function, String sourceName) {
+
+		TypeInformation<OUT> outTypeInfo;
 
-		if (outTypeInfo == null) {
-			if (function instanceof GenericSourceFunction) {
-				outTypeInfo = ((GenericSourceFunction<OUT>) function).getType();
-			} else {
+		if (function instanceof GenericSourceFunction) {
+			outTypeInfo = ((GenericSourceFunction<OUT>) function).getType();
+		} else {
+			try {
 				outTypeInfo = TypeExtractor.createTypeInfo(SourceFunction.class,
 						function.getClass(), 0, null, null);
+			} catch (InvalidTypesException e) {
+				outTypeInfo = (TypeInformation<OUT>) new MissingTypeInfo("Custom source", e);
 			}
 		}
 
@@ -649,8 +628,8 @@ public abstract class StreamExecutionEnvironment {
 		ClosureCleaner.clean(function, true);
 		StreamOperator<OUT, OUT> sourceOperator = new StreamSource<OUT>(function);
 
-		return new DataStreamSource<OUT>(this, sourceName, outTypeInfo, sourceOperator,
-				isParallel, sourceName);
+		return new DataStreamSource<OUT>(this, sourceName, outTypeInfo, sourceOperator, isParallel,
+				sourceName);
 	}
 
 	// --------------------------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/6df1dd2c/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/TypeFillTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/TypeFillTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/TypeFillTest.java
index f163b9e..35cbaba 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/TypeFillTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/TypeFillTest.java
@@ -32,6 +32,7 @@ import org.apache.flink.streaming.api.functions.co.CoFlatMapFunction;
 import org.apache.flink.streaming.api.functions.co.CoMapFunction;
 import org.apache.flink.streaming.api.functions.co.CoReduceFunction;
 import org.apache.flink.streaming.api.functions.co.CoWindowFunction;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
 import org.apache.flink.util.Collector;
 import org.junit.Test;
 
@@ -42,6 +43,12 @@ public class TypeFillTest {
 	public void test() {
 		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 
+		try {
+			env.addSource(new TestSource<Integer>()).print();
+			fail();
+		} catch (Exception e) {
+		}
+
 		DataStream<Long> source = env.generateSequence(1, 10);
 
 		try {
@@ -76,6 +83,7 @@ public class TypeFillTest {
 		} catch (Exception e) {
 		}
 
+		env.addSource(new TestSource<Integer>()).returns("Integer");
 		source.map(new TestMap<Long, Long>()).returns(Long.class).print();
 		source.flatMap(new TestFlatMap<Long, Long>()).returns("Long").print();
 		source.connect(source).map(new TestCoMap<Long, Long, Integer>()).returns("Integer").print();
@@ -106,6 +114,19 @@ public class TypeFillTest {
 
 	}
 
+	private class TestSource<T> implements SourceFunction<T> {
+
+		@Override
+		public void run(Collector<T> collector) throws Exception {
+
+		}
+
+		@Override
+		public void cancel() {
+		}
+
+	}
+
 	private class TestMap<T, O> implements MapFunction<T, O> {
 		@Override
 		public O map(T value) throws Exception {

http://git-wip-us.apache.org/repos/asf/flink/blob/6df1dd2c/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/ConnectedDataStream.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/ConnectedDataStream.scala b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/ConnectedDataStream.scala
index fbd7a02..47d8fd2 100644
--- a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/ConnectedDataStream.scala
+++ b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/ConnectedDataStream.scala
@@ -21,7 +21,7 @@ package org.apache.flink.streaming.api.scala
 import java.util
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.java.functions.KeySelector
-import org.apache.flink.streaming.api.datastream.{ConnectedDataStream => JavaCStream}
+import org.apache.flink.streaming.api.datastream.{ConnectedDataStream => JavaCStream, DataStream => JavaStream}
 import org.apache.flink.streaming.api.functions.co.{CoFlatMapFunction, CoMapFunction, CoReduceFunction, CoWindowFunction}
 import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.clean
 import org.apache.flink.util.Collector
@@ -54,8 +54,7 @@ class ConnectedDataStream[IN1, IN2](javaStream: JavaCStream[IN1, IN2]) {
       def map2(in2: IN2): R = clean(fun2)(in2)
     }
 
-    new DataStream(javaStream.addCoFunction("map", implicitly[TypeInformation[R]],
-      new CoStreamMap[IN1, IN2, R](comapper)))
+    map(comapper)
   }
 
   /**
@@ -78,8 +77,8 @@ class ConnectedDataStream[IN1, IN2](javaStream: JavaCStream[IN1, IN2]) {
       throw new NullPointerException("Map function must not be null.")
     }
 
-    new DataStream(javaStream.addCoFunction("map", implicitly[TypeInformation[R]],
-      new CoStreamMap[IN1, IN2, R](coMapper)))
+    val outType : TypeInformation[R] = implicitly[TypeInformation[R]]    
+    javaStream.map(coMapper).returns(outType).asInstanceOf[JavaStream[R]]
   }
 
   /**
@@ -102,8 +101,9 @@ class ConnectedDataStream[IN1, IN2](javaStream: JavaCStream[IN1, IN2]) {
     if (coFlatMapper == null) {
       throw new NullPointerException("FlatMap function must not be null.")
     }
-    new DataStream[R](javaStream.addCoFunction("flatMap", implicitly[TypeInformation[R]],
-      new CoStreamFlatMap[IN1, IN2, R](coFlatMapper)))
+    
+    val outType : TypeInformation[R] = implicitly[TypeInformation[R]]    
+    javaStream.flatMap(coFlatMapper).returns(outType).asInstanceOf[JavaStream[R]]
   }
 
   /**
@@ -235,13 +235,13 @@ class ConnectedDataStream[IN1, IN2](javaStream: JavaCStream[IN1, IN2]) {
    * The function used for grouping the second input
    * @return @return The transformed { @link ConnectedDataStream}
    */
-  def groupBy[K: TypeInformation](fun1: IN1 => _, fun2: IN2 => _):
+  def groupBy[K: TypeInformation, L: TypeInformation](fun1: IN1 => K, fun2: IN2 => L):
   ConnectedDataStream[IN1, IN2] = {
 
-    val keyExtractor1 = new KeySelector[IN1, Any] {
+    val keyExtractor1 = new KeySelector[IN1, K] {
       def getKey(in: IN1) = clean(fun1)(in)
     }
-    val keyExtractor2 = new KeySelector[IN2, Any] {
+    val keyExtractor2 = new KeySelector[IN2, L] {
       def getKey(in: IN2) = clean(fun2)(in)
     }
 
@@ -267,9 +267,9 @@ class ConnectedDataStream[IN1, IN2](javaStream: JavaCStream[IN1, IN2]) {
     if (coReducer == null) {
       throw new NullPointerException("Reduce function must not be null.")
     }
-
-    new DataStream[R](javaStream.addCoFunction("coReduce", implicitly[TypeInformation[R]],
-      new CoStreamReduce[IN1, IN2, R](coReducer)))
+    
+    val outType : TypeInformation[R] = implicitly[TypeInformation[R]]    
+    javaStream.reduce(coReducer).returns(outType).asInstanceOf[JavaStream[R]]
   }
 
   /**
@@ -325,12 +325,16 @@ class ConnectedDataStream[IN1, IN2](javaStream: JavaCStream[IN1, IN2]) {
    * @return The transformed { @link DataStream}.
    */
   def windowReduce[R: TypeInformation: ClassTag](coWindowFunction: 
-      CoWindowFunction[IN1, IN2, R], windowSize: Long, slideInterval: Long) = {
+      CoWindowFunction[IN1, IN2, R], windowSize: Long, slideInterval: Long):
+      DataStream[R] = {
     if (coWindowFunction == null) {
       throw new NullPointerException("CoWindow function must no be null")
     }
-
-    javaStream.windowReduce(coWindowFunction, windowSize, slideInterval)
+    
+    val outType : TypeInformation[R] = implicitly[TypeInformation[R]]    
+    
+    javaStream.windowReduce(coWindowFunction, windowSize, slideInterval).
+    returns(outType).asInstanceOf[JavaStream[R]]
   }
 
   /**
@@ -351,7 +355,8 @@ class ConnectedDataStream[IN1, IN2](javaStream: JavaCStream[IN1, IN2]) {
    * @return The transformed { @link DataStream}.
    */
   def windowReduce[R: TypeInformation: ClassTag](coWindower: (Seq[IN1], Seq[IN2], 
-      Collector[R]) => Unit, windowSize: Long, slideInterval: Long) = {
+      Collector[R]) => Unit, windowSize: Long, slideInterval: Long):
+      DataStream[R] = {
     if (coWindower == null) {
       throw new NullPointerException("CoWindow function must no be null")
     }
@@ -361,7 +366,7 @@ class ConnectedDataStream[IN1, IN2](javaStream: JavaCStream[IN1, IN2]) {
           out: Collector[R]): Unit = clean(coWindower)(first, second, out)
     }
 
-    javaStream.windowReduce(coWindowFun, windowSize, slideInterval)
+    windowReduce(coWindowFun, windowSize, slideInterval)
   }
 
   /**
@@ -388,7 +393,7 @@ class ConnectedDataStream[IN1, IN2](javaStream: JavaCStream[IN1, IN2]) {
    * @return The type of the first input
    */
   def getInputType1(): TypeInformation[IN1] = {
-    javaStream.getInputType1
+    javaStream.getType1
   }
 
   /**
@@ -397,7 +402,7 @@ class ConnectedDataStream[IN1, IN2](javaStream: JavaCStream[IN1, IN2]) {
    * @return The type of the second input
    */
   def getInputType2(): TypeInformation[IN2] = {
-    javaStream.getInputType2
+    javaStream.getType2
   }
 
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/6df1dd2c/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
index 5a4b611..4ccb073 100644
--- a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
+++ b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
@@ -365,8 +365,8 @@ class DataStream[T](javaStream: JavaStream[T]) {
       val cleanFun = clean(fun)
       def map(in: T): R = cleanFun(in)
     }
-
-    javaStream.transform("map", implicitly[TypeInformation[R]], new StreamMap[T, R](mapper))
+    
+    map(mapper)
   }
 
   /**
@@ -377,7 +377,8 @@ class DataStream[T](javaStream: JavaStream[T]) {
       throw new NullPointerException("Map function must not be null.")
     }
 
-    javaStream.transform("map", implicitly[TypeInformation[R]], new StreamMap[T, R](mapper))
+    val outType : TypeInformation[R] = implicitly[TypeInformation[R]]
+    javaStream.map(mapper).returns(outType).asInstanceOf[JavaStream[R]]
   }
 
   /**
@@ -388,8 +389,9 @@ class DataStream[T](javaStream: JavaStream[T]) {
     if (flatMapper == null) {
       throw new NullPointerException("FlatMap function must not be null.")
     }
-   javaStream.transform("flatMap", implicitly[TypeInformation[R]], 
-       new StreamFlatMap[T, R](flatMapper))
+    
+    val outType : TypeInformation[R] = implicitly[TypeInformation[R]]
+    javaStream.flatMap(flatMapper).returns(outType).asInstanceOf[JavaStream[R]]
   }
 
   /**
@@ -430,12 +432,8 @@ class DataStream[T](javaStream: JavaStream[T]) {
     if (reducer == null) {
       throw new NullPointerException("Reduce function must not be null.")
     }
-    javaStream match {
-      case ds: GroupedDataStream[_] => javaStream.transform("reduce",
-        javaStream.getType(), new StreamGroupedReduce[T](reducer, ds.getKeySelector()))
-      case _ => javaStream.transform("reduce", javaStream.getType(),
-        new StreamReduce[T](reducer))
-    }
+ 
+    javaStream.reduce(reducer)
   }
 
   /**
@@ -462,13 +460,9 @@ class DataStream[T](javaStream: JavaStream[T]) {
     if (folder == null) {
       throw new NullPointerException("Fold function must not be null.")
     }
-    javaStream match {
-      case ds: GroupedDataStream[_] => javaStream.transform("fold",
-        implicitly[TypeInformation[R]], new StreamGroupedFold[T,R](folder, ds.getKeySelector(), 
-            initialValue, implicitly[TypeInformation[R]]))
-      case _ => javaStream.transform("fold", implicitly[TypeInformation[R]],
-        new StreamFold[T,R](folder, initialValue, implicitly[TypeInformation[R]]))
-    }
+    
+    val outType : TypeInformation[R] = implicitly[TypeInformation[R]]
+    javaStream.fold(initialValue, folder).returns(outType).asInstanceOf[JavaStream[R]]
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/flink/blob/6df1dd2c/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
index 0217793..c7716ed 100644
--- a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
+++ b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
@@ -261,7 +261,7 @@ class StreamExecutionEnvironment(javaEnv: JavaEnv) {
     val sourceFunction = new FromElementsFunction[T](scala.collection.JavaConversions
         .asJavaCollection(data))
         
-    javaEnv.addSource(sourceFunction, typeInfo)
+    javaEnv.addSource(sourceFunction).returns(typeInfo)
   }
 
   /**
@@ -277,7 +277,7 @@ class StreamExecutionEnvironment(javaEnv: JavaEnv) {
     Validate.notNull(function, "Function must not be null.")
     val cleanFun = StreamExecutionEnvironment.clean(function)
     val typeInfo = implicitly[TypeInformation[T]]
-    javaEnv.addSource(cleanFun, typeInfo)
+    javaEnv.addSource(cleanFun).returns(typeInfo)
   }
   
    /**