You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ro...@apache.org on 2013/12/09 18:20:01 UTC
svn commit: r1549620 - in /qpid/jms/trunk/src:
main/java/org/apache/qpid/jms/engine/ main/java/org/apache/qpid/jms/impl/
test/java/org/apache/qpid/jms/ test/java/org/apache/qpid/jms/engine/
test/java/org/apache/qpid/jms/impl/
Author: robbie
Date: Mon Dec 9 17:20:00 2013
New Revision: 1549620
URL: http://svn.apache.org/r1549620
Log:
QPIDJMS-9: add support for sending/receiving messages with expiry, via the absolute-expiry-time field of the properties section and the ttl field of the header section
Modified:
qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpMessage.java
qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/MessageImpl.java
qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/SenderImpl.java
qpid/jms/trunk/src/test/java/org/apache/qpid/jms/MessageIntegrationTest.java
qpid/jms/trunk/src/test/java/org/apache/qpid/jms/SenderIntegrationTest.java
qpid/jms/trunk/src/test/java/org/apache/qpid/jms/engine/AmqpMessageTest.java
qpid/jms/trunk/src/test/java/org/apache/qpid/jms/impl/MessageImplTest.java
qpid/jms/trunk/src/test/java/org/apache/qpid/jms/impl/SenderImplTest.java
Modified: qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpMessage.java
URL: http://svn.apache.org/viewvc/qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpMessage.java?rev=1549620&r1=1549619&r2=1549620&view=diff
==============================================================================
--- qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpMessage.java (original)
+++ qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpMessage.java Mon Dec 9 17:20:00 2013
@@ -21,12 +21,14 @@
package org.apache.qpid.jms.engine;
import java.util.Collections;
+import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import org.apache.qpid.proton.Proton;
import org.apache.qpid.proton.amqp.Symbol;
+import org.apache.qpid.proton.amqp.UnsignedInteger;
import org.apache.qpid.proton.amqp.messaging.Accepted;
import org.apache.qpid.proton.amqp.messaging.ApplicationProperties;
import org.apache.qpid.proton.amqp.messaging.MessageAnnotations;
@@ -128,6 +130,51 @@ public abstract class AmqpMessage
return _message.isDurable();
}
+ /**
+ * @return the ttl in milliseconds, or null if none exists
+ */
+ public Long getTtl()
+ {
+ if(_message.getHeader() == null)
+ {
+ return null;
+ }
+ else
+ {
+ UnsignedInteger ttl = _message.getHeader().getTtl();
+ if(ttl == null)
+ {
+ return null;
+ }
+ else
+ {
+ return ttl.longValue();
+ }
+ }
+ }
+
+ /**
+ * @param timeInMillis the ttl time in milliseconds, or null to clear the field
+ */
+ public void setTtl(Long timeInMillis)
+ {
+ if(timeInMillis == null)
+ {
+ if(_message.getHeader() == null)
+ {
+ return;
+ }
+ else
+ {
+ _message.getHeader().setTtl(null);
+ }
+ }
+ else
+ {
+ _message.setTtl(timeInMillis);
+ }
+ }
+
//===== MessageAnnotations ======
/**
@@ -237,14 +284,43 @@ public abstract class AmqpMessage
_message.setAddress(to);
}
- public long getCreationTime()
+ public Long getCreationTime()
{
- return _message.getCreationTime();
+ if(_message.getProperties() == null)
+ {
+ return null;
+ }
+ else
+ {
+ Date date = _message.getProperties().getCreationTime();
+ if(date == null)
+ {
+ return null;
+ }
+ else
+ {
+ return date.getTime();
+ }
+ }
}
- public void setCreationTime(long timeInMillis)
+ public void setCreationTime(Long timeInMillis)
{
- _message.setCreationTime(timeInMillis);
+ if(timeInMillis == null)
+ {
+ if(_message.getProperties() == null)
+ {
+ return;
+ }
+ else
+ {
+ _message.getProperties().setCreationTime(null);
+ }
+ }
+ else
+ {
+ _message.setCreationTime(timeInMillis);
+ }
}
public String getReplyTo()
@@ -257,6 +333,51 @@ public abstract class AmqpMessage
_message.setReplyTo(replyTo);
}
+ /**
+ * @return the expiration time in milliseconds since the Unix Epoch, or null if none exists
+ */
+ public Long getAbsoluteExpiryTime()
+ {
+ if(_message.getProperties() == null)
+ {
+ return null;
+ }
+ else
+ {
+ Date date = _message.getProperties().getAbsoluteExpiryTime();
+ if(date == null)
+ {
+ return null;
+ }
+ else
+ {
+ return date.getTime();
+ }
+ }
+ }
+
+ /**
+ * @param timeInMillis the expiration time in milliseconds since the Unix Epoch, or null to clear the field
+ */
+ public void setAbsoluteExpiryTime(Long timeInMillis)
+ {
+ if(timeInMillis == null)
+ {
+ if(_message.getProperties() == null)
+ {
+ return;
+ }
+ else
+ {
+ _message.getProperties().setAbsoluteExpiryTime(null);
+ }
+ }
+ else
+ {
+ _message.setExpiryTime(timeInMillis);
+ }
+ }
+
//===== Application Properties ======
private void createApplicationProperties()
Modified: qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/MessageImpl.java
URL: http://svn.apache.org/viewvc/qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/MessageImpl.java?rev=1549620&r1=1549619&r2=1549620&view=diff
==============================================================================
--- qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/MessageImpl.java (original)
+++ qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/MessageImpl.java Mon Dec 9 17:20:00 2013
@@ -137,13 +137,28 @@ public abstract class MessageImpl<T exte
@Override
public long getJMSTimestamp() throws JMSException
{
- return _amqpMessage.getCreationTime();
+ Long timestamp = _amqpMessage.getCreationTime();
+ if(timestamp == null)
+ {
+ return 0;
+ }
+ else
+ {
+ return timestamp.longValue();
+ }
}
@Override
public void setJMSTimestamp(long timestamp) throws JMSException
{
- _amqpMessage.setCreationTime(timestamp);
+ if(timestamp != 0)
+ {
+ _amqpMessage.setCreationTime(timestamp);
+ }
+ else
+ {
+ _amqpMessage.setCreationTime(null);
+ }
}
@Override
@@ -283,15 +298,52 @@ public abstract class MessageImpl<T exte
@Override
public long getJMSExpiration() throws JMSException
{
- // TODO Auto-generated method stub
- throw new UnsupportedOperationException("Not Implemented");
+ //Use absolute-expiry if present
+ Long absoluteExpiry = _amqpMessage.getAbsoluteExpiryTime();
+ if(absoluteExpiry != null)
+ {
+ return absoluteExpiry;
+ }
+
+ //derive from creation time and ttl field is present
+ Long creationTime = _amqpMessage.getCreationTime();
+ Long ttl = _amqpMessage.getTtl();
+
+ if(ttl != null)
+ {
+ if(creationTime != null)
+ {
+ return creationTime + ttl;
+ }
+ else
+ {
+ //TODO: this will give a different value each time. Use RcvTime equivalent?
+ return System.currentTimeMillis() + ttl;
+ }
+ }
+
+ //failing the above we must say there is no expiration
+ return 0;
}
@Override
public void setJMSExpiration(long expiration) throws JMSException
{
- // TODO Auto-generated method stub
- throw new UnsupportedOperationException("Not Implemented");
+ if(expiration != 0)
+ {
+ _amqpMessage.setAbsoluteExpiryTime(expiration);
+ }
+ else
+ {
+ _amqpMessage.setAbsoluteExpiryTime(null);
+
+ //As we are clearing JMSExpiration we must also clear the TTL field if it is
+ //set, or else it will lead to getJMSExpiration continuing to return a value
+ if(_amqpMessage.getTtl() != null)
+ {
+ _amqpMessage.setTtl(null);
+ }
+ }
}
@Override
Modified: qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/SenderImpl.java
URL: http://svn.apache.org/viewvc/qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/SenderImpl.java?rev=1549620&r1=1549619&r2=1549620&view=diff
==============================================================================
--- qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/SenderImpl.java (original)
+++ qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/SenderImpl.java Mon Dec 9 17:20:00 2013
@@ -43,6 +43,12 @@ public class SenderImpl extends LinkImpl
private void sendMessage(Message message, int deliveryMode, int priority, long timeToLive) throws JMSException
{
+ //TODO
+ if(priority != Message.DEFAULT_PRIORITY)
+ {
+ throw new IllegalArgumentException("Only default priority is currently supported");
+ }
+
getConnectionImpl().lock();
try
{
@@ -60,8 +66,28 @@ public class SenderImpl extends LinkImpl
message.setJMSDeliveryMode(deliveryMode);
}
+ //set the JMSExpiration if necessary
+ if(timeToLive != Message.DEFAULT_TIME_TO_LIVE)
+ {
+ message.setJMSExpiration(timestamp + timeToLive);
+ }
+ else if(message.getJMSExpiration() != 0)
+ {
+ message.setJMSExpiration(0);
+ }
+
AmqpMessage amqpMessage = getAmqpMessageFromJmsMessage(message);
+ //set the AMQP header TTL if necessary, otehrwise ensure it is clear
+ if(timeToLive != Message.DEFAULT_TIME_TO_LIVE)
+ {
+ amqpMessage.setTtl(timeToLive);
+ }
+ else if(amqpMessage.getTtl() != null)
+ {
+ amqpMessage.setTtl(null);
+ }
+
AmqpSentMessageToken sentMessage = _amqpSender.sendMessage(amqpMessage);
getConnectionImpl().stateChanged();
@@ -186,8 +212,7 @@ public class SenderImpl extends LinkImpl
@Override
public void send(Message message, int deliveryMode, int priority, long timeToLive) throws JMSException
{
- // TODO Auto-generated method stub
- throw new UnsupportedOperationException("Not Implemented");
+ sendMessage(message, deliveryMode, priority, timeToLive);
}
@Override
Modified: qpid/jms/trunk/src/test/java/org/apache/qpid/jms/MessageIntegrationTest.java
URL: http://svn.apache.org/viewvc/qpid/jms/trunk/src/test/java/org/apache/qpid/jms/MessageIntegrationTest.java?rev=1549620&r1=1549619&r2=1549620&view=diff
==============================================================================
--- qpid/jms/trunk/src/test/java/org/apache/qpid/jms/MessageIntegrationTest.java (original)
+++ qpid/jms/trunk/src/test/java/org/apache/qpid/jms/MessageIntegrationTest.java Mon Dec 9 17:20:00 2013
@@ -25,6 +25,8 @@ import static org.junit.Assert.assertNot
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
+import java.util.Date;
+
import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.Message;
@@ -266,7 +268,6 @@ public class MessageIntegrationTest exte
}
}
-
/**
* Tests that the {@link DestinationHelper#REPLY_TO_TYPE_MSG_ANNOTATION_SYMBOL_NAME} set on a message to
* indicate its 'reply-to' address represents a Topic results in the JMSReplyTo object being a
@@ -380,4 +381,72 @@ public class MessageIntegrationTest exte
assertNull(receivedMessage.getJMSReplyTo());
}
}
+
+ /**
+ * Tests that lack of the absolute-expiry-time and ttl fields on a message results
+ * in it returning 0 for for JMSExpiration
+ */
+ @Test
+ public void testReceivedMessageFromQueueWithNoAbsoluteExpiryOrTtlReturnsJMSExpirationZero() throws Exception
+ {
+ try(TestAmqpPeer testPeer = new TestAmqpPeer(IntegrationTestFixture.PORT);)
+ {
+ Connection connection = _testFixture.establishConnecton(testPeer);
+ connection.start();
+
+ testPeer.expectBegin();
+
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ Queue queue = session.createQueue("myQueue");
+
+ DescribedType amqpValueNullContent = new AmqpValueDescribedType(null);
+
+ testPeer.expectReceiverAttach();
+ testPeer.expectLinkFlowRespondWithTransfer(null, null, null, null, amqpValueNullContent);
+ testPeer.expectDispositionThatIsAcceptedAndSettled();
+
+ MessageConsumer messageConsumer = session.createConsumer(queue);
+ Message receivedMessage = messageConsumer.receive(1000);
+ testPeer.waitForAllHandlersToComplete(3000);
+
+ assertNotNull(receivedMessage);
+ assertEquals(0L, receivedMessage.getJMSExpiration());
+ }
+ }
+
+ /**
+ * Tests that setting a non-zero value in the absolute-expiry-time field on a
+ * message results in it returning this value for JMSExpiration
+ */
+ @Test
+ public void testReceivedMessageFromQueueWithAbsoluteExpiryReturnsJMSExpirationNonZero() throws Exception
+ {
+ try(TestAmqpPeer testPeer = new TestAmqpPeer(IntegrationTestFixture.PORT);)
+ {
+ Connection connection = _testFixture.establishConnecton(testPeer);
+ connection.start();
+
+ testPeer.expectBegin();
+
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ Queue queue = session.createQueue("myQueue");
+
+ long timestamp = System.currentTimeMillis();
+
+ PropertiesDescribedType props = new PropertiesDescribedType();
+ props.setAbsoluteExpiryTime(new Date(timestamp));
+ DescribedType amqpValueNullContent = new AmqpValueDescribedType(null);
+
+ testPeer.expectReceiverAttach();
+ testPeer.expectLinkFlowRespondWithTransfer(null, null, props, null, amqpValueNullContent);
+ testPeer.expectDispositionThatIsAcceptedAndSettled();
+
+ MessageConsumer messageConsumer = session.createConsumer(queue);
+ Message receivedMessage = messageConsumer.receive(1000);
+ testPeer.waitForAllHandlersToComplete(3000);
+
+ assertNotNull(receivedMessage);
+ assertEquals(timestamp, receivedMessage.getJMSExpiration());
+ }
+ }
}
Modified: qpid/jms/trunk/src/test/java/org/apache/qpid/jms/SenderIntegrationTest.java
URL: http://svn.apache.org/viewvc/qpid/jms/trunk/src/test/java/org/apache/qpid/jms/SenderIntegrationTest.java?rev=1549620&r1=1549619&r2=1549620&view=diff
==============================================================================
--- qpid/jms/trunk/src/test/java/org/apache/qpid/jms/SenderIntegrationTest.java (original)
+++ qpid/jms/trunk/src/test/java/org/apache/qpid/jms/SenderIntegrationTest.java Mon Dec 9 17:20:00 2013
@@ -40,6 +40,7 @@ import org.apache.qpid.jms.test.testpeer
import org.apache.qpid.jms.test.testpeer.matchers.sections.TransferPayloadCompositeMatcher;
import org.apache.qpid.jms.test.testpeer.matchers.types.EncodedAmqpValueMatcher;
import org.apache.qpid.proton.amqp.Symbol;
+import org.apache.qpid.proton.amqp.UnsignedInteger;
import org.junit.Test;
public class SenderIntegrationTest extends QpidJmsTestCase
@@ -170,4 +171,41 @@ public class SenderIntegrationTest exten
producer.send(message);
}
}
+
+ @Test
+ public void testSendingMessageSetsJMSExpirationRelatedAbsoluteExpiryAndTtlFields() throws Exception
+ {
+ try(TestAmqpPeer testPeer = new TestAmqpPeer(IntegrationTestFixture.PORT);)
+ {
+ Connection connection = _testFixture.establishConnecton(testPeer);
+ testPeer.expectBegin();
+ testPeer.expectSenderAttach();
+
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ String queueName = "myQueue";
+ Queue queue = session.createQueue(queueName);
+ MessageProducer producer = session.createProducer(queue);
+
+ long currentTime = System.currentTimeMillis();
+ long ttl = 100_000;
+ Date expiration = new Date(currentTime + ttl);
+
+ String text = "myMessage";
+ MessageHeaderSectionMatcher headersMatcher = new MessageHeaderSectionMatcher(true);
+ headersMatcher.withDurable(equalTo(true));
+ headersMatcher.withTtl(equalTo(UnsignedInteger.valueOf(ttl)));
+ MessageAnnotationsSectionMatcher msgAnnotationsMatcher = new MessageAnnotationsSectionMatcher(true);
+ MessagePropertiesSectionMatcher propsMatcher = new MessagePropertiesSectionMatcher(true).withAbsoluteExpiryTime(greaterThanOrEqualTo(expiration));
+ TransferPayloadCompositeMatcher messageMatcher = new TransferPayloadCompositeMatcher();
+ messageMatcher.setHeadersMatcher(headersMatcher);
+ messageMatcher.setMessageAnnotationsMatcher(msgAnnotationsMatcher);
+ messageMatcher.setPropertiesMatcher(propsMatcher);
+ messageMatcher.setMessageContentMatcher(new EncodedAmqpValueMatcher(text));
+ testPeer.expectTransfer(messageMatcher);
+
+ Message message = session.createTextMessage(text);
+
+ producer.send(message, Message.DEFAULT_DELIVERY_MODE, Message.DEFAULT_PRIORITY, ttl);
+ }
+ }
}
Modified: qpid/jms/trunk/src/test/java/org/apache/qpid/jms/engine/AmqpMessageTest.java
URL: http://svn.apache.org/viewvc/qpid/jms/trunk/src/test/java/org/apache/qpid/jms/engine/AmqpMessageTest.java?rev=1549620&r1=1549619&r2=1549620&view=diff
==============================================================================
--- qpid/jms/trunk/src/test/java/org/apache/qpid/jms/engine/AmqpMessageTest.java (original)
+++ qpid/jms/trunk/src/test/java/org/apache/qpid/jms/engine/AmqpMessageTest.java Mon Dec 9 17:20:00 2013
@@ -29,7 +29,9 @@ import java.util.Set;
import org.apache.qpid.jms.QpidJmsTestCase;
import org.apache.qpid.proton.Proton;
import org.apache.qpid.proton.amqp.Symbol;
+import org.apache.qpid.proton.amqp.UnsignedInteger;
import org.apache.qpid.proton.amqp.messaging.ApplicationProperties;
+import org.apache.qpid.proton.amqp.messaging.Header;
import org.apache.qpid.proton.amqp.messaging.MessageAnnotations;
import org.apache.qpid.proton.amqp.messaging.Properties;
import org.apache.qpid.proton.engine.Delivery;
@@ -165,6 +167,67 @@ public class AmqpMessageTest extends Qpi
}
}
+ // ====== Header =======
+
+ @Test
+ public void testNewMessageHasNoUnderlyingHeaderSection()
+ {
+ TestAmqpMessage testAmqpMessage = new TestAmqpMessage();
+
+ Message underlying = testAmqpMessage.getMessage();
+ assertNull(underlying.getHeader());
+ }
+
+ @Test
+ public void testGetTtlIsNullForNewMessage()
+ {
+ TestAmqpMessage testAmqpMessage = new TestAmqpMessage();
+
+ assertNull(testAmqpMessage.getTtl());
+ }
+
+ @Test
+ public void testGetTtlOnRecievedMessageWithTtl()
+ {
+ Long ttl = 123L;
+
+ Message message = Proton.message();
+ Header header = new Header();
+ header.setTtl(UnsignedInteger.valueOf(ttl));
+ message.setHeader(header);
+
+ TestAmqpMessage testAmqpMessage = new TestAmqpMessage(message, _mockDelivery, _mockAmqpConnection);
+
+ assertEquals(ttl, testAmqpMessage.getTtl());
+ }
+
+ @Test
+ public void testSetGetTtlOnNewMessage()
+ {
+ Long ttl = 123L;
+
+ TestAmqpMessage testAmqpMessage = new TestAmqpMessage();
+
+ testAmqpMessage.setTtl(ttl);
+
+ assertEquals(ttl.longValue(), testAmqpMessage.getMessage().getHeader().getTtl().longValue());
+ assertEquals(ttl, testAmqpMessage.getTtl());
+ }
+
+ @Test
+ public void testSetTtlNullOnMessageWithExistingTtl()
+ {
+ Long ttl = 123L;
+
+ TestAmqpMessage testAmqpMessage = new TestAmqpMessage();
+ testAmqpMessage.setTtl(ttl);
+
+ testAmqpMessage.setTtl(null);
+
+ assertNull(testAmqpMessage.getMessage().getHeader().getTtl());
+ assertNull(testAmqpMessage.getTtl());
+ }
+
// ====== Properties =======
@Test
@@ -315,6 +378,85 @@ public class AmqpMessageTest extends Qpi
assertEquals(testReplyToAddress, testAmqpMessage.getReplyTo());
}
+ @Test
+ public void testNewMessageHasNoUnderlyingPropertiesSection()
+ {
+ TestAmqpMessage testAmqpMessage = new TestAmqpMessage();
+
+ Message underlying = testAmqpMessage.getMessage();
+ assertNull(underlying.getProperties());
+ }
+
+ @Test
+ public void testGetCreationTimeIsNullForNewMessage()
+ {
+ TestAmqpMessage testAmqpMessage = new TestAmqpMessage();
+
+ assertNull(testAmqpMessage.getCreationTime());
+ }
+
+ @Test
+ public void testSetCreationTimeOnNewMessage()
+ {
+ Long timestamp = System.currentTimeMillis();
+
+ TestAmqpMessage testAmqpMessage = new TestAmqpMessage();
+
+ testAmqpMessage.setCreationTime(timestamp);
+
+ assertEquals(timestamp.longValue(), testAmqpMessage.getMessage().getProperties().getCreationTime().getTime());
+ assertEquals(timestamp, testAmqpMessage.getCreationTime());
+ }
+
+ @Test
+ public void testSetCreationTimeNullOnMessageWithExistingCreationTime()
+ {
+ Long timestamp = System.currentTimeMillis();
+
+ TestAmqpMessage testAmqpMessage = new TestAmqpMessage();
+ testAmqpMessage.setCreationTime(timestamp);
+
+ testAmqpMessage.setCreationTime(null);
+
+ assertNull(testAmqpMessage.getMessage().getProperties().getCreationTime());
+ assertNull(testAmqpMessage.getCreationTime());
+ }
+
+ @Test
+ public void testGetAbsoluteExpiryTimeIsNullForNewMessage()
+ {
+ TestAmqpMessage testAmqpMessage = new TestAmqpMessage();
+
+ assertNull(testAmqpMessage.getAbsoluteExpiryTime());
+ }
+
+ @Test
+ public void testSetAbsoluteExpiryTimeOnNewMessage()
+ {
+ Long timestamp = System.currentTimeMillis();
+
+ TestAmqpMessage testAmqpMessage = new TestAmqpMessage();
+
+ testAmqpMessage.setAbsoluteExpiryTime(timestamp);
+
+ assertEquals(timestamp.longValue(), testAmqpMessage.getMessage().getProperties().getAbsoluteExpiryTime().getTime());
+ assertEquals(timestamp, testAmqpMessage.getAbsoluteExpiryTime());
+ }
+
+ @Test
+ public void testSetAbsoluteExpiryTimeNullOnMessageWithExistingExpiryTime()
+ {
+ Long timestamp = System.currentTimeMillis();
+
+ TestAmqpMessage testAmqpMessage = new TestAmqpMessage();
+ testAmqpMessage.setAbsoluteExpiryTime(timestamp);
+
+ testAmqpMessage.setAbsoluteExpiryTime(null);
+
+ assertNull(testAmqpMessage.getMessage().getProperties().getAbsoluteExpiryTime());
+ assertNull(testAmqpMessage.getAbsoluteExpiryTime());
+ }
+
// ====== Message Annotations =======
@Test
Modified: qpid/jms/trunk/src/test/java/org/apache/qpid/jms/impl/MessageImplTest.java
URL: http://svn.apache.org/viewvc/qpid/jms/trunk/src/test/java/org/apache/qpid/jms/impl/MessageImplTest.java?rev=1549620&r1=1549619&r2=1549620&view=diff
==============================================================================
--- qpid/jms/trunk/src/test/java/org/apache/qpid/jms/impl/MessageImplTest.java (original)
+++ qpid/jms/trunk/src/test/java/org/apache/qpid/jms/impl/MessageImplTest.java Mon Dec 9 17:20:00 2013
@@ -681,12 +681,13 @@ public class MessageImplTest extends Qpi
@Test
public void testSetJMSTimestampOnNewMessage() throws Exception
{
- assertEquals(0, _testAmqpMessage.getCreationTime());
+ assertNull(_testAmqpMessage.getCreationTime());
long timestamp = System.currentTimeMillis();
_testMessage.setJMSTimestamp(timestamp);
- assertEquals(timestamp, _testAmqpMessage.getCreationTime());
+ assertNotNull(_testAmqpMessage.getCreationTime());
+ assertEquals(timestamp, _testAmqpMessage.getCreationTime().longValue());
}
@Test
@@ -699,6 +700,143 @@ public class MessageImplTest extends Qpi
assertEquals("expected JMSTimestamp value not present", timestamp, _testMessage.getJMSTimestamp());
}
+ @Test
+ public void testSetJMSTimestampToZeroOnRecievedMessageWithCreationTimeSetsUnderlyingCreationTimeNull() throws Exception
+ {
+ long timestamp = System.currentTimeMillis();
+ _testAmqpMessage.setCreationTime(timestamp);
+ _testMessage = new TestMessageImpl(_testAmqpMessage, _mockSessionImpl, _mockConnectionImpl, null);
+
+ _testMessage.setJMSTimestamp(0);
+
+ assertEquals("expected JMSExpiration value not present", 0L, _testMessage.getJMSExpiration());
+ assertNull(_testAmqpMessage.getCreationTime());
+ }
+ // ====== JMSExpiration =======
+
+ @Test
+ public void testGetJMSExpirationOnNewMessage() throws Exception
+ {
+ assertEquals("expected JMSExpiration value not present", 0, _testMessage.getJMSExpiration());
+ }
+
+ @Test
+ public void testSetGetJMSExpirationOnNewMessage() throws Exception
+ {
+ long timestamp = System.currentTimeMillis();
+
+ _testMessage.setJMSExpiration(timestamp);
+ assertEquals("expected JMSExpiration value not present", timestamp, _testMessage.getJMSExpiration());
+ }
+
+ @Test
+ public void testSetJMSExpirationOnNewMessage() throws Exception
+ {
+ assertNull(_testAmqpMessage.getAbsoluteExpiryTime());
+
+ Long timestamp = System.currentTimeMillis();
+ _testMessage.setJMSExpiration(timestamp);
+
+ assertEquals(timestamp, _testAmqpMessage.getAbsoluteExpiryTime());
+ }
+
+ @Test
+ public void testGetJMSExpirationOnRecievedMessageWithAbsoluteExpiryTimeAndTtlAndCreationTime() throws Exception
+ {
+ long creationTime = System.currentTimeMillis();
+ long ttl = 789L;
+ long expiration = creationTime + ttl;
+ _testAmqpMessage.setCreationTime(creationTime);
+ _testAmqpMessage.setTtl(ttl);
+ _testAmqpMessage.setAbsoluteExpiryTime(expiration);
+
+ _testMessage = new TestMessageImpl(_testAmqpMessage, _mockSessionImpl, _mockConnectionImpl, null);
+
+ assertEquals("expected JMSExpiration value not present", expiration, _testMessage.getJMSExpiration());
+ }
+
+ @Test
+ public void testGetJMSExpirationOnRecievedMessageWithAbsoluteExpiryTimeButNoTtlOrCreationTime() throws Exception
+ {
+ long expiration = System.currentTimeMillis();
+ _testAmqpMessage.setAbsoluteExpiryTime(expiration);
+ _testMessage = new TestMessageImpl(_testAmqpMessage, _mockSessionImpl, _mockConnectionImpl, null);
+
+ assertEquals("expected JMSExpiration value not present", expiration, _testMessage.getJMSExpiration());
+ }
+
+ @Test
+ public void testGetJMSExpirationOnRecievedMessageWithCreationTimeAndTtlButNoAbsoluteExpiry() throws Exception
+ {
+ long creationTime = System.currentTimeMillis();
+ long ttl = 789L;
+ _testAmqpMessage.setCreationTime(creationTime);
+ _testAmqpMessage.setTtl(ttl);
+
+ _testMessage = new TestMessageImpl(_testAmqpMessage, _mockSessionImpl, _mockConnectionImpl, null);
+
+ assertEquals("expected JMSExpiration value not present", creationTime + ttl, _testMessage.getJMSExpiration());
+ }
+
+ @Test
+ public void testGetJMSExpirationOnRecievedMessageWithTtlButNoCreationTimeOrAbsoluteExpiry() throws Exception
+ {
+ long timestamp = System.currentTimeMillis();
+ long ttl = 789L;
+ _testAmqpMessage.setTtl(ttl);
+
+ _testMessage = new TestMessageImpl(_testAmqpMessage, _mockSessionImpl, _mockConnectionImpl, null);
+
+ assertEquals("expected JMSExpiration value not present", timestamp + ttl, _testMessage.getJMSExpiration(), 3000);
+ //TODO: check calling twice gives the same value
+ }
+
+ @Test
+ public void testSetJMSExpirationToZeroOnRecievedMessageWithAbsoluteExpiryTimeSetsUnderlyingExpiryNull() throws Exception
+ {
+ long timestamp = System.currentTimeMillis();
+ _testAmqpMessage.setAbsoluteExpiryTime(timestamp);
+ _testMessage = new TestMessageImpl(_testAmqpMessage, _mockSessionImpl, _mockConnectionImpl, null);
+
+ _testMessage.setJMSExpiration(0);
+
+ assertEquals("expected JMSExpiration value not present", 0L, _testMessage.getJMSExpiration());
+ assertNull(_testAmqpMessage.getAbsoluteExpiryTime());
+ }
+
+ @Test
+ public void testSetJMSExpirationToZeroOnRecievedMessageWithTtlSetsUnderlyingTtlNull() throws Exception
+ {
+ long ttl = 789L;
+ _testAmqpMessage.setTtl(ttl);
+ _testMessage = new TestMessageImpl(_testAmqpMessage, _mockSessionImpl, _mockConnectionImpl, null);
+
+ _testMessage.setJMSExpiration(0);
+
+ assertEquals("expected JMSExpiration value not present", 0L, _testMessage.getJMSExpiration());
+ assertNull(_testAmqpMessage.getTtl());
+ }
+
+ /**
+ * As we are basing getJMSExpiration either on the absolute-expiry-time or calculating it based on the ttl field,
+ * ensure that setting JMSExpiration to 0 results in getJMSExpiration returning 0 if an incoming message had both
+ * absolute-expiry-time and ttl fields set.
+ */
+ @Test
+ public void testSetJMSExpirationToZeroOnRecievedMessageWithAbsoluteExpiryAndTtlFieldsRresultsInGetJMSExpirationAsZero() throws Exception
+ {
+ long ttl = 789L;
+ long timestamp = System.currentTimeMillis();
+ _testAmqpMessage.setTtl(ttl);
+ _testAmqpMessage.setAbsoluteExpiryTime(timestamp);
+
+ _testMessage = new TestMessageImpl(_testAmqpMessage, _mockSessionImpl, _mockConnectionImpl, null);
+
+ _testMessage.setJMSExpiration(0);
+
+ assertEquals("expected JMSExpiration value not present", 0L, _testMessage.getJMSExpiration());
+ }
+
// ====== utility methods =======
private void assertGetPropertyThrowsMessageFormatException(TestMessageImpl testMessage,
Modified: qpid/jms/trunk/src/test/java/org/apache/qpid/jms/impl/SenderImplTest.java
URL: http://svn.apache.org/viewvc/qpid/jms/trunk/src/test/java/org/apache/qpid/jms/impl/SenderImplTest.java?rev=1549620&r1=1549619&r2=1549620&view=diff
==============================================================================
--- qpid/jms/trunk/src/test/java/org/apache/qpid/jms/impl/SenderImplTest.java (original)
+++ qpid/jms/trunk/src/test/java/org/apache/qpid/jms/impl/SenderImplTest.java Mon Dec 9 17:20:00 2013
@@ -21,10 +21,12 @@
package org.apache.qpid.jms.impl;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertSame;
import javax.jms.DeliveryMode;
+import javax.jms.Message;
import javax.jms.Queue;
import org.apache.qpid.jms.QpidJmsTestCase;
@@ -125,4 +127,97 @@ public class SenderImplTest extends Qpid
//verify the timestamp was set, allowing for a 3second delta
assertEquals(timestamp, testMessage.getJMSTimestamp(), 3000);
}
+
+ @Test
+ public void testSenderSetsAbsoluteExpiryAndTtlFieldsOnUnderlyingMessage() throws Exception
+ {
+ //Create mock sent message token, ensure that it is immediately marked as Accepted
+ AmqpSentMessageToken _mockToken = Mockito.mock(AmqpSentMessageToken.class);
+ Mockito.when(_mockToken.getRemoteDeliveryState()).thenReturn(Accepted.getInstance());
+ Mockito.when(_mockAmqpSender.sendMessage(Mockito.any(AmqpMessage.class))).thenReturn(_mockToken);
+ ImmediateWaitUntil.mockWaitUntil(_mockConnection);
+
+ SenderImpl senderImpl = new SenderImpl(_mockSession, _mockConnection, _mockAmqpSender, _mockQueue);
+
+ TestAmqpMessage testAmqpMessage = new TestAmqpMessage();
+ TestMessageImpl testMessage = new TestMessageImpl(testAmqpMessage, _mockSession, null);
+
+ assertEquals(0, testMessage.getJMSTimestamp());
+ long timestamp = System.currentTimeMillis();
+ long ttl = 100_000;
+
+ senderImpl.send(testMessage, Message.DEFAULT_DELIVERY_MODE, Message.DEFAULT_PRIORITY, ttl);
+
+ //verify the JMSExpiration is now set, allowing for a 3second delta
+ assertEquals(timestamp + ttl, testMessage.getJMSExpiration(), 3000);
+
+ //more specifically, check the creation-time, ttl, and absolute-expiry fields on the message are set
+ Long creationTime = testMessage.getUnderlyingAmqpMessage(false).getCreationTime();
+ assertNotNull(creationTime);
+ assertEquals(timestamp, creationTime, 3000);
+
+ Long underlyingTtl = testMessage.getUnderlyingAmqpMessage(false).getTtl();
+ assertNotNull(underlyingTtl);
+ assertEquals(ttl, underlyingTtl.longValue());
+
+ Long absoluteExpiryTime = testMessage.getUnderlyingAmqpMessage(false).getAbsoluteExpiryTime();
+ assertNotNull(absoluteExpiryTime);
+ assertEquals(ttl + creationTime, absoluteExpiryTime.longValue());
+ }
+
+ @Test
+ public void testSenderSetsTtlOnUnderlyingAmqpMessage() throws Exception
+ {
+ //Create mock sent message token, ensure that it is immediately marked as Accepted
+ AmqpSentMessageToken _mockToken = Mockito.mock(AmqpSentMessageToken.class);
+ Mockito.when(_mockToken.getRemoteDeliveryState()).thenReturn(Accepted.getInstance());
+ Mockito.when(_mockAmqpSender.sendMessage(Mockito.any(AmqpMessage.class))).thenReturn(_mockToken);
+ ImmediateWaitUntil.mockWaitUntil(_mockConnection);
+
+ SenderImpl senderImpl = new SenderImpl(_mockSession, _mockConnection, _mockAmqpSender, _mockQueue);
+
+ TestAmqpMessage testAmqpMessage = new TestAmqpMessage();
+ TestMessageImpl testMessage = new TestMessageImpl(testAmqpMessage, _mockSession, null);
+
+ assertEquals(0, testMessage.getJMSTimestamp());
+ long timestamp = System.currentTimeMillis();
+ long ttl = 100_000;
+
+ senderImpl.send(testMessage, Message.DEFAULT_DELIVERY_MODE, Message.DEFAULT_PRIORITY, ttl);
+
+ //verify the expiration was set, allowing for a 3second delta
+ assertEquals(timestamp + ttl, testMessage.getJMSExpiration(), 3000);
+ }
+
+ @Test
+ public void testSenderClearsExistingJMSExpirationAndTtlFieldOnUnderlyingAmqpMessageWhenNotUsingTtl() throws Exception
+ {
+ //Create mock sent message token, ensure that it is immediately marked as Accepted
+ AmqpSentMessageToken _mockToken = Mockito.mock(AmqpSentMessageToken.class);
+ Mockito.when(_mockToken.getRemoteDeliveryState()).thenReturn(Accepted.getInstance());
+ Mockito.when(_mockAmqpSender.sendMessage(Mockito.any(AmqpMessage.class))).thenReturn(_mockToken);
+ ImmediateWaitUntil.mockWaitUntil(_mockConnection);
+
+ SenderImpl senderImpl = new SenderImpl(_mockSession, _mockConnection, _mockAmqpSender, _mockQueue);
+
+ TestAmqpMessage testAmqpMessage = new TestAmqpMessage();
+
+ Long oldTtl = 456L;
+ testAmqpMessage.setTtl(oldTtl);
+
+ long expiration = System.currentTimeMillis();
+ testAmqpMessage.setAbsoluteExpiryTime(expiration);
+ TestMessageImpl testMessage = new TestMessageImpl(testAmqpMessage, _mockSession, _mockConnection, null);
+
+ //verify JMSExpiration is non-zero
+ assertEquals(expiration, testMessage.getJMSExpiration());
+
+ //send the message without any TTL
+ senderImpl.send(testMessage, Message.DEFAULT_DELIVERY_MODE, Message.DEFAULT_PRIORITY, Message.DEFAULT_TIME_TO_LIVE);
+
+ //verify the expiration was cleared, along with the underlying amqp message fields
+ assertEquals(0, testMessage.getJMSExpiration());
+ assertNull(testMessage.getUnderlyingAmqpMessage(false).getTtl());
+ assertNull(testMessage.getUnderlyingAmqpMessage(false).getAbsoluteExpiryTime());
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org