You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kg...@apache.org on 2012/12/04 22:41:53 UTC
svn commit: r1417204 -
/qpid/proton/branches/kgiusti-proton-136/tests/proton_tests/ssl.py
Author: kgiusti
Date: Tue Dec 4 21:41:52 2012
New Revision: 1417204
URL: http://svn.apache.org/viewvc?rev=1417204&view=rev
Log:
PROTON-136: rework the SSL tests to use the new domain api.
Modified:
qpid/proton/branches/kgiusti-proton-136/tests/proton_tests/ssl.py
Modified: qpid/proton/branches/kgiusti-proton-136/tests/proton_tests/ssl.py
URL: http://svn.apache.org/viewvc/qpid/proton/branches/kgiusti-proton-136/tests/proton_tests/ssl.py?rev=1417204&r1=1417203&r2=1417204&view=diff
==============================================================================
--- qpid/proton/branches/kgiusti-proton-136/tests/proton_tests/ssl.py (original)
+++ qpid/proton/branches/kgiusti-proton-136/tests/proton_tests/ssl.py Tue Dec 4 21:41:52 2012
@@ -30,29 +30,39 @@ class SslTest(common.Test):
def setup(self):
try:
- self.t_server = Transport()
- self.server = SSL(self.t_server)
- self.server.init(SSL.MODE_SERVER)
- self.t_client = Transport()
- self.client = SSL(self.t_client)
- self.client.init(SSL.MODE_CLIENT)
+ self.server_domain = SSLDomain(SSL.MODE_SERVER)
+ self.client_domain = SSLDomain(SSL.MODE_CLIENT)
except SSLUnavailable, e:
raise Skipped(e)
def teardown(self):
- self.t_client = None
- self.t_server = None
+ self.server_domain = None
+ self.client_domain = None
- def _pump(self):
- self._pump2(self.t_client, self.t_server)
+ class SslTestConnection(object):
+ """ Represents a single SSL connection.
+ """
+ def __init__(self, domain=None):
+ try:
+ self.ssl = None
+ self.domain = domain
+ self.transport = Transport()
+ self.connection = Connection()
+ self.transport.bind(self.connection)
+ if domain:
+ self.ssl = SSL( self.transport, self.domain )
+ except SSLUnavailable, e:
+ raise Skipped(e)
- def _pump2(self, client, server):
+ def _pump(self, ssl1, ssl2):
+ """ Allow two SslTestConnections to transfer data until done.
+ """
while True:
- out_client = client.output(1024)
- out_server = server.output(1024)
- if out_client: server.input(out_client)
- if out_server: client.input(out_server)
- if not out_client and not out_server: break
+ out1 = ssl1.transport.output(1024)
+ out2 = ssl2.transport.output(1024)
+ if out1: ssl2.transport.input(out1)
+ if out2: ssl1.transport.input(out2)
+ if not out1 and not out2: break
def _testpath(self, file):
""" Set the full path to the certificate,keyfile, etc. for the test.
@@ -64,370 +74,375 @@ class SslTest(common.Test):
""" By default, both the server and the client support anonymous
ciphers - they should connect without need for a certificate.
"""
- client_conn = Connection()
- self.t_client.bind(client_conn)
- server_conn = Connection()
- self.t_server.bind(server_conn)
+ server = SslTest.SslTestConnection( self.server_domain )
+ client = SslTest.SslTestConnection( self.client_domain )
# check that no SSL connection exists
- assert not self.server.cipher_name()
- assert not self.client.protocol_name()
+ assert not server.ssl.cipher_name()
+ assert not client.ssl.protocol_name()
+
+ #client.transport.trace(Transport.TRACE_DRV)
+ #server.transport.trace(Transport.TRACE_DRV)
- client_conn.open()
- server_conn.open()
- self._pump()
+ client.connection.open()
+ server.connection.open()
+ self._pump( client, server )
# now SSL should be active
- assert self.server.cipher_name() is not None
- assert self.client.protocol_name() is not None
+ assert server.ssl.cipher_name() is not None
+ assert client.ssl.protocol_name() is not None
- client_conn.close()
- server_conn.close()
- self._pump()
+ client.connection.close()
+ server.connection.close()
+ self._pump( client, server )
def test_server_certificate(self):
""" Test that anonymous clients can still connect to a server that has
a certificate configured.
"""
- self.server.set_credentials(self._testpath("server-certificate.pem"),
- self._testpath("server-private-key.pem"),
- "server-password")
- client_conn = Connection()
- self.t_client.bind(client_conn)
- server_conn = Connection()
- self.t_server.bind(server_conn)
- client_conn.open()
- server_conn.open()
- self._pump()
- assert self.client.protocol_name() is not None
- client_conn.close()
- server_conn.close()
- self._pump()
+ self.server_domain.set_credentials(self._testpath("server-certificate.pem"),
+ self._testpath("server-private-key.pem"),
+ "server-password")
+ server = SslTest.SslTestConnection( self.server_domain )
+ client = SslTest.SslTestConnection( self.client_domain )
+
+ client.connection.open()
+ server.connection.open()
+ self._pump( client, server )
+ assert client.ssl.protocol_name() is not None
+ client.connection.close()
+ server.connection.close()
+ self._pump( client, server )
def test_server_authentication(self):
""" Simple SSL connection with authentication of the server
"""
- self.server.set_credentials(self._testpath("server-certificate.pem"),
- self._testpath("server-private-key.pem"),
- "server-password")
-
- self.client.set_trusted_ca_db(self._testpath("ca-certificate.pem"))
- self.client.set_peer_authentication( SSL.VERIFY_PEER )
-
- client_conn = Connection()
- self.t_client.bind(client_conn)
- server_conn = Connection()
- self.t_server.bind(server_conn)
- client_conn.open()
- server_conn.open()
- self._pump()
- assert self.client.protocol_name() is not None
- client_conn.close()
- server_conn.close()
- self._pump()
+ self.server_domain.set_credentials(self._testpath("server-certificate.pem"),
+ self._testpath("server-private-key.pem"),
+ "server-password")
+
+ self.client_domain.set_trusted_ca_db(self._testpath("ca-certificate.pem"))
+ self.client_domain.set_default_peer_authentication( SSL.VERIFY_PEER )
+
+ server = SslTest.SslTestConnection( self.server_domain )
+ client = SslTest.SslTestConnection( self.client_domain )
+
+ client.connection.open()
+ server.connection.open()
+ self._pump( client, server )
+ assert client.ssl.protocol_name() is not None
+ client.connection.close()
+ server.connection.close()
+ self._pump( client, server )
def test_client_authentication(self):
""" Force the client to authenticate.
"""
# note: when requesting client auth, the server _must_ send its
# certificate, so make sure we configure one!
- self.server.set_credentials(self._testpath("server-certificate.pem"),
- self._testpath("server-private-key.pem"),
- "server-password")
- self.server.set_trusted_ca_db(self._testpath("ca-certificate.pem"))
- self.server.set_peer_authentication( SSL.VERIFY_PEER,
- self._testpath("ca-certificate.pem") )
+ self.server_domain.set_credentials(self._testpath("server-certificate.pem"),
+ self._testpath("server-private-key.pem"),
+ "server-password")
+ self.server_domain.set_trusted_ca_db(self._testpath("ca-certificate.pem"))
+ server = SslTest.SslTestConnection( self.server_domain )
+ server.ssl.set_peer_authentication( SSL.VERIFY_PEER,
+ self._testpath("ca-certificate.pem") )
# give the client a certificate, but let's not require server authentication
- self.client.set_credentials(self._testpath("client-certificate.pem"),
- self._testpath("client-private-key.pem"),
- "client-password")
- self.client.set_peer_authentication( SSL.ANONYMOUS_PEER )
-
- client_conn = Connection()
- self.t_client.bind(client_conn)
- server_conn = Connection()
- self.t_server.bind(server_conn)
- client_conn.open()
- server_conn.open()
- self._pump()
- assert self.client.protocol_name() is not None
- client_conn.close()
- server_conn.close()
- self._pump()
+ self.client_domain.set_credentials(self._testpath("client-certificate.pem"),
+ self._testpath("client-private-key.pem"),
+ "client-password")
+ client = SslTest.SslTestConnection( self.client_domain )
+ client.ssl.set_peer_authentication( SSL.ANONYMOUS_PEER )
+
+ client.connection.open()
+ server.connection.open()
+ self._pump( client, server )
+ assert client.ssl.protocol_name() is not None
+ client.connection.close()
+ server.connection.close()
+ self._pump( client, server )
+ def test_client_authentication_fail_bad_cert(self):
+ """ Ensure that the server can detect a bad client certificate.
+ """
+ # note: when requesting client auth, the server _must_ send its
+ # certificate, so make sure we configure one!
+ self.server_domain.set_credentials(self._testpath("server-certificate.pem"),
+ self._testpath("server-private-key.pem"),
+ "server-password")
+ self.server_domain.set_trusted_ca_db(self._testpath("ca-certificate.pem"))
+ server = SslTest.SslTestConnection( self.server_domain )
+ server.ssl.set_peer_authentication( SSL.VERIFY_PEER,
+ self._testpath("ca-certificate.pem") )
+
+ self.client_domain.set_credentials(self._testpath("bad-server-certificate.pem"),
+ self._testpath("bad-server-private-key.pem"),
+ "server-password")
+ client = SslTest.SslTestConnection( self.client_domain )
+ client.ssl.set_peer_authentication( SSL.ANONYMOUS_PEER )
+
+ client.connection.open()
+ server.connection.open()
+ try:
+ self._pump( client, server )
+ assert False, "Server failed to reject bad certificate."
+ except TransportException, e:
+ pass
+
+ def test_client_authentication_fail_no_cert(self):
+ """ Ensure that the server will fail a client that does not provide a
+ certificate.
+ """
+ # note: when requesting client auth, the server _must_ send its
+ # certificate, so make sure we configure one!
+ self.server_domain.set_credentials(self._testpath("server-certificate.pem"),
+ self._testpath("server-private-key.pem"),
+ "server-password")
+ self.server_domain.set_trusted_ca_db(self._testpath("ca-certificate.pem"))
+ server = SslTest.SslTestConnection( self.server_domain )
+ server.ssl.set_peer_authentication( SSL.VERIFY_PEER,
+ self._testpath("ca-certificate.pem") )
+
+ client = SslTest.SslTestConnection( self.client_domain )
+ client.ssl.set_peer_authentication( SSL.ANONYMOUS_PEER )
+
+ client.connection.open()
+ server.connection.open()
+ try:
+ self._pump( client, server )
+ assert False, "Server failed to reject bad certificate."
+ except TransportException, e:
+ pass
def test_client_server_authentication(self):
""" Require both client and server to mutually identify themselves.
"""
- self.server.set_credentials(self._testpath("server-certificate.pem"),
- self._testpath("server-private-key.pem"),
- "server-password")
- self.server.set_trusted_ca_db(self._testpath("ca-certificate.pem"))
- self.server.set_peer_authentication( SSL.VERIFY_PEER,
- self._testpath("ca-certificate.pem") )
-
- self.client.set_credentials(self._testpath("client-certificate.pem"),
- self._testpath("client-private-key.pem"),
- "client-password")
- self.client.set_trusted_ca_db(self._testpath("ca-certificate.pem"))
- self.client.set_peer_authentication( SSL.VERIFY_PEER )
-
- client_conn = Connection()
- self.t_client.bind(client_conn)
- server_conn = Connection()
- self.t_server.bind(server_conn)
- client_conn.open()
- server_conn.open()
- self._pump()
- assert self.client.protocol_name() is not None
- client_conn.close()
- server_conn.close()
- self._pump()
+ self.server_domain.set_credentials(self._testpath("server-certificate.pem"),
+ self._testpath("server-private-key.pem"),
+ "server-password")
+ self.server_domain.set_trusted_ca_db(self._testpath("ca-certificate.pem"))
+ self.server_domain.set_default_peer_authentication( SSL.VERIFY_PEER,
+ self._testpath("ca-certificate.pem") )
+
+ self.client_domain.set_credentials(self._testpath("client-certificate.pem"),
+ self._testpath("client-private-key.pem"),
+ "client-password")
+ self.client_domain.set_trusted_ca_db(self._testpath("ca-certificate.pem"))
+ self.client_domain.set_default_peer_authentication( SSL.VERIFY_PEER )
+
+ server = SslTest.SslTestConnection( self.server_domain )
+ client = SslTest.SslTestConnection( self.client_domain )
+
+ client.connection.open()
+ server.connection.open()
+ self._pump( client, server )
+ assert client.ssl.protocol_name() is not None
+ client.connection.close()
+ server.connection.close()
+ self._pump( client, server )
def test_server_only_authentication(self):
""" Client verifies server, but server does not verify client.
"""
- self.server.set_credentials(self._testpath("server-certificate.pem"),
- self._testpath("server-private-key.pem"),
- "server-password")
- self.server.set_peer_authentication( SSL.ANONYMOUS_PEER )
-
- self.client.set_credentials(self._testpath("client-certificate.pem"),
- self._testpath("client-private-key.pem"),
- "client-password")
- self.client.set_trusted_ca_db(self._testpath("ca-certificate.pem"))
- self.client.set_peer_authentication( SSL.VERIFY_PEER )
-
- client_conn = Connection()
- self.t_client.bind(client_conn)
- server_conn = Connection()
- self.t_server.bind(server_conn)
- client_conn.open()
- server_conn.open()
- self._pump()
- assert self.client.protocol_name() is not None
- client_conn.close()
- server_conn.close()
- self._pump()
+ self.server_domain.set_credentials(self._testpath("server-certificate.pem"),
+ self._testpath("server-private-key.pem"),
+ "server-password")
+ self.server_domain.set_default_peer_authentication( SSL.ANONYMOUS_PEER )
+
+ self.client_domain.set_credentials(self._testpath("client-certificate.pem"),
+ self._testpath("client-private-key.pem"),
+ "client-password")
+ self.client_domain.set_trusted_ca_db(self._testpath("ca-certificate.pem"))
+ self.client_domain.set_default_peer_authentication( SSL.VERIFY_PEER )
+
+ server = SslTest.SslTestConnection( self.server_domain )
+ client = SslTest.SslTestConnection( self.client_domain )
+
+ client.connection.open()
+ server.connection.open()
+ self._pump( client, server )
+ assert client.ssl.protocol_name() is not None
+ client.connection.close()
+ server.connection.close()
+ self._pump( client, server )
def test_bad_server_certificate(self):
""" A server with a self-signed certificate that is not trusted by the
client. The client should reject the server.
"""
- self.server.set_credentials(self._testpath("bad-server-certificate.pem"),
- self._testpath("bad-server-private-key.pem"),
- "server-password")
- self.server.set_peer_authentication( SSL.ANONYMOUS_PEER )
-
- self.client.set_trusted_ca_db(self._testpath("ca-certificate.pem"))
- self.client.set_peer_authentication( SSL.VERIFY_PEER )
-
- client_conn = Connection()
- self.t_client.bind(client_conn)
- server_conn = Connection()
- self.t_server.bind(server_conn)
- client_conn.open()
- server_conn.open()
+ self.server_domain.set_credentials(self._testpath("bad-server-certificate.pem"),
+ self._testpath("bad-server-private-key.pem"),
+ "server-password")
+ self.server_domain.set_default_peer_authentication( SSL.ANONYMOUS_PEER )
+
+ self.client_domain.set_trusted_ca_db(self._testpath("ca-certificate.pem"))
+ self.client_domain.set_default_peer_authentication( SSL.VERIFY_PEER )
+
+
+ server = SslTest.SslTestConnection( self.server_domain )
+ client = SslTest.SslTestConnection( self.client_domain )
+
+ client.connection.open()
+ server.connection.open()
try:
- self._pump()
+ self._pump( client, server )
assert False, "Client failed to reject bad certificate."
except TransportException, e:
pass
- def test_allow_unsecured_client(self):
- """ Client is allows to connect unsecured to secured client
- """
- self.t_client = Transport()
-
- self.server.set_credentials(self._testpath("server-certificate.pem"),
- self._testpath("server-private-key.pem"),
- "server-password")
- self.server.set_trusted_ca_db(self._testpath("ca-certificate.pem"))
- self.server.set_peer_authentication( SSL.VERIFY_PEER,
- self._testpath("ca-certificate.pem") )
- self.server.allow_unsecured_client()
-
- client_conn = Connection()
- self.t_client.bind(client_conn)
- server_conn = Connection()
- self.t_server.bind(server_conn)
- client_conn.open()
- server_conn.open()
- self._pump()
+ del server
+ del client
- def test_disallow_unsecured_client(self):
- """ Client is disallowed from connecting unsecured to secured client
- """
- self.t_client = Transport()
+ # now re-try with a client that does not require peer verification
- self.server.set_credentials(self._testpath("server-certificate.pem"),
- self._testpath("server-private-key.pem"),
- "server-password")
- self.server.set_trusted_ca_db(self._testpath("ca-certificate.pem"))
- self.server.set_peer_authentication( SSL.VERIFY_PEER,
- self._testpath("ca-certificate.pem") )
-
- client_conn = Connection()
- self.t_client.bind(client_conn)
- server_conn = Connection()
- self.t_server.bind(server_conn)
- client_conn.open()
- server_conn.open()
- try:
- self._pump()
- assert False, "Expecting exception"
- except TransportException:
- assert True
+ server = SslTest.SslTestConnection( self.server_domain )
+ client = SslTest.SslTestConnection( self.client_domain )
+ client.ssl.set_peer_authentication( SSL.ANONYMOUS_PEER )
+
+ client.connection.open()
+ server.connection.open()
+ self._pump( client, server )
+ assert client.ssl.protocol_name() is not None
+ client.connection.close()
+ server.connection.close()
+ self._pump( client, server )
- def test_domain_server_authentication(self):
- """ Simple SSL connection with authentication of the server
+ def test_allow_unsecured_client(self):
+ """ Server allows an unsecured client to connect if configured.
"""
- server_domain = SSLDomain(SSL.MODE_SERVER)
- server_domain.set_credentials(self._testpath("server-certificate.pem"),
+ self.server_domain.set_credentials(self._testpath("server-certificate.pem"),
self._testpath("server-private-key.pem"),
"server-password")
- server_transport = Transport()
- server_ssl = SSL( server_transport, server_domain)
-
- client_domain = SSLDomain(SSL.MODE_CLIENT)
- client_domain.set_trusted_ca_db(self._testpath("ca-certificate.pem"))
- client_domain.set_default_peer_authentication( SSL.VERIFY_PEER )
- client_transport = Transport()
- client_ssl = SSL( client_transport, client_domain )
+ self.server_domain.set_trusted_ca_db(self._testpath("ca-certificate.pem"))
+ self.server_domain.set_default_peer_authentication( SSL.VERIFY_PEER,
+ self._testpath("ca-certificate.pem") )
+ # allow unsecured clients on this connection
+ server = SslTest.SslTestConnection( self.server_domain )
+ server.ssl.allow_unsecured_client()
+
+ # non-ssl connection
+ client = SslTest.SslTestConnection()
+
+ client.connection.open()
+ server.connection.open()
+ self._pump( client, server )
+ assert server.ssl.protocol_name() is None
+ client.connection.close()
+ server.connection.close()
+ self._pump( client, server )
- client_conn = Connection()
- client_transport.bind(client_conn)
-
- server_conn = Connection()
- server_transport.bind(server_conn)
-
- client_conn.open()
- server_conn.open()
- self._pump2( client_transport, server_transport )
- assert client_ssl.protocol_name() is not None
- client_conn.close()
- server_conn.close()
- self._pump2( client_transport, server_transport )
-
-
- def test_domain_bad_server_certificate(self):
- """ A server with a self-signed certificate that is not trusted by the
- client. The client should reject the server.
+ def test_disallow_unsecured_client(self):
+ """ Non-SSL Client is disallowed from connecting to server.
"""
+ self.server_domain.set_credentials(self._testpath("server-certificate.pem"),
+ self._testpath("server-private-key.pem"),
+ "server-password")
+ self.server_domain.set_trusted_ca_db(self._testpath("ca-certificate.pem"))
+ self.server_domain.set_default_peer_authentication( SSL.ANONYMOUS_PEER )
+ server = SslTest.SslTestConnection( self.server_domain )
- server_domain = SSLDomain(SSL.MODE_SERVER)
- server_domain.set_credentials(self._testpath("bad-server-certificate.pem"),
- self._testpath("bad-server-private-key.pem"),
- "server-password")
- server_domain.set_default_peer_authentication( SSL.ANONYMOUS_PEER )
-
- # by default, clients will allow connections to the server with the bad cert
- client_domain = SSLDomain(SSL.MODE_CLIENT)
- client_domain.set_trusted_ca_db(self._testpath("ca-certificate.pem"))
- client_domain.set_default_peer_authentication( SSL.ANONYMOUS_PEER )
-
- # create a server listener
- server_transport = Transport()
- server_ssl = SSL( server_transport, server_domain)
- server_conn = Connection()
- server_transport.bind(server_conn)
-
- # create a client using anonymous
- bad_client_transport = Transport()
- bad_client_ssl = SSL( bad_client_transport, client_domain )
- bad_client_conn = Connection()
- bad_client_transport.bind(bad_client_conn)
-
- # client will allow connection to server:
- bad_client_conn.open()
- server_conn.open()
- self._pump2( bad_client_transport, server_transport )
- assert bad_client_ssl.protocol_name() is not None
- bad_client_conn.close()
- server_conn.close()
-
- # now instantiate new client using same domain, but override to verify
- # peer:
- good_client_transport = Transport()
- good_client_ssl = SSL( good_client_transport, client_domain )
- good_client_ssl.set_peer_authentication( SSL.VERIFY_PEER )
- good_client_conn = Connection()
- good_client_transport.bind(good_client_conn)
-
- server_transport = Transport()
- server_ssl = SSL( server_transport, server_domain)
- server_conn = Connection()
- server_transport.bind(server_conn)
+ # non-ssl connection
+ client = SslTest.SslTestConnection()
- good_client_conn.open()
- server_conn.open()
+ client.connection.open()
+ server.connection.open()
try:
- self._pump2( good_client_transport, server_transport )
- assert False, "Client failed to reject bad certificate."
- except TransportException, e:
+ self._pump( client, server )
+ assert False, "Server did not reject client as expected."
+ except TransportException:
pass
- good_client_conn.close()
- server_conn.close()
def test_session_resume(self):
""" Test resume of client session.
"""
- # server config
- server_domain = SSLDomain(SSL.MODE_SERVER)
- server_domain.set_credentials(self._testpath("server-certificate.pem"),
- self._testpath("server-private-key.pem"),
- "server-password")
- # client config
- client_domain = SSLDomain(SSL.MODE_CLIENT)
- client_domain.set_trusted_ca_db(self._testpath("ca-certificate.pem"))
- client_domain.set_default_peer_authentication( SSL.VERIFY_PEER )
-
- # setup first client session
- client_transport = Transport()
- client_ssl = SSL( client_transport, client_domain )
- client_conn = Connection()
- client_transport.bind(client_conn)
-
- # create server session to connect to
- server_transport = Transport()
- server_ssl = SSL( server_transport, server_domain)
- server_conn = Connection()
- server_transport.bind(server_conn)
+ self.server_domain.set_credentials(self._testpath("server-certificate.pem"),
+ self._testpath("server-private-key.pem"),
+ "server-password")
+ self.server_domain.set_default_peer_authentication( SSL.ANONYMOUS_PEER )
- #client_transport.trace(Transport.TRACE_DRV)
+ self.client_domain.set_trusted_ca_db(self._testpath("ca-certificate.pem"))
+ self.client_domain.set_default_peer_authentication( SSL.VERIFY_PEER )
+
+ server = SslTest.SslTestConnection( self.server_domain )
+ client = SslTest.SslTestConnection( self.client_domain )
# bring up the connection and store its state
- client_conn.open()
- server_conn.open()
- self._pump2( client_transport, server_transport )
- assert client_ssl.protocol_name() is not None
- ssl_state = client_ssl.get_state()
- client_conn.close()
- server_conn.close()
- self._pump2( client_transport, server_transport )
+ client.connection.open()
+ server.connection.open()
+ self._pump( client, server )
+ assert client.ssl.protocol_name() is not None
+ ssl_state = client.ssl.get_state()
+
+ # cleanly shutdown the connection
+ client.connection.close()
+ server.connection.close()
+ self._pump( client, server )
+
+ # destroy the existing clients, and client domain
+ del client
+ del server
+ del self.client_domain
# now create a new set of connections
- client_transport = Transport()
- client_ssl = SSL( client_transport, client_domain )
- client_conn = Connection()
- client_transport.bind(client_conn)
-
- server_transport = Transport()
- server_ssl = SSL( server_transport, server_domain)
- server_conn = Connection()
- server_transport.bind(server_conn)
-
- client_ssl.resume_state( ssl_state )
- client_conn.open()
- server_conn.open()
- self._pump2( client_transport, server_transport )
- assert client_ssl.protocol_name() is not None
- assert client_ssl.state_resumed_ok()
- #newstate = client_ssl.get_state()
-
- del ssl_state
- client_conn.close()
- server_conn.close()
- self._pump2( client_transport, server_transport )
+ self.client_domain = SSLDomain(SSL.MODE_CLIENT)
+ self.client_domain.set_trusted_ca_db(self._testpath("ca-certificate.pem"))
+ self.client_domain.set_default_peer_authentication( SSL.VERIFY_PEER )
+
+ server = SslTest.SslTestConnection( self.server_domain )
+ client = SslTest.SslTestConnection( self.client_domain )
+
+ # resume the client state before attempting to connect
+ client.ssl.resume_state( ssl_state )
+
+ #client.transport.trace(Transport.TRACE_DRV)
+ #server.transport.trace(Transport.TRACE_DRV)
+
+ client.connection.open()
+ server.connection.open()
+ self._pump( client, server )
+ assert server.ssl.protocol_name() is not None
+ assert client.ssl.state_resumed_ok()
+ client.connection.close()
+ server.connection.close()
+ self._pump( client, server )
+
+ def test_multiple_sessions(self):
+ """ Test multiple simultaineous active SSL sessions with bi-directional
+ certificate verification, shared across two domains.
+ """
+ self.server_domain.set_credentials(self._testpath("server-certificate.pem"),
+ self._testpath("server-private-key.pem"),
+ "server-password")
+ self.server_domain.set_trusted_ca_db(self._testpath("ca-certificate.pem"))
+ self.server_domain.set_default_peer_authentication( SSL.VERIFY_PEER,
+ self._testpath("ca-certificate.pem") )
+
+ self.client_domain.set_credentials(self._testpath("client-certificate.pem"),
+ self._testpath("client-private-key.pem"),
+ "client-password")
+ self.client_domain.set_trusted_ca_db(self._testpath("ca-certificate.pem"))
+ self.client_domain.set_default_peer_authentication( SSL.VERIFY_PEER )
+
+ max_count = 100
+ sessions = [(SslTest.SslTestConnection( self.server_domain ),
+ SslTest.SslTestConnection( self.client_domain )) for x in
+ range(max_count)]
+ for s in sessions:
+ s[0].connection.open()
+ self._pump( s[0], s[1] )
+
+ for s in sessions:
+ s[1].connection.open()
+ self._pump( s[1], s[0] )
+ assert s[0].ssl.cipher_name() is not None
+ assert s[1].ssl.cipher_name() == s[0].ssl.cipher_name()
+
+ for s in sessions:
+ s[1].connection.close()
+ self._pump( s[0], s[1] )
+
+ for s in sessions:
+ s[0].connection.close()
+ self._pump( s[1], s[0] )
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org