You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rh...@apache.org on 2010/04/09 12:54:07 UTC
svn commit: r932352 - in /qpid/trunk/qpid/python: examples/api/ qpid/
qpid/messaging/ qpid/tests/messaging/
Author: rhs
Date: Fri Apr 9 10:54:07 2010
New Revision: 932352
URL: http://svn.apache.org/viewvc?rev=932352&view=rev
Log:
Changes to connection lifecycle methods and Connection parameters:
- Connection.open -> Connection.establish
- Connection.connect() split into Connection.open(), Connection.attach()
- Connection.disconnect() -> Connection.detach()
- reconnect_hosts -> reconnect_urls
- transport now takes tcp, ssl, and tcp+tls
Modified:
qpid/trunk/qpid/python/examples/api/drain
qpid/trunk/qpid/python/examples/api/server
qpid/trunk/qpid/python/examples/api/spout
qpid/trunk/qpid/python/qpid/brokertest.py
qpid/trunk/qpid/python/qpid/messaging/driver.py
qpid/trunk/qpid/python/qpid/messaging/endpoints.py
qpid/trunk/qpid/python/qpid/messaging/exceptions.py
qpid/trunk/qpid/python/qpid/messaging/transports.py
qpid/trunk/qpid/python/qpid/messaging/util.py
qpid/trunk/qpid/python/qpid/tests/messaging/__init__.py
qpid/trunk/qpid/python/qpid/tests/messaging/endpoints.py
qpid/trunk/qpid/python/qpid/tests/messaging/message.py
Modified: qpid/trunk/qpid/python/examples/api/drain
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/python/examples/api/drain?rev=932352&r1=932351&r2=932352&view=diff
==============================================================================
--- qpid/trunk/qpid/python/examples/api/drain (original)
+++ qpid/trunk/qpid/python/examples/api/drain Fri Apr 9 10:54:07 2010
@@ -51,7 +51,6 @@ if opts.verbose:
else:
enable("qpid", WARN)
-url = URL(opts.broker)
if args:
addr = args.pop(0)
else:
@@ -72,15 +71,12 @@ class Formatter:
def __getitem__(self, st):
return eval(st, self.environ)
-# XXX: should make URL default the port for us
-conn = Connection(url.host, url.port,
- username=url.user,
- password=url.password,
+conn = Connection(opts.broker,
reconnect=opts.reconnect,
reconnect_interval=opts.reconnect_interval,
reconnect_limit=opts.reconnect_limit)
try:
- conn.connect()
+ conn.open()
ssn = conn.session()
rcv = ssn.receiver(addr)
Modified: qpid/trunk/qpid/python/examples/api/server
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/python/examples/api/server?rev=932352&r1=932351&r2=932352&view=diff
==============================================================================
--- qpid/trunk/qpid/python/examples/api/server (original)
+++ qpid/trunk/qpid/python/examples/api/server Fri Apr 9 10:54:07 2010
@@ -44,16 +44,12 @@ if opts.verbose:
else:
enable("qpid", WARN)
-url = URL(opts.broker)
if args:
addr = args.pop(0)
else:
parser.error("address is required")
-# XXX: should make URL default the port for us
-conn = Connection(url.host, url.port,
- username=url.user,
- password=url.password,
+conn = Connection(opts.broker,
reconnect=opts.reconnect,
reconnect_interval=opts.reconnect_interval,
reconnect_limit=opts.reconnect_limit)
@@ -75,7 +71,7 @@ def dispatch(msg):
return result
try:
- conn.connect()
+ conn.open()
ssn = conn.session()
rcv = ssn.receiver(addr)
Modified: qpid/trunk/qpid/python/examples/api/spout
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/python/examples/api/spout?rev=932352&r1=932351&r2=932352&view=diff
==============================================================================
--- qpid/trunk/qpid/python/examples/api/spout (original)
+++ qpid/trunk/qpid/python/examples/api/spout Fri Apr 9 10:54:07 2010
@@ -65,7 +65,6 @@ if opts.verbose:
else:
enable("qpid", WARN)
-url = URL(opts.broker)
if opts.id is None:
spout_id = str(uuid4())
else:
@@ -92,15 +91,12 @@ if opts.entries:
else:
content = text
-# XXX: should make URL default the port for us
-conn = Connection(url.host, url.port,
- username=url.user,
- password=url.password,
+conn = Connection(opts.broker,
reconnect=opts.reconnect,
reconnect_interval=opts.reconnect_interval,
reconnect_limit=opts.reconnect_limit)
try:
- conn.connect()
+ conn.open()
ssn = conn.session()
snd = ssn.sender(addr)
Modified: qpid/trunk/qpid/python/qpid/brokertest.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/python/qpid/brokertest.py?rev=932352&r1=932351&r2=932352&view=diff
==============================================================================
--- qpid/trunk/qpid/python/qpid/brokertest.py (original)
+++ qpid/trunk/qpid/python/qpid/brokertest.py Fri Apr 9 10:54:07 2010
@@ -295,7 +295,8 @@ class Broker(Popen):
def connect(self):
"""New API connection to the broker."""
- return messaging.Connection.open(self.host(), self.port())
+ return messaging.Connection.establish(host=self.host(),
+ port=self.port())
def connect_old(self):
"""Old API connection to the broker."""
Modified: qpid/trunk/qpid/python/qpid/messaging/driver.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/python/qpid/messaging/driver.py?rev=932352&r1=932351&r2=932352&view=diff
==============================================================================
--- qpid/trunk/qpid/python/qpid/messaging/driver.py (original)
+++ qpid/trunk/qpid/python/qpid/messaging/driver.py Fri Apr 9 10:54:07 2010
@@ -324,9 +324,10 @@ class Driver:
self._selector = Selector.default()
self._attempts = 0
self._delay = self.connection.reconnect_interval_min
+ urls = [URL(u) for u in self.connection.reconnect_urls]
self._hosts = [(self.connection.host, self.connection.port)] + \
- self.connection.reconnect_hosts
- self._reconnect_log = self.connection.options.get("reconnect_log", True)
+ [(u.host, u.port) for u in urls]
+ self._reconnect_log = self.connection.reconnect_log
self._host = 0
self._retrying = False
self._transport = None
@@ -463,7 +464,7 @@ class Driver:
self.engine = Engine(self.connection)
self.engine.open()
rawlog.debug("OPEN[%s]: %s:%s", self.log_id, host, port)
- trans = getattr(transports, self.connection.transport, None)
+ trans = transports.TRANSPORTS.get(self.connection.transport)
if trans:
self._transport = trans(host, port)
else:
@@ -507,9 +508,7 @@ class Engine:
self._channels = 0
self._sessions = {}
- options = self.connection.options
-
- self.address_cache = Cache(options.get("address_ttl", 60))
+ self.address_cache = Cache(self.connection.address_ttl)
self._status = CLOSED
self._buf = ""
@@ -528,11 +527,11 @@ class Engine:
self._sasl.setAttr("password", self.connection.password)
if self.connection.host:
self._sasl.setAttr("host", self.connection.host)
- self._sasl.setAttr("service", options.get("service", "qpidd"))
- if "min_ssf" in options:
- self._sasl.setAttr("minssf", options["min_ssf"])
- if "max_ssf" in options:
- self._sasl.setAttr("maxssf", options["max_ssf"])
+ self._sasl.setAttr("service", self.connection.sasl_service)
+ if self.connection.sasl_min_ssf is not None:
+ self._sasl.setAttr("minssf", self.connection.sasl_min_ssf)
+ if self.connection.sasl_max_ssf is not None:
+ self._sasl.setAttr("maxssf", self.connection.sasl_max_ssf)
self._sasl.init()
self._sasl_encode = False
self._sasl_decode = False
@@ -619,8 +618,8 @@ class Engine:
(cli_major, cli_minor, major, minor))
def do_connection_start(self, start):
- if self.connection.mechanisms:
- permitted = self.connection.mechanisms.split()
+ if self.connection.sasl_mechanisms:
+ permitted = self.connection.sasl_mechanisms.split()
mechs = [m for m in start.mechanisms if m in permitted]
else:
mechs = start.mechanisms
Modified: qpid/trunk/qpid/python/qpid/messaging/endpoints.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/python/qpid/messaging/endpoints.py?rev=932352&r1=932351&r2=932352&view=diff
==============================================================================
--- qpid/trunk/qpid/python/qpid/messaging/endpoints.py (original)
+++ qpid/trunk/qpid/python/qpid/messaging/endpoints.py Fri Apr 9 10:54:07 2010
@@ -36,7 +36,7 @@ from qpid.messaging.constants import *
from qpid.messaging.exceptions import *
from qpid.messaging.message import *
from qpid.ops import PRIMITIVE
-from qpid.util import default
+from qpid.util import default, URL
from threading import Thread, RLock
log = getLogger("qpid.messaging")
@@ -51,61 +51,110 @@ class Connection:
"""
@static
- def open(host, port=None, username="guest", password="guest", **options):
+ def establish(url=None, **options):
"""
- Creates an AMQP connection and connects it to the given host and port.
-
- @type host: str
- @param host: the name or ip address of the remote host
- @type port: int
- @param port: the port number of the remote host
- @rtype: Connection
- @return: a connected Connection
+ Constructs a L{Connection} with the supplied parameters and opens
+ it.
"""
- conn = Connection(host, port, username, password, **options)
- conn.connect()
+ conn = Connection(url, **options)
+ conn.open()
return conn
- def __init__(self, host, port=None, username="guest", password="guest", **options):
+ def __init__(self, url=None, **options):
"""
Creates a connection. A newly created connection must be opened
with the Connection.open() method before it can be used.
+ @type url: str
+ @param url: [ <username> [ / <password> ] @ ] <host> [ : <port> ]
@type host: str
- @param host: the name or ip address of the remote host
+ @param host: the name or ip address of the remote host (overridden by url)
@type port: int
- @param port: the port number of the remote host
+ @param port: the port number of the remote host (overridden by url)
+ @type transport: str
+ @param transport: one of tcp, tcp+tls, or ssl (alias for tcp+tls)
+ @type heartbeat: int
+ @param heartbeat: heartbeat interval in seconds
+
+ @type username: str
+ @param username: the username for authentication (overridden by url)
+ @type password: str
+ @param password: the password for authentication (overridden by url)
+
+ @type sasl_mechanisms: str
+ @param sasl_mechanisms: space separated list of permitted sasl mechanisms
+ @type sasl_service: str
+ @param sasl_service: ???
+ @type sasl_min_ssf: ???
+ @param sasl_min_ssf: ???
+ @type sasl_max_ssf: ???
+ @param sasl_max_ssf: ???
+
+ @type reconnect: bool
+ @param reconnect: enable/disable automatic reconnect
+ @type reconnect_timeout: float
+ @param reconnect_timeout: total time to attempt reconnect
+ @type reconnect_interval_min: float
+ @param reconnect_interval_min: minimum interval between reconnect attempts
+ @type reconnect_interval_max: float
+ @param reconnect_interval_max: maximum interval between reconnect attempts
+ @type reconnect_interval: float
+ @param reconnect_interval: set both min and max reconnect intervals
+ @type reconnect_limit: int
+ @param reconnect_limit: limit the total number of reconnect attempts
+ @type reconnect_urls: list[str]
+ @param reconnect_urls: list of backup hosts specified as urls
+
+ @type address_ttl: float
+ @param address_ttl: time until cached address resolution expires
+
@rtype: Connection
@return: a disconnected Connection
"""
- self.host = host
- self.username = username
- self.password = password
- self.mechanisms = options.get("mechanisms")
+ if url is None:
+ url = options.get("host")
+ if isinstance(url, basestring):
+ url = URL(url)
+ self.host = url.host
+ if url.scheme == url.AMQP:
+ self.transport = "tcp"
+ elif url.scheme == url.AMQPS:
+ self.transport = "ssl"
+ else:
+ self.transport = options.get("transport", "tcp")
+ if self.transport in ("ssl", "tcp+tls"):
+ self.port = default(url.port, options.get("port", AMQPS_PORT))
+ else:
+ self.port = default(url.port, options.get("port", AMQP_PORT))
self.heartbeat = options.get("heartbeat")
+ self.username = default(url.user, options.get("username", "guest"))
+ self.password = default(url.password, options.get("password", "guest"))
+
+ self.sasl_mechanisms = options.get("sasl_mechanisms")
+ self.sasl_service = options.get("sasl_service", "qpidd")
+ self.sasl_min_ssf = options.get("sasl_min_ssf")
+ self.sasl_max_ssf = options.get("sasl_max_ssf")
+
self.reconnect = options.get("reconnect", False)
self.reconnect_timeout = options.get("reconnect_timeout")
- if "reconnect_interval_min" in options:
- self.reconnect_interval_min = options["reconnect_interval_min"]
- else:
- self.reconnect_interval_min = options.get("reconnect_interval", 1)
- if "reconnect_interval_max" in options:
- self.reconnect_interval_max = options["reconnect_interval_max"]
- else:
- self.reconnect_interval_max = options.get("reconnect_interval", 2*60)
+ reconnect_interval = options.get("reconnect_interval")
+ self.reconnect_interval_min = options.get("reconnect_interval_min",
+ default(reconnect_interval, 1))
+ self.reconnect_interval_max = options.get("reconnect_interval_max",
+ default(reconnect_interval, 2*60))
self.reconnect_limit = options.get("reconnect_limit")
- self.reconnect_hosts = options.get("reconnect_hosts", [])
- self.transport = options.get("transport", "plain")
+ self.reconnect_urls = options.get("reconnect_urls", [])
+ self.reconnect_log = options.get("reconnect_log", True)
+
+ self.address_ttl = options.get("address_ttl", 60)
+
self.options = options
- if self.transport == "tls":
- self.port = default(port, AMQPS_PORT)
- else:
- self.port = default(port, AMQP_PORT)
self.id = str(uuid4())
self.session_counter = 0
self.sessions = {}
+ self._open = False
self._connected = False
self._transport_connected = False
self._lock = RLock()
@@ -164,9 +213,26 @@ class Connection:
del self.sessions[ssn.name]
@synchronized
- def connect(self):
+ def open(self):
+ """
+ Opens a connection.
+ """
+ if self._open:
+ raise ConnectionError("already open")
+ self._open = True
+ self.attach()
+
+ @synchronized
+ def opened(self):
+ """
+ Return true if the connection is open, false otherwise.
+ """
+ return self._open
+
+ @synchronized
+ def attach(self):
"""
- Connect to the remote endpoint.
+ Attach to the remote endpoint.
"""
self._connected = True
self._driver.start()
@@ -181,9 +247,9 @@ class Connection:
if not (l.linked or l.error or l.closed)]
@synchronized
- def disconnect(self):
+ def detach(self):
"""
- Disconnect from the remote endpoint.
+ Detach from the remote endpoint.
"""
self._connected = False
self._wakeup()
@@ -192,9 +258,9 @@ class Connection:
self._condition.gc()
@synchronized
- def connected(self):
+ def attached(self):
"""
- Return true if the connection is connected, false otherwise.
+ Return true if the connection is attached, false otherwise.
"""
return self._connected
@@ -207,7 +273,8 @@ class Connection:
for ssn in self.sessions.values():
ssn.close()
finally:
- self.disconnect()
+ self.detach()
+ self._open = False
class Session:
@@ -680,7 +747,7 @@ class Sender:
"""
if not self.session.connection._connected or self.session.closing:
- raise Disconnected()
+ raise Detached()
self._ewait(lambda: self.linked)
Modified: qpid/trunk/qpid/python/qpid/messaging/exceptions.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/python/qpid/messaging/exceptions.py?rev=932352&r1=932351&r2=932352&view=diff
==============================================================================
--- qpid/trunk/qpid/python/qpid/messaging/exceptions.py (original)
+++ qpid/trunk/qpid/python/qpid/messaging/exceptions.py Fri Apr 9 10:54:07 2010
@@ -33,10 +33,10 @@ class ConnectError(ConnectionError):
class SessionError(Exception):
pass
-class Disconnected(SessionError):
+class Detached(SessionError):
"""
Exception raised when an operation is attempted that is illegal when
- disconnected.
+ detached.
"""
pass
Modified: qpid/trunk/qpid/python/qpid/messaging/transports.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/python/qpid/messaging/transports.py?rev=932352&r1=932351&r2=932352&view=diff
==============================================================================
--- qpid/trunk/qpid/python/qpid/messaging/transports.py (original)
+++ qpid/trunk/qpid/python/qpid/messaging/transports.py Fri Apr 9 10:54:07 2010
@@ -19,7 +19,9 @@
from qpid.util import connect
-class plain:
+TRANSPORTS = {}
+
+class tcp:
def __init__(self, host, port):
self.socket = connect(host, port)
@@ -42,6 +44,8 @@ class plain:
def close(self):
self.socket.close()
+TRANSPORTS["tcp"] = tcp
+
try:
from ssl import wrap_socket, SSLError, SSL_ERROR_WANT_READ, \
SSL_ERROR_WANT_WRITE
@@ -105,3 +109,6 @@ else:
self.socket.setblocking(1)
# this closes the underlying socket
self.tls.close()
+
+ TRANSPORTS["ssl"] = tls
+ TRANSPORTS["tcp+tls"] = tls
Modified: qpid/trunk/qpid/python/qpid/messaging/util.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/python/qpid/messaging/util.py?rev=932352&r1=932351&r2=932352&view=diff
==============================================================================
--- qpid/trunk/qpid/python/qpid/messaging/util.py (original)
+++ qpid/trunk/qpid/python/qpid/messaging/util.py Fri Apr 9 10:54:07 2010
@@ -26,31 +26,31 @@ from threading import Thread
log = getLogger("qpid.messaging.util")
-def auto_fetch_reconnect_hosts(conn):
- ssn = conn.session("auto-fetch-reconnect-hosts")
+def auto_fetch_reconnect_urls(conn):
+ ssn = conn.session("auto-fetch-reconnect-urls")
rcv = ssn.receiver("amq.failover")
rcv.capacity = 10
def main():
while True:
msg = rcv.fetch()
- set_reconnect_hosts(conn, msg)
+ set_reconnect_urls(conn, msg)
ssn.acknowledge(msg, sync=False)
- thread = Thread(name="auto-fetch-reconnect-hosts", target=main)
+ thread = Thread(name="auto-fetch-reconnect-urls", target=main)
thread.setDaemon(True)
thread.start()
-def set_reconnect_hosts(conn, msg):
- reconnect_hosts = []
+def set_reconnect_urls(conn, msg):
+ reconnect_urls = []
urls = msg.properties["amq.failover"]
for u in urls:
if u.startswith("amqp:tcp:"):
parts = u.split(":")
host, port = parts[2:4]
- reconnect_hosts.append((host, port))
- conn.reconnect_hosts = reconnect_hosts
- log.warn("set reconnect_hosts for conn %s: %s", conn, reconnect_hosts)
+ reconnect_urls.append("%s:%s" % (host, port))
+ conn.reconnect_urls = reconnect_urls
+ log.warn("set reconnect_urls for conn %s: %s", conn, reconnect_urls)
-__all__ = ["auto_fetch_reconnect_hosts", "set_reconnect_hosts"]
+__all__ = ["auto_fetch_reconnect_urls", "set_reconnect_urls"]
Modified: qpid/trunk/qpid/python/qpid/tests/messaging/__init__.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/python/qpid/tests/messaging/__init__.py?rev=932352&r1=932351&r2=932352&view=diff
==============================================================================
--- qpid/trunk/qpid/python/qpid/tests/messaging/__init__.py (original)
+++ qpid/trunk/qpid/python/qpid/tests/messaging/__init__.py Fri Apr 9 10:54:07 2010
@@ -50,7 +50,7 @@ class Base(Test):
self.rcv = self.setup_receiver()
def teardown(self):
- if self.conn is not None and self.conn.connected():
+ if self.conn is not None and self.conn.attached():
self.conn.close()
def content(self, base, count = None):
@@ -146,9 +146,9 @@ class Base(Test):
def transport(self):
if self.broker.scheme == self.broker.AMQPS:
- return "tls"
+ return "ssl"
else:
- return "plain"
+ return "tcp"
def connection_options(self):
return {"reconnect": self.reconnect(),
Modified: qpid/trunk/qpid/python/qpid/tests/messaging/endpoints.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/python/qpid/tests/messaging/endpoints.py?rev=932352&r1=932351&r2=932352&view=diff
==============================================================================
--- qpid/trunk/qpid/python/qpid/tests/messaging/endpoints.py (original)
+++ qpid/trunk/qpid/python/qpid/tests/messaging/endpoints.py Fri Apr 9 10:54:07 2010
@@ -28,26 +28,21 @@ from qpid.tests.messaging import Base
class SetupTests(Base):
- def testOpen(self):
- # XXX: need to flesh out URL support/syntax
- self.conn = Connection.open(self.broker.host, self.broker.port,
- **self.connection_options())
+ def testEstablish(self):
+ self.conn = Connection.establish(self.broker, **self.connection_options())
self.ping(self.conn.session())
- def testConnect(self):
- # XXX: need to flesh out URL support/syntax
- self.conn = Connection(self.broker.host, self.broker.port,
- **self.connection_options())
- self.conn.connect()
+ def testOpen(self):
+ self.conn = Connection(self.broker, **self.connection_options())
+ self.conn.open()
self.ping(self.conn.session())
def testConnectError(self):
try:
- self.conn = Connection.open("localhost", 0)
+ self.conn = Connection.establish("localhost:0")
assert False, "connect succeeded"
except ConnectError, e:
- # XXX: should verify that e includes appropriate diagnostic info
- pass
+ assert "Connection refused" in str(e)
def use_fds(self):
fds = []
@@ -66,8 +61,7 @@ class SetupTests(Base):
for i in range(32):
if fds: os.close(fds.pop())
for i in xrange(64):
- conn = Connection.open(self.broker.host, self.broker.port,
- **self.connection_options())
+ conn = Connection.establish(self.broker, **self.connection_options())
conn.close()
finally:
while fds:
@@ -76,8 +70,8 @@ class SetupTests(Base):
def testReconnect(self):
options = self.connection_options()
import socket
- from qpid.messaging import transports
- real = transports.plain
+ from qpid.messaging.transports import TRANSPORTS
+ real = TRANSPORTS["tcp"]
class flaky:
@@ -112,7 +106,7 @@ class SetupTests(Base):
def close(self):
self.real.close()
- transports.flaky = flaky
+ TRANSPORTS["flaky"] = flaky
options["reconnect"] = True
options["reconnect_interval"] = 0
@@ -120,7 +114,7 @@ class SetupTests(Base):
options["reconnect_log"] = False
options["transport"] = "flaky"
- self.conn = Connection.open(self.broker.host, self.broker.port, **options)
+ self.conn = Connection.establish(self.broker, **options)
ssn = self.conn.session()
snd = ssn.sender("test-reconnect-queue; {create: always, delete: always}")
rcv = ssn.receiver(snd.target)
@@ -153,8 +147,7 @@ class SetupTests(Base):
class ConnectionTests(Base):
def setup_connection(self):
- return Connection.open(self.broker.host, self.broker.port,
- **self.connection_options())
+ return Connection.establish(self.broker, **self.connection_options())
def testSessionAnon(self):
ssn1 = self.conn.session()
@@ -172,23 +165,23 @@ class ConnectionTests(Base):
assert ssn1 is self.conn.session("one")
assert ssn2 is self.conn.session("two")
- def testDisconnect(self):
+ def testDetach(self):
ssn = self.conn.session()
self.ping(ssn)
- self.conn.disconnect()
+ self.conn.detach()
try:
self.ping(ssn)
assert False, "ping succeeded"
- except Disconnected:
- # this is the expected failure when pinging on a disconnected
+ except Detached:
+ # this is the expected failure when pinging on a detached
# connection
pass
- self.conn.connect()
+ self.conn.attach()
self.ping(ssn)
def testClose(self):
self.conn.close()
- assert not self.conn.connected()
+ assert not self.conn.attached()
ACK_QC = 'test-ack-queue; {create: always}'
ACK_QD = 'test-ack-queue; {delete: always}'
@@ -196,8 +189,7 @@ ACK_QD = 'test-ack-queue; {delete: alway
class SessionTests(Base):
def setup_connection(self):
- return Connection.open(self.broker.host, self.broker.port,
- **self.connection_options())
+ return Connection.establish(self.broker, **self.connection_options())
def setup_session(self):
return self.conn.session()
@@ -230,11 +222,11 @@ class SessionTests(Base):
self.ssn.acknowledge(msg)
snd2 = self.ssn.receiver('test-rcv-queue; {delete: always}')
- def testDisconnectedReceiver(self):
- self.conn.disconnect()
+ def testDetachedReceiver(self):
+ self.conn.detach()
rcv = self.ssn.receiver("test-dis-rcv-queue; {create: always, delete: always}")
- m = self.content("testDisconnectedReceiver")
- self.conn.connect()
+ m = self.content("testDetachedReceiver")
+ self.conn.attach()
snd = self.ssn.sender("test-dis-rcv-queue")
snd.send(m)
self.drain(rcv, expected=[m])
@@ -475,7 +467,7 @@ class SessionTests(Base):
try:
self.ping(self.ssn)
assert False, "ping succeeded"
- except Disconnected:
+ except Detached:
pass
RECEIVER_Q = 'test-receiver-queue; {create: always, delete: always}'
@@ -483,8 +475,7 @@ RECEIVER_Q = 'test-receiver-queue; {crea
class ReceiverTests(Base):
def setup_connection(self):
- return Connection.open(self.broker.host, self.broker.port,
- **self.connection_options())
+ return Connection.establish(self.broker, **self.connection_options())
def setup_session(self):
return self.conn.session()
@@ -653,8 +644,7 @@ class ReceiverTests(Base):
class AddressTests(Base):
def setup_connection(self):
- return Connection.open(self.broker.host, self.broker.port,
- **self.connection_options())
+ return Connection.establish(self.broker, **self.connection_options())
def setup_session(self):
return self.conn.session()
@@ -896,8 +886,8 @@ test-link-bindings-queue; {
rcv = self.ssn.receiver("amq.topic; {link: {reliability: %s}}" % reliability)
for m in messages:
snd.send(m)
- self.conn.disconnect()
- self.conn.connect()
+ self.conn.detach()
+ self.conn.attach()
self.drain(rcv, expected=expected)
def testReliabilityUnreliable(self):
@@ -924,8 +914,7 @@ UNLEXABLE_ADDR = "\0x0\0x1\0x2\0x3"
class AddressErrorTests(Base):
def setup_connection(self):
- return Connection.open(self.broker.host, self.broker.port,
- **self.connection_options())
+ return Connection.establish(self.broker, **self.connection_options())
def setup_session(self):
return self.conn.session()
@@ -991,8 +980,7 @@ SENDER_Q = 'test-sender-q; {create: alwa
class SenderTests(Base):
def setup_connection(self):
- return Connection.open(self.broker.host, self.broker.port,
- **self.connection_options())
+ return Connection.establish(self.broker, **self.connection_options())
def setup_session(self):
return self.conn.session()
Modified: qpid/trunk/qpid/python/qpid/tests/messaging/message.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/python/qpid/tests/messaging/message.py?rev=932352&r1=932351&r2=932352&view=diff
==============================================================================
--- qpid/trunk/qpid/python/qpid/tests/messaging/message.py (original)
+++ qpid/trunk/qpid/python/qpid/tests/messaging/message.py Fri Apr 9 10:54:07 2010
@@ -53,8 +53,7 @@ ECHO_Q = 'test-message-echo-queue; {crea
class MessageEchoTests(Base):
def setup_connection(self):
- return Connection.open(self.broker.host, self.broker.port,
- **self.connection_options())
+ return Connection.establish(self.broker, **self.connection_options())
def setup_session(self):
return self.conn.session()
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org