Posted to issues@flink.apache.org by "Maciej Bryński (Jira)" <ji...@apache.org> on 2020/10/23 10:23:00 UTC

[jira] [Updated] (FLINK-19786) Flink doesn't set proper nullability for Logical types for Confluent Avro Serialization

     [ https://issues.apache.org/jira/browse/FLINK-19786?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Maciej Bryński updated FLINK-19786:
-----------------------------------
    Component/s: Formats (JSON, Avro, Parquet, ORC, SequenceFile)

> Flink doesn't set proper nullability for Logical types for Confluent Avro Serialization
> ---------------------------------------------------------------------------------------
>
>                 Key: FLINK-19786
>                 URL: https://issues.apache.org/jira/browse/FLINK-19786
>             Project: Flink
>          Issue Type: Bug
>          Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile)
>    Affects Versions: 1.12.0
>            Reporter: Maciej Bryński
>            Priority: Major
>
> When Flink creates a schema in the Schema Registry, nullability is not set properly for logical types.
> Example table with nullable fields:
> {code:sql}
> create table `test_logical_null` (
>   `string_field` STRING,
>   `timestamp_field` TIMESTAMP(3)
> ) WITH (
>   'connector' = 'kafka',
>   'topic' = 'test-logical-null',
>   'properties.bootstrap.servers' = 'localhost:9092',
>   'properties.group.id' = 'test12345',
>   'scan.startup.mode' = 'earliest-offset',
>   'format' = 'avro-confluent', -- Must be set to 'avro-confluent' to configure this format.
>   'avro-confluent.schema-registry.url' = 'http://localhost:8081', -- URL to connect to Confluent Schema Registry
>   'avro-confluent.schema-registry.subject' = 'test-logical-null' -- Subject name to write to the Schema Registry service; required for sinks
> );
> {code}
> Schema registered by Flink:
> {code:json}
> {
>   "type": "record",
>   "name": "record",
>   "fields": [
>     {
>       "name": "string_field",
>       "type": [
>         "string",
>         "null"
>       ]
>     },
>     {
>       "name": "timestamp_field",
>       "type": {
>         "type": "long",
>         "logicalType": "timestamp-millis"
>       }
>     }
>   ]
> }
> {code}
> The same table with NOT NULL fields:
> {code:sql}
> create table `test_logical_notnull` (
>   `string_field` STRING NOT NULL,
>   `timestamp_field` TIMESTAMP(3) NOT NULL
> ) WITH (
>   'connector' = 'kafka',
>   'topic' = 'test-logical-notnull',
>   'properties.bootstrap.servers' = 'localhost:9092',
>   'properties.group.id' = 'test12345',
>   'scan.startup.mode' = 'earliest-offset',
>   'format' = 'avro-confluent', -- Must be set to 'avro-confluent' to configure this format.
>   'avro-confluent.schema-registry.url' = 'http://localhost:8081', -- URL to connect to Confluent Schema Registry
>   'avro-confluent.schema-registry.subject' = 'test-logical-notnull-value' -- Subject name to write to the Schema Registry service; required for sinks
> );
> {code}
> Schema registered by Flink:
> {code:json}
> {
>   "type": "record",
>   "name": "record",
>   "fields": [
>     {
>       "name": "string_field",
>       "type": "string"
>     },
>     {
>       "name": "timestamp_field",
>       "type": {
>         "type": "long",
>         "logicalType": "timestamp-millis"
>       }
>     }
>   ]
> }
> {code}
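> As the schemas show, string_field gets the proper union with null when the column is nullable and a plain string when it is NOT NULL. For timestamp_field, the union with null is missing in both examples, so the nullable TIMESTAMP(3) column is registered as a non-nullable timestamp-millis.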
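
The mismatch described above can be checked outside Flink with a short script. This is a minimal sketch, assuming Python and only the standard json module. The helper names (is_nullable, non_nullable_fields) are hypothetical and not part of Flink or Avro. REPORTED_SCHEMA is the schema quoted above for the nullable table; EXPECTED_SCHEMA is an assumption modeled on the union Flink already generates for string_field, and the order of the union branches is a guess.

```python
import json

# Schema reported in the issue for the table with nullable columns.
REPORTED_SCHEMA = """
{
  "type": "record",
  "name": "record",
  "fields": [
    {"name": "string_field", "type": ["string", "null"]},
    {"name": "timestamp_field",
     "type": {"type": "long", "logicalType": "timestamp-millis"}}
  ]
}
"""

# Assumed shape of a correct schema: the logical type wrapped in a union
# with "null", mirroring string_field. The branch order is an assumption.
EXPECTED_SCHEMA = """
{
  "type": "record",
  "name": "record",
  "fields": [
    {"name": "string_field", "type": ["string", "null"]},
    {"name": "timestamp_field",
     "type": [{"type": "long", "logicalType": "timestamp-millis"}, "null"]}
  ]
}
"""


def is_nullable(avro_type):
    """Return True if the Avro type is a union that has a "null" branch."""
    return isinstance(avro_type, list) and "null" in avro_type


def non_nullable_fields(schema_json):
    """List the names of record fields whose type does not admit null."""
    schema = json.loads(schema_json)
    return [f["name"] for f in schema["fields"] if not is_nullable(f["type"])]


if __name__ == "__main__":
    print("reported:", non_nullable_fields(REPORTED_SCHEMA))
    print("expected:", non_nullable_fields(EXPECTED_SCHEMA))
```

Running the same check against the subject fetched from the Schema Registry would show whether a given Flink build still drops the null branch for logical types.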



--
This message was sent by Atlassian Jira
(v8.3.4#803005)