You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ch...@apache.org on 2019/04/22 15:32:14 UTC
[qpid-dispatch] branch master updated: DISPATCH-1230: Fix issues
with detecting TLSv1.3 in proton and OpenSSL
This is an automated email from the ASF dual-hosted git repository.
chug pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/qpid-dispatch.git
The following commit(s) were added to refs/heads/master by this push:
new 7838300 DISPATCH-1230: Fix issues with detecting TLSv1.3 in proton and OpenSSL
7838300 is described below
commit 783830026582fe7645442d564574d605e155e030
Author: Chuck Rolke <ch...@apache.org>
AuthorDate: Mon Apr 22 11:28:40 2019 -0400
DISPATCH-1230: Fix issues with detecting TLSv1.3 in proton and OpenSSL
Merge branch https://github.com/fgiogetti/fgiorgetti-DISPATCH-1230-2
This closes #487
Squashed commit of the following:
commit 3733d5337a673b3cf8a9a6904bef24616cf026ef
Author: Fernando Giorgetti <fg...@redhat.com>
Date: Thu Apr 18 12:14:54 2019 -0300
DISPATCH-1230 - Reading OpenSSL version from command line
commit 0d98c579bb47fb6af367d22478ed08d13c4f27ba
Author: Chuck Rolke <ch...@apache.org>
Date: Tue Apr 16 12:08:17 2019 -0400
DISPATCH-1230: TLSv1.3: detect Proton 1.3 support; 1.3-only test
commit d94af68ec1d7633abb9e93729f9d22d6402939a5
Author: Fernando Giorgetti <fg...@redhat.com>
Date: Tue Apr 9 12:02:57 2019 -0300
DISPATCH-1230 - Fixed issues with system_tests_ssl always allowing TLS1.2 with OpenSSL >= 1.1
---
tests/system_tests_ssl.py | 171 +++++++++++++++++++++++++++++++++++++---------
1 file changed, 138 insertions(+), 33 deletions(-)
diff --git a/tests/system_tests_ssl.py b/tests/system_tests_ssl.py
index feabfea..b5b2fd0 100644
--- a/tests/system_tests_ssl.py
+++ b/tests/system_tests_ssl.py
@@ -23,11 +23,13 @@ Provides tests related with allowed TLS protocol version restrictions.
import os
import ssl
import sys
+import re
from subprocess import Popen, PIPE
from qpid_dispatch.management.client import Node
from system_test import TestCase, main_module, Qdrouterd, DIR, SkipIfNeeded
-from proton import SASL, Url, SSLDomain
+from proton import SASL, Url, SSLDomain, SSLUnavailable
from proton.utils import BlockingConnection
+from distutils.version import StrictVersion
import proton
import cproton
import unittest2 as unittest
@@ -39,6 +41,7 @@ class RouterTestSslBase(TestCase):
"""
# If unable to determine which protocol versions are allowed system wide
DISABLE_SSL_TESTING = False
+ DISABLE_REASON = "Unable to determine MinProtocol"
@staticmethod
def ssl_file(name):
@@ -86,6 +89,7 @@ class RouterTestSslClient(RouterTestSslBase):
PORT_TLS1 = 0
PORT_TLS11 = 0
PORT_TLS12 = 0
+ PORT_TLS13 = 0
PORT_TLS1_TLS11 = 0
PORT_TLS1_TLS12 = 0
PORT_TLS11_TLS12 = 0
@@ -95,7 +99,26 @@ class RouterTestSslClient(RouterTestSslBase):
TIMEOUT = 3
# If using OpenSSL 1.1 or greater, TLSv1.2 is always being allowed
- OPENSSL_VER_1_1_GT = ssl.OPENSSL_VERSION_INFO[:2] >= (1, 1)
+ OPENSSL_OUT_VER = None
+ try:
+ OPENSSL_VER_1_1_GT = ssl.OPENSSL_VERSION_INFO[:2] >= (1, 1)
+ except AttributeError:
+ OPENSSL_VER_1_1_GT = False
+
+ # If still False, try getting it from "openssl version" (command output)
+ # The version from ssl.OPENSSL_VERSION_INFO reflects OpenSSL version in which
+ # Python was compiled with, not the one installed in the system.
+ if not OPENSSL_VER_1_1_GT:
+ print("Python libraries SSL Version < 1.1")
+ try:
+ p = Popen(['openssl', 'version'], stdout=PIPE, universal_newlines=True)
+ openssl_out = p.communicate()[0]
+ m = re.search('[0-9]+\.[0-9]+\.[0-9]+', openssl_out)
+ OPENSSL_OUT_VER = m.group(0)
+ OPENSSL_VER_1_1_GT = StrictVersion(OPENSSL_OUT_VER) >= StrictVersion('1.1')
+ print("OpenSSL Version found = %s" % OPENSSL_OUT_VER)
+ except:
+ pass
# Following variables define TLS versions allowed by openssl
OPENSSL_MIN_VER = 0
@@ -103,20 +126,53 @@ class RouterTestSslClient(RouterTestSslBase):
OPENSSL_ALLOW_TLSV1 = True
OPENSSL_ALLOW_TLSV1_1 = True
OPENSSL_ALLOW_TLSV1_2 = True
+ OPENSSL_ALLOW_TLSV1_3 = False
+
+ # Test if OpenSSL has TLSv1_3
+ OPENSSL_HAS_TLSV1_3 = False
+ if OPENSSL_VER_1_1_GT:
+ try:
+ ssl.TLSVersion.TLSv1_3
+ OPENSSL_HAS_TLSV1_3 = True
+ except:
+ pass
+
+ # Test if Proton supports TLSv1_3
+ try:
+ dummydomain = SSLDomain(SSLDomain.MODE_CLIENT)
+ PROTON_HAS_TLSV1_3 = cproton.PN_OK == cproton.pn_ssl_domain_set_protocols(dummydomain._domain, "TLSv1.3")
+ print("TLSV1_3? Proton has: %s, OpenSSL has: %s" % (PROTON_HAS_TLSV1_3, OPENSSL_HAS_TLSV1_3))
+ except SSLUnavailable:
+ PROTON_HAS_TLSV1_3 = False
# When using OpenSSL >= 1.1 and python >= 3.7, we can retrieve OpenSSL min and max protocols
if OPENSSL_VER_1_1_GT:
if sys.version_info >= (3, 7):
- OPENSSL_CTX = ssl.create_default_context()
- OPENSSL_MIN_VER = OPENSSL_CTX.minimum_version
- OPENSSL_MAX_VER = OPENSSL_CTX.maximum_version if OPENSSL_CTX.maximum_version > 0 else 9999
- OPENSSL_ALLOW_TLSV1 = OPENSSL_MIN_VER <= ssl.TLSVersion.TLSv1 <= OPENSSL_MAX_VER
- OPENSSL_ALLOW_TLSV1_1 = OPENSSL_MIN_VER <= ssl.TLSVersion.TLSv1_1 <= OPENSSL_MAX_VER
- OPENSSL_ALLOW_TLSV1_2 = OPENSSL_MIN_VER <= ssl.TLSVersion.TLSv1_2 <= OPENSSL_MAX_VER
+ if OPENSSL_HAS_TLSV1_3 and not PROTON_HAS_TLSV1_3:
+ # If OpenSSL has 1.3 but proton won't let us turn it on and off then
+ # this test fails because v1.3 runs unexpectedly.
+ RouterTestSslBase.DISABLE_SSL_TESTING = True
+ RouterTestSslBase.DISABLE_REASON = "Proton version does not support TLSv1.3 but OpenSSL does"
+ else:
+ OPENSSL_CTX = ssl.create_default_context()
+ OPENSSL_MIN_VER = OPENSSL_CTX.minimum_version
+ OPENSSL_MAX_VER = OPENSSL_CTX.maximum_version if OPENSSL_CTX.maximum_version > 0 else 9999
+ OPENSSL_ALLOW_TLSV1 = OPENSSL_MIN_VER <= ssl.TLSVersion.TLSv1 <= OPENSSL_MAX_VER
+ OPENSSL_ALLOW_TLSV1_1 = OPENSSL_MIN_VER <= ssl.TLSVersion.TLSv1_1 <= OPENSSL_MAX_VER
+ OPENSSL_ALLOW_TLSV1_2 = OPENSSL_MIN_VER <= ssl.TLSVersion.TLSv1_2 <= OPENSSL_MAX_VER
+ OPENSSL_ALLOW_TLSV1_3 = OPENSSL_HAS_TLSV1_3 and PROTON_HAS_TLSV1_3 \
+ and OPENSSL_MIN_VER <= ssl.TLSVersion.TLSv1_3 <= OPENSSL_MAX_VER
else:
# At this point we are not able to precisely determine what are the minimum and maximum
# TLS versions allowed in the system, so tests will be disabled
RouterTestSslBase.DISABLE_SSL_TESTING = True
+ RouterTestSslBase.DISABLE_REASON = "OpenSSL >= 1.1 but Python < 3.7 - Unable to determine MinProtocol"
+ else:
+ if OPENSSL_HAS_TLSV1_3 and not PROTON_HAS_TLSV1_3:
+ # If OpenSSL has 1.3 but proton won't let us turn it on and off then
+ # this test fails because v1.3 runs unexpectedly.
+ RouterTestSslBase.DISABLE_SSL_TESTING = True
+ RouterTestSslBase.DISABLE_REASON = "Proton version does not support TLSv1.3 but OpenSSL does"
@classmethod
def setUpClass(cls):
@@ -144,6 +200,7 @@ class RouterTestSslClient(RouterTestSslBase):
cls.PORT_TLS1 = cls.tester.get_port()
cls.PORT_TLS11 = cls.tester.get_port()
cls.PORT_TLS12 = cls.tester.get_port()
+ cls.PORT_TLS13 = cls.tester.get_port()
cls.PORT_TLS1_TLS11 = cls.tester.get_port()
cls.PORT_TLS1_TLS12 = cls.tester.get_port()
cls.PORT_TLS11_TLS12 = cls.tester.get_port()
@@ -272,6 +329,22 @@ class RouterTestSslClient(RouterTestSslBase):
'password': 'server-password'})
]
+ if cls.OPENSSL_ALLOW_TLSV1_3:
+ conf += [
+ # TLSv1.3 only
+ ('listener', {'host': '0.0.0.0', 'role': 'normal', 'port': cls.PORT_TLS13,
+ 'authenticatePeer': 'no',
+ 'sslProfile': 'ssl-profile-tls13'}),
+ # SSL Profile for TLSv1.3
+ ('sslProfile', {'name': 'ssl-profile-tls13',
+ 'caCertFile': cls.ssl_file('ca-certificate.pem'),
+ 'certFile': cls.ssl_file('server-certificate.pem'),
+ 'privateKeyFile': cls.ssl_file('server-private-key.pem'),
+ 'protocols': 'TLSv1.3',
+ 'password': 'server-password'})
+
+ ]
+
config = Qdrouterd.Config(conf)
cls.routers.append(cls.tester.qdrouterd("A", config, wait=False))
@@ -289,6 +362,10 @@ class RouterTestSslClient(RouterTestSslBase):
for proto in ['TLSv1', 'TLSv1.1', 'TLSv1.2']:
results.append(self.is_proto_allowed(listener_port, proto))
+ if self.OPENSSL_ALLOW_TLSV1_3:
+ results.append(self.is_proto_allowed(listener_port, 'TLSv1.3'))
+ else:
+ results.append(False)
return results
def is_proto_allowed(self, listener_port, tls_protocol):
@@ -314,6 +391,8 @@ class RouterTestSslClient(RouterTestSslBase):
return False
except proton.ConnectionException:
return False
+ except:
+ return False
# TLS version provided was accepted
connection.close()
@@ -366,75 +445,85 @@ class RouterTestSslClient(RouterTestSslBase):
:param expected_results:
:return:
"""
- (tlsv1, tlsv1_1, tlsv1_2) = expected_results
+ (tlsv1, tlsv1_1, tlsv1_2, tlsv1_3) = expected_results
return [self.OPENSSL_ALLOW_TLSV1 and tlsv1,
self.OPENSSL_ALLOW_TLSV1_1 and tlsv1_1,
- self.OPENSSL_VER_1_1_GT or (self.OPENSSL_ALLOW_TLSV1_2 and tlsv1_2)]
+ self.OPENSSL_ALLOW_TLSV1_2 and tlsv1_2,
+ self.OPENSSL_ALLOW_TLSV1_3 and tlsv1_3]
- @SkipIfNeeded(RouterTestSslBase.DISABLE_SSL_TESTING, "Unable to determine MinProtocol")
+ @SkipIfNeeded(RouterTestSslBase.DISABLE_SSL_TESTING, RouterTestSslBase.DISABLE_REASON)
def test_tls1_only(self):
"""
Expects TLSv1 only is allowed
"""
- self.assertEquals(self.get_expected_tls_result([True, False, False]),
+ self.assertEquals(self.get_expected_tls_result([True, False, False, False]),
self.get_allowed_protocols(self.PORT_TLS1))
- @SkipIfNeeded(RouterTestSslBase.DISABLE_SSL_TESTING, "Unable to determine MinProtocol")
+ @SkipIfNeeded(RouterTestSslBase.DISABLE_SSL_TESTING, RouterTestSslBase.DISABLE_REASON)
def test_tls11_only(self):
"""
Expects TLSv1.1 only is allowed
"""
- self.assertEquals(self.get_expected_tls_result([False, True, False]),
+ self.assertEquals(self.get_expected_tls_result([False, True, False, False]),
self.get_allowed_protocols(self.PORT_TLS11))
- @SkipIfNeeded(RouterTestSslBase.DISABLE_SSL_TESTING, "Unable to determine MinProtocol")
+ @SkipIfNeeded(RouterTestSslBase.DISABLE_SSL_TESTING, RouterTestSslBase.DISABLE_REASON)
def test_tls12_only(self):
"""
Expects TLSv1.2 only is allowed
"""
- self.assertEquals(self.get_expected_tls_result([False, False, True]),
+ self.assertEquals(self.get_expected_tls_result([False, False, True, False]),
self.get_allowed_protocols(self.PORT_TLS12))
- @SkipIfNeeded(RouterTestSslBase.DISABLE_SSL_TESTING, "Unable to determine MinProtocol")
+ @SkipIfNeeded(RouterTestSslBase.DISABLE_SSL_TESTING, RouterTestSslBase.DISABLE_REASON)
+ def test_tls13_only(self):
+ """
+ Expects TLSv1.3 only is allowed
+ """
+ self.assertEquals(self.get_expected_tls_result([False, False, False, True]),
+ self.get_allowed_protocols(self.PORT_TLS13))
+
+ @SkipIfNeeded(RouterTestSslBase.DISABLE_SSL_TESTING, RouterTestSslBase.DISABLE_REASON)
def test_tls1_tls11_only(self):
"""
Expects TLSv1 and TLSv1.1 only are allowed
"""
- self.assertEquals(self.get_expected_tls_result([True, True, False]),
+ self.assertEquals(self.get_expected_tls_result([True, True, False, False]),
self.get_allowed_protocols(self.PORT_TLS1_TLS11))
- @SkipIfNeeded(RouterTestSslBase.DISABLE_SSL_TESTING, "Unable to determine MinProtocol")
+ @SkipIfNeeded(RouterTestSslBase.DISABLE_SSL_TESTING, RouterTestSslBase.DISABLE_REASON)
def test_tls1_tls12_only(self):
"""
Expects TLSv1 and TLSv1.2 only are allowed
"""
- self.assertEquals(self.get_expected_tls_result([True, False, True]),
+ self.assertEquals(self.get_expected_tls_result([True, False, True, False]),
self.get_allowed_protocols(self.PORT_TLS1_TLS12))
- @SkipIfNeeded(RouterTestSslBase.DISABLE_SSL_TESTING, "Unable to determine MinProtocol")
+ @SkipIfNeeded(RouterTestSslBase.DISABLE_SSL_TESTING, RouterTestSslBase.DISABLE_REASON)
def test_tls11_tls12_only(self):
"""
Expects TLSv1.1 and TLSv1.2 only are allowed
"""
- self.assertEquals(self.get_expected_tls_result([False, True, True]),
+ self.assertEquals(self.get_expected_tls_result([False, True, True, False]),
self.get_allowed_protocols(self.PORT_TLS11_TLS12))
- @SkipIfNeeded(RouterTestSslBase.DISABLE_SSL_TESTING, "Unable to determine MinProtocol")
+ @SkipIfNeeded(RouterTestSslBase.DISABLE_SSL_TESTING, RouterTestSslBase.DISABLE_REASON)
def test_tls_all(self):
"""
- Expects all supported versions: TLSv1, TLSv1.1 and TLSv1.2 to be allowed
+ Expects all supported versions: TLSv1, TLSv1.1, TLSv1.2 and TLSv1.3 to be allowed
"""
- self.assertEquals(self.get_expected_tls_result([True, True, True]),
+ self.assertEquals(self.get_expected_tls_result([True, True, True, True]),
self.get_allowed_protocols(self.PORT_TLS_ALL))
- @SkipIfNeeded(RouterTestSslBase.DISABLE_SSL_TESTING, "Unable to determine MinProtocol")
+ @SkipIfNeeded(RouterTestSslBase.DISABLE_SSL_TESTING, RouterTestSslBase.DISABLE_REASON)
def test_ssl_invalid(self):
"""
Expects connection is rejected as SSL is no longer supported
"""
self.assertEqual(False, self.is_proto_allowed(self.PORT_SSL3, 'SSLv3'))
- @SkipIfNeeded(not SASL.extended(), "Cyrus library not available. skipping test")
+ @SkipIfNeeded(RouterTestSslBase.DISABLE_SSL_TESTING or not SASL.extended(),
+ "Cyrus library not available. skipping test")
def test_ssl_sasl_client_valid(self):
"""
Attempts to connect a Proton client using a valid SASL authentication info
@@ -444,10 +533,12 @@ class RouterTestSslClient(RouterTestSslBase):
if not SASL.extended():
self.skipTest("Cyrus library not available. skipping test")
- self.assertTrue(self.is_ssl_sasl_client_accepted(self.PORT_TLS_SASL, "TLSv1"))
- self.assertTrue(self.is_ssl_sasl_client_accepted(self.PORT_TLS_SASL, "TLSv1.2"))
+ exp_tls_results = self.get_expected_tls_result([True, False, True, False])
+ self.assertEqual(exp_tls_results[0], self.is_ssl_sasl_client_accepted(self.PORT_TLS_SASL, "TLSv1"))
+ self.assertEqual(exp_tls_results[2], self.is_ssl_sasl_client_accepted(self.PORT_TLS_SASL, "TLSv1.2"))
- @SkipIfNeeded(not SASL.extended(), "Cyrus library not available. skipping test")
+ @SkipIfNeeded(RouterTestSslBase.DISABLE_SSL_TESTING or not SASL.extended(),
+ "Cyrus library not available. skipping test")
def test_ssl_sasl_client_invalid(self):
"""
Attempts to connect a Proton client using a valid SASL authentication info
@@ -457,7 +548,8 @@ class RouterTestSslClient(RouterTestSslBase):
if not SASL.extended():
self.skipTest("Cyrus library not available. skipping test")
- self.assertFalse(self.is_ssl_sasl_client_accepted(self.PORT_TLS_SASL, "TLSv1.1"))
+ exp_tls_results = self.get_expected_tls_result([True, False, True, False])
+ self.assertEqual(exp_tls_results[1], self.is_ssl_sasl_client_accepted(self.PORT_TLS_SASL, "TLSv1.1"))
class RouterTestSslInterRouter(RouterTestSslBase):
@@ -657,7 +749,8 @@ class RouterTestSslInterRouter(RouterTestSslBase):
node.close()
return router_nodes
- @SkipIfNeeded(not SASL.extended(), "Cyrus library not available. skipping test")
+ @SkipIfNeeded(RouterTestSslBase.DISABLE_SSL_TESTING or not SASL.extended(),
+ "Cyrus library not available. skipping test")
def test_connected_tls_sasl_routers(self):
"""
Validates if all expected routers are connected in the network
@@ -670,7 +763,19 @@ class RouterTestSslInterRouter(RouterTestSslBase):
for node in router_nodes:
self.assertTrue(node in self.connected_tls_sasl_routers,
"%s should not be connected" % node)
- self.assertEqual(len(router_nodes), 4)
+
+ # Router A and B are always expected (no tls version restriction)
+ expected_nodes = len(self.connected_tls_sasl_routers)
+
+ # Router C only if TLSv1.2 is allowed
+ if not RouterTestSslClient.OPENSSL_ALLOW_TLSV1_2:
+ expected_nodes -= 1
+
+ # Router D only if TLSv1.1 is allowed
+ if not RouterTestSslClient.OPENSSL_ALLOW_TLSV1_1:
+ expected_nodes -= 1
+
+ self.assertEqual(len(router_nodes), expected_nodes)
if __name__ == '__main__':
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org