You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by mb...@apache.org on 2014/12/10 15:30:42 UTC

[1/5] incubator-flink git commit: Updated java 8 package to match flink-examples standards

Repository: incubator-flink
Updated Branches:
  refs/heads/master 94c8e3fa9 -> 26820ea70


Updated java 8 package to match flink-examples standards

Renamed org.apache.flink.example package to org.apache.flink.examples
Default data is provided from flink-java examples


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/26820ea7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/26820ea7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/26820ea7

Branch: refs/heads/master
Commit: 26820ea709a6de72fc7b71349fd7cff30c436aa7
Parents: d8066c5
Author: mbalassi <mb...@apache.org>
Authored: Tue Dec 9 16:22:06 2014 +0100
Committer: mbalassi <mb...@apache.org>
Committed: Wed Dec 10 13:27:38 2014 +0100

----------------------------------------------------------------------
 flink-java8/pom.xml                             |  55 ++++-
 .../example/java8/relational/TPCHQuery10.java   | 221 -------------------
 .../example/java8/wordcount/WordCount.java      | 127 -----------
 .../java8/wordcount/util/WordCountData.java     |  71 ------
 .../examples/java8/relational/TPCHQuery10.java  | 221 +++++++++++++++++++
 .../examples/java8/wordcount/WordCount.java     | 127 +++++++++++
 6 files changed, 391 insertions(+), 431 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/26820ea7/flink-java8/pom.xml
----------------------------------------------------------------------
diff --git a/flink-java8/pom.xml b/flink-java8/pom.xml
index ec6a3e7..0aa06fa 100644
--- a/flink-java8/pom.xml
+++ b/flink-java8/pom.xml
@@ -92,17 +92,6 @@ under the License.
 			</plugin>
 			<plugin>
 				<groupId>org.apache.maven.plugins</groupId>
-				<artifactId>maven-jar-plugin</artifactId>
-				<executions>
-					<execution>
-						<goals>
-							<goal>test-jar</goal>
-						</goals>
-					</execution>
-				</executions>
-			</plugin>
-			<plugin>
-				<groupId>org.apache.maven.plugins</groupId>
 				<artifactId>maven-surefire-plugin</artifactId>
 				<configuration>
 					<systemPropertyVariables>
@@ -118,8 +107,50 @@ under the License.
 					</systemPropertyVariables>
 				</configuration>
 			</plugin>
+
+			<!-- get default data from flink-java-examples package -->
+			<plugin>
+				<groupId>org.apache.maven.plugins</groupId>
+				<artifactId>maven-dependency-plugin</artifactId>
+				<version>2.9</version><!--$NO-MVN-MAN-VER$-->
+				<executions>
+					<execution>
+						<id>unpack</id>
+						<phase>prepare-package</phase>
+						<goals>
+							<goal>unpack</goal>
+						</goals>
+						<configuration>
+							<artifactItems>
+								<artifactItem>
+									<groupId>org.apache.flink</groupId>
+									<artifactId>flink-java-examples</artifactId>
+									<version>${project.version}</version>
+									<type>jar</type>
+									<overWrite>false</overWrite>
+									<outputDirectory>${project.build.directory}/classes</outputDirectory>
+									<includes>org/apache/flink/examples/java/wordcount/util/WordCountData.class</includes>
+								</artifactItem>
+							</artifactItems>
+						</configuration>
+					</execution>
+				</executions>
+			</plugin>
+			
+			<plugin>
+				<groupId>org.apache.maven.plugins</groupId>
+				<artifactId>maven-jar-plugin</artifactId>
+				
+				<executions>
+					<execution>
+						<goals>
+							<goal>test-jar</goal>
+						</goals>
+					</execution>
+				</executions>
+			</plugin>
 		</plugins>
-		
+	
 		<pluginManagement>
 			<plugins>
 				<plugin>

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/26820ea7/flink-java8/src/main/java/org/apache/flink/example/java8/relational/TPCHQuery10.java
----------------------------------------------------------------------
diff --git a/flink-java8/src/main/java/org/apache/flink/example/java8/relational/TPCHQuery10.java b/flink-java8/src/main/java/org/apache/flink/example/java8/relational/TPCHQuery10.java
deleted file mode 100644
index 9b67a43..0000000
--- a/flink-java8/src/main/java/org/apache/flink/example/java8/relational/TPCHQuery10.java
+++ /dev/null
@@ -1,221 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.example.java8.relational;
-
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.api.java.tuple.Tuple4;
-import org.apache.flink.api.java.tuple.Tuple5;
-import org.apache.flink.api.java.tuple.Tuple6;
-
-/**
- * This program implements a modified version of the TPC-H query 10.
- * The original query can be found at
- * <a href="http://www.tpc.org/tpch/spec/tpch2.16.0.pdf">http://www.tpc.org/tpch/spec/tpch2.16.0.pdf</a> (page 45).
- * 
- * <p>
- * This program implements the following SQL equivalent:
- * 
- * <p>
- * <code><pre>
- * SELECT 
- *        c_custkey,
- *        c_name, 
- *        c_address,
- *        n_name, 
- *        c_acctbal
- *        SUM(l_extendedprice * (1 - l_discount)) AS revenue,  
- * FROM   
- *        customer, 
- *        orders, 
- *        lineitem, 
- *        nation 
- * WHERE 
- *        c_custkey = o_custkey 
- *        AND l_orderkey = o_orderkey 
- *        AND YEAR(o_orderdate) > '1990' 
- *        AND l_returnflag = 'R' 
- *        AND c_nationkey = n_nationkey 
- * GROUP BY 
- *        c_custkey, 
- *        c_name, 
- *        c_acctbal, 
- *        n_name, 
- *        c_address
- * </pre></code>
- *        
- * <p>
- * Compared to the original TPC-H query this version does not print 
- * c_phone and c_comment, only filters by years greater than 1990 instead of
- * a period of 3 months, and does not sort the result by revenue.
- * 
- * <p>
- * Input files are plain text CSV files using the pipe character ('|') as field separator 
- * as generated by the TPC-H data generator which is available at <a href="http://www.tpc.org/tpch/">http://www.tpc.org/tpch/</a>.
- * 
- * <p>
- * Usage: <code>TPCHQuery10 &lt;customer-csv path&gt; &lt;orders-csv path&gt; &lt;lineitem-csv path&gt; &lt;nation-csv path&gt; &lt;result path&gt;</code><br>
- *  
- * <p>
- * This example shows how to use:
- * <ul>
- * <li> inline-defined functions using Java 8 Lambda Expressions
- * </ul>
- */
-public class TPCHQuery10 {
-	
-	// *************************************************************************
-	//     PROGRAM
-	// *************************************************************************
-	
-	public static void main(String[] args) throws Exception {
-		
-		if(!parseParameters(args)) {
-			return;
-		}
-		
-		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
-		// get customer data set: (custkey, name, address, nationkey, acctbal) 
-		DataSet<Tuple5<Integer, String, String, Integer, Double>> customers = getCustomerDataSet(env);
-
-		// get orders data set: (orderkey, custkey, orderdate)
-		DataSet<Tuple3<Integer, Integer, String>> orders = getOrdersDataSet(env);
-
-		// get lineitem data set: (orderkey, extendedprice, discount, returnflag)
-		DataSet<Tuple4<Integer, Double, Double, String>> lineitems = getLineitemDataSet(env);
-
-		// get nation data set: (nationkey, name)
-		DataSet<Tuple2<Integer, String>> nations = getNationsDataSet(env);
-
-		// orders filtered by year: (orderkey, custkey)
-		DataSet<Tuple2<Integer, Integer>> ordersFilteredByYear =
-				// filter by year
-				orders.filter(order -> Integer.parseInt(order.f2.substring(0, 4)) > 1990)
-				// project fields out that are no longer required
-				.project(0,1).types(Integer.class, Integer.class);
-
-		// lineitems filtered by flag: (orderkey, extendedprice, discount)
-		DataSet<Tuple3<Integer, Double, Double>> lineitemsFilteredByFlag = 
-				// filter by flag
-				lineitems.filter(lineitem -> lineitem.f3.equals("R"))
-				// project fields out that are no longer required
-				.project(0,1,2).types(Integer.class, Double.class, Double.class);
-
-		// join orders with lineitems: (custkey, extendedprice, discount)
-		DataSet<Tuple3<Integer, Double, Double>> lineitemsOfCustomerKey = 
-				ordersFilteredByYear.joinWithHuge(lineitemsFilteredByFlag)
-									.where(0).equalTo(0)
-									.projectFirst(1).projectSecond(1,2)
-									.types(Integer.class, Double.class, Double.class);
-
-		// aggregate for revenue: (custkey, revenue)
-		DataSet<Tuple2<Integer, Double>> revenueOfCustomerKey = lineitemsOfCustomerKey
-				// calculate the revenue for each item
-				// revenue per item = l_extendedprice * (1 - l_discount)
-				.map(i -> new Tuple2<>(i.f0, i.f1 * (1 - i.f2)))
-				// aggregate the revenues per item to revenue per customer
-				.groupBy(0).sum(1);
-
-		// join customer with nation (custkey, name, address, nationname, acctbal)
-		DataSet<Tuple5<Integer, String, String, String, Double>> customerWithNation = customers
-						.joinWithTiny(nations)
-						.where(3).equalTo(0)
-						.projectFirst(0,1,2).projectSecond(1).projectFirst(4)
-						.types(Integer.class, String.class, String.class, String.class, Double.class);
-
-		// join customer (with nation) with revenue (custkey, name, address, nationname, acctbal, revenue)
-		DataSet<Tuple6<Integer, String, String, String, Double, Double>> customerWithRevenue = 
-				customerWithNation.join(revenueOfCustomerKey)
-				.where(0).equalTo(0)
-				.projectFirst(0,1,2,3,4).projectSecond(1)
-				.types(Integer.class, String.class, String.class, String.class, Double.class, Double.class);
-
-		// emit result
-		customerWithRevenue.writeAsCsv(outputPath);
-		
-		// execute program
-		env.execute("TPCH Query 10 Example");
-		
-	}
-	
-	// *************************************************************************
-	//     UTIL METHODS
-	// *************************************************************************
-	
-	private static String customerPath;
-	private static String ordersPath;
-	private static String lineitemPath;
-	private static String nationPath;
-	private static String outputPath;
-	
-	private static boolean parseParameters(String[] programArguments) {
-		
-		if(programArguments.length > 0) {
-			if(programArguments.length == 5) {
-				customerPath = programArguments[0];
-				ordersPath = programArguments[1];
-				lineitemPath = programArguments[2];
-				nationPath = programArguments[3];
-				outputPath = programArguments[4];
-			} else {
-				System.err.println("Usage: TPCHQuery10 <customer-csv path> <orders-csv path> <lineitem-csv path> <nation-csv path> <result path>");
-				return false;
-			}
-		} else {
-			System.err.println("This program expects data from the TPC-H benchmark as input data.\n" +
-								"  Due to legal restrictions, we can not ship generated data.\n" +
-								"  You can find the TPC-H data generator at http://www.tpc.org/tpch/.\n" + 
-								"  Usage: TPCHQuery10 <customer-csv path> <orders-csv path> <lineitem-csv path> <nation-csv path> <result path>");
-			return false;
-		}
-		return true;
-	}
-	
-	private static DataSet<Tuple5<Integer, String, String, Integer, Double>> getCustomerDataSet(ExecutionEnvironment env) {
-		return env.readCsvFile(customerPath)
-					.fieldDelimiter('|')
-					.includeFields("11110100")
-					.types(Integer.class, String.class, String.class, Integer.class, Double.class);
-	}
-	
-	private static DataSet<Tuple3<Integer, Integer, String>> getOrdersDataSet(ExecutionEnvironment env) {
-		return env.readCsvFile(ordersPath)
-					.fieldDelimiter('|')
-					.includeFields("110010000")
-					.types(Integer.class, Integer.class, String.class);
-	}
-
-	private static DataSet<Tuple4<Integer, Double, Double, String>> getLineitemDataSet(ExecutionEnvironment env) {
-		return env.readCsvFile(lineitemPath)
-					.fieldDelimiter('|')
-					.includeFields("1000011010000000")
-					.types(Integer.class, Double.class, Double.class, String.class);
-	}
-	
-	private static DataSet<Tuple2<Integer, String>> getNationsDataSet(ExecutionEnvironment env) {
-		return env.readCsvFile(nationPath)
-					.fieldDelimiter('|')
-					.includeFields("1100")
-					.types(Integer.class, String.class);
-	}
-			
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/26820ea7/flink-java8/src/main/java/org/apache/flink/example/java8/wordcount/WordCount.java
----------------------------------------------------------------------
diff --git a/flink-java8/src/main/java/org/apache/flink/example/java8/wordcount/WordCount.java b/flink-java8/src/main/java/org/apache/flink/example/java8/wordcount/WordCount.java
deleted file mode 100644
index 793962f..0000000
--- a/flink-java8/src/main/java/org/apache/flink/example/java8/wordcount/WordCount.java
+++ /dev/null
@@ -1,127 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.example.java8.wordcount;
-
-import java.util.Arrays;
-
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.example.java8.wordcount.util.WordCountData;
-import org.apache.flink.util.Collector;
-
-/**
- * Implements the "WordCount" program that computes a simple word occurrence histogram
- * over text files. 
- * 
- * <p>
- * The input is a plain text file with lines separated by newline characters.
- * 
- * <p>
- * Usage: <code>WordCount &lt;text path&gt; &lt;result path&gt;</code><br>
- * If no parameters are provided, the program is run with default data from {@link WordCountData}.
- * 
- * <p>
- * This example shows how to:
- * <ul>
- * <li>write a compact Flink program with Java 8 Lambda Expressions.
- * </ul>
- * 
- */
-public class WordCount {
-	
-	// *************************************************************************
-	//     PROGRAM
-	// *************************************************************************
-	
-	public static void main(String[] args) throws Exception {
-		
-		if(!parseParameters(args)) {
-			return;
-		}
-		
-		// set up the execution environment
-		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		
-		// get input data
-		DataSet<String> text = getTextDataSet(env);
-		
-		DataSet<Tuple2<String, Integer>> counts = 
-				// normalize and split each line
-				text.map(line -> line.toLowerCase().split("\\W+"))
-				// convert splitted line in pairs (2-tuples) containing: (word,1)
-				.flatMap((String[] tokens, Collector<Tuple2<String, Integer>> out) -> {
-					// emit the pairs with non-zero-length words
-					Arrays.stream(tokens)
-					.filter(t -> t.length() > 0)
-					.forEach(t -> out.collect(new Tuple2<>(t, 1)));
-				})
-				// group by the tuple field "0" and sum up tuple field "1"
-				.groupBy(0)
-				.sum(1);
-
-		// emit result
-		if(fileOutput) {
-			counts.writeAsCsv(outputPath, "\n", " ");
-		} else {
-			counts.print();
-		}
-		
-		// execute program
-		env.execute("WordCount Example");
-	}
-	
-	// *************************************************************************
-	//     UTIL METHODS
-	// *************************************************************************
-	
-	private static boolean fileOutput = false;
-	private static String textPath;
-	private static String outputPath;
-	
-	private static boolean parseParameters(String[] args) {
-		
-		if(args.length > 0) {
-			// parse input arguments
-			fileOutput = true;
-			if(args.length == 2) {
-				textPath = args[0];
-				outputPath = args[1];
-			} else {
-				System.err.println("Usage: WordCount <text path> <result path>");
-				return false;
-			}
-		} else {
-			System.out.println("Executing WordCount example with built-in default data.");
-			System.out.println("  Provide parameters to read input data from a file.");
-			System.out.println("  Usage: WordCount <text path> <result path>");
-		}
-		return true;
-	}
-	
-	private static DataSet<String> getTextDataSet(ExecutionEnvironment env) {
-		if(fileOutput) {
-			// read the text file from given input path
-			return env.readTextFile(textPath);
-		} else {
-			// get default test text data
-			return WordCountData.getDefaultTextLineDataSet(env);
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/26820ea7/flink-java8/src/main/java/org/apache/flink/example/java8/wordcount/util/WordCountData.java
----------------------------------------------------------------------
diff --git a/flink-java8/src/main/java/org/apache/flink/example/java8/wordcount/util/WordCountData.java b/flink-java8/src/main/java/org/apache/flink/example/java8/wordcount/util/WordCountData.java
deleted file mode 100644
index 9933696..0000000
--- a/flink-java8/src/main/java/org/apache/flink/example/java8/wordcount/util/WordCountData.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.example.java8.wordcount.util;
-
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-
-/**
- * Provides the default data sets used for the WordCount example program.
- * The default data sets are used, if no parameters are given to the program.
- *
- */
-public class WordCountData {
-
-	public static DataSet<String> getDefaultTextLineDataSet(ExecutionEnvironment env) {
-		
-		return env.fromElements(
-				"To be, or not to be,--that is the question:--",
-				"Whether 'tis nobler in the mind to suffer",
-				"The slings and arrows of outrageous fortune",
-				"Or to take arms against a sea of troubles,",
-				"And by opposing end them?--To die,--to sleep,--",
-				"No more; and by a sleep to say we end",
-				"The heartache, and the thousand natural shocks",
-				"That flesh is heir to,--'tis a consummation",
-				"Devoutly to be wish'd. To die,--to sleep;--",
-				"To sleep! perchance to dream:--ay, there's the rub;",
-				"For in that sleep of death what dreams may come,",
-				"When we have shuffled off this mortal coil,",
-				"Must give us pause: there's the respect",
-				"That makes calamity of so long life;",
-				"For who would bear the whips and scorns of time,",
-				"The oppressor's wrong, the proud man's contumely,",
-				"The pangs of despis'd love, the law's delay,",
-				"The insolence of office, and the spurns",
-				"That patient merit of the unworthy takes,",
-				"When he himself might his quietus make",
-				"With a bare bodkin? who would these fardels bear,",
-				"To grunt and sweat under a weary life,",
-				"But that the dread of something after death,--",
-				"The undiscover'd country, from whose bourn",
-				"No traveller returns,--puzzles the will,",
-				"And makes us rather bear those ills we have",
-				"Than fly to others that we know not of?",
-				"Thus conscience does make cowards of us all;",
-				"And thus the native hue of resolution",
-				"Is sicklied o'er with the pale cast of thought;",
-				"And enterprises of great pith and moment,",
-				"With this regard, their currents turn awry,",
-				"And lose the name of action.--Soft you now!",
-				"The fair Ophelia!--Nymph, in thy orisons",
-				"Be all my sins remember'd."
-				);
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/26820ea7/flink-java8/src/main/java/org/apache/flink/examples/java8/relational/TPCHQuery10.java
----------------------------------------------------------------------
diff --git a/flink-java8/src/main/java/org/apache/flink/examples/java8/relational/TPCHQuery10.java b/flink-java8/src/main/java/org/apache/flink/examples/java8/relational/TPCHQuery10.java
new file mode 100644
index 0000000..4fd8b39
--- /dev/null
+++ b/flink-java8/src/main/java/org/apache/flink/examples/java8/relational/TPCHQuery10.java
@@ -0,0 +1,221 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.examples.java8.relational;
+
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.api.java.tuple.Tuple4;
+import org.apache.flink.api.java.tuple.Tuple5;
+import org.apache.flink.api.java.tuple.Tuple6;
+
+/**
+ * This program implements a modified version of the TPC-H query 10.
+ * The original query can be found at
+ * <a href="http://www.tpc.org/tpch/spec/tpch2.16.0.pdf">http://www.tpc.org/tpch/spec/tpch2.16.0.pdf</a> (page 45).
+ * 
+ * <p>
+ * This program implements the following SQL equivalent:
+ * 
+ * <p>
+ * <code><pre>
+ * SELECT 
+ *        c_custkey,
+ *        c_name, 
+ *        c_address,
+ *        n_name, 
+ *        c_acctbal
+ *        SUM(l_extendedprice * (1 - l_discount)) AS revenue,  
+ * FROM   
+ *        customer, 
+ *        orders, 
+ *        lineitem, 
+ *        nation 
+ * WHERE 
+ *        c_custkey = o_custkey 
+ *        AND l_orderkey = o_orderkey 
+ *        AND YEAR(o_orderdate) > '1990' 
+ *        AND l_returnflag = 'R' 
+ *        AND c_nationkey = n_nationkey 
+ * GROUP BY 
+ *        c_custkey, 
+ *        c_name, 
+ *        c_acctbal, 
+ *        n_name, 
+ *        c_address
+ * </pre></code>
+ *        
+ * <p>
+ * Compared to the original TPC-H query this version does not print 
+ * c_phone and c_comment, only filters by years greater than 1990 instead of
+ * a period of 3 months, and does not sort the result by revenue.
+ * 
+ * <p>
+ * Input files are plain text CSV files using the pipe character ('|') as field separator 
+ * as generated by the TPC-H data generator which is available at <a href="http://www.tpc.org/tpch/">http://www.tpc.org/tpch/</a>.
+ * 
+ * <p>
+ * Usage: <code>TPCHQuery10 &lt;customer-csv path&gt; &lt;orders-csv path&gt; &lt;lineitem-csv path&gt; &lt;nation-csv path&gt; &lt;result path&gt;</code><br>
+ *  
+ * <p>
+ * This example shows how to use:
+ * <ul>
+ * <li> inline-defined functions using Java 8 Lambda Expressions
+ * </ul>
+ */
+public class TPCHQuery10 {
+	
+	// *************************************************************************
+	//     PROGRAM
+	// *************************************************************************
+	
+	public static void main(String[] args) throws Exception {
+		
+		if(!parseParameters(args)) {
+			return;
+		}
+		
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+		// get customer data set: (custkey, name, address, nationkey, acctbal) 
+		DataSet<Tuple5<Integer, String, String, Integer, Double>> customers = getCustomerDataSet(env);
+
+		// get orders data set: (orderkey, custkey, orderdate)
+		DataSet<Tuple3<Integer, Integer, String>> orders = getOrdersDataSet(env);
+
+		// get lineitem data set: (orderkey, extendedprice, discount, returnflag)
+		DataSet<Tuple4<Integer, Double, Double, String>> lineitems = getLineitemDataSet(env);
+
+		// get nation data set: (nationkey, name)
+		DataSet<Tuple2<Integer, String>> nations = getNationsDataSet(env);
+
+		// orders filtered by year: (orderkey, custkey)
+		DataSet<Tuple2<Integer, Integer>> ordersFilteredByYear =
+				// filter by year
+				orders.filter(order -> Integer.parseInt(order.f2.substring(0, 4)) > 1990)
+				// project fields out that are no longer required
+				.project(0,1).types(Integer.class, Integer.class);
+
+		// lineitems filtered by flag: (orderkey, extendedprice, discount)
+		DataSet<Tuple3<Integer, Double, Double>> lineitemsFilteredByFlag = 
+				// filter by flag
+				lineitems.filter(lineitem -> lineitem.f3.equals("R"))
+				// project fields out that are no longer required
+				.project(0,1,2).types(Integer.class, Double.class, Double.class);
+
+		// join orders with lineitems: (custkey, extendedprice, discount)
+		DataSet<Tuple3<Integer, Double, Double>> lineitemsOfCustomerKey = 
+				ordersFilteredByYear.joinWithHuge(lineitemsFilteredByFlag)
+									.where(0).equalTo(0)
+									.projectFirst(1).projectSecond(1,2)
+									.types(Integer.class, Double.class, Double.class);
+
+		// aggregate for revenue: (custkey, revenue)
+		DataSet<Tuple2<Integer, Double>> revenueOfCustomerKey = lineitemsOfCustomerKey
+				// calculate the revenue for each item
+				// revenue per item = l_extendedprice * (1 - l_discount)
+				.map(i -> new Tuple2<>(i.f0, i.f1 * (1 - i.f2)))
+				// aggregate the revenues per item to revenue per customer
+				.groupBy(0).sum(1);
+
+		// join customer with nation (custkey, name, address, nationname, acctbal)
+		DataSet<Tuple5<Integer, String, String, String, Double>> customerWithNation = customers
+						.joinWithTiny(nations)
+						.where(3).equalTo(0)
+						.projectFirst(0,1,2).projectSecond(1).projectFirst(4)
+						.types(Integer.class, String.class, String.class, String.class, Double.class);
+
+		// join customer (with nation) with revenue (custkey, name, address, nationname, acctbal, revenue)
+		DataSet<Tuple6<Integer, String, String, String, Double, Double>> customerWithRevenue = 
+				customerWithNation.join(revenueOfCustomerKey)
+				.where(0).equalTo(0)
+				.projectFirst(0,1,2,3,4).projectSecond(1)
+				.types(Integer.class, String.class, String.class, String.class, Double.class, Double.class);
+
+		// emit result
+		customerWithRevenue.writeAsCsv(outputPath);
+		
+		// execute program
+		env.execute("TPCH Query 10 Example");
+		
+	}
+	
+	// *************************************************************************
+	//     UTIL METHODS
+	// *************************************************************************
+	
+	private static String customerPath;
+	private static String ordersPath;
+	private static String lineitemPath;
+	private static String nationPath;
+	private static String outputPath;
+	
+	private static boolean parseParameters(String[] programArguments) {
+		
+		if(programArguments.length > 0) {
+			if(programArguments.length == 5) {
+				customerPath = programArguments[0];
+				ordersPath = programArguments[1];
+				lineitemPath = programArguments[2];
+				nationPath = programArguments[3];
+				outputPath = programArguments[4];
+			} else {
+				System.err.println("Usage: TPCHQuery10 <customer-csv path> <orders-csv path> <lineitem-csv path> <nation-csv path> <result path>");
+				return false;
+			}
+		} else {
+			System.err.println("This program expects data from the TPC-H benchmark as input data.\n" +
+								"  Due to legal restrictions, we can not ship generated data.\n" +
+								"  You can find the TPC-H data generator at http://www.tpc.org/tpch/.\n" + 
+								"  Usage: TPCHQuery10 <customer-csv path> <orders-csv path> <lineitem-csv path> <nation-csv path> <result path>");
+			return false;
+		}
+		return true;
+	}
+	
+	private static DataSet<Tuple5<Integer, String, String, Integer, Double>> getCustomerDataSet(ExecutionEnvironment env) {
+		return env.readCsvFile(customerPath)
+					.fieldDelimiter('|')
+					.includeFields("11110100")
+					.types(Integer.class, String.class, String.class, Integer.class, Double.class);
+	}
+	
+	private static DataSet<Tuple3<Integer, Integer, String>> getOrdersDataSet(ExecutionEnvironment env) {
+		return env.readCsvFile(ordersPath)
+					.fieldDelimiter('|')
+					.includeFields("110010000")
+					.types(Integer.class, Integer.class, String.class);
+	}
+
+	private static DataSet<Tuple4<Integer, Double, Double, String>> getLineitemDataSet(ExecutionEnvironment env) {
+		return env.readCsvFile(lineitemPath)
+					.fieldDelimiter('|')
+					.includeFields("1000011010000000")
+					.types(Integer.class, Double.class, Double.class, String.class);
+	}
+	
+	private static DataSet<Tuple2<Integer, String>> getNationsDataSet(ExecutionEnvironment env) {
+		return env.readCsvFile(nationPath)
+					.fieldDelimiter('|')
+					.includeFields("1100")
+					.types(Integer.class, String.class);
+	}
+			
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/26820ea7/flink-java8/src/main/java/org/apache/flink/examples/java8/wordcount/WordCount.java
----------------------------------------------------------------------
diff --git a/flink-java8/src/main/java/org/apache/flink/examples/java8/wordcount/WordCount.java b/flink-java8/src/main/java/org/apache/flink/examples/java8/wordcount/WordCount.java
new file mode 100644
index 0000000..aead125
--- /dev/null
+++ b/flink-java8/src/main/java/org/apache/flink/examples/java8/wordcount/WordCount.java
@@ -0,0 +1,127 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.examples.java8.wordcount;
+
+import java.util.Arrays;
+
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.examples.java.wordcount.util.WordCountData;
+import org.apache.flink.util.Collector;
+
+/**
+ * Implements the "WordCount" program that computes a simple word occurrence histogram
+ * over text files. 
+ * 
+ * <p>
+ * The input is a plain text file with lines separated by newline characters.
+ * 
+ * <p>
+ * Usage: <code>WordCount &lt;text path&gt; &lt;result path&gt;</code><br>
+ * If no parameters are provided, the program is run with default data from {@link WordCountData}.
+ * 
+ * <p>
+ * This example shows how to:
+ * <ul>
+ * <li>write a compact Flink program with Java 8 Lambda Expressions.
+ * </ul>
+ * 
+ */
+public class WordCount {
+	
+	// *************************************************************************
+	//     PROGRAM
+	// *************************************************************************
+	
+	// Entry point: parses arguments, builds the word-count dataflow, and runs it.
+	public static void main(String[] args) throws Exception {
+		
+		if(!parseParameters(args)) {
+			return;
+		}
+		
+		// set up the execution environment
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		
+		// get input data: the given text file, or the bundled default data set
+		DataSet<String> text = getTextDataSet(env);
+		
+		DataSet<Tuple2<String, Integer>> counts = 
+				// normalize and split each line into lower-case word tokens
+				text.map(line -> line.toLowerCase().split("\\W+"))
+				// convert each split line into pairs (2-tuples) containing: (word,1)
+				.flatMap((String[] tokens, Collector<Tuple2<String, Integer>> out) -> {
+					// emit the pairs with non-zero-length words
+					Arrays.stream(tokens)
+					.filter(t -> t.length() > 0)
+					.forEach(t -> out.collect(new Tuple2<>(t, 1)));
+				})
+				// group by the tuple field "0" and sum up tuple field "1"
+				.groupBy(0)
+				.sum(1);
+
+		// emit result: as CSV when an output path was given, otherwise to stdout
+		if(fileOutput) {
+			counts.writeAsCsv(outputPath, "\n", " ");
+		} else {
+			counts.print();
+		}
+		
+		// execute program
+		env.execute("WordCount Example");
+	}
+	
+	// *************************************************************************
+	//     UTIL METHODS
+	// *************************************************************************
+	
+	// true iff input/output paths were supplied on the command line
+	private static boolean fileOutput = false;
+	// input text file path (only set when fileOutput is true)
+	private static String textPath;
+	// result file path (only set when fileOutput is true)
+	private static String outputPath;
+	
+	/**
+	 * Parses the command line arguments: either no arguments (run on the
+	 * built-in default data) or exactly two (input text path, result path).
+	 *
+	 * @param args the command line arguments
+	 * @return true if the program should proceed, false on invalid usage
+	 */
+	private static boolean parseParameters(String[] args) {
+		
+		if(args.length > 0) {
+			// parse input arguments
+			fileOutput = true;
+			if(args.length == 2) {
+				textPath = args[0];
+				outputPath = args[1];
+			} else {
+				System.err.println("Usage: WordCount <text path> <result path>");
+				return false;
+			}
+		} else {
+			System.out.println("Executing WordCount example with built-in default data.");
+			System.out.println("  Provide parameters to read input data from a file.");
+			System.out.println("  Usage: WordCount <text path> <result path>");
+		}
+		return true;
+	}
+	
+	// Returns the input data set: the user-supplied text file when paths were
+	// given, otherwise the default lines shipped with the flink-java examples.
+	private static DataSet<String> getTextDataSet(ExecutionEnvironment env) {
+		if(fileOutput) {
+			// read the text file from given input path
+			return env.readTextFile(textPath);
+		} else {
+			// get default test text data
+			return WordCountData.getDefaultTextLineDataSet(env);
+		}
+	}
+}


[4/5] incubator-flink git commit: [FLINK-1161] [streaming] Streaming API type handling rework to support java 8 lambdas

Posted by mb...@apache.org.
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/51c1f677/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedDataStream.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedDataStream.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedDataStream.java
index 43b3993..b0ab99f 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedDataStream.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedDataStream.java
@@ -23,9 +23,11 @@ import java.util.List;
 
 import org.apache.flink.api.common.functions.GroupReduceFunction;
 import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.functions.RichGroupReduceFunction;
 import org.apache.flink.api.common.functions.RichReduceFunction;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
 import org.apache.flink.streaming.api.function.aggregation.AggregationFunction;
 import org.apache.flink.streaming.api.function.aggregation.AggregationFunction.AggregationType;
 import org.apache.flink.streaming.api.function.aggregation.ComparableAggregator;
@@ -35,13 +37,12 @@ import org.apache.flink.streaming.api.invokable.operator.GroupedWindowInvokable;
 import org.apache.flink.streaming.api.invokable.operator.WindowGroupReduceInvokable;
 import org.apache.flink.streaming.api.invokable.operator.WindowReduceInvokable;
 import org.apache.flink.streaming.api.windowing.helper.WindowingHelper;
-import org.apache.flink.streaming.api.windowing.policy.TimeTriggerPolicy;
 import org.apache.flink.streaming.api.windowing.policy.CloneableEvictionPolicy;
 import org.apache.flink.streaming.api.windowing.policy.CloneableTriggerPolicy;
 import org.apache.flink.streaming.api.windowing.policy.EvictionPolicy;
+import org.apache.flink.streaming.api.windowing.policy.TimeTriggerPolicy;
 import org.apache.flink.streaming.api.windowing.policy.TriggerPolicy;
 import org.apache.flink.streaming.api.windowing.policy.TumblingEvictionPolicy;
-import org.apache.flink.streaming.util.serialization.FunctionTypeWrapper;
 
 /**
  * A {@link WindowedDataStream} represents a data stream that has been divided
@@ -225,8 +226,7 @@ public class WindowedDataStream<OUT> {
 	 * @return The transformed DataStream
 	 */
 	public SingleOutputStreamOperator<OUT, ?> reduce(ReduceFunction<OUT> reduceFunction) {
-		return dataStream.addFunction("NextGenWindowReduce", reduceFunction,
-				dataStream.outTypeWrapper, dataStream.outTypeWrapper,
+		return dataStream.addFunction("NextGenWindowReduce", reduceFunction, getType(), getType(),
 				getReduceInvokable(reduceFunction));
 	}
 
@@ -245,9 +245,13 @@ public class WindowedDataStream<OUT> {
 	 */
 	public <R> SingleOutputStreamOperator<R, ?> reduceGroup(
 			GroupReduceFunction<OUT, R> reduceFunction) {
-		return dataStream.addFunction("NextGenWindowReduce", reduceFunction,
-				dataStream.outTypeWrapper, new FunctionTypeWrapper<R>(reduceFunction,
-						GroupReduceFunction.class, 1), getReduceGroupInvokable(reduceFunction));
+
+		TypeInformation<OUT> inType = getType();
+		TypeInformation<R> outType = TypeExtractor
+				.getGroupReduceReturnTypes(reduceFunction, inType);
+
+		return dataStream.addFunction("NextGenWindowReduce", reduceFunction, inType, outType,
+				getReduceGroupInvokable(reduceFunction));
 	}
 
 	/**
@@ -261,7 +265,7 @@ public class WindowedDataStream<OUT> {
 	public SingleOutputStreamOperator<OUT, ?> sum(int positionToSum) {
 		dataStream.checkFieldRange(positionToSum);
 		return aggregate((AggregationFunction<OUT>) SumAggregator.getSumFunction(positionToSum,
-				dataStream.getClassAtPos(positionToSum), dataStream.getOutputType()));
+				dataStream.getClassAtPos(positionToSum), getType()));
 	}
 
 	/**
@@ -276,8 +280,7 @@ public class WindowedDataStream<OUT> {
 	 * @return The transformed DataStream.
 	 */
 	public SingleOutputStreamOperator<OUT, ?> sum(String field) {
-		return aggregate((AggregationFunction<OUT>) SumAggregator.getSumFunction(field,
-				getOutputType()));
+		return aggregate((AggregationFunction<OUT>) SumAggregator.getSumFunction(field, getType()));
 	}
 
 	/**
@@ -290,7 +293,7 @@ public class WindowedDataStream<OUT> {
 	 */
 	public SingleOutputStreamOperator<OUT, ?> min(int positionToMin) {
 		dataStream.checkFieldRange(positionToMin);
-		return aggregate(ComparableAggregator.getAggregator(positionToMin, getOutputType(),
+		return aggregate(ComparableAggregator.getAggregator(positionToMin, getType(),
 				AggregationType.MIN));
 	}
 
@@ -307,8 +310,8 @@ public class WindowedDataStream<OUT> {
 	 * @return The transformed DataStream.
 	 */
 	public SingleOutputStreamOperator<OUT, ?> min(String field) {
-		return aggregate(ComparableAggregator.getAggregator(field, getOutputType(),
-				AggregationType.MIN, false));
+		return aggregate(ComparableAggregator.getAggregator(field, getType(), AggregationType.MIN,
+				false));
 	}
 
 	/**
@@ -339,7 +342,7 @@ public class WindowedDataStream<OUT> {
 	 */
 	public SingleOutputStreamOperator<OUT, ?> minBy(int positionToMinBy, boolean first) {
 		dataStream.checkFieldRange(positionToMinBy);
-		return aggregate(ComparableAggregator.getAggregator(positionToMinBy, getOutputType(),
+		return aggregate(ComparableAggregator.getAggregator(positionToMinBy, getType(),
 				AggregationType.MINBY, first));
 	}
 
@@ -359,7 +362,7 @@ public class WindowedDataStream<OUT> {
 	 * @return The transformed DataStream.
 	 */
 	public SingleOutputStreamOperator<OUT, ?> minBy(String field, boolean first) {
-		return aggregate(ComparableAggregator.getAggregator(field, getOutputType(),
+		return aggregate(ComparableAggregator.getAggregator(field, getType(),
 				AggregationType.MINBY, first));
 	}
 
@@ -373,7 +376,7 @@ public class WindowedDataStream<OUT> {
 	 */
 	public SingleOutputStreamOperator<OUT, ?> max(int positionToMax) {
 		dataStream.checkFieldRange(positionToMax);
-		return aggregate(ComparableAggregator.getAggregator(positionToMax, getOutputType(),
+		return aggregate(ComparableAggregator.getAggregator(positionToMax, getType(),
 				AggregationType.MAX));
 	}
 
@@ -390,8 +393,8 @@ public class WindowedDataStream<OUT> {
 	 * @return The transformed DataStream.
 	 */
 	public SingleOutputStreamOperator<OUT, ?> max(String field) {
-		return aggregate(ComparableAggregator.getAggregator(field, getOutputType(),
-				AggregationType.MAX, false));
+		return aggregate(ComparableAggregator.getAggregator(field, getType(), AggregationType.MAX,
+				false));
 	}
 
 	/**
@@ -422,7 +425,7 @@ public class WindowedDataStream<OUT> {
 	 */
 	public SingleOutputStreamOperator<OUT, ?> maxBy(int positionToMaxBy, boolean first) {
 		dataStream.checkFieldRange(positionToMaxBy);
-		return aggregate(ComparableAggregator.getAggregator(positionToMaxBy, getOutputType(),
+		return aggregate(ComparableAggregator.getAggregator(positionToMaxBy, getType(),
 				AggregationType.MAXBY, first));
 	}
 
@@ -442,7 +445,7 @@ public class WindowedDataStream<OUT> {
 	 * @return The transformed DataStream.
 	 */
 	public SingleOutputStreamOperator<OUT, ?> maxBy(String field, boolean first) {
-		return aggregate(ComparableAggregator.getAggregator(field, getOutputType(),
+		return aggregate(ComparableAggregator.getAggregator(field, getType(),
 				AggregationType.MAXBY, first));
 	}
 
@@ -450,7 +453,7 @@ public class WindowedDataStream<OUT> {
 		StreamInvokable<OUT, OUT> invokable = getReduceInvokable(aggregator);
 
 		SingleOutputStreamOperator<OUT, ?> returnStream = dataStream.addFunction("windowReduce",
-				aggregator, dataStream.outTypeWrapper, dataStream.outTypeWrapper, invokable);
+				aggregator, getType(), getType(), invokable);
 
 		return returnStream;
 	}
@@ -576,8 +579,8 @@ public class WindowedDataStream<OUT> {
 	 * 
 	 * @return The output type.
 	 */
-	public TypeInformation<OUT> getOutputType() {
-		return dataStream.getOutputType();
+	public TypeInformation<OUT> getType() {
+		return dataStream.getType();
 	}
 
 	protected WindowedDataStream<OUT> copy() {

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/51c1f677/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
index 4f1efd1..5c47592 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
@@ -24,22 +24,21 @@ import java.util.List;
 
 import org.apache.commons.lang3.SerializationException;
 import org.apache.commons.lang3.SerializationUtils;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
 import org.apache.flink.client.program.Client;
 import org.apache.flink.client.program.ContextEnvironment;
 import org.apache.flink.streaming.api.JobGraphBuilder;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.DataStreamSource;
-import org.apache.flink.streaming.api.function.source.SocketTextStreamFunction;
 import org.apache.flink.streaming.api.function.source.FileSourceFunction;
 import org.apache.flink.streaming.api.function.source.FileStreamFunction;
 import org.apache.flink.streaming.api.function.source.FromElementsFunction;
 import org.apache.flink.streaming.api.function.source.GenSequenceFunction;
+import org.apache.flink.streaming.api.function.source.SocketTextStreamFunction;
 import org.apache.flink.streaming.api.function.source.SourceFunction;
 import org.apache.flink.streaming.api.invokable.SourceInvokable;
-import org.apache.flink.streaming.util.serialization.FunctionTypeWrapper;
-import org.apache.flink.streaming.util.serialization.ObjectTypeWrapper;
-import org.apache.flink.streaming.util.serialization.TypeWrapper;
 
 /**
  * {@link ExecutionEnvironment} for streaming jobs. An instance of it is
@@ -131,12 +130,14 @@ public abstract class StreamExecutionEnvironment {
 	public long getBufferTimeout() {
 		return this.bufferTimeout;
 	}
-	
+
 	/**
-	 * Sets the default parallelism that will be used for the local execution environment created by
-	 * {@link #createLocalEnvironment()}.
+	 * Sets the default parallelism that will be used for the local execution
+	 * environment created by {@link #createLocalEnvironment()}.
 	 * 
-	 * @param degreeOfParallelism The degree of parallelism to use as the default local parallelism.
+	 * @param degreeOfParallelism
+	 *            The degree of parallelism to use as the default local
+	 *            parallelism.
 	 */
 	public static void setDefaultLocalParallelism(int degreeOfParallelism) {
 		defaultLocalDop = degreeOfParallelism;
@@ -210,14 +211,14 @@ public abstract class StreamExecutionEnvironment {
 					"fromElements needs at least one element as argument");
 		}
 
-		TypeWrapper<OUT> outTypeWrapper = new ObjectTypeWrapper<OUT>(data[0]);
+		TypeInformation<OUT> outTypeInfo = TypeExtractor.getForObject(data[0]);
 		DataStreamSource<OUT> returnStream = new DataStreamSource<OUT>(this, "elements",
-				outTypeWrapper);
+				outTypeInfo);
 
 		try {
 			SourceFunction<OUT> function = new FromElementsFunction<OUT>(data);
 			jobGraphBuilder.addStreamVertex(returnStream.getId(),
-					new SourceInvokable<OUT>(function), null, outTypeWrapper, "source",
+					new SourceInvokable<OUT>(function), null, outTypeInfo, "source",
 					SerializationUtils.serialize(function), 1);
 		} catch (SerializationException e) {
 			throw new RuntimeException("Cannot serialize elements");
@@ -246,16 +247,16 @@ public abstract class StreamExecutionEnvironment {
 			throw new IllegalArgumentException("Collection must not be empty");
 		}
 
-		TypeWrapper<OUT> outTypeWrapper = new ObjectTypeWrapper<OUT>(data.iterator().next());
-		DataStreamSource<OUT> returnStream = new DataStreamSource<OUT>(this, "elements",
-				outTypeWrapper);
+		TypeInformation<OUT> outTypeInfo = TypeExtractor.getForObject(data.iterator().next());
+		DataStreamSource<OUT> returnStream = new DataStreamSource<OUT>(this, "collection",
+				outTypeInfo);
 
 		try {
 			SourceFunction<OUT> function = new FromElementsFunction<OUT>(data);
 
 			jobGraphBuilder.addStreamVertex(returnStream.getId(), new SourceInvokable<OUT>(
-					new FromElementsFunction<OUT>(data)), null, new ObjectTypeWrapper<OUT>(data
-					.iterator().next()), "source", SerializationUtils.serialize(function), 1);
+					new FromElementsFunction<OUT>(data)), null, outTypeInfo, "source",
+					SerializationUtils.serialize(function), 1);
 		} catch (SerializationException e) {
 			throw new RuntimeException("Cannot serialize collection");
 		}
@@ -271,17 +272,16 @@ public abstract class StreamExecutionEnvironment {
 	 * @param hostname
 	 *            The host name which a server socket bind.
 	 * @param port
-	 * 			  The port number which a server socket bind. A port number of
-	 * 			  0 means that the port number is automatically allocated.
+	 *            The port number which a server socket bind. A port number of 0
+	 *            means that the port number is automatically allocated.
 	 * @param delimiter
-	 * 			  A character which split received strings into records.
+	 *            A character which split received strings into records.
 	 * @return A DataStream, containing the strings received from socket.
 	 */
 	public DataStreamSource<String> socketTextStream(String hostname, int port, char delimiter) {
 		return addSource(new SocketTextStreamFunction(hostname, port, delimiter));
 	}
-	
-	
+
 	/**
 	 * Creates a new DataStream that contains the strings received infinitely
 	 * from socket. Received strings are decoded by the system's default
@@ -290,8 +290,8 @@ public abstract class StreamExecutionEnvironment {
 	 * @param hostname
 	 *            The host name which a server socket bind.
 	 * @param port
-	 * 			  The port number which a server socket bind. A port number of
-	 * 			  0 means that the port number is automatically allocated.
+	 *            The port number which a server socket bind. A port number of 0
+	 *            means that the port number is automatically allocated.
 	 * @return A DataStream, containing the strings received from socket.
 	 */
 	public DataStreamSource<String> socketTextStream(String hostname, int port) {
@@ -324,14 +324,14 @@ public abstract class StreamExecutionEnvironment {
 	 * @return the data stream constructed
 	 */
 	public <OUT> DataStreamSource<OUT> addSource(SourceFunction<OUT> function) {
-		TypeWrapper<OUT> outTypeWrapper = new FunctionTypeWrapper<OUT>(function,
-				SourceFunction.class, 0);
-		DataStreamSource<OUT> returnStream = new DataStreamSource<OUT>(this, "source",
-				outTypeWrapper);
+		TypeInformation<OUT> outTypeInfo = TypeExtractor.createTypeInfo(SourceFunction.class,
+				function.getClass(), 0, null, null);
+
+		DataStreamSource<OUT> returnStream = new DataStreamSource<OUT>(this, "source", outTypeInfo);
 
 		try {
 			jobGraphBuilder.addStreamVertex(returnStream.getId(),
-					new SourceInvokable<OUT>(function), null, outTypeWrapper, "source",
+					new SourceInvokable<OUT>(function), null, outTypeInfo, "source",
 					SerializationUtils.serialize(function), 1);
 		} catch (SerializationException e) {
 			throw new RuntimeException("Cannot serialize SourceFunction");

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/51c1f677/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/ProjectInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/ProjectInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/ProjectInvokable.java
index f72f66e..4666a85 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/ProjectInvokable.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/ProjectInvokable.java
@@ -17,24 +17,25 @@
 
 package org.apache.flink.streaming.api.invokable.operator;
 
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.tuple.Tuple;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.invokable.StreamInvokable;
-import org.apache.flink.streaming.util.serialization.TypeWrapper;
 
 public class ProjectInvokable<IN, OUT extends Tuple> extends StreamInvokable<IN, OUT> {
 	private static final long serialVersionUID = 1L;
 
 	transient OUT outTuple;
-	TypeWrapper<OUT> outTypeWrapper;
+	TypeSerializer<OUT> outTypeSerializer;
 	int[] fields;
 	int numFields;
 
-	public ProjectInvokable(int[] fields, TypeWrapper<OUT> outTypeWrapper) {
+	public ProjectInvokable(int[] fields, TypeInformation<OUT> outTypeInformation) {
 		super(null);
 		this.fields = fields;
 		this.numFields = this.fields.length;
-		this.outTypeWrapper = outTypeWrapper;
+		this.outTypeSerializer = outTypeInformation.createSerializer();
 	}
 
 	@Override
@@ -60,6 +61,6 @@ public class ProjectInvokable<IN, OUT extends Tuple> extends StreamInvokable<IN,
 	@Override
 	public void open(Configuration config) throws Exception {
 		super.open(config);
-		outTuple = outTypeWrapper.getTypeInfo().createSerializer().createInstance();
+		outTuple = outTypeSerializer.createInstance();
 	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/51c1f677/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/CoStreamVertex.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/CoStreamVertex.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/CoStreamVertex.java
index 2464ff2..0058c66 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/CoStreamVertex.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/CoStreamVertex.java
@@ -19,7 +19,6 @@ package org.apache.flink.streaming.api.streamvertex;
 
 import java.util.ArrayList;
 
-import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.runtime.io.network.api.MutableRecordReader;
 import org.apache.flink.runtime.plugable.DeserializationDelegate;
 import org.apache.flink.streaming.api.invokable.operator.co.CoInvokable;
@@ -52,11 +51,8 @@ public class CoStreamVertex<IN1, IN2, OUT> extends StreamVertex<IN1, OUT> {
 	}
 
 	private void setDeserializers() {
-		TypeInformation<IN1> inputTypeInfo1 = configuration.getTypeInfoIn1(userClassLoader);
-		inputDeserializer1 = new StreamRecordSerializer<IN1>(inputTypeInfo1);
-
-		TypeInformation<IN2> inputTypeInfo2 = configuration.getTypeInfoIn2(userClassLoader);
-		inputDeserializer2 = new StreamRecordSerializer<IN2>(inputTypeInfo2);
+		inputDeserializer1 = configuration.getTypeSerializerIn1(userClassLoader);
+		inputDeserializer2 = configuration.getTypeSerializerIn2(userClassLoader);
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/51c1f677/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/InputHandler.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/InputHandler.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/InputHandler.java
index 9d65a21..090c1a6 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/InputHandler.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/InputHandler.java
@@ -17,7 +17,6 @@
 
 package org.apache.flink.streaming.api.streamvertex;
 
-import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.core.io.IOReadableWritable;
 import org.apache.flink.runtime.io.network.api.MutableReader;
@@ -52,7 +51,7 @@ public class InputHandler<IN> {
 
 	@SuppressWarnings("unchecked")
 	protected void setConfigInputs() throws StreamVertexException {
-		setDeserializer();
+		inputSerializer = configuration.getTypeSerializerIn1(streamVertex.userClassLoader);
 
 		int numberOfInputs = configuration.getNumberOfInputs();
 		if (numberOfInputs > 0) {
@@ -74,14 +73,6 @@ public class InputHandler<IN> {
 		}
 	}
 
-	private void setDeserializer() {
-		TypeInformation<IN> inTupleTypeInfo = configuration
-				.getTypeInfoIn1(streamVertex.userClassLoader);
-		if (inTupleTypeInfo != null) {
-			inputSerializer = new StreamRecordSerializer<IN>(inTupleTypeInfo);
-		}
-	}
-
 	private MutableObjectIterator<StreamRecord<IN>> createInputIterator() {
 		@SuppressWarnings({ "unchecked", "rawtypes" })
 		final MutableObjectIterator<StreamRecord<IN>> iter = new ReaderIterator(inputs,

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/51c1f677/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/OutputHandler.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/OutputHandler.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/OutputHandler.java
index cc67d6e..5381e22 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/OutputHandler.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/OutputHandler.java
@@ -98,9 +98,8 @@ public class OutputHandler<OUT> {
 	}
 
 	void setSerializers() {
-		outTypeInfo = configuration.getTypeInfoOut1(streamVertex.userClassLoader);
-		if (outTypeInfo != null) {
-			outSerializer = new StreamRecordSerializer<OUT>(outTypeInfo);
+		outSerializer = configuration.getTypeSerializerOut1(streamVertex.userClassLoader);
+		if (outSerializer != null) {
 			outSerializationDelegate = new SerializationDelegate<StreamRecord<OUT>>(outSerializer);
 			outSerializationDelegate.setInstance(outSerializer.createInstance());
 		}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/51c1f677/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/serialization/ClassTypeWrapper.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/serialization/ClassTypeWrapper.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/serialization/ClassTypeWrapper.java
deleted file mode 100644
index 2839535..0000000
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/serialization/ClassTypeWrapper.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.util.serialization;
-
-import java.io.IOException;
-
-import org.apache.flink.api.java.typeutils.TypeExtractor;
-
-public class ClassTypeWrapper<T> extends TypeWrapper<T> {
-	private static final long serialVersionUID = 1L;
-
-	private Class<T> clazz;
-
-	public ClassTypeWrapper(Class<T> clazz) {
-		this.clazz = clazz;
-		setTypeInfo();
-	}
-
-	private void readObject(java.io.ObjectInputStream in) throws IOException,
-			ClassNotFoundException {
-		in.defaultReadObject();
-		setTypeInfo();
-	}
-
-	@Override
-	protected void setTypeInfo() {
-		if (clazz != null) {
-			typeInfo = TypeExtractor.getForClass(clazz);
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/51c1f677/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/serialization/CombineTypeWrapper.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/serialization/CombineTypeWrapper.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/serialization/CombineTypeWrapper.java
deleted file mode 100644
index b4cd65d..0000000
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/serialization/CombineTypeWrapper.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.util.serialization;
-
-import java.io.IOException;
-
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.typeutils.TupleTypeInfo;
-
-public class CombineTypeWrapper<OUT1, OUT2> extends
-		TypeWrapper<Tuple2<OUT1, OUT2>> {
-
-	private static final long serialVersionUID = 1L;
-	// Info about OUT
-	private TypeWrapper<OUT1> outTypeWrapper1;
-	private TypeWrapper<OUT2> outTypeWrapper2;
-
-	public CombineTypeWrapper(TypeWrapper<OUT1> outTypeWrapper1,
-			TypeWrapper<OUT2> outTypeWrapper2) {
-		this.outTypeWrapper1 = outTypeWrapper1;
-		this.outTypeWrapper2 = outTypeWrapper2;
-	}
-
-	private void readObject(java.io.ObjectInputStream in) throws IOException,
-			ClassNotFoundException {
-		in.defaultReadObject();
-		setTypeInfo();
-	}
-
-	@Override
-	protected void setTypeInfo() {
-		typeInfo = new TupleTypeInfo<Tuple2<OUT1, OUT2>>(
-				outTypeWrapper1.getTypeInfo(), outTypeWrapper2.getTypeInfo());
-	}
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/51c1f677/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/serialization/FunctionTypeWrapper.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/serialization/FunctionTypeWrapper.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/serialization/FunctionTypeWrapper.java
deleted file mode 100644
index 4255912..0000000
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/serialization/FunctionTypeWrapper.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.util.serialization;
-
-import java.io.IOException;
-
-import org.apache.flink.api.common.functions.Function;
-import org.apache.flink.api.java.typeutils.TypeExtractor;
-
-public class FunctionTypeWrapper<T> extends TypeWrapper<T> {
-	private static final long serialVersionUID = 1L;
-
-	private Function function;
-	private Class<? extends Function> functionSuperClass;
-	private int typeParameterNumber;
-
-	public FunctionTypeWrapper(Function function, Class<? extends Function> functionSuperClass,
-			int typeParameterNumber) {
-		this.function = function;
-		this.functionSuperClass = functionSuperClass;
-		this.typeParameterNumber = typeParameterNumber;
-		setTypeInfo();
-	}
-
-	private void readObject(java.io.ObjectInputStream in) throws IOException,
-			ClassNotFoundException {
-		in.defaultReadObject();
-		setTypeInfo();
-	}
-
-	@Override
-	protected void setTypeInfo() {
-		if (typeParameterNumber != -1) {
-			typeInfo = TypeExtractor.createTypeInfo(functionSuperClass, function.getClass(),
-					typeParameterNumber, null, null);
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/51c1f677/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/serialization/ObjectTypeWrapper.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/serialization/ObjectTypeWrapper.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/serialization/ObjectTypeWrapper.java
deleted file mode 100644
index 6bf90c4..0000000
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/serialization/ObjectTypeWrapper.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.util.serialization;
-
-import java.io.IOException;
-
-import org.apache.flink.api.java.typeutils.TypeExtractor;
-
-public class ObjectTypeWrapper<T> extends
-		TypeWrapper<T> {
-	private static final long serialVersionUID = 1L;
-
-	private T instance;
-
-	public ObjectTypeWrapper(T instance) {
-		this.instance = instance;
-		setTypeInfo();
-	}
-
-	private void readObject(java.io.ObjectInputStream in) throws IOException,
-			ClassNotFoundException {
-		in.defaultReadObject();
-		setTypeInfo();
-	}
-
-	@Override
-	protected void setTypeInfo() {
-		if (instance != null) {
-			typeInfo = TypeExtractor.getForObject(instance);
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/51c1f677/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/serialization/ProjectTypeWrapper.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/serialization/ProjectTypeWrapper.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/serialization/ProjectTypeWrapper.java
deleted file mode 100644
index 9e8d4b4..0000000
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/serialization/ProjectTypeWrapper.java
+++ /dev/null
@@ -1,70 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.util.serialization;
-
-import java.io.IOException;
-
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.tuple.Tuple;
-import org.apache.flink.api.java.typeutils.TupleTypeInfo;
-
-public class ProjectTypeWrapper<IN,OUT extends Tuple> extends
-		TypeWrapper<OUT> {
-	private static final long serialVersionUID = 1L;
-
-
-	private TypeWrapper<IN> inType;
-	Class<?>[] givenTypes;
-	int[] fields;
-
-	public ProjectTypeWrapper(TypeWrapper<IN> inType,int[] fields,Class<?>[] givenTypes) {
-		this.inType = inType;
-		this.givenTypes = givenTypes;
-		this.fields = fields;
-		setTypeInfo();
-	}
-
-	private void readObject(java.io.ObjectInputStream in) throws IOException,
-			ClassNotFoundException {
-		in.defaultReadObject();
-		setTypeInfo();
-	}
-
-	@Override
-	protected void setTypeInfo() {
-		TypeInformation<?>[] outTypes = extractFieldTypes();
-		this.typeInfo = new TupleTypeInfo<OUT>(outTypes);
-	}
-	
-	private TypeInformation<?>[] extractFieldTypes() {
-		
-		TupleTypeInfo<?> inTupleType = (TupleTypeInfo<?>) inType.getTypeInfo();
-		TypeInformation<?>[] fieldTypes = new TypeInformation[fields.length];
-				
-		for(int i=0; i<fields.length; i++) {
-			
-			if(inTupleType.getTypeAt(fields[i]).getTypeClass() != givenTypes[i]) {
-				throw new IllegalArgumentException("Given types do not match types of input data set.");
-			}
-				
-			fieldTypes[i] = inTupleType.getTypeAt(fields[i]);
-		}
-		
-		return fieldTypes;
-	}
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/51c1f677/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/serialization/TypeWrapper.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/serialization/TypeWrapper.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/serialization/TypeWrapper.java
deleted file mode 100644
index a2e16b6..0000000
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/serialization/TypeWrapper.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.util.serialization;
-
-import java.io.Serializable;
-
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-
-public abstract class TypeWrapper<T>
-		implements Serializable {
-	private static final long serialVersionUID = 1L;
-
-	protected transient TypeInformation<T> typeInfo = null;
-	
-	public TypeInformation<T> getTypeInfo() {
-		if (typeInfo == null) {
-			throw new RuntimeException("There is no TypeInformation in the wrapper");
-		}
-		return typeInfo;
-	}
-
-	protected abstract void setTypeInfo();
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/51c1f677/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/ProjectTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/ProjectTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/ProjectTest.java
index 5157dcb..288d4ee 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/ProjectTest.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/ProjectTest.java
@@ -23,12 +23,12 @@ import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.List;
 
+import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.api.java.tuple.Tuple5;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.streaming.api.datastream.StreamProjection;
 import org.apache.flink.streaming.util.MockInvokable;
-import org.apache.flink.streaming.util.serialization.ObjectTypeWrapper;
-import org.apache.flink.streaming.util.serialization.ProjectTypeWrapper;
-import org.apache.flink.streaming.util.serialization.TypeWrapper;
 import org.junit.Test;
 
 public class ProjectTest implements Serializable {
@@ -37,17 +37,18 @@ public class ProjectTest implements Serializable {
 	@Test
 	public void test() {
 
-		TypeWrapper<Tuple5<Integer, String, Integer, String, Integer>> inTypeWrapper = new ObjectTypeWrapper<Tuple5<Integer, String, Integer, String, Integer>>(
-				new Tuple5<Integer, String, Integer, String, Integer>(2, "a", 3, "b", 4));
+		TypeInformation<Tuple5<Integer, String, Integer, String, Integer>> inType = TypeExtractor
+				.getForObject(new Tuple5<Integer, String, Integer, String, Integer>(2, "a", 3, "b",
+						4));
 
 		int[] fields = new int[] { 4, 4, 3 };
 		Class<?>[] classes = new Class<?>[] { Integer.class, Integer.class, String.class };
 
-		TypeWrapper<Tuple3<Integer, Integer, String>> outTypeWrapper = new ProjectTypeWrapper<Tuple5<Integer, String, Integer, String, Integer>, Tuple3<Integer, Integer, String>>(
-				inTypeWrapper, fields, classes);
-
+		@SuppressWarnings("unchecked")
 		ProjectInvokable<Tuple5<Integer, String, Integer, String, Integer>, Tuple3<Integer, Integer, String>> invokable = new ProjectInvokable<Tuple5<Integer, String, Integer, String, Integer>, Tuple3<Integer, Integer, String>>(
-				fields, outTypeWrapper);
+				fields,
+				(TypeInformation<Tuple3<Integer, Integer, String>>) StreamProjection
+						.extractFieldTypes(fields, classes, inType));
 
 		List<Tuple5<Integer, String, Integer, String, Integer>> input = new ArrayList<Tuple5<Integer, String, Integer, String, Integer>>();
 		input.add(new Tuple5<Integer, String, Integer, String, Integer>(2, "a", 3, "b", 4));

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/51c1f677/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/serialization/TypeSerializationTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/serialization/TypeSerializationTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/serialization/TypeSerializationTest.java
deleted file mode 100644
index 1d2fd2e..0000000
--- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/serialization/TypeSerializationTest.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.util.serialization;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-
-import org.apache.commons.lang3.SerializationUtils;
-import org.apache.flink.api.common.functions.RichMapFunction;
-import org.junit.Test;
-
-public class TypeSerializationTest {
-
-	private static class MyMap extends RichMapFunction<Integer, String> {
-		private static final long serialVersionUID = 1L;
-
-		@Override
-		public String map(Integer value) throws Exception {
-			return null;
-		}
-	}
-
-	@SuppressWarnings("unchecked")
-	@Test
-	public void functionTypeSerializationTest() {
-		TypeWrapper<Integer> ser = new FunctionTypeWrapper<Integer>(new MyMap(),
-				RichMapFunction.class, 0);
-
-		byte[] serializedType = SerializationUtils.serialize(ser);
-
-		TypeWrapper<Integer> ser2 = (TypeWrapper<Integer>) SerializationUtils
-				.deserialize(serializedType);
-
-		assertNotNull(ser.getTypeInfo());
-		assertNotNull(ser2.getTypeInfo());
-
-		assertEquals(ser.getTypeInfo(), ser2.getTypeInfo());
-	}
-
-	@SuppressWarnings("unchecked")
-	@Test
-	public void objectTypeSerializationTest() {
-		Integer instance = Integer.valueOf(22);
-		
-		TypeWrapper<Integer> ser = new ObjectTypeWrapper<Integer>(instance);
-		
-		byte[] serializedType = SerializationUtils.serialize(ser);
-
-		TypeWrapper<Integer> ser2 = (TypeWrapper<Integer>) SerializationUtils
-				.deserialize(serializedType);
-
-		assertNotNull(ser.getTypeInfo());
-		assertNotNull(ser2.getTypeInfo());
-
-		assertEquals(ser.getTypeInfo(), ser2.getTypeInfo());
-	}
-}


[5/5] incubator-flink git commit: [FLINK-1161] [streaming] Streaming API type handling rework to support java 8 lambdas

Posted by mb...@apache.org.
[FLINK-1161] [streaming] Streaming API type handling rework to support java 8 lambdas


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/51c1f677
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/51c1f677
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/51c1f677

Branch: refs/heads/master
Commit: 51c1f67791307c2b9355171f7398d104befc8de5
Parents: 94c8e3f
Author: Gyula Fora <gy...@apache.org>
Authored: Mon Dec 8 17:12:01 2014 +0100
Committer: mbalassi <mb...@apache.org>
Committed: Wed Dec 10 13:27:38 2014 +0100

----------------------------------------------------------------------
 .../flink/streaming/api/JobGraphBuilder.java    |  85 +++--
 .../flink/streaming/api/StreamConfig.java       |  93 +++---
 .../api/datastream/ConnectedDataStream.java     | 119 +++----
 .../streaming/api/datastream/DataStream.java    | 130 ++++----
 .../api/datastream/DataStreamSink.java          |  85 ++---
 .../api/datastream/DataStreamSource.java        |   8 +-
 .../api/datastream/GroupedDataStream.java       |   8 +-
 .../datastream/SingleOutputStreamOperator.java  |   6 +-
 .../api/datastream/SplitDataStream.java         |   2 +-
 .../api/datastream/StreamJoinOperator.java      |   8 +-
 .../api/datastream/StreamProjection.java        | 307 ++++++++++---------
 .../api/datastream/WindowedDataStream.java      |  49 +--
 .../environment/StreamExecutionEnvironment.java |  56 ++--
 .../invokable/operator/ProjectInvokable.java    |  11 +-
 .../api/streamvertex/CoStreamVertex.java        |   8 +-
 .../api/streamvertex/InputHandler.java          |  11 +-
 .../api/streamvertex/OutputHandler.java         |   5 +-
 .../util/serialization/ClassTypeWrapper.java    |  46 ---
 .../util/serialization/CombineTypeWrapper.java  |  50 ---
 .../util/serialization/FunctionTypeWrapper.java |  53 ----
 .../util/serialization/ObjectTypeWrapper.java   |  47 ---
 .../util/serialization/ProjectTypeWrapper.java  |  70 -----
 .../util/serialization/TypeWrapper.java         |  38 ---
 .../api/invokable/operator/ProjectTest.java     |  19 +-
 .../serialization/TypeSerializationTest.java    |  72 -----
 25 files changed, 493 insertions(+), 893 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/51c1f677/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/JobGraphBuilder.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/JobGraphBuilder.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/JobGraphBuilder.java
index 8a8595a..c45164a 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/JobGraphBuilder.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/JobGraphBuilder.java
@@ -33,6 +33,7 @@ import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
 import org.apache.flink.streaming.api.collector.OutputSelector;
 import org.apache.flink.streaming.api.invokable.StreamInvokable;
 import org.apache.flink.streaming.api.invokable.operator.co.CoInvokable;
+import org.apache.flink.streaming.api.streamrecord.StreamRecordSerializer;
 import org.apache.flink.streaming.api.streamvertex.CoStreamVertex;
 import org.apache.flink.streaming.api.streamvertex.StreamIterationHead;
 import org.apache.flink.streaming.api.streamvertex.StreamIterationTail;
@@ -40,7 +41,6 @@ import org.apache.flink.streaming.api.streamvertex.StreamVertex;
 import org.apache.flink.streaming.partitioner.StreamPartitioner;
 import org.apache.flink.streaming.partitioner.StreamPartitioner.PartitioningStrategy;
 import org.apache.flink.streaming.state.OperatorState;
-import org.apache.flink.streaming.util.serialization.TypeWrapper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -66,10 +66,10 @@ public class JobGraphBuilder {
 	private Map<String, List<StreamPartitioner<?>>> connectionTypes;
 	private Map<String, String> operatorNames;
 	private Map<String, StreamInvokable<?, ?>> invokableObjects;
-	private Map<String, TypeWrapper<?>> typeWrapperIn1;
-	private Map<String, TypeWrapper<?>> typeWrapperIn2;
-	private Map<String, TypeWrapper<?>> typeWrapperOut1;
-	private Map<String, TypeWrapper<?>> typeWrapperOut2;
+	private Map<String, StreamRecordSerializer<?>> typeSerializersIn1;
+	private Map<String, StreamRecordSerializer<?>> typeSerializersIn2;
+	private Map<String, StreamRecordSerializer<?>> typeSerializersOut1;
+	private Map<String, StreamRecordSerializer<?>> typeSerializersOut2;
 	private Map<String, byte[]> serializedFunctions;
 	private Map<String, byte[]> outputSelectors;
 	private Map<String, Class<? extends AbstractInvokable>> vertexClasses;
@@ -98,10 +98,10 @@ public class JobGraphBuilder {
 		connectionTypes = new HashMap<String, List<StreamPartitioner<?>>>();
 		operatorNames = new HashMap<String, String>();
 		invokableObjects = new HashMap<String, StreamInvokable<?, ?>>();
-		typeWrapperIn1 = new HashMap<String, TypeWrapper<?>>();
-		typeWrapperIn2 = new HashMap<String, TypeWrapper<?>>();
-		typeWrapperOut1 = new HashMap<String, TypeWrapper<?>>();
-		typeWrapperOut2 = new HashMap<String, TypeWrapper<?>>();
+		typeSerializersIn1 = new HashMap<String, StreamRecordSerializer<?>>();
+		typeSerializersIn2 = new HashMap<String, StreamRecordSerializer<?>>();
+		typeSerializersOut1 = new HashMap<String, StreamRecordSerializer<?>>();
+		typeSerializersOut2 = new HashMap<String, StreamRecordSerializer<?>>();
 		serializedFunctions = new HashMap<String, byte[]>();
 		outputSelectors = new HashMap<String, byte[]>();
 		vertexClasses = new HashMap<String, Class<? extends AbstractInvokable>>();
@@ -124,10 +124,10 @@ public class JobGraphBuilder {
 	 *            Name of the vertex
 	 * @param invokableObject
 	 *            User defined operator
-	 * @param inTypeWrapper
-	 *            Input type wrapper for serialization
-	 * @param outTypeWrapper
-	 *            Output type wrapper for serialization
+	 * @param inTypeInfo
+	 *            Input type for serialization
+	 * @param outTypeInfo
+	 *            Output typ for serialization
 	 * @param operatorName
 	 *            Operator type
 	 * @param serializedFunction
@@ -136,14 +136,19 @@ public class JobGraphBuilder {
 	 *            Number of parallel instances created
 	 */
 	public <IN, OUT> void addStreamVertex(String vertexName,
-			StreamInvokable<IN, OUT> invokableObject, TypeWrapper<?> inTypeWrapper,
-			TypeWrapper<?> outTypeWrapper, String operatorName, byte[] serializedFunction,
+			StreamInvokable<IN, OUT> invokableObject, TypeInformation<IN> inTypeInfo,
+			TypeInformation<OUT> outTypeInfo, String operatorName, byte[] serializedFunction,
 			int parallelism) {
 
 		addVertex(vertexName, StreamVertex.class, invokableObject, operatorName,
 				serializedFunction, parallelism);
 
-		addTypeWrappers(vertexName, inTypeWrapper, null, outTypeWrapper, null);
+		StreamRecordSerializer<IN> inSerializer = inTypeInfo != null ? new StreamRecordSerializer<IN>(
+				inTypeInfo) : null;
+		StreamRecordSerializer<OUT> outSerializer = outTypeInfo != null ? new StreamRecordSerializer<OUT>(
+				outTypeInfo) : null;
+
+		addTypeSerializers(vertexName, inSerializer, null, outSerializer, null);
 
 		if (LOG.isDebugEnabled()) {
 			LOG.debug("Vertex: {}", vertexName);
@@ -224,14 +229,16 @@ public class JobGraphBuilder {
 	}
 
 	public <IN1, IN2, OUT> void addCoTask(String vertexName,
-			CoInvokable<IN1, IN2, OUT> taskInvokableObject, TypeWrapper<?> in1TypeWrapper,
-			TypeWrapper<?> in2TypeWrapper, TypeWrapper<?> outTypeWrapper, String operatorName,
-			byte[] serializedFunction, int parallelism) {
+			CoInvokable<IN1, IN2, OUT> taskInvokableObject, TypeInformation<IN1> in1TypeInfo,
+			TypeInformation<IN2> in2TypeInfo, TypeInformation<OUT> outTypeInfo,
+			String operatorName, byte[] serializedFunction, int parallelism) {
 
 		addVertex(vertexName, CoStreamVertex.class, taskInvokableObject, operatorName,
 				serializedFunction, parallelism);
 
-		addTypeWrappers(vertexName, in1TypeWrapper, in2TypeWrapper, outTypeWrapper, null);
+		addTypeSerializers(vertexName, new StreamRecordSerializer<IN1>(in1TypeInfo),
+				new StreamRecordSerializer<IN2>(in2TypeInfo), new StreamRecordSerializer<OUT>(
+						outTypeInfo), null);
 
 		if (LOG.isDebugEnabled()) {
 			LOG.debug("CO-TASK: {}", vertexName);
@@ -273,12 +280,13 @@ public class JobGraphBuilder {
 		iterationTailCount.put(vertexName, 0);
 	}
 
-	private void addTypeWrappers(String vertexName, TypeWrapper<?> in1, TypeWrapper<?> in2,
-			TypeWrapper<?> out1, TypeWrapper<?> out2) {
-		typeWrapperIn1.put(vertexName, in1);
-		typeWrapperIn2.put(vertexName, in2);
-		typeWrapperOut1.put(vertexName, out1);
-		typeWrapperOut2.put(vertexName, out2);
+	private void addTypeSerializers(String vertexName, StreamRecordSerializer<?> in1,
+			StreamRecordSerializer<?> in2, StreamRecordSerializer<?> out1,
+			StreamRecordSerializer<?> out2) {
+		typeSerializersIn1.put(vertexName, in1);
+		typeSerializersIn2.put(vertexName, in2);
+		typeSerializersOut1.put(vertexName, out1);
+		typeSerializersOut2.put(vertexName, out2);
 	}
 
 	/**
@@ -315,10 +323,10 @@ public class JobGraphBuilder {
 		config.setMutability(mutability.get(vertexName));
 		config.setBufferTimeout(bufferTimeout.get(vertexName));
 
-		config.setTypeWrapperIn1(typeWrapperIn1.get(vertexName));
-		config.setTypeWrapperIn2(typeWrapperIn2.get(vertexName));
-		config.setTypeWrapperOut1(typeWrapperOut1.get(vertexName));
-		config.setTypeWrapperOut2(typeWrapperOut2.get(vertexName));
+		config.setTypeSerializerIn1(typeSerializersIn1.get(vertexName));
+		config.setTypeSerializerIn2(typeSerializersIn2.get(vertexName));
+		config.setTypeSerializerOut1(typeSerializersOut1.get(vertexName));
+		config.setTypeSerializerOut2(typeSerializersOut2.get(vertexName));
 
 		// Set vertex config
 		config.setUserInvokable(invokableObject);
@@ -482,19 +490,10 @@ public class JobGraphBuilder {
 		operatorNames.put(to, operatorNames.get(from));
 		serializedFunctions.put(to, serializedFunctions.get(from));
 
-		typeWrapperIn1.put(to, typeWrapperOut1.get(from));
-		typeWrapperIn2.put(to, typeWrapperOut2.get(from));
-		typeWrapperOut1.put(to, typeWrapperOut1.get(from));
-		typeWrapperOut2.put(to, typeWrapperOut2.get(from));
-	}
-
-	public TypeInformation<?> getInTypeInfo(String id) {
-		System.out.println("DEBUG TypeInfo " + typeWrapperIn1.get(id));
-		return typeWrapperIn1.get(id).getTypeInfo();
-	}
-
-	public TypeInformation<?> getOutTypeInfo(String id) {
-		return typeWrapperOut1.get(id).getTypeInfo();
+		typeSerializersIn1.put(to, typeSerializersOut1.get(from));
+		typeSerializersIn2.put(to, typeSerializersOut2.get(from));
+		typeSerializersOut1.put(to, typeSerializersOut1.get(from));
+		typeSerializersOut2.put(to, typeSerializersOut2.get(from));
 	}
 
 	/**

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/51c1f677/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamConfig.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamConfig.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamConfig.java
index 3dba376..31af9cb 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamConfig.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamConfig.java
@@ -25,15 +25,14 @@ import java.util.Map;
 import org.apache.commons.lang3.SerializationException;
 import org.apache.commons.lang3.SerializationUtils;
 import org.apache.flink.api.common.functions.AbstractRichFunction;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.collector.OutputSelector;
 import org.apache.flink.streaming.api.invokable.StreamInvokable;
+import org.apache.flink.streaming.api.streamrecord.StreamRecordSerializer;
 import org.apache.flink.streaming.api.streamvertex.StreamVertexException;
 import org.apache.flink.streaming.partitioner.ShufflePartitioner;
 import org.apache.flink.streaming.partitioner.StreamPartitioner;
 import org.apache.flink.streaming.state.OperatorState;
-import org.apache.flink.streaming.util.serialization.TypeWrapper;
 import org.apache.flink.util.InstantiationUtil;
 
 public class StreamConfig {
@@ -54,6 +53,12 @@ public class StreamConfig {
 	private static final String USER_FUNCTION = "userfunction";
 	private static final String BUFFER_TIMEOUT = "bufferTimeout";
 	private static final String OPERATOR_STATES = "operatorStates";
+	private static final String TYPE_SERIALIZER_IN_1 = "typeSerializer_in_1";
+	private static final String TYPE_SERIALIZER_IN_2 = "typeSerializer_in_2";
+	private static final String TYPE_SERIALIZER_OUT_1 = "typeSerializer_out_1";
+	private static final String TYPE_SERIALIZER_OUT_2 = "typeSerializer_out_2";
+	private static final String MUTABILITY = "isMutable";
+	private static final String ITERATON_WAIT = "iterationWait";
 
 	// DEFAULT VALUES
 
@@ -61,10 +66,7 @@ public class StreamConfig {
 
 	private static final long DEFAULT_TIMEOUT = 0;
 
-	// STRINGS
-
-	private static final String MUTABILITY = "isMutable";
-	private static final String ITERATON_WAIT = "iterationWait";
+	// CONFIG METHODS
 
 	private Configuration config;
 
@@ -76,65 +78,64 @@ public class StreamConfig {
 		return config;
 	}
 
-	// CONFIGS
-
-	private static final String TYPE_WRAPPER_IN_1 = "typeWrapper_in_1";
-	private static final String TYPE_WRAPPER_IN_2 = "typeWrapper_in_2";
-	private static final String TYPE_WRAPPER_OUT_1 = "typeWrapper_out_1";
-	private static final String TYPE_WRAPPER_OUT_2 = "typeWrapper_out_2";
-
-	public void setTypeWrapperIn1(TypeWrapper<?> typeWrapper) {
-		setTypeWrapper(TYPE_WRAPPER_IN_1, typeWrapper);
+	public void setTypeSerializerIn1(StreamRecordSerializer<?> serializer) {
+		setTypeSerializer(TYPE_SERIALIZER_IN_1, serializer);
 	}
 
-	public void setTypeWrapperIn2(TypeWrapper<?> typeWrapper) {
-		setTypeWrapper(TYPE_WRAPPER_IN_2, typeWrapper);
+	public void setTypeSerializerIn2(StreamRecordSerializer<?> serializer) {
+		setTypeSerializer(TYPE_SERIALIZER_IN_2, serializer);
 	}
 
-	public void setTypeWrapperOut1(TypeWrapper<?> typeWrapper) {
-		setTypeWrapper(TYPE_WRAPPER_OUT_1, typeWrapper);
+	public void setTypeSerializerOut1(StreamRecordSerializer<?> serializer) {
+		setTypeSerializer(TYPE_SERIALIZER_OUT_1, serializer);
 	}
 
-	public void setTypeWrapperOut2(TypeWrapper<?> typeWrapper) {
-		setTypeWrapper(TYPE_WRAPPER_OUT_2, typeWrapper);
+	public void setTypeSerializerOut2(StreamRecordSerializer<?> serializer) {
+		setTypeSerializer(TYPE_SERIALIZER_OUT_2, serializer);
 	}
 
-	public <T> TypeInformation<T> getTypeInfoIn1(ClassLoader cl) {
-		return getTypeInfo(TYPE_WRAPPER_IN_1, cl);
-	}
-
-	public <T> TypeInformation<T> getTypeInfoIn2(ClassLoader cl) {
-		return getTypeInfo(TYPE_WRAPPER_IN_2, cl);
-	}
-
-	public <T> TypeInformation<T> getTypeInfoOut1(ClassLoader cl) {
-		return getTypeInfo(TYPE_WRAPPER_OUT_1, cl);
+	@SuppressWarnings("unchecked")
+	public <T> StreamRecordSerializer<T> getTypeSerializerIn1(ClassLoader cl) {
+		try {
+			return (StreamRecordSerializer<T>) InstantiationUtil.readObjectFromConfig(this.config,
+					TYPE_SERIALIZER_IN_1, cl);
+		} catch (Exception e) {
+			throw new RuntimeException("Could not instantiate serializer.");
+		}
 	}
 
-	public <T> TypeInformation<T> getTypeInfoOut2(ClassLoader cl) {
-		return getTypeInfo(TYPE_WRAPPER_OUT_2, cl);
+	@SuppressWarnings("unchecked")
+	public <T> StreamRecordSerializer<T> getTypeSerializerIn2(ClassLoader cl) {
+		try {
+			return (StreamRecordSerializer<T>) InstantiationUtil.readObjectFromConfig(this.config,
+					TYPE_SERIALIZER_IN_2, cl);
+		} catch (Exception e) {
+			throw new RuntimeException("Could not instantiate serializer.");
+		}
 	}
 
-	private void setTypeWrapper(String key, TypeWrapper<?> typeWrapper) {
-		config.setBytes(key, SerializationUtils.serialize(typeWrapper));
+	@SuppressWarnings("unchecked")
+	public <T> StreamRecordSerializer<T> getTypeSerializerOut1(ClassLoader cl) {
+		try {
+			return (StreamRecordSerializer<T>) InstantiationUtil.readObjectFromConfig(this.config,
+					TYPE_SERIALIZER_OUT_1, cl);
+		} catch (Exception e) {
+			throw new RuntimeException("Could not instantiate serializer.");
+		}
 	}
 
 	@SuppressWarnings("unchecked")
-	private <T> TypeInformation<T> getTypeInfo(String key, ClassLoader cl) {
-
-		TypeWrapper<T> typeWrapper;
+	public <T> StreamRecordSerializer<T> getTypeSerializerOut2(ClassLoader cl) {
 		try {
-			typeWrapper = (TypeWrapper<T>) InstantiationUtil.readObjectFromConfig(this.config, key,
-					cl);
+			return (StreamRecordSerializer<T>) InstantiationUtil.readObjectFromConfig(this.config,
+					TYPE_SERIALIZER_OUT_2, cl);
 		} catch (Exception e) {
-			throw new RuntimeException("Cannot load typeinfo");
-		}
-		if (typeWrapper != null) {
-			return typeWrapper.getTypeInfo();
-		} else {
-			return null;
+			throw new RuntimeException("Could not instantiate serializer.");
 		}
+	}
 
+	private void setTypeSerializer(String key, StreamRecordSerializer<?> typeWrapper) {
+		config.setBytes(key, SerializationUtils.serialize(typeWrapper));
 	}
 
 	public void setMutability(boolean isMutable) {

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/51c1f677/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedDataStream.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedDataStream.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedDataStream.java
index 1621752..6336e68 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedDataStream.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedDataStream.java
@@ -28,6 +28,8 @@ import org.apache.flink.api.common.functions.RichFlatMapFunction;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
 import org.apache.flink.streaming.api.JobGraphBuilder;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.function.co.CoFlatMapFunction;
@@ -44,10 +46,6 @@ import org.apache.flink.streaming.api.invokable.operator.co.CoReduceInvokable;
 import org.apache.flink.streaming.api.invokable.operator.co.CoWindowInvokable;
 import org.apache.flink.streaming.api.invokable.util.DefaultTimeStamp;
 import org.apache.flink.streaming.api.invokable.util.TimeStamp;
-import org.apache.flink.streaming.util.serialization.CombineTypeWrapper;
-import org.apache.flink.streaming.util.serialization.FunctionTypeWrapper;
-import org.apache.flink.streaming.util.serialization.ObjectTypeWrapper;
-import org.apache.flink.streaming.util.serialization.TypeWrapper;
 import org.apache.flink.util.Collector;
 
 /**
@@ -122,7 +120,7 @@ public class ConnectedDataStream<IN1, IN2> {
 	 * @return The type of the first input
 	 */
 	public TypeInformation<IN1> getInputType1() {
-		return dataStream1.getOutputType();
+		return dataStream1.getType();
 	}
 
 	/**
@@ -131,7 +129,7 @@ public class ConnectedDataStream<IN1, IN2> {
 	 * @return The type of the second input
 	 */
 	public TypeInformation<IN2> getInputType2() {
-		return dataStream2.getOutputType();
+		return dataStream2.getType();
 	}
 
 	/**
@@ -403,15 +401,11 @@ public class ConnectedDataStream<IN1, IN2> {
 	 * @return The transformed {@link DataStream}
 	 */
 	public <OUT> SingleOutputStreamOperator<OUT, ?> map(CoMapFunction<IN1, IN2, OUT> coMapper) {
-		FunctionTypeWrapper<IN1> in1TypeWrapper = new FunctionTypeWrapper<IN1>(coMapper,
-				CoMapFunction.class, 0);
-		FunctionTypeWrapper<IN2> in2TypeWrapper = new FunctionTypeWrapper<IN2>(coMapper,
-				CoMapFunction.class, 1);
-		FunctionTypeWrapper<OUT> outTypeWrapper = new FunctionTypeWrapper<OUT>(coMapper,
-				CoMapFunction.class, 2);
-
-		return addCoFunction("coMap", coMapper, in1TypeWrapper, in2TypeWrapper, outTypeWrapper,
-				new CoMapInvokable<IN1, IN2, OUT>(coMapper));
+		TypeInformation<OUT> outTypeInfo = TypeExtractor.createTypeInfo(CoMapFunction.class,
+				coMapper.getClass(), 2, null, null);
+
+		return addCoFunction("coMap", coMapper, outTypeInfo, new CoMapInvokable<IN1, IN2, OUT>(
+				coMapper));
 	}
 
 	/**
@@ -431,15 +425,11 @@ public class ConnectedDataStream<IN1, IN2> {
 	 */
 	public <OUT> SingleOutputStreamOperator<OUT, ?> flatMap(
 			CoFlatMapFunction<IN1, IN2, OUT> coFlatMapper) {
-		FunctionTypeWrapper<IN1> in1TypeWrapper = new FunctionTypeWrapper<IN1>(coFlatMapper,
-				CoFlatMapFunction.class, 0);
-		FunctionTypeWrapper<IN2> in2TypeWrapper = new FunctionTypeWrapper<IN2>(coFlatMapper,
-				CoFlatMapFunction.class, 1);
-		FunctionTypeWrapper<OUT> outTypeWrapper = new FunctionTypeWrapper<OUT>(coFlatMapper,
-				CoFlatMapFunction.class, 2);
-
-		return addCoFunction("coFlatMap", coFlatMapper, in1TypeWrapper, in2TypeWrapper,
-				outTypeWrapper, new CoFlatMapInvokable<IN1, IN2, OUT>(coFlatMapper));
+		TypeInformation<OUT> outTypeInfo = TypeExtractor.createTypeInfo(CoFlatMapFunction.class,
+				coFlatMapper.getClass(), 2, null, null);
+
+		return addCoFunction("coFlatMap", coFlatMapper, outTypeInfo,
+				new CoFlatMapInvokable<IN1, IN2, OUT>(coFlatMapper));
 	}
 
 	/**
@@ -460,14 +450,10 @@ public class ConnectedDataStream<IN1, IN2> {
 	 */
 	public <OUT> SingleOutputStreamOperator<OUT, ?> reduce(CoReduceFunction<IN1, IN2, OUT> coReducer) {
 
-		FunctionTypeWrapper<IN1> in1TypeWrapper = new FunctionTypeWrapper<IN1>(coReducer,
-				CoReduceFunction.class, 0);
-		FunctionTypeWrapper<IN2> in2TypeWrapper = new FunctionTypeWrapper<IN2>(coReducer,
-				CoReduceFunction.class, 1);
-		FunctionTypeWrapper<OUT> outTypeWrapper = new FunctionTypeWrapper<OUT>(coReducer,
-				CoReduceFunction.class, 2);
-		return addCoFunction("coReduce", coReducer, in1TypeWrapper, in2TypeWrapper, outTypeWrapper,
-				getReduceInvokable(coReducer));
+		TypeInformation<OUT> outTypeInfo = TypeExtractor.createTypeInfo(CoReduceFunction.class,
+				coReducer.getClass(), 2, null, null);
+
+		return addCoFunction("coReduce", coReducer, outTypeInfo, getReduceInvokable(coReducer));
 	}
 
 	/**
@@ -528,16 +514,12 @@ public class ConnectedDataStream<IN1, IN2> {
 			throw new IllegalArgumentException("Slide interval must be positive");
 		}
 
-		FunctionTypeWrapper<IN1> in1TypeWrapper = new FunctionTypeWrapper<IN1>(coWindowFunction,
-				CoWindowFunction.class, 0);
-		FunctionTypeWrapper<IN2> in2TypeWrapper = new FunctionTypeWrapper<IN2>(coWindowFunction,
-				CoWindowFunction.class, 1);
-		FunctionTypeWrapper<OUT> outTypeWrapper = new FunctionTypeWrapper<OUT>(coWindowFunction,
-				CoWindowFunction.class, 2);
+		TypeInformation<OUT> outTypeInfo = TypeExtractor.createTypeInfo(CoWindowFunction.class,
+				coWindowFunction.getClass(), 2, null, null);
 
-		return addCoFunction("coWindowReduce", coWindowFunction, in1TypeWrapper, in2TypeWrapper,
-				outTypeWrapper, new CoWindowInvokable<IN1, IN2, OUT>(coWindowFunction, windowSize,
-						slideInterval, timestamp1, timestamp2));
+		return addCoFunction("coWindowReduce", coWindowFunction, outTypeInfo,
+				new CoWindowInvokable<IN1, IN2, OUT>(coWindowFunction, windowSize, slideInterval,
+						timestamp1, timestamp2));
 	}
 
 	protected <OUT> CoInvokable<IN1, IN2, OUT> getReduceInvokable(
@@ -556,26 +538,23 @@ public class ConnectedDataStream<IN1, IN2> {
 			CrossFunction<IN1, IN2, OUT> crossFunction, long windowSize, long slideInterval,
 			TimeStamp<IN1> timestamp1, TimeStamp<IN2> timestamp2) {
 
-		TypeWrapper<IN1> in1TypeWrapper = new ObjectTypeWrapper<IN1>(dataStream1.getOutputType()
-				.createSerializer().createInstance());
-		TypeWrapper<IN2> in2TypeWrapper = new ObjectTypeWrapper<IN2>(dataStream2.getOutputType()
-				.createSerializer().createInstance());
+		TypeInformation<OUT> outTypeInfo = TypeExtractor.createTypeInfo(CrossFunction.class,
+				crossFunction.getClass(), 2, null, null);
 
-		FunctionTypeWrapper<OUT> outTypeWrapper = new FunctionTypeWrapper<OUT>(crossFunction,
-				CrossFunction.class, 2);
+		CrossWindowFunction<IN1, IN2, OUT> crossWindowFunction = new CrossWindowFunction<IN1, IN2, OUT>(
+				crossFunction);
 
-		CrossWindowFunction<IN1, IN2, OUT> crossWindowFunction = new CrossWindowFunction<IN1, IN2, OUT>(crossFunction);
-		
-		return addGeneralWindowCombine(crossWindowFunction, in1TypeWrapper, in2TypeWrapper,
-				outTypeWrapper, windowSize, slideInterval, timestamp1, timestamp2);
+		return addGeneralWindowCombine(crossWindowFunction, outTypeInfo, windowSize, slideInterval,
+				timestamp1, timestamp2);
 	}
 
-	private static class CrossWindowFunction<IN1, IN2, OUT> implements CoWindowFunction<IN1, IN2, OUT> {
+	private static class CrossWindowFunction<IN1, IN2, OUT> implements
+			CoWindowFunction<IN1, IN2, OUT> {
 
 		private static final long serialVersionUID = 1L;
 
 		private CrossFunction<IN1, IN2, OUT> crossFunction;
-		
+
 		public CrossWindowFunction(CrossFunction<IN1, IN2, OUT> crossFunction) {
 			this.crossFunction = crossFunction;
 		}
@@ -590,27 +569,22 @@ public class ConnectedDataStream<IN1, IN2> {
 			}
 		}
 	}
-	
+
 	protected SingleOutputStreamOperator<Tuple2<IN1, IN2>, ?> addGeneralWindowJoin(
 			CoWindowFunction<IN1, IN2, Tuple2<IN1, IN2>> coWindowFunction, long windowSize,
 			long slideInterval, TimeStamp<IN1> timestamp1, TimeStamp<IN2> timestamp2) {
 
-		TypeWrapper<IN1> in1TypeWrapper = new ObjectTypeWrapper<IN1>(dataStream1.getOutputType()
-				.createSerializer().createInstance());
-		TypeWrapper<IN2> in2TypeWrapper = new ObjectTypeWrapper<IN2>(dataStream2.getOutputType()
-				.createSerializer().createInstance());
-
-		CombineTypeWrapper<IN1, IN2> outTypeWrapper = new CombineTypeWrapper<IN1, IN2>(
-				in1TypeWrapper, in2TypeWrapper);
+		TypeInformation<Tuple2<IN1, IN2>> outType = new TupleTypeInfo<Tuple2<IN1, IN2>>(
+				getInputType1(), getInputType2());
 
-		return addGeneralWindowCombine(coWindowFunction, in1TypeWrapper, in2TypeWrapper,
-				outTypeWrapper, windowSize, slideInterval, timestamp1, timestamp2);
+		return addGeneralWindowCombine(coWindowFunction, outType, windowSize, slideInterval,
+				timestamp1, timestamp2);
 	}
 
 	private <OUT> SingleOutputStreamOperator<OUT, ?> addGeneralWindowCombine(
-			CoWindowFunction<IN1, IN2, OUT> coWindowFunction, TypeWrapper<IN1> in1TypeWrapper,
-			TypeWrapper<IN2> in2TypeWrapper, TypeWrapper<OUT> outTypeWrapper, long windowSize,
-			long slideInterval, TimeStamp<IN1> timestamp1, TimeStamp<IN2> timestamp2) {
+			CoWindowFunction<IN1, IN2, OUT> coWindowFunction, TypeInformation<OUT> outTypeInfo,
+			long windowSize, long slideInterval, TimeStamp<IN1> timestamp1,
+			TimeStamp<IN2> timestamp2) {
 
 		if (windowSize < 1) {
 			throw new IllegalArgumentException("Window size must be positive");
@@ -619,23 +593,22 @@ public class ConnectedDataStream<IN1, IN2> {
 			throw new IllegalArgumentException("Slide interval must be positive");
 		}
 
-		return addCoFunction("coWindowReduce", coWindowFunction, in1TypeWrapper, in2TypeWrapper,
-				outTypeWrapper, new CoWindowInvokable<IN1, IN2, OUT>(coWindowFunction, windowSize,
-						slideInterval, timestamp1, timestamp2));
+		return addCoFunction("coWindowReduce", coWindowFunction, outTypeInfo,
+				new CoWindowInvokable<IN1, IN2, OUT>(coWindowFunction, windowSize, slideInterval,
+						timestamp1, timestamp2));
 	}
 
 	protected <OUT> SingleOutputStreamOperator<OUT, ?> addCoFunction(String functionName,
-			final Function function, TypeWrapper<IN1> in1TypeWrapper,
-			TypeWrapper<IN2> in2TypeWrapper, TypeWrapper<OUT> outTypeWrapper,
+			final Function function, TypeInformation<OUT> outTypeInfo,
 			CoInvokable<IN1, IN2, OUT> functionInvokable) {
 
 		@SuppressWarnings({ "unchecked", "rawtypes" })
 		SingleOutputStreamOperator<OUT, ?> returnStream = new SingleOutputStreamOperator(
-				environment, functionName, outTypeWrapper);
+				environment, functionName, outTypeInfo);
 
 		try {
 			dataStream1.jobGraphBuilder.addCoTask(returnStream.getId(), functionInvokable,
-					in1TypeWrapper, in2TypeWrapper, outTypeWrapper, functionName,
+					getInputType1(), getInputType2(), outTypeInfo, functionName,
 					SerializationUtils.serialize((Serializable) function),
 					environment.getDegreeOfParallelism());
 		} catch (SerializationException e) {

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/51c1f677/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
index f0e4309..978f5fa 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
@@ -38,6 +38,7 @@ import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.api.java.tuple.Tuple;
 import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
 import org.apache.flink.streaming.api.JobGraphBuilder;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.function.aggregation.AggregationFunction;
@@ -70,9 +71,6 @@ import org.apache.flink.streaming.partitioner.ShufflePartitioner;
 import org.apache.flink.streaming.partitioner.StreamPartitioner;
 import org.apache.flink.streaming.util.keys.FieldsKeySelector;
 import org.apache.flink.streaming.util.keys.PojoKeySelector;
-import org.apache.flink.streaming.util.serialization.FunctionTypeWrapper;
-import org.apache.flink.streaming.util.serialization.ObjectTypeWrapper;
-import org.apache.flink.streaming.util.serialization.TypeWrapper;
 
 /**
  * A DataStream represents a stream of elements of the same type. A DataStream
@@ -97,7 +95,7 @@ public class DataStream<OUT> {
 	protected List<String> userDefinedNames;
 	protected boolean selectAll;
 	protected StreamPartitioner<OUT> partitioner;
-	protected final TypeWrapper<OUT> outTypeWrapper;
+	protected final TypeInformation<OUT> typeInfo;
 	protected List<DataStream<OUT>> mergedStreams;
 
 	protected final JobGraphBuilder jobGraphBuilder;
@@ -110,11 +108,11 @@ public class DataStream<OUT> {
 	 *            StreamExecutionEnvironment
 	 * @param operatorType
 	 *            The type of the operator in the component
-	 * @param outTypeWrapper
-	 *            Type of the output
+	 * @param typeInfo
+	 *            Type of the datastream
 	 */
 	public DataStream(StreamExecutionEnvironment environment, String operatorType,
-			TypeWrapper<OUT> outTypeWrapper) {
+			TypeInformation<OUT> typeInfo) {
 		if (environment == null) {
 			throw new NullPointerException("context is null");
 		}
@@ -127,7 +125,7 @@ public class DataStream<OUT> {
 		this.userDefinedNames = new ArrayList<String>();
 		this.selectAll = false;
 		this.partitioner = new DistributePartitioner<OUT>(true);
-		this.outTypeWrapper = outTypeWrapper;
+		this.typeInfo = typeInfo;
 		this.mergedStreams = new ArrayList<DataStream<OUT>>();
 		this.mergedStreams.add(this);
 	}
@@ -146,7 +144,7 @@ public class DataStream<OUT> {
 		this.selectAll = dataStream.selectAll;
 		this.partitioner = dataStream.partitioner;
 		this.jobGraphBuilder = dataStream.jobGraphBuilder;
-		this.outTypeWrapper = dataStream.outTypeWrapper;
+		this.typeInfo = dataStream.typeInfo;
 		this.mergedStreams = new ArrayList<DataStream<OUT>>();
 		this.mergedStreams.add(this);
 		if (dataStream.mergedStreams.size() > 1) {
@@ -176,12 +174,12 @@ public class DataStream<OUT> {
 	}
 
 	/**
-	 * Gets the output type.
+	 * Gets the type of the stream.
 	 * 
-	 * @return The output type.
+	 * @return The type of the datastream.
 	 */
-	public TypeInformation<OUT> getOutputType() {
-		return this.outTypeWrapper.getTypeInfo();
+	public TypeInformation<OUT> getType() {
+		return this.typeInfo;
 	}
 
 	/**
@@ -230,7 +228,7 @@ public class DataStream<OUT> {
 	 */
 	public GroupedDataStream<OUT> groupBy(int... fields) {
 
-		return groupBy(FieldsKeySelector.getSelector(getOutputType(), fields));
+		return groupBy(FieldsKeySelector.getSelector(getType(), fields));
 
 	}
 
@@ -248,7 +246,7 @@ public class DataStream<OUT> {
 	 **/
 	public GroupedDataStream<OUT> groupBy(String... fields) {
 
-		return groupBy(new PojoKeySelector<OUT>(getOutputType(), fields));
+		return groupBy(new PojoKeySelector<OUT>(getType(), fields));
 
 	}
 
@@ -277,7 +275,7 @@ public class DataStream<OUT> {
 	public DataStream<OUT> partitionBy(int... fields) {
 
 		return setConnectionType(new FieldsPartitioner<OUT>(FieldsKeySelector.getSelector(
-				getOutputType(), fields)));
+				getType(), fields)));
 	}
 
 	/**
@@ -290,8 +288,8 @@ public class DataStream<OUT> {
 	 */
 	public DataStream<OUT> partitionBy(String... fields) {
 
-		return setConnectionType(new FieldsPartitioner<OUT>(new PojoKeySelector<OUT>(
-				getOutputType(), fields)));
+		return setConnectionType(new FieldsPartitioner<OUT>(new PojoKeySelector<OUT>(getType(),
+				fields)));
 	}
 
 	/**
@@ -387,13 +385,10 @@ public class DataStream<OUT> {
 	 * @return The transformed {@link DataStream}.
 	 */
 	public <R> SingleOutputStreamOperator<R, ?> map(MapFunction<OUT, R> mapper) {
-		FunctionTypeWrapper<OUT> inTypeWrapper = new FunctionTypeWrapper<OUT>(mapper,
-				MapFunction.class, 0);
-		FunctionTypeWrapper<R> outTypeWrapper = new FunctionTypeWrapper<R>(mapper,
-				MapFunction.class, 1);
 
-		return addFunction("map", mapper, inTypeWrapper, outTypeWrapper, new MapInvokable<OUT, R>(
-				mapper));
+		TypeInformation<R> outType = TypeExtractor.getMapReturnTypes(mapper, getType());
+
+		return addFunction("map", mapper, getType(), outType, new MapInvokable<OUT, R>(mapper));
 	}
 
 	/**
@@ -413,13 +408,11 @@ public class DataStream<OUT> {
 	 * @return The transformed {@link DataStream}.
 	 */
 	public <R> SingleOutputStreamOperator<R, ?> flatMap(FlatMapFunction<OUT, R> flatMapper) {
-		FunctionTypeWrapper<OUT> inTypeWrapper = new FunctionTypeWrapper<OUT>(flatMapper,
-				FlatMapFunction.class, 0);
-		FunctionTypeWrapper<R> outTypeWrapper = new FunctionTypeWrapper<R>(flatMapper,
-				FlatMapFunction.class, 1);
 
-		return addFunction("flatMap", flatMapper, inTypeWrapper, outTypeWrapper,
-				new FlatMapInvokable<OUT, R>(flatMapper));
+		TypeInformation<R> outType = TypeExtractor.getFlatMapReturnTypes(flatMapper, getType());
+
+		return addFunction("flatMap", flatMapper, getType(), outType, new FlatMapInvokable<OUT, R>(
+				flatMapper));
 	}
 
 	/**
@@ -434,9 +427,9 @@ public class DataStream<OUT> {
 	 * @return The transformed DataStream.
 	 */
 	public SingleOutputStreamOperator<OUT, ?> reduce(ReduceFunction<OUT> reducer) {
-		return addFunction("reduce", reducer, new FunctionTypeWrapper<OUT>(reducer,
-				ReduceFunction.class, 0), new FunctionTypeWrapper<OUT>(reducer,
-				ReduceFunction.class, 0), new StreamReduceInvokable<OUT>(reducer));
+
+		return addFunction("reduce", reducer, getType(), getType(), new StreamReduceInvokable<OUT>(
+				reducer));
 	}
 
 	/**
@@ -454,11 +447,7 @@ public class DataStream<OUT> {
 	 * @return The filtered DataStream.
 	 */
 	public SingleOutputStreamOperator<OUT, ?> filter(FilterFunction<OUT> filter) {
-		FunctionTypeWrapper<OUT> typeWrapper = new FunctionTypeWrapper<OUT>(filter,
-				FilterFunction.class, 0);
-
-		return addFunction("filter", filter, typeWrapper, typeWrapper, new FilterInvokable<OUT>(
-				filter));
+		return addFunction("filter", filter, getType(), getType(), new FilterInvokable<OUT>(filter));
 	}
 
 	/**
@@ -543,7 +532,7 @@ public class DataStream<OUT> {
 	public SingleOutputStreamOperator<OUT, ?> sum(int positionToSum) {
 		checkFieldRange(positionToSum);
 		return aggregate((AggregationFunction<OUT>) SumAggregator.getSumFunction(positionToSum,
-				getClassAtPos(positionToSum), getOutputType()));
+				getClassAtPos(positionToSum), getType()));
 	}
 
 	/**
@@ -559,8 +548,7 @@ public class DataStream<OUT> {
 	 * @return The transformed DataStream.
 	 */
 	public SingleOutputStreamOperator<OUT, ?> sum(String field) {
-		return aggregate((AggregationFunction<OUT>) SumAggregator.getSumFunction(field,
-				getOutputType()));
+		return aggregate((AggregationFunction<OUT>) SumAggregator.getSumFunction(field, getType()));
 	}
 
 	/**
@@ -573,7 +561,7 @@ public class DataStream<OUT> {
 	 */
 	public SingleOutputStreamOperator<OUT, ?> min(int positionToMin) {
 		checkFieldRange(positionToMin);
-		return aggregate(ComparableAggregator.getAggregator(positionToMin, getOutputType(),
+		return aggregate(ComparableAggregator.getAggregator(positionToMin, getType(),
 				AggregationType.MIN));
 	}
 
@@ -590,8 +578,8 @@ public class DataStream<OUT> {
 	 * @return The transformed DataStream.
 	 */
 	public SingleOutputStreamOperator<OUT, ?> min(String field) {
-		return aggregate(ComparableAggregator.getAggregator(field, getOutputType(),
-				AggregationType.MIN, false));
+		return aggregate(ComparableAggregator.getAggregator(field, getType(), AggregationType.MIN,
+				false));
 	}
 
 	/**
@@ -604,7 +592,7 @@ public class DataStream<OUT> {
 	 */
 	public SingleOutputStreamOperator<OUT, ?> max(int positionToMax) {
 		checkFieldRange(positionToMax);
-		return aggregate(ComparableAggregator.getAggregator(positionToMax, getOutputType(),
+		return aggregate(ComparableAggregator.getAggregator(positionToMax, getType(),
 				AggregationType.MAX));
 	}
 
@@ -621,8 +609,8 @@ public class DataStream<OUT> {
 	 * @return The transformed DataStream.
 	 */
 	public SingleOutputStreamOperator<OUT, ?> max(String field) {
-		return aggregate(ComparableAggregator.getAggregator(field, getOutputType(),
-				AggregationType.MAX, false));
+		return aggregate(ComparableAggregator.getAggregator(field, getType(), AggregationType.MAX,
+				false));
 	}
 
 	/**
@@ -641,7 +629,7 @@ public class DataStream<OUT> {
 	 * @return The transformed DataStream.
 	 */
 	public SingleOutputStreamOperator<OUT, ?> minBy(String field, boolean first) {
-		return aggregate(ComparableAggregator.getAggregator(field, getOutputType(),
+		return aggregate(ComparableAggregator.getAggregator(field, getType(),
 				AggregationType.MINBY, first));
 	}
 
@@ -661,7 +649,7 @@ public class DataStream<OUT> {
 	 * @return The transformed DataStream.
 	 */
 	public SingleOutputStreamOperator<OUT, ?> maxBy(String field, boolean first) {
-		return aggregate(ComparableAggregator.getAggregator(field, getOutputType(),
+		return aggregate(ComparableAggregator.getAggregator(field, getType(),
 				AggregationType.MAXBY, first));
 	}
 
@@ -694,7 +682,7 @@ public class DataStream<OUT> {
 	 */
 	public SingleOutputStreamOperator<OUT, ?> minBy(int positionToMinBy, boolean first) {
 		checkFieldRange(positionToMinBy);
-		return aggregate(ComparableAggregator.getAggregator(positionToMinBy, getOutputType(),
+		return aggregate(ComparableAggregator.getAggregator(positionToMinBy, getType(),
 				AggregationType.MINBY, first));
 	}
 
@@ -727,7 +715,7 @@ public class DataStream<OUT> {
 	 */
 	public SingleOutputStreamOperator<OUT, ?> maxBy(int positionToMaxBy, boolean first) {
 		checkFieldRange(positionToMaxBy);
-		return aggregate(ComparableAggregator.getAggregator(positionToMaxBy, getOutputType(),
+		return aggregate(ComparableAggregator.getAggregator(positionToMaxBy, getType(),
 				AggregationType.MAXBY, first));
 	}
 
@@ -737,11 +725,9 @@ public class DataStream<OUT> {
 	 * @return The transformed DataStream.
 	 */
 	public SingleOutputStreamOperator<Long, ?> count() {
-		TypeWrapper<OUT> inTypeWrapper = outTypeWrapper;
-		TypeWrapper<Long> outTypeWrapper = new ObjectTypeWrapper<Long>(Long.valueOf(0));
+		TypeInformation<Long> outTypeInfo = TypeExtractor.getForObject(Long.valueOf(0));
 
-		return addFunction("counter", null, inTypeWrapper, outTypeWrapper,
-				new CounterInvokable<OUT>());
+		return addFunction("counter", null, getType(), outTypeInfo, new CounterInvokable<OUT>());
 	}
 
 	/**
@@ -803,7 +789,7 @@ public class DataStream<OUT> {
 	public DataStreamSink<OUT> print() {
 		DataStream<OUT> inputStream = this.copy();
 		PrintSinkFunction<OUT> printFunction = new PrintSinkFunction<OUT>();
-		DataStreamSink<OUT> returnStream = addSink(inputStream, printFunction, outTypeWrapper);
+		DataStreamSink<OUT> returnStream = addSink(inputStream, printFunction, getType());
 
 		return returnStream;
 	}
@@ -923,7 +909,7 @@ public class DataStream<OUT> {
 	private DataStreamSink<OUT> writeAsText(DataStream<OUT> inputStream, String path,
 			WriteFormatAsText<OUT> format, long millis, OUT endTuple) {
 		DataStreamSink<OUT> returnStream = addSink(inputStream, new WriteSinkFunctionByMillis<OUT>(
-				path, format, millis, endTuple), inputStream.outTypeWrapper);
+				path, format, millis, endTuple), inputStream.typeInfo);
 		jobGraphBuilder.setMutability(returnStream.getId(), false);
 		return returnStream;
 	}
@@ -951,7 +937,7 @@ public class DataStream<OUT> {
 			WriteFormatAsText<OUT> format, int batchSize, OUT endTuple) {
 		DataStreamSink<OUT> returnStream = addSink(inputStream,
 				new WriteSinkFunctionByBatches<OUT>(path, format, batchSize, endTuple),
-				inputStream.outTypeWrapper);
+				inputStream.typeInfo);
 		jobGraphBuilder.setMutability(returnStream.getId(), false);
 		return returnStream;
 	}
@@ -1074,7 +1060,7 @@ public class DataStream<OUT> {
 	private DataStreamSink<OUT> writeAsCsv(DataStream<OUT> inputStream, String path,
 			WriteFormatAsCsv<OUT> format, long millis, OUT endTuple) {
 		DataStreamSink<OUT> returnStream = addSink(inputStream, new WriteSinkFunctionByMillis<OUT>(
-				path, format, millis, endTuple), inputStream.outTypeWrapper);
+				path, format, millis, endTuple), inputStream.typeInfo);
 		jobGraphBuilder.setMutability(returnStream.getId(), false);
 		return returnStream;
 	}
@@ -1102,7 +1088,7 @@ public class DataStream<OUT> {
 			WriteFormatAsCsv<OUT> format, int batchSize, OUT endTuple) {
 		DataStreamSink<OUT> returnStream = addSink(inputStream,
 				new WriteSinkFunctionByBatches<OUT>(path, format, batchSize, endTuple),
-				inputStream.outTypeWrapper);
+				inputStream.typeInfo);
 		jobGraphBuilder.setMutability(returnStream.getId(), false);
 		return returnStream;
 	}
@@ -1112,7 +1098,7 @@ public class DataStream<OUT> {
 		StreamReduceInvokable<OUT> invokable = new StreamReduceInvokable<OUT>(aggregate);
 
 		SingleOutputStreamOperator<OUT, ?> returnStream = addFunction("reduce", aggregate,
-				outTypeWrapper, outTypeWrapper, invokable);
+				typeInfo, typeInfo, invokable);
 
 		return returnStream;
 	}
@@ -1142,16 +1128,16 @@ public class DataStream<OUT> {
 	 * @return the data stream constructed
 	 */
 	protected <R> SingleOutputStreamOperator<R, ?> addFunction(String functionName,
-			final Function function, TypeWrapper<OUT> inTypeWrapper, TypeWrapper<R> outTypeWrapper,
-			StreamInvokable<OUT, R> functionInvokable) {
+			final Function function, TypeInformation<OUT> inTypeInfo,
+			TypeInformation<R> outTypeInfo, StreamInvokable<OUT, R> functionInvokable) {
 		DataStream<OUT> inputStream = this.copy();
 		@SuppressWarnings({ "unchecked", "rawtypes" })
 		SingleOutputStreamOperator<R, ?> returnStream = new SingleOutputStreamOperator(environment,
-				functionName, outTypeWrapper);
+				functionName, outTypeInfo);
 
 		try {
-			jobGraphBuilder.addStreamVertex(returnStream.getId(), functionInvokable, inTypeWrapper,
-					outTypeWrapper, functionName,
+			jobGraphBuilder.addStreamVertex(returnStream.getId(), functionInvokable, inTypeInfo,
+					outTypeInfo, functionName,
 					SerializationUtils.serialize((Serializable) function), degreeOfParallelism);
 		} catch (SerializationException e) {
 			throw new RuntimeException("Cannot serialize user defined function");
@@ -1220,18 +1206,16 @@ public class DataStream<OUT> {
 	}
 
 	private DataStreamSink<OUT> addSink(DataStream<OUT> inputStream, SinkFunction<OUT> sinkFunction) {
-		return addSink(inputStream, sinkFunction, new FunctionTypeWrapper<OUT>(sinkFunction,
-				SinkFunction.class, 0));
+		return addSink(inputStream, sinkFunction, getType());
 	}
 
 	private DataStreamSink<OUT> addSink(DataStream<OUT> inputStream,
-			SinkFunction<OUT> sinkFunction, TypeWrapper<OUT> inTypeWrapper) {
-		DataStreamSink<OUT> returnStream = new DataStreamSink<OUT>(environment, "sink",
-				outTypeWrapper);
+			SinkFunction<OUT> sinkFunction, TypeInformation<OUT> inTypeInfo) {
+		DataStreamSink<OUT> returnStream = new DataStreamSink<OUT>(environment, "sink", typeInfo);
 
 		try {
 			jobGraphBuilder.addStreamVertex(returnStream.getId(), new SinkInvokable<OUT>(
-					sinkFunction), inTypeWrapper, null, "sink", SerializationUtils
+					sinkFunction), inTypeInfo, null, "sink", SerializationUtils
 					.serialize(sinkFunction), degreeOfParallelism);
 		} catch (SerializationException e) {
 			throw new RuntimeException("Cannot serialize SinkFunction");
@@ -1252,7 +1236,7 @@ public class DataStream<OUT> {
 	@SuppressWarnings("rawtypes")
 	protected Class<?> getClassAtPos(int pos) {
 		Class<?> type;
-		TypeInformation<OUT> outTypeInfo = outTypeWrapper.getTypeInfo();
+		TypeInformation<OUT> outTypeInfo = getType();
 		if (outTypeInfo.isTupleType()) {
 			type = ((TupleTypeInfo) outTypeInfo).getTypeAt(pos).getTypeClass();
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/51c1f677/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamSink.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamSink.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamSink.java
index 6bf6f43..369c3eb 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamSink.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamSink.java
@@ -1,44 +1,45 @@
 /*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
  * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.datastream;
-
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.util.serialization.TypeWrapper;
-
-/**
- * Represents the end of a DataStream.
- *
- * @param <IN>
- *            The type of the DataStream closed by the sink.
- */
-public class DataStreamSink<IN> extends SingleOutputStreamOperator<IN, DataStreamSink<IN>> {
-
-	protected DataStreamSink(StreamExecutionEnvironment environment, String operatorType, TypeWrapper<IN> outTypeWrapper) {
-		super(environment, operatorType, outTypeWrapper);
-	}
-
-	protected DataStreamSink(DataStream<IN> dataStream) {
-		super(dataStream);
-	}
-
-	@Override
-	protected DataStreamSink<IN> copy() {
-		throw new RuntimeException("Data stream sinks cannot be copied");
-	}
-
-}
+ */
+
+package org.apache.flink.streaming.api.datastream;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+
+/**
+ * Represents the end of a DataStream.
+ *
+ * @param <IN>
+ *            The type of the DataStream closed by the sink.
+ */
+public class DataStreamSink<IN> extends SingleOutputStreamOperator<IN, DataStreamSink<IN>> {
+
+	protected DataStreamSink(StreamExecutionEnvironment environment, String operatorType,
+			TypeInformation<IN> outTypeInfo) {
+		super(environment, operatorType, outTypeInfo);
+	}
+
+	protected DataStreamSink(DataStream<IN> dataStream) {
+		super(dataStream);
+	}
+
+	@Override
+	protected DataStreamSink<IN> copy() {
+		throw new RuntimeException("Data stream sinks cannot be copied");
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/51c1f677/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamSource.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamSource.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamSource.java
index 5b2747f..978ea42 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamSource.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamSource.java
@@ -17,8 +17,8 @@
 
 package org.apache.flink.streaming.api.datastream;
 
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.util.serialization.TypeWrapper;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 
 /**
  * The DataStreamSource represents the starting point of a DataStream.
@@ -28,8 +28,8 @@ import org.apache.flink.streaming.util.serialization.TypeWrapper;
  */
 public class DataStreamSource<OUT> extends SingleOutputStreamOperator<OUT, DataStreamSource<OUT>> {
 
-	public DataStreamSource(StreamExecutionEnvironment environment, String operatorType, TypeWrapper<OUT> outTypeWrapper) {
-		super(environment, operatorType, outTypeWrapper);
+	public DataStreamSource(StreamExecutionEnvironment environment, String operatorType, TypeInformation<OUT> outTypeInfo) {
+		super(environment, operatorType, outTypeInfo);
 	}
 
 	public DataStreamSource(DataStream<OUT> dataStream) {

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/51c1f677/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/GroupedDataStream.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/GroupedDataStream.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/GroupedDataStream.java
index 6b07cca..32f664f 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/GroupedDataStream.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/GroupedDataStream.java
@@ -23,7 +23,6 @@ import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.streaming.api.function.aggregation.AggregationFunction;
 import org.apache.flink.streaming.api.invokable.operator.GroupedReduceInvokable;
 import org.apache.flink.streaming.partitioner.StreamPartitioner;
-import org.apache.flink.streaming.util.serialization.FunctionTypeWrapper;
 
 /**
  * A GroupedDataStream represents a {@link DataStream} which has been
@@ -62,9 +61,8 @@ public class GroupedDataStream<OUT> extends DataStream<OUT> {
 	 * @return The transformed DataStream.
 	 */
 	public SingleOutputStreamOperator<OUT, ?> reduce(ReduceFunction<OUT> reducer) {
-		return addFunction("groupReduce", reducer, new FunctionTypeWrapper<OUT>(reducer,
-				ReduceFunction.class, 0), new FunctionTypeWrapper<OUT>(reducer,
-				ReduceFunction.class, 0), new GroupedReduceInvokable<OUT>(reducer, keySelector));
+		return addFunction("groupReduce", reducer, getType(), getType(),
+				new GroupedReduceInvokable<OUT>(reducer, keySelector));
 	}
 
 	/**
@@ -184,7 +182,7 @@ public class GroupedDataStream<OUT> extends DataStream<OUT> {
 				keySelector);
 
 		SingleOutputStreamOperator<OUT, ?> returnStream = addFunction("groupReduce", aggregate,
-				outTypeWrapper, outTypeWrapper, invokable);
+				typeInfo, typeInfo, invokable);
 
 		return returnStream;
 	}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/51c1f677/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java
index 714807c..76da27c 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java
@@ -23,12 +23,12 @@ import java.util.Map.Entry;
 import org.apache.commons.lang3.SerializationException;
 import org.apache.commons.lang3.SerializationUtils;
 import org.apache.flink.api.common.functions.RichFunction;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.streaming.api.collector.OutputSelector;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.streamvertex.StreamingRuntimeContext;
 import org.apache.flink.streaming.state.OperatorState;
-import org.apache.flink.streaming.util.serialization.TypeWrapper;
 
 /**
  * The SingleOutputStreamOperator represents a user defined transformation
@@ -43,8 +43,8 @@ public class SingleOutputStreamOperator<OUT, O extends SingleOutputStreamOperato
 		DataStream<OUT> {
 
 	protected SingleOutputStreamOperator(StreamExecutionEnvironment environment,
-			String operatorType, TypeWrapper<OUT> outTypeWrapper) {
-		super(environment, operatorType, outTypeWrapper);
+			String operatorType, TypeInformation<OUT> outTypeInfo) {
+		super(environment, operatorType, outTypeInfo);
 		setBufferTimeout(environment.getBufferTimeout());
 	}
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/51c1f677/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SplitDataStream.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SplitDataStream.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SplitDataStream.java
index e1c091c..5a8f038 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SplitDataStream.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SplitDataStream.java
@@ -44,7 +44,7 @@ public class SplitDataStream<OUT> {
 	 * @return The output type.
 	 */
 	public TypeInformation<OUT> getOutputType() {
-		return dataStream.getOutputType();
+		return dataStream.getType();
 	}
 	
 	/**

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/51c1f677/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/StreamJoinOperator.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/StreamJoinOperator.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/StreamJoinOperator.java
index ba6e75e..89c80ab 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/StreamJoinOperator.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/StreamJoinOperator.java
@@ -60,7 +60,7 @@ public class StreamJoinOperator<I1, I2> extends
 		 */
 		public JoinPredicate<I1, I2> where(int... fields) {
 			return new JoinPredicate<I1, I2>(op, FieldsKeySelector.getSelector(
-					op.input1.getOutputType(), fields));
+					op.input1.getType(), fields));
 		}
 
 		/**
@@ -76,7 +76,7 @@ public class StreamJoinOperator<I1, I2> extends
 		 *         {@link JoinPredicate#equalTo} to continue the Join.
 		 */
 		public JoinPredicate<I1, I2> where(String... fields) {
-			return new JoinPredicate<I1, I2>(op, new PojoKeySelector<I1>(op.input1.getOutputType(),
+			return new JoinPredicate<I1, I2>(op, new PojoKeySelector<I1>(op.input1.getType(),
 					fields));
 		}
 
@@ -135,7 +135,7 @@ public class StreamJoinOperator<I1, I2> extends
 		 * @return The joined data stream.
 		 */
 		public SingleOutputStreamOperator<Tuple2<I1, I2>, ?> equalTo(int... fields) {
-			return createJoinOperator(FieldsKeySelector.getSelector(op.input2.getOutputType(),
+			return createJoinOperator(FieldsKeySelector.getSelector(op.input2.getType(),
 					fields));
 		}
 
@@ -154,7 +154,7 @@ public class StreamJoinOperator<I1, I2> extends
 		 * @return The joined data stream.
 		 */
 		public SingleOutputStreamOperator<Tuple2<I1, I2>, ?> equalTo(String... fields) {
-			return createJoinOperator(new PojoKeySelector<I2>(op.input2.getOutputType(), fields));
+			return createJoinOperator(new PojoKeySelector<I2>(op.input2.getType(), fields));
 		}
 
 		/**

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/51c1f677/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/StreamProjection.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/StreamProjection.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/StreamProjection.java
index 265e033..cc5f66e 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/StreamProjection.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/StreamProjection.java
@@ -17,6 +17,7 @@
 
 package org.apache.flink.streaming.api.datastream;
 
+import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.tuple.Tuple;
 import org.apache.flink.api.java.tuple.Tuple1;
 import org.apache.flink.api.java.tuple.Tuple10;
@@ -43,21 +44,20 @@ import org.apache.flink.api.java.tuple.Tuple6;
 import org.apache.flink.api.java.tuple.Tuple7;
 import org.apache.flink.api.java.tuple.Tuple8;
 import org.apache.flink.api.java.tuple.Tuple9;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
 import org.apache.flink.streaming.api.invokable.operator.ProjectInvokable;
-import org.apache.flink.streaming.util.serialization.ProjectTypeWrapper;
-import org.apache.flink.streaming.util.serialization.TypeWrapper;
 
 public class StreamProjection<IN> {
 
 	private DataStream<IN> dataStream;
 	private int[] fieldIndexes;
-	private TypeWrapper<IN> inTypeWrapper;
+	private TypeInformation<IN> inTypeInfo;
 
 	protected StreamProjection(DataStream<IN> dataStream, int[] fieldIndexes) {
 		this.dataStream = dataStream;
 		this.fieldIndexes = fieldIndexes;
-		this.inTypeWrapper = dataStream.outTypeWrapper;
-		if (!inTypeWrapper.getTypeInfo().isTupleType()) {
+		this.inTypeInfo = dataStream.typeInfo;
+		if (!inTypeInfo.isTupleType()) {
 			throw new RuntimeException("Only Tuple DataStreams can be projected");
 		}
 	}
@@ -80,12 +80,11 @@ public class StreamProjection<IN> {
 					"Numbers of projected fields and types do not match.");
 		}
 
-		TypeWrapper<Tuple1<T0>> outTypeWrapper = new ProjectTypeWrapper<IN, Tuple1<T0>>(
-				inTypeWrapper, fieldIndexes, types);
-
-		return dataStream.addFunction("projection", null, inTypeWrapper, outTypeWrapper,
-				new ProjectInvokable<IN, Tuple1<T0>>(fieldIndexes, outTypeWrapper));
-
+		@SuppressWarnings("unchecked")
+		TypeInformation<Tuple1<T0>> outType = (TypeInformation<Tuple1<T0>>) extractFieldTypes(
+				fieldIndexes, types, inTypeInfo);
+		return dataStream.addFunction("projection", null, inTypeInfo, outType,
+				new ProjectInvokable<IN, Tuple1<T0>>(fieldIndexes, outType));
 	}
 
 	/**
@@ -109,11 +108,11 @@ public class StreamProjection<IN> {
 					"Numbers of projected fields and types do not match.");
 		}
 
-		TypeWrapper<Tuple2<T0, T1>> outTypeWrapper = new ProjectTypeWrapper<IN, Tuple2<T0, T1>>(
-				inTypeWrapper, fieldIndexes, types);
-
-		return dataStream.addFunction("projection", null, inTypeWrapper, outTypeWrapper,
-				new ProjectInvokable<IN, Tuple2<T0, T1>>(fieldIndexes, outTypeWrapper));
+		@SuppressWarnings("unchecked")
+		TypeInformation<Tuple2<T0, T1>> outType = (TypeInformation<Tuple2<T0, T1>>) extractFieldTypes(
+				fieldIndexes, types, inTypeInfo);
+		return dataStream.addFunction("projection", null, inTypeInfo, outType,
+				new ProjectInvokable<IN, Tuple2<T0, T1>>(fieldIndexes, outType));
 	}
 
 	/**
@@ -139,12 +138,11 @@ public class StreamProjection<IN> {
 					"Numbers of projected fields and types do not match.");
 		}
 
-		TypeWrapper<Tuple3<T0, T1, T2>> outTypeWrapper = new ProjectTypeWrapper<IN, Tuple3<T0, T1, T2>>(
-				inTypeWrapper, fieldIndexes, types);
-
-		return dataStream.addFunction("projection", null, inTypeWrapper, outTypeWrapper,
-				new ProjectInvokable<IN, Tuple3<T0, T1, T2>>(fieldIndexes, outTypeWrapper));
-
+		@SuppressWarnings("unchecked")
+		TypeInformation<Tuple3<T0, T1, T2>> outType = (TypeInformation<Tuple3<T0, T1, T2>>) extractFieldTypes(
+				fieldIndexes, types, inTypeInfo);
+		return dataStream.addFunction("projection", null, inTypeInfo, outType,
+				new ProjectInvokable<IN, Tuple3<T0, T1, T2>>(fieldIndexes, outType));
 	}
 
 	/**
@@ -172,12 +170,11 @@ public class StreamProjection<IN> {
 					"Numbers of projected fields and types do not match.");
 		}
 
-		TypeWrapper<Tuple4<T0, T1, T2, T3>> outTypeWrapper = new ProjectTypeWrapper<IN, Tuple4<T0, T1, T2, T3>>(
-				inTypeWrapper, fieldIndexes, types);
-
-		return dataStream.addFunction("projection", null, inTypeWrapper, outTypeWrapper,
-				new ProjectInvokable<IN, Tuple4<T0, T1, T2, T3>>(fieldIndexes, outTypeWrapper));
-
+		@SuppressWarnings("unchecked")
+		TypeInformation<Tuple4<T0, T1, T2, T3>> outType = (TypeInformation<Tuple4<T0, T1, T2, T3>>) extractFieldTypes(
+				fieldIndexes, types, inTypeInfo);
+		return dataStream.addFunction("projection", null, inTypeInfo, outType,
+				new ProjectInvokable<IN, Tuple4<T0, T1, T2, T3>>(fieldIndexes, outType));
 	}
 
 	/**
@@ -206,13 +203,11 @@ public class StreamProjection<IN> {
 			throw new IllegalArgumentException(
 					"Numbers of projected fields and types do not match.");
 		}
-
-		TypeWrapper<Tuple5<T0, T1, T2, T3, T4>> outTypeWrapper = new ProjectTypeWrapper<IN, Tuple5<T0, T1, T2, T3, T4>>(
-				inTypeWrapper, fieldIndexes, types);
-
-		return dataStream.addFunction("projection", null, inTypeWrapper, outTypeWrapper,
-				new ProjectInvokable<IN, Tuple5<T0, T1, T2, T3, T4>>(fieldIndexes, outTypeWrapper));
-
+		@SuppressWarnings("unchecked")
+		TypeInformation<Tuple5<T0, T1, T2, T3, T4>> outType = (TypeInformation<Tuple5<T0, T1, T2, T3, T4>>) extractFieldTypes(
+				fieldIndexes, types, inTypeInfo);
+		return dataStream.addFunction("projection", null, inTypeInfo, outType,
+				new ProjectInvokable<IN, Tuple5<T0, T1, T2, T3, T4>>(fieldIndexes, outType));
 	}
 
 	/**
@@ -245,12 +240,11 @@ public class StreamProjection<IN> {
 					"Numbers of projected fields and types do not match.");
 		}
 
-		TypeWrapper<Tuple6<T0, T1, T2, T3, T4, T5>> outTypeWrapper = new ProjectTypeWrapper<IN, Tuple6<T0, T1, T2, T3, T4, T5>>(
-				inTypeWrapper, fieldIndexes, types);
-
-		return dataStream.addFunction("projection", null, inTypeWrapper, outTypeWrapper,
-				new ProjectInvokable<IN, Tuple6<T0, T1, T2, T3, T4, T5>>(fieldIndexes,
-						outTypeWrapper));
+		@SuppressWarnings("unchecked")
+		TypeInformation<Tuple6<T0, T1, T2, T3, T4, T5>> outType = (TypeInformation<Tuple6<T0, T1, T2, T3, T4, T5>>) extractFieldTypes(
+				fieldIndexes, types, inTypeInfo);
+		return dataStream.addFunction("projection", null, inTypeInfo, outType,
+				new ProjectInvokable<IN, Tuple6<T0, T1, T2, T3, T4, T5>>(fieldIndexes, outType));
 	}
 
 	/**
@@ -284,12 +278,14 @@ public class StreamProjection<IN> {
 			throw new IllegalArgumentException(
 					"Numbers of projected fields and types do not match.");
 		}
-		TypeWrapper<Tuple7<T0, T1, T2, T3, T4, T5, T6>> outTypeWrapper = new ProjectTypeWrapper<IN, Tuple7<T0, T1, T2, T3, T4, T5, T6>>(
-				inTypeWrapper, fieldIndexes, types);
-		return dataStream.addFunction("projection", null, inTypeWrapper, outTypeWrapper,
-				new ProjectInvokable<IN, Tuple7<T0, T1, T2, T3, T4, T5, T6>>(fieldIndexes,
-						outTypeWrapper));
 
+		@SuppressWarnings("unchecked")
+		TypeInformation<Tuple7<T0, T1, T2, T3, T4, T5, T6>> outType = (TypeInformation<Tuple7<T0, T1, T2, T3, T4, T5, T6>>) extractFieldTypes(
+				fieldIndexes, types, inTypeInfo);
+		return dataStream
+				.addFunction("projection", null, inTypeInfo, outType,
+						new ProjectInvokable<IN, Tuple7<T0, T1, T2, T3, T4, T5, T6>>(fieldIndexes,
+								outType));
 	}
 
 	/**
@@ -325,12 +321,13 @@ public class StreamProjection<IN> {
 			throw new IllegalArgumentException(
 					"Numbers of projected fields and types do not match.");
 		}
-		TypeWrapper<Tuple8<T0, T1, T2, T3, T4, T5, T6, T7>> outTypeWrapper = new ProjectTypeWrapper<IN, Tuple8<T0, T1, T2, T3, T4, T5, T6, T7>>(
-				inTypeWrapper, fieldIndexes, types);
-		return dataStream.addFunction("projection", null, inTypeWrapper, outTypeWrapper,
-				new ProjectInvokable<IN, Tuple8<T0, T1, T2, T3, T4, T5, T6, T7>>(fieldIndexes,
-						outTypeWrapper));
 
+		@SuppressWarnings("unchecked")
+		TypeInformation<Tuple8<T0, T1, T2, T3, T4, T5, T6, T7>> outType = (TypeInformation<Tuple8<T0, T1, T2, T3, T4, T5, T6, T7>>) extractFieldTypes(
+				fieldIndexes, types, inTypeInfo);
+		return dataStream.addFunction("projection", null, inTypeInfo, outType,
+				new ProjectInvokable<IN, Tuple8<T0, T1, T2, T3, T4, T5, T6, T7>>(fieldIndexes,
+						outType));
 	}
 
 	/**
@@ -369,11 +366,12 @@ public class StreamProjection<IN> {
 					"Numbers of projected fields and types do not match.");
 		}
 
-		TypeWrapper<Tuple9<T0, T1, T2, T3, T4, T5, T6, T7, T8>> outTypeWrapper = new ProjectTypeWrapper<IN, Tuple9<T0, T1, T2, T3, T4, T5, T6, T7, T8>>(
-				inTypeWrapper, fieldIndexes, types);
-		return dataStream.addFunction("projection", null, inTypeWrapper, outTypeWrapper,
+		@SuppressWarnings("unchecked")
+		TypeInformation<Tuple9<T0, T1, T2, T3, T4, T5, T6, T7, T8>> outType = (TypeInformation<Tuple9<T0, T1, T2, T3, T4, T5, T6, T7, T8>>) extractFieldTypes(
+				fieldIndexes, types, inTypeInfo);
+		return dataStream.addFunction("projection", null, inTypeInfo, outType,
 				new ProjectInvokable<IN, Tuple9<T0, T1, T2, T3, T4, T5, T6, T7, T8>>(fieldIndexes,
-						outTypeWrapper));
+						outType));
 	}
 
 	/**
@@ -414,11 +412,12 @@ public class StreamProjection<IN> {
 					"Numbers of projected fields and types do not match.");
 		}
 
-		TypeWrapper<Tuple10<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9>> outTypeWrapper = new ProjectTypeWrapper<IN, Tuple10<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9>>(
-				inTypeWrapper, fieldIndexes, types);
-		return dataStream.addFunction("projection", null, inTypeWrapper, outTypeWrapper,
+		@SuppressWarnings("unchecked")
+		TypeInformation<Tuple10<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9>> outType = (TypeInformation<Tuple10<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9>>) extractFieldTypes(
+				fieldIndexes, types, inTypeInfo);
+		return dataStream.addFunction("projection", null, inTypeInfo, outType,
 				new ProjectInvokable<IN, Tuple10<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9>>(
-						fieldIndexes, outTypeWrapper));
+						fieldIndexes, outType));
 	}
 
 	/**
@@ -462,12 +461,13 @@ public class StreamProjection<IN> {
 			throw new IllegalArgumentException(
 					"Numbers of projected fields and types do not match.");
 		}
-		TypeWrapper<Tuple11<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10>> outTypeWrapper = new ProjectTypeWrapper<IN, Tuple11<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10>>(
-				inTypeWrapper, fieldIndexes, types);
-		return dataStream.addFunction("projection", null, inTypeWrapper, outTypeWrapper,
-				new ProjectInvokable<IN, Tuple11<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10>>(
-						fieldIndexes, outTypeWrapper));
 
+		@SuppressWarnings("unchecked")
+		TypeInformation<Tuple11<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10>> outType = (TypeInformation<Tuple11<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10>>) extractFieldTypes(
+				fieldIndexes, types, inTypeInfo);
+		return dataStream.addFunction("projection", null, inTypeInfo, outType,
+				new ProjectInvokable<IN, Tuple11<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10>>(
+						fieldIndexes, outType));
 	}
 
 	/**
@@ -513,17 +513,18 @@ public class StreamProjection<IN> {
 			throw new IllegalArgumentException(
 					"Numbers of projected fields and types do not match.");
 		}
-		TypeWrapper<Tuple12<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11>> outTypeWrapper = new ProjectTypeWrapper<IN, Tuple12<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11>>(
-				inTypeWrapper, fieldIndexes, types);
+
+		@SuppressWarnings("unchecked")
+		TypeInformation<Tuple12<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11>> outType = (TypeInformation<Tuple12<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11>>) extractFieldTypes(
+				fieldIndexes, types, inTypeInfo);
 		return dataStream
 				.addFunction(
 						"projection",
 						null,
-						inTypeWrapper,
-						outTypeWrapper,
+						inTypeInfo,
+						outType,
 						new ProjectInvokable<IN, Tuple12<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11>>(
-								fieldIndexes, outTypeWrapper));
-
+								fieldIndexes, outType));
 	}
 
 	/**
@@ -572,16 +573,17 @@ public class StreamProjection<IN> {
 					"Numbers of projected fields and types do not match.");
 		}
 
-		TypeWrapper<Tuple13<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12>> outTypeWrapper = new ProjectTypeWrapper<IN, Tuple13<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12>>(
-				inTypeWrapper, fieldIndexes, types);
+		@SuppressWarnings("unchecked")
+		TypeInformation<Tuple13<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12>> outType = (TypeInformation<Tuple13<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12>>) extractFieldTypes(
+				fieldIndexes, types, inTypeInfo);
 		return dataStream
 				.addFunction(
 						"projection",
 						null,
-						inTypeWrapper,
-						outTypeWrapper,
+						inTypeInfo,
+						outType,
 						new ProjectInvokable<IN, Tuple13<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12>>(
-								fieldIndexes, outTypeWrapper));
+								fieldIndexes, outType));
 	}
 
 	/**
@@ -632,17 +634,17 @@ public class StreamProjection<IN> {
 					"Numbers of projected fields and types do not match.");
 		}
 
-		TypeWrapper<Tuple14<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13>> outTypeWrapper = new ProjectTypeWrapper<IN, Tuple14<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13>>(
-				inTypeWrapper, fieldIndexes, types);
+		@SuppressWarnings("unchecked")
+		TypeInformation<Tuple14<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13>> outType = (TypeInformation<Tuple14<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13>>) extractFieldTypes(
+				fieldIndexes, types, inTypeInfo);
 		return dataStream
 				.addFunction(
 						"projection",
 						null,
-						inTypeWrapper,
-						outTypeWrapper,
+						inTypeInfo,
+						outType,
 						new ProjectInvokable<IN, Tuple14<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13>>(
-								fieldIndexes, outTypeWrapper));
-
+								fieldIndexes, outType));
 	}
 
 	/**
@@ -696,17 +698,17 @@ public class StreamProjection<IN> {
 					"Numbers of projected fields and types do not match.");
 		}
 
-		TypeWrapper<Tuple15<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14>> outTypeWrapper = new ProjectTypeWrapper<IN, Tuple15<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14>>(
-				inTypeWrapper, fieldIndexes, types);
+		@SuppressWarnings("unchecked")
+		TypeInformation<Tuple15<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14>> outType = (TypeInformation<Tuple15<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14>>) extractFieldTypes(
+				fieldIndexes, types, inTypeInfo);
 		return dataStream
 				.addFunction(
 						"projection",
 						null,
-						inTypeWrapper,
-						outTypeWrapper,
+						inTypeInfo,
+						outType,
 						new ProjectInvokable<IN, Tuple15<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14>>(
-								fieldIndexes, outTypeWrapper));
-
+								fieldIndexes, outType));
 	}
 
 	/**
@@ -762,17 +764,17 @@ public class StreamProjection<IN> {
 					"Numbers of projected fields and types do not match.");
 		}
 
-		TypeWrapper<Tuple16<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15>> outTypeWrapper = new ProjectTypeWrapper<IN, Tuple16<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15>>(
-				inTypeWrapper, fieldIndexes, types);
+		@SuppressWarnings("unchecked")
+		TypeInformation<Tuple16<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15>> outType = (TypeInformation<Tuple16<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15>>) extractFieldTypes(
+				fieldIndexes, types, inTypeInfo);
 		return dataStream
 				.addFunction(
 						"projection",
 						null,
-						inTypeWrapper,
-						outTypeWrapper,
+						inTypeInfo,
+						outType,
 						new ProjectInvokable<IN, Tuple16<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15>>(
-								fieldIndexes, outTypeWrapper));
-
+								fieldIndexes, outType));
 	}
 
 	/**
@@ -830,16 +832,17 @@ public class StreamProjection<IN> {
 					"Numbers of projected fields and types do not match.");
 		}
 
-		TypeWrapper<Tuple17<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16>> outTypeWrapper = new ProjectTypeWrapper<IN, Tuple17<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16>>(
-				inTypeWrapper, fieldIndexes, types);
+		@SuppressWarnings("unchecked")
+		TypeInformation<Tuple17<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16>> outType = (TypeInformation<Tuple17<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16>>) extractFieldTypes(
+				fieldIndexes, types, inTypeInfo);
 		return dataStream
 				.addFunction(
 						"projection",
 						null,
-						inTypeWrapper,
-						outTypeWrapper,
+						inTypeInfo,
+						outType,
 						new ProjectInvokable<IN, Tuple17<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16>>(
-								fieldIndexes, outTypeWrapper));
+								fieldIndexes, outType));
 	}
 
 	/**
@@ -899,16 +902,17 @@ public class StreamProjection<IN> {
 					"Numbers of projected fields and types do not match.");
 		}
 
-		TypeWrapper<Tuple18<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17>> outTypeWrapper = new ProjectTypeWrapper<IN, Tuple18<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17>>(
-				inTypeWrapper, fieldIndexes, types);
+		@SuppressWarnings("unchecked")
+		TypeInformation<Tuple18<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17>> outType = (TypeInformation<Tuple18<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17>>) extractFieldTypes(
+				fieldIndexes, types, inTypeInfo);
 		return dataStream
 				.addFunction(
 						"projection",
 						null,
-						inTypeWrapper,
-						outTypeWrapper,
+						inTypeInfo,
+						outType,
 						new ProjectInvokable<IN, Tuple18<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17>>(
-								fieldIndexes, outTypeWrapper));
+								fieldIndexes, outType));
 	}
 
 	/**
@@ -971,16 +975,17 @@ public class StreamProjection<IN> {
 					"Numbers of projected fields and types do not match.");
 		}
 
-		TypeWrapper<Tuple19<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18>> outTypeWrapper = new ProjectTypeWrapper<IN, Tuple19<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18>>(
-				inTypeWrapper, fieldIndexes, types);
+		@SuppressWarnings("unchecked")
+		TypeInformation<Tuple19<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18>> outType = (TypeInformation<Tuple19<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18>>) extractFieldTypes(
+				fieldIndexes, types, inTypeInfo);
 		return dataStream
 				.addFunction(
 						"projection",
 						null,
-						inTypeWrapper,
-						outTypeWrapper,
+						inTypeInfo,
+						outType,
 						new ProjectInvokable<IN, Tuple19<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18>>(
-								fieldIndexes, outTypeWrapper));
+								fieldIndexes, outType));
 	}
 
 	/**
@@ -1045,17 +1050,17 @@ public class StreamProjection<IN> {
 					"Numbers of projected fields and types do not match.");
 		}
 
-		TypeWrapper<Tuple20<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19>> outTypeWrapper = new ProjectTypeWrapper<IN, Tuple20<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19>>(
-				inTypeWrapper, fieldIndexes, types);
+		@SuppressWarnings("unchecked")
+		TypeInformation<Tuple20<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19>> outType = (TypeInformation<Tuple20<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19>>) extractFieldTypes(
+				fieldIndexes, types, inTypeInfo);
 		return dataStream
 				.addFunction(
 						"projection",
 						null,
-						inTypeWrapper,
-						outTypeWrapper,
+						inTypeInfo,
+						outType,
 						new ProjectInvokable<IN, Tuple20<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19>>(
-								fieldIndexes, outTypeWrapper));
-
+								fieldIndexes, outType));
 	}
 
 	/**
@@ -1123,16 +1128,17 @@ public class StreamProjection<IN> {
 					"Numbers of projected fields and types do not match.");
 		}
 
-		TypeWrapper<Tuple21<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20>> outTypeWrapper = new ProjectTypeWrapper<IN, Tuple21<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20>>(
-				inTypeWrapper, fieldIndexes, types);
+		@SuppressWarnings("unchecked")
+		TypeInformation<Tuple21<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20>> outType = (TypeInformation<Tuple21<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20>>) extractFieldTypes(
+				fieldIndexes, types, inTypeInfo);
 		return dataStream
 				.addFunction(
 						"projection",
 						null,
-						inTypeWrapper,
-						outTypeWrapper,
+						inTypeInfo,
+						outType,
 						new ProjectInvokable<IN, Tuple21<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20>>(
-								fieldIndexes, outTypeWrapper));
+								fieldIndexes, outType));
 	}
 
 	/**
@@ -1202,16 +1208,17 @@ public class StreamProjection<IN> {
 					"Numbers of projected fields and types do not match.");
 		}
 
-		TypeWrapper<Tuple22<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21>> outTypeWrapper = new ProjectTypeWrapper<IN, Tuple22<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21>>(
-				inTypeWrapper, fieldIndexes, types);
+		@SuppressWarnings("unchecked")
+		TypeInformation<Tuple22<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21>> outType = (TypeInformation<Tuple22<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21>>) extractFieldTypes(
+				fieldIndexes, types, inTypeInfo);
 		return dataStream
 				.addFunction(
 						"projection",
 						null,
-						inTypeWrapper,
-						outTypeWrapper,
+						inTypeInfo,
+						outType,
 						new ProjectInvokable<IN, Tuple22<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21>>(
-								fieldIndexes, outTypeWrapper));
+								fieldIndexes, outType));
 	}
 
 	/**
@@ -1284,16 +1291,17 @@ public class StreamProjection<IN> {
 					"Numbers of projected fields and types do not match.");
 		}
 
-		TypeWrapper<Tuple23<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22>> outTypeWrapper = new ProjectTypeWrapper<IN, Tuple23<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22>>(
-				inTypeWrapper, fieldIndexes, types);
+		@SuppressWarnings("unchecked")
+		TypeInformation<Tuple23<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22>> outType = (TypeInformation<Tuple23<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22>>) extractFieldTypes(
+				fieldIndexes, types, inTypeInfo);
 		return dataStream
 				.addFunction(
 						"projection",
 						null,
-						inTypeWrapper,
-						outTypeWrapper,
+						inTypeInfo,
+						outType,
 						new ProjectInvokable<IN, Tuple23<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22>>(
-								fieldIndexes, outTypeWrapper));
+								fieldIndexes, outType));
 	}
 
 	/**
@@ -1368,16 +1376,17 @@ public class StreamProjection<IN> {
 					"Numbers of projected fields and types do not match.");
 		}
 
-		TypeWrapper<Tuple24<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22, T23>> outTypeWrapper = new ProjectTypeWrapper<IN, Tuple24<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22, T23>>(
-				inTypeWrapper, fieldIndexes, types);
+		@SuppressWarnings("unchecked")
+		TypeInformation<Tuple24<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22, T23>> outType = (TypeInformation<Tuple24<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22, T23>>) extractFieldTypes(
+				fieldIndexes, types, inTypeInfo);
 		return dataStream
 				.addFunction(
 						"projection",
 						null,
-						inTypeWrapper,
-						outTypeWrapper,
+						inTypeInfo,
+						outType,
 						new ProjectInvokable<IN, Tuple24<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22, T23>>(
-								fieldIndexes, outTypeWrapper));
+								fieldIndexes, outType));
 	}
 
 	/**
@@ -1454,16 +1463,36 @@ public class StreamProjection<IN> {
 					"Numbers of projected fields and types do not match.");
 		}
 
-		TypeWrapper<Tuple25<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22, T23, T24>> outTypeWrapper = new ProjectTypeWrapper<IN, Tuple25<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22, T23, T24>>(
-				inTypeWrapper, fieldIndexes, types);
+		@SuppressWarnings("unchecked")
+		TypeInformation<Tuple25<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22, T23, T24>> outType = (TypeInformation<Tuple25<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22, T23, T24>>) extractFieldTypes(
+				fieldIndexes, types, inTypeInfo);
 		return dataStream
 				.addFunction(
 						"projection",
 						null,
-						inTypeWrapper,
-						outTypeWrapper,
+						inTypeInfo,
+						outType,
 						new ProjectInvokable<IN, Tuple25<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22, T23, T24>>(
-								fieldIndexes, outTypeWrapper));
+								fieldIndexes, outType));
+	}
+
+	public static TypeInformation<?> extractFieldTypes(int[] fields, Class<?>[] givenTypes,
+			TypeInformation<?> inType) {
+
+		TupleTypeInfo<?> inTupleType = (TupleTypeInfo<?>) inType;
+		TypeInformation<?>[] fieldTypes = new TypeInformation[fields.length];
+
+		for (int i = 0; i < fields.length; i++) {
+
+			if (inTupleType.getTypeAt(fields[i]).getTypeClass() != givenTypes[i]) {
+				throw new IllegalArgumentException(
+						"Given types do not match types of input data set.");
+			}
+
+			fieldTypes[i] = inTupleType.getTypeAt(fields[i]);
+		}
+
+		return new TupleTypeInfo<Tuple>(fieldTypes);
 	}
 
 }


[3/5] incubator-flink git commit: [streaming] Java 8 WordCount example added for streaming

Posted by mb...@apache.org.
[streaming] Java 8 WordCount example added for streaming

[streaming] Documentation added for streaming java 8 support


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/d8066c55
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/d8066c55
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/d8066c55

Branch: refs/heads/master
Commit: d8066c55baa6e2f8373f74cccfffe5fc352ea845
Parents: 4a7ba2d
Author: mbalassi <mb...@apache.org>
Authored: Tue Dec 9 15:13:49 2014 +0100
Committer: mbalassi <mb...@apache.org>
Committed: Wed Dec 10 13:27:38 2014 +0100

----------------------------------------------------------------------
 docs/streaming_guide.md                         |  53 ++++++--
 .../flink/streaming/api/JobGraphBuilder.java    |   2 +-
 flink-java8/pom.xml                             |  15 +++
 .../examples/java8/wordcount/WordCount.java     | 127 +++++++++++++++++++
 4 files changed, 188 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d8066c55/docs/streaming_guide.md
----------------------------------------------------------------------
diff --git a/docs/streaming_guide.md b/docs/streaming_guide.md
index ec3675a..379bb73 100644
--- a/docs/streaming_guide.md
+++ b/docs/streaming_guide.md
@@ -366,15 +366,19 @@ Calling the `.groupBy(fields)` method on a windowed stream groups the elements b
 The user can also create windows and triggers on a per group basis calling `.window(…).every(…)` on an already grouped data stream. To highlight the differences let us look at to examples.
 
 To get the maximal value by key on the last 100 elements we use the first approach:
+
 ~~~java
 dataStream.window(Count.of(100)).every(…).groupBy(groupingField).max(field);
 ~~~
+
 Using this approach we took the last 100 elements, divided it into groups by key then applied the aggregation.
 
 To create fixed-size windows for every key we need to reverse the order of the groupBy call. So to take the max for the last 100 elements in each group:
+
 ~~~java
 dataStream.groupBy(groupingField).window(Count.of(100)).every(…).max(field);
 ~~~
+
 This will create separate windows for different keys and apply the trigger and eviction policies on a per group basis.
 
 ### Temporal database style operators
@@ -451,7 +455,7 @@ dataStream1.connect(dataStream2)
         })
 ~~~
 
-#### windowReduce on ConnectedDataStream
+#### WindowReduce on ConnectedDataStream
 The windowReduce operator applies a user-defined `CoWindowFunction` to time-aligned windows of the two data streams and returns zero or more elements of an arbitrary type. The user can define the window and slide intervals and can also implement custom timestamps to be used for calculating windows.
 
 #### Reduce on ConnectedDataStream
@@ -497,13 +501,6 @@ Iterable<String> select(Integer value) {
 }
 ~~~
 
-Or more compactly we can use lambda expressions in Java 8:
-
-~~~java
-SplitDataStream<Integer> split = someDataStream
-					.split(x -> Arrays.asList(String.valueOf(x % 2)));
-~~~
-
 Every output will be emitted to the selected outputs exactly once, even if you add the same output names more than once.
 
 ### Iterations
@@ -549,6 +546,46 @@ Rich functions provide, in addition to the user-defined function (`map()`, `redu
 
 [Back to top](#top)
 
+### Lambda expressions with Java 8
+
+For more concise code one can rely on one of the main features of Java 8: lambda expressions. The following program has similar functionality to the one provided in the [example](#example-program) section, while showcasing the usage of lambda expressions.
+
+~~~java
+public class StreamingWordCount {
+    public static void main(String[] args) throws Exception {
+        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+	    DataStream<String> text = env.fromElements(
+                "Who's there?",
+                "I think I hear them. Stand, ho! Who's there?");
+
+            DataStream<Tuple2<String, Integer>> counts = 
+		// normalize and split each line
+		text.map(line -> line.toLowerCase().split("\\W+"))
+		// convert splitted line in pairs (2-tuples) containing: (word,1)
+		.flatMap((String[] tokens, Collector<Tuple2<String, Integer>> out) -> {
+		// emit the pairs with non-zero-length words
+			Arrays.stream(tokens)
+				.filter(t -> t.length() > 0)
+				.forEach(t -> out.collect(new Tuple2<>(t, 1)));
+		})
+		// group by the tuple field "0" and sum up tuple field "1"
+		.groupBy(0)
+		.sum(1);
+
+        counts.print();
+
+        env.execute("Streaming WordCount");
+    }
+}
+~~~
+
+For a detailed Java 8 Guide please refer to the [Java 8 Programming Guide](java8_programming_guide.html). Operators specific to streaming, such as operator splitting, also support this usage. [Output splitting](#output-splitting) can be rewritten as follows:
+
+~~~java
+SplitDataStream<Integer> split = someDataStream
+					.split(x -> Arrays.asList(String.valueOf(x % 2)));
+~~~
 
 Operator Settings
 ----------------

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d8066c55/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/JobGraphBuilder.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/JobGraphBuilder.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/JobGraphBuilder.java
index c45164a..e80d86d 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/JobGraphBuilder.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/JobGraphBuilder.java
@@ -127,7 +127,7 @@ public class JobGraphBuilder {
 	 * @param inTypeInfo
 	 *            Input type for serialization
 	 * @param outTypeInfo
-	 *            Output typ for serialization
+	 *            Output type for serialization
 	 * @param operatorName
 	 *            Operator type
 	 * @param serializedFunction

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d8066c55/flink-java8/pom.xml
----------------------------------------------------------------------
diff --git a/flink-java8/pom.xml b/flink-java8/pom.xml
index 2a3eb09..ec6a3e7 100644
--- a/flink-java8/pom.xml
+++ b/flink-java8/pom.xml
@@ -45,16 +45,31 @@ under the License.
 			<artifactId>flink-core</artifactId>
 			<version>${project.version}</version>
 		</dependency>
+
 		<dependency>
 			<groupId>org.apache.flink</groupId>
 			<artifactId>flink-test-utils</artifactId>
 			<version>${project.version}</version>
 		</dependency>
+
 		<dependency>
 			<groupId>org.apache.flink</groupId>
 			<artifactId>flink-java</artifactId>
 			<version>${project.version}</version>
 		</dependency>
+		
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-streaming-core</artifactId>
+			<version>${project.version}</version>
+		</dependency>
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-java-examples</artifactId>
+			<version>${project.version}</version>
+		</dependency>
+
 		<dependency>
 			<groupId>junit</groupId>
 			<artifactId>junit</artifactId>

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d8066c55/flink-java8/src/main/java/org/apache/flink/streaming/examples/java8/wordcount/WordCount.java
----------------------------------------------------------------------
diff --git a/flink-java8/src/main/java/org/apache/flink/streaming/examples/java8/wordcount/WordCount.java b/flink-java8/src/main/java/org/apache/flink/streaming/examples/java8/wordcount/WordCount.java
new file mode 100644
index 0000000..c1d6042
--- /dev/null
+++ b/flink-java8/src/main/java/org/apache/flink/streaming/examples/java8/wordcount/WordCount.java
@@ -0,0 +1,127 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.examples.java8.wordcount;
+
+import java.util.Arrays;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.examples.java.wordcount.util.WordCountData;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.util.Collector;
+
+/**
+ * Implements the streaming "WordCount" program that computes a simple word occurrences
+ * over text files. 
+ * 
+ * <p>
+ * The input is a plain text file with lines separated by newline characters.
+ * 
+ * <p>
+ * Usage: <code>WordCount &lt;text path&gt; &lt;result path&gt;</code><br>
+ * If no parameters are provided, the program is run with default data from {@link WordCountData}.
+ * 
+ * <p>
+ * This example shows how to:
+ * <ul>
+ * <li>write a compact Flink Streaming program with Java 8 Lambda Expressions.
+ * </ul>
+ * 
+ */
+public class WordCount {
+	
+	// *************************************************************************
+	//     PROGRAM
+	// *************************************************************************
+	
+	public static void main(String[] args) throws Exception {
+		
+		if(!parseParameters(args)) {
+			return;
+		}
+		
+		// set up the execution environment
+		final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		
+		// get input data
+		DataStream<String> text = getTextDataStream(env);
+		
+		DataStream<Tuple2<String, Integer>> counts = 
+				// normalize and split each line
+				text.map(line -> line.toLowerCase().split("\\W+"))
+				// convert splitted line in pairs (2-tuples) containing: (word,1)
+				.flatMap((String[] tokens, Collector<Tuple2<String, Integer>> out) -> {
+					// emit the pairs with non-zero-length words
+					Arrays.stream(tokens)
+					.filter(t -> t.length() > 0)
+					.forEach(t -> out.collect(new Tuple2<>(t, 1)));
+				})
+				// group by the tuple field "0" and sum up tuple field "1"
+				.groupBy(0)
+				.sum(1);
+
+		// emit result
+		if(fileOutput) {
+			counts.writeAsCsv(outputPath, 1);
+		} else {
+			counts.print();
+		}
+		
+		// execute program
+		env.execute("Streaming WordCount Example");
+	}
+	
+	// *************************************************************************
+	//     UTIL METHODS
+	// *************************************************************************
+	
+	private static boolean fileOutput = false;
+	private static String textPath;
+	private static String outputPath;
+	
+	private static boolean parseParameters(String[] args) {
+		
+		if(args.length > 0) {
+			// parse input arguments
+			fileOutput = true;
+			if(args.length == 2) {
+				textPath = args[0];
+				outputPath = args[1];
+			} else {
+				System.err.println("Usage: WordCount <text path> <result path>");
+				return false;
+			}
+		} else {
+			System.out.println("Executing WordCount example with built-in default data.");
+			System.out.println("  Provide parameters to read input data from a file.");
+			System.out.println("  Usage: WordCount <text path> <result path>");
+		}
+		return true;
+	}
+	
+	private static DataStream<String> getTextDataStream(StreamExecutionEnvironment env) {
+		if (fileOutput) {
+			// read the text file from given input path
+			return env.readTextFile(textPath);
+		} else {
+			// get default test text data
+			return env.fromElements(WordCountData.WORDS);
+		}
+	}
+}


[2/5] incubator-flink git commit: [FLINK-1312] [streaming] OutputSelector changed to SAM-type to allow java 8 lambdas for splitting

Posted by mb...@apache.org.
[FLINK-1312] [streaming] OutputSelector changed to SAM-type to allow java 8 lambdas for splitting


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/4a7ba2db
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/4a7ba2db
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/4a7ba2db

Branch: refs/heads/master
Commit: 4a7ba2dbb7744b41135994d7f947198b9c4e5065
Parents: 51c1f67
Author: Gyula Fora <gy...@apache.org>
Authored: Tue Dec 9 18:11:32 2014 +0100
Committer: mbalassi <mb...@apache.org>
Committed: Wed Dec 10 13:27:38 2014 +0100

----------------------------------------------------------------------
 docs/streaming_guide.md                         |  32 ++++--
 .../api/collector/DirectedStreamCollector.java  |  27 +++--
 .../streaming/api/collector/OutputSelector.java |  30 ++----
 .../api/collector/DirectedOutputTest.java       |  26 ++---
 .../api/collector/OutputSelectorTest.java       | 108 ++++++++++---------
 .../examples/iteration/IterateExample.java      |  15 +--
 6 files changed, 124 insertions(+), 114 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4a7ba2db/docs/streaming_guide.md
----------------------------------------------------------------------
diff --git a/docs/streaming_guide.md b/docs/streaming_guide.md
index 294da24..ec3675a 100644
--- a/docs/streaming_guide.md
+++ b/docs/streaming_guide.md
@@ -459,36 +459,52 @@ The Reduce operator for the `ConnectedDataStream` applies a simple reduce transf
 
 ### Output splitting
 
-Most data stream operators support directed outputs, meaning that different data elements are received by only given outputs. The outputs are referenced by their name given at the point of receiving:
+Most data stream operators support directed outputs (output splitting), meaning that different output elements are sent only to specific outputs. The outputs are referenced by their name given at the point of receiving:
 
 ~~~java
 SplitDataStream<Integer> split = someDataStream.split(outputSelector);
-DataStream<Integer> even = split.select("even");
+DataStream<Integer> even = split.select("even");
 DataStream<Integer> odd = split.select("odd");
 ~~~
 
-Data streams only receive the elements directed to selected output names. These outputs are directed by implementing a selector function (extending `OutputSelector`):
+In the above example the data stream named "even" will only contain elements that are directed to the output named "even". The user can of course further transform these new streams by, for example, squaring only the even elements.
+
+Data streams only receive the elements directed to selected output names. The user can also select multiple output names by `splitStream.select("output1", "output2", ...)`. It is common that a stream listens to all the outputs, so `split.selectAll()` provides this functionality without having to select all names.
+
+The outputs of an operator are directed by implementing a selector function (implementing the `OutputSelector` interface):
 
 ~~~java
-void select(OUT value, Collection<String> outputs);
+Iterable<String> select(OUT value);
 ~~~
 
-The data is sent to all the outputs added to the collection outputs (referenced by their name). This way the direction of the outputs can be determined by the value of the data sent. For example:
+The data is sent to all the outputs returned in the iterable (referenced by their name). This way the direction of the outputs can be determined by the value of the data sent. 
+
+For example to split even and odd numbers:
 
 ~~~java
 @Override
-void select(Integer value, Collection<String> outputs) {
+Iterable<String> select(Integer value) {	
+
+    List<String> outputs = new ArrayList<String>();
+
     if (value % 2 == 0) {
         outputs.add("even");
     } else {
         outputs.add("odd");
     }
+
+    return outputs;
 }
 ~~~
 
-This output selection allows data streams to listen to multiple outputs, and data points to be sent to multiple outputs. A value is sent to all the outputs specified in the `OutputSelector` and a data stream will receive a value if it has selected any of the outputs the value is sent to. The stream will receive the data at most once.
-It is common that a stream listens to all the outputs, so `split.selectAll()` is provided as an alias for explicitly selecting all output names.
+Or more compactly we can use lambda expressions in Java 8:
+
+~~~java
+SplitDataStream<Integer> split = someDataStream
+					.split(x -> Arrays.asList(String.valueOf(x % 2)));
+~~~
 
+Every output will be emitted to the selected outputs exactly once, even if you add the same output names more than once.
 
 ### Iterations
 The Flink Streaming API supports implementing iterative stream processing dataflows similarly to the core Flink API. Iterative streaming programs also implement a step function and embed it into an `IterativeDataStream`.

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4a7ba2db/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/DirectedStreamCollector.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/DirectedStreamCollector.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/DirectedStreamCollector.java
index 5b9e88c..d029a6e 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/DirectedStreamCollector.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/DirectedStreamCollector.java
@@ -17,9 +17,8 @@
 
 package org.apache.flink.streaming.api.collector;
 
-import java.util.ArrayList;
-import java.util.Collection;
 import java.util.HashSet;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Set;
 
@@ -61,7 +60,7 @@ public class DirectedStreamCollector<OUT> extends StreamCollector<OUT> {
 		super(channelID, serializationDelegate);
 		this.outputSelector = outputSelector;
 		this.emitted = new HashSet<RecordWriter<SerializationDelegate<StreamRecord<OUT>>>>();
-		this.selectAllOutputs = new ArrayList<RecordWriter<SerializationDelegate<StreamRecord<OUT>>>>();
+		this.selectAllOutputs = new LinkedList<RecordWriter<SerializationDelegate<StreamRecord<OUT>>>>();
 	}
 
 	@Override
@@ -81,19 +80,25 @@ public class DirectedStreamCollector<OUT> extends StreamCollector<OUT> {
 	 *
 	 */
 	protected void emitToOutputs() {
-		Collection<String> outputNames = outputSelector.getOutputs(streamRecord.getObject());
+		Iterable<String> outputNames = outputSelector.select(streamRecord.getObject());
 		emitted.clear();
+
+		for (RecordWriter<SerializationDelegate<StreamRecord<OUT>>> output : selectAllOutputs) {
+			try {
+				output.emit(serializationDelegate);
+			} catch (Exception e) {
+				if (LOG.isErrorEnabled()) {
+					LOG.error("Emit to {} failed due to: {}", output,
+							StringUtils.stringifyException(e));
+				}
+			}
+		}
+		emitted.addAll(selectAllOutputs);
+
 		for (String outputName : outputNames) {
 			List<RecordWriter<SerializationDelegate<StreamRecord<OUT>>>> outputList = outputMap
 					.get(outputName);
 			try {
-				for (RecordWriter<SerializationDelegate<StreamRecord<OUT>>> output : selectAllOutputs) {
-					if (!emitted.contains(output)) {
-						output.emit(serializationDelegate);
-						emitted.add(output);
-					}
-				}
-
 				if (outputList == null) {
 					if (LOG.isErrorEnabled()) {
 						String format = String.format(

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4a7ba2db/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/OutputSelector.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/OutputSelector.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/OutputSelector.java
index 9eb33d9..6dbcff4 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/OutputSelector.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/OutputSelector.java
@@ -18,45 +18,27 @@
 package org.apache.flink.streaming.api.collector;
 
 import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.Collection;
 
 import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
 import org.apache.flink.streaming.api.datastream.SplitDataStream;
 
 /**
- * Class for defining an OutputSelector for a {@link SplitDataStream} using the
- * {@link SingleOutputStreamOperator#split} call. Every output object of a
+ * Interface for defining an OutputSelector for a {@link SplitDataStream} using
+ * the {@link SingleOutputStreamOperator#split} call. Every output object of a
  * {@link SplitDataStream} will run through this operator to select outputs.
  * 
  * @param <OUT>
  *            Type parameter of the split values.
  */
-public abstract class OutputSelector<OUT> implements Serializable {
-	private static final long serialVersionUID = 1L;
-
-	private Collection<String> outputs;
-
-	public OutputSelector() {
-		outputs = new ArrayList<String>();
-	}
-
-	Collection<String> getOutputs(OUT outputObject) {
-		outputs.clear();
-		select(outputObject, outputs);
-		return outputs;
-	}
-
+public interface OutputSelector<OUT> extends Serializable {
 	/**
 	 * Method for selecting output names for the emitted objects when using the
 	 * {@link SingleOutputStreamOperator#split} method. The values will be
-	 * emitted only to output names which are added to the outputs collection.
-	 * The outputs collection is cleared automatically after each select call.
+	 * emitted only to output names which are contained in the returned
+	 * iterable.
 	 * 
 	 * @param value
 	 *            Output object for which the output selection should be made.
-	 * @param outputs
-	 *            Selected output names should be added to this collection.
 	 */
-	public abstract void select(OUT value, Collection<String> outputs);
+	public Iterable<String> select(OUT value);
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4a7ba2db/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/collector/DirectedOutputTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/collector/DirectedOutputTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/collector/DirectedOutputTest.java
index b82ecc2..4ab1be2 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/collector/DirectedOutputTest.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/collector/DirectedOutputTest.java
@@ -22,13 +22,11 @@ import static org.junit.Assert.assertEquals;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.Collection;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
 import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.streaming.api.collector.OutputSelector;
 import org.apache.flink.streaming.api.datastream.SplitDataStream;
 import org.apache.flink.streaming.api.environment.LocalStreamEnvironment;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
@@ -54,27 +52,31 @@ public class DirectedOutputTest {
 		}
 	}
 
-	static final class MyOutputSelector extends OutputSelector<Long> {
+	static final class MyOutputSelector implements OutputSelector<Long> {
 		private static final long serialVersionUID = 1L;
 
+		List<String> outputs = new ArrayList<String>();
+
 		@Override
-		public void select(Long value, Collection<String> outputs) {
+		public Iterable<String> select(Long value) {
+			outputs.clear();
 			if (value % 2 == 0) {
 				outputs.add(EVEN);
 			} else {
 				outputs.add(ODD);
 			}
-			
+
 			if (value == 10L) {
 				outputs.add(TEN);
 			}
-			
+
 			if (value == 11L) {
 				outputs.add(NON_SELECTED);
 			}
+			return outputs;
 		}
 	}
-	
+
 	static final class ListSink implements SinkFunction<Long> {
 		private static final long serialVersionUID = 1L;
 
@@ -99,23 +101,23 @@ public class DirectedOutputTest {
 	}
 
 	private static Map<String, List<Long>> outputs = new HashMap<String, List<Long>>();
-	
+
 	@Test
 	public void outputSelectorTest() throws Exception {
-		
 
 		LocalStreamEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1);
-		
+
 		SplitDataStream<Long> source = env.generateSequence(1, 11).split(new MyOutputSelector());
 		source.select(EVEN).addSink(new ListSink(EVEN));
 		source.select(ODD, TEN).addSink(new ListSink(ODD_AND_TEN));
 		source.select(EVEN, ODD).addSink(new ListSink(EVEN_AND_ODD));
 		source.selectAll().addSink(new ListSink(ALL));
-		
+
 		env.executeTest(128);
 		assertEquals(Arrays.asList(2L, 4L, 6L, 8L, 10L), outputs.get(EVEN));
 		assertEquals(Arrays.asList(1L, 3L, 5L, 7L, 9L, 10L, 11L), outputs.get(ODD_AND_TEN));
-		assertEquals(Arrays.asList(1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L, 9L, 10L, 11L), outputs.get(EVEN_AND_ODD));
+		assertEquals(Arrays.asList(1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L, 9L, 10L, 11L),
+				outputs.get(EVEN_AND_ODD));
 		assertEquals(Arrays.asList(1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L, 9L, 10L, 11L), outputs.get(ALL));
 	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4a7ba2db/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/collector/OutputSelectorTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/collector/OutputSelectorTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/collector/OutputSelectorTest.java
index e465e2f..1615a45 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/collector/OutputSelectorTest.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/collector/OutputSelectorTest.java
@@ -1,54 +1,58 @@
 /*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
  * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.collector;
-
-import static org.junit.Assert.assertEquals;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
-
-import org.apache.flink.api.java.tuple.Tuple1;
-import org.junit.Test;
-
-public class OutputSelectorTest {
-
-	static final class MyOutputSelector extends OutputSelector<Tuple1<Integer>> {
-
-		private static final long serialVersionUID = 1L;
-
-		@Override
-		public void select(Tuple1<Integer> tuple, Collection<String> outputs) {
-			for (Integer i = 0; i < tuple.f0; i++) {
-				outputs.add(i.toString());
-			}
-		}
-	}
-
-	@Test
-	public void testGetOutputs() {
-		OutputSelector<Tuple1<Integer>> selector = new MyOutputSelector();
-		List<String> expectedOutputs = new ArrayList<String>();
-		expectedOutputs.add("0");
-		expectedOutputs.add("1");
-		assertEquals(expectedOutputs, selector.getOutputs(new Tuple1<Integer>(2)));
-		expectedOutputs.add("2");
-		assertEquals(expectedOutputs, selector.getOutputs(new Tuple1<Integer>(3)));
-	}
-
-}
+ */
+
+package org.apache.flink.streaming.api.collector;
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.flink.api.java.tuple.Tuple1;
+import org.junit.Test;
+
+public class OutputSelectorTest {
+
+	static final class MyOutputSelector implements OutputSelector<Tuple1<Integer>> {
+
+		private static final long serialVersionUID = 1L;
+
+		@Override
+		public Iterable<String> select(Tuple1<Integer> tuple) {
+
+			String[] outputs = new String[tuple.f0];
+
+			for (Integer i = 0; i < tuple.f0; i++) {
+				outputs[i] = i.toString();
+			}
+			return Arrays.asList(outputs);
+		}
+	}
+
+	@Test
+	public void testGetOutputs() {
+		OutputSelector<Tuple1<Integer>> selector = new MyOutputSelector();
+		List<String> expectedOutputs = new ArrayList<String>();
+		expectedOutputs.add("0");
+		expectedOutputs.add("1");
+		assertEquals(expectedOutputs, selector.select(new Tuple1<Integer>(2)));
+		expectedOutputs.add("2");
+		assertEquals(expectedOutputs, selector.select(new Tuple1<Integer>(3)));
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4a7ba2db/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/iteration/IterateExample.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/iteration/IterateExample.java b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/iteration/IterateExample.java
index 8454f52..54dbdb0 100644
--- a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/iteration/IterateExample.java
+++ b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/iteration/IterateExample.java
@@ -18,7 +18,6 @@
 package org.apache.flink.streaming.examples.iteration;
 
 import java.util.ArrayList;
-import java.util.Collection;
 import java.util.List;
 import java.util.Random;
 
@@ -79,8 +78,8 @@ public class IterateExample {
 
 		// apply the step function to add new random value to the tuple and to
 		// increment the counter and split the output with the output selector
-		SplitDataStream<Tuple2<Double, Integer>> step = it.map(new Step()).shuffle().setBufferTimeout(1)
-				.split(new MySelector());
+		SplitDataStream<Tuple2<Double, Integer>> step = it.map(new Step()).shuffle()
+				.setBufferTimeout(1).split(new MySelector());
 
 		// close the iteration by selecting the tuples that were directed to the
 		// 'iterate' channel in the output selector
@@ -129,16 +128,18 @@ public class IterateExample {
 	/**
 	 * OutputSelector testing which tuple needs to be iterated again.
 	 */
-	public static class MySelector extends OutputSelector<Tuple2<Double, Integer>> {
+	public static class MySelector implements OutputSelector<Tuple2<Double, Integer>> {
 		private static final long serialVersionUID = 1L;
 
 		@Override
-		public void select(Tuple2<Double, Integer> value, Collection<String> outputs) {
+		public Iterable<String> select(Tuple2<Double, Integer> value) {
+			List<String> output = new ArrayList<String>();
 			if (value.f0 > 100) {
-				outputs.add("output");
+				output.add("output");
 			} else {
-				outputs.add("iterate");
+				output.add("iterate");
 			}
+			return output;
 		}
 
 	}