You are viewing a plain text version of this content. The canonical link for it is here.
Posted to users@activemq.apache.org by manish_goyal <ma...@infosys.com> on 2009/01/21 07:46:21 UTC

problem while sending concurrent messages to ActiveMQ Jms Queue

Hi,

I am getting error while sending concurrent messages to ActiveMQ JMS queue.
My service is sending multiple message to ActiveMQ JMS queue
concurrently.When i m sending more then 3 times concurrently then only i am
getting this error. If i send message for only two times or less then, it is
not throwing any error.
 I am getting the following error : -

ERROR - TcpTransport                   - Could not stop service:
tcp://localhost
/127.0.0.1:61616. Reason: java.lang.InterruptedException
java.lang.InterruptedException
        at
java.util.concurrent.locks.AbstractQueuedSynchronizer.tryAcquireShare
dNanos(AbstractQueuedSynchronizer.java:1165)
        at
java.util.concurrent.CountDownLatch.await(CountDownLatch.java:245)
        at
org.apache.activemq.transport.tcp.TcpTransport.doStop(TcpTransport.java:459)
        at
org.apache.activemq.util.ServiceSupport.stop(ServiceSupport.java:63)
        at
org.apache.activemq.transport.tcp.TcpTransport.stop(TcpTransport.java:469)
        at
org.apache.activemq.transport.InactivityMonitor.stop(InactivityMonitor.java:113)
        at
org.apache.activemq.transport.TransportFilter.stop(TransportFilter.java:64)
        at
org.apache.activemq.transport.WireFormatNegotiator.stop(WireFormatNegotiator.java:87)
        at
org.apache.activemq.transport.TransportFilter.stop(TransportFilter.java:64)
        at
org.apache.activemq.transport.TransportFilter.stop(TransportFilter.java:64)
        at
org.apache.activemq.transport.ResponseCorrelator.stop(ResponseCorrelator.java:120)
        at
org.apache.activemq.util.ServiceSupport.dispose(ServiceSupport.java:40)
        at
org.apache.activemq.ActiveMQConnection.close(ActiveMQConnection.java:592)

This error is thrown while closing ActiveMQ connection.

This is how i am send message to Queue:-

public void sendToLogging(LogObject ephService)
 {
	Connection connection = null;
	 try {
	 ActiveMQConnectionFactory connectionFactory = new
ActiveMQConnectionFactory(user, password, url);
	 connection = connectionFactory.createConnection();
	 connection.start();
	subject=EPHConfig.getProperty("LOG_QUEUE_NAME",
EPHConstants.COMMON_PROP_FILE_NAME);
        Session session = connection.createSession(transacted,
Session.AUTO_ACKNOWLEDGE);
	if (topic) {
	destination = session.createTopic(subject);
		} else {
	destination = session.createQueue(subject);
		}

	// Create the producer.
	MessageProducer producer = session.createProducer(destination);
	if (persistent) {
		producer.setDeliveryMode(DeliveryMode.PERSISTENT);
		} else {
		producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
		}
		if (timeToLive != 0) {
		producer.setTimeToLive(timeToLive);
		}
 		try{
		MessageFactory msgfact = MessageFactory.newInstance();
		SOAPMessage soapMessage = msgfact.createMessage();
	soapMessage.setProperty(javax.xml.soap.SOAPMessage.CHARACTER_SET_ENCODING,
"UTF-8");
		SOAPPart soapPart = soapMessage.getSOAPPart();
		SOAPEnvelope envelope = soapPart.getEnvelope();
		envelope.setEncodingStyle("http://schemas.xmlsoap.org/soap/encoding/");
		SOAPBody soapBody = (SOAPBody)envelope.getBody();
		JAXBContext jc = JAXBContext.newInstance( "com.messages._1_0" );
	 	Marshaller marshaller = jc.createMarshaller();
	 	marshaller.setProperty( Marshaller.JAXB_FORMATTED_OUTPUT, Boolean.TRUE );
	 	marshaller.marshal(ephService,soapBody);
	 	soapMessage.saveChanges();
	 	soapMessage.writeTo(System.out);
	 	ByteArrayOutputStream baos = new ByteArrayOutputStream();
	 	soapMessage.writeTo(baos);
	 	TextMessage txtmsg = session.createTextMessage();
	 	txtmsg.setText(baos.toString());
		producer.send(txtmsg);
		if (transacted) {
			session.commit();
			}
 		}
		catch(Exception e){
 			sendToConsole(EPHConstants.MARSHALLING_ERR_MSG+e.getMessage());
	 		}
	 		// Use the ActiveMQConnection interface to dump the connection
			ActiveMQConnection actmqcon = (ActiveMQConnection)connection;
	 	}
	 	catch (Exception e) {
	 		sendToConsole(EPHConstants.ACTIVEMQ_ERR_MSG+e.getMessage());
	 	}
	 	finally {
			try {
	 			connection.close();
	 			}
	 		catch (Exception exp) {
	 			sendToConsole(EPHConstants.CONNCLOSE_ERR_MSG+exp.getMessage());
	 			}
		 	}
 }

Please give any pointers to solve this problem.

Thanks.

Manish
-- 
View this message in context: http://www.nabble.com/problem-while-sending-concurrent-messages-to-ActiveMQ-Jms-Queue-tp21577948p21577948.html
Sent from the ActiveMQ - User mailing list archive at Nabble.com.


Re: problem while sending concurrent messages to ActiveMQ Jms Queue

Posted by manish_goyal <ma...@infosys.com>.
Hi,

Can anyone please reply :-):-)

Thanks,
Mansih
-- 
View this message in context: http://www.nabble.com/problem-while-sending-concurrent-messages-to-ActiveMQ-Jms-Queue-tp21577948p21598333.html
Sent from the ActiveMQ - User mailing list archive at Nabble.com.