You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rh...@apache.org on 2010/06/16 18:47:18 UTC

svn commit: r955296 - in /qpid/trunk/qpid/python/qpid/messaging: driver.py endpoints.py

Author: rhs
Date: Wed Jun 16 16:47:18 2010
New Revision: 955296

URL: http://svn.apache.org/viewvc?rev=955296&view=rev
Log:
performance tweaks for receive: added configurable threshold for issuing credit; don't disable byte credit more than necessary; avoided n-squared loop for generating acks

Modified:
    qpid/trunk/qpid/python/qpid/messaging/driver.py
    qpid/trunk/qpid/python/qpid/messaging/endpoints.py

Modified: qpid/trunk/qpid/python/qpid/messaging/driver.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/python/qpid/messaging/driver.py?rev=955296&r1=955295&r2=955296&view=diff
==============================================================================
--- qpid/trunk/qpid/python/qpid/messaging/driver.py (original)
+++ qpid/trunk/qpid/python/qpid/messaging/driver.py Wed Jun 16 16:47:18 2010
@@ -208,6 +208,7 @@ class LinkIn:
     _rcv.destination = str(rcv.id)
     sst.destinations[_rcv.destination] = _rcv
     _rcv.draining = False
+    _rcv.bytes_open = False
     _rcv.on_unlink = []
 
   def do_link(self, sst, rcv, _rcv, type, subtype, action):
@@ -762,6 +763,7 @@ class Engine:
       sst.write_op(SessionCommandPoint(sst.sent, 0))
       sst.outgoing_idx = 0
       sst.acked = []
+      sst.acked_idx = 0
       if ssn.transactional:
         sst.write_cmd(TxSelect())
       self._attachments[ssn] = sst
@@ -965,7 +967,8 @@ class Engine:
       self.process_receiver(rcv)
 
     if ssn.acked:
-      messages = [m for m in ssn.acked if m not in sst.acked]
+      messages = ssn.acked[sst.acked_idx:]
+      delta = len(messages)
       if messages:
         ids = RangedSet()
 
@@ -975,6 +978,7 @@ class Engine:
           # could we deal this via some message-id based purge?
           if m._transfer_id is None:
             ssn.acked.remove(m)
+            delta -= 1
             continue
           ids.add(m._transfer_id)
           disp = m._disposition or DEFAULT_DISPOSITION
@@ -992,6 +996,7 @@ class Engine:
           def ack_ack():
             for m in msgs:
               ssn.acked.remove(m)
+              sst.acked_idx -= 1
               if not ssn.transactional:
                 sst.acked.remove(m)
           return ack_ack
@@ -1011,7 +1016,9 @@ class Engine:
             for m in msgs:
               log.debug("SACK[%s]: %s, %s", ssn.log_id, m, m._disposition)
 
+        # XXX: could add messages with _transfer_id of None
         sst.acked.extend(messages)
+        sst.acked_idx += delta
 
     if ssn.committing and not sst.committing:
       def commit_ok():
@@ -1076,11 +1083,15 @@ class Engine:
       delta = max(rcv.granted, rcv.received) - rcv.impending
 
     if delta is UNLIMITED:
-      sst.write_cmd(MessageFlow(_rcv.destination, credit_unit.byte, UNLIMITED.value))
+      if not _rcv.bytes_open:
+        sst.write_cmd(MessageFlow(_rcv.destination, credit_unit.byte, UNLIMITED.value))
+        _rcv.bytes_open = True
       sst.write_cmd(MessageFlow(_rcv.destination, credit_unit.message, UNLIMITED.value))
       rcv.impending = UNLIMITED
     elif delta > 0:
-      sst.write_cmd(MessageFlow(_rcv.destination, credit_unit.byte, UNLIMITED.value))
+      if not _rcv.bytes_open:
+        sst.write_cmd(MessageFlow(_rcv.destination, credit_unit.byte, UNLIMITED.value))
+        _rcv.bytes_open = True
       sst.write_cmd(MessageFlow(_rcv.destination, credit_unit.message, delta))
       rcv.impending += delta
     elif delta < 0 and not rcv.draining:
@@ -1088,6 +1099,7 @@ class Engine:
       def do_stop():
         rcv.impending = rcv.received
         _rcv.draining = False
+        _rcv.bytes_open = False
         self.grant(rcv)
       sst.write_cmd(MessageStop(_rcv.destination), do_stop)
 
@@ -1097,6 +1109,7 @@ class Engine:
         rcv.impending = rcv.received
         rcv.granted = rcv.impending
         _rcv.draining = False
+        _rcv.bytes_open = False
         rcv.draining = False
       sst.write_cmd(MessageFlush(_rcv.destination), do_flush)
 

Modified: qpid/trunk/qpid/python/qpid/messaging/endpoints.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/python/qpid/messaging/endpoints.py?rev=955296&r1=955295&r2=955296&view=diff
==============================================================================
--- qpid/trunk/qpid/python/qpid/messaging/endpoints.py (original)
+++ qpid/trunk/qpid/python/qpid/messaging/endpoints.py Wed Jun 16 16:47:18 2010
@@ -29,6 +29,7 @@ Areas that still need work:
 """
 
 from logging import getLogger
+from math import ceil
 from qpid.codec010 import StringCodec
 from qpid.concurrency import synchronized, Waiter, Condition
 from qpid.datatypes import Serial, uuid4
@@ -843,6 +844,7 @@ class Receiver(object):
     self._lock = self.session._lock
     self._capacity = 0
     self._set_capacity(options.get("capacity", 0), False)
+    self.threshold = 0.5
 
   @synchronized
   def _set_capacity(self, c, wakeup=True):
@@ -931,8 +933,9 @@ class Receiver(object):
       if msg is None:
         raise Empty()
     elif self._capacity not in (0, UNLIMITED.value):
-      self.granted += 1
-      self._wakeup()
+      if self.received - self.returned <= int(ceil(self.threshold * self._capacity)):
+        self.granted = self.received + self._capacity
+        self._wakeup()
     return msg
 
   def _grant(self):



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org