Posted to user@flink.apache.org by Ashwin Sinha <as...@go-mmt.com> on 2018/07/06 11:45:51 UTC
Need help with event-time aggregations in Flink-1.5 sql-client queries
Hi,
We are trying to aggregate data using the Flink 1.5 SQL client and are
facing some issues with *event-time* based aggregations.
*JSON data in Kafka:*
{"name": {"lastname": "096LMO", "firstname": "5NPLSR"}, "event_time":
"1530854716000", "number": 851}
{"name": {"lastname": "EDLVBG", "firstname": "YRMR1L"}, "event_time":
"1530854720000", "number": 853}
{"name": {"lastname": "1Y3JWW", "firstname": "DQ2OY3"}, "event_time":
"1530854722000", "number": 854}
{"name": {"lastname": "LD2LA5", "firstname": "24FYOZ"}, "event_time":
"1530854724000", "number": 855}
{"name": {"lastname": "UQBFIA", "firstname": "BPOZUW"}, "event_time":
"1530854726000", "number": 856}
{"name": {"lastname": "X79NHY", "firstname": "HWLDDH"}, "event_time":
"1530854728000", "number": 857}
{"name": {"lastname": "PPSTQ7", "firstname": "1DSQZ0"}, "event_time":
"1530854730000", "number": 858}
{"name": {"lastname": "8EGA1G", "firstname": "X5UB73"}, "event_time":
"1530854732000", "number": 859}
{"name": {"lastname": "4QMUG0", "firstname": "ZJKQLH"}, "event_time":
"1530854736000", "number": 861}
{"name": {"lastname": "H4W4DW", "firstname": "ZLDWI9"}, "event_time":
"1530854742000", "number": 864}
{"name": {"lastname": "1U9OQN", "firstname": "QYA32C"}, "event_time":
"1530854746000", "number": 866}
{"name": {"lastname": "0WRROS", "firstname": "TXDLXK"}, "event_time":
"1530854748000", "number": 867}
{"name": {"lastname": "4FWB78", "firstname": "02GXX5"}, "event_time":
"1530854750000", "number": 868}
{"name": {"lastname": "R2YUDY", "firstname": "6ME9YZ"}, "event_time":
"1530854752000", "number": 869}
{"name": {"lastname": "G7UD1M", "firstname": "62WJEE"}, "event_time":
"1530854754000", "number": 870}
{"name": {"lastname": "GHH1BI", "firstname": "93CAZE"}, "event_time":
"1530854758000", "number": 872}
{"name": {"lastname": "IR1WOO", "firstname": "YYF7F8"}, "event_time":
"1530854760000", "number": 873}
{"name": {"lastname": "ODES6S", "firstname": "ELH4VH"}, "event_time":
"1530854764000", "number": 875}
{"name": {"lastname": "658D5P", "firstname": "F1HRY5"}, "event_time":
"1530854770000", "number": 878}
{"name": {"lastname": "1ZZ340", "firstname": "GTP0PW"}, "event_time":
"1530854772000", "number": 879}
{"name": {"lastname": "6J2B39", "firstname": "DDZTC5"}, "event_time":
"1530854780000", "number": 883}
*SQL client config yml:* https://pastebin.com/NW9zkStk
*SQL Query 1:* select * from TimestampTable
*Result 1:* [image: Screen Shot 2018-07-06 at 4.44.17 PM.png]
*SQL Query 2:*
SELECT
  HOP_START(rowTime, INTERVAL '1' MINUTE, INTERVAL '1' MINUTE),
  HOP_END(rowTime, INTERVAL '1' MINUTE, INTERVAL '1' MINUTE),
  COUNT(*)
FROM TimestampTable
WHERE number > 10
GROUP BY HOP(rowTime, INTERVAL '1' MINUTE, INTERVAL '1' MINUTE)
*Result 2:* The shell remains empty even after a long time:
[image: Screen Shot 2018-07-06 at 4.44.33 PM.png]
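For reference, since the window size equals the slide in Query 2, the HOP window degenerates into a tumbling window and each event belongs to exactly one 1-minute window. A small Python sketch (not Flink code; the helper name is ours) of how the sample event_time values above would be bucketed:

```python
# Sketch: bucket epoch-millisecond timestamps into 1-minute windows.
# With size == slide, HOP assigns each event to exactly one window.
from collections import Counter

MINUTE_MS = 60_000

def window_start(ts_ms, slide_ms=MINUTE_MS):
    """Start (epoch millis) of the window containing ts_ms."""
    return ts_ms - (ts_ms % slide_ms)

# A few event_time values taken from the sample data above.
events = [1530854716000, 1530854720000, 1530854780000]
counts = Counter(window_start(ts) for ts in events)
for start in sorted(counts):
    print(start, start + MINUTE_MS, counts[start])
```

Note that an event-time window only fires once the watermark passes the window end, which is why no results appear if watermarks are never emitted.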
Problem: We are not able to aggregate on the basis of event time, but when
we aggregate on processing time we do get results. Can someone help here?
We would also like to know whether there are any plans for sinks, such as
Kafka, for these query results.
--
*Ashwin Sinha *| Data Engineer
ashwin.sinha@go-mmt.com <sh...@go-mmt.com> | 9452075361
<https://www.makemytrip.com/> <https://www.goibibo.com/>
<https://www.redbus.in/>
--
::DISCLAIMER::
----------------------------------------------------------------------------------------------------------------------------------------------------
This message is intended only for the use of the addressee and may
contain information that is privileged, confidential and exempt from
disclosure under applicable law. If the reader of this message is not the
intended recipient, or the employee or agent responsible for delivering the
message to the intended recipient, you are hereby notified that any
dissemination, distribution or copying of this communication is strictly
prohibited. If you have received this e-mail in error, please notify us
immediately by return e-mail and delete this e-mail and all attachments
from your system.
Re: Need help with event-time aggregations in Flink-1.5 sql-client queries
Posted by Ashwin Sinha <as...@go-mmt.com>.
Ok, thanks Timo!
On Fri 6 Jul, 2018, 17:34 Timo Walther, <tw...@apache.org> wrote:
> Hi Ashwin,
>
> the Kafka connector does not support emitting watermarks. You have to use
> a different watermark strategy than `from-source`.
>
> Regards,
> Timo
Re: Need help with event-time aggregations in Flink-1.5 sql-client queries
Posted by Timo Walther <tw...@apache.org>.
Hi Ashwin,
the Kafka connector does not support emitting watermarks. You have to
use a different watermark strategy than `from-source`.
Regards,
Timo
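Following up on Timo's hint, a sketch of how the rowtime section of the SQL client environment file might look with a periodic bounded watermark strategy instead of `from-source` (field names follow the Flink 1.5 SQL client docs; the 2000 ms delay is an arbitrary choice for illustration):

```yaml
# Sketch: derive the rowtime attribute from the event_time field and
# generate watermarks with a bounded out-of-orderness of 2 seconds,
# replacing "watermarks: type: from-source".
schema:
  - name: rowTime
    type: TIMESTAMP
    rowtime:
      timestamps:
        type: from-field
        from: event_time
      watermarks:
        type: periodic-bounded
        delay: 2000
```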