You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by as...@apache.org on 2018/06/12 14:43:59 UTC
qpid-proton git commit: PROTON-1858: Rework wrapped C handlers in
pure python - Rewritten Handshaker and FlowController in python -
CFlowController and CHandshaker are now aliases for Flowcontroller and
Handshaker
Repository: qpid-proton
Updated Branches:
refs/heads/master 763a0798d -> 67d4d7490
PROTON-1858: Rework wrapped C handlers in pure python
- Rewritten Handshaker and FlowController in python
- CFlowController and CHandshaker are now aliases for
Flowcontroller and Handshaker
Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/67d4d749
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/67d4d749
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/67d4d749
Branch: refs/heads/master
Commit: 67d4d749033c1eb03b2f1bff20518c8050d0ccdb
Parents: 763a079
Author: Andrew Stitcher <as...@apache.org>
Authored: Fri May 25 18:18:55 2018 -0400
Committer: Andrew Stitcher <as...@apache.org>
Committed: Tue Jun 12 10:43:09 2018 -0400
----------------------------------------------------------------------
python/proton/handlers.py | 78 +++++++++++++++++++++++++++----
tests/python/proton_tests/common.py | 4 +-
tests/python/proton_tests/reactor.py | 4 +-
3 files changed, 72 insertions(+), 14 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/67d4d749/python/proton/handlers.py
----------------------------------------------------------------------
diff --git a/python/proton/handlers.py b/python/proton/handlers.py
index 15e31a0..b30408e 100644
--- a/python/proton/handlers.py
+++ b/python/proton/handlers.py
@@ -415,7 +415,7 @@ class MessagingHandler(Handler, Acking):
def __init__(self, prefetch=10, auto_accept=True, auto_settle=True, peer_close_is_error=False):
self.handlers = []
if prefetch:
- self.handlers.append(CFlowController(prefetch))
+ self.handlers.append(FlowController(prefetch))
self.handlers.append(EndpointStateHandler(peer_close_is_error, weakref.proxy(self)))
self.handlers.append(IncomingMessageHandler(auto_accept, weakref.proxy(self)))
self.handlers.append(OutgoingMessageHandler(auto_settle, weakref.proxy(self)))
@@ -599,21 +599,79 @@ class TransactionalClientHandler(MessagingHandler, TransactionHandler):
super(TransactionalClientHandler, self).accept(delivery)
-from ._events import WrappedHandler
-from cproton import pn_flowcontroller, pn_handshaker, pn_iohandler
-
+class FlowController(Handler):
+ def __init__(self, window=1024):
+ self._window = window
+ self._drained = 0
-class CFlowController(WrappedHandler):
+ def on_link_local_open(self, event):
+ self._flow(event.link)
- def __init__(self, window=1024):
- WrappedHandler.__init__(self, lambda: pn_flowcontroller(window))
+ def on_link_remote_open(self, event):
+ self._flow(event.link)
+ def on_link_flow(self, event):
+ self._flow(event.link)
-class CHandshaker(WrappedHandler):
+ def on_delivery(self, event):
+ self._flow(event.link)
+
+ def _flow(self, link):
+ if link.is_receiver:
+ self._drained += link.drained()
+ if self._drained == 0:
+ delta = self._window - link.credit
+ link.flow(delta)
+
+
+class Handshaker(Handler):
+
+ @staticmethod
+ def on_connection_remote_open(event):
+ conn = event.connection
+ if conn.state & Endpoint.LOCAL_UNINIT:
+ conn.open()
+
+ @staticmethod
+ def on_session_remote_open(event):
+ ssn = event.session
+ if ssn.state() & Endpoint.LOCAL_UNINIT:
+ ssn.open()
+
+ @staticmethod
+ def on_link_remote_open(event):
+ link = event.link
+ if link.state & Endpoint.LOCAL_UNINIT:
+ link.source.copy(link.remote_source)
+ link.target.copy(link.remote_target)
+ link.open()
+
+ @staticmethod
+ def on_connection_remote_close(event):
+ conn = event.connection
+ if not conn.state & Endpoint.LOCAL_CLOSED:
+ conn.close()
+
+ @staticmethod
+ def on_session_remote_close(event):
+ ssn = event.session
+ if not ssn.state & Endpoint.LOCAL_CLOSED:
+ ssn.close()
+
+ @staticmethod
+ def on_link_remote_close(event):
+ link = event.link
+ if not link.state & Endpoint.LOCAL_CLOSED:
+ link.close()
+
+
+# Back compatibility definitions
+CFlowController = FlowController
+CHandshaker = Handshaker
- def __init__(self):
- WrappedHandler.__init__(self, pn_handshaker)
+from ._events import WrappedHandler
+from cproton import pn_iohandler
class IOHandler(WrappedHandler):
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/67d4d749/tests/python/proton_tests/common.py
----------------------------------------------------------------------
diff --git a/tests/python/proton_tests/common.py b/tests/python/proton_tests/common.py
index ed65e2f..90ba34c 100644
--- a/tests/python/proton_tests/common.py
+++ b/tests/python/proton_tests/common.py
@@ -34,7 +34,7 @@ from subprocess import Popen,PIPE,STDOUT
import sys, os, subprocess
from proton import SASL, SSL
from proton.reactor import Container
-from proton.handlers import CHandshaker, CFlowController
+from proton.handlers import Handshaker, FlowController
from string import Template
def free_tcp_ports(count=1):
@@ -195,7 +195,7 @@ class TestServer(object):
self.host = kwargs["host"]
if "port" in kwargs:
self.port = kwargs["port"]
- self.handlers = [CFlowController(10), CHandshaker()]
+ self.handlers = [FlowController(10), Handshaker()]
self.thread = Thread(name="server-thread", target=self.run)
self.thread.daemon = True
self.running = True
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/67d4d749/tests/python/proton_tests/reactor.py
----------------------------------------------------------------------
diff --git a/tests/python/proton_tests/reactor.py b/tests/python/proton_tests/reactor.py
index 8a3a6af..1b1e2fc 100644
--- a/tests/python/proton_tests/reactor.py
+++ b/tests/python/proton_tests/reactor.py
@@ -22,7 +22,7 @@ import time
import sys
from .common import Test, SkipTest, TestServer, free_tcp_port, ensureCanTestExtendedSASL
from proton.reactor import Container, Reactor, ApplicationEvent, EventInjector
-from proton.handlers import CHandshaker, MessagingHandler
+from proton.handlers import Handshaker, MessagingHandler
from proton import Handler, Url
class Barf(Exception):
@@ -56,7 +56,7 @@ class BarfOnFinal:
def on_reactor_final(self, event):
raise Barf()
-class BarfOnFinalDerived(CHandshaker):
+class BarfOnFinalDerived(Handshaker):
init = False
def on_reactor_init(self, event):
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org