Posted to dev@flink.apache.org by LakeShen <sh...@gmail.com> on 2019/12/26 03:43:53 UTC

Flink 1.9 SQL Kafka connector, JSON format: how to deal with non-JSON messages?

Hi community, when I write a Flink SQL DDL statement like this:

CREATE TABLE kafka_src (
  id varchar,
  a varchar,
  b TIMESTAMP,
  c TIMESTAMP
)
  with (
   ...
    'format.type' = 'json',
    'format.property-version' = '1',
    'format.derive-schema' = 'true',
    'update-mode' = 'append'
);

If a message is not valid JSON, an error appears in the log.
My question is: how should I deal with messages that are not in JSON format?
My thought is to catch the exception in the deserialize() method of
JsonRowDeserializationSchema. Is there any parameter to do this?
Thanks for your reply.

Re: Flink 1.9 SQL Kafka connector, JSON format: how to deal with non-JSON messages?

Posted by Jark Wu <im...@gmail.com>.
Hi LakeShen,

I'm sorry, there is currently no such configuration for the JSON format.
I think it makes sense to add a configuration like the
'format.ignore-parse-errors' option in the CSV format.
I created FLINK-15396 [1] to track this.

Best,
Jark

[1]: https://issues.apache.org/jira/browse/FLINK-15396
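
Until such an option exists, the idea from the question (catching the
exception around deserialize()) could be sketched roughly as below. This is
only an illustration, not Flink code: Deserializer is a hypothetical stand-in
for Flink's DeserializationSchema reduced to one method, and
ToyStrictDeserializer is a toy placeholder for JsonRowDeserializationSchema
that merely checks for surrounding braces. The sketch assumes that the source
treats a null result as "skip this record"; please verify that for the
connector version in use.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;

/** Hypothetical stand-in for Flink's DeserializationSchema (one method only). */
interface Deserializer<T> {
    T deserialize(byte[] message) throws IOException;
}

/** Toy placeholder for a strict JSON deserializer; it only checks the braces. */
final class ToyStrictDeserializer implements Deserializer<String> {
    @Override
    public String deserialize(byte[] message) throws IOException {
        String text = new String(message, StandardCharsets.UTF_8).trim();
        if (!(text.startsWith("{") && text.endsWith("}"))) {
            // Illustrative error message; a real parser would report more detail.
            throw new IOException("Failed to deserialize JSON object.");
        }
        return text;
    }
}

/** Wraps another deserializer and turns parse failures into null results. */
final class SkipOnErrorDeserializer<T> implements Deserializer<T> {
    private final Deserializer<T> inner;
    private long skipped; // number of messages dropped so far

    SkipOnErrorDeserializer(Deserializer<T> inner) {
        this.inner = inner;
    }

    @Override
    public T deserialize(byte[] message) {
        try {
            return inner.deserialize(message);
        } catch (IOException | RuntimeException e) {
            // Assumption: the caller interprets null as "ignore this message".
            skipped++;
            return null;
        }
    }

    long skippedCount() {
        return skipped;
    }
}
```

Counting the skipped messages is a deliberate choice in this sketch: silently
dropping records makes data loss hard to notice, so exposing the count (for
example as a metric) is usually worth the extra field.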

On Thu, 26 Dec 2019 at 11:44, LakeShen <sh...@gmail.com> wrote:

> Hi community,when I write the flink ddl sql like this:
>
> CREATE TABLE kafka_src (
>   id varchar,
>   a varchar,
>   b TIMESTAMP,
>   c TIMESTAMP
> )
>   with (
>    ...
>     'format.type' = 'json',
>     'format.property-version' = '1',
>     'format.derive-schema' = 'true',
>     'update-mode' = 'append'
> );
>
> If the message is not the json format ,there is a error in the log。
> My question is that how to deal with the message which it not json format?
> My thought is that I can catch the exception
> in JsonRowDeserializationSchema deserialize() method,is there any
> parameters to do this?
> Thanks your replay.
>
>
