You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2018/07/06 16:59:17 UTC
[05/11] qpid-proton git commit: PROTON-1885: [python] move
tests/python to python/tests
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/9778eda8/tests/python/proton_tests/interop.py
----------------------------------------------------------------------
diff --git a/tests/python/proton_tests/interop.py b/tests/python/proton_tests/interop.py
deleted file mode 100644
index 2825fe4..0000000
--- a/tests/python/proton_tests/interop.py
+++ /dev/null
@@ -1,147 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-
-from proton import *
-import os
-from . import common
-
-
-def find_test_interop_dir():
-    """Walk up the directory tree to find the tests directory.
-
-    Starts at the directory containing this file, climbs until a directory
-    named "tests" is reached, and returns its "interop" subdirectory.
-    Raises Exception when that subdirectory does not exist.
-    """
-    f = os.path.dirname(os.path.abspath(__file__))
-    while f and os.path.basename(f) != "tests":
-        parent = os.path.dirname(f)
-        # dirname() of the filesystem root is the root itself; stop there
-        # rather than spinning forever when no "tests" ancestor exists.
-        if parent == f:
-            break
-        f = parent
-    f = os.path.join(f, "interop")
-    if not os.path.isdir(f):
-        raise Exception("Cannot find test/interop directory from "+__file__)
-    return f
-
-test_interop_dir=find_test_interop_dir()
-
-class InteropTest(common.Test):
-    """Decode the pre-encoded ``.amqp`` files from the interop directory and
-    check the values they contain."""
-
-    def setUp(self):
-        self.data = Data()
-        self.message = Message()
-
-    def tearDown(self):
-        # Release both objects created in setUp.
-        self.data = None
-        self.message = None
-
-    def get_data(self, name):
-        """Return the raw bytes of ``<name>.amqp`` in the interop directory."""
-        filename = os.path.join(test_interop_dir, name+".amqp")
-        # The context manager closes the file even if read() raises.
-        with open(filename, "rb") as f:
-            return f.read()
-
-    def decode_data(self, encoded):
-        """Feed ``encoded`` into self.data until it is consumed, then rewind."""
-        buffer = encoded
-        while buffer:
-            n = self.data.decode(buffer)
-            buffer = buffer[n:]
-        self.data.rewind()
-
-    def decode_data_file(self, name):
-        """Decode the named file and verify that re-encoding round-trips."""
-        encoded = self.get_data(name)
-        self.decode_data(encoded)
-        encoded_size = self.data.encoded_size()
-        # Re-encode and verify pre-computed and actual encoded size match.
-        reencoded = self.data.encode()
-        assert encoded_size == len(reencoded), "%d != %d" % (encoded_size, len(reencoded))
-        # verify round trip bytes
-        assert reencoded == encoded, "Value mismatch: %s != %s" % (reencoded, encoded)
-
-    def decode_message_file(self, name):
-        """Decode the named file as a message, then decode its body as data."""
-        self.message.decode(self.get_data(name))
-        body = self.message.body
-        # A body of this (Java-named) Binary type is unwrapped to raw bytes first.
-        if str(type(body)) == "<type 'org.apache.qpid.proton.amqp.Binary'>":
-            body = body.array.tostring()
-        self.decode_data(body)
-
-    def assert_next(self, type, value):
-        """Advance self.data and assert the next node's type and value."""
-        next_type = self.data.next()
-        assert next_type == type, "Type mismatch: %s != %s"%(
-            Data.type_names[next_type], Data.type_names[type])
-        next_value = self.data.get_object()
-        assert next_value == value, "Value mismatch: %s != %s"%(next_value, value)
-
-    def test_message(self):
-        self.decode_message_file("message")
-        self.assert_next(Data.STRING, "hello")
-        assert self.data.next() is None
-
-    def test_primitives(self):
-        self.decode_data_file("primitives")
-        self.assert_next(Data.BOOL, True)
-        self.assert_next(Data.BOOL, False)
-        self.assert_next(Data.UBYTE, 42)
-        self.assert_next(Data.USHORT, 42)
-        self.assert_next(Data.SHORT, -42)
-        self.assert_next(Data.UINT, 12345)
-        self.assert_next(Data.INT, -12345)
-        self.assert_next(Data.ULONG, 12345)
-        self.assert_next(Data.LONG, -12345)
-        self.assert_next(Data.FLOAT, 0.125)
-        self.assert_next(Data.DOUBLE, 0.125)
-        assert self.data.next() is None
-
-    def test_strings(self):
-        self.decode_data_file("strings")
-        self.assert_next(Data.BINARY, b"abc\0defg")
-        self.assert_next(Data.STRING, "abcdefg")
-        self.assert_next(Data.SYMBOL, "abcdefg")
-        self.assert_next(Data.BINARY, b"")
-        self.assert_next(Data.STRING, "")
-        self.assert_next(Data.SYMBOL, "")
-        assert self.data.next() is None
-
-    def test_described(self):
-        self.decode_data_file("described")
-        self.assert_next(Data.DESCRIBED, Described("foo-descriptor", "foo-value"))
-        self.data.exit()
-
-        assert self.data.next() == Data.DESCRIBED
-        self.data.enter()
-        self.assert_next(Data.INT, 12)
-        self.assert_next(Data.INT, 13)
-        self.data.exit()
-
-        assert self.data.next() is None
-
-    def test_described_array(self):
-        self.decode_data_file("described_array")
-        self.assert_next(Data.ARRAY, Array("int-array", Data.INT, *range(0,10)))
-
-    def test_arrays(self):
-        self.decode_data_file("arrays")
-        self.assert_next(Data.ARRAY, Array(UNDESCRIBED, Data.INT, *range(0,100)))
-        self.assert_next(Data.ARRAY, Array(UNDESCRIBED, Data.STRING, *["a", "b", "c"]))
-        self.assert_next(Data.ARRAY, Array(UNDESCRIBED, Data.INT))
-        assert self.data.next() is None
-
-    def test_lists(self):
-        self.decode_data_file("lists")
-        self.assert_next(Data.LIST, [32, "foo", True])
-        self.assert_next(Data.LIST, [])
-        assert self.data.next() is None
-
-    def test_maps(self):
-        self.decode_data_file("maps")
-        self.assert_next(Data.MAP, {"one":1, "two":2, "three":3 })
-        self.assert_next(Data.MAP, {1:"one", 2:"two", 3:"three"})
-        self.assert_next(Data.MAP, {})
-        assert self.data.next() is None
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/9778eda8/tests/python/proton_tests/message.py
----------------------------------------------------------------------
diff --git a/tests/python/proton_tests/message.py b/tests/python/proton_tests/message.py
deleted file mode 100644
index 1a2709c..0000000
--- a/tests/python/proton_tests/message.py
+++ /dev/null
@@ -1,275 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-
-import os
-from . import common
-from proton import *
-from uuid import uuid4
-
-class Test(common.Test):
-    """Base fixture: gives every test a fresh Message in ``self.msg``."""
-
-    def setUp(self):
-        """Create the message under test."""
-        self.msg = Message()
-
-    def tearDown(self):
-        """Drop the reference to the message created in setUp."""
-        self.msg = None
-
-
-class AccessorsTest(Test):
-    """Check the default value and set/get round trip of Message attributes."""
-
-    def _test(self, name, default, values):
-        """Assert attribute ``name`` starts at ``default`` and then reads back
-        each of ``values`` after it has been assigned."""
-        initial = getattr(self.msg, name)
-        assert initial == default, (initial, default)
-        for expected in values:
-            setattr(self.msg, name, expected)
-            gotten = getattr(self.msg, name)
-            assert gotten == expected, gotten
-
-    def _test_symbol(self, name):
-        """Run _test with symbol values; the expected default is symbol(None)."""
-        samples = (symbol(u"abc.123.#$%"), symbol(u"hello.world"))
-        self._test(name, symbol(None), samples)
-
-    def _test_str(self, name):
-        """Run _test with unicode string values; the expected default is None."""
-        samples = (u"asdf", u"fdsa", u"")
-        self._test(name, None, samples)
-
-    def _test_time(self, name):
-        """Run _test with integer time values; the expected default is 0."""
-        samples = (0, 123456789, 987654321)
-        self._test(name, 0, samples)
-
-    def testId(self):
-        self._test("id", None, ("bytes", None, 123, u"string", uuid4()))
-
-    def testCorrelationId(self):
-        self._test("correlation_id", None, ("bytes", None, 123, u"string", uuid4()))
-
-    def testDurable(self):
-        self._test("durable", False, (True, False))
-
-    def testPriority(self):
-        self._test("priority", Message.DEFAULT_PRIORITY, range(0, 255))
-
-    def testTtl(self):
-        self._test("ttl", 0, range(12345, 54321))
-
-    def testFirstAcquirer(self):
-        self._test("first_acquirer", False, (True, False))
-
-    def testDeliveryCount(self):
-        self._test("delivery_count", 0, range(0, 1024))
-
-    def testUserId(self):
-        self._test("user_id", b"", (b"asdf", b"fdsa", b"asd\x00fdsa", b""))
-
-    def testAddress(self):
-        self._test_str("address")
-
-    def testSubject(self):
-        self._test_str("subject")
-
-    def testReplyTo(self):
-        self._test_str("reply_to")
-
-    def testContentType(self):
-        self._test_symbol("content_type")
-
-    def testContentEncoding(self):
-        self._test_symbol("content_encoding")
-
-    def testExpiryTime(self):
-        self._test_time("expiry_time")
-
-    def testCreationTime(self):
-        self._test_time("creation_time")
-
-    def testGroupId(self):
-        self._test_str("group_id")
-
-    def testGroupSequence(self):
-        self._test("group_sequence", 0, (0, -10, 10, 20, -20))
-
-    def testReplyToGroupId(self):
-        self._test_str("reply_to_group_id")
-
-class CodecTest(Test):
-    """Encode/decode tests for Message, including how default-valued fields
-    appear in the encoded header and properties lists."""
-
-    def _encoded_properties(self):
-        """Encode self.msg and return the value list of its second section.
-
-        Asserts that the section's descriptor is 0x73 before returning.
-        """
-        data = self.msg.encode()
-
-        decoder = Data()
-
-        # Skip past the headers
-        consumed = decoder.decode(data)
-        decoder.clear()
-        data = data[consumed:]
-
-        decoder.decode(data)
-        dproperties = decoder.get_py_described()
-        # Check we've got the correct described list
-        assert dproperties.descriptor == 0x73, (dproperties.descriptor)
-
-        return dproperties.value
-
-    def testProperties(self):
-        self.msg.properties = {}
-        self.msg.properties['key'] = 'value'
-        data = self.msg.encode()
-
-        msg2 = Message()
-        msg2.decode(data)
-
-        assert msg2.properties['key'] == 'value', msg2.properties['key']
-
-    def testRoundTrip(self):
-        self.msg.id = "asdf"
-        self.msg.correlation_id = uuid4()
-        self.msg.ttl = 3
-        self.msg.priority = 100
-        self.msg.address = "address"
-        self.msg.subject = "subject"
-        self.msg.body = 'Hello World!'
-
-        data = self.msg.encode()
-
-        msg2 = Message()
-        msg2.decode(data)
-
-        assert self.msg.id == msg2.id, (self.msg.id, msg2.id)
-        assert self.msg.correlation_id == msg2.correlation_id, (self.msg.correlation_id, msg2.correlation_id)
-        assert self.msg.ttl == msg2.ttl, (self.msg.ttl, msg2.ttl)
-        assert self.msg.priority == msg2.priority, (self.msg.priority, msg2.priority)
-        assert self.msg.address == msg2.address, (self.msg.address, msg2.address)
-        assert self.msg.subject == msg2.subject, (self.msg.subject, msg2.subject)
-        assert self.msg.body == msg2.body, (self.msg.body, msg2.body)
-
-    def testExpiryEncodeAsNull(self):
-        self.msg.group_id = "A" # Force creation and expiry fields to be present
-        properties = self._encoded_properties()
-        assert properties[8] is None, properties[8]
-
-    def testCreationEncodeAsNull(self):
-        self.msg.group_id = "A" # Force creation and expiry fields to be present
-        properties = self._encoded_properties()
-        assert properties[9] is None, properties[9]
-
-    def testGroupSequenceEncodeAsNull(self):
-        self.msg.reply_to_group_id = "R" # Force group_id and group_sequence fields to be present
-        properties = self._encoded_properties()
-        assert properties[10] is None, properties[10]
-        assert properties[11] is None, properties[11]
-
-    def testGroupSequenceEncodeAsNonNull(self):
-        self.msg.group_id = "G"
-        self.msg.reply_to_group_id = "R" # Force group_id and group_sequence fields to be present
-        properties = self._encoded_properties()
-        assert properties[10] == 'G', properties[10]
-        assert properties[11] == 0, properties[11]
-
-    def testDefaultCreationExpiryDecode(self):
-        encodings = (
-            # This is a message with everything filled explicitly as null or zero in LIST32 HEADER and PROPERTIES lists
-            b'\x00\x53\x70\xd0\x00\x00\x00\x0a\x00\x00\x00\x05\x42\x40\x40\x42\x52\x00\x00\x53\x73\xd0\x00\x00\x00\x12\x00\x00\x00\x0d\x40\x40\x40\x40\x40\x40\x40\x40\x40\x40\x40\x52\x00\x40',
-            # The same message with LIST8s instead
-            b'\x00\x53\x70\xc0\x07\x05\x42\x40\x40\x42\x52\x00\x00\x53\x73\xc0\x0f\x0d\x40\x40\x40\x40\x40\x40\x40\x40\x40\x40\x40\x52\x00\x40',
-            # Minified message with zero length HEADER and PROPERTIES lists
-            b'\x00\x53\x70\x45' b'\x00\x53\x73\x45',
-        )
-        for data in encodings:
-            decoded = Message()
-            decoded.decode(data)
-            # Assert on the message just decoded; the earlier version
-            # re-checked the first message for every encoding.
-            assert decoded.expiry_time == 0, (decoded.expiry_time)
-            assert decoded.creation_time == 0, (decoded.creation_time)
-
-    def testDefaultPriorityEncode(self):
-        assert self.msg.priority == 4, (self.msg.priority)
-        self.msg.ttl = 0.003 # field after priority, so forces priority to be present
-        data = self.msg.encode()
-
-        decoder = Data()
-        decoder.decode(data)
-
-        dheaders = decoder.get_py_described()
-        # Check we've got the correct described list
-        assert dheaders.descriptor == 0x70, (dheaders.descriptor)
-
-        # Check that the priority field (second field) is encoded as null
-        headers = dheaders.value
-        assert headers[1] is None, (headers[1])
-
-    def testDefaultPriorityDecode(self):
-        encodings = (
-            # This is a message with everything filled explicitly as null or zero in LIST32 HEADER and PROPERTIES lists
-            b'\x00\x53\x70\xd0\x00\x00\x00\x0a\x00\x00\x00\x05\x42\x40\x40\x42\x52\x00\x00\x53\x73\xd0\x00\x00\x00\x22\x00\x00\x00\x0d\x40\x40\x40\x40\x40\x40\x40\x40\x83\x00\x00\x00\x00\x00\x00\x00\x00\x83\x00\x00\x00\x00\x00\x00\x00\x00\x40\x52\x00\x40',
-            # The same message with LIST8s instead
-            b'\x00\x53\x70\xc0\x07\x05\x42\x40\x40\x42\x52\x00\x00\x53\x73\xc0\x1f\x0d\x40\x40\x40\x40\x40\x40\x40\x40\x83\x00\x00\x00\x00\x00\x00\x00\x00\x83\x00\x00\x00\x00\x00\x00\x00\x00\x40\x52\x00\x40',
-            # Minified message with zero length HEADER and PROPERTIES lists
-            b'\x00\x53\x70\x45' b'\x00\x53\x73\x45',
-        )
-        for data in encodings:
-            decoded = Message()
-            decoded.decode(data)
-            assert decoded.priority == 4, (decoded.priority)
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/9778eda8/tests/python/proton_tests/reactor.py
----------------------------------------------------------------------
diff --git a/tests/python/proton_tests/reactor.py b/tests/python/proton_tests/reactor.py
deleted file mode 100644
index 1b1e2fc..0000000
--- a/tests/python/proton_tests/reactor.py
+++ /dev/null
@@ -1,485 +0,0 @@
-from __future__ import absolute_import
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-
-import time
-import sys
-from .common import Test, SkipTest, TestServer, free_tcp_port, ensureCanTestExtendedSASL
-from proton.reactor import Container, Reactor, ApplicationEvent, EventInjector
-from proton.handlers import Handshaker, MessagingHandler
-from proton import Handler, Url
-
-class Barf(Exception):
-    """Marker exception raised on purpose by the Barf* handlers below."""
-
-class BarfOnInit:
-    """Handler that raises Barf from each of its ``*_init`` callbacks."""
-
-    def on_reactor_init(self, event):
-        """Raise Barf when the reactor init event is delivered."""
-        raise Barf()
-
-    def on_connection_init(self, event):
-        """Raise Barf when the connection init event is delivered."""
-        raise Barf()
-
-    def on_session_init(self, event):
-        """Raise Barf when the session init event is delivered."""
-        raise Barf()
-
-    def on_link_init(self, event):
-        """Raise Barf when the link init event is delivered."""
-        raise Barf()
-
-class BarfOnTask:
-    """Handler that raises Barf from its timer-task callback."""
-
-    def on_timer_task(self, event):
-        """Raise Barf when the timer task event is delivered."""
-        raise Barf()
-
-class BarfOnFinal:
-    """Handler that records reactor init and raises Barf on reactor final."""
-
-    # Class-level default; on_reactor_init sets it to True on the instance.
-    init = False
-
-    def on_reactor_init(self, event):
-        """Record that the reactor init event was delivered."""
-        self.init = True
-
-    def on_reactor_final(self, event):
-        """Raise Barf when the reactor final event is delivered."""
-        raise Barf()
-
-class BarfOnFinalDerived(Handshaker):
-    """Handshaker subclass that records reactor init and raises Barf on
-    reactor final."""
-
-    # Class-level default; on_reactor_init sets it to True on the instance.
-    init = False
-
-    def on_reactor_init(self, event):
-        """Record that the reactor init event was delivered."""
-        self.init = True
-
-    def on_reactor_final(self, event):
-        """Raise Barf when the reactor final event is delivered."""
-        raise Barf()
-
-class ExceptionTest(Test):
-    """Check that exceptions raised inside handlers come out of Reactor.run(),
-    and exercise scheduling and cancelling of timer tasks."""
-
-    def setUp(self):
-        self.reactor = Reactor()
-
-    def _run_expecting_barf(self):
-        """Run the reactor and fail unless the run raises Barf."""
-        try:
-            self.reactor.run()
-            assert False, "expected to barf"
-        except Barf:
-            pass
-
-    def test_reactor_final(self):
-        self.reactor.global_handler = BarfOnFinal()
-        self._run_expecting_barf()
-
-    def test_global_set(self):
-        self.reactor.global_handler = BarfOnInit()
-        self._run_expecting_barf()
-
-    def test_global_add(self):
-        self.reactor.global_handler.add(BarfOnInit())
-        self._run_expecting_barf()
-
-    def test_reactor_set(self):
-        self.reactor.handler = BarfOnInit()
-        self._run_expecting_barf()
-
-    def test_reactor_add(self):
-        self.reactor.handler.add(BarfOnInit())
-        self._run_expecting_barf()
-
-    def test_connection(self):
-        self.reactor.connection(BarfOnInit())
-        self._run_expecting_barf()
-
-    def test_connection_set(self):
-        connection = self.reactor.connection()
-        connection.handler = BarfOnInit()
-        self._run_expecting_barf()
-
-    def test_connection_add(self):
-        connection = self.reactor.connection()
-        connection.handler = object()
-        connection.handler.add(BarfOnInit())
-        self._run_expecting_barf()
-
-    def test_session_set(self):
-        connection = self.reactor.connection()
-        session = connection.session()
-        session.handler = BarfOnInit()
-        self._run_expecting_barf()
-
-    def test_session_add(self):
-        connection = self.reactor.connection()
-        session = connection.session()
-        session.handler = object()
-        session.handler.add(BarfOnInit())
-        self._run_expecting_barf()
-
-    def test_link_set(self):
-        connection = self.reactor.connection()
-        session = connection.session()
-        link = session.sender("xxx")
-        link.handler = BarfOnInit()
-        self._run_expecting_barf()
-
-    def test_link_add(self):
-        connection = self.reactor.connection()
-        session = connection.session()
-        link = session.sender("xxx")
-        link.handler = object()
-        link.handler.add(BarfOnInit())
-        self._run_expecting_barf()
-
-    def test_schedule(self):
-        self.reactor.schedule(0, BarfOnTask())
-        self._run_expecting_barf()
-
-    def test_schedule_many_nothings(self):
-        class Nothing:
-            # Shared by all instances so that every task's call is counted.
-            results = []
-            def on_timer_task(self, event):
-                self.results.append(None)
-        num = 12345
-        for _ in range(num):
-            self.reactor.schedule(0, Nothing())
-        self.reactor.run()
-        assert len(Nothing.results) == num
-
-    def test_schedule_many_nothing_refs(self):
-        class Nothing:
-            # Shared by all instances so that every task's call is counted.
-            results = []
-            def on_timer_task(self, event):
-                self.results.append(None)
-        num = 12345
-        # Unlike test_schedule_many_nothings, keep a reference to each task.
-        tasks = []
-        for _ in range(num):
-            tasks.append(self.reactor.schedule(0, Nothing()))
-        self.reactor.run()
-        assert len(Nothing.results) == num
-
-    def test_schedule_many_nothing_refs_cancel_before_run(self):
-        class Nothing:
-            # Shared by all instances so that every task's call is counted.
-            results = []
-            def on_timer_task(self, event):
-                self.results.append(None)
-        num = 12345
-        tasks = []
-        for _ in range(num):
-            tasks.append(self.reactor.schedule(0, Nothing()))
-        for scheduled in tasks:
-            scheduled.cancel()
-        self.reactor.run()
-        assert len(Nothing.results) == 0
-
-    def test_schedule_cancel(self):
-        barf = self.reactor.schedule(10, BarfOnTask())
-        class CancelBarf:
-            """Timer handler that cancels the task it was given."""
-            def __init__(self, barf):
-                self.barf = barf
-            def on_timer_task(self, event):
-                self.barf.cancel()
-        self.reactor.schedule(0, CancelBarf(barf))
-        started = self.reactor.mark()
-        try:
-            self.reactor.run()
-            elapsed = self.reactor.mark() - started
-            assert elapsed < 10, "expected cancelled task to not delay the reactor by %s" % elapsed
-        except Barf:
-            assert False, "expected barf to be cancelled"
-
-    def test_schedule_cancel_many(self):
-        num = 12345
-        barfs = set()
-        class CancelBarf:
-            """Timer handler that cancels its task and removes it from barfs."""
-            def __init__(self, barf):
-                self.barf = barf
-            def on_timer_task(self, event):
-                self.barf.cancel()
-                barfs.discard(self.barf)
-        for a in range(num):
-            barf = self.reactor.schedule(10*(a+1), BarfOnTask())
-            self.reactor.schedule(0, CancelBarf(barf))
-            barfs.add(barf)
-        started = self.reactor.mark()
-        try:
-            self.reactor.run()
-            elapsed = self.reactor.mark() - started
-            assert elapsed < num, "expected cancelled task to not delay the reactor by %s" % elapsed
-            assert not barfs, "expected all barfs to be discarded"
-        except Barf:
-            assert False, "expected barf to be cancelled"
-
-
-class ApplicationEventTest(Test):
-    """Test application defined events and handlers."""
-
-    class MyTestServer(TestServer):
-        def __init__(self):
-            super(ApplicationEventTest.MyTestServer, self).__init__()
-
-    class MyHandler(Handler):
-        """Stores str(event) on the owning test for the hello/goodbye events."""
-
-        def __init__(self, test):
-            super(ApplicationEventTest.MyHandler, self).__init__()
-            self._test = test
-
-        def on_hello(self, event):
-            # verify PROTON-1056
-            self._test.hello_rcvd = str(event)
-
-        def on_goodbye(self, event):
-            self._test.goodbye_rcvd = str(event)
-
-    def setUp(self):
-        import os
-        if not hasattr(os, 'pipe'):
-            # KAG: seems like Jython doesn't have an os.pipe() method
-            raise SkipTest()
-        if os.name=="nt":
-            # Correct implementation on Windows is complicated
-            raise SkipTest("PROTON-1071")
-        server = ApplicationEventTest.MyTestServer()
-        self.server = server
-        server.reactor.handler.add(ApplicationEventTest.MyHandler(self))
-        self.event_injector = EventInjector()
-        self.hello_event = ApplicationEvent("hello")
-        self.goodbye_event = ApplicationEvent("goodbye")
-        server.reactor.selectable(self.event_injector)
-        self.hello_rcvd = None
-        self.goodbye_rcvd = None
-        server.start()
-
-    def tearDown(self):
-        self.server.stop()
-
-    def _wait_for(self, predicate, timeout=10.0):
-        """Poll ``predicate`` every 0.1s until it is true or ``timeout``
-        seconds have passed, then assert that it holds."""
-        deadline = time.time() + timeout
-        while time.time() < deadline and not predicate():
-            time.sleep(0.1)
-        assert predicate()
-
-    def test_application_events(self):
-        injector = self.event_injector
-        injector.trigger(self.hello_event)
-        self._wait_for(lambda: self.hello_rcvd is not None)
-        injector.trigger(self.goodbye_event)
-        self._wait_for(lambda: self.goodbye_rcvd is not None)
-
-
-class AuthenticationTestHandler(MessagingHandler):
-    """Listens on a free localhost port and sets ``verified`` once an opening
-    connection's transport user is "user@proton"."""
-
-    def __init__(self):
-        super(AuthenticationTestHandler, self).__init__()
-        self.url = "localhost:%i" % free_tcp_port()
-        self.verified = False
-
-    def _close_all(self, event):
-        """Close the event's connection, then the listener."""
-        event.connection.close()
-        self.listener.close()
-
-    def on_start(self, event):
-        self.listener = event.container.listen(self.url)
-
-    def on_connection_opened(self, event):
-        event.connection.close()
-
-    def on_connection_opening(self, event):
-        assert event.connection.transport.user == "user@proton"
-        self.verified = True
-
-    def on_connection_closed(self, event):
-        self._close_all(event)
-
-    def on_connection_error(self, event):
-        self._close_all(event)
-
-class ContainerTest(Test):
-    """Test container subclass of reactor."""
-
-    def test_event_has_container_attribute(self):
-        ensureCanTestExtendedSASL()
-        class TestHandler(MessagingHandler):
-            def __init__(self):
-                super(TestHandler, self).__init__()
-                port = free_tcp_port()
-                self.url = "localhost:%i" % port
-
-            def on_start(self, event):
-                self.listener = event.container.listen(self.url)
-
-            def on_connection_closing(self, event):
-                event.connection.close()
-                self.listener.close()
-        test_handler = TestHandler()
-        container = Container(test_handler)
-        class ConnectionHandler(MessagingHandler):
-            def __init__(self):
-                super(ConnectionHandler, self).__init__()
-
-            def on_connection_opened(self, event):
-                event.connection.close()
-                assert event.container == event.reactor
-                assert event.container == container
-        container.connect(test_handler.url, handler=ConnectionHandler())
-        container.run()
-
-    def test_authentication_via_url(self):
-        ensureCanTestExtendedSASL()
-        test_handler = AuthenticationTestHandler()
-        container = Container(test_handler)
-        container.connect("%s:password@%s" % ("user%40proton", test_handler.url), reconnect=False)
-        container.run()
-        assert test_handler.verified
-
-    def test_authentication_via_container_attributes(self):
-        ensureCanTestExtendedSASL()
-        test_handler = AuthenticationTestHandler()
-        container = Container(test_handler)
-        container.user = "user@proton"
-        container.password = "password"
-        container.connect(test_handler.url, reconnect=False)
-        container.run()
-        assert test_handler.verified
-
-    def test_authentication_via_kwargs(self):
-        ensureCanTestExtendedSASL()
-        test_handler = AuthenticationTestHandler()
-        container = Container(test_handler)
-        container.connect(test_handler.url, user="user@proton", password="password", reconnect=False)
-        container.run()
-        assert test_handler.verified
-
-    class _ServerHandler(MessagingHandler):
-        """Listens on ``host`` and a free port; records the client address and
-        the connection's remote hostname when a connection opens."""
-
-        def __init__(self, host):
-            super(ContainerTest._ServerHandler, self).__init__()
-            self.host = host
-            # Reserve exactly one port; a second, discarded free_tcp_port()
-            # call used to precede this one.
-            self.port = free_tcp_port()
-            self.client_addr = None
-            self.peer_hostname = None
-
-        def on_start(self, event):
-            self.listener = event.container.listen("%s:%s" % (self.host, self.port))
-
-        def on_connection_opened(self, event):
-            self.client_addr = event.reactor.get_connection_address(event.connection)
-            self.peer_hostname = event.connection.remote_hostname
-
-        def on_connection_closing(self, event):
-            event.connection.close()
-            self.listener.close()
-
-    class _ClientHandler(MessagingHandler):
-        """Records the server address on open, then closes the connection."""
-
-        def __init__(self):
-            super(ContainerTest._ClientHandler, self).__init__()
-            self.server_addr = None
-
-        def on_connection_opened(self, event):
-            self.server_addr = event.reactor.get_connection_address(event.connection)
-            event.connection.close()
-
-    def _check_hostname(self, host):
-        """Connect a client to a server listening on ``host`` and assert both
-        sides saw the expected addresses and hostname."""
-        ensureCanTestExtendedSASL()
-        server_handler = ContainerTest._ServerHandler(host)
-        client_handler = ContainerTest._ClientHandler()
-        container = Container(server_handler)
-        container.connect(url=Url(host=host,
-                                  port=server_handler.port),
-                          handler=client_handler)
-        container.run()
-        assert server_handler.client_addr
-        assert client_handler.server_addr
-        assert server_handler.peer_hostname == host, server_handler.peer_hostname
-        assert client_handler.server_addr.rsplit(':', 1)[1] == str(server_handler.port)
-
-    def test_numeric_hostname(self):
-        self._check_hostname("127.0.0.1")
-
-    def test_non_numeric_hostname(self):
-        self._check_hostname("localhost")
-
-    def test_virtual_host(self):
-        ensureCanTestExtendedSASL()
-        server_handler = ContainerTest._ServerHandler("localhost")
-        container = Container(server_handler)
-        container.connect(url=Url(host="localhost",
-                                  port=server_handler.port),
-                          handler=ContainerTest._ClientHandler(),
-                          virtual_host="a.b.c.org")
-        container.run()
-        assert server_handler.peer_hostname == "a.b.c.org", server_handler.peer_hostname
-
-    def test_no_virtual_host(self):
-        # explicitly setting an empty virtual host should prevent the hostname
-        # field from being sent in the Open performative when using the
-        # Python Container.
-        server_handler = ContainerTest._ServerHandler("localhost")
-        container = Container(server_handler)
-        container.connect(url=Url(host="localhost",
-                                  port=server_handler.port),
-                          handler=ContainerTest._ClientHandler(),
-                          virtual_host="")
-        container.run()
-        assert server_handler.peer_hostname is None, server_handler.peer_hostname
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/9778eda8/tests/python/proton_tests/sasl.py
----------------------------------------------------------------------
diff --git a/tests/python/proton_tests/sasl.py b/tests/python/proton_tests/sasl.py
deleted file mode 100644
index 68ae200..0000000
--- a/tests/python/proton_tests/sasl.py
+++ /dev/null
@@ -1,587 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-from __future__ import absolute_import
-
-import sys, os
-from . import common
-from . import engine
-
-from proton import *
-from .common import pump, Skipped
-
-def _sslCertpath(file):
- """ Return the full path to the certificate, keyfile, etc.
- """
- if os.name=="nt":
- if file.find("private-key")!=-1:
- # The private key is not in a separate store
- return None
- # Substitute pkcs#12 equivalent for the CA/key store
- if file.endswith(".pem"):
- file = file[:-4] + ".p12"
- return os.path.join(os.path.dirname(__file__),
- "ssl_db/%s" % file)
-
-def _testSaslMech(self, mech, clientUser='user@proton', authUser='user@proton', encrypted=False, authenticated=True):
- self.s1.allowed_mechs(mech)
- self.c1.open()
- self.c2.open()
-
- pump(self.t1, self.t2, 1024)
-
- if encrypted is not None:
- assert self.t2.encrypted == encrypted, encrypted
- assert self.t1.encrypted == encrypted, encrypted
-
- assert self.t2.authenticated == authenticated, authenticated
- assert self.t1.authenticated == authenticated, authenticated
- if authenticated:
- # Server
- assert self.t2.user == authUser
- assert self.s2.user == authUser
- assert self.s2.mech == mech.strip()
- assert self.s2.outcome == SASL.OK, self.s2.outcome
- assert self.c2.state & Endpoint.LOCAL_ACTIVE and self.c2.state & Endpoint.REMOTE_ACTIVE,\
- "local_active=%s, remote_active=%s" % (self.c1.state & Endpoint.LOCAL_ACTIVE, self.c1.state & Endpoint.REMOTE_ACTIVE)
- # Client
- assert self.t1.user == clientUser
- assert self.s1.user == clientUser
- assert self.s1.mech == mech.strip()
- assert self.s1.outcome == SASL.OK, self.s1.outcome
- assert self.c1.state & Endpoint.LOCAL_ACTIVE and self.c1.state & Endpoint.REMOTE_ACTIVE,\
- "local_active=%s, remote_active=%s" % (self.c1.state & Endpoint.LOCAL_ACTIVE, self.c1.state & Endpoint.REMOTE_ACTIVE)
- else:
- # Server
- assert self.t2.user == None
- assert self.s2.user == None
- assert self.s2.outcome != SASL.OK, self.s2.outcome
- # Client
- assert self.t1.user == clientUser
- assert self.s1.user == clientUser
- assert self.s1.outcome != SASL.OK, self.s1.outcome
-
-class Test(common.Test):
- pass
-
-def consumeAllOuput(t):
- stops = 0
- while stops<1:
- out = t.peek(1024)
- l = len(out) if out else 0
- t.pop(l)
- if l <= 0:
- stops += 1
-
-class SaslTest(Test):
-
- def setUp(self):
- self.t1 = Transport()
- self.s1 = SASL(self.t1)
- self.t2 = Transport(Transport.SERVER)
- self.t2.max_frame_size = 65536
- self.s2 = SASL(self.t2)
-
- def pump(self):
- pump(self.t1, self.t2, 1024)
-
- # We have to generate the client frames manually because proton does not
- # generate pipelined SASL and AMQP frames together
- def testIllegalProtocolLayering(self):
- # Server
- self.s2.allowed_mechs('ANONYMOUS')
-
- c2 = Connection()
- self.t2.bind(c2)
-
- assert self.s2.outcome is None
-
- # Push client bytes into server
- self.t2.push(
- # SASL
- b'AMQP\x03\x01\x00\x00'
- # @sasl-init(65) [mechanism=:ANONYMOUS, initial-response=b"anonymous@fuschia"]
- b'\x00\x00\x002\x02\x01\x00\x00\x00SA\xd0\x00\x00\x00"\x00\x00\x00\x02\xa3\x09ANONYMOUS\xa0\x11anonymous@fuschia'
- # SASL (again illegally)
- b'AMQP\x03\x01\x00\x00'
- # @sasl-init(65) [mechanism=:ANONYMOUS, initial-response=b"anonymous@fuschia"]
- b'\x00\x00\x002\x02\x01\x00\x00\x00SA\xd0\x00\x00\x00"\x00\x00\x00\x02\xa3\x09ANONYMOUS\xa0\x11anonymous@fuschia'
- # AMQP
- b'AMQP\x00\x01\x00\x00'
- # @open(16) [container-id="", channel-max=1234]
- b'\x00\x00\x00!\x02\x00\x00\x00\x00S\x10\xd0\x00\x00\x00\x11\x00\x00\x00\x0a\xa1\x00@@`\x04\xd2@@@@@@'
- )
-
- consumeAllOuput(self.t2)
-
- assert self.t2.condition
- assert self.t2.closed
- assert not c2.state & Endpoint.REMOTE_ACTIVE
-
- def testPipelinedClient(self):
- # Server
- self.s2.allowed_mechs('ANONYMOUS')
-
- c2 = Connection()
- self.t2.bind(c2)
-
- assert self.s2.outcome is None
-
- # Push client bytes into server
- self.t2.push(
- # SASL
- b'AMQP\x03\x01\x00\x00'
- # @sasl-init(65) [mechanism=:ANONYMOUS, initial-response=b"anonymous@fuschia"]
- b'\x00\x00\x002\x02\x01\x00\x00\x00SA\xd0\x00\x00\x00"\x00\x00\x00\x02\xa3\x09ANONYMOUS\xa0\x11anonymous@fuschia'
- # AMQP
- b'AMQP\x00\x01\x00\x00'
- # @open(16) [container-id="", channel-max=1234]
- b'\x00\x00\x00!\x02\x00\x00\x00\x00S\x10\xd0\x00\x00\x00\x11\x00\x00\x00\x0a\xa1\x00@@`\x04\xd2@@@@@@'
- )
-
- consumeAllOuput(self.t2)
-
- assert not self.t2.condition
- assert self.s2.outcome == SASL.OK
- assert c2.state & Endpoint.REMOTE_ACTIVE
-
- def testPipelinedServer(self):
- # Client
- self.s1.allowed_mechs('ANONYMOUS')
-
- c1 = Connection()
- self.t1.bind(c1)
-
- assert self.s1.outcome is None
-
- # Push server bytes into client
- # Commented out lines in this test are where the client input processing doesn't
- # run after output processing even though there is input waiting
- self.t1.push(
- # SASL
- b'AMQP\x03\x01\x00\x00'
- # @sasl-mechanisms(64) [sasl-server-mechanisms=@PN_SYMBOL[:ANONYMOUS]]
- b'\x00\x00\x00\x1c\x02\x01\x00\x00\x00S@\xc0\x0f\x01\xe0\x0c\x01\xa3\tANONYMOUS'
- # @sasl-outcome(68) [code=0]
- b'\x00\x00\x00\x10\x02\x01\x00\x00\x00SD\xc0\x03\x01P\x00'
- # AMQP
- b'AMQP\x00\x01\x00\x00'
- # @open(16) [container-id="", channel-max=1234]
- b'\x00\x00\x00!\x02\x00\x00\x00\x00S\x10\xd0\x00\x00\x00\x11\x00\x00\x00\x0a\xa1\x00@@`\x04\xd2@@@@@@'
- )
-
- consumeAllOuput(self.t1)
-
- assert self.s1.outcome == SASL.OK
- assert c1.state & Endpoint.REMOTE_ACTIVE
-
- def testPipelined2(self):
- out1 = self.t1.peek(1024)
- self.t1.pop(len(out1))
- self.t2.push(out1)
-
- self.s2.allowed_mechs('ANONYMOUS')
- c2 = Connection()
- c2.open()
- self.t2.bind(c2)
-
- out2 = self.t2.peek(1024)
- self.t2.pop(len(out2))
- self.t1.push(out2)
-
- out1 = self.t1.peek(1024)
- assert len(out1) > 0
-
- def testFracturedSASL(self):
- """ PROTON-235
- """
- assert self.s1.outcome is None
-
- # self.t1.trace(Transport.TRACE_FRM)
-
- out = self.t1.peek(1024)
- self.t1.pop(len(out))
- self.t1.push(b"AMQP\x03\x01\x00\x00")
- out = self.t1.peek(1024)
- self.t1.pop(len(out))
- self.t1.push(b"\x00\x00\x00")
- out = self.t1.peek(1024)
- self.t1.pop(len(out))
-
- self.t1.push(b"6\x02\x01\x00\x00\x00S@\xc0\x29\x01\xe0\x26\x04\xa3\x05PLAIN\x0aDIGEST-MD5\x09ANONYMOUS\x08CRAM-MD5")
- out = self.t1.peek(1024)
- self.t1.pop(len(out))
- self.t1.push(b"\x00\x00\x00\x10\x02\x01\x00\x00\x00SD\xc0\x03\x01P\x00")
- out = self.t1.peek(1024)
- self.t1.pop(len(out))
- while out:
- out = self.t1.peek(1024)
- self.t1.pop(len(out))
-
- assert self.s1.outcome == SASL.OK, self.s1.outcome
-
- def test_singleton(self):
- """Verify that only a single instance of SASL can exist per Transport"""
- transport = Transport()
- attr = object()
- sasl1 = SASL(transport)
- sasl1.my_attribute = attr
- sasl2 = transport.sasl()
- sasl3 = SASL(transport)
- assert sasl1 == sasl2
- assert sasl1 == sasl3
- assert sasl1.my_attribute == attr
- assert sasl2.my_attribute == attr
- assert sasl3.my_attribute == attr
- transport = Transport()
- sasl1 = transport.sasl()
- sasl1.my_attribute = attr
- sasl2 = SASL(transport)
- assert sasl1 == sasl2
- assert sasl1.my_attribute == attr
- assert sasl2.my_attribute == attr
-
- def testSaslSkipped(self):
- """Verify that the server (with SASL) correctly handles a client without SASL"""
- self.t1 = Transport()
- self.t2.require_auth(False)
- self.pump()
- assert self.s2.outcome == None
- assert self.t2.condition == None
- assert self.t2.authenticated == False
- assert self.s1.outcome == None
- assert self.t1.condition == None
- assert self.t1.authenticated == False
-
- def testSaslSkippedFail(self):
- """Verify that the server (with SASL) correctly handles a client without SASL"""
- self.t1 = Transport()
- self.t2.require_auth(True)
- self.pump()
- assert self.s2.outcome == None
- assert self.t2.condition != None
- assert self.s1.outcome == None
- assert self.t1.condition != None
-
- def testMechNotFound(self):
- self.c1 = Connection()
- self.c1.open()
- self.t1.bind(self.c1)
- self.s1.allowed_mechs('IMPOSSIBLE')
-
- self.pump()
-
- assert self.t2.authenticated == False
- assert self.t1.authenticated == False
- assert self.s1.outcome != SASL.OK
- assert self.s2.outcome != SASL.OK
-
-class SASLMechTest(Test):
- def setUp(self):
- self.t1 = Transport()
- self.s1 = SASL(self.t1)
- self.t2 = Transport(Transport.SERVER)
- self.s2 = SASL(self.t2)
-
- self.c1 = Connection()
- self.c1.user = 'user@proton'
- self.c1.password = 'password'
- self.c1.hostname = 'localhost'
-
- self.c2 = Connection()
-
- def testANON(self):
- self.t1.bind(self.c1)
- self.t2.bind(self.c2)
- _testSaslMech(self, 'ANONYMOUS', authUser='anonymous')
-
- def testCRAMMD5(self):
- common.ensureCanTestExtendedSASL()
-
- self.t1.bind(self.c1)
- self.t2.bind(self.c2)
- _testSaslMech(self, 'CRAM-MD5')
-
- def testDIGESTMD5(self):
- common.ensureCanTestExtendedSASL()
-
- self.t1.bind(self.c1)
- self.t2.bind(self.c2)
- _testSaslMech(self, 'DIGEST-MD5')
-
- # PLAIN shouldn't work without encryption without special setting
- def testPLAINfail(self):
- common.ensureCanTestExtendedSASL()
-
- self.t1.bind(self.c1)
- self.t2.bind(self.c2)
- _testSaslMech(self, 'PLAIN', authenticated=False)
-
- # Client won't accept PLAIN even if offered by server without special setting
- def testPLAINClientFail(self):
- common.ensureCanTestExtendedSASL()
-
- self.s2.allow_insecure_mechs = True
- self.t1.bind(self.c1)
- self.t2.bind(self.c2)
- _testSaslMech(self, 'PLAIN', authenticated=False)
-
- # PLAIN will only work if both ends are specially set up
- def testPLAIN(self):
- common.ensureCanTestExtendedSASL()
-
- self.s1.allow_insecure_mechs = True
- self.s2.allow_insecure_mechs = True
- self.t1.bind(self.c1)
- self.t2.bind(self.c2)
- _testSaslMech(self, 'PLAIN')
-
-# SCRAM not supported before Cyrus SASL 2.1.26
-# so not universal and hence need a test for support
-# to keep it in tests.
-# def testSCRAMSHA1(self):
-# common.ensureCanTestExtendedSASL()
-#
-# self.t1.bind(self.c1)
-# self.t2.bind(self.c2)
-# _testSaslMech(self, 'SCRAM-SHA-1')
-
-def _sslConnection(domain, transport, connection):
- transport.bind(connection)
- ssl = SSL(transport, domain, None )
- return connection
-
-class SSLSASLTest(Test):
- def setUp(self):
- if not common.isSSLPresent():
- raise Skipped("No SSL libraries found.")
-
- self.server_domain = SSLDomain(SSLDomain.MODE_SERVER)
- self.client_domain = SSLDomain(SSLDomain.MODE_CLIENT)
-
- self.t1 = Transport()
- self.s1 = SASL(self.t1)
- self.t2 = Transport(Transport.SERVER)
- self.s2 = SASL(self.t2)
-
- self.c1 = Connection()
- self.c2 = Connection()
-
- def testSSLPlainSimple(self):
- if not SASL.extended():
- raise Skipped("Simple SASL server does not support PLAIN")
- common.ensureCanTestExtendedSASL()
-
- clientUser = 'user@proton'
- mech = 'PLAIN'
-
- self.c1.user = clientUser
- self.c1.password = 'password'
- self.c1.hostname = 'localhost'
-
- ssl1 = _sslConnection(self.client_domain, self.t1, self.c1)
- ssl2 = _sslConnection(self.server_domain, self.t2, self.c2)
-
- _testSaslMech(self, mech, encrypted=True)
-
- def testSSLPlainSimpleFail(self):
- if not SASL.extended():
- raise Skipped("Simple SASL server does not support PLAIN")
- common.ensureCanTestExtendedSASL()
-
- clientUser = 'usr@proton'
- mech = 'PLAIN'
-
- self.c1.user = clientUser
- self.c1.password = 'password'
- self.c1.hostname = 'localhost'
-
- ssl1 = _sslConnection(self.client_domain, self.t1, self.c1)
- ssl2 = _sslConnection(self.server_domain, self.t2, self.c2)
-
- _testSaslMech(self, mech, clientUser='usr@proton', encrypted=True, authenticated=False)
-
- def testSSLExternalSimple(self):
- if os.name=="nt":
- extUser = 'O=Client, CN=127.0.0.1'
- else:
- extUser = 'O=Client,CN=127.0.0.1'
- mech = 'EXTERNAL'
-
- self.server_domain.set_credentials(_sslCertpath("server-certificate.pem"),
- _sslCertpath("server-private-key.pem"),
- "server-password")
- self.server_domain.set_trusted_ca_db(_sslCertpath("ca-certificate.pem"))
- self.server_domain.set_peer_authentication(SSLDomain.VERIFY_PEER,
- _sslCertpath("ca-certificate.pem") )
- self.client_domain.set_credentials(_sslCertpath("client-certificate.pem"),
- _sslCertpath("client-private-key.pem"),
- "client-password")
- self.client_domain.set_trusted_ca_db(_sslCertpath("ca-certificate.pem"))
- self.client_domain.set_peer_authentication(SSLDomain.VERIFY_PEER)
-
- ssl1 = _sslConnection(self.client_domain, self.t1, self.c1)
- ssl2 = _sslConnection(self.server_domain, self.t2, self.c2)
-
- _testSaslMech(self, mech, clientUser=None, authUser=extUser, encrypted=True)
-
- def testSSLExternalSimpleFail(self):
- mech = 'EXTERNAL'
-
- self.server_domain.set_credentials(_sslCertpath("server-certificate.pem"),
- _sslCertpath("server-private-key.pem"),
- "server-password")
- self.server_domain.set_trusted_ca_db(_sslCertpath("ca-certificate.pem"))
- self.server_domain.set_peer_authentication(SSLDomain.VERIFY_PEER,
- _sslCertpath("ca-certificate.pem") )
- self.client_domain.set_trusted_ca_db(_sslCertpath("ca-certificate.pem"))
- self.client_domain.set_peer_authentication(SSLDomain.VERIFY_PEER)
-
- ssl1 = _sslConnection(self.client_domain, self.t1, self.c1)
- ssl2 = _sslConnection(self.server_domain, self.t2, self.c2)
-
- _testSaslMech(self, mech, clientUser=None, authUser=None, encrypted=None, authenticated=False)
-
-class SASLEventTest(engine.CollectorTest):
- def setUp(self):
- engine.CollectorTest.setUp(self)
- self.t1 = Transport()
- self.s1 = SASL(self.t1)
- self.t2 = Transport(Transport.SERVER)
- self.s2 = SASL(self.t2)
-
- self.c1 = Connection()
- self.c1.user = 'user@proton'
- self.c1.password = 'password'
- self.c1.hostname = 'localhost'
-
- self.c2 = Connection()
-
- self.collector = Collector()
-
- def testNormalAuthenticationClient(self):
- common.ensureCanTestExtendedSASL()
- self.c1.collect(self.collector)
- self.t1.bind(self.c1)
- self.t2.bind(self.c2)
- _testSaslMech(self, 'DIGEST-MD5')
- self.expect(Event.CONNECTION_INIT, Event.CONNECTION_BOUND,
- Event.CONNECTION_LOCAL_OPEN, Event.TRANSPORT,
- Event.CONNECTION_REMOTE_OPEN)
-
- def testNormalAuthenticationServer(self):
- common.ensureCanTestExtendedSASL()
- self.c2.collect(self.collector)
- self.t1.bind(self.c1)
- self.t2.bind(self.c2)
- _testSaslMech(self, 'DIGEST-MD5')
- self.expect(Event.CONNECTION_INIT, Event.CONNECTION_BOUND,
- Event.CONNECTION_LOCAL_OPEN, Event.TRANSPORT,
- Event.CONNECTION_REMOTE_OPEN)
-
- def testFailedAuthenticationClient(self):
- common.ensureCanTestExtendedSASL()
- clientUser = "usr@proton"
- self.c1.user = clientUser
- self.c1.collect(self.collector)
- self.t1.bind(self.c1)
- self.t2.bind(self.c2)
- _testSaslMech(self, 'DIGEST-MD5', clientUser=clientUser, authenticated=False)
- self.expect(Event.CONNECTION_INIT, Event.CONNECTION_BOUND,
- Event.CONNECTION_LOCAL_OPEN, Event.TRANSPORT,
- Event.TRANSPORT_ERROR,
- Event.TRANSPORT_TAIL_CLOSED,
- Event.TRANSPORT_HEAD_CLOSED, Event.TRANSPORT_CLOSED)
-
- def testFailedAuthenticationServer(self):
- common.ensureCanTestExtendedSASL()
- clientUser = "usr@proton"
- self.c1.user = clientUser
- self.c2.collect(self.collector)
- self.t1.bind(self.c1)
- self.t2.bind(self.c2)
- _testSaslMech(self, 'DIGEST-MD5', clientUser=clientUser, authenticated=False)
- self.expect(Event.CONNECTION_INIT, Event.CONNECTION_BOUND,
- Event.CONNECTION_LOCAL_OPEN, Event.TRANSPORT,
- Event.TRANSPORT_ERROR,
- Event.TRANSPORT_TAIL_CLOSED,
- Event.TRANSPORT_HEAD_CLOSED, Event.TRANSPORT_CLOSED)
-
- def testNoMechClient(self):
- common.ensureCanTestExtendedSASL()
- self.c1.collect(self.collector)
- self.s2.allowed_mechs('IMPOSSIBLE')
- self.t1.bind(self.c1)
- self.t2.bind(self.c2)
- _testSaslMech(self, 'DIGEST-MD5', authenticated=False)
- self.expect(Event.CONNECTION_INIT, Event.CONNECTION_BOUND,
- Event.CONNECTION_LOCAL_OPEN, Event.TRANSPORT,
- Event.TRANSPORT_ERROR,
- Event.TRANSPORT_TAIL_CLOSED, Event.TRANSPORT_HEAD_CLOSED, Event.TRANSPORT_CLOSED)
-
- def testNoMechServer(self):
- common.ensureCanTestExtendedSASL()
- self.c2.collect(self.collector)
- self.s2.allowed_mechs('IMPOSSIBLE')
- self.t1.bind(self.c1)
- self.t2.bind(self.c2)
- _testSaslMech(self, 'DIGEST-MD5', authenticated=False)
- self.expect(Event.CONNECTION_INIT, Event.CONNECTION_BOUND,
- Event.CONNECTION_LOCAL_OPEN, Event.TRANSPORT,
- Event.TRANSPORT_TAIL_CLOSED,
- Event.TRANSPORT_ERROR, Event.TRANSPORT_HEAD_CLOSED, Event.TRANSPORT_CLOSED)
-
- def testDisallowedMechClient(self):
- self.c1.collect(self.collector)
- self.t1.bind(self.c1)
- self.t2.bind(self.c2)
- _testSaslMech(self, 'IMPOSSIBLE', authenticated=False)
- self.expect(Event.CONNECTION_INIT, Event.CONNECTION_BOUND,
- Event.CONNECTION_LOCAL_OPEN, Event.TRANSPORT,
- Event.TRANSPORT_ERROR,
- Event.TRANSPORT_TAIL_CLOSED, Event.TRANSPORT_HEAD_CLOSED, Event.TRANSPORT_CLOSED)
-
- def testDisallowedMechServer(self):
- self.c2.collect(self.collector)
- self.t1.bind(self.c1)
- self.t2.bind(self.c2)
- _testSaslMech(self, 'IMPOSSIBLE', authenticated=False)
- self.expect(Event.CONNECTION_INIT, Event.CONNECTION_BOUND,
- Event.CONNECTION_LOCAL_OPEN, Event.TRANSPORT,
- Event.TRANSPORT_TAIL_CLOSED,
- Event.TRANSPORT_ERROR, Event.TRANSPORT_HEAD_CLOSED, Event.TRANSPORT_CLOSED)
-
- def testDisallowedPlainClient(self):
- self.c1.collect(self.collector)
- self.t1.bind(self.c1)
- self.t2.bind(self.c2)
- _testSaslMech(self, 'PLAIN', authenticated=False)
- self.expect(Event.CONNECTION_INIT, Event.CONNECTION_BOUND,
- Event.CONNECTION_LOCAL_OPEN, Event.TRANSPORT,
- Event.TRANSPORT_ERROR,
- Event.TRANSPORT_TAIL_CLOSED, Event.TRANSPORT_HEAD_CLOSED, Event.TRANSPORT_CLOSED)
-
- def testDisallowedPlainServer(self):
- self.c2.collect(self.collector)
- self.t1.bind(self.c1)
- self.t2.bind(self.c2)
- _testSaslMech(self, 'PLAIN', authenticated=False)
- self.expect(Event.CONNECTION_INIT, Event.CONNECTION_BOUND,
- Event.CONNECTION_LOCAL_OPEN, Event.TRANSPORT,
- Event.TRANSPORT_TAIL_CLOSED,
- Event.TRANSPORT_ERROR, Event.TRANSPORT_HEAD_CLOSED, Event.TRANSPORT_CLOSED)
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/9778eda8/tests/python/proton_tests/soak.py
----------------------------------------------------------------------
diff --git a/tests/python/proton_tests/soak.py b/tests/python/proton_tests/soak.py
deleted file mode 100644
index b7b521c..0000000
--- a/tests/python/proton_tests/soak.py
+++ /dev/null
@@ -1,334 +0,0 @@
-from __future__ import absolute_import
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-import os
-import sys
-from .common import Test, Skipped, free_tcp_ports, \
- MessengerReceiverC, MessengerSenderC, \
- MessengerReceiverValgrind, MessengerSenderValgrind, \
- ReactorReceiverC, ReactorSenderC, \
- ReactorReceiverValgrind, ReactorSenderValgrind, \
- isSSLPresent
-from proton import *
-
-#
-# Tests that run the apps
-#
-
-class AppTests(Test):
-
- def __init__(self, *args):
- Test.__init__(self, *args)
- self.is_valgrind = False
-
- def default(self, name, value, **kwargs):
- if self.is_valgrind:
- default = kwargs.get("valgrind", value)
- else:
- default = value
- return Test.default(self, name, default, **kwargs)
-
- @property
- def iterations(self):
- return int(self.default("iterations", 2, fast=1, valgrind=2))
-
- @property
- def send_count(self):
- return int(self.default("send_count", 17, fast=1, valgrind=2))
-
- @property
- def target_count(self):
- return int(self.default("target_count", 5, fast=1, valgrind=2))
-
- @property
- def send_batch(self):
- return int(self.default("send_batch", 7, fast=1, valgrind=2))
-
- @property
- def forward_count(self):
- return int(self.default("forward_count", 5, fast=1, valgrind=2))
-
- @property
- def port_count(self):
- return int(self.default("port_count", 3, fast=1, valgrind=2))
-
- @property
- def sender_count(self):
- return int(self.default("sender_count", 3, fast=1, valgrind=2))
-
- def valgrind_test(self):
- self.is_valgrind = True
-
- def setUp(self):
- self.senders = []
- self.receivers = []
-
- def tearDown(self):
- pass
-
- def _do_test(self, iterations=1):
- verbose = self.verbose
-
- for R in self.receivers:
- R.start( verbose )
-
- for j in range(iterations):
- for S in self.senders:
- S.start( verbose )
-
- for S in self.senders:
- S.wait()
- #print("SENDER OUTPUT:")
- #print( S.stdout() )
- assert S.status() == 0, ("Command '%s' failed status=%d: '%s' '%s'"
- % (str(S.cmdline()),
- S.status(),
- S.stdout(),
- S.stderr()))
-
- for R in self.receivers:
- R.wait()
- #print("RECEIVER OUTPUT")
- #print( R.stdout() )
- assert R.status() == 0, ("Command '%s' failed status=%d: '%s' '%s'"
- % (str(R.cmdline()),
- R.status(),
- R.stdout(),
- R.stderr()))
-
-#
-# Traffic passing tests based on the Messenger apps
-#
-
-class MessengerTests(AppTests):
-
- _timeout = 60
-
- def _ssl_check(self):
- if not isSSLPresent():
- raise Skipped("No SSL libraries found.")
- if os.name=="nt":
- raise Skipped("Windows SChannel lacks anonymous cipher support.")
-
- def __init__(self, *args):
- AppTests.__init__(self, *args)
-
- def _do_oneway_test(self, receiver, sender, domain="amqp"):
- """ Send N messages to a receiver.
- Parameters:
- iterations - repeat the senders this many times
- target_count = # of targets to send to.
- send_count = # messages sent to each target
- """
- iterations = self.iterations
- send_count = self.send_count
- target_count = self.target_count
-
- send_total = send_count * target_count
- receive_total = send_total * iterations
-
- port = free_tcp_ports()[0]
-
- receiver.subscriptions = ["%s://~0.0.0.0:%s" % (domain, port)]
- receiver.receive_count = receive_total
- receiver.timeout = MessengerTests._timeout
- self.receivers.append( receiver )
-
- sender.targets = ["%s://0.0.0.0:%s/X%d" % (domain, port, j) for j in range(target_count)]
- sender.send_count = send_total
- sender.timeout = MessengerTests._timeout
- self.senders.append( sender )
-
- self._do_test(iterations)
-
- def _do_echo_test(self, receiver, sender, domain="amqp"):
- """ Send N messages to a receiver, which responds to each.
- Parameters:
- iterations - repeat the senders this many times
- target_count - # targets to send to
- send_count = # messages sent to each target
- send_batch - wait for replies after this many messages sent
- """
- iterations = self.iterations
- send_count = self.send_count
- target_count = self.target_count
- send_batch = self.send_batch
-
- send_total = send_count * target_count
- receive_total = send_total * iterations
-
- port = free_tcp_ports()[0]
-
- receiver.subscriptions = ["%s://~0.0.0.0:%s" % (domain, port)]
- receiver.receive_count = receive_total
- receiver.send_reply = True
- receiver.timeout = MessengerTests._timeout
- self.receivers.append( receiver )
-
- sender.targets = ["%s://0.0.0.0:%s/%dY" % (domain, port, j) for j in range(target_count)]
- sender.send_count = send_total
- sender.get_reply = True
- sender.send_batch = send_batch
- sender.timeout = MessengerTests._timeout
- self.senders.append( sender )
-
- self._do_test(iterations)
-
- def _do_relay_test(self, receiver, relay, sender, domain="amqp"):
- """ Send N messages to a receiver, which replies to each and forwards
- each of them to different receiver.
- Parameters:
- iterations - repeat the senders this many times
- target_count - # targets to send to
- send_count = # messages sent to each target
- send_batch - wait for replies after this many messages sent
- forward_count - forward to this many targets
- """
- iterations = self.iterations
- send_count = self.send_count
- target_count = self.target_count
- send_batch = self.send_batch
- forward_count = self.forward_count
-
- send_total = send_count * target_count
- receive_total = send_total * iterations
-
- port = free_tcp_ports()[0]
-
- receiver.subscriptions = ["%s://~0.0.0.0:%s" % (domain, port)]
- receiver.receive_count = receive_total
- receiver.send_reply = True
- # forward to 'relay' - uses two links
- # ## THIS FAILS:
- # receiver.forwards = ["amqp://Relay/%d" % j for j in range(forward_count)]
- receiver.forwards = ["%s://Relay" % domain]
- receiver.timeout = MessengerTests._timeout
- self.receivers.append( receiver )
-
- relay.subscriptions = ["%s://0.0.0.0:%s" % (domain, port)]
- relay.name = "Relay"
- relay.receive_count = receive_total
- relay.timeout = MessengerTests._timeout
- self.receivers.append( relay )
-
- # send to 'receiver'
- sender.targets = ["%s://0.0.0.0:%s/X%dY" % (domain, port, j) for j in range(target_count)]
- sender.send_count = send_total
- sender.get_reply = True
- sender.timeout = MessengerTests._timeout
- self.senders.append( sender )
-
- self._do_test(iterations)
-
-
- def _do_star_topology_test(self, r_factory, s_factory, domain="amqp"):
- """
- A star-like topology, with a central receiver at the hub, and senders at
- the spokes. Each sender will connect to each of the ports the receiver is
- listening on. Each sender will then create N links per each connection.
- Each sender will send X messages per link, waiting for a response.
- Parameters:
- iterations - repeat the senders this many times
- port_count - # of ports the receiver will listen on. Each sender connects
- to all ports.
- sender_count - # of senders
- target_count - # of targets per connection
- send_count - # of messages sent to each target
- send_batch - # of messages to send before waiting for response
- """
- iterations = self.iterations
- port_count = self.port_count
- sender_count = self.sender_count
- target_count = self.target_count
- send_count = self.send_count
- send_batch = self.send_batch
-
- send_total = port_count * target_count * send_count
- receive_total = send_total * sender_count * iterations
-
- ports = free_tcp_ports(port_count)
-
- receiver = r_factory()
- receiver.subscriptions = ["%s://~0.0.0.0:%s" % (domain, port) for port in ports]
- receiver.receive_count = receive_total
- receiver.send_reply = True
- receiver.timeout = MessengerTests._timeout
- self.receivers.append( receiver )
-
- for i in range(sender_count):
- sender = s_factory()
- sender.targets = ["%s://0.0.0.0:%s/%d" % (domain, port, j) for port in ports for j in range(target_count)]
- sender.send_count = send_total
- sender.send_batch = send_batch
- sender.get_reply = True
- sender.timeout = MessengerTests._timeout
- self.senders.append( sender )
-
- self._do_test(iterations)
-
- def test_oneway_C(self):
- self._do_oneway_test(MessengerReceiverC(), MessengerSenderC())
-
- def test_oneway_C_SSL(self):
- self._ssl_check()
- self._do_oneway_test(MessengerReceiverC(), MessengerSenderC(), "amqps")
-
- def test_oneway_valgrind(self):
- self.valgrind_test()
- self._do_oneway_test(MessengerReceiverValgrind(), MessengerSenderValgrind())
-
- def test_echo_C(self):
- self._do_echo_test(MessengerReceiverC(), MessengerSenderC())
-
- def test_echo_C_SSL(self):
- self._ssl_check()
- self._do_echo_test(MessengerReceiverC(), MessengerSenderC(), "amqps")
-
- def test_echo_valgrind(self):
- self.valgrind_test()
- self._do_echo_test(MessengerReceiverValgrind(), MessengerSenderValgrind())
-
- def test_relay_C(self):
- self._do_relay_test(MessengerReceiverC(), MessengerReceiverC(), MessengerSenderC())
-
- def test_relay_C_SSL(self):
- self._ssl_check()
- self._do_relay_test(MessengerReceiverC(), MessengerReceiverC(), MessengerSenderC(), "amqps")
-
- def test_relay_valgrind(self):
- self.valgrind_test()
- self._do_relay_test(MessengerReceiverValgrind(), MessengerReceiverValgrind(), MessengerSenderValgrind())
-
- def test_star_topology_C(self):
- self._do_star_topology_test( MessengerReceiverC, MessengerSenderC )
-
- def test_star_topology_C_SSL(self):
- self._ssl_check()
- self._do_star_topology_test( MessengerReceiverC, MessengerSenderC, "amqps" )
-
- def test_star_topology_valgrind(self):
- self.valgrind_test()
- self._do_star_topology_test( MessengerReceiverValgrind, MessengerSenderValgrind )
-
- def test_oneway_reactor(self):
- self._do_oneway_test(ReactorReceiverC(), ReactorSenderC())
-
- def test_oneway_reactor_valgrind(self):
- self.valgrind_test()
- self._do_oneway_test(ReactorReceiverValgrind(), ReactorSenderValgrind())
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org