Posted to commits@flink.apache.org by tw...@apache.org on 2020/06/16 07:40:48 UTC

[flink] 02/02: [FLINK-18248][docs] Update data type documentation for 1.11

This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git

commit b9c33dd03685c7c1a6c440808fb65568a5ae50eb
Author: Timo Walther <tw...@apache.org>
AuthorDate: Thu Jun 11 14:06:21 2020 +0200

    [FLINK-18248][docs] Update data type documentation for 1.11
    
    This closes #12606.
---
 docs/dev/table/types.md | 354 +++++++++++++++++++++++++++++++++++++++---------
 1 file changed, 287 insertions(+), 67 deletions(-)
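
Note on the structured-type rules added in this patch: the requirements for the implementation class of an unregistered structured type can be approximated with plain Java reflection. The following is a minimal sketch, not Flink's extractor. The class `StructuredTypeCheck`, the method `violations`, and the sample classes `User` and `Hidden` are hypothetical names used for illustration. The sketch checks only a simplified subset of the rules: the class is public, static if nested, and not abstract; it has a public zero-argument constructor; and its non-static, non-transient fields are public. It ignores the getter/setter and full-constructor alternatives.

```java
import java.lang.reflect.Constructor;
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
import java.util.ArrayList;
import java.util.List;

public class StructuredTypeCheck {

    // Hypothetical POJO that satisfies the simplified rules below.
    public static class User {
        public int age;
        public String name;
        public transient Object cache; // ignored because it is transient

        public User() {}
    }

    // Hypothetical POJO that violates several rules: not public, no public
    // zero-argument constructor, and a private field without accessors.
    static class Hidden {
        private int x;

        Hidden(int x) {
            this.x = x;
        }
    }

    // Returns human-readable rule violations for a candidate class
    // (a simplified subset of the documented requirements).
    static List<String> violations(Class<?> clazz) {
        List<String> problems = new ArrayList<>();
        int mods = clazz.getModifiers();
        if (!Modifier.isPublic(mods)) {
            problems.add("class is not public");
        }
        if (clazz.isMemberClass() && !Modifier.isStatic(mods)) {
            problems.add("nested class is not static");
        }
        if (Modifier.isAbstract(mods)) {
            problems.add("class is abstract");
        }
        // getConstructors() returns only public constructors.
        boolean hasDefaultConstructor = false;
        for (Constructor<?> c : clazz.getConstructors()) {
            if (c.getParameterCount() == 0) {
                hasDefaultConstructor = true;
            }
        }
        if (!hasDefaultConstructor) {
            problems.add("no public zero-argument constructor");
        }
        for (Field f : clazz.getDeclaredFields()) {
            int fieldMods = f.getModifiers();
            if (Modifier.isStatic(fieldMods) || Modifier.isTransient(fieldMods)) {
                continue; // static and transient fields are ignored
            }
            if (!Modifier.isPublic(fieldMods)) {
                problems.add("field '" + f.getName() + "' is not public");
            }
        }
        return problems;
    }

    public static void main(String[] args) {
        System.out.println(violations(User.class));   // []
        System.out.println(violations(Hidden.class));
    }
}
```

Here `User` reports no violations. `Hidden` reports three: the class is not public, it has no public zero-argument constructor, and it has a private field.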

diff --git a/docs/dev/table/types.md b/docs/dev/table/types.md
index 4dc42af..16991d6 100644
--- a/docs/dev/table/types.md
+++ b/docs/dev/table/types.md
@@ -36,7 +36,7 @@ Starting with Flink 1.9, the Table & SQL API will receive a new type system that
 solution for API stability and standard compliance.
 
 Reworking the type system is a major effort that touches almost all user-facing interfaces. Therefore, its
-introduction spans multiple releases, and the community aims to finish this effort by Flink 1.10.
+introduction spans multiple releases, and the community aims to finish this effort by Flink 1.12.
 
 Due to the simultaneous addition of a new planner for table programs (see [FLINK-11439](https://issues.apache.org/jira/browse/FLINK-11439)),
 not every combination of planner and data type is supported. Furthermore, planners might not support every
@@ -211,14 +211,15 @@ The following data types are supported:
 | `DOUBLE` | |
 | `DATE` | |
 | `TIME` | Supports only a precision of `0`. |
-| `TIMESTAMP` | Supports only a precision of `3`. |
-| `TIMESTAMP WITH LOCAL TIME ZONE` | Supports only a precision of `3`. |
+| `TIMESTAMP` | |
+| `TIMESTAMP WITH LOCAL TIME ZONE` | |
 | `INTERVAL` | Supports only interval of `MONTH` and `SECOND(3)`. |
 | `ARRAY` | |
 | `MULTISET` | |
 | `MAP` | |
 | `ROW` | |
 | `RAW` | |
+| structured types | Currently only exposed in user-defined functions. |
 
 Limitations
 -----------
@@ -227,10 +228,7 @@ Limitations
 have not been updated to the new type system yet. Use the string representations declared in
 the [old planner section](#old-planner).
 
-**Connector Descriptors and SQL Client**: Descriptor string representations have not been updated to the new
-type system yet. Use the string representation declared in the [Connect to External Systems section](./connect.html#type-strings)
-
-**User-defined Functions**: User-defined functions cannot declare a data type yet.
+**User-defined Functions**: User-defined aggregate functions cannot declare a data type yet. Scalar and table functions fully support data types.
 
 List of Data Types
 ------------------
@@ -267,10 +265,11 @@ and `2,147,483,647` (both inclusive). If no length is specified, `n` is equal to
 
 **Bridging to JVM Types**
 
-| Java Type          | Input | Output | Remarks                 |
-|:-------------------|:-----:|:------:|:------------------------|
-|`java.lang.String`  | X     | X      | *Default*               |
-|`byte[]`            | X     | X      | Assumes UTF-8 encoding. |
+| Java Type                               | Input | Output | Remarks                  |
+|:----------------------------------------|:-----:|:------:|:-------------------------|
+|`java.lang.String`                       | X     | X      | *Default*                |
+|`byte[]`                                 | X     | X      | Assumes UTF-8 encoding.  |
+|`org.apache.flink.table.data.StringData` | X     | X      | Internal data structure. |
 
 #### `VARCHAR` / `STRING`
 
@@ -306,10 +305,11 @@ between `1` and `2,147,483,647` (both inclusive). If no length is specified, `n`
 
 **Bridging to JVM Types**
 
-| Java Type          | Input | Output | Remarks                 |
-|:-------------------|:-----:|:------:|:------------------------|
-|`java.lang.String`  | X     | X      | *Default*               |
-|`byte[]`            | X     | X      | Assumes UTF-8 encoding. |
+| Java Type                               | Input | Output | Remarks                  |
+|:----------------------------------------|:-----:|:------:|:-------------------------|
+|`java.lang.String`                       | X     | X      | *Default*                |
+|`byte[]`                                 | X     | X      | Assumes UTF-8 encoding.  |
+|`org.apache.flink.table.data.StringData` | X     | X      | Internal data structure. |
 
 ### Binary Strings
 
@@ -428,9 +428,10 @@ The default value for `s` is `0`.
 
 **Bridging to JVM Types**
 
-| Java Type             | Input | Output | Remarks                 |
-|:----------------------|:-----:|:------:|:------------------------|
-|`java.math.BigDecimal` | X     | X      | *Default*               |
+| Java Type                                | Input | Output | Remarks                  |
+|:-----------------------------------------|:-----:|:------:|:-------------------------|
+|`java.math.BigDecimal`                    | X     | X      | *Default*                |
+|`org.apache.flink.table.data.DecimalData` | X     | X      | Internal data structure. |
 
 #### `TINYINT`
 
@@ -459,7 +460,7 @@ DataTypes.TINYINT()
 | Java Type          | Input | Output | Remarks                                      |
 |:-------------------|:-----:|:------:|:---------------------------------------------|
 |`java.lang.Byte`    | X     | X      | *Default*                                    |
-|`byte`              | X     | (X)    | Output only if type is not nullable. |
+|`byte`              | X     | (X)    | Output only if type is not nullable.         |
 
 #### `SMALLINT`
 
@@ -488,7 +489,7 @@ DataTypes.SMALLINT()
 | Java Type          | Input | Output | Remarks                                      |
 |:-------------------|:-----:|:------:|:---------------------------------------------|
 |`java.lang.Short`   | X     | X      | *Default*                                    |
-|`short`             | X     | (X)    | Output only if type is not nullable. |
+|`short`             | X     | (X)    | Output only if type is not nullable.         |
 
 #### `INT`
 
@@ -521,7 +522,7 @@ DataTypes.INT()
 | Java Type          | Input | Output | Remarks                                      |
 |:-------------------|:-----:|:------:|:---------------------------------------------|
 |`java.lang.Integer` | X     | X      | *Default*                                    |
-|`int`               | X     | (X)    | Output only if type is not nullable. |
+|`int`               | X     | (X)    | Output only if type is not nullable.         |
 
 #### `BIGINT`
 
@@ -551,7 +552,7 @@ DataTypes.BIGINT()
 | Java Type          | Input | Output | Remarks                                      |
 |:-------------------|:-----:|:------:|:---------------------------------------------|
 |`java.lang.Long`    | X     | X      | *Default*                                    |
-|`long`              | X     | (X)    | Output only if type is not nullable. |
+|`long`              | X     | (X)    | Output only if type is not nullable.         |
 
 ### Approximate Numerics
 
@@ -584,7 +585,7 @@ DataTypes.FLOAT()
 | Java Type          | Input | Output | Remarks                                      |
 |:-------------------|:-----:|:------:|:---------------------------------------------|
 |`java.lang.Float`   | X     | X      | *Default*                                    |
-|`float`             | X     | (X)    | Output only if type is not nullable. |
+|`float`             | X     | (X)    | Output only if type is not nullable.         |
 
 #### `DOUBLE`
 
@@ -617,7 +618,7 @@ DataTypes.DOUBLE()
 | Java Type          | Input | Output | Remarks                                      |
 |:-------------------|:-----:|:------:|:---------------------------------------------|
 |`java.lang.Double`  | X     | X      | *Default*                                    |
-|`double`            | X     | (X)    | Output only if type is not nullable. |
+|`double`            | X     | (X)    | Output only if type is not nullable.         |
 
 ### Date and Time
 
@@ -741,10 +742,11 @@ is specified, `p` is equal to `6`.
 
 **Bridging to JVM Types**
 
-| Java Type                | Input | Output | Remarks                                             |
-|:-------------------------|:-----:|:------:|:----------------------------------------------------|
-|`java.time.LocalDateTime` | X     | X      | *Default*                                           |
-|`java.sql.Timestamp`      | X     | X      |                                                     |
+| Java Type                                  | Input | Output | Remarks                  |
+|:-------------------------------------------|:-----:|:------:|:-------------------------|
+|`java.time.LocalDateTime`                   | X     | X      | *Default*                |
+|`java.sql.Timestamp`                        | X     | X      |                          |
+|`org.apache.flink.table.data.TimestampData` | X     | X      | Internal data structure. |
 
 #### `TIMESTAMP WITH TIME ZONE`
 
@@ -837,6 +839,7 @@ of digits of fractional seconds (*precision*). `p` must have a value between `0`
 |`int`               | X     | (X)    | Describes the number of seconds since epoch.<br>Output only if type is not nullable. |
 |`java.lang.Long`    | X     | X      | Describes the number of milliseconds since epoch. |
 |`long`              | X     | (X)    | Describes the number of milliseconds since epoch.<br>Output only if type is not nullable. |
+|`org.apache.flink.table.data.TimestampData` | X     | X      | Internal data structure. |
 
 #### `INTERVAL YEAR TO MONTH`
 
@@ -1001,9 +1004,10 @@ equivalent to `ARRAY<INT>`.
 
 **Bridging to JVM Types**
 
-| Java Type | Input | Output | Remarks                           |
-|:----------|:-----:|:------:|:----------------------------------|
-|*t*`[]`    | (X)   | (X)    | Depends on the subtype. *Default* |
+| Java Type                              | Input | Output | Remarks                           |
+|:---------------------------------------|:-----:|:------:|:----------------------------------|
+|*t*`[]`                                 | (X)   | (X)    | Depends on the subtype. *Default* |
+|`org.apache.flink.table.data.ArrayData` | X     | X      | Internal data structure.          |
 
 #### `MAP`
 
@@ -1037,10 +1041,11 @@ and `vt` is the data type of the value elements.
 
 **Bridging to JVM Types**
 
-| Java Type                             | Input | Output | Remarks   |
-|:--------------------------------------|:-----:|:------:|:----------|
-| `java.util.Map<kt, vt>`               | X     | X      | *Default* |
-| *subclass* of `java.util.Map<kt, vt>` | X     |        |           |
+| Java Type                             | Input | Output | Remarks                  |
+|:--------------------------------------|:-----:|:------:|:-------------------------|
+| `java.util.Map<kt, vt>`               | X     | X      | *Default*                |
+| *subclass* of `java.util.Map<kt, vt>` | X     |        |                          |
+|`org.apache.flink.table.data.MapData`  | X     | X      | Internal data structure. |
 
 #### `MULTISET`
 
@@ -1076,10 +1081,11 @@ equivalent to `MULTISET<INT>`.
 
 **Bridging to JVM Types**
 
-| Java Type                            | Input | Output | Remarks                                                  |
-|:-------------------------------------|:-----:|:------:|:---------------------------------------------------------|
-|`java.util.Map<t, java.lang.Integer>` | X     | X      | Assigns each value to an integer multiplicity. *Default* |
-| *subclass* of `java.util.Map<kt, java.lang.Integer>` | X     |        | Assigns each value to an integer multiplicity. |
+| Java Type                             | Input | Output | Remarks                                                  |
+|:--------------------------------------|:-----:|:------:|:---------------------------------------------------------|
+|`java.util.Map<t, java.lang.Integer>`  | X     | X      | Assigns each value to an integer multiplicity. *Default* |
+| *subclass* of `java.util.Map<t, java.lang.Integer>` | X     |        |                                                          |
+|`org.apache.flink.table.data.MapData`  | X     | X      | Internal data structure.                                 |
 
 #### `ROW`
 
@@ -1125,9 +1131,110 @@ equivalent to `ROW<myField INT, myOtherField BOOLEAN>`.
 
 **Bridging to JVM Types**
 
-| Java Type                   | Input | Output | Remarks                 |
-|:----------------------------|:-----:|:------:|:------------------------|
-|`org.apache.flink.types.Row` | X     | X      | *Default*               |
+| Java Type                            | Input | Output | Remarks                  |
+|:-------------------------------------|:-----:|:------:|:-------------------------|
+|`org.apache.flink.types.Row`          | X     | X      | *Default*                |
+|`org.apache.flink.table.data.RowData` | X     | X      | Internal data structure. |
+
+### User-Defined Data Types
+
+<span class="label label-danger">Attention</span> User-defined data types are not fully supported yet. They are
+currently (as of Flink 1.11) only exposed as unregistered structured types in parameters and return types of functions.
+
+A structured type is similar to an object in an object-oriented programming language. It contains
+zero, one or more attributes. Each attribute consists of a name and a type.
+
+There are two kinds of structured types:
+
+- Types that are stored in a catalog and are identified by a _catalog identifier_ (like `cat.db.MyType`). These
+correspond to the SQL standard definition of structured types.
+
+- Anonymously defined, unregistered types (usually reflectively extracted) that are identified by
+an _implementation class_ (like `com.myorg.model.MyType`). Those are useful when programmatically
+defining a table program. They enable reusing existing JVM classes without manually defining the
+schema of a data type again.
+
+#### Registered Structured Types
+
+Currently, registered structured types are not supported. Thus, they cannot be stored in a catalog
+or referenced in a `CREATE TABLE` DDL.
+
+#### Unregistered Structured Types
+
+Unregistered structured types can be created from regular POJOs (Plain Old Java Objects) using automatic reflective extraction.
+
+The implementation class of a structured type must meet the following requirements:
+- The class must be globally accessible, which means it must be declared `public`, `static` (if nested), and not `abstract`.
+- The class must offer a default constructor with zero arguments or a full constructor that assigns all
+fields.
+- All fields of the class must be readable by either `public` declaration or a getter that follows common
+coding style such as `getField()`, `isField()`, `field()`.
+- All fields of the class must be writable by either `public` declaration, fully assigning constructor,
+or a setter that follows common coding style such as `setField(...)`, `field(...)`.
+- All fields must be mapped to a data type either implicitly via reflective extraction or explicitly
+using the `@DataTypeHint` [annotations](#data-type-annotations).
+- Fields that are declared `static` or `transient` are ignored.
+
+The reflective extraction supports arbitrary nesting of fields as long as a field type does not
+(transitively) refer to itself.
+
+The declared field class (e.g. `public int age;`) must be contained in the list of supported JVM
+bridging classes defined for every data type in this document (e.g. `java.lang.Integer` or `int` for `INT`).
+
+For some classes an annotation is required in order to map the class to a data type (e.g. `@DataTypeHint("DECIMAL(10, 2)")`
+to assign a fixed precision and scale for `java.math.BigDecimal`).
+
+**Declaration**
+
+<div class="codetabs" markdown="1">
+
+<div data-lang="Java" markdown="1">
+{% highlight java %}
+class User {
+
+    // extract fields automatically
+    public int age;
+    public String name;
+
+    // enrich the extraction with precision information
+    public @DataTypeHint("DECIMAL(10, 2)") BigDecimal totalBalance;
+
+    // enrich the extraction by forcing a RAW type
+    public @DataTypeHint("RAW") Class<?> modelClass;
+}
+
+DataTypes.of(User.class);
+{% endhighlight %}
+</div>
+
+<div data-lang="Scala" markdown="1">
+{% highlight scala %}
+case class User(
+
+    // extract fields automatically
+    age: Int,
+    name: String,
+
+    // enrich the extraction with precision information
+    @DataTypeHint("DECIMAL(10, 2)") totalBalance: java.math.BigDecimal,
+
+    // enrich the extraction by forcing a RAW type
+    @DataTypeHint("RAW") modelClass: Class[_]
+)
+
+DataTypes.of(classOf[User])
+{% endhighlight %}
+</div>
+
+</div>
+
+**Bridging to JVM Types**
+
+| Java Type                            | Input | Output | Remarks                                 |
+|:-------------------------------------|:-----:|:------:|:----------------------------------------|
+|*class*                               | X     | X      | Originating class or subclasses (for input) or <br>superclasses (for output). *Default* |
+|`org.apache.flink.types.Row`          | X     | X      | Represents the structured type as a row. |
+|`org.apache.flink.table.data.RowData` | X     | X      | Internal data structure.                |
 
 ### Other Data Types
 
@@ -1160,6 +1267,48 @@ DataTypes.BOOLEAN()
 |`java.lang.Boolean` | X     | X      | *Default*                            |
 |`boolean`           | X     | (X)    | Output only if type is not nullable. |
 
+#### `RAW`
+
+Data type of an arbitrary serialized type. This type is a black box within the table ecosystem
+and is only deserialized at the edges.
+
+The raw type is an extension to the SQL standard.
+
+**Declaration**
+
+<div class="codetabs" markdown="1">
+
+<div data-lang="SQL" markdown="1">
+{% highlight text %}
+RAW('class', 'snapshot')
+{% endhighlight %}
+</div>
+
+<div data-lang="Java/Scala" markdown="1">
+{% highlight java %}
+DataTypes.RAW(class, serializer)
+
+DataTypes.RAW(class)
+{% endhighlight %}
+</div>
+
+</div>
+
+The type can be declared using `RAW('class', 'snapshot')` where `class` is the originating class and
+`snapshot` is the serialized `TypeSerializerSnapshot` in Base64 encoding. Usually, the type string is not
+declared directly but is generated while persisting the type.
+
+In the API, the `RAW` type can be declared either by directly supplying a `Class` + `TypeSerializer` or
+by passing only a `Class` and letting the framework derive the `TypeSerializer` from it.
+
+**Bridging to JVM Types**
+
+| Java Type         | Input | Output | Remarks                              |
+|:------------------|:-----:|:------:|:-------------------------------------------|
+|*class*            | X     | X      | Originating class or subclasses (for input) or <br>superclasses (for output). *Default* |
+|`byte[]`           |       | X      |                                      |
+|`org.apache.flink.table.data.RawValueData` | X     | X      | Internal data structure. |
+
 #### `NULL`
 
 Data type for representing untyped `NULL` values.
@@ -1197,45 +1346,116 @@ DataTypes.NULL()
 |`java.lang.Object` | X     | X      | *Default*                            |
 |*any class*        |       | (X)    | Any non-primitive type.              |
 
-#### `RAW`
+Data Type Annotations
+---------------------
 
-Data type of an arbitrary serialized type. This type is a black box within the table ecosystem
-and is only deserialized at the edges.
+At many locations in the API, Flink tries to automatically extract data types from class information using
+reflection to avoid repetitive manual schema work. However, extracting a data type reflectively is not always
+successful because logical information might be missing. Therefore, it might be necessary to provide additional
+information close to a class or field declaration to support the extraction logic.
+
+The following table lists classes that can be implicitly mapped to a data type without requiring further information:
+
+| Class                       | Data Type                           |
+|:----------------------------|:------------------------------------|
+| `java.lang.String`          | `STRING`                            |
+| `java.lang.Boolean`         | `BOOLEAN`                           |
+| `boolean`                   | `BOOLEAN NOT NULL`                  |
+| `java.lang.Byte`            | `TINYINT`                           |
+| `byte`                      | `TINYINT NOT NULL`                  |
+| `java.lang.Short`           | `SMALLINT`                          |
+| `short`                     | `SMALLINT NOT NULL`                 |
+| `java.lang.Integer`         | `INT`                               |
+| `int`                       | `INT NOT NULL`                      |
+| `java.lang.Long`            | `BIGINT`                            |
+| `long`                      | `BIGINT NOT NULL`                   |
+| `java.lang.Float`           | `FLOAT`                             |
+| `float`                     | `FLOAT NOT NULL`                    |
+| `java.lang.Double`          | `DOUBLE`                            |
+| `double`                    | `DOUBLE NOT NULL`                   |
+| `java.sql.Date`             | `DATE`                              |
+| `java.time.LocalDate`       | `DATE`                              |
+| `java.sql.Time`             | `TIME(0)`                           |
+| `java.time.LocalTime`       | `TIME(9)`                           |
+| `java.sql.Timestamp`        | `TIMESTAMP(9)`                      |
+| `java.time.LocalDateTime`   | `TIMESTAMP(9)`                      |
+| `java.time.OffsetDateTime`  | `TIMESTAMP(9) WITH TIME ZONE`       |
+| `java.time.Instant`         | `TIMESTAMP(9) WITH LOCAL TIME ZONE` |
+| `java.time.Duration`        | `INTERVAL SECOND(9)`                |
+| `java.time.Period`          | `INTERVAL YEAR(4) TO MONTH`         |
+| `byte[]`                    | `BYTES`                             |
+| `T[]`                       | `ARRAY<T>`                          |
+| `java.util.Map<K, V>`       | `MAP<K, V>`                         |
+| structured type `T`         | anonymous structured type `T`       |
+
+Other JVM bridging classes mentioned in this document require a `@DataTypeHint` annotation.
+
+_Data type hints_ can parameterize or replace the default extraction logic of individual function parameters
+and return types, structured classes, or fields of structured classes. An implementer can choose to what
+extent the default extraction logic should be modified by declaring a `@DataTypeHint` annotation.
+
+The `@DataTypeHint` annotation provides a set of optional hint parameters. Some of those parameters are shown in the
+following example. More information can be found in the documentation of the annotation class.
 
-The raw type is an extension to the SQL standard.
+<div class="codetabs" markdown="1">
 
-**Declaration**
+<div data-lang="Java" markdown="1">
+{% highlight java %}
+import org.apache.flink.table.annotation.DataTypeHint;
 
-<div class="codetabs" markdown="1">
+class User {
 
-<div data-lang="SQL" markdown="1">
-{% highlight text %}
-RAW('class', 'snapshot')
+    // defines an INT data type with a default conversion class `java.lang.Integer`
+    public @DataTypeHint("INT") Object o1;
+
+    // defines a TIMESTAMP data type of millisecond precision with an explicit conversion class
+    public @DataTypeHint(value = "TIMESTAMP(3)", bridgedTo = java.sql.Timestamp.class) Object o2;
+
+    // enrich the extraction by forcing a RAW type
+    public @DataTypeHint("RAW") Class<?> modelClass;
+
+    // defines that all occurrences of java.math.BigDecimal (also in nested fields) will be
+    // extracted as DECIMAL(12, 2)
+    public @DataTypeHint(defaultDecimalPrecision = 12, defaultDecimalScale = 2) AccountStatement stmt;
+
+    // defines that whenever a type cannot be mapped to a data type, instead of throwing
+    // an exception, always treat it as a RAW type
+    public @DataTypeHint(allowRawGlobally = org.apache.flink.table.annotation.HintFlag.TRUE) ComplexModel model;
+}
 {% endhighlight %}
 </div>
 
-<div data-lang="Java/Scala" markdown="1">
+<div data-lang="Scala" markdown="1">
 {% highlight java %}
-DataTypes.RAW(class, serializer)
+import org.apache.flink.table.annotation.{DataTypeHint, HintFlag}
 
-DataTypes.RAW(typeInfo)
-{% endhighlight %}
-</div>
+class User {
 
-</div>
+    // defines an INT data type with a default conversion class `java.lang.Integer`
+    @DataTypeHint("INT")
+    var o1: AnyRef = _
 
-The type can be declared using `RAW('class', 'snapshot')` where `class` is the originating class and
-`snapshot` is the serialized `TypeSerializerSnapshot` in Base64 encoding. Usually, the type string is not
-declared directly but is generated while persisting the type.
+    // defines a TIMESTAMP data type of millisecond precision with an explicit conversion class
+    @DataTypeHint(value = "TIMESTAMP(3)", bridgedTo = classOf[java.sql.Timestamp])
+    var o2: AnyRef = _
 
-In the API, the `RAW` type can be declared either by directly supplying a `Class` + `TypeSerializer` or
-by passing `TypeInformation` and let the framework extract `Class` + `TypeSerializer` from there.
+    // enrich the extraction by forcing a RAW type
+    @DataTypeHint("RAW")
+    var modelClass: Class[_] = _
 
-**Bridging to JVM Types**
+    // defines that all occurrences of java.math.BigDecimal (also in nested fields) will be
+    // extracted as DECIMAL(12, 2)
+    @DataTypeHint(defaultDecimalPrecision = 12, defaultDecimalScale = 2)
+    var stmt: AccountStatement = _
 
-| Java Type         | Input | Output | Remarks                              |
-|:------------------|:-----:|:------:|:-------------------------------------------|
-|*class*            | X     | X      | Originating class or subclasses (for input) or superclasses (for output). *Default* |
-|`byte[]`           |       | X      |                                      |
+    // defines that whenever a type cannot be mapped to a data type, instead of throwing
+    // an exception, always treat it as a RAW type
+    @DataTypeHint(allowRawGlobally = HintFlag.TRUE)
+    var model: ComplexModel = _
+}
+{% endhighlight %}
+</div>
+
+</div>
 
 {% top %}
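
The implicit mappings in the new "Data Type Annotations" section can be pictured as a simple lookup. The sketch below is a hypothetical illustration, not Flink's extraction code. `ImplicitTypeMapping` and `implicitType` are made-up names, only a subset of the table rows is included, and structured types, `Map`, and `@DataTypeHint` handling are omitted. It illustrates two points from the table: primitives map to `NOT NULL` types, and `T[]` maps recursively to `ARRAY<T>`, with `byte[]` special-cased as `BYTES`.

```java
import java.math.BigDecimal;
import java.time.Instant;
import java.time.LocalDateTime;
import java.util.HashMap;
import java.util.Map;

public class ImplicitTypeMapping {

    // Subset of the implicit mapping table; values are SQL type strings.
    private static final Map<Class<?>, String> IMPLICIT = new HashMap<>();

    static {
        IMPLICIT.put(String.class, "STRING");
        IMPLICIT.put(Boolean.class, "BOOLEAN");
        IMPLICIT.put(boolean.class, "BOOLEAN NOT NULL");
        IMPLICIT.put(Integer.class, "INT");
        IMPLICIT.put(int.class, "INT NOT NULL");
        IMPLICIT.put(Long.class, "BIGINT");
        IMPLICIT.put(long.class, "BIGINT NOT NULL");
        IMPLICIT.put(LocalDateTime.class, "TIMESTAMP(9)");
        IMPLICIT.put(Instant.class, "TIMESTAMP(9) WITH LOCAL TIME ZONE");
        IMPLICIT.put(byte[].class, "BYTES");
    }

    // Returns the implicitly mapped type, or null if the class would need a @DataTypeHint.
    static String implicitType(Class<?> clazz) {
        String direct = IMPLICIT.get(clazz);
        if (direct != null) {
            return direct; // byte[] matches here, before the generic array rule
        }
        if (clazz.isArray()) {
            String element = implicitType(clazz.getComponentType());
            return element == null ? null : "ARRAY<" + element + ">";
        }
        return null;
    }

    public static void main(String[] args) {
        System.out.println(implicitType(int.class));        // INT NOT NULL
        System.out.println(implicitType(Integer[].class));  // ARRAY<INT>
        System.out.println(implicitType(BigDecimal.class)); // null: needs a hint such as DECIMAL(10, 2)
    }
}
```

Returning `null` for `java.math.BigDecimal` mirrors the rule that classes outside the table need a `@DataTypeHint`, for example to fix the precision and scale of a `DECIMAL`.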