You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rh...@apache.org on 2010/02/26 01:02:33 UTC

svn commit: r916499 - in /qpid/trunk/qpid/python/qpid/messaging: driver.py endpoints.py

Author: rhs
Date: Fri Feb 26 00:02:33 2010
New Revision: 916499

URL: http://svn.apache.org/viewvc?rev=916499&view=rev
Log:
split engine into a separate class

Modified:
    qpid/trunk/qpid/python/qpid/messaging/driver.py
    qpid/trunk/qpid/python/qpid/messaging/endpoints.py

Modified: qpid/trunk/qpid/python/qpid/messaging/driver.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/python/qpid/messaging/driver.py?rev=916499&r1=916498&r2=916499&view=diff
==============================================================================
--- qpid/trunk/qpid/python/qpid/messaging/driver.py (original)
+++ qpid/trunk/qpid/python/qpid/messaging/driver.py Fri Feb 26 00:02:33 2010
@@ -295,9 +295,6 @@
     self.log_id = "%x" % id(self.connection)
     self._lock = self.connection._lock
 
-    self._in = LinkIn()
-    self._out = LinkOut()
-
     self._selector = Selector.default()
     self._attempts = 0
     self._hosts = [(self.connection.host, self.connection.port)] + \
@@ -306,56 +303,9 @@
     self._retrying = False
     self._socket = None
 
-    self.reset()
-
-  def reset(self):
-    self._closing = False
-    self._connected = False
-    self._attachments = {}
-
-    self._channel_max = 65536
-    self._channels = 0
-    self._sessions = {}
-
-    options = self.connection.options
-
-    self.address_cache = Cache(options.get("address_ttl", 60))
-
-    self._engine_status = CLOSED
-    self._buf = ""
-    self._hdr = ""
-    self._op_enc = OpEncoder()
-    self._seg_enc = SegmentEncoder()
-    self._frame_enc = FrameEncoder()
-    self._frame_dec = FrameDecoder()
-    self._seg_dec = SegmentDecoder()
-    self._op_dec = OpDecoder()
     self._timeout = None
 
-    self._sasl = sasl.Client()
-    if self.connection.username:
-      self._sasl.setAttr("username", self.connection.username)
-    if self.connection.password:
-      self._sasl.setAttr("password", self.connection.password)
-    if self.connection.host:
-      self._sasl.setAttr("host", self.connection.host)
-    self._sasl.setAttr("service", options.get("service", "qpidd"))
-    if "min_ssf" in options:
-      self._sasl.setAttr("minssf", options["min_ssf"])
-    if "max_ssf" in options:
-      self._sasl.setAttr("maxssf", options["max_ssf"])
-    self._sasl.init()
-    self._sasl_encode = False
-    self._sasl_decode = False
-
-    for ssn in self.connection.sessions.values():
-      for m in ssn.acked + ssn.unacked + ssn.incoming:
-        m._transfer_id = None
-      for snd in ssn.senders:
-        snd.linked = False
-      for rcv in ssn.receivers:
-        rcv.impending = rcv.received
-        rcv.linked = False
+    self.engine = None
 
   @synchronized
   def wakeup(self):
@@ -374,7 +324,7 @@
 
   @synchronized
   def writing(self):
-    return self._socket is not None and self._buf
+    return self._socket is not None and self.engine.pending()
 
   @synchronized
   def timing(self):
@@ -386,7 +336,7 @@
       data = self._socket.recv(64*1024)
       if data:
         rawlog.debug("READ[%s]: %r", self.log_id, data)
-        self.engine_write(data)
+        self.engine.write(data)
       else:
         self.close_engine()
     except socket.error, e:
@@ -413,18 +363,19 @@
       if delay > 0:
         log.warn("sleeping %s seconds" % delay)
       self._retrying = True
-      self.engine_close()
+      self.engine.close()
     else:
-      self.engine_close(e)
+      self.engine.close(e)
 
   def update_status(self):
-    status = self.engine_status()
+    status = self.engine.status()
     return getattr(self, "st_%s" % status.lower())()
 
   def st_closed(self):
     rawlog.debug("CLOSE[%s]: %s", self.log_id, self._socket.getpeername())
     self._socket.close()
     self._socket = None
+    self.engine = None
     return True
 
   def st_open(self):
@@ -434,8 +385,8 @@
   def writeable(self):
     notify = False
     try:
-      n = self._socket.send(self.engine_peek())
-      sent = self.engine_read(n)
+      n = self._socket.send(self.engine.peek())
+      sent = self.engine.read(n)
       rawlog.debug("SENT[%s]: %r", self.log_id, sent)
     except socket.error, e:
       self.close_engine(e)
@@ -449,10 +400,102 @@
     self.dispatch()
     self.connection._waiter.notifyAll()
 
-  def engine_status(self):
-    return self._engine_status
+  def dispatch(self):
+    try:
+      if self._socket is None:
+        if self.connection._connected:
+          self.connect()
+      else:
+        self.engine.dispatch()
+    except:
+      # XXX: Does socket get leaked if this occurs?
+      msg = compat.format_exc()
+      self.connection.error = (msg,)
 
-  def engine_write(self, data):
+  def connect(self):
+    try:
+      # XXX: should make this non blocking
+      if self._host == 0:
+        self._attempts += 1
+      host, port = self._hosts[self._host]
+      if self._retrying:
+        log.warn("trying: %s:%s", host, port)
+      rawlog.debug("OPEN[%s]: %s:%s", self.log_id, host, port)
+      self._socket = connect(host, port)
+      if self._retrying:
+        log.warn("reconnect succeeded: %s:%s", host, port)
+      self._timeout = None
+      self._attempts = 0
+      self._host = 0
+      self._retrying = False
+      self.engine = Engine(self.connection)
+      self.engine.open()
+    except socket.error, e:
+      self._host = (self._host + 1) % len(self._hosts)
+      self.close_engine(e)
+
+class Engine:
+
+  def __init__(self, connection):
+    self.connection = connection
+    self.log_id = "%x" % id(self.connection)
+    self._closing = False
+    self._connected = False
+    self._attachments = {}
+
+    self._in = LinkIn()
+    self._out = LinkOut()
+
+    self._channel_max = 65536
+    self._channels = 0
+    self._sessions = {}
+
+    options = self.connection.options
+
+    self.address_cache = Cache(options.get("address_ttl", 60))
+
+    self._status = CLOSED
+    self._buf = ""
+    self._hdr = ""
+    self._op_enc = OpEncoder()
+    self._seg_enc = SegmentEncoder()
+    self._frame_enc = FrameEncoder()
+    self._frame_dec = FrameDecoder()
+    self._seg_dec = SegmentDecoder()
+    self._op_dec = OpDecoder()
+
+    self._sasl = sasl.Client()
+    if self.connection.username:
+      self._sasl.setAttr("username", self.connection.username)
+    if self.connection.password:
+      self._sasl.setAttr("password", self.connection.password)
+    if self.connection.host:
+      self._sasl.setAttr("host", self.connection.host)
+    self._sasl.setAttr("service", options.get("service", "qpidd"))
+    if "min_ssf" in options:
+      self._sasl.setAttr("minssf", options["min_ssf"])
+    if "max_ssf" in options:
+      self._sasl.setAttr("maxssf", options["max_ssf"])
+    self._sasl.init()
+    self._sasl_encode = False
+    self._sasl_decode = False
+
+  def _reset(self):
+    self.connection._transport_connected = False
+
+    for ssn in self.connection.sessions.values():
+      for m in ssn.acked + ssn.unacked + ssn.incoming:
+        m._transfer_id = None
+      for snd in ssn.senders:
+        snd.linked = False
+      for rcv in ssn.receivers:
+        rcv.impending = rcv.received
+        rcv.linked = False
+
+  def status(self):
+    return self._status
+
+  def write(self, data):
     try:
       if self._sasl_decode:
         data = self._sasl.decode(data)
@@ -472,16 +515,17 @@
         self.assign_id(op)
         opslog.debug("RCVD[%s]: %r", self.log_id, op)
         op.dispatch(self)
-      self.engine_dispatch()
+      self.dispatch()
     except VersionError, e:
-      self.engine_close(e)
+      self.close(e)
     except:
-      self.engine_close(compat.format_exc())
+      self.close(compat.format_exc())
 
-  def engine_close(self, e=None):
-    self.reset()
+  def close(self, e=None):
+    self._reset()
     if e:
       self.connection.error = (e,)
+    self._status = CLOSED
 
   def assign_id(self, op):
     if isinstance(op, Command):
@@ -489,15 +533,15 @@
       op.id = sst.received
       sst.received += 1
 
-  def engine_pending(self):
+  def pending(self):
     return len(self._buf)
 
-  def engine_read(self, n):
+  def read(self, n):
     result = self._buf[:n]
     self._buf = self._buf[n:]
     return result
 
-  def engine_peek(self):
+  def peek(self):
     return self._buf
 
   def write_op(self, op):
@@ -543,6 +587,7 @@
   def do_connection_open_ok(self, open_ok):
     self._connected = True
     self._sasl_decode = True
+    self.connection._transport_connected = True
 
   def connection_heartbeat(self, hrt):
     self.write_op(ConnectionHeartbeat())
@@ -558,7 +603,7 @@
     # probably the right thing to do
 
   def do_connection_close_ok(self, close_ok):
-    self.reset()
+    self.close()
 
   def do_session_attached(self, atc):
     pass
@@ -611,19 +656,7 @@
     sst.session.error = (ex,)
 
   def dispatch(self):
-    try:
-      if self._socket is None:
-        if self.connection._connected:
-          self.connect()
-      else:
-        self.engine_dispatch()
-    except:
-      # XXX: Does socket get leaked if this occurs?
-      msg = compat.format_exc()
-      self.connection.error = (msg,)
-
-  def engine_dispatch(self):
-    if not self.connection._connected and not self._closing and self._engine_status != CLOSED:
+    if not self.connection._connected and not self._closing and self._status != CLOSED:
       self.disconnect()
 
     if self._connected and not self._closing:
@@ -631,28 +664,9 @@
         self.attach(ssn)
         self.process(ssn)
 
-  def connect(self):
-    try:
-      # XXX: should make this non blocking
-      if self._host == 0:
-        self._attempts += 1
-      host, port = self._hosts[self._host]
-      if self._retrying:
-        log.warn("trying: %s:%s", host, port)
-      self._socket = connect(host, port)
-      if self._retrying:
-        log.warn("reconnect succeeded: %s:%s", host, port)
-      self._timeout = None
-      self._attempts = 0
-      self._host = 0
-      self._retrying = False
-      self.engine_open()
-    except socket.error, e:
-      self._host = (self._host + 1) % len(self._hosts)
-      self.close_engine(e)
-
-  def engine_open(self):
-    self._engine_status = OPEN
+  def open(self):
+    self._reset()
+    self._status = OPEN
     self._buf += struct.pack(HEADER, "AMQP", 1, 1, 0, 10)
 
   def disconnect(self):

Modified: qpid/trunk/qpid/python/qpid/messaging/endpoints.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/python/qpid/messaging/endpoints.py?rev=916499&r1=916498&r2=916499&view=diff
==============================================================================
--- qpid/trunk/qpid/python/qpid/messaging/endpoints.py (original)
+++ qpid/trunk/qpid/python/qpid/messaging/endpoints.py Fri Feb 26 00:02:33 2010
@@ -94,6 +94,7 @@
     self.session_counter = 0
     self.sessions = {}
     self._connected = False
+    self._transport_connected = False
     self._lock = RLock()
     self._condition = Condition(self._lock)
     self._waiter = Waiter(self._condition)
@@ -157,7 +158,7 @@
     """
     self._connected = True
     self._wakeup()
-    self._ewait(lambda: self._driver._connected and not self._unlinked(),
+    self._ewait(lambda: self._transport_connected and not self._unlinked(),
                 exc=ConnectError)
 
   def _unlinked(self):
@@ -173,7 +174,7 @@
     """
     self._connected = False
     self._wakeup()
-    self._ewait(lambda: not self._driver._connected)
+    self._ewait(lambda: not self._transport_connected)
 
   @synchronized
   def connected(self):



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org