You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicemix.apache.org by ge...@apache.org on 2010/01/26 10:02:28 UTC

svn commit: r903130 - in /servicemix/components/bindings/servicemix-jms/trunk/src: main/java/org/apache/servicemix/jms/endpoints/JmsProviderEndpoint.java test/java/org/apache/servicemix/jms/JmsProviderEndpointTest.java

Author: gertv
Date: Tue Jan 26 09:02:28 2010
New Revision: 903130

URL: http://svn.apache.org/viewvc?rev=903130&view=rev
Log:
SMXCOMP-702: Allow per-message specification of JMS priority and time-to-live

Modified:
    servicemix/components/bindings/servicemix-jms/trunk/src/main/java/org/apache/servicemix/jms/endpoints/JmsProviderEndpoint.java
    servicemix/components/bindings/servicemix-jms/trunk/src/test/java/org/apache/servicemix/jms/JmsProviderEndpointTest.java

Modified: servicemix/components/bindings/servicemix-jms/trunk/src/main/java/org/apache/servicemix/jms/endpoints/JmsProviderEndpoint.java
URL: http://svn.apache.org/viewvc/servicemix/components/bindings/servicemix-jms/trunk/src/main/java/org/apache/servicemix/jms/endpoints/JmsProviderEndpoint.java?rev=903130&r1=903129&r2=903130&view=diff
==============================================================================
--- servicemix/components/bindings/servicemix-jms/trunk/src/main/java/org/apache/servicemix/jms/endpoints/JmsProviderEndpoint.java (original)
+++ servicemix/components/bindings/servicemix-jms/trunk/src/main/java/org/apache/servicemix/jms/endpoints/JmsProviderEndpoint.java Tue Jan 26 09:02:28 2010
@@ -106,6 +106,8 @@
     private Store store;
 
     private AbstractMessageListenerContainer listenerContainer;
+    
+    private boolean preserveMessageQos;
 
     /**
      * @return the destination
@@ -470,6 +472,24 @@
     public void setReplyDestinationName(String replyDestinationName) {
         this.replyDestinationName = replyDestinationName;
     }
+    
+    /**
+     * @return the preserveMessageQos
+     */
+    public boolean isPreserveMessageQos() {
+    	return preserveMessageQos;
+    }
+    
+    /**
+     * Specifies whether messages are sent using the QoS settings (delivery mode,
+     * priority and time-to-live) found on each message, in order to preserve message QoS.
+     * The default is <code>false</code>.
+     *
+     * @param preserveMessageQos should support per message QoS?
+     */
+    public void setPreserveMessageQos(boolean preserveMessageQos) {
+    	this.preserveMessageQos = preserveMessageQos;
+    }
 
     /**
      * Process the incoming JBI exchange
@@ -976,7 +996,8 @@
         return cont;
     }
 
-    public static class JmsTemplateUtil extends JmsTemplate {
+    public class JmsTemplateUtil extends JmsTemplate {
+
         public void send(Session session, Destination destination, MessageCreator messageCreator) throws JmsException {
             try {
                 doSend(session, destination, messageCreator);
@@ -991,9 +1012,38 @@
                 throw convertJmsAccessException(ex);
             }
         }
+        
+        /**
+         * Overridden to support preserving the QoS settings that have
+         * been set on the message.
+         */
+        protected void doSend(MessageProducer producer, Message message) throws JMSException {
+            if (isPreserveMessageQos()) {
+                producer.send(message, message.getJMSDeliveryMode(), message.getJMSPriority(), getTimeToLive(message));
+            } else {
+                super.doSend(producer, message);
+            }
+        }
+
+        /**
+         * Determine the remaining time-to-live for a JMS Message
+         * @return 0 if no expiration has been set, otherwise the remaining ttl in milliseconds (1 if the message has already expired)
+         */
+        protected long getTimeToLive(Message message) throws JMSException {
+            long ttl = message.getJMSExpiration();
+            if (ttl != 0) {
+                ttl = ttl - System.currentTimeMillis();
+                // Message has already expired, so use the smallest positive ttl (in JMS a ttl of 0 means the message never expires)
+                if (ttl <= 0) {
+                    ttl = 1;
+                }
+            }
+            return ttl;
+        }
     }
 
-    public static class JmsTemplate102Util extends JmsTemplateUtil {
+    public class JmsTemplate102Util extends JmsTemplateUtil {
+
         protected void initDefaultStrategies() {
             setMessageConverter(new SimpleMessageConverter102());
         }
@@ -1064,17 +1114,26 @@
         }
 
         protected void doSend(MessageProducer producer, Message message) throws JMSException {
-            if (isPubSubDomain()) {
-                if (isExplicitQosEnabled()) {
-                    ((TopicPublisher) producer).publish(message, getDeliveryMode(), getPriority(), getTimeToLive());
+        	if (isPreserveMessageQos()) {
+                long ttl = getTimeToLive(message);
+                if (isPubSubDomain()) {
+                	((TopicPublisher) producer).publish(message, message.getJMSDeliveryMode(), message.getJMSPriority(), ttl); 
                 } else {
-                    ((TopicPublisher) producer).publish(message);
+                    ((QueueSender) producer).send(message, message.getJMSDeliveryMode(), message.getJMSPriority(), ttl);
                 }
             } else {
-                if (isExplicitQosEnabled()) {
-                    ((QueueSender) producer).send(message, getDeliveryMode(), getPriority(), getTimeToLive());
+                if (isPubSubDomain()) {
+                    if (isExplicitQosEnabled()) {
+                        ((TopicPublisher) producer).publish(message, getDeliveryMode(), getPriority(), getTimeToLive());
+                    } else {
+                        ((TopicPublisher) producer).publish(message);
+                    }
                 } else {
-                    ((QueueSender) producer).send(message);
+                    if (isExplicitQosEnabled()) {
+                        ((QueueSender) producer).send(message, getDeliveryMode(), getPriority(), getTimeToLive());
+                    } else {
+                        ((QueueSender) producer).send(message);
+                    }
                 }
             }
         }

Modified: servicemix/components/bindings/servicemix-jms/trunk/src/test/java/org/apache/servicemix/jms/JmsProviderEndpointTest.java
URL: http://svn.apache.org/viewvc/servicemix/components/bindings/servicemix-jms/trunk/src/test/java/org/apache/servicemix/jms/JmsProviderEndpointTest.java?rev=903130&r1=903129&r2=903130&view=diff
==============================================================================
--- servicemix/components/bindings/servicemix-jms/trunk/src/test/java/org/apache/servicemix/jms/JmsProviderEndpointTest.java (original)
+++ servicemix/components/bindings/servicemix-jms/trunk/src/test/java/org/apache/servicemix/jms/JmsProviderEndpointTest.java Tue Jan 26 09:02:28 2010
@@ -48,11 +48,14 @@
      */
     private static final String MSG_PROPERTY = "PropertyTest";
     private static final String MSG_PROPERTY_BLACKLISTED = "BadPropertyTest";
+    
+    private static final String MSG_PRIORITY_PROPERTY = "JMSPriority";
+    private static final String MSG_EXPIRATION_PROPERTY = "JMSExpiration";
 
     protected List<String> blackList;
     
     public void testSendWithoutProperties() throws Exception {
-        container.activateComponent(createEndpoint(false), "servicemix-jms");
+        container.activateComponent(createEndpoint(false, false), "servicemix-jms");
         
         InOnly me = client.createInOnlyExchange();
         NormalizedMessage inMessage = me.getInMessage();
@@ -94,6 +97,26 @@
         assertNotNull(msg);
     }
     
+    public void testProviderInOnlyWithMessageQoS() throws Exception {
+        container.activateComponent(createEndpoint(true, true), "servicemix-jms");
+        
+        InOnly me = client.createInOnlyExchange();
+        NormalizedMessage inMessage = me.getInMessage();
+        inMessage.setProperty(MSG_PRIORITY_PROPERTY, 2);
+        long ttl = System.currentTimeMillis() + 5000;
+        inMessage.setProperty(MSG_EXPIRATION_PROPERTY, ttl);
+        inMessage.setContent(new StringSource("<hello>world</hello>"));
+        me.setService(new QName("jms"));
+        client.sendSync(me);
+        assertEquals(ExchangeStatus.DONE, me.getStatus());
+        
+        Message msg = jmsTemplate.receive("destination");
+        assertNotNull(msg);
+        assertEquals(2, msg.getJMSPriority());
+        assertTrue("Found none-zero value for JMSExpiration", msg
+        		.getLongProperty(MSG_EXPIRATION_PROPERTY) != 0);
+    }
+    
     public void testProviderInOnlyWithoutReplyDest() throws Exception {
         JmsComponent component = new JmsComponent();
 
@@ -290,10 +313,10 @@
 
     // Helper methods
     private JmsComponent createEndpoint() {
-        return createEndpoint(true);
+        return createEndpoint(true, false);
     }
 
-    private JmsComponent createEndpoint(boolean copyProperties) {
+    private JmsComponent createEndpoint(boolean copyProperties, boolean preserveMsgQoS) {
         // initialize the black list
         blackList = new LinkedList<String>();
         blackList.add(MSG_PROPERTY_BLACKLISTED);
@@ -308,6 +331,7 @@
         endpoint.setEndpoint("endpoint");
         endpoint.setConnectionFactory(connectionFactory);
         endpoint.setDestinationName("destination");
+        endpoint.setPreserveMessageQos(preserveMsgQoS);
         component.setEndpoints(new JmsProviderEndpoint[] {endpoint});
         return component;
     }