You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ro...@apache.org on 2013/12/12 14:28:03 UTC

svn commit: r1550414 - in /qpid/jms/trunk/src: main/java/org/apache/qpid/jms/impl/ test/java/org/apache/qpid/jms/impl/

Author: robbie
Date: Thu Dec 12 13:28:03 2013
New Revision: 1550414

URL: http://svn.apache.org/r1550414
Log:
QPIDJMS-9: add support for setting the AMQP header ttl field on outgoing messages independently of the value normally applied as a result of the JMS producer's TTL, using the JMS_AMQP_TTL vendor property

Added:
    qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/ClientProperties.java
Modified:
    qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/MessageImpl.java
    qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/SenderImpl.java
    qpid/jms/trunk/src/test/java/org/apache/qpid/jms/impl/MessageImplTest.java
    qpid/jms/trunk/src/test/java/org/apache/qpid/jms/impl/SenderImplTest.java

Added: qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/ClientProperties.java
URL: http://svn.apache.org/viewvc/qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/ClientProperties.java?rev=1550414&view=auto
==============================================================================
--- qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/ClientProperties.java (added)
+++ qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/ClientProperties.java Thu Dec 12 13:28:03 2013
@@ -0,0 +1,27 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.jms.impl;
+
+public class ClientProperties
+{
+    //Message Property Names
+    public static final String JMS_AMQP_TTL = "JMS_AMQP_TTL";
+}

Modified: qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/MessageImpl.java
URL: http://svn.apache.org/viewvc/qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/MessageImpl.java?rev=1550414&r1=1550413&r2=1550414&view=diff
==============================================================================
--- qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/MessageImpl.java (original)
+++ qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/MessageImpl.java Thu Dec 12 13:28:03 2013
@@ -18,8 +18,13 @@
  */
 package org.apache.qpid.jms.impl;
 
+import static org.apache.qpid.jms.impl.ClientProperties.JMS_AMQP_TTL;
+
+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Enumeration;
+import java.util.List;
+import java.util.Set;
 
 import javax.jms.DeliveryMode;
 import javax.jms.Destination;
@@ -31,12 +36,19 @@ import org.apache.qpid.jms.engine.AmqpMe
 
 public abstract class MessageImpl<T extends AmqpMessage> implements Message
 {
+    private static final long MAX_UINT = 0xFFFFFFFFL;
     private final T _amqpMessage;
     private final SessionImpl _sessionImpl;
     private Long _jmsExpirationFromTTL = null;
     private Destination _destination;
     private Destination _replyTo;
 
+    /**
+     * Used to record the value of JMS_AMQP_TTL property
+     * if it is explicitly set by the application
+     */
+    private Long _propJMS_AMQP_TTL = null;
+
     //message to be sent
     public MessageImpl(T amqpMessage, SessionImpl sessionImpl, ConnectionImpl connectionImpl)
     {
@@ -111,9 +123,28 @@ public abstract class MessageImpl<T exte
 
     private void setApplicationProperty(String name, Object value) throws MessageFormatException
     {
-        //TODO: special case JMS_AMQP_TTL so that it gets set
-        //in the TTL field and not application-properties
         checkPropertyNameIsValid(name);
+
+        //JMS_AMQP_TTL is recorded on this object, not in application-properties; only a Long within the uint range is accepted
+        if(JMS_AMQP_TTL.equals(name))
+        {
+            Long ttl = null;
+            if(value instanceof Long)
+            {
+                ttl = (Long) value;
+            }
+            if(ttl != null && ttl >= 0 && ttl <= MAX_UINT)
+            {
+                _propJMS_AMQP_TTL = ttl;
+            }
+            else
+            {
+                throw new MessageFormatException(JMS_AMQP_TTL + " must be a long with value in range 0 to 2^32 - 1");
+            }
+
+            return;
+        }
+
         checkObjectPropertyValueIsValid(value);
 
         _amqpMessage.setApplicationProperty(name, value);
@@ -123,6 +154,11 @@ public abstract class MessageImpl<T exte
     {
         checkPropertyNameIsValid(name);
 
+        if(JMS_AMQP_TTL.equals(name))
+        {
+            return _propJMS_AMQP_TTL;
+        }
+
         //TODO: handle non-JMS types?
         return _amqpMessage.getApplicationProperty(name);
     }
@@ -360,11 +396,18 @@ public abstract class MessageImpl<T exte
     {
         // TODO Auto-generated method stub
         throw new UnsupportedOperationException("Not Implemented");
+
+        //_propJMS_AMQP_TTL = null;
     }
 
     @Override
     public boolean propertyExists(String name) throws JMSException
     {
+        if(JMS_AMQP_TTL.equals(name))
+        {
+            return _propJMS_AMQP_TTL != null;
+        }
+
         return _amqpMessage.applicationPropertyExists(name);
     }
 
@@ -562,7 +605,18 @@ public abstract class MessageImpl<T exte
     @Override
     public Enumeration<?> getPropertyNames() throws JMSException
     {
-        return Collections.enumeration(_amqpMessage.getApplicationPropertyNames());
+        //Get the base names from the underlying AMQP message
+        Set<String> underlyingApplicationPropertyNames = _amqpMessage.getApplicationPropertyNames();
+
+        //Create a new list we can mutate
+        List<String> propNames = new ArrayList<String>(underlyingApplicationPropertyNames);
+
+        if(_propJMS_AMQP_TTL != null)
+        {
+            propNames.add(JMS_AMQP_TTL);
+        }
+
+        return Collections.enumeration(propNames);
     }
 
     @Override

Modified: qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/SenderImpl.java
URL: http://svn.apache.org/viewvc/qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/SenderImpl.java?rev=1550414&r1=1550413&r2=1550414&view=diff
==============================================================================
--- qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/SenderImpl.java (original)
+++ qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/SenderImpl.java Thu Dec 12 13:28:03 2013
@@ -20,6 +20,8 @@
  */
 package org.apache.qpid.jms.impl;
 
+import static org.apache.qpid.jms.impl.ClientProperties.JMS_AMQP_TTL;
+
 import javax.jms.Destination;
 import javax.jms.JMSException;
 import javax.jms.Message;
@@ -31,8 +33,7 @@ import org.apache.qpid.jms.engine.AmqpSe
 
 public class SenderImpl extends LinkImpl implements MessageProducer
 {
-    private static final long UINT_MAX = 0xFFFFFFFFL - 1;
-    private static final String JMS_AMQP_TTL = "JMS_AMQP_TTL";
+    private static final long UINT_MAX = 0xFFFFFFFFL;
     private AmqpSender _amqpSender;
     private Destination _destination;
 
@@ -82,13 +83,21 @@ public class SenderImpl extends LinkImpl
 
             if(message.propertyExists(JMS_AMQP_TTL))
             {
-                //use the requested value from the property
+                //Use the requested value from the property. 0 means clear TTL header.
                 long propTtl = message.getLongProperty(JMS_AMQP_TTL);
-                amqpMessage.setTtl(propTtl);
+                if(propTtl != 0)
+                {
+                    amqpMessage.setTtl(propTtl);
+                }
+                else if(amqpMessage.getTtl() != null)
+                {
+                    amqpMessage.setTtl(null);
+                }
             }
             else
             {
-                //set the AMQP header TTL if necessary and possible, otherwise ensure it is clear
+                //Set the AMQP header TTL to any non-default value implied by JMS TTL (if possible),
+                //otherwise ensure TTL header is clear.
                 if(timeToLive != Message.DEFAULT_TIME_TO_LIVE && timeToLive < UINT_MAX)
                 {
                     amqpMessage.setTtl(timeToLive);

Modified: qpid/jms/trunk/src/test/java/org/apache/qpid/jms/impl/MessageImplTest.java
URL: http://svn.apache.org/viewvc/qpid/jms/trunk/src/test/java/org/apache/qpid/jms/impl/MessageImplTest.java?rev=1550414&r1=1550413&r2=1550414&view=diff
==============================================================================
--- qpid/jms/trunk/src/test/java/org/apache/qpid/jms/impl/MessageImplTest.java (original)
+++ qpid/jms/trunk/src/test/java/org/apache/qpid/jms/impl/MessageImplTest.java Thu Dec 12 13:28:03 2013
@@ -20,6 +20,7 @@
  */
 package org.apache.qpid.jms.impl;
 
+import static org.apache.qpid.jms.impl.ClientProperties.JMS_AMQP_TTL;
 import static org.junit.Assert.*;
 
 import java.util.Enumeration;
@@ -555,7 +556,6 @@ public class MessageImplTest extends Qpi
         assertEquals(newDestinationExpected, _testMessage.getJMSDestination());
     }
 
-    //TODO:new
     // ====== JMSReplyTo =======
 
     @Test
@@ -712,6 +712,7 @@ public class MessageImplTest extends Qpi
         assertEquals("expected JMSExpiration value not present", 0L, _testMessage.getJMSExpiration());
         assertNull(_testAmqpMessage.getCreationTime());
     }
+
     // ====== JMSExpiration =======
 
     @Test
@@ -835,6 +836,153 @@ public class MessageImplTest extends Qpi
         assertEquals("expected JMSExpiration value not present", 0L, _testMessage.getJMSExpiration());
     }
 
+    // ====== JMS_AMQP_TTL property =======
+
+    @Test
+    public void testSetJMS_AMQP_TTL_PropertyRejectsNegatives() throws Exception
+    {
+        //check negatives are rejected
+        try
+        {
+            _testMessage.setLongProperty(JMS_AMQP_TTL, -1L);
+            fail("expected exception not thrown");
+        }
+        catch(MessageFormatException mfe)
+        {
+            //expected
+        }
+
+        //check values over 2^32 - 1 are rejected
+        try
+        {
+            _testMessage.setLongProperty(JMS_AMQP_TTL, 0X100000000L);
+            fail("expected exception not thrown");
+        }
+        catch(MessageFormatException mfe)
+        {
+            //expected
+        }
+    }
+
+    @Test
+    public void testSetJMS_AMQP_TTL_PropertyRejectsNonLongValue() throws Exception
+    {
+        //check an int is rejected
+        try
+        {
+            _testMessage.setIntProperty(JMS_AMQP_TTL, 1);
+            fail("expected exception not thrown");
+        }
+        catch(MessageFormatException mfe)
+        {
+            //expected
+        }
+    }
+
+    @Test
+    public void testJMS_AMQP_TTL_PropertyExists() throws Exception
+    {
+        assertFalse(_testMessage.propertyExists(JMS_AMQP_TTL));
+        _testMessage.setLongProperty(JMS_AMQP_TTL, 1L);
+        assertTrue(_testMessage.propertyExists(JMS_AMQP_TTL));
+    }
+
+    @SuppressWarnings("unchecked")
+    @Test
+    public void testJMS_AMQP_TTL_GetPropertyNamesOnNewMessage() throws Exception
+    {
+        //verify the name doesn't exist originally
+        Enumeration<String> names = (Enumeration<String>) _testMessage.getPropertyNames();
+        boolean containsJMS_AMQP_TTL = false;
+        while(names.hasMoreElements())
+        {
+            String prop = names.nextElement();
+            if(JMS_AMQP_TTL.equals(prop))
+            {
+                containsJMS_AMQP_TTL = true;
+            }
+        }
+
+        assertFalse(containsJMS_AMQP_TTL);
+
+        //set property
+        _testMessage.setLongProperty(JMS_AMQP_TTL, 1L);
+
+        //verify the name now exists
+        names = (Enumeration<String>) _testMessage.getPropertyNames();
+        while(names.hasMoreElements())
+        {
+            String prop = names.nextElement();
+            if(JMS_AMQP_TTL.equals(prop))
+            {
+                containsJMS_AMQP_TTL = true;
+            }
+        }
+
+        assertTrue(containsJMS_AMQP_TTL);
+    }
+
+    @Test
+    public void testGetJMS_AMQP_TTL_PropertyOnNewMessage() throws Exception
+    {
+        try
+        {
+            _testMessage.getLongProperty(JMS_AMQP_TTL);
+            fail("expected exception not thrown");
+        }
+        catch(NumberFormatException nfe)
+        {
+            //expected, property isn't set, so it returns null, and so
+            //the null->Long conversion process failure applies
+        }
+    }
+
+    @Test
+    public void testJMS_AMQP_TTL_PropertyBehaviourOnRecievedMessageWithUnderlyingTtlFieldSet() throws Exception
+    {
+        _testAmqpMessage.setTtl(5L);
+        _testMessage = new TestMessageImpl(_testAmqpMessage, _mockSessionImpl, _mockConnectionImpl, null);
+
+        //check the propertyExists method
+        assertFalse(_testMessage.propertyExists(JMS_AMQP_TTL));
+
+        //verify the name doesn't exist
+        @SuppressWarnings("unchecked")
+        Enumeration<String> names = (Enumeration<String>) _testMessage.getPropertyNames();
+        boolean containsJMS_AMQP_TTL = false;
+        while(names.hasMoreElements())
+        {
+            String prop = names.nextElement();
+            if(JMS_AMQP_TTL.equals(prop))
+            {
+                containsJMS_AMQP_TTL = true;
+            }
+        }
+        assertFalse(containsJMS_AMQP_TTL);
+
+        //verify getting the value throws a NFE
+        try
+        {
+            _testMessage.getLongProperty(JMS_AMQP_TTL);
+            fail("expected exception not thrown");
+        }
+        catch(NumberFormatException nfe)
+        {
+            //expected, property isn't set, so it returns null, and so
+            //the null->Long conversion process failure applies
+        }
+    }
+
+    @Test
+    public void testSetJMS_AMQP_TTL_PropertyDoesntSetEntryInUnderlyingApplicationProperties() throws Exception
+    {
+        assertFalse(_testAmqpMessage.applicationPropertyExists(JMS_AMQP_TTL));
+
+        _testMessage.setLongProperty(JMS_AMQP_TTL, 5L);
+
+        assertFalse(_testAmqpMessage.applicationPropertyExists(JMS_AMQP_TTL));
+    }
+
     // ====== utility methods =======
 
     private void assertGetPropertyThrowsMessageFormatException(TestMessageImpl testMessage,

Modified: qpid/jms/trunk/src/test/java/org/apache/qpid/jms/impl/SenderImplTest.java
URL: http://svn.apache.org/viewvc/qpid/jms/trunk/src/test/java/org/apache/qpid/jms/impl/SenderImplTest.java?rev=1550414&r1=1550413&r2=1550414&view=diff
==============================================================================
--- qpid/jms/trunk/src/test/java/org/apache/qpid/jms/impl/SenderImplTest.java (original)
+++ qpid/jms/trunk/src/test/java/org/apache/qpid/jms/impl/SenderImplTest.java Thu Dec 12 13:28:03 2013
@@ -280,4 +280,35 @@ public class SenderImplTest extends Qpid
         //Verify that the underlying amqp message ttl field was set to the value from the property, not the producer
         assertEquals(ttlPropValue, testMessage.getUnderlyingAmqpMessage(false).getTtl());
     }
+
+    @Test
+    public void testSenderUsesJMS_AMQP_TTLPropertyValueZeroToClearUnderlyingTtlField() throws Exception
+    {
+        //Create mock sent message token, ensure that it is immediately marked as Accepted
+        AmqpSentMessageToken _mockToken = Mockito.mock(AmqpSentMessageToken.class);
+        Mockito.when(_mockToken.getRemoteDeliveryState()).thenReturn(Accepted.getInstance());
+        Mockito.when(_mockAmqpSender.sendMessage(Mockito.any(AmqpMessage.class))).thenReturn(_mockToken);
+        ImmediateWaitUntil.mockWaitUntil(_mockConnection);
+
+        SenderImpl senderImpl = new SenderImpl(_mockSession, _mockConnection, _mockAmqpSender, _mockQueue);
+
+        TestAmqpMessage testAmqpMessage = new TestAmqpMessage();
+
+        long timestamp = System.currentTimeMillis();
+        Long ttlPropValue = 0L;
+        TestMessageImpl testMessage = new TestMessageImpl(testAmqpMessage, _mockSession, _mockConnection);
+        testMessage.setLongProperty(JMS_AMQP_TTL, ttlPropValue);
+
+        //send the message with a different TTL
+        long timeToLive = 999999L;
+        senderImpl.send(testMessage, Message.DEFAULT_DELIVERY_MODE, Message.DEFAULT_PRIORITY, timeToLive);
+
+        //verify the expiration was set, allowing a delta
+        Long jmsExpiration = testMessage.getJMSExpiration();
+        assertEquals(timestamp + timeToLive, jmsExpiration, 3000);
+        assertEquals(jmsExpiration, testMessage.getUnderlyingAmqpMessage(false).getAbsoluteExpiryTime());
+
+        //Verify that the underlying amqp message ttl field was NOT set, as requested by the property value being 0
+        assertNull(testMessage.getUnderlyingAmqpMessage(false).getTtl());
+    }
 }



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org