You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rh...@apache.org on 2010/06/17 00:15:14 UTC
svn commit: r955414 - in /qpid/trunk/qpid/python/qpid/messaging: driver.py endpoints.py
Author: rhs
Date: Wed Jun 16 22:15:14 2010
New Revision: 955414
URL: http://svn.apache.org/viewvc?rev=955414&view=rev
Log:
don't always set the sync bit on send
Modified:
qpid/trunk/qpid/python/qpid/messaging/driver.py
qpid/trunk/qpid/python/qpid/messaging/endpoints.py
Modified: qpid/trunk/qpid/python/qpid/messaging/driver.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/python/qpid/messaging/driver.py?rev=955414&r1=955413&r2=955414&view=diff
==============================================================================
--- qpid/trunk/qpid/python/qpid/messaging/driver.py (original)
+++ qpid/trunk/qpid/python/qpid/messaging/driver.py Wed Jun 16 22:15:14 2010
@@ -114,6 +114,7 @@ class SessionState:
self.min_completion = self.sent
self.max_completion = self.sent
self.results = {}
+ self.need_sync = False
# receiver state
self.received = None
@@ -131,12 +132,12 @@ class SessionState:
for k, v in overrides.items():
cmd[k.replace('-', '_')] = v
- def write_cmd(self, cmd, action=noop, overrides=None):
+ def write_cmd(self, cmd, action=noop, overrides=None, sync=True):
if overrides:
self.apply_overrides(cmd, overrides)
- if action != noop:
- cmd.sync = True
+ if sync or action != noop:
+ cmd.sync = sync
if self.detached:
raise Exception("detached")
cmd.id = self.sent
@@ -144,6 +145,7 @@ class SessionState:
self.actions[cmd.id] = action
self.max_completion = cmd.id
self.write_op(cmd)
+ self.need_sync = not cmd.sync
def write_cmds(self, cmds, action=noop):
if cmds:
@@ -963,6 +965,10 @@ class Engine:
else:
break
+ for snd in ssn.senders:
+ if snd.synced >= snd.queued and sst.need_sync:
+ sst.write_cmd(ExecutionSync(), sync=True)
+
for rcv in ssn.receivers:
self.process_receiver(rcv)
@@ -1167,7 +1173,7 @@ class Engine:
log.debug("RACK[%s]: %s", sst.session.log_id, msg)
assert msg == m
sst.write_cmd(MessageTransfer(destination=_snd._exchange, headers=(dp, mp),
- payload=body), msg_acked)
+ payload=body), msg_acked, sync=msg._sync)
log.debug("SENT[%s]: %s", sst.session.log_id, msg)
def do_message_transfer(self, xfr):
Modified: qpid/trunk/qpid/python/qpid/messaging/endpoints.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/python/qpid/messaging/endpoints.py?rev=955414&r1=955413&r2=955414&view=diff
==============================================================================
--- qpid/trunk/qpid/python/qpid/messaging/endpoints.py (original)
+++ qpid/trunk/qpid/python/qpid/messaging/endpoints.py Wed Jun 16 22:15:14 2010
@@ -677,12 +677,20 @@ class Session:
assert self.aborted
@synchronized
+ def sync(self):
+ """
+ Sync the session.
+ """
+ for snd in self.senders:
+ snd.sync()
+ self._ewait(lambda: not self.outgoing and not self.acked)
+
+ @synchronized
def close(self):
"""
Close the session.
"""
- # XXX: should be able to express this condition through API calls
- self._ewait(lambda: not self.outgoing and not self.acked)
+ self.sync()
for link in self.receivers + self.senders:
link.close()
@@ -704,8 +712,10 @@ class Sender:
self.target = target
self.options = options
self.capacity = options.get("capacity", UNLIMITED)
+ self.threshold = 0.5
self.durable = options.get("durable")
self.queued = Serial(0)
+ self.synced = Serial(0)
self.acked = Serial(0)
self.error = None
self.linked = False
@@ -792,18 +802,25 @@ class Sender:
# XXX: what if we send the same message to multiple senders?
message._sender = self
+ if self.capacity is not UNLIMITED:
+ message._sync = sync or self.available() <= int(ceil(self.threshold*self.capacity))
+ else:
+ message._sync = sync
self.session.outgoing.append(message)
self.queued += 1
- self._wakeup()
-
if sync:
self.sync()
assert message not in self.session.outgoing
+ else:
+ self._wakeup()
@synchronized
def sync(self):
mno = self.queued
+ if self.synced < mno:
+ self.synced = mno
+ self._wakeup()
self._ewait(lambda: self.acked >= mno)
@synchronized
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org