You are viewing a plain text version of this content. The canonical link for it is here.
Posted to users@activemq.apache.org by miniman <be...@db.com> on 2007/01/16 19:32:08 UTC

Trouble with posting messages to topic

All i have created an MDB that sits inside jboss and listens to the
queue/testQueue when a message gets posted to that queue it then calls
another method which posts the message to a topic. When i do this i get an
error that comes back.

code is 

in the onMessage   method

onMessage(Message message){
  System.out.println("Received: " + message);
  PublishToTopic(message);
}

Then PublishToTopic looks like

	private void PublishToTopic(Message msg){
		
		try {
			ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(url);
			connection = factory.createConnection();    	
			session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
			topic = session.createTopic("topic/testTopic");
			//control = session.createTopic("topictest.control");
			
			publisher = session.createProducer(topic);
			publisher.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
			
			//session.createConsumer(control).setMessageListener(this);
			connection.start();

			//request shutdown
			publisher.send(msg);

			connection.stop();
			connection.close();
		} catch (JMSException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		}     
	}


The error i get is

18:26:50,523 INFO  [PrefetchSubscription] Could not correlate acknowledgment
with dispatched message
: MessageAck {commandId = 6, responseRequired = false, ackType = 2,
consumerId = ID:test1-489
6-1168971769034-3:5:-1:2, firstMessageId =
ID:test1-4896-1168971769034-3:6:1:1:1, lastMessage
Id = ID:test1-4896-1168971769034-3:6:1:1:1, destination =
queue://queue/testQueue, transactio
nId = XID:257:64626c6f6e777331393830302f3137:31, messageCount = 1}

Am i doing something wrong ?


-- 
View this message in context: http://www.nabble.com/Trouble-with-posting-messages-to-topic-tf3022650.html#a8395964
Sent from the ActiveMQ - User mailing list archive at Nabble.com.


Re: Trouble with posting messages to topic

Posted by miniman <be...@db.com>.
Well i can post you my code which is very simple. I have just intergrated
activemq with jboss and then setup an MDB bean which listens to a queue and
then once it gets the msg from the queue it trys to post it to a topic.



code is :


package com.db.abmonitor.mdb;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.jboss.logging.Logger;

import javax.ejb.CreateException;
import javax.ejb.MessageDrivenBean;
import javax.ejb.MessageDrivenContext;
import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.ObjectMessage;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;
import javax.naming.NamingException;
import java.rmi.RemoteException;

/**
 * @author  mailto:michael.gaffney@panacya.com Michael Gaffney  
 */

public class AutobahnMonitorMDB implements MessageDrivenBean,
MessageListener {

   // private static final String SENDER_NAME = "java:comp/env/ejb/Sender";
	private static Logger logger = 
Logger.getLogger(PublishMsgToTopic.class.getName());
    private MessageDrivenContext context;
    private PublishMsgToTopic processMSG;
    private int counter;
    private boolean verbose = true;
    private boolean transacted = false;
    
    
	private Connection connection;
	private Session session;
	private String url = "tcp://localhost:61616";
	private Topic topic;
	private MessageProducer publisher;

    public AutobahnMonitorMDB() {
        if (logger.isInfoEnabled()) {
            logger.info("Autobahn Monitor MDB.");
        }
    }

    public void onMessage(Message message)  {
        if (logger.isInfoEnabled()) {
            logger.info("Autobahn Monitor.onMessage");
        }
			//if (message instanceof ObjectMessage) {

					//System.out.println("Received: " + message);

        
		try {
			ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(url);
			connection = factory.createConnection();    	
			session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
			topic = session.createTopic("TOOL.TOPICTEST");
			//control = session.createTopic("topictest.control");
			
			publisher = session.createProducer(topic);
			publisher.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
			
			//session.createConsumer(control).setMessageListener(this);
			connection.start();

			//request shutdown
			publisher.send(message);

			//connection.stop();
			//connection.close();
		} catch (JMSException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		}   
        
        
        
				//processMSG = new PublishMsgToTopic(message);
				//processMSG.run();
			//} else {
			//	if (verbose) {
			//		System.out.println("Received: " + message);
			//	}
			//}
		/* Used for Hanlde message for Remote / Internal EJB
        try {
            handleMessage(message);
        } catch (JMSException e) {
            logger.error(e.toString(), e);
        } catch (NamingException e) {
            logger.error(e.toString(), e);
        } catch (RemoteException e) {
            logger.error(e.toString(), e);
        } catch (CreateException e) {
            logger.error(e.toString(), e);
        //} catch (SenderException e) {
        //    logger.error(e.toString(), e);
        }
        */
    }

	
    public void ejbRemove() {
        if (logger.isInfoEnabled()) {
            logger.info("Autobahn Monitor.ejbRemove");
        }
    }

    public void setMessageDrivenContext(MessageDrivenContext
messageDrivenContext) {
        if (logger.isInfoEnabled()) {
            logger.info("Autobahn Monitor.setMessageDrivenContext");
        }
        context = messageDrivenContext;
    }

    public void ejbCreate() {
        if (logger.isInfoEnabled()) {
            logger.info("Autobahn Monitor.ejbCreate");
        }
    }
    
    /* Old Routine used to EJB Sender Remote
    private void handleMessage(Message message) throws JMSException,
NamingException, RemoteException, CreateException {
        if (message instanceof TextMessage) {
            TextMessage textMessage = (TextMessage) message;
            if (logger.isInfoEnabled()) {
                logger.info("Message received: " + textMessage.getText());
            }
           // send(textMessage.getText());
        } else {
            if (logger.isInfoEnabled()) {
                logger.info("Unknown message type received: " +
message.toString());
            }
           // send("Unknown message type: " + message.toString());
        }
    }
    */
    
	private void PublishToTopic(Message msg){
		//System.out.println("Received: " + msg);

		try {
			ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(url);
			connection = factory.createConnection();    	
			session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
			topic = session.createTopic("TOOL.TOPICTEST");
			//control = session.createTopic("topictest.control");
			
			publisher = session.createProducer(topic);
			publisher.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
			
			//session.createConsumer(control).setMessageListener(this);
			connection.start();

			//request shutdown
			publisher.send(msg);

			//connection.stop();
			//connection.close();
		} catch (JMSException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		}     
	}
}


James.Strachan wrote:
> 
> On 1/17/07, miniman <be...@db.com> wrote:
>>
>> I am using apache-activemq-4.1.0-incubator
>>
>> Did these get fixed ?
> 
> Yes - though I'm not sure why you are getting your error. Any chance
> you could create a test case for us then we can fix it?
> 
> -- 
> 
> James
> -------
> http://radio.weblogs.com/0112098/
> 
> 

-- 
View this message in context: http://www.nabble.com/Trouble-with-posting-messages-to-topic-tf3022650.html#a8410729
Sent from the ActiveMQ - User mailing list archive at Nabble.com.


Re: Trouble with posting messages to topic

Posted by James Strachan <ja...@gmail.com>.
On 1/17/07, miniman <be...@db.com> wrote:
>
> I am using apache-activemq-4.1.0-incubator
>
> Did these get fixed ?

Yes - though I'm not sure why you are getting your error. Any chance
you could create a test case for us then we can fix it?

-- 

James
-------
http://radio.weblogs.com/0112098/

Re: Trouble with posting messages to topic

Posted by miniman <be...@db.com>.
I am using apache-activemq-4.1.0-incubator

Did these get fixed ?



James.Strachan wrote:
> 
> BTW which version of ActiveMQ are you using? We did find some gremlins
> on Acks that were fixed in 4.1
> 
> On 1/16/07, miniman <be...@db.com> wrote:
>>
>> All i have created an MDB that sits inside jboss and listens to the
>> queue/testQueue when a message gets posted to that queue it then calls
>> another method which posts the message to a topic. When i do this i get
>> an
>> error that comes back.
>>
>> code is
>>
>> in the onMessage   method
>>
>> onMessage(Message message){
>>   System.out.println("Received: " + message);
>>   PublishToTopic(message);
>> }
>>
>> Then PublishToTopic looks like
>>
>>         private void PublishToTopic(Message msg){
>>
>>                 try {
>>                         ActiveMQConnectionFactory factory = new
>> ActiveMQConnectionFactory(url);
>>                         connection = factory.createConnection();
>>                         session = connection.createSession(false,
>> Session.AUTO_ACKNOWLEDGE);
>>                         topic = session.createTopic("topic/testTopic");
>>                         //control =
>> session.createTopic("topictest.control");
>>
>>                         publisher = session.createProducer(topic);
>>                        
>> publisher.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
>>
>>                        
>> //session.createConsumer(control).setMessageListener(this);
>>                         connection.start();
>>
>>                         //request shutdown
>>                         publisher.send(msg);
>>
>>                         connection.stop();
>>                         connection.close();
>>                 } catch (JMSException e) {
>>                         // TODO Auto-generated catch block
>>                         e.printStackTrace();
>>                 }
>>         }
>>
>>
>> The error i get is
>>
>> 18:26:50,523 INFO  [PrefetchSubscription] Could not correlate
>> acknowledgment
>> with dispatched message
>> : MessageAck {commandId = 6, responseRequired = false, ackType = 2,
>> consumerId = ID:test1-489
>> 6-1168971769034-3:5:-1:2, firstMessageId =
>> ID:test1-4896-1168971769034-3:6:1:1:1, lastMessage
>> Id = ID:test1-4896-1168971769034-3:6:1:1:1, destination =
>> queue://queue/testQueue, transactio
>> nId = XID:257:64626c6f6e777331393830302f3137:31, messageCount = 1}
>>
>> Am i doing something wrong ?
>>
>>
>> --
>> View this message in context:
>> http://www.nabble.com/Trouble-with-posting-messages-to-topic-tf3022650.html#a8395964
>> Sent from the ActiveMQ - User mailing list archive at Nabble.com.
>>
>>
> 
> 
> -- 
> 
> James
> -------
> http://radio.weblogs.com/0112098/
> 
> 

-- 
View this message in context: http://www.nabble.com/Trouble-with-posting-messages-to-topic-tf3022650.html#a8405701
Sent from the ActiveMQ - User mailing list archive at Nabble.com.


Re: Trouble with posting messages to topic

Posted by James Strachan <ja...@gmail.com>.
BTW which version of ActiveMQ are you using? We did find some gremlins
on Acks that were fixed in 4.1

On 1/16/07, miniman <be...@db.com> wrote:
>
> All i have created an MDB that sits inside jboss and listens to the
> queue/testQueue when a message gets posted to that queue it then calls
> another method which posts the message to a topic. When i do this i get an
> error that comes back.
>
> code is
>
> in the onMessage   method
>
> onMessage(Message message){
>   System.out.println("Received: " + message);
>   PublishToTopic(message);
> }
>
> Then PublishToTopic looks like
>
>         private void PublishToTopic(Message msg){
>
>                 try {
>                         ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(url);
>                         connection = factory.createConnection();
>                         session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
>                         topic = session.createTopic("topic/testTopic");
>                         //control = session.createTopic("topictest.control");
>
>                         publisher = session.createProducer(topic);
>                         publisher.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
>
>                         //session.createConsumer(control).setMessageListener(this);
>                         connection.start();
>
>                         //request shutdown
>                         publisher.send(msg);
>
>                         connection.stop();
>                         connection.close();
>                 } catch (JMSException e) {
>                         // TODO Auto-generated catch block
>                         e.printStackTrace();
>                 }
>         }
>
>
> The error i get is
>
> 18:26:50,523 INFO  [PrefetchSubscription] Could not correlate acknowledgment
> with dispatched message
> : MessageAck {commandId = 6, responseRequired = false, ackType = 2,
> consumerId = ID:test1-489
> 6-1168971769034-3:5:-1:2, firstMessageId =
> ID:test1-4896-1168971769034-3:6:1:1:1, lastMessage
> Id = ID:test1-4896-1168971769034-3:6:1:1:1, destination =
> queue://queue/testQueue, transactio
> nId = XID:257:64626c6f6e777331393830302f3137:31, messageCount = 1}
>
> Am i doing something wrong ?
>
>
> --
> View this message in context: http://www.nabble.com/Trouble-with-posting-messages-to-topic-tf3022650.html#a8395964
> Sent from the ActiveMQ - User mailing list archive at Nabble.com.
>
>


-- 

James
-------
http://radio.weblogs.com/0112098/