You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by jr...@apache.org on 2013/04/23 23:40:49 UTC

svn commit: r1471158 - in /qpid/trunk/qpid/python/qpid: messaging/driver.py messaging/endpoints.py messaging/exceptions.py tests/messaging/message.py

Author: jross
Date: Tue Apr 23 21:40:49 2013
New Revision: 1471158

URL: http://svn.apache.org/r1471158
Log:
QPID-2453: Allow the session to keep operating after content codec errors; a patch from Ernie Allen

Modified:
    qpid/trunk/qpid/python/qpid/messaging/driver.py
    qpid/trunk/qpid/python/qpid/messaging/endpoints.py
    qpid/trunk/qpid/python/qpid/messaging/exceptions.py
    qpid/trunk/qpid/python/qpid/tests/messaging/message.py

Modified: qpid/trunk/qpid/python/qpid/messaging/driver.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/python/qpid/messaging/driver.py?rev=1471158&r1=1471157&r2=1471158&view=diff
==============================================================================
--- qpid/trunk/qpid/python/qpid/messaging/driver.py (original)
+++ qpid/trunk/qpid/python/qpid/messaging/driver.py Tue Apr 23 21:40:49 2013
@@ -497,6 +497,9 @@ class Driver:
         self.engine.dispatch()
     except HeartbeatTimeout, e:
       self.close_engine(e)
+    except ContentError, e:
+      msg = compat.format_exc()
+      self.connection.error = ContentError(text=msg)
     except:
       # XXX: Does socket get leaked if this occurs?
       msg = compat.format_exc()
@@ -1245,7 +1248,11 @@ class Engine:
     if msg.ttl is not None:
       dp.ttl = long(msg.ttl*1000)
     enc, dec = get_codec(msg.content_type)
-    body = enc(msg.content)
+    try:
+      body = enc(msg.content)
+    except AttributeError, e:
+      # convert to non-blocking EncodeError
+      raise EncodeError(e)
 
     # XXX: this is not safe for out of order, can this be triggered by pre_ack?
     def msg_acked():
@@ -1294,7 +1301,10 @@ class Engine:
 
     ap = mp.application_headers
     enc, dec = get_codec(mp.content_type)
-    content = dec(xfr.payload)
+    try:
+      content = dec(xfr.payload)
+    except Exception, e:
+      raise DecodeError(e)
     msg = Message(content)
     msg.id = mp.message_id
     if ap is not None:

Modified: qpid/trunk/qpid/python/qpid/messaging/endpoints.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/python/qpid/messaging/endpoints.py?rev=1471158&r1=1471157&r2=1471158&view=diff
==============================================================================
--- qpid/trunk/qpid/python/qpid/messaging/endpoints.py (original)
+++ qpid/trunk/qpid/python/qpid/messaging/endpoints.py Tue Apr 23 21:40:49 2013
@@ -204,7 +204,13 @@ class Connection(Endpoint):
   def check_error(self):
     if self.error:
       self._condition.gc()
-      raise self.error
+      e = self.error
+      if isinstance(e, ContentError):
+          """ forget the content error. It will be
+          raised this time but won't block future calls
+          """
+          self.error = None
+      raise e
 
   def get_error(self):
     return self.error
@@ -887,8 +893,14 @@ class Sender(Endpoint):
     if self.synced < mno:
       self.synced = mno
       self._wakeup()
-    if not self._ewait(lambda: self.acked >= mno, timeout=timeout):
-      raise Timeout("sender sync timed out")
+    try:
+      if not self._ewait(lambda: self.acked >= mno, timeout=timeout):
+        raise Timeout("sender sync timed out")
+    except ContentError:
+      # clean bad message so we can continue
+      self.acked = mno
+      self.session.outgoing.pop(0)
+      raise
 
   @synchronized
   def close(self, timeout=None):

Modified: qpid/trunk/qpid/python/qpid/messaging/exceptions.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/python/qpid/messaging/exceptions.py?rev=1471158&r1=1471157&r2=1471158&view=diff
==============================================================================
--- qpid/trunk/qpid/python/qpid/messaging/exceptions.py (original)
+++ qpid/trunk/qpid/python/qpid/messaging/exceptions.py Tue Apr 23 21:40:49 2013
@@ -154,3 +154,17 @@ class Empty(FetchError):
   available within the alloted time.
   """
   pass
+
+## Message Content errors
+class ContentError(MessagingError):
+  """
+  This type of exception will be returned to the application
+  once, and will not block further requests
+  """
+  pass
+
+class EncodeError(ContentError):
+  pass
+
+class DecodeError(ContentError):
+  pass

Modified: qpid/trunk/qpid/python/qpid/tests/messaging/message.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/python/qpid/tests/messaging/message.py?rev=1471158&r1=1471157&r2=1471158&view=diff
==============================================================================
--- qpid/trunk/qpid/python/qpid/tests/messaging/message.py (original)
+++ qpid/trunk/qpid/python/qpid/tests/messaging/message.py Tue Apr 23 21:40:49 2013
@@ -153,3 +153,16 @@ class MessageEchoTests(Base):
     f = echo.content["false"]
     assert isinstance(t, bool), t
     assert isinstance(f, bool), f
+
+  def testExceptionRaisedMismatchedContentType(self):
+    msg = Message(content_type="amqp/map", content="asdf")
+    try:
+      self.snd.send(msg)
+      self.rcv.fetch(0)
+      assert False, "Exception not raised on mismatched content/content_type"
+    except Exception, e:
+      pass
+
+  def testRecoverAfterException(self):
+    self.testExceptionRaisedMismatchedContentType()
+    self.testTextPlain()



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org