You are viewing a plain text version of this content. The canonical link for it is here.
Posted to users@activemq.apache.org by Ryan Stewart <rd...@gmail.com> on 2008/05/24 00:32:03 UTC

VM transport not using pass-by-reference

I'm trying to use an embedded ActiveMQ instance for some lightweight message
handling. According to the 
http://activemq.apache.org/vm-transport-reference.html VM Transport
Reference  and 
http://activemq.apache.org/how-should-i-use-the-vm-transport.html this FAQ ,
the VM transport should pass messages by reference, giving some nice, lean
performance.

I'm using version 5.1.0 final. All in one VM, I create one producer and then
x durable subscribers to a topic. They all use vm://default for the
connection URI, which, according to the docs, starts a single embedded
broker.

Using a profiler, I see that for every message to the topic, 2x message
objects are created (ActiveMQTextMessage in this case). i.e. If I have 10
subscribers and send 100 messages, I end up with 2001 messages in memory if
none are consumed. That's 1 message cached in the producer for sending and
2000 others that result from the sends. I've set copyMessageOnSend to false
in both the producer and consumers, but 1) looking at the source of
ActiveMQConnection, this setting seems to not be used, and 2) if it were
used, I suspect it should only need to be set on producers.

In addition to the number of messages being created, they are going through
some sort of marshaling/unmarshaling process, which causes their related
ActiveMQTopic object to also be replicated a number of times: once for every
two messages that exist, in fact. I assume something like this is happening:

1) producer puts message in topic
2) topic creates a copy of each message for each consumer and pushes to
consumer via marshaling mechanism
3) consumer unmarshals message resulting in another message instance plus a
topic instance

That would mean that step 2 creates one message per subscriber, and step 3
creates one message and one topic per subscriber, which accounts for the
numbers I'm seeing.

My main question is how to get the number of message objects down. Ideally,
I'd like to see it at 1 since the producer is just sending the same message
over and over right now. Any hints?
-- 
View this message in context: http://www.nabble.com/VM-transport-not-using-pass-by-reference-tp17442075s2354p17442075.html
Sent from the ActiveMQ - User mailing list archive at Nabble.com.


Re: VM transport not using pass-by-reference

Posted by Ryan Stewart <rd...@gmail.com>.
Hi Rob. I call connection.setCopyMessageOnSend(false) on the producer and
consumer connection objects, and yes, the producer and consumers all use the
vm transport.

See my second post in this thread for sample code demonstrating the problem.
(Actually, I just noticed that in that code I have "copyOnSend" set to true
for the consumers, but I've run tests with it set to false on consumers,
too.)


rajdavies wrote:
> 
> Hi Ryan,
> 
> how are you setting copyMessageOnSend - and are the consumers using  
> vm://transport too ?
> 
-- 
View this message in context: http://www.nabble.com/VM-transport-not-using-pass-by-reference-tp17442075s2354p17527119.html
Sent from the ActiveMQ - User mailing list archive at Nabble.com.


Re: VM transport not using pass-by-reference

Posted by Rob Davies <ra...@gmail.com>.
Hi Ryan,

how are you setting copyMessageOnSend - and are the consumers using  
vm://transport too ?

cheers,

Rob

http://open.iona.com/products/enterprise-activemq
http://rajdavies.blogspot.com/



On 23 May 2008, at 23:32, Ryan Stewart wrote:

>
> I'm trying to use an embedded ActiveMQ instance for some lightweight  
> message
> handling. According to the
> http://activemq.apache.org/vm-transport-reference.html VM Transport
> Reference  and
> http://activemq.apache.org/how-should-i-use-the-vm-transport.html  
> this FAQ ,
> the VM transport should pass messages by reference, giving some  
> nice, lean
> performance.
>
> I'm using version 5.1.0 final. All in one VM, I create one producer  
> and then
> x durable subscribers to a topic. They all use vm://default for the
> connection URI, which, according to the docs, starts a single embedded
> broker.
>
> Using a profiler, I see that for every message to the topic, 2x  
> message
> objects are created (ActiveMQTextMessage in this case). i.e. If I  
> have 10
> subscribers and send 100 messages, I end up with 2001 messages in  
> memory if
> none are consumed. That's 1 message cached in the producer for  
> sending and
> 2000 others that result from the sends. I've set copyMessageOnSend  
> to false
> in both the producer and consumers, but 1) looking at the source of
> ActiveMQConnection, this setting seems to not be used, and 2) if it  
> were
> used, I suspect it should only need to be set on producers.
>
> In addition to the number of messages being created, they are going  
> through
> some sort of marshaling/unmarshaling process, which causes their  
> related
> ActiveMQTopic object to also be replicated a number of times: once  
> for every
> two messages that exist, in fact. I assume something like this is  
> happening:
>
> 1) producer puts message in topic
> 2) topic creates a copy of each message for each consumer and pushes  
> to
> consumer via marshaling mechanism
> 3) consumer unmarshals message resulting in another message instance  
> plus a
> topic instance
>
> That would mean that step 2 creates one message per subscriber, and  
> step 3
> creates one message and one topic per subscriber, which accounts for  
> the
> numbers I'm seeing.
>
> My main question is how to get the number of message objects down.  
> Ideally,
> I'd like to see it at 1 since the producer is just sending the same  
> message
> over and over right now. Any hints?
> -- 
> View this message in context: http://www.nabble.com/VM-transport-not-using-pass-by-reference-tp17442075s2354p17442075.html
> Sent from the ActiveMQ - User mailing list archive at Nabble.com.
>



Re: VM transport not using pass-by-reference

Posted by bwtaylor <br...@yahoo.com>.
I traced through some code. It looks like ActiveMQSession.send() does honor
copyMessageOnSend as noted. But this only explains why you see 2001, not
2002 messages. Message consumption on a topic happens in a two step process: 
 1) During dispatch, a MessageDispatch object is enqueued to consumer's
unconsumedMessage MessageDispatchChannel, then 
 2) During receive, the MessageDispatch is dequeued from unconsumedMessage

The message is dispatched via a call to
ActiveMQMessageConsumer.dispatch(MessageDispatch md). If the consumer has a
MessageListener, a copy of the message is sent to it after calls to
createActiveMQMessage(md) and listener.onMessage(message). Then the original
is enqueued to unconsumedMessage. I don't think you actually have such a
listener, so there's a 3rd copy out there that you are missing out on :-)
The copy happens on the first line of createActiveMQMessage:

    private ActiveMQMessage createActiveMQMessage(final MessageDispatch md)
throws JMSException {
        ActiveMQMessage m = (ActiveMQMessage)md.getMessage().copy();
        [... stuff omitted]
        return m;
    }

The dispatch call happens after a long sequence of dispatch calls that go
all the way back to the ActiveMQConnection object. It's onCommand() method
uses a visitor pattern and the CommandVisitorAdapter it uses has this
code...

                    public Response processMessageDispatch(MessageDispatch
md) throws Exception {
                        ActiveMQDispatcher dispatcher =
dispatchers.get(md.getConsumerId());
                        if (dispatcher != null) {
                            // Copy in case a embedded broker is dispatching
via
                            // vm://
                            // md.getMessage() == null to signal end of
queue
                            // browse.
                            Message msg = md.getMessage();
                            if (msg != null) {
                                msg = msg.copy();
                                msg.setReadOnlyBody(true);
                                msg.setReadOnlyProperties(true);
                               
msg.setRedeliveryCounter(md.getRedeliveryCounter());
                                msg.setConnection(ActiveMQConnection.this);
                                md.setMessage(msg);
                            }
                            dispatcher.dispatch(md);
                        }
                        return null;
                    }

According to the comment, this copy is a feature of vm:// transport not a
bug :-)

The other copy is easier to find. When the message is consumed we instead
get a call to ActiveMQMessageConsumer.receive(), which dequeues a
MessageDispatch md from the MessageDispatchChannel unconsumedMessages then
and calls createActiveMQMessage(md), which as we saw above makes a copy.

So 1 copy on dispatch, 1 copy on receive, and a 3rd copy onMessage() if the
consumer has a listener. I have no insight into the motivations here, I just
eyeballed the code and described the relevant parts.

Some good news: message.copy() is a shallow copy: the content (and
everything else) within the message is copied by reference. See
org.apache.activemq.command.Message.copy(Message copy) .
-- 
View this message in context: http://www.nabble.com/VM-transport-not-using-pass-by-reference-tp17442075s2354p17662490.html
Sent from the ActiveMQ - User mailing list archive at Nabble.com.


Re: VM transport not using pass-by-reference

Posted by Ryan Stewart <rd...@gmail.com>.

ttmdev wrote:
> 
> FYI - copyMessageOnSend is being used within the ActiveMQSession.send()
> method. 
> 
> ...
> if (connection.isCopyMessageOnSend()) {
>    msg = (ActiveMQMessage)msg.copy();
> }
> ...
> 
> Joe
> 

Cool. Thanks for pointing that out.
-- 
View this message in context: http://www.nabble.com/VM-transport-not-using-pass-by-reference-tp17442075s2354p17538260.html
Sent from the ActiveMQ - User mailing list archive at Nabble.com.


Re: VM transport not using pass-by-reference

Posted by ttmdev <jo...@ttmsolutions.com>.
FYI - copyMessageOnSend is being used within the ActiveMQSession.send()
method. 

...
if (connection.isCopyMessageOnSend()) {
   msg = (ActiveMQMessage)msg.copy();
}
...

Joe


Ryan Stewart wrote:
> 
> I'm trying to use an embedded ActiveMQ instance for some lightweight
> message handling. According to the 
> http://activemq.apache.org/vm-transport-reference.html VM Transport
> Reference  and 
> http://activemq.apache.org/how-should-i-use-the-vm-transport.html this FAQ
> , the VM transport should pass messages by reference, giving some nice,
> lean performance.
> 
> I'm using version 5.1.0 final. All in one VM, I create one producer and
> then x durable subscribers to a topic. They all use vm://default for the
> connection URI, which, according to the docs, starts a single embedded
> broker.
> 
> Using a profiler, I see that for every message to the topic, 2x message
> objects are created (ActiveMQTextMessage in this case). i.e. If I have 10
> subscribers and send 100 messages, I end up with 2001 messages in memory
> if none are consumed. That's 1 message cached in the producer for sending
> and 2000 others that result from the sends. I've set copyMessageOnSend to
> false in both the producer and consumers, but 1) looking at the source of
> ActiveMQConnection, this setting seems to not be used, and 2) if it were
> used, I suspect it should only need to be set on producers.
> 
> In addition to the number of messages being created, they are going
> through some sort of marshaling/unmarshaling process, which causes their
> related ActiveMQTopic object to also be replicated a number of times: once
> for every two messages that exist, in fact. I assume something like this
> is happening:
> 
> 1) producer puts message in topic
> 2) topic creates a copy of each message for each consumer and pushes to
> consumer via marshaling mechanism
> 3) consumer unmarshals message resulting in another message instance plus
> a topic instance
> 
> That would mean that step 2 creates one message per subscriber, and step 3
> creates one message and one topic per subscriber, which accounts for the
> numbers I'm seeing.
> 
> My main question is how to get the number of message objects down.
> Ideally, I'd like to see it at 1 since the producer is just sending the
> same message over and over right now. Any hints?
> 

-- 
View this message in context: http://www.nabble.com/VM-transport-not-using-pass-by-reference-tp17442075s2354p17536264.html
Sent from the ActiveMQ - User mailing list archive at Nabble.com.


Re: VM transport not using pass-by-reference

Posted by Ryan Stewart <rd...@gmail.com>.

Ryan Stewart wrote:
> 
> I'm trying to use an embedded ActiveMQ instance for some lightweight
> message handling...
> 

*bump* again.

Is there no answer to this?
-- 
View this message in context: http://www.nabble.com/VM-transport-not-using-pass-by-reference-tp17442075s2354p17648713.html
Sent from the ActiveMQ - User mailing list archive at Nabble.com.


Re: VM transport not using pass-by-reference

Posted by Ryan Stewart <rd...@gmail.com>.

Ryan Stewart wrote:
> 
> I'm trying to use an embedded ActiveMQ instance for some lightweight
> message handling...
> 

*bump*

Anyone?
-- 
View this message in context: http://www.nabble.com/VM-transport-not-using-pass-by-reference-tp17442075s2354p17520875.html
Sent from the ActiveMQ - User mailing list archive at Nabble.com.


Re: VM transport not using pass-by-reference

Posted by Ryan Stewart <rd...@gmail.com>.

Ryan Stewart wrote:
> 
> I'm trying to use an embedded ActiveMQ instance for some lightweight
> message handling...
> 

Here is a simple test case that exhibits the problem. Unfortunately you'll
need some sort of profiler to see the actual problem. When I run this test,
I get 2001 ActiveMQTextMessages. If I use a normal consumer instead of a
durable subscriber (call session.createConsumer instead of
session.createDurableSubscriber), I only get 1001 messages.

Test code follows:
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

public class MessageByReferenceTest {
    
    private ActiveMQConnection producerConnection;
    private List<ActiveMQConnection> consumerConnections = new
ArrayList<ActiveMQConnection>();
    private MessageProducer producer;
    private TextMessage message;

    @Before
    public void setUp() throws JMSException {
        ActiveMQConnectionFactory factory = new
ActiveMQConnectionFactory("vm://default");
        
        producerConnection = (ActiveMQConnection)
factory.createConnection();
        producerConnection.setCopyMessageOnSend(false);
        Session producerSession = producerConnection.createSession(false,
Session.AUTO_ACKNOWLEDGE);
        Destination producerDestination =
producerSession.createTopic("test.topic");
        producer = producerSession.createProducer(producerDestination);
        message = producerSession.createTextMessage("some text");
        
        for (int i = 0; i < 10; i++) {
            ActiveMQConnection consumerConnection = (ActiveMQConnection)
factory.createConnection();
            consumerConnections.add(consumerConnection);
            consumerConnection.setCopyMessageOnSend(true);
            consumerConnection.setClientID(Integer.toString(i));
            Session consumerSession =
consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            Topic consumerDestination =
consumerSession.createTopic("test.topic");
            MessageConsumer consumer =
consumerSession.createDurableSubscriber(consumerDestination, "foo");
            consumerConnection.start();
        }
    }
    
    @After
    public void tearDown() throws JMSException {
        producerConnection.stop();
        for (Connection connection : consumerConnections) {
            connection.stop();
        }
    }
    
    @Test
    public void testPassingByReference() throws JMSException, IOException {
        for (int i = 0; i < 100; i++) {
            producer.send(message);
        }
        System.out.println("Check the number of ActiveMQTextMessage objects
in memory now; enter to exit");
        System.in.read();
    }
}

-- 
View this message in context: http://www.nabble.com/VM-transport-not-using-pass-by-reference-tp17442075s2354p17444209.html
Sent from the ActiveMQ - User mailing list archive at Nabble.com.