You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by jd...@apache.org on 2020/05/26 15:15:25 UTC
[qpid-proton] 02/02: PROTON-2220 [python] add tests for leak issues,
mostly in BlockingConnection
This is an automated email from the ASF dual-hosted git repository.
jdanek pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/qpid-proton.git
commit 8c2a08c432f87ba55012c81b17b5e68dce94dc96
Author: Jiri Danek <jd...@redhat.com>
AuthorDate: Fri Feb 21 11:29:03 2020 +0100
PROTON-2220 [python] add tests for leak issues, mostly in BlockingConnection
---
.travis.yml | 4 +-
python/CMakeLists.txt | 32 +++-
python/proton/_reactor.py | 2 +-
python/proton/_transport.py | 2 +-
..._PROTON_2116_blocking_connection_object_leak.py | 83 ++++++++++
python/tests/integration/certificates/ca.json | 16 ++
python/tests/integration/certificates/ca1.pem | 23 +++
python/tests/integration/certificates/ca2.pem | 23 +++
.../tests/integration/certificates/localhost.json | 20 +++
.../integration/certificates/localhost_ca1-key.pem | 27 ++++
.../integration/certificates/localhost_ca1.pem | 26 ++++
.../integration/certificates/localhost_ca2-key.pem | 27 ++++
.../integration/certificates/localhost_ca2.pem | 26 ++++
python/tests/integration/certificates/mkcerts.sh | 30 ++++
...st_PROTON_1709_application_event_object_leak.py | 96 ++++++++++++
...test_PROTON_1800_syncrequestresponse_fd_leak.py | 137 +++++++++++++++++
...TON_2111_container_ssl_ssldomain_object_leak.py | 168 +++++++++++++++++++++
..._PROTON_2116_blocking_connection_object_leak.py | 163 ++++++++++++++++++++
...test_PROTON_2121_blocking_connection_fd_leak.py | 159 +++++++++++++++++++
tests/lsan.supp | 5 +-
20 files changed, 1062 insertions(+), 7 deletions(-)
diff --git a/.travis.yml b/.travis.yml
index b51505d..30ef260 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -64,9 +64,9 @@ matrix:
- os: linux
dist: bionic
env:
- # python-test and python-tox-test segfault
+ # python-test, python-integration-test, and python-tox-test segfault
- QPID_PROTON_CMAKE_ARGS='-DRUNTIME_CHECK=tsan -DENABLE_TOX_TEST=OFF'
- - QPID_PROTON_CTEST_ARGS='-E python-test'
+ - QPID_PROTON_CTEST_ARGS="-E 'python-test|python-integration-test'"
- os: linux
env:
- QPID_PROTON_CMAKE_ARGS='-DCMAKE_BUILD_TYPE=Coverage'
diff --git a/python/CMakeLists.txt b/python/CMakeLists.txt
index 87057d8..710ac05 100644
--- a/python/CMakeLists.txt
+++ b/python/CMakeLists.txt
@@ -227,14 +227,15 @@ set (py_bin "${CMAKE_CURRENT_BINARY_DIR}")
set (py_dll "$<TARGET_FILE_DIR:_cproton>")
set (py_bld "$<TARGET_FILE_DIR:qpid-proton-core>") # For windows
set (py_tests "${py_src}/tests")
+set (tests_py "${py_src}/../tests/py")
set (py_path ${CMAKE_BINARY_DIR}/c/tools ${py_bld} $ENV{PATH})
-set (py_pythonpath ${py_tests} ${py_src} ${py_bin} ${py_dll} $ENV{PYTHONPATH})
+set (py_pythonpath ${py_tests} ${py_src} ${py_bin} ${py_dll} ${tests_py} $ENV{PYTHONPATH})
to_native_path ("${py_pythonpath}" py_pythonpath)
to_native_path ("${py_path}" py_path)
if (CMAKE_BUILD_TYPE MATCHES "Coverage")
- set (python_coverage_options -m coverage run)
+ set (python_coverage_options -m coverage run --parallel-mode)
endif(CMAKE_BUILD_TYPE MATCHES "Coverage")
pn_add_test(
@@ -247,6 +248,33 @@ pn_add_test(
COMMAND ${PYTHON_EXECUTABLE} ${python_coverage_options} -- "${py_tests}/proton-test")
set_tests_properties(python-test PROPERTIES PASS_REGULAR_EXPRESSION "Totals: .* 0 failed")
+if(PYTHON_VERSION_MAJOR EQUAL 2 AND PYTHON_VERSION_MINOR LESS 7)
+ execute_process(COMMAND "${PYTHON_EXECUTABLE}" "-c" "import unittest2"
+ RESULT_VARIABLE UNITTEST_MISSING
+ ERROR_QUIET OUTPUT_STRIP_TRAILING_WHITESPACE)
+ if(UNITTEST_MISSING)
+ message(WARNING "unittest2 is not installed. ***some unit tests cannot be run***\ntry 'pip install unittest2' to install unittest2")
+ else(UNITTEST_MISSING)
+ set(PYTHON_TEST_COMMAND "-m" "unittest2")
+ endif(UNITTEST_MISSING)
+else(PYTHON_VERSION_MAJOR EQUAL 2 AND PYTHON_VERSION_MINOR LESS 7)
+ set(PYTHON_TEST_COMMAND "-m" "unittest")
+endif(PYTHON_VERSION_MAJOR EQUAL 2 AND PYTHON_VERSION_MINOR LESS 7)
+
+if (PYTHON_TEST_COMMAND)
+ pn_add_test(
+ INTERPRETED
+ NAME python-integration-test
+ PREPEND_ENVIRONMENT
+ "PATH=${py_path}"
+ "PYTHONPATH=${py_pythonpath}"
+ "SASLPASSWD=${CyrusSASL_Saslpasswd_EXECUTABLE}"
+ COMMAND
+ ${PYTHON_EXECUTABLE}
+ ${python_coverage_options}
+ ${PYTHON_TEST_COMMAND} discover -v -s "${py_tests}/integration")
+endif(PYTHON_TEST_COMMAND)
+
check_python_module("tox" TOX_MODULE_FOUND)
if (NOT TOX_MODULE_FOUND)
message(STATUS "The tox tool is not available; skipping the python-tox-tests")
diff --git a/python/proton/_reactor.py b/python/proton/_reactor.py
index 21cf120..3a82e9b 100644
--- a/python/proton/_reactor.py
+++ b/python/proton/_reactor.py
@@ -456,7 +456,7 @@ class ApplicationEvent(EventBase):
try:
eventtype = self.TYPES[typename]
except KeyError:
- eventtype = EventType(typename)
+ eventtype = EventType(typename)
self.TYPES[typename] = eventtype
super(ApplicationEvent, self).__init__(eventtype)
self.clazz = PN_PYREF
diff --git a/python/proton/_transport.py b/python/proton/_transport.py
index 6383051..970ef2e 100644
--- a/python/proton/_transport.py
+++ b/python/proton/_transport.py
@@ -772,7 +772,7 @@ class SSLDomain(object):
:type key_file: ``str``
:param password: The password used to sign the key, else ``None`` if key is not
protected.
- :type password: ``str``
+ :type password: ``str`` or ``None``
:return: 0 on success
:rtype: ``int``
:raise: :exc:`SSLException` if there is any Proton error
diff --git a/python/tests/integration/broker_PROTON_2116_blocking_connection_object_leak.py b/python/tests/integration/broker_PROTON_2116_blocking_connection_object_leak.py
new file mode 100644
index 0000000..9f84d79
--- /dev/null
+++ b/python/tests/integration/broker_PROTON_2116_blocking_connection_object_leak.py
@@ -0,0 +1,83 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License
+#
+
+from __future__ import absolute_import
+from __future__ import division
+from __future__ import print_function
+
+import optparse
+import socket
+import sys
+import threading
+
+import cproton
+
+import proton.handlers
+import proton.reactor
+import proton.utils
+
+
+class Broker(proton.handlers.MessagingHandler):
+ def __init__(self, acceptor_url):
+ # type: (str) -> None
+ super(Broker, self).__init__()
+ self.acceptor_url = acceptor_url
+
+ self.acceptor = None
+ self._acceptor_opened_event = threading.Event()
+
+ def get_acceptor_sockname(self):
+ # type: () -> (str, int)
+ self._acceptor_opened_event.wait()
+ if hasattr(self.acceptor, '_selectable'): # proton 0.30.0+
+ sockname = self.acceptor._selectable._delegate.getsockname()
+ else: # works in proton 0.27.0
+ selectable = cproton.pn_cast_pn_selectable(self.acceptor._impl)
+ fd = cproton.pn_selectable_get_fd(selectable)
+ s = socket.fromfd(fd, socket.AF_INET, socket.SOCK_STREAM)
+ sockname = s.getsockname()
+ return sockname[:2]
+
+ def on_start(self, event):
+ self.acceptor = event.container.listen(self.acceptor_url)
+ self._acceptor_opened_event.set()
+
+ def on_link_opening(self, event):
+ if event.link.is_sender:
+ assert not event.link.remote_source.dynamic, "This cannot happen"
+ event.link.source.address = event.link.remote_source.address
+ elif event.link.remote_target.address:
+ event.link.target.address = event.link.remote_target.address
+
+
+def main():
+ parser = optparse.OptionParser()
+ parser.add_option("-b", dest="hostport", default="localhost:0", type="string",
+ help="port number to use")
+ options, args = parser.parse_args()
+
+ broker = Broker(options.hostport)
+ container = proton.reactor.Container(broker)
+ threading.Thread(target=container.run).start()
+ print("{0}:{1}".format(*broker.get_acceptor_sockname()))
+ sys.stdout.flush()
+
+
+if __name__ == '__main__':
+ main()
diff --git a/python/tests/integration/certificates/ca.json b/python/tests/integration/certificates/ca.json
new file mode 100644
index 0000000..f451bbc
--- /dev/null
+++ b/python/tests/integration/certificates/ca.json
@@ -0,0 +1,16 @@
+{
+ "CN": "Custom Widgets Root CA",
+ "key": {
+ "algo": "rsa",
+ "size": 2048
+ },
+ "names": [
+ {
+ "C": "GB",
+ "L": "London",
+ "O": "Custom Widgets",
+ "OU": "Custom Widgets Root CA",
+ "ST": "England"
+ }
+ ]
+}
diff --git a/python/tests/integration/certificates/ca1.pem b/python/tests/integration/certificates/ca1.pem
new file mode 100644
index 0000000..dabb913
--- /dev/null
+++ b/python/tests/integration/certificates/ca1.pem
@@ -0,0 +1,23 @@
+-----BEGIN CERTIFICATE-----
+MIID6DCCAtCgAwIBAgIUMwAUyXly2ZxEamQZ3JCxH0mhSBMwDQYJKoZIhvcNAQEL
+BQAwgYsxCzAJBgNVBAYTAkdCMRAwDgYDVQQIEwdFbmdsYW5kMQ8wDQYDVQQHEwZM
+b25kb24xFzAVBgNVBAoTDkN1c3RvbSBXaWRnZXRzMR8wHQYDVQQLExZDdXN0b20g
+V2lkZ2V0cyBSb290IENBMR8wHQYDVQQDExZDdXN0b20gV2lkZ2V0cyBSb290IENB
+MB4XDTIwMDUxMzA4NDQwMFoXDTI1MDUxMjA4NDQwMFowgYsxCzAJBgNVBAYTAkdC
+MRAwDgYDVQQIEwdFbmdsYW5kMQ8wDQYDVQQHEwZMb25kb24xFzAVBgNVBAoTDkN1
+c3RvbSBXaWRnZXRzMR8wHQYDVQQLExZDdXN0b20gV2lkZ2V0cyBSb290IENBMR8w
+HQYDVQQDExZDdXN0b20gV2lkZ2V0cyBSb290IENBMIIBIjANBgkqhkiG9w0BAQEF
+AAOCAQ8AMIIBCgKCAQEA1LjaAn2cpud0cOOx13jOHOfVU7aRRmb0KOdt1oyxoF+b
+XiCrzbb1o4PZDq0it/u4jHdmA8mWNbNPDOMgckPSBbcTlg4pjw97kH/cowvvb3d1
+zt8SH9u7XN8Y1CebdK83mrLlWeS7SK3WCzdhU67YgIKLqaC+3HspCpfCrxReYDP6
+mVh3j1xJ/z8t0y0LHCwy9zrTJwWKa0+YLObc2pfl7opH/Ak5DwfXS3Z+QHOQGZBc
+fHW5a9/nQn5NVE90RXP5cfcvCK0xBm+63jOB0TFO2jIlmCEGjgDDlvFgYLFm1E5P
+R3v8fwhkgvgHwI18bjpo1K/GNv027vJ2/MmpNGWulQIDAQABo0IwQDAOBgNVHQ8B
+Af8EBAMCAQYwDwYDVR0TAQH/BAUwAwEB/zAdBgNVHQ4EFgQUi6yfwd8BWMwuINn0
+gMoAg8OlXCUwDQYJKoZIhvcNAQELBQADggEBAI4PGGBP6AY0qYuUmb9xR0qJigBi
+J/Q0nT+VpYtH9lxyNM2DPuG2niu0vpVBtHQfcpVSlX/eGL1TjyckWipwKdRr+gw1
+L76L/dpia7cI051UppEYNUo4Fldg3C9NLfwwyPGb1FejbHtSjXAMP123Uy4hlO/B
+YskML8e75cB7T8hqWu8K7+hT3KlyOZRj7Zvb5eMKo+ugoXz1+Ywe/Nx1UpS9kiO8
+zhHc6Ckx/mteIcmeikxSkt51OhXqDzwO4H5Ed2LkHQQ4M22QldiLaMGePwsVegxe
+wYOwBwKDOB00j1kt0j2/+9JRy7wP5w1nTAFWPK90nYtjRSo8dThEjl1aHgs=
+-----END CERTIFICATE-----
diff --git a/python/tests/integration/certificates/ca2.pem b/python/tests/integration/certificates/ca2.pem
new file mode 100644
index 0000000..1ed10a6
--- /dev/null
+++ b/python/tests/integration/certificates/ca2.pem
@@ -0,0 +1,23 @@
+-----BEGIN CERTIFICATE-----
+MIID6DCCAtCgAwIBAgIUb0cAgS1G1j7OOcCbqjJLfZuOySowDQYJKoZIhvcNAQEL
+BQAwgYsxCzAJBgNVBAYTAkdCMRAwDgYDVQQIEwdFbmdsYW5kMQ8wDQYDVQQHEwZM
+b25kb24xFzAVBgNVBAoTDkN1c3RvbSBXaWRnZXRzMR8wHQYDVQQLExZDdXN0b20g
+V2lkZ2V0cyBSb290IENBMR8wHQYDVQQDExZDdXN0b20gV2lkZ2V0cyBSb290IENB
+MB4XDTIwMDUxMzA4NDQwMFoXDTI1MDUxMjA4NDQwMFowgYsxCzAJBgNVBAYTAkdC
+MRAwDgYDVQQIEwdFbmdsYW5kMQ8wDQYDVQQHEwZMb25kb24xFzAVBgNVBAoTDkN1
+c3RvbSBXaWRnZXRzMR8wHQYDVQQLExZDdXN0b20gV2lkZ2V0cyBSb290IENBMR8w
+HQYDVQQDExZDdXN0b20gV2lkZ2V0cyBSb290IENBMIIBIjANBgkqhkiG9w0BAQEF
+AAOCAQ8AMIIBCgKCAQEAt6RCKe0fb2hJDrUJ6Vn8kmSrYgNLN78XO4lUoMcWULm5
+zJb+cRxAsrge67oz+XWRQZcE3ief80gGWnLYEpmpxnzvdaGxVs0kqS8jhcD+BaqS
+XB4rcH5lo3PiipbQCbFlz5KimmWDUTkpIC12EHoan4i6yHttx+9LFhMm94ELMzzr
+NHC5d6ddY/+zIQxf6xz2Lqa2W3VqdOJvd7cGGl3Jjj1PQY4/rxUAFajzJ0ulGjdS
+Mv7jfLKPp4avIuie/mD+KOpB14V6UBWIFacKDKK701D44fj59rmHEloP5WtKq2/o
+SOnRtNpthoWwSBpDxOP54a8RmXt2ia9gEhnakcQEKwIDAQABo0IwQDAOBgNVHQ8B
+Af8EBAMCAQYwDwYDVR0TAQH/BAUwAwEB/zAdBgNVHQ4EFgQUv7p/bfq1Z0aLXBPF
+k5wKQdrG7b0wDQYJKoZIhvcNAQELBQADggEBAH09LUiRv6rJ3VXc2Xs9Ge8K1QMm
+z71jCLX0PjhBdG+KBSbLy6InHq9QuEI5plWJShXnmu9PNMTUyU5eH8izQy5hxUIU
+RyJ7KUj+MRHWp9GGbvuTbcT/xzTgOn8HspsBCYulynjo422/ZxrGc5IOwkfQy6t1
+ycPmmwgYlw2aTPmS3wdNiip+ArmGiyKTSjR84pnVCNxTJHnmdjojqXXXx5LBUuAk
+76dRIeUznGAtWUrMV1/25CxTUanvuJxWHxPZXV56bzMrP1goEvK0WfHCjOx0YfdJ
+dKQImr+qVwU5nHtR8qEaQxter53RVS0nSpNSbxKRpUCzqvFuahWKutQowdQ=
+-----END CERTIFICATE-----
diff --git a/python/tests/integration/certificates/localhost.json b/python/tests/integration/certificates/localhost.json
new file mode 100644
index 0000000..ee04408
--- /dev/null
+++ b/python/tests/integration/certificates/localhost.json
@@ -0,0 +1,20 @@
+{
+ "CN": "host.custom-widgets.com",
+ "key": {
+ "algo": "rsa",
+ "size": 2048
+ },
+ "names": [
+ {
+ "C": "GB",
+ "L": "London",
+ "O": "Custom Widgets",
+ "OU": "Custom Widgets Hosts",
+ "ST": "England"
+ }
+ ],
+ "hosts": [
+ "host1.custom-widgets.com",
+ "localhost"
+ ]
+}
diff --git a/python/tests/integration/certificates/localhost_ca1-key.pem b/python/tests/integration/certificates/localhost_ca1-key.pem
new file mode 100644
index 0000000..54d69d1
--- /dev/null
+++ b/python/tests/integration/certificates/localhost_ca1-key.pem
@@ -0,0 +1,27 @@
+-----BEGIN RSA PRIVATE KEY-----
+MIIEpAIBAAKCAQEAxMWRlQEsbIsRruUZfc/u3ENTBH4d8V68HJm1AEY6lSwMuiVo
+2DxdbaYUILAuAvsbNtA62NQaZlrmk4UA8pzGXxz8n/3bCZLZgHfqP6TXiObev5XB
+73mm/nRfPWnXUuyDYqbmlOHAlylQ63J3UcNgEyQvaBGF3VGa09YtyOnvQt/wmCrw
+LeBpUaNWAxKj23q+8uAeFD53V83teKRwLCaj63y0AlqIXDCrumvUlLVG+qXbZbrb
+PblYczOvsHalDG4qNPTvA/wugCk1qiZV6WqGlhkMUAg2bO88GWliq+Mrt44C8M1I
+RQ64NRWqZIxnuteVdaP2CJJ+dbmHIDlfQ8sYQQIDAQABAoIBABxvrNVKwR/CjUCo
+LSHobc15EUNB8pPSK/86G1U50PeD0ScJhvhQ6POSn7AFpfCaV1l5iDeTl2kZU98X
+xoZJL8XJC50Xc5MTnMkbC0g9S0SmkKRBQTCZdQ+qw6S9afe65FXaZtI3ObzepKoR
+9bpkMLszbIJundjZFTXt6tnxLLmOIHbw0rjRqIS53xwJZF4w1Wy1DncRLk4DorkH
+4/l3nM88Bym0ypIpmd+UfDpbwVukn9ioCPUdiGveA3kGwc9bgha6qqHIro3y39k3
+giN1wE9PUN3J3kDVgh/qCoSlqQUW6/h+CSShlqohhrHqi3ydEwbLcDP/crbGwbYh
+G5+xDn0CgYEA5BH4JrRqnFhouJ3br9ePGQD1Ox3jVKgZozv2m4Hd0oRA0wcHOIEw
+eOirZxL203kHj81NdUplwbRhSimUk/wog8AoyRJw15i3mL+tgxwQDF/oTLZamBoM
+LmNWA1a1lM82R8pt12qjyV5PhzLgFGQsJsA0vOhVm4ctRfks01JMuq8CgYEA3N5j
+nLIT1dQ26oqHhLDFaU1mkhXo0OpqLmDUE+61TvYQblrubc5KxeJPsPpFSK4jFHJm
+RdW+GV64CIQXwJ6aUqFSgQWlYxrDDzkJxmxficd8dFG7WTdna4VBVqog0De7JTQ2
+QdSITnOD3Wr/Pgjjt+jAa8Skm571jLstecjgWA8CgYEA0uVy7IeE1hJCtAT1MsNH
+xb1HB2V547yWCIXYYrBSKOq27uze1ndQFV5BsUyuBZszTNxxtfYX5mkgVe3hQH66
+ECrPDDALPLIxhAQrNMPsayT8sIMnfuMHRJYC4Y961aJO9U/RBpPL5Nda/xAieXiw
+Ax1VJyJIl0sGqF/j/X1rCm0CgYEAhXex9De8OsPxp4us1udHdBm8uNyagtyU64/B
+uIXQdHXHehhi6mH111yp0YV7Jq9sLWfwG5VNOeF+Dk9cVx7AnNw1khgKWDgM1X8f
+RBOrLAQrVdMqBoCvc07kK+3ExG5ZHeNOQjufXuD5N2z37tHKYhE5biY3Xn8RXUii
+82wK/csCgYAO741jsY2XTBsXZI9bZvswVByDHqeZV1lc5IGr1HKIBdM+bfQ6nIUu
+x8KPMQ73p55y9Tlrl6KIDO3hWz8iHJL74QSTteDwOFdQUZftwqktaao/CFA2JfPX
+RHYR3rXDe4mFus/fW7mGTGI3rUQdFff7WttzmraWTcSd2CAjQP+pVA==
+-----END RSA PRIVATE KEY-----
diff --git a/python/tests/integration/certificates/localhost_ca1.pem b/python/tests/integration/certificates/localhost_ca1.pem
new file mode 100644
index 0000000..71760b9
--- /dev/null
+++ b/python/tests/integration/certificates/localhost_ca1.pem
@@ -0,0 +1,26 @@
+-----BEGIN CERTIFICATE-----
+MIIEVjCCAz6gAwIBAgIUcMzG/wXG5sb8ZmTLMgSFgRGFZigwDQYJKoZIhvcNAQEL
+BQAwgYsxCzAJBgNVBAYTAkdCMRAwDgYDVQQIEwdFbmdsYW5kMQ8wDQYDVQQHEwZM
+b25kb24xFzAVBgNVBAoTDkN1c3RvbSBXaWRnZXRzMR8wHQYDVQQLExZDdXN0b20g
+V2lkZ2V0cyBSb290IENBMR8wHQYDVQQDExZDdXN0b20gV2lkZ2V0cyBSb290IENB
+MB4XDTIwMDUxMzA4NDQwMFoXDTIxMDUxMzA4NDQwMFowgYoxCzAJBgNVBAYTAkdC
+MRAwDgYDVQQIEwdFbmdsYW5kMQ8wDQYDVQQHEwZMb25kb24xFzAVBgNVBAoTDkN1
+c3RvbSBXaWRnZXRzMR0wGwYDVQQLExRDdXN0b20gV2lkZ2V0cyBIb3N0czEgMB4G
+A1UEAxMXaG9zdC5jdXN0b20td2lkZ2V0cy5jb20wggEiMA0GCSqGSIb3DQEBAQUA
+A4IBDwAwggEKAoIBAQDExZGVASxsixGu5Rl9z+7cQ1MEfh3xXrwcmbUARjqVLAy6
+JWjYPF1tphQgsC4C+xs20DrY1BpmWuaThQDynMZfHPyf/dsJktmAd+o/pNeI5t6/
+lcHveab+dF89addS7INipuaU4cCXKVDrcndRw2ATJC9oEYXdUZrT1i3I6e9C3/CY
+KvAt4GlRo1YDEqPber7y4B4UPndXze14pHAsJqPrfLQCWohcMKu6a9SUtUb6pdtl
+uts9uVhzM6+wdqUMbio09O8D/C6AKTWqJlXpaoaWGQxQCDZs7zwZaWKr4yu3jgLw
+zUhFDrg1FapkjGe615V1o/YIkn51uYcgOV9DyxhBAgMBAAGjgbAwga0wDgYDVR0P
+AQH/BAQDAgWgMB0GA1UdJQQWMBQGCCsGAQUFBwMBBggrBgEFBQcDAjAMBgNVHRMB
+Af8EAjAAMB0GA1UdDgQWBBShzVz/OHCGtHwpsi3p+KtE4NWLrTAfBgNVHSMEGDAW
+gBSLrJ/B3wFYzC4g2fSAygCDw6VcJTAuBgNVHREEJzAlghhob3N0MS5jdXN0b20t
+d2lkZ2V0cy5jb22CCWxvY2FsaG9zdDANBgkqhkiG9w0BAQsFAAOCAQEAr/75WZ0c
+8uqHkQazjKAAx8DBgahIorDZ3IAjsRhQEz3sgT6mrMrdOicpiSvApEGjau0jZAax
+QVWDB8e+A5HkItTt1mrSYolXhO2ROMjDKrthiw+XRwIY2AdqN2wVpAieRkOvqmQ+
+60StQQzm7Aoizt9nPu6Bv0OZp2fuC1d8W9RXFUI5bjHENYM80R5YshH5Ui/0dq1q
+tl7ImVYuqgbXBJ99WDqf6UfIzj9KowakXHXG7xkAFD7byKs+8QEGMTiF1w3eIBvx
+1XV3dR6PxMVxk5gJQVTBI60/PSYj+iREHYwl21kFOskJ1aQyaO21T8nPKAU2SztI
+4Eb/HvP6sTygEw==
+-----END CERTIFICATE-----
diff --git a/python/tests/integration/certificates/localhost_ca2-key.pem b/python/tests/integration/certificates/localhost_ca2-key.pem
new file mode 100644
index 0000000..3de440a
--- /dev/null
+++ b/python/tests/integration/certificates/localhost_ca2-key.pem
@@ -0,0 +1,27 @@
+-----BEGIN RSA PRIVATE KEY-----
+MIIEpAIBAAKCAQEAuohImiboo4N1ttlY39eiVQP6PoAMiNlD4NREG+9/s/liSQS4
+5NsjBoowU9Gt4XMQ2qR1t2LjE8+xBYR9xADvONRqWAbH0kpoDSJf/LAiLTdGBNES
+qZybOEDfyB8emp4vDJl1c3labM6nPianoaQbYvgO27Vhmm+pTBY6zeCVd8T1WwV4
+N3T3kOoRJrc05BZTBREGUhet4ST9ynXB/90+/O6SY4ufkBkxWqE4DUKfHJ7ZtLu7
+2hJcSE8N70UW9OyVngJRpBqOJTENLPgXgssB8NuiQkrUn2p1hjtothfTsU0p/KbU
+tX3Q/rTOuHLad+mLr+5p0cBTXE1oyHSwcc/lQQIDAQABAoIBAQCdHeIZhiB6amai
+p5yGnzkq5vjH4E+ujWLxY5oi9a7ZR3wUCRg8HKD6BzgBwiH2Ple58sD8wRyIOW7g
+OOM+T5MwiSsyDjQN24KdXZEAZNPExkQCFqDjoPfVgD9+b0f18Lusny4hH9ycvcx9
+O04DhxlSWaMaSUN3NAqj/i118G+1uw7WLT02LHpB9eDelBtyVRFnYoZpjD3ZozRh
+jbAYvGYYppy2JCfYhk1l3hqVzBzEcAzvN7nYpyT4Z3dFrcHuflL/S3u9JBIBFXI1
+Mwt0x789Iw1k66qmDwY5egrN2WTA3hzr7WTPFRbWDkjX1DLAer371oQ0tJk3Y+mB
+6zFg9EQJAoGBANq0c2NdCstCrYX7wMJhZygD2bmNnUxv3dj+By99ZOJ6VR9p6X6b
+fkA/lhGGhvJ030Rxee6ViZ301/y7FLHpoOXMFDfL0/JYuSS1NYLeKmgjq2tJM7KH
+hwGrdckCVbTIr8rhxuge3qLt9yyeQc1yDTROHBZ8g+TAcP0HN1/kgS4HAoGBANpX
+V88ukYjGjS+mJihtxqTxAOvclw/SpGnPfgtve1zUdR8uWToiT9egWLcm+XioK9W1
+gPW230DdZrniKkslp3JuLykUr1idRTRtYoHCiwIo9p+mrUUgWUy67fgJvKLrPEzQ
+fy6m9KGSPiTQkiX/VVwXVN8OOQTrgnxw/32XQIB3AoGAFTNi8CHn1vZavd8+u1kX
+1+AvrfYVZoB9n/hYF/lu2ymCsO3ibZyDK5U+ZeqFkGFV91uMt10VnxNKELzN78U3
+DK+w0gvXOunw4KcUTeBdegTjLB5Hfan3o2jMnTS1vDWsHN2wG3ZKnL62tEOPG2xP
+7V8ZB/EAFB+3lD+r1YbgIucCgYEAzR1miWTnJYXZVt0QVcSi64rY4bruUtgAysI5
+WAbX7mJM0Qkam6lmNlwVW6IKlNXvsCl9x9ePLgGQIqocL1JlVvO57C7Zdzbvr4vf
+EaWwL0xKO7s6ZYk5OwMU0YJcKvUwRb1M7Ye8oxietrkVwwp2pzfn7FARMtUIVV1W
+NybjWosCgYA5TbOP6EkYsVtGDiG40GQ2hxS52kn7Kk/iGbkwj/JZzH6AA++w522G
+dJWHsFE4xM4HPpIAiZ2KUzxRFWzuOloObFvFwS5Rq/E0tHw0t8JM4rShWS7RPPKx
+0v72TgbZs5kBmEpRQfXsC39hBQYOODzesJMlcLI1bZ2WTYJSE19xDA==
+-----END RSA PRIVATE KEY-----
diff --git a/python/tests/integration/certificates/localhost_ca2.pem b/python/tests/integration/certificates/localhost_ca2.pem
new file mode 100644
index 0000000..a06681c
--- /dev/null
+++ b/python/tests/integration/certificates/localhost_ca2.pem
@@ -0,0 +1,26 @@
+-----BEGIN CERTIFICATE-----
+MIIEVjCCAz6gAwIBAgIUPCIXPXv5xSh7Iv1TwzcIygddfrIwDQYJKoZIhvcNAQEL
+BQAwgYsxCzAJBgNVBAYTAkdCMRAwDgYDVQQIEwdFbmdsYW5kMQ8wDQYDVQQHEwZM
+b25kb24xFzAVBgNVBAoTDkN1c3RvbSBXaWRnZXRzMR8wHQYDVQQLExZDdXN0b20g
+V2lkZ2V0cyBSb290IENBMR8wHQYDVQQDExZDdXN0b20gV2lkZ2V0cyBSb290IENB
+MB4XDTIwMDUxMzA4NDQwMFoXDTIxMDUxMzA4NDQwMFowgYoxCzAJBgNVBAYTAkdC
+MRAwDgYDVQQIEwdFbmdsYW5kMQ8wDQYDVQQHEwZMb25kb24xFzAVBgNVBAoTDkN1
+c3RvbSBXaWRnZXRzMR0wGwYDVQQLExRDdXN0b20gV2lkZ2V0cyBIb3N0czEgMB4G
+A1UEAxMXaG9zdC5jdXN0b20td2lkZ2V0cy5jb20wggEiMA0GCSqGSIb3DQEBAQUA
+A4IBDwAwggEKAoIBAQC6iEiaJuijg3W22Vjf16JVA/o+gAyI2UPg1EQb73+z+WJJ
+BLjk2yMGijBT0a3hcxDapHW3YuMTz7EFhH3EAO841GpYBsfSSmgNIl/8sCItN0YE
+0RKpnJs4QN/IHx6ani8MmXVzeVpszqc+JqehpBti+A7btWGab6lMFjrN4JV3xPVb
+BXg3dPeQ6hEmtzTkFlMFEQZSF63hJP3KdcH/3T787pJji5+QGTFaoTgNQp8cntm0
+u7vaElxITw3vRRb07JWeAlGkGo4lMQ0s+BeCywHw26JCStSfanWGO2i2F9OxTSn8
+ptS1fdD+tM64ctp36Yuv7mnRwFNcTWjIdLBxz+VBAgMBAAGjgbAwga0wDgYDVR0P
+AQH/BAQDAgWgMB0GA1UdJQQWMBQGCCsGAQUFBwMBBggrBgEFBQcDAjAMBgNVHRMB
+Af8EAjAAMB0GA1UdDgQWBBRzuETlwXII+JW3OQcaIH/fsTW5CTAfBgNVHSMEGDAW
+gBS/un9t+rVnRotcE8WTnApB2sbtvTAuBgNVHREEJzAlghhob3N0MS5jdXN0b20t
+d2lkZ2V0cy5jb22CCWxvY2FsaG9zdDANBgkqhkiG9w0BAQsFAAOCAQEAoVrsvKXP
+eYQgCQj4w/BMfmYxIETtfJyd8BqP02olYwj4U2jSy3DoUbGvKVuHpxxXAiL8GKds
+nYKqjAle5HW4bZWhwyDkNKa+UiXQzJMOgizNwXHvm6FItZrZgCFf+nhSKZia3Dmg
+T8cHIX/bayiV6/EDqUn0DcB8tnseHPASEVwmaQD7mTD5x0MkAYF52JBsx4L33fcm
+eaO8jL22UVKPYaiUEZxmbLMMwFp4xFAJ6cX3QRMndmX/5jxhylfPVn4hIvlZn7hp
+iqj1S6RN/78eXymAwd/wy79l0hMWk3BD6fF8/k/g2QIBtH+2nAcU2Wdb74CYczP7
+dFdvVve4sIck8w==
+-----END CERTIFICATE-----
diff --git a/python/tests/integration/certificates/mkcerts.sh b/python/tests/integration/certificates/mkcerts.sh
new file mode 100644
index 0000000..2cdbde5
--- /dev/null
+++ b/python/tests/integration/certificates/mkcerts.sh
@@ -0,0 +1,30 @@
+#!/usr/bin/env bash
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+set -Eeuxo pipefail
+
+# Uses https://github.com/cloudflare/cfssl
+# it is bit more approachable than openssl
+
+cfssl gencert -initca ca.json | cfssljson -bare ca1
+cfssl gencert -ca ca1.pem -ca-key ca1-key.pem localhost.json | cfssljson -bare localhost_ca1
+
+cfssl gencert -initca ca.json | cfssljson -bare ca2
+cfssl gencert -ca ca2.pem -ca-key ca2-key.pem localhost.json | cfssljson -bare localhost_ca2
diff --git a/python/tests/integration/test_PROTON_1709_application_event_object_leak.py b/python/tests/integration/test_PROTON_1709_application_event_object_leak.py
new file mode 100644
index 0000000..6e6d26b
--- /dev/null
+++ b/python/tests/integration/test_PROTON_1709_application_event_object_leak.py
@@ -0,0 +1,96 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License
+#
+
+"""
+PROTON-1709 [python] ApplicationEvent causing memory growth
+"""
+
+from __future__ import absolute_import
+from __future__ import division
+from __future__ import print_function
+
+import platform
+import threading
+import gc
+
+import proton
+from proton.handlers import MessagingHandler
+from proton.reactor import Container, ApplicationEvent, EventInjector
+
+from test_unittest import unittest
+
+
+class Program(MessagingHandler):
+ def __init__(self, injector):
+ self.injector = injector
+ self.counter = 0
+ self.on_start_ = threading.Event()
+
+ def on_reactor_init(self, event):
+ event.reactor.selectable(self.injector)
+ self.on_start_.set()
+
+ def on_count_up(self, event):
+ self.counter += 1
+ gc.collect()
+
+ def on_done(self, event):
+ event.subject.stop()
+
+
+class Proton1709Test(unittest.TestCase):
+ @unittest.skipIf(platform.system() == 'Windows', "TODO jdanek: Test is broken on Windows")
+ def test_application_event_no_object_leaks(self):
+ event_types_count = len(proton.EventType.TYPES)
+
+ injector = EventInjector()
+ p = Program(injector)
+ c = Container(p)
+ t = threading.Thread(target=c.run)
+ t.start()
+
+ p.on_start_.wait()
+
+ object_counts = []
+
+ gc.collect()
+ object_counts.append(len(gc.get_objects()))
+
+ for i in range(100):
+ injector.trigger(ApplicationEvent("count_up"))
+
+ gc.collect()
+ object_counts.append(len(gc.get_objects()))
+
+ self.assertEqual(len(proton.EventType.TYPES), event_types_count + 1)
+
+ injector.trigger(ApplicationEvent("done", subject=c))
+ self.assertEqual(len(proton.EventType.TYPES), event_types_count + 2)
+
+ t.join()
+
+ gc.collect()
+ object_counts.append(len(gc.get_objects()))
+
+ self.assertEqual(p.counter, 100)
+
+ self.assertTrue(object_counts[1] - object_counts[0] <= 220,
+ "Object counts should not be increasing too fast: {0}".format(object_counts))
+ self.assertTrue(object_counts[2] - object_counts[0] <= 10,
+ "No objects should be leaking at the end: {0}".format(object_counts))
diff --git a/python/tests/integration/test_PROTON_1800_syncrequestresponse_fd_leak.py b/python/tests/integration/test_PROTON_1800_syncrequestresponse_fd_leak.py
new file mode 100644
index 0000000..e143f8c
--- /dev/null
+++ b/python/tests/integration/test_PROTON_1800_syncrequestresponse_fd_leak.py
@@ -0,0 +1,137 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+"""
+PROTON-1800 BlockingConnection descriptor leak
+"""
+from __future__ import absolute_import
+from __future__ import division
+from __future__ import print_function
+
+import contextlib
+import socket
+import uuid
+import gc
+import os
+import threading
+import subprocess
+from collections import namedtuple
+
+import cproton
+
+import proton
+import proton.reactor
+
+from proton import Message
+from proton.utils import SyncRequestResponse, BlockingConnection
+from proton.handlers import IncomingMessageHandler
+
+from test_unittest import unittest
+
+
+def count_fds():
+ # type: () -> int
+ return len(os.listdir('/proc/self/fd/'))
+
+
+@contextlib.contextmanager
+def no_fd_leaks(test):
+ # type: (unittest.TestCase) -> None
+ before = count_fds()
+ yield
+ delta = count_fds() - before
+ if delta != 0:
+ subprocess.check_call("ls -lF /proc/{0}/fd/".format(os.getpid()), shell=True)
+ test.assertEqual(0, delta, "Found {0} new fd(s) after the test".format(delta))
+
+
+class Broker(proton.handlers.MessagingHandler):
+ def __init__(self, acceptor_url):
+ # type: (str) -> None
+ super(Broker, self).__init__()
+ self.acceptor_url = acceptor_url
+
+ self.sender = None
+ self.acceptor = None
+ self._acceptor_opened_event = threading.Event()
+
+ def get_acceptor_sockname(self):
+ # type: () -> (str, int)
+ self._acceptor_opened_event.wait()
+ if hasattr(self.acceptor, '_selectable'): # proton 0.30.0+
+ sockname = self.acceptor._selectable._delegate.getsockname()
+ else: # works in proton 0.27.0
+ selectable = cproton.pn_cast_pn_selectable(self.acceptor._impl)
+ fd = cproton.pn_selectable_get_fd(selectable)
+ s = socket.fromfd(fd, socket.AF_INET, socket.SOCK_STREAM)
+ sockname = s.getsockname()
+ return sockname[:2]
+
+ def on_start(self, event):
+ self.acceptor = event.container.listen(self.acceptor_url)
+ self._acceptor_opened_event.set()
+
+ def on_link_opening(self, event):
+ if event.link.is_sender:
+ assert event.link.remote_source.dynamic
+ address = str(uuid.uuid4())
+ event.link.source.address = address
+ self.sender = event.link
+ elif event.link.remote_target.address:
+ event.link.target.address = event.link.remote_target.address
+
+ def on_message(self, event):
+ message = event.message
+ assert self.sender.source.address == message.reply_to
+ reply = proton.Message(body=message.body.upper(), correlation_id=message.correlation_id)
+ self.sender.send(reply)
+
+
+@contextlib.contextmanager
+def test_broker():
+ broker = Broker('localhost:0')
+ container = proton.reactor.Container(broker)
+ threading.Thread(target=container.run).start()
+
+ yield broker
+
+ container.stop()
+
+
+PROC_SELF_FD_EXISTS = os.path.exists("/proc/self/fd"), "Skipped: Directory /proc/self/fd does not exist"
+
+
+class Proton1800Test(unittest.TestCase):
+ @unittest.skipUnless(*PROC_SELF_FD_EXISTS)
+ def test_sync_request_response_blocking_connection_no_object_leaks(self):
+ with test_broker() as tb:
+ sockname = tb.get_acceptor_sockname()
+ url = "{0}:{1}".format(*sockname)
+ opts = namedtuple('Opts', ['address', 'timeout'])(address=url, timeout=3)
+
+ with no_fd_leaks(self):
+ client = SyncRequestResponse(
+ BlockingConnection(url, opts.timeout, allowed_mechs="ANONYMOUS"), "somequeue")
+ try:
+ request = "One Two Three Four"
+ response = client.call(Message(body=request))
+ finally:
+ client.connection.close()
+
+ gc.collect()
diff --git a/python/tests/integration/test_PROTON_2111_container_ssl_ssldomain_object_leak.py b/python/tests/integration/test_PROTON_2111_container_ssl_ssldomain_object_leak.py
new file mode 100644
index 0000000..b8a0f2a
--- /dev/null
+++ b/python/tests/integration/test_PROTON_2111_container_ssl_ssldomain_object_leak.py
@@ -0,0 +1,168 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License
+#
+
+"""
+PROTON-2111 python: memory leak on Container, SSL, and SSLDomain objects
+"""
+
+from __future__ import absolute_import
+from __future__ import division
+from __future__ import print_function
+
+import contextlib
+import platform
+
+import gc
+import os
+import socket
+import threading
+
+import cproton
+
+import proton.handlers
+import proton.utils
+import proton.reactor
+
+from test_unittest import unittest
+
+
+class Broker(proton.handlers.MessagingHandler):
+ def __init__(self, acceptor_url, ssl_domain=None):
+ # type: (str, proton.SSLDomain) -> None
+ super(Broker, self).__init__()
+ self.acceptor_url = acceptor_url
+ self.ssl_domain = ssl_domain
+
+ self.acceptor = None
+ self._acceptor_opened_event = threading.Event()
+
+ self.on_message_ = threading.Event()
+
+ def get_acceptor_sockname(self):
+ # type: () -> (str, int)
+ self._acceptor_opened_event.wait()
+ if hasattr(self.acceptor, '_selectable'): # proton 0.30.0+
+ sockname = self.acceptor._selectable._delegate.getsockname()
+ else: # works in proton 0.27.0
+ selectable = cproton.pn_cast_pn_selectable(self.acceptor._impl)
+ fd = cproton.pn_selectable_get_fd(selectable)
+ s = socket.fromfd(fd, socket.AF_INET, socket.SOCK_STREAM)
+ sockname = s.getsockname()
+ return sockname[:2]
+
+ def on_start(self, event):
+ self.acceptor = event.container.listen(self.acceptor_url, ssl_domain=self.ssl_domain)
+ self._acceptor_opened_event.set()
+
+ def on_link_opening(self, event):
+ link = event.link # type: proton.Link
+ if link.is_sender:
+ assert not link.remote_source.dynamic, "This cannot happen"
+ link.source.address = link.remote_source.address
+ elif link.remote_target.address:
+ link.target.address = link.remote_target.address
+
+ def on_message(self, event):
+ self.on_message_.set()
+
+
+@contextlib.contextmanager
+def test_broker(ssl_domain=None):
+ # type: (proton.SSLDomain) -> Broker
+ broker = Broker('localhost:0', ssl_domain=ssl_domain)
+ container = proton.reactor.Container(broker)
+ t = threading.Thread(target=container.run)
+ t.start()
+
+ yield broker
+
+ container.stop()
+ if broker.acceptor:
+ broker.acceptor.close()
+ t.join()
+
+
+class SampleSender(proton.handlers.MessagingHandler):
+ def __init__(self, msg_id, urls, ssl_domain=None, *args, **kwargs):
+ # type: (str, str, proton.SSLDomain, *object, **object) -> None
+ super(SampleSender, self).__init__(*args, **kwargs)
+ self.urls = urls
+ self.msg_id = msg_id
+ self.ssl_domain = ssl_domain
+
+ def on_start(self, event):
+ # type: (proton.Event) -> None
+ conn = event.container.connect(url=self.urls, reconnect=False, ssl_domain=self.ssl_domain)
+ event.container.create_sender(conn, target='topic://VirtualTopic.event')
+
+ def on_sendable(self, event):
+ msg = proton.Message(body={'msg-id': self.msg_id, 'name': 'python'})
+ event.sender.send(msg)
+ event.sender.close()
+ event.connection.close()
+
+ def on_connection_error(self, event):
+ print("on_error", event)
+
+
+class Proton2111Test(unittest.TestCase):
+ @unittest.skipIf(platform.system() == 'Windows', "TODO jdanek: Test is broken on Windows")
+ def test_send_message_ssl_no_object_leaks(self):
+ """Starts a broker with ssl acceptor, in a loop connects to it and sends message.
+
+ The test checks that number of Python objects is not increasing inside the loop.
+ """
+ cwd = os.path.dirname(__file__)
+ cert_file = os.path.join(cwd, 'certificates', 'localhost_ca1.pem')
+ key_file = os.path.join(cwd, 'certificates', 'localhost_ca1-key.pem')
+ certificate_db = os.path.join(cwd, 'certificates', 'ca1.pem')
+ password = None
+
+ broker_ssl_domain = proton.SSLDomain(proton.SSLDomain.MODE_SERVER)
+ broker_ssl_domain.set_credentials(cert_file, key_file, password=password)
+
+ client_ssl_domain = proton.SSLDomain(proton.SSLDomain.MODE_CLIENT)
+ client_ssl_domain.set_trusted_ca_db(certificate_db)
+ client_ssl_domain.set_peer_authentication(proton.SSLDomain.VERIFY_PEER)
+
+ # client_ssl_domain.set_peer_authentication(proton.SSLDomain.VERIFY_PEER_NAME)
+
+ def send_msg(msg_id, urls):
+ container = proton.reactor.Container(SampleSender(msg_id, urls, client_ssl_domain))
+ container.run()
+
+ with test_broker(ssl_domain=broker_ssl_domain) as broker:
+ urls = "amqps://{0}:{1}".format(*broker.get_acceptor_sockname())
+
+ gc.collect()
+ object_counts = []
+ for i in range(300):
+ send_msg(i + 1, urls)
+ broker.on_message_.wait() # message got through
+ broker.on_message_.clear()
+ gc.collect()
+ object_counts.append(len(gc.get_objects()))
+
+ # drop first few values, it is usually different (before counts settle)
+ object_counts = object_counts[2:]
+
+ diffs = [c - object_counts[0] for c in object_counts]
+ for diff in diffs:
+ # allow for random variation from initial value on some systems, but prohibit linear growth
+ self.assertTrue(diff <= 30, "Object counts should not be increasing: {0}".format(diffs))
diff --git a/python/tests/integration/test_PROTON_2116_blocking_connection_object_leak.py b/python/tests/integration/test_PROTON_2116_blocking_connection_object_leak.py
new file mode 100644
index 0000000..afe62db
--- /dev/null
+++ b/python/tests/integration/test_PROTON_2116_blocking_connection_object_leak.py
@@ -0,0 +1,163 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License
+#
+
+"""
+PROTON-2116 Memory leak in python client
+PROTON-2192 Memory leak in Python client on Windows
+"""
+
+from __future__ import absolute_import
+from __future__ import division
+from __future__ import print_function
+
+import platform
+import gc
+import logging
+import os
+import subprocess
+import sys
+import threading
+import time
+import uuid
+
+import proton.handlers
+import proton.reactor
+import proton.utils
+
+from test_unittest import unittest
+
+logger = logging.getLogger(__name__)
+
+
+class ReconnectingTestClient:
+ def __init__(self, hostport):
+ # type: (str) -> None
+ self.hostport = hostport
+
+ self.object_counts = []
+ self.done = threading.Event()
+
+ def count_objects(self, message):
+ # type: (str) -> None
+ gc.collect()
+ n = len(gc.get_objects())
+ if message == "loop":
+ self.object_counts.append(n)
+ logger.debug("Message %s, Count %d", message, n)
+
+ def run(self):
+ ADDR = "testing123"
+ HEARTBEAT = 5
+ SLEEP = 5
+
+ recv = None
+ conn = None
+ for _ in range(3):
+ subscribed = False
+ while not subscribed:
+ try:
+ conn = proton.utils.BlockingConnection(self.hostport, ssl_domain=None, heartbeat=HEARTBEAT)
+ recv = conn.create_receiver(ADDR, name=str(uuid.uuid4()), dynamic=False, options=None)
+ subscribed = True
+ except Exception as e:
+ logger.info("received exception %s on connect/subscribe, retry", e)
+ time.sleep(0.5)
+
+ self.count_objects("loop")
+ logger.debug("connected")
+ while subscribed:
+ try:
+ recv.receive(SLEEP)
+ except proton.Timeout:
+ pass
+ except Exception as e:
+ logger.info(e)
+ try:
+ recv.close()
+ recv = None
+ except:
+ self.count_objects("link close() failed")
+ pass
+ try:
+ conn.close()
+ conn = None
+ self.count_objects("conn closed")
+ except:
+ self.count_objects("conn close() failed")
+ pass
+ subscribed = False
+ self.done.set()
+
+
+class Proton2116Test(unittest.TestCase):
+ @unittest.skipIf(platform.system() == 'Windows', "PROTON-2192: The issue is not resolved on Windows")
+ def test_blocking_connection_object_leak(self):
+ """Kills and restarts broker repeatedly, while client is reconnecting.
+
+ The value of `gc.get_objects()` should not keep increasing in the client.
+
+ These are the automated reproduction steps for PROTON-2116"""
+ gc.collect()
+
+ thread = None
+ client = None
+
+ host_port = "" # random on first broker startup
+ broker_process = None
+
+ while not client or not client.done.is_set():
+ try:
+ params = []
+ if host_port:
+ params = ['-b', host_port]
+ cwd = os.path.dirname(__file__)
+ broker_process = subprocess.Popen(
+ args=[sys.executable,
+ os.path.join(cwd, 'broker_PROTON_2116_blocking_connection_object_leak.py')] + params,
+ stdout=subprocess.PIPE,
+ universal_newlines=True,
+ )
+ host_port = broker_process.stdout.readline()
+
+ if not client:
+ client = ReconnectingTestClient(host_port)
+ thread = threading.Thread(target=client.run)
+ thread.start()
+
+ time.sleep(3)
+ finally:
+ if broker_process:
+ broker_process.kill()
+ broker_process.wait()
+ broker_process.stdout.close()
+ time.sleep(0.3)
+
+ thread.join()
+
+ logger.info("client.object_counts:", client.object_counts)
+
+ # drop first value, it is usually different (before counts settle)
+ object_counts = client.object_counts[1:]
+
+ diffs = [c - object_counts[0] for c in object_counts]
+ self.assertEqual([0] * 2, diffs, "Object counts should not be increasing")
+
+
+if __name__ == '__main__':
+ unittest.main()
diff --git a/python/tests/integration/test_PROTON_2121_blocking_connection_fd_leak.py b/python/tests/integration/test_PROTON_2121_blocking_connection_fd_leak.py
new file mode 100644
index 0000000..954d679
--- /dev/null
+++ b/python/tests/integration/test_PROTON_2121_blocking_connection_fd_leak.py
@@ -0,0 +1,159 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License
+#
+
+"""
+PROTON-2121 python-qpid-proton 0.28 BlockingConnection leaks connections (does not close file descriptors)
+"""
+
+from __future__ import absolute_import
+from __future__ import division
+from __future__ import print_function
+
+import contextlib
+import socket
+import gc
+import os
+import subprocess
+import threading
+
+import cproton
+
+import proton.handlers
+import proton.utils
+import proton.reactor
+
+from test_unittest import unittest
+
+
+def count_fds():
+ # type: () -> int
+ return len(os.listdir('/proc/self/fd/'))
+
+
+@contextlib.contextmanager
+def no_fd_leaks(test):
+ # type: (unittest.TestCase) -> None
+ before = count_fds()
+ yield
+ delta = count_fds() - before
+ if delta != 0:
+ subprocess.check_call("ls -lF /proc/{0}/fd/".format(os.getpid()), shell=True)
+ test.assertEqual(0, delta, "Found {0} new fd(s) after the test".format(delta))
+
+
+class Broker(proton.handlers.MessagingHandler):
+ def __init__(self, acceptor_url):
+ # type: (str) -> None
+ super(Broker, self).__init__()
+ self.acceptor_url = acceptor_url
+
+ self.acceptor = None
+ self._acceptor_opened_event = threading.Event()
+
+ def get_acceptor_sockname(self):
+ # type: () -> (str, int)
+ self._acceptor_opened_event.wait()
+ if hasattr(self.acceptor, '_selectable'): # proton 0.30.0+
+ sockname = self.acceptor._selectable._delegate.getsockname()
+ else: # works in proton 0.27.0
+ selectable = cproton.pn_cast_pn_selectable(self.acceptor._impl)
+ fd = cproton.pn_selectable_get_fd(selectable)
+ s = socket.fromfd(fd, socket.AF_INET, socket.SOCK_STREAM)
+ sockname = s.getsockname()
+ return sockname[:2]
+
+ def on_start(self, event):
+ self.acceptor = event.container.listen(self.acceptor_url)
+ self._acceptor_opened_event.set()
+
+ def on_link_opening(self, event):
+ if event.link.is_sender:
+ assert not event.link.remote_source.dynamic, "This cannot happen"
+ event.link.source.address = event.link.remote_source.address
+ elif event.link.remote_target.address:
+ event.link.target.address = event.link.remote_target.address
+
+
+@contextlib.contextmanager
+def test_broker():
+ broker = Broker('localhost:0')
+ container = proton.reactor.Container(broker)
+ threading.Thread(target=container.run).start()
+
+ yield broker
+
+ container.stop()
+
+
+PROC_SELF_FD_EXISTS = os.path.exists("/proc/self/fd"), "Skipped: Directory /proc/self/fd does not exist"
+
+
+class BlockingConnectionFDLeakTests(unittest.TestCase):
+ @unittest.skipUnless(*PROC_SELF_FD_EXISTS)
+ @unittest.expectedFailure
+ def test_just_start_stop_test_broker(self):
+ with no_fd_leaks(self):
+ with test_broker() as broker:
+ broker.get_acceptor_sockname() # wait for acceptor to open
+
+ gc.collect()
+
+ @unittest.skipUnless(*PROC_SELF_FD_EXISTS)
+ @unittest.expectedFailure
+ def test_connection_close_all(self):
+ with no_fd_leaks(self):
+ with test_broker() as broker:
+ c = proton.utils.BlockingConnection("{0}:{1}".format(*broker.get_acceptor_sockname()))
+ c.close()
+
+ gc.collect()
+
+ @unittest.skipUnless(*PROC_SELF_FD_EXISTS)
+ def test_connection_close_all__do_not_check_test_broker(self):
+ with test_broker() as broker:
+ acceptor_sockname = broker.get_acceptor_sockname()
+ with no_fd_leaks(self):
+ c = proton.utils.BlockingConnection("{0}:{1}".format(*acceptor_sockname))
+ c.close()
+
+ gc.collect()
+
+ @unittest.skipUnless(*PROC_SELF_FD_EXISTS)
+ @unittest.expectedFailure
+ def test_connection_sender_close_all(self):
+ with no_fd_leaks(self):
+ with test_broker() as broker:
+ c = proton.utils.BlockingConnection("{0}:{1}".format(*broker.get_acceptor_sockname()))
+ s = c.create_sender("anAddress")
+ s.close()
+ c.close()
+
+ gc.collect()
+
+ @unittest.skipUnless(*PROC_SELF_FD_EXISTS)
+ @unittest.expectedFailure
+ def test_connection_receiver_close_all(self):
+ with no_fd_leaks(self):
+ with test_broker() as broker:
+ c = proton.utils.BlockingConnection("{0}:{1}".format(*broker.get_acceptor_sockname()))
+ s = c.create_receiver("anAddress")
+ s.close()
+ c.close()
+
+ gc.collect()
diff --git a/tests/lsan.supp b/tests/lsan.supp
index 06ff33d..e362e2e 100644
--- a/tests/lsan.supp
+++ b/tests/lsan.supp
@@ -99,6 +99,9 @@ leak:libpython
# Sasl 2 library
leak:^_plug_strdup$
+leak:libsasl2.so
+leak:libsasl2.so.2
+leak:sasl2/libanonymous.so
# /usr/sbin/saslpasswd2 binary
leak:saslpasswd2
@@ -109,4 +112,4 @@ leak:libdigestmd5.so
leak:bash
# ruby in ruby-example-test
-leak:libruby.so
\ No newline at end of file
+leak:libruby.so
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org