You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rh...@apache.org on 2010/02/22 13:52:41 UTC
svn commit: r912551 - /qpid/trunk/qpid/python/qpid/messaging/driver.py
Author: rhs
Date: Mon Feb 22 12:52:40 2010
New Revision: 912551
URL: http://svn.apache.org/viewvc?rev=912551&view=rev
Log:
start to split engine logic from transport logic
Modified:
qpid/trunk/qpid/python/qpid/messaging/driver.py
Modified: qpid/trunk/qpid/python/qpid/messaging/driver.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/python/qpid/messaging/driver.py?rev=912551&r1=912550&r2=912551&view=diff
==============================================================================
--- qpid/trunk/qpid/python/qpid/messaging/driver.py (original)
+++ qpid/trunk/qpid/python/qpid/messaging/driver.py Mon Feb 22 12:52:40 2010
@@ -377,66 +377,26 @@
@synchronized
def readable(self):
- error = None
- recoverable = False
try:
data = self._socket.recv(64*1024)
if data:
rawlog.debug("READ[%s]: %r", self.log_id, data)
- if self._sasl_decode:
- data = self._sasl.decode(data)
+ self.engine_write(data)
else:
- rawlog.debug("ABORTED[%s]: %s", self.log_id, self._socket.getpeername())
- error = "connection aborted"
- recoverable = True
+ rawlog.debug("CLOSED[%s]: %s", self.log_id, self._socket.getpeername())
+ self.engine_close()
except socket.error, e:
- error = e
- recoverable = True
-
- if not error:
- try:
- if len(self._hdr) < 8:
- r = 8 - len(self._hdr)
- self._hdr += data[:r]
- data = data[r:]
-
- if len(self._hdr) == 8:
- self.do_header(self._hdr)
-
- self._frame_dec.write(data)
- self._seg_dec.write(*self._frame_dec.read())
- self._op_dec.write(*self._seg_dec.read())
- for op in self._op_dec.read():
- self.assign_id(op)
- opslog.debug("RCVD[%s]: %r", self.log_id, op)
- op.dispatch(self)
- except VersionError, e:
- error = e
- except:
- msg = compat.format_exc()
- error = msg
-
- if error:
- self._error(error, recoverable)
- else:
- self.dispatch()
-
+ self.engine_close(e)
self.connection._waiter.notifyAll()
- def assign_id(self, op):
- if isinstance(op, Command):
- sst = self.get_sst(op)
- op.id = sst.received
- sst.received += 1
-
@synchronized
def writeable(self):
try:
- n = self._socket.send(self._buf)
- rawlog.debug("SENT[%s]: %r", self.log_id, self._buf[:n])
- self._buf = self._buf[n:]
+ n = self._socket.send(self.engine_peek())
+ sent = self.engine_read(n)
+ rawlog.debug("SENT[%s]: %r", self.log_id, sent)
except socket.error, e:
- self._error(e, True)
+ self.engine_close(e)
self.connection._waiter.notifyAll()
@synchronized
@@ -444,6 +404,55 @@
self.dispatch()
self.connection._waiter.notifyAll()
+ def engine_write(self, data):
+ try:
+ if self._sasl_decode:
+ data = self._sasl.decode(data)
+
+ if len(self._hdr) < 8:
+ r = 8 - len(self._hdr)
+ self._hdr += data[:r]
+ data = data[r:]
+
+ if len(self._hdr) == 8:
+ self.do_header(self._hdr)
+
+ self._frame_dec.write(data)
+ self._seg_dec.write(*self._frame_dec.read())
+ self._op_dec.write(*self._seg_dec.read())
+ for op in self._op_dec.read():
+ self.assign_id(op)
+ opslog.debug("RCVD[%s]: %r", self.log_id, op)
+ op.dispatch(self)
+ self.dispatch()
+ except VersionError, e:
+ self._error(e, False)
+ except:
+ self._error(compat.format_exc(), False)
+
+ def engine_close(self, e=None):
+ if e is None:
+ self._error("connection aborted", True)
+ else:
+ self._error(e, True)
+
+ def assign_id(self, op):
+ if isinstance(op, Command):
+ sst = self.get_sst(op)
+ op.id = sst.received
+ sst.received += 1
+
+ def engine_pending(self):
+ return len(self._buf)
+
+ def engine_read(self, n):
+ result = self._buf[:n]
+ self._buf = self._buf[n:]
+ return result
+
+ def engine_peek(self):
+ return self._buf
+
def _error(self, err, recoverable):
if self._socket is not None:
self._socket.close()
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org