You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rh...@apache.org on 2010/02/25 21:09:27 UTC

svn commit: r916433 - /qpid/trunk/qpid/python/qpid/messaging/driver.py

Author: rhs
Date: Thu Feb 25 20:09:27 2010
New Revision: 916433

URL: http://svn.apache.org/viewvc?rev=916433&view=rev
Log:
move reconnect logic away from engine portion of the driver

Modified:
    qpid/trunk/qpid/python/qpid/messaging/driver.py

Modified: qpid/trunk/qpid/python/qpid/messaging/driver.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/python/qpid/messaging/driver.py?rev=916433&r1=916432&r2=916433&view=diff
==============================================================================
--- qpid/trunk/qpid/python/qpid/messaging/driver.py (original)
+++ qpid/trunk/qpid/python/qpid/messaging/driver.py Thu Feb 25 20:09:27 2010
@@ -283,6 +283,11 @@
 SUBJECT = "qpid.subject"
 TO = "qpid.to"
 
+CLOSED = "CLOSED"
+READ_ONLY = "READ_ONLY"
+WRITE_ONLY = "WRITE_ONLY"
+OPEN = "OPEN"
+
 class Driver:
 
   def __init__(self, connection):
@@ -299,6 +304,7 @@
         self.connection.backups
     self._host = 0
     self._retrying = False
+    self._socket = None
 
     self.reset()
 
@@ -315,7 +321,7 @@
 
     self.address_cache = Cache(options.get("address_ttl", 60))
 
-    self._socket = None
+    self._engine_status = CLOSED
     self._buf = ""
     self._hdr = ""
     self._op_enc = OpEncoder()
@@ -382,20 +388,60 @@
         rawlog.debug("READ[%s]: %r", self.log_id, data)
         self.engine_write(data)
       else:
-        rawlog.debug("CLOSED[%s]: %s", self.log_id, self._socket.getpeername())
-        self.engine_close()
+        self.close_engine()
     except socket.error, e:
-      self.engine_close(e)
+      self.close_engine(e)
+
+    self.update_status()
+
     self.connection._waiter.notifyAll()
 
+  def close_engine(self, e=None):
+    if e is None:
+      e = "connection aborted"
+    # Retry when reconnect is enabled and the attempt limit allows; else fail.
+    if (self.connection.reconnect and
+        (self.connection.reconnect_limit is None or
+         self.connection.reconnect_limit <= 0 or
+         self._attempts <= self.connection.reconnect_limit)):
+      if self._host > 0:
+        delay = 0
+      else:
+        delay = self.connection.reconnect_delay
+      self._timeout = time.time() + delay
+      log.warn("recoverable error[attempt %s]: %s" % (self._attempts, e))
+      if delay > 0:
+        log.warn("sleeping %s seconds" % delay)
+      self._retrying = True
+      self.engine_close()  # reset the engine without recording an error
+    else:
+      self.engine_close(e)  # reset the engine and set connection.error
+
+  def update_status(self):
+    status = self.engine_status()
+    return getattr(self, "st_%s" % status.lower())()
+
+  def st_closed(self):
+    rawlog.debug("CLOSE[%s]: %s", self.log_id, self._socket.getpeername())
+    self._socket.close()
+    self._socket = None
+    return True
+
+  def st_open(self):
+    return False
+
   @synchronized
   def writeable(self):
+    notify = False
     try:
       n = self._socket.send(self.engine_peek())
       sent = self.engine_read(n)
       rawlog.debug("SENT[%s]: %r", self.log_id, sent)
     except socket.error, e:
-      self.engine_close(e)
+      self.close_engine(e)
+      notify = True
+
+    if self.update_status() or notify:
       self.connection._waiter.notifyAll()
 
   @synchronized
@@ -403,6 +449,9 @@
     self.dispatch()
     self.connection._waiter.notifyAll()
 
+  def engine_status(self):
+    return self._engine_status
+
   def engine_write(self, data):
     try:
       if self._sasl_decode:
@@ -423,17 +472,16 @@
         self.assign_id(op)
         opslog.debug("RCVD[%s]: %r", self.log_id, op)
         op.dispatch(self)
-      self.dispatch()
+      self.engine_dispatch()
     except VersionError, e:
-      self._error(e, False)
+      self.engine_close(e)
     except:
-      self._error(compat.format_exc(), False)
+      self.engine_close(compat.format_exc())
 
   def engine_close(self, e=None):
-    if e is None:
-      self._error("connection aborted", True)
-    else:
-      self._error(e, True)
+    self.reset()
+    if e:
+      self.connection.error = (e,)
 
   def assign_id(self, op):
     if isinstance(op, Command):
@@ -452,26 +500,6 @@
   def engine_peek(self):
     return self._buf
 
-  def _error(self, err, recoverable):
-    if self._socket is not None:
-      self._socket.close()
-    self.reset()
-    if (recoverable and self.connection.reconnect and
-        (self.connection.reconnect_limit is None or
-         self.connection.reconnect_limit <= 0 or
-         self._attempts <= self.connection.reconnect_limit)):
-      if self._host > 0:
-        delay = 0
-      else:
-        delay = self.connection.reconnect_delay
-      self._timeout = time.time() + delay
-      log.warn("recoverable error[attempt %s]: %s" % (self._attempts, err))
-      if delay > 0:
-        log.warn("sleeping %s seconds" % delay)
-      self._retrying = True
-    else:
-      self.connection.error = (err,)
-
   def write_op(self, op):
     opslog.debug("SENT[%s]: %r", self.log_id, op)
     self._op_enc.write(op)
@@ -530,7 +558,6 @@
     # probably the right thing to do
 
   def do_connection_close_ok(self, close_ok):
-    self._socket.close()
     self.reset()
 
   def do_session_attached(self, atc):
@@ -585,19 +612,25 @@
 
   def dispatch(self):
     try:
-      if self._socket is None and self.connection._connected:
-        self.connect()
-      elif self._socket is not None and not self.connection._connected and not self._closing:
-        self.disconnect()
-
-      if self._connected and not self._closing:
-        for ssn in self.connection.sessions.values():
-          self.attach(ssn)
-          self.process(ssn)
+      if self._socket is None:
+        if self.connection._connected:
+          self.connect()
+      else:
+        self.engine_dispatch()
     except:
+      # XXX: Does socket get leaked if this occurs?
       msg = compat.format_exc()
       self.connection.error = (msg,)
 
+  def engine_dispatch(self):
+    if not self.connection._connected and not self._closing and self._engine_status != CLOSED:
+      self.disconnect()
+
+    if self._connected and not self._closing:
+      for ssn in self.connection.sessions.values():
+        self.attach(ssn)
+        self.process(ssn)
+
   def connect(self):
     try:
       # XXX: should make this non blocking
@@ -613,10 +646,14 @@
       self._attempts = 0
       self._host = 0
       self._retrying = False
-      self._buf += struct.pack(HEADER, "AMQP", 1, 1, 0, 10)
+      self.engine_open()
     except socket.error, e:
       self._host = (self._host + 1) % len(self._hosts)
-      self._error(e, True)
+      self.close_engine(e)
+
+  def engine_open(self):
+    self._engine_status = OPEN
+    self._buf += struct.pack(HEADER, "AMQP", 1, 1, 0, 10)
 
   def disconnect(self):
     self.write_op(ConnectionClose(close_code.normal))



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org