You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2018/05/04 16:49:14 UTC

[2/4] flink git commit: [FLINK-9292] [core] Remove TypeInfoParser (part 1)

[FLINK-9292] [core] Remove TypeInfoParser (part 1)


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/c8fa8d02
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/c8fa8d02
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/c8fa8d02

Branch: refs/heads/master
Commit: c8fa8d025684c2225824c54a7285bbfdec7cfddc
Parents: 6ae81d4
Author: Stephan Ewen <se...@apache.org>
Authored: Fri May 4 17:15:51 2018 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Fri May 4 18:48:16 2018 +0200

----------------------------------------------------------------------
 .../connectors/kafka/Kafka010ITCase.java        |   6 +-
 .../connectors/kafka/Kafka011ITCase.java        |   6 +-
 .../connectors/kafka/KafkaConsumerTestBase.java |  20 +-
 .../kafka/KafkaShortRetentionTestBase.java      |   5 +-
 .../kafka/testutils/DataGenerators.java         |   3 +-
 .../java/typeutils/WritableInfoParserTest.java  |  90 -----
 .../api/java/typeutils/TypeInfoParser.java      |  23 +-
 .../java/typeutils/PojoTypeExtractionTest.java  |  99 +++---
 .../api/java/typeutils/TypeInfoParserTest.java  | 340 -------------------
 .../MultidimensionalArraySerializerTest.java    |   7 +-
 .../java/operators/SingleInputUdfOperator.java  |  48 ---
 .../api/java/operators/TwoInputUdfOperator.java |  48 ---
 .../base/CoGroupOperatorCollectionTest.java     |  14 +-
 .../operators/base/GroupReduceOperatorTest.java |  85 ++---
 .../operators/base/ReduceOperatorTest.java      | 116 +++----
 .../flink/api/java/TypeExtractionTest.java      |   6 +-
 .../java/type/lambdas/LambdaExtractionTest.java |  35 +-
 .../operators/sort/LargeRecordHandlerTest.java  |  14 +-
 .../api/datastream/IterativeStream.java         |  17 +-
 .../datastream/SingleOutputStreamOperator.java  |  45 ---
 .../windowing/EvictingWindowOperatorTest.java   |  47 +--
 .../windowing/WindowOperatorMigrationTest.java  |  52 +--
 .../operators/windowing/WindowOperatorTest.java | 158 +++------
 .../flink/test/manual/MassiveStringSorting.java |   4 +-
 .../test/manual/MassiveStringValueSorting.java  |   4 +-
 .../flink/test/operators/OuterJoinITCase.java   |   5 +-
 .../flink/test/operators/TypeHintITCase.java    |   3 +-
 .../test/streaming/runtime/IterateITCase.java   |   3 +-
 28 files changed, 322 insertions(+), 981 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/c8fa8d02/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010ITCase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010ITCase.java b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010ITCase.java
index a038c8e..0855e92 100644
--- a/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010ITCase.java
+++ b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010ITCase.java
@@ -21,9 +21,9 @@ import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.restartstrategy.RestartStrategies;
 import org.apache.flink.api.common.serialization.TypeInformationSerializationSchema;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeinfo.Types;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.typeutils.GenericTypeInfo;
-import org.apache.flink.api.java.typeutils.TypeInfoParser;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataInputViewStreamWrapper;
 import org.apache.flink.streaming.api.TimeCharacteristic;
@@ -204,7 +204,7 @@ public class Kafka010ITCase extends KafkaConsumerTestBase {
 			}
 		});
 
-		final TypeInformationSerializationSchema<Long> longSer = new TypeInformationSerializationSchema<>(TypeInfoParser.<Long>parse("Long"), env.getConfig());
+		final TypeInformationSerializationSchema<Long> longSer = new TypeInformationSerializationSchema<>(Types.LONG, env.getConfig());
 		FlinkKafkaProducer010.FlinkKafkaProducer010Configuration prod = FlinkKafkaProducer010.writeToKafkaWithTimestamps(streamWithTimestamps, topic, new KeyedSerializationSchemaWrapper<>(longSer), standardProps, new FlinkKafkaPartitioner<Long>() {
 			private static final long serialVersionUID = -6730989584364230617L;
 
@@ -316,7 +316,7 @@ public class Kafka010ITCase extends KafkaConsumerTestBase {
 		long cnt = 0;
 
 		public LimitedLongDeserializer() {
-			this.ti = TypeInfoParser.parse("Long");
+			this.ti = Types.LONG;
 			this.ser = ti.createSerializer(new ExecutionConfig());
 		}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/c8fa8d02/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka011ITCase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka011ITCase.java b/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka011ITCase.java
index f48f87a..8692b5a 100644
--- a/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka011ITCase.java
+++ b/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka011ITCase.java
@@ -21,9 +21,9 @@ import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.restartstrategy.RestartStrategies;
 import org.apache.flink.api.common.serialization.TypeInformationSerializationSchema;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeinfo.Types;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.typeutils.GenericTypeInfo;
-import org.apache.flink.api.java.typeutils.TypeInfoParser;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataInputViewStreamWrapper;
 import org.apache.flink.streaming.api.TimeCharacteristic;
@@ -213,7 +213,7 @@ public class Kafka011ITCase extends KafkaConsumerTestBase {
 			}
 		});
 
-		final TypeInformationSerializationSchema<Long> longSer = new TypeInformationSerializationSchema<>(TypeInfoParser.<Long>parse("Long"), env.getConfig());
+		final TypeInformationSerializationSchema<Long> longSer = new TypeInformationSerializationSchema<>(Types.LONG, env.getConfig());
 		FlinkKafkaProducer011<Long> prod = new FlinkKafkaProducer011<>(topic, new KeyedSerializationSchemaWrapper<>(longSer), standardProps, Optional.of(new FlinkKafkaPartitioner<Long>() {
 			private static final long serialVersionUID = -6730989584364230617L;
 
@@ -327,7 +327,7 @@ public class Kafka011ITCase extends KafkaConsumerTestBase {
 		long cnt = 0;
 
 		public LimitedLongDeserializer() {
-			this.ti = TypeInfoParser.parse("Long");
+			this.ti = Types.LONG;
 			this.ser = ti.createSerializer(new ExecutionConfig());
 		}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/c8fa8d02/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
index 6ed9143..7d88f0d 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
@@ -34,7 +34,6 @@ import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.tuple.Tuple1;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.api.java.typeutils.TypeInfoParser;
 import org.apache.flink.client.program.ClusterClient;
 import org.apache.flink.client.program.ProgramInvocationException;
 import org.apache.flink.configuration.Configuration;
@@ -722,7 +721,8 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
 		env.setRestartStrategy(RestartStrategies.noRestart()); // fail immediately
 		env.getConfig().disableSysoutLogging();
 
-		TypeInformation<Tuple2<Long, String>> longStringType = TypeInfoParser.parse("Tuple2<Long, String>");
+		TypeInformation<Tuple2<Long, String>> longStringType =
+				TypeInformation.of(new TypeHint<Tuple2<Long, String>>(){});
 
 		TypeInformationSerializationSchema<Tuple2<Long, String>> sourceSchema =
 				new TypeInformationSerializationSchema<>(longStringType, env.getConfig());
@@ -1233,7 +1233,8 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
 
 		createTestTopic(topic, parallelism, 1);
 
-		final TypeInformation<Tuple2<Long, byte[]>> longBytesInfo = TypeInfoParser.parse("Tuple2<Long, byte[]>");
+		final TypeInformation<Tuple2<Long, byte[]>> longBytesInfo =
+				TypeInformation.of(new TypeHint<Tuple2<Long, byte[]>>(){});
 
 		final TypeInformationSerializationSchema<Tuple2<Long, byte[]>> serSchema =
 				new TypeInformationSerializationSchema<>(longBytesInfo, new ExecutionConfig());
@@ -1581,7 +1582,9 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
 		env1.getConfig().disableSysoutLogging();
 		env1.disableOperatorChaining(); // let the source read everything into the network buffers
 
-		TypeInformationSerializationSchema<Tuple2<Integer, Integer>> schema = new TypeInformationSerializationSchema<>(TypeInfoParser.<Tuple2<Integer, Integer>>parse("Tuple2<Integer, Integer>"), env1.getConfig());
+		TypeInformationSerializationSchema<Tuple2<Integer, Integer>> schema = new TypeInformationSerializationSchema<>(
+				TypeInformation.of(new TypeHint<Tuple2<Integer, Integer>>(){}), env1.getConfig());
+
 		DataStream<Tuple2<Integer, Integer>> fromKafka = env1.addSource(kafkaServer.getConsumer(topic, schema, standardProps));
 		fromKafka.flatMap(new FlatMapFunction<Tuple2<Integer, Integer>, Void>() {
 			@Override
@@ -1686,7 +1689,7 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
 		final int finalCount;
 		int count = 0;
 
-		TypeInformation<Tuple2<Integer, Integer>> ti = TypeInfoParser.parse("Tuple2<Integer, Integer>");
+		TypeInformation<Tuple2<Integer, Integer>> ti = TypeInformation.of(new TypeHint<Tuple2<Integer, Integer>>(){});
 		TypeSerializer<Tuple2<Integer, Integer>> ser = ti.createSerializer(new ExecutionConfig());
 
 		public FixedNumberDeserializationSchema(int finalCount) {
@@ -1734,7 +1737,8 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
 		}
 		final int finalCount = finalCountTmp;
 
-		final TypeInformation<Tuple2<Integer, Integer>> intIntTupleType = TypeInfoParser.parse("Tuple2<Integer, Integer>");
+		final TypeInformation<Tuple2<Integer, Integer>> intIntTupleType =
+				TypeInformation.of(new TypeHint<Tuple2<Integer, Integer>>(){});
 
 		final TypeInformationSerializationSchema<Tuple2<Integer, Integer>> deser =
 			new TypeInformationSerializationSchema<>(intIntTupleType, env.getConfig());
@@ -2234,7 +2238,7 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
 		private final TypeSerializer<Tuple2<Integer, Integer>> ts;
 
 		public Tuple2WithTopicSchema(ExecutionConfig ec) {
-			ts = TypeInfoParser.<Tuple2<Integer, Integer>>parse("Tuple2<Integer, Integer>").createSerializer(ec);
+			ts = TypeInformation.of(new TypeHint<Tuple2<Integer, Integer>>(){}).createSerializer(ec);
 		}
 
 		@Override
@@ -2251,7 +2255,7 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
 
 		@Override
 		public TypeInformation<Tuple3<Integer, Integer, String>> getProducedType() {
-			return TypeInfoParser.parse("Tuple3<Integer, Integer, String>");
+			return TypeInformation.of(new TypeHint<Tuple3<Integer, Integer, String>>(){});
 		}
 
 		@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/c8fa8d02/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetentionTestBase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetentionTestBase.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetentionTestBase.java
index 15d972f..f488325 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetentionTestBase.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetentionTestBase.java
@@ -20,7 +20,7 @@ package org.apache.flink.streaming.connectors.kafka;
 import org.apache.flink.api.common.restartstrategy.RestartStrategies;
 import org.apache.flink.api.common.serialization.SimpleStringSchema;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.typeutils.TypeInfoParser;
+import org.apache.flink.api.common.typeinfo.Types;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.TaskManagerOptions;
@@ -218,13 +218,12 @@ public class KafkaShortRetentionTestBase implements Serializable {
 
 		@Override
 		public TypeInformation<String> getProducedType() {
-			return TypeInfoParser.parse("String");
+			return Types.STRING;
 		}
 	}
 
 	/**
 	 * Ensure that the consumer is properly failing if "auto.offset.reset" is set to "none".
-	 * @throws Exception
 	 */
 	public void runFailOnAutoOffsetResetNone() throws Exception {
 		final String topic = "auto-offset-reset-none-test";

http://git-wip-us.apache.org/repos/asf/flink/blob/c8fa8d02/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/DataGenerators.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/DataGenerators.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/DataGenerators.java
index e432a65..8c0d766 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/DataGenerators.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/DataGenerators.java
@@ -24,7 +24,6 @@ import org.apache.flink.api.common.restartstrategy.RestartStrategies;
 import org.apache.flink.api.common.serialization.SimpleStringSchema;
 import org.apache.flink.api.common.serialization.TypeInformationSerializationSchema;
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
-import org.apache.flink.api.java.typeutils.TypeInfoParser;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
@@ -204,7 +203,7 @@ public class DataGenerators {
 
 		private static class MockStreamTransformation extends StreamTransformation<String> {
 			public MockStreamTransformation() {
-				super("MockTransform", TypeInfoParser.<String>parse("String"), 1);
+				super("MockTransform", BasicTypeInfo.STRING_TYPE_INFO, 1);
 			}
 
 			@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/c8fa8d02/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/api/java/typeutils/WritableInfoParserTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/api/java/typeutils/WritableInfoParserTest.java b/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/api/java/typeutils/WritableInfoParserTest.java
deleted file mode 100644
index 7262bb7..0000000
--- a/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/api/java/typeutils/WritableInfoParserTest.java
+++ /dev/null
@@ -1,90 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.typeutils;
-
-import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo;
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.tuple.Tuple2;
-
-import org.apache.hadoop.io.Writable;
-import org.junit.Assert;
-import org.junit.Test;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-
-/**
- * Tests for the type information parsing of {@link Writable}.
- */
-public class WritableInfoParserTest {
-
-	@Test
-	public void testWritableType() {
-		TypeInformation<?> ti = TypeInfoParser.parse(
-				"Writable<org.apache.flink.api.java.typeutils.WritableInfoParserTest$MyWritable>");
-
-		Assert.assertTrue(ti instanceof WritableTypeInfo<?>);
-		Assert.assertEquals(MyWritable.class, ((WritableTypeInfo<?>) ti).getTypeClass());
-	}
-
-	@Test
-	public void testPojoWithWritableType() {
-		TypeInformation<?> ti = TypeInfoParser.parse(
-				"org.apache.flink.api.java.typeutils.WritableInfoParserTest$MyPojo<"
-				+ "basic=Integer,"
-				+ "tuple=Tuple2<String, Integer>,"
-				+ "hadoopCitizen=Writable<org.apache.flink.api.java.typeutils.WritableInfoParserTest$MyWritable>,"
-				+ "array=String[]"
-				+ ">");
-		Assert.assertTrue(ti instanceof PojoTypeInfo);
-		PojoTypeInfo<?> pti = (PojoTypeInfo<?>) ti;
-		Assert.assertEquals("array", pti.getPojoFieldAt(0).getField().getName());
-		Assert.assertTrue(pti.getPojoFieldAt(0).getTypeInformation() instanceof BasicArrayTypeInfo);
-		Assert.assertEquals("basic", pti.getPojoFieldAt(1).getField().getName());
-		Assert.assertTrue(pti.getPojoFieldAt(1).getTypeInformation() instanceof BasicTypeInfo);
-		Assert.assertEquals("hadoopCitizen", pti.getPojoFieldAt(2).getField().getName());
-		Assert.assertTrue(pti.getPojoFieldAt(2).getTypeInformation() instanceof WritableTypeInfo);
-		Assert.assertEquals("tuple", pti.getPojoFieldAt(3).getField().getName());
-		Assert.assertTrue(pti.getPojoFieldAt(3).getTypeInformation() instanceof TupleTypeInfo);
-	}
-	// ------------------------------------------------------------------------
-	//  Test types
-	// ------------------------------------------------------------------------
-
-	private static class MyWritable implements Writable {
-
-		@Override
-		public void write(DataOutput out) throws IOException {}
-
-		@Override
-		public void readFields(DataInput in) throws IOException {}
-	}
-
-	/**
-	 * Test Pojo containing a {@link Writable}.
-	 */
-	public static class MyPojo {
-		public Integer basic;
-		public Tuple2<String, Integer> tuple;
-		public MyWritable hadoopCitizen;
-		public String[] array;
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/c8fa8d02/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeInfoParser.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeInfoParser.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeInfoParser.java
index 12a9ae0..bb74e70 100644
--- a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeInfoParser.java
+++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeInfoParser.java
@@ -16,20 +16,19 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.api.java.typeutils;
 
-import java.lang.reflect.Field;
-import java.util.ArrayList;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-
-import org.apache.flink.annotation.Public;
-import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo;
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
-import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.types.Value;
+ import org.apache.flink.annotation.Public;
+ import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo;
+ import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+ import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
+ import org.apache.flink.api.common.typeinfo.TypeInformation;
+ import org.apache.flink.types.Value;
+
+ import java.lang.reflect.Field;
+ import java.util.ArrayList;
+ import java.util.regex.Matcher;
+ import java.util.regex.Pattern;
 
 /**
  * @deprecated Use {@link org.apache.flink.api.common.typeinfo.Types} instead.

http://git-wip-us.apache.org/repos/asf/flink/blob/c8fa8d02/flink-core/src/test/java/org/apache/flink/api/java/typeutils/PojoTypeExtractionTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/PojoTypeExtractionTest.java b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/PojoTypeExtractionTest.java
index ba09786..54127be 100644
--- a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/PojoTypeExtractionTest.java
+++ b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/PojoTypeExtractionTest.java
@@ -18,35 +18,52 @@
 
 package org.apache.flink.api.java.typeutils;
 
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Date;
-import java.util.HashSet;
-import java.util.List;
-
 import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo;
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeHint;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeutils.CompositeType.FlatFieldDescriptor;
 import org.apache.flink.api.java.tuple.Tuple1;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.api.java.typeutils.TypeInfoParserTest.MyValue;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.types.Value;
 
 import org.junit.Assert;
 import org.junit.Test;
 
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Date;
+import java.util.HashSet;
+import java.util.List;
+
 /**
- *  Pojo Type tests
+ *  Pojo Type tests.
  *
- *  A Pojo is a bean-style class with getters, setters and empty ctor
- *   OR a class with all fields public (or for every private field, there has to be a public getter/setter)
- *   everything else is a generic type (that can't be used for field selection)
+ * <p>Pojo is a bean-style class with getters, setters and empty ctor
+ * OR a class with all fields public (or for every private field, there has to be a public getter/setter)
+ * everything else is a generic type (that can't be used for field selection)
  */
 public class PojoTypeExtractionTest {
 
+	/**
+	 * Simple test type that implements the {@link Value} interface.
+	 */
+	public static class MyValue implements Value {
+		private static final long serialVersionUID = 8607223484689147046L;
+
+		@Override
+		public void write(DataOutputView out) throws IOException {}
+
+		@Override
+		public void read(DataInputView in) throws IOException {}
+	}
+
 	public static class HasDuplicateField extends WC {
 		@SuppressWarnings("unused")
 		private int count; // duplicate
@@ -568,13 +585,13 @@ public class PojoTypeExtractionTest {
 		}
 	}
 
-	@SuppressWarnings({ "unchecked", "rawtypes" })
 	@Test
 	public void testGenericPojoTypeInference1() {
-		MapFunction<?, ?> function = new MyMapper<String>();
+		MyMapper<String> function = new MyMapper<>();
 
-		TypeInformation<?> ti = TypeExtractor.getMapReturnTypes(function, (TypeInformation)
-				TypeInfoParser.parse("org.apache.flink.api.java.typeutils.PojoTypeExtractionTest$PojoWithGenerics<key=int,field1=Long,field2=String>"));
+		TypeInformation<?> ti = TypeExtractor.getMapReturnTypes(
+				function,
+				TypeInformation.of(new TypeHint<PojoWithGenerics<Long, String>>(){}));
 		
 		Assert.assertTrue(ti instanceof PojoTypeInfo<?>);
 		PojoTypeInfo<?> pti = (PojoTypeInfo<?>) ti;
@@ -611,10 +628,12 @@ public class PojoTypeExtractionTest {
 	@SuppressWarnings({ "unchecked", "rawtypes" })
 	@Test
 	public void testGenericPojoTypeInference2() {
-		MapFunction<?, ?> function = new MyMapper2<Boolean, Character>();
+		MyMapper2<Boolean, Character> function = new MyMapper2<>();
+
+		TypeInformation<?> ti = TypeExtractor.getMapReturnTypes(
+				function,
+				TypeInformation.of(new TypeHint<Tuple2<Character,Boolean>>(){}));
 
-		TypeInformation<?> ti = TypeExtractor.getMapReturnTypes(function, (TypeInformation)
-				TypeInfoParser.parse("Tuple2<Character,Boolean>"));
 		Assert.assertTrue(ti instanceof PojoTypeInfo<?>);
 		PojoTypeInfo<?> pti = (PojoTypeInfo<?>) ti;
 		for(int i = 0; i < pti.getArity(); i++) {
@@ -646,10 +665,11 @@ public class PojoTypeExtractionTest {
 	@SuppressWarnings({ "unchecked", "rawtypes" })
 	@Test
 	public void testGenericPojoTypeInference3() {
-		MapFunction<?, ?> function = new MyMapper3<Boolean, Character>();
+		MyMapper3<Boolean, Character> function = new MyMapper3<>();
 
-		TypeInformation<?> ti = TypeExtractor.getMapReturnTypes(function, (TypeInformation)
-				TypeInfoParser.parse("org.apache.flink.api.java.typeutils.PojoTypeExtractionTest$PojoTuple<extraField=char,f0=boolean,f1=boolean,f2=long>"));
+		TypeInformation<?> ti = TypeExtractor.getMapReturnTypes(
+				function,
+				TypeInformation.of(new TypeHint<PojoTuple<Character, Boolean, Boolean>>(){}));
 		
 		Assert.assertTrue(ti instanceof TupleTypeInfo<?>);
 		TupleTypeInfo<?> tti = (TupleTypeInfo<?>) ti;
@@ -672,10 +692,11 @@ public class PojoTypeExtractionTest {
 	@SuppressWarnings({ "unchecked", "rawtypes" })
 	@Test
 	public void testGenericPojoTypeInference4() {
-		MapFunction<?, ?> function = new MyMapper4<Byte>();
+		MyMapper4<Byte> function = new MyMapper4<>();
 
-		TypeInformation<?> ti = TypeExtractor.getMapReturnTypes(function, (TypeInformation)
-				TypeInfoParser.parse("org.apache.flink.api.java.typeutils.PojoTypeExtractionTest$PojoWithParameterizedFields1<field=Tuple2<byte,byte>>"));
+		TypeInformation<?> ti = TypeExtractor.getMapReturnTypes(
+				function,
+				TypeInformation.of(new TypeHint<PojoWithParameterizedFields1<Byte>>(){}));
 		Assert.assertEquals(BasicTypeInfo.BYTE_TYPE_INFO, ti);
 	}
 
@@ -694,12 +715,11 @@ public class PojoTypeExtractionTest {
 	@SuppressWarnings({ "unchecked", "rawtypes" })
 	@Test
 	public void testGenericPojoTypeInference5() {
-		MapFunction<?, ?> function = new MyMapper5<Byte>();
+		MyMapper5<Byte> function = new MyMapper5<>();
 
-		TypeInformation<?> ti = TypeExtractor.getMapReturnTypes(function, (TypeInformation)
-				TypeInfoParser.parse("org.apache.flink.api.java.typeutils.PojoTypeExtractionTest$PojoWithParameterizedFields2<"
-						+ "field=org.apache.flink.api.java.typeutils.PojoTypeExtractionTest$PojoWithGenerics<key=int,field1=byte,field2=byte>"
-						+ ">"));
+		TypeInformation<?> ti = TypeExtractor.getMapReturnTypes(
+				function,
+				TypeInformation.of(new TypeHint<PojoWithParameterizedFields2<Byte>>(){}));
 		Assert.assertEquals(BasicTypeInfo.BYTE_TYPE_INFO, ti);
 	}
 	
@@ -718,12 +738,11 @@ public class PojoTypeExtractionTest {
 	@SuppressWarnings({ "unchecked", "rawtypes" })
 	@Test
 	public void testGenericPojoTypeInference6() {
-		MapFunction<?, ?> function = new MyMapper6<Integer>();
+		MyMapper6<Integer> function = new MyMapper6<>();
 
-		TypeInformation<?> ti = TypeExtractor.getMapReturnTypes(function, (TypeInformation)
-				TypeInfoParser.parse("org.apache.flink.api.java.typeutils.PojoTypeExtractionTest$PojoWithParameterizedFields3<"
-						+ "field=int[]"
-						+ ">"));
+		TypeInformation<?> ti = TypeExtractor.getMapReturnTypes(
+				function,
+				TypeInformation.of(new TypeHint<PojoWithParameterizedFields3<Integer>>(){}));
 		Assert.assertEquals(BasicTypeInfo.INT_TYPE_INFO, ti);
 	}
 
@@ -742,12 +761,12 @@ public class PojoTypeExtractionTest {
 	@SuppressWarnings({ "unchecked", "rawtypes" })
 	@Test
 	public void testGenericPojoTypeInference7() {
-		MapFunction<?, ?> function = new MyMapper7<Integer>();
+		MyMapper7<Integer> function = new MyMapper7<>();
+
+		TypeInformation<?> ti = TypeExtractor.getMapReturnTypes(
+				function,
+				TypeInformation.of(new TypeHint<PojoWithParameterizedFields4<Integer>>(){}));
 
-		TypeInformation<?> ti = TypeExtractor.getMapReturnTypes(function, (TypeInformation)
-				TypeInfoParser.parse("org.apache.flink.api.java.typeutils.PojoTypeExtractionTest$PojoWithParameterizedFields4<"
-						+ "field=Tuple1<int>[]"
-						+ ">"));
 		Assert.assertEquals(BasicTypeInfo.INT_TYPE_INFO, ti);
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/c8fa8d02/flink-core/src/test/java/org/apache/flink/api/java/typeutils/TypeInfoParserTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/TypeInfoParserTest.java b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/TypeInfoParserTest.java
deleted file mode 100644
index 4570f50..0000000
--- a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/TypeInfoParserTest.java
+++ /dev/null
@@ -1,340 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.typeutils;
-
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataOutputView;
-import org.apache.flink.types.Value;
-import org.junit.Assert;
-import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo;
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
-import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.types.BooleanValue;
-import org.apache.flink.types.ByteValue;
-import org.apache.flink.types.CharValue;
-import org.apache.flink.types.DoubleValue;
-import org.apache.flink.types.FloatValue;
-import org.apache.flink.types.IntValue;
-import org.apache.flink.types.ListValue;
-import org.apache.flink.types.LongValue;
-import org.apache.flink.types.MapValue;
-import org.apache.flink.types.NullValue;
-import org.apache.flink.types.ShortValue;
-import org.apache.flink.types.StringValue;
-
-import org.junit.Test;
-
-import java.io.IOException;
-
-public class TypeInfoParserTest {
-	
-	@Test
-	public void testBasicTypes() {
-		Assert.assertEquals(BasicTypeInfo.INT_TYPE_INFO, TypeInfoParser.parse("Integer"));
-		Assert.assertEquals(BasicTypeInfo.DOUBLE_TYPE_INFO, TypeInfoParser.parse("Double"));
-		Assert.assertEquals(BasicTypeInfo.BYTE_TYPE_INFO, TypeInfoParser.parse("Byte"));
-		Assert.assertEquals(BasicTypeInfo.FLOAT_TYPE_INFO, TypeInfoParser.parse("Float"));
-		Assert.assertEquals(BasicTypeInfo.SHORT_TYPE_INFO, TypeInfoParser.parse("Short"));
-		Assert.assertEquals(BasicTypeInfo.LONG_TYPE_INFO, TypeInfoParser.parse("Long"));
-		Assert.assertEquals(BasicTypeInfo.CHAR_TYPE_INFO, TypeInfoParser.parse("Character"));
-		Assert.assertEquals(BasicTypeInfo.STRING_TYPE_INFO, TypeInfoParser.parse("String"));
-		Assert.assertEquals(BasicTypeInfo.BOOLEAN_TYPE_INFO, TypeInfoParser.parse("Boolean"));
-		Assert.assertEquals(BasicTypeInfo.VOID_TYPE_INFO, TypeInfoParser.parse("Void"));
-		Assert.assertEquals(BasicTypeInfo.DATE_TYPE_INFO, TypeInfoParser.parse("Date"));
-		Assert.assertEquals(BasicTypeInfo.BIG_INT_TYPE_INFO, TypeInfoParser.parse("BigInteger"));
-		Assert.assertEquals(BasicTypeInfo.BIG_DEC_TYPE_INFO, TypeInfoParser.parse("BigDecimal"));
-		
-		Assert.assertEquals(BasicTypeInfo.INT_TYPE_INFO, TypeInfoParser.parse("java.lang.Integer"));
-		Assert.assertEquals(BasicTypeInfo.DOUBLE_TYPE_INFO, TypeInfoParser.parse("java.lang.Double"));
-		Assert.assertEquals(BasicTypeInfo.BYTE_TYPE_INFO, TypeInfoParser.parse("java.lang.Byte"));
-		Assert.assertEquals(BasicTypeInfo.FLOAT_TYPE_INFO, TypeInfoParser.parse("java.lang.Float"));
-		Assert.assertEquals(BasicTypeInfo.SHORT_TYPE_INFO, TypeInfoParser.parse("java.lang.Short"));
-		Assert.assertEquals(BasicTypeInfo.LONG_TYPE_INFO, TypeInfoParser.parse("java.lang.Long"));
-		Assert.assertEquals(BasicTypeInfo.CHAR_TYPE_INFO, TypeInfoParser.parse("java.lang.Character"));
-		Assert.assertEquals(BasicTypeInfo.STRING_TYPE_INFO, TypeInfoParser.parse("java.lang.String"));
-		Assert.assertEquals(BasicTypeInfo.BOOLEAN_TYPE_INFO, TypeInfoParser.parse("java.lang.Boolean"));
-		Assert.assertEquals(BasicTypeInfo.VOID_TYPE_INFO, TypeInfoParser.parse("java.lang.Void"));
-		Assert.assertEquals(BasicTypeInfo.DATE_TYPE_INFO, TypeInfoParser.parse("java.util.Date"));
-		Assert.assertEquals(BasicTypeInfo.BIG_INT_TYPE_INFO, TypeInfoParser.parse("java.math.BigInteger"));
-		Assert.assertEquals(BasicTypeInfo.BIG_DEC_TYPE_INFO, TypeInfoParser.parse("java.math.BigDecimal"));
-		
-		Assert.assertEquals(BasicTypeInfo.INT_TYPE_INFO, TypeInfoParser.parse("int"));
-		Assert.assertEquals(BasicTypeInfo.DOUBLE_TYPE_INFO, TypeInfoParser.parse("double"));
-		Assert.assertEquals(BasicTypeInfo.BYTE_TYPE_INFO, TypeInfoParser.parse("byte"));		
-		Assert.assertEquals(BasicTypeInfo.FLOAT_TYPE_INFO, TypeInfoParser.parse("float"));
-		Assert.assertEquals(BasicTypeInfo.SHORT_TYPE_INFO, TypeInfoParser.parse("short"));
-		Assert.assertEquals(BasicTypeInfo.LONG_TYPE_INFO, TypeInfoParser.parse("long"));
-		Assert.assertEquals(BasicTypeInfo.CHAR_TYPE_INFO, TypeInfoParser.parse("char"));
-		Assert.assertEquals(BasicTypeInfo.BOOLEAN_TYPE_INFO, TypeInfoParser.parse("boolean"));
-		Assert.assertEquals(BasicTypeInfo.VOID_TYPE_INFO, TypeInfoParser.parse("void"));
-	}
-	
-	@Test
-	public void testValueTypes() {
-		helperValueType("StringValue", StringValue.class);
-		helperValueType("IntValue", IntValue.class);
-		helperValueType("ByteValue", ByteValue.class);
-		helperValueType("ShortValue", ShortValue.class);
-		helperValueType("CharValue", CharValue.class);
-		helperValueType("DoubleValue", DoubleValue.class);
-		helperValueType("FloatValue", FloatValue.class);
-		helperValueType("LongValue", LongValue.class);
-		helperValueType("BooleanValue", BooleanValue.class);
-		helperValueType("ListValue", ListValue.class);
-		helperValueType("MapValue", MapValue.class);
-		helperValueType("NullValue", NullValue.class);
-	}
-	
-	private static void helperValueType(String str, Class<?> clazz) {
-		TypeInformation<?> ti = TypeInfoParser.parse(str);
-		Assert.assertTrue(ti instanceof ValueTypeInfo);
-		ValueTypeInfo<?> vti = (ValueTypeInfo<?>) ti;
-		Assert.assertEquals(clazz, vti.getTypeClass());
-	}
-	
-	
-	@Test
-	@SuppressWarnings("AssertEqualsBetweenInconvertibleTypes")
-	public void testBasicArrays() {
-		Assert.assertEquals(BasicArrayTypeInfo.INT_ARRAY_TYPE_INFO, TypeInfoParser.parse("Integer[]"));
-		Assert.assertEquals(BasicArrayTypeInfo.DOUBLE_ARRAY_TYPE_INFO, TypeInfoParser.parse("Double[]"));
-		Assert.assertEquals(BasicArrayTypeInfo.BYTE_ARRAY_TYPE_INFO, TypeInfoParser.parse("Byte[]"));
-		Assert.assertEquals(BasicArrayTypeInfo.FLOAT_ARRAY_TYPE_INFO, TypeInfoParser.parse("Float[]"));
-		Assert.assertEquals(BasicArrayTypeInfo.SHORT_ARRAY_TYPE_INFO, TypeInfoParser.parse("Short[]"));
-		Assert.assertEquals(BasicArrayTypeInfo.LONG_ARRAY_TYPE_INFO, TypeInfoParser.parse("Long[]"));
-		Assert.assertEquals(BasicArrayTypeInfo.CHAR_ARRAY_TYPE_INFO, TypeInfoParser.parse("Character[]"));
-		Assert.assertEquals(BasicArrayTypeInfo.STRING_ARRAY_TYPE_INFO, TypeInfoParser.parse("String[]"));
-		Assert.assertEquals(BasicArrayTypeInfo.BOOLEAN_ARRAY_TYPE_INFO, TypeInfoParser.parse("Boolean[]"));
-		
-		Assert.assertEquals(BasicArrayTypeInfo.INT_ARRAY_TYPE_INFO, TypeInfoParser.parse("java.lang.Integer[]"));
-		Assert.assertEquals(BasicArrayTypeInfo.DOUBLE_ARRAY_TYPE_INFO, TypeInfoParser.parse("java.lang.Double[]"));
-		Assert.assertEquals(BasicArrayTypeInfo.BYTE_ARRAY_TYPE_INFO, TypeInfoParser.parse("java.lang.Byte[]"));
-		Assert.assertEquals(BasicArrayTypeInfo.FLOAT_ARRAY_TYPE_INFO, TypeInfoParser.parse("java.lang.Float[]"));
-		Assert.assertEquals(BasicArrayTypeInfo.SHORT_ARRAY_TYPE_INFO, TypeInfoParser.parse("java.lang.Short[]"));
-		Assert.assertEquals(BasicArrayTypeInfo.LONG_ARRAY_TYPE_INFO, TypeInfoParser.parse("java.lang.Long[]"));
-		Assert.assertEquals(BasicArrayTypeInfo.CHAR_ARRAY_TYPE_INFO, TypeInfoParser.parse("java.lang.Character[]"));
-		Assert.assertEquals(BasicArrayTypeInfo.STRING_ARRAY_TYPE_INFO, TypeInfoParser.parse("java.lang.String[]"));
-		Assert.assertEquals(BasicArrayTypeInfo.BOOLEAN_ARRAY_TYPE_INFO, TypeInfoParser.parse("java.lang.Boolean[]"));
-		
-		Assert.assertEquals(PrimitiveArrayTypeInfo.INT_PRIMITIVE_ARRAY_TYPE_INFO, TypeInfoParser.parse("int[]"));
-		Assert.assertEquals(PrimitiveArrayTypeInfo.DOUBLE_PRIMITIVE_ARRAY_TYPE_INFO, TypeInfoParser.parse("double[]"));
-		Assert.assertEquals(PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO, TypeInfoParser.parse("byte[]"));		
-		Assert.assertEquals(PrimitiveArrayTypeInfo.FLOAT_PRIMITIVE_ARRAY_TYPE_INFO, TypeInfoParser.parse("float[]"));
-		Assert.assertEquals(PrimitiveArrayTypeInfo.SHORT_PRIMITIVE_ARRAY_TYPE_INFO, TypeInfoParser.parse("short[]"));
-		Assert.assertEquals(PrimitiveArrayTypeInfo.LONG_PRIMITIVE_ARRAY_TYPE_INFO, TypeInfoParser.parse("long[]"));
-		Assert.assertEquals(PrimitiveArrayTypeInfo.CHAR_PRIMITIVE_ARRAY_TYPE_INFO, TypeInfoParser.parse("char[]"));
-		Assert.assertEquals(PrimitiveArrayTypeInfo.BOOLEAN_PRIMITIVE_ARRAY_TYPE_INFO, TypeInfoParser.parse("boolean[]"));
-	}
-	
-	@Test
-	public void testTuples() {
-		TypeInformation<?> ti = TypeInfoParser.parse("Tuple2<Integer, Long>");
-		Assert.assertEquals(2, ti.getArity());
-		Assert.assertEquals(BasicTypeInfo.INT_TYPE_INFO, ((TupleTypeInfo<?>)ti).getTypeAt(0));
-		Assert.assertEquals(BasicTypeInfo.LONG_TYPE_INFO, ((TupleTypeInfo<?>)ti).getTypeAt(1));
-		
-		ti = TypeInfoParser.parse("Tuple0");
-		Assert.assertEquals(0, ti.getArity());
-		Assert.assertEquals("Java Tuple0", ti.toString());
-		
-		ti = TypeInfoParser.parse("org.apache.flink.api.java.tuple.Tuple0");
-		Assert.assertEquals(0, ti.getArity());
-		Assert.assertEquals("Java Tuple0", ti.toString());
-		
-		ti = TypeInfoParser.parse("Tuple3<Tuple1<String>, Tuple1<Integer>, Tuple2<Long, Long>>");
-		Assert.assertEquals("Java Tuple3<Java Tuple1<String>, Java Tuple1<Integer>, Java Tuple2<Long, Long>>", ti.toString());
-	}
-	
-	@Test
-	public void testGenericType() {
-		TypeInformation<?> ti = TypeInfoParser.parse("java.lang.Class");
-		Assert.assertTrue(ti instanceof GenericTypeInfo);
-		Assert.assertEquals(Class.class, ((GenericTypeInfo<?>) ti).getTypeClass());
-	}
-
-	public static class MyValue implements Value {
-		private static final long serialVersionUID = 8607223484689147046L;
-
-		@Override
-		public void write(DataOutputView out) throws IOException {}
-
-		@Override
-		public void read(DataInputView in) throws IOException {}
-	}
-	
-	public static class MyPojo {
-		public Integer basic;
-		public Tuple2<String, Integer> tuple;
-		public Value valueType;
-		public String[] array;
-	}
-	
-	@Test
-	public void testPojoType() {
-		TypeInformation<?> ti = TypeInfoParser.parse(
-				"org.apache.flink.api.java.typeutils.TypeInfoParserTest$MyPojo<"
-				+ "basic=Integer,"
-				+ "tuple=Tuple2<String, Integer>,"
-				+ "valueType=org.apache.flink.api.java.typeutils.TypeInfoParserTest$MyValue,"
-				+ "array=String[]"
-				+ ">");
-		Assert.assertTrue(ti instanceof PojoTypeInfo);
-		PojoTypeInfo<?> pti = (PojoTypeInfo<?>) ti;
-		Assert.assertEquals("array", pti.getPojoFieldAt(0).getField().getName());
-		Assert.assertTrue(pti.getPojoFieldAt(0).getTypeInformation() instanceof BasicArrayTypeInfo);
-		Assert.assertEquals("basic", pti.getPojoFieldAt(1).getField().getName());
-		Assert.assertTrue(pti.getPojoFieldAt(1).getTypeInformation() instanceof BasicTypeInfo);
-		Assert.assertEquals("tuple", pti.getPojoFieldAt(2).getField().getName());
-		Assert.assertTrue(pti.getPojoFieldAt(2).getTypeInformation() instanceof TupleTypeInfo);
-		Assert.assertEquals("valueType", pti.getPojoFieldAt(3).getField().getName());
-
-//		this currently fails but should not
-//		Assert.assertTrue(pti.getPojoFieldAt(3).getTypeInformation() instanceof ValueTypeInfo);
-	}
-	
-	@Test
-	public void testPojoType2() {
-		TypeInformation<?> ti = TypeInfoParser.parse("Tuple2<String,Tuple2<Integer,org.apache.flink.api.java.typeutils.TypeInfoParserTest$MyPojo<basic=String>>>");
-		Assert.assertTrue(ti instanceof TupleTypeInfo);
-		TupleTypeInfo<?> tti = (TupleTypeInfo<?>) ti;
-		Assert.assertTrue(tti.getTypeAt(0) instanceof BasicTypeInfo);
-		Assert.assertTrue(tti.getTypeAt(1) instanceof TupleTypeInfo);
-		TupleTypeInfo<?> tti2 = (TupleTypeInfo<?>)(Object)tti.getTypeAt(1);
-		Assert.assertTrue(tti2.getTypeAt(0) instanceof BasicTypeInfo);
-		Assert.assertTrue(tti2.getTypeAt(1) instanceof PojoTypeInfo);
-		PojoTypeInfo<?> pti = (PojoTypeInfo<?>) tti2.getTypeAt(1);
-		Assert.assertEquals("basic", pti.getPojoFieldAt(0).getField().getName());
-		Assert.assertTrue(pti.getPojoFieldAt(0).getTypeInformation() instanceof BasicTypeInfo);
-	}
-
-	@Test
-	public void testObjectArrays() {
-		TypeInformation<?> ti = TypeInfoParser.parse("java.lang.Class[]");
-
-		Assert.assertTrue(ti instanceof ObjectArrayTypeInfo<?, ?>);
-		Assert.assertEquals(Class.class, ((ObjectArrayTypeInfo<?, ?>) ti).getComponentInfo().getTypeClass());
-		
-		TypeInformation<?> ti2 = TypeInfoParser.parse("Tuple2<Integer,Double>[]");
-
-		Assert.assertTrue(ti2 instanceof ObjectArrayTypeInfo<?, ?>);
-		Assert.assertTrue(((ObjectArrayTypeInfo<?, ?>) ti2).getComponentInfo() instanceof TupleTypeInfo);
-		
-		TypeInformation<?> ti3 = TypeInfoParser.parse("Tuple2<Integer[],Double>[]");
-		Assert.assertEquals("ObjectArrayTypeInfo<Java Tuple2<BasicArrayTypeInfo<Integer>, Double>>", ti3.toString());
-	}
-	
-	@Test
-	public void testLargeMixedTuple() {
-		TypeInformation<?> ti = TypeInfoParser.parse("org.apache.flink.api.java.tuple.Tuple4<Double,java.lang.Class[],StringValue,Tuple1<int>>[]");
-		Assert.assertEquals("ObjectArrayTypeInfo<Java Tuple4<Double, ObjectArrayTypeInfo<GenericType<java.lang.Class>>, ValueType<StringValue>, Java Tuple1<Integer>>>", ti.toString());
-	}
-	
-	public static enum MyEnum {
-		ONE, TWO, THREE
-	}
-	
-	@Test
-	public void testEnumType() {
-		TypeInformation<?> ti = TypeInfoParser.parse("Enum<org.apache.flink.api.java.typeutils.TypeInfoParserTest$MyEnum>");
-		Assert.assertEquals("EnumTypeInfo<org.apache.flink.api.java.typeutils.TypeInfoParserTest$MyEnum>", ti.toString());
-		
-		TypeInformation<?> ti2 = TypeInfoParser.parse("java.lang.Enum<org.apache.flink.api.java.typeutils.TypeInfoParserTest$MyEnum>");
-		Assert.assertEquals("EnumTypeInfo<org.apache.flink.api.java.typeutils.TypeInfoParserTest$MyEnum>", ti2.toString());
-	}
-	
-	@Test
-	public void testException() {
-		try {
-	    	TypeInfoParser.parse("THIS_CLASS_DOES_NOT_EXIST");
-			Assert.fail("exception expected");
-		} catch (IllegalArgumentException e) {
-			// right
-		}
-		
-		try {
-	    	TypeInfoParser.parse("Tuple2<Integer>");
-			Assert.fail("exception expected");
-		} catch (IllegalArgumentException e) {
-			// right
-		}
-		
-		try {
-	    	TypeInfoParser.parse("Tuple3<Integer,,>");
-			Assert.fail("exception expected");
-		} catch (IllegalArgumentException e) {
-			// right
-		}
-		
-		try {
-	    	TypeInfoParser.parse("Tuple1<Integer,Double>");
-			Assert.fail("exception expected");
-		} catch (IllegalArgumentException e) {
-			// right
-		}
-	}
-
-	@Test
-	public void testMultiDimensionalArray() {
-		// tuple
-		TypeInformation<?> ti = TypeInfoParser.parse("Tuple2<Integer, Double>[][]");
-		Assert.assertEquals("ObjectArrayTypeInfo<ObjectArrayTypeInfo<Java Tuple2<Integer, Double>>>", ti.toString());
-		
-		// pojos
-		ti = TypeInfoParser.parse("org.apache.flink.api.java.typeutils.TypeInfoParserTest$MyPojo<basic=String>[][][]");
-		Assert.assertEquals("ObjectArrayTypeInfo<ObjectArrayTypeInfo<ObjectArrayTypeInfo<"
-				+ "PojoType<org.apache.flink.api.java.typeutils.TypeInfoParserTest$MyPojo, fields = [basic: String]>"
-				+ ">>>", ti.toString());
-		
-		// basic types
-		ti = TypeInfoParser.parse("Float[][][]");
-		Assert.assertEquals("ObjectArrayTypeInfo<ObjectArrayTypeInfo<BasicArrayTypeInfo<Float>>>", ti.toString());
-		ti = TypeInfoParser.parse("String[][][]");
-		Assert.assertEquals("ObjectArrayTypeInfo<ObjectArrayTypeInfo<BasicArrayTypeInfo<String>>>", ti.toString());
-		ti = TypeInfoParser.parse("Date[][][]");
-		Assert.assertEquals("ObjectArrayTypeInfo<ObjectArrayTypeInfo<ObjectArrayTypeInfo<Date>>>", ti.toString());
-		
-		// primitive types
-		ti = TypeInfoParser.parse("int[][][]");
-		Assert.assertEquals("ObjectArrayTypeInfo<ObjectArrayTypeInfo<int[]>>", ti.toString());
-		ti = TypeInfoParser.parse("boolean[][][]");
-		Assert.assertEquals("ObjectArrayTypeInfo<ObjectArrayTypeInfo<boolean[]>>", ti.toString());
-		
-		// value types
-		ti = TypeInfoParser.parse("IntValue[][][]");
-		Assert.assertEquals("ObjectArrayTypeInfo<ObjectArrayTypeInfo<ObjectArrayTypeInfo<ValueType<IntValue>>>>", ti.toString());
-		
-		// value types
-		ti = TypeInfoParser.parse("org.apache.flink.api.java.typeutils.TypeInfoParserTest$MyValue[][][]");
-
-		// this fails because value types are parsed in a wrong way
-//		Assert.assertEquals("ObjectArrayTypeInfo<ObjectArrayTypeInfo<ObjectArrayTypeInfo<"
-//				+ "ValueType<TypeInfoParserTest$MyValue>"
-//				+ ">>>", ti.toString());
-		
-		
-		
-		// enum types
-		ti = TypeInfoParser.parse("Enum<org.apache.flink.api.java.typeutils.TypeInfoParserTest$MyEnum>[][][]");
-		Assert.assertEquals("ObjectArrayTypeInfo<ObjectArrayTypeInfo<ObjectArrayTypeInfo<"
-				+ "EnumTypeInfo<org.apache.flink.api.java.typeutils.TypeInfoParserTest$MyEnum>"
-				+ ">>>", ti.toString());
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/c8fa8d02/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/MultidimensionalArraySerializerTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/MultidimensionalArraySerializerTest.java b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/MultidimensionalArraySerializerTest.java
index 1c97816..904d1b3 100644
--- a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/MultidimensionalArraySerializerTest.java
+++ b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/MultidimensionalArraySerializerTest.java
@@ -19,10 +19,11 @@
 package org.apache.flink.api.java.typeutils.runtime;
 
 import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeinfo.TypeHint;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeutils.SerializerTestInstance;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
-import org.apache.flink.api.java.typeutils.TypeInfoParser;
+
 import org.junit.Test;
 
 
@@ -111,7 +112,9 @@ public class MultidimensionalArraySerializerTest {
 		MyGenericPojo<String>[][] array = (MyGenericPojo<String>[][]) new MyGenericPojo[][]{
 			{ new MyGenericPojo<String>(new String[][]{{"a", "b"},{"c", "d"}}), null}
 		};
-		TypeInformation ti = TypeInfoParser.parse("org.apache.flink.api.java.typeutils.runtime.MultidimensionalArraySerializerTest$MyGenericPojo<field=String[][]>[][]");
+
+		TypeInformation<MyGenericPojo<String>[][]> ti =
+				TypeInformation.of(new TypeHint<MyGenericPojo<String>[][]>(){});
 
 		SerializerTestInstance testInstance = new SerializerTestInstance(ti.createSerializer(new ExecutionConfig()), MyGenericPojo[][].class, -1, (Object) array);
 		testInstance.testAll();

http://git-wip-us.apache.org/repos/asf/flink/blob/c8fa8d02/flink-java/src/main/java/org/apache/flink/api/java/operators/SingleInputUdfOperator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/SingleInputUdfOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/SingleInputUdfOperator.java
index 4ce44aa..e98a822 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/SingleInputUdfOperator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/SingleInputUdfOperator.java
@@ -20,7 +20,6 @@ package org.apache.flink.api.java.operators;
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.annotation.Public;
-import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.api.common.functions.Function;
 import org.apache.flink.api.common.functions.InvalidTypesException;
 import org.apache.flink.api.common.operators.SemanticProperties;
@@ -30,7 +29,6 @@ import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.functions.FunctionAnnotation;
 import org.apache.flink.api.java.functions.SemanticPropUtil;
-import org.apache.flink.api.java.typeutils.TypeInfoParser;
 import org.apache.flink.configuration.Configuration;
 
 import java.lang.annotation.Annotation;
@@ -261,52 +259,6 @@ public abstract class SingleInputUdfOperator<IN, OUT, O extends SingleInputUdfOp
 		return returnType;
 	}
 
-	/**
-	 * Adds a type information hint about the return type of this operator.
-	 *
-	 *
-	 * <p>Type hints are important in cases where the Java compiler
-	 * throws away generic type information necessary for efficient execution.
-	 *
-	 *
-	 * <p>This method takes a type information string that will be parsed. A type information string can contain the following
-	 * types:
-	 *
-	 * <ul>
-	 * <li>Basic types such as <code>Integer</code>, <code>String</code>, etc.
-	 * <li>Basic type arrays such as <code>Integer[]</code>,
-	 * <code>String[]</code>, etc.
-	 * <li>Tuple types such as <code>Tuple1&lt;TYPE0&gt;</code>,
-	 * <code>Tuple2&lt;TYPE0, TYPE1&gt;</code>, etc.</li>
-	 * <li>Pojo types such as <code>org.my.MyPojo&lt;myFieldName=TYPE0,myFieldName2=TYPE1&gt;</code>, etc.</li>
-	 * <li>Generic types such as <code>java.lang.Class</code>, etc.
-	 * <li>Custom type arrays such as <code>org.my.CustomClass[]</code>,
-	 * <code>org.my.CustomClass$StaticInnerClass[]</code>, etc.
-	 * <li>Value types such as <code>DoubleValue</code>,
-	 * <code>StringValue</code>, <code>IntegerValue</code>, etc.</li>
-	 * <li>Tuple array types such as <code>Tuple2&lt;TYPE0,TYPE1&gt;[], etc.</code></li>
-	 * <li>Writable types such as <code>Writable&lt;org.my.CustomWritable&gt;</code></li>
-	 * <li>Enum types such as <code>Enum&lt;org.my.CustomEnum&gt;</code></li>
-	 * </ul>
-	 *
-	 * <p>Example:
-	 * <code>"Tuple2&lt;String,Tuple2&lt;Integer,org.my.MyJob$Pojo&lt;word=String&gt;&gt;&gt;"</code>
-	 *
-	 * @param typeInfoString
-	 *            type information string to be parsed
-	 * @return This operator with a given return type hint.
-	 *
-	 * @deprecated Please use {@link #returns(Class)} or {@link #returns(TypeHint)} instead.
-	 */
-	@Deprecated
-	@PublicEvolving
-	public O returns(String typeInfoString) {
-		if (typeInfoString == null) {
-			throw new IllegalArgumentException("Type information string must not be null.");
-		}
-		return returns(TypeInfoParser.<OUT>parse(typeInfoString));
-	}
-
 	// --------------------------------------------------------------------------------------------
 	// Accessors
 	// --------------------------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/c8fa8d02/flink-java/src/main/java/org/apache/flink/api/java/operators/TwoInputUdfOperator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/TwoInputUdfOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/TwoInputUdfOperator.java
index b78d17e..a7d9506 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/TwoInputUdfOperator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/TwoInputUdfOperator.java
@@ -20,7 +20,6 @@ package org.apache.flink.api.java.operators;
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.annotation.Public;
-import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.api.common.functions.Function;
 import org.apache.flink.api.common.functions.InvalidTypesException;
 import org.apache.flink.api.common.operators.DualInputSemanticProperties;
@@ -30,7 +29,6 @@ import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.functions.FunctionAnnotation;
 import org.apache.flink.api.java.functions.SemanticPropUtil;
-import org.apache.flink.api.java.typeutils.TypeInfoParser;
 import org.apache.flink.configuration.Configuration;
 
 import java.lang.annotation.Annotation;
@@ -329,52 +327,6 @@ public abstract class TwoInputUdfOperator<IN1, IN2, OUT, O extends TwoInputUdfOp
 		return returnType;
 	}
 
-	/**
-	 * Adds a type information hint about the return type of this operator.
-	 *
-	 *
-	 * <p>Type hints are important in cases where the Java compiler
-	 * throws away generic type information necessary for efficient execution.
-	 *
-	 *
-	 * <p>This method takes a type information string that will be parsed. A type information string can contain the following
-	 * types:
-	 *
-	 * <ul>
-	 * <li>Basic types such as <code>Integer</code>, <code>String</code>, etc.
-	 * <li>Basic type arrays such as <code>Integer[]</code>,
-	 * <code>String[]</code>, etc.
-	 * <li>Tuple types such as <code>Tuple1&lt;TYPE0&gt;</code>,
-	 * <code>Tuple2&lt;TYPE0, TYPE1&gt;</code>, etc.</li>
-	 * <li>Pojo types such as <code>org.my.MyPojo&lt;myFieldName=TYPE0,myFieldName2=TYPE1&gt;</code>, etc.</li>
-	 * <li>Generic types such as <code>java.lang.Class</code>, etc.
-	 * <li>Custom type arrays such as <code>org.my.CustomClass[]</code>,
-	 * <code>org.my.CustomClass$StaticInnerClass[]</code>, etc.
-	 * <li>Value types such as <code>DoubleValue</code>,
-	 * <code>StringValue</code>, <code>IntegerValue</code>, etc.</li>
-	 * <li>Tuple array types such as <code>Tuple2&lt;TYPE0,TYPE1&gt;[], etc.</code></li>
-	 * <li>Writable types such as <code>Writable&lt;org.my.CustomWritable&gt;</code></li>
-	 * <li>Enum types such as <code>Enum&lt;org.my.CustomEnum&gt;</code></li>
-	 * </ul>
-	 *
-	 * <p>Example:
-	 * <code>"Tuple2&lt;String,Tuple2&lt;Integer,org.my.MyJob$Pojo&lt;word=String&gt;&gt;&gt;"</code>
-	 *
-	 * @param typeInfoString
-	 *            type information string to be parsed
-	 * @return This operator with a given return type hint.
-	 *
-	 * @deprecated Please use {@link #returns(Class)} or {@link #returns(TypeHint)} instead.
-	 */
-	@Deprecated
-	@PublicEvolving
-	public O returns(String typeInfoString) {
-		if (typeInfoString == null) {
-			throw new IllegalArgumentException("Type information string must not be null.");
-		}
-		return returns(TypeInfoParser.<OUT>parse(typeInfoString));
-	}
-
 	// --------------------------------------------------------------------------------------------
 	// Accessors
 	// --------------------------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/c8fa8d02/flink-java/src/test/java/org/apache/flink/api/common/operators/base/CoGroupOperatorCollectionTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/common/operators/base/CoGroupOperatorCollectionTest.java b/flink-java/src/test/java/org/apache/flink/api/common/operators/base/CoGroupOperatorCollectionTest.java
index e1e393b..79591cf 100644
--- a/flink-java/src/test/java/org/apache/flink/api/common/operators/base/CoGroupOperatorCollectionTest.java
+++ b/flink-java/src/test/java/org/apache/flink/api/common/operators/base/CoGroupOperatorCollectionTest.java
@@ -26,9 +26,10 @@ import org.apache.flink.api.common.functions.RichCoGroupFunction;
 import org.apache.flink.api.common.functions.RuntimeContext;
 import org.apache.flink.api.common.functions.util.RuntimeUDFContext;
 import org.apache.flink.api.common.operators.BinaryOperatorInformation;
+import org.apache.flink.api.common.typeinfo.TypeHint;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.tuple.builder.Tuple2Builder;
-import org.apache.flink.api.java.typeutils.TypeInfoParser;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
@@ -183,14 +184,11 @@ public class CoGroupOperatorCollectionTest implements Serializable {
 			Tuple2<String, Integer>>> getCoGroupOperator(
 			RichCoGroupFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple2<String, Integer>> udf) {
 
-		return new CoGroupOperatorBase<Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple2<String, Integer>,
-				CoGroupFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple2<String, Integer>>>(
+		TypeInformation<Tuple2<String, Integer>> tuple2Info = TypeInformation.of(new TypeHint<Tuple2<String, Integer>>(){});
+
+		return new CoGroupOperatorBase<>(
 				udf,
-				new BinaryOperatorInformation<Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple2<String, Integer>>(
-						TypeInfoParser.<Tuple2<String, Integer>>parse("Tuple2<String, Integer>"),
-						TypeInfoParser.<Tuple2<String, Integer>>parse("Tuple2<String, Integer>"),
-						TypeInfoParser.<Tuple2<String, Integer>>parse("Tuple2<String, Integer>")
-				),
+				new BinaryOperatorInformation<>(tuple2Info, tuple2Info, tuple2Info),
 				new int[]{0},
 				new int[]{0},
 				"coGroup on Collections"

http://git-wip-us.apache.org/repos/asf/flink/blob/c8fa8d02/flink-java/src/test/java/org/apache/flink/api/common/operators/base/GroupReduceOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/common/operators/base/GroupReduceOperatorTest.java b/flink-java/src/test/java/org/apache/flink/api/common/operators/base/GroupReduceOperatorTest.java
index d788efd..b51b52d 100644
--- a/flink-java/src/test/java/org/apache/flink/api/common/operators/base/GroupReduceOperatorTest.java
+++ b/flink-java/src/test/java/org/apache/flink/api/common/operators/base/GroupReduceOperatorTest.java
@@ -20,16 +20,15 @@ package org.apache.flink.api.common.operators.base;
 
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.TaskInfo;
-import org.apache.flink.api.common.accumulators.Accumulator;
 import org.apache.flink.api.common.functions.GroupReduceFunction;
 import org.apache.flink.api.common.functions.RichGroupReduceFunction;
 import org.apache.flink.api.common.functions.RuntimeContext;
 import org.apache.flink.api.common.functions.util.RuntimeUDFContext;
 import org.apache.flink.api.common.operators.UnaryOperatorInformation;
+import org.apache.flink.api.common.typeinfo.TypeHint;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.typeutils.TypeInfoParser;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.core.fs.Path;
 import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
 import org.apache.flink.util.Collector;
 
@@ -41,7 +40,6 @@ import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Set;
-import java.util.concurrent.Future;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import static java.util.Arrays.asList;
@@ -55,6 +53,9 @@ import static org.junit.Assert.fail;
 @SuppressWarnings({"serial", "unchecked"})
 public class GroupReduceOperatorTest implements java.io.Serializable {
 
+	private static final TypeInformation<Tuple2<String, Integer>> STRING_INT_TUPLE =
+			TypeInformation.of(new TypeHint<Tuple2<String, Integer>>(){});
+
 	@Test
 	public void testGroupReduceCollection() {
 		try {
@@ -79,18 +80,18 @@ public class GroupReduceOperatorTest implements java.io.Serializable {
 			};
 
 			GroupReduceOperatorBase<Tuple2<String, Integer>, Tuple2<String, Integer>,
-					GroupReduceFunction<Tuple2<String, Integer>, Tuple2<String,
-							Integer>>> op = new GroupReduceOperatorBase<Tuple2<String, Integer>,
-					Tuple2<String, Integer>, GroupReduceFunction<Tuple2<String, Integer>,
-					Tuple2<String, Integer>>>(reducer, new UnaryOperatorInformation<Tuple2<String,
-					Integer>, Tuple2<String, Integer>>(TypeInfoParser.<Tuple2<String,
-					Integer>>parse("Tuple2<String, Integer>"), TypeInfoParser.<Tuple2<String,
-					Integer>>parse("Tuple2<String, Integer>")), new int[]{0}, "TestReducer");
-
-			List<Tuple2<String, Integer>> input = new ArrayList<Tuple2<String,
-					Integer>>(asList(new Tuple2<String, Integer>("foo", 1), new Tuple2<String,
-					Integer>("foo", 3), new Tuple2<String, Integer>("bar", 2), new Tuple2<String,
-					Integer>("bar", 4)));
+							GroupReduceFunction<Tuple2<String, Integer>, Tuple2<String, Integer>>> op =
+					new GroupReduceOperatorBase<>(
+							reducer,
+							new UnaryOperatorInformation<>(STRING_INT_TUPLE, STRING_INT_TUPLE),
+							new int[]{0},
+							"TestReducer");
+
+			List<Tuple2<String, Integer>> input = new ArrayList<>(asList(
+					new Tuple2<>("foo", 1),
+					new Tuple2<>("foo", 3),
+					new Tuple2<>("bar", 2),
+					new Tuple2<>("bar", 4)));
 
 			ExecutionConfig executionConfig = new ExecutionConfig();
 			executionConfig.disableObjectReuse();
@@ -98,12 +99,12 @@ public class GroupReduceOperatorTest implements java.io.Serializable {
 			executionConfig.enableObjectReuse();
 			List<Tuple2<String, Integer>> resultRegular = op.executeOnCollections(input, null, executionConfig);
 
-			Set<Tuple2<String, Integer>> resultSetMutableSafe = new HashSet<Tuple2<String, Integer>>(resultMutableSafe);
-			Set<Tuple2<String, Integer>> resultSetRegular = new HashSet<Tuple2<String, Integer>>(resultRegular);
+			Set<Tuple2<String, Integer>> resultSetMutableSafe = new HashSet<>(resultMutableSafe);
+			Set<Tuple2<String, Integer>> resultSetRegular = new HashSet<>(resultRegular);
 
-			Set<Tuple2<String, Integer>> expectedResult = new HashSet<Tuple2<String,
-					Integer>>(asList(new Tuple2<String, Integer>("foo", 4), new Tuple2<String,
-					Integer>("bar", 6)));
+			Set<Tuple2<String, Integer>> expectedResult = new HashSet<>(asList(
+					new Tuple2<>("foo", 4),
+					new Tuple2<>("bar", 6)));
 
 			assertEquals(expectedResult, resultSetMutableSafe);
 			assertEquals(expectedResult, resultSetRegular);
@@ -156,18 +157,18 @@ public class GroupReduceOperatorTest implements java.io.Serializable {
 			};
 
 			GroupReduceOperatorBase<Tuple2<String, Integer>, Tuple2<String, Integer>,
-					GroupReduceFunction<Tuple2<String, Integer>, Tuple2<String,
-							Integer>>> op = new GroupReduceOperatorBase<Tuple2<String, Integer>,
-					Tuple2<String, Integer>, GroupReduceFunction<Tuple2<String, Integer>,
-					Tuple2<String, Integer>>>(reducer, new UnaryOperatorInformation<Tuple2<String,
-					Integer>, Tuple2<String, Integer>>(TypeInfoParser.<Tuple2<String,
-					Integer>>parse("Tuple2<String, Integer>"), TypeInfoParser.<Tuple2<String,
-					Integer>>parse("Tuple2<String, Integer>")), new int[]{0}, "TestReducer");
-
-			List<Tuple2<String, Integer>> input = new ArrayList<Tuple2<String,
-					Integer>>(asList(new Tuple2<String, Integer>("foo", 1), new Tuple2<String,
-					Integer>("foo", 3), new Tuple2<String, Integer>("bar", 2), new Tuple2<String,
-					Integer>("bar", 4)));
+							GroupReduceFunction<Tuple2<String, Integer>, Tuple2<String, Integer>>> op =
+					new GroupReduceOperatorBase<>(
+							reducer,
+							new UnaryOperatorInformation<>(STRING_INT_TUPLE, STRING_INT_TUPLE),
+							new int[]{0},
+							"TestReducer");
+
+			List<Tuple2<String, Integer>> input = new ArrayList<>(asList(
+					new Tuple2<>("foo", 1),
+					new Tuple2<>("foo", 3),
+					new Tuple2<>("bar", 2),
+					new Tuple2<>("bar", 4)));
 
 			final TaskInfo taskInfo = new TaskInfo(taskName, 1, 0, 1, 0);
 
@@ -175,25 +176,25 @@ public class GroupReduceOperatorTest implements java.io.Serializable {
 			executionConfig.disableObjectReuse();
 			List<Tuple2<String, Integer>> resultMutableSafe = op.executeOnCollections(input,
 					new RuntimeUDFContext(taskInfo, null, executionConfig,
-							new HashMap<String, Future<Path>>(),
-							new HashMap<String, Accumulator<?, ?>>(),
+							new HashMap<>(),
+							new HashMap<>(),
 							new UnregisteredMetricsGroup()),
 					executionConfig);
 
 			executionConfig.enableObjectReuse();
 			List<Tuple2<String, Integer>> resultRegular = op.executeOnCollections(input,
 					new RuntimeUDFContext(taskInfo, null, executionConfig,
-							new HashMap<String, Future<Path>>(),
-							new HashMap<String, Accumulator<?, ?>>(),
+							new HashMap<>(),
+							new HashMap<>(),
 							new UnregisteredMetricsGroup()),
 					executionConfig);
 
-			Set<Tuple2<String, Integer>> resultSetMutableSafe = new HashSet<Tuple2<String, Integer>>(resultMutableSafe);
-			Set<Tuple2<String, Integer>> resultSetRegular = new HashSet<Tuple2<String, Integer>>(resultRegular);
+			Set<Tuple2<String, Integer>> resultSetMutableSafe = new HashSet<>(resultMutableSafe);
+			Set<Tuple2<String, Integer>> resultSetRegular = new HashSet<>(resultRegular);
 
-			Set<Tuple2<String, Integer>> expectedResult = new HashSet<Tuple2<String,
-					Integer>>(asList(new Tuple2<String, Integer>("foo", 4), new Tuple2<String,
-					Integer>("bar", 6)));
+			Set<Tuple2<String, Integer>> expectedResult = new HashSet<>(asList(
+					new Tuple2<>("foo", 4),
+					new Tuple2<>("bar", 6)));
 
 			assertEquals(expectedResult, resultSetMutableSafe);
 			assertEquals(expectedResult, resultSetRegular);

http://git-wip-us.apache.org/repos/asf/flink/blob/c8fa8d02/flink-java/src/test/java/org/apache/flink/api/common/operators/base/ReduceOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/common/operators/base/ReduceOperatorTest.java b/flink-java/src/test/java/org/apache/flink/api/common/operators/base/ReduceOperatorTest.java
index 396b78f..58a7b18 100644
--- a/flink-java/src/test/java/org/apache/flink/api/common/operators/base/ReduceOperatorTest.java
+++ b/flink-java/src/test/java/org/apache/flink/api/common/operators/base/ReduceOperatorTest.java
@@ -20,27 +20,27 @@ package org.apache.flink.api.common.operators.base;
 
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.TaskInfo;
-import org.apache.flink.api.common.accumulators.Accumulator;
 import org.apache.flink.api.common.functions.ReduceFunction;
 import org.apache.flink.api.common.functions.RichReduceFunction;
 import org.apache.flink.api.common.functions.RuntimeContext;
 import org.apache.flink.api.common.functions.util.RuntimeUDFContext;
 import org.apache.flink.api.common.operators.UnaryOperatorInformation;
+import org.apache.flink.api.common.typeinfo.TypeHint;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.operators.ReduceOperator;
 import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.typeutils.TypeInfoParser;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.core.fs.Path;
 import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
+import org.apache.flink.util.TestLogger;
 
 import org.junit.Test;
 
+import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
-import java.util.concurrent.Future;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import static java.util.Arrays.asList;
@@ -52,35 +52,29 @@ import static org.junit.Assert.fail;
  * Tests for {@link ReduceOperator}.
  */
 @SuppressWarnings({"serial", "unchecked"})
-public class ReduceOperatorTest implements java.io.Serializable {
+public class ReduceOperatorTest extends TestLogger implements Serializable{
+
+	private static final TypeInformation<Tuple2<String, Integer>> STRING_INT_TUPLE =
+			TypeInformation.of(new TypeHint<Tuple2<String, Integer>>(){});
 
 	@Test
 	public void testReduceCollection() {
 		try {
-			final ReduceFunction<Tuple2<String, Integer>> reducer = new
-					ReduceFunction<Tuple2<String, Integer>>() {
-
-				@Override
-				public Tuple2<String, Integer> reduce(Tuple2<String, Integer> value1,
-														Tuple2<String, Integer> value2) throws
-						Exception {
-					return new Tuple2<String, Integer>(value1.f0, value1.f1 + value2.f1);
-				}
-			};
-
-			ReduceOperatorBase<Tuple2<String, Integer>, ReduceFunction<Tuple2<String,
-					Integer>>> op = new ReduceOperatorBase<Tuple2<String, Integer>,
-					ReduceFunction<Tuple2<String, Integer>>>(reducer,
-					new UnaryOperatorInformation<Tuple2<String, Integer>, Tuple2<String,
-							Integer>>(TypeInfoParser.<Tuple2<String,
-							Integer>>parse("Tuple2<String, Integer>"),
-							TypeInfoParser.<Tuple2<String, Integer>>parse("Tuple2<String, " +
-									"Integer>")), new int[]{0}, "TestReducer");
-
-			List<Tuple2<String, Integer>> input = new ArrayList<Tuple2<String,
-					Integer>>(asList(new Tuple2<String, Integer>("foo", 1), new Tuple2<String,
-					Integer>("foo", 3), new Tuple2<String, Integer>("bar", 2), new Tuple2<String,
-					Integer>("bar", 4)));
+			final ReduceFunction<Tuple2<String, Integer>> reducer =
+					(value1, value2) -> new Tuple2<>(value1.f0, value1.f1 + value2.f1);
+
+			ReduceOperatorBase<Tuple2<String, Integer>, ReduceFunction<Tuple2<String, Integer>>> op =
+					new ReduceOperatorBase<>(
+							reducer,
+							new UnaryOperatorInformation<>(STRING_INT_TUPLE, STRING_INT_TUPLE),
+							new int[]{0},
+							"TestReducer");
+
+			List<Tuple2<String, Integer>> input = new ArrayList<>(asList(
+					new Tuple2<>("foo", 1),
+					new Tuple2<>("foo", 3),
+					new Tuple2<>("bar", 2),
+					new Tuple2<>("bar", 4)));
 
 			ExecutionConfig executionConfig = new ExecutionConfig();
 			executionConfig.disableObjectReuse();
@@ -88,12 +82,12 @@ public class ReduceOperatorTest implements java.io.Serializable {
 			executionConfig.enableObjectReuse();
 			List<Tuple2<String, Integer>> resultRegular = op.executeOnCollections(input, null, executionConfig);
 
-			Set<Tuple2<String, Integer>> resultSetMutableSafe = new HashSet<Tuple2<String, Integer>>(resultMutableSafe);
-			Set<Tuple2<String, Integer>> resultSetRegular = new HashSet<Tuple2<String, Integer>>(resultRegular);
+			Set<Tuple2<String, Integer>> resultSetMutableSafe = new HashSet<>(resultMutableSafe);
+			Set<Tuple2<String, Integer>> resultSetRegular = new HashSet<>(resultRegular);
 
-			Set<Tuple2<String, Integer>> expectedResult = new HashSet<Tuple2<String,
-					Integer>>(asList(new Tuple2<String, Integer>("foo", 4), new Tuple2<String,
-					Integer>("bar", 6)));
+			Set<Tuple2<String, Integer>> expectedResult = new HashSet<>(asList(
+					new Tuple2<>("foo", 4),
+					new Tuple2<>("bar", 6)));
 
 			assertEquals(expectedResult, resultSetMutableSafe);
 			assertEquals(expectedResult, resultSetRegular);
@@ -111,13 +105,14 @@ public class ReduceOperatorTest implements java.io.Serializable {
 			final AtomicBoolean opened = new AtomicBoolean();
 			final AtomicBoolean closed = new AtomicBoolean();
 
-			final ReduceFunction<Tuple2<String, Integer>> reducer = new
-					RichReduceFunction<Tuple2<String, Integer>>() {
+			final ReduceFunction<Tuple2<String, Integer>> reducer = new RichReduceFunction<Tuple2<String, Integer>>() {
+
 				@Override
-				public Tuple2<String, Integer> reduce(Tuple2<String, Integer> value1,
-														Tuple2<String, Integer> value2) throws
-						Exception {
-					return new Tuple2<String, Integer>(value1.f0, value1.f1 + value2.f1);
+				public Tuple2<String, Integer> reduce(
+						Tuple2<String, Integer> value1,
+						Tuple2<String, Integer> value2) throws Exception {
+
+					return new Tuple2<>(value1.f0, value1.f1 + value2.f1);
 				}
 
 				@Override
@@ -135,19 +130,18 @@ public class ReduceOperatorTest implements java.io.Serializable {
 				}
 			};
 
-			ReduceOperatorBase<Tuple2<String, Integer>, ReduceFunction<Tuple2<String,
-					Integer>>> op = new ReduceOperatorBase<Tuple2<String, Integer>,
-					ReduceFunction<Tuple2<String, Integer>>>(reducer,
-					new UnaryOperatorInformation<Tuple2<String, Integer>, Tuple2<String,
-							Integer>>(TypeInfoParser.<Tuple2<String,
-							Integer>>parse("Tuple2<String, Integer>"),
-							TypeInfoParser.<Tuple2<String, Integer>>parse("Tuple2<String, " +
-									"Integer>")), new int[]{0}, "TestReducer");
+			ReduceOperatorBase<Tuple2<String, Integer>, ReduceFunction<Tuple2<String, Integer>>> op =
+					new ReduceOperatorBase<>(
+							reducer,
+							new UnaryOperatorInformation<>(STRING_INT_TUPLE, STRING_INT_TUPLE),
+							new int[]{0},
+							"TestReducer");
 
-			List<Tuple2<String, Integer>> input = new ArrayList<Tuple2<String,
-					Integer>>(asList(new Tuple2<String, Integer>("foo", 1), new Tuple2<String,
-					Integer>("foo", 3), new Tuple2<String, Integer>("bar", 2), new Tuple2<String,
-					Integer>("bar", 4)));
+			List<Tuple2<String, Integer>> input = new ArrayList<>(asList(
+					new Tuple2<>("foo", 1),
+					new Tuple2<>("foo", 3),
+					new Tuple2<>("bar", 2),
+					new Tuple2<>("bar", 4)));
 
 			final TaskInfo taskInfo = new TaskInfo(taskName, 1, 0, 1, 0);
 
@@ -156,25 +150,25 @@ public class ReduceOperatorTest implements java.io.Serializable {
 			executionConfig.disableObjectReuse();
 			List<Tuple2<String, Integer>> resultMutableSafe = op.executeOnCollections(input,
 					new RuntimeUDFContext(taskInfo, null, executionConfig,
-							new HashMap<String, Future<Path>>(),
-							new HashMap<String, Accumulator<?, ?>>(),
+							new HashMap<>(),
+							new HashMap<>(),
 							new UnregisteredMetricsGroup()),
 					executionConfig);
 
 			executionConfig.enableObjectReuse();
 			List<Tuple2<String, Integer>> resultRegular = op.executeOnCollections(input,
 					new RuntimeUDFContext(taskInfo, null, executionConfig,
-							new HashMap<String, Future<Path>>(),
-							new HashMap<String, Accumulator<?, ?>>(),
+							new HashMap<>(),
+							new HashMap<>(),
 							new UnregisteredMetricsGroup()),
 					executionConfig);
 
-			Set<Tuple2<String, Integer>> resultSetMutableSafe = new HashSet<Tuple2<String, Integer>>(resultMutableSafe);
-			Set<Tuple2<String, Integer>> resultSetRegular = new HashSet<Tuple2<String, Integer>>(resultRegular);
+			Set<Tuple2<String, Integer>> resultSetMutableSafe = new HashSet<>(resultMutableSafe);
+			Set<Tuple2<String, Integer>> resultSetRegular = new HashSet<>(resultRegular);
 
-			Set<Tuple2<String, Integer>> expectedResult = new HashSet<Tuple2<String,
-					Integer>>(asList(new Tuple2<String, Integer>("foo", 4), new Tuple2<String,
-					Integer>("bar", 6)));
+			Set<Tuple2<String, Integer>> expectedResult = new HashSet<>(asList(
+					new Tuple2<>("foo", 4),
+					new Tuple2<>("bar", 6)));
 
 			assertEquals(expectedResult, resultSetMutableSafe);
 			assertEquals(expectedResult, resultSetRegular);

http://git-wip-us.apache.org/repos/asf/flink/blob/c8fa8d02/flink-java/src/test/java/org/apache/flink/api/java/TypeExtractionTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/TypeExtractionTest.java b/flink-java/src/test/java/org/apache/flink/api/java/TypeExtractionTest.java
index 22620c2..c8a9890 100644
--- a/flink-java/src/test/java/org/apache/flink/api/java/TypeExtractionTest.java
+++ b/flink-java/src/test/java/org/apache/flink/api/java/TypeExtractionTest.java
@@ -20,7 +20,7 @@ package org.apache.flink.api.java;
 
 import org.apache.flink.api.common.functions.RichMapFunction;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.typeutils.TypeInfoParser;
+import org.apache.flink.api.common.typeinfo.Types;
 
 import org.junit.Test;
 
@@ -47,9 +47,9 @@ public class TypeExtractionTest {
 
 		TypeInformation<?> info = ExecutionEnvironment.getExecutionEnvironment()
 				.fromElements("arbitrary", "data")
-				.map(function).returns("String").getResultType();
+				.map(function).returns(Types.STRING).getResultType();
 
-		assertEquals(TypeInfoParser.parse("String"), info);
+		assertEquals(Types.STRING, info);
 	}
 
 	@Test

http://git-wip-us.apache.org/repos/asf/flink/blob/c8fa8d02/flink-java8/src/test/java/org/apache/flink/api/java/type/lambdas/LambdaExtractionTest.java
----------------------------------------------------------------------
diff --git a/flink-java8/src/test/java/org/apache/flink/api/java/type/lambdas/LambdaExtractionTest.java b/flink-java8/src/test/java/org/apache/flink/api/java/type/lambdas/LambdaExtractionTest.java
index d862f47..de1f395 100644
--- a/flink-java8/src/test/java/org/apache/flink/api/java/type/lambdas/LambdaExtractionTest.java
+++ b/flink-java8/src/test/java/org/apache/flink/api/java/type/lambdas/LambdaExtractionTest.java
@@ -30,7 +30,9 @@ import org.apache.flink.api.common.functions.MapPartitionFunction;
 import org.apache.flink.api.common.functions.Partitioner;
 import org.apache.flink.api.common.functions.RichMapFunction;
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeHint;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeinfo.Types;
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.api.java.tuple.Tuple1;
 import org.apache.flink.api.java.tuple.Tuple2;
@@ -38,7 +40,6 @@ import org.apache.flink.api.java.typeutils.MissingTypeInfo;
 import org.apache.flink.api.java.typeutils.TupleTypeInfo;
 import org.apache.flink.api.java.typeutils.TypeExtractionUtils;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
-import org.apache.flink.api.java.typeutils.TypeInfoParser;
 
 import org.junit.Assert;
 import org.junit.Test;
@@ -57,6 +58,12 @@ import static org.junit.Assert.fail;
 @SuppressWarnings("serial")
 public class LambdaExtractionTest {
 
+	private static final TypeInformation<Tuple2<Tuple1<Integer>, Boolean>> NESTED_TUPLE_BOOLEAN_TYPE =
+			new TypeHint<Tuple2<Tuple1<Integer>, Boolean>>(){}.getTypeInfo();
+
+	private static final TypeInformation<Tuple2<Tuple1<Integer>, Double>> NESTED_TUPLE_DOUBLE_TYPE =
+			new TypeHint<Tuple2<Tuple1<Integer>, Double>>(){}.getTypeInfo();
+
 	@Test
 	public void testIdentifyLambdas() {
 		try {
@@ -126,7 +133,7 @@ public class LambdaExtractionTest {
 
 	@Test
 	public void testLambdaWithMemberVariable() {
-		TypeInformation<?> ti = TypeExtractor.getMapReturnTypes(new MyClass().getMapFunction(), TypeInfoParser.parse("Integer"));
+		TypeInformation<?> ti = TypeExtractor.getMapReturnTypes(new MyClass().getMapFunction(), Types.INT);
 		Assert.assertEquals(ti, BasicTypeInfo.STRING_TYPE_INFO);
 	}
 
@@ -138,7 +145,7 @@ public class LambdaExtractionTest {
 
 		MapFunction<Integer, String> f = (i) -> s + k + j;
 
-		TypeInformation<?> ti = TypeExtractor.getMapReturnTypes(f, TypeInfoParser.parse("Integer"));
+		TypeInformation<?> ti = TypeExtractor.getMapReturnTypes(f, Types.INT);
 		Assert.assertEquals(ti, BasicTypeInfo.STRING_TYPE_INFO);
 	}
 
@@ -146,7 +153,7 @@ public class LambdaExtractionTest {
 	public void testMapLambda() {
 		MapFunction<Tuple2<Tuple1<Integer>, Boolean>, Tuple2<Tuple1<Integer>, String>> f = (i) -> null;
 
-		TypeInformation<?> ti = TypeExtractor.getMapReturnTypes(f, TypeInfoParser.parse("Tuple2<Tuple1<Integer>, Boolean>"));
+		TypeInformation<?> ti = TypeExtractor.getMapReturnTypes(f, NESTED_TUPLE_BOOLEAN_TYPE);
 		if (!(ti instanceof MissingTypeInfo)) {
 			Assert.assertTrue(ti.isTupleType());
 			Assert.assertEquals(2, ti.getArity());
@@ -159,7 +166,7 @@ public class LambdaExtractionTest {
 	public void testFlatMapLambda() {
 		FlatMapFunction<Tuple2<Tuple1<Integer>, Boolean>, Tuple2<Tuple1<Integer>, String>> f = (i, o) -> {};
 
-		TypeInformation<?> ti = TypeExtractor.getFlatMapReturnTypes(f, TypeInfoParser.parse("Tuple2<Tuple1<Integer>, Boolean>"));
+		TypeInformation<?> ti = TypeExtractor.getFlatMapReturnTypes(f, NESTED_TUPLE_BOOLEAN_TYPE);
 		if (!(ti instanceof MissingTypeInfo)) {
 			Assert.assertTrue(ti.isTupleType());
 			Assert.assertEquals(2, ti.getArity());
@@ -172,7 +179,7 @@ public class LambdaExtractionTest {
 	public void testMapPartitionLambda() {
 		MapPartitionFunction<Tuple2<Tuple1<Integer>, Boolean>, Tuple2<Tuple1<Integer>, String>> f = (i, o) -> {};
 
-		TypeInformation<?> ti = TypeExtractor.getMapPartitionReturnTypes(f, TypeInfoParser.parse("Tuple2<Tuple1<Integer>, Boolean>"));
+		TypeInformation<?> ti = TypeExtractor.getMapPartitionReturnTypes(f, NESTED_TUPLE_BOOLEAN_TYPE);
 		if (!(ti instanceof MissingTypeInfo)) {
 			Assert.assertTrue(ti.isTupleType());
 			Assert.assertEquals(2, ti.getArity());
@@ -185,7 +192,7 @@ public class LambdaExtractionTest {
 	public void testGroupReduceLambda() {
 		GroupReduceFunction<Tuple2<Tuple1<Integer>, Boolean>, Tuple2<Tuple1<Integer>, String>> f = (i, o) -> {};
 
-		TypeInformation<?> ti = TypeExtractor.getGroupReduceReturnTypes(f, TypeInfoParser.parse("Tuple2<Tuple1<Integer>, Boolean>"));
+		TypeInformation<?> ti = TypeExtractor.getGroupReduceReturnTypes(f, NESTED_TUPLE_BOOLEAN_TYPE);
 		if (!(ti instanceof MissingTypeInfo)) {
 			Assert.assertTrue(ti.isTupleType());
 			Assert.assertEquals(2, ti.getArity());
@@ -198,7 +205,7 @@ public class LambdaExtractionTest {
 	public void testFlatJoinLambda() {
 		FlatJoinFunction<Tuple2<Tuple1<Integer>, Boolean>, Tuple2<Tuple1<Integer>, Double>, Tuple2<Tuple1<Integer>, String>> f = (i1, i2, o) -> {};
 
-		TypeInformation<?> ti = TypeExtractor.getFlatJoinReturnTypes(f, TypeInfoParser.parse("Tuple2<Tuple1<Integer>, Boolean>"), TypeInfoParser.parse("Tuple2<Tuple1<Integer>, Double>"));
+		TypeInformation<?> ti = TypeExtractor.getFlatJoinReturnTypes(f, NESTED_TUPLE_BOOLEAN_TYPE, NESTED_TUPLE_DOUBLE_TYPE);
 		if (!(ti instanceof MissingTypeInfo)) {
 			Assert.assertTrue(ti.isTupleType());
 			Assert.assertEquals(2, ti.getArity());
@@ -211,7 +218,7 @@ public class LambdaExtractionTest {
 	public void testJoinLambda() {
 		JoinFunction<Tuple2<Tuple1<Integer>, Boolean>, Tuple2<Tuple1<Integer>, Double>, Tuple2<Tuple1<Integer>, String>> f = (i1, i2) -> null;
 
-		TypeInformation<?> ti = TypeExtractor.getJoinReturnTypes(f, TypeInfoParser.parse("Tuple2<Tuple1<Integer>, Boolean>"), TypeInfoParser.parse("Tuple2<Tuple1<Integer>, Double>"));
+		TypeInformation<?> ti = TypeExtractor.getJoinReturnTypes(f, NESTED_TUPLE_BOOLEAN_TYPE, NESTED_TUPLE_DOUBLE_TYPE);
 		if (!(ti instanceof MissingTypeInfo)) {
 			Assert.assertTrue(ti.isTupleType());
 			Assert.assertEquals(2, ti.getArity());
@@ -224,7 +231,7 @@ public class LambdaExtractionTest {
 	public void testCoGroupLambda() {
 		CoGroupFunction<Tuple2<Tuple1<Integer>, Boolean>, Tuple2<Tuple1<Integer>, Double>, Tuple2<Tuple1<Integer>, String>> f = (i1, i2, o) -> {};
 
-		TypeInformation<?> ti = TypeExtractor.getCoGroupReturnTypes(f, TypeInfoParser.parse("Tuple2<Tuple1<Integer>, Boolean>"), TypeInfoParser.parse("Tuple2<Tuple1<Integer>, Double>"));
+		TypeInformation<?> ti = TypeExtractor.getCoGroupReturnTypes(f, NESTED_TUPLE_BOOLEAN_TYPE, NESTED_TUPLE_DOUBLE_TYPE);
 		if (!(ti instanceof MissingTypeInfo)) {
 			Assert.assertTrue(ti.isTupleType());
 			Assert.assertEquals(2, ti.getArity());
@@ -237,7 +244,7 @@ public class LambdaExtractionTest {
 	public void testCrossLambda() {
 		CrossFunction<Tuple2<Tuple1<Integer>, Boolean>, Tuple2<Tuple1<Integer>, Double>, Tuple2<Tuple1<Integer>, String>> f = (i1, i2) -> null;
 
-		TypeInformation<?> ti = TypeExtractor.getCrossReturnTypes(f, TypeInfoParser.parse("Tuple2<Tuple1<Integer>, Boolean>"), TypeInfoParser.parse("Tuple2<Tuple1<Integer>, Double>"));
+		TypeInformation<?> ti = TypeExtractor.getCrossReturnTypes(f, NESTED_TUPLE_BOOLEAN_TYPE, NESTED_TUPLE_DOUBLE_TYPE);
 		if (!(ti instanceof MissingTypeInfo)) {
 			Assert.assertTrue(ti.isTupleType());
 			Assert.assertEquals(2, ti.getArity());
@@ -250,7 +257,7 @@ public class LambdaExtractionTest {
 	public void testKeySelectorLambda() {
 		KeySelector<Tuple2<Tuple1<Integer>, Boolean>, Tuple2<Tuple1<Integer>, String>> f = (i) -> null;
 
-		TypeInformation<?> ti = TypeExtractor.getKeySelectorTypes(f, TypeInfoParser.parse("Tuple2<Tuple1<Integer>, Boolean>"));
+		TypeInformation<?> ti = TypeExtractor.getKeySelectorTypes(f, NESTED_TUPLE_BOOLEAN_TYPE);
 		if (!(ti instanceof MissingTypeInfo)) {
 			Assert.assertTrue(ti.isTupleType());
 			Assert.assertEquals(2, ti.getArity());
@@ -262,8 +269,8 @@ public class LambdaExtractionTest {
 	@SuppressWarnings("rawtypes")
 	@Test
 	public void testLambdaTypeErasure() {
-		MapFunction<Tuple1, Tuple1> f = (i) -> null;
-		TypeInformation<?> ti = TypeExtractor.getMapReturnTypes(f, TypeInfoParser.parse("Tuple1<String>"), null, true);
+		MapFunction<Tuple1<Integer>, Tuple1> f = (i) -> null;
+		TypeInformation<?> ti = TypeExtractor.getMapReturnTypes(f, new TypeHint<Tuple1<Integer>>(){}.getTypeInfo(), null, true);
 		Assert.assertTrue(ti instanceof MissingTypeInfo);
 	}