You are viewing a plain text version of this content. The canonical link for it is here.
Posted to users@activemq.apache.org by frbo <fr...@imc.nl> on 2010/04/09 15:30:35 UTC

onMessage() called by multiple threads - synchronization needed?

I'm sorry if this question has been answered before, but it is not clear to
me yet what the safe way is to update class fields from the 'onMessage()'
method of an asynchronous consumer.

I am using ActiveMQ 5.3.0 with the default settings (Java 1.6). The
following example code shows that 'onMessage()' is called from more than one
thread, but not at the same time. The code seems to work ok without any
additional synchronization: the 'receivedMessages' list contains all the
messages I'm sending to the queue. But maybe I'm just being lucky...

If I understand the Java memory model correctly, then you must use
synchronization or 'volatile' variables to make sure that changes made to a
field by one thread are always visible by the other threads. 
So I wonder: should I make use a 'synchronized' version for the collections
that are accessed in the 'onMessage' threads? 

Thanks!

-- Frank 

--------

import org.apache.activemq.*;

import javax.jms.*;
import javax.jms.Message;
import javax.jms.Queue;
import java.util.*;
import java.util.concurrent.locks.*;

public class ThreadSafeConsumer
{
    private static final String theURL =
"vm:(broker:(tcp://localhost:61616)?persistent=true)";

    // QUESTION: should I use a 'synchronized' version of these
collections???
    private final Set<Integer> theThreads = new HashSet<Integer>();
    private final List<String> theReceivedMessages = new
ArrayList<String>();

    private final Lock theLock = new ReentrantLock();

    public static void main(String[] anArgs) throws JMSException
    {
        new ThreadSafeConsumer().run();
    }

    public void run() throws JMSException
    {
        ConnectionFactory myFactory = new ActiveMQConnectionFactory(theURL);
        Connection myConnection = myFactory.createConnection();

        Session mySession = myConnection.createSession(false,
Session.AUTO_ACKNOWLEDGE);

        Queue myQueue = mySession.createQueue("MyTestInput");
        MessageConsumer myConsumer = mySession.createConsumer(myQueue);

        myConsumer.setMessageListener(new MessageListener()
        {
            @Override
            public void onMessage(Message message)
            {
                try
                {
                    handleIncomingMessage(message);
                }
                catch (JMSException e)
                {
                    e.printStackTrace();
                }
            }
        });

        myConnection.start();
    }

    private void handleIncomingMessage(Message aMessage) throws JMSException
    {
        if (theThreads.add(System.identityHashCode(Thread.currentThread()))
&& theThreads.size() > 1)
        {
            System.out.println("More than one thread is calling onMessage: "
+ theThreads);
        }

        if (!theLock.tryLock())
        {
            System.err.println("This would be really bad, but fortunately it
doesn't happen");
        }
        try
        {
            theReceivedMessages.add(((TextMessage)aMessage).getText());
        }
        finally
        {
            theLock.unlock();
        }
    }
}

-- 
View this message in context: http://old.nabble.com/onMessage%28%29-called-by-multiple-threads---synchronization-needed--tp28190843p28190843.html
Sent from the ActiveMQ - User mailing list archive at Nabble.com.


Re: onMessage() called by multiple threads - synchronization needed?

Posted by Denis Bazhenov <ba...@farpost.com>.
MessageListener interface strictly define behavior of listeners in multithreaded environment (http://java.sun.com/j2ee/1.4/docs/api/javax/jms/MessageListener.html). Session should serialize dispatching of messages to listener, so you don't need any additional synchronization primitives over MessageListener. But you should not register one MessageListener in several Sessions at the same time.

Denis Bazhenov



On Apr 10, 2010, at 12:30 AM, frbo wrote:

> I wonder: should I make use a 'synchronized' version for the collections
> that are accessed in the 'onMessage' t