You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2009/10/05 14:52:10 UTC

svn commit: r821779 [11/11] - in /qpid/branches/java-broker-0-10/qpid: ./ cpp/ cpp/bindings/qmf/ cpp/bindings/qmf/python/ cpp/bindings/qmf/python/qmf/ cpp/bindings/qmf/ruby/ cpp/bindings/qmf/tests/ cpp/examples/messaging/ cpp/include/qmf/ cpp/include/q...

Modified: qpid/branches/java-broker-0-10/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/QpidTestCase.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/QpidTestCase.java?rev=821779&r1=821778&r2=821779&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/QpidTestCase.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/QpidTestCase.java Mon Oct  5 12:51:57 2009
@@ -21,6 +21,7 @@
 import junit.framework.TestResult;
 import org.apache.commons.configuration.ConfigurationException;
 import org.apache.commons.configuration.XMLConfiguration;
+import org.apache.qpid.AMQException;
 import org.apache.qpid.client.AMQConnection;
 import org.apache.qpid.client.AMQConnectionFactory;
 import org.apache.qpid.client.transport.TransportConnection;
@@ -30,6 +31,7 @@
 import org.apache.qpid.server.registry.ApplicationRegistry;
 import org.apache.qpid.server.registry.ConfigurationFileApplicationRegistry;
 import org.apache.qpid.server.store.DerbyMessageStore;
+import org.apache.qpid.url.URLSyntaxException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -73,6 +75,7 @@
     protected long RECEIVE_TIMEOUT = 1000l;
 
     private Map<String, String> _setProperties = new HashMap<String, String>();
+    private XMLConfiguration _testConfiguration = new XMLConfiguration();
 
     /**
      * Some tests are excluded when the property test.excludes is set to true.
@@ -183,8 +186,7 @@
     public static final String QUEUE = "queue";
     public static final String TOPIC = "topic";
     /** Map to hold test defined environment properties */
-    private Map<String,String> _env;
-
+    private Map<String, String> _env;
 
     public QpidTestCase(String name)
     {
@@ -335,7 +337,7 @@
                         latch.countDown();
                     }
 
-                    if (latch != null && line.contains(stopped))
+                    if (!seenReady && line.contains(stopped))
                     {
                         stopLine = line;
                     }
@@ -368,7 +370,9 @@
 
     /**
      * Return the management portin use by the broker on this main port
+     *
      * @param mainPort the broker's main port.
+     *
      * @return the management port that corresponds to the broker on the given port
      */
     protected int getManagementPort(int mainPort)
@@ -415,9 +419,14 @@
     {
         port = getPort(port);
 
+        // Save any configuratio changes that have been made
+        saveTestConfiguration();
+
         Process process = null;
         if (_broker.equals(VM))
         {
+            setConfigurationProperty("management.jmxport", String.valueOf(getManagementPort(port)));
+            saveTestConfiguration();
             // create an in_VM broker
             ApplicationRegistry.initialise(new ConfigurationFileApplicationRegistry(_configFile), port);
             TransportConnection.createVMBroker(port);
@@ -438,15 +447,35 @@
             env.put("PATH", env.get("PATH").concat(File.pathSeparator + qpidHome + "/bin"));
 
             //Add the test name to the broker run.
-            env.put("QPID_PNAME", "-DPNAME=\"" + _testName + "\"");
+            // DON'T change PNAME, qpid.stop needs this value.
+            env.put("QPID_PNAME", "-DPNAME=QPBRKR -DTNAME=\"" + _testName + "\"");
             env.put("QPID_WORK", System.getProperty("QPID_WORK"));
 
             // Add all the environment settings the test requested
             if (!_env.isEmpty())
             {
-                for(Map.Entry<String,String> entry : _env.entrySet())
+                for (Map.Entry<String, String> entry : _env.entrySet())
+                {
+                    env.put(entry.getKey(), entry.getValue());
+                }
+            }
+
+            String QPID_OPTS = " ";
+            // Add all the specified system properties to QPID_OPTS
+            if (!_setProperties.isEmpty())
+            {
+                for (String key : _setProperties.keySet())
+                {
+                    QPID_OPTS += "-D" + key + "=" + System.getProperty(key) + " ";
+                }
+
+                if (env.containsKey("QPID_OPTS"))
                 {
-                    env.put(entry.getKey() ,entry.getValue());
+                    env.put("QPID_OPTS", env.get("QPID_OPTS") + QPID_OPTS);
+                }
+                else
+                {
+                    env.put("QPID_OPTS", QPID_OPTS);
                 }
             }
 
@@ -484,6 +513,27 @@
         _brokers.put(port, process);
     }
 
+    public String getTestConfigFile()
+    {
+        String path = _output == null ? System.getProperty("java.io.tmpdir") : _output;
+        return path + "/" + getTestQueueName() + ".xml";
+    }
+
+    protected void saveTestConfiguration() throws ConfigurationException
+    {
+        String testConfig = getTestConfigFile();
+        //Specifiy the test configuration
+        setSystemProperty("test.config", testConfig);
+
+        // This is a work
+        if (_testConfiguration.isEmpty())
+        {
+            _testConfiguration.addProperty("test", getTestQueueName());
+        }
+
+        _testConfiguration.save(getTestConfigFile());
+    }
+
     public void cleanBroker()
     {
         if (_brokerClean != null)
@@ -565,18 +615,12 @@
             storeClass = bdb;
         }
 
-        // First we munge the config file and, if we're in a VM, set up an additional logfile
-        XMLConfiguration configuration = new XMLConfiguration(_configFile);
-        configuration.setProperty("virtualhosts.virtualhost." + virtualhost +
+
+        _testConfiguration.setProperty("virtualhosts.virtualhost." + virtualhost +
                                   ".store.class", storeClass.getName());
-        configuration.setProperty("virtualhosts.virtualhost." + virtualhost +
+        _testConfiguration.setProperty("virtualhosts.virtualhost." + virtualhost +
                                   ".store." + DerbyMessageStore.ENVIRONMENT_PATH_PROPERTY,
-                                  "${work}/" + virtualhost);
-
-        File tmpFile = File.createTempFile("configFile", "test");
-        tmpFile.deleteOnExit();
-        configuration.save(tmpFile);
-        _configFile = tmpFile;
+                                  "${QPID_WORK}/" + virtualhost);
     }
 
     /**
@@ -591,6 +635,10 @@
      */
     protected String getConfigurationStringProperty(String property) throws ConfigurationException
     {
+        // Call save Configuration to be sure we have saved the test specific
+        // file. As the optional status
+        saveTestConfiguration();
+
         ServerConfiguration configuration = new ServerConfiguration(_configFile);
         return configuration.getConfig().getString(property);
     }
@@ -613,48 +661,9 @@
     protected void setConfigurationProperty(String property, String value)
             throws ConfigurationException, IOException
     {
-        XMLConfiguration configuration = new XMLConfiguration(_configFile);
-
-        // If we are modifying a virtualhost value then we need to do so in
-        // the virtualhost.xml file as these values overwrite the values in
-        // the main config.xml file
-        if (property.startsWith("virtualhosts"))
-        {
-            // So locate the virtualhost.xml file and use the ServerConfiguration
-            // flatConfig method to get the interpolated value.
-            String vhostConfigFile = ServerConfiguration.
-                    flatConfig(_configFile).getString("virtualhosts");
-
-            // Load the vhostConfigFile
-            XMLConfiguration vhostConfiguration = new XMLConfiguration(vhostConfigFile);
-
-            // Set the value specified in to the vhostConfig.
-            // Remembering that property will be 'virtualhosts.virtulhost....'
-            // so we need to take off the 'virtualhosts.' from the start.
-            vhostConfiguration.setProperty(property.substring(property.indexOf(".") + 1), value);
-
-            // Write out the new virtualhost config file
-            File tmpFile = File.createTempFile("virtualhost-configFile", ".xml");
-            tmpFile.deleteOnExit();
-            vhostConfiguration.save(tmpFile);
-
-            // Change the property and value to be the new virtualhosts file
-            // so that then update the value in the main config file.
-            property = "virtualhosts";
-            value = tmpFile.getAbsolutePath();
-        }
-
-        configuration.setProperty(property, value);
-
-        // Write the new server config file
-        File tmpFile = File.createTempFile("configFile", ".xml");
-        tmpFile.deleteOnExit();
-        configuration.save(tmpFile);
-
-        _logger.info("Qpid Test Case now using configuration File:"
-                     + tmpFile.getAbsolutePath());
-
-        _configFile = tmpFile;
+        //Write the value in to this configuration file which will override the
+        // defaults.
+        _testConfiguration.setProperty(property, value);
     }
 
     /**
@@ -695,14 +704,13 @@
      * Add an environtmen variable for the external broker environment
      *
      * @param property the property to set
-     * @param value the value to set it to
+     * @param value    the value to set it to
      */
     protected void setBrokerEnvironment(String property, String value)
     {
-        _env.put(property,value);
+        _env.put(property, value);
     }
 
-
     /**
      * Check whether the broker is an 0.8
      *
@@ -720,7 +728,7 @@
 
     protected boolean isJavaBroker()
     {
-        return _brokerLanguage.equals("java");
+        return _brokerLanguage.equals("java") || _broker.equals("vm");
     }
 
     protected boolean isCppBroker()
@@ -831,7 +839,7 @@
      *
      * @throws Exception if there is an error getting the connection
      */
-    public Connection getConnection(String username, String password) throws Exception
+    public Connection getConnection(String username, String password) throws JMSException, NamingException
     {
         _logger.info("get Connection");
         Connection con = getConnectionFactory().createConnection(username, password);
@@ -840,7 +848,7 @@
         return con;
     }
 
-    public Connection getConnection(String username, String password, String id) throws Exception
+    public Connection getConnection(String username, String password, String id) throws JMSException, URLSyntaxException, AMQException, NamingException
     {
         _logger.info("get Connection");
         Connection con;
@@ -860,6 +868,7 @@
     /**
      * Return a uniqueName for this test.
      * In this case it returns a queue Named by the TestCase and TestName
+     *
      * @return String name for a queue
      */
     protected String getTestQueueName()

Modified: qpid/branches/java-broker-0-10/qpid/java/systests/src/main/java/org/apache/qpid/util/LogMonitorTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/systests/src/main/java/org/apache/qpid/util/LogMonitorTest.java?rev=821779&r1=821778&r2=821779&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/systests/src/main/java/org/apache/qpid/util/LogMonitorTest.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/systests/src/main/java/org/apache/qpid/util/LogMonitorTest.java Mon Oct  5 12:51:57 2009
@@ -158,18 +158,6 @@
         validateLogDoesNotContainsMessage(_monitor, notLogged);
     }
 
-    public void testWaitForMessage_Found() throws IOException
-    {
-        String message = getName() + ": Test Message";
-
-        long TIME_OUT = 2000;
-
-        logMessageWithDelay(message, TIME_OUT / 2);
-
-        assertTrue("Message was not logged ",
-                    _monitor.waitForMessage(message, TIME_OUT));
-    }
-
     public void testWaitForMessage_Timeout() throws IOException
     {
         String message = getName() + ": Test Message";

Modified: qpid/branches/java-broker-0-10/qpid/java/test-profiles/010Excludes
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/test-profiles/010Excludes?rev=821779&r1=821778&r2=821779&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/test-profiles/010Excludes (original)
+++ qpid/branches/java-broker-0-10/qpid/java/test-profiles/010Excludes Mon Oct  5 12:51:57 2009
@@ -83,3 +83,12 @@
 
 // CPP Broker does not have a JMX interface to test
 org.apache.qpid.management.jmx.*
+// JMX is used in this test for validation
+org.apache.qpid.server.queue.ModelTest#*
+
+
+// 0-10 is not supported by the MethodRegistry
+org.apache.qpid.test.unit.close.JavaServerCloseRaceConditionTest#*
+
+// QPID-2084 : this test needs more work for 0-10
+org.apache.qpid.test.unit.client.DynamicQueueExchangeCreateTest#*

Propchange: qpid/branches/java-broker-0-10/qpid/java/test-profiles/010Excludes
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Mon Oct  5 12:51:57 2009
@@ -1 +1 @@
-/qpid/trunk/qpid/java/test-profiles/010Excludes:799241-807984
+/qpid/trunk/qpid/java/test-profiles/010Excludes:799241-816580

Modified: qpid/branches/java-broker-0-10/qpid/java/test-profiles/08Excludes
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/test-profiles/08Excludes?rev=821779&r1=821778&r2=821779&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/test-profiles/08Excludes (original)
+++ qpid/branches/java-broker-0-10/qpid/java/test-profiles/08Excludes Mon Oct  5 12:51:57 2009
@@ -15,3 +15,8 @@
 org.apache.qpid.client.SessionCreateTest#*
 
 org.apache.qpid.test.client.RollbackOrderTest#*
+
+// QPID-2097 exclude it from the InVM test runs until InVM JMX Interface is reliable
+org.apache.qpid.management.jmx.ManagementActorLoggingTest#*
+org.apache.qpid.server.queue.ModelTest#*
+

Propchange: qpid/branches/java-broker-0-10/qpid/java/test-profiles/08Excludes
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Mon Oct  5 12:51:57 2009
@@ -1 +1 @@
-/qpid/trunk/qpid/java/test-profiles/08Excludes:799241-807984
+/qpid/trunk/qpid/java/test-profiles/08Excludes:799241-816580

Propchange: qpid/branches/java-broker-0-10/qpid/java/test-profiles/08StandaloneExcludes
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Mon Oct  5 12:51:57 2009
@@ -1 +1 @@
-/qpid/trunk/qpid/java/test-profiles/08StandaloneExcludes:799241-807984
+/qpid/trunk/qpid/java/test-profiles/08StandaloneExcludes:799241-816580

Propchange: qpid/branches/java-broker-0-10/qpid/java/test-profiles/08TransientExcludes
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Mon Oct  5 12:51:57 2009
@@ -1 +1 @@
-/qpid/trunk/qpid/java/test-profiles/08TransientExcludes:799241-807984
+/qpid/trunk/qpid/java/test-profiles/08TransientExcludes:799241-816580

Modified: qpid/branches/java-broker-0-10/qpid/java/test-profiles/Excludes
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/test-profiles/Excludes?rev=821779&r1=821778&r2=821779&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/test-profiles/Excludes (original)
+++ qpid/branches/java-broker-0-10/qpid/java/test-profiles/Excludes Mon Oct  5 12:51:57 2009
@@ -13,5 +13,9 @@
 org.apache.qpid.server.logging.BrokerLoggingTest#testBrokerShutdownStopped
 org.apache.qpid.server.logging.VirtualHostLoggingTest#testVirtualhostClosure
 org.apache.qpid.server.logging.MemoryMessageStoreLoggingTest#testMessageStoreClose
-org.apache.qpid.server.logging.DerbyMessageStoreLoggingTest#testMessageStoreClose
 
+// QPID-XXX : Test fails to start external broker due to Derby Exception.
+org.apache.qpid.server.logging.DerbyMessageStoreLoggingTest#*
+
+// QPID-2081 :The configuration changes are now highlighting the close race condition
+org.apache.qpid.server.security.acl.SimpleACLTest#*

Propchange: qpid/branches/java-broker-0-10/qpid/java/test-profiles/Excludes
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Mon Oct  5 12:51:57 2009
@@ -1 +1 @@
-/qpid/trunk/qpid/java/test-profiles/Excludes:799241-807984
+/qpid/trunk/qpid/java/test-profiles/Excludes:799241-816580

Propchange: qpid/branches/java-broker-0-10/qpid/java/test-profiles/XAExcludes
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Mon Oct  5 12:51:57 2009
@@ -1 +1 @@
-/qpid/trunk/qpid/java/test-profiles/XAExcludes:799241-807984
+/qpid/trunk/qpid/java/test-profiles/XAExcludes:799241-816580

Propchange: qpid/branches/java-broker-0-10/qpid/java/test-profiles/cpp.ssl.excludes
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Mon Oct  5 12:51:57 2009
@@ -1 +1 @@
-/qpid/trunk/qpid/java/test-profiles/cpp.ssl.excludes:799241-807984
+/qpid/trunk/qpid/java/test-profiles/cpp.ssl.excludes:799241-816580

Modified: qpid/branches/java-broker-0-10/qpid/java/test-profiles/cpp.testprofile
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/test-profiles/cpp.testprofile?rev=821779&r1=821778&r2=821779&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/test-profiles/cpp.testprofile (original)
+++ qpid/branches/java-broker-0-10/qpid/java/test-profiles/cpp.testprofile Mon Oct  5 12:51:57 2009
@@ -8,6 +8,7 @@
 broker.module.ssl=${module.dir}/ssl.so
 broker.module.cluster=${module.dir}/cluster.so
 broker.module.store=${store.module.dir}/msgstore.so
+broker.stopped=Exception constructed
 
 broker.modules=
 broker.args=

Modified: qpid/branches/java-broker-0-10/qpid/java/test-profiles/default.testprofile
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/test-profiles/default.testprofile?rev=821779&r1=821778&r2=821779&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/test-profiles/default.testprofile (original)
+++ qpid/branches/java-broker-0-10/qpid/java/test-profiles/default.testprofile Mon Oct  5 12:51:57 2009
@@ -18,9 +18,10 @@
 
 test.port=15672
 test.mport=18999
+#Note : Management will start open second port on: mport + 100 : 19099 
 test.port.ssl=15671
-test.port.alt=15772
-test.port.alt.ssl=15771
+test.port.alt=25672
+test.port.alt.ssl=25671
 
 test.exclude=true
 profile.excludes=08TransientExcludes

Modified: qpid/branches/java-broker-0-10/qpid/python/Makefile
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/python/Makefile?rev=821779&r1=821778&r2=821779&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/python/Makefile (original)
+++ qpid/branches/java-broker-0-10/qpid/python/Makefile Mon Oct  5 12:51:57 2009
@@ -46,6 +46,12 @@
 
 build: $(TARGETS)
 
+.PHONY: doc
+
+doc:
+	@mkdir -p $(BUILD)
+	epydoc qpid.messaging -o $(BUILD)/doc --no-private --no-sourcecode --include-log
+
 install: build
 	install -d $(PYTHON_LIB)
 

Modified: qpid/branches/java-broker-0-10/qpid/python/qpid/datatypes.py
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/python/qpid/datatypes.py?rev=821779&r1=821778&r2=821779&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/python/qpid/datatypes.py (original)
+++ qpid/branches/java-broker-0-10/qpid/python/qpid/datatypes.py Mon Oct  5 12:51:57 2009
@@ -234,6 +234,24 @@
   def add(self, lower, upper = None):
     self.add_range(Range(lower, upper))
 
+  def empty(self):
+    for r in self.ranges:
+      if r.lower <= r.upper:
+        return False
+    return True
+
+  def max(self):
+    if self.ranges:
+      return self.ranges[-1].upper
+    else:
+      return None
+
+  def min(self):
+    if self.ranges:
+      return self.ranges[0].lower
+    else:
+      return None
+
   def __iter__(self):
     return iter(self.ranges)
 

Modified: qpid/branches/java-broker-0-10/qpid/python/qpid/delegates.py
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/python/qpid/delegates.py?rev=821779&r1=821778&r2=821779&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/python/qpid/delegates.py (original)
+++ qpid/branches/java-broker-0-10/qpid/python/qpid/delegates.py Mon Oct  5 12:51:57 2009
@@ -139,12 +139,18 @@
 
 class Client(Delegate):
 
+  ppid = 0
+  try:
+    ppid = os.getppid()
+  except:
+    pass
+
   PROPERTIES = {"product": "qpid python client",
                 "version": "development",
                 "platform": os.name,
                 "qpid.client_process": os.path.basename(sys.argv[0]),
                 "qpid.client_pid": os.getpid(),
-                "qpid.client_ppid": os.getppid()}
+                "qpid.client_ppid": ppid}
 
   def __init__(self, connection, username="guest", password="guest",
                mechanism="PLAIN", heartbeat=None):

Modified: qpid/branches/java-broker-0-10/qpid/python/qpid/messaging.py
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/python/qpid/messaging.py?rev=821779&r1=821778&r2=821779&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/python/qpid/messaging.py (original)
+++ qpid/branches/java-broker-0-10/qpid/python/qpid/messaging.py Mon Oct  5 12:51:57 2009
@@ -6,9 +6,9 @@
 # to you under the Apache License, Version 2.0 (the
 # "License"); you may not use this file except in compliance
 # with the License.  You may obtain a copy of the License at
-# 
+#
 #   http://www.apache.org/licenses/LICENSE-2.0
-# 
+#
 # Unless required by applicable law or agreed to in writing,
 # software distributed under the License is distributed on an
 # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -30,64 +30,18 @@
   - protocol negotiation/multiprotocol impl
 """
 
-import connection, time, socket, sys, compat
 from codec010 import StringCodec
-from datatypes import timestamp, uuid4, RangedSet, Message as Message010, Serial
-from exceptions import Timeout
+from concurrency import synchronized, Waiter
+from datatypes import timestamp, uuid4, Serial
 from logging import getLogger
-from ops import PRIMITIVE, delivery_mode
-from session import Client, INCOMPLETE, SessionDetached
+from ops import PRIMITIVE
 from threading import Thread, RLock, Condition
-from util import connect
+from util import default
 
 log = getLogger("qpid.messaging")
 
 static = staticmethod
 
-def synchronized(meth):
-  def sync_wrapper(self, *args, **kwargs):
-    self.lock()
-    try:
-      return meth(self, *args, **kwargs)
-    finally:
-      self.unlock()
-  return sync_wrapper
-
-class Lockable(object):
-
-  def lock(self):
-    self._lock.acquire()
-
-  def unlock(self):
-    self._lock.release()
-
-  def wait(self, predicate, timeout=None):
-    passed = 0
-    start = time.time()
-    while not predicate():
-      if timeout is None:
-        # using the timed wait prevents keyboard interrupts from being
-        # blocked while waiting
-        self._condition.wait(3)
-      elif passed < timeout:
-        self._condition.wait(timeout - passed)
-      else:
-        return False
-      passed = time.time() - start
-    return True
-
-  def notify(self):
-    self._condition.notify()
-
-  def notifyAll(self):
-    self._condition.notifyAll()
-
-def default(value, default):
-  if value is None:
-    return default
-  else:
-    return value
-
 AMQP_PORT = 5672
 AMQPS_PORT = 5671
 
@@ -103,12 +57,19 @@
 UNLIMITED = Constant("UNLIMITED", 0xFFFFFFFFL)
 
 class ConnectionError(Exception):
+  """
+  The base class for all connection related exceptions.
+  """
   pass
 
 class ConnectError(ConnectionError):
+  """
+  Exception raised when there is an error connecting to the remote
+  peer.
+  """
   pass
 
-class Connection(Lockable):
+class Connection:
 
   """
   A Connection manages a group of L{Sessions<Session>} and connects
@@ -153,27 +114,27 @@
     self._connected = False
     self._lock = RLock()
     self._condition = Condition(self._lock)
+    self._waiter = Waiter(self._condition)
     self._modcount = Serial(0)
     self.error = None
+    from driver import Driver
     self._driver = Driver(self)
     self._driver.start()
 
-  def wakeup(self):
+  def _wait(self, predicate, timeout=None):
+    return self._waiter.wait(predicate, timeout=timeout)
+
+  def _wakeup(self):
     self._modcount += 1
     self._driver.wakeup()
 
-  def catchup(self, exc=ConnectionError):
-    mc = self._modcount
-    self.wait(lambda: not self._driver._modcount < mc)
-    self.check_error(exc)
-
-  def check_error(self, exc=ConnectionError):
+  def _check_error(self, exc=ConnectionError):
     if self.error:
       raise exc(*self.error)
 
-  def ewait(self, predicate, timeout=None, exc=ConnectionError):
-    result = self.wait(lambda: self.error or predicate(), timeout)
-    self.check_error(exc)
+  def _ewait(self, predicate, timeout=None, exc=ConnectionError):
+    result = self._wait(lambda: self.error or predicate(), timeout)
+    self._check_error(exc)
     return result
 
   @synchronized
@@ -200,7 +161,7 @@
     else:
       ssn = Session(self, name, self.started, transactional=transactional)
       self.sessions[name] = ssn
-      self.wakeup()
+      self._wakeup()
       return ssn
 
   @synchronized
@@ -213,8 +174,8 @@
     Connect to the remote endpoint.
     """
     self._connected = True
-    self.wakeup()
-    self.ewait(lambda: self._driver._connected, exc=ConnectError)
+    self._wakeup()
+    self._ewait(lambda: self._driver._connected, exc=ConnectError)
 
   @synchronized
   def disconnect(self):
@@ -222,8 +183,8 @@
     Disconnect from the remote endpoint.
     """
     self._connected = False
-    self.wakeup()
-    self.ewait(lambda: not self._driver._connected)
+    self._wakeup()
+    self._ewait(lambda: not self._driver._connected)
 
   @synchronized
   def connected(self):
@@ -273,17 +234,6 @@
     ssn.exchange_bind(exchange=exchange, queue=queue,
                       binding_key=self.value.replace("*", "#"))
 
-FILTER_DEFAULTS = {
-  "topic": Pattern("*")
-  }
-
-def delegate(handler, session):
-  class Delegate(Client):
-
-    def message_transfer(self, cmd):
-      handler._message_transfer(session, cmd)
-  return Delegate
-
 class SessionError(Exception):
   pass
 
@@ -304,7 +254,7 @@
 class TransactionAborted(SessionError):
   pass
 
-class Session(Lockable):
+class Session:
 
   """
   Sessions provide a linear context for sending and receiving
@@ -329,12 +279,14 @@
     self.incoming = []
     self.unacked = []
     self.acked = []
+    # XXX: I hate this name.
+    self.ack_capacity = UNLIMITED
 
     self.closing = False
     self.closed = False
 
     self._lock = connection._lock
-    self._condition = connection._condition
+    self.running = True
     self.thread = Thread(target = self.run)
     self.thread.setDaemon(True)
     self.thread.start()
@@ -342,17 +294,17 @@
   def __repr__(self):
     return "<Session %s>" % self.name
 
-  def wakeup(self):
-    self.connection.wakeup()
+  def _wait(self, predicate, timeout=None):
+    return self.connection._wait(predicate, timeout=timeout)
 
-  def catchup(self, exc=SessionError):
-    self.connection.catchup(exc)
+  def _wakeup(self):
+    self.connection._wakeup()
 
-  def check_error(self, exc=SessionError):
-    self.connection.check_error(exc)
+  def _check_error(self, exc=SessionError):
+    self.connection._check_error(exc)
 
-  def ewait(self, predicate, timeout=None, exc=SessionError):
-    return self.connection.ewait(predicate, timeout, exc)
+  def _ewait(self, predicate, timeout=None, exc=SessionError):
+    return self.connection._ewait(predicate, timeout, exc)
 
   @synchronized
   def sender(self, target):
@@ -367,7 +319,7 @@
     """
     sender = Sender(self, len(self.senders), target)
     self.senders.append(sender)
-    self.wakeup()
+    self._wakeup()
     # XXX: because of the lack of waiting here we can end up getting
     # into the driver loop with messages sent for senders that haven't
     # been linked yet, something similar can probably happen for
@@ -388,7 +340,7 @@
     receiver = Receiver(self, len(self.receivers), source, filter,
                         self.started)
     self.receivers.append(receiver)
-    self.wakeup()
+    self._wakeup()
     return receiver
 
   @synchronized
@@ -416,8 +368,8 @@
 
   @synchronized
   def _get(self, predicate, timeout=None):
-    if self.wait(lambda: ((self._peek(predicate) is not None) or self.closing),
-                 timeout):
+    if self._wait(lambda: ((self._peek(predicate) is not None) or self.closing),
+                  timeout):
       msg = self._pop(predicate)
       if msg is not None:
         msg._receiver.returned += 1
@@ -427,13 +379,15 @@
     return None
 
   @synchronized
-  def acknowledge(self, message=None):
+  def acknowledge(self, message=None, sync=True):
     """
     Acknowledge the given L{Message}. If message is None, then all
     unacknowledged messages on the session are acknowledged.
 
     @type message: Message
     @param message: the message to acknowledge or None
+    @type sync: boolean
+    @param sync: if true then block until the message(s) are acknowledged
     """
     if message is None:
       messages = self.unacked[:]
@@ -441,12 +395,18 @@
       messages = [message]
 
     for m in messages:
+      if self.ack_capacity is not UNLIMITED:
+        if self.ack_capacity <= 0:
+          # XXX: this is currently a SendError, maybe it should be a SessionError?
+          raise InsufficientCapacity("ack_capacity = %s" % self.ack_capacity)
+        self._wakeup()
+        self._ewait(lambda: len(self.acked) < self.ack_capacity)
       self.unacked.remove(m)
       self.acked.append(m)
 
-    self.wakeup()
-    self.wait(lambda: self.connection.error or not [m for m in messages if m in self.acked])
-    self.check_error()
+    self._wakeup()
+    if sync:
+      self._ewait(lambda: not [m for m in messages if m in self.acked])
 
   @synchronized
   def commit(self):
@@ -457,8 +417,8 @@
     if not self.transactional:
       raise NontransactionalSession()
     self.committing = True
-    self.wakeup()
-    self.ewait(lambda: not self.committing)
+    self._wakeup()
+    self._ewait(lambda: not self.committing)
     if self.aborted:
       raise TransactionAborted()
     assert self.committed
@@ -472,8 +432,8 @@
     if not self.transactional:
       raise NontransactionalSession()
     self.aborting = True
-    self.wakeup()
-    self.ewait(lambda: not self.aborting)
+    self._wakeup()
+    self._ewait(lambda: not self.aborting)
     assert self.aborted
 
   @synchronized
@@ -493,7 +453,7 @@
     for rcv in self.receivers:
       rcv.stop()
     # TODO: think about stopping individual receivers in listen mode
-    self.wait(lambda: self._peek(self._pred) is None)
+    self._wait(lambda: self._peek(self._pred) is None)
     self.started = False
 
   def _pred(self, m):
@@ -501,6 +461,7 @@
 
   @synchronized
   def run(self):
+    self.running = True
     try:
       while True:
         msg = self._get(self._pred)
@@ -509,10 +470,10 @@
         else:
           msg._receiver.listener(msg)
           if self._peek(self._pred) is None:
-            self.notifyAll()
+            self.connection._waiter.notifyAll()
     finally:
-      self.closed = True
-      self.notifyAll()
+      self.running = False
+      self.connection._waiter.notifyAll()
 
   @synchronized
   def close(self):
@@ -523,33 +484,22 @@
       link.close()
 
     self.closing = True
-    self.wakeup()
-    self.catchup()
-    self.wait(lambda: self.closed)
+    self._wakeup()
+    self._ewait(lambda: self.closed and not self.running)
     while self.thread.isAlive():
       self.thread.join(3)
     self.thread = None
+    # XXX: should be able to express this condition through API calls
+    self._ewait(lambda: not self.outgoing and not self.acked)
     self.connection._remove_session(self)
 
-def parse_addr(address):
-  parts = address.split("/", 1)
-  if len(parts) == 1:
-    return parts[0], None
-  else:
-    return parts[0], parts[i1]
-
-def reply_to2addr(reply_to):
-  if reply_to.routing_key is None:
-    return reply_to.exchange
-  elif reply_to.exchange in (None, ""):
-    return reply_to.routing_key
-  else:
-    return "%s/%s" % (reply_to.exchange, reply_to.routing_key)
-
 class SendError(SessionError):
   pass
 
-class Sender(Lockable):
+class InsufficientCapacity(SendError):
+  pass
+
+class Sender:
 
   """
   Sends outgoing messages.
@@ -559,32 +509,50 @@
     self.session = session
     self.index = index
     self.target = target
+    self.capacity = UNLIMITED
+    self.queued = Serial(0)
+    self.acked = Serial(0)
     self.closed = False
     self._lock = self.session._lock
-    self._condition = self.session._condition
 
-  def wakeup(self):
-    self.session.wakeup()
+  def _wakeup(self):
+    self.session._wakeup()
 
-  def catchup(self, exc=SendError):
-    self.session.catchup(exc)
+  def _check_error(self, exc=SendError):
+    self.session._check_error(exc)
 
-  def check_error(self, exc=SendError):
-    self.session.check_error(exc)
+  def _ewait(self, predicate, timeout=None, exc=SendError):
+    return self.session._ewait(predicate, timeout, exc)
 
-  def ewait(self, predicate, timeout=None, exc=SendError):
-    return self.session.ewait(predicate, timeout, exc)
+  @synchronized
+  def pending(self):
+    """
+    Returns the number of messages awaiting acknowledgment.
+    @rtype: int
+    @return: the number of unacknowledged messages
+    """
+    return self.queued - self.acked
 
   @synchronized
-  def send(self, object):
+  def send(self, object, sync=True, timeout=None):
     """
     Send a message. If the object passed in is of type L{unicode},
     L{str}, L{list}, or L{dict}, it will automatically be wrapped in a
     L{Message} and sent. If it is of type L{Message}, it will be sent
-    directly.
+    directly. If the sender capacity is not L{UNLIMITED} then send
+    will block until there is available capacity to send the message.
+    If the timeout parameter is specified, then send will throw an
+    L{InsufficientCapacity} exception if capacity does not become
+    available within the specified time.
 
     @type object: unicode, str, list, dict, Message
     @param object: the message or content to send
+
+    @type sync: boolean
+    @param sync: if true then block until the message is sent
+
+    @type timeout: float
+    @param timeout: the time to wait for available capacity
     """
 
     if not self.session.connection._connected or self.session.closing:
@@ -595,12 +563,23 @@
     else:
       message = Message(object)
 
+    if self.capacity is not UNLIMITED:
+      if self.capacity <= 0:
+        raise InsufficientCapacity("capacity = %s" % self.capacity)
+      if not self._ewait(lambda: self.pending() < self.capacity, timeout=timeout):
+        raise InsufficientCapacity("capacity = %s" % self.capacity)
+
     # XXX: what if we send the same message to multiple senders?
     message._sender = self
     self.session.outgoing.append(message)
+    self.queued += 1
+    mno = self.queued
 
-    self.wakeup()
-    self.ewait(lambda: message not in self.session.outgoing)
+    self._wakeup()
+
+    if sync:
+      self._ewait(lambda: self.acked >= mno)
+      assert message not in self.session.outgoing
 
   @synchronized
   def close(self):
@@ -622,7 +601,7 @@
   """
   pass
 
-class Receiver(Lockable):
+class Receiver:
 
   """
   Receives incoming messages from a remote source. Messages may be
@@ -649,22 +628,25 @@
     self.closed = False
     self.listener = None
     self._lock = self.session._lock
-    self._condition = self.session._condition
-
-  def wakeup(self):
-    self.session.wakeup()
 
-  def catchup(self, exc=ReceiveError):
-    self.session.catchup()
+  def _wakeup(self):
+    self.session._wakeup()
 
-  def check_error(self, exc=ReceiveError):
-    self.session.check_error(exc)
+  def _check_error(self, exc=ReceiveError):
+    self.session._check_error(exc)
 
-  def ewait(self, predicate, timeout=None, exc=ReceiveError):
-    return self.session.ewait(predicate, timeout, exc)
+  def _ewait(self, predicate, timeout=None, exc=ReceiveError):
+    return self.session._ewait(predicate, timeout, exc)
 
   @synchronized
   def pending(self):
+    """
+    Returns the number of messages available to be fetched by the
+    application.
+
+    @rtype: int
+    @return: the number of available messages
+    """
     return self.received - self.returned
 
   def _capacity(self):
@@ -700,23 +682,23 @@
     """
     if self._capacity() == 0:
       self.granted = self.returned + 1
-      self.wakeup()
-    self.ewait(lambda: self.impending == self.granted)
+      self._wakeup()
+    self._ewait(lambda: self.impending >= self.granted)
     msg = self.session._get(self._pred, timeout=timeout)
     if msg is None:
       self.drain = True
       self.granted = self.received
-      self.wakeup()
-      self.ewait(lambda: self.impending == self.received)
+      self._wakeup()
+      self._ewait(lambda: self.impending == self.received)
       self.drain = False
       self._grant()
-      self.wakeup()
+      self._wakeup()
       msg = self.session._get(self._pred, timeout=0)
       if msg is None:
         raise Empty()
     elif self._capacity() not in (0, UNLIMITED.value):
       self.granted += 1
-      self.wakeup()
+      self._wakeup()
     return msg
 
   def _grant(self):
@@ -736,7 +718,7 @@
     """
     self.started = True
     self._grant()
-    self.wakeup()
+    self._wakeup()
 
   @synchronized
   def stop(self):
@@ -745,8 +727,8 @@
     """
     self.started = False
     self._grant()
-    self.wakeup()
-    self.ewait(lambda: self.impending == self.received)
+    self._wakeup()
+    self._ewait(lambda: self.impending == self.received)
 
   @synchronized
   def close(self):
@@ -754,9 +736,9 @@
     Close the receiver.
     """
     self.closing = True
-    self.wakeup()
+    self._wakeup()
     try:
-      self.ewait(lambda: self.closed)
+      self._ewait(lambda: self.closed)
     finally:
       self.session.receivers.remove(self)
 
@@ -843,391 +825,7 @@
   def __repr__(self):
     return "Message(%r)" % self.content
 
-class Attachment:
-
-  def __init__(self, target):
-    self.target = target
-
-DURABLE_DEFAULT=True
-
-class Driver(Lockable):
-
-  def __init__(self, connection):
-    self.connection = connection
-    self._lock = self.connection._lock
-    self._condition = self.connection._condition
-    self._wakeup_cond = Condition()
-    self._socket = None
-    self._conn = None
-    self._connected = False
-    self._attachments = {}
-    self._modcount = self.connection._modcount
-    self.thread = Thread(target=self.run)
-    self.thread.setDaemon(True)
-    # XXX: need to figure out how to join on this thread
-
-  def start(self):
-    self.thread.start()
-
-  def wakeup(self):
-    self._wakeup_cond.acquire()
-    try:
-      self._wakeup_cond.notifyAll()
-    finally:
-      self._wakeup_cond.release()
-
-  def start(self):
-    self.thread.start()
-
-  def run(self):
-    while True:
-      self._wakeup_cond.acquire()
-      try:
-        if self.connection._modcount <= self._modcount:
-          self._wakeup_cond.wait(10)
-      finally:
-        self._wakeup_cond.release()
-      self.dispatch(self.connection._modcount)
-
-  @synchronized
-  def dispatch(self, modcount):
-    try:
-      if self._conn is None and self.connection._connected:
-        self.connect()
-      elif self._conn is not None and not self.connection._connected:
-        self.disconnect()
-
-      if self._conn is not None:
-        for ssn in self.connection.sessions.values():
-          self.attach(ssn)
-          self.process(ssn)
-
-      exi = None
-    except:
-      exi = sys.exc_info()
-
-    if exi:
-      msg = compat.format_exc()
-      recoverable = ["aborted", "Connection refused", "SessionDetached", "Connection reset by peer",
-                     "Bad file descriptor", "start timed out", "Broken pipe"]
-      for r in recoverable:
-        if self.connection.reconnect and r in msg:
-          print "waiting to retry"
-          self.reset()
-          time.sleep(3)
-          print "retrying..."
-          return
-      else:
-        self.connection.error = (msg,)
-
-    self._modcount = modcount
-    self.notifyAll()
-
-  def connect(self):
-    if self._conn is not None:
-      return
-    try:
-      self._socket = connect(self.connection.host, self.connection.port)
-    except socket.error, e:
-      raise ConnectError(e)
-    self._conn = connection.Connection(self._socket)
-    try:
-      self._conn.start(timeout=10)
-      self._connected = True
-    except connection.VersionError, e:
-      raise ConnectError(e)
-    except Timeout:
-      print "start timed out"
-      raise ConnectError("start timed out")
-
-  def disconnect(self):
-    self._conn.close()
-    self.reset()
-
-  def reset(self):
-    self._conn = None
-    self._connected = False
-    self._attachments.clear()
-    for ssn in self.connection.sessions.values():
-      for m in ssn.acked + ssn.unacked + ssn.incoming:
-        m._transfer_id = None
-      for rcv in ssn.receivers:
-        rcv.impending = rcv.received
-
-  def connected(self):
-    return self._conn is not None
-
-  def attach(self, ssn):
-    _ssn = self._attachments.get(ssn)
-    if _ssn is None:
-      _ssn = self._conn.session(ssn.name, delegate=delegate(self, ssn))
-      _ssn.auto_sync = False
-      _ssn.invoke_lock = self._lock
-      _ssn.lock = self._lock
-      _ssn.condition = self._condition
-      if ssn.transactional:
-        # XXX: adding an attribute to qpid.session.Session
-        _ssn.acked = []
-        _ssn.tx_select()
-      self._attachments[ssn] = _ssn
-
-    for snd in ssn.senders:
-      self.link_out(snd)
-    for rcv in ssn.receivers:
-      self.link_in(rcv)
-
-    if ssn.closing:
-      _ssn.close()
-      del self._attachments[ssn]
-
-  def _exchange_query(self, ssn, address):
-    # XXX: auto sync hack is to avoid deadlock on future
-    result = ssn.exchange_query(name=address, sync=True)
-    ssn.sync()
-    return result.get()
-
-  def link_out(self, snd):
-    _ssn = self._attachments[snd.session]
-    _snd = self._attachments.get(snd)
-    if _snd is None:
-      _snd = Attachment(snd)
-      node, _snd._subject = parse_addr(snd.target)
-      result = self._exchange_query(_ssn, node)
-      if result.not_found:
-        # XXX: should check 'create' option
-        _ssn.queue_declare(queue=node, durable=DURABLE_DEFAULT, sync=True)
-        _ssn.sync()
-        _snd._exchange = ""
-        _snd._routing_key = node
-      else:
-        _snd._exchange = node
-        _snd._routing_key = _snd._subject
-      self._attachments[snd] = _snd
-
-    if snd.closed:
-      del self._attachments[snd]
-      return None
-    else:
-      return _snd
-
-  def link_in(self, rcv):
-    _ssn = self._attachments[rcv.session]
-    _rcv = self._attachments.get(rcv)
-    if _rcv is None:
-      _rcv = Attachment(rcv)
-      result = self._exchange_query(_ssn, rcv.source)
-      if result.not_found:
-        _rcv._queue = rcv.source
-        # XXX: should check 'create' option
-        _ssn.queue_declare(queue=_rcv._queue, durable=DURABLE_DEFAULT)
-      else:
-        _rcv._queue = "%s.%s" % (rcv.session.name, rcv.destination)
-        _ssn.queue_declare(queue=_rcv._queue, durable=DURABLE_DEFAULT, exclusive=True, auto_delete=True)
-        if rcv.filter is None:
-          f = FILTER_DEFAULTS[result.type]
-        else:
-          f = rcv.filter
-        f._bind(_ssn, rcv.source, _rcv._queue)
-      _ssn.message_subscribe(queue=_rcv._queue, destination=rcv.destination)
-      _ssn.message_set_flow_mode(rcv.destination, _ssn.flow_mode.credit, sync=True)
-      self._attachments[rcv] = _rcv
-      # XXX: need to kill syncs
-      _ssn.sync()
-
-    if rcv.closing:
-      _ssn.message_cancel(rcv.destination, sync=True)
-      # XXX: need to kill syncs
-      _ssn.sync()
-      del self._attachments[rcv]
-      rcv.closed = True
-      return None
-    else:
-      return _rcv
-
-  def process(self, ssn):
-    if ssn.closing: return
-
-    _ssn = self._attachments[ssn]
-
-    while ssn.outgoing:
-      msg = ssn.outgoing[0]
-      snd = msg._sender
-      self.send(snd, msg)
-      ssn.outgoing.pop(0)
-
-    for rcv in ssn.receivers:
-      self.process_receiver(rcv)
-
-    if ssn.acked:
-      messages = ssn.acked[:]
-      ids = RangedSet(*[m._transfer_id for m in messages if m._transfer_id is not None])
-      for range in ids:
-        _ssn.receiver._completed.add_range(range)
-      ch = _ssn.channel
-      if ch is None:
-        raise SessionDetached()
-      ch.session_completed(_ssn.receiver._completed)
-      _ssn.message_accept(ids, sync=True)
-      # XXX: really need to make this async so that we don't give up the lock
-      _ssn.sync()
-
-      for m in messages:
-        ssn.acked.remove(m)
-        if ssn.transactional:
-          _ssn.acked.append(m)
-
-    if ssn.committing:
-      _ssn.tx_commit(sync=True)
-      # XXX: need to kill syncs
-      _ssn.sync()
-      del _ssn.acked[:]
-      ssn.committing = False
-      ssn.committed = True
-      ssn.aborting = False
-      ssn.aborted = False
-
-    if ssn.aborting:
-      for rcv in ssn.receivers:
-        _ssn.message_stop(rcv.destination)
-      _ssn.sync()
-
-      messages = _ssn.acked + ssn.unacked + ssn.incoming
-      ids = RangedSet(*[m._transfer_id for m in messages])
-      for range in ids:
-        _ssn.receiver._completed.add_range(range)
-      _ssn.channel.session_completed(_ssn.receiver._completed)
-      _ssn.message_release(ids)
-      _ssn.tx_rollback(sync=True)
-      _ssn.sync()
-
-      del ssn.incoming[:]
-      del ssn.unacked[:]
-      del _ssn.acked[:]
-
-      for rcv in ssn.receivers:
-        rcv.impending = rcv.received
-        rcv.returned = rcv.received
-        # XXX: do we need to update granted here as well?
-
-      for rcv in ssn.receivers:
-        self.process_receiver(rcv)
-
-      ssn.aborting = False
-      ssn.aborted = True
-      ssn.committing = False
-      ssn.committed = False
-
-  def grant(self, rcv):
-    _ssn = self._attachments[rcv.session]
-    _rcv = self.link_in(rcv)
-
-    if rcv.granted is UNLIMITED:
-      if rcv.impending is UNLIMITED:
-        delta = 0
-      else:
-        delta = UNLIMITED
-    elif rcv.impending is UNLIMITED:
-      delta = -1
-    else:
-      delta = max(rcv.granted, rcv.received) - rcv.impending
-
-    if delta is UNLIMITED:
-      _ssn.message_flow(rcv.destination, _ssn.credit_unit.byte, UNLIMITED.value)
-      _ssn.message_flow(rcv.destination, _ssn.credit_unit.message, UNLIMITED.value)
-      rcv.impending = UNLIMITED
-    elif delta > 0:
-      _ssn.message_flow(rcv.destination, _ssn.credit_unit.byte, UNLIMITED.value)
-      _ssn.message_flow(rcv.destination, _ssn.credit_unit.message, delta)
-      rcv.impending += delta
-    elif delta < 0:
-      if rcv.drain:
-        _ssn.message_flush(rcv.destination, sync=True)
-      else:
-        _ssn.message_stop(rcv.destination, sync=True)
-      # XXX: need to kill syncs
-      _ssn.sync()
-      rcv.impending = rcv.received
-      self.grant(rcv)
-
-  def process_receiver(self, rcv):
-    if rcv.closed: return
-    self.grant(rcv)
-
-  def send(self, snd, msg):
-    _ssn = self._attachments[snd.session]
-    _snd = self.link_out(snd)
-
-    # XXX: what if subject is specified for a normal queue?
-    if _snd._routing_key is None:
-      rk = msg.subject
-    else:
-      rk = _snd._routing_key
-    # XXX: do we need to query to figure out how to create the reply-to interoperably?
-    if msg.reply_to:
-      rt = _ssn.reply_to(*parse_addr(msg.reply_to))
-    else:
-      rt = None
-    dp = _ssn.delivery_properties(routing_key=rk)
-    mp = _ssn.message_properties(message_id=msg.id,
-                                 user_id=msg.user_id,
-                                 reply_to=rt,
-                                 correlation_id=msg.correlation_id,
-                                 content_type=msg.content_type,
-                                 application_headers=msg.properties)
-    if msg.subject is not None:
-      if mp.application_headers is None:
-        mp.application_headers = {}
-      mp.application_headers["subject"] = msg.subject
-    if msg.to is not None:
-      if mp.application_headers is None:
-        mp.application_headers = {}
-      mp.application_headers["to"] = msg.to
-    if msg.durable:
-      dp.delivery_mode = delivery_mode.persistent
-    enc, dec = get_codec(msg.content_type)
-    body = enc(msg.content)
-    _ssn.message_transfer(destination=_snd._exchange,
-                          message=Message010(dp, mp, body),
-                          sync=True)
-    log.debug("SENT [%s] %s", snd.session, msg)
-    # XXX: really need to make this async so that we don't give up the lock
-    _ssn.sync()
-    # XXX: should we log the ack somehow too?
-
-  @synchronized
-  def _message_transfer(self, ssn, cmd):
-    m = Message010(cmd.payload)
-    m.headers = cmd.headers
-    m.id = cmd.id
-    msg = self._decode(m)
-    rcv = ssn.receivers[int(cmd.destination)]
-    msg._receiver = rcv
-    rcv.received += 1
-    log.debug("RECV [%s] %s", ssn, msg)
-    ssn.incoming.append(msg)
-    self.notifyAll()
-    return INCOMPLETE
-
-  def _decode(self, message):
-    dp = message.get("delivery_properties")
-    mp = message.get("message_properties")
-    ap = mp.application_headers
-    enc, dec = get_codec(mp.content_type)
-    content = dec(message.body)
-    msg = Message(content)
-    msg.id = mp.message_id
-    if ap is not None:
-      msg.to = ap.get("to")
-      msg.subject = ap.get("subject")
-    msg.user_id = mp.user_id
-    if mp.reply_to is not None:
-      msg.reply_to = reply_to2addr(mp.reply_to)
-    msg.correlation_id = mp.correlation_id
-    msg.durable = dp.delivery_mode == delivery_mode.persistent
-    msg.properties = mp.application_headers
-    msg.content_type = mp.content_type
-    msg._transfer_id = message.id
-    return msg
-
-__all__ = ["Connection", "Pattern", "Session", "Sender", "Receiver", "Message",
-           "Empty", "timestamp", "uuid4"]
+__all__ = ["Connection", "Session", "Sender", "Receiver", "Pattern", "Message",
+           "ConnectionError", "ConnectError", "SessionError", "Disconnected",
+           "SendError", "InsufficientCapacity", "ReceiveError", "Empty",
+           "timestamp", "uuid4", "UNLIMITED", "AMQP_PORT", "AMQPS_PORT"]

Modified: qpid/branches/java-broker-0-10/qpid/python/qpid/ops.py
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/python/qpid/ops.py?rev=821779&r1=821778&r2=821779&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/python/qpid/ops.py (original)
+++ qpid/branches/java-broker-0-10/qpid/python/qpid/ops.py Mon Oct  5 12:51:57 2009
@@ -74,10 +74,7 @@
 
   def dispatch(self, target, *args):
     handler = "do_%s" % self.NAME
-    if hasattr(target, handler):
-      getattr(target, handler)(self, *args)
-    else:
-      print "UNHANDLED:", target, args
+    getattr(target, handler)(self, *args)
 
   def __repr__(self, extras=()):
     return "%s(%s)" % (self.__class__.__name__,

Modified: qpid/branches/java-broker-0-10/qpid/python/qpid/tests/messaging.py
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/python/qpid/tests/messaging.py?rev=821779&r1=821778&r2=821779&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/python/qpid/tests/messaging.py (original)
+++ qpid/branches/java-broker-0-10/qpid/python/qpid/tests/messaging.py Mon Oct  5 12:51:57 2009
@@ -23,7 +23,8 @@
 import time
 from qpid.tests import Test
 from qpid.harness import Skipped
-from qpid.messaging import Connection, ConnectError, Disconnected, Empty, Message, UNLIMITED, uuid4
+from qpid.messaging import Connection, ConnectError, Disconnected, Empty, \
+    InsufficientCapacity, Message, UNLIMITED, uuid4
 from Queue import Queue, Empty as QueueEmpty
 
 class Base(Test):
@@ -71,13 +72,15 @@
     ssn.acknowledge()
     assert msg.content == content, "expected %r, got %r" % (content, msg.content)
 
-  def drain(self, rcv, limit=None):
+  def drain(self, rcv, limit=None, timeout=0, expected=None):
     contents = []
     try:
       while limit is None or len(contents) < limit:
-        contents.append(rcv.fetch(0).content)
+        contents.append(rcv.fetch(timeout=timeout).content)
     except Empty:
       pass
+    if expected is not None:
+      assert expected == contents, "expected %s, got %s" % (expected, contents)
     return contents
 
   def assertEmpty(self, rcv):
@@ -224,27 +227,27 @@
 
   # XXX, we need a convenient way to assert that required queues are
   # empty on setup, and possibly also to drain queues on teardown
-  def testAcknowledge(self):
+  def ackTest(self, acker, ack_capacity=None):
     # send a bunch of messages
     snd = self.ssn.sender("test-ack-queue")
-    tid = "a"
-    contents = ["testAcknowledge[%s, %s]" % (i, tid) for i in range(10)]
+    contents = [self.content("ackTest", i) for i in range(15)]
     for c in contents:
       snd.send(c)
 
     # drain the queue, verify the messages are there and then close
     # without acking
     rcv = self.ssn.receiver(snd.target)
-    assert contents == self.drain(rcv)
+    self.drain(rcv, expected=contents)
     self.ssn.close()
 
     # drain the queue again, verify that they are all the messages
     # were requeued, and ack this time before closing
     self.ssn = self.conn.session()
+    if ack_capacity is not None:
+      self.ssn.ack_capacity = ack_capacity
     rcv = self.ssn.receiver("test-ack-queue")
-    drained = self.drain(rcv)
-    assert contents == drained, "expected %s, got %s" % (contents, drained)
-    self.ssn.acknowledge()
+    self.drain(rcv, expected=contents)
+    acker(self.ssn)
     self.ssn.close()
 
     # drain the queue a final time and verify that the messages were
@@ -253,6 +256,33 @@
     rcv = self.ssn.receiver("test-ack-queue")
     self.assertEmpty(rcv)
 
+  def testAcknowledge(self):
+    self.ackTest(lambda ssn: ssn.acknowledge())
+
+  def testAcknowledgeAsync(self):
+    self.ackTest(lambda ssn: ssn.acknowledge(sync=False))
+
+  def testAcknowledgeAsyncAckCap0(self):
+    try:
+      try:
+        self.ackTest(lambda ssn: ssn.acknowledge(sync=False), 0)
+        assert False, "acknowledge shouldn't succeed with ack_capacity of zero"
+      except InsufficientCapacity:
+        pass
+    finally:
+      self.ssn.ack_capacity = UNLIMITED
+      self.drain(self.ssn.receiver("test-ack-queue"))
+      self.ssn.acknowledge()
+
+  def testAcknowledgeAsyncAckCap1(self):
+    self.ackTest(lambda ssn: ssn.acknowledge(sync=False), 1)
+
+  def testAcknowledgeAsyncAckCap5(self):
+    self.ackTest(lambda ssn: ssn.acknowledge(sync=False), 5)
+
+  def testAcknowledgeAsyncAckCapUNLIMITED(self):
+    self.ackTest(lambda ssn: ssn.acknowledge(sync=False), UNLIMITED)
+
   def send(self, ssn, queue, base, count=1):
     snd = ssn.sender(queue)
     contents = []
@@ -543,6 +573,48 @@
   def testSendMap(self):
     self.checkContent({"testSendMap": self.test_id, "pie": "blueberry", "pi": 3.14})
 
+  def asyncTest(self, capacity):
+    self.snd.capacity = capacity
+    msgs = [self.content("asyncTest", i) for i in range(15)]
+    for m in msgs:
+      self.snd.send(m, sync=False)
+    drained = self.drain(self.rcv, timeout=self.delay())
+    assert msgs == drained, "expected %s, got %s" % (msgs, drained)
+    self.ssn.acknowledge()
+
+  def testSendAsyncCapacity0(self):
+    try:
+      self.asyncTest(0)
+      assert False, "send shouldn't succeed with zero capacity"
+    except InsufficientCapacity:
+      # this is expected
+      pass
+
+  def testSendAsyncCapacity1(self):
+    self.asyncTest(1)
+
+  def testSendAsyncCapacity5(self):
+    self.asyncTest(5)
+
+  def testSendAsyncCapacityUNLIMITED(self):
+    self.asyncTest(UNLIMITED)
+
+  def testCapacityTimeout(self):
+    self.snd.capacity = 1
+    msgs = []
+    caught = False
+    while len(msgs) < 100:
+      m = self.content("testCapacity", len(msgs))
+      try:
+        self.snd.send(m, sync=False, timeout=0)
+        msgs.append(m)
+      except InsufficientCapacity:
+        caught = True
+        break
+    self.drain(self.rcv, expected=msgs)
+    self.ssn.acknowledge()
+    assert caught, "did not exceed capacity"
+
 class MessageTests(Base):
 
   def testCreateString(self):

Modified: qpid/branches/java-broker-0-10/qpid/python/qpid/util.py
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/python/qpid/util.py?rev=821779&r1=821778&r2=821779&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/python/qpid/util.py (original)
+++ qpid/branches/java-broker-0-10/qpid/python/qpid/util.py Mon Oct  5 12:51:57 2009
@@ -134,3 +134,9 @@
     if self.port:
       s += ":%s" % self.port
     return s
+
+def default(value, default):
+  if value is None:
+    return default
+  else:
+    return value

Modified: qpid/branches/java-broker-0-10/qpid/python/tests/datatypes.py
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/python/tests/datatypes.py?rev=821779&r1=821778&r2=821779&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/python/tests/datatypes.py (original)
+++ qpid/branches/java-broker-0-10/qpid/python/tests/datatypes.py Mon Oct  5 12:51:57 2009
@@ -148,6 +148,34 @@
     assert range.lower == 0
     assert range.upper == 8
 
+  def testEmpty(self):
+    s = RangedSet()
+    assert s.empty()
+    s.add(0, -1)
+    assert s.empty()
+    s.add(0, 0)
+    assert not s.empty()
+
+  def testMinMax(self):
+    s = RangedSet()
+    assert s.max() is None
+    assert s.min() is None
+    s.add(0, 10)
+    assert s.max() == 10
+    assert s.min() == 0
+    s.add(0, 5)
+    assert s.max() == 10
+    assert s.min() == 0
+    s.add(0, 11)
+    assert s.max() == 11
+    assert s.min() == 0
+    s.add(15, 20)
+    assert s.max() == 20
+    assert s.min() == 0
+    s.add(-10, -5)
+    assert s.max() == 20
+    assert s.min() == -10
+
 class RangeTest(TestCase):
 
   def testIntersect1(self):

Modified: qpid/branches/java-broker-0-10/qpid/python/tests_0-10/alternate_exchange.py
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/python/tests_0-10/alternate_exchange.py?rev=821779&r1=821778&r2=821779&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/python/tests_0-10/alternate_exchange.py (original)
+++ qpid/branches/java-broker-0-10/qpid/python/tests_0-10/alternate_exchange.py Mon Oct  5 12:51:57 2009
@@ -141,7 +141,61 @@
             session.exchange_delete(exchange="e")
             session.exchange_delete(exchange="alternate")
             self.assertEquals(530, e.args[0].error_code)
-            
+
+
+    def test_modify_existing_exchange_alternate(self):
+        """
+        Ensure that attempting to modify an exhange to change
+        the alternate throws an exception
+        """
+        session = self.session
+        session.exchange_declare(exchange="alt1", type="direct")
+        session.exchange_declare(exchange="alt2", type="direct")
+        session.exchange_declare(exchange="onealternate", type="fanout", alternate_exchange="alt1")
+        try:
+            # attempt to change the alternate on an already existing exchange
+            session.exchange_declare(exchange="onealternate", type="fanout", alternate_exchange="alt2")
+            self.fail("Expected changing an alternate on an existing exchange to fail")
+        except SessionException, e:
+            self.assertEquals(530, e.args[0].error_code)
+        session = self.conn.session("alternate", 2)
+        session.exchange_delete(exchange="onealternate")
+        session.exchange_delete(exchange="alt2")
+        session.exchange_delete(exchange="alt1")
+
+
+    def test_add_alternate_to_exchange(self):
+        """
+        Ensure that attempting to modify an exhange by adding
+        an alternate throws an exception
+        """
+        session = self.session
+        session.exchange_declare(exchange="alt1", type="direct")
+        session.exchange_declare(exchange="noalternate", type="fanout")
+        try:
+            # attempt to add an alternate on an already existing exchange
+            session.exchange_declare(exchange="noalternate", type="fanout", alternate_exchange="alt1")
+            self.fail("Expected adding an alternate on an existing exchange to fail")
+        except SessionException, e:
+            self.assertEquals(530, e.args[0].error_code)
+        session = self.conn.session("alternate", 2)
+        session.exchange_delete(exchange="noalternate")
+        session.exchange_delete(exchange="alt1")
+
+
+    def test_del_alternate_to_exchange(self):
+        """
+        Ensure that attempting to modify an exhange by declaring
+        it again without an alternate does nothing
+        """
+        session = self.session
+        session.exchange_declare(exchange="alt1", type="direct")
+        session.exchange_declare(exchange="onealternate", type="fanout", alternate_exchange="alt1")
+        # attempt to re-declare without an alternate - silently ignore
+        session.exchange_declare(exchange="onealternate", type="fanout" )
+        session.exchange_delete(exchange="onealternate")
+        session.exchange_delete(exchange="alt1")
+
 
     def assertEmpty(self, queue):
         try:

Modified: qpid/branches/java-broker-0-10/qpid/python/tests_0-10/management.py
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/python/tests_0-10/management.py?rev=821779&r1=821778&r2=821779&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/python/tests_0-10/management.py (original)
+++ qpid/branches/java-broker-0-10/qpid/python/tests_0-10/management.py Mon Oct  5 12:51:57 2009
@@ -295,3 +295,25 @@
         sleep(1)
         self.assertEqual(handler.check(), "pass")
 
+    def test_connection_close(self):
+        """
+        Test management method for closing connection
+        """
+        self.startQmf()
+        conn = self.connect()
+        session = conn.session("my-named-session")
+
+        #using qmf find named session and close the corresponding connection:
+        qmf_ssn_object = self.qmf.getObjects(_class="session", name="my-named-session")[0]
+        qmf_ssn_object._connectionRef_.close()
+
+        #check that connection is closed
+        try:
+            conn.session("another-session")
+            self.fail("Expected failure from closed connection")
+        except: None
+        
+        #make sure that the named session has been closed and the name can be re-used
+        conn = self.connect()
+        session = conn.session("my-named-session")
+        session.queue_declare(queue="whatever", exclusive=True, auto_delete=True)

Propchange: qpid/branches/java-broker-0-10/qpid/python/tests_0-9/queue.py
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Mon Oct  5 12:51:57 2009
@@ -1 +1 @@
-/qpid/trunk/qpid/python/tests_0-9/queue.py:804203-807984
+/qpid/trunk/qpid/python/tests_0-9/queue.py:804203-816580

Modified: qpid/branches/java-broker-0-10/qpid/specs/management-schema.xml
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/specs/management-schema.xml?rev=821779&r1=821778&r2=821779&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/specs/management-schema.xml (original)
+++ qpid/branches/java-broker-0-10/qpid/specs/management-schema.xml Mon Oct  5 12:51:57 2009
@@ -169,7 +169,7 @@
     <property name="type"        type="sstr"  access="RO"/>
     <property name="durable"     type="bool"  access="RO"/>
     <property name="autoDelete"  type="bool"  access="RO"/>
-    <property name="altExchange" type="objId" access="RO" optional="y"/>
+    <property name="altExchange" type="objId" references="Exchange" access="RO" optional="y"/>
     <property name="arguments"   type="map"   access="RO" desc="Arguments supplied in exchange.declare"/>
 
     <statistic name="producerCount" type="hilo32"  desc="Current producers on exchange"/>



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org