You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2012/03/01 11:20:41 UTC

svn commit: r1295495 [2/4] - in /qpid/branches/rg-amqp-1-0-sandbox/qpid/java: ./ bdbstore/ bdbstore/bin/ bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/testclient/ bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/testclien...

Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java?rev=1295495&r1=1295494&r2=1295495&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java Thu Mar  1 10:20:36 2012
@@ -65,4 +65,16 @@ public interface AMQConnectionDelegate
     ProtocolVersion getProtocolVersion();
 
     boolean verifyClientID() throws JMSException, AMQException;
+
+    /**
+     * Tests whether the server has advertised support for the specified feature
+     * via the qpid.features server connection property.  By convention the feature name
+     * will begin with <code>qpid.</code> followed by one or more words separated by minus signs
+     * e.g. qpid.jms-selector.
+     *
+     * @param featureName name of feature.
+     *
+     * @return true if the feature is supported by the server
+     */
+    boolean isSupportedServerFeature(final String featureName);
 }

Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java?rev=1295495&r1=1295494&r2=1295495&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java Thu Mar  1 10:20:36 2012
@@ -1,4 +1,3 @@
-package org.apache.qpid.client;
 /*
  *
  * Licensed to the Apache Software Foundation (ASF) under one
@@ -20,6 +19,7 @@ package org.apache.qpid.client;
  *
  */
 
+package org.apache.qpid.client;
 
 import java.io.IOException;
 import java.util.ArrayList;
@@ -36,6 +36,7 @@ import org.apache.qpid.AMQException;
 import org.apache.qpid.client.failover.FailoverException;
 import org.apache.qpid.client.failover.FailoverProtectedOperation;
 import org.apache.qpid.client.transport.ClientConnectionDelegate;
+import org.apache.qpid.common.ServerPropertyNames;
 import org.apache.qpid.configuration.ClientProperties;
 import org.apache.qpid.framing.ProtocolVersion;
 import org.apache.qpid.jms.BrokerDetails;
@@ -63,16 +64,12 @@ public class AMQConnectionDelegate_0_10 
     private static final Logger _logger = LoggerFactory.getLogger(AMQConnectionDelegate_0_10.class);
 
     /**
-     * The name of the UUID property
-     */
-    private static final String UUID_NAME = "qpid.federation_tag";
-    /**
      * The AMQ Connection.
      */
-    private AMQConnection _conn;
+    private final AMQConnection _conn;
 
     /**
-     * The QpidConeection instance that is mapped with thie JMS connection.
+     * The Qpid Connection instance that is mapped to this JMS connection.
      */
     org.apache.qpid.transport.Connection _qpidConnection;
     private ConnectionException exception = null;
@@ -281,24 +278,29 @@ public class AMQConnectionDelegate_0_10 
         {
             _conn.getProtocolHandler().setFailoverLatch(new CountDownLatch(1));
 
-            try
+            _qpidConnection.notifyFailoverRequired();
+
+            synchronized (_conn.getFailoverMutex())
             {
-                if (_conn.firePreFailover(false) && _conn.attemptReconnection())
+                try
                 {
-                    _conn.failoverPrep();
-                    _conn.resubscribeSessions();
-                    _conn.fireFailoverComplete();
-                    return;
+                    if (_conn.firePreFailover(false) && _conn.attemptReconnection())
+                    {
+                        _conn.failoverPrep();
+                        _conn.resubscribeSessions();
+                        _conn.fireFailoverComplete();
+                        return;
+                    }
+                }
+                catch (Exception e)
+                {
+                    _logger.error("error during failover", e);
+                }
+                finally
+                {
+                    _conn.getProtocolHandler().getFailoverLatch().countDown();
+                    _conn.getProtocolHandler().setFailoverLatch(null);
                 }
-            }
-            catch (Exception e)
-            {
-                _logger.error("error during failover", e);
-            }
-            finally
-            {
-                _conn.getProtocolHandler().getFailoverLatch().countDown();
-                _conn.getProtocolHandler().setFailoverLatch(null);
             }
         }
 
@@ -324,6 +326,18 @@ public class AMQConnectionDelegate_0_10 
 
     public <T, E extends Exception> T executeRetrySupport(FailoverProtectedOperation<T,E> operation) throws E
     {
+        if (_conn.isFailingOver())
+        {
+            try
+            {
+                _conn.blockUntilNotFailingOver();
+            }
+            catch (InterruptedException e)
+            {
+                //ignore
+            }
+        }
+
         try
         {
             return operation.execute();
@@ -352,7 +366,32 @@ public class AMQConnectionDelegate_0_10 
 
     public String getUUID()
     {
-        return (String)_qpidConnection.getServerProperties().get(UUID_NAME);
+        return (String)_qpidConnection.getServerProperties().get(ServerPropertyNames.FEDERATION_TAG);
+    }
+
+    /*
+     * @see org.apache.qpid.client.AMQConnectionDelegate#isSupportedServerFeature(java.lang.String)
+     */
+    public boolean isSupportedServerFeature(final String featureName)
+    {
+        if (featureName == null)
+        {
+            throw new IllegalArgumentException("featureName cannot be null");
+        }
+        final Map<String, Object> serverProperties = _qpidConnection.getServerProperties();
+        boolean featureSupported = false;
+        if (serverProperties != null && serverProperties.containsKey(ServerPropertyNames.QPID_FEATURES))
+        {
+            final Object supportServerFeatures = serverProperties.get(ServerPropertyNames.QPID_FEATURES);
+            featureSupported = supportServerFeatures instanceof List && ((List<String>)supportServerFeatures).contains(featureName);
+        }
+
+        if (_logger.isDebugEnabled())
+        {
+            _logger.debug("Server support for feature '" + featureName + "' : " + featureSupported);
+        }
+
+        return featureSupported;
     }
 
     private ConnectionSettings retriveConnectionSettings(BrokerDetails brokerDetail)

Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java?rev=1295495&r1=1295494&r2=1295495&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java Thu Mar  1 10:20:36 2012
@@ -24,7 +24,6 @@ import java.io.IOException;
 import java.net.ConnectException;
 import java.nio.channels.UnresolvedAddressException;
 import java.security.GeneralSecurityException;
-import java.security.Security;
 import java.text.MessageFormat;
 import java.util.ArrayList;
 import java.util.EnumSet;
@@ -44,6 +43,7 @@ import org.apache.qpid.client.protocol.A
 import org.apache.qpid.client.state.AMQState;
 import org.apache.qpid.client.state.AMQStateManager;
 import org.apache.qpid.client.state.StateWaiter;
+import org.apache.qpid.common.ServerPropertyNames;
 import org.apache.qpid.framing.BasicQosBody;
 import org.apache.qpid.framing.BasicQosOkBody;
 import org.apache.qpid.framing.ChannelOpenBody;
@@ -66,7 +66,7 @@ import org.slf4j.LoggerFactory;
 public class AMQConnectionDelegate_8_0 implements AMQConnectionDelegate
 {
     private static final Logger _logger = LoggerFactory.getLogger(AMQConnectionDelegate_8_0.class);
-    private AMQConnection _conn;
+    private final AMQConnection _conn;
 
 
     public void closeConnection(long timeout) throws JMSException, AMQException
@@ -379,4 +379,14 @@ public class AMQConnectionDelegate_8_0 i
     {
         return true;
     }
+
+    /*
+     * @see org.apache.qpid.client.AMQConnectionDelegate#isSupportedServerFeature(java.lang.String)
+     */
+    public boolean isSupportedServerFeature(String featureName)
+    {
+        // The Qpid Java Broker 0-8..0-9-1 does not advertise features by the qpid.features property, so for now
+        // we just hardcode JMS selectors as supported.
+        return ServerPropertyNames.FEATURE_QPID_JMS_SELECTOR.equals(featureName);
+    }
 }

Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java?rev=1295495&r1=1295494&r2=1295495&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java Thu Mar  1 10:20:36 2012
@@ -22,6 +22,7 @@ package org.apache.qpid.client;
 
 import java.net.URISyntaxException;
 import java.util.Map;
+import java.util.concurrent.atomic.AtomicLong;
 
 import javax.jms.Destination;
 import javax.naming.NamingException;
@@ -59,7 +60,7 @@ public abstract class AMQDestination imp
 
     private boolean _browseOnly;
     
-    private boolean _isAddressResolved;
+    private AtomicLong _addressResolved = new AtomicLong(0);
 
     private AMQShortString _queueName;
 
@@ -77,7 +78,7 @@ public abstract class AMQDestination imp
     public static final int QUEUE_TYPE = 1;
     public static final int TOPIC_TYPE = 2;
     public static final int UNKNOWN_TYPE = 3;
-    
+
     // ----- Fields required to support new address syntax -------
     
     public enum DestSyntax {        
@@ -740,12 +741,12 @@ public abstract class AMQDestination imp
     
     public boolean isAddressResolved()
     {
-        return _isAddressResolved;
+        return _addressResolved.get() > 0;
     }
 
-    public void setAddressResolved(boolean addressResolved)
+    public void setAddressResolved(long addressResolved)
     {
-        _isAddressResolved = addressResolved;
+        _addressResolved.set(addressResolved);
     }
     
     private static Address createAddressFromString(String str)
@@ -823,7 +824,7 @@ public abstract class AMQDestination imp
         dest.setTargetNode(_targetNode);
         dest.setSourceNode(_sourceNode);
         dest.setLink(_link);
-        dest.setAddressResolved(_isAddressResolved);
+        dest.setAddressResolved(_addressResolved.get());
         return dest;        
     }
     
@@ -836,4 +837,9 @@ public abstract class AMQDestination imp
     {
         _isDurable = b;
     }
+
+    public boolean isResolvedAfter(long time)
+    {
+        return _addressResolved.get() > time;
+    }
 }

Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java?rev=1295495&r1=1295494&r2=1295495&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java Thu Mar  1 10:20:36 2012
@@ -89,9 +89,9 @@ import org.apache.qpid.client.message.Un
 import org.apache.qpid.client.protocol.AMQProtocolHandler;
 import org.apache.qpid.client.util.FlowControllingBlockingQueue;
 import org.apache.qpid.common.AMQPFilterTypes;
+import org.apache.qpid.filter.MessageFilter;
 import org.apache.qpid.framing.AMQShortString;
 import org.apache.qpid.framing.FieldTable;
-import org.apache.qpid.framing.FieldTableFactory;
 import org.apache.qpid.framing.MethodRegistry;
 import org.apache.qpid.jms.Session;
 import org.apache.qpid.protocol.AMQConstant;
@@ -308,7 +308,7 @@ public abstract class AMQSession<C exten
     protected final FlowControllingBlockingQueue _queue;
 
     /** Holds the highest received delivery tag. */
-    private final AtomicLong _highestDeliveryTag = new AtomicLong(-1);
+    protected final AtomicLong _highestDeliveryTag = new AtomicLong(-1);
     private final AtomicLong _rollbackMark = new AtomicLong(-1);
     
     /** All the not yet acknowledged message tags */
@@ -534,7 +534,7 @@ public abstract class AMQSession<C exten
         {
             _queue = new FlowControllingBlockingQueue(_prefetchHighMark, null);
         }
-        
+
         // Add creation logging to tie in with the existing close logging
         if (_logger.isInfoEnabled())
         {
@@ -856,6 +856,10 @@ public abstract class AMQSession<C exten
         //Check that we are clean to commit.
         if (_failedOverDirty)
         {
+            if (_logger.isDebugEnabled())
+            {
+                _logger.debug("Session " + _channelId + " was dirty whilst failing over. Rolling back.");
+            }
             rollback();
 
             throw new TransactionRolledBackException("Connection failover has occured with uncommitted transaction activity." +
@@ -890,7 +894,7 @@ public abstract class AMQSession<C exten
         C consumer = _consumers.get(consumerTag);
         if (consumer != null)
         {
-            if (!consumer.isNoConsume())  // Normal Consumer
+            if (!consumer.isBrowseOnly())  // Normal Consumer
             {
                 // Clean the Maps up first
                 // Flush any pending messages for this consumerTag
@@ -1092,7 +1096,7 @@ public abstract class AMQSession<C exten
                     // possible to determine  when querying the broker whether there are no arguments or just a non-matching selector
                     // argument, as specifying null for the arguments when querying means they should not be checked at all
                     args.put(AMQPFilterTypes.JMS_SELECTOR.getValue().toString(), messageSelector == null ? "" : messageSelector);
-                    
+
                     // if the queue is bound to the exchange but NOT for this topic and selector, then the JMS spec
                     // says we must trash the subscription.
                     boolean isQueueBound = isQueueBound(dest.getExchangeName(), dest.getAMQQueueName());
@@ -1814,9 +1818,7 @@ public abstract class AMQSession<C exten
                     suspendChannel(true);
                 }
 
-                // Let the dispatcher know that all the incomming messages
-                // should be rolled back(reject/release)
-                _rollbackMark.set(_highestDeliveryTag.get());
+                setRollbackMark();
 
                 syncDispatchQueue();
 
@@ -2008,28 +2010,11 @@ public abstract class AMQSession<C exten
 
                         AMQDestination amqd = (AMQDestination) destination;
 
-                        // TODO: Define selectors in AMQP
-                        // TODO: construct the rawSelector from the selector string if rawSelector == null
-                        final FieldTable ft = FieldTableFactory.newFieldTable();
-                        // if (rawSelector != null)
-                        // ft.put("headers", rawSelector.getDataAsBytes());
-                        // rawSelector is used by HeadersExchange and is not a JMS Selector
-                        if (rawSelector != null)
-                        {
-                            ft.addAll(rawSelector);
-                        }
-
-                        // We must always send the selector argument even if empty, so that we can tell when a selector is removed from a 
-                        // durable topic subscription that the broker arguments don't match any more. This is because it is not otherwise
-                        // possible to determine  when querying the broker whether there are no arguments or just a non-matching selector
-                        // argument, as specifying null for the arguments when querying means they should not be checked at all
-                        ft.put(AMQPFilterTypes.JMS_SELECTOR.getValue(), messageSelector == null ? "" : messageSelector);
-
                         C consumer;
                         try
                         {
                             consumer = createMessageConsumer(amqd, prefetchHigh, prefetchLow,
-                                                             noLocal, exclusive, messageSelector, ft, noConsume, autoClose);
+                                                             noLocal, exclusive, messageSelector, rawSelector, noConsume, autoClose);
                         }
                         catch(TransportException e)
                         {
@@ -2570,7 +2555,7 @@ public abstract class AMQSession<C exten
      * @param queueName
      */
     private void consumeFromQueue(C consumer, AMQShortString queueName,
-                                  AMQProtocolHandler protocolHandler, boolean nowait, String messageSelector) throws AMQException, FailoverException
+                                  AMQProtocolHandler protocolHandler, boolean nowait, MessageFilter messageSelector) throws AMQException, FailoverException
     {
         int tagId = _nextTag++;
 
@@ -2598,7 +2583,7 @@ public abstract class AMQSession<C exten
     }
 
     public abstract void sendConsume(C consumer, AMQShortString queueName,
-                                     AMQProtocolHandler protocolHandler, boolean nowait, String messageSelector, int tag) throws AMQException, FailoverException;
+                                     AMQProtocolHandler protocolHandler, boolean nowait, MessageFilter messageSelector, int tag) throws AMQException, FailoverException;
 
     private P createProducerImpl(final Destination destination, final boolean mandatory, final boolean immediate)
             throws JMSException
@@ -2923,7 +2908,7 @@ public abstract class AMQSession<C exten
 
         try
         {
-            consumeFromQueue(consumer, queueName, protocolHandler, nowait, consumer._messageSelector);
+            consumeFromQueue(consumer, queueName, protocolHandler, nowait, consumer.getMessageSelectorFilter());
         }
         catch (FailoverException e)
         {
@@ -3202,13 +3187,13 @@ public abstract class AMQSession<C exten
                     setConnectionStopped(true);
                 }
 
-                _rollbackMark.set(_highestDeliveryTag.get());
+                setRollbackMark();
 
                 _dispatcherLogger.debug("Session Pre Dispatch Queue cleared");
 
                 for (C consumer : _consumers.values())
                 {
-                    if (!consumer.isNoConsume())
+                    if (!consumer.isBrowseOnly())
                     {
                         consumer.rollback();
                     }
@@ -3351,6 +3336,11 @@ public abstract class AMQSession<C exten
                 if (!(message instanceof CloseConsumerMessage)
                     && tagLE(deliveryTag, _rollbackMark.get()))
                 {
+                    if (_logger.isDebugEnabled())
+                    {
+                        _logger.debug("Rejecting message because delivery tag " + deliveryTag
+                                + " <= rollback mark " + _rollbackMark.get());
+                    }
                     rejectMessage(message, true);
                 }
                 else if (_usingDispatcherForCleanup)
@@ -3390,7 +3380,7 @@ public abstract class AMQSession<C exten
                     }
                     else
                     {
-                        if (consumer.isNoConsume())
+                        if (consumer.isBrowseOnly())
                         {
                             _dispatcherLogger.info("Received a message("
                                                    + System.identityHashCode(message) + ")" + "["
@@ -3412,6 +3402,11 @@ public abstract class AMQSession<C exten
                 // Don't reject if we're already closing
                 if (!_closed.get())
                 {
+                    if (_logger.isDebugEnabled())
+                    {
+                        _logger.debug("Rejecting message with delivery tag " + message.getDeliveryTag()
+                                + " for closing consumer " + String.valueOf(consumer == null? null: consumer._consumerTag));
+                    }
                     rejectMessage(message, true);
                 }
             }
@@ -3542,4 +3537,15 @@ public abstract class AMQSession<C exten
     {
         return ((destination instanceof AMQDestination)  && ((AMQDestination)destination).isBrowseOnly());
     }
+
+    private void setRollbackMark()
+    {
+        // Let the dispatcher know that all the incoming messages
+        // should be rolled back (reject/release)
+        _rollbackMark.set(_highestDeliveryTag.get());
+        if (_logger.isDebugEnabled())
+        {
+            _logger.debug("Rollback mark is set to " + _rollbackMark.get());
+        }
+    }
 }

Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java?rev=1295495&r1=1295494&r2=1295495&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java Thu Mar  1 10:20:36 2012
@@ -53,6 +53,7 @@ import org.apache.qpid.client.messaging.
 import org.apache.qpid.client.messaging.address.Node.QueueNode;
 import org.apache.qpid.client.protocol.AMQProtocolHandler;
 import org.apache.qpid.exchange.ExchangeDefaults;
+import org.apache.qpid.filter.MessageFilter;
 import org.apache.qpid.framing.AMQShortString;
 import org.apache.qpid.framing.FieldTable;
 import org.apache.qpid.protocol.AMQConstant;
@@ -74,6 +75,7 @@ import org.apache.qpid.transport.Session
 import org.apache.qpid.transport.SessionListener;
 import org.apache.qpid.transport.TransportException;
 import org.apache.qpid.util.Serial;
+import org.apache.qpid.util.Strings;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -294,23 +296,34 @@ public class AMQSession_0_10 extends AMQ
         }
     }
 
-    void messageAcknowledge(RangeSet ranges, boolean accept)
+    void messageAcknowledge(final RangeSet ranges, final boolean accept)
     {
         messageAcknowledge(ranges,accept,false);
     }
     
-    void messageAcknowledge(RangeSet ranges, boolean accept,boolean setSyncBit)
+    void messageAcknowledge(final RangeSet ranges, final boolean accept, final boolean setSyncBit)
     {
-        Session ssn = getQpidSession();
-        for (Range range : ranges)
+        final Session ssn = getQpidSession();
+        flushProcessed(ranges,accept);
+        if (accept)
         {
-            ssn.processed(range);
+            ssn.messageAccept(ranges, UNRELIABLE, setSyncBit ? SYNC : NONE);
         }
-        ssn.flushProcessed(accept ? BATCH : NONE);
-        if (accept)
+    }
+
+    /**
+     * Flush any outstanding commands. This causes session complete to be sent.
+     * @param ranges the range of command ids.
+     * @param batch true if batched.
+     */
+    void flushProcessed(final RangeSet ranges, final boolean batch)
+    {
+        final Session ssn = getQpidSession();
+        for (final Range range : ranges)
         {
-            ssn.messageAccept(ranges, UNRELIABLE,setSyncBit? SYNC : NONE);
+            ssn.processed(range);
         }
+        ssn.flushProcessed(batch ? BATCH : NONE);
     }
 
     /**
@@ -364,7 +377,7 @@ public class AMQSession_0_10 extends AMQ
                 _logger.debug("Binding queue : " + queue + 
                               " exchange: " + exchange + 
                               " using binding key " + binding.getBindingKey() + 
-                              " with args " + printMap(binding.getArgs()));
+                              " with args " + Strings.printMap(binding.getArgs()));
                 getQpidSession().exchangeBind(queue, 
                                               exchange,
                                               binding.getBindingKey(),
@@ -496,13 +509,13 @@ public class AMQSession_0_10 extends AMQ
     public BasicMessageConsumer_0_10 createMessageConsumer(final AMQDestination destination, final int prefetchHigh,
                                                       final int prefetchLow, final boolean noLocal,
                                                       final boolean exclusive, String messageSelector,
-                                                      final FieldTable ft, final boolean noConsume,
+                                                      final FieldTable rawSelector, final boolean noConsume,
                                                       final boolean autoClose) throws JMSException
     {
 
         final AMQProtocolHandler protocolHandler = getProtocolHandler();
         return new BasicMessageConsumer_0_10(_channelId, _connection, destination, messageSelector, noLocal,
-                                             _messageFactoryRegistry, this, protocolHandler, ft, prefetchHigh,
+                                             _messageFactoryRegistry, this, protocolHandler, rawSelector, prefetchHigh,
                                              prefetchLow, exclusive, _acknowledgeMode, noConsume, autoClose);
     }
 
@@ -568,56 +581,30 @@ public class AMQSession_0_10 extends AMQ
      * Registers the consumer with the broker
      */
     public void sendConsume(BasicMessageConsumer_0_10 consumer, AMQShortString queueName, AMQProtocolHandler protocolHandler,
-                            boolean nowait, String messageSelector, int tag)
+                            boolean nowait, MessageFilter messageSelector, int tag)
             throws AMQException, FailoverException
     {        
-        boolean preAcquire;
-        
-        long capacity = getCapacity(consumer.getDestination());
-        
-        try
-        {
-            boolean isTopic;
-            Map<String, Object> arguments = FieldTable.convertToMap(consumer.getArguments());
-            
-            if (consumer.getDestination().getDestSyntax() == AMQDestination.DestSyntax.BURL)
-            {
-                isTopic = consumer.getDestination() instanceof AMQTopic ||
-                          consumer.getDestination().getExchangeClass().equals(ExchangeDefaults.TOPIC_EXCHANGE_CLASS) ;
-                
-                preAcquire = isTopic || (!consumer.isNoConsume()  && 
-                        (consumer.getMessageSelector() == null || consumer.getMessageSelector().equals("")));
-            }
-            else
-            {
-                isTopic = consumer.getDestination().getAddressType() == AMQDestination.TOPIC_TYPE;
-                
-                preAcquire = !consumer.isNoConsume() && 
-                             (isTopic || consumer.getMessageSelector() == null || 
-                              consumer.getMessageSelector().equals(""));
-                
-                arguments.putAll(
-                        (Map<? extends String, ? extends Object>) consumer.getDestination().getLink().getSubscription().getArgs());
-            }
-            
-            boolean acceptModeNone = getAcknowledgeMode() == NO_ACKNOWLEDGE;
-            
-            if (consumer.getDestination().getLink() != null)
-            {
-                acceptModeNone = consumer.getDestination().getLink().getReliability() == Link.Reliability.UNRELIABLE;
-            }
-            
-            getQpidSession().messageSubscribe
-                (queueName.toString(), String.valueOf(tag),
-                 acceptModeNone ? MessageAcceptMode.NONE : MessageAcceptMode.EXPLICIT,
-                 preAcquire ? MessageAcquireMode.PRE_ACQUIRED : MessageAcquireMode.NOT_ACQUIRED, null, 0, arguments,
-                 consumer.isExclusive() ? Option.EXCLUSIVE : Option.NONE);
-        }
-        catch (JMSException e)
+        boolean preAcquire = consumer.isPreAcquire();
+
+        AMQDestination destination = consumer.getDestination();
+        long capacity = consumer.getCapacity();
+
+        Map<String, Object> arguments = FieldTable.convertToMap(consumer.getArguments());
+
+        Link link = destination.getLink();
+        if (link != null && link.getSubscription() != null && link.getSubscription().getArgs() != null)
         {
-            throw new AMQException(AMQConstant.INTERNAL_ERROR, "problem when registering consumer", e);
+            arguments.putAll((Map<? extends String, ? extends Object>) link.getSubscription().getArgs());
         }
 
+        boolean acceptModeNone = getAcknowledgeMode() == NO_ACKNOWLEDGE;
+
+        getQpidSession().messageSubscribe
+            (queueName.toString(), String.valueOf(tag),
+             acceptModeNone ? MessageAcceptMode.NONE : MessageAcceptMode.EXPLICIT,
+             preAcquire ? MessageAcquireMode.PRE_ACQUIRED : MessageAcquireMode.NOT_ACQUIRED, null, 0, arguments,
+             consumer.isExclusive() ? Option.EXCLUSIVE : Option.NONE);
+
         String consumerTag = ((BasicMessageConsumer_0_10)consumer).getConsumerTagString();
 
         if (capacity == 0)
@@ -646,21 +633,6 @@ public class AMQSession_0_10 extends AMQ
         }
     }
 
-    private long getCapacity(AMQDestination destination)
-    {
-        long capacity = 0;
-        if (destination.getDestSyntax() == DestSyntax.ADDR && 
-                destination.getLink().getConsumerCapacity() > 0)
-        {
-            capacity = destination.getLink().getConsumerCapacity();
-        }
-        else if (prefetch())
-        {
-            capacity = getAMQConnection().getMaxPrefetch();
-        }
-        return capacity;
-    }
-
     /**
      * Create an 0_10 message producer
      */
@@ -825,7 +797,7 @@ public class AMQSession_0_10 extends AMQ
                 //only set if msg list is null
                 try
                 {
-                    long capacity = getCapacity(consumer.getDestination());
+                    long capacity = consumer.getCapacity();
                     
                     if (capacity == 0)
                     {
@@ -969,17 +941,23 @@ public class AMQSession_0_10 extends AMQ
 
     /**
      * Store non committed messages for this session
-     * With 0.10 messages are consumed with window mode, we must send a completion
-     * before the window size is reached so credits don't dry up.
      * @param id
      */
     @Override protected void addDeliveredMessage(long id)
     {
         _txRangeSet.add((int) id);
         _txSize++;
+    }
+
+    /**
+     * With 0.10 messages are consumed with window mode, we must send a completion
+     * before the window size is reached so credits don't dry up.
+     */
+    protected void sendTxCompletionsIfNecessary()
+    {
         // this is a heuristic, we may want to have that configurable
-        if (_connection.getMaxPrefetch() == 1 ||
-                _connection.getMaxPrefetch() != 0 && _txSize % (_connection.getMaxPrefetch() / 2) == 0)
+        if (_txSize > 0 && (_connection.getMaxPrefetch() == 1 ||
+                _connection.getMaxPrefetch() != 0 && _txSize % (_connection.getMaxPrefetch() / 2) == 0))
         {
             // send completed so consumer credits don't dry up
             messageAcknowledge(_txRangeSet, false);
@@ -1168,8 +1146,8 @@ public class AMQSession_0_10 extends AMQ
                                               boolean isConsumer,
                                               boolean noWait) throws AMQException
     {
-        if (dest.isAddressResolved())
-        {           
+        if (dest.isAddressResolved() && dest.isResolvedAfter(_connection.getLastFailoverTime()))
+        {
             if (isConsumer && AMQDestination.TOPIC_TYPE == dest.getAddressType()) 
             {
                 createSubscriptionQueue(dest);
@@ -1189,22 +1167,6 @@ public class AMQSession_0_10 extends AMQ
             
             int type = resolveAddressType(dest);
             
-            if (type == AMQDestination.QUEUE_TYPE && 
-                    dest.getLink().getReliability() == Reliability.UNSPECIFIED)
-            {
-                dest.getLink().setReliability(Reliability.AT_LEAST_ONCE);
-            }
-            else if (type == AMQDestination.TOPIC_TYPE && 
-                    dest.getLink().getReliability() == Reliability.UNSPECIFIED)
-            {
-                dest.getLink().setReliability(Reliability.UNRELIABLE);
-            }
-            else if (type == AMQDestination.TOPIC_TYPE && 
-                    dest.getLink().getReliability() == Reliability.AT_LEAST_ONCE)
-            {
-                throw new AMQException("AT-LEAST-ONCE is not yet supported for Topics");                      
-            }
-            
             switch (type)
             {
                 case AMQDestination.QUEUE_TYPE: 
@@ -1258,7 +1220,7 @@ public class AMQSession_0_10 extends AMQ
                             "The name '" + dest.getAddressName() +
                             "' supplied in the address doesn't resolve to an exchange or a queue");
             }
-            dest.setAddressResolved(true);
+            dest.setAddressResolved(System.currentTimeMillis());
         }
     }
     
@@ -1352,22 +1314,6 @@ public class AMQSession_0_10 extends AMQ
         dest.setRoutingKey(new AMQShortString(dest.getSubject()));
     }
     
-    /** This should be moved to a suitable utility class */
-    private String printMap(Map<String,Object> map)
-    {
-        StringBuilder sb = new StringBuilder();
-        sb.append("<");
-        if (map != null)
-        {
-            for(String key : map.keySet())
-            {
-                sb.append(key).append(" = ").append(map.get(key)).append(" ");
-            }
-        }
-        sb.append(">");
-        return sb.toString();
-    }
-
     protected void acknowledgeImpl()
     {
         RangeSet range = gatherUnackedRangeSet();
@@ -1378,4 +1324,15 @@ public class AMQSession_0_10 extends AMQ
             getQpidSession().sync();
         }
     }
+
+    @Override
+    void resubscribe() throws AMQException
+    {
+        // Also reset the delivery tag tracker, to ensure we don't
+        // return the first <total number of msgs received on session>
+        // messages sent by the brokers following the first rollback
+        // after failover
+        _highestDeliveryTag.set(-1);
+        super.resubscribe();
+    }
 }

Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java?rev=1295495&r1=1295494&r2=1295495&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java Thu Mar  1 10:20:36 2012
@@ -41,6 +41,7 @@ import org.apache.qpid.client.state.AMQS
 import org.apache.qpid.client.state.AMQStateManager;
 import org.apache.qpid.client.state.listener.SpecificMethodFrameListener;
 import org.apache.qpid.common.AMQPFilterTypes;
+import org.apache.qpid.filter.MessageFilter;
 import org.apache.qpid.framing.AMQFrame;
 import org.apache.qpid.framing.AMQMethodBody;
 import org.apache.qpid.framing.AMQShortString;
@@ -333,24 +334,9 @@ public final class AMQSession_0_8 extend
                                       AMQShortString queueName,
                                       AMQProtocolHandler protocolHandler,
                                       boolean nowait,
-                                      String messageSelector,
+                                      MessageFilter messageSelector,
                                       int tag) throws AMQException, FailoverException
     {
-        FieldTable arguments = FieldTableFactory.newFieldTable();
-        if ((messageSelector != null) && !messageSelector.equals(""))
-        {
-            arguments.put(AMQPFilterTypes.JMS_SELECTOR.getValue(), messageSelector);
-        }
-
-        if (consumer.isAutoClose())
-        {
-            arguments.put(AMQPFilterTypes.AUTO_CLOSE.getValue(), Boolean.TRUE);
-        }
-
-        if (consumer.isNoConsume())
-        {
-            arguments.put(AMQPFilterTypes.NO_CONSUME.getValue(), Boolean.TRUE);
-        }
 
         BasicConsumeBody body = getMethodRegistry().createBasicConsumeBody(getTicket(),
                                                                            queueName,
@@ -359,7 +345,7 @@ public final class AMQSession_0_8 extend
                                                                            consumer.getAcknowledgeMode() == Session.NO_ACKNOWLEDGE,
                                                                            consumer.isExclusive(),
                                                                            nowait,
-                                                                           arguments);
+                                                                           consumer.getArguments());
 
 
         AMQFrame jmsConsume = body.generateFrame(_channelId);

Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java?rev=1295495&r1=1295494&r2=1295495&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java Thu Mar  1 10:20:36 2012
@@ -20,10 +20,14 @@
  */
 package org.apache.qpid.client;
 
+import org.apache.qpid.AMQInternalException;
+import org.apache.qpid.filter.JMSSelectorFilter;
+import org.apache.qpid.filter.MessageFilter;
 import org.apache.qpid.AMQException;
 import org.apache.qpid.client.failover.FailoverException;
 import org.apache.qpid.client.message.*;
 import org.apache.qpid.client.protocol.AMQProtocolHandler;
+import org.apache.qpid.common.AMQPFilterTypes;
 import org.apache.qpid.framing.*;
 import org.apache.qpid.jms.MessageConsumer;
 import org.apache.qpid.jms.Session;
@@ -31,6 +35,7 @@ import org.apache.qpid.transport.Transpo
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.jms.InvalidSelectorException;
 import javax.jms.JMSException;
 import javax.jms.Message;
 import javax.jms.MessageListener;
@@ -52,7 +57,7 @@ public abstract class BasicMessageConsum
     /** The connection being used by this consumer */
     protected final AMQConnection _connection;
 
-    protected final String _messageSelector;
+    protected final MessageFilter _messageSelectorFilter;
 
     private final boolean _noLocal;
 
@@ -138,7 +143,7 @@ public abstract class BasicMessageConsum
      */
     private final boolean _autoClose;
 
-    private final boolean _noConsume;
+    private final boolean _browseOnly;
     private List<StackTraceElement> _closedStack = null;
 
 
@@ -146,28 +151,44 @@ public abstract class BasicMessageConsum
     protected BasicMessageConsumer(int channelId, AMQConnection connection, AMQDestination destination,
                                    String messageSelector, boolean noLocal, MessageFactoryRegistry messageFactory,
                                    AMQSession session, AMQProtocolHandler protocolHandler,
-                                   FieldTable arguments, int prefetchHigh, int prefetchLow,
-                                   boolean exclusive, int acknowledgeMode, boolean noConsume, boolean autoClose)
+                                   FieldTable rawSelector, int prefetchHigh, int prefetchLow,
+                                   boolean exclusive, int acknowledgeMode, boolean browseOnly, boolean autoClose) throws JMSException
     {
         _channelId = channelId;
         _connection = connection;
-        _messageSelector = messageSelector;
         _noLocal = noLocal;
         _destination = destination;
         _messageFactory = messageFactory;
         _session = session;
         _protocolHandler = protocolHandler;
-        _arguments = arguments;
         _prefetchHigh = prefetchHigh;
         _prefetchLow = prefetchLow;
         _exclusive = exclusive;
         
         _synchronousQueue = new LinkedBlockingQueue();
         _autoClose = autoClose;
-        _noConsume = noConsume;
+        _browseOnly = browseOnly;
+
+        try
+        {
+            if (messageSelector == null || "".equals(messageSelector.trim()))
+            {
+                _messageSelectorFilter = null;
+            }
+            else
+            {
+                _messageSelectorFilter = new JMSSelectorFilter(messageSelector);
+            }
+        }
+        catch (final AMQInternalException ie)
+        {
+            InvalidSelectorException ise = new InvalidSelectorException("cannot create consumer because of selector issue");
+            ise.setLinkedException(ie);
+            throw ise;
+        }
 
         // Force queue browsers not to use acknowledge modes.
-        if (_noConsume)
+        if (_browseOnly)
         {
             _acknowledgeMode = Session.NO_ACKNOWLEDGE;
         }
@@ -175,6 +196,21 @@ public abstract class BasicMessageConsum
         {
             _acknowledgeMode = acknowledgeMode;
         }
+
+        final FieldTable ft = FieldTableFactory.newFieldTable();
+        // rawSelector is used by HeadersExchange and is not a JMS Selector
+        if (rawSelector != null)
+        {
+            ft.addAll(rawSelector);
+        }
+
+        // We must always send the selector argument even if empty, so that we can tell when a selector is removed from a
+        // durable topic subscription that the broker arguments don't match any more. This is because it is not otherwise
+        // possible to determine when querying the broker whether there are no arguments or just a non-matching selector
+        // argument, as specifying null for the arguments when querying means they should not be checked at all
+        ft.put(AMQPFilterTypes.JMS_SELECTOR.getValue(), messageSelector == null ? "" : messageSelector);
+
+        _arguments = ft;
     }
 
     public AMQDestination getDestination()
@@ -186,7 +222,7 @@ public abstract class BasicMessageConsum
     {
         checkPreConditions();
 
-        return _messageSelector;
+        return _messageSelectorFilter == null ? null :_messageSelectorFilter.getSelector();
     }
 
     public MessageListener getMessageListener() throws JMSException
@@ -345,6 +381,11 @@ public abstract class BasicMessageConsum
         return _receiving.get();
     }
 
+    public MessageFilter getMessageSelectorFilter()
+    {
+        return _messageSelectorFilter;
+    }
+
     public Message receive() throws JMSException
     {
         return receive(0);
@@ -874,9 +915,9 @@ public abstract class BasicMessageConsum
         return _autoClose;
     }
 
-    public boolean isNoConsume()
+    public boolean isBrowseOnly()
     {
-        return _noConsume;
+        return _browseOnly;
     }
 
     public void rollback()

Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java?rev=1295495&r1=1295494&r2=1295495&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java Thu Mar  1 10:20:36 2012
@@ -20,22 +20,20 @@ package org.apache.qpid.client;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.qpid.client.AMQDestination.AddressOption;
-import org.apache.qpid.client.AMQDestination.DestSyntax;
 import org.apache.qpid.client.failover.FailoverException;
 import org.apache.qpid.client.message.*;
 import org.apache.qpid.client.protocol.AMQProtocolHandler;
+import org.apache.qpid.common.ServerPropertyNames;
 import org.apache.qpid.framing.FieldTable;
 import org.apache.qpid.AMQException;
-import org.apache.qpid.AMQInternalException;
 import org.apache.qpid.protocol.AMQConstant;
 import org.apache.qpid.transport.*;
-import org.apache.qpid.filter.MessageFilter;
-import org.apache.qpid.filter.JMSSelectorFilter;
+import org.apache.qpid.jms.Session;
 
-import javax.jms.InvalidSelectorException;
 import javax.jms.JMSException;
 import javax.jms.Message;
 import javax.jms.MessageListener;
+
 import java.util.Iterator;
 import java.util.concurrent.atomic.AtomicBoolean;
 
@@ -51,11 +49,6 @@ public class BasicMessageConsumer_0_10 e
     protected final Logger _logger = LoggerFactory.getLogger(getClass());
 
     /**
-     * The message selector filter associated with this consumer message selector
-     */
-    private MessageFilter _filter = null;
-
-    /**
      * The underlying QpidSession
      */
     private AMQSession_0_10 _0_10session;
@@ -63,7 +56,7 @@ public class BasicMessageConsumer_0_10 e
     /**
      * Indicates whether this consumer receives pre-acquired messages
      */
-    private boolean _preAcquire = true;
+    private final boolean _preAcquire;
 
     /**
      * Specify whether this consumer is performing a sync receive
@@ -71,44 +64,27 @@ public class BasicMessageConsumer_0_10 e
     private final AtomicBoolean _syncReceive = new AtomicBoolean(false);
     private String _consumerTagString;
     
-    private long capacity = 0;
+    private final long _capacity;
+
+    /** Flag indicating if the server supports message selectors */
+    protected final boolean _serverJmsSelectorSupport;
 
     protected BasicMessageConsumer_0_10(int channelId, AMQConnection connection, AMQDestination destination,
                                         String messageSelector, boolean noLocal, MessageFactoryRegistry messageFactory,
-                                        AMQSession session, AMQProtocolHandler protocolHandler,
-                                        FieldTable arguments, int prefetchHigh, int prefetchLow,
-                                        boolean exclusive, int acknowledgeMode, boolean noConsume, boolean autoClose)
+                                        AMQSession<?,?> session, AMQProtocolHandler protocolHandler,
+                                        FieldTable rawSelector, int prefetchHigh, int prefetchLow,
+                                        boolean exclusive, int acknowledgeMode, boolean browseOnly, boolean autoClose)
             throws JMSException
     {
         super(channelId, connection, destination, messageSelector, noLocal, messageFactory, session, protocolHandler,
-                arguments, prefetchHigh, prefetchLow, exclusive, acknowledgeMode, noConsume, autoClose);
+                rawSelector, prefetchHigh, prefetchLow, exclusive, acknowledgeMode, browseOnly, autoClose);
         _0_10session = (AMQSession_0_10) session;
-        if (messageSelector != null && !messageSelector.equals(""))
-        {
-            try
-            {
-                _filter = new JMSSelectorFilter(messageSelector);
-            }
-            catch (AMQInternalException e)
-            {
-                throw new InvalidSelectorException("cannot create consumer because of selector issue");
-            }
-            if (destination instanceof AMQQueue)
-            {
-                _preAcquire = false;
-            }
-        }
-        
-        // Destination setting overrides connection defaults
-        if (destination.getDestSyntax() == DestSyntax.ADDR && 
-                destination.getLink().getConsumerCapacity() > 0)
-        {
-            capacity = destination.getLink().getConsumerCapacity();
-        }
-        else if (getSession().prefetch())
-        {
-            capacity = _0_10session.getAMQConnection().getMaxPrefetch();
-        }
+
+        _preAcquire = evaluatePreAcquire(browseOnly, destination);
+
+        _capacity = evaluateCapacity(destination);
+        _serverJmsSelectorSupport = connection.isSupportedServerFeature(ServerPropertyNames.FEATURE_QPID_JMS_SELECTOR);
+
 
         if (destination.isAddressResolved() && AMQDestination.TOPIC_TYPE == destination.getAddressType()) 
         {            
@@ -122,7 +98,6 @@ public class BasicMessageConsumer_0_10 e
         }
     }
 
-
     @Override public void setConsumerTag(int consumerTag)
     {
         super.setConsumerTag(consumerTag);
@@ -148,15 +123,22 @@ public class BasicMessageConsumer_0_10 e
         {
             if (checkPreConditions(jmsMessage))
             {
-                if (isMessageListenerSet() && capacity == 0)
+                if (isMessageListenerSet() && _capacity == 0)
                 {
-                    _0_10session.getQpidSession().messageFlow(getConsumerTagString(),
-                                                              MessageCreditUnit.MESSAGE, 1,
-                                                              Option.UNRELIABLE);
+                    messageFlow();
                 }
                 _logger.debug("messageOk, trying to notify");
                 super.notifyMessage(jmsMessage);
             }
+            else
+            {
+                // if we are synchronously waiting for a message
+                // and messages are not pre-fetched we then need to request another one
+                if(_capacity == 0)
+                {
+                   messageFlow();
+                }
+            }
         }
         catch (AMQException e)
         {
@@ -227,12 +209,11 @@ public class BasicMessageConsumer_0_10 e
     private boolean checkPreConditions(AbstractJMSMessage message) throws AMQException
     {
         boolean messageOk = true;
-        // TODO Use a tag for fiding out if message filtering is done here or by the broker.
         try
         {
-            if (_messageSelector != null && !_messageSelector.equals(""))
+            if (_messageSelectorFilter != null && !_serverJmsSelectorSupport)
             {
-                messageOk = _filter.matches(message);
+                messageOk = _messageSelectorFilter.matches(message);
             }
         }
         catch (Exception e)
@@ -245,6 +226,7 @@ public class BasicMessageConsumer_0_10 e
             _logger.debug("messageOk " + messageOk);
             _logger.debug("_preAcquire " + _preAcquire);
         }
+
         if (!messageOk)
         {
             if (_preAcquire)
@@ -261,23 +243,15 @@ public class BasicMessageConsumer_0_10 e
             {
                 if (_logger.isDebugEnabled())
                 {
-                    _logger.debug("Message not OK, releasing");
+                    _logger.debug("filterMessage - not ack'ing message as not acquired");
                 }
-                releaseMessage(message);
-            }
-            // if we are syncrhonously waiting for a message
-            // and messages are not prefetched we then need to request another one
-            if(capacity == 0)
-            {
-               _0_10session.getQpidSession().messageFlow(getConsumerTagString(),
-                                                         MessageCreditUnit.MESSAGE, 1,
-                                                         Option.UNRELIABLE);
+                flushUnwantedMessage(message);
             }
         }
-        // now we need to acquire this message if needed
-        // this is the case of queue with a message selector set
-        if (!_preAcquire && messageOk && !isNoConsume())
+        else if (!_preAcquire && !isBrowseOnly())
         {
+            // now we need to acquire this message if needed
+            // this is the case of queue with a message selector set
             if (_logger.isDebugEnabled())
             {
                 _logger.debug("filterMessage - trying to acquire message");
@@ -285,6 +259,7 @@ public class BasicMessageConsumer_0_10 e
             messageOk = acquireMessage(message);
             _logger.debug("filterMessage - message acquire status : " + messageOk);
         }
+
         return messageOk;
     }
 
@@ -295,38 +270,38 @@ public class BasicMessageConsumer_0_10 e
      * @param message The message to be acknowledged
      * @throws AMQException If the message cannot be acquired due to some internal error.
      */
-    private void acknowledgeMessage(AbstractJMSMessage message) throws AMQException
+    private void acknowledgeMessage(final AbstractJMSMessage message) throws AMQException
     {
-        if (!_preAcquire)
-        {
-            RangeSet ranges = new RangeSet();
-            ranges.add((int) message.getDeliveryTag());
-            _0_10session.messageAcknowledge
-                (ranges,
-                 _acknowledgeMode != org.apache.qpid.jms.Session.NO_ACKNOWLEDGE);
+        final RangeSet ranges = new RangeSet();
+        ranges.add((int) message.getDeliveryTag());
+        _0_10session.messageAcknowledge
+            (ranges,
+             _acknowledgeMode != org.apache.qpid.jms.Session.NO_ACKNOWLEDGE);
 
-            AMQException amqe = _0_10session.getCurrentException();
-            if (amqe != null)
-            {
-                throw amqe;
-            }
+        final AMQException amqe = _0_10session.getCurrentException();
+        if (amqe != null)
+        {
+            throw amqe;
         }
     }
 
     /**
-     * Release a message
+     * Flush an unwanted message. For 0-10 we need to ensure that all messages are indicated as
+     * processed to ensure their AMQP command-id is marked completed.
      *
-     * @param message The message to be released
-     * @throws AMQException If the message cannot be released due to some internal error.
+     * @param message The unwanted message to be flushed
+     * @throws AMQException If the unwanted message cannot be flushed due to some internal error.
      */
-    private void releaseMessage(AbstractJMSMessage message) throws AMQException
+    private void flushUnwantedMessage(final AbstractJMSMessage message) throws AMQException
     {
-        if (_preAcquire)
+        final RangeSet ranges = new RangeSet();
+        ranges.add((int) message.getDeliveryTag());
+        _0_10session.flushProcessed(ranges,false);
+
+        final AMQException amqe = _0_10session.getCurrentException();
+        if (amqe != null)
         {
-            RangeSet ranges = new RangeSet();
-            ranges.add((int) message.getDeliveryTag());
-            _0_10session.getQpidSession().messageRelease(ranges);
-            _0_10session.sync();
+            throw amqe;
         }
     }
 
@@ -337,36 +312,37 @@ public class BasicMessageConsumer_0_10 e
      * @return true if the message has been acquired, false otherwise.
      * @throws AMQException If the message cannot be acquired due to some internal error.
      */
-    private boolean acquireMessage(AbstractJMSMessage message) throws AMQException
+    private boolean acquireMessage(final AbstractJMSMessage message) throws AMQException
     {
         boolean result = false;
-        if (!_preAcquire)
-        {
-            RangeSet ranges = new RangeSet();
-            ranges.add((int) message.getDeliveryTag());
+        final RangeSet ranges = new RangeSet();
+        ranges.add((int) message.getDeliveryTag());
 
-            Acquired acq = _0_10session.getQpidSession().messageAcquire(ranges).get();
+        final Acquired acq = _0_10session.getQpidSession().messageAcquire(ranges).get();
 
-            RangeSet acquired = acq.getTransfers();
-            if (acquired != null && acquired.size() > 0)
-            {
-                result = true;
-            }
+        final RangeSet acquired = acq.getTransfers();
+        if (acquired != null && acquired.size() > 0)
+        {
+            result = true;
         }
         return result;
     }
 
+    private void messageFlow()
+    {
+        _0_10session.getQpidSession().messageFlow(getConsumerTagString(),
+                                                  MessageCreditUnit.MESSAGE, 1,
+                                                  Option.UNRELIABLE);
+    }
 
     public void setMessageListener(final MessageListener messageListener) throws JMSException
     {
         super.setMessageListener(messageListener);
         try
         {
-            if (messageListener != null && capacity == 0)
+            if (messageListener != null && _capacity == 0)
             {
-                _0_10session.getQpidSession().messageFlow(getConsumerTagString(),
-                                                          MessageCreditUnit.MESSAGE, 1,
-                                                          Option.UNRELIABLE);
+                messageFlow();
             }
             if (messageListener != null && !_synchronousQueue.isEmpty())
             {
@@ -389,9 +365,7 @@ public class BasicMessageConsumer_0_10 e
     {
         if (_0_10session.isStarted() && _syncReceive.get())
         {
-            _0_10session.getQpidSession().messageFlow
-                (getConsumerTagString(), MessageCreditUnit.MESSAGE, 1,
-                 Option.UNRELIABLE);
+            messageFlow();
         }
     }
 
@@ -406,15 +380,13 @@ public class BasicMessageConsumer_0_10 e
      */
     public Object getMessageFromQueue(long l) throws InterruptedException
     {
-        if (capacity == 0)
+        if (_capacity == 0)
         {
             _syncReceive.set(true);
         }
-        if (_0_10session.isStarted() && capacity == 0 && _synchronousQueue.isEmpty())
+        if (_0_10session.isStarted() && _capacity == 0 && _synchronousQueue.isEmpty())
         {
-            _0_10session.getQpidSession().messageFlow(getConsumerTagString(),
-                                                      MessageCreditUnit.MESSAGE, 1,
-                                                      Option.UNRELIABLE);
+            messageFlow();
         }
         Object o = super.getMessageFromQueue(l);
         if (o == null && _0_10session.isStarted())
@@ -427,18 +399,18 @@ public class BasicMessageConsumer_0_10 e
                 (getConsumerTagString(), MessageCreditUnit.BYTE,
                  0xFFFFFFFF, Option.UNRELIABLE);
             
-            if (capacity > 0)
+            if (_capacity > 0)
             {
                 _0_10session.getQpidSession().messageFlow
                                                (getConsumerTagString(),
                                                 MessageCreditUnit.MESSAGE,
-                                                capacity,
+                                                _capacity,
                                                 Option.UNRELIABLE);
             }
             _0_10session.syncDispatchQueue();
             o = super.getMessageFromQueue(-1);
         }
-        if (capacity == 0)
+        if (_capacity == 0)
         {
             _syncReceive.set(false);
         }
@@ -448,16 +420,26 @@ public class BasicMessageConsumer_0_10 e
     void postDeliver(AbstractJMSMessage msg)
     {
         super.postDeliver(msg);
-        if (_acknowledgeMode == org.apache.qpid.jms.Session.NO_ACKNOWLEDGE && !_session.isInRecovery())
+
+        switch (_acknowledgeMode)
         {
-          _session.acknowledgeMessage(msg.getDeliveryTag(), false);
+            case Session.SESSION_TRANSACTED:
+                _0_10session.sendTxCompletionsIfNecessary();
+                break;
+            case Session.NO_ACKNOWLEDGE:
+                if (!_session.isInRecovery())
+                {
+                  _session.acknowledgeMessage(msg.getDeliveryTag(), false);
+                }
+                break;
+            case Session.AUTO_ACKNOWLEDGE:
+                if (!_session.isInRecovery() && _session.getAMQConnection().getSyncAck())
+                {
+                    ((AMQSession_0_10) getSession()).getQpidSession().sync();
+                }
+                break;
         }
         
-        if (_acknowledgeMode == org.apache.qpid.jms.Session.AUTO_ACKNOWLEDGE  &&
-             !_session.isInRecovery() && _session.getAMQConnection().getSyncAck())
-        {
-            ((AMQSession_0_10) getSession()).getQpidSession().sync();
-        }
     }
 
     Message receiveBrowse() throws JMSException
@@ -526,4 +508,51 @@ public class BasicMessageConsumer_0_10 e
             }
         }
     }
+
+    long getCapacity()
+    {
+        return _capacity;
+    }
+
+    boolean isPreAcquire()
+    {
+        return _preAcquire;
+    }
+
+    private boolean evaluatePreAcquire(boolean browseOnly, AMQDestination destination)
+    {
+        boolean preAcquire;
+        if (browseOnly)
+        {
+            preAcquire = false;
+        }
+        else
+        {
+            boolean isQueue = (destination instanceof AMQQueue || getDestination().getAddressType() == AMQDestination.QUEUE_TYPE);
+            if (isQueue && getMessageSelectorFilter() != null)
+            {
+                preAcquire = false;
+            }
+            else
+            {
+                preAcquire = true;
+            }
+        }
+        return preAcquire;
+    }
+
+    private long evaluateCapacity(AMQDestination destination)
+    {
+        long capacity = 0;
+        if (destination.getLink() != null && destination.getLink().getConsumerCapacity() > 0)
+        {
+            capacity = destination.getLink().getConsumerCapacity();
+        }
+        else if (getSession().prefetch())
+        {
+            capacity = _0_10session.getAMQConnection().getMaxPrefetch();
+        }
+        return capacity;
+    }
+
 }

Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java?rev=1295495&r1=1295494&r2=1295495&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java Thu Mar  1 10:20:36 2012
@@ -20,16 +20,14 @@
  */
 package org.apache.qpid.client;
 
-import javax.jms.InvalidSelectorException;
 import javax.jms.JMSException;
 import javax.jms.Message;
 
 import org.apache.qpid.AMQException;
-import org.apache.qpid.AMQInternalException;
 import org.apache.qpid.client.failover.FailoverException;
 import org.apache.qpid.client.message.*;
 import org.apache.qpid.client.protocol.AMQProtocolHandler;
-import org.apache.qpid.filter.JMSSelectorFilter;
+import org.apache.qpid.common.AMQPFilterTypes;
 import org.apache.qpid.framing.*;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -40,24 +38,23 @@ public class BasicMessageConsumer_0_8 ex
 
     protected BasicMessageConsumer_0_8(int channelId, AMQConnection connection, AMQDestination destination,
                                        String messageSelector, boolean noLocal, MessageFactoryRegistry messageFactory, AMQSession session,
-                                       AMQProtocolHandler protocolHandler, FieldTable arguments, int prefetchHigh, int prefetchLow,
-                                       boolean exclusive, int acknowledgeMode, boolean noConsume, boolean autoClose) throws JMSException
+                                       AMQProtocolHandler protocolHandler, FieldTable rawSelector, int prefetchHigh, int prefetchLow,
+                                       boolean exclusive, int acknowledgeMode, boolean browseOnly, boolean autoClose) throws JMSException
     {
         super(channelId, connection, destination,messageSelector,noLocal,messageFactory,session,
-              protocolHandler, arguments, prefetchHigh, prefetchLow, exclusive,
-              acknowledgeMode, noConsume, autoClose);
-        try
+              protocolHandler, rawSelector, prefetchHigh, prefetchLow, exclusive,
+              acknowledgeMode, browseOnly, autoClose);
+        final FieldTable consumerArguments = getArguments(); // NOTE(review): assumes a non-null, live (not copied) table - confirm in the superclass
+        if (isAutoClose()) // auto-close consumers get the AUTO_CLOSE entry set to TRUE
         {
-            
-            if (messageSelector != null && messageSelector.length() > 0)
-            {
-                JMSSelectorFilter _filter = new JMSSelectorFilter(messageSelector);
-            }
+            consumerArguments.put(AMQPFilterTypes.AUTO_CLOSE.getValue(), Boolean.TRUE);
         }
-        catch (AMQInternalException e)
+
+        if (isBrowseOnly()) // browse-only consumers get the NO_CONSUME entry set to TRUE
         {
-            throw new InvalidSelectorException("cannot create consumer because of selector issue");
+            consumerArguments.put(AMQPFilterTypes.NO_CONSUME.getValue(), Boolean.TRUE);
         }
+
     }
 
     void sendCancel() throws AMQException, FailoverException

Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java?rev=1295495&r1=1295494&r2=1295495&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java Thu Mar  1 10:20:36 2012
@@ -238,7 +238,7 @@ public class BasicMessageProducer_0_10 e
         }
         catch (Exception e)
         {
-            JMSException jmse = new JMSException("Exception when sending message");
+            JMSException jmse = new JMSException("Exception when sending message:" + e.getMessage());
             jmse.setLinkedException(e);
             jmse.initCause(e);
             throw jmse;

Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/messaging/address/Link.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/messaging/address/Link.java?rev=1295495&r1=1295494&r2=1295495&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/messaging/address/Link.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/messaging/address/Link.java Thu Mar  1 10:20:36 2012
@@ -20,18 +20,14 @@
  */
 package org.apache.qpid.client.messaging.address;
 
-import static org.apache.qpid.client.messaging.address.Link.Reliability.UNSPECIFIED;
-
 import java.util.HashMap;
 import java.util.Map;
 
-import org.apache.qpid.client.messaging.address.Node.QueueNode;
-
 public class Link
 { 
     public enum FilterType { SQL92, XQUERY, SUBJECT }
     
-    public enum Reliability { UNRELIABLE, AT_MOST_ONCE, AT_LEAST_ONCE, EXACTLY_ONCE, UNSPECIFIED }
+    public enum Reliability { UNRELIABLE, AT_MOST_ONCE, AT_LEAST_ONCE, EXACTLY_ONCE }
     
     protected String name;
     protected String _filter;
@@ -42,7 +38,7 @@ public class Link
     protected int _producerCapacity = 0;
     protected Node node;
     protected Subscription subscription;
-    protected Reliability reliability = UNSPECIFIED;
+    protected Reliability reliability = Reliability.AT_LEAST_ONCE;
     
     public Reliability getReliability()
     {

Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java?rev=1295495&r1=1295494&r2=1295495&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java Thu Mar  1 10:20:36 2012
@@ -47,6 +47,7 @@ import org.apache.qpid.framing.ProtocolV
 import org.apache.qpid.protocol.AMQConstant;
 import org.apache.qpid.protocol.AMQVersionAwareProtocolSession;
 import org.apache.qpid.transport.Sender;
+import org.apache.qpid.transport.TransportException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -362,7 +363,15 @@ public class AMQProtocolSession implemen
 
     public void closeProtocolSession() throws AMQException
     {
-        _protocolHandler.closeConnection(0);
+        try
+        {
+            _protocolHandler.getNetworkConnection().close(); // closes the network connection directly rather than calling closeConnection(0)
+        }
+        catch(TransportException e)
+        {
+            // Deliberately ignored: such exceptions were already logged
+            // and this is a forcible close.
+        }
     }
 
     public void failover(String host, int port)

Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/filter/JMSSelectorFilter.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/filter/JMSSelectorFilter.java?rev=1295495&r1=1295494&r2=1295495&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/filter/JMSSelectorFilter.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/filter/JMSSelectorFilter.java Thu Mar  1 10:20:36 2012
@@ -26,20 +26,21 @@ import org.slf4j.LoggerFactory;
 
 public class JMSSelectorFilter implements MessageFilter
 {
-    /**
-     * this JMSSelectorFilter's logger
-     */
     private static final Logger _logger = LoggerFactory.getLogger(JMSSelectorFilter.class);
 
-    private String _selector;
-    private BooleanExpression _matcher;
+    private final String _selector;
+    private final BooleanExpression _matcher;
 
     public JMSSelectorFilter(String selector) throws AMQInternalException
     {
+        if (selector == null || "".equals(selector)) // fail fast: a null or empty selector is rejected before any parsing
+        {
+            throw new IllegalArgumentException("Cannot create a JMSSelectorFilter with a null or empty selector string");
+        }
         _selector = selector;
-        if (JMSSelectorFilter._logger.isDebugEnabled())
+        if (_logger.isDebugEnabled())
         {
-            JMSSelectorFilter._logger.debug("Created JMSSelectorFilter with selector:" + _selector);
+            _logger.debug("Created JMSSelectorFilter with selector:" + _selector);
         }
         _matcher = new SelectorParser().parse(selector);
     }
@@ -49,16 +50,15 @@ public class JMSSelectorFilter implement
         try
         {
             boolean match = _matcher.matches(message);
-            if (JMSSelectorFilter._logger.isDebugEnabled())
+            if (_logger.isDebugEnabled())
             {
-                JMSSelectorFilter._logger.debug(message + " match(" + match + ") selector(" + System
-                        .identityHashCode(_selector) + "):" + _selector);
+                _logger.debug(message + " match(" + match + ") selector(" + _selector + "): " + _selector);
             }
             return match;
         }
         catch (AMQInternalException e)
         {
-            JMSSelectorFilter._logger.warn("Caght exception when evaluating message selector for message  " + message, e);
+            _logger.warn("Caught exception when evaluating message selector for message  " + message, e);
         }
         return false;
     }

Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/filter/MessageFilter.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/filter/MessageFilter.java?rev=1295495&r1=1295494&r2=1295495&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/filter/MessageFilter.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/filter/MessageFilter.java Thu Mar  1 10:20:36 2012
@@ -17,11 +17,11 @@
  */
 package org.apache.qpid.filter;
 
-import org.apache.qpid.AMQInternalException;
 import org.apache.qpid.client.message.AbstractJMSMessage;
 
 
 public interface MessageFilter
 {
-    boolean matches(AbstractJMSMessage message) throws AMQInternalException;
+    boolean matches(AbstractJMSMessage message); // no longer declares the checked AMQInternalException
+    String getSelector(); // presumably the selector text behind this filter - confirm against implementations
 }

Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/test/java/org/apache/qpid/client/AMQSession_0_10Test.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/test/java/org/apache/qpid/client/AMQSession_0_10Test.java?rev=1295495&r1=1295494&r2=1295495&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/test/java/org/apache/qpid/client/AMQSession_0_10Test.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/test/java/org/apache/qpid/client/AMQSession_0_10Test.java Thu Mar  1 10:20:36 2012
@@ -29,7 +29,6 @@ import javax.jms.MessageProducer;
 import junit.framework.TestCase;
 
 import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.FieldTable;
 import org.apache.qpid.transport.Binary;
 import org.apache.qpid.transport.Connection;
 import org.apache.qpid.transport.Connection.SessionFactory;
@@ -334,7 +333,7 @@ public class AMQSession_0_10Test extends
         try
         {
             BasicMessageConsumer_0_10 consumer = session.createMessageConsumer(createDestination(), 1, 1, true, false,
-                    null, new FieldTable(), false, true);
+                    null, null, false, true);
             session.sendConsume(consumer, new AMQShortString("test"), null, true, null, 1);
         }
         catch (Exception e)
@@ -383,7 +382,7 @@ public class AMQSession_0_10Test extends
         try
         {
             BasicMessageConsumer_0_10 consumer = session.createMessageConsumer(createDestination(), 1, 1, true, false,
-                    null, new FieldTable(), false, true);
+                    null, null, false, true);
             session.start();
             consumer.receive(1);
             fail("JMSException should be thrown");
@@ -401,7 +400,7 @@ public class AMQSession_0_10Test extends
         try
         {
             BasicMessageConsumer_0_10 consumer = session.createMessageConsumer(createDestination(), 1, 1, true, false,
-                    null, new FieldTable(), false, true);
+                    null, null, false, true);
             session.start();
             consumer.receive(1);
         }
@@ -419,7 +418,7 @@ public class AMQSession_0_10Test extends
         try
         {
             BasicMessageConsumer_0_10 consumer = session.createMessageConsumer(createDestination(), 1, 1, true, false,
-                    null, new FieldTable(), false, true);
+                    null, null, false, true);
             session.start();
             consumer.receiveNoWait();
             fail("JMSException should be thrown");
@@ -437,7 +436,7 @@ public class AMQSession_0_10Test extends
         try
         {
             BasicMessageConsumer_0_10 consumer = session.createMessageConsumer(createDestination(), 1, 1, true, false,
-                    null, new FieldTable(), false, true);
+                    null, null, false, true);
             consumer.setMessageListener(new MockMessageListener());
             fail("JMSException should be thrown");
         }
@@ -454,7 +453,7 @@ public class AMQSession_0_10Test extends
         try
         {
             BasicMessageConsumer_0_10 consumer = session.createMessageConsumer(createDestination(), 1, 1, true, false,
-                    null, new FieldTable(), false, true);
+                    null, null, false, true);
             consumer.setMessageListener(new MockMessageListener());
         }
         catch (Exception e)
@@ -471,7 +470,7 @@ public class AMQSession_0_10Test extends
         try
         {
             BasicMessageConsumer_0_10 consumer = session.createMessageConsumer(createDestination(), 1, 1, true, false,
-                    null, new FieldTable(), false, true);
+                    null, null, false, true);
             consumer.close();
         }
         catch (Exception e)
@@ -488,7 +487,7 @@ public class AMQSession_0_10Test extends
         try
         {
             BasicMessageConsumer_0_10 consumer = session.createMessageConsumer(createDestination(), 1, 1, true, false,
-                    null, new FieldTable(), false, true);
+                    null, null, false, true);
             consumer.close();
             fail("JMSException should be thrown");
         }

Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/test/java/org/apache/qpid/test/unit/message/MessageConverterTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/test/java/org/apache/qpid/test/unit/message/MessageConverterTest.java?rev=1295495&r1=1295494&r2=1295495&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/test/java/org/apache/qpid/test/unit/message/MessageConverterTest.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/test/java/org/apache/qpid/test/unit/message/MessageConverterTest.java Thu Mar  1 10:20:36 2012
@@ -47,12 +47,17 @@ public class MessageConverterTest extend
     protected JMSTextMessage testTextMessage;
 
     protected JMSMapMessage testMapMessage;
-    private AMQSession _session = new TestAMQSession();
+    private AMQConnection _connection;
+    private AMQSession _session;
 
 
     protected void setUp() throws Exception
     {
         super.setUp();
+
+        _connection =  new MockAMQConnection("amqp://guest:guest@client/test?brokerlist='tcp://localhost:1'");
+        _session = new TestAMQSession(_connection);
+
         testTextMessage = new JMSTextMessage(AMQMessageDelegateFactory.FACTORY_0_8);
 
         //Set Message Text

Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/test/java/org/apache/qpid/test/unit/message/TestAMQSession.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/test/java/org/apache/qpid/test/unit/message/TestAMQSession.java?rev=1295495&r1=1295494&r2=1295495&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/test/java/org/apache/qpid/test/unit/message/TestAMQSession.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/test/java/org/apache/qpid/test/unit/message/TestAMQSession.java Thu Mar  1 10:20:36 2012
@@ -29,22 +29,25 @@ import javax.jms.Topic;
 import javax.jms.TopicSubscriber;
 
 import org.apache.qpid.AMQException;
+import org.apache.qpid.client.AMQConnection;
 import org.apache.qpid.client.AMQDestination;
 import org.apache.qpid.client.AMQSession;
 import org.apache.qpid.client.BasicMessageConsumer_0_8;
 import org.apache.qpid.client.BasicMessageProducer_0_8;
+import org.apache.qpid.client.MockAMQConnection;
 import org.apache.qpid.client.failover.FailoverException;
 import org.apache.qpid.client.message.AMQMessageDelegateFactory;
 import org.apache.qpid.client.protocol.AMQProtocolHandler;
+import org.apache.qpid.filter.MessageFilter;
 import org.apache.qpid.framing.AMQShortString;
 import org.apache.qpid.framing.FieldTable;
 
 public class TestAMQSession extends AMQSession<BasicMessageConsumer_0_8, BasicMessageProducer_0_8>
 {
 
-    public TestAMQSession()
+    public TestAMQSession(AMQConnection connection) // the connection is now supplied by the caller
     {
-        super(null, 0, false, AUTO_ACKNOWLEDGE, null, 0, 0);
+        super(connection, 0, false, AUTO_ACKNOWLEDGE, null, 0, 0); // handed to the AMQSession constructor in place of the former null
     }
 
     public void acknowledgeMessage(long deliveryTag, boolean multiple)
@@ -124,7 +127,7 @@ public class TestAMQSession extends AMQS
         return false;
     }
 
-    public void sendConsume(BasicMessageConsumer_0_8 consumer, AMQShortString queueName, AMQProtocolHandler protocolHandler, boolean nowait, String messageSelector, int tag) throws AMQException, FailoverException
+    public void sendConsume(BasicMessageConsumer_0_8 consumer, AMQShortString queueName, AMQProtocolHandler protocolHandler, boolean nowait, MessageFilter messageSelector, int tag) throws AMQException, FailoverException // no-op: the body is empty
     {
 
     }

Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/common.xml
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/common.xml?rev=1295495&r1=1295494&r2=1295495&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/common.xml (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/common.xml Thu Mar  1 10:20:36 2012
@@ -23,7 +23,7 @@
   <dirname property="project.root" file="${ant.file.common}"/>
 
   <property name="project.name"          value="qpid"/>
-  <property name="project.version"       value="0.13"/>
+  <property name="project.version"       value="0.15"/>
   <property name="project.url"           value="http://qpid.apache.org"/>
   <property name="project.groupid"       value="org.apache.qpid"/>
   <property name="project.namever"       value="${project.name}-${project.version}"/>

Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/common/src/main/java/common.bnd
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/common/src/main/java/common.bnd?rev=1295495&r1=1295494&r2=1295495&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/common/src/main/java/common.bnd (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/common/src/main/java/common.bnd Thu Mar  1 10:20:36 2012
@@ -17,7 +17,7 @@
 # under the License.
 #
 
-ver: 0.13.0
+ver: 0.15.0
 
 Bundle-SymbolicName: qpid-common
 Bundle-Version: ${ver}

Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/common/src/main/java/org/apache/qpid/framing/FieldTable.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/common/src/main/java/org/apache/qpid/framing/FieldTable.java?rev=1295495&r1=1295494&r2=1295495&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/common/src/main/java/org/apache/qpid/framing/FieldTable.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/common/src/main/java/org/apache/qpid/framing/FieldTable.java Thu Mar  1 10:20:36 2012
@@ -832,9 +832,12 @@ public class FieldTable
     public void addAll(FieldTable fieldTable)
     {
         initMapIfNecessary();
-        _encodedForm = null;
-        _properties.putAll(fieldTable._properties);
-        recalculateEncodedSize();
+        if (fieldTable._properties != null) // NOTE(review): a source whose _properties is null is skipped entirely - confirm it cannot still hold entries in encoded form
+        {
+            _encodedForm = null; // the encoded form is cleared before the properties change
+            _properties.putAll(fieldTable._properties);
+            recalculateEncodedSize();
+        }
     }
 
     public static Map<String, Object> convertToMap(final FieldTable fieldTable)

Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java?rev=1295495&r1=1295494&r2=1295495&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java Thu Mar  1 10:20:36 2012
@@ -526,10 +526,6 @@ public class Connection extends Connecti
     {
         synchronized (lock)
         {
-            for (Session ssn : channels.values())
-            {
-                ssn.closeCode(close);
-            }
             ConnectionCloseCode code = close.getReplyCode();
             if (code != ConnectionCloseCode.NORMAL)
             {
@@ -701,8 +697,17 @@ public class Connection extends Connecti
         return channels.values();
     }
 
-    public boolean hasSessionWithName(final String name)
+    public boolean hasSessionWithName(final byte[] name) // the session name is now passed as raw bytes
     {
-        return sessions.containsKey(new Binary(name.getBytes()));
+        return sessions.containsKey(new Binary(name)); // wraps the bytes in a Binary for the sessions lookup
+    }
+
+    /** Calls notifyFailoverRequired() on each session in a copy of channels.values(). */
+    public void notifyFailoverRequired()
+    {
+        for (Session session : new ArrayList<Session>(channels.values()))
+        {
+            session.notifyFailoverRequired();
+        }
     }
 }

Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/common/src/main/java/org/apache/qpid/transport/ServerDelegate.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/common/src/main/java/org/apache/qpid/transport/ServerDelegate.java?rev=1295495&r1=1295494&r2=1295495&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/common/src/main/java/org/apache/qpid/transport/ServerDelegate.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/common/src/main/java/org/apache/qpid/transport/ServerDelegate.java Thu Mar  1 10:20:36 2012
@@ -195,10 +195,17 @@ public class ServerDelegate extends Conn
     @Override
     public void sessionAttach(Connection conn, SessionAttach atc)
     {
+        sessionAttachImpl(conn, atc); // delegates to sessionAttachImpl; the returned Session is discarded here
+    }
+
+    protected Session sessionAttachImpl(Connection conn, SessionAttach atc) // performs the attach steps and returns the Session
+    {
         Session ssn = getSession(conn, atc);
         conn.map(ssn, atc.getChannel());
         ssn.sessionAttached(atc.getName());
         ssn.setState(Session.State.OPEN);
+
+        return ssn; // the same Session that was mapped to the channel and set to OPEN above
     }
 
     protected void setConnectionTuneOkChannelMax(final Connection conn, final int okChannelMax)



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org