You are viewing a plain text version of this content. The canonical link for it was a hyperlink in the original page (the hyperlink target was not carried over into this text version).
Posted to commits@qpid.apache.org by rh...@apache.org on 2010/02/17 06:00:23 UTC

svn commit: r910823 - /qpid/trunk/qpid/python/qpid/driver.py

Author: rhs
Date: Wed Feb 17 05:00:22 2010
New Revision: 910823

URL: http://svn.apache.org/viewvc?rev=910823&view=rev
Log:
combined duplicate logic between link_in/link_out

Modified:
    qpid/trunk/qpid/python/qpid/driver.py

Modified: qpid/trunk/qpid/python/qpid/driver.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/python/qpid/driver.py?rev=910823&r1=910822&r2=910823&view=diff
==============================================================================
--- qpid/trunk/qpid/python/qpid/driver.py (original)
+++ qpid/trunk/qpid/python/qpid/driver.py Wed Feb 17 05:00:22 2010
@@ -132,6 +132,73 @@
     op.channel = self.channel
     self.driver.write_op(op)
 
+class LinkIn:
+
+  ADDR_NAME = "source"
+  DIR_NAME = "receiver"
+
+  def init_link(self, sst, rcv, _rcv):
+    _rcv.destination = str(rcv.id)
+    sst.destinations[_rcv.destination] = _rcv
+    _rcv.closing = False
+    _rcv.draining = False
+
+  def do_link(self, sst, rcv, _rcv, type, subtype):
+    if type == "topic":
+      _rcv._queue = "%s.%s" % (rcv.session.name, _rcv.destination)
+      sst.write_cmd(QueueDeclare(queue=_rcv._queue, durable=DURABLE_DEFAULT, exclusive=True, auto_delete=True))
+      filter = _rcv.options.get("filter")
+      if _rcv.subject is None and filter is None:
+        f = FILTER_DEFAULTS[subtype]
+      elif _rcv.subject and filter:
+        # XXX
+        raise Exception("can't supply both subject and filter")
+      elif _rcv.subject:
+        # XXX
+        from messaging import Pattern
+        f = Pattern(_rcv.subject)
+      else:
+        f = filter
+      f._bind(sst, _rcv.name, _rcv._queue)
+    elif type == "queue":
+      _rcv._queue = _rcv.name
+
+    def done():
+      rcv.linked = True
+
+    sst.write_cmd(MessageSubscribe(queue=_rcv._queue, destination=_rcv.destination))
+    sst.write_cmd(MessageSetFlowMode(_rcv.destination, flow_mode.credit), done)
+
+  def do_unlink(self, sst, rcv, _rcv, action=noop):
+    sst.write_cmd(MessageCancel(_rcv.destination), action)
+
+  def del_link(self, sst, rcv, _rcv):
+    del sst.destinations[_rcv.destination]
+
+class LinkOut:
+
+  ADDR_NAME = "target"
+  DIR_NAME = "sender"
+
+  def init_link(self, sst, snd, _snd):
+    _snd.closing = False
+
+  def do_link(self, sst, snd, _snd, type, subtype):
+    if type == "topic":
+      _snd._exchange = _snd.name
+      _snd._routing_key = _snd.subject
+    elif type == "queue":
+      _snd._exchange = ""
+      _snd._routing_key = _snd.name
+
+    snd.linked = True
+
+  def do_unlink(self, sst, snd, _snd, action=noop):
+    action()
+
+  def del_link(self, sst, snd, _snd):
+    pass
+
 # XXX
 HEADER="!4s4B"
 
@@ -148,6 +215,9 @@
     self.log_id = "%x" % id(self.connection)
     self._lock = self.connection._lock
 
+    self._in = LinkIn()
+    self._out = LinkOut()
+
     self._selector = Selector.default()
     self._attempts = 0
     self._hosts = [(self.connection.host, self.connection.port)] + \
@@ -486,9 +556,9 @@
       self._sessions[sst.channel] = sst
 
     for snd in ssn.senders:
-      self.link_out(snd)
+      self.link(snd, self._out, snd.target)
     for rcv in ssn.receivers:
-      self.link_in(rcv)
+      self.link(rcv, self._in, rcv.source)
 
     if sst is not None and ssn.closing and not sst.detached:
       sst.detached = True
@@ -508,135 +578,54 @@
     sst.write_op(SessionDetached(name=dtc.name))
     self.do_session_detached(dtc)
 
-  def link_out(self, snd):
-    sst = self._attachments.get(snd.session)
-    _snd = self._attachments.get(snd)
-    if _snd is None and not snd.closing and not snd.closed:
-      _snd = Attachment(snd)
-      _snd.closing = False
-
-      if snd.target is None:
-        snd.error = ("target is None",)
-        snd.closed = True
-        return
+  def link(self, lnk, dir, addr):
+    sst = self._attachments.get(lnk.session)
+    _lnk = self._attachments.get(lnk)
+
+    if _lnk is None and not lnk.closing and not lnk.closed:
+      _lnk = Attachment(lnk)
+      dir.init_link(sst, lnk, _lnk)
 
-      try:
-        _snd.name, _snd.subject, _snd.options = address.parse(snd.target)
-      except address.LexError, e:
-        snd.error = (e,)
-        snd.closed = True
-        return
-      except address.ParseError, e:
-        snd.error = (e,)
-        snd.closed = True
-        return
-
-      # XXX: subject
-      if _snd.options is None:
-        _snd.options = {}
-
-      if not self.validate_options(_snd):
+      err = self.parse_address(_lnk, dir, addr) or self.validate_options(_lnk)
+      if err:
+        lnk.error = (err,)
+        lnk.closed = True
         return
 
-      def do_link(type, subtype):
-        if type == "topic":
-          _snd._exchange = _snd.name
-          _snd._routing_key = _snd.subject
-        elif type == "queue":
-          _snd._exchange = ""
-          _snd._routing_key = _snd.name
-
-        snd.linked = True
-
-      self.resolve_declare(sst, _snd, "sender", do_link)
-      self._attachments[snd] = _snd
-
-    if snd.linked and snd.closing and not (snd.closed or _snd.closing):
-      _snd.closing = True
-      def do_unlink():
-        del self._attachments[snd]
-        snd.closed = True
-      if _snd.options.get("delete") in ("always", "sender"):
-        self.delete(sst, _snd.name, do_unlink)
-      else:
-        do_unlink()
-    elif not snd.linked and snd.closing and not snd.closed:
-      snd.closed = True
+      def resolved(type, subtype):
+        dir.do_link(sst, lnk, _lnk, type, subtype)
 
-  def link_in(self, rcv):
-    sst = self._attachments.get(rcv.session)
-    _rcv = self._attachments.get(rcv)
-    if _rcv is None and not rcv.closing and not rcv.closed:
-      _rcv = Attachment(rcv)
-      _rcv.destination = str(rcv.id)
-      sst.destinations[_rcv.destination] = _rcv
-      _rcv.canceled = False
-      _rcv.draining = False
-
-      if rcv.source is None:
-        rcv.error = ("source is None",)
-        rcv.closed = True
-        return
+      self.resolve_declare(sst, _lnk, dir.DIR_NAME, resolved)
+      self._attachments[lnk] = _lnk
 
+    if lnk.linked and lnk.closing and not lnk.closed:
+      if not _lnk.closing:
+        def done():
+          dir.del_link(sst, lnk, _lnk)
+          del self._attachments[lnk]
+          lnk.closed = True
+        if _lnk.options.get("delete") in ("always", dir.DIR_NAME):
+          dir.do_unlink(sst, lnk, _lnk)
+          self.delete(sst, _lnk.name, done)
+        else:
+          dir.do_unlink(sst, lnk, _lnk, done)
+        _lnk.closing = True
+    elif not lnk.linked and lnk.closing and not lnk.closed:
+      lnk.closed = True
+
+  def parse_address(self, lnk, dir, addr):
+    if addr is None:
+      return "%s is None" % dir.ADDR_NAME
+    else:
       try:
-        _rcv.name, _rcv.subject, _rcv.options = address.parse(rcv.source)
+        lnk.name, lnk.subject, lnk.options = address.parse(addr)
+        # XXX: subject
+        if lnk.options is None:
+          lnk.options = {}
       except address.LexError, e:
-        rcv.error = (e,)
-        rcv.closed = True
-        return
+        return e
       except address.ParseError, e:
-        rcv.error = (e,)
-        rcv.closed = True
-        return
-
-      # XXX: subject
-      if _rcv.options is None:
-        _rcv.options = {}
-
-      if not self.validate_options(_rcv):
-        return
-
-      def do_link(type, subtype):
-        if type == "topic":
-          _rcv._queue = "%s.%s" % (rcv.session.name, _rcv.destination)
-          sst.write_cmd(QueueDeclare(queue=_rcv._queue, durable=DURABLE_DEFAULT, exclusive=True, auto_delete=True))
-          filter = _rcv.options.get("filter")
-          if _rcv.subject is None and filter is None:
-            f = FILTER_DEFAULTS[subtype]
-          elif _rcv.subject and filter:
-            # XXX
-            raise Exception("can't supply both subject and filter")
-          elif _rcv.subject:
-            # XXX
-            from messaging import Pattern
-            f = Pattern(_rcv.subject)
-          else:
-            f = filter
-          f._bind(sst, _rcv.name, _rcv._queue)
-        elif type == "queue":
-          _rcv._queue = _rcv.name
-
-        sst.write_cmd(MessageSubscribe(queue=_rcv._queue, destination=_rcv.destination))
-        sst.write_cmd(MessageSetFlowMode(_rcv.destination, flow_mode.credit))
-        rcv.linked = True
-
-      self.resolve_declare(sst, _rcv, "receiver", do_link)
-      self._attachments[rcv] = _rcv
-
-    if rcv.linked and rcv.closing and not rcv.closed:
-      if not _rcv.canceled:
-        def do_unlink():
-          del self._attachments[rcv]
-          del sst.destinations[_rcv.destination]
-          rcv.closed = True
-        if _rcv.options.get("delete") in ("always", "receiver"):
-          sst.write_cmd(MessageCancel(_rcv.destination))
-          self.delete(sst, _rcv.name, do_unlink)
-        else:
-          sst.write_cmd(MessageCancel(_rcv.destination), do_unlink)
-        _rcv.canceled = True
-    elif not rcv.linked and rcv.closing and not rcv.closed:
-      rcv.closed = True
+        return e
 
   POLICIES = Values("always", "sender", "receiver", "never")
 
@@ -665,12 +654,7 @@
   def validate_options(self, lnk):
     ctx = Context()
     err = Driver.OPTS.validate(lnk.options, ctx)
-    if err:
-      lnk.target.error = ("error in options: %s" % err,)
-      lnk.target.closed = True
-      return False
-    else:
-      return True
+    if err: return "error in options: %s" % err
 
   def resolve_declare(self, sst, lnk, dir, action):
     def do_resolved(er, qr):
@@ -848,7 +832,7 @@
   def grant(self, rcv):
     sst = self._attachments[rcv.session]
     _rcv = self._attachments.get(rcv)
-    if _rcv is None or not rcv.linked or _rcv.canceled or _rcv.draining:
+    if _rcv is None or not rcv.linked or _rcv.closing or _rcv.draining:
       return
 
     if rcv.granted is UNLIMITED:



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org