You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@activemq.apache.org by "Marcel Hillmann (JIRA)" <ji...@apache.org> on 2017/12/04 11:02:00 UTC

[jira] [Commented] (OPENWIRE-16) ClassCastException: ConnectionAdvisoryTopic

    [ https://issues.apache.org/jira/browse/OPENWIRE-16?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16276621#comment-16276621 ] 

Marcel Hillmann commented on OPENWIRE-16:
-----------------------------------------

import java.util.ArrayList;
import java.util.List;

import javax.jms.ExceptionListener;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.Topic;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.ActiveMQSession;
import org.apache.activemq.advisory.AdvisorySupport;
import org.apache.activemq.command.ActiveMQMessage;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.command.ConnectionInfo;
import org.apache.activemq.command.DataStructure;
import org.apache.activemq.command.ProducerInfo;
import org.apache.activemq.command.RemoveInfo;
import org.junit.Test;

/**
 * Manual reproducer that subscribes to a set of ActiveMQ advisory topics and
 * prints every advisory it receives to standard output.
 *
 * <p>The single test method connects to a broker at
 * {@code nio://localhost:61616} and then blocks until the connection is
 * stopped or closed, so it is intended to be run by hand against a live
 * broker rather than as part of an automated suite.
 */
public class QueueObserverTest implements ExceptionListener, MessageListener {

	/**
	 * Connects to the broker, registers this instance as listener on the
	 * advisory topics for the {@code loggingQueue} queue and topic, and then
	 * polls once per second until the connection is no longer started or has
	 * been closed.
	 *
	 * @throws Exception if the connection or session cannot be created, if the
	 *         waiting thread is interrupted, or if closing the connection fails
	 */
	@Test
	public void register() throws Exception {
		ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("nio://localhost:61616");
		ActiveMQConnection connection = (ActiveMQConnection) factory.createConnection("admin",
				"admin");
		try {
			connection.setExceptionListener(this);
			connection.start();
			ActiveMQSession session = (ActiveMQSession) connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
			// NOTE(review): a session-level listener is set in addition to the
			// per-consumer listeners registered in consume(); confirm that both
			// are intended, since the JMS API documents the session-level
			// listener as an expert facility.
			session.setMessageListener(this);

			final Queue queue = new ActiveMQQueue("loggingQueue");
			final Topic topic = new ActiveMQTopic("loggingQueue");

			final List<Topic> topics = new ArrayList<>();
			topics.add(new ActiveMQTopic("ActiveMQ.Advisory.Queue"));
			topics.add(new ActiveMQTopic("ActiveMQ.Advisory.Topic"));
			topics.add(AdvisorySupport.getConnectionAdvisoryTopic());

			// Advisories scoped to the queue named "loggingQueue".
			topics.add(AdvisorySupport.getProducerAdvisoryTopic(queue));
			topics.add(AdvisorySupport.getConsumerAdvisoryTopic(queue));
			topics.add(AdvisorySupport.getNoConsumersAdvisoryTopic(queue));
			topics.add(AdvisorySupport.getMessageDeliveredAdvisoryTopic(queue));

			// Advisories scoped to the topic named "loggingQueue".
			topics.add(AdvisorySupport.getProducerAdvisoryTopic(topic));
			topics.add(AdvisorySupport.getConsumerAdvisoryTopic(topic));
			topics.add(AdvisorySupport.getNoConsumersAdvisoryTopic(topic));
			topics.add(AdvisorySupport.getMessageDeliveredAdvisoryTopic(topic));

			for (Topic advisoryTopic : topics) {
				try {
					consume(session, advisoryTopic);
				}
				catch (JMSException e) {
					// Best effort: a failed subscription must not prevent the
					// remaining advisory topics from being observed.
					e.printStackTrace();
				}
			}

			while (connection.isStarted() && !connection.isClosed()) {
				Thread.sleep(1000);
			}
		}
		finally {
			// Release the broker connection even when the wait loop is
			// interrupted or the setup above fails.
			connection.close();
		}
	}

	/**
	 * Prints the topic and creates a consumer on it that delivers to this
	 * instance's {@link #onMessage(Message)}.
	 *
	 * @param session session used to create the consumer
	 * @param topic advisory topic to subscribe to
	 * @throws JMSException if the consumer cannot be created or the listener
	 *         cannot be registered
	 */
	private final void consume(Session session, Topic topic) throws JMSException{
		System.out.println(topic);
		MessageConsumer consumer = session.createConsumer(topic);
		consumer.setMessageListener(this);
	}

	/**
	 * Prints the stack trace of an exception reported on the connection.
	 *
	 * @param exception the reported exception
	 */
	@Override
	public void onException(JMSException exception) {
		exception.printStackTrace();
	}

	/**
	 * Prints a one-line description of a received message, chosen by the type
	 * of the data structure carried by an {@link ActiveMQMessage}.
	 *
	 * @param message the received message
	 */
	@Override
	public void onMessage(Message message) {
		if (message instanceof ActiveMQMessage) {
			ActiveMQMessage aMsg = (ActiveMQMessage) message;
			DataStructure dataStructure = aMsg.getDataStructure();
			if (dataStructure instanceof ConnectionInfo) {
				ConnectionInfo connectionInfo = (ConnectionInfo) dataStructure;
				System.out.println("Connect: "+connectionInfo.toString());
			}
			else if (dataStructure instanceof RemoveInfo) {
				RemoveInfo removeInfo = (RemoveInfo) dataStructure;
				System.out.println("Remove : "+removeInfo.toString());
			}
			else if (dataStructure instanceof ProducerInfo) {
				ProducerInfo producerInfo = (ProducerInfo) dataStructure;
				System.out.println("Produce: "+producerInfo.toString());
			}
			else if (dataStructure instanceof org.apache.activemq.command.Message) {
				// The payload itself must be tested here: checking 'message'
				// instead would always match inside this branch and make the
				// cast below fail for any other payload type.
				org.apache.activemq.command.Message aMessage  = (org.apache.activemq.command.Message) dataStructure;
				System.out.println("Message: "+aMessage.toString());
			}
			else {
				// Any other payload type, or no payload at all.
				System.out.println("Other  : "+String.valueOf(dataStructure));
			}
		}
		else {
			System.out.println("QueueObserverTest.onMessage(): "+message.getClass().getSimpleName());
		}

	}
}

> ClassCastException: ConnectionAdvisoryTopic
> -------------------------------------------
>
>                 Key: OPENWIRE-16
>                 URL: https://issues.apache.org/jira/browse/OPENWIRE-16
>             Project: ActiveMQ OpenWire
>          Issue Type: Bug
>         Environment: Windows 7
> java version "1.8.0_102"
> Java(TM) SE Runtime Environment (build 1.8.0_102-b14)
> Java HotSpot(TM) 64-Bit Server VM (build 25.102-b14, mixed mode)
>            Reporter: Marcel Hillmann
>
> Hi,
> If I try to consume the connection advisory topic, it raises a ClassCastException.
> Is this a known issue?
> How can I fix it?
> {code:java}
> Caused by: java.io.IOException: org.apache.activemq.command.BrokerId cannot be cast to org.apache.activemq.command.ConsumerId
> 	at org.apache.activemq.util.IOExceptionSupport.create(IOExceptionSupport.java:40)
> 	... 7 more
> Caused by: java.lang.ClassCastException: org.apache.activemq.command.BrokerId cannot be cast to org.apache.activemq.command.ConsumerId
> 	at org.apache.activemq.openwire.v12.MessageMarshaller.tightUnmarshal(MessageMarshaller.java:75)
> 	at org.apache.activemq.openwire.v12.ActiveMQMessageMarshaller.tightUnmarshal(ActiveMQMessageMarshaller.java:66)
> 	at org.apache.activemq.openwire.OpenWireFormat.tightUnmarshalNestedObject(OpenWireFormat.java:456)
> 	at org.apache.activemq.openwire.v12.BaseDataStreamMarshaller.tightUnmarsalNestedObject(BaseDataStreamMarshaller.java:125)
> 	at org.apache.activemq.openwire.v12.MessageDispatchMarshaller.tightUnmarshal(MessageDispatchMarshaller.java:71)
> 	at org.apache.activemq.openwire.OpenWireFormat.doUnmarshal(OpenWireFormat.java:365)
> 	at org.apache.activemq.openwire.OpenWireFormat.unmarshal(OpenWireFormat.java:278)
> {code}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)