You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rh...@apache.org on 2010/03/31 23:17:11 UTC
svn commit: r929717 - in /qpid/trunk/qpid/python: examples/api/ qpid/messaging/ qpid/tests/messaging/
Author: rhs
Date: Wed Mar 31 21:17:09 2010
New Revision: 929717
URL: http://svn.apache.org/viewvc?rev=929717&view=rev
Log:
added SSL support to API
Added:
qpid/trunk/qpid/python/qpid/messaging/transports.py (with props)
Modified:
qpid/trunk/qpid/python/examples/api/drain
qpid/trunk/qpid/python/examples/api/server
qpid/trunk/qpid/python/examples/api/spout
qpid/trunk/qpid/python/qpid/messaging/driver.py
qpid/trunk/qpid/python/qpid/messaging/endpoints.py
qpid/trunk/qpid/python/qpid/tests/messaging/__init__.py
qpid/trunk/qpid/python/qpid/tests/messaging/endpoints.py
qpid/trunk/qpid/python/qpid/tests/messaging/message.py
Modified: qpid/trunk/qpid/python/examples/api/drain
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/python/examples/api/drain?rev=929717&r1=929716&r2=929717&view=diff
==============================================================================
--- qpid/trunk/qpid/python/examples/api/drain (original)
+++ qpid/trunk/qpid/python/examples/api/drain Wed Mar 31 21:17:09 2010
@@ -73,7 +73,7 @@ class Formatter:
return eval(st, self.environ)
# XXX: should make URL default the port for us
-conn = Connection(url.host, url.port or AMQP_PORT,
+conn = Connection(url.host, url.port,
username=url.user,
password=url.password,
reconnect=opts.reconnect,
Modified: qpid/trunk/qpid/python/examples/api/server
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/python/examples/api/server?rev=929717&r1=929716&r2=929717&view=diff
==============================================================================
--- qpid/trunk/qpid/python/examples/api/server (original)
+++ qpid/trunk/qpid/python/examples/api/server Wed Mar 31 21:17:09 2010
@@ -51,7 +51,7 @@ else:
parser.error("address is required")
# XXX: should make URL default the port for us
-conn = Connection(url.host, url.port or AMQP_PORT,
+conn = Connection(url.host, url.port,
username=url.user,
password=url.password,
reconnect=opts.reconnect,
Modified: qpid/trunk/qpid/python/examples/api/spout
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/python/examples/api/spout?rev=929717&r1=929716&r2=929717&view=diff
==============================================================================
--- qpid/trunk/qpid/python/examples/api/spout (original)
+++ qpid/trunk/qpid/python/examples/api/spout Wed Mar 31 21:17:09 2010
@@ -93,7 +93,7 @@ else:
content = text
# XXX: should make URL default the port for us
-conn = Connection(url.host, url.port or AMQP_PORT,
+conn = Connection(url.host, url.port,
username=url.user,
password=url.password,
reconnect=opts.reconnect,
Modified: qpid/trunk/qpid/python/qpid/messaging/driver.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/python/qpid/messaging/driver.py?rev=929717&r1=929716&r2=929717&view=diff
==============================================================================
--- qpid/trunk/qpid/python/qpid/messaging/driver.py (original)
+++ qpid/trunk/qpid/python/qpid/messaging/driver.py Wed Mar 31 21:17:09 2010
@@ -26,13 +26,12 @@ from qpid.datatypes import RangedSet, Se
from qpid.exceptions import Timeout, VersionError
from qpid.framing import OpEncoder, SegmentEncoder, FrameEncoder, \
FrameDecoder, SegmentDecoder, OpDecoder
-from qpid.messaging import address
+from qpid.messaging import address, transports
from qpid.messaging.constants import UNLIMITED, REJECTED, RELEASED
from qpid.messaging.exceptions import ConnectError
from qpid.messaging.message import get_codec, Disposition, Message
from qpid.ops import *
from qpid.selector import Selector
-from qpid.util import connect
from qpid.validator import And, Context, List, Map, Types, Values
from threading import Condition, Thread
@@ -328,7 +327,7 @@ class Driver:
self.connection.backups
self._host = 0
self._retrying = False
- self._socket = None
+ self._transport = None
self._timeout = None
@@ -346,15 +345,17 @@ class Driver:
self._selector.unregister(self)
def fileno(self):
- return self._socket.fileno()
+ return self._transport.fileno()
@synchronized
def reading(self):
- return self._socket is not None
+ return self._transport is not None and \
+ self._transport.reading(True)
@synchronized
def writing(self):
- return self._socket is not None and self.engine.pending()
+ return self._transport is not None and \
+ self._transport.writing(self.engine.pending())
@synchronized
def timing(self):
@@ -363,8 +364,10 @@ class Driver:
@synchronized
def readable(self):
try:
- data = self._socket.recv(64*1024)
- if data:
+ data = self._transport.recv(64*1024)
+ if data is None:
+ return
+ elif data:
rawlog.debug("READ[%s]: %r", self.log_id, data)
self.engine.write(data)
else:
@@ -404,8 +407,8 @@ class Driver:
def st_closed(self):
# XXX: this log statement seems to sometimes hit when the socket is not connected
# XXX: rawlog.debug("CLOSE[%s]: %s", self.log_id, self._socket.getpeername())
- self._socket.close()
- self._socket = None
+ self._transport.close()
+ self._transport = None
self.engine = None
return True
@@ -416,7 +419,8 @@ class Driver:
def writeable(self):
notify = False
try:
- n = self._socket.send(self.engine.peek())
+ n = self._transport.send(self.engine.peek())
+ if n == 0: return
sent = self.engine.read(n)
rawlog.debug("SENT[%s]: %r", self.log_id, sent)
except socket.error, e:
@@ -433,7 +437,7 @@ class Driver:
def dispatch(self):
try:
- if self._socket is None:
+ if self._transport is None:
if self.connection._connected:
self.connect()
else:
@@ -454,7 +458,11 @@ class Driver:
self.engine = Engine(self.connection)
self.engine.open()
rawlog.debug("OPEN[%s]: %s:%s", self.log_id, host, port)
- self._socket = connect(host, port)
+ trans = getattr(transports, self.connection.transport, None)
+ if trans:
+ self._transport = trans(host, port)
+ else:
+ raise ConnectError("no such transport: %s" % self.connection.transport)
if self._retrying:
log.warn("reconnect succeeded: %s:%s", host, port)
self._timeout = None
Modified: qpid/trunk/qpid/python/qpid/messaging/endpoints.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/python/qpid/messaging/endpoints.py?rev=929717&r1=929716&r2=929717&view=diff
==============================================================================
--- qpid/trunk/qpid/python/qpid/messaging/endpoints.py (original)
+++ qpid/trunk/qpid/python/qpid/messaging/endpoints.py Wed Mar 31 21:17:09 2010
@@ -79,7 +79,6 @@ class Connection:
@return: a disconnected Connection
"""
self.host = host
- self.port = default(port, AMQP_PORT)
self.username = username
self.password = password
self.mechanisms = options.get("mechanisms")
@@ -87,9 +86,15 @@ class Connection:
self.reconnect = options.get("reconnect", False)
self.reconnect_delay = options.get("reconnect_delay", 3)
self.reconnect_limit = options.get("reconnect_limit")
+ self.transport = options.get("transport", "plain")
self.backups = options.get("backups", [])
self.options = options
+ if self.transport == "tls":
+ self.port = default(port, AMQPS_PORT)
+ else:
+ self.port = default(port, AMQP_PORT)
+
self.id = str(uuid4())
self.session_counter = 0
self.sessions = {}
Added: qpid/trunk/qpid/python/qpid/messaging/transports.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/python/qpid/messaging/transports.py?rev=929717&view=auto
==============================================================================
--- qpid/trunk/qpid/python/qpid/messaging/transports.py (added)
+++ qpid/trunk/qpid/python/qpid/messaging/transports.py Wed Mar 31 21:17:09 2010
@@ -0,0 +1,107 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+from qpid.util import connect
+
+class plain:
+
+ def __init__(self, host, port):
+ self.socket = connect(host, port)
+
+ def fileno(self):
+ return self.socket.fileno()
+
+ def reading(self, reading):
+ return reading
+
+ def writing(self, writing):
+ return writing
+
+ def send(self, bytes):
+ return self.socket.send(bytes)
+
+ def recv(self, n):
+ return self.socket.recv(n)
+
+ def close(self):
+ self.socket.close()
+
+try:
+ from ssl import wrap_socket, SSLError, SSL_ERROR_WANT_READ, \
+ SSL_ERROR_WANT_WRITE
+except ImportError:
+ pass
+else:
+ class tls:
+
+ def __init__(self, host, port):
+ self.socket = connect(host, port)
+ self.tls = wrap_socket(self.socket)
+ self.socket.setblocking(0)
+ self.state = None
+
+ def fileno(self):
+ return self.socket.fileno()
+
+ def reading(self, reading):
+ if self.state is None:
+ return reading
+ else:
+ return self.state == SSL_ERROR_WANT_READ
+
+ def writing(self, writing):
+ if self.state is None:
+ return writing
+ else:
+ return self.state == SSL_ERROR_WANT_WRITE
+
+ def send(self, bytes):
+ self._clear_state()
+ try:
+ return self.tls.write(bytes)
+ except SSLError, e:
+ if self._update_state(e.args[0]):
+ return 0
+ else:
+ raise
+
+ def recv(self, n):
+ self._clear_state()
+ try:
+ return self.tls.read(n)
+ except SSLError, e:
+ if self._update_state(e.args[0]):
+ return None
+ else:
+ raise
+
+ def _clear_state(self):
+ self.state = None
+
+ def _update_state(self, code):
+ if code in (SSL_ERROR_WANT_READ, SSL_ERROR_WANT_WRITE):
+ self.state = code
+ return True
+ else:
+ return False
+
+ def close(self):
+ self.socket.setblocking(1)
+ # this closes the underlying socket
+ self.tls.close()
Propchange: qpid/trunk/qpid/python/qpid/messaging/transports.py
------------------------------------------------------------------------------
svn:eol-style = native
Modified: qpid/trunk/qpid/python/qpid/tests/messaging/__init__.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/python/qpid/tests/messaging/__init__.py?rev=929717&r1=929716&r2=929717&view=diff
==============================================================================
--- qpid/trunk/qpid/python/qpid/tests/messaging/__init__.py (original)
+++ qpid/trunk/qpid/python/qpid/tests/messaging/__init__.py Wed Mar 31 21:17:09 2010
@@ -143,4 +143,15 @@ class Base(Test):
def reconnect(self):
return self.get_bool("reconnect")
+
+ def transport(self):
+ if self.broker.scheme == self.broker.AMQPS:
+ return "tls"
+ else:
+ return "plain"
+
+ def connection_options(self):
+ return {"reconnect": self.reconnect(),
+ "transport": self.transport()}
+
import address, endpoints, message
Modified: qpid/trunk/qpid/python/qpid/tests/messaging/endpoints.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/python/qpid/tests/messaging/endpoints.py?rev=929717&r1=929716&r2=929717&view=diff
==============================================================================
--- qpid/trunk/qpid/python/qpid/tests/messaging/endpoints.py (original)
+++ qpid/trunk/qpid/python/qpid/tests/messaging/endpoints.py Wed Mar 31 21:17:09 2010
@@ -30,13 +30,13 @@ class SetupTests(Base):
def testOpen(self):
# XXX: need to flesh out URL support/syntax
self.conn = Connection.open(self.broker.host, self.broker.port,
- reconnect=self.reconnect())
+ **self.connection_options())
self.ping(self.conn.session())
def testConnect(self):
# XXX: need to flesh out URL support/syntax
self.conn = Connection(self.broker.host, self.broker.port,
- reconnect=self.reconnect())
+ **self.connection_options())
self.conn.connect()
self.ping(self.conn.session())
@@ -65,7 +65,8 @@ class SetupTests(Base):
for i in range(32):
if fds: os.close(fds.pop())
for i in xrange(64):
- conn = Connection.open(self.broker.host, self.broker.port)
+ conn = Connection.open(self.broker.host, self.broker.port,
+ **self.connection_options())
conn.close()
finally:
while fds:
@@ -75,7 +76,7 @@ class ConnectionTests(Base):
def setup_connection(self):
return Connection.open(self.broker.host, self.broker.port,
- reconnect=self.reconnect())
+ **self.connection_options())
def testSessionAnon(self):
ssn1 = self.conn.session()
@@ -118,7 +119,7 @@ class SessionTests(Base):
def setup_connection(self):
return Connection.open(self.broker.host, self.broker.port,
- reconnect=self.reconnect())
+ **self.connection_options())
def setup_session(self):
return self.conn.session()
@@ -405,7 +406,7 @@ class ReceiverTests(Base):
def setup_connection(self):
return Connection.open(self.broker.host, self.broker.port,
- reconnect=self.reconnect())
+ **self.connection_options())
def setup_session(self):
return self.conn.session()
@@ -575,7 +576,7 @@ class AddressTests(Base):
def setup_connection(self):
return Connection.open(self.broker.host, self.broker.port,
- reconnect=self.reconnect())
+ **self.connection_options())
def setup_session(self):
return self.conn.session()
@@ -846,7 +847,7 @@ class AddressErrorTests(Base):
def setup_connection(self):
return Connection.open(self.broker.host, self.broker.port,
- reconnect=self.reconnect())
+ **self.connection_options())
def setup_session(self):
return self.conn.session()
@@ -913,7 +914,7 @@ class SenderTests(Base):
def setup_connection(self):
return Connection.open(self.broker.host, self.broker.port,
- reconnect=self.reconnect())
+ **self.connection_options())
def setup_session(self):
return self.conn.session()
Modified: qpid/trunk/qpid/python/qpid/tests/messaging/message.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/python/qpid/tests/messaging/message.py?rev=929717&r1=929716&r2=929717&view=diff
==============================================================================
--- qpid/trunk/qpid/python/qpid/tests/messaging/message.py (original)
+++ qpid/trunk/qpid/python/qpid/tests/messaging/message.py Wed Mar 31 21:17:09 2010
@@ -54,7 +54,7 @@ class MessageEchoTests(Base):
def setup_connection(self):
return Connection.open(self.broker.host, self.broker.port,
- reconnect=self.reconnect())
+ **self.connection_options())
def setup_session(self):
return self.conn.session()
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org