You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rh...@apache.org on 2010/06/17 00:15:14 UTC

svn commit: r955414 - in /qpid/trunk/qpid/python/qpid/messaging: driver.py endpoints.py

Author: rhs
Date: Wed Jun 16 22:15:14 2010
New Revision: 955414

URL: http://svn.apache.org/viewvc?rev=955414&view=rev
Log:
don't always set the sync bit on send

Modified:
    qpid/trunk/qpid/python/qpid/messaging/driver.py
    qpid/trunk/qpid/python/qpid/messaging/endpoints.py

Modified: qpid/trunk/qpid/python/qpid/messaging/driver.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/python/qpid/messaging/driver.py?rev=955414&r1=955413&r2=955414&view=diff
==============================================================================
--- qpid/trunk/qpid/python/qpid/messaging/driver.py (original)
+++ qpid/trunk/qpid/python/qpid/messaging/driver.py Wed Jun 16 22:15:14 2010
@@ -114,6 +114,7 @@ class SessionState:
     self.min_completion = self.sent
     self.max_completion = self.sent
     self.results = {}
+    self.need_sync = False
 
     # receiver state
     self.received = None
@@ -131,12 +132,12 @@ class SessionState:
     for k, v in overrides.items():
       cmd[k.replace('-', '_')] = v
 
-  def write_cmd(self, cmd, action=noop, overrides=None):
+  def write_cmd(self, cmd, action=noop, overrides=None, sync=True):
     if overrides:
       self.apply_overrides(cmd, overrides)
 
-    if action != noop:
-      cmd.sync = True
+    if sync or action != noop:
+      cmd.sync = sync
     if self.detached:
       raise Exception("detached")
     cmd.id = self.sent
@@ -144,6 +145,7 @@ class SessionState:
     self.actions[cmd.id] = action
     self.max_completion = cmd.id
     self.write_op(cmd)
+    self.need_sync = not cmd.sync
 
   def write_cmds(self, cmds, action=noop):
     if cmds:
@@ -963,6 +965,10 @@ class Engine:
       else:
         break
 
+    for snd in ssn.senders:
+      if snd.synced >= snd.queued and sst.need_sync:
+        sst.write_cmd(ExecutionSync(), sync=True)
+
     for rcv in ssn.receivers:
       self.process_receiver(rcv)
 
@@ -1167,7 +1173,7 @@ class Engine:
       log.debug("RACK[%s]: %s", sst.session.log_id, msg)
       assert msg == m
     sst.write_cmd(MessageTransfer(destination=_snd._exchange, headers=(dp, mp),
-                                  payload=body), msg_acked)
+                                  payload=body), msg_acked, sync=msg._sync)
     log.debug("SENT[%s]: %s", sst.session.log_id, msg)
 
   def do_message_transfer(self, xfr):

Modified: qpid/trunk/qpid/python/qpid/messaging/endpoints.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/python/qpid/messaging/endpoints.py?rev=955414&r1=955413&r2=955414&view=diff
==============================================================================
--- qpid/trunk/qpid/python/qpid/messaging/endpoints.py (original)
+++ qpid/trunk/qpid/python/qpid/messaging/endpoints.py Wed Jun 16 22:15:14 2010
@@ -677,12 +677,20 @@ class Session:
     assert self.aborted
 
   @synchronized
+  def sync(self):
+    """
+    Sync the session.
+    """
+    for snd in self.senders:
+      snd.sync()
+    self._ewait(lambda: not self.outgoing and not self.acked)
+
+  @synchronized
   def close(self):
     """
     Close the session.
     """
-    # XXX: should be able to express this condition through API calls
-    self._ewait(lambda: not self.outgoing and not self.acked)
+    self.sync()
 
     for link in self.receivers + self.senders:
       link.close()
@@ -704,8 +712,10 @@ class Sender:
     self.target = target
     self.options = options
     self.capacity = options.get("capacity", UNLIMITED)
+    self.threshold = 0.5
     self.durable = options.get("durable")
     self.queued = Serial(0)
+    self.synced = Serial(0)
     self.acked = Serial(0)
     self.error = None
     self.linked = False
@@ -792,18 +802,25 @@ class Sender:
 
     # XXX: what if we send the same message to multiple senders?
     message._sender = self
+    if self.capacity is not UNLIMITED:
+      message._sync = sync or self.available() <= int(ceil(self.threshold*self.capacity))
+    else:
+      message._sync = sync
     self.session.outgoing.append(message)
     self.queued += 1
 
-    self._wakeup()
-
     if sync:
       self.sync()
       assert message not in self.session.outgoing
+    else:
+      self._wakeup()
 
   @synchronized
   def sync(self):
     mno = self.queued
+    if self.synced < mno:
+      self.synced = mno
+      self._wakeup()
     self._ewait(lambda: self.acked >= mno)
 
   @synchronized



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org