Posted to user@flink.apache.org by "s_penakalapati@yahoo.com" <s_...@yahoo.com> on 2020/09/07 15:25:41 UTC

Flink alert after database lookUp

Hi All,
I am new to Flink, request your help!!!
My scenario :
1> we receive JSON messages at a very high frequency, around 10,000 messages / second
2> we need to raise an Alert for a particular user if there is any breach of the threshold value for any attribute in the JSON
3> these threshold values are part of my database table and can be frequently updated by different users
4> in realtime, I would like to know how to get the latest data from the database.
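A minimal sketch of the breach check in 2>, independent of Flink; the method name and the attribute/threshold maps are invented for illustration, assuming the JSON is already parsed and the thresholds already loaded:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Sketch of the per-message threshold check (point 2>), outside of Flink.
// Assumes the JSON message has been parsed into attribute -> numeric value,
// and that the per-user thresholds are available as another map (in the real
// job they would come from the database).
public class ThresholdCheck {

    /** Returns the names of all attributes whose value exceeds its threshold. */
    public static List<String> breaches(Map<String, Double> attributes,
                                        Map<String, Double> thresholds) {
        List<String> breached = new ArrayList<>();
        for (Map.Entry<String, Double> e : attributes.entrySet()) {
            Double limit = thresholds.get(e.getKey());
            if (limit != null && e.getValue() > limit) {
                breached.add(e.getKey());
            }
        }
        return breached;
    }

    public static void main(String[] args) {
        Map<String, Double> attrs = Map.of("cpu", 95.0, "mem", 40.0);
        Map<String, Double> limits = Map.of("cpu", 90.0, "mem", 80.0);
        System.out.println(breaches(attrs, limits)); // only "cpu" breaches
    }
}
```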
I tried using the Flink CEP Pattern approach to generate alerts. I would like some input on how I can implement realtime lookup tables in Flink (Java) while monitoring for alerts; any sample code reference would help.
Also, for such scenarios, do you recommend the Flink CEP approach, or should we use the Process function approach?

Regards,
Sunitha.

Re: Flink alert after database lookUp

Posted by Arvid Heise <ar...@ververica.com>.
Hi Sunitha,

dependency looks good. I'd probably bump the version to 1.1.0, though
(the connector is versioned off-cycle from Flink as of now, to accelerate
releases of this young feature).
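Concretely, assuming only the version needs to change, the dependency entry would become:

```xml
<!-- same coordinates as quoted below, with the suggested version bump -->
<dependency>
  <groupId>com.alibaba.ververica</groupId>
  <artifactId>flink-connector-postgre-cdc</artifactId>
  <version>1.1.0</version>
</dependency>
```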

Best,

Arvid

On Tue, Sep 15, 2020 at 5:10 PM s_penakalapati@yahoo.com <
s_penakalapati@yahoo.com> wrote:

> Hi Arvid,
>
> Thank you!!!
>
> Will check change data capture approach. Please confirm including
> dependency and adding sourceFunction should help us to achieve CDC.
>
> <dependency>
>   <groupId>com.alibaba.ververica</groupId>
>   <artifactId>flink-connector-postgre-cdc</artifactId>
>   <version>1.0.0</version>
> </dependency>
>
>
> Regards,
> Sunitha
>
>


-- 

Arvid Heise | Senior Java Developer

<https://www.ververica.com/>

Follow us @VervericaData

--

Join Flink Forward <https://flink-forward.org/> - The Apache Flink
Conference

Stream Processing | Event Driven | Real Time

--

Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany

--
Ververica GmbH
Registered at Amtsgericht Charlottenburg: HRB 158244 B
Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji
(Toni) Cheng

Re: Flink alert after database lookUp

Posted by "s_penakalapati@yahoo.com" <s_...@yahoo.com>.
 Hi Arvid,
Thank you!!!
Will check the change data capture approach. Please confirm that including the dependency and adding a SourceFunction should help us achieve CDC.
<dependency>
  <groupId>com.alibaba.ververica</groupId>
  <artifactId>flink-connector-postgre-cdc</artifactId>
  <version>1.0.0</version>
</dependency>

Regards,
Sunitha


Re: Flink alert after database lookUp

Posted by Arvid Heise <ar...@ververica.com>.
Hi Sunitha,

to listen to changes in your database, a change-data-capture approach
should be taken [1]; this has been supported in Flink since 1.11.

Basically, a tool like Debezium [2] will monitor the changelog of the
database and publish the result as a change stream, which can be ingested
in Flink as another source. You can then use the change stream to build
dynamic look-up tables in Flink and enrich your data as desired.

Also have a look at this presentation for a better overview [3].

[1]
https://flink.apache.org/news/2020/07/06/release-1.11.0.html#table-apisql-support-for-change-data-capture-cdc
[2] https://debezium.io/
[3] https://noti.st/morsapaes/liQzgs/slides
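On the Flink side, whatever tool produces the change stream, the dynamic lookup table is just keyed state that the change events are applied to. A plain-Java sketch of that application step (the event shape here is invented for illustration; Debezium's real payload is richer):

```java
import java.util.HashMap;
import java.util.Map;

// Sketch: maintaining a lookup table from a stream of change events, which is
// what the CDC approach amounts to on the Flink side. The ChangeEvent shape
// is invented for illustration, not the Debezium wire format.
public class ChangeLog {

    enum Op { UPSERT, DELETE }

    record ChangeEvent(Op op, String key, Double threshold) {}

    private final Map<String, Double> lookup = new HashMap<>();

    /** Apply one change event to the in-memory lookup table. */
    public void apply(ChangeEvent e) {
        if (e.op() == Op.DELETE) {
            lookup.remove(e.key());
        } else {
            lookup.put(e.key(), e.threshold());
        }
    }

    public Double get(String key) {
        return lookup.get(key);
    }

    public static void main(String[] args) {
        ChangeLog table = new ChangeLog();
        table.apply(new ChangeEvent(Op.UPSERT, "userA.cpu", 90.0));
        table.apply(new ChangeEvent(Op.UPSERT, "userA.cpu", 85.0)); // update wins
        table.apply(new ChangeEvent(Op.DELETE, "userA.cpu", null));
        System.out.println(table.get("userA.cpu")); // null after the delete
    }
}
```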


Re: Flink alert after database lookUp

Posted by Timo Walther <tw...@apache.org>.
Flink's built-in JDBC connector will read the data only once. JDBC does
not provide means to continuously monitor a database table.

It depends on the size of your database: if your parameter table is small,
it might be sufficient to write a simple Flink connector that periodically
reads the table and ingests the data into the streaming pipeline. For larger
database/streaming integrations, it might be worth looking into Kafka's
Connect API, or into Debezium, where you hook into the database logs to
retrieve continuous data; but this might be overkill for your use case.
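The periodic-read idea can be sketched as a refresh-if-stale cache; in a real Flink job the loop would live in a SourceFunction's run() method (emitting each snapshot downstream), and the `loader` supplier below stands in for the actual JDBC query:

```java
import java.util.Map;
import java.util.function.Supplier;

// Sketch of the periodic-refresh idea: re-run the lookup query every
// refreshMillis and keep the latest snapshot. In Flink this loop would sit
// in a SourceFunction.run() (or the newer Source API); here it is reduced
// to a cache for illustration.
public class PollingCache {

    private final Supplier<Map<String, Double>> loader; // stands in for a JDBC query
    private final long refreshMillis;
    private Map<String, Double> snapshot = Map.of();
    private boolean loaded = false;
    private long lastLoad;

    public PollingCache(Supplier<Map<String, Double>> loader, long refreshMillis) {
        this.loader = loader;
        this.refreshMillis = refreshMillis;
    }

    /** Returns the current snapshot, reloading it if it is older than refreshMillis. */
    public Map<String, Double> current(long nowMillis) {
        if (!loaded || nowMillis - lastLoad >= refreshMillis) {
            snapshot = loader.get();
            lastLoad = nowMillis;
            loaded = true;
        }
        return snapshot;
    }

    public static void main(String[] args) {
        int[] calls = {0};
        PollingCache cache = new PollingCache(() -> {
            calls[0]++;
            return Map.of("cpu", 90.0);
        }, 1_000);
        cache.current(0);     // first call: loads
        cache.current(500);   // fresh: served from the snapshot
        cache.current(1_500); // stale: reloads
        System.out.println(calls[0]); // 2
    }
}
```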

The link that I've sent you to for streaming pattern slides should work 
after registration.

Regards,
Timo




Re: Flink alert after database lookUp

Posted by "s_penakalapati@yahoo.com" <s_...@yahoo.com>.
 
Hi Timo,
Thank you for the suggestions.
I see now that both the Process function and the CEP approach will not fit. If I follow the third approach and stream the values from the database: is it possible to stream the data continuously?
If I follow either of the below approaches, I see only a one-time load, not a continuous stream:
Using JDBCInputFormat, this will execute the query only once, so it will not be a data stream; when we try to iterate the source, it may iterate only over the data already fetched.
Using RichFlatMapFunction, if I connect to the DB in open(), this would be a one-time load. If I connect to the database in flatMap(), it would lead to multiple hits to the database.
Request your help on how to continuously stream the data, if possible with sample source code for streaming from a database. Please help, I am badly stuck.
In the mail, I see you asked me to register. Are you referring to some training here, or another registration?

Regards,
Sunitha.

Re: Flink alert after database lookUp

Posted by Timo Walther <tw...@apache.org>.
Hi Sunitha,

what you are describing is a typical streaming enrichment. We need to 
enrich the stream with some data from a database. There are different 
strategies to handle this:

1) You are querying the database for every record. This is usually not 
what you want because it would slow down your pipeline due to the 
communication latencies to your database. It would also put a lot of 
pressure on the database in general.

2) You only query the database from time to time and store the latest value 
in a ProcessFunction ValueState or MapState.

3) You stream in the values as well and use connect() [1].
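Option 3 in plain Java terms: in Flink this would be a CoProcessFunction (or a BroadcastProcessFunction) over connect(), with one callback updating state from the threshold stream and the other checking events against it. A minimal sketch of that state handling, with invented names and a (user, attribute, value) event shape:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Sketch of option 3: two callbacks sharing state, as a CoProcessFunction
// over connect() would. processControl() mirrors the side fed by the
// threshold stream; processElement() mirrors the main data stream.
public class ConnectedStreamsSketch {

    private final Map<String, Double> thresholds = new HashMap<>(); // keyed-state stand-in
    private final List<String> alerts = new ArrayList<>();

    /** Control side: a threshold update arrives and is stored in state. */
    public void processControl(String attribute, double threshold) {
        thresholds.put(attribute, threshold);
    }

    /** Data side: an event arrives and is checked against the latest state. */
    public void processElement(String user, String attribute, double value) {
        Double limit = thresholds.get(attribute);
        if (limit != null && value > limit) {
            alerts.add("ALERT " + user + " " + attribute + "=" + value + " > " + limit);
        }
    }

    public List<String> alerts() {
        return alerts;
    }

    public static void main(String[] args) {
        ConnectedStreamsSketch fn = new ConnectedStreamsSketch();
        fn.processControl("cpu", 90.0);
        fn.processElement("userA", "cpu", 95.0); // breaches -> alert
        fn.processControl("cpu", 99.0);          // threshold raised at runtime
        fn.processElement("userA", "cpu", 95.0); // no longer breaches
        System.out.println(fn.alerts().size()); // 1
    }
}
```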

In any case, I think CEP might not be useful for this case. If you 
really want to do option 1, it might make sense to also check out the SQL 
API of Flink, because it offers different kinds of joins with very good 
abstraction. `Join with a Temporal Table` [2] offers a JDBC connector for 
lookups in your database.

If you'd like to use the DataStream API, I would also recommend the pattern 
slides here [3] (unfortunately you have to register first).

Regards,
Timo

[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.11/learn-flink/etl.html#connected-streams
[2] 
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/streaming/joins.html
[3] https://training.ververica.com/decks/patterns/

