You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2012/03/01 11:04:39 UTC

svn commit: r1295492 [3/6] - in /qpid/branches/rg-amqp-1-0-sandbox/qpid/java: ./ bdbstore/ bdbstore/bin/ bdbstore/etc/ bdbstore/etc/scripts/ bdbstore/src/ bdbstore/src/main/ bdbstore/src/main/java/ bdbstore/src/main/java/org/ bdbstore/src/main/java/org...

Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java?rev=1295492&r1=1295491&r2=1295492&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java Thu Mar  1 10:04:31 2012
@@ -27,6 +27,7 @@ import org.apache.qpid.client.protocol.A
 import org.apache.qpid.framing.*;
 import org.apache.qpid.jms.MessageConsumer;
 import org.apache.qpid.jms.Session;
+import org.apache.qpid.transport.TransportException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -36,10 +37,7 @@ import javax.jms.MessageListener;
 import java.util.Arrays;
 import java.util.Iterator;
 import java.util.List;
-import java.util.SortedSet;
 import java.util.ArrayList;
-import java.util.Collections;
-import java.util.TreeSet;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.LinkedBlockingQueue;
@@ -117,29 +115,10 @@ public abstract class BasicMessageConsum
     protected final int _acknowledgeMode;
 
     /**
-     * Number of messages unacknowledged in DUPS_OK_ACKNOWLEDGE mode
-     */
-    private int _outstanding;
-
-    /**
-     * Switch to enable sending of acknowledgements when using DUPS_OK_ACKNOWLEDGE mode. Enabled when _outstannding
-     * number of msgs >= _prefetchHigh and disabled at < _prefetchLow
-     */
-    private boolean _dups_ok_acknowledge_send;
-
-    /**
      * List of tags delievered, The last of which which should be acknowledged on commit in transaction mode.
      */
     private ConcurrentLinkedQueue<Long> _receivedDeliveryTags = new ConcurrentLinkedQueue<Long>();
 
-    /** The last tag that was "multiple" acknowledged on this session (if transacted) */
-    private long _lastAcked;
-
-    /** set of tags which have previously been acked; but not part of the multiple ack (transacted mode only) */
-    private final SortedSet<Long> _previouslyAcked = new TreeSet<Long>();
-
-    private final Object _commitLock = new Object();
-
     /**
      * The thread that was used to call receive(). This is important for being able to interrupt that thread if a
      * receive() is in progress.
@@ -289,17 +268,6 @@ public abstract class BasicMessageConsum
         }
     }
 
-    protected void preApplicationProcessing(AbstractJMSMessage jmsMsg) throws JMSException
-    {
-        if (_session.getAcknowledgeMode() == Session.CLIENT_ACKNOWLEDGE)
-        {
-            _session.addUnacknowledgedMessage(jmsMsg.getDeliveryTag());
-        }
-        
-        _session.setInRecovery(false);
-        preDeliver(jmsMsg);
-    }
-
     /**
      * @param immediate if true then return immediately if the connection is failing over
      *
@@ -322,14 +290,14 @@ public abstract class BasicMessageConsum
             }
         }
 
-        if (!_receiving.compareAndSet(false, true))
+        if (isMessageListenerSet())
         {
-            throw new javax.jms.IllegalStateException("Another thread is already receiving.");
+            throw new javax.jms.IllegalStateException("A listener has already been set.");
         }
 
-        if (isMessageListenerSet())
+        if (!_receiving.compareAndSet(false, true))
         {
-            throw new javax.jms.IllegalStateException("A listener has already been set.");
+            throw new javax.jms.IllegalStateException("Another thread is already receiving.");
         }
 
         _receivingThread = Thread.currentThread();
@@ -408,7 +376,7 @@ public abstract class BasicMessageConsum
             final AbstractJMSMessage m = returnMessageOrThrow(o);
             if (m != null)
             {
-                preApplicationProcessing(m);
+                preDeliver(m);
                 postDeliver(m);
             }
             return m;
@@ -419,6 +387,10 @@ public abstract class BasicMessageConsum
 
             return null;
         }
+        catch(TransportException e)
+        {
+            throw _session.toJMSException("Exception while receiving:" + e.getMessage(), e);
+        }
         finally
         {
             releaseReceiving();
@@ -477,7 +449,7 @@ public abstract class BasicMessageConsum
             final AbstractJMSMessage m = returnMessageOrThrow(o);
             if (m != null)
             {
-                preApplicationProcessing(m);
+                preDeliver(m);
                 postDeliver(m);
             }
 
@@ -489,6 +461,10 @@ public abstract class BasicMessageConsum
 
             return null;
         }
+        catch(TransportException e)
+        {
+            throw _session.toJMSException("Exception while receiving:" + e.getMessage(), e);
+        }
         finally
         {
             releaseReceiving();
@@ -582,6 +558,10 @@ public abstract class BasicMessageConsum
                     {
                         throw new JMSAMQException("FailoverException interrupted basic cancel.", e);
                     }
+                    catch (TransportException e)
+                    {
+                        throw _session.toJMSException("Exception while closing consumer: " + e.getMessage(), e);
+                    }
                 }
             }
             else
@@ -721,7 +701,7 @@ public abstract class BasicMessageConsum
         {
             if (isMessageListenerSet())
             {
-                preApplicationProcessing(jmsMessage);
+                preDeliver(jmsMessage);
                 getMessageListener().onMessage(jmsMessage);
                 postDeliver(jmsMessage);
             }
@@ -745,49 +725,42 @@ public abstract class BasicMessageConsum
         }
     }
 
-    void preDeliver(AbstractJMSMessage msg)
+    protected void preDeliver(AbstractJMSMessage msg)
     {
+        _session.setInRecovery(false);
+
         switch (_acknowledgeMode)
         {
-
             case Session.PRE_ACKNOWLEDGE:
                 _session.acknowledgeMessage(msg.getDeliveryTag(), false);
                 break;
-
+            case Session.AUTO_ACKNOWLEDGE:
+                //fall through
+            case Session.DUPS_OK_ACKNOWLEDGE:
+                _session.addUnacknowledgedMessage(msg.getDeliveryTag());
+                break;
             case Session.CLIENT_ACKNOWLEDGE:
                 // we set the session so that when the user calls acknowledge() it can call the method on session
                 // to send out the appropriate frame
                 msg.setAMQSession(_session);
+                _session.addUnacknowledgedMessage(msg.getDeliveryTag());
+                _session.markDirty();
                 break;
             case Session.SESSION_TRANSACTED:
-                if (isNoConsume())
-                {
-                    _session.acknowledgeMessage(msg.getDeliveryTag(), false);
-                }
-                else
-                {
-                    _session.addDeliveredMessage(msg.getDeliveryTag());
-                    _session.markDirty();
-                }
-
+                _session.addDeliveredMessage(msg.getDeliveryTag());
+                _session.markDirty();
+                break;
+            case Session.NO_ACKNOWLEDGE:
+                //do nothing.
+                //path used for NO-ACK consumers, and browsers (see constructor).
                 break;
         }
-
     }
 
-    void postDeliver(AbstractJMSMessage msg) throws JMSException
+    void postDeliver(AbstractJMSMessage msg)
     {
         switch (_acknowledgeMode)
         {
-
-            case Session.CLIENT_ACKNOWLEDGE:
-                if (isNoConsume())
-                {
-                    _session.acknowledgeMessage(msg.getDeliveryTag(), false);
-                }
-                _session.markDirty();
-                break;
-
             case Session.DUPS_OK_ACKNOWLEDGE:
             case Session.AUTO_ACKNOWLEDGE:
                 // we do not auto ack a message if the application code called recover()
@@ -825,63 +798,6 @@ public abstract class BasicMessageConsum
         return null;
     }
 
-    /**
-     * Acknowledge up to last message delivered (if any). Used when commiting.
-     */
-    void acknowledgeDelivered()
-    {
-        synchronized(_commitLock)
-        {
-            ArrayList<Long> tagsToAck = new ArrayList<Long>();
-
-            while (!_receivedDeliveryTags.isEmpty())
-            {
-                tagsToAck.add(_receivedDeliveryTags.poll());
-            }
-
-            Collections.sort(tagsToAck);
-
-            long prevAcked = _lastAcked;
-            long oldAckPoint = -1;
-
-            while(oldAckPoint != prevAcked)
-            {
-                oldAckPoint = prevAcked;
-
-                Iterator<Long> tagsToAckIterator = tagsToAck.iterator();
-
-                while(tagsToAckIterator.hasNext() && tagsToAckIterator.next() == prevAcked+1)
-                {
-                    tagsToAckIterator.remove();
-                    prevAcked++;
-                }
-
-                Iterator<Long> previousAckIterator = _previouslyAcked.iterator();
-                while(previousAckIterator.hasNext() && previousAckIterator.next() == prevAcked+1)
-                {
-                    previousAckIterator.remove();
-                    prevAcked++;
-                }
-
-            }
-            if(prevAcked != _lastAcked)
-            {
-                _session.acknowledgeMessage(prevAcked, true);
-                _lastAcked = prevAcked;
-            }
-
-            Iterator<Long> tagsToAckIterator = tagsToAck.iterator();
-
-            while(tagsToAckIterator.hasNext())
-            {
-                Long tag = tagsToAckIterator.next();
-                _session.acknowledgeMessage(tag, false);
-                _previouslyAcked.add(tag);
-            }
-        }
-    }
-
-
     void notifyError(Throwable cause)
     {
         // synchronized (_closed)
@@ -960,7 +876,7 @@ public abstract class BasicMessageConsum
 
     public boolean isNoConsume()
     {
-        return _noConsume || _destination.isBrowseOnly() ;
+        return _noConsume;
     }
 
     public void rollback()

Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java?rev=1295492&r1=1295491&r2=1295492&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java Thu Mar  1 10:04:31 2012
@@ -23,9 +23,7 @@ import org.apache.qpid.client.AMQDestina
 import org.apache.qpid.client.AMQDestination.DestSyntax;
 import org.apache.qpid.client.failover.FailoverException;
 import org.apache.qpid.client.message.*;
-import org.apache.qpid.client.messaging.address.Node.QueueNode;
 import org.apache.qpid.client.protocol.AMQProtocolHandler;
-import org.apache.qpid.framing.AMQShortString;
 import org.apache.qpid.framing.FieldTable;
 import org.apache.qpid.AMQException;
 import org.apache.qpid.AMQInternalException;
@@ -68,19 +66,13 @@ public class BasicMessageConsumer_0_10 e
     private boolean _preAcquire = true;
 
     /**
-     * Indicate whether this consumer is started.
-     */
-    private boolean _isStarted = false;
-
-    /**
      * Specify whether this consumer is performing a sync receive
      */
     private final AtomicBoolean _syncReceive = new AtomicBoolean(false);
     private String _consumerTagString;
     
     private long capacity = 0;
-        
-    //--- constructor
+
     protected BasicMessageConsumer_0_10(int channelId, AMQConnection connection, AMQDestination destination,
                                         String messageSelector, boolean noLocal, MessageFactoryRegistry messageFactory,
                                         AMQSession session, AMQProtocolHandler protocolHandler,
@@ -106,7 +98,6 @@ public class BasicMessageConsumer_0_10 e
                 _preAcquire = false;
             }
         }
-        _isStarted = connection.started();
         
         // Destination setting overrides connection defaults
         if (destination.getDestSyntax() == DestSyntax.ADDR && 
@@ -174,8 +165,6 @@ public class BasicMessageConsumer_0_10 e
         }
     }
 
-    //----- overwritten methods
-
     /**
      * This method is invoked when this consumer is stopped.
      * It tells the broker to stop delivering messages to this consumer.
@@ -205,11 +194,18 @@ public class BasicMessageConsumer_0_10 e
         super.notifyMessage(messageFrame);
     }
 
-    @Override protected void preApplicationProcessing(AbstractJMSMessage jmsMsg) throws JMSException
+    @Override
+    protected void preDeliver(AbstractJMSMessage jmsMsg)
     {
-        super.preApplicationProcessing(jmsMsg);
-        if (!_session.getTransacted() && _session.getAcknowledgeMode() != org.apache.qpid.jms.Session.CLIENT_ACKNOWLEDGE)
+        super.preDeliver(jmsMsg);
+
+        if (_acknowledgeMode == org.apache.qpid.jms.Session.NO_ACKNOWLEDGE)
         {
+            //For 0-10 we need to ensure that all messages are indicated processed in some way to
+            //ensure their AMQP command-id is marked completed, and so we must send a completion
+            //even for no-ack messages even though there isn't actually an 'acknowledgement' occurring.
+            //Add message to the unacked message list to ensure we don't lose record of it before
+            //sending a completion of some sort.
             _session.addUnacknowledgedMessage(jmsMsg.getDeliveryTag());
         }
     }
@@ -221,7 +217,6 @@ public class BasicMessageConsumer_0_10 e
         return _messageFactory.createMessage(msg.getMessageTransfer());
     }
 
-    // private methods
     /**
      * Check whether a message can be delivered to this consumer.
      *
@@ -365,21 +360,28 @@ public class BasicMessageConsumer_0_10 e
     public void setMessageListener(final MessageListener messageListener) throws JMSException
     {
         super.setMessageListener(messageListener);
-        if (messageListener != null && capacity == 0)
-        {
-            _0_10session.getQpidSession().messageFlow(getConsumerTagString(),
-                                                      MessageCreditUnit.MESSAGE, 1,
-                                                      Option.UNRELIABLE);
-        }
-        if (messageListener != null && !_synchronousQueue.isEmpty())
+        try
         {
-            Iterator messages=_synchronousQueue.iterator();
-            while (messages.hasNext())
+            if (messageListener != null && capacity == 0)
             {
-                AbstractJMSMessage message=(AbstractJMSMessage) messages.next();
-                messages.remove();
-                _session.rejectMessage(message, true);
+                _0_10session.getQpidSession().messageFlow(getConsumerTagString(),
+                                                          MessageCreditUnit.MESSAGE, 1,
+                                                          Option.UNRELIABLE);
             }
+            if (messageListener != null && !_synchronousQueue.isEmpty())
+            {
+                Iterator messages=_synchronousQueue.iterator();
+                while (messages.hasNext())
+                {
+                    AbstractJMSMessage message=(AbstractJMSMessage) messages.next();
+                    messages.remove();
+                    _session.rejectMessage(message, true);
+                }
+            }
+        }
+        catch(TransportException e)
+        {
+            throw _session.toJMSException("Exception while setting message listener:"+ e.getMessage(), e);
         }
     }
 
@@ -443,7 +445,7 @@ public class BasicMessageConsumer_0_10 e
         return o;
     }
 
-    void postDeliver(AbstractJMSMessage msg) throws JMSException
+    void postDeliver(AbstractJMSMessage msg)
     {
         super.postDeliver(msg);
         if (_acknowledgeMode == org.apache.qpid.jms.Session.NO_ACKNOWLEDGE && !_session.isInRecovery())
@@ -452,10 +454,8 @@ public class BasicMessageConsumer_0_10 e
         }
         
         if (_acknowledgeMode == org.apache.qpid.jms.Session.AUTO_ACKNOWLEDGE  &&
-             !_session.isInRecovery() &&   
-             _session.getAMQConnection().getSyncAck())
+             !_session.isInRecovery() && _session.getAMQConnection().getSyncAck())
         {
-            ((AMQSession_0_10) getSession()).flushAcknowledgments();
             ((AMQSession_0_10) getSession()).getQpidSession().sync();
         }
     }

Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java?rev=1295492&r1=1295491&r2=1295492&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java Thu Mar  1 10:04:31 2012
@@ -39,6 +39,7 @@ import org.apache.qpid.client.message.Ab
 import org.apache.qpid.client.message.MessageConverter;
 import org.apache.qpid.client.protocol.AMQProtocolHandler;
 import org.apache.qpid.framing.ContentBody;
+import org.apache.qpid.transport.TransportException;
 import org.apache.qpid.util.UUIDGen;
 import org.apache.qpid.util.UUIDs;
 import org.slf4j.Logger;
@@ -113,8 +114,6 @@ public abstract class BasicMessageProduc
 
     private final boolean _mandatory;
 
-    private final boolean _waitUntilSent;
-
     private boolean _disableMessageId;
 
     private UUIDGen _messageIdGenerator = UUIDs.newGenerator();
@@ -126,8 +125,7 @@ public abstract class BasicMessageProduc
     protected PublishMode publishMode = PublishMode.ASYNC_PUBLISH_ALL;
 
     protected BasicMessageProducer(AMQConnection connection, AMQDestination destination, boolean transacted, int channelId,
-                                   AMQSession session, AMQProtocolHandler protocolHandler, long producerId, boolean immediate, boolean mandatory,
-                                   boolean waitUntilSent) throws AMQException
+                                   AMQSession session, AMQProtocolHandler protocolHandler, long producerId, boolean immediate, boolean mandatory) throws AMQException
     {
         _connection = connection;
         _destination = destination;
@@ -143,7 +141,6 @@ public abstract class BasicMessageProduc
 
         _immediate = immediate;
         _mandatory = mandatory;
-        _waitUntilSent = waitUntilSent;
         _userID = connection.getUsername();
         setPublishMode();
     }
@@ -266,7 +263,7 @@ public abstract class BasicMessageProduc
         return _destination;
     }
 
-    public void close()
+    public void close() throws JMSException
     {
         _closed.set(true);
         _session.deregisterProducer(_producerId);
@@ -363,19 +360,6 @@ public abstract class BasicMessageProduc
         }
     }
 
-    public void send(Destination destination, Message message, int deliveryMode, int priority, long timeToLive,
-                     boolean mandatory, boolean immediate, boolean waitUntilSent) throws JMSException
-    {
-        checkPreConditions();
-        checkDestination(destination);
-        synchronized (_connection.getFailoverMutex())
-        {
-            validateDestination(destination);
-            sendImpl((AMQDestination) destination, message, deliveryMode, priority, timeToLive, mandatory, immediate,
-                     waitUntilSent);
-        }
-    }
-
     private AbstractJMSMessage convertToNativeMessage(Message message) throws JMSException
     {
         if (message instanceof AbstractJMSMessage)
@@ -450,12 +434,6 @@ public abstract class BasicMessageProduc
         }
     }
 
-    protected void sendImpl(AMQDestination destination, Message message, int deliveryMode, int priority, long timeToLive,
-                            boolean mandatory, boolean immediate) throws JMSException
-    {
-        sendImpl(destination, message, deliveryMode, priority, timeToLive, mandatory, immediate, _waitUntilSent);
-    }
-
     /**
      * The caller of this method must hold the failover mutex.
      *
@@ -470,23 +448,13 @@ public abstract class BasicMessageProduc
      * @throws JMSException
      */
     protected void sendImpl(AMQDestination destination, Message origMessage, int deliveryMode, int priority, long timeToLive,
-                            boolean mandatory, boolean immediate, boolean wait) throws JMSException
+                            boolean mandatory, boolean immediate) throws JMSException
     {
         checkTemporaryDestination(destination);
         origMessage.setJMSDestination(destination);
 
         AbstractJMSMessage message = convertToNativeMessage(origMessage);
 
-        if (_transacted)
-        {
-            if (_session.hasFailedOver() && _session.isDirty())
-            {
-                throw new JMSAMQException("Failover has occurred and session is dirty so unable to send.",
-                                          new AMQSessionDirtyException("Failover has occurred and session is dirty " +
-                                                                       "so unable to send."));
-            }
-        }
-
         UUID messageId = null;
         if (_disableMessageId)
         {
@@ -498,7 +466,14 @@ public abstract class BasicMessageProduc
             message.setJMSMessageID(messageId);
         }
 
-        sendMessage(destination, origMessage, message, messageId, deliveryMode, priority, timeToLive, mandatory, immediate, wait);
+        try
+        {
+            sendMessage(destination, origMessage, message, messageId, deliveryMode, priority, timeToLive, mandatory, immediate);
+        }
+        catch (TransportException e)
+        {
+            throw getSession().toJMSException("Exception whilst sending:" + e.getMessage(), e);
+        }
 
         if (message != origMessage)
         {
@@ -518,7 +493,7 @@ public abstract class BasicMessageProduc
 
     abstract void sendMessage(AMQDestination destination, Message origMessage, AbstractJMSMessage message,
                               UUID messageId, int deliveryMode, int priority, long timeToLive, boolean mandatory,
-                              boolean immediate, boolean wait) throws JMSException;
+                              boolean immediate) throws JMSException;
 
     private void checkTemporaryDestination(AMQDestination destination) throws JMSException
     {
@@ -596,6 +571,13 @@ public abstract class BasicMessageProduc
 
     public boolean isBound(AMQDestination destination) throws JMSException
     {
-        return _session.isQueueBound(destination.getExchangeName(), null, destination.getRoutingKey());
+        try
+        {
+            return _session.isQueueBound(destination.getExchangeName(), null, destination.getRoutingKey());
+        }
+        catch (TransportException e)
+        {
+            throw getSession().toJMSException("Exception whilst checking destination binding:" + e.getMessage(), e);
+        }
     }
 }

Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java?rev=1295492&r1=1295491&r2=1295492&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java Thu Mar  1 10:04:31 2012
@@ -37,7 +37,6 @@ import org.apache.qpid.client.message.AM
 import org.apache.qpid.client.message.AbstractJMSMessage;
 import org.apache.qpid.client.message.QpidMessageProperties;
 import org.apache.qpid.client.messaging.address.Link.Reliability;
-import org.apache.qpid.client.messaging.address.Node.QueueNode;
 import org.apache.qpid.client.protocol.AMQProtocolHandler;
 import org.apache.qpid.transport.DeliveryProperties;
 import org.apache.qpid.transport.Header;
@@ -47,6 +46,7 @@ import org.apache.qpid.transport.Message
 import org.apache.qpid.transport.MessageDeliveryPriority;
 import org.apache.qpid.transport.MessageProperties;
 import org.apache.qpid.transport.Option;
+import org.apache.qpid.transport.TransportException;
 import org.apache.qpid.util.Strings;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -61,10 +61,9 @@ public class BasicMessageProducer_0_10 e
 
     BasicMessageProducer_0_10(AMQConnection connection, AMQDestination destination, boolean transacted, int channelId,
                               AMQSession session, AMQProtocolHandler protocolHandler, long producerId,
-                              boolean immediate, boolean mandatory, boolean waitUntilSent) throws AMQException
+                              boolean immediate, boolean mandatory) throws AMQException
     {
-        super(connection, destination, transacted, channelId, session, protocolHandler, producerId, immediate,
-              mandatory, waitUntilSent);
+        super(connection, destination, transacted, channelId, session, protocolHandler, producerId, immediate, mandatory);
         
         userIDBytes = Strings.toUTF8(_userID);
     }
@@ -104,7 +103,7 @@ public class BasicMessageProducer_0_10 e
      */
     void sendMessage(AMQDestination destination, Message origMessage, AbstractJMSMessage message,
                      UUID messageId, int deliveryMode, int priority, long timeToLive, boolean mandatory,
-                     boolean immediate, boolean wait) throws JMSException
+                     boolean immediate) throws JMSException
     {
         message.prepareForSending();
 
@@ -246,14 +245,14 @@ public class BasicMessageProducer_0_10 e
         }
     }
 
-
+    @Override
     public boolean isBound(AMQDestination destination) throws JMSException
     {
         return _session.isQueueBound(destination);
     }
     
     @Override
-    public void close()
+    public void close() throws JMSException
     {
         super.close();
         AMQDestination dest = _destination;
@@ -262,10 +261,18 @@ public class BasicMessageProducer_0_10 e
             if (dest.getDelete() == AddressOption.ALWAYS ||
                 dest.getDelete() == AddressOption.SENDER )
             {
-                ((AMQSession_0_10) getSession()).getQpidSession().queueDelete(
+                try
+                {
+                    ((AMQSession_0_10) getSession()).getQpidSession().queueDelete(
                         _destination.getQueueName());
+                }
+                catch(TransportException e)
+                {
+                    throw getSession().toJMSException("Exception while closing producer:" + e.getMessage(), e);
+                }
             }
         }
     }
+
 }
 

Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_8.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_8.java?rev=1295492&r1=1295491&r2=1295492&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_8.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_8.java Thu Mar  1 10:04:31 2012
@@ -45,10 +45,9 @@ public class BasicMessageProducer_0_8 ex
 {
 
     BasicMessageProducer_0_8(AMQConnection connection, AMQDestination destination, boolean transacted, int channelId,
-            AMQSession session, AMQProtocolHandler protocolHandler, long producerId, boolean immediate, boolean mandatory,
-            boolean waitUntilSent) throws AMQException
+            AMQSession session, AMQProtocolHandler protocolHandler, long producerId, boolean immediate, boolean mandatory) throws AMQException
     {
-        super(connection, destination,transacted,channelId,session, protocolHandler, producerId, immediate, mandatory,waitUntilSent);
+        super(connection, destination,transacted,channelId,session, protocolHandler, producerId, immediate, mandatory);
     }
 
     void declareDestination(AMQDestination destination)
@@ -73,7 +72,7 @@ public class BasicMessageProducer_0_8 ex
 
     void sendMessage(AMQDestination destination, Message origMessage, AbstractJMSMessage message,
                      UUID messageId, int deliveryMode,int priority, long timeToLive, boolean mandatory,
-                     boolean immediate, boolean wait) throws JMSException
+                     boolean immediate) throws JMSException
     {
         BasicPublishBody body = getSession().getMethodRegistry().createBasicPublishBody(_session.getTicket(),
                                                                                         destination.getExchangeName(),
@@ -168,7 +167,7 @@ public class BasicMessageProducer_0_8 ex
             throw jmse;
         }
 
-        _protocolHandler.writeFrame(compositeFrame, wait);
+        _protocolHandler.writeFrame(compositeFrame);
     }
 
     /**

Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/TemporaryDestination.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/TemporaryDestination.java?rev=1295492&r1=1295491&r2=1295492&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/TemporaryDestination.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/TemporaryDestination.java Thu Mar  1 10:04:31 2012
@@ -24,13 +24,16 @@ package org.apache.qpid.client;
 import javax.jms.Destination;
 import javax.jms.JMSException;
 
+import org.apache.qpid.framing.AMQShortString;
+
 /**
- * Provides support for covenience interface implemented by both AMQTemporaryTopic and AMQTemporaryQueue
+ * Provides support for convenience interface implemented by both AMQTemporaryTopic and AMQTemporaryQueue
  * so that operations related to their "temporary-ness" can be abstracted out.
  */
 interface TemporaryDestination extends Destination
 {
 
+    public AMQShortString getAMQQueueName();
     public void delete() throws JMSException;
     public AMQSession getSession();
     public boolean isDeleted();

Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ChannelCloseMethodHandler.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ChannelCloseMethodHandler.java?rev=1295492&r1=1295491&r2=1295492&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ChannelCloseMethodHandler.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ChannelCloseMethodHandler.java Thu Mar  1 10:04:31 2012
@@ -78,7 +78,7 @@ public class ChannelCloseMethodHandler i
                 {
                     throw new AMQNoRouteException("Error: " + reason, null, null);
                 }
-                else if (errorCode == AMQConstant.INVALID_ARGUMENT)
+                else if (errorCode == AMQConstant.ARGUMENT_INVALID)
                 {
                     _logger.debug("Broker responded with Invalid Argument.");
 

Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionStartMethodHandler.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionStartMethodHandler.java?rev=1295492&r1=1295491&r2=1295492&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionStartMethodHandler.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionStartMethodHandler.java Thu Mar  1 10:04:31 2012
@@ -20,6 +20,13 @@
  */
 package org.apache.qpid.client.handler;
 
+import java.io.UnsupportedEncodingException;
+import java.util.StringTokenizer;
+
+import javax.security.sasl.Sasl;
+import javax.security.sasl.SaslClient;
+import javax.security.sasl.SaslException;
+
 import org.apache.qpid.AMQException;
 import org.apache.qpid.client.protocol.AMQProtocolSession;
 import org.apache.qpid.client.security.AMQCallbackHandler;
@@ -34,18 +41,9 @@ import org.apache.qpid.framing.Connectio
 import org.apache.qpid.framing.FieldTable;
 import org.apache.qpid.framing.FieldTableFactory;
 import org.apache.qpid.framing.ProtocolVersion;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import javax.security.sasl.Sasl;
-import javax.security.sasl.SaslClient;
-import javax.security.sasl.SaslException;
-
-import java.io.UnsupportedEncodingException;
-import java.util.HashSet;
-import java.util.StringTokenizer;
-
 public class ConnectionStartMethodHandler implements StateAwareMethodListener<ConnectionStartBody>
 {
     private static final Logger _log = LoggerFactory.getLogger(ConnectionStartMethodHandler.class);
@@ -197,40 +195,20 @@ public class ConnectionStartMethodHandle
     private String chooseMechanism(byte[] availableMechanisms) throws UnsupportedEncodingException
     {
         final String mechanisms = new String(availableMechanisms, "utf8");
-        StringTokenizer tokenizer = new StringTokenizer(mechanisms, " ");
-        HashSet mechanismSet = new HashSet();
-        while (tokenizer.hasMoreTokens())
-        {
-            mechanismSet.add(tokenizer.nextToken());
-        }
-
-        String preferredMechanisms = CallbackHandlerRegistry.getInstance().getMechanisms();
-        StringTokenizer prefTokenizer = new StringTokenizer(preferredMechanisms, " ");
-        while (prefTokenizer.hasMoreTokens())
-        {
-            String mech = prefTokenizer.nextToken();
-            if (mechanismSet.contains(mech))
-            {
-                return mech;
-            }
-        }
-
-        return null;
+        return CallbackHandlerRegistry.getInstance().selectMechanism(mechanisms);
     }
 
     private AMQCallbackHandler createCallbackHandler(String mechanism, AMQProtocolSession protocolSession)
         throws AMQException
     {
-        Class mechanismClass = CallbackHandlerRegistry.getInstance().getCallbackHandlerClass(mechanism);
         try
         {
-            Object instance = mechanismClass.newInstance();
-            AMQCallbackHandler cbh = (AMQCallbackHandler) instance;
-            cbh.initialise(protocolSession.getAMQConnection().getConnectionURL());
+            AMQCallbackHandler instance = CallbackHandlerRegistry.getInstance().createCallbackHandler(mechanism);
+            instance.initialise(protocolSession.getAMQConnection().getConnectionURL());
 
-            return cbh;
+            return instance;
         }
-        catch (Exception e)
+        catch (IllegalArgumentException e)
         {
             throw new AMQException(null, "Unable to create callback handler: " + e, e);
         }

Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate.java?rev=1295492&r1=1295491&r2=1295492&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate.java Thu Mar  1 10:04:31 2012
@@ -26,9 +26,7 @@ import org.apache.qpid.client.AMQSession
 import javax.jms.Destination;
 import javax.jms.JMSException;
 
-import java.nio.ByteBuffer;
 import java.util.Enumeration;
-import java.util.Map;
 import java.util.UUID;
 
 public interface AMQMessageDelegate
@@ -130,9 +128,9 @@ public interface AMQMessageDelegate
 
     void removeProperty(final String propertyName) throws JMSException;
 
-    void setAMQSession(final AMQSession s);
+    void setAMQSession(final AMQSession<?,?> s);
 
-    AMQSession getAMQSession();
+    AMQSession<?,?> getAMQSession();
 
     long getDeliveryTag();
 

Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_10.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_10.java?rev=1295492&r1=1295491&r2=1295492&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_10.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_10.java Thu Mar  1 10:04:31 2012
@@ -37,17 +37,14 @@ import javax.jms.Destination;
 import javax.jms.JMSException;
 import javax.jms.MessageFormatException;
 import javax.jms.MessageNotWriteableException;
-import javax.jms.Session;
 
 import org.apache.qpid.AMQException;
 import org.apache.qpid.AMQPInvalidClassException;
 import org.apache.qpid.client.AMQDestination;
-import org.apache.qpid.client.AMQSession;
 import org.apache.qpid.client.AMQSession_0_10;
 import org.apache.qpid.client.CustomJMSXProperty;
 import org.apache.qpid.framing.AMQShortString;
 import org.apache.qpid.jms.Message;
-import org.apache.qpid.messaging.Address;
 import org.apache.qpid.transport.DeliveryProperties;
 import org.apache.qpid.transport.ExchangeQueryResult;
 import org.apache.qpid.transport.Future;
@@ -56,6 +53,7 @@ import org.apache.qpid.transport.Message
 import org.apache.qpid.transport.MessageDeliveryPriority;
 import org.apache.qpid.transport.MessageProperties;
 import org.apache.qpid.transport.ReplyTo;
+import org.apache.qpid.transport.TransportException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -76,13 +74,8 @@ public class AMQMessageDelegate_0_10 ext
 
     private Destination _destination;
 
-
     private MessageProperties _messageProps;
     private DeliveryProperties _deliveryProps;
-    /** If the acknowledge mode is CLIENT_ACKNOWLEDGE the session is required */
-    private AMQSession _session;
-    private final long _deliveryTag;
-
 
     protected AMQMessageDelegate_0_10()
     {
@@ -92,9 +85,9 @@ public class AMQMessageDelegate_0_10 ext
 
     protected AMQMessageDelegate_0_10(MessageProperties messageProps, DeliveryProperties deliveryProps, long deliveryTag)
     {
+        super(deliveryTag);
         _messageProps = messageProps;
         _deliveryProps = deliveryProps;
-        _deliveryTag = deliveryTag;
         _readableProperties = (_messageProps != null);
 
         AMQDestination dest;
@@ -205,7 +198,6 @@ public class AMQMessageDelegate_0_10 ext
         }
     }
 
-
     public long getJMSTimestamp() throws JMSException
     {
         return _deliveryProps.getTimestamp();
@@ -291,7 +283,7 @@ public class AMQMessageDelegate_0_10 ext
         
         try
         {
-            return AMQDestination.createDestination("ADDR:" + addr.toString());
+            return AMQDestination.createDestination("ADDR:" + addr);
         }
         catch(Exception e)
         {
@@ -325,14 +317,14 @@ public class AMQMessageDelegate_0_10 ext
         {
            try
            {
-               int type = ((AMQSession_0_10)_session).resolveAddressType(amqd);
+               int type = ((AMQSession_0_10)getAMQSession()).resolveAddressType(amqd);
                if (type == AMQDestination.QUEUE_TYPE)
                {
-                   ((AMQSession_0_10)_session).setLegacyFiledsForQueueType(amqd);
+                   ((AMQSession_0_10)getAMQSession()).setLegacyFiledsForQueueType(amqd);
                }
                else
                {
-                   ((AMQSession_0_10)_session).setLegacyFiledsForTopicType(amqd);
+                   ((AMQSession_0_10)getAMQSession()).setLegacyFiledsForTopicType(amqd);
                }
            }
            catch(AMQException ex)
@@ -342,6 +334,14 @@ public class AMQMessageDelegate_0_10 ext
                e.setLinkedException(ex);
                throw e;
            }
+           catch (TransportException e)
+           {
+               JMSException jmse = new JMSException("Exception occured while figuring out the node type:" + e.getMessage());
+               jmse.initCause(e);
+               jmse.setLinkedException(e);
+               throw jmse;
+           }
+
         }
         
         final ReplyTo replyTo = new ReplyTo(amqd.getExchangeName().toString(), amqd.getRoutingKey().toString());
@@ -897,64 +897,6 @@ public class AMQMessageDelegate_0_10 ext
         _readableProperties = false;
     }
 
-
-    public void acknowledgeThis() throws JMSException
-    {
-        // the JMS 1.1 spec says in section 3.6 that calls to acknowledge are ignored when client acknowledge
-        // is not specified. In our case, we only set the session field where client acknowledge mode is specified.
-        if (_session != null && _session.getAcknowledgeMode() == Session.CLIENT_ACKNOWLEDGE)
-        {
-            if (_session.getAMQConnection().isClosed())
-            {
-                throw new javax.jms.IllegalStateException("Connection is already closed");
-            }
-
-            // we set multiple to true here since acknowledgment implies acknowledge of all previous messages
-            // received on the session
-            _session.acknowledgeMessage(_deliveryTag, true);
-        }
-    }
-
-    public void acknowledge() throws JMSException
-    {
-        if (_session != null && _session.getAcknowledgeMode() == Session.CLIENT_ACKNOWLEDGE)
-        {
-            _session.acknowledge();
-        }
-    }
-
-
-     /**
-     * The session is set when CLIENT_ACKNOWLEDGE mode is used so that the CHANNEL ACK can be sent when the user calls
-     * acknowledge()
-     *
-     * @param s the AMQ session that delivered this message
-     */
-    public void setAMQSession(AMQSession s)
-    {
-        _session = s;
-    }
-
-    public AMQSession getAMQSession()
-    {
-        return _session;
-    }
-
-    /**
-     * Get the AMQ message number assigned to this message
-     *
-     * @return the message number
-     */
-    public long getDeliveryTag()
-    {
-        return _deliveryTag;
-    }
-
-
-
-
-
-
     protected void checkPropertyName(CharSequence propertyName)
     {
         if (propertyName == null)

Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_8.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_8.java?rev=1295492&r1=1295491&r2=1295492&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_8.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_8.java Thu Mar  1 10:04:31 2012
@@ -30,7 +30,6 @@ import java.util.UUID;
 import javax.jms.Destination;
 import javax.jms.JMSException;
 import javax.jms.MessageNotWriteableException;
-import javax.jms.Session;
 
 import org.apache.qpid.client.AMQDestination;
 import org.apache.qpid.client.AMQQueue;
@@ -60,15 +59,12 @@ public class AMQMessageDelegate_0_8 exte
             Boolean.parseBoolean(System.getProperties().getProperty(AMQSession.STRICT_AMQP, AMQSession.STRICT_AMQP_DEFAULT));
 
     private ContentHeaderProperties _contentHeaderProperties;
-    /** If the acknowledge mode is CLIENT_ACKNOWLEDGE the session is required */
-    private AMQSession _session;
-    private final long _deliveryTag;
 
     // The base set of items that needs to be set. 
     private AMQMessageDelegate_0_8(BasicContentHeaderProperties properties, long deliveryTag)
     {
+        super(deliveryTag);
         _contentHeaderProperties = properties;
-        _deliveryTag = deliveryTag;
         _readableProperties = (_contentHeaderProperties != null);
         _headerAdapter = new JMSHeaderAdapter(_readableProperties ? ((BasicContentHeaderProperties) _contentHeaderProperties).getHeaders()
                                                                   : (new BasicContentHeaderProperties()).getHeaders() );
@@ -518,58 +514,4 @@ public class AMQMessageDelegate_0_8 exte
 
         _readableProperties = false;
     }
-
-
-    public void acknowledgeThis() throws JMSException
-    {
-        // the JMS 1.1 spec says in section 3.6 that calls to acknowledge are ignored when client acknowledge
-        // is not specified. In our case, we only set the session field where client acknowledge mode is specified.
-        if (_session != null && _session.getAcknowledgeMode() == Session.CLIENT_ACKNOWLEDGE)
-        {
-            if (_session.getAMQConnection().isClosed())
-            {
-                throw new javax.jms.IllegalStateException("Connection is already closed");
-            }
-
-            // we set multiple to true here since acknowledgement implies acknowledge of all previous messages
-            // received on the session
-            _session.acknowledgeMessage(_deliveryTag, true);
-        }
-    }
-
-    public void acknowledge() throws JMSException
-    {
-        if (_session != null && _session.getAcknowledgeMode() == Session.CLIENT_ACKNOWLEDGE)
-        {
-            _session.acknowledge();
-        }
-    }
-    
-
-     /**
-     * The session is set when CLIENT_ACKNOWLEDGE mode is used so that the CHANNEL ACK can be sent when the user calls
-     * acknowledge()
-     *
-     * @param s the AMQ session that delivered this message
-     */
-    public void setAMQSession(AMQSession s)
-    {
-        _session = s;
-    }
-
-    public AMQSession getAMQSession()
-    {
-        return _session;
-    }
-
-    /**
-     * Get the AMQ message number assigned to this message
-     *
-     * @return the message number
-     */
-    public long getDeliveryTag()
-    {
-        return _deliveryTag;
-    }
-
 }

Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractAMQMessageDelegate.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractAMQMessageDelegate.java?rev=1295492&r1=1295491&r2=1295492&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractAMQMessageDelegate.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractAMQMessageDelegate.java Thu Mar  1 10:04:31 2012
@@ -23,9 +23,13 @@ package org.apache.qpid.client.message;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 
+import javax.jms.JMSException;
+import javax.jms.Session;
+
 import org.apache.qpid.client.AMQAnyDestination;
 import org.apache.qpid.client.AMQDestination;
 import org.apache.qpid.client.AMQQueue;
+import org.apache.qpid.client.AMQSession;
 import org.apache.qpid.client.AMQTopic;
 import org.apache.qpid.exchange.ExchangeDefaults;
 import org.apache.qpid.framing.AMQShortString;
@@ -78,7 +82,25 @@ public abstract class AbstractAMQMessage
                          new ExchangeInfo(ExchangeDefaults.HEADERS_EXCHANGE_NAME.toString(),
                                           ExchangeDefaults.HEADERS_EXCHANGE_CLASS.toString(),
                                           AMQDestination.QUEUE_TYPE));        
-        
+    }
+
+    /** If the acknowledge mode is CLIENT_ACKNOWLEDGE the session is required */
+    private AMQSession<?,?> _session;
+    private final long _deliveryTag;
+
+    protected AbstractAMQMessageDelegate(long deliveryTag)
+    {
+        _deliveryTag = deliveryTag;
+    }
+
+    /**
+     * Get the AMQ message number assigned to this message
+     *
+     * @return the message number
+     */
+    public long getDeliveryTag()
+    {
+        return _deliveryTag;
     }
 
     /**
@@ -157,6 +179,47 @@ public abstract class AbstractAMQMessage
     {
         return _exchangeMap.containsKey(exchange);
     }
+
+    public void acknowledgeThis() throws JMSException
+    {
+        // the JMS 1.1 spec says in section 3.6 that calls to acknowledge are ignored when client acknowledge
+        // is not specified. In our case, we only set the session field where client acknowledge mode is specified.
+        if (_session != null && _session.getAcknowledgeMode() == Session.CLIENT_ACKNOWLEDGE)
+        {
+            if (_session.getAMQConnection().isClosed())
+            {
+                throw new javax.jms.IllegalStateException("Connection is already closed");
+            }
+
+            // we set multiple to true here since acknowledgement implies acknowledge of all previous messages
+            // received on the session
+            _session.acknowledgeMessage(getDeliveryTag(), true);
+        }
+    }
+
+    public void acknowledge() throws JMSException
+    {
+        if (_session != null && _session.getAcknowledgeMode() == Session.CLIENT_ACKNOWLEDGE)
+        {
+            _session.acknowledge();
+        }
+    }
+
+     /**
+     * The session is set when CLIENT_ACKNOWLEDGE mode is used so that the CHANNEL ACK can be sent when the user calls
+     * acknowledge()
+     *
+     * @param s the AMQ session that delivered this message
+     */
+    public void setAMQSession(AMQSession<?,?> s)
+    {
+        _session = s;
+    }
+
+    public AMQSession<?,?> getAMQSession()
+    {
+        return _session;
+    }
 }
 
 class ExchangeInfo
@@ -202,5 +265,5 @@ class ExchangeInfo
     public void setDestType(int destType)
     {
         this.destType = destType;
-    }        
+    }
 }

Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java?rev=1295492&r1=1295491&r2=1295492&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java Thu Mar  1 10:04:31 2012
@@ -30,7 +30,6 @@ import java.util.Iterator;
 import java.util.Set;
 import java.util.concurrent.CopyOnWriteArraySet;
 import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.qpid.AMQConnectionClosedException;
@@ -47,6 +46,7 @@ import org.apache.qpid.client.state.AMQS
 import org.apache.qpid.client.state.StateWaiter;
 import org.apache.qpid.client.state.listener.SpecificMethodFrameListener;
 import org.apache.qpid.codec.AMQCodecFactory;
+import org.apache.qpid.configuration.ClientProperties;
 import org.apache.qpid.framing.AMQBody;
 import org.apache.qpid.framing.AMQDataBlock;
 import org.apache.qpid.framing.AMQFrame;
@@ -65,7 +65,6 @@ import org.apache.qpid.protocol.Protocol
 import org.apache.qpid.thread.Threading;
 import org.apache.qpid.transport.Sender;
 import org.apache.qpid.transport.network.NetworkConnection;
-import org.apache.qpid.transport.network.NetworkTransport;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -163,7 +162,9 @@ public class AMQProtocolHandler implemen
     private FailoverException _lastFailoverException;
 
     /** Defines the default timeout to use for synchronous protocol commands. */
-    private final long DEFAULT_SYNC_TIMEOUT = Long.getLong("amqj.default_syncwrite_timeout", 1000 * 30);
+    private final long DEFAULT_SYNC_TIMEOUT = Long.getLong(ClientProperties.QPID_SYNC_OP_TIMEOUT,
+                                                           Long.getLong(ClientProperties.AMQJ_DEFAULT_SYNCWRITE_TIMEOUT,
+                                                                        ClientProperties.DEFAULT_SYNC_OPERATION_TIMEOUT));
 
     /** Object to lock on when changing the latch */
     private Object _failoverLatchChange = new Object();
@@ -513,18 +514,7 @@ public class AMQProtocolHandler implemen
         return getStateManager().createWaiter(states);
     }
 
-    /**
-     * Convenience method that writes a frame to the protocol session. Equivalent to calling
-     * getProtocolSession().write().
-     *
-     * @param frame the frame to write
-     */
-    public void writeFrame(AMQDataBlock frame)
-    {
-        writeFrame(frame, false);
-    }
-
-    public  synchronized void writeFrame(AMQDataBlock frame, boolean wait)
+    public  synchronized void writeFrame(AMQDataBlock frame)
     {
         final ByteBuffer buf = asByteBuffer(frame);
         _writtenBytes += buf.remaining();
@@ -675,22 +665,21 @@ public class AMQProtocolHandler implemen
      * <p/>If a failover exception occurs whilst closing the connection it is ignored, as the connection is closed
      * anyway.
      *
-     * @param timeout The timeout to wait for an acknowledgement to the close request.
+     * @param timeout The timeout to wait for an acknowledgment to the close request.
      *
      * @throws AMQException If the close fails for any reason.
      */
     public void closeConnection(long timeout) throws AMQException
     {
-        ConnectionCloseBody body = _protocolSession.getMethodRegistry().createConnectionCloseBody(AMQConstant.REPLY_SUCCESS.getCode(), // replyCode
-                                                                                                  new AMQShortString("JMS client is closing the connection."), 0, 0);
-
-        final AMQFrame frame = body.generateFrame(0);
-
-        //If the connection is already closed then don't do a syncWrite
         if (!getStateManager().getCurrentState().equals(AMQState.CONNECTION_CLOSED))
         {
+            // Only do a syncWrite when the connection is not already closed
             try
             {
+                final ConnectionCloseBody body = _protocolSession.getMethodRegistry().createConnectionCloseBody(AMQConstant.REPLY_SUCCESS.getCode(), // replyCode
+                        new AMQShortString("JMS client is closing the connection."), 0, 0);
+                final AMQFrame frame = body.generateFrame(0);
+
                 syncWrite(frame, ConnectionCloseOkBody.class, timeout);
                 _network.close();
                 closed();
@@ -701,10 +690,9 @@ public class AMQProtocolHandler implemen
             }
             catch (FailoverException e)
             {
-                _logger.debug("FailoverException interrupted connection close, ignoring as connection   close anyway.");
+                _logger.debug("FailoverException interrupted connection close, ignoring as connection closed anyway.");
             }
         }
-
     }
 
     /** @return the number of bytes read from this protocol session */

Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java?rev=1295492&r1=1295491&r2=1295492&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java Thu Mar  1 10:04:31 2012
@@ -20,27 +20,35 @@
  */
 package org.apache.qpid.client.protocol;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
 
 import javax.jms.JMSException;
 import javax.security.sasl.SaslClient;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
 
 import org.apache.qpid.AMQException;
 import org.apache.qpid.client.AMQConnection;
 import org.apache.qpid.client.AMQSession;
 import org.apache.qpid.client.ConnectionTuneParameters;
+import org.apache.qpid.client.handler.ClientMethodDispatcherImpl;
 import org.apache.qpid.client.message.UnprocessedMessage;
 import org.apache.qpid.client.message.UnprocessedMessage_0_8;
 import org.apache.qpid.client.state.AMQStateManager;
-import org.apache.qpid.client.state.AMQState;
-import org.apache.qpid.framing.*;
+import org.apache.qpid.framing.AMQDataBlock;
+import org.apache.qpid.framing.AMQMethodBody;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.ContentBody;
+import org.apache.qpid.framing.ContentHeaderBody;
+import org.apache.qpid.framing.HeartbeatBody;
+import org.apache.qpid.framing.MethodDispatcher;
+import org.apache.qpid.framing.MethodRegistry;
+import org.apache.qpid.framing.ProtocolInitiation;
+import org.apache.qpid.framing.ProtocolVersion;
 import org.apache.qpid.protocol.AMQConstant;
 import org.apache.qpid.protocol.AMQVersionAwareProtocolSession;
 import org.apache.qpid.transport.Sender;
-import org.apache.qpid.client.handler.ClientMethodDispatcherImpl;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * Wrapper for protocol session that provides type-safe access to session attributes. <p/> The underlying protocol
@@ -289,22 +297,11 @@ public class AMQProtocolSession implemen
         return _connection.getSession(channelId);
     }
 
-    /**
-     * Convenience method that writes a frame to the protocol session. Equivalent to calling
-     * getProtocolSession().write().
-     *
-     * @param frame the frame to write
-     */
     public void writeFrame(AMQDataBlock frame)
     {
         _protocolHandler.writeFrame(frame);
     }
 
-    public void writeFrame(AMQDataBlock frame, boolean wait)
-    {
-        _protocolHandler.writeFrame(frame, wait);
-    }
-
     /**
      * Starts the process of closing a session
      *

Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/security/CallbackHandlerRegistry.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/security/CallbackHandlerRegistry.java?rev=1295492&r1=1295491&r2=1295492&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/security/CallbackHandlerRegistry.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/security/CallbackHandlerRegistry.java Thu Mar  1 10:04:31 2012
@@ -20,17 +20,22 @@
  */
 package org.apache.qpid.client.security;
 
-import org.apache.qpid.util.FileUtils;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 import java.io.IOException;
 import java.io.InputStream;
+import java.util.Collection;
+import java.util.Collections;
 import java.util.Enumeration;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Map;
 import java.util.Properties;
+import java.util.Set;
+import java.util.StringTokenizer;
+import java.util.TreeMap;
+
+import org.apache.qpid.util.FileUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * CallbackHandlerRegistry is a registry for call back handlers for user authentication and interaction during user
@@ -42,7 +47,7 @@ import java.util.Properties;
  * "amp.callbackhandler.properties". The format of the properties file is:
  *
  * <p/><pre>
- * CallbackHanlder.mechanism=fully.qualified.class.name
+ * CallbackHandler.n.mechanism=fully.qualified.class.name where n is an ordinal
  * </pre>
  *
  * <p/>Where mechanism is an IANA-registered mechanism name and the fully qualified class name refers to a
@@ -66,51 +71,15 @@ public class CallbackHandlerRegistry
     public static final String DEFAULT_RESOURCE_NAME = "org/apache/qpid/client/security/CallbackHandlerRegistry.properties";
 
     /** A static reference to the singleton instance of this registry. */
-    private static CallbackHandlerRegistry _instance = new CallbackHandlerRegistry();
+    private static final CallbackHandlerRegistry _instance;
 
     /** Holds a map from SASL mechanism names to call back handlers. */
-    private Map<String, Class> _mechanismToHandlerClassMap = new HashMap<String, Class>();
-
-    /** Holds a space delimited list of mechanisms that callback handlers exist for. */
-    private String _mechanisms;
-
-    /**
-     * Gets the singleton instance of this registry.
-     *
-     * @return The singleton instance of this registry.
-     */
-    public static CallbackHandlerRegistry getInstance()
-    {
-        return _instance;
-    }
+    private Map<String, Class<AMQCallbackHandler>> _mechanismToHandlerClassMap = new HashMap<String, Class<AMQCallbackHandler>>();
 
-    /**
-     * Gets the callback handler class for a given SASL mechanism name.
-     *
-     * @param mechanism The SASL mechanism name.
-     *
-     * @return The callback handler class for the mechanism, or null if none is configured for that mechanism.
-     */
-    public Class getCallbackHandlerClass(String mechanism)
-    {
-        return (Class) _mechanismToHandlerClassMap.get(mechanism);
-    }
+    /** Ordered collection of mechanisms for which callback handlers exist. */
+    private Collection<String> _mechanisms;
 
-    /**
-     * Gets a space delimited list of supported SASL mechanisms.
-     *
-     * @return A space delimited list of supported SASL mechanisms.
-     */
-    public String getMechanisms()
-    {
-        return _mechanisms;
-    }
-
-    /**
-     * Creates the call back handler registry from its configuration resource or file. This also has the side effect
-     * of configuring and registering the SASL client factory implementations using {@link DynamicSaslRegistrar}.
-     */
-    private CallbackHandlerRegistry()
+    static
     {
         // Register any configured SASL client factories.
         DynamicSaslRegistrar.registerSaslProviders();
@@ -120,12 +89,12 @@ public class CallbackHandlerRegistry
             FileUtils.openFileOrDefaultResource(filename, DEFAULT_RESOURCE_NAME,
                 CallbackHandlerRegistry.class.getClassLoader());
 
+        final Properties props = new Properties();
+
         try
         {
-            Properties props = new Properties();
+
             props.load(is);
-            parseProperties(props);
-            _logger.info("Callback handlers available for SASL mechanisms: " + _mechanisms);
         }
         catch (IOException e)
         {
@@ -146,32 +115,68 @@ public class CallbackHandlerRegistry
                 }
             }
         }
+
+        _instance = new CallbackHandlerRegistry(props);
+        _logger.info("Callback handlers available for SASL mechanisms: " + _instance._mechanisms);
+
     }
 
-    /*private InputStream openPropertiesInputStream(String filename)
+    /**
+     * Gets the singleton instance of this registry.
+     *
+     * @return The singleton instance of this registry.
+     */
+    public static CallbackHandlerRegistry getInstance()
+    {
+        return _instance;
+    }
+
+    public AMQCallbackHandler createCallbackHandler(final String mechanism)
     {
-        boolean useDefault = true;
-        InputStream is = null;
-        if (filename != null)
+        final Class<AMQCallbackHandler> mechanismClass = _mechanismToHandlerClassMap.get(mechanism);
+
+        if (mechanismClass == null)
         {
-            try
-            {
-                is = new BufferedInputStream(new FileInputStream(new File(filename)));
-                useDefault = false;
-            }
-            catch (FileNotFoundException e)
-            {
-                _logger.error("Unable to read from file " + filename + ": " + e, e);
-            }
+            throw new IllegalArgumentException("Mechanism " + mechanism + " not known");
         }
 
-        if (useDefault)
+        try
+        {
+            return mechanismClass.newInstance();
+        }
+        catch (InstantiationException e)
+        {
+            throw new IllegalArgumentException("Unable to create an instance of mechanism " + mechanism, e);
+        }
+        catch (IllegalAccessException e)
         {
-            is = CallbackHandlerRegistry.class.getResourceAsStream(DEFAULT_RESOURCE_NAME);
+            throw new IllegalArgumentException("Unable to create an instance of mechanism " + mechanism, e);
         }
+    }
 
-        return is;
-    }*/
+    /**
+     * Gets the collection of supported SASL mechanism names, ordered by preference.
+     *
+     * @return collection of SASL mechanism names.
+     */
+    public Collection<String> getMechanisms()
+    {
+        return Collections.unmodifiableCollection(_mechanisms);
+    }
+
+    /**
+     * Creates the call back handler registry from the supplied properties.
+     *
+     * Loading the configuration resource or file, and registering the SASL client factory
+     * implementations using {@link DynamicSaslRegistrar}, is done by the static initializer.
+     *
+     * This constructor is default protection to allow for effective unit testing.  Clients must use
+     * {@link #getInstance()} to obtain the singleton instance.
+     */
+    CallbackHandlerRegistry(final Properties props)
+    {
+        parseProperties(props);
+    }
 
     /**
      * Scans the specified properties as a mapping from IANA registered SASL mechanism to call back handler
@@ -183,20 +188,20 @@ public class CallbackHandlerRegistry
      */
     private void parseProperties(Properties props)
     {
+
+        final Map<Integer, String> mechanisms = new TreeMap<Integer, String>();
+
         Enumeration e = props.propertyNames();
         while (e.hasMoreElements())
         {
-            String propertyName = (String) e.nextElement();
-            int period = propertyName.indexOf(".");
-            if (period < 0)
-            {
-                _logger.warn("Unable to parse property " + propertyName + " when configuring SASL providers");
+            final String propertyName = (String) e.nextElement();
+            final String[] parts = propertyName.split("\\.", 2);
 
-                continue;
-            }
+            checkPropertyNameFormat(propertyName, parts);
 
-            String mechanism = propertyName.substring(period + 1);
-            String className = props.getProperty(propertyName);
+            final String mechanism = parts[0];
+            final int ordinal = getPropertyOrdinal(propertyName, parts);
+            final String className = props.getProperty(propertyName);
             Class clazz = null;
             try
             {
@@ -205,20 +210,11 @@ public class CallbackHandlerRegistry
                 {
                     _logger.warn("SASL provider " + clazz + " does not implement " + AMQCallbackHandler.class
                         + ". Skipping");
-
                     continue;
                 }
-
                 _mechanismToHandlerClassMap.put(mechanism, clazz);
-                if (_mechanisms == null)
-                {
-                    _mechanisms = mechanism;
-                }
-                else
-                {
-                    // one time cost
-                    _mechanisms = _mechanisms + " " + mechanism;
-                }
+
+                mechanisms.put(ordinal, mechanism);
             }
             catch (ClassNotFoundException ex)
             {
@@ -227,5 +223,91 @@ public class CallbackHandlerRegistry
                 continue;
             }
         }
+
+        _mechanisms = mechanisms.values();  // order guaranteed by keys of treemap (i.e. our ordinals)
+
+
+    }
+
+    private void checkPropertyNameFormat(final String propertyName, final String[] parts)
+    {
+        if (parts.length != 2)
+        {
+            throw new IllegalArgumentException("Unable to parse property " + propertyName + " when configuring SASL providers");
+        }
+    }
+
+    private int getPropertyOrdinal(final String propertyName, final String[] parts)
+    {
+        try
+        {
+            return Integer.parseInt(parts[1]);
+        }
+        catch(NumberFormatException nfe)
+        {
+            throw new IllegalArgumentException("Unable to parse property " + propertyName + " when configuring SASL providers", nfe);
+        }
+    }
+
+    /**
+     * Selects a SASL mechanism that is mutually available to both parties.  If more than one
+     * mechanism is mutually available the one appearing first (by ordinal) will be returned.
+     *
+     * @param peerMechanismList space separated list of mechanisms
+     * @return selected mechanism, or null if none available
+     */
+    public String selectMechanism(final String peerMechanismList)
+    {
+        final Set<String> peerList = mechListToSet(peerMechanismList);
+
+        return selectMechInternal(peerList, Collections.<String>emptySet());
+    }
+
+    /**
+     * Selects a SASL mechanism that is mutually available to both parties.
+     *
+     * @param peerMechanismList space separated list of mechanisms
+     * @param restrictionList space separated list of permitted mechanisms; null or empty means no restriction
+     * @return selected mechanism, or null if none available
+     */
+    public String selectMechanism(final String peerMechanismList, final String restrictionList)
+    {
+        final Set<String> peerList = mechListToSet(peerMechanismList);
+        final Set<String> restrictionSet = mechListToSet(restrictionList);
+
+        return selectMechInternal(peerList, restrictionSet);
+    }
+
+    private String selectMechInternal(final Set<String> peerSet, final Set<String> restrictionSet)
+    {
+        for (final String mech : _mechanisms)
+        {
+            if (peerSet.contains(mech))
+            {
+                if (restrictionSet.isEmpty() || restrictionSet.contains(mech))
+                {
+                    return mech;
+                }
+            }
+        }
+
+        return null;
+    }
+
+    private Set<String> mechListToSet(final String mechanismList)
+    {
+        if (mechanismList == null)
+        {
+            return Collections.emptySet();
+        }
+
+        final StringTokenizer tokenizer = new StringTokenizer(mechanismList, " ");
+        final Set<String> mechanismSet = new HashSet<String>(tokenizer.countTokens());
+        while (tokenizer.hasMoreTokens())
+        {
+            mechanismSet.add(tokenizer.nextToken());
+        }
+        return Collections.unmodifiableSet(mechanismSet);
     }
+
 }

Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/security/CallbackHandlerRegistry.properties
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/security/CallbackHandlerRegistry.properties?rev=1295492&r1=1295491&r2=1295492&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/security/CallbackHandlerRegistry.properties (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/security/CallbackHandlerRegistry.properties Thu Mar  1 10:04:31 2012
@@ -16,7 +16,17 @@
 # specific language governing permissions and limitations
 # under the License.
 #
-CallbackHandler.CRAM-MD5-HASHED=org.apache.qpid.client.security.UsernameHashedPasswordCallbackHandler
-CallbackHandler.CRAM-MD5=org.apache.qpid.client.security.UsernamePasswordCallbackHandler
-CallbackHandler.AMQPLAIN=org.apache.qpid.client.security.UsernamePasswordCallbackHandler
-CallbackHandler.PLAIN=org.apache.qpid.client.security.UsernamePasswordCallbackHandler
+
+#
+# Format:
+# <mechanism name>.<ordinal>=<implementation>
+#
+# @see CallbackHandlerRegistry
+#
+
+EXTERNAL.1=org.apache.qpid.client.security.UsernamePasswordCallbackHandler
+GSSAPI.2=org.apache.qpid.client.security.UsernamePasswordCallbackHandler
+CRAM-MD5-HASHED.3=org.apache.qpid.client.security.UsernameHashedPasswordCallbackHandler
+CRAM-MD5.4=org.apache.qpid.client.security.UsernamePasswordCallbackHandler
+AMQPLAIN.5=org.apache.qpid.client.security.UsernamePasswordCallbackHandler
+PLAIN.6=org.apache.qpid.client.security.UsernamePasswordCallbackHandler

Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java?rev=1295492&r1=1295491&r2=1295492&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java Thu Mar  1 10:04:31 2012
@@ -31,7 +31,6 @@ import org.slf4j.LoggerFactory;
 import java.util.Set;
 import java.util.List;
 import java.util.concurrent.CopyOnWriteArrayList;
-import java.io.IOException;
 
 /**
  * The state manager is responsible for managing the state of the protocol session. <p/>
@@ -48,7 +47,7 @@ import java.io.IOException;
  *
  * The two step process is required as there is an inherit race condition between starting a process that will cause
  * the state to change and then attempting to wait for that change. The interest in the change must be first set up so
- * that any asynchrous errors that occur can be delivered to the correct waiters.
+ * that any asynchronous errors that occur can be delivered to the correct waiters.
  */
 public class AMQStateManager implements AMQMethodListener
 {
@@ -84,7 +83,10 @@ public class AMQStateManager implements 
 
     public AMQState getCurrentState()
     {
-        return _currentState;
+        synchronized (_stateLock)
+        {
+            return _currentState;
+        }
     }
 
     public void changeState(AMQState newState)
@@ -114,7 +116,7 @@ public class AMQStateManager implements 
     }
 
     /**
-     * Setting of the ProtocolSession will be required when Failover has been successfuly compeleted.
+     * Setting of the ProtocolSession will be required when Failover has been successfully completed.
      *
      * The new {@link AMQProtocolSession} that has been re-established needs to be provided as that is now the
      * connection to the network.
@@ -131,9 +133,9 @@ public class AMQStateManager implements 
     }
 
     /**
-     * Propogate error to waiters
+     * Propagate error to waiters
      *
-     * @param error The error to propogate.
+     * @param error The error to propagate.
      */
     public void error(Exception error)
     {
@@ -177,7 +179,7 @@ public class AMQStateManager implements 
     }
 
     /**
-     * Create and add a new waiter to the notifcation list.
+     * Create and add a new waiter to the notification list.
      *
      * @param states The waiter will attempt to wait for one of these desired set states to be achived.
      *

Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/state/StateWaiter.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/state/StateWaiter.java?rev=1295492&r1=1295491&r2=1295492&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/state/StateWaiter.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/state/StateWaiter.java Thu Mar  1 10:04:31 2012
@@ -34,7 +34,7 @@ import java.util.Set;
  *
  * On construction the current state and a set of States to await for is provided.
  *
- * When await() is called the state at constuction is compared against the awaitStates. If the state at construction is
+ * When await() is called the state at construction is compared against the awaitStates. If the state at construction is
  * a desired state then await() returns immediately.
  *
  * Otherwise it will block for the set timeout for a desired state to be achieved.
@@ -48,9 +48,9 @@ public class StateWaiter extends Blockin
 {
     private static final Logger _logger = LoggerFactory.getLogger(StateWaiter.class);
 
-    Set<AMQState> _awaitStates;
-    private AMQState _startState;
-    private AMQStateManager _stateManager;
+    private final Set<AMQState> _awaitStates;
+    private final AMQState _startState;
+    private final AMQStateManager _stateManager;
 
     /**
      *
@@ -78,9 +78,9 @@ public class StateWaiter extends Blockin
     }
 
     /**
-     * Await for the requried State to be achieved within the default timeout.
+     * Await for the required State to be achieved within the default timeout.
      * @return The achieved state that was requested.
-     * @throws AMQException The exception that prevented the required state from being achived.
+     * @throws AMQException The exception that prevented the required state from being achieved.
      */
     public AMQState await() throws AMQException
     {
@@ -88,13 +88,13 @@ public class StateWaiter extends Blockin
     }
 
     /**
-     * Await for the requried State to be achieved.
+     * Await for the required State to be achieved.
      *
      * <b>It is the responsibility of this class to remove the waiter from the StateManager
      *
-     * @param timeout The time in milliseconds to wait for any of the states to be achived.
+     * @param timeout The time in milliseconds to wait for any of the states to be achieved.
      * @return The achieved state that was requested.
-     * @throws AMQException The exception that prevented the required state from being achived.
+     * @throws AMQException The exception that prevented the required state from being achieved.
      */
     public AMQState await(long timeout) throws AMQException
     {

Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/util/BlockingWaiter.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/util/BlockingWaiter.java?rev=1295492&r1=1295491&r2=1295492&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/util/BlockingWaiter.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/util/BlockingWaiter.java Thu Mar  1 10:04:31 2012
@@ -28,9 +28,8 @@ import java.util.concurrent.locks.Reentr
 import org.apache.qpid.AMQException;
 import org.apache.qpid.AMQTimeoutException;
 import org.apache.qpid.client.failover.FailoverException;
-import org.apache.qpid.framing.AMQMethodBody;
-import org.apache.qpid.protocol.AMQMethodEvent;
-import org.apache.qpid.protocol.AMQMethodListener;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * BlockingWaiter is a 'rendezvous' which delegates handling of
@@ -64,6 +63,8 @@ import org.apache.qpid.protocol.AMQMetho
  */
 public abstract class BlockingWaiter<T>
 {
+    private static final Logger _logger = LoggerFactory.getLogger(BlockingWaiter.class);
+
     /** This flag is used to indicate that the blocked for method has been received. */
     private volatile boolean _ready = false;
 
@@ -180,7 +181,7 @@ public abstract class BlockingWaiter<T>
                     }
                     catch (InterruptedException e)
                     {
-                        System.err.println(e.getMessage());
+                        _logger.error(e.getMessage(), e);
                         // IGNORE    -- //fixme this isn't ideal as being interrupted isn't equivellant to sucess
                         // if (!_ready && timeout != -1)
                         // {
@@ -228,12 +229,12 @@ public abstract class BlockingWaiter<T>
     }
 
     /**
-     * This is a callback, called when an error has occured that should interupt any waiter.
+     * This is a callback, called when an error has occurred that should interrupt any waiter.
      * It is also called from within this class to avoid code repetition but it should only be called by the MINA threads.
      *
      * Once closed any notification of an exception will be ignored.
      *
-     * @param e The exception being propogated.
+     * @param e The exception being propagated.
      */
     public void error(Exception e)
     {
@@ -255,7 +256,7 @@ public abstract class BlockingWaiter<T>
             }
             else
             {
-                System.err.println("WARNING: new error '" + e == null ? "null" : e.getMessage() + "' arrived while old one not yet processed:" + _error.getMessage());
+                _logger.error("WARNING: new error '" + e == null ? "null" : e.getMessage() + "' arrived while old one not yet processed:" + _error.getMessage());
             }
 
             if (_waiting.get())
@@ -272,7 +273,7 @@ public abstract class BlockingWaiter<T>
                     }
                     catch (InterruptedException e1)
                     {
-                        System.err.println(e.getMessage());
+                        _logger.error(e1.getMessage(), e1);
                     }
                 }
                 _errorAck = false;

Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/jms/MessageProducer.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/jms/MessageProducer.java?rev=1295492&r1=1295491&r2=1295492&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/jms/MessageProducer.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/jms/MessageProducer.java Thu Mar  1 10:04:31 2012
@@ -51,7 +51,4 @@ public interface MessageProducer extends
                      int priority, long timeToLive, boolean mandatory, boolean immediate)
             throws JMSException;
 
-    void send(Destination destination, Message message, int deliveryMode, int priority, long timeToLive,
-                     boolean mandatory, boolean immediate, boolean waitUntilSent) throws JMSException;
-
 }

Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/ChannelCloseMethodHandlerNoCloseOk.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/ChannelCloseMethodHandlerNoCloseOk.java?rev=1295492&r1=1295491&r2=1295492&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/ChannelCloseMethodHandlerNoCloseOk.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/ChannelCloseMethodHandlerNoCloseOk.java Thu Mar  1 10:04:31 2012
@@ -73,7 +73,7 @@ public class ChannelCloseMethodHandlerNo
             {
                 throw new AMQNoRouteException("Error: " + reason, null, null);
             }
-            else if (errorCode == AMQConstant.INVALID_ARGUMENT)
+            else if (errorCode == AMQConstant.ARGUMENT_INVALID)
             {
                 _logger.debug("Broker responded with Invalid Argument.");
 

Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/test/java/org/apache/qpid/test/unit/message/TestAMQSession.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/test/java/org/apache/qpid/test/unit/message/TestAMQSession.java?rev=1295492&r1=1295491&r2=1295492&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/test/java/org/apache/qpid/test/unit/message/TestAMQSession.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/test/java/org/apache/qpid/test/unit/message/TestAMQSession.java Thu Mar  1 10:04:31 2012
@@ -20,17 +20,24 @@
  */
 package org.apache.qpid.test.unit.message;
 
-import org.apache.qpid.client.*;
+import java.util.Map;
+
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.TemporaryQueue;
+import javax.jms.Topic;
+import javax.jms.TopicSubscriber;
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.client.AMQDestination;
+import org.apache.qpid.client.AMQSession;
+import org.apache.qpid.client.BasicMessageConsumer_0_8;
+import org.apache.qpid.client.BasicMessageProducer_0_8;
+import org.apache.qpid.client.failover.FailoverException;
 import org.apache.qpid.client.message.AMQMessageDelegateFactory;
 import org.apache.qpid.client.protocol.AMQProtocolHandler;
-import org.apache.qpid.client.failover.FailoverException;
 import org.apache.qpid.framing.AMQShortString;
 import org.apache.qpid.framing.FieldTable;
-import org.apache.qpid.AMQException;
-
-import javax.jms.*;
-
-import java.util.Map;
 
 public class TestAMQSession extends AMQSession<BasicMessageConsumer_0_8, BasicMessageProducer_0_8>
 {
@@ -57,7 +64,12 @@ public class TestAMQSession extends AMQS
 
     }
 
-    public void sendCommit() throws AMQException, FailoverException
+    public void commitImpl() throws AMQException, FailoverException
+    {
+
+    }
+
+    public void acknowledgeImpl()
     {
 
     }
@@ -117,7 +129,7 @@ public class TestAMQSession extends AMQS
 
     }
 
-    public BasicMessageProducer_0_8 createMessageProducer(Destination destination, boolean mandatory, boolean immediate, boolean waitUntilSent, long producerId)
+    public BasicMessageProducer_0_8 createMessageProducer(Destination destination, boolean mandatory, boolean immediate, long producerId)
     {
         return null;
     }

Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/common.xml
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/common.xml?rev=1295492&r1=1295491&r2=1295492&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/common.xml (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/common.xml Thu Mar  1 10:04:31 2012
@@ -42,7 +42,6 @@
   <property name="build.report"          location="${build}/report"/>
   <property name="build.release"         location="${build}/release"/>
   <property name="build.release.prepare" location="${build.release}/prepare"/>
-  <property name="build.data"            location="${build.scratch}/data"/>
   <property name="build.plugins"         location="${build}/lib/plugins"/>
   <property name="build.coveragereport"  location="${build}/coverage"/>
   <property name="build.findbugs"        location="${build}/findbugs"/>



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org