Posted to commits@flink.apache.org by tw...@apache.org on 2020/06/16 07:40:48 UTC
[flink] 02/02: [FLINK-18248][docs] Update data type documentation for 1.11
This is an automated email from the ASF dual-hosted git repository.
twalthr pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git
commit b9c33dd03685c7c1a6c440808fb65568a5ae50eb
Author: Timo Walther <tw...@apache.org>
AuthorDate: Thu Jun 11 14:06:21 2020 +0200
[FLINK-18248][docs] Update data type documentation for 1.11
This closes #12606.
---
docs/dev/table/types.md | 354 +++++++++++++++++++++++++++++++++++++++---------
1 file changed, 287 insertions(+), 67 deletions(-)
diff --git a/docs/dev/table/types.md b/docs/dev/table/types.md
index 4dc42af..16991d6 100644
--- a/docs/dev/table/types.md
+++ b/docs/dev/table/types.md
@@ -36,7 +36,7 @@ Starting with Flink 1.9, the Table & SQL API will receive a new type system that
solution for API stability and standard compliance.
Reworking the type system is a major effort that touches almost all user-facing interfaces. Therefore, its
-introduction spans multiple releases, and the community aims to finish this effort by Flink 1.10.
+introduction spans multiple releases, and the community aims to finish this effort by Flink 1.12.
Due to the simultaneous addition of a new planner for table programs (see [FLINK-11439](https://issues.apache.org/jira/browse/FLINK-11439)),
not every combination of planner and data type is supported. Furthermore, planners might not support every
@@ -211,14 +211,15 @@ The following data types are supported:
| `DOUBLE` | |
| `DATE` | |
| `TIME` | Supports only a precision of `0`. |
-| `TIMESTAMP` | Supports only a precision of `3`. |
-| `TIMESTAMP WITH LOCAL TIME ZONE` | Supports only a precision of `3`. |
+| `TIMESTAMP` | |
+| `TIMESTAMP WITH LOCAL TIME ZONE` | |
| `INTERVAL` | Supports only interval of `MONTH` and `SECOND(3)`. |
| `ARRAY` | |
| `MULTISET` | |
| `MAP` | |
| `ROW` | |
| `RAW` | |
+| structured types | Currently only exposed in user-defined functions. |
Limitations
-----------
@@ -227,10 +228,7 @@ Limitations
have not been updated to the new type system yet. Use the string representations declared in
the [old planner section](#old-planner).
-**Connector Descriptors and SQL Client**: Descriptor string representations have not been updated to the new
-type system yet. Use the string representation declared in the [Connect to External Systems section](./connect.html#type-strings)
-
-**User-defined Functions**: User-defined functions cannot declare a data type yet.
+**User-defined Functions**: User-defined aggregate functions cannot declare a data type yet. Scalar and table functions fully support data types.
List of Data Types
------------------
@@ -267,10 +265,11 @@ and `2,147,483,647` (both inclusive). If no length is specified, `n` is equal to
**Bridging to JVM Types**
-| Java Type | Input | Output | Remarks |
-|:-------------------|:-----:|:------:|:------------------------|
-|`java.lang.String` | X | X | *Default* |
-|`byte[]` | X | X | Assumes UTF-8 encoding. |
+| Java Type | Input | Output | Remarks |
+|:----------------------------------------|:-----:|:------:|:-------------------------|
+|`java.lang.String` | X | X | *Default* |
+|`byte[]` | X | X | Assumes UTF-8 encoding. |
+|`org.apache.flink.table.data.StringData` | X | X | Internal data structure. |
#### `VARCHAR` / `STRING`
@@ -306,10 +305,11 @@ between `1` and `2,147,483,647` (both inclusive). If no length is specified, `n`
**Bridging to JVM Types**
-| Java Type | Input | Output | Remarks |
-|:-------------------|:-----:|:------:|:------------------------|
-|`java.lang.String` | X | X | *Default* |
-|`byte[]` | X | X | Assumes UTF-8 encoding. |
+| Java Type | Input | Output | Remarks |
+|:----------------------------------------|:-----:|:------:|:-------------------------|
+|`java.lang.String` | X | X | *Default* |
+|`byte[]` | X | X | Assumes UTF-8 encoding. |
+|`org.apache.flink.table.data.StringData` | X | X | Internal data structure. |
### Binary Strings
@@ -428,9 +428,10 @@ The default value for `s` is `0`.
**Bridging to JVM Types**
-| Java Type | Input | Output | Remarks |
-|:----------------------|:-----:|:------:|:------------------------|
-|`java.math.BigDecimal` | X | X | *Default* |
+| Java Type | Input | Output | Remarks |
+|:-----------------------------------------|:-----:|:------:|:-------------------------|
+|`java.math.BigDecimal` | X | X | *Default* |
+|`org.apache.flink.table.data.DecimalData` | X | X | Internal data structure. |
#### `TINYINT`
@@ -459,7 +460,7 @@ DataTypes.TINYINT()
| Java Type | Input | Output | Remarks |
|:-------------------|:-----:|:------:|:---------------------------------------------|
|`java.lang.Byte` | X | X | *Default* |
-|`byte` | X | (X) | Output only if type is not nullable. |
+|`byte` | X | (X) | Output only if type is not nullable. |
#### `SMALLINT`
@@ -488,7 +489,7 @@ DataTypes.SMALLINT()
| Java Type | Input | Output | Remarks |
|:-------------------|:-----:|:------:|:---------------------------------------------|
|`java.lang.Short` | X | X | *Default* |
-|`short` | X | (X) | Output only if type is not nullable. |
+|`short` | X | (X) | Output only if type is not nullable. |
#### `INT`
@@ -521,7 +522,7 @@ DataTypes.INT()
| Java Type | Input | Output | Remarks |
|:-------------------|:-----:|:------:|:---------------------------------------------|
|`java.lang.Integer` | X | X | *Default* |
-|`int` | X | (X) | Output only if type is not nullable. |
+|`int` | X | (X) | Output only if type is not nullable. |
#### `BIGINT`
@@ -551,7 +552,7 @@ DataTypes.BIGINT()
| Java Type | Input | Output | Remarks |
|:-------------------|:-----:|:------:|:---------------------------------------------|
|`java.lang.Long` | X | X | *Default* |
-|`long` | X | (X) | Output only if type is not nullable. |
+|`long` | X | (X) | Output only if type is not nullable. |
### Approximate Numerics
@@ -584,7 +585,7 @@ DataTypes.FLOAT()
| Java Type | Input | Output | Remarks |
|:-------------------|:-----:|:------:|:---------------------------------------------|
|`java.lang.Float` | X | X | *Default* |
-|`float` | X | (X) | Output only if type is not nullable. |
+|`float` | X | (X) | Output only if type is not nullable. |
#### `DOUBLE`
@@ -617,7 +618,7 @@ DataTypes.DOUBLE()
| Java Type | Input | Output | Remarks |
|:-------------------|:-----:|:------:|:---------------------------------------------|
|`java.lang.Double` | X | X | *Default* |
-|`double` | X | (X) | Output only if type is not nullable. |
+|`double` | X | (X) | Output only if type is not nullable. |
### Date and Time
@@ -741,10 +742,11 @@ is specified, `p` is equal to `6`.
**Bridging to JVM Types**
-| Java Type | Input | Output | Remarks |
-|:-------------------------|:-----:|:------:|:----------------------------------------------------|
-|`java.time.LocalDateTime` | X | X | *Default* |
-|`java.sql.Timestamp` | X | X | |
+| Java Type | Input | Output | Remarks |
+|:-------------------------------------------|:-----:|:------:|:-------------------------|
+|`java.time.LocalDateTime` | X | X | *Default* |
+|`java.sql.Timestamp` | X | X | |
+|`org.apache.flink.table.data.TimestampData` | X | X | Internal data structure. |
#### `TIMESTAMP WITH TIME ZONE`
@@ -837,6 +839,7 @@ of digits of fractional seconds (*precision*). `p` must have a value between `0`
|`int` | X | (X) | Describes the number of seconds since epoch.<br>Output only if type is not nullable. |
|`java.lang.Long` | X | X | Describes the number of milliseconds since epoch. |
|`long` | X | (X) | Describes the number of milliseconds since epoch.<br>Output only if type is not nullable. |
+|`org.apache.flink.table.data.TimestampData` | X | X | Internal data structure. |
#### `INTERVAL YEAR TO MONTH`
@@ -1001,9 +1004,10 @@ equivalent to `ARRAY<INT>`.
**Bridging to JVM Types**
-| Java Type | Input | Output | Remarks |
-|:----------|:-----:|:------:|:----------------------------------|
-|*t*`[]` | (X) | (X) | Depends on the subtype. *Default* |
+| Java Type | Input | Output | Remarks |
+|:---------------------------------------|:-----:|:------:|:----------------------------------|
+|*t*`[]` | (X) | (X) | Depends on the subtype. *Default* |
+|`org.apache.flink.table.data.ArrayData` | X | X | Internal data structure. |
#### `MAP`
@@ -1037,10 +1041,11 @@ and `vt` is the data type of the value elements.
**Bridging to JVM Types**
-| Java Type | Input | Output | Remarks |
-|:--------------------------------------|:-----:|:------:|:----------|
-| `java.util.Map<kt, vt>` | X | X | *Default* |
-| *subclass* of `java.util.Map<kt, vt>` | X | | |
+| Java Type | Input | Output | Remarks |
+|:--------------------------------------|:-----:|:------:|:-------------------------|
+| `java.util.Map<kt, vt>` | X | X | *Default* |
+| *subclass* of `java.util.Map<kt, vt>` | X | | |
+|`org.apache.flink.table.data.MapData` | X | X | Internal data structure. |
#### `MULTISET`
@@ -1076,10 +1081,11 @@ equivalent to `MULTISET<INT>`.
**Bridging to JVM Types**
-| Java Type | Input | Output | Remarks |
-|:-------------------------------------|:-----:|:------:|:---------------------------------------------------------|
-|`java.util.Map<t, java.lang.Integer>` | X | X | Assigns each value to an integer multiplicity. *Default* |
-| *subclass* of `java.util.Map<kt, java.lang.Integer>` | X | | Assigns each value to an integer multiplicity. |
+| Java Type | Input | Output | Remarks |
+|:--------------------------------------|:-----:|:------:|:---------------------------------------------------------|
+|`java.util.Map<t, java.lang.Integer>` | X | X | Assigns each value to an integer multiplicity. *Default* |
+| *subclass* of `java.util.Map<t, java.lang.Integer>` | X | | |
+|`org.apache.flink.table.data.MapData` | X | X | Internal data structure. |
#### `ROW`
@@ -1125,9 +1131,110 @@ equivalent to `ROW<myField INT, myOtherField BOOLEAN>`.
**Bridging to JVM Types**
-| Java Type | Input | Output | Remarks |
-|:----------------------------|:-----:|:------:|:------------------------|
-|`org.apache.flink.types.Row` | X | X | *Default* |
+| Java Type | Input | Output | Remarks |
+|:-------------------------------------|:-----:|:------:|:-------------------------|
+|`org.apache.flink.types.Row` | X | X | *Default* |
+|`org.apache.flink.table.data.RowData` | X | X | Internal data structure. |
+
+### User-Defined Data Types
+
+<span class="label label-danger">Attention</span> User-defined data types are not fully supported yet. They are
+currently (as of Flink 1.11) only exposed as unregistered structured types in parameters and return types of functions.
+
+A structured type is similar to an object in an object-oriented programming language. It contains
+zero, one or more attributes. Each attribute consists of a name and a type.
+
+There are two kinds of structured types:
+
+- Types that are stored in a catalog and are identified by a _catalog identifier_ (like `cat.db.MyType`). Those
+are equal to the SQL standard definition of structured types.
+
+- Anonymously defined, unregistered types (usually reflectively extracted) that are identified by
+an _implementation class_ (like `com.myorg.model.MyType`). Those are useful when programmatically
+defining a table program. They enable reusing existing JVM classes without manually defining the
+schema of a data type again.
+
+#### Registered Structured Types
+
+Currently, registered structured types are not supported. Thus, they cannot be stored in a catalog
+or referenced in a `CREATE TABLE` DDL.
+
+#### Unregistered Structured Types
+
+Unregistered structured types can be created from regular POJOs (Plain Old Java Objects) using automatic reflective extraction.
+
+The implementation class of a structured type must meet the following requirements:
+- The class must be globally accessible, which means it must be declared `public`, `static`, and not `abstract`.
+- The class must offer a default constructor with zero arguments or a full constructor that assigns all
+fields.
+- All fields of the class must be readable by either `public` declaration or a getter that follows common
+coding style such as `getField()`, `isField()`, `field()`.
+- All fields of the class must be writable by either `public` declaration, fully assigning constructor,
+or a setter that follows common coding style such as `setField(...)`, `field(...)`.
+- All fields must be mapped to a data type either implicitly via reflective extraction or explicitly
+using the `@DataTypeHint` [annotations](#data-type-annotations).
+- Fields that are declared `static` or `transient` are ignored.
+
+The reflective extraction supports arbitrary nesting of fields as long as a field type does not
+(transitively) refer to itself.
+
+The declared field class (e.g. `public int age;`) must be contained in the list of supported JVM
+bridging classes defined for every data type in this document (e.g. `java.lang.Integer` or `int` for `INT`).
+
+For some classes an annotation is required in order to map the class to a data type (e.g. `@DataTypeHint("DECIMAL(10, 2)")`
+to assign a fixed precision and scale for `java.math.BigDecimal`).
+
+**Declaration**
+
+<div class="codetabs" markdown="1">
+
+<div data-lang="Java" markdown="1">
+{% highlight java %}
+class User {
+
+ // extract fields automatically
+ public int age;
+ public String name;
+
+ // enrich the extraction with precision information
+ public @DataTypeHint("DECIMAL(10, 2)") BigDecimal totalBalance;
+
+ // enrich the extraction with forcing using a RAW type
+ public @DataTypeHint("RAW") Class<?> modelClass;
+}
+
+DataTypes.of(User.class);
+{% endhighlight %}
+</div>
+
+<div data-lang="Scala" markdown="1">
+{% highlight scala %}
+case class User(
+
+ // extract fields automatically
+ age: Int,
+ name: String,
+
+ // enrich the extraction with precision information
+ @DataTypeHint("DECIMAL(10, 2)") totalBalance: java.math.BigDecimal,
+
+ // enrich the extraction with forcing using a RAW type
+ @DataTypeHint("RAW") modelClass: Class[_]
+)
+
+DataTypes.of(classOf[User])
+{% endhighlight %}
+</div>
+
+</div>
+
+**Bridging to JVM Types**
+
+| Java Type | Input | Output | Remarks |
+|:-------------------------------------|:-----:|:------:|:----------------------------------------|
+|*class* | X | X | Originating class or subclasses (for input) or <br>superclasses (for output). *Default* |
+|`org.apache.flink.types.Row` | X | X | Represents the structured type as a row. |
+|`org.apache.flink.table.data.RowData` | X | X | Internal data structure. |
### Other Data Types
@@ -1160,6 +1267,48 @@ DataTypes.BOOLEAN()
|`java.lang.Boolean` | X | X | *Default* |
|`boolean` | X | (X) | Output only if type is not nullable. |
+#### `RAW`
+
+Data type of an arbitrary serialized type. This type is a black box within the table ecosystem
+and is only deserialized at the edges.
+
+The raw type is an extension to the SQL standard.
+
+**Declaration**
+
+<div class="codetabs" markdown="1">
+
+<div data-lang="SQL" markdown="1">
+{% highlight text %}
+RAW('class', 'snapshot')
+{% endhighlight %}
+</div>
+
+<div data-lang="Java/Scala" markdown="1">
+{% highlight java %}
+DataTypes.RAW(class, serializer)
+
+DataTypes.RAW(class)
+{% endhighlight %}
+</div>
+
+</div>
+
+The type can be declared using `RAW('class', 'snapshot')` where `class` is the originating class and
+`snapshot` is the serialized `TypeSerializerSnapshot` in Base64 encoding. Usually, the type string is not
+declared directly but is generated while persisting the type.
+
+In the API, the `RAW` type can be declared either by directly supplying a `Class` + `TypeSerializer` or
+by passing `Class` and letting the framework extract `Class` + `TypeSerializer` from there.
+
+**Bridging to JVM Types**
+
+| Java Type | Input | Output | Remarks |
+|:------------------|:-----:|:------:|:-------------------------------------------|
+|*class* | X | X | Originating class or subclasses (for input) or <br>superclasses (for output). *Default* |
+|`byte[]` | | X | |
+|`org.apache.flink.table.data.RawValueData` | X | X | Internal data structure. |
+
#### `NULL`
Data type for representing untyped `NULL` values.
@@ -1197,45 +1346,116 @@ DataTypes.NULL()
|`java.lang.Object` | X | X | *Default* |
|*any class* | | (X) | Any non-primitive type. |
-#### `RAW`
+Data Type Annotations
+---------------------
-Data type of an arbitrary serialized type. This type is a black box within the table ecosystem
-and is only deserialized at the edges.
+At many locations in the API, Flink tries to automatically extract a data type from class information using
+reflection to avoid repetitive manual schema work. However, extracting a data type reflectively is not always
+successful because logical information might be missing. Therefore, it might be necessary to add additional
+information close to a class or field declaration for supporting the extraction logic.
+
+The following table lists classes that can be implicitly mapped to a data type without requiring further information:
+
+| Class | Data Type |
+|:----------------------------|:------------------------------------|
+| `java.lang.String` | `STRING` |
+| `java.lang.Boolean` | `BOOLEAN` |
+| `boolean` | `BOOLEAN NOT NULL` |
+| `java.lang.Byte` | `TINYINT` |
+| `byte` | `TINYINT NOT NULL` |
+| `java.lang.Short` | `SMALLINT` |
+| `short` | `SMALLINT NOT NULL` |
+| `java.lang.Integer` | `INT` |
+| `int` | `INT NOT NULL` |
+| `java.lang.Long` | `BIGINT` |
+| `long` | `BIGINT NOT NULL` |
+| `java.lang.Float` | `FLOAT` |
+| `float` | `FLOAT NOT NULL` |
+| `java.lang.Double` | `DOUBLE` |
+| `double` | `DOUBLE NOT NULL` |
+| `java.sql.Date` | `DATE` |
+| `java.time.LocalDate` | `DATE` |
+| `java.sql.Time` | `TIME(0)` |
+| `java.time.LocalTime` | `TIME(9)` |
+| `java.sql.Timestamp` | `TIMESTAMP(9)` |
+| `java.time.LocalDateTime` | `TIMESTAMP(9)` |
+| `java.time.OffsetDateTime` | `TIMESTAMP(9) WITH TIME ZONE` |
+| `java.time.Instant` | `TIMESTAMP(9) WITH LOCAL TIME ZONE` |
+| `java.time.Duration` | `INTERVAL SECOND(9)` |
+| `java.time.Period` | `INTERVAL YEAR(4) TO MONTH` |
+| `byte[]` | `BYTES` |
+| `T[]` | `ARRAY<T>` |
+| `java.util.Map<K, V>` | `MAP<K, V>` |
+| structured type `T` | anonymous structured type `T` |
+
+Other JVM bridging classes mentioned in this document require a `@DataTypeHint` annotation.
+
+_Data type hints_ can parameterize or replace the default extraction logic of individual function parameters
+and return types, structured classes, or fields of structured classes. An implementer can choose to what
+extent the default extraction logic should be modified by declaring a `@DataTypeHint` annotation.
+
+The `@DataTypeHint` annotation provides a set of optional hint parameters. Some of those parameters are shown in the
+following example. More information can be found in the documentation of the annotation class.
-The raw type is an extension to the SQL standard.
+<div class="codetabs" markdown="1">
-**Declaration**
+<div data-lang="Java" markdown="1">
+{% highlight java %}
+import org.apache.flink.table.annotation.DataTypeHint;
-<div class="codetabs" markdown="1">
+class User {
-<div data-lang="SQL" markdown="1">
-{% highlight text %}
-RAW('class', 'snapshot')
+ // defines an INT data type with a default conversion class `java.lang.Integer`
+ public @DataTypeHint("INT") Object o;
+
+ // defines a TIMESTAMP data type of millisecond precision with an explicit conversion class
+ public @DataTypeHint(value = "TIMESTAMP(3)", bridgedTo = java.sql.Timestamp.class) Object ts;
+
+ // enrich the extraction with forcing using a RAW type
+ public @DataTypeHint("RAW") Class<?> modelClass;
+
+ // defines that all occurrences of java.math.BigDecimal (also in nested fields) will be
+ // extracted as DECIMAL(12, 2)
+ public @DataTypeHint(defaultDecimalPrecision = 12, defaultDecimalScale = 2) AccountStatement stmt;
+
+ // defines that whenever a type cannot be mapped to a data type, instead of throwing
+ // an exception, always treat it as a RAW type
+ public @DataTypeHint(allowRawGlobally = HintFlag.TRUE) ComplexModel model;
+}
{% endhighlight %}
</div>
-<div data-lang="Java/Scala" markdown="1">
+<div data-lang="Scala" markdown="1">
{% highlight java %}
-DataTypes.RAW(class, serializer)
+import org.apache.flink.table.annotation.DataTypeHint
-DataTypes.RAW(typeInfo)
-{% endhighlight %}
-</div>
+class User {
-</div>
+ // defines an INT data type with a default conversion class `java.lang.Integer`
+ @DataTypeHint("INT")
+ var o: AnyRef
-The type can be declared using `RAW('class', 'snapshot')` where `class` is the originating class and
-`snapshot` is the serialized `TypeSerializerSnapshot` in Base64 encoding. Usually, the type string is not
-declared directly but is generated while persisting the type.
+ // defines a TIMESTAMP data type of millisecond precision with an explicit conversion class
+ @DataTypeHint(value = "TIMESTAMP(3)", bridgedTo = classOf[java.sql.Timestamp])
+ var ts: AnyRef
-In the API, the `RAW` type can be declared either by directly supplying a `Class` + `TypeSerializer` or
-by passing `TypeInformation` and let the framework extract `Class` + `TypeSerializer` from there.
+ // enrich the extraction with forcing using a RAW type
+ @DataTypeHint("RAW")
+ var modelClass: Class[_]
-**Bridging to JVM Types**
+ // defines that all occurrences of java.math.BigDecimal (also in nested fields) will be
+ // extracted as DECIMAL(12, 2)
+ @DataTypeHint(defaultDecimalPrecision = 12, defaultDecimalScale = 2)
+ var stmt: AccountStatement
-| Java Type | Input | Output | Remarks |
-|:------------------|:-----:|:------:|:-------------------------------------------|
-|*class* | X | X | Originating class or subclasses (for input) or superclasses (for output). *Default* |
-|`byte[]` | | X | |
+ // defines that whenever a type cannot be mapped to a data type, instead of throwing
+ // an exception, always treat it as a RAW type
+ @DataTypeHint(allowRawGlobally = HintFlag.TRUE)
+ var model: ComplexModel
+}
+{% endhighlight %}
+</div>
+
+</div>
{% top %}
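
The implicit class-to-data-type table and the field rules for unregistered structured types added by this commit can be illustrated with a small, Flink-free sketch. It is not Flink's extraction logic: `ImplicitMappingSketch`, `sqlTypeOf`, and `describe` are hypothetical names, only a subset of the table is reproduced, and only `public` fields are considered (getters, setters, and constructors are ignored). It uses plain Java reflection to show which fields of a POJO would map implicitly and which would need a `@DataTypeHint`.

```java
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
import java.math.BigDecimal;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.util.LinkedHashMap;
import java.util.Map;

// Illustration only: a hypothetical stand-in, not Flink's reflective extractor.
public class ImplicitMappingSketch {

    // Subset of the implicit mapping table from the documentation.
    private static final Map<Class<?>, String> IMPLICIT = new LinkedHashMap<>();

    static {
        IMPLICIT.put(String.class, "STRING");
        IMPLICIT.put(Boolean.class, "BOOLEAN");
        IMPLICIT.put(boolean.class, "BOOLEAN NOT NULL");
        IMPLICIT.put(Integer.class, "INT");
        IMPLICIT.put(int.class, "INT NOT NULL");
        IMPLICIT.put(Long.class, "BIGINT");
        IMPLICIT.put(long.class, "BIGINT NOT NULL");
        IMPLICIT.put(LocalDate.class, "DATE");
        IMPLICIT.put(LocalDateTime.class, "TIMESTAMP(9)");
        IMPLICIT.put(byte[].class, "BYTES");
    }

    /** Returns the type string for a class, or null if a hint would be needed. */
    static String sqlTypeOf(Class<?> clazz) {
        return IMPLICIT.get(clazz);
    }

    /** Maps each public, non-static, non-transient field to a type string. */
    static Map<String, String> describe(Class<?> pojo) {
        Map<String, String> result = new LinkedHashMap<>();
        for (Field field : pojo.getDeclaredFields()) {
            int mods = field.getModifiers();
            // Static and transient fields are ignored, as the rules above state.
            if (Modifier.isStatic(mods) || Modifier.isTransient(mods)) {
                continue;
            }
            // Simplification: this sketch only looks at public fields.
            if (!Modifier.isPublic(mods) || field.isSynthetic()) {
                continue;
            }
            String type = sqlTypeOf(field.getType());
            result.put(field.getName(), type != null ? type : "<needs @DataTypeHint>");
        }
        return result;
    }

    /** Example POJO modeled on the `User` class in the documentation. */
    public static class User {
        public int age;
        public String name;
        public BigDecimal totalBalance; // precision and scale cannot be inferred
        public static int instances;    // ignored: static
        public transient String cache;  // ignored: transient
    }

    public static void main(String[] args) {
        describe(User.class).forEach((name, type) -> System.out.println(name + ": " + type));
    }
}
```

In this sketch `totalBalance` is reported as needing a hint, which mirrors the documentation's remark that `java.math.BigDecimal` requires an annotation such as `@DataTypeHint("DECIMAL(10, 2)")` to fix precision and scale.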