You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@activemq.apache.org by cafe <rm...@estudiantes.uci.cu> on 2007/02/17 16:32:01 UTC

I can’t receive more than 2 messages simultaneously with DefaultMessageListenerContainer

I have a listener (in the server side) who receive request for send files
(Streams):

public class RequestUpdateListener implements SessionAwareMessageListener {

        private JmsTemplate jmsTemplate;

        private String fileUpdateLocation;
       
        public void onMessage(Message request, Session session) throws
JMSException {


                ActiveMQConnectionFactory connectionFactory =
((PooledConnectionFactory) jmsTemplate
                               
.getConnectionFactory()).getConnectionFactory();

                FileInputStream fis = null;
                try {
                        fis = new FileInputStream(new
File(fileUpdateLocation));
                        StreamSender ss = new StreamSender();
                        ss.send(connectionFactory, fis,uuid);

                } catch (Exception e) {
                        e.printStackTrace();
                }
       
        }
.............

and a class:

public class StreamSender {

        public void send(ConnectionFactory connectionFactory,
FileInputStream fis, String uuid) {
               
                try {

                        connection = (ActiveMQConnection)
connectionFactory.createConnection();
                        session = connection.createSession(true,
Session.AUTO_ACKNOWLEDGE);
                        Destination destination =
session.createQueue("TransferQueue");

                        int deliveryMode = DeliveryMode.NON_PERSISTENT;
                        int priority = 1;
                        long timeToLive = 0;

                        outputStreamMQ =
connection.createOutputStream(destination, prop, deliveryMode, priority,
                                        timeToLive);

               
                        int total = 0;
                        int reads;
                        byte[] array = new byte[8 * 1024];

                        while ((reads = fis.read(array)) != -1) {
                                outputStreamMQ.write(array, 0, reads);
                        }

                        JmsUtils.commitIfNecessary(session);
......

and again the spring container:

        <bean id="updateContainer"
               
class="org.springframework.jms.listener.DefaultMessageListenerContainer">
                <property name="concurrentConsumers" value="50" />
                <property name="connectionFactory" ref="jmsFactory" />
                <property name="destination" ref="requestUpdateQueue" />
                <property name="messageListener" ref="requestUpdateListener"
/>
        </bean>


 
and in the client side I have a listener who receive the Stream:

public class NotificationListener implements SessionAwareMessageListener {


        public void onMessage(Message response, Session session) throws
JMSException {

                String message = "xml descriptor";
                jmsTemplate.convertAndSend(requestUpdateQueue, message, new
MessagePostProcessor() {
                        public Message postProcessMessage(Message message)
throws JMSException {
                                ....
                                return message;
                        }
                });
               
                StreamReceiver receiver = new StreamReceiver();
                receiver.setConnectionFactory(connectionFactory);
                receiver.receive();

        }

public class StreamReceiver {


        public void receive() {
                ...
                try {

                        connection = (ActiveMQConnection)
connectionFactory.createConnection();
                        session = connection.createSession(true,
Session.AUTO_ACKNOWLEDGE);
                        Destination destination =
session.createQueue("TransferQueue");
                        fisMQ = connection.createInputStream(destination,
selector);
                        fos = new FileOutputStream("...");

                        int reads;
                        int total = 0;
                        byte[] array = new byte[8 * 1024];

                        while ((reads = fisMQ.read(array)) != -1) {
                                fos.write(array, 0, reads);
                                total += reads;
                                        }
                        JmsUtils.commitIfNecessary(session);
.........

Now the problem arise when more than 2 client running in different machines
does a request simultaneously, the server (StreamSender) stop the transfer
without any exception 

what could be this? 
-- 
View this message in context: http://www.nabble.com/I-can%E2%80%99t-receive-more-than-2-messages-simultaneously-with-DefaultMessageListenerContainer-tf3245123s2354.html#a9020869
Sent from the ActiveMQ - Dev mailing list archive at Nabble.com.


Re: [VOTE RESULT] I can’t receive more than 2 messages simultaneously with DefaultMessageListenerContainer

Posted by Brian McCallister <br...@apache.org>.
Vote result?

-Brian

On Feb 19, 2007, at 11:26 AM, cafe wrote:

>
> I have replied this in the spring forum too...
>
> Thx Strachan, following your tread I have localized where is the  
> problem buy
> not why it happen
>
> The problem is not in the listener of which will be running multiple
> instances for consuming in a concurrent way the incoming message. I  
> have
> proved that commenting the code inside the onMessage method and  
> putting
> Thread.sleep(60000); with this test the message were consumed  
> concurrently.
>
>
> Now the problem is when I attempt to send a Stream to a queue  
> inside the
> onMessage method. When more than 2 onMessage methods begin sending  
> Streams
> concurrently to the same Queue the transferring stop and stay in  
> pause.
>
> What could be this?
>
> Surprisingly when there are 2 Thread sending Streams to the same  
> queue all
> work well.
>
> Does ActiveMQ support dispatch Streams concurrently?
> __________________
> regards...
> -- 
> View this message in context: http://www.nabble.com/I-can%E2%80%99t- 
> receive-more-than-2-messages-simultaneously-with- 
> DefaultMessageListenerContainer-tf3245123s2354.html#a9048143
> Sent from the ActiveMQ - Dev mailing list archive at Nabble.com.
>


Re: [VOTE RESULT] I can’t receive more than 2 messages simultaneously with DefaultMessageListenerContainer

Posted by cafe <rm...@estudiantes.uci.cu>.
Hi James: I’m more close to the problem.

Now I can send multiple streams concurrently but under 2 Mb, when I try to
send streams of size 2 Mb or bigger the transferring is paused without any
complaint. 

When i create the output stream the DeliveryMode is not persistent and I
have configured the memory of the broker too to a high value and nothing 
...

What could be this?
-- 
View this message in context: http://www.nabble.com/I-can%E2%80%99t-receive-more-than-2-messages-simultaneously-with-DefaultMessageListenerContainer-tf3245123s2354.html#a9076585
Sent from the ActiveMQ - Dev mailing list archive at Nabble.com.


Re: [VOTE RESULT] I can’t receive more than 2 messages simultaneously with DefaultMessageListenerContainer

Posted by cafe <rm...@estudiantes.uci.cu>.

James.Strachan wrote:
> 
> Are you using separate sessions & producers for each thread?
> 
> James
> -------
> http://radio.weblogs.com/0112098/
> 
> 

yes. 

This is the code inside the listener of which there are many instance
running concurrently  

connection = (ActiveMQConnection) connectionFactory.createConnection();
session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);

and this is the line where I obtain the OutputStream if serve of any help... 

outputStreamMQ = connection.createOutputStream(destination, prop,
deliveryMode, priority,timeToLive);


I wonder why it work when there are 2 thread writing Stream concurrently and
why it not work when there are more than 2 thread?

could be this a bug?

-- 
View this message in context: http://www.nabble.com/I-can%E2%80%99t-receive-more-than-2-messages-simultaneously-with-DefaultMessageListenerContainer-tf3245123s2354.html#a9069874
Sent from the ActiveMQ - Dev mailing list archive at Nabble.com.


Re: [VOTE RESULT] I can’t receive more than 2 messages simultaneously with DefaultMessageListenerContainer

Posted by cafe <rm...@estudiantes.uci.cu>.
James, I found the solution to my problem here
http://activemq.apache.org/what-is-the-prefetch-limit-for.html 

sorry for the lost time here and thanks...
-- 
View this message in context: http://www.nabble.com/I-can%E2%80%99t-receive-more-than-2-messages-simultaneously-with-DefaultMessageListenerContainer-tf3245123s2354.html#a9088189
Sent from the ActiveMQ - Dev mailing list archive at Nabble.com.


Re: [VOTE RESULT] I can’t receive more than 2 messages simultaneously with DefaultMessageListenerContainer

Posted by James Strachan <ja...@gmail.com>.
On 2/19/07, cafe <rm...@estudiantes.uci.cu> wrote:
>
> I have replied this in the spring forum too...
>
> Thx Strachan, following your tread I have localized where is the problem buy
> not why it happen
>
> The problem is not in the listener of which will be running multiple
> instances for consuming in a concurrent way the incoming message. I have
> proved that commenting the code inside the onMessage method and putting
> Thread.sleep(60000); with this test the message were consumed concurrently.
>
>
> Now the problem is when I attempt to send a Stream to a queue inside the
> onMessage method. When more than 2 onMessage methods begin sending Streams
> concurrently to the same Queue the transferring stop and stay in pause.
>
> What could be this?
>
> Surprisingly when there are 2 Thread sending Streams to the same queue all
> work well.
>
> Does ActiveMQ support dispatch Streams concurrently?

Yes.

Are you using separate sessions & producers for each thread?

-- 

James
-------
http://radio.weblogs.com/0112098/

Re: [VOTE RESULT] I can’t receive more than 2 messages simultaneously with DefaultMessageListenerContainer

Posted by cafe <rm...@estudiantes.uci.cu>.
I have replied this in the spring forum too...

Thx Strachan, following your tread I have localized where is the problem buy
not why it happen

The problem is not in the listener of which will be running multiple
instances for consuming in a concurrent way the incoming message. I have
proved that commenting the code inside the onMessage method and putting
Thread.sleep(60000); with this test the message were consumed concurrently.


Now the problem is when I attempt to send a Stream to a queue inside the
onMessage method. When more than 2 onMessage methods begin sending Streams
concurrently to the same Queue the transferring stop and stay in pause.

What could be this?

Surprisingly when there are 2 Thread sending Streams to the same queue all
work well.

Does ActiveMQ support dispatch Streams concurrently?
__________________
regards... 
-- 
View this message in context: http://www.nabble.com/I-can%E2%80%99t-receive-more-than-2-messages-simultaneously-with-DefaultMessageListenerContainer-tf3245123s2354.html#a9048143
Sent from the ActiveMQ - Dev mailing list archive at Nabble.com.


Re: I can’t receive more than 2 messages simultaneously with DefaultMessageListenerContainer

Posted by James Strachan <ja...@gmail.com>.
I replied on the other thread you created on the spring forum.

These FAQ entries could be useful...
http://activemq.apache.org/i-do-not-receive-messages-in-my-second-consumer.html
http://activemq.apache.org/what-is-the-prefetch-limit-for.html

How many messages are you sending? If its less than 50,000 then I'd
reduce your prefetch value to something small (1 or 2).


On 2/17/07, cafe <rm...@estudiantes.uci.cu> wrote:
>
> I have a listener (in the server side) who receive request for send files
> (Streams):
>
> public class RequestUpdateListener implements SessionAwareMessageListener {
>
>         private JmsTemplate jmsTemplate;
>
>         private String fileUpdateLocation;
>
>         public void onMessage(Message request, Session session) throws
> JMSException {
>
>
>                 ActiveMQConnectionFactory connectionFactory =
> ((PooledConnectionFactory) jmsTemplate
>
> .getConnectionFactory()).getConnectionFactory();
>
>                 FileInputStream fis = null;
>                 try {
>                         fis = new FileInputStream(new
> File(fileUpdateLocation));
>                         StreamSender ss = new StreamSender();
>                         ss.send(connectionFactory, fis,uuid);
>
>                 } catch (Exception e) {
>                         e.printStackTrace();
>                 }
>
>         }
> .............
>
> and a class:
>
> public class StreamSender {
>
>         public void send(ConnectionFactory connectionFactory,
> FileInputStream fis, String uuid) {
>
>                 try {
>
>                         connection = (ActiveMQConnection)
> connectionFactory.createConnection();
>                         session = connection.createSession(true,
> Session.AUTO_ACKNOWLEDGE);
>                         Destination destination =
> session.createQueue("TransferQueue");
>
>                         int deliveryMode = DeliveryMode.NON_PERSISTENT;
>                         int priority = 1;
>                         long timeToLive = 0;
>
>                         outputStreamMQ =
> connection.createOutputStream(destination, prop, deliveryMode, priority,
>                                         timeToLive);
>
>
>                         int total = 0;
>                         int reads;
>                         byte[] array = new byte[8 * 1024];
>
>                         while ((reads = fis.read(array)) != -1) {
>                                 outputStreamMQ.write(array, 0, reads);
>                         }
>
>                         JmsUtils.commitIfNecessary(session);
> ......
>
> and again the spring container:
>
>         <bean id="updateContainer"
>
> class="org.springframework.jms.listener.DefaultMessageListenerContainer">
>                 <property name="concurrentConsumers" value="50" />
>                 <property name="connectionFactory" ref="jmsFactory" />
>                 <property name="destination" ref="requestUpdateQueue" />
>                 <property name="messageListener" ref="requestUpdateListener"
> />
>         </bean>
>
>
>
> and in the client side I have a listener who receive the Stream:
>
> public class NotificationListener implements SessionAwareMessageListener {
>
>
>         public void onMessage(Message response, Session session) throws
> JMSException {
>
>                 String message = "xml descriptor";
>                 jmsTemplate.convertAndSend(requestUpdateQueue, message, new
> MessagePostProcessor() {
>                         public Message postProcessMessage(Message message)
> throws JMSException {
>                                 ....
>                                 return message;
>                         }
>                 });
>
>                 StreamReceiver receiver = new StreamReceiver();
>                 receiver.setConnectionFactory(connectionFactory);
>                 receiver.receive();
>
>         }
>
> public class StreamReceiver {
>
>
>         public void receive() {
>                 ...
>                 try {
>
>                         connection = (ActiveMQConnection)
> connectionFactory.createConnection();
>                         session = connection.createSession(true,
> Session.AUTO_ACKNOWLEDGE);
>                         Destination destination =
> session.createQueue("TransferQueue");
>                         fisMQ = connection.createInputStream(destination,
> selector);
>                         fos = new FileOutputStream("...");
>
>                         int reads;
>                         int total = 0;
>                         byte[] array = new byte[8 * 1024];
>
>                         while ((reads = fisMQ.read(array)) != -1) {
>                                 fos.write(array, 0, reads);
>                                 total += reads;
>                                         }
>                         JmsUtils.commitIfNecessary(session);
> .........
>
> Now the problem arise when more than 2 client running in different machines
> does a request simultaneously, the server (StreamSender) stop the transfer
> without any exception
>
> what could be this?
> --
> View this message in context: http://www.nabble.com/I-can%E2%80%99t-receive-more-than-2-messages-simultaneously-with-DefaultMessageListenerContainer-tf3245123s2354.html#a9020869
> Sent from the ActiveMQ - Dev mailing list archive at Nabble.com.
>
>


-- 

James
-------
http://radio.weblogs.com/0112098/