You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tw...@apache.org on 2017/12/19 08:33:56 UTC

flink git commit: [FLINK-7452] [types] Add helper methods for all built-in Flink types to Types

Repository: flink
Updated Branches:
  refs/heads/master 7f99a0df6 -> e30066dbd


[FLINK-7452] [types] Add helper methods for all built-in Flink types to Types

This closes #4612.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e30066db
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e30066db
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e30066db

Branch: refs/heads/master
Commit: e30066dbd1ebf3c5780df89d766554042c8345a7
Parents: 7f99a0d
Author: twalthr <tw...@apache.org>
Authored: Mon Aug 28 14:13:07 2017 +0200
Committer: twalthr <tw...@apache.org>
Committed: Tue Dec 19 09:33:29 2017 +0100

----------------------------------------------------------------------
 docs/dev/table/sql.md                           |   6 +-
 docs/dev/table/tableApi.md                      |   6 +-
 .../apache/flink/orc/OrcTableSourceTest.java    |   2 +-
 .../apache/flink/api/common/typeinfo/Types.java | 419 ++++++++++++++++++-
 .../api/java/typeutils/TypeInfoParser.java      |   6 +-
 .../org/apache/flink/table/api/Types.scala      | 141 +++++--
 .../CorrelateStringExpressionTest.scala         |   3 +-
 .../flink/api/scala/typeutils/Types.scala       | 388 +++++++++++++++++
 8 files changed, 907 insertions(+), 64 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/e30066db/docs/dev/table/sql.md
----------------------------------------------------------------------
diff --git a/docs/dev/table/sql.md b/docs/dev/table/sql.md
index e5de70a..04d6e84 100644
--- a/docs/dev/table/sql.md
+++ b/docs/dev/table/sql.md
@@ -821,9 +821,9 @@ The SQL runtime is built on top of Flink's DataSet and DataStream APIs. Internal
 | `Types.FLOAT`          | `REAL, FLOAT`               | `java.lang.Float`      |
 | `Types.DOUBLE`         | `DOUBLE`                    | `java.lang.Double`     |
 | `Types.DECIMAL`        | `DECIMAL`                   | `java.math.BigDecimal` |
-| `Types.DATE`           | `DATE`                      | `java.sql.Date`        |
-| `Types.TIME`           | `TIME`                      | `java.sql.Time`        |
-| `Types.TIMESTAMP`      | `TIMESTAMP(3)`              | `java.sql.Timestamp`   |
+| `Types.SQL_DATE`       | `DATE`                      | `java.sql.Date`        |
+| `Types.SQL_TIME`       | `TIME`                      | `java.sql.Time`        |
+| `Types.SQL_TIMESTAMP`  | `TIMESTAMP(3)`              | `java.sql.Timestamp`   |
 | `Types.INTERVAL_MONTHS`| `INTERVAL YEAR TO MONTH`    | `java.lang.Integer`    |
 | `Types.INTERVAL_MILLIS`| `INTERVAL DAY TO SECOND(3)` | `java.lang.Long`       |
 | `Types.PRIMITIVE_ARRAY`| `ARRAY`                     | e.g. `int[]`           |

http://git-wip-us.apache.org/repos/asf/flink/blob/e30066db/docs/dev/table/tableApi.md
----------------------------------------------------------------------
diff --git a/docs/dev/table/tableApi.md b/docs/dev/table/tableApi.md
index 8b6fa72..1cf2a0c 100644
--- a/docs/dev/table/tableApi.md
+++ b/docs/dev/table/tableApi.md
@@ -1559,9 +1559,9 @@ The Table API is built on top of Flink's DataSet and DataStream APIs. Internally
 | `Types.FLOAT`          | `REAL, FLOAT`               | `java.lang.Float`      |
 | `Types.DOUBLE`         | `DOUBLE`                    | `java.lang.Double`     |
 | `Types.DECIMAL`        | `DECIMAL`                   | `java.math.BigDecimal` |
-| `Types.DATE`           | `DATE`                      | `java.sql.Date`        |
-| `Types.TIME`           | `TIME`                      | `java.sql.Time`        |
-| `Types.TIMESTAMP`      | `TIMESTAMP(3)`              | `java.sql.Timestamp`   |
+| `Types.SQL_DATE`       | `DATE`                      | `java.sql.Date`        |
+| `Types.SQL_TIME`       | `TIME`                      | `java.sql.Time`        |
+| `Types.SQL_TIMESTAMP`  | `TIMESTAMP(3)`              | `java.sql.Timestamp`   |
 | `Types.INTERVAL_MONTHS`| `INTERVAL YEAR TO MONTH`    | `java.lang.Integer`    |
 | `Types.INTERVAL_MILLIS`| `INTERVAL DAY TO SECOND(3)` | `java.lang.Long`       |
 | `Types.PRIMITIVE_ARRAY`| `ARRAY`                     | e.g. `int[]`           |

http://git-wip-us.apache.org/repos/asf/flink/blob/e30066db/flink-connectors/flink-orc/src/test/java/org/apache/flink/orc/OrcTableSourceTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-orc/src/test/java/org/apache/flink/orc/OrcTableSourceTest.java b/flink-connectors/flink-orc/src/test/java/org/apache/flink/orc/OrcTableSourceTest.java
index 4e4be77..f65faf3 100644
--- a/flink-connectors/flink-orc/src/test/java/org/apache/flink/orc/OrcTableSourceTest.java
+++ b/flink-connectors/flink-orc/src/test/java/org/apache/flink/orc/OrcTableSourceTest.java
@@ -108,7 +108,7 @@ public class OrcTableSourceTest {
 		assertTrue(returnType instanceof RowTypeInfo);
 		RowTypeInfo rowType = (RowTypeInfo) returnType;
 
-		RowTypeInfo expected = Types.ROW_NAMED(getNestedFieldNames(), getNestedFieldTypes());
+		TypeInformation<Row> expected = Types.ROW_NAMED(getNestedFieldNames(), getNestedFieldTypes());
 		assertEquals(expected, rowType);
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/e30066db/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/Types.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/Types.java b/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/Types.java
index e19cdd8..9259064 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/Types.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/Types.java
@@ -19,56 +19,431 @@
 package org.apache.flink.api.common.typeinfo;
 
 import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.functions.InvalidTypesException;
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.api.java.typeutils.EitherTypeInfo;
+import org.apache.flink.api.java.typeutils.EnumTypeInfo;
+import org.apache.flink.api.java.typeutils.GenericTypeInfo;
+import org.apache.flink.api.java.typeutils.ListTypeInfo;
+import org.apache.flink.api.java.typeutils.MapTypeInfo;
+import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo;
+import org.apache.flink.api.java.typeutils.PojoField;
+import org.apache.flink.api.java.typeutils.PojoTypeInfo;
 import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.api.java.typeutils.ValueTypeInfo;
+import org.apache.flink.types.Either;
+import org.apache.flink.types.Row;
+import org.apache.flink.types.Value;
 
+import java.lang.reflect.Field;
 import java.math.BigDecimal;
+import java.math.BigInteger;
 import java.sql.Date;
 import java.sql.Time;
 import java.sql.Timestamp;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
 
 /**
- * This class gives access to the type information of the most most common types.
+ * This class gives access to the type information of the most common types for which Flink
+ * has built-in serializers and comparators.
+ *
+ * <p>In many cases, Flink tries to analyze generic signatures of functions to determine return
+ * types automatically. This class is intended for cases where type information has to be
+ * supplied manually or cases where automatic type inference results in an inefficient type.
+ *
+ * <p>Please note that the Scala API and Table API have dedicated Types classes.
+ * (See <code>org.apache.flink.api.scala.typeutils.Types</code> and <code>org.apache.flink.table.api.Types</code>)
+ *
+ * <p>A more convenient alternative might be a {@link TypeHint}.
+ *
+ * @see TypeInformation#of(Class) specify type information based on a class that will be analyzed
+ * @see TypeInformation#of(TypeHint) specify type information based on a {@link TypeHint}
  */
 @PublicEvolving
 public class Types {
 
-	public static final BasicTypeInfo<String> STRING = BasicTypeInfo.STRING_TYPE_INFO;
-	public static final BasicTypeInfo<Boolean> BOOLEAN = BasicTypeInfo.BOOLEAN_TYPE_INFO;
-	public static final BasicTypeInfo<Byte> BYTE = BasicTypeInfo.BYTE_TYPE_INFO;
-	public static final BasicTypeInfo<Short> SHORT = BasicTypeInfo.SHORT_TYPE_INFO;
-	public static final BasicTypeInfo<Integer> INT = BasicTypeInfo.INT_TYPE_INFO;
-	public static final BasicTypeInfo<Long> LONG = BasicTypeInfo.LONG_TYPE_INFO;
-	public static final BasicTypeInfo<Float> FLOAT = BasicTypeInfo.FLOAT_TYPE_INFO;
-	public static final BasicTypeInfo<Double> DOUBLE = BasicTypeInfo.DOUBLE_TYPE_INFO;
-	public static final BasicTypeInfo<BigDecimal> DECIMAL = BasicTypeInfo.BIG_DEC_TYPE_INFO;
+	/**
+	 * Returns type information for {@link java.lang.Void}. Does not support a null value.
+	 */
+	public static final TypeInformation<Void> VOID = BasicTypeInfo.VOID_TYPE_INFO;
+
+	/**
+	 * Returns type information for {@link java.lang.String}. Supports a null value.
+	 */
+	public static final TypeInformation<String> STRING = BasicTypeInfo.STRING_TYPE_INFO;
+
+	/**
+	 * Returns type information for both a primitive <code>byte</code> and {@link java.lang.Byte}.
+	 * Does not support a null value.
+	 */
+	public static final TypeInformation<Byte> BYTE = BasicTypeInfo.BYTE_TYPE_INFO;
+
+	/**
+	 * Returns type information for both a primitive <code>boolean</code> and {@link java.lang.Boolean}.
+	 * Does not support a null value.
+	 */
+	public static final TypeInformation<Boolean> BOOLEAN = BasicTypeInfo.BOOLEAN_TYPE_INFO;
+
+	/**
+	 * Returns type information for both a primitive <code>short</code> and {@link java.lang.Short}.
+	 * Does not support a null value.
+	 */
+	public static final TypeInformation<Short> SHORT = BasicTypeInfo.SHORT_TYPE_INFO;
+
+	/**
+	 * Returns type information for both a primitive <code>int</code> and {@link java.lang.Integer}.
+	 * Does not support a null value.
+	 */
+	public static final TypeInformation<Integer> INT = BasicTypeInfo.INT_TYPE_INFO;
+
+	/**
+	 * Returns type information for both a primitive <code>long</code> and {@link java.lang.Long}.
+	 * Does not support a null value.
+	 */
+	public static final TypeInformation<Long> LONG = BasicTypeInfo.LONG_TYPE_INFO;
+
+	/**
+	 * Returns type information for both a primitive <code>float</code> and {@link java.lang.Float}.
+	 * Does not support a null value.
+	 */
+	public static final TypeInformation<Float> FLOAT = BasicTypeInfo.FLOAT_TYPE_INFO;
+
+	/**
+	 * Returns type information for both a primitive <code>double</code> and {@link java.lang.Double}.
+	 * Does not support a null value.
+	 */
+	public static final TypeInformation<Double> DOUBLE = BasicTypeInfo.DOUBLE_TYPE_INFO;
 
-	public static final SqlTimeTypeInfo<Date> SQL_DATE = SqlTimeTypeInfo.DATE;
-	public static final SqlTimeTypeInfo<Time> SQL_TIME = SqlTimeTypeInfo.TIME;
-	public static final SqlTimeTypeInfo<Timestamp> SQL_TIMESTAMP = SqlTimeTypeInfo.TIMESTAMP;
+	/**
+	 * Returns type information for both a primitive <code>char</code> and {@link java.lang.Character}.
+	 * Does not support a null value.
+	 */
+	public static final TypeInformation<Character> CHAR = BasicTypeInfo.CHAR_TYPE_INFO;
 
 	/**
-	 * Generates a RowTypeInfo with fields of the given types.
-	 * The fields have the default names (f0, f1, f2 ..).
+	 * Returns type information for {@link java.math.BigDecimal}. Supports a null value.
+	 */
+	public static final TypeInformation<BigDecimal> BIG_DEC = BasicTypeInfo.BIG_DEC_TYPE_INFO;
+
+	/**
+	 * Returns type information for {@link java.math.BigInteger}. Supports a null value.
+	 */
+	public static final TypeInformation<BigInteger> BIG_INT = BasicTypeInfo.BIG_INT_TYPE_INFO;
+
+	/**
+	 * Returns type information for {@link java.sql.Date}. Supports a null value.
+	 */
+	public static final TypeInformation<Date> SQL_DATE = SqlTimeTypeInfo.DATE;
+
+	/**
+	 * Returns type information for {@link java.sql.Time}. Supports a null value.
+	 */
+	public static final TypeInformation<Time> SQL_TIME = SqlTimeTypeInfo.TIME;
+
+	/**
+	 * Returns type information for {@link java.sql.Timestamp}. Supports a null value.
+	 */
+	public static final TypeInformation<Timestamp> SQL_TIMESTAMP = SqlTimeTypeInfo.TIMESTAMP;
+
+	/**
+	 * Returns type information for {@link org.apache.flink.types.Row} with fields of the given types.
+	 * A row itself must not be null.
+	 *
+	 * <p>A row is a fixed-length, null-aware composite type for storing multiple values in a
+	 * deterministic field order. Every field can be null regardless of the field's type.
+	 * The type of row fields cannot be automatically inferred; therefore, it is required to provide
+	 * type information whenever a row is used.
+	 *
+	 * <p>The schema of rows can have up to <code>Integer.MAX_VALUE</code> fields, however, all row instances
+	 * must strictly adhere to the schema defined by the type info.
 	 * 
-	 * <p>This method is a shortcut to {@code new RowTypeInfo(types)}.
+	 * <p>This method generates type information with fields of the given types; the fields have
+	 * the default names (f0, f1, f2 ..).
 	 *
 	 * @param types The types of the row fields, e.g., Types.STRING, Types.INT
 	 */
-	public static RowTypeInfo ROW(TypeInformation<?>... types) {
+	public static TypeInformation<Row> ROW(TypeInformation<?>... types) {
 		return new RowTypeInfo(types);
 	}
 
 	/**
-	 * Generates a RowTypeInfo with fields of the given types and with given names.
-	 * 
+	 * Returns type information for {@link org.apache.flink.types.Row} with fields of the given types and
+	 * with given names. A row must not be null.
+	 *
+	 * <p>A row is a fixed-length, null-aware composite type for storing multiple values in a
+	 * deterministic field order. Every field can be null independent of the field's type.
+	 * The type of row fields cannot be automatically inferred; therefore, it is required to provide
+	 * type information whenever a row is used.
+	 *
+	 * <p>The schema of rows can have up to <code>Integer.MAX_VALUE</code> fields, however, all row instances
+	 * must strictly adhere to the schema defined by the type info.
+	 *
 	 * <p>Example use: {@code ROW_NAMED(new String[]{"name", "number"}, Types.STRING, Types.INT)}.
-	 * 
-	 * <p>This method is identical to {@code new RowTypeInfo(types, names)}.
 	 *
 	 * @param fieldNames array of field names
 	 * @param types array of field types
 	 */
-	public static RowTypeInfo ROW_NAMED(String[] fieldNames, TypeInformation<?>... types) {
+	public static TypeInformation<Row> ROW_NAMED(String[] fieldNames, TypeInformation<?>... types) {
 		return new RowTypeInfo(types, fieldNames);
 	}
+
+	/**
+	 * Returns type information for subclasses of Flink's {@link org.apache.flink.api.java.tuple.Tuple}
+	 * (namely {@link org.apache.flink.api.java.tuple.Tuple0} till {@link org.apache.flink.api.java.tuple.Tuple25})
+	 * with fields of the given types. A tuple must not be null.
+	 *
+	 * <p>A tuple is a fixed-length composite type for storing multiple values in a
+	 * deterministic field order. Fields of a tuple are typed. Tuples are the most efficient composite
+	 * type; a tuple does not support null-valued fields unless the type of the field supports nullability.
+	 *
+	 * @param types The types of the tuple fields, e.g., Types.STRING, Types.INT
+	 */
+	public static <T extends Tuple> TypeInformation<T> TUPLE(TypeInformation<?>... types) {
+		return new TupleTypeInfo<>(types);
+	}
+
+	/**
+	 * Returns type information for typed subclasses of Flink's {@link org.apache.flink.api.java.tuple.Tuple}.
+	 * Typed subclasses are classes that extend {@link org.apache.flink.api.java.tuple.Tuple0} till
+	 * {@link org.apache.flink.api.java.tuple.Tuple25} to provide types for all fields and might add
+	 * additional getters and setters for better readability. Additional member fields must not be added.
+	 * A tuple must not be null.
+	 *
+	 * <p>A tuple is a fixed-length composite type for storing multiple values in a
+	 * deterministic field order. Fields of a tuple are typed. Tuples are the most efficient composite
+	 * type; a tuple does not support null-valued fields unless the type of the field supports nullability.
+	 *
+	 * <p>The generic types for all fields of the tuple can be defined in a hierarchy of subclasses.
+	 *
+	 * <p>If Flink's type analyzer is unable to extract a tuple type information with
+	 * type information for all fields, an {@link org.apache.flink.api.common.functions.InvalidTypesException}
+	 * is thrown.
+	 *
+	 * <p>Example use:
+	 * <pre>
+	 * {@code
+	 *   class MyTuple extends Tuple2<Integer, String> {
+	 *
+	 *     public int getId() { return f0; }
+	 *
+	 *     public String getName() { return f1; }
+	 *   }
+	 * }
+	 *
+	 * Types.TUPLE(MyTuple.class)
+	 * </pre>
+	 *
+	 * @param tupleSubclass A subclass of {@link org.apache.flink.api.java.tuple.Tuple0} till
+	 *                      {@link org.apache.flink.api.java.tuple.Tuple25} that defines all field types and
+	 *                      does not add any additional fields
+	 */
+	public static <T extends Tuple> TypeInformation<T> TUPLE(Class<T> tupleSubclass) {
+		final TypeInformation<T> ti = TypeExtractor.createTypeInfo(tupleSubclass);
+		if (ti instanceof TupleTypeInfo) {
+			return ti;
+		}
+		throw new InvalidTypesException("Tuple type expected but was: " + ti);
+	}
+
+	/**
+	 * Returns type information for a POJO (Plain Old Java Object).
+	 *
+	 * <p>A POJO class is public and standalone (no non-static inner class). It has a public no-argument
+	 * constructor. All non-static, non-transient fields in the class (and all superclasses) are either public
+	 * (and non-final) or have a public getter and a setter method that follows the Java beans naming
+	 * conventions for getters and setters.
+	 *
+	 * <p>A POJO is a fixed-length and null-aware composite type. Every field can be null independent
+	 * of the field's type.
+	 *
+	 * <p>The generic types for all fields of the POJO can be defined in a hierarchy of subclasses.
+	 *
+	 * <p>If Flink's type analyzer is unable to extract a valid POJO type information with
+	 * type information for all fields, an {@link org.apache.flink.api.common.functions.InvalidTypesException}
+	 * is thrown. Alternatively, you can use {@link Types#POJO(Class, Map)} to specify all fields manually.
+	 *
+	 * @param pojoClass POJO class to be analyzed by Flink
+	 */
+	public static <T> TypeInformation<T> POJO(Class<T> pojoClass) {
+		final TypeInformation<T> ti = TypeExtractor.createTypeInfo(pojoClass);
+		if (ti instanceof PojoTypeInfo) {
+			return ti;
+		}
+		throw new InvalidTypesException("POJO type expected but was: " + ti);
+	}
+
+	/**
+	 * Returns type information for a POJO (Plain Old Java Object) and allows specifying all fields manually.
+	 *
+	 * <p>A POJO class is public and standalone (no non-static inner class). It has a public no-argument
+	 * constructor. All non-static, non-transient fields in the class (and all superclasses) are either public
+	 * (and non-final) or have a public getter and a setter method that follows the Java beans naming
+	 * conventions for getters and setters.
+	 *
+	 * <p>A POJO is a fixed-length, null-aware composite type with non-deterministic field order. Every field
+	 * can be null independent of the field's type.
+	 *
+	 * <p>The generic types for all fields of the POJO can be defined in a hierarchy of subclasses.
+	 *
+	 * <p>If Flink's type analyzer is unable to extract a POJO field, an
+	 * {@link org.apache.flink.api.common.functions.InvalidTypesException} is thrown.
+	 *
+	 * <p><strong>Note:</strong> In most cases the type information of fields can be determined automatically,
+	 * so we recommend using {@link Types#POJO(Class)} instead.
+	 *
+	 * @param pojoClass POJO class
+	 * @param fields map of fields that map a name to type information. The map key is the name of
+	 *               the field and the value is its type.
+	 */
+	public static <T> TypeInformation<T> POJO(Class<T> pojoClass, Map<String, TypeInformation<?>> fields) {
+		final List<PojoField> pojoFields = new ArrayList<>(fields.size());
+		for (Map.Entry<String, TypeInformation<?>> field : fields.entrySet()) {
+			final Field f = TypeExtractor.getDeclaredField(pojoClass, field.getKey());
+			if (f == null) {
+				throw new InvalidTypesException("Field '" + field.getKey() + "'could not be accessed.");
+			}
+			pojoFields.add(new PojoField(f, field.getValue()));
+		}
+
+		return new PojoTypeInfo<>(pojoClass, pojoFields);
+	}
+
+	/**
+	 * Returns generic type information for any Java object. The serialization logic will
+	 * use the general purpose serializer Kryo.
+	 *
+	 * <p>Generic types are black-boxes for Flink, but allow any object and null values in fields.
+	 *
+	 * <p>By default, serialization of this type is not very efficient. Please read the documentation
+	 * about how to improve efficiency (namely by pre-registering classes).
+	 *
+	 * @param genericClass any Java class
+	 */
+	public static <T> TypeInformation<T> GENERIC(Class<T> genericClass) {
+		return new GenericTypeInfo<>(genericClass);
+	}
+
+	/**
+	 * Returns type information for Java arrays of primitive type (such as <code>byte[]</code>). The array
+	 * must not be null.
+	 *
+	 * @param elementType element type of the array (e.g. Types.BOOLEAN, Types.INT, Types.DOUBLE)
+	 */
+	public static TypeInformation<?> PRIMITIVE_ARRAY(TypeInformation<?> elementType) {
+		if (elementType == BOOLEAN) {
+			return PrimitiveArrayTypeInfo.BOOLEAN_PRIMITIVE_ARRAY_TYPE_INFO;
+		} else if (elementType == BYTE) {
+			return PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO;
+		} else if (elementType == SHORT) {
+			return PrimitiveArrayTypeInfo.SHORT_PRIMITIVE_ARRAY_TYPE_INFO;
+		} else if (elementType == INT) {
+			return PrimitiveArrayTypeInfo.INT_PRIMITIVE_ARRAY_TYPE_INFO;
+		} else if (elementType == LONG) {
+			return PrimitiveArrayTypeInfo.LONG_PRIMITIVE_ARRAY_TYPE_INFO;
+		} else if (elementType == FLOAT) {
+			return PrimitiveArrayTypeInfo.FLOAT_PRIMITIVE_ARRAY_TYPE_INFO;
+		} else if (elementType == DOUBLE) {
+			return PrimitiveArrayTypeInfo.DOUBLE_PRIMITIVE_ARRAY_TYPE_INFO;
+		} else if (elementType == CHAR) {
+			return PrimitiveArrayTypeInfo.CHAR_PRIMITIVE_ARRAY_TYPE_INFO;
+		}
+		throw new IllegalArgumentException("Invalid element type for a primitive array.");
+	}
+
+	/**
+	 * Returns type information for Java arrays of object types (such as <code>String[]</code>,
+	 * <code>Integer[]</code>). The array itself must not be null. Null values for elements are supported.
+	 *
+	 * @param elementType element type of the array
+	 */
+	@SuppressWarnings("unchecked")
+	public static <E> TypeInformation<E[]> OBJECT_ARRAY(TypeInformation<E> elementType) {
+		if (elementType == Types.STRING) {
+			return (TypeInformation) BasicArrayTypeInfo.STRING_ARRAY_TYPE_INFO;
+		}
+		return ObjectArrayTypeInfo.getInfoFor(elementType);
+	}
+
+	/**
+	 * Returns type information for Flink value types (classes that implement
+	 * {@link org.apache.flink.types.Value}). Built-in value types do not support null values (except
+	 * for {@link org.apache.flink.types.StringValue}).
+	 *
+	 * <p>Value types describe their serialization and deserialization manually instead of going
+	 * through a general purpose serialization framework. A value type is reasonable when general purpose
+	 * serialization would be highly inefficient. The wrapped value can be altered, allowing programmers to
+	 * reuse objects and take pressure off the garbage collector.
+	 *
+	 * <p>Flink provides built-in value types for all Java primitive types (such as
+	 * {@link org.apache.flink.types.BooleanValue}, {@link org.apache.flink.types.IntValue}) as well
+	 * as {@link org.apache.flink.types.StringValue}, {@link org.apache.flink.types.NullValue},
+	 * {@link org.apache.flink.types.ListValue}, and {@link org.apache.flink.types.MapValue}.
+	 *
+	 * @param valueType class that implements {@link org.apache.flink.types.Value}
+	 */
+	public static <V extends Value> TypeInformation<V> VALUE(Class<V> valueType) {
+		return new ValueTypeInfo<>(valueType);
+	}
+
+	/**
+	 * Returns type information for a Java {@link java.util.Map}. A map must not be null. Null values
+	 * in keys are not supported. An entry's value can be null.
+	 *
+	 * <p>By default, maps are untyped and treated as a generic type in Flink; therefore, it is useful
+	 * to pass type information whenever a map is used.
+	 *
+	 * <p><strong>Note:</strong> Flink does not preserve the concrete {@link Map} type. It converts a map into {@link HashMap} when
+	 * copying or deserializing.
+	 *
+	 * @param keyType type information for the map's keys
+	 * @param valueType type information for the map's values
+	 */
+	public static <K, V> TypeInformation<Map<K, V>> MAP(TypeInformation<K> keyType, TypeInformation<V> valueType) {
+		return new MapTypeInfo<>(keyType, valueType);
+	}
+
+	/**
+	 * Returns type information for a Java {@link java.util.List}. A list must not be null. Null values
+	 * in elements are not supported.
+	 *
+	 * <p>By default, lists are untyped and treated as a generic type in Flink; therefore, it is useful
+	 * to pass type information whenever a list is used.
+	 *
+	 * <p><strong>Note:</strong> Flink does not preserve the concrete {@link List} type. It converts a list into {@link ArrayList} when
+	 * copying or deserializing.
+	 *
+	 * @param elementType type information for the list's elements
+	 */
+	public static <E> TypeInformation<List<E>> LIST(TypeInformation<E> elementType) {
+		return new ListTypeInfo<>(elementType);
+	}
+
+	/**
+	 * Returns type information for Java enumerations. Null values are not supported.
+	 *
+	 * @param enumType enumeration class extending {@link java.lang.Enum}
+	 */
+	public static <E extends Enum<E>> TypeInformation<E> ENUM(Class<E> enumType) {
+		return new EnumTypeInfo<>(enumType);
+	}
+
+	/**
+	 * Returns type information for Flink's {@link org.apache.flink.types.Either} type. Null values
+	 * are not supported.
+	 *
+	 * <p>Either type can be used for a value of two possible types.
+	 *
+	 * <p>Example use: <code>Types.EITHER(Types.VOID, Types.INT)</code>
+	 *
+	 * @param leftType type information of left side / {@link org.apache.flink.types.Either.Left}
+	 * @param rightType type information of right side / {@link org.apache.flink.types.Either.Right}
+	 */
+	public static <L, R> TypeInformation<Either<L, R>> EITHER(TypeInformation<L> leftType, TypeInformation<R> rightType) {
+		return new EitherTypeInfo<>(leftType, rightType);
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/e30066db/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeInfoParser.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeInfoParser.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeInfoParser.java
index 33820e5..12a9ae0 100644
--- a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeInfoParser.java
+++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeInfoParser.java
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-                                           
+
 package org.apache.flink.api.java.typeutils;
 
 import java.lang.reflect.Field;
@@ -31,6 +31,10 @@ import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.types.Value;
 
+/**
+ * @deprecated Use {@link org.apache.flink.api.common.typeinfo.Types} instead.
+ */
+@Deprecated
 @Public
 public class TypeInfoParser {
 	private static final String TUPLE_PACKAGE = "org.apache.flink.api.java.tuple";

http://git-wip-us.apache.org/repos/asf/flink/blob/e30066db/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/Types.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/Types.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/Types.scala
index 100c22b..4be137d 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/Types.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/Types.scala
@@ -17,6 +17,8 @@
  */
 package org.apache.flink.table.api
 
+import _root_.java.{lang, math, sql, util}
+
 import org.apache.flink.api.common.typeinfo.{PrimitiveArrayTypeInfo, TypeInformation, Types => JTypes}
 import org.apache.flink.api.java.typeutils.{MapTypeInfo, MultisetTypeInfo, ObjectArrayTypeInfo}
 import org.apache.flink.table.typeutils.TimeIntervalTypeInfo
@@ -25,35 +27,95 @@ import org.apache.flink.types.Row
 import _root_.scala.annotation.varargs
 
 /**
-  * This class enumerates all supported types of the Table API.
+  * This class enumerates all supported types of the Table API & SQL.
   */
 object Types {
 
-  val STRING = JTypes.STRING
-  val BOOLEAN = JTypes.BOOLEAN
+  /**
+    * Returns type information for a Table API string or SQL VARCHAR type.
+    */
+  val STRING: TypeInformation[String] = JTypes.STRING
+
+  /**
+    * Returns type information for a Table API boolean or SQL BOOLEAN type.
+    */
+  val BOOLEAN: TypeInformation[lang.Boolean] = JTypes.BOOLEAN
+
+  /**
+    * Returns type information for a Table API byte or SQL TINYINT type.
+    */
+  val BYTE: TypeInformation[lang.Byte] = JTypes.BYTE
+
+  /**
+    * Returns type information for a Table API short or SQL SMALLINT type.
+    */
+  val SHORT: TypeInformation[lang.Short] = JTypes.SHORT
+
+  /**
+    * Returns type information for a Table API integer or SQL INT/INTEGER type.
+    */
+  val INT: TypeInformation[lang.Integer] = JTypes.INT
 
-  val BYTE = JTypes.BYTE
-  val SHORT = JTypes.SHORT
-  val INT = JTypes.INT
-  val LONG = JTypes.LONG
-  val FLOAT = JTypes.FLOAT
-  val DOUBLE = JTypes.DOUBLE
-  val DECIMAL = JTypes.DECIMAL
+  /**
+    * Returns type information for a Table API long or SQL BIGINT type.
+    */
+  val LONG: TypeInformation[lang.Long] = JTypes.LONG
 
-  val SQL_DATE = JTypes.SQL_DATE
-  val SQL_TIME = JTypes.SQL_TIME
-  val SQL_TIMESTAMP = JTypes.SQL_TIMESTAMP
-  val INTERVAL_MONTHS = TimeIntervalTypeInfo.INTERVAL_MONTHS
-  val INTERVAL_MILLIS = TimeIntervalTypeInfo.INTERVAL_MILLIS
+  /**
+    * Returns type information for a Table API float or SQL FLOAT/REAL type.
+    */
+  val FLOAT: TypeInformation[lang.Float] = JTypes.FLOAT
+
+  /**
+    * Returns type information for a Table API double or SQL DOUBLE type.
+    */
+  val DOUBLE: TypeInformation[lang.Double] = JTypes.DOUBLE
 
   /**
-    * Generates row type information.
+    * Returns type information for a Table API big decimal or SQL DECIMAL type.
+    */
+  val DECIMAL: TypeInformation[math.BigDecimal] = JTypes.BIG_DEC
+
+  /**
+    * Returns type information for a Table API SQL date or SQL DATE type.
+    */
+  val SQL_DATE: TypeInformation[sql.Date] = JTypes.SQL_DATE
+
+  /**
+    * Returns type information for a Table API SQL time or SQL TIME type.
+    */
+  val SQL_TIME: TypeInformation[sql.Time] = JTypes.SQL_TIME
+
+  /**
+    * Returns type information for a Table API SQL timestamp or SQL TIMESTAMP type.
+    */
+  val SQL_TIMESTAMP: TypeInformation[sql.Timestamp] = JTypes.SQL_TIMESTAMP
+
+  /**
+    * Returns type information for a Table API interval of months.
+    */
+  val INTERVAL_MONTHS: TypeInformation[lang.Integer] = TimeIntervalTypeInfo.INTERVAL_MONTHS
+
+  /**
+    * Returns type information for a Table API interval of milliseconds.
+    */
+  val INTERVAL_MILLIS: TypeInformation[lang.Long] = TimeIntervalTypeInfo.INTERVAL_MILLIS
+
+  /**
+    * Returns type information for [[org.apache.flink.types.Row]] with fields of the given types.
+    *
+    * A row is a fixed-length, null-aware composite type for storing multiple values in a
+    * deterministic field order. Every field can be null regardless of the field's type.
+    * The type of row fields cannot be automatically inferred; therefore, it is required to provide
+    * type information whenever a row is used.
     *
-    * A row type consists of zero or more fields with a field name and a corresponding type.
+    * <p>The schema of rows can have up to <code>Integer.MAX_VALUE</code> fields, however, all
+    * row instances must strictly adhere to the schema defined by the type info.
     *
-    * The fields have the default names (f0, f1, f2 ..).
+    * This method generates type information with fields of the given types; the fields have
+    * the default names (f0, f1, f2 ..).
     *
-    * @param types types of row fields; e.g. Types.STRING, Types.INT
+    * @param types The types of the row fields, e.g., Types.STRING, Types.INT
     */
   @varargs
   def ROW(types: TypeInformation[_]*): TypeInformation[Row] = {
@@ -61,19 +123,29 @@ object Types {
   }
 
   /**
-    * Generates row type information.
+    * Returns type information for [[org.apache.flink.types.Row]] with fields of the given types
+    * and with given names.
+    *
+    * A row is a variable-length, null-aware composite type for storing multiple values in a
+    * deterministic field order. Every field can be null independent of the field's type.
+    * The type of row fields cannot be automatically inferred; therefore, it is required to provide
+    * type information whenever a row is used.
+    *
+    * <p>The schema of rows can have up to <code>Integer.MAX_VALUE</code> fields, however, all
+    * row instances must strictly adhere to the schema defined by the type info.
     *
-    * A row type consists of zero or more fields with a field name and a corresponding type.
+    * Example use: `Types.ROW(Array("name", "number"), Array(Types.STRING, Types.INT))`.
     *
-    * @param names names of row fields, e.g. "userid", "name"
-    * @param types types of row fields; e.g. Types.STRING, Types.INT
+    * @param fieldNames array of field names
+    * @param types      array of field types
     */
-  def ROW(names: Array[String], types: Array[TypeInformation[_]]): TypeInformation[Row] = {
-    JTypes.ROW_NAMED(names, types: _*)
+  def ROW(fieldNames: Array[String], types: Array[TypeInformation[_]]): TypeInformation[Row] = {
+    JTypes.ROW_NAMED(fieldNames, types: _*)
   }
 
   /**
-    * Generates type information for an array consisting of Java primitive elements.
+    * Generates type information for an array consisting of Java primitive elements. The elements
+    * do not support null values.
     *
     * @param elementType type of the array elements; e.g. Types.INT
     */
@@ -93,30 +165,35 @@ object Types {
   }
 
   /**
-    * Generates type information for an array consisting of Java object elements.
+    * Generates type information for an array consisting of Java object elements. Null values for
+    * elements are supported.
     *
     * @param elementType type of the array elements; e.g. Types.STRING or Types.INT
     */
-  def OBJECT_ARRAY(elementType: TypeInformation[_]): TypeInformation[_] = {
+  def OBJECT_ARRAY[E](elementType: TypeInformation[E]): TypeInformation[Array[E]] = {
     ObjectArrayTypeInfo.getInfoFor(elementType)
   }
 
   /**
-    * Generates type information for a Java HashMap.
+    * Generates type information for a Java HashMap. Null values in keys are not supported. An
+    * entry's value can be null.
     *
     * @param keyType type of the keys of the map e.g. Types.STRING
     * @param valueType type of the values of the map e.g. Types.STRING
     */
-  def MAP(keyType: TypeInformation[_], valueType: TypeInformation[_]): TypeInformation[_] = {
+  def MAP[K, V](
+      keyType: TypeInformation[K],
+      valueType: TypeInformation[V]): TypeInformation[util.Map[K, V]] = {
     new MapTypeInfo(keyType, valueType)
   }
 
   /**
-    * Generates type information for a Multiset.
+    * Generates type information for a Multiset. A Multiset is backed by a Java HashMap and maps an
+    * arbitrary key to an integer value. Null values in keys are not supported.
     *
     * @param elementType type of the elements of the multiset e.g. Types.STRING
     */
-  def MULTISET(elementType: TypeInformation[_]): TypeInformation[_] = {
+  def MULTISET[E](elementType: TypeInformation[E]): TypeInformation[util.Map[E, lang.Integer]] = {
     new MultisetTypeInfo(elementType)
   }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/e30066db/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/stringexpr/CorrelateStringExpressionTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/stringexpr/CorrelateStringExpressionTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/stringexpr/CorrelateStringExpressionTest.scala
index 0d12400..110ea6a 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/stringexpr/CorrelateStringExpressionTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/stringexpr/CorrelateStringExpressionTest.scala
@@ -20,9 +20,8 @@ package org.apache.flink.table.api.stream.table.stringexpr
 
 import org.apache.flink.api.java.typeutils.RowTypeInfo
 import org.apache.flink.api.scala._
-import org.apache.flink.table.api.scala._
 import org.apache.flink.table.api._
-import org.apache.flink.table.runtime.utils._
+import org.apache.flink.table.api.scala._
 import org.apache.flink.table.utils._
 import org.apache.flink.types.Row
 import org.junit.Test

http://git-wip-us.apache.org/repos/asf/flink/blob/e30066db/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/Types.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/Types.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/Types.scala
new file mode 100644
index 0000000..4ce9b0f
--- /dev/null
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/Types.scala
@@ -0,0 +1,388 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.scala.typeutils
+
+import org.apache.flink.annotation.PublicEvolving
+import org.apache.flink.api.common.functions.InvalidTypesException
+import org.apache.flink.api.common.typeinfo.{TypeInformation, Types => JTypes}
+import org.apache.flink.types.Row
+
+import _root_.scala.collection.JavaConverters._
+import _root_.scala.util.{Either, Try}
+
+/**
+  * This class gives access to the type information of the most common Scala types for which Flink
+  * has built-in serializers and comparators.
+  *
+  * This class contains types of [[org.apache.flink.api.common.typeinfo.Types]] and adds
+  * types for Scala specific classes (such as [[Unit]] or case classes).
+  *
+  * In many cases, Flink tries to analyze generic signatures of functions to determine return
+  * types automatically. This class is intended for cases where type information has to be
+  * supplied manually or cases where automatic type inference results in an inefficient type.
+  *
+  * Scala macros can determine the type information of classes and type parameters. You can
+  * use [[Types.of]] to let type information be determined automatically.
+  */
+@PublicEvolving
+object Types {
+
+  /**
+    * Generates type information based on the given class and/or its type parameters.
+    *
+    * The definition is similar to a [[org.apache.flink.api.common.typeinfo.TypeHint]] but does
+    * not require implementing anonymous classes.
+    *
+    * If the class could not be analyzed by the Scala type analyzer, the Java analyzer
+    * will be used.
+    *
+    * Example use:
+    *
+    * `Types.of[(Int, String, String)]` for Scala tuples
+    * `Types.of[Unit]` for Scala specific types
+    *
+    * @tparam T class to be analyzed
+    */
+  def of[T: TypeInformation]: TypeInformation[T] = {
+    val typeInfo: TypeInformation[T] = implicitly[TypeInformation[T]]
+    typeInfo
+  }
+
+  /**
+    * Returns type information for Scala [[Nothing]]. Does not support a null value.
+    */
+  val NOTHING: TypeInformation[Nothing] = new ScalaNothingTypeInfo
+
+  /**
+    * Returns type information for Scala [[Unit]]. Does not support a null value.
+    */
+  val UNIT: TypeInformation[Unit] = new UnitTypeInfo
+
+  /**
+    * Returns type information for [[String]] and [[java.lang.String]]. Supports a null value.
+    */
+  val STRING: TypeInformation[String] = JTypes.STRING
+
+  /**
+    * Returns type information for primitive [[Byte]] and [[java.lang.Byte]]. Does not
+    * support a null value.
+    */
+  val BYTE: TypeInformation[java.lang.Byte] = JTypes.BYTE
+
+  /**
+    * Returns type information for primitive [[Boolean]] and [[java.lang.Boolean]]. Does not
+    * support a null value.
+    */
+  val BOOLEAN: TypeInformation[java.lang.Boolean] = JTypes.BOOLEAN
+
+  /**
+    * Returns type information for primitive [[Short]] and [[java.lang.Short]]. Does not
+    * support a null value.
+    */
+  val SHORT: TypeInformation[java.lang.Short] = JTypes.SHORT
+
+  /**
+    * Returns type information for primitive [[Int]] and [[java.lang.Integer]]. Does not
+    * support a null value.
+    */
+  val INT: TypeInformation[java.lang.Integer] = JTypes.INT
+
+  /**
+    * Returns type information for primitive [[Long]] and [[java.lang.Long]]. Does not
+    * support a null value.
+    */
+  val LONG: TypeInformation[java.lang.Long] = JTypes.LONG
+
+  /**
+    * Returns type information for primitive [[Float]] and [[java.lang.Float]]. Does not
+    * support a null value.
+    */
+  val FLOAT: TypeInformation[java.lang.Float] = JTypes.FLOAT
+
+  /**
+    * Returns type information for primitive [[Double]] and [[java.lang.Double]]. Does not
+    * support a null value.
+    */
+  val DOUBLE: TypeInformation[java.lang.Double] = JTypes.DOUBLE
+
+  /**
+    * Returns type information for primitive [[Char]] and [[java.lang.Character]]. Does not
+    * support a null value.
+    */
+  val CHAR: TypeInformation[java.lang.Character] = JTypes.CHAR
+
+  /**
+    * Returns type information for Java [[java.math.BigDecimal]]. Supports a null value.
+    *
+    * Note that Scala [[BigDecimal]] is not supported yet.
+    */
+  val JAVA_BIG_DEC: TypeInformation[java.math.BigDecimal] = JTypes.BIG_DEC
+
+  /**
+    * Returns type information for Java [[java.math.BigInteger]]. Supports a null value.
+    *
+    * Note that Scala [[BigInt]] is not supported yet.
+    */
+  val JAVA_BIG_INT: TypeInformation[java.math.BigInteger] = JTypes.BIG_INT
+
+  /**
+    * Returns type information for [[java.sql.Date]]. Supports a null value.
+    */
+  val SQL_DATE: TypeInformation[java.sql.Date] = JTypes.SQL_DATE
+
+  /**
+    * Returns type information for [[java.sql.Time]]. Supports a null value.
+    */
+  val SQL_TIME: TypeInformation[java.sql.Time] = JTypes.SQL_TIME
+
+  /**
+    * Returns type information for [[java.sql.Timestamp]]. Supports a null value.
+    */
+  val SQL_TIMESTAMP: TypeInformation[java.sql.Timestamp] = JTypes.SQL_TIMESTAMP
+
+  /**
+    * Returns type information for [[org.apache.flink.types.Row]] with fields of the given types.
+    * A row itself must not be null.
+    *
+    * A row is a fixed-length, null-aware composite type for storing multiple values in a
+    * deterministic field order. Every field can be null regardless of the field's type.
+    * The type of row fields cannot be automatically inferred; therefore, it is required to provide
+    * type information whenever a row is used.
+    *
+    * <p>The schema of rows can have up to <code>Integer.MAX_VALUE</code> fields, however, all row
+    * instances must strictly adhere to the schema defined by the type info.
+    *
+    * This method generates type information with fields of the given types; the fields have
+    * the default names (f0, f1, f2 ..).
+    *
+    * @param types The types of the row fields, e.g., Types.STRING, Types.INT
+    */
+  def ROW(types: TypeInformation[_]*): TypeInformation[Row] = JTypes.ROW(types: _*)
+
+  /**
+    * Returns type information for [[org.apache.flink.types.Row]] with fields of the given types
+    * and with given names. A row must not be null.
+    *
+    * A row is a fixed-length, null-aware composite type for storing multiple values in a
+    * deterministic field order. Every field can be null independent of the field's type.
+    * The type of row fields cannot be automatically inferred; therefore, it is required to provide
+    * type information whenever a row is used.
+    *
+    * <p>The schema of rows can have up to <code>Integer.MAX_VALUE</code> fields, however, all row
+    * instances must strictly adhere to the schema defined by the type info.
+    *
+    * Example use: `Types.ROW(Array("name", "number"), Array(Types.STRING, Types.INT))`.
+    *
+    * @param fieldNames array of field names
+    * @param types      array of field types
+    */
+  def ROW(fieldNames: Array[String], types: Array[TypeInformation[_]]): TypeInformation[Row] =
+    JTypes.ROW_NAMED(fieldNames, types: _*)
+
+  /**
+    * Returns type information for a POJO (Plain Old Java Object).
+    *
+    * A POJO class is public and standalone (no non-static inner class). It has a public
+    * no-argument constructor. All non-static, non-transient fields in the class (and all
+    * superclasses) are either public (and non-final) or have a public getter and a setter
+    * method that follows the Java beans naming conventions for getters and setters.
+    *
+    * A POJO is a fixed-length, null-aware composite type with non-deterministic field order.
+    * Every field can be null independent of the field's type.
+    *
+    * The generic types for all fields of the POJO can be defined in a hierarchy of subclasses.
+    *
+    * If Flink's type analyzer is unable to extract a valid POJO type information with
+    * type information for all fields, an
+    * [[org.apache.flink.api.common.functions.InvalidTypesException]] is thrown.
+    * Alternatively, you can use [[Types.POJO(Class, Map)]] to specify all fields manually.
+    *
+    * @param pojoClass POJO class to be analyzed by Flink
+    */
+  def POJO[T](pojoClass: Class[T]): TypeInformation[T] = {
+    JTypes.POJO(pojoClass)
+  }
+
+  /**
+    * Returns type information for a POJO (Plain Old Java Object) and allows to specify all
+    * fields manually.
+    *
+    * A POJO class is public and standalone (no non-static inner class). It has a public no-argument
+    * constructor. All non-static, non-transient fields in the class (and all superclasses) are
+    * either public (and non-final) or have a public getter and a setter method that follows the
+    * Java beans naming conventions for getters and setters.
+    *
+    * A POJO is a fixed-length, null-aware composite type with non-deterministic field order.
+    * Every field can be null independent of the field's type.
+    *
+    * The generic types for all fields of the POJO can be defined in a hierarchy of subclasses.
+    *
+    * If Flink's type analyzer is unable to extract a POJO field, an
+    * [[org.apache.flink.api.common.functions.InvalidTypesException]] is thrown.
+    *
+    * '''Note:''' In most cases the type information of fields can be determined automatically;
+    * we recommend using [[Types.POJO(Class)]].
+    *
+    * @param pojoClass POJO class
+    * @param fields    map of fields that map a name to type information. The map key is the name of
+    *                  the field and the value is its type.
+    */
+  def POJO[T](pojoClass: Class[T], fields: Map[String, TypeInformation[_]]): TypeInformation[T] = {
+    JTypes.POJO(pojoClass, fields.asJava)
+  }
+
+  /**
+    * Returns generic type information for any Scala/Java object. The serialization logic will
+    * use the general purpose serializer Kryo.
+    *
+    * Generic types are black-boxes for Flink, but allow any object and null values in fields.
+    *
+    * By default, serialization of this type is not very efficient. Please read the documentation
+    * about how to improve efficiency (namely by pre-registering classes).
+    *
+    * @param genericClass any Scala/Java class
+    */
+  def GENERIC[T](genericClass: Class[T]): TypeInformation[T] = JTypes.GENERIC(genericClass)
+
+  /**
+    * Returns type information for a Scala case class and Scala tuples.
+    *
+    * A Scala case class is a fixed-length composite type for storing multiple values in a
+    * deterministic field order. Fields of a case class are typed. Case classes and tuples are
+    * the most efficient composite type; therefore, they do not support null-valued fields
+    * unless the type of the field supports nullability.
+    *
+    * Example use: `Types.CASE_CLASS[MyCaseClass]`
+    *
+    * @tparam T case class to be analyzed
+    */
+  def CASE_CLASS[T: TypeInformation]: TypeInformation[T] = {
+    val t = Types.of[T]
+    if (t.isInstanceOf[CaseClassTypeInfo[_]]) {
+      t
+    } else {
+      throw new InvalidTypesException("Case class type expected but was: " + t)
+    }
+  }
+
+  /**
+    * Returns type information for a Scala tuple.
+    *
+    * A Scala tuple is a fixed-length composite type for storing multiple values in a
+    * deterministic field order. Fields of a tuple are typed. Tuples are
+    * the most efficient composite type; therefore, they do not support null-valued fields
+    * unless the type of the field supports nullability.
+    *
+    * Example use: `Types.TUPLE[(String, Int)]`
+    *
+    * @tparam T tuple to be analyzed
+    */
+  def TUPLE[T: TypeInformation]: TypeInformation[T] = {
+    CASE_CLASS[T]
+  }
+
+  /**
+    * Returns type information for Scala/Java arrays of primitive type (such as `Array[Byte]`).
+    * The array and its elements do not support null values.
+    *
+    * @param elementType element type of the array (e.g. Types.BOOLEAN, Types.INT, Types.DOUBLE)
+    */
+  def PRIMITIVE_ARRAY(elementType: TypeInformation[_]): TypeInformation[_] = {
+    JTypes.PRIMITIVE_ARRAY(elementType)
+  }
+
+  /**
+    * Returns type information for Scala/Java arrays of object types (such as `Array[String]`,
+    * `Array[java.lang.Integer]`). The array itself must not be null. Null values for elements
+    * are supported.
+    *
+    * @param elementType element type of the array
+    */
+  def OBJECT_ARRAY[E <: AnyRef](elementType: TypeInformation[E]): TypeInformation[Array[E]] = {
+    // necessary for the Scala compiler
+    JTypes.OBJECT_ARRAY(elementType).asInstanceOf[TypeInformation[Array[E]]]
+  }
+
+  /**
+    * Returns type information for Scala [[Either]] type. Null values are not supported.
+    *
+    * The either type can be used for a value of two possible types.
+    *
+    * Example use: `Types.EITHER(Types.INT, Types.NOTHING)`
+    *
+    * @param leftType type information of left side / [[Left]]
+    * @param rightType type information of right side / [[Right]]
+    */
+  def EITHER[A, B](
+      leftType: TypeInformation[A],
+      rightType: TypeInformation[B]): TypeInformation[Either[A, B]] = {
+    new EitherTypeInfo(classOf[Either[A, B]], leftType, rightType)
+  }
+
+  /**
+    * Returns type information for Scala [[Option]] type. Null values are not supported.
+    *
+    * The option type can be used for distinguishing between a value and no value.
+    *
+    * Example use: `Types.OPTION(Types.INT)`
+    *
+    * @param valueType type information of the option's value
+    */
+  def OPTION[A, T <: Option[A]](valueType: TypeInformation[A]): TypeInformation[T] = {
+    new OptionTypeInfo(valueType)
+  }
+
+  /**
+    * Returns type information for Scala [[Try]] type. Null values are not supported.
+    *
+    * The try type can be used for distinguishing between a value and a throwable.
+    *
+    * Example use: `Types.TRY(Types.INT)`
+    *
+    * @param valueType type information of the try's value
+    */
+  def TRY[A, T <: Try[A]](valueType: TypeInformation[A]): TypeInformation[T] = {
+    new TryTypeInfo(valueType)
+  }
+
+  /**
+    * Returns type information for Scala enumerations. Null values are not supported.
+    *
+    * @param enum enumeration object
+    * @param valueClass value class
+    */
+  def ENUMERATION[E <: Enumeration](
+      enum: E,
+      valueClass: Class[E#Value]): TypeInformation[E#Value] = {
+    new EnumValueTypeInfo(enum, valueClass)
+  }
+
+  /**
+    * Returns type information for Scala collections that implement [[Traversable]]. Null values
+    * are not supported.
+    */
+  def TRAVERSABLE[T: TypeInformation]: TypeInformation[T] = {
+    val t = Types.of[T]
+    if (t.isInstanceOf[TraversableTypeInfo[_, _]]) {
+      t
+    } else {
+      throw new InvalidTypesException("Traversable type expected but was: " + t)
+    }
+  }
+}