You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kg...@apache.org on 2011/09/19 17:13:38 UTC

svn commit: r1172657 [13/21] - in /qpid/branches/qpid-3346/qpid: ./ cpp/ cpp/bindings/ cpp/bindings/qmf2/examples/cpp/ cpp/bindings/qpid/dotnet/ cpp/bindings/qpid/dotnet/examples/csharp.direct.receiver/Properties/ cpp/bindings/qpid/dotnet/examples/csha...

Modified: qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/client/XAResourceImpl.java
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/client/XAResourceImpl.java?rev=1172657&r1=1172656&r2=1172657&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/client/XAResourceImpl.java (original)
+++ qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/client/XAResourceImpl.java Mon Sep 19 15:13:18 2011
@@ -21,10 +21,14 @@ import javax.transaction.xa.XAException;
 import javax.transaction.xa.XAResource;
 import javax.transaction.xa.Xid;
 
-import org.apache.qpid.AMQInvalidArgumentException;
 import org.apache.qpid.dtx.XidImpl;
-import org.apache.qpid.transport.*;
-
+import org.apache.qpid.transport.DtxXaStatus;
+import org.apache.qpid.transport.ExecutionErrorCode;
+import org.apache.qpid.transport.Future;
+import org.apache.qpid.transport.Option;
+import org.apache.qpid.transport.RecoverResult;
+import org.apache.qpid.transport.SessionException;
+import org.apache.qpid.transport.XaResult;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -211,9 +215,28 @@ public class XAResourceImpl implements X
      * @throws XAException An error has occurred. Possible exception values are XAER_RMERR, XAER_RMFAIL.
      */
     public boolean isSameRM(XAResource xaResource) throws XAException
-    {
-        // TODO : get the server identity of xaResource and compare it with our own one
-        return false;
+    {       
+        if(this == xaResource)
+        {
+            return true;            
+        }       
+        if(!(xaResource instanceof XAResourceImpl))
+        {
+            return false;           
+        }
+        
+        XAResourceImpl other = (XAResourceImpl)xaResource;
+
+        String myUUID = ((AMQSession_0_10)_xaSession).getAMQConnection().getBrokerUUID();
+        String otherUUID = ((AMQSession_0_10)other._xaSession).getAMQConnection().getBrokerUUID();
+        
+        if(_logger.isDebugEnabled())
+        {
+            _logger.debug("Comparing my UUID " + myUUID + " with other UUID " + otherUUID);
+        }
+        
+        return (myUUID != null && otherUUID != null && myUUID.equals(otherUUID));
+                
     }
 
     /**

Modified: qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/client/XASessionImpl.java
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/client/XASessionImpl.java?rev=1172657&r1=1172656&r2=1172657&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/client/XASessionImpl.java (original)
+++ qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/client/XASessionImpl.java Mon Sep 19 15:13:18 2011
@@ -52,7 +52,7 @@ public class XASessionImpl extends AMQSe
     {
         super(qpidConnection, con, channelId, false,  // this is not a transacted session
               Session.AUTO_ACKNOWLEDGE, // the ack mode is transacted
-              MessageFactoryRegistry.newDefaultRegistry(), defaultPrefetchHigh, defaultPrefetchLow);
+              MessageFactoryRegistry.newDefaultRegistry(), defaultPrefetchHigh, defaultPrefetchLow,null);
         createSession();
         _xaResource = new XAResourceImpl(this);
     }

Modified: qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/client/failover/FailoverRetrySupport.java
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/client/failover/FailoverRetrySupport.java?rev=1172657&r1=1172656&r2=1172657&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/client/failover/FailoverRetrySupport.java (original)
+++ qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/client/failover/FailoverRetrySupport.java Mon Sep 19 15:13:18 2011
@@ -59,8 +59,8 @@ import org.slf4j.LoggerFactory;
  * <tr><td> Automatically retry the continuation accross fail-overs until it succeeds, or raises an exception.
  * </table>
  *
- * @todo Another continuation. Could use an interface Continuation (as described in other todos, for example, see
- *      {@link org.apache.qpid.pool.Job}). Then have a wrapping continuation (this), which blocks on an arbitrary
+ * @todo Another continuation. Could use an interface Continuation (as described in other todos)
+ *      Then have a wrapping continuation (this), which blocks on an arbitrary
  *      Condition or Latch (specified in constructor call), that this blocks on before calling the wrapped Continuation.
  *      Must work on Java 1.4, so check retrotranslator works on Lock/Condition or latch first. Argument and return type
  *      to match wrapped condition as type parameters. Rename to AsyncConditionalContinuation or something like that.

Modified: qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionStartMethodHandler.java
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionStartMethodHandler.java?rev=1172657&r1=1172656&r2=1172657&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionStartMethodHandler.java (original)
+++ qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionStartMethodHandler.java Mon Sep 19 15:13:18 2011
@@ -20,6 +20,13 @@
  */
 package org.apache.qpid.client.handler;
 
+import java.io.UnsupportedEncodingException;
+import java.util.StringTokenizer;
+
+import javax.security.sasl.Sasl;
+import javax.security.sasl.SaslClient;
+import javax.security.sasl.SaslException;
+
 import org.apache.qpid.AMQException;
 import org.apache.qpid.client.protocol.AMQProtocolSession;
 import org.apache.qpid.client.security.AMQCallbackHandler;
@@ -34,18 +41,9 @@ import org.apache.qpid.framing.Connectio
 import org.apache.qpid.framing.FieldTable;
 import org.apache.qpid.framing.FieldTableFactory;
 import org.apache.qpid.framing.ProtocolVersion;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import javax.security.sasl.Sasl;
-import javax.security.sasl.SaslClient;
-import javax.security.sasl.SaslException;
-
-import java.io.UnsupportedEncodingException;
-import java.util.HashSet;
-import java.util.StringTokenizer;
-
 public class ConnectionStartMethodHandler implements StateAwareMethodListener<ConnectionStartBody>
 {
     private static final Logger _log = LoggerFactory.getLogger(ConnectionStartMethodHandler.class);
@@ -197,40 +195,20 @@ public class ConnectionStartMethodHandle
     private String chooseMechanism(byte[] availableMechanisms) throws UnsupportedEncodingException
     {
         final String mechanisms = new String(availableMechanisms, "utf8");
-        StringTokenizer tokenizer = new StringTokenizer(mechanisms, " ");
-        HashSet mechanismSet = new HashSet();
-        while (tokenizer.hasMoreTokens())
-        {
-            mechanismSet.add(tokenizer.nextToken());
-        }
-
-        String preferredMechanisms = CallbackHandlerRegistry.getInstance().getMechanisms();
-        StringTokenizer prefTokenizer = new StringTokenizer(preferredMechanisms, " ");
-        while (prefTokenizer.hasMoreTokens())
-        {
-            String mech = prefTokenizer.nextToken();
-            if (mechanismSet.contains(mech))
-            {
-                return mech;
-            }
-        }
-
-        return null;
+        return CallbackHandlerRegistry.getInstance().selectMechanism(mechanisms);
     }
 
     private AMQCallbackHandler createCallbackHandler(String mechanism, AMQProtocolSession protocolSession)
         throws AMQException
     {
-        Class mechanismClass = CallbackHandlerRegistry.getInstance().getCallbackHandlerClass(mechanism);
         try
         {
-            Object instance = mechanismClass.newInstance();
-            AMQCallbackHandler cbh = (AMQCallbackHandler) instance;
-            cbh.initialise(protocolSession.getAMQConnection().getConnectionURL());
+            AMQCallbackHandler instance = CallbackHandlerRegistry.getInstance().createCallbackHandler(mechanism);
+            instance.initialise(protocolSession.getAMQConnection().getConnectionURL());
 
-            return cbh;
+            return instance;
         }
-        catch (Exception e)
+        catch (IllegalArgumentException e)
         {
             throw new AMQException(null, "Unable to create callback handler: " + e, e);
         }

Modified: qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegateFactory.java
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegateFactory.java?rev=1172657&r1=1172656&r2=1172657&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegateFactory.java (original)
+++ qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegateFactory.java Mon Sep 19 15:13:18 2011
@@ -21,11 +21,6 @@
 
 package org.apache.qpid.client.message;
 
-import org.apache.mina.common.ByteBuffer;
-import org.apache.qpid.framing.BasicContentHeaderProperties;
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.AMQException;
-
 public interface AMQMessageDelegateFactory<D extends AMQMessageDelegate>
 {
     public static AMQMessageDelegateFactory DEFAULT_FACTORY = null;

Modified: qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_10.java
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_10.java?rev=1172657&r1=1172656&r2=1172657&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_10.java (original)
+++ qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_10.java Mon Sep 19 15:13:18 2011
@@ -55,6 +55,9 @@ import org.apache.qpid.transport.Message
 import org.apache.qpid.transport.MessageDeliveryPriority;
 import org.apache.qpid.transport.MessageProperties;
 import org.apache.qpid.transport.ReplyTo;
+import org.apache.qpid.transport.TransportException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * This extends AbstractAMQMessageDelegate which contains common code between
@@ -63,6 +66,7 @@ import org.apache.qpid.transport.ReplyTo
  */
 public class AMQMessageDelegate_0_10 extends AbstractAMQMessageDelegate
 {
+    private static final Logger _logger = LoggerFactory.getLogger(AMQMessageDelegate_0_10.class);
     private static final Map<ReplyTo, SoftReference<Destination>> _destinationCache = Collections.synchronizedMap(new HashMap<ReplyTo, SoftReference<Destination>>());
 
     public static final String JMS_TYPE = "x-jms-type";
@@ -95,8 +99,22 @@ public class AMQMessageDelegate_0_10 ext
 
         AMQDestination dest;
 
-        dest = generateDestination(new AMQShortString(_deliveryProps.getExchange()),
+        if (AMQDestination.getDefaultDestSyntax() == AMQDestination.DestSyntax.BURL)
+        {
+            dest = generateDestination(new AMQShortString(_deliveryProps.getExchange()),
                                    new AMQShortString(_deliveryProps.getRoutingKey()));
+        }
+        else
+        {
+            String subject = null;
+            if (messageProps != null && messageProps.getApplicationHeaders() != null)
+            {
+                subject = (String)messageProps.getApplicationHeaders().get(QpidMessageProperties.QPID_SUBJECT);
+            }
+            dest = (AMQDestination) convertToAddressBasedDestination(_deliveryProps.getExchange(),
+                    _deliveryProps.getRoutingKey(), subject);
+        }
+        
         setJMSDestination(dest);        
     }
 
@@ -242,13 +260,50 @@ public class AMQMessageDelegate_0_10 ext
                 String exchange = replyTo.getExchange();
                 String routingKey = replyTo.getRoutingKey();
 
-                dest = generateDestination(new AMQShortString(exchange), new AMQShortString(routingKey));
+                if (AMQDestination.getDefaultDestSyntax() == AMQDestination.DestSyntax.BURL)
+                {
+            
+                    dest = generateDestination(new AMQShortString(exchange), new AMQShortString(routingKey));
+                }
+                else
+                {
+                    dest = convertToAddressBasedDestination(exchange,routingKey,null);
+                }
                 _destinationCache.put(replyTo, new SoftReference<Destination>(dest));
             }
 
             return dest;
         }
     }
+    
+    private Destination convertToAddressBasedDestination(String exchange, String routingKey, String subject)
+    {
+        String addr;
+        if ("".equals(exchange)) // type Queue
+        {
+            subject = (subject == null) ? "" : "/" + subject;
+            addr = routingKey + subject;
+        }
+        else
+        {
+            addr = exchange + "/" + routingKey;
+        }
+        
+        try
+        {
+            return AMQDestination.createDestination("ADDR:" + addr.toString());
+        }
+        catch(Exception e)
+        {
+            // An exception is only thrown here if the address syntax is invalid.
+            // Logging the exception, but not throwing as this is only important to Qpid developers.
+            // An exception here means a bug in the code.
+            _logger.error("Exception when constructing an address string from the ReplyTo struct");
+            
+            // falling back to the old way of doing it to ensure the application continues.
+            return generateDestination(new AMQShortString(exchange), new AMQShortString(routingKey));
+        } 
+    }
 
     public void setJMSReplyTo(Destination destination) throws JMSException
     {
@@ -287,6 +342,14 @@ public class AMQMessageDelegate_0_10 ext
                e.setLinkedException(ex);
                throw e;
            }
+           catch (TransportException e)
+           {
+               JMSException jmse = new JMSException("Exception occured while figuring out the node type:" + e.getMessage());
+               jmse.initCause(e);
+               jmse.setLinkedException(e);
+               throw jmse;
+           }
+
         }
         
         final ReplyTo replyTo = new ReplyTo(amqd.getExchangeName().toString(), amqd.getRoutingKey().toString());
@@ -337,7 +400,7 @@ public class AMQMessageDelegate_0_10 ext
         Destination replyTo = getJMSReplyTo();
         if(replyTo != null)
         {
-            return ((AMQDestination)replyTo).toURL();
+            return ((AMQDestination)replyTo).toString();
         }
         else
         {
@@ -634,11 +697,16 @@ public class AMQMessageDelegate_0_10 ext
         {
             return new String(_messageProps.getUserId());
         }
-        else if ("x-amqp-0-10.app-id".equals(propertyName) &&
+        else if (QpidMessageProperties.AMQP_0_10_APP_ID.equals(propertyName) &&
                 _messageProps.getAppId() != null)
         {
             return new String(_messageProps.getAppId());
         }
+        else if (QpidMessageProperties.AMQP_0_10_ROUTING_KEY.equals(propertyName) &&
+                _deliveryProps.getRoutingKey() != null)
+        {
+            return _deliveryProps.getRoutingKey();
+        }
         else
         {
             checkPropertyName(propertyName);
@@ -745,7 +813,7 @@ public class AMQMessageDelegate_0_10 ext
     {
         checkPropertyName(propertyName);
         checkWritableProperties();
-        if ("x-amqp-0-10.app-id".equals(propertyName))
+        if (QpidMessageProperties.AMQP_0_10_APP_ID.equals(propertyName))
         {
             _messageProps.setAppId(value.getBytes());
         }

Modified: qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_8.java
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_8.java?rev=1172657&r1=1172656&r2=1172657&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_8.java (original)
+++ qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_8.java Mon Sep 19 15:13:18 2011
@@ -499,7 +499,6 @@ public class AMQMessageDelegate_0_8 exte
         {
             throw new MessageNotWriteableException("You need to call clearProperties() to make the message writable");
         }
-        _contentHeaderProperties.updated();
     }
 
 

Modified: qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQPEncodedMapMessage.java
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQPEncodedMapMessage.java?rev=1172657&r1=1172656&r2=1172657&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQPEncodedMapMessage.java (original)
+++ qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQPEncodedMapMessage.java Mon Sep 19 15:13:18 2011
@@ -24,11 +24,11 @@ package org.apache.qpid.client.message;
 import java.util.List;
 import java.util.Map;
 import java.util.UUID;
+import java.nio.ByteBuffer;
 
 import javax.jms.JMSException;
 import javax.jms.MessageFormatException;
 
-import org.apache.mina.common.ByteBuffer;
 import org.apache.qpid.AMQException;
 import org.apache.qpid.transport.codec.BBDecoder;
 import org.apache.qpid.transport.codec.BBEncoder;
@@ -81,18 +81,19 @@ public class AMQPEncodedMapMessage exten
     @ Override
     public ByteBuffer getData()
     {
-        writeMapToData();
-        return _data;
+        BBEncoder encoder = new BBEncoder(1024);
+        encoder.writeMap(_map);
+        return encoder.segment();
     }
     
     @ Override
-    protected void populateMapFromData() throws JMSException
+    protected void populateMapFromData(ByteBuffer data) throws JMSException
     {
-        if (_data != null)
+        if (data != null)
         {
-            _data.rewind();
+            data.rewind();
             BBDecoder decoder = new BBDecoder();
-            decoder.init(_data.buf());
+            decoder.init(data);
             _map = decoder.readMap();
         }
         else
@@ -101,16 +102,8 @@ public class AMQPEncodedMapMessage exten
         }
     }
 
-    @ Override
-    protected void writeMapToData()
-    {
-        BBEncoder encoder = new BBEncoder(1024);
-        encoder.writeMap(_map);
-        _data = ByteBuffer.wrap(encoder.segment());
-    }
-    
     // for testing
-    Map<String,Object> getMap()
+    public Map<String,Object> getMap()
     {
         return _map;
     }

Modified: qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQPEncodedMapMessageFactory.java
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQPEncodedMapMessageFactory.java?rev=1172657&r1=1172656&r2=1172657&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQPEncodedMapMessageFactory.java (original)
+++ qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQPEncodedMapMessageFactory.java Mon Sep 19 15:13:18 2011
@@ -1,6 +1,6 @@
 package org.apache.qpid.client.message;
 /*
- * 
+ *
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -8,22 +8,23 @@ package org.apache.qpid.client.message;
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
  * KIND, either express or implied.  See the License for the
  * specific language governing permissions and limitations
  * under the License.
- * 
+ *
  */
 
 
 import javax.jms.JMSException;
 
-import org.apache.mina.common.ByteBuffer;
+import java.nio.ByteBuffer;
+
 import org.apache.qpid.AMQException;
 
 public class AMQPEncodedMapMessageFactory extends AbstractJMSMessageFactory
@@ -36,7 +37,7 @@ public class AMQPEncodedMapMessageFactor
         return new AMQPEncodedMapMessage(delegate,data);
     }
 
-    @Override
+
     public AbstractJMSMessage createMessage(
             AMQMessageDelegateFactory delegateFactory) throws JMSException
     {

Modified: qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractBytesTypedMessage.java
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractBytesTypedMessage.java?rev=1172657&r1=1172656&r2=1172657&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractBytesTypedMessage.java (original)
+++ qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractBytesTypedMessage.java Mon Sep 19 15:13:18 2011
@@ -21,784 +21,96 @@
 
 package org.apache.qpid.client.message;
 
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
 import java.nio.charset.CharacterCodingException;
 import java.nio.charset.Charset;
+import java.nio.ByteBuffer;
 
 import javax.jms.JMSException;
-import javax.jms.MessageEOFException;
-import javax.jms.MessageFormatException;
 import javax.jms.MessageNotReadableException;
 import javax.jms.MessageNotWriteableException;
 
-import org.apache.mina.common.ByteBuffer;
 import org.apache.qpid.AMQException;
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.BasicContentHeaderProperties;
+import org.apache.qpid.transport.util.Functions;
 
 /**
  * @author Apache Software Foundation
  */
-public abstract class AbstractBytesTypedMessage extends AbstractBytesMessage
+public abstract class AbstractBytesTypedMessage extends AbstractJMSMessage
 {
+    protected boolean _readableMessage = false;
 
-    protected static final byte BOOLEAN_TYPE = (byte) 1;
-
-    protected static final byte BYTE_TYPE = (byte) 2;
-
-    protected static final byte BYTEARRAY_TYPE = (byte) 3;
-
-    protected static final byte SHORT_TYPE = (byte) 4;
-
-    protected static final byte CHAR_TYPE = (byte) 5;
-
-    protected static final byte INT_TYPE = (byte) 6;
-
-    protected static final byte LONG_TYPE = (byte) 7;
-
-    protected static final byte FLOAT_TYPE = (byte) 8;
-
-    protected static final byte DOUBLE_TYPE = (byte) 9;
-
-    protected static final byte STRING_TYPE = (byte) 10;
-
-    protected static final byte NULL_STRING_TYPE = (byte) 11;
-
-    /**
-     * This is set when reading a byte array. The readBytes(byte[]) method supports multiple calls to read
-     * a byte array in multiple chunks, hence this is used to track how much is left to be read
-     */
-    private int _byteArrayRemaining = -1;
-
-    AbstractBytesTypedMessage(AMQMessageDelegateFactory delegateFactory)
-    {
-
-        this(delegateFactory, null);
-    }
-
-    /**
-     * Construct a stream message with existing data.
-     *
-     * @param delegateFactory
-     * @param data the data that comprises this message. If data is null, you get a 1024 byte buffer that is
-     */
-    AbstractBytesTypedMessage(AMQMessageDelegateFactory delegateFactory, ByteBuffer data)
+    AbstractBytesTypedMessage(AMQMessageDelegateFactory delegateFactory, boolean fromReceivedMessage)
     {
 
-        super(delegateFactory, data); // this instanties a content header
+        super(delegateFactory, fromReceivedMessage); // this instanties a content header
+        _readableMessage = fromReceivedMessage;
     }
 
-    AbstractBytesTypedMessage(AMQMessageDelegate delegate, ByteBuffer data) throws AMQException
+    AbstractBytesTypedMessage(AMQMessageDelegate delegate, boolean fromReceivedMessage) throws AMQException
     {
 
-        super(delegate, data);
-    }
+        super(delegate, fromReceivedMessage);
+        _readableMessage = fromReceivedMessage;
 
-
-    protected byte readWireType() throws MessageFormatException, MessageEOFException,
-            MessageNotReadableException
-    {
-        checkReadable();
-        checkAvailable(1);
-        return _data.get();
     }
 
-    protected void writeTypeDiscriminator(byte type) throws MessageNotWriteableException
+    protected void checkReadable() throws MessageNotReadableException
     {
-        checkWritable();
-        _data.put(type);
-        _changedData = true;
-    }
-
-    protected boolean readBoolean() throws JMSException
-    {
-        int position = _data.position();
-        byte wireType = readWireType();
-        boolean result;
-        try
+        if (!_readableMessage)
         {
-            switch (wireType)
-            {
-                case BOOLEAN_TYPE:
-                    checkAvailable(1);
-                    result = readBooleanImpl();
-                    break;
-                case STRING_TYPE:
-                    checkAvailable(1);
-                    result = Boolean.parseBoolean(readStringImpl());
-                    break;
-                default:
-                    _data.position(position);
-                    throw new MessageFormatException("Unable to convert " + wireType + " to a boolean");
-            }
-            return result;
-        }
-        catch (RuntimeException e)
-        {
-            _data.position(position);
-            throw e;
+            throw new MessageNotReadableException("You need to call reset() to make the message readable");
         }
     }
 
-    private boolean readBooleanImpl()
+    @Override
+    protected void checkWritable() throws MessageNotWriteableException
     {
-        return _data.get() != 0;
-    }
-
-    protected byte readByte() throws JMSException
-    {
-        int position = _data.position();
-        byte wireType = readWireType();
-        byte result;
-        try
+        super.checkWritable();
+        if(_readableMessage)
         {
-            switch (wireType)
-            {
-                case BYTE_TYPE:
-                    checkAvailable(1);
-                    result = readByteImpl();
-                    break;
-                case STRING_TYPE:
-                    checkAvailable(1);
-                    result = Byte.parseByte(readStringImpl());
-                    break;
-                default:
-                    _data.position(position);
-                    throw new MessageFormatException("Unable to convert " + wireType + " to a byte");
-            }
+            throw new MessageNotWriteableException("You need to call clearBody() to make the message writable");
         }
-        catch (RuntimeException e)
-        {
-            _data.position(position);
-            throw e;
-        }
-        return result;
     }
 
-    private byte readByteImpl()
+    public void clearBody() throws JMSException
     {
-        return _data.get();
+        super.clearBody();
+        _readableMessage = false;
     }
 
-    protected short readShort() throws JMSException
-    {
-        int position = _data.position();
-        byte wireType = readWireType();
-        short result;
-        try
-        {
-            switch (wireType)
-            {
-                case SHORT_TYPE:
-                    checkAvailable(2);
-                    result = readShortImpl();
-                    break;
-                case STRING_TYPE:
-                    checkAvailable(1);
-                    result = Short.parseShort(readStringImpl());
-                    break;
-                case BYTE_TYPE:
-                    checkAvailable(1);
-                    result = readByteImpl();
-                    break;
-                default:
-                    _data.position(position);
-                    throw new MessageFormatException("Unable to convert " + wireType + " to a short");
-            }
-        }
-        catch (RuntimeException e)
-        {
-            _data.position(position);
-            throw e;
-        }
-        return result;
-    }
 
-    private short readShortImpl()
+    public String toBodyString() throws JMSException
     {
-        return _data.getShort();
-    }
-
-    /**
-     * Note that this method reads a unicode character as two bytes from the stream
-     *
-     * @return the character read from the stream
-     * @throws javax.jms.JMSException
-     */
-    protected char readChar() throws JMSException
-    {
-        int position = _data.position();
-        byte wireType = readWireType();
         try
         {
-        	if(wireType == NULL_STRING_TYPE){
-        		throw new NullPointerException();
+            ByteBuffer data = getData();
+        	if (data != null)
+        	{
+        		return Functions.str(data, 100, 0);
+        	}
+        	else
+        	{
+        		return "";
         	}
 
-            if (wireType != CHAR_TYPE)
-            {
-                _data.position(position);
-                throw new MessageFormatException("Unable to convert " + wireType + " to a char");
-            }
-            else
-            {
-                checkAvailable(2);
-                return readCharImpl();
-            }
-        }
-        catch (RuntimeException e)
-        {
-            _data.position(position);
-            throw e;
-        }
-    }
-
-    private char readCharImpl()
-    {
-        return _data.getChar();
-    }
-
-    protected int readInt() throws JMSException
-    {
-        int position = _data.position();
-        byte wireType = readWireType();
-        int result;
-        try
-        {
-            switch (wireType)
-            {
-                case INT_TYPE:
-                    checkAvailable(4);
-                    result = readIntImpl();
-                    break;
-                case SHORT_TYPE:
-                    checkAvailable(2);
-                    result = readShortImpl();
-                    break;
-                case STRING_TYPE:
-                    checkAvailable(1);
-                    result = Integer.parseInt(readStringImpl());
-                    break;
-                case BYTE_TYPE:
-                    checkAvailable(1);
-                    result = readByteImpl();
-                    break;
-                default:
-                    _data.position(position);
-                    throw new MessageFormatException("Unable to convert " + wireType + " to an int");
-            }
-            return result;
-        }
-        catch (RuntimeException e)
-        {
-            _data.position(position);
-            throw e;
-        }
-    }
-
-    protected int readIntImpl()
-    {
-        return _data.getInt();
-    }
-
-    protected long readLong() throws JMSException
-    {
-        int position = _data.position();
-        byte wireType = readWireType();
-        long result;
-        try
-        {
-            switch (wireType)
-            {
-                case LONG_TYPE:
-                    checkAvailable(8);
-                    result = readLongImpl();
-                    break;
-                case INT_TYPE:
-                    checkAvailable(4);
-                    result = readIntImpl();
-                    break;
-                case SHORT_TYPE:
-                    checkAvailable(2);
-                    result = readShortImpl();
-                    break;
-                case STRING_TYPE:
-                    checkAvailable(1);
-                    result = Long.parseLong(readStringImpl());
-                    break;
-                case BYTE_TYPE:
-                    checkAvailable(1);
-                    result = readByteImpl();
-                    break;
-                default:
-                    _data.position(position);
-                    throw new MessageFormatException("Unable to convert " + wireType + " to a long");
-            }
-            return result;
-        }
-        catch (RuntimeException e)
-        {
-            _data.position(position);
-            throw e;
-        }
-    }
-
-    private long readLongImpl()
-    {
-        return _data.getLong();
-    }
-
-    protected float readFloat() throws JMSException
-    {
-        int position = _data.position();
-        byte wireType = readWireType();
-        float result;
-        try
-        {
-            switch (wireType)
-            {
-                case FLOAT_TYPE:
-                    checkAvailable(4);
-                    result = readFloatImpl();
-                    break;
-                case STRING_TYPE:
-                    checkAvailable(1);
-                    result = Float.parseFloat(readStringImpl());
-                    break;
-                default:
-                    _data.position(position);
-                    throw new MessageFormatException("Unable to convert " + wireType + " to a float");
-            }
-            return result;
-        }
-        catch (RuntimeException e)
-        {
-            _data.position(position);
-            throw e;
-        }
-    }
-
-    private float readFloatImpl()
-    {
-        return _data.getFloat();
-    }
-
-    protected double readDouble() throws JMSException
-    {
-        int position = _data.position();
-        byte wireType = readWireType();
-        double result;
-        try
-        {
-            switch (wireType)
-            {
-                case DOUBLE_TYPE:
-                    checkAvailable(8);
-                    result = readDoubleImpl();
-                    break;
-                case FLOAT_TYPE:
-                    checkAvailable(4);
-                    result = readFloatImpl();
-                    break;
-                case STRING_TYPE:
-                    checkAvailable(1);
-                    result = Double.parseDouble(readStringImpl());
-                    break;
-                default:
-                    _data.position(position);
-                    throw new MessageFormatException("Unable to convert " + wireType + " to a double");
-            }
-            return result;
-        }
-        catch (RuntimeException e)
-        {
-            _data.position(position);
-            throw e;
-        }
-    }
-
-    private double readDoubleImpl()
-    {
-        return _data.getDouble();
-    }
-
-    protected String readString() throws JMSException
-    {
-        int position = _data.position();
-        byte wireType = readWireType();
-        String result;
-        try
-        {
-            switch (wireType)
-            {
-                case STRING_TYPE:
-                    checkAvailable(1);
-                    result = readStringImpl();
-                    break;
-                case NULL_STRING_TYPE:
-                    result = null;
-                    throw new NullPointerException("data is null");
-                case BOOLEAN_TYPE:
-                    checkAvailable(1);
-                    result = String.valueOf(readBooleanImpl());
-                    break;
-                case LONG_TYPE:
-                    checkAvailable(8);
-                    result = String.valueOf(readLongImpl());
-                    break;
-                case INT_TYPE:
-                    checkAvailable(4);
-                    result = String.valueOf(readIntImpl());
-                    break;
-                case SHORT_TYPE:
-                    checkAvailable(2);
-                    result = String.valueOf(readShortImpl());
-                    break;
-                case BYTE_TYPE:
-                    checkAvailable(1);
-                    result = String.valueOf(readByteImpl());
-                    break;
-                case FLOAT_TYPE:
-                    checkAvailable(4);
-                    result = String.valueOf(readFloatImpl());
-                    break;
-                case DOUBLE_TYPE:
-                    checkAvailable(8);
-                    result = String.valueOf(readDoubleImpl());
-                    break;
-                case CHAR_TYPE:
-                    checkAvailable(2);
-                    result = String.valueOf(readCharImpl());
-                    break;
-                default:
-                    _data.position(position);
-                    throw new MessageFormatException("Unable to convert " + wireType + " to a String");
-            }
-            return result;
-        }
-        catch (RuntimeException e)
-        {
-            _data.position(position);
-            throw e;
-        }
-    }
-
-    protected String readStringImpl() throws JMSException
-    {
-        try
-        {
-            return _data.getString(Charset.forName("UTF-8").newDecoder());
         }
-        catch (CharacterCodingException e)
+        catch (Exception e)
         {
-            JMSException jmse = new JMSException("Error decoding byte stream as a UTF8 string: " + e);
+            JMSException jmse = new JMSException(e.toString());
             jmse.setLinkedException(e);
             jmse.initCause(e);
             throw jmse;
         }
-    }
-
-    protected int readBytes(byte[] bytes) throws JMSException
-    {
-        if (bytes == null)
-        {
-            throw new IllegalArgumentException("byte array must not be null");
-        }
-        checkReadable();
-        // first call
-        if (_byteArrayRemaining == -1)
-        {
-            // type discriminator checked separately so you get a MessageFormatException rather than
-            // an EOF even in the case where both would be applicable
-            checkAvailable(1);
-            byte wireType = readWireType();
-            if (wireType != BYTEARRAY_TYPE)
-            {
-                throw new MessageFormatException("Unable to convert " + wireType + " to a byte array");
-            }
-            checkAvailable(4);
-            int size = _data.getInt();
-            // length of -1 indicates null
-            if (size == -1)
-            {
-                return -1;
-            }
-            else
-            {
-                if (size > _data.remaining())
-                {
-                    throw new MessageEOFException("Byte array has stated length " + size + " but message only contains " +
-                                                  _data.remaining() + " bytes");
-                }
-                else
-                {
-                    _byteArrayRemaining = size;
-                }
-            }
-        }
-        else if (_byteArrayRemaining == 0)
-        {
-            _byteArrayRemaining = -1;
-            return -1;
-        }
-
-        int returnedSize = readBytesImpl(bytes);
-        if (returnedSize < bytes.length)
-        {
-            _byteArrayRemaining = -1;
-        }
-        return returnedSize;
-    }
-
-    private int readBytesImpl(byte[] bytes)
-    {
-        int count = (_byteArrayRemaining >= bytes.length ? bytes.length : _byteArrayRemaining);
-        _byteArrayRemaining -= count;
-
-        if (count == 0)
-        {
-            return 0;
-        }
-        else
-        {
-            _data.get(bytes, 0, count);
-            return count;
-        }
-    }
-
-    protected Object readObject() throws JMSException
-    {
-        int position = _data.position();
-        byte wireType = readWireType();
-        Object result = null;
-        try
-        {
-            switch (wireType)
-            {
-                case BOOLEAN_TYPE:
-                    checkAvailable(1);
-                    result = readBooleanImpl();
-                    break;
-                case BYTE_TYPE:
-                    checkAvailable(1);
-                    result = readByteImpl();
-                    break;
-                case BYTEARRAY_TYPE:
-                    checkAvailable(4);
-                    int size = _data.getInt();
-                    if (size == -1)
-                    {
-                        result = null;
-                    }
-                    else
-                    {
-                        _byteArrayRemaining = size;
-                        byte[] bytesResult = new byte[size];
-                        readBytesImpl(bytesResult);
-                        result = bytesResult;
-                    }
-                    break;
-                case SHORT_TYPE:
-                    checkAvailable(2);
-                    result = readShortImpl();
-                    break;
-                case CHAR_TYPE:
-                    checkAvailable(2);
-                    result = readCharImpl();
-                    break;
-                case INT_TYPE:
-                    checkAvailable(4);
-                    result = readIntImpl();
-                    break;
-                case LONG_TYPE:
-                    checkAvailable(8);
-                    result = readLongImpl();
-                    break;
-                case FLOAT_TYPE:
-                    checkAvailable(4);
-                    result = readFloatImpl();
-                    break;
-                case DOUBLE_TYPE:
-                    checkAvailable(8);
-                    result = readDoubleImpl();
-                    break;
-                case NULL_STRING_TYPE:
-                    result = null;
-                    break;
-                case STRING_TYPE:
-                    checkAvailable(1);
-                    result = readStringImpl();
-                    break;
-            }
-            return result;
-        }
-        catch (RuntimeException e)
-        {
-            _data.position(position);
-            throw e;
-        }
-    }
-
-    protected void writeBoolean(boolean b) throws JMSException
-    {
-        writeTypeDiscriminator(BOOLEAN_TYPE);
-        _data.put(b ? (byte) 1 : (byte) 0);
-    }
-
-    protected void writeByte(byte b) throws JMSException
-    {
-        writeTypeDiscriminator(BYTE_TYPE);
-        _data.put(b);
-    }
-
-    protected void writeShort(short i) throws JMSException
-    {
-        writeTypeDiscriminator(SHORT_TYPE);
-        _data.putShort(i);
-    }
-
-    protected void writeChar(char c) throws JMSException
-    {
-        writeTypeDiscriminator(CHAR_TYPE);
-        _data.putChar(c);
-    }
-
-    protected void writeInt(int i) throws JMSException
-    {
-        writeTypeDiscriminator(INT_TYPE);
-        writeIntImpl(i);
-    }
-
-    protected void writeIntImpl(int i)
-    {
-        _data.putInt(i);
-    }
-
-    protected void writeLong(long l) throws JMSException
-    {
-        writeTypeDiscriminator(LONG_TYPE);
-        _data.putLong(l);
-    }
 
-    protected void writeFloat(float v) throws JMSException
-    {
-        writeTypeDiscriminator(FLOAT_TYPE);
-        _data.putFloat(v);
     }
 
-    protected void writeDouble(double v) throws JMSException
-    {
-        writeTypeDiscriminator(DOUBLE_TYPE);
-        _data.putDouble(v);
-    }
 
-    protected void writeString(String string) throws JMSException
-    {
-        if (string == null)
-        {
-            writeTypeDiscriminator(NULL_STRING_TYPE);
-        }
-        else
-        {
-            writeTypeDiscriminator(STRING_TYPE);
-            try
-            {
-                writeStringImpl(string);
-            }
-            catch (CharacterCodingException e)
-            {
-                JMSException jmse = new JMSException("Unable to encode string: " + e);
-                jmse.setLinkedException(e);
-                jmse.initCause(e);
-                throw jmse;
-            }
-        }
-    }
-
-    protected void writeStringImpl(String string)
-            throws CharacterCodingException
-    {
-        _data.putString(string, Charset.forName("UTF-8").newEncoder());
-        // we must write the null terminator ourselves
-        _data.put((byte) 0);
-    }
+    abstract public void reset();
 
-    protected void writeBytes(byte[] bytes) throws JMSException
-    {
-        writeBytes(bytes, 0, bytes == null ? 0 : bytes.length);
-    }
 
-    protected void writeBytes(byte[] bytes, int offset, int length) throws JMSException
-    {
-        writeTypeDiscriminator(BYTEARRAY_TYPE);
-        if (bytes == null)
-        {
-            _data.putInt(-1);
-        }
-        else
-        {
-            _data.putInt(length);
-            _data.put(bytes, offset, length);
-        }
-    }
 
-    protected void writeObject(Object object) throws JMSException
-    {
-        checkWritable();
-        Class clazz;
 
-        if (object == null)
-        {
-            // string handles the output of null values
-            clazz = String.class;
-        }
-        else
-        {
-            clazz = object.getClass();
-        }
-
-        if (clazz == Byte.class)
-        {
-            writeByte((Byte) object);
-        }
-        else if (clazz == Boolean.class)
-        {
-            writeBoolean((Boolean) object);
-        }
-        else if (clazz == byte[].class)
-        {
-            writeBytes((byte[]) object);
-        }
-        else if (clazz == Short.class)
-        {
-            writeShort((Short) object);
-        }
-        else if (clazz == Character.class)
-        {
-            writeChar((Character) object);
-        }
-        else if (clazz == Integer.class)
-        {
-            writeInt((Integer) object);
-        }
-        else if (clazz == Long.class)
-        {
-            writeLong((Long) object);
-        }
-        else if (clazz == Float.class)
-        {
-            writeFloat((Float) object);
-        }
-        else if (clazz == Double.class)
-        {
-            writeDouble((Double) object);
-        }
-        else if (clazz == String.class)
-        {
-            writeString((String) object);
-        }
-        else
-        {
-            throw new MessageFormatException("Only primitives plus byte arrays and String are valid types");
-        }
-    }
 }

Modified: qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java?rev=1172657&r1=1172656&r2=1172657&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java (original)
+++ qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java Mon Sep 19 15:13:18 2011
@@ -20,66 +20,38 @@
  */
 package org.apache.qpid.client.message;
 
-import java.io.IOException;
+import java.nio.ByteBuffer;
 import java.util.Enumeration;
 import java.util.UUID;
 
 import javax.jms.Destination;
 import javax.jms.JMSException;
-import javax.jms.MessageNotReadableException;
 import javax.jms.MessageNotWriteableException;
 
-import org.apache.mina.common.ByteBuffer;
 import org.apache.qpid.AMQException;
 import org.apache.qpid.client.AMQSession;
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.BasicContentHeaderProperties;
 
 public abstract class AbstractJMSMessage implements org.apache.qpid.jms.Message
 {
 
 
-
-    protected ByteBuffer _data;
-    protected boolean _readableMessage = false;
-    protected boolean _changedData = true;
-
     /** If the acknowledge mode is CLIENT_ACKNOWLEDGE the session is required */
 
-
-
-
     protected AMQMessageDelegate _delegate;
     private boolean _redelivered;
+    private boolean _receivedFromServer;
 
-    protected AbstractJMSMessage(AMQMessageDelegateFactory delegateFactory, ByteBuffer data)
+    protected AbstractJMSMessage(AMQMessageDelegateFactory delegateFactory, boolean fromReceivedData)
     {
         _delegate = delegateFactory.createDelegate();
-        _data = data;
-        if (_data != null)
-        {
-            _data.acquire();
-        }
-
-
-        _readableMessage = (data != null);
-        _changedData = (data == null);
-
+        setContentType(getMimeType());
     }
 
-    protected AbstractJMSMessage(AMQMessageDelegate delegate, ByteBuffer data) throws AMQException
+    protected AbstractJMSMessage(AMQMessageDelegate delegate, boolean fromReceivedData) throws AMQException
     {
 
         _delegate = delegate;
-
-        _data = data;
-        if (_data != null)
-        {
-            _data.acquire();
-        }
-
-        _readableMessage = data != null;
-
+        setContentType(getMimeType());
     }
 
     public String getJMSMessageID() throws JMSException
@@ -329,12 +301,9 @@ public abstract class AbstractJMSMessage
 
     public void clearBody() throws JMSException
     {
-        clearBodyImpl();
-        _readableMessage = false;
-
+        _receivedFromServer = false;
     }
 
-
     public void acknowledgeThis() throws JMSException
     {
         _delegate.acknowledgeThis();
@@ -345,14 +314,7 @@ public abstract class AbstractJMSMessage
         _delegate.acknowledge();
     }
 
-    /**
-     * This forces concrete classes to implement clearBody()
-     *
-     * @throws JMSException
-     */
-    public abstract void clearBodyImpl() throws JMSException;
-
-    /**
+    /*
      * Get a String representation of the body of the message. Used in the toString() method which outputs this before
      * message properties.
      */
@@ -413,63 +375,24 @@ public abstract class AbstractJMSMessage
         return _delegate;
     }
 
-    public ByteBuffer getData()
-    {
-        // make sure we rewind the data just in case any method has moved the
-        // position beyond the start
-        if (_data != null)
-        {
-            reset();
-        }
+    abstract public ByteBuffer getData() throws JMSException;
 
-        return _data;
-    }
-
-    protected void checkReadable() throws MessageNotReadableException
-    {
-        if (!_readableMessage)
-        {
-            throw new MessageNotReadableException("You need to call reset() to make the message readable");
-        }
-    }
 
     protected void checkWritable() throws MessageNotWriteableException
     {
-        if (_readableMessage)
+        if (_receivedFromServer)
         {
             throw new MessageNotWriteableException("You need to call clearBody() to make the message writable");
         }
     }
 
-    public void reset()
-    {
-        if (!_changedData)
-        {
-            _data.rewind();
-        }
-        else
-        {
-            _data.flip();
-            _changedData = false;
-        }
-    }
 
-    public int getContentLength()
+    public void setReceivedFromServer()
     {
-        if(_data != null)
-        {
-            return _data.remaining();
-        }
-        else
-        {
-            return 0;
-        }
+        _receivedFromServer = true;
     }
 
-    public void receivedFromServer()
-    {
-        _changedData = false;
-    }
+
 
     /**
      * The session is set when CLIENT_ACKNOWLEDGE mode is used so that the CHANNEL ACK can be sent when the user calls

Modified: qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java?rev=1172657&r1=1172656&r2=1172657&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java (original)
+++ qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java Mon Sep 19 15:13:18 2011
@@ -20,8 +20,6 @@
  */
 package org.apache.qpid.client.message;
 
-import org.apache.mina.common.ByteBuffer;
-
 import org.apache.qpid.AMQException;
 import org.apache.qpid.framing.AMQShortString;
 import org.apache.qpid.framing.ContentBody;
@@ -38,6 +36,8 @@ import javax.jms.JMSException;
 import java.util.Iterator;
 import java.util.List;
 
+import java.nio.ByteBuffer;
+
 public abstract class AbstractJMSMessageFactory implements MessageFactory
 {
     private static final Logger _logger = LoggerFactory.getLogger(AbstractJMSMessageFactory.class);
@@ -57,7 +57,7 @@ public abstract class AbstractJMSMessage
                 _logger.debug("Non-fragmented message body (bodySize=" + contentHeader.bodySize + ")");
             }
 
-            data = ((ContentBody) bodies.get(0)).payload;
+            data = ByteBuffer.wrap(((ContentBody) bodies.get(0))._payload);
         }
         else if (bodies != null)
         {
@@ -72,7 +72,7 @@ public abstract class AbstractJMSMessage
             while (it.hasNext())
             {
                 ContentBody cb = (ContentBody) it.next();
-                final ByteBuffer payload = cb.payload;
+                final ByteBuffer payload = ByteBuffer.wrap(cb._payload);
                 if(payload.isDirect() || payload.isReadOnly())
                 {
                     data.put(payload);
@@ -82,7 +82,6 @@ public abstract class AbstractJMSMessage
                     data.put(payload.array(), payload.arrayOffset(), payload.limit());
                 }
 
-                payload.release();
             }
 
             data.flip();
@@ -109,7 +108,7 @@ public abstract class AbstractJMSMessage
 
 
     protected AbstractJMSMessage create010MessageWithBody(long messageNbr, MessageProperties msgProps,
-                                                          DeliveryProperties deliveryProps,  
+                                                          DeliveryProperties deliveryProps,
                                                           java.nio.ByteBuffer body) throws AMQException
     {
         ByteBuffer data;
@@ -118,7 +117,7 @@ public abstract class AbstractJMSMessage
 
         if (body != null)
         {
-            data = ByteBuffer.wrap(body);
+            data = body;
         }
         else // body == null
         {
@@ -155,7 +154,7 @@ public abstract class AbstractJMSMessage
     {
         final AbstractJMSMessage msg = create08MessageWithBody(messageNbr, contentHeader, exchange, routingKey, bodies);
         msg.setJMSRedelivered(redelivered);
-        msg.receivedFromServer();
+        msg.setReceivedFromServer();
         return msg;
     }
 
@@ -166,7 +165,7 @@ public abstract class AbstractJMSMessage
         final AbstractJMSMessage msg =
                 create010MessageWithBody(messageNbr,msgProps,deliveryProps, body);
         msg.setJMSRedelivered(redelivered);
-        msg.receivedFromServer();
+        msg.setReceivedFromServer();
         return msg;
     }
 

Modified: qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSBytesMessage.java
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSBytesMessage.java?rev=1172657&r1=1172656&r2=1172657&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSBytesMessage.java (original)
+++ qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSBytesMessage.java Mon Sep 19 15:13:18 2011
@@ -20,6 +20,7 @@
  */
 package org.apache.qpid.client.message;
 
+import java.nio.ByteBuffer;
 import java.nio.CharBuffer;
 import java.nio.charset.CharacterCodingException;
 import java.nio.charset.Charset;
@@ -28,47 +29,56 @@ import java.nio.charset.CharsetEncoder;
 
 import javax.jms.BytesMessage;
 import javax.jms.JMSException;
+import javax.jms.MessageEOFException;
 import javax.jms.MessageFormatException;
 
-import org.apache.mina.common.ByteBuffer;
 import org.apache.qpid.AMQException;
 import org.apache.qpid.framing.AMQShortString;
 import org.apache.qpid.framing.BasicContentHeaderProperties;
 
-public class JMSBytesMessage extends AbstractBytesMessage implements BytesMessage
+public class JMSBytesMessage extends AbstractBytesTypedMessage implements BytesMessage
 {
     public static final String MIME_TYPE = "application/octet-stream";
 
 
+    private TypedBytesContentReader _typedBytesContentReader;
+    private TypedBytesContentWriter _typedBytesContentWriter;
 
-    public JMSBytesMessage(AMQMessageDelegateFactory delegateFactory)
-    {
-        this(delegateFactory,null);
-
-    }
 
-    /**
-     * Construct a bytes message with existing data.
-     *
-     * @param delegateFactory
-     * @param data the data that comprises this message. If data is null, you get a 1024 byte buffer that is
-     */
-    JMSBytesMessage(AMQMessageDelegateFactory delegateFactory, ByteBuffer data)
+    public JMSBytesMessage(AMQMessageDelegateFactory delegateFactory)
     {
-
-        super(delegateFactory, data); // this instanties a content header
+        super(delegateFactory,false);
+        _typedBytesContentWriter = new TypedBytesContentWriter();
     }
 
     JMSBytesMessage(AMQMessageDelegate delegate, ByteBuffer data) throws AMQException
     {
-        super(delegate, data);
+        super(delegate, data!=null);
+        _typedBytesContentReader = new TypedBytesContentReader(data);
     }
 
 
     public void reset()
     {
-        super.reset();
         _readableMessage = true;
+
+        if(_typedBytesContentReader != null)
+        {
+            _typedBytesContentReader.reset();
+        }
+        else if (_typedBytesContentWriter != null)
+        {
+            _typedBytesContentReader = new TypedBytesContentReader(_typedBytesContentWriter.getData());
+        }
+    }
+
+    @Override
+    public void clearBody() throws JMSException
+    {
+        super.clearBody();
+        _typedBytesContentReader = null;
+        _typedBytesContentWriter = new TypedBytesContentWriter();
+
     }
 
     protected String getMimeType()
@@ -76,45 +86,57 @@ public class JMSBytesMessage extends Abs
         return MIME_TYPE;
     }
 
+    @Override
+    public java.nio.ByteBuffer getData() throws JMSException
+    {
+        return _typedBytesContentWriter == null ? _typedBytesContentReader.getData() : _typedBytesContentWriter.getData();
+    }
+
     public long getBodyLength() throws JMSException
     {
         checkReadable();
-        return _data.limit();
+        return _typedBytesContentReader.size();
     }
 
     public boolean readBoolean() throws JMSException
     {
         checkReadable();
         checkAvailable(1);
-        return _data.get() != 0;
+
+        return _typedBytesContentReader.readBooleanImpl();
+    }
+
+    private void checkAvailable(final int i) throws MessageEOFException
+    {
+        _typedBytesContentReader.checkAvailable(1);
     }
 
     public byte readByte() throws JMSException
     {
         checkReadable();
         checkAvailable(1);
-        return _data.get();
+        return _typedBytesContentReader.readByteImpl();
     }
 
     public int readUnsignedByte() throws JMSException
     {
         checkReadable();
         checkAvailable(1);
-        return _data.getUnsigned();
+        return _typedBytesContentReader.readByteImpl() & 0xFF;
     }
 
     public short readShort() throws JMSException
     {
         checkReadable();
         checkAvailable(2);
-        return _data.getShort();
+        return _typedBytesContentReader.readShortImpl();
     }
 
     public int readUnsignedShort() throws JMSException
     {
         checkReadable();
         checkAvailable(2);
-        return _data.getUnsignedShort();
+        return _typedBytesContentReader.readShortImpl() & 0xFFFF;
     }
 
     /**
@@ -127,35 +149,35 @@ public class JMSBytesMessage extends Abs
     {
         checkReadable();
         checkAvailable(2);
-        return _data.getChar();
+        return _typedBytesContentReader.readCharImpl();
     }
 
     public int readInt() throws JMSException
     {
         checkReadable();
         checkAvailable(4);
-        return _data.getInt();
+        return _typedBytesContentReader.readIntImpl();
     }
 
     public long readLong() throws JMSException
     {
         checkReadable();
         checkAvailable(8);
-        return _data.getLong();
+        return _typedBytesContentReader.readLongImpl();
     }
 
     public float readFloat() throws JMSException
     {
         checkReadable();
         checkAvailable(4);
-        return _data.getFloat();
+        return _typedBytesContentReader.readFloatImpl();
     }
 
     public double readDouble() throws JMSException
     {
         checkReadable();
         checkAvailable(8);
-        return _data.getDouble();
+        return _typedBytesContentReader.readDoubleImpl();
     }
 
     public String readUTF() throws JMSException
@@ -164,34 +186,7 @@ public class JMSBytesMessage extends Abs
         // we check only for one byte since theoretically the string could be only a
         // single byte when using UTF-8 encoding
 
-        try
-        {
-            short length = readShort();
-            if(length == 0)
-            {
-                return "";
-            }
-            else
-            {
-                CharsetDecoder decoder = Charset.forName("UTF-8").newDecoder();
-                ByteBuffer encodedString = _data.slice();
-                encodedString.limit(length);
-                _data.position(_data.position()+length);
-                CharBuffer string = decoder.decode(encodedString.buf());
-                
-                return string.toString();
-            }
-
-
-            
-        }
-        catch (CharacterCodingException e)
-        {
-            JMSException jmse = new JMSException("Error decoding byte stream as a UTF8 string: " + e);
-            jmse.setLinkedException(e);
-            jmse.initCause(e);
-            throw jmse;
-        }
+        return _typedBytesContentReader.readLengthPrefixedUTF();
     }
 
     public int readBytes(byte[] bytes) throws JMSException
@@ -201,14 +196,14 @@ public class JMSBytesMessage extends Abs
             throw new IllegalArgumentException("byte array must not be null");
         }
         checkReadable();
-        int count = (_data.remaining() >= bytes.length ? bytes.length : _data.remaining());
+        int count = (_typedBytesContentReader.remaining() >= bytes.length ? bytes.length : _typedBytesContentReader.remaining());
         if (count == 0)
         {
             return -1;
         }
         else
         {
-            _data.get(bytes, 0, count);
+            _typedBytesContentReader.readRawBytes(bytes, 0, count);
             return count;
         }
     }
@@ -224,110 +219,82 @@ public class JMSBytesMessage extends Abs
             throw new IllegalArgumentException("maxLength must be <= bytes.length");
         }
         checkReadable();
-        int count = (_data.remaining() >= maxLength ? maxLength : _data.remaining());
+        int count = (_typedBytesContentReader.remaining() >= maxLength ? maxLength : _typedBytesContentReader.remaining());
         if (count == 0)
         {
             return -1;
         }
         else
         {
-            _data.get(bytes, 0, count);
+            _typedBytesContentReader.readRawBytes(bytes, 0, count);
             return count;
         }
     }
 
+
     public void writeBoolean(boolean b) throws JMSException
     {
         checkWritable();
-        _changedData = true;
-        _data.put(b ? (byte) 1 : (byte) 0);
+        _typedBytesContentWriter.writeBooleanImpl(b);
     }
 
     public void writeByte(byte b) throws JMSException
     {
         checkWritable();
-        _changedData = true;
-        _data.put(b);
+        _typedBytesContentWriter.writeByteImpl(b);
     }
 
     public void writeShort(short i) throws JMSException
     {
         checkWritable();
-        _changedData = true;
-        _data.putShort(i);
+        _typedBytesContentWriter.writeShortImpl(i);
     }
 
     public void writeChar(char c) throws JMSException
     {
         checkWritable();
-        _changedData = true;
-        _data.putChar(c);
+        _typedBytesContentWriter.writeCharImpl(c);
     }
 
     public void writeInt(int i) throws JMSException
     {
         checkWritable();
-        _changedData = true;
-        _data.putInt(i);
+        _typedBytesContentWriter.writeIntImpl(i);
     }
 
     public void writeLong(long l) throws JMSException
     {
         checkWritable();
-        _changedData = true;
-        _data.putLong(l);
+        _typedBytesContentWriter.writeLongImpl(l);
     }
 
     public void writeFloat(float v) throws JMSException
     {
         checkWritable();
-        _changedData = true;
-        _data.putFloat(v);
+        _typedBytesContentWriter.writeFloatImpl(v);
     }
 
     public void writeDouble(double v) throws JMSException
     {
         checkWritable();
-        _changedData = true;
-        _data.putDouble(v);
+        _typedBytesContentWriter.writeDoubleImpl(v);
     }
 
     public void writeUTF(String string) throws JMSException
     {
         checkWritable();
-        try
-        {
-            CharsetEncoder encoder = Charset.forName("UTF-8").newEncoder();
-            java.nio.ByteBuffer encodedString = encoder.encode(CharBuffer.wrap(string));
-            
-            _data.putShort((short)encodedString.limit());
-            _data.put(encodedString);
-            _changedData = true;
-            //_data.putString(string, Charset.forName("UTF-8").newEncoder());
-            // we must add the null terminator manually
-            //_data.put((byte)0);
-        }
-        catch (CharacterCodingException e)
-        {
-            JMSException jmse = new JMSException("Unable to encode string: " + e);
-            jmse.setLinkedException(e);
-            jmse.initCause(e);
-            throw jmse;
-        }
+        _typedBytesContentWriter.writeLengthPrefixedUTF(string);
     }
 
     public void writeBytes(byte[] bytes) throws JMSException
     {
-        checkWritable();
-        _data.put(bytes);
-        _changedData = true;
+        writeBytes(bytes, 0, bytes.length);
     }
 
     public void writeBytes(byte[] bytes, int offset, int length) throws JMSException
     {
         checkWritable();
-        _data.put(bytes, offset, length);
-        _changedData = true;
+        _typedBytesContentWriter.writeBytesRaw(bytes, offset, length);
     }
 
     public void writeObject(Object object) throws JMSException

Modified: qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSBytesMessageFactory.java
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSBytesMessageFactory.java?rev=1172657&r1=1172656&r2=1172657&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSBytesMessageFactory.java (original)
+++ qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSBytesMessageFactory.java Mon Sep 19 15:13:18 2011
@@ -22,11 +22,12 @@ package org.apache.qpid.client.message;
 
 import javax.jms.JMSException;
 
-import org.apache.mina.common.ByteBuffer;
 import org.apache.qpid.AMQException;
 import org.apache.qpid.framing.AMQShortString;
 import org.apache.qpid.framing.BasicContentHeaderProperties;
 
+import java.nio.ByteBuffer;
+
 public class JMSBytesMessageFactory extends AbstractJMSMessageFactory
 {
     protected AbstractJMSMessage createMessage(AMQMessageDelegate delegate, ByteBuffer data) throws AMQException

Modified: qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSHeaderAdapter.java
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSHeaderAdapter.java?rev=1172657&r1=1172656&r2=1172657&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSHeaderAdapter.java (original)
+++ qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSHeaderAdapter.java Mon Sep 19 15:13:18 2011
@@ -20,12 +20,15 @@
  */
 package org.apache.qpid.client.message;
 
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
 import java.util.Enumeration;
 
 import javax.jms.JMSException;
 import javax.jms.MessageFormatException;
 
-import org.apache.mina.common.ByteBuffer;
+import java.nio.ByteBuffer;
 import org.apache.qpid.AMQPInvalidClassException;
 import org.apache.qpid.framing.AMQShortString;
 import org.apache.qpid.framing.FieldTable;
@@ -282,7 +285,7 @@ public final class JMSHeaderAdapter
                         s = String.valueOf(o);
                     }
                 }
-            }//else return s // null; 
+            }//else return s // null;
         }
 
         return s;
@@ -458,9 +461,29 @@ public final class JMSHeaderAdapter
         return getHeaders().isEmpty();
     }
 
-    public void writeToBuffer(ByteBuffer data)
+    public void writeToBuffer(final ByteBuffer data)
     {
-        getHeaders().writeToBuffer(data);
+        try
+        {
+            getHeaders().writeToBuffer(new DataOutputStream(new OutputStream()
+            {
+                @Override
+                public void write(final int b)
+                {
+                    data.put((byte)b);
+                }
+
+                @Override
+                public void write(final byte[] b, final int off, final int len)
+                {
+                    data.put(b, off, len);
+                }
+            }));
+        }
+        catch (IOException e)
+        {
+            throw new IllegalArgumentException("Unexpected IO Exception - should never happen", e);
+        }
     }
 
     public Enumeration getMapNames()

Modified: qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSMapMessage.java
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSMapMessage.java?rev=1172657&r1=1172656&r2=1172657&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSMapMessage.java (original)
+++ qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSMapMessage.java Mon Sep 19 15:13:18 2011
@@ -20,11 +20,8 @@
  */
 package org.apache.qpid.client.message;
 
-import org.apache.mina.common.ByteBuffer;
-
 import org.apache.qpid.AMQException;
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.BasicContentHeaderProperties;
+
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -32,13 +29,14 @@ import org.slf4j.LoggerFactory;
 import javax.jms.JMSException;
 import javax.jms.MessageFormatException;
 
+import java.nio.ByteBuffer;
 import java.nio.charset.CharacterCodingException;
 import java.util.Collections;
 import java.util.Enumeration;
 import java.util.HashMap;
 import java.util.Map;
 
-public class JMSMapMessage extends AbstractBytesTypedMessage implements javax.jms.MapMessage
+public class JMSMapMessage extends AbstractJMSMessage implements javax.jms.MapMessage
 {
     private static final Logger _logger = LoggerFactory.getLogger(JMSMapMessage.class);
 
@@ -54,10 +52,10 @@ public class JMSMapMessage extends Abstr
     JMSMapMessage(AMQMessageDelegateFactory delegateFactory, ByteBuffer data) throws JMSException
     {
 
-        super(delegateFactory, data); // this instantiates a content header
+        super(delegateFactory, data!=null); // this instantiates a content header
         if(data != null)
         {
-            populateMapFromData();
+            populateMapFromData(data);
         }
 
     }
@@ -65,10 +63,10 @@ public class JMSMapMessage extends Abstr
     JMSMapMessage(AMQMessageDelegate delegate, ByteBuffer data) throws AMQException
     {
 
-        super(delegate, data);
+        super(delegate, data != null);
         try
         {
-            populateMapFromData();
+            populateMapFromData(data);
         }
         catch (JMSException je)
         {
@@ -89,18 +87,10 @@ public class JMSMapMessage extends Abstr
         return MIME_TYPE;
     }
 
-    public ByteBuffer getData()
-    {
-        // What if _data is null?
-        writeMapToData();
-
-        return super.getData();
-    }
-
     @Override
-    public void clearBodyImpl() throws JMSException
+    public void clearBody() throws JMSException
     {
-        super.clearBodyImpl();
+        super.clearBody();
         _map.clear();
     }
 
@@ -458,17 +448,18 @@ public class JMSMapMessage extends Abstr
         return _map.containsKey(propName);
     }
 
-    protected void populateMapFromData() throws JMSException
+    protected void populateMapFromData(ByteBuffer data) throws JMSException
     {
-        if (_data != null)
+        TypedBytesContentReader reader = new TypedBytesContentReader(data);
+        if (data != null)
         {
-            _data.rewind();
+            data.rewind();
 
-            final int entries = readIntImpl();
+            final int entries = reader.readIntImpl();
             for (int i = 0; i < entries; i++)
             {
-                String propName = readStringImpl();
-                Object value = readObject();
+                String propName = reader.readStringImpl();
+                Object value = reader.readObject();
                 _map.put(propName, value);
             }
         }
@@ -478,35 +469,21 @@ public class JMSMapMessage extends Abstr
         }
     }
 
-    protected void writeMapToData()
+    public ByteBuffer getData()
+            throws JMSException
     {
-        allocateInitialBuffer();
+        TypedBytesContentWriter writer = new TypedBytesContentWriter();
+
         final int size = _map.size();
-        writeIntImpl(size);
+        writer.writeIntImpl(size);
         for (Map.Entry<String, Object> entry : _map.entrySet())
         {
-            try
-            {
-                writeStringImpl(entry.getKey());
-            }
-            catch (CharacterCodingException e)
-            {
-                throw new IllegalArgumentException("Cannot encode property key name " + entry.getKey(), e);
-
-            }
+            writer.writeNullTerminatedStringImpl(entry.getKey());
 
-            try
-            {
-                writeObject(entry.getValue());
-            }
-            catch (JMSException e)
-            {
-                Object value = entry.getValue();
-                throw new IllegalArgumentException("Cannot encode property key name " + entry.getKey() + " value : " + value
-                    + " (type: " + value.getClass().getName() + ").", e);
-            }
+            writer.writeObject(entry.getValue());
         }
 
+        return writer.getData();
     }
 
 }

Modified: qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSMapMessageFactory.java
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSMapMessageFactory.java?rev=1172657&r1=1172656&r2=1172657&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSMapMessageFactory.java (original)
+++ qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSMapMessageFactory.java Mon Sep 19 15:13:18 2011
@@ -14,18 +14,16 @@
  *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
  *  KIND, either express or implied.  See the License for the
  *  specific language governing permissions and limitations
- *  under the License.    
+ *  under the License.
+ *
  *
- * 
  */
 package org.apache.qpid.client.message;
 
 import javax.jms.JMSException;
 
-import org.apache.mina.common.ByteBuffer;
+import java.nio.ByteBuffer;
 import org.apache.qpid.AMQException;
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.BasicContentHeaderProperties;
 
 public class JMSMapMessageFactory extends AbstractJMSMessageFactory
 {

Modified: qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSObjectMessage.java
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSObjectMessage.java?rev=1172657&r1=1172656&r2=1172657&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSObjectMessage.java (original)
+++ qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSObjectMessage.java Mon Sep 19 15:13:18 2011
@@ -20,26 +20,28 @@
  */
 package org.apache.qpid.client.message;
 
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
-import java.io.Serializable;
+import java.io.*;
+import java.nio.ByteBuffer;
 
 import javax.jms.JMSException;
 import javax.jms.MessageFormatException;
 import javax.jms.ObjectMessage;
 
-import org.apache.mina.common.ByteBuffer;
-
 import org.apache.qpid.AMQException;
+import org.apache.qpid.client.util.ClassLoadingAwareObjectInputStream;
 
 public class JMSObjectMessage extends AbstractJMSMessage implements ObjectMessage
 {
     public static final String MIME_TYPE = "application/java-object-stream";
+    private static final int DEFAULT_OUTPUT_BUFFER_SIZE = 256;
+
+    private Serializable _readData;
+    private ByteBuffer _data;
 
+    private Exception _exception;
+
+    private static final ByteBuffer EMPTY_BYTE_BUFFER = ByteBuffer.allocate(0);
 
-    private static final int DEFAULT_BUFFER_SIZE = 1024;
 
     /**
      * Creates empty, writable message for use by producers
@@ -47,41 +49,57 @@ public class JMSObjectMessage extends Ab
      */
     public JMSObjectMessage(AMQMessageDelegateFactory delegateFactory)
     {
-        this(delegateFactory, null);
-    }
-
-    private JMSObjectMessage(AMQMessageDelegateFactory delegateFactory, ByteBuffer data)
-    {
-        super(delegateFactory, data);
-        if (data == null)
-        {
-            _data = ByteBuffer.allocate(DEFAULT_BUFFER_SIZE);
-            _data.setAutoExpand(true);
-        }
-
-        setContentType(getMimeType());
+        super(delegateFactory, false);
     }
 
     /**
      * Creates read only message for delivery to consumers
      */
 
-      JMSObjectMessage(AMQMessageDelegate delegate, ByteBuffer data) throws AMQException
+      JMSObjectMessage(AMQMessageDelegate delegate, final ByteBuffer data) throws AMQException
       {
-          super(delegate, data);
+          super(delegate, data!=null);
+
+          try
+          {
+              ClassLoadingAwareObjectInputStream in = new ClassLoadingAwareObjectInputStream(new InputStream()
+              {
+
+
+                  @Override
+                  public int read() throws IOException
+                  {
+                      return data.get();
+                  }
+
+                  @Override
+                  public int read(byte[] b, int off, int len) throws IOException
+                  {
+                      len = data.remaining() < len ? data.remaining() : len;
+                      data.get(b, off, len);
+                      return len;
+                  }
+              });
+
+              _readData = (Serializable) in.readObject();
+          }
+          catch (IOException e)
+          {
+              _exception = e;
+          }
+          catch (ClassNotFoundException e)
+          {
+              _exception = e;
+          }
       }
 
 
-    public void clearBodyImpl() throws JMSException
+    public void clearBody() throws JMSException
     {
-        if (_data != null)
-        {
-            _data.release();
-            _data = null;
-        }
-
-
-
+        super.clearBody();
+        _exception = null;
+        _readData = null;
+        _data = null;
     }
 
     public String toBodyString() throws JMSException
@@ -94,83 +112,116 @@ public class JMSObjectMessage extends Ab
         return MIME_TYPE;
     }
 
-    public void setObject(Serializable serializable) throws JMSException
+    @Override
+    public ByteBuffer getData() throws JMSException
     {
-        checkWritable();
-
-        if (_data == null)
+        if(_exception != null)
         {
-            _data = ByteBuffer.allocate(DEFAULT_BUFFER_SIZE);
-            _data.setAutoExpand(true);
+            final MessageFormatException messageFormatException =
+                    new MessageFormatException("Unable to deserialize message");
+            messageFormatException.setLinkedException(_exception);
+            throw messageFormatException;
+        }
+        if(_readData == null)
+        {
+
+            return _data == null ? EMPTY_BYTE_BUFFER : _data.duplicate();
         }
         else
         {
-            _data.rewind();
+            try
+            {
+                ByteArrayOutputStream baos = new ByteArrayOutputStream(DEFAULT_OUTPUT_BUFFER_SIZE);
+                ObjectOutputStream oos = new ObjectOutputStream(baos);
+                oos.writeObject(_readData);
+                oos.flush();
+                return ByteBuffer.wrap(baos.toByteArray());
+            }
+            catch (IOException e)
+            {
+                final JMSException jmsException = new JMSException("Unable to encode object of type: " +
+                        _readData.getClass().getName() + ", value " + _readData);
+                jmsException.setLinkedException(e);
+                throw jmsException;
+            }
         }
+    }
+
+    public void setObject(Serializable serializable) throws JMSException
+    {
+        checkWritable();
+        clearBody();
 
         try
         {
-            ObjectOutputStream out = new ObjectOutputStream(_data.asOutputStream());
-            out.writeObject(serializable);
-            out.flush();
-            out.close();
+            ByteArrayOutputStream baos = new ByteArrayOutputStream(DEFAULT_OUTPUT_BUFFER_SIZE);
+            ObjectOutputStream oos = new ObjectOutputStream(baos);
+            oos.writeObject(serializable);
+            oos.flush();
+            _data = ByteBuffer.wrap(baos.toByteArray());
         }
         catch (IOException e)
         {
-            MessageFormatException mfe = new MessageFormatException("Message not serializable: " + e);
-            mfe.setLinkedException(e);
-            mfe.initCause(e);
-            throw mfe;
+            final JMSException jmsException = new JMSException("Unable to encode object of type: " +
+                    serializable.getClass().getName() + ", value " + serializable);
+            jmsException.setLinkedException(e);
+            throw jmsException;
         }
 
     }
 
     public Serializable getObject() throws JMSException
     {
-        ObjectInputStream in = null;
-        if (_data == null)
+        if(_exception != null)
         {
-            return null;
+            final MessageFormatException messageFormatException = new MessageFormatException("Unable to deserialize message");
+            messageFormatException.setLinkedException(_exception);
+            throw messageFormatException;
         }
-
-        try
+        else if(_readData != null || _data == null)
         {
-            _data.rewind();
-            in = new ObjectInputStream(_data.asInputStream());
-
-            return (Serializable) in.readObject();
+            return _readData;
         }
-        catch (IOException e)
-        {
-            MessageFormatException mfe = new MessageFormatException("Could not deserialize message: " + e);
-            mfe.setLinkedException(e);
-            mfe.initCause(e);
-            throw mfe;
-        }
-        catch (ClassNotFoundException e)
-        {
-            MessageFormatException mfe = new MessageFormatException("Could not deserialize message: " + e);
-            mfe.setLinkedException(e);
-            mfe.initCause(e);
-            throw mfe;
-        }
-        finally
+        else
         {
-          //  _data.rewind();
-            close(in);
-        }
-    }
+            Exception exception = null;
 
-    private static void close(InputStream in)
-    {
-        try
-        {
-            if (in != null)
+            final ByteBuffer data = _data.duplicate();
+            try
             {
-                in.close();
+                ClassLoadingAwareObjectInputStream in = new ClassLoadingAwareObjectInputStream(new InputStream()
+                {
+                    @Override
+                    public int read() throws IOException
+                    {
+                        return data.get();
+                    }
+
+                    @Override
+                    public int read(byte[] b, int off, int len) throws IOException
+                    {
+                        len = data.remaining() < len ? data.remaining() : len;
+                        data.get(b, off, len);
+                        return len;
+                    }
+                });
+
+                return (Serializable) in.readObject();
             }
+            catch (ClassNotFoundException e)
+            {
+                exception = e;
+            }
+            catch (IOException e)
+            {
+                exception = e;
+            }
+
+            JMSException jmsException = new JMSException("Could not deserialize object");
+            jmsException.setLinkedException(exception);
+            throw jmsException;
         }
-        catch (IOException ignore)
-        { }
+
     }
+
 }

Modified: qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSObjectMessageFactory.java
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSObjectMessageFactory.java?rev=1172657&r1=1172656&r2=1172657&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSObjectMessageFactory.java (original)
+++ qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSObjectMessageFactory.java Mon Sep 19 15:13:18 2011
@@ -7,9 +7,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -22,10 +22,8 @@ package org.apache.qpid.client.message;
 
 import javax.jms.JMSException;
 
-import org.apache.mina.common.ByteBuffer;
+import java.nio.ByteBuffer;
 import org.apache.qpid.AMQException;
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.BasicContentHeaderProperties;
 
 public class JMSObjectMessageFactory extends AbstractJMSMessageFactory
 {



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org