You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2006/12/13 19:20:12 UTC

svn commit: r486783 - in /incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client: AMQSession.java BasicMessageProducer.java message/AMQMessage.java message/AbstractJMSMessage.java message/JMSTextMessage.java

Author: rgreig
Date: Wed Dec 13 10:20:11 2006
New Revision: 486783

URL: http://svn.apache.org/viewvc?view=rev&rev=486783
Log:
QPID-179: Messages now have a hook for pre-send preparation, which in turn allows us to distinguish between null and empty-String text message bodies. The distinction itself is carried in a message property. Patch supplied by Rob Godfrey.

Modified:
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessage.java
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSTextMessage.java

Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java?view=diff&rev=486783&r1=486782&r2=486783
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java Wed Dec 13 10:20:11 2006
@@ -25,9 +25,9 @@
 import org.apache.qpid.AMQUndeliveredException;
 import org.apache.qpid.client.failover.FailoverSupport;
 import org.apache.qpid.client.message.AbstractJMSMessage;
+import org.apache.qpid.client.message.JMSStreamMessage;
 import org.apache.qpid.client.message.MessageFactoryRegistry;
 import org.apache.qpid.client.message.UnprocessedMessage;
-import org.apache.qpid.client.message.JMSStreamMessage;
 import org.apache.qpid.client.protocol.AMQProtocolHandler;
 import org.apache.qpid.client.util.FlowControllingBlockingQueue;
 import org.apache.qpid.framing.*;
@@ -38,7 +38,6 @@
 
 import javax.jms.*;
 import javax.jms.IllegalStateException;
-
 import java.io.Serializable;
 import java.text.MessageFormat;
 import java.util.ArrayList;
@@ -287,7 +286,7 @@
 
     public BytesMessage createBytesMessage() throws JMSException
     {
-        synchronized(_connection.getFailoverMutex())
+        synchronized (_connection.getFailoverMutex())
         {
             checkNotClosed();
             try
@@ -303,7 +302,7 @@
 
     public MapMessage createMapMessage() throws JMSException
     {
-        synchronized(_connection.getFailoverMutex())
+        synchronized (_connection.getFailoverMutex())
         {
             checkNotClosed();
             try
@@ -319,7 +318,7 @@
 
     public javax.jms.Message createMessage() throws JMSException
     {
-        synchronized(_connection.getFailoverMutex())
+        synchronized (_connection.getFailoverMutex())
         {
             checkNotClosed();
             try
@@ -335,7 +334,7 @@
 
     public ObjectMessage createObjectMessage() throws JMSException
     {
-        synchronized(_connection.getFailoverMutex())
+        synchronized (_connection.getFailoverMutex())
         {
             checkNotClosed();
             try
@@ -351,7 +350,7 @@
 
     public ObjectMessage createObjectMessage(Serializable object) throws JMSException
     {
-        synchronized(_connection.getFailoverMutex())
+        synchronized (_connection.getFailoverMutex())
         {
             checkNotClosed();
             try
@@ -403,7 +402,7 @@
 
     public TextMessage createTextMessage(String text) throws JMSException
     {
-        synchronized(_connection.getFailoverMutex())
+        synchronized (_connection.getFailoverMutex())
         {
             checkNotClosed();
             try
@@ -473,7 +472,7 @@
     {
         // We must close down all producers and consumers in an orderly fashion. This is the only method
         // that can be called from a different thread of control from the one controlling the session
-        synchronized(_connection.getFailoverMutex())
+        synchronized (_connection.getFailoverMutex())
         {
             //Ensure we only try and close an open session.
             if (!_closed.getAndSet(true))
@@ -493,7 +492,9 @@
                 }
                 catch (AMQException e)
                 {
-                    throw new JMSException("Error closing session: " + e);
+                    JMSException jmse = new JMSException("Error closing session: " + e);
+                    jmse.setLinkedException(e);
+                    throw jmse;
                 }
                 finally
                 {
@@ -536,7 +537,7 @@
      */
     public void closed(Throwable e)
     {
-        synchronized(_connection.getFailoverMutex())
+        synchronized (_connection.getFailoverMutex())
         {
             // An AMQException has an error code and message already and will be passed in when closure occurs as a
             // result of a channel close request
@@ -747,7 +748,7 @@
      */
     public QueueReceiver createQueueReceiver(Destination destination) throws JMSException
     {
-    	checkValidDestination(destination);
+        checkValidDestination(destination);
         AMQQueue dest = (AMQQueue) destination;
         BasicMessageConsumer consumer = (BasicMessageConsumer) createConsumer(destination);
         return new QueueReceiverAdaptor(dest, consumer);
@@ -763,7 +764,7 @@
      */
     public QueueReceiver createQueueReceiver(Destination destination, String messageSelector) throws JMSException
     {
-    	checkValidDestination(destination);
+        checkValidDestination(destination);
         AMQQueue dest = (AMQQueue) destination;
         BasicMessageConsumer consumer = (BasicMessageConsumer)
                 createConsumer(destination, messageSelector);
@@ -772,20 +773,20 @@
 
     public MessageConsumer createConsumer(Destination destination) throws JMSException
     {
-    	checkValidDestination(destination);
+        checkValidDestination(destination);
         return createConsumer(destination, _defaultPrefetchHighMark, _defaultPrefetchLowMark, false, false, null);
     }
 
     public MessageConsumer createConsumer(Destination destination, String messageSelector) throws JMSException
     {
-    	checkValidDestination(destination);
+        checkValidDestination(destination);
         return createConsumer(destination, _defaultPrefetchHighMark, _defaultPrefetchLowMark, false, false, messageSelector);
     }
 
     public MessageConsumer createConsumer(Destination destination, String messageSelector, boolean noLocal)
             throws JMSException
     {
-    	checkValidDestination(destination);
+        checkValidDestination(destination);
         return createConsumer(destination, _defaultPrefetchHighMark, _defaultPrefetchLowMark, noLocal, false, messageSelector);
     }
 
@@ -795,7 +796,7 @@
                                           boolean exclusive,
                                           String selector) throws JMSException
     {
-    	checkValidDestination(destination);
+        checkValidDestination(destination);
         return createConsumer(destination, prefetch, prefetch, noLocal, exclusive, selector, null);
     }
 
@@ -807,7 +808,7 @@
                                           boolean exclusive,
                                           String selector) throws JMSException
     {
-    	checkValidDestination(destination);
+        checkValidDestination(destination);
         return createConsumer(destination, prefetchHigh, prefetchLow, noLocal, exclusive, selector, null);
     }
 
@@ -818,7 +819,7 @@
                                           String selector,
                                           FieldTable rawSelector) throws JMSException
     {
-    	checkValidDestination(destination);
+        checkValidDestination(destination);
         return createConsumerImpl(destination, prefetch, prefetch, noLocal, exclusive,
                                   selector, rawSelector);
     }
@@ -831,7 +832,7 @@
                                           String selector,
                                           FieldTable rawSelector) throws JMSException
     {
-    	checkValidDestination(destination);
+        checkValidDestination(destination);
         return createConsumerImpl(destination, prefetchHigh, prefetchLow, noLocal, exclusive,
                                   selector, rawSelector);
     }
@@ -963,7 +964,7 @@
 
     public Queue createQueue(String queueName) throws JMSException
     {
-    	checkNotClosed();
+        checkNotClosed();
         if (queueName.indexOf('/') == -1)
         {
             return new AMQQueue(queueName);
@@ -993,7 +994,7 @@
      */
     public QueueReceiver createReceiver(Queue queue) throws JMSException
     {
-    	checkNotClosed();
+        checkNotClosed();
         AMQQueue dest = (AMQQueue) queue;
         BasicMessageConsumer consumer = (BasicMessageConsumer) createConsumer(dest);
         return new QueueReceiverAdaptor(dest, consumer);
@@ -1009,7 +1010,7 @@
      */
     public QueueReceiver createReceiver(Queue queue, String messageSelector) throws JMSException
     {
-    	checkNotClosed();
+        checkNotClosed();
         AMQQueue dest = (AMQQueue) queue;
         BasicMessageConsumer consumer = (BasicMessageConsumer)
                 createConsumer(dest, messageSelector);
@@ -1018,14 +1019,14 @@
 
     public QueueSender createSender(Queue queue) throws JMSException
     {
-    	checkNotClosed();
+        checkNotClosed();
         //return (QueueSender) createProducer(queue);
         return new QueueSenderAdapter(createProducer(queue), queue);
     }
 
     public Topic createTopic(String topicName) throws JMSException
     {
-    	checkNotClosed();
+        checkNotClosed();
 
         if (topicName.indexOf('/') == -1)
         {
@@ -1056,8 +1057,8 @@
      */
     public TopicSubscriber createSubscriber(Topic topic) throws JMSException
     {
-    	checkNotClosed();
-    	checkValidTopic(topic);
+        checkNotClosed();
+        checkValidTopic(topic);
         AMQTopic dest = new AMQTopic(topic.getTopicName());
         return new TopicSubscriberAdaptor(dest, (BasicMessageConsumer) createConsumer(dest));
     }
@@ -1073,8 +1074,8 @@
      */
     public TopicSubscriber createSubscriber(Topic topic, String messageSelector, boolean noLocal) throws JMSException
     {
-    	checkNotClosed();
-    	checkValidTopic(topic);
+        checkNotClosed();
+        checkValidTopic(topic);
         AMQTopic dest = new AMQTopic(topic.getTopicName());
         return new TopicSubscriberAdaptor(dest, (BasicMessageConsumer) createConsumer(dest, messageSelector, noLocal));
     }
@@ -1088,8 +1089,8 @@
      */
     public TopicSubscriber createDurableSubscriber(Topic topic, String name) throws JMSException
     {
-    	checkNotClosed();
-    	checkValidTopic(topic);
+        checkNotClosed();
+        checkValidTopic(topic);
         AMQTopic dest = new AMQTopic((AMQTopic) topic, _connection.getClientID(), name);
         return new TopicSubscriberAdaptor(dest, (BasicMessageConsumer) createConsumer(dest));
     }
@@ -1100,8 +1101,8 @@
     public TopicSubscriber createDurableSubscriber(Topic topic, String name, String messageSelector, boolean noLocal)
             throws JMSException
     {
-    	checkNotClosed();
-    	checkValidTopic(topic);
+        checkNotClosed();
+        checkValidTopic(topic);
         AMQTopic dest = new AMQTopic((AMQTopic) topic, _connection.getClientID(), name);
         BasicMessageConsumer consumer = (BasicMessageConsumer) createConsumer(dest, messageSelector, noLocal);
         return new TopicSubscriberAdaptor(dest, consumer);
@@ -1109,41 +1110,39 @@
 
     public TopicPublisher createPublisher(Topic topic) throws JMSException
     {
-    	checkNotClosed();
-    	checkValidTopic(topic);
-        //return (TopicPublisher) createProducer(topic);
-        return new TopicPublisherAdapter(createProducer(topic), topic);
+        checkNotClosed();
+        return new TopicPublisherAdapter((BasicMessageProducer) createProducer(topic), topic);
     }
 
     public QueueBrowser createBrowser(Queue queue) throws JMSException
     {
-    	checkNotClosed();
-    	checkValidQueue(queue);
+        checkNotClosed();
+        checkValidQueue(queue);
         throw new UnsupportedOperationException("Queue browsing not supported");
     }
 
     public QueueBrowser createBrowser(Queue queue, String messageSelector) throws JMSException
     {
-    	checkNotClosed();
-    	checkValidQueue(queue);
+        checkNotClosed();
+        checkValidQueue(queue);
         throw new UnsupportedOperationException("Queue browsing not supported");
     }
 
     public TemporaryQueue createTemporaryQueue() throws JMSException
     {
-    	checkNotClosed();
+        checkNotClosed();
         return new AMQTemporaryQueue();
     }
 
     public TemporaryTopic createTemporaryTopic() throws JMSException
     {
-    	checkNotClosed();
+        checkNotClosed();
         return new AMQTemporaryTopic();
     }
 
     public void unsubscribe(String name) throws JMSException
     {
-    	checkNotClosed();
+        checkNotClosed();
 
         //send a queue.delete for the subscription
         String queue = _connection.getClientID() + ":" + name;
@@ -1350,21 +1349,27 @@
     /*
      * I could have combined the last 3 methods, but this way it improves readability
      */
-    private void checkValidTopic(Topic topic) throws InvalidDestinationException{
-    	if (topic == null){
-    		throw new javax.jms.InvalidDestinationException("Invalid Topic");
-    	}
-    }
-
-    private void checkValidQueue(Queue queue) throws InvalidDestinationException{
-    	if (queue == null){
-    		throw new javax.jms.InvalidDestinationException("Invalid Queue");
-    	}
-    }
-
-    private void checkValidDestination(Destination destination) throws InvalidDestinationException{
-    	if (destination == null){
-    		throw new javax.jms.InvalidDestinationException("Invalid Queue");
-    	}
+    private void checkValidTopic(Topic topic) throws InvalidDestinationException
+    {
+        if (topic == null)
+        {
+            throw new javax.jms.InvalidDestinationException("Invalid Topic");
+        }
+    }
+
+    private void checkValidQueue(Queue queue) throws InvalidDestinationException
+    {
+        if (queue == null)
+        {
+            throw new javax.jms.InvalidDestinationException("Invalid Queue");
+        }
+    }
+
+    private void checkValidDestination(Destination destination) throws InvalidDestinationException
+    {
+        if (destination == null)
+        {
+            throw new javax.jms.InvalidDestinationException("Invalid Queue");
+        }
     }
 }

Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java?view=diff&rev=486783&r1=486782&r2=486783
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java Wed Dec 13 10:20:11 2006
@@ -7,9 +7,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -24,15 +24,10 @@
 import org.apache.mina.common.ByteBuffer;
 import org.apache.qpid.AMQException;
 import org.apache.qpid.client.message.AbstractJMSMessage;
-import org.apache.qpid.client.message.JMSBytesMessage;
 import org.apache.qpid.client.protocol.AMQProtocolHandler;
 import org.apache.qpid.framing.*;
 
-import javax.jms.DeliveryMode;
-import javax.jms.Destination;
-import javax.jms.InvalidDestinationException;
-import javax.jms.JMSException;
-import javax.jms.Message;
+import javax.jms.*;
 import java.io.UnsupportedEncodingException;
 
 public class BasicMessageProducer extends Closeable implements org.apache.qpid.jms.MessageProducer
@@ -103,6 +98,7 @@
     private final boolean _mandatory;
 
     private final boolean _waitUntilSent;
+    private static final ContentBody[] NO_CONTENT_BODIES = new ContentBody[0];
 
     protected BasicMessageProducer(AMQConnection connection, AMQDestination destination, boolean transacted,
                                    int channelId, AMQSession session, AMQProtocolHandler protocolHandler,
@@ -349,7 +345,7 @@
         {
             throw new JMSException("Unsupported destination class: " +
                                    (destination != null ? destination.getClass() : null));
-        }        
+        }
         declareDestination((AMQDestination)destination);
     }
 
@@ -382,6 +378,7 @@
             currentTime = System.currentTimeMillis();
             message.setJMSTimestamp(currentTime);
         }
+        message.prepareForSending();
         ByteBuffer payload = message.getData();
         BasicContentHeaderProperties contentHeaderProperties = message.getJmsContentHeaderProperties();
 
@@ -402,7 +399,7 @@
         contentHeaderProperties.setDeliveryMode((byte) deliveryMode);
         contentHeaderProperties.setPriority((byte) priority);
 
-        int size = payload.limit();
+        int size = (payload != null) ? payload.limit() : 0;
         ContentBody[] contentBodies = createContentBodies(payload);
         AMQFrame[] frames = new AMQFrame[2 + contentBodies.length];
         for (int i = 0; i < contentBodies.length; i++)
@@ -437,14 +434,11 @@
      */
     private ContentBody[] createContentBodies(ByteBuffer payload)
     {
-        if (payload == null)
+        if (payload == null || payload.remaining() == 0)
         {
-            return null;
-        }
-        else if (payload.remaining() == 0)
-        {
-            return new ContentBody[0];
+            return NO_CONTENT_BODIES;
         }
+
         // we substract one from the total frame maximum size to account for the end of frame marker in a body frame
         // (0xCE byte).
         int dataLength = payload.remaining();
@@ -485,31 +479,31 @@
         checkNotClosed();
         _encoding = encoding;
     }
-    
+
 	private void checkPreConditions() throws javax.jms.IllegalStateException, JMSException {
 		checkNotClosed();
-				
+
 		if(_session == null || _session.isClosed()){
 			throw new javax.jms.IllegalStateException("Invalid Session");
 		}
 	}
-	
+
 	private void checkInitialDestination(){
 		if(_destination == null){
 			throw new UnsupportedOperationException("Destination is null");
 		}
 	}
-	
+
 	private void checkDestination(Destination suppliedDestination) throws InvalidDestinationException{
 		if (_destination != null && suppliedDestination != null){
 			throw new UnsupportedOperationException("This message producer was created with a Destination, therefore you cannot use an unidentified Destination");
 		}
-		
+
 		if (suppliedDestination == null){
-			throw new InvalidDestinationException("Supplied Destination was invalid"); 
+			throw new InvalidDestinationException("Supplied Destination was invalid");
 		}
 	}
-	
+
 
 	public AMQSession getSession() {
 		return _session;

Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessage.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessage.java?view=diff&rev=486783&r1=486782&r2=486783
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessage.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessage.java Wed Dec 13 10:20:11 2006
@@ -7,9 +7,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -23,6 +23,8 @@
 import org.apache.qpid.framing.ContentHeaderProperties;
 import org.apache.qpid.client.AMQSession;
 
+import javax.jms.JMSException;
+
 public class AMQMessage
 {
     protected ContentHeaderProperties _contentHeaderProperties;
@@ -67,5 +69,13 @@
     public long getDeliveryTag()
     {
         return _deliveryTag;
-    }       
+    }
+
+    /**
+     * Invoked prior to sending the message. Allows the message to be modified if necessary before
+     * sending.
+     */
+    public void prepareForSending() throws JMSException
+    {
+    }
 }

Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java?view=diff&rev=486783&r1=486782&r2=486783
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java Wed Dec 13 10:20:11 2006
@@ -32,7 +32,6 @@
 import org.apache.qpid.client.JmsNotImplementedException;
 import org.apache.qpid.framing.BasicContentHeaderProperties;
 import org.apache.qpid.framing.FieldTable;
-import org.apache.qpid.framing.FieldTableFactory;
 
 import javax.jms.Destination;
 import javax.jms.JMSException;
@@ -40,7 +39,6 @@
 import javax.jms.MessageNotWriteableException;
 import java.util.Collections;
 import java.util.Enumeration;
-import java.util.Iterator;
 import java.util.Map;
 
 public abstract class AbstractJMSMessage extends AMQMessage implements javax.jms.Message
@@ -257,13 +255,6 @@
     public boolean getBooleanProperty(String propertyName) throws JMSException
     {
         checkPropertyName(propertyName);
-
-        if (getJmsContentHeaderProperties() == null)
-        {
-            System.out.println("HEADERS ARE NULL");
-        }
-
-
         return getJmsContentHeaderProperties().getHeaders().getBoolean(propertyName);
     }
 
@@ -383,6 +374,12 @@
         getJmsContentHeaderProperties().getHeaders().setObject(propertyName, object);
     }
 
+    protected void removeProperty(String propertyName) throws JMSException
+    {
+        checkPropertyName(propertyName);
+        getJmsContentHeaderProperties().getHeaders().remove(propertyName);
+    }
+
     public void acknowledge() throws JMSException
     {
         // the JMS 1.1 spec says in section 3.6 that calls to acknowledge are ignored when client acknowledge
@@ -468,31 +465,6 @@
 
         // Call to ensure that the it has been set.
         getJmsContentHeaderProperties().getHeaders();
-    }
-
-    public FieldTable populateHeadersFromMessageProperties()
-    {
-        //
-        // We need to convert every property into a String representation
-        // Note that type information is preserved in the property name
-        //
-        final FieldTable table = FieldTableFactory.newFieldTable();
-        final Iterator entries = getJmsContentHeaderProperties().getHeaders().entrySet().iterator();
-        while (entries.hasNext())
-        {
-            final Map.Entry entry = (Map.Entry) entries.next();
-            final String propertyName = (String) entry.getKey();
-            if (propertyName == null)
-            {
-                continue;
-            }
-            else
-            {
-                table.put(propertyName, entry.getValue().toString());
-            }
-        }
-        return table;
-
     }
 
     public BasicContentHeaderProperties getJmsContentHeaderProperties()

Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSTextMessage.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSTextMessage.java?view=diff&rev=486783&r1=486782&r2=486783
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSTextMessage.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSTextMessage.java Wed Dec 13 10:20:11 2006
@@ -7,9 +7,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -35,6 +35,11 @@
 
     private String _decodedValue;
 
+    /**
+     * This constant represents the name of a property that is set when the message payload is null.
+     */
+    private static final String PAYLOAD_NULL_PROPERTY = "JMS_QPID_NULL";
+
     JMSTextMessage() throws JMSException
     {
         this(null, null);
@@ -91,31 +96,34 @@
         return MIME_TYPE;
     }
 
-    public void setText(String string) throws JMSException
+    public void setText(String text) throws JMSException
     {
         checkWritable();
-        
+
         clearBody();
         try
         {
-            _data = ByteBuffer.allocate(string.length());
-            _data.limit(string.length());
-            //_data.sweep();
-            _data.setAutoExpand(true);
-            if (getJmsContentHeaderProperties().getEncoding() == null)
-            {
-                _data.put(string.getBytes());
-            }
-            else
-            {
-                _data.put(string.getBytes(getJmsContentHeaderProperties().getEncoding()));
+            if (text != null)
+            {                
+                _data = ByteBuffer.allocate(text.length());
+                _data.limit(text.length()) ;
+                //_data.sweep();
+                _data.setAutoExpand(true);
+                if (getJmsContentHeaderProperties().getEncoding() == null)
+                {
+                    _data.put(text.getBytes());
+                }
+                else
+                {
+                    _data.put(text.getBytes(getJmsContentHeaderProperties().getEncoding()));
+                }
             }
-            _decodedValue = string;
+            _decodedValue = text;
         }
         catch (UnsupportedEncodingException e)
         {
             // should never occur
-            JMSException jmse = new JMSException("Unable to decode string data");
+            JMSException jmse = new JMSException("Unable to decode text data");
             jmse.setLinkedException(e);
         }
     }
@@ -133,6 +141,11 @@
         else
         {
             _data.rewind();
+
+            if (propertyExists(PAYLOAD_NULL_PROPERTY) && getBooleanProperty(PAYLOAD_NULL_PROPERTY))
+            {
+                return null;
+            }
             if (getJmsContentHeaderProperties().getEncoding() != null)
             {
                 try
@@ -160,6 +173,20 @@
                 }
             }
             return _decodedValue;
+        }
+    }
+
+    @Override
+    public void prepareForSending() throws JMSException
+    {
+        super.prepareForSending();
+        if (_data == null)
+        {
+            setBooleanProperty(PAYLOAD_NULL_PROPERTY, true);
+        }
+        else
+        {
+            removeProperty(PAYLOAD_NULL_PROPERTY);
         }
     }
 }