Posted to user@flink.apache.org by Tom Thornton <th...@yelp.com> on 2022/03/16 22:59:19 UTC

Potential Bug with Date Serialization for Table Stream

Per the docs <https://flink.apache.org/gettinghelp.html#found-a-bug>, I'm
hoping to confirm whether an error we are seeing is a bug in Flink. We have
a job that uses a Kafka source to read Avro records. The Kafka source is
converted into a StreamTableSource, and we use the new Blink table planner
to execute SQL on the table stream. The output is then written to a sink
back to Kafka as Avro records. Whenever a query selects a column that has
an Avro logicalType of date, we get this error (link to full
stack trace <https://pastebin.com/raw/duQaTAh6>):

Caused by: java.lang.ClassCastException: class java.sql.Date cannot be
cast to class java.time.LocalDate (java.sql.Date is in module java.sql
of loader 'platform'; java.time.LocalDate is in module java.base of
loader 'bootstrap')
        at org.apache.flink.api.common.typeutils.base.LocalDateSerializer.copy(LocalDateSerializer.java:30)
        at org.apache.flink.api.java.typeutils.runtime.RowSerializer.copy(RowSerializer.java:128)
        at org.apache.flink.api.java.typeutils.runtime.RowSerializer.copy(RowSerializer.java:61)
        at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:755)
        at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:732)
        at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:712)
        at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:50)
        at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:28)
        at org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:38)
        at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:158)
        at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:191)
        at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:162)
        at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:374)
        at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:190)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:608)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:574)
        at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:752)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:569)
        at java.base/java.lang.Thread.run(Thread.java:829)
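
For readers less familiar with the two date classes, the cast failure at the top of the trace can be reproduced outside Flink. This is a minimal JDK-only sketch: DateCastDemo and its copy method are hypothetical stand-ins for a serializer typed for java.time.LocalDate, not Flink code.

```java
import java.sql.Date;
import java.time.LocalDate;

class DateCastDemo {
    // Hypothetical stand-in for a copy routine that is typed for LocalDate.
    // The cast throws ClassCastException when the value is a java.sql.Date.
    static LocalDate copy(Object value) {
        return (LocalDate) value;
    }

    // Returns true when the value cannot be handled by the LocalDate-typed copy.
    static boolean failsToCopy(Object value) {
        try {
            copy(value);
            return false;
        } catch (ClassCastException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        Object sqlDate = Date.valueOf("2022-03-16");
        Object localDate = LocalDate.of(2022, 3, 16);
        System.out.println("java.sql.Date fails: " + failsToCopy(sqlDate));
        System.out.println("LocalDate fails: " + failsToCopy(localDate));
    }
}
```

The two classes are unrelated in the type hierarchy, so the declared type and the runtime class of the value have to agree.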


The Avro schema definition for a date field is as follows:

            {
                "name": "date",
                "type": {
                    "type": "int",
                    "logicalType": "date"
                },
                "doc": "date"
            },
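
For context, Avro's date logical type annotates an int that counts days since the Unix epoch (1970-01-01). A small JDK-only sketch of that mapping follows; the class and method names are made up for illustration and are not Avro or Flink APIs.

```java
import java.time.LocalDate;

class AvroDateSketch {
    // Interpret an Avro "date" int (days since 1970-01-01) as a LocalDate.
    static LocalDate fromAvroDate(int daysSinceEpoch) {
        return LocalDate.ofEpochDay(daysSinceEpoch);
    }

    // Convert a LocalDate back to the int form; throws ArithmeticException
    // if the day count does not fit in an int.
    static int toAvroDate(LocalDate date) {
        return Math.toIntExact(date.toEpochDay());
    }

    public static void main(String[] args) {
        int encoded = toAvroDate(LocalDate.of(2022, 3, 16));
        System.out.println(encoded + " <-> " + fromAvroDate(encoded));
    }
}
```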

Any query that selects a date column produces the error, and any query
without a date column works. Example of a query that causes the error:

select `date` from table1

As suggested in the docs, I also tried this with parent-first loading
and got the same error. When we run the same job without the Blink
table planner, i.e., useOldPlanner(), we do not get this error. Is
this a bug with Flink? Or is there something we can change in the
application code to prevent this error? Any help/suggestions would be
appreciated.

Re: [External] Re: Potential Bug with Date Serialization for Table Stream

Posted by Tom Thornton <th...@yelp.com>.
Hi Martijn,

Thank you for following up on this. We ended up changing two parts:

When creating the DataType, we instead used

new AtomicDataType(new DateType(false), java.sql.Date.class);

so that we could override the conversion class through the constructor
<https://nightlies.apache.org/flink/flink-docs-release-1.13/api/java/org/apache/flink/table/types/AtomicDataType.html#AtomicDataType-org.apache.flink.table.types.logical.LogicalType-java.lang.Class->.
We also changed the logic that converts an Avro schema to type
information and used the following in place of the default,
LocalTimeTypeInfo[java.time.LocalDate]:

org.apache.flink.table.api.Types.SQL_DATE()
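
Both changes above make the declared type match the java.sql.Date values. An alternative that this thread does not cover is to go the other way and convert the values to java.time.LocalDate before they reach a LocalDate-typed serializer. A rough JDK-only sketch, where DateNormalizer is a hypothetical helper operating on a plain Object[] rather than on a Flink Row:

```java
import java.sql.Date;
import java.time.LocalDate;

class DateNormalizer {
    // Returns a copy of the fields in which every java.sql.Date is replaced
    // by the equivalent java.time.LocalDate; other values (and nulls) are kept.
    static Object[] normalize(Object[] fields) {
        Object[] out = new Object[fields.length];
        for (int i = 0; i < fields.length; i++) {
            Object field = fields[i];
            out[i] = (field instanceof Date) ? ((Date) field).toLocalDate() : field;
        }
        return out;
    }

    public static void main(String[] args) {
        Object[] row = {Date.valueOf("2022-03-16"), "table1", null};
        for (Object field : normalize(row)) {
            String type = (field == null) ? "null" : field.getClass().getName();
            System.out.println(type + ": " + field);
        }
    }
}
```

Whether such a step fits depends on where the java.sql.Date values are produced in the job, which the thread does not establish.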

Thank you for the help. We do want to upgrade versions, and that would
likely help. For now we have this workaround with the current version.

On Fri, Apr 1, 2022 at 4:28 AM Martijn Visser <ma...@apache.org>
wrote:

> Hi Tom,
>
> Sorry for the late reply, I missed this. In the upcoming Flink 1.15 a
> number of improvements on CAST will be included [1] Would you be able to
> test this with the current RC0 of Flink 1.15? [2]
>
> Best regards,
>
> Martijn Visser
> https://twitter.com/MartijnVisser82
> https://github.com/MartijnVisser
>
> [1] https://issues.apache.org/jira/browse/FLINK-24403
> [2] https://lists.apache.org/thread/qpzz298lh5zq5osxmoo0ky6kg0b0r5zg
>
>
> On Tue, 22 Mar 2022 at 18:06, Tom Thornton <th...@yelp.com> wrote:
>
>> Hi Martijn,
>>
>> Do you know what could be causing this issue given our Flink version? Is
>> this possibly a bug with that version?
>>
>> Thanks,
>> Tom
>>
>> On Thu, Mar 17, 2022 at 9:59 AM Tom Thornton <th...@yelp.com> wrote:
>>
>>> Hi Martijn,
>>>
>>> We are using 1.11.6.
>>>
>>> Thank you for the help.
>>>
>>> On Thu, Mar 17, 2022 at 1:37 AM Martijn Visser <ma...@apache.org>
>>> wrote:
>>>
>>>> Hi Tom,
>>>>
>>>> Which version of Flink are you using?
>>>>
>>>> Best regards,
>>>>
>>>> Martijn Visser
>>>> https://twitter.com/MartijnVisser82
>>>>
>>>>

Re: [External] Re: Potential Bug with Date Serialization for Table Stream

Posted by Martijn Visser <ma...@apache.org>.
Hi Tom,

Sorry for the late reply, I missed this. The upcoming Flink 1.15 will
include a number of improvements to CAST [1]. Would you be able to
test this with the current RC0 of Flink 1.15? [2]

Best regards,

Martijn Visser
https://twitter.com/MartijnVisser82
https://github.com/MartijnVisser

[1] https://issues.apache.org/jira/browse/FLINK-24403
[2] https://lists.apache.org/thread/qpzz298lh5zq5osxmoo0ky6kg0b0r5zg



Re: [External] Re: Potential Bug with Date Serialization for Table Stream

Posted by Tom Thornton <th...@yelp.com>.
Hi Martijn,

Do you know what could be causing this issue given our Flink version? Is
this possibly a bug with that version?

Thanks,
Tom


Re: [External] Re: Potential Bug with Date Serialization for Table Stream

Posted by Tom Thornton <th...@yelp.com>.
Hi Martijn,

We are using 1.11.6.

Thank you for the help.


Re: Potential Bug with Date Serialization for Table Stream

Posted by Martijn Visser <ma...@apache.org>.
Hi Tom,

Which version of Flink are you using?

Best regards,

Martijn Visser
https://twitter.com/MartijnVisser82

