Posted to users@servicemix.apache.org by manish_goyal <ma...@infosys.com> on 2009/01/21 09:10:34 UTC
problem while sending concurrent messages to ActiveMQ Jms Queue
Hi,
I am getting an error while sending concurrent messages to an ActiveMQ JMS queue.
My service sends multiple messages to the queue concurrently. The error appears
only when I send more than 3 messages concurrently; when I send two or fewer,
no error is thrown.
I am getting the following error:
ERROR - TcpTransport - Could not stop service: tcp://localhost/127.0.0.1:61616. Reason: java.lang.InterruptedException
java.lang.InterruptedException
    at java.util.concurrent.locks.AbstractQueuedSynchronizer.tryAcquireSharedNanos(AbstractQueuedSynchronizer.java:1165)
    at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:245)
    at org.apache.activemq.transport.tcp.TcpTransport.doStop(TcpTransport.java:459)
    at org.apache.activemq.util.ServiceSupport.stop(ServiceSupport.java:63)
    at org.apache.activemq.transport.tcp.TcpTransport.stop(TcpTransport.java:469)
    at org.apache.activemq.transport.InactivityMonitor.stop(InactivityMonitor.java:113)
    at org.apache.activemq.transport.TransportFilter.stop(TransportFilter.java:64)
    at org.apache.activemq.transport.WireFormatNegotiator.stop(WireFormatNegotiator.java:87)
    at org.apache.activemq.transport.TransportFilter.stop(TransportFilter.java:64)
    at org.apache.activemq.transport.TransportFilter.stop(TransportFilter.java:64)
    at org.apache.activemq.transport.ResponseCorrelator.stop(ResponseCorrelator.java:120)
    at org.apache.activemq.util.ServiceSupport.dispose(ServiceSupport.java:40)
    at org.apache.activemq.ActiveMQConnection.close(ActiveMQConnection.java:592)
This error is thrown while closing the ActiveMQ connection.
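One possible reading of the trace above: the InterruptedException surfaces from CountDownLatch.await inside TcpTransport.doStop, which is what happens when the thread calling connection.close() already has its interrupt flag set, or is interrupted while waiting. The sketch below reproduces only that JDK behaviour. The class name InterruptedAwaitDemo is hypothetical, and the sketch makes no claim about what interrupts the thread in the original service.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class; it is not part of ActiveMQ.
class InterruptedAwaitDemo {

    // Returns true if a timed await() throws InterruptedException because
    // the calling thread's interrupt flag was already set.
    static boolean awaitWhileInterrupted() {
        CountDownLatch latch = new CountDownLatch(1); // never counted down
        Thread.currentThread().interrupt();           // simulate a pending interrupt
        try {
            latch.await(1, TimeUnit.SECONDS);
            return false;
        } catch (InterruptedException e) {
            // await() clears the interrupt flag when it throws.
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println("interrupted during await: " + awaitWhileInterrupted());
    }
}
```

If this reading applies, it may be worth checking whether the threads that call sendToLogging are interrupted or cancelled by their owner (for example a thread pool shutting down or timing out tasks) before close() completes.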
This is how I am sending the message to the queue:
public void sendToLogging(LogObject ephService)
{
    Connection connection = null;
    try {
        ActiveMQConnectionFactory connectionFactory =
            new ActiveMQConnectionFactory(user, password, url);
        connection = connectionFactory.createConnection();
        connection.start();
        subject = EPHConfig.getProperty("LOG_QUEUE_NAME",
            EPHConstants.COMMON_PROP_FILE_NAME);
        Session session = connection.createSession(transacted,
            Session.AUTO_ACKNOWLEDGE);
        if (topic) {
            destination = session.createTopic(subject);
        } else {
            destination = session.createQueue(subject);
        }
        // Create the producer.
        MessageProducer producer = session.createProducer(destination);
        if (persistent) {
            producer.setDeliveryMode(DeliveryMode.PERSISTENT);
        } else {
            producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
        }
        if (timeToLive != 0) {
            producer.setTimeToLive(timeToLive);
        }
        try {
            // Build a SOAP envelope and marshal the log object into its body.
            MessageFactory msgfact = MessageFactory.newInstance();
            SOAPMessage soapMessage = msgfact.createMessage();
            soapMessage.setProperty(
                javax.xml.soap.SOAPMessage.CHARACTER_SET_ENCODING, "UTF-8");
            SOAPPart soapPart = soapMessage.getSOAPPart();
            SOAPEnvelope envelope = soapPart.getEnvelope();
            envelope.setEncodingStyle("http://schemas.xmlsoap.org/soap/encoding/");
            SOAPBody soapBody = (SOAPBody) envelope.getBody();
            JAXBContext jc = JAXBContext.newInstance("com.messages._1_0");
            Marshaller marshaller = jc.createMarshaller();
            marshaller.setProperty(Marshaller.JAXB_FORMATTED_OUTPUT,
                Boolean.TRUE);
            marshaller.marshal(ephService, soapBody);
            soapMessage.saveChanges();
            soapMessage.writeTo(System.out);
            ByteArrayOutputStream baos = new ByteArrayOutputStream();
            soapMessage.writeTo(baos);
            TextMessage txtmsg = session.createTextMessage();
            // Decode with the same charset the SOAP message was written in.
            txtmsg.setText(baos.toString("UTF-8"));
            producer.send(txtmsg);
            if (transacted) {
                session.commit();
            }
        }
        catch (Exception e) {
            sendToConsole(EPHConstants.MARSHALLING_ERR_MSG + e.getMessage());
        }
        // Use the ActiveMQConnection interface to dump the connection
        ActiveMQConnection actmqcon = (ActiveMQConnection) connection;
    }
    catch (Exception e) {
        sendToConsole(EPHConstants.ACTIVEMQ_ERR_MSG + e.getMessage());
    }
    finally {
        try {
            // connection is still null if createConnection() failed.
            if (connection != null) {
                connection.close();
            }
        }
        catch (Exception exp) {
            sendToConsole(EPHConstants.CONNCLOSE_ERR_MSG + exp.getMessage());
        }
    }
}
Please give any pointers to solve this problem.
Thanks.
Manish
--
View this message in context: http://www.nabble.com/problem-while-sending-concurrent-messages-to-ActiveMQ-Jms-Queue-tp21578653p21578653.html
Sent from the ServiceMix - User mailing list archive at Nabble.com.
Re: problem while sending concurrent messages to ActiveMQ Jms Queue
Posted by Ashwin Karpe <as...@progress.com>.
Hi Manish,
Can you use a PooledActiveMQConnectionFactory from Jencks and use pooled
connections rather than the ActiveMQConnectionFactory? Trying to use a
single connection object for concurrent message writing to a JMS queue is
not a good idea.
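To illustrate the pooling idea in general terms, here is a minimal sketch that uses only the JDK. FakeConnection, SimpleConnectionPool and PoolDemo are hypothetical names, not Jencks or ActiveMQ types. With a real broker, a pooled factory class (for example org.apache.activemq.pool.PooledConnectionFactory, assuming it is available in the ActiveMQ version in use) would play the role of the pool.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for a broker connection; not a JMS or ActiveMQ type.
class FakeConnection {
    static final AtomicInteger created = new AtomicInteger();
    final int id = created.incrementAndGet();
    void send(String text) { /* a real connection would write to the broker */ }
}

// Minimal fixed-size pool: connections are created once, then borrowed and returned.
class SimpleConnectionPool {
    private final BlockingQueue<FakeConnection> idle;

    SimpleConnectionPool(int size) {
        idle = new ArrayBlockingQueue<>(size);
        for (int i = 0; i < size; i++) {
            idle.add(new FakeConnection());
        }
    }

    // Borrow a connection, send, and return it to the pool instead of closing it.
    void send(String text) {
        FakeConnection c;
        try {
            c = idle.take(); // blocks while every connection is in use
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for a connection", e);
        }
        try {
            c.send(text);
        } finally {
            idle.add(c); // always succeeds: the pool never holds more than 'size'
        }
    }
}

class PoolDemo {
    // Sends messages from several threads and returns how many connections were created.
    static int run(int poolSize, int threads, int messagesPerThread) {
        int before = FakeConnection.created.get();
        final SimpleConnectionPool pool = new SimpleConnectionPool(poolSize);
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            final int worker = i;
            workers[i] = new Thread(() -> {
                for (int j = 0; j < messagesPerThread; j++) {
                    pool.send("log message " + worker + "/" + j);
                }
            });
            workers[i].start();
        }
        for (Thread t : workers) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("interrupted while joining workers", e);
            }
        }
        return FakeConnection.created.get() - before;
    }
}
```

Calling PoolDemo.run(2, 5, 10) sends 50 messages from 5 threads while creating only 2 connections, whereas the method in the original post opens and closes one connection per message.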
Cheers,
Ashwin...
-----
---
Ashwin Karpe, Principal Consultant, PS - Opensource Center of Competence
Progress Software Corporation
14 Oak Park Drive
Bedford, MA 01730
---
+1-972-304-9084 (Office)
+1-972-971-1700 (Mobile)
----
Blog: http://opensourceknowledge.blogspot.com/
--
View this message in context: http://www.nabble.com/problem-while-sending-concurrent-messages-to-ActiveMQ-Jms-Queue-tp21578653p21615385.html