You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@flink.apache.org by Hongjian Peng <su...@163.com> on 2020/11/25 08:12:58 UTC

Flink 1.11 avro format question

Hi Flink Community,


We are trying to upgrade our Flink SQL job from 1.10 to 1.11. We used Kafka source table, and the data is stored in Kafka in Avro format.
Schema is like this:


{
  "type": "record",
  "name": "event",
  "namespace": "busseniss.event",
  "fields": [
    {
      "name": "header",
      "type": {
        "type": "record",
        "name": "header",
        "fields": [
          {
            "name": "createTimestamp",
            "type": "long"
          },
          {
            "name": "sentTimestamp",
            "type": "long"
          },
          {
            "name": "eventId",
            "type": [
              "null",
              {
                "type": "string",
                "avro.java.string": "String"
              }
            ]
          }
        ]
      },
      "doc": "Rheos header "
    },
    {
      "name": "si",
      "type": [
        "null",
        "string"
      ]
    }
]
}


















--

Hongjian Peng
Department of Computer Science and Engineering
Shanghai Jiao Tong University
Email: superphj@163.com

Re: Flink 1.11 avro format question

Posted by Danny Chan <da...@apache.org>.
For your question 1. This does not work as expected. I would check it soon
to see if it is a bug and fire a fix.

Hongjian Peng <su...@163.com>于2020年11月25日 周三下午4:45写道:

> In Flink 1.10, we can pass this schema with 'format.avro-schema' property
> to SQL DDL, but in Flink 1.11, the Avro schema is always derived from
> the table schema.
> We have two questions about the Flink 1.11 Avro format:
>
> 1. Flink 1.11 maps nullable types to Avro union(something, null). How to
> map the nullable types to union(null, something)? In our schema definition,
> we follow the Avro recommended definition, list 'null' as the first type.
> With Flink default mapping, the AvroRowDataDeserializationSchema will get
> the wrong/error result when lists 'null' as the first type of union.
>
> 2. Now if we want to set the field in a Row to non-nullable, we have to
> set the Row to non-nullable firstly. Is it the expected behavior of
> the Avro format?
> For example,
> event ROW< createTimestamp BIGINT NOT NULL, sentTimestamp BIGINT NOT NULL,
> eventId VARCHAR> will be maped to Avro schema:
>
> {
>   "type": "record",
>   "name": "event",
>   "fields": [
>     {
>       "name": "header",
>       "type": {
>         "type": "record",
>         "name": "header",
>         "fields": [
>           {
>             "name": "createTimestamp",
>             "type": ["long",null]
>           },
>           {
>             "name": "sentTimestamp",
>              "type": ["long",null]
>           },
>           {
>             "name": "eventId",
>             "type": [
>               "null",
>               {
>                 "type": "string",
>                 "avro.java.string": "String"
>               }
>             ]
>           }
>         ]
>       }
>     }
> ]
> }
>
> We have to use "event ROW< createTimestamp BIGINT NOT NULL, sentTimestamp
> BIGINT NOT NULL, eventId VARCHAR> NOT NULL" to make the NOT NULL of
> fields: createTimestamp and sentTimestamp works.
>
>
>
>
>
> At 2020-11-25 16:12:58, "Hongjian Peng" <su...@163.com> wrote:
>
> Hi Flink Community,
>
> We are trying to upgrade our Flink SQL job from 1.10 to 1.11. We used
> Kafka source table, and the data is stored in Kafka in Avro format.
> Schema is like this:
>
> {
>   "type": "record",
>   "name": "event",
>   "namespace": "busseniss.event",
>   "fields": [
>     {
>       "name": "header",
>       "type": {
>         "type": "record",
>         "name": "header",
>         "fields": [
>           {
>             "name": "createTimestamp",
>             "type": "long"
>           },
>           {
>             "name": "sentTimestamp",
>             "type": "long"
>           },
>           {
>             "name": "eventId",
>             "type": [
>               "null",
>               {
>                 "type": "string",
>                 "avro.java.string": "String"
>               }
>             ]
>           }
>         ]
>       },
>       "doc": "Rheos header "
>     },
>     {
>       "name": "si",
>       "type": [
>         "null",
>         "string"
>       ]
>     }
> ]
> }
>
>
>
>
>
>
> --
> Hongjian Peng
> Department of Computer Science and Engineering
> Shanghai Jiao Tong University
> Email: superphj@163.com
>
>

Re: Flink 1.11 avro format question

Posted by Danny Chan <da...@apache.org>.
Hi Hongjian Peng ~

For your question 1, it is not work as expected. If it is true, there is
definitely a bug. I would check and fix it later.

For your question 2, yes. This is an intent design. There is a routine in
the type inference: all the fields of a nullable struct type should also be
nullable.

Hongjian Peng <su...@163.com>于2020年11月25日 周三下午4:45写道:

> In Flink 1.10, we can pass this schema with 'format.avro-schema' property
> to SQL DDL, but in Flink 1.11, the Avro schema is always derived from
> the table schema.
> We have two questions about the Flink 1.11 Avro format:
>
> 1. Flink 1.11 maps nullable types to Avro union(something, null). How to
> map the nullable types to union(null, something)? In our schema definition,
> we follow the Avro recommended definition, list 'null' as the first type.
> With Flink default mapping, the AvroRowDataDeserializationSchema will get
> the wrong/error result when lists 'null' as the first type of union.
>
> 2. Now if we want to set the field in a Row to non-nullable, we have to
> set the Row to non-nullable firstly. Is it the expected behavior of
> the Avro format?
> For example,
> event ROW< createTimestamp BIGINT NOT NULL, sentTimestamp BIGINT NOT NULL,
> eventId VARCHAR> will be maped to Avro schema:
>
> {
>   "type": "record",
>   "name": "event",
>   "fields": [
>     {
>       "name": "header",
>       "type": {
>         "type": "record",
>         "name": "header",
>         "fields": [
>           {
>             "name": "createTimestamp",
>             "type": ["long",null]
>           },
>           {
>             "name": "sentTimestamp",
>              "type": ["long",null]
>           },
>           {
>             "name": "eventId",
>             "type": [
>               "null",
>               {
>                 "type": "string",
>                 "avro.java.string": "String"
>               }
>             ]
>           }
>         ]
>       }
>     }
> ]
> }
>
> We have to use "event ROW< createTimestamp BIGINT NOT NULL, sentTimestamp
> BIGINT NOT NULL, eventId VARCHAR> NOT NULL" to make the NOT NULL of
> fields: createTimestamp and sentTimestamp works.
>
>
>
>
>
> At 2020-11-25 16:12:58, "Hongjian Peng" <su...@163.com> wrote:
>
> Hi Flink Community,
>
> We are trying to upgrade our Flink SQL job from 1.10 to 1.11. We used
> Kafka source table, and the data is stored in Kafka in Avro format.
> Schema is like this:
>
> {
>   "type": "record",
>   "name": "event",
>   "namespace": "busseniss.event",
>   "fields": [
>     {
>       "name": "header",
>       "type": {
>         "type": "record",
>         "name": "header",
>         "fields": [
>           {
>             "name": "createTimestamp",
>             "type": "long"
>           },
>           {
>             "name": "sentTimestamp",
>             "type": "long"
>           },
>           {
>             "name": "eventId",
>             "type": [
>               "null",
>               {
>                 "type": "string",
>                 "avro.java.string": "String"
>               }
>             ]
>           }
>         ]
>       },
>       "doc": "Rheos header "
>     },
>     {
>       "name": "si",
>       "type": [
>         "null",
>         "string"
>       ]
>     }
> ]
> }
>
>
>
>
>
>
> --
> Hongjian Peng
> Department of Computer Science and Engineering
> Shanghai Jiao Tong University
> Email: superphj@163.com
>
>

Re: Flink 1.11 avro format question

Posted by Hongjian Peng <su...@163.com>.
Thank you for the help.













--

Thanks,
Hongjian Peng




At 2020-11-30 16:16:48, "Dawid Wysakowicz" <dw...@apache.org> wrote:

Hi,

I managed to backport the change to the 1.11 branch. It should be part of the 1.11.3 release.

Best,

Dawid


On 25/11/2020 16:23, Hongjian Peng wrote:

Thanks for Danny and Dawid's quick reply.


Dawid, I find your fix at https://github.com/apache/flink/pull/14085/commits/bc9caab71f51024d1b48c6ee1a3f79777624b6bb#diff-6cc72acf0893bbaadc5b610afbbbae23227971cf5c7d0743dd4b997236baf771R450. Appreciate the fix.
But we may not move to Flink 1.12 in the near future. Does the community have a plan to push the fix back to 1.11?













--

Thanks,
Hongjian Peng




At 2020-11-25 22:26:32, "Dawid Wysakowicz" <dw...@apache.org> wrote:

Hi,

Just wanted to comment on:

How to map the nullable types to union(null, something)? In our schema definition, we follow the Avro recommended definition, list 'null' as the first type.

I've also spotted that problem and it will be fixed in 1.12 in https://issues.apache.org/jira/browse/FLINK-20175

Best,

Dawid


On 25/11/2020 09:44, Hongjian Peng wrote:

In Flink 1.10, we can pass this schema with 'format.avro-schema' property to SQL DDL, but in Flink 1.11, the Avro schema is always derived from the table schema.
We have two questions about the Flink 1.11 Avro format:


1. Flink 1.11 maps nullable types to Avro union(something, null). How to map the nullable types to union(null, something)? In our schema definition, we follow the Avro recommended definition, list 'null' as the first type. With Flink default mapping, the AvroRowDataDeserializationSchema will get the wrong/error result when lists 'null' as the first type of union.


2. Now if we want to set the field in a Row to non-nullable, we have to set the Row to non-nullable firstly. Is it the expected behavior of the Avro format?
For example,
event ROW< createTimestamp BIGINT NOT NULL, sentTimestamp BIGINT NOT NULL, eventId VARCHAR> will be maped to Avro schema:


{
  "type": "record",
  "name": "event",
  "fields": [
    {
      "name": "header",
      "type": {
        "type": "record",
        "name": "header",
        "fields": [
          {
            "name": "createTimestamp",
            "type": ["long",null]
          },
          {
            "name": "sentTimestamp",
             "type": ["long",null]
          },
          {
            "name": "eventId",
            "type": [
              "null",
              {
                "type": "string",
                "avro.java.string": "String"
              }
            ]
          }
        ]
      }
    }
]
}


We have to use "event ROW< createTimestamp BIGINT NOT NULL, sentTimestamp BIGINT NOT NULL, eventId VARCHAR> NOT NULL" to make the NOT NULL of fields: createTimestamp and sentTimestamp works.













At 2020-11-25 16:12:58, "Hongjian Peng" <su...@163.com> wrote:

Hi Flink Community,


We are trying to upgrade our Flink SQL job from 1.10 to 1.11. We used Kafka source table, and the data is stored in Kafka in Avro format.
Schema is like this:


{
  "type": "record",
  "name": "event",
  "namespace": "busseniss.event",
  "fields": [
    {
      "name": "header",
      "type": {
        "type": "record",
        "name": "header",
        "fields": [
          {
            "name": "createTimestamp",
            "type": "long"
          },
          {
            "name": "sentTimestamp",
            "type": "long"
          },
          {
            "name": "eventId",
            "type": [
              "null",
              {
                "type": "string",
                "avro.java.string": "String"
              }
            ]
          }
        ]
      },
      "doc": "Rheos header "
    },
    {
      "name": "si",
      "type": [
        "null",
        "string"
      ]
    }
]
}


















--

Hongjian Peng
Department of Computer Science and Engineering
Shanghai Jiao Tong University
Email: superphj@163.com

Re: Flink 1.11 avro format question

Posted by Dawid Wysakowicz <dw...@apache.org>.
Hi,

I managed to backport the change to the 1.11 branch. It should be part
of the 1.11.3 release.

Best,

Dawid

On 25/11/2020 16:23, Hongjian Peng wrote:
> Thanks for Danny and Dawid's quick reply.
>
> Dawid, I find your fix
> at https://github.com/apache/flink/pull/14085/commits/bc9caab71f51024d1b48c6ee1a3f79777624b6bb#diff-6cc72acf0893bbaadc5b610afbbbae23227971cf5c7d0743dd4b997236baf771R450. Appreciate
> the fix.
> But we may not move to Flink 1.12 in the near future. Does the
> community have a plan to push the fix back to 1.11?
>
>
>
>
>
> --
>
> Thanks,
> Hongjian Peng
>
>
> At 2020-11-25 22:26:32, "Dawid Wysakowicz" <dw...@apache.org> wrote:
>
>     Hi,
>
>     Just wanted to comment on:
>
>     How to map the nullable types to union(null, something)? In our
>     schema definition, we follow the Avro recommended definition, list
>     'null' as the first type.
>
>     I've also spotted that problem and it will be fixed in 1.12 in
>     https://issues.apache.org/jira/browse/FLINK-20175
>
>     Best,
>
>     Dawid
>
>     On 25/11/2020 09:44, Hongjian Peng wrote:
>>     In Flink 1.10, we can pass this schema with 'format.avro-schema'
>>     property to SQL DDL, but in Flink 1.11, the Avro schema is always
>>     derived from the table schema.
>>     We have two questions about the Flink 1.11 Avro format:
>>
>>     1. Flink 1.11 maps nullable types to Avro union(something, null).
>>     How to map the nullable types to union(null, something)? In our
>>     schema definition, we follow the Avro recommended definition,
>>     list 'null' as the first type. With Flink default mapping, the
>>     AvroRowDataDeserializationSchema will get the wrong/error result
>>     when lists 'null' as the first type of union.
>>
>>     2. Now if we want to set the field in a Row to non-nullable, we
>>     have to set the Row to non-nullable firstly. Is it the expected
>>     behavior of the Avro format?
>>     For example,
>>     event ROW< createTimestamp BIGINT NOT NULL, sentTimestamp BIGINT
>>     NOT NULL, eventId VARCHAR> will be maped to Avro schema:
>>
>>     {
>>       "type": "record",
>>       "name": "event",
>>       "fields": [
>>         {
>>           "name": "header",
>>           "type": {
>>             "type": "record",
>>             "name": "header",
>>             "fields": [
>>               {
>>                 "name": "createTimestamp",
>>                 "type": ["long",null]
>>               },
>>               {
>>                 "name": "sentTimestamp",
>>                  "type": ["long",null]
>>               },
>>               {
>>                 "name": "eventId",
>>                 "type": [
>>                   "null",
>>                   {
>>                     "type": "string",
>>                     "avro.java.string": "String"
>>                   }
>>                 ]
>>               }
>>             ]
>>           }
>>         }
>>     ]
>>     }
>>
>>     We have to use "event ROW< createTimestamp BIGINT NOT NULL,
>>     sentTimestamp BIGINT NOT NULL, eventId VARCHAR>NOT NULL" to make
>>     the NOT NULL of fields: createTimestamp and sentTimestamp works.
>>
>>
>>
>>
>>
>>     At 2020-11-25 16:12:58, "Hongjian Peng" <su...@163.com> wrote:
>>
>>         Hi Flink Community,
>>
>>         We are trying to upgrade our Flink SQL job from 1.10 to 1.11.
>>         We used Kafka source table, and the data is stored in Kafka
>>         in Avro format.
>>         Schema is like this:
>>
>>         {
>>           "type": "record",
>>           "name": "event",
>>           "namespace": "busseniss.event",
>>           "fields": [
>>             {
>>               "name": "header",
>>               "type": {
>>                 "type": "record",
>>                 "name": "header",
>>                 "fields": [
>>                   {
>>                     "name": "createTimestamp",
>>                     "type": "long"
>>                   },
>>                   {
>>                     "name": "sentTimestamp",
>>                     "type": "long"
>>                   },
>>                   {
>>                     "name": "eventId",
>>                     "type": [
>>                       "null",
>>                       {
>>                         "type": "string",
>>                         "avro.java.string": "String"
>>                       }
>>                     ]
>>                   }
>>                 ]
>>               },
>>               "doc": "Rheos header "
>>             },
>>             {
>>               "name": "si",
>>               "type": [
>>                 "null",
>>                 "string"
>>               ]
>>             }
>>         ]
>>         }
>>
>>
>>
>>
>>
>>
>>         --
>>
>>         Hongjian Peng
>>         Department of Computer Science and Engineering
>>         Shanghai Jiao Tong University
>>         Email: superphj@163.com <ma...@163.com>
>>

Re:Re: Flink 1.11 avro format question

Posted by Hongjian Peng <su...@163.com>.
Thanks for Danny and Dawid's quick reply.


Dawid, I find your fix at https://github.com/apache/flink/pull/14085/commits/bc9caab71f51024d1b48c6ee1a3f79777624b6bb#diff-6cc72acf0893bbaadc5b610afbbbae23227971cf5c7d0743dd4b997236baf771R450. Appreciate the fix.
But we may not move to Flink 1.12 in the near future. Does the community have a plan to push the fix back to 1.11?













--

Thanks,
Hongjian Peng




At 2020-11-25 22:26:32, "Dawid Wysakowicz" <dw...@apache.org> wrote:

Hi,

Just wanted to comment on:

How to map the nullable types to union(null, something)? In our schema definition, we follow the Avro recommended definition, list 'null' as the first type.

I've also spotted that problem and it will be fixed in 1.12 in https://issues.apache.org/jira/browse/FLINK-20175

Best,

Dawid


On 25/11/2020 09:44, Hongjian Peng wrote:

In Flink 1.10, we can pass this schema with 'format.avro-schema' property to SQL DDL, but in Flink 1.11, the Avro schema is always derived from the table schema.
We have two questions about the Flink 1.11 Avro format:


1. Flink 1.11 maps nullable types to Avro union(something, null). How to map the nullable types to union(null, something)? In our schema definition, we follow the Avro recommended definition, list 'null' as the first type. With Flink default mapping, the AvroRowDataDeserializationSchema will get the wrong/error result when lists 'null' as the first type of union.


2. Now if we want to set the field in a Row to non-nullable, we have to set the Row to non-nullable firstly. Is it the expected behavior of the Avro format?
For example,
event ROW< createTimestamp BIGINT NOT NULL, sentTimestamp BIGINT NOT NULL, eventId VARCHAR> will be maped to Avro schema:


{
  "type": "record",
  "name": "event",
  "fields": [
    {
      "name": "header",
      "type": {
        "type": "record",
        "name": "header",
        "fields": [
          {
            "name": "createTimestamp",
            "type": ["long",null]
          },
          {
            "name": "sentTimestamp",
             "type": ["long",null]
          },
          {
            "name": "eventId",
            "type": [
              "null",
              {
                "type": "string",
                "avro.java.string": "String"
              }
            ]
          }
        ]
      }
    }
]
}


We have to use "event ROW< createTimestamp BIGINT NOT NULL, sentTimestamp BIGINT NOT NULL, eventId VARCHAR> NOT NULL" to make the NOT NULL of fields: createTimestamp and sentTimestamp works.













At 2020-11-25 16:12:58, "Hongjian Peng" <su...@163.com> wrote:

Hi Flink Community,


We are trying to upgrade our Flink SQL job from 1.10 to 1.11. We used Kafka source table, and the data is stored in Kafka in Avro format.
Schema is like this:


{
  "type": "record",
  "name": "event",
  "namespace": "busseniss.event",
  "fields": [
    {
      "name": "header",
      "type": {
        "type": "record",
        "name": "header",
        "fields": [
          {
            "name": "createTimestamp",
            "type": "long"
          },
          {
            "name": "sentTimestamp",
            "type": "long"
          },
          {
            "name": "eventId",
            "type": [
              "null",
              {
                "type": "string",
                "avro.java.string": "String"
              }
            ]
          }
        ]
      },
      "doc": "Rheos header "
    },
    {
      "name": "si",
      "type": [
        "null",
        "string"
      ]
    }
]
}


















--

Hongjian Peng
Department of Computer Science and Engineering
Shanghai Jiao Tong University
Email: superphj@163.com

Re: Flink 1.11 avro format question

Posted by Dawid Wysakowicz <dw...@apache.org>.
Hi,

Just wanted to comment on:

How to map the nullable types to union(null, something)? In our schema
definition, we follow the Avro recommended definition, list 'null' as
the first type.

I've also spotted that problem and it will be fixed in 1.12 in
https://issues.apache.org/jira/browse/FLINK-20175

Best,

Dawid

On 25/11/2020 09:44, Hongjian Peng wrote:
> In Flink 1.10, we can pass this schema with 'format.avro-schema'
> property to SQL DDL, but in Flink 1.11, the Avro schema is always
> derived from the table schema.
> We have two questions about the Flink 1.11 Avro format:
>
> 1. Flink 1.11 maps nullable types to Avro union(something, null). How
> to map the nullable types to union(null, something)? In our schema
> definition, we follow the Avro recommended definition, list 'null' as
> the first type. With Flink default mapping, the
> AvroRowDataDeserializationSchema will get the wrong/error result when
> lists 'null' as the first type of union.
>
> 2. Now if we want to set the field in a Row to non-nullable, we have
> to set the Row to non-nullable firstly. Is it the expected behavior of
> the Avro format?
> For example,
> event ROW< createTimestamp BIGINT NOT NULL, sentTimestamp BIGINT NOT
> NULL, eventId VARCHAR> will be maped to Avro schema:
>
> {
>   "type": "record",
>   "name": "event",
>   "fields": [
>     {
>       "name": "header",
>       "type": {
>         "type": "record",
>         "name": "header",
>         "fields": [
>           {
>             "name": "createTimestamp",
>             "type": ["long",null]
>           },
>           {
>             "name": "sentTimestamp",
>              "type": ["long",null]
>           },
>           {
>             "name": "eventId",
>             "type": [
>               "null",
>               {
>                 "type": "string",
>                 "avro.java.string": "String"
>               }
>             ]
>           }
>         ]
>       }
>     }
> ]
> }
>
> We have to use "event ROW< createTimestamp BIGINT NOT NULL,
> sentTimestamp BIGINT NOT NULL, eventId VARCHAR>NOT NULL" to make the
> NOT NULL of fields: createTimestamp and sentTimestamp works.
>
>
>
>
>
> At 2020-11-25 16:12:58, "Hongjian Peng" <su...@163.com> wrote:
>
>     Hi Flink Community,
>
>     We are trying to upgrade our Flink SQL job from 1.10 to 1.11. We
>     used Kafka source table, and the data is stored in Kafka in Avro
>     format.
>     Schema is like this:
>
>     {
>       "type": "record",
>       "name": "event",
>       "namespace": "busseniss.event",
>       "fields": [
>         {
>           "name": "header",
>           "type": {
>             "type": "record",
>             "name": "header",
>             "fields": [
>               {
>                 "name": "createTimestamp",
>                 "type": "long"
>               },
>               {
>                 "name": "sentTimestamp",
>                 "type": "long"
>               },
>               {
>                 "name": "eventId",
>                 "type": [
>                   "null",
>                   {
>                     "type": "string",
>                     "avro.java.string": "String"
>                   }
>                 ]
>               }
>             ]
>           },
>           "doc": "Rheos header "
>         },
>         {
>           "name": "si",
>           "type": [
>             "null",
>             "string"
>           ]
>         }
>     ]
>     }
>
>
>
>
>
>
>     --
>
>     Hongjian Peng
>     Department of Computer Science and Engineering
>     Shanghai Jiao Tong University
>     Email: superphj@163.com <ma...@163.com>
>

Re:Flink 1.11 avro format question

Posted by Hongjian Peng <su...@163.com>.
In Flink 1.10, we can pass this schema with 'format.avro-schema' property to SQL DDL, but in Flink 1.11, the Avro schema is always derived from the table schema.
We have two questions about the Flink 1.11 Avro format:


1. Flink 1.11 maps nullable types to Avro union(something, null). How to map the nullable types to union(null, something)? In our schema definition, we follow the Avro recommended definition, list 'null' as the first type. With Flink default mapping, the AvroRowDataDeserializationSchema will get the wrong/error result when lists 'null' as the first type of union.


2. Now if we want to set the field in a Row to non-nullable, we have to set the Row to non-nullable firstly. Is it the expected behavior of the Avro format?
For example,
event ROW< createTimestamp BIGINT NOT NULL, sentTimestamp BIGINT NOT NULL, eventId VARCHAR> will be maped to Avro schema:


{
  "type": "record",
  "name": "event",
  "fields": [
    {
      "name": "header",
      "type": {
        "type": "record",
        "name": "header",
        "fields": [
          {
            "name": "createTimestamp",
            "type": ["long",null]
          },
          {
            "name": "sentTimestamp",
             "type": ["long",null]
          },
          {
            "name": "eventId",
            "type": [
              "null",
              {
                "type": "string",
                "avro.java.string": "String"
              }
            ]
          }
        ]
      }
    }
]
}


We have to use "event ROW< createTimestamp BIGINT NOT NULL, sentTimestamp BIGINT NOT NULL, eventId VARCHAR> NOT NULL" to make the NOT NULL of fields: createTimestamp and sentTimestamp works.













At 2020-11-25 16:12:58, "Hongjian Peng" <su...@163.com> wrote:

Hi Flink Community,


We are trying to upgrade our Flink SQL job from 1.10 to 1.11. We used Kafka source table, and the data is stored in Kafka in Avro format.
Schema is like this:


{
  "type": "record",
  "name": "event",
  "namespace": "busseniss.event",
  "fields": [
    {
      "name": "header",
      "type": {
        "type": "record",
        "name": "header",
        "fields": [
          {
            "name": "createTimestamp",
            "type": "long"
          },
          {
            "name": "sentTimestamp",
            "type": "long"
          },
          {
            "name": "eventId",
            "type": [
              "null",
              {
                "type": "string",
                "avro.java.string": "String"
              }
            ]
          }
        ]
      },
      "doc": "Rheos header "
    },
    {
      "name": "si",
      "type": [
        "null",
        "string"
      ]
    }
]
}


















--

Hongjian Peng
Department of Computer Science and Engineering
Shanghai Jiao Tong University
Email: superphj@163.com