You are viewing a plain-text version of this content. The canonical link to the original was a hyperlink and is not preserved in this plain-text copy.
Posted to commits@qpid.apache.org by rh...@apache.org on 2010/02/22 13:52:41 UTC

svn commit: r912551 - /qpid/trunk/qpid/python/qpid/messaging/driver.py

Author: rhs
Date: Mon Feb 22 12:52:40 2010
New Revision: 912551

URL: http://svn.apache.org/viewvc?rev=912551&view=rev
Log:
start to split engine logic from transport logic

Modified:
    qpid/trunk/qpid/python/qpid/messaging/driver.py

Modified: qpid/trunk/qpid/python/qpid/messaging/driver.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/python/qpid/messaging/driver.py?rev=912551&r1=912550&r2=912551&view=diff
==============================================================================
--- qpid/trunk/qpid/python/qpid/messaging/driver.py (original)
+++ qpid/trunk/qpid/python/qpid/messaging/driver.py Mon Feb 22 12:52:40 2010
@@ -377,66 +377,26 @@
 
   @synchronized
   def readable(self):
-    error = None
-    recoverable = False
     try:
       data = self._socket.recv(64*1024)
       if data:
         rawlog.debug("READ[%s]: %r", self.log_id, data)
-        if self._sasl_decode:
-          data = self._sasl.decode(data)
+        self.engine_write(data)
       else:
-        rawlog.debug("ABORTED[%s]: %s", self.log_id, self._socket.getpeername())
-        error = "connection aborted"
-        recoverable = True
+        rawlog.debug("CLOSED[%s]: %s", self.log_id, self._socket.getpeername())
+        self.engine_close()
     except socket.error, e:
-      error = e
-      recoverable = True
-
-    if not error:
-      try:
-        if len(self._hdr) < 8:
-          r = 8 - len(self._hdr)
-          self._hdr += data[:r]
-          data = data[r:]
-
-          if len(self._hdr) == 8:
-            self.do_header(self._hdr)
-
-        self._frame_dec.write(data)
-        self._seg_dec.write(*self._frame_dec.read())
-        self._op_dec.write(*self._seg_dec.read())
-        for op in self._op_dec.read():
-          self.assign_id(op)
-          opslog.debug("RCVD[%s]: %r", self.log_id, op)
-          op.dispatch(self)
-      except VersionError, e:
-        error = e
-      except:
-        msg = compat.format_exc()
-        error = msg
-
-    if error:
-      self._error(error, recoverable)
-    else:
-      self.dispatch()
-
+      self.engine_close(e)
     self.connection._waiter.notifyAll()
 
-  def assign_id(self, op):
-    if isinstance(op, Command):
-      sst = self.get_sst(op)
-      op.id = sst.received
-      sst.received += 1
-
   @synchronized
   def writeable(self):
     try:
-      n = self._socket.send(self._buf)
-      rawlog.debug("SENT[%s]: %r", self.log_id, self._buf[:n])
-      self._buf = self._buf[n:]
+      n = self._socket.send(self.engine_peek())
+      sent = self.engine_read(n)
+      rawlog.debug("SENT[%s]: %r", self.log_id, sent)
     except socket.error, e:
-      self._error(e, True)
+      self.engine_close(e)
       self.connection._waiter.notifyAll()
 
   @synchronized
@@ -444,6 +404,55 @@
     self.dispatch()
     self.connection._waiter.notifyAll()
 
+  def engine_write(self, data):
+    try:
+      if self._sasl_decode:
+        data = self._sasl.decode(data)
+
+      if len(self._hdr) < 8:
+        r = 8 - len(self._hdr)
+        self._hdr += data[:r]
+        data = data[r:]
+
+        if len(self._hdr) == 8:
+          self.do_header(self._hdr)
+
+      self._frame_dec.write(data)
+      self._seg_dec.write(*self._frame_dec.read())
+      self._op_dec.write(*self._seg_dec.read())
+      for op in self._op_dec.read():
+        self.assign_id(op)
+        opslog.debug("RCVD[%s]: %r", self.log_id, op)
+        op.dispatch(self)
+      self.dispatch()
+    except VersionError, e:
+      self._error(e, False)
+    except:
+      self._error(compat.format_exc(), False)
+
+  def engine_close(self, e=None):
+    if e is None:
+      self._error("connection aborted", True)
+    else:
+      self._error(e, True)
+
+  def assign_id(self, op):
+    if isinstance(op, Command):
+      sst = self.get_sst(op)
+      op.id = sst.received
+      sst.received += 1
+
+  def engine_pending(self):
+    return len(self._buf)
+
+  def engine_read(self, n):
+    result = self._buf[:n]
+    self._buf = self._buf[n:]
+    return result
+
+  def engine_peek(self):
+    return self._buf
+
   def _error(self, err, recoverable):
     if self._socket is not None:
       self._socket.close()



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org