You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by gs...@apache.org on 2015/01/29 20:41:49 UTC
[2/2] qpid-proton git commit: Added ssl support
Added ssl support
Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/dcec9ff0
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/dcec9ff0
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/dcec9ff0
Branch: refs/heads/master
Commit: dcec9ff064cc7cae88b6a44e497c04c9cbeaece7
Parents: 93095ed
Author: Gordon Sim <gs...@redhat.com>
Authored: Wed Jan 28 21:53:17 2015 +0000
Committer: Gordon Sim <gs...@redhat.com>
Committed: Thu Jan 29 19:42:22 2015 +0000
----------------------------------------------------------------------
proton-c/bindings/python/proton/reactors.py | 72 +++++++++++++++++-------
proton-c/bindings/python/proton/utils.py | 4 +-
2 files changed, 55 insertions(+), 21 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/dcec9ff0/proton-c/bindings/python/proton/reactors.py
----------------------------------------------------------------------
diff --git a/proton-c/bindings/python/proton/reactors.py b/proton-c/bindings/python/proton/reactors.py
index c5eddd9..463bf2d 100644
--- a/proton-c/bindings/python/proton/reactors.py
+++ b/proton-c/bindings/python/proton/reactors.py
@@ -20,7 +20,7 @@ import logging, os, Queue, socket, time, types
from heapq import heappush, heappop, nsmallest
from proton import Collector, Connection, ConnectionException, Delivery, Described, dispatch
from proton import Endpoint, Event, EventBase, EventType, generate_uuid, Handler, Link, Message
-from proton import ProtonException, PN_ACCEPTED, PN_PYREF, SASL, Session, symbol
+from proton import ProtonException, PN_ACCEPTED, PN_PYREF, SASL, Session, SSL, SSLDomain, symbol
from proton import Terminus, Timeout, Transport, TransportException, ulong, Url
from select import select
from proton.handlers import OutgoingMessageHandler, ScopedHandler
@@ -45,7 +45,9 @@ class AmqpSocket(object):
self.read_done = False
self._closed = False
- def accept(self, force_sasl=True):
+ def accept(self, force_sasl=True, ssl_domain=None):
+ if ssl_domain:
+ self.ssl = SSL(self.transport, ssl_domain)
if force_sasl:
sasl = self.transport.sasl()
sasl.mechanisms("ANONYMOUS")
@@ -54,7 +56,10 @@ class AmqpSocket(object):
#TODO: use SASL anyway if requested by peer
return self
- def connect(self, host, port=None, username=None, password=None, force_sasl=True):
+ def connect(self, host, port=None, username=None, password=None, force_sasl=True, ssl_domain=None):
+ if ssl_domain:
+ self.ssl = SSL(self.transport, ssl_domain)
+ self.ssl.peer_hostname = host
if username and password:
sasl = self.transport.sasl()
sasl.plain(username, password)
@@ -165,7 +170,7 @@ class AmqpAcceptor:
itself be added to an io loop.
"""
- def __init__(self, events, loop, host, port):
+ def __init__(self, events, loop, host, port, ssl_domain=None):
self.events = events
self.loop = loop
self.socket = socket.socket()
@@ -173,6 +178,7 @@ class AmqpAcceptor:
self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
self.socket.bind((host, port))
self.socket.listen(5)
+ self.ssl_domain = ssl_domain
self.loop.add(self)
self._closed = False
@@ -198,7 +204,7 @@ class AmqpAcceptor:
def readable(self):
sock, addr = self.socket.accept()
if sock:
- self.loop.add(AmqpSocket(self.events.connection(), sock, self.events).accept())
+ self.loop.add(AmqpSocket(self.events.connection(), sock, self.events).accept(ssl_domain=self.ssl_domain))
def removed(self): pass
def tick(self): return None
@@ -634,16 +640,27 @@ class Connector(Handler):
Internal handler that triggers the necessary socket connect for an
opened connection.
"""
- def __init__(self, loop):
+ def __init__(self, loop, ssl_domain=None):
self.loop = loop
+ self.ssl_domain = ssl_domain
+
+ def _get_ssl_domain(self, connection, scheme):
+ if hasattr(connection, 'ssl_domain'):
+ return connection.ssl_domain
+ elif scheme == 'amqps':
+ return self.ssl_domain
+ else:
+ return None
def _connect(self, connection):
- host, port = connection.address.next()
- logging.info("connecting to %s:%i" % (host, port))
+ url = connection.address.next()
+ logging.info("connecting to %s:%i" % (url.host, url.port))
heartbeat = None
if hasattr(connection, 'heartbeat'):
heartbeat = connection.heartbeat
- self.loop.add(AmqpSocket(connection, socket.socket(), self.loop.events, heartbeat=heartbeat).connect(host, port))
+ s = AmqpSocket(connection, socket.socket(), self.loop.events, heartbeat=heartbeat)
+ s.connect(url.host, url.port, username=url.username, password=url.password, ssl_domain=self._get_ssl_domain(connection, url.scheme))
+ self.loop.add(s)
connection._pin = None #connection is now referenced by AmqpSocket, so no need for circular reference
def on_connection_local_open(self, event):
@@ -698,26 +715,38 @@ class Urls(object):
def __iter__(self):
return self
- def _as_pair(self, url):
- return (url.host, url.port)
-
def next(self):
try:
- return self._as_pair(self.i.next())
+ return self.i.next()
except StopIteration:
self.i = iter(self.values)
- return self._as_pair(self.i.next())
+ return self.i.next()
+
+class SSLConfig(object):
+ def __init__(self):
+ self.client = SSLDomain(SSLDomain.MODE_CLIENT)
+ self.server = SSLDomain(SSLDomain.MODE_SERVER)
+
+ def set_credentials(self, cert_file, key_file, password):
+ self.client.set_credentials(cert_file, key_file, password)
+ self.server.set_credentials(cert_file, key_file, password)
+
+ def set_trusted_ca_db(self, certificate_db):
+ self.client.set_trusted_ca_db(certificate_db)
+ self.server.set_trusted_ca_db(certificate_db)
+
class Container(object):
def __init__(self, *handlers):
- h = [Connector(self), ScopedHandler()]
+ self.ssl = SSLConfig()
+ h = [Connector(self, self.ssl.client), ScopedHandler()]
h.extend(handlers)
self.events = Events(*h)
self.loop = SelectLoop(self.events)
self.trigger = None
self.container_id = str(generate_uuid())
- def connect(self, url=None, urls=None, address=None, handler=None, reconnect=None, heartbeat=None):
+ def connect(self, url=None, urls=None, address=None, handler=None, reconnect=None, heartbeat=None, ssl_domain=None):
conn = self.events.connection()
conn._pin = conn #circular reference until the open event gets handled
if handler:
@@ -732,6 +761,8 @@ class Container(object):
conn.reconnect = reconnect
elif reconnect is None:
conn.reconnect = Backoff()
+ if ssl_domain:
+ conn.ssl_domain = ssl_domain
conn._session_policy = SessionPerConnection() #todo: make configurable
conn.open()
return conn
@@ -800,9 +831,12 @@ class Container(object):
context._txn_ctrl.target.capabilities.put_object(symbol(u'amqp:local-transactions'))
return Transaction(context._txn_ctrl, handler, settle_before_discharge)
- def listen(self, url):
- host, port = Urls([url]).next()
- return AmqpAcceptor(self.events, self, host, port)
+ def listen(self, url, ssl_domain=None):
+ url = Urls([url]).next()
+ ssl_config = ssl_domain
+ if not ssl_config and url.scheme == 'amqps':
+ ssl_config = self.ssl_domain
+ return AmqpAcceptor(self.events, self, url.host, url.port, ssl_domain=ssl_config)
def schedule(self, deadline, connection=None, session=None, link=None, delivery=None, subject=None):
self.events.timer.schedule(deadline, ApplicationEvent("timer", connection, session, link, delivery, subject))
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/dcec9ff0/proton-c/bindings/python/proton/utils.py
----------------------------------------------------------------------
diff --git a/proton-c/bindings/python/proton/utils.py b/proton-c/bindings/python/proton/utils.py
index 4c9d509..d2ece79 100644
--- a/proton-c/bindings/python/proton/utils.py
+++ b/proton-c/bindings/python/proton/utils.py
@@ -139,11 +139,11 @@ class BlockingConnection(Handler):
"""
A synchronous style connection wrapper.
"""
- def __init__(self, url, timeout=None, container=None):
+ def __init__(self, url, timeout=None, container=None, ssl_domain=None):
self.timeout = timeout
self.container = container or Container()
self.url = Url(utf8(url)).defaults()
- self.conn = self.container.connect(url=self.url, handler=self)
+ self.conn = self.container.connect(url=self.url, handler=self, ssl_domain=ssl_domain)
self.wait(lambda: not (self.conn.state & Endpoint.REMOTE_UNINIT),
msg="Opening connection")
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org