Posted to user@beam.apache.org by Alexey Romanenko <ar...@gmail.com> on 2019/10/10 17:35:54 UTC

Limited join with stop condition

Hello,

We have a use case, and it's not clear how it can be solved or implemented with Beam. I'm counting on the community's help with this; maybe I'm missing something that lies on the surface.

Let’s say there are two different bounded sources and one join transform (say, GBK) downstream. This join transform works like an INNER JOIN: it joins elements of the two collections only if they have a common key (though it could be any other join logic; that doesn’t matter). What matters is that this join has to return only N records as output, and then we have to stop the pipeline after they have been processed. This means that, in the best case, we need to read only N records from each source, join them, pass them downstream, and then stop the pipeline. In other cases, if some records don’t have a matching key in the other collection, we need to read another batch of records and check whether that is enough to produce N joined records.

The diagram below shows a simple example. Say each source contains 1M records, but after the join we need only 1K joined records. We don’t want to read all two million records from the two sources if we can produce the output after reading far fewer. So 1K joined records is the stop condition.

    1M
+----------+
| Source 1 |----+
+----------+    |     +------+
                +---> | Join | ---> Output 1K and stop
    1M          |     +------+
+----------+    |
| Source 2 |----+
+----------+

So it looks like I need the ability to read a new portion of data “on demand”, or a back-pressure mechanism that signals from downstream to upstream: “please give me only N elements and then wait until I ask for more”. I’m not sure that Beam supports something like this.
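
To make the requested behaviour concrete, here is a minimal sketch in plain Python (deliberately not Beam API) of an "on demand" inner join that pulls records in batches and stops once N joined records exist. The function name `limited_join` and the `batch_size` parameter are assumptions made for illustration; both sources are assumed to be iterables of (key, value) pairs.

```python
from itertools import islice


def limited_join(source1, source2, n, batch_size):
    """Inner-join two iterables of (key, value) pairs, stopping after n results.

    Each round pulls at most batch_size records from each source, so reading
    stops as soon as n joined records have been produced.
    """
    it1, it2 = iter(source1), iter(source2)
    seen1, seen2 = {}, {}   # key -> values read so far, one dict per side
    joined = []
    reads = 0               # total records pulled from both sources

    while len(joined) < n:
        batch1 = list(islice(it1, batch_size))
        batch2 = list(islice(it2, batch_size))
        if not batch1 and not batch2:
            break           # both sources exhausted before reaching n
        reads += len(batch1) + len(batch2)

        # Match each new record against everything already read on the
        # other side, then remember it for later rounds.
        for key, value in batch1:
            seen1.setdefault(key, []).append(value)
            for other in seen2.get(key, []):
                joined.append((key, value, other))
        for key, value in batch2:
            seen2.setdefault(key, []).append(value)
            for other in seen1.get(key, []):
                joined.append((key, other, value))

    return joined[:n], reads
```

With two generators of one million records each that share the same keys, `limited_join(src1, src2, n=1000, batch_size=100)` returns after pulling 2,000 records in total instead of two million. The sketch is single-threaded, which is exactly the parallelism trade-off the question raises.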

As an idea, I tried splitting the initial inputs into fixed windows with the trigger “AfterPane.elementCountAtLeast(N)” to read data in limited batches, and using another “AfterPane.elementCountAtLeast(N)” after the join, which should fire only once. It doesn’t work, and in any case I guess it won’t read data “on demand” or stop the whole pipeline.

Do you think this is feasible in Beam?
Any ideas or advice are very welcome!



Re: Limited join with stop condition

Posted by Alexey Romanenko <ar...@gmail.com>.
Many thanks for your ideas, everybody, I really appreciate it. I’m going to play with Stateful DoFn and see if it will work for us.

> And I have to ask, though, can you build indices instead of brute force for the join?
Answering your question, Kenn: yes, potentially we can build indices for this case and use them for lookups, but it will take time (since the initial sources are just files in S3), and the initial goal was to have a fast and generic solution for different sources. Also, I think we can sacrifice parallelism, since the amount of data to process should not be huge and the final output is relatively small.

At the same time, this use case and another recent KinesisIO issue got me thinking about an effective solution for such requests. In the end, it could be used, for example, for dynamic back pressure. AFAIK, we don’t have such an option in Beam, and “Read IOs” usually use an eager strategy to read as much data from the source as possible. Potentially, this can cause out-of-memory errors if input buffers are not limited in size, and even a size limit doesn’t take the downstream throughput into account. For instance, in pure Spark Streaming jobs back pressure can be configured, but I doubt it will work with SparkRunner in Beam, since Beam has its own implementation of IO connectors.
So I’m wondering what you think: would this feature be useful, and should it be integrated into Beam?
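
As a rough illustration of the back-pressure idea (not a Beam or Spark API), the following plain-Python sketch uses a bounded `queue.Queue` between a reader thread and a consumer: the reader blocks whenever the buffer is full, so it can never run more than `buffer_size` records ahead of the consumer. The function name `read_with_back_pressure` and its parameters are hypothetical.

```python
import queue
import threading

_DONE = object()  # sentinel marking the end of the input


def read_with_back_pressure(records, buffer_size, limit):
    """Consume up to `limit` records through a bounded buffer.

    Returns the consumed records and the number of records the reader
    managed to place in the buffer before it was stopped.
    """
    buffer = queue.Queue(maxsize=buffer_size)  # put() blocks when full
    stop = threading.Event()
    produced = 0

    def offer(item):
        """Wait until the item fits in the buffer; give up if told to stop."""
        while not stop.is_set():
            try:
                buffer.put(item, timeout=0.05)
                return True
            except queue.Full:
                pass  # downstream is slower than the reader: keep waiting
        return False

    def reader():
        nonlocal produced
        for record in records:
            if not offer(record):
                return
            produced += 1
        offer(_DONE)

    thread = threading.Thread(target=reader)
    thread.start()

    consumed = []
    while len(consumed) < limit:
        item = buffer.get()
        if item is _DONE:
            break  # the source ran out before the limit was reached
        consumed.append(item)

    stop.set()      # signal upstream that no more data is needed
    thread.join()
    return consumed, produced
```

In this model the reader buffers at most `limit + buffer_size` records regardless of how large the source is, which is the property an eager reader with an unbounded buffer lacks.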



Re: Limited join with stop condition

Posted by Reza Rokni <re...@google.com>.
Hi,

Agreed with the others that this does not sound like a good fit...

But to explore ideas... One possible (complicated and error-prone) way this
could be done:

Beam does not support cycles, but you could use an external unbounded
source as a way of sending an impulse out and then back into the system to
read more data.

Assuming you are not using the standard source IOs and you're reading data
via a DoFn (it would not work with the built-in source IOs):

   - Create a streaming pipeline that reads from an unbounded source; this
   source is used only for signals to read more data.
   - You start the initial read by sending a Start event to the unbounded
   source.
   - In the pipeline you branch the Start event to two DoFns,
   DoFnReadFromSource1 and DoFnReadFromSource2. These will each read X
   records, which are then wrapped in an Event object and sent forward. You
   will also need sequence IDs and an EndRead Event object (in case
   a source has been exhausted).
   - You send the events to a stateful DoFn (in the global window), which
   does the following:
      - If the condition is not met, send a Start event message back to the
      unbounded source (which will result in more data being read).
      - If the condition is met, send out the joined event and GC the data
      that has been joined.
      - Keep the other elements around for the next time you send a Start
      event into the unbounded source.

I am sure there are many corner cases I have not thought of (for
example, when both sources are exhausted and you don't have a join condition
match, what should it do?). Also, this will result in a pipeline that is
always up and running.
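
The loop described above can be modelled in a few lines of plain Python; this is only a simulation of the control flow, not Beam code. A `deque` stands in for the external unbounded signal source, and `run_signal_loop`, the tuple-shaped events, and the one-record-per-key-per-side assumption are all hypothetical simplifications (sequence IDs are omitted). Here the loop simply stops when both sources are exhausted, which is one possible answer to the corner case mentioned.

```python
from collections import deque
from itertools import islice


def run_signal_loop(source1, source2, n, x):
    """Simulate the Start-event feedback loop; return (output, starts sent)."""
    signals = deque(["Start"])            # stand-in for the unbounded source
    readers = {1: iter(source1), 2: iter(source2)}
    exhausted = set()                     # sides that have emitted EndRead
    pending = {1: {}, 2: {}}              # state: unmatched records by key
    output = []
    starts = 0

    while signals:
        signals.popleft()
        starts += 1

        # Branch the Start event to both readers; each reads up to x records.
        events = []
        for side, it in readers.items():
            if side in exhausted:
                continue
            batch = list(islice(it, x))
            events += [("Record", side, key, value) for key, value in batch]
            if len(batch) < x:
                events.append(("EndRead", side))
                exhausted.add(side)

        # Stateful step: join against buffered records of the other side.
        for event in events:
            if event[0] == "EndRead":
                continue
            _, side, key, value = event
            other = 2 if side == 1 else 1
            if key in pending[other]:
                match = pending[other].pop(key)   # GC data that was joined
                pair = (value, match) if side == 1 else (match, value)
                output.append((key,) + pair)
            else:
                pending[side][key] = value        # keep for a later round

        # Condition not met and data may remain: ask for another read.
        if len(output) < n and len(exhausted) < 2:
            signals.append("Start")

    return output[:n], starts
```

For two sources with matching keys, `run_signal_loop(src1, src2, n=10, x=5)` needs two Start events; in a real deployment each round trip would go through an external messaging system, which is where most of the complexity and latency would come from.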

Cheers
Reza






Re: Limited join with stop condition

Posted by Kenneth Knowles <ke...@apache.org>.
Interesting! I agree with Luke that it seems not a great fit for Beam in
the most rigorous sense. There are many considerations:

1. We assume ParDo has side effects by default. So the model actually
*requires* eager evaluation, not lazy, in order to make all the side
effects happen. But for your case let us assume somehow we know it is
all @Pure.
2. Lazy evaluation and parallelism are in opposition. In pure computations
like Haskell, literally everything (except monadic sequence) is parallel
for free, but the problem is nothing starts until it is needed so
parallelism requires forcing computations early.

On the other hand, we can think about ways forward here. A first step is if
the join is a "side lookup join" where we always process all of source 1
but try to process less of source 2. If source 2 is feeding into a map side
input then this could be lazy in some way. When an element from source 1
calls the side input lookup it could be a blocking call that triggers reads
from source 2 until a match is found. This computation strategy is
consistent with the model and will read all of source 1 but only the prefix
of source 2 needed to join all of source 1. I think you could implement
this pattern with parallelism on both the main input and side input. Then,
to read less of source 1 you need feedback from the sink to the source. We
have nothing like that... These are all very abstract hypotheticals.
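
To illustrate the "side lookup join" idea, here is a small plain-Python model (not a Beam side input) in which a lookup blocks while it reads further into source 2 until the requested key appears. The class `LazySideInput` and the function `side_lookup_join` are hypothetical names, and values are assumed never to be `None`.

```python
class LazySideInput:
    """Map-like view over source 2 that reads records only when needed."""

    def __init__(self, source):
        self._it = iter(source)
        self._cache = {}          # keys read so far -> first value seen
        self.records_read = 0     # how much of source 2 has been consumed

    def lookup(self, key):
        """Return the value for key, reading ahead until it is found.

        Returns None if source 2 is exhausted without containing the key.
        """
        while key not in self._cache:
            try:
                k, v = next(self._it)
            except StopIteration:
                return None
            self.records_read += 1
            self._cache.setdefault(k, v)
        return self._cache[key]


def side_lookup_join(source1, side):
    """Process all of source 1, joining each element via the lazy lookup."""
    for key, value in source1:
        match = side.lookup(key)
        if match is not None:
            yield key, value, match
```

Joining the three main-input keys 3, 7 and 5 against a million-record source 2 keyed by 0, 1, 2, ... reads only the first eight records of source 2. A key that is absent from source 2 forces a full read, which is the worst case of this strategy.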

If we get to practical implementation "today" then every runner pretty much
reads all of a bounded source before even starting the next transform, no?
I wonder if it makes sense to convert them to unbounded (which is still
allowed to terminate but does not support dynamic splits). Then you just
terminate the pipeline when you have enough output. You will read more than
you need but maybe that is not so bad, and anyhow hard to avoid. Also a
vague idea...

And I have to ask, though, can you build indices instead of brute force for
the join?

Kenn


Re: Limited join with stop condition

Posted by Luke Cwik <lc...@google.com>.
This doesn't seem like a good fit for Apache Beam, but have you tried:
* using a StatefulDoFn that performs all the joining and signals the
service powering the sources to stop sending data once your criterion is met
(most services powering these sources won't have a way to be controlled
this way)?
* using a StatefulDoFn that performs all the joining, writes the data out
to the output directly, and then shuts down the pipeline (you can't have
any transforms after the StatefulDoFn)?

Both of these ideas remove a lot of the parallelism that Apache Beam
provides.
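
A minimal sketch of the second idea, modelled in plain Python rather than with Beam's state API: one object holds all the join state, writes joined records straight to a sink, and reports when the stop criterion is met so the driver can stop feeding it. `StatefulJoin`, `run_until_done`, and the "left"/"right" tags are hypothetical names; the sink is assumed to be a list.

```python
class StatefulJoin:
    """Buffers elements per key and writes joined records straight to a sink."""

    def __init__(self, limit, sink):
        self.limit = limit
        self.sink = sink                        # a list receiving the output
        self.state = {"left": {}, "right": {}}  # tag -> key -> values seen

    @property
    def done(self):
        return len(self.sink) >= self.limit

    def process(self, tag, key, value):
        """Handle one element; return True once the stop criterion is met."""
        other = "right" if tag == "left" else "left"
        for match in self.state[other].get(key, []):
            if self.done:
                break
            pair = (value, match) if tag == "left" else (match, value)
            self.sink.append((key,) + pair)
        self.state[tag].setdefault(key, []).append(value)
        return self.done


def run_until_done(elements, join):
    """Feed (tag, key, value) elements until the join reports completion."""
    consumed = 0
    for tag, key, value in elements:
        consumed += 1
        if join.process(tag, key, value):
            break   # stands in for shutting down the pipeline
    return consumed
```

Because every element passes through a single object, nothing here runs in parallel, which matches the caveat that these ideas remove a lot of the parallelism Beam provides.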


