You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by "godfrey he (Jira)" <ji...@apache.org> on 2021/01/23 07:57:00 UTC
[jira] [Updated] (FLINK-21094) Support StreamExecSink json
serialization/deserialization
[ https://issues.apache.org/jira/browse/FLINK-21094?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
godfrey he updated FLINK-21094:
-------------------------------
Description:
Similar to StreamExecTableSourceScan, if a sink table's ddl is
{code:sql}
CREATE TABLE MySink (
a bigint,
b int,
c varchar
) with (
'connector' = 'filesystem',
'path' = '/tmp',
'format' = 'testcsv'
)
{code}
Its corresponding StreamExecSink's json representation looks like:
{code:json}
{
{
"id": 2,
"description": "Sink(table=[default_catalog.default_database.MySink], fields=[a, b, c])",
"class": "org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecSink",
"dynamicTableSink": {
"identifier": {
"catalogName": "default_catalog",
"databaseName": "default_database",
"tableName": "MySink"
},
"catalogTable": {
"connector": "filesystem",
"format": "testcsv",
"path": "/tmp",
"schema.0.name": "a",
"schema.0.data-type": "BIGINT",
"schema.1.name": "b",
"schema.1.data-type": "INT",
"schema.2.name": "c",
"schema.2.data-type": "VARCHAR(2147483647)"
},
"configuration": {}
},
"inputChangelogMode": [
"INSERT"
],
"inputEdges": [
{
"requiredShuffle": {
"type": "UNKNOWN"
},
"damBehavior": "PIPELINED",
"priority": 0
}
],
"outputType": "ROW<`a` BIGINT, `b` INT, `c` VARCHAR(2147483647)>",
"inputs": [
1
]
}
}
{code}
All properties of the catalog table will be serialized, so that the table sink instance can be created based on those properties from json.
> Support StreamExecSink json serialization/deserialization
> ---------------------------------------------------------
>
> Key: FLINK-21094
> URL: https://issues.apache.org/jira/browse/FLINK-21094
> Project: Flink
> Issue Type: Sub-task
> Components: Table SQL / Planner
> Reporter: godfrey he
> Assignee: godfrey he
> Priority: Major
> Fix For: 1.13.0
>
>
> Similar to StreamExecTableSourceScan, if a sink table's ddl is
> {code:sql}
> CREATE TABLE MySink (
> a bigint,
> b int,
> c varchar
> ) with (
> 'connector' = 'filesystem',
> 'path' = '/tmp',
> 'format' = 'testcsv'
> )
> {code}
> Its corresponding StreamExecSink's json representation looks like:
> {code:json}
> {
> {
> "id": 2,
> "description": "Sink(table=[default_catalog.default_database.MySink], fields=[a, b, c])",
> "class": "org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecSink",
> "dynamicTableSink": {
> "identifier": {
> "catalogName": "default_catalog",
> "databaseName": "default_database",
> "tableName": "MySink"
> },
> "catalogTable": {
> "connector": "filesystem",
> "format": "testcsv",
> "path": "/tmp",
> "schema.0.name": "a",
> "schema.0.data-type": "BIGINT",
> "schema.1.name": "b",
> "schema.1.data-type": "INT",
> "schema.2.name": "c",
> "schema.2.data-type": "VARCHAR(2147483647)"
> },
> "configuration": {}
> },
> "inputChangelogMode": [
> "INSERT"
> ],
> "inputEdges": [
> {
> "requiredShuffle": {
> "type": "UNKNOWN"
> },
> "damBehavior": "PIPELINED",
> "priority": 0
> }
> ],
> "outputType": "ROW<`a` BIGINT, `b` INT, `c` VARCHAR(2147483647)>",
> "inputs": [
> 1
> ]
> }
> }
> {code}
> All properties of the catalog table will be serialized, so that the table sink instance can be created based on those properties from json.
--
This message was sent by Atlassian Jira
(v8.3.4#803005)