You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by sh...@apache.org on 2011/10/21 03:20:13 UTC

svn commit: r1187150 [30/43] - in /qpid/branches/QPID-2519: ./ bin/ cpp/ cpp/bindings/ cpp/bindings/qmf/python/ cpp/bindings/qmf/ruby/ cpp/bindings/qmf/tests/ cpp/bindings/qmf2/ cpp/bindings/qmf2/examples/cpp/ cpp/bindings/qmf2/python/ cpp/bindings/qmf...

Modified: qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java (original)
+++ qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java Fri Oct 21 01:19:00 2011
@@ -19,11 +19,10 @@ package org.apache.qpid.client;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import org.apache.qpid.client.AMQDestination.AddressOption;
 import org.apache.qpid.client.AMQDestination.DestSyntax;
-import org.apache.qpid.client.failover.FailoverException;
 import org.apache.qpid.client.message.*;
 import org.apache.qpid.client.protocol.AMQProtocolHandler;
+import org.apache.qpid.framing.AMQShortString;
 import org.apache.qpid.framing.FieldTable;
 import org.apache.qpid.AMQException;
 import org.apache.qpid.AMQInternalException;
@@ -66,13 +65,19 @@ public class BasicMessageConsumer_0_10 e
     private boolean _preAcquire = true;
 
     /**
+     * Indicate whether this consumer is started.
+     */
+    private boolean _isStarted = false;
+
+    /**
      * Specify whether this consumer is performing a sync receive
      */
     private final AtomicBoolean _syncReceive = new AtomicBoolean(false);
     private String _consumerTagString;
     
     private long capacity = 0;
-
+        
+    //--- constructor
     protected BasicMessageConsumer_0_10(int channelId, AMQConnection connection, AMQDestination destination,
                                         String messageSelector, boolean noLocal, MessageFactoryRegistry messageFactory,
                                         AMQSession session, AMQProtocolHandler protocolHandler,
@@ -98,6 +103,7 @@ public class BasicMessageConsumer_0_10 e
                 _preAcquire = false;
             }
         }
+        _isStarted = connection.started();
         
         // Destination setting overrides connection defaults
         if (destination.getDestSyntax() == DestSyntax.ADDR && 
@@ -150,20 +156,13 @@ public class BasicMessageConsumer_0_10 e
             {
                 if (isMessageListenerSet() && capacity == 0)
                 {
-                    messageFlow();
+                    _0_10session.getQpidSession().messageFlow(getConsumerTagString(),
+                                                              MessageCreditUnit.MESSAGE, 1,
+                                                              Option.UNRELIABLE);
                 }
                 _logger.debug("messageOk, trying to notify");
                 super.notifyMessage(jmsMessage);
             }
-            else
-            {
-                // if we are synchronously waiting for a message
-                // and messages are not pre-fetched we then need to request another one
-                if(capacity == 0)
-                {
-                   messageFlow();
-                }
-            }
         }
         catch (AMQException e)
         {
@@ -172,6 +171,8 @@ public class BasicMessageConsumer_0_10 e
         }
     }
 
+    //----- overridden methods
+
     /**
      * This method is invoked when this consumer is stopped.
      * It tells the broker to stop delivering messages to this consumer.
@@ -201,18 +202,11 @@ public class BasicMessageConsumer_0_10 e
         super.notifyMessage(messageFrame);
     }
 
-    @Override
-    protected void preDeliver(AbstractJMSMessage jmsMsg)
+    @Override protected void preApplicationProcessing(AbstractJMSMessage jmsMsg) throws JMSException
     {
-        super.preDeliver(jmsMsg);
-
-        if (_acknowledgeMode == org.apache.qpid.jms.Session.NO_ACKNOWLEDGE)
+        super.preApplicationProcessing(jmsMsg);
+        if (!_session.getTransacted() && _session.getAcknowledgeMode() != org.apache.qpid.jms.Session.CLIENT_ACKNOWLEDGE)
         {
-            //For 0-10 we need to ensure that all messages are indicated processed in some way to
-            //ensure their AMQP command-id is marked completed, and so we must send a completion
-            //even for no-ack messages even though there isnt actually an 'acknowledgement' occurring.
-            //Add message to the unacked message list to ensure we dont lose record of it before
-            //sending a completion of some sort.
             _session.addUnacknowledgedMessage(jmsMsg.getDeliveryTag());
         }
     }
@@ -224,6 +218,7 @@ public class BasicMessageConsumer_0_10 e
         return _messageFactory.createMessage(msg.getMessageTransfer());
     }
 
+    // private methods
     /**
      * Check whether a message can be delivered to this consumer.
      *
@@ -252,7 +247,6 @@ public class BasicMessageConsumer_0_10 e
             _logger.debug("messageOk " + messageOk);
             _logger.debug("_preAcquire " + _preAcquire);
         }
-
         if (!messageOk)
         {
             if (_preAcquire)
@@ -269,12 +263,19 @@ public class BasicMessageConsumer_0_10 e
             {
                 if (_logger.isDebugEnabled())
                 {
-                    _logger.debug("filterMessage - not ack'ing message as not acquired");
+                    _logger.debug("Message not OK, releasing");
                 }
-                flushUnwantedMessage(message);
+                releaseMessage(message);
+            }
+            // if we are synchronously waiting for a message
+            // and messages are not prefetched we then need to request another one
+            if(capacity == 0)
+            {
+               _0_10session.getQpidSession().messageFlow(getConsumerTagString(),
+                                                         MessageCreditUnit.MESSAGE, 1,
+                                                         Option.UNRELIABLE);
             }
         }
-
         // now we need to acquire this message if needed
         // this is the case of queue with a message selector set
         if (!_preAcquire && messageOk && !isNoConsume())
@@ -286,7 +287,6 @@ public class BasicMessageConsumer_0_10 e
             messageOk = acquireMessage(message);
             _logger.debug("filterMessage - message acquire status : " + messageOk);
         }
-
         return messageOk;
     }
 
@@ -297,38 +297,38 @@ public class BasicMessageConsumer_0_10 e
      * @param message The message to be acknowledged
      * @throws AMQException If the message cannot be acquired due to some internal error.
      */
-    private void acknowledgeMessage(final AbstractJMSMessage message) throws AMQException
+    private void acknowledgeMessage(AbstractJMSMessage message) throws AMQException
     {
-        final RangeSet ranges = new RangeSet();
-        ranges.add((int) message.getDeliveryTag());
-        _0_10session.messageAcknowledge
-            (ranges,
-             _acknowledgeMode != org.apache.qpid.jms.Session.NO_ACKNOWLEDGE);
-
-        final AMQException amqe = _0_10session.getCurrentException();
-        if (amqe != null)
+        if (!_preAcquire)
         {
-            throw amqe;
+            RangeSet ranges = new RangeSet();
+            ranges.add((int) message.getDeliveryTag());
+            _0_10session.messageAcknowledge
+                (ranges,
+                 _acknowledgeMode != org.apache.qpid.jms.Session.NO_ACKNOWLEDGE);
+
+            AMQException amqe = _0_10session.getCurrentException();
+            if (amqe != null)
+            {
+                throw amqe;
+            }
         }
     }
 
     /**
-     * Flush an unwanted message. For 0-10 we need to ensure that all messages are indicated
-     * processed to ensure their AMQP command-id is marked completed.
+     * Release a message
      *
-     * @param message The unwanted message to be flushed
-     * @throws AMQException If the unwanted message cannot be flushed due to some internal error.
+     * @param message The message to be released
+     * @throws AMQException If the message cannot be released due to some internal error.
      */
-    private void flushUnwantedMessage(final AbstractJMSMessage message) throws AMQException
+    private void releaseMessage(AbstractJMSMessage message) throws AMQException
     {
-        final RangeSet ranges = new RangeSet();
-        ranges.add((int) message.getDeliveryTag());
-        _0_10session.flushProcessed(ranges,false);
-
-        final AMQException amqe = _0_10session.getCurrentException();
-        if (amqe != null)
+        if (_preAcquire)
         {
-            throw amqe;
+            RangeSet ranges = new RangeSet();
+            ranges.add((int) message.getDeliveryTag());
+            _0_10session.getQpidSession().messageRelease(ranges);
+            _0_10session.sync();
         }
     }
 
@@ -339,52 +339,44 @@ public class BasicMessageConsumer_0_10 e
      * @return true if the message has been acquired, false otherwise.
      * @throws AMQException If the message cannot be acquired due to some internal error.
      */
-    private boolean acquireMessage(final AbstractJMSMessage message) throws AMQException
+    private boolean acquireMessage(AbstractJMSMessage message) throws AMQException
     {
         boolean result = false;
-        final RangeSet ranges = new RangeSet();
-        ranges.add((int) message.getDeliveryTag());
+        if (!_preAcquire)
+        {
+            RangeSet ranges = new RangeSet();
+            ranges.add((int) message.getDeliveryTag());
 
-        final Acquired acq = _0_10session.getQpidSession().messageAcquire(ranges).get();
+            Acquired acq = _0_10session.getQpidSession().messageAcquire(ranges).get();
 
-        final RangeSet acquired = acq.getTransfers();
-        if (acquired != null && acquired.size() > 0)
-        {
-            result = true;
+            RangeSet acquired = acq.getTransfers();
+            if (acquired != null && acquired.size() > 0)
+            {
+                result = true;
+            }
         }
         return result;
     }
 
-    private void messageFlow()
-    {
-        _0_10session.getQpidSession().messageFlow(getConsumerTagString(),
-                                                  MessageCreditUnit.MESSAGE, 1,
-                                                  Option.UNRELIABLE);
-    }
 
     public void setMessageListener(final MessageListener messageListener) throws JMSException
     {
         super.setMessageListener(messageListener);
-        try
+        if (messageListener != null && capacity == 0)
         {
-            if (messageListener != null && capacity == 0)
-            {
-                messageFlow();
-            }
-            if (messageListener != null && !_synchronousQueue.isEmpty())
-            {
-                Iterator messages=_synchronousQueue.iterator();
-                while (messages.hasNext())
-                {
-                    AbstractJMSMessage message=(AbstractJMSMessage) messages.next();
-                    messages.remove();
-                    _session.rejectMessage(message, true);
-                }
-            }
+            _0_10session.getQpidSession().messageFlow(getConsumerTagString(),
+                                                      MessageCreditUnit.MESSAGE, 1,
+                                                      Option.UNRELIABLE);
         }
-        catch(TransportException e)
+        if (messageListener != null && !_synchronousQueue.isEmpty())
         {
-            throw _session.toJMSException("Exception while setting message listener:"+ e.getMessage(), e);
+            Iterator messages=_synchronousQueue.iterator();
+            while (messages.hasNext())
+            {
+                AbstractJMSMessage message=(AbstractJMSMessage) messages.next();
+                messages.remove();
+                _session.rejectMessage(message, true);
+            }
         }
     }
 
@@ -392,7 +384,9 @@ public class BasicMessageConsumer_0_10 e
     {
         if (_0_10session.isStarted() && _syncReceive.get())
         {
-            messageFlow();
+            _0_10session.getQpidSession().messageFlow
+                (getConsumerTagString(), MessageCreditUnit.MESSAGE, 1,
+                 Option.UNRELIABLE);
         }
     }
 
@@ -413,7 +407,9 @@ public class BasicMessageConsumer_0_10 e
         }
         if (_0_10session.isStarted() && capacity == 0 && _synchronousQueue.isEmpty())
         {
-            messageFlow();
+            _0_10session.getQpidSession().messageFlow(getConsumerTagString(),
+                                                      MessageCreditUnit.MESSAGE, 1,
+                                                      Option.UNRELIABLE);
         }
         Object o = super.getMessageFromQueue(l);
         if (o == null && _0_10session.isStarted())
@@ -444,7 +440,7 @@ public class BasicMessageConsumer_0_10 e
         return o;
     }
 
-    void postDeliver(AbstractJMSMessage msg)
+    void postDeliver(AbstractJMSMessage msg) throws JMSException
     {
         super.postDeliver(msg);
         if (_acknowledgeMode == org.apache.qpid.jms.Session.NO_ACKNOWLEDGE && !_session.isInRecovery())
@@ -453,8 +449,10 @@ public class BasicMessageConsumer_0_10 e
         }
         
         if (_acknowledgeMode == org.apache.qpid.jms.Session.AUTO_ACKNOWLEDGE  &&
-             !_session.isInRecovery() && _session.getAMQConnection().getSyncAck())
+             !_session.isInRecovery() &&   
+             _session.getAMQConnection().getSyncAck())
         {
+            ((AMQSession_0_10) getSession()).flushAcknowledgments();
             ((AMQSession_0_10) getSession()).getQpidSession().sync();
         }
     }
@@ -511,18 +509,4 @@ public class BasicMessageConsumer_0_10 e
             return _exclusive;
         }
     }
-    
-    void cleanupQueue() throws AMQException, FailoverException
-    {
-        AMQDestination dest = this.getDestination();
-        if (dest != null && dest.getDestSyntax() == AMQDestination.DestSyntax.ADDR)
-        {
-            if (dest.getDelete() == AddressOption.ALWAYS ||
-                dest.getDelete() == AddressOption.RECEIVER )
-            {
-                ((AMQSession_0_10) getSession()).getQpidSession().queueDelete(
-                        this.getDestination().getQueueName());
-            }
-        }
-    }
 }

Modified: qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java (original)
+++ qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java Fri Oct 21 01:19:00 2011
@@ -88,8 +88,4 @@ public class BasicMessageConsumer_0_8 ex
         return receive();
     }
 
-    void cleanupQueue() throws AMQException, FailoverException
-    {
-        
-    }
 }

Modified: qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java (original)
+++ qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java Fri Oct 21 01:19:00 2011
@@ -39,7 +39,6 @@ import org.apache.qpid.client.message.Ab
 import org.apache.qpid.client.message.MessageConverter;
 import org.apache.qpid.client.protocol.AMQProtocolHandler;
 import org.apache.qpid.framing.ContentBody;
-import org.apache.qpid.transport.TransportException;
 import org.apache.qpid.util.UUIDGen;
 import org.apache.qpid.util.UUIDs;
 import org.slf4j.Logger;
@@ -114,6 +113,8 @@ public abstract class BasicMessageProduc
 
     private final boolean _mandatory;
 
+    private final boolean _waitUntilSent;
+
     private boolean _disableMessageId;
 
     private UUIDGen _messageIdGenerator = UUIDs.newGenerator();
@@ -125,7 +126,8 @@ public abstract class BasicMessageProduc
     protected PublishMode publishMode = PublishMode.ASYNC_PUBLISH_ALL;
 
     protected BasicMessageProducer(AMQConnection connection, AMQDestination destination, boolean transacted, int channelId,
-                                   AMQSession session, AMQProtocolHandler protocolHandler, long producerId, boolean immediate, boolean mandatory) throws AMQException
+                                   AMQSession session, AMQProtocolHandler protocolHandler, long producerId, boolean immediate, boolean mandatory,
+                                   boolean waitUntilSent) throws AMQException
     {
         _connection = connection;
         _destination = destination;
@@ -141,6 +143,7 @@ public abstract class BasicMessageProduc
 
         _immediate = immediate;
         _mandatory = mandatory;
+        _waitUntilSent = waitUntilSent;
         _userID = connection.getUsername();
         setPublishMode();
     }
@@ -263,7 +266,7 @@ public abstract class BasicMessageProduc
         return _destination;
     }
 
-    public void close() throws JMSException
+    public void close()
     {
         _closed.set(true);
         _session.deregisterProducer(_producerId);
@@ -360,6 +363,19 @@ public abstract class BasicMessageProduc
         }
     }
 
+    public void send(Destination destination, Message message, int deliveryMode, int priority, long timeToLive,
+                     boolean mandatory, boolean immediate, boolean waitUntilSent) throws JMSException
+    {
+        checkPreConditions();
+        checkDestination(destination);
+        synchronized (_connection.getFailoverMutex())
+        {
+            validateDestination(destination);
+            sendImpl((AMQDestination) destination, message, deliveryMode, priority, timeToLive, mandatory, immediate,
+                     waitUntilSent);
+        }
+    }
+
     private AbstractJMSMessage convertToNativeMessage(Message message) throws JMSException
     {
         if (message instanceof AbstractJMSMessage)
@@ -434,6 +450,12 @@ public abstract class BasicMessageProduc
         }
     }
 
+    protected void sendImpl(AMQDestination destination, Message message, int deliveryMode, int priority, long timeToLive,
+                            boolean mandatory, boolean immediate) throws JMSException
+    {
+        sendImpl(destination, message, deliveryMode, priority, timeToLive, mandatory, immediate, _waitUntilSent);
+    }
+
     /**
      * The caller of this method must hold the failover mutex.
      *
@@ -448,13 +470,23 @@ public abstract class BasicMessageProduc
      * @throws JMSException
      */
     protected void sendImpl(AMQDestination destination, Message origMessage, int deliveryMode, int priority, long timeToLive,
-                            boolean mandatory, boolean immediate) throws JMSException
+                            boolean mandatory, boolean immediate, boolean wait) throws JMSException
     {
         checkTemporaryDestination(destination);
         origMessage.setJMSDestination(destination);
 
         AbstractJMSMessage message = convertToNativeMessage(origMessage);
 
+        if (_transacted)
+        {
+            if (_session.hasFailedOver() && _session.isDirty())
+            {
+                throw new JMSAMQException("Failover has occurred and session is dirty so unable to send.",
+                                          new AMQSessionDirtyException("Failover has occurred and session is dirty " +
+                                                                       "so unable to send."));
+            }
+        }
+
         UUID messageId = null;
         if (_disableMessageId)
         {
@@ -466,14 +498,7 @@ public abstract class BasicMessageProduc
             message.setJMSMessageID(messageId);
         }
 
-        try
-        {
-            sendMessage(destination, origMessage, message, messageId, deliveryMode, priority, timeToLive, mandatory, immediate);
-        }
-        catch (TransportException e)
-        {
-            throw getSession().toJMSException("Exception whilst sending:" + e.getMessage(), e);
-        }
+        sendMessage(destination, origMessage, message, messageId, deliveryMode, priority, timeToLive, mandatory, immediate, wait);
 
         if (message != origMessage)
         {
@@ -493,7 +518,7 @@ public abstract class BasicMessageProduc
 
     abstract void sendMessage(AMQDestination destination, Message origMessage, AbstractJMSMessage message,
                               UUID messageId, int deliveryMode, int priority, long timeToLive, boolean mandatory,
-                              boolean immediate) throws JMSException;
+                              boolean immediate, boolean wait) throws JMSException;
 
     private void checkTemporaryDestination(AMQDestination destination) throws JMSException
     {
@@ -571,13 +596,6 @@ public abstract class BasicMessageProduc
 
     public boolean isBound(AMQDestination destination) throws JMSException
     {
-        try
-        {
-            return _session.isQueueBound(destination.getExchangeName(), null, destination.getRoutingKey());
-        }
-        catch (TransportException e)
-        {
-            throw getSession().toJMSException("Exception whilst checking destination binding:" + e.getMessage(), e);
-        }
+        return _session.isQueueBound(destination.getExchangeName(), null, destination.getRoutingKey());
     }
 }

Modified: qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java (original)
+++ qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java Fri Oct 21 01:19:00 2011
@@ -19,7 +19,6 @@ package org.apache.qpid.client;
 
 import static org.apache.qpid.transport.Option.NONE;
 import static org.apache.qpid.transport.Option.SYNC;
-import static org.apache.qpid.transport.Option.UNRELIABLE;
 
 import java.nio.ByteBuffer;
 import java.util.HashMap;
@@ -31,12 +30,9 @@ import javax.jms.JMSException;
 import javax.jms.Message;
 
 import org.apache.qpid.AMQException;
-import org.apache.qpid.client.AMQDestination.AddressOption;
 import org.apache.qpid.client.AMQDestination.DestSyntax;
 import org.apache.qpid.client.message.AMQMessageDelegate_0_10;
 import org.apache.qpid.client.message.AbstractJMSMessage;
-import org.apache.qpid.client.message.QpidMessageProperties;
-import org.apache.qpid.client.messaging.address.Link.Reliability;
 import org.apache.qpid.client.protocol.AMQProtocolHandler;
 import org.apache.qpid.transport.DeliveryProperties;
 import org.apache.qpid.transport.Header;
@@ -46,7 +42,6 @@ import org.apache.qpid.transport.Message
 import org.apache.qpid.transport.MessageDeliveryPriority;
 import org.apache.qpid.transport.MessageProperties;
 import org.apache.qpid.transport.Option;
-import org.apache.qpid.transport.TransportException;
 import org.apache.qpid.util.Strings;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -61,9 +56,10 @@ public class BasicMessageProducer_0_10 e
 
     BasicMessageProducer_0_10(AMQConnection connection, AMQDestination destination, boolean transacted, int channelId,
                               AMQSession session, AMQProtocolHandler protocolHandler, long producerId,
-                              boolean immediate, boolean mandatory) throws AMQException
+                              boolean immediate, boolean mandatory, boolean waitUntilSent) throws AMQException
     {
-        super(connection, destination, transacted, channelId, session, protocolHandler, producerId, immediate, mandatory);
+        super(connection, destination, transacted, channelId, session, protocolHandler, producerId, immediate,
+              mandatory, waitUntilSent);
         
         userIDBytes = Strings.toUTF8(_userID);
     }
@@ -72,15 +68,12 @@ public class BasicMessageProducer_0_10 e
     {
         if (destination.getDestSyntax() == DestSyntax.BURL)
         {
-        	if (getSession().isDeclareExchanges())
-        	{
-	            String name = destination.getExchangeName().toString();
-	            ((AMQSession_0_10) getSession()).getQpidSession().exchangeDeclare
-	                (name,
-	                 destination.getExchangeClass().toString(),
-	                 null, null,
-	                 name.startsWith("amq.") ? Option.PASSIVE : Option.NONE);
-        	}
+            String name = destination.getExchangeName().toString();
+            ((AMQSession_0_10) getSession()).getQpidSession().exchangeDeclare
+                (name,
+                 destination.getExchangeClass().toString(),
+                 null, null,
+                 name.startsWith("amq.") ? Option.PASSIVE : Option.NONE);
         }
         else
         {       
@@ -103,7 +96,7 @@ public class BasicMessageProducer_0_10 e
      */
     void sendMessage(AMQDestination destination, Message origMessage, AbstractJMSMessage message,
                      UUID messageId, int deliveryMode, int priority, long timeToLive, boolean mandatory,
-                     boolean immediate) throws JMSException
+                     boolean immediate, boolean wait) throws JMSException
     {
         message.prepareForSending();
 
@@ -178,7 +171,7 @@ public class BasicMessageProducer_0_10 e
         
         if (destination.getDestSyntax() == AMQDestination.DestSyntax.ADDR && 
            (destination.getSubject() != null || 
-              (messageProps.getApplicationHeaders() != null && messageProps.getApplicationHeaders().get(QpidMessageProperties.QPID_SUBJECT) != null))
+              (messageProps.getApplicationHeaders() != null && messageProps.getApplicationHeaders().get("qpid.subject") != null))
            )
         {
             Map<String,Object> appProps = messageProps.getApplicationHeaders();
@@ -188,21 +181,20 @@ public class BasicMessageProducer_0_10 e
                 messageProps.setApplicationHeaders(appProps);          
             }
             
-            if (appProps.get(QpidMessageProperties.QPID_SUBJECT) == null)
+            if (appProps.get("qpid.subject") == null)
             {
                 // use default subject in address string
-                appProps.put(QpidMessageProperties.QPID_SUBJECT,destination.getSubject());
+                appProps.put("qpid.subject",destination.getSubject());
             }
                     
-            if (destination.getAddressType() == AMQDestination.TOPIC_TYPE)
+            if (destination.getTargetNode().getType() == AMQDestination.TOPIC_TYPE)
             {
                 deliveryProp.setRoutingKey((String)
-                        messageProps.getApplicationHeaders().get(QpidMessageProperties.QPID_SUBJECT));                
+                        messageProps.getApplicationHeaders().get("qpid.subject"));                
             }
         }
-
-        ByteBuffer data = message.getData();
-        messageProps.setContentLength(data.remaining());
+        
+        messageProps.setContentLength(message.getContentLength());
 
         // send the message
         try
@@ -218,17 +210,14 @@ public class BasicMessageProducer_0_10 e
                          deliveryMode == DeliveryMode.PERSISTENT)
                    );  
             
-            boolean unreliable = (destination.getDestSyntax() == DestSyntax.ADDR) &&
-                                 (destination.getLink().getReliability() == Reliability.UNRELIABLE);
-            
-
-            ByteBuffer buffer = data == null ? ByteBuffer.allocate(0) : data.slice();
+            org.apache.mina.common.ByteBuffer data = message.getData();
+            ByteBuffer buffer = data == null ? ByteBuffer.allocate(0) : data.buf().slice();
             
             ssn.messageTransfer(destination.getExchangeName() == null ? "" : destination.getExchangeName().toString(), 
                                 MessageAcceptMode.NONE,
                                 MessageAcquireMode.PRE_ACQUIRED,
                                 new Header(deliveryProp, messageProps),
-                    buffer, sync ? SYNC : NONE, unreliable ? UNRELIABLE : NONE);
+                    buffer, sync ? SYNC : NONE);
             if (sync)
             {
                 ssn.sync();
@@ -245,34 +234,10 @@ public class BasicMessageProducer_0_10 e
         }
     }
 
-    @Override
+
     public boolean isBound(AMQDestination destination) throws JMSException
     {
         return _session.isQueueBound(destination);
     }
-    
-    @Override
-    public void close() throws JMSException
-    {
-        super.close();
-        AMQDestination dest = _destination;
-        if (dest != null && dest.getDestSyntax() == AMQDestination.DestSyntax.ADDR)
-        {
-            if (dest.getDelete() == AddressOption.ALWAYS ||
-                dest.getDelete() == AddressOption.SENDER )
-            {
-                try
-                {
-                    ((AMQSession_0_10) getSession()).getQpidSession().queueDelete(
-                        _destination.getQueueName());
-                }
-                catch(TransportException e)
-                {
-                    throw getSession().toJMSException("Exception while closing producer:" + e.getMessage(), e);
-                }
-            }
-        }
-    }
-
 }
 

Modified: qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_8.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_8.java?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_8.java (original)
+++ qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_8.java Fri Oct 21 01:19:00 2011
@@ -27,13 +27,14 @@ import javax.jms.Message;
 import javax.jms.Topic;
 import javax.jms.Queue;
 
-import java.nio.ByteBuffer;
-
+import org.apache.mina.common.ByteBuffer;
 import org.apache.qpid.AMQException;
 import org.apache.qpid.client.message.AbstractJMSMessage;
+import org.apache.qpid.client.message.AMQMessageDelegate;
 import org.apache.qpid.client.message.AMQMessageDelegate_0_8;
 import org.apache.qpid.client.protocol.AMQProtocolHandler;
 import org.apache.qpid.framing.AMQFrame;
+import org.apache.qpid.framing.BasicConsumeBody;
 import org.apache.qpid.framing.BasicContentHeaderProperties;
 import org.apache.qpid.framing.BasicPublishBody;
 import org.apache.qpid.framing.CompositeAMQDataBlock;
@@ -45,9 +46,10 @@ public class BasicMessageProducer_0_8 ex
 {
 
     BasicMessageProducer_0_8(AMQConnection connection, AMQDestination destination, boolean transacted, int channelId,
-            AMQSession session, AMQProtocolHandler protocolHandler, long producerId, boolean immediate, boolean mandatory) throws AMQException
+            AMQSession session, AMQProtocolHandler protocolHandler, long producerId, boolean immediate, boolean mandatory,
+            boolean waitUntilSent) throws AMQException
     {
-        super(connection, destination,transacted,channelId,session, protocolHandler, producerId, immediate, mandatory);
+        super(connection, destination,transacted,channelId,session, protocolHandler, producerId, immediate, mandatory,waitUntilSent);
     }
 
     void declareDestination(AMQDestination destination)
@@ -72,7 +74,7 @@ public class BasicMessageProducer_0_8 ex
 
     void sendMessage(AMQDestination destination, Message origMessage, AbstractJMSMessage message,
                      UUID messageId, int deliveryMode,int priority, long timeToLive, boolean mandatory,
-                     boolean immediate) throws JMSException
+                     boolean immediate, boolean wait) throws JMSException
     {
         BasicPublishBody body = getSession().getMethodRegistry().createBasicPublishBody(_session.getTicket(),
                                                                                         destination.getExchangeName(),
@@ -167,7 +169,7 @@ public class BasicMessageProducer_0_8 ex
             throw jmse;
         }
 
-        _protocolHandler.writeFrame(compositeFrame);
+        _protocolHandler.writeFrame(compositeFrame, wait);
     }
 
     /**
@@ -184,9 +186,7 @@ public class BasicMessageProducer_0_8 ex
 
         if (frames.length == (offset + 1))
         {
-            byte[] data = new byte[payload.remaining()];
-            payload.get(data);
-            frames[offset] = ContentBody.createAMQFrame(channelId, new ContentBody(data));
+            frames[offset] = ContentBody.createAMQFrame(channelId, new ContentBody(payload));
         }
         else
         {
@@ -198,10 +198,7 @@ public class BasicMessageProducer_0_8 ex
                 payload.position((int) framePayloadMax * (i - offset));
                 int length = (remaining >= framePayloadMax) ? (int) framePayloadMax : (int) remaining;
                 payload.limit(payload.position() + length);
-                byte[] data = new byte[payload.remaining()];
-                payload.get(data);
-
-                frames[i] = ContentBody.createAMQFrame(channelId, new ContentBody(data));
+                frames[i] = ContentBody.createAMQFrame(channelId, new ContentBody(payload.slice()));
 
                 remaining -= length;
             }

Modified: qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/client/ChannelToSessionMap.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/client/ChannelToSessionMap.java?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/client/ChannelToSessionMap.java (original)
+++ qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/client/ChannelToSessionMap.java Fri Oct 21 01:19:00 2011
@@ -1,23 +1,3 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
 package org.apache.qpid.client;
 
 import java.util.ArrayList;

Modified: qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/client/CustomJMSXProperty.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/client/CustomJMSXProperty.java?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/client/CustomJMSXProperty.java (original)
+++ qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/client/CustomJMSXProperty.java Fri Oct 21 01:19:00 2011
@@ -23,7 +23,6 @@ package org.apache.qpid.client;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Enumeration;
-import java.util.List;
 
 import org.apache.qpid.framing.AMQShortString;
 
@@ -35,18 +34,6 @@ public enum CustomJMSXProperty
     JMSXGroupSeq,
     JMSXUserID;
 
-    private static List<String> _names;
-
-    static
-    {
-        CustomJMSXProperty[] properties = values();
-        _names = new ArrayList<String>(properties.length);
-        for(CustomJMSXProperty property :  properties)
-        {
-            _names.add(property.toString());
-        }
-
-    }
 
     private final AMQShortString _nameAsShortString;
 
@@ -60,8 +47,20 @@ public enum CustomJMSXProperty
         return _nameAsShortString;
     }
 
-    public static Enumeration asEnumeration()
+    private static Enumeration _names;
+
+    public static synchronized Enumeration asEnumeration()
     {
-        return Collections.enumeration(_names);
+        if(_names == null)
+        {
+            CustomJMSXProperty[] properties = values();
+            ArrayList<String> nameList = new ArrayList<String>(properties.length);
+            for(CustomJMSXProperty property :  properties)
+            {
+                nameList.add(property.toString());
+            }
+            _names = Collections.enumeration(nameList);
+        }
+        return _names;    
     }
 }

Modified: qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/client/QpidConnectionMetaData.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/client/QpidConnectionMetaData.java?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/client/QpidConnectionMetaData.java (original)
+++ qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/client/QpidConnectionMetaData.java Fri Oct 21 01:19:00 2011
@@ -30,11 +30,9 @@ import org.apache.qpid.common.QpidProper
 public class QpidConnectionMetaData implements ConnectionMetaData
 {
 
-    private AMQConnection con;
 
     QpidConnectionMetaData(AMQConnection conn)
     {
-        this.con = conn;
     }
 
     public int getJMSMajorVersion() throws JMSException
@@ -64,12 +62,12 @@ public class QpidConnectionMetaData impl
 
     public int getProviderMajorVersion() throws JMSException
     {
-        return con.getProtocolVersion().getMajorVersion();
+        return 0;
     }
 
     public int getProviderMinorVersion() throws JMSException
     {
-        return con.getProtocolVersion().getMinorVersion();
+        return 8;
     }
 
     public String getProviderVersion() throws JMSException
@@ -80,7 +78,8 @@ public class QpidConnectionMetaData impl
 
     private String getProtocolVersion()
     {
-        return con.getProtocolVersion().toString();
+        // TODO - Implement based on connection negotiated protocol
+        return "0.8";
     }
 
     public String getBrokerVersion()

Modified: qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/client/QueueSenderAdapter.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/client/QueueSenderAdapter.java?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/client/QueueSenderAdapter.java (original)
+++ qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/client/QueueSenderAdapter.java Fri Oct 21 01:19:00 2011
@@ -50,25 +50,25 @@ public class QueueSenderAdapter implemen
 
     public void send(Message msg) throws JMSException
     {
-        checkQueuePreConditions(_queue);
+        checkPreConditions();
         _delegate.send(msg);
     }
 
     public void send(Queue queue, Message msg) throws JMSException
     {
-        checkQueuePreConditions(queue);
+        checkPreConditions(queue);
         _delegate.send(queue, msg);
     }
 
     public void publish(Message msg, int deliveryMode, int priority, long timeToLive) throws JMSException
     {
-        checkQueuePreConditions(_queue);
+        checkPreConditions();
         _delegate.send(msg, deliveryMode, priority, timeToLive);
     }
 
     public void send(Queue queue, Message msg, int deliveryMode, int priority, long timeToLive) throws JMSException
     {
-        checkQueuePreConditions(queue);
+        checkPreConditions(queue);
         _delegate.send(queue, msg, deliveryMode, priority, timeToLive);
     }
 
@@ -122,19 +122,19 @@ public class QueueSenderAdapter implemen
 
     public void send(Destination dest, Message msg) throws JMSException
     {
-        checkQueuePreConditions((Queue) dest);
+        checkPreConditions((Queue) dest);
         _delegate.send(dest, msg);
     }
 
     public void send(Message msg, int deliveryMode, int priority, long timeToLive) throws JMSException
     {
-        checkQueuePreConditions(_queue);
+        checkPreConditions();
         _delegate.send(msg, deliveryMode, priority, timeToLive);
     }
 
     public void send(Destination dest, Message msg, int deliveryMode, int priority, long timeToLive) throws JMSException
     {
-        checkQueuePreConditions((Queue) dest);
+        checkPreConditions((Queue) dest);
         _delegate.send(dest, msg, deliveryMode, priority, timeToLive);
     }
 
@@ -170,6 +170,11 @@ public class QueueSenderAdapter implemen
 
     private void checkPreConditions() throws JMSException
     {
+        checkPreConditions(_queue);
+    }
+
+    private void checkPreConditions(Queue queue) throws JMSException
+    {
         if (closed)
         {
             throw new javax.jms.IllegalStateException("Publisher is closed");
@@ -181,43 +186,39 @@ public class QueueSenderAdapter implemen
         {
             throw new javax.jms.IllegalStateException("Invalid Session");
         }
-    }
 
-    private void checkQueuePreConditions(Queue queue) throws JMSException
-    {
-       checkPreConditions() ;
-       
-       if (queue == null)
-       {
-          throw new UnsupportedOperationException("Queue is null.");
-       }
-       
-       if (!(queue instanceof AMQDestination))
-       {
-           throw new InvalidDestinationException("Queue: " + queue + " is not a valid Qpid queue");
-       }
-  
-       AMQDestination destination = (AMQDestination) queue;
-       if (!destination.isCheckedForQueueBinding() && checkQueueBeforePublish())
-       {
-           if (_delegate.getSession().isStrictAMQP())
-           {
-               _delegate._logger.warn("AMQP does not support destination validation before publish, ");
-               destination.setCheckedForQueueBinding(true);
-           }
-           else
-           {
-               if (_delegate.isBound(destination))
-               {
-                   destination.setCheckedForQueueBinding(true);
-               }
-               else
-               {
-                   throw new InvalidDestinationException("Queue: " + queue
-                       + " is not a valid destination (no bindings on server");
-               }
-           }
-       }
+        if (queue == null)
+        {
+            throw new UnsupportedOperationException("Queue is null.");
+        }
+
+        if (!(queue instanceof AMQDestination))
+        {
+            throw new InvalidDestinationException("Queue: " + queue + " is not a valid Qpid queue");
+        }
+
+        AMQDestination destination = (AMQDestination) queue;
+        if (!destination.isCheckedForQueueBinding() && checkQueueBeforePublish())
+        {
+
+            if (_delegate.getSession().isStrictAMQP())
+            {
+                _delegate._logger.warn("AMQP does not support destination validation before publish, ");
+                destination.setCheckedForQueueBinding(true);
+            }
+            else
+            {
+                if (_delegate.isBound(destination))
+                {
+                    destination.setCheckedForQueueBinding(true);
+                }
+                else
+                {
+                    throw new InvalidDestinationException("Queue: " + queue
+                        + " is not a valid destination (no bindings on server");
+                }
+            }
+        }
     }
 
     private boolean checkQueueBeforePublish()

Modified: qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/client/TemporaryDestination.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/client/TemporaryDestination.java?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/client/TemporaryDestination.java (original)
+++ qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/client/TemporaryDestination.java Fri Oct 21 01:19:00 2011
@@ -24,16 +24,13 @@ package org.apache.qpid.client;
 import javax.jms.Destination;
 import javax.jms.JMSException;
 
-import org.apache.qpid.framing.AMQShortString;
-
 /**
- * Provides support for convenience interface implemented by both AMQTemporaryTopic and AMQTemporaryQueue
+ * Provides support for convenience interface implemented by both AMQTemporaryTopic and AMQTemporaryQueue
  * so that operations related to their "temporary-ness" can be abstracted out.
  */
 interface TemporaryDestination extends Destination
 {
 
-    public AMQShortString getAMQQueueName();
     public void delete() throws JMSException;
     public AMQSession getSession();
     public boolean isDeleted();

Modified: qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/client/XAConnectionImpl.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/client/XAConnectionImpl.java?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/client/XAConnectionImpl.java (original)
+++ qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/client/XAConnectionImpl.java Fri Oct 21 01:19:00 2011
@@ -31,9 +31,9 @@ public class XAConnectionImpl extends AM
     /**
      * Create a XAConnection from a connectionURL
      */
-    public XAConnectionImpl(ConnectionURL connectionURL) throws AMQException
+    public XAConnectionImpl(ConnectionURL connectionURL, SSLConfiguration sslConfig) throws AMQException
     {
-        super(connectionURL);
+        super(connectionURL, sslConfig);
     }
 
     //-- interface XAConnection

Modified: qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/client/XAResourceImpl.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/client/XAResourceImpl.java?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/client/XAResourceImpl.java (original)
+++ qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/client/XAResourceImpl.java Fri Oct 21 01:19:00 2011
@@ -21,14 +21,10 @@ import javax.transaction.xa.XAException;
 import javax.transaction.xa.XAResource;
 import javax.transaction.xa.Xid;
 
+import org.apache.qpid.AMQInvalidArgumentException;
 import org.apache.qpid.dtx.XidImpl;
-import org.apache.qpid.transport.DtxXaStatus;
-import org.apache.qpid.transport.ExecutionErrorCode;
-import org.apache.qpid.transport.Future;
-import org.apache.qpid.transport.Option;
-import org.apache.qpid.transport.RecoverResult;
-import org.apache.qpid.transport.SessionException;
-import org.apache.qpid.transport.XaResult;
+import org.apache.qpid.transport.*;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -215,28 +211,9 @@ public class XAResourceImpl implements X
      * @throws XAException An error has occurred. Possible exception values are XAER_RMERR, XAER_RMFAIL.
      */
     public boolean isSameRM(XAResource xaResource) throws XAException
-    {       
-        if(this == xaResource)
-        {
-            return true;            
-        }       
-        if(!(xaResource instanceof XAResourceImpl))
-        {
-            return false;           
-        }
-        
-        XAResourceImpl other = (XAResourceImpl)xaResource;
-
-        String myUUID = ((AMQSession_0_10)_xaSession).getAMQConnection().getBrokerUUID();
-        String otherUUID = ((AMQSession_0_10)other._xaSession).getAMQConnection().getBrokerUUID();
-        
-        if(_logger.isDebugEnabled())
-        {
-            _logger.debug("Comparing my UUID " + myUUID + " with other UUID " + otherUUID);
-        }
-        
-        return (myUUID != null && otherUUID != null && myUUID.equals(otherUUID));
-                
+    {
+        // TODO : get the server identity of xaResource and compare it with our own one
+        return false;
     }
 
     /**

Modified: qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/client/XASessionImpl.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/client/XASessionImpl.java?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/client/XASessionImpl.java (original)
+++ qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/client/XASessionImpl.java Fri Oct 21 01:19:00 2011
@@ -52,7 +52,7 @@ public class XASessionImpl extends AMQSe
     {
         super(qpidConnection, con, channelId, false,  // this is not a transacted session
               Session.AUTO_ACKNOWLEDGE, // the ack mode is transacted
-              MessageFactoryRegistry.newDefaultRegistry(), defaultPrefetchHigh, defaultPrefetchLow,null);
+              MessageFactoryRegistry.newDefaultRegistry(), defaultPrefetchHigh, defaultPrefetchLow);
         createSession();
         _xaResource = new XAResourceImpl(this);
     }

Modified: qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/client/failover/FailoverRetrySupport.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/client/failover/FailoverRetrySupport.java?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/client/failover/FailoverRetrySupport.java (original)
+++ qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/client/failover/FailoverRetrySupport.java Fri Oct 21 01:19:00 2011
@@ -59,8 +59,8 @@ import org.slf4j.LoggerFactory;
  * <tr><td> Automatically retry the continuation accross fail-overs until it succeeds, or raises an exception.
  * </table>
  *
- * @todo Another continuation. Could use an interface Continuation (as described in other todos)
- *      Then have a wrapping continuation (this), which blocks on an arbitrary
+ * @todo Another continuation. Could use an interface Continuation (as described in other todos, for example, see
+ *      {@link org.apache.qpid.pool.Job}). Then have a wrapping continuation (this), which blocks on an arbitrary
  *      Condition or Latch (specified in constructor call), that this blocks on before calling the wrapped Continuation.
  *      Must work on Java 1.4, so check retrotranslator works on Lock/Condition or latch first. Argument and return type
  *      to match wrapped condition as type parameters. Rename to AsyncConditionalContinuation or something like that.

Modified: qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/client/handler/ChannelCloseMethodHandler.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/client/handler/ChannelCloseMethodHandler.java?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/client/handler/ChannelCloseMethodHandler.java (original)
+++ qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/client/handler/ChannelCloseMethodHandler.java Fri Oct 21 01:19:00 2011
@@ -78,7 +78,7 @@ public class ChannelCloseMethodHandler i
                 {
                     throw new AMQNoRouteException("Error: " + reason, null, null);
                 }
-                else if (errorCode == AMQConstant.ARGUMENT_INVALID)
+                else if (errorCode == AMQConstant.INVALID_ARGUMENT)
                 {
                     _logger.debug("Broker responded with Invalid Argument.");
 

Modified: qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionStartMethodHandler.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionStartMethodHandler.java?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionStartMethodHandler.java (original)
+++ qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionStartMethodHandler.java Fri Oct 21 01:19:00 2011
@@ -20,13 +20,6 @@
  */
 package org.apache.qpid.client.handler;
 
-import java.io.UnsupportedEncodingException;
-import java.util.StringTokenizer;
-
-import javax.security.sasl.Sasl;
-import javax.security.sasl.SaslClient;
-import javax.security.sasl.SaslException;
-
 import org.apache.qpid.AMQException;
 import org.apache.qpid.client.protocol.AMQProtocolSession;
 import org.apache.qpid.client.security.AMQCallbackHandler;
@@ -41,9 +34,18 @@ import org.apache.qpid.framing.Connectio
 import org.apache.qpid.framing.FieldTable;
 import org.apache.qpid.framing.FieldTableFactory;
 import org.apache.qpid.framing.ProtocolVersion;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.security.sasl.Sasl;
+import javax.security.sasl.SaslClient;
+import javax.security.sasl.SaslException;
+
+import java.io.UnsupportedEncodingException;
+import java.util.HashSet;
+import java.util.StringTokenizer;
+
 public class ConnectionStartMethodHandler implements StateAwareMethodListener<ConnectionStartBody>
 {
     private static final Logger _log = LoggerFactory.getLogger(ConnectionStartMethodHandler.class);
@@ -195,20 +197,40 @@ public class ConnectionStartMethodHandle
     private String chooseMechanism(byte[] availableMechanisms) throws UnsupportedEncodingException
     {
         final String mechanisms = new String(availableMechanisms, "utf8");
-        return CallbackHandlerRegistry.getInstance().selectMechanism(mechanisms);
+        StringTokenizer tokenizer = new StringTokenizer(mechanisms, " ");
+        HashSet mechanismSet = new HashSet();
+        while (tokenizer.hasMoreTokens())
+        {
+            mechanismSet.add(tokenizer.nextToken());
+        }
+
+        String preferredMechanisms = CallbackHandlerRegistry.getInstance().getMechanisms();
+        StringTokenizer prefTokenizer = new StringTokenizer(preferredMechanisms, " ");
+        while (prefTokenizer.hasMoreTokens())
+        {
+            String mech = prefTokenizer.nextToken();
+            if (mechanismSet.contains(mech))
+            {
+                return mech;
+            }
+        }
+
+        return null;
     }
 
     private AMQCallbackHandler createCallbackHandler(String mechanism, AMQProtocolSession protocolSession)
         throws AMQException
     {
+        Class mechanismClass = CallbackHandlerRegistry.getInstance().getCallbackHandlerClass(mechanism);
         try
         {
-            AMQCallbackHandler instance = CallbackHandlerRegistry.getInstance().createCallbackHandler(mechanism);
-            instance.initialise(protocolSession.getAMQConnection().getConnectionURL());
+            Object instance = mechanismClass.newInstance();
+            AMQCallbackHandler cbh = (AMQCallbackHandler) instance;
+            cbh.initialise(protocolSession);
 
-            return instance;
+            return cbh;
         }
-        catch (IllegalArgumentException e)
+        catch (Exception e)
         {
             throw new AMQException(null, "Unable to create callback handler: " + e, e);
         }

Modified: qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate.java?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate.java (original)
+++ qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate.java Fri Oct 21 01:19:00 2011
@@ -26,7 +26,9 @@ import org.apache.qpid.client.AMQSession
 import javax.jms.Destination;
 import javax.jms.JMSException;
 
+import java.nio.ByteBuffer;
 import java.util.Enumeration;
+import java.util.Map;
 import java.util.UUID;
 
 public interface AMQMessageDelegate
@@ -128,9 +130,9 @@ public interface AMQMessageDelegate
 
     void removeProperty(final String propertyName) throws JMSException;
 
-    void setAMQSession(final AMQSession<?,?> s);
+    void setAMQSession(final AMQSession s);
 
-    AMQSession<?,?> getAMQSession();
+    AMQSession getAMQSession();
 
     long getDeliveryTag();
 

Modified: qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegateFactory.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegateFactory.java?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegateFactory.java (original)
+++ qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegateFactory.java Fri Oct 21 01:19:00 2011
@@ -21,6 +21,11 @@
 
 package org.apache.qpid.client.message;
 
+import org.apache.mina.common.ByteBuffer;
+import org.apache.qpid.framing.BasicContentHeaderProperties;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.AMQException;
+
 public interface AMQMessageDelegateFactory<D extends AMQMessageDelegate>
 {
     public static AMQMessageDelegateFactory DEFAULT_FACTORY = null;

Modified: qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_10.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_10.java?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_10.java (original)
+++ qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_10.java Fri Oct 21 01:19:00 2011
@@ -22,12 +22,10 @@
 package org.apache.qpid.client.message;
 
 import java.lang.ref.SoftReference;
-import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Enumeration;
 import java.util.HashMap;
 import java.util.HashSet;
-import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
@@ -37,10 +35,12 @@ import javax.jms.Destination;
 import javax.jms.JMSException;
 import javax.jms.MessageFormatException;
 import javax.jms.MessageNotWriteableException;
+import javax.jms.Session;
 
 import org.apache.qpid.AMQException;
 import org.apache.qpid.AMQPInvalidClassException;
 import org.apache.qpid.client.AMQDestination;
+import org.apache.qpid.client.AMQSession;
 import org.apache.qpid.client.AMQSession_0_10;
 import org.apache.qpid.client.CustomJMSXProperty;
 import org.apache.qpid.framing.AMQShortString;
@@ -53,9 +53,6 @@ import org.apache.qpid.transport.Message
 import org.apache.qpid.transport.MessageDeliveryPriority;
 import org.apache.qpid.transport.MessageProperties;
 import org.apache.qpid.transport.ReplyTo;
-import org.apache.qpid.transport.TransportException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 /**
  * This extends AbstractAMQMessageDelegate which contains common code between
@@ -64,7 +61,6 @@ import org.slf4j.LoggerFactory;
  */
 public class AMQMessageDelegate_0_10 extends AbstractAMQMessageDelegate
 {
-    private static final Logger _logger = LoggerFactory.getLogger(AMQMessageDelegate_0_10.class);
     private static final Map<ReplyTo, SoftReference<Destination>> _destinationCache = Collections.synchronizedMap(new HashMap<ReplyTo, SoftReference<Destination>>());
 
     public static final String JMS_TYPE = "x-jms-type";
@@ -74,8 +70,13 @@ public class AMQMessageDelegate_0_10 ext
 
     private Destination _destination;
 
+
     private MessageProperties _messageProps;
     private DeliveryProperties _deliveryProps;
+    /** If the acknowledge mode is CLIENT_ACKNOWLEDGE the session is required */
+    private AMQSession _session;
+    private final long _deliveryTag;
+
 
     protected AMQMessageDelegate_0_10()
     {
@@ -85,29 +86,15 @@ public class AMQMessageDelegate_0_10 ext
 
     protected AMQMessageDelegate_0_10(MessageProperties messageProps, DeliveryProperties deliveryProps, long deliveryTag)
     {
-        super(deliveryTag);
         _messageProps = messageProps;
         _deliveryProps = deliveryProps;
+        _deliveryTag = deliveryTag;
         _readableProperties = (_messageProps != null);
 
         AMQDestination dest;
 
-        if (AMQDestination.getDefaultDestSyntax() == AMQDestination.DestSyntax.BURL)
-        {
-            dest = generateDestination(new AMQShortString(_deliveryProps.getExchange()),
+        dest = generateDestination(new AMQShortString(_deliveryProps.getExchange()),
                                    new AMQShortString(_deliveryProps.getRoutingKey()));
-        }
-        else
-        {
-            String subject = null;
-            if (messageProps != null && messageProps.getApplicationHeaders() != null)
-            {
-                subject = (String)messageProps.getApplicationHeaders().get(QpidMessageProperties.QPID_SUBJECT);
-            }
-            dest = (AMQDestination) convertToAddressBasedDestination(_deliveryProps.getExchange(),
-                    _deliveryProps.getRoutingKey(), subject);
-        }
-        
         setJMSDestination(dest);        
     }
 
@@ -198,6 +185,7 @@ public class AMQMessageDelegate_0_10 ext
         }
     }
 
+
     public long getJMSTimestamp() throws JMSException
     {
         return _deliveryProps.getTimestamp();
@@ -252,50 +240,13 @@ public class AMQMessageDelegate_0_10 ext
                 String exchange = replyTo.getExchange();
                 String routingKey = replyTo.getRoutingKey();
 
-                if (AMQDestination.getDefaultDestSyntax() == AMQDestination.DestSyntax.BURL)
-                {
-            
-                    dest = generateDestination(new AMQShortString(exchange), new AMQShortString(routingKey));
-                }
-                else
-                {
-                    dest = convertToAddressBasedDestination(exchange,routingKey,null);
-                }
+                dest = generateDestination(new AMQShortString(exchange), new AMQShortString(routingKey));
                 _destinationCache.put(replyTo, new SoftReference<Destination>(dest));
             }
 
             return dest;
         }
     }
-    
-    private Destination convertToAddressBasedDestination(String exchange, String routingKey, String subject)
-    {
-        String addr;
-        if ("".equals(exchange)) // type Queue
-        {
-            subject = (subject == null) ? "" : "/" + subject;
-            addr = routingKey + subject;
-        }
-        else
-        {
-            addr = exchange + "/" + routingKey;
-        }
-        
-        try
-        {
-            return AMQDestination.createDestination("ADDR:" + addr);
-        }
-        catch(Exception e)
-        {
-            // An exception is only thrown here if the address syntax is invalid.
-            // Logging the exception, but not throwing as this is only important to Qpid developers.
-            // An exception here means a bug in the code.
-            _logger.error("Exception when constructing an address string from the ReplyTo struct");
-            
-            // falling back to the old way of doing it to ensure the application continues.
-            return generateDestination(new AMQShortString(exchange), new AMQShortString(routingKey));
-        } 
-    }
 
     public void setJMSReplyTo(Destination destination) throws JMSException
     {
@@ -317,14 +268,14 @@ public class AMQMessageDelegate_0_10 ext
         {
            try
            {
-               int type = ((AMQSession_0_10)getAMQSession()).resolveAddressType(amqd);
+               int type = ((AMQSession_0_10)_session).resolveAddressType(amqd);
                if (type == AMQDestination.QUEUE_TYPE)
                {
-                   ((AMQSession_0_10)getAMQSession()).setLegacyFiledsForQueueType(amqd);
+                   ((AMQSession_0_10)_session).setLegacyFiledsForQueueType(amqd);
                }
                else
                {
-                   ((AMQSession_0_10)getAMQSession()).setLegacyFiledsForTopicType(amqd);
+                   ((AMQSession_0_10)_session).setLegacyFiledsForTopicType(amqd);
                }
            }
            catch(AMQException ex)
@@ -334,14 +285,6 @@ public class AMQMessageDelegate_0_10 ext
                e.setLinkedException(ex);
                throw e;
            }
-           catch (TransportException e)
-           {
-               JMSException jmse = new JMSException("Exception occured while figuring out the node type:" + e.getMessage());
-               jmse.initCause(e);
-               jmse.setLinkedException(e);
-               throw jmse;
-           }
-
         }
         
         final ReplyTo replyTo = new ReplyTo(amqd.getExchangeName().toString(), amqd.getRoutingKey().toString());
@@ -392,7 +335,7 @@ public class AMQMessageDelegate_0_10 ext
         Destination replyTo = getJMSReplyTo();
         if(replyTo != null)
         {
-            return ((AMQDestination)replyTo).toString();
+            return ((AMQDestination)replyTo).toURL();
         }
         else
         {
@@ -689,16 +632,6 @@ public class AMQMessageDelegate_0_10 ext
         {
             return new String(_messageProps.getUserId());
         }
-        else if (QpidMessageProperties.AMQP_0_10_APP_ID.equals(propertyName) &&
-                _messageProps.getAppId() != null)
-        {
-            return new String(_messageProps.getAppId());
-        }
-        else if (QpidMessageProperties.AMQP_0_10_ROUTING_KEY.equals(propertyName) &&
-                _deliveryProps.getRoutingKey() != null)
-        {
-            return _deliveryProps.getRoutingKey();
-        }
         else
         {
             checkPropertyName(propertyName);
@@ -737,19 +670,7 @@ public class AMQMessageDelegate_0_10 ext
 
     public Enumeration getPropertyNames() throws JMSException
     {
-        List<String> props = new ArrayList<String>();
-        Map<String, Object> propertyMap = getApplicationHeaders();
-        for (String prop: getApplicationHeaders().keySet())
-        {
-            Object value = propertyMap.get(prop);
-            if (value instanceof Boolean || value instanceof Number 
-                || value instanceof String)
-            {
-                props.add(prop);
-            }
-        }
-        
-        return java.util.Collections.enumeration(props);        
+        return java.util.Collections.enumeration(getApplicationHeaders().keySet());
     }
 
     public void setBooleanProperty(String propertyName, boolean b) throws JMSException
@@ -805,14 +726,7 @@ public class AMQMessageDelegate_0_10 ext
     {
         checkPropertyName(propertyName);
         checkWritableProperties();
-        if (QpidMessageProperties.AMQP_0_10_APP_ID.equals(propertyName))
-        {
-            _messageProps.setAppId(value.getBytes());
-        }
-        else
-        {
-            setApplicationHeader(propertyName, value);
-        }
+        setApplicationHeader(propertyName, value);
     }
 
     private static final Set<Class> ALLOWED = new HashSet();
@@ -897,6 +811,64 @@ public class AMQMessageDelegate_0_10 ext
         _readableProperties = false;
     }
 
+
+    public void acknowledgeThis() throws JMSException
+    {
+        // the JMS 1.1 spec says in section 3.6 that calls to acknowledge are ignored when client acknowledge
+        // is not specified. In our case, we only set the session field where client acknowledge mode is specified.
+        if (_session != null && _session.getAcknowledgeMode() == Session.CLIENT_ACKNOWLEDGE)
+        {
+            if (_session.getAMQConnection().isClosed())
+            {
+                throw new javax.jms.IllegalStateException("Connection is already closed");
+            }
+
+            // multiple is set to true because acknowledging this message also acknowledges all previous
+            // messages received on the session
+            _session.acknowledgeMessage(_deliveryTag, true);
+        }
+    }
+
+    public void acknowledge() throws JMSException
+    {
+        if (_session != null && _session.getAcknowledgeMode() == Session.CLIENT_ACKNOWLEDGE)
+        {
+            _session.acknowledge();
+        }
+    }
+
+
+     /**
+     * The session is set when CLIENT_ACKNOWLEDGE mode is used so that the CHANNEL ACK can be sent when the user calls
+     * acknowledge()
+     *
+     * @param s the AMQ session that delivered this message
+     */
+    public void setAMQSession(AMQSession s)
+    {
+        _session = s;
+    }
+
+    public AMQSession getAMQSession()
+    {
+        return _session;
+    }
+
+    /**
+     * Get the AMQ message number assigned to this message
+     *
+     * @return the message number
+     */
+    public long getDeliveryTag()
+    {
+        return _deliveryTag;
+    }
+
+
+
+
+
+
     protected void checkPropertyName(CharSequence propertyName)
     {
         if (propertyName == null)

Modified: qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_8.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_8.java?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_8.java (original)
+++ qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_8.java Fri Oct 21 01:19:00 2011
@@ -30,6 +30,7 @@ import java.util.UUID;
 import javax.jms.Destination;
 import javax.jms.JMSException;
 import javax.jms.MessageNotWriteableException;
+import javax.jms.Session;
 
 import org.apache.qpid.client.AMQDestination;
 import org.apache.qpid.client.AMQQueue;
@@ -59,12 +60,15 @@ public class AMQMessageDelegate_0_8 exte
             Boolean.parseBoolean(System.getProperties().getProperty(AMQSession.STRICT_AMQP, AMQSession.STRICT_AMQP_DEFAULT));
 
     private ContentHeaderProperties _contentHeaderProperties;
+    /** If the acknowledge mode is CLIENT_ACKNOWLEDGE the session is required */
+    private AMQSession _session;
+    private final long _deliveryTag;
 
     // The base set of items that needs to be set. 
     private AMQMessageDelegate_0_8(BasicContentHeaderProperties properties, long deliveryTag)
     {
-        super(deliveryTag);
         _contentHeaderProperties = properties;
+        _deliveryTag = deliveryTag;
         _readableProperties = (_contentHeaderProperties != null);
         _headerAdapter = new JMSHeaderAdapter(_readableProperties ? ((BasicContentHeaderProperties) _contentHeaderProperties).getHeaders()
                                                                   : (new BasicContentHeaderProperties()).getHeaders() );
@@ -495,6 +499,7 @@ public class AMQMessageDelegate_0_8 exte
         {
             throw new MessageNotWriteableException("You need to call clearProperties() to make the message writable");
         }
+        _contentHeaderProperties.updated();
     }
 
 
@@ -514,4 +519,58 @@ public class AMQMessageDelegate_0_8 exte
 
         _readableProperties = false;
     }
+
+
+    public void acknowledgeThis() throws JMSException
+    {
+        // the JMS 1.1 spec says in section 3.6 that calls to acknowledge are ignored when client acknowledge
+        // is not specified. In our case, we only set the session field where client acknowledge mode is specified.
+        if (_session != null && _session.getAcknowledgeMode() == Session.CLIENT_ACKNOWLEDGE)
+        {
+            if (_session.getAMQConnection().isClosed())
+            {
+                throw new javax.jms.IllegalStateException("Connection is already closed");
+            }
+
+            // multiple is set to true because acknowledging this message also acknowledges all previous
+            // messages received on the session
+            _session.acknowledgeMessage(_deliveryTag, true);
+        }
+    }
+
+    public void acknowledge() throws JMSException
+    {
+        if (_session != null && _session.getAcknowledgeMode() == Session.CLIENT_ACKNOWLEDGE)
+        {
+            _session.acknowledge();
+        }
+    }
+    
+
+     /**
+     * The session is set when CLIENT_ACKNOWLEDGE mode is used so that the CHANNEL ACK can be sent when the user calls
+     * acknowledge()
+     *
+     * @param s the AMQ session that delivered this message
+     */
+    public void setAMQSession(AMQSession s)
+    {
+        _session = s;
+    }
+
+    public AMQSession getAMQSession()
+    {
+        return _session;
+    }
+
+    /**
+     * Get the AMQ message number assigned to this message
+     *
+     * @return the message number
+     */
+    public long getDeliveryTag()
+    {
+        return _deliveryTag;
+    }
+
 }

Modified: qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/client/message/AMQPEncodedMapMessage.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/client/message/AMQPEncodedMapMessage.java?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/client/message/AMQPEncodedMapMessage.java (original)
+++ qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/client/message/AMQPEncodedMapMessage.java Fri Oct 21 01:19:00 2011
@@ -23,12 +23,11 @@ package org.apache.qpid.client.message;
 
 import java.util.List;
 import java.util.Map;
-import java.util.UUID;
-import java.nio.ByteBuffer;
 
 import javax.jms.JMSException;
 import javax.jms.MessageFormatException;
 
+import org.apache.mina.common.ByteBuffer;
 import org.apache.qpid.AMQException;
 import org.apache.qpid.transport.codec.BBDecoder;
 import org.apache.qpid.transport.codec.BBEncoder;
@@ -66,7 +65,7 @@ public class AMQPEncodedMapMessage exten
         if ((value instanceof Boolean) || (value instanceof Byte) || (value instanceof Short) || (value instanceof Integer)
                 || (value instanceof Long) || (value instanceof Character) || (value instanceof Float)
                 || (value instanceof Double) || (value instanceof String) || (value instanceof byte[])
-                || (value instanceof List) || (value instanceof Map) || (value instanceof UUID) || (value == null))
+                || (value instanceof List) || (value instanceof Map) || (value == null))
         {
             _map.put(propName, value);
         }
@@ -81,19 +80,18 @@ public class AMQPEncodedMapMessage exten
     @ Override
     public ByteBuffer getData()
     {
-        BBEncoder encoder = new BBEncoder(1024);
-        encoder.writeMap(_map);
-        return encoder.segment();
+        writeMapToData();
+        return _data;
     }
     
     @ Override
-    protected void populateMapFromData(ByteBuffer data) throws JMSException
+    protected void populateMapFromData() throws JMSException
     {
-        if (data != null)
+        if (_data != null)
         {
-            data.rewind();
+            _data.rewind();
             BBDecoder decoder = new BBDecoder();
-            decoder.init(data);
+            decoder.init(_data.buf());
             _map = decoder.readMap();
         }
         else
@@ -102,8 +100,16 @@ public class AMQPEncodedMapMessage exten
         }
     }
 
+    @ Override
+    protected void writeMapToData()
+    {
+        BBEncoder encoder = new BBEncoder(1024);
+        encoder.writeMap(_map);
+        _data = ByteBuffer.wrap(encoder.segment());
+    }
+    
     // for testing
-    public Map<String,Object> getMap()
+    Map<String,Object> getMap()
     {
         return _map;
     }

Modified: qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/client/message/AMQPEncodedMapMessageFactory.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/client/message/AMQPEncodedMapMessageFactory.java?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/client/message/AMQPEncodedMapMessageFactory.java (original)
+++ qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/client/message/AMQPEncodedMapMessageFactory.java Fri Oct 21 01:19:00 2011
@@ -1,6 +1,6 @@
 package org.apache.qpid.client.message;
 /*
- *
+ * 
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -8,23 +8,22 @@ package org.apache.qpid.client.message;
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- *
+ * 
  *   http://www.apache.org/licenses/LICENSE-2.0
- *
+ * 
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
  * KIND, either express or implied.  See the License for the
  * specific language governing permissions and limitations
  * under the License.
- *
+ * 
  */
 
 
 import javax.jms.JMSException;
 
-import java.nio.ByteBuffer;
-
+import org.apache.mina.common.ByteBuffer;
 import org.apache.qpid.AMQException;
 
 public class AMQPEncodedMapMessageFactory extends AbstractJMSMessageFactory
@@ -37,7 +36,7 @@ public class AMQPEncodedMapMessageFactor
         return new AMQPEncodedMapMessage(delegate,data);
     }
 
-
+    @Override
     public AbstractJMSMessage createMessage(
             AMQMessageDelegateFactory delegateFactory) throws JMSException
     {

Modified: qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/client/message/AbstractAMQMessageDelegate.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/client/message/AbstractAMQMessageDelegate.java?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/client/message/AbstractAMQMessageDelegate.java (original)
+++ qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/client/message/AbstractAMQMessageDelegate.java Fri Oct 21 01:19:00 2011
@@ -23,13 +23,9 @@ package org.apache.qpid.client.message;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 
-import javax.jms.JMSException;
-import javax.jms.Session;
-
 import org.apache.qpid.client.AMQAnyDestination;
 import org.apache.qpid.client.AMQDestination;
 import org.apache.qpid.client.AMQQueue;
-import org.apache.qpid.client.AMQSession;
 import org.apache.qpid.client.AMQTopic;
 import org.apache.qpid.exchange.ExchangeDefaults;
 import org.apache.qpid.framing.AMQShortString;
@@ -82,25 +78,7 @@ public abstract class AbstractAMQMessage
                          new ExchangeInfo(ExchangeDefaults.HEADERS_EXCHANGE_NAME.toString(),
                                           ExchangeDefaults.HEADERS_EXCHANGE_CLASS.toString(),
                                           AMQDestination.QUEUE_TYPE));        
-    }
-
-    /** If the acknowledge mode is CLIENT_ACKNOWLEDGE the session is required */
-    private AMQSession<?,?> _session;
-    private final long _deliveryTag;
-
-    protected AbstractAMQMessageDelegate(long deliveryTag)
-    {
-        _deliveryTag = deliveryTag;
-    }
-
-    /**
-     * Get the AMQ message number assigned to this message
-     *
-     * @return the message number
-     */
-    public long getDeliveryTag()
-    {
-        return _deliveryTag;
+        
     }
 
     /**
@@ -179,47 +157,6 @@ public abstract class AbstractAMQMessage
     {
         return _exchangeMap.containsKey(exchange);
     }
-
-    public void acknowledgeThis() throws JMSException
-    {
-        // the JMS 1.1 spec says in section 3.6 that calls to acknowledge are ignored when client acknowledge
-        // is not specified. In our case, we only set the session field where client acknowledge mode is specified.
-        if (_session != null && _session.getAcknowledgeMode() == Session.CLIENT_ACKNOWLEDGE)
-        {
-            if (_session.getAMQConnection().isClosed())
-            {
-                throw new javax.jms.IllegalStateException("Connection is already closed");
-            }
-
-            // we set multiple to true here since acknowledgement implies acknowledge of all previous messages
-            // received on the session
-            _session.acknowledgeMessage(getDeliveryTag(), true);
-        }
-    }
-
-    public void acknowledge() throws JMSException
-    {
-        if (_session != null && _session.getAcknowledgeMode() == Session.CLIENT_ACKNOWLEDGE)
-        {
-            _session.acknowledge();
-        }
-    }
-
-     /**
-     * The session is set when CLIENT_ACKNOWLEDGE mode is used so that the CHANNEL ACK can be sent when the user calls
-     * acknowledge()
-     *
-     * @param s the AMQ session that delivered this message
-     */
-    public void setAMQSession(AMQSession<?,?> s)
-    {
-        _session = s;
-    }
-
-    public AMQSession<?,?> getAMQSession()
-    {
-        return _session;
-    }
 }
 
 class ExchangeInfo
@@ -265,5 +202,5 @@ class ExchangeInfo
     public void setDestType(int destType)
     {
         this.destType = destType;
-    }
+    }        
 }



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org