You are viewing a plain text version of this content. The canonical link for it is here.
Posted to users@activemq.apache.org by miniman <be...@db.com> on 2007/01/16 19:32:08 UTC
Trouble with posting messages to topic
All, I have created an MDB that sits inside JBoss and listens to
queue/testQueue. When a message gets posted to that queue, the MDB calls
another method which posts the message to a topic. When I do this, I get an
error back.
code is
in the onMessage method
// Listener callback, shown here in abbreviated form (modifiers and return type omitted).
// Prints the received message to stdout and then hands it to PublishToTopic.
onMessage(Message message){
System.out.println("Received: " + message);
// Forward the very same Message object that was received.
PublishToTopic(message);
}
Then PublishToTopic looks like
/**
 * Publishes the given message to the "topic/testTopic" topic using a
 * freshly created, non-transacted, auto-acknowledge session.
 *
 * The connection is closed in a finally block so that it is released even
 * when creating the session/producer or sending the message fails.
 *
 * @param msg the message to forward to the topic
 */
private void PublishToTopic(Message msg){
    try {
        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(url);
        connection = factory.createConnection();
        session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        topic = session.createTopic("topic/testTopic");
        //control = session.createTopic("topictest.control");
        publisher = session.createProducer(topic);
        publisher.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
        //session.createConsumer(control).setMessageListener(this);
        connection.start();
        //request shutdown
        publisher.send(msg);
        connection.stop();
    } catch (JMSException e) {
        // TODO Auto-generated catch block
        e.printStackTrace();
    } finally {
        // Always release the broker connection; closing it also closes the
        // session and producer created from it. Closing an already closed
        // connection is a no-op in JMS, so this is safe even if
        // createConnection() itself failed and the field is stale.
        if (connection != null) {
            try {
                connection.close();
            } catch (JMSException e) {
                e.printStackTrace();
            }
        }
    }
}
The error i get is
18:26:50,523 INFO [PrefetchSubscription] Could not correlate acknowledgment with dispatched message: MessageAck {commandId = 6, responseRequired = false, ackType = 2, consumerId = ID:test1-4896-1168971769034-3:5:-1:2, firstMessageId = ID:test1-4896-1168971769034-3:6:1:1:1, lastMessageId = ID:test1-4896-1168971769034-3:6:1:1:1, destination = queue://queue/testQueue, transactionId = XID:257:64626c6f6e777331393830302f3137:31, messageCount = 1}
Am I doing something wrong?
--
View this message in context: http://www.nabble.com/Trouble-with-posting-messages-to-topic-tf3022650.html#a8395964
Sent from the ActiveMQ - User mailing list archive at Nabble.com.
Re: Trouble with posting messages to topic
Posted by miniman <be...@db.com>.
Well, I can post you my code, which is very simple. I have just integrated
ActiveMQ with JBoss and then set up an MDB which listens to a queue and,
once it gets the message from the queue, tries to post it to a topic.
code is :
package com.db.abmonitor.mdb;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.jboss.logging.Logger;
import javax.ejb.CreateException;
import javax.ejb.MessageDrivenBean;
import javax.ejb.MessageDrivenContext;
import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.ObjectMessage;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;
import javax.naming.NamingException;
import java.rmi.RemoteException;
/**
 * Message-driven bean that republishes every message it receives to the
 * {@code TOOL.TOPICTEST} topic of the ActiveMQ broker at
 * {@code tcp://localhost:61616}.
 *
 * <p>A new broker connection is opened for each forwarded message and is
 * always closed again afterwards, so no connections accumulate over time.
 *
 * @author mailto:michael.gaffney@panacya.com Michael Gaffney
 */
public class AutobahnMonitorMDB implements MessageDrivenBean, MessageListener {

    /** Logger named after this bean so its output can be attributed correctly. */
    private static final Logger logger =
            Logger.getLogger(AutobahnMonitorMDB.class.getName());

    /** URL of the ActiveMQ broker the forwarded messages are sent to. */
    private static final String BROKER_URL = "tcp://localhost:61616";

    /** Name of the topic the forwarded messages are published on. */
    private static final String TOPIC_NAME = "TOOL.TOPICTEST";

    /** Context handed over by the container in {@link #setMessageDrivenContext}. */
    private MessageDrivenContext context;

    /** Creates the bean; only logs that an instance was constructed. */
    public AutobahnMonitorMDB() {
        if (logger.isInfoEnabled()) {
            logger.info("Autobahn Monitor MDB.");
        }
    }

    /**
     * Forwards the received message to the topic.
     *
     * @param message the message delivered to this bean
     */
    public void onMessage(Message message) {
        if (logger.isInfoEnabled()) {
            logger.info("Autobahn Monitor.onMessage");
        }
        PublishToTopic(message);
    }

    /** Lifecycle callback; only logs that it was invoked. */
    public void ejbRemove() {
        if (logger.isInfoEnabled()) {
            logger.info("Autobahn Monitor.ejbRemove");
        }
    }

    /**
     * Stores the context supplied by the container.
     *
     * @param messageDrivenContext the context to remember
     */
    public void setMessageDrivenContext(MessageDrivenContext messageDrivenContext) {
        if (logger.isInfoEnabled()) {
            logger.info("Autobahn Monitor.setMessageDrivenContext");
        }
        context = messageDrivenContext;
    }

    /** Lifecycle callback; only logs that it was invoked. */
    public void ejbCreate() {
        if (logger.isInfoEnabled()) {
            logger.info("Autobahn Monitor.ejbCreate");
        }
    }

    /**
     * Publishes {@code msg} to {@link #TOPIC_NAME} with non-persistent
     * delivery, using a non-transacted, auto-acknowledge session.
     *
     * <p>A failure to publish is logged and not propagated, matching the
     * previous best-effort behaviour. The connection is released in all cases.
     *
     * @param msg the message to publish
     */
    private void PublishToTopic(Message msg) {
        // Local, not a field: the connection lives only for this one call.
        Connection connection = null;
        try {
            ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(BROKER_URL);
            connection = factory.createConnection();
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            Topic topic = session.createTopic(TOPIC_NAME);
            MessageProducer publisher = session.createProducer(topic);
            publisher.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
            connection.start();
            publisher.send(msg);
        } catch (JMSException e) {
            logger.error("Failed to publish message to topic " + TOPIC_NAME, e);
        } finally {
            closeQuietly(connection);
        }
    }

    /**
     * Closes the connection if one was opened, logging (not throwing) any
     * failure. Closing the connection also closes its sessions and producers.
     *
     * @param connection the connection to close; may be {@code null}
     */
    private static void closeQuietly(Connection connection) {
        if (connection == null) {
            return;
        }
        try {
            connection.close();
        } catch (JMSException e) {
            logger.error("Failed to close connection to " + BROKER_URL, e);
        }
    }
}
James.Strachan wrote:
>
> On 1/17/07, miniman <be...@db.com> wrote:
>>
>> I am using apache-activemq-4.1.0-incubator
>>
>> Did these get fixed ?
>
> Yes - though I'm not sure why you are getting your error. Any chance
> you could create a test case for us then we can fix it?
>
> --
>
> James
> -------
> http://radio.weblogs.com/0112098/
>
>
--
View this message in context: http://www.nabble.com/Trouble-with-posting-messages-to-topic-tf3022650.html#a8410729
Sent from the ActiveMQ - User mailing list archive at Nabble.com.
Re: Trouble with posting messages to topic
Posted by James Strachan <ja...@gmail.com>.
On 1/17/07, miniman <be...@db.com> wrote:
>
> I am using apache-activemq-4.1.0-incubator
>
> Did these get fixed ?
Yes - though I'm not sure why you are getting your error. Any chance
you could create a test case for us then we can fix it?
--
James
-------
http://radio.weblogs.com/0112098/
Re: Trouble with posting messages to topic
Posted by miniman <be...@db.com>.
I am using apache-activemq-4.1.0-incubator
Did these get fixed ?
James.Strachan wrote:
>
> BTW which version of ActiveMQ are you using? We did find some gremlins
> on Acks that were fixed in 4.1
>
> On 1/16/07, miniman <be...@db.com> wrote:
>>
>> All i have created an MDB that sits inside jboss and listens to the
>> queue/testQueue when a message gets posted to that queue it then calls
>> another method which posts the message to a topic. When i do this i get
>> an
>> error that comes back.
>>
>> code is
>>
>> in the onMessage method
>>
>> onMessage(Message message){
>> System.out.println("Received: " + message);
>> PublishToTopic(message);
>> }
>>
>> Then PublishToTopic looks like
>>
>> private void PublishToTopic(Message msg){
>>
>> try {
>> ActiveMQConnectionFactory factory = new
>> ActiveMQConnectionFactory(url);
>> connection = factory.createConnection();
>> session = connection.createSession(false,
>> Session.AUTO_ACKNOWLEDGE);
>> topic = session.createTopic("topic/testTopic");
>> //control =
>> session.createTopic("topictest.control");
>>
>> publisher = session.createProducer(topic);
>>
>> publisher.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
>>
>>
>> //session.createConsumer(control).setMessageListener(this);
>> connection.start();
>>
>> //request shutdown
>> publisher.send(msg);
>>
>> connection.stop();
>> connection.close();
>> } catch (JMSException e) {
>> // TODO Auto-generated catch block
>> e.printStackTrace();
>> }
>> }
>>
>>
>> The error i get is
>>
>> 18:26:50,523 INFO [PrefetchSubscription] Could not correlate
>> acknowledgment
>> with dispatched message
>> : MessageAck {commandId = 6, responseRequired = false, ackType = 2,
>> consumerId = ID:test1-489
>> 6-1168971769034-3:5:-1:2, firstMessageId =
>> ID:test1-4896-1168971769034-3:6:1:1:1, lastMessage
>> Id = ID:test1-4896-1168971769034-3:6:1:1:1, destination =
>> queue://queue/testQueue, transactio
>> nId = XID:257:64626c6f6e777331393830302f3137:31, messageCount = 1}
>>
>> Am i doing something wrong ?
>>
>>
>> --
>> View this message in context:
>> http://www.nabble.com/Trouble-with-posting-messages-to-topic-tf3022650.html#a8395964
>> Sent from the ActiveMQ - User mailing list archive at Nabble.com.
>>
>>
>
>
> --
>
> James
> -------
> http://radio.weblogs.com/0112098/
>
>
--
View this message in context: http://www.nabble.com/Trouble-with-posting-messages-to-topic-tf3022650.html#a8405701
Sent from the ActiveMQ - User mailing list archive at Nabble.com.
Re: Trouble with posting messages to topic
Posted by James Strachan <ja...@gmail.com>.
BTW which version of ActiveMQ are you using? We did find some gremlins
on Acks that were fixed in 4.1
On 1/16/07, miniman <be...@db.com> wrote:
>
> All i have created an MDB that sits inside jboss and listens to the
> queue/testQueue when a message gets posted to that queue it then calls
> another method which posts the message to a topic. When i do this i get an
> error that comes back.
>
> code is
>
> in the onMessage method
>
> onMessage(Message message){
> System.out.println("Received: " + message);
> PublishToTopic(message);
> }
>
> Then PublishToTopic looks like
>
> private void PublishToTopic(Message msg){
>
> try {
> ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(url);
> connection = factory.createConnection();
> session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
> topic = session.createTopic("topic/testTopic");
> //control = session.createTopic("topictest.control");
>
> publisher = session.createProducer(topic);
> publisher.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
>
> //session.createConsumer(control).setMessageListener(this);
> connection.start();
>
> //request shutdown
> publisher.send(msg);
>
> connection.stop();
> connection.close();
> } catch (JMSException e) {
> // TODO Auto-generated catch block
> e.printStackTrace();
> }
> }
>
>
> The error i get is
>
> 18:26:50,523 INFO [PrefetchSubscription] Could not correlate acknowledgment
> with dispatched message
> : MessageAck {commandId = 6, responseRequired = false, ackType = 2,
> consumerId = ID:test1-489
> 6-1168971769034-3:5:-1:2, firstMessageId =
> ID:test1-4896-1168971769034-3:6:1:1:1, lastMessage
> Id = ID:test1-4896-1168971769034-3:6:1:1:1, destination =
> queue://queue/testQueue, transactio
> nId = XID:257:64626c6f6e777331393830302f3137:31, messageCount = 1}
>
> Am i doing something wrong ?
>
>
> --
> View this message in context: http://www.nabble.com/Trouble-with-posting-messages-to-topic-tf3022650.html#a8395964
> Sent from the ActiveMQ - User mailing list archive at Nabble.com.
>
>
--
James
-------
http://radio.weblogs.com/0112098/