You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by ee...@apache.org on 2011/01/05 20:27:51 UTC

svn commit: r1055594 - in /cassandra/trunk: drivers/py/cql/__init__.py drivers/py/cql/connection.py drivers/py/cql/connection_pool.py drivers/py/cql/errors.py test/system/test_cql.py

Author: eevans
Date: Wed Jan  5 19:27:50 2011
New Revision: 1055594

URL: http://svn.apache.org/viewvc?rev=1055594&view=rev
Log:
basic connection pooling for python driver

Patch by eevans; reviewed by gdusbabek for CASSANDRA-1711

Added:
    cassandra/trunk/drivers/py/cql/connection.py
    cassandra/trunk/drivers/py/cql/connection_pool.py
    cassandra/trunk/drivers/py/cql/errors.py
Modified:
    cassandra/trunk/drivers/py/cql/__init__.py
    cassandra/trunk/test/system/test_cql.py

Modified: cassandra/trunk/drivers/py/cql/__init__.py
URL: http://svn.apache.org/viewvc/cassandra/trunk/drivers/py/cql/__init__.py?rev=1055594&r1=1055593&r2=1055594&view=diff
==============================================================================
--- cassandra/trunk/drivers/py/cql/__init__.py (original)
+++ cassandra/trunk/drivers/py/cql/__init__.py Wed Jan  5 19:27:50 2011
@@ -1,79 +1,23 @@
 
-from os.path import exists, abspath, dirname, join
-from thrift.transport import TTransport, TSocket
-from thrift.protocol import TBinaryProtocol
-from thrift.Thrift import TApplicationException
-import zlib
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""
+Cassandra Query Language driver
+"""
 
-try:
-    from cassandra import Cassandra
-    from cassandra.ttypes import Compression, InvalidRequestException, \
-                                 CqlResultType
-except ImportError:
-    # Hack to run from a source tree
-    import sys
-    sys.path.append(join(abspath(dirname(__file__)),
-                         '..',
-                         '..',
-                         '..',
-                         'interface',
-                         'thrift',
-                         'gen-py'))
-    from cassandra import Cassandra
-    from cassandra.ttypes import Compression, InvalidRequestException, \
-                          CqlResultType
-    
-COMPRESSION_SCHEMES = ['GZIP']
-DEFAULT_COMPRESSION = 'GZIP'
-
-class Connection(object):
-    def __init__(self, keyspace, host, port=9160):
-        socket = TSocket.TSocket(host, port)
-        self.transport = TTransport.TFramedTransport(socket)
-        protocol = TBinaryProtocol.TBinaryProtocolAccelerated(self.transport)
-        self.client = Cassandra.Client(protocol)
-        socket.open()
-
-        if keyspace:
-            self.execute('USE %s' % keyspace)
-
-    def execute(self, query, compression=None):
-        compress = compression is None and DEFAULT_COMPRESSION \
-                or compression.upper()
-    
-        compressed_query = Connection.compress_query(query, compress)
-        request_compression = getattr(Compression, compress)
-
-        try:
-            response = self.client.execute_cql_query(compressed_query,
-                                                     request_compression)
-        except InvalidRequestException, ire:
-            raise CQLException("Bad Request: %s" % ire.why)
-        except TApplicationException, tapp:
-            raise CQLException("Internal application error")
-        except Exception, exc:
-            raise CQLException(exc)
-
-        if response.type == CqlResultType.ROWS:
-            return response.rows
-        if response.type == CqlResultType.INT:
-            return response.num
-
-        return None
-
-    def close(self):
-        self.transport.close()
-
-    @classmethod
-    def compress_query(cls, query, compression):
-        if not compression in COMPRESSION_SCHEMES:
-            raise InvalidCompressionScheme(compression)
-
-        if compression == 'GZIP':
-            return zlib.compress(query)
-
-
-class InvalidCompressionScheme(Exception): pass
-class CQLException(Exception): pass
-
-# vi: ai ts=4 tw=0 sw=4 et
+from connection import Connection
+from connection_pool import ConnectionPool

Added: cassandra/trunk/drivers/py/cql/connection.py
URL: http://svn.apache.org/viewvc/cassandra/trunk/drivers/py/cql/connection.py?rev=1055594&view=auto
==============================================================================
--- cassandra/trunk/drivers/py/cql/connection.py (added)
+++ cassandra/trunk/drivers/py/cql/connection.py Wed Jan  5 19:27:50 2011
@@ -0,0 +1,121 @@
+
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from os.path import exists, abspath, dirname, join
+from thrift.transport import TTransport, TSocket
+from thrift.protocol import TBinaryProtocol
+from thrift.Thrift import TApplicationException
+from errors import CQLException, InvalidCompressionScheme
+import zlib
+
+try:
+    from cassandra import Cassandra
+    from cassandra.ttypes import Compression, InvalidRequestException, \
+                                 CqlResultType
+except ImportError:
+    # Hack to run from a source tree
+    import sys
+    sys.path.append(join(abspath(dirname(__file__)),
+                         '..',
+                         '..',
+                         '..',
+                         'interface',
+                         'thrift',
+                         'gen-py'))
+    from cassandra import Cassandra
+    from cassandra.ttypes import Compression, InvalidRequestException, \
+                          CqlResultType
+    
+COMPRESSION_SCHEMES = ['GZIP']
+DEFAULT_COMPRESSION = 'GZIP'
+
+__all__ = ['COMPRESSION_SCHEMES', 'DEFAULT_COMPRESSION', 'Connection']
+
+class Connection(object):
+    """
+    CQL connection object.
+    
+    Example usage:
+    >>> conn = Connection("localhost", keyspace="Keyspace1")
+    >>> r = conn.execute('SELECT "age" FROM Users')
+    >>> for row in r.rows:
+    ...     for column in row.columns:
+    ...         print "%s is %s years of age" % (r.key, column.age)
+    """
+    def __init__(self, host, port=9160, keyspace=None):
+        socket = TSocket.TSocket(host, port)
+        self.transport = TTransport.TFramedTransport(socket)
+        protocol = TBinaryProtocol.TBinaryProtocolAccelerated(self.transport)
+        self.client = Cassandra.Client(protocol)
+        socket.open()
+        
+        if keyspace:
+            self.execute('USE %s;' % keyspace)
+
+    def execute(self, query, compression=None):
+        """
+        Execute a CQL query on a remote node.
+        
+        Params:
+        * query .........: CQL query string.
+        * compression ...: Query compression type (optional).
+        """
+        compress = compression is None and DEFAULT_COMPRESSION \
+                or compression.upper()
+    
+        compressed_query = Connection.compress_query(query, compress)
+        request_compression = getattr(Compression, compress)
+
+        try:
+            response = self.client.execute_cql_query(compressed_query,
+                                                     request_compression)
+        except InvalidRequestException, ire:
+            raise CQLException("Bad Request: %s" % ire.why)
+        except TApplicationException, tapp:
+            raise CQLException("Internal application error")
+        except Exception, exc:
+            raise CQLException(exc)
+
+        if response.type == CqlResultType.ROWS:
+            return response.rows
+        if response.type == CqlResultType.INT:
+            return response.num
+
+        return None
+
+    def close(self):
+        self.transport.close()
+        
+    def is_open(self):
+        return self.transport.isOpen()
+
+    @classmethod
+    def compress_query(cls, query, compression):
+        """
+        Returns a query string compressed with the specified compression type.
+        
+        Params:
+        * query .........: The query string to compress.
+        * compression ...: Type of compression to use.
+        """
+        if not compression in COMPRESSION_SCHEMES:
+            raise InvalidCompressionScheme(compression)
+
+        if compression == 'GZIP':
+            return zlib.compress(query)
+
+# vi: ai ts=4 tw=0 sw=4 et

Added: cassandra/trunk/drivers/py/cql/connection_pool.py
URL: http://svn.apache.org/viewvc/cassandra/trunk/drivers/py/cql/connection_pool.py?rev=1055594&view=auto
==============================================================================
--- cassandra/trunk/drivers/py/cql/connection_pool.py (added)
+++ cassandra/trunk/drivers/py/cql/connection_pool.py Wed Jan  5 19:27:50 2011
@@ -0,0 +1,92 @@
+
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from Queue import Queue, Empty
+from threading import Thread
+from time import sleep
+from connection import Connection
+
+__all__ = ['ConnectionPool']
+
+class ConnectionPool(object):
+    """
+    Simple connection-caching pool implementation.
+
+    ConnectionPool provides the simplest possible connection pooling,
+    lazily creating new connections if needed as `borrow_connection' is
+    called.  Connections are re-added to the pool by `return_connection',
+    unless doing so would exceed the maximum pool size.
+    
+    Example usage:
+    >>> pool = ConnectionPool("localhost", 9160, "Keyspace1")
+    >>> conn = pool.borrow_connection()
+    >>> conn.execute(...)
+    >>> pool.return_connection(conn)
+    """
+    def __init__(self, hostname, port=9160, keyspace=None, max_conns=25,
+                 max_idle=5, eviction_delay=10000):
+        self.hostname = hostname
+        self.port = port
+        self.keyspace = keyspace
+        self.max_conns = max_conns
+        self.max_idle = max_idle
+        self.eviction_delay = eviction_delay
+        
+        self.connections = Queue()
+        self.connections.put(Connection(hostname, port, keyspace))
+        self.eviction = Eviction(self.connections,
+                                 self.max_idle,
+                                 self.eviction_delay)
+        
+    def borrow_connection(self):
+        try:
+            connection = self.connections.get(block=False)
+        except Empty:
+            connection = Connection(self.hostname, self.port, self.keyspace)
+        return connection
+    
+    def return_connection(self, connection):
+        if self.connections.qsize() > self.max_conns:
+            connection.close()
+            return
+        if not connection.is_open():
+            return
+        self.connections.put(connection)
+
+class Eviction(Thread):
+    def __init__(self, connections, max_idle, eviction_delay):
+        Thread.__init__(self)
+        
+        self.connections = connections
+        self.max_idle = max_idle
+        self.eviction_delay = eviction_delay
+        
+        self.setDaemon(True)
+        self.setName("EVICTION-THREAD")
+        self.start()
+        
+    def run(self):
+        while(True):
+            while(self.connections.qsize() > self.max_idle):
+                connection = self.connections.get(block=False)
+                if connection:
+                    if connection.is_open():
+                        connection.close()
+            sleep(self.eviction_delay/1000)
+        
+        
+        

Added: cassandra/trunk/drivers/py/cql/errors.py
URL: http://svn.apache.org/viewvc/cassandra/trunk/drivers/py/cql/errors.py?rev=1055594&view=auto
==============================================================================
--- cassandra/trunk/drivers/py/cql/errors.py (added)
+++ cassandra/trunk/drivers/py/cql/errors.py Wed Jan  5 19:27:50 2011
@@ -0,0 +1,21 @@
+
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+__all__ = ['InvalidCompressionScheme', 'CQLException']
+
+class InvalidCompressionScheme(Exception): pass
+class CQLException(Exception): pass

Modified: cassandra/trunk/test/system/test_cql.py
URL: http://svn.apache.org/viewvc/cassandra/trunk/test/system/test_cql.py?rev=1055594&r1=1055593&r2=1055594&view=diff
==============================================================================
--- cassandra/trunk/test/system/test_cql.py (original)
+++ cassandra/trunk/test/system/test_cql.py Wed Jan  5 19:27:50 2011
@@ -4,7 +4,8 @@ import sys
 
 sys.path.append(join(abspath(dirname(__file__)), '../../drivers/py'))
 
-from cql import Connection, CQLException
+from cql import Connection
+from cql.errors import CQLException
 from . import ThriftTester
 from avro_utils import assert_raises
 
@@ -45,7 +46,7 @@ def load_sample(dbconn):
     """)
 
 def init(keyspace="Keyspace1"):
-    dbconn = Connection(keyspace, 'localhost', 9170)
+    dbconn = Connection('localhost', 9170, keyspace)
     load_sample(dbconn)
     return dbconn