You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ri...@apache.org on 2007/11/07 13:36:38 UTC

svn commit: r592729 - in /incubator/qpid/branches/M2.1/java: broker/etc/ broker/src/main/java/org/apache/qpid/server/ client/src/main/java/org/apache/qpid/client/ client/src/main/java/org/apache/qpid/client/message/ client/src/test/java/org/apache/qpid...

Author: ritchiem
Date: Wed Nov  7 04:36:36 2007
New Revision: 592729

URL: http://svn.apache.org/viewvc?rev=592729&view=rev
Log:
QPID-160 Added JMSXUserID to all messages passing through the Java broker.
Because setting the user ID causes the message headers to be re-encoded, the feature can be disabled in config.xml (advanced.enableJMSXUserID). It is enabled by default, because the sample config.xml should have all features enabled so that testing can observe the interactions.


Modified:
    incubator/qpid/branches/M2.1/java/broker/etc/config.xml
    incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
    incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/CustomJMSXProperty.java
    incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java
    incubator/qpid/branches/M2.1/java/client/src/test/java/org/apache/qpid/test/unit/basic/PropertyValueTest.java

Modified: incubator/qpid/branches/M2.1/java/broker/etc/config.xml
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/broker/etc/config.xml?rev=592729&r1=592728&r2=592729&view=diff
==============================================================================
--- incubator/qpid/branches/M2.1/java/broker/etc/config.xml (original)
+++ incubator/qpid/branches/M2.1/java/broker/etc/config.xml Wed Nov  7 04:36:36 2007
@@ -8,9 +8,9 @@
  - to you under the Apache License, Version 2.0 (the
  - "License"); you may not use this file except in compliance
  - with the License.  You may obtain a copy of the License at
- - 
+ -
  -   http://www.apache.org/licenses/LICENSE-2.0
- - 
+ -
  - Unless required by applicable law or agreed to in writing,
  - software distributed under the License is distributed on an
  - "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -50,6 +50,7 @@
         <enableDirectBuffers>false</enableDirectBuffers>
         <framesize>65535</framesize>
         <compressBufferOnQueue>false</compressBufferOnQueue>
+        <enableJMSXUserID>true</enableJMSXUserID>
     </advanced>
 
     <security>
@@ -84,7 +85,7 @@
         <jmx>
             <access>${conf}/jmxremote.access</access>
             <principal-database>passwordfile</principal-database>
-        </jmx>        
+        </jmx>
     </security>
 
     <virtualhosts>

Modified: incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java?rev=592729&r1=592728&r2=592729&view=diff
==============================================================================
--- incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java (original)
+++ incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java Wed Nov  7 04:36:36 2007
@@ -23,7 +23,9 @@
 import org.apache.log4j.Logger;
 
 import org.apache.qpid.AMQException;
+import org.apache.qpid.configuration.Configured;
 import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.BasicContentHeaderProperties;
 import org.apache.qpid.framing.ContentBody;
 import org.apache.qpid.framing.ContentHeaderBody;
 import org.apache.qpid.framing.FieldTable;
@@ -43,6 +45,7 @@
 import org.apache.qpid.server.txn.LocalTransactionalContext;
 import org.apache.qpid.server.txn.NonTransactionalContext;
 import org.apache.qpid.server.txn.TransactionalContext;
+import org.apache.qpid.server.configuration.Configurator;
 
 import java.util.Collection;
 import java.util.HashMap;
@@ -118,9 +121,17 @@
     private final AMQProtocolSession _session;
     private boolean _closing;
 
+    @Configured(path = "advanced.enableJMSXUserID",
+                defaultValue = "true")
+    public boolean ENABLE_JMSXUserID;
+
+
     public AMQChannel(AMQProtocolSession session, int channelId, MessageStore messageStore, MessageRouter exchanges)
-        throws AMQException
+            throws AMQException
     {
+        //Set values from configuration
+        Configurator.configure(this);
+
         _session = session;
         _channelId = channelId;
         _storeContext = new StoreContext("Session: " + session.getClientIdentifier() + "; channel: " + channelId);
@@ -199,7 +210,7 @@
     }
 
     public void publishContentHeader(ContentHeaderBody contentHeaderBody, AMQProtocolSession protocolSession)
-        throws AMQException
+            throws AMQException
     {
         if (_currentMessage == null)
         {
@@ -212,6 +223,16 @@
                 _log.trace(debugIdentity() + "Content header received on channel " + _channelId);
             }
 
+            if (ENABLE_JMSXUserID)
+            {
+                //Set JMSXUserID
+                BasicContentHeaderProperties properties = (BasicContentHeaderProperties) contentHeaderBody.properties;
+                //fixme: fudge for QPID-677
+                properties.getHeaders().keySet();
+                
+                properties.setUserId(protocolSession.getAuthorizedID().getName());
+            }
+
             _currentMessage.setContentHeaderBody(contentHeaderBody);
             _currentMessage.setExpiration();
 
@@ -245,8 +266,8 @@
             // returns true iff the message was delivered (i.e. if all data was
             // received
             if (_currentMessage.addContentBodyFrame(_storeContext,
-                        protocolSession.getRegistry().getProtocolVersionMethodConverter().convertToContentChunk(
-                            contentBody)))
+                                                    protocolSession.getRegistry().getProtocolVersionMethodConverter().convertToContentChunk(
+                                                            contentBody)))
             {
                 // callback to allow the context to do any post message processing
                 // primary use is to allow message return processing in the non-tx case
@@ -303,7 +324,7 @@
      * @throws AMQException                  if something goes wrong
      */
     public AMQShortString subscribeToQueue(AMQShortString tag, AMQQueue queue, AMQProtocolSession session, boolean acks,
-        FieldTable filters, boolean noLocal, boolean exclusive) throws AMQException, ConsumerTagNotUniqueException
+                                           FieldTable filters, boolean noLocal, boolean exclusive) throws AMQException, ConsumerTagNotUniqueException
     {
         if (tag == null)
         {
@@ -327,18 +348,19 @@
         {
             _log.debug("Unacked Map Dump size:" + _unacknowledgedMessageMap.size());
             _unacknowledgedMessageMap.visit(new UnacknowledgedMessageMap.Visitor()
-                {
+            {
 
-                    public boolean callback(UnacknowledgedMessage message) throws AMQException
-                    {
-                        _log.debug(message);
+                public boolean callback(UnacknowledgedMessage message) throws AMQException
+                {
+                    _log.debug(message);
 
-                        return true;
-                    }
+                    return true;
+                }
 
-                    public void visitComplete()
-                    { }
-                });
+                public void visitComplete()
+                {
+                }
+            });
         }
 
         AMQQueue q = _consumerTag2QueueMap.remove(consumerTag);
@@ -418,7 +440,7 @@
                 if (_log.isDebugEnabled())
                 {
                     _log.debug(debugIdentity() + " Adding unacked message(" + message.toString() + " DT:" + deliveryTag
-                        + ") with a queue(" + queue + ") for " + consumerTag);
+                               + ") with a queue(" + queue + ") for " + consumerTag);
                 }
             }
         }
@@ -464,7 +486,7 @@
                 // if (_nonTransactedContext == null)
                 {
                     _nonTransactedContext =
-                        new NonTransactionalContext(_messageStore, _storeContext, this, _returnMessages, _browsedAcks);
+                            new NonTransactionalContext(_messageStore, _storeContext, this, _returnMessages, _browsedAcks);
                 }
 
                 deliveryContext = _nonTransactedContext;
@@ -527,7 +549,7 @@
                 // if (_nonTransactedContext == null)
                 {
                     _nonTransactedContext =
-                        new NonTransactionalContext(_messageStore, _storeContext, this, _returnMessages, _browsedAcks);
+                            new NonTransactionalContext(_messageStore, _storeContext, this, _returnMessages, _browsedAcks);
                 }
 
                 deliveryContext = _nonTransactedContext;
@@ -547,7 +569,7 @@
             else
             {
                 _log.warn(System.identityHashCode(this) + " Requested requeue of message(" + unacked.message.debugIdentity()
-                    + "):" + deliveryTag + " but no queue defined and no DeadLetter queue so DROPPING message.");
+                          + "):" + deliveryTag + " but no queue defined and no DeadLetter queue so DROPPING message.");
                 // _log.error("Requested requeue of message:" + deliveryTag +
                 // " but no queue defined using DeadLetter queue:" + getDeadLetterQueue());
                 //
@@ -558,25 +580,26 @@
         else
         {
             _log.warn("Requested requeue of message:" + deliveryTag + " but no such delivery tag exists."
-                + _unacknowledgedMessageMap.size());
+                      + _unacknowledgedMessageMap.size());
 
             if (_log.isDebugEnabled())
             {
                 _unacknowledgedMessageMap.visit(new UnacknowledgedMessageMap.Visitor()
-                    {
-                        int count = 0;
+                {
+                    int count = 0;
 
-                        public boolean callback(UnacknowledgedMessage message) throws AMQException
-                        {
-                            _log.debug(
+                    public boolean callback(UnacknowledgedMessage message) throws AMQException
+                    {
+                        _log.debug(
                                 (count++) + ": (" + message.message.debugIdentity() + ")" + "[" + message.deliveryTag + "]");
 
-                            return false; // Continue
-                        }
+                        return false; // Continue
+                    }
 
-                        public void visitComplete()
-                        { }
-                    });
+                    public void visitComplete()
+                    {
+                    }
+                });
             }
         }
 
@@ -603,53 +626,54 @@
         // Marking messages who still have a consumer for to be resent
         // and those that don't to be requeued.
         _unacknowledgedMessageMap.visit(new UnacknowledgedMessageMap.Visitor()
+        {
+            public boolean callback(UnacknowledgedMessage message) throws AMQException
             {
-                public boolean callback(UnacknowledgedMessage message) throws AMQException
+                AMQShortString consumerTag = message.consumerTag;
+                AMQMessage msg = message.message;
+                msg.setRedelivered(true);
+                if (consumerTag != null)
                 {
-                    AMQShortString consumerTag = message.consumerTag;
-                    AMQMessage msg = message.message;
-                    msg.setRedelivered(true);
-                    if (consumerTag != null)
+                    // Consumer exists
+                    if (_consumerTag2QueueMap.containsKey(consumerTag))
                     {
-                        // Consumer exists
-                        if (_consumerTag2QueueMap.containsKey(consumerTag))
-                        {
-                            msgToResend.add(message);
-                        }
-                        else // consumer has gone
-                        {
-                            msgToRequeue.add(message);
-                        }
+                        msgToResend.add(message);
                     }
-                    else
+                    else // consumer has gone
+                    {
+                        msgToRequeue.add(message);
+                    }
+                }
+                else
+                {
+                    // Message has no consumer tag, so was "delivered" to a GET
+                    // or consumer no longer registered
+                    // cannot resend, so re-queue.
+                    if (message.queue != null)
                     {
-                        // Message has no consumer tag, so was "delivered" to a GET
-                        // or consumer no longer registered
-                        // cannot resend, so re-queue.
-                        if (message.queue != null)
+                        if (requeue)
                         {
-                            if (requeue)
-                            {
-                                msgToRequeue.add(message);
-                            }
-                            else
-                            {
-                                _log.info("No DeadLetter Queue and requeue not requested so dropping message:" + message);
-                            }
+                            msgToRequeue.add(message);
                         }
                         else
                         {
-                            _log.info("Message.queue is null and no DeadLetter Queue so dropping message:" + message);
+                            _log.info("No DeadLetter Queue and requeue not requested so dropping message:" + message);
                         }
                     }
-
-                    // false means continue processing
-                    return false;
+                    else
+                    {
+                        _log.info("Message.queue is null and no DeadLetter Queue so dropping message:" + message);
+                    }
                 }
 
-                public void visitComplete()
-                { }
-            });
+                // false means continue processing
+                return false;
+            }
+
+            public void visitComplete()
+            {
+            }
+        });
 
         // Process Messages to Resend
         if (_log.isDebugEnabled())
@@ -704,7 +728,7 @@
                         if (_log.isDebugEnabled())
                         {
                             _log.debug("Subscription(" + System.identityHashCode(sub)
-                                + ") closed during resend so requeuing message");
+                                       + ") closed during resend so requeuing message");
                         }
                         // move this message to requeue
                         msgToRequeue.add(message);
@@ -714,7 +738,7 @@
                         if (_log.isDebugEnabled())
                         {
                             _log.debug("Requeuing " + msg.debugIdentity() + " for resend via sub:"
-                                + System.identityHashCode(sub));
+                                       + System.identityHashCode(sub));
                         }
 
                         sub.addToResendQueue(msg);
@@ -728,7 +752,7 @@
                 if (_log.isInfoEnabled())
                 {
                     _log.info("DeliveredSubscription not recorded so just requeueing(" + message.toString()
-                        + ")to prevent loss");
+                              + ")to prevent loss");
                 }
                 // move this message to requeue
                 msgToRequeue.add(message);
@@ -752,7 +776,7 @@
             if (_nonTransactedContext == null)
             {
                 _nonTransactedContext =
-                    new NonTransactionalContext(_messageStore, _storeContext, this, _returnMessages, _browsedAcks);
+                        new NonTransactionalContext(_messageStore, _storeContext, this, _returnMessages, _browsedAcks);
             }
 
             deliveryContext = _nonTransactedContext;
@@ -786,29 +810,30 @@
     public void queueDeleted(final AMQQueue queue) throws AMQException
     {
         _unacknowledgedMessageMap.visit(new UnacknowledgedMessageMap.Visitor()
+        {
+            public boolean callback(UnacknowledgedMessage message) throws AMQException
             {
-                public boolean callback(UnacknowledgedMessage message) throws AMQException
+                if (message.queue == queue)
                 {
-                    if (message.queue == queue)
+                    try
                     {
-                        try
-                        {
-                            message.discard(_storeContext);
-                            message.queue = null;
-                        }
-                        catch (AMQException e)
-                        {
-                            _log.error(
+                        message.discard(_storeContext);
+                        message.queue = null;
+                    }
+                    catch (AMQException e)
+                    {
+                        _log.error(
                                 "Error decrementing ref count on message " + message.message.getMessageId() + ": " + e, e);
-                        }
                     }
-
-                    return false;
                 }
 
-                public void visitComplete()
-                { }
-            });
+                return false;
+            }
+
+            public void visitComplete()
+            {
+            }
+        });
     }
 
     /**
@@ -856,8 +881,8 @@
         boolean suspend;
 
         suspend =
-            ((_prefetch_HighWaterMark != 0) && (_unacknowledgedMessageMap.size() >= _prefetch_HighWaterMark))
-            || ((_prefetchSize != 0) && (_prefetchSize < _unacknowledgedMessageMap.getUnacknowledgeBytes()));
+                ((_prefetch_HighWaterMark != 0) && (_unacknowledgedMessageMap.size() >= _prefetch_HighWaterMark))
+                || ((_prefetchSize != 0) && (_prefetchSize < _unacknowledgedMessageMap.getUnacknowledgeBytes()));
 
         setSuspended(suspend);
     }
@@ -942,7 +967,7 @@
         {
             AMQMessage message = bouncedMessage.getAMQMessage();
             session.getProtocolOutputConverter().writeReturn(message, _channelId, bouncedMessage.getReplyCode().getCode(),
-                new AMQShortString(bouncedMessage.getMessage()));
+                                                             new AMQShortString(bouncedMessage.getMessage()));
 
             message.decrementReference(_storeContext);
         }
@@ -959,7 +984,7 @@
         else
         {
             boolean willSuspend =
-                ((_prefetch_HighWaterMark != 0) && ((_unacknowledgedMessageMap.size() + 1) > _prefetch_HighWaterMark));
+                    ((_prefetch_HighWaterMark != 0) && ((_unacknowledgedMessageMap.size() + 1) > _prefetch_HighWaterMark));
             if (!willSuspend)
             {
                 final long unackedSize = _unacknowledgedMessageMap.getUnacknowledgeBytes();

Modified: incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/CustomJMSXProperty.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/CustomJMSXProperty.java?rev=592729&r1=592728&r2=592729&view=diff
==============================================================================
--- incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/CustomJMSXProperty.java (original)
+++ incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/CustomJMSXProperty.java Wed Nov  7 04:36:36 2007
@@ -29,9 +29,10 @@
 public enum CustomJMSXProperty
 {
     JMS_AMQP_NULL,
-    JMS_QPID_DESTTYPE,    
+    JMS_QPID_DESTTYPE,
     JMSXGroupID,
-    JMSXGroupSeq;
+    JMSXGroupSeq,
+    JMSXUserID;
 
 
     private final AMQShortString _nameAsShortString;
@@ -47,7 +48,7 @@
     }
 
     private static Enumeration _names;
-    
+
     public static synchronized Enumeration asEnumeration()
     {
         if(_names == null)
@@ -60,6 +61,6 @@
             }
             _names = Collections.enumeration(nameList);
         }
-        return _names;    
+        return _names;
     }
 }

Modified: incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java?rev=592729&r1=592728&r2=592729&view=diff
==============================================================================
--- incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java (original)
+++ incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java Wed Nov  7 04:36:36 2007
@@ -73,11 +73,11 @@
         _headerAdapter = new JMSHeaderAdapter(((BasicContentHeaderProperties) _contentHeaderProperties).getHeaders());
 
         _strictAMQP =
-            Boolean.parseBoolean(System.getProperties().getProperty(AMQSession.STRICT_AMQP, AMQSession.STRICT_AMQP_DEFAULT));
+                Boolean.parseBoolean(System.getProperties().getProperty(AMQSession.STRICT_AMQP, AMQSession.STRICT_AMQP_DEFAULT));
     }
 
     protected AbstractJMSMessage(long deliveryTag, BasicContentHeaderProperties contentHeader, AMQShortString exchange,
-        AMQShortString routingKey, ByteBuffer data) throws AMQException
+                                 AMQShortString routingKey, ByteBuffer data) throws AMQException
     {
         this(contentHeader, deliveryTag);
 
@@ -201,7 +201,7 @@
         if (!(destination instanceof AMQDestination))
         {
             throw new IllegalArgumentException(
-                "ReplyTo destination may only be an AMQDestination - passed argument was type " + destination.getClass());
+                    "ReplyTo destination may only be an AMQDestination - passed argument was type " + destination.getClass());
         }
 
         final AMQDestination amqd = (AMQDestination) destination;
@@ -391,12 +391,26 @@
 
     public String getStringProperty(String propertyName) throws JMSException
     {
-        if (_strictAMQP)
+
+        if (propertyName.startsWith("JMSX"))
         {
-            throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP");
+            //NOTE: if the JMSX Property is a non AMQP property then we must check _strictAMQP and throw as below.
+            if (propertyName.equals(CustomJMSXProperty.JMSXUserID.toString()))
+            {
+                return ((BasicContentHeaderProperties) _contentHeaderProperties).getUserIdAsString();
+            }
+
+            return null;
         }
+        else
+        {
+            if (_strictAMQP)
+            {
+                throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP");
+            }
 
-        return getJmsHeaders().getString(propertyName);
+            return getJmsHeaders().getString(propertyName);
+        }
     }
 
     public Object getObjectProperty(String propertyName) throws JMSException

Modified: incubator/qpid/branches/M2.1/java/client/src/test/java/org/apache/qpid/test/unit/basic/PropertyValueTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/client/src/test/java/org/apache/qpid/test/unit/basic/PropertyValueTest.java?rev=592729&r1=592728&r2=592729&view=diff
==============================================================================
--- incubator/qpid/branches/M2.1/java/client/src/test/java/org/apache/qpid/test/unit/basic/PropertyValueTest.java (original)
+++ incubator/qpid/branches/M2.1/java/client/src/test/java/org/apache/qpid/test/unit/basic/PropertyValueTest.java Wed Nov  7 04:36:36 2007
@@ -59,6 +59,7 @@
     private final List<String> messages = new ArrayList<String>();
     private int _count = 1;
     public String _connectionString = "vm://:1";
+    private static final String USERNAME = "guest";
 
     protected void setUp() throws Exception
     {
@@ -109,7 +110,7 @@
                 _logger.error("Run Number:" + run++);
                 try
                 {
-                    init(new AMQConnection(_connectionString, "guest", "guest", randomize("Client"), "test"));
+                    init(new AMQConnection(_connectionString, USERNAME, "guest", randomize("Client"), "test"));
                 }
                 catch (Exception e)
                 {
@@ -175,7 +176,7 @@
             _logger.trace("Message:" + m);
 
             Assert.assertEquals("Check temp queue has been set correctly", m.getJMSReplyTo().toString(),
-                m.getStringProperty("TempQueue"));
+                                m.getStringProperty("TempQueue"));
 
             m.setJMSType("Test");
             m.setLongProperty("UnsignedInt", (long) 4294967295L);
@@ -210,7 +211,7 @@
             try
             {
                 ((AMQMessage) m).setDecimalProperty(new AMQShortString("decimal-bad-scale"),
-                    bd.setScale(Byte.MAX_VALUE + 1));
+                                                    bd.setScale(Byte.MAX_VALUE + 1));
                 fail("UnsupportedOperationException should be thrown as scale can't be correctly transmitted");
             }
             catch (UnsupportedOperationException uoe)
@@ -248,47 +249,51 @@
 
             Assert.assertEquals("Check Boolean properties are correctly transported", true, m.getBooleanProperty("Bool"));
             Assert.assertEquals("Check Byte properties are correctly transported", (byte) Byte.MAX_VALUE,
-                m.getByteProperty("Byte"));
+                                m.getByteProperty("Byte"));
             Assert.assertEquals("Check Double properties are correctly transported", (double) Double.MAX_VALUE,
-                m.getDoubleProperty("Double"));
+                                m.getDoubleProperty("Double"));
             Assert.assertEquals("Check Float properties are correctly transported", (float) Float.MAX_VALUE,
-                m.getFloatProperty("Float"));
+                                m.getFloatProperty("Float"));
             Assert.assertEquals("Check Int properties are correctly transported", (int) Integer.MAX_VALUE,
-                m.getIntProperty("Int"));
+                                m.getIntProperty("Int"));
             Assert.assertEquals("Check CorrelationID properties are correctly transported", "Correlation",
-                m.getJMSCorrelationID());
+                                m.getJMSCorrelationID());
             Assert.assertEquals("Check Priority properties are correctly transported", 8, m.getJMSPriority());
 
             // Queue
             Assert.assertEquals("Check ReplyTo properties are correctly transported", m.getStringProperty("TempQueue"),
-                m.getJMSReplyTo().toString());
+                                m.getJMSReplyTo().toString());
 
             Assert.assertEquals("Check Type properties are correctly transported", "Test", m.getJMSType());
 
             Assert.assertEquals("Check Short properties are correctly transported", (short) Short.MAX_VALUE,
-                m.getShortProperty("Short"));
+                                m.getShortProperty("Short"));
             Assert.assertEquals("Check UnsignedInt properties are correctly transported", (long) 4294967295L,
-                m.getLongProperty("UnsignedInt"));
+                                m.getLongProperty("UnsignedInt"));
             Assert.assertEquals("Check Long properties are correctly transported", (long) Long.MAX_VALUE,
-                m.getLongProperty("Long"));
+                                m.getLongProperty("Long"));
             Assert.assertEquals("Check String properties are correctly transported", "Test", m.getStringProperty("String"));
 
             // AMQP Tests Specific values
 
             Assert.assertEquals("Check Timestamp properties are correctly transported", m.getStringProperty("time-str"),
-                ((AMQMessage) m).getTimestampProperty(new AMQShortString("time")).toString());
+                                ((AMQMessage) m).getTimestampProperty(new AMQShortString("time")).toString());
 
             // Decimal
             BigDecimal bd = new BigDecimal(Integer.MAX_VALUE);
 
             Assert.assertEquals("Check decimal properties are correctly transported", bd.setScale(Byte.MAX_VALUE),
-                ((AMQMessage) m).getDecimalProperty(new AMQShortString("decimal")));
+                                ((AMQMessage) m).getDecimalProperty(new AMQShortString("decimal")));
 
             // Void
             ((AMQMessage) m).setVoidProperty(new AMQShortString("void"));
 
             Assert.assertTrue("Check void properties are correctly transported",
-                ((AMQMessage) m).getPropertyHeaders().containsKey("void"));
+                              ((AMQMessage) m).getPropertyHeaders().containsKey("void"));
+
+            //JMSXUserID
+            Assert.assertEquals("Check 'JMSXUserID' is supported ", USERNAME,
+                                m.getStringProperty("JMSXUserID"));
         }
 
         received.clear();