You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@flink.apache.org by "Timo Walther (Jira)" <ji...@apache.org> on 2022/09/12 14:59:00 UTC
[jira] [Created] (FLINK-29267) Support external type systems in DDL
Timo Walther created FLINK-29267:
------------------------------------
Summary: Support external type systems in DDL
Key: FLINK-29267
URL: https://issues.apache.org/jira/browse/FLINK-29267
Project: Flink
Issue Type: Improvement
Components: Connectors / JDBC, Formats (JSON, Avro, Parquet, ORC, SequenceFile), Table SQL / Ecosystem
Reporter: Timo Walther
Assignee: Timo Walther
Many connectors and formats require supporting external data types. Postgres users request UUID support, Avro users require enum support, etc.
FLINK-19869 implemented support for Postgres UUIDs poorly and event impacts pipelines with regular strings.
The long-term solution should be user-defined types in Flink. This is however a bigger effort that requires a FLIP and a bigger amount of resources.
As a mid-term solution, we should offer a consistent approach based on DDL options that allows to define a mapping from Flink type system to the external type system. I suggest the following:
{code}
CREATE TABLE MyTable (
...
) WITH(
'mapping.data-types' = '<Flink field name>: <External field data type>'
)
{code}
The mapping defines a map from Flink data type to external data type. The external data type should be string parsable. This works for most connectors and formats (e.g. Avro schema string).
Examples:
{code}
CREATE TABLE MyTable (
regular_col STRING,
uuid_col STRING,
point_col ARRAY<DOUBLE>,
box_col ARRAY<ARRAY<DOUBLE>>
) WITH(
'mapping.data-types' = 'uuid_col: uuid, point_col: point, box_col: box'
)
{code}
We provide a table of supported mapping data types. E.g. the {{point}} type is always maped to {{ARRAY<DOUBLE>}}. In general we choose a data type in Flink that comes closest to the required functionality.
Future work:
In theory, we can also offer mapping of field names. It might be a requirement that Flink's column name is different from the external system's one.
{code}
CREATE TABLE MyTable (
...
) WITH(
'mapping.names' = '<Flink field name>: <External field name>'
)
{code}
--
This message was sent by Atlassian Jira
(v8.20.10#820010)