You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@flink.apache.org by Krzysztof Zarzycki <k....@gmail.com> on 2020/03/23 18:01:53 UTC

Dynamic Flink SQL

Dear Flink community!

In our company we have implemented a system that realize the dynamic
business rules pattern. We spoke about it during Flink Forward 2019
https://www.youtube.com/watch?v=CyrQ5B0exqU.
The system is a great success and we would like to improve it. Let me
shortly mention what the system does:
* We have a Flink job with the engine that applies business rules on
multiple data streams. These rules find patterns in data, produce complex
events on these patterns.
* The engine is built on top of CoProcessFunction, the rules are
preimplemented using state and timers.
* The engine accepts control messages, that deliver configuration of the
rules, and start the instances of the rules. There might be many rule
instances with different configurations running in parallel.
* Data streams are routed to those rules, to all instances.

The *advantages* of this design are:
  * *The performance is superb. *The key to it is that we read data from
the Kafka topic once, deserialize once, shuffle it once (thankfully we have
one partitioning key) and then apply over 100 rule instances needing the
same data.
* We are able to deploy multiple rule instances dynamically without
starting/stopping the job.

Especially the performance is crucial, we have up to 500K events/s
processed by 100 of rules on less than 100 of cores. I can't imagine having
100 of Flink SQL queries each consuming these streams from Kafka on such a
cluster.

The main *painpoints *of the design is:
* to deploy new business rule kind, we need to predevelop the rule template
with use of our SDK. *We can't use* *great Flink CEP*, *Flink SQL
libraries.* Which are getting stronger every day. Flink SQL with
MATCH_RECOGNIZE would fit perfectly for our cases.
* The isolation of the rules is weak. There are many rules running per job.
One fails, the whole job fails.
* There is one set of Kafka offsets, one watermark, one checkpoint for all
the rules.
* We have one just distribution key. Although that can be overcome.

I would like to focus on solving the *first point*. We can live with the
rest.

*Question to the community*: Do you have ideas how to make it possible to
develop with use of Flink SQL with MATCH_RECOGNIZE?

My current ideas are:
1. *A possibility to dynamically modify the job topology. *
Then I imagine dynamically attaching Flink SQL jobs to the same Kafka
sources.

2. *A possibility to save data streams internally to Flink, predistributed*.
Then Flink SQL queries should be able to read these streams.

The ideal imaginary solution would look that simple in use:
CREATE TABLE my_stream(...) with (<kafka properties>,
cached = 'true')
PARTITIONED BY my_partition_key

(the cached table can also be a result of CREATE TABLE and INSERT INTO
my_stream_cached SELECT ... FROM my_stream).

then I can run multiple parallel Flink SQL queries reading from that cached
table in Flink.
These

Technical implementation: Ideally, I imagine saving events in Flink state
before they are consumed. Then implement a Flink source, that can read the
Flink state of the state-filling job. It's a different job, I know! Of
course it needs to run on the same Flink cluster.
A lot of options are possible: building on top of Flink, modifying Flink
(even keeping own fork for the time being), using an external component.

In my opinion the key to the maximized performance are:
* avoid pulling data through network from Kafka
* avoid deserialization of messages for each of queries/ processors.

Comments, ideas - Any feedback is welcome!
Thank you!
Krzysztof

P.S.   I'm writing to both dev and users groups because I suspect I would
need to modify Flink to achieve what I wrote above.

Re: Dynamic Flink SQL

Posted by Krzysztof Zarzycki <k....@gmail.com>.
Hi Maciej, thanks for joining. I answer your comments below.

>
> the idea is quite interesting - although maintaining some coordination to
> be able to handle checkpoints would probably pretty tricky. Did you figure
> out how to handle proper distribution of tasks between TMs? As far as I
> understand you have to guarantee that all sources reading from cache are on
> the same TM as sinks writing data from Kafka? Or you think about some
> distributed caches?
>
No, we haven't yet figured that out. Yes, I've heard that it is indeed a
problem to force Flink to distribute the tasks as one wants it. I only
hoped that we will be able to solve it when we get there :-)
One of the ideas was to actually use in-memory grid co-located with Flink
(like based on Apache Ignite), but then the problem of network shuffle just
moved from Kafka to that grid. Which might be smaller problem, but still.

> As for your original question - we are also looking for solutions/ideas
> for this problem in Nussknacker. We have similar problem, however we had
> different constraints (on premise, not have to care too much about
> bandwidth) and we went with "one job per scenario". It works ok, but the
> biggest problem for me is that it does not scale with the number of jobs:
> Flink job is quite heavy entity - all the threads, classloaders etc. Having
> more than a few dozens of jobs is also not so easy to handle on JobManager
> part - especially when it's restarted etc. I guess your idea would also
> suffer from this problem?
>
Unfortunately yes, good point. Maybe it can be mitigated if I had the jobs
distributed among several Flink clusters. Solution globally heavier, but
lighter on each cluster. Then the data must go to the distributed in-memory
grid with hopefully local reads only.

I see a lot of difficulties in the discussed approach. But my willingness
to use SQL/Table API and CEP is so strong, I want to do the PoC regardless.
I hope we will be able to provide benchmarks which prove that the
performance of such approach is significantly better justifying the work by
us, maybe also the community, on overcoming these difficulties.

>
> thanks,
>
> maciek
>
>
>
> On 27/03/2020 10:18, Krzysztof Zarzycki wrote:
>
> I want to do a bit different hacky PoC:
> * I will write a sink, that caches the results in "JVM global" memory.
> Then I will write a source, that reads this cache.
> * I will launch one job, that reads from Kafka source, shuffles the data
> to the desired partitioning and then sinks to that cache.
> * Then I will lunch multiple jobs (Datastream based or Flink SQL based) ,
> that uses the source from cache to read the data out and then reinterprets
> it as keyed stream [1].
> * Using JVM global memory is necessary, because AFAIK the jobs use
> different classloaders. The class of cached object also needs to be
> available in the parent classloader i.e. in the cluster's classpath.
> This is just to prove the idea, the performance and usefulness of it. All
> the problems of checkpointing this data I will leave for later.
>
> I'm very very interested in your, community, comments about this idea and
> later productization of it.
> Thanks!
>
> Answering your comments:
>
>> Unless you need reprocessing for newly added rules, I'd probably just
>> cancel with savepoint and restart the application with the new rules. Of
>> course, it depends on the rules themselves and how much state they require
>> if a restart is viable. That's up to a POC.
>>
> No, I don't need reprocessing (yet). The rule starts processing the data
> from the moment it is defined.
> The cancellation with savepoint was considered, but because the number of
> new rules defined/changed daily might be large enough, that will generate
> too much of downtime. There is a lot of state kept in those rules making
> the restart heavy. What's worse, that would be cross-tenant downtime,
> unless the job was somehow per team/tenant. Therefore we reject this option.
> BTW, the current design of our system is similar to the one from the blog
> series by Alexander Fedulov about dynamic rules pattern [2] he's just
> publishing.
>
>
>> They will consume the same high intensive source(s) therefore I want to
>>> optimize for that by consuming the messages in Flink only once.
>>>
>> That's why I proposed to run one big query instead of 500 small ones.
>> Have a POC where you add two of your rules manually to a Table and see how
>> the optimized logical plan looks like. I'd bet that the source is only
>> tapped once.
>>
>
> I can do that PoC, no problem. But AFAIK it will only work with the
> "restart with savepoint" pattern discussed above.
>
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/experimental.html#reinterpreting-a-pre-partitioned-data-stream-as-keyed-stream
> [2] https://flink.apache.org/news/2020/03/24/demo-fraud-detection-2.html
>
>
>
>> On Wed, Mar 25, 2020 at 6:15 PM Krzysztof Zarzycki <k....@gmail.com>
>> wrote:
>>
>>> Hello Arvid,
>>> Thanks for joining to the thread!
>>> First, did you take into consideration that I would like to dynamically
>>> add queries on the same source? That means first define one query, later
>>> the day add another one , then another one, and so on. A Week later kill
>>> one of those, start yet another one, etc... There will be hundreds of these
>>> queries running at once, but the set of queries change several times a day.
>>> They will consume the same high intensive source(s) therefore I want to
>>> optimize for that by consuming the messages in Flink only once.
>>>
>>> Regarding the temporary tables AFAIK they are only the metadata (let's
>>> say Kafka topic detals) and store it in the scope of a SQL session.
>>> Therefore multiple queries against that temp table will behave the same way
>>> as querying normal table, that is will read the datasource multiple times.
>>>
>>> It looks like the feature I want or could use is defined by the way of
>>> FLIP-36 about Interactive Programming, more precisely caching the stream
>>> table [1].
>>> While I wouldn't like to limit the discussion to that non-existing yet
>>> feature. Maybe there are other ways of achieving this danymic querying
>>> capability.
>>>
>>> Kind Regards,
>>> Krzysztof
>>>
>>> [1]
>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-36%3A+Support+Interactive+Programming+in+Flink#FLIP-36:SupportInteractiveProgramminginFlink-Cacheastreamtable
>>>
>>>
>>>
>>> * You want to use primary Table API as that allows you to
>>>> programmatically introduce structural variance (changing rules).
>>>>
>>> * You start by registering the source as temporary table.
>>>>
>>> * Then you add your rules as SQL through `TableEnvironment#sqlQuery`.
>>>> * Lastly you unionAll the results.
>>>>
>>>> Then I'd perform some experiment if indeed the optimizer figured out
>>>> that it needs to only read the source once. The resulting code would be
>>>> minimal and easy to maintain. If the performance is not satisfying, you can
>>>> always make it more complicated.
>>>>
>>>> Best,
>>>>
>>>> Arvid
>>>>
>>>>
>>>> On Mon, Mar 23, 2020 at 7:02 PM Krzysztof Zarzycki <
>>>> k.zarzycki@gmail.com> wrote:
>>>>
>>>>> Dear Flink community!
>>>>>
>>>>> In our company we have implemented a system that realize the dynamic
>>>>> business rules pattern. We spoke about it during Flink Forward 2019
>>>>> https://www.youtube.com/watch?v=CyrQ5B0exqU.
>>>>> The system is a great success and we would like to improve it. Let me
>>>>> shortly mention what the system does:
>>>>> * We have a Flink job with the engine that applies business rules on
>>>>> multiple data streams. These rules find patterns in data, produce complex
>>>>> events on these patterns.
>>>>> * The engine is built on top of CoProcessFunction, the rules are
>>>>> preimplemented using state and timers.
>>>>> * The engine accepts control messages, that deliver configuration of
>>>>> the rules, and start the instances of the rules. There might be many rule
>>>>> instances with different configurations running in parallel.
>>>>> * Data streams are routed to those rules, to all instances.
>>>>>
>>>>> The *advantages* of this design are:
>>>>>   * *The performance is superb. *The key to it is that we read data
>>>>> from the Kafka topic once, deserialize once, shuffle it once (thankfully we
>>>>> have one partitioning key) and then apply over 100 rule instances needing
>>>>> the same data.
>>>>> * We are able to deploy multiple rule instances dynamically without
>>>>> starting/stopping the job.
>>>>>
>>>>> Especially the performance is crucial, we have up to 500K events/s
>>>>> processed by 100 of rules on less than 100 of cores. I can't imagine having
>>>>> 100 of Flink SQL queries each consuming these streams from Kafka on such a
>>>>> cluster.
>>>>>
>>>>> The main *painpoints *of the design is:
>>>>> * to deploy new business rule kind, we need to predevelop the rule
>>>>> template with use of our SDK. *We can't use* *great Flink CEP*, *Flink
>>>>> SQL libraries.* Which are getting stronger every day. Flink SQL with
>>>>> MATCH_RECOGNIZE would fit perfectly for our cases.
>>>>> * The isolation of the rules is weak. There are many rules running per
>>>>> job. One fails, the whole job fails.
>>>>> * There is one set of Kafka offsets, one watermark, one checkpoint for
>>>>> all the rules.
>>>>> * We have one just distribution key. Although that can be overcome.
>>>>>
>>>>> I would like to focus on solving the *first point*. We can live with
>>>>> the rest.
>>>>>
>>>>> *Question to the community*: Do you have ideas how to make it
>>>>> possible to develop with use of Flink SQL with MATCH_RECOGNIZE?
>>>>>
>>>>> My current ideas are:
>>>>> 1. *A possibility to dynamically modify the job topology. *
>>>>> Then I imagine dynamically attaching Flink SQL jobs to the same Kafka
>>>>> sources.
>>>>>
>>>>> 2. *A possibility to save data streams internally to Flink,
>>>>> predistributed*. Then Flink SQL queries should be able to read these
>>>>> streams.
>>>>>
>>>>> The ideal imaginary solution would look that simple in use:
>>>>> CREATE TABLE my_stream(...) with (<kafka properties>,
>>>>> cached = 'true')
>>>>> PARTITIONED BY my_partition_key
>>>>>
>>>>> (the cached table can also be a result of CREATE TABLE and INSERT INTO
>>>>> my_stream_cached SELECT ... FROM my_stream).
>>>>>
>>>>> then I can run multiple parallel Flink SQL queries reading from that
>>>>> cached table in Flink.
>>>>> These
>>>>>
>>>>> Technical implementation: Ideally, I imagine saving events in Flink
>>>>> state before they are consumed. Then implement a Flink source, that can
>>>>> read the Flink state of the state-filling job. It's a different job, I
>>>>> know! Of course it needs to run on the same Flink cluster.
>>>>> A lot of options are possible: building on top of Flink, modifying
>>>>> Flink (even keeping own fork for the time being), using an external
>>>>> component.
>>>>>
>>>>> In my opinion the key to the maximized performance are:
>>>>> * avoid pulling data through network from Kafka
>>>>> * avoid deserialization of messages for each of queries/ processors.
>>>>>
>>>>> Comments, ideas - Any feedback is welcome!
>>>>> Thank you!
>>>>> Krzysztof
>>>>>
>>>>> P.S.   I'm writing to both dev and users groups because I suspect I
>>>>> would need to modify Flink to achieve what I wrote above.
>>>>>
>>>>

Re: Dynamic Flink SQL

Posted by Maciek Próchniak <mp...@touk.pl>.
Hi Krzysiek,

the idea is quite interesting - although maintaining some coordination 
to be able to handle checkpoints would probably pretty tricky. Did you 
figure out how to handle proper distribution of tasks between TMs? As 
far as I understand you have to guarantee that all sources reading from 
cache are on the same TM as sinks writing data from Kafka? Or you think 
about some distributed caches?

As for your original question - we are also looking for solutions/ideas 
for this problem in Nussknacker. We have similar problem, however we had 
different constraints (on premise, not have to care too much about 
bandwidth) and we went with "one job per scenario". It works ok, but the 
biggest problem for me is that it does not scale with the number of 
jobs:  Flink job is quite heavy entity - all the threads, classloaders 
etc. Having more than a few dozens of jobs is also not so easy to handle 
on JobManager part - especially when it's restarted etc. I guess your 
idea would also suffer from this problem?


thanks,

maciek



On 27/03/2020 10:18, Krzysztof Zarzycki wrote:
> I want to do a bit different hacky PoC:
> * I will write a sink, that caches the results in "JVM global" memory. 
> Then I will write a source, that reads this cache.
> * I will launch one job, that reads from Kafka source, shuffles the 
> data to the desired partitioning and then sinks to that cache.
> * Then I will lunch multiple jobs (Datastream based or Flink SQL 
> based) , that uses the source from cache to read the data out and then 
> reinterprets it as keyed stream [1].
> * Using JVM global memory is necessary, because AFAIK the jobs use 
> different classloaders. The class of cached object also needs to be 
> available in the parent classloader i.e. in the cluster's classpath.
> This is just to prove the idea, the performance and usefulness of it. 
> All the problems of checkpointing this data I will leave for later.
>
> I'm very very interested in your, community, comments about this idea 
> and later productization of it.
> Thanks!
>
> Answering your comments:
>
>     Unless you need reprocessing for newly added rules, I'd probably
>     just cancel with savepoint and restart the application with the
>     new rules. Of course, it depends on the rules themselves and how
>     much state they require if a restart is viable. That's up to a POC.
>
> No, I don't need reprocessing (yet). The rule starts processing the 
> data from the moment it is defined.
> The cancellation with savepoint was considered, but because the number 
> of new rules defined/changed daily might be large enough, that will 
> generate too much of downtime. There is a lot of state kept in those 
> rules making the restart heavy. What's worse, that would be 
> cross-tenant downtime, unless the job was somehow per team/tenant. 
> Therefore we reject this option.
> BTW, the current design of our system is similar to the one from the 
> blog series by Alexander Fedulov about dynamic rules pattern [2] he's 
> just publishing.
>
>
>         They will consume the same high intensive source(s) therefore
>         I want to optimize for that by consuming the messages in Flink
>         only once.
>
>     That's why I proposed to run one big query instead of 500 small
>     ones. Have a POC where you add two of your rules manually to a
>     Table and see how the optimized logical plan looks like. I'd bet
>     that the source is only tapped once.
>
>
> I can do that PoC, no problem. But AFAIK it will only work with the 
> "restart with savepoint" pattern discussed above.
>
>
> [1] 
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/experimental.html#reinterpreting-a-pre-partitioned-data-stream-as-keyed-stream
> [2] https://flink.apache.org/news/2020/03/24/demo-fraud-detection-2.html
>
>
>
>     On Wed, Mar 25, 2020 at 6:15 PM Krzysztof Zarzycki
>     <k.zarzycki@gmail.com <ma...@gmail.com>> wrote:
>
>         Hello Arvid,
>         Thanks for joining to the thread!
>         First, did you take into consideration that I would like to
>         dynamically add queries on the same source? That means first
>         define one query, later the day add another one , then another
>         one, and so on. A Week later kill one of those, start yet
>         another one, etc... There will be hundreds of these queries
>         running at once, but the set of queries change several times a
>         day.
>         They will consume the same high intensive source(s) therefore
>         I want to optimize for that by consuming the messages in Flink
>         only once.
>
>         Regarding the temporary tables AFAIK they are only the
>         metadata (let's say Kafka topic detals) and store it in the
>         scope of a SQL session. Therefore multiple queries against
>         that temp table will behave the same way as querying normal
>         table, that is will read the datasource multiple times.
>
>         It looks like the feature I want or could use is defined by
>         the way of FLIP-36 about Interactive Programming, more
>         precisely caching the stream table [1].
>         While I wouldn't like to limit the discussion to that
>         non-existing yet feature. Maybe there are other ways of
>         achieving this danymic querying capability.
>
>         Kind Regards,
>         Krzysztof
>
>         [1]
>         https://cwiki.apache.org/confluence/display/FLINK/FLIP-36%3A+Support+Interactive+Programming+in+Flink#FLIP-36:SupportInteractiveProgramminginFlink-Cacheastreamtable
>
>
>
>             * You want to use primary Table API as that allows you to
>             programmatically introduce structural variance (changing
>             rules).
>
>             * You start by registering the source as temporary table.
>
>             * Then you add your rules as SQL through
>             `TableEnvironment#sqlQuery`.
>             * Lastly you unionAll the results.
>
>             Then I'd perform some experiment if indeed the optimizer
>             figured out that it needs to only read the source once.
>             The resulting code would be minimal and easy to maintain.
>             If the performance is not satisfying, you can always make
>             it more complicated.
>
>             Best,
>
>             Arvid
>
>
>             On Mon, Mar 23, 2020 at 7:02 PM Krzysztof Zarzycki
>             <k.zarzycki@gmail.com <ma...@gmail.com>> wrote:
>
>                 Dear Flink community!
>
>                 In our company we have implemented a system that
>                 realize the dynamic business rules pattern. We spoke
>                 about it during Flink Forward 2019
>                 https://www.youtube.com/watch?v=CyrQ5B0exqU.
>                 The system is a great success and we would like to
>                 improve it. Let me shortly mention what the system does:
>                 * We have a Flink job with the engine that applies
>                 business rules on multiple data streams. These rules
>                 find patterns in data, produce complex events on these
>                 patterns.
>                 * The engine is built on top of CoProcessFunction, the
>                 rules are preimplemented using state and timers.
>                 * The engine accepts control messages, that deliver
>                 configuration of the rules, and start the instances of
>                 the rules. There might be many rule instances with
>                 different configurations running in parallel.
>                 * Data streams are routed to those rules, to all
>                 instances.
>
>                 The *advantages* of this design are:
>                   * *The performance is superb. *The key to it is
>                 that we read data from the Kafka topic once,
>                 deserialize once, shuffle it once (thankfully we have
>                 one partitioning key) and then apply over 100 rule
>                 instances needing the same data.
>                 * We are able to deploy multiple rule instances
>                 dynamically without starting/stopping the job.
>
>                 Especially the performance is crucial, we have up to
>                 500K events/s processed by 100 of rules on less than
>                 100 of cores. I can't imagine having 100 of Flink SQL
>                 queries each consuming these streams from Kafka on
>                 such a cluster.
>
>                 The main *painpoints *of the design is:
>                 * to deploy new business rule kind, we need to
>                 predevelop the rule template with use of our SDK. *We
>                 can't use* *great Flink CEP*, *Flink SQL
>                 libraries.* Which are getting stronger every day.
>                 Flink SQL with MATCH_RECOGNIZE would fit perfectly for
>                 our cases.
>                 * The isolation of the rules is weak. There are many
>                 rules running per job. One fails, the whole job fails.
>                 * There is one set of Kafka offsets, one watermark,
>                 one checkpoint for all the rules.
>                 * We have one just distribution key. Although that can
>                 be overcome.
>
>                 I would like to focus on solving the *first point*. We
>                 can live with the rest.
>
>                 *Question to the community*: Do you have ideas how to
>                 make it possible to develop with use of Flink SQL with
>                 MATCH_RECOGNIZE?
>
>                 My current ideas are:
>                 1. *A possibility to dynamically modify the job
>                 topology. *
>                 Then I imagine dynamically attaching Flink SQL jobs to
>                 the same Kafka sources.
>
>                 2. *A possibility to save data streams internally to
>                 Flink, predistributed*. Then Flink SQL queries should
>                 be able to read these streams.
>
>                 The ideal imaginary solution would look that simple in
>                 use:
>                 CREATE TABLE my_stream(...) with (<kafka properties>,
>                 cached = 'true')
>                 PARTITIONED BY my_partition_key
>
>                 (the cached table can also be a result of CREATE TABLE
>                 and INSERT INTO my_stream_cached SELECT ... FROM
>                 my_stream).
>
>                 then I can run multiple parallel Flink SQL queries
>                 reading from that cached table in Flink.
>                 These
>
>                 Technical implementation: Ideally, I imagine saving
>                 events in Flink state before they are consumed. Then
>                 implement a Flink source, that can read the Flink
>                 state of the state-filling job. It's a different job,
>                 I know! Of course it needs to run on the same Flink
>                 cluster.
>                 A lot of options are possible: building on top of
>                 Flink, modifying Flink (even keeping own fork for the
>                 time being), using an external component.
>
>                 In my opinion the key to the maximized performance are:
>                 * avoid pulling data through network from Kafka
>                 * avoid deserialization of messages for each of
>                 queries/ processors.
>
>                 Comments, ideas - Any feedback is welcome!
>                 Thank you!
>                 Krzysztof
>
>                 P.S.   I'm writing to both dev and users groups
>                 because I suspect I would need to modify Flink to
>                 achieve what I wrote above.
>

Re: Dynamic Flink SQL

Posted by Krzysztof Zarzycki <k....@gmail.com>.
I want to do a bit different hacky PoC:
* I will write a sink, that caches the results in "JVM global" memory. Then
I will write a source, that reads this cache.
* I will launch one job, that reads from Kafka source, shuffles the data to
the desired partitioning and then sinks to that cache.
* Then I will lunch multiple jobs (Datastream based or Flink SQL based) ,
that uses the source from cache to read the data out and then reinterprets
it as keyed stream [1].
* Using JVM global memory is necessary, because AFAIK the jobs use
different classloaders. The class of cached object also needs to be
available in the parent classloader i.e. in the cluster's classpath.
This is just to prove the idea, the performance and usefulness of it. All
the problems of checkpointing this data I will leave for later.

I'm very very interested in your, community, comments about this idea and
later productization of it.
Thanks!

Answering your comments:

> Unless you need reprocessing for newly added rules, I'd probably just
> cancel with savepoint and restart the application with the new rules. Of
> course, it depends on the rules themselves and how much state they require
> if a restart is viable. That's up to a POC.
>
No, I don't need reprocessing (yet). The rule starts processing the data
from the moment it is defined.
The cancellation with savepoint was considered, but because the number of
new rules defined/changed daily might be large enough, that will generate
too much of downtime. There is a lot of state kept in those rules making
the restart heavy. What's worse, that would be cross-tenant downtime,
unless the job was somehow per team/tenant. Therefore we reject this option.
BTW, the current design of our system is similar to the one from the blog
series by Alexander Fedulov about dynamic rules pattern [2] he's just
publishing.


> They will consume the same high intensive source(s) therefore I want to
>> optimize for that by consuming the messages in Flink only once.
>>
> That's why I proposed to run one big query instead of 500 small ones. Have
> a POC where you add two of your rules manually to a Table and see how the
> optimized logical plan looks like. I'd bet that the source is only tapped
> once.
>

I can do that PoC, no problem. But AFAIK it will only work with the
"restart with savepoint" pattern discussed above.


[1]
https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/experimental.html#reinterpreting-a-pre-partitioned-data-stream-as-keyed-stream
[2] https://flink.apache.org/news/2020/03/24/demo-fraud-detection-2.html



> On Wed, Mar 25, 2020 at 6:15 PM Krzysztof Zarzycki <k....@gmail.com>
> wrote:
>
>> Hello Arvid,
>> Thanks for joining to the thread!
>> First, did you take into consideration that I would like to dynamically
>> add queries on the same source? That means first define one query, later
>> the day add another one , then another one, and so on. A Week later kill
>> one of those, start yet another one, etc... There will be hundreds of these
>> queries running at once, but the set of queries change several times a day.
>> They will consume the same high intensive source(s) therefore I want to
>> optimize for that by consuming the messages in Flink only once.
>>
>> Regarding the temporary tables AFAIK they are only the metadata (let's
>> say Kafka topic detals) and store it in the scope of a SQL session.
>> Therefore multiple queries against that temp table will behave the same way
>> as querying normal table, that is will read the datasource multiple times.
>>
>> It looks like the feature I want or could use is defined by the way of
>> FLIP-36 about Interactive Programming, more precisely caching the stream
>> table [1].
>> While I wouldn't like to limit the discussion to that non-existing yet
>> feature. Maybe there are other ways of achieving this danymic querying
>> capability.
>>
>> Kind Regards,
>> Krzysztof
>>
>> [1]
>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-36%3A+Support+Interactive+Programming+in+Flink#FLIP-36:SupportInteractiveProgramminginFlink-Cacheastreamtable
>>
>>
>>
>> * You want to use primary Table API as that allows you to
>>> programmatically introduce structural variance (changing rules).
>>>
>> * You start by registering the source as temporary table.
>>>
>> * Then you add your rules as SQL through `TableEnvironment#sqlQuery`.
>>> * Lastly you unionAll the results.
>>>
>>> Then I'd perform some experiment if indeed the optimizer figured out
>>> that it needs to only read the source once. The resulting code would be
>>> minimal and easy to maintain. If the performance is not satisfying, you can
>>> always make it more complicated.
>>>
>>> Best,
>>>
>>> Arvid
>>>
>>>
>>> On Mon, Mar 23, 2020 at 7:02 PM Krzysztof Zarzycki <k....@gmail.com>
>>> wrote:
>>>
>>>> Dear Flink community!
>>>>
>>>> In our company we have implemented a system that realize the dynamic
>>>> business rules pattern. We spoke about it during Flink Forward 2019
>>>> https://www.youtube.com/watch?v=CyrQ5B0exqU.
>>>> The system is a great success and we would like to improve it. Let me
>>>> shortly mention what the system does:
>>>> * We have a Flink job with the engine that applies business rules on
>>>> multiple data streams. These rules find patterns in data, produce complex
>>>> events on these patterns.
>>>> * The engine is built on top of CoProcessFunction, the rules are
>>>> preimplemented using state and timers.
>>>> * The engine accepts control messages, that deliver configuration of
>>>> the rules, and start the instances of the rules. There might be many rule
>>>> instances with different configurations running in parallel.
>>>> * Data streams are routed to those rules, to all instances.
>>>>
>>>> The *advantages* of this design are:
>>>>   * *The performance is superb. *The key to it is that we read data
>>>> from the Kafka topic once, deserialize once, shuffle it once (thankfully we
>>>> have one partitioning key) and then apply over 100 rule instances needing
>>>> the same data.
>>>> * We are able to deploy multiple rule instances dynamically without
>>>> starting/stopping the job.
>>>>
>>>> Especially the performance is crucial, we have up to 500K events/s
>>>> processed by 100 of rules on less than 100 of cores. I can't imagine having
>>>> 100 of Flink SQL queries each consuming these streams from Kafka on such a
>>>> cluster.
>>>>
>>>> The main *painpoints *of the design is:
>>>> * to deploy new business rule kind, we need to predevelop the rule
>>>> template with use of our SDK. *We can't use* *great Flink CEP*, *Flink
>>>> SQL libraries.* Which are getting stronger every day. Flink SQL with
>>>> MATCH_RECOGNIZE would fit perfectly for our cases.
>>>> * The isolation of the rules is weak. There are many rules running per
>>>> job. One fails, the whole job fails.
>>>> * There is one set of Kafka offsets, one watermark, one checkpoint for
>>>> all the rules.
>>>> * We have one just distribution key. Although that can be overcome.
>>>>
>>>> I would like to focus on solving the *first point*. We can live with
>>>> the rest.
>>>>
>>>> *Question to the community*: Do you have ideas how to make it possible
>>>> to develop with use of Flink SQL with MATCH_RECOGNIZE?
>>>>
>>>> My current ideas are:
>>>> 1. *A possibility to dynamically modify the job topology. *
>>>> Then I imagine dynamically attaching Flink SQL jobs to the same Kafka
>>>> sources.
>>>>
>>>> 2. *A possibility to save data streams internally to Flink,
>>>> predistributed*. Then Flink SQL queries should be able to read these
>>>> streams.
>>>>
>>>> The ideal imaginary solution would look that simple in use:
>>>> CREATE TABLE my_stream(...) with (<kafka properties>,
>>>> cached = 'true')
>>>> PARTITIONED BY my_partition_key
>>>>
>>>> (the cached table can also be a result of CREATE TABLE and INSERT INTO
>>>> my_stream_cached SELECT ... FROM my_stream).
>>>>
>>>> then I can run multiple parallel Flink SQL queries reading from that
>>>> cached table in Flink.
>>>> These
>>>>
>>>> Technical implementation: Ideally, I imagine saving events in Flink
>>>> state before they are consumed. Then implement a Flink source, that can
>>>> read the Flink state of the state-filling job. It's a different job, I
>>>> know! Of course it needs to run on the same Flink cluster.
>>>> A lot of options are possible: building on top of Flink, modifying
>>>> Flink (even keeping own fork for the time being), using an external
>>>> component.
>>>>
>>>> In my opinion the key to the maximized performance are:
>>>> * avoid pulling data through network from Kafka
>>>> * avoid deserialization of messages for each of queries/ processors.
>>>>
>>>> Comments, ideas - Any feedback is welcome!
>>>> Thank you!
>>>> Krzysztof
>>>>
>>>> P.S.   I'm writing to both dev and users groups because I suspect I
>>>> would need to modify Flink to achieve what I wrote above.
>>>>
>>>

Re: Dynamic Flink SQL

Posted by Krzysztof Zarzycki <k....@gmail.com>.
I want to do a bit different hacky PoC:
* I will write a sink, that caches the results in "JVM global" memory. Then
I will write a source, that reads this cache.
* I will launch one job, that reads from Kafka source, shuffles the data to
the desired partitioning and then sinks to that cache.
* Then I will lunch multiple jobs (Datastream based or Flink SQL based) ,
that uses the source from cache to read the data out and then reinterprets
it as keyed stream [1].
* Using JVM global memory is necessary, because AFAIK the jobs use
different classloaders. The class of cached object also needs to be
available in the parent classloader i.e. in the cluster's classpath.
This is just to prove the idea, the performance and usefulness of it. All
the problems of checkpointing this data I will leave for later.

I'm very very interested in your, community, comments about this idea and
later productization of it.
Thanks!

Answering your comments:

> Unless you need reprocessing for newly added rules, I'd probably just
> cancel with savepoint and restart the application with the new rules. Of
> course, it depends on the rules themselves and how much state they require
> if a restart is viable. That's up to a POC.
>
No, I don't need reprocessing (yet). The rule starts processing the data
from the moment it is defined.
The cancellation with savepoint was considered, but because the number of
new rules defined/changed daily might be large enough, that will generate
too much of downtime. There is a lot of state kept in those rules making
the restart heavy. What's worse, that would be cross-tenant downtime,
unless the job was somehow per team/tenant. Therefore we reject this option.
BTW, the current design of our system is similar to the one from the blog
series by Alexander Fedulov about dynamic rules pattern [2] he's just
publishing.


> They will consume the same high intensive source(s) therefore I want to
>> optimize for that by consuming the messages in Flink only once.
>>
> That's why I proposed to run one big query instead of 500 small ones. Have
> a POC where you add two of your rules manually to a Table and see how the
> optimized logical plan looks like. I'd bet that the source is only tapped
> once.
>

I can do that PoC, no problem. But AFAIK it will only work with the
"restart with savepoint" pattern discussed above.


[1]
https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/experimental.html#reinterpreting-a-pre-partitioned-data-stream-as-keyed-stream
[2] https://flink.apache.org/news/2020/03/24/demo-fraud-detection-2.html



> On Wed, Mar 25, 2020 at 6:15 PM Krzysztof Zarzycki <k....@gmail.com>
> wrote:
>
>> Hello Arvid,
>> Thanks for joining to the thread!
>> First, did you take into consideration that I would like to dynamically
>> add queries on the same source? That means first define one query, later
>> the day add another one , then another one, and so on. A Week later kill
>> one of those, start yet another one, etc... There will be hundreds of these
>> queries running at once, but the set of queries change several times a day.
>> They will consume the same high intensive source(s) therefore I want to
>> optimize for that by consuming the messages in Flink only once.
>>
>> Regarding the temporary tables AFAIK they are only the metadata (let's
>> say Kafka topic detals) and store it in the scope of a SQL session.
>> Therefore multiple queries against that temp table will behave the same way
>> as querying normal table, that is will read the datasource multiple times.
>>
>> It looks like the feature I want or could use is defined by the way of
>> FLIP-36 about Interactive Programming, more precisely caching the stream
>> table [1].
>> While I wouldn't like to limit the discussion to that non-existing yet
>> feature. Maybe there are other ways of achieving this danymic querying
>> capability.
>>
>> Kind Regards,
>> Krzysztof
>>
>> [1]
>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-36%3A+Support+Interactive+Programming+in+Flink#FLIP-36:SupportInteractiveProgramminginFlink-Cacheastreamtable
>>
>>
>>
>> * You want to use primary Table API as that allows you to
>>> programmatically introduce structural variance (changing rules).
>>>
>> * You start by registering the source as temporary table.
>>>
>> * Then you add your rules as SQL through `TableEnvironment#sqlQuery`.
>>> * Lastly you unionAll the results.
>>>
>>> Then I'd perform some experiment if indeed the optimizer figured out
>>> that it needs to only read the source once. The resulting code would be
>>> minimal and easy to maintain. If the performance is not satisfying, you can
>>> always make it more complicated.
>>>
>>> Best,
>>>
>>> Arvid
>>>
>>>
>>> On Mon, Mar 23, 2020 at 7:02 PM Krzysztof Zarzycki <k....@gmail.com>
>>> wrote:
>>>
>>>> Dear Flink community!
>>>>
>>>> In our company we have implemented a system that realize the dynamic
>>>> business rules pattern. We spoke about it during Flink Forward 2019
>>>> https://www.youtube.com/watch?v=CyrQ5B0exqU.
>>>> The system is a great success and we would like to improve it. Let me
>>>> shortly mention what the system does:
>>>> * We have a Flink job with the engine that applies business rules on
>>>> multiple data streams. These rules find patterns in data, produce complex
>>>> events on these patterns.
>>>> * The engine is built on top of CoProcessFunction, the rules are
>>>> preimplemented using state and timers.
>>>> * The engine accepts control messages, that deliver configuration of
>>>> the rules, and start the instances of the rules. There might be many rule
>>>> instances with different configurations running in parallel.
>>>> * Data streams are routed to those rules, to all instances.
>>>>
>>>> The *advantages* of this design are:
>>>>   * *The performance is superb. *The key to it is that we read data
>>>> from the Kafka topic once, deserialize once, shuffle it once (thankfully we
>>>> have one partitioning key) and then apply over 100 rule instances needing
>>>> the same data.
>>>> * We are able to deploy multiple rule instances dynamically without
>>>> starting/stopping the job.
>>>>
>>>> Especially the performance is crucial, we have up to 500K events/s
>>>> processed by 100 of rules on less than 100 of cores. I can't imagine having
>>>> 100 of Flink SQL queries each consuming these streams from Kafka on such a
>>>> cluster.
>>>>
>>>> The main *painpoints *of the design is:
>>>> * to deploy new business rule kind, we need to predevelop the rule
>>>> template with use of our SDK. *We can't use* *great Flink CEP*, *Flink
>>>> SQL libraries.* Which are getting stronger every day. Flink SQL with
>>>> MATCH_RECOGNIZE would fit perfectly for our cases.
>>>> * The isolation of the rules is weak. There are many rules running per
>>>> job. One fails, the whole job fails.
>>>> * There is one set of Kafka offsets, one watermark, one checkpoint for
>>>> all the rules.
>>>> * We have one just distribution key. Although that can be overcome.
>>>>
>>>> I would like to focus on solving the *first point*. We can live with
>>>> the rest.
>>>>
>>>> *Question to the community*: Do you have ideas how to make it possible
>>>> to develop with use of Flink SQL with MATCH_RECOGNIZE?
>>>>
>>>> My current ideas are:
>>>> 1. *A possibility to dynamically modify the job topology. *
>>>> Then I imagine dynamically attaching Flink SQL jobs to the same Kafka
>>>> sources.
>>>>
>>>> 2. *A possibility to save data streams internally to Flink,
>>>> predistributed*. Then Flink SQL queries should be able to read these
>>>> streams.
>>>>
>>>> The ideal imaginary solution would look that simple in use:
>>>> CREATE TABLE my_stream(...) with (<kafka properties>,
>>>> cached = 'true')
>>>> PARTITIONED BY my_partition_key
>>>>
>>>> (the cached table can also be a result of CREATE TABLE and INSERT INTO
>>>> my_stream_cached SELECT ... FROM my_stream).
>>>>
>>>> then I can run multiple parallel Flink SQL queries reading from that
>>>> cached table in Flink.
>>>> These
>>>>
>>>> Technical implementation: Ideally, I imagine saving events in Flink
>>>> state before they are consumed. Then implement a Flink source, that can
>>>> read the Flink state of the state-filling job. It's a different job, I
>>>> know! Of course it needs to run on the same Flink cluster.
>>>> A lot of options are possible: building on top of Flink, modifying
>>>> Flink (even keeping own fork for the time being), using an external
>>>> component.
>>>>
>>>> In my opinion the key to the maximized performance are:
>>>> * avoid pulling data through network from Kafka
>>>> * avoid deserialization of messages for each of queries/ processors.
>>>>
>>>> Comments, ideas - Any feedback is welcome!
>>>> Thank you!
>>>> Krzysztof
>>>>
>>>> P.S.   I'm writing to both dev and users groups because I suspect I
>>>> would need to modify Flink to achieve what I wrote above.
>>>>
>>>

Re: Dynamic Flink SQL

Posted by Arvid Heise <ar...@ververica.com>.
I saw that requirement but I'm not sure if you really need to modify the
query at runtime.

Unless you need reprocessing for newly added rules, I'd probably just
cancel with savepoint and restart the application with the new rules. Of
course, it depends on the rules themselves and how much state they require
if a restart is viable. That's up to a POC.

They will consume the same high intensive source(s) therefore I want to
> optimize for that by consuming the messages in Flink only once.
>
That's why I proposed to run one big query instead of 500 small ones. Have
a POC where you add two of your rules manually to a Table and see how the
optimized logical plan looks like. I'd bet that the source is only tapped
once.

On Wed, Mar 25, 2020 at 6:15 PM Krzysztof Zarzycki <k....@gmail.com>
wrote:

> Hello Arvid,
> Thanks for joining to the thread!
> First, did you take into consideration that I would like to dynamically
> add queries on the same source? That means first define one query, later
> the day add another one , then another one, and so on. A Week later kill
> one of those, start yet another one, etc... There will be hundreds of these
> queries running at once, but the set of queries change several times a day.
> They will consume the same high intensive source(s) therefore I want to
> optimize for that by consuming the messages in Flink only once.
>
> Regarding the temporary tables AFAIK they are only the metadata (let's say
> Kafka topic detals) and store it in the scope of a SQL session. Therefore
> multiple queries against that temp table will behave the same way as
> querying normal table, that is will read the datasource multiple times.
>
> It looks like the feature I want or could use is defined by the way of
> FLIP-36 about Interactive Programming, more precisely caching the stream
> table [1].
> While I wouldn't like to limit the discussion to that non-existing yet
> feature. Maybe there are other ways of achieving this danymic querying
> capability.
>
> Kind Regards,
> Krzysztof
>
> [1]
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-36%3A+Support+Interactive+Programming+in+Flink#FLIP-36:SupportInteractiveProgramminginFlink-Cacheastreamtable
>
>
>
> * You want to use primary Table API as that allows you to programmatically
>> introduce structural variance (changing rules).
>>
> * You start by registering the source as temporary table.
>>
> * Then you add your rules as SQL through `TableEnvironment#sqlQuery`.
>> * Lastly you unionAll the results.
>>
>> Then I'd perform some experiment if indeed the optimizer figured out that
>> it needs to only read the source once. The resulting code would be minimal
>> and easy to maintain. If the performance is not satisfying, you can always
>> make it more complicated.
>>
>> Best,
>>
>> Arvid
>>
>>
>> On Mon, Mar 23, 2020 at 7:02 PM Krzysztof Zarzycki <k....@gmail.com>
>> wrote:
>>
>>> Dear Flink community!
>>>
>>> In our company we have implemented a system that realize the dynamic
>>> business rules pattern. We spoke about it during Flink Forward 2019
>>> https://www.youtube.com/watch?v=CyrQ5B0exqU.
>>> The system is a great success and we would like to improve it. Let me
>>> shortly mention what the system does:
>>> * We have a Flink job with the engine that applies business rules on
>>> multiple data streams. These rules find patterns in data, produce complex
>>> events on these patterns.
>>> * The engine is built on top of CoProcessFunction, the rules are
>>> preimplemented using state and timers.
>>> * The engine accepts control messages, that deliver configuration of the
>>> rules, and start the instances of the rules. There might be many rule
>>> instances with different configurations running in parallel.
>>> * Data streams are routed to those rules, to all instances.
>>>
>>> The *advantages* of this design are:
>>>   * *The performance is superb. *The key to it is that we read data
>>> from the Kafka topic once, deserialize once, shuffle it once (thankfully we
>>> have one partitioning key) and then apply over 100 rule instances needing
>>> the same data.
>>> * We are able to deploy multiple rule instances dynamically without
>>> starting/stopping the job.
>>>
>>> Especially the performance is crucial, we have up to 500K events/s
>>> processed by 100 of rules on less than 100 of cores. I can't imagine having
>>> 100 of Flink SQL queries each consuming these streams from Kafka on such a
>>> cluster.
>>>
>>> The main *painpoints *of the design is:
>>> * to deploy new business rule kind, we need to predevelop the rule
>>> template with use of our SDK. *We can't use* *great Flink CEP*, *Flink
>>> SQL libraries.* Which are getting stronger every day. Flink SQL with
>>> MATCH_RECOGNIZE would fit perfectly for our cases.
>>> * The isolation of the rules is weak. There are many rules running per
>>> job. One fails, the whole job fails.
>>> * There is one set of Kafka offsets, one watermark, one checkpoint for
>>> all the rules.
>>> * We have one just distribution key. Although that can be overcome.
>>>
>>> I would like to focus on solving the *first point*. We can live with
>>> the rest.
>>>
>>> *Question to the community*: Do you have ideas how to make it possible
>>> to develop with use of Flink SQL with MATCH_RECOGNIZE?
>>>
>>> My current ideas are:
>>> 1. *A possibility to dynamically modify the job topology. *
>>> Then I imagine dynamically attaching Flink SQL jobs to the same Kafka
>>> sources.
>>>
>>> 2. *A possibility to save data streams internally to Flink,
>>> predistributed*. Then Flink SQL queries should be able to read these
>>> streams.
>>>
>>> The ideal imaginary solution would look that simple in use:
>>> CREATE TABLE my_stream(...) with (<kafka properties>,
>>> cached = 'true')
>>> PARTITIONED BY my_partition_key
>>>
>>> (the cached table can also be a result of CREATE TABLE and INSERT INTO
>>> my_stream_cached SELECT ... FROM my_stream).
>>>
>>> then I can run multiple parallel Flink SQL queries reading from that
>>> cached table in Flink.
>>> These
>>>
>>> Technical implementation: Ideally, I imagine saving events in Flink
>>> state before they are consumed. Then implement a Flink source, that can
>>> read the Flink state of the state-filling job. It's a different job, I
>>> know! Of course it needs to run on the same Flink cluster.
>>> A lot of options are possible: building on top of Flink, modifying Flink
>>> (even keeping own fork for the time being), using an external component.
>>>
>>> In my opinion the key to the maximized performance are:
>>> * avoid pulling data through network from Kafka
>>> * avoid deserialization of messages for each of queries/ processors.
>>>
>>> Comments, ideas - Any feedback is welcome!
>>> Thank you!
>>> Krzysztof
>>>
>>> P.S.   I'm writing to both dev and users groups because I suspect I
>>> would need to modify Flink to achieve what I wrote above.
>>>
>>

Re: Dynamic Flink SQL

Posted by Arvid Heise <ar...@ververica.com>.
I saw that requirement but I'm not sure if you really need to modify the
query at runtime.

Unless you need reprocessing for newly added rules, I'd probably just
cancel with savepoint and restart the application with the new rules. Of
course, it depends on the rules themselves and how much state they require
if a restart is viable. That's up to a POC.

They will consume the same high intensive source(s) therefore I want to
> optimize for that by consuming the messages in Flink only once.
>
That's why I proposed to run one big query instead of 500 small ones. Have
a POC where you add two of your rules manually to a Table and see how the
optimized logical plan looks like. I'd bet that the source is only tapped
once.

On Wed, Mar 25, 2020 at 6:15 PM Krzysztof Zarzycki <k....@gmail.com>
wrote:

> Hello Arvid,
> Thanks for joining to the thread!
> First, did you take into consideration that I would like to dynamically
> add queries on the same source? That means first define one query, later
> the day add another one , then another one, and so on. A Week later kill
> one of those, start yet another one, etc... There will be hundreds of these
> queries running at once, but the set of queries change several times a day.
> They will consume the same high intensive source(s) therefore I want to
> optimize for that by consuming the messages in Flink only once.
>
> Regarding the temporary tables AFAIK they are only the metadata (let's say
> Kafka topic detals) and store it in the scope of a SQL session. Therefore
> multiple queries against that temp table will behave the same way as
> querying normal table, that is will read the datasource multiple times.
>
> It looks like the feature I want or could use is defined by the way of
> FLIP-36 about Interactive Programming, more precisely caching the stream
> table [1].
> While I wouldn't like to limit the discussion to that non-existing yet
> feature. Maybe there are other ways of achieving this danymic querying
> capability.
>
> Kind Regards,
> Krzysztof
>
> [1]
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-36%3A+Support+Interactive+Programming+in+Flink#FLIP-36:SupportInteractiveProgramminginFlink-Cacheastreamtable
>
>
>
> * You want to use primary Table API as that allows you to programmatically
>> introduce structural variance (changing rules).
>>
> * You start by registering the source as temporary table.
>>
> * Then you add your rules as SQL through `TableEnvironment#sqlQuery`.
>> * Lastly you unionAll the results.
>>
>> Then I'd perform some experiment if indeed the optimizer figured out that
>> it needs to only read the source once. The resulting code would be minimal
>> and easy to maintain. If the performance is not satisfying, you can always
>> make it more complicated.
>>
>> Best,
>>
>> Arvid
>>
>>
>> On Mon, Mar 23, 2020 at 7:02 PM Krzysztof Zarzycki <k....@gmail.com>
>> wrote:
>>
>>> Dear Flink community!
>>>
>>> In our company we have implemented a system that realize the dynamic
>>> business rules pattern. We spoke about it during Flink Forward 2019
>>> https://www.youtube.com/watch?v=CyrQ5B0exqU.
>>> The system is a great success and we would like to improve it. Let me
>>> shortly mention what the system does:
>>> * We have a Flink job with the engine that applies business rules on
>>> multiple data streams. These rules find patterns in data, produce complex
>>> events on these patterns.
>>> * The engine is built on top of CoProcessFunction, the rules are
>>> preimplemented using state and timers.
>>> * The engine accepts control messages, that deliver configuration of the
>>> rules, and start the instances of the rules. There might be many rule
>>> instances with different configurations running in parallel.
>>> * Data streams are routed to those rules, to all instances.
>>>
>>> The *advantages* of this design are:
>>>   * *The performance is superb. *The key to it is that we read data
>>> from the Kafka topic once, deserialize once, shuffle it once (thankfully we
>>> have one partitioning key) and then apply over 100 rule instances needing
>>> the same data.
>>> * We are able to deploy multiple rule instances dynamically without
>>> starting/stopping the job.
>>>
>>> Especially the performance is crucial, we have up to 500K events/s
>>> processed by 100 of rules on less than 100 of cores. I can't imagine having
>>> 100 of Flink SQL queries each consuming these streams from Kafka on such a
>>> cluster.
>>>
>>> The main *painpoints *of the design is:
>>> * to deploy new business rule kind, we need to predevelop the rule
>>> template with use of our SDK. *We can't use* *great Flink CEP*, *Flink
>>> SQL libraries.* Which are getting stronger every day. Flink SQL with
>>> MATCH_RECOGNIZE would fit perfectly for our cases.
>>> * The isolation of the rules is weak. There are many rules running per
>>> job. One fails, the whole job fails.
>>> * There is one set of Kafka offsets, one watermark, one checkpoint for
>>> all the rules.
>>> * We have one just distribution key. Although that can be overcome.
>>>
>>> I would like to focus on solving the *first point*. We can live with
>>> the rest.
>>>
>>> *Question to the community*: Do you have ideas how to make it possible
>>> to develop with use of Flink SQL with MATCH_RECOGNIZE?
>>>
>>> My current ideas are:
>>> 1. *A possibility to dynamically modify the job topology. *
>>> Then I imagine dynamically attaching Flink SQL jobs to the same Kafka
>>> sources.
>>>
>>> 2. *A possibility to save data streams internally to Flink,
>>> predistributed*. Then Flink SQL queries should be able to read these
>>> streams.
>>>
>>> The ideal imaginary solution would look that simple in use:
>>> CREATE TABLE my_stream(...) with (<kafka properties>,
>>> cached = 'true')
>>> PARTITIONED BY my_partition_key
>>>
>>> (the cached table can also be a result of CREATE TABLE and INSERT INTO
>>> my_stream_cached SELECT ... FROM my_stream).
>>>
>>> then I can run multiple parallel Flink SQL queries reading from that
>>> cached table in Flink.
>>> These
>>>
>>> Technical implementation: Ideally, I imagine saving events in Flink
>>> state before they are consumed. Then implement a Flink source, that can
>>> read the Flink state of the state-filling job. It's a different job, I
>>> know! Of course it needs to run on the same Flink cluster.
>>> A lot of options are possible: building on top of Flink, modifying Flink
>>> (even keeping own fork for the time being), using an external component.
>>>
>>> In my opinion the key to the maximized performance are:
>>> * avoid pulling data through network from Kafka
>>> * avoid deserialization of messages for each of queries/ processors.
>>>
>>> Comments, ideas - Any feedback is welcome!
>>> Thank you!
>>> Krzysztof
>>>
>>> P.S.   I'm writing to both dev and users groups because I suspect I
>>> would need to modify Flink to achieve what I wrote above.
>>>
>>

Re: Dynamic Flink SQL

Posted by Krzysztof Zarzycki <k....@gmail.com>.
Hello Arvid,
Thanks for joining to the thread!
First, did you take into consideration that I would like to dynamically add
queries on the same source? That means first define one query, later the
day add another one , then another one, and so on. A Week later kill one of
those, start yet another one, etc... There will be hundreds of these
queries running at once, but the set of queries change several times a day.
They will consume the same high intensive source(s) therefore I want to
optimize for that by consuming the messages in Flink only once.

Regarding the temporary tables AFAIK they are only the metadata (let's say
Kafka topic detals) and store it in the scope of a SQL session. Therefore
multiple queries against that temp table will behave the same way as
querying normal table, that is will read the datasource multiple times.

It looks like the feature I want or could use is defined by the way of
FLIP-36 about Interactive Programming, more precisely caching the stream
table [1].
While I wouldn't like to limit the discussion to that non-existing yet
feature. Maybe there are other ways of achieving this danymic querying
capability.

Kind Regards,
Krzysztof

[1]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-36%3A+Support+Interactive+Programming+in+Flink#FLIP-36:SupportInteractiveProgramminginFlink-Cacheastreamtable



* You want to use primary Table API as that allows you to programmatically
> introduce structural variance (changing rules).
>
* You start by registering the source as temporary table.
>
* Then you add your rules as SQL through `TableEnvironment#sqlQuery`.
> * Lastly you unionAll the results.
>
> Then I'd perform some experiment if indeed the optimizer figured out that
> it needs to only read the source once. The resulting code would be minimal
> and easy to maintain. If the performance is not satisfying, you can always
> make it more complicated.
>
> Best,
>
> Arvid
>
>
> On Mon, Mar 23, 2020 at 7:02 PM Krzysztof Zarzycki <k....@gmail.com>
> wrote:
>
>> Dear Flink community!
>>
>> In our company we have implemented a system that realize the dynamic
>> business rules pattern. We spoke about it during Flink Forward 2019
>> https://www.youtube.com/watch?v=CyrQ5B0exqU.
>> The system is a great success and we would like to improve it. Let me
>> shortly mention what the system does:
>> * We have a Flink job with the engine that applies business rules on
>> multiple data streams. These rules find patterns in data, produce complex
>> events on these patterns.
>> * The engine is built on top of CoProcessFunction, the rules are
>> preimplemented using state and timers.
>> * The engine accepts control messages, that deliver configuration of the
>> rules, and start the instances of the rules. There might be many rule
>> instances with different configurations running in parallel.
>> * Data streams are routed to those rules, to all instances.
>>
>> The *advantages* of this design are:
>>   * *The performance is superb. *The key to it is that we read data from
>> the Kafka topic once, deserialize once, shuffle it once (thankfully we have
>> one partitioning key) and then apply over 100 rule instances needing the
>> same data.
>> * We are able to deploy multiple rule instances dynamically without
>> starting/stopping the job.
>>
>> Especially the performance is crucial, we have up to 500K events/s
>> processed by 100 of rules on less than 100 of cores. I can't imagine having
>> 100 of Flink SQL queries each consuming these streams from Kafka on such a
>> cluster.
>>
>> The main *painpoints *of the design is:
>> * to deploy new business rule kind, we need to predevelop the rule
>> template with use of our SDK. *We can't use* *great Flink CEP*, *Flink
>> SQL libraries.* Which are getting stronger every day. Flink SQL with
>> MATCH_RECOGNIZE would fit perfectly for our cases.
>> * The isolation of the rules is weak. There are many rules running per
>> job. One fails, the whole job fails.
>> * There is one set of Kafka offsets, one watermark, one checkpoint for
>> all the rules.
>> * We have one just distribution key. Although that can be overcome.
>>
>> I would like to focus on solving the *first point*. We can live with the
>> rest.
>>
>> *Question to the community*: Do you have ideas how to make it possible
>> to develop with use of Flink SQL with MATCH_RECOGNIZE?
>>
>> My current ideas are:
>> 1. *A possibility to dynamically modify the job topology. *
>> Then I imagine dynamically attaching Flink SQL jobs to the same Kafka
>> sources.
>>
>> 2. *A possibility to save data streams internally to Flink,
>> predistributed*. Then Flink SQL queries should be able to read these
>> streams.
>>
>> The ideal imaginary solution would look that simple in use:
>> CREATE TABLE my_stream(...) with (<kafka properties>,
>> cached = 'true')
>> PARTITIONED BY my_partition_key
>>
>> (the cached table can also be a result of CREATE TABLE and INSERT INTO
>> my_stream_cached SELECT ... FROM my_stream).
>>
>> then I can run multiple parallel Flink SQL queries reading from that
>> cached table in Flink.
>> These
>>
>> Technical implementation: Ideally, I imagine saving events in Flink state
>> before they are consumed. Then implement a Flink source, that can read the
>> Flink state of the state-filling job. It's a different job, I know! Of
>> course it needs to run on the same Flink cluster.
>> A lot of options are possible: building on top of Flink, modifying Flink
>> (even keeping own fork for the time being), using an external component.
>>
>> In my opinion the key to the maximized performance are:
>> * avoid pulling data through network from Kafka
>> * avoid deserialization of messages for each of queries/ processors.
>>
>> Comments, ideas - Any feedback is welcome!
>> Thank you!
>> Krzysztof
>>
>> P.S.   I'm writing to both dev and users groups because I suspect I would
>> need to modify Flink to achieve what I wrote above.
>>
>

Re: Dynamic Flink SQL

Posted by Krzysztof Zarzycki <k....@gmail.com>.
Hello Arvid,
Thanks for joining to the thread!
First, did you take into consideration that I would like to dynamically add
queries on the same source? That means first define one query, later the
day add another one , then another one, and so on. A Week later kill one of
those, start yet another one, etc... There will be hundreds of these
queries running at once, but the set of queries change several times a day.
They will consume the same high intensive source(s) therefore I want to
optimize for that by consuming the messages in Flink only once.

Regarding the temporary tables AFAIK they are only the metadata (let's say
Kafka topic detals) and store it in the scope of a SQL session. Therefore
multiple queries against that temp table will behave the same way as
querying normal table, that is will read the datasource multiple times.

It looks like the feature I want or could use is defined by the way of
FLIP-36 about Interactive Programming, more precisely caching the stream
table [1].
While I wouldn't like to limit the discussion to that non-existing yet
feature. Maybe there are other ways of achieving this danymic querying
capability.

Kind Regards,
Krzysztof

[1]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-36%3A+Support+Interactive+Programming+in+Flink#FLIP-36:SupportInteractiveProgramminginFlink-Cacheastreamtable



* You want to use primary Table API as that allows you to programmatically
> introduce structural variance (changing rules).
>
* You start by registering the source as temporary table.
>
* Then you add your rules as SQL through `TableEnvironment#sqlQuery`.
> * Lastly you unionAll the results.
>
> Then I'd perform some experiment if indeed the optimizer figured out that
> it needs to only read the source once. The resulting code would be minimal
> and easy to maintain. If the performance is not satisfying, you can always
> make it more complicated.
>
> Best,
>
> Arvid
>
>
> On Mon, Mar 23, 2020 at 7:02 PM Krzysztof Zarzycki <k....@gmail.com>
> wrote:
>
>> Dear Flink community!
>>
>> In our company we have implemented a system that realize the dynamic
>> business rules pattern. We spoke about it during Flink Forward 2019
>> https://www.youtube.com/watch?v=CyrQ5B0exqU.
>> The system is a great success and we would like to improve it. Let me
>> shortly mention what the system does:
>> * We have a Flink job with the engine that applies business rules on
>> multiple data streams. These rules find patterns in data, produce complex
>> events on these patterns.
>> * The engine is built on top of CoProcessFunction, the rules are
>> preimplemented using state and timers.
>> * The engine accepts control messages, that deliver configuration of the
>> rules, and start the instances of the rules. There might be many rule
>> instances with different configurations running in parallel.
>> * Data streams are routed to those rules, to all instances.
>>
>> The *advantages* of this design are:
>>   * *The performance is superb. *The key to it is that we read data from
>> the Kafka topic once, deserialize once, shuffle it once (thankfully we have
>> one partitioning key) and then apply over 100 rule instances needing the
>> same data.
>> * We are able to deploy multiple rule instances dynamically without
>> starting/stopping the job.
>>
>> Especially the performance is crucial, we have up to 500K events/s
>> processed by 100 of rules on less than 100 of cores. I can't imagine having
>> 100 of Flink SQL queries each consuming these streams from Kafka on such a
>> cluster.
>>
>> The main *painpoints *of the design is:
>> * to deploy new business rule kind, we need to predevelop the rule
>> template with use of our SDK. *We can't use* *great Flink CEP*, *Flink
>> SQL libraries.* Which are getting stronger every day. Flink SQL with
>> MATCH_RECOGNIZE would fit perfectly for our cases.
>> * The isolation of the rules is weak. There are many rules running per
>> job. One fails, the whole job fails.
>> * There is one set of Kafka offsets, one watermark, one checkpoint for
>> all the rules.
>> * We have one just distribution key. Although that can be overcome.
>>
>> I would like to focus on solving the *first point*. We can live with the
>> rest.
>>
>> *Question to the community*: Do you have ideas how to make it possible
>> to develop with use of Flink SQL with MATCH_RECOGNIZE?
>>
>> My current ideas are:
>> 1. *A possibility to dynamically modify the job topology. *
>> Then I imagine dynamically attaching Flink SQL jobs to the same Kafka
>> sources.
>>
>> 2. *A possibility to save data streams internally to Flink,
>> predistributed*. Then Flink SQL queries should be able to read these
>> streams.
>>
>> The ideal imaginary solution would look that simple in use:
>> CREATE TABLE my_stream(...) with (<kafka properties>,
>> cached = 'true')
>> PARTITIONED BY my_partition_key
>>
>> (the cached table can also be a result of CREATE TABLE and INSERT INTO
>> my_stream_cached SELECT ... FROM my_stream).
>>
>> then I can run multiple parallel Flink SQL queries reading from that
>> cached table in Flink.
>> These
>>
>> Technical implementation: Ideally, I imagine saving events in Flink state
>> before they are consumed. Then implement a Flink source, that can read the
>> Flink state of the state-filling job. It's a different job, I know! Of
>> course it needs to run on the same Flink cluster.
>> A lot of options are possible: building on top of Flink, modifying Flink
>> (even keeping own fork for the time being), using an external component.
>>
>> In my opinion the key to the maximized performance are:
>> * avoid pulling data through network from Kafka
>> * avoid deserialization of messages for each of queries/ processors.
>>
>> Comments, ideas - Any feedback is welcome!
>> Thank you!
>> Krzysztof
>>
>> P.S.   I'm writing to both dev and users groups because I suspect I would
>> need to modify Flink to achieve what I wrote above.
>>
>

Re: Dynamic Flink SQL

Posted by Arvid Heise <ar...@ververica.com>.
Hi Krzysztof,

from my past experience as data engineer, I can safely say that users often
underestimate the optimization potential and techniques of the used
systems. I implemented a similar thing in the past, where I parsed up to
500 rules reading from up to 10 data sources.
The basic idea was to simple generated one big SQL query and let the SQL
optimizer figure out what to do. And as you would have hoped, the optimizer
ultimately figured that it only needs to read each of the 10 sources once
and apply 50 aggregations on average on each of the datasets.

With that said, I'd start simple first:
* You want to use primary Table API as that allows you to programmatically
introduce structural variance (changing rules).
* You start by registering the source as temporary table.
* Then you add your rules as SQL through `TableEnvironment#sqlQuery`.
* Lastly you unionAll the results.

Then I'd perform some experiment if indeed the optimizer figured out that
it needs to only read the source once. The resulting code would be minimal
and easy to maintain. If the performance is not satisfying, you can always
make it more complicated.

Best,

Arvid


On Mon, Mar 23, 2020 at 7:02 PM Krzysztof Zarzycki <k....@gmail.com>
wrote:

> Dear Flink community!
>
> In our company we have implemented a system that realize the dynamic
> business rules pattern. We spoke about it during Flink Forward 2019
> https://www.youtube.com/watch?v=CyrQ5B0exqU.
> The system is a great success and we would like to improve it. Let me
> shortly mention what the system does:
> * We have a Flink job with the engine that applies business rules on
> multiple data streams. These rules find patterns in data, produce complex
> events on these patterns.
> * The engine is built on top of CoProcessFunction, the rules are
> preimplemented using state and timers.
> * The engine accepts control messages, that deliver configuration of the
> rules, and start the instances of the rules. There might be many rule
> instances with different configurations running in parallel.
> * Data streams are routed to those rules, to all instances.
>
> The *advantages* of this design are:
>   * *The performance is superb. *The key to it is that we read data from
> the Kafka topic once, deserialize once, shuffle it once (thankfully we have
> one partitioning key) and then apply over 100 rule instances needing the
> same data.
> * We are able to deploy multiple rule instances dynamically without
> starting/stopping the job.
>
> Especially the performance is crucial, we have up to 500K events/s
> processed by 100 of rules on less than 100 of cores. I can't imagine having
> 100 of Flink SQL queries each consuming these streams from Kafka on such a
> cluster.
>
> The main *painpoints *of the design is:
> * to deploy new business rule kind, we need to predevelop the rule
> template with use of our SDK. *We can't use* *great Flink CEP*, *Flink
> SQL libraries.* Which are getting stronger every day. Flink SQL with
> MATCH_RECOGNIZE would fit perfectly for our cases.
> * The isolation of the rules is weak. There are many rules running per
> job. One fails, the whole job fails.
> * There is one set of Kafka offsets, one watermark, one checkpoint for all
> the rules.
> * We have one just distribution key. Although that can be overcome.
>
> I would like to focus on solving the *first point*. We can live with the
> rest.
>
> *Question to the community*: Do you have ideas how to make it possible to
> develop with use of Flink SQL with MATCH_RECOGNIZE?
>
> My current ideas are:
> 1. *A possibility to dynamically modify the job topology. *
> Then I imagine dynamically attaching Flink SQL jobs to the same Kafka
> sources.
>
> 2. *A possibility to save data streams internally to Flink,
> predistributed*. Then Flink SQL queries should be able to read these
> streams.
>
> The ideal imaginary solution would look that simple in use:
> CREATE TABLE my_stream(...) with (<kafka properties>,
> cached = 'true')
> PARTITIONED BY my_partition_key
>
> (the cached table can also be a result of CREATE TABLE and INSERT INTO
> my_stream_cached SELECT ... FROM my_stream).
>
> then I can run multiple parallel Flink SQL queries reading from that
> cached table in Flink.
> These
>
> Technical implementation: Ideally, I imagine saving events in Flink state
> before they are consumed. Then implement a Flink source, that can read the
> Flink state of the state-filling job. It's a different job, I know! Of
> course it needs to run on the same Flink cluster.
> A lot of options are possible: building on top of Flink, modifying Flink
> (even keeping own fork for the time being), using an external component.
>
> In my opinion the key to the maximized performance are:
> * avoid pulling data through network from Kafka
> * avoid deserialization of messages for each of queries/ processors.
>
> Comments, ideas - Any feedback is welcome!
> Thank you!
> Krzysztof
>
> P.S.   I'm writing to both dev and users groups because I suspect I would
> need to modify Flink to achieve what I wrote above.
>

Re: Dynamic Flink SQL

Posted by Arvid Heise <ar...@ververica.com>.
Hi Krzysztof,

from my past experience as data engineer, I can safely say that users often
underestimate the optimization potential and techniques of the used
systems. I implemented a similar thing in the past, where I parsed up to
500 rules reading from up to 10 data sources.
The basic idea was to simple generated one big SQL query and let the SQL
optimizer figure out what to do. And as you would have hoped, the optimizer
ultimately figured that it only needs to read each of the 10 sources once
and apply 50 aggregations on average on each of the datasets.

With that said, I'd start simple first:
* You want to use primary Table API as that allows you to programmatically
introduce structural variance (changing rules).
* You start by registering the source as temporary table.
* Then you add your rules as SQL through `TableEnvironment#sqlQuery`.
* Lastly you unionAll the results.

Then I'd perform some experiment if indeed the optimizer figured out that
it needs to only read the source once. The resulting code would be minimal
and easy to maintain. If the performance is not satisfying, you can always
make it more complicated.

Best,

Arvid


On Mon, Mar 23, 2020 at 7:02 PM Krzysztof Zarzycki <k....@gmail.com>
wrote:

> Dear Flink community!
>
> In our company we have implemented a system that realize the dynamic
> business rules pattern. We spoke about it during Flink Forward 2019
> https://www.youtube.com/watch?v=CyrQ5B0exqU.
> The system is a great success and we would like to improve it. Let me
> shortly mention what the system does:
> * We have a Flink job with the engine that applies business rules on
> multiple data streams. These rules find patterns in data, produce complex
> events on these patterns.
> * The engine is built on top of CoProcessFunction, the rules are
> preimplemented using state and timers.
> * The engine accepts control messages, that deliver configuration of the
> rules, and start the instances of the rules. There might be many rule
> instances with different configurations running in parallel.
> * Data streams are routed to those rules, to all instances.
>
> The *advantages* of this design are:
>   * *The performance is superb. *The key to it is that we read data from
> the Kafka topic once, deserialize once, shuffle it once (thankfully we have
> one partitioning key) and then apply over 100 rule instances needing the
> same data.
> * We are able to deploy multiple rule instances dynamically without
> starting/stopping the job.
>
> Especially the performance is crucial, we have up to 500K events/s
> processed by 100 of rules on less than 100 of cores. I can't imagine having
> 100 of Flink SQL queries each consuming these streams from Kafka on such a
> cluster.
>
> The main *painpoints *of the design is:
> * to deploy new business rule kind, we need to predevelop the rule
> template with use of our SDK. *We can't use* *great Flink CEP*, *Flink
> SQL libraries.* Which are getting stronger every day. Flink SQL with
> MATCH_RECOGNIZE would fit perfectly for our cases.
> * The isolation of the rules is weak. There are many rules running per
> job. One fails, the whole job fails.
> * There is one set of Kafka offsets, one watermark, one checkpoint for all
> the rules.
> * We have one just distribution key. Although that can be overcome.
>
> I would like to focus on solving the *first point*. We can live with the
> rest.
>
> *Question to the community*: Do you have ideas how to make it possible to
> develop with use of Flink SQL with MATCH_RECOGNIZE?
>
> My current ideas are:
> 1. *A possibility to dynamically modify the job topology. *
> Then I imagine dynamically attaching Flink SQL jobs to the same Kafka
> sources.
>
> 2. *A possibility to save data streams internally to Flink,
> predistributed*. Then Flink SQL queries should be able to read these
> streams.
>
> The ideal imaginary solution would look that simple in use:
> CREATE TABLE my_stream(...) with (<kafka properties>,
> cached = 'true')
> PARTITIONED BY my_partition_key
>
> (the cached table can also be a result of CREATE TABLE and INSERT INTO
> my_stream_cached SELECT ... FROM my_stream).
>
> then I can run multiple parallel Flink SQL queries reading from that
> cached table in Flink.
> These
>
> Technical implementation: Ideally, I imagine saving events in Flink state
> before they are consumed. Then implement a Flink source, that can read the
> Flink state of the state-filling job. It's a different job, I know! Of
> course it needs to run on the same Flink cluster.
> A lot of options are possible: building on top of Flink, modifying Flink
> (even keeping own fork for the time being), using an external component.
>
> In my opinion the key to the maximized performance are:
> * avoid pulling data through network from Kafka
> * avoid deserialization of messages for each of queries/ processors.
>
> Comments, ideas - Any feedback is welcome!
> Thank you!
> Krzysztof
>
> P.S.   I'm writing to both dev and users groups because I suspect I would
> need to modify Flink to achieve what I wrote above.
>