You are viewing a plain text version of this content. The canonical link for it is here.
Posted to users@activemq.apache.org by "jai.mathaiyan" <ja...@gmail.com> on 2011/05/09 19:59:40 UTC

Subscriber throws errors and dies when using multiple openwire JMS client

Hi, 

 I have been playing around with ActiveMQ for some time, and I am now facing a
strange issue. I am using version 5.3.1. 
I have a broker and a producer running within the same JVM. If I have one
JMS client (OpenWire), everything works fine. 

Whenever I launch multiple instances of the JMS client (as separate applications
from Eclipse) subscribing to the same topic, I see the following exceptions
on all the subscribers at various times.

I see these errors in the client's onException method. The broker and
producer continue to run normally. There are no errors on the broker
side. 
Please let me know if anyone has faced a similar problem, or how to go about
debugging it.

The only customization on the broker is the addition of the Authorization
Plugin. I researched the error a bit and found the following post, which says
that the problem was fixed in version 5.1.
https://issues.apache.org/jira/browse/AMQ-1169?page=com.atlassian.jira.plugin.system.issuetabpanels%3Aall-tabpanel#issue-tabs

javax.jms.JMSException: Unexpected error occured 
        at
org.apache.activemq.util.JMSExceptionSupport.create(JMSExceptionSupport.java:49) 
        at
org.apache.activemq.ActiveMQConnection.onAsyncException(ActiveMQConnection.java:1803) 
        at
org.apache.activemq.ActiveMQConnection.onException(ActiveMQConnection.java:1820) 
        at
org.apache.activemq.transport.TransportFilter.onException(TransportFilter.java:99) 
        at
org.apache.activemq.transport.ResponseCorrelator.onException(ResponseCorrelator.java:126) 
        at
org.apache.activemq.transport.TransportFilter.onException(TransportFilter.java:99) 
        at
org.apache.activemq.transport.TransportFilter.onException(TransportFilter.java:99) 
        at
org.apache.activemq.transport.WireFormatNegotiator.onException(WireFormatNegotiator.java:160) 
        at
org.apache.activemq.transport.InactivityMonitor.onException(InactivityMonitor.java:254) 
        at
org.apache.activemq.transport.TransportSupport.onException(TransportSupport.java:97) 
        at
org.apache.activemq.transport.tcp.TcpTransport.run(TcpTransport.java:195) 
        at java.lang.Thread.run(Thread.java:736) 
Caused by: java.io.IOException: Unexpected error occured 
        at
org.apache.activemq.transport.tcp.TcpTransport.run(TcpTransport.java:193) 
        ... 1 more 
Caused by: java.lang.ClassCastException:
org.apache.activemq.command.BrokerId incompatible with
org.apache.activemq.command.ConsumerId 
        at
org.apache.activemq.openwire.v5.MessageMarshaller.tightUnmarshal(MessageMarshaller.java:75) 
        at
org.apache.activemq.openwire.v5.ActiveMQMessageMarshaller.tightUnmarshal(ActiveMQMessageMarshaller.java:66) 
        at
org.apache.activemq.openwire.v5.ActiveMQTextMessageMarshaller.tightUnmarshal(ActiveMQTextMessageMarshaller.java:66) 
        at
org.apache.activemq.openwire.OpenWireFormat.tightUnmarshalNestedObject(OpenWireFormat.java:453) 
        at
org.apache.activemq.openwire.v5.BaseDataStreamMarshaller.tightUnmarsalNestedObject(BaseDataStreamMarshaller.java:126) 
        at
org.apache.activemq.openwire.v5.MessageDispatchMarshaller.tightUnmarshal(MessageDispatchMarshaller.java:71) 
        at
org.apache.activemq.openwire.OpenWireFormat.doUnmarshal(OpenWireFormat.java:362) 
        at
org.apache.activemq.openwire.OpenWireFormat.unmarshal(OpenWireFormat.java:276) 
        at
org.apache.activemq.transport.tcp.TcpTransport.readCommand(TcpTransport.java:211) 
        at
org.apache.activemq.transport.tcp.TcpTransport.doRun(TcpTransport.java:203) 
        at
org.apache.activemq.transport.tcp.TcpTransport.run(TcpTransport.java:186) 


JMS Client code--

package com.cisco.psbu.vs.ism.test.client.events;

import java.io.ByteArrayOutputStream;
import java.io.ObjectOutputStream;

import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.ExceptionListener;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.command.ActiveMQTextMessage;

/**
 * Small JMS test client for an ActiveMQ broker.
 *
 * <p>On construction the client connects to {@code tcp://<hostname>:61616},
 * opens a non-transacted, auto-acknowledge session and resolves the requested
 * destination. For a topic it also registers an asynchronous consumer
 * ({@link MyConsumer}) that prints every message it receives. In both cases a
 * producer is created so that {@link #sendMessage} can be used.
 *
 * <p>Setup failures are printed, not rethrown, so a client whose setup failed
 * is left without a usable connection/session/producer.
 */
public class JMSClient implements ExceptionListener{

	private static final String DEFAULT_USER = "jai";
	private static final String DEFAULT_PASSWORD = "jai";
	// Not referenced by the code below; the broker URL is built from hostname in setUp().
	private static final String DEFAULT_BROKER_URL = "tcp://localhost:61616";
	private ActiveMQConnectionFactory connectionFactory;
	private Connection connection;
	private Session session;
	private Destination destination;
	private boolean transacted = false;
	private boolean isQueue = false;
	private String destinationName;
	private MessageConsumer consumer ;
	private MessageProducer producer;
	private String hostname = "localhost";
	/** Number of text messages received so far. */
	private static int count = 0;
	/**
	 * Running total, in bytes, of the text payloads received so far.
	 * NOTE(review): the original code printed "totalSize" without declaring it;
	 * it is tracked here as the UTF-8 length of each text body - confirm that
	 * this is the intended measure.
	 */
	private static long totalSize = 0;
	
	/**
	 * Creates a client connected to a broker on {@code localhost}.
	 *
	 * @param isQ         {@code true} to use a queue, {@code false} to use a topic
	 * @param destination name of the queue or topic
	 */
	public JMSClient(boolean isQ,String destination){
		this(isQ, destination, "localhost");
	}
	
	/**
	 * Creates a client connected to a broker on the given host (port 61616).
	 * Any exception raised during setup is printed and swallowed.
	 *
	 * @param isQ         {@code true} to use a queue, {@code false} to use a topic
	 * @param destination name of the queue or topic
	 * @param hostname    host the broker is listening on
	 */
	public JMSClient(boolean isQ,String destination, String hostname){
		try{
			this.isQueue = isQ;
			this.destinationName = destination;
			this.hostname = hostname;
			print("begin");
			setUp();
			print("setup complete");

			//print("message Size in bytes: " + getMessageSize());
		}catch (Exception e) {
			e.printStackTrace();
		}
	}

	/**
	 * Opens the connection and session, resolves the destination, attaches the
	 * topic consumer when applicable, and creates the producer.
	 *
	 * @throws JMSException if any JMS call fails
	 */
	private void setUp() throws JMSException {
		connectionFactory = new ActiveMQConnectionFactory(
				DEFAULT_USER,
				DEFAULT_PASSWORD,
				"tcp://"+ hostname + ":61616");
		// NOTE(review): explicit credentials are passed here; presumably they take
		// precedence over the factory's DEFAULT_USER/DEFAULT_PASSWORD - confirm.
		connection = connectionFactory.createConnection("abc","a5405e08-701f-4631-9b69-2476cc49a87b");
		connection.setExceptionListener(this);
		connection.start();
		
		session = connection.createSession(transacted, Session.AUTO_ACKNOWLEDGE);
		//destination = session.createTopic("c.c.p.v.ism.>");
		if(isQueue){
			destination = session.createQueue(destinationName);
		}else{
			destination = session.createTopic(destinationName);
			createConsumerAndReceiveAMessage();
			print("create topic complete. Waiting for messages...");
		}
		createProducer();
	}

	/**
	 * Creates a consumer on the destination and installs a {@link MyConsumer}
	 * as both its message listener and the connection's exception listener.
	 *
	 * <p>This replaces the exception listener registered in {@link #setUp()},
	 * so for topic clients a connection error ends in
	 * {@code MyConsumer.onException}, which exits the JVM.
	 *
	 * @throws JMSException if the consumer cannot be created or configured
	 */
	private void createConsumerAndReceiveAMessage() throws JMSException {
		consumer = session.createConsumer(destination);
		MyConsumer myConsumer = new MyConsumer();
		connection.setExceptionListener(myConsumer);
		consumer.setMessageListener(myConsumer);
	}

	/**
	 * Creates the producer bound to the resolved destination.
	 *
	 * @throws JMSException if the producer cannot be created
	 */
	private void createProducer() throws JMSException{
		producer = session.createProducer(destination);
	}

	/**
	 * Sends {@code message} repeatedly via {@link #sendLoop}, printing (rather
	 * than propagating) any failure.
	 *
	 * @param message      message to send
	 * @param messageCount number of sends; {@code 0} means send forever
	 * @param sleepTime    pause in milliseconds after each send
	 */
	public void sendMessage(TextMessage message, int messageCount, long sleepTime){
		try{
			sendLoop(message, messageCount, sleepTime);
		}catch (InterruptedException e) {
			// Restore the interrupt flag so callers can still observe the interruption.
			Thread.currentThread().interrupt();
			e.printStackTrace();
		}catch (Exception e) {
			e.printStackTrace();
		}
	}


	/**
	 * Sends {@code message} {@code messageCount} times, sleeping
	 * {@code sleepTime} milliseconds after each send. A preview of the text
	 * (at most 50 characters followed by "...") is printed before each send.
	 *
	 * @param message      message to send
	 * @param messageCount number of sends; {@code 0} makes the loop run until
	 *                     an exception is thrown
	 * @param sleepTime    pause in milliseconds after each send
	 * @throws Exception if reading the text, sending, or sleeping fails
	 */
	protected void sendLoop(TextMessage message, int messageCount, long sleepTime) 
	throws Exception {

		for (int i = 0; i < messageCount || messageCount == 0; i++) {

			String msg = message.getText();
			// getText() may return null for a message without a body.
			if (msg != null && msg.length() > 50) {
				msg = msg.substring(0, 50) + "...";
			}
			System.out.println("Sending message: " + msg);
			producer.send(message);
			Thread.sleep(sleepTime);
		}

	}

	/**
	 * Closes the connection if one was opened. Failures are printed and
	 * otherwise ignored, since this is best-effort cleanup.
	 */
	public void tearDown(){
		if (connection == null) {
			// Setup never got as far as creating a connection; nothing to close.
			return;
		}
		try {
			connection.close();
		} catch (Throwable t) {
			t.printStackTrace();
		}
	}

	/**
	 * Listener used for topic subscriptions: prints received messages and
	 * terminates the JVM when the connection reports an exception.
	 */
	private class MyConsumer implements MessageListener, ExceptionListener {

		/**
		 * Prints the exception and exits the process with status 1.
		 *
		 * @param ex the exception reported for the connection
		 */
		synchronized public void onException(JMSException ex) {
			print("JMS Exception occured.  Shutting down client.");
			ex.printStackTrace();
			System.exit(1);
			
		}

		/**
		 * Prints each received message. For text messages the message counter
		 * and the running payload size are updated first.
		 *
		 * @param message the delivered message
		 */
		public void onMessage(Message message) {
			if (message instanceof TextMessage) {
				count ++;
				TextMessage textMessage = (TextMessage) message;

				try{
					String text = textMessage.getText();
					if (text != null) {
						totalSize += text.getBytes("UTF-8").length;
					}
				}catch(Exception ex){
					// Size accounting is informational only; keep processing the message.
					print("Could not determine message size: " + ex);
				}
				System.out.println(" total message size in bytes :" + totalSize);
				print("Received message: " + textMessage);
				//textMessage.getText();
				//textMessage.getStringProperty("msgOpCode");

			} else  {
				print("Received: " + message);
			}
		}
	}

	/**
	 * Creates a text message on this client's session.
	 *
	 * @param payload text body of the message
	 * @return the new message
	 * @throws JMSException if the session cannot create the message
	 */
	public TextMessage getTextMessage(String payload) throws JMSException{
		return session.createTextMessage(payload);

	}
	
	
	/**
	 * Prints {@code text} to standard output; a {@code null} argument is
	 * printed as "null".
	 *
	 * @param text value to print
	 */
	public void print(Object text){
		System.out.println(String.valueOf(text));
	}


	/**
	 * Starts a topic subscriber on "c.c.p.v.ism.device" against a broker on
	 * localhost. The commented-out lines show how to send a message and shut
	 * the client down.
	 */
	public static void main(String[] args) throws Exception {
		JMSClient client = new JMSClient(false,"c.c.p.v.ism.device");
		//TextMessage message = client.getTextMessage("This is a test");
		//client.sendMessage(message,1,0);
		//Thread.sleep(100000);
		//client.tearDown();
	}

	/**
	 * Exception listener registered in {@link #setUp()}; prints the exception.
	 * For topic clients it is replaced by {@link MyConsumer} before any
	 * message is consumed.
	 *
	 * @param exception the exception reported for the connection
	 */
	@Override
	public void onException(JMSException exception) {
		System.out.println("Exception detected..");
		exception.printStackTrace();
		
	}


}
 

--
View this message in context: http://activemq.2283324.n4.nabble.com/Subscriber-throws-errors-and-dies-when-using-multiple-openwire-JMS-client-tp3509914p3509914.html
Sent from the ActiveMQ - User mailing list archive at Nabble.com.

Re: Subscriber throws errors and dies when using multiple openwire JMS client

Posted by Gary Tully <ga...@gmail.com>.
not sure, but 5.4.2 is closer to 5.3.1 w.r.t dependencies so maybe try that.

On 12 May 2011 21:11, jai.mathaiyan <ja...@gmail.com> wrote:
> Thanks Gary for the response.
>
> I managed to test it with 5.5.0 and the problem did not seem to happen.
> Earlier I used to get the exception on my second listener instance almost
> immediately (after about 20 messages). Now I tested with upto 5 listeners
> and 100 messages. All listeners were healthy and received all messages.
>
> What could have been the problem ?
> We are already deep into the development of the project (which uses spring
> and hibernate) and would like to avoid upgrading to 5.5.0 at this stage.
> 5.5.0 requires updates to slf4j, logback and bunch of other dependent jar
> files. If this is a simple fix, can it be available on the 5.3.1 branch ?
>
> Thanks
> Jai
>
>
> --
> View this message in context: http://activemq.2283324.n4.nabble.com/Subscriber-throws-errors-and-dies-when-using-multiple-openwire-JMS-client-tp3509914p3518560.html
> Sent from the ActiveMQ - User mailing list archive at Nabble.com.
>



-- 
http://blog.garytully.com
http://fusesource.com

Re: Subscriber throws errors and dies when using multiple openwire JMS client

Posted by "jai.mathaiyan" <ja...@gmail.com>.
Thanks Gary for the response.

I managed to test it with 5.5.0 and the problem did not seem to happen.
Earlier I used to get the exception on my second listener instance almost
immediately (after about 20 messages). Now I tested with up to 5 listeners
and 100 messages. All listeners were healthy and received all messages.

What could have been the problem? 
We are already deep into the development of the project (which uses Spring
and Hibernate) and would like to avoid upgrading to 5.5.0 at this stage.
5.5.0 requires updates to slf4j, logback and a bunch of other dependent jar
files. If this is a simple fix, can it be made available on the 5.3.1 branch?

Thanks
Jai


--
View this message in context: http://activemq.2283324.n4.nabble.com/Subscriber-throws-errors-and-dies-when-using-multiple-openwire-JMS-client-tp3509914p3518560.html
Sent from the ActiveMQ - User mailing list archive at Nabble.com.

Re: Subscriber throws errors and dies when using multiple openwire JMS client

Posted by Gary Tully <ga...@gmail.com>.
Can you see if you can recreate with the current 5.5 version?

On 9 May 2011 18:59, jai.mathaiyan <ja...@gmail.com> wrote:
> Hi,
>
>  I have been playing around with activemq for some time. Now I am facing a
> strange issue. I am using 5.3.1 version.
> I have a broker running and a producer within the same JVM. If I have one
> JMS client (openwire) , everything works fine.
>
> Whenever I launch multiple instances of JMS client (as separate application
> from eclipse) subscribing to the same topic, I see the following exceptions
> on all the subscribers at various instances.
>
> I see these errors in the client's onException  method. The broker and
> producer continue to run normally. There is no errors seen on the broker
> side.
> Pls let me know if anyone has faced a similar problem or how to go about
> debugging it ?
>
> The only customizations on the broker is adding Authorization Plugin. I
> researched a bit on the error and the following post that says that the
> problem is fixed in 5.1 version.
> https://issues.apache.org/jira/browse/AMQ-1169?page=com.atlassian.jira.plugin.system.issuetabpanels%3Aall-tabpanel#issue-tabs
>
> javax.jms.JMSException: Unexpected error occured
>         at
> org.apache.activemq.util.JMSExceptionSupport.create(JMSExceptionSupport.java:49)
>         at
> org.apache.activemq.ActiveMQConnection.onAsyncException(ActiveMQConnection.java:1803)
>         at
> org.apache.activemq.ActiveMQConnection.onException(ActiveMQConnection.java:1820)
>         at
> org.apache.activemq.transport.TransportFilter.onException(TransportFilter.java:99)
>         at
> org.apache.activemq.transport.ResponseCorrelator.onException(ResponseCorrelator.java:126)
>         at
> org.apache.activemq.transport.TransportFilter.onException(TransportFilter.java:99)
>         at
> org.apache.activemq.transport.TransportFilter.onException(TransportFilter.java:99)
>         at
> org.apache.activemq.transport.WireFormatNegotiator.onException(WireFormatNegotiator.java:160)
>         at
> org.apache.activemq.transport.InactivityMonitor.onException(InactivityMonitor.java:254)
>         at
> org.apache.activemq.transport.TransportSupport.onException(TransportSupport.java:97)
>         at
> org.apache.activemq.transport.tcp.TcpTransport.run(TcpTransport.java:195)
>         at java.lang.Thread.run(Thread.java:736)
> Caused by: java.io.IOException: Unexpected error occured
>         at
> org.apache.activemq.transport.tcp.TcpTransport.run(TcpTransport.java:193)
>         ... 1 more
> Caused by: java.lang.ClassCastException:
> org.apache.activemq.command.BrokerId in
> compatible with org.apache.activemq.command.ConsumerId
>         at
> org.apache.activemq.openwire.v5.MessageMarshaller.tightUnmarshal(MessageMarshaller.java:75)
>         at
> org.apache.activemq.openwire.v5.ActiveMQMessageMarshaller.tightUnmarshal(ActiveMQMessageMarshaller.java:66)
>         at
> org.apache.activemq.openwire.v5.ActiveMQTextMessageMarshaller.tightUnmarshal(ActiveMQTextMessageMarshaller.java:66)
>         at
> org.apache.activemq.openwire.OpenWireFormat.tightUnmarshalNestedObject(OpenWireFormat.java:453)
>         at
> org.apache.activemq.openwire.v5.BaseDataStreamMarshaller.tightUnmarsalNestedObject(BaseDataStreamMarshaller.java:126)
>         at
> org.apache.activemq.openwire.v5.MessageDispatchMarshaller.tightUnmarshal(MessageDispatchMarshaller.java:71)
>         at
> org.apache.activemq.openwire.OpenWireFormat.doUnmarshal(OpenWireFormat.java:362)
>         at
> org.apache.activemq.openwire.OpenWireFormat.unmarshal(OpenWireFormat.java:276)
>         at
> org.apache.activemq.transport.tcp.TcpTransport.readCommand(TcpTransport.java:211)
>         at
> org.apache.activemq.transport.tcp.TcpTransport.doRun(TcpTransport.java:203)
>         at
> org.apache.activemq.transport.tcp.TcpTransport.run(TcpTransport.java:186)
>
>
> JMS Client code--
>
> package com.cisco.psbu.vs.ism.test.client.events;
>
> import java.io.ByteArrayOutputStream;
> import java.io.ObjectOutputStream;
>
> import javax.jms.Connection;
> import javax.jms.Destination;
> import javax.jms.ExceptionListener;
> import javax.jms.JMSException;
> import javax.jms.Message;
> import javax.jms.MessageConsumer;
> import javax.jms.MessageListener;
> import javax.jms.MessageProducer;
> import javax.jms.Session;
> import javax.jms.TextMessage;
>
> import org.apache.activemq.ActiveMQConnectionFactory;
> import org.apache.activemq.command.ActiveMQTextMessage;
>
> public class JMSClient implements ExceptionListener{
>
>        private final String DEFAULT_USER = "jai";
>        private final String DEFAULT_PASSWORD = "jai";
>        private final String DEFAULT_BROKER_URL = "tcp://localhost:61616";
>        private ActiveMQConnectionFactory connectionFactory;
>        private Connection connection;
>        private Session session;
>        private Destination destination;
>        private boolean transacted = false;
>        private boolean isQueue = false;
>        private String destinationName;
>        private MessageConsumer consumer ;
>        private MessageProducer producer;
>        private String hostname = "localhost";
>        private static int count = 0;
>
>        public JMSClient(boolean isQ,String destination){
>                this(isQ, destination, "localhost");
>        }
>
>        public JMSClient(boolean isQ,String destination, String hostname){
>                try{
>                        this.isQueue = isQ;
>                        this.destinationName = destination;
>                        this.hostname = hostname;
>                        print("begin");
>                        setUp();
>                        print("setup complete");
>
>                        //print("message Size in bytes: " + getMessageSize());
>                }catch (Exception e) {
>                        e.printStackTrace();
>                }
>        }
>
>        private void setUp() throws JMSException, InterruptedException {
>                connectionFactory = new ActiveMQConnectionFactory(
>                                DEFAULT_USER,
>                                DEFAULT_PASSWORD,
>                                "tcp://"+ hostname + ":61616");
>                connection =
> connectionFactory.createConnection("abc","a5405e08-701f-4631-9b69-2476cc49a87b");
>                connection.setExceptionListener(this);
>                connection.start();
>
>                session = connection.createSession(transacted, Session.AUTO_ACKNOWLEDGE);
>                //destination = session.createTopic("c.c.p.v.ism.>");
>                if(isQueue){
>                        destination = session.createQueue(destinationName);
>                }else{
>                        destination = session.createTopic(destinationName);
>                        createConsumerAndReceiveAMessage();
>                        print("create topic complete. Waiting for messages...");
>                }
>                createProducer();
>        }
>
>        private void createConsumerAndReceiveAMessage() throws JMSException,
> InterruptedException {
>                consumer = session.createConsumer(destination);
>                MyConsumer myConsumer = new MyConsumer();
>                connection.setExceptionListener(myConsumer);
>                consumer.setMessageListener(myConsumer);
>        }
>
>        private void createProducer() throws JMSException{
>                producer = session.createProducer(destination);
>        }
>
>        public void sendMessage(TextMessage message, int messageCount, long
> sleepTime){
>                try{
>                        sendLoop(message, messageCount, sleepTime);
>                }catch (Exception e) {
>                        e.printStackTrace();
>                }
>        }
>
>
>        protected void sendLoop(TextMessage message, int messageCount, long
> sleepTime)
>        throws Exception {
>
>                for (int i = 0; i < messageCount || messageCount == 0; i++) {
>
>                        String msg = message.getText();
>                        if (msg.length() > 50) {
>                                msg = msg.substring(0, 50) + "...";
>                        }
>                        System.out.println("Sending message: " + msg);
>                        producer.send(message);
>                        Thread.sleep(sleepTime);
>                }
>
>        }
>
>        public void tearDown(){
>                try {
>                                connection.close();
>                } catch (Throwable ignore) {
>                        ignore.printStackTrace();
>                }
>        }
>
>        private class MyConsumer implements MessageListener, ExceptionListener {
>
>                synchronized public void onException(JMSException ex) {
>                        print("JMS Exception occured.  Shutting down client.");
>                        ex.printStackTrace();
>                        System.exit(1);
>
>                }
>
>                public void onMessage(Message message) {
>                        if (message instanceof TextMessage) {
>                                count ++;
>                                TextMessage textMessage = (TextMessage) message;
>
>                                System.out.println(" total message size in bytes :" + totalSize);
>                                try{
>                                        print("Received message: " + textMessage);
>                                }catch(Exception ex){
>                                        print("Received message: " + textMessage);
>                                }
>                                //textMessage.getText();
>                                //textMessage.getStringProperty("msgOpCode");
>
>                        } else  {
>                                print("Received: " + message);
>                        }
>                }
>        }
>
>        public TextMessage getTextMessage(String payload) throws JMSException{
>                return session.createTextMessage(payload);
>
>        }
>
>
>        public void print(Object text){
>                System.out.println(text.toString());
>        }
>
>
>        public static void main(String[] args) throws Exception {
>                JMSClient client = new JMSClient(false,"c.c.p.v.ism.device");
>                //TextMessage message = client.getTextMessage("This is a test");
>                //client.sendMessage(message,1,0);
>                //Thread.sleep(100000);
>                //client.tearDown();
>        }
>
>        @Override
>        public void onException(JMSException exception) {
>                System.out.println("Exception detected..");
>                exception.printStackTrace();
>
>        }
>
>
> }
>
>
> --
> View this message in context: http://activemq.2283324.n4.nabble.com/Subscriber-throws-errors-and-dies-when-using-multiple-openwire-JMS-client-tp3509914p3509914.html
> Sent from the ActiveMQ - User mailing list archive at Nabble.com.
>



-- 
http://blog.garytully.com
http://fusesource.com