You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by gs...@apache.org on 2015/01/29 20:41:49 UTC

[2/2] qpid-proton git commit: Added SSL support

Added SSL support


Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/dcec9ff0
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/dcec9ff0
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/dcec9ff0

Branch: refs/heads/master
Commit: dcec9ff064cc7cae88b6a44e497c04c9cbeaece7
Parents: 93095ed
Author: Gordon Sim <gs...@redhat.com>
Authored: Wed Jan 28 21:53:17 2015 +0000
Committer: Gordon Sim <gs...@redhat.com>
Committed: Thu Jan 29 19:42:22 2015 +0000

----------------------------------------------------------------------
 proton-c/bindings/python/proton/reactors.py | 72 +++++++++++++++++-------
 proton-c/bindings/python/proton/utils.py    |  4 +-
 2 files changed, 55 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/dcec9ff0/proton-c/bindings/python/proton/reactors.py
----------------------------------------------------------------------
diff --git a/proton-c/bindings/python/proton/reactors.py b/proton-c/bindings/python/proton/reactors.py
index c5eddd9..463bf2d 100644
--- a/proton-c/bindings/python/proton/reactors.py
+++ b/proton-c/bindings/python/proton/reactors.py
@@ -20,7 +20,7 @@ import logging, os, Queue, socket, time, types
 from heapq import heappush, heappop, nsmallest
 from proton import Collector, Connection, ConnectionException, Delivery, Described, dispatch
 from proton import Endpoint, Event, EventBase, EventType, generate_uuid, Handler, Link, Message
-from proton import ProtonException, PN_ACCEPTED, PN_PYREF, SASL, Session, symbol
+from proton import ProtonException, PN_ACCEPTED, PN_PYREF, SASL, Session, SSL, SSLDomain, symbol
 from proton import Terminus, Timeout, Transport, TransportException, ulong, Url
 from select import select
 from proton.handlers import OutgoingMessageHandler, ScopedHandler
@@ -45,7 +45,9 @@ class AmqpSocket(object):
         self.read_done = False
         self._closed = False
 
-    def accept(self, force_sasl=True):
+    def accept(self, force_sasl=True, ssl_domain=None):
+        if ssl_domain:
+            self.ssl = SSL(self.transport, ssl_domain)
         if force_sasl:
             sasl = self.transport.sasl()
             sasl.mechanisms("ANONYMOUS")
@@ -54,7 +56,10 @@ class AmqpSocket(object):
         #TODO: use SASL anyway if requested by peer
         return self
 
-    def connect(self, host, port=None, username=None, password=None, force_sasl=True):
+    def connect(self, host, port=None, username=None, password=None, force_sasl=True, ssl_domain=None):
+        if ssl_domain:
+            self.ssl = SSL(self.transport, ssl_domain)
+            self.ssl.peer_hostname = host
         if username and password:
             sasl = self.transport.sasl()
             sasl.plain(username, password)
@@ -165,7 +170,7 @@ class AmqpAcceptor:
     itself be added to an io loop.
     """
 
-    def __init__(self, events, loop, host, port):
+    def __init__(self, events, loop, host, port, ssl_domain=None):
         self.events = events
         self.loop = loop
         self.socket = socket.socket()
@@ -173,6 +178,7 @@ class AmqpAcceptor:
         self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
         self.socket.bind((host, port))
         self.socket.listen(5)
+        self.ssl_domain = ssl_domain
         self.loop.add(self)
         self._closed = False
 
@@ -198,7 +204,7 @@ class AmqpAcceptor:
     def readable(self):
         sock, addr = self.socket.accept()
         if sock:
-            self.loop.add(AmqpSocket(self.events.connection(), sock, self.events).accept())
+            self.loop.add(AmqpSocket(self.events.connection(), sock, self.events).accept(ssl_domain=self.ssl_domain))
 
     def removed(self): pass
     def tick(self): return None
@@ -634,16 +640,27 @@ class Connector(Handler):
     Internal handler that triggers the necessary socket connect for an
     opened connection.
     """
-    def __init__(self, loop):
+    def __init__(self, loop, ssl_domain=None):
         self.loop = loop
+        self.ssl_domain = ssl_domain
+
+    def _get_ssl_domain(self, connection, scheme):
+        if hasattr(connection, 'ssl_domain'):
+            return connection.ssl_domain
+        elif scheme == 'amqps':
+            return self.ssl_domain
+        else:
+            return None
 
     def _connect(self, connection):
-        host, port = connection.address.next()
-        logging.info("connecting to %s:%i" % (host, port))
+        url = connection.address.next()
+        logging.info("connecting to %s:%i" % (url.host, url.port))
         heartbeat = None
         if hasattr(connection, 'heartbeat'):
             heartbeat = connection.heartbeat
-        self.loop.add(AmqpSocket(connection, socket.socket(), self.loop.events, heartbeat=heartbeat).connect(host, port))
+        s = AmqpSocket(connection, socket.socket(), self.loop.events, heartbeat=heartbeat)
+        s.connect(url.host, url.port, username=url.username, password=url.password, ssl_domain=self._get_ssl_domain(connection, url.scheme))
+        self.loop.add(s)
         connection._pin = None #connection is now referenced by AmqpSocket, so no need for circular reference
 
     def on_connection_local_open(self, event):
@@ -698,26 +715,38 @@ class Urls(object):
     def __iter__(self):
         return self
 
-    def _as_pair(self, url):
-        return (url.host, url.port)
-
     def next(self):
         try:
-            return self._as_pair(self.i.next())
+            return self.i.next()
         except StopIteration:
             self.i = iter(self.values)
-            return self._as_pair(self.i.next())
+            return self.i.next()
+
+class SSLConfig(object):
+    def __init__(self):
+        self.client = SSLDomain(SSLDomain.MODE_CLIENT)
+        self.server = SSLDomain(SSLDomain.MODE_SERVER)
+
+    def set_credentials(self, cert_file, key_file, password):
+        self.client.set_credentials(cert_file, key_file, password)
+        self.server.set_credentials(cert_file, key_file, password)
+
+    def set_trusted_ca_db(self, certificate_db):
+        self.client.set_trusted_ca_db(certificate_db)
+        self.server.set_trusted_ca_db(certificate_db)
+
 
 class Container(object):
     def __init__(self, *handlers):
-        h = [Connector(self), ScopedHandler()]
+        self.ssl = SSLConfig()
+        h = [Connector(self, self.ssl.client), ScopedHandler()]
         h.extend(handlers)
         self.events = Events(*h)
         self.loop = SelectLoop(self.events)
         self.trigger = None
         self.container_id = str(generate_uuid())
 
-    def connect(self, url=None, urls=None, address=None, handler=None, reconnect=None, heartbeat=None):
+    def connect(self, url=None, urls=None, address=None, handler=None, reconnect=None, heartbeat=None, ssl_domain=None):
         conn = self.events.connection()
         conn._pin = conn #circular reference until the open event gets handled
         if handler:
@@ -732,6 +761,8 @@ class Container(object):
             conn.reconnect = reconnect
         elif reconnect is None:
             conn.reconnect = Backoff()
+        if ssl_domain:
+            conn.ssl_domain = ssl_domain
         conn._session_policy = SessionPerConnection() #todo: make configurable
         conn.open()
         return conn
@@ -800,9 +831,12 @@ class Container(object):
             context._txn_ctrl.target.capabilities.put_object(symbol(u'amqp:local-transactions'))
         return Transaction(context._txn_ctrl, handler, settle_before_discharge)
 
-    def listen(self, url):
-        host, port = Urls([url]).next()
-        return AmqpAcceptor(self.events, self, host, port)
+    def listen(self, url, ssl_domain=None):
+        url = Urls([url]).next()
+        ssl_config = ssl_domain
+        if not ssl_config and url.scheme == 'amqps':
+            ssl_config = self.ssl_domain
+        return AmqpAcceptor(self.events, self, url.host, url.port, ssl_domain=ssl_config)
 
     def schedule(self, deadline, connection=None, session=None, link=None, delivery=None, subject=None):
         self.events.timer.schedule(deadline, ApplicationEvent("timer", connection, session, link, delivery, subject))

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/dcec9ff0/proton-c/bindings/python/proton/utils.py
----------------------------------------------------------------------
diff --git a/proton-c/bindings/python/proton/utils.py b/proton-c/bindings/python/proton/utils.py
index 4c9d509..d2ece79 100644
--- a/proton-c/bindings/python/proton/utils.py
+++ b/proton-c/bindings/python/proton/utils.py
@@ -139,11 +139,11 @@ class BlockingConnection(Handler):
     """
     A synchronous style connection wrapper.
     """
-    def __init__(self, url, timeout=None, container=None):
+    def __init__(self, url, timeout=None, container=None, ssl_domain=None):
         self.timeout = timeout
         self.container = container or Container()
         self.url = Url(utf8(url)).defaults()
-        self.conn = self.container.connect(url=self.url, handler=self)
+        self.conn = self.container.connect(url=self.url, handler=self, ssl_domain=ssl_domain)
         self.wait(lambda: not (self.conn.state & Endpoint.REMOTE_UNINIT),
                   msg="Opening connection")
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org