You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rh...@apache.org on 2010/02/26 01:02:33 UTC
svn commit: r916499 - in /qpid/trunk/qpid/python/qpid/messaging: driver.py
endpoints.py
Author: rhs
Date: Fri Feb 26 00:02:33 2010
New Revision: 916499
URL: http://svn.apache.org/viewvc?rev=916499&view=rev
Log:
split engine into a separate class
Modified:
qpid/trunk/qpid/python/qpid/messaging/driver.py
qpid/trunk/qpid/python/qpid/messaging/endpoints.py
Modified: qpid/trunk/qpid/python/qpid/messaging/driver.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/python/qpid/messaging/driver.py?rev=916499&r1=916498&r2=916499&view=diff
==============================================================================
--- qpid/trunk/qpid/python/qpid/messaging/driver.py (original)
+++ qpid/trunk/qpid/python/qpid/messaging/driver.py Fri Feb 26 00:02:33 2010
@@ -295,9 +295,6 @@
self.log_id = "%x" % id(self.connection)
self._lock = self.connection._lock
- self._in = LinkIn()
- self._out = LinkOut()
-
self._selector = Selector.default()
self._attempts = 0
self._hosts = [(self.connection.host, self.connection.port)] + \
@@ -306,56 +303,9 @@
self._retrying = False
self._socket = None
- self.reset()
-
- def reset(self):
- self._closing = False
- self._connected = False
- self._attachments = {}
-
- self._channel_max = 65536
- self._channels = 0
- self._sessions = {}
-
- options = self.connection.options
-
- self.address_cache = Cache(options.get("address_ttl", 60))
-
- self._engine_status = CLOSED
- self._buf = ""
- self._hdr = ""
- self._op_enc = OpEncoder()
- self._seg_enc = SegmentEncoder()
- self._frame_enc = FrameEncoder()
- self._frame_dec = FrameDecoder()
- self._seg_dec = SegmentDecoder()
- self._op_dec = OpDecoder()
self._timeout = None
- self._sasl = sasl.Client()
- if self.connection.username:
- self._sasl.setAttr("username", self.connection.username)
- if self.connection.password:
- self._sasl.setAttr("password", self.connection.password)
- if self.connection.host:
- self._sasl.setAttr("host", self.connection.host)
- self._sasl.setAttr("service", options.get("service", "qpidd"))
- if "min_ssf" in options:
- self._sasl.setAttr("minssf", options["min_ssf"])
- if "max_ssf" in options:
- self._sasl.setAttr("maxssf", options["max_ssf"])
- self._sasl.init()
- self._sasl_encode = False
- self._sasl_decode = False
-
- for ssn in self.connection.sessions.values():
- for m in ssn.acked + ssn.unacked + ssn.incoming:
- m._transfer_id = None
- for snd in ssn.senders:
- snd.linked = False
- for rcv in ssn.receivers:
- rcv.impending = rcv.received
- rcv.linked = False
+ self.engine = None
@synchronized
def wakeup(self):
@@ -374,7 +324,7 @@
@synchronized
def writing(self):
- return self._socket is not None and self._buf
+ return self._socket is not None and self.engine.pending()
@synchronized
def timing(self):
@@ -386,7 +336,7 @@
data = self._socket.recv(64*1024)
if data:
rawlog.debug("READ[%s]: %r", self.log_id, data)
- self.engine_write(data)
+ self.engine.write(data)
else:
self.close_engine()
except socket.error, e:
@@ -413,18 +363,19 @@
if delay > 0:
log.warn("sleeping %s seconds" % delay)
self._retrying = True
- self.engine_close()
+ self.engine.close()
else:
- self.engine_close(e)
+ self.engine.close(e)
def update_status(self):
- status = self.engine_status()
+ status = self.engine.status()
return getattr(self, "st_%s" % status.lower())()
def st_closed(self):
rawlog.debug("CLOSE[%s]: %s", self.log_id, self._socket.getpeername())
self._socket.close()
self._socket = None
+ self.engine = None
return True
def st_open(self):
@@ -434,8 +385,8 @@
def writeable(self):
notify = False
try:
- n = self._socket.send(self.engine_peek())
- sent = self.engine_read(n)
+ n = self._socket.send(self.engine.peek())
+ sent = self.engine.read(n)
rawlog.debug("SENT[%s]: %r", self.log_id, sent)
except socket.error, e:
self.close_engine(e)
@@ -449,10 +400,102 @@
self.dispatch()
self.connection._waiter.notifyAll()
- def engine_status(self):
- return self._engine_status
+ def dispatch(self):
+ try:
+ if self._socket is None:
+ if self.connection._connected:
+ self.connect()
+ else:
+ self.engine.dispatch()
+ except:
+ # XXX: Does socket get leaked if this occurs?
+ msg = compat.format_exc()
+ self.connection.error = (msg,)
- def engine_write(self, data):
+ def connect(self):
+ try:
+ # XXX: should make this non blocking
+ if self._host == 0:
+ self._attempts += 1
+ host, port = self._hosts[self._host]
+ if self._retrying:
+ log.warn("trying: %s:%s", host, port)
+ rawlog.debug("OPEN[%s]: %s:%s", self.log_id, host, port)
+ self._socket = connect(host, port)
+ if self._retrying:
+ log.warn("reconnect succeeded: %s:%s", host, port)
+ self._timeout = None
+ self._attempts = 0
+ self._host = 0
+ self._retrying = False
+ self.engine = Engine(self.connection)
+ self.engine.open()
+ except socket.error, e:
+ self._host = (self._host + 1) % len(self._hosts)
+ self.close_engine(e)
+
+class Engine:
+
+ def __init__(self, connection):
+ self.connection = connection
+ self.log_id = "%x" % id(self.connection)
+ self._closing = False
+ self._connected = False
+ self._attachments = {}
+
+ self._in = LinkIn()
+ self._out = LinkOut()
+
+ self._channel_max = 65536
+ self._channels = 0
+ self._sessions = {}
+
+ options = self.connection.options
+
+ self.address_cache = Cache(options.get("address_ttl", 60))
+
+ self._status = CLOSED
+ self._buf = ""
+ self._hdr = ""
+ self._op_enc = OpEncoder()
+ self._seg_enc = SegmentEncoder()
+ self._frame_enc = FrameEncoder()
+ self._frame_dec = FrameDecoder()
+ self._seg_dec = SegmentDecoder()
+ self._op_dec = OpDecoder()
+
+ self._sasl = sasl.Client()
+ if self.connection.username:
+ self._sasl.setAttr("username", self.connection.username)
+ if self.connection.password:
+ self._sasl.setAttr("password", self.connection.password)
+ if self.connection.host:
+ self._sasl.setAttr("host", self.connection.host)
+ self._sasl.setAttr("service", options.get("service", "qpidd"))
+ if "min_ssf" in options:
+ self._sasl.setAttr("minssf", options["min_ssf"])
+ if "max_ssf" in options:
+ self._sasl.setAttr("maxssf", options["max_ssf"])
+ self._sasl.init()
+ self._sasl_encode = False
+ self._sasl_decode = False
+
+ def _reset(self):
+ self.connection._transport_connected = False
+
+ for ssn in self.connection.sessions.values():
+ for m in ssn.acked + ssn.unacked + ssn.incoming:
+ m._transfer_id = None
+ for snd in ssn.senders:
+ snd.linked = False
+ for rcv in ssn.receivers:
+ rcv.impending = rcv.received
+ rcv.linked = False
+
+ def status(self):
+ return self._status
+
+ def write(self, data):
try:
if self._sasl_decode:
data = self._sasl.decode(data)
@@ -472,16 +515,17 @@
self.assign_id(op)
opslog.debug("RCVD[%s]: %r", self.log_id, op)
op.dispatch(self)
- self.engine_dispatch()
+ self.dispatch()
except VersionError, e:
- self.engine_close(e)
+ self.close(e)
except:
- self.engine_close(compat.format_exc())
+ self.close(compat.format_exc())
- def engine_close(self, e=None):
- self.reset()
+ def close(self, e=None):
+ self._reset()
if e:
self.connection.error = (e,)
+ self._status = CLOSED
def assign_id(self, op):
if isinstance(op, Command):
@@ -489,15 +533,15 @@
op.id = sst.received
sst.received += 1
- def engine_pending(self):
+ def pending(self):
return len(self._buf)
- def engine_read(self, n):
+ def read(self, n):
result = self._buf[:n]
self._buf = self._buf[n:]
return result
- def engine_peek(self):
+ def peek(self):
return self._buf
def write_op(self, op):
@@ -543,6 +587,7 @@
def do_connection_open_ok(self, open_ok):
self._connected = True
self._sasl_decode = True
+ self.connection._transport_connected = True
def connection_heartbeat(self, hrt):
self.write_op(ConnectionHeartbeat())
@@ -558,7 +603,7 @@
# probably the right thing to do
def do_connection_close_ok(self, close_ok):
- self.reset()
+ self.close()
def do_session_attached(self, atc):
pass
@@ -611,19 +656,7 @@
sst.session.error = (ex,)
def dispatch(self):
- try:
- if self._socket is None:
- if self.connection._connected:
- self.connect()
- else:
- self.engine_dispatch()
- except:
- # XXX: Does socket get leaked if this occurs?
- msg = compat.format_exc()
- self.connection.error = (msg,)
-
- def engine_dispatch(self):
- if not self.connection._connected and not self._closing and self._engine_status != CLOSED:
+ if not self.connection._connected and not self._closing and self._status != CLOSED:
self.disconnect()
if self._connected and not self._closing:
@@ -631,28 +664,9 @@
self.attach(ssn)
self.process(ssn)
- def connect(self):
- try:
- # XXX: should make this non blocking
- if self._host == 0:
- self._attempts += 1
- host, port = self._hosts[self._host]
- if self._retrying:
- log.warn("trying: %s:%s", host, port)
- self._socket = connect(host, port)
- if self._retrying:
- log.warn("reconnect succeeded: %s:%s", host, port)
- self._timeout = None
- self._attempts = 0
- self._host = 0
- self._retrying = False
- self.engine_open()
- except socket.error, e:
- self._host = (self._host + 1) % len(self._hosts)
- self.close_engine(e)
-
- def engine_open(self):
- self._engine_status = OPEN
+ def open(self):
+ self._reset()
+ self._status = OPEN
self._buf += struct.pack(HEADER, "AMQP", 1, 1, 0, 10)
def disconnect(self):
Modified: qpid/trunk/qpid/python/qpid/messaging/endpoints.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/python/qpid/messaging/endpoints.py?rev=916499&r1=916498&r2=916499&view=diff
==============================================================================
--- qpid/trunk/qpid/python/qpid/messaging/endpoints.py (original)
+++ qpid/trunk/qpid/python/qpid/messaging/endpoints.py Fri Feb 26 00:02:33 2010
@@ -94,6 +94,7 @@
self.session_counter = 0
self.sessions = {}
self._connected = False
+ self._transport_connected = False
self._lock = RLock()
self._condition = Condition(self._lock)
self._waiter = Waiter(self._condition)
@@ -157,7 +158,7 @@
"""
self._connected = True
self._wakeup()
- self._ewait(lambda: self._driver._connected and not self._unlinked(),
+ self._ewait(lambda: self._transport_connected and not self._unlinked(),
exc=ConnectError)
def _unlinked(self):
@@ -173,7 +174,7 @@
"""
self._connected = False
self._wakeup()
- self._ewait(lambda: not self._driver._connected)
+ self._ewait(lambda: not self._transport_connected)
@synchronized
def connected(self):
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org