You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rh...@apache.org on 2009/09/24 19:25:12 UTC

svn commit: r818556 - in /qpid/trunk/qpid/python/qpid: driver.py messaging.py tests/messaging.py

Author: rhs
Date: Thu Sep 24 17:25:10 2009
New Revision: 818556

URL: http://svn.apache.org/viewvc?rev=818556&view=rev
Log:
added back exchange query on link establishment; added sender.sync()

Modified:
    qpid/trunk/qpid/python/qpid/driver.py
    qpid/trunk/qpid/python/qpid/messaging.py
    qpid/trunk/qpid/python/qpid/tests/messaging.py

Modified: qpid/trunk/qpid/python/qpid/driver.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/python/qpid/driver.py?rev=818556&r1=818555&r2=818556&view=diff
==============================================================================
--- qpid/trunk/qpid/python/qpid/driver.py (original)
+++ qpid/trunk/qpid/python/qpid/driver.py Thu Sep 24 17:25:10 2009
@@ -90,6 +90,7 @@
     self.completions = {}
     self.min_completion = self.sent
     self.max_completion = self.sent
+    self.results = {}
 
     # receiver state
     self.received = None
@@ -97,6 +98,10 @@
 
     # XXX: need to periodically exchange completion/known_completion
 
+  def write_query(self, query, handler):
+    id = self.sent
+    self.write_cmd(query, lambda: handler(self.results.pop(id)))
+
   def write_cmd(self, cmd, completion=noop):
     cmd.id = self.sent
     self.sent += 1
@@ -220,6 +225,8 @@
         log.warn("sleeping 3 seconds")
       else:
         self.connection.error = error
+    else:
+      self.dispatch()
 
     self.connection._waiter.notifyAll()
 
@@ -272,9 +279,6 @@
 
   def do_connection_open_ok(self, open_ok):
     self._connected = True
-    # XXX: maybe think about a more generic way to catchup with
-    # deferred work
-    self.dispatch()
 
   def connection_heartbeat(self, hrt):
     self.write_op(ConnectionHeartbeat())
@@ -335,6 +339,10 @@
     if sf.completed:
       sst.write_op(SessionCompleted(sst.executed))
 
+  def do_execution_result(self, er):
+    sst = self.get_sst(er)
+    sst.results[er.command_id] = er.value
+
   def dispatch(self):
     try:
       if self._socket is None and self.connection._connected and not self._opening:
@@ -408,71 +416,66 @@
     del self._attachments[ssn]
     ssn.closed = True
 
-  def _exchange_query(self, ssn, address):
-    # XXX: auto sync hack is to avoid deadlock on future
-    result = ssn.exchange_query(name=address, sync=True)
-    ssn.sync()
-    return result.get()
-
   def link_out(self, snd):
     sst = self._attachments[snd.session]
     _snd = self._attachments.get(snd)
-    if _snd is None:
+    if not snd.closing and _snd is None:
       _snd = Attachment(snd)
+      _snd.linked = False
       node, _snd._subject = parse_addr(snd.target)
-      # XXX: result = self._exchange_query(sst, node)
-#      if result.not_found:
-      if True:
-        # XXX: should check 'create' option
-        sst.write_cmd(QueueDeclare(queue=node, durable=DURABLE_DEFAULT))
-        _snd._exchange = ""
-        _snd._routing_key = node
-      else:
-        _snd._exchange = node
-        _snd._routing_key = _snd._subject
+      def do_link_out(result):
+        if result.not_found:
+          # XXX: should check 'create' option
+          sst.write_cmd(QueueDeclare(queue=node, durable=DURABLE_DEFAULT))
+          _snd._exchange = ""
+          _snd._routing_key = node
+        else:
+          _snd._exchange = node
+          _snd._routing_key = _snd._subject
+        _snd.linked = True
+      sst.write_query(ExchangeQuery(name=snd.target, sync=True), do_link_out)
       self._attachments[snd] = _snd
 
-    if snd.closed:
+    if snd.closing and not snd.closed:
       del self._attachments[snd]
-      return None
-    else:
-      return _snd
+      snd.closed = True
 
   def link_in(self, rcv):
     sst = self._attachments[rcv.session]
     _rcv = self._attachments.get(rcv)
-    if _rcv is None:
+    if _rcv is None and not rcv.closing:
       _rcv = Attachment(rcv)
-      # XXX: result = self._exchange_query(sst, rcv.source)
-#      if result.not_found:
+      _rcv.linked = False
       _rcv.canceled = False
       _rcv.draining = False
-      if True:
-        _rcv._queue = rcv.source
-        # XXX: should check 'create' option
-        sst.write_cmd(QueueDeclare(queue=_rcv._queue, durable=DURABLE_DEFAULT))
-      else:
-        _rcv._queue = "%s.%s" % (rcv.session.name, rcv.destination)
-        sst.queue_declare(queue=_rcv._queue, durable=DURABLE_DEFAULT, exclusive=True, auto_delete=True)
-        if rcv.filter is None:
-          f = FILTER_DEFAULTS[result.type]
+
+      def do_link_in(result):
+        if result.not_found:
+          _rcv._queue = rcv.source
+          # XXX: should check 'create' option
+          sst.write_cmd(QueueDeclare(queue=_rcv._queue, durable=DURABLE_DEFAULT))
         else:
-          f = rcv.filter
-        f._bind(sst, rcv.source, _rcv._queue)
-      sst.write_cmd(MessageSubscribe(queue=_rcv._queue, destination=rcv.destination))
-      sst.write_cmd(MessageSetFlowMode(rcv.destination, flow_mode.credit))
+          _rcv._queue = "%s.%s" % (rcv.session.name, rcv.destination)
+          sst.write_cmd(QueueDeclare(queue=_rcv._queue, durable=DURABLE_DEFAULT, exclusive=True, auto_delete=True))
+          # XXX
+          if rcv.filter is None:
+            f = FILTER_DEFAULTS[result.type]
+          else:
+            f = rcv.filter
+          f._bind(sst, rcv.source, _rcv._queue)
+        sst.write_cmd(MessageSubscribe(queue=_rcv._queue, destination=rcv.destination))
+        sst.write_cmd(MessageSetFlowMode(rcv.destination, flow_mode.credit))
+        _rcv.linked = True
+      sst.write_query(ExchangeQuery(name=rcv.source, sync=True), do_link_in)
       self._attachments[rcv] = _rcv
 
-    if rcv.closing:
+    if rcv.closing and not rcv.closed:
       if not _rcv.canceled:
         def close_rcv():
           del self._attachments[rcv]
           rcv.closed = True
         sst.write_cmd(MessageCancel(rcv.destination, sync=True), close_rcv)
         _rcv.canceled = True
-      return None
-    else:
-      return _rcv
 
   def process(self, ssn):
     if ssn.closing: return
@@ -482,8 +485,12 @@
     while sst.outgoing_idx < len(ssn.outgoing):
       msg = ssn.outgoing[sst.outgoing_idx]
       snd = msg._sender
-      self.send(snd, msg)
-      sst.outgoing_idx += 1
+      _snd = self._attachments.get(snd)
+      if _snd and _snd.linked:
+        self.send(snd, msg)
+        sst.outgoing_idx += 1
+      else:
+        break
 
     for rcv in ssn.receivers:
       self.process_receiver(rcv)
@@ -551,8 +558,8 @@
 
   def grant(self, rcv):
     sst = self._attachments[rcv.session]
-    _rcv = self.link_in(rcv)
-    if _rcv is None or _rcv.draining:
+    _rcv = self._attachments.get(rcv)
+    if _rcv is None or not _rcv.linked or _rcv.draining:
       return
 
     if rcv.granted is UNLIMITED:
@@ -590,7 +597,7 @@
 
   def send(self, snd, msg):
     sst = self._attachments[snd.session]
-    _snd = self.link_out(snd)
+    _snd = self._attachments[snd]
 
     # XXX: what if subject is specified for a normal queue?
     if _snd._routing_key is None:

Modified: qpid/trunk/qpid/python/qpid/messaging.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/python/qpid/messaging.py?rev=818556&r1=818555&r2=818556&view=diff
==============================================================================
--- qpid/trunk/qpid/python/qpid/messaging.py (original)
+++ qpid/trunk/qpid/python/qpid/messaging.py Thu Sep 24 17:25:10 2009
@@ -314,7 +314,7 @@
     return self.connection._ewait(predicate, timeout, exc)
 
   @synchronized
-  def sender(self, target):
+  def sender(self, target, **options):
     """
     Creates a L{Sender} that may be used to send L{Messages<Message>}
     to the specified target.
@@ -324,7 +324,7 @@
     @rtype: Sender
     @return: a new Sender for the specified target
     """
-    sender = Sender(self, len(self.senders), target)
+    sender = Sender(self, len(self.senders), target, options)
     self.senders.append(sender)
     self._wakeup()
     # XXX: because of the lack of waiting here we can end up getting
@@ -334,7 +334,7 @@
     return sender
 
   @synchronized
-  def receiver(self, source, filter=None):
+  def receiver(self, source, **options):
     """
     Creates a receiver that may be used to actively fetch or to listen
     for the arrival of L{Messages<Message>} from the specified source.
@@ -344,7 +344,7 @@
     @rtype: Receiver
     @return: a new Receiver for the specified source
     """
-    receiver = Receiver(self, len(self.receivers), source, filter,
+    receiver = Receiver(self, len(self.receivers), source, options,
                         self.started)
     self.receivers.append(receiver)
     self._wakeup()
@@ -512,13 +512,15 @@
   Sends outgoing messages.
   """
 
-  def __init__(self, session, index, target):
+  def __init__(self, session, index, target, options):
     self.session = session
     self.index = index
     self.target = target
-    self.capacity = UNLIMITED
+    self.options = options
+    self.capacity = options.get("capacity", UNLIMITED)
     self.queued = Serial(0)
     self.acked = Serial(0)
+    self.closing = False
     self.closed = False
     self._lock = self.session._lock
 
@@ -580,15 +582,19 @@
     message._sender = self
     self.session.outgoing.append(message)
     self.queued += 1
-    mno = self.queued
 
     self._wakeup()
 
     if sync:
-      self._ewait(lambda: self.acked >= mno)
+      self.sync()
       assert message not in self.session.outgoing
 
   @synchronized
+  def sync(self):
+    mno = self.queued
+    self._ewait(lambda: self.acked >= mno)
+
+  @synchronized
   def close(self):
     """
     Close the Sender.
@@ -616,15 +622,15 @@
   L{listen}.
   """
 
-  def __init__(self, session, index, source, filter, started):
+  def __init__(self, session, index, source, options, started):
     self.session = session
     self.index = index
     self.destination = str(self.index)
     self.source = source
-    self.filter = filter
+    self.options = options
 
     self.started = started
-    self.capacity = UNLIMITED
+    self.capacity = options.get("capacity", UNLIMITED)
     self.granted = Serial(0)
     self.drain = False
     self.impending = Serial(0)

Modified: qpid/trunk/qpid/python/qpid/tests/messaging.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/python/qpid/tests/messaging.py?rev=818556&r1=818555&r2=818556&view=diff
==============================================================================
--- qpid/trunk/qpid/python/qpid/tests/messaging.py (original)
+++ qpid/trunk/qpid/python/qpid/tests/messaging.py Thu Sep 24 17:25:10 2009
@@ -611,6 +611,7 @@
       except InsufficientCapacity:
         caught = True
         break
+    self.snd.sync()
     self.drain(self.rcv, expected=msgs)
     self.ssn.acknowledge()
     assert caught, "did not exceed capacity"



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org