You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kp...@apache.org on 2013/02/28 17:14:57 UTC

svn commit: r1451244 [35/45] - in /qpid/branches/asyncstore: ./ bin/ cpp/ cpp/bindings/ cpp/bindings/qmf/ cpp/bindings/qmf/python/ cpp/bindings/qmf/ruby/ cpp/bindings/qmf2/ cpp/bindings/qmf2/examples/cpp/ cpp/bindings/qmf2/python/ cpp/bindings/qmf2/rub...

Modified: qpid/branches/asyncstore/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/client/src/main/java/org/apache/qpid/client/AMQSession.java?rev=1451244&r1=1451243&r2=1451244&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/client/src/main/java/org/apache/qpid/client/AMQSession.java (original)
+++ qpid/branches/asyncstore/java/client/src/main/java/org/apache/qpid/client/AMQSession.java Thu Feb 28 16:14:30 2013
@@ -20,11 +20,6 @@
  */
 package org.apache.qpid.client;
 
-import static org.apache.qpid.configuration.ClientProperties.DEFAULT_FLOW_CONTROL_WAIT_FAILURE;
-import static org.apache.qpid.configuration.ClientProperties.DEFAULT_FLOW_CONTROL_WAIT_NOTIFY_PERIOD;
-import static org.apache.qpid.configuration.ClientProperties.QPID_FLOW_CONTROL_WAIT_FAILURE;
-import static org.apache.qpid.configuration.ClientProperties.QPID_FLOW_CONTROL_WAIT_NOTIFY_PERIOD;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -40,6 +35,7 @@ import org.apache.qpid.client.failover.F
 import org.apache.qpid.client.failover.FailoverRetrySupport;
 import org.apache.qpid.client.message.AMQMessageDelegateFactory;
 import org.apache.qpid.client.message.AMQPEncodedMapMessage;
+import org.apache.qpid.client.message.AMQPEncodedListMessage;
 import org.apache.qpid.client.message.AbstractJMSMessage;
 import org.apache.qpid.client.message.CloseConsumerMessage;
 import org.apache.qpid.client.message.JMSBytesMessage;
@@ -49,14 +45,13 @@ import org.apache.qpid.client.message.JM
 import org.apache.qpid.client.message.JMSTextMessage;
 import org.apache.qpid.client.message.MessageFactoryRegistry;
 import org.apache.qpid.client.message.UnprocessedMessage;
-import org.apache.qpid.client.protocol.AMQProtocolHandler;
 import org.apache.qpid.client.util.FlowControllingBlockingQueue;
 import org.apache.qpid.common.AMQPFilterTypes;
 import org.apache.qpid.configuration.ClientProperties;
 import org.apache.qpid.framing.AMQShortString;
 import org.apache.qpid.framing.FieldTable;
-import org.apache.qpid.framing.MethodRegistry;
 import org.apache.qpid.jms.Session;
+import org.apache.qpid.jms.ListMessage;
 import org.apache.qpid.protocol.AMQConstant;
 import org.apache.qpid.thread.Threading;
 import org.apache.qpid.transport.SessionException;
@@ -122,27 +117,16 @@ public abstract class AMQSession<C exten
     /** Immediate message prefetch default. */
     public static final String IMMEDIATE_PREFETCH_DEFAULT = "false";
 
-    /**
-     * The period to wait while flow controlled before sending a log message confirming that the session is still
-     * waiting on flow control being revoked
-     */
-    private final long _flowControlWaitPeriod = Long.getLong(QPID_FLOW_CONTROL_WAIT_NOTIFY_PERIOD,
-                                                                 DEFAULT_FLOW_CONTROL_WAIT_NOTIFY_PERIOD);
-
-    /**
-     * The period to wait while flow controlled before declaring a failure
-     */
-    private final long _flowControlWaitFailure = Long.getLong(QPID_FLOW_CONTROL_WAIT_FAILURE,
-                                                                  DEFAULT_FLOW_CONTROL_WAIT_FAILURE);
-
     private final boolean _delareQueues =
-        Boolean.parseBoolean(System.getProperty("qpid.declare_queues", "true"));
+        Boolean.parseBoolean(System.getProperty(ClientProperties.QPID_DECLARE_QUEUES_PROP_NAME, "true"));
 
     private final boolean _declareExchanges =
-        Boolean.parseBoolean(System.getProperty("qpid.declare_exchanges", "true"));
+        Boolean.parseBoolean(System.getProperty(ClientProperties.QPID_DECLARE_EXCHANGES_PROP_NAME, "true"));
 
     private final boolean _useAMQPEncodedMapMessage;
 
+    private final boolean _useAMQPEncodedStreamMessage;
+
     /**
      * Flag indicating to start dispatcher as a daemon thread
      */
@@ -265,11 +249,6 @@ public abstract class AMQSession<C exten
     /** Has failover occured on this session with outstanding actions to commit? */
     private boolean _failedOverDirty;
 
-    /** Flow control */
-    private FlowControlIndicator _flowControl = new FlowControlIndicator();
-
-
-
     /** Holds the highest received delivery tag. */
     protected AtomicLong getHighestDeliveryTag()
     {
@@ -408,22 +387,6 @@ public abstract class AMQSession<C exten
         }
     }
 
-    private static final class FlowControlIndicator
-    {
-        private volatile boolean _flowControl = true;
-
-        public synchronized void setFlowControl(boolean flowControl)
-        {
-            _flowControl = flowControl;
-            notify();
-        }
-
-        public boolean getFlowControl()
-        {
-            return _flowControl;
-        }
-    }
-
     /**
      * Creates a new session on a connection.
      *
@@ -439,6 +402,7 @@ public abstract class AMQSession<C exten
                MessageFactoryRegistry messageFactoryRegistry, int defaultPrefetchHighMark, int defaultPrefetchLowMark)
     {
         _useAMQPEncodedMapMessage = con == null ? true : !con.isUseLegacyMapMessageFormat();
+        _useAMQPEncodedStreamMessage = con == null ? false : !con.isUseLegacyStreamMessageFormat();
         _strictAMQP = Boolean.parseBoolean(System.getProperties().getProperty(STRICT_AMQP, STRICT_AMQP_DEFAULT));
         _strictAMQPFATAL =
                 Boolean.parseBoolean(System.getProperties().getProperty(STRICT_AMQP_FATAL, STRICT_AMQP_FATAL_DEFAULT));
@@ -649,12 +613,6 @@ public abstract class AMQSession<C exten
      */
     public abstract void acknowledgeMessage(long deliveryTag, boolean multiple);
 
-    public MethodRegistry getMethodRegistry()
-    {
-        MethodRegistry methodRegistry = getProtocolHandler().getMethodRegistry();
-        return methodRegistry;
-    }
-
     /**
      * Binds the named queue, with the specified routing key, to the named exchange.
      *
@@ -1041,12 +999,11 @@ public abstract class AMQSession<C exten
         {
             try
             {
-                handleAddressBasedDestination(dest,false,noLocal,true);
+                resolveAddress(dest,false,noLocal);
                 if (dest.getAddressType() !=  AMQDestination.TOPIC_TYPE)
                 {
                     throw new JMSException("Durable subscribers can only be created for Topics");
                 }
-                dest.getSourceNode().setDurable(true);
             }
             catch(AMQException e)
             {
@@ -1158,6 +1115,14 @@ public abstract class AMQSession<C exten
         }
     }
 
+    public ListMessage createListMessage() throws JMSException
+    {
+        checkNotClosed();
+        AMQPEncodedListMessage msg = new AMQPEncodedListMessage(getMessageDelegateFactory());
+        msg.setAMQSession(this);
+        return msg;
+    }
+
     public MapMessage createMapMessage() throws JMSException
     {
         checkNotClosed();
@@ -1400,17 +1365,15 @@ public abstract class AMQSession<C exten
 
     public StreamMessage createStreamMessage() throws JMSException
     {
-        // This method needs to be improved. Throwables only arrive here from the mina : exceptionRecived
-        // calls through connection.closeAllSessions which is also called by the public connection.close()
-        // with a null cause
-        // When we are closing the Session due to a protocol session error we simply create a new AMQException
-        // with the correct error code and text this is cleary WRONG as the instanceof check below will fail.
-        // We need to determin here if the connection should be
-
-        synchronized (getFailoverMutex())
+        checkNotClosed();
+        if (_useAMQPEncodedStreamMessage)
+        {
+            AMQPEncodedListMessage msg = new AMQPEncodedListMessage(getMessageDelegateFactory());
+            msg.setAMQSession(this);
+            return msg;
+        }
+        else
         {
-            checkNotClosed();
-
             JMSStreamMessage msg = new JMSStreamMessage(getMessageDelegateFactory());
             msg.setAMQSession(this);
             return msg;
@@ -1550,7 +1513,7 @@ public abstract class AMQSession<C exten
 
     public void declareExchange(AMQShortString name, AMQShortString type, boolean nowait) throws AMQException
     {
-        declareExchange(name, type, getProtocolHandler(), nowait);
+        declareExchange(name, type, nowait, false, false, false);
     }
 
     abstract public void sync() throws AMQException;
@@ -1690,8 +1653,7 @@ public abstract class AMQSession<C exten
             throws
             AMQException
     {
-        AMQProtocolHandler protocolHandler = getProtocolHandler();
-        declareExchange(amqd, protocolHandler, false);
+        declareExchange(amqd, false);
         AMQShortString queueName = declareQueue(amqd, false);
         bindQueue(queueName, amqd.getRoutingKey(), new FieldTable(), amqd.getExchangeName(), amqd);
     }
@@ -2582,11 +2544,9 @@ public abstract class AMQSession<C exten
 
     /**
      * Register to consume from the queue.
-     *
      * @param queueName
      */
-    private void consumeFromQueue(C consumer, AMQShortString queueName,
-                                  AMQProtocolHandler protocolHandler, boolean nowait) throws AMQException, FailoverException
+    private void consumeFromQueue(C consumer, AMQShortString queueName, boolean nowait) throws AMQException, FailoverException
     {
         int tagId = _nextTag++;
 
@@ -2603,7 +2563,7 @@ public abstract class AMQSession<C exten
 
         try
         {
-            sendConsume(consumer, queueName, protocolHandler, nowait, tagId);
+            sendConsume(consumer, queueName, nowait, tagId);
         }
         catch (AMQException e)
         {
@@ -2614,7 +2574,7 @@ public abstract class AMQSession<C exten
     }
 
     public abstract void sendConsume(C consumer, AMQShortString queueName,
-                                     AMQProtocolHandler protocolHandler, boolean nowait, int tag) throws AMQException, FailoverException;
+                                     boolean nowait, int tag) throws AMQException, FailoverException;
 
     private P createProducerImpl(final Destination destination, final Boolean mandatory, final Boolean immediate)
             throws JMSException
@@ -2648,9 +2608,10 @@ public abstract class AMQSession<C exten
     public abstract P createMessageProducer(final Destination destination, final Boolean mandatory,
                                             final Boolean immediate, final long producerId) throws JMSException;
 
-    private void declareExchange(AMQDestination amqd, AMQProtocolHandler protocolHandler, boolean nowait) throws AMQException
+    private void declareExchange(AMQDestination amqd, boolean nowait) throws AMQException
     {
-        declareExchange(amqd.getExchangeName(), amqd.getExchangeClass(), protocolHandler, nowait);
+        declareExchange(amqd.getExchangeName(), amqd.getExchangeClass(), nowait, amqd.isExchangeDurable(),
+                        amqd.isExchangeAutoDelete(), amqd.isExchangeInternal());
     }
 
     /**
@@ -2707,33 +2668,29 @@ public abstract class AMQSession<C exten
      *
      * @param name            The name of the exchange to declare.
      * @param type            The type of the exchange to declare.
-     * @param protocolHandler The protocol handler to process the communication through.
      * @param nowait
-     *
+     * @param durable
+     * @param autoDelete
+     * @param internal
      * @throws AMQException If the exchange cannot be declared for any reason.
      * @todo Be aware of possible changes to parameter order as versions change.
      */
     private void declareExchange(final AMQShortString name, final AMQShortString type,
-                                 final AMQProtocolHandler protocolHandler, final boolean nowait) throws AMQException
+                                 final boolean nowait, final boolean durable,
+                                 final boolean autoDelete, final boolean internal) throws AMQException
     {
         new FailoverNoopSupport<Object, AMQException>(new FailoverProtectedOperation<Object, AMQException>()
         {
             public Object execute() throws AMQException, FailoverException
             {
-                sendExchangeDeclare(name, type, protocolHandler, nowait);
+                sendExchangeDeclare(name, type, nowait, durable, autoDelete, internal);
                 return null;
             }
         }, _connection).execute();
     }
 
-    public abstract void sendExchangeDeclare(final AMQShortString name, final AMQShortString type, final AMQProtocolHandler protocolHandler,
-                                             final boolean nowait) throws AMQException, FailoverException;
-
-
-    void declareQueuePassive(AMQDestination queue) throws AMQException
-    {
-        declareQueue(queue,false,false,true);
-    }
+    public abstract void sendExchangeDeclare(final AMQShortString name, final AMQShortString type, final boolean nowait,
+                                             boolean durable, boolean autoDelete, boolean internal) throws AMQException, FailoverException;
 
     /**
      * Declares a queue for a JMS destination.
@@ -2768,31 +2725,8 @@ public abstract class AMQSession<C exten
         return declareQueue(amqd, noLocal, nowait, false);
     }
 
-    protected AMQShortString declareQueue(final AMQDestination amqd,
-                                          final boolean noLocal, final boolean nowait, final boolean passive)
-            throws AMQException
-    {
-        final AMQProtocolHandler protocolHandler = getProtocolHandler();
-        return new FailoverNoopSupport<AMQShortString, AMQException>(
-                new FailoverProtectedOperation<AMQShortString, AMQException>()
-                {
-                    public AMQShortString execute() throws AMQException, FailoverException
-                    {
-                        // Generate the queue name if the destination indicates that a client generated name is to be used.
-                        if (amqd.isNameRequired())
-                        {
-                            amqd.setQueueName(protocolHandler.generateQueueName());
-                        }
-
-                        sendQueueDeclare(amqd, protocolHandler, nowait, passive);
-
-                        return amqd.getAMQQueueName();
-                    }
-                }, _connection).execute();
-    }
-
-    public abstract void sendQueueDeclare(final AMQDestination amqd, final AMQProtocolHandler protocolHandler,
-                                          final boolean nowait, boolean passive) throws AMQException, FailoverException;
+    protected abstract AMQShortString declareQueue(final AMQDestination amqd,
+                                          final boolean noLocal, final boolean nowait, final boolean passive) throws AMQException;
 
     /**
      * Undeclares the specified queue.
@@ -2845,21 +2779,6 @@ public abstract class AMQSession<C exten
         return ++_nextProducerId;
     }
 
-    protected AMQProtocolHandler getProtocolHandler()
-    {
-        return _connection.getProtocolHandler();
-    }
-
-    public byte getProtocolMajorVersion()
-    {
-        return getProtocolHandler().getProtocolMajorVersion();
-    }
-
-    public byte getProtocolMinorVersion()
-    {
-        return getProtocolHandler().getProtocolMinorVersion();
-    }
-
     protected boolean hasMessageListeners()
     {
         return _hasMessageListeners;
@@ -2918,17 +2837,15 @@ public abstract class AMQSession<C exten
     {
         AMQDestination amqd = consumer.getDestination();
 
-        AMQProtocolHandler protocolHandler = getProtocolHandler();
-
         if (amqd.getDestSyntax() == DestSyntax.ADDR)
         {
-            handleAddressBasedDestination(amqd,true,consumer.isNoLocal(),nowait);
+            resolveAddress(amqd,true,consumer.isNoLocal());
         }
         else
         {
             if (_declareExchanges)
             {
-                declareExchange(amqd, protocolHandler, nowait);
+                declareExchange(amqd, nowait);
             }
     
             if (_delareQueues || amqd.isNameRequired())
@@ -2973,7 +2890,7 @@ public abstract class AMQSession<C exten
 
         try
         {
-            consumeFromQueue(consumer, queueName, protocolHandler, nowait);
+            consumeFromQueue(consumer, queueName, nowait);
         }
         catch (FailoverException e)
         {
@@ -2981,10 +2898,9 @@ public abstract class AMQSession<C exten
         }
     }
 
-    public abstract void handleAddressBasedDestination(AMQDestination dest, 
+    public abstract void resolveAddress(AMQDestination dest,
                                                        boolean isConsumer,
-                                                       boolean noLocal,
-                                                       boolean noWait) throws AMQException;
+                                                       boolean noLocal) throws AMQException;
     
     private void registerProducer(long producerId, MessageProducer producer)
     {
@@ -3141,47 +3057,14 @@ public abstract class AMQSession<C exten
         _ticket = ticket;
     }
 
-    public boolean isFlowBlocked()
-    {
-        synchronized (_flowControl)
-        {
-            return !_flowControl.getFlowControl();
-        }
-    }
-
-    public void setFlowControl(final boolean active)
-    {
-        _flowControl.setFlowControl(active);
-        if (_logger.isInfoEnabled())
-        {
-            _logger.info("Broker enforced flow control " + (active ? "no longer in effect" : "has been enforced"));
-        }
-    }
-
-    public void checkFlowControl() throws InterruptedException, JMSException
-    {
-        long expiryTime = 0L;
-        synchronized (_flowControl)
-        {
-            while (!_flowControl.getFlowControl() &&
-                   (expiryTime == 0L ? (expiryTime = System.currentTimeMillis() + _flowControlWaitFailure)
-                                     : expiryTime) >= System.currentTimeMillis() )
-            {
-
-                _flowControl.wait(_flowControlWaitPeriod);
-                if (_logger.isInfoEnabled())
-                {
-                    _logger.info("Message send delayed by " + (System.currentTimeMillis() + _flowControlWaitFailure - expiryTime)/1000 + "s due to broker enforced flow control");
-                }
-            }
-            if(!_flowControl.getFlowControl())
-            {
-                _logger.error("Message send failed due to timeout waiting on broker enforced flow control");
-                throw new JMSException("Unable to send message for " + _flowControlWaitFailure /1000 + " seconds due to broker enforced flow control");
-            }
-        }
+    /**
+     * Tests whether flow to this session is blocked.
+     *
+     * @return true if flow is blocked or false otherwise.
+     */
+    public abstract boolean isFlowBlocked();
 
-    }
+    public abstract void setFlowControl(final boolean active);
 
     public interface Dispatchable
     {

Modified: qpid/branches/asyncstore/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java?rev=1451244&r1=1451243&r2=1451244&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java (original)
+++ qpid/branches/asyncstore/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java Thu Feb 28 16:14:30 2013
@@ -17,6 +17,11 @@
  */
 package org.apache.qpid.client;
 
+import static org.apache.qpid.transport.Option.BATCH;
+import static org.apache.qpid.transport.Option.NONE;
+import static org.apache.qpid.transport.Option.SYNC;
+import static org.apache.qpid.transport.Option.UNRELIABLE;
+
 import java.lang.ref.WeakReference;
 import java.util.ArrayList;
 import java.util.Collections;
@@ -29,8 +34,10 @@ import java.util.Timer;
 import java.util.TimerTask;
 import java.util.UUID;
 import java.util.concurrent.ConcurrentLinkedQueue;
+
 import javax.jms.Destination;
 import javax.jms.JMSException;
+
 import org.apache.qpid.AMQException;
 import org.apache.qpid.client.AMQDestination.AddressOption;
 import org.apache.qpid.client.AMQDestination.Binding;
@@ -44,18 +51,32 @@ import org.apache.qpid.client.message.Me
 import org.apache.qpid.client.message.UnprocessedMessage_0_10;
 import org.apache.qpid.client.messaging.address.AddressHelper;
 import org.apache.qpid.client.messaging.address.Link;
-import org.apache.qpid.client.messaging.address.Node.ExchangeNode;
-import org.apache.qpid.client.messaging.address.Node.QueueNode;
-import org.apache.qpid.client.protocol.AMQProtocolHandler;
+import org.apache.qpid.client.messaging.address.Link.SubscriptionQueue;
+import org.apache.qpid.client.messaging.address.Node;
+import org.apache.qpid.common.AMQPFilterTypes;
 import org.apache.qpid.exchange.ExchangeDefaults;
 import org.apache.qpid.framing.AMQShortString;
 import org.apache.qpid.framing.FieldTable;
 import org.apache.qpid.protocol.AMQConstant;
-import org.apache.qpid.transport.*;
-import static org.apache.qpid.transport.Option.BATCH;
-import static org.apache.qpid.transport.Option.NONE;
-import static org.apache.qpid.transport.Option.SYNC;
-import static org.apache.qpid.transport.Option.UNRELIABLE;
+import org.apache.qpid.transport.Connection;
+import org.apache.qpid.transport.ExchangeBoundResult;
+import org.apache.qpid.transport.ExchangeQueryResult;
+import org.apache.qpid.transport.ExecutionErrorCode;
+import org.apache.qpid.transport.ExecutionException;
+import org.apache.qpid.transport.MessageAcceptMode;
+import org.apache.qpid.transport.MessageAcquireMode;
+import org.apache.qpid.transport.MessageCreditUnit;
+import org.apache.qpid.transport.MessageFlowMode;
+import org.apache.qpid.transport.MessageTransfer;
+import org.apache.qpid.transport.Option;
+import org.apache.qpid.transport.QueueQueryResult;
+import org.apache.qpid.transport.Range;
+import org.apache.qpid.transport.RangeSet;
+import org.apache.qpid.transport.RangeSetFactory;
+import org.apache.qpid.transport.Session;
+import org.apache.qpid.transport.SessionException;
+import org.apache.qpid.transport.SessionListener;
+import org.apache.qpid.transport.TransportException;
 import org.apache.qpid.util.Serial;
 import org.apache.qpid.util.Strings;
 import org.slf4j.Logger;
@@ -347,15 +368,22 @@ public class AMQSession_0_10 extends AMQ
         }
         else
         {
+            // Leaving this here to ensure the public method bindQueue in AMQSession.java works as expected.
             List<Binding> bindings = new ArrayList<Binding>();
-            bindings.addAll(destination.getSourceNode().getBindings());
-            bindings.addAll(destination.getTargetNode().getBindings());
+            bindings.addAll(destination.getNode().getBindings());
             
             String defaultExchange = destination.getAddressType() == AMQDestination.TOPIC_TYPE ?
                                      destination.getAddressName(): "amq.topic";
             
             for (Binding binding: bindings)
             {
+                // Currently there is a bug (QPID-3317) with setting up and tearing down x-bindings for link.
+                // The null check below is a way to side step that issue while fixing QPID-4146
+                // Note this issue only affects producers.
+                if (binding.getQueue() == null && queueName == null)
+                {
+                    continue;
+                }
                 String queue = binding.getQueue() == null?
                                    queueName.asString(): binding.getQueue();
                                    
@@ -523,11 +551,9 @@ public class AMQSession_0_10 extends AMQ
                                                       final FieldTable rawSelector, final boolean noConsume,
                                                       final boolean autoClose) throws JMSException
     {
-
-        final AMQProtocolHandler protocolHandler = getProtocolHandler();
         return new BasicMessageConsumer_0_10(getChannelId(), getAMQConnection(), destination, messageSelector, noLocal,
-                getMessageFactoryRegistry(), this, protocolHandler, rawSelector, prefetchHigh,
-                                             prefetchLow, exclusive, getAcknowledgeMode(), noConsume, autoClose);
+                getMessageFactoryRegistry(), this, rawSelector, prefetchHigh, prefetchLow,
+                                             exclusive, getAcknowledgeMode(), noConsume, autoClose);
     }
 
     /**
@@ -558,7 +584,7 @@ public class AMQSession_0_10 extends AMQ
             rk = routingKey.toString();
         }
                 
-        return isQueueBound(exchangeName.toString(),queueName.toString(),rk,null);
+        return isQueueBound(exchangeName == null ? null : exchangeName.toString(),queueName == null ? null : queueName.toString(),rk,null);
     }
     
     public boolean isQueueBound(final String exchangeName, final String queueName, final String bindingKey,Map<String,Object> args)
@@ -591,10 +617,22 @@ public class AMQSession_0_10 extends AMQ
      * This method is invoked when a consumer is created
      * Registers the consumer with the broker
      */
-    public void sendConsume(BasicMessageConsumer_0_10 consumer, AMQShortString queueName, AMQProtocolHandler protocolHandler,
+    public void sendConsume(BasicMessageConsumer_0_10 consumer, AMQShortString queueName,
                             boolean nowait, int tag)
             throws AMQException, FailoverException
-    {        
+    {
+        if (AMQDestination.DestSyntax.ADDR == consumer.getDestination().getDestSyntax())
+        {
+            if (AMQDestination.TOPIC_TYPE == consumer.getDestination().getAddressType())
+            {
+                String selector =  consumer.getMessageSelectorFilter() == null? null : consumer.getMessageSelectorFilter().getSelector();
+
+                createSubscriptionQueue(consumer.getDestination(), consumer.isNoLocal(), selector);
+                queueName = consumer.getDestination().getAMQQueueName();
+                consumer.setQueuename(queueName);
+            }
+            handleLinkCreation(consumer.getDestination());
+        }
         boolean preAcquire = consumer.isPreAcquire();
 
         AMQDestination destination = consumer.getDestination();
@@ -637,11 +675,7 @@ public class AMQSession_0_10 extends AMQ
                                          capacity,
                                          Option.UNRELIABLE);
         }
-
-        if (!nowait)
-        {
-            sync();
-        }
+        sync();
     }
 
     /**
@@ -653,7 +687,7 @@ public class AMQSession_0_10 extends AMQ
         try
         {
             return new BasicMessageProducer_0_10(getAMQConnection(), (AMQDestination) destination, isTransacted(), getChannelId(), this,
-                                             getProtocolHandler(), producerId, immediate, mandatory);
+                                             producerId, immediate, mandatory);
         }
         catch (AMQException e)
         {
@@ -673,26 +707,25 @@ public class AMQSession_0_10 extends AMQ
     /**
      * creates an exchange if it does not already exist
      */
-    public void sendExchangeDeclare(final AMQShortString name,
-            final AMQShortString type,
-            final AMQProtocolHandler protocolHandler, final boolean nowait)
-            throws AMQException, FailoverException
+    public void sendExchangeDeclare(final AMQShortString name, final AMQShortString type, final boolean nowait,
+                                    boolean durable, boolean autoDelete, boolean internal) throws AMQException, FailoverException
     {
-        sendExchangeDeclare(name.asString(), type.asString(), null, null,
-                nowait);
+        //The 'internal' parameter is ignored on the 0-10 path, the protocol does not support it
+        sendExchangeDeclare(name.asString(), type.asString(), null, null, nowait, durable, autoDelete);
     }
 
     public void sendExchangeDeclare(final String name, final String type,
             final String alternateExchange, final Map<String, Object> args,
-            final boolean nowait) throws AMQException
+            final boolean nowait, boolean durable, boolean autoDelete) throws AMQException
     {
         getQpidSession().exchangeDeclare(
                 name,
                 type,
                 alternateExchange,
                 args,
-                name.toString().startsWith("amq.") ? Option.PASSIVE
-                        : Option.NONE);
+                name.toString().startsWith("amq.") ? Option.PASSIVE : Option.NONE,
+                durable ? Option.DURABLE : Option.NONE,
+                autoDelete ? Option.AUTO_DELETE : Option.NONE);
         // We need to sync so that we get notify of an error.
         if (!nowait)
         {
@@ -717,18 +750,8 @@ public class AMQSession_0_10 extends AMQ
     /**
      * Declare a queue with the given queueName
      */
-    public void sendQueueDeclare(final AMQDestination amqd, final AMQProtocolHandler protocolHandler,
-                                 final boolean nowait, boolean passive)
-            throws AMQException, FailoverException
-    {
-        // do nothing this is only used by 0_8
-    }
-
-    /**
-     * Declare a queue with the given queueName
-     */
-    public AMQShortString send0_10QueueDeclare(final AMQDestination amqd, final AMQProtocolHandler protocolHandler,
-                                               final boolean noLocal, final boolean nowait, boolean passive)
+    public AMQShortString send0_10QueueDeclare(final AMQDestination amqd, final boolean noLocal,
+                                               final boolean nowait, boolean passive)
             throws AMQException
     {
         AMQShortString queueName;
@@ -759,7 +782,8 @@ public class AMQSession_0_10 extends AMQ
         }
         else
         {
-            QueueNode node = (QueueNode)amqd.getSourceNode();
+            // This code is here to ensure address based destination work with the declareQueue public method in AMQSession.java
+            Node node = amqd.getNode();
             Map<String,Object> arguments = new HashMap<String,Object>();
             arguments.putAll((Map<? extends String, ? extends Object>) node.getDeclareArgs());
             if (arguments == null || arguments.get(AddressHelper.NO_LOCAL) == null)
@@ -925,12 +949,11 @@ public class AMQSession_0_10 extends AMQ
         return getCurrentException();
     }
 
+    @Override
     protected AMQShortString declareQueue(final AMQDestination amqd,
                                           final boolean noLocal, final boolean nowait, final boolean passive)
             throws AMQException
     {
-        final AMQProtocolHandler protocolHandler = getProtocolHandler();
-
         return new FailoverNoopSupport<AMQShortString, AMQException>(
                 new FailoverProtectedOperation<AMQShortString, AMQException>()
                 {
@@ -947,7 +970,7 @@ public class AMQSession_0_10 extends AMQ
                             amqd.setQueueName(new AMQShortString( binddingKey + "@"
                                     + amqd.getExchangeName().toString() + "_" + UUID.randomUUID()));
                         }
-                        return send0_10QueueDeclare(amqd, protocolHandler, noLocal, nowait, passive);
+                        return send0_10QueueDeclare(amqd, noLocal, nowait, passive);
                     }
                 }, getAMQConnection()).execute();
     }
@@ -1072,11 +1095,12 @@ public class AMQSession_0_10 extends AMQ
         return AMQMessageDelegateFactory.FACTORY_0_10;
     }
     
-    public boolean isExchangeExist(AMQDestination dest,ExchangeNode node,boolean assertNode)
+    public boolean isExchangeExist(AMQDestination dest,boolean assertNode) throws AMQException
     {
         boolean match = true;
         ExchangeQueryResult result = getQpidSession().exchangeQuery(dest.getAddressName(), Option.NONE).get();
         match = !result.getNotFound();        
+        Node node = dest.getNode();
         
         if (match)
         {
@@ -1086,16 +1110,6 @@ public class AMQSession_0_10 extends AMQ
                          (node.getExchangeType() != null && 
                           node.getExchangeType().equals(result.getType())) &&
                          (matchProps(result.getArguments(),node.getDeclareArgs()));
-            }            
-            else if (node.getExchangeType() != null)
-            {
-                // even if assert is false, better to verify this
-                match = node.getExchangeType().equals(result.getType());
-                if (!match)
-                {
-                    _logger.debug("Exchange type doesn't match. Expected : " +  node.getExchangeType() +
-                             " actual " + result.getType());
-                }
             }
             else
             {
@@ -1104,18 +1118,27 @@ public class AMQSession_0_10 extends AMQ
                 dest.setExchangeClass(new AMQShortString(result.getType()));
             }
         }
-        
+
+        if (assertNode)
+        {
+            if (!match)
+            {
+                throw new AMQException("Assert failed for address : " + dest  +", Result was : " + result);
+            }
+        }
+
         return match;
     }
     
-    public boolean isQueueExist(AMQDestination dest,QueueNode node,boolean assertNode) throws AMQException
+    public boolean isQueueExist(AMQDestination dest, boolean assertNode) throws AMQException
     {
         boolean match = true;
         try
         {
             QueueQueryResult result = getQpidSession().queueQuery(dest.getAddressName(), Option.NONE).get();
             match = dest.getAddressName().equals(result.getQueue());
-            
+            Node node = dest.getNode();
+
             if (match && assertNode)
             {
                 match = (result.getDurable() == node.isDurable()) && 
@@ -1123,9 +1146,13 @@ public class AMQSession_0_10 extends AMQ
                          (result.getExclusive() == node.isExclusive()) &&
                          (matchProps(result.getArguments(),node.getDeclareArgs()));
             }
-            else if (match)
+
+            if (assertNode)
             {
-                // should I use the queried details to update the local data structure.
+                if (!match)
+                {
+                    throw new AMQException("Assert failed for address : " + dest  +", Result was : " + result);
+                }
             }
         }
         catch(SessionException e)
@@ -1140,7 +1167,6 @@ public class AMQSession_0_10 extends AMQ
                         "Error querying queue",e);
             }
         }
-        
         return match;
     }
     
@@ -1179,17 +1205,13 @@ public class AMQSession_0_10 extends AMQ
      */
     
     @SuppressWarnings("deprecation")
-    public void handleAddressBasedDestination(AMQDestination dest, 
+    public void resolveAddress(AMQDestination dest,
                                               boolean isConsumer,
-                                              boolean noLocal,
-                                              boolean noWait) throws AMQException
+                                              boolean noLocal) throws AMQException
     {
         if (dest.isAddressResolved() && dest.isResolvedAfter(getAMQConnection().getLastFailoverTime()))
         {
-            if (isConsumer && AMQDestination.TOPIC_TYPE == dest.getAddressType()) 
-            {
-                createSubscriptionQueue(dest,noLocal);
-            }
+            return;
         }
         else
         {
@@ -1209,46 +1231,32 @@ public class AMQSession_0_10 extends AMQ
             {
                 case AMQDestination.QUEUE_TYPE: 
                 {
-                    if (isQueueExist(dest,(QueueNode)dest.getSourceNode(),assertNode))
+                    if(createNode)
                     {
-                        setLegacyFiledsForQueueType(dest);
+                        setLegacyFieldsForQueueType(dest);
+                        handleQueueNodeCreation(dest,noLocal);
                         break;
                     }
-                    else if(createNode)
+                    else if (isQueueExist(dest,assertNode))
                     {
-                        setLegacyFiledsForQueueType(dest);
-                        send0_10QueueDeclare(dest,null,noLocal,noWait, false);
-                        sendQueueBind(dest.getAMQQueueName(), dest.getRoutingKey(),
-                                      null,dest.getExchangeName(),dest, false);
+                        setLegacyFieldsForQueueType(dest);
                         break;
-                    }                
+                    }
                 }
                 
                 case AMQDestination.TOPIC_TYPE: 
                 {
-                    if (isExchangeExist(dest,(ExchangeNode)dest.getTargetNode(),assertNode))
+                    if(createNode)
                     {                    
                         setLegacyFiledsForTopicType(dest);
                         verifySubject(dest);
-                        if (isConsumer && !isQueueExist(dest,(QueueNode)dest.getSourceNode(),true)) 
-                        {  
-                            createSubscriptionQueue(dest, noLocal);
-                        }
+                        handleExchangeNodeCreation(dest);
                         break;
                     }
-                    else if(createNode)
+                    else if (isExchangeExist(dest,assertNode))
                     {                    
                         setLegacyFiledsForTopicType(dest);
                         verifySubject(dest);
-                        sendExchangeDeclare(dest.getAddressName(), 
-                                dest.getExchangeClass().asString(),
-                                dest.getTargetNode().getAlternateExchange(),
-                                dest.getTargetNode().getDeclareArgs(),
-                                false);        
-                        if (isConsumer && !isQueueExist(dest,(QueueNode)dest.getSourceNode(),true)) 
-                        {
-                            createSubscriptionQueue(dest,noLocal);
-                        }
                         break;
                     }
                 }
@@ -1287,7 +1295,6 @@ public class AMQSession_0_10 extends AMQ
                 throw new AMQException("Ambiguous address, please specify queue or topic as node type");
             }
             dest.setAddressType(type);
-            dest.rebuildTargetAndSourceNodes(type);
             return type;
         }        
     }
@@ -1309,30 +1316,45 @@ public class AMQSession_0_10 extends AMQ
             }
         }
     }
-    
-    private void createSubscriptionQueue(AMQDestination dest, boolean noLocal) throws AMQException
+
+    void createSubscriptionQueue(AMQDestination dest, boolean noLocal, String messageSelector) throws AMQException
     {
-        QueueNode node = (QueueNode)dest.getSourceNode();  // source node is never null
-        
-        if (dest.getQueueName() == null)
+        Link link = dest.getLink();
+        String queueName = dest.getQueueName();
+
+        if (queueName == null)
         {
-            if (dest.getLink() != null && dest.getLink().getName() != null) 
-            {
-                dest.setQueueName(new AMQShortString(dest.getLink().getName())); 
-            }
+            queueName = link.getName() == null ? "TempQueue" + UUID.randomUUID() : link.getName();
+            dest.setQueueName(new AMQShortString(queueName));
+        }
+
+        SubscriptionQueue queueProps = link.getSubscriptionQueue();
+        Map<String,Object> arguments = queueProps.getDeclareArgs();
+        if (!arguments.containsKey((AddressHelper.NO_LOCAL)))
+        {
+            arguments.put(AddressHelper.NO_LOCAL, noLocal);
+        }
+
+        if (link.isDurable() && queueName.startsWith("TempQueue"))
+        {
+            throw new AMQException("You cannot mark a subscription queue as durable without providing a name for the link.");
         }
-        node.setExclusive(true);
-        node.setAutoDelete(!node.isDurable());
-        send0_10QueueDeclare(dest,null,noLocal,true, false);
-        getQpidSession().exchangeBind(dest.getQueueName(), 
-        		              dest.getAddressName(), 
-        		              dest.getSubject(), 
-        		              Collections.<String,Object>emptyMap());
-        sendQueueBind(dest.getAMQQueueName(), dest.getRoutingKey(),
-                null,dest.getExchangeName(),dest, false);
+
+        getQpidSession().queueDeclare(queueName,
+                queueProps.getAlternateExchange(), arguments,
+                queueProps.isAutoDelete() ? Option.AUTO_DELETE : Option.NONE,
+                link.isDurable() ? Option.DURABLE : Option.NONE,
+                queueProps.isExclusive() ? Option.EXCLUSIVE : Option.NONE);
+
+        Map<String,Object> bindingArguments = new HashMap<String, Object>();
+        bindingArguments.put(AMQPFilterTypes.JMS_SELECTOR.getValue().toString(), messageSelector == null ? "" : messageSelector);
+        getQpidSession().exchangeBind(queueName,
+                              dest.getAddressName(),
+                              dest.getSubject(),
+                              bindingArguments);
     }
-    
-    public void setLegacyFiledsForQueueType(AMQDestination dest)
+
+    public void setLegacyFieldsForQueueType(AMQDestination dest)
     {
         // legacy support
         dest.setQueueName(new AMQShortString(dest.getAddressName()));
@@ -1345,7 +1367,7 @@ public class AMQSession_0_10 extends AMQ
     {
         // legacy support
         dest.setExchangeName(new AMQShortString(dest.getAddressName()));
-        ExchangeNode node = (ExchangeNode)dest.getTargetNode();
+        Node node = dest.getNode();
         dest.setExchangeClass(node.getExchangeType() == null? 
                               ExchangeDefaults.TOPIC_EXCHANGE_CLASS:
                               new AMQShortString(node.getExchangeType()));  
@@ -1424,6 +1446,13 @@ public class AMQSession_0_10 extends AMQ
         return _qpidSession.isFlowBlocked();
     }
 
+    @Override
+    public void setFlowControl(boolean active)
+    {
+        // Supported by 0-8..0-9-1 only
+        throw new UnsupportedOperationException("Operation not supported by this protocol");
+    }
+
     private void cancelTimerTask()
     {
         if (flushTask != null)
@@ -1432,5 +1461,148 @@ public class AMQSession_0_10 extends AMQ
             flushTask = null;
         }
     }
-}
 
+    private void handleQueueNodeCreation(AMQDestination dest, boolean noLocal) throws AMQException
+    {
+        Node node = dest.getNode();
+        Map<String,Object> arguments = node.getDeclareArgs();
+        if (!arguments.containsKey((AddressHelper.NO_LOCAL)))
+        {
+            arguments.put(AddressHelper.NO_LOCAL, noLocal);
+        }
+        getQpidSession().queueDeclare(dest.getAddressName(),
+                node.getAlternateExchange(), arguments,
+                node.isAutoDelete() ? Option.AUTO_DELETE : Option.NONE,
+                node.isDurable() ? Option.DURABLE : Option.NONE,
+                node.isExclusive() ? Option.EXCLUSIVE : Option.NONE);
+
+        createBindings(dest, dest.getNode().getBindings());
+        sync();
+    }
+
+    void handleExchangeNodeCreation(AMQDestination dest) throws AMQException
+    {
+        Node node = dest.getNode();
+        sendExchangeDeclare(dest.getAddressName(),
+                node.getExchangeType(),
+                node.getAlternateExchange(),
+                node.getDeclareArgs(),
+                false,
+                node.isDurable(),
+                node.isAutoDelete());
+
+        // If bindings are specified without a queue name and is called by the producer,
+        // the broker will send an exception as expected.
+        createBindings(dest, dest.getNode().getBindings());
+        sync();
+    }
+
+    void handleLinkCreation(AMQDestination dest) throws AMQException
+    {
+        createBindings(dest, dest.getLink().getBindings());
+    }
+
+    void createBindings(AMQDestination dest, List<Binding> bindings)
+    {
+        String defaultExchangeForBinding = dest.getAddressType() == AMQDestination.TOPIC_TYPE ? dest
+                .getAddressName() : "amq.topic";
+
+        String defaultQueueName = null;
+        if (AMQDestination.QUEUE_TYPE == dest.getAddressType())
+        {
+            defaultQueueName = dest.getQueueName();
+        }
+        else
+        {
+            defaultQueueName = dest.getLink().getName() != null ? dest.getLink().getName() : dest.getQueueName();
+        }
+
+        for (Binding binding: bindings)
+        {
+            String queue = binding.getQueue() == null?
+                    defaultQueueName: binding.getQueue();
+
+            String exchange = binding.getExchange() == null ?
+                        defaultExchangeForBinding :
+                        binding.getExchange();
+
+            if (_logger.isDebugEnabled())
+            {
+                _logger.debug("Binding queue : " + queue +
+                         " exchange: " + exchange +
+                         " using binding key " + binding.getBindingKey() +
+                         " with args " + Strings.printMap(binding.getArgs()));
+            }
+            getQpidSession().exchangeBind(queue,
+                                     exchange,
+                                     binding.getBindingKey(),
+                                     binding.getArgs());
+       }
+    }
+
+    void handleLinkDelete(AMQDestination dest) throws AMQException
+    {
+        // We need to destroy link bindings
+        String defaultExchangeForBinding = dest.getAddressType() == AMQDestination.TOPIC_TYPE ? dest
+                .getAddressName() : "amq.topic";
+
+        String defaultQueueName = null;
+        if (AMQDestination.QUEUE_TYPE == dest.getAddressType())
+        {
+            defaultQueueName = dest.getQueueName();
+        }
+        else
+        {
+            defaultQueueName = dest.getLink().getName() != null ? dest.getLink().getName() : dest.getQueueName();
+        }
+
+        for (Binding binding: dest.getLink().getBindings())
+        {
+            String queue = binding.getQueue() == null?
+                    defaultQueueName: binding.getQueue();
+
+            String exchange = binding.getExchange() == null ?
+                        defaultExchangeForBinding :
+                        binding.getExchange();
+
+            if (_logger.isDebugEnabled())
+            {
+                _logger.debug("Unbinding queue : " + queue +
+                         " exchange: " + exchange +
+                         " using binding key " + binding.getBindingKey() +
+                         " with args " + Strings.printMap(binding.getArgs()));
+            }
+            getQpidSession().exchangeUnbind(queue, exchange,
+                                            binding.getBindingKey());
+        }
+    }
+
+    void deleteSubscriptionQueue(AMQDestination dest) throws AMQException
+    {
+        // We need to delete the subscription queue.
+        if (dest.getAddressType() == AMQDestination.TOPIC_TYPE &&
+            dest.getLink().getSubscriptionQueue().isExclusive() &&
+            isQueueExist(dest, false))
+        {
+            getQpidSession().queueDelete(dest.getQueueName());
+        }
+    }
+
+    void handleNodeDelete(AMQDestination dest) throws AMQException
+    {
+        if (AMQDestination.TOPIC_TYPE == dest.getAddressType())
+        {
+            if (isExchangeExist(dest,false))
+            {
+                getQpidSession().exchangeDelete(dest.getAddressName());
+            }
+        }
+        else
+        {
+            if (isQueueExist(dest,false))
+            {
+                getQpidSession().queueDelete(dest.getAddressName());
+            }
+        }
+    }
+}

Modified: qpid/branches/asyncstore/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java?rev=1451244&r1=1451243&r2=1451244&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java (original)
+++ qpid/branches/asyncstore/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java Thu Feb 28 16:14:30 2013
@@ -21,12 +21,18 @@
 package org.apache.qpid.client;
 
 
+import static org.apache.qpid.configuration.ClientProperties.DEFAULT_FLOW_CONTROL_WAIT_FAILURE;
+import static org.apache.qpid.configuration.ClientProperties.DEFAULT_FLOW_CONTROL_WAIT_NOTIFY_PERIOD;
+import static org.apache.qpid.configuration.ClientProperties.QPID_FLOW_CONTROL_WAIT_FAILURE;
+import static org.apache.qpid.configuration.ClientProperties.QPID_FLOW_CONTROL_WAIT_NOTIFY_PERIOD;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.qpid.AMQException;
 import org.apache.qpid.AMQUndeliveredException;
 import org.apache.qpid.client.failover.FailoverException;
+import org.apache.qpid.client.failover.FailoverNoopSupport;
 import org.apache.qpid.client.failover.FailoverProtectedOperation;
 import org.apache.qpid.client.failover.FailoverRetrySupport;
 import org.apache.qpid.client.message.AMQMessageDelegateFactory;
@@ -58,6 +64,27 @@ public class AMQSession_0_8 extends AMQS
     /** Used for debugging. */
     private static final Logger _logger = LoggerFactory.getLogger(AMQSession.class);
 
+    public static final String QPID_SYNC_AFTER_CLIENT_ACK = "qpid.sync_after_client.ack";
+
+    private final boolean _syncAfterClientAck =
+            Boolean.parseBoolean(System.getProperty(QPID_SYNC_AFTER_CLIENT_ACK, "true"));
+
+    /**
+     * The period to wait while flow controlled before sending a log message confirming that the session is still
+     * waiting on flow control being revoked
+     */
+    private final long _flowControlWaitPeriod = Long.getLong(QPID_FLOW_CONTROL_WAIT_NOTIFY_PERIOD,
+                                                                 DEFAULT_FLOW_CONTROL_WAIT_NOTIFY_PERIOD);
+
+    /**
+     * The period to wait while flow controlled before declaring a failure
+     */
+    private final long _flowControlWaitFailure = Long.getLong(QPID_FLOW_CONTROL_WAIT_FAILURE,
+                                                                  DEFAULT_FLOW_CONTROL_WAIT_FAILURE);
+
+    /** Flow control */
+    private FlowControlIndicator _flowControl = new FlowControlIndicator();
+
     /**
      * Creates a new session on a connection.
      *
@@ -98,8 +125,9 @@ public class AMQSession_0_8 extends AMQS
         return getProtocolHandler().getProtocolVersion();
     }
 
-    protected void acknowledgeImpl()
+    protected void acknowledgeImpl() throws JMSException
     {
+        boolean syncRequired = false;
         while (true)
         {
             Long tag = getUnacknowledgedMessageTags().poll();
@@ -109,6 +137,19 @@ public class AMQSession_0_8 extends AMQS
             }
 
             acknowledgeMessage(tag, false);
+            syncRequired = true;
+        }
+
+        try
+        {
+            if (syncRequired && _syncAfterClientAck)
+            {
+                sync();
+            }
+        }
+        catch (AMQException a)
+        {
+            throw new JMSAMQException("Failed to sync after acknowledge", a);
         }
     }
 
@@ -359,9 +400,9 @@ public class AMQSession_0_8 extends AMQS
         }
     }    
 
-    @Override public void sendConsume(BasicMessageConsumer_0_8 consumer,
+    @Override
+    public void sendConsume(BasicMessageConsumer_0_8 consumer,
                                       AMQShortString queueName,
-                                      AMQProtocolHandler protocolHandler,
                                       boolean nowait,
                                       int tag) throws AMQException, FailoverException
     {
@@ -380,27 +421,29 @@ public class AMQSession_0_8 extends AMQS
 
         if (nowait)
         {
-            protocolHandler.writeFrame(jmsConsume);
+            getProtocolHandler().writeFrame(jmsConsume);
         }
         else
         {
-            protocolHandler.syncWrite(jmsConsume, BasicConsumeOkBody.class);
+            getProtocolHandler().syncWrite(jmsConsume, BasicConsumeOkBody.class);
         }
     }
 
-    public void sendExchangeDeclare(final AMQShortString name, final AMQShortString type, final AMQProtocolHandler protocolHandler,
-            final boolean nowait) throws AMQException, FailoverException
+    @Override
+    public void sendExchangeDeclare(final AMQShortString name, final AMQShortString type, final boolean nowait,
+            boolean durable, boolean autoDelete, boolean internal) throws AMQException, FailoverException
     {
+        //The 'noWait' parameter is only used on the 0-10 path, it is ignored on the 0-8/0-9/0-9-1 path
+
         ExchangeDeclareBody body = getMethodRegistry().createExchangeDeclareBody(getTicket(),name,type,
                                                                                  name.toString().startsWith("amq."),
-                                                                                 false,false,false,false,null);
+                                                                                 durable, autoDelete, internal, false, null);
         AMQFrame exchangeDeclare = body.generateFrame(getChannelId());
 
-        protocolHandler.syncWrite(exchangeDeclare, ExchangeDeclareOkBody.class);
+        getProtocolHandler().syncWrite(exchangeDeclare, ExchangeDeclareOkBody.class);
     }
 
-    public void sendQueueDeclare(final AMQDestination amqd, final AMQProtocolHandler protocolHandler,
-                                 final boolean nowait, boolean passive) throws AMQException, FailoverException
+    private void sendQueueDeclare(final AMQDestination amqd, boolean passive) throws AMQException, FailoverException
     {
         QueueDeclareBody body =
                 getMethodRegistry().createQueueDeclareBody(getTicket(),
@@ -414,7 +457,32 @@ public class AMQSession_0_8 extends AMQS
 
         AMQFrame queueDeclare = body.generateFrame(getChannelId());
 
-        protocolHandler.syncWrite(queueDeclare, QueueDeclareOkBody.class);
+        getProtocolHandler().syncWrite(queueDeclare, QueueDeclareOkBody.class);
+    }
+
+    @Override
+    protected AMQShortString declareQueue(final AMQDestination amqd, final boolean noLocal,
+                                          final boolean nowait, final boolean passive) throws AMQException
+    {
+        //The 'noWait' parameter is only used on the 0-10 path, it is ignored on the 0-8/0-9/0-9-1 path
+
+        final AMQProtocolHandler protocolHandler = getProtocolHandler();
+        return new FailoverNoopSupport<AMQShortString, AMQException>(
+                new FailoverProtectedOperation<AMQShortString, AMQException>()
+                {
+                    public AMQShortString execute() throws AMQException, FailoverException
+                    {
+                        // Generate the queue name if the destination indicates that a client generated name is to be used.
+                        if (amqd.isNameRequired())
+                        {
+                            amqd.setQueueName(protocolHandler.generateQueueName());
+                        }
+
+                        sendQueueDeclare(amqd, passive);
+
+                        return amqd.getAMQQueueName();
+                    }
+                }, getAMQConnection()).execute();
     }
 
     public void sendQueueDelete(final AMQShortString queueName) throws AMQException, FailoverException
@@ -440,10 +508,8 @@ public class AMQSession_0_8 extends AMQS
             final int prefetchLow, final boolean noLocal, final boolean exclusive, String messageSelector, final FieldTable arguments,
             final boolean noConsume, final boolean autoClose)  throws JMSException
     {
-
-        final AMQProtocolHandler protocolHandler = getProtocolHandler();
        return new BasicMessageConsumer_0_8(getChannelId(), getAMQConnection(), destination, messageSelector, noLocal,
-               getMessageFactoryRegistry(),this, protocolHandler, arguments, prefetchHigh, prefetchLow,
+               getMessageFactoryRegistry(),this, arguments, prefetchHigh, prefetchLow,
                                  exclusive, getAcknowledgeMode(), noConsume, autoClose);
     }
 
@@ -629,12 +695,11 @@ public class AMQSession_0_8 extends AMQS
         declareExchange(new AMQShortString("amq.direct"), new AMQShortString("direct"), false);
     }
 
-    public void handleAddressBasedDestination(AMQDestination dest, 
+    public void resolveAddress(AMQDestination dest,
                                               boolean isConsumer,
-                                              boolean noLocal,
-                                              boolean noWait) throws AMQException
+                                              boolean noLocal) throws AMQException
     {
-        throw new UnsupportedOperationException("The new addressing based sytanx is "
+        throw new UnsupportedOperationException("The new addressing based syntax is "
                 + "not supported for AMQP 0-8/0-9 versions");
     }
     
@@ -662,14 +727,23 @@ public class AMQSession_0_8 extends AMQS
                             queueName == null ? null : new AMQShortString(queueName),
                             bindingKey == null ? null : new AMQShortString(bindingKey));
     }
-  
+
+    private AMQProtocolHandler getProtocolHandler()
+    {
+        return getAMQConnection().getProtocolHandler();
+    }
+
+    public MethodRegistry getMethodRegistry()
+    {
+        MethodRegistry methodRegistry = getProtocolHandler().getMethodRegistry();
+        return methodRegistry;
+    }
 
     public AMQException getLastException()
     {
         // if the Connection has closed then we should throw any exception that
         // has occurred that we were not waiting for
-        AMQStateManager manager = getAMQConnection().getProtocolHandler()
-                .getStateManager();
+        AMQStateManager manager = getProtocolHandler().getStateManager();
         
         Exception e = manager.getLastException();
         if (manager.getCurrentState().equals(AMQState.CONNECTION_CLOSED)
@@ -693,6 +767,49 @@ public class AMQSession_0_8 extends AMQS
         }
     }
 
+    public boolean isFlowBlocked()
+    {
+        synchronized (_flowControl)
+        {
+            return !_flowControl.getFlowControl();
+        }
+    }
+
+    public void setFlowControl(final boolean active)
+    {
+        _flowControl.setFlowControl(active);
+        if (_logger.isInfoEnabled())
+        {
+            _logger.info("Broker enforced flow control " + (active ? "no longer in effect" : "has been enforced"));
+        }
+    }
+
+    void checkFlowControl() throws InterruptedException, JMSException
+    {
+        long expiryTime = 0L;
+        synchronized (_flowControl)
+        {
+            while (!_flowControl.getFlowControl() &&
+                   (expiryTime == 0L ? (expiryTime = System.currentTimeMillis() + _flowControlWaitFailure)
+                                     : expiryTime) >= System.currentTimeMillis() )
+            {
+
+                _flowControl.wait(_flowControlWaitPeriod);
+                if (_logger.isInfoEnabled())
+                {
+                    _logger.info("Message send delayed by " + (System.currentTimeMillis() + _flowControlWaitFailure - expiryTime)/1000 + "s due to broker enforced flow control");
+                }
+            }
+            if(!_flowControl.getFlowControl())
+            {
+                _logger.error("Message send failed due to timeout waiting on broker enforced flow control");
+                throw new JMSException("Unable to send message for " + _flowControlWaitFailure /1000 + " seconds due to broker enforced flow control");
+            }
+        }
+    }
+
+
+
     public abstract static class DestinationCache<T extends AMQDestination>
     {
         private final Map<AMQShortString, Map<AMQShortString, T>> cache = new HashMap<AMQShortString, Map<AMQShortString, T>>();
@@ -740,6 +857,22 @@ public class AMQSession_0_8 extends AMQS
         }
     }
 
+    private static final class FlowControlIndicator
+    {
+        private volatile boolean _flowControl = true;
+
+        public synchronized void setFlowControl(boolean flowControl)
+        {
+            _flowControl = flowControl;
+            notify();
+        }
+
+        public boolean getFlowControl()
+        {
+            return _flowControl;
+        }
+    }
+
     private final TopicDestinationCache _topicDestinationCache = new TopicDestinationCache();
     private final QueueDestinationCache _queueDestinationCache = new QueueDestinationCache();
 

Modified: qpid/branches/asyncstore/java/client/src/main/java/org/apache/qpid/client/AMQTopic.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/client/src/main/java/org/apache/qpid/client/AMQTopic.java?rev=1451244&r1=1451243&r2=1451244&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/client/src/main/java/org/apache/qpid/client/AMQTopic.java (original)
+++ qpid/branches/asyncstore/java/client/src/main/java/org/apache/qpid/client/AMQTopic.java Thu Feb 28 16:14:30 2013
@@ -114,8 +114,8 @@ public class AMQTopic extends AMQDestina
                     AMQShortString queueName = getDurableTopicQueueName(subscriptionName, connection);
                     // link is never null if dest was created using an address string.
                     t.getLink().setName(queueName.asString());               
-                    t.getSourceNode().setAutoDelete(false);
-                    t.getSourceNode().setDurable(true);
+                    t.getLink().getSubscriptionQueue().setAutoDelete(false);
+                    t.getLink().setDurable(true);
                     
                     // The legacy fields are also populated just in case.
                     t.setQueueName(queueName);

Modified: qpid/branches/asyncstore/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java?rev=1451244&r1=1451243&r2=1451244&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java (original)
+++ qpid/branches/asyncstore/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java Thu Feb 28 16:14:30 2013
@@ -31,7 +31,6 @@ import org.apache.qpid.client.message.AM
 import org.apache.qpid.client.message.AbstractJMSMessage;
 import org.apache.qpid.client.message.CloseConsumerMessage;
 import org.apache.qpid.client.message.MessageFactoryRegistry;
-import org.apache.qpid.client.protocol.AMQProtocolHandler;
 import org.apache.qpid.common.AMQPFilterTypes;
 import org.apache.qpid.client.filter.JMSSelectorFilter;
 import org.apache.qpid.framing.AMQShortString;
@@ -87,8 +86,6 @@ public abstract class BasicMessageConsum
 
     private final AMQSession _session;
 
-    private final AMQProtocolHandler _protocolHandler;
-
     /**
      * We need to store the "raw" field table so that we can resubscribe in the event of failover being required
      */
@@ -140,9 +137,9 @@ public abstract class BasicMessageConsum
 
     protected BasicMessageConsumer(int channelId, AMQConnection connection, AMQDestination destination,
                                    String messageSelector, boolean noLocal, MessageFactoryRegistry messageFactory,
-                                   AMQSession session, AMQProtocolHandler protocolHandler,
-                                   FieldTable rawSelector, int prefetchHigh, int prefetchLow,
-                                   boolean exclusive, int acknowledgeMode, boolean browseOnly, boolean autoClose) throws JMSException
+                                   AMQSession session, FieldTable rawSelector,
+                                   int prefetchHigh, int prefetchLow, boolean exclusive,
+                                   int acknowledgeMode, boolean browseOnly, boolean autoClose) throws JMSException
     {
         _channelId = channelId;
         _connection = connection;
@@ -150,7 +147,6 @@ public abstract class BasicMessageConsum
         _destination = destination;
         _messageFactory = messageFactory;
         _session = session;
-        _protocolHandler = protocolHandler;
         _prefetchHigh = prefetchHigh;
         _prefetchLow = prefetchLow;
         _exclusive = exclusive;
@@ -597,7 +593,6 @@ public abstract class BasicMessageConsum
                             {
                                 sendCancel();
                             }
-                            cleanupQueue();
                         }
                     }
                     catch (AMQException e)
@@ -635,8 +630,6 @@ public abstract class BasicMessageConsum
     }
 
     abstract void sendCancel() throws AMQException, FailoverException;
-    
-    abstract void cleanupQueue() throws AMQException, FailoverException;
 
     /**
      * Called when you need to invalidate a consumer. Used for example when failover has occurred and the client has
@@ -1042,10 +1035,4 @@ public abstract class BasicMessageConsum
     {
         return _messageFactory;
     }
-
-    protected AMQProtocolHandler getProtocolHandler()
-    {
-        return _protocolHandler;
-    }
-
 }

Modified: qpid/branches/asyncstore/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java?rev=1451244&r1=1451243&r2=1451244&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java (original)
+++ qpid/branches/asyncstore/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java Thu Feb 28 16:14:30 2013
@@ -28,7 +28,6 @@ import org.apache.qpid.client.message.AM
 import org.apache.qpid.client.message.AbstractJMSMessage;
 import org.apache.qpid.client.message.MessageFactoryRegistry;
 import org.apache.qpid.client.message.UnprocessedMessage_0_10;
-import org.apache.qpid.client.protocol.AMQProtocolHandler;
 import org.apache.qpid.common.ServerPropertyNames;
 import org.apache.qpid.framing.FieldTable;
 import org.apache.qpid.jms.Session;
@@ -82,13 +81,13 @@ public class BasicMessageConsumer_0_10 e
 
     protected BasicMessageConsumer_0_10(int channelId, AMQConnection connection, AMQDestination destination,
                                         String messageSelector, boolean noLocal, MessageFactoryRegistry messageFactory,
-                                        AMQSession<?,?> session, AMQProtocolHandler protocolHandler,
-                                        FieldTable rawSelector, int prefetchHigh, int prefetchLow,
-                                        boolean exclusive, int acknowledgeMode, boolean browseOnly, boolean autoClose)
+                                        AMQSession<?,?> session, FieldTable rawSelector,
+                                        int prefetchHigh, int prefetchLow, boolean exclusive,
+                                        int acknowledgeMode, boolean browseOnly, boolean autoClose)
             throws JMSException
     {
-        super(channelId, connection, destination, messageSelector, noLocal, messageFactory, session, protocolHandler,
-                rawSelector, prefetchHigh, prefetchLow, exclusive, acknowledgeMode, browseOnly, autoClose);
+        super(channelId, connection, destination, messageSelector, noLocal, messageFactory, session, rawSelector,
+                prefetchHigh, prefetchLow, exclusive, acknowledgeMode, browseOnly, autoClose);
         _0_10session = (AMQSession_0_10) session;
 
         _serverJmsSelectorSupport = connection.isSupportedServerFeature(ServerPropertyNames.FEATURE_QPID_JMS_SELECTOR);
@@ -96,6 +95,7 @@ public class BasicMessageConsumer_0_10 e
 
         _capacity = evaluateCapacity(destination);
 
+        // This is due to the Destination carrying the temporary subscription name which is incorrect.
         if (destination.isAddressResolved() && AMQDestination.TOPIC_TYPE == destination.getAddressType()) 
         {            
             boolean namedQueue = destination.getLink() != null && destination.getLink().getName() != null ; 
@@ -164,6 +164,7 @@ public class BasicMessageConsumer_0_10 e
     @Override void sendCancel() throws AMQException
     {
         _0_10session.getQpidSession().messageCancel(getConsumerTagString());
+        postSubscription();
         try
         {
             _0_10session.getQpidSession().sync();
@@ -500,7 +501,7 @@ public class BasicMessageConsumer_0_10 e
         }
     }
     
-    void cleanupQueue() throws AMQException, FailoverException
+    void postSubscription() throws AMQException
     {
         AMQDestination dest = this.getDestination();
         if (dest != null && dest.getDestSyntax() == AMQDestination.DestSyntax.ADDR)
@@ -508,9 +509,11 @@ public class BasicMessageConsumer_0_10 e
             if (dest.getDelete() == AddressOption.ALWAYS ||
                 dest.getDelete() == AddressOption.RECEIVER )
             {
-                ((AMQSession_0_10) getSession()).getQpidSession().queueDelete(
-                        this.getDestination().getQueueName());
+                ((AMQSession_0_10) getSession()).handleNodeDelete(dest);
+                ((AMQSession_0_10) getSession()).deleteSubscriptionQueue(dest);
             }
+            // Subscription queue is handled as part of linkDelete method.
+            ((AMQSession_0_10) getSession()).handleLinkDelete(dest);
         }
     }
 
@@ -560,4 +563,4 @@ public class BasicMessageConsumer_0_10 e
         return capacity;
     }
 
-}
+}
\ No newline at end of file

Modified: qpid/branches/asyncstore/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java?rev=1451244&r1=1451243&r2=1451244&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java (original)
+++ qpid/branches/asyncstore/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java Thu Feb 28 16:14:30 2013
@@ -29,7 +29,6 @@ import org.apache.qpid.client.message.AM
 import org.apache.qpid.client.message.AbstractJMSMessage;
 import org.apache.qpid.client.message.MessageFactoryRegistry;
 import org.apache.qpid.client.message.UnprocessedMessage_0_8;
-import org.apache.qpid.client.protocol.AMQProtocolHandler;
 import org.apache.qpid.common.AMQPFilterTypes;
 import org.apache.qpid.configuration.ClientProperties;
 import org.apache.qpid.framing.AMQFrame;
@@ -52,12 +51,12 @@ public class BasicMessageConsumer_0_8 ex
 
     protected BasicMessageConsumer_0_8(int channelId, AMQConnection connection, AMQDestination destination,
                                        String messageSelector, boolean noLocal, MessageFactoryRegistry messageFactory, AMQSession_0_8 session,
-                                       AMQProtocolHandler protocolHandler, FieldTable rawSelector, int prefetchHigh, int prefetchLow,
-                                       boolean exclusive, int acknowledgeMode, boolean browseOnly, boolean autoClose) throws JMSException
+                                       FieldTable rawSelector, int prefetchHigh, int prefetchLow, boolean exclusive,
+                                       int acknowledgeMode, boolean browseOnly, boolean autoClose) throws JMSException
     {
         super(channelId, connection, destination,messageSelector,noLocal,messageFactory,session,
-              protocolHandler, rawSelector, prefetchHigh, prefetchLow, exclusive,
-              acknowledgeMode, browseOnly, autoClose);
+              rawSelector, prefetchHigh, prefetchLow, exclusive, acknowledgeMode,
+              browseOnly, autoClose);
         final FieldTable consumerArguments = getArguments();
         if (isAutoClose())
         {
@@ -93,13 +92,19 @@ public class BasicMessageConsumer_0_8 ex
         }
     }
 
+    @Override
+    public AMQSession_0_8 getSession()
+    {
+        return (AMQSession_0_8) super.getSession();
+    }
+
     void sendCancel() throws AMQException, FailoverException
     {
         BasicCancelBody body = getSession().getMethodRegistry().createBasicCancelBody(new AMQShortString(String.valueOf(getConsumerTag())), false);
 
         final AMQFrame cancelFrame = body.generateFrame(getChannelId());
 
-        getProtocolHandler().syncWrite(cancelFrame, BasicCancelOkBody.class);
+        getConnection().getProtocolHandler().syncWrite(cancelFrame, BasicCancelOkBody.class);
 
         if (_logger.isDebugEnabled())
         {
@@ -122,11 +127,6 @@ public class BasicMessageConsumer_0_8 ex
         return receive();
     }
 
-    void cleanupQueue() throws AMQException, FailoverException
-    {
-        
-    }
-
     public RejectBehaviour getRejectBehaviour()
     {
         return _rejectBehaviour;

Modified: qpid/branches/asyncstore/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java?rev=1451244&r1=1451243&r2=1451244&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java (original)
+++ qpid/branches/asyncstore/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java Thu Feb 28 16:14:30 2013
@@ -20,7 +20,6 @@
  */
 package org.apache.qpid.client;
 
-import java.io.UnsupportedEncodingException;
 import java.util.UUID;
 import javax.jms.BytesMessage;
 import javax.jms.DeliveryMode;
@@ -36,15 +35,15 @@ import javax.jms.Topic;
 import org.apache.qpid.AMQException;
 import org.apache.qpid.client.message.AbstractJMSMessage;
 import org.apache.qpid.client.message.MessageConverter;
-import org.apache.qpid.client.protocol.AMQProtocolHandler;
 import org.apache.qpid.transport.TransportException;
 import org.apache.qpid.util.UUIDGen;
 import org.apache.qpid.util.UUIDs;
 import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 public abstract class BasicMessageProducer extends Closeable implements org.apache.qpid.jms.MessageProducer
 {
+
+
     enum PublishMode { ASYNC_PUBLISH_ALL, SYNC_PUBLISH_PERSISTENT, SYNC_PUBLISH_ALL };
 
     private final Logger _logger ;
@@ -71,18 +70,6 @@ public abstract class BasicMessageProduc
     private AMQDestination _destination;
 
     /**
-     * Default encoding used for messages produced by this producer.
-     */
-    private String _encoding;
-
-    /**
-     * Default encoding used for message produced by this producer.
-     */
-    private String _mimeType;
-
-    private AMQProtocolHandler _protocolHandler;
-
-    /**
      * True if this producer was created from a transacted session
      */
     private boolean _transacted;
@@ -135,14 +122,12 @@ public abstract class BasicMessageProduc
     private PublishMode publishMode = PublishMode.ASYNC_PUBLISH_ALL;
 
     protected BasicMessageProducer(Logger logger,AMQConnection connection, AMQDestination destination, boolean transacted, int channelId,
-                                   AMQSession session, AMQProtocolHandler protocolHandler, long producerId,
-                                   Boolean immediate, Boolean mandatory) throws AMQException
+                                   AMQSession session, long producerId, Boolean immediate, Boolean mandatory) throws AMQException
     {
     	_logger = logger;
     	_connection = connection;
         _destination = destination;
         _transacted = transacted;
-        _protocolHandler = protocolHandler;
         _channelId = channelId;
         _session = session;
         _producerId = producerId;
@@ -163,6 +148,11 @@ public abstract class BasicMessageProduc
         setPublishMode();
     }
 
+    protected AMQConnection getConnection()
+    {
+        return _connection;
+    }
+
     void setPublishMode()
     {
         // Publish mode could be configured at destination level as well.
@@ -303,7 +293,6 @@ public abstract class BasicMessageProduc
         checkPreConditions();
         checkInitialDestination();
 
-
         synchronized (_connection.getFailoverMutex())
         {
             sendImpl(_destination, message, _deliveryMode, _messagePriority, _timeToLive, _mandatory, _immediate);
@@ -467,7 +456,7 @@ public abstract class BasicMessageProduc
                 JMSException ex = new JMSException("Error validating destination");
                 ex.initCause(e);
                 ex.setLinkedException(e);
-                
+
                 throw ex;
             }
             amqDestination.setExchangeExistsChecked(true);
@@ -558,19 +547,7 @@ public abstract class BasicMessageProduc
         }
     }
 
-    public void setMimeType(String mimeType) throws JMSException
-    {
-        checkNotClosed();
-        _mimeType = mimeType;
-    }
-
-    public void setEncoding(String encoding) throws JMSException, UnsupportedEncodingException
-    {
-        checkNotClosed();
-        _encoding = encoding;
-    }
-
-    private void checkPreConditions() throws javax.jms.IllegalStateException, JMSException
+    private void checkPreConditions() throws JMSException
     {
         checkNotClosed();
 
@@ -584,15 +561,16 @@ public abstract class BasicMessageProduc
         }
     }
 
-    private void checkInitialDestination()
+    private void checkInitialDestination() throws JMSException
     {
         if (_destination == null)
         {
             throw new UnsupportedOperationException("Destination is null");
         }
+        checkValidQueue();
     }
 
-    private void checkDestination(Destination suppliedDestination) throws InvalidDestinationException
+    private void checkDestination(Destination suppliedDestination) throws JMSException
     {
         if ((_destination != null) && (suppliedDestination != null))
         {
@@ -600,6 +578,11 @@ public abstract class BasicMessageProduc
                     "This message producer was created with a Destination, therefore you cannot use an unidentified Destination");
         }
 
+        if(suppliedDestination instanceof AMQQueue)
+        {
+            AMQQueue destination = (AMQQueue) suppliedDestination;
+            checkValidQueue(destination);
+        }
         if (suppliedDestination == null)
         {
             throw new InvalidDestinationException("Supplied Destination was invalid");
@@ -607,6 +590,43 @@ public abstract class BasicMessageProduc
 
     }
 
+    private void checkValidQueue() throws JMSException
+    {
+        if(_destination instanceof AMQQueue)
+        {
+            checkValidQueue((AMQQueue) _destination);
+        }
+    }
+
+    private void checkValidQueue(AMQQueue destination) throws JMSException
+    {
+        if (!destination.isCheckedForQueueBinding() && validateQueueOnSend())
+        {
+            if (getSession().isStrictAMQP())
+            {
+                getLogger().warn("AMQP does not support destination validation before publish");
+                destination.setCheckedForQueueBinding(true);
+            }
+            else
+            {
+                if (isBound(destination))
+                {
+                    destination.setCheckedForQueueBinding(true);
+                }
+                else
+                {
+                    throw new InvalidDestinationException("Queue: " + destination.getQueueName()
+                        + " is not a valid destination (no binding on server)");
+                }
+            }
+        }
+    }
+
+    private boolean validateQueueOnSend()
+    {
+        return _connection.validateQueueOnSend();
+    }
+
     /**
      * The session used to create this producer
      */
@@ -645,16 +665,6 @@ public abstract class BasicMessageProduc
         _destination = destination;
     }
 
-    protected AMQProtocolHandler getProtocolHandler()
-    {
-        return _protocolHandler;
-    }
-
-    protected void setProtocolHandler(AMQProtocolHandler protocolHandler)
-    {
-        _protocolHandler = protocolHandler;
-    }
-
     protected int getChannelId()
     {
         return _channelId;

Modified: qpid/branches/asyncstore/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java?rev=1451244&r1=1451243&r2=1451244&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java (original)
+++ qpid/branches/asyncstore/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java Thu Feb 28 16:14:30 2013
@@ -27,7 +27,6 @@ import org.apache.qpid.client.message.AM
 import org.apache.qpid.client.message.AbstractJMSMessage;
 import org.apache.qpid.client.message.QpidMessageProperties;
 import org.apache.qpid.client.messaging.address.Link.Reliability;
-import org.apache.qpid.client.protocol.AMQProtocolHandler;
 import org.apache.qpid.transport.DeliveryProperties;
 import org.apache.qpid.transport.Header;
 import org.apache.qpid.transport.MessageAcceptMode;
@@ -60,10 +59,9 @@ public class BasicMessageProducer_0_10 e
     private byte[] userIDBytes;
 
     BasicMessageProducer_0_10(AMQConnection connection, AMQDestination destination, boolean transacted, int channelId,
-                              AMQSession session, AMQProtocolHandler protocolHandler, long producerId,
-                              Boolean immediate, Boolean mandatory) throws AMQException
+                              AMQSession session, long producerId, Boolean immediate, Boolean mandatory) throws AMQException
     {
-        super(_logger, connection, destination, transacted, channelId, session, protocolHandler, producerId, immediate, mandatory);
+        super(_logger, connection, destination, transacted, channelId, session, producerId, immediate, mandatory);
         
         userIDBytes = Strings.toUTF8(getUserID());
     }
@@ -79,14 +77,18 @@ public class BasicMessageProducer_0_10 e
 	                (name,
 	                 destination.getExchangeClass().toString(),
 	                 null, null,
-	                 name.startsWith("amq.") ? Option.PASSIVE : Option.NONE);
+	                 name.startsWith("amq.") ? Option.PASSIVE : Option.NONE,
+	                 destination.isExchangeDurable() ? Option.DURABLE : Option.NONE,
+	                 destination.isExchangeAutoDelete() ? Option.AUTO_DELETE : Option.NONE);
         	}
         }
         else
         {       
             try
             {
-                getSession().handleAddressBasedDestination(destination,false,false,false);
+                getSession().resolveAddress(destination,false,false);
+                ((AMQSession_0_10)getSession()).handleLinkCreation(destination);
+                ((AMQSession_0_10)getSession()).sync();
             }
             catch(Exception e)
             {
@@ -251,25 +253,35 @@ public class BasicMessageProducer_0_10 e
         return getSession().isQueueBound(destination);
     }
     
+    // We should have a close and closed method to distinguish between normal close
+    // and a close due to session or connection error.
     @Override
     public void close() throws JMSException
     {
         super.close();
         AMQDestination dest = getAMQDestination();
-        if (dest != null && dest.getDestSyntax() == AMQDestination.DestSyntax.ADDR)
+        AMQSession_0_10 ssn = (AMQSession_0_10) getSession();
+        if (!ssn.isClosed() && dest != null && dest.getDestSyntax() == AMQDestination.DestSyntax.ADDR)
         {
-            if (dest.getDelete() == AddressOption.ALWAYS ||
-                dest.getDelete() == AddressOption.SENDER )
+            try
             {
-                try
-                {
-                    ((AMQSession_0_10) getSession()).getQpidSession().queueDelete(
-                        getAMQDestination().getQueueName());
-                }
-                catch(TransportException e)
+                if (dest.getDelete() == AddressOption.ALWAYS ||
+                    dest.getDelete() == AddressOption.SENDER )
                 {
-                    throw getSession().toJMSException("Exception while closing producer:" + e.getMessage(), e);
+                    ssn.handleNodeDelete(dest);
                 }
+                ssn.handleLinkDelete(dest);
+            }
+            catch(TransportException e)
+            {
+                throw getSession().toJMSException("Exception while closing producer:" + e.getMessage(), e);
+            }
+            catch (AMQException e)
+            {
+                JMSException ex = new JMSException("Exception while closing producer:" + e.getMessage());
+                ex.setLinkedException(e);
+                ex.initCause(e);
+                throw ex;
             }
         }
     }



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org