You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by "Francesco Guardiani (Jira)" <ji...@apache.org> on 2022/04/13 10:09:00 UTC

[jira] [Assigned] (FLINK-26549) INSERT INTO with VALUES leads to wrong type inference with nested types

     [ https://issues.apache.org/jira/browse/FLINK-26549?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Francesco Guardiani reassigned FLINK-26549:
-------------------------------------------

    Assignee:     (was: Francesco Guardiani)

> INSERT INTO with VALUES leads to wrong type inference with nested types
> -----------------------------------------------------------------------
>
>                 Key: FLINK-26549
>                 URL: https://issues.apache.org/jira/browse/FLINK-26549
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / Planner
>            Reporter: Francesco Guardiani
>            Priority: Major
>         Attachments: FlinkTypeFactoryTest.patch, InsertIntoValuesTest.patch, Least_restrictive_issue.patch
>
>
> While working on casting, I've found out we have an interesting bug in the insert values type inference. This comes from the {{KafkaTableITCase#testKafkaSourceSinkWithMetadata}} (look at this version in particular https://github.com/apache/flink/blob/567440115bcacb5aceaf3304e486281c7da8c14f/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaTableITCase.java).
> The test scenario is an INSERT INTO VALUES query which is also pushing some metadata to a Kafka table, in particular is writing the headers metadata.
> The table is declared like that:
> {code:sql}
>  CREATE TABLE kafka (
>   `physical_1` STRING,
>   `physical_2` INT,
>   `timestamp-type` STRING METADATA VIRTUAL,
>   `timestamp` TIMESTAMP(3) METADATA,
>   `leader-epoch` INT METADATA VIRTUAL,
>   `headers` MAP<STRING, BYTES> METADATA,
>   `partition` INT METADATA VIRTUAL,
>   `topic` STRING METADATA VIRTUAL,
>   `physical_3` BOOLEAN
> ) WITH (
>    'connector' = 'kafka',
>    [...]
> )
> {code}
> The insert into query looks like:
> {code:sql}
> INSERT INTO kafka VALUES
> ('data 1', 1, TIMESTAMP '2020-03-08 13:12:11.123', MAP['k1', x'C0FFEE', 'k2', x'BABE'], TRUE),
> ('data 2', 2, TIMESTAMP '2020-03-09 13:12:11.123', CAST(NULL AS MAP<STRING, BYTES>), FALSE),
> ('data 3', 3, TIMESTAMP '2020-03-10 13:12:11.123', MAP['k1', X'10', 'k2', X'20'], TRUE)
> {code}
> Note that in the first row, the byte literal is of length 3, while in the last row the byte literal is of length 1.
> The generated plan of this INSERT INTO is:
> {code}
> == Abstract Syntax Tree ==
> LogicalSink(table=[default_catalog.default_database.kafka], fields=[physical_1, physical_2, physical_3, headers, timestamp])
> +- LogicalProject(physical_1=[$0], physical_2=[$1], physical_3=[$4], headers=[CAST($3):(VARCHAR(2147483647) CHARACTER SET "UTF-16LE", VARBINARY(2147483647)) MAP], timestamp=[CAST($2):TIMESTAMP_WITH_LOCAL_TIME_ZONE(3)])
>    +- LogicalUnion(all=[true])
>       :- LogicalProject(EXPR$0=[_UTF-16LE'data 1'], EXPR$1=[1], EXPR$2=[2020-03-08 13:12:11.123:TIMESTAMP(3)], EXPR$3=[MAP(_UTF-16LE'k1', X'c0ffee':VARBINARY(3), _UTF-16LE'k2', X'babe':VARBINARY(3))], EXPR$4=[true])
>       :  +- LogicalValues(tuples=[[{ 0 }]])
>       :- LogicalProject(EXPR$0=[_UTF-16LE'data 2'], EXPR$1=[2], EXPR$2=[2020-03-09 13:12:11.123:TIMESTAMP(3)], EXPR$3=[null:(VARCHAR(2147483647) CHARACTER SET "UTF-16LE", VARBINARY(2147483647)) MAP], EXPR$4=[false])
>       :  +- LogicalValues(tuples=[[{ 0 }]])
>       +- LogicalProject(EXPR$0=[_UTF-16LE'data 3'], EXPR$1=[3], EXPR$2=[2020-03-10 13:12:11.123:TIMESTAMP(3)], EXPR$3=[MAP(_UTF-16LE'k1', X'10':BINARY(1), _UTF-16LE'k2', X'20':BINARY(1))], EXPR$4=[true])
>          +- LogicalValues(tuples=[[{ 0 }]])
> == Optimized Physical Plan ==
> Sink(table=[default_catalog.default_database.kafka], fields=[physical_1, physical_2, physical_3, headers, timestamp])
> +- Union(all=[true], union=[physical_1, physical_2, physical_3, headers, timestamp])
>    :- Calc(select=[_UTF-16LE'data 1' AS physical_1, 1 AS physical_2, true AS physical_3, CAST(CAST(MAP(_UTF-16LE'k1', X'c0ffee':VARBINARY(3), _UTF-16LE'k2', X'babe':VARBINARY(3)) AS (CHAR(2) CHARACTER SET "UTF-16LE" NOT NULL, BINARY(1) NOT NULL) MAP) AS (VARCHAR(2147483647) CHARACTER SET "UTF-16LE", VARBINARY(2147483647)) MAP) AS headers, CAST(2020-03-08 12:12:11.123:TIMESTAMP_WITH_LOCAL_TIME_ZONE(3) AS TIMESTAMP_WITH_LOCAL_TIME_ZONE(3)) AS timestamp])
>    :  +- Values(type=[RecordType(INTEGER ZERO)], tuples=[[{ 0 }]])
>    :- Calc(select=[_UTF-16LE'data 2' AS physical_1, 2 AS physical_2, false AS physical_3, null:(VARCHAR(2147483647) CHARACTER SET "UTF-16LE", VARBINARY(2147483647)) MAP AS headers, CAST(2020-03-09 12:12:11.123:TIMESTAMP_WITH_LOCAL_TIME_ZONE(3) AS TIMESTAMP_WITH_LOCAL_TIME_ZONE(3)) AS timestamp])
>    :  +- Values(type=[RecordType(INTEGER ZERO)], tuples=[[{ 0 }]])
>    +- Calc(select=[_UTF-16LE'data 3' AS physical_1, 3 AS physical_2, true AS physical_3, CAST(MAP(_UTF-16LE'k1', X'10':BINARY(1), _UTF-16LE'k2', X'20':BINARY(1)) AS (VARCHAR(2147483647) CHARACTER SET "UTF-16LE", VARBINARY(2147483647)) MAP) AS headers, CAST(2020-03-10 12:12:11.123:TIMESTAMP_WITH_LOCAL_TIME_ZONE(3) AS TIMESTAMP_WITH_LOCAL_TIME_ZONE(3)) AS timestamp])
>       +- Values(type=[RecordType(INTEGER ZERO)], tuples=[[{ 0 }]])
> == Optimized Execution Plan ==
> Sink(table=[default_catalog.default_database.kafka], fields=[physical_1, physical_2, physical_3, headers, timestamp])
> +- Union(all=[true], union=[physical_1, physical_2, physical_3, headers, timestamp])
>    :- Calc(select=['data 1' AS physical_1, 1 AS physical_2, true AS physical_3, CAST(CAST(MAP('k1', X'c0ffee', 'k2', X'babe') AS (CHAR(2), BINARY(1)) MAP) AS (VARCHAR(2147483647), VARBINARY(2147483647)) MAP) AS headers, CAST(2020-03-08 12:12:11.123 AS TIMESTAMP_WITH_LOCAL_TIME_ZONE(3)) AS timestamp])
>    :  +- Values(tuples=[[{ 0 }]])(reuse_id=[1])
>    :- Calc(select=['data 2' AS physical_1, 2 AS physical_2, false AS physical_3, null:(VARCHAR(2147483647), VARBINARY(2147483647)) MAP AS headers, CAST(2020-03-09 12:12:11.123 AS TIMESTAMP_WITH_LOCAL_TIME_ZONE(3)) AS timestamp])
>    :  +- Reused(reference_id=[1])
>    +- Calc(select=['data 3' AS physical_1, 3 AS physical_2, true AS physical_3, CAST(MAP('k1', X'10', 'k2', X'20') AS (VARCHAR(2147483647), VARBINARY(2147483647)) MAP) AS headers, CAST(2020-03-10 12:12:11.123 AS TIMESTAMP_WITH_LOCAL_TIME_ZONE(3)) AS timestamp])
>       +- Reused(reference_id=[1])
> {code}
> As you see, in the _Abstract Syntax Tree_ section a casting for the headers is injected (although unnecessary, as it should be an identity cast), but then in _Optimized Physical Plan_ another casting is injected:
> {code}
> CAST(CAST(MAP(_UTF-16LE'k1', X'c0ffee':VARBINARY(3), _UTF-16LE'k2', X'babe':VARBINARY(3)) AS (CHAR(2) CHARACTER SET "UTF-16LE" NOT NULL, BINARY(1) NOT NULL) MAP) AS (VARCHAR(2147483647) CHARACTER SET "UTF-16LE", VARBINARY(2147483647)) MAP) AS headers
> {code}
> Which makes no sense, as it's casting the values of the map first to {{BINARY(1)}} and then to {{BYTES}}, causing to trim the last 2 bytes. Removing the last row to insert makes the VALUES type inference work properly:
> {code}
> == Abstract Syntax Tree ==
> LogicalSink(table=[default_catalog.default_database.kafka], fields=[physical_1, physical_2, physical_3, headers, timestamp])
> +- LogicalProject(physical_1=[$0], physical_2=[$1], physical_3=[$4], headers=[$3], timestamp=[CAST($2):TIMESTAMP_WITH_LOCAL_TIME_ZONE(3)])
>    +- LogicalUnion(all=[true])
>       :- LogicalProject(EXPR$0=[_UTF-16LE'data 1'], EXPR$1=[1], EXPR$2=[2020-03-08 13:12:11.123:TIMESTAMP(3)], EXPR$3=[MAP(_UTF-16LE'k1', X'c0ffee':VARBINARY(3), _UTF-16LE'k2', X'babe':VARBINARY(3))], EXPR$4=[true])
>       :  +- LogicalValues(tuples=[[{ 0 }]])
>       +- LogicalProject(EXPR$0=[_UTF-16LE'data 2'], EXPR$1=[2], EXPR$2=[2020-03-09 13:12:11.123:TIMESTAMP(3)], EXPR$3=[null:(VARCHAR(2147483647) CHARACTER SET "UTF-16LE", VARBINARY(2147483647)) MAP], EXPR$4=[false])
>          +- LogicalValues(tuples=[[{ 0 }]])
> == Optimized Physical Plan ==
> Sink(table=[default_catalog.default_database.kafka], fields=[physical_1, physical_2, physical_3, headers, timestamp])
> +- Union(all=[true], union=[physical_1, physical_2, physical_3, headers, timestamp])
>    :- Calc(select=[_UTF-16LE'data 1' AS physical_1, 1 AS physical_2, true AS physical_3, CAST(MAP(_UTF-16LE'k1', X'c0ffee':VARBINARY(3), _UTF-16LE'k2', X'babe':VARBINARY(3)) AS (VARCHAR(2147483647) CHARACTER SET "UTF-16LE", VARBINARY(2147483647)) MAP) AS headers, CAST(2020-03-08 12:12:11.123:TIMESTAMP_WITH_LOCAL_TIME_ZONE(3) AS TIMESTAMP_WITH_LOCAL_TIME_ZONE(3)) AS timestamp])
>    :  +- Values(type=[RecordType(INTEGER ZERO)], tuples=[[{ 0 }]])
>    +- Calc(select=[_UTF-16LE'data 2' AS physical_1, 2 AS physical_2, false AS physical_3, null:(VARCHAR(2147483647) CHARACTER SET "UTF-16LE", VARBINARY(2147483647)) MAP AS headers, CAST(2020-03-09 12:12:11.123:TIMESTAMP_WITH_LOCAL_TIME_ZONE(3) AS TIMESTAMP_WITH_LOCAL_TIME_ZONE(3)) AS timestamp])
>       +- Values(type=[RecordType(INTEGER ZERO)], tuples=[[{ 0 }]])
> == Optimized Execution Plan ==
> Sink(table=[default_catalog.default_database.kafka], fields=[physical_1, physical_2, physical_3, headers, timestamp])
> +- Union(all=[true], union=[physical_1, physical_2, physical_3, headers, timestamp])
>    :- Calc(select=['data 1' AS physical_1, 1 AS physical_2, true AS physical_3, CAST(MAP('k1', X'c0ffee', 'k2', X'babe') AS (VARCHAR(2147483647), VARBINARY(2147483647)) MAP) AS headers, CAST(2020-03-08 12:12:11.123 AS TIMESTAMP_WITH_LOCAL_TIME_ZONE(3)) AS timestamp])
>    :  +- Values(tuples=[[{ 0 }]])(reuse_id=[1])
>    +- Calc(select=['data 2' AS physical_1, 2 AS physical_2, false AS physical_3, null:(VARCHAR(2147483647), VARBINARY(2147483647)) MAP AS headers, CAST(2020-03-09 12:12:11.123 AS TIMESTAMP_WITH_LOCAL_TIME_ZONE(3)) AS timestamp])
>       +- Reused(reference_id=[1])
> {code}  



--
This message was sent by Atlassian Jira
(v8.20.1#820001)