Posted to user@flink.apache.org by Dongwon Kim <ea...@gmail.com> on 2019/09/05 12:15:39 UTC
Wrong result of MATCH_RECOGNIZE clause
Hi,
I'm using Flink 1.9 and testing MATCH_RECOGNIZE by following [1].
While running the query in [2] myself, I got a result different from the
one shown in [2].
The query result from [2] is as follows:
 symbol       start_tstamp        end_tstamp          avgPrice
=========  ==================  ==================  ============
ACME       01-APR-11 10:00:00  01-APR-11 10:00:03     14.5
ACME       01-APR-11 10:00:04  01-APR-11 10:00:09     13.5
The result from the attached Maven project (which only contains
a sample program that executes the query in [2]) is as follows:
ACME,1970-01-01 00:00:01.0,1970-01-01 00:00:04.0,14.5
There's just one entry, not two.
(As you might notice, the time of the first record in the attached maven
project is 1970-01-01 00:00:01 for testing. The other numbers are the same.)
I dug into the internal implementation of CepOperator and got the
following trace:
1. INPUT : ACME,1000,12.0,1
2. PARTIAL MATCH : [A*1]
3.
4. INPUT : ACME,2000,17.0,2
5. PARTIAL MATCH : [A*2]
6.
7. INPUT : ACME,3000,13.0,1
8. PARTIAL MATCH : [A*3]
9. PARTIAL MATCH : [A*1]
10.
11. INPUT : ACME,4000,16.0,3
12. PARTIAL MATCH : [A*4]
13. PARTIAL MATCH : [A*2]
14.
15. *INPUT : ACME,5000,25.0,2*
16. *COMPLETED MATCH : [A*4, B*1]*
17.
18. INPUT : ACME,6000,2.0,1
19. PARTIAL MATCH : [A*1]
20.
21. INPUT : ACME,7000,4.0,1
22. PARTIAL MATCH : [A*2]
23. PARTIAL MATCH : [A*1]
24.
25. INPUT : ACME,8000,10.0,2
26. PARTIAL MATCH : [A*3]
27. PARTIAL MATCH : [A*2]
28. PARTIAL MATCH : [A*1]
29.
30. INPUT : ACME,9000,15.0,2
31. PARTIAL MATCH : [A*4]
32. PARTIAL MATCH : [A*3]
33. PARTIAL MATCH : [A*2]
34.
35. INPUT : ACME,10000,25.0,2
36. PARTIAL MATCH : [A*5]
37. PARTIAL MATCH : [A*4]
38.
39. INPUT : ACME,11000,30.0,1
40. PARTIAL MATCH : [A*6]
My observation is that, when "ACME,5000,25.0,2" comes in (line 15), we get
a completed match (line 16) but no partial match (which is [A*1] in my
notation) starting from it.
According to the definition of "AFTER MATCH SKIP TO FIRST B", since
"ACME,5000,25.0,2" is B, a new match should start from "ACME,5000,25.0,2".
However, in the above trace a new match starts from the next row (lines 18
and 19).
Therefore, when the last row "ACME,11000,30.0,1" comes in, the average at
that point is 14.3 (= (2+4+10+15+25+30)/6), which is less than 15,
so "ACME,11000,30.0,1" belongs to A, not B as shown in the example.
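The arithmetic above can be checked with a small sketch. It is not part of the attached project; it only recomputes the running average over the six prices from rowtime 6000 to 11000 as they appear in the trace, assuming every one of those rows is mapped to A.

```python
# Prices of the rows from rowtime 6000 to 11000, copied from the INPUT lines of the trace.
prices = [2.0, 4.0, 10.0, 15.0, 25.0, 30.0]

# Running average after each row, i.e. the average of all rows seen so far
# under the assumption that each of them is mapped to A.
running_avg = []
total = 0.0
for count, price in enumerate(prices, start=1):
    total += price
    running_avg.append(total / count)

for price, avg in zip(prices, running_avg):
    print(f"price={price:5.1f}  running avg={avg:6.3f}  below 15: {avg < 15}")
```

Every running average stays below 15, including the last one (86/6, about 14.3), which is why the row with price 30 still counts as A in this reading.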
Is it a bug, or did I miss something conceptually?
P.S. How do you load rows from a local CSV file with rowtime configured? I
don't like the way I implemented my custom table source in the attached
file, which I use for testing.
Best,
Dongwon
[1] https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/streaming/match_recognize.html
[2] https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/streaming/match_recognize.html#aggregations
Re: Wrong result of MATCH_RECOGNIZE clause
Posted by Dian Fu <di...@gmail.com>.
I have created ticket https://issues.apache.org/jira/browse/FLINK-13999 to track it.
> On Sep 6, 2019, at 8:58 PM, Dian Fu <di...@gmail.com> wrote:
>
> Hi Dongwon,
>
> I guess you are right and the example is wrong. The new matching sequence should start from line "18".
>
> Regards,
> Dian
>
>> On Sep 5, 2019, at 8:28 PM, Dongwon Kim <eastcirclek@gmail.com> wrote:
>>
>> Oops, I think I explained something wrong in the previous email.
>> B means not A.
>> Therefore, after the completed match, there must be no new partial match starting from there.
>> There's nothing wrong with the implementation, but the example in [2] is wrong.
>>
>> Am I right?
>>
>> Best,
>> Dongwon
Re: Wrong result of MATCH_RECOGNIZE clause
Posted by Dian Fu <di...@gmail.com>.
Hi Dongwon,
I guess you are right and the example is wrong. The new matching sequence should start from line "18".
Regards,
Dian
> On Sep 5, 2019, at 8:28 PM, Dongwon Kim <ea...@gmail.com> wrote:
>
> Oops, I think I explained something wrong in the previous email.
> B means not A.
> Therefore, after the completed match, there must be no new partial match starting from there.
> There's nothing wrong with the implementation, but the example in [2] is wrong.
>
> Am I right?
>
> Best,
> Dongwon
Re: Wrong result of MATCH_RECOGNIZE clause
Posted by Dongwon Kim <ea...@gmail.com>.
Oops, I think I explained something wrong in the previous email.
B means not A.
Therefore, after the completed match, there must be no new partial match
starting from there.
There's nothing wrong with the implementation, but the example in [2] is
wrong.
Am I right?
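A minimal sketch of this reasoning, under stated assumptions: the pattern is "A+ B" with A defined as "AVG(A.price) < 15" (as recalled from the example in [2], not quoted in this thread), A is greedy, and a row can only join A if the average including it stays below 15. The helper greedy_a_run is hypothetical and is not Flink code; it only mimics that one rule.

```python
from typing import List

# Prices from the INPUT lines of the trace, in rowtime order (1000 .. 11000).
PRICES = [12.0, 17.0, 13.0, 16.0, 25.0, 2.0, 4.0, 10.0, 15.0, 25.0, 30.0]
LIMIT = 15.0  # assumed threshold from "A AS AVG(A.price) < 15"


def greedy_a_run(prices: List[float], start: int, limit: float = LIMIT) -> List[float]:
    """Return the prices greedily mapped to A when matching begins at index `start`."""
    run: List[float] = []
    total = 0.0
    for price in prices[start:]:
        # A row joins A only if the average including it stays below the limit.
        if (total + price) / (len(run) + 1) >= limit:
            break
        total += price
        run.append(price)
    return run


first = greedy_a_run(PRICES, 0)   # attempt starting at rowtime 1000
at_b = greedy_a_run(PRICES, 4)    # attempt starting at the B row (rowtime 5000)
doc_avg = sum(PRICES[4:10]) / 6   # plain average of the rows 5000 .. 10000

print(first, sum(first) / len(first))
print(at_b)
print(doc_avg)
```

Under these assumptions the first run covers the four rows before 5000 with an average of 14.5, and no A run can begin at the row with price 25, since 25 alone is not below 15. The plain average of the six rows from 5000 to 10000 is 13.5, the same number as in the second row of the documented result, which suggests (but does not prove) that the example treated the B row as the start of a new A run.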
Best,
Dongwon