You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ro...@apache.org on 2013/12/11 19:24:35 UTC

svn commit: r1550215 - in /qpid/jms/trunk/src: main/java/org/apache/qpid/jms/impl/MessageImpl.java main/java/org/apache/qpid/jms/impl/SenderImpl.java test/java/org/apache/qpid/jms/impl/SenderImplTest.java

Author: robbie
Date: Wed Dec 11 18:24:34 2013
New Revision: 1550215

URL: http://svn.apache.org/r1550215
Log:
QPIDJMS-9: add ability to set the AMQP ttl field independently of the producer-applied Time To Live value which populates JMSExpiration

Modified:
    qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/MessageImpl.java
    qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/SenderImpl.java
    qpid/jms/trunk/src/test/java/org/apache/qpid/jms/impl/SenderImplTest.java

Modified: qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/MessageImpl.java
URL: http://svn.apache.org/viewvc/qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/MessageImpl.java?rev=1550215&r1=1550214&r2=1550215&view=diff
==============================================================================
--- qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/MessageImpl.java (original)
+++ qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/MessageImpl.java Wed Dec 11 18:24:34 2013
@@ -111,6 +111,8 @@ public abstract class MessageImpl<T exte
 
     private void setApplicationProperty(String name, Object value) throws MessageFormatException
     {
+        //TODO: special case JMS_AMQP_TTL so that it gets set
+        //in the TTL field and not application-properties
         checkPropertyNameIsValid(name);
         checkObjectPropertyValueIsValid(value);
 

Modified: qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/SenderImpl.java
URL: http://svn.apache.org/viewvc/qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/SenderImpl.java?rev=1550215&r1=1550214&r2=1550215&view=diff
==============================================================================
--- qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/SenderImpl.java (original)
+++ qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/SenderImpl.java Wed Dec 11 18:24:34 2013
@@ -31,6 +31,8 @@ import org.apache.qpid.jms.engine.AmqpSe
 
 public class SenderImpl extends LinkImpl implements MessageProducer
 {
+    private static final long UINT_MAX = 0xFFFFFFFFL; // largest value of an unsigned 32-bit int (2^32 - 1), the type of the AMQP header ttl field
+    private static final String JMS_AMQP_TTL = "JMS_AMQP_TTL";
     private AmqpSender _amqpSender;
     private Destination _destination;
 
@@ -78,14 +80,23 @@ public class SenderImpl extends LinkImpl
 
             AmqpMessage amqpMessage = getAmqpMessageFromJmsMessage(message);
 
-            //set the AMQP header TTL if necessary, otherwise ensure it is clear
-            if(timeToLive != Message.DEFAULT_TIME_TO_LIVE)
+            if(message.propertyExists(JMS_AMQP_TTL))
             {
-                amqpMessage.setTtl(timeToLive);
+                //use the requested value from the property
+                long propTtl = message.getLongProperty(JMS_AMQP_TTL);
+                amqpMessage.setTtl(propTtl);
             }
-            else if(amqpMessage.getTtl() != null)
+            else
             {
-                amqpMessage.setTtl(null);
+                //set the AMQP header TTL if a producer TTL is in effect and is below the uint limit of the ttl field, otherwise ensure it is clear
+                if(timeToLive != Message.DEFAULT_TIME_TO_LIVE && timeToLive < UINT_MAX)
+                {
+                    amqpMessage.setTtl(timeToLive);
+                }
+                else if(amqpMessage.getTtl() != null)
+                {
+                    amqpMessage.setTtl(null);
+                }
             }
 
             AmqpSentMessageToken sentMessage = _amqpSender.sendMessage(amqpMessage);

Modified: qpid/jms/trunk/src/test/java/org/apache/qpid/jms/impl/SenderImplTest.java
URL: http://svn.apache.org/viewvc/qpid/jms/trunk/src/test/java/org/apache/qpid/jms/impl/SenderImplTest.java?rev=1550215&r1=1550214&r2=1550215&view=diff
==============================================================================
--- qpid/jms/trunk/src/test/java/org/apache/qpid/jms/impl/SenderImplTest.java (original)
+++ qpid/jms/trunk/src/test/java/org/apache/qpid/jms/impl/SenderImplTest.java Wed Dec 11 18:24:34 2013
@@ -41,6 +41,7 @@ import org.mockito.Mockito;
 
 public class SenderImplTest extends QpidJmsTestCase
 {
+    private static final String JMS_AMQP_TTL = "JMS_AMQP_TTL";
     private ConnectionImpl _mockConnection;
     private AmqpSender _mockAmqpSender;
     private SessionImpl _mockSession;
@@ -220,4 +221,63 @@ public class SenderImplTest extends Qpid
         assertNull(testMessage.getUnderlyingAmqpMessage(false).getTtl());
         assertNull(testMessage.getUnderlyingAmqpMessage(false).getAbsoluteExpiryTime());
     }
+
+    @Test
+    public void testSenderUsesJMS_AMQP_TTLPropertyToSetUnderlyingTtlFieldWhenNoProducerTTLInEffect() throws Exception
+    {
+        //Create mock sent message token, ensure that it is immediately marked as Accepted
+        AmqpSentMessageToken _mockToken = Mockito.mock(AmqpSentMessageToken.class);
+        Mockito.when(_mockToken.getRemoteDeliveryState()).thenReturn(Accepted.getInstance());
+        Mockito.when(_mockAmqpSender.sendMessage(Mockito.any(AmqpMessage.class))).thenReturn(_mockToken);
+        ImmediateWaitUntil.mockWaitUntil(_mockConnection);
+
+        SenderImpl senderImpl = new SenderImpl(_mockSession, _mockConnection, _mockAmqpSender, _mockQueue);
+
+        TestAmqpMessage testAmqpMessage = new TestAmqpMessage();
+
+        Long ttlPropValue = 789L;
+        TestMessageImpl testMessage = new TestMessageImpl(testAmqpMessage, _mockSession, _mockConnection);
+        testMessage.setLongProperty(JMS_AMQP_TTL, ttlPropValue);
+
+        //send the message without any TTL
+        senderImpl.send(testMessage, Message.DEFAULT_DELIVERY_MODE, Message.DEFAULT_PRIORITY, Message.DEFAULT_TIME_TO_LIVE);
+
+        //verify that no expiration was set
+        assertEquals(0, testMessage.getJMSExpiration());
+        assertNull(testMessage.getUnderlyingAmqpMessage(false).getAbsoluteExpiryTime());
+
+        //Verify that the underlying amqp message ttl field was set to the value from the property
+        assertEquals(ttlPropValue, testMessage.getUnderlyingAmqpMessage(false).getTtl());
+    }
+
+    @Test
+    public void testSenderUsesJMS_AMQP_TTLPropertyToSetUnderlyingTtlFieldWhenProducerTTLInEffect() throws Exception
+    {
+        //Create mock sent message token, ensure that it is immediately marked as Accepted
+        AmqpSentMessageToken _mockToken = Mockito.mock(AmqpSentMessageToken.class);
+        Mockito.when(_mockToken.getRemoteDeliveryState()).thenReturn(Accepted.getInstance());
+        Mockito.when(_mockAmqpSender.sendMessage(Mockito.any(AmqpMessage.class))).thenReturn(_mockToken);
+        ImmediateWaitUntil.mockWaitUntil(_mockConnection);
+
+        SenderImpl senderImpl = new SenderImpl(_mockSession, _mockConnection, _mockAmqpSender, _mockQueue);
+
+        TestAmqpMessage testAmqpMessage = new TestAmqpMessage();
+
+        long timestamp = System.currentTimeMillis();
+        Long ttlPropValue = 789L;
+        TestMessageImpl testMessage = new TestMessageImpl(testAmqpMessage, _mockSession, _mockConnection);
+        testMessage.setLongProperty(JMS_AMQP_TTL, ttlPropValue);
+
+        //send the message with a different TTL
+        long timeToLive = 999999L;
+        senderImpl.send(testMessage, Message.DEFAULT_DELIVERY_MODE, Message.DEFAULT_PRIORITY, timeToLive);
+
+        //verify the expiration was set, allowing a delta
+        Long jmsExpiration = testMessage.getJMSExpiration();
+        assertEquals(timestamp + timeToLive, jmsExpiration, 3000);
+        assertEquals(jmsExpiration, testMessage.getUnderlyingAmqpMessage(false).getAbsoluteExpiryTime());
+
+        //Verify that the underlying amqp message ttl field was set to the value from the property, not the producer
+        assertEquals(ttlPropValue, testMessage.getUnderlyingAmqpMessage(false).getTtl());
+    }
 }



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org