You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by ee...@apache.org on 2011/01/05 20:27:51 UTC
svn commit: r1055594 - in /cassandra/trunk: drivers/py/cql/__init__.py
drivers/py/cql/connection.py drivers/py/cql/connection_pool.py
drivers/py/cql/errors.py test/system/test_cql.py
Author: eevans
Date: Wed Jan 5 19:27:50 2011
New Revision: 1055594
URL: http://svn.apache.org/viewvc?rev=1055594&view=rev
Log:
basic connection pooling for python driver
Patch by eevans; reviewed by gdusbabek for CASSANDRA-1711
Added:
cassandra/trunk/drivers/py/cql/connection.py
cassandra/trunk/drivers/py/cql/connection_pool.py
cassandra/trunk/drivers/py/cql/errors.py
Modified:
cassandra/trunk/drivers/py/cql/__init__.py
cassandra/trunk/test/system/test_cql.py
Modified: cassandra/trunk/drivers/py/cql/__init__.py
URL: http://svn.apache.org/viewvc/cassandra/trunk/drivers/py/cql/__init__.py?rev=1055594&r1=1055593&r2=1055594&view=diff
==============================================================================
--- cassandra/trunk/drivers/py/cql/__init__.py (original)
+++ cassandra/trunk/drivers/py/cql/__init__.py Wed Jan 5 19:27:50 2011
@@ -1,79 +1,23 @@
-from os.path import exists, abspath, dirname, join
-from thrift.transport import TTransport, TSocket
-from thrift.protocol import TBinaryProtocol
-from thrift.Thrift import TApplicationException
-import zlib
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""
+Cassandra Query Language driver
+"""
-try:
- from cassandra import Cassandra
- from cassandra.ttypes import Compression, InvalidRequestException, \
- CqlResultType
-except ImportError:
- # Hack to run from a source tree
- import sys
- sys.path.append(join(abspath(dirname(__file__)),
- '..',
- '..',
- '..',
- 'interface',
- 'thrift',
- 'gen-py'))
- from cassandra import Cassandra
- from cassandra.ttypes import Compression, InvalidRequestException, \
- CqlResultType
-
-COMPRESSION_SCHEMES = ['GZIP']
-DEFAULT_COMPRESSION = 'GZIP'
-
-class Connection(object):
- def __init__(self, keyspace, host, port=9160):
- socket = TSocket.TSocket(host, port)
- self.transport = TTransport.TFramedTransport(socket)
- protocol = TBinaryProtocol.TBinaryProtocolAccelerated(self.transport)
- self.client = Cassandra.Client(protocol)
- socket.open()
-
- if keyspace:
- self.execute('USE %s' % keyspace)
-
- def execute(self, query, compression=None):
- compress = compression is None and DEFAULT_COMPRESSION \
- or compression.upper()
-
- compressed_query = Connection.compress_query(query, compress)
- request_compression = getattr(Compression, compress)
-
- try:
- response = self.client.execute_cql_query(compressed_query,
- request_compression)
- except InvalidRequestException, ire:
- raise CQLException("Bad Request: %s" % ire.why)
- except TApplicationException, tapp:
- raise CQLException("Internal application error")
- except Exception, exc:
- raise CQLException(exc)
-
- if response.type == CqlResultType.ROWS:
- return response.rows
- if response.type == CqlResultType.INT:
- return response.num
-
- return None
-
- def close(self):
- self.transport.close()
-
- @classmethod
- def compress_query(cls, query, compression):
- if not compression in COMPRESSION_SCHEMES:
- raise InvalidCompressionScheme(compression)
-
- if compression == 'GZIP':
- return zlib.compress(query)
-
-
-class InvalidCompressionScheme(Exception): pass
-class CQLException(Exception): pass
-
-# vi: ai ts=4 tw=0 sw=4 et
+from connection import Connection
+from connection_pool import ConnectionPool
Added: cassandra/trunk/drivers/py/cql/connection.py
URL: http://svn.apache.org/viewvc/cassandra/trunk/drivers/py/cql/connection.py?rev=1055594&view=auto
==============================================================================
--- cassandra/trunk/drivers/py/cql/connection.py (added)
+++ cassandra/trunk/drivers/py/cql/connection.py Wed Jan 5 19:27:50 2011
@@ -0,0 +1,121 @@
+
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from os.path import exists, abspath, dirname, join
+from thrift.transport import TTransport, TSocket
+from thrift.protocol import TBinaryProtocol
+from thrift.Thrift import TApplicationException
+from errors import CQLException, InvalidCompressionScheme
+import zlib
+
+try:
+ from cassandra import Cassandra
+ from cassandra.ttypes import Compression, InvalidRequestException, \
+ CqlResultType
+except ImportError:
+ # Hack to run from a source tree
+ import sys
+ sys.path.append(join(abspath(dirname(__file__)),
+ '..',
+ '..',
+ '..',
+ 'interface',
+ 'thrift',
+ 'gen-py'))
+ from cassandra import Cassandra
+ from cassandra.ttypes import Compression, InvalidRequestException, \
+ CqlResultType
+
+COMPRESSION_SCHEMES = ['GZIP']
+DEFAULT_COMPRESSION = 'GZIP'
+
+__all__ = ['COMPRESSION_SCHEMES', 'DEFAULT_COMPRESSION', 'Connection']
+
+class Connection(object):
+ """
+ CQL connection object.
+
+ Example usage:
+ >>> conn = Connection("localhost", keyspace="Keyspace1")
+ >>> r = conn.execute('SELECT "age" FROM Users')
+ >>> for row in r.rows:
+ ... for column in row.columns:
+ ... print "%s is %s years of age" % (r.key, column.age)
+ """
+ def __init__(self, host, port=9160, keyspace=None):
+ socket = TSocket.TSocket(host, port)
+ self.transport = TTransport.TFramedTransport(socket)
+ protocol = TBinaryProtocol.TBinaryProtocolAccelerated(self.transport)
+ self.client = Cassandra.Client(protocol)
+ socket.open()
+
+ if keyspace:
+ self.execute('USE %s;' % keyspace)
+
+ def execute(self, query, compression=None):
+ """
+ Execute a CQL query on a remote node.
+
+ Params:
+ * query .........: CQL query string.
+ * compression ...: Query compression type (optional).
+ """
+ compress = compression is None and DEFAULT_COMPRESSION \
+ or compression.upper()
+
+ compressed_query = Connection.compress_query(query, compress)
+ request_compression = getattr(Compression, compress)
+
+ try:
+ response = self.client.execute_cql_query(compressed_query,
+ request_compression)
+ except InvalidRequestException, ire:
+ raise CQLException("Bad Request: %s" % ire.why)
+ except TApplicationException, tapp:
+ raise CQLException("Internal application error")
+ except Exception, exc:
+ raise CQLException(exc)
+
+ if response.type == CqlResultType.ROWS:
+ return response.rows
+ if response.type == CqlResultType.INT:
+ return response.num
+
+ return None
+
+ def close(self):
+ self.transport.close()
+
+ def is_open(self):
+ return self.transport.isOpen()
+
+ @classmethod
+ def compress_query(cls, query, compression):
+ """
+ Returns a query string compressed with the specified compression type.
+
+ Params:
+ * query .........: The query string to compress.
+ * compression ...: Type of compression to use.
+ """
+ if not compression in COMPRESSION_SCHEMES:
+ raise InvalidCompressionScheme(compression)
+
+ if compression == 'GZIP':
+ return zlib.compress(query)
+
+# vi: ai ts=4 tw=0 sw=4 et
Added: cassandra/trunk/drivers/py/cql/connection_pool.py
URL: http://svn.apache.org/viewvc/cassandra/trunk/drivers/py/cql/connection_pool.py?rev=1055594&view=auto
==============================================================================
--- cassandra/trunk/drivers/py/cql/connection_pool.py (added)
+++ cassandra/trunk/drivers/py/cql/connection_pool.py Wed Jan 5 19:27:50 2011
@@ -0,0 +1,92 @@
+
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from Queue import Queue, Empty
+from threading import Thread
+from time import sleep
+from connection import Connection
+
+__all__ = ['ConnectionPool']
+
+class ConnectionPool(object):
+ """
+ Simple connection-caching pool implementation.
+
+ ConnectionPool provides the simplest possible connection pooling,
+ lazily creating new connections if needed as `borrow_connection' is
+ called. Connections are re-added to the pool by `return_connection',
+ unless doing so would exceed the maximum pool size.
+
+ Example usage:
+ >>> pool = ConnectionPool("localhost", 9160, "Keyspace1")
+ >>> conn = pool.borrow_connection()
+ >>> conn.execute(...)
+ >>> pool.return_connection(conn)
+ """
+ def __init__(self, hostname, port=9160, keyspace=None, max_conns=25,
+ max_idle=5, eviction_delay=10000):
+ self.hostname = hostname
+ self.port = port
+ self.keyspace = keyspace
+ self.max_conns = max_conns
+ self.max_idle = max_idle
+ self.eviction_delay = eviction_delay
+
+ self.connections = Queue()
+ self.connections.put(Connection(hostname, port, keyspace))
+ self.eviction = Eviction(self.connections,
+ self.max_idle,
+ self.eviction_delay)
+
+ def borrow_connection(self):
+ try:
+ connection = self.connections.get(block=False)
+ except Empty:
+ connection = Connection(self.hostname, self.port, self.keyspace)
+ return connection
+
+ def return_connection(self, connection):
+ if self.connections.qsize() > self.max_conns:
+ connection.close()
+ return
+ if not connection.is_open():
+ return
+ self.connections.put(connection)
+
+class Eviction(Thread):
+ def __init__(self, connections, max_idle, eviction_delay):
+ Thread.__init__(self)
+
+ self.connections = connections
+ self.max_idle = max_idle
+ self.eviction_delay = eviction_delay
+
+ self.setDaemon(True)
+ self.setName("EVICTION-THREAD")
+ self.start()
+
+ def run(self):
+ while(True):
+ while(self.connections.qsize() > self.max_idle):
+ connection = self.connections.get(block=False)
+ if connection:
+ if connection.is_open():
+ connection.close()
+ sleep(self.eviction_delay/1000)
+
+
+
Added: cassandra/trunk/drivers/py/cql/errors.py
URL: http://svn.apache.org/viewvc/cassandra/trunk/drivers/py/cql/errors.py?rev=1055594&view=auto
==============================================================================
--- cassandra/trunk/drivers/py/cql/errors.py (added)
+++ cassandra/trunk/drivers/py/cql/errors.py Wed Jan 5 19:27:50 2011
@@ -0,0 +1,21 @@
+
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+__all__ = ['InvalidCompressionScheme', 'CQLException']
+
+class InvalidCompressionScheme(Exception): pass
+class CQLException(Exception): pass
Modified: cassandra/trunk/test/system/test_cql.py
URL: http://svn.apache.org/viewvc/cassandra/trunk/test/system/test_cql.py?rev=1055594&r1=1055593&r2=1055594&view=diff
==============================================================================
--- cassandra/trunk/test/system/test_cql.py (original)
+++ cassandra/trunk/test/system/test_cql.py Wed Jan 5 19:27:50 2011
@@ -4,7 +4,8 @@ import sys
sys.path.append(join(abspath(dirname(__file__)), '../../drivers/py'))
-from cql import Connection, CQLException
+from cql import Connection
+from cql.errors import CQLException
from . import ThriftTester
from avro_utils import assert_raises
@@ -45,7 +46,7 @@ def load_sample(dbconn):
""")
def init(keyspace="Keyspace1"):
- dbconn = Connection(keyspace, 'localhost', 9170)
+ dbconn = Connection('localhost', 9170, keyspace)
load_sample(dbconn)
return dbconn