Posted to user@flink.apache.org by "Yan Zhou [FDS Science]" <yz...@coupang.com> on 2018/03/08 03:28:23 UTC

flink sql timed-window join throw "mismatched type" AssertionError on rowtime column

Hi experts,

I am using the Flink Table API to join two tables that are backed by DataStreams. However, I get an assertion error, "java.lang.AssertionError: mismatched type $1 TIMESTAMP(3)", on the rowtime column. More details are below:


There is only one Kafka data source, which is converted to a Table and registered. One existing column is set as the rowtime (event time) attribute. Two over-window aggregation queries are run against the table, producing two result tables. Everything works fine so far.

However, when I run a time-windowed join on the two result tables, which inherit the rowtime attribute, Calcite throws "java.lang.AssertionError: mismatched type $1 TIMESTAMP(3)". Can someone tell me what the possible cause is? FYI, I renamed the rowtime column in one of the result tables.


DataStream<MyObject> dataStream = env.addSource(kafkaConsumer);
Table table = tableEnv.fromDataStream(dataStream, "col1", "col2", ...);
tableEnv.registerTable(tableName, table);

Table left = tableEnv.sqlQuery("select id, eventTime, count(*) over ... from ...");
Table right = tableEnv.sqlQuery("select id as r_id, eventTime as r_event_time, count(*) over ... from ...");

Table joined = left.join(right).where("id = r_id && eventTime === r_event_time");
// Converting to a DataStream before the sink (exact conversion call assumed) is where
// Calcite throws: java.lang.AssertionError: mismatched type $1 TIMESTAMP(3)
tableEnv.toAppendStream(joined, Row.class).addSink(...);

source table
 |-- id: Long
 |-- eventTime: TimeIndicatorTypeInfo(rowtime)
 |-- ...
 |-- ...

result_1 table
 |-- id: Long
 |-- eventTime: TimeIndicatorTypeInfo(rowtime)
 |-- ...
 |-- ...

result_2 table
 |-- r_id: Long
 |-- r_event_time: TimeIndicatorTypeInfo(rowtime)
 |-- ...



Best

Yan
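The two result tables keep eventTime as a rowtime attribute because an OVER aggregation emits exactly one output row per input row, carrying that row's timestamp, and never revises earlier output. The plain-Java sketch below is not Flink code; the class name RowsOverWindowCount is hypothetical. It models count(*) OVER (PARTITION BY id ORDER BY eventTime ROWS BETWEEN n PRECEDING AND CURRENT ROW), assuming rows arrive in eventTime order within each id.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of:
//   count(*) OVER (PARTITION BY id ORDER BY eventTime
//                  ROWS BETWEEN <preceding> PRECEDING AND CURRENT ROW)
// Assumes rows arrive in eventTime order within each id.
final class RowsOverWindowCount {
    private final int preceding;
    // Per-id event times of the rows currently inside the frame.
    private final Map<Long, Deque<Long>> frames = new HashMap<>();

    RowsOverWindowCount(int preceding) {
        this.preceding = preceding;
    }

    /** Adds one row; returns the count for its frame (current row included). */
    long onRow(long id, long eventTime) {
        Deque<Long> frame = frames.computeIfAbsent(id, k -> new ArrayDeque<>());
        frame.addLast(eventTime);
        // The frame holds at most `preceding` earlier rows plus the current row.
        while (frame.size() > preceding + 1) {
            frame.removeFirst();
        }
        return frame.size();
    }

    public static void main(String[] args) {
        RowsOverWindowCount agg = new RowsOverWindowCount(2);
        long[][] rows = {{1, 10}, {1, 20}, {2, 25}, {1, 30}, {1, 40}};
        for (long[] r : rows) {
            // Each output row keeps its input's eventTime, so the result stays append-only.
            System.out.println("id=" + r[0] + " eventTime=" + r[1] + " cnt=" + agg.onRow(r[0], r[1]));
        }
    }
}
```

With preceding = 2, main prints cnt=1, 2, 1, 3, 3 for the five sample rows.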



Re: flink sql timed-window join throw "mismatched type" AssertionError on rowtime column

Posted by Hequn Cheng <ch...@gmail.com>.
Hi Yan,

This is a bug in Flink. As a workaround, you can cast eventTime to another
basic SQL type (for example, cast eventTime as varchar).

@Timo and @Xingcan, I think we have to materialize time indicators in the
conditions of LogicalFilter. I created an issue where we can discuss this
further [1].

[1] https://issues.apache.org/jira/browse/FLINK-8898

Best, Hequn

On Thu, Mar 8, 2018 at 8:59 PM, Timo Walther <tw...@apache.org> wrote:

> Hi Xingcan,
>
> Thanks for looking into this. This definitely seems to be a bug, maybe in
> org.apache.flink.table.calcite.RelTimeIndicatorConverter. In any case,
> we should create an issue for it.
>
> Regards,
> Timo
>
>
> On 3/8/18 at 7:27 AM, Yan Zhou [FDS Science] wrote:
>
> Hi Xingcan,
>
>
> Thanks for your help. Attached is sample code that reproduces the
> problem.
>
> While writing the sample code, I noticed that if I remove the `distinct`
> keyword from the select clause, the AssertionError does not occur.
>
>
> String sql1 = "select distinct id, eventTs, count(*) over (partition by
> id order by eventTs rows between 100 preceding and current row) as cnt1
> from myTable";
>
>
> Best
> Yan
> ------------------------------
> From: xccui-foxmail <xi...@gmail.com>
> Sent: Wednesday, March 7, 2018 8:10 PM
> To: Yan Zhou [FDS Science]
> Cc: user@flink.apache.org
> Subject: Re: flink sql timed-window join throw "mismatched type"
> AssertionError on rowtime column
>
> Hi Yan,
>
> I’d like to look into this. Can you share more about your queries and the
> full stack trace?
>
> Thanks,
> Xingcan

Re: flink sql timed-window join throw "mismatched type" AssertionError on rowtime column

Posted by Hequn Cheng <ch...@gmail.com>.
I think it would be easier to cast eventTs and r_eventTs to TIMESTAMP and do a
non-windowed join. Something like:

    val sql1 = "select distinct id, cast(eventTs as timestamp) as eventTs, " +
      "count(*) over (partition by id order by eventTs rows " +
      "between 100 preceding and current row) as cnt1 from myTable"
    val sql2 = "select distinct id as r_id, cast(eventTs as timestamp) as r_eventTs, " +
      "count(*) over (partition by id " +
      "order by eventTs rows between 50 preceding and current row) as cnt2 from myTable"
    val left = tEnv.sqlQuery(sql1)
    val right = tEnv.sqlQuery(sql2)
    left.join(right).where("id = r_id && eventTs === r_eventTs")


Hope this helps.
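Hequn's suggestion swaps the time-windowed join for a non-windowed one. As a rough illustration of what that implies, the plain-Java sketch below (a hypothetical class, not Flink's operator) joins on id and eventTs equality. Because there is no time bound, it has to keep every row from both sides in state indefinitely. It assumes append-only inputs; a real join over the retract streams produced by DISTINCT would also have to process retraction messages.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of a non-windowed inner join on
//   id = r_id AND eventTs = r_eventTs
// over append-only inputs (retractions are not modelled).
final class NonWindowedJoin {
    // Unbounded state: every row ever seen on each side, grouped by join key.
    private final Map<String, List<String>> leftState = new HashMap<>();
    private final Map<String, List<String>> rightState = new HashMap<>();

    private static String key(long id, long eventTs) {
        return id + "@" + eventTs;
    }

    List<String> onLeft(long id, long eventTs, String row) {
        String k = key(id, eventTs);
        leftState.computeIfAbsent(k, x -> new ArrayList<>()).add(row);
        List<String> out = new ArrayList<>();
        for (String r : rightState.getOrDefault(k, Collections.emptyList())) {
            out.add(row + "|" + r);
        }
        return out;
    }

    List<String> onRight(long id, long eventTs, String row) {
        String k = key(id, eventTs);
        rightState.computeIfAbsent(k, x -> new ArrayList<>()).add(row);
        List<String> out = new ArrayList<>();
        for (String l : leftState.getOrDefault(k, Collections.emptyList())) {
            out.add(l + "|" + row);
        }
        return out;
    }

    /** Rows held in state; nothing is ever evicted because there is no time bound. */
    int stateSize() {
        int n = 0;
        for (List<String> rows : leftState.values()) n += rows.size();
        for (List<String> rows : rightState.values()) n += rows.size();
        return n;
    }

    public static void main(String[] args) {
        NonWindowedJoin join = new NonWindowedJoin();
        join.onLeft(1, 100, "L1");
        System.out.println(join.onRight(1, 100, "R1")); // matches L1
        System.out.println(join.onRight(1, 200, "R2")); // no partner yet
        System.out.println("rows in state: " + join.stateSize());
    }
}
```

main prints [L1|R1], then [], then "rows in state: 3". State keeps growing as new keys arrive, which is the main cost of this workaround compared with a time-bounded join.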

On Fri, Mar 9, 2018 at 10:51 PM, Timo Walther <tw...@apache.org> wrote:

> Another workaround would be to split the query into two Table API parts.
>
> You could do the join, convert the result into a DataStream, and convert
> that back into a Table. The optimizer does not optimize across DataStream
> API calls.
>
> Casting eventTs to TIMESTAMP as early as possible should also work around
> this bug.
>
> Let us know if this helps. I think this bug has a good chance of being fixed
> in 1.5.0, which will be released soon.
>
> Regards,
> Timo
>
> On 3/9/18 at 3:28 PM, Xingcan Cui wrote:
>
> Hi Yan,
>
> I think you could try that as a workaround. Don’t forget to follow
> DataStreamWindowJoin
> <https://github.com/apache/flink/blob/fddedda78ad03f1141f3e32f0e0f39c2e045df0e/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamWindowJoin.scala> and
> hold back the watermarks. We’ll continue improving the SQL/Table API part.
>
> Best,
> Xingcan
>
>
> On 9 Mar 2018, at 4:08 AM, Yan Zhou [FDS Science] <yz...@coupang.com>
> wrote:
>
> Hi Xingcan, Timo,
>
> Thanks for the information.
> I am going to convert the result tables to DataStreams and follow the logic
> of TimeBoundedStreamInnerJoin to implement the time-windowed join myself.
> Is that a reasonable approach? Are there any performance or stability
> concerns?
>
> Best
> Yan
>
> ------------------------------
> From: Xingcan Cui <xi...@gmail.com>
> Sent: Thursday, March 8, 2018 8:21:42 AM
> To: Timo Walther
> Cc: user; Yan Zhou [FDS Science]
> Subject: Re: flink sql timed-window join throw "mismatched type"
> AssertionError on rowtime column
>
> Hi Yan & Timo,
>
> This is confirmed to be a bug, and I’ve created an issue [1] for it.
>
> Let me explain this query in more detail. In Flink SQL/Table API, the
> DISTINCT keyword is implemented as an aggregation, which outputs a retract
> stream [2]. In that situation, all time-related fields are materialized
> as if they were common fields (with the timestamp type). Currently, due to
> this semantic issue, a time-windowed join cannot be performed on retract
> streams. However, you could try a non-windowed join [3] once we have fixed
> this.
>
> Best,
> Xingcan
>
> [1] https://issues.apache.org/jira/browse/FLINK-8897
> [2] https://ci.apache.org/projects/flink/flink-docs-master/dev/table/streaming.html#table-to-stream-conversion
> [3] https://ci.apache.org/projects/flink/flink-docs-master/dev/table/sql.html#joins
>

Re: flink sql timed-window join throw "mismatched type" AssertionError on rowtime column

Posted by Timo Walther <tw...@apache.org>.
Another workaround would be to split the query into two Table API parts.

You could do the join, convert the result into a DataStream, and convert
that back into a Table. The optimizer does not optimize across DataStream
API calls.

Casting eventTs to TIMESTAMP as early as possible should also work around
this bug.

Let us know if this helps. I think this bug has a good chance of being
fixed in 1.5.0, which will be released soon.

Regards,
Timo


Re: flink sql timed-window join throw "mismatched type" AssertionError on rowtime column

Posted by Xingcan Cui <xi...@gmail.com>.
Hi Yan,

I think you could try that as a workaround. Don’t forget to follow DataStreamWindowJoin <https://github.com/apache/flink/blob/fddedda78ad03f1141f3e32f0e0f39c2e045df0e/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamWindowJoin.scala> and hold back the watermarks. We’ll continue improving the SQL/Table API part.

Best,
Xingcan
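Holding back watermarks matters because a joined row can carry a timestamp older than the current watermark. Assume the join predicate is l.ts BETWEEN r.ts - before AND r.ts + after, and that neither input contains late rows. When a right row arrives with r.ts >= W (the combined input watermark), any buffered left partner has l.ts >= r.ts - before >= W - before. Symmetrically, a right partner of a newly arrived left row has r.ts >= W - after. Emitting W - max(before, after) therefore keeps both timestamps of every result at or above the emitted watermark. The plain-Java sketch below shows that bookkeeping with hypothetical names. The exact delay DataStreamWindowJoin uses may differ, for example by also accounting for allowed lateness.

```java
// Hypothetical sketch of watermark holdback for an interval join with predicate
//   l.ts BETWEEN r.ts - before AND r.ts + after   (before, after >= 0)
// Assumes neither input ever delivers rows older than its own watermark.
final class WatermarkHoldback {
    private final long maxOutputDelay;
    private long leftWatermark = Long.MIN_VALUE;
    private long rightWatermark = Long.MIN_VALUE;

    WatermarkHoldback(long before, long after) {
        if (before < 0 || after < 0) {
            throw new IllegalArgumentException("bounds must be non-negative");
        }
        // A result may carry a timestamp up to this far behind the input watermark.
        this.maxOutputDelay = Math.max(before, after);
    }

    long onLeftWatermark(long wm) {
        leftWatermark = Math.max(leftWatermark, wm);
        return outputWatermark();
    }

    long onRightWatermark(long wm) {
        rightWatermark = Math.max(rightWatermark, wm);
        return outputWatermark();
    }

    /** Watermark to forward downstream: combined input watermark minus the delay. */
    long outputWatermark() {
        long input = Math.min(leftWatermark, rightWatermark);
        // Saturate instead of underflowing while an input has no watermark yet.
        return input < Long.MIN_VALUE + maxOutputDelay ? Long.MIN_VALUE : input - maxOutputDelay;
    }

    public static void main(String[] args) {
        WatermarkHoldback h = new WatermarkHoldback(5, 10);
        System.out.println(h.onLeftWatermark(100));
        System.out.println(h.onRightWatermark(80));
        System.out.println(h.onRightWatermark(130));
    }
}
```

main prints -9223372036854775808 (no right watermark yet), then 70, then 90.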




Re: flink sql timed-window join throw "mismatched type" AssertionError on rowtime column

Posted by "Yan Zhou [FDS Science]" <yz...@coupang.com>.
Hi Xingcan, Timo,

Thanks for the information.
I am going to convert the result tables to DataStreams and follow the logic of TimeBoundedStreamInnerJoin to implement the time-windowed join myself. Is that a reasonable approach? Are there any performance or stability concerns?

Best
Yan
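Yan's plan amounts to a buffered interval join per key. The plain-Java sketch below is a simplified model of that idea, not the code of TimeBoundedStreamInnerJoin. Rows are buffered per id on each side. Each new row is matched against the other side's buffer using l.ts BETWEEN r.ts - before AND r.ts + after, and a watermark evicts rows that can no longer find a partner. All names are hypothetical, and late rows (ts below the current watermark) are not handled.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified interval join keyed by id with predicate
//   l.ts BETWEEN r.ts - before AND r.ts + after   (before, after >= 0)
// Late rows (ts below the current watermark) are not handled.
final class TimeBoundedInnerJoin {
    private static final class Row {
        final long ts;
        final String payload;
        Row(long ts, String payload) { this.ts = ts; this.payload = payload; }
    }

    private final long before;
    private final long after;
    private final Map<Long, List<Row>> leftBuffer = new HashMap<>();
    private final Map<Long, List<Row>> rightBuffer = new HashMap<>();

    TimeBoundedInnerJoin(long before, long after) {
        this.before = before;
        this.after = after;
    }

    private boolean matches(long leftTs, long rightTs) {
        return leftTs >= rightTs - before && leftTs <= rightTs + after;
    }

    List<String> onLeft(long id, long ts, String payload) {
        leftBuffer.computeIfAbsent(id, k -> new ArrayList<>()).add(new Row(ts, payload));
        List<String> out = new ArrayList<>();
        for (Row r : rightBuffer.getOrDefault(id, Collections.emptyList())) {
            if (matches(ts, r.ts)) out.add(payload + "|" + r.payload);
        }
        return out;
    }

    List<String> onRight(long id, long ts, String payload) {
        rightBuffer.computeIfAbsent(id, k -> new ArrayList<>()).add(new Row(ts, payload));
        List<String> out = new ArrayList<>();
        for (Row l : leftBuffer.getOrDefault(id, Collections.emptyList())) {
            if (matches(l.ts, ts)) out.add(l.payload + "|" + payload);
        }
        return out;
    }

    /** Evicts rows that no future (non-late) row on the other side can match. */
    void onWatermark(long wm) {
        // A future right row has r.ts >= wm, so it needs l.ts >= wm - before.
        evictOlderThan(leftBuffer, wm - before);
        // A future left row has l.ts >= wm, so it needs r.ts >= wm - after.
        evictOlderThan(rightBuffer, wm - after);
    }

    private static void evictOlderThan(Map<Long, List<Row>> buffer, long minTs) {
        Iterator<List<Row>> it = buffer.values().iterator();
        while (it.hasNext()) {
            List<Row> rows = it.next();
            rows.removeIf(row -> row.ts < minTs);
            if (rows.isEmpty()) it.remove();
        }
    }

    int bufferedRows() {
        int n = 0;
        for (List<Row> rows : leftBuffer.values()) n += rows.size();
        for (List<Row> rows : rightBuffer.values()) n += rows.size();
        return n;
    }

    public static void main(String[] args) {
        TimeBoundedInnerJoin join = new TimeBoundedInnerJoin(5, 5);
        join.onLeft(1, 100, "L1");
        System.out.println(join.onRight(1, 103, "R1")); // within bounds
        System.out.println(join.onRight(1, 120, "R2")); // too far from L1
        join.onWatermark(110);
        System.out.println("buffered after watermark 110: " + join.bufferedRows());
    }
}
```

main prints [L1|R1], then [], then a buffered count of 1 (only R2 survives eviction). Unlike the non-windowed variant, state here is bounded by the watermark, which is the main reason to prefer a time-bounded join when the inputs are append-only.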




Re: flink sql timed-window join throw "mismatched type" AssertionError on rowtime column

Posted by Xingcan Cui <xi...@gmail.com>.
Hi Yan & Timo,

This is confirmed to be a bug, and I’ve created an issue [1] for it.

Let me explain this query in more detail. In Flink SQL/Table API, the DISTINCT keyword is implemented as an aggregation, which outputs a retract stream [2]. In that situation, all time-related fields are materialized as if they were common fields (with the timestamp type). Currently, due to this semantic issue, a time-windowed join cannot be performed on retract streams. However, you could try a non-windowed join [3] once we have fixed this.

Best,
Xingcan

[1] https://issues.apache.org/jira/browse/FLINK-8897
[2] https://ci.apache.org/projects/flink/flink-docs-master/dev/table/streaming.html#table-to-stream-conversion
[3] https://ci.apache.org/projects/flink/flink-docs-master/dev/table/sql.html#joins
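The retract-stream point can be made concrete. A grouped aggregation may have to withdraw a result row it has already emitted. One way to read the "semantics problem" is that an emitted row's timestamp then no longer behaves like a watermark-aligned event time, so time attributes are materialized as plain TIMESTAMP values. The plain-Java sketch below uses a per-key count. It was chosen only because it makes retractions visible; it is not Flink's DISTINCT operator. Messages are encoded as "+" for add and "-" for retract, loosely mirroring the Boolean flag in the pairs returned by toRetractStream.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical per-key count that emits retract-stream messages:
// "+(key,n)" adds a result row, "-(key,n)" withdraws a previously emitted one.
final class RetractingCount {
    private final Map<String, Long> counts = new HashMap<>();

    List<String> onRow(String key) {
        List<String> out = new ArrayList<>();
        Long previous = counts.get(key);
        if (previous != null) {
            // The earlier result row is retracted before its replacement is emitted.
            out.add("-(" + key + "," + previous + ")");
        }
        long updated = previous == null ? 1L : previous + 1;
        counts.put(key, updated);
        out.add("+(" + key + "," + updated + ")");
        return out;
    }

    public static void main(String[] args) {
        RetractingCount agg = new RetractingCount();
        for (String key : new String[] {"a", "b", "a"}) {
            System.out.println(key + " -> " + agg.onRow(key));
        }
    }
}
```

main prints a -> [+(a,1)], b -> [+(b,1)], a -> [-(a,1), +(a,2)]. The third input withdraws a row emitted earlier, which is exactly the kind of update a time-windowed join cannot consume.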

> On 8 Mar 2018, at 8:59 PM, Timo Walther <tw...@apache.org> wrote:
> 
> Hi Xingcan,
> 
> thanks for looking into this. This definitely seems to be a bug. Maybe in the org.apache.flink.table.calcite.RelTimeIndicatorConverter. In any case we should create an issue for it.
> 
> Regards,
> Timo
> 
> 
> Am 3/8/18 um 7:27 AM schrieb Yan Zhou [FDS Science]:
>> Hi Xingcan,
>> 
>> Thanks for your help. Attached is a sample code that can reproduce the problem.
>> When I was writing the sample code, if I remove the `distinct` keyword in select clause, the AssertionError doesn't occur.
>> 
>> String sql1 = "select distinct id, eventTs, count(*) over (partition by id order by eventTs rows between 100 preceding and current row) as cnt1 from myTable";
>> 
>> Best
>> Yan
>> From: xccui-foxmail <xi...@gmail.com> <ma...@gmail.com>
>> Sent: Wednesday, March 7, 2018 8:10 PM
>> To: Yan Zhou [FDS Science]
>> Cc: user@flink.apache.org <ma...@flink.apache.org>
>> Subject: Re: flink sql timed-window join throw "mismatched type" AssertionError on rowtime column
>>  
>> Hi Yan,
>> 
>> I’d like to look into this. Can you share more about your queries and the full stack trace?
>> 
>> Thank,
>> Xingcan
>> 
>>> On 8 Mar 2018, at 11:28 AM, Yan Zhou [FDS Science] <yzhou@coupang.com <ma...@coupang.com>> wrote:
>>> 
>>> Hi experts, 
>>> I am using flink table api to join two tables, which are datastream underneath. However, I got an assertion error of "java.lang.AssertionError: mismatched type $1 TIMESTAMP(3)" on rowtime column. Below is more details:
>>> 
>>> There in only one kafka data source, which is then converted to Table and registered. One existed column is set as rowtime(event time) attribute. Two over-window aggregation queries are run against the table and two tables are created as results. Everything works great so far.
>>> However when timed-window joining two result tables with inherented rowtime, calcite throw the "java.lang.AssertionError: mismatched type $1 TIMESTAMP(3)" AssertionError. Can someone let me know what is the possible cause? F.Y.I., I rename the rowtime column for one of the result table.  
>>> 
>>> DataStream<MyObject> dataStream = env.addSource(kafkaConsumer);
>>> Table table = tableEnv.fromDataStream(dataStream, "col1", "col2", ...);
>>> tableEnv.registerTable(tableName, table);
>>> Table left = tableEnv.sqlQuery("select id, eventTime,count (*) over ...  from ...");
>>> Table right = tableEnv.sqlQuery("select id as r_id, eventTime as r_event_time, count (*) over ...  from ...");
>>> left.join(right).where("id = r_id && eventTime === r_event_time)
>>> .addSink(...); // here calcite throw exception: java.lang.AssertionError: mismatched type $1 TIMESTAMP(3) 
>>> 
>>> source table
>>>  |-- id: Long
>>>  |-- eventTime: TimeIndicatorTypeInfo(rowtime)
>>>  |-- ...
>>>  |-- ...
>>>  
>>> result_1 table
>>>  |-- id: Long
>>>  |-- eventTime: TimeIndicatorTypeInfo(rowtime)
>>>  |-- ...
>>>  |-- ...
>>>  
>>> result_2 table
>>>  |-- rid: Long
>>>  |-- r_event_time: TimeIndicatorTypeInfo(rowtime)
>>>  |-- ...
>>> 
>>> 
>>> Best
>>> Yan
>> 
> 
> 
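For readers less familiar with the join in the question: a time-windowed join such as `id === r_id && eventTime === r_event_time` pairs rows with the same key whose event timestamps fall within a bounded interval, and equality on the two rowtime columns is the degenerate case where both bounds are zero. The sketch below illustrates only those semantics on in-memory lists, under the assumption that timestamps are epoch milliseconds; the classes `TimedRow` and `IntervalJoinSketch` are hypothetical and this is not how Flink executes the join (Flink keeps rows in state and uses watermarks to clean it up).

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical row type: a key, an event-time timestamp in epoch millis,
// and a count such as the result of a count(*) over ... aggregate.
class TimedRow {
    final long id;
    final long eventTime;
    final long cnt;

    TimedRow(long id, long eventTime, long cnt) {
        this.id = id;
        this.eventTime = eventTime;
        this.cnt = cnt;
    }
}

class IntervalJoinSketch {
    // Joins rows with equal keys whose timestamps satisfy
    // left.eventTime - lowerMs <= right.eventTime <= left.eventTime + upperMs.
    // lowerMs = upperMs = 0 corresponds to "eventTime === r_event_time".
    static List<long[]> join(List<TimedRow> left, List<TimedRow> right,
                             long lowerMs, long upperMs) {
        List<long[]> out = new ArrayList<>();
        for (TimedRow l : left) {
            for (TimedRow r : right) {
                boolean sameKey = l.id == r.id;
                boolean inWindow = r.eventTime >= l.eventTime - lowerMs
                        && r.eventTime <= l.eventTime + upperMs;
                if (sameKey && inWindow) {
                    // Output columns: id, left time, right time, left count, right count
                    out.add(new long[] {l.id, l.eventTime, r.eventTime, l.cnt, r.cnt});
                }
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<TimedRow> left = List.of(new TimedRow(1, 1000, 1), new TimedRow(1, 2000, 2));
        List<TimedRow> right = List.of(new TimedRow(1, 2000, 5), new TimedRow(2, 2000, 7));
        for (long[] row : join(left, right, 0, 0)) {
            System.out.println(java.util.Arrays.toString(row));
        }
    }
}
```

With both bounds at 0, only the pair of `id = 1` rows at time 2000 matches; widening the bounds to 1000 ms on each side also matches the left row at time 1000.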


Re: flink sql timed-window join throw "mismatched type" AssertionError on rowtime column

Posted by Timo Walther <tw...@apache.org>.
Hi Xingcan,

thanks for looking into this. This definitely seems to be a bug, possibly
in org.apache.flink.table.calcite.RelTimeIndicatorConverter. In any
case, we should create an issue for it.

Regards,
Timo


On 3/8/18 at 7:27 AM, Yan Zhou [FDS Science] wrote:
>
> Hi Xingcan,
>
>
> Thanks for your help. Attached is sample code that reproduces the
> problem.
>
> While writing the sample code, I found that if I remove the `distinct`
> keyword from the select clause, the AssertionError does not occur.
>
>
>     String sql1 = "select distinct id, eventTs, count(*) over "
>         + "(partition by id order by eventTs rows between 100 preceding and current row) "
>         + "as cnt1 from myTable";
>
>
> Best
> Yan
> ------------------------------------------------------------------------
> From: xccui-foxmail <xi...@gmail.com>
> Sent: Wednesday, March 7, 2018 8:10 PM
> To: Yan Zhou [FDS Science]
> Cc: user@flink.apache.org
> Subject: Re: flink sql timed-window join throw "mismatched type" AssertionError on rowtime column
> Hi Yan,
>
> I’d like to look into this. Can you share more about your queries and 
> the full stack trace?
>
> Thanks,
> Xingcan
>
>


Re: flink sql timed-window join throw "mismatched type" AssertionError on rowtime column

Posted by "Yan Zhou [FDS Science]" <yz...@coupang.com>.
Hi Xingcan,


Thanks for your help. Attached is sample code that reproduces the problem.

While writing the sample code, I found that if I remove the `distinct` keyword from the select clause, the AssertionError does not occur.


String sql1 = "select distinct id, eventTs, count(*) over (partition by id order by eventTs rows between 100 preceding and current row) as cnt1 from myTable";
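To make the semantics of `sql1` concrete, the sketch below computes `count(*) over (partition by id order by eventTs rows between 100 preceding and current row)` on an in-memory list and then applies `distinct` to the projected `(id, eventTs, cnt1)` tuples. It is a plain-Java illustration under simplifying assumptions (rows already arrive sorted by `eventTs`; the class `OverWindowDistinctSketch` and its nested `Event` type are hypothetical), not how Flink plans or executes the query.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

class OverWindowDistinctSketch {
    // Hypothetical input row: (id, eventTs). Rows are assumed to arrive in eventTs order.
    static final class Event {
        final long id;
        final long eventTs;

        Event(long id, long eventTs) {
            this.id = id;
            this.eventTs = eventTs;
        }
    }

    // For each input row, counts the rows of the same id among the current row
    // and up to `preceding` earlier rows (ROWS BETWEEN n PRECEDING AND CURRENT ROW).
    static List<List<Long>> overCount(List<Event> events, int preceding) {
        Map<Long, Integer> seenPerId = new HashMap<>();
        List<List<Long>> out = new ArrayList<>();
        for (Event e : events) {
            int seen = seenPerId.merge(e.id, 1, Integer::sum); // rows so far, including this one
            long cnt = Math.min(seen, preceding + 1);          // window holds at most n + 1 rows
            out.add(List.of(e.id, e.eventTs, cnt));
        }
        return out;
    }

    // SELECT DISTINCT over the projected (id, eventTs, cnt1) tuples, keeping first-seen order.
    static List<List<Long>> distinct(List<List<Long>> rows) {
        Set<List<Long>> unique = new LinkedHashSet<>(rows);
        return new ArrayList<>(unique);
    }

    public static void main(String[] args) {
        List<Event> events = List.of(new Event(1, 1000), new Event(2, 1000), new Event(1, 2000));
        List<List<Long>> withCounts = overCount(events, 100);
        System.out.println(withCounts);
        System.out.println(distinct(withCounts));
    }
}
```

Because a ROWS count keeps growing until the window is full, `distinct` only removes rows once more than `preceding` rows share the same `id` and `eventTs`. For example, with `preceding = 1`, three identical events yield counts 1, 2, 2, and `distinct` keeps two rows.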

Best
Yan
________________________________
From: xccui-foxmail <xi...@gmail.com>
Sent: Wednesday, March 7, 2018 8:10 PM
To: Yan Zhou [FDS Science]
Cc: user@flink.apache.org
Subject: Re: flink sql timed-window join throw "mismatched type" AssertionError on rowtime column

Hi Yan,

I’d like to look into this. Can you share more about your queries and the full stack trace?

Thanks,
Xingcan



Re: flink sql timed-window join throw "mismatched type" AssertionError on rowtime column

Posted by xccui-foxmail <xi...@gmail.com>.
Hi Yan,

I’d like to look into this. Can you share more about your queries and the full stack trace?

Thanks,
Xingcan
