You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2006/12/21 15:24:40 UTC

svn commit: r489368 - in /incubator/qpid/branches/new_persistence/java: broker/src/main/java/org/apache/qpid/server/exchange/ client/example/ client/example/src/ client/example/src/main/ client/example/src/main/java/ client/example/src/main/java/org/ c...

Author: rgreig
Date: Thu Dec 21 06:24:38 2006
New Revision: 489368

URL: http://svn.apache.org/viewvc?view=rev&rev=489368
Log:
Merge from trunk up to rev 486165

Added:
    incubator/qpid/branches/new_persistence/java/client/example/
      - copied from r486165, incubator/qpid/trunk/qpid/java/client/example/
    incubator/qpid/branches/new_persistence/java/client/example/pom.xml
      - copied unchanged from r486165, incubator/qpid/trunk/qpid/java/client/example/pom.xml
    incubator/qpid/branches/new_persistence/java/client/example/src/
      - copied from r486165, incubator/qpid/trunk/qpid/java/client/example/src/
    incubator/qpid/branches/new_persistence/java/client/example/src/main/
      - copied from r486165, incubator/qpid/trunk/qpid/java/client/example/src/main/
    incubator/qpid/branches/new_persistence/java/client/example/src/main/java/
      - copied from r486165, incubator/qpid/trunk/qpid/java/client/example/src/main/java/
    incubator/qpid/branches/new_persistence/java/client/example/src/main/java/org/
      - copied from r486165, incubator/qpid/trunk/qpid/java/client/example/src/main/java/org/
    incubator/qpid/branches/new_persistence/java/client/example/src/main/java/org/apache/
      - copied from r486165, incubator/qpid/trunk/qpid/java/client/example/src/main/java/org/apache/
    incubator/qpid/branches/new_persistence/java/client/example/src/main/java/org/apache/qpid/
      - copied from r486165, incubator/qpid/trunk/qpid/java/client/example/src/main/java/org/apache/qpid/
    incubator/qpid/branches/new_persistence/java/client/example/src/main/java/org/apache/qpid/example/
      - copied from r486165, incubator/qpid/trunk/qpid/java/client/example/src/main/java/org/apache/qpid/example/
    incubator/qpid/branches/new_persistence/java/client/example/src/main/java/org/apache/qpid/example/log4j.xml
      - copied unchanged from r486165, incubator/qpid/trunk/qpid/java/client/example/src/main/java/org/apache/qpid/example/log4j.xml
    incubator/qpid/branches/new_persistence/java/client/example/src/main/java/org/apache/qpid/example/publisher/
      - copied from r486165, incubator/qpid/trunk/qpid/java/client/example/src/main/java/org/apache/qpid/example/publisher/
    incubator/qpid/branches/new_persistence/java/client/example/src/main/java/org/apache/qpid/example/publisher/FileMessageDispatcher.java
      - copied unchanged from r486165, incubator/qpid/trunk/qpid/java/client/example/src/main/java/org/apache/qpid/example/publisher/FileMessageDispatcher.java
    incubator/qpid/branches/new_persistence/java/client/example/src/main/java/org/apache/qpid/example/publisher/FileMessageFactory.java
      - copied unchanged from r486165, incubator/qpid/trunk/qpid/java/client/example/src/main/java/org/apache/qpid/example/publisher/FileMessageFactory.java
    incubator/qpid/branches/new_persistence/java/client/example/src/main/java/org/apache/qpid/example/publisher/MessageFactoryException.java
      - copied unchanged from r486165, incubator/qpid/trunk/qpid/java/client/example/src/main/java/org/apache/qpid/example/publisher/MessageFactoryException.java
    incubator/qpid/branches/new_persistence/java/client/example/src/main/java/org/apache/qpid/example/publisher/MonitorMessageDispatcher.java
      - copied unchanged from r486165, incubator/qpid/trunk/qpid/java/client/example/src/main/java/org/apache/qpid/example/publisher/MonitorMessageDispatcher.java
    incubator/qpid/branches/new_persistence/java/client/example/src/main/java/org/apache/qpid/example/publisher/MonitorPublisher.java
      - copied unchanged from r486165, incubator/qpid/trunk/qpid/java/client/example/src/main/java/org/apache/qpid/example/publisher/MonitorPublisher.java
    incubator/qpid/branches/new_persistence/java/client/example/src/main/java/org/apache/qpid/example/publisher/Publisher.java
      - copied unchanged from r486165, incubator/qpid/trunk/qpid/java/client/example/src/main/java/org/apache/qpid/example/publisher/Publisher.java
    incubator/qpid/branches/new_persistence/java/client/example/src/main/java/org/apache/qpid/example/publisher/UndeliveredMessageException.java
      - copied unchanged from r486165, incubator/qpid/trunk/qpid/java/client/example/src/main/java/org/apache/qpid/example/publisher/UndeliveredMessageException.java
    incubator/qpid/branches/new_persistence/java/client/example/src/main/java/org/apache/qpid/example/shared/
      - copied from r486165, incubator/qpid/trunk/qpid/java/client/example/src/main/java/org/apache/qpid/example/shared/
    incubator/qpid/branches/new_persistence/java/client/example/src/main/java/org/apache/qpid/example/shared/ConnectionException.java
      - copied unchanged from r486165, incubator/qpid/trunk/qpid/java/client/example/src/main/java/org/apache/qpid/example/shared/ConnectionException.java
    incubator/qpid/branches/new_persistence/java/client/example/src/main/java/org/apache/qpid/example/shared/ContextException.java
      - copied unchanged from r486165, incubator/qpid/trunk/qpid/java/client/example/src/main/java/org/apache/qpid/example/shared/ContextException.java
    incubator/qpid/branches/new_persistence/java/client/example/src/main/java/org/apache/qpid/example/shared/FileUtils.java
      - copied unchanged from r486165, incubator/qpid/trunk/qpid/java/client/example/src/main/java/org/apache/qpid/example/shared/FileUtils.java
    incubator/qpid/branches/new_persistence/java/client/example/src/main/java/org/apache/qpid/example/shared/InitialContextHelper.java
      - copied unchanged from r486165, incubator/qpid/trunk/qpid/java/client/example/src/main/java/org/apache/qpid/example/shared/InitialContextHelper.java
    incubator/qpid/branches/new_persistence/java/client/example/src/main/java/org/apache/qpid/example/shared/Statics.java
      - copied unchanged from r486165, incubator/qpid/trunk/qpid/java/client/example/src/main/java/org/apache/qpid/example/shared/Statics.java
    incubator/qpid/branches/new_persistence/java/client/example/src/main/java/org/apache/qpid/example/shared/example.properties
      - copied unchanged from r486165, incubator/qpid/trunk/qpid/java/client/example/src/main/java/org/apache/qpid/example/shared/example.properties
    incubator/qpid/branches/new_persistence/java/client/example/src/main/java/org/apache/qpid/example/subscriber/
      - copied from r486165, incubator/qpid/trunk/qpid/java/client/example/src/main/java/org/apache/qpid/example/subscriber/
    incubator/qpid/branches/new_persistence/java/client/example/src/main/java/org/apache/qpid/example/subscriber/MonitoredSubscriber.java
      - copied unchanged from r486165, incubator/qpid/trunk/qpid/java/client/example/src/main/java/org/apache/qpid/example/subscriber/MonitoredSubscriber.java
    incubator/qpid/branches/new_persistence/java/client/example/src/main/java/org/apache/qpid/example/subscriber/MonitoredSubscriptionWrapper.java
      - copied unchanged from r486165, incubator/qpid/trunk/qpid/java/client/example/src/main/java/org/apache/qpid/example/subscriber/MonitoredSubscriptionWrapper.java
    incubator/qpid/branches/new_persistence/java/client/example/src/main/java/org/apache/qpid/example/subscriber/Subscriber.java
      - copied unchanged from r486165, incubator/qpid/trunk/qpid/java/client/example/src/main/java/org/apache/qpid/example/subscriber/Subscriber.java
    incubator/qpid/branches/new_persistence/java/client/example/src/main/java/org/apache/qpid/example/subscriber/SubscriptionWrapper.java
      - copied unchanged from r486165, incubator/qpid/trunk/qpid/java/client/example/src/main/java/org/apache/qpid/example/subscriber/SubscriptionWrapper.java
    incubator/qpid/branches/new_persistence/java/systests/src/test/java/org/apache/qpid/server/exchange/ReturnUnroutableMandatoryMessageTest.java
      - copied unchanged from r486165, incubator/qpid/trunk/qpid/java/systests/src/test/java/org/apache/qpid/server/exchange/ReturnUnroutableMandatoryMessageTest.java
Removed:
    incubator/qpid/branches/new_persistence/java/client/src/old_test/java/org/apache/qpid/example/
Modified:
    incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java
    incubator/qpid/branches/new_persistence/java/client/src/main/java/org/apache/qpid/client/message/AbstractBytesMessage.java
    incubator/qpid/branches/new_persistence/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java
    incubator/qpid/branches/new_persistence/java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessage.java
    incubator/qpid/branches/new_persistence/java/client/src/old_test/java/org/apache/qpid/IBMPerfTest/JNDIBindConnectionFactory.java
    incubator/qpid/branches/new_persistence/java/client/src/old_test/java/org/apache/qpid/IBMPerfTest/JNDIBindQueue.java
    incubator/qpid/branches/new_persistence/java/client/src/old_test/java/org/apache/qpid/IBMPerfTest/JNDIBindTopic.java
    incubator/qpid/branches/new_persistence/java/client/src/test/java/org/apache/qpid/test/unit/client/message/StreamMessageTest.java
    incubator/qpid/branches/new_persistence/java/systests/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java
    incubator/qpid/branches/new_persistence/java/systests/src/test/java/org/apache/qpid/server/exchange/HeadersExchangeTest.java

Modified: incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java?view=diff&rev=489368&r1=489367&r2=489368
==============================================================================
--- incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java (original)
+++ incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java Thu Dec 21 06:24:38 2006
@@ -206,7 +206,18 @@
         }
         if (!routed)
         {
-            _logger.warn("Exchange " + getName() + ": message not routable.");
+
+            String msg = "Exchange " + getName() + ": message not routable.";
+
+            if (payload.getPublishBody().mandatory)
+            {
+                throw new NoRouteException(msg, payload);
+            }
+            else
+            {
+                _logger.warn(msg);
+            }
+
         }
     }
 

Modified: incubator/qpid/branches/new_persistence/java/client/src/main/java/org/apache/qpid/client/message/AbstractBytesMessage.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/new_persistence/java/client/src/main/java/org/apache/qpid/client/message/AbstractBytesMessage.java?view=diff&rev=489368&r1=489367&r2=489368
==============================================================================
--- incubator/qpid/branches/new_persistence/java/client/src/main/java/org/apache/qpid/client/message/AbstractBytesMessage.java (original)
+++ incubator/qpid/branches/new_persistence/java/client/src/main/java/org/apache/qpid/client/message/AbstractBytesMessage.java Thu Dec 21 06:24:38 2006
@@ -21,23 +21,20 @@
 package org.apache.qpid.client.message;
 
 import org.apache.mina.common.ByteBuffer;
-import org.apache.qpid.framing.ContentHeaderBody;
-import org.apache.qpid.framing.BasicContentHeaderProperties;
 import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.BasicContentHeaderProperties;
+import org.apache.qpid.framing.ContentHeaderBody;
 
 import javax.jms.JMSException;
-import javax.jms.MessageNotReadableException;
 import javax.jms.MessageEOFException;
-import javax.jms.MessageNotWriteableException;
 import java.io.IOException;
 import java.nio.charset.Charset;
-import java.nio.charset.CharacterCodingException;
 
 /**
  * @author Apache Software Foundation
  */
 public abstract class AbstractBytesMessage extends AbstractJMSMessage
-{    
+{
 
     /**
      * The default initial size of the buffer. The buffer expands automatically.
@@ -79,7 +76,7 @@
     {
         _data.clear();
     }
-    
+
     public String toBodyString() throws JMSException
     {
         checkReadable();
@@ -124,7 +121,7 @@
             return data;
         }
     }
-    
+
     /**
      * Check that there is at least a certain number of bytes available to read
      *
@@ -138,10 +135,4 @@
             throw new MessageEOFException("Unable to read " + len + " bytes");
         }
     }
-
-    public void reset() throws JMSException
-    {
-        super.reset();
-        _data.flip();
-    }    
 }

Modified: incubator/qpid/branches/new_persistence/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/new_persistence/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java?view=diff&rev=489368&r1=489367&r2=489368
==============================================================================
--- incubator/qpid/branches/new_persistence/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java (original)
+++ incubator/qpid/branches/new_persistence/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java Thu Dec 21 06:24:38 2006
@@ -7,9 +7,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -384,15 +384,15 @@
     }
 
     public void acknowledge() throws JMSException
-    {                	
+    {
         // the JMS 1.1 spec says in section 3.6 that calls to acknowledge are ignored when client acknowledge
         // is not specified. In our case, we only set the session field where client acknowledge mode is specified.
         if (_session != null)
         {
         	if (_session.getAMQConnection().isClosed()){
         		throw new javax.jms.IllegalStateException("Connection is already closed");
-        	}       		
-        	
+        	}
+
             // we set multiple to true here since acknowledgement implies acknowledge of all previous messages
             // received on the session
             _session.acknowledgeMessage(_deliveryTag, true);
@@ -546,7 +546,14 @@
 
     public void reset() throws JMSException
     {
-        _readableMessage = true;
+        if (_readableMessage)
+        {
+            _data.rewind();
+        }
+        else
+        {
+            _data.flip();
+            _readableMessage = true;
+        }
     }
-
 }

Modified: incubator/qpid/branches/new_persistence/java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessage.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/new_persistence/java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessage.java?view=diff&rev=489368&r1=489367&r2=489368
==============================================================================
--- incubator/qpid/branches/new_persistence/java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessage.java (original)
+++ incubator/qpid/branches/new_persistence/java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessage.java Thu Dec 21 06:24:38 2006
@@ -33,7 +33,7 @@
  */
 public class JMSStreamMessage extends AbstractBytesMessage implements StreamMessage
 {
-    public static final String MIME_TYPE="jms/stream-message";    
+    public static final String MIME_TYPE="jms/stream-message";
 
     private static final byte BOOLEAN_TYPE = (byte) 1;
 
@@ -55,6 +55,8 @@
 
     private static final byte STRING_TYPE = (byte) 10;
 
+    private static final byte NULL_STRING_TYPE = (byte) 11;
+
     /**
      * This is set when reading a byte array. The readBytes(byte[]) method supports multiple calls to read
      * a byte array in multiple chunks, hence this is used to track how much is left to be read
@@ -89,7 +91,7 @@
         return MIME_TYPE;
     }
 
-    private byte readAndCheckType() throws MessageFormatException, MessageEOFException,
+    private byte readWireType() throws MessageFormatException, MessageEOFException,
             MessageNotReadableException
     {
         checkReadable();
@@ -105,22 +107,32 @@
 
     public boolean readBoolean() throws JMSException
     {
-        byte wireType = readAndCheckType();
+        int position = _data.position();
+        byte wireType = readWireType();
         boolean result;
-        switch (wireType)
+        try
         {
-            case BOOLEAN_TYPE:
-                checkAvailable(1);
-                result = readBooleanImpl();
-                break;
-            case STRING_TYPE:
-                checkAvailable(1);
-                result = Boolean.parseBoolean(readStringImpl());
-                break;
-            default:
-                throw new MessageFormatException("Unable to convert " + wireType + " to a boolean");
+            switch (wireType)
+            {
+                case BOOLEAN_TYPE:
+                    checkAvailable(1);
+                    result = readBooleanImpl();
+                    break;
+                case STRING_TYPE:
+                    checkAvailable(1);
+                    result = Boolean.parseBoolean(readStringImpl());
+                    break;
+                default:
+                    _data.position(position);
+                    throw new MessageFormatException("Unable to convert " + wireType + " to a boolean");
+            }
+            return result;
+        }
+        catch (RuntimeException e)
+        {
+            _data.position(position);
+            throw e;
         }
-        return result;
     }
 
     private boolean readBooleanImpl()
@@ -130,20 +142,30 @@
 
     public byte readByte() throws JMSException
     {
-        byte wireType = readAndCheckType();
+        int position = _data.position();
+        byte wireType = readWireType();
         byte result;
-        switch (wireType)
+        try
+        {
+            switch (wireType)
+            {
+                case BYTE_TYPE:
+                    checkAvailable(1);
+                    result = readByteImpl();
+                    break;
+                case STRING_TYPE:
+                    checkAvailable(1);
+                    result = Byte.parseByte(readStringImpl());
+                    break;
+                default:
+                    _data.position(position);
+                    throw new MessageFormatException("Unable to convert " + wireType + " to a byte");
+            }
+        }
+        catch (RuntimeException e)
         {
-            case BYTE_TYPE:
-                checkAvailable(1);
-                result = readByteImpl();
-                break;
-            case STRING_TYPE:
-                checkAvailable(1);
-                result = Byte.parseByte(readStringImpl());
-                break;
-            default:
-                throw new MessageFormatException("Unable to convert " + wireType + " to a byte");
+            _data.position(position);
+            throw e;
         }
         return result;
     }
@@ -155,24 +177,34 @@
 
     public short readShort() throws JMSException
     {
-        byte wireType = readAndCheckType();
+        int position = _data.position();
+        byte wireType = readWireType();
         short result;
-        switch (wireType)
+        try
         {
-            case SHORT_TYPE:
-                checkAvailable(2);
-                result = readShortImpl();
-                break;
-            case STRING_TYPE:
-                checkAvailable(1);
-                result = Short.parseShort(readStringImpl());
-                break;
-            case BYTE_TYPE:
-                checkAvailable(1);
-                result = readByteImpl();
-                break;
-            default:
-                throw new MessageFormatException("Unable to convert " + wireType + " to a short");
+            switch (wireType)
+            {
+                case SHORT_TYPE:
+                    checkAvailable(2);
+                    result = readShortImpl();
+                    break;
+                case STRING_TYPE:
+                    checkAvailable(1);
+                    result = Short.parseShort(readStringImpl());
+                    break;
+                case BYTE_TYPE:
+                    checkAvailable(1);
+                    result = readByteImpl();
+                    break;
+                default:
+                    _data.position(position);
+                    throw new MessageFormatException("Unable to convert " + wireType + " to a short");
+            }
+        }
+        catch (RuntimeException e)
+        {
+            _data.position(position);
+            throw e;
         }
         return result;
     }
@@ -190,15 +222,25 @@
      */
     public char readChar() throws JMSException
     {
-        byte wireType = readAndCheckType();
-        if (wireType != CHAR_TYPE)
+        int position = _data.position();
+        byte wireType = readWireType();
+        try
         {
-            throw new MessageFormatException("Unable to convert " + wireType + " to a char");
+            if (wireType != CHAR_TYPE)
+            {
+                _data.position(position);
+                throw new MessageFormatException("Unable to convert " + wireType + " to a char");
+            }
+            else
+            {
+                checkAvailable(2);
+                return readCharImpl();
+            }
         }
-        else
+        catch (RuntimeException e)
         {
-            checkAvailable(2);
-            return readCharImpl();
+            _data.position(position);
+            throw e;
         }
     }
 
@@ -209,30 +251,40 @@
 
     public int readInt() throws JMSException
     {
-        byte wireType = readAndCheckType();
+        int position = _data.position();
+        byte wireType = readWireType();
         int result;
-        switch (wireType)
+        try
         {
-            case INT_TYPE:
-                checkAvailable(4);
-                result = readIntImpl();
-                break;
-            case SHORT_TYPE:
-                checkAvailable(2);
-                result = readShortImpl();
-                break;
-            case STRING_TYPE:
-                checkAvailable(1);
-                result = Integer.parseInt(readStringImpl());
-                break;
-            case BYTE_TYPE:
-                checkAvailable(1);
-                result = readByteImpl();
-                break;
-            default:
-                throw new MessageFormatException("Unable to convert " + wireType + " to an int");
+            switch (wireType)
+            {
+                case INT_TYPE:
+                    checkAvailable(4);
+                    result = readIntImpl();
+                    break;
+                case SHORT_TYPE:
+                    checkAvailable(2);
+                    result = readShortImpl();
+                    break;
+                case STRING_TYPE:
+                    checkAvailable(1);
+                    result = Integer.parseInt(readStringImpl());
+                    break;
+                case BYTE_TYPE:
+                    checkAvailable(1);
+                    result = readByteImpl();
+                    break;
+                default:
+                    _data.position(position);
+                    throw new MessageFormatException("Unable to convert " + wireType + " to an int");
+            }
+            return result;
+        }
+        catch (RuntimeException e)
+        {
+            _data.position(position);
+            throw e;
         }
-        return result;
     }
 
     private int readIntImpl()
@@ -242,34 +294,44 @@
 
     public long readLong() throws JMSException
     {
-        byte wireType = readAndCheckType();
+        int position = _data.position();
+        byte wireType = readWireType();
         long result;
-        switch (wireType)
+        try
         {
-            case LONG_TYPE:
-                checkAvailable(8);
-                result = readLongImpl();
-                break;
-            case INT_TYPE:
-                checkAvailable(4);
-                result = readIntImpl();
-                break;
-            case SHORT_TYPE:
-                checkAvailable(2);
-                result = readShortImpl();
-                break;
-            case STRING_TYPE:
-                checkAvailable(1);
-                result = Long.parseLong(readStringImpl());
-                break;
-            case BYTE_TYPE:
-                checkAvailable(1);
-                result = readByteImpl();
-                break;
-            default:
-                throw new MessageFormatException("Unable to convert " + wireType + " to a long");
+            switch (wireType)
+            {
+                case LONG_TYPE:
+                    checkAvailable(8);
+                    result = readLongImpl();
+                    break;
+                case INT_TYPE:
+                    checkAvailable(4);
+                    result = readIntImpl();
+                    break;
+                case SHORT_TYPE:
+                    checkAvailable(2);
+                    result = readShortImpl();
+                    break;
+                case STRING_TYPE:
+                    checkAvailable(1);
+                    result = Long.parseLong(readStringImpl());
+                    break;
+                case BYTE_TYPE:
+                    checkAvailable(1);
+                    result = readByteImpl();
+                    break;
+                default:
+                    _data.position(position);
+                    throw new MessageFormatException("Unable to convert " + wireType + " to a long");
+            }
+            return result;
+        }
+        catch (RuntimeException e)
+        {
+            _data.position(position);
+            throw e;
         }
-        return result;
     }
 
     private long readLongImpl()
@@ -279,22 +341,32 @@
 
     public float readFloat() throws JMSException
     {
-        byte wireType = readAndCheckType();
+        int position = _data.position();
+        byte wireType = readWireType();
         float result;
-        switch (wireType)
+        try
+        {
+            switch (wireType)
+            {
+                case FLOAT_TYPE:
+                    checkAvailable(4);
+                    result = readFloatImpl();
+                    break;
+                case STRING_TYPE:
+                    checkAvailable(1);
+                    result = Float.parseFloat(readStringImpl());
+                    break;
+                default:
+                    _data.position(position);
+                    throw new MessageFormatException("Unable to convert " + wireType + " to a float");
+            }
+            return result;
+        }
+        catch (RuntimeException e)
         {
-            case FLOAT_TYPE:
-                checkAvailable(4);
-                result = readFloatImpl();
-                break;
-            case STRING_TYPE:
-                checkAvailable(1);
-                result = Float.parseFloat(readStringImpl());
-                break;
-            default:
-                throw new MessageFormatException("Unable to convert " + wireType + " to a float");
+            _data.position(position);
+            throw e;
         }
-        return result;
     }
 
     private float readFloatImpl()
@@ -304,26 +376,36 @@
 
     public double readDouble() throws JMSException
     {
-        byte wireType = readAndCheckType();
+        int position = _data.position();
+        byte wireType = readWireType();
         double result;
-        switch (wireType)
+        try
+        {
+            switch (wireType)
+            {
+                case DOUBLE_TYPE:
+                    checkAvailable(8);
+                    result = readDoubleImpl();
+                    break;
+                case FLOAT_TYPE:
+                    checkAvailable(4);
+                    result = readFloatImpl();
+                    break;
+                case STRING_TYPE:
+                    checkAvailable(1);
+                    result = Double.parseDouble(readStringImpl());
+                    break;
+                default:
+                    _data.position(position);
+                    throw new MessageFormatException("Unable to convert " + wireType + " to a double");
+            }
+            return result;
+        }
+        catch (RuntimeException e)
         {
-            case DOUBLE_TYPE:
-                checkAvailable(8);
-                result = readDoubleImpl();
-                break;
-            case FLOAT_TYPE:
-                checkAvailable(4);
-                result = readFloatImpl();
-                break;
-            case STRING_TYPE:
-                checkAvailable(1);
-                result = Double.parseDouble(readStringImpl());
-                break;
-            default:
-                throw new MessageFormatException("Unable to convert " + wireType + " to a double");
+            _data.position(position);
+            throw e;
         }
-        return result;
     }
 
     private double readDoubleImpl()
@@ -333,50 +415,63 @@
 
     public String readString() throws JMSException
     {
-        byte wireType = readAndCheckType();
+        int position = _data.position();
+        byte wireType = readWireType();
         String result;
-        switch (wireType)
+        try
         {
-            case STRING_TYPE:
-                checkAvailable(1);
-                result = readStringImpl();
-                break;
-            case BOOLEAN_TYPE:
-                checkAvailable(1);
-                result = String.valueOf(readBooleanImpl());
-                break;
-            case LONG_TYPE:
-                checkAvailable(8);
-                result = String.valueOf(readLongImpl());
-                break;
-            case INT_TYPE:
-                checkAvailable(4);
-                result = String.valueOf(readIntImpl());
-                break;
-            case SHORT_TYPE:
-                checkAvailable(2);
-                result = String.valueOf(readShortImpl());
-                break;
-            case BYTE_TYPE:
-                checkAvailable(1);
-                result = String.valueOf(readByteImpl());
-                break;
-            case FLOAT_TYPE:
-                checkAvailable(4);
-                result = String.valueOf(readFloatImpl());
-                break;
-            case DOUBLE_TYPE:
-                checkAvailable(8);
-                result = String.valueOf(readDoubleImpl());
-                break;
-            case CHAR_TYPE:
-                checkAvailable(2);
-                result = String.valueOf(readCharImpl());
-                break;
-            default:
-                throw new MessageFormatException("Unable to convert " + wireType + " to a String");
+            switch (wireType)
+            {
+                case STRING_TYPE:
+                    checkAvailable(1);
+                    result = readStringImpl();
+                    break;
+                case NULL_STRING_TYPE:
+                    result = null;
+                    break;
+                case BOOLEAN_TYPE:
+                    checkAvailable(1);
+                    result = String.valueOf(readBooleanImpl());
+                    break;
+                case LONG_TYPE:
+                    checkAvailable(8);
+                    result = String.valueOf(readLongImpl());
+                    break;
+                case INT_TYPE:
+                    checkAvailable(4);
+                    result = String.valueOf(readIntImpl());
+                    break;
+                case SHORT_TYPE:
+                    checkAvailable(2);
+                    result = String.valueOf(readShortImpl());
+                    break;
+                case BYTE_TYPE:
+                    checkAvailable(1);
+                    result = String.valueOf(readByteImpl());
+                    break;
+                case FLOAT_TYPE:
+                    checkAvailable(4);
+                    result = String.valueOf(readFloatImpl());
+                    break;
+                case DOUBLE_TYPE:
+                    checkAvailable(8);
+                    result = String.valueOf(readDoubleImpl());
+                    break;
+                case CHAR_TYPE:
+                    checkAvailable(2);
+                    result = String.valueOf(readCharImpl());
+                    break;
+                default:
+                    _data.position(position);
+                    throw new MessageFormatException("Unable to convert " + wireType + " to a String");
+            }
+            return result;
+        }
+        catch (RuntimeException e)
+        {
+            _data.position(position);
+            throw e;
         }
-        return result;
     }
 
     private String readStringImpl() throws JMSException
@@ -406,7 +501,7 @@
             // type discriminator checked separately so you get a MessageFormatException rather than
             // an EOF even in the case where both would be applicable
             checkAvailable(1);
-            byte wireType = readAndCheckType();
+            byte wireType = readWireType();
             if (wireType != BYTEARRAY_TYPE)
             {
                 throw new MessageFormatException("Unable to convert " + wireType + " to a byte array");
@@ -431,18 +526,25 @@
                 }
             }
         }
+        else if (_byteArrayRemaining == 0)
+        {
+            _byteArrayRemaining = -1;
+            return -1;
+        }
 
-        return readBytesImpl(bytes);
+        int returnedSize = readBytesImpl(bytes);
+        if (returnedSize < bytes.length)
+        {
+            _byteArrayRemaining = -1;
+        }
+        return returnedSize;
     }
 
     private int readBytesImpl(byte[] bytes)
     {
         int count = (_byteArrayRemaining >= bytes.length ? bytes.length : _byteArrayRemaining);
         _byteArrayRemaining -= count;
-        if (_byteArrayRemaining == 0)
-        {
-            _byteArrayRemaining = -1;
-        }
+
         if (count == 0)
         {
             return 0;
@@ -456,62 +558,74 @@
 
     public Object readObject() throws JMSException
     {
-        byte wireType = readAndCheckType();
+        int position = _data.position();
+        byte wireType = readWireType();
         Object result = null;
-        switch (wireType)
+        try
         {
-            case BOOLEAN_TYPE:
-                checkAvailable(1);
-                result = readBooleanImpl();
-                break;
-            case BYTE_TYPE:
-                checkAvailable(1);
-                result = readByteImpl();
-                break;
-            case BYTEARRAY_TYPE:
-                checkAvailable(4);
-                int size = _data.getInt();
-                if (size == -1)
-                {
+            switch (wireType)
+            {
+                case BOOLEAN_TYPE:
+                    checkAvailable(1);
+                    result = readBooleanImpl();
+                    break;
+                case BYTE_TYPE:
+                    checkAvailable(1);
+                    result = readByteImpl();
+                    break;
+                case BYTEARRAY_TYPE:
+                    checkAvailable(4);
+                    int size = _data.getInt();
+                    if (size == -1)
+                    {
+                        result = null;
+                    }
+                    else
+                    {
+                        _byteArrayRemaining = size;
+                        result = new byte[size];
+                        readBytesImpl(new byte[size]);
+                    }
+                    break;
+                case SHORT_TYPE:
+                    checkAvailable(2);
+                    result = readShortImpl();
+                    break;
+                case CHAR_TYPE:
+                    checkAvailable(2);
+                    result = readCharImpl();
+                    break;
+                case INT_TYPE:
+                    checkAvailable(4);
+                    result = readIntImpl();
+                    break;
+                case LONG_TYPE:
+                    checkAvailable(8);
+                    result = readLongImpl();
+                    break;
+                case FLOAT_TYPE:
+                    checkAvailable(4);
+                    result = readFloatImpl();
+                    break;
+                case DOUBLE_TYPE:
+                    checkAvailable(8);
+                    result = readDoubleImpl();
+                    break;
+                case NULL_STRING_TYPE:
                     result = null;
-                }
-                else
-                {
-                    _byteArrayRemaining = size;
-                    result = new byte[size];
-                    readBytesImpl(new byte[size]);
-                }
-                break;
-            case SHORT_TYPE:
-                checkAvailable(2);
-                result = readShortImpl();
-                break;
-            case CHAR_TYPE:
-                checkAvailable(2);
-                result = readCharImpl();
-                break;
-            case INT_TYPE:
-                checkAvailable(4);
-                result = readIntImpl();
-                break;
-            case LONG_TYPE:
-                checkAvailable(8);
-                result = readLongImpl();
-                break;
-            case FLOAT_TYPE:
-                checkAvailable(4);
-                result = readFloatImpl();
-                break;
-            case DOUBLE_TYPE:
-                checkAvailable(8);
-                result = readDoubleImpl();
-                break;
-            case STRING_TYPE:
-                checkAvailable(1);
-                result = readStringImpl();
-                break;
+                    break;
+                case STRING_TYPE:
+                    checkAvailable(1);
+                    result = readStringImpl();
+                    break;
+            }
+            return result;
+        }
+        catch (RuntimeException e)
+        {
+            _data.position(position);
+            throw e;
         }
-        return result;
     }
 
     public void writeBoolean(boolean b) throws JMSException
@@ -564,18 +678,25 @@
 
     public void writeString(String string) throws JMSException
     {
-        writeTypeDiscriminator(STRING_TYPE);
-        try
+        if (string == null)
         {
-            _data.putString(string, Charset.forName("UTF-8").newEncoder());
-            // we must write the null terminator ourselves
-            _data.put((byte)0);
+            writeTypeDiscriminator(NULL_STRING_TYPE);
         }
-        catch (CharacterCodingException e)
+        else
         {
-            JMSException ex = new JMSException("Unable to encode string: " + e);
-            ex.setLinkedException(e);
-            throw ex;
+            writeTypeDiscriminator(STRING_TYPE);
+            try
+            {
+                _data.putString(string, Charset.forName("UTF-8").newEncoder());
+                // we must write the null terminator ourselves
+                _data.put((byte)0);
+            }
+            catch (CharacterCodingException e)
+            {
+                JMSException ex = new JMSException("Unable to encode string: " + e);
+                ex.setLinkedException(e);
+                throw ex;
+            }
         }
     }
 
@@ -601,11 +722,17 @@
     public void writeObject(Object object) throws JMSException
     {
         checkWritable();
+        Class clazz = null;
         if (object == null)
         {
-            throw new NullPointerException("Argument must not be null");
+            // string handles the output of null values
+            clazz = String.class;
+        }
+        else
+        {
+            clazz = object.getClass();
         }
-        Class clazz = object.getClass();
+
         if (clazz == Byte.class)
         {
             writeByte((Byte) object);

Modified: incubator/qpid/branches/new_persistence/java/client/src/old_test/java/org/apache/qpid/IBMPerfTest/JNDIBindConnectionFactory.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/new_persistence/java/client/src/old_test/java/org/apache/qpid/IBMPerfTest/JNDIBindConnectionFactory.java?view=diff&rev=489368&r1=489367&r2=489368
==============================================================================
--- incubator/qpid/branches/new_persistence/java/client/src/old_test/java/org/apache/qpid/IBMPerfTest/JNDIBindConnectionFactory.java (original)
+++ incubator/qpid/branches/new_persistence/java/client/src/old_test/java/org/apache/qpid/IBMPerfTest/JNDIBindConnectionFactory.java Thu Dec 21 06:24:38 2006
@@ -36,7 +36,7 @@
 {
 
     public static final String CONNECTION_FACTORY_BINDING = "amq.ConnectionFactory";
-    public static final String DEFAULT_PROVIDER_FILE_PATH = System.getProperty("java.io.tmpdir") + "/IBMPerfTestsJNDI";
+    public static final String DEFAULT_PROVIDER_FILE_PATH = System.getProperty("java.io.tmpdir") + File.separator + "IBMPerfTestsJNDI";
     public static final String PROVIDER_URL = "file://" + DEFAULT_PROVIDER_FILE_PATH;
     public static final String FSCONTEXT_FACTORY = "com.sun.jndi.fscontext.RefFSContextFactory";
     public static final String DEFAULT_CONNECTION_URL = "amqp://guest:guest@clientid/testpath?brokerlist='tcp://localhost:5672'";
@@ -154,7 +154,7 @@
                 }
                 catch (NamingException e)
                 {
-
+                    System.out.println("Operation failed: " + e);
                 }
 
                 // Perform the bind
@@ -169,11 +169,11 @@
             }
             catch (NamingException amqe)
             {
-
+                System.out.println("Operation failed: " + amqe);
             }
             catch (URLSyntaxException e)
             {
-
+                System.out.println("Operation failed: " + e);
             }
 
         }

Modified: incubator/qpid/branches/new_persistence/java/client/src/old_test/java/org/apache/qpid/IBMPerfTest/JNDIBindQueue.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/new_persistence/java/client/src/old_test/java/org/apache/qpid/IBMPerfTest/JNDIBindQueue.java?view=diff&rev=489368&r1=489367&r2=489368
==============================================================================
--- incubator/qpid/branches/new_persistence/java/client/src/old_test/java/org/apache/qpid/IBMPerfTest/JNDIBindQueue.java (original)
+++ incubator/qpid/branches/new_persistence/java/client/src/old_test/java/org/apache/qpid/IBMPerfTest/JNDIBindQueue.java Thu Dec 21 06:24:38 2006
@@ -20,24 +20,25 @@
  */
 package org.apache.qpid.IBMPerfTest;
 
-import org.apache.qpid.client.AMQConnectionFactory;
+import org.apache.log4j.Level;
+import org.apache.log4j.Logger;
 import org.apache.qpid.client.AMQConnection;
-import org.apache.qpid.client.AMQSession;
 import org.apache.qpid.client.AMQQueue;
-import org.apache.log4j.Logger;
-import org.apache.log4j.Level;
+import org.apache.qpid.client.AMQSession;
 
+import javax.jms.Connection;
+import javax.jms.JMSException;
+import javax.jms.Queue;
+import javax.jms.Session;
 import javax.naming.Context;
 import javax.naming.InitialContext;
 import javax.naming.NamingException;
-import javax.jms.*;
-import java.util.Hashtable;
 import java.io.File;
-import java.net.MalformedURLException;
+import java.util.Hashtable;
 
 public class JNDIBindQueue
-{    
-    public static final String DEFAULT_PROVIDER_FILE_PATH = System.getProperty("java.io.tmpdir") + "/IBMPerfTestsJNDI";
+{
+    public static final String DEFAULT_PROVIDER_FILE_PATH = System.getProperty("java.io.tmpdir") + File.separator + "IBMPerfTestsJNDI";
     public static final String PROVIDER_URL = "file://" + DEFAULT_PROVIDER_FILE_PATH;
     public static final String FSCONTEXT_FACTORY = "com.sun.jndi.fscontext.RefFSContextFactory";
 
@@ -98,7 +99,7 @@
             }
             catch (JMSException closeE)
             {
-
+                System.out.println("Connection closing failed: " + closeE);    
             }
         }
 

Modified: incubator/qpid/branches/new_persistence/java/client/src/old_test/java/org/apache/qpid/IBMPerfTest/JNDIBindTopic.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/new_persistence/java/client/src/old_test/java/org/apache/qpid/IBMPerfTest/JNDIBindTopic.java?view=diff&rev=489368&r1=489367&r2=489368
==============================================================================
--- incubator/qpid/branches/new_persistence/java/client/src/old_test/java/org/apache/qpid/IBMPerfTest/JNDIBindTopic.java (original)
+++ incubator/qpid/branches/new_persistence/java/client/src/old_test/java/org/apache/qpid/IBMPerfTest/JNDIBindTopic.java Thu Dec 21 06:24:38 2006
@@ -20,24 +20,25 @@
  */
 package org.apache.qpid.IBMPerfTest;
 
+import org.apache.log4j.Level;
+import org.apache.log4j.Logger;
 import org.apache.qpid.client.AMQConnection;
-import org.apache.qpid.client.AMQQueue;
 import org.apache.qpid.client.AMQSession;
 import org.apache.qpid.client.AMQTopic;
-import org.apache.log4j.Logger;
-import org.apache.log4j.Level;
 
-import javax.jms.*;
+import javax.jms.Connection;
+import javax.jms.JMSException;
+import javax.jms.Session;
+import javax.jms.Topic;
 import javax.naming.Context;
 import javax.naming.InitialContext;
 import javax.naming.NamingException;
-import java.util.Hashtable;
 import java.io.File;
-import java.net.MalformedURLException;
+import java.util.Hashtable;
 
 public class JNDIBindTopic
-{    
-    public static final String DEFAULT_PROVIDER_FILE_PATH = System.getProperty("java.io.tmpdir") + "/IBMPerfTestsJNDI";
+{
+    public static final String DEFAULT_PROVIDER_FILE_PATH = System.getProperty("java.io.tmpdir") + File.separator  + "IBMPerfTestsJNDI";
     public static final String PROVIDER_URL = "file://" + DEFAULT_PROVIDER_FILE_PATH;
 
     public static final String FSCONTEXT_FACTORY = "com.sun.jndi.fscontext.RefFSContextFactory";
@@ -99,11 +100,9 @@
             }
             catch (JMSException closeE)
             {
-
+                System.out.println("Operation failed: " + closeE);
             }
         }
-
-
     }
 
 

Modified: incubator/qpid/branches/new_persistence/java/client/src/test/java/org/apache/qpid/test/unit/client/message/StreamMessageTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/new_persistence/java/client/src/test/java/org/apache/qpid/test/unit/client/message/StreamMessageTest.java?view=diff&rev=489368&r1=489367&r2=489368
==============================================================================
--- incubator/qpid/branches/new_persistence/java/client/src/test/java/org/apache/qpid/test/unit/client/message/StreamMessageTest.java (original)
+++ incubator/qpid/branches/new_persistence/java/client/src/test/java/org/apache/qpid/test/unit/client/message/StreamMessageTest.java Thu Dec 21 06:24:38 2006
@@ -168,24 +168,6 @@
         }
     }
 
-    public void testWriteObjectThrowsNPE() throws Exception
-    {
-        try
-        {
-            JMSStreamMessage bm = TestMessageHelper.newJMSStreamMessage();
-            bm.writeObject(null);
-            fail("expected exception did not occur");
-        }
-        catch (NullPointerException n)
-        {
-            // ok
-        }
-        catch (Exception e)
-        {
-            fail("expected NullPointerException, got " + e);
-        }
-    }
-
     public void testReadBoolean() throws Exception
     {
         JMSStreamMessage bm = TestMessageHelper.newJMSStreamMessage();
@@ -221,9 +203,34 @@
         len = bm.readBytes(bytes);
         assertEquals(-1, len);
         len = bm.readBytes(bytes);
+        assertEquals(-1, len);
+        len = bm.readBytes(bytes);
         assertEquals(0, len);
     }
 
+    public void testReadBytesFollowedByPrimitive() throws Exception
+    {
+        JMSStreamMessage bm = TestMessageHelper.newJMSStreamMessage();
+        bm.writeBytes(new byte[]{2, 3, 4, 5, 6, 7, 8});
+        bm.writeBytes(new byte[]{2, 3, 4, 5, 6, 7});
+        bm.writeString("Foo");
+        bm.reset();
+        int len;
+        do
+        {
+            len = bm.readBytes(new byte[2]);
+        }
+        while (len == 2);
+
+        do
+        {
+            len = bm.readBytes(new byte[2]);
+        }
+        while (len == 2);
+
+        assertEquals("Foo", bm.readString());
+    }
+
     public void testReadMultipleByteArrays() throws Exception
     {
         JMSStreamMessage bm = TestMessageHelper.newJMSStreamMessage();
@@ -577,11 +584,11 @@
         bm = TestMessageHelper.newJMSStreamMessage();
         bm.writeString("2");
         bm.reset();
-        assertEquals((byte)2, bm.readByte());        
+        assertEquals((byte)2, bm.readByte());
         bm.reset();
         assertEquals((short)2, bm.readShort());
         bm.reset();
-        assertEquals((int)2, bm.readInt());
+        assertEquals(2, bm.readInt());
         bm.reset();
         assertEquals((long)2, bm.readLong());
         bm = TestMessageHelper.newJMSStreamMessage();
@@ -590,6 +597,16 @@
         assertEquals(5.7f, bm.readFloat());
         bm.reset();
         assertEquals(5.7d, bm.readDouble());
+    }
+
+    public void testNulls() throws Exception
+    {
+        JMSStreamMessage bm = TestMessageHelper.newJMSStreamMessage();
+        bm.writeString(null);
+        bm.writeObject(null);
+        bm.reset();
+        assertNull(bm.readObject());
+        assertNull(bm.readObject());
     }
 
     public static junit.framework.Test suite()

Modified: incubator/qpid/branches/new_persistence/java/systests/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/new_persistence/java/systests/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java?view=diff&rev=489368&r1=489367&r2=489368
==============================================================================
--- incubator/qpid/branches/new_persistence/java/systests/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java (original)
+++ incubator/qpid/branches/new_persistence/java/systests/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java Thu Dec 21 06:24:38 2006
@@ -94,25 +94,45 @@
 
     protected void routeAndTest(Message m, TestQueue... expected) throws AMQException
     {
-        routeAndTest(m, Arrays.asList(expected));
+        routeAndTest(m, false, Arrays.asList(expected));
+    }
+
+    protected void routeAndTest(Message m, boolean expectReturn, TestQueue... expected) throws AMQException
+    {
+        routeAndTest(m, expectReturn, Arrays.asList(expected));
     }
 
     protected void routeAndTest(Message m, List<TestQueue> expected) throws AMQException
     {
-        route(m);
-        for (TestQueue q : queues)
+        routeAndTest(m, false, expected);
+    }
+
+    protected void routeAndTest(Message m, boolean expectReturn, List<TestQueue> expected) throws AMQException
+    {
+        try
         {
-            if (expected.contains(q))
+            route(m);
+            assertFalse("Expected "+m+" to be returned due to manadatory flag, and lack of routing",expectReturn);
+            for (TestQueue q : queues)
             {
-                assertTrue("Expected " + m + " to be delivered to " + q, m.isInQueue(q));
-                //assert m.isInQueue(q) : "Expected " + m + " to be delivered to " + q;
-            }
-            else
-            {
-                assertFalse("Did not expect " + m + " to be delivered to " + q, m.isInQueue(q));
-                //assert !m.isInQueue(q) : "Did not expect " + m + " to be delivered to " + q;
+                if (expected.contains(q))
+                {
+                    assertTrue("Expected " + m + " to be delivered to " + q, m.isInQueue(q));
+                    //assert m.isInQueue(q) : "Expected " + m + " to be delivered to " + q;
+                }
+                else
+                {
+                    assertFalse("Did not expect " + m + " to be delivered to " + q, m.isInQueue(q));
+                    //assert !m.isInQueue(q) : "Did not expect " + m + " to be delivered to " + q;
+                }
             }
         }
+
+        catch (NoRouteException ex)
+        {
+            assertTrue("Expected "+m+" not to be returned",expectReturn);
+        }
+
     }
 
     static FieldTable getHeaders(String... entries)

Modified: incubator/qpid/branches/new_persistence/java/systests/src/test/java/org/apache/qpid/server/exchange/HeadersExchangeTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/new_persistence/java/systests/src/test/java/org/apache/qpid/server/exchange/HeadersExchangeTest.java?view=diff&rev=489368&r1=489367&r2=489368
==============================================================================
--- incubator/qpid/branches/new_persistence/java/systests/src/test/java/org/apache/qpid/server/exchange/HeadersExchangeTest.java (original)
+++ incubator/qpid/branches/new_persistence/java/systests/src/test/java/org/apache/qpid/server/exchange/HeadersExchangeTest.java Thu Dec 21 06:24:38 2006
@@ -23,6 +23,7 @@
 import org.apache.qpid.AMQException;
 import org.apache.qpid.server.registry.ApplicationRegistry;
 import org.apache.qpid.server.util.TestApplicationRegistry;
+import org.apache.qpid.framing.BasicPublishBody;
 
 public class HeadersExchangeTest extends AbstractHeadersExchangeTestBase
 {
@@ -52,6 +53,19 @@
         routeAndTest(new Message("Message5", "F0000=Aardvark", "F0001=Bear"),
                      q1, q2, q3, q4, q5, q6, q7, q8);
         routeAndTest(new Message("Message6", "F0002"));
+
+        Message m7 = new Message("Message7", "XXXXX");
+
+        BasicPublishBody pb7 = m7.getPublishBody();
+        pb7.mandatory = true;
+        routeAndTest(m7,true);
+
+        Message m8 = new Message("Message8", "F0000");
+        BasicPublishBody pb8 = m8.getPublishBody();
+        pb8.mandatory = true;
+        routeAndTest(m8,false,q1);
+
+
     }
 
     public void testAny() throws AMQException
@@ -69,6 +83,20 @@
         routeAndTest(new Message("Message4", "F0000", "F0001=Bear"), q1, q2, q3, q4, q6);
         routeAndTest(new Message("Message5", "F0000=Aardvark", "F0001=Bear"), q1, q2, q3, q4, q6);
         routeAndTest(new Message("Message6", "F0002"));
+    }
+
+    public void testMandatory() throws AMQException
+    {
+        TestQueue q1 = bindDefault("F0000");
+        Message m1 = new Message("Message1", "XXXXX");
+        Message m2 = new Message("Message2", "F0000");
+        BasicPublishBody pb1 = m1.getPublishBody();
+        pb1.mandatory = true;
+        BasicPublishBody pb2 = m1.getPublishBody();
+        pb2.mandatory = true;
+        routeAndTest(m1,true);
+
+
     }
 
     public static junit.framework.Test suite()