You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by gs...@apache.org on 2015/01/26 12:29:32 UTC

[1/2] qpid-proton git commit: Improve error handling for BlockingConnection

Repository: qpid-proton
Updated Branches:
  refs/heads/master 5bb3d035c -> 1702665df


Improve error handling for BlockingConnection


Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/647113c4
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/647113c4
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/647113c4

Branch: refs/heads/master
Commit: 647113c4cd42264381b5f81ef80f23ede25c0819
Parents: 5bb3d03
Author: Gordon Sim <gs...@redhat.com>
Authored: Wed Jan 21 20:29:56 2015 +0000
Committer: Gordon Sim <gs...@redhat.com>
Committed: Mon Jan 26 10:18:30 2015 +0000

----------------------------------------------------------------------
 proton-c/bindings/python/proton/utils.py | 43 +++++++++++++++++++--------
 1 file changed, 31 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/647113c4/proton-c/bindings/python/proton/utils.py
----------------------------------------------------------------------
diff --git a/proton-c/bindings/python/proton/utils.py b/proton-c/bindings/python/proton/utils.py
index 7a35362..6658950 100644
--- a/proton-c/bindings/python/proton/utils.py
+++ b/proton-c/bindings/python/proton/utils.py
@@ -17,7 +17,7 @@
 # under the License.
 #
 import collections, Queue, socket, time, threading
-from proton import ConnectionException, Endpoint, Handler, Message, Timeout, Url
+from proton import ConnectionException, Endpoint, Handler, LinkException, Message, Timeout, Url
 from proton.reactors import AmqpSocket, Container, Events, SelectLoop, send_msg
 from proton.handlers import MessagingHandler, ScopedHandler, IncomingMessageHandler
 
@@ -27,6 +27,9 @@ class BlockingLink(object):
         self.link = link
         self.connection.wait(lambda: not (self.link.state & Endpoint.REMOTE_UNINIT),
                              msg="Opening link %s" % link.name)
+        if self.link.state & Endpoint.REMOTE_CLOSED:
+            self.link.close()
+            raise LinkException("Failed to open link %s" % link.name)
 
     def close(self):
         self.link.close()
@@ -39,6 +42,9 @@ class BlockingLink(object):
 class BlockingSender(BlockingLink):
     def __init__(self, connection, sender):
         super(BlockingSender, self).__init__(connection, sender)
+        if self.link.target and self.link.target.address != self.link.remote_target.address:
+            self.link.close()
+            raise LinkException("Failed to open sender %s, target does not match" % link.name)
 
     def send_msg(self, msg, timeout=False):
         delivery = send_msg(self.link, msg)
@@ -52,6 +58,10 @@ class Fetcher(MessagingHandler):
     def on_message(self, event):
         self.incoming.append(event.message)
 
+    def on_link_error(self, event):
+        # This will be handled by BlockingConnection
+        pass
+
     @property
     def has_message(self):
         return len(self.incoming)
@@ -63,6 +73,9 @@ class Fetcher(MessagingHandler):
 class BlockingReceiver(BlockingLink):
     def __init__(self, connection, receiver, fetcher, credit=1):
         super(BlockingReceiver, self).__init__(connection, receiver)
+        if self.link.source and self.link.source.address != self.link.remote_source.address:
+            self.link.close()
+            raise LinkException("Failed to open receiver %s, source does not match" % link.name)
         if credit: receiver.flow(credit)
         self.fetcher = fetcher
 
@@ -127,24 +140,30 @@ class BlockingConnection(Handler):
 
     def on_link_remote_close(self, event):
         if event.link.state & Endpoint.LOCAL_ACTIVE:
-            self.closed(event.link.remote_condition)
+            event.link.close()
+            if event.link.is_sender:
+                txt = "sender %s to %s closed" % (event.link.name, event.link.target.address)
+            else:
+                txt = "receiver %s from %s closed" % (event.link.name, event.link.source.address)
+            if event.link.remote_condition:
+                txt += " due to: %s" % event.link.remote_condition
+            else:
+                txt += " by peer"
+            raise LinkException(txt)
 
     def on_connection_remote_close(self, event):
         if event.connection.state & Endpoint.LOCAL_ACTIVE:
-            self.closed(event.connection.remote_condition)
+            event.connection.close()
+            txt = "Connection %s closed" % self.url
+            if event.connection.remote_condition:
+                txt += " due to: %s" % event.connection.remote_condition
+            else:
+                txt += " by peer"
+            raise ConnectionException(txt)
 
     def on_disconnected(self, event):
         raise ConnectionException("Connection %s disconnected" % self.url);
 
-    def closed(self, error=None):
-        txt = "Connection %s closed" % self.url
-        if error:
-            txt += " due to: %s" % error
-        else:
-            txt += " by peer"
-        raise ConnectionException(txt)
-
-
 def atomic_count(start=0, step=1):
     """Thread-safe atomic count iterator"""
     lock = threading.Lock()


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


[2/2] qpid-proton git commit: Fix duplicated events caused by changes to dispatch function; no longer need to flatten nested handlers

Posted by gs...@apache.org.
Fix duplicated events caused by changes to dispatch function; no longer need to flatten nested handlers


Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/1702665d
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/1702665d
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/1702665d

Branch: refs/heads/master
Commit: 1702665dfbf306ebbada1905a7820e5592759797
Parents: 647113c
Author: Gordon Sim <gs...@redhat.com>
Authored: Mon Jan 26 11:30:39 2015 +0000
Committer: Gordon Sim <gs...@redhat.com>
Committed: Mon Jan 26 11:30:39 2015 +0000

----------------------------------------------------------------------
 proton-c/bindings/python/proton/handlers.py | 11 +----------
 proton-c/bindings/python/proton/reactors.py |  4 ++--
 2 files changed, 3 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/1702665d/proton-c/bindings/python/proton/handlers.py
----------------------------------------------------------------------
diff --git a/proton-c/bindings/python/proton/handlers.py b/proton-c/bindings/python/proton/handlers.py
index e403d26..b2548fb 100644
--- a/proton-c/bindings/python/proton/handlers.py
+++ b/proton-c/bindings/python/proton/handlers.py
@@ -50,15 +50,6 @@ class FlowController(Handler):
         if event.delivery.link.is_receiver:
             self.top_up(event.delivery.link)
 
-def nested_handlers(handlers):
-    # currently only allows for a single level of nesting
-    nested = []
-    for h in handlers:
-        nested.append(h)
-        if hasattr(h, 'handlers'):
-            nested.extend(getattr(h, 'handlers'))
-    return nested
-
 def add_nested_handler(handler, nested):
     if hasattr(handler, 'handlers'):
         getattr(handler, 'handlers').append(nested)
@@ -80,7 +71,7 @@ class ScopedHandler(Handler):
 
         objects = [getattr(event, attr) for attr in self.scopes if hasattr(event, attr) and getattr(event, attr)]
         targets = [getattr(o, "context") for o in objects if hasattr(o, "context")]
-        handlers = [getattr(t, event.type.method) for t in nested_handlers(targets) if hasattr(t, event.type.method)]
+        handlers = [getattr(t, event.type.method) for t in targets if hasattr(t, event.type.method)]
         for h in handlers:
             h(event)
 

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/1702665d/proton-c/bindings/python/proton/reactors.py
----------------------------------------------------------------------
diff --git a/proton-c/bindings/python/proton/reactors.py b/proton-c/bindings/python/proton/reactors.py
index de9752e..77b49cd 100644
--- a/proton-c/bindings/python/proton/reactors.py
+++ b/proton-c/bindings/python/proton/reactors.py
@@ -23,7 +23,7 @@ from proton import Endpoint, Event, EventBase, EventType, generate_uuid, Handler
 from proton import ProtonException, PN_ACCEPTED, PN_PYREF, SASL, Session, symbol
 from proton import Terminus, Timeout, Transport, TransportException, ulong, Url
 from select import select
-from proton.handlers import nested_handlers, OutgoingMessageHandler, ScopedHandler
+from proton.handlers import OutgoingMessageHandler, ScopedHandler
 
 class AmqpSocket(object):
     """
@@ -721,7 +721,7 @@ class Urls(object):
 class Container(object):
     def __init__(self, *handlers):
         h = [Connector(self), ScopedHandler()]
-        h.extend(nested_handlers(handlers))
+        h.extend(handlers)
         self.events = Events(*h)
         self.loop = SelectLoop(self.events)
         self.trigger = None


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org