You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ri...@apache.org on 2007/05/04 18:13:00 UTC

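svn commit: r535309 - in /incubator/qpid/branches/M2/java: client/src/main/java/org/apache/qpid/client/ client/src/main/java/org/apache/qpid/client/message/ client/src/test/java/org/apache/qpid/test/unit/basic/ client/src/test/java/org/apache/qpid/test/unit/client/connection/ common/src/main/java/org/apache/qpid/framing/ perftests/src/main/java/org/apache/qpid/requestreply/ perftests/src/test/java/org/apache/qpid/ping/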

Author: ritchiem
Date: Fri May  4 09:12:59 2007
New Revision: 535309

URL: http://svn.apache.org/viewvc?view=rev&rev=535309
Log:
Changed FieldTable, along with the corresponding PropertyValueTest, to restrict the Java client to AMQP 0-8 compliant values only.

Modified:
    incubator/qpid/branches/M2/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
    incubator/qpid/branches/M2/java/client/src/main/java/org/apache/qpid/client/message/AMQMessage.java
    incubator/qpid/branches/M2/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java
    incubator/qpid/branches/M2/java/client/src/main/java/org/apache/qpid/client/message/JMSHeaderAdapter.java
    incubator/qpid/branches/M2/java/client/src/test/java/org/apache/qpid/test/unit/basic/PropertyValueTest.java
    incubator/qpid/branches/M2/java/client/src/test/java/org/apache/qpid/test/unit/client/connection/ConnectionTest.java
    incubator/qpid/branches/M2/java/common/src/main/java/org/apache/qpid/framing/AMQType.java
    incubator/qpid/branches/M2/java/common/src/main/java/org/apache/qpid/framing/FieldTable.java
    incubator/qpid/branches/M2/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java
    incubator/qpid/branches/M2/java/perftests/src/test/java/org/apache/qpid/ping/PingLatencyTestPerf.java

Modified: incubator/qpid/branches/M2/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2/java/client/src/main/java/org/apache/qpid/client/AMQSession.java?view=diff&rev=535309&r1=535308&r2=535309
==============================================================================
--- incubator/qpid/branches/M2/java/client/src/main/java/org/apache/qpid/client/AMQSession.java (original)
+++ incubator/qpid/branches/M2/java/client/src/main/java/org/apache/qpid/client/AMQSession.java Fri May  4 09:12:59 2007
@@ -443,7 +443,7 @@
 
         _strictAMQP = Boolean.parseBoolean(System.getProperties().getProperty(STRICT_AMQP, STRICT_AMQP_DEFAULT));
         _strictAMQPFATAL = Boolean.parseBoolean(System.getProperties().getProperty(STRICT_AMQP_FATAL, STRICT_AMQP_FATAL_DEFAULT));
-        _immediatePrefetch = Boolean.parseBoolean(System.getProperties().getProperty(IMMEDIATE_PREFETCH, IMMEDIATE_PREFETCH_DEFAULT));
+        _immediatePrefetch = _strictAMQP || Boolean.parseBoolean(System.getProperties().getProperty(IMMEDIATE_PREFETCH, IMMEDIATE_PREFETCH_DEFAULT));
 
         _connection = con;
         _transacted = transacted;
@@ -491,7 +491,6 @@
     }
 
 
-
     AMQSession(AMQConnection con, int channelId, boolean transacted, int acknowledgeMode, int defaultPrefetchHigh, int defaultPrefetchLow)
     {
         this(con, channelId, transacted, acknowledgeMode, MessageFactoryRegistry.newDefaultRegistry(), defaultPrefetchHigh, defaultPrefetchLow);
@@ -785,7 +784,7 @@
                 amqe = new AMQException("Closing session forcibly", e);
             }
             _connection.deregisterSession(_channelId);
-            closeProducersAndConsumers(amqe);            
+            closeProducersAndConsumers(amqe);
         }
     }
 
@@ -2021,7 +2020,7 @@
 
     synchronized void startDistpatcherIfNecessary()
     {
-        // If IMMEDIATE_PREFETCH is not set then we need to start fetching          
+        // If IMMEDIATE_PREFETCH is not set then we need to start fetching
         if (!_immediatePrefetch)
         {
             // We do this now if this is the first call on a started connection

Modified: incubator/qpid/branches/M2/java/client/src/main/java/org/apache/qpid/client/message/AMQMessage.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2/java/client/src/main/java/org/apache/qpid/client/message/AMQMessage.java?view=diff&rev=535309&r1=535308&r2=535309
==============================================================================
--- incubator/qpid/branches/M2/java/client/src/main/java/org/apache/qpid/client/message/AMQMessage.java (original)
+++ incubator/qpid/branches/M2/java/client/src/main/java/org/apache/qpid/client/message/AMQMessage.java Fri May  4 09:12:59 2007
@@ -24,14 +24,17 @@
 
 import org.apache.qpid.client.AMQSession;
 import org.apache.qpid.framing.ContentHeaderProperties;
+import org.apache.qpid.framing.BasicContentHeaderProperties;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.FieldTable;
+
+import java.math.BigDecimal;
 
 public class AMQMessage
 {
     protected ContentHeaderProperties _contentHeaderProperties;
 
-    /**
-     * If the acknowledge mode is CLIENT_ACKNOWLEDGE the session is required
-     */
+    /** If the acknowledge mode is CLIENT_ACKNOWLEDGE the session is required */
     protected AMQSession _session;
 
     protected final long _deliveryTag;
@@ -48,8 +51,9 @@
     }
 
     /**
-     * The session is set when CLIENT_ACKNOWLEDGE mode is used so that the CHANNEL ACK can be sent when the user
-     * calls acknowledge()
+     * The session is set when CLIENT_ACKNOWLEDGE mode is used so that the CHANNEL ACK can be sent when the user calls
+     * acknowledge()
+     *
      * @param s the AMQ session that delivered this message
      */
     public void setAMQSession(AMQSession s)
@@ -64,6 +68,7 @@
 
     /**
      * Get the AMQ message number assigned to this message
+     *
      * @return the message number
      */
     public long getDeliveryTag()
@@ -71,11 +76,60 @@
         return _deliveryTag;
     }
 
-    /**
-     * Invoked prior to sending the message. Allows the message to be modified if necessary before
-     * sending.
-     */
+    /** Invoked prior to sending the message. Allows the message to be modified if necessary before sending. */
     public void prepareForSending() throws JMSException
     {
+    }
+
+    public FieldTable getPropertyHeaders()
+    {
+        return ((BasicContentHeaderProperties) _contentHeaderProperties).getHeaders();
+    }
+
+    public void setDecimalProperty(AMQShortString propertyName, BigDecimal bd) throws JMSException
+    {
+        getPropertyHeaders().setDecimal(propertyName, bd);
+    }
+
+    public void setIntProperty(AMQShortString propertyName, int i) throws JMSException
+    {
+        getPropertyHeaders().setInteger(propertyName, new Integer(i));
+    }
+
+    public void setLongStringProperty(AMQShortString propertyName, String value)
+    {
+        getPropertyHeaders().setString(propertyName, value);
+    }
+
+    public void setTimestampProperty(AMQShortString propertyName, long value)
+    {
+        getPropertyHeaders().setTimestamp(propertyName, value);
+    }
+
+    public void setVoidProperty(AMQShortString propertyName)
+    {
+        getPropertyHeaders().setVoid(propertyName);
+    }
+
+    //** Getters
+
+    public BigDecimal getDecimalProperty(AMQShortString propertyName) throws JMSException
+    {
+        return getPropertyHeaders().getDecimal(propertyName);
+    }
+
+    public int getIntegerProperty(AMQShortString propertyName) throws JMSException
+    {
+        return getPropertyHeaders().getInteger(propertyName);
+    }
+
+    public String getLongStringProperty(AMQShortString propertyName)
+    {
+        return getPropertyHeaders().getString(propertyName);
+    }
+
+    public Long getTimestampProperty(AMQShortString propertyName)
+    {
+        return getPropertyHeaders().getTimestamp(propertyName);
     }
 }

Modified: incubator/qpid/branches/M2/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java?view=diff&rev=535309&r1=535308&r2=535309
==============================================================================
--- incubator/qpid/branches/M2/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java (original)
+++ incubator/qpid/branches/M2/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java Fri May  4 09:12:59 2007
@@ -54,6 +54,7 @@
     private Destination _destination;
     private JMSHeaderAdapter _headerAdapter;
     private BasicMessageConsumer _consumer;
+    private boolean _strictAMQP;
 
     protected AbstractJMSMessage(ByteBuffer data)
     {
@@ -68,6 +69,8 @@
         _readableMessage = (data != null);
         _changedData = (data == null);
         _headerAdapter = new JMSHeaderAdapter(((BasicContentHeaderProperties) _contentHeaderProperties).getHeaders());
+
+        _strictAMQP = Boolean.parseBoolean(System.getProperties().getProperty(AMQSession.STRICT_AMQP, AMQSession.STRICT_AMQP_DEFAULT));
     }
 
     protected AbstractJMSMessage(long deliveryTag, BasicContentHeaderProperties contentHeader, AMQShortString exchange,
@@ -289,85 +292,116 @@
 
     public boolean propertyExists(AMQShortString propertyName) throws JMSException
     {
-        checkPropertyName(propertyName);
-
         return getJmsHeaders().propertyExists(propertyName);
     }
 
     public boolean propertyExists(String propertyName) throws JMSException
     {
-        checkPropertyName(propertyName);
-
         return getJmsHeaders().propertyExists(propertyName);
     }
 
     public boolean getBooleanProperty(AMQShortString propertyName) throws JMSException
     {
-        checkPropertyName(propertyName);
+        if (_strictAMQP)
+        {
+            throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP");
+        }
 
         return getJmsHeaders().getBoolean(propertyName);
     }
 
     public boolean getBooleanProperty(String propertyName) throws JMSException
     {
-        checkPropertyName(propertyName);
+        if (_strictAMQP)
+        {
+            throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP");
+        }
 
         return getJmsHeaders().getBoolean(propertyName);
     }
 
     public byte getByteProperty(String propertyName) throws JMSException
     {
-        checkPropertyName(propertyName);
+        if (_strictAMQP)
+        {
+            throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP");
+        }
 
         return getJmsHeaders().getByte(propertyName);
     }
 
+    public byte[] getBytesProperty(AMQShortString propertyName) throws JMSException
+    {
+        if (_strictAMQP)
+        {
+            throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP");
+        }
+
+        return getJmsHeaders().getBytes(propertyName);
+    }
+
     public short getShortProperty(String propertyName) throws JMSException
     {
-        checkPropertyName(propertyName);
+        if (_strictAMQP)
+        {
+            throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP");
+        }
 
         return getJmsHeaders().getShort(propertyName);
     }
 
     public int getIntProperty(String propertyName) throws JMSException
     {
-        checkPropertyName(propertyName);
+        if (_strictAMQP)
+        {
+            throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP");
+        }
 
         return getJmsHeaders().getInteger(propertyName);
     }
 
     public long getLongProperty(String propertyName) throws JMSException
     {
-        checkPropertyName(propertyName);
+        if (_strictAMQP)
+        {
+            throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP");
+        }
 
         return getJmsHeaders().getLong(propertyName);
     }
 
     public float getFloatProperty(String propertyName) throws JMSException
     {
-        checkPropertyName(propertyName);
+        if (_strictAMQP)
+        {
+            throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP");
+        }
 
         return getJmsHeaders().getFloat(propertyName);
     }
 
     public double getDoubleProperty(String propertyName) throws JMSException
     {
-        checkPropertyName(propertyName);
+        if (_strictAMQP)
+        {
+            throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP");
+        }
 
         return getJmsHeaders().getDouble(propertyName);
     }
 
     public String getStringProperty(String propertyName) throws JMSException
     {
-        checkPropertyName(propertyName);
+        if (_strictAMQP)
+        {
+            throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP");
+        }
 
         return getJmsHeaders().getString(propertyName);
     }
 
     public Object getObjectProperty(String propertyName) throws JMSException
     {
-        checkPropertyName(propertyName);
-
         return getJmsHeaders().getObject(propertyName);
     }
 
@@ -378,83 +412,124 @@
 
     public void setBooleanProperty(AMQShortString propertyName, boolean b) throws JMSException
     {
+        if (_strictAMQP)
+        {
+            throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP");
+        }
+
         checkWritableProperties();
-        checkPropertyName(propertyName);
         getJmsHeaders().setBoolean(propertyName, b);
     }
 
     public void setBooleanProperty(String propertyName, boolean b) throws JMSException
     {
+        if (_strictAMQP)
+        {
+            throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP");
+        }
+
         checkWritableProperties();
-        checkPropertyName(propertyName);
         getJmsHeaders().setBoolean(propertyName, b);
     }
 
     public void setByteProperty(String propertyName, byte b) throws JMSException
     {
+        if (_strictAMQP)
+        {
+            throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP");
+        }
+
         checkWritableProperties();
-        checkPropertyName(propertyName);
         getJmsHeaders().setByte(propertyName, new Byte(b));
     }
 
+    public void setBytesProperty(AMQShortString propertyName, byte[] bytes) throws JMSException
+    {
+        if (_strictAMQP)
+        {
+            throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP");
+        }
+
+        checkWritableProperties();
+        getJmsHeaders().setBytes(propertyName, bytes);
+    }
+
     public void setShortProperty(String propertyName, short i) throws JMSException
     {
+        if (_strictAMQP)
+        {
+            throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP");
+        }
+
         checkWritableProperties();
-        checkPropertyName(propertyName);
         getJmsHeaders().setShort(propertyName, new Short(i));
     }
 
     public void setIntProperty(String propertyName, int i) throws JMSException
     {
+        if (_strictAMQP)
+        {
+            throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP");
+        }
+
         checkWritableProperties();
-        checkPropertyName(propertyName);
-        getJmsHeaders().setInteger(propertyName, new Integer(i));
+        JMSHeaderAdapter.checkPropertyName(propertyName);
+        super.setIntProperty(new AMQShortString(propertyName), new Integer(i));
     }
 
     public void setLongProperty(String propertyName, long l) throws JMSException
     {
+        if (_strictAMQP)
+        {
+            throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP");
+        }
+
         checkWritableProperties();
-        checkPropertyName(propertyName);
         getJmsHeaders().setLong(propertyName, new Long(l));
     }
 
     public void setFloatProperty(String propertyName, float f) throws JMSException
     {
+        if (_strictAMQP)
+        {
+            throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP");
+        }
+
         checkWritableProperties();
-        checkPropertyName(propertyName);
         getJmsHeaders().setFloat(propertyName, new Float(f));
     }
 
     public void setDoubleProperty(String propertyName, double v) throws JMSException
     {
+        if (_strictAMQP)
+        {
+            throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP");
+        }
+
         checkWritableProperties();
-        checkPropertyName(propertyName);
         getJmsHeaders().setDouble(propertyName, new Double(v));
     }
 
     public void setStringProperty(String propertyName, String value) throws JMSException
     {
         checkWritableProperties();
-        checkPropertyName(propertyName);
-        getJmsHeaders().setString(propertyName, value);
+        JMSHeaderAdapter.checkPropertyName(propertyName);
+        super.setLongStringProperty(new AMQShortString(propertyName), value);
     }
 
     public void setObjectProperty(String propertyName, Object object) throws JMSException
     {
         checkWritableProperties();
-        checkPropertyName(propertyName);
         getJmsHeaders().setObject(propertyName, object);
     }
 
     protected void removeProperty(AMQShortString propertyName) throws JMSException
     {
-        checkPropertyName(propertyName);
         getJmsHeaders().remove(propertyName);
     }
 
     protected void removeProperty(String propertyName) throws JMSException
     {
-        checkPropertyName(propertyName);
         getJmsHeaders().remove(propertyName);
     }
 
@@ -544,17 +619,6 @@
         getContentHeaderProperties().setHeaders(messageProperties);
     }
 
-    private void checkPropertyName(CharSequence propertyName)
-    {
-        if (propertyName == null)
-        {
-            throw new IllegalArgumentException("Property name must not be null");
-        }
-        else if (propertyName.length() == 0)
-        {
-            throw new IllegalArgumentException("Property name must not be the empty string");
-        }
-    }
 
     public JMSHeaderAdapter getJmsHeaders()
     {
@@ -625,11 +689,4 @@
         _consumer = basicMessageConsumer;
     }
 
-    public byte[] getBytesProperty(AMQShortString propertyName) throws JMSException
-    {
-        checkPropertyName(propertyName);
-
-        return getJmsHeaders().getBytes(propertyName);
-
-    }
 }

Modified: incubator/qpid/branches/M2/java/client/src/main/java/org/apache/qpid/client/message/JMSHeaderAdapter.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2/java/client/src/main/java/org/apache/qpid/client/message/JMSHeaderAdapter.java?view=diff&rev=535309&r1=535308&r2=535309
==============================================================================
--- incubator/qpid/branches/M2/java/client/src/main/java/org/apache/qpid/client/message/JMSHeaderAdapter.java (original)
+++ incubator/qpid/branches/M2/java/client/src/main/java/org/apache/qpid/client/message/JMSHeaderAdapter.java Fri May  4 09:12:59 2007
@@ -48,6 +48,7 @@
 
     public boolean getBoolean(String string) throws JMSException
     {
+        checkPropertyName(string);
         Boolean b = getHeaders().getBoolean(string);
 
         if (b == null)
@@ -76,6 +77,7 @@
 
     public boolean getBoolean(AMQShortString string) throws JMSException
     {
+        checkPropertyName(string);
         Boolean b = getHeaders().getBoolean(string);
 
         if (b == null)
@@ -104,6 +106,7 @@
 
     public char getCharacter(String string) throws JMSException
     {
+        checkPropertyName(string);
         Character c = getHeaders().getCharacter(string);
 
         if (c == null)
@@ -130,6 +133,8 @@
 
     public byte[] getBytes(AMQShortString string) throws JMSException
     {
+        checkPropertyName(string);
+
         byte[] bs = getHeaders().getBytes(string);
 
         if (bs == null)
@@ -144,6 +149,7 @@
 
     public byte getByte(String string) throws JMSException
     {
+        checkPropertyName(string);
         Byte b = getHeaders().getByte(string);
         if (b == null)
         {
@@ -171,6 +177,7 @@
 
     public short getShort(String string) throws JMSException
     {
+        checkPropertyName(string);
         Short s = getHeaders().getShort(string);
 
         if (s == null)
@@ -183,6 +190,7 @@
 
     public int getInteger(String string) throws JMSException
     {
+        checkPropertyName(string);
         Integer i = getHeaders().getInteger(string);
 
         if (i == null)
@@ -195,6 +203,7 @@
 
     public long getLong(String string) throws JMSException
     {
+        checkPropertyName(string);
         Long l = getHeaders().getLong(string);
 
         if (l == null)
@@ -207,6 +216,7 @@
 
     public float getFloat(String string) throws JMSException
     {
+        checkPropertyName(string);
         Float f = getHeaders().getFloat(string);
 
         if (f == null)
@@ -236,6 +246,7 @@
 
     public double getDouble(String string) throws JMSException
     {
+        checkPropertyName(string);
         Double d = getHeaders().getDouble(string);
 
         if (d == null)
@@ -248,6 +259,7 @@
 
     public String getString(String string) throws JMSException
     {
+        checkPropertyName(string);
         String s = getHeaders().getString(string);
 
         if (s == null)
@@ -278,6 +290,7 @@
 
     public Object getObject(String string) throws JMSException
     {
+        checkPropertyName(string);
         return getHeaders().getObject(string);
     }
 
@@ -301,16 +314,19 @@
 
     public Object setBytes(AMQShortString string, byte[] bytes)
     {
+        checkPropertyName(string);
         return getHeaders().setBytes(string, bytes);
     }
 
     public Object setBytes(String string, byte[] bytes)
     {
+        checkPropertyName(string);
         return getHeaders().setBytes(string, bytes);
     }
 
     public Object setBytes(String string, byte[] bytes, int start, int length)
     {
+        checkPropertyName(string);
         return getHeaders().setBytes(string, bytes, start, length);
     }
 
@@ -392,6 +408,7 @@
 
     public boolean itemExists(String string) throws JMSException
     {
+        checkPropertyName(string);
         return getHeaders().containsKey(string);
     }
 
@@ -407,26 +424,31 @@
 
     public boolean propertyExists(AMQShortString propertyName)
     {
+        checkPropertyName(propertyName);
         return getHeaders().propertyExists(propertyName);
     }
 
     public boolean propertyExists(String propertyName)
     {
+        checkPropertyName(propertyName);
         return getHeaders().propertyExists(propertyName);
     }
 
     public Object put(Object key, Object value)
     {
+        checkPropertyName(key.toString());
         return getHeaders().setObject(key.toString(), value);
     }
 
     public Object remove(AMQShortString propertyName)
     {
+        checkPropertyName(propertyName);
         return getHeaders().remove(propertyName);
     }
 
     public Object remove(String propertyName)
     {
+        checkPropertyName(propertyName);
         return getHeaders().remove(propertyName);
     }
 

Modified: incubator/qpid/branches/M2/java/client/src/test/java/org/apache/qpid/test/unit/basic/PropertyValueTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2/java/client/src/test/java/org/apache/qpid/test/unit/basic/PropertyValueTest.java?view=diff&rev=535309&r1=535308&r2=535309
==============================================================================
--- incubator/qpid/branches/M2/java/client/src/test/java/org/apache/qpid/test/unit/basic/PropertyValueTest.java (original)
+++ incubator/qpid/branches/M2/java/client/src/test/java/org/apache/qpid/test/unit/basic/PropertyValueTest.java Fri May  4 09:12:59 2007
@@ -23,6 +23,7 @@
 import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
+import java.math.BigDecimal;
 
 import javax.jms.Destination;
 import javax.jms.JMSException;
@@ -41,6 +42,8 @@
 import org.apache.qpid.client.AMQSession;
 import org.apache.qpid.client.transport.TransportConnection;
 import org.apache.qpid.client.message.JMSTextMessage;
+import org.apache.qpid.client.message.AMQMessage;
+import org.apache.qpid.framing.AMQShortString;
 
 public class PropertyValueTest extends TestCase implements MessageListener
 {
@@ -53,7 +56,7 @@
     private AMQSession _session;
     private final List<JMSTextMessage> received = new ArrayList<JMSTextMessage>();
     private final List<String> messages = new ArrayList<String>();
-    private int _count = 100;
+    private int _count = 1;
     public String _connectionString = "vm://:1";
 
     protected void setUp() throws Exception
@@ -118,7 +121,7 @@
                 check();
                 _logger.info("Completed without failure");
 
-	        Thread.sleep(10);
+                Thread.sleep(10);
                 _connection.close();
 
                 _logger.error("End Run Number:" + (run - 1));
@@ -180,6 +183,20 @@
             m.setShortProperty("Short", (short) Short.MAX_VALUE);
             m.setStringProperty("String", "Test");
 
+            //AMQP Specific values
+
+            // Timestamp
+            long nano = System.nanoTime();
+            m.setStringProperty("time-str", String.valueOf(nano));
+            ((AMQMessage) m).setTimestampProperty(new AMQShortString("time"), nano);
+
+            //Decimal
+            BigDecimal bd = new BigDecimal(Integer.MAX_VALUE);
+            ((AMQMessage) m).setDecimalProperty(new AMQShortString("decimal"), bd.setScale(Byte.MAX_VALUE));
+
+            //Void
+            ((AMQMessage) m).setVoidProperty(new AMQShortString("void"));
+
             _logger.debug("Sending Msg:" + m);
             producer.send(m);
         }
@@ -235,6 +252,25 @@
                                 (long) Long.MAX_VALUE, m.getLongProperty("Long"));
             Assert.assertEquals("Check String properties are correctly transported",
                                 "Test", m.getStringProperty("String"));
+
+            // AMQP Tests Specific values
+           
+            Assert.assertEquals("Check Timestamp properties are correctly transported",
+                                m.getStringProperty("time-str"),
+                                ((AMQMessage) m).getTimestampProperty(new AMQShortString("time")).toString());
+
+            //Decimal
+            BigDecimal bd = new BigDecimal(Integer.MAX_VALUE);
+
+            Assert.assertEquals("Check decimal properties are correctly transported",
+                                bd.setScale(Byte.MAX_VALUE),
+                                ((AMQMessage) m).getDecimalProperty(new AMQShortString("decimal")));
+
+            //Void
+            ((AMQMessage) m).setVoidProperty(new AMQShortString("void"));
+
+            Assert.assertTrue("Check void properties are correctly transported",
+                              ((AMQMessage) m).getPropertyHeaders().containsKey("void"));
         }
         received.clear();
 

Modified: incubator/qpid/branches/M2/java/client/src/test/java/org/apache/qpid/test/unit/client/connection/ConnectionTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2/java/client/src/test/java/org/apache/qpid/test/unit/client/connection/ConnectionTest.java?view=diff&rev=535309&r1=535308&r2=535309
==============================================================================
--- incubator/qpid/branches/M2/java/client/src/test/java/org/apache/qpid/test/unit/client/connection/ConnectionTest.java (original)
+++ incubator/qpid/branches/M2/java/client/src/test/java/org/apache/qpid/test/unit/client/connection/ConnectionTest.java Fri May  4 09:12:59 2007
@@ -7,9 +7,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -47,12 +47,12 @@
     protected void setUp() throws Exception
     {
         super.setUp();
-        TransportConnection.createVMBroker(1);
+//        TransportConnection.createVMBroker(1);
     }
 
     protected void tearDown() throws Exception
     {
-        TransportConnection.killVMBroker(1);
+//        TransportConnection.killVMBroker(1);
     }
 
     public void testSimpleConnection()

Modified: incubator/qpid/branches/M2/java/common/src/main/java/org/apache/qpid/framing/AMQType.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2/java/common/src/main/java/org/apache/qpid/framing/AMQType.java?view=diff&rev=535309&r1=535308&r2=535309
==============================================================================
--- incubator/qpid/branches/M2/java/common/src/main/java/org/apache/qpid/framing/AMQType.java (original)
+++ incubator/qpid/branches/M2/java/common/src/main/java/org/apache/qpid/framing/AMQType.java Fri May  4 09:12:59 2007
@@ -22,10 +22,11 @@
 
 import org.apache.mina.common.ByteBuffer;
 
+import java.math.BigDecimal;
+import java.math.BigInteger;
+
 public enum AMQType
 {
-
-
     //AMQP FieldTable Wire Types
 
     LONG_STRING('S')
@@ -113,55 +114,75 @@
 
         public int getEncodingSize(Object value)
         {
-            // TODO : fixme
-            throw new UnsupportedOperationException();
+            return EncodingUtils.encodedByteLength()+ EncodingUtils.encodedIntegerLength();
         }
 
         public Object toNativeValue(Object value)
         {
-            // TODO : fixme
-            throw new UnsupportedOperationException();
+            if(value instanceof BigDecimal)
+            {
+                return (BigDecimal) value;
+            }
+            else
+            {
+                throw new NumberFormatException("Cannot convert: " + value + "(" +
+                                                value.getClass().getName() + ") to BigDecimal.");
+            }
         }
 
         public void writeValueImpl(Object value, ByteBuffer buffer)
         {
-            // TODO : fixme
-            throw new UnsupportedOperationException();
+            BigDecimal bd = (BigDecimal) value;
+
+            byte places = new Integer(bd.scale()).byteValue();
+
+            int unscaled = bd.intValue();
+
+            EncodingUtils.writeByte(buffer, places);
+
+            EncodingUtils.writeInteger(buffer, unscaled);
         }
 
         public Object readValueFromBuffer(ByteBuffer buffer)
         {
-            // TODO : fixme
-            throw new UnsupportedOperationException();
+            byte places = EncodingUtils.readByte(buffer);
+
+            int unscaled = EncodingUtils.readInteger(buffer);
+
+            BigDecimal bd = new BigDecimal(unscaled);
+            return bd.setScale(places);            
         }
     },
 
     TIMESTAMP('T')
     {
-
         public int getEncodingSize(Object value)
         {
-            // TODO : fixme
-            throw new UnsupportedOperationException();
+            return EncodingUtils.encodedLongLength();
         }
 
-
         public Object toNativeValue(Object value)
         {
-            // TODO : fixme
-            throw new UnsupportedOperationException();
+            if(value instanceof Long)
+            {
+                return (Long) value;
+            }
+            else
+            {
+                throw new NumberFormatException("Cannot convert: " + value + "(" +
+                                                value.getClass().getName() + ") to timestamp.");
+            }
         }
 
         public void writeValueImpl(Object value, ByteBuffer buffer)
         {
-            // TODO : fixme
-            throw new UnsupportedOperationException();
+            EncodingUtils.writeLong(buffer, (Long) value);
         }
 
+
         public Object readValueFromBuffer(ByteBuffer buffer)
         {
-            // TODO : fixme
-            throw new UnsupportedOperationException();
+            return EncodingUtils.readLong(buffer);
         }
     },
 
@@ -173,7 +194,6 @@
             throw new UnsupportedOperationException();
         }
 
-
         public Object toNativeValue(Object value)
         {
             // TODO : fixme
@@ -250,7 +270,7 @@
 
         public void writeValueImpl(Object value, ByteBuffer buffer)
         {
-            EncodingUtils.writeLongstr(buffer, (byte[]) value);            
+            EncodingUtils.writeLongstr(buffer, (byte[]) value);
         }
 
         public Object readValueFromBuffer(ByteBuffer buffer)

Modified: incubator/qpid/branches/M2/java/common/src/main/java/org/apache/qpid/framing/FieldTable.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2/java/common/src/main/java/org/apache/qpid/framing/FieldTable.java?view=diff&rev=535309&r1=535308&r2=535309
==============================================================================
--- incubator/qpid/branches/M2/java/common/src/main/java/org/apache/qpid/framing/FieldTable.java (original)
+++ incubator/qpid/branches/M2/java/common/src/main/java/org/apache/qpid/framing/FieldTable.java Fri May  4 09:12:59 2007
@@ -27,6 +27,7 @@
 import java.util.LinkedHashSet;
 import java.util.Map;
 import java.util.Set;
+import java.math.BigDecimal;
 
 import org.apache.log4j.Logger;
 import org.apache.mina.common.ByteBuffer;
@@ -56,26 +57,28 @@
      *
      * @param buffer the buffer from which to read data. The length byte must be read already
      * @param length the length of the field table. Must be > 0.
+     *
      * @throws AMQFrameDecodingException if there is an error decoding the table
      */
     public FieldTable(ByteBuffer buffer, long length) throws AMQFrameDecodingException
     {
         this();
         _encodedForm = buffer.slice();
-        _encodedForm.limit((int)length);
+        _encodedForm.limit((int) length);
         _encodedSize = length;
-        buffer.skip((int)length);
+        buffer.skip((int) length);
     }
 
 
-
     private AMQTypedValue getProperty(AMQShortString string)
     {
-        synchronized(this)
+        checkPropertyName(string);
+
+        synchronized (this)
         {
-            if(_properties == null)
+            if (_properties == null)
             {
-                if(_encodedForm == null)
+                if (_encodedForm == null)
                 {
                     return null;
                 }
@@ -86,7 +89,7 @@
             }
         }
 
-        if(_properties == null)
+        if (_properties == null)
         {
             return null;
         }
@@ -112,17 +115,18 @@
 
     private AMQTypedValue setProperty(AMQShortString key, AMQTypedValue val)
     {
+        checkPropertyName(key);
         initMapIfNecessary();
-        if(_properties.containsKey(key))
+        if (_properties.containsKey(key))
         {
             _encodedForm = null;
 
-            if(val == null)
+            if (val == null)
             {
                 return removeKey(key);
             }
         }
-        else if(_encodedForm != null && val != null)
+        else if (_encodedForm != null && val != null)
         {
             EncodingUtils.writeShortStringBytes(_encodedForm, key);
             val.writeToBuffer(_encodedForm);
@@ -134,9 +138,8 @@
         }
 
 
-
-        AMQTypedValue oldVal = _properties.put(key,val);
-        if(oldVal != null)
+        AMQTypedValue oldVal = _properties.put(key, val);
+        if (oldVal != null)
         {
             _encodedSize -= oldVal.getEncodingSize();
         }
@@ -151,13 +154,13 @@
 
     private void initMapIfNecessary()
     {
-        synchronized(this)
+        synchronized (this)
         {
-            if(_properties == null)
+            if (_properties == null)
             {
-                if(_encodedForm == null  || _encodedSize == 0)
+                if (_encodedForm == null || _encodedSize == 0)
                 {
-                    _properties = new LinkedHashMap<AMQShortString,AMQTypedValue>();
+                    _properties = new LinkedHashMap<AMQShortString, AMQTypedValue>();
                 }
                 else
                 {
@@ -365,7 +368,7 @@
     public Object getObject(AMQShortString string)
     {
         AMQTypedValue value = getProperty(string);
-        if(value != null)
+        if (value != null)
         {
             return value.getValue();
         }
@@ -376,6 +379,33 @@
 
     }
 
+    public Long getTimestamp(AMQShortString name)
+    {
+        AMQTypedValue value = getProperty(name);
+        if ((value != null) && ((value.getType() == AMQType.TIMESTAMP)))
+        {
+            return (Long) value.getValue();
+        }
+        else
+        {
+            return null;
+        }
+    }
+
+    public BigDecimal getDecimal(AMQShortString propertyName)
+    {
+        AMQTypedValue value = getProperty(propertyName);
+        if ((value != null) && ((value.getType() == AMQType.DECIMAL)))
+        {
+            return (BigDecimal) value.getValue();
+        }
+        else
+        {
+            return null;
+        }
+    }
+
+
     // ************  Setters
     public Object setBoolean(String string, boolean b)
     {
@@ -384,18 +414,16 @@
 
     public Object setBoolean(AMQShortString string, boolean b)
     {
-        checkPropertyName(string);
         return setProperty(string, AMQType.BOOLEAN.asTypedValue(b));
     }
 
     public Object setByte(String string, byte b)
     {
         return setByte(new AMQShortString(string), b);
-    }    
+    }
 
     public Object setByte(AMQShortString string, byte b)
     {
-        checkPropertyName(string);
         return setProperty(string, AMQType.BYTE.asTypedValue(b));
     }
 
@@ -406,7 +434,6 @@
 
     public Object setShort(AMQShortString string, short i)
     {
-        checkPropertyName(string);
         return setProperty(string, AMQType.SHORT.asTypedValue(i));
     }
 
@@ -418,7 +445,6 @@
 
     public Object setInteger(AMQShortString string, int i)
     {
-        checkPropertyName(string);
         return setProperty(string, AMQType.INT.asTypedValue(i));
     }
 
@@ -430,11 +456,9 @@
 
     public Object setLong(AMQShortString string, long l)
     {
-        checkPropertyName(string);
         return setProperty(string, AMQType.LONG.asTypedValue(l));
     }
 
-
     public Object setFloat(String string, float f)
     {
         return setFloat(new AMQShortString(string), f);
@@ -442,7 +466,6 @@
 
     public Object setFloat(AMQShortString string, float v)
     {
-        checkPropertyName(string);
         return setProperty(string, AMQType.FLOAT.asTypedValue(v));
     }
 
@@ -451,14 +474,11 @@
         return setDouble(new AMQShortString(string), d);
     }
 
-
     public Object setDouble(AMQShortString string, double v)
     {
-        checkPropertyName(string);
         return setProperty(string, AMQType.DOUBLE.asTypedValue(v));
     }
 
-
     public Object setString(String string, String s)
     {
         return setString(new AMQShortString(string), s);
@@ -466,7 +486,6 @@
 
     public Object setAsciiString(AMQShortString string, String value)
     {
-        checkPropertyName(string);
         if (value == null)
         {
             return setProperty(string, AMQType.VOID.asTypedValue(null));
@@ -479,7 +498,6 @@
 
     public Object setString(AMQShortString string, String value)
     {
-        checkPropertyName(string);
         if (value == null)
         {
             return setProperty(string, AMQType.VOID.asTypedValue(null));
@@ -490,20 +508,16 @@
         }
     }
 
-
     public Object setChar(String string, char c)
     {
         return setChar(new AMQShortString(string), c);
     }
 
-
     public Object setChar(AMQShortString string, char c)
     {
-        checkPropertyName(string);
         return setProperty(string, AMQType.ASCII_CHARACTER.asTypedValue(c));
     }
 
-
     public Object setBytes(String string, byte[] b)
     {
         return setBytes(new AMQShortString(string), b);
@@ -511,20 +525,18 @@
 
     public Object setBytes(AMQShortString string, byte[] bytes)
     {
-        checkPropertyName(string);
         return setProperty(string, AMQType.BINARY.asTypedValue(bytes));
     }
 
     public Object setBytes(String string, byte[] bytes, int start, int length)
     {
-        return setBytes(new AMQShortString(string), bytes,start,length);
+        return setBytes(new AMQShortString(string), bytes, start, length);
     }
 
     public Object setBytes(AMQShortString string, byte[] bytes, int start, int length)
     {
-        checkPropertyName(string);
         byte[] newBytes = new byte[length];
-        System.arraycopy(bytes,start,newBytes,0,length);
+        System.arraycopy(bytes, start, newBytes, 0, length);
         return setBytes(string, bytes);
     }
 
@@ -533,6 +545,21 @@
         return setObject(new AMQShortString(string), o);
     }
 
+    public Object setTimestamp(AMQShortString string, long datetime)
+    {
+        return setProperty(string, AMQType.TIMESTAMP.asTypedValue(datetime));
+    }
+
+    public Object setDecimal(AMQShortString string, BigDecimal decimal)
+    {
+        return setProperty(string, AMQType.DECIMAL.asTypedValue(decimal));
+    }
+
+    public Object setVoid(AMQShortString string)
+    {
+        return setProperty(string, AMQType.VOID.asTypedValue(null));
+    }
+
     public Object setObject(AMQShortString string, Object object)
     {
         if (object instanceof Boolean)
@@ -579,7 +606,6 @@
         throw new AMQPInvalidClassException("Only Primatives objects allowed Object is:" + object.getClass());
     }
 
-
     public boolean isNullStringValue(String name)
     {
         AMQTypedValue value = getProperty(new AMQShortString(name));
@@ -603,10 +629,11 @@
         return itemExists(propertyName);
     }
 
-    public boolean itemExists(AMQShortString string)
+    public boolean itemExists(AMQShortString propertyName)
     {
+        checkPropertyName(propertyName);
         initMapIfNecessary();
-        return _properties.containsKey(string);
+        return _properties.containsKey(propertyName);
     }
 
     public boolean itemExists(String string)
@@ -620,15 +647,13 @@
         return _properties.toString();
     }
 
-
-
     private void checkPropertyName(AMQShortString propertyName)
     {
         if (propertyName == null)
         {
             throw new IllegalArgumentException("Property name must not be null");
         }
-        else if (propertyName.length()==0)
+        else if (propertyName.length() == 0)
         {
             throw new IllegalArgumentException("Property name must not be the empty string");
         }
@@ -636,7 +661,6 @@
         checkIdentiferFormat(propertyName);
     }
 
-
     protected static void checkIdentiferFormat(AMQShortString propertyName)
     {
 //        AMQP Spec: 4.2.5.5 Field Tables
@@ -649,7 +673,6 @@
 //             503 (syntax error). Conformance test: amq_wlp_table_01.
 //           * A peer MUST handle duplicate fields by using only the first instance.
 
-
         // AMQP length limit
         if (propertyName.length() > 128)
         {
@@ -666,7 +689,6 @@
         }
     }
 
- 
     // *************************  Byte Buffer Processing
 
     public void writeToBuffer(ByteBuffer buffer)
@@ -707,9 +729,9 @@
     {
 
         int encodedSize = 0;
-        if(_properties != null)
+        if (_properties != null)
         {
-            for(Map.Entry<AMQShortString,AMQTypedValue> e : _properties.entrySet())
+            for (Map.Entry<AMQShortString, AMQTypedValue> e : _properties.entrySet())
             {
                 encodedSize += EncodingUtils.encodedShortStringLength(e.getKey());
                 encodedSize++; // the byte for the encoding Type
@@ -732,18 +754,19 @@
     public static interface FieldTableElementProcessor
     {
         public boolean processElement(String propertyName, AMQTypedValue value);
+
         public Object getResult();
     }
 
     public Object processOverElements(FieldTableElementProcessor processor)
     {
         initMapIfNecessary();
-        if(_properties != null)
+        if (_properties != null)
         {
-            for(Map.Entry<AMQShortString,AMQTypedValue> e : _properties.entrySet())
+            for (Map.Entry<AMQShortString, AMQTypedValue> e : _properties.entrySet())
             {
                 boolean result = processor.processElement(e.getKey().toString(), e.getValue());
-                if(!result)
+                if (!result)
                 {
                     break;
                 }
@@ -764,7 +787,7 @@
 
     public boolean isEmpty()
     {
-        return size() ==0;
+        return size() == 0;
     }
 
     public boolean containsKey(AMQShortString key)
@@ -782,7 +805,7 @@
     {
         initMapIfNecessary();
         Set<String> keys = new LinkedHashSet<String>();
-        for(AMQShortString key : _properties.keySet())
+        for (AMQShortString key : _properties.keySet())
         {
             keys.add(key.toString());
         }
@@ -797,7 +820,6 @@
     }
 
 
-
     public Object put(AMQShortString key, Object value)
     {
         return setObject(key, value);
@@ -824,7 +846,7 @@
         initMapIfNecessary();
         _encodedForm = null;
         AMQTypedValue value = _properties.remove(key);
-        if(value == null)
+        if (value == null)
         {
             return null;
         }
@@ -839,11 +861,10 @@
     }
 
 
-
     public void clear()
     {
         initMapIfNecessary();
-        _encodedForm = null;        
+        _encodedForm = null;
         _properties.clear();
         _encodedSize = 0;
     }
@@ -857,19 +878,19 @@
     private void putDataInBuffer(ByteBuffer buffer)
     {
 
-        if(_encodedForm != null)
+        if (_encodedForm != null)
         {
 
-            if(_encodedForm.position() != 0)
+            if (_encodedForm.position() != 0)
             {
                 _encodedForm.flip();
             }
 //            _encodedForm.limit((int)getEncodedSize());
             buffer.put(_encodedForm);
         }
-        else if(_properties != null)
+        else if (_properties != null)
         {
-            final Iterator<Map.Entry<AMQShortString,AMQTypedValue>> it = _properties.entrySet().iterator();
+            final Iterator<Map.Entry<AMQShortString, AMQTypedValue>> it = _properties.entrySet().iterator();
 
             //If there are values then write out the encoded Size... could check _encodedSize != 0
             // write out the total length, which we have kept up to date as data is added
@@ -877,7 +898,7 @@
 
             while (it.hasNext())
             {
-                final Map.Entry<AMQShortString,AMQTypedValue> me = it.next();
+                final Map.Entry<AMQShortString, AMQTypedValue> me = it.next();
                 try
                 {
                     if (_logger.isTraceEnabled())
@@ -889,8 +910,6 @@
                                       " Remaining:" + buffer.remaining());
                     }
 
-
-
                     //Write the actual parameter name
                     EncodingUtils.writeShortStringBytes(buffer, me.getKey());
                     me.getValue().writeToBuffer(buffer);
@@ -917,12 +936,12 @@
     {
 
         final boolean trace = _logger.isTraceEnabled();
-        if(length > 0)
+        if (length > 0)
         {
 
-            final int expectedRemaining = buffer.remaining()-(int)length;
+            final int expectedRemaining = buffer.remaining() - (int) length;
 
-            _properties = new LinkedHashMap<AMQShortString,AMQTypedValue>(INITIAL_HASHMAP_CAPACITY);
+            _properties = new LinkedHashMap<AMQShortString, AMQTypedValue>(INITIAL_HASHMAP_CAPACITY);
 
             do
             {
@@ -936,11 +955,9 @@
                 }
 
 
-
-                _properties.put(key,value);
+                _properties.put(key, value);
 
 
-            
             }
             while (buffer.remaining() > expectedRemaining);
 
@@ -962,15 +979,15 @@
 
     public boolean equals(Object o)
     {
-        if(o == this)
+        if (o == this)
         {
             return true;
         }
-        if(o == null)
+        if (o == null)
         {
             return false;
         }
-        if(!(o instanceof FieldTable))
+        if (!(o instanceof FieldTable))
         {
             return false;
         }

Modified: incubator/qpid/branches/M2/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java?view=diff&rev=535309&r1=535308&r2=535309
==============================================================================
--- incubator/qpid/branches/M2/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java (original)
+++ incubator/qpid/branches/M2/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java Fri May  4 09:12:59 2007
@@ -37,6 +37,7 @@
 import org.apache.qpid.AMQException;
 import org.apache.qpid.client.*;
 import org.apache.qpid.client.message.TestMessageFactory;
+import org.apache.qpid.client.message.AMQMessage;
 import org.apache.qpid.exchange.ExchangeDefaults;
 import org.apache.qpid.framing.AMQShortString;
 import org.apache.qpid.jms.MessageProducer;
@@ -374,7 +375,7 @@
      * ping producers on the same JVM.
      */
     private static Map<String, PerCorrelationId> perCorrelationIds =
-        Collections.synchronizedMap(new HashMap<String, PerCorrelationId>());
+            Collections.synchronizedMap(new HashMap<String, PerCorrelationId>());
 
     /** A convenient formatter to use when time stamping output. */
     protected static final DateFormat timestampFormatter = new SimpleDateFormat("hh:mm:ss:SS");
@@ -549,13 +550,13 @@
      * Starts a ping-pong loop running from the command line. The bounce back client {@link PingPongBouncer} also needs
      * to be started to bounce the pings back again.
      *
-     * @param  args The command line arguments.
+     * @param args The command line arguments.
      */
     public static void main(String[] args)
     {
         try
         {
-            Properties options = CommandLineParser.processCommandLine(args, new CommandLineParser(new String[][] {}));
+            Properties options = CommandLineParser.processCommandLine(args, new CommandLineParser(new String[][]{}));
 
             // Create a ping producer overriding its defaults with all options passed on the command line.
             PingPongProducer pingProducer = new PingPongProducer(options);
@@ -597,7 +598,8 @@
                 Thread.sleep(sleepTime);
             }
             catch (InterruptedException ie)
-            { }
+            {
+            }
         }
     }
 
@@ -648,11 +650,11 @@
      * @throws JMSException Any JMSExceptions are allowed to fall through.
      */
     public void createPingDestinations(int noOfDestinations, String selector, String rootName, boolean unique,
-        boolean durable) throws JMSException, AMQException
+                                       boolean durable) throws JMSException, AMQException
     {
         log.debug("public void createPingDestinations(int noOfDestinations = " + noOfDestinations + ", String selector = "
-            + selector + ", String rootName = " + rootName + ", boolean unique = " + unique + ", boolean durable = "
-            + durable + "): called");
+                  + selector + ", String rootName = " + rootName + ", boolean unique = " + unique + ", boolean durable = "
+                  + durable + "): called");
 
         _pingDestinations = new ArrayList<Destination>();
 
@@ -688,8 +690,8 @@
                 else
                 {
                     destination =
-                        AMQTopic.createDurableTopic(new AMQTopic(ExchangeDefaults.TOPIC_EXCHANGE_NAME, rootName + id),
-                            _clientID, (AMQConnection) _connection);
+                            AMQTopic.createDurableTopic(new AMQTopic(ExchangeDefaults.TOPIC_EXCHANGE_NAME, rootName + id),
+                                                        _clientID, (AMQConnection) _connection);
                     log.debug("Created durable topic " + destination);
                 }
             }
@@ -698,11 +700,11 @@
             {
                 AMQShortString destinationName = new AMQShortString(rootName + id);
                 destination =
-                    new AMQQueue(ExchangeDefaults.DIRECT_EXCHANGE_NAME, destinationName, destinationName, false, false,
-                        _isDurable);
+                        new AMQQueue(ExchangeDefaults.DIRECT_EXCHANGE_NAME, destinationName, destinationName, false, false,
+                                     _isDurable);
                 ((AMQSession) _producerSession).createQueue(destinationName, false, _isDurable, false);
                 ((AMQSession) _producerSession).bindQueue(destinationName, destinationName, null,
-                    ExchangeDefaults.DIRECT_EXCHANGE_NAME);
+                                                          ExchangeDefaults.DIRECT_EXCHANGE_NAME);
 
                 log.debug("Created queue " + destination);
             }
@@ -715,15 +717,15 @@
     /**
      * Creates consumers for the specified destinations and registers this pinger to listen to their messages.
      *
-     * @param  destinations The destinations to listen to.
-     * @param  selector     A selector to filter the messages with.
+     * @param destinations The destinations to listen to.
+     * @param selector     A selector to filter the messages with.
      *
      * @throws javax.jms.JMSException Any JMSExceptions are allowed to fall through.
      */
     public void createReplyConsumers(Collection<Destination> destinations, String selector) throws JMSException
     {
         log.debug("public void createReplyConsumers(Collection<Destination> destinations = " + destinations
-            + ", String selector = " + selector + "): called");
+                  + ", String selector = " + selector + "): called");
 
         log.debug("Creating " + destinations.size() + " reply consumers.");
 
@@ -731,8 +733,8 @@
         {
             // Create a consumer for the destination and set this pinger to listen to its messages.
             _consumer =
-                _consumerSession.createConsumer(destination, PREFETCH_DEFAULT, NO_LOCAL_DEFAULT, EXCLUSIVE_DEFAULT,
-                    selector);
+                    _consumerSession.createConsumer(destination, PREFETCH_DEFAULT, NO_LOCAL_DEFAULT, EXCLUSIVE_DEFAULT,
+                                                    selector);
             _consumer.setMessageListener(this);
 
             log.debug("Set this to listen to replies sent to destination: " + destination);
@@ -740,8 +742,9 @@
     }
 
     /**
-     * Stores the received message in the replies map, then resets the boolean latch that a thread waiting for a correlating
-     * reply may be waiting on. This is only done if the reply has a correlation id that is expected in the replies map.
+     * Stores the received message in the replies map, then resets the boolean latch that a thread waiting for a
+     * correlating reply may be waiting on. This is only done if the reply has a correlation id that is expected in the
+     * replies map.
      *
      * @param message The received message.
      */
@@ -830,26 +833,26 @@
     }
 
     /**
-     * Sends the specified number of ping message and then waits for all correlating replies. If the wait times out before a
-     * reply arrives, then a null reply is returned from this method. This method allows the caller to specify the
-     * correlation id.
-     *
-     * @param  message              The message to send. If this is null, one is generated.
-     * @param  numPings             The number of ping messages to send.
-     * @param  timeout              The timeout in milliseconds.
-     * @param  messageCorrelationId The message correlation id. If this is null, one is generated.
+     * Sends the specified number of ping messages and then waits for all correlating replies. If the wait times out
+     * before a reply arrives, then a null reply is returned from this method. This method allows the caller to specify
+     * the correlation id.
+     *
+     * @param message              The message to send. If this is null, one is generated.
+     * @param numPings             The number of ping messages to send.
+     * @param timeout              The timeout in milliseconds.
+     * @param messageCorrelationId The message correlation id. If this is null, one is generated.
      *
-     * @return The number of replies received. This may be less than the number sent if the timeout terminated the wait for
-     *         all prematurely.
+     * @return The number of replies received. This may be less than the number sent if the timeout terminated the wait
+     *         for all replies prematurely.
      *
      * @throws JMSException         All underlying JMSExceptions are allowed to fall through.
      * @throws InterruptedException When interrupted by a timeout
      */
     public int pingAndWaitForReply(Message message, int numPings, long timeout, String messageCorrelationId)
-        throws JMSException, InterruptedException
+            throws JMSException, InterruptedException
     {
         log.debug("public int pingAndWaitForReply(Message message, int numPings = " + numPings + ", long timeout = "
-            + timeout + ", String messageCorrelationId = " + messageCorrelationId + "): called");
+                  + timeout + ", String messageCorrelationId = " + messageCorrelationId + "): called");
 
         // Generate a unique correlation id to put on the messages before sending them, if one was not specified.
         if (messageCorrelationId == null)
@@ -929,16 +932,16 @@
     /**
      * Sends the specified number of ping messages and does not wait for correlating replies.
      *
-     * @param  message              The message to send.
-     * @param  numPings             The number of pings to send.
-     * @param  messageCorrelationId A correlation id to place on all messages sent.
+     * @param message              The message to send.
+     * @param numPings             The number of pings to send.
+     * @param messageCorrelationId A correlation id to place on all messages sent.
      *
      * @throws JMSException All underlying JMSExceptions are allowed to fall through.
      */
     public void pingNoWaitForReply(Message message, int numPings, String messageCorrelationId) throws JMSException
     {
         log.debug("public void pingNoWaitForReply(Message message, int numPings = " + numPings
-            + ", String messageCorrelationId = " + messageCorrelationId + "): called");
+                  + ", String messageCorrelationId = " + messageCorrelationId + "): called");
 
         if (message == null)
         {
@@ -1040,9 +1043,9 @@
     }
 
     /**
-     * Implements a single iteration of the ping loop. This sends the number of pings specified by the transaction
-     * batch size property, and waits for replies to all of them. Any errors cause the publish flag to be cleared,
-     * which will terminate the pinger.
+     * Implements a single iteration of the ping loop. This sends the number of pings specified by the transaction batch
+     * size property, and waits for replies to all of them. Any errors cause the publish flag to be cleared, which will
+     * terminate the pinger.
      */
     public void pingLoop()
     {
@@ -1050,7 +1053,7 @@
         {
             // Generate a sample message and time stamp it.
             Message msg = getTestMessage(_replyDestination, _messageSize, _persistent);
-            msg.setLongProperty(MESSAGE_TIMESTAMP_PROPNAME, System.nanoTime());
+            setTimestamp(msg);
 
             // Send the message and wait for a reply.
             pingAndWaitForReply(msg, TX_BATCH_SIZE_DEFAULT, TIMEOUT_DEFAULT, null);
@@ -1068,7 +1071,8 @@
     }
 
     /**
-     * Sets a chained message listener. The message listener on this pinger, chains all its messages to the one set here.
+     * Sets a chained message listener. The message listener on this pinger chains all its messages to the one set
+     * here.
      *
      * @param messageListener The chained message listener.
      */
@@ -1077,9 +1081,7 @@
         _chainedMessageListener = messageListener;
     }
 
-    /**
-     * Removes any chained message listeners from this pinger.
-     */
+    /** Removes any chained message listeners from this pinger. */
     public void removeChainedMessageListener()
     {
         _chainedMessageListener = null;
@@ -1088,9 +1090,9 @@
     /**
      * Generates a test message of the specified size, with the specified reply-to destination and persistence flag.
      *
-     * @param  replyQueue  The reply-to destination for the message.
-     * @param  messageSize The desired size of the message in bytes.
-     * @param  persistent  <tt>true</tt> if the message should use persistent delivery, <tt>false</tt> otherwise.
+     * @param replyQueue  The reply-to destination for the message.
+     * @param messageSize The desired size of the message in bytes.
+     * @param persistent  <tt>true</tt> if the message should use persistent delivery, <tt>false</tt> otherwise.
      *
      * @return A freshly generated test message.
      *
@@ -1101,23 +1103,50 @@
         ObjectMessage msg = TestMessageFactory.newObjectMessage(_producerSession, replyQueue, messageSize, persistent);
 
         // Timestamp the message in nanoseconds.
-        msg.setLongProperty(MESSAGE_TIMESTAMP_PROPNAME, System.nanoTime());
+
+        setTimestamp(msg);
 
         return msg;
     }
 
+    protected void setTimestamp(Message msg) throws JMSException
+    {
+        if (((AMQSession) _producerSession).isStrictAMQP())
+        {
+            ((AMQMessage) msg).setTimestampProperty(new AMQShortString(MESSAGE_TIMESTAMP_PROPNAME), System.nanoTime());
+        }
+        else
+        {
+            msg.setLongProperty(MESSAGE_TIMESTAMP_PROPNAME, System.nanoTime());
+        }
+    }
+
+    protected long getTimestamp(Message msg) throws JMSException
+    {
+
+        if (((AMQSession) _producerSession).isStrictAMQP())
+        {
+            Long value = ((AMQMessage) msg).getTimestampProperty(new AMQShortString(MESSAGE_TIMESTAMP_PROPNAME));
+
+            return value == null ? 0L : value;
+        }
+        else
+        {
+            return msg.getLongProperty(PingPongProducer.MESSAGE_TIMESTAMP_PROPNAME);
+        }
+    }
+
+
     /**
-     * Stops the ping loop by clearing the publish flag. The current loop will complete before it notices that this
-     * flag has been cleared.
+     * Stops the ping loop by clearing the publish flag. The current loop will complete before it notices that this flag
+     * has been cleared.
      */
     public void stop()
     {
         _publish = false;
     }
 
-    /**
-     * Implements a ping loop that repeatedly pings until the publish flag becomes false.
-     */
+    /** Implements a ping loop that repeatedly pings until the publish flag becomes false. */
     public void run()
     {
         // Keep running until the publish flag is cleared.
@@ -1128,8 +1157,8 @@
     }
 
     /**
-     * Callback method, implementing ExceptionListener. This should be registered to listen for exceptions on the connection,
-     * this clears the publish flag which in turn will halt the ping loop.
+     * Callback method, implementing ExceptionListener. This should be registered to listen for exceptions on the
+     * connection; it clears the publish flag, which in turn will halt the ping loop.
      *
      * @param e The exception that triggered this callback method.
      */
@@ -1140,20 +1169,20 @@
     }
 
     /**
-     * Gets a shutdown hook that will cleanly shut this down when it is running the ping loop. This can be registered with
-     * the runtime system as a shutdown hook.
+     * Gets a shutdown hook that will cleanly shut this down when it is running the ping loop. This can be registered
+     * with the runtime system as a shutdown hook.
      *
      * @return A shutdown hook for the ping loop.
      */
     public Thread getShutdownHook()
     {
         return new Thread(new Runnable()
-                {
-                    public void run()
-                    {
-                        stop();
-                    }
-                });
+        {
+            public void run()
+            {
+                stop();
+            }
+        });
     }
 
     /**
@@ -1202,19 +1231,18 @@
      * <p/>If the {@link #_failAfterSend} flag is set, this will prompt the user to kill the broker before the commit is
      * applied. This flag applies whether the pinger is transactional or not.
      *
-     * <p/>If the {@link #_failBeforeCommit} flag is set, this will prompt the user to kill the broker before the commit is
-     * applied. If the {@link #_failAfterCommit} flag is set, this will prompt the user to kill the broker after the commit
-     * is applied. These flags will only apply if using a transactional pinger.
-     *
-     * @param  session The session to commit
+     * <p/>If the {@link #_failBeforeCommit} flag is set, this will prompt the user to kill the broker before the commit
+     * is applied. If the {@link #_failAfterCommit} flag is set, this will prompt the user to kill the broker after the
+     * commit is applied. These flags will only apply if using a transactional pinger.
      *
-     * @throws javax.jms.JMSException If the commit fails and then the rollback fails.
+     * @param session The session to commit
      *
      * @return <tt>true</tt> if the session was committed, <tt>false</tt> if it was not.
      *
-     * @todo   Consider moving the fail after send logic into the send method. It is confusing to have it in this commit
-     *         method, because commits only apply to transactional pingers, but fail after send applied to transactional and
-     *         non-transactional alike.
+     * @throws javax.jms.JMSException If the commit fails and then the rollback fails.
+     * @todo Consider moving the fail after send logic into the send method. It is confusing to have it in this commit
+     * method, because commits only apply to transactional pingers, but fail after send applies to transactional and
+     * non-transactional alike.
      */
     protected boolean commitTx(Session session) throws JMSException
     {
@@ -1335,12 +1363,12 @@
     /**
      * Defines a chained message listener interface that can be attached to this pinger. Whenever this pinger's {@link
      * PingPongProducer#onMessage} method is called, the chained listener set through the {@link
-     * PingPongProducer#setChainedMessageListener} method is passed the message, and the remaining expected count of messages
-     * with that correlation id.
+     * PingPongProducer#setChainedMessageListener} method is passed the message, and the remaining expected count of
+     * messages with that correlation id.
      *
-     * <p/>Provided only one pinger is producing messages with that correlation id, the chained listener will always be given
-     * unique message counts. It will always be called while the producer waiting for all messages to arrive is still
-     * blocked.
+     * <p/>Provided only one pinger is producing messages with that correlation id, the chained listener will always be
+     * given unique message counts. It will always be called while the producer waiting for all messages to arrive is
+     * still blocked.
      */
     public static interface ChainedMessageListener
     {
@@ -1348,8 +1376,8 @@
     }
 
     /**
-     * Holds information on each correlation id. The countdown latch, the current timeout timer... More stuff to be added to
-     * this: read/write lock to make onMessage more concurrent as described in class header comment.
+     * Holds information on each correlation id. The countdown latch, the current timeout timer... More stuff to be
+     * added to this: read/write lock to make onMessage more concurrent as described in class header comment.
      */
     protected static class PerCorrelationId
     {

Modified: incubator/qpid/branches/M2/java/perftests/src/test/java/org/apache/qpid/ping/PingLatencyTestPerf.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2/java/perftests/src/test/java/org/apache/qpid/ping/PingLatencyTestPerf.java?view=diff&rev=535309&r1=535308&r2=535309
==============================================================================
--- incubator/qpid/branches/M2/java/perftests/src/test/java/org/apache/qpid/ping/PingLatencyTestPerf.java (original)
+++ incubator/qpid/branches/M2/java/perftests/src/test/java/org/apache/qpid/ping/PingLatencyTestPerf.java Fri May  4 09:12:59 2007
@@ -35,6 +35,9 @@
 import org.apache.log4j.Logger;
 
 import org.apache.qpid.requestreply.PingPongProducer;
+import org.apache.qpid.client.AMQSession;
+import org.apache.qpid.client.message.AMQMessage;
+import org.apache.qpid.framing.AMQShortString;
 
 import uk.co.thebadgerset.junit.extensions.TimingController;
 import uk.co.thebadgerset.junit.extensions.TimingControllerAware;
@@ -48,18 +51,16 @@
  * waiting until all expected replies are received.
  *
  * <p/>This test does not output timings for every single ping message, as when running at high volume, writing the test
- * log for a vast number of messages would slow the testing down. Instead samples ping latency occasionally. The frequency
- * of ping sampling is set using the {@link #TEST_RESULTS_BATCH_SIZE_PROPNAME} property, to override the default of every
- * {@link #DEFAULT_TEST_RESULTS_BATCH_SIZE}.
+ * log for a vast number of messages would slow the testing down. Instead, it samples ping latency occasionally. The
+ * frequency of ping sampling is set using the {@link #TEST_RESULTS_BATCH_SIZE_PROPNAME} property, to override the
+ * default of every {@link #DEFAULT_TEST_RESULTS_BATCH_SIZE}.
  *
- * <p/>The size parameter logged for each individual ping is set to the size of the batch of messages that the individual
- * timed ping was taken from, rather than 1 for a single message. This is so that the total throughput (messages / time)
- * can be calculated in order to examine the relationship between throughput and latency.
+ * <p/>The size parameter logged for each individual ping is set to the size of the batch of messages that the
+ * individual timed ping was taken from, rather than 1 for a single message. This is so that the total throughput
+ * (messages / time) can be calculated in order to examine the relationship between throughput and latency.
  *
- * <p/><table id="crc"><caption>CRC Card</caption>
- * <tr><td> Responsibilities <th> Collaborations
- * <tr><td> Send many ping messages and output timings for sampled individual pings.
- * </table>
+ * <p/><table id="crc"><caption>CRC Card</caption> <tr><td> Responsibilities <th> Collaborations <tr><td> Send many ping
+ * messages and output timings for sampled individual pings. </table>
  */
 public class PingLatencyTestPerf extends PingTestPerf implements TimingControllerAware
 {
@@ -77,9 +78,12 @@
     /** Used to generate unique correlation ids for each test run. */
     private AtomicLong corellationIdGenerator = new AtomicLong();
 
-    /** Holds test specifics by correlation id. This consists of the expected number of messages and the timing controler. */
+    /**
+     * Holds test specifics by correlation id. This consists of the expected number of messages and the timing
+     * controller.
+     */
     private Map<String, PerCorrelationId> perCorrelationIds =
-        Collections.synchronizedMap(new HashMap<String, PerCorrelationId>());
+            Collections.synchronizedMap(new HashMap<String, PerCorrelationId>());
 
     /** Holds the batched results listener, that does logging on batch boundaries. */
     private BatchedResultsListener batchedResultsListener = null;
@@ -98,9 +102,7 @@
                                               Integer.toString(DEFAULT_TEST_RESULTS_BATCH_SIZE));
     }
 
-    /**
-     * Compile all the tests into a test suite.
-     */
+    /** Compile all the tests into a test suite. */
     public static Test suite()
     {
         // Build a new test suite
@@ -133,8 +135,8 @@
     }
 
     /**
-     * Sends the specified number of pings, asynchronously outputs timings on every batch boundary, and waits until
-     * all replies have been received or a time out occurs before exiting this method.
+     * Sends the specified number of pings, asynchronously outputs timings on every batch boundary, and waits until all
+     * replies have been received or a time out occurs before exiting this method.
      *
      * @param numPings The number of pings to send.
      */
@@ -169,9 +171,9 @@
 
         // Generate a sample message of the specified size.
         Message msg =
-            pingClient.getTestMessage(perThreadSetup._pingClient.getReplyDestinations().get(0),
-                                      testParameters.getPropertyAsInteger(PingPongProducer.MESSAGE_SIZE_PROPNAME),
-                                      testParameters.getPropertyAsBoolean(PingPongProducer.PERSISTENT_MODE_PROPNAME));
+                pingClient.getTestMessage(perThreadSetup._pingClient.getReplyDestinations().get(0),
+                                          testParameters.getPropertyAsInteger(PingPongProducer.MESSAGE_SIZE_PROPNAME),
+                                          testParameters.getPropertyAsBoolean(PingPongProducer.PERSISTENT_MODE_PROPNAME));
 
         // Send the requested number of messages, and wait until they have all been received.
         long timeout = Long.parseLong(testParameters.getProperty(PingPongProducer.TIMEOUT_PROPNAME));
@@ -190,9 +192,7 @@
         perCorrelationIds.remove(messageCorrelationId);
     }
 
-    /**
-     * Performs test fixture creation on a per thread basis. This will only be called once for each test thread.
-     */
+    /** Performs test fixture creation on a per thread basis. This will only be called once for each test thread. */
     public void threadSetUp()
     {
         _logger.debug("public void threadSetUp(): called");
@@ -228,14 +228,15 @@
 
     /**
      * BatchedResultsListener is a {@link org.apache.qpid.requestreply.PingPongProducer.ChainedMessageListener} that can
-     * be attached to the pinger, in order to receive notifications about every message received and the number remaining
-     * to be received. Whenever the number remaining crosses a batch size boundary this results listener outputs a test
-     * timing for the actual number of messages received in the current batch.
+     * be attached to the pinger, in order to receive notifications about every message received and the number
+     * remaining to be received. Whenever the number remaining crosses a batch size boundary this results listener
+     * outputs a test timing for the actual number of messages received in the current batch.
      */
     private class BatchedResultsListener implements PingPongProducer.ChainedMessageListener
     {
         /** The test results logging batch size. */
         int _batchSize;
+        private boolean _strictAMQP;
 
         /**
          * Creates a results listener on the specified batch size.
@@ -245,6 +246,7 @@
         public BatchedResultsListener(int batchSize)
         {
             _batchSize = batchSize;
+            _strictAMQP = Boolean.parseBoolean(System.getProperties().getProperty(AMQSession.STRICT_AMQP, AMQSession.STRICT_AMQP_DEFAULT));
         }
 
         /**
@@ -278,7 +280,19 @@
 
                     // Extract the send time from the message and work out from the current time, what the ping latency was.
                     // The ping producer time stamps messages in nanoseconds.
-                    long startTime = message.getLongProperty(PingPongProducer.MESSAGE_TIMESTAMP_PROPNAME);
+                    long startTime;
+
+                    if (_strictAMQP)
+                    {
+                        Long value = ((AMQMessage) message).getTimestampProperty(new AMQShortString(PingPongProducer.MESSAGE_TIMESTAMP_PROPNAME));
+
+                        startTime = (value == null ? 0L : value);
+                    }
+                    else
+                    {
+                        startTime = message.getLongProperty(PingPongProducer.MESSAGE_TIMESTAMP_PROPNAME);
+                    }
+
                     long now = System.nanoTime();
                     long pingTime = now - startTime;
 
@@ -306,8 +320,8 @@
     }
 
     /**
-     * Holds state specific to each correlation id, needed to output test results. This consists of the count of
-     * the total expected number of messages, and the timing controller for the thread sending those message ids.
+     * Holds state specific to each correlation id, needed to output test results. This consists of the count of the
+     * total expected number of messages, and the timing controller for the thread sending those message ids.
      */
     private static class PerCorrelationId
     {