Posted to dev@flink.apache.org by "Timo Walther (Jira)" <ji...@apache.org> on 2022/09/12 14:59:00 UTC

[jira] [Created] (FLINK-29267) Support external type systems in DDL

Timo Walther created FLINK-29267:
------------------------------------

             Summary: Support external type systems in DDL
                 Key: FLINK-29267
                 URL: https://issues.apache.org/jira/browse/FLINK-29267
             Project: Flink
          Issue Type: Improvement
          Components: Connectors / JDBC, Formats (JSON, Avro, Parquet, ORC, SequenceFile), Table SQL / Ecosystem
            Reporter: Timo Walther
            Assignee: Timo Walther


Many connectors and formats require supporting external data types. Postgres users request UUID support, Avro users require enum support, etc.

FLINK-19869 implemented support for Postgres UUIDs poorly and even impacts pipelines with regular strings.

The long-term solution should be user-defined types in Flink. This is, however, a larger effort that requires a FLIP and more resources.

As a mid-term solution, we should offer a consistent approach based on DDL options that allows defining a mapping from the Flink type system to the external type system. I suggest the following:

{code}
CREATE TABLE MyTable (
...
) WITH(
  'mapping.data-types' = '<Flink field name>: <External field data type>'
)
{code}

The option maps each listed Flink field, and thereby its Flink data type, to an external data type. The external data type should be parsable from a string. This works for most connectors and formats (e.g. an Avro schema string).


Examples:

{code}
CREATE TABLE MyTable (
  regular_col STRING,
  uuid_col STRING,
  point_col ARRAY<DOUBLE>,
  box_col ARRAY<ARRAY<DOUBLE>>
) WITH(
  'mapping.data-types' = 'uuid_col: uuid, point_col: point, box_col: box'
)
{code}

We provide a table of supported mapping data types. E.g. the {{point}} type is always mapped to {{ARRAY<DOUBLE>}}. In general, we choose the data type in Flink that comes closest to the required functionality.
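
To make the proposed option format more concrete, here is a minimal sketch of how a connector or format could turn the {{mapping.data-types}} value from the example above into a lookup from Flink field name to external type name. The class {{MappingOptionParser}} and its {{parse}} method are hypothetical and not part of Flink. The sketch also assumes that entries are separated by commas and that an external type name contains no comma, which would not hold for something like a full Avro schema string.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical helper for illustration only; not an existing Flink class. */
final class MappingOptionParser {

    private MappingOptionParser() {}

    /**
     * Parses a value such as "uuid_col: uuid, point_col: point" into a map
     * from Flink field name to external type name, keeping declaration order.
     * Assumption: external type names contain no comma.
     */
    static Map<String, String> parse(String optionValue) {
        Map<String, String> mapping = new LinkedHashMap<>();
        if (optionValue == null || optionValue.trim().isEmpty()) {
            return mapping; // no mapping configured
        }
        for (String entry : optionValue.split(",")) {
            // Split on the first colon only, so the type part may contain colons.
            int colon = entry.indexOf(':');
            if (colon < 0) {
                throw new IllegalArgumentException(
                        "Expected '<field>: <type>' but got: '" + entry.trim() + "'");
            }
            String field = entry.substring(0, colon).trim();
            String type = entry.substring(colon + 1).trim();
            if (field.isEmpty() || type.isEmpty()) {
                throw new IllegalArgumentException(
                        "Empty field name or type in: '" + entry.trim() + "'");
            }
            if (mapping.put(field, type) != null) {
                throw new IllegalArgumentException(
                        "Duplicate mapping for field: " + field);
            }
        }
        return mapping;
    }

    public static void main(String[] args) {
        // Option value taken from the DDL example above.
        System.out.println(parse("uuid_col: uuid, point_col: point, box_col: box"));
    }
}
```

Fields that do not appear in the option, such as {{regular_col}}, are simply absent from the resulting map, so a connector would keep treating them according to their declared Flink type.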


Future work:

In theory, we can also offer a mapping of field names, since a Flink column name may need to differ from the name used in the external system.

{code}
CREATE TABLE MyTable (
...
) WITH(
  'mapping.names' = '<Flink field name>: <External field name>'
)
{code}


