You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kg...@apache.org on 2012/12/04 22:41:53 UTC

svn commit: r1417204 - /qpid/proton/branches/kgiusti-proton-136/tests/proton_tests/ssl.py

Author: kgiusti
Date: Tue Dec  4 21:41:52 2012
New Revision: 1417204

URL: http://svn.apache.org/viewvc?rev=1417204&view=rev
Log:
PROTON-136: rework the SSL tests to use the new domain api.

Modified:
    qpid/proton/branches/kgiusti-proton-136/tests/proton_tests/ssl.py

Modified: qpid/proton/branches/kgiusti-proton-136/tests/proton_tests/ssl.py
URL: http://svn.apache.org/viewvc/qpid/proton/branches/kgiusti-proton-136/tests/proton_tests/ssl.py?rev=1417204&r1=1417203&r2=1417204&view=diff
==============================================================================
--- qpid/proton/branches/kgiusti-proton-136/tests/proton_tests/ssl.py (original)
+++ qpid/proton/branches/kgiusti-proton-136/tests/proton_tests/ssl.py Tue Dec  4 21:41:52 2012
@@ -30,29 +30,39 @@ class SslTest(common.Test):
 
     def setup(self):
         try:
-            self.t_server = Transport()
-            self.server = SSL(self.t_server)
-            self.server.init(SSL.MODE_SERVER)
-            self.t_client = Transport()
-            self.client = SSL(self.t_client)
-            self.client.init(SSL.MODE_CLIENT)
+            self.server_domain = SSLDomain(SSL.MODE_SERVER)
+            self.client_domain = SSLDomain(SSL.MODE_CLIENT)
         except SSLUnavailable, e:
             raise Skipped(e)
 
     def teardown(self):
-        self.t_client = None
-        self.t_server = None
+        self.server_domain = None
+        self.client_domain = None
 
-    def _pump(self):
-        self._pump2(self.t_client, self.t_server)
+    class SslTestConnection(object):
+        """ Represents a single SSL connection.
+        """
+        def __init__(self, domain=None):
+            try:
+                self.ssl = None
+                self.domain = domain
+                self.transport = Transport()
+                self.connection = Connection()
+                self.transport.bind(self.connection)
+                if domain:
+                    self.ssl = SSL( self.transport, self.domain )
+            except SSLUnavailable, e:
+                raise Skipped(e)
 
-    def _pump2(self, client, server):
+    def _pump(self, ssl1, ssl2):
+        """ Allow two SslTestConnections to transfer data until done.
+        """
         while True:
-            out_client = client.output(1024)
-            out_server = server.output(1024)
-            if out_client: server.input(out_client)
-            if out_server: client.input(out_server)
-            if not out_client and not out_server: break
+            out1 = ssl1.transport.output(1024)
+            out2 = ssl2.transport.output(1024)
+            if out1: ssl2.transport.input(out1)
+            if out2: ssl1.transport.input(out2)
+            if not out1 and not out2: break
 
     def _testpath(self, file):
         """ Set the full path to the certificate,keyfile, etc. for the test.
@@ -64,370 +74,375 @@ class SslTest(common.Test):
         """ By default, both the server and the client support anonymous
         ciphers - they should connect without need for a certificate.
         """
-        client_conn = Connection()
-        self.t_client.bind(client_conn)
-        server_conn = Connection()
-        self.t_server.bind(server_conn)
+        server = SslTest.SslTestConnection( self.server_domain )
+        client = SslTest.SslTestConnection( self.client_domain )
 
         # check that no SSL connection exists
-        assert not self.server.cipher_name()
-        assert not self.client.protocol_name()
+        assert not server.ssl.cipher_name()
+        assert not client.ssl.protocol_name()
+
+        #client.transport.trace(Transport.TRACE_DRV)
+        #server.transport.trace(Transport.TRACE_DRV)
 
-        client_conn.open()
-        server_conn.open()
-        self._pump()
+        client.connection.open()
+        server.connection.open()
+        self._pump( client, server )
 
         # now SSL should be active
-        assert self.server.cipher_name() is not None
-        assert self.client.protocol_name() is not None
+        assert server.ssl.cipher_name() is not None
+        assert client.ssl.protocol_name() is not None
 
-        client_conn.close()
-        server_conn.close()
-        self._pump()
+        client.connection.close()
+        server.connection.close()
+        self._pump( client, server )
 
     def test_server_certificate(self):
         """ Test that anonymous clients can still connect to a server that has
         a certificate configured.
         """
-        self.server.set_credentials(self._testpath("server-certificate.pem"),
-                                    self._testpath("server-private-key.pem"),
-                                    "server-password")
-        client_conn = Connection()
-        self.t_client.bind(client_conn)
-        server_conn = Connection()
-        self.t_server.bind(server_conn)
-        client_conn.open()
-        server_conn.open()
-        self._pump()
-        assert self.client.protocol_name() is not None
-        client_conn.close()
-        server_conn.close()
-        self._pump()
+        self.server_domain.set_credentials(self._testpath("server-certificate.pem"),
+                                           self._testpath("server-private-key.pem"),
+                                           "server-password")
+        server = SslTest.SslTestConnection( self.server_domain )
+        client = SslTest.SslTestConnection( self.client_domain )
+
+        client.connection.open()
+        server.connection.open()
+        self._pump( client, server )
+        assert client.ssl.protocol_name() is not None
+        client.connection.close()
+        server.connection.close()
+        self._pump( client, server )
 
     def test_server_authentication(self):
         """ Simple SSL connection with authentication of the server
         """
-        self.server.set_credentials(self._testpath("server-certificate.pem"),
-                                    self._testpath("server-private-key.pem"),
-                                    "server-password")
-
-        self.client.set_trusted_ca_db(self._testpath("ca-certificate.pem"))
-        self.client.set_peer_authentication( SSL.VERIFY_PEER )
-
-        client_conn = Connection()
-        self.t_client.bind(client_conn)
-        server_conn = Connection()
-        self.t_server.bind(server_conn)
-        client_conn.open()
-        server_conn.open()
-        self._pump()
-        assert self.client.protocol_name() is not None
-        client_conn.close()
-        server_conn.close()
-        self._pump()
+        self.server_domain.set_credentials(self._testpath("server-certificate.pem"),
+                                           self._testpath("server-private-key.pem"),
+                                           "server-password")
+
+        self.client_domain.set_trusted_ca_db(self._testpath("ca-certificate.pem"))
+        self.client_domain.set_default_peer_authentication( SSL.VERIFY_PEER )
+
+        server = SslTest.SslTestConnection( self.server_domain )
+        client = SslTest.SslTestConnection( self.client_domain )
+
+        client.connection.open()
+        server.connection.open()
+        self._pump( client, server )
+        assert client.ssl.protocol_name() is not None
+        client.connection.close()
+        server.connection.close()
+        self._pump( client, server )
 
     def test_client_authentication(self):
         """ Force the client to authenticate.
         """
         # note: when requesting client auth, the server _must_ send its
         # certificate, so make sure we configure one!
-        self.server.set_credentials(self._testpath("server-certificate.pem"),
-                                    self._testpath("server-private-key.pem"),
-                                    "server-password")
-        self.server.set_trusted_ca_db(self._testpath("ca-certificate.pem"))
-        self.server.set_peer_authentication( SSL.VERIFY_PEER,
-                                             self._testpath("ca-certificate.pem") )
+        self.server_domain.set_credentials(self._testpath("server-certificate.pem"),
+                                           self._testpath("server-private-key.pem"),
+                                           "server-password")
+        self.server_domain.set_trusted_ca_db(self._testpath("ca-certificate.pem"))
+        server = SslTest.SslTestConnection( self.server_domain )
+        server.ssl.set_peer_authentication( SSL.VERIFY_PEER,
+                                            self._testpath("ca-certificate.pem") )
 
         # give the client a certificate, but let's not require server authentication
-        self.client.set_credentials(self._testpath("client-certificate.pem"),
-                                    self._testpath("client-private-key.pem"),
-                                    "client-password")
-        self.client.set_peer_authentication( SSL.ANONYMOUS_PEER )
-
-        client_conn = Connection()
-        self.t_client.bind(client_conn)
-        server_conn = Connection()
-        self.t_server.bind(server_conn)
-        client_conn.open()
-        server_conn.open()
-        self._pump()
-        assert self.client.protocol_name() is not None
-        client_conn.close()
-        server_conn.close()
-        self._pump()
+        self.client_domain.set_credentials(self._testpath("client-certificate.pem"),
+                                           self._testpath("client-private-key.pem"),
+                                           "client-password")
+        client = SslTest.SslTestConnection( self.client_domain )
+        client.ssl.set_peer_authentication( SSL.ANONYMOUS_PEER )
+
+        client.connection.open()
+        server.connection.open()
+        self._pump( client, server )
+        assert client.ssl.protocol_name() is not None
+        client.connection.close()
+        server.connection.close()
+        self._pump( client, server )
 
+    def test_client_authentication_fail_bad_cert(self):
+        """ Ensure that the server can detect a bad client certificate.
+        """
+        # note: when requesting client auth, the server _must_ send its
+        # certificate, so make sure we configure one!
+        self.server_domain.set_credentials(self._testpath("server-certificate.pem"),
+                                           self._testpath("server-private-key.pem"),
+                                           "server-password")
+        self.server_domain.set_trusted_ca_db(self._testpath("ca-certificate.pem"))
+        server = SslTest.SslTestConnection( self.server_domain )
+        server.ssl.set_peer_authentication( SSL.VERIFY_PEER,
+                                            self._testpath("ca-certificate.pem") )
+
+        self.client_domain.set_credentials(self._testpath("bad-server-certificate.pem"),
+                                           self._testpath("bad-server-private-key.pem"),
+                                           "server-password")
+        client = SslTest.SslTestConnection( self.client_domain )
+        client.ssl.set_peer_authentication( SSL.ANONYMOUS_PEER )
+
+        client.connection.open()
+        server.connection.open()
+        try:
+            self._pump( client, server )
+            assert False, "Server failed to reject bad certificate."
+        except TransportException, e:
+            pass
+
+    def test_client_authentication_fail_no_cert(self):
+        """ Ensure that the server will fail a client that does not provide a
+        certificate.
+        """
+        # note: when requesting client auth, the server _must_ send its
+        # certificate, so make sure we configure one!
+        self.server_domain.set_credentials(self._testpath("server-certificate.pem"),
+                                           self._testpath("server-private-key.pem"),
+                                           "server-password")
+        self.server_domain.set_trusted_ca_db(self._testpath("ca-certificate.pem"))
+        server = SslTest.SslTestConnection( self.server_domain )
+        server.ssl.set_peer_authentication( SSL.VERIFY_PEER,
+                                            self._testpath("ca-certificate.pem") )
+
+        client = SslTest.SslTestConnection( self.client_domain )
+        client.ssl.set_peer_authentication( SSL.ANONYMOUS_PEER )
+
+        client.connection.open()
+        server.connection.open()
+        try:
+            self._pump( client, server )
+            assert False, "Server failed to reject bad certificate."
+        except TransportException, e:
+            pass
 
     def test_client_server_authentication(self):
         """ Require both client and server to mutually identify themselves.
         """
-        self.server.set_credentials(self._testpath("server-certificate.pem"),
-                                    self._testpath("server-private-key.pem"),
-                                    "server-password")
-        self.server.set_trusted_ca_db(self._testpath("ca-certificate.pem"))
-        self.server.set_peer_authentication( SSL.VERIFY_PEER,
-                                             self._testpath("ca-certificate.pem") )
-
-        self.client.set_credentials(self._testpath("client-certificate.pem"),
-                                    self._testpath("client-private-key.pem"),
-                                    "client-password")
-        self.client.set_trusted_ca_db(self._testpath("ca-certificate.pem"))
-        self.client.set_peer_authentication( SSL.VERIFY_PEER )
-
-        client_conn = Connection()
-        self.t_client.bind(client_conn)
-        server_conn = Connection()
-        self.t_server.bind(server_conn)
-        client_conn.open()
-        server_conn.open()
-        self._pump()
-        assert self.client.protocol_name() is not None
-        client_conn.close()
-        server_conn.close()
-        self._pump()
+        self.server_domain.set_credentials(self._testpath("server-certificate.pem"),
+                                           self._testpath("server-private-key.pem"),
+                                           "server-password")
+        self.server_domain.set_trusted_ca_db(self._testpath("ca-certificate.pem"))
+        self.server_domain.set_default_peer_authentication( SSL.VERIFY_PEER,
+                                                            self._testpath("ca-certificate.pem") )
+
+        self.client_domain.set_credentials(self._testpath("client-certificate.pem"),
+                                           self._testpath("client-private-key.pem"),
+                                           "client-password")
+        self.client_domain.set_trusted_ca_db(self._testpath("ca-certificate.pem"))
+        self.client_domain.set_default_peer_authentication( SSL.VERIFY_PEER )
+
+        server = SslTest.SslTestConnection( self.server_domain )
+        client = SslTest.SslTestConnection( self.client_domain )
+
+        client.connection.open()
+        server.connection.open()
+        self._pump( client, server )
+        assert client.ssl.protocol_name() is not None
+        client.connection.close()
+        server.connection.close()
+        self._pump( client, server )
 
     def test_server_only_authentication(self):
         """ Client verifies server, but server does not verify client.
         """
-        self.server.set_credentials(self._testpath("server-certificate.pem"),
-                                    self._testpath("server-private-key.pem"),
-                                    "server-password")
-        self.server.set_peer_authentication( SSL.ANONYMOUS_PEER )
-
-        self.client.set_credentials(self._testpath("client-certificate.pem"),
-                                    self._testpath("client-private-key.pem"),
-                                    "client-password")
-        self.client.set_trusted_ca_db(self._testpath("ca-certificate.pem"))
-        self.client.set_peer_authentication( SSL.VERIFY_PEER )
-
-        client_conn = Connection()
-        self.t_client.bind(client_conn)
-        server_conn = Connection()
-        self.t_server.bind(server_conn)
-        client_conn.open()
-        server_conn.open()
-        self._pump()
-        assert self.client.protocol_name() is not None
-        client_conn.close()
-        server_conn.close()
-        self._pump()
+        self.server_domain.set_credentials(self._testpath("server-certificate.pem"),
+                                           self._testpath("server-private-key.pem"),
+                                           "server-password")
+        self.server_domain.set_default_peer_authentication( SSL.ANONYMOUS_PEER )
+
+        self.client_domain.set_credentials(self._testpath("client-certificate.pem"),
+                                           self._testpath("client-private-key.pem"),
+                                           "client-password")
+        self.client_domain.set_trusted_ca_db(self._testpath("ca-certificate.pem"))
+        self.client_domain.set_default_peer_authentication( SSL.VERIFY_PEER )
+
+        server = SslTest.SslTestConnection( self.server_domain )
+        client = SslTest.SslTestConnection( self.client_domain )
+
+        client.connection.open()
+        server.connection.open()
+        self._pump( client, server )
+        assert client.ssl.protocol_name() is not None
+        client.connection.close()
+        server.connection.close()
+        self._pump( client, server )
 
     def test_bad_server_certificate(self):
         """ A server with a self-signed certificate that is not trusted by the
         client.  The client should reject the server.
         """
-        self.server.set_credentials(self._testpath("bad-server-certificate.pem"),
-                                    self._testpath("bad-server-private-key.pem"),
-                                    "server-password")
-        self.server.set_peer_authentication( SSL.ANONYMOUS_PEER )
-
-        self.client.set_trusted_ca_db(self._testpath("ca-certificate.pem"))
-        self.client.set_peer_authentication( SSL.VERIFY_PEER )
-
-        client_conn = Connection()
-        self.t_client.bind(client_conn)
-        server_conn = Connection()
-        self.t_server.bind(server_conn)
-        client_conn.open()
-        server_conn.open()
+        self.server_domain.set_credentials(self._testpath("bad-server-certificate.pem"),
+                                           self._testpath("bad-server-private-key.pem"),
+                                           "server-password")
+        self.server_domain.set_default_peer_authentication( SSL.ANONYMOUS_PEER )
+
+        self.client_domain.set_trusted_ca_db(self._testpath("ca-certificate.pem"))
+        self.client_domain.set_default_peer_authentication( SSL.VERIFY_PEER )
+
+
+        server = SslTest.SslTestConnection( self.server_domain )
+        client = SslTest.SslTestConnection( self.client_domain )
+
+        client.connection.open()
+        server.connection.open()
         try:
-            self._pump()
+            self._pump( client, server )
             assert False, "Client failed to reject bad certificate."
         except TransportException, e:
             pass
 
-    def test_allow_unsecured_client(self):
-        """ Client is allows to connect unsecured to secured client
-        """
-        self.t_client = Transport()
-
-        self.server.set_credentials(self._testpath("server-certificate.pem"),
-                                    self._testpath("server-private-key.pem"),
-                                    "server-password")
-        self.server.set_trusted_ca_db(self._testpath("ca-certificate.pem"))
-        self.server.set_peer_authentication( SSL.VERIFY_PEER,
-                                             self._testpath("ca-certificate.pem") )
-        self.server.allow_unsecured_client()
-
-        client_conn = Connection()
-        self.t_client.bind(client_conn)
-        server_conn = Connection()
-        self.t_server.bind(server_conn)
-        client_conn.open()
-        server_conn.open()
-        self._pump()
+        del server
+        del client
 
-    def test_disallow_unsecured_client(self):
-        """ Client is disallowed from connecting unsecured to secured client
-        """
-        self.t_client = Transport()
+        # now re-try with a client that does not require peer verification
 
-        self.server.set_credentials(self._testpath("server-certificate.pem"),
-                                    self._testpath("server-private-key.pem"),
-                                    "server-password")
-        self.server.set_trusted_ca_db(self._testpath("ca-certificate.pem"))
-        self.server.set_peer_authentication( SSL.VERIFY_PEER,
-                                             self._testpath("ca-certificate.pem") )
-
-        client_conn = Connection()
-        self.t_client.bind(client_conn)
-        server_conn = Connection()
-        self.t_server.bind(server_conn)
-        client_conn.open()
-        server_conn.open()
-        try:
-            self._pump()
-            assert False, "Expecting exception"
-        except TransportException:
-            assert True
+        server = SslTest.SslTestConnection( self.server_domain )
+        client = SslTest.SslTestConnection( self.client_domain )
+        client.ssl.set_peer_authentication( SSL.ANONYMOUS_PEER )
+
+        client.connection.open()
+        server.connection.open()
+        self._pump( client, server )
+        assert client.ssl.protocol_name() is not None
+        client.connection.close()
+        server.connection.close()
+        self._pump( client, server )
 
-    def test_domain_server_authentication(self):
-        """ Simple SSL connection with authentication of the server
+    def test_allow_unsecured_client(self):
+        """ Server allows an unsecured client to connect if configured.
         """
-        server_domain = SSLDomain(SSL.MODE_SERVER)
-        server_domain.set_credentials(self._testpath("server-certificate.pem"),
+        self.server_domain.set_credentials(self._testpath("server-certificate.pem"),
                                            self._testpath("server-private-key.pem"),
                                            "server-password")
-        server_transport = Transport()
-        server_ssl = SSL( server_transport, server_domain)
-
-        client_domain = SSLDomain(SSL.MODE_CLIENT)
-        client_domain.set_trusted_ca_db(self._testpath("ca-certificate.pem"))
-        client_domain.set_default_peer_authentication( SSL.VERIFY_PEER )
-        client_transport = Transport()
-        client_ssl = SSL( client_transport, client_domain )
+        self.server_domain.set_trusted_ca_db(self._testpath("ca-certificate.pem"))
+        self.server_domain.set_default_peer_authentication( SSL.VERIFY_PEER,
+                                                            self._testpath("ca-certificate.pem") )
+        # allow unsecured clients on this connection
+        server = SslTest.SslTestConnection( self.server_domain )
+        server.ssl.allow_unsecured_client()
+
+        # non-ssl connection
+        client = SslTest.SslTestConnection()
+
+        client.connection.open()
+        server.connection.open()
+        self._pump( client, server )
+        assert server.ssl.protocol_name() is None
+        client.connection.close()
+        server.connection.close()
+        self._pump( client, server )
 
-        client_conn = Connection()
-        client_transport.bind(client_conn)
-
-        server_conn = Connection()
-        server_transport.bind(server_conn)
-
-        client_conn.open()
-        server_conn.open()
-        self._pump2( client_transport, server_transport )
-        assert client_ssl.protocol_name() is not None
-        client_conn.close()
-        server_conn.close()
-        self._pump2( client_transport, server_transport )
-
-
-    def test_domain_bad_server_certificate(self):
-        """ A server with a self-signed certificate that is not trusted by the
-        client.  The client should reject the server.
+    def test_disallow_unsecured_client(self):
+        """ Non-SSL Client is disallowed from connecting to server.
         """
+        self.server_domain.set_credentials(self._testpath("server-certificate.pem"),
+                                           self._testpath("server-private-key.pem"),
+                                           "server-password")
+        self.server_domain.set_trusted_ca_db(self._testpath("ca-certificate.pem"))
+        self.server_domain.set_default_peer_authentication( SSL.ANONYMOUS_PEER )
+        server = SslTest.SslTestConnection( self.server_domain )
 
-        server_domain = SSLDomain(SSL.MODE_SERVER)
-        server_domain.set_credentials(self._testpath("bad-server-certificate.pem"),
-                                      self._testpath("bad-server-private-key.pem"),
-                                      "server-password")
-        server_domain.set_default_peer_authentication( SSL.ANONYMOUS_PEER )
-
-        # by default, clients will allow connections to the server with the bad cert
-        client_domain = SSLDomain(SSL.MODE_CLIENT)
-        client_domain.set_trusted_ca_db(self._testpath("ca-certificate.pem"))
-        client_domain.set_default_peer_authentication( SSL.ANONYMOUS_PEER )
-
-        # create a server listener
-        server_transport = Transport()
-        server_ssl = SSL( server_transport, server_domain)
-        server_conn = Connection()
-        server_transport.bind(server_conn)
-
-        # create a client using anonymous
-        bad_client_transport = Transport()
-        bad_client_ssl = SSL( bad_client_transport, client_domain )
-        bad_client_conn = Connection()
-        bad_client_transport.bind(bad_client_conn)
-
-        # client will allow connection to server:
-        bad_client_conn.open()
-        server_conn.open()
-        self._pump2( bad_client_transport, server_transport )
-        assert bad_client_ssl.protocol_name() is not None
-        bad_client_conn.close()
-        server_conn.close()
-
-        # now instantiate new client using same domain, but override to verify
-        # peer:
-        good_client_transport = Transport()
-        good_client_ssl = SSL( good_client_transport, client_domain )
-        good_client_ssl.set_peer_authentication( SSL.VERIFY_PEER )
-        good_client_conn = Connection()
-        good_client_transport.bind(good_client_conn)
-
-        server_transport = Transport()
-        server_ssl = SSL( server_transport, server_domain)
-        server_conn = Connection()
-        server_transport.bind(server_conn)
+        # non-ssl connection
+        client = SslTest.SslTestConnection()
 
-        good_client_conn.open()
-        server_conn.open()
+        client.connection.open()
+        server.connection.open()
         try:
-            self._pump2( good_client_transport, server_transport )
-            assert False, "Client failed to reject bad certificate."
-        except TransportException, e:
+            self._pump( client, server )
+            assert False, "Server did not reject client as expected."
+        except TransportException:
             pass
-        good_client_conn.close()
-        server_conn.close()
 
     def test_session_resume(self):
         """ Test resume of client session.
         """
-        # server config
-        server_domain = SSLDomain(SSL.MODE_SERVER)
-        server_domain.set_credentials(self._testpath("server-certificate.pem"),
-                                           self._testpath("server-private-key.pem"),
-                                           "server-password")
-        # client config
-        client_domain = SSLDomain(SSL.MODE_CLIENT)
-        client_domain.set_trusted_ca_db(self._testpath("ca-certificate.pem"))
-        client_domain.set_default_peer_authentication( SSL.VERIFY_PEER )
-
-        # setup first client session
-        client_transport = Transport()
-        client_ssl = SSL( client_transport, client_domain )
-        client_conn = Connection()
-        client_transport.bind(client_conn)
-
-        # create server session to connect to
-        server_transport = Transport()
-        server_ssl = SSL( server_transport, server_domain)
-        server_conn = Connection()
-        server_transport.bind(server_conn)
+        self.server_domain.set_credentials(self._testpath("server-certificate.pem"),
+                                           self._testpath("server-private-key.pem"),
+                                           "server-password")
+        self.server_domain.set_default_peer_authentication( SSL.ANONYMOUS_PEER )
 
-        #client_transport.trace(Transport.TRACE_DRV)
+        self.client_domain.set_trusted_ca_db(self._testpath("ca-certificate.pem"))
+        self.client_domain.set_default_peer_authentication( SSL.VERIFY_PEER )
+
+        server = SslTest.SslTestConnection( self.server_domain )
+        client = SslTest.SslTestConnection( self.client_domain )
 
         # bring up the connection and store its state
-        client_conn.open()
-        server_conn.open()
-        self._pump2( client_transport, server_transport )
-        assert client_ssl.protocol_name() is not None
-        ssl_state = client_ssl.get_state()
-        client_conn.close()
-        server_conn.close()
-        self._pump2( client_transport, server_transport )
+        client.connection.open()
+        server.connection.open()
+        self._pump( client, server )
+        assert client.ssl.protocol_name() is not None
+        ssl_state = client.ssl.get_state()
+
+        # cleanly shutdown the connection
+        client.connection.close()
+        server.connection.close()
+        self._pump( client, server )
+
+        # destroy the existing client and server connections, and the client domain
+        del client
+        del server
+        del self.client_domain
 
         # now create a new set of connections
-        client_transport = Transport()
-        client_ssl = SSL( client_transport, client_domain )
-        client_conn = Connection()
-        client_transport.bind(client_conn)
-
-        server_transport = Transport()
-        server_ssl = SSL( server_transport, server_domain)
-        server_conn = Connection()
-        server_transport.bind(server_conn)
-
-        client_ssl.resume_state( ssl_state )
-        client_conn.open()
-        server_conn.open()
-        self._pump2( client_transport, server_transport )
-        assert client_ssl.protocol_name() is not None
-        assert client_ssl.state_resumed_ok()
-        #newstate = client_ssl.get_state()
-
-        del ssl_state
-        client_conn.close()
-        server_conn.close()
-        self._pump2( client_transport, server_transport )
+        self.client_domain = SSLDomain(SSL.MODE_CLIENT)
+        self.client_domain.set_trusted_ca_db(self._testpath("ca-certificate.pem"))
+        self.client_domain.set_default_peer_authentication( SSL.VERIFY_PEER )
+
+        server = SslTest.SslTestConnection( self.server_domain )
+        client = SslTest.SslTestConnection( self.client_domain )
+
+        # resume the client state before attempting to connect
+        client.ssl.resume_state( ssl_state )
+
+        #client.transport.trace(Transport.TRACE_DRV)
+        #server.transport.trace(Transport.TRACE_DRV)
+
+        client.connection.open()
+        server.connection.open()
+        self._pump( client, server )
+        assert server.ssl.protocol_name() is not None
+        assert client.ssl.state_resumed_ok()
+        client.connection.close()
+        server.connection.close()
+        self._pump( client, server )
+
+    def test_multiple_sessions(self):
+        """ Test multiple simultaneous active SSL sessions with bi-directional
+        certificate verification, shared across two domains.
+        """
+        self.server_domain.set_credentials(self._testpath("server-certificate.pem"),
+                                           self._testpath("server-private-key.pem"),
+                                           "server-password")
+        self.server_domain.set_trusted_ca_db(self._testpath("ca-certificate.pem"))
+        self.server_domain.set_default_peer_authentication( SSL.VERIFY_PEER,
+                                                            self._testpath("ca-certificate.pem") )
+
+        self.client_domain.set_credentials(self._testpath("client-certificate.pem"),
+                                           self._testpath("client-private-key.pem"),
+                                           "client-password")
+        self.client_domain.set_trusted_ca_db(self._testpath("ca-certificate.pem"))
+        self.client_domain.set_default_peer_authentication( SSL.VERIFY_PEER )
+
+        max_count = 100
+        sessions = [(SslTest.SslTestConnection( self.server_domain ),
+                     SslTest.SslTestConnection( self.client_domain )) for x in
+                    range(max_count)]
+        for s in sessions:
+            s[0].connection.open()
+            self._pump( s[0], s[1] )
+
+        for s in sessions:
+            s[1].connection.open()
+            self._pump( s[1], s[0] )
+            assert s[0].ssl.cipher_name() is not None
+            assert s[1].ssl.cipher_name() == s[0].ssl.cipher_name()
+
+        for s in sessions:
+            s[1].connection.close()
+            self._pump( s[0], s[1] )
+
+        for s in sessions:
+            s[0].connection.close()
+            self._pump( s[1], s[0] )
 



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org