Posted to user@beam.apache.org by Ken Barr <Ke...@Solace.com> on 2019/09/19 17:51:35 UTC

Re: # of Readers < MaxNumWorkers for UnboundedSource. Causing deadlocks

start() seems to be working, so I am focusing on advance(). Is there any way to prove whether I am blocked in advance() on the Dataflow runner? I have been through the code and cannot see anything, but I know that does not mean much.

Ken
On 2019/09/19 14:57:09, Jan Lukavský <je...@seznam.cz> wrote: 
> Hi Ken,
> 
> I have seen some deadlock behavior with custom sources before 
> (on a different runner, but that might not be important). Some lessons learned:
> 
>   a) please make sure your advance() and start() methods do not block; 
> blocking causes issues, possibly including the deadlocks you describe
> 
>   b) if you want to limit parallelism, that should be possible in the 
> split() method: you can return a collection containing only (this) if 
> there are no more readers to create
> 
> Hope this helps; please feel free to ask for more details if needed.
> 
> Best,
> 
>   Jan
> 
> On 9/19/19 4:47 PM, Ken Barr wrote:
> > I have a custom UnboundedSource IO that reads from a series of messaging queues. I have implemented it so that the IO takes a list of queues and expands an UnboundedSource/UnboundedReader for each queue.
> >
> > If I use autoscaling with maxNumWorkers <= the number of queues, everything works well. For example, if I have 4 queues and run in Dataflow, the Dataflow job starts with 1 worker running 4 Readers, each consuming from one queue. As CPU usage and backlog grow, Dataflow spawns more workers and moves the Readers to the new workers. As CPU usage and backlog shrink, the Readers are moved back to the original worker and unused workers are deleted. This is exactly what I was hoping for.
> >
> > Problems happen if I set maxNumWorkers greater than the number of queues. As scale-up goes past the number of queues, not only are Readers moved, but for some reason new Readers are created. This should not be too bad: the new Readers would simply not receive messages, because the original Reader holds exclusive access to its queue to ensure in-order delivery. The real problem is that the original Readers are still holding their queues, but their advance() methods are no longer being called. The new Readers' advance() methods are being called, but they are not active on the queues, so the system is now deadlocked.
> >
> > My questions are:
> > Why are new Readers being spawned if maxNumWorkers exceeds the original number of Readers? Is there a way to prevent this? I would like to maintain in-order delivery.
> >
> > Why are the original Readers' advance() methods no longer being called? This is causing a deadlock.
> 
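Jan's point (b), limiting parallelism in split(), can be illustrated with a minimal sketch. It assumes Ken's design of one queue per reader and models the source without Beam types so it runs standalone: QueueSource is a hypothetical stand-in, and in a real source the split logic would sit inside UnboundedSource.split(int desiredNumSplits, PipelineOptions options). It never returns more splits than there are queues and refuses to build a split without a queue.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical stand-in for a custom UnboundedSource that owns one or more queues.
// In a real Beam source, split(...) below would be the body of
// split(int desiredNumSplits, PipelineOptions options).
final class QueueSource {
  private final List<String> queueNames;

  QueueSource(List<String> queueNames) {
    // Same intent as Preconditions.checkState(): never create a split without a queue.
    if (queueNames.isEmpty()) {
      throw new IllegalStateException("every split must own at least one queue");
    }
    this.queueNames = Collections.unmodifiableList(new ArrayList<>(queueNames));
  }

  List<String> queueNames() {
    return queueNames;
  }

  // Returns min(desiredNumSplits, number of queues) splits (at least one), so asking
  // for more splits than there are queues never yields a split without a queue.
  List<QueueSource> split(int desiredNumSplits) {
    int numSplits = Math.max(1, Math.min(desiredNumSplits, queueNames.size()));
    List<List<String>> buckets = new ArrayList<>();
    for (int i = 0; i < numSplits; i++) {
      buckets.add(new ArrayList<>());
    }
    // Round-robin assignment puts each queue in exactly one split.
    for (int i = 0; i < queueNames.size(); i++) {
      buckets.get(i % numSplits).add(queueNames.get(i));
    }
    List<QueueSource> splits = new ArrayList<>();
    for (List<String> bucket : buckets) {
      splits.add(new QueueSource(bucket));
    }
    return splits;
  }
}
```

With four queues, split(8) returns four single-queue splits and split(2) returns two splits of two queues each. Round-robin is just one choice; any assignment that gives each queue to exactly one split keeps a single reader per queue.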

Re: # of Readers < MaxNumWorkers for UnboundedSource. Causing deadlocks

Posted by Jan Lukavský <je...@seznam.cz>.
You are not obliged to return exactly the requested number of splits 
(numSplits) from split(). If you have fewer queues (partitions), you can 
just return fewer splits (and thereby make sure that every split has an 
associated queue). On the other hand, if your source returns a watermark 
of BoundedWindow.TIMESTAMP_MAX_VALUE when it has no queue (and always 
returns false from advance()), then this should make no difference.

Jan

(In reply to Ken Barr, 9/25/19 4:44 PM.)
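The no-queue behavior Jan describes can be sketched as a small reader model. This is not Beam's UnboundedReader API: QueueReader is hypothetical, the local buffer is any java.util.Queue, and END_OF_TIME stands in for BoundedWindow.TIMESTAMP_MAX_VALUE (a Joda Instant in the real SDK). A reader with no queue returns false from every advance() call and reports the maximum watermark; a reader with a queue polls its buffer without waiting.

```java
import java.time.Instant;
import java.util.NoSuchElementException;
import java.util.Queue;

// Hypothetical, Beam-free model of a reader's advance()/getWatermark() contract.
final class QueueReader {
  // Stand-in for Beam's BoundedWindow.TIMESTAMP_MAX_VALUE.
  static final Instant END_OF_TIME = Instant.MAX;

  private final Queue<String> buffer; // null when this reader was assigned no queue
  private String current;
  private Instant watermark = Instant.EPOCH; // a real reader would derive this from message timestamps

  QueueReader(Queue<String> bufferOrNull) {
    this.buffer = bufferOrNull;
  }

  boolean advance() {
    if (buffer == null) {
      return false; // no queue: never wait, never emit
    }
    String next = buffer.poll(); // returns null immediately when the buffer is empty
    if (next == null) {
      return false;
    }
    current = next;
    return true;
  }

  String getCurrent() {
    if (current == null) {
      throw new NoSuchElementException("advance() has not returned true yet");
    }
    return current;
  }

  Instant getWatermark() {
    // A reader without a queue declares that it will never produce more data.
    return buffer == null ? END_OF_TIME : watermark;
  }
}
```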

Re: # of Readers < MaxNumWorkers for UnboundedSource. Causing deadlocks

Posted by Ken Barr <Ke...@Solace.com>.
I already poll a local queue in advance() and return false if the queue is empty.
Is there an example of restricting UnboundedSource splits?

(In reply to Jan Lukavský, 2019/09/25 08:13:43.)

Re: # of Readers < MaxNumWorkers for UnboundedSource. Causing deadlocks

Posted by Jan Lukavský <je...@seznam.cz>.
I have a feeling something is going wrong in your source. If I were 
you, I would probably do the following:

  - verify that I never produce an UnboundedSource split with no 
associated queue (Preconditions.checkState())

  - make sure that I *really never* block in a call to advance(), not 
even when the queue is actually empty. For inspiration, have a look at 
KafkaUnboundedReader. Generally, you would probably want to spawn a new 
thread to feed a local BlockingQueue (or something similar) in case you 
cannot do purely async IO (i.e. just check whether you have any data in 
the network buffer in the call to advance() and return false otherwise)

Every blocking call in advance() is a potential source of distributed 
deadlocks, which might be what you observe.

Jan

(In reply to Ken Barr, 9/24/19 4:07 PM.)
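Jan's second suggestion, a background thread feeding a local BlockingQueue so that advance() never waits, might look roughly like this. It is a sketch, not KafkaUnboundedReader's actual code: BlockingClient is a hypothetical interface standing in for the messaging library's blocking receive call, the buffer capacity of 1000 is arbitrary, and checkpointing and message acknowledgement are left out entirely.

```java
import java.util.NoSuchElementException;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical blocking client API; the real library's receive call is not shown in the thread.
interface BlockingClient {
  // Waits up to timeoutMillis for a message; returns null on timeout.
  String receive(long timeoutMillis) throws InterruptedException;
}

// Reader whose advance() never blocks: a background thread does the blocking
// receive and hands messages over through a bounded local queue.
final class NonBlockingReader implements AutoCloseable {
  private final BlockingQueue<String> buffer = new LinkedBlockingQueue<>(1000);
  private final Thread feeder;
  private volatile boolean running = true;
  private String current;

  NonBlockingReader(BlockingClient client) {
    feeder = new Thread(() -> {
      try {
        while (running) {
          String msg = client.receive(100);
          if (msg != null) {
            // put() may block when the buffer is full, but only this
            // background thread waits, never advance().
            buffer.put(msg);
          }
        }
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt(); // close() was called
      }
    }, "queue-feeder");
    feeder.setDaemon(true);
  }

  boolean start() {
    feeder.start();
    return advance();
  }

  boolean advance() {
    String next = buffer.poll(); // returns immediately; null if nothing is buffered
    if (next == null) {
      return false;
    }
    current = next;
    return true;
  }

  String getCurrent() {
    if (current == null) {
      throw new NoSuchElementException("advance() has not returned true yet");
    }
    return current;
  }

  @Override
  public void close() {
    running = false;
    feeder.interrupt();
    try {
      feeder.join(1000);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
  }
}
```

Only the feeder thread ever waits, and because a single feeder drains the queue into a FIFO buffer, messages reach advance() in the order they were received.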

Re: # of Readers < MaxNumWorkers for UnboundedSource. Causing deadlocks

Posted by Ken Barr <Ke...@Solace.com>.
I might have to resort to this. I am reading from the queue with a timeout and properly returning false when the read times out. I have also implemented a set of statistics to track advance() behavior: the number of times it successfully reads a message, fails to read a message, etc.

(In reply to Jan Lukavský, 2019/09/19 19:45:49.)
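Statistics like these can also separate two cases that look alike from the outside: advance() being called and finding nothing, versus advance() no longer returning at all. A minimal sketch, assuming the reader passes its own advance() result through a counter object; AdvanceStats and its methods are hypothetical names.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical counters for advance() outcomes. Safe to call from multiple threads.
final class AdvanceStats {
  private final AtomicLong calls = new AtomicLong();
  private final AtomicLong hits = new AtomicLong();
  private final AtomicLong misses = new AtomicLong();
  // System.nanoTime() of the most recent record() call; starts at construction time.
  private final AtomicLong lastReturnNanos = new AtomicLong(System.nanoTime());

  // Call with the result of advance() just before returning it.
  boolean record(boolean readMessage) {
    calls.incrementAndGet();
    (readMessage ? hits : misses).incrementAndGet();
    lastReturnNanos.set(System.nanoTime());
    return readMessage;
  }

  // Keeps growing while advance() is not returning, whether because it is
  // never called or because a call is blocked.
  long millisSinceLastReturn() {
    return (System.nanoTime() - lastReturnNanos.get()) / 1_000_000L;
  }

  @Override
  public String toString() {
    return "calls=" + calls.get() + " hits=" + hits.get() + " misses=" + misses.get();
  }
}
```

Inside a reader, advance() could end with something like: String next = buffer.poll(); if (next != null) current = next; return stats.record(next != null); Logging the counters together with millisSinceLastReturn() per reader shows whether a given reader is still being polled.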

Re: # of Readers < MaxNumWorkers for UnboundedSource. Causing deadlocks

Posted by Jan Lukavský <je...@seznam.cz>.
You can SSH into the Dataflow worker and inspect a jstack dump of the 
harness. The principal source of blocking would be waiting for data. The 
logic should be implemented so that if no data is available at the 
moment, advance() just returns false and doesn't wait for anything. 
Another suggestion: focus on how your reader behaves when it receives no 
queue. I think the proper behavior would be to return false from each 
call to advance() and set the watermark to 
BoundedWindow.TIMESTAMP_MAX_VALUE to indicate that there will be no 
more data.

Jan

(In reply to Ken Barr, 9/19/19 7:51 PM.)
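As a programmatic complement to running jstack on the worker, a reader or a debugging hook can scan the JVM's live threads for frames inside a given method using Thread.getAllStackTraces(). This is a swapped-in technique, not something proposed in the thread; StuckFrameFinder is a hypothetical helper, and the class and method names passed to it are your own reader's.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Scans all live threads and reports those currently executing the given method.
final class StuckFrameFinder {
  static List<String> threadsInside(String className, String methodName) {
    List<String> hits = new ArrayList<>();
    for (Map.Entry<Thread, StackTraceElement[]> e : Thread.getAllStackTraces().entrySet()) {
      for (StackTraceElement frame : e.getValue()) {
        if (frame.getClassName().equals(className) && frame.getMethodName().equals(methodName)) {
          // Thread name plus state (e.g. WAITING, TIMED_WAITING) hints at why it is there.
          hits.add(e.getKey().getName() + " state=" + e.getKey().getState());
          break;
        }
      }
    }
    return hits;
  }
}
```

A single sample only shows where threads happen to be at that instant. A thread that appears inside advance() across several samples taken seconds apart is a much stronger sign of blocking. For example, one could periodically log threadsInside("com.example.MyQueueReader", "advance"), where com.example.MyQueueReader is a placeholder for the real reader class.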

Re: # of Readers < MaxNumWorkers for UnboundedSource. Causing deadlocks

Posted by Ken Barr <Ke...@Solace.com>.
I believe I have proven that UnboundedReader.advance() is never called in this case. Each time I enter the advance() method, I spawn a thread that loops for up to 60 seconds and throws a runtime exception if it has not been stopped before the time expires.

I have verified that this mechanism works and that I can see the runtime exception.

When I run the scenario in which the deadlock occurs, I never see the runtime exception, i.e. the Reader holding exclusive access never has its advance() method called after scale-up.

(In reply to Ken Barr, 2019/09/19 17:51:35.)
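The watchdog described in this message can be sketched as follows, with two deliberate changes. Instead of spawning a thread per call, it arms a timer on a single ScheduledExecutorService and cancels it when the body of advance() returns. Instead of throwing, the timer counts and logs: an exception thrown from a scheduled task is captured in its Future rather than reported, and an exception thrown on a separately spawned thread never reaches advance()'s caller either; it only surfaces through that thread's uncaught-exception handler. AdvanceWatchdog and guard() are hypothetical names.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BooleanSupplier;

// Arms a timer before the advance() body runs and cancels it when the body returns.
// If the body is still running when the timer fires, the timeout is counted and logged.
final class AdvanceWatchdog implements AutoCloseable {
  private final long timeoutMillis;
  private final AtomicInteger timeouts = new AtomicInteger();
  private final ScheduledExecutorService timer =
      Executors.newSingleThreadScheduledExecutor(r -> {
        Thread t = new Thread(r, "advance-watchdog");
        t.setDaemon(true); // never keeps the JVM alive on its own
        return t;
      });

  AdvanceWatchdog(long timeoutMillis) {
    this.timeoutMillis = timeoutMillis;
  }

  // Runs body (the existing advance() logic) under the watchdog and returns its result.
  boolean guard(BooleanSupplier body) {
    ScheduledFuture<?> alarm = timer.schedule(() -> {
      timeouts.incrementAndGet();
      System.err.println("advance() still running after " + timeoutMillis + " ms");
    }, timeoutMillis, TimeUnit.MILLISECONDS);
    try {
      return body.getAsBoolean();
    } finally {
      alarm.cancel(false); // no-op if the alarm already fired
    }
  }

  int timeouts() {
    return timeouts.get();
  }

  @Override
  public void close() {
    timer.shutdownNow();
  }
}
```

A reader would then implement advance() as return watchdog.guard(this::doAdvance); where doAdvance() is a hypothetical name for the existing logic. As written, this only fires for an advance() call that has started and not returned; detecting that advance() has stopped being called altogether needs a separate check, such as the time since the last call.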