Posted to user@flink.apache.org by Ashwin Sinha <as...@go-mmt.com> on 2018/07/06 11:45:51 UTC

Need help with event-time aggregations in Flink-1.5 sql-client queries

Hi,

We are trying to aggregate data using the Flink 1.5 SQL client and are
facing some issues with *event-time* based aggregations.

*JSON data in Kafka:*
{"name": {"lastname": "096LMO", "firstname": "5NPLSR"}, "event_time": "1530854716000", "number": 851}
{"name": {"lastname": "EDLVBG", "firstname": "YRMR1L"}, "event_time": "1530854720000", "number": 853}
{"name": {"lastname": "1Y3JWW", "firstname": "DQ2OY3"}, "event_time": "1530854722000", "number": 854}
{"name": {"lastname": "LD2LA5", "firstname": "24FYOZ"}, "event_time": "1530854724000", "number": 855}
{"name": {"lastname": "UQBFIA", "firstname": "BPOZUW"}, "event_time": "1530854726000", "number": 856}
{"name": {"lastname": "X79NHY", "firstname": "HWLDDH"}, "event_time": "1530854728000", "number": 857}
{"name": {"lastname": "PPSTQ7", "firstname": "1DSQZ0"}, "event_time": "1530854730000", "number": 858}
{"name": {"lastname": "8EGA1G", "firstname": "X5UB73"}, "event_time": "1530854732000", "number": 859}
{"name": {"lastname": "4QMUG0", "firstname": "ZJKQLH"}, "event_time": "1530854736000", "number": 861}
{"name": {"lastname": "H4W4DW", "firstname": "ZLDWI9"}, "event_time": "1530854742000", "number": 864}
{"name": {"lastname": "1U9OQN", "firstname": "QYA32C"}, "event_time": "1530854746000", "number": 866}
{"name": {"lastname": "0WRROS", "firstname": "TXDLXK"}, "event_time": "1530854748000", "number": 867}
{"name": {"lastname": "4FWB78", "firstname": "02GXX5"}, "event_time": "1530854750000", "number": 868}
{"name": {"lastname": "R2YUDY", "firstname": "6ME9YZ"}, "event_time": "1530854752000", "number": 869}
{"name": {"lastname": "G7UD1M", "firstname": "62WJEE"}, "event_time": "1530854754000", "number": 870}
{"name": {"lastname": "GHH1BI", "firstname": "93CAZE"}, "event_time": "1530854758000", "number": 872}
{"name": {"lastname": "IR1WOO", "firstname": "YYF7F8"}, "event_time": "1530854760000", "number": 873}
{"name": {"lastname": "ODES6S", "firstname": "ELH4VH"}, "event_time": "1530854764000", "number": 875}
{"name": {"lastname": "658D5P", "firstname": "F1HRY5"}, "event_time": "1530854770000", "number": 878}
{"name": {"lastname": "1ZZ340", "firstname": "GTP0PW"}, "event_time": "1530854772000", "number": 879}
{"name": {"lastname": "6J2B39", "firstname": "DDZTC5"}, "event_time": "1530854780000", "number": 883}

*SQL client config YAML:* https://pastebin.com/NW9zkStk


*SQL Query 1:* select * from TimestampTable

*Result 1:* [image: Screen Shot 2018-07-06 at 4.44.17 PM.png]

*SQL Query 2:*
select HOP_START(rowTime, INTERVAL '1' MINUTE, INTERVAL '1' MINUTE),
       HOP_END(rowTime, INTERVAL '1' MINUTE, INTERVAL '1' MINUTE),
       count(*)
from TimestampTable
where number > 10
group by HOP(rowTime, INTERVAL '1' MINUTE, INTERVAL '1' MINUTE)

*Result 2:* The shell remains empty, even after a long time:
[image: Screen Shot 2018-07-06 at 4.44.33 PM.png]


Problem: We are not able to aggregate on the basis of event time, but when
we aggregate on processing time instead, we do get results. Can someone
help here?
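
As a cross-check on what Query 2 would be expected to return, the window
counts can be worked out by hand from the sample records. The sketch below
is plain Python, not Flink code. It assumes that rowTime is derived from the
event_time field (the actual mapping lives in the pastebin config and is not
shown here) and that one-minute windows are aligned to the epoch. The names
`window_start` and `count_per_window` are made up for illustration.

```python
from collections import Counter

# event_time values (epoch milliseconds) copied from the sample Kafka records
EVENT_TIMES_MS = [
    1530854716000, 1530854720000, 1530854722000, 1530854724000,
    1530854726000, 1530854728000, 1530854730000, 1530854732000,
    1530854736000, 1530854742000, 1530854746000, 1530854748000,
    1530854750000, 1530854752000, 1530854754000, 1530854758000,
    1530854760000, 1530854764000, 1530854770000, 1530854772000,
    1530854780000,
]

WINDOW_MS = 60_000  # INTERVAL '1' MINUTE


def window_start(ts_ms, size_ms=WINDOW_MS):
    """Start of the epoch-aligned window [start, start + size) containing ts_ms."""
    return ts_ms - ts_ms % size_ms


def count_per_window(timestamps, size_ms=WINDOW_MS):
    """Return {(window_start, window_end): count}, like count(*) grouped by window."""
    counts = Counter(window_start(ts, size_ms) for ts in timestamps)
    return {(start, start + size_ms): n for start, n in sorted(counts.items())}


if __name__ == "__main__":
    for (start, end), n in count_per_window(EVENT_TIMES_MS).items():
        print(start, end, n)
```

Under these assumptions the 21 sample rows (all of which satisfy
number > 10) fall into two windows: 16 rows in
[1530854700000, 1530854760000) and 5 rows in
[1530854760000, 1530854820000).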

We would also like to know whether there are any plans to support sinks,
such as Kafka, for these query results.

-- 
*Ashwin Sinha *| Data Engineer
ashwin.sinha@go-mmt.com <sh...@go-mmt.com> | 9452075361
<https://www.makemytrip.com/> <https://www.goibibo.com/>
<https://www.redbus.in/>

-- 


::DISCLAIMER::


----------------------------------------------------------------------------------------------------------------------------------------------------





This message is intended only for the use of the addressee and may 
contain information that is privileged, confidential and exempt from 
disclosure under applicable law. If the reader of this message is not the 
intended recipient, or the employee or agent responsible for delivering the 
message to the intended recipient, you are hereby notified that any 
dissemination, distribution or copying of this communication is strictly 
prohibited. If you have received this e-mail in error, please notify us 
immediately by return e-mail and delete this e-mail and all attachments 
from your system.

Re: Need help with event-time aggregations in Flink-1.5 sql-client queries

Posted by Ashwin Sinha <as...@go-mmt.com>.
Ok, thanks Timo!

On Fri 6 Jul, 2018, 17:34 Timo Walther, <tw...@apache.org> wrote:

> Hi Ashwin,
>
> the Kafka connector does not support emitting watermarks. You have to use
> a different watermark strategy than `from-source`.
>
> Regards,
> Timo

Re: Need help with event-time aggregations in Flink-1.5 sql-client queries

Posted by Timo Walther <tw...@apache.org>.
Hi Ashwin,

The Kafka connector does not support emitting watermarks. You have to
use a different watermark strategy than `from-source`.
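
To illustrate why the choice of watermark strategy decides whether Query 2
prints anything, here is a minimal simulation in plain Python. It is a
simplified model, not Flink code: the function name `fired_windows` and the
parameter `watermark_delay_ms` are hypothetical, the watermark is recomputed
on every row rather than periodically, and late rows are ignored. A window
emits its count only once the watermark reaches the window's last
millisecond, so a source that never produces watermarks never emits a
result.

```python
WINDOW_MS = 60_000  # INTERVAL '1' MINUTE


def fired_windows(timestamps, watermark_delay_ms=None, size_ms=WINDOW_MS):
    """Simulate which event-time windows emit a result.

    watermark_delay_ms=None models a source that never emits watermarks;
    otherwise the watermark trails the largest timestamp seen by the delay.
    Returns a list of (window_start, window_end, count) in firing order.
    """
    open_windows = {}  # window_start -> number of rows buffered so far
    fired = []
    max_ts = None
    for ts in timestamps:
        start = ts - ts % size_ms
        open_windows[start] = open_windows.get(start, 0) + 1
        if watermark_delay_ms is None:
            continue  # no watermark, so event time never advances
        max_ts = ts if max_ts is None else max(max_ts, ts)
        watermark = max_ts - watermark_delay_ms
        # A window [start, start + size) fires once the watermark
        # reaches its last millisecond (start + size - 1).
        for w_start in sorted(open_windows):
            if watermark >= w_start + size_ms - 1:
                count = open_windows.pop(w_start)
                fired.append((w_start, w_start + size_ms, count))
    return fired


if __name__ == "__main__":
    # A few of the event_time values from the sample records
    sample = [1530854716000, 1530854758000, 1530854760000,
              1530854770000, 1530854780000]
    print(fired_windows(sample))                            # no watermarks
    print(fired_windows(sample, watermark_delay_ms=10_000))  # bounded delay
```

Without watermarks the function returns an empty list, which matches the
empty shell in Result 2. With a bounded delay, the first one-minute window
fires as soon as a row arrives whose timestamp is far enough past the window
end; the last window stays open until later rows push the watermark past it.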

Regards,
Timo

