You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rh...@apache.org on 2009/09/24 19:25:12 UTC
svn commit: r818556 - in /qpid/trunk/qpid/python/qpid: driver.py messaging.py tests/messaging.py
Author: rhs
Date: Thu Sep 24 17:25:10 2009
New Revision: 818556
URL: http://svn.apache.org/viewvc?rev=818556&view=rev
Log:
added back exchange query on link establishment; added sender.sync()
Modified:
qpid/trunk/qpid/python/qpid/driver.py
qpid/trunk/qpid/python/qpid/messaging.py
qpid/trunk/qpid/python/qpid/tests/messaging.py
Modified: qpid/trunk/qpid/python/qpid/driver.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/python/qpid/driver.py?rev=818556&r1=818555&r2=818556&view=diff
==============================================================================
--- qpid/trunk/qpid/python/qpid/driver.py (original)
+++ qpid/trunk/qpid/python/qpid/driver.py Thu Sep 24 17:25:10 2009
@@ -90,6 +90,7 @@
self.completions = {}
self.min_completion = self.sent
self.max_completion = self.sent
+ self.results = {}
# receiver state
self.received = None
@@ -97,6 +98,10 @@
# XXX: need to periodically exchange completion/known_completion
+ def write_query(self, query, handler):
+ id = self.sent
+ self.write_cmd(query, lambda: handler(self.results.pop(id)))
+
def write_cmd(self, cmd, completion=noop):
cmd.id = self.sent
self.sent += 1
@@ -220,6 +225,8 @@
log.warn("sleeping 3 seconds")
else:
self.connection.error = error
+ else:
+ self.dispatch()
self.connection._waiter.notifyAll()
@@ -272,9 +279,6 @@
def do_connection_open_ok(self, open_ok):
self._connected = True
- # XXX: maybe think about a more generic way to catchup with
- # deferred work
- self.dispatch()
def connection_heartbeat(self, hrt):
self.write_op(ConnectionHeartbeat())
@@ -335,6 +339,10 @@
if sf.completed:
sst.write_op(SessionCompleted(sst.executed))
+ def do_execution_result(self, er):
+ sst = self.get_sst(er)
+ sst.results[er.command_id] = er.value
+
def dispatch(self):
try:
if self._socket is None and self.connection._connected and not self._opening:
@@ -408,71 +416,66 @@
del self._attachments[ssn]
ssn.closed = True
- def _exchange_query(self, ssn, address):
- # XXX: auto sync hack is to avoid deadlock on future
- result = ssn.exchange_query(name=address, sync=True)
- ssn.sync()
- return result.get()
-
def link_out(self, snd):
sst = self._attachments[snd.session]
_snd = self._attachments.get(snd)
- if _snd is None:
+ if not snd.closing and _snd is None:
_snd = Attachment(snd)
+ _snd.linked = False
node, _snd._subject = parse_addr(snd.target)
- # XXX: result = self._exchange_query(sst, node)
-# if result.not_found:
- if True:
- # XXX: should check 'create' option
- sst.write_cmd(QueueDeclare(queue=node, durable=DURABLE_DEFAULT))
- _snd._exchange = ""
- _snd._routing_key = node
- else:
- _snd._exchange = node
- _snd._routing_key = _snd._subject
+ def do_link_out(result):
+ if result.not_found:
+ # XXX: should check 'create' option
+ sst.write_cmd(QueueDeclare(queue=node, durable=DURABLE_DEFAULT))
+ _snd._exchange = ""
+ _snd._routing_key = node
+ else:
+ _snd._exchange = node
+ _snd._routing_key = _snd._subject
+ _snd.linked = True
+ sst.write_query(ExchangeQuery(name=snd.target, sync=True), do_link_out)
self._attachments[snd] = _snd
- if snd.closed:
+ if snd.closing and not snd.closed:
del self._attachments[snd]
- return None
- else:
- return _snd
+ snd.closed = True
def link_in(self, rcv):
sst = self._attachments[rcv.session]
_rcv = self._attachments.get(rcv)
- if _rcv is None:
+ if _rcv is None and not rcv.closing:
_rcv = Attachment(rcv)
- # XXX: result = self._exchange_query(sst, rcv.source)
-# if result.not_found:
+ _rcv.linked = False
_rcv.canceled = False
_rcv.draining = False
- if True:
- _rcv._queue = rcv.source
- # XXX: should check 'create' option
- sst.write_cmd(QueueDeclare(queue=_rcv._queue, durable=DURABLE_DEFAULT))
- else:
- _rcv._queue = "%s.%s" % (rcv.session.name, rcv.destination)
- sst.queue_declare(queue=_rcv._queue, durable=DURABLE_DEFAULT, exclusive=True, auto_delete=True)
- if rcv.filter is None:
- f = FILTER_DEFAULTS[result.type]
+
+ def do_link_in(result):
+ if result.not_found:
+ _rcv._queue = rcv.source
+ # XXX: should check 'create' option
+ sst.write_cmd(QueueDeclare(queue=_rcv._queue, durable=DURABLE_DEFAULT))
else:
- f = rcv.filter
- f._bind(sst, rcv.source, _rcv._queue)
- sst.write_cmd(MessageSubscribe(queue=_rcv._queue, destination=rcv.destination))
- sst.write_cmd(MessageSetFlowMode(rcv.destination, flow_mode.credit))
+ _rcv._queue = "%s.%s" % (rcv.session.name, rcv.destination)
+ sst.write_cmd(QueueDeclare(queue=_rcv._queue, durable=DURABLE_DEFAULT, exclusive=True, auto_delete=True))
+ # XXX
+ if rcv.filter is None:
+ f = FILTER_DEFAULTS[result.type]
+ else:
+ f = rcv.filter
+ f._bind(sst, rcv.source, _rcv._queue)
+ sst.write_cmd(MessageSubscribe(queue=_rcv._queue, destination=rcv.destination))
+ sst.write_cmd(MessageSetFlowMode(rcv.destination, flow_mode.credit))
+ _rcv.linked = True
+ sst.write_query(ExchangeQuery(name=rcv.source, sync=True), do_link_in)
self._attachments[rcv] = _rcv
- if rcv.closing:
+ if rcv.closing and not rcv.closed:
if not _rcv.canceled:
def close_rcv():
del self._attachments[rcv]
rcv.closed = True
sst.write_cmd(MessageCancel(rcv.destination, sync=True), close_rcv)
_rcv.canceled = True
- return None
- else:
- return _rcv
def process(self, ssn):
if ssn.closing: return
@@ -482,8 +485,12 @@
while sst.outgoing_idx < len(ssn.outgoing):
msg = ssn.outgoing[sst.outgoing_idx]
snd = msg._sender
- self.send(snd, msg)
- sst.outgoing_idx += 1
+ _snd = self._attachments.get(snd)
+ if _snd and _snd.linked:
+ self.send(snd, msg)
+ sst.outgoing_idx += 1
+ else:
+ break
for rcv in ssn.receivers:
self.process_receiver(rcv)
@@ -551,8 +558,8 @@
def grant(self, rcv):
sst = self._attachments[rcv.session]
- _rcv = self.link_in(rcv)
- if _rcv is None or _rcv.draining:
+ _rcv = self._attachments.get(rcv)
+ if _rcv is None or not _rcv.linked or _rcv.draining:
return
if rcv.granted is UNLIMITED:
@@ -590,7 +597,7 @@
def send(self, snd, msg):
sst = self._attachments[snd.session]
- _snd = self.link_out(snd)
+ _snd = self._attachments[snd]
# XXX: what if subject is specified for a normal queue?
if _snd._routing_key is None:
Modified: qpid/trunk/qpid/python/qpid/messaging.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/python/qpid/messaging.py?rev=818556&r1=818555&r2=818556&view=diff
==============================================================================
--- qpid/trunk/qpid/python/qpid/messaging.py (original)
+++ qpid/trunk/qpid/python/qpid/messaging.py Thu Sep 24 17:25:10 2009
@@ -314,7 +314,7 @@
return self.connection._ewait(predicate, timeout, exc)
@synchronized
- def sender(self, target):
+ def sender(self, target, **options):
"""
Creates a L{Sender} that may be used to send L{Messages<Message>}
to the specified target.
@@ -324,7 +324,7 @@
@rtype: Sender
@return: a new Sender for the specified target
"""
- sender = Sender(self, len(self.senders), target)
+ sender = Sender(self, len(self.senders), target, options)
self.senders.append(sender)
self._wakeup()
# XXX: because of the lack of waiting here we can end up getting
@@ -334,7 +334,7 @@
return sender
@synchronized
- def receiver(self, source, filter=None):
+ def receiver(self, source, **options):
"""
Creates a receiver that may be used to actively fetch or to listen
for the arrival of L{Messages<Message>} from the specified source.
@@ -344,7 +344,7 @@
@rtype: Receiver
@return: a new Receiver for the specified source
"""
- receiver = Receiver(self, len(self.receivers), source, filter,
+ receiver = Receiver(self, len(self.receivers), source, options,
self.started)
self.receivers.append(receiver)
self._wakeup()
@@ -512,13 +512,15 @@
Sends outgoing messages.
"""
- def __init__(self, session, index, target):
+ def __init__(self, session, index, target, options):
self.session = session
self.index = index
self.target = target
- self.capacity = UNLIMITED
+ self.options = options
+ self.capacity = options.get("capacity", UNLIMITED)
self.queued = Serial(0)
self.acked = Serial(0)
+ self.closing = False
self.closed = False
self._lock = self.session._lock
@@ -580,15 +582,19 @@
message._sender = self
self.session.outgoing.append(message)
self.queued += 1
- mno = self.queued
self._wakeup()
if sync:
- self._ewait(lambda: self.acked >= mno)
+ self.sync()
assert message not in self.session.outgoing
@synchronized
+ def sync(self):
+ mno = self.queued
+ self._ewait(lambda: self.acked >= mno)
+
+ @synchronized
def close(self):
"""
Close the Sender.
@@ -616,15 +622,15 @@
L{listen}.
"""
- def __init__(self, session, index, source, filter, started):
+ def __init__(self, session, index, source, options, started):
self.session = session
self.index = index
self.destination = str(self.index)
self.source = source
- self.filter = filter
+ self.options = options
self.started = started
- self.capacity = UNLIMITED
+ self.capacity = options.get("capacity", UNLIMITED)
self.granted = Serial(0)
self.drain = False
self.impending = Serial(0)
Modified: qpid/trunk/qpid/python/qpid/tests/messaging.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/python/qpid/tests/messaging.py?rev=818556&r1=818555&r2=818556&view=diff
==============================================================================
--- qpid/trunk/qpid/python/qpid/tests/messaging.py (original)
+++ qpid/trunk/qpid/python/qpid/tests/messaging.py Thu Sep 24 17:25:10 2009
@@ -611,6 +611,7 @@
except InsufficientCapacity:
caught = True
break
+ self.snd.sync()
self.drain(self.rcv, expected=msgs)
self.ssn.acknowledge()
assert caught, "did not exceed capacity"
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org