You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by tr...@apache.org on 2017/10/18 14:43:31 UTC

[09/15] qpid-dispatch git commit: DISPATCH-829 - Added a test for the abort capabilities (requires proton PR #123)

DISPATCH-829 - Added a test for the abort capabilities (requires proton PR #123)


Project: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/commit/3fd09218
Tree: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/tree/3fd09218
Diff: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/diff/3fd09218

Branch: refs/heads/master
Commit: 3fd09218c5903cab840eb048a209edc90bf99d5d
Parents: 90a8b62
Author: Ted Ross <tr...@redhat.com>
Authored: Tue Oct 10 14:27:37 2017 -0400
Committer: Ted Ross <tr...@redhat.com>
Committed: Wed Oct 18 08:11:40 2017 -0400

----------------------------------------------------------------------
 tests/CMakeLists.txt                 |   1 +
 tests/system_tests_delivery_abort.py | 228 ++++++++++++++++++++++++++++++
 2 files changed, 229 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/3fd09218/tests/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt
index eff063a..aaf65ec 100644
--- a/tests/CMakeLists.txt
+++ b/tests/CMakeLists.txt
@@ -99,6 +99,7 @@ foreach(py_test_module
     system_tests_failover_list
     system_tests_denied_unsettled_multicast
     system_tests_auth_service_plugin
+    system_tests_delivery_abort
     ${SYSTEM_TESTS_HTTP}
     )
 

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/3fd09218/tests/system_tests_delivery_abort.py
----------------------------------------------------------------------
diff --git a/tests/system_tests_delivery_abort.py b/tests/system_tests_delivery_abort.py
new file mode 100644
index 0000000..b0869e5
--- /dev/null
+++ b/tests/system_tests_delivery_abort.py
@@ -0,0 +1,228 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+import unittest, os, json
+from subprocess import PIPE, STDOUT
+from proton import Message, PENDING, ACCEPTED, REJECTED, RELEASED, SSLDomain, SSLUnavailable, Timeout
+from system_test import TestCase, Qdrouterd, main_module, DIR, TIMEOUT, Process
+from proton.handlers import MessagingHandler
+from proton.reactor import Container, DynamicNodeProperties
+
+# PROTON-828:
+try:
+    from proton import MODIFIED
+except ImportError:
+    from proton import PN_STATUS_MODIFIED as MODIFIED
+
+
+class RouterTest(TestCase):
+
+    inter_router_port = None
+
+    @classmethod
+    def setUpClass(cls):
+        """Start a router"""
+        super(RouterTest, cls).setUpClass()
+
+        def router(name, connection):
+
+            config = [
+                ('router', {'mode': 'interior', 'id': name}),
+                ('listener', {'port': cls.tester.get_port(), 'stripAnnotations': 'no'}),
+                ('listener', {'port': cls.tester.get_port(), 'stripAnnotations': 'no', 'role': 'route-container'}),
+                ('linkRoute', {'prefix': 'link', 'dir': 'in', 'containerId': 'LRC'}),
+                ('linkRoute', {'prefix': 'link', 'dir': 'out', 'containerId': 'LRC'}),
+                ('address', {'prefix': 'closest', 'distribution': 'closest'}),
+                ('address', {'prefix': 'spread', 'distribution': 'balanced'}),
+                ('address', {'prefix': 'multicast', 'distribution': 'multicast'}),
+                connection
+            ]
+
+            config = Qdrouterd.Config(config)
+
+            cls.routers.append(cls.tester.qdrouterd(name, config, wait=True))
+
+        cls.routers = []
+
+        inter_router_port = cls.tester.get_port()
+
+        router('A', ('listener', {'role': 'inter-router', 'port': inter_router_port}))
+        router('B', ('connector', {'name': 'connectorToA', 'role': 'inter-router', 'port': inter_router_port, 'verifyHostName': 'no'}))
+
+        cls.routers[0].wait_router_connected('B')
+        cls.routers[1].wait_router_connected('A')
+
+
+    def test_01_message_route_interrupted_stream_one_router(self):
+        test = MessageRouteAbortTest(self.routers[0].addresses[0],
+                                     self.routers[0].addresses[0],
+                                     "addr_01")
+        test.run()
+        self.assertEqual(None, test.error)
+
+
+    def test_02_message_route_interrupted_stream_two_routers(self):
+        test = MessageRouteAbortTest(self.routers[0].addresses[0],
+                                     self.routers[1].addresses[0],
+                                     "addr_02")
+        test.run()
+        self.assertEqual(None, test.error)
+
+
+class Entity(object):
+    def __init__(self, status_code, status_description, attrs):
+        self.status_code        = status_code
+        self.status_description = status_description
+        self.attrs              = attrs
+
+    def __getattr__(self, key):
+        return self.attrs[key]
+
+
+class RouterProxy(object):
+    def __init__(self, reply_addr):
+        self.reply_addr = reply_addr
+
+    def response(self, msg):
+        ap = msg.properties
+        return Entity(ap['statusCode'], ap['statusDescription'], msg.body)
+
+    def read_address(self, name):
+        ap = {'operation': 'READ', 'type': 'org.apache.qpid.dispatch.router.address', 'name': name}
+        return Message(properties=ap, reply_to=self.reply_addr)
+
+    def query_addresses(self):
+        ap = {'operation': 'QUERY', 'type': 'org.apache.qpid.dispatch.router.address'}
+        return Message(properties=ap, reply_to=self.reply_addr)
+
+
+class Timeout(object):
+    def __init__(self, parent):
+        self.parent = parent
+
+    def on_timer_task(self, event):
+        self.parent.timeout()
+
+
+class PollTimeout(object):
+    def __init__(self, parent):
+        self.parent = parent
+
+    def on_timer_task(self, event):
+        self.parent.poll_timeout()
+
+
+class MessageRouteAbortTest(MessagingHandler):
+    def __init__(self, sender_host, receiver_host, address):
+        super(MessageRouteAbortTest, self).__init__()
+        self.sender_host      = sender_host
+        self.receiver_host    = receiver_host
+        self.address          = address
+
+        self.sender_conn   = None
+        self.receiver_conn = None
+        self.error         = None
+        self.sender1       = None
+        self.sender2       = None
+        self.receiver      = None
+        self.streaming     = False
+        self.delivery      = None
+        self.data          = "0123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789"
+        self.long_data     = ""
+
+        self.sent_stream   = 0
+        self.program       = ['Send_Short_1', 'Send_Long_Truncated', 'Send_Short_2', 'Send_Short_3']
+        self.result        = []
+        self.expected_result = ['Send_Short_1', 'Aborted_Delivery', '2', '2', '2', '2', '2',
+                                '2', '2', '2', '2', '2', 'Send_Short_2', 'Send_Short_3']
+
+    def timeout(self):
+        self.error = "Timeout Expired - Unprocessed Ops: %r, Result: %r" % (self.program, self.result)
+        self.sender_conn.close()
+        self.receiver_conn.close()
+
+    def on_start(self, event):
+        self.timer         = event.reactor.schedule(10.0, Timeout(self))
+        self.sender_conn   = event.container.connect(self.sender_host)
+        self.receiver_conn = event.container.connect(self.receiver_host)
+        self.sender1       = event.container.create_sender(self.sender_conn, self.address, name="S1")
+        self.sender2       = event.container.create_sender(self.sender_conn, self.address, name="S2")
+        self.sender3       = event.container.create_sender(self.sender_conn, self.address, name="S3")
+        self.receiver      = event.container.create_receiver(self.receiver_conn, self.address)
+
+    def stream(self):
+        self.sender1.stream(self.long_data)
+        self.sent_stream += len(self.long_data)
+        if self.sent_stream >= 100000:
+            self.streaming = False
+            self.sender1.close()
+            self.send()
+
+    def send(self):
+        next_op = self.program.pop(0) if len(self.program) > 0 else None
+        if next_op == 'Send_Short_1':
+            m = Message(body="%s" % next_op)
+            self.sender1.send(m)
+        elif next_op == 'Send_Long_Truncated':
+            for i in range(100):
+                self.long_data += self.data
+            self.delivery  = self.sender1.delivery(self.sender1.delivery_tag())
+            self.streaming = True
+            self.stream()
+        elif next_op == 'Send_Short_2':
+            m = Message(body="2")
+            for i in range(10):
+                self.sender2.send(m)
+            m = Message(body="Send_Short_2")
+            self.sender2.send(m)
+            self.sender2.close()
+        elif next_op == 'Send_Short_3':
+            m = Message(body="%s" % next_op)
+            self.sender3.send(m)
+
+    def on_sendable(self, event):
+        if event.sender == self.sender1 and self.program[0] == 'Send_Short_1':
+            self.send()
+        if self.streaming:
+            self.stream()
+
+    def on_message(self, event):
+        m = event.message
+        self.result.append(m.body)
+        if m.body == 'Send_Short_1':
+            self.send()
+        elif m.body == 'Send_Short_2':
+            self.send()
+        elif m.body == 'Send_Short_3':
+            if self.result != self.expected_result:
+                self.error = "Expected: %r, Actual: %r" % (self.expected_result, self.result)
+            self.sender_conn.close()
+            self.receiver_conn.close()
+            self.timer.cancel()
+
+    def on_aborted(self, event):
+        self.result.append('Aborted_Delivery')
+        self.send()
+
+    def run(self):
+        Container(self).run()
+
+
+# Run the tests through unittest when this file is executed as a script.
+if __name__ == '__main__':
+    unittest.main(main_module())


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org