You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rh...@apache.org on 2010/04/09 12:54:07 UTC

svn commit: r932352 - in /qpid/trunk/qpid/python: examples/api/ qpid/ qpid/messaging/ qpid/tests/messaging/

Author: rhs
Date: Fri Apr  9 10:54:07 2010
New Revision: 932352

URL: http://svn.apache.org/viewvc?rev=932352&view=rev
Log:
Changes to connection lifecycle methods and Connection parameters:
 - Connection.open -> Connection.establish
 - Connection.connect() split into Connection.open(), Connection.attach()
 - Connection.disconnect() -> Connection.detach()
 - reconnect_hosts -> reconnect_urls
 - transport now takes tcp, ssl, and tcp+tls

Modified:
    qpid/trunk/qpid/python/examples/api/drain
    qpid/trunk/qpid/python/examples/api/server
    qpid/trunk/qpid/python/examples/api/spout
    qpid/trunk/qpid/python/qpid/brokertest.py
    qpid/trunk/qpid/python/qpid/messaging/driver.py
    qpid/trunk/qpid/python/qpid/messaging/endpoints.py
    qpid/trunk/qpid/python/qpid/messaging/exceptions.py
    qpid/trunk/qpid/python/qpid/messaging/transports.py
    qpid/trunk/qpid/python/qpid/messaging/util.py
    qpid/trunk/qpid/python/qpid/tests/messaging/__init__.py
    qpid/trunk/qpid/python/qpid/tests/messaging/endpoints.py
    qpid/trunk/qpid/python/qpid/tests/messaging/message.py

Modified: qpid/trunk/qpid/python/examples/api/drain
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/python/examples/api/drain?rev=932352&r1=932351&r2=932352&view=diff
==============================================================================
--- qpid/trunk/qpid/python/examples/api/drain (original)
+++ qpid/trunk/qpid/python/examples/api/drain Fri Apr  9 10:54:07 2010
@@ -51,7 +51,6 @@ if opts.verbose:
 else:
   enable("qpid", WARN)
 
-url = URL(opts.broker)
 if args:
   addr = args.pop(0)
 else:
@@ -72,15 +71,12 @@ class Formatter:
   def __getitem__(self, st):
     return eval(st, self.environ)
 
-# XXX: should make URL default the port for us
-conn = Connection(url.host, url.port,
-                  username=url.user,
-                  password=url.password,
+conn = Connection(opts.broker,
                   reconnect=opts.reconnect,
                   reconnect_interval=opts.reconnect_interval,
                   reconnect_limit=opts.reconnect_limit)
 try:
-  conn.connect()
+  conn.open()
   ssn = conn.session()
   rcv = ssn.receiver(addr)
 

Modified: qpid/trunk/qpid/python/examples/api/server
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/python/examples/api/server?rev=932352&r1=932351&r2=932352&view=diff
==============================================================================
--- qpid/trunk/qpid/python/examples/api/server (original)
+++ qpid/trunk/qpid/python/examples/api/server Fri Apr  9 10:54:07 2010
@@ -44,16 +44,12 @@ if opts.verbose:
 else:
   enable("qpid", WARN)
 
-url = URL(opts.broker)
 if args:
   addr = args.pop(0)
 else:
   parser.error("address is required")
 
-# XXX: should make URL default the port for us
-conn = Connection(url.host, url.port,
-                  username=url.user,
-                  password=url.password,
+conn = Connection(opts.broker,
                   reconnect=opts.reconnect,
                   reconnect_interval=opts.reconnect_interval,
                   reconnect_limit=opts.reconnect_limit)
@@ -75,7 +71,7 @@ def dispatch(msg):
   return result
 
 try:
-  conn.connect()
+  conn.open()
   ssn = conn.session()
   rcv = ssn.receiver(addr)
 

Modified: qpid/trunk/qpid/python/examples/api/spout
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/python/examples/api/spout?rev=932352&r1=932351&r2=932352&view=diff
==============================================================================
--- qpid/trunk/qpid/python/examples/api/spout (original)
+++ qpid/trunk/qpid/python/examples/api/spout Fri Apr  9 10:54:07 2010
@@ -65,7 +65,6 @@ if opts.verbose:
 else:
   enable("qpid", WARN)
 
-url = URL(opts.broker)
 if opts.id is None:
   spout_id = str(uuid4())
 else:
@@ -92,15 +91,12 @@ if opts.entries:
 else:
   content = text
 
-# XXX: should make URL default the port for us
-conn = Connection(url.host, url.port,
-                  username=url.user,
-                  password=url.password,
+conn = Connection(opts.broker,
                   reconnect=opts.reconnect,
                   reconnect_interval=opts.reconnect_interval,
                   reconnect_limit=opts.reconnect_limit)
 try:
-  conn.connect()
+  conn.open()
   ssn = conn.session()
   snd = ssn.sender(addr)
 

Modified: qpid/trunk/qpid/python/qpid/brokertest.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/python/qpid/brokertest.py?rev=932352&r1=932351&r2=932352&view=diff
==============================================================================
--- qpid/trunk/qpid/python/qpid/brokertest.py (original)
+++ qpid/trunk/qpid/python/qpid/brokertest.py Fri Apr  9 10:54:07 2010
@@ -295,7 +295,8 @@ class Broker(Popen):
 
     def connect(self):
         """New API connection to the broker."""
-        return messaging.Connection.open(self.host(), self.port())
+        return messaging.Connection.establish(host=self.host(),
+                                              port=self.port())
 
     def connect_old(self):
         """Old API connection to the broker."""

Modified: qpid/trunk/qpid/python/qpid/messaging/driver.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/python/qpid/messaging/driver.py?rev=932352&r1=932351&r2=932352&view=diff
==============================================================================
--- qpid/trunk/qpid/python/qpid/messaging/driver.py (original)
+++ qpid/trunk/qpid/python/qpid/messaging/driver.py Fri Apr  9 10:54:07 2010
@@ -324,9 +324,10 @@ class Driver:
     self._selector = Selector.default()
     self._attempts = 0
     self._delay = self.connection.reconnect_interval_min
+    urls = [URL(u) for u in self.connection.reconnect_urls]
     self._hosts = [(self.connection.host, self.connection.port)] + \
-        self.connection.reconnect_hosts
-    self._reconnect_log = self.connection.options.get("reconnect_log", True)
+        [(u.host, u.port) for u in urls]
+    self._reconnect_log = self.connection.reconnect_log
     self._host = 0
     self._retrying = False
     self._transport = None
@@ -463,7 +464,7 @@ class Driver:
       self.engine = Engine(self.connection)
       self.engine.open()
       rawlog.debug("OPEN[%s]: %s:%s", self.log_id, host, port)
-      trans = getattr(transports, self.connection.transport, None)
+      trans = transports.TRANSPORTS.get(self.connection.transport)
       if trans:
         self._transport = trans(host, port)
       else:
@@ -507,9 +508,7 @@ class Engine:
     self._channels = 0
     self._sessions = {}
 
-    options = self.connection.options
-
-    self.address_cache = Cache(options.get("address_ttl", 60))
+    self.address_cache = Cache(self.connection.address_ttl)
 
     self._status = CLOSED
     self._buf = ""
@@ -528,11 +527,11 @@ class Engine:
       self._sasl.setAttr("password", self.connection.password)
     if self.connection.host:
       self._sasl.setAttr("host", self.connection.host)
-    self._sasl.setAttr("service", options.get("service", "qpidd"))
-    if "min_ssf" in options:
-      self._sasl.setAttr("minssf", options["min_ssf"])
-    if "max_ssf" in options:
-      self._sasl.setAttr("maxssf", options["max_ssf"])
+    self._sasl.setAttr("service", self.connection.sasl_service)
+    if self.connection.sasl_min_ssf is not None:
+      self._sasl.setAttr("minssf", self.connection.sasl_min_ssf)
+    if self.connection.sasl_max_ssf is not None:
+      self._sasl.setAttr("maxssf", self.connection.sasl_max_ssf)
     self._sasl.init()
     self._sasl_encode = False
     self._sasl_decode = False
@@ -619,8 +618,8 @@ class Engine:
                          (cli_major, cli_minor, major, minor))
 
   def do_connection_start(self, start):
-    if self.connection.mechanisms:
-      permitted = self.connection.mechanisms.split()
+    if self.connection.sasl_mechanisms:
+      permitted = self.connection.sasl_mechanisms.split()
       mechs = [m for m in start.mechanisms if m in permitted]
     else:
       mechs = start.mechanisms

Modified: qpid/trunk/qpid/python/qpid/messaging/endpoints.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/python/qpid/messaging/endpoints.py?rev=932352&r1=932351&r2=932352&view=diff
==============================================================================
--- qpid/trunk/qpid/python/qpid/messaging/endpoints.py (original)
+++ qpid/trunk/qpid/python/qpid/messaging/endpoints.py Fri Apr  9 10:54:07 2010
@@ -36,7 +36,7 @@ from qpid.messaging.constants import *
 from qpid.messaging.exceptions import *
 from qpid.messaging.message import *
 from qpid.ops import PRIMITIVE
-from qpid.util import default
+from qpid.util import default, URL
 from threading import Thread, RLock
 
 log = getLogger("qpid.messaging")
@@ -51,61 +51,110 @@ class Connection:
   """
 
   @static
-  def open(host, port=None, username="guest", password="guest", **options):
+  def establish(url=None, **options):
     """
-    Creates an AMQP connection and connects it to the given host and port.
-
-    @type host: str
-    @param host: the name or ip address of the remote host
-    @type port: int
-    @param port: the port number of the remote host
-    @rtype: Connection
-    @return: a connected Connection
+    Constructs a L{Connection} with the supplied parameters and opens
+    it.
     """
-    conn = Connection(host, port, username, password, **options)
-    conn.connect()
+    conn = Connection(url, **options)
+    conn.open()
     return conn
 
-  def __init__(self, host, port=None, username="guest", password="guest", **options):
+  def __init__(self, url=None, **options):
     """
     Creates a connection. A newly created connection must be connected
     with the Connection.connect() method before it can be used.
 
+    @type url: str
+    @param url: [ <username> [ / <password> ] @ ] <host> [ : <port> ]
     @type host: str
-    @param host: the name or ip address of the remote host
+    @param host: the name or ip address of the remote host (overridden by url)
     @type port: int
-    @param port: the port number of the remote host
+    @param port: the port number of the remote host (overridden by url)
+    @type transport: str
+    @param transport: one of tcp, tcp+tls, or ssl (alias for tcp+tls)
+    @type heartbeat: int
+    @param heartbeat: heartbeat interval in seconds
+
+    @type username: str
+    @param username: the username for authentication (overridden by url)
+    @type password: str
+    @param password: the password for authentication (overridden by url)
+
+    @type sasl_mechanisms: str
+    @param sasl_mechanisms: space separated list of permitted sasl mechanisms
+    @type sasl_service: str
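+    @param sasl_service: the service name passed to the SASL layer (defaults to "qpidd")
+    @type sasl_min_ssf: ???
+    @param sasl_min_ssf: minimum security strength factor passed to the SASL layer (minssf)
+    @type sasl_max_ssf: ???
+    @param sasl_max_ssf: maximum security strength factor passed to the SASL layer (maxssf)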
+
+    @type reconnect: bool
+    @param reconnect: enable/disable automatic reconnect
+    @type reconnect_timeout: float
+    @param reconnect_timeout: total time to attempt reconnect
+    @type reconnect_interval_min: float
+    @param reconnect_interval_min: minimum interval between reconnect attempts
+    @type reconnect_interval_max: float
+    @param reconnect_interval_max: maximum interval between reconnect attempts
+    @type reconnect_interval: float
+    @param reconnect_interval: set both min and max reconnect intervals
+    @type reconnect_limit: int
+    @param reconnect_limit: limit the total number of reconnect attempts
+    @type reconnect_urls: list[str]
+    @param reconnect_urls: list of backup hosts specified as urls
+
+    @type address_ttl: float
+    @param address_ttl: time until cached address resolution expires
+
     @rtype: Connection
     @return: a disconnected Connection
     """
-    self.host = host
-    self.username = username
-    self.password = password
-    self.mechanisms = options.get("mechanisms")
+    if url is None:
+      url = options.get("host")
+    if isinstance(url, basestring):
+      url = URL(url)
+    self.host = url.host
+    if url.scheme == url.AMQP:
+      self.transport = "tcp"
+    elif url.scheme == url.AMQPS:
+      self.transport = "ssl"
+    else:
+      self.transport = options.get("transport", "tcp")
+    if self.transport in ("ssl", "tcp+tls"):
+      self.port = default(url.port, options.get("port", AMQPS_PORT))
+    else:
+      self.port = default(url.port, options.get("port", AMQP_PORT))
     self.heartbeat = options.get("heartbeat")
+    self.username = default(url.user, options.get("username", "guest"))
+    self.password = default(url.password, options.get("password", "guest"))
+
+    self.sasl_mechanisms = options.get("sasl_mechanisms")
+    self.sasl_service = options.get("sasl_service", "qpidd")
+    self.sasl_min_ssf = options.get("sasl_min_ssf")
+    self.sasl_max_ssf = options.get("sasl_max_ssf")
+
     self.reconnect = options.get("reconnect", False)
     self.reconnect_timeout = options.get("reconnect_timeout")
-    if "reconnect_interval_min" in options:
-      self.reconnect_interval_min = options["reconnect_interval_min"]
-    else:
-      self.reconnect_interval_min = options.get("reconnect_interval", 1)
-    if "reconnect_interval_max" in options:
-      self.reconnect_interval_max = options["reconnect_interval_max"]
-    else:
-      self.reconnect_interval_max = options.get("reconnect_interval", 2*60)
+    reconnect_interval = options.get("reconnect_interval")
+    self.reconnect_interval_min = options.get("reconnect_interval_min",
+                                              default(reconnect_interval, 1))
+    self.reconnect_interval_max = options.get("reconnect_interval_max",
+                                              default(reconnect_interval, 2*60))
     self.reconnect_limit = options.get("reconnect_limit")
-    self.reconnect_hosts = options.get("reconnect_hosts", [])
-    self.transport = options.get("transport", "plain")
+    self.reconnect_urls = options.get("reconnect_urls", [])
+    self.reconnect_log = options.get("reconnect_log", True)
+
+    self.address_ttl = options.get("address_ttl", 60)
+
     self.options = options
 
-    if self.transport == "tls":
-      self.port = default(port, AMQPS_PORT)
-    else:
-      self.port = default(port, AMQP_PORT)
 
     self.id = str(uuid4())
     self.session_counter = 0
     self.sessions = {}
+    self._open = False
     self._connected = False
     self._transport_connected = False
     self._lock = RLock()
@@ -164,9 +213,26 @@ class Connection:
     del self.sessions[ssn.name]
 
   @synchronized
-  def connect(self):
+  def open(self):
+    """
+    Opens a connection.
+    """
+    if self._open:
+      raise ConnectionError("already open")
+    self._open = True
+    self.attach()
+
+  @synchronized
+  def opened(self):
+    """
+    Return true if the connection is open, false otherwise.
+    """
+    return self._open
+
+  @synchronized
+  def attach(self):
     """
-    Connect to the remote endpoint.
+    Attach to the remote endpoint.
     """
     self._connected = True
     self._driver.start()
@@ -181,9 +247,9 @@ class Connection:
             if not (l.linked or l.error or l.closed)]
 
   @synchronized
-  def disconnect(self):
+  def detach(self):
     """
-    Disconnect from the remote endpoint.
+    Detach from the remote endpoint.
     """
     self._connected = False
     self._wakeup()
@@ -192,9 +258,9 @@ class Connection:
     self._condition.gc()
 
   @synchronized
-  def connected(self):
+  def attached(self):
     """
-    Return true if the connection is connected, false otherwise.
+    Return true if the connection is attached, false otherwise.
     """
     return self._connected
 
@@ -207,7 +273,8 @@ class Connection:
       for ssn in self.sessions.values():
         ssn.close()
     finally:
-      self.disconnect()
+      self.detach()
+      self._open = False
 
 class Session:
 
@@ -680,7 +747,7 @@ class Sender:
     """
 
     if not self.session.connection._connected or self.session.closing:
-      raise Disconnected()
+      raise Detached()
 
     self._ewait(lambda: self.linked)
 

Modified: qpid/trunk/qpid/python/qpid/messaging/exceptions.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/python/qpid/messaging/exceptions.py?rev=932352&r1=932351&r2=932352&view=diff
==============================================================================
--- qpid/trunk/qpid/python/qpid/messaging/exceptions.py (original)
+++ qpid/trunk/qpid/python/qpid/messaging/exceptions.py Fri Apr  9 10:54:07 2010
@@ -33,10 +33,10 @@ class ConnectError(ConnectionError):
 class SessionError(Exception):
   pass
 
-class Disconnected(SessionError):
+class Detached(SessionError):
   """
   Exception raised when an operation is attempted that is illegal when
-  disconnected.
+  detached.
   """
   pass
 

Modified: qpid/trunk/qpid/python/qpid/messaging/transports.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/python/qpid/messaging/transports.py?rev=932352&r1=932351&r2=932352&view=diff
==============================================================================
--- qpid/trunk/qpid/python/qpid/messaging/transports.py (original)
+++ qpid/trunk/qpid/python/qpid/messaging/transports.py Fri Apr  9 10:54:07 2010
@@ -19,7 +19,9 @@
 
 from qpid.util import connect
 
-class plain:
+TRANSPORTS = {}
+
+class tcp:
 
   def __init__(self, host, port):
     self.socket = connect(host, port)
@@ -42,6 +44,8 @@ class plain:
   def close(self):
     self.socket.close()
 
+TRANSPORTS["tcp"] = tcp
+
 try:
   from ssl import wrap_socket, SSLError, SSL_ERROR_WANT_READ, \
       SSL_ERROR_WANT_WRITE
@@ -105,3 +109,6 @@ else:
       self.socket.setblocking(1)
       # this closes the underlying socket
       self.tls.close()
+
+  TRANSPORTS["ssl"] = tls
+  TRANSPORTS["tcp+tls"] = tls

Modified: qpid/trunk/qpid/python/qpid/messaging/util.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/python/qpid/messaging/util.py?rev=932352&r1=932351&r2=932352&view=diff
==============================================================================
--- qpid/trunk/qpid/python/qpid/messaging/util.py (original)
+++ qpid/trunk/qpid/python/qpid/messaging/util.py Fri Apr  9 10:54:07 2010
@@ -26,31 +26,31 @@ from threading import Thread
 
 log = getLogger("qpid.messaging.util")
 
-def auto_fetch_reconnect_hosts(conn):
-  ssn = conn.session("auto-fetch-reconnect-hosts")
+def auto_fetch_reconnect_urls(conn):
+  ssn = conn.session("auto-fetch-reconnect-urls")
   rcv = ssn.receiver("amq.failover")
   rcv.capacity = 10
 
   def main():
     while True:
       msg = rcv.fetch()
-      set_reconnect_hosts(conn, msg)
+      set_reconnect_urls(conn, msg)
       ssn.acknowledge(msg, sync=False)
 
-  thread = Thread(name="auto-fetch-reconnect-hosts", target=main)
+  thread = Thread(name="auto-fetch-reconnect-urls", target=main)
   thread.setDaemon(True)
   thread.start()
 
 
-def set_reconnect_hosts(conn, msg):
-  reconnect_hosts = []
+def set_reconnect_urls(conn, msg):
+  reconnect_urls = []
   urls = msg.properties["amq.failover"]
   for u in urls:
     if u.startswith("amqp:tcp:"):
       parts = u.split(":")
       host, port = parts[2:4]
-      reconnect_hosts.append((host, port))
-  conn.reconnect_hosts = reconnect_hosts
-  log.warn("set reconnect_hosts for conn %s: %s", conn, reconnect_hosts)
+      reconnect_urls.append("%s:%s" % (host, port))
+  conn.reconnect_urls = reconnect_urls
+  log.warn("set reconnect_urls for conn %s: %s", conn, reconnect_urls)
 
-__all__ = ["auto_fetch_reconnect_hosts", "set_reconnect_hosts"]
+__all__ = ["auto_fetch_reconnect_urls", "set_reconnect_urls"]

Modified: qpid/trunk/qpid/python/qpid/tests/messaging/__init__.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/python/qpid/tests/messaging/__init__.py?rev=932352&r1=932351&r2=932352&view=diff
==============================================================================
--- qpid/trunk/qpid/python/qpid/tests/messaging/__init__.py (original)
+++ qpid/trunk/qpid/python/qpid/tests/messaging/__init__.py Fri Apr  9 10:54:07 2010
@@ -50,7 +50,7 @@ class Base(Test):
     self.rcv = self.setup_receiver()
 
   def teardown(self):
-    if self.conn is not None and self.conn.connected():
+    if self.conn is not None and self.conn.attached():
       self.conn.close()
 
   def content(self, base, count = None):
@@ -146,9 +146,9 @@ class Base(Test):
 
   def transport(self):
     if self.broker.scheme == self.broker.AMQPS:
-      return "tls"
+      return "ssl"
     else:
-      return "plain"
+      return "tcp"
 
   def connection_options(self):
     return {"reconnect": self.reconnect(),

Modified: qpid/trunk/qpid/python/qpid/tests/messaging/endpoints.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/python/qpid/tests/messaging/endpoints.py?rev=932352&r1=932351&r2=932352&view=diff
==============================================================================
--- qpid/trunk/qpid/python/qpid/tests/messaging/endpoints.py (original)
+++ qpid/trunk/qpid/python/qpid/tests/messaging/endpoints.py Fri Apr  9 10:54:07 2010
@@ -28,26 +28,21 @@ from qpid.tests.messaging import Base
 
 class SetupTests(Base):
 
-  def testOpen(self):
-    # XXX: need to flesh out URL support/syntax
-    self.conn = Connection.open(self.broker.host, self.broker.port,
-                                **self.connection_options())
+  def testEstablish(self):
+    self.conn = Connection.establish(self.broker, **self.connection_options())
     self.ping(self.conn.session())
 
-  def testConnect(self):
-    # XXX: need to flesh out URL support/syntax
-    self.conn = Connection(self.broker.host, self.broker.port,
-                           **self.connection_options())
-    self.conn.connect()
+  def testOpen(self):
+    self.conn = Connection(self.broker, **self.connection_options())
+    self.conn.open()
     self.ping(self.conn.session())
 
   def testConnectError(self):
     try:
-      self.conn = Connection.open("localhost", 0)
+      self.conn = Connection.establish("localhost:0")
       assert False, "connect succeeded"
     except ConnectError, e:
-      # XXX: should verify that e includes appropriate diagnostic info
-      pass
+      assert "Connection refused" in str(e)
 
   def use_fds(self):
     fds = []
@@ -66,8 +61,7 @@ class SetupTests(Base):
       for i in range(32):
         if fds: os.close(fds.pop())
       for i in xrange(64):
-        conn = Connection.open(self.broker.host, self.broker.port,
-                               **self.connection_options())
+        conn = Connection.establish(self.broker, **self.connection_options())
         conn.close()
     finally:
       while fds:
@@ -76,8 +70,8 @@ class SetupTests(Base):
   def testReconnect(self):
     options = self.connection_options()
     import socket
-    from qpid.messaging import transports
-    real = transports.plain
+    from qpid.messaging.transports import TRANSPORTS
+    real = TRANSPORTS["tcp"]
 
     class flaky:
 
@@ -112,7 +106,7 @@ class SetupTests(Base):
       def close(self):
         self.real.close()
 
-    transports.flaky = flaky
+    TRANSPORTS["flaky"] = flaky
 
     options["reconnect"] = True
     options["reconnect_interval"] = 0
@@ -120,7 +114,7 @@ class SetupTests(Base):
     options["reconnect_log"] = False
     options["transport"] = "flaky"
 
-    self.conn = Connection.open(self.broker.host, self.broker.port, **options)
+    self.conn = Connection.establish(self.broker, **options)
     ssn = self.conn.session()
     snd = ssn.sender("test-reconnect-queue; {create: always, delete: always}")
     rcv = ssn.receiver(snd.target)
@@ -153,8 +147,7 @@ class SetupTests(Base):
 class ConnectionTests(Base):
 
   def setup_connection(self):
-    return Connection.open(self.broker.host, self.broker.port,
-                           **self.connection_options())
+    return Connection.establish(self.broker, **self.connection_options())
 
   def testSessionAnon(self):
     ssn1 = self.conn.session()
@@ -172,23 +165,23 @@ class ConnectionTests(Base):
     assert ssn1 is self.conn.session("one")
     assert ssn2 is self.conn.session("two")
 
-  def testDisconnect(self):
+  def testDetach(self):
     ssn = self.conn.session()
     self.ping(ssn)
-    self.conn.disconnect()
+    self.conn.detach()
     try:
       self.ping(ssn)
       assert False, "ping succeeded"
-    except Disconnected:
-      # this is the expected failure when pinging on a disconnected
+    except Detached:
+      # this is the expected failure when pinging on a detached
       # connection
       pass
-    self.conn.connect()
+    self.conn.attach()
     self.ping(ssn)
 
   def testClose(self):
     self.conn.close()
-    assert not self.conn.connected()
+    assert not self.conn.attached()
 
 ACK_QC = 'test-ack-queue; {create: always}'
 ACK_QD = 'test-ack-queue; {delete: always}'
@@ -196,8 +189,7 @@ ACK_QD = 'test-ack-queue; {delete: alway
 class SessionTests(Base):
 
   def setup_connection(self):
-    return Connection.open(self.broker.host, self.broker.port,
-                           **self.connection_options())
+    return Connection.establish(self.broker, **self.connection_options())
 
   def setup_session(self):
     return self.conn.session()
@@ -230,11 +222,11 @@ class SessionTests(Base):
     self.ssn.acknowledge(msg)
     snd2 = self.ssn.receiver('test-rcv-queue; {delete: always}')
 
-  def testDisconnectedReceiver(self):
-    self.conn.disconnect()
+  def testDetachedReceiver(self):
+    self.conn.detach()
     rcv = self.ssn.receiver("test-dis-rcv-queue; {create: always, delete: always}")
-    m = self.content("testDisconnectedReceiver")
-    self.conn.connect()
+    m = self.content("testDetachedReceiver")
+    self.conn.attach()
     snd = self.ssn.sender("test-dis-rcv-queue")
     snd.send(m)
     self.drain(rcv, expected=[m])
@@ -475,7 +467,7 @@ class SessionTests(Base):
     try:
       self.ping(self.ssn)
       assert False, "ping succeeded"
-    except Disconnected:
+    except Detached:
       pass
 
 RECEIVER_Q = 'test-receiver-queue; {create: always, delete: always}'
@@ -483,8 +475,7 @@ RECEIVER_Q = 'test-receiver-queue; {crea
 class ReceiverTests(Base):
 
   def setup_connection(self):
-    return Connection.open(self.broker.host, self.broker.port,
-                           **self.connection_options())
+    return Connection.establish(self.broker, **self.connection_options())
 
   def setup_session(self):
     return self.conn.session()
@@ -653,8 +644,7 @@ class ReceiverTests(Base):
 class AddressTests(Base):
 
   def setup_connection(self):
-    return Connection.open(self.broker.host, self.broker.port,
-                           **self.connection_options())
+    return Connection.establish(self.broker, **self.connection_options())
 
   def setup_session(self):
     return self.conn.session()
@@ -896,8 +886,8 @@ test-link-bindings-queue; {
     rcv = self.ssn.receiver("amq.topic; {link: {reliability: %s}}" % reliability)
     for m in messages:
       snd.send(m)
-    self.conn.disconnect()
-    self.conn.connect()
+    self.conn.detach()
+    self.conn.attach()
     self.drain(rcv, expected=expected)
 
   def testReliabilityUnreliable(self):
@@ -924,8 +914,7 @@ UNLEXABLE_ADDR = "\0x0\0x1\0x2\0x3"
 class AddressErrorTests(Base):
 
   def setup_connection(self):
-    return Connection.open(self.broker.host, self.broker.port,
-                           **self.connection_options())
+    return Connection.establish(self.broker, **self.connection_options())
 
   def setup_session(self):
     return self.conn.session()
@@ -991,8 +980,7 @@ SENDER_Q = 'test-sender-q; {create: alwa
 class SenderTests(Base):
 
   def setup_connection(self):
-    return Connection.open(self.broker.host, self.broker.port,
-                           **self.connection_options())
+    return Connection.establish(self.broker, **self.connection_options())
 
   def setup_session(self):
     return self.conn.session()

Modified: qpid/trunk/qpid/python/qpid/tests/messaging/message.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/python/qpid/tests/messaging/message.py?rev=932352&r1=932351&r2=932352&view=diff
==============================================================================
--- qpid/trunk/qpid/python/qpid/tests/messaging/message.py (original)
+++ qpid/trunk/qpid/python/qpid/tests/messaging/message.py Fri Apr  9 10:54:07 2010
@@ -53,8 +53,7 @@ ECHO_Q = 'test-message-echo-queue; {crea
 class MessageEchoTests(Base):
 
   def setup_connection(self):
-    return Connection.open(self.broker.host, self.broker.port,
-                           **self.connection_options())
+    return Connection.establish(self.broker, **self.connection_options())
 
   def setup_session(self):
     return self.conn.session()



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org