You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tw...@apache.org on 2017/12/19 08:33:56 UTC
flink git commit: [FLINK-7452] [types] Add helper methods for all
built-in Flink types to Types
Repository: flink
Updated Branches:
refs/heads/master 7f99a0df6 -> e30066dbd
[FLINK-7452] [types] Add helper methods for all built-in Flink types to Types
This closes #4612.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e30066db
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e30066db
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e30066db
Branch: refs/heads/master
Commit: e30066dbd1ebf3c5780df89d766554042c8345a7
Parents: 7f99a0d
Author: twalthr <tw...@apache.org>
Authored: Mon Aug 28 14:13:07 2017 +0200
Committer: twalthr <tw...@apache.org>
Committed: Tue Dec 19 09:33:29 2017 +0100
----------------------------------------------------------------------
docs/dev/table/sql.md | 6 +-
docs/dev/table/tableApi.md | 6 +-
.../apache/flink/orc/OrcTableSourceTest.java | 2 +-
.../apache/flink/api/common/typeinfo/Types.java | 419 ++++++++++++++++++-
.../api/java/typeutils/TypeInfoParser.java | 6 +-
.../org/apache/flink/table/api/Types.scala | 141 +++++--
.../CorrelateStringExpressionTest.scala | 3 +-
.../flink/api/scala/typeutils/Types.scala | 388 +++++++++++++++++
8 files changed, 907 insertions(+), 64 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/e30066db/docs/dev/table/sql.md
----------------------------------------------------------------------
diff --git a/docs/dev/table/sql.md b/docs/dev/table/sql.md
index e5de70a..04d6e84 100644
--- a/docs/dev/table/sql.md
+++ b/docs/dev/table/sql.md
@@ -821,9 +821,9 @@ The SQL runtime is built on top of Flink's DataSet and DataStream APIs. Internal
| `Types.FLOAT` | `REAL, FLOAT` | `java.lang.Float` |
| `Types.DOUBLE` | `DOUBLE` | `java.lang.Double` |
| `Types.DECIMAL` | `DECIMAL` | `java.math.BigDecimal` |
-| `Types.DATE` | `DATE` | `java.sql.Date` |
-| `Types.TIME` | `TIME` | `java.sql.Time` |
-| `Types.TIMESTAMP` | `TIMESTAMP(3)` | `java.sql.Timestamp` |
+| `Types.SQL_DATE` | `DATE` | `java.sql.Date` |
+| `Types.SQL_TIME` | `TIME` | `java.sql.Time` |
+| `Types.SQL_TIMESTAMP` | `TIMESTAMP(3)` | `java.sql.Timestamp` |
| `Types.INTERVAL_MONTHS`| `INTERVAL YEAR TO MONTH` | `java.lang.Integer` |
| `Types.INTERVAL_MILLIS`| `INTERVAL DAY TO SECOND(3)` | `java.lang.Long` |
| `Types.PRIMITIVE_ARRAY`| `ARRAY` | e.g. `int[]` |
http://git-wip-us.apache.org/repos/asf/flink/blob/e30066db/docs/dev/table/tableApi.md
----------------------------------------------------------------------
diff --git a/docs/dev/table/tableApi.md b/docs/dev/table/tableApi.md
index 8b6fa72..1cf2a0c 100644
--- a/docs/dev/table/tableApi.md
+++ b/docs/dev/table/tableApi.md
@@ -1559,9 +1559,9 @@ The Table API is built on top of Flink's DataSet and DataStream APIs. Internally
| `Types.FLOAT` | `REAL, FLOAT` | `java.lang.Float` |
| `Types.DOUBLE` | `DOUBLE` | `java.lang.Double` |
| `Types.DECIMAL` | `DECIMAL` | `java.math.BigDecimal` |
-| `Types.DATE` | `DATE` | `java.sql.Date` |
-| `Types.TIME` | `TIME` | `java.sql.Time` |
-| `Types.TIMESTAMP` | `TIMESTAMP(3)` | `java.sql.Timestamp` |
+| `Types.SQL_DATE` | `DATE` | `java.sql.Date` |
+| `Types.SQL_TIME` | `TIME` | `java.sql.Time` |
+| `Types.SQL_TIMESTAMP` | `TIMESTAMP(3)` | `java.sql.Timestamp` |
| `Types.INTERVAL_MONTHS`| `INTERVAL YEAR TO MONTH` | `java.lang.Integer` |
| `Types.INTERVAL_MILLIS`| `INTERVAL DAY TO SECOND(3)` | `java.lang.Long` |
| `Types.PRIMITIVE_ARRAY`| `ARRAY` | e.g. `int[]` |
http://git-wip-us.apache.org/repos/asf/flink/blob/e30066db/flink-connectors/flink-orc/src/test/java/org/apache/flink/orc/OrcTableSourceTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-orc/src/test/java/org/apache/flink/orc/OrcTableSourceTest.java b/flink-connectors/flink-orc/src/test/java/org/apache/flink/orc/OrcTableSourceTest.java
index 4e4be77..f65faf3 100644
--- a/flink-connectors/flink-orc/src/test/java/org/apache/flink/orc/OrcTableSourceTest.java
+++ b/flink-connectors/flink-orc/src/test/java/org/apache/flink/orc/OrcTableSourceTest.java
@@ -108,7 +108,7 @@ public class OrcTableSourceTest {
assertTrue(returnType instanceof RowTypeInfo);
RowTypeInfo rowType = (RowTypeInfo) returnType;
- RowTypeInfo expected = Types.ROW_NAMED(getNestedFieldNames(), getNestedFieldTypes());
+ TypeInformation<Row> expected = Types.ROW_NAMED(getNestedFieldNames(), getNestedFieldTypes());
assertEquals(expected, rowType);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/e30066db/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/Types.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/Types.java b/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/Types.java
index e19cdd8..9259064 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/Types.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/Types.java
@@ -19,56 +19,431 @@
package org.apache.flink.api.common.typeinfo;
import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.functions.InvalidTypesException;
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.api.java.typeutils.EitherTypeInfo;
+import org.apache.flink.api.java.typeutils.EnumTypeInfo;
+import org.apache.flink.api.java.typeutils.GenericTypeInfo;
+import org.apache.flink.api.java.typeutils.ListTypeInfo;
+import org.apache.flink.api.java.typeutils.MapTypeInfo;
+import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo;
+import org.apache.flink.api.java.typeutils.PojoField;
+import org.apache.flink.api.java.typeutils.PojoTypeInfo;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.api.java.typeutils.ValueTypeInfo;
+import org.apache.flink.types.Either;
+import org.apache.flink.types.Row;
+import org.apache.flink.types.Value;
+import java.lang.reflect.Field;
import java.math.BigDecimal;
+import java.math.BigInteger;
import java.sql.Date;
import java.sql.Time;
import java.sql.Timestamp;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
/**
- * This class gives access to the type information of the most most common types.
+ * This class gives access to the type information of the most common types for which Flink
+ * has built-in serializers and comparators.
+ *
+ * <p>In many cases, Flink tries to analyze generic signatures of functions to determine return
+ * types automatically. This class is intended for cases where type information has to be
+ * supplied manually or cases where automatic type inference results in an inefficient type.
+ *
+ * <p>Please note that the Scala API and Table API have dedicated Types classes.
+ * (See <code>org.apache.flink.api.scala.Types</code> and <code>org.apache.flink.table.api.Types</code>)
+ *
+ * <p>A more convenient alternative might be a {@link TypeHint}.
+ *
+ * @see TypeInformation#of(Class) specify type information based on a class that will be analyzed
+ * @see TypeInformation#of(TypeHint) specify type information based on a {@link TypeHint}
*/
@PublicEvolving
public class Types {
- public static final BasicTypeInfo<String> STRING = BasicTypeInfo.STRING_TYPE_INFO;
- public static final BasicTypeInfo<Boolean> BOOLEAN = BasicTypeInfo.BOOLEAN_TYPE_INFO;
- public static final BasicTypeInfo<Byte> BYTE = BasicTypeInfo.BYTE_TYPE_INFO;
- public static final BasicTypeInfo<Short> SHORT = BasicTypeInfo.SHORT_TYPE_INFO;
- public static final BasicTypeInfo<Integer> INT = BasicTypeInfo.INT_TYPE_INFO;
- public static final BasicTypeInfo<Long> LONG = BasicTypeInfo.LONG_TYPE_INFO;
- public static final BasicTypeInfo<Float> FLOAT = BasicTypeInfo.FLOAT_TYPE_INFO;
- public static final BasicTypeInfo<Double> DOUBLE = BasicTypeInfo.DOUBLE_TYPE_INFO;
- public static final BasicTypeInfo<BigDecimal> DECIMAL = BasicTypeInfo.BIG_DEC_TYPE_INFO;
+ /**
+ * Returns type information for {@link java.lang.Void}. Does not support a null value.
+ */
+ public static final TypeInformation<Void> VOID = BasicTypeInfo.VOID_TYPE_INFO;
+
+ /**
+ * Returns type information for {@link java.lang.String}. Supports a null value.
+ */
+ public static final TypeInformation<String> STRING = BasicTypeInfo.STRING_TYPE_INFO;
+
+ /**
+ * Returns type information for both a primitive <code>byte</code> and {@link java.lang.Byte}.
+ * Does not support a null value.
+ */
+ public static final TypeInformation<Byte> BYTE = BasicTypeInfo.BYTE_TYPE_INFO;
+
+ /**
+ * Returns type information for both a primitive <code>boolean</code> and {@link java.lang.Boolean}.
+ * Does not support a null value.
+ */
+ public static final TypeInformation<Boolean> BOOLEAN = BasicTypeInfo.BOOLEAN_TYPE_INFO;
+
+ /**
+ * Returns type information for both a primitive <code>short</code> and {@link java.lang.Short}.
+ * Does not support a null value.
+ */
+ public static final TypeInformation<Short> SHORT = BasicTypeInfo.SHORT_TYPE_INFO;
+
+ /**
+ * Returns type information for both a primitive <code>int</code> and {@link java.lang.Integer}.
+ * Does not support a null value.
+ */
+ public static final TypeInformation<Integer> INT = BasicTypeInfo.INT_TYPE_INFO;
+
+ /**
+ * Returns type information for both a primitive <code>long</code> and {@link java.lang.Long}.
+ * Does not support a null value.
+ */
+ public static final TypeInformation<Long> LONG = BasicTypeInfo.LONG_TYPE_INFO;
+
+ /**
+ * Returns type information for both a primitive <code>float</code> and {@link java.lang.Float}.
+ * Does not support a null value.
+ */
+ public static final TypeInformation<Float> FLOAT = BasicTypeInfo.FLOAT_TYPE_INFO;
+
+ /**
+ * Returns type information for both a primitive <code>double</code> and {@link java.lang.Double}.
+ * Does not support a null value.
+ */
+ public static final TypeInformation<Double> DOUBLE = BasicTypeInfo.DOUBLE_TYPE_INFO;
- public static final SqlTimeTypeInfo<Date> SQL_DATE = SqlTimeTypeInfo.DATE;
- public static final SqlTimeTypeInfo<Time> SQL_TIME = SqlTimeTypeInfo.TIME;
- public static final SqlTimeTypeInfo<Timestamp> SQL_TIMESTAMP = SqlTimeTypeInfo.TIMESTAMP;
+ /**
+ * Returns type information for both a primitive <code>char</code> and {@link java.lang.Character}.
+ * Does not support a null value.
+ */
+ public static final TypeInformation<Character> CHAR = BasicTypeInfo.CHAR_TYPE_INFO;
/**
- * Generates a RowTypeInfo with fields of the given types.
- * The fields have the default names (f0, f1, f2 ..).
+ * Returns type information for {@link java.math.BigDecimal}. Supports a null value.
+ */
+ public static final TypeInformation<BigDecimal> BIG_DEC = BasicTypeInfo.BIG_DEC_TYPE_INFO;
+
+ /**
+ * Returns type information for {@link java.math.BigInteger}. Supports a null value.
+ */
+ public static final TypeInformation<BigInteger> BIG_INT = BasicTypeInfo.BIG_INT_TYPE_INFO;
+
+ /**
+ * Returns type information for {@link java.sql.Date}. Supports a null value.
+ */
+ public static final TypeInformation<Date> SQL_DATE = SqlTimeTypeInfo.DATE;
+
+ /**
+ * Returns type information for {@link java.sql.Time}. Supports a null value.
+ */
+ public static final TypeInformation<Time> SQL_TIME = SqlTimeTypeInfo.TIME;
+
+ /**
+ * Returns type information for {@link java.sql.Timestamp}. Supports a null value.
+ */
+ public static final TypeInformation<Timestamp> SQL_TIMESTAMP = SqlTimeTypeInfo.TIMESTAMP;
+
+ /**
+ * Returns type information for {@link org.apache.flink.types.Row} with fields of the given types.
+ * A row itself must not be null.
+ *
+ * <p>A row is a fixed-length, null-aware composite type for storing multiple values in a
+ * deterministic field order. Every field can be null regardless of the field's type.
+ * The type of row fields cannot be automatically inferred; therefore, it is required to provide
+ * type information whenever a row is used.
+ *
+ * <p>The schema of rows can have up to <code>Integer.MAX_VALUE</code> fields, however, all row instances
+ * must strictly adhere to the schema defined by the type info.
*
- * <p>This method is a shortcut to {@code new RowTypeInfo(types)}.
+ * <p>This method generates type information with fields of the given types; the fields have
+ * the default names (f0, f1, f2 ..).
*
* @param types The types of the row fields, e.g., Types.STRING, Types.INT
*/
- public static RowTypeInfo ROW(TypeInformation<?>... types) {
+ public static TypeInformation<Row> ROW(TypeInformation<?>... types) {
return new RowTypeInfo(types);
}
/**
- * Generates a RowTypeInfo with fields of the given types and with given names.
- *
+ * Returns type information for {@link org.apache.flink.types.Row} with fields of the given types and
+ * with given names. A row must not be null.
+ *
+ * <p>A row is a fixed-length, null-aware composite type for storing multiple values in a
+ * deterministic field order. Every field can be null independent of the field's type.
+ * The type of row fields cannot be automatically inferred; therefore, it is required to provide
+ * type information whenever a row is used.
+ *
+ * <p>The schema of rows can have up to <code>Integer.MAX_VALUE</code> fields, however, all row instances
+ * must strictly adhere to the schema defined by the type info.
+ *
* <p>Example use: {@code ROW_NAMED(new String[]{"name", "number"}, Types.STRING, Types.INT)}.
- *
- * <p>This method is identical to {@code new RowTypeInfo(types, names)}.
*
* @param fieldNames array of field names
* @param types array of field types
*/
- public static RowTypeInfo ROW_NAMED(String[] fieldNames, TypeInformation<?>... types) {
+ public static TypeInformation<Row> ROW_NAMED(String[] fieldNames, TypeInformation<?>... types) {
return new RowTypeInfo(types, fieldNames);
}
+
+ /**
+ * Returns type information for subclasses of Flink's {@link org.apache.flink.api.java.tuple.Tuple}
+ * (namely {@link org.apache.flink.api.java.tuple.Tuple0} till {@link org.apache.flink.api.java.tuple.Tuple25})
+ * with fields of the given types. A tuple must not be null.
+ *
+ * <p>A tuple is a fixed-length composite type for storing multiple values in a
+ * deterministic field order. Fields of a tuple are typed. Tuples are the most efficient composite
+ * type; a tuple does not support null-valued fields unless the type of the field supports nullability.
+ *
+ * @param types The types of the tuple fields, e.g., Types.STRING, Types.INT
+ */
+ public static <T extends Tuple> TypeInformation<T> TUPLE(TypeInformation<?>... types) {
+ return new TupleTypeInfo<>(types);
+ }
+
+ /**
+ * Returns type information for typed subclasses of Flink's {@link org.apache.flink.api.java.tuple.Tuple}.
+ * Typed subclasses are classes that extend {@link org.apache.flink.api.java.tuple.Tuple0} till
+ * {@link org.apache.flink.api.java.tuple.Tuple25} to provide types for all fields and might add
+ * additional getters and setters for better readability. Additional member fields must not be added.
+ * A tuple must not be null.
+ *
+ * <p>A tuple is a fixed-length composite type for storing multiple values in a
+ * deterministic field order. Fields of a tuple are typed. Tuples are the most efficient composite
+ * type; a tuple does not support null-valued fields unless the type of the field supports nullability.
+ *
+ * <p>The generic types for all fields of the tuple can be defined in a hierarchy of subclasses.
+ *
+ * <p>If Flink's type analyzer is unable to extract a tuple type information with
+ * type information for all fields, an {@link org.apache.flink.api.common.functions.InvalidTypesException}
+ * is thrown.
+ *
+ * <p>Example use:
+ * <pre>
+ * {@code
+ * class MyTuple extends Tuple2<Integer, String> {
+ *
+ * public int getId() { return f0; }
+ *
+ * public String getName() { return f1; }
+ * }
+ * }
+ *
+ * Types.TUPLE(MyTuple.class)
+ * </pre>
+ *
+ * @param tupleSubclass A subclass of {@link org.apache.flink.api.java.tuple.Tuple0} till
+ * {@link org.apache.flink.api.java.tuple.Tuple25} that defines all field types and
+ * does not add any additional fields
+ */
+ public static <T extends Tuple> TypeInformation<T> TUPLE(Class<T> tupleSubclass) {
+ final TypeInformation<T> ti = TypeExtractor.createTypeInfo(tupleSubclass);
+ if (ti instanceof TupleTypeInfo) {
+ return ti;
+ }
+ throw new InvalidTypesException("Tuple type expected but was: " + ti);
+ }
+
+ /**
+ * Returns type information for a POJO (Plain Old Java Object).
+ *
+ * <p>A POJO class is public and standalone (no non-static inner class). It has a public no-argument
+ * constructor. All non-static, non-transient fields in the class (and all superclasses) are either public
+ * (and non-final) or have a public getter and a setter method that follows the Java beans naming
+ * conventions for getters and setters.
+ *
+ * <p>A POJO is a fixed-length and null-aware composite type. Every field can be null independent
+ * of the field's type.
+ *
+ * <p>The generic types for all fields of the POJO can be defined in a hierarchy of subclasses.
+ *
+ * <p>If Flink's type analyzer is unable to extract a valid POJO type information with
+ * type information for all fields, an {@link org.apache.flink.api.common.functions.InvalidTypesException}
+ * is thrown. Alternatively, you can use {@link Types#POJO(Class, Map)} to specify all fields manually.
+ *
+ * @param pojoClass POJO class to be analyzed by Flink
+ */
+ public static <T> TypeInformation<T> POJO(Class<T> pojoClass) {
+ final TypeInformation<T> ti = TypeExtractor.createTypeInfo(pojoClass);
+ if (ti instanceof PojoTypeInfo) {
+ return ti;
+ }
+ throw new InvalidTypesException("POJO type expected but was: " + ti);
+ }
+
+ /**
+ * Returns type information for a POJO (Plain Old Java Object) and allows to specify all fields manually.
+ *
+ * <p>A POJO class is public and standalone (no non-static inner class). It has a public no-argument
+ * constructor. All non-static, non-transient fields in the class (and all superclasses) are either public
+ * (and non-final) or have a public getter and a setter method that follows the Java beans naming
+ * conventions for getters and setters.
+ *
+ * <p>A POJO is a fixed-length, null-aware composite type with non-deterministic field order. Every field
+ * can be null independent of the field's type.
+ *
+ * <p>The generic types for all fields of the POJO can be defined in a hierarchy of subclasses.
+ *
+ * <p>If Flink's type analyzer is unable to extract a POJO field, an
+ * {@link org.apache.flink.api.common.functions.InvalidTypesException} is thrown.
+ *
+ * <p><strong>Note:</strong> In most cases the type information of fields can be determined automatically;
+ * we recommend using {@link Types#POJO(Class)}.
+ *
+ * @param pojoClass POJO class
+ * @param fields map of fields that map a name to type information. The map key is the name of
+ * the field and the value is its type.
+ */
+ public static <T> TypeInformation<T> POJO(Class<T> pojoClass, Map<String, TypeInformation<?>> fields) {
+ final List<PojoField> pojoFields = new ArrayList<>(fields.size());
+ for (Map.Entry<String, TypeInformation<?>> field : fields.entrySet()) {
+ final Field f = TypeExtractor.getDeclaredField(pojoClass, field.getKey());
+ if (f == null) {
+ throw new InvalidTypesException("Field '" + field.getKey() + "'could not be accessed.");
+ }
+ pojoFields.add(new PojoField(f, field.getValue()));
+ }
+
+ return new PojoTypeInfo<>(pojoClass, pojoFields);
+ }
+
+ /**
+ * Returns generic type information for any Java object. The serialization logic will
+ * use the general purpose serializer Kryo.
+ *
+ * <p>Generic types are black-boxes for Flink, but allow any object and null values in fields.
+ *
+ * <p>By default, serialization of this type is not very efficient. Please read the documentation
+ * about how to improve efficiency (namely by pre-registering classes).
+ *
+ * @param genericClass any Java class
+ */
+ public static <T> TypeInformation<T> GENERIC(Class<T> genericClass) {
+ return new GenericTypeInfo<>(genericClass);
+ }
+
+ /**
+ * Returns type information for Java arrays of primitive type (such as <code>byte[]</code>). The array
+ * must not be null.
+ *
+ * @param elementType element type of the array (e.g. Types.BOOLEAN, Types.INT, Types.DOUBLE)
+ */
+ public static TypeInformation<?> PRIMITIVE_ARRAY(TypeInformation<?> elementType) {
+ if (elementType == BOOLEAN) {
+ return PrimitiveArrayTypeInfo.BOOLEAN_PRIMITIVE_ARRAY_TYPE_INFO;
+ } else if (elementType == BYTE) {
+ return PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO;
+ } else if (elementType == SHORT) {
+ return PrimitiveArrayTypeInfo.SHORT_PRIMITIVE_ARRAY_TYPE_INFO;
+ } else if (elementType == INT) {
+ return PrimitiveArrayTypeInfo.INT_PRIMITIVE_ARRAY_TYPE_INFO;
+ } else if (elementType == LONG) {
+ return PrimitiveArrayTypeInfo.LONG_PRIMITIVE_ARRAY_TYPE_INFO;
+ } else if (elementType == FLOAT) {
+ return PrimitiveArrayTypeInfo.FLOAT_PRIMITIVE_ARRAY_TYPE_INFO;
+ } else if (elementType == DOUBLE) {
+ return PrimitiveArrayTypeInfo.DOUBLE_PRIMITIVE_ARRAY_TYPE_INFO;
+ } else if (elementType == CHAR) {
+ return PrimitiveArrayTypeInfo.CHAR_PRIMITIVE_ARRAY_TYPE_INFO;
+ }
+ throw new IllegalArgumentException("Invalid element type for a primitive array.");
+ }
+
+ /**
+ * Returns type information for Java arrays of object types (such as <code>String[]</code>,
+ * <code>Integer[]</code>). The array itself must not be null. Null values for elements are supported.
+ *
+ * @param elementType element type of the array
+ */
+ @SuppressWarnings("unchecked")
+ public static <E> TypeInformation<E[]> OBJECT_ARRAY(TypeInformation<E> elementType) {
+ if (elementType == Types.STRING) {
+ return (TypeInformation) BasicArrayTypeInfo.STRING_ARRAY_TYPE_INFO;
+ }
+ return ObjectArrayTypeInfo.getInfoFor(elementType);
+ }
+
+ /**
+ * Returns type information for Flink value types (classes that implement
+ * {@link org.apache.flink.types.Value}). Built-in value types do not support null values (except
+ * for {@link org.apache.flink.types.StringValue}).
+ *
+ * <p>Value types describe their serialization and deserialization manually, instead of going
+ * through a general purpose serialization framework. A value type is reasonable when general purpose
+ * serialization would be highly inefficient. The wrapped value can be altered, allowing programmers to
+ * reuse objects and take pressure off the garbage collector.
+ *
+ * <p>Flink provides built-in value types for all Java primitive types (such as
+ * {@link org.apache.flink.types.BooleanValue}, {@link org.apache.flink.types.IntValue}) as well
+ * as {@link org.apache.flink.types.StringValue}, {@link org.apache.flink.types.NullValue},
+ * {@link org.apache.flink.types.ListValue}, and {@link org.apache.flink.types.MapValue}.
+ *
+ * @param valueType class that implements {@link org.apache.flink.types.Value}
+ */
+ public static <V extends Value> TypeInformation<V> VALUE(Class<V> valueType) {
+ return new ValueTypeInfo<>(valueType);
+ }
+
+ /**
+ * Returns type information for a Java {@link java.util.Map}. A map must not be null. Null values
+ * in keys are not supported. An entry's value can be null.
+ *
+ * <p>By default, maps are untyped and treated as a generic type in Flink; therefore, it is useful
+ * to pass type information whenever a map is used.
+ *
+ * <p><strong>Note:</strong> Flink does not preserve the concrete {@link Map} type. It converts a map into {@link HashMap} when
+ * copying or deserializing.
+ *
+ * @param keyType type information for the map's keys
+ * @param valueType type information for the map's values
+ */
+ public static <K, V> TypeInformation<Map<K, V>> MAP(TypeInformation<K> keyType, TypeInformation<V> valueType) {
+ return new MapTypeInfo<>(keyType, valueType);
+ }
+
+ /**
+ * Returns type information for a Java {@link java.util.List}. A list must not be null. Null values
+ * in elements are not supported.
+ *
+ * <p>By default, lists are untyped and treated as a generic type in Flink; therefore, it is useful
+ * to pass type information whenever a list is used.
+ *
+ * <p><strong>Note:</strong> Flink does not preserve the concrete {@link List} type. It converts a list into {@link ArrayList} when
+ * copying or deserializing.
+ *
+ * @param elementType type information for the list's elements
+ */
+ public static <E> TypeInformation<List<E>> LIST(TypeInformation<E> elementType) {
+ return new ListTypeInfo<>(elementType);
+ }
+
+ /**
+ * Returns type information for Java enumerations. Null values are not supported.
+ *
+ * @param enumType enumeration class extending {@link java.lang.Enum}
+ */
+ public static <E extends Enum<E>> TypeInformation<E> ENUM(Class<E> enumType) {
+ return new EnumTypeInfo<>(enumType);
+ }
+
+ /**
+ * Returns type information for Flink's {@link org.apache.flink.types.Either} type. Null values
+ * are not supported.
+ *
+ * <p>Either type can be used for a value of two possible types.
+ *
+ * <p>Example use: <code>Types.EITHER(Types.VOID, Types.INT)</code>
+ *
+ * @param leftType type information of left side / {@link org.apache.flink.types.Either.Left}
+ * @param rightType type information of right side / {@link org.apache.flink.types.Either.Right}
+ */
+ public static <L, R> TypeInformation<Either<L, R>> EITHER(TypeInformation<L> leftType, TypeInformation<R> rightType) {
+ return new EitherTypeInfo<>(leftType, rightType);
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/e30066db/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeInfoParser.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeInfoParser.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeInfoParser.java
index 33820e5..12a9ae0 100644
--- a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeInfoParser.java
+++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeInfoParser.java
@@ -16,7 +16,7 @@
* limitations under the License.
*/
-
+
package org.apache.flink.api.java.typeutils;
import java.lang.reflect.Field;
@@ -31,6 +31,10 @@ import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.types.Value;
+/**
+ * @deprecated Use {@link org.apache.flink.api.common.typeinfo.Types} instead.
+ */
+@Deprecated
@Public
public class TypeInfoParser {
private static final String TUPLE_PACKAGE = "org.apache.flink.api.java.tuple";
http://git-wip-us.apache.org/repos/asf/flink/blob/e30066db/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/Types.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/Types.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/Types.scala
index 100c22b..4be137d 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/Types.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/Types.scala
@@ -17,6 +17,8 @@
*/
package org.apache.flink.table.api
+import _root_.java.{lang, math, sql, util}
+
import org.apache.flink.api.common.typeinfo.{PrimitiveArrayTypeInfo, TypeInformation, Types => JTypes}
import org.apache.flink.api.java.typeutils.{MapTypeInfo, MultisetTypeInfo, ObjectArrayTypeInfo}
import org.apache.flink.table.typeutils.TimeIntervalTypeInfo
@@ -25,35 +27,95 @@ import org.apache.flink.types.Row
import _root_.scala.annotation.varargs
/**
- * This class enumerates all supported types of the Table API.
+ * This class enumerates all supported types of the Table API & SQL.
*/
object Types {
- val STRING = JTypes.STRING
- val BOOLEAN = JTypes.BOOLEAN
+ /**
+ * Returns type information for a Table API string or SQL VARCHAR type.
+ */
+ val STRING: TypeInformation[String] = JTypes.STRING
+
+ /**
+ * Returns type information for a Table API boolean or SQL BOOLEAN type.
+ */
+ val BOOLEAN: TypeInformation[lang.Boolean] = JTypes.BOOLEAN
+
+ /**
+ * Returns type information for a Table API byte or SQL TINYINT type.
+ */
+ val BYTE: TypeInformation[lang.Byte] = JTypes.BYTE
+
+ /**
+ * Returns type information for a Table API short or SQL SMALLINT type.
+ */
+ val SHORT: TypeInformation[lang.Short] = JTypes.SHORT
+
+ /**
+ * Returns type information for a Table API integer or SQL INT/INTEGER type.
+ */
+ val INT: TypeInformation[lang.Integer] = JTypes.INT
- val BYTE = JTypes.BYTE
- val SHORT = JTypes.SHORT
- val INT = JTypes.INT
- val LONG = JTypes.LONG
- val FLOAT = JTypes.FLOAT
- val DOUBLE = JTypes.DOUBLE
- val DECIMAL = JTypes.DECIMAL
+ /**
+ * Returns type information for a Table API long or SQL BIGINT type.
+ */
+ val LONG: TypeInformation[lang.Long] = JTypes.LONG
- val SQL_DATE = JTypes.SQL_DATE
- val SQL_TIME = JTypes.SQL_TIME
- val SQL_TIMESTAMP = JTypes.SQL_TIMESTAMP
- val INTERVAL_MONTHS = TimeIntervalTypeInfo.INTERVAL_MONTHS
- val INTERVAL_MILLIS = TimeIntervalTypeInfo.INTERVAL_MILLIS
+ /**
+ * Returns type information for a Table API float or SQL FLOAT/REAL type.
+ */
+ val FLOAT: TypeInformation[lang.Float] = JTypes.FLOAT
+
+ /**
+ * Returns type information for a Table API double or SQL DOUBLE type.
+ */
+ val DOUBLE: TypeInformation[lang.Double] = JTypes.DOUBLE
/**
- * Generates row type information.
+ * Returns type information for a Table API big decimal or SQL DECIMAL type.
+ */
+ val DECIMAL: TypeInformation[math.BigDecimal] = JTypes.BIG_DEC
+
+ /**
+ * Returns type information for a Table API SQL date or SQL DATE type.
+ */
+ val SQL_DATE: TypeInformation[sql.Date] = JTypes.SQL_DATE
+
+ /**
+ * Returns type information for a Table API SQL time or SQL TIME type.
+ */
+ val SQL_TIME: TypeInformation[sql.Time] = JTypes.SQL_TIME
+
+ /**
+ * Returns type information for a Table API SQL timestamp or SQL TIMESTAMP type.
+ */
+ val SQL_TIMESTAMP: TypeInformation[sql.Timestamp] = JTypes.SQL_TIMESTAMP
+
+ /**
+ * Returns type information for a Table API interval of months.
+ */
+ val INTERVAL_MONTHS: TypeInformation[lang.Integer] = TimeIntervalTypeInfo.INTERVAL_MONTHS
+
+ /**
+ * Returns type information for a Table API interval of milliseconds.
+ */
+ val INTERVAL_MILLIS: TypeInformation[lang.Long] = TimeIntervalTypeInfo.INTERVAL_MILLIS
+
+ /**
+ * Returns type information for [[org.apache.flink.types.Row]] with fields of the given types.
+ *
+ * A row is a variable-length, null-aware composite type for storing multiple values in a
+ * deterministic field order. Every field can be null regardless of the field's type.
+ * The type of row fields cannot be automatically inferred; therefore, it is required to provide
+ * type information whenever a row is used.
*
- * A row type consists of zero or more fields with a field name and a corresponding type.
+ * <p>The schema of rows can have up to <code>Integer.MAX_VALUE</code> fields, however, all
+ * row instances must strictly adhere to the schema defined by the type info.
*
- * The fields have the default names (f0, f1, f2 ..).
+ * This method generates type information with fields of the given types; the fields have
+ * the default names (f0, f1, f2 ..).
*
- * @param types types of row fields; e.g. Types.STRING, Types.INT
+ * @param types The types of the row fields, e.g., Types.STRING, Types.INT
*/
@varargs
def ROW(types: TypeInformation[_]*): TypeInformation[Row] = {
@@ -61,19 +123,29 @@ object Types {
}
/**
- * Generates row type information.
+ * Returns type information for [[org.apache.flink.types.Row]] with fields of the given types
+ * and with given names.
+ *
+ * A row is a fixed-length, null-aware composite type for storing multiple values in a
+ * deterministic field order. Every field can be null independent of the field's type.
+ * The type of row fields cannot be automatically inferred; therefore, it is required to provide
+ * type information whenever a row is used.
+ *
+ * <p>The schema of rows can have up to <code>Integer.MAX_VALUE</code> fields, however, all
+ * row instances must strictly adhere to the schema defined by the type info.
*
- * A row type consists of zero or more fields with a field name and a corresponding type.
+ * Example use: `Types.ROW(Array("name", "number"), Array(Types.STRING, Types.INT))`.
*
- * @param names names of row fields, e.g. "userid", "name"
- * @param types types of row fields; e.g. Types.STRING, Types.INT
+ * @param fieldNames array of field names
+ * @param types array of field types
*/
- def ROW(names: Array[String], types: Array[TypeInformation[_]]): TypeInformation[Row] = {
- JTypes.ROW_NAMED(names, types: _*)
+ def ROW(fieldNames: Array[String], types: Array[TypeInformation[_]]): TypeInformation[Row] = {
+ JTypes.ROW_NAMED(fieldNames, types: _*)
}
/**
- * Generates type information for an array consisting of Java primitive elements.
+ * Generates type information for an array consisting of Java primitive elements. The elements
+ * do not support null values.
*
* @param elementType type of the array elements; e.g. Types.INT
*/
@@ -93,30 +165,35 @@ object Types {
}
/**
- * Generates type information for an array consisting of Java object elements.
+ * Generates type information for an array consisting of Java object elements. Null values for
+ * elements are supported.
*
* @param elementType type of the array elements; e.g. Types.STRING or Types.INT
*/
- def OBJECT_ARRAY(elementType: TypeInformation[_]): TypeInformation[_] = {
+ def OBJECT_ARRAY[E](elementType: TypeInformation[E]): TypeInformation[Array[E]] = {
ObjectArrayTypeInfo.getInfoFor(elementType)
}
/**
- * Generates type information for a Java HashMap.
+ * Generates type information for a Java HashMap. Null values in keys are not supported. An
+ * entry's value can be null.
*
* @param keyType type of the keys of the map e.g. Types.STRING
* @param valueType type of the values of the map e.g. Types.STRING
*/
- def MAP(keyType: TypeInformation[_], valueType: TypeInformation[_]): TypeInformation[_] = {
+ def MAP[K, V](
+ keyType: TypeInformation[K],
+ valueType: TypeInformation[V]): TypeInformation[util.Map[K, V]] = {
new MapTypeInfo(keyType, valueType)
}
/**
- * Generates type information for a Multiset.
+ * Generates type information for a Multiset. A Multiset is backed by a Java HashMap and maps an
+ * arbitrary key to an integer value. Null values in keys are not supported.
*
* @param elementType type of the elements of the multiset e.g. Types.STRING
*/
- def MULTISET(elementType: TypeInformation[_]): TypeInformation[_] = {
+ def MULTISET[E](elementType: TypeInformation[E]): TypeInformation[util.Map[E, lang.Integer]] = {
new MultisetTypeInfo(elementType)
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/e30066db/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/stringexpr/CorrelateStringExpressionTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/stringexpr/CorrelateStringExpressionTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/stringexpr/CorrelateStringExpressionTest.scala
index 0d12400..110ea6a 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/stringexpr/CorrelateStringExpressionTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/stringexpr/CorrelateStringExpressionTest.scala
@@ -20,9 +20,8 @@ package org.apache.flink.table.api.stream.table.stringexpr
import org.apache.flink.api.java.typeutils.RowTypeInfo
import org.apache.flink.api.scala._
-import org.apache.flink.table.api.scala._
import org.apache.flink.table.api._
-import org.apache.flink.table.runtime.utils._
+import org.apache.flink.table.api.scala._
import org.apache.flink.table.utils._
import org.apache.flink.types.Row
import org.junit.Test
http://git-wip-us.apache.org/repos/asf/flink/blob/e30066db/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/Types.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/Types.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/Types.scala
new file mode 100644
index 0000000..4ce9b0f
--- /dev/null
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/Types.scala
@@ -0,0 +1,388 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.scala.typeutils
+
+import org.apache.flink.annotation.PublicEvolving
+import org.apache.flink.api.common.functions.InvalidTypesException
+import org.apache.flink.api.common.typeinfo.{TypeInformation, Types => JTypes}
+import org.apache.flink.types.Row
+
+import _root_.scala.collection.JavaConverters._
+import _root_.scala.util.{Either, Try}
+
+/**
+ * This class gives access to the type information of the most common Scala types for which Flink
+ * has built-in serializers and comparators.
+ *
+ * This class contains types of [[org.apache.flink.api.common.typeinfo.Types]] and adds
+ * types for Scala specific classes (such as [[Unit]] or case classes).
+ *
+ * In many cases, Flink tries to analyze generic signatures of functions to determine return
+ * types automatically. This class is intended for cases where type information has to be
+ * supplied manually or cases where automatic type inference results in an inefficient type.
+ *
+ * Scala macros allow determining type information of classes and type parameters. You can
+ * use [[Types.of]] to let type information be determined automatically.
+ */
+@PublicEvolving
+object Types {
+
+ /**
+ * Generates type information based on the given class and/or its type parameters.
+ *
+ * The definition is similar to a [[org.apache.flink.api.common.typeinfo.TypeHint]] but does
+ * not require to implement anonymous classes.
+ *
+ * If the class could not be analyzed by the Scala type analyzer, the Java analyzer
+ * will be used.
+ *
+ * Example use:
+ *
+ * `Types.of[(Int, String, String)]` for Scala tuples
+ * `Types.of[Unit]` for Scala specific types
+ *
+ * @tparam T class to be analyzed
+ */
+ def of[T: TypeInformation]: TypeInformation[T] = {
+ val typeInfo: TypeInformation[T] = implicitly[TypeInformation[T]]
+ typeInfo
+ }
+
+ /**
+ * Returns type information for Scala [[Nothing]]. Does not support a null value.
+ */
+ val NOTHING: TypeInformation[Nothing] = new ScalaNothingTypeInfo
+
+ /**
+ * Returns type information for Scala [[Unit]]. Does not support a null value.
+ */
+ val UNIT: TypeInformation[Unit] = new UnitTypeInfo
+
+ /**
+ * Returns type information for [[String]] and [[java.lang.String]]. Supports a null value.
+ */
+ val STRING: TypeInformation[String] = JTypes.STRING
+
+ /**
+ * Returns type information for primitive [[Byte]] and [[java.lang.Byte]]. Does not
+ * support a null value.
+ */
+ val BYTE: TypeInformation[java.lang.Byte] = JTypes.BYTE
+
+ /**
+ * Returns type information for primitive [[Boolean]] and [[java.lang.Boolean]]. Does not
+ * support a null value.
+ */
+ val BOOLEAN: TypeInformation[java.lang.Boolean] = JTypes.BOOLEAN
+
+ /**
+ * Returns type information for primitive [[Short]] and [[java.lang.Short]]. Does not
+ * support a null value.
+ */
+ val SHORT: TypeInformation[java.lang.Short] = JTypes.SHORT
+
+ /**
+ * Returns type information for primitive [[Int]] and [[java.lang.Integer]]. Does not
+ * support a null value.
+ */
+ val INT: TypeInformation[java.lang.Integer] = JTypes.INT
+
+ /**
+ * Returns type information for primitive [[Long]] and [[java.lang.Long]]. Does not
+ * support a null value.
+ */
+ val LONG: TypeInformation[java.lang.Long] = JTypes.LONG
+
+ /**
+ * Returns type information for primitive [[Float]] and [[java.lang.Float]]. Does not
+ * support a null value.
+ */
+ val FLOAT: TypeInformation[java.lang.Float] = JTypes.FLOAT
+
+ /**
+ * Returns type information for primitive [[Double]] and [[java.lang.Double]]. Does not
+ * support a null value.
+ */
+ val DOUBLE: TypeInformation[java.lang.Double] = JTypes.DOUBLE
+
+ /**
+ * Returns type information for primitive [[Char]] and [[java.lang.Character]]. Does not
+ * support a null value.
+ */
+ val CHAR: TypeInformation[java.lang.Character] = JTypes.CHAR
+
+ /**
+ * Returns type information for Java [[java.math.BigDecimal]]. Supports a null value.
+ *
+ * Note that Scala [[BigDecimal]] is not supported yet.
+ */
+ val JAVA_BIG_DEC: TypeInformation[java.math.BigDecimal] = JTypes.BIG_DEC
+
+ /**
+ * Returns type information for Java [[java.math.BigInteger]]. Supports a null value.
+ *
+ * Note that Scala [[BigInt]] is not supported yet.
+ */
+ val JAVA_BIG_INT: TypeInformation[java.math.BigInteger] = JTypes.BIG_INT
+
+ /**
+ * Returns type information for [[java.sql.Date]]. Supports a null value.
+ */
+ val SQL_DATE: TypeInformation[java.sql.Date] = JTypes.SQL_DATE
+
+ /**
+ * Returns type information for [[java.sql.Time]]. Supports a null value.
+ */
+ val SQL_TIME: TypeInformation[java.sql.Time] = JTypes.SQL_TIME
+
+ /**
+ * Returns type information for [[java.sql.Timestamp]]. Supports a null value.
+ */
+ val SQL_TIMESTAMP: TypeInformation[java.sql.Timestamp] = JTypes.SQL_TIMESTAMP
+
+ /**
+ * Returns type information for [[org.apache.flink.types.Row]] with fields of the given types.
+ * A row itself must not be null.
+ *
+ * A row is a fixed-length, null-aware composite type for storing multiple values in a
+ * deterministic field order. Every field can be null regardless of the field's type.
+ * The type of row fields cannot be automatically inferred; therefore, it is required to provide
+ * type information whenever a row is used.
+ *
+ * <p>The schema of rows can have up to <code>Integer.MAX_VALUE</code> fields, however, all row
+ * instances must strictly adhere to the schema defined by the type info.
+ *
+ * This method generates type information with fields of the given types; the fields have
+ * the default names (f0, f1, f2 ..).
+ *
+ * @param types The types of the row fields, e.g., Types.STRING, Types.INT
+ */
+ def ROW(types: TypeInformation[_]*): TypeInformation[Row] = JTypes.ROW(types: _*)
+
+ /**
+ * Returns type information for [[org.apache.flink.types.Row]] with fields of the given types
+ * and with given names. A row must not be null.
+ *
+ * A row is a fixed-length, null-aware composite type for storing multiple values in a
+ * deterministic field order. Every field can be null independent of the field's type.
+ * The type of row fields cannot be automatically inferred; therefore, it is required to provide
+ * type information whenever a row is used.
+ *
+ * <p>The schema of rows can have up to <code>Integer.MAX_VALUE</code> fields, however, all row
+ * instances must strictly adhere to the schema defined by the type info.
+ *
+ * Example use: `Types.ROW(Array("name", "number"), Array(Types.STRING, Types.INT))`.
+ *
+ * @param fieldNames array of field names
+ * @param types array of field types
+ */
+ def ROW(fieldNames: Array[String], types: Array[TypeInformation[_]]): TypeInformation[Row] =
+ JTypes.ROW_NAMED(fieldNames, types: _*)
+
+ /**
+ * Returns type information for a POJO (Plain Old Java Object).
+ *
+ * A POJO class is public and standalone (no non-static inner class). It has a public
+ * no-argument constructor. All non-static, non-transient fields in the class (and all
+ * superclasses) are either public (and non-final) or have a public getter and a setter
+ * method that follows the Java beans naming conventions for getters and setters.
+ *
+ * A POJO is a fixed-length, null-aware composite type with non-deterministic field order.
+ * Every field can be null independent of the field's type.
+ *
+ * The generic types for all fields of the POJO can be defined in a hierarchy of subclasses.
+ *
+ * If Flink's type analyzer is unable to extract a valid POJO type information with
+ * type information for all fields, an
+ * [[org.apache.flink.api.common.functions.InvalidTypesException]] is thrown.
+ * Alternatively, you can use [[Types.POJO(Class, Map)]] to specify all fields manually.
+ *
+ * @param pojoClass POJO class to be analyzed by Flink
+ */
+ def POJO[T](pojoClass: Class[T]): TypeInformation[T] = {
+ JTypes.POJO(pojoClass)
+ }
+
+ /**
+ * Returns type information for a POJO (Plain Old Java Object) and allows specifying all
+ * fields manually.
+ *
+ * A POJO class is public and standalone (no non-static inner class). It has a public no-argument
+ * constructor. All non-static, non-transient fields in the class (and all superclasses) are
+ * either public (and non-final) or have a public getter and a setter method that follows the
+ * Java beans naming conventions for getters and setters.
+ *
+ * A POJO is a fixed-length, null-aware composite type with non-deterministic field order.
+ * Every field can be null independent of the field's type.
+ *
+ * The generic types for all fields of the POJO can be defined in a hierarchy of subclasses.
+ *
+ * If Flink's type analyzer is unable to extract a POJO field, an
+ * [[org.apache.flink.api.common.functions.InvalidTypesException]] is thrown.
+ *
+ * '''Note:''' In most cases the type information of fields can be determined automatically,
+ * so we recommend using [[Types.POJO(Class)]].
+ *
+ * @param pojoClass POJO class
+ * @param fields map of fields that map a name to type information. The map key is the name of
+ * the field and the value is its type.
+ */
+ def POJO[T](pojoClass: Class[T], fields: Map[String, TypeInformation[_]]): TypeInformation[T] = {
+ JTypes.POJO(pojoClass, fields.asJava)
+ }
+
+ /**
+ * Returns generic type information for any Scala/Java object. The serialization logic will
+ * use the general purpose serializer Kryo.
+ *
+ * Generic types are black-boxes for Flink, but allow any object and null values in fields.
+ *
+ * By default, serialization of this type is not very efficient. Please read the documentation
+ * about how to improve efficiency (namely by pre-registering classes).
+ *
+ * @param genericClass any Scala/Java class
+ */
+ def GENERIC[T](genericClass: Class[T]): TypeInformation[T] = JTypes.GENERIC(genericClass)
+
+ /**
+ * Returns type information for a Scala case class and Scala tuples.
+ *
+ * A Scala case class is a fixed-length composite type for storing multiple values in a
+ * deterministic field order. Fields of a case class are typed. Case classes and tuples are
+ * the most efficient composite type; therefore, they do not support null-valued fields
+ * unless the type of the field supports nullability.
+ *
+ * Example use: `Types.CASE_CLASS[MyCaseClass]`
+ *
+ * @tparam T case class to be analyzed
+ */
+ def CASE_CLASS[T: TypeInformation]: TypeInformation[T] = {
+ val t = Types.of[T]
+ if (t.isInstanceOf[CaseClassTypeInfo[_]]) {
+ t
+ } else {
+ throw new InvalidTypesException("Case class type expected but was: " + t)
+ }
+ }
+
+ /**
+ * Returns type information for a Scala tuple.
+ *
+ * A Scala tuple is a fixed-length composite type for storing multiple values in a
+ * deterministic field order. Fields of a tuple are typed. Tuples are
+ * the most efficient composite type; therefore, they do not support null-valued fields
+ * unless the type of the field supports nullability.
+ *
+ * Example use: `Types.TUPLE[(String, Int)]`
+ *
+ * @tparam T tuple to be analyzed
+ */
+ def TUPLE[T: TypeInformation]: TypeInformation[T] = {
+ CASE_CLASS[T]
+ }
+
+ /**
+ * Returns type information for Scala/Java arrays of primitive type (such as `Array[Byte]`).
+ * The array and its elements do not support null values.
+ *
+ * @param elementType element type of the array (e.g. Types.BOOLEAN, Types.INT, Types.DOUBLE)
+ */
+ def PRIMITIVE_ARRAY(elementType: TypeInformation[_]): TypeInformation[_] = {
+ JTypes.PRIMITIVE_ARRAY(elementType)
+ }
+
+ /**
+ * Returns type information for Scala/Java arrays of object types (such as `Array[String]`,
+ * `Array[java.lang.Integer]`). The array itself must not be null. Null values for elements
+ * are supported.
+ *
+ * @param elementType element type of the array
+ */
+ def OBJECT_ARRAY[E <: AnyRef](elementType: TypeInformation[E]): TypeInformation[Array[E]] = {
+ // necessary for the Scala compiler
+ JTypes.OBJECT_ARRAY(elementType).asInstanceOf[TypeInformation[Array[E]]]
+ }
+
+ /**
+ * Returns type information for Scala [[Either]] type. Null values are not supported.
+ *
+ * The either type can be used for a value of two possible types.
+ *
+ * Example use: `Types.EITHER(Types.INT, Types.NOTHING)`
+ *
+ * @param leftType type information of left side / [[Left]]
+ * @param rightType type information of right side / [[Right]]
+ */
+ def EITHER[A, B](
+ leftType: TypeInformation[A],
+ rightType: TypeInformation[B]): TypeInformation[Either[A, B]] = {
+ new EitherTypeInfo(classOf[Either[A, B]], leftType, rightType)
+ }
+
+ /**
+ * Returns type information for Scala [[Option]] type. Null values are not supported.
+ *
+ * The option type can be used for distinguishing between a value or no value.
+ *
+ * Example use: `Types.OPTION(Types.INT)`
+ *
+ * @param valueType type information of the option's value
+ */
+ def OPTION[A, T <: Option[A]](valueType: TypeInformation[A]): TypeInformation[T] = {
+ new OptionTypeInfo(valueType)
+ }
+
+ /**
+ * Returns type information for Scala [[Try]] type. Null values are not supported.
+ *
+ * The try type can be used for distinguishing between a value or throwable.
+ *
+ * Example use: `Types.TRY(Types.INT)`
+ *
+ * @param valueType type information of the try's value
+ */
+ def TRY[A, T <: Try[A]](valueType: TypeInformation[A]): TypeInformation[T] = {
+ new TryTypeInfo(valueType)
+ }
+
+ /**
+ * Returns type information for Scala enumerations. Null values are not supported.
+ *
+ * @param enum enumeration object
+ * @param valueClass value class
+ */
+ def ENUMERATION[E <: Enumeration](
+ enum: E,
+ valueClass: Class[E#Value]): TypeInformation[E#Value] = {
+ new EnumValueTypeInfo(enum, valueClass)
+ }
+
+ /**
+ * Returns type information for Scala collections that implement [[Traversable]]. Null values
+ * are not supported.
+ */
+ def TRAVERSABLE[T: TypeInformation]: TypeInformation[T] = {
+ val t = Types.of[T]
+ if (t.isInstanceOf[TraversableTypeInfo[_, _]]) {
+ t
+ } else {
+ throw new InvalidTypesException("Traversable type expected but was: " + t)
+ }
+ }
+}