You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ro...@apache.org on 2013/12/09 18:20:01 UTC

svn commit: r1549620 - in /qpid/jms/trunk/src: main/java/org/apache/qpid/jms/engine/ main/java/org/apache/qpid/jms/impl/ test/java/org/apache/qpid/jms/ test/java/org/apache/qpid/jms/engine/ test/java/org/apache/qpid/jms/impl/

Author: robbie
Date: Mon Dec  9 17:20:00 2013
New Revision: 1549620

URL: http://svn.apache.org/r1549620
Log:
QPIDJMS-9: add support for sending/receiving messages with expiry, via the absolute-expiry-time field of the properties section and the ttl field of the header section

Modified:
    qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpMessage.java
    qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/MessageImpl.java
    qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/SenderImpl.java
    qpid/jms/trunk/src/test/java/org/apache/qpid/jms/MessageIntegrationTest.java
    qpid/jms/trunk/src/test/java/org/apache/qpid/jms/SenderIntegrationTest.java
    qpid/jms/trunk/src/test/java/org/apache/qpid/jms/engine/AmqpMessageTest.java
    qpid/jms/trunk/src/test/java/org/apache/qpid/jms/impl/MessageImplTest.java
    qpid/jms/trunk/src/test/java/org/apache/qpid/jms/impl/SenderImplTest.java

Modified: qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpMessage.java
URL: http://svn.apache.org/viewvc/qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpMessage.java?rev=1549620&r1=1549619&r2=1549620&view=diff
==============================================================================
--- qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpMessage.java (original)
+++ qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpMessage.java Mon Dec  9 17:20:00 2013
@@ -21,12 +21,14 @@
 package org.apache.qpid.jms.engine;
 
 import java.util.Collections;
+import java.util.Date;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Set;
 
 import org.apache.qpid.proton.Proton;
 import org.apache.qpid.proton.amqp.Symbol;
+import org.apache.qpid.proton.amqp.UnsignedInteger;
 import org.apache.qpid.proton.amqp.messaging.Accepted;
 import org.apache.qpid.proton.amqp.messaging.ApplicationProperties;
 import org.apache.qpid.proton.amqp.messaging.MessageAnnotations;
@@ -128,6 +130,51 @@ public abstract class AmqpMessage
         return _message.isDurable();
     }
 
+    /**
+     * @return the ttl in milliseconds, or null if none exists
+     */
+    public Long getTtl()
+    {
+        if(_message.getHeader() == null)
+        {
+            return null;
+        }
+        else
+        {
+            UnsignedInteger ttl = _message.getHeader().getTtl();
+            if(ttl == null)
+            {
+                return null;
+            }
+            else
+            {
+                return ttl.longValue();
+            }
+        }
+    }
+
+    /**
+     * @param timeInMillis the ttl time in milliseconds, or null to clear the field
+     */
+    public void setTtl(Long timeInMillis)
+    {
+        if(timeInMillis == null)
+        {
+            if(_message.getHeader() == null)
+            {
+                return;
+            }
+            else
+            {
+                _message.getHeader().setTtl(null);
+            }
+        }
+        else
+        {
+            _message.setTtl(timeInMillis);
+        }
+    }
+
     //===== MessageAnnotations ======
 
     /**
@@ -237,14 +284,43 @@ public abstract class AmqpMessage
         _message.setAddress(to);
     }
 
-    public long getCreationTime()
+    public Long getCreationTime()
     {
-        return _message.getCreationTime();
+        if(_message.getProperties() == null)
+        {
+            return null;
+        }
+        else
+        {
+            Date date = _message.getProperties().getCreationTime();
+            if(date == null)
+            {
+                return null;
+            }
+            else
+            {
+                return date.getTime();
+            }
+        }
     }
 
-    public void setCreationTime(long timeInMillis)
+    public void setCreationTime(Long timeInMillis)
     {
-        _message.setCreationTime(timeInMillis);
+        if(timeInMillis == null)
+        {
+            if(_message.getProperties() == null)
+            {
+                return;
+            }
+            else
+            {
+                _message.getProperties().setCreationTime(null);
+            }
+        }
+        else
+        {
+            _message.setCreationTime(timeInMillis);
+        }
     }
 
     public String getReplyTo()
@@ -257,6 +333,51 @@ public abstract class AmqpMessage
         _message.setReplyTo(replyTo);
     }
 
+    /**
+     * @return the expiration time in milliseconds since the Unix Epoch, or null if none exists
+     */
+    public Long getAbsoluteExpiryTime()
+    {
+        if(_message.getProperties() == null)
+        {
+            return null;
+        }
+        else
+        {
+            Date date = _message.getProperties().getAbsoluteExpiryTime();
+            if(date == null)
+            {
+                return null;
+            }
+            else
+            {
+                return date.getTime();
+            }
+        }
+    }
+
+    /**
+     * @param timeInMillis the expiration time in milliseconds since the Unix Epoch, or null to clear the field
+     */
+    public void setAbsoluteExpiryTime(Long timeInMillis)
+    {
+        if(timeInMillis == null)
+        {
+            if(_message.getProperties() == null)
+            {
+                return;
+            }
+            else
+            {
+                _message.getProperties().setAbsoluteExpiryTime(null);
+            }
+        }
+        else
+        {
+            _message.setExpiryTime(timeInMillis);
+        }
+    }
+
     //===== Application Properties ======
 
     private void createApplicationProperties()

Modified: qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/MessageImpl.java
URL: http://svn.apache.org/viewvc/qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/MessageImpl.java?rev=1549620&r1=1549619&r2=1549620&view=diff
==============================================================================
--- qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/MessageImpl.java (original)
+++ qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/MessageImpl.java Mon Dec  9 17:20:00 2013
@@ -137,13 +137,28 @@ public abstract class MessageImpl<T exte
     @Override
     public long getJMSTimestamp() throws JMSException
     {
-        return _amqpMessage.getCreationTime();
+        Long timestamp = _amqpMessage.getCreationTime();
+        if(timestamp == null)
+        {
+            return 0;
+        }
+        else
+        {
+            return timestamp.longValue();
+        }
     }
 
     @Override
     public void setJMSTimestamp(long timestamp) throws JMSException
     {
-        _amqpMessage.setCreationTime(timestamp);
+        if(timestamp != 0)
+        {
+            _amqpMessage.setCreationTime(timestamp);
+        }
+        else
+        {
+            _amqpMessage.setCreationTime(null);
+        }
     }
 
     @Override
@@ -283,15 +298,52 @@ public abstract class MessageImpl<T exte
     @Override
     public long getJMSExpiration() throws JMSException
     {
-        // TODO Auto-generated method stub
-        throw new UnsupportedOperationException("Not Implemented");
+        //Use absolute-expiry if present
+        Long absoluteExpiry = _amqpMessage.getAbsoluteExpiryTime();
+        if(absoluteExpiry != null)
+        {
+            return absoluteExpiry;
+        }
+
+        //derive from creation time and ttl field if present
+        Long creationTime = _amqpMessage.getCreationTime();
+        Long ttl = _amqpMessage.getTtl();
+
+        if(ttl != null)
+        {
+            if(creationTime != null)
+            {
+                return creationTime + ttl;
+            }
+            else
+            {
+                //TODO: this will give a different value each time. Use RcvTime equivalent?
+                return System.currentTimeMillis() + ttl;
+            }
+        }
+
+        //failing the above we must say there is no expiration
+        return 0;
     }
 
     @Override
     public void setJMSExpiration(long expiration) throws JMSException
     {
-        // TODO Auto-generated method stub
-        throw new UnsupportedOperationException("Not Implemented");
+        if(expiration != 0)
+        {
+            _amqpMessage.setAbsoluteExpiryTime(expiration);
+        }
+        else
+        {
+            _amqpMessage.setAbsoluteExpiryTime(null);
+
+            //As we are clearing JMSExpiration we must also clear the TTL field if it is
+            //set, or else it will lead to getJMSExpiration continuing to return a value
+            if(_amqpMessage.getTtl() != null)
+            {
+                _amqpMessage.setTtl(null);
+            }
+        }
     }
 
     @Override

Modified: qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/SenderImpl.java
URL: http://svn.apache.org/viewvc/qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/SenderImpl.java?rev=1549620&r1=1549619&r2=1549620&view=diff
==============================================================================
--- qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/SenderImpl.java (original)
+++ qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/SenderImpl.java Mon Dec  9 17:20:00 2013
@@ -43,6 +43,12 @@ public class SenderImpl extends LinkImpl
 
     private void sendMessage(Message message, int deliveryMode, int priority, long timeToLive) throws JMSException
     {
+        //TODO
+        if(priority != Message.DEFAULT_PRIORITY)
+        {
+            throw new IllegalArgumentException("Only default priority is currently supported");
+        }
+
         getConnectionImpl().lock();
         try
         {
@@ -60,8 +66,28 @@ public class SenderImpl extends LinkImpl
                 message.setJMSDeliveryMode(deliveryMode);
             }
 
+            //set the JMSExpiration if necessary
+            if(timeToLive != Message.DEFAULT_TIME_TO_LIVE)
+            {
+                message.setJMSExpiration(timestamp + timeToLive);
+            }
+            else if(message.getJMSExpiration() != 0)
+            {
+                message.setJMSExpiration(0);
+            }
+
             AmqpMessage amqpMessage = getAmqpMessageFromJmsMessage(message);
 
+            //set the AMQP header TTL if necessary, otherwise ensure it is clear
+            if(timeToLive != Message.DEFAULT_TIME_TO_LIVE)
+            {
+                amqpMessage.setTtl(timeToLive);
+            }
+            else if(amqpMessage.getTtl() != null)
+            {
+                amqpMessage.setTtl(null);
+            }
+
             AmqpSentMessageToken sentMessage = _amqpSender.sendMessage(amqpMessage);
 
             getConnectionImpl().stateChanged();
@@ -186,8 +212,7 @@ public class SenderImpl extends LinkImpl
     @Override
     public void send(Message message, int deliveryMode, int priority, long timeToLive) throws JMSException
     {
-        // TODO Auto-generated method stub
-        throw new UnsupportedOperationException("Not Implemented");
+        sendMessage(message, deliveryMode, priority, timeToLive);
     }
 
     @Override

Modified: qpid/jms/trunk/src/test/java/org/apache/qpid/jms/MessageIntegrationTest.java
URL: http://svn.apache.org/viewvc/qpid/jms/trunk/src/test/java/org/apache/qpid/jms/MessageIntegrationTest.java?rev=1549620&r1=1549619&r2=1549620&view=diff
==============================================================================
--- qpid/jms/trunk/src/test/java/org/apache/qpid/jms/MessageIntegrationTest.java (original)
+++ qpid/jms/trunk/src/test/java/org/apache/qpid/jms/MessageIntegrationTest.java Mon Dec  9 17:20:00 2013
@@ -25,6 +25,8 @@ import static org.junit.Assert.assertNot
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 
+import java.util.Date;
+
 import javax.jms.Connection;
 import javax.jms.Destination;
 import javax.jms.Message;
@@ -266,7 +268,6 @@ public class MessageIntegrationTest exte
         }
     }
 
-
     /**
      * Tests that the {@link DestinationHelper#REPLY_TO_TYPE_MSG_ANNOTATION_SYMBOL_NAME} set on a message to
      * indicate its 'reply-to' address represents a Topic results in the JMSReplyTo object being a
@@ -380,4 +381,72 @@ public class MessageIntegrationTest exte
             assertNull(receivedMessage.getJMSReplyTo());
         }
     }
+
+    /**
+     * Tests that lack of the absolute-expiry-time and ttl fields on a message results
+     * in it returning 0 for JMSExpiration
+     */
+    @Test
+    public void testReceivedMessageFromQueueWithNoAbsoluteExpiryOrTtlReturnsJMSExpirationZero() throws Exception
+    {
+        try(TestAmqpPeer testPeer = new TestAmqpPeer(IntegrationTestFixture.PORT);)
+        {
+            Connection connection = _testFixture.establishConnecton(testPeer);
+            connection.start();
+
+            testPeer.expectBegin();
+
+            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            Queue queue = session.createQueue("myQueue");
+
+            DescribedType amqpValueNullContent = new AmqpValueDescribedType(null);
+
+            testPeer.expectReceiverAttach();
+            testPeer.expectLinkFlowRespondWithTransfer(null, null, null, null, amqpValueNullContent);
+            testPeer.expectDispositionThatIsAcceptedAndSettled();
+
+            MessageConsumer messageConsumer = session.createConsumer(queue);
+            Message receivedMessage = messageConsumer.receive(1000);
+            testPeer.waitForAllHandlersToComplete(3000);
+
+            assertNotNull(receivedMessage);
+            assertEquals(0L, receivedMessage.getJMSExpiration());
+        }
+    }
+
+    /**
+     * Tests that setting a non-zero value in the absolute-expiry-time field on a
+     * message results in it returning this value for JMSExpiration
+     */
+    @Test
+    public void testReceivedMessageFromQueueWithAbsoluteExpiryReturnsJMSExpirationNonZero() throws Exception
+    {
+        try(TestAmqpPeer testPeer = new TestAmqpPeer(IntegrationTestFixture.PORT);)
+        {
+            Connection connection = _testFixture.establishConnecton(testPeer);
+            connection.start();
+
+            testPeer.expectBegin();
+
+            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            Queue queue = session.createQueue("myQueue");
+
+            long timestamp = System.currentTimeMillis();
+
+            PropertiesDescribedType props = new PropertiesDescribedType();
+            props.setAbsoluteExpiryTime(new Date(timestamp));
+            DescribedType amqpValueNullContent = new AmqpValueDescribedType(null);
+
+            testPeer.expectReceiverAttach();
+            testPeer.expectLinkFlowRespondWithTransfer(null, null, props, null, amqpValueNullContent);
+            testPeer.expectDispositionThatIsAcceptedAndSettled();
+
+            MessageConsumer messageConsumer = session.createConsumer(queue);
+            Message receivedMessage = messageConsumer.receive(1000);
+            testPeer.waitForAllHandlersToComplete(3000);
+
+            assertNotNull(receivedMessage);
+            assertEquals(timestamp, receivedMessage.getJMSExpiration());
+        }
+    }
 }

Modified: qpid/jms/trunk/src/test/java/org/apache/qpid/jms/SenderIntegrationTest.java
URL: http://svn.apache.org/viewvc/qpid/jms/trunk/src/test/java/org/apache/qpid/jms/SenderIntegrationTest.java?rev=1549620&r1=1549619&r2=1549620&view=diff
==============================================================================
--- qpid/jms/trunk/src/test/java/org/apache/qpid/jms/SenderIntegrationTest.java (original)
+++ qpid/jms/trunk/src/test/java/org/apache/qpid/jms/SenderIntegrationTest.java Mon Dec  9 17:20:00 2013
@@ -40,6 +40,7 @@ import org.apache.qpid.jms.test.testpeer
 import org.apache.qpid.jms.test.testpeer.matchers.sections.TransferPayloadCompositeMatcher;
 import org.apache.qpid.jms.test.testpeer.matchers.types.EncodedAmqpValueMatcher;
 import org.apache.qpid.proton.amqp.Symbol;
+import org.apache.qpid.proton.amqp.UnsignedInteger;
 import org.junit.Test;
 
 public class SenderIntegrationTest extends QpidJmsTestCase
@@ -170,4 +171,41 @@ public class SenderIntegrationTest exten
             producer.send(message);
         }
     }
+
+    @Test
+    public void testSendingMessageSetsJMSExpirationRelatedAbsoluteExpiryAndTtlFields() throws Exception
+    {
+        try(TestAmqpPeer testPeer = new TestAmqpPeer(IntegrationTestFixture.PORT);)
+        {
+            Connection connection = _testFixture.establishConnecton(testPeer);
+            testPeer.expectBegin();
+            testPeer.expectSenderAttach();
+
+            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            String queueName = "myQueue";
+            Queue queue = session.createQueue(queueName);
+            MessageProducer producer = session.createProducer(queue);
+
+            long currentTime = System.currentTimeMillis();
+            long ttl = 100_000;
+            Date expiration = new Date(currentTime + ttl);
+
+            String text = "myMessage";
+            MessageHeaderSectionMatcher headersMatcher = new MessageHeaderSectionMatcher(true);
+            headersMatcher.withDurable(equalTo(true));
+            headersMatcher.withTtl(equalTo(UnsignedInteger.valueOf(ttl)));
+            MessageAnnotationsSectionMatcher msgAnnotationsMatcher = new MessageAnnotationsSectionMatcher(true);
+            MessagePropertiesSectionMatcher propsMatcher = new MessagePropertiesSectionMatcher(true).withAbsoluteExpiryTime(greaterThanOrEqualTo(expiration));
+            TransferPayloadCompositeMatcher messageMatcher = new TransferPayloadCompositeMatcher();
+            messageMatcher.setHeadersMatcher(headersMatcher);
+            messageMatcher.setMessageAnnotationsMatcher(msgAnnotationsMatcher);
+            messageMatcher.setPropertiesMatcher(propsMatcher);
+            messageMatcher.setMessageContentMatcher(new EncodedAmqpValueMatcher(text));
+            testPeer.expectTransfer(messageMatcher);
+
+            Message message = session.createTextMessage(text);
+
+            producer.send(message, Message.DEFAULT_DELIVERY_MODE, Message.DEFAULT_PRIORITY, ttl);
+        }
+    }
 }

Modified: qpid/jms/trunk/src/test/java/org/apache/qpid/jms/engine/AmqpMessageTest.java
URL: http://svn.apache.org/viewvc/qpid/jms/trunk/src/test/java/org/apache/qpid/jms/engine/AmqpMessageTest.java?rev=1549620&r1=1549619&r2=1549620&view=diff
==============================================================================
--- qpid/jms/trunk/src/test/java/org/apache/qpid/jms/engine/AmqpMessageTest.java (original)
+++ qpid/jms/trunk/src/test/java/org/apache/qpid/jms/engine/AmqpMessageTest.java Mon Dec  9 17:20:00 2013
@@ -29,7 +29,9 @@ import java.util.Set;
 import org.apache.qpid.jms.QpidJmsTestCase;
 import org.apache.qpid.proton.Proton;
 import org.apache.qpid.proton.amqp.Symbol;
+import org.apache.qpid.proton.amqp.UnsignedInteger;
 import org.apache.qpid.proton.amqp.messaging.ApplicationProperties;
+import org.apache.qpid.proton.amqp.messaging.Header;
 import org.apache.qpid.proton.amqp.messaging.MessageAnnotations;
 import org.apache.qpid.proton.amqp.messaging.Properties;
 import org.apache.qpid.proton.engine.Delivery;
@@ -165,6 +167,67 @@ public class AmqpMessageTest extends Qpi
         }
     }
 
+    // ====== Header =======
+
+    @Test
+    public void testNewMessageHasNoUnderlyingHeaderSection()
+    {
+        TestAmqpMessage testAmqpMessage = new TestAmqpMessage();
+
+        Message underlying = testAmqpMessage.getMessage();
+        assertNull(underlying.getHeader());
+    }
+
+    @Test
+    public void testGetTtlIsNullForNewMessage()
+    {
+        TestAmqpMessage testAmqpMessage = new TestAmqpMessage();
+
+        assertNull(testAmqpMessage.getTtl());
+    }
+
+    @Test
+    public void testGetTtlOnRecievedMessageWithTtl()
+    {
+        Long ttl = 123L;
+
+        Message message = Proton.message();
+        Header header = new Header();
+        header.setTtl(UnsignedInteger.valueOf(ttl));
+        message.setHeader(header);
+
+        TestAmqpMessage testAmqpMessage = new TestAmqpMessage(message, _mockDelivery, _mockAmqpConnection);
+
+        assertEquals(ttl, testAmqpMessage.getTtl());
+    }
+
+    @Test
+    public void testSetGetTtlOnNewMessage()
+    {
+        Long ttl = 123L;
+
+        TestAmqpMessage testAmqpMessage = new TestAmqpMessage();
+
+        testAmqpMessage.setTtl(ttl);
+
+        assertEquals(ttl.longValue(), testAmqpMessage.getMessage().getHeader().getTtl().longValue());
+        assertEquals(ttl, testAmqpMessage.getTtl());
+    }
+
+    @Test
+    public void testSetTtlNullOnMessageWithExistingTtl()
+    {
+        Long ttl = 123L;
+
+        TestAmqpMessage testAmqpMessage = new TestAmqpMessage();
+        testAmqpMessage.setTtl(ttl);
+
+        testAmqpMessage.setTtl(null);
+
+        assertNull(testAmqpMessage.getMessage().getHeader().getTtl());
+        assertNull(testAmqpMessage.getTtl());
+    }
+
     // ====== Properties =======
 
     @Test
@@ -315,6 +378,85 @@ public class AmqpMessageTest extends Qpi
         assertEquals(testReplyToAddress, testAmqpMessage.getReplyTo());
     }
 
+    @Test
+    public void testNewMessageHasNoUnderlyingPropertiesSection()
+    {
+        TestAmqpMessage testAmqpMessage = new TestAmqpMessage();
+
+        Message underlying = testAmqpMessage.getMessage();
+        assertNull(underlying.getProperties());
+    }
+
+    @Test
+    public void testGetCreationTimeIsNullForNewMessage()
+    {
+        TestAmqpMessage testAmqpMessage = new TestAmqpMessage();
+
+        assertNull(testAmqpMessage.getCreationTime());
+    }
+
+    @Test
+    public void testSetCreationTimeOnNewMessage()
+    {
+        Long timestamp = System.currentTimeMillis();
+
+        TestAmqpMessage testAmqpMessage = new TestAmqpMessage();
+
+        testAmqpMessage.setCreationTime(timestamp);
+
+        assertEquals(timestamp.longValue(), testAmqpMessage.getMessage().getProperties().getCreationTime().getTime());
+        assertEquals(timestamp, testAmqpMessage.getCreationTime());
+    }
+
+    @Test
+    public void testSetCreationTimeNullOnMessageWithExistingCreationTime()
+    {
+        Long timestamp = System.currentTimeMillis();
+
+        TestAmqpMessage testAmqpMessage = new TestAmqpMessage();
+        testAmqpMessage.setCreationTime(timestamp);
+
+        testAmqpMessage.setCreationTime(null);
+
+        assertNull(testAmqpMessage.getMessage().getProperties().getCreationTime());
+        assertNull(testAmqpMessage.getCreationTime());
+    }
+
+    @Test
+    public void testGetAbsoluteExpiryTimeIsNullForNewMessage()
+    {
+        TestAmqpMessage testAmqpMessage = new TestAmqpMessage();
+
+        assertNull(testAmqpMessage.getAbsoluteExpiryTime());
+    }
+
+    @Test
+    public void testSetAbsoluteExpiryTimeOnNewMessage()
+    {
+        Long timestamp = System.currentTimeMillis();
+
+        TestAmqpMessage testAmqpMessage = new TestAmqpMessage();
+
+        testAmqpMessage.setAbsoluteExpiryTime(timestamp);
+
+        assertEquals(timestamp.longValue(), testAmqpMessage.getMessage().getProperties().getAbsoluteExpiryTime().getTime());
+        assertEquals(timestamp, testAmqpMessage.getAbsoluteExpiryTime());
+    }
+
+    @Test
+    public void testSetAbsoluteExpiryTimeNullOnMessageWithExistingExpiryTime()
+    {
+        Long timestamp = System.currentTimeMillis();
+
+        TestAmqpMessage testAmqpMessage = new TestAmqpMessage();
+        testAmqpMessage.setAbsoluteExpiryTime(timestamp);
+
+        testAmqpMessage.setAbsoluteExpiryTime(null);
+
+        assertNull(testAmqpMessage.getMessage().getProperties().getAbsoluteExpiryTime());
+        assertNull(testAmqpMessage.getAbsoluteExpiryTime());
+    }
+
     // ====== Message Annotations =======
 
     @Test

Modified: qpid/jms/trunk/src/test/java/org/apache/qpid/jms/impl/MessageImplTest.java
URL: http://svn.apache.org/viewvc/qpid/jms/trunk/src/test/java/org/apache/qpid/jms/impl/MessageImplTest.java?rev=1549620&r1=1549619&r2=1549620&view=diff
==============================================================================
--- qpid/jms/trunk/src/test/java/org/apache/qpid/jms/impl/MessageImplTest.java (original)
+++ qpid/jms/trunk/src/test/java/org/apache/qpid/jms/impl/MessageImplTest.java Mon Dec  9 17:20:00 2013
@@ -681,12 +681,13 @@ public class MessageImplTest extends Qpi
     @Test
     public void testSetJMSTimestampOnNewMessage() throws Exception
     {
-        assertEquals(0, _testAmqpMessage.getCreationTime());
+        assertNull(_testAmqpMessage.getCreationTime());
 
         long timestamp = System.currentTimeMillis();
         _testMessage.setJMSTimestamp(timestamp);
 
-        assertEquals(timestamp, _testAmqpMessage.getCreationTime());
+        assertNotNull(_testAmqpMessage.getCreationTime());
+        assertEquals(timestamp, _testAmqpMessage.getCreationTime().longValue());
     }
 
     @Test
@@ -699,6 +700,143 @@ public class MessageImplTest extends Qpi
         assertEquals("expected JMSTimestamp value not present", timestamp, _testMessage.getJMSTimestamp());
     }
 
+    @Test
+    public void testSetJMSTimestampToZeroOnRecievedMessageWithCreationTimeSetsUnderlyingCreationTimeNull() throws Exception
+    {
+        long timestamp = System.currentTimeMillis();
+        _testAmqpMessage.setCreationTime(timestamp);
+        _testMessage = new TestMessageImpl(_testAmqpMessage, _mockSessionImpl, _mockConnectionImpl, null);
+        //setting JMSTimestamp to 0 ('not set') must clear the underlying creation-time field
+        _testMessage.setJMSTimestamp(0);
+
+        assertEquals("expected JMSTimestamp value not present", 0L, _testMessage.getJMSTimestamp());
+        assertNull(_testAmqpMessage.getCreationTime());
+    }
+    // ====== JMSExpiration =======
+
+    @Test
+    public void testGetJMSExpirationOnNewMessage() throws Exception
+    {
+        assertEquals("expected JMSExpiration value not present", 0, _testMessage.getJMSExpiration());
+    }
+
+    @Test
+    public void testSetGetJMSExpirationOnNewMessage() throws Exception
+    {
+        long timestamp = System.currentTimeMillis();
+
+        _testMessage.setJMSExpiration(timestamp);
+        assertEquals("expected JMSExpiration value not present", timestamp, _testMessage.getJMSExpiration());
+    }
+
+    @Test
+    public void testSetJMSExpirationOnNewMessage() throws Exception
+    {
+        assertNull(_testAmqpMessage.getAbsoluteExpiryTime());
+
+        Long timestamp = System.currentTimeMillis();
+        _testMessage.setJMSExpiration(timestamp);
+
+        assertEquals(timestamp, _testAmqpMessage.getAbsoluteExpiryTime());
+    }
+
+    @Test
+    public void testGetJMSExpirationOnRecievedMessageWithAbsoluteExpiryTimeAndTtlAndCreationTime() throws Exception
+    {
+        long creationTime = System.currentTimeMillis();
+        long ttl = 789L;
+        long expiration = creationTime + ttl;
+        _testAmqpMessage.setCreationTime(creationTime);
+        _testAmqpMessage.setTtl(ttl);
+        _testAmqpMessage.setAbsoluteExpiryTime(expiration);
+
+        _testMessage = new TestMessageImpl(_testAmqpMessage, _mockSessionImpl, _mockConnectionImpl, null);
+
+        assertEquals("expected JMSExpiration value not present", expiration, _testMessage.getJMSExpiration());
+    }
+
+    @Test
+    public void testGetJMSExpirationOnRecievedMessageWithAbsoluteExpiryTimeButNoTtlOrCreationTime() throws Exception
+    {
+        long expiration = System.currentTimeMillis();
+        _testAmqpMessage.setAbsoluteExpiryTime(expiration);
+        _testMessage = new TestMessageImpl(_testAmqpMessage, _mockSessionImpl, _mockConnectionImpl, null);
+
+        assertEquals("expected JMSExpiration value not present", expiration, _testMessage.getJMSExpiration());
+    }
+
+    @Test
+    public void testGetJMSExpirationOnRecievedMessageWithCreationTimeAndTtlButNoAbsoluteExpiry() throws Exception
+    {
+        long creationTime = System.currentTimeMillis();
+        long ttl = 789L;
+        _testAmqpMessage.setCreationTime(creationTime);
+        _testAmqpMessage.setTtl(ttl);
+
+        _testMessage = new TestMessageImpl(_testAmqpMessage, _mockSessionImpl, _mockConnectionImpl, null);
+
+        assertEquals("expected JMSExpiration value not present", creationTime + ttl, _testMessage.getJMSExpiration());
+    }
+
+    @Test
+    public void testGetJMSExpirationOnRecievedMessageWithTtlButNoCreationTimeOrAbsoluteExpiry() throws Exception
+    {
+        long timestamp = System.currentTimeMillis();
+        long ttl = 789L;
+        _testAmqpMessage.setTtl(ttl);
+
+        _testMessage = new TestMessageImpl(_testAmqpMessage, _mockSessionImpl, _mockConnectionImpl, null);
+
+        assertEquals("expected JMSExpiration value not present", timestamp + ttl, _testMessage.getJMSExpiration(), 3000);
+        //TODO: check calling twice gives the same value
+    }
+
+    @Test
+    public void testSetJMSExpirationToZeroOnRecievedMessageWithAbsoluteExpiryTimeSetsUnderlyingExpiryNull() throws Exception
+    {
+        long timestamp = System.currentTimeMillis();
+        _testAmqpMessage.setAbsoluteExpiryTime(timestamp);
+        _testMessage = new TestMessageImpl(_testAmqpMessage, _mockSessionImpl, _mockConnectionImpl, null);
+
+        _testMessage.setJMSExpiration(0);
+
+        assertEquals("expected JMSExpiration value not present", 0L, _testMessage.getJMSExpiration());
+        assertNull(_testAmqpMessage.getAbsoluteExpiryTime());
+    }
+
+    @Test
+    public void testSetJMSExpirationToZeroOnRecievedMessageWithTtlSetsUnderlyingTtlNull() throws Exception
+    {
+        long ttl = 789L;
+        _testAmqpMessage.setTtl(ttl);
+        _testMessage = new TestMessageImpl(_testAmqpMessage, _mockSessionImpl, _mockConnectionImpl, null);
+
+        _testMessage.setJMSExpiration(0);
+
+        assertEquals("expected JMSExpiration value not present", 0L, _testMessage.getJMSExpiration());
+        assertNull(_testAmqpMessage.getTtl());
+    }
+
+    /**
+     * As we are basing getJMSExpiration either on the absolute-expiry-time or calculating it based on the ttl field,
+     * ensure that setting JMSExpiration to 0 results in getJMSExpiration returning 0 if an incoming message had both
+     * absolute-expiry-time and ttl fields set.
+     */
+    @Test
+    public void testSetJMSExpirationToZeroOnRecievedMessageWithAbsoluteExpiryAndTtlFieldsRresultsInGetJMSExpirationAsZero() throws Exception
+    {
+        long ttl = 789L;
+        long timestamp = System.currentTimeMillis();
+        _testAmqpMessage.setTtl(ttl);
+        _testAmqpMessage.setAbsoluteExpiryTime(timestamp);
+
+        _testMessage = new TestMessageImpl(_testAmqpMessage, _mockSessionImpl, _mockConnectionImpl, null);
+
+        _testMessage.setJMSExpiration(0);
+
+        assertEquals("expected JMSExpiration value not present", 0L, _testMessage.getJMSExpiration());
+    }
+
     // ====== utility methods =======
 
     private void assertGetPropertyThrowsMessageFormatException(TestMessageImpl testMessage,

Modified: qpid/jms/trunk/src/test/java/org/apache/qpid/jms/impl/SenderImplTest.java
URL: http://svn.apache.org/viewvc/qpid/jms/trunk/src/test/java/org/apache/qpid/jms/impl/SenderImplTest.java?rev=1549620&r1=1549619&r2=1549620&view=diff
==============================================================================
--- qpid/jms/trunk/src/test/java/org/apache/qpid/jms/impl/SenderImplTest.java (original)
+++ qpid/jms/trunk/src/test/java/org/apache/qpid/jms/impl/SenderImplTest.java Mon Dec  9 17:20:00 2013
@@ -21,10 +21,12 @@
 package org.apache.qpid.jms.impl;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertSame;
 
 import javax.jms.DeliveryMode;
+import javax.jms.Message;
 import javax.jms.Queue;
 
 import org.apache.qpid.jms.QpidJmsTestCase;
@@ -125,4 +127,97 @@ public class SenderImplTest extends Qpid
         //verify the timestamp was set, allowing for a 3second delta
         assertEquals(timestamp, testMessage.getJMSTimestamp(), 3000);
     }
+
+    @Test
+    public void testSenderSetsAbsoluteExpiryAndTtlFieldsOnUnderlyingMessage() throws Exception
+    {
+        //Create mock sent message token, ensure that it is immediately marked as Accepted
+        AmqpSentMessageToken _mockToken = Mockito.mock(AmqpSentMessageToken.class);
+        Mockito.when(_mockToken.getRemoteDeliveryState()).thenReturn(Accepted.getInstance());
+        Mockito.when(_mockAmqpSender.sendMessage(Mockito.any(AmqpMessage.class))).thenReturn(_mockToken);
+        ImmediateWaitUntil.mockWaitUntil(_mockConnection);
+
+        SenderImpl senderImpl = new SenderImpl(_mockSession, _mockConnection, _mockAmqpSender, _mockQueue);
+
+        TestAmqpMessage testAmqpMessage = new TestAmqpMessage();
+        TestMessageImpl testMessage = new TestMessageImpl(testAmqpMessage, _mockSession, null);
+
+        assertEquals(0, testMessage.getJMSTimestamp()); //sanity check: no timestamp present before sending
+        long timestamp = System.currentTimeMillis(); //approximate send time, used as the baseline for the delta checks below
+        long ttl = 100_000;
+
+        senderImpl.send(testMessage, Message.DEFAULT_DELIVERY_MODE, Message.DEFAULT_PRIORITY, ttl);
+
+        //verify the JMSExpiration is now set, allowing for a 3 second delta
+        assertEquals(timestamp + ttl, testMessage.getJMSExpiration(), 3000);
+
+        //more specifically, check the creation-time, ttl, and absolute-expiry fields on the message are set
+        Long creationTime = testMessage.getUnderlyingAmqpMessage(false).getCreationTime();
+        assertNotNull(creationTime);
+        assertEquals(timestamp, creationTime, 3000);
+
+        Long underlyingTtl = testMessage.getUnderlyingAmqpMessage(false).getTtl();
+        assertNotNull(underlyingTtl);
+        assertEquals(ttl, underlyingTtl.longValue());
+
+        Long absoluteExpiryTime = testMessage.getUnderlyingAmqpMessage(false).getAbsoluteExpiryTime();
+        assertNotNull(absoluteExpiryTime);
+        assertEquals(ttl + creationTime, absoluteExpiryTime.longValue()); //exact match expected: creation-time + ttl, no delta
+    }
+
+    @Test
+    public void testSenderSetsTtlOnUnderlyingAmqpMessage() throws Exception
+    {
+        //Create mock sent message token, ensure that it is immediately marked as Accepted
+        AmqpSentMessageToken _mockToken = Mockito.mock(AmqpSentMessageToken.class);
+        Mockito.when(_mockToken.getRemoteDeliveryState()).thenReturn(Accepted.getInstance());
+        Mockito.when(_mockAmqpSender.sendMessage(Mockito.any(AmqpMessage.class))).thenReturn(_mockToken);
+        ImmediateWaitUntil.mockWaitUntil(_mockConnection);
+
+        SenderImpl senderImpl = new SenderImpl(_mockSession, _mockConnection, _mockAmqpSender, _mockQueue);
+
+        TestAmqpMessage testAmqpMessage = new TestAmqpMessage();
+        TestMessageImpl testMessage = new TestMessageImpl(testAmqpMessage, _mockSession, null);
+
+        assertEquals(0, testMessage.getJMSTimestamp()); //sanity check: no timestamp present before sending
+        long timestamp = System.currentTimeMillis(); //approximate send time, used as the baseline for the delta check below
+        long ttl = 100_000;
+
+        senderImpl.send(testMessage, Message.DEFAULT_DELIVERY_MODE, Message.DEFAULT_PRIORITY, ttl);
+
+        //verify the expiration was set, allowing for a 3 second delta (NOTE(review): the underlying ttl field itself is not asserted here)
+        assertEquals(timestamp + ttl, testMessage.getJMSExpiration(), 3000);
+    }
+
+    @Test
+    public void testSenderClearsExistingJMSExpirationAndTtlFieldOnUnderlyingAmqpMessageWhenNotUsingTtl() throws Exception
+    {
+        //Create mock sent message token, ensure that it is immediately marked as Accepted
+        AmqpSentMessageToken _mockToken = Mockito.mock(AmqpSentMessageToken.class);
+        Mockito.when(_mockToken.getRemoteDeliveryState()).thenReturn(Accepted.getInstance());
+        Mockito.when(_mockAmqpSender.sendMessage(Mockito.any(AmqpMessage.class))).thenReturn(_mockToken);
+        ImmediateWaitUntil.mockWaitUntil(_mockConnection);
+
+        SenderImpl senderImpl = new SenderImpl(_mockSession, _mockConnection, _mockAmqpSender, _mockQueue);
+
+        TestAmqpMessage testAmqpMessage = new TestAmqpMessage();
+
+        Long oldTtl = 456L;
+        testAmqpMessage.setTtl(oldTtl); //pre-existing ttl value that the send below is expected to clear
+
+        long expiration = System.currentTimeMillis();
+        testAmqpMessage.setAbsoluteExpiryTime(expiration); //pre-existing absolute-expiry-time that the send below is expected to clear
+        TestMessageImpl testMessage = new TestMessageImpl(testAmqpMessage, _mockSession, _mockConnection, null);
+
+        //verify JMSExpiration initially reflects the absolute-expiry-time set above (i.e. is non-zero)
+        assertEquals(expiration, testMessage.getJMSExpiration());
+
+        //send the message without any TTL (Message.DEFAULT_TIME_TO_LIVE)
+        senderImpl.send(testMessage, Message.DEFAULT_DELIVERY_MODE, Message.DEFAULT_PRIORITY, Message.DEFAULT_TIME_TO_LIVE);
+
+        //verify the expiration was cleared, along with the underlying amqp message fields
+        assertEquals(0, testMessage.getJMSExpiration());
+        assertNull(testMessage.getUnderlyingAmqpMessage(false).getTtl());
+        assertNull(testMessage.getUnderlyingAmqpMessage(false).getAbsoluteExpiryTime());
+    }
 }



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org