You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@flink.apache.org by Juergen Donnerstag <ju...@gmail.com> on 2020/02/10 17:35:06 UTC

FlinkCEP questions - architecture

Hi,

we're in very early stages evaluating options. I'm not a Flink expert, but
did read some of the docs and watched videos. Could you please help me
understand if and how certain of our reqs are covered by Flink (CEP). Is
this mailing list the right channel for such questions?

1) We receive files every day, which are exports from some database tables,
containing ONLY changes from the day. Most tables have modify-cols. Even
though they are files but because they contain changes only, I belief the
file records shall be considered events in Flink terminology. Is that
assumption correct?

2) The records within the DB export files are NOT in chronologically, and
we can not change the export. Our use case is a "complex event processing"
case (FlinkCEP) with rules like "KeyBy(someKey) If first A, then B, then C
within 30 days, then do something". Does that work with FlinkCEP despite
the events/records are not in chrono order within the file? The files are
100MB to 20GB in size. Do I need to sort the files first before CEP
processing?

3) Occassionally some crazy people manually "correct" DB records within the
database and manually trigger a re-export of ALL of the changes for that
respective day (e.g. last weeks Tuesday). Consequently we receive a
correction file. Same filename but "_1" appended. All filenames include the
date (of the original export). What are the options to handle that case
(besides telling the DB admins not to, which we did already). Regular
checkpoints and re-process all files since then?  What happens to the CEP
state? Will it be checkpointed as well?

4) Our CEP rules span upto 180 days (resp. 6 months). Is that a problem?

5) We also have CEP rules that must fire if after a start sequence matched,
the remaining sequence did NOT within a configured window. E.g. If A, then
B, but C did not occur within 30 days since A. Is that supported by
FlinkCEP? I couldn't find a working example.

6) We expect 30-40 CEP rules. How can we estimate the required storage size
for the temporary CEP state? Is there some sort of formular considering
number of rules, number of records per file or day, record size, window,
number of records matched per sequence, number of keyBy grouping keys, ...

7) I can imagine that for debugging reasons it'd be good if we were able to
query the temporary CEP state. What is the (CEP) schema used to persist the
CEP state and how can we query it? And does such query work on the whole
cluster or only per node (e.g. because of shuffle and nodes responsible
only for a portion of the events).

8) I understand state is stored per node. What happens if I want to add or
remove a nodes. Will the state still be found, despite it being stored in
another node? I read that I need to be equally careful when changing rules?
Or is that a different issue?

9) How does garbage collection of temp CEP state work, or will it stay
forever?  For tracing/investigation reasons I can imagine that purging it
at the earliest possible time is not always the best option. May be after
30 days later or so.

10) Are there strategies to minimize temp CEP state? In SQL queries you
 filter first on the "smallest" attributes. CEP rules form a sequence.
Hence that approach will not work. Is that an issue at all? What are
practical limits on the CEP temp state storage engine?

11) Occassionally we need to process about 200 files at once. Can I speed
things up by processing all files in parallel on multiple nodes, despite
their sequence (CEP use case)? This would only work if FlinkCEP in step 1
simply filters on all relevant events of a sequence, updates state, and in
a step 2 - after the files are processed - evaluates the updated state if
that meets the sequences.

12) Schema changes in the input files: Occassionly the DB source system
schema is changed, and not always in a backwards compatible way (insert new
fields in the middle), and also the export will have the field in the
middle. This means that starting from a specific (file) date, I need to
consider a different schema. This must also be handled when re-running
files for the last month, because of corrections provided. And if the file
format has changed someone in the middle ...

thanks a lot for your time and your help
Juergen

Re: FlinkCEP questions - architecture

Posted by Arvid Heise <ar...@ververica.com>.
Hi Juergen,

1) yes, you are using a changelog of events. If you need more information,
you could search for change data capture architecture.

For alle CEP question, I'm pulling in Kostas.

12) It depends in which format the data is exported. If you use a format
with schema evolution (e.g. Avro), then schema changes will be handled
gracefully.

Best,

Arvid

On Mon, Feb 10, 2020 at 6:35 PM Juergen Donnerstag <
juergen.donnerstag@gmail.com> wrote:

> Hi,
>
> we're in very early stages evaluating options. I'm not a Flink expert, but
> did read some of the docs and watched videos. Could you please help me
> understand if and how certain of our reqs are covered by Flink (CEP). Is
> this mailing list the right channel for such questions?
>
> 1) We receive files every day, which are exports from some database
> tables, containing ONLY changes from the day. Most tables have modify-cols.
> Even though they are files but because they contain changes only, I belief
> the file records shall be considered events in Flink terminology. Is that
> assumption correct?
>
> 2) The records within the DB export files are NOT in chronologically, and
> we can not change the export. Our use case is a "complex event processing"
> case (FlinkCEP) with rules like "KeyBy(someKey) If first A, then B, then C
> within 30 days, then do something". Does that work with FlinkCEP despite
> the events/records are not in chrono order within the file? The files are
> 100MB to 20GB in size. Do I need to sort the files first before CEP
> processing?
>
> 3) Occassionally some crazy people manually "correct" DB records within
> the database and manually trigger a re-export of ALL of the changes for
> that respective day (e.g. last weeks Tuesday). Consequently we receive a
> correction file. Same filename but "_1" appended. All filenames include the
> date (of the original export). What are the options to handle that case
> (besides telling the DB admins not to, which we did already). Regular
> checkpoints and re-process all files since then?  What happens to the CEP
> state? Will it be checkpointed as well?
>
> 4) Our CEP rules span upto 180 days (resp. 6 months). Is that a problem?
>
> 5) We also have CEP rules that must fire if after a start sequence
> matched, the remaining sequence did NOT within a configured window. E.g. If
> A, then B, but C did not occur within 30 days since A. Is that supported by
> FlinkCEP? I couldn't find a working example.
>
> 6) We expect 30-40 CEP rules. How can we estimate the required storage
> size for the temporary CEP state? Is there some sort of formular
> considering number of rules, number of records per file or day, record
> size, window, number of records matched per sequence, number of keyBy
> grouping keys, ...
>
> 7) I can imagine that for debugging reasons it'd be good if we were able
> to query the temporary CEP state. What is the (CEP) schema used to persist
> the CEP state and how can we query it? And does such query work on the
> whole cluster or only per node (e.g. because of shuffle and nodes
> responsible only for a portion of the events).
>
> 8) I understand state is stored per node. What happens if I want to add or
> remove a nodes. Will the state still be found, despite it being stored in
> another node? I read that I need to be equally careful when changing rules?
> Or is that a different issue?
>
> 9) How does garbage collection of temp CEP state work, or will it stay
> forever?  For tracing/investigation reasons I can imagine that purging it
> at the earliest possible time is not always the best option. May be after
> 30 days later or so.
>
> 10) Are there strategies to minimize temp CEP state? In SQL queries you
>  filter first on the "smallest" attributes. CEP rules form a sequence.
> Hence that approach will not work. Is that an issue at all? What are
> practical limits on the CEP temp state storage engine?
>
> 11) Occassionally we need to process about 200 files at once. Can I speed
> things up by processing all files in parallel on multiple nodes, despite
> their sequence (CEP use case)? This would only work if FlinkCEP in step 1
> simply filters on all relevant events of a sequence, updates state, and in
> a step 2 - after the files are processed - evaluates the updated state if
> that meets the sequences.
>
> 12) Schema changes in the input files: Occassionly the DB source system
> schema is changed, and not always in a backwards compatible way (insert new
> fields in the middle), and also the export will have the field in the
> middle. This means that starting from a specific (file) date, I need to
> consider a different schema. This must also be handled when re-running
> files for the last month, because of corrections provided. And if the file
> format has changed someone in the middle ...
>
> thanks a lot for your time and your help
> Juergen
>

Re: FlinkCEP questions - architecture

Posted by Oytun Tez <oy...@motaword.com>.
Amazing content, thanks for asking and answering.

On Fri, Feb 21, 2020 at 5:04 AM Juergen Donnerstag <
juergen.donnerstag@gmail.com> wrote:

> thanks a lot
> Juergen
>
> On Mon, Feb 17, 2020 at 11:08 AM Kostas Kloudas <kk...@gmail.com>
> wrote:
>
>> Hi Juergen,
>>
>> I will reply to your questions inline. As a general comment I would
>> suggest to also have a look at [3] so that you have an idea of some of
>> the alternatives.
>> With that said, here come the answers :)
>>
>> 1) We receive files every day, which are exports from some database
>> tables, containing ONLY changes from the day. Most tables have
>> modify-cols. Even though they are files but because they contain
>> changes only, I belief the file records shall be considered events in
>> Flink terminology. Is that assumption correct?
>>
>> -> Yes. I think your assumption is correct.
>>
>> 2) The records within the DB export files are NOT in chronologically,
>> and we can not change the export. Our use case is a "complex event
>> processing" case (FlinkCEP) with rules like "KeyBy(someKey) If first
>> A, then B, then C within 30 days, then do something". Does that work
>> with FlinkCEP despite the events/records are not in chrono order
>> within the file? The files are 100MB to 20GB in size. Do I need to
>> sort the files first before CEP processing?
>>
>> -> Flink CEP also works in event time and the re-ordering can be done by
>> Flink
>>
>> 3) Occassionally some crazy people manually "correct" DB records
>> within the database and manually trigger a re-export of ALL of the
>> changes for that respective day (e.g. last weeks Tuesday).
>> Consequently we receive a correction file. Same filename but "_1"
>> appended. All filenames include the date (of the original export).
>> What are the options to handle that case (besides telling the DB
>> admins not to, which we did already). Regular checkpoints and
>> re-process all files since then?  What happens to the CEP state? Will
>> it be checkpointed as well?
>>
>> -> If you require re-processing, then I would say that your best
>> option is what you described. The other option would be to keep
>> everything in Flink state until you are sure that no more corrections
>> will come. In this case, you have to somehow issue the "correction" in
>> a way that the downstream system can understand what to correct and
>> how. Keep in mind that this may be an expensive operation because
>> everything has to be kept in state for longer.
>>
>> 4) Our CEP rules span upto 180 days (resp. 6 months). Is that a problem?
>>
>> -> The only thing to consider is the size of your state. Time is not
>> necessarily an issue. If your state for these 180 days is a couple of
>> MBs, then you have no problem. If it increases fast, then you have to
>> provision your cluster accordingly.
>>
>> 5) We also have CEP rules that must fire if after a start sequence
>> matched, the remaining sequence did NOT within a configured window.
>> E.g. If A, then B, but C did not occur within 30 days since A. Is that
>> supported by FlinkCEP? I couldn't find a working example.
>>
>> -> You can have a look at [1] for the supported pattern combinations
>> and you can also look at [2] for some tests of different pattern
>> combinations.
>>
>> 6) We expect 30-40 CEP rules. How can we estimate the required storage
>> size for the temporary CEP state? Is there some sort of formular
>> considering number of rules, number of records per file or day, record
>> size, window, number of records matched per sequence, number of keyBy
>> grouping keys, ...
>>
>> -> In FlinkCEP, each pattern becomes a single operator. This means
>> that you will have 30-40 operators in your job graph, each with each
>> own state. This can become heavy but once again it depends on your
>> workload. I cannot give an estimate because in CEP, in order to
>> guarantee correct ordering of events in an unordered stream, the
>> library sometimes has to keep also in state more records than will be
>> presented at the end.
>>
>> Have you considered going with a solution based on processfunction and
>> broadcast state? This will also allow you to have a more dynamic
>> set-up where patterns can be added at runtime and it will allow you to
>> do any optimizations specific to your workload ;) For a discussion on
>> this, check [3]. In addition, it will allow you to "multiplex" many
>> patterns into a single operator thus potentially minimizing the amount
>> of copies of the state you keep.
>>
>> 7) I can imagine that for debugging reasons it'd be good if we were
>> able to query the temporary CEP state. What is the (CEP) schema used
>> to persist the CEP state and how can we query it? And does such query
>> work on the whole cluster or only per node (e.g. because of shuffle
>> and nodes responsible only for a portion of the events).
>>
>> -> Unfortunatelly the state in CEP is not queryable, thus I am not
>> sure if you can inspect it at runtime.
>>
>> 8) I understand state is stored per node. What happens if I want to
>> add or remove a nodes. Will the state still be found, despite it being
>> stored in another node? I read that I need to be equally careful when
>> changing rules? Or is that a different issue?
>>
>> -> Rescaling a Flink job is not done automatically. You need to take a
>> savepoint and then relaunch your job with a different parallelism.
>> Updating a rule is not supported in CEP, as changing a rule would
>> imply that (potentially) the state should change. But what you could
>> do is take a savepoint, remove the old pattern and add a new one (the
>> updated one) and tell Flink to ignore the state of the previous
>> operator (as said earlier each CEP pattern is translated to an
>> operator).
>>
>> 9) How does garbage collection of temp CEP state work, or will it stay
>> forever?  For tracing/investigation reasons I can imagine that purging
>> it at the earliest possible time is not always the best option. May be
>> after 30 days later or so.
>>
>> -> CEP clean state after the time horizon (specified with the
>> .within() clause) expires.
>>
>> 10) Are there strategies to minimize temp CEP state? In SQL queries
>> you  filter first on the "smallest" attributes. CEP rules form a
>> sequence. Hence that approach will not work. Is that an issue at all?
>> What are practical limits on the CEP temp state storage engine?
>>
>> -> Such optimizations are not supported out of the box. I would
>> recommend to go with the Broadcast state approach in [3].
>>
>> 11) Occassionally we need to process about 200 files at once. Can I
>> speed things up by processing all files in parallel on multiple nodes,
>> despite their sequence (CEP use case)? This would only work if
>> FlinkCEP in step 1 simply filters on all relevant events of a
>> sequence, updates state, and in a step 2 - after the files are
>> processed - evaluates the updated state if that meets the sequences.
>>
>> 12) Schema changes in the input files: Occassionly the DB source
>> system schema is changed, and not always in a backwards compatible way
>> (insert new fields in the middle), and also the export will have the
>> field in the middle. This means that starting from a specific (file)
>> date, I need to consider a different schema. This must also be handled
>> when re-running files for the last month, because of corrections
>> provided. And if the file format has changed someone in the middle ...
>>
>> -> This seems to be relevant for the "data cleaning" phase, before you
>> send your data to CEP. In this case, if the schema changes, then I
>> assume that you need to update your initial parsing logic, which means
>> taking a savepoint and redeploying the updated jobGraph with the new
>> input parsing logic (if I understand correctly).
>>
>> thanks a lot for your time and your help
>>
>> I hope that above helps!
>>
>> Cheers,
>> Kostas
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/libs/cep.html#combining-patterns
>> [2]
>> https://github.com/apache/flink/blob/master/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/pattern/PatternTest.java
>> [3] https://flink.apache.org/news/2020/01/15/demo-fraud-detection.html
>>
>> On Mon, Feb 10, 2020 at 6:35 PM Juergen Donnerstag
>> <ju...@gmail.com> wrote:
>> >
>> > Hi,
>> >
>> > we're in very early stages evaluating options. I'm not a Flink expert,
>> but did read some of the docs and watched videos. Could you please help me
>> understand if and how certain of our reqs are covered by Flink (CEP). Is
>> this mailing list the right channel for such questions?
>> >
>> > 1) We receive files every day, which are exports from some database
>> tables, containing ONLY changes from the day. Most tables have modify-cols.
>> Even though they are files but because they contain changes only, I belief
>> the file records shall be considered events in Flink terminology. Is that
>> assumption correct?
>> >
>> > 2) The records within the DB export files are NOT in chronologically,
>> and we can not change the export. Our use case is a "complex event
>> processing" case (FlinkCEP) with rules like "KeyBy(someKey) If first A,
>> then B, then C within 30 days, then do something". Does that work with
>> FlinkCEP despite the events/records are not in chrono order within the
>> file? The files are 100MB to 20GB in size. Do I need to sort the files
>> first before CEP processing?
>> >
>> > 3) Occassionally some crazy people manually "correct" DB records within
>> the database and manually trigger a re-export of ALL of the changes for
>> that respective day (e.g. last weeks Tuesday). Consequently we receive a
>> correction file. Same filename but "_1" appended. All filenames include the
>> date (of the original export). What are the options to handle that case
>> (besides telling the DB admins not to, which we did already). Regular
>> checkpoints and re-process all files since then?  What happens to the CEP
>> state? Will it be checkpointed as well?
>> >
>> > 4) Our CEP rules span upto 180 days (resp. 6 months). Is that a problem?
>> >
>> > 5) We also have CEP rules that must fire if after a start sequence
>> matched, the remaining sequence did NOT within a configured window. E.g. If
>> A, then B, but C did not occur within 30 days since A. Is that supported by
>> FlinkCEP? I couldn't find a working example.
>> >
>> > 6) We expect 30-40 CEP rules. How can we estimate the required storage
>> size for the temporary CEP state? Is there some sort of formular
>> considering number of rules, number of records per file or day, record
>> size, window, number of records matched per sequence, number of keyBy
>> grouping keys, ...
>> >
>> > 7) I can imagine that for debugging reasons it'd be good if we were
>> able to query the temporary CEP state. What is the (CEP) schema used to
>> persist the CEP state and how can we query it? And does such query work on
>> the whole cluster or only per node (e.g. because of shuffle and nodes
>> responsible only for a portion of the events).
>> >
>> > 8) I understand state is stored per node. What happens if I want to add
>> or remove a nodes. Will the state still be found, despite it being stored
>> in another node? I read that I need to be equally careful when changing
>> rules? Or is that a different issue?
>> >
>> > 9) How does garbage collection of temp CEP state work, or will it stay
>> forever?  For tracing/investigation reasons I can imagine that purging it
>> at the earliest possible time is not always the best option. May be after
>> 30 days later or so.
>> >
>> > 10) Are there strategies to minimize temp CEP state? In SQL queries
>> you  filter first on the "smallest" attributes. CEP rules form a sequence.
>> Hence that approach will not work. Is that an issue at all? What are
>> practical limits on the CEP temp state storage engine?
>> >
>> > 11) Occassionally we need to process about 200 files at once. Can I
>> speed things up by processing all files in parallel on multiple nodes,
>> despite their sequence (CEP use case)? This would only work if FlinkCEP in
>> step 1 simply filters on all relevant events of a sequence, updates state,
>> and in a step 2 - after the files are processed - evaluates the updated
>> state if that meets the sequences.
>> >
>> > 12) Schema changes in the input files: Occassionly the DB source system
>> schema is changed, and not always in a backwards compatible way (insert new
>> fields in the middle), and also the export will have the field in the
>> middle. This means that starting from a specific (file) date, I need to
>> consider a different schema. This must also be handled when re-running
>> files for the last month, because of corrections provided. And if the file
>> format has changed someone in the middle ...
>> >
>> > thanks a lot for your time and your help
>> > Juergen
>>
> --
 --

[image: MotaWord]
Oytun Tez
M O T A W O R D | CTO & Co-Founder
oytun@motaword.com

      <https://www.motaword.com/blog>

Re: FlinkCEP questions - architecture

Posted by Juergen Donnerstag <ju...@gmail.com>.
thanks a lot
Juergen

On Mon, Feb 17, 2020 at 11:08 AM Kostas Kloudas <kk...@gmail.com> wrote:

> Hi Juergen,
>
> I will reply to your questions inline. As a general comment I would
> suggest to also have a look at [3] so that you have an idea of some of
> the alternatives.
> With that said, here come the answers :)
>
> 1) We receive files every day, which are exports from some database
> tables, containing ONLY changes from the day. Most tables have
> modify-cols. Even though they are files but because they contain
> changes only, I belief the file records shall be considered events in
> Flink terminology. Is that assumption correct?
>
> -> Yes. I think your assumption is correct.
>
> 2) The records within the DB export files are NOT in chronologically,
> and we can not change the export. Our use case is a "complex event
> processing" case (FlinkCEP) with rules like "KeyBy(someKey) If first
> A, then B, then C within 30 days, then do something". Does that work
> with FlinkCEP despite the events/records are not in chrono order
> within the file? The files are 100MB to 20GB in size. Do I need to
> sort the files first before CEP processing?
>
> -> Flink CEP also works in event time and the re-ordering can be done by
> Flink
>
> 3) Occassionally some crazy people manually "correct" DB records
> within the database and manually trigger a re-export of ALL of the
> changes for that respective day (e.g. last weeks Tuesday).
> Consequently we receive a correction file. Same filename but "_1"
> appended. All filenames include the date (of the original export).
> What are the options to handle that case (besides telling the DB
> admins not to, which we did already). Regular checkpoints and
> re-process all files since then?  What happens to the CEP state? Will
> it be checkpointed as well?
>
> -> If you require re-processing, then I would say that your best
> option is what you described. The other option would be to keep
> everything in Flink state until you are sure that no more corrections
> will come. In this case, you have to somehow issue the "correction" in
> a way that the downstream system can understand what to correct and
> how. Keep in mind that this may be an expensive operation because
> everything has to be kept in state for longer.
>
> 4) Our CEP rules span upto 180 days (resp. 6 months). Is that a problem?
>
> -> The only thing to consider is the size of your state. Time is not
> necessarily an issue. If your state for these 180 days is a couple of
> MBs, then you have no problem. If it increases fast, then you have to
> provision your cluster accordingly.
>
> 5) We also have CEP rules that must fire if after a start sequence
> matched, the remaining sequence did NOT within a configured window.
> E.g. If A, then B, but C did not occur within 30 days since A. Is that
> supported by FlinkCEP? I couldn't find a working example.
>
> -> You can have a look at [1] for the supported pattern combinations
> and you can also look at [2] for some tests of different pattern
> combinations.
>
> 6) We expect 30-40 CEP rules. How can we estimate the required storage
> size for the temporary CEP state? Is there some sort of formular
> considering number of rules, number of records per file or day, record
> size, window, number of records matched per sequence, number of keyBy
> grouping keys, ...
>
> -> In FlinkCEP, each pattern becomes a single operator. This means
> that you will have 30-40 operators in your job graph, each with each
> own state. This can become heavy but once again it depends on your
> workload. I cannot give an estimate because in CEP, in order to
> guarantee correct ordering of events in an unordered stream, the
> library sometimes has to keep also in state more records than will be
> presented at the end.
>
> Have you considered going with a solution based on processfunction and
> broadcast state? This will also allow you to have a more dynamic
> set-up where patterns can be added at runtime and it will allow you to
> do any optimizations specific to your workload ;) For a discussion on
> this, check [3]. In addition, it will allow you to "multiplex" many
> patterns into a single operator thus potentially minimizing the amount
> of copies of the state you keep.
>
> 7) I can imagine that for debugging reasons it'd be good if we were
> able to query the temporary CEP state. What is the (CEP) schema used
> to persist the CEP state and how can we query it? And does such query
> work on the whole cluster or only per node (e.g. because of shuffle
> and nodes responsible only for a portion of the events).
>
> -> Unfortunatelly the state in CEP is not queryable, thus I am not
> sure if you can inspect it at runtime.
>
> 8) I understand state is stored per node. What happens if I want to
> add or remove a nodes. Will the state still be found, despite it being
> stored in another node? I read that I need to be equally careful when
> changing rules? Or is that a different issue?
>
> -> Rescaling a Flink job is not done automatically. You need to take a
> savepoint and then relaunch your job with a different parallelism.
> Updating a rule is not supported in CEP, as changing a rule would
> imply that (potentially) the state should change. But what you could
> do is take a savepoint, remove the old pattern and add a new one (the
> updated one) and tell Flink to ignore the state of the previous
> operator (as said earlier each CEP pattern is translated to an
> operator).
>
> 9) How does garbage collection of temp CEP state work, or will it stay
> forever?  For tracing/investigation reasons I can imagine that purging
> it at the earliest possible time is not always the best option. May be
> after 30 days later or so.
>
> -> CEP clean state after the time horizon (specified with the
> .within() clause) expires.
>
> 10) Are there strategies to minimize temp CEP state? In SQL queries
> you  filter first on the "smallest" attributes. CEP rules form a
> sequence. Hence that approach will not work. Is that an issue at all?
> What are practical limits on the CEP temp state storage engine?
>
> -> Such optimizations are not supported out of the box. I would
> recommend to go with the Broadcast state approach in [3].
>
> 11) Occassionally we need to process about 200 files at once. Can I
> speed things up by processing all files in parallel on multiple nodes,
> despite their sequence (CEP use case)? This would only work if
> FlinkCEP in step 1 simply filters on all relevant events of a
> sequence, updates state, and in a step 2 - after the files are
> processed - evaluates the updated state if that meets the sequences.
>
> 12) Schema changes in the input files: Occassionly the DB source
> system schema is changed, and not always in a backwards compatible way
> (insert new fields in the middle), and also the export will have the
> field in the middle. This means that starting from a specific (file)
> date, I need to consider a different schema. This must also be handled
> when re-running files for the last month, because of corrections
> provided. And if the file format has changed someone in the middle ...
>
> -> This seems to be relevant for the "data cleaning" phase, before you
> send your data to CEP. In this case, if the schema changes, then I
> assume that you need to update your initial parsing logic, which means
> taking a savepoint and redeploying the updated jobGraph with the new
> input parsing logic (if I understand correctly).
>
> thanks a lot for your time and your help
>
> I hope that above helps!
>
> Cheers,
> Kostas
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/libs/cep.html#combining-patterns
> [2]
> https://github.com/apache/flink/blob/master/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/pattern/PatternTest.java
> [3] https://flink.apache.org/news/2020/01/15/demo-fraud-detection.html
>
> On Mon, Feb 10, 2020 at 6:35 PM Juergen Donnerstag
> <ju...@gmail.com> wrote:
> >
> > Hi,
> >
> > we're in very early stages evaluating options. I'm not a Flink expert,
> but did read some of the docs and watched videos. Could you please help me
> understand if and how certain of our reqs are covered by Flink (CEP). Is
> this mailing list the right channel for such questions?
> >
> > 1) We receive files every day, which are exports from some database
> tables, containing ONLY changes from the day. Most tables have modify-cols.
> Even though they are files but because they contain changes only, I belief
> the file records shall be considered events in Flink terminology. Is that
> assumption correct?
> >
> > 2) The records within the DB export files are NOT in chronologically,
> and we can not change the export. Our use case is a "complex event
> processing" case (FlinkCEP) with rules like "KeyBy(someKey) If first A,
> then B, then C within 30 days, then do something". Does that work with
> FlinkCEP despite the events/records are not in chrono order within the
> file? The files are 100MB to 20GB in size. Do I need to sort the files
> first before CEP processing?
> >
> > 3) Occassionally some crazy people manually "correct" DB records within
> the database and manually trigger a re-export of ALL of the changes for
> that respective day (e.g. last weeks Tuesday). Consequently we receive a
> correction file. Same filename but "_1" appended. All filenames include the
> date (of the original export). What are the options to handle that case
> (besides telling the DB admins not to, which we did already). Regular
> checkpoints and re-process all files since then?  What happens to the CEP
> state? Will it be checkpointed as well?
> >
> > 4) Our CEP rules span upto 180 days (resp. 6 months). Is that a problem?
> >
> > 5) We also have CEP rules that must fire if after a start sequence
> matched, the remaining sequence did NOT within a configured window. E.g. If
> A, then B, but C did not occur within 30 days since A. Is that supported by
> FlinkCEP? I couldn't find a working example.
> >
> > 6) We expect 30-40 CEP rules. How can we estimate the required storage
> size for the temporary CEP state? Is there some sort of formular
> considering number of rules, number of records per file or day, record
> size, window, number of records matched per sequence, number of keyBy
> grouping keys, ...
> >
> > 7) I can imagine that for debugging reasons it'd be good if we were able
> to query the temporary CEP state. What is the (CEP) schema used to persist
> the CEP state and how can we query it? And does such query work on the
> whole cluster or only per node (e.g. because of shuffle and nodes
> responsible only for a portion of the events).
> >
> > 8) I understand state is stored per node. What happens if I want to add
> or remove a nodes. Will the state still be found, despite it being stored
> in another node? I read that I need to be equally careful when changing
> rules? Or is that a different issue?
> >
> > 9) How does garbage collection of temp CEP state work, or will it stay
> forever?  For tracing/investigation reasons I can imagine that purging it
> at the earliest possible time is not always the best option. May be after
> 30 days later or so.
> >
> > 10) Are there strategies to minimize temp CEP state? In SQL queries you
> filter first on the "smallest" attributes. CEP rules form a sequence. Hence
> that approach will not work. Is that an issue at all? What are practical
> limits on the CEP temp state storage engine?
> >
> > 11) Occassionally we need to process about 200 files at once. Can I
> speed things up by processing all files in parallel on multiple nodes,
> despite their sequence (CEP use case)? This would only work if FlinkCEP in
> step 1 simply filters on all relevant events of a sequence, updates state,
> and in a step 2 - after the files are processed - evaluates the updated
> state if that meets the sequences.
> >
> > 12) Schema changes in the input files: Occassionly the DB source system
> schema is changed, and not always in a backwards compatible way (insert new
> fields in the middle), and also the export will have the field in the
> middle. This means that starting from a specific (file) date, I need to
> consider a different schema. This must also be handled when re-running
> files for the last month, because of corrections provided. And if the file
> format has changed someone in the middle ...
> >
> > thanks a lot for your time and your help
> > Juergen
>

Re: FlinkCEP questions - architecture

Posted by Kostas Kloudas <kk...@gmail.com>.
Hi Juergen,

I will reply to your questions inline. As a general comment I would
suggest to also have a look at [3] so that you have an idea of some of
the alternatives.
With that said, here come the answers :)

1) We receive files every day, which are exports from some database
tables, containing ONLY changes from the day. Most tables have
modify-cols. Even though they are files but because they contain
changes only, I belief the file records shall be considered events in
Flink terminology. Is that assumption correct?

-> Yes. I think your assumption is correct.

2) The records within the DB export files are NOT in chronologically,
and we can not change the export. Our use case is a "complex event
processing" case (FlinkCEP) with rules like "KeyBy(someKey) If first
A, then B, then C within 30 days, then do something". Does that work
with FlinkCEP despite the events/records are not in chrono order
within the file? The files are 100MB to 20GB in size. Do I need to
sort the files first before CEP processing?

-> Flink CEP also works in event time and the re-ordering can be done by Flink

3) Occassionally some crazy people manually "correct" DB records
within the database and manually trigger a re-export of ALL of the
changes for that respective day (e.g. last weeks Tuesday).
Consequently we receive a correction file. Same filename but "_1"
appended. All filenames include the date (of the original export).
What are the options to handle that case (besides telling the DB
admins not to, which we did already). Regular checkpoints and
re-process all files since then?  What happens to the CEP state? Will
it be checkpointed as well?

-> If you require re-processing, then I would say that your best
option is what you described. The other option would be to keep
everything in Flink state until you are sure that no more corrections
will come. In this case, you have to somehow issue the "correction" in
a way that the downstream system can understand what to correct and
how. Keep in mind that this may be an expensive operation because
everything has to be kept in state for longer.

4) Our CEP rules span upto 180 days (resp. 6 months). Is that a problem?

-> The only thing to consider is the size of your state. Time is not
necessarily an issue. If your state for these 180 days is a couple of
MBs, then you have no problem. If it increases fast, then you have to
provision your cluster accordingly.

5) We also have CEP rules that must fire if after a start sequence
matched, the remaining sequence did NOT within a configured window.
E.g. If A, then B, but C did not occur within 30 days since A. Is that
supported by FlinkCEP? I couldn't find a working example.

-> You can have a look at [1] for the supported pattern combinations
and you can also look at [2] for some tests of different pattern
combinations.

6) We expect 30-40 CEP rules. How can we estimate the required storage
size for the temporary CEP state? Is there some sort of formular
considering number of rules, number of records per file or day, record
size, window, number of records matched per sequence, number of keyBy
grouping keys, ...

-> In FlinkCEP, each pattern becomes a single operator. This means
that you will have 30-40 operators in your job graph, each with each
own state. This can become heavy but once again it depends on your
workload. I cannot give an estimate because in CEP, in order to
guarantee correct ordering of events in an unordered stream, the
library sometimes has to keep also in state more records than will be
presented at the end.

Have you considered going with a solution based on processfunction and
broadcast state? This will also allow you to have a more dynamic
set-up where patterns can be added at runtime and it will allow you to
do any optimizations specific to your workload ;) For a discussion on
this, check [3]. In addition, it will allow you to "multiplex" many
patterns into a single operator thus potentially minimizing the amount
of copies of the state you keep.

7) I can imagine that for debugging reasons it'd be good if we were
able to query the temporary CEP state. What is the (CEP) schema used
to persist the CEP state and how can we query it? And does such query
work on the whole cluster or only per node (e.g. because of shuffle
and nodes responsible only for a portion of the events).

-> Unfortunatelly the state in CEP is not queryable, thus I am not
sure if you can inspect it at runtime.

8) I understand state is stored per node. What happens if I want to
add or remove a nodes. Will the state still be found, despite it being
stored in another node? I read that I need to be equally careful when
changing rules? Or is that a different issue?

-> Rescaling a Flink job is not done automatically. You need to take a
savepoint and then relaunch your job with a different parallelism.
Updating a rule is not supported in CEP, as changing a rule would
imply that (potentially) the state should change. But what you could
do is take a savepoint, remove the old pattern and add a new one (the
updated one) and tell Flink to ignore the state of the previous
operator (as said earlier each CEP pattern is translated to an
operator).

9) How does garbage collection of temp CEP state work, or will it stay
forever?  For tracing/investigation reasons I can imagine that purging
it at the earliest possible time is not always the best option. May be
after 30 days later or so.

-> CEP clean state after the time horizon (specified with the
.within() clause) expires.

10) Are there strategies to minimize temp CEP state? In SQL queries
you  filter first on the "smallest" attributes. CEP rules form a
sequence. Hence that approach will not work. Is that an issue at all?
What are practical limits on the CEP temp state storage engine?

-> Such optimizations are not supported out of the box. I would
recommend to go with the Broadcast state approach in [3].

11) Occassionally we need to process about 200 files at once. Can I
speed things up by processing all files in parallel on multiple nodes,
despite their sequence (CEP use case)? This would only work if
FlinkCEP in step 1 simply filters on all relevant events of a
sequence, updates state, and in a step 2 - after the files are
processed - evaluates the updated state if that meets the sequences.

12) Schema changes in the input files: Occassionly the DB source
system schema is changed, and not always in a backwards compatible way
(insert new fields in the middle), and also the export will have the
field in the middle. This means that starting from a specific (file)
date, I need to consider a different schema. This must also be handled
when re-running files for the last month, because of corrections
provided. And if the file format has changed someone in the middle ...

-> This seems to be relevant for the "data cleaning" phase, before you
send your data to CEP. In this case, if the schema changes, then I
assume that you need to update your initial parsing logic, which means
taking a savepoint and redeploying the updated jobGraph with the new
input parsing logic (if I understand correctly).

thanks a lot for your time and your help

I hope that above helps!

Cheers,
Kostas

[1] https://ci.apache.org/projects/flink/flink-docs-stable/dev/libs/cep.html#combining-patterns
[2] https://github.com/apache/flink/blob/master/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/pattern/PatternTest.java
[3] https://flink.apache.org/news/2020/01/15/demo-fraud-detection.html

On Mon, Feb 10, 2020 at 6:35 PM Juergen Donnerstag
<ju...@gmail.com> wrote:
>
> Hi,
>
> we're in very early stages evaluating options. I'm not a Flink expert, but did read some of the docs and watched videos. Could you please help me understand if and how certain of our reqs are covered by Flink (CEP). Is this mailing list the right channel for such questions?
>
> 1) We receive files every day, which are exports from some database tables, containing ONLY changes from the day. Most tables have modify-cols. Even though they are files but because they contain changes only, I belief the file records shall be considered events in Flink terminology. Is that assumption correct?
>
> 2) The records within the DB export files are NOT in chronologically, and we can not change the export. Our use case is a "complex event processing" case (FlinkCEP) with rules like "KeyBy(someKey) If first A, then B, then C within 30 days, then do something". Does that work with FlinkCEP despite the events/records are not in chrono order within the file? The files are 100MB to 20GB in size. Do I need to sort the files first before CEP processing?
>
> 3) Occassionally some crazy people manually "correct" DB records within the database and manually trigger a re-export of ALL of the changes for that respective day (e.g. last weeks Tuesday). Consequently we receive a correction file. Same filename but "_1" appended. All filenames include the date (of the original export). What are the options to handle that case (besides telling the DB admins not to, which we did already). Regular checkpoints and re-process all files since then?  What happens to the CEP state? Will it be checkpointed as well?
>
> 4) Our CEP rules span upto 180 days (resp. 6 months). Is that a problem?
>
> 5) We also have CEP rules that must fire if after a start sequence matched, the remaining sequence did NOT within a configured window. E.g. If A, then B, but C did not occur within 30 days since A. Is that supported by FlinkCEP? I couldn't find a working example.
>
> 6) We expect 30-40 CEP rules. How can we estimate the required storage size for the temporary CEP state? Is there some sort of formular considering number of rules, number of records per file or day, record size, window, number of records matched per sequence, number of keyBy grouping keys, ...
>
> 7) I can imagine that for debugging reasons it'd be good if we were able to query the temporary CEP state. What is the (CEP) schema used to persist the CEP state and how can we query it? And does such query work on the whole cluster or only per node (e.g. because of shuffle and nodes responsible only for a portion of the events).
>
> 8) I understand state is stored per node. What happens if I want to add or remove a nodes. Will the state still be found, despite it being stored in another node? I read that I need to be equally careful when changing rules? Or is that a different issue?
>
> 9) How does garbage collection of temp CEP state work, or will it stay forever?  For tracing/investigation reasons I can imagine that purging it at the earliest possible time is not always the best option. May be after 30 days later or so.
>
> 10) Are there strategies to minimize temp CEP state? In SQL queries you  filter first on the "smallest" attributes. CEP rules form a sequence. Hence that approach will not work. Is that an issue at all? What are practical limits on the CEP temp state storage engine?
>
> 11) Occassionally we need to process about 200 files at once. Can I speed things up by processing all files in parallel on multiple nodes, despite their sequence (CEP use case)? This would only work if FlinkCEP in step 1 simply filters on all relevant events of a sequence, updates state, and in a step 2 - after the files are processed - evaluates the updated state if that meets the sequences.
>
> 12) Schema changes in the input files: Occassionly the DB source system schema is changed, and not always in a backwards compatible way (insert new fields in the middle), and also the export will have the field in the middle. This means that starting from a specific (file) date, I need to consider a different schema. This must also be handled when re-running files for the last month, because of corrections provided. And if the file format has changed someone in the middle ...
>
> thanks a lot for your time and your help
> Juergen