You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by jr...@apache.org on 2013/04/23 23:40:49 UTC
svn commit: r1471158 - in /qpid/trunk/qpid/python/qpid: messaging/driver.py
messaging/endpoints.py messaging/exceptions.py tests/messaging/message.py
Author: jross
Date: Tue Apr 23 21:40:49 2013
New Revision: 1471158
URL: http://svn.apache.org/r1471158
Log:
QPID-2453: Allow the session to keep operating after content codec errors; a patch from Ernie Allen
Modified:
qpid/trunk/qpid/python/qpid/messaging/driver.py
qpid/trunk/qpid/python/qpid/messaging/endpoints.py
qpid/trunk/qpid/python/qpid/messaging/exceptions.py
qpid/trunk/qpid/python/qpid/tests/messaging/message.py
Modified: qpid/trunk/qpid/python/qpid/messaging/driver.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/python/qpid/messaging/driver.py?rev=1471158&r1=1471157&r2=1471158&view=diff
==============================================================================
--- qpid/trunk/qpid/python/qpid/messaging/driver.py (original)
+++ qpid/trunk/qpid/python/qpid/messaging/driver.py Tue Apr 23 21:40:49 2013
@@ -497,6 +497,9 @@ class Driver:
self.engine.dispatch()
except HeartbeatTimeout, e:
self.close_engine(e)
+ except ContentError, e:
+ msg = compat.format_exc()
+ self.connection.error = ContentError(text=msg)
except:
# XXX: Does socket get leaked if this occurs?
msg = compat.format_exc()
@@ -1245,7 +1248,11 @@ class Engine:
if msg.ttl is not None:
dp.ttl = long(msg.ttl*1000)
enc, dec = get_codec(msg.content_type)
- body = enc(msg.content)
+ try:
+ body = enc(msg.content)
+ except AttributeError, e:
+ # convert to non-blocking EncodeError
+ raise EncodeError(e)
# XXX: this is not safe for out of order, can this be triggered by pre_ack?
def msg_acked():
@@ -1294,7 +1301,10 @@ class Engine:
ap = mp.application_headers
enc, dec = get_codec(mp.content_type)
- content = dec(xfr.payload)
+ try:
+ content = dec(xfr.payload)
+ except Exception, e:
+ raise DecodeError(e)
msg = Message(content)
msg.id = mp.message_id
if ap is not None:
Modified: qpid/trunk/qpid/python/qpid/messaging/endpoints.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/python/qpid/messaging/endpoints.py?rev=1471158&r1=1471157&r2=1471158&view=diff
==============================================================================
--- qpid/trunk/qpid/python/qpid/messaging/endpoints.py (original)
+++ qpid/trunk/qpid/python/qpid/messaging/endpoints.py Tue Apr 23 21:40:49 2013
@@ -204,7 +204,13 @@ class Connection(Endpoint):
def check_error(self):
if self.error:
self._condition.gc()
- raise self.error
+ e = self.error
+ if isinstance(e, ContentError):
+ """ forget the content error. It will be
+ raised this time but won't block future calls
+ """
+ self.error = None
+ raise e
def get_error(self):
return self.error
@@ -887,8 +893,14 @@ class Sender(Endpoint):
if self.synced < mno:
self.synced = mno
self._wakeup()
- if not self._ewait(lambda: self.acked >= mno, timeout=timeout):
- raise Timeout("sender sync timed out")
+ try:
+ if not self._ewait(lambda: self.acked >= mno, timeout=timeout):
+ raise Timeout("sender sync timed out")
+ except ContentError:
+ # clean bad message so we can continue
+ self.acked = mno
+ self.session.outgoing.pop(0)
+ raise
@synchronized
def close(self, timeout=None):
Modified: qpid/trunk/qpid/python/qpid/messaging/exceptions.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/python/qpid/messaging/exceptions.py?rev=1471158&r1=1471157&r2=1471158&view=diff
==============================================================================
--- qpid/trunk/qpid/python/qpid/messaging/exceptions.py (original)
+++ qpid/trunk/qpid/python/qpid/messaging/exceptions.py Tue Apr 23 21:40:49 2013
@@ -154,3 +154,17 @@ class Empty(FetchError):
available within the alloted time.
"""
pass
+
+## Message Content errors
+class ContentError(MessagingError):
+ """
+ This type of exception will be returned to the application
+ once, and will not block further requests
+ """
+ pass
+
+class EncodeError(ContentError):
+ pass
+
+class DecodeError(ContentError):
+ pass
Modified: qpid/trunk/qpid/python/qpid/tests/messaging/message.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/python/qpid/tests/messaging/message.py?rev=1471158&r1=1471157&r2=1471158&view=diff
==============================================================================
--- qpid/trunk/qpid/python/qpid/tests/messaging/message.py (original)
+++ qpid/trunk/qpid/python/qpid/tests/messaging/message.py Tue Apr 23 21:40:49 2013
@@ -153,3 +153,16 @@ class MessageEchoTests(Base):
f = echo.content["false"]
assert isinstance(t, bool), t
assert isinstance(f, bool), f
+
+ def testExceptionRaisedMismatchedContentType(self):
+ msg = Message(content_type="amqp/map", content="asdf")
+ try:
+ self.snd.send(msg)
+ self.rcv.fetch(0)
+ assert False, "Exception not raised on mismatched content/content_type"
+ except Exception, e:
+ pass
+
+ def testRecoverAfterException(self):
+ self.testExceptionRaisedMismatchedContentType()
+ self.testTextPlain()
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org