You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2009/10/05 14:52:10 UTC
svn commit: r821779 [11/11] - in /qpid/branches/java-broker-0-10/qpid: ./
cpp/ cpp/bindings/qmf/ cpp/bindings/qmf/python/
cpp/bindings/qmf/python/qmf/ cpp/bindings/qmf/ruby/ cpp/bindings/qmf/tests/
cpp/examples/messaging/ cpp/include/qmf/ cpp/include/q...
Modified: qpid/branches/java-broker-0-10/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/QpidTestCase.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/QpidTestCase.java?rev=821779&r1=821778&r2=821779&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/QpidTestCase.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/QpidTestCase.java Mon Oct 5 12:51:57 2009
@@ -21,6 +21,7 @@
import junit.framework.TestResult;
import org.apache.commons.configuration.ConfigurationException;
import org.apache.commons.configuration.XMLConfiguration;
+import org.apache.qpid.AMQException;
import org.apache.qpid.client.AMQConnection;
import org.apache.qpid.client.AMQConnectionFactory;
import org.apache.qpid.client.transport.TransportConnection;
@@ -30,6 +31,7 @@
import org.apache.qpid.server.registry.ApplicationRegistry;
import org.apache.qpid.server.registry.ConfigurationFileApplicationRegistry;
import org.apache.qpid.server.store.DerbyMessageStore;
+import org.apache.qpid.url.URLSyntaxException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -73,6 +75,7 @@
protected long RECEIVE_TIMEOUT = 1000l;
private Map<String, String> _setProperties = new HashMap<String, String>();
+ private XMLConfiguration _testConfiguration = new XMLConfiguration();
/**
* Some tests are excluded when the property test.excludes is set to true.
@@ -183,8 +186,7 @@
public static final String QUEUE = "queue";
public static final String TOPIC = "topic";
/** Map to hold test defined environment properties */
- private Map<String,String> _env;
-
+ private Map<String, String> _env;
public QpidTestCase(String name)
{
@@ -335,7 +337,7 @@
latch.countDown();
}
- if (latch != null && line.contains(stopped))
+ if (!seenReady && line.contains(stopped))
{
stopLine = line;
}
@@ -368,7 +370,9 @@
/**
* Return the management portin use by the broker on this main port
+ *
* @param mainPort the broker's main port.
+ *
* @return the management port that corresponds to the broker on the given port
*/
protected int getManagementPort(int mainPort)
@@ -415,9 +419,14 @@
{
port = getPort(port);
+ // Save any configuratio changes that have been made
+ saveTestConfiguration();
+
Process process = null;
if (_broker.equals(VM))
{
+ setConfigurationProperty("management.jmxport", String.valueOf(getManagementPort(port)));
+ saveTestConfiguration();
// create an in_VM broker
ApplicationRegistry.initialise(new ConfigurationFileApplicationRegistry(_configFile), port);
TransportConnection.createVMBroker(port);
@@ -438,15 +447,35 @@
env.put("PATH", env.get("PATH").concat(File.pathSeparator + qpidHome + "/bin"));
//Add the test name to the broker run.
- env.put("QPID_PNAME", "-DPNAME=\"" + _testName + "\"");
+ // DON'T change PNAME, qpid.stop needs this value.
+ env.put("QPID_PNAME", "-DPNAME=QPBRKR -DTNAME=\"" + _testName + "\"");
env.put("QPID_WORK", System.getProperty("QPID_WORK"));
// Add all the environment settings the test requested
if (!_env.isEmpty())
{
- for(Map.Entry<String,String> entry : _env.entrySet())
+ for (Map.Entry<String, String> entry : _env.entrySet())
+ {
+ env.put(entry.getKey(), entry.getValue());
+ }
+ }
+
+ String QPID_OPTS = " ";
+ // Add all the specified system properties to QPID_OPTS
+ if (!_setProperties.isEmpty())
+ {
+ for (String key : _setProperties.keySet())
+ {
+ QPID_OPTS += "-D" + key + "=" + System.getProperty(key) + " ";
+ }
+
+ if (env.containsKey("QPID_OPTS"))
{
- env.put(entry.getKey() ,entry.getValue());
+ env.put("QPID_OPTS", env.get("QPID_OPTS") + QPID_OPTS);
+ }
+ else
+ {
+ env.put("QPID_OPTS", QPID_OPTS);
}
}
@@ -484,6 +513,27 @@
_brokers.put(port, process);
}
+ public String getTestConfigFile()
+ {
+ String path = _output == null ? System.getProperty("java.io.tmpdir") : _output;
+ return path + "/" + getTestQueueName() + ".xml";
+ }
+
+ protected void saveTestConfiguration() throws ConfigurationException
+ {
+ String testConfig = getTestConfigFile();
+ //Specifiy the test configuration
+ setSystemProperty("test.config", testConfig);
+
+ // This is a work
+ if (_testConfiguration.isEmpty())
+ {
+ _testConfiguration.addProperty("test", getTestQueueName());
+ }
+
+ _testConfiguration.save(getTestConfigFile());
+ }
+
public void cleanBroker()
{
if (_brokerClean != null)
@@ -565,18 +615,12 @@
storeClass = bdb;
}
- // First we munge the config file and, if we're in a VM, set up an additional logfile
- XMLConfiguration configuration = new XMLConfiguration(_configFile);
- configuration.setProperty("virtualhosts.virtualhost." + virtualhost +
+
+ _testConfiguration.setProperty("virtualhosts.virtualhost." + virtualhost +
".store.class", storeClass.getName());
- configuration.setProperty("virtualhosts.virtualhost." + virtualhost +
+ _testConfiguration.setProperty("virtualhosts.virtualhost." + virtualhost +
".store." + DerbyMessageStore.ENVIRONMENT_PATH_PROPERTY,
- "${work}/" + virtualhost);
-
- File tmpFile = File.createTempFile("configFile", "test");
- tmpFile.deleteOnExit();
- configuration.save(tmpFile);
- _configFile = tmpFile;
+ "${QPID_WORK}/" + virtualhost);
}
/**
@@ -591,6 +635,10 @@
*/
protected String getConfigurationStringProperty(String property) throws ConfigurationException
{
+ // Call save Configuration to be sure we have saved the test specific
+ // file. As the optional status
+ saveTestConfiguration();
+
ServerConfiguration configuration = new ServerConfiguration(_configFile);
return configuration.getConfig().getString(property);
}
@@ -613,48 +661,9 @@
protected void setConfigurationProperty(String property, String value)
throws ConfigurationException, IOException
{
- XMLConfiguration configuration = new XMLConfiguration(_configFile);
-
- // If we are modifying a virtualhost value then we need to do so in
- // the virtualhost.xml file as these values overwrite the values in
- // the main config.xml file
- if (property.startsWith("virtualhosts"))
- {
- // So locate the virtualhost.xml file and use the ServerConfiguration
- // flatConfig method to get the interpolated value.
- String vhostConfigFile = ServerConfiguration.
- flatConfig(_configFile).getString("virtualhosts");
-
- // Load the vhostConfigFile
- XMLConfiguration vhostConfiguration = new XMLConfiguration(vhostConfigFile);
-
- // Set the value specified in to the vhostConfig.
- // Remembering that property will be 'virtualhosts.virtulhost....'
- // so we need to take off the 'virtualhosts.' from the start.
- vhostConfiguration.setProperty(property.substring(property.indexOf(".") + 1), value);
-
- // Write out the new virtualhost config file
- File tmpFile = File.createTempFile("virtualhost-configFile", ".xml");
- tmpFile.deleteOnExit();
- vhostConfiguration.save(tmpFile);
-
- // Change the property and value to be the new virtualhosts file
- // so that then update the value in the main config file.
- property = "virtualhosts";
- value = tmpFile.getAbsolutePath();
- }
-
- configuration.setProperty(property, value);
-
- // Write the new server config file
- File tmpFile = File.createTempFile("configFile", ".xml");
- tmpFile.deleteOnExit();
- configuration.save(tmpFile);
-
- _logger.info("Qpid Test Case now using configuration File:"
- + tmpFile.getAbsolutePath());
-
- _configFile = tmpFile;
+ //Write the value in to this configuration file which will override the
+ // defaults.
+ _testConfiguration.setProperty(property, value);
}
/**
@@ -695,14 +704,13 @@
* Add an environtmen variable for the external broker environment
*
* @param property the property to set
- * @param value the value to set it to
+ * @param value the value to set it to
*/
protected void setBrokerEnvironment(String property, String value)
{
- _env.put(property,value);
+ _env.put(property, value);
}
-
/**
* Check whether the broker is an 0.8
*
@@ -720,7 +728,7 @@
protected boolean isJavaBroker()
{
- return _brokerLanguage.equals("java");
+ return _brokerLanguage.equals("java") || _broker.equals("vm");
}
protected boolean isCppBroker()
@@ -831,7 +839,7 @@
*
* @throws Exception if there is an error getting the connection
*/
- public Connection getConnection(String username, String password) throws Exception
+ public Connection getConnection(String username, String password) throws JMSException, NamingException
{
_logger.info("get Connection");
Connection con = getConnectionFactory().createConnection(username, password);
@@ -840,7 +848,7 @@
return con;
}
- public Connection getConnection(String username, String password, String id) throws Exception
+ public Connection getConnection(String username, String password, String id) throws JMSException, URLSyntaxException, AMQException, NamingException
{
_logger.info("get Connection");
Connection con;
@@ -860,6 +868,7 @@
/**
* Return a uniqueName for this test.
* In this case it returns a queue Named by the TestCase and TestName
+ *
* @return String name for a queue
*/
protected String getTestQueueName()
Modified: qpid/branches/java-broker-0-10/qpid/java/systests/src/main/java/org/apache/qpid/util/LogMonitorTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/systests/src/main/java/org/apache/qpid/util/LogMonitorTest.java?rev=821779&r1=821778&r2=821779&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/systests/src/main/java/org/apache/qpid/util/LogMonitorTest.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/systests/src/main/java/org/apache/qpid/util/LogMonitorTest.java Mon Oct 5 12:51:57 2009
@@ -158,18 +158,6 @@
validateLogDoesNotContainsMessage(_monitor, notLogged);
}
- public void testWaitForMessage_Found() throws IOException
- {
- String message = getName() + ": Test Message";
-
- long TIME_OUT = 2000;
-
- logMessageWithDelay(message, TIME_OUT / 2);
-
- assertTrue("Message was not logged ",
- _monitor.waitForMessage(message, TIME_OUT));
- }
-
public void testWaitForMessage_Timeout() throws IOException
{
String message = getName() + ": Test Message";
Modified: qpid/branches/java-broker-0-10/qpid/java/test-profiles/010Excludes
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/test-profiles/010Excludes?rev=821779&r1=821778&r2=821779&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/test-profiles/010Excludes (original)
+++ qpid/branches/java-broker-0-10/qpid/java/test-profiles/010Excludes Mon Oct 5 12:51:57 2009
@@ -83,3 +83,12 @@
// CPP Broker does not have a JMX interface to test
org.apache.qpid.management.jmx.*
+// JMX is used in this test for validation
+org.apache.qpid.server.queue.ModelTest#*
+
+
+// 0-10 is not supported by the MethodRegistry
+org.apache.qpid.test.unit.close.JavaServerCloseRaceConditionTest#*
+
+// QPID-2084 : this test needs more work for 0-10
+org.apache.qpid.test.unit.client.DynamicQueueExchangeCreateTest#*
Propchange: qpid/branches/java-broker-0-10/qpid/java/test-profiles/010Excludes
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Mon Oct 5 12:51:57 2009
@@ -1 +1 @@
-/qpid/trunk/qpid/java/test-profiles/010Excludes:799241-807984
+/qpid/trunk/qpid/java/test-profiles/010Excludes:799241-816580
Modified: qpid/branches/java-broker-0-10/qpid/java/test-profiles/08Excludes
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/test-profiles/08Excludes?rev=821779&r1=821778&r2=821779&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/test-profiles/08Excludes (original)
+++ qpid/branches/java-broker-0-10/qpid/java/test-profiles/08Excludes Mon Oct 5 12:51:57 2009
@@ -15,3 +15,8 @@
org.apache.qpid.client.SessionCreateTest#*
org.apache.qpid.test.client.RollbackOrderTest#*
+
+// QPID-2097 exclude it from the InVM test runs until InVM JMX Interface is reliable
+org.apache.qpid.management.jmx.ManagementActorLoggingTest#*
+org.apache.qpid.server.queue.ModelTest#*
+
Propchange: qpid/branches/java-broker-0-10/qpid/java/test-profiles/08Excludes
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Mon Oct 5 12:51:57 2009
@@ -1 +1 @@
-/qpid/trunk/qpid/java/test-profiles/08Excludes:799241-807984
+/qpid/trunk/qpid/java/test-profiles/08Excludes:799241-816580
Propchange: qpid/branches/java-broker-0-10/qpid/java/test-profiles/08StandaloneExcludes
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Mon Oct 5 12:51:57 2009
@@ -1 +1 @@
-/qpid/trunk/qpid/java/test-profiles/08StandaloneExcludes:799241-807984
+/qpid/trunk/qpid/java/test-profiles/08StandaloneExcludes:799241-816580
Propchange: qpid/branches/java-broker-0-10/qpid/java/test-profiles/08TransientExcludes
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Mon Oct 5 12:51:57 2009
@@ -1 +1 @@
-/qpid/trunk/qpid/java/test-profiles/08TransientExcludes:799241-807984
+/qpid/trunk/qpid/java/test-profiles/08TransientExcludes:799241-816580
Modified: qpid/branches/java-broker-0-10/qpid/java/test-profiles/Excludes
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/test-profiles/Excludes?rev=821779&r1=821778&r2=821779&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/test-profiles/Excludes (original)
+++ qpid/branches/java-broker-0-10/qpid/java/test-profiles/Excludes Mon Oct 5 12:51:57 2009
@@ -13,5 +13,9 @@
org.apache.qpid.server.logging.BrokerLoggingTest#testBrokerShutdownStopped
org.apache.qpid.server.logging.VirtualHostLoggingTest#testVirtualhostClosure
org.apache.qpid.server.logging.MemoryMessageStoreLoggingTest#testMessageStoreClose
-org.apache.qpid.server.logging.DerbyMessageStoreLoggingTest#testMessageStoreClose
+// QPID-XXX : Test fails to start external broker due to Derby Exception.
+org.apache.qpid.server.logging.DerbyMessageStoreLoggingTest#*
+
+// QPID-2081 :The configuration changes are now highlighting the close race condition
+org.apache.qpid.server.security.acl.SimpleACLTest#*
Propchange: qpid/branches/java-broker-0-10/qpid/java/test-profiles/Excludes
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Mon Oct 5 12:51:57 2009
@@ -1 +1 @@
-/qpid/trunk/qpid/java/test-profiles/Excludes:799241-807984
+/qpid/trunk/qpid/java/test-profiles/Excludes:799241-816580
Propchange: qpid/branches/java-broker-0-10/qpid/java/test-profiles/XAExcludes
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Mon Oct 5 12:51:57 2009
@@ -1 +1 @@
-/qpid/trunk/qpid/java/test-profiles/XAExcludes:799241-807984
+/qpid/trunk/qpid/java/test-profiles/XAExcludes:799241-816580
Propchange: qpid/branches/java-broker-0-10/qpid/java/test-profiles/cpp.ssl.excludes
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Mon Oct 5 12:51:57 2009
@@ -1 +1 @@
-/qpid/trunk/qpid/java/test-profiles/cpp.ssl.excludes:799241-807984
+/qpid/trunk/qpid/java/test-profiles/cpp.ssl.excludes:799241-816580
Modified: qpid/branches/java-broker-0-10/qpid/java/test-profiles/cpp.testprofile
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/test-profiles/cpp.testprofile?rev=821779&r1=821778&r2=821779&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/test-profiles/cpp.testprofile (original)
+++ qpid/branches/java-broker-0-10/qpid/java/test-profiles/cpp.testprofile Mon Oct 5 12:51:57 2009
@@ -8,6 +8,7 @@
broker.module.ssl=${module.dir}/ssl.so
broker.module.cluster=${module.dir}/cluster.so
broker.module.store=${store.module.dir}/msgstore.so
+broker.stopped=Exception constructed
broker.modules=
broker.args=
Modified: qpid/branches/java-broker-0-10/qpid/java/test-profiles/default.testprofile
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/test-profiles/default.testprofile?rev=821779&r1=821778&r2=821779&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/test-profiles/default.testprofile (original)
+++ qpid/branches/java-broker-0-10/qpid/java/test-profiles/default.testprofile Mon Oct 5 12:51:57 2009
@@ -18,9 +18,10 @@
test.port=15672
test.mport=18999
+#Note : Management will start open second port on: mport + 100 : 19099
test.port.ssl=15671
-test.port.alt=15772
-test.port.alt.ssl=15771
+test.port.alt=25672
+test.port.alt.ssl=25671
test.exclude=true
profile.excludes=08TransientExcludes
Modified: qpid/branches/java-broker-0-10/qpid/python/Makefile
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/python/Makefile?rev=821779&r1=821778&r2=821779&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/python/Makefile (original)
+++ qpid/branches/java-broker-0-10/qpid/python/Makefile Mon Oct 5 12:51:57 2009
@@ -46,6 +46,12 @@
build: $(TARGETS)
+.PHONY: doc
+
+doc:
+ @mkdir -p $(BUILD)
+ epydoc qpid.messaging -o $(BUILD)/doc --no-private --no-sourcecode --include-log
+
install: build
install -d $(PYTHON_LIB)
Modified: qpid/branches/java-broker-0-10/qpid/python/qpid/datatypes.py
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/python/qpid/datatypes.py?rev=821779&r1=821778&r2=821779&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/python/qpid/datatypes.py (original)
+++ qpid/branches/java-broker-0-10/qpid/python/qpid/datatypes.py Mon Oct 5 12:51:57 2009
@@ -234,6 +234,24 @@
def add(self, lower, upper = None):
self.add_range(Range(lower, upper))
+ def empty(self):
+ for r in self.ranges:
+ if r.lower <= r.upper:
+ return False
+ return True
+
+ def max(self):
+ if self.ranges:
+ return self.ranges[-1].upper
+ else:
+ return None
+
+ def min(self):
+ if self.ranges:
+ return self.ranges[0].lower
+ else:
+ return None
+
def __iter__(self):
return iter(self.ranges)
Modified: qpid/branches/java-broker-0-10/qpid/python/qpid/delegates.py
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/python/qpid/delegates.py?rev=821779&r1=821778&r2=821779&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/python/qpid/delegates.py (original)
+++ qpid/branches/java-broker-0-10/qpid/python/qpid/delegates.py Mon Oct 5 12:51:57 2009
@@ -139,12 +139,18 @@
class Client(Delegate):
+ ppid = 0
+ try:
+ ppid = os.getppid()
+ except:
+ pass
+
PROPERTIES = {"product": "qpid python client",
"version": "development",
"platform": os.name,
"qpid.client_process": os.path.basename(sys.argv[0]),
"qpid.client_pid": os.getpid(),
- "qpid.client_ppid": os.getppid()}
+ "qpid.client_ppid": ppid}
def __init__(self, connection, username="guest", password="guest",
mechanism="PLAIN", heartbeat=None):
Modified: qpid/branches/java-broker-0-10/qpid/python/qpid/messaging.py
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/python/qpid/messaging.py?rev=821779&r1=821778&r2=821779&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/python/qpid/messaging.py (original)
+++ qpid/branches/java-broker-0-10/qpid/python/qpid/messaging.py Mon Oct 5 12:51:57 2009
@@ -6,9 +6,9 @@
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
-#
+#
# http://www.apache.org/licenses/LICENSE-2.0
-#
+#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -30,64 +30,18 @@
- protocol negotiation/multiprotocol impl
"""
-import connection, time, socket, sys, compat
from codec010 import StringCodec
-from datatypes import timestamp, uuid4, RangedSet, Message as Message010, Serial
-from exceptions import Timeout
+from concurrency import synchronized, Waiter
+from datatypes import timestamp, uuid4, Serial
from logging import getLogger
-from ops import PRIMITIVE, delivery_mode
-from session import Client, INCOMPLETE, SessionDetached
+from ops import PRIMITIVE
from threading import Thread, RLock, Condition
-from util import connect
+from util import default
log = getLogger("qpid.messaging")
static = staticmethod
-def synchronized(meth):
- def sync_wrapper(self, *args, **kwargs):
- self.lock()
- try:
- return meth(self, *args, **kwargs)
- finally:
- self.unlock()
- return sync_wrapper
-
-class Lockable(object):
-
- def lock(self):
- self._lock.acquire()
-
- def unlock(self):
- self._lock.release()
-
- def wait(self, predicate, timeout=None):
- passed = 0
- start = time.time()
- while not predicate():
- if timeout is None:
- # using the timed wait prevents keyboard interrupts from being
- # blocked while waiting
- self._condition.wait(3)
- elif passed < timeout:
- self._condition.wait(timeout - passed)
- else:
- return False
- passed = time.time() - start
- return True
-
- def notify(self):
- self._condition.notify()
-
- def notifyAll(self):
- self._condition.notifyAll()
-
-def default(value, default):
- if value is None:
- return default
- else:
- return value
-
AMQP_PORT = 5672
AMQPS_PORT = 5671
@@ -103,12 +57,19 @@
UNLIMITED = Constant("UNLIMITED", 0xFFFFFFFFL)
class ConnectionError(Exception):
+ """
+ The base class for all connection related exceptions.
+ """
pass
class ConnectError(ConnectionError):
+ """
+ Exception raised when there is an error connecting to the remote
+ peer.
+ """
pass
-class Connection(Lockable):
+class Connection:
"""
A Connection manages a group of L{Sessions<Session>} and connects
@@ -153,27 +114,27 @@
self._connected = False
self._lock = RLock()
self._condition = Condition(self._lock)
+ self._waiter = Waiter(self._condition)
self._modcount = Serial(0)
self.error = None
+ from driver import Driver
self._driver = Driver(self)
self._driver.start()
- def wakeup(self):
+ def _wait(self, predicate, timeout=None):
+ return self._waiter.wait(predicate, timeout=timeout)
+
+ def _wakeup(self):
self._modcount += 1
self._driver.wakeup()
- def catchup(self, exc=ConnectionError):
- mc = self._modcount
- self.wait(lambda: not self._driver._modcount < mc)
- self.check_error(exc)
-
- def check_error(self, exc=ConnectionError):
+ def _check_error(self, exc=ConnectionError):
if self.error:
raise exc(*self.error)
- def ewait(self, predicate, timeout=None, exc=ConnectionError):
- result = self.wait(lambda: self.error or predicate(), timeout)
- self.check_error(exc)
+ def _ewait(self, predicate, timeout=None, exc=ConnectionError):
+ result = self._wait(lambda: self.error or predicate(), timeout)
+ self._check_error(exc)
return result
@synchronized
@@ -200,7 +161,7 @@
else:
ssn = Session(self, name, self.started, transactional=transactional)
self.sessions[name] = ssn
- self.wakeup()
+ self._wakeup()
return ssn
@synchronized
@@ -213,8 +174,8 @@
Connect to the remote endpoint.
"""
self._connected = True
- self.wakeup()
- self.ewait(lambda: self._driver._connected, exc=ConnectError)
+ self._wakeup()
+ self._ewait(lambda: self._driver._connected, exc=ConnectError)
@synchronized
def disconnect(self):
@@ -222,8 +183,8 @@
Disconnect from the remote endpoint.
"""
self._connected = False
- self.wakeup()
- self.ewait(lambda: not self._driver._connected)
+ self._wakeup()
+ self._ewait(lambda: not self._driver._connected)
@synchronized
def connected(self):
@@ -273,17 +234,6 @@
ssn.exchange_bind(exchange=exchange, queue=queue,
binding_key=self.value.replace("*", "#"))
-FILTER_DEFAULTS = {
- "topic": Pattern("*")
- }
-
-def delegate(handler, session):
- class Delegate(Client):
-
- def message_transfer(self, cmd):
- handler._message_transfer(session, cmd)
- return Delegate
-
class SessionError(Exception):
pass
@@ -304,7 +254,7 @@
class TransactionAborted(SessionError):
pass
-class Session(Lockable):
+class Session:
"""
Sessions provide a linear context for sending and receiving
@@ -329,12 +279,14 @@
self.incoming = []
self.unacked = []
self.acked = []
+ # XXX: I hate this name.
+ self.ack_capacity = UNLIMITED
self.closing = False
self.closed = False
self._lock = connection._lock
- self._condition = connection._condition
+ self.running = True
self.thread = Thread(target = self.run)
self.thread.setDaemon(True)
self.thread.start()
@@ -342,17 +294,17 @@
def __repr__(self):
return "<Session %s>" % self.name
- def wakeup(self):
- self.connection.wakeup()
+ def _wait(self, predicate, timeout=None):
+ return self.connection._wait(predicate, timeout=timeout)
- def catchup(self, exc=SessionError):
- self.connection.catchup(exc)
+ def _wakeup(self):
+ self.connection._wakeup()
- def check_error(self, exc=SessionError):
- self.connection.check_error(exc)
+ def _check_error(self, exc=SessionError):
+ self.connection._check_error(exc)
- def ewait(self, predicate, timeout=None, exc=SessionError):
- return self.connection.ewait(predicate, timeout, exc)
+ def _ewait(self, predicate, timeout=None, exc=SessionError):
+ return self.connection._ewait(predicate, timeout, exc)
@synchronized
def sender(self, target):
@@ -367,7 +319,7 @@
"""
sender = Sender(self, len(self.senders), target)
self.senders.append(sender)
- self.wakeup()
+ self._wakeup()
# XXX: because of the lack of waiting here we can end up getting
# into the driver loop with messages sent for senders that haven't
# been linked yet, something similar can probably happen for
@@ -388,7 +340,7 @@
receiver = Receiver(self, len(self.receivers), source, filter,
self.started)
self.receivers.append(receiver)
- self.wakeup()
+ self._wakeup()
return receiver
@synchronized
@@ -416,8 +368,8 @@
@synchronized
def _get(self, predicate, timeout=None):
- if self.wait(lambda: ((self._peek(predicate) is not None) or self.closing),
- timeout):
+ if self._wait(lambda: ((self._peek(predicate) is not None) or self.closing),
+ timeout):
msg = self._pop(predicate)
if msg is not None:
msg._receiver.returned += 1
@@ -427,13 +379,15 @@
return None
@synchronized
- def acknowledge(self, message=None):
+ def acknowledge(self, message=None, sync=True):
"""
Acknowledge the given L{Message}. If message is None, then all
unacknowledged messages on the session are acknowledged.
@type message: Message
@param message: the message to acknowledge or None
+ @type sync: boolean
+ @param sync: if true then block until the message(s) are acknowledged
"""
if message is None:
messages = self.unacked[:]
@@ -441,12 +395,18 @@
messages = [message]
for m in messages:
+ if self.ack_capacity is not UNLIMITED:
+ if self.ack_capacity <= 0:
+ # XXX: this is currently a SendError, maybe it should be a SessionError?
+ raise InsufficientCapacity("ack_capacity = %s" % self.ack_capacity)
+ self._wakeup()
+ self._ewait(lambda: len(self.acked) < self.ack_capacity)
self.unacked.remove(m)
self.acked.append(m)
- self.wakeup()
- self.wait(lambda: self.connection.error or not [m for m in messages if m in self.acked])
- self.check_error()
+ self._wakeup()
+ if sync:
+ self._ewait(lambda: not [m for m in messages if m in self.acked])
@synchronized
def commit(self):
@@ -457,8 +417,8 @@
if not self.transactional:
raise NontransactionalSession()
self.committing = True
- self.wakeup()
- self.ewait(lambda: not self.committing)
+ self._wakeup()
+ self._ewait(lambda: not self.committing)
if self.aborted:
raise TransactionAborted()
assert self.committed
@@ -472,8 +432,8 @@
if not self.transactional:
raise NontransactionalSession()
self.aborting = True
- self.wakeup()
- self.ewait(lambda: not self.aborting)
+ self._wakeup()
+ self._ewait(lambda: not self.aborting)
assert self.aborted
@synchronized
@@ -493,7 +453,7 @@
for rcv in self.receivers:
rcv.stop()
# TODO: think about stopping individual receivers in listen mode
- self.wait(lambda: self._peek(self._pred) is None)
+ self._wait(lambda: self._peek(self._pred) is None)
self.started = False
def _pred(self, m):
@@ -501,6 +461,7 @@
@synchronized
def run(self):
+ self.running = True
try:
while True:
msg = self._get(self._pred)
@@ -509,10 +470,10 @@
else:
msg._receiver.listener(msg)
if self._peek(self._pred) is None:
- self.notifyAll()
+ self.connection._waiter.notifyAll()
finally:
- self.closed = True
- self.notifyAll()
+ self.running = False
+ self.connection._waiter.notifyAll()
@synchronized
def close(self):
@@ -523,33 +484,22 @@
link.close()
self.closing = True
- self.wakeup()
- self.catchup()
- self.wait(lambda: self.closed)
+ self._wakeup()
+ self._ewait(lambda: self.closed and not self.running)
while self.thread.isAlive():
self.thread.join(3)
self.thread = None
+ # XXX: should be able to express this condition through API calls
+ self._ewait(lambda: not self.outgoing and not self.acked)
self.connection._remove_session(self)
-def parse_addr(address):
- parts = address.split("/", 1)
- if len(parts) == 1:
- return parts[0], None
- else:
- return parts[0], parts[i1]
-
-def reply_to2addr(reply_to):
- if reply_to.routing_key is None:
- return reply_to.exchange
- elif reply_to.exchange in (None, ""):
- return reply_to.routing_key
- else:
- return "%s/%s" % (reply_to.exchange, reply_to.routing_key)
-
class SendError(SessionError):
pass
-class Sender(Lockable):
+class InsufficientCapacity(SendError):
+ pass
+
+class Sender:
"""
Sends outgoing messages.
@@ -559,32 +509,50 @@
self.session = session
self.index = index
self.target = target
+ self.capacity = UNLIMITED
+ self.queued = Serial(0)
+ self.acked = Serial(0)
self.closed = False
self._lock = self.session._lock
- self._condition = self.session._condition
- def wakeup(self):
- self.session.wakeup()
+ def _wakeup(self):
+ self.session._wakeup()
- def catchup(self, exc=SendError):
- self.session.catchup(exc)
+ def _check_error(self, exc=SendError):
+ self.session._check_error(exc)
- def check_error(self, exc=SendError):
- self.session.check_error(exc)
+ def _ewait(self, predicate, timeout=None, exc=SendError):
+ return self.session._ewait(predicate, timeout, exc)
- def ewait(self, predicate, timeout=None, exc=SendError):
- return self.session.ewait(predicate, timeout, exc)
+ @synchronized
+ def pending(self):
+ """
+ Returns the number of messages awaiting acknowledgment.
+ @rtype: int
+ @return: the number of unacknowledged messages
+ """
+ return self.queued - self.acked
@synchronized
- def send(self, object):
+ def send(self, object, sync=True, timeout=None):
"""
Send a message. If the object passed in is of type L{unicode},
L{str}, L{list}, or L{dict}, it will automatically be wrapped in a
L{Message} and sent. If it is of type L{Message}, it will be sent
- directly.
+ directly. If the sender capacity is not L{UNLIMITED} then send
+ will block until there is available capacity to send the message.
+ If the timeout parameter is specified, then send will throw an
+ L{InsufficientCapacity} exception if capacity does not become
+ available within the specified time.
@type object: unicode, str, list, dict, Message
@param object: the message or content to send
+
+ @type sync: boolean
+ @param sync: if true then block until the message is sent
+
+ @type timeout: float
+ @param timeout: the time to wait for available capacity
"""
if not self.session.connection._connected or self.session.closing:
@@ -595,12 +563,23 @@
else:
message = Message(object)
+ if self.capacity is not UNLIMITED:
+ if self.capacity <= 0:
+ raise InsufficientCapacity("capacity = %s" % self.capacity)
+ if not self._ewait(lambda: self.pending() < self.capacity, timeout=timeout):
+ raise InsufficientCapacity("capacity = %s" % self.capacity)
+
# XXX: what if we send the same message to multiple senders?
message._sender = self
self.session.outgoing.append(message)
+ self.queued += 1
+ mno = self.queued
- self.wakeup()
- self.ewait(lambda: message not in self.session.outgoing)
+ self._wakeup()
+
+ if sync:
+ self._ewait(lambda: self.acked >= mno)
+ assert message not in self.session.outgoing
@synchronized
def close(self):
@@ -622,7 +601,7 @@
"""
pass
-class Receiver(Lockable):
+class Receiver:
"""
Receives incoming messages from a remote source. Messages may be
@@ -649,22 +628,25 @@
self.closed = False
self.listener = None
self._lock = self.session._lock
- self._condition = self.session._condition
-
- def wakeup(self):
- self.session.wakeup()
- def catchup(self, exc=ReceiveError):
- self.session.catchup()
+ def _wakeup(self):
+ self.session._wakeup()
- def check_error(self, exc=ReceiveError):
- self.session.check_error(exc)
+ def _check_error(self, exc=ReceiveError):
+ self.session._check_error(exc)
- def ewait(self, predicate, timeout=None, exc=ReceiveError):
- return self.session.ewait(predicate, timeout, exc)
+ def _ewait(self, predicate, timeout=None, exc=ReceiveError):
+ return self.session._ewait(predicate, timeout, exc)
@synchronized
def pending(self):
+ """
+ Returns the number of messages available to be fetched by the
+ application.
+
+ @rtype: int
+ @return: the number of available messages
+ """
return self.received - self.returned
def _capacity(self):
@@ -700,23 +682,23 @@
"""
if self._capacity() == 0:
self.granted = self.returned + 1
- self.wakeup()
- self.ewait(lambda: self.impending == self.granted)
+ self._wakeup()
+ self._ewait(lambda: self.impending >= self.granted)
msg = self.session._get(self._pred, timeout=timeout)
if msg is None:
self.drain = True
self.granted = self.received
- self.wakeup()
- self.ewait(lambda: self.impending == self.received)
+ self._wakeup()
+ self._ewait(lambda: self.impending == self.received)
self.drain = False
self._grant()
- self.wakeup()
+ self._wakeup()
msg = self.session._get(self._pred, timeout=0)
if msg is None:
raise Empty()
elif self._capacity() not in (0, UNLIMITED.value):
self.granted += 1
- self.wakeup()
+ self._wakeup()
return msg
def _grant(self):
@@ -736,7 +718,7 @@
"""
self.started = True
self._grant()
- self.wakeup()
+ self._wakeup()
@synchronized
def stop(self):
@@ -745,8 +727,8 @@
"""
self.started = False
self._grant()
- self.wakeup()
- self.ewait(lambda: self.impending == self.received)
+ self._wakeup()
+ self._ewait(lambda: self.impending == self.received)
@synchronized
def close(self):
@@ -754,9 +736,9 @@
Close the receiver.
"""
self.closing = True
- self.wakeup()
+ self._wakeup()
try:
- self.ewait(lambda: self.closed)
+ self._ewait(lambda: self.closed)
finally:
self.session.receivers.remove(self)
@@ -843,391 +825,7 @@
def __repr__(self):
return "Message(%r)" % self.content
-class Attachment:
-
- def __init__(self, target):
- self.target = target
-
-DURABLE_DEFAULT=True
-
-class Driver(Lockable):
-
- def __init__(self, connection):
- self.connection = connection
- self._lock = self.connection._lock
- self._condition = self.connection._condition
- self._wakeup_cond = Condition()
- self._socket = None
- self._conn = None
- self._connected = False
- self._attachments = {}
- self._modcount = self.connection._modcount
- self.thread = Thread(target=self.run)
- self.thread.setDaemon(True)
- # XXX: need to figure out how to join on this thread
-
- def start(self):
- self.thread.start()
-
- def wakeup(self):
- self._wakeup_cond.acquire()
- try:
- self._wakeup_cond.notifyAll()
- finally:
- self._wakeup_cond.release()
-
- def start(self):
- self.thread.start()
-
- def run(self):
- while True:
- self._wakeup_cond.acquire()
- try:
- if self.connection._modcount <= self._modcount:
- self._wakeup_cond.wait(10)
- finally:
- self._wakeup_cond.release()
- self.dispatch(self.connection._modcount)
-
- @synchronized
- def dispatch(self, modcount):
- try:
- if self._conn is None and self.connection._connected:
- self.connect()
- elif self._conn is not None and not self.connection._connected:
- self.disconnect()
-
- if self._conn is not None:
- for ssn in self.connection.sessions.values():
- self.attach(ssn)
- self.process(ssn)
-
- exi = None
- except:
- exi = sys.exc_info()
-
- if exi:
- msg = compat.format_exc()
- recoverable = ["aborted", "Connection refused", "SessionDetached", "Connection reset by peer",
- "Bad file descriptor", "start timed out", "Broken pipe"]
- for r in recoverable:
- if self.connection.reconnect and r in msg:
- print "waiting to retry"
- self.reset()
- time.sleep(3)
- print "retrying..."
- return
- else:
- self.connection.error = (msg,)
-
- self._modcount = modcount
- self.notifyAll()
-
- def connect(self):
- if self._conn is not None:
- return
- try:
- self._socket = connect(self.connection.host, self.connection.port)
- except socket.error, e:
- raise ConnectError(e)
- self._conn = connection.Connection(self._socket)
- try:
- self._conn.start(timeout=10)
- self._connected = True
- except connection.VersionError, e:
- raise ConnectError(e)
- except Timeout:
- print "start timed out"
- raise ConnectError("start timed out")
-
- def disconnect(self):
- self._conn.close()
- self.reset()
-
- def reset(self):
- self._conn = None
- self._connected = False
- self._attachments.clear()
- for ssn in self.connection.sessions.values():
- for m in ssn.acked + ssn.unacked + ssn.incoming:
- m._transfer_id = None
- for rcv in ssn.receivers:
- rcv.impending = rcv.received
-
- def connected(self):
- return self._conn is not None
-
- def attach(self, ssn):
- _ssn = self._attachments.get(ssn)
- if _ssn is None:
- _ssn = self._conn.session(ssn.name, delegate=delegate(self, ssn))
- _ssn.auto_sync = False
- _ssn.invoke_lock = self._lock
- _ssn.lock = self._lock
- _ssn.condition = self._condition
- if ssn.transactional:
- # XXX: adding an attribute to qpid.session.Session
- _ssn.acked = []
- _ssn.tx_select()
- self._attachments[ssn] = _ssn
-
- for snd in ssn.senders:
- self.link_out(snd)
- for rcv in ssn.receivers:
- self.link_in(rcv)
-
- if ssn.closing:
- _ssn.close()
- del self._attachments[ssn]
-
- def _exchange_query(self, ssn, address):
- # XXX: auto sync hack is to avoid deadlock on future
- result = ssn.exchange_query(name=address, sync=True)
- ssn.sync()
- return result.get()
-
- def link_out(self, snd):
- _ssn = self._attachments[snd.session]
- _snd = self._attachments.get(snd)
- if _snd is None:
- _snd = Attachment(snd)
- node, _snd._subject = parse_addr(snd.target)
- result = self._exchange_query(_ssn, node)
- if result.not_found:
- # XXX: should check 'create' option
- _ssn.queue_declare(queue=node, durable=DURABLE_DEFAULT, sync=True)
- _ssn.sync()
- _snd._exchange = ""
- _snd._routing_key = node
- else:
- _snd._exchange = node
- _snd._routing_key = _snd._subject
- self._attachments[snd] = _snd
-
- if snd.closed:
- del self._attachments[snd]
- return None
- else:
- return _snd
-
- def link_in(self, rcv):
- _ssn = self._attachments[rcv.session]
- _rcv = self._attachments.get(rcv)
- if _rcv is None:
- _rcv = Attachment(rcv)
- result = self._exchange_query(_ssn, rcv.source)
- if result.not_found:
- _rcv._queue = rcv.source
- # XXX: should check 'create' option
- _ssn.queue_declare(queue=_rcv._queue, durable=DURABLE_DEFAULT)
- else:
- _rcv._queue = "%s.%s" % (rcv.session.name, rcv.destination)
- _ssn.queue_declare(queue=_rcv._queue, durable=DURABLE_DEFAULT, exclusive=True, auto_delete=True)
- if rcv.filter is None:
- f = FILTER_DEFAULTS[result.type]
- else:
- f = rcv.filter
- f._bind(_ssn, rcv.source, _rcv._queue)
- _ssn.message_subscribe(queue=_rcv._queue, destination=rcv.destination)
- _ssn.message_set_flow_mode(rcv.destination, _ssn.flow_mode.credit, sync=True)
- self._attachments[rcv] = _rcv
- # XXX: need to kill syncs
- _ssn.sync()
-
- if rcv.closing:
- _ssn.message_cancel(rcv.destination, sync=True)
- # XXX: need to kill syncs
- _ssn.sync()
- del self._attachments[rcv]
- rcv.closed = True
- return None
- else:
- return _rcv
-
- def process(self, ssn):
- if ssn.closing: return
-
- _ssn = self._attachments[ssn]
-
- while ssn.outgoing:
- msg = ssn.outgoing[0]
- snd = msg._sender
- self.send(snd, msg)
- ssn.outgoing.pop(0)
-
- for rcv in ssn.receivers:
- self.process_receiver(rcv)
-
- if ssn.acked:
- messages = ssn.acked[:]
- ids = RangedSet(*[m._transfer_id for m in messages if m._transfer_id is not None])
- for range in ids:
- _ssn.receiver._completed.add_range(range)
- ch = _ssn.channel
- if ch is None:
- raise SessionDetached()
- ch.session_completed(_ssn.receiver._completed)
- _ssn.message_accept(ids, sync=True)
- # XXX: really need to make this async so that we don't give up the lock
- _ssn.sync()
-
- for m in messages:
- ssn.acked.remove(m)
- if ssn.transactional:
- _ssn.acked.append(m)
-
- if ssn.committing:
- _ssn.tx_commit(sync=True)
- # XXX: need to kill syncs
- _ssn.sync()
- del _ssn.acked[:]
- ssn.committing = False
- ssn.committed = True
- ssn.aborting = False
- ssn.aborted = False
-
- if ssn.aborting:
- for rcv in ssn.receivers:
- _ssn.message_stop(rcv.destination)
- _ssn.sync()
-
- messages = _ssn.acked + ssn.unacked + ssn.incoming
- ids = RangedSet(*[m._transfer_id for m in messages])
- for range in ids:
- _ssn.receiver._completed.add_range(range)
- _ssn.channel.session_completed(_ssn.receiver._completed)
- _ssn.message_release(ids)
- _ssn.tx_rollback(sync=True)
- _ssn.sync()
-
- del ssn.incoming[:]
- del ssn.unacked[:]
- del _ssn.acked[:]
-
- for rcv in ssn.receivers:
- rcv.impending = rcv.received
- rcv.returned = rcv.received
- # XXX: do we need to update granted here as well?
-
- for rcv in ssn.receivers:
- self.process_receiver(rcv)
-
- ssn.aborting = False
- ssn.aborted = True
- ssn.committing = False
- ssn.committed = False
-
- def grant(self, rcv):
- _ssn = self._attachments[rcv.session]
- _rcv = self.link_in(rcv)
-
- if rcv.granted is UNLIMITED:
- if rcv.impending is UNLIMITED:
- delta = 0
- else:
- delta = UNLIMITED
- elif rcv.impending is UNLIMITED:
- delta = -1
- else:
- delta = max(rcv.granted, rcv.received) - rcv.impending
-
- if delta is UNLIMITED:
- _ssn.message_flow(rcv.destination, _ssn.credit_unit.byte, UNLIMITED.value)
- _ssn.message_flow(rcv.destination, _ssn.credit_unit.message, UNLIMITED.value)
- rcv.impending = UNLIMITED
- elif delta > 0:
- _ssn.message_flow(rcv.destination, _ssn.credit_unit.byte, UNLIMITED.value)
- _ssn.message_flow(rcv.destination, _ssn.credit_unit.message, delta)
- rcv.impending += delta
- elif delta < 0:
- if rcv.drain:
- _ssn.message_flush(rcv.destination, sync=True)
- else:
- _ssn.message_stop(rcv.destination, sync=True)
- # XXX: need to kill syncs
- _ssn.sync()
- rcv.impending = rcv.received
- self.grant(rcv)
-
- def process_receiver(self, rcv):
- if rcv.closed: return
- self.grant(rcv)
-
- def send(self, snd, msg):
- _ssn = self._attachments[snd.session]
- _snd = self.link_out(snd)
-
- # XXX: what if subject is specified for a normal queue?
- if _snd._routing_key is None:
- rk = msg.subject
- else:
- rk = _snd._routing_key
- # XXX: do we need to query to figure out how to create the reply-to interoperably?
- if msg.reply_to:
- rt = _ssn.reply_to(*parse_addr(msg.reply_to))
- else:
- rt = None
- dp = _ssn.delivery_properties(routing_key=rk)
- mp = _ssn.message_properties(message_id=msg.id,
- user_id=msg.user_id,
- reply_to=rt,
- correlation_id=msg.correlation_id,
- content_type=msg.content_type,
- application_headers=msg.properties)
- if msg.subject is not None:
- if mp.application_headers is None:
- mp.application_headers = {}
- mp.application_headers["subject"] = msg.subject
- if msg.to is not None:
- if mp.application_headers is None:
- mp.application_headers = {}
- mp.application_headers["to"] = msg.to
- if msg.durable:
- dp.delivery_mode = delivery_mode.persistent
- enc, dec = get_codec(msg.content_type)
- body = enc(msg.content)
- _ssn.message_transfer(destination=_snd._exchange,
- message=Message010(dp, mp, body),
- sync=True)
- log.debug("SENT [%s] %s", snd.session, msg)
- # XXX: really need to make this async so that we don't give up the lock
- _ssn.sync()
- # XXX: should we log the ack somehow too?
-
- @synchronized
- def _message_transfer(self, ssn, cmd):
- m = Message010(cmd.payload)
- m.headers = cmd.headers
- m.id = cmd.id
- msg = self._decode(m)
- rcv = ssn.receivers[int(cmd.destination)]
- msg._receiver = rcv
- rcv.received += 1
- log.debug("RECV [%s] %s", ssn, msg)
- ssn.incoming.append(msg)
- self.notifyAll()
- return INCOMPLETE
-
- def _decode(self, message):
- dp = message.get("delivery_properties")
- mp = message.get("message_properties")
- ap = mp.application_headers
- enc, dec = get_codec(mp.content_type)
- content = dec(message.body)
- msg = Message(content)
- msg.id = mp.message_id
- if ap is not None:
- msg.to = ap.get("to")
- msg.subject = ap.get("subject")
- msg.user_id = mp.user_id
- if mp.reply_to is not None:
- msg.reply_to = reply_to2addr(mp.reply_to)
- msg.correlation_id = mp.correlation_id
- msg.durable = dp.delivery_mode == delivery_mode.persistent
- msg.properties = mp.application_headers
- msg.content_type = mp.content_type
- msg._transfer_id = message.id
- return msg
-
-__all__ = ["Connection", "Pattern", "Session", "Sender", "Receiver", "Message",
- "Empty", "timestamp", "uuid4"]
+__all__ = ["Connection", "Session", "Sender", "Receiver", "Pattern", "Message",
+ "ConnectionError", "ConnectError", "SessionError", "Disconnected",
+ "SendError", "InsufficientCapacity", "ReceiveError", "Empty",
+ "timestamp", "uuid4", "UNLIMITED", "AMQP_PORT", "AMQPS_PORT"]
Modified: qpid/branches/java-broker-0-10/qpid/python/qpid/ops.py
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/python/qpid/ops.py?rev=821779&r1=821778&r2=821779&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/python/qpid/ops.py (original)
+++ qpid/branches/java-broker-0-10/qpid/python/qpid/ops.py Mon Oct 5 12:51:57 2009
@@ -74,10 +74,7 @@
def dispatch(self, target, *args):
handler = "do_%s" % self.NAME
- if hasattr(target, handler):
- getattr(target, handler)(self, *args)
- else:
- print "UNHANDLED:", target, args
+ getattr(target, handler)(self, *args)
def __repr__(self, extras=()):
return "%s(%s)" % (self.__class__.__name__,
Modified: qpid/branches/java-broker-0-10/qpid/python/qpid/tests/messaging.py
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/python/qpid/tests/messaging.py?rev=821779&r1=821778&r2=821779&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/python/qpid/tests/messaging.py (original)
+++ qpid/branches/java-broker-0-10/qpid/python/qpid/tests/messaging.py Mon Oct 5 12:51:57 2009
@@ -23,7 +23,8 @@
import time
from qpid.tests import Test
from qpid.harness import Skipped
-from qpid.messaging import Connection, ConnectError, Disconnected, Empty, Message, UNLIMITED, uuid4
+from qpid.messaging import Connection, ConnectError, Disconnected, Empty, \
+ InsufficientCapacity, Message, UNLIMITED, uuid4
from Queue import Queue, Empty as QueueEmpty
class Base(Test):
@@ -71,13 +72,15 @@
ssn.acknowledge()
assert msg.content == content, "expected %r, got %r" % (content, msg.content)
- def drain(self, rcv, limit=None):
+ def drain(self, rcv, limit=None, timeout=0, expected=None):
contents = []
try:
while limit is None or len(contents) < limit:
- contents.append(rcv.fetch(0).content)
+ contents.append(rcv.fetch(timeout=timeout).content)
except Empty:
pass
+ if expected is not None:
+ assert expected == contents, "expected %s, got %s" % (expected, contents)
return contents
def assertEmpty(self, rcv):
@@ -224,27 +227,27 @@
# XXX, we need a convenient way to assert that required queues are
# empty on setup, and possibly also to drain queues on teardown
- def testAcknowledge(self):
+ def ackTest(self, acker, ack_capacity=None):
# send a bunch of messages
snd = self.ssn.sender("test-ack-queue")
- tid = "a"
- contents = ["testAcknowledge[%s, %s]" % (i, tid) for i in range(10)]
+ contents = [self.content("ackTest", i) for i in range(15)]
for c in contents:
snd.send(c)
# drain the queue, verify the messages are there and then close
# without acking
rcv = self.ssn.receiver(snd.target)
- assert contents == self.drain(rcv)
+ self.drain(rcv, expected=contents)
self.ssn.close()
# drain the queue again, verify that they are all the messages
# were requeued, and ack this time before closing
self.ssn = self.conn.session()
+ if ack_capacity is not None:
+ self.ssn.ack_capacity = ack_capacity
rcv = self.ssn.receiver("test-ack-queue")
- drained = self.drain(rcv)
- assert contents == drained, "expected %s, got %s" % (contents, drained)
- self.ssn.acknowledge()
+ self.drain(rcv, expected=contents)
+ acker(self.ssn)
self.ssn.close()
# drain the queue a final time and verify that the messages were
@@ -253,6 +256,33 @@
rcv = self.ssn.receiver("test-ack-queue")
self.assertEmpty(rcv)
+ def testAcknowledge(self):
+ self.ackTest(lambda ssn: ssn.acknowledge())
+
+ def testAcknowledgeAsync(self):
+ self.ackTest(lambda ssn: ssn.acknowledge(sync=False))
+
+ def testAcknowledgeAsyncAckCap0(self):
+ try:
+ try:
+ self.ackTest(lambda ssn: ssn.acknowledge(sync=False), 0)
+ assert False, "acknowledge shouldn't succeed with ack_capacity of zero"
+ except InsufficientCapacity:
+ pass
+ finally:
+ self.ssn.ack_capacity = UNLIMITED
+ self.drain(self.ssn.receiver("test-ack-queue"))
+ self.ssn.acknowledge()
+
+ def testAcknowledgeAsyncAckCap1(self):
+ self.ackTest(lambda ssn: ssn.acknowledge(sync=False), 1)
+
+ def testAcknowledgeAsyncAckCap5(self):
+ self.ackTest(lambda ssn: ssn.acknowledge(sync=False), 5)
+
+ def testAcknowledgeAsyncAckCapUNLIMITED(self):
+ self.ackTest(lambda ssn: ssn.acknowledge(sync=False), UNLIMITED)
+
def send(self, ssn, queue, base, count=1):
snd = ssn.sender(queue)
contents = []
@@ -543,6 +573,48 @@
def testSendMap(self):
self.checkContent({"testSendMap": self.test_id, "pie": "blueberry", "pi": 3.14})
+ def asyncTest(self, capacity):
+ self.snd.capacity = capacity
+ msgs = [self.content("asyncTest", i) for i in range(15)]
+ for m in msgs:
+ self.snd.send(m, sync=False)
+ drained = self.drain(self.rcv, timeout=self.delay())
+ assert msgs == drained, "expected %s, got %s" % (msgs, drained)
+ self.ssn.acknowledge()
+
+ def testSendAsyncCapacity0(self):
+ try:
+ self.asyncTest(0)
+ assert False, "send shouldn't succeed with zero capacity"
+ except InsufficientCapacity:
+ # this is expected
+ pass
+
+ def testSendAsyncCapacity1(self):
+ self.asyncTest(1)
+
+ def testSendAsyncCapacity5(self):
+ self.asyncTest(5)
+
+ def testSendAsyncCapacityUNLIMITED(self):
+ self.asyncTest(UNLIMITED)
+
+ def testCapacityTimeout(self):
+ self.snd.capacity = 1
+ msgs = []
+ caught = False
+ while len(msgs) < 100:
+ m = self.content("testCapacity", len(msgs))
+ try:
+ self.snd.send(m, sync=False, timeout=0)
+ msgs.append(m)
+ except InsufficientCapacity:
+ caught = True
+ break
+ self.drain(self.rcv, expected=msgs)
+ self.ssn.acknowledge()
+ assert caught, "did not exceed capacity"
+
class MessageTests(Base):
def testCreateString(self):
Modified: qpid/branches/java-broker-0-10/qpid/python/qpid/util.py
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/python/qpid/util.py?rev=821779&r1=821778&r2=821779&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/python/qpid/util.py (original)
+++ qpid/branches/java-broker-0-10/qpid/python/qpid/util.py Mon Oct 5 12:51:57 2009
@@ -134,3 +134,9 @@
if self.port:
s += ":%s" % self.port
return s
+
+def default(value, default):
+ if value is None:
+ return default
+ else:
+ return value
Modified: qpid/branches/java-broker-0-10/qpid/python/tests/datatypes.py
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/python/tests/datatypes.py?rev=821779&r1=821778&r2=821779&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/python/tests/datatypes.py (original)
+++ qpid/branches/java-broker-0-10/qpid/python/tests/datatypes.py Mon Oct 5 12:51:57 2009
@@ -148,6 +148,34 @@
assert range.lower == 0
assert range.upper == 8
+ def testEmpty(self):
+ s = RangedSet()
+ assert s.empty()
+ s.add(0, -1)
+ assert s.empty()
+ s.add(0, 0)
+ assert not s.empty()
+
+ def testMinMax(self):
+ s = RangedSet()
+ assert s.max() is None
+ assert s.min() is None
+ s.add(0, 10)
+ assert s.max() == 10
+ assert s.min() == 0
+ s.add(0, 5)
+ assert s.max() == 10
+ assert s.min() == 0
+ s.add(0, 11)
+ assert s.max() == 11
+ assert s.min() == 0
+ s.add(15, 20)
+ assert s.max() == 20
+ assert s.min() == 0
+ s.add(-10, -5)
+ assert s.max() == 20
+ assert s.min() == -10
+
class RangeTest(TestCase):
def testIntersect1(self):
Modified: qpid/branches/java-broker-0-10/qpid/python/tests_0-10/alternate_exchange.py
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/python/tests_0-10/alternate_exchange.py?rev=821779&r1=821778&r2=821779&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/python/tests_0-10/alternate_exchange.py (original)
+++ qpid/branches/java-broker-0-10/qpid/python/tests_0-10/alternate_exchange.py Mon Oct 5 12:51:57 2009
@@ -141,7 +141,61 @@
session.exchange_delete(exchange="e")
session.exchange_delete(exchange="alternate")
self.assertEquals(530, e.args[0].error_code)
-
+
+
+ def test_modify_existing_exchange_alternate(self):
+ """
+ Ensure that attempting to modify an exhange to change
+ the alternate throws an exception
+ """
+ session = self.session
+ session.exchange_declare(exchange="alt1", type="direct")
+ session.exchange_declare(exchange="alt2", type="direct")
+ session.exchange_declare(exchange="onealternate", type="fanout", alternate_exchange="alt1")
+ try:
+ # attempt to change the alternate on an already existing exchange
+ session.exchange_declare(exchange="onealternate", type="fanout", alternate_exchange="alt2")
+ self.fail("Expected changing an alternate on an existing exchange to fail")
+ except SessionException, e:
+ self.assertEquals(530, e.args[0].error_code)
+ session = self.conn.session("alternate", 2)
+ session.exchange_delete(exchange="onealternate")
+ session.exchange_delete(exchange="alt2")
+ session.exchange_delete(exchange="alt1")
+
+
+ def test_add_alternate_to_exchange(self):
+ """
+ Ensure that attempting to modify an exhange by adding
+ an alternate throws an exception
+ """
+ session = self.session
+ session.exchange_declare(exchange="alt1", type="direct")
+ session.exchange_declare(exchange="noalternate", type="fanout")
+ try:
+ # attempt to add an alternate on an already existing exchange
+ session.exchange_declare(exchange="noalternate", type="fanout", alternate_exchange="alt1")
+ self.fail("Expected adding an alternate on an existing exchange to fail")
+ except SessionException, e:
+ self.assertEquals(530, e.args[0].error_code)
+ session = self.conn.session("alternate", 2)
+ session.exchange_delete(exchange="noalternate")
+ session.exchange_delete(exchange="alt1")
+
+
+ def test_del_alternate_to_exchange(self):
+ """
+ Ensure that attempting to modify an exhange by declaring
+ it again without an alternate does nothing
+ """
+ session = self.session
+ session.exchange_declare(exchange="alt1", type="direct")
+ session.exchange_declare(exchange="onealternate", type="fanout", alternate_exchange="alt1")
+ # attempt to re-declare without an alternate - silently ignore
+ session.exchange_declare(exchange="onealternate", type="fanout" )
+ session.exchange_delete(exchange="onealternate")
+ session.exchange_delete(exchange="alt1")
+
def assertEmpty(self, queue):
try:
Modified: qpid/branches/java-broker-0-10/qpid/python/tests_0-10/management.py
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/python/tests_0-10/management.py?rev=821779&r1=821778&r2=821779&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/python/tests_0-10/management.py (original)
+++ qpid/branches/java-broker-0-10/qpid/python/tests_0-10/management.py Mon Oct 5 12:51:57 2009
@@ -295,3 +295,25 @@
sleep(1)
self.assertEqual(handler.check(), "pass")
+ def test_connection_close(self):
+ """
+ Test management method for closing connection
+ """
+ self.startQmf()
+ conn = self.connect()
+ session = conn.session("my-named-session")
+
+ #using qmf find named session and close the corresponding connection:
+ qmf_ssn_object = self.qmf.getObjects(_class="session", name="my-named-session")[0]
+ qmf_ssn_object._connectionRef_.close()
+
+ #check that connection is closed
+ try:
+ conn.session("another-session")
+ self.fail("Expected failure from closed connection")
+ except: None
+
+ #make sure that the named session has been closed and the name can be re-used
+ conn = self.connect()
+ session = conn.session("my-named-session")
+ session.queue_declare(queue="whatever", exclusive=True, auto_delete=True)
Propchange: qpid/branches/java-broker-0-10/qpid/python/tests_0-9/queue.py
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Mon Oct 5 12:51:57 2009
@@ -1 +1 @@
-/qpid/trunk/qpid/python/tests_0-9/queue.py:804203-807984
+/qpid/trunk/qpid/python/tests_0-9/queue.py:804203-816580
Modified: qpid/branches/java-broker-0-10/qpid/specs/management-schema.xml
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/specs/management-schema.xml?rev=821779&r1=821778&r2=821779&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/specs/management-schema.xml (original)
+++ qpid/branches/java-broker-0-10/qpid/specs/management-schema.xml Mon Oct 5 12:51:57 2009
@@ -169,7 +169,7 @@
<property name="type" type="sstr" access="RO"/>
<property name="durable" type="bool" access="RO"/>
<property name="autoDelete" type="bool" access="RO"/>
- <property name="altExchange" type="objId" access="RO" optional="y"/>
+ <property name="altExchange" type="objId" references="Exchange" access="RO" optional="y"/>
<property name="arguments" type="map" access="RO" desc="Arguments supplied in exchange.declare"/>
<statistic name="producerCount" type="hilo32" desc="Current producers on exchange"/>
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org