You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2018/05/17 13:14:01 UTC

[1/4] flink git commit: [FLINK-9299] [docs] Fix errors in ProcessWindowFunction documentation Java examples

Repository: flink
Updated Branches:
  refs/heads/master c6fa05e56 -> 3df780902


[FLINK-9299] [docs] Fix errors in ProcessWindowFunction documentation Java examples

This closes #6001


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/22531a87
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/22531a87
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/22531a87

Branch: refs/heads/master
Commit: 22531a87158a725804734b27e091ff6597686dd6
Parents: 9ddb978
Author: yanghua <ya...@gmail.com>
Authored: Sun May 13 16:27:11 2018 +0800
Committer: Stephan Ewen <se...@apache.org>
Committed: Thu May 17 15:12:50 2018 +0200

----------------------------------------------------------------------
 docs/dev/stream/operators/windows.md | 28 ++++++++++++++--------------
 1 file changed, 14 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/22531a87/docs/dev/stream/operators/windows.md
----------------------------------------------------------------------
diff --git a/docs/dev/stream/operators/windows.md b/docs/dev/stream/operators/windows.md
index 649bfe8..ad2b516 100644
--- a/docs/dev/stream/operators/windows.md
+++ b/docs/dev/stream/operators/windows.md
@@ -504,7 +504,7 @@ private static class AverageAggregate
 
   @Override
   public Double getResult(Tuple2<Long, Long> accumulator) {
-    return accumulator.f0 / accumulator.f1;
+    return ((double) accumulator.f0) / accumulator.f1;
   }
 
   @Override
@@ -730,7 +730,7 @@ input
 
 /* ... */
 
-public class MyProcessWindowFunction implements ProcessWindowFunction<Tuple<String, Long>, String, String, TimeWindow> {
+public class MyProcessWindowFunction extends ProcessWindowFunction<Tuple<String, Long>, String, String, TimeWindow> {
 
   void process(String key, Context context, Iterable<Tuple<String, Long>> input, Collector<String> out) {
     long count = 0;
@@ -778,7 +778,7 @@ The example shows a `ProcessWindowFunction` that counts the elements in a window
 A `ProcessWindowFunction` can be combined with either a `ReduceFunction`, an `AggregateFunction`, or a `FoldFunction` to
 incrementally aggregate elements as they arrive in the window.
 When the window is closed, the `ProcessWindowFunction` will be provided with the aggregated result.
-This allows to incrementally compute windows while having access to the
+This allows it to incrementally compute windows while having access to the
 additional window meta information of the `ProcessWindowFunction`.
 
 <span class="label label-info">Note</span> You can also the legacy `WindowFunction` instead of
@@ -797,7 +797,7 @@ DataStream<SensorReading> input = ...;
 
 input
   .keyBy(<key selector>)
-  .timeWindow(<window assigner>)
+  .timeWindow(<duration>)
   .reduce(new MyReduceFunction(), new MyProcessWindowFunction());
 
 // Function definitions
@@ -810,7 +810,7 @@ private static class MyReduceFunction implements ReduceFunction<SensorReading> {
 }
 
 private static class MyProcessWindowFunction
-    implements ProcessWindowFunction<SensorReading, Tuple2<Long, SensorReading>, String, TimeWindow> {
+    extends ProcessWindowFunction<SensorReading, Tuple2<Long, SensorReading>, String, TimeWindow> {
 
   public void process(String key,
                     Context context,
@@ -830,7 +830,7 @@ val input: DataStream[SensorReading] = ...
 
 input
   .keyBy(<key selector>)
-  .timeWindow(<window assigner>)
+  .timeWindow(<duration>)
   .reduce(
     (r1: SensorReading, r2: SensorReading) => { if (r1.value > r2.value) r2 else r1 },
     ( key: String,
@@ -856,11 +856,11 @@ the average.
 <div class="codetabs" markdown="1">
 <div data-lang="java" markdown="1">
 {% highlight java %}
-DataStream<Tuple2<String, Long> input = ...;
+DataStream<Tuple2<String, Long>> input = ...;
 
 input
   .keyBy(<key selector>)
-  .timeWindow(<window assigner>)
+  .timeWindow(<duration>)
   .aggregate(new AverageAggregate(), new MyProcessWindowFunction());
 
 // Function definitions
@@ -883,7 +883,7 @@ private static class AverageAggregate
 
   @Override
   public Double getResult(Tuple2<Long, Long> accumulator) {
-    return accumulator.f0 / accumulator.f1;
+    return ((double) accumulator.f0) / accumulator.f1;
   }
 
   @Override
@@ -893,7 +893,7 @@ private static class AverageAggregate
 }
 
 private static class MyProcessWindowFunction
-    implements ProcessWindowFunction<Double, Tuple2<String, Double>, String, TimeWindow> {
+    extends ProcessWindowFunction<Double, Tuple2<String, Double>, String, TimeWindow> {
 
   public void process(String key,
                     Context context,
@@ -913,7 +913,7 @@ val input: DataStream[(String, Long)] = ...
 
 input
   .keyBy(<key selector>)
-  .timeWindow(<window assigner>)
+  .timeWindow(<duration>)
   .aggregate(new AverageAggregate(), new MyProcessWindowFunction())
 
 // Function definitions
@@ -959,7 +959,7 @@ DataStream<SensorReading> input = ...;
 
 input
   .keyBy(<key selector>)
-  .timeWindow(<window assigner>)
+  .timeWindow(<duration>)
   .fold(new Tuple3<String, Long, Integer>("",0L, 0), new MyFoldFunction(), new MyProcessWindowFunction())
 
 // Function definitions
@@ -975,7 +975,7 @@ private static class MyFoldFunction
 }
 
 private static class MyProcessWindowFunction
-    implements ProcessWindowFunction<Tuple3<String, Long, Integer>, Tuple3<String, Long, Integer>, String, TimeWindow> {
+    extends ProcessWindowFunction<Tuple3<String, Long, Integer>, Tuple3<String, Long, Integer>, String, TimeWindow> {
 
   public void process(String key,
                     Context context,
@@ -995,7 +995,7 @@ val input: DataStream[SensorReading] = ...
 
 input
  .keyBy(<key selector>)
- .timeWindow(<window assigner>)
+ .timeWindow(<duration>)
  .fold (
     ("", 0L, 0),
     (acc: (String, Long, Int), r: SensorReading) => { ("", 0L, acc._3 + 1) },


[3/4] flink git commit: [FLINK-9292] [core] Remove TypeInfoParser (part 2)

Posted by se...@apache.org.
[FLINK-9292] [core] Remove TypeInfoParser (part 2)

This closes #5970


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/9ddb978b
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/9ddb978b
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/9ddb978b

Branch: refs/heads/master
Commit: 9ddb978b04fb602609675936df3d0c6ff9b8519b
Parents: c6fa05e
Author: yanghua <ya...@gmail.com>
Authored: Wed May 9 17:13:06 2018 +0800
Committer: Stephan Ewen <se...@apache.org>
Committed: Thu May 17 15:12:50 2018 +0200

----------------------------------------------------------------------
 .../connectors/kafka/KafkaProducerTestBase.java |   4 +-
 flink-core/pom.xml                              |   1 +
 .../api/java/typeutils/TypeInfoParser.java      | 417 -------------------
 .../api/java/typeutils/TypeExtractorTest.java   | 105 +++--
 .../api/java/sca/UdfAnalyzerExamplesTest.java   |  80 ++--
 .../flink/api/java/sca/UdfAnalyzerTest.java     | 241 +++++------
 .../graph/asm/dataset/ChecksumHashCodeTest.java |   5 +-
 .../flink/graph/asm/dataset/CollectTest.java    |   5 +-
 .../flink/graph/asm/dataset/CountTest.java      |   5 +-
 9 files changed, 226 insertions(+), 637 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/9ddb978b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTestBase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTestBase.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTestBase.java
index 5023a7e..629497e 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTestBase.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTestBase.java
@@ -24,9 +24,9 @@ import org.apache.flink.api.common.restartstrategy.RestartStrategies;
 import org.apache.flink.api.common.serialization.SerializationSchema;
 import org.apache.flink.api.common.serialization.TypeInformationSerializationSchema;
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeHint;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.typeutils.TypeInfoParser;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.client.JobExecutionException;
 import org.apache.flink.runtime.state.CheckpointListener;
@@ -116,7 +116,7 @@ public abstract class KafkaProducerTestBase extends KafkaTestBase {
 			expectedTopicsToNumPartitions.put(defaultTopic, defaultTopicPartitions);
 			expectedTopicsToNumPartitions.put(dynamicTopic, dynamicTopicPartitions);
 
-			TypeInformation<Tuple2<Long, String>> longStringInfo = TypeInfoParser.parse("Tuple2<Long, String>");
+			TypeInformation<Tuple2<Long, String>> longStringInfo = TypeInformation.of(new TypeHint<Tuple2<Long, String>>(){});
 
 			StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 			env.setRestartStrategy(RestartStrategies.noRestart());

http://git-wip-us.apache.org/repos/asf/flink/blob/9ddb978b/flink-core/pom.xml
----------------------------------------------------------------------
diff --git a/flink-core/pom.xml b/flink-core/pom.xml
index efd7b12..9826be0 100644
--- a/flink-core/pom.xml
+++ b/flink-core/pom.xml
@@ -177,6 +177,7 @@ under the License.
 
 							<!-- leaked constructor in TypeHint -->
 							<exclude>org.apache.flink.api.common.typeinfo.TypeHint</exclude>
+							<exclude>org.apache.flink.api.java.typeutils.TypeInfoParser</exclude>
 						</excludes>
 					</parameter>
 				</configuration>

http://git-wip-us.apache.org/repos/asf/flink/blob/9ddb978b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeInfoParser.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeInfoParser.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeInfoParser.java
deleted file mode 100644
index bb74e70..0000000
--- a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeInfoParser.java
+++ /dev/null
@@ -1,417 +0,0 @@
- /*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.typeutils;
-
- import org.apache.flink.annotation.Public;
- import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo;
- import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
- import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
- import org.apache.flink.api.common.typeinfo.TypeInformation;
- import org.apache.flink.types.Value;
-
- import java.lang.reflect.Field;
- import java.util.ArrayList;
- import java.util.regex.Matcher;
- import java.util.regex.Pattern;
-
-/**
- * @deprecated Use {@link org.apache.flink.api.common.typeinfo.Types} instead.
- */
-@Deprecated
-@Public
-public class TypeInfoParser {
-	private static final String TUPLE_PACKAGE = "org.apache.flink.api.java.tuple";
-	private static final String VALUE_PACKAGE = "org.apache.flink.types";
-	private static final String WRITABLE_PACKAGE = "org.apache.hadoop.io";
-
-	private static final Pattern tuplePattern = Pattern.compile("^(" + TUPLE_PACKAGE.replaceAll("\\.", "\\\\.") + "\\.)?((Tuple[1-9][0-9]?)<|(Tuple0))");
-	private static final Pattern writablePattern = Pattern.compile("^((" + WRITABLE_PACKAGE.replaceAll("\\.", "\\\\.") + "\\.)?Writable)<([^\\s,>]*)(,|>|$|\\[)");
-	private static final Pattern enumPattern = Pattern.compile("^((java\\.lang\\.)?Enum)<([^\\s,>]*)(,|>|$|\\[)");
-	private static final Pattern basicTypePattern = Pattern
-			.compile("^((java\\.lang\\.)?(String|Integer|Byte|Short|Character|Double|Float|Long|Boolean|Void))(,|>|$|\\[)");
-	private static final Pattern basicTypeDatePattern = Pattern.compile("^((java\\.util\\.)?Date)(,|>|$|\\[)");
-	private static final Pattern basicTypeBigIntPattern = Pattern.compile("^((java\\.math\\.)?BigInteger)(,|>|$|\\[)");
-	private static final Pattern basicTypeBigDecPattern = Pattern.compile("^((java\\.math\\.)?BigDecimal)(,|>|$|\\[)");
-	private static final Pattern primitiveTypePattern = Pattern.compile("^(int|byte|short|char|double|float|long|boolean|void)(,|>|$|\\[)");
-	private static final Pattern valueTypePattern = Pattern.compile("^((" + VALUE_PACKAGE.replaceAll("\\.", "\\\\.")
-			+ "\\.)?(String|Int|Byte|Short|Char|Double|Float|Long|Boolean|List|Map|Null))Value(,|>|$|\\[)");
-	private static final Pattern pojoGenericObjectPattern = Pattern.compile("^([^\\s,<>\\[]+)(<)?");
-	private static final Pattern fieldPattern = Pattern.compile("^([^\\s,<>\\[]+)=");
-
-	/**
-	 * Generates an instance of <code>TypeInformation</code> by parsing a type
-	 * information string. A type information string can contain the following
-	 * types:
-	 *
-	 * <ul>
-	 * <li>Basic types such as <code>Integer</code>, <code>String</code>, etc.
-	 * <li>Basic type arrays such as <code>Integer[]</code>,
-	 * <code>String[]</code>, etc.
-	 * <li>Tuple types such as <code>Tuple1&lt;TYPE0&gt;</code>,
-	 * <code>Tuple2&lt;TYPE0, TYPE1&gt;</code>, etc.</li>
-	 * <li>Pojo types such as <code>org.my.MyPojo&lt;myFieldName=TYPE0,myFieldName2=TYPE1&gt;</code>, etc.</li>
-	 * <li>Generic types such as <code>java.lang.Class</code>, etc.
-	 * <li>Custom type arrays such as <code>org.my.CustomClass[]</code>,
-	 * <code>org.my.CustomClass$StaticInnerClass[]</code>, etc.
-	 * <li>Value types such as <code>DoubleValue</code>,
-	 * <code>StringValue</code>, <code>IntegerValue</code>, etc.</li>
-	 * <li>Tuple array types such as <code>Tuple2&lt;TYPE0,TYPE1&gt;[], etc.</code></li>
-	 * <li>Writable types such as <code>Writable&lt;org.my.CustomWritable&gt;</code></li>
-	 * <li>Enum types such as <code>Enum&lt;org.my.CustomEnum&gt;</code></li>
-	 * </ul>
-	 *
-	 * Example:
-	 * <code>"Tuple2&lt;String,Tuple2&lt;Integer,org.my.MyJob$Pojo&lt;word=String&gt;&gt;&gt;"</code>
-	 *
-	 * @param infoString
-	 *            type information string to be parsed
-	 * @return <code>TypeInformation</code> representation of the string
-	 */
-	@SuppressWarnings("unchecked")
-	public static <X> TypeInformation<X> parse(String infoString) {
-		try {
-			if (infoString == null) {
-				throw new IllegalArgumentException("String is null.");
-			}
-			String clearedString = infoString.replaceAll("\\s", "");
-			if (clearedString.length() == 0) {
-				throw new IllegalArgumentException("String must not be empty.");
-			}
-			StringBuilder sb = new StringBuilder(clearedString);
-			TypeInformation<X> ti = (TypeInformation<X>) parse(sb);
-			if (sb.length() > 0) {
-				throw new IllegalArgumentException("String could not be parsed completely.");
-			}
-			return ti;
-		} catch (Exception e) {
-			throw new IllegalArgumentException("String could not be parsed: " + e.getMessage(), e);
-		}
-	}
-
-	@SuppressWarnings({ "rawtypes", "unchecked" })
-	private static TypeInformation<?> parse(StringBuilder sb) throws ClassNotFoundException {
-		String infoString = sb.toString();
-		final Matcher tupleMatcher = tuplePattern.matcher(infoString);
-
-		final Matcher writableMatcher = writablePattern.matcher(infoString);
-		final Matcher enumMatcher = enumPattern.matcher(infoString);
-
-		final Matcher basicTypeMatcher = basicTypePattern.matcher(infoString);
-		final Matcher basicTypeDateMatcher = basicTypeDatePattern.matcher(infoString);
-		final Matcher basicTypeBigIntMatcher = basicTypeBigIntPattern.matcher(infoString);
-		final Matcher basicTypeBigDecMatcher = basicTypeBigDecPattern.matcher(infoString);
-
-		final Matcher primitiveTypeMatcher = primitiveTypePattern.matcher(infoString);
-
-		final Matcher valueTypeMatcher = valueTypePattern.matcher(infoString);
-
-		final Matcher pojoGenericMatcher = pojoGenericObjectPattern.matcher(infoString);
-
-		if (infoString.length() == 0) {
-			return null;
-		}
-
-		TypeInformation<?> returnType = null;
-		boolean isPrimitiveType = false;
-
-		// tuples
-		if (tupleMatcher.find()) {
-			boolean isGenericTuple = true;
-			String className = tupleMatcher.group(3);
-			if(className == null) { // matched Tuple0
-				isGenericTuple = false;
-				className = tupleMatcher.group(2);
-				sb.delete(0, className.length());
-			} else {
-				sb.delete(0, className.length() + 1); // +1 for "<"
-			}
-
-			if (infoString.startsWith(TUPLE_PACKAGE)) {
-				sb.delete(0, TUPLE_PACKAGE.length() + 1); // +1 for trailing "."
-			}
-
-			int arity = Integer.parseInt(className.replaceAll("\\D", ""));
-			Class<?> clazz = loadClass(TUPLE_PACKAGE + "." + className);
-
-			TypeInformation<?>[] types = new TypeInformation<?>[arity];
-			for (int i = 0; i < arity; i++) {
-				types[i] = parse(sb);
-				if (types[i] == null) {
-					throw new IllegalArgumentException("Tuple arity does not match given parameters.");
-				}
-			}
-			if (isGenericTuple) {
-				if(sb.charAt(0) != '>') {
-					throw new IllegalArgumentException("Tuple arity does not match given parameters.");
-				}
-				// remove '>'
-				sb.deleteCharAt(0);
-			}
-			returnType = new TupleTypeInfo(clazz, types);
-		}
-		// writable types
-		else if (writableMatcher.find()) {
-			String className = writableMatcher.group(1);
-			String fullyQualifiedName = writableMatcher.group(3);
-			sb.delete(0, className.length() + 1 + fullyQualifiedName.length() + 1);
-			Class<?> clazz = loadClass(fullyQualifiedName);
-			returnType = TypeExtractor.createHadoopWritableTypeInfo(clazz);
-		}
-		// enum types
-		else if (enumMatcher.find()) {
-			String className = enumMatcher.group(1);
-			String fullyQualifiedName = enumMatcher.group(3);
-			sb.delete(0, className.length() + 1 + fullyQualifiedName.length() + 1);
-			Class<?> clazz = loadClass(fullyQualifiedName);
-			returnType = new EnumTypeInfo(clazz);
-		}
-		// basic types
-		else if (basicTypeMatcher.find()) {
-			String className = basicTypeMatcher.group(1);
-			sb.delete(0, className.length());
-			Class<?> clazz;
-			// check if fully qualified
-			if (className.startsWith("java.lang")) {
-				clazz = loadClass(className);
-			} else {
-				clazz = loadClass("java.lang." + className);
-			}
-			returnType = BasicTypeInfo.getInfoFor(clazz);
-		}
-		// special basic type "Date"
-		else if (basicTypeDateMatcher.find()) {
-			String className = basicTypeDateMatcher.group(1);
-			sb.delete(0, className.length());
-			Class<?> clazz;
-			// check if fully qualified
-			if (className.startsWith("java.util")) {
-				clazz = loadClass(className);
-			} else {
-				clazz = loadClass("java.util." + className);
-			}
-			returnType = BasicTypeInfo.getInfoFor(clazz);
-		}
-		// special basic type "BigInteger"
-		else if (basicTypeBigIntMatcher.find()) {
-			String className = basicTypeBigIntMatcher.group(1);
-			sb.delete(0, className.length());
-			Class<?> clazz;
-			// check if fully qualified
-			if (className.startsWith("java.math")) {
-				clazz = loadClass(className);
-			} else {
-				clazz = loadClass("java.math." + className);
-			}
-			returnType = BasicTypeInfo.getInfoFor(clazz);
-		}
-		// special basic type "BigDecimal"
-		else if (basicTypeBigDecMatcher.find()) {
-			String className = basicTypeBigDecMatcher.group(1);
-			sb.delete(0, className.length());
-			Class<?> clazz;
-			// check if fully qualified
-			if (className.startsWith("java.math")) {
-				clazz = loadClass(className);
-			} else {
-				clazz = loadClass("java.math." + className);
-			}
-			returnType = BasicTypeInfo.getInfoFor(clazz);
-		}
-		// primitive types
-		else if (primitiveTypeMatcher.find()) {
-			String keyword = primitiveTypeMatcher.group(1);
-			sb.delete(0, keyword.length());
-
-			Class<?> clazz = null;
-			if (keyword.equals("int")) {
-				clazz = int.class;
-			} else if (keyword.equals("byte")) {
-				clazz = byte.class;
-			} else if (keyword.equals("short")) {
-				clazz = short.class;
-			} else if (keyword.equals("char")) {
-				clazz = char.class;
-			} else if (keyword.equals("double")) {
-				clazz = double.class;
-			} else if (keyword.equals("float")) {
-				clazz = float.class;
-			} else if (keyword.equals("long")) {
-				clazz = long.class;
-			} else if (keyword.equals("boolean")) {
-				clazz = boolean.class;
-			} else if (keyword.equals("void")) {
-				clazz = void.class;
-			}
-			returnType = BasicTypeInfo.getInfoFor(clazz);
-			isPrimitiveType = true;
-		}
-		// values
-		else if (valueTypeMatcher.find()) {
-			String className = valueTypeMatcher.group(1);
-			sb.delete(0, className.length() + 5);
-
-			Class<?> clazz;
-			// check if fully qualified
-			if (className.startsWith(VALUE_PACKAGE)) {
-				clazz = loadClass(className + "Value");
-			} else {
-				clazz = loadClass(VALUE_PACKAGE + "." + className + "Value");
-			}
-			returnType = ValueTypeInfo.getValueTypeInfo((Class<Value>) clazz);
-		}
-		// pojo objects or generic types
-		else if (pojoGenericMatcher.find()) {
-			String fullyQualifiedName = pojoGenericMatcher.group(1);
-			sb.delete(0, fullyQualifiedName.length());
-
-			boolean isPojo = pojoGenericMatcher.group(2) != null;
-
-			// pojo
-			if (isPojo) {
-				sb.deleteCharAt(0);
-				Class<?> clazz = loadClass(fullyQualifiedName);
-
-				ArrayList<PojoField> fields = new ArrayList<PojoField>();
-				while (sb.charAt(0) != '>') {
-					final Matcher fieldMatcher = fieldPattern.matcher(sb);
-					if (!fieldMatcher.find()) {
-						throw new IllegalArgumentException("Field name missing.");
-					}
-					String fieldName = fieldMatcher.group(1);
-					sb.delete(0, fieldName.length() + 1);
-
-					Field field = TypeExtractor.getDeclaredField(clazz, fieldName);
-					if (field == null) {
-						throw new IllegalArgumentException("Field '" + fieldName + "'could not be accessed.");
-					}
-					fields.add(new PojoField(field, parse(sb)));
-				}
-				sb.deleteCharAt(0); // remove '>'
-				returnType = new PojoTypeInfo(clazz, fields);
-			}
-			// generic type
-			else {
-				returnType = new GenericTypeInfo(loadClass(fullyQualifiedName));
-			}
-		}
-
-		if (returnType == null) {
-			throw new IllegalArgumentException("Error at '" + infoString + "'");
-		}
-
-		// arrays
-		int arrayDimensionCount = 0;
-		while (sb.length() > 1 && sb.charAt(0) == '[' && sb.charAt(1) == ']') {
-			arrayDimensionCount++;
-			sb.delete(0, 2);
-		}
-
-		if (sb.length() > 0 && sb.charAt(0) == '[') {
-			throw new IllegalArgumentException("Closing square bracket missing.");
-		}
-		
-		// construct multidimension array
-		if (arrayDimensionCount > 0) {
-			TypeInformation<?> arrayInfo = null;
-			
-			// first dimension
-			// primitive array
-			if (isPrimitiveType) {
-				if (returnType == BasicTypeInfo.INT_TYPE_INFO) {
-					arrayInfo = PrimitiveArrayTypeInfo.INT_PRIMITIVE_ARRAY_TYPE_INFO;
-				} else if (returnType == BasicTypeInfo.BYTE_TYPE_INFO) {
-					arrayInfo = PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO;
-				} else if (returnType == BasicTypeInfo.SHORT_TYPE_INFO) {
-					arrayInfo = PrimitiveArrayTypeInfo.SHORT_PRIMITIVE_ARRAY_TYPE_INFO;
-				} else if (returnType == BasicTypeInfo.CHAR_TYPE_INFO) {
-					arrayInfo = PrimitiveArrayTypeInfo.CHAR_PRIMITIVE_ARRAY_TYPE_INFO;
-				} else if (returnType == BasicTypeInfo.DOUBLE_TYPE_INFO) {
-					arrayInfo = PrimitiveArrayTypeInfo.DOUBLE_PRIMITIVE_ARRAY_TYPE_INFO;
-				} else if (returnType == BasicTypeInfo.FLOAT_TYPE_INFO) {
-					arrayInfo = PrimitiveArrayTypeInfo.FLOAT_PRIMITIVE_ARRAY_TYPE_INFO;
-				} else if (returnType == BasicTypeInfo.LONG_TYPE_INFO) {
-					arrayInfo = PrimitiveArrayTypeInfo.LONG_PRIMITIVE_ARRAY_TYPE_INFO;
-				} else if (returnType == BasicTypeInfo.BOOLEAN_TYPE_INFO) {
-					arrayInfo = PrimitiveArrayTypeInfo.BOOLEAN_PRIMITIVE_ARRAY_TYPE_INFO;
-				} else if (returnType == BasicTypeInfo.VOID_TYPE_INFO) {
-					throw new IllegalArgumentException("Can not create an array of void.");
-				}
-			}
-			// basic array
-			else if (returnType instanceof BasicTypeInfo
-					&& returnType != BasicTypeInfo.DATE_TYPE_INFO) {
-				if (returnType == BasicTypeInfo.INT_TYPE_INFO) {
-					arrayInfo = BasicArrayTypeInfo.INT_ARRAY_TYPE_INFO;
-				} else if (returnType == BasicTypeInfo.BYTE_TYPE_INFO) {
-					arrayInfo = BasicArrayTypeInfo.BYTE_ARRAY_TYPE_INFO;
-				} else if (returnType == BasicTypeInfo.SHORT_TYPE_INFO) {
-					arrayInfo = BasicArrayTypeInfo.SHORT_ARRAY_TYPE_INFO;
-				} else if (returnType == BasicTypeInfo.CHAR_TYPE_INFO) {
-					arrayInfo = BasicArrayTypeInfo.CHAR_ARRAY_TYPE_INFO;
-				} else if (returnType == BasicTypeInfo.DOUBLE_TYPE_INFO) {
-					arrayInfo = BasicArrayTypeInfo.DOUBLE_ARRAY_TYPE_INFO;
-				} else if (returnType == BasicTypeInfo.FLOAT_TYPE_INFO) {
-					arrayInfo = BasicArrayTypeInfo.FLOAT_ARRAY_TYPE_INFO;
-				} else if (returnType == BasicTypeInfo.LONG_TYPE_INFO) {
-					arrayInfo = BasicArrayTypeInfo.LONG_ARRAY_TYPE_INFO;
-				} else if (returnType == BasicTypeInfo.BOOLEAN_TYPE_INFO) {
-					arrayInfo = BasicArrayTypeInfo.BOOLEAN_ARRAY_TYPE_INFO;
-				} else if (returnType == BasicTypeInfo.STRING_TYPE_INFO) {
-					arrayInfo = BasicArrayTypeInfo.STRING_ARRAY_TYPE_INFO;
-				} else if (returnType == BasicTypeInfo.VOID_TYPE_INFO) {
-					throw new IllegalArgumentException("Can not create an array of void.");
-				}
-			}
-			// object array
-			else {
-				arrayInfo = ObjectArrayTypeInfo.getInfoFor(loadClass("[L" + returnType.getTypeClass().getName() + ";"),
-						returnType);
-			}
-
-			// further dimensions
-			if (arrayDimensionCount > 1) {
-				String arrayPrefix = "[";
-				for (int i = 1; i < arrayDimensionCount; i++) {
-					arrayPrefix += "[";
-					arrayInfo =  ObjectArrayTypeInfo.getInfoFor(loadClass(arrayPrefix + "L" +
-							returnType.getTypeClass().getName() + ";"), arrayInfo);
-				}
-			}
-			returnType = arrayInfo;
-		}
-
-		// remove possible ','
-		if (sb.length() > 0 && sb.charAt(0) == ',') {
-			sb.deleteCharAt(0);
-		}
-
-		// check if end 
-		return returnType;
-	}
-
-	private static Class<?> loadClass(String fullyQualifiedName) {
-		try {
-			return Class.forName(fullyQualifiedName);
-		} catch (ClassNotFoundException e) {
-			throw new IllegalArgumentException("Class '" + fullyQualifiedName
-					+ "' could not be found. Please note that inner classes must be declared static.");
-		}
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/9ddb978b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/TypeExtractorTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/TypeExtractorTest.java b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/TypeExtractorTest.java
index 804cf88..b763f54 100644
--- a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/TypeExtractorTest.java
+++ b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/TypeExtractorTest.java
@@ -46,6 +46,7 @@ import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
 import org.apache.flink.api.common.typeinfo.SqlTimeTypeInfo;
 import org.apache.flink.api.common.typeinfo.TypeHint;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeinfo.Types;
 import org.apache.flink.api.common.typeutils.CompositeType.FlatFieldDescriptor;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.functions.KeySelector;
@@ -84,7 +85,7 @@ public class TypeExtractorTest {
 			}
 		};
 
-		TypeInformation<?> ti = TypeExtractor.getGroupReduceReturnTypes(function, (TypeInformation) TypeInfoParser.parse("Boolean"));
+		TypeInformation<?> ti = TypeExtractor.getGroupReduceReturnTypes(function, (TypeInformation) Types.BOOLEAN);
 
 		Assert.assertTrue(ti.isBasicType());
 		Assert.assertEquals(BasicTypeInfo.BOOLEAN_TYPE_INFO, ti);
@@ -113,7 +114,7 @@ public class TypeExtractorTest {
 
 		};
 
-		TypeInformation<?> ti = TypeExtractor.getMapReturnTypes(function, (TypeInformation) TypeInfoParser.parse("Tuple9<Integer, Long, Double, Float, Boolean, String, Character, Short, Byte>"));
+		TypeInformation<?> ti = TypeExtractor.getMapReturnTypes(function, (TypeInformation) TypeInformation.of(new TypeHint<Tuple9<Integer, Long, Double, Float, Boolean, String, Character, Short, Byte>>(){}));
 
 		Assert.assertTrue(ti.isTupleType());
 		Assert.assertEquals(9, ti.getArity());
@@ -180,7 +181,7 @@ public class TypeExtractorTest {
 			}
 		};
 
-		TypeInformation<?> ti = TypeExtractor.getFlatMapReturnTypes(function, (TypeInformation) TypeInfoParser.parse("Tuple3<Tuple1<String>, Tuple1<Integer>, Tuple2<Long, Long>>"));
+		TypeInformation<?> ti = TypeExtractor.getFlatMapReturnTypes(function, (TypeInformation) TypeInformation.of(new TypeHint<Tuple3<Tuple1<String>, Tuple1<Integer>, Tuple2<Long, Long>>>(){}));
 		Assert.assertTrue(ti.isTupleType());
 		Assert.assertEquals(3, ti.getArity());
 		Assert.assertTrue(ti instanceof TupleTypeInfo);
@@ -249,7 +250,7 @@ public class TypeExtractorTest {
 		};
 
 		TypeInformation<?> ti = TypeExtractor.getFlatMapReturnTypes(function,
-				(TypeInformation) TypeInfoParser.parse("Tuple0"));
+				(TypeInformation) TypeInformation.of(new TypeHint<Tuple0>(){}));
 
 		Assert.assertTrue(ti.isTupleType());
 		Assert.assertEquals(0, ti.getArity());
@@ -269,7 +270,7 @@ public class TypeExtractorTest {
 			}			
 		};
 
-		TypeInformation<?> ti = TypeExtractor.getFlatJoinReturnTypes(function, (TypeInformation) TypeInfoParser.parse("Tuple2<String, Integer>"), (TypeInformation) TypeInfoParser.parse("String"));
+		TypeInformation<?> ti = TypeExtractor.getFlatJoinReturnTypes(function, (TypeInformation) TypeInformation.of(new TypeHint<Tuple2<String, Integer>>(){}), (TypeInformation) Types.STRING);
 
 		Assert.assertTrue(ti.isTupleType());
 		Assert.assertEquals(2, ti.getArity());
@@ -323,8 +324,8 @@ public class TypeExtractorTest {
 		};
 
 		TypeInformation<?> ti = TypeExtractor.getCrossReturnTypes(function, 
-				(TypeInformation) TypeInfoParser.parse("org.apache.flink.api.java.typeutils.TypeExtractorTest$CustomType"),
-				(TypeInformation) TypeInfoParser.parse("Integer"));
+				(TypeInformation) TypeInformation.of(new TypeHint<CustomType>(){}),
+				(TypeInformation) Types.INT);
 
 		Assert.assertFalse(ti.isBasicType());
 		Assert.assertFalse(ti.isTupleType());
@@ -394,7 +395,7 @@ public class TypeExtractorTest {
 		};
 
 		TypeInformation<?> ti = TypeExtractor.getMapReturnTypes(function, 
-				(TypeInformation) TypeInfoParser.parse("Tuple2<Long,org.apache.flink.api.java.typeutils.TypeExtractorTest$CustomType>"));
+				(TypeInformation) TypeInformation.of(new TypeHint<Tuple2<Long, CustomType>>(){}));
 
 		Assert.assertTrue(ti.isTupleType());
 		Assert.assertEquals(2, ti.getArity());
@@ -451,7 +452,7 @@ public class TypeExtractorTest {
 			}
 		};
 
-		TypeInformation<?> ti = TypeExtractor.getKeySelectorTypes(function, (TypeInformation) TypeInfoParser.parse("StringValue"));
+		TypeInformation<?> ti = TypeExtractor.getKeySelectorTypes(function, (TypeInformation) TypeInformation.of(new TypeHint<StringValue>(){}));
 
 		Assert.assertFalse(ti.isBasicType());
 		Assert.assertFalse(ti.isTupleType());
@@ -481,7 +482,7 @@ public class TypeExtractorTest {
 			}
 		};
 
-		TypeInformation<?> ti = TypeExtractor.getMapReturnTypes(function, (TypeInformation) TypeInfoParser.parse("Tuple2<StringValue, IntValue>"));
+		TypeInformation<?> ti = TypeExtractor.getMapReturnTypes(function, (TypeInformation) TypeInformation.of(new TypeHint<Tuple2<StringValue, IntValue>>(){}));
 
 		Assert.assertFalse(ti.isBasicType());
 		Assert.assertTrue(ti.isTupleType());
@@ -520,7 +521,7 @@ public class TypeExtractorTest {
 			}
 		};
 
-		TypeInformation<?> ti = TypeExtractor.getMapReturnTypes(function, (TypeInformation) TypeInfoParser.parse("Tuple2<Long, String>"));
+		TypeInformation<?> ti = TypeExtractor.getMapReturnTypes(function, (TypeInformation) TypeInformation.of(new TypeHint<Tuple2<Long, String>>(){}));
 
 		Assert.assertTrue(ti.isTupleType());
 		Assert.assertEquals(2, ti.getArity());
@@ -563,7 +564,7 @@ public class TypeExtractorTest {
 			}			
 		};
 
-		TypeInformation<?> ti = TypeExtractor.getMapReturnTypes(function, (TypeInformation) TypeInfoParser.parse("Tuple3<String, Long, Integer>"));
+		TypeInformation<?> ti = TypeExtractor.getMapReturnTypes(function, (TypeInformation) TypeInformation.of(new TypeHint<Tuple3<String, Long, Integer>>(){}));
 
 		Assert.assertTrue(ti.isTupleType());
 		Assert.assertEquals(3, ti.getArity());
@@ -605,7 +606,7 @@ public class TypeExtractorTest {
 			}			
 		};
 
-		TypeInformation<?> ti = TypeExtractor.getMapReturnTypes(function, (TypeInformation) TypeInfoParser.parse("Tuple3<String, Long, String>"));
+		TypeInformation<?> ti = TypeExtractor.getMapReturnTypes(function, (TypeInformation) TypeInformation.of(new TypeHint<Tuple3<String, Long, String>>(){}));
 
 		Assert.assertTrue(ti.isTupleType());
 		Assert.assertEquals(3, ti.getArity());
@@ -631,7 +632,7 @@ public class TypeExtractorTest {
 			}			
 		};
 
-		TypeInformation<?> ti = TypeExtractor.getMapReturnTypes(function, (TypeInformation) TypeInfoParser.parse("Tuple3<String, Long, String>"));
+		TypeInformation<?> ti = TypeExtractor.getMapReturnTypes(function, (TypeInformation) TypeInformation.of(new TypeHint<Tuple3<String, Long, String>>(){}));
 
 		Assert.assertTrue(ti.isTupleType());
 		Assert.assertEquals(3, ti.getArity());
@@ -656,11 +657,11 @@ public class TypeExtractorTest {
 			}
 		};
 
-		TypeInformation<?> ti = TypeExtractor.getMapReturnTypes(function, (TypeInformation) TypeInfoParser.parse("String"), "name", true);
+		TypeInformation<?> ti = TypeExtractor.getMapReturnTypes(function, (TypeInformation) Types.STRING, "name", true);
 		Assert.assertTrue(ti instanceof MissingTypeInfo);
 		
 		try {
-			TypeExtractor.getMapReturnTypes(function, (TypeInformation) TypeInfoParser.parse("String"));
+			TypeExtractor.getMapReturnTypes(function, (TypeInformation) Types.STRING);
 			Assert.fail("Expected an exception");
 		}
 		catch (InvalidTypesException e) {
@@ -680,11 +681,11 @@ public class TypeExtractorTest {
 			}
 		};
 
-		TypeInformation<?> ti = TypeExtractor.getMapReturnTypes(function, (TypeInformation) TypeInfoParser.parse("String"), "name", true);
+		TypeInformation<?> ti = TypeExtractor.getMapReturnTypes(function, (TypeInformation) Types.STRING, "name", true);
 		Assert.assertTrue(ti instanceof MissingTypeInfo);
 		
 		try {
-			TypeExtractor.getMapReturnTypes(function, (TypeInformation) TypeInfoParser.parse("String"));
+			TypeExtractor.getMapReturnTypes(function, (TypeInformation) Types.STRING);
 			Assert.fail("Expected an exception");
 		}
 		catch (InvalidTypesException e) {
@@ -712,7 +713,7 @@ public class TypeExtractorTest {
 			}
 		};
 
-		TypeInformation<?> ti = TypeExtractor.getMapReturnTypes(function, (TypeInformation) TypeInfoParser.parse("Tuple2<String, String>"));
+		TypeInformation<?> ti = TypeExtractor.getMapReturnTypes(function, (TypeInformation) TypeInformation.of(new TypeHint<Tuple2<String, String>>(){}));
 
 		Assert.assertTrue(ti.isTupleType());
 		Assert.assertEquals(2, ti.getArity());
@@ -744,7 +745,7 @@ public class TypeExtractorTest {
 			}
 		};
 
-		TypeInformation<?> ti = TypeExtractor.getMapReturnTypes(function, (TypeInformation) TypeInfoParser.parse("Tuple2<String, Tuple2<Integer, Integer>>"));
+		TypeInformation<?> ti = TypeExtractor.getMapReturnTypes(function, (TypeInformation) TypeInformation.of(new TypeHint<Tuple2<String, Tuple2<Integer, Integer>>>(){}));
 
 		Assert.assertTrue(ti.isTupleType());
 		Assert.assertEquals(2, ti.getArity());
@@ -783,7 +784,7 @@ public class TypeExtractorTest {
 			}
 		};
 		
-		TypeInformation<?> ti = TypeExtractor.getMapReturnTypes(function, (TypeInformation) TypeInfoParser.parse("Tuple2<Boolean, Tuple2<Tuple2<Integer, Tuple2<Boolean, Boolean>>, Tuple2<Integer, Tuple2<Boolean, Boolean>>>>"));
+		TypeInformation<?> ti = TypeExtractor.getMapReturnTypes(function, (TypeInformation) TypeInformation.of(new TypeHint<Tuple2<Boolean, Tuple2<Tuple2<Integer, Tuple2<Boolean, Boolean>>, Tuple2<Integer, Tuple2<Boolean, Boolean>>>>>(){}));
 
 		// Should be 
 		// Tuple2<Boolean, Tuple2<Tuple2<Integer, Tuple2<Boolean, Boolean>>, Tuple2<Integer, Tuple2<Boolean, Boolean>>>>
@@ -823,11 +824,11 @@ public class TypeExtractorTest {
 			}
 		};
 
-		TypeInformation<?> ti = TypeExtractor.getMapReturnTypes(function, TypeInfoParser.parse("String"), "name", true);
+		TypeInformation<?> ti = TypeExtractor.getMapReturnTypes(function, Types.STRING, "name", true);
 		Assert.assertTrue(ti instanceof MissingTypeInfo);
 		
 		try {
-			TypeExtractor.getMapReturnTypes(function, TypeInfoParser.parse("String"));
+			TypeExtractor.getMapReturnTypes(function, Types.STRING);
 			Assert.fail("Expected an exception");
 		}
 		catch (InvalidTypesException e) {
@@ -842,7 +843,7 @@ public class TypeExtractorTest {
 			private static final long serialVersionUID = 1L;
 		};
 
-		TypeInformation<?> ti = TypeExtractor.getMapReturnTypes(function, (TypeInformation) TypeInfoParser.parse("Boolean"));
+		TypeInformation<?> ti = TypeExtractor.getMapReturnTypes(function, (TypeInformation) Types.BOOLEAN);
 
 		Assert.assertTrue(ti.isBasicType());
 		Assert.assertEquals(BasicTypeInfo.BOOLEAN_TYPE_INFO, ti);
@@ -906,7 +907,7 @@ public class TypeExtractorTest {
 	public void testFunctionDependingOnInputWithCustomTupleInput() {
 		IdentityMapper<SameTypeVariable<String>> function = new IdentityMapper<SameTypeVariable<String>>();
 
-		TypeInformation<?> ti = TypeExtractor.getMapReturnTypes(function, (TypeInformation) TypeInfoParser.parse("Tuple2<String, String>"));
+		TypeInformation<?> ti = TypeExtractor.getMapReturnTypes(function, (TypeInformation) TypeInformation.of(new TypeHint<Tuple2<String, String>>(){}));
 
 		Assert.assertTrue(ti.isTupleType());
 		Assert.assertEquals(2, ti.getArity());
@@ -994,7 +995,7 @@ public class TypeExtractorTest {
 	public void testFunctionWithNoGenericSuperclass() {
 		RichMapFunction<?, ?> function = new Mapper2();
 
-		TypeInformation<?> ti = TypeExtractor.getMapReturnTypes(function, (TypeInformation) TypeInfoParser.parse("String"));
+		TypeInformation<?> ti = TypeExtractor.getMapReturnTypes(function, (TypeInformation) Types.STRING);
 
 		Assert.assertTrue(ti.isBasicType());
 		Assert.assertEquals(BasicTypeInfo.STRING_TYPE_INFO, ti);
@@ -1015,7 +1016,7 @@ public class TypeExtractorTest {
 			private static final long serialVersionUID = 1L;
 		};
 
-		TypeInformation<?> ti = TypeExtractor.getMapReturnTypes(function, (TypeInformation) TypeInfoParser.parse("DoubleValue"));
+		TypeInformation<?> ti = TypeExtractor.getMapReturnTypes(function, (TypeInformation) TypeInformation.of(new TypeHint<DoubleValue>(){}));
 
 		Assert.assertTrue(ti.isTupleType());
 		Assert.assertEquals(2, ti.getArity());
@@ -1156,11 +1157,11 @@ public class TypeExtractorTest {
 			}
 		};
 
-		TypeInformation<?> ti =TypeExtractor.getMapReturnTypes(function, (TypeInformation)TypeInfoParser.parse("StringValue"), "name", true);
+		TypeInformation<?> ti =TypeExtractor.getMapReturnTypes(function, (TypeInformation)TypeInformation.of(new TypeHint<StringValue>(){}), "name", true);
 		Assert.assertTrue(ti instanceof MissingTypeInfo);
 		
 		try {
-			TypeExtractor.getMapReturnTypes(function, (TypeInformation)TypeInfoParser.parse("StringValue"));
+			TypeExtractor.getMapReturnTypes(function, (TypeInformation)TypeInformation.of(new TypeHint<StringValue>(){}));
 			Assert.fail("Expected an exception");
 		}
 		catch (InvalidTypesException e) {
@@ -1181,7 +1182,7 @@ public class TypeExtractorTest {
 			}
 		};
 
-		TypeInformation<?> ti = TypeExtractor.getCoGroupReturnTypes(function, (TypeInformation) TypeInfoParser.parse("String[]"), (TypeInformation) TypeInfoParser.parse("String[]"));
+		TypeInformation<?> ti = TypeExtractor.getCoGroupReturnTypes(function, (TypeInformation) TypeInformation.of(new TypeHint<String[]>(){}), (TypeInformation) TypeInformation.of(new TypeHint<String[]>(){}));
 
 		Assert.assertFalse(ti.isBasicType());
 		Assert.assertFalse(ti.isTupleType());
@@ -1224,7 +1225,7 @@ public class TypeExtractorTest {
 		};
 
 		TypeInformation<?> ti = TypeExtractor.getMapReturnTypes(function,
-				(TypeInformation) TypeInfoParser.parse("org.apache.flink.api.java.typeutils.TypeExtractorTest$CustomArrayObject[]"));
+				(TypeInformation) TypeInformation.of(new TypeHint<CustomArrayObject[]>(){}));
 
 		Assert.assertTrue(ti instanceof ObjectArrayTypeInfo<?, ?>);
 		Assert.assertEquals(CustomArrayObject.class, ((ObjectArrayTypeInfo<?, ?>) ti).getComponentInfo().getTypeClass());
@@ -1242,7 +1243,7 @@ public class TypeExtractorTest {
 			}
 		};
 		
-		TypeInformation<?> ti = TypeExtractor.getMapReturnTypes(function, (TypeInformation) TypeInfoParser.parse("Tuple2<String, String>[]"));
+		TypeInformation<?> ti = TypeExtractor.getMapReturnTypes(function, (TypeInformation) TypeInformation.of(new TypeHint<Tuple2<String, String>[]>(){}));
 
 		Assert.assertTrue(ti instanceof ObjectArrayTypeInfo<?, ?>);
 		ObjectArrayTypeInfo<?, ?> oati = (ObjectArrayTypeInfo<?, ?>) ti;
@@ -1262,7 +1263,7 @@ public class TypeExtractorTest {
 	public void testCustomArrayWithTypeVariable() {
 		RichMapFunction<CustomArrayObject2<Boolean>[], ?> function = new IdentityMapper<CustomArrayObject2<Boolean>[]>();
 
-		TypeInformation<?> ti = TypeExtractor.getMapReturnTypes(function, (TypeInformation) TypeInfoParser.parse("Tuple1<Boolean>[]"));
+		TypeInformation<?> ti = TypeExtractor.getMapReturnTypes(function, (TypeInformation) TypeInformation.of(new TypeHint<Tuple1<Boolean>[]>(){}));
 
 		Assert.assertTrue(ti instanceof ObjectArrayTypeInfo<?, ?>);
 		ObjectArrayTypeInfo<?, ?> oati = (ObjectArrayTypeInfo<?, ?>) ti;
@@ -1287,7 +1288,7 @@ public class TypeExtractorTest {
 			private static final long serialVersionUID = 1L;			
 		};
 		
-		TypeInformation<?> ti = TypeExtractor.getMapReturnTypes(function, (TypeInformation) TypeInfoParser.parse("Boolean[]"));
+		TypeInformation<?> ti = TypeExtractor.getMapReturnTypes(function, (TypeInformation) TypeInformation.of(new TypeHint<Boolean[]>(){}));
 		Assert.assertTrue(ti instanceof ObjectArrayTypeInfo<?,?>);
 		ObjectArrayTypeInfo<?, ?> oati = (ObjectArrayTypeInfo<?, ?>) ti;
 		Assert.assertEquals(BasicTypeInfo.BOOLEAN_TYPE_INFO, oati.getComponentInfo());
@@ -1346,14 +1347,14 @@ public class TypeExtractorTest {
 		};
 		
 		try {
-			TypeExtractor.getMapReturnTypes(function, (TypeInformation) TypeInfoParser.parse("Tuple2<Integer, String>"));
+			TypeExtractor.getMapReturnTypes(function, (TypeInformation) TypeInformation.of(new TypeHint<Tuple2<Integer, String>>(){}));
 			Assert.fail("exception expected");
 		} catch (InvalidTypesException e) {
 			// right
 		}
 		
 		try {
-			TypeExtractor.getMapReturnTypes(function, (TypeInformation) TypeInfoParser.parse("Tuple3<String, String, String>"));
+			TypeExtractor.getMapReturnTypes(function, (TypeInformation) TypeInformation.of(new TypeHint<Tuple3<String, String, String>>(){}));
 			Assert.fail("exception expected");
 		} catch (InvalidTypesException e) {
 			// right
@@ -1369,7 +1370,7 @@ public class TypeExtractorTest {
 		};
 		
 		try {
-			TypeExtractor.getMapReturnTypes(function2, (TypeInformation) TypeInfoParser.parse("IntValue"));
+			TypeExtractor.getMapReturnTypes(function2, (TypeInformation) TypeInformation.of(new TypeHint<IntValue>(){}));
 			Assert.fail("exception expected");
 		} catch (InvalidTypesException e) {
 			// right
@@ -1385,7 +1386,7 @@ public class TypeExtractorTest {
 		};
 		
 		try {
-			TypeExtractor.getMapReturnTypes(function3, (TypeInformation) TypeInfoParser.parse("Integer[]"));
+			TypeExtractor.getMapReturnTypes(function3, (TypeInformation) TypeInformation.of(new TypeHint<Integer[]>(){}));
 			Assert.fail("exception expected");
 		} catch (InvalidTypesException e) {
 			// right
@@ -1405,12 +1406,12 @@ public class TypeExtractorTest {
 	@Test
 	public void testTypeErasure() {
 		TypeInformation<?> ti = TypeExtractor.getFlatMapReturnTypes(new DummyFlatMapFunction<String, Integer, String, Boolean>(), 
-					(TypeInformation) TypeInfoParser.parse("Tuple2<String, Integer>"), "name", true);
+					(TypeInformation) TypeInformation.of(new TypeHint<Tuple2<String, Integer>>(){}), "name", true);
 		Assert.assertTrue(ti instanceof MissingTypeInfo);
 		
 		try {
 			TypeExtractor.getFlatMapReturnTypes(new DummyFlatMapFunction<String, Integer, String, Boolean>(), 
-					(TypeInformation) TypeInfoParser.parse("Tuple2<String, Integer>"));
+					(TypeInformation) TypeInformation.of(new TypeHint<Tuple2<String, Integer>>(){}));
 			
 			Assert.fail("Expected an exception");
 		}
@@ -1571,7 +1572,7 @@ public class TypeExtractorTest {
 	@SuppressWarnings({ "unchecked", "rawtypes" })
 	@Test
 	public void testDuplicateValue() {
-		TypeInformation<?> ti = TypeExtractor.getMapReturnTypes((MapFunction) new DuplicateValue<String>(), TypeInfoParser.parse("Tuple1<String>"));
+		TypeInformation<?> ti = TypeExtractor.getMapReturnTypes((MapFunction) new DuplicateValue<String>(), TypeInformation.of(new TypeHint<Tuple1<String>>(){}));
 		Assert.assertTrue(ti.isTupleType());
 		Assert.assertEquals(2, ti.getArity());
 		TupleTypeInfo<?> tti = (TupleTypeInfo<?>) ti;
@@ -1591,7 +1592,7 @@ public class TypeExtractorTest {
 	@SuppressWarnings({ "unchecked", "rawtypes" })
 	@Test
 	public void testDuplicateValueNested() {
-		TypeInformation<?> ti = TypeExtractor.getMapReturnTypes((MapFunction) new DuplicateValueNested<String>(), TypeInfoParser.parse("Tuple1<Tuple1<String>>"));
+		TypeInformation<?> ti = TypeExtractor.getMapReturnTypes((MapFunction) new DuplicateValueNested<String>(), TypeInformation.of(new TypeHint<Tuple1<Tuple1<String>>>(){}));
 		Assert.assertTrue(ti.isTupleType());
 		Assert.assertEquals(2, ti.getArity());
 		TupleTypeInfo<?> tti = (TupleTypeInfo<?>) ti;
@@ -1618,7 +1619,7 @@ public class TypeExtractorTest {
 	@Test
 	public void testInputInference1() {
 		EdgeMapper<String, Double> em = new EdgeMapper<String, Double>();
-		TypeInformation<?> ti = TypeExtractor.getMapReturnTypes((MapFunction) em, TypeInfoParser.parse("Tuple3<String, String, Double>"));
+		TypeInformation<?> ti = TypeExtractor.getMapReturnTypes((MapFunction) em, TypeInformation.of(new TypeHint<Tuple3<String, String, Double>>(){}));
 		Assert.assertTrue(ti.isTupleType());
 		Assert.assertEquals(3, ti.getArity());
 		TupleTypeInfo<?> tti = (TupleTypeInfo<?>) ti;
@@ -1641,7 +1642,7 @@ public class TypeExtractorTest {
 	@Test
 	public void testInputInference2() {
 		EdgeMapper2<Boolean> em = new EdgeMapper2<Boolean>();
-		TypeInformation<?> ti = TypeExtractor.getMapReturnTypes((MapFunction) em, TypeInfoParser.parse("Boolean"));
+		TypeInformation<?> ti = TypeExtractor.getMapReturnTypes((MapFunction) em, Types.BOOLEAN);
 		Assert.assertTrue(ti.isTupleType());
 		Assert.assertEquals(3, ti.getArity());
 		TupleTypeInfo<?> tti = (TupleTypeInfo<?>) ti;
@@ -1663,7 +1664,7 @@ public class TypeExtractorTest {
 	@Test
 	public void testInputInference3() {
 		EdgeMapper3<Boolean, String> em = new EdgeMapper3<Boolean, String>();
-		TypeInformation<?> ti = TypeExtractor.getMapReturnTypes((MapFunction) em, TypeInfoParser.parse("Tuple3<Boolean,Boolean,String>"));
+		TypeInformation<?> ti = TypeExtractor.getMapReturnTypes((MapFunction) em, TypeInformation.of(new TypeHint<Tuple3<Boolean,Boolean,String>>(){}));
 		Assert.assertTrue(ti.isBasicType());
 		Assert.assertEquals(BasicTypeInfo.STRING_TYPE_INFO, ti);
 	}
@@ -1681,7 +1682,7 @@ public class TypeExtractorTest {
 	@Test
 	public void testInputInference4() {
 		EdgeMapper4<Boolean, String> em = new EdgeMapper4<Boolean, String>();
-		TypeInformation<?> ti = TypeExtractor.getMapReturnTypes((MapFunction) em, TypeInfoParser.parse("Tuple3<Boolean,Boolean,String>[]"));
+		TypeInformation<?> ti = TypeExtractor.getMapReturnTypes((MapFunction) em, TypeInformation.of(new TypeHint<Tuple3<Boolean,Boolean,String>[]>(){}));
 		Assert.assertTrue(ti.isBasicType());
 		Assert.assertEquals(BasicTypeInfo.STRING_TYPE_INFO, ti);
 	}
@@ -1762,7 +1763,7 @@ public class TypeExtractorTest {
 				return null;
 			}
 		};
-		TypeInformation<?> ti = TypeExtractor.getMapReturnTypes((MapFunction)function, TypeInfoParser.parse("Tuple2<Integer, Double>[][]"));
+		TypeInformation<?> ti = TypeExtractor.getMapReturnTypes((MapFunction)function, TypeInformation.of(new TypeHint<Tuple2<Integer, Double>[][]>(){}));
 		Assert.assertEquals("ObjectArrayTypeInfo<ObjectArrayTypeInfo<Java Tuple2<Integer, Double>>>", ti.toString());
 
 		// primitive array
@@ -1775,7 +1776,7 @@ public class TypeExtractorTest {
 				return null;
 			}
 		};
-		ti = TypeExtractor.getMapReturnTypes((MapFunction)function, TypeInfoParser.parse("int[][][]"));
+		ti = TypeExtractor.getMapReturnTypes((MapFunction)function, TypeInformation.of(new TypeHint<int[][][]>(){}));
 		Assert.assertEquals("ObjectArrayTypeInfo<ObjectArrayTypeInfo<int[]>>", ti.toString());
 
 		// basic array
@@ -1788,7 +1789,7 @@ public class TypeExtractorTest {
 				return null;
 			}
 		};
-		ti = TypeExtractor.getMapReturnTypes((MapFunction)function, TypeInfoParser.parse("Integer[][][]"));
+		ti = TypeExtractor.getMapReturnTypes((MapFunction)function, TypeInformation.of(new TypeHint<Integer[][][]>(){}));
 		Assert.assertEquals("ObjectArrayTypeInfo<ObjectArrayTypeInfo<BasicArrayTypeInfo<Integer>>>", ti.toString());
 
 		// pojo array
@@ -1802,16 +1803,14 @@ public class TypeExtractorTest {
 			}
 		};
 		ti = TypeExtractor.getMapReturnTypes((MapFunction)function, 
-				TypeInfoParser.parse("org.apache.flink.api.java.typeutils.TypeExtractorTest$CustomType<"
-					+ "myField1=String,myField2=int"
-					+ ">[][][]"));
+				TypeInformation.of(new TypeHint<CustomType[][][]>(){}));
 		
 		Assert.assertEquals("ObjectArrayTypeInfo<ObjectArrayTypeInfo<ObjectArrayTypeInfo<"
 				+ "PojoType<org.apache.flink.api.java.typeutils.TypeExtractorTest$CustomType, fields = [myField1: String, myField2: Integer]>"
 				+ ">>>", ti.toString());
 		
 		// generic array
-		ti = TypeExtractor.getMapReturnTypes((MapFunction) new MapperWithMultiDimGenericArray<String>(), TypeInfoParser.parse("String[][][]"));
+		ti = TypeExtractor.getMapReturnTypes((MapFunction) new MapperWithMultiDimGenericArray<String>(), TypeInformation.of(new TypeHint<String[][][]>(){}));
 		Assert.assertEquals("ObjectArrayTypeInfo<ObjectArrayTypeInfo<ObjectArrayTypeInfo<Java Tuple1<String>>>>", ti.toString());
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/9ddb978b/flink-java/src/test/java/org/apache/flink/api/java/sca/UdfAnalyzerExamplesTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/sca/UdfAnalyzerExamplesTest.java b/flink-java/src/test/java/org/apache/flink/api/java/sca/UdfAnalyzerExamplesTest.java
index 1b0cbec..e7b87a9 100644
--- a/flink-java/src/test/java/org/apache/flink/api/java/sca/UdfAnalyzerExamplesTest.java
+++ b/flink-java/src/test/java/org/apache/flink/api/java/sca/UdfAnalyzerExamplesTest.java
@@ -25,6 +25,9 @@ import org.apache.flink.api.common.functions.GroupReduceFunction;
 import org.apache.flink.api.common.functions.JoinFunction;
 import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.typeinfo.TypeHint;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeinfo.Types;
 import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields;
 import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFieldsFirst;
 import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFieldsSecond;
@@ -161,8 +164,8 @@ public class UdfAnalyzerExamplesTest {
 	@Test
 	public void testEnumTrianglesBasicExamplesTriadBuilder() {
 		compareAnalyzerResultWithAnnotationsSingleInputWithKeys(GroupReduceFunction.class, TriadBuilder.class,
-				"Tuple2<Integer, Integer>",
-				"Tuple3<Integer, Integer, Integer>",
+				TypeInformation.of(new TypeHint<Tuple2<Integer, Integer>>(){}),
+				TypeInformation.of(new TypeHint<Tuple3<Integer, Integer, Integer>>(){}),
 				new String[] { "0" });
 	}
 
@@ -180,8 +183,8 @@ public class UdfAnalyzerExamplesTest {
 	@Test
 	public void testEnumTrianglesBasicExamplesTupleEdgeConverter() {
 		compareAnalyzerResultWithAnnotationsSingleInput(MapFunction.class, TupleEdgeConverter.class,
-				"Tuple2<Integer, Integer>",
-				"Tuple2<Integer, Integer>");
+				TypeInformation.of(new TypeHint<Tuple2<Integer, Integer>>(){}),
+				TypeInformation.of(new TypeHint<Tuple2<Integer, Integer>>(){}));
 	}
 
 	private static class EdgeDuplicator implements FlatMapFunction<Edge, Edge> {
@@ -196,8 +199,8 @@ public class UdfAnalyzerExamplesTest {
 	@Test
 	public void testEnumTrianglesOptExamplesEdgeDuplicator() {
 		compareAnalyzerResultWithAnnotationsSingleInput(FlatMapFunction.class, EdgeDuplicator.class,
-				"Tuple2<Integer, Integer>",
-				"Tuple2<Integer, Integer>");
+				TypeInformation.of(new TypeHint<Tuple2<Integer, Integer>>(){}),
+				TypeInformation.of(new TypeHint<Tuple2<Integer, Integer>>(){}));
 	}
 
 	private static class DegreeCounter implements GroupReduceFunction<Edge, Edge> {
@@ -242,8 +245,8 @@ public class UdfAnalyzerExamplesTest {
 	@Test
 	public void testEnumTrianglesOptExamplesDegreeCounter() {
 		compareAnalyzerResultWithAnnotationsSingleInputWithKeys(GroupReduceFunction.class, DegreeCounter.class,
-				"Tuple2<Integer, Integer>",
-				"Tuple2<Integer, Integer>",
+				TypeInformation.of(new TypeHint<Tuple2<Integer, Integer>>(){}),
+				TypeInformation.of(new TypeHint<Tuple2<Integer, Integer>>(){}),
 				new String[] { "0" });
 	}
 
@@ -321,8 +324,8 @@ public class UdfAnalyzerExamplesTest {
 	@Test
 	public void testKMeansExamplesCentroidAccumulator() {
 		compareAnalyzerResultWithAnnotationsSingleInputWithKeys(ReduceFunction.class, CentroidAccumulator.class,
-				"Tuple3<Integer, org.apache.flink.api.java.sca.UdfAnalyzerExamplesTest$Point<x=double,y=double>, Long>",
-				"Tuple3<Integer, org.apache.flink.api.java.sca.UdfAnalyzerExamplesTest$Point<x=double,y=double>, Long>",
+				TypeInformation.of(new TypeHint<Tuple3<Integer, Point, Long>>(){}),
+				TypeInformation.of(new TypeHint<Tuple3<Integer, Point, Long>>(){}),
 				new String[] { "0" });
 	}
 
@@ -337,8 +340,8 @@ public class UdfAnalyzerExamplesTest {
 	@Test
 	public void testKMeansExamplesCentroidAverager() {
 		compareAnalyzerResultWithAnnotationsSingleInput(MapFunction.class, CentroidAverager.class,
-				"Tuple3<Integer, org.apache.flink.api.java.sca.UdfAnalyzerExamplesTest$Point<x=double,y=double>, Long>",
-				"org.apache.flink.api.java.sca.UdfAnalyzerExamplesTest$Centroid<x=double,y=double,id=int>");
+				TypeInformation.of(new TypeHint<Tuple3<Integer, Point, Long>>(){}),
+				TypeInformation.of(new TypeHint<Centroid>(){}));
 	}
 
 	// --------------------------------------------------------------------------------------------
@@ -360,8 +363,8 @@ public class UdfAnalyzerExamplesTest {
 	@Test
 	public void testConnectedComponentsExamplesUndirectEdge() {
 		compareAnalyzerResultWithAnnotationsSingleInput(FlatMapFunction.class, UndirectEdge.class,
-				"Tuple2<Long, Long>",
-				"Tuple2<Long, Long>");
+				TypeInformation.of(new TypeHint<Tuple2<Long, Long>>(){}),
+				TypeInformation.of(new TypeHint<Tuple2<Long, Long>>(){}));
 	}
 
 	@ForwardedFieldsFirst("*")
@@ -377,9 +380,9 @@ public class UdfAnalyzerExamplesTest {
 	@Test
 	public void testConnectedComponentsExamplesComponentIdFilter() {
 		compareAnalyzerResultWithAnnotationsDualInput(FlatJoinFunction.class, ComponentIdFilter.class,
-				"Tuple2<Long, Long>",
-				"Tuple2<Long, Long>",
-				"Tuple2<Long, Long>");
+				TypeInformation.of(new TypeHint<Tuple2<Long, Long>>(){}),
+				TypeInformation.of(new TypeHint<Tuple2<Long, Long>>(){}),
+				TypeInformation.of(new TypeHint<Tuple2<Long, Long>>(){}));
 	}
 
 	@ForwardedFields("*->f0;*->f1")
@@ -393,8 +396,7 @@ public class UdfAnalyzerExamplesTest {
 	@Test
 	public void testConnectedComponentsExamplesDuplicateValue() {
 		compareAnalyzerResultWithAnnotationsSingleInput(MapFunction.class, DuplicateValue.class,
-				"Long",
-				"Tuple2<Long, Long>");
+			Types.LONG, TypeInformation.of(new TypeHint<Tuple2<Long, Long>>(){}));
 	}
 
 	@ForwardedFieldsFirst("f1->f1")
@@ -409,9 +411,9 @@ public class UdfAnalyzerExamplesTest {
 	@Test
 	public void testConnectedComponentsExamplesNeighborWithComponentIDJoin() {
 		compareAnalyzerResultWithAnnotationsDualInput(JoinFunction.class, NeighborWithComponentIDJoin.class,
-				"Tuple2<Long, Long>",
-				"Tuple2<Long, Long>",
-				"Tuple2<Long, Long>");
+				TypeInformation.of(new TypeHint<Tuple2<Long, Long>>(){}),
+				TypeInformation.of(new TypeHint<Tuple2<Long, Long>>(){}),
+				TypeInformation.of(new TypeHint<Tuple2<Long, Long>>(){}));
 	}
 
 	// --------------------------------------------------------------------------------------------
@@ -435,9 +437,9 @@ public class UdfAnalyzerExamplesTest {
 	@Test
 	public void testWebLogAnalysisExamplesAntiJoinVisits() {
 		compareAnalyzerResultWithAnnotationsDualInputWithKeys(CoGroupFunction.class, AntiJoinVisits.class,
-				"Tuple3<Integer, String, Integer>",
-				"Tuple1<String>",
-				"Tuple3<Integer, String, Integer>",
+				TypeInformation.of(new TypeHint<Tuple3<Integer, String, Integer>>(){}),
+				TypeInformation.of(new TypeHint<Tuple1<String>>(){}),
+				TypeInformation.of(new TypeHint<Tuple3<Integer, String, Integer>>(){}),
 				new String[] { "1" }, new String[] { "0" });
 	}
 
@@ -465,8 +467,8 @@ public class UdfAnalyzerExamplesTest {
 	@Test
 	public void testPageRankBasicExamplesBuildOutgoingEdgeList() {
 		compareAnalyzerResultWithAnnotationsSingleInputWithKeys(GroupReduceFunction.class, BuildOutgoingEdgeList.class,
-				"Tuple2<Long, Long>",
-				"Tuple2<Long, Long[]>",
+				TypeInformation.of(new TypeHint<Tuple2<Long, Long>>(){}),
+				TypeInformation.of(new TypeHint<Tuple2<Long, Long[]>>(){}),
 				new String[] { "0" });
 	}
 
@@ -551,8 +553,8 @@ public class UdfAnalyzerExamplesTest {
 	@Test
 	public void testLogisticRegressionExamplesSumGradient() {
 		compareAnalyzerResultWithAnnotationsSingleInputWithKeys(ReduceFunction.class, SumGradient.class,
-				"Tuple1<double>",
-				"Tuple1<double>",
+				TypeInformation.of(new TypeHint<Tuple1<Double>>(){}),
+				TypeInformation.of(new TypeHint<Tuple1<Double>>(){}),
 				new String[] { "0" });
 	}
 
@@ -584,8 +586,8 @@ public class UdfAnalyzerExamplesTest {
 	@Test
 	public void testLogisticRegressionExamplesPointParser() {
 		compareAnalyzerResultWithAnnotationsSingleInput(MapFunction.class, PointParser.class,
-				"String",
-				"Tuple2<Integer, double[]>");
+				Types.STRING,
+				TypeInformation.of(new TypeHint<Tuple2<Integer, double[]>>(){}));
 	}
 
 	// --------------------------------------------------------------------------------------------
@@ -620,8 +622,8 @@ public class UdfAnalyzerExamplesTest {
 	@Test
 	public void testCanopyExamplesMassageBOW() {
 		compareAnalyzerResultWithAnnotationsSingleInput(MapFunction.class, PointParser.class,
-				"String",
-				"Tuple2<Integer, String>");
+				Types.STRING,
+				TypeInformation.of(new TypeHint<Tuple2<Integer, String>>(){}));
 	}
 
 	@ForwardedFields("0")
@@ -642,8 +644,8 @@ public class UdfAnalyzerExamplesTest {
 	@Test
 	public void testCanopyExamplesDocumentReducer() {
 		compareAnalyzerResultWithAnnotationsSingleInputWithKeys(GroupReduceFunction.class, DocumentReducer.class,
-				"Tuple2<Integer, String>",
-				"Tuple5<Integer, Boolean, Boolean, String, String>",
+				TypeInformation.of(new TypeHint<Tuple2<Integer, String>>(){}),
+				TypeInformation.of(new TypeHint<Tuple5<Integer, Boolean, Boolean, String, String>>(){}),
 				new String[] { "0" });
 	}
 
@@ -669,8 +671,8 @@ public class UdfAnalyzerExamplesTest {
 	@Test
 	public void testCanopyExamplesMapToCenter() {
 		compareAnalyzerResultWithAnnotationsSingleInput(MapFunction.class, MapToCenter.class,
-				"Tuple5<Integer, Boolean, Boolean, String, String>",
-				"Tuple5<Integer, Boolean, Boolean, String, String>");
+				TypeInformation.of(new TypeHint<Tuple5<Integer, Boolean, Boolean, String, String>>(){}),
+				TypeInformation.of(new TypeHint<Tuple5<Integer, Boolean, Boolean, String, String>>(){}));
 	}
 
 	// --------------------------------------------------------------------------------------------
@@ -725,8 +727,8 @@ public class UdfAnalyzerExamplesTest {
 	@Test
 	public void testKMeansppExamplesRecordToDocConverter() {
 		compareAnalyzerResultWithAnnotationsSingleInputWithKeys(GroupReduceFunction.class, RecordToDocConverter.class,
-				"Tuple3<Integer, Integer, Double>",
-				"org.apache.flink.api.java.sca.UdfAnalyzerExamplesTest$DocumentWithFreq<id=Integer,wordFreq=java.util.HashMap>",
+				TypeInformation.of(new TypeHint<Tuple3<Integer, Integer, Double>>(){}),
+				TypeInformation.of(new TypeHint<DocumentWithFreq>(){}),
 				new String[] { "0" });
 	}
 }


[2/4] flink git commit: [FLINK-9292] [core] Remove TypeInfoParser (part 2)

Posted by se...@apache.org.
http://git-wip-us.apache.org/repos/asf/flink/blob/9ddb978b/flink-java/src/test/java/org/apache/flink/api/java/sca/UdfAnalyzerTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/sca/UdfAnalyzerTest.java b/flink-java/src/test/java/org/apache/flink/api/java/sca/UdfAnalyzerTest.java
index 7e88838..dcea16d 100644
--- a/flink-java/src/test/java/org/apache/flink/api/java/sca/UdfAnalyzerTest.java
+++ b/flink-java/src/test/java/org/apache/flink/api/java/sca/UdfAnalyzerTest.java
@@ -30,7 +30,9 @@ import org.apache.flink.api.common.operators.DualInputSemanticProperties;
 import org.apache.flink.api.common.operators.Keys;
 import org.apache.flink.api.common.operators.SingleInputSemanticProperties;
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeHint;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeinfo.Types;
 import org.apache.flink.api.java.functions.FunctionAnnotation;
 import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields;
 import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFieldsFirst;
@@ -41,7 +43,7 @@ import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.api.java.tuple.Tuple4;
 import org.apache.flink.api.java.tuple.Tuple8;
-import org.apache.flink.api.java.typeutils.TypeInfoParser;
+import org.apache.flink.api.java.typeutils.GenericTypeInfo;
 import org.apache.flink.util.Collector;
 
 import org.junit.Assert;
@@ -60,6 +62,10 @@ import static org.junit.Assert.assertEquals;
 @SuppressWarnings("serial")
 public class UdfAnalyzerTest {
 
+	private static final TypeInformation<Tuple2<String, Integer>> STRING_INT_TUPLE2_TYPE_INFO = TypeInformation.of(new TypeHint<Tuple2<String, Integer>>(){});
+
+	private static final TypeInformation<Tuple2<String, String>> STRING_STRING_TUPLE2_TYPE_INFO = TypeInformation.of(new TypeHint<Tuple2<String, String>>(){});
+
 	@ForwardedFields("f0->*")
 	private static class Map1 implements MapFunction<Tuple2<String, Integer>, String> {
 		public String map(Tuple2<String, Integer> value) throws Exception {
@@ -69,8 +75,8 @@ public class UdfAnalyzerTest {
 
 	@Test
 	public void testSingleFieldExtract() {
-		compareAnalyzerResultWithAnnotationsSingleInput(MapFunction.class, Map1.class, "Tuple2<String,Integer>",
-				"String");
+		compareAnalyzerResultWithAnnotationsSingleInput(MapFunction.class, Map1.class,
+			STRING_INT_TUPLE2_TYPE_INFO, Types.STRING);
 	}
 
 	@ForwardedFields("f0->f0;f0->f1")
@@ -82,8 +88,8 @@ public class UdfAnalyzerTest {
 
 	@Test
 	public void testForwardIntoTuple() {
-		compareAnalyzerResultWithAnnotationsSingleInput(MapFunction.class, Map2.class, "Tuple2<String,Integer>",
-				"Tuple2<String,String>");
+		compareAnalyzerResultWithAnnotationsSingleInput(MapFunction.class, Map2.class,
+			STRING_INT_TUPLE2_TYPE_INFO, STRING_STRING_TUPLE2_TYPE_INFO);
 	}
 
 	private static class Map3 implements MapFunction<String[], Integer> {
@@ -96,7 +102,8 @@ public class UdfAnalyzerTest {
 
 	@Test
 	public void testForwardWithArrayAttrAccess() {
-		compareAnalyzerResultWithAnnotationsSingleInput(MapFunction.class, Map3.class, "String[]", "Integer");
+		compareAnalyzerResultWithAnnotationsSingleInput(MapFunction.class, Map3.class,
+			TypeInformation.of(new TypeHint<String[]>(){}), Types.INT);
 	}
 
 	private static class Map4 implements MapFunction<MyPojo, String> {
@@ -109,7 +116,7 @@ public class UdfAnalyzerTest {
 	@Test
 	public void testForwardWithGenericTypePublicAttrAccess() {
 		compareAnalyzerResultWithAnnotationsSingleInput(MapFunction.class, Map4.class,
-				"org.apache.flink.api.java.sca.UdfAnalyzerTest$MyPojo", "String");
+			new GenericTypeInfo<>(MyPojo.class), Types.STRING);
 	}
 
 	@ForwardedFields("field2->*")
@@ -123,7 +130,7 @@ public class UdfAnalyzerTest {
 	@Test
 	public void testForwardWithPojoPublicAttrAccess() {
 		compareAnalyzerResultWithAnnotationsSingleInput(MapFunction.class, Map5.class,
-				"org.apache.flink.api.java.sca.UdfAnalyzerTest$MyPojo<field=String,field2=String>", "String");
+			TypeInformation.of(new TypeHint<MyPojo>(){}), Types.STRING);
 	}
 
 	@ForwardedFields("field->*")
@@ -137,7 +144,7 @@ public class UdfAnalyzerTest {
 	@Test
 	public void testForwardWithPojoPrivateAttrAccess() {
 		compareAnalyzerResultWithAnnotationsSingleInput(MapFunction.class, Map6.class,
-				"org.apache.flink.api.java.sca.UdfAnalyzerTest$MyPojo<field=String,field2=String>", "String");
+			TypeInformation.of(new TypeHint<MyPojo>(){}), Types.STRING);
 	}
 
 	@ForwardedFields("f0->f1")
@@ -153,8 +160,8 @@ public class UdfAnalyzerTest {
 
 	@Test
 	public void testForwardIntoTupleWithCondition() {
-		compareAnalyzerResultWithAnnotationsSingleInput(MapFunction.class, Map7.class, "Tuple2<String,Integer>",
-				"Tuple2<String,String>");
+		compareAnalyzerResultWithAnnotationsSingleInput(MapFunction.class, Map7.class,
+			STRING_INT_TUPLE2_TYPE_INFO, STRING_STRING_TUPLE2_TYPE_INFO);
 	}
 
 	private static class Map8 implements MapFunction<Tuple2<String, String>, String> {
@@ -169,8 +176,8 @@ public class UdfAnalyzerTest {
 
 	@Test
 	public void testSingleFieldExtractWithCondition() {
-		compareAnalyzerResultWithAnnotationsSingleInput(MapFunction.class, Map8.class, "Tuple2<String,String>",
-				"String");
+		compareAnalyzerResultWithAnnotationsSingleInput(MapFunction.class, Map8.class,
+			STRING_STRING_TUPLE2_TYPE_INFO, Types.STRING);
 	}
 
 	@ForwardedFields("*->f0")
@@ -185,7 +192,8 @@ public class UdfAnalyzerTest {
 
 	@Test
 	public void testForwardIntoTupleWithInstanceVar() {
-		compareAnalyzerResultWithAnnotationsSingleInput(MapFunction.class, Map9.class, "String", "Tuple1<String>");
+		compareAnalyzerResultWithAnnotationsSingleInput(MapFunction.class, Map9.class, Types.STRING,
+			TypeInformation.of(new TypeHint<Tuple1<String>>(){}));
 	}
 
 	@ForwardedFields("*->f0.f0")
@@ -200,8 +208,8 @@ public class UdfAnalyzerTest {
 
 	@Test
 	public void testForwardIntoTupleWithInstanceVar2() {
-		compareAnalyzerResultWithAnnotationsSingleInput(MapFunction.class, Map10.class, "String",
-				"Tuple1<Tuple1<String>>");
+		compareAnalyzerResultWithAnnotationsSingleInput(MapFunction.class, Map10.class, Types.STRING,
+			TypeInformation.of(new TypeHint<Tuple1<Tuple1<String>>>(){}));
 	}
 
 	@ForwardedFields("*->f1")
@@ -222,8 +230,8 @@ public class UdfAnalyzerTest {
 
 	@Test
 	public void testForwardIntoTupleWithInstanceVarChangedByOtherMethod() {
-		compareAnalyzerResultWithAnnotationsSingleInput(MapFunction.class, Map11.class, "String",
-				"Tuple2<String, String>");
+		compareAnalyzerResultWithAnnotationsSingleInput(MapFunction.class, Map11.class, Types.STRING,
+			STRING_STRING_TUPLE2_TYPE_INFO);
 	}
 
 	@ForwardedFields("f0->f0.f0;f0->f1.f0")
@@ -236,8 +244,9 @@ public class UdfAnalyzerTest {
 
 	@Test
 	public void testForwardIntoNestedTuple() {
-		compareAnalyzerResultWithAnnotationsSingleInput(MapFunction.class, Map12.class, "Tuple2<String,Integer>",
-				"Tuple2<Tuple1<String>,Tuple1<String>>");
+		compareAnalyzerResultWithAnnotationsSingleInput(MapFunction.class, Map12.class,
+			STRING_INT_TUPLE2_TYPE_INFO,
+			TypeInformation.of(new TypeHint<Tuple2<Tuple1<String>, Tuple1<String>>>(){}));
 	}
 
 	@ForwardedFields("f0->f1.f0")
@@ -253,8 +262,9 @@ public class UdfAnalyzerTest {
 
 	@Test
 	public void testForwardIntoNestedTupleWithVarAndModification() {
-		compareAnalyzerResultWithAnnotationsSingleInput(MapFunction.class, Map13.class, "Tuple2<String,Integer>",
-				"Tuple2<Tuple1<String>,Tuple1<String>>");
+		compareAnalyzerResultWithAnnotationsSingleInput(MapFunction.class, Map13.class,
+			STRING_INT_TUPLE2_TYPE_INFO,
+			TypeInformation.of(new TypeHint<Tuple2<Tuple1<String>, Tuple1<String>>>(){}));
 	}
 
 	@ForwardedFields("f0")
@@ -268,8 +278,8 @@ public class UdfAnalyzerTest {
 
 	@Test
 	public void testForwardIntoTupleWithAssignment() {
-		compareAnalyzerResultWithAnnotationsSingleInput(MapFunction.class, Map14.class, "Tuple2<String,Integer>",
-				"Tuple2<String,String>");
+		compareAnalyzerResultWithAnnotationsSingleInput(MapFunction.class, Map14.class,
+			STRING_INT_TUPLE2_TYPE_INFO, STRING_STRING_TUPLE2_TYPE_INFO);
 	}
 
 	@ForwardedFields("f0.f0->f0")
@@ -284,7 +294,8 @@ public class UdfAnalyzerTest {
 	@Test
 	public void testForwardIntoTupleWithInputPath() {
 		compareAnalyzerResultWithAnnotationsSingleInput(MapFunction.class, Map15.class,
-				"Tuple2<Tuple1<String>,Integer>", "Tuple2<String,String>");
+			TypeInformation.of(new TypeHint<Tuple2<Tuple1<String>, Integer>>(){}),
+			STRING_STRING_TUPLE2_TYPE_INFO);
 	}
 
 	@ForwardedFields("field->field2;field2->field")
@@ -300,8 +311,7 @@ public class UdfAnalyzerTest {
 	@Test
 	public void testForwardIntoPojoByGettersAndSetters() {
 		compareAnalyzerResultWithAnnotationsSingleInput(MapFunction.class, Map16.class,
-				"org.apache.flink.api.java.sca.UdfAnalyzerTest$MyPojo<field=String,field2=String>",
-				"org.apache.flink.api.java.sca.UdfAnalyzerTest$MyPojo<field=String,field2=String>");
+			TypeInformation.of(new TypeHint<MyPojo>(){}), TypeInformation.of(new TypeHint<MyPojo>(){}));
 	}
 
 	private static class Map17 implements MapFunction<String, Tuple1<String>> {
@@ -319,7 +329,8 @@ public class UdfAnalyzerTest {
 
 	@Test
 	public void testForwardIntoTupleWithInstanceVarAndCondition() {
-		compareAnalyzerResultWithAnnotationsSingleInput(MapFunction.class, Map17.class, "String", "Tuple1<String>");
+		compareAnalyzerResultWithAnnotationsSingleInput(MapFunction.class, Map17.class, Types.STRING,
+			TypeInformation.of(new TypeHint<Tuple1<String>>(){}));
 	}
 
 	private static class Map18 implements MapFunction<Tuple1<String>, ArrayList<String>> {
@@ -333,8 +344,8 @@ public class UdfAnalyzerTest {
 
 	@Test
 	public void testForwardIntoUnsupportedObject() {
-		compareAnalyzerResultWithAnnotationsSingleInput(MapFunction.class, Map18.class, "Tuple1<String>",
-				"java.util.ArrayList");
+		compareAnalyzerResultWithAnnotationsSingleInput(MapFunction.class, Map18.class,
+			TypeInformation.of(new TypeHint<Tuple1<String>>(){}), TypeInformation.of(new TypeHint<java.util.ArrayList>(){}));
 	}
 
 	@ForwardedFields("*->f0")
@@ -351,7 +362,8 @@ public class UdfAnalyzerTest {
 
 	@Test
 	public void testForwardWithNewTupleToNewTupleAssignment() {
-		compareAnalyzerResultWithAnnotationsSingleInput(MapFunction.class, Map19.class, "Integer", "Tuple1<Integer>");
+		compareAnalyzerResultWithAnnotationsSingleInput(MapFunction.class, Map19.class, Types.INT,
+			TypeInformation.of(new TypeHint<Tuple1<Integer>>(){}));
 	}
 
 	@ForwardedFields("f0;f1")
@@ -371,7 +383,8 @@ public class UdfAnalyzerTest {
 	@Test
 	public void testForwardWithGetMethod() {
 		compareAnalyzerResultWithAnnotationsSingleInput(MapFunction.class, Map20.class,
-				"Tuple4<Integer, Integer, Integer, Integer>", "Tuple4<Integer, Integer, Integer, Integer>");
+			TypeInformation.of(new TypeHint<Tuple4<Integer, Integer, Integer, Integer>>(){}),
+			TypeInformation.of(new TypeHint<Tuple4<Integer, Integer, Integer, Integer>>(){}));
 	}
 
 	@ForwardedFields("f0->f1;f1->f0")
@@ -387,8 +400,8 @@ public class UdfAnalyzerTest {
 
 	@Test
 	public void testForwardWithSetMethod() {
-		compareAnalyzerResultWithAnnotationsSingleInput(MapFunction.class, Map21.class, "Tuple2<Integer, Integer>",
-				"Tuple2<Integer, Integer>");
+		compareAnalyzerResultWithAnnotationsSingleInput(MapFunction.class, Map21.class,
+			TypeInformation.of(new TypeHint<Tuple2<Integer, Integer>>(){}), TypeInformation.of(new TypeHint<Tuple2<Integer, Integer>>(){}));
 	}
 
 	@ForwardedFields("f0->f1;f1->f0")
@@ -404,8 +417,8 @@ public class UdfAnalyzerTest {
 
 	@Test
 	public void testForwardIntoNewTupleWithSetMethod() {
-		compareAnalyzerResultWithAnnotationsSingleInput(MapFunction.class, Map22.class, "Tuple2<Integer, Integer>",
-				"Tuple2<Integer, Integer>");
+		compareAnalyzerResultWithAnnotationsSingleInput(MapFunction.class, Map22.class,
+			TypeInformation.of(new TypeHint<Tuple2<Integer, Integer>>(){}), TypeInformation.of(new TypeHint<Tuple2<Integer, Integer>>(){}));
 	}
 
 	@ForwardedFields("*")
@@ -426,8 +439,8 @@ public class UdfAnalyzerTest {
 
 	@Test
 	public void testForwardWithGetMethod2() {
-		compareAnalyzerResultWithAnnotationsSingleInput(MapFunction.class, Map23.class, "Tuple1<Integer>",
-				"Tuple1<Integer>");
+		compareAnalyzerResultWithAnnotationsSingleInput(MapFunction.class, Map23.class,
+			TypeInformation.of(new TypeHint<Tuple1<Integer>>(){}), TypeInformation.of(new TypeHint<Tuple1<Integer>>(){}));
 	}
 
 	private static class Map24 implements MapFunction<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>> {
@@ -442,8 +455,8 @@ public class UdfAnalyzerTest {
 
 	@Test
 	public void testForwardWithSetMethod2() {
-		compareAnalyzerResultWithAnnotationsSingleInput(MapFunction.class, Map24.class, "Tuple2<Integer, Integer>",
-				"Tuple2<Integer, Integer>");
+		compareAnalyzerResultWithAnnotationsSingleInput(MapFunction.class, Map24.class,
+			TypeInformation.of(new TypeHint<Tuple2<Integer, Integer>>(){}), TypeInformation.of(new TypeHint<Tuple2<Integer, Integer>>(){}));
 	}
 
 	@ForwardedFields("f1->f0;f1")
@@ -457,8 +470,8 @@ public class UdfAnalyzerTest {
 
 	@Test
 	public void testForwardWithModifiedInput() {
-		compareAnalyzerResultWithAnnotationsSingleInput(MapFunction.class, Map25.class, "Tuple2<Integer, Integer>",
-				"Tuple2<Integer, Integer>");
+		compareAnalyzerResultWithAnnotationsSingleInput(MapFunction.class, Map25.class,
+			TypeInformation.of(new TypeHint<Tuple2<Integer, Integer>>(){}), TypeInformation.of(new TypeHint<Tuple2<Integer, Integer>>(){}));
 	}
 
 	@ForwardedFields("*->1")
@@ -482,8 +495,8 @@ public class UdfAnalyzerTest {
 
 	@Test
 	public void testForwardWithTuplesGetSetFieldMethods() {
-		compareAnalyzerResultWithAnnotationsSingleInput(MapFunction.class, Map26.class, "Integer",
-				"Tuple2<Integer, Integer>");
+		compareAnalyzerResultWithAnnotationsSingleInput(MapFunction.class, Map26.class, Types.INT,
+			TypeInformation.of(new TypeHint<Tuple2<Integer, Integer>>(){}));
 	}
 
 	@ForwardedFields("2->3;3->7")
@@ -515,8 +528,8 @@ public class UdfAnalyzerTest {
 	@Test
 	public void testForwardWithTuplesGetSetFieldMethods2() {
 		compareAnalyzerResultWithAnnotationsSingleInput(MapFunction.class, Map27.class,
-				"Tuple8<Integer, Integer, Integer, Integer, Integer, Integer, Integer, Integer>",
-				"Tuple8<Integer, Integer, Integer, Integer, Integer, Integer, Integer, Integer>");
+			TypeInformation.of(new TypeHint<Tuple8<Integer, Integer, Integer, Integer, Integer, Integer, Integer, Integer>>(){}),
+			TypeInformation.of(new TypeHint<Tuple8<Integer, Integer, Integer, Integer, Integer, Integer, Integer, Integer>>(){}));
 	}
 
 	private static class Map28 implements MapFunction<Integer, Integer> {
@@ -531,7 +544,7 @@ public class UdfAnalyzerTest {
 
 	@Test
 	public void testForwardWithBranching1() {
-		compareAnalyzerResultWithAnnotationsSingleInput(MapFunction.class, Map28.class, "Integer", "Integer");
+		compareAnalyzerResultWithAnnotationsSingleInput(MapFunction.class, Map28.class, Types.INT, Types.INT);
 	}
 
 	@ForwardedFields("0")
@@ -555,7 +568,8 @@ public class UdfAnalyzerTest {
 	@Test
 	public void testForwardWithBranching2() {
 		compareAnalyzerResultWithAnnotationsSingleInput(MapFunction.class, Map29.class,
-				"Tuple3<String, String, String>", "Tuple3<String, String, String>");
+			TypeInformation.of(new TypeHint<Tuple3<String, String, String>>(){}),
+			TypeInformation.of(new TypeHint<Tuple3<String, String, String>>(){}));
 	}
 
 	private static class Map30 implements MapFunction<Tuple2<String, String>, String> {
@@ -573,8 +587,8 @@ public class UdfAnalyzerTest {
 
 	@Test
 	public void testForwardWithBranching3() {
-		compareAnalyzerResultWithAnnotationsSingleInput(MapFunction.class, Map30.class, "Tuple2<String,String>",
-				"String");
+		compareAnalyzerResultWithAnnotationsSingleInput(MapFunction.class, Map30.class,
+			STRING_STRING_TUPLE2_TYPE_INFO, Types.STRING);
 	}
 
 	@ForwardedFields("1->1;1->0")
@@ -591,8 +605,8 @@ public class UdfAnalyzerTest {
 
 	@Test
 	public void testForwardWithInheritance() {
-		compareAnalyzerResultWithAnnotationsSingleInput(MapFunction.class, Map31.class, "Tuple2<String,String>",
-				"Tuple2<String,String>");
+		compareAnalyzerResultWithAnnotationsSingleInput(MapFunction.class, Map31.class,
+			STRING_STRING_TUPLE2_TYPE_INFO, STRING_STRING_TUPLE2_TYPE_INFO);
 	}
 
 	@ForwardedFields("*")
@@ -620,8 +634,8 @@ public class UdfAnalyzerTest {
 	@Test
 	public void testForwardWithUnboxingAndBoxing() {
 		compareAnalyzerResultWithAnnotationsSingleInput(MapFunction.class, Map32.class,
-				"Tuple8<Boolean, Character, Byte, Short, Integer, Long, Float, Double>",
-				"Tuple8<Boolean, Character, Byte, Short, Integer, Long, Float, Double>");
+			TypeInformation.of(new TypeHint<Tuple8<Boolean, Character, Byte, Short, Integer, Long, Float, Double>>(){}),
+			TypeInformation.of(new TypeHint<Tuple8<Boolean, Character, Byte, Short, Integer, Long, Float, Double>>(){}));
 	}
 
 	private static class Map33 implements MapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> {
@@ -640,8 +654,8 @@ public class UdfAnalyzerTest {
 
 	@Test
 	public void testForwardWithBranching4() {
-		compareAnalyzerResultWithAnnotationsSingleInput(MapFunction.class, Map33.class, "Tuple2<Long, Long>",
-				"Tuple2<Long, Long>");
+		compareAnalyzerResultWithAnnotationsSingleInput(MapFunction.class, Map33.class, TypeInformation.of(new TypeHint<Tuple2<Long, Long>>(){}),
+			TypeInformation.of(new TypeHint<Tuple2<Long, Long>>(){}));
 	}
 
 	@ForwardedFields("1")
@@ -663,8 +677,8 @@ public class UdfAnalyzerTest {
 
 	@Test
 	public void testForwardWithBranching5() {
-		compareAnalyzerResultWithAnnotationsSingleInput(MapFunction.class, Map34.class, "Tuple2<Long, Long>",
-				"Tuple2<Long, Long>");
+		compareAnalyzerResultWithAnnotationsSingleInput(MapFunction.class, Map34.class, TypeInformation.of(new TypeHint<Tuple2<Long, Long>>(){}),
+			TypeInformation.of(new TypeHint<Tuple2<Long, Long>>(){}));
 	}
 
 	private static class Map35 implements MapFunction<String[], Tuple2<String[], String[]>> {
@@ -678,8 +692,8 @@ public class UdfAnalyzerTest {
 
 	@Test
 	public void testForwardWithArrayModification() {
-		compareAnalyzerResultWithAnnotationsSingleInput(MapFunction.class, Map35.class, "String[]",
-				"Tuple2<String[], String[]>");
+		compareAnalyzerResultWithAnnotationsSingleInput(MapFunction.class, Map35.class, TypeInformation.of(new TypeHint<String[]>(){}),
+			TypeInformation.of(new TypeHint<Tuple2<String[], String[]>>(){}));
 	}
 
 	private static class Map36 implements MapFunction<Tuple3<String, String, String>, Tuple3<String, String, String>> {
@@ -696,8 +710,8 @@ public class UdfAnalyzerTest {
 
 	@Test
 	public void testForwardWithBranching6() {
-		compareAnalyzerResultWithAnnotationsSingleInput(MapFunction.class, Map36.class, "Tuple3<String, String, String>",
-				"Tuple3<String, String, String>");
+		compareAnalyzerResultWithAnnotationsSingleInput(MapFunction.class, Map36.class, TypeInformation.of(new TypeHint<Tuple3<String, String, String>>(){}),
+			TypeInformation.of(new TypeHint<Tuple3<String, String, String>>(){}));
 	}
 
 	private static class Map37 implements MapFunction<Tuple1<Tuple1<String>>, Tuple1<Tuple1<String>>> {
@@ -711,8 +725,8 @@ public class UdfAnalyzerTest {
 
 	@Test
 	public void testForwardWithGetAndModification() {
-		compareAnalyzerResultWithAnnotationsSingleInput(MapFunction.class, Map37.class, "Tuple1<Tuple1<String>>",
-				"Tuple1<Tuple1<String>>");
+		compareAnalyzerResultWithAnnotationsSingleInput(MapFunction.class, Map37.class, TypeInformation.of(new TypeHint<Tuple1<Tuple1<String>>>(){}),
+			TypeInformation.of(new TypeHint<Tuple1<Tuple1<String>>>(){}));
 	}
 
 	@ForwardedFields("field")
@@ -727,8 +741,8 @@ public class UdfAnalyzerTest {
 	@Test
 	public void testForwardWithInheritance2() {
 		compareAnalyzerResultWithAnnotationsSingleInput(MapFunction.class, Map38.class,
-				"org.apache.flink.api.java.sca.UdfAnalyzerTest$MyPojo2<field=String,field2=String>",
-				"org.apache.flink.api.java.sca.UdfAnalyzerTest$MyPojo2<field=String,field2=String>");
+			TypeInformation.of(new TypeHint<MyPojo2>(){}),
+			TypeInformation.of(new TypeHint<MyPojo2>(){}));
 	}
 
 	private static class Map39 implements MapFunction<MyPojo, MyPojo> {
@@ -743,8 +757,8 @@ public class UdfAnalyzerTest {
 	@Test
 	public void testForwardWithGenericTypeOutput() {
 		compareAnalyzerResultWithAnnotationsSingleInput(MapFunction.class, Map39.class,
-				"org.apache.flink.api.java.sca.UdfAnalyzerTest$MyPojo<field=String,field2=String>",
-				"org.apache.flink.api.java.sca.UdfAnalyzerTest$MyPojo");
+			TypeInformation.of(new TypeHint<GenericTypeInfo<MyPojo>>(){}),
+			TypeInformation.of(new TypeHint<GenericTypeInfo<MyPojo>>(){}));
 	}
 
 	@ForwardedFields("field2")
@@ -766,8 +780,8 @@ public class UdfAnalyzerTest {
 	@Test
 	public void testForwardWithRecursion() {
 		compareAnalyzerResultWithAnnotationsSingleInput(MapFunction.class, Map40.class,
-				"org.apache.flink.api.java.sca.UdfAnalyzerTest$MyPojo<field=String,field2=String>",
-				"org.apache.flink.api.java.sca.UdfAnalyzerTest$MyPojo<field=String,field2=String>");
+			TypeInformation.of(new TypeHint<MyPojo>(){}),
+			TypeInformation.of(new TypeHint<MyPojo>(){}));
 	}
 
 	@ForwardedFields("field;field2")
@@ -784,8 +798,8 @@ public class UdfAnalyzerTest {
 	@Test
 	public void testForwardWithGetRuntimeContext() {
 		compareAnalyzerResultWithAnnotationsSingleInput(MapFunction.class, Map41.class,
-				"org.apache.flink.api.java.sca.UdfAnalyzerTest$MyPojo<field=String,field2=String>",
-				"org.apache.flink.api.java.sca.UdfAnalyzerTest$MyPojo<field=String,field2=String>");
+			TypeInformation.of(new TypeHint<MyPojo>(){}),
+			TypeInformation.of(new TypeHint<MyPojo>(){}));
 	}
 
 	@ForwardedFields("*")
@@ -798,8 +812,8 @@ public class UdfAnalyzerTest {
 
 	@Test
 	public void testForwardWithCollector() {
-		compareAnalyzerResultWithAnnotationsSingleInput(FlatMapFunction.class, FlatMap1.class, "Tuple1<Integer>",
-				"Tuple1<Integer>");
+		compareAnalyzerResultWithAnnotationsSingleInput(FlatMapFunction.class, FlatMap1.class, TypeInformation.of(new TypeHint<Tuple1<Integer>>(){}),
+			TypeInformation.of(new TypeHint<Tuple1<Integer>>(){}));
 	}
 
 	@ForwardedFields("0->1;1->0")
@@ -817,8 +831,8 @@ public class UdfAnalyzerTest {
 
 	@Test
 	public void testForwardWith2Collectors() {
-		compareAnalyzerResultWithAnnotationsSingleInput(FlatMapFunction.class, FlatMap2.class, "Tuple2<Long, Long>",
-				"Tuple2<Long, Long>");
+		compareAnalyzerResultWithAnnotationsSingleInput(FlatMapFunction.class, FlatMap2.class, TypeInformation.of(new TypeHint<Tuple2<Long, Long>>(){}),
+			TypeInformation.of(new TypeHint<Tuple2<Long, Long>>(){}));
 	}
 
 	private static class FlatMap3 implements FlatMapFunction<Tuple1<Integer>, Tuple1<Integer>> {
@@ -835,8 +849,8 @@ public class UdfAnalyzerTest {
 
 	@Test
 	public void testForwardWithCollectorPassing() {
-		compareAnalyzerResultWithAnnotationsSingleInput(FlatMapFunction.class, FlatMap3.class, "Tuple1<Integer>",
-				"Tuple1<Integer>");
+		compareAnalyzerResultWithAnnotationsSingleInput(FlatMapFunction.class, FlatMap3.class, TypeInformation.of(new TypeHint<Tuple1<Integer>>(){}),
+			TypeInformation.of(new TypeHint<Tuple1<Integer>>(){}));
 	}
 
 	@ForwardedFieldsFirst("f1->f1")
@@ -850,8 +864,8 @@ public class UdfAnalyzerTest {
 
 	@Test
 	public void testForwardWithDualInput() {
-		compareAnalyzerResultWithAnnotationsDualInput(JoinFunction.class, Join1.class, "Tuple2<Long, Long>",
-				"Tuple2<Long, Long>", "Tuple2<Long, Long>");
+		compareAnalyzerResultWithAnnotationsDualInput(JoinFunction.class, Join1.class, TypeInformation.of(new TypeHint<Tuple2<Long, Long>>(){}),
+			TypeInformation.of(new TypeHint<Tuple2<Long, Long>>(){}), TypeInformation.of(new TypeHint<Tuple2<Long, Long>>(){}));
 	}
 
 	@ForwardedFieldsFirst("*")
@@ -866,8 +880,8 @@ public class UdfAnalyzerTest {
 
 	@Test
 	public void testForwardWithDualInputAndCollector() {
-		compareAnalyzerResultWithAnnotationsDualInput(FlatJoinFunction.class, Join2.class, "Tuple2<Long, Long>",
-				"Tuple2<Long, Long>", "Tuple2<Long, Long>");
+		compareAnalyzerResultWithAnnotationsDualInput(FlatJoinFunction.class, Join2.class, TypeInformation.of(new TypeHint<Tuple2<Long, Long>>(){}),
+			TypeInformation.of(new TypeHint<Tuple2<Long, Long>>(){}), TypeInformation.of(new TypeHint<Tuple2<Long, Long>>(){}));
 	}
 
 	@ForwardedFields("0")
@@ -881,7 +895,7 @@ public class UdfAnalyzerTest {
 	@Test
 	public void testForwardWithIterable() {
 		compareAnalyzerResultWithAnnotationsSingleInputWithKeys(GroupReduceFunction.class, GroupReduce1.class,
-				"Tuple2<Long, Long>", "Tuple2<Long, Long>", new String[] { "0" });
+			TypeInformation.of(new TypeHint<Tuple2<Long, Long>>(){}), TypeInformation.of(new TypeHint<Tuple2<Long, Long>>(){}), new String[] { "0" });
 	}
 
 	@ForwardedFields("1->0")
@@ -907,7 +921,7 @@ public class UdfAnalyzerTest {
 	@Test
 	public void testForwardWithIterable2() {
 		compareAnalyzerResultWithAnnotationsSingleInputWithKeys(GroupReduceFunction.class, GroupReduce2.class,
-				"Tuple2<Long, Long>", "Tuple2<Long, Long>", new String[] { "0", "1" });
+			TypeInformation.of(new TypeHint<Tuple2<Long, Long>>(){}), TypeInformation.of(new TypeHint<Tuple2<Long, Long>>(){}), new String[] { "0", "1" });
 	}
 
 	@ForwardedFields("field2")
@@ -923,8 +937,8 @@ public class UdfAnalyzerTest {
 	@Test
 	public void testForwardWithIterable3() {
 		compareAnalyzerResultWithAnnotationsSingleInputWithKeys(GroupReduceFunction.class, GroupReduce3.class,
-				"org.apache.flink.api.java.sca.UdfAnalyzerTest$MyPojo<field=String,field2=String>",
-				"org.apache.flink.api.java.sca.UdfAnalyzerTest$MyPojo<field=String,field2=String>", new String[] { "field2" });
+			TypeInformation.of(new TypeHint<MyPojo>(){}),
+			TypeInformation.of(new TypeHint<MyPojo>(){}), new String[] { "field2" });
 	}
 
 	@ForwardedFields("f0->*")
@@ -942,7 +956,7 @@ public class UdfAnalyzerTest {
 	@Test
 	public void testForwardWithAtLeastOneIterationAssumption() {
 		compareAnalyzerResultWithAnnotationsSingleInputWithKeys(GroupReduceFunction.class, GroupReduce4.class,
-				"Tuple2<Long, Long>", "Long", new String[] { "f0" });
+			TypeInformation.of(new TypeHint<Tuple2<Long, Long>>(){}), Types.LONG, new String[] { "f0" });
 	}
 
 	@ForwardedFields("f0->*")
@@ -967,7 +981,7 @@ public class UdfAnalyzerTest {
 	public void testForwardWithAtLeastOneIterationAssumptionForJavac() {
 		// this test simulates javac behaviour in Eclipse IDE
 		compareAnalyzerResultWithAnnotationsSingleInputWithKeys(GroupReduceFunction.class, GroupReduce4Javac.class,
-				"Tuple2<Long, Long>", "Long", new String[] { "f0" });
+			TypeInformation.of(new TypeHint<Tuple2<Long, Long>>(){}), Types.LONG, new String[] { "f0" });
 	}
 
 	private static class GroupReduce5 implements GroupReduceFunction<Tuple2<Long, Long>, Long> {
@@ -987,7 +1001,7 @@ public class UdfAnalyzerTest {
 	@Test
 	public void testForwardWithAtLeastOneIterationAssumption2() {
 		compareAnalyzerResultWithAnnotationsSingleInputWithKeys(GroupReduceFunction.class, GroupReduce5.class,
-				"Tuple2<Long, Long>", "Long", new String[] { "f1" });
+			TypeInformation.of(new TypeHint<Tuple2<Long, Long>>(){}), Types.LONG, new String[] { "f1" });
 	}
 
 	private static class GroupReduce6 implements GroupReduceFunction<Tuple2<Long, Long>, Long> {
@@ -1005,7 +1019,7 @@ public class UdfAnalyzerTest {
 	@Test
 	public void testForwardWithAtLeastOneIterationAssumption3() {
 		compareAnalyzerResultWithAnnotationsSingleInputWithKeys(GroupReduceFunction.class, GroupReduce6.class,
-				"Tuple2<Long, Long>", "Long", new String[] { "f0" });
+			TypeInformation.of(new TypeHint<Tuple2<Long, Long>>(){}), Types.LONG, new String[] { "f0" });
 	}
 
 	private static class GroupReduce7 implements GroupReduceFunction<Tuple2<Long, Long>, Long> {
@@ -1023,7 +1037,7 @@ public class UdfAnalyzerTest {
 	@Test
 	public void testForwardWithAtLeastOneIterationAssumption4() {
 		compareAnalyzerResultWithAnnotationsSingleInputWithKeys(GroupReduceFunction.class, GroupReduce7.class,
-				"Tuple2<Long, Long>", "Long", new String[] { "f0" });
+			TypeInformation.of(new TypeHint<Tuple2<Long, Long>>(){}), Types.LONG, new String[] { "f0" });
 	}
 
 	@ForwardedFields("f0->*")
@@ -1042,7 +1056,7 @@ public class UdfAnalyzerTest {
 	@Test
 	public void testForwardWithAtLeastOneIterationAssumption5() {
 		compareAnalyzerResultWithAnnotationsSingleInputWithKeys(GroupReduceFunction.class, GroupReduce8.class,
-				"Tuple2<Long, Long>", "Long", new String[] { "f0" });
+			TypeInformation.of(new TypeHint<Tuple2<Long, Long>>(){}), Types.LONG, new String[] { "f0" });
 	}
 
 	@ForwardedFields("f0")
@@ -1061,7 +1075,7 @@ public class UdfAnalyzerTest {
 	@Test
 	public void testForwardWithAtLeastOneIterationAssumption6() {
 		compareAnalyzerResultWithAnnotationsSingleInputWithKeys(GroupReduceFunction.class, GroupReduce9.class,
-				"Tuple2<Long, Long>", "Tuple2<Long, Long>", new String[] { "f0" });
+			TypeInformation.of(new TypeHint<Tuple2<Long, Long>>(){}), TypeInformation.of(new TypeHint<Tuple2<Long, Long>>(){}), new String[] { "f0" });
 	}
 
 	private static class GroupReduce10 implements GroupReduceFunction<Tuple2<Long, Long>, Boolean> {
@@ -1082,7 +1096,7 @@ public class UdfAnalyzerTest {
 	@Test
 	public void testForwardWithAtLeastOneIterationAssumption7() {
 		compareAnalyzerResultWithAnnotationsSingleInputWithKeys(GroupReduceFunction.class, GroupReduce10.class,
-				"Tuple2<Long, Long>", "Boolean", new String[] { "f0" });
+			TypeInformation.of(new TypeHint<Tuple2<Long, Long>>(){}), Types.BOOLEAN, new String[] { "f0" });
 	}
 
 	@ForwardedFields("field")
@@ -1096,9 +1110,9 @@ public class UdfAnalyzerTest {
 	@Test
 	public void testForwardWithReduce() {
 		compareAnalyzerResultWithAnnotationsSingleInputWithKeys(ReduceFunction.class, Reduce1.class,
-				"org.apache.flink.api.java.sca.UdfAnalyzerTest$MyPojo<field=String,field2=String>",
-				"org.apache.flink.api.java.sca.UdfAnalyzerTest$MyPojo<field=String,field2=String>",
-				new String[] { "field" });
+			TypeInformation.of(new TypeHint<MyPojo>(){}),
+			TypeInformation.of(new TypeHint<MyPojo>(){}),
+			new String[] { "field" });
 	}
 
 	@ForwardedFields("field")
@@ -1115,9 +1129,9 @@ public class UdfAnalyzerTest {
 	@Test
 	public void testForwardWithBranchingReduce() {
 		compareAnalyzerResultWithAnnotationsSingleInputWithKeys(ReduceFunction.class, Reduce2.class,
-				"org.apache.flink.api.java.sca.UdfAnalyzerTest$MyPojo<field=String,field2=String>",
-				"org.apache.flink.api.java.sca.UdfAnalyzerTest$MyPojo<field=String,field2=String>",
-				new String[] { "field" });
+			TypeInformation.of(new TypeHint<MyPojo>(){}),
+			TypeInformation.of(new TypeHint<MyPojo>(){}),
+			new String[] { "field" });
 	}
 
 	private static class NullReturnMapper1 implements MapFunction<String, String> {
@@ -1215,7 +1229,7 @@ public class UdfAnalyzerTest {
 	public void testFilterModificationException1() {
 		try {
 			final UdfAnalyzer ua = new UdfAnalyzer(FilterFunction.class, FilterMod1.class, "operator",
-					TypeInfoParser.parse("Tuple2<String, String>"), null, null, null, null, true);
+				STRING_STRING_TUPLE2_TYPE_INFO, null, null, null, null, true);
 			ua.analyze();
 			Assert.fail();
 		}
@@ -1237,7 +1251,7 @@ public class UdfAnalyzerTest {
 	public void testFilterModificationException2() {
 		try {
 			final UdfAnalyzer ua = new UdfAnalyzer(FilterFunction.class, FilterMod2.class, "operator",
-					TypeInfoParser.parse("Tuple2<String, String>"), null, null, null, null, true);
+				STRING_STRING_TUPLE2_TYPE_INFO, null, null, null, null, true);
 			ua.analyze();
 			Assert.fail();
 		}
@@ -1303,17 +1317,14 @@ public class UdfAnalyzerTest {
 		}
 	}
 
-	public static void compareAnalyzerResultWithAnnotationsSingleInput(Class<?> baseClass, Class<?> clazz, String in,
-			String out) {
-		compareAnalyzerResultWithAnnotationsSingleInputWithKeys(baseClass, clazz, in, out, null);
+	public static void compareAnalyzerResultWithAnnotationsSingleInput(Class<?> baseClass, Class<?> clazz,
+		TypeInformation<?> inType, TypeInformation<?> outType) {
+		compareAnalyzerResultWithAnnotationsSingleInputWithKeys(baseClass, clazz, inType, outType, null);
 	}
 
 	@SuppressWarnings({ "rawtypes", "unchecked" })
 	public static void compareAnalyzerResultWithAnnotationsSingleInputWithKeys(Class<?> baseClass, Class<?> clazz,
-			String in, String out, String[] keys) {
-		final TypeInformation<?> inType = TypeInfoParser.parse(in);
-		final TypeInformation<?> outType = TypeInfoParser.parse(out);
-
+		TypeInformation<?> inType, TypeInformation<?> outType, String[] keys) {
 		// expected
 		final Set<Annotation> annotations = FunctionAnnotation.readSingleForwardAnnotations(clazz);
 		SingleInputSemanticProperties expected = SemanticPropUtil.getSemanticPropsSingle(annotations, inType,
@@ -1331,18 +1342,14 @@ public class UdfAnalyzerTest {
 		assertEquals(expected.toString(), actual.toString());
 	}
 
-	public static void compareAnalyzerResultWithAnnotationsDualInput(Class<?> baseClass, Class<?> clazz, String in1,
-			String in2, String out) {
-		compareAnalyzerResultWithAnnotationsDualInputWithKeys(baseClass, clazz, in1, in2, out, null, null);
+	public static void compareAnalyzerResultWithAnnotationsDualInput(Class<?> baseClass, Class<?> clazz,
+		TypeInformation<?> in1Type, TypeInformation<?> in2Type, TypeInformation<?> outType) {
+		compareAnalyzerResultWithAnnotationsDualInputWithKeys(baseClass, clazz, in1Type, in2Type, outType, null, null);
 	}
 
 	@SuppressWarnings({ "rawtypes", "unchecked" })
 	public static void compareAnalyzerResultWithAnnotationsDualInputWithKeys(Class<?> baseClass, Class<?> clazz,
-			String in1, String in2, String out, String[] keys1, String[] keys2) {
-		final TypeInformation<?> in1Type = TypeInfoParser.parse(in1);
-		final TypeInformation<?> in2Type = TypeInfoParser.parse(in2);
-		final TypeInformation<?> outType = TypeInfoParser.parse(out);
-
+		TypeInformation<?> in1Type, TypeInformation<?> in2Type, TypeInformation<?> outType, String[] keys1, String[] keys2) {
 		// expected
 		final Set<Annotation> annotations = FunctionAnnotation.readDualForwardAnnotations(clazz);
 		final DualInputSemanticProperties expected = SemanticPropUtil.getSemanticPropsDual(annotations, in1Type,

http://git-wip-us.apache.org/repos/asf/flink/blob/9ddb978b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/dataset/ChecksumHashCodeTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/dataset/ChecksumHashCodeTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/dataset/ChecksumHashCodeTest.java
index a31ce2e..06f486d 100644
--- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/dataset/ChecksumHashCodeTest.java
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/dataset/ChecksumHashCodeTest.java
@@ -18,8 +18,7 @@
 
 package org.apache.flink.graph.asm.dataset;
 
-import org.apache.flink.api.common.typeinfo.TypeHint;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeinfo.Types;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.graph.asm.dataset.ChecksumHashCode.Checksum;
@@ -62,7 +61,7 @@ public class ChecksumHashCodeTest {
 
 	@Test
 	public void testEmptyList() throws Exception {
-		DataSet<Long> dataset = env.fromCollection(Collections.emptyList(), TypeInformation.of(new TypeHint<Long>(){}));
+		DataSet<Long> dataset = env.fromCollection(Collections.emptyList(), Types.LONG);
 
 		Checksum checksum = new ChecksumHashCode<Long>().run(dataset).execute();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/9ddb978b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/dataset/CollectTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/dataset/CollectTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/dataset/CollectTest.java
index cfeadce..027b809 100644
--- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/dataset/CollectTest.java
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/dataset/CollectTest.java
@@ -18,8 +18,7 @@
 
 package org.apache.flink.graph.asm.dataset;
 
-import org.apache.flink.api.common.typeinfo.TypeHint;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeinfo.Types;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
 
@@ -61,7 +60,7 @@ public class CollectTest {
 
 	@Test
 	public void testEmptyList() throws Exception {
-		DataSet<Long> dataset = env.fromCollection(Collections.emptyList(), TypeInformation.of(new TypeHint<Long>(){}));
+		DataSet<Long> dataset = env.fromCollection(Collections.emptyList(), Types.LONG);
 
 		List<Long> collected = new Collect<Long>().run(dataset).execute();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/9ddb978b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/dataset/CountTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/dataset/CountTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/dataset/CountTest.java
index 0167a5f..dc92c55 100644
--- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/dataset/CountTest.java
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/dataset/CountTest.java
@@ -18,8 +18,7 @@
 
 package org.apache.flink.graph.asm.dataset;
 
-import org.apache.flink.api.common.typeinfo.TypeHint;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeinfo.Types;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
 
@@ -60,7 +59,7 @@ public class CountTest {
 
 	@Test
 	public void testEmptyList() throws Exception {
-		DataSet<Long> dataset = env.fromCollection(Collections.emptyList(), TypeInformation.of(new TypeHint<Long>(){}));
+		DataSet<Long> dataset = env.fromCollection(Collections.emptyList(), Types.LONG);
 
 		long count = new Count<Long>().run(dataset).execute();
 


[4/4] flink git commit: [hotfix] [build] Force delete corrupt jar files from cache

Posted by se...@apache.org.
[hotfix] [build] Force delete corrupt jar files from cache


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/3df78090
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/3df78090
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/3df78090

Branch: refs/heads/master
Commit: 3df7809027b75032aea0aadb53fd53fb13f90754
Parents: 22531a8
Author: Stephan Ewen <se...@apache.org>
Authored: Wed May 16 22:51:02 2018 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Thu May 17 15:12:51 2018 +0200

----------------------------------------------------------------------
 .travis.yml | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/3df78090/.travis.yml
----------------------------------------------------------------------
diff --git a/.travis.yml b/.travis.yml
index 4acdeb0..cad9c87 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -112,7 +112,7 @@ before_install:
    - "export PATH=$M2_HOME/bin:$PATH"
    - "export MAVEN_OPTS=\"-Dorg.slf4j.simpleLogger.showDateTime=true -Dorg.slf4j.simpleLogger.dateTimeFormat=HH:mm:ss.SSS\""
 # just in case: clean up the .m2 home and remove invalid jar files
-   - 'test ! -d $HOME/.m2/repository/ || find $HOME/.m2/repository/ -name "*.jar" -exec sh -c ''if ! zip -T {} >/dev/null ; then echo "deleting invalid file: {}"; rm {} ; fi'' \;'
+   - 'test ! -d $HOME/.m2/repository/ || find $HOME/.m2/repository/ -name "*.jar" -exec sh -c ''if ! zip -T {} >/dev/null ; then echo "deleting invalid file: {}"; rm -f {} ; fi'' \;'
 
 # We run mvn and monitor its output. If there is no output for the specified number of seconds, we
 # print the stack traces of all running Java processes.