You are viewing a plain-text version of this content; the canonical link was not preserved in this copy.
Posted to commits@qpid.apache.org by rh...@apache.org on 2010/03/31 23:17:11 UTC

svn commit: r929717 - in /qpid/trunk/qpid/python: examples/api/ qpid/messaging/ qpid/tests/messaging/

Author: rhs
Date: Wed Mar 31 21:17:09 2010
New Revision: 929717

URL: http://svn.apache.org/viewvc?rev=929717&view=rev
Log:
added SSL support to API

Added:
    qpid/trunk/qpid/python/qpid/messaging/transports.py   (with props)
Modified:
    qpid/trunk/qpid/python/examples/api/drain
    qpid/trunk/qpid/python/examples/api/server
    qpid/trunk/qpid/python/examples/api/spout
    qpid/trunk/qpid/python/qpid/messaging/driver.py
    qpid/trunk/qpid/python/qpid/messaging/endpoints.py
    qpid/trunk/qpid/python/qpid/tests/messaging/__init__.py
    qpid/trunk/qpid/python/qpid/tests/messaging/endpoints.py
    qpid/trunk/qpid/python/qpid/tests/messaging/message.py

Modified: qpid/trunk/qpid/python/examples/api/drain
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/python/examples/api/drain?rev=929717&r1=929716&r2=929717&view=diff
==============================================================================
--- qpid/trunk/qpid/python/examples/api/drain (original)
+++ qpid/trunk/qpid/python/examples/api/drain Wed Mar 31 21:17:09 2010
@@ -73,7 +73,7 @@ class Formatter:
     return eval(st, self.environ)
 
 # XXX: should make URL default the port for us
-conn = Connection(url.host, url.port or AMQP_PORT,
+conn = Connection(url.host, url.port,
                   username=url.user,
                   password=url.password,
                   reconnect=opts.reconnect,

Modified: qpid/trunk/qpid/python/examples/api/server
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/python/examples/api/server?rev=929717&r1=929716&r2=929717&view=diff
==============================================================================
--- qpid/trunk/qpid/python/examples/api/server (original)
+++ qpid/trunk/qpid/python/examples/api/server Wed Mar 31 21:17:09 2010
@@ -51,7 +51,7 @@ else:
   parser.error("address is required")
 
 # XXX: should make URL default the port for us
-conn = Connection(url.host, url.port or AMQP_PORT,
+conn = Connection(url.host, url.port,
                   username=url.user,
                   password=url.password,
                   reconnect=opts.reconnect,

Modified: qpid/trunk/qpid/python/examples/api/spout
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/python/examples/api/spout?rev=929717&r1=929716&r2=929717&view=diff
==============================================================================
--- qpid/trunk/qpid/python/examples/api/spout (original)
+++ qpid/trunk/qpid/python/examples/api/spout Wed Mar 31 21:17:09 2010
@@ -93,7 +93,7 @@ else:
   content = text
 
 # XXX: should make URL default the port for us
-conn = Connection(url.host, url.port or AMQP_PORT,
+conn = Connection(url.host, url.port,
                   username=url.user,
                   password=url.password,
                   reconnect=opts.reconnect,

Modified: qpid/trunk/qpid/python/qpid/messaging/driver.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/python/qpid/messaging/driver.py?rev=929717&r1=929716&r2=929717&view=diff
==============================================================================
--- qpid/trunk/qpid/python/qpid/messaging/driver.py (original)
+++ qpid/trunk/qpid/python/qpid/messaging/driver.py Wed Mar 31 21:17:09 2010
@@ -26,13 +26,12 @@ from qpid.datatypes import RangedSet, Se
 from qpid.exceptions import Timeout, VersionError
 from qpid.framing import OpEncoder, SegmentEncoder, FrameEncoder, \
     FrameDecoder, SegmentDecoder, OpDecoder
-from qpid.messaging import address
+from qpid.messaging import address, transports
 from qpid.messaging.constants import UNLIMITED, REJECTED, RELEASED
 from qpid.messaging.exceptions import ConnectError
 from qpid.messaging.message import get_codec, Disposition, Message
 from qpid.ops import *
 from qpid.selector import Selector
-from qpid.util import connect
 from qpid.validator import And, Context, List, Map, Types, Values
 from threading import Condition, Thread
 
@@ -328,7 +327,7 @@ class Driver:
         self.connection.backups
     self._host = 0
     self._retrying = False
-    self._socket = None
+    self._transport = None
 
     self._timeout = None
 
@@ -346,15 +345,17 @@ class Driver:
     self._selector.unregister(self)
 
   def fileno(self):
-    return self._socket.fileno()
+    return self._transport.fileno()
 
   @synchronized
   def reading(self):
-    return self._socket is not None
+    return self._transport is not None and \
+        self._transport.reading(True)
 
   @synchronized
   def writing(self):
-    return self._socket is not None and self.engine.pending()
+    return self._transport is not None and \
+        self._transport.writing(self.engine.pending())
 
   @synchronized
   def timing(self):
@@ -363,8 +364,10 @@ class Driver:
   @synchronized
   def readable(self):
     try:
-      data = self._socket.recv(64*1024)
-      if data:
+      data = self._transport.recv(64*1024)
+      if data is None:
+        return
+      elif data:
         rawlog.debug("READ[%s]: %r", self.log_id, data)
         self.engine.write(data)
       else:
@@ -404,8 +407,8 @@ class Driver:
   def st_closed(self):
     # XXX: this log statement seems to sometimes hit when the socket is not connected
     # XXX: rawlog.debug("CLOSE[%s]: %s", self.log_id, self._socket.getpeername())
-    self._socket.close()
-    self._socket = None
+    self._transport.close()
+    self._transport = None
     self.engine = None
     return True
 
@@ -416,7 +419,8 @@ class Driver:
   def writeable(self):
     notify = False
     try:
-      n = self._socket.send(self.engine.peek())
+      n = self._transport.send(self.engine.peek())
+      if n == 0: return
       sent = self.engine.read(n)
       rawlog.debug("SENT[%s]: %r", self.log_id, sent)
     except socket.error, e:
@@ -433,7 +437,7 @@ class Driver:
 
   def dispatch(self):
     try:
-      if self._socket is None:
+      if self._transport is None:
         if self.connection._connected:
           self.connect()
       else:
@@ -454,7 +458,11 @@ class Driver:
       self.engine = Engine(self.connection)
       self.engine.open()
       rawlog.debug("OPEN[%s]: %s:%s", self.log_id, host, port)
-      self._socket = connect(host, port)
+      trans = getattr(transports, self.connection.transport, None)
+      if trans:
+        self._transport = trans(host, port)
+      else:
+        raise ConnectError("no such transport: %s" % self.connection.transport)
       if self._retrying:
         log.warn("reconnect succeeded: %s:%s", host, port)
       self._timeout = None

Modified: qpid/trunk/qpid/python/qpid/messaging/endpoints.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/python/qpid/messaging/endpoints.py?rev=929717&r1=929716&r2=929717&view=diff
==============================================================================
--- qpid/trunk/qpid/python/qpid/messaging/endpoints.py (original)
+++ qpid/trunk/qpid/python/qpid/messaging/endpoints.py Wed Mar 31 21:17:09 2010
@@ -79,7 +79,6 @@ class Connection:
     @return: a disconnected Connection
     """
     self.host = host
-    self.port = default(port, AMQP_PORT)
     self.username = username
     self.password = password
     self.mechanisms = options.get("mechanisms")
@@ -87,9 +86,15 @@ class Connection:
     self.reconnect = options.get("reconnect", False)
     self.reconnect_delay = options.get("reconnect_delay", 3)
     self.reconnect_limit = options.get("reconnect_limit")
+    self.transport = options.get("transport", "plain")
     self.backups = options.get("backups", [])
     self.options = options
 
+    if self.transport == "tls":
+      self.port = default(port, AMQPS_PORT)
+    else:
+      self.port = default(port, AMQP_PORT)
+
     self.id = str(uuid4())
     self.session_counter = 0
     self.sessions = {}

Added: qpid/trunk/qpid/python/qpid/messaging/transports.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/python/qpid/messaging/transports.py?rev=929717&view=auto
==============================================================================
--- qpid/trunk/qpid/python/qpid/messaging/transports.py (added)
+++ qpid/trunk/qpid/python/qpid/messaging/transports.py Wed Mar 31 21:17:09 2010
@@ -0,0 +1,107 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+from qpid.util import connect
+
+class plain:
+
+  def __init__(self, host, port):
+    self.socket = connect(host, port)
+
+  def fileno(self):
+    return self.socket.fileno()
+
+  def reading(self, reading):
+    return reading
+
+  def writing(self, writing):
+    return writing
+
+  def send(self, bytes):
+    return self.socket.send(bytes)
+
+  def recv(self, n):
+    return self.socket.recv(n)
+
+  def close(self):
+    self.socket.close()
+
+try:
+  from ssl import wrap_socket, SSLError, SSL_ERROR_WANT_READ, \
+      SSL_ERROR_WANT_WRITE
+except ImportError:
+  pass
+else:
+  class tls:
+
+    def __init__(self, host, port):
+      self.socket = connect(host, port)
+      self.tls = wrap_socket(self.socket)
+      self.socket.setblocking(0)
+      self.state = None
+
+    def fileno(self):
+      return self.socket.fileno()
+
+    def reading(self, reading):
+      if self.state is None:
+        return reading
+      else:
+        return self.state == SSL_ERROR_WANT_READ
+
+    def writing(self, writing):
+      if self.state is None:
+        return writing
+      else:
+        return self.state == SSL_ERROR_WANT_WRITE
+
+    def send(self, bytes):
+      self._clear_state()
+      try:
+        return self.tls.write(bytes)
+      except SSLError, e:
+        if self._update_state(e.args[0]):
+          return 0
+        else:
+          raise
+
+    def recv(self, n):
+      self._clear_state()
+      try:
+        return self.tls.read(n)
+      except SSLError, e:
+        if self._update_state(e.args[0]):
+          return None
+        else:
+          raise
+
+    def _clear_state(self):
+      self.state = None
+
+    def _update_state(self, code):
+      if code in (SSL_ERROR_WANT_READ, SSL_ERROR_WANT_WRITE):
+        self.state = code
+        return True
+      else:
+        return False
+
+    def close(self):
+      self.socket.setblocking(1)
+      # this closes the underlying socket
+      self.tls.close()

Propchange: qpid/trunk/qpid/python/qpid/messaging/transports.py
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: qpid/trunk/qpid/python/qpid/tests/messaging/__init__.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/python/qpid/tests/messaging/__init__.py?rev=929717&r1=929716&r2=929717&view=diff
==============================================================================
--- qpid/trunk/qpid/python/qpid/tests/messaging/__init__.py (original)
+++ qpid/trunk/qpid/python/qpid/tests/messaging/__init__.py Wed Mar 31 21:17:09 2010
@@ -143,4 +143,15 @@ class Base(Test):
   def reconnect(self):
     return self.get_bool("reconnect")
 
+
+  def transport(self):
+    if self.broker.scheme == self.broker.AMQPS:
+      return "tls"
+    else:
+      return "plain"
+
+  def connection_options(self):
+    return {"reconnect": self.reconnect(),
+            "transport": self.transport()}
+
 import address, endpoints, message

Modified: qpid/trunk/qpid/python/qpid/tests/messaging/endpoints.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/python/qpid/tests/messaging/endpoints.py?rev=929717&r1=929716&r2=929717&view=diff
==============================================================================
--- qpid/trunk/qpid/python/qpid/tests/messaging/endpoints.py (original)
+++ qpid/trunk/qpid/python/qpid/tests/messaging/endpoints.py Wed Mar 31 21:17:09 2010
@@ -30,13 +30,13 @@ class SetupTests(Base):
   def testOpen(self):
     # XXX: need to flesh out URL support/syntax
     self.conn = Connection.open(self.broker.host, self.broker.port,
-                                reconnect=self.reconnect())
+                                **self.connection_options())
     self.ping(self.conn.session())
 
   def testConnect(self):
     # XXX: need to flesh out URL support/syntax
     self.conn = Connection(self.broker.host, self.broker.port,
-                           reconnect=self.reconnect())
+                           **self.connection_options())
     self.conn.connect()
     self.ping(self.conn.session())
 
@@ -65,7 +65,8 @@ class SetupTests(Base):
       for i in range(32):
         if fds: os.close(fds.pop())
       for i in xrange(64):
-        conn = Connection.open(self.broker.host, self.broker.port)
+        conn = Connection.open(self.broker.host, self.broker.port,
+                               **self.connection_options())
         conn.close()
     finally:
       while fds:
@@ -75,7 +76,7 @@ class ConnectionTests(Base):
 
   def setup_connection(self):
     return Connection.open(self.broker.host, self.broker.port,
-                           reconnect=self.reconnect())
+                           **self.connection_options())
 
   def testSessionAnon(self):
     ssn1 = self.conn.session()
@@ -118,7 +119,7 @@ class SessionTests(Base):
 
   def setup_connection(self):
     return Connection.open(self.broker.host, self.broker.port,
-                           reconnect=self.reconnect())
+                           **self.connection_options())
 
   def setup_session(self):
     return self.conn.session()
@@ -405,7 +406,7 @@ class ReceiverTests(Base):
 
   def setup_connection(self):
     return Connection.open(self.broker.host, self.broker.port,
-                           reconnect=self.reconnect())
+                           **self.connection_options())
 
   def setup_session(self):
     return self.conn.session()
@@ -575,7 +576,7 @@ class AddressTests(Base):
 
   def setup_connection(self):
     return Connection.open(self.broker.host, self.broker.port,
-                           reconnect=self.reconnect())
+                           **self.connection_options())
 
   def setup_session(self):
     return self.conn.session()
@@ -846,7 +847,7 @@ class AddressErrorTests(Base):
 
   def setup_connection(self):
     return Connection.open(self.broker.host, self.broker.port,
-                           reconnect=self.reconnect())
+                           **self.connection_options())
 
   def setup_session(self):
     return self.conn.session()
@@ -913,7 +914,7 @@ class SenderTests(Base):
 
   def setup_connection(self):
     return Connection.open(self.broker.host, self.broker.port,
-                           reconnect=self.reconnect())
+                           **self.connection_options())
 
   def setup_session(self):
     return self.conn.session()

Modified: qpid/trunk/qpid/python/qpid/tests/messaging/message.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/python/qpid/tests/messaging/message.py?rev=929717&r1=929716&r2=929717&view=diff
==============================================================================
--- qpid/trunk/qpid/python/qpid/tests/messaging/message.py (original)
+++ qpid/trunk/qpid/python/qpid/tests/messaging/message.py Wed Mar 31 21:17:09 2010
@@ -54,7 +54,7 @@ class MessageEchoTests(Base):
 
   def setup_connection(self):
     return Connection.open(self.broker.host, self.broker.port,
-                           reconnect=self.reconnect())
+                           **self.connection_options())
 
   def setup_session(self):
     return self.conn.session()



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org