You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by ee...@apache.org on 2011/02/21 17:55:20 UTC
svn commit: r1073064 - in /cassandra/trunk/drivers/py: cql/connection.py
cql/connection_pool.py cql/decoders.py cql/marshal.py cql/results.py cqlsh
Author: eevans
Date: Mon Feb 21 16:55:19 2011
New Revision: 1073064
URL: http://svn.apache.org/viewvc?rev=1073064&view=rev
Log:
python CQL driver result decoding
Patch by eevans for CASSANDRA-1711
Added:
cassandra/trunk/drivers/py/cql/decoders.py
cassandra/trunk/drivers/py/cql/results.py
Modified:
cassandra/trunk/drivers/py/cql/connection.py
cassandra/trunk/drivers/py/cql/connection_pool.py
cassandra/trunk/drivers/py/cql/marshal.py
cassandra/trunk/drivers/py/cqlsh
Modified: cassandra/trunk/drivers/py/cql/connection.py
URL: http://svn.apache.org/viewvc/cassandra/trunk/drivers/py/cql/connection.py?rev=1073064&r1=1073063&r2=1073064&view=diff
==============================================================================
--- cassandra/trunk/drivers/py/cql/connection.py (original)
+++ cassandra/trunk/drivers/py/cql/connection.py Mon Feb 21 16:55:19 2011
@@ -21,7 +21,9 @@ from thrift.protocol import TBinaryProto
from thrift.Thrift import TApplicationException
from errors import CQLException, InvalidCompressionScheme
from marshal import prepare
-import zlib
+from decoders import SchemaDecoder
+from results import RowsProxy
+import zlib, re
try:
from cassandra import Cassandra
@@ -57,20 +59,79 @@ class Connection(object):
... for column in row.columns:
... print "%s is %s years of age" % (r.key, column.age)
"""
+ _keyspace_re = re.compile("USE (\w+);?", re.I | re.M)
+ _cfamily_re = re.compile("SELECT\s+.+\s+FROM\s+(\w+)", re.I | re.M)
+
def __init__(self, host, port=9160, keyspace=None, username=None,
- password=None):
+ password=None, decoder=None):
+ """
+ Params:
+ * host .........: hostname of Cassandra node.
+ * port .........: port number to connect to (optional).
+ * keyspace .....: keyspace name (optional).
+ * username .....: username used in authentication (optional).
+ * password .....: password used in authentication (optional).
+ * decoder ......: result decoder instance (optional, defaults to none).
+ """
socket = TSocket.TSocket(host, port)
self.transport = TTransport.TFramedTransport(socket)
protocol = TBinaryProtocol.TBinaryProtocolAccelerated(self.transport)
self.client = Cassandra.Client(protocol)
socket.open()
+ # XXX: "current" is probably a misnomer.
+ self._cur_keyspace = None
+ self._cur_column_family = None
+
if username and password:
credentials = {"username": username, "password": password}
self.client.login(AuthenticationRequest(credentials=credentials))
if keyspace:
self.execute('USE %s;' % keyspace)
+ self._cur_keyspace = keyspace
+
+ if not decoder:
+ self.decoder = SchemaDecoder(self.__get_schema())
+ else:
+ self.decoder = decoder
+
+ def __get_schema(self):
+ def columns(metadata):
+ results = {}
+ for col in metadata:
+ results[col.name] = col.validation_class
+ return results
+
+ def column_families(cf_defs):
+ cfresults = {}
+ for cf in cf_defs:
+ cfresults[cf.name] = {"comparator": cf.comparator_type}
+ cfresults[cf.name]["default_validation_class"] = \
+ cf.default_validation_class
+ cfresults[cf.name]["columns"] = columns(cf.column_metadata)
+ return cfresults
+
+ schema = {}
+ for ksdef in self.client.describe_keyspaces():
+ schema[ksdef.name] = column_families(ksdef.cf_defs)
+ return schema
+
+ def prepare(self, query, *args):
+ prepared_query = prepare(query, *args)
+
+ # Snag the keyspace or column family and stash it for later use in
+ # decoding columns. These regexes don't match every query, but the
+ # current column family only needs to be current for SELECTs.
+ match = Connection._cfamily_re.match(prepared_query)
+ if match:
+ self._cur_column_family = match.group(1)
+ else:
+ match = Connection._keyspace_re.match(prepared_query)
+ if match:
+ self._cur_keyspace = match.group(1)
+
+ return prepared_query
def execute(self, query, *args, **kwargs):
"""
@@ -85,8 +146,8 @@ class Connection(object):
compress = kwargs.get("compression").upper()
else:
compress = DEFAULT_COMPRESSION
-
- compressed_query = Connection.compress_query(prepare(query, *args),
+
+ compressed_query = Connection.compress_query(self.prepare(query, *args),
compress)
request_compression = getattr(Compression, compress)
@@ -101,7 +162,11 @@ class Connection(object):
raise CQLException(exc)
if response.type == CqlResultType.ROWS:
- return response.rows
+ return RowsProxy(response.rows,
+ self._cur_keyspace,
+ self._cur_column_family,
+ self.decoder)
+
if response.type == CqlResultType.INT:
return response.num
@@ -127,5 +192,5 @@ class Connection(object):
if compression == 'GZIP':
return zlib.compress(query)
-
+
# vi: ai ts=4 tw=0 sw=4 et
Modified: cassandra/trunk/drivers/py/cql/connection_pool.py
URL: http://svn.apache.org/viewvc/cassandra/trunk/drivers/py/cql/connection_pool.py?rev=1073064&r1=1073063&r2=1073064&view=diff
==============================================================================
--- cassandra/trunk/drivers/py/cql/connection_pool.py (original)
+++ cassandra/trunk/drivers/py/cql/connection_pool.py Mon Feb 21 16:55:19 2011
@@ -38,12 +38,14 @@ class ConnectionPool(object):
>>> pool.return_connection(conn)
"""
def __init__(self, hostname, port=9160, keyspace=None, username=None,
- password=None, max_conns=25, max_idle=5, eviction_delay=10000):
+ password=None, decoder=None, max_conns=25, max_idle=5,
+ eviction_delay=10000):
self.hostname = hostname
self.port = port
self.keyspace = keyspace
self.username = username
self.password = password
+ self.decoder = decoder
self.max_conns = max_conns
self.max_idle = max_idle
self.eviction_delay = eviction_delay
@@ -59,7 +61,8 @@ class ConnectionPool(object):
port=self.port,
keyspace=self.keyspace,
username=self.username,
- password=self.password)
+ password=self.password,
+ decoder=self.decoder)
def borrow_connection(self):
try:
Added: cassandra/trunk/drivers/py/cql/decoders.py
URL: http://svn.apache.org/viewvc/cassandra/trunk/drivers/py/cql/decoders.py?rev=1073064&view=auto
==============================================================================
--- cassandra/trunk/drivers/py/cql/decoders.py (added)
+++ cassandra/trunk/drivers/py/cql/decoders.py Mon Feb 21 16:55:19 2011
@@ -0,0 +1,61 @@
+
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from os.path import abspath, dirname, join
+from marshal import unmarshal
+
+class BaseDecoder(object):
+ def decode_column(self, keyspace, column_family, name, value):
+ raise NotImplementedError()
+
+class NoopDecoder(BaseDecoder):
+ def decode_column(self, keyspace, column_family, name, value):
+ return (name, value)
+
+class SchemaDecoder(BaseDecoder):
+ """
+ Decode binary column names/values according to schema.
+ """
+ def __init__(self, schema={}):
+ self.schema = schema
+
+ def __get_column_family_def(self, keyspace, column_family):
+ if self.schema.has_key(keyspace):
+ if self.schema[keyspace].has_key(column_family):
+ return self.schema[keyspace][column_family]
+ return None
+
+ def __comparator_for(self, keyspace, column_family):
+ cfam = self.__get_column_family_def(keyspace, column_family)
+ if cfam and cfam.has_key("comparator"):
+ return cfam["comparator"]
+ return None
+
+ def __validator_for(self, keyspace, column_family, name):
+ cfam = self.__get_column_family_def(keyspace, column_family)
+ if cfam:
+ if cfam["columns"].has_key(name):
+ return cfam["columns"][name]
+ else:
+ return cfam["default_validation_class"]
+ return None
+
+ def decode_column(self, keyspace, column_family, name, value):
+ comparator = self.__comparator_for(keyspace, column_family)
+ validator = self.__validator_for(keyspace, column_family, name)
+ return (unmarshal(name, comparator), unmarshal(value, validator))
+
Modified: cassandra/trunk/drivers/py/cql/marshal.py
URL: http://svn.apache.org/viewvc/cassandra/trunk/drivers/py/cql/marshal.py?rev=1073064&r1=1073063&r2=1073064&view=diff
==============================================================================
--- cassandra/trunk/drivers/py/cql/marshal.py (original)
+++ cassandra/trunk/drivers/py/cql/marshal.py Mon Feb 21 16:55:19 2011
@@ -18,8 +18,9 @@
from uuid import UUID
from StringIO import StringIO
from errors import InvalidQueryFormat
+from struct import unpack
-__all__ = ['prepare']
+__all__ = ['prepare', 'marshal', 'unmarshal']
def prepare(query, *args):
result = StringIO()
@@ -60,3 +61,28 @@ def marshal(term):
return "uuid(\"%s\")" % str(term)
else:
return str(term)
+
+def unmarshal(bytestr, typestr):
+ if typestr == "org.apache.cassandra.db.marshal.BytesType":
+ return bytestr
+ elif typestr == "org.apache.cassandra.db.marshal.AsciiType":
+ return bytestr
+ elif typestr == "org.apache.cassandra.db.marshal.UTF8Type":
+ return bytestr.decode("utf8")
+ elif typestr == "org.apache.cassandra.db.marshal.IntegerType":
+ return decode_bigint(bytestr)
+ elif typestr == "org.apache.cassandra.db.marshal.LongType":
+ return unpack(">q", bytestr)[0]
+ elif typestr == "org.apache.cassandra.db.marshal.LexicalUUIDType":
+ return UUID(bytes=bytestr)
+ elif typestr == "org.apache.cassandra.db.marshal.TimeUUIDType":
+ return UUID(bytes=bytetr)
+ else:
+ return bytestr
+
+def decode_bigint(term):
+ val = int(term.encode('hex'), 16)
+ if (ord(term[0]) & 128) != 0:
+ val = val - (1 << (len(term) * 8))
+ return val
+
Added: cassandra/trunk/drivers/py/cql/results.py
URL: http://svn.apache.org/viewvc/cassandra/trunk/drivers/py/cql/results.py?rev=1073064&view=auto
==============================================================================
--- cassandra/trunk/drivers/py/cql/results.py (added)
+++ cassandra/trunk/drivers/py/cql/results.py Mon Feb 21 16:55:19 2011
@@ -0,0 +1,82 @@
+
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+class RowsProxy(object):
+ def __init__(self, rows, keyspace, cfam, decoder):
+ self.rows = rows
+ self.keyspace = keyspace
+ self.cfam = cfam
+ self.decoder = decoder
+
+ def __len__(self):
+ return len(self.rows)
+
+ def __getitem__(self, idx):
+ return Row(self.rows[idx].key,
+ self.rows[idx].columns,
+ self.keyspace,
+ self.cfam,
+ self.decoder)
+
+ def __iter__(self):
+ for r in self.rows:
+ yield Row(r.key, r.columns, self.keyspace, self.cfam, self.decoder)
+
+class Row(object):
+ def __init__(self, key, columns, keyspace, cfam, decoder):
+ self.key = key
+ self.columns = ColumnsProxy(columns, keyspace, cfam, decoder)
+
+class ColumnsProxy(object):
+ def __init__(self, columns, keyspace, cfam, decoder):
+ self.columns = columns
+ self.keyspace = keyspace
+ self.cfam = cfam
+ self.decoder = decoder
+
+ def __len__(self):
+ return len(self.columns)
+
+ def __getitem__(self, idx):
+ return Column(self.decoder.decode_column(self.keyspace,
+ self.cfam,
+ self.columns[idx].name,
+ self.columns[idx].value))
+
+ def __iter__(self):
+ for c in self.columns:
+ yield Column(self.decoder.decode_column(self.keyspace,
+ self.cfam,
+ c.name,
+ c.value))
+
+ def __str__(self):
+ return "ColumnsProxy(columns=%s)" % self.columns
+
+ def __repr__(self):
+ return str(self)
+
+class Column(object):
+ def __init__(self, (name, value)):
+ self.name = name
+ self.value = value
+
+ def __str__(self):
+ return "Column(%s, %s)" % (self.name, self.value)
+
+ def __repr__(self):
+ return str(self)
\ No newline at end of file
Modified: cassandra/trunk/drivers/py/cqlsh
URL: http://svn.apache.org/viewvc/cassandra/trunk/drivers/py/cqlsh?rev=1073064&r1=1073063&r2=1073064&view=diff
==============================================================================
--- cassandra/trunk/drivers/py/cqlsh (original)
+++ cassandra/trunk/drivers/py/cqlsh Mon Feb 21 16:55:19 2011
@@ -12,10 +12,12 @@ import re
try:
from cql import Connection
from cql.errors import CQLException
+ from cql.results import RowsProxy
except ImportError:
sys.path.append(os.path.abspath(os.path.dirname(__file__)))
from cql import Connection
from cql.errors import CQLException
+ from cql.results import RowsProxy
HISTORY = os.path.join(os.path.expanduser('~'), '.cqlsh')
CQLTYPES = ("bytes", "ascii", "utf8", "timeuuid", "uuid", "long", "int")
@@ -76,7 +78,7 @@ class Shell(cmd.Cmd):
result = self.conn.execute(statement)
- if isinstance(result, list):
+ if isinstance(result, RowsProxy):
for row in result:
self.printout(row.key, BLUE, False)
for column in row.columns: