You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by "godfrey he (Jira)" <ji...@apache.org> on 2021/01/23 07:57:00 UTC

[jira] [Updated] (FLINK-21094) Support StreamExecSink json serialization/deserialization

     [ https://issues.apache.org/jira/browse/FLINK-21094?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

godfrey he updated FLINK-21094:
-------------------------------
    Description: 
Similar to StreamExecTableSourceScan, if a sink table's ddl is

{code:sql}
CREATE TABLE MySink (
  a bigint,
  b int,
  c varchar
) with (
  'connector' = 'filesystem',
  'path' = '/tmp',
  'format' = 'testcsv'
)
{code}

Its corresponding StreamExecSink's json representation looks like:

{code:json}
{
 {
    "id": 2,
    "description": "Sink(table=[default_catalog.default_database.MySink], fields=[a, b, c])",
    "class": "org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecSink",
    "dynamicTableSink": {
        "identifier": {
            "catalogName": "default_catalog",
            "databaseName": "default_database",
            "tableName": "MySink"
        },
        "catalogTable": {
            "connector": "filesystem",
            "format": "testcsv",
            "path": "/tmp",
            "schema.0.name": "a",
            "schema.0.data-type": "BIGINT",
            "schema.1.name": "b",
            "schema.1.data-type": "INT",
            "schema.2.name": "c",
            "schema.2.data-type": "VARCHAR(2147483647)"
        },
        "configuration": {}
    },
    "inputChangelogMode": [
        "INSERT"
    ],
    "inputEdges": [
        {
            "requiredShuffle": {
                "type": "UNKNOWN"
            },
            "damBehavior": "PIPELINED",
            "priority": 0
        }
    ],
    "outputType": "ROW<`a` BIGINT, `b` INT, `c` VARCHAR(2147483647)>",
    "inputs": [
        1
    ]
}
}
{code}

All properties of the catalog table will be serialized, so that the table sink instance can be created based on those properties from json. 


> Support StreamExecSink json serialization/deserialization
> ---------------------------------------------------------
>
>                 Key: FLINK-21094
>                 URL: https://issues.apache.org/jira/browse/FLINK-21094
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Table SQL / Planner
>            Reporter: godfrey he
>            Assignee: godfrey he
>            Priority: Major
>             Fix For: 1.13.0
>
>
> Similar to StreamExecTableSourceScan, if a sink table's ddl is
> {code:sql}
> CREATE TABLE MySink (
>   a bigint,
>   b int,
>   c varchar
> ) with (
>   'connector' = 'filesystem',
>   'path' = '/tmp',
>   'format' = 'testcsv'
> )
> {code}
> Its corresponding StreamExecSink's json representation looks like:
> {code:json}
> {
>  {
>     "id": 2,
>     "description": "Sink(table=[default_catalog.default_database.MySink], fields=[a, b, c])",
>     "class": "org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecSink",
>     "dynamicTableSink": {
>         "identifier": {
>             "catalogName": "default_catalog",
>             "databaseName": "default_database",
>             "tableName": "MySink"
>         },
>         "catalogTable": {
>             "connector": "filesystem",
>             "format": "testcsv",
>             "path": "/tmp",
>             "schema.0.name": "a",
>             "schema.0.data-type": "BIGINT",
>             "schema.1.name": "b",
>             "schema.1.data-type": "INT",
>             "schema.2.name": "c",
>             "schema.2.data-type": "VARCHAR(2147483647)"
>         },
>         "configuration": {}
>     },
>     "inputChangelogMode": [
>         "INSERT"
>     ],
>     "inputEdges": [
>         {
>             "requiredShuffle": {
>                 "type": "UNKNOWN"
>             },
>             "damBehavior": "PIPELINED",
>             "priority": 0
>         }
>     ],
>     "outputType": "ROW<`a` BIGINT, `b` INT, `c` VARCHAR(2147483647)>",
>     "inputs": [
>         1
>     ]
> }
> }
> {code}
> All properties of the catalog table will be serialized, so that the table sink instance can be created based on those properties from json. 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)