You are viewing a plain text version of this content. The canonical link for it is here.
Posted to users@activemq.apache.org by ddawg <di...@gmail.com> on 2012/07/12 17:05:51 UTC

Move message and change headers at the same time

Hi, I'm using activeMQ 5.5.1 and I would like to move a specific message from
one queue to another and at the same time I want to change some headers
(i.e. removing exception headers);

Since with JMX I can only move the message and cannot change the headers,
this is not an option for me.

Thus, my approach is to consume the specific message by setting the
JMSMessageID in the messageSelector and consuming it. Then I change the
headerproperties to my needs. Afterwards, I create a producer and send the
message to the new queue.

I do all of this in a transacted session where I create the consumer and the
producer and commit the session at the end.

Is this approach save or are there any concerns, like that I could lose the
message?

Is the JMSMessageID unique?

Is this the proper way to do this?

Please see my code below.

Thanks Dieter



				String newQueueName = selectedQueueNameForMoving;
				
				try {	
					String oldQueueName =
((ActiveMQQueue)message.getJMSDestination()).getQueueName();
					String oldMessageId = message.getJMSMessageID();
					
	                // Create a Connection
					Connection connection = connectionFactory.createConnection();
	                connection.start();

	                // Create a Session
	                Session session = connection.createSession(true,
Session.AUTO_ACKNOWLEDGE);
	                


	                //Retrieve (and at the same time remove) message from
source queue
	                Destination oldDestination =
session.createQueue(oldQueueName);               
	                MessageConsumer consumer =
session.createConsumer(oldDestination, "JMSMessageID = '" + oldMessageId +
"'");
	                Message oldMessage = consumer.receive();
	                
	                if(oldMessage == null)
	                	throw new Exception("Retrieving message failed");   
	                
	                //Modify Headers
	                //We have to extract the header properties first, because
in message they are just readable and cannot be modified.
	                ActiveMQMessage m = (ActiveMQMessage)oldMessage;
	                
	                Map<String, Object> props =  new HashMap<String,
Object>(m.getProperties());
	                
	                props.remove(ErpelJMSConstants.EXCEPTION_STACKTRACE);
	                props.remove(ErpelJMSConstants.EXCEPTION_MESSAGE);
	                
	                m.clearProperties();
	                m.setProperties(props);
                
	                
	                //Add modified message to new queue
	                Destination destination =
session.createQueue(newQueueName);
	                MessageProducer producer =
session.createProducer(destination);
	                producer.setDeliveryMode(DeliveryMode.PERSISTENT);
	                producer.send(m);       

	                session.commit();

	                // Clean up
	                producer.close();
	                consumer.close();
	                session.close();
	                connection.close();

					setResponsePage(QueuesPage.class);
				} catch (Exception e) {
					try {
						LOG.error("Could not move message " + message.getJMSMessageID() + " to
Queue "
								+ newQueueName, e);
						feedbackPanel.error("Could not move message " +
message.getJMSMessageID() + " to Queue "
								+ newQueueName + ". Error: " + e);
						
						feedbackPanel.add(new AttributeModifier("class",
								"alert alert-error"));
					} catch (JMSException e1) {
						e1.printStackTrace();
					}
				}

--
View this message in context: http://activemq.2283324.n4.nabble.com/Move-message-and-change-headers-at-the-same-time-tp4653938.html
Sent from the ActiveMQ - User mailing list archive at Nabble.com.

Re: Move message and change headers at the same time

Posted by Gary Tully <ga...@gmail.com>.
that will work and is safe.
The only caveat is the scope of a selector in activemq. It works on
messages in memory only, so if there are lots of messages you need to
ensure that the destination policyEntry.maxPageSize>
expected-queue-depth for your destinations. It defaults to 200.
Otherwise you will need to wait till that message is paged in before
you can consume it. That will happen when messages ahead of it are
consumed.

some improvements to the code:
do the cleanup/close in a finally so that resources are released, use
an anonymous producer (session.createProducer(null)) and
producer.send(destination, message, ....) to save on producer
creation.
be sure to use a pooledConnectionFactory


On 12 July 2012 16:05, ddawg <di...@gmail.com> wrote:
> Hi, I'm using activeMQ 5.5.1 and I would like to move a specific message from
> one queue to another and at the same time I want to change some headers
> (i.e. removing exception headers);
>
> Since with JMX I can only move the message and cannot change the headers,
> this is not an option for me.
>
> Thus, my approach is to consume the specific message by setting the
> JMSMessageID in the messageSelector and consuming it. Then I change the
> headerproperties to my needs. Afterwards, I create a producer and send the
> message to the new queue.
>
> I do all of this in a transacted session where I create the consumer and the
> producer and commit the session at the end.
>
> Is this approach save or are there any concerns, like that I could lose the
> message?
>
> Is the JMSMessageID unique?
>
> Is this the proper way to do this?
>
> Please see my code below.
>
> Thanks Dieter
>
>
>
>                                 String newQueueName = selectedQueueNameForMoving;
>
>                                 try {
>                                         String oldQueueName =
> ((ActiveMQQueue)message.getJMSDestination()).getQueueName();
>                                         String oldMessageId = message.getJMSMessageID();
>
>                         // Create a Connection
>                                         Connection connection = connectionFactory.createConnection();
>                         connection.start();
>
>                         // Create a Session
>                         Session session = connection.createSession(true,
> Session.AUTO_ACKNOWLEDGE);
>
>
>
>                         //Retrieve (and at the same time remove) message from
> source queue
>                         Destination oldDestination =
> session.createQueue(oldQueueName);
>                         MessageConsumer consumer =
> session.createConsumer(oldDestination, "JMSMessageID = '" + oldMessageId +
> "'");
>                         Message oldMessage = consumer.receive();
>
>                         if(oldMessage == null)
>                                 throw new Exception("Retrieving message failed");
>
>                         //Modify Headers
>                         //We have to extract the header properties first, because
> in message they are just readable and cannot be modified.
>                         ActiveMQMessage m = (ActiveMQMessage)oldMessage;
>
>                         Map<String, Object> props =  new HashMap<String,
> Object>(m.getProperties());
>
>                         props.remove(ErpelJMSConstants.EXCEPTION_STACKTRACE);
>                         props.remove(ErpelJMSConstants.EXCEPTION_MESSAGE);
>
>                         m.clearProperties();
>                         m.setProperties(props);
>
>
>                         //Add modified message to new queue
>                         Destination destination =
> session.createQueue(newQueueName);
>                         MessageProducer producer =
> session.createProducer(destination);
>                         producer.setDeliveryMode(DeliveryMode.PERSISTENT);
>                         producer.send(m);
>
>                         session.commit();
>
>                         // Clean up
>                         producer.close();
>                         consumer.close();
>                         session.close();
>                         connection.close();
>
>                                         setResponsePage(QueuesPage.class);
>                                 } catch (Exception e) {
>                                         try {
>                                                 LOG.error("Could not move message " + message.getJMSMessageID() + " to
> Queue "
>                                                                 + newQueueName, e);
>                                                 feedbackPanel.error("Could not move message " +
> message.getJMSMessageID() + " to Queue "
>                                                                 + newQueueName + ". Error: " + e);
>
>                                                 feedbackPanel.add(new AttributeModifier("class",
>                                                                 "alert alert-error"));
>                                         } catch (JMSException e1) {
>                                                 e1.printStackTrace();
>                                         }
>                                 }
>
> --
> View this message in context: http://activemq.2283324.n4.nabble.com/Move-message-and-change-headers-at-the-same-time-tp4653938.html
> Sent from the ActiveMQ - User mailing list archive at Nabble.com.



-- 
http://fusesource.com
http://blog.garytully.com