Posted to dev@qpid.apache.org by Martin Ritchie <ri...@apache.org> on 2009/09/21 15:32:40 UTC

[QPID-1871] Java Client Dispatcher change proposal.

Hi,

Following on from looking at QPID-1871, I believe that quite a
significant change is required to ensure that message order on
rollback is maintained.

I propose that we extract the Dispatcher from AMQSession, which will
simplify our biggest class (3100+ lines!) and show clear
responsibility for incoming message processing. This will simplify
rollback as the Dispatcher thread can be given full responsibility for
clearing up the state that it knows best, rather than the current
situation where the calling thread does some work on AMQSession whilst
the Dispatcher is running/stopping, then calls the Dispatcher code
directly to clean up the remainder, all while the Dispatcher may be
processing a message.

Change design posted here:
http://cwiki.apache.org/confluence/display/qpid/0.6+Java+Client+Dispatcher+Changes

Comments on the investigation, implications and design welcome.
I'll capture the details on the wiki so we don't lose track of comments.

Martin
-- 
Martin Ritchie

---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:dev-subscribe@qpid.apache.org


Re: [QPID-1871] Java Client Dispatcher change proposal.

Posted by Martin Ritchie <ri...@apache.org>.
2009/9/21 Rafael Schloming <ra...@redhat.com>:
> Martin Ritchie wrote:
>>
>> Hi Rafi,
>>
>> I saw the syncDispatchQueue method but I don't think waiting for the
>> _queue to clear is not the right solution, IMO, for rollback, even on
>> 0-10. When rollback is called you don't want the dispatcher to process
>> any more messages. Your client may have MessageListeners setup that
>> will take a long time to process, so the Dispatcher should stop
>> processing messages and perform the Rollback.
>>
>> I've attached a new test for RollbackOrderTest that blocks because the
>> syncDispatchQueue waits for the Dispatcher to empty the _queue.
>> However, when called via the MessageListener.onMessage(), you end up
>> blocking the Dispatcher.
>
> That's a good point, however won't this also be an issue in the design
> you're proposing? Regardless of which thread actually does the rollback
> processing, rollback needs to block until that processing is complete, and
> if you're waiting for the "ServiceRequest" to be processed from the thread
> that is supposed to process it then you have essentially the same deadlock
> you've attached below.
>
>> I think extracting the Dispatcher will make it clearer to show that
>> the message processing varies in each protocol and will allow the
>> Session classes to focus on the creation and control of
>> Consumer/Producers. This will allow Dispatchers for each protocol to
>> be cleaner and highlight the protocol differences at failover; A 0-8
>> Dispatcher that simply drops all pending messages compared to the 0-10
>> Dispatcher that attempts to process all the messages it has received.
>
> I don't actually think any of this needs to be (or should be) protocol
> specific. AFAICT there's no relevant difference in the protocol semantics
> here, these issues are common to any protocol that does prefetch. For
> example, the choice of whether to drop or process pending messages in a
> given scenario could be made either way for any protocol version.
>
>> I think it is a significant change but I think it is worth it as it
>> will improve the readability of the Client code as well as improving
>> the testability. Currently AMQSession is not exactly easy to unit test
>> so splitting it in to more discrete components and unit testing them
>> will be beneficial.
>>
>> The change boils down to:
>> - Extract Dispatcher Logic to Dispatcher_0_8 , Dispatcher_0_10
>> - Update AMQSession to use new Dispatcher
>> - Update Dispatcher to be able to stop processing the _queue of
>> messages and perform rollback.
>
> I agree the client is badly in need of some improvements in maintainability
> and readability, however in this particular case I don't think moving the
> rollback processing from one thread to another actually improves the
> situation significantly. Fundamentally the rollback logic needs to be
> performed from at least two different threads, the dispatcher thread and the
> application thread. I suspect in order do this properly we really need to
> stop thinking in terms of code being associated with a given thread, and
> think instead about what locks we have, what data structures those locks
> protect, and which locks need to be held in order to execute a given piece
> of code.
>
> In my experience the number one issue with the client is deadlocks and race
> conditions stemming from the large number of haphazardly defined
> locks/conditions that are littered throughout the client code. I think
> before we can safely and productively move large chunks of code around we
> really need to have some sort plan for documenting and simplifying the
> locking situation. Really we need to be able to articulate exactly what
> locks the client has, what data structure(s) each lock protects, and what
> order should be used to acquire multiple locks when necessary.
>
> --Rafael

I had a brief call with Rob/Rafi to discuss the changes I had proposed
to the client and the concerns. To summarise the concerns, which I
think we all share: the Java client is quite fragile, and the number of
locks makes it difficult to be confident that a change will not
introduce a new race condition or subtly change an existing one.

Rafi's recent work on the Python client leads him to believe that we
can do message delivery in a much simpler way which will allow us to
reduce the number of locks we need in the client.

So with that in mind, spending the effort to refactor the current
client to make it more maintainable may not be justified. As a result
we discussed an alternative approach documented here:
http://cwiki.apache.org/confluence/display/qpid/0.6+Java+Client+Dispatcher+Changes+-+Alternative

This approach will increase code reuse between the existing protocol
versions. Reducing the instances where a protocol version implements a
feature in its own way, when there is no need for a protocol-specific
version, will help reduce the complexity of the code. It should also
make it much clearer what is protocol specific, so that when we come to
add further protocol versions we have a solid codebase.

If anyone has any comments on what I am proposing to do then we can
discuss it either on the Wiki or here. Either way I shall capture the
comments at the end of the wiki so we do not lose any contributions.

Cheers

Martin
-- 
Martin Ritchie



Re: [QPID-1871] Java Client Dispatcher change proposal.

Posted by Rafael Schloming <ra...@redhat.com>.
Martin Ritchie wrote:
> Hi Rafi,
> 
> I saw the syncDispatchQueue method but I don't think waiting for the
> _queue to clear is not the right solution, IMO, for rollback, even on
> 0-10. When rollback is called you don't want the dispatcher to process
> any more messages. Your client may have MessageListeners setup that
> will take a long time to process, so the Dispatcher should stop
> processing messages and perform the Rollback.
> 
> I've attached a new test for RollbackOrderTest that blocks because the
> syncDispatchQueue waits for the Dispatcher to empty the _queue.
> However, when called via the MessageListener.onMessage(), you end up
> blocking the Dispatcher.

That's a good point, however won't this also be an issue in the design 
you're proposing? Regardless of which thread actually does the rollback 
processing, rollback needs to block until that processing is complete, 
and if you're waiting for the "ServiceRequest" to be processed from the 
thread that is supposed to process it then you have essentially the same 
deadlock you've attached below.

> I think extracting the Dispatcher will make it clearer to show that
> the message processing varies in each protocol and will allow the
> Session classes to focus on the creation and control of
> Consumer/Producers. This will allow Dispatchers for each protocol to
> be cleaner and highlight the protocol differences at failover; A 0-8
> Dispatcher that simply drops all pending messages compared to the 0-10
> Dispatcher that attempts to process all the messages it has received.

I don't actually think any of this needs to be (or should be) protocol 
specific. AFAICT there's no relevant difference in the protocol 
semantics here, these issues are common to any protocol that does 
prefetch. For example, the choice of whether to drop or process pending 
messages in a given scenario could be made either way for any protocol 
version.

> I think it is a significant change but I think it is worth it as it
> will improve the readability of the Client code as well as improving
> the testability. Currently AMQSession is not exactly easy to unit test
> so splitting it in to more discrete components and unit testing them
> will be beneficial.
> 
> The change boils down to:
> - Extract Dispatcher Logic to Dispatcher_0_8 , Dispatcher_0_10
> - Update AMQSession to use new Dispatcher
> - Update Dispatcher to be able to stop processing the _queue of
> messages and perform rollback.

I agree the client is badly in need of some improvements in 
maintainability and readability, however in this particular case I don't 
think moving the rollback processing from one thread to another actually 
improves the situation significantly. Fundamentally the rollback logic 
needs to be performed from at least two different threads, the 
dispatcher thread and the application thread. I suspect in order to do this
properly we really need to stop thinking in terms of code being 
associated with a given thread, and think instead about what locks we 
have, what data structures those locks protect, and which locks need to 
be held in order to execute a given piece of code.

In my experience the number one issue with the client is deadlocks and 
race conditions stemming from the large number of haphazardly defined 
locks/conditions that are littered throughout the client code. I think 
before we can safely and productively move large chunks of code around 
we really need to have some sort of plan for documenting and simplifying
the locking situation. Really we need to be able to articulate exactly 
what locks the client has, what data structure(s) each lock protects, 
and what order should be used to acquire multiple locks when necessary.
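
As a rough illustration of what "documenting the locks" could look like in code, here is a minimal sketch. Every name in it (LockOrderSketch, sessionLock, deliveryLock) is hypothetical and not taken from the client; it only assumes two locks with a fixed acquisition order, each annotated with the data it protects, plus a cheap runtime check for order violations.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.locks.ReentrantLock;

/** Hypothetical illustration only; none of these names exist in the Qpid client. */
class LockOrderSketch {
    /** Level 1: guards 'stopped'. Must be taken BEFORE deliveryLock when both are needed. */
    private final ReentrantLock sessionLock = new ReentrantLock();
    /** Level 2: guards 'pending'. Never take sessionLock while holding only this lock. */
    private final ReentrantLock deliveryLock = new ReentrantLock();

    private final Deque<String> pending = new ArrayDeque<>(); // guarded by deliveryLock
    private boolean stopped;                                   // guarded by sessionLock

    /** Needs only the level 2 lock. */
    void enqueue(String message) {
        deliveryLock.lock();
        try {
            pending.add(message);
        } finally {
            deliveryLock.unlock();
        }
    }

    /** Needs both locks, taken in the documented order. Returns the number of messages dropped. */
    int rollback() {
        lockSession();
        try {
            stopped = true;
            deliveryLock.lock();
            try {
                int dropped = pending.size();
                pending.clear();
                return dropped;
            } finally {
                deliveryLock.unlock();
            }
        } finally {
            sessionLock.unlock();
        }
    }

    boolean isStopped() {
        lockSession();
        try {
            return stopped;
        } finally {
            sessionLock.unlock();
        }
    }

    /** Takes the level 1 lock, failing fast if the caller holds only the level 2 lock. */
    private void lockSession() {
        if (deliveryLock.isHeldByCurrentThread() && !sessionLock.isHeldByCurrentThread()) {
            throw new IllegalStateException("lock order violation: deliveryLock held before sessionLock");
        }
        sessionLock.lock();
    }

    /** Deliberately takes the locks in the wrong order to show the check firing. */
    boolean wrongOrderIsRejected() {
        deliveryLock.lock();
        try {
            lockSession();
            sessionLock.unlock();
            return false;
        } catch (IllegalStateException expected) {
            return true;
        } finally {
            deliveryLock.unlock();
        }
    }
}
```

The point of the sketch is the comments as much as the code: each lock states what it protects and where it sits in the order, so a reviewer can check a new code path against the rule instead of reasoning about threads.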

--Rafael



Re: [QPID-1871] Java Client Dispatcher change proposal.

Posted by Martin Ritchie <ri...@apache.org>.
2009/9/21 Rafael Schloming <ra...@redhat.com>:
> Martin Ritchie wrote:
>>
>> Hi,
>>
>> Following on from looking at QPID-1871. I believe that there is quite
>> a significant change required to ensure that the message order or
>> rollback is maintained.
>>
>> I propose that we extract the Dispatcher from AMQSession, which will
>> simplify our biggest class (3100+ lines!) and show clear
>> responsibility for incoming message processing. This will simplify
>> rollback as the Dispatcher thread can be given full responsibility for
>> clearing up the state that it knows best. Rather than the current
>> situation where the calling thread does some work on AMQSession whilst
>> the Dispatcher is running/stopping, then calls the the Dispatcher code
>> directly clean up the remainder. All this while the Dispatcher may be
>> processing a message.
>>
>> Change design posted here:
>>
>> http://cwiki.apache.org/confluence/display/qpid/0.6+Java+Client+Dispatcher+Changes
>>
>> Comments on the investigation, implications and design welcome.
>> I'll capture the details on the wiki so we don't lose track of comments
>
> Hey Martin,
>
> Sorry I didn't pick up on this earlier. We hit this issue a while back in
> the 0-10 code path. That's why we added RollbackOrderTest, and that's why it
> doesn't fail for 0-10 brokers. You should probably check out
> AMQSession.syncDispatchQueue, this method pretty much solves the problem
> you're describing. It will block until the dispatch queue is empty... or
> more precisely it will block until everything that is currently in the
> dispatch queue has been processed by the dispatcher thread, which if done
> after stopping incoming message flow means it will block until the dispatch
> queue is empty.
>
> This method is used in a few places in the 0-10 codepath where it is
> necessary to clean out the dispatch queue prior to proceeding (e.g. during
> failover), however the key place here is AMQSession_0_10.releaseForRollback.
> If you look at this you'll notice that it is called before the release is
> actually done. If AMQSession_0_8.releaseForRollback were to do the same, or
> preferrably if we were to move the syncDispatchQueue call up to
> AMQSession.java then I suspect this problem would go away without the need
> for a large refactor.
>
> --Rafael

Hi Rafi,

I saw the syncDispatchQueue method, but IMO waiting for the _queue to
clear is not the right solution for rollback, even on 0-10. When
rollback is called you don't want the dispatcher to process any more
messages. Your client may have MessageListeners set up that will take
a long time to process, so the Dispatcher should stop processing
messages and perform the rollback.

I've attached a new test for RollbackOrderTest that blocks because the
syncDispatchQueue waits for the Dispatcher to empty the _queue.
However, when called via the MessageListener.onMessage(), you end up
blocking the Dispatcher.
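
To make the blocking case concrete, here is a small stand-alone sketch. It is not the attached test and does not use the real AMQSession; ListenerSyncSketch and waitForQueueToDrain are made-up names, and the wait uses a timeout purely so the demo terminates instead of hanging.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

/** Hypothetical single-threaded dispatcher; a stand-in, not the real client code. */
class ListenerSyncSketch {
    private final BlockingQueue<Runnable> queue = new LinkedBlockingQueue<>();
    private final Thread dispatcher;

    ListenerSyncSketch() {
        dispatcher = new Thread(() -> {
            try {
                while (true) {
                    queue.take().run(); // each entry plays the role of onMessage()
                }
            } catch (InterruptedException e) {
                // Interrupted: let the dispatcher thread exit.
            }
        }, "dispatcher");
        dispatcher.setDaemon(true);
        dispatcher.start();
    }

    void deliver(Runnable onMessage) {
        queue.add(onMessage);
    }

    /** Queues a marker and waits for the dispatcher to reach it; false on timeout. */
    boolean waitForQueueToDrain(long timeoutMillis) {
        CountDownLatch marker = new CountDownLatch(1);
        queue.add(marker::countDown);
        try {
            return marker.await(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    /** Returns { drained when called inside the listener, drained when called from the application }. */
    static boolean[] demo() {
        ListenerSyncSketch session = new ListenerSyncSketch();
        AtomicBoolean drainedInsideListener = new AtomicBoolean(true);
        CountDownLatch listenerReturned = new CountDownLatch(1);

        // The listener runs ON the dispatcher thread, so the marker it queues
        // cannot be reached until the listener itself returns.
        session.deliver(() -> {
            drainedInsideListener.set(session.waitForQueueToDrain(200));
            listenerReturned.countDown();
        });

        boolean drainedFromApplication;
        try {
            listenerReturned.await(5, TimeUnit.SECONDS);
            drainedFromApplication = session.waitForQueueToDrain(5000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            session.dispatcher.interrupt();
        }
        return new boolean[] { drainedInsideListener.get(), drainedFromApplication };
    }
}
```

Calling ListenerSyncSketch.demo() should give { false, true }: the wait made from inside the listener can only time out, while the same wait from another thread completes. Without the timeout the first case would simply never return.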

I can understand that you may wish to do the block for 0-10 Failover
as there may still be useful data in the _queue. Also with failover
you are guaranteed that it is not going to be the Dispatcher thread
that is calling syncDispatchQueue.

I think extracting the Dispatcher will make it clearer to show that
the message processing varies in each protocol and will allow the
Session classes to focus on the creation and control of
Consumer/Producers. This will allow Dispatchers for each protocol to
be cleaner and highlight the protocol differences at failover: a 0-8
Dispatcher that simply drops all pending messages, compared to a 0-10
Dispatcher that attempts to process all the messages it has received.

I think it is a significant change but I think it is worth it as it
will improve the readability of the Client code as well as improving
the testability. Currently AMQSession is not exactly easy to unit test
so splitting it into more discrete components and unit testing them
will be beneficial.

The change boils down to:
- Extract Dispatcher logic to Dispatcher_0_8, Dispatcher_0_10
- Update AMQSession to use new Dispatcher
- Update Dispatcher to be able to stop processing the _queue of
  messages and perform rollback.


Cheers
Martin

-- 
Martin Ritchie

Re: [QPID-1871] Java Client Dispatcher change proposal.

Posted by Martin Ritchie <ri...@apache.org>.
2009/9/21 Rafael Schloming <ra...@redhat.com>:
> Rafael Schloming wrote:
>>
>> Martin Ritchie wrote:
>>>
>>> Hi,
>>>
>>> Following on from looking at QPID-1871. I believe that there is quite
>>> a significant change required to ensure that the message order or
>>> rollback is maintained.
>>>
>>> I propose that we extract the Dispatcher from AMQSession, which will
>>> simplify our biggest class (3100+ lines!) and show clear
>>> responsibility for incoming message processing. This will simplify
>>> rollback as the Dispatcher thread can be given full responsibility for
>>> clearing up the state that it knows best. Rather than the current
>>> situation where the calling thread does some work on AMQSession whilst
>>> the Dispatcher is running/stopping, then calls the the Dispatcher code
>>> directly clean up the remainder. All this while the Dispatcher may be
>>> processing a message.
>>>
>>> Change design posted here:
>>>
>>> http://cwiki.apache.org/confluence/display/qpid/0.6+Java+Client+Dispatcher+Changes
>>>
>>> Comments on the investigation, implications and design welcome.
>>> I'll capture the details on the wiki so we don't lose track of comments
>>
>> Hey Martin,
>>
>> Sorry I didn't pick up on this earlier. We hit this issue a while back in
>> the 0-10 code path. That's why we added RollbackOrderTest, and that's why it
>> doesn't fail for 0-10 brokers. You should probably check out
>> AMQSession.syncDispatchQueue, this method pretty much solves the problem
>> you're describing. It will block until the dispatch queue is empty... or
>> more precisely it will block until everything that is currently in the
>> dispatch queue has been processed by the dispatcher thread, which if done
>> after stopping incoming message flow means it will block until the dispatch
>> queue is empty.
>>
>> This method is used in a few places in the 0-10 codepath where it is
>> necessary to clean out the dispatch queue prior to proceeding (e.g. during
>> failover), however the key place here is AMQSession_0_10.releaseForRollback.
>> If you look at this you'll notice that it is called before the release is
>> actually done. If AMQSession_0_8.releaseForRollback were to do the same, or
>> preferrably if we were to move the syncDispatchQueue call up to
>> AMQSession.java then I suspect this problem would go away without the need
>> for a large refactor.
>
> The other thing you may want to look at is the Dispatchable interface. This
> is how syncDispatchQueue works, and I believe it is a similar concept to the
> ServiceRequests that you mention.

Similar, but the difference is that the ServiceRequest queue is
processed before any content in the main message delivery _queue.
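
Roughly what I mean, as a sketch rather than the design on the wiki: all the names below (PriorityDispatcherSketch, requestRollback and so on) are invented for illustration, and the rollback here just drops the pending messages and reports how many there were.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Deque;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

/** Hypothetical dispatcher that always serves service requests before messages. */
class PriorityDispatcherSketch implements Runnable {
    private final Object lock = new Object(); // guards both deques
    private final Deque<Runnable> serviceRequests = new ArrayDeque<>();
    private final Deque<String> messages = new ArrayDeque<>();
    private final Consumer<String> listener;

    PriorityDispatcherSketch(Consumer<String> listener) {
        this.listener = listener;
    }

    void deliver(String message) {
        synchronized (lock) {
            messages.add(message);
            lock.notifyAll();
        }
    }

    /** Queues a rollback; the future completes with the number of messages dropped. */
    CompletableFuture<Integer> requestRollback() {
        CompletableFuture<Integer> result = new CompletableFuture<>();
        Runnable request = () -> {
            int dropped;
            synchronized (lock) {
                dropped = messages.size();
                messages.clear();
            }
            result.complete(dropped);
        };
        synchronized (lock) {
            serviceRequests.add(request);
            lock.notifyAll();
        }
        return result;
    }

    @Override
    public void run() {
        try {
            while (true) {
                Runnable request;
                String message = null;
                synchronized (lock) {
                    while (serviceRequests.isEmpty() && messages.isEmpty()) {
                        lock.wait();
                    }
                    request = serviceRequests.poll(); // null when there is no service request
                    if (request == null) {
                        message = messages.poll();
                    }
                }
                // Run outside the lock so a slow listener does not block producers.
                if (request != null) {
                    request.run();
                } else {
                    listener.accept(message);
                }
            }
        } catch (InterruptedException e) {
            // Interrupted: let the dispatcher thread exit.
        }
    }

    /** Queues m1, m2 and a rollback before starting the dispatcher, then delivers m3. */
    static List<String> demo() {
        List<String> events = Collections.synchronizedList(new ArrayList<>());
        CountDownLatch delivered = new CountDownLatch(1);
        PriorityDispatcherSketch d = new PriorityDispatcherSketch(m -> {
            events.add("delivered " + m);
            delivered.countDown();
        });
        d.deliver("m1");
        d.deliver("m2");
        CompletableFuture<Integer> rollback = d.requestRollback();
        Thread t = new Thread(d, "dispatcher");
        t.setDaemon(true);
        t.start();
        try {
            events.add("rollback dropped " + rollback.get(5, TimeUnit.SECONDS));
            d.deliver("m3");
            delivered.await(5, TimeUnit.SECONDS);
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            t.interrupt();
        }
        return new ArrayList<>(events);
    }
}
```

In the demo the rollback is queued after m1 and m2 but is still handled first, so the events come out as "rollback dropped 2" followed by "delivered m3". Note that waiting on the returned future from inside a listener would still block, which is the concern Rafael raised.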

Martin

> --Rafael



-- 
Martin Ritchie



Re: [QPID-1871] Java Client Dispatcher change proposal.

Posted by Rafael Schloming <ra...@redhat.com>.
Rafael Schloming wrote:
> Martin Ritchie wrote:
>> Hi,
>>
>> Following on from looking at QPID-1871. I believe that there is quite
>> a significant change required to ensure that the message order or
>> rollback is maintained.
>>
>> I propose that we extract the Dispatcher from AMQSession, which will
>> simplify our biggest class (3100+ lines!) and show clear
>> responsibility for incoming message processing. This will simplify
>> rollback as the Dispatcher thread can be given full responsibility for
>> clearing up the state that it knows best. Rather than the current
>> situation where the calling thread does some work on AMQSession whilst
>> the Dispatcher is running/stopping, then calls the the Dispatcher code
>> directly clean up the remainder. All this while the Dispatcher may be
>> processing a message.
>>
>> Change design posted here:
>> http://cwiki.apache.org/confluence/display/qpid/0.6+Java+Client+Dispatcher+Changes 
>>
>>
>> Comments on the investigation, implications and design welcome.
>> I'll capture the details on the wiki so we don't lose track of comments
> 
> Hey Martin,
> 
> Sorry I didn't pick up on this earlier. We hit this issue a while back 
> in the 0-10 code path. That's why we added RollbackOrderTest, and that's 
> why it doesn't fail for 0-10 brokers. You should probably check out 
> AMQSession.syncDispatchQueue, this method pretty much solves the problem 
> you're describing. It will block until the dispatch queue is empty... or 
> more precisely it will block until everything that is currently in the 
> dispatch queue has been processed by the dispatcher thread, which if 
> done after stopping incoming message flow means it will block until the 
> dispatch queue is empty.
> 
> This method is used in a few places in the 0-10 codepath where it is 
> necessary to clean out the dispatch queue prior to proceeding (e.g. 
> during failover), however the key place here is 
> AMQSession_0_10.releaseForRollback. If you look at this you'll notice 
> that it is called before the release is actually done. If 
> AMQSession_0_8.releaseForRollback were to do the same, or preferrably if 
> we were to move the syncDispatchQueue call up to AMQSession.java then I 
> suspect this problem would go away without the need for a large refactor.

The other thing you may want to look at is the Dispatchable interface. 
This is how syncDispatchQueue works, and I believe it is a similar 
concept to the ServiceRequests that you mention.

--Rafael




Re: [QPID-1871] Java Client Dispatcher change proposal.

Posted by Rafael Schloming <ra...@redhat.com>.
Martin Ritchie wrote:
> Hi,
> 
> Following on from looking at QPID-1871. I believe that there is quite
> a significant change required to ensure that the message order or
> rollback is maintained.
> 
> I propose that we extract the Dispatcher from AMQSession, which will
> simplify our biggest class (3100+ lines!) and show clear
> responsibility for incoming message processing. This will simplify
> rollback as the Dispatcher thread can be given full responsibility for
> clearing up the state that it knows best. Rather than the current
> situation where the calling thread does some work on AMQSession whilst
> the Dispatcher is running/stopping, then calls the the Dispatcher code
> directly clean up the remainder. All this while the Dispatcher may be
> processing a message.
> 
> Change design posted here:
> http://cwiki.apache.org/confluence/display/qpid/0.6+Java+Client+Dispatcher+Changes
> 
> Comments on the investigation, implications and design welcome.
> I'll capture the details on the wiki so we don't lose track of comments

Hey Martin,

Sorry I didn't pick up on this earlier. We hit this issue a while back 
in the 0-10 code path. That's why we added RollbackOrderTest, and that's 
why it doesn't fail for 0-10 brokers. You should probably check out 
AMQSession.syncDispatchQueue, this method pretty much solves the problem 
you're describing. It will block until the dispatch queue is empty... or 
more precisely it will block until everything that is currently in the 
dispatch queue has been processed by the dispatcher thread, which if 
done after stopping incoming message flow means it will block until the 
dispatch queue is empty.
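
For anyone who has not looked at that code, the idea can be sketched as below. This is a simplified stand-in written for this mail, not a copy of AMQSession; the class name, the timeout parameter and the shape of the Dispatchable interface here are assumptions. The caller queues a marker behind everything already in the queue and waits for the dispatcher thread to reach it.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

/** Hypothetical stand-in for a session with a dispatch queue; not the real AMQSession. */
class SyncDispatchSketch {
    /** Anything the dispatcher thread can process (assumed shape, for illustration). */
    interface Dispatchable {
        void dispatch();
    }

    private final BlockingQueue<Dispatchable> queue = new LinkedBlockingQueue<>();
    private final Thread dispatcher;

    SyncDispatchSketch() {
        dispatcher = new Thread(() -> {
            try {
                while (true) {
                    queue.take().dispatch();
                }
            } catch (InterruptedException e) {
                // Interrupted: let the dispatcher thread exit.
            }
        }, "dispatcher");
        dispatcher.setDaemon(true);
        dispatcher.start();
    }

    void enqueue(Dispatchable d) {
        queue.add(d);
    }

    /** Blocks until everything queued before this call has been dispatched; false on timeout. */
    boolean syncDispatchQueue(long timeoutMillis) {
        CountDownLatch marker = new CountDownLatch(1);
        queue.add(marker::countDown); // the marker is itself just another Dispatchable
        try {
            return marker.await(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    void stop() {
        dispatcher.interrupt();
    }

    /** Queues three messages, syncs, and returns what had been delivered by then. */
    static List<String> demo() {
        SyncDispatchSketch session = new SyncDispatchSketch();
        List<String> delivered = Collections.synchronizedList(new ArrayList<>());
        for (String m : new String[] { "m1", "m2", "m3" }) {
            session.enqueue(() -> delivered.add(m));
        }
        boolean synced = session.syncDispatchQueue(5000);
        session.stop();
        return synced ? new ArrayList<>(delivered) : null;
    }
}
```

Because the queue is FIFO and there is a single dispatcher thread, the marker firing implies m1, m2 and m3 have all been dispatched. If nothing new is arriving, that is the same as the queue being empty.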

This method is used in a few places in the 0-10 codepath where it is 
necessary to clean out the dispatch queue prior to proceeding (e.g. 
during failover), however the key place here is 
AMQSession_0_10.releaseForRollback. If you look at this you'll notice
that syncDispatchQueue is called before the release is actually done. If
AMQSession_0_8.releaseForRollback were to do the same, or preferably if
we were to move the syncDispatchQueue call up to AMQSession.java then I 
suspect this problem would go away without the need for a large refactor.

--Rafael
