Posted to user@flink.apache.org by Mikael Gordani <mi...@gmail.com> on 2020/03/12 11:53:15 UTC

Communication between two queries

Hello everyone!

So essentially, I have two identical queries (q1 and q2) running in parallel
(streams).
I'm trying to activate the ingestion of data into q2 based on what is
processed in q1.
E.g. say that we want to start ingesting data into q2 when a tuple with
timestamp > 5000 appears in q1.

The queries are constructed in this way (they share the same source):
q1: Source -> Filter -> Aggregate -> Filter -> Sink
       |
       v
q2:  Filter -> Filter -> Aggregate -> Filter -> Sink

The initial idea was to have a global variable shared between the
two queries. When this tuple appears in q1, the first Filter operator sets
the variable to true. In q2, the first Filter operator passes tuples
depending on the value of the global variable:
when the variable is true, it lets data pass; when it is false, no data
is allowed to be ingested.
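
For illustration, a minimal single-JVM sketch of this shared-flag idea (plain Java with hypothetical names, not Flink API; the trigger condition timestamp > 5000 is taken from the example above):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;

public class SharedFlagRouting {
    // Shared flag: this only works while both filters live in one JVM.
    static final AtomicBoolean q2Active = new AtomicBoolean(false);

    // Route each timestamped record to q1 or q2; the trigger tuple
    // (timestamp > 5000) flips the flag and activates q2.
    static List<List<Long>> route(long[] timestamps) {
        q2Active.set(false);
        List<Long> toQ1 = new ArrayList<>();
        List<Long> toQ2 = new ArrayList<>();
        for (long ts : timestamps) {
            if (ts > 5000) {
                q2Active.set(true);   // q1's logic observed the trigger
            }
            (q2Active.get() ? toQ2 : toQ1).add(ts);
        }
        return List.of(toQ1, toQ2);
    }

    public static void main(String[] args) {
        // -> [[1000, 3000], [6000, 7000]]
        System.out.println(route(new long[] {1000, 3000, 6000, 7000}));
    }
}
```

In a distributed deployment each task instance would see its own copy of the flag, which is exactly the problem described next.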

This works fine when you have all the tasks on the same machine, but of
course it becomes troublesome in distributed deployments (tasks on
different nodes and such).

My second approach was to create some sort of "loop" in the query. So let's
say that we have the processing logic placed in the last Filter operator
in q1; when this "special" tuple appears, it can communicate with the
first Filter operator in q2 in order to allow data to be ingested.
I've tried playing around with IterativeStreams, but I can't really get it
to work, and I feel like it's the wrong approach..

How can I achieve this sort of functionality?
I'm looking a bit at the BroadcastState part of the DataStream API, but I'm
confused about how to use it. Is it possible to broadcast from a
downstream operator to an upstream one?
Suggestions would be much appreciated!

Best Regards,
Mikael Gordani

Re: Communication between two queries

Posted by Mikael Gordani <mi...@gmail.com>.
No worries and great idea!
I will play around with it and see what I manage to do.
Cheers!

On Tue 17 Mar 2020 at 15:59, Piotr Nowojski <pi...@ververica.com> wrote:

> Oops, sorry, there was a misleading typo/autocorrection in my previous
> e-mail. The second sentence should have been:
>
> > First of all you would have to use event time semantic for consistent
> results
>
> Piotrek
>
> On 17 Mar 2020, at 14:43, Piotr Nowojski <pi...@ververica.com> wrote:
>
> Hi,
>
> Yes, you are looking in the right directions with the watermarks.
>
> First of all you would have to use event time semantics for consistent
> results. With processing time everything would be simpler, but it would be
> more difficult to reason about the results (your choice). Secondly, you
> would have to hook up the logic of enabling query1/query2 to the event
> time/watermarks. Thirdly, you need to somehow sync the input switching
> with the window boundaries. On top of that, watermarks express the lower
> bound of event time that you can expect. However, in order to guarantee
> consistency of the windows, you would like to control the upper bound. For
> example:
>
> 1. If you want to enable Query2, you would need to check what's the
> largest/latest event time that was processed by the input splitter; let's
> say that's TS1.
> 2. That means records with event time < TS1 have already been processed
> by Query1, starting some windows.
> 3. The earliest point at which you could enable Query2 is thus TS1 + 1.
> 4. You would have to adjust Query2's start time to the start of the next
> time window; let's say TS2 = the first window start >= TS1 + 1.
> 5. The input splitter now must keep sending records with event time < TS2
> to Query1, but should already redirect records with event time >= TS2 to
> Query2.
> 6. Once the watermark of the input splitter advances past TS2, it can
> finally stop sending records to Query1, and Query1's logic can be
> considered "completed".
>
> So Query1 would be responsible for all of the data before TS2, and Query2
> after TS2.
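
Steps 3-5 above can be sketched as a small helper that rounds TS1 + 1 up to the next window boundary (plain Java; tumbling windows and the method names are assumptions for illustration):

```java
public class SwitchOver {
    // Steps 3/4: the earliest consistent switch point is TS1 + 1,
    // aligned up to the start of the next tumbling window (TS2).
    static long switchTimestamp(long ts1, long windowSizeMs) {
        long earliest = ts1 + 1;
        long remainder = earliest % windowSizeMs;
        return remainder == 0 ? earliest : earliest + (windowSizeMs - remainder);
    }

    // Step 5: the input splitter's routing rule.
    static boolean routeToQuery2(long eventTime, long ts2) {
        return eventTime >= ts2;
    }

    public static void main(String[] args) {
        long ts2 = switchTimestamp(5000, 1000);              // -> 6000
        System.out.println(ts2);
        System.out.println(routeToQuery2(5999, ts2));        // -> false (to Query1)
        System.out.println(routeToQuery2(6000, ts2));        // -> true  (to Query2)
    }
}
```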
>
> Alternatively, your input splitter could also buffer some records, so that
> you could enable Query2 faster, by re-sending the buffered records. But in
> that case, both Query1 and Query2 would be responsible for some portion of
> the data.
>
> Piotrek
>
> On 17 Mar 2020, at 10:35, Mikael Gordani <mi...@gmail.com> wrote:
>
> Hi Piotr!
>
> Continuing with my scenario: since both of the queries will share the same
> sink, I've realized that some issues will appear when I switch queries,
> especially with regards to stateful operators, e.g. aggregation.
>
> Let me provide an example:
> Say that both of the queries ingest a sequence of integers and compute
> the average of these integers over some time window.
> E.g. say that query1 ingests the sequence 1,2,3,4...
> The windows for query1 will be [1,2,3] [2,3,4] [3,4].
>
> If I later on "activate" query2, I need to have both of the queries
> accepting tuples for a while, in order to allow the aggregation in
> query1 to finish before denying it input.
> But there is a possibility that query2 might receive the tuples 3,4,
> which will result in the windows [3] [3,4] [3,4].
> Later on, the output of the respective queries will be:
> Query1: 3, 4.5, 3.5
> Query2: 3, 3.5, 3.5
>
> As one can see, the second output is different.
> I'm thinking of using watermarks somehow to make sure that both queries
> have processed the same amount of data before writing to the sink, but I'm
> a bit unsure how to do it.
> Do you have any suggestions or thoughts?
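
One possible gate along those lines (a plain-Java sketch; the name and shape are assumptions): hold a result back from the shared sink until the minimum of the two queries' watermarks has passed its timestamp, i.e. until both queries are known to have processed that data.

```java
public class SinkGate {
    // Emit a record to the shared sink only once both queries' watermarks
    // have advanced past the record's timestamp.
    static boolean safeToEmit(long recordTs, long wmQuery1, long wmQuery2) {
        return Math.min(wmQuery1, wmQuery2) >= recordTs;
    }

    public static void main(String[] args) {
        System.out.println(safeToEmit(5000, 7000, 4000)); // -> false: query2 lags
        System.out.println(safeToEmit(5000, 7000, 6000)); // -> true: both past 5000
    }
}
```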
> Cheers,
>
> On Mon 16 Mar 2020 at 08:43, Piotr Nowojski <pi...@ververica.com> wrote:
>
>> Hi,
>>
>> Let us know if something doesn’t work :)
>>
>> Piotrek
>>
>> On 16 Mar 2020, at 08:42, Mikael Gordani <mi...@gmail.com> wrote:
>>
>> Hi,
>> I'll try it out =)
>>
>> Cheers!
>>
>> On Mon 16 Mar 2020 at 08:32, Piotr Nowojski <pi...@ververica.com> wrote:
>>
>>> Hi,
>>>
>>> In that case you could try to implement your `FilterFunction` as a
>>> two-input operator with a broadcast control input that would be setting
>>> the `global_var`. The broadcast control input can originate from some
>>> source, or from some operator.
>>>
>>> Piotrek
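
The two-input pattern suggested here can be sketched framework-agnostically (plain Java, hypothetical names; in Flink this role would be played by e.g. a BroadcastProcessFunction):

```java
public class ControlledFilter {
    // Per-instance flag, replacing the shared global_var: every parallel
    // instance receives the same broadcast control messages, so the flag
    // stays consistent without shared memory.
    private boolean active = false;

    // Control input (broadcast to all parallel instances).
    public void onControl(boolean enable) {
        this.active = enable;
    }

    // Data input: pass records only while the filter is enabled.
    public boolean filter(long tuple) {
        return active;
    }

    public static void main(String[] args) {
        ControlledFilter f = new ControlledFilter();
        System.out.println(f.filter(1)); // -> false: no control message yet
        f.onControl(true);               // broadcast control message arrives
        System.out.println(f.filter(1)); // -> true: records now pass
    }
}
```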
>>>
>>> On 13 Mar 2020, at 15:47, Mikael Gordani <mi...@gmail.com> wrote:
>>>
>>> Hi Piotr!
>>> Thanks for your response, I'll try to explain what I'm trying to achieve
>>> in more detail:
>>>
>>> Essentially, I have two queries which have the same operators and
>>> run in the same task, and I would like to figure out some way of
>>> controlling the ingestion from a source to the respective queries in
>>> such a way that only one of the queries receives data, based on a
>>> condition.
>>> For more context, the second query (query2) is equipped with
>>> instrumented operators, which are standard operators extended with some
>>> extra functionality; in my case, they enrich the tuples with meta-data.
>>>
>>> Source --> *Filter1* ---> rest of query1
>>>    |
>>>    v
>>>    *Filter2* ---> rest of query2
>>>
>>> By using filters prior to the queries, they allow records to pass
>>> depending on a condition, let's say a global boolean variable (which
>>> is initially set to false).
>>> While it's false, Filter1 will accept every record and Filter2 will
>>> disregard every record.
>>> Once it's set to true, Filter2 will accept every record and Filter1
>>> will disregard every record.
>>>
>>> So the filter operators look something like this:
>>>
>>> // Shared flag; this only works while all tasks run in one JVM.
>>> static boolean global_var = false;
>>>
>>> private static class Filter1 implements FilterFunction<Tuple> {
>>>     @Override
>>>     public boolean filter(Tuple t) throws Exception {
>>>         return !global_var;
>>>     }
>>> }
>>>
>>> private static class Filter2 implements FilterFunction<Tuple> {
>>>     @Override
>>>     public boolean filter(Tuple t) throws Exception {
>>>         return global_var;
>>>     }
>>> }
>>>
>>>
>>> Then later on, in the respective queries, there is some processing
>>> logic which changes the value of the global variable, thus enabling and
>>> disabling the flow of data from the source to the respective queries.
>>> The problem is that this global variable is problematic in distributed
>>> deployments, and I'm having a hard time figuring out how to solve that.
>>> Is it a bit more clear? =)
>>>
>>>
>>>
>>
>> --
>> Best regards,
>> Mikael Gordani
>>
>>
>>
>
> --
> Best regards,
> Mikael Gordani
>
>
>
>

-- 
Best regards,
Mikael Gordani

Re: Communication between two queries

Posted by Piotr Nowojski <pi...@ververica.com>.
Ops, sorry there was a misleading typo/auto correction in my previous e-mail. Second sentence should have been:

> First of all you would have to use event time semantic for consistent results

Piotrek

> On 17 Mar 2020, at 14:43, Piotr Nowojski <pi...@ververica.com> wrote:
> 
> Hi,
> 
> Yes, you are looking in the right directions with the watermarks. 
> 
> First of all you would have to use event time semantic for constant results. With processing time everything would be simpler, but it would be more difficult to reason about the results (your choice). Secondly, you would have to hook up the logic of enabling query1/query2 to the event time/watermarks. Thirdly, you need to somehow to sync the input switching with the windows boundaries. On top of that, watermarks express lower bound of even time that you can expect. However, in order to guarantee consistency of the windows, you would like to control the upper bound. For example:
> 
> 1. If you want to enable Query2, you would need to check what’s the largest/latest event time that was processed by the input splitter, lets say that’s TS1 
> 2. That means, records with event time < TS1 have already been processed by Query1, starting some windows
> 3. The earliest point for which you could enable Query2, is thus TS1 + 1.
> 4. You would have to adjust Query2 start time, by start of the next time window, let’s say that would be TS2 = TS1 + 1 + start of next window
> 5. Input splitter now must keep sending records with event time < TS2 to Query1, but already should redirect records with event time >= TS2 to Query2.
> 6. Once watermark for the input splitter advances past TS2, that’s when it can finally stop sending records to Query1 and query1 logic could be considered “completed”.  
> 
> So Query1 would be responsible for all of the data before TS2, and Query2 after TS2.
> 
> Alternatively, your input splitter could also buffer some records, so that you could enable Query2 faster, by re-sending the buffered records. But in that case, both Query1 and Query2 would be responsible for some portion of the data.
> 
> Piotrek
> 
>> On 17 Mar 2020, at 10:35, Mikael Gordani <mi.gordani@gmail.com <ma...@gmail.com>> wrote:
>> 
>> Hi Piotr!
>> 
>> Continuing with my scenario, since both of the queries will share the same sink, I've realized that some issues will appear when I switch queries. Especially with regards to stateful operators, e.g aggregation.
>> 
>> Let me provide an example:
>> So, let say that both of the queries ingest a sequence of integers, and it will perform the average of these integers over some time.
>> E.g say that query1 ingest the sequence 1,2,3,4.... 
>> The windows for query1 will be [1,2,3] [2,3,4] [3,4]. 
>> 
>> If I'm later on "activating" query2, I need to have both of the queries allowing tuples for a while, in order to allow the aggregation to finish in query1 before denying it input.
>> But, there is a possibility that query2 might receive the tuples 3,4, which will result in the window: [3][3,4][3,4]
>> Later on, the output of the respective queries will be:
>> Query 1: 3, 4.5, 3.5
>> Query2 : 3, 3.5, 3.5
>> 
>> As one can see, the second output will be different. 
>> I'm thinking of using watermarks somehow to make sure that both queries has processed the same amount of data before writing to the sink, but I'm a bit unsure on how to do it.
>> Do you have any suggestions or thoughts?
>> Cheers,
>> 
>> Den mån 16 mars 2020 kl 08:43 skrev Piotr Nowojski <piotr@ververica.com <ma...@ververica.com>>:
>> Hi,
>> 
>> Let us know if something doesn’t work :)
>> 
>> Piotrek
>> 
>>> On 16 Mar 2020, at 08:42, Mikael Gordani <mi.gordani@gmail.com <ma...@gmail.com>> wrote:
>>> 
>>> Hi,
>>> I'll try it out =) 
>>> 
>>> Cheers!
>>> 
>>> Den mån 16 mars 2020 kl 08:32 skrev Piotr Nowojski <piotr@ververica.com <ma...@ververica.com>>:
>>> Hi,
>>> 
>>> In that case you could try to implement your `FilterFunction` as two input operator, with broadcast control input, that would be setting the `global_var`. Broadcast control input can be originating from some source, or from some operator.
>>> 
>>> Piotrek
>>> 
>>>> On 13 Mar 2020, at 15:47, Mikael Gordani <mi.gordani@gmail.com <ma...@gmail.com>> wrote:
>>>> 
>>>> Hi Piotr!
>>>> Thanks for your response, I'll try to explain what I'm trying to achieve in more detail:
>>>> 
>>>> Essentially, If I've two queries, in which has the same operators and runs in the same task, I would want to figure out some way of controlling the ingestion from a source to the respective queries in such a way that only one of the queries receive data, based on a condition. 
>>>> For more context, the second query (query2), is equipped with instrumented operators, which are standard operators extended with some extra functionality, in my case, they enrich the tuples with meta-data.
>>>> 
>>>> Source --> Filter1 ---> rest of query1
>>>>    |
>>>>    v
>>>>    Filter2 ---> rest of query2
>>>> 
>>>> By using filters prior to the queries, they allow records to pass depending on a condition, let's say a global boolean variable (which is initially set to false).
>>>> If it's set to true, Filter1 will accept every record and Filter2 will disregard every record.
>>>> If it's set to false, Filter2 will accept every record and Filter1 will disregard every record.
>>>> So the filter operators looks something like this: 
>>>> boolean global_var = false;
>>>> 
>>>> private static class filter1 implements FilterFunction<Tuple t> {
>>>>     @Override
>>>>     public boolean filter(Tuple t) throws Exception {
>>>>         return !global_var;
>>>>     }
>>>> }
>>>> 
>>>> private static class filter2 implements FilterFunction<Tuple t> {
>>>>     @Override
>>>>     public boolean filter(Tuple t) throws Exception {
>>>>         return global_var;
>>>>     }
>>>> }
>>>> 
>>>> Then later on, in the respective queries, there are some processing logic in which changes the value of the global variable, thus enabling and disabling the flow of data from the source to the respective queries.
>>>> The problem lies in this global variable being problematic in distributed deployments, in which I'm having a hard time figuring out how to solve.
>>>> Is it a bit more clear? =)
>>> 
>>> 
>>> 
>>> -- 
>>> Med Vänliga Hälsningar,
>>> Mikael Gordani
>> 
>> 
>> 
>> -- 
>> Med Vänliga Hälsningar,
>> Mikael Gordani
> 


Re: Communication between two queries

Posted by Piotr Nowojski <pi...@ververica.com>.
Hi,

Yes, you are looking in the right directions with the watermarks. 

First of all you would have to use event time semantic for constant results. With processing time everything would be simpler, but it would be more difficult to reason about the results (your choice). Secondly, you would have to hook up the logic of enabling query1/query2 to the event time/watermarks. Thirdly, you need to somehow to sync the input switching with the windows boundaries. On top of that, watermarks express lower bound of even time that you can expect. However, in order to guarantee consistency of the windows, you would like to control the upper bound. For example:

1. If you want to enable Query2, you would need to check what’s the largest/latest event time that was processed by the input splitter, lets say that’s TS1 
2. That means, records with event time < TS1 have already been processed by Query1, starting some windows
3. The earliest point for which you could enable Query2, is thus TS1 + 1.
4. You would have to adjust Query2 start time, by start of the next time window, let’s say that would be TS2 = TS1 + 1 + start of next window
5. Input splitter now must keep sending records with event time < TS2 to Query1, but already should redirect records with event time >= TS2 to Query2.
6. Once watermark for the input splitter advances past TS2, that’s when it can finally stop sending records to Query1 and query1 logic could be considered “completed”.  

So Query1 would be responsible for all of the data before TS2, and Query2 after TS2.

Alternatively, your input splitter could also buffer some records, so that you could enable Query2 faster, by re-sending the buffered records. But in that case, both Query1 and Query2 would be responsible for some portion of the data.

Piotrek

> On 17 Mar 2020, at 10:35, Mikael Gordani <mi...@gmail.com> wrote:
> 
> Hi Piotr!
> 
> Continuing with my scenario, since both of the queries will share the same sink, I've realized that some issues will appear when I switch queries. Especially with regards to stateful operators, e.g aggregation.
> 
> Let me provide an example:
> So, let say that both of the queries ingest a sequence of integers, and it will perform the average of these integers over some time.
> E.g say that query1 ingest the sequence 1,2,3,4.... 
> The windows for query1 will be [1,2,3] [2,3,4] [3,4]. 
> 
> If I'm later on "activating" query2, I need to have both of the queries allowing tuples for a while, in order to allow the aggregation to finish in query1 before denying it input.
> But, there is a possibility that query2 might receive the tuples 3,4, which will result in the window: [3][3,4][3,4]
> Later on, the output of the respective queries will be:
> Query 1: 3, 4.5, 3.5
> Query2 : 3, 3.5, 3.5
> 
> As one can see, the second output will be different. 
> I'm thinking of using watermarks somehow to make sure that both queries has processed the same amount of data before writing to the sink, but I'm a bit unsure on how to do it.
> Do you have any suggestions or thoughts?
> Cheers,
> 
> Den mån 16 mars 2020 kl 08:43 skrev Piotr Nowojski <piotr@ververica.com <ma...@ververica.com>>:
> Hi,
> 
> Let us know if something doesn’t work :)
> 
> Piotrek
> 
>> On 16 Mar 2020, at 08:42, Mikael Gordani <mi.gordani@gmail.com <ma...@gmail.com>> wrote:
>> 
>> Hi,
>> I'll try it out =) 
>> 
>> Cheers!
>> 
>> Den mån 16 mars 2020 kl 08:32 skrev Piotr Nowojski <piotr@ververica.com <ma...@ververica.com>>:
>> Hi,
>> 
>> In that case you could try to implement your `FilterFunction` as two input operator, with broadcast control input, that would be setting the `global_var`. Broadcast control input can be originating from some source, or from some operator.
>> 
>> Piotrek
>> 
>>> On 13 Mar 2020, at 15:47, Mikael Gordani <mi.gordani@gmail.com <ma...@gmail.com>> wrote:
>>> 
>>> Hi Piotr!
>>> Thanks for your response, I'll try to explain what I'm trying to achieve in more detail:
>>> 
>>> Essentially, If I've two queries, in which has the same operators and runs in the same task, I would want to figure out some way of controlling the ingestion from a source to the respective queries in such a way that only one of the queries receive data, based on a condition. 
>>> For more context, the second query (query2), is equipped with instrumented operators, which are standard operators extended with some extra functionality, in my case, they enrich the tuples with meta-data.
>>> 
>>> Source --> Filter1 ---> rest of query1
>>>    |
>>>    v
>>>    Filter2 ---> rest of query2
>>> 
>>> By using filters prior to the queries, they allow records to pass depending on a condition, let's say a global boolean variable (which is initially set to false).
>>> If it's set to true, Filter1 will accept every record and Filter2 will disregard every record.
>>> If it's set to false, Filter2 will accept every record and Filter1 will disregard every record.
>>> So the filter operators looks something like this: 
>>> boolean global_var = false;
>>> 
>>> private static class filter1 implements FilterFunction<Tuple t> {
>>>     @Override
>>>     public boolean filter(Tuple t) throws Exception {
>>>         return !global_var;
>>>     }
>>> }
>>> 
>>> private static class filter2 implements FilterFunction<Tuple t> {
>>>     @Override
>>>     public boolean filter(Tuple t) throws Exception {
>>>         return global_var;
>>>     }
>>> }
>>> 
>>> Then later on, in the respective queries, there are some processing logic in which changes the value of the global variable, thus enabling and disabling the flow of data from the source to the respective queries.
>>> The problem lies in this global variable being problematic in distributed deployments, in which I'm having a hard time figuring out how to solve.
>>> Is it a bit more clear? =)
>> 
>> 
>> 
>> -- 
>> Med Vänliga Hälsningar,
>> Mikael Gordani
> 
> 
> 
> -- 
> Med Vänliga Hälsningar,
> Mikael Gordani


Re: Communication between two queries

Posted by Mikael Gordani <mi...@gmail.com>.
Hi Piotr!

Continuing with my scenario, since both of the queries will share the same
sink, I've realized that some issues will appear when I switch queries.
Especially with regards to stateful operators, e.g aggregation.

Let me provide an example:
So, let say that both of the queries ingest a sequence of integers, and it
will perform the average of these integers over some time.
E.g say that *query1* ingest the sequence *1,2,3,4.... *
The windows for *query1* will be *[1,2,3] [2,3,4] [3,4]*.

If I'm later on "activating" *query2*, I need to have both of the queries
allowing tuples for a while, in order to allow the aggregation to finish in
*query1* before denying it input.
But, there is a possibility that *query2* might receive the tuples *3,4*,
which will result in the window: *[3][3,4][3,4]*
Later on, the output of the respective queries will be:
Query 1: 3, *4.5*, 3.5
Query2 : 3, *3.5*, 3.5

As one can see, the second output will be different.
I'm thinking of using watermarks somehow to make sure that both queries has
processed the same amount of data before writing to the sink, but I'm a bit
unsure on how to do it.
Do you have any suggestions or thoughts?
Cheers,

Den mån 16 mars 2020 kl 08:43 skrev Piotr Nowojski <pi...@ververica.com>:

> Hi,
>
> Let us know if something doesn’t work :)
>
> Piotrek
>
> On 16 Mar 2020, at 08:42, Mikael Gordani <mi...@gmail.com> wrote:
>
> Hi,
> I'll try it out =)
>
> Cheers!
>
> Den mån 16 mars 2020 kl 08:32 skrev Piotr Nowojski <pi...@ververica.com>:
>
>> Hi,
>>
>> In that case you could try to implement your `FilterFunction` as two
>> input operator, with broadcast control input, that would be setting the
>> `global_var`. Broadcast control input can be originating from some source,
>> or from some operator.
>>
>> Piotrek
>>
>> On 13 Mar 2020, at 15:47, Mikael Gordani <mi...@gmail.com> wrote:
>>
>> Hi Piotr!
>> Thanks for your response, I'll try to explain what I'm trying to achieve
>> in more detail:
>>
>> Essentially, If I've two queries, in which has the same operators and
>> runs in the same task, I would want to figure out some way of controlling
>> the ingestion from *a source* to the respective queries in such a way
>> that only one of the queries receive data, based on a condition.
>> For more context, the second query (query2), is equipped with
>> instrumented operators, which are standard operators extended with some
>> extra functionality, in my case, they enrich the tuples with meta-data.
>>
>> Source --> *Filter1* ---> rest of query1
>>    |
>>    v
>>    *Filter2* ---> rest of query2
>>
>> By using *filters* prior to the queries, they allow records to pass
>> depending on a condition*, *let's say a global boolean variable (which
>> is initially set to false).
>> If it's set to *true, Filter1 will accept every record and Filter2 will
>> disregard every record.*
>> If it's set to
>> *false, Filter2 will accept every record and Filter1 will disregard every
>> record.*
>>
>> *So the filter operators looks something like this: *
>>
>> boolean global_var = false;
>>
>> private static class filter1 implements FilterFunction<Tuple t> {
>>     @Override
>>     public boolean filter(Tuple t) throws Exception {
>>         return !global_var;
>>     }
>> }
>>
>> private static class filter2 implements FilterFunction<Tuple t> {
>>     @Override
>>     public boolean filter(Tuple t) throws Exception {
>>         return global_var;
>>     }
>> }
>>
>>
>> Then later on, in the respective queries, there are some processing logic
>> in which changes the value of the global variable, thus enabling and
>> disabling the flow of data from the source to the respective queries.
>> The problem lies in this global variable being problematic in distributed
>> deployments, in which I'm having a hard time figuring out how to solve.
>> Is it a bit more clear? =)
>>
>>
>>
>
> --
> Med Vänliga Hälsningar,
> Mikael Gordani
>
>
>

-- 
Med Vänliga Hälsningar,
Mikael Gordani

Re: Communication between two queries

Posted by Piotr Nowojski <pi...@ververica.com>.
Hi,

Let us know if something doesn’t work :)

Piotrek

> On 16 Mar 2020, at 08:42, Mikael Gordani <mi...@gmail.com> wrote:
> 
> Hi,
> I'll try it out =) 
> 
> Cheers!
> 
> Den mån 16 mars 2020 kl 08:32 skrev Piotr Nowojski <piotr@ververica.com <ma...@ververica.com>>:
> Hi,
> 
> In that case you could try to implement your `FilterFunction` as two input operator, with broadcast control input, that would be setting the `global_var`. Broadcast control input can be originating from some source, or from some operator.
> 
> Piotrek
> 
>> On 13 Mar 2020, at 15:47, Mikael Gordani <mi.gordani@gmail.com <ma...@gmail.com>> wrote:
>> 
>> Hi Piotr!
>> Thanks for your response, I'll try to explain what I'm trying to achieve in more detail:
>> 
>> Essentially, If I've two queries, in which has the same operators and runs in the same task, I would want to figure out some way of controlling the ingestion from a source to the respective queries in such a way that only one of the queries receive data, based on a condition. 
>> For more context, the second query (query2), is equipped with instrumented operators, which are standard operators extended with some extra functionality, in my case, they enrich the tuples with meta-data.
>> 
>> Source --> Filter1 ---> rest of query1
>>    |
>>    v
>>    Filter2 ---> rest of query2
>> 
>> By using filters prior to the queries, they allow records to pass depending on a condition, let's say a global boolean variable (which is initially set to false).
>> If it's set to true, Filter1 will accept every record and Filter2 will disregard every record.
>> If it's set to false, Filter2 will accept every record and Filter1 will disregard every record.
>> So the filter operators looks something like this: 
>> boolean global_var = false;
>> 
>> private static class filter1 implements FilterFunction<Tuple t> {
>>     @Override
>>     public boolean filter(Tuple t) throws Exception {
>>         return !global_var;
>>     }
>> }
>> 
>> private static class filter2 implements FilterFunction<Tuple t> {
>>     @Override
>>     public boolean filter(Tuple t) throws Exception {
>>         return global_var;
>>     }
>> }
>> 
>> Then later on, in the respective queries, there are some processing logic in which changes the value of the global variable, thus enabling and disabling the flow of data from the source to the respective queries.
>> The problem lies in this global variable being problematic in distributed deployments, in which I'm having a hard time figuring out how to solve.
>> Is it a bit more clear? =)
> 
> 
> 
> -- 
> Med Vänliga Hälsningar,
> Mikael Gordani


Re: Communication between two queries

Posted by Mikael Gordani <mi...@gmail.com>.
Hi,
I'll try it out =)

Cheers!

Den mån 16 mars 2020 kl 08:32 skrev Piotr Nowojski <pi...@ververica.com>:

> Hi,
>
> In that case you could try to implement your `FilterFunction` as two input
> operator, with broadcast control input, that would be setting the
> `global_var`. Broadcast control input can be originating from some source,
> or from some operator.
>
> Piotrek
>
> On 13 Mar 2020, at 15:47, Mikael Gordani <mi...@gmail.com> wrote:
>
> Hi Piotr!
> Thanks for your response, I'll try to explain what I'm trying to achieve
> in more detail:
>
> Essentially, If I've two queries, in which has the same operators and runs
> in the same task, I would want to figure out some way of controlling the
> ingestion from *a source* to the respective queries in such a way that
> only one of the queries receive data, based on a condition.
> For more context, the second query (query2), is equipped with instrumented
> operators, which are standard operators extended with some extra
> functionality, in my case, they enrich the tuples with meta-data.
>
> Source --> *Filter1* ---> rest of query1
>    |
>    v
>    *Filter2* ---> rest of query2
>
> By using *filters* prior to the queries, they allow records to pass
> depending on a condition*, *let's say a global boolean variable (which is
> initially set to false).
> If it's set to *true, Filter1 will accept every record and Filter2 will
> disregard every record.*
> If it's set to
> *false, Filter2 will accept every record and Filter1 will disregard every
> record.*
>
> *So the filter operators looks something like this: *
>
> boolean global_var = false;
>
> private static class filter1 implements FilterFunction<Tuple t> {
>     @Override
>     public boolean filter(Tuple t) throws Exception {
>         return !global_var;
>     }
> }
>
> private static class filter2 implements FilterFunction<Tuple t> {
>     @Override
>     public boolean filter(Tuple t) throws Exception {
>         return global_var;
>     }
> }
>
>
> Then later on, in the respective queries, there are some processing logic
> in which changes the value of the global variable, thus enabling and
> disabling the flow of data from the source to the respective queries.
> The problem lies in this global variable being problematic in distributed
> deployments, in which I'm having a hard time figuring out how to solve.
> Is it a bit more clear? =)
>
>
>

-- 
Med Vänliga Hälsningar,
Mikael Gordani

Re: Communication between two queries

Posted by Piotr Nowojski <pi...@ververica.com>.
Hi,

In that case you could try to implement your `FilterFunction` as two input operator, with broadcast control input, that would be setting the `global_var`. Broadcast control input can be originating from some source, or from some operator.

Piotrek

> On 13 Mar 2020, at 15:47, Mikael Gordani <mi...@gmail.com> wrote:
> 
> Hi Piotr!
> Thanks for your response, I'll try to explain what I'm trying to achieve in more detail:
> 
> Essentially, If I've two queries, in which has the same operators and runs in the same task, I would want to figure out some way of controlling the ingestion from a source to the respective queries in such a way that only one of the queries receive data, based on a condition. 
> For more context, the second query (query2), is equipped with instrumented operators, which are standard operators extended with some extra functionality, in my case, they enrich the tuples with meta-data.
> 
> Source --> Filter1 ---> rest of query1
>    |
>    v
>    Filter2 ---> rest of query2
> 
> By using filters prior to the queries, they allow records to pass depending on a condition, let's say a global boolean variable (which is initially set to false).
> If it's set to true, Filter1 will accept every record and Filter2 will disregard every record.
> If it's set to false, Filter2 will accept every record and Filter1 will disregard every record.
> So the filter operators look something like this: 
> private static boolean global_var = false;
> 
> private static class Filter1 implements FilterFunction<Tuple> {
>     @Override
>     public boolean filter(Tuple t) throws Exception {
>         return !global_var;
>     }
> }
> 
> private static class Filter2 implements FilterFunction<Tuple> {
>     @Override
>     public boolean filter(Tuple t) throws Exception {
>         return global_var;
>     }
> }
> 
> Then later on, in the respective queries, there is some processing logic which changes the value of the global variable, thus enabling and disabling the flow of data from the source to the respective queries.
> The problem is that this global variable is hard to share correctly in distributed deployments, and I'm having a hard time figuring out how to solve that.
> Is it a bit more clear? =)


Re: Communication between two queries

Posted by Mikael Gordani <mi...@gmail.com>.
Hi Piotr!
Thanks for your response, I'll try to explain what I'm trying to achieve in
more detail:

Essentially, if I have two queries, which have the same operators and run
in the same task, I want to control the ingestion from *a source* to the
respective queries in such a way that only one of the queries receives
data, based on a condition.
For more context, the second query (query2) is equipped with instrumented
operators, which are standard operators extended with some extra
functionality; in my case, they enrich the tuples with meta-data.

Source --> *Filter1* ---> rest of query1
   |
   v
   *Filter2* ---> rest of query2

The *filters* placed before the queries allow records to pass depending on
a condition, let's say a global boolean variable (which is initially set
to false).
While it is *false*, Filter1 will accept every record and Filter2 will
disregard every record.
Once it is set to *true*, Filter2 will accept every record and Filter1
will disregard every record.

So the filter operators look something like this:

private static boolean global_var = false;

private static class Filter1 implements FilterFunction<Tuple> {
    @Override
    public boolean filter(Tuple t) throws Exception {
        return !global_var;
    }
}

private static class Filter2 implements FilterFunction<Tuple> {
    @Override
    public boolean filter(Tuple t) throws Exception {
        return global_var;
    }
}


Then later on, in the respective queries, there is some processing logic
which changes the value of the global variable, thus enabling and
disabling the flow of data from the source to the respective queries.
The problem is that this global variable is hard to share correctly in
distributed deployments, and I'm having a hard time figuring out how to
solve that.
Is it a bit more clear? =)

Re: Communication between two queries

Posted by Piotr Nowojski <pi...@ververica.com>.
Hi,

Could you explain a bit more what you are trying to achieve? 

One problem that pops into my head is that currently in Flink streaming (it is possible when processing bounded data), there is no way to “not ingest” the data reliably in the general case, as this might deadlock the upstream operator once the output buffers fill up. Instead, you can for example filter out/ignore records until some condition is met.

BroadcastState works for one single operator (and its parallel instances) - it doesn’t automatically communicate with any upstream/downstream operators - you have to wire/connect your operators and distribute the information as you want to. For an example of how it works, you can take a look at this ITCase [1].

What you could do is create the following job topology using side outputs [2]:

Src1 -> OP1 -> broadcast_side_output
         |
         v
       Sink1

And use a BroadcastProcessFunction to read Src1 and broadcast_side_output:

Src1 + broadcast_side_output -> OP2 -> Sink2

But as I wrote before, you have to be careful in OP2. Since both OP1 and OP2 read from the same data stream Src1, if you stop reading records from Src1 in OP2, you will eventually deadlock Src1 itself. A solution would be to create a second instance of the Src1 operator, which reads the records from the external system a second time:

Src1" + broadcast_side_output -> OP2 -> Sink2   

Piotrek

[1] https://github.com/apache/flink/blob/master/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/BroadcastStateITCase.java <https://github.com/apache/flink/blob/master/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/BroadcastStateITCase.java>
[2] https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/side_output.html <https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/side_output.html>
 

> On 12 Mar 2020, at 12:53, Mikael Gordani <mi...@gmail.com> wrote:
> 
> Hello everyone!
> 
> So essentially, I've two identical queries (q1 and q2) running in parallel (Streams).
> I'm trying to activate the ingestion of data to q2 based on what is processed in q1. 
> E.g say that we want to start ingesting data to q2 when a tuple with timestamp > 5000 appears in q1.
> 
> The queries are constructed in this way. (they share the same source)
> q1: Source -> Filter -> Aggregate -> Filter -> Sink
>             | 
>            V
> q2:      Filter -> Filter -> Aggregate -> Filter -> Sink
> 
> The initial idea was to have a global variable which is shared between the two queries. When this tuple appears in q1, it will set the variable to true in the first Filter operator. While in q2, the first Filter-operator returns tuples depending on the value of the global variable. 
> When the variable is true, it will let data pass; when it is false, no data is allowed to be ingested.
> 
> This works fine when you have all the tasks on the same machine, but of course, it becomes troublesome in distributed deployments (tasks in different nodes and such).
> 
> My second approach was to create some sort of "loop" in the query. So let's say that we have the processing logic placed in the last Filter operator in q1, and when this "special" tuple appears, it can communicate with the first Filter operator in q2, in order to allow data to be ingested.
> I've tried playing around with IterativeStreams but I can't really get it to work, and I feel like it's the wrong approach.
> 
> How can I achieve this sort of functionality? 
> I'm looking a bit on the BroadcastState part of the DataStream API, but I feel confused on how to use it. Is it possible to broadcast from a downstream to an upstream?
> Suggestions would be much appreciated!
> 
> Best Regards,
> Mikael Gordani