Posted to users@servicemix.apache.org by manish_goyal <ma...@infosys.com> on 2009/01/21 09:10:34 UTC

problem while sending concurrent messages to ActiveMQ Jms Queue

Hi,

I am getting an error while sending concurrent messages to an ActiveMQ JMS
queue. My service sends multiple messages to the queue concurrently. The
error appears only when I send more than 3 messages concurrently; with two
or fewer concurrent sends, no error is thrown.

I am getting the following error:

ERROR - TcpTransport                   - Could not stop service: tcp://localhost/127.0.0.1:61616. Reason: java.lang.InterruptedException
java.lang.InterruptedException
        at java.util.concurrent.locks.AbstractQueuedSynchronizer.tryAcquireSharedNanos(AbstractQueuedSynchronizer.java:1165)
        at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:245)
        at org.apache.activemq.transport.tcp.TcpTransport.doStop(TcpTransport.java:459)
        at org.apache.activemq.util.ServiceSupport.stop(ServiceSupport.java:63)
        at org.apache.activemq.transport.tcp.TcpTransport.stop(TcpTransport.java:469)
        at org.apache.activemq.transport.InactivityMonitor.stop(InactivityMonitor.java:113)
        at org.apache.activemq.transport.TransportFilter.stop(TransportFilter.java:64)
        at org.apache.activemq.transport.WireFormatNegotiator.stop(WireFormatNegotiator.java:87)
        at org.apache.activemq.transport.TransportFilter.stop(TransportFilter.java:64)
        at org.apache.activemq.transport.TransportFilter.stop(TransportFilter.java:64)
        at org.apache.activemq.transport.ResponseCorrelator.stop(ResponseCorrelator.java:120)
        at org.apache.activemq.util.ServiceSupport.dispose(ServiceSupport.java:40)
        at org.apache.activemq.ActiveMQConnection.close(ActiveMQConnection.java:592)

This error is thrown while closing the ActiveMQ connection.
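
For anyone reading the trace: the top frames show CountDownLatch.await(timeout) inside TcpTransport.doStop throwing the InterruptedException, which is what that method does when the calling thread has been interrupted. The following is only a minimal sketch of that JDK behaviour, not a diagnosis of the service above; the class and method names (InterruptedAwaitSketch, awaitWhileInterrupted) are made up for illustration.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

class InterruptedAwaitSketch {

    /** Returns true if await() threw InterruptedException on an already-interrupted thread. */
    static boolean awaitWhileInterrupted() {
        CountDownLatch latch = new CountDownLatch(1); // never counted down in this sketch
        Thread.currentThread().interrupt();           // set this thread's interrupt flag
        try {
            // A timed await checks the interrupt flag before waiting.
            latch.await(1, TimeUnit.SECONDS);
            return false;
        } catch (InterruptedException e) {
            return true;
        } finally {
            Thread.interrupted(); // leave the flag cleared for the caller
        }
    }

    public static void main(String[] args) {
        System.out.println("interrupted during await: " + awaitWhileInterrupted());
    }
}
```

If something in the container interrupts the worker thread before or during connection.close(), the close path would presumably fail in the same way.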

This is how I am sending the message to the queue:

public void sendToLogging(LogObject ephService) {
    Connection connection = null;
    try {
        ActiveMQConnectionFactory connectionFactory =
                new ActiveMQConnectionFactory(user, password, url);
        connection = connectionFactory.createConnection();
        connection.start();
        subject = EPHConfig.getProperty("LOG_QUEUE_NAME",
                EPHConstants.COMMON_PROP_FILE_NAME);
        Session session = connection.createSession(transacted,
                Session.AUTO_ACKNOWLEDGE);
        if (topic) {
            destination = session.createTopic(subject);
        } else {
            destination = session.createQueue(subject);
        }

        // Create the producer.
        MessageProducer producer = session.createProducer(destination);
        if (persistent) {
            producer.setDeliveryMode(DeliveryMode.PERSISTENT);
        } else {
            producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
        }
        if (timeToLive != 0) {
            producer.setTimeToLive(timeToLive);
        }
        try {
            // Build a SOAP envelope and marshal the log object into its body.
            MessageFactory msgfact = MessageFactory.newInstance();
            SOAPMessage soapMessage = msgfact.createMessage();
            soapMessage.setProperty(
                    javax.xml.soap.SOAPMessage.CHARACTER_SET_ENCODING, "UTF-8");
            SOAPPart soapPart = soapMessage.getSOAPPart();
            SOAPEnvelope envelope = soapPart.getEnvelope();
            envelope.setEncodingStyle("http://schemas.xmlsoap.org/soap/encoding/");
            SOAPBody soapBody = (SOAPBody) envelope.getBody();
            JAXBContext jc = JAXBContext.newInstance("com.messages._1_0");
            Marshaller marshaller = jc.createMarshaller();
            marshaller.setProperty(Marshaller.JAXB_FORMATTED_OUTPUT, Boolean.TRUE);
            marshaller.marshal(ephService, soapBody);
            soapMessage.saveChanges();
            soapMessage.writeTo(System.out);
            ByteArrayOutputStream baos = new ByteArrayOutputStream();
            soapMessage.writeTo(baos);
            TextMessage txtmsg = session.createTextMessage();
            // Decode with the same charset the SOAP message was written in.
            txtmsg.setText(baos.toString("UTF-8"));
            producer.send(txtmsg);
            if (transacted) {
                session.commit();
            }
        } catch (Exception e) {
            sendToConsole(EPHConstants.MARSHALLING_ERR_MSG + e.getMessage());
        }
        // Use the ActiveMQConnection interface to dump the connection
        ActiveMQConnection actmqcon = (ActiveMQConnection) connection;
    } catch (Exception e) {
        sendToConsole(EPHConstants.ACTIVEMQ_ERR_MSG + e.getMessage());
    } finally {
        // connection is still null if createConnection() failed.
        if (connection != null) {
            try {
                connection.close();
            } catch (Exception exp) {
                sendToConsole(EPHConstants.CONNCLOSE_ERR_MSG + exp.getMessage());
            }
        }
    }
}

Please give any pointers to solve this problem.

Thanks.

Manish
-- 
View this message in context: http://www.nabble.com/problem-while-sending-concurrent-messages-to-ActiveMQ-Jms-Queue-tp21578653p21578653.html
Sent from the ServiceMix - User mailing list archive at Nabble.com.


Re: problem while sending concurrent messages to ActiveMQ Jms Queue

Posted by Ashwin Karpe <as...@progress.com>.
Hi Manish,

Can you use a PooledActiveMQConnectionFactory from Jencks, so that you work
with pooled connections rather than the plain ActiveMQConnectionFactory?
Trying to use a single connection object for concurrent message writing to
a JMS queue is not a good idea.
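
To illustrate the pooling idea in general terms, here is a rough, library-free sketch: a fixed set of connections is opened once, and each sender borrows one and hands it back instead of opening and closing its own. FakeConnection, ConnectionPool and run are hypothetical names invented for this example and are not ActiveMQ or Jencks classes; as far as I know, org.apache.activemq.pool.PooledConnectionFactory in the activemq-pool module plays this role for real JMS connections by wrapping an ActiveMQConnectionFactory.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class PoolSketch {

    /** Hypothetical stand-in for a JMS connection; it only counts sends. */
    static final class FakeConnection {
        final AtomicInteger sent = new AtomicInteger();
        void send(String text) { sent.incrementAndGet(); }
    }

    /** Fixed-size pool: connections are opened once and reused by all senders. */
    static final class ConnectionPool {
        private final BlockingQueue<FakeConnection> idle;
        private final int created;

        ConnectionPool(int size) {
            idle = new ArrayBlockingQueue<>(size);
            for (int i = 0; i < size; i++) {
                idle.add(new FakeConnection());
            }
            created = size;
        }

        FakeConnection borrow() throws InterruptedException { return idle.take(); }
        void giveBack(FakeConnection c) { idle.add(c); }
        int created() { return created; }
    }

    /** Sends the given number of messages from a thread pool; returns {connections opened, messages sent}. */
    static int[] run(int poolSize, int threads, int messages) throws InterruptedException {
        ConnectionPool pool = new ConnectionPool(poolSize);
        AtomicInteger sent = new AtomicInteger();
        ExecutorService workers = Executors.newFixedThreadPool(threads);
        for (int i = 0; i < messages; i++) {
            final String text = "log-" + i;
            workers.execute(() -> {
                try {
                    FakeConnection c = pool.borrow();   // blocks until a connection is free
                    try {
                        c.send(text);
                        sent.incrementAndGet();
                    } finally {
                        pool.giveBack(c);               // return to the pool instead of closing
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt(); // preserve the interrupt for the executor
                }
            });
        }
        workers.shutdown();
        workers.awaitTermination(10, TimeUnit.SECONDS);
        return new int[] { pool.created(), sent.get() };
    }

    public static void main(String[] args) throws InterruptedException {
        int[] result = run(2, 8, 100);
        System.out.println("connections opened: " + result[0] + ", messages sent: " + result[1]);
    }
}
```

The point of the design is that closing a transport becomes a rare event (at shutdown) rather than something that happens once per message on every sending thread.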

Cheers,

Ashwin...





-----
--- 
Ashwin Karpe, Principal Consultant, PS - Opensource Center of Competence 
Progress Software Corporation
14 Oak Park Drive
Bedford, MA 01730
--- 
+1-972-304-9084 (Office) 
+1-972-971-1700 (Mobile) 
---- 
Blog: http://opensourceknowledge.blogspot.com/

