You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@activemq.apache.org by cafe <rm...@estudiantes.uci.cu> on 2007/02/17 16:32:01 UTC
I can’t receive more than 2 messages simultaneously with DefaultMessageListenerContainer
I have a listener (in the server side) who receive request for send files
(Streams):
public class RequestUpdateListener implements SessionAwareMessageListener {
private JmsTemplate jmsTemplate;
private String fileUpdateLocation;
public void onMessage(Message request, Session session) throws
JMSException {
ActiveMQConnectionFactory connectionFactory =
((PooledConnectionFactory) jmsTemplate
.getConnectionFactory()).getConnectionFactory();
FileInputStream fis = null;
try {
fis = new FileInputStream(new
File(fileUpdateLocation));
StreamSender ss = new StreamSender();
ss.send(connectionFactory, fis,uuid);
} catch (Exception e) {
e.printStackTrace();
}
}
.............
and a class:
public class StreamSender {
public void send(ConnectionFactory connectionFactory,
FileInputStream fis, String uuid) {
try {
connection = (ActiveMQConnection)
connectionFactory.createConnection();
session = connection.createSession(true,
Session.AUTO_ACKNOWLEDGE);
Destination destination =
session.createQueue("TransferQueue");
int deliveryMode = DeliveryMode.NON_PERSISTENT;
int priority = 1;
long timeToLive = 0;
outputStreamMQ =
connection.createOutputStream(destination, prop, deliveryMode, priority,
timeToLive);
int total = 0;
int reads;
byte[] array = new byte[8 * 1024];
while ((reads = fis.read(array)) != -1) {
outputStreamMQ.write(array, 0, reads);
}
JmsUtils.commitIfNecessary(session);
......
and again the spring container:
<bean id="updateContainer"
class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<property name="concurrentConsumers" value="50" />
<property name="connectionFactory" ref="jmsFactory" />
<property name="destination" ref="requestUpdateQueue" />
<property name="messageListener" ref="requestUpdateListener"
/>
</bean>
and in the client side I have a listener who receive the Stream:
public class NotificationListener implements SessionAwareMessageListener {
public void onMessage(Message response, Session session) throws
JMSException {
String message = "xml descriptor";
jmsTemplate.convertAndSend(requestUpdateQueue, message, new
MessagePostProcessor() {
public Message postProcessMessage(Message message)
throws JMSException {
....
return message;
}
});
StreamReceiver receiver = new StreamReceiver();
receiver.setConnectionFactory(connectionFactory);
receiver.receive();
}
public class StreamReceiver {
public void receive() {
...
try {
connection = (ActiveMQConnection)
connectionFactory.createConnection();
session = connection.createSession(true,
Session.AUTO_ACKNOWLEDGE);
Destination destination =
session.createQueue("TransferQueue");
fisMQ = connection.createInputStream(destination,
selector);
fos = new FileOutputStream("...");
int reads;
int total = 0;
byte[] array = new byte[8 * 1024];
while ((reads = fisMQ.read(array)) != -1) {
fos.write(array, 0, reads);
total += reads;
}
JmsUtils.commitIfNecessary(session);
.........
Now the problem arise when more than 2 client running in different machines
does a request simultaneously, the server (StreamSender) stop the transfer
without any exception
what could be this?
--
View this message in context: http://www.nabble.com/I-can%E2%80%99t-receive-more-than-2-messages-simultaneously-with-DefaultMessageListenerContainer-tf3245123s2354.html#a9020869
Sent from the ActiveMQ - Dev mailing list archive at Nabble.com.
Re: [VOTE RESULT] I can’t receive more than 2 messages simultaneously with DefaultMessageListenerContainer
Posted by Brian McCallister <br...@apache.org>.
Vote result?
-Brian
On Feb 19, 2007, at 11:26 AM, cafe wrote:
>
> I have replied this in the spring forum too...
>
> Thx Strachan, following your tread I have localized where is the
> problem buy
> not why it happen
>
> The problem is not in the listener of which will be running multiple
> instances for consuming in a concurrent way the incoming message. I
> have
> proved that commenting the code inside the onMessage method and
> putting
> Thread.sleep(60000); with this test the message were consumed
> concurrently.
>
>
> Now the problem is when I attempt to send a Stream to a queue
> inside the
> onMessage method. When more than 2 onMessage methods begin sending
> Streams
> concurrently to the same Queue the transferring stop and stay in
> pause.
>
> What could be this?
>
> Surprisingly when there are 2 Thread sending Streams to the same
> queue all
> work well.
>
> Does ActiveMQ support dispatch Streams concurrently?
> __________________
> regards...
> --
> View this message in context: http://www.nabble.com/I-can%E2%80%99t-
> receive-more-than-2-messages-simultaneously-with-
> DefaultMessageListenerContainer-tf3245123s2354.html#a9048143
> Sent from the ActiveMQ - Dev mailing list archive at Nabble.com.
>
Re: [VOTE RESULT] I can’t receive more than 2 messages simultaneously with DefaultMessageListenerContainer
Posted by cafe <rm...@estudiantes.uci.cu>.
Hi James: I’m more close to the problem.
Now I can send multiple streams concurrently but under 2 Mb, when I try to
send streams of size 2 Mb or bigger the transferring is paused without any
complaint.
When i create the output stream the DeliveryMode is not persistent and I
have configured the memory of the broker too to a high value and nothing
...
What could be this?
--
View this message in context: http://www.nabble.com/I-can%E2%80%99t-receive-more-than-2-messages-simultaneously-with-DefaultMessageListenerContainer-tf3245123s2354.html#a9076585
Sent from the ActiveMQ - Dev mailing list archive at Nabble.com.
Re: [VOTE RESULT] I can’t receive more than 2 messages simultaneously with DefaultMessageListenerContainer
Posted by cafe <rm...@estudiantes.uci.cu>.
James.Strachan wrote:
>
> Are you using separate sessions & producers for each thread?
>
> James
> -------
> http://radio.weblogs.com/0112098/
>
>
yes.
This is the code inside the listener of which there are many instance
running concurrently
connection = (ActiveMQConnection) connectionFactory.createConnection();
session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
and this is the line where I obtain the OutputStream if serve of any help...
outputStreamMQ = connection.createOutputStream(destination, prop,
deliveryMode, priority,timeToLive);
I wonder why it work when there are 2 thread writing Stream concurrently and
why it not work when there are more than 2 thread?
could be this a bug?
--
View this message in context: http://www.nabble.com/I-can%E2%80%99t-receive-more-than-2-messages-simultaneously-with-DefaultMessageListenerContainer-tf3245123s2354.html#a9069874
Sent from the ActiveMQ - Dev mailing list archive at Nabble.com.
Re: [VOTE RESULT] I can’t receive more than 2 messages simultaneously with DefaultMessageListenerContainer
Posted by cafe <rm...@estudiantes.uci.cu>.
James, I found the solution to my problem here
http://activemq.apache.org/what-is-the-prefetch-limit-for.html
sorry for the lost time here and thanks...
--
View this message in context: http://www.nabble.com/I-can%E2%80%99t-receive-more-than-2-messages-simultaneously-with-DefaultMessageListenerContainer-tf3245123s2354.html#a9088189
Sent from the ActiveMQ - Dev mailing list archive at Nabble.com.
Re: [VOTE RESULT] I can’t receive more than 2 messages simultaneously with DefaultMessageListenerContainer
Posted by James Strachan <ja...@gmail.com>.
On 2/19/07, cafe <rm...@estudiantes.uci.cu> wrote:
>
> I have replied this in the spring forum too...
>
> Thx Strachan, following your tread I have localized where is the problem buy
> not why it happen
>
> The problem is not in the listener of which will be running multiple
> instances for consuming in a concurrent way the incoming message. I have
> proved that commenting the code inside the onMessage method and putting
> Thread.sleep(60000); with this test the message were consumed concurrently.
>
>
> Now the problem is when I attempt to send a Stream to a queue inside the
> onMessage method. When more than 2 onMessage methods begin sending Streams
> concurrently to the same Queue the transferring stop and stay in pause.
>
> What could be this?
>
> Surprisingly when there are 2 Thread sending Streams to the same queue all
> work well.
>
> Does ActiveMQ support dispatch Streams concurrently?
Yes.
Are you using separate sessions & producers for each thread?
--
James
-------
http://radio.weblogs.com/0112098/
Re: [VOTE RESULT] I can’t receive more than 2 messages simultaneously with DefaultMessageListenerContainer
Posted by cafe <rm...@estudiantes.uci.cu>.
I have replied this in the spring forum too...
Thx Strachan, following your tread I have localized where is the problem buy
not why it happen
The problem is not in the listener of which will be running multiple
instances for consuming in a concurrent way the incoming message. I have
proved that commenting the code inside the onMessage method and putting
Thread.sleep(60000); with this test the message were consumed concurrently.
Now the problem is when I attempt to send a Stream to a queue inside the
onMessage method. When more than 2 onMessage methods begin sending Streams
concurrently to the same Queue the transferring stop and stay in pause.
What could be this?
Surprisingly when there are 2 Thread sending Streams to the same queue all
work well.
Does ActiveMQ support dispatch Streams concurrently?
__________________
regards...
--
View this message in context: http://www.nabble.com/I-can%E2%80%99t-receive-more-than-2-messages-simultaneously-with-DefaultMessageListenerContainer-tf3245123s2354.html#a9048143
Sent from the ActiveMQ - Dev mailing list archive at Nabble.com.
Re: I can’t receive more than 2 messages simultaneously with DefaultMessageListenerContainer
Posted by James Strachan <ja...@gmail.com>.
I replied on the other thread you created on the spring forum.
These FAQ entries could be useful...
http://activemq.apache.org/i-do-not-receive-messages-in-my-second-consumer.html
http://activemq.apache.org/what-is-the-prefetch-limit-for.html
How many messages are you sending? If its less than 50,000 then I'd
reduce your prefetch value to something small (1 or 2).
On 2/17/07, cafe <rm...@estudiantes.uci.cu> wrote:
>
> I have a listener (in the server side) who receive request for send files
> (Streams):
>
> public class RequestUpdateListener implements SessionAwareMessageListener {
>
> private JmsTemplate jmsTemplate;
>
> private String fileUpdateLocation;
>
> public void onMessage(Message request, Session session) throws
> JMSException {
>
>
> ActiveMQConnectionFactory connectionFactory =
> ((PooledConnectionFactory) jmsTemplate
>
> .getConnectionFactory()).getConnectionFactory();
>
> FileInputStream fis = null;
> try {
> fis = new FileInputStream(new
> File(fileUpdateLocation));
> StreamSender ss = new StreamSender();
> ss.send(connectionFactory, fis,uuid);
>
> } catch (Exception e) {
> e.printStackTrace();
> }
>
> }
> .............
>
> and a class:
>
> public class StreamSender {
>
> public void send(ConnectionFactory connectionFactory,
> FileInputStream fis, String uuid) {
>
> try {
>
> connection = (ActiveMQConnection)
> connectionFactory.createConnection();
> session = connection.createSession(true,
> Session.AUTO_ACKNOWLEDGE);
> Destination destination =
> session.createQueue("TransferQueue");
>
> int deliveryMode = DeliveryMode.NON_PERSISTENT;
> int priority = 1;
> long timeToLive = 0;
>
> outputStreamMQ =
> connection.createOutputStream(destination, prop, deliveryMode, priority,
> timeToLive);
>
>
> int total = 0;
> int reads;
> byte[] array = new byte[8 * 1024];
>
> while ((reads = fis.read(array)) != -1) {
> outputStreamMQ.write(array, 0, reads);
> }
>
> JmsUtils.commitIfNecessary(session);
> ......
>
> and again the spring container:
>
> <bean id="updateContainer"
>
> class="org.springframework.jms.listener.DefaultMessageListenerContainer">
> <property name="concurrentConsumers" value="50" />
> <property name="connectionFactory" ref="jmsFactory" />
> <property name="destination" ref="requestUpdateQueue" />
> <property name="messageListener" ref="requestUpdateListener"
> />
> </bean>
>
>
>
> and in the client side I have a listener who receive the Stream:
>
> public class NotificationListener implements SessionAwareMessageListener {
>
>
> public void onMessage(Message response, Session session) throws
> JMSException {
>
> String message = "xml descriptor";
> jmsTemplate.convertAndSend(requestUpdateQueue, message, new
> MessagePostProcessor() {
> public Message postProcessMessage(Message message)
> throws JMSException {
> ....
> return message;
> }
> });
>
> StreamReceiver receiver = new StreamReceiver();
> receiver.setConnectionFactory(connectionFactory);
> receiver.receive();
>
> }
>
> public class StreamReceiver {
>
>
> public void receive() {
> ...
> try {
>
> connection = (ActiveMQConnection)
> connectionFactory.createConnection();
> session = connection.createSession(true,
> Session.AUTO_ACKNOWLEDGE);
> Destination destination =
> session.createQueue("TransferQueue");
> fisMQ = connection.createInputStream(destination,
> selector);
> fos = new FileOutputStream("...");
>
> int reads;
> int total = 0;
> byte[] array = new byte[8 * 1024];
>
> while ((reads = fisMQ.read(array)) != -1) {
> fos.write(array, 0, reads);
> total += reads;
> }
> JmsUtils.commitIfNecessary(session);
> .........
>
> Now the problem arise when more than 2 client running in different machines
> does a request simultaneously, the server (StreamSender) stop the transfer
> without any exception
>
> what could be this?
> --
> View this message in context: http://www.nabble.com/I-can%E2%80%99t-receive-more-than-2-messages-simultaneously-with-DefaultMessageListenerContainer-tf3245123s2354.html#a9020869
> Sent from the ActiveMQ - Dev mailing list archive at Nabble.com.
>
>
--
James
-------
http://radio.weblogs.com/0112098/