You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rh...@apache.org on 2010/02/16 04:48:45 UTC

svn commit: r910388 - in /qpid/trunk/qpid/python: examples/api/drain examples/api/server examples/api/spout qpid/driver.py qpid/messaging.py qpid/tests/messaging.py

Author: rhs
Date: Tue Feb 16 03:48:44 2010
New Revision: 910388

URL: http://svn.apache.org/viewvc?rev=910388&view=rev
Log:
Changed Session.sender()/Session.receiver() to be synchronous by default (they now wait until the link is established) when invoked on a connected session.

Modified:
    qpid/trunk/qpid/python/examples/api/drain
    qpid/trunk/qpid/python/examples/api/server
    qpid/trunk/qpid/python/examples/api/spout
    qpid/trunk/qpid/python/qpid/driver.py
    qpid/trunk/qpid/python/qpid/messaging.py
    qpid/trunk/qpid/python/qpid/tests/messaging.py

Modified: qpid/trunk/qpid/python/examples/api/drain
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/python/examples/api/drain?rev=910388&r1=910387&r2=910388&view=diff
==============================================================================
--- qpid/trunk/qpid/python/examples/api/drain (original)
+++ qpid/trunk/qpid/python/examples/api/drain Tue Feb 16 03:48:44 2010
@@ -93,9 +93,8 @@
       ssn.acknowledge()
     except Empty:
       break
-    except ReceiveError, e:
-      print e
-      break
+except ReceiveError, e:
+  print e
 except KeyboardInterrupt:
   pass
 

Modified: qpid/trunk/qpid/python/examples/api/server
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/python/examples/api/server?rev=910388&r1=910387&r2=910388&view=diff
==============================================================================
--- qpid/trunk/qpid/python/examples/api/server (original)
+++ qpid/trunk/qpid/python/examples/api/server Tue Feb 16 03:48:44 2010
@@ -51,15 +51,12 @@
   parser.error("address is required")
 
 # XXX: should make URL default the port for us
-conn = Connection.open(url.host, url.port or AMQP_PORT,
-                       username=url.user,
-                       password=url.password,
-                       reconnect=opts.reconnect,
-                       reconnect_delay=opts.reconnect_delay,
-                       reconnect_limit=opts.reconnect_limit)
-ssn = conn.session()
-rcv = ssn.receiver(addr)
-
+conn = Connection(url.host, url.port or AMQP_PORT,
+                  username=url.user,
+                  password=url.password,
+                  reconnect=opts.reconnect,
+                  reconnect_delay=opts.reconnect_delay,
+                  reconnect_limit=opts.reconnect_limit)
 def dispatch(msg):
   msg_type = msg.properties.get("type")
   if msg_type == "shell":
@@ -77,21 +74,26 @@
     result = Message("unrecognized message type: %s" % msg_type)
   return result
 
-while True:
-  try:
+try:
+  conn.connect()
+  ssn = conn.session()
+  rcv = ssn.receiver(addr)
+
+  while True:
     msg = rcv.fetch()
     response = dispatch(msg)
-    snd = ssn.sender(msg.reply_to)
+    snd = None
     try:
+      snd = ssn.sender(msg.reply_to)
       snd.send(response)
     except SendError, e:
       print e
-    snd.close()
+    if snd is not None:
+      snd.close()
     ssn.acknowledge()
-  except Empty:
-    break
-  except ReceiveError, e:
-    print e
-    break
+except ReceiveError, e:
+  print e
+except KeyboardInterrupt:
+  pass
 
 conn.close()

Modified: qpid/trunk/qpid/python/examples/api/spout
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/python/examples/api/spout?rev=910388&r1=910387&r2=910388&view=diff
==============================================================================
--- qpid/trunk/qpid/python/examples/api/spout (original)
+++ qpid/trunk/qpid/python/examples/api/spout Tue Feb 16 03:48:44 2010
@@ -113,13 +113,11 @@
       name, val = nameval(p)
       msg.properties[name] = val
 
-    try:
-      snd.send(msg)
-      count += 1
-      print msg
-    except SendError, e:
-      print e
-      break
+    snd.send(msg)
+    count += 1
+    print msg
+except SendError, e:
+  print e
 except KeyboardInterrupt:
   pass
 

Modified: qpid/trunk/qpid/python/qpid/driver.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/python/qpid/driver.py?rev=910388&r1=910387&r2=910388&view=diff
==============================================================================
--- qpid/trunk/qpid/python/qpid/driver.py (original)
+++ qpid/trunk/qpid/python/qpid/driver.py Tue Feb 16 03:48:44 2010
@@ -560,6 +560,8 @@
         self.delete(sst, _snd.name, do_unlink)
       else:
         do_unlink()
+    elif not snd.linked and snd.closing and not snd.closed:
+      snd.closed = True
 
   def link_in(self, rcv):
     sst = self._attachments.get(rcv.session)
@@ -633,6 +635,8 @@
         else:
           sst.write_cmd(MessageCancel(_rcv.destination), do_unlink)
         _rcv.canceled = True
+    elif not rcv.linked and rcv.closing and not rcv.closed:
+      rcv.closed = True
 
   POLICIES = Values("always", "sender", "receiver", "never")
 

Modified: qpid/trunk/qpid/python/qpid/messaging.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/python/qpid/messaging.py?rev=910388&r1=910387&r2=910388&view=diff
==============================================================================
--- qpid/trunk/qpid/python/qpid/messaging.py (original)
+++ qpid/trunk/qpid/python/qpid/messaging.py Tue Feb 16 03:48:44 2010
@@ -181,7 +181,14 @@
     """
     self._connected = True
     self._wakeup()
-    self._ewait(lambda: self._driver._connected, exc=ConnectError)
+    self._ewait(lambda: self._driver._connected and not self._unlinked(),
+                exc=ConnectError)
+
+  def _unlinked(self):
+    return [l
+            for ssn in self.sessions.values()
+            for l in ssn.senders + ssn.receivers
+            if not (l.linked or l.error or l.closed)]
 
   @synchronized
   def disconnect(self):
@@ -484,11 +491,13 @@
     sender = Sender(self, self.next_sender_id, target, options)
     self.next_sender_id += 1
     self.senders.append(sender)
-    self._wakeup()
-    # XXX: because of the lack of waiting here we can end up getting
-    # into the driver loop with messages sent for senders that haven't
-    # been linked yet, something similar can probably happen for
-    # receivers
+    if not self.closed and self.connection._connected:
+      self._wakeup()
+      try:
+        sender._ewait(lambda: sender.linked)
+      except SendError, e:
+        sender.close()
+        raise e
     return sender
 
   @synchronized
@@ -505,7 +514,13 @@
     receiver = Receiver(self, self.next_receiver_id, source, options)
     self.next_receiver_id += 1
     self.receivers.append(receiver)
-    self._wakeup()
+    if not self.closed and self.connection._connected:
+      self._wakeup()
+      try:
+        receiver._ewait(lambda: receiver.linked)
+      except ReceiveError, e:
+        receiver.close()
+        raise e
     return receiver
 
   @synchronized

Modified: qpid/trunk/qpid/python/qpid/tests/messaging.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/python/qpid/tests/messaging.py?rev=910388&r1=910387&r2=910388&view=diff
==============================================================================
--- qpid/trunk/qpid/python/qpid/tests/messaging.py (original)
+++ qpid/trunk/qpid/python/qpid/tests/messaging.py Tue Feb 16 03:48:44 2010
@@ -214,6 +214,15 @@
     self.ssn.acknowledge(msg)
     snd2 = self.ssn.receiver('test-rcv-queue; {delete: always}')
 
+  def testDisconnectedReceiver(self):
+    self.conn.disconnect()
+    rcv = self.ssn.receiver("test-dis-rcv-queue; {create: always, delete: always}")
+    m = self.content("testDisconnectedReceiver")
+    self.conn.connect()
+    snd = self.ssn.sender("test-dis-rcv-queue")
+    snd.send(m)
+    self.drain(rcv, expected=[m])
+
   def testNextReceiver(self):
     ADDR = 'test-next-rcv-queue; {create: always, delete: always}'
     rcv1 = self.ssn.receiver(ADDR, capacity=UNLIMITED)
@@ -581,16 +590,14 @@
     return self.conn.session()
 
   def badOption(self, options, error):
-    snd = self.ssn.sender("test-bad-options-snd; %s" % options)
     try:
-      snd.send("ping")
+      self.ssn.sender("test-bad-options-snd; %s" % options)
       assert False
     except SendError, e:
       assert "error in options: %s" % error == str(e), e
 
-    rcv = self.ssn.receiver("test-bad-options-rcv; %s" % options)
     try:
-      rcv.fetch(timeout=0)
+      self.ssn.receiver("test-bad-options-rcv; %s" % options)
       assert False
     except ReceiveError, e:
       assert "error in options: %s" % error == str(e), e
@@ -673,9 +680,8 @@
     snd = self.ssn.sender("test-delete; {delete: always}")
     snd.send("ping")
     snd.close()
-    snd = self.ssn.sender("test-delete")
     try:
-      snd.send("ping")
+      self.ssn.sender("test-delete")
     except SendError, e:
       assert "no such queue" in str(e)
 
@@ -689,7 +695,8 @@
 
     try:
       self.ssn.receiver("test-delete")
-    except SendError, e:
+      assert False
+    except ReceiveError, e:
       assert "no such queue" in str(e)
 
   def testDeleteSpecial(self):
@@ -789,59 +796,55 @@
   def setup_session(self):
     return self.conn.session()
 
-  def sendErrorTest(self, addr, exc, check=lambda e: True):
-    snd = self.ssn.sender(addr, durable=self.durable())
+  def senderErrorTest(self, addr, exc, check=lambda e: True):
     try:
-      snd.send("hello")
-      assert False, "send succeeded"
+      self.ssn.sender(addr, durable=self.durable())
+      assert False, "sender creation succeeded"
     except exc, e:
       assert check(e), "unexpected error: %s" % compat.format_exc(e)
-      snd.close()
 
-  def fetchErrorTest(self, addr, exc, check=lambda e: True):
-    rcv = self.ssn.receiver(addr)
+  def receiverErrorTest(self, addr, exc, check=lambda e: True):
     try:
-      rcv.fetch(timeout=0)
-      assert False, "fetch succeeded"
+      self.ssn.receiver(addr)
+      assert False, "receiver creation succeeded"
     except exc, e:
       assert check(e), "unexpected error: %s" % compat.format_exc(e)
-      rcv.close()
 
   def testNoneTarget(self):
     # XXX: should have specific exception for this
-    self.sendErrorTest(None, SendError)
+    self.senderErrorTest(None, SendError)
 
   def testNoneSource(self):
     # XXX: should have specific exception for this
-    self.fetchErrorTest(None, ReceiveError)
+    self.receiverErrorTest(None, ReceiveError)
 
   def testNoTarget(self):
     # XXX: should have specific exception for this
-    self.sendErrorTest(NOSUCH_Q, SendError, lambda e: NOSUCH_Q in str(e))
+    self.senderErrorTest(NOSUCH_Q, SendError, lambda e: NOSUCH_Q in str(e))
 
   def testNoSource(self):
     # XXX: should have specific exception for this
-    self.fetchErrorTest(NOSUCH_Q, ReceiveError, lambda e: NOSUCH_Q in str(e))
+    self.receiverErrorTest(NOSUCH_Q, ReceiveError, lambda e: NOSUCH_Q in str(e))
 
   def testUnparseableTarget(self):
     # XXX: should have specific exception for this
-    self.sendErrorTest(UNPARSEABLE_ADDR, SendError,
-                       lambda e: "expecting COLON" in str(e))
+    self.senderErrorTest(UNPARSEABLE_ADDR, SendError,
+                         lambda e: "expecting COLON" in str(e))
 
   def testUnparseableSource(self):
     # XXX: should have specific exception for this
-    self.fetchErrorTest(UNPARSEABLE_ADDR, ReceiveError,
-                        lambda e: "expecting COLON" in str(e))
+    self.receiverErrorTest(UNPARSEABLE_ADDR, ReceiveError,
+                           lambda e: "expecting COLON" in str(e))
 
   def testUnlexableTarget(self):
     # XXX: should have specific exception for this
-    self.sendErrorTest(UNLEXABLE_ADDR, SendError,
-                       lambda e: "unrecognized characters" in str(e))
+    self.senderErrorTest(UNLEXABLE_ADDR, SendError,
+                         lambda e: "unrecognized characters" in str(e))
 
   def testUnlexableSource(self):
     # XXX: should have specific exception for this
-    self.fetchErrorTest(UNLEXABLE_ADDR, ReceiveError,
-                        lambda e: "unrecognized characters" in str(e))
+    self.receiverErrorTest(UNLEXABLE_ADDR, ReceiveError,
+                           lambda e: "unrecognized characters" in str(e))
 
 SENDER_Q = 'test-sender-q; {create: always, delete: always}'
 



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org