You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by gs...@apache.org on 2015/01/26 12:29:32 UTC
[1/2] qpid-proton git commit: Improve error handling for
BlockingConnection
Repository: qpid-proton
Updated Branches:
refs/heads/master 5bb3d035c -> 1702665df
Improve error handling for BlockingConnection
Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/647113c4
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/647113c4
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/647113c4
Branch: refs/heads/master
Commit: 647113c4cd42264381b5f81ef80f23ede25c0819
Parents: 5bb3d03
Author: Gordon Sim <gs...@redhat.com>
Authored: Wed Jan 21 20:29:56 2015 +0000
Committer: Gordon Sim <gs...@redhat.com>
Committed: Mon Jan 26 10:18:30 2015 +0000
----------------------------------------------------------------------
proton-c/bindings/python/proton/utils.py | 43 +++++++++++++++++++--------
1 file changed, 31 insertions(+), 12 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/647113c4/proton-c/bindings/python/proton/utils.py
----------------------------------------------------------------------
diff --git a/proton-c/bindings/python/proton/utils.py b/proton-c/bindings/python/proton/utils.py
index 7a35362..6658950 100644
--- a/proton-c/bindings/python/proton/utils.py
+++ b/proton-c/bindings/python/proton/utils.py
@@ -17,7 +17,7 @@
# under the License.
#
import collections, Queue, socket, time, threading
-from proton import ConnectionException, Endpoint, Handler, Message, Timeout, Url
+from proton import ConnectionException, Endpoint, Handler, LinkException, Message, Timeout, Url
from proton.reactors import AmqpSocket, Container, Events, SelectLoop, send_msg
from proton.handlers import MessagingHandler, ScopedHandler, IncomingMessageHandler
@@ -27,6 +27,9 @@ class BlockingLink(object):
self.link = link
self.connection.wait(lambda: not (self.link.state & Endpoint.REMOTE_UNINIT),
msg="Opening link %s" % link.name)
+ if self.link.state & Endpoint.REMOTE_CLOSED:
+ self.link.close()
+ raise LinkException("Failed to open link %s" % link.name)
def close(self):
self.link.close()
@@ -39,6 +42,9 @@ class BlockingLink(object):
class BlockingSender(BlockingLink):
def __init__(self, connection, sender):
super(BlockingSender, self).__init__(connection, sender)
+ if self.link.target and self.link.target.address != self.link.remote_target.address:
+ self.link.close()
+ raise LinkException("Failed to open sender %s, target does not match" % link.name)
def send_msg(self, msg, timeout=False):
delivery = send_msg(self.link, msg)
@@ -52,6 +58,10 @@ class Fetcher(MessagingHandler):
def on_message(self, event):
self.incoming.append(event.message)
+ def on_link_error(self, event):
+ # This will be handled by BlockingConnection
+ pass
+
@property
def has_message(self):
return len(self.incoming)
@@ -63,6 +73,9 @@ class Fetcher(MessagingHandler):
class BlockingReceiver(BlockingLink):
def __init__(self, connection, receiver, fetcher, credit=1):
super(BlockingReceiver, self).__init__(connection, receiver)
+ if self.link.source and self.link.source.address != self.link.remote_source.address:
+ self.link.close()
+ raise LinkException("Failed to open receiver %s, source does not match" % link.name)
if credit: receiver.flow(credit)
self.fetcher = fetcher
@@ -127,24 +140,30 @@ class BlockingConnection(Handler):
def on_link_remote_close(self, event):
if event.link.state & Endpoint.LOCAL_ACTIVE:
- self.closed(event.link.remote_condition)
+ event.link.close()
+ if event.link.is_sender:
+ txt = "sender %s to %s closed" % (event.link.name, event.link.target.address)
+ else:
+ txt = "receiver %s from %s closed" % (event.link.name, event.link.source.address)
+ if event.link.remote_condition:
+ txt += " due to: %s" % event.link.remote_condition
+ else:
+ txt += " by peer"
+ raise LinkException(txt)
def on_connection_remote_close(self, event):
if event.connection.state & Endpoint.LOCAL_ACTIVE:
- self.closed(event.connection.remote_condition)
+ event.connection.close()
+ txt = "Connection %s closed" % self.url
+ if event.connection.remote_condition:
+ txt += " due to: %s" % event.connection.remote_condition
+ else:
+ txt += " by peer"
+ raise ConnectionException(txt)
def on_disconnected(self, event):
raise ConnectionException("Connection %s disconnected" % self.url);
- def closed(self, error=None):
- txt = "Connection %s closed" % self.url
- if error:
- txt += " due to: %s" % error
- else:
- txt += " by peer"
- raise ConnectionException(txt)
-
-
def atomic_count(start=0, step=1):
"""Thread-safe atomic count iterator"""
lock = threading.Lock()
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org
[2/2] qpid-proton git commit: Fix duplicated events caused by changes
to dispatch function; no longer need to flatten nested handlers
Posted by gs...@apache.org.
Fix duplicated events caused by changes to dispatch function; no longer need to flatten nested handlers
Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/1702665d
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/1702665d
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/1702665d
Branch: refs/heads/master
Commit: 1702665dfbf306ebbada1905a7820e5592759797
Parents: 647113c
Author: Gordon Sim <gs...@redhat.com>
Authored: Mon Jan 26 11:30:39 2015 +0000
Committer: Gordon Sim <gs...@redhat.com>
Committed: Mon Jan 26 11:30:39 2015 +0000
----------------------------------------------------------------------
proton-c/bindings/python/proton/handlers.py | 11 +----------
proton-c/bindings/python/proton/reactors.py | 4 ++--
2 files changed, 3 insertions(+), 12 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/1702665d/proton-c/bindings/python/proton/handlers.py
----------------------------------------------------------------------
diff --git a/proton-c/bindings/python/proton/handlers.py b/proton-c/bindings/python/proton/handlers.py
index e403d26..b2548fb 100644
--- a/proton-c/bindings/python/proton/handlers.py
+++ b/proton-c/bindings/python/proton/handlers.py
@@ -50,15 +50,6 @@ class FlowController(Handler):
if event.delivery.link.is_receiver:
self.top_up(event.delivery.link)
-def nested_handlers(handlers):
- # currently only allows for a single level of nesting
- nested = []
- for h in handlers:
- nested.append(h)
- if hasattr(h, 'handlers'):
- nested.extend(getattr(h, 'handlers'))
- return nested
-
def add_nested_handler(handler, nested):
if hasattr(handler, 'handlers'):
getattr(handler, 'handlers').append(nested)
@@ -80,7 +71,7 @@ class ScopedHandler(Handler):
objects = [getattr(event, attr) for attr in self.scopes if hasattr(event, attr) and getattr(event, attr)]
targets = [getattr(o, "context") for o in objects if hasattr(o, "context")]
- handlers = [getattr(t, event.type.method) for t in nested_handlers(targets) if hasattr(t, event.type.method)]
+ handlers = [getattr(t, event.type.method) for t in targets if hasattr(t, event.type.method)]
for h in handlers:
h(event)
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/1702665d/proton-c/bindings/python/proton/reactors.py
----------------------------------------------------------------------
diff --git a/proton-c/bindings/python/proton/reactors.py b/proton-c/bindings/python/proton/reactors.py
index de9752e..77b49cd 100644
--- a/proton-c/bindings/python/proton/reactors.py
+++ b/proton-c/bindings/python/proton/reactors.py
@@ -23,7 +23,7 @@ from proton import Endpoint, Event, EventBase, EventType, generate_uuid, Handler
from proton import ProtonException, PN_ACCEPTED, PN_PYREF, SASL, Session, symbol
from proton import Terminus, Timeout, Transport, TransportException, ulong, Url
from select import select
-from proton.handlers import nested_handlers, OutgoingMessageHandler, ScopedHandler
+from proton.handlers import OutgoingMessageHandler, ScopedHandler
class AmqpSocket(object):
"""
@@ -721,7 +721,7 @@ class Urls(object):
class Container(object):
def __init__(self, *handlers):
h = [Connector(self), ScopedHandler()]
- h.extend(nested_handlers(handlers))
+ h.extend(handlers)
self.events = Events(*h)
self.loop = SelectLoop(self.events)
self.trigger = None
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org