You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rh...@apache.org on 2010/02/16 04:48:45 UTC
svn commit: r910388 - in /qpid/trunk/qpid/python: examples/api/drain
examples/api/server examples/api/spout qpid/driver.py qpid/messaging.py
qpid/tests/messaging.py
Author: rhs
Date: Tue Feb 16 03:48:44 2010
New Revision: 910388
URL: http://svn.apache.org/viewvc?rev=910388&view=rev
Log:
Changed sender/receiver creation to be synchronous by default when invoked on a connected session: the call now waits for the link to be established and raises on failure.
Modified:
qpid/trunk/qpid/python/examples/api/drain
qpid/trunk/qpid/python/examples/api/server
qpid/trunk/qpid/python/examples/api/spout
qpid/trunk/qpid/python/qpid/driver.py
qpid/trunk/qpid/python/qpid/messaging.py
qpid/trunk/qpid/python/qpid/tests/messaging.py
Modified: qpid/trunk/qpid/python/examples/api/drain
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/python/examples/api/drain?rev=910388&r1=910387&r2=910388&view=diff
==============================================================================
--- qpid/trunk/qpid/python/examples/api/drain (original)
+++ qpid/trunk/qpid/python/examples/api/drain Tue Feb 16 03:48:44 2010
@@ -93,9 +93,8 @@
ssn.acknowledge()
except Empty:
break
- except ReceiveError, e:
- print e
- break
+except ReceiveError, e:
+ print e
except KeyboardInterrupt:
pass
Modified: qpid/trunk/qpid/python/examples/api/server
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/python/examples/api/server?rev=910388&r1=910387&r2=910388&view=diff
==============================================================================
--- qpid/trunk/qpid/python/examples/api/server (original)
+++ qpid/trunk/qpid/python/examples/api/server Tue Feb 16 03:48:44 2010
@@ -51,15 +51,12 @@
parser.error("address is required")
# XXX: should make URL default the port for us
-conn = Connection.open(url.host, url.port or AMQP_PORT,
- username=url.user,
- password=url.password,
- reconnect=opts.reconnect,
- reconnect_delay=opts.reconnect_delay,
- reconnect_limit=opts.reconnect_limit)
-ssn = conn.session()
-rcv = ssn.receiver(addr)
-
+conn = Connection(url.host, url.port or AMQP_PORT,
+ username=url.user,
+ password=url.password,
+ reconnect=opts.reconnect,
+ reconnect_delay=opts.reconnect_delay,
+ reconnect_limit=opts.reconnect_limit)
def dispatch(msg):
msg_type = msg.properties.get("type")
if msg_type == "shell":
@@ -77,21 +74,26 @@
result = Message("unrecognized message type: %s" % msg_type)
return result
-while True:
- try:
+try:
+ conn.connect()
+ ssn = conn.session()
+ rcv = ssn.receiver(addr)
+
+ while True:
msg = rcv.fetch()
response = dispatch(msg)
- snd = ssn.sender(msg.reply_to)
+ snd = None
try:
+ snd = ssn.sender(msg.reply_to)
snd.send(response)
except SendError, e:
print e
- snd.close()
+ if snd is not None:
+ snd.close()
ssn.acknowledge()
- except Empty:
- break
- except ReceiveError, e:
- print e
- break
+except ReceiveError, e:
+ print e
+except KeyboardInterrupt:
+ pass
conn.close()
Modified: qpid/trunk/qpid/python/examples/api/spout
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/python/examples/api/spout?rev=910388&r1=910387&r2=910388&view=diff
==============================================================================
--- qpid/trunk/qpid/python/examples/api/spout (original)
+++ qpid/trunk/qpid/python/examples/api/spout Tue Feb 16 03:48:44 2010
@@ -113,13 +113,11 @@
name, val = nameval(p)
msg.properties[name] = val
- try:
- snd.send(msg)
- count += 1
- print msg
- except SendError, e:
- print e
- break
+ snd.send(msg)
+ count += 1
+ print msg
+except SendError, e:
+ print e
except KeyboardInterrupt:
pass
Modified: qpid/trunk/qpid/python/qpid/driver.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/python/qpid/driver.py?rev=910388&r1=910387&r2=910388&view=diff
==============================================================================
--- qpid/trunk/qpid/python/qpid/driver.py (original)
+++ qpid/trunk/qpid/python/qpid/driver.py Tue Feb 16 03:48:44 2010
@@ -560,6 +560,8 @@
self.delete(sst, _snd.name, do_unlink)
else:
do_unlink()
+ elif not snd.linked and snd.closing and not snd.closed:
+ snd.closed = True
def link_in(self, rcv):
sst = self._attachments.get(rcv.session)
@@ -633,6 +635,8 @@
else:
sst.write_cmd(MessageCancel(_rcv.destination), do_unlink)
_rcv.canceled = True
+ elif not rcv.linked and rcv.closing and not rcv.closed:
+ rcv.closed = True
POLICIES = Values("always", "sender", "receiver", "never")
Modified: qpid/trunk/qpid/python/qpid/messaging.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/python/qpid/messaging.py?rev=910388&r1=910387&r2=910388&view=diff
==============================================================================
--- qpid/trunk/qpid/python/qpid/messaging.py (original)
+++ qpid/trunk/qpid/python/qpid/messaging.py Tue Feb 16 03:48:44 2010
@@ -181,7 +181,14 @@
"""
self._connected = True
self._wakeup()
- self._ewait(lambda: self._driver._connected, exc=ConnectError)
+ self._ewait(lambda: self._driver._connected and not self._unlinked(),
+ exc=ConnectError)
+
+ def _unlinked(self):
+ return [l
+ for ssn in self.sessions.values()
+ for l in ssn.senders + ssn.receivers
+ if not (l.linked or l.error or l.closed)]
@synchronized
def disconnect(self):
@@ -484,11 +491,13 @@
sender = Sender(self, self.next_sender_id, target, options)
self.next_sender_id += 1
self.senders.append(sender)
- self._wakeup()
- # XXX: because of the lack of waiting here we can end up getting
- # into the driver loop with messages sent for senders that haven't
- # been linked yet, something similar can probably happen for
- # receivers
+ if not self.closed and self.connection._connected:
+ self._wakeup()
+ try:
+ sender._ewait(lambda: sender.linked)
+ except SendError, e:
+ sender.close()
+ raise e
return sender
@synchronized
@@ -505,7 +514,13 @@
receiver = Receiver(self, self.next_receiver_id, source, options)
self.next_receiver_id += 1
self.receivers.append(receiver)
- self._wakeup()
+ if not self.closed and self.connection._connected:
+ self._wakeup()
+ try:
+ receiver._ewait(lambda: receiver.linked)
+ except ReceiveError, e:
+ receiver.close()
+ raise e
return receiver
@synchronized
Modified: qpid/trunk/qpid/python/qpid/tests/messaging.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/python/qpid/tests/messaging.py?rev=910388&r1=910387&r2=910388&view=diff
==============================================================================
--- qpid/trunk/qpid/python/qpid/tests/messaging.py (original)
+++ qpid/trunk/qpid/python/qpid/tests/messaging.py Tue Feb 16 03:48:44 2010
@@ -214,6 +214,15 @@
self.ssn.acknowledge(msg)
snd2 = self.ssn.receiver('test-rcv-queue; {delete: always}')
+ def testDisconnectedReceiver(self):
+ self.conn.disconnect()
+ rcv = self.ssn.receiver("test-dis-rcv-queue; {create: always, delete: always}")
+ m = self.content("testDisconnectedReceiver")
+ self.conn.connect()
+ snd = self.ssn.sender("test-dis-rcv-queue")
+ snd.send(m)
+ self.drain(rcv, expected=[m])
+
def testNextReceiver(self):
ADDR = 'test-next-rcv-queue; {create: always, delete: always}'
rcv1 = self.ssn.receiver(ADDR, capacity=UNLIMITED)
@@ -581,16 +590,14 @@
return self.conn.session()
def badOption(self, options, error):
- snd = self.ssn.sender("test-bad-options-snd; %s" % options)
try:
- snd.send("ping")
+ self.ssn.sender("test-bad-options-snd; %s" % options)
assert False
except SendError, e:
assert "error in options: %s" % error == str(e), e
- rcv = self.ssn.receiver("test-bad-options-rcv; %s" % options)
try:
- rcv.fetch(timeout=0)
+ self.ssn.receiver("test-bad-options-rcv; %s" % options)
assert False
except ReceiveError, e:
assert "error in options: %s" % error == str(e), e
@@ -673,9 +680,8 @@
snd = self.ssn.sender("test-delete; {delete: always}")
snd.send("ping")
snd.close()
- snd = self.ssn.sender("test-delete")
try:
- snd.send("ping")
+ self.ssn.sender("test-delete")
except SendError, e:
assert "no such queue" in str(e)
@@ -689,7 +695,8 @@
try:
self.ssn.receiver("test-delete")
- except SendError, e:
+ assert False
+ except ReceiveError, e:
assert "no such queue" in str(e)
def testDeleteSpecial(self):
@@ -789,59 +796,55 @@
def setup_session(self):
return self.conn.session()
- def sendErrorTest(self, addr, exc, check=lambda e: True):
- snd = self.ssn.sender(addr, durable=self.durable())
+ def senderErrorTest(self, addr, exc, check=lambda e: True):
try:
- snd.send("hello")
- assert False, "send succeeded"
+ self.ssn.sender(addr, durable=self.durable())
+ assert False, "sender creation succeeded"
except exc, e:
assert check(e), "unexpected error: %s" % compat.format_exc(e)
- snd.close()
- def fetchErrorTest(self, addr, exc, check=lambda e: True):
- rcv = self.ssn.receiver(addr)
+ def receiverErrorTest(self, addr, exc, check=lambda e: True):
try:
- rcv.fetch(timeout=0)
- assert False, "fetch succeeded"
+ self.ssn.receiver(addr)
+ assert False, "receiver creation succeeded"
except exc, e:
assert check(e), "unexpected error: %s" % compat.format_exc(e)
- rcv.close()
def testNoneTarget(self):
# XXX: should have specific exception for this
- self.sendErrorTest(None, SendError)
+ self.senderErrorTest(None, SendError)
def testNoneSource(self):
# XXX: should have specific exception for this
- self.fetchErrorTest(None, ReceiveError)
+ self.receiverErrorTest(None, ReceiveError)
def testNoTarget(self):
# XXX: should have specific exception for this
- self.sendErrorTest(NOSUCH_Q, SendError, lambda e: NOSUCH_Q in str(e))
+ self.senderErrorTest(NOSUCH_Q, SendError, lambda e: NOSUCH_Q in str(e))
def testNoSource(self):
# XXX: should have specific exception for this
- self.fetchErrorTest(NOSUCH_Q, ReceiveError, lambda e: NOSUCH_Q in str(e))
+ self.receiverErrorTest(NOSUCH_Q, ReceiveError, lambda e: NOSUCH_Q in str(e))
def testUnparseableTarget(self):
# XXX: should have specific exception for this
- self.sendErrorTest(UNPARSEABLE_ADDR, SendError,
- lambda e: "expecting COLON" in str(e))
+ self.senderErrorTest(UNPARSEABLE_ADDR, SendError,
+ lambda e: "expecting COLON" in str(e))
def testUnparseableSource(self):
# XXX: should have specific exception for this
- self.fetchErrorTest(UNPARSEABLE_ADDR, ReceiveError,
- lambda e: "expecting COLON" in str(e))
+ self.receiverErrorTest(UNPARSEABLE_ADDR, ReceiveError,
+ lambda e: "expecting COLON" in str(e))
def testUnlexableTarget(self):
# XXX: should have specific exception for this
- self.sendErrorTest(UNLEXABLE_ADDR, SendError,
- lambda e: "unrecognized characters" in str(e))
+ self.senderErrorTest(UNLEXABLE_ADDR, SendError,
+ lambda e: "unrecognized characters" in str(e))
def testUnlexableSource(self):
# XXX: should have specific exception for this
- self.fetchErrorTest(UNLEXABLE_ADDR, ReceiveError,
- lambda e: "unrecognized characters" in str(e))
+ self.receiverErrorTest(UNLEXABLE_ADDR, ReceiveError,
+ lambda e: "unrecognized characters" in str(e))
SENDER_Q = 'test-sender-q; {create: always, delete: always}'
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org