Posted to user-zh@flink.apache.org by sunfulin <su...@163.com> on 2020/02/07 06:57:38 UTC

Flink DataTypes json parse exception

Hi guys,
After upgrading to Flink 1.10 rc0, I am confused by the new DataTypes schema definition.
I am consuming records from Kafka in JSON format, for example {"user_id":1866071598998,"recv_time":1547011832729}, and the code snippet is:

.withSchema(
    new Schema()
        // Event-time attribute, taken from the JSON field "recv_time"
        .field("rowtime", DataTypes.TIMESTAMP(3).bridgedTo(java.sql.Timestamp.class))
        .rowtime(
            new Rowtime()
                .timestampsFromField("recv_time")
                // Bounded out-of-orderness of 1000 ms
                .watermarksPeriodicBounded(1000)
        )
        .field("user_id", DataTypes.STRING())
)

However, I ran into an issue and got the following exception:


Caused by: java.time.format.DateTimeParseException: Text '1549705104542' could not be parsed at index 0
at java.time.format.DateTimeFormatter.parseResolved0(DateTimeFormatter.java:1949)
at java.time.format.DateTimeFormatter.parse(DateTimeFormatter.java:1777)
at org.apache.flink.formats.json.JsonRowDeserializationSchema.convertToLocalDateTime(JsonRowDeserializationSchema.java:366)
at org.apache.flink.formats.json.JsonRowDeserializationSchema.lambda$wrapIntoNullableConverter$d586c97$1(JsonRowDeserializationSchema.java:236)
at org.apache.flink.formats.json.JsonRowDeserializationSchema.convertField(JsonRowDeserializationSchema.java:439)
at org.apache.flink.formats.json.JsonRowDeserializationSchema.lambda$assembleRowConverter$77f7700$1(JsonRowDeserializationSchema.java:418)
at org.apache.flink.formats.json.JsonRowDeserializationSchema.lambda$wrapIntoNullableConverter$d586c97$1(JsonRowDeserializationSchema.java:236)
at org.apache.flink.formats.json.JsonRowDeserializationSchema.deserialize(JsonRowDeserializationSchema.java:131)


Re: Re: Flink DataTypes json parse exception

Posted by sunfulin <su...@163.com>.
Hi,
I am using the latest Flink 1.10 rc. When I run the same code on Flink 1.8.2, there is no problem, but on 1.10 the issue occurs.
I am confused about the cause.

At 2020-02-11 18:33:50, "Timo Walther" <tw...@apache.org> wrote:
>Hi,
>
>From which Flink version are you upgrading? There were some changes in
>1.9 to how timestamps are parsed in the JSON format.
>
>Your error might be related to those changes:
>
>https://issues.apache.org/jira/browse/FLINK-11727
>
>I hope this helps.
>
>Timo

Re: Flink DataTypes json parse exception

Posted by Timo Walther <tw...@apache.org>.
Hi,

From which Flink version are you upgrading? There were some changes in
1.9 to how timestamps are parsed in the JSON format.

Your error might be related to those changes:

https://issues.apache.org/jira/browse/FLINK-11727

I hope this helps.

Timo
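
To illustrate the exception above, here is a minimal sketch using only java.time. It assumes that the JSON format now expects timestamp fields as RFC 3339 / ISO-8601 style strings rather than epoch-millisecond numbers; that is my reading of FLINK-11727, not something confirmed in this thread. The class and method names are hypothetical, and DateTimeFormatter.ISO_OFFSET_DATE_TIME only stands in for whatever formatter Flink actually uses.

```java
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.time.format.DateTimeParseException;

// Hypothetical helper class for illustration; not part of Flink.
public class EpochMillisVsIsoTimestamp {

    // Returns true if the text parses as an ISO-8601 date-time with offset.
    static boolean parsesAsIsoTimestamp(String text) {
        try {
            DateTimeFormatter.ISO_OFFSET_DATE_TIME.parse(text);
            return true;
        } catch (DateTimeParseException e) {
            // A bare epoch-millis string such as "1549705104542" ends up here.
            return false;
        }
    }

    // Converts epoch milliseconds to an ISO-8601 timestamp string in UTC.
    static String epochMillisToIso(long epochMillis) {
        return DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(
                Instant.ofEpochMilli(epochMillis).atOffset(ZoneOffset.UTC));
    }

    public static void main(String[] args) {
        String raw = "1549705104542"; // value taken from the exception message
        System.out.println(parsesAsIsoTimestamp(raw));   // false
        String iso = epochMillisToIso(Long.parseLong(raw));
        System.out.println(iso);                         // 2019-02-09T09:38:24.542Z
        System.out.println(parsesAsIsoTimestamp(iso));   // true
    }
}
```

If that assumption holds, one option is for the producer to emit recv_time in this string form. Whether that fits a given pipeline should be checked against the JSON format documentation for the Flink version in use.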
