You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ro...@apache.org on 2014/02/07 17:46:38 UTC
svn commit: r1565718 - in /qpid/jms/trunk/src:
main/java/org/apache/qpid/jms/engine/AmqpMessage.java
main/java/org/apache/qpid/jms/impl/MessageIdHelper.java
test/java/org/apache/qpid/jms/MessageIntegrationTest.java
Author: robbie
Date: Fri Feb 7 16:46:38 2014
New Revision: 1565718
URL: http://svn.apache.org/r1565718
Log:
QPIDJMS-9: consolidate the correlationId and messageId setters/getters, tweak id conversion for ulong
Modified:
qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpMessage.java
qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/MessageIdHelper.java
qpid/jms/trunk/src/test/java/org/apache/qpid/jms/MessageIntegrationTest.java
Modified: qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpMessage.java
URL: http://svn.apache.org/viewvc/qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpMessage.java?rev=1565718&r1=1565717&r2=1565718&view=diff
==============================================================================
--- qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpMessage.java (original)
+++ qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpMessage.java Fri Feb 7 16:46:38 2014
@@ -45,6 +45,11 @@ import org.apache.qpid.proton.message.Me
public abstract class AmqpMessage
{
static final short DEFAULT_PRIORITY = 4;
+ private static enum IdType
+ {
+ MESSAGE_ID,
+ CORRELATION_ID
+ }
private final Delivery _delivery;
private final Message _message;
@@ -441,69 +446,18 @@ public abstract class AmqpMessage
*/
public Object getMessageId()
{
- Object underlyingMessageId = _message.getMessageId();
-
- if(underlyingMessageId instanceof Binary)
- {
- return ((Binary) underlyingMessageId).asByteBuffer();
- }
- else if(underlyingMessageId instanceof UnsignedLong)
- {
- return ((UnsignedLong) underlyingMessageId).bigIntegerValue();
- }
- else
- {
- return underlyingMessageId;
- }
+ return getUnderlyingId(IdType.MESSAGE_ID);
}
/**
- * Set a string message-id value on the message.
- */
- public void setMessageId(String messageId)
- {
- setUnderlyingMessageId(messageId);
- }
-
- /**
- * Set a uuid message-id value on the message.
- */
- public void setMessageId(UUID messageId)
- {
- setUnderlyingMessageId(messageId);
- }
-
- /**
- * Set an ulong (represented here as a BigInteger) message-id value on the message.
- *
- * @param messageId the value to set
- * @throws IllegalArgumentException if the value is not within the ulong range of [0 - 2^64)
- */
- public void setMessageId(BigInteger messageId) throws IllegalArgumentException
- {
- if(messageId.signum() == -1 || messageId.bitLength() > 64)
- {
- throw new IllegalArgumentException("Value \""+messageId+"\" lies outside the range [0 - 2^64).");
- }
-
- setUnderlyingMessageId(UnsignedLong.valueOf(messageId));
- }
-
- /**
- * Set a Binary (represented here as a ByteBuffer) message-id value on the message.
+ * Set a message-id value on the message.
*
- * @param messageId the value to set
+ * The supplied value s permitted to be null, String, UUID,
+ * Long (representing ulong), or ByteBuffer (representing binary)
*/
- public void setMessageId(ByteBuffer messageId)
- {
- Binary bin = Binary.create(messageId);
-
- setUnderlyingMessageId(bin);
- }
-
- private void setUnderlyingMessageId(Object messageId)
+ public void setMessageId(final Object messageId)
{
- _message.setMessageId(messageId);
+ setUnderlyingId(messageId, IdType.MESSAGE_ID);
}
/**
@@ -516,98 +470,89 @@ public abstract class AmqpMessage
*/
public Object getCorrelationId()
{
- Object underlyingCorrelationId = _message.getCorrelationId();
-
- if(underlyingCorrelationId instanceof Binary)
- {
- return ((Binary) underlyingCorrelationId).asByteBuffer();
- }
- else if(underlyingCorrelationId instanceof UnsignedLong)
- {
- return ((UnsignedLong) underlyingCorrelationId).bigIntegerValue();
- }
- else
- {
- return underlyingCorrelationId;
- }
+ return getUnderlyingId(IdType.CORRELATION_ID);
}
- //TODO: temp hack, consolidate with the other methods.
- public void setCorrelationId(Object correlationId)
+ private Object getUnderlyingId(final IdType idType)
{
- if(correlationId == null)
- {
- setUnderlyingCorrelationId(correlationId);
- }
- else if(correlationId instanceof UUID)
- {
- setCorrelationId((UUID)correlationId);
- }
- else if(correlationId instanceof BigInteger)
+ Object underlyingId = null;
+ switch(idType)
{
- setCorrelationId((BigInteger)correlationId);
+ case MESSAGE_ID:
+ underlyingId = _message.getMessageId();
+ break;
+ case CORRELATION_ID:
+ underlyingId = _message.getCorrelationId();
+ break;
+ default:
+ throw new IllegalArgumentException("Unsupported id type: " + idType);
}
- else if(correlationId instanceof String)
+
+ if(underlyingId instanceof Binary)
{
- setCorrelationId((String)correlationId);
+ return ((Binary) underlyingId).asByteBuffer();
}
- else if(correlationId instanceof ByteBuffer)
+ else if(underlyingId instanceof UnsignedLong)
{
- setCorrelationId((ByteBuffer)correlationId);
+ return ((UnsignedLong) underlyingId).bigIntegerValue();
}
else
{
- throw new IllegalArgumentException("Provided value is not a legal type");
+ return underlyingId;
}
}
/**
- * Set a string correlation-id value on the message.
- */
- public void setCorrelationId(String correlationId)
- {
- setUnderlyingCorrelationId(correlationId);
- }
-
- /**
- * Set a uuid correlation-id value on the message.
+ * Set a correlation-id value on the message.
+ *
+ * The supplied value s permitted to be null, String, UUID,
+ * Long (representing ulong), or ByteBuffer (representing binary)
*/
- public void setCorrelationId(UUID correlationId)
+ public void setCorrelationId(final Object correlationId)
{
- setUnderlyingCorrelationId(correlationId);
+ setUnderlyingId(correlationId, IdType.CORRELATION_ID);
}
- /**
- * Set an ulong (represented here as a BigInteger) correlation-id value on the message.
- *
- * @param correlationId the value to set
- * @throws IllegalArgumentException if the value is not within the ulong range of [0 - 2^64)
- */
- public void setCorrelationId(BigInteger correlationId) throws IllegalArgumentException
+ private void setUnderlyingId(final Object id, final IdType idType)
{
- if(correlationId.signum() == -1 || correlationId.bitLength() > 64)
+ Object underlyingId = null;
+ if(id instanceof String || id instanceof UUID || id == null )
{
- throw new IllegalArgumentException("Value \""+correlationId+"\" lies outside the range [0 - 2^64).");
+ underlyingId = id;
}
+ else if(id instanceof BigInteger)
+ {
+ BigInteger bigIntCorrelationId = (BigInteger) id;
+ if(bigIntCorrelationId.signum() == -1 || bigIntCorrelationId.bitLength() > 64)
+ {
+ throw new IllegalArgumentException("Value \""+bigIntCorrelationId+"\" lies outside the range [0 - 2^64).");
+ }
- setUnderlyingCorrelationId(UnsignedLong.valueOf(correlationId));
- }
-
- /**
- * Set a Binary (represented here as a ByteBuffer) correlation-id value on the message.
- *
- * @param correlationId the value to set
- */
- public void setCorrelationId(ByteBuffer correlationId)
- {
- Binary bin = Binary.create(correlationId);
+ underlyingId = UnsignedLong.valueOf(bigIntCorrelationId);
+ }
+ else if(id instanceof ByteBuffer)
+ {
+ Binary bin = Binary.create((ByteBuffer) id);
- setUnderlyingCorrelationId(bin);
- }
+ underlyingId = bin;
+ }
+ else
+ {
+ throw new IllegalArgumentException("Provided value is not of an allowed type:"
+ + id.getClass().getName());
+ }
- private void setUnderlyingCorrelationId(Object correlationId)
- {
- _message.setCorrelationId(correlationId);
+ switch(idType)
+ {
+ case MESSAGE_ID:
+ _message.setMessageId(underlyingId);
+ break;
+ case CORRELATION_ID:
+ _message.setCorrelationId(underlyingId);
+ break;
+ default:
+ throw new IllegalArgumentException("Unsupported id type: " + idType);
+ }
}
//===== Application Properties ======
Modified: qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/MessageIdHelper.java
URL: http://svn.apache.org/viewvc/qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/MessageIdHelper.java?rev=1565718&r1=1565717&r2=1565718&view=diff
==============================================================================
--- qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/MessageIdHelper.java (original)
+++ qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/MessageIdHelper.java Fri Feb 7 16:46:38 2014
@@ -138,9 +138,8 @@ public class MessageIdHelper
{
return AMQP_UUID_PREFIX + messageId.toString();
}
- else if(messageId instanceof Number)
+ else if(messageId instanceof BigInteger || messageId instanceof Long)
{
- //TODO: use Byte/Short/Integer/Long/BigInteger check instead?
return AMQP_LONG_PREFIX + messageId.toString();
}
else if(messageId instanceof ByteBuffer)
Modified: qpid/jms/trunk/src/test/java/org/apache/qpid/jms/MessageIntegrationTest.java
URL: http://svn.apache.org/viewvc/qpid/jms/trunk/src/test/java/org/apache/qpid/jms/MessageIntegrationTest.java?rev=1565718&r1=1565717&r2=1565718&view=diff
==============================================================================
--- qpid/jms/trunk/src/test/java/org/apache/qpid/jms/MessageIntegrationTest.java (original)
+++ qpid/jms/trunk/src/test/java/org/apache/qpid/jms/MessageIntegrationTest.java Fri Feb 7 16:46:38 2014
@@ -480,7 +480,7 @@ public class MessageIntegrationTest exte
}
/**
- * Tests that receiving a message with a UUID typed message-id results in returning the
+ * Tests that receiving a message with a ulong typed message-id results in returning the
* expected value for JMSMessageId where the JMS "ID:" prefix has been added to the UUID.tostring()
*/
@Test
@@ -489,7 +489,7 @@ public class MessageIntegrationTest exte
receivedMessageWithMessageIdTestImpl(BigInteger.valueOf(123456789L));
}
- private void receivedMessageWithMessageIdTestImpl(Object messageId) throws Exception
+ private void receivedMessageWithMessageIdTestImpl(Object messageIdForAmqpMessageClass) throws Exception
{
try(TestAmqpPeer testPeer = new TestAmqpPeer(IntegrationTestFixture.PORT);)
{
@@ -501,7 +501,7 @@ public class MessageIntegrationTest exte
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = session.createQueue("myQueue");
- Object underlyingAmqpMessageId = messageId;
+ Object underlyingAmqpMessageId = messageIdForAmqpMessageClass;
if(underlyingAmqpMessageId instanceof BigInteger)
{
//Proton uses UnsignedLong
@@ -521,7 +521,7 @@ public class MessageIntegrationTest exte
testPeer.waitForAllHandlersToComplete(3000);
assertNotNull(receivedMessage);
- String expectedBaseIdString = new MessageIdHelper().toBaseMessageIdString(underlyingAmqpMessageId);
+ String expectedBaseIdString = new MessageIdHelper().toBaseMessageIdString(messageIdForAmqpMessageClass);
assertEquals("ID:" + expectedBaseIdString, receivedMessage.getJMSMessageID());
}
@@ -568,7 +568,7 @@ public class MessageIntegrationTest exte
receivedMessageWithCorrelationIdTestImpl(BigInteger.valueOf(123456789L), false);
}
- private void receivedMessageWithCorrelationIdTestImpl(Object correlationId, boolean appSpecific) throws Exception
+ private void receivedMessageWithCorrelationIdTestImpl(Object correlationIdForAmqpMessageClass, boolean appSpecific) throws Exception
{
try(TestAmqpPeer testPeer = new TestAmqpPeer(IntegrationTestFixture.PORT);)
{
@@ -580,7 +580,7 @@ public class MessageIntegrationTest exte
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = session.createQueue("myQueue");
- Object underlyingAmqpCorrelationId = correlationId;
+ Object underlyingAmqpCorrelationId = correlationIdForAmqpMessageClass;
if(underlyingAmqpCorrelationId instanceof BigInteger)
{
//Proton uses UnsignedLong
@@ -607,7 +607,7 @@ public class MessageIntegrationTest exte
testPeer.waitForAllHandlersToComplete(3000);
assertNotNull(receivedMessage);
- String expectedBaseIdString = new MessageIdHelper().toBaseMessageIdString(underlyingAmqpCorrelationId);
+ String expectedBaseIdString = new MessageIdHelper().toBaseMessageIdString(correlationIdForAmqpMessageClass);
String expected = expectedBaseIdString;
if(!appSpecific)
{
@@ -703,7 +703,18 @@ public class MessageIntegrationTest exte
recieveMessageIdSendCorrelationIdTestImpl(UUID.randomUUID());
}
- public void recieveMessageIdSendCorrelationIdTestImpl(Object underlyingAmqpId) throws Exception
+ /**
+ * Tests that receiving a message with a ulong typed message-id, and then sending a message which
+ * uses the result of calling getJMSMessageID as the value for setJMSCorrelationId results in
+ * transmission of the expected AMQP message content.
+ */
+ @Test
+ public void testReceivedMessageWithUlongMessageIdAndSendValueAsCorrelationId() throws Exception
+ {
+ recieveMessageIdSendCorrelationIdTestImpl(BigInteger.valueOf(123456789L));
+ }
+
+ private void recieveMessageIdSendCorrelationIdTestImpl(Object idForAmqpMessageClass) throws Exception
{
try(TestAmqpPeer testPeer = new TestAmqpPeer(IntegrationTestFixture.PORT);)
{
@@ -715,7 +726,7 @@ public class MessageIntegrationTest exte
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = session.createQueue("myQueue");
- Object underlyingAmqpMessageId = underlyingAmqpId;
+ Object underlyingAmqpMessageId = idForAmqpMessageClass;
if(underlyingAmqpMessageId instanceof BigInteger)
{
//Proton uses UnsignedLong
@@ -735,7 +746,7 @@ public class MessageIntegrationTest exte
testPeer.waitForAllHandlersToComplete(3000);
assertNotNull(receivedMessage);
- String expectedBaseIdString = new MessageIdHelper().toBaseMessageIdString(underlyingAmqpMessageId);
+ String expectedBaseIdString = new MessageIdHelper().toBaseMessageIdString(idForAmqpMessageClass);
String jmsMessageID = receivedMessage.getJMSMessageID();
assertEquals("ID:" + expectedBaseIdString, jmsMessageID);
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org