You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ro...@apache.org on 2014/02/03 18:07:38 UTC
svn commit: r1563983 - in /qpid/jms/trunk/src:
main/java/org/apache/qpid/jms/engine/ main/java/org/apache/qpid/jms/impl/
test/java/org/apache/qpid/jms/ test/java/org/apache/qpid/jms/engine/
test/java/org/apache/qpid/jms/impl/
Author: robbie
Date: Mon Feb 3 17:07:37 2014
New Revision: 1563983
URL: http://svn.apache.org/r1563983
Log:
QPIDJMS-9: set JMSMessageID when sending. Initial work on correlation-id handling.
Modified:
qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpMessage.java
qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/ClientProperties.java
qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/MessageImpl.java
qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/SenderImpl.java
qpid/jms/trunk/src/test/java/org/apache/qpid/jms/SenderIntegrationTest.java
qpid/jms/trunk/src/test/java/org/apache/qpid/jms/engine/AmqpMessageTest.java
qpid/jms/trunk/src/test/java/org/apache/qpid/jms/impl/MessageImplTest.java
qpid/jms/trunk/src/test/java/org/apache/qpid/jms/impl/SenderImplTest.java
Modified: qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpMessage.java
URL: http://svn.apache.org/viewvc/qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpMessage.java?rev=1563983&r1=1563982&r2=1563983&view=diff
==============================================================================
--- qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpMessage.java (original)
+++ qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpMessage.java Mon Feb 3 17:07:37 2014
@@ -506,6 +506,81 @@ public abstract class AmqpMessage
_message.setMessageId(messageId);
}
+ /**
+ * Returns the correlation-id carried by the underlying message.
+ *
+ * Binary values are exposed as a ByteBuffer and ulong values as a
+ * BigInteger; any other value (e.g. String, UUID) is returned as-is.
+ *
+ * @return the correlationId, or null if there isn't any
+ */
+ public Object getCorrelationId()
+ {
+ Object rawId = _message.getCorrelationId();
+
+ //Binary is handed out as a ByteBuffer
+ if(rawId instanceof Binary)
+ {
+ return ((Binary) rawId).asByteBuffer();
+ }
+
+ //ulong is handed out as a BigInteger
+ if(rawId instanceof UnsignedLong)
+ {
+ return ((UnsignedLong) rawId).bigIntegerValue();
+ }
+ return rawId;
+ }
+
+ /**
+ * Set a string correlation-id value on the message.
+ *
+ * The value is handed unchanged to the underlying message.
+ *
+ * @param correlationId the value to set
+ */
+ public void setCorrelationId(String correlationId)
+ {
+ setUnderlyingCorrelationId(correlationId);
+ }
+
+ /**
+ * Set a uuid correlation-id value on the message.
+ *
+ * The value is handed unchanged to the underlying message.
+ *
+ * @param correlationId the value to set
+ */
+ public void setCorrelationId(UUID correlationId)
+ {
+ setUnderlyingCorrelationId(correlationId);
+ }
+
+ /**
+ * Set a ulong (represented here as a BigInteger) correlation-id value on the message.
+ *
+ * @param correlationId the value to set, or null to clear any existing value
+ * @throws IllegalArgumentException if the value is not within the ulong range of [0 - 2^64)
+ */
+ public void setCorrelationId(BigInteger correlationId) throws IllegalArgumentException
+ {
+ if(correlationId == null)
+ {
+ //Nothing to range-check or convert: pass null straight through,
+ //as the String and UUID setters do, instead of failing with an NPE.
+ setUnderlyingCorrelationId(null);
+ return;
+ }
+
+ //A negative value, or one needing more than 64 bits, cannot be encoded as a ulong
+ if(correlationId.signum() == -1 || correlationId.bitLength() > 64)
+ {
+ throw new IllegalArgumentException("Value \""+correlationId+"\" lies outside the range [0 - 2^64).");
+ }
+
+ setUnderlyingCorrelationId(UnsignedLong.valueOf(correlationId));
+ }
+
+ /**
+ * Set a binary correlation-id value on the message. The supplied ByteBuffer
+ * is converted to a Binary before being stored on the underlying message.
+ *
+ * @param correlationId the value to set
+ */
+ public void setCorrelationId(ByteBuffer correlationId)
+ {
+ //the underlying message holds binary ids as Binary, not ByteBuffer
+ setUnderlyingCorrelationId(Binary.create(correlationId));
+ }
+
+ //Single point through which the typed setCorrelationId(..) overloads
+ //write their (already converted) value to the underlying message.
+ private void setUnderlyingCorrelationId(Object correlationId)
+ {
+ _message.setCorrelationId(correlationId);
+ }
+
//===== Application Properties ======
private void createApplicationProperties()
Modified: qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/ClientProperties.java
URL: http://svn.apache.org/viewvc/qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/ClientProperties.java?rev=1563983&r1=1563982&r2=1563983&view=diff
==============================================================================
--- qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/ClientProperties.java (original)
+++ qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/ClientProperties.java Mon Feb 3 17:07:37 2014
@@ -24,4 +24,7 @@ public class ClientProperties
{
//Message Property Names
public static final String JMS_AMQP_TTL = "JMS_AMQP_TTL";
+
+ //Message Annotation Names
+ public static final String X_OPT_APP_CORRELATION_ID = "x-opt-app-correlation-id";
}
Modified: qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/MessageImpl.java
URL: http://svn.apache.org/viewvc/qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/MessageImpl.java?rev=1563983&r1=1563982&r2=1563983&view=diff
==============================================================================
--- qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/MessageImpl.java (original)
+++ qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/MessageImpl.java Mon Feb 3 17:07:37 2014
@@ -249,15 +249,57 @@ public abstract class MessageImpl<T exte
@Override
public void setJMSCorrelationID(String correlationID) throws JMSException
{
- // TODO Auto-generated method stub
- throw new UnsupportedOperationException("Not Implemented");
+ MessageIdHelper idHelper = _sessionImpl.getMessageIdHelper();
+
+ //A non-null value lacking the JMS message-id prefix is an application-specific id
+ boolean isAppSpecificId = correlationID != null && !idHelper.hasMessageIdPrefix(correlationID);
+
+ //TODO: convert the string if necessary, i.e. it indicates an encoded AMQP type
+ _amqpMessage.setCorrelationId(idHelper.stripMessageIdPrefix(correlationID));
+
+ //Set (or remove) the annotation that marks the value as application-specific
+ if(!isAppSpecificId)
+ {
+ _amqpMessage.clearMessageAnnotation(ClientProperties.X_OPT_APP_CORRELATION_ID);
+ }
+ else
+ {
+ _amqpMessage.setMessageAnnotation(ClientProperties.X_OPT_APP_CORRELATION_ID, true);
+ }
}
@Override
public String getJMSCorrelationID() throws JMSException
{
- // TODO Auto-generated method stub
- throw new UnsupportedOperationException("Not Implemented");
+ MessageIdHelper messageIdHelper = _sessionImpl.getMessageIdHelper();
+
+ String baseIdString = messageIdHelper.toBaseMessageIdString(_amqpMessage.getCorrelationId());
+
+ if(baseIdString == null)
+ {
+ return null;
+ }
+ else
+ {
+ //The annotation may have been set by a remote peer, so its value could be null or
+ //not a Boolean. Boolean.TRUE.equals(..) treats those as "not app-specific" rather
+ //than failing with a NullPointerException (unboxing) or ClassCastException.
+ boolean appSpecific = false;
+ if(_amqpMessage.messageAnnotationExists(ClientProperties.X_OPT_APP_CORRELATION_ID))
+ {
+ appSpecific = Boolean.TRUE.equals(_amqpMessage.getMessageAnnotation(ClientProperties.X_OPT_APP_CORRELATION_ID));
+ }
+
+ if(appSpecific)
+ {
+ //application-specific values are returned exactly as they were supplied
+ return baseIdString;
+ }
+ else
+ {
+ //anything else is presented as a JMS message id, so add the prefix
+ return JMS_ID_PREFIX + baseIdString;
+ }
+ }
}
@Override
Modified: qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/SenderImpl.java
URL: http://svn.apache.org/viewvc/qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/SenderImpl.java?rev=1563983&r1=1563982&r2=1563983&view=diff
==============================================================================
--- qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/SenderImpl.java (original)
+++ qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/SenderImpl.java Mon Feb 3 17:07:37 2014
@@ -22,6 +22,8 @@ package org.apache.qpid.jms.impl;
import static org.apache.qpid.jms.impl.ClientProperties.JMS_AMQP_TTL;
+import java.util.UUID;
+
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
@@ -51,6 +53,13 @@ public class SenderImpl extends LinkImpl
{
long timestamp = System.currentTimeMillis();
+ //TODO: Respect the disableJMSMessageId hint
+ //TODO: Bypass adding the ID: prefix for our own messages, since we remove it down the stack.
+ //set the message id
+ UUID uuid = UUID.randomUUID();
+ String messageID = MessageIdHelper.JMS_ID_PREFIX + uuid.toString();
+ message.setJMSMessageID(messageID);
+
//set the priority
message.setJMSPriority(priority);
@@ -78,6 +87,7 @@ public class SenderImpl extends LinkImpl
AmqpMessage amqpMessage = getAmqpMessageFromJmsMessage(message);
+ //set the AMQP header ttl field if necessary
if(message.propertyExists(JMS_AMQP_TTL))
{
//Use the requested value from the property. 0 means clear TTL header.
Modified: qpid/jms/trunk/src/test/java/org/apache/qpid/jms/SenderIntegrationTest.java
URL: http://svn.apache.org/viewvc/qpid/jms/trunk/src/test/java/org/apache/qpid/jms/SenderIntegrationTest.java?rev=1563983&r1=1563982&r2=1563983&view=diff
==============================================================================
--- qpid/jms/trunk/src/test/java/org/apache/qpid/jms/SenderIntegrationTest.java (original)
+++ qpid/jms/trunk/src/test/java/org/apache/qpid/jms/SenderIntegrationTest.java Mon Feb 3 17:07:37 2014
@@ -20,7 +20,10 @@ package org.apache.qpid.jms;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
+import static org.hamcrest.Matchers.isA;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
import java.util.Calendar;
import java.util.Date;
@@ -33,6 +36,7 @@ import javax.jms.Queue;
import javax.jms.Session;
import org.apache.qpid.jms.impl.DestinationHelper;
+import org.apache.qpid.jms.impl.MessageIdHelper;
import org.apache.qpid.jms.test.testpeer.TestAmqpPeer;
import org.apache.qpid.jms.test.testpeer.matchers.sections.MessageAnnotationsSectionMatcher;
import org.apache.qpid.jms.test.testpeer.matchers.sections.MessageHeaderSectionMatcher;
@@ -281,4 +285,52 @@ public class SenderIntegrationTest exten
assertEquals(priority, message.getJMSPriority());
}
}
+
+ /**
+ * Test that upon sending a message, the sender sets the JMSMessageID on the
+ * Message object, and that the expected value is included in the AMQP
+ * message sent by the client
+ */
+ @Test
+ public void testSendingMessageSetsJMSMessageID() throws Exception
+ {
+ try(TestAmqpPeer testPeer = new TestAmqpPeer(IntegrationTestFixture.PORT);)
+ {
+ Connection connection = _testFixture.establishConnecton(testPeer);
+ testPeer.expectBegin();
+ testPeer.expectSenderAttach();
+
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ String queueName = "myQueue";
+ Queue queue = session.createQueue(queueName);
+ MessageProducer producer = session.createProducer(queue);
+
+ String text = "myMessage";
+ MessageHeaderSectionMatcher headersMatcher = new MessageHeaderSectionMatcher(true).withDurable(equalTo(true));
+ MessageAnnotationsSectionMatcher msgAnnotationsMatcher = new MessageAnnotationsSectionMatcher(true);
+ MessagePropertiesSectionMatcher propsMatcher = new MessagePropertiesSectionMatcher(true).withMessageId(isA(String.class));
+ TransferPayloadCompositeMatcher messageMatcher = new TransferPayloadCompositeMatcher();
+ messageMatcher.setHeadersMatcher(headersMatcher);
+ messageMatcher.setMessageAnnotationsMatcher(msgAnnotationsMatcher);
+ messageMatcher.setPropertiesMatcher(propsMatcher);
+ messageMatcher.setMessageContentMatcher(new EncodedAmqpValueMatcher(text));
+ testPeer.expectTransfer(messageMatcher);
+
+ Message message = session.createTextMessage(text);
+
+ //Nothing has been sent yet, so no id should have been assigned
+ assertNull("JMSMessageID should not yet be set", message.getJMSMessageID());
+
+ producer.send(message);
+
+ String jmsMessageID = message.getJMSMessageID();
+ assertNotNull("JMSMessageID should now be set", jmsMessageID);
+
+ //Get the value that was actually transmitted/received, compare to what we have
+ testPeer.waitForAllHandlersToComplete(1000);
+ Object receivedMessageId = propsMatcher.getReceivedMessageId();
+ MessageIdHelper mih = new MessageIdHelper();
+ assertEquals("Unexpected AMQP message-id value", mih.stripMessageIdPrefix(jmsMessageID), receivedMessageId);
+ }
+ }
}
Modified: qpid/jms/trunk/src/test/java/org/apache/qpid/jms/engine/AmqpMessageTest.java
URL: http://svn.apache.org/viewvc/qpid/jms/trunk/src/test/java/org/apache/qpid/jms/engine/AmqpMessageTest.java?rev=1563983&r1=1563982&r2=1563983&view=diff
==============================================================================
--- qpid/jms/trunk/src/test/java/org/apache/qpid/jms/engine/AmqpMessageTest.java (original)
+++ qpid/jms/trunk/src/test/java/org/apache/qpid/jms/engine/AmqpMessageTest.java Mon Feb 3 17:07:37 2014
@@ -565,6 +565,176 @@ public class AmqpMessageTest extends Qpi
//TODO: delete this marker comment
@Test
+ public void testGetCorrelationIdIsNullOnNewMessage()
+ {
+ AmqpMessage testAmqpMessage = TestAmqpMessage.createNewMessage();
+
+ assertNull("Expected correlationId to be null on new message", testAmqpMessage.getCorrelationId());
+ }
+
+ /**
+ * Test that setting then getting a String as the CorrelationId returns the expected value
+ */
+ @Test
+ public void testSetGetCorrelationIdOnNewMessageWithString()
+ {
+ String testCorrelationId = "myStringCorrelationId";
+
+ AmqpMessage testAmqpMessage = TestAmqpMessage.createNewMessage();
+ testAmqpMessage.setCorrelationId(testCorrelationId);
+
+ assertEquals("Expected correlationId not returned", testCorrelationId, testAmqpMessage.getCorrelationId());
+ }
+
+ /**
+ * Test that getting the correlationId when using an underlying received message with a
+ * String correlation id returns the expected value.
+ */
+ @Test
+ public void testGetCorrelationIdOnReceivedMessageWithString()
+ {
+ correlationIdOnReceivedMessageTestImpl("myCorrelationIdString");
+ }
+
+ /**
+ * Test that setting then getting a UUID as the correlationId returns the expected value
+ */
+ @Test
+ public void testSetGetCorrelationIdOnNewMessageWithUUID()
+ {
+ UUID testCorrelationId = UUID.randomUUID();
+
+ AmqpMessage testAmqpMessage = TestAmqpMessage.createNewMessage();
+ testAmqpMessage.setCorrelationId(testCorrelationId);
+
+ assertEquals("Expected correlationId not returned", testCorrelationId, testAmqpMessage.getCorrelationId());
+ }
+
+ /**
+ * Test that getting the correlationId when using an underlying received message with a
+ * UUID correlation id returns the expected value.
+ */
+ @Test
+ public void testGetCorrelationIdOnReceivedMessageWithUUID()
+ {
+ correlationIdOnReceivedMessageTestImpl(UUID.randomUUID());
+ }
+
+ /**
+ * Test that setting then getting a ulong correlationId (using BigInteger) returns the expected value
+ */
+ @Test
+ public void testSetGetCorrelationIdOnNewMessageWithULong()
+ {
+ BigInteger testCorrelationId = BigInteger.valueOf(123456789);
+
+ AmqpMessage testAmqpMessage = TestAmqpMessage.createNewMessage();
+ testAmqpMessage.setCorrelationId(testCorrelationId);
+
+ assertEquals("Expected correlationId not returned", testCorrelationId, testAmqpMessage.getCorrelationId());
+ }
+
+ /**
+ * Test that getting the correlationId when using an underlying received message with a
+ * ulong correlation id (using BigInteger) returns the expected value.
+ */
+ @Test
+ public void testGetCorrelationIdOnReceivedMessageWithLong()
+ {
+ correlationIdOnReceivedMessageTestImpl(BigInteger.valueOf(123456789L));
+ }
+
+ /**
+ * Test that attempting to set a ulong correlationId (using BigInteger) with a value
+ * outwith the allowed [0 - 2^64) range results in an IAE being thrown
+ */
+ @Test
+ public void testSetCorrelationIdOnNewMessageWithULongOurOfRangeThrowsIAE()
+ {
+ //negative value
+ AmqpMessage testAmqpMessage = TestAmqpMessage.createNewMessage();
+ try
+ {
+ testAmqpMessage.setCorrelationId(BigInteger.valueOf(-1));
+ fail("expected exception was not thrown");
+ }
+ catch(IllegalArgumentException iae)
+ {
+ //expected
+ }
+
+ //value 1 over max
+ BigInteger aboveLimit = new BigInteger(TWO_TO_64_BYTES);
+ try
+ {
+ testAmqpMessage.setCorrelationId(aboveLimit);
+ fail("expected exception was not thrown");
+ }
+ catch(IllegalArgumentException iae)
+ {
+ //expected
+ }
+
+ //confirm subtracting 1 to get the max value then allows success
+ BigInteger onLimit = aboveLimit.subtract(BigInteger.ONE);
+ testAmqpMessage.setCorrelationId(onLimit);
+ }
+
+ /**
+ * Test that setting then getting binary as the correlationId returns the expected value
+ */
+ @Test
+ public void testSetGetCorrelationIdOnNewMessageWithBinary()
+ {
+ ByteBuffer testCorrelationId = createByteBufferForBinaryId();
+
+ AmqpMessage testAmqpMessage = TestAmqpMessage.createNewMessage();
+ testAmqpMessage.setCorrelationId(testCorrelationId);
+
+ assertEquals("Expected correlationId not returned", testCorrelationId, testAmqpMessage.getCorrelationId());
+ }
+
+ /**
+ * Test that getting the correlationId when using an underlying received message with a
+ * Binary correlation id returns the expected ByteBuffer value.
+ */
+ @Test
+ public void testGetCorrelationIdOnReceivedMessageWithBinary()
+ {
+ ByteBuffer testCorrelationId = createByteBufferForBinaryId();
+
+ correlationIdOnReceivedMessageTestImpl(testCorrelationId);
+ }
+
+ private void correlationIdOnReceivedMessageTestImpl(Object testCorrelationId)
+ {
+ Object underlyingIdObject = testCorrelationId;
+ if(testCorrelationId instanceof ByteBuffer)
+ {
+ //The proton message uses a Binary wrapper for binary ids, not a ByteBuffer
+ underlyingIdObject = Binary.create((ByteBuffer) testCorrelationId);
+ }
+ else if(testCorrelationId instanceof BigInteger)
+ {
+ //The proton message uses an UnsignedLong wrapper
+ underlyingIdObject = UnsignedLong.valueOf((BigInteger) testCorrelationId);
+ }
+
+ Message message = Proton.message();
+
+ Properties props = new Properties();
+ props.setCorrelationId(underlyingIdObject);
+ message.setProperties(props);
+
+ AmqpMessage testAmqpMessage = TestAmqpMessage.createReceivedMessage(message, _mockDelivery, _mockAmqpConnection);
+
+ assertNotNull("Expected a correlationId on received message", testAmqpMessage.getCorrelationId());
+
+ assertEquals("Incorrect correlationId value received", testCorrelationId, testAmqpMessage.getCorrelationId());
+ }
+
+ //TODO: delete this marker comment
+ @Test
public void testGetMessageIdIsNullOnNewMessage()
{
AmqpMessage testAmqpMessage = TestAmqpMessage.createNewMessage();
Modified: qpid/jms/trunk/src/test/java/org/apache/qpid/jms/impl/MessageImplTest.java
URL: http://svn.apache.org/viewvc/qpid/jms/trunk/src/test/java/org/apache/qpid/jms/impl/MessageImplTest.java?rev=1563983&r1=1563982&r2=1563983&view=diff
==============================================================================
--- qpid/jms/trunk/src/test/java/org/apache/qpid/jms/impl/MessageImplTest.java (original)
+++ qpid/jms/trunk/src/test/java/org/apache/qpid/jms/impl/MessageImplTest.java Mon Feb 3 17:07:37 2014
@@ -1025,6 +1025,137 @@ public class MessageImplTest extends Qpi
assertEquals("expected JMSMessageID value not present", expectedJmsMessageId, _testMessage.getJMSMessageID());
}
+ //TODO: remove this marker comment
+ // ====== JMSCorrelationID =======
+
+ @Test
+ public void testGetJMSCorrelationIDIsNullOnNewMessage() throws Exception
+ {
+ assertNull("JMSCorrelationID should be null on new message", _testMessage.getJMSCorrelationID());
+ }
+
+ @Test
+ public void testAppSpecificCorrelationIdAnnotationDoesNotExistOnNewMessage() throws Exception
+ {
+ assertFalse("MessageAnnotation should not exist to indicate app-specific correlation-id, since there is no correlation-id",
+ _testAmqpMessage.messageAnnotationExists(ClientProperties.X_OPT_APP_CORRELATION_ID));
+ }
+
+ /**
+ * Test that {@link MessageImpl#setJMSCorrelationID(String)} accepts null and clears an existing value
+ */
+ @Test
+ public void testSetJMSCorrelationIDAcceptsNullAndClearsPreviousValue() throws Exception
+ {
+ //test setting null on fresh message is accepted
+ _testMessage.setJMSCorrelationID(null);
+ assertNull("JMSCorrelationID should still be null", _testMessage.getJMSCorrelationID());
+
+ //test that setting null clears an existing value
+ _testMessage.setJMSCorrelationID("ID:something");
+ assertNotNull("JMSCorrelationID should not be null anymore", _testMessage.getJMSCorrelationID());
+ assertNotNull("Underlying correlation id should not be null anymore", _testAmqpMessage.getCorrelationId());
+
+ _testMessage.setJMSCorrelationID(null);
+ assertNull("JMSCorrelationID should be null again", _testMessage.getJMSCorrelationID());
+ assertNull("Underlying correlation id should be null again", _testAmqpMessage.getCorrelationId());
+ }
+
+ /**
+ * Test that {@link MessageImpl#setJMSCorrelationID(String)} accepts null and clears an existing app-specific
+ * value, additionally clearing the message annotation indicating the value was app-specific
+ */
+ @Test
+ public void testSetJMSCorrelationIDAcceptsNullAndClearsPreviousAppSpecificValue() throws Exception
+ {
+ //test that setting null clears an existing value
+ _testMessage.setJMSCorrelationID("app-specific");
+ assertNotNull("JMSCorrelationID should not be null anymore", _testMessage.getJMSCorrelationID());
+ assertNotNull("Underlying correlation id should not be null anymore", _testAmqpMessage.getCorrelationId());
+
+ _testMessage.setJMSCorrelationID(null);
+ assertNull("JMSCorrelationID should be null again", _testMessage.getJMSCorrelationID());
+ assertNull("Underlying correlation id should be null again", _testAmqpMessage.getCorrelationId());
+
+ assertFalse("MessageAnnotation should not exist to indicate app-specific correlation-id, since there is no correlation-id",
+ _testAmqpMessage.messageAnnotationExists(ClientProperties.X_OPT_APP_CORRELATION_ID));
+ }
+
+ /**
+ * Test that {@link MessageImpl#setJMSCorrelationID(String)} sets the expected value
+ * on the underlying message, i.e the JMS CorrelationID minus the "ID:" prefix,
+ * and does not set the annotation to indicate the value is application-specific.
+ */
+ @Test
+ public void testSetJMSCorrelationIDSetsUnderlyingMessageWithString() throws Exception
+ {
+ String baseId = "something";
+ String jmsId = "ID:" + baseId;
+
+ _testMessage.setJMSCorrelationID(jmsId);
+
+ assertNotNull("Underlying correlation id should not be null", _testAmqpMessage.getCorrelationId());
+ assertEquals("Underlying correlation id value was not as expected", baseId, _testAmqpMessage.getCorrelationId());
+
+ assertFalse("MessageAnnotation should not exist to indicate app-specific correlation-id",
+ _testAmqpMessage.messageAnnotationExists(ClientProperties.X_OPT_APP_CORRELATION_ID));
+ }
+
+ /**
+ * Test that {@link MessageImpl#setJMSCorrelationID(String)} sets the expected value
+ * on the underlying message when provided an application-specific string, and that
+ * it also sets the annotation to indicate the value is application-specific.
+ */
+ @Test
+ public void testSetJMSCorrelationIDSetsUnderlyingMessageWithAppSpecificString() throws Exception
+ {
+ String baseId = "app-specific";
+
+ _testMessage.setJMSCorrelationID(baseId);
+
+ assertNotNull("Underlying correlation id should not be null", _testAmqpMessage.getCorrelationId());
+ assertEquals("Underlying correlation id value was not as expected", baseId, _testAmqpMessage.getCorrelationId());
+
+ assertTrue("MessageAnnotation should exist to indicate app-specific correlation-id",
+ _testAmqpMessage.messageAnnotationExists(ClientProperties.X_OPT_APP_CORRELATION_ID));
+ }
+
+ /**
+ * Test that receiving a message with a string typed correlation-id value results in the
+ * expected JMSCorrelationID value being returned, i.e. the base string plus the JMS "ID:" prefix.
+ */
+ @Test
+ public void testGetJMSCorrelationIDOnReceivedMessageWithString() throws Exception
+ {
+ getJMSCorrelationIDOnReceivedMessageWithStringTestImpl(false);
+ }
+
+ /**
+ * Test that receiving a message with a string typed correlation-id value which already
+ * contains the "ID:" prefix still results in the JMS "ID:" prefix being added, i.e. the
+ * expected JMSCorrelationID value is "ID:" plus the full received string.
+ */
+ @Test
+ public void testGetJMSCorrelationIDOnReceivedMessageWithStringAlreadyContainingPrefix() throws Exception
+ {
+ getJMSCorrelationIDOnReceivedMessageWithStringTestImpl(true);
+ }
+
+ private void getJMSCorrelationIDOnReceivedMessageWithStringTestImpl(boolean prefixAlreadyExists) throws JMSException
+ {
+ String baseId = "something";
+ if(prefixAlreadyExists)
+ {
+ baseId = "ID:" + baseId;
+ }
+
+ String expectedJmsCorrelationId = "ID:" + baseId;
+
+ _testAmqpMessage.setCorrelationId(baseId);
+ _testMessage = TestMessageImpl.createReceivedMessage(_testAmqpMessage, _mockSessionImpl, _mockConnectionImpl, null);
+
+ assertEquals("expected JMSCorrelationID value not present", expectedJmsCorrelationId, _testMessage.getJMSCorrelationID());
+ }
+
// ====== JMS_AMQP_TTL property =======
@Test
Modified: qpid/jms/trunk/src/test/java/org/apache/qpid/jms/impl/SenderImplTest.java
URL: http://svn.apache.org/viewvc/qpid/jms/trunk/src/test/java/org/apache/qpid/jms/impl/SenderImplTest.java?rev=1563983&r1=1563982&r2=1563983&view=diff
==============================================================================
--- qpid/jms/trunk/src/test/java/org/apache/qpid/jms/impl/SenderImplTest.java (original)
+++ qpid/jms/trunk/src/test/java/org/apache/qpid/jms/impl/SenderImplTest.java Mon Feb 3 17:07:37 2014
@@ -24,6 +24,7 @@ import static org.junit.Assert.assertEqu
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertSame;
+import static org.junit.Assert.assertTrue;
import javax.jms.DeliveryMode;
import javax.jms.Message;
@@ -57,6 +58,7 @@ public class SenderImplTest extends Qpid
_mockAmqpSender = Mockito.mock(AmqpSender.class);
_mockSession = Mockito.mock(SessionImpl.class);
Mockito.when(_mockSession.getDestinationHelper()).thenReturn(new DestinationHelper());
+ Mockito.when(_mockSession.getMessageIdHelper()).thenReturn(new MessageIdHelper());
_mockQueueName = "mockQueueName";
_mockQueue = Mockito.mock(Queue.class);
@@ -128,6 +130,28 @@ public class SenderImplTest extends Qpid
}
@Test
+ public void testSenderSetsJMSMessageIDOnMessage() throws Exception
+ {
+ //Stub the sent-message token so the delivery is reported as Accepted straight away
+ AmqpSentMessageToken sentToken = Mockito.mock(AmqpSentMessageToken.class);
+ Mockito.when(sentToken.getRemoteDeliveryState()).thenReturn(Accepted.getInstance());
+ Mockito.when(_mockAmqpSender.sendMessage(Mockito.any(AmqpMessage.class))).thenReturn(sentToken);
+ ImmediateWaitUntil.mockWaitUntil(_mockConnection);
+
+ SenderImpl sender = new SenderImpl(_mockSession, _mockConnection, _mockAmqpSender, _mockQueue);
+
+ MessageImpl<?> unsentMessage = TestMessageImpl.createNewMessage(_mockSession, null);
+
+ //No id before the send...
+ assertNull("JMSMessageID should not be set yet", unsentMessage.getJMSMessageID());
+
+ sender.send(unsentMessage);
+
+ //...and a prefixed id afterwards
+ String assignedId = unsentMessage.getJMSMessageID();
+ assertNotNull("JMSMessageID should be set", assignedId);
+ assertTrue("MessageId does not have the expected prefix", assignedId.startsWith(MessageIdHelper.JMS_ID_PREFIX));
+ }
+
+ @Test
public void testSenderSetsJMSPriorityOnMessage() throws Exception
{
//Create mock sent message token, ensure that it is immediately marked as Accepted
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org