You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2018/07/06 16:59:21 UTC
[09/11] qpid-proton git commit: PROTON-1885: [python] move
tests/python to python/tests
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/9778eda8/python/tests/proton_tests/interop.py
----------------------------------------------------------------------
diff --git a/python/tests/proton_tests/interop.py b/python/tests/proton_tests/interop.py
new file mode 100644
index 0000000..fe62c02
--- /dev/null
+++ b/python/tests/proton_tests/interop.py
@@ -0,0 +1,147 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+from proton import *
+from . import common
+import os
+
+
+def find_test_interop_dir():
+  """Locate the shared ``tests/interop`` data directory.
+
+  The starting point is the result of applying ``dirname`` four times to the
+  absolute path of this file; ``tests/interop`` is appended to it.  Raises
+  ``Exception`` when the resulting path is not a directory.
+  """
+  import os.path
+  root = os.path.abspath(__file__)
+  for _ in range(4):
+    root = os.path.dirname(root)
+  interop_dir = os.path.join(root, "tests", "interop")
+  if os.path.isdir(interop_dir):
+    return interop_dir
+  raise Exception("Cannot find tests/interop directory from "+__file__)
+
+test_interop_dir=find_test_interop_dir()
+
+class InteropTest(common.Test):
+  """Decode the pre-encoded ``*.amqp`` files found in the interop directory
+  and check that they contain the expected values."""
+
+  def setUp(self):
+    self.data = Data()
+    self.message = Message()
+
+  def tearDown(self):
+    # Drop both fixtures created in setUp.
+    self.data = None
+    self.message = None
+
+  def get_data(self, name):
+    """Return the raw bytes of ``<name>.amqp`` in the interop directory."""
+    filename = os.path.join(test_interop_dir, name+".amqp")
+    # The context manager closes the file even if read() raises.
+    with open(filename, "rb") as f:
+      return f.read()
+
+  def decode_data(self, encoded):
+    """Decode all of ``encoded`` into self.data, then rewind self.data."""
+    buffer = encoded
+    while buffer:
+      n = self.data.decode(buffer)
+      # Fail instead of spinning forever if decode() consumes nothing.
+      assert n > 0, "decode() made no progress: %r" % (n,)
+      buffer = buffer[n:]
+    self.data.rewind()
+
+  def decode_data_file(self, name):
+    """Decode ``<name>.amqp`` into self.data and verify it re-encodes to the
+    same bytes."""
+    encoded = self.get_data(name)
+    self.decode_data(encoded)
+    encoded_size = self.data.encoded_size()
+    # Re-encode and verify pre-computed and actual encoded size match.
+    reencoded = self.data.encode()
+    assert encoded_size == len(reencoded), "%d != %d" % (encoded_size, len(reencoded))
+    # verify round trip bytes
+    assert reencoded == encoded, "Value mismatch: %s != %s" % (reencoded, encoded)
+
+  def decode_message_file(self, name):
+    """Decode ``<name>.amqp`` as a message, then decode its body into
+    self.data."""
+    self.message.decode(self.get_data(name))
+    body = self.message.body
+    # NOTE(review): the type name is a Java class, so this branch presumably
+    # only applies when running against the Java binding - confirm.
+    if str(type(body)) == "<type 'org.apache.qpid.proton.amqp.Binary'>":
+      body = body.array.tostring()
+    self.decode_data(body)
+
+  def assert_next(self, type, value):
+    """Advance self.data and assert the next node has this type and value."""
+    next_type = self.data.next()
+    assert next_type == type, "Type mismatch: %s != %s"%(
+      Data.type_names[next_type], Data.type_names[type])
+    next_value = self.data.get_object()
+    assert next_value == value, "Value mismatch: %s != %s"%(next_value, value)
+
+  def test_message(self):
+    self.decode_message_file("message")
+    self.assert_next(Data.STRING, "hello")
+    assert self.data.next() is None
+
+  def test_primitives(self):
+    self.decode_data_file("primitives")
+    self.assert_next(Data.BOOL, True)
+    self.assert_next(Data.BOOL, False)
+    self.assert_next(Data.UBYTE, 42)
+    self.assert_next(Data.USHORT, 42)
+    self.assert_next(Data.SHORT, -42)
+    self.assert_next(Data.UINT, 12345)
+    self.assert_next(Data.INT, -12345)
+    self.assert_next(Data.ULONG, 12345)
+    self.assert_next(Data.LONG, -12345)
+    self.assert_next(Data.FLOAT, 0.125)
+    self.assert_next(Data.DOUBLE, 0.125)
+    assert self.data.next() is None
+
+  def test_strings(self):
+    self.decode_data_file("strings")
+    self.assert_next(Data.BINARY, b"abc\0defg")
+    self.assert_next(Data.STRING, "abcdefg")
+    self.assert_next(Data.SYMBOL, "abcdefg")
+    self.assert_next(Data.BINARY, b"")
+    self.assert_next(Data.STRING, "")
+    self.assert_next(Data.SYMBOL, "")
+    assert self.data.next() is None
+
+  def test_described(self):
+    self.decode_data_file("described")
+    self.assert_next(Data.DESCRIBED, Described("foo-descriptor", "foo-value"))
+    self.data.exit()
+
+    # The second described value is walked by hand: enter it, read its two
+    # INT children, then leave it again.
+    assert self.data.next() == Data.DESCRIBED
+    self.data.enter()
+    self.assert_next(Data.INT, 12)
+    self.assert_next(Data.INT, 13)
+    self.data.exit()
+
+    assert self.data.next() is None
+
+  def test_described_array(self):
+    self.decode_data_file("described_array")
+    self.assert_next(Data.ARRAY, Array("int-array", Data.INT, *range(0,10)))
+
+  def test_arrays(self):
+    self.decode_data_file("arrays")
+    self.assert_next(Data.ARRAY, Array(UNDESCRIBED, Data.INT, *range(0,100)))
+    self.assert_next(Data.ARRAY, Array(UNDESCRIBED, Data.STRING, *["a", "b", "c"]))
+    self.assert_next(Data.ARRAY, Array(UNDESCRIBED, Data.INT))
+    assert self.data.next() is None
+
+  def test_lists(self):
+    self.decode_data_file("lists")
+    self.assert_next(Data.LIST, [32, "foo", True])
+    self.assert_next(Data.LIST, [])
+    assert self.data.next() is None
+
+  def test_maps(self):
+    self.decode_data_file("maps")
+    self.assert_next(Data.MAP, {"one":1, "two":2, "three":3 })
+    self.assert_next(Data.MAP, {1:"one", 2:"two", 3:"three"})
+    self.assert_next(Data.MAP, {})
+    assert self.data.next() is None
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/9778eda8/python/tests/proton_tests/message.py
----------------------------------------------------------------------
diff --git a/python/tests/proton_tests/message.py b/python/tests/proton_tests/message.py
new file mode 100644
index 0000000..26a3dd2
--- /dev/null
+++ b/python/tests/proton_tests/message.py
@@ -0,0 +1,275 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+import os
+from . import common
+from proton import *
+from uuid import uuid4
+
+class Test(common.Test):
+  """Base fixture: every test gets a fresh ``Message`` as ``self.msg``."""
+
+  def setUp(self):
+    self.msg = Message()
+
+  def tearDown(self):
+    self.msg = None
+
+
+class AccessorsTest(Test):
+  """Check the default value and set/get round trip of Message attributes."""
+
+  def _test(self, name, default, values):
+    """Assert attribute ``name`` starts as ``default`` and reads back every
+    value in ``values`` after it is assigned."""
+    initial = getattr(self.msg, name)
+    assert initial == default, (initial, default)
+    for value in values:
+      setattr(self.msg, name, value)
+      current = getattr(self.msg, name)
+      assert current == value, current
+
+  def _test_symbol(self, name):
+    """Run _test with symbol values for a symbol-typed attribute."""
+    samples = (symbol(u"abc.123.#$%"), symbol(u"hello.world"))
+    self._test(name, symbol(None), samples)
+
+  def _test_str(self, name):
+    """Run _test with unicode string values, including the empty string."""
+    samples = (u"asdf", u"fdsa", u"")
+    self._test(name, None, samples)
+
+  def _test_time(self, name):
+    """Run _test with integer values for a time attribute."""
+    samples = (0, 123456789, 987654321)
+    self._test(name, 0, samples)
+
+  def testId(self):
+    self._test("id", None, ("bytes", None, 123, u"string", uuid4()))
+
+  def testCorrelationId(self):
+    self._test("correlation_id", None, ("bytes", None, 123, u"string", uuid4()))
+
+  def testDurable(self):
+    self._test("durable", False, (True, False))
+
+  def testPriority(self):
+    self._test("priority", Message.DEFAULT_PRIORITY, range(0, 255))
+
+  def testTtl(self):
+    self._test("ttl", 0, range(12345, 54321))
+
+  def testFirstAcquirer(self):
+    self._test("first_acquirer", False, (True, False))
+
+  def testDeliveryCount(self):
+    self._test("delivery_count", 0, range(0, 1024))
+
+  def testUserId(self):
+    self._test("user_id", b"", (b"asdf", b"fdsa", b"asd\x00fdsa", b""))
+
+  def testAddress(self):
+    self._test_str("address")
+
+  def testSubject(self):
+    self._test_str("subject")
+
+  def testReplyTo(self):
+    self._test_str("reply_to")
+
+  def testContentType(self):
+    self._test_symbol("content_type")
+
+  def testContentEncoding(self):
+    self._test_symbol("content_encoding")
+
+  def testExpiryTime(self):
+    self._test_time("expiry_time")
+
+  def testCreationTime(self):
+    self._test_time("creation_time")
+
+  def testGroupId(self):
+    self._test_str("group_id")
+
+  def testGroupSequence(self):
+    self._test("group_sequence", 0, (0, -10, 10, 20, -20))
+
+  def testReplyToGroupId(self):
+    self._test_str("reply_to_group_id")
+
+class CodecTest(Test):
+  """Encode/decode tests for Message, including how default field values
+  are represented on the wire."""
+
+  def _encoded_properties(self):
+    """Encode self.msg and return the field list of its properties section.
+
+    The first decode() consumes the section that precedes the properties
+    (the header); the decoder is then cleared and run on the remaining
+    bytes.  Asserts that the described value found there has descriptor
+    0x73.
+    """
+    data = self.msg.encode()
+
+    decoder = Data()
+
+    # Skip past the headers
+    consumed = decoder.decode(data)
+    decoder.clear()
+    data = data[consumed:]
+
+    decoder.decode(data)
+    dproperties = decoder.get_py_described()
+    # Check we've got the correct described list
+    assert dproperties.descriptor == 0x73, (dproperties.descriptor)
+    return dproperties.value
+
+  def testProperties(self):
+    self.msg.properties = {}
+    self.msg.properties['key'] = 'value'
+    data = self.msg.encode()
+
+    msg2 = Message()
+    msg2.decode(data)
+
+    assert msg2.properties['key'] == 'value', msg2.properties['key']
+
+  def testRoundTrip(self):
+    self.msg.id = "asdf"
+    self.msg.correlation_id = uuid4()
+    self.msg.ttl = 3
+    self.msg.priority = 100
+    self.msg.address = "address"
+    self.msg.subject = "subject"
+    self.msg.body = 'Hello World!'
+
+    data = self.msg.encode()
+
+    msg2 = Message()
+    msg2.decode(data)
+
+    assert self.msg.id == msg2.id, (self.msg.id, msg2.id)
+    assert self.msg.correlation_id == msg2.correlation_id, (self.msg.correlation_id, msg2.correlation_id)
+    assert self.msg.ttl == msg2.ttl, (self.msg.ttl, msg2.ttl)
+    assert self.msg.priority == msg2.priority, (self.msg.priority, msg2.priority)
+    assert self.msg.address == msg2.address, (self.msg.address, msg2.address)
+    assert self.msg.subject == msg2.subject, (self.msg.subject, msg2.subject)
+    assert self.msg.body == msg2.body, (self.msg.body, msg2.body)
+
+  def testExpiryEncodeAsNull(self):
+    self.msg.group_id = "A" # Force creation and expiry fields to be present
+    properties = self._encoded_properties()
+    assert properties[8] is None, properties[8]
+
+  def testCreationEncodeAsNull(self):
+    self.msg.group_id = "A" # Force creation and expiry fields to be present
+    properties = self._encoded_properties()
+    assert properties[9] is None, properties[9]
+
+  def testGroupSequenceEncodeAsNull(self):
+    self.msg.reply_to_group_id = "R" # Force group_id and group_sequence fields to be present
+    properties = self._encoded_properties()
+    assert properties[10] is None, properties[10]
+    assert properties[11] is None, properties[11]
+
+  def testGroupSequenceEncodeAsNonNull(self):
+    self.msg.group_id = "G"
+    self.msg.reply_to_group_id = "R" # Force group_id and group_sequence fields to be present
+    properties = self._encoded_properties()
+    assert properties[10] == 'G', properties[10]
+    assert properties[11] == 0, properties[11]
+
+  def testDefaultCreationExpiryDecode(self):
+    # This is a message with everything filled explicitly as null or zero in LIST32 HEADER and PROPERTIES lists
+    data = b'\x00\x53\x70\xd0\x00\x00\x00\x0a\x00\x00\x00\x05\x42\x40\x40\x42\x52\x00\x00\x53\x73\xd0\x00\x00\x00\x12\x00\x00\x00\x0d\x40\x40\x40\x40\x40\x40\x40\x40\x40\x40\x40\x52\x00\x40'
+    msg2 = Message()
+    msg2.decode(data)
+    assert msg2.expiry_time == 0, (msg2.expiry_time)
+    assert msg2.creation_time == 0, (msg2.creation_time)
+
+    # The same message with LIST8s instead
+    data = b'\x00\x53\x70\xc0\x07\x05\x42\x40\x40\x42\x52\x00\x00\x53\x73\xc0\x0f\x0d\x40\x40\x40\x40\x40\x40\x40\x40\x40\x40\x40\x52\x00\x40'
+    msg3 = Message()
+    msg3.decode(data)
+    # Assert on the message that was just decoded (msg3), not on msg2.
+    assert msg3.expiry_time == 0, (msg3.expiry_time)
+    assert msg3.creation_time == 0, (msg3.creation_time)
+
+    # Minified message with zero length HEADER and PROPERTIES lists
+    data = b'\x00\x53\x70\x45' b'\x00\x53\x73\x45'
+    msg4 = Message()
+    msg4.decode(data)
+    # Assert on the message that was just decoded (msg4), not on msg2.
+    assert msg4.expiry_time == 0, (msg4.expiry_time)
+    assert msg4.creation_time == 0, (msg4.creation_time)
+
+  def testDefaultPriorityEncode(self):
+    assert self.msg.priority == 4, (self.msg.priority)
+    self.msg.ttl = 0.003 # field after priority, so forces priority to be present
+    data = self.msg.encode()
+
+    decoder = Data()
+    decoder.decode(data)
+
+    dheaders = decoder.get_py_described()
+    # Check we've got the correct described list
+    assert dheaders.descriptor == 0x70, (dheaders.descriptor)
+
+    # Check that the priority field (second field) is encoded as null
+    headers = dheaders.value
+    assert headers[1] is None, (headers[1])
+
+  def testDefaultPriorityDecode(self):
+    # This is a message with everything filled explicitly as null or zero in LIST32 HEADER and PROPERTIES lists
+    data = b'\x00\x53\x70\xd0\x00\x00\x00\x0a\x00\x00\x00\x05\x42\x40\x40\x42\x52\x00\x00\x53\x73\xd0\x00\x00\x00\x22\x00\x00\x00\x0d\x40\x40\x40\x40\x40\x40\x40\x40\x83\x00\x00\x00\x00\x00\x00\x00\x00\x83\x00\x00\x00\x00\x00\x00\x00\x00\x40\x52\x00\x40'
+    msg2 = Message()
+    msg2.decode(data)
+    assert msg2.priority == 4, (msg2.priority)
+
+    # The same message with LIST8s instead
+    data = b'\x00\x53\x70\xc0\x07\x05\x42\x40\x40\x42\x52\x00\x00\x53\x73\xc0\x1f\x0d\x40\x40\x40\x40\x40\x40\x40\x40\x83\x00\x00\x00\x00\x00\x00\x00\x00\x83\x00\x00\x00\x00\x00\x00\x00\x00\x40\x52\x00\x40'
+    msg3 = Message()
+    msg3.decode(data)
+    assert msg3.priority == 4, (msg3.priority)
+
+    # Minified message with zero length HEADER and PROPERTIES lists
+    data = b'\x00\x53\x70\x45' b'\x00\x53\x73\x45'
+    msg4 = Message()
+    msg4.decode(data)
+    assert msg4.priority == 4, (msg4.priority)
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/9778eda8/python/tests/proton_tests/reactor.py
----------------------------------------------------------------------
diff --git a/python/tests/proton_tests/reactor.py b/python/tests/proton_tests/reactor.py
new file mode 100644
index 0000000..208baa2
--- /dev/null
+++ b/python/tests/proton_tests/reactor.py
@@ -0,0 +1,485 @@
+from __future__ import absolute_import
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+import time
+import sys
+from .common import Test, SkipTest, TestServer, free_tcp_port, ensureCanTestExtendedSASL
+from proton.reactor import Container, Reactor, ApplicationEvent, EventInjector
+from proton.handlers import Handshaker, MessagingHandler
+from proton import Handler, Url
+
+class Barf(Exception):
+  """Exception raised on purpose by the Barf* handlers in this module."""
+
+class BarfOnInit:
+  """Handler that raises Barf from each of its *_init callbacks."""
+
+  def on_reactor_init(self, event):
+    raise Barf()
+
+  def on_connection_init(self, event):
+    raise Barf()
+
+  def on_session_init(self, event):
+    raise Barf()
+
+  def on_link_init(self, event):
+    raise Barf()
+
+class BarfOnTask:
+  """Handler that raises Barf from on_timer_task."""
+
+  def on_timer_task(self, event):
+    raise Barf()
+
+class BarfOnFinal:
+  """Handler that records on_reactor_init and raises Barf from
+  on_reactor_final."""
+
+  # Set to True on the instance once on_reactor_init has been called.
+  init = False
+
+  def on_reactor_init(self, event):
+    self.init = True
+
+  def on_reactor_final(self, event):
+    raise Barf()
+
+class BarfOnFinalDerived(Handshaker):
+  """Handshaker subclass that records on_reactor_init and raises Barf from
+  on_reactor_final."""
+
+  # Set to True on the instance once on_reactor_init has been called.
+  init = False
+
+  def on_reactor_init(self, event):
+    self.init = True
+
+  def on_reactor_final(self, event):
+    raise Barf()
+
+class ExceptionTest(Test):
+  """Check that exceptions raised inside handlers propagate out of
+  Reactor.run(), and that cancelled timer tasks never fire."""
+
+  def setUp(self):
+    self.reactor = Reactor()
+
+  def _run_expecting_barf(self):
+    """Run the reactor and fail unless run() raises Barf."""
+    try:
+      self.reactor.run()
+    except Barf:
+      return
+    assert False, "expected to barf"
+
+  def test_reactor_final(self):
+    self.reactor.global_handler = BarfOnFinal()
+    self._run_expecting_barf()
+
+  def test_global_set(self):
+    self.reactor.global_handler = BarfOnInit()
+    self._run_expecting_barf()
+
+  def test_global_add(self):
+    self.reactor.global_handler.add(BarfOnInit())
+    self._run_expecting_barf()
+
+  def test_reactor_set(self):
+    self.reactor.handler = BarfOnInit()
+    self._run_expecting_barf()
+
+  def test_reactor_add(self):
+    self.reactor.handler.add(BarfOnInit())
+    self._run_expecting_barf()
+
+  def test_connection(self):
+    self.reactor.connection(BarfOnInit())
+    self._run_expecting_barf()
+
+  def test_connection_set(self):
+    c = self.reactor.connection()
+    c.handler = BarfOnInit()
+    self._run_expecting_barf()
+
+  def test_connection_add(self):
+    c = self.reactor.connection()
+    # NOTE(review): relies on the handler attribute giving back something
+    # with add() after a plain object is assigned - confirm against proton.
+    c.handler = object()
+    c.handler.add(BarfOnInit())
+    self._run_expecting_barf()
+
+  def test_session_set(self):
+    c = self.reactor.connection()
+    s = c.session()
+    s.handler = BarfOnInit()
+    self._run_expecting_barf()
+
+  def test_session_add(self):
+    c = self.reactor.connection()
+    s = c.session()
+    s.handler = object()
+    s.handler.add(BarfOnInit())
+    self._run_expecting_barf()
+
+  def test_link_set(self):
+    c = self.reactor.connection()
+    s = c.session()
+    l = s.sender("xxx")
+    l.handler = BarfOnInit()
+    self._run_expecting_barf()
+
+  def test_link_add(self):
+    c = self.reactor.connection()
+    s = c.session()
+    l = s.sender("xxx")
+    l.handler = object()
+    l.handler.add(BarfOnInit())
+    self._run_expecting_barf()
+
+  def test_schedule(self):
+    self.reactor.schedule(0, BarfOnTask())
+    self._run_expecting_barf()
+
+  def test_schedule_many_nothings(self):
+    class Nothing:
+      # Class-level list shared by every instance, so it counts all tasks.
+      results = []
+      def on_timer_task(self, event):
+        self.results.append(None)
+    num = 12345
+    for a in range(num):
+      self.reactor.schedule(0, Nothing())
+    self.reactor.run()
+    assert len(Nothing.results) == num
+
+  def test_schedule_many_nothing_refs(self):
+    class Nothing:
+      # Class-level list shared by every instance, so it counts all tasks.
+      results = []
+      def on_timer_task(self, event):
+        self.results.append(None)
+    num = 12345
+    # Same as above, but a reference to every scheduled task is kept.
+    tasks = []
+    for a in range(num):
+      tasks.append(self.reactor.schedule(0, Nothing()))
+    self.reactor.run()
+    assert len(Nothing.results) == num
+
+  def test_schedule_many_nothing_refs_cancel_before_run(self):
+    class Nothing:
+      # Class-level list shared by every instance, so it counts all tasks.
+      results = []
+      def on_timer_task(self, event):
+        self.results.append(None)
+    num = 12345
+    tasks = []
+    for a in range(num):
+      tasks.append(self.reactor.schedule(0, Nothing()))
+    for task in tasks:
+      task.cancel()
+    self.reactor.run()
+    assert len(Nothing.results) == 0
+
+  def test_schedule_cancel(self):
+    barf = self.reactor.schedule(10, BarfOnTask())
+    class CancelBarf:
+      def __init__(self, barf):
+        self.barf = barf
+      def on_timer_task(self, event):
+        self.barf.cancel()
+    # The immediate task cancels the delayed BarfOnTask before it can fire.
+    self.reactor.schedule(0, CancelBarf(barf))
+    now = self.reactor.mark()
+    try:
+      self.reactor.run()
+      elapsed = self.reactor.mark() - now
+      assert elapsed < 10, "expected cancelled task to not delay the reactor by %s" % elapsed
+    except Barf:
+      assert False, "expected barf to be cancelled"
+
+  def test_schedule_cancel_many(self):
+    num = 12345
+    barfs = set()
+    # Defined once outside the loop; it only closes over ``barfs``.
+    class CancelBarf:
+      def __init__(self, barf):
+        self.barf = barf
+      def on_timer_task(self, event):
+        self.barf.cancel()
+        barfs.discard(self.barf)
+    for a in range(num):
+      barf = self.reactor.schedule(10*(a+1), BarfOnTask())
+      self.reactor.schedule(0, CancelBarf(barf))
+      barfs.add(barf)
+    now = self.reactor.mark()
+    try:
+      self.reactor.run()
+      elapsed = self.reactor.mark() - now
+      assert elapsed < num, "expected cancelled task to not delay the reactor by %s" % elapsed
+      assert not barfs, "expected all barfs to be discarded"
+    except Barf:
+      assert False, "expected barf to be cancelled"
+
+
+class ApplicationEventTest(Test):
+  """Test application defined events and handlers."""
+
+  class MyTestServer(TestServer):
+    """TestServer subclass used as the event target for this test."""
+    def __init__(self):
+      super(ApplicationEventTest.MyTestServer, self).__init__()
+
+  class MyHandler(Handler):
+    """Stores str(event) on the owning test for each received event."""
+    def __init__(self, test):
+      super(ApplicationEventTest.MyHandler, self).__init__()
+      self._test = test
+
+    def on_hello(self, event):
+      # verify PROTON-1056
+      self._test.hello_rcvd = str(event)
+
+    def on_goodbye(self, event):
+      self._test.goodbye_rcvd = str(event)
+
+  def setUp(self):
+    import os
+    if not hasattr(os, 'pipe'):
+      # KAG: seems like Jython doesn't have an os.pipe() method
+      raise SkipTest()
+    if os.name=="nt":
+      # Correct implementation on Windows is complicated
+      raise SkipTest("PROTON-1071")
+    server = ApplicationEventTest.MyTestServer()
+    self.server = server
+    server.reactor.handler.add(ApplicationEventTest.MyHandler(self))
+    self.event_injector = EventInjector()
+    self.hello_event = ApplicationEvent("hello")
+    self.goodbye_event = ApplicationEvent("goodbye")
+    server.reactor.selectable(self.event_injector)
+    self.hello_rcvd = None
+    self.goodbye_rcvd = None
+    server.start()
+
+  def tearDown(self):
+    self.server.stop()
+
+  def _wait_for(self, predicate, timeout=10.0):
+    """Poll ``predicate`` every 0.1s until it is true or ``timeout`` seconds
+    have passed, then assert that it holds."""
+    deadline = time.time() + timeout
+    while time.time() < deadline and not predicate():
+      time.sleep(0.1)
+    assert predicate()
+
+  def test_application_events(self):
+    injector = self.event_injector
+    injector.trigger(self.hello_event)
+    self._wait_for(lambda: self.hello_rcvd is not None)
+    injector.trigger(self.goodbye_event)
+    self._wait_for(lambda: self.goodbye_rcvd is not None)
+
+
+class AuthenticationTestHandler(MessagingHandler):
+  """Listener-side handler that sets ``verified`` once an opening connection
+  reports the transport user "user@proton"."""
+
+  def __init__(self):
+    super(AuthenticationTestHandler, self).__init__()
+    self.url = "localhost:%i" % free_tcp_port()
+    self.verified = False
+
+  def on_start(self, event):
+    self.listener = event.container.listen(self.url)
+
+  def on_connection_opened(self, event):
+    event.connection.close()
+
+  def on_connection_opening(self, event):
+    user = event.connection.transport.user
+    assert user == "user@proton"
+    self.verified = True
+
+  def on_connection_closed(self, event):
+    # Close our end and stop listening.
+    event.connection.close()
+    self.listener.close()
+
+  def on_connection_error(self, event):
+    # Same teardown as on_connection_closed.
+    event.connection.close()
+    self.listener.close()
+
+class ContainerTest(Test):
+  """Test container subclass of reactor."""
+
+  def test_event_has_container_attribute(self):
+    ensureCanTestExtendedSASL()
+    class TestHandler(MessagingHandler):
+      def __init__(self):
+        super(TestHandler, self).__init__()
+        port = free_tcp_port()
+        self.url = "localhost:%i" % port
+
+      def on_start(self, event):
+        self.listener = event.container.listen(self.url)
+
+      def on_connection_closing(self, event):
+        event.connection.close()
+        self.listener.close()
+    test_handler = TestHandler()
+    container = Container(test_handler)
+    class ConnectionHandler(MessagingHandler):
+      def __init__(self):
+        super(ConnectionHandler, self).__init__()
+
+      def on_connection_opened(self, event):
+        event.connection.close()
+        assert event.container == event.reactor
+        assert event.container == container
+    container.connect(test_handler.url, handler=ConnectionHandler())
+    container.run()
+
+  def test_authentication_via_url(self):
+    ensureCanTestExtendedSASL()
+    test_handler = AuthenticationTestHandler()
+    container = Container(test_handler)
+    # "%40" is the percent-encoded "@" of the user name "user@proton".
+    container.connect("%s:password@%s" % ("user%40proton", test_handler.url), reconnect=False)
+    container.run()
+    assert test_handler.verified
+
+  def test_authentication_via_container_attributes(self):
+    ensureCanTestExtendedSASL()
+    test_handler = AuthenticationTestHandler()
+    container = Container(test_handler)
+    container.user = "user@proton"
+    container.password = "password"
+    container.connect(test_handler.url, reconnect=False)
+    container.run()
+    assert test_handler.verified
+
+  def test_authentication_via_kwargs(self):
+    ensureCanTestExtendedSASL()
+    test_handler = AuthenticationTestHandler()
+    container = Container(test_handler)
+    container.connect(test_handler.url, user="user@proton", password="password", reconnect=False)
+    container.run()
+    assert test_handler.verified
+
+  class _ServerHandler(MessagingHandler):
+    """Listens on ``host`` and a free port; records the client's address
+    and the hostname the peer sent when a connection is opened."""
+    def __init__(self, host):
+      super(ContainerTest._ServerHandler, self).__init__()
+      self.host = host
+      # A single free port is enough; on_start listens on it.
+      self.port = free_tcp_port()
+      self.client_addr = None
+      self.peer_hostname = None
+
+    def on_start(self, event):
+      self.listener = event.container.listen("%s:%s" % (self.host, self.port))
+
+    def on_connection_opened(self, event):
+      self.client_addr = event.reactor.get_connection_address(event.connection)
+      self.peer_hostname = event.connection.remote_hostname
+
+    def on_connection_closing(self, event):
+      event.connection.close()
+      self.listener.close()
+
+  class _ClientHandler(MessagingHandler):
+    """Records the server's address on open, then closes the connection."""
+    def __init__(self):
+      super(ContainerTest._ClientHandler, self).__init__()
+      self.server_addr = None
+
+    def on_connection_opened(self, event):
+      self.server_addr = event.reactor.get_connection_address(event.connection)
+      event.connection.close()
+
+  def test_numeric_hostname(self):
+    ensureCanTestExtendedSASL()
+    server_handler = ContainerTest._ServerHandler("127.0.0.1")
+    client_handler = ContainerTest._ClientHandler()
+    container = Container(server_handler)
+    container.connect(url=Url(host="127.0.0.1",
+                              port=server_handler.port),
+                      handler=client_handler)
+    container.run()
+    assert server_handler.client_addr
+    assert client_handler.server_addr
+    assert server_handler.peer_hostname == "127.0.0.1", server_handler.peer_hostname
+    assert client_handler.server_addr.rsplit(':', 1)[1] == str(server_handler.port)
+
+  def test_non_numeric_hostname(self):
+    ensureCanTestExtendedSASL()
+    server_handler = ContainerTest._ServerHandler("localhost")
+    client_handler = ContainerTest._ClientHandler()
+    container = Container(server_handler)
+    container.connect(url=Url(host="localhost",
+                              port=server_handler.port),
+                      handler=client_handler)
+    container.run()
+    assert server_handler.client_addr
+    assert client_handler.server_addr
+    assert server_handler.peer_hostname == "localhost", server_handler.peer_hostname
+    assert client_handler.server_addr.rsplit(':', 1)[1] == str(server_handler.port)
+
+  def test_virtual_host(self):
+    ensureCanTestExtendedSASL()
+    server_handler = ContainerTest._ServerHandler("localhost")
+    container = Container(server_handler)
+    container.connect(url=Url(host="localhost",
+                              port=server_handler.port),
+                      handler=ContainerTest._ClientHandler(),
+                      virtual_host="a.b.c.org")
+    container.run()
+    assert server_handler.peer_hostname == "a.b.c.org", server_handler.peer_hostname
+
+  def test_no_virtual_host(self):
+    # explicitly setting an empty virtual host should prevent the hostname
+    # field from being sent in the Open performative when using the
+    # Python Container.
+    server_handler = ContainerTest._ServerHandler("localhost")
+    container = Container(server_handler)
+    container.connect(url=Url(host="localhost",
+                              port=server_handler.port),
+                      handler=ContainerTest._ClientHandler(),
+                      virtual_host="")
+    container.run()
+    assert server_handler.peer_hostname is None, server_handler.peer_hostname
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/9778eda8/python/tests/proton_tests/sasl.py
----------------------------------------------------------------------
diff --git a/python/tests/proton_tests/sasl.py b/python/tests/proton_tests/sasl.py
new file mode 100644
index 0000000..09b5f81
--- /dev/null
+++ b/python/tests/proton_tests/sasl.py
@@ -0,0 +1,587 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+from __future__ import absolute_import
+
+import sys, os
+from . import common
+from . import engine
+
+from proton import *
+from .common import pump, Skipped
+
+def _sslCertpath(file):  # Map a certificate/key file name to a path under this module's ssl_db directory.
+ """ Return the full path to the certificate, keyfile, etc.
+ """
+ if os.name=="nt":  # Windows-specific handling follows
+ if file.find("private-key")!=-1:  # any name containing "private-key"
+ # The private key is not in a separate store
+ return None
+ # Substitute pkcs#12 equivalent for the CA/key store
+ if file.endswith(".pem"):  # NOTE(review): assumed to sit inside the "nt" branch - nesting is lost in this flattened diff
+ file = file[:-4] + ".p12"  # drop ".pem", append ".p12"
+ return os.path.join(os.path.dirname(__file__),
+ "ssl_db/%s" % file)
+
+def _testSaslMech(self, mech, clientUser='user@proton', authUser='user@proton', encrypted=False, authenticated=True):
+ self.s1.allowed_mechs(mech)
+ self.c1.open()
+ self.c2.open()
+
+ pump(self.t1, self.t2, 1024)
+
+ if encrypted is not None:
+ assert self.t2.encrypted == encrypted, encrypted
+ assert self.t1.encrypted == encrypted, encrypted
+
+ assert self.t2.authenticated == authenticated, authenticated
+ assert self.t1.authenticated == authenticated, authenticated
+ if authenticated:
+ # Server
+ assert self.t2.user == authUser
+ assert self.s2.user == authUser
+ assert self.s2.mech == mech.strip()
+ assert self.s2.outcome == SASL.OK, self.s2.outcome
+ assert self.c2.state & Endpoint.LOCAL_ACTIVE and self.c2.state & Endpoint.REMOTE_ACTIVE,\
+ "local_active=%s, remote_active=%s" % (self.c1.state & Endpoint.LOCAL_ACTIVE, self.c1.state & Endpoint.REMOTE_ACTIVE)
+ # Client
+ assert self.t1.user == clientUser
+ assert self.s1.user == clientUser
+ assert self.s1.mech == mech.strip()
+ assert self.s1.outcome == SASL.OK, self.s1.outcome
+ assert self.c1.state & Endpoint.LOCAL_ACTIVE and self.c1.state & Endpoint.REMOTE_ACTIVE,\
+ "local_active=%s, remote_active=%s" % (self.c1.state & Endpoint.LOCAL_ACTIVE, self.c1.state & Endpoint.REMOTE_ACTIVE)
+ else:
+ # Server
+ assert self.t2.user == None
+ assert self.s2.user == None
+ assert self.s2.outcome != SASL.OK, self.s2.outcome
+ # Client
+ assert self.t1.user == clientUser
+ assert self.s1.user == clientUser
+ assert self.s1.outcome != SASL.OK, self.s1.outcome
+
+class Test(common.Test):  # Local base class; SaslTest, SASLMechTest and SSLSASLTest derive from it.
+ pass
+
+def consumeAllOuput(t):  # Peek and pop pending output from transport t in 1024-byte chunks until a peek yields nothing.
+ stops = 0  # number of empty peeks seen so far
+ while stops<1:  # i.e. stop after the first empty peek
+ out = t.peek(1024)
+ l = len(out) if out else 0  # treat a falsy result (None or empty) as zero bytes
+ t.pop(l)  # discard exactly what was peeked; pop(0) on the final pass
+ if l <= 0:
+ stops += 1
+
+class SaslTest(Test):  # Transport-level SASL tests driven by hand-built frames and in-memory pumping.
+
+ def setUp(self):  # t1/s1 are used as the client side, t2/s2 (Transport.SERVER) as the server side.
+ self.t1 = Transport()
+ self.s1 = SASL(self.t1)
+ self.t2 = Transport(Transport.SERVER)
+ self.t2.max_frame_size = 65536
+ self.s2 = SASL(self.t2)
+
+ def pump(self):  # Run the imported pump() helper over the t1/t2 pair.
+ pump(self.t1, self.t2, 1024)
+
+ # We have to generate the client frames manually because proton does not
+ # generate pipelined SASL and AMQP frames together
+ def testIllegalProtocolLayering(self):  # A second SASL header after SASL must fail the server transport.
+ # Server
+ self.s2.allowed_mechs('ANONYMOUS')
+
+ c2 = Connection()
+ self.t2.bind(c2)
+
+ assert self.s2.outcome is None  # nothing negotiated before any input
+
+ # Push client bytes into server
+ self.t2.push(
+ # SASL
+ b'AMQP\x03\x01\x00\x00'
+ # @sasl-init(65) [mechanism=:ANONYMOUS, initial-response=b"anonymous@fuschia"]
+ b'\x00\x00\x002\x02\x01\x00\x00\x00SA\xd0\x00\x00\x00"\x00\x00\x00\x02\xa3\x09ANONYMOUS\xa0\x11anonymous@fuschia'
+ # SASL (again illegally)
+ b'AMQP\x03\x01\x00\x00'
+ # @sasl-init(65) [mechanism=:ANONYMOUS, initial-response=b"anonymous@fuschia"]
+ b'\x00\x00\x002\x02\x01\x00\x00\x00SA\xd0\x00\x00\x00"\x00\x00\x00\x02\xa3\x09ANONYMOUS\xa0\x11anonymous@fuschia'
+ # AMQP
+ b'AMQP\x00\x01\x00\x00'
+ # @open(16) [container-id="", channel-max=1234]
+ b'\x00\x00\x00!\x02\x00\x00\x00\x00S\x10\xd0\x00\x00\x00\x11\x00\x00\x00\x0a\xa1\x00@@`\x04\xd2@@@@@@'
+ )
+
+ consumeAllOuput(self.t2)
+
+ assert self.t2.condition  # an error condition must be set
+ assert self.t2.closed
+ assert not c2.state & Endpoint.REMOTE_ACTIVE  # the pipelined open must not have taken effect
+
+ def testPipelinedClient(self):  # Server accepts SASL init and AMQP open pushed in a single buffer.
+ # Server
+ self.s2.allowed_mechs('ANONYMOUS')
+
+ c2 = Connection()
+ self.t2.bind(c2)
+
+ assert self.s2.outcome is None
+
+ # Push client bytes into server
+ self.t2.push(
+ # SASL
+ b'AMQP\x03\x01\x00\x00'
+ # @sasl-init(65) [mechanism=:ANONYMOUS, initial-response=b"anonymous@fuschia"]
+ b'\x00\x00\x002\x02\x01\x00\x00\x00SA\xd0\x00\x00\x00"\x00\x00\x00\x02\xa3\x09ANONYMOUS\xa0\x11anonymous@fuschia'
+ # AMQP
+ b'AMQP\x00\x01\x00\x00'
+ # @open(16) [container-id="", channel-max=1234]
+ b'\x00\x00\x00!\x02\x00\x00\x00\x00S\x10\xd0\x00\x00\x00\x11\x00\x00\x00\x0a\xa1\x00@@`\x04\xd2@@@@@@'
+ )
+
+ consumeAllOuput(self.t2)
+
+ assert not self.t2.condition  # no transport error
+ assert self.s2.outcome == SASL.OK
+ assert c2.state & Endpoint.REMOTE_ACTIVE  # the pipelined open was processed
+
+ def testPipelinedServer(self):  # Client accepts mechanisms, outcome and AMQP open pushed in a single buffer.
+ # Client
+ self.s1.allowed_mechs('ANONYMOUS')
+
+ c1 = Connection()
+ self.t1.bind(c1)
+
+ assert self.s1.outcome is None
+
+ # Push server bytes into client
+ # Commented out lines in this test are where the client input processing doesn't
+ # run after output processing even though there is input waiting
+ self.t1.push(
+ # SASL
+ b'AMQP\x03\x01\x00\x00'
+ # @sasl-mechanisms(64) [sasl-server-mechanisms=@PN_SYMBOL[:ANONYMOUS]]
+ b'\x00\x00\x00\x1c\x02\x01\x00\x00\x00S@\xc0\x0f\x01\xe0\x0c\x01\xa3\tANONYMOUS'
+ # @sasl-outcome(68) [code=0]
+ b'\x00\x00\x00\x10\x02\x01\x00\x00\x00SD\xc0\x03\x01P\x00'
+ # AMQP
+ b'AMQP\x00\x01\x00\x00'
+ # @open(16) [container-id="", channel-max=1234]
+ b'\x00\x00\x00!\x02\x00\x00\x00\x00S\x10\xd0\x00\x00\x00\x11\x00\x00\x00\x0a\xa1\x00@@`\x04\xd2@@@@@@'
+ )
+
+ consumeAllOuput(self.t1)
+
+ assert self.s1.outcome == SASL.OK
+ assert c1.state & Endpoint.REMOTE_ACTIVE
+
+ def testPipelined2(self):  # One manual round trip t1 -> t2 -> t1; the client must then have more output.
+ out1 = self.t1.peek(1024)
+ self.t1.pop(len(out1))
+ self.t2.push(out1)  # hand the client's first output to the server
+
+ self.s2.allowed_mechs('ANONYMOUS')
+ c2 = Connection()
+ c2.open()
+ self.t2.bind(c2)
+
+ out2 = self.t2.peek(1024)
+ self.t2.pop(len(out2))
+ self.t1.push(out2)  # hand the server's reply back to the client
+
+ out1 = self.t1.peek(1024)
+ assert len(out1) > 0
+
+ def testFracturedSASL(self):  # Feed the client a SASL header and frames split across several pushes.
+ """ PROTON-235
+ """
+ assert self.s1.outcome is None
+
+ # self.t1.trace(Transport.TRACE_FRM)
+
+ out = self.t1.peek(1024)
+ self.t1.pop(len(out))
+ self.t1.push(b"AMQP\x03\x01\x00\x00")
+ out = self.t1.peek(1024)
+ self.t1.pop(len(out))
+ self.t1.push(b"\x00\x00\x00")  # first three bytes of the next frame, pushed on their own
+ out = self.t1.peek(1024)
+ self.t1.pop(len(out))
+
+ self.t1.push(b"6\x02\x01\x00\x00\x00S@\xc0\x29\x01\xe0\x26\x04\xa3\x05PLAIN\x0aDIGEST-MD5\x09ANONYMOUS\x08CRAM-MD5")  # remainder of that frame
+ out = self.t1.peek(1024)
+ self.t1.pop(len(out))
+ self.t1.push(b"\x00\x00\x00\x10\x02\x01\x00\x00\x00SD\xc0\x03\x01P\x00")  # same bytes as the @sasl-outcome(68) [code=0] frame in testPipelinedServer
+ out = self.t1.peek(1024)
+ self.t1.pop(len(out))
+ while out:  # keep draining until a peek comes back empty
+ out = self.t1.peek(1024)
+ self.t1.pop(len(out))
+
+ assert self.s1.outcome == SASL.OK, self.s1.outcome
+
+ def test_singleton(self):  # SASL(transport) and transport.sasl() must yield the same object, in either creation order.
+ """Verify that only a single instance of SASL can exist per Transport"""
+ transport = Transport()
+ attr = object()  # unique marker used to recognise the shared instance
+ sasl1 = SASL(transport)
+ sasl1.my_attribute = attr
+ sasl2 = transport.sasl()
+ sasl3 = SASL(transport)
+ assert sasl1 == sasl2
+ assert sasl1 == sasl3
+ assert sasl1.my_attribute == attr
+ assert sasl2.my_attribute == attr
+ assert sasl3.my_attribute == attr
+ transport = Transport()  # second pass: obtain via transport.sasl() first, then SASL(transport)
+ sasl1 = transport.sasl()
+ sasl1.my_attribute = attr
+ sasl2 = SASL(transport)
+ assert sasl1 == sasl2
+ assert sasl1.my_attribute == attr
+ assert sasl2.my_attribute == attr
+
+ def testSaslSkipped(self):  # With require_auth(False) neither side ends up with an outcome or a condition.
+ """Verify that the server (with SASL) correctly handles a client without SASL"""
+ self.t1 = Transport()  # replaces t1 without creating a SASL layer; self.s1 still belongs to the transport from setUp
+ self.t2.require_auth(False)
+ self.pump()
+ assert self.s2.outcome == None
+ assert self.t2.condition == None
+ assert self.t2.authenticated == False
+ assert self.s1.outcome == None
+ assert self.t1.condition == None
+ assert self.t1.authenticated == False
+
+ def testSaslSkippedFail(self):  # Same setup as testSaslSkipped but with require_auth(True): both transports must carry a condition.
+ """Verify that the server (with SASL) correctly handles a client without SASL"""
+ self.t1 = Transport()  # replaces t1 without creating a SASL layer
+ self.t2.require_auth(True)
+ self.pump()
+ assert self.s2.outcome == None
+ assert self.t2.condition != None
+ assert self.s1.outcome == None
+ assert self.t1.condition != None
+
+ def testMechNotFound(self):  # Client allows only the mechanism name 'IMPOSSIBLE'; nobody may authenticate.
+ self.c1 = Connection()
+ self.c1.open()
+ self.t1.bind(self.c1)
+ self.s1.allowed_mechs('IMPOSSIBLE')
+
+ self.pump()
+
+ assert self.t2.authenticated == False
+ assert self.t1.authenticated == False
+ assert self.s1.outcome != SASL.OK
+ assert self.s2.outcome != SASL.OK
+
+class SASLMechTest(Test):  # One test per SASL mechanism, each delegating the checks to _testSaslMech.
+ def setUp(self):  # Create both transports with SASL layers and a client connection carrying credentials.
+ self.t1 = Transport()
+ self.s1 = SASL(self.t1)
+ self.t2 = Transport(Transport.SERVER)
+ self.s2 = SASL(self.t2)
+
+ self.c1 = Connection()
+ self.c1.user = 'user@proton'  # same value as _testSaslMech's default clientUser/authUser
+ self.c1.password = 'password'
+ self.c1.hostname = 'localhost'
+
+ self.c2 = Connection()
+
+ def testANON(self):  # ANONYMOUS: the server side is expected to report the user as 'anonymous'.
+ self.t1.bind(self.c1)
+ self.t2.bind(self.c2)
+ _testSaslMech(self, 'ANONYMOUS', authUser='anonymous')
+
+ def testCRAMMD5(self):
+ common.ensureCanTestExtendedSASL()  # presumably skips when extended SASL is unavailable - confirm in common
+
+ self.t1.bind(self.c1)
+ self.t2.bind(self.c2)
+ _testSaslMech(self, 'CRAM-MD5')
+
+ def testDIGESTMD5(self):
+ common.ensureCanTestExtendedSASL()
+
+ self.t1.bind(self.c1)
+ self.t2.bind(self.c2)
+ _testSaslMech(self, 'DIGEST-MD5')
+
+ # PLAIN shouldn't work without encryption without special setting
+ def testPLAINfail(self):  # allow_insecure_mechs is set on neither side
+ common.ensureCanTestExtendedSASL()
+
+ self.t1.bind(self.c1)
+ self.t2.bind(self.c2)
+ _testSaslMech(self, 'PLAIN', authenticated=False)
+
+ # Client won't accept PLAIN even if offered by server without special setting
+ def testPLAINClientFail(self):
+ common.ensureCanTestExtendedSASL()
+
+ self.s2.allow_insecure_mechs = True  # server side only; s1 is left at its default
+ self.t1.bind(self.c1)
+ self.t2.bind(self.c2)
+ _testSaslMech(self, 'PLAIN', authenticated=False)
+
+ # PLAIN will only work if both ends are specially set up
+ def testPLAIN(self):
+ common.ensureCanTestExtendedSASL()
+
+ self.s1.allow_insecure_mechs = True  # both ends opt in
+ self.s2.allow_insecure_mechs = True
+ self.t1.bind(self.c1)
+ self.t2.bind(self.c2)
+ _testSaslMech(self, 'PLAIN')
+
+# SCRAM is not supported before Cyrus SASL 2.1.26, so it is not
+# universally available; the test below stays disabled until there is
+# a way to check for SCRAM support before running it.
+# def testSCRAMSHA1(self):
+# common.ensureCanTestExtendedSASL()
+#
+# self.t1.bind(self.c1)
+# self.t2.bind(self.c2)
+# _testSaslMech(self, 'SCRAM-SHA-1')
+
+def _sslConnection(domain, transport, connection):  # Bind connection to transport, create an SSL object on it, return the connection.
+ transport.bind(connection)
+ ssl = SSL(transport, domain, None )  # constructed for its effect on the transport; the local name is not used again
+ return connection  # note: the connection is returned, not the SSL object
+
+class SSLSASLTest(Test):  # SASL PLAIN and EXTERNAL tests run over SSL-wrapped transports.
+ def setUp(self):  # Skip without SSL; otherwise build server/client SSL domains, transports, SASL layers and connections.
+ if not common.isSSLPresent():
+ raise Skipped("No SSL libraries found.")
+
+ self.server_domain = SSLDomain(SSLDomain.MODE_SERVER)
+ self.client_domain = SSLDomain(SSLDomain.MODE_CLIENT)
+
+ self.t1 = Transport()
+ self.s1 = SASL(self.t1)
+ self.t2 = Transport(Transport.SERVER)
+ self.s2 = SASL(self.t2)
+
+ self.c1 = Connection()
+ self.c2 = Connection()
+
+ def testSSLPlainSimple(self):  # PLAIN with valid credentials over SSL: expect encrypted and authenticated.
+ if not SASL.extended():
+ raise Skipped("Simple SASL server does not support PLAIN")
+ common.ensureCanTestExtendedSASL()
+
+ clientUser = 'user@proton'
+ mech = 'PLAIN'
+
+ self.c1.user = clientUser
+ self.c1.password = 'password'
+ self.c1.hostname = 'localhost'
+
+ ssl1 = _sslConnection(self.client_domain, self.t1, self.c1)  # return value (the connection) is not used afterwards
+ ssl2 = _sslConnection(self.server_domain, self.t2, self.c2)
+
+ _testSaslMech(self, mech, encrypted=True)
+
+ def testSSLPlainSimpleFail(self):  # PLAIN over SSL with a different user name: encrypted but not authenticated.
+ if not SASL.extended():
+ raise Skipped("Simple SASL server does not support PLAIN")
+ common.ensureCanTestExtendedSASL()
+
+ clientUser = 'usr@proton'  # differs from the 'user@proton' used by the passing test
+ mech = 'PLAIN'
+
+ self.c1.user = clientUser
+ self.c1.password = 'password'
+ self.c1.hostname = 'localhost'
+
+ ssl1 = _sslConnection(self.client_domain, self.t1, self.c1)
+ ssl2 = _sslConnection(self.server_domain, self.t2, self.c2)
+
+ _testSaslMech(self, mech, clientUser='usr@proton', encrypted=True, authenticated=False)
+
+ def testSSLExternalSimple(self):  # EXTERNAL with a client certificate: server user is the certificate subject string.
+ if os.name=="nt":
+ extUser = 'O=Client, CN=127.0.0.1'  # Windows form has a space after the comma
+ else:
+ extUser = 'O=Client,CN=127.0.0.1'
+ mech = 'EXTERNAL'
+
+ self.server_domain.set_credentials(_sslCertpath("server-certificate.pem"),
+ _sslCertpath("server-private-key.pem"),
+ "server-password")
+ self.server_domain.set_trusted_ca_db(_sslCertpath("ca-certificate.pem"))
+ self.server_domain.set_peer_authentication(SSLDomain.VERIFY_PEER,
+ _sslCertpath("ca-certificate.pem") )
+ self.client_domain.set_credentials(_sslCertpath("client-certificate.pem"),
+ _sslCertpath("client-private-key.pem"),
+ "client-password")
+ self.client_domain.set_trusted_ca_db(_sslCertpath("ca-certificate.pem"))
+ self.client_domain.set_peer_authentication(SSLDomain.VERIFY_PEER)
+
+ ssl1 = _sslConnection(self.client_domain, self.t1, self.c1)
+ ssl2 = _sslConnection(self.server_domain, self.t2, self.c2)
+
+ _testSaslMech(self, mech, clientUser=None, authUser=extUser, encrypted=True)  # client side reports no user
+
+ def testSSLExternalSimpleFail(self):  # EXTERNAL without client credentials: authentication must fail.
+ mech = 'EXTERNAL'
+
+ self.server_domain.set_credentials(_sslCertpath("server-certificate.pem"),
+ _sslCertpath("server-private-key.pem"),
+ "server-password")
+ self.server_domain.set_trusted_ca_db(_sslCertpath("ca-certificate.pem"))
+ self.server_domain.set_peer_authentication(SSLDomain.VERIFY_PEER,
+ _sslCertpath("ca-certificate.pem") )
+ self.client_domain.set_trusted_ca_db(_sslCertpath("ca-certificate.pem"))  # unlike testSSLExternalSimple, no client set_credentials call
+ self.client_domain.set_peer_authentication(SSLDomain.VERIFY_PEER)
+
+ ssl1 = _sslConnection(self.client_domain, self.t1, self.c1)
+ ssl2 = _sslConnection(self.server_domain, self.t2, self.c2)
+
+ _testSaslMech(self, mech, clientUser=None, authUser=None, encrypted=None, authenticated=False)  # encrypted=None: encryption state is not checked
+
+class SASLEventTest(engine.CollectorTest):  # Checks the event sequence collected on one connection during a SASL exchange.
+ def setUp(self):  # Base-class setUp, then transports, SASL layers, connections and a Collector.
+ engine.CollectorTest.setUp(self)
+ self.t1 = Transport()
+ self.s1 = SASL(self.t1)
+ self.t2 = Transport(Transport.SERVER)
+ self.s2 = SASL(self.t2)
+
+ self.c1 = Connection()
+ self.c1.user = 'user@proton'
+ self.c1.password = 'password'
+ self.c1.hostname = 'localhost'
+
+ self.c2 = Connection()
+
+ self.collector = Collector()  # assigned after the base setUp; replaces any collector it may have set - TODO confirm in engine.CollectorTest
+
+ def testNormalAuthenticationClient(self):  # Events seen by c1 for a successful DIGEST-MD5 exchange.
+ common.ensureCanTestExtendedSASL()
+ self.c1.collect(self.collector)  # collect events from the client connection only
+ self.t1.bind(self.c1)
+ self.t2.bind(self.c2)
+ _testSaslMech(self, 'DIGEST-MD5')
+ self.expect(Event.CONNECTION_INIT, Event.CONNECTION_BOUND,
+ Event.CONNECTION_LOCAL_OPEN, Event.TRANSPORT,
+ Event.CONNECTION_REMOTE_OPEN)
+
+ def testNormalAuthenticationServer(self):  # Events seen by c2 for a successful DIGEST-MD5 exchange.
+ common.ensureCanTestExtendedSASL()
+ self.c2.collect(self.collector)  # collect events from the server connection only
+ self.t1.bind(self.c1)
+ self.t2.bind(self.c2)
+ _testSaslMech(self, 'DIGEST-MD5')
+ self.expect(Event.CONNECTION_INIT, Event.CONNECTION_BOUND,
+ Event.CONNECTION_LOCAL_OPEN, Event.TRANSPORT,
+ Event.CONNECTION_REMOTE_OPEN)
+
+ def testFailedAuthenticationClient(self):  # Events seen by c1 when the user name does not authenticate.
+ common.ensureCanTestExtendedSASL()
+ clientUser = "usr@proton"  # differs from the 'user@proton' set in setUp
+ self.c1.user = clientUser
+ self.c1.collect(self.collector)
+ self.t1.bind(self.c1)
+ self.t2.bind(self.c2)
+ _testSaslMech(self, 'DIGEST-MD5', clientUser=clientUser, authenticated=False)
+ self.expect(Event.CONNECTION_INIT, Event.CONNECTION_BOUND,
+ Event.CONNECTION_LOCAL_OPEN, Event.TRANSPORT,
+ Event.TRANSPORT_ERROR,
+ Event.TRANSPORT_TAIL_CLOSED,
+ Event.TRANSPORT_HEAD_CLOSED, Event.TRANSPORT_CLOSED)
+
+ def testFailedAuthenticationServer(self):  # Events seen by c2 when the user name does not authenticate.
+ common.ensureCanTestExtendedSASL()
+ clientUser = "usr@proton"
+ self.c1.user = clientUser
+ self.c2.collect(self.collector)
+ self.t1.bind(self.c1)
+ self.t2.bind(self.c2)
+ _testSaslMech(self, 'DIGEST-MD5', clientUser=clientUser, authenticated=False)
+ self.expect(Event.CONNECTION_INIT, Event.CONNECTION_BOUND,
+ Event.CONNECTION_LOCAL_OPEN, Event.TRANSPORT,
+ Event.TRANSPORT_ERROR,
+ Event.TRANSPORT_TAIL_CLOSED,
+ Event.TRANSPORT_HEAD_CLOSED, Event.TRANSPORT_CLOSED)
+
+ def testNoMechClient(self):  # Server allows only 'IMPOSSIBLE', client asks for DIGEST-MD5; events seen by c1.
+ common.ensureCanTestExtendedSASL()
+ self.c1.collect(self.collector)
+ self.s2.allowed_mechs('IMPOSSIBLE')
+ self.t1.bind(self.c1)
+ self.t2.bind(self.c2)
+ _testSaslMech(self, 'DIGEST-MD5', authenticated=False)
+ self.expect(Event.CONNECTION_INIT, Event.CONNECTION_BOUND,
+ Event.CONNECTION_LOCAL_OPEN, Event.TRANSPORT,
+ Event.TRANSPORT_ERROR,
+ Event.TRANSPORT_TAIL_CLOSED, Event.TRANSPORT_HEAD_CLOSED, Event.TRANSPORT_CLOSED)
+
+ def testNoMechServer(self):  # Same scenario as testNoMechClient; events seen by c2.
+ common.ensureCanTestExtendedSASL()
+ self.c2.collect(self.collector)
+ self.s2.allowed_mechs('IMPOSSIBLE')
+ self.t1.bind(self.c1)
+ self.t2.bind(self.c2)
+ _testSaslMech(self, 'DIGEST-MD5', authenticated=False)
+ self.expect(Event.CONNECTION_INIT, Event.CONNECTION_BOUND,
+ Event.CONNECTION_LOCAL_OPEN, Event.TRANSPORT,
+ Event.TRANSPORT_TAIL_CLOSED,
+ Event.TRANSPORT_ERROR, Event.TRANSPORT_HEAD_CLOSED, Event.TRANSPORT_CLOSED)  # here TAIL_CLOSED is expected before ERROR, unlike the client-side test
+
+ def testDisallowedMechClient(self):  # Client restricted to 'IMPOSSIBLE'; events seen by c1.
+ self.c1.collect(self.collector)
+ self.t1.bind(self.c1)
+ self.t2.bind(self.c2)
+ _testSaslMech(self, 'IMPOSSIBLE', authenticated=False)
+ self.expect(Event.CONNECTION_INIT, Event.CONNECTION_BOUND,
+ Event.CONNECTION_LOCAL_OPEN, Event.TRANSPORT,
+ Event.TRANSPORT_ERROR,
+ Event.TRANSPORT_TAIL_CLOSED, Event.TRANSPORT_HEAD_CLOSED, Event.TRANSPORT_CLOSED)
+
+ def testDisallowedMechServer(self):  # Client restricted to 'IMPOSSIBLE'; events seen by c2.
+ self.c2.collect(self.collector)
+ self.t1.bind(self.c1)
+ self.t2.bind(self.c2)
+ _testSaslMech(self, 'IMPOSSIBLE', authenticated=False)
+ self.expect(Event.CONNECTION_INIT, Event.CONNECTION_BOUND,
+ Event.CONNECTION_LOCAL_OPEN, Event.TRANSPORT,
+ Event.TRANSPORT_TAIL_CLOSED,
+ Event.TRANSPORT_ERROR, Event.TRANSPORT_HEAD_CLOSED, Event.TRANSPORT_CLOSED)  # TAIL_CLOSED before ERROR on the server side
+
+ def testDisallowedPlainClient(self):  # PLAIN requested with allow_insecure_mechs set on neither side; events seen by c1.
+ self.c1.collect(self.collector)
+ self.t1.bind(self.c1)
+ self.t2.bind(self.c2)
+ _testSaslMech(self, 'PLAIN', authenticated=False)
+ self.expect(Event.CONNECTION_INIT, Event.CONNECTION_BOUND,
+ Event.CONNECTION_LOCAL_OPEN, Event.TRANSPORT,
+ Event.TRANSPORT_ERROR,
+ Event.TRANSPORT_TAIL_CLOSED, Event.TRANSPORT_HEAD_CLOSED, Event.TRANSPORT_CLOSED)
+
+ def testDisallowedPlainServer(self):  # PLAIN requested with allow_insecure_mechs set on neither side; events seen by c2.
+ self.c2.collect(self.collector)
+ self.t1.bind(self.c1)
+ self.t2.bind(self.c2)
+ _testSaslMech(self, 'PLAIN', authenticated=False)
+ self.expect(Event.CONNECTION_INIT, Event.CONNECTION_BOUND,
+ Event.CONNECTION_LOCAL_OPEN, Event.TRANSPORT,
+ Event.TRANSPORT_TAIL_CLOSED,
+ Event.TRANSPORT_ERROR, Event.TRANSPORT_HEAD_CLOSED, Event.TRANSPORT_CLOSED)  # TAIL_CLOSED before ERROR on the server side
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/9778eda8/python/tests/proton_tests/soak.py
----------------------------------------------------------------------
diff --git a/python/tests/proton_tests/soak.py b/python/tests/proton_tests/soak.py
new file mode 100644
index 0000000..b7b521c
--- /dev/null
+++ b/python/tests/proton_tests/soak.py
@@ -0,0 +1,334 @@
+from __future__ import absolute_import
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+import os
+import sys
+from .common import Test, Skipped, free_tcp_ports, \
+ MessengerReceiverC, MessengerSenderC, \
+ MessengerReceiverValgrind, MessengerSenderValgrind, \
+ ReactorReceiverC, ReactorSenderC, \
+ ReactorReceiverValgrind, ReactorSenderValgrind, \
+ isSSLPresent
+from proton import *
+
+#
+# Tests that run the apps
+#
+
+class AppTests(Test):  # Base class that starts sender/receiver app wrappers and asserts they exit with status 0.
+
+ def __init__(self, *args):
+ Test.__init__(self, *args)
+ self.is_valgrind = False  # switched on by valgrind_test()
+
+ def default(self, name, value, **kwargs):  # Delegate to Test.default, letting a "valgrind" keyword replace value when is_valgrind is set.
+ if self.is_valgrind:
+ default = kwargs.get("valgrind", value)  # falls back to value when no "valgrind" keyword is given
+ else:
+ default = value
+ return Test.default(self, name, default, **kwargs)
+
+ @property
+ def iterations(self):  # this and the properties below wrap self.default(...) in int()
+ return int(self.default("iterations", 2, fast=1, valgrind=2))
+
+ @property
+ def send_count(self):
+ return int(self.default("send_count", 17, fast=1, valgrind=2))
+
+ @property
+ def target_count(self):
+ return int(self.default("target_count", 5, fast=1, valgrind=2))
+
+ @property
+ def send_batch(self):
+ return int(self.default("send_batch", 7, fast=1, valgrind=2))
+
+ @property
+ def forward_count(self):
+ return int(self.default("forward_count", 5, fast=1, valgrind=2))
+
+ @property
+ def port_count(self):
+ return int(self.default("port_count", 3, fast=1, valgrind=2))
+
+ @property
+ def sender_count(self):
+ return int(self.default("sender_count", 3, fast=1, valgrind=2))
+
+ def valgrind_test(self):  # Mark this test as a valgrind run so default() picks the "valgrind" values.
+ self.is_valgrind = True
+
+ def setUp(self):  # Start each test with empty sender and receiver lists.
+ self.senders = []
+ self.receivers = []
+
+ def tearDown(self):
+ pass
+
+ def _do_test(self, iterations=1):  # Start receivers, run the senders, wait on everything and assert status 0. NOTE(review): loop nesting is not recoverable from this flattened diff.
+ verbose = self.verbose
+
+ for R in self.receivers:
+ R.start( verbose )
+
+ for j in range(iterations):  # j itself is unused; the loop only repeats the sender run
+ for S in self.senders:
+ S.start( verbose )
+
+ for S in self.senders:
+ S.wait()
+ #print("SENDER OUTPUT:")
+ #print( S.stdout() )
+ assert S.status() == 0, ("Command '%s' failed status=%d: '%s' '%s'"
+ % (str(S.cmdline()),
+ S.status(),
+ S.stdout(),
+ S.stderr()))
+
+ for R in self.receivers:
+ R.wait()
+ #print("RECEIVER OUTPUT")
+ #print( R.stdout() )
+ assert R.status() == 0, ("Command '%s' failed status=%d: '%s' '%s'"
+ % (str(R.cmdline()),
+ R.status(),
+ R.stdout(),
+ R.stderr()))
+
+#
+# Traffic passing tests based on the Messenger apps
+#
+
+class MessengerTests(AppTests):  # Traffic tests: configure receiver/sender wrappers, then run them through AppTests._do_test.
+
+ _timeout = 60  # assigned to the .timeout of every sender and receiver below; units are not shown in this file
+
+ def _ssl_check(self):  # Raise Skipped when SSL is unavailable or when running on Windows.
+ if not isSSLPresent():
+ raise Skipped("No SSL libraries found.")
+ if os.name=="nt":
+ raise Skipped("Windows SChannel lacks anonymous cipher support.")
+
+ def __init__(self, *args):  # only forwards to AppTests.__init__
+ AppTests.__init__(self, *args)
+
+ def _do_oneway_test(self, receiver, sender, domain="amqp"):
+ """ Send N messages to a receiver.
+ Parameters:
+ iterations - repeat the senders this many times
+ target_count = # of targets to send to.
+ send_count = # messages sent to each target
+ """
+ iterations = self.iterations
+ send_count = self.send_count
+ target_count = self.target_count
+
+ send_total = send_count * target_count  # messages per sender run
+ receive_total = send_total * iterations  # messages the receiver must see over all runs
+
+ port = free_tcp_ports()[0]
+
+ receiver.subscriptions = ["%s://~0.0.0.0:%s" % (domain, port)]  # '~' presumably marks a listening address - confirm against the Messenger docs
+ receiver.receive_count = receive_total
+ receiver.timeout = MessengerTests._timeout
+ self.receivers.append( receiver )
+
+ sender.targets = ["%s://0.0.0.0:%s/X%d" % (domain, port, j) for j in range(target_count)]  # one address per target: X0, X1, ...
+ sender.send_count = send_total
+ sender.timeout = MessengerTests._timeout
+ self.senders.append( sender )
+
+ self._do_test(iterations)
+
+ def _do_echo_test(self, receiver, sender, domain="amqp"):
+ """ Send N messages to a receiver, which responds to each.
+ Parameters:
+ iterations - repeat the senders this many times
+ target_count - # targets to send to
+ send_count = # messages sent to each target
+ send_batch - wait for replies after this many messages sent
+ """
+ iterations = self.iterations
+ send_count = self.send_count
+ target_count = self.target_count
+ send_batch = self.send_batch
+
+ send_total = send_count * target_count
+ receive_total = send_total * iterations
+
+ port = free_tcp_ports()[0]
+
+ receiver.subscriptions = ["%s://~0.0.0.0:%s" % (domain, port)]
+ receiver.receive_count = receive_total
+ receiver.send_reply = True  # receiver answers each message
+ receiver.timeout = MessengerTests._timeout
+ self.receivers.append( receiver )
+
+ sender.targets = ["%s://0.0.0.0:%s/%dY" % (domain, port, j) for j in range(target_count)]  # one address per target: 0Y, 1Y, ...
+ sender.send_count = send_total
+ sender.get_reply = True  # sender waits for the replies
+ sender.send_batch = send_batch
+ sender.timeout = MessengerTests._timeout
+ self.senders.append( sender )
+
+ self._do_test(iterations)
+
+ def _do_relay_test(self, receiver, relay, sender, domain="amqp"):
+ """ Send N messages to a receiver, which replies to each and forwards
+ each of them to different receiver.
+ Parameters:
+ iterations - repeat the senders this many times
+ target_count - # targets to send to
+ send_count = # messages sent to each target
+ send_batch - wait for replies after this many messages sent
+ forward_count - forward to this many targets
+ """
+ iterations = self.iterations
+ send_count = self.send_count
+ target_count = self.target_count
+ send_batch = self.send_batch  # read but not applied to the sender in this method
+ forward_count = self.forward_count  # only referenced by the commented-out forwards line below
+
+ send_total = send_count * target_count
+ receive_total = send_total * iterations
+
+ port = free_tcp_ports()[0]
+
+ receiver.subscriptions = ["%s://~0.0.0.0:%s" % (domain, port)]
+ receiver.receive_count = receive_total
+ receiver.send_reply = True
+ # forward to 'relay' - uses two links
+ # ## THIS FAILS:
+ # receiver.forwards = ["amqp://Relay/%d" % j for j in range(forward_count)]
+ receiver.forwards = ["%s://Relay" % domain]  # single forward address, matching relay.name below
+ receiver.timeout = MessengerTests._timeout
+ self.receivers.append( receiver )
+
+ relay.subscriptions = ["%s://0.0.0.0:%s" % (domain, port)]  # no '~' here, unlike the receiver's subscription
+ relay.name = "Relay"
+ relay.receive_count = receive_total
+ relay.timeout = MessengerTests._timeout
+ self.receivers.append( relay )  # the relay is started and waited on as a receiver
+
+ # send to 'receiver'
+ sender.targets = ["%s://0.0.0.0:%s/X%dY" % (domain, port, j) for j in range(target_count)]
+ sender.send_count = send_total
+ sender.get_reply = True
+ sender.timeout = MessengerTests._timeout
+ self.senders.append( sender )
+
+ self._do_test(iterations)
+
+
+ def _do_star_topology_test(self, r_factory, s_factory, domain="amqp"):  # takes factories (callables), not instances, for receiver and senders
+ """
+ A star-like topology, with a central receiver at the hub, and senders at
+ the spokes. Each sender will connect to each of the ports the receiver is
+ listening on. Each sender will then create N links per each connection.
+ Each sender will send X messages per link, waiting for a response.
+ Parameters:
+ iterations - repeat the senders this many times
+ port_count - # of ports the receiver will listen on. Each sender connects
+ to all ports.
+ sender_count - # of senders
+ target_count - # of targets per connection
+ send_count - # of messages sent to each target
+ send_batch - # of messages to send before waiting for response
+ """
+ iterations = self.iterations
+ port_count = self.port_count
+ sender_count = self.sender_count
+ target_count = self.target_count
+ send_count = self.send_count
+ send_batch = self.send_batch
+
+ send_total = port_count * target_count * send_count  # messages per sender per run
+ receive_total = send_total * sender_count * iterations  # messages the hub receiver must see
+
+ ports = free_tcp_ports(port_count)
+
+ receiver = r_factory()
+ receiver.subscriptions = ["%s://~0.0.0.0:%s" % (domain, port) for port in ports]  # one subscription per port
+ receiver.receive_count = receive_total
+ receiver.send_reply = True
+ receiver.timeout = MessengerTests._timeout
+ self.receivers.append( receiver )
+
+ for i in range(sender_count):  # i is unused; one sender is built per iteration
+ sender = s_factory()
+ sender.targets = ["%s://0.0.0.0:%s/%d" % (domain, port, j) for port in ports for j in range(target_count)]  # every port crossed with every target index
+ sender.send_count = send_total
+ sender.send_batch = send_batch
+ sender.get_reply = True
+ sender.timeout = MessengerTests._timeout
+ self.senders.append( sender )
+
+ self._do_test(iterations)
+
+ def test_oneway_C(self):
+ self._do_oneway_test(MessengerReceiverC(), MessengerSenderC())
+
+ def test_oneway_C_SSL(self):
+ self._ssl_check()  # may raise Skipped before anything is started
+ self._do_oneway_test(MessengerReceiverC(), MessengerSenderC(), "amqps")
+
+ def test_oneway_valgrind(self):
+ self.valgrind_test()  # select the "valgrind" defaults before the counts are read
+ self._do_oneway_test(MessengerReceiverValgrind(), MessengerSenderValgrind())
+
+ def test_echo_C(self):
+ self._do_echo_test(MessengerReceiverC(), MessengerSenderC())
+
+ def test_echo_C_SSL(self):
+ self._ssl_check()
+ self._do_echo_test(MessengerReceiverC(), MessengerSenderC(), "amqps")
+
+ def test_echo_valgrind(self):
+ self.valgrind_test()
+ self._do_echo_test(MessengerReceiverValgrind(), MessengerSenderValgrind())
+
+ def test_relay_C(self):
+ self._do_relay_test(MessengerReceiverC(), MessengerReceiverC(), MessengerSenderC())  # the relay is a second receiver instance
+
+ def test_relay_C_SSL(self):
+ self._ssl_check()
+ self._do_relay_test(MessengerReceiverC(), MessengerReceiverC(), MessengerSenderC(), "amqps")
+
+ def test_relay_valgrind(self):
+ self.valgrind_test()
+ self._do_relay_test(MessengerReceiverValgrind(), MessengerReceiverValgrind(), MessengerSenderValgrind())
+
+ def test_star_topology_C(self):
+ self._do_star_topology_test( MessengerReceiverC, MessengerSenderC )  # classes are passed as factories, not instances
+
+ def test_star_topology_C_SSL(self):
+ self._ssl_check()
+ self._do_star_topology_test( MessengerReceiverC, MessengerSenderC, "amqps" )
+
+ def test_star_topology_valgrind(self):
+ self.valgrind_test()
+ self._do_star_topology_test( MessengerReceiverValgrind, MessengerSenderValgrind )
+
+ def test_oneway_reactor(self):  # same one-way flow, using the Reactor-based apps
+ self._do_oneway_test(ReactorReceiverC(), ReactorSenderC())
+
+ def test_oneway_reactor_valgrind(self):
+ self.valgrind_test()
+ self._do_oneway_test(ReactorReceiverValgrind(), ReactorSenderValgrind())
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org