You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2006/12/13 19:20:12 UTC
svn commit: r486783 - in
/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client:
AMQSession.java BasicMessageProducer.java message/AMQMessage.java
message/AbstractJMSMessage.java message/JMSTextMessage.java
Author: rgreig
Date: Wed Dec 13 10:20:11 2006
New Revision: 486783
URL: http://svn.apache.org/viewvc?view=rev&rev=486783
Log:
QPID-179 Now has hook for pre-send preparation of message which in turn allows us to handle the distinction between null and empty String text message bodies. Actual distinction is carried in a message property. Patch supplied by Rob Godfrey.
Modified:
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessage.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSTextMessage.java
Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java?view=diff&rev=486783&r1=486782&r2=486783
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java Wed Dec 13 10:20:11 2006
@@ -25,9 +25,9 @@
import org.apache.qpid.AMQUndeliveredException;
import org.apache.qpid.client.failover.FailoverSupport;
import org.apache.qpid.client.message.AbstractJMSMessage;
+import org.apache.qpid.client.message.JMSStreamMessage;
import org.apache.qpid.client.message.MessageFactoryRegistry;
import org.apache.qpid.client.message.UnprocessedMessage;
-import org.apache.qpid.client.message.JMSStreamMessage;
import org.apache.qpid.client.protocol.AMQProtocolHandler;
import org.apache.qpid.client.util.FlowControllingBlockingQueue;
import org.apache.qpid.framing.*;
@@ -38,7 +38,6 @@
import javax.jms.*;
import javax.jms.IllegalStateException;
-
import java.io.Serializable;
import java.text.MessageFormat;
import java.util.ArrayList;
@@ -287,7 +286,7 @@
public BytesMessage createBytesMessage() throws JMSException
{
- synchronized(_connection.getFailoverMutex())
+ synchronized (_connection.getFailoverMutex())
{
checkNotClosed();
try
@@ -303,7 +302,7 @@
public MapMessage createMapMessage() throws JMSException
{
- synchronized(_connection.getFailoverMutex())
+ synchronized (_connection.getFailoverMutex())
{
checkNotClosed();
try
@@ -319,7 +318,7 @@
public javax.jms.Message createMessage() throws JMSException
{
- synchronized(_connection.getFailoverMutex())
+ synchronized (_connection.getFailoverMutex())
{
checkNotClosed();
try
@@ -335,7 +334,7 @@
public ObjectMessage createObjectMessage() throws JMSException
{
- synchronized(_connection.getFailoverMutex())
+ synchronized (_connection.getFailoverMutex())
{
checkNotClosed();
try
@@ -351,7 +350,7 @@
public ObjectMessage createObjectMessage(Serializable object) throws JMSException
{
- synchronized(_connection.getFailoverMutex())
+ synchronized (_connection.getFailoverMutex())
{
checkNotClosed();
try
@@ -403,7 +402,7 @@
public TextMessage createTextMessage(String text) throws JMSException
{
- synchronized(_connection.getFailoverMutex())
+ synchronized (_connection.getFailoverMutex())
{
checkNotClosed();
try
@@ -473,7 +472,7 @@
{
// We must close down all producers and consumers in an orderly fashion. This is the only method
// that can be called from a different thread of control from the one controlling the session
- synchronized(_connection.getFailoverMutex())
+ synchronized (_connection.getFailoverMutex())
{
//Ensure we only try and close an open session.
if (!_closed.getAndSet(true))
@@ -493,7 +492,9 @@
}
catch (AMQException e)
{
- throw new JMSException("Error closing session: " + e);
+ JMSException jmse = new JMSException("Error closing session: " + e);
+ jmse.setLinkedException(e);
+ throw jmse;
}
finally
{
@@ -536,7 +537,7 @@
*/
public void closed(Throwable e)
{
- synchronized(_connection.getFailoverMutex())
+ synchronized (_connection.getFailoverMutex())
{
// An AMQException has an error code and message already and will be passed in when closure occurs as a
// result of a channel close request
@@ -747,7 +748,7 @@
*/
public QueueReceiver createQueueReceiver(Destination destination) throws JMSException
{
- checkValidDestination(destination);
+ checkValidDestination(destination);
AMQQueue dest = (AMQQueue) destination;
BasicMessageConsumer consumer = (BasicMessageConsumer) createConsumer(destination);
return new QueueReceiverAdaptor(dest, consumer);
@@ -763,7 +764,7 @@
*/
public QueueReceiver createQueueReceiver(Destination destination, String messageSelector) throws JMSException
{
- checkValidDestination(destination);
+ checkValidDestination(destination);
AMQQueue dest = (AMQQueue) destination;
BasicMessageConsumer consumer = (BasicMessageConsumer)
createConsumer(destination, messageSelector);
@@ -772,20 +773,20 @@
public MessageConsumer createConsumer(Destination destination) throws JMSException
{
- checkValidDestination(destination);
+ checkValidDestination(destination);
return createConsumer(destination, _defaultPrefetchHighMark, _defaultPrefetchLowMark, false, false, null);
}
public MessageConsumer createConsumer(Destination destination, String messageSelector) throws JMSException
{
- checkValidDestination(destination);
+ checkValidDestination(destination);
return createConsumer(destination, _defaultPrefetchHighMark, _defaultPrefetchLowMark, false, false, messageSelector);
}
public MessageConsumer createConsumer(Destination destination, String messageSelector, boolean noLocal)
throws JMSException
{
- checkValidDestination(destination);
+ checkValidDestination(destination);
return createConsumer(destination, _defaultPrefetchHighMark, _defaultPrefetchLowMark, noLocal, false, messageSelector);
}
@@ -795,7 +796,7 @@
boolean exclusive,
String selector) throws JMSException
{
- checkValidDestination(destination);
+ checkValidDestination(destination);
return createConsumer(destination, prefetch, prefetch, noLocal, exclusive, selector, null);
}
@@ -807,7 +808,7 @@
boolean exclusive,
String selector) throws JMSException
{
- checkValidDestination(destination);
+ checkValidDestination(destination);
return createConsumer(destination, prefetchHigh, prefetchLow, noLocal, exclusive, selector, null);
}
@@ -818,7 +819,7 @@
String selector,
FieldTable rawSelector) throws JMSException
{
- checkValidDestination(destination);
+ checkValidDestination(destination);
return createConsumerImpl(destination, prefetch, prefetch, noLocal, exclusive,
selector, rawSelector);
}
@@ -831,7 +832,7 @@
String selector,
FieldTable rawSelector) throws JMSException
{
- checkValidDestination(destination);
+ checkValidDestination(destination);
return createConsumerImpl(destination, prefetchHigh, prefetchLow, noLocal, exclusive,
selector, rawSelector);
}
@@ -963,7 +964,7 @@
public Queue createQueue(String queueName) throws JMSException
{
- checkNotClosed();
+ checkNotClosed();
if (queueName.indexOf('/') == -1)
{
return new AMQQueue(queueName);
@@ -993,7 +994,7 @@
*/
public QueueReceiver createReceiver(Queue queue) throws JMSException
{
- checkNotClosed();
+ checkNotClosed();
AMQQueue dest = (AMQQueue) queue;
BasicMessageConsumer consumer = (BasicMessageConsumer) createConsumer(dest);
return new QueueReceiverAdaptor(dest, consumer);
@@ -1009,7 +1010,7 @@
*/
public QueueReceiver createReceiver(Queue queue, String messageSelector) throws JMSException
{
- checkNotClosed();
+ checkNotClosed();
AMQQueue dest = (AMQQueue) queue;
BasicMessageConsumer consumer = (BasicMessageConsumer)
createConsumer(dest, messageSelector);
@@ -1018,14 +1019,14 @@
public QueueSender createSender(Queue queue) throws JMSException
{
- checkNotClosed();
+ checkNotClosed();
//return (QueueSender) createProducer(queue);
return new QueueSenderAdapter(createProducer(queue), queue);
}
public Topic createTopic(String topicName) throws JMSException
{
- checkNotClosed();
+ checkNotClosed();
if (topicName.indexOf('/') == -1)
{
@@ -1056,8 +1057,8 @@
*/
public TopicSubscriber createSubscriber(Topic topic) throws JMSException
{
- checkNotClosed();
- checkValidTopic(topic);
+ checkNotClosed();
+ checkValidTopic(topic);
AMQTopic dest = new AMQTopic(topic.getTopicName());
return new TopicSubscriberAdaptor(dest, (BasicMessageConsumer) createConsumer(dest));
}
@@ -1073,8 +1074,8 @@
*/
public TopicSubscriber createSubscriber(Topic topic, String messageSelector, boolean noLocal) throws JMSException
{
- checkNotClosed();
- checkValidTopic(topic);
+ checkNotClosed();
+ checkValidTopic(topic);
AMQTopic dest = new AMQTopic(topic.getTopicName());
return new TopicSubscriberAdaptor(dest, (BasicMessageConsumer) createConsumer(dest, messageSelector, noLocal));
}
@@ -1088,8 +1089,8 @@
*/
public TopicSubscriber createDurableSubscriber(Topic topic, String name) throws JMSException
{
- checkNotClosed();
- checkValidTopic(topic);
+ checkNotClosed();
+ checkValidTopic(topic);
AMQTopic dest = new AMQTopic((AMQTopic) topic, _connection.getClientID(), name);
return new TopicSubscriberAdaptor(dest, (BasicMessageConsumer) createConsumer(dest));
}
@@ -1100,8 +1101,8 @@
public TopicSubscriber createDurableSubscriber(Topic topic, String name, String messageSelector, boolean noLocal)
throws JMSException
{
- checkNotClosed();
- checkValidTopic(topic);
+ checkNotClosed();
+ checkValidTopic(topic);
AMQTopic dest = new AMQTopic((AMQTopic) topic, _connection.getClientID(), name);
BasicMessageConsumer consumer = (BasicMessageConsumer) createConsumer(dest, messageSelector, noLocal);
return new TopicSubscriberAdaptor(dest, consumer);
@@ -1109,41 +1110,39 @@
public TopicPublisher createPublisher(Topic topic) throws JMSException
{
- checkNotClosed();
- checkValidTopic(topic);
- //return (TopicPublisher) createProducer(topic);
- return new TopicPublisherAdapter(createProducer(topic), topic);
+ checkNotClosed();
+ return new TopicPublisherAdapter((BasicMessageProducer) createProducer(topic), topic);
}
public QueueBrowser createBrowser(Queue queue) throws JMSException
{
- checkNotClosed();
- checkValidQueue(queue);
+ checkNotClosed();
+ checkValidQueue(queue);
throw new UnsupportedOperationException("Queue browsing not supported");
}
public QueueBrowser createBrowser(Queue queue, String messageSelector) throws JMSException
{
- checkNotClosed();
- checkValidQueue(queue);
+ checkNotClosed();
+ checkValidQueue(queue);
throw new UnsupportedOperationException("Queue browsing not supported");
}
public TemporaryQueue createTemporaryQueue() throws JMSException
{
- checkNotClosed();
+ checkNotClosed();
return new AMQTemporaryQueue();
}
public TemporaryTopic createTemporaryTopic() throws JMSException
{
- checkNotClosed();
+ checkNotClosed();
return new AMQTemporaryTopic();
}
public void unsubscribe(String name) throws JMSException
{
- checkNotClosed();
+ checkNotClosed();
//send a queue.delete for the subscription
String queue = _connection.getClientID() + ":" + name;
@@ -1350,21 +1349,27 @@
/*
* I could have combined the last 3 methods, but this way it improves readability
*/
- private void checkValidTopic(Topic topic) throws InvalidDestinationException{
- if (topic == null){
- throw new javax.jms.InvalidDestinationException("Invalid Topic");
- }
- }
-
- private void checkValidQueue(Queue queue) throws InvalidDestinationException{
- if (queue == null){
- throw new javax.jms.InvalidDestinationException("Invalid Queue");
- }
- }
-
- private void checkValidDestination(Destination destination) throws InvalidDestinationException{
- if (destination == null){
- throw new javax.jms.InvalidDestinationException("Invalid Queue");
- }
+ private void checkValidTopic(Topic topic) throws InvalidDestinationException
+ {
+ if (topic == null)
+ {
+ throw new javax.jms.InvalidDestinationException("Invalid Topic");
+ }
+ }
+
+ private void checkValidQueue(Queue queue) throws InvalidDestinationException
+ {
+ if (queue == null)
+ {
+ throw new javax.jms.InvalidDestinationException("Invalid Queue");
+ }
+ }
+
+ private void checkValidDestination(Destination destination) throws InvalidDestinationException
+ {
+ if (destination == null)
+ {
+ throw new javax.jms.InvalidDestinationException("Invalid Queue");
+ }
}
}
Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java?view=diff&rev=486783&r1=486782&r2=486783
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java Wed Dec 13 10:20:11 2006
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -24,15 +24,10 @@
import org.apache.mina.common.ByteBuffer;
import org.apache.qpid.AMQException;
import org.apache.qpid.client.message.AbstractJMSMessage;
-import org.apache.qpid.client.message.JMSBytesMessage;
import org.apache.qpid.client.protocol.AMQProtocolHandler;
import org.apache.qpid.framing.*;
-import javax.jms.DeliveryMode;
-import javax.jms.Destination;
-import javax.jms.InvalidDestinationException;
-import javax.jms.JMSException;
-import javax.jms.Message;
+import javax.jms.*;
import java.io.UnsupportedEncodingException;
public class BasicMessageProducer extends Closeable implements org.apache.qpid.jms.MessageProducer
@@ -103,6 +98,7 @@
private final boolean _mandatory;
private final boolean _waitUntilSent;
+ private static final ContentBody[] NO_CONTENT_BODIES = new ContentBody[0];
protected BasicMessageProducer(AMQConnection connection, AMQDestination destination, boolean transacted,
int channelId, AMQSession session, AMQProtocolHandler protocolHandler,
@@ -349,7 +345,7 @@
{
throw new JMSException("Unsupported destination class: " +
(destination != null ? destination.getClass() : null));
- }
+ }
declareDestination((AMQDestination)destination);
}
@@ -382,6 +378,7 @@
currentTime = System.currentTimeMillis();
message.setJMSTimestamp(currentTime);
}
+ message.prepareForSending();
ByteBuffer payload = message.getData();
BasicContentHeaderProperties contentHeaderProperties = message.getJmsContentHeaderProperties();
@@ -402,7 +399,7 @@
contentHeaderProperties.setDeliveryMode((byte) deliveryMode);
contentHeaderProperties.setPriority((byte) priority);
- int size = payload.limit();
+ int size = (payload != null) ? payload.limit() : 0;
ContentBody[] contentBodies = createContentBodies(payload);
AMQFrame[] frames = new AMQFrame[2 + contentBodies.length];
for (int i = 0; i < contentBodies.length; i++)
@@ -437,14 +434,11 @@
*/
private ContentBody[] createContentBodies(ByteBuffer payload)
{
- if (payload == null)
+ if (payload == null || payload.remaining() == 0)
{
- return null;
- }
- else if (payload.remaining() == 0)
- {
- return new ContentBody[0];
+ return NO_CONTENT_BODIES;
}
+
// we substract one from the total frame maximum size to account for the end of frame marker in a body frame
// (0xCE byte).
int dataLength = payload.remaining();
@@ -485,31 +479,31 @@
checkNotClosed();
_encoding = encoding;
}
-
+
private void checkPreConditions() throws javax.jms.IllegalStateException, JMSException {
checkNotClosed();
-
+
if(_session == null || _session.isClosed()){
throw new javax.jms.IllegalStateException("Invalid Session");
}
}
-
+
private void checkInitialDestination(){
if(_destination == null){
throw new UnsupportedOperationException("Destination is null");
}
}
-
+
private void checkDestination(Destination suppliedDestination) throws InvalidDestinationException{
if (_destination != null && suppliedDestination != null){
throw new UnsupportedOperationException("This message producer was created with a Destination, therefore you cannot use an unidentified Destination");
}
-
+
if (suppliedDestination == null){
- throw new InvalidDestinationException("Supplied Destination was invalid");
+ throw new InvalidDestinationException("Supplied Destination was invalid");
}
}
-
+
public AMQSession getSession() {
return _session;
Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessage.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessage.java?view=diff&rev=486783&r1=486782&r2=486783
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessage.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessage.java Wed Dec 13 10:20:11 2006
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -23,6 +23,8 @@
import org.apache.qpid.framing.ContentHeaderProperties;
import org.apache.qpid.client.AMQSession;
+import javax.jms.JMSException;
+
public class AMQMessage
{
protected ContentHeaderProperties _contentHeaderProperties;
@@ -67,5 +69,13 @@
public long getDeliveryTag()
{
return _deliveryTag;
- }
+ }
+
+ /**
+ * Invoked prior to sending the message. Allows the message to be modified if necessary before
+ * sending.
+ */
+ public void prepareForSending() throws JMSException
+ {
+ }
}
Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java?view=diff&rev=486783&r1=486782&r2=486783
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java Wed Dec 13 10:20:11 2006
@@ -32,7 +32,6 @@
import org.apache.qpid.client.JmsNotImplementedException;
import org.apache.qpid.framing.BasicContentHeaderProperties;
import org.apache.qpid.framing.FieldTable;
-import org.apache.qpid.framing.FieldTableFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
@@ -40,7 +39,6 @@
import javax.jms.MessageNotWriteableException;
import java.util.Collections;
import java.util.Enumeration;
-import java.util.Iterator;
import java.util.Map;
public abstract class AbstractJMSMessage extends AMQMessage implements javax.jms.Message
@@ -257,13 +255,6 @@
public boolean getBooleanProperty(String propertyName) throws JMSException
{
checkPropertyName(propertyName);
-
- if (getJmsContentHeaderProperties() == null)
- {
- System.out.println("HEADERS ARE NULL");
- }
-
-
return getJmsContentHeaderProperties().getHeaders().getBoolean(propertyName);
}
@@ -383,6 +374,12 @@
getJmsContentHeaderProperties().getHeaders().setObject(propertyName, object);
}
+ protected void removeProperty(String propertyName) throws JMSException
+ {
+ checkPropertyName(propertyName);
+ getJmsContentHeaderProperties().getHeaders().remove(propertyName);
+ }
+
public void acknowledge() throws JMSException
{
// the JMS 1.1 spec says in section 3.6 that calls to acknowledge are ignored when client acknowledge
@@ -468,31 +465,6 @@
// Call to ensure that the it has been set.
getJmsContentHeaderProperties().getHeaders();
- }
-
- public FieldTable populateHeadersFromMessageProperties()
- {
- //
- // We need to convert every property into a String representation
- // Note that type information is preserved in the property name
- //
- final FieldTable table = FieldTableFactory.newFieldTable();
- final Iterator entries = getJmsContentHeaderProperties().getHeaders().entrySet().iterator();
- while (entries.hasNext())
- {
- final Map.Entry entry = (Map.Entry) entries.next();
- final String propertyName = (String) entry.getKey();
- if (propertyName == null)
- {
- continue;
- }
- else
- {
- table.put(propertyName, entry.getValue().toString());
- }
- }
- return table;
-
}
public BasicContentHeaderProperties getJmsContentHeaderProperties()
Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSTextMessage.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSTextMessage.java?view=diff&rev=486783&r1=486782&r2=486783
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSTextMessage.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSTextMessage.java Wed Dec 13 10:20:11 2006
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -35,6 +35,11 @@
private String _decodedValue;
+ /**
+ * This constant represents the name of a property that is set when the message payload is null.
+ */
+ private static final String PAYLOAD_NULL_PROPERTY = "JMS_QPID_NULL";
+
JMSTextMessage() throws JMSException
{
this(null, null);
@@ -91,31 +96,34 @@
return MIME_TYPE;
}
- public void setText(String string) throws JMSException
+ public void setText(String text) throws JMSException
{
checkWritable();
-
+
clearBody();
try
{
- _data = ByteBuffer.allocate(string.length());
- _data.limit(string.length());
- //_data.sweep();
- _data.setAutoExpand(true);
- if (getJmsContentHeaderProperties().getEncoding() == null)
- {
- _data.put(string.getBytes());
- }
- else
- {
- _data.put(string.getBytes(getJmsContentHeaderProperties().getEncoding()));
+ if (text != null)
+ {
+ _data = ByteBuffer.allocate(text.length());
+ _data.limit(text.length()) ;
+ //_data.sweep();
+ _data.setAutoExpand(true);
+ if (getJmsContentHeaderProperties().getEncoding() == null)
+ {
+ _data.put(text.getBytes());
+ }
+ else
+ {
+ _data.put(text.getBytes(getJmsContentHeaderProperties().getEncoding()));
+ }
}
- _decodedValue = string;
+ _decodedValue = text;
}
catch (UnsupportedEncodingException e)
{
// should never occur
- JMSException jmse = new JMSException("Unable to decode string data");
+ JMSException jmse = new JMSException("Unable to decode text data");
jmse.setLinkedException(e);
}
}
@@ -133,6 +141,11 @@
else
{
_data.rewind();
+
+ if (propertyExists(PAYLOAD_NULL_PROPERTY) && getBooleanProperty(PAYLOAD_NULL_PROPERTY))
+ {
+ return null;
+ }
if (getJmsContentHeaderProperties().getEncoding() != null)
{
try
@@ -160,6 +173,20 @@
}
}
return _decodedValue;
+ }
+ }
+
+ @Override
+ public void prepareForSending() throws JMSException
+ {
+ super.prepareForSending();
+ if (_data == null)
+ {
+ setBooleanProperty(PAYLOAD_NULL_PROPERTY, true);
+ }
+ else
+ {
+ removeProperty(PAYLOAD_NULL_PROPERTY);
}
}
}