Posted to users@activemq.apache.org by Jim Lloyd <jl...@silvertailsystems.com> on 2008/12/19 00:11:31 UTC

Race condition with transport listener and failover transports??

We're building a client library to install into our customer's applications
for publishing messages to our applications via ActiveMQ. We need to ensure
that our client library never blocks any of the customer's application's
threads. We also want to ensure that our client library is resilient to
problems connecting to a broker.

Our strategy is to use a failover transport. It looks like we can use the
default failover transport options as documented on the
failover-transport-reference <http://activemq.apache.org/failover-transport-reference.html> page.
We're registering a TransportListener so that we can avoid blocking as
noted at the bottom of the failover transport reference page.
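
The non-blocking idea described above can be sketched roughly as follows. This is a minimal illustration, not code from this thread: GuardedPublisher, tryPublish and sentCount are hypothetical names, and a counter stands in for the real JMS publisher.

```java
import java.util.concurrent.atomic.AtomicInteger;

/** Sketch of a publish guard; the names are hypothetical, not ActiveMQ API. */
class GuardedPublisher {
    // Flipped by the listener callbacks, read by application threads,
    // so it is volatile to make updates visible across threads.
    private volatile boolean connected = false;

    // Stands in for the real publisher; counts messages that were "sent".
    private final AtomicInteger sent = new AtomicInteger();

    // In a real client these would be the TransportListener callbacks.
    void transportInterupted() { connected = false; }

    void transportResumed() { connected = true; }

    /** Returns false at once instead of blocking when the transport is down. */
    boolean tryPublish(String message) {
        if (!connected) {
            return false; // the caller decides whether to drop or buffer
        }
        sent.incrementAndGet(); // real code would call the JMS publisher here
        return true;
    }

    int sentCount() { return sent.get(); }
}
```

The flag only narrows the window: the transport can still drop between the check and the send, so this reduces the chance of blocking rather than ruling it out.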

I have noticed a possible race condition and I wonder if there is something
I am overlooking.

Our class that manages our connection looks something like this:

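import java.io.IOException;
import javax.jms.JMSException;
import javax.jms.Session;
import javax.jms.TopicConnection;
import javax.jms.TopicPublisher;
import javax.jms.TopicSession;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.transport.TransportListener;

class TopicPublisherClient implements TransportListener {
    // Written by the transport thread, read by application threads.
    private volatile boolean mIsConnected = false;
    private TopicConnection mConnection;
    private TopicSession mSession;
    private TopicPublisher mPublisher;

    // Remaining TransportListener methods; not used in this example.
    public void onCommand(Object command) { }
    public void onException(IOException error) { }

    // "Interupted" is the spelling used by the TransportListener interface.
    public void transportInterupted() { mIsConnected = false; }

    public void transportResumed() { mIsConnected = true; }

    private void initialize(String brokerURL) throws JMSException {
        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(brokerURL);
        mConnection = factory.createTopicConnection();
        // The listener is registered only after the connection already exists.
        ((ActiveMQConnection) mConnection).addTransportListener(this);
        mSession = mConnection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
        createTopics(); // creates several Topic instances, details not important here
        mPublisher = mSession.createPublisher(null);
    }
}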

With the code as above, the transportResumed() callback is called before
initialize() exits. However, the callback is called asynchronously, so as far
as I can tell we can't rely on when it will be called.

In fact, the race condition is even more problematic. If I insert a
Thread.sleep(1000) between the call to createTopicConnection() and the call
to addTransportListener(), then the callback is never called. If a sleep
can cause this, then it can also happen without the sleep, whenever the
connection is established before the listener is registered. Is there
any way for us to guard against this? It seems to me that
createTopicConnection() should have a variant that takes the
TransportListener. That variant could be free of any race
conditions. Or am I missing some other solution?

Thanks,
Jim Lloyd

Re: Race condition with transport listener and failover transports??

Posted by Jim Lloyd <jl...@silvertailsystems.com>.
Thanks, Gary. That works for me. :)

On Mon, Dec 22, 2008 at 4:07 AM, Gary Tully <ga...@gmail.com> wrote:

> appended a little comment to the doc to use
> ActiveMQConnectionFactory.setTransportListener().
> Thanks :-)
>

Re: Race condition with transport listener and failover transports??

Posted by Gary Tully <ga...@gmail.com>.
I appended a short note to the doc recommending
ActiveMQConnectionFactory.setTransportListener().
Thanks :-)


-- 
http://blog.garytully.com

Open Source SOA
http://FUSESource.com

Re: Race condition with transport listener and failover transports??

Posted by Jim Lloyd <jl...@silvertailsystems.com>.
To answer my own question, it turns out that ActiveMQConnectionFactory has a
setTransportListener method that can be called before createTopicConnection
is called.
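
The difference in ordering can be reproduced without a broker. The sketch below is only a stand-in: ResumeListener, FakeConnection, FakeFactory and ListenerRaceDemo are hypothetical classes, not ActiveMQ code, and the assumption is that the "resumed" event fires once, on a background thread, to whichever listeners are registered at that moment. The fake setTransportListener and addTransportListener play the roles of the ActiveMQConnectionFactory and ActiveMQConnection methods of the same names.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicBoolean;

/** Minimal stand-in for a transport listener; not the ActiveMQ interface. */
interface ResumeListener {
    void transportResumed();
}

/** Hypothetical connection whose transport connects on a background thread. */
class FakeConnection {
    private final List<ResumeListener> listeners = new CopyOnWriteArrayList<>();
    private final Thread connector;

    FakeConnection(ResumeListener initial) {
        if (initial != null) {
            listeners.add(initial);
        }
        // The event fires exactly once, to whoever is registered at that time.
        connector = new Thread(() -> {
            for (ResumeListener l : listeners) {
                l.transportResumed();
            }
        });
        connector.start();
    }

    void addTransportListener(ResumeListener l) {
        listeners.add(l);
    }

    /** Waits for the background connect to finish (plays the role of the sleep). */
    void awaitConnected() {
        try {
            connector.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }
}

/** Hypothetical factory that hands its listener to the connection up front. */
class FakeFactory {
    private ResumeListener listener;

    void setTransportListener(ResumeListener l) {
        listener = l;
    }

    FakeConnection createConnection() {
        return new FakeConnection(listener);
    }
}

class ListenerRaceDemo {
    /** Listener added after the transport has connected: the event is missed. */
    static boolean lateListenerSeesResume() {
        AtomicBoolean connected = new AtomicBoolean(false);
        FakeConnection c = new FakeFactory().createConnection();
        c.awaitConnected(); // the transport wins the race
        c.addTransportListener(() -> connected.set(true));
        return connected.get();
    }

    /** Listener set on the factory: registered before the transport starts. */
    static boolean factoryListenerSeesResume() {
        AtomicBoolean connected = new AtomicBoolean(false);
        FakeFactory f = new FakeFactory();
        f.setTransportListener(() -> connected.set(true));
        FakeConnection c = f.createConnection();
        c.awaitConnected();
        return connected.get();
    }

    public static void main(String[] args) {
        System.out.println("late listener saw resume:    " + lateListenerSeesResume());
        System.out.println("factory listener saw resume: " + factoryListenerSeesResume());
    }
}
```

In this model the late listener never sees the event, while the listener set on the factory always does, because it is in place before the background thread starts. Whether the real factory gives the same guarantee depends on its implementation.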

I also deduced that the race condition is unlikely in my previous example
because the failover transport uses the initialReconnectDelay parameter to
impose a sleep before starting the connection. I wanted to verify this so I
did these tests:

   1. Using essentially the same code I posted previously, I changed
   initialReconnectDelay to 1ms instead of the default 10ms and inserted a
   sleep of 5ms between the calls to createTopicConnection and
   addTransportListener. That was enough to trigger the race condition.
   2. I then added a call to setTransportListener on the factory before
   creating the connection, leaving the other code as is. That seemed to be
   enough to fix the race condition.
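
For reference, failover options such as initialReconnectDelay are set as query parameters on the broker URL. A sketch of the URL for test 1, assuming a single broker at localhost:61616 (the host and port are placeholders, not taken from the actual test):

```
failover:(tcp://localhost:61616)?initialReconnectDelay=1
```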

I haven't yet traced through the source to verify that using
ActiveMQConnectionFactory.setTransportListener is enough to eliminate any
chance of a race condition, but I expect it is. The only other thing I think
would be useful at this point is a note about this in the
documentation, perhaps near the warning at the bottom of the
failover-transport-reference <http://activemq.apache.org/failover-transport-reference.html> page.

Jim Lloyd

