You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ro...@apache.org on 2014/02/07 17:46:38 UTC

svn commit: r1565718 - in /qpid/jms/trunk/src: main/java/org/apache/qpid/jms/engine/AmqpMessage.java main/java/org/apache/qpid/jms/impl/MessageIdHelper.java test/java/org/apache/qpid/jms/MessageIntegrationTest.java

Author: robbie
Date: Fri Feb  7 16:46:38 2014
New Revision: 1565718

URL: http://svn.apache.org/r1565718
Log:
QPIDJMS-9: consolidate the correlationId and messageId setters/getters, tweak id conversion for ulong

Modified:
    qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpMessage.java
    qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/MessageIdHelper.java
    qpid/jms/trunk/src/test/java/org/apache/qpid/jms/MessageIntegrationTest.java

Modified: qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpMessage.java
URL: http://svn.apache.org/viewvc/qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpMessage.java?rev=1565718&r1=1565717&r2=1565718&view=diff
==============================================================================
--- qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpMessage.java (original)
+++ qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpMessage.java Fri Feb  7 16:46:38 2014
@@ -45,6 +45,11 @@ import org.apache.qpid.proton.message.Me
 public abstract class AmqpMessage
 {
     static final short DEFAULT_PRIORITY = 4;
+    private static enum IdType
+    {
+        MESSAGE_ID,
+        CORRELATION_ID
+    }
 
     private final Delivery _delivery;
     private final Message _message;
@@ -441,69 +446,18 @@ public abstract class AmqpMessage
      */
     public Object getMessageId()
     {
-        Object underlyingMessageId = _message.getMessageId();
-
-        if(underlyingMessageId instanceof Binary)
-        {
-            return ((Binary) underlyingMessageId).asByteBuffer();
-        }
-        else if(underlyingMessageId instanceof UnsignedLong)
-        {
-            return ((UnsignedLong) underlyingMessageId).bigIntegerValue();
-        }
-        else
-        {
-            return underlyingMessageId;
-        }
+        return getUnderlyingId(IdType.MESSAGE_ID);
     }
 
     /**
-     * Set a string message-id value on the message.
-     */
-    public void setMessageId(String messageId)
-    {
-        setUnderlyingMessageId(messageId);
-    }
-
-    /**
-     * Set a uuid message-id value on the message.
-     */
-    public void setMessageId(UUID messageId)
-    {
-        setUnderlyingMessageId(messageId);
-    }
-
-    /**
-     * Set an ulong (represented here as a BigInteger) message-id value on the message.
-     *
-     * @param messageId the value to set
-     * @throws IllegalArgumentException if the value is not within the ulong range of [0 - 2^64)
-     */
-    public void setMessageId(BigInteger messageId) throws IllegalArgumentException
-    {
-        if(messageId.signum() == -1 || messageId.bitLength() > 64)
-        {
-            throw new IllegalArgumentException("Value \""+messageId+"\" lies outside the range [0 - 2^64).");
-        }
-
-        setUnderlyingMessageId(UnsignedLong.valueOf(messageId));
-    }
-
-    /**
-     * Set a Binary (represented here as a ByteBuffer) message-id value on the message.
+     * Set a message-id value on the message.
      *
-     * @param messageId the value to set
+     * The supplied value is permitted to be null, String, UUID,
+     * BigInteger (representing ulong), or ByteBuffer (representing binary)
      */
-    public void setMessageId(ByteBuffer messageId)
-    {
-        Binary bin = Binary.create(messageId);
-
-        setUnderlyingMessageId(bin);
-    }
-
-    private void setUnderlyingMessageId(Object messageId)
+    public void setMessageId(final Object messageId)
     {
-        _message.setMessageId(messageId);
+        setUnderlyingId(messageId, IdType.MESSAGE_ID);
     }
 
     /**
@@ -516,98 +470,89 @@ public abstract class AmqpMessage
      */
     public Object getCorrelationId()
     {
-        Object underlyingCorrelationId = _message.getCorrelationId();
-
-        if(underlyingCorrelationId instanceof Binary)
-        {
-            return ((Binary) underlyingCorrelationId).asByteBuffer();
-        }
-        else if(underlyingCorrelationId instanceof UnsignedLong)
-        {
-            return ((UnsignedLong) underlyingCorrelationId).bigIntegerValue();
-        }
-        else
-        {
-            return underlyingCorrelationId;
-        }
+        return getUnderlyingId(IdType.CORRELATION_ID);
     }
 
-    //TODO: temp hack, consolidate with the other methods.
-    public void setCorrelationId(Object correlationId)
+    private Object getUnderlyingId(final IdType idType)
     {
-        if(correlationId == null)
-        {
-            setUnderlyingCorrelationId(correlationId);
-        }
-        else if(correlationId instanceof UUID)
-        {
-            setCorrelationId((UUID)correlationId);
-        }
-        else if(correlationId instanceof BigInteger)
+        Object underlyingId = null;
+        switch(idType)
         {
-            setCorrelationId((BigInteger)correlationId);
+            case MESSAGE_ID:
+                underlyingId = _message.getMessageId();
+                break;
+            case CORRELATION_ID:
+                underlyingId = _message.getCorrelationId();
+                break;
+            default:
+                throw new IllegalArgumentException("Unsupported id type: " + idType);
         }
-        else if(correlationId instanceof String)
+
+        if(underlyingId instanceof Binary)
         {
-            setCorrelationId((String)correlationId);
+            return ((Binary) underlyingId).asByteBuffer();
         }
-        else if(correlationId instanceof ByteBuffer)
+        else if(underlyingId instanceof UnsignedLong)
         {
-            setCorrelationId((ByteBuffer)correlationId);
+            return ((UnsignedLong) underlyingId).bigIntegerValue();
         }
         else
         {
-            throw new IllegalArgumentException("Provided value is not a legal type");
+            return underlyingId;
         }
     }
 
     /**
-     * Set a string correlation-id value on the message.
-     */
-    public void setCorrelationId(String correlationId)
-    {
-        setUnderlyingCorrelationId(correlationId);
-    }
-
-    /**
-     * Set a uuid correlation-id value on the message.
+     * Set a correlation-id value on the message.
+     *
+     * The supplied value is permitted to be null, String, UUID,
+     * BigInteger (representing ulong), or ByteBuffer (representing binary)
      */
-    public void setCorrelationId(UUID correlationId)
+    public void setCorrelationId(final Object correlationId)
     {
-        setUnderlyingCorrelationId(correlationId);
+        setUnderlyingId(correlationId, IdType.CORRELATION_ID);
     }
 
-    /**
-     * Set an ulong (represented here as a BigInteger) correlation-id value on the message.
-     *
-     * @param correlationId the value to set
-     * @throws IllegalArgumentException if the value is not within the ulong range of [0 - 2^64)
-     */
-    public void setCorrelationId(BigInteger correlationId) throws IllegalArgumentException
+    private void setUnderlyingId(final Object id, final IdType idType)
     {
-        if(correlationId.signum() == -1 || correlationId.bitLength() > 64)
+        Object underlyingId = null;
+        if(id instanceof String || id instanceof UUID || id == null )
         {
-            throw new IllegalArgumentException("Value \""+correlationId+"\" lies outside the range [0 - 2^64).");
+            underlyingId = id;
         }
+        else if(id instanceof BigInteger)
+        {
+            BigInteger bigIntCorrelationId = (BigInteger) id;
+            if(bigIntCorrelationId.signum() == -1 || bigIntCorrelationId.bitLength() > 64)
+            {
+                throw new IllegalArgumentException("Value \""+bigIntCorrelationId+"\" lies outside the range [0 - 2^64).");
+            }
 
-        setUnderlyingCorrelationId(UnsignedLong.valueOf(correlationId));
-    }
-
-    /**
-     * Set a Binary (represented here as a ByteBuffer) correlation-id value on the message.
-     *
-     * @param correlationId the value to set
-     */
-    public void setCorrelationId(ByteBuffer correlationId)
-    {
-        Binary bin = Binary.create(correlationId);
+            underlyingId = UnsignedLong.valueOf(bigIntCorrelationId);
+        }
+        else if(id instanceof ByteBuffer)
+        {
+            Binary bin = Binary.create((ByteBuffer) id);
 
-        setUnderlyingCorrelationId(bin);
-    }
+            underlyingId = bin;
+        }
+        else
+        {
+            throw new IllegalArgumentException("Provided value is not of an allowed type:"
+                                                        + id.getClass().getName());
+        }
 
-    private void setUnderlyingCorrelationId(Object correlationId)
-    {
-        _message.setCorrelationId(correlationId);
+        switch(idType)
+        {
+            case MESSAGE_ID:
+                _message.setMessageId(underlyingId);
+                break;
+            case CORRELATION_ID:
+                _message.setCorrelationId(underlyingId);
+                break;
+            default:
+                throw new IllegalArgumentException("Unsupported id type: " + idType);
+        }
     }
 
     //===== Application Properties ======

Modified: qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/MessageIdHelper.java
URL: http://svn.apache.org/viewvc/qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/MessageIdHelper.java?rev=1565718&r1=1565717&r2=1565718&view=diff
==============================================================================
--- qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/MessageIdHelper.java (original)
+++ qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/MessageIdHelper.java Fri Feb  7 16:46:38 2014
@@ -138,9 +138,8 @@ public class MessageIdHelper
         {
             return AMQP_UUID_PREFIX + messageId.toString();
         }
-        else if(messageId instanceof Number)
+        else if(messageId instanceof BigInteger || messageId instanceof Long)
         {
-            //TODO: use Byte/Short/Integer/Long/BigInteger check instead?
             return AMQP_LONG_PREFIX + messageId.toString();
         }
         else if(messageId instanceof ByteBuffer)

Modified: qpid/jms/trunk/src/test/java/org/apache/qpid/jms/MessageIntegrationTest.java
URL: http://svn.apache.org/viewvc/qpid/jms/trunk/src/test/java/org/apache/qpid/jms/MessageIntegrationTest.java?rev=1565718&r1=1565717&r2=1565718&view=diff
==============================================================================
--- qpid/jms/trunk/src/test/java/org/apache/qpid/jms/MessageIntegrationTest.java (original)
+++ qpid/jms/trunk/src/test/java/org/apache/qpid/jms/MessageIntegrationTest.java Fri Feb  7 16:46:38 2014
@@ -480,7 +480,7 @@ public class MessageIntegrationTest exte
     }
 
     /**
-     * Tests that receiving a message with a UUID typed message-id results in returning the
+     * Tests that receiving a message with a ulong typed message-id results in returning the
      * expected value for JMSMessageId where the JMS "ID:" prefix has been added to the UUID.tostring()
      */
     @Test
@@ -489,7 +489,7 @@ public class MessageIntegrationTest exte
         receivedMessageWithMessageIdTestImpl(BigInteger.valueOf(123456789L));
     }
 
-    private void receivedMessageWithMessageIdTestImpl(Object messageId) throws Exception
+    private void receivedMessageWithMessageIdTestImpl(Object messageIdForAmqpMessageClass) throws Exception
     {
         try(TestAmqpPeer testPeer = new TestAmqpPeer(IntegrationTestFixture.PORT);)
         {
@@ -501,7 +501,7 @@ public class MessageIntegrationTest exte
             Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
             Queue queue = session.createQueue("myQueue");
 
-            Object underlyingAmqpMessageId = messageId;
+            Object underlyingAmqpMessageId = messageIdForAmqpMessageClass;
             if(underlyingAmqpMessageId instanceof BigInteger)
             {
                 //Proton uses UnsignedLong
@@ -521,7 +521,7 @@ public class MessageIntegrationTest exte
             testPeer.waitForAllHandlersToComplete(3000);
 
             assertNotNull(receivedMessage);
-            String expectedBaseIdString = new MessageIdHelper().toBaseMessageIdString(underlyingAmqpMessageId);
+            String expectedBaseIdString = new MessageIdHelper().toBaseMessageIdString(messageIdForAmqpMessageClass);
 
             assertEquals("ID:" + expectedBaseIdString, receivedMessage.getJMSMessageID());
         }
@@ -568,7 +568,7 @@ public class MessageIntegrationTest exte
         receivedMessageWithCorrelationIdTestImpl(BigInteger.valueOf(123456789L), false);
     }
 
-    private void receivedMessageWithCorrelationIdTestImpl(Object correlationId, boolean appSpecific) throws Exception
+    private void receivedMessageWithCorrelationIdTestImpl(Object correlationIdForAmqpMessageClass, boolean appSpecific) throws Exception
     {
         try(TestAmqpPeer testPeer = new TestAmqpPeer(IntegrationTestFixture.PORT);)
         {
@@ -580,7 +580,7 @@ public class MessageIntegrationTest exte
             Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
             Queue queue = session.createQueue("myQueue");
 
-            Object underlyingAmqpCorrelationId = correlationId;
+            Object underlyingAmqpCorrelationId = correlationIdForAmqpMessageClass;
             if(underlyingAmqpCorrelationId instanceof BigInteger)
             {
                 //Proton uses UnsignedLong
@@ -607,7 +607,7 @@ public class MessageIntegrationTest exte
             testPeer.waitForAllHandlersToComplete(3000);
 
             assertNotNull(receivedMessage);
-            String expectedBaseIdString = new MessageIdHelper().toBaseMessageIdString(underlyingAmqpCorrelationId);
+            String expectedBaseIdString = new MessageIdHelper().toBaseMessageIdString(correlationIdForAmqpMessageClass);
             String expected = expectedBaseIdString;
             if(!appSpecific)
             {
@@ -703,7 +703,18 @@ public class MessageIntegrationTest exte
         recieveMessageIdSendCorrelationIdTestImpl(UUID.randomUUID());
     }
 
-    public void recieveMessageIdSendCorrelationIdTestImpl(Object underlyingAmqpId) throws Exception
+    /**
+     * Tests that receiving a message with a ulong typed message-id, and then sending a message which
+     * uses the result of calling getJMSMessageID as the value for setJMSCorrelationId results in
+     * transmission of the expected AMQP message content.
+     */
+    @Test
+    public void testReceivedMessageWithUlongMessageIdAndSendValueAsCorrelationId() throws Exception
+    {
+        recieveMessageIdSendCorrelationIdTestImpl(BigInteger.valueOf(123456789L));
+    }
+
+    private void recieveMessageIdSendCorrelationIdTestImpl(Object idForAmqpMessageClass) throws Exception
     {
         try(TestAmqpPeer testPeer = new TestAmqpPeer(IntegrationTestFixture.PORT);)
         {
@@ -715,7 +726,7 @@ public class MessageIntegrationTest exte
             Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
             Queue queue = session.createQueue("myQueue");
 
-            Object underlyingAmqpMessageId = underlyingAmqpId;
+            Object underlyingAmqpMessageId = idForAmqpMessageClass;
             if(underlyingAmqpMessageId instanceof BigInteger)
             {
                 //Proton uses UnsignedLong
@@ -735,7 +746,7 @@ public class MessageIntegrationTest exte
             testPeer.waitForAllHandlersToComplete(3000);
 
             assertNotNull(receivedMessage);
-            String expectedBaseIdString = new MessageIdHelper().toBaseMessageIdString(underlyingAmqpMessageId);
+            String expectedBaseIdString = new MessageIdHelper().toBaseMessageIdString(idForAmqpMessageClass);
 
             String jmsMessageID = receivedMessage.getJMSMessageID();
             assertEquals("ID:" + expectedBaseIdString, jmsMessageID);



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org