You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by ca...@codespot.com on 2012/09/12 22:49:13 UTC
[cassandra-dbapi2] 16 new revisions pushed by pcannon@gmail.com on
2012-09-12 20:48 GMT
16 new revisions:
Revision: 1470ef1ddfb4
Author: paul cannon <pa...@datastax.com>
Date: Mon Aug 20 16:49:01 2012
Log: tie in native protocol
http://code.google.com/a/apache-extras.org/p/cassandra-dbapi2/source/detail?r=1470ef1ddfb4
Revision: dcdd3d2c693e
Author: paul cannon <pa...@datastax.com>
Date: Thu Sep 6 17:54:49 2012
Log: MODE_CHANGE message no longer in CASSANDRA-4480
http://code.google.com/a/apache-extras.org/p/cassandra-dbapi2/source/detail?r=dcdd3d2c693e
Revision: 2e02819ac520
Author: paul cannon <pa...@datastax.com>
Date: Thu Sep 6 18:27:39 2012
Log: Update STARTUP/SUPPORTED msgs for CASSANDRA-4539
http://code.google.com/a/apache-extras.org/p/cassandra-dbapi2/source/detail?r=2e02819ac520
Revision: 8433e2abdf4c
Author: paul cannon <pa...@datastax.com>
Date: Sun Sep 9 23:22:55 2012
Log: support new native protocol error codes...
http://code.google.com/a/apache-extras.org/p/cassandra-dbapi2/source/detail?r=8433e2abdf4c
Revision: 377853dfc0f1
Author: paul cannon <pa...@datastax.com>
Date: Tue Sep 11 15:53:29 2012
Log: fix up new thrifteries module
http://code.google.com/a/apache-extras.org/p/cassandra-dbapi2/source/detail?r=377853dfc0f1
Revision: c1e68422f9a7
Author: paul cannon <pa...@datastax.com>
Date: Tue Sep 11 15:57:31 2012
Log: update tests to treat Murmur3Partitioner as random
http://code.google.com/a/apache-extras.org/p/cassandra-dbapi2/source/detail?r=c1e68422f9a7
Revision: e52231f5954d
Author: paul cannon <pa...@datastax.com>
Date: Tue Sep 11 16:26:28 2012
Log: kill outdated test_regex test
http://code.google.com/a/apache-extras.org/p/cassandra-dbapi2/source/detail?r=e52231f5954d
Revision: 693860ff1d3d
Author: paul cannon <pa...@datastax.com>
Date: Tue Sep 11 16:26:59 2012
Log: update cql3-related tests for new option syntax...
http://code.google.com/a/apache-extras.org/p/cassandra-dbapi2/source/detail?r=693860ff1d3d
Revision: 0de872e76b44
Author: paul cannon <pa...@datastax.com>
Date: Tue Sep 11 17:31:33 2012
Log: separate value decoding for native/thrift
http://code.google.com/a/apache-extras.org/p/cassandra-dbapi2/source/detail?r=0de872e76b44
Revision: 7b3e9fb54165
Author: paul cannon <pa...@datastax.com>
Date: Tue Sep 11 17:36:56 2012
Log: add inet as a native result type
http://code.google.com/a/apache-extras.org/p/cassandra-dbapi2/source/detail?r=7b3e9fb54165
Revision: 33424f2e0b0e
Author: paul cannon <pa...@datastax.com>
Date: Tue Sep 11 18:05:34 2012
Log: update error handling post CASSANDRA-3979
http://code.google.com/a/apache-extras.org/p/cassandra-dbapi2/source/detail?r=33424f2e0b0e
Revision: 0a2238e01515
Author: paul cannon <pa...@datastax.com>
Date: Tue Sep 11 18:09:18 2012
Log: get rid of bogus NativeCursor.executemany
http://code.google.com/a/apache-extras.org/p/cassandra-dbapi2/source/detail?r=0a2238e01515
Revision: 8862c20c4d3b
Author: paul cannon <pa...@datastax.com>
Date: Tue Sep 11 18:09:31 2012
Log: turn off protocol debugging
http://code.google.com/a/apache-extras.org/p/cassandra-dbapi2/source/detail?r=8862c20c4d3b
Revision: e53d73953c9d
Author: paul cannon <pa...@datastax.com>
Date: Tue Sep 11 18:28:03 2012
Log: Don't include literal-quoted classes in cql_types
http://code.google.com/a/apache-extras.org/p/cassandra-dbapi2/source/detail?r=e53d73953c9d
Revision: 114401688c90
Author: paul cannon <pa...@datastax.com>
Date: Wed Sep 12 13:32:53 2012
Log: ColumnToCollectionType can have multiple subs
http://code.google.com/a/apache-extras.org/p/cassandra-dbapi2/source/detail?r=114401688c90
Revision: 2fcf040212be
Author: paul cannon <pa...@datastax.com>
Date: Wed Sep 12 13:47:23 2012
Log: release 1.2.0
http://code.google.com/a/apache-extras.org/p/cassandra-dbapi2/source/detail?r=2fcf040212be
==============================================================================
Revision: 1470ef1ddfb4
Author: paul cannon <pa...@datastax.com>
Date: Mon Aug 20 16:49:01 2012
Log: tie in native protocol
http://code.google.com/a/apache-extras.org/p/cassandra-dbapi2/source/detail?r=1470ef1ddfb4
Added:
/cql/thrifteries.py
Modified:
/cql/apivalues.py
/cql/connection.py
/cql/cqltypes.py
/cql/cursor.py
/cql/decoders.py
/cql/native.py
=======================================
--- /dev/null
+++ /cql/thrifteries.py Mon Aug 20 16:49:01 2012
@@ -0,0 +1,150 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import zlib
+from cql.cursor import Cursor, _VOID_DESCRIPTION, _COUNT_DESCRIPTION
+from cql.query import cql_quote, cql_quote_name, prepare_query,
PreparedQuery
+from cql.connection import Connection
+from cql.cassandra import Cassandra
+from thrift.Thrift import TApplicationException
+from thrift.transport import TTransport, TSocket
+from thrift.protocol import TBinaryProtocol
+from cql.cassandra.ttypes import (AuthenticationRequest, Compression,
+ CqlResultType, InvalidRequestException, UnavailableException,
+ TimedOutException, SchemaDisagreementException)
+
+MIN_THRIFT_FOR_PREPARED_QUERIES = (19, 27, 0)
+
+class ThriftCursor(Cursor):
+ def __init__(self, parent_connection):
+ Cursor.__init__(self, parent_connection)
+
+ if hasattr(parent_connection.client, 'execute_prepared_cql_query')
\
+ and parent_connection.remote_thrift_version >=
MIN_THRIFT_FOR_PREPARED_QUERIES:
+ self.supports_prepared_queries = True
+
+ def compress_query_text(self, querytext):
+ if self.compression == 'GZIP':
+ compressed_q = zlib.compress(querytext)
+ else:
+ compressed_q = querytext
+ req_compression = getattr(Compression, self.compression)
+ return compressed_q, req_compression
+
+ def prepare_query(self, query):
+ if isinstance(query, unicode):
+ raise ValueError("CQL query must be bytes, not unicode")
+ prepared_q_text, paramnames = prepare_query(query)
+ compressed_q, compression =
self.compress_query_text(prepared_q_text)
+ presult = self._connection.client.prepare_cql_query(compressed_q,
compression)
+ assert presult.count == len(paramnames)
+ if presult.variable_types is None and presult.count > 0:
+ raise cql.ProgrammingError("Cassandra did not provide types
for bound"
+ " parameters. Prepared statements
are only"
+ " supported with cql3.")
+ return PreparedQuery(query, presult.itemId,
presult.variable_types, paramnames)
+
+ def get_response(self, cql_query):
+ compressed_q, compress = self.compress_query_text(cql_query)
+ doquery = self._connection.client.execute_cql_query
+ return self.handle_cql_execution_errors(doquery, compressed_q,
compress)
+
+ def get_response_prepared(self, prepared_query, params):
+ doquery = self._connection.client.execute_prepared_cql_query
+ paramvals = prepared_query.encode_params(params)
+ return self.handle_cql_execution_errors(doquery,
prepared_query.itemid, paramvals)
+
+ def handle_cql_execution_errors(self, executor, *args, **kwargs):
+ try:
+ return executor(*args, **kwargs)
+ except InvalidRequestException, ire:
+ raise cql.ProgrammingError("Bad Request: %s" % ire.why)
+ except SchemaDisagreementException, sde:
+ raise cql.IntegrityError("Schema versions disagree, (try again
later).")
+ except UnavailableException:
+ raise cql.OperationalError("Unable to complete request: one
or "
+ "more nodes were unavailable.")
+ except TimedOutException:
+ raise cql.OperationalError("Request did not complete within
rpc_timeout.")
+ except TApplicationException, tapp:
+ raise cql.InternalError("Internal application error")
+
+ def process_execution_results(self, response, decoder=None):
+ if response.type == CqlResultType.ROWS:
+ self.decoder = (decoder or
self.default_decoder)(response.schema)
+ self.result = response.rows
+ self.rs_idx = 0
+ self.rowcount = len(self.result)
+ if self.result:
+ self.get_metadata_info(self.result[0])
+ elif response.type == CqlResultType.INT:
+ self.result = [(response.num,)]
+ self.rs_idx = 0
+ self.rowcount = 1
+ # TODO: name could be the COUNT expression
+ self.description = _COUNT_DESCRIPTION
+ self.name_info = None
+ elif response.type == CqlResultType.VOID:
+ self.result = []
+ self.rs_idx = 0
+ self.rowcount = 0
+ self.description = _VOID_DESCRIPTION
+ self.name_info = ()
+ else:
+ raise Exception('unknown result type %s' % response.type)
+
+ # 'Return values are not defined.'
+ return True
+
+class ThriftConnection(Connection):
+ cursorclass = ThriftCursor
+
+ def establish_connection(self):
+ socket = TSocket.TSocket(self.host, self.port)
+ self.transport = TTransport.TFramedTransport(socket)
+ protocol =
TBinaryProtocol.TBinaryProtocolAccelerated(self.transport)
+ self.client = Cassandra.Client(protocol)
+ socket.open()
+
+ if self.credentials:
+
self.client.login(AuthenticationRequest(credentials=credentials))
+
+ self.remote_thrift_version = tuple(map(int,
self.client.describe_version().split('.')))
+
+ if cql_version:
+ self.set_cql_version(cql_version)
+
+ if keyspace:
+ self.set_initial_keyspace(keyspace)
+
+ def set_cql_version(self, cql_version):
+ self.client.set_cql_version(cql_version)
+ try:
+ self.cql_major_version = int(cql_version.split('.')[0])
+ except ValueError:
+ pass
+
+ def set_initial_keyspace(self, keyspace):
+ c = self.cursor()
+ if self.cql_major_version >= 3:
+ ksname = cql_quote_name(keyspace)
+ else:
+ ksname = cql_quote(keyspace)
+ c.execute('USE %s' % ksname)
+ c.close()
+
+ def terminate_conn(self):
+ transport.close()
=======================================
--- /cql/apivalues.py Sun Aug 19 12:02:17 2012
+++ /cql/apivalues.py Mon Aug 20 16:49:01 2012
@@ -20,7 +20,10 @@
# dbapi Error hierarchy
class Warning(exceptions.StandardError): pass
-class Error (exceptions.StandardError): pass
+class Error (exceptions.StandardError):
+ def __init__(self, msg, code=None):
+ exceptions.StandardError.__init__(self, msg)
+ self.code = code
class InterfaceError(Error): pass
class DatabaseError (Error): pass
@@ -31,6 +34,7 @@
class InternalError (DatabaseError): pass
class ProgrammingError (DatabaseError): pass
class NotSupportedError(DatabaseError): pass
+class NotAuthenticated (DatabaseError): pass
# Module constants
=======================================
--- /cql/connection.py Sun Aug 19 13:16:40 2012
+++ /cql/connection.py Mon Aug 20 16:49:01 2012
@@ -14,19 +14,13 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-from cql.cursor import Cursor
-from cql.query import cql_quote, cql_quote_name
-from cql.cassandra import Cassandra
-from thrift.transport import TTransport, TSocket
-from thrift.protocol import TBinaryProtocol
-from cql.cassandra.ttypes import AuthenticationRequest
from cql.apivalues import ProgrammingError, NotSupportedError
-
class Connection(object):
cql_major_version = 2
- def __init__(self, host, port, keyspace, user=None, password=None,
cql_version=None):
+ def __init__(self, host, port, keyspace, user=None, password=None,
cql_version=None,
+ compression=None):
"""
Params:
* host .........: hostname of Cassandra node.
@@ -35,43 +29,30 @@
* user .........: username used in authentication (optional).
* password .....: password used in authentication (optional).
* cql_version...: CQL version to use (optional).
+ * compression...: the sort of compression to use by default;
+ * overrideable per Cursor object. (optional).
"""
self.host = host
self.port = port
self.keyspace = keyspace
+ self.cql_version = cql_version
+ self.compression = compression
+ self.open_socket = False
- socket = TSocket.TSocket(host, port)
- self.transport = TTransport.TFramedTransport(socket)
- protocol =
TBinaryProtocol.TBinaryProtocolAccelerated(self.transport)
- self.client = Cassandra.Client(protocol)
+ self.credentials = None
+ if user or password:
+ self.credentials = {"username": user, "password": password}
- socket.open()
+ self.establish_connection()
self.open_socket = True
- if user and password:
- credentials = {"username": user, "password": password}
-
self.client.login(AuthenticationRequest(credentials=credentials))
-
- self.remote_thrift_version = tuple(map(int,
self.client.describe_version().split('.')))
-
- if cql_version:
- self.client.set_cql_version(cql_version)
- try:
- self.cql_major_version = int(cql_version.split('.')[0])
- except ValueError:
- pass
-
- if keyspace:
- c = self.cursor()
- if self.cql_major_version >= 3:
- ksname = cql_quote_name(keyspace)
- else:
- ksname = cql_quote(keyspace)
- c.execute('USE %s' % ksname)
- c.close()
-
def __str__(self):
- return "{host: '%s:%s',
keyspace: '%s'}"%(self.host,self.port,self.keyspace)
+ return ("%s(host=%r, port=%r, keyspace=%r, %s)"
+ % (self.__class__.__name__, self.host, self.port,
self.keyspace,
+ self.open_socket and 'conn open' or 'conn closed'))
+
+ def keyspace_changed(self, keyspace):
+ self.keyspace = keyspace
###
# Connection API
@@ -80,8 +61,7 @@
def close(self):
if not self.open_socket:
return
-
- self.transport.close()
+ self.terminate_connection()
self.open_socket = False
def commit(self):
@@ -97,8 +77,20 @@
def cursor(self):
if not self.open_socket:
raise ProgrammingError("Connection has been closed.")
- return Cursor(self)
+ curs = self.cursorclass(self)
+ curs.compression = self.compression
+ return curs
+
+class NativeConnection(Connection):
+ pass
# TODO: Pull connections out of a pool instead.
-def connect(host, port=9160, keyspace=None, user=None, password=None,
cql_version=None):
- return Connection(host, port, keyspace, user, password, cql_version)
+def connect(host, port=9160, keyspace=None, user=None, password=None,
+ cql_version=None, native=False):
+ if native:
+ from native import NativeConnection
+ connclass = NativeConnection
+ else:
+ from thriftconnection import ThriftConnection
+ connclass = ThriftConnection
+ return connclass(host, port, keyspace, user, password, cql_version)
=======================================
--- /cql/cqltypes.py Sat Sep 8 21:30:43 2012
+++ /cql/cqltypes.py Mon Aug 20 16:49:01 2012
@@ -137,6 +137,8 @@
"""
+ if isinstance(casstype, CassandraType):
+ return casstype
try:
return parse_casstype_args(casstype)
except (ValueError, AssertionError, IndexError), e:
@@ -157,6 +159,8 @@
"""
+ if isinstance(cqltype, CassandraType):
+ return cqltype
args = ()
if cqltype.startswith("'") and cqltype.endswith("'"):
return lookup_casstype(cqltype[1:-1].replace("''", "'"))
=======================================
--- /cql/cursor.py Sun Aug 19 14:09:48 2012
+++ /cql/cursor.py Mon Aug 20 16:49:01 2012
@@ -14,39 +14,20 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-import re
-import zlib
-
import cql
-from cql.query import prepare_inline, prepare_query, PreparedQuery
from cql.decoders import SchemaDecoder
-from cql.cassandra.ttypes import (
- Compression,
- CqlResultType,
- InvalidRequestException,
- UnavailableException,
- TimedOutException,
- SchemaDisagreementException)
-from thrift.Thrift import TApplicationException
+from cql.query import prepare_inline
_COUNT_DESCRIPTION = (None, None, None, None, None, None, None)
-_VOID_DESCRIPTION = (None)
-
-MIN_THRIFT_FOR_PREPARED_QUERIES = (19, 27, 0)
+_VOID_DESCRIPTION = None
class Cursor:
- _keyspace_re = re.compile("USE (\w+);?",
- re.IGNORECASE | re.MULTILINE)
- _cfamily_re = re.compile("\s*SELECT\s+.+?\s+FROM\s+[\']?(\w+)",
- re.IGNORECASE | re.MULTILINE | re.DOTALL)
- _ddl_re = re.compile("\s*(CREATE|ALTER|DROP)\s+",
- re.IGNORECASE | re.MULTILINE)
+ default_decoder = SchemaDecoder
supports_prepared_queries = False
supports_column_types = True
supports_name_info = True
def __init__(self, parent_connection):
- self.open_socket = True
self._connection = parent_connection
self.cql_major_version = parent_connection.cql_major_version
@@ -62,48 +43,15 @@
self.arraysize = 1
self.rowcount = -1 # Populate on execute()
- self.compression = 'GZIP'
+ self.compression = None
self.decoder = None
- if hasattr(parent_connection.client, 'execute_prepared_cql_query')
\
- and parent_connection.remote_thrift_version >=
MIN_THRIFT_FOR_PREPARED_QUERIES:
- self.supports_prepared_queries = True
-
###
# Cursor API
###
def close(self):
- self.open_socket = False
-
- def compress_query_text(self, querytext):
- if self.compression == 'GZIP':
- compressed_q = zlib.compress(querytext)
- else:
- compressed_q = querytext
- req_compression = getattr(Compression, self.compression)
- return compressed_q, req_compression
-
- def prepare_inline(self, query, params):
- try:
- prepared_q_text = prepare_inline(query, params)
- except KeyError, e:
- raise cql.ProgrammingError("Unmatched named substitution: " +
- "%s not given for %r" % (e, query))
- return self.compress_query_text(prepared_q_text)
-
- def prepare_query(self, query, paramtypes=None):
- if isinstance(query, unicode):
- raise ValueError("CQL query must be bytes, not unicode")
- prepared_q_text, paramnames = prepare_query(query)
- compressed_q, compression =
self.compress_query_text(prepared_q_text)
- presult = self._connection.client.prepare_cql_query(compressed_q,
compression)
- assert presult.count == len(paramnames)
- if presult.variable_types is None and presult.count > 0:
- raise cql.ProgrammingError("Cassandra did not provide types
for bound"
- " parameters. Prepared statements
are only"
- " supported with cql3.")
- return PreparedQuery(query, presult.itemId,
presult.variable_types, paramnames)
+ self._connection = None
def pre_execution_setup(self):
self.__checksock()
@@ -113,85 +61,30 @@
self.name_info = None
self.column_types = None
+ def prepare_inline(self, query, params):
+ try:
+ return prepare_inline(query, params)
+ except KeyError, e:
+ raise cql.ProgrammingError("Unmatched named substitution: " +
+ "%s not given for %r" % (e, query))
+
def execute(self, cql_query, params={}, decoder=None):
if isinstance(cql_query, unicode):
raise ValueError("CQL query must be bytes, not unicode")
self.pre_execution_setup()
-
- prepared_q, compress = self.prepare_inline(cql_query, params)
- doquery = self._connection.client.execute_cql_query
- response = self.handle_cql_execution_errors(doquery, prepared_q,
compress)
-
+ prepared_q = self.prepare_inline(cql_query, params)
+ response = self.get_response(prepared_q)
return self.process_execution_results(response, decoder=decoder)
def execute_prepared(self, prepared_query, params={}, decoder=None):
self.pre_execution_setup()
-
- doquery = self._connection.client.execute_prepared_cql_query
- paramvals = prepared_query.encode_params(params)
- response = self.handle_cql_execution_errors(doquery,
prepared_query.itemid, paramvals)
-
+ response = self.get_response_prepared(prepared_query, params)
return self.process_execution_results(response, decoder=decoder)
- def handle_cql_execution_errors(self, executor, *args, **kwargs):
- try:
- return executor(*args, **kwargs)
- except InvalidRequestException, ire:
- raise cql.ProgrammingError("Bad Request: %s" % ire.why)
- except SchemaDisagreementException, sde:
- raise cql.IntegrityError("Schema versions disagree, (try again
later).")
- except UnavailableException:
- raise cql.OperationalError("Unable to complete request: one
or "
- "more nodes were unavailable.")
- except TimedOutException:
- raise cql.OperationalError("Request did not complete within
rpc_timeout.")
- except TApplicationException, tapp:
- raise cql.InternalError("Internal application error")
-
- def process_execution_results(self, response, decoder=None):
- if response.type == CqlResultType.ROWS:
- self.decoder = (decoder or SchemaDecoder)(response.schema)
- self.result = response.rows
- self.rs_idx = 0
- self.rowcount = len(self.result)
- if self.result:
- self.get_metadata_info(self.result[0])
- elif response.type == CqlResultType.INT:
- self.result = [(response.num,)]
- self.rs_idx = 0
- self.rowcount = 1
- # TODO: name could be the COUNT expression
- self.description = _COUNT_DESCRIPTION
- self.name_info = None
- elif response.type == CqlResultType.VOID:
- self.result = []
- self.rs_idx = 0
- self.rowcount = 0
- self.description = _VOID_DESCRIPTION
- self.name_info = ()
- else:
- raise Exception('unknown result type %s' % response.type)
-
- # 'Return values are not defined.'
- return True
-
def get_metadata_info(self, row):
self.description, self.name_info, self.column_types = \
self.decoder.decode_metadata_and_types(row)
- def executemany(self, operation_list, argslist):
- self.__checksock()
- opssize = len(operation_list)
- argsize = len(argslist)
-
- if opssize > argsize:
- raise cql.InterfaceError("Operations outnumber args for
executemany().")
- elif opssize < argsize:
- raise cql.InterfaceError("Args outnumber operations for
executemany().")
-
- for idx in xrange(opssize):
- self.execute(operation_list[idx], *argslist[idx])
-
def fetchone(self):
self.__checksock()
if self.rs_idx == len(self.result):
@@ -222,6 +115,19 @@
def fetchall(self):
return self.fetchmany(len(self.result) - self.rs_idx)
+ def executemany(self, operation_list, argslist):
+ self.__checksock()
+ opssize = len(operation_list)
+ argsize = len(argslist)
+
+ if opssize > argsize:
+ raise cql.InterfaceError("Operations outnumber args for
executemany().")
+ elif opssize < argsize:
+ raise cql.InterfaceError("Args outnumber operations for
executemany().")
+
+ for idx in xrange(opssize):
+ self.execute(operation_list[idx], *argslist[idx])
+
###
# extra, for cqlsh
###
@@ -262,6 +168,5 @@
###
def __checksock(self):
- if not self.open_socket:
- raise cql.InternalError("Cursor belonging to %s has been
closed." %
- (self._connection, ))
+ if self._connection is None:
+ raise cql.ProgrammingError("Cursor has been closed.")
=======================================
--- /cql/decoders.py Thu Aug 30 22:47:27 2012
+++ /cql/decoders.py Mon Aug 20 16:49:01 2012
@@ -61,7 +61,6 @@
return description, name_info, column_types
def decode_row(self, row, column_types=None):
- schema = self.schema
values = []
if column_types is None:
column_types = self.decode_metadata_and_types(row)[2]
@@ -74,3 +73,6 @@
values.append(value)
return values
+
+ def decode_metadata_and_types_native(self, row):
+ pass
=======================================
--- /cql/native.py Sun Aug 19 11:08:00 2012
+++ /cql/native.py Mon Aug 20 16:49:01 2012
@@ -14,8 +14,15 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-from marshal import int32_pack, int32_unpack, uint16_pack, uint16_unpack
-from cqltypes import lookup_casstype
+import cql
+from cql.marshal import int32_pack, int32_unpack, uint16_pack,
uint16_unpack
+from cql.cqltypes import lookup_cqltype
+from cql.connection import Connection
+from cql.cursor import Cursor, _VOID_DESCRIPTION, _COUNT_DESCRIPTION
+from cql.apivalues import ProgrammingError, OperationalError
+from cql.query import PreparedQuery, prepare_query, cql_quote_name
+import socket
+import itertools
try:
from cStringIO import StringIO
except ImportError:
@@ -82,12 +89,13 @@
body = StringIO()
self.send_body(body)
body = body.getvalue()
- write_byte(f, PROTOCOL_VERSION | HEADER_DIRECTION_FROM_CLIENT)
- write_byte(f, 0) # no compression supported yet
- write_byte(f, streamid)
- write_byte(f, self.opcode)
- write_int(f, len(body))
- f.write(body)
+ version = PROTOCOL_VERSION | HEADER_DIRECTION_FROM_CLIENT
+ flags = 0 # no compression supported yet
+ msglen = int32_pack(len(body))
+ header = '%c%c%c%c%s' % (version, flags, streamid, self.opcode,
msglen)
+ f.write(header)
+ if len(body) > 0:
+ f.write(body)
def __str__(self):
paramstrs = ['%s=%r' % (pname, getattr(self, pname)) for pname in
self.params]
@@ -95,11 +103,9 @@
__repr__ = __str__
def read_frame(f):
- version = read_byte(f)
- flags = read_byte(f)
- stream = read_byte(f)
- opcode = read_byte(f)
- body_len = read_int(f)
+ header = f.read(8)
+ version, flags, stream, opcode = map(ord, header[:4])
+ body_len = int32_unpack(header[4:])
assert version & PROTOCOL_VERSION_MASK == PROTOCOL_VERSION, \
"Unsupported CQL protocol version %d" % version
assert version & HEADER_DIRECTION_MASK == HEADER_DIRECTION_TO_CLIENT, \
@@ -113,11 +119,6 @@
msg.stream_id = stream
return msg
-def do_request(f, msg):
- msg.send(f, 0)
- f.flush()
- return read_frame(f)
-
class ErrorMessage(_MessageType):
opcode = 0x00
name = 'ERROR'
@@ -139,9 +140,12 @@
msg = read_string(f)
return cls(code=code, message=msg)
- def __str__(self):
- return '<ErrorMessage code=%04x [%s] message=%r>' \
+ def summary(self):
+ return 'code=%04x [%s] message=%r' \
% (self.code, self.error_codes.get(self.code, '(Unknown)'),
self.message)
+
+ def __str__(self):
+ return '<ErrorMessage %s>' % self.summary()
__repr__ = __str__
class StartupMessage(_MessageType):
@@ -152,7 +156,10 @@
STARTUP_USE_COMPRESSION = 0x0001
def send_body(self, f):
+ if isinstance(self.options, dict):
+ self.options = self.options.items()
write_string(f, self.cqlversion)
+ write_short(f, len(self.options))
for key, value in self.options:
write_short(f, key)
if key == STARTUP_USE_COMPRESSION:
@@ -161,8 +168,8 @@
# should be a safe guess
write_string(f, value)
else:
- raise NotImplemented("Startup option 0x%04x not known;
can't send "
- "value to server" % key)
+ raise NotImplementedError("Startup option 0x%04x not
known; can't send "
+ "value to server" % key)
class ReadyMessage(_MessageType):
opcode = 0x02
@@ -279,7 +286,7 @@
def recv_results_prepared(self, f):
queryid = read_int(f)
colspecs = cls.recv_results_metadata(f)
- return PreparedResult(queryid, colspecs)
+ return (queryid, colspecs)
@classmethod
def recv_results_metadata(cls, f):
@@ -304,13 +311,16 @@
@classmethod
def read_type(cls, f):
- # XXX: stubbed out. should really return more useful 'type'
objects.
optid = read_short(f)
- cqltype = lookup_cqltype(cls.type_codes.get(optid))
- if cqltypename in ('list', 'set'):
+ try:
+ cqltype = lookup_cqltype(cls.type_codes[optid])
+ except KeyError:
+ raise cql.NotSupportedError("Unknown data type code 0x%x. Have
to skip"
+ " entire result set." % optid)
+ if cqltype.typename in ('list', 'set'):
subtype = cls.read_type(f)
cqltype = cqltype.apply_parameters(subtype)
- elif cqltypename == 'map':
+ elif cqltype.typename == 'map':
keysubtype = cls.read_type(f)
valsubtype = cls.read_type(f)
cqltype = cqltype.apply_parameters(keysubtype, valsubtype)
@@ -339,6 +349,54 @@
for param in self.queryparams:
write_value(f, param)
+class ModeChangeMessage(_MessageType):
+ opcode = 0x0B
+ name = 'MODE_CHANGE'
+ params = ('is_control',)
+
+ def send_body(self, f):
+ write_byte(f, bool(self.is_control))
+
+known_event_types = frozenset((
+ 'topology_change',
+ 'status_change',
+))
+
+class RegisterMessage(_MessageType):
+ opcode = 0x0C
+ name = 'REGISTER'
+ params = ('eventlist',)
+
+ def send_body(self, f):
+ write_stringlist(f, self.eventlist)
+
+class EventMessage(_MessageType):
+ opcode = 0x0D
+ name = 'EVENT'
+ params = ('eventtype', 'eventargs')
+
+ @classmethod
+ def recv_body(cls, f):
+ eventtype = read_string(f).lower()
+ if eventtype in known_event_types:
+ readmethod = getattr(cls, 'recv_' + eventtype)
+ return cls(eventtype=eventtype, eventargs=readmethod(f))
+ raise cql.NotSupportedError('Unknown event type %r' % eventtype)
+
+ @classmethod
+ def recv_topology_change(cls, f):
+ # "new_node" or "removed_node"
+ changetype = read_string(f)
+ address = read_inet(f)
+ return dict(changetype=changetype, address=address)
+
+ @classmethod
+ def recv_status_change(cls, f):
+ # "up" or "down"
+ changetype = read_string(f)
+ address = read_inet(f)
+ return dict(changetype=changetype, address=address)
+
def read_byte(f):
return ord(f.read(1))
@@ -402,12 +460,320 @@
write_int(f, len(v))
f.write(v)
-# won't work, unless the change from CASSANDRA-4539 is implemented
-#def read_option(f):
-# optid = read_short(f)
-# value = read_value(f)
-# return (optid, value)
-#
-#def write_option(f, optid, value):
-# write_short(f, optid)
-# write_value(f, value)
+def read_inet(f):
+ size = read_byte(f)
+ addrbytes = f.read(size)
+ port = read_int(f)
+ if size == 4:
+ addrfam = socket.AF_INET
+ elif size == 16:
+ addrfam = socket.AF_INET6
+ else:
+ raise cql.InternalError("bad inet address: %r" % (addrbytes,))
+ return (socket.inet_ntop(addrfam, addrbytes), port)
+
+def write_inet(f, addrtuple):
+ addr, port = addrtuple
+ if ':' in addr:
+ addrfam = socket.AF_INET6
+ else:
+ addrfam = socket.AF_INET
+ addrbytes = socket.inet_pton(addrfam, addr)
+ write_byte(f, len(addrbytes))
+ f.write(addrbytes)
+ write_int(f, port)
+
+
+class FakeThriftRow:
+ def __init__(self, columns):
+ self.columns = columns
+
+class NativeCursor(Cursor):
+ def prepare_query(self, query):
+ pquery, paramnames = prepare_query(query)
+ prepared =
self._connection.wait_for_request(PrepareMessage(query=pquery))
+ if isinstance(prepared, ErrorMessage):
+ raise cql.Error('Query preparation failed: %s' %
prepared.summary())
+ if prepared.kind != ResultMessage.KIND_PREPARED:
+ raise cql.InternalError('Query preparation did not result in
prepared query')
+ queryid, colspecs = prepared.results
+ kss, cfs, names, ctypes = zip(*colspecs)
+ return PreparedQuery(query, queryid, ctypes, paramnames)
+
+ def get_response(self, query):
+ return self._connection.wait_for_request(QueryMessage(query=query))
+
+ def get_response_prepared(self, prepared_query, params):
+ em = ExecuteMessage(queryid=prepared_query.itemid,
queryparams=params)
+ return self._connection.wait_for_request(em)
+
+ def executemany(self, querylist, argslist):
+ pass
+
+ def translate_schema(self, metadata):
+ print "incoming metadata: %r" % (metadata,)
+ pass
+
+ def translate_row(self, row):
+ return FakeThriftRow(row)
+
+ def handle_cql_execution_errors(self, response):
+ if not isinstance(response, ErrorMessage):
+ return
+ try:
+ codemsg = response.error_codes[response.code]
+ except KeyError:
+ codemsg = '(Unknown error code %04x)' % response.code
+ if codemsg == 'Schema disagreement exception':
+ eclass = cql.IntegrityError
+ elif codemsg == 'Authentication error':
+ eclass = cql.NotAuthenticated
+ elif codemsg == ('Unavailable exception', 'Timeout exception'):
+ eclass = cql.OperationalError
+ elif codemsg == 'Request exception':
+ eclass = cql.ProgrammingError
+ else:
+ eclass = cql.InternalError
+ raise eclass('%s: %s' % (codemsg, response.msg))
+
+ error_codes = {
+ 0x0000: 'Server error',
+ 0x0001: 'Protocol error',
+ 0x0002: 'Authentication error',
+ 0x0100: 'Unavailable exception',
+ 0x0101: 'Timeout exception',
+ 0x0102: 'Schema disagreement exception',
+ 0x0200: 'Request exception',
+ }
+
+ def process_execution_results(self, response, decoder=None):
+ self.handle_cql_execution_errors(response)
+ if not isinstance(response, ResultMessage):
+ raise cql.InternalError('Query execution resulted in %s!?' %
(response,))
+ if response.kind == ResultMessage.KIND_PREPARED:
+ raise cql.InternalError('Query execution resulted in prepared
query!?')
+
+ self.rs_idx = 0
+ self.description = None
+ self.result = []
+ self.name_info = ()
+
+ if response.kind == ResultMessage.KIND_VOID:
+ self.description = _VOID_DESCRIPTION
+ elif response.kind == ResultMessage.KIND_SET_KS:
+ self._connection.keyspace_changed(response.results)
+ self.description = _VOID_DESCRIPTION
+ elif response.kind == ResultMessage.KIND_ROWS:
+ schema =
self.translate_schema(response.results.column_metadata)
+ self.decoder = (decoder or self.default_decoder)(schema)
+ self.result = map(self.translate_row, response.results.rows)
+ if self.result:
+ self.get_metadata_info(self.result[0])
+ else:
+ raise Exception('unknown response kind %s: %s' %
(response.kind, response))
+ self.rowcount = len(self.result)
+
+ def get_compression(self):
+ return None
+
+ def set_compression(self, val):
+ if val is not None:
+ raise NotImplementedError("Setting per-cursor compression is
not "
+ "supported in NativeCursor.")
+
+ compression = property(get_compression, set_compression)
+
+class debugsock:
+ def __init__(self, sock):
+ self.sock = sock
+
+ def write(self, data):
+ print '[sending %r]' % (data,)
+ self.sock.send(data)
+
+ def read(self, readlen):
+ data = ''
+ while readlen > 0:
+ add = self.sock.recv(readlen)
+ print '[received %r]' % (add,)
+ if add == '':
+ raise cql.InternalError("short read of %s bytes (%s
expected)"
+ % (len(data), len(data) + readlen))
+ data += add
+ readlen -= len(add)
+ return data
+
+ def close(self):
+ pass
+
+class NativeConnection(Connection):
+ cursorclass = NativeCursor
+
+ def __init__(self, *args, **kwargs):
+ self.make_reqid = itertools.count().next
+ self.responses = {}
+ self.waiting = {}
+ self.conn_ready = False
+ Connection.__init__(self, *args, **kwargs)
+
+ def establish_connection(self):
+ self.conn_ready = False
+ s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+ s.connect((self.host, self.port))
+ #self.socketf = s.makefile(bufsize=0)
+ self.socketf = debugsock(s)
+ self.sockfd = s
+ self.open_socket = True
+ supported = self.wait_for_request(OptionsMessage())
+ self.supported_cql_versions = supported.cql_versions
+ self.supported_compressions = supported.compressions
+
+ if self.cql_version:
+ if self.cql_version not in self.supported_cql_versions:
+ raise ProgrammingError("cql_version %r is not supported by"
+ " remote (w/ native protocol).
Supported"
+ " versions: %r"
+ % (self.cql_version,
self.supported_cql_versions))
+ else:
+ self.cql_version = self.supported_cql_versions[0]
+
+ opts = {}
+ if self.compression:
+ if self.compression not in self.supported_compressions:
+ raise ProgrammingError("Compression type %r is not
supported by"
+ " remote. Supported compression
types: %r"
+ % (self.compression,
self.supported_compressions))
+ # XXX: Remove this once snappy compression is supported
+ raise NotImplementedError("CQL driver does not yet support
compression")
+ opts[StartupMessage.STARTUP_USE_COMPRESSION] = self.compression
+
+ sm = StartupMessage(cqlversion=self.cql_version, options=opts)
+ startup_response = self.wait_for_request(sm)
+ while True:
+ if isinstance(startup_response, ReadyMessage):
+ self.conn_ready = True
+ break
+ if isinstance(startup_response, AuthenticateMessage):
+ self.authenticator = startup_response.authenticator
+ if self.credentials is None:
+ raise ProgrammingError('Remote end requires
authentication.')
+ cm = CredentialsMessage(creds=self.credentials)
+ startup_response = self.wait_for_request(cm)
+ elif isinstance(startup_response, ErrorMessage):
+ raise ProgrammingError("Server did not accept
credentials. %s"
+ % startup_response.summary())
+ else:
+ raise cql.InternalError("Unexpected response %r during
connection setup"
+ % startup_response)
+
+ if self.keyspace:
+ self.set_initial_keyspace(self.keyspace)
+
+ def set_initial_keyspace(self, keyspace):
+ c = self.cursor()
+ c.execute('USE %s' % cql_quote_name(self.keyspace))
+ c.close()
+
+ def terminate_conn(self):
+ self.socketf.close()
+ self.sockfd.close()
+
+ def wait_for_request(self, msg):
+ """
+ Given a message, send it to the server, wait for a response, and
+ return the response.
+ """
+
+ return self.wait_for_requests(msg)[0]
+
+ def wait_for_requests(self, *msgs):
+ """
+ Given any number of message objects, send them all to the server
+ and wait for responses to each one. Once they arrive, return all
+ of the responses in the same order as the messages to which they
+ respond.
+ """
+
+ reqids = []
+ for msg in msgs:
+ reqid = self.make_reqid()
+ reqids.append(reqid)
+ msg.send(self.socketf, reqid)
+ resultdict = self.wait_for_results(*reqids)
+ return [resultdict[reqid] for reqid in reqids]
+
+ def wait_for_results(self, *reqids):
+ """
+ Given any number of stream-ids, wait until responses have arrived
for
+ each one, and return a dictionary mapping the stream-ids to the
+ appropriate results.
+ """
+
+ waiting_for = set(reqids)
+ results = {}
+ for r in reqids:
+ try:
+ result = self.responses.pop(r)
+ except KeyError:
+ pass
+ else:
+ results[r] = result
+ waiting_for.remove(r)
+ while waiting_for:
+ newmsg = read_frame(self.socketf)
+ if newmsg.stream_id in waiting_for:
+ results[newmsg.stream_id] = newmsg
+ waiting_for.remove(newmsg.stream_id)
+ else:
+ self.handle_incoming(newmsg)
+ return results
+
+ def wait_for_result(self, reqid):
+ """
+ Given a stream-id, wait until a response arrives with that
stream-id,
+ and return the msg.
+ """
+
+ return self.wait_for_results(reqid)[reqid]
+
+ def handle_incoming(self, msg):
+ if msg.stream_id < 0:
+ self.handle_pushed(msg)
+ return
+ try:
+ cb = self.waiting.pop(msg.stream_id)
+ except KeyError:
+ self.responses[msg.stream_id] = msg
+ else:
+ cb(msg)
+
+ def callback_when(self, reqid, cb):
+ """
+ Callback cb with a message object once a message with a stream-id
+ of reqid is received. The callback may be immediate, if a response
+ is already in the received queue.
+
+ Otherwise, note also that the callback may not be called
immediately
+ upon the arrival of the response packet; it may have to wait until
+ something else waits on a result.
+ """
+
+ try:
+ msg = self.responses.pop(reqid)
+ except KeyError:
+ pass
+ else:
+ return cb(msg)
+ self.waiting[reqid] = cb
+
+ def request_and_callback(self, msg, cb):
+ """
+ Given a message msg and a callable cb, send the message to the
server
+ and call cb with the result once it arrives. Note that the callback
+ may not be called immediately upon the arrival of the response
packet;
+ it may have to wait until something else waits on a result.
+ """
+
+ reqid = self.make_reqid()
+ msg.send(self.socketf, reqid)
+ self.callback_when(reqid, cb)
==============================================================================
Revision: dcdd3d2c693e
Author: paul cannon <pa...@datastax.com>
Date: Thu Sep 6 17:54:49 2012
Log: MODE_CHANGE message no longer in CASSANDRA-4480
http://code.google.com/a/apache-extras.org/p/cassandra-dbapi2/source/detail?r=dcdd3d2c693e
Modified:
/cql/native.py
=======================================
--- /cql/native.py Mon Aug 20 16:49:01 2012
+++ /cql/native.py Thu Sep 6 17:54:49 2012
@@ -349,21 +349,13 @@
for param in self.queryparams:
write_value(f, param)
-class ModeChangeMessage(_MessageType):
- opcode = 0x0B
- name = 'MODE_CHANGE'
- params = ('is_control',)
-
- def send_body(self, f):
- write_byte(f, bool(self.is_control))
-
known_event_types = frozenset((
- 'topology_change',
- 'status_change',
+ 'TOPOLOGY_CHANGE',
+ 'STATUS_CHANGE',
))
class RegisterMessage(_MessageType):
- opcode = 0x0C
+ opcode = 0x0B
name = 'REGISTER'
params = ('eventlist',)
@@ -371,28 +363,28 @@
write_stringlist(f, self.eventlist)
class EventMessage(_MessageType):
- opcode = 0x0D
+ opcode = 0x0C
name = 'EVENT'
params = ('eventtype', 'eventargs')
@classmethod
def recv_body(cls, f):
- eventtype = read_string(f).lower()
+ eventtype = read_string(f).upper()
if eventtype in known_event_types:
- readmethod = getattr(cls, 'recv_' + eventtype)
+ readmethod = getattr(cls, 'recv_' + eventtype.lower())
return cls(eventtype=eventtype, eventargs=readmethod(f))
raise cql.NotSupportedError('Unknown event type %r' % eventtype)
@classmethod
def recv_topology_change(cls, f):
- # "new_node" or "removed_node"
+ # "NEW_NODE" or "REMOVED_NODE"
changetype = read_string(f)
address = read_inet(f)
return dict(changetype=changetype, address=address)
@classmethod
def recv_status_change(cls, f):
- # "up" or "down"
+ # "UP" or "DOWN"
changetype = read_string(f)
address = read_inet(f)
return dict(changetype=changetype, address=address)
==============================================================================
Revision: 2e02819ac520
Author: paul cannon <pa...@datastax.com>
Date: Thu Sep 6 18:27:39 2012
Log: Update STARTUP/SUPPORTED msgs for CASSANDRA-4539
http://code.google.com/a/apache-extras.org/p/cassandra-dbapi2/source/detail?r=2e02819ac520
Modified:
/cql/native.py
=======================================
--- /cql/native.py Thu Sep 6 17:54:49 2012
+++ /cql/native.py Thu Sep 6 18:27:39 2012
@@ -153,23 +153,15 @@
name = 'STARTUP'
params = ('cqlversion', 'options')
- STARTUP_USE_COMPRESSION = 0x0001
+ KNOWN_OPTION_KEYS = set((
+ 'CQL_VERSION',
+ 'COMPRESSION',
+ ))
def send_body(self, f):
- if isinstance(self.options, dict):
- self.options = self.options.items()
- write_string(f, self.cqlversion)
- write_short(f, len(self.options))
- for key, value in self.options:
- write_short(f, key)
- if key == STARTUP_USE_COMPRESSION:
- write_string(f, value)
- elif isinstance(value, str): # not unicode
- # should be a safe guess
- write_string(f, value)
- else:
- raise NotImplementedError("Startup option 0x%04x not
known; can't send "
- "value to server" % key)
+ optmap = self.options.copy()
+ optmap['CQL_VERSION'] = self.cqlversion
+ write_stringmap(f, optmap)
class ReadyMessage(_MessageType):
opcode = 0x02
@@ -212,13 +204,13 @@
class SupportedMessage(_MessageType):
opcode = 0x06
name = 'SUPPORTED'
- params = ('cql_versions', 'compressions')
+ params = ('cqlversions', 'options',)
@classmethod
def recv_body(cls, f):
- cqlvers = read_stringlist(f)
- compressions = read_stringlist(f)
- return cls(cql_versions=cqlvers, compressions=compressions)
+ options = read_stringmultimap(f)
+ cqlversions = options.pop('CQL_VERSION')
+ return cls(cqlversions=cqlversions, options=options)
class QueryMessage(_MessageType):
opcode = 0x07
@@ -439,6 +431,34 @@
for s in stringlist:
write_string(f, s)
+def read_stringmap(f):
+ numpairs = read_short(f)
+ strmap = {}
+ for x in xrange(numpairs):
+ k = read_string(f)
+ strmap[k] = read_string(f)
+ return strmap
+
+def write_stringmap(f, strmap):
+ write_short(f, len(strmap))
+ for k, v in strmap.items():
+ write_string(f, k)
+ write_string(f, v)
+
+def read_stringmultimap(f):
+ numkeys = read_short(f)
+ strmmap = {}
+ for x in xrange(numkeys):
+ k = read_string(f)
+ strmmap[k] = read_stringlist(f)
+ return strmmap
+
+def write_stringmultimap(f, strmmap):
+ write_short(f, len(strmmap))
+ for k, v in strmmap.items():
+ write_string(f, k)
+ write_stringlist(f, v)
+
def read_value(f):
size = read_int(f)
if size < 0:
@@ -617,8 +637,8 @@
self.sockfd = s
self.open_socket = True
supported = self.wait_for_request(OptionsMessage())
- self.supported_cql_versions = supported.cql_versions
- self.supported_compressions = supported.compressions
+ self.supported_cql_versions = supported.cqlversions
+ self.supported_compressions = supported.options['COMPRESSION']
if self.cql_version:
if self.cql_version not in self.supported_cql_versions:
@@ -635,9 +655,9 @@
raise ProgrammingError("Compression type %r is not
supported by"
" remote. Supported compression
types: %r"
% (self.compression,
self.supported_compressions))
- # XXX: Remove this once snappy compression is supported
+ # XXX: Remove this once some compressions are supported
raise NotImplementedError("CQL driver does not yet support
compression")
- opts[StartupMessage.STARTUP_USE_COMPRESSION] = self.compression
+ opts['COMPRESSION'] = self.compression
sm = StartupMessage(cqlversion=self.cql_version, options=opts)
startup_response = self.wait_for_request(sm)
==============================================================================
Revision: 8433e2abdf4c
Author: paul cannon <pa...@datastax.com>
Date: Sun Sep 9 23:22:55 2012
Log: support new native protocol error codes
along with the extra info per message. still need to udpate the error
handling bit
http://code.google.com/a/apache-extras.org/p/cassandra-dbapi2/source/detail?r=8433e2abdf4c
Modified:
/cql/native.py
=======================================
--- /cql/native.py Thu Sep 6 18:27:39 2012
+++ /cql/native.py Sun Sep 9 23:22:55 2012
@@ -118,35 +118,129 @@
msg = msgclass.recv_body(StringIO(body))
msg.stream_id = stream
return msg
+
+error_classes = {}
class ErrorMessage(_MessageType):
opcode = 0x00
name = 'ERROR'
- params = ('code', 'message')
-
- error_codes = {
- 0x0000: 'Server error',
- 0x0001: 'Protocol error',
- 0x0002: 'Authentication error',
- 0x0100: 'Unavailable exception',
- 0x0101: 'Timeout exception',
- 0x0102: 'Schema disagreement exception',
- 0x0200: 'Request exception',
- }
+ params = ('code', 'message', 'info')
+ summary = 'Unknown'
@classmethod
def recv_body(cls, f):
code = read_int(f)
msg = read_string(f)
- return cls(code=code, message=msg)
+ subcls = error_classes.get(code, cls)
+ extra_info = subcls.recv_error_info(f)
+ return subcls(code=code, message=msg, info=extra_info)
- def summary(self):
- return 'code=%04x [%s] message=%r' \
- % (self.code, self.error_codes.get(self.code, '(Unknown)'),
self.message)
+ def summarymsg(self):
+ msg = 'code=%04x [%s] message="%s"' \
+ % (self.code, self.summary, self.message)
+ if self.info is not None:
+ msg += (' ' + self.info)
+ return msg
def __str__(self):
- return '<ErrorMessage %s>' % self.summary()
+ return '<ErrorMessage %s>' % self.summarymsg()
__repr__ = __str__
+
+ @staticmethod
+ def recv_error_info(f):
+ pass
+
+class ErrorMessageSubclass(_register_msg_type):
+ def __init__(cls, name, bases, dct):
+ if not name.startswith('_'):
+ error_classes[cls.errorcode] = cls
+
+class _ErrorMessageSub(ErrorMessage):
+ __metaclass__ = ErrorMessageSubclass
+
+class ServerErrorMessage(_ErrorMessageSub):
+ summary = 'Server error'
+ errorcode = 0x0000
+
+class ProtocolErrorMessage(_ErrorMessageSub):
+ summary = 'Protocol error'
+ errorcode = 0x000A
+
+class UnavailableExceptionErrorMessage(_ErrorMessageSub):
+ summary = 'Unavailable exception'
+ errorcode = 0x1000
+
+ @staticmethod
+ def recv_error_info(f):
+ return {
+ 'consistencylevel': read_string(f),
+ 'required': read_int(f),
+ 'alive': read_int(f),
+ }
+
+class OverloadedErrorMessage(_ErrorMessageSub):
+ summary = 'Coordinator node overloaded'
+ errorcode = 0x1001
+
+class IsBootstrappingErrorMessage(_ErrorMessageSub):
+ summary = 'Coordinator node is bootstrapping'
+ errorcode = 0x1002
+
+class TruncateErrorMessage(_ErrorMessageSub):
+ summary = 'Error during truncate'
+ errorcode = 0x1003
+
+class WriteTimeoutErrorMessage(_ErrorMessageSub):
+ summary = 'Timeout during write request'
+ errorcode = 0x1100
+
+ @staticmethod
+ def recv_error_info(f):
+ return {
+ 'consistencylevel': read_string(f),
+ 'received': read_int(f),
+ 'blockfor': read_int(f),
+ }
+
+class ReadTimeoutErrorMessage(_ErrorMessageSub):
+ summary = 'Timeout during read request'
+ errorcode = 0x1200
+
+ @staticmethod
+ def recv_error_info(f):
+ return {
+ 'consistencylevel': read_string(f),
+ 'received': read_int(f),
+ 'blockfor': read_int(f),
+ 'data_present': bool(read_byte(f)),
+ }
+
+class SyntaxErrorErrorMessage(_ErrorMessageSub):
+ summary = 'Syntax error in CQL query'
+ errorcode = 0x2000
+
+class UnauthorizedErrorMessage(_ErrorMessageSub):
+ summary = 'Unauthorized'
+ errorcode = 0x2100
+
+class InvalidQueryErrorMessage(_ErrorMessageSub):
+ summary = 'Invalid query'
+ errorcode = 0x2200
+
+class BadConfigErrorMessage(_ErrorMessageSub):
+ summary = 'Query invalid because of configuration issue'
+ errorcode = 0x2300
+
+class AlreadyExistsErrorMessage(_ErrorMessageSub):
+ summary = 'Item already exists'
+ errorcode = 0x2400
+
+ @staticmethod
+ def recv_error_info(f):
+ return {
+ 'keyspace': read_string(f),
+ 'table': read_string(f),
+ }
class StartupMessage(_MessageType):
opcode = 0x01
@@ -505,7 +599,7 @@
pquery, paramnames = prepare_query(query)
prepared =
self._connection.wait_for_request(PrepareMessage(query=pquery))
if isinstance(prepared, ErrorMessage):
- raise cql.Error('Query preparation failed: %s' %
prepared.summary())
+ raise cql.Error('Query preparation failed: %s' %
prepared.summarymsg())
if prepared.kind != ResultMessage.KIND_PREPARED:
raise cql.InternalError('Query preparation did not result in
prepared query')
queryid, colspecs = prepared.results
@@ -673,7 +767,7 @@
startup_response = self.wait_for_request(cm)
elif isinstance(startup_response, ErrorMessage):
raise ProgrammingError("Server did not accept
credentials. %s"
- % startup_response.summary())
+ % startup_response.summarymsg())
else:
raise cql.InternalError("Unexpected response %r during
connection setup"
% startup_response)
==============================================================================
Revision: 377853dfc0f1
Author: paul cannon <pa...@datastax.com>
Date: Tue Sep 11 15:53:29 2012
Log: fix up new thrifteries module
http://code.google.com/a/apache-extras.org/p/cassandra-dbapi2/source/detail?r=377853dfc0f1
Modified:
/cql/connection.py
/cql/thrifteries.py
=======================================
--- /cql/connection.py Mon Aug 20 16:49:01 2012
+++ /cql/connection.py Tue Sep 11 15:53:29 2012
@@ -91,6 +91,6 @@
from native import NativeConnection
connclass = NativeConnection
else:
- from thriftconnection import ThriftConnection
+ from thrifteries import ThriftConnection
connclass = ThriftConnection
return connclass(host, port, keyspace, user, password, cql_version)
=======================================
--- /cql/thrifteries.py Mon Aug 20 16:49:01 2012
+++ /cql/thrifteries.py Tue Sep 11 15:53:29 2012
@@ -15,6 +15,7 @@
# limitations under the License.
import zlib
+import cql
from cql.cursor import Cursor, _VOID_DESCRIPTION, _COUNT_DESCRIPTION
from cql.query import cql_quote, cql_quote_name, prepare_query,
PreparedQuery
from cql.connection import Connection
@@ -41,7 +42,7 @@
compressed_q = zlib.compress(querytext)
else:
compressed_q = querytext
- req_compression = getattr(Compression, self.compression)
+ req_compression = getattr(Compression, self.compression or 'NONE')
return compressed_q, req_compression
def prepare_query(self, query):
@@ -85,7 +86,7 @@
def process_execution_results(self, response, decoder=None):
if response.type == CqlResultType.ROWS:
self.decoder = (decoder or
self.default_decoder)(response.schema)
- self.result = response.rows
+ self.result = [r.columns for r in response.rows]
self.rs_idx = 0
self.rowcount = len(self.result)
if self.result:
@@ -124,11 +125,11 @@
self.remote_thrift_version = tuple(map(int,
self.client.describe_version().split('.')))
- if cql_version:
- self.set_cql_version(cql_version)
+ if self.cql_version:
+ self.set_cql_version(self.cql_version)
- if keyspace:
- self.set_initial_keyspace(keyspace)
+ if self.keyspace:
+ self.set_initial_keyspace(self.keyspace)
def set_cql_version(self, cql_version):
self.client.set_cql_version(cql_version)
==============================================================================
Revision: c1e68422f9a7
Author: paul cannon <pa...@datastax.com>
Date: Tue Sep 11 15:57:31 2012
Log: update tests to treat Murmur3Partitioner as random
http://code.google.com/a/apache-extras.org/p/cassandra-dbapi2/source/detail?r=c1e68422f9a7
Modified:
/test/test_cql.py
=======================================
--- /test/test_cql.py Thu Aug 30 22:28:30 2012
+++ /test/test_cql.py Tue Sep 11 15:57:31 2012
@@ -19,7 +19,7 @@
# PYTHONPATH=test nosetests --tests=test_cql:TestCql.test_column_count
# Note that some tests will be skipped if run against a cluster with
-# RandomPartitioner.
+# random partitioning semantics.
# to configure behavior, define $CQL_TEST_HOST to the destination address
# for Thrift connections, and $CQL_TEST_PORT to the associated port.
@@ -42,6 +42,8 @@
from cql.cassandra import Cassandra
from cql.cqltypes import AsciiType, UUIDType
+RANDOM_PARTITIONERS = ('RandomPartitioner', 'Murmur3Partitioner')
+
def get_thrift_client(host=TEST_HOST, port=TEST_PORT):
socket = TSocket.TSocket(host, port)
transport = TTransport.TFramedTransport(socket)
@@ -188,6 +190,15 @@
def assertIsSubclass(self, class_a, class_b):
assert issubclass(class_a, class_b), '%r is not a subclass
of %r' % (class_a, class_b)
+ def check_ordered_partitioner(self, msg="Key ranges don't make sense
under RP"):
+ if self.get_partitioner().split('.')[-1] in RANDOM_PARTITIONERS:
+ # skipTest is Python >= 2.7
+ if hasattr(self, 'skipTest'):
+ self.skipTest(msg)
+ return False
+ return True
+
+
def test_select_simple(self):
"single-row named column queries"
@@ -217,12 +228,8 @@
def test_select_row_range(self):
"retrieve a range of rows with columns"
- if self.get_partitioner().split('.')[-1] == 'RandomPartitioner':
- # skipTest is >= Python 2.7
- if hasattr(self, 'skipTest'):
- self.skipTest("Key ranges don't make sense under RP")
- else: return None
-
+ if not self.check_ordered_partitioner():
+ return
# everything
cursor = self.cursor
cursor.execute("SELECT * FROM StandardLongA")
@@ -381,12 +388,8 @@
def test_index_scan_with_start_key(self):
"indexed scan with a starting key"
- if self.get_partitioner().split('.')[-1] == 'RandomPartitioner':
- # skipTest is Python >= 2.7
- if hasattr(self, 'skipTest'):
- self.skipTest("Key ranges don't make sense under RP")
- else: return None
-
+ if not self.check_ordered_partitioner():
+ return
cursor = self.cursor
cursor.execute("""
SELECT KEY, 'birthdate' FROM IndexedA
@@ -398,6 +401,9 @@
def test_no_where_clause(self):
"empty where clause (range query w/o start key)"
+
+ if not self.check_ordered_partitioner():
+ return
cursor = self.cursor
cursor.execute("SELECT KEY, 'col' FROM StandardString1 LIMIT 3")
self.assertEqual(cursor.rowcount, 3)
==============================================================================
Revision: e52231f5954d
Author: paul cannon <pa...@datastax.com>
Date: Tue Sep 11 16:26:28 2012
Log: kill outdated test_regex test
http://code.google.com/a/apache-extras.org/p/cassandra-dbapi2/source/detail?r=e52231f5954d
Deleted:
/test/test_regex.py
=======================================
--- /test/test_regex.py Tue Dec 27 14:55:01 2011
+++ /dev/null
@@ -1,42 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-import unittest
-from cql.cursor import Cursor
-
-class TestRegex(unittest.TestCase):
-
- def single_match(self, match, string):
- groups = match.groups()
- self.assertEquals(groups, (string, ))
-
- def test_cfamily_regex(self):
- cf_re = Cursor._cfamily_re
-
- m = cf_re.match("SELECT key FROM column_family WHERE key = 'foo'")
- self.single_match(m, "column_family")
-
- m = cf_re.match("SELECT key FROM 'column_family' WHERE key
= 'foo'")
- self.single_match(m, "column_family")
-
- m = cf_re.match("SELECT key FROM column_family WHERE key = 'break
from chores'")
- self.single_match(m, "column_family")
-
- m = cf_re.match("SELECT key FROM 'from_cf' WHERE key = 'break from
chores'")
- self.single_match(m, "from_cf")
-
- m = cf_re.match("SELECT '\nkey' FROM 'column_family' WHERE key
= 'break \nfrom chores'")
- self.single_match(m, "column_family")
==============================================================================
Revision: 693860ff1d3d
Author: paul cannon <pa...@datastax.com>
Date: Tue Sep 11 16:26:59 2012
Log: update cql3-related tests for new option syntax
post CASSANDRA-4497
http://code.google.com/a/apache-extras.org/p/cassandra-dbapi2/source/detail?r=693860ff1d3d
Modified:
/test/test_prepared_queries.py
=======================================
--- /test/test_prepared_queries.py Sun Aug 19 13:16:40 2012
+++ /test/test_prepared_queries.py Tue Sep 11 16:26:59 2012
@@ -31,6 +31,7 @@
import cql
+MIN_THRIFT_FOR_CQL_3_0_0_FINAL = (19, 33, 0)
class TestPreparedQueries(unittest.TestCase):
cursor = None
@@ -57,10 +58,15 @@
def create_schema(self):
ksname = 'CqlDriverTest_%d' % random.randrange(0x100000000)
- self.cursor.execute("""create keyspace %s
- with strategy_class='SimpleStrategy'
- and
strategy_options:replication_factor=1"""
- % ksname)
+ if self.dbconn.remote_thrift_version >=
MIN_THRIFT_FOR_CQL_3_0_0_FINAL:
+ create_ks = """create keyspace %s
+ with replication = {'class': 'SimpleStrategy',
+ 'replication_factor':
1};"""
+ else:
+ create_ks = """create keyspace %s
+ with strategy_class='SimpleStrategy'
+ and strategy_options:replication_factor=1"""
+ self.cursor.execute(create_ks % ksname)
self.cursor.execute('use %s' % ksname)
self.cursor.execute("""create columnfamily abc (thekey timestamp
primary key,
theint int,
==============================================================================
Revision: 0de872e76b44
Author: paul cannon <pa...@datastax.com>
Date: Tue Sep 11 17:31:33 2012
Log: separate value decoding for native/thrift
http://code.google.com/a/apache-extras.org/p/cassandra-dbapi2/source/detail?r=0de872e76b44
Modified:
/cql/connection.py
/cql/cursor.py
/cql/decoders.py
/cql/native.py
/cql/thrifteries.py
=======================================
--- /cql/connection.py Tue Sep 11 15:53:29 2012
+++ /cql/connection.py Tue Sep 11 17:31:33 2012
@@ -81,9 +81,6 @@
curs.compression = self.compression
return curs
-class NativeConnection(Connection):
- pass
-
# TODO: Pull connections out of a pool instead.
def connect(host, port=9160, keyspace=None, user=None, password=None,
cql_version=None, native=False):
=======================================
--- /cql/cursor.py Mon Aug 20 16:49:01 2012
+++ /cql/cursor.py Tue Sep 11 17:31:33 2012
@@ -69,6 +69,8 @@
"%s not given for %r" % (e, query))
def execute(self, cql_query, params={}, decoder=None):
+ # note that 'decoder' here is actually the decoder class, not the
+ # instance to be used for decoding. bad naming, but it's in use
now.
if isinstance(cql_query, unicode):
raise ValueError("CQL query must be bytes, not unicode")
self.pre_execution_setup()
@@ -77,13 +79,32 @@
return self.process_execution_results(response, decoder=decoder)
def execute_prepared(self, prepared_query, params={}, decoder=None):
+ # note that 'decoder' here is actually the decoder class, not the
+ # instance to be used for decoding. bad naming, but it's in use
now.
self.pre_execution_setup()
response = self.get_response_prepared(prepared_query, params)
return self.process_execution_results(response, decoder=decoder)
def get_metadata_info(self, row):
- self.description, self.name_info, self.column_types = \
- self.decoder.decode_metadata_and_types(row)
+ self.description = description = []
+ self.name_info = name_info = []
+ self.column_types = column_types = []
+ for colid in self.columninfo(row):
+ name, nbytes, vtype, ctype = self.get_column_metadata(colid)
+ column_types.append(vtype)
+ description.append((name, vtype.cass_parameterized_type(),
+ None, None, None, None, True))
+ name_info.append((nbytes, ctype))
+
+ def get_column_metadata(self, column_id):
+ return self.decoder.decode_metadata_and_type(column_id)
+
+ def decode_row(self, row):
+ values = []
+ bytevals = self.columnvalues(row)
+ for val, vtype, nameinfo in zip(bytevals, self.column_types,
self.name_info):
+ values.append(self.decoder.decode_value(val, vtype,
nameinfo[0]))
+ return values
def fetchone(self):
self.__checksock()
@@ -98,7 +119,7 @@
if self.cql_major_version < 3:
# (don't bother redecoding descriptions or names otherwise)
self.get_metadata_info(row)
- return self.decoder.decode_row(row, self.column_types)
+ return self.decode_row(row)
def fetchmany(self, size=None):
self.__checksock()
@@ -109,7 +130,7 @@
while len(L) < size and self.rs_idx < len(self.result):
row = self.result[self.rs_idx]
self.rs_idx += 1
- L.append(self.decoder.decode_row(row, self.column_types))
+ L.append(self.decode_row(row))
return L
def fetchall(self):
=======================================
--- /cql/decoders.py Mon Aug 20 16:49:01 2012
+++ /cql/decoders.py Tue Sep 11 17:31:33 2012
@@ -32,47 +32,28 @@
raise ProgrammingError("value %r (in col %r) can't be deserialized
as %s: %s"
% (valuebytes, namebytes, expectedtype,
err))
- def decode_description(self, row):
- return self.decode_metadata(row)[0]
-
- def decode_metadata(self, row):
- return self.decode_metadata_and_types(row)[:2]
-
- def decode_metadata_and_types(self, row):
+ def decode_metadata_and_type(self, namebytes):
schema = self.schema
- description = []
- column_types = []
- name_info = []
- for column in row.columns:
- namebytes = column.name
- comparator = schema.name_types.get(namebytes,
schema.default_name_type)
- comptype = cqltypes.lookup_casstype(comparator)
- validator = schema.value_types.get(namebytes,
schema.default_value_type)
- valdtype = cqltypes.lookup_casstype(validator)
+ comparator = schema.name_types.get(namebytes,
schema.default_name_type)
+ comptype = cqltypes.lookup_casstype(comparator)
+ validator = schema.value_types.get(namebytes,
schema.default_value_type)
+ valdtype = cqltypes.lookup_casstype(validator)
- try:
- name = comptype.from_binary(namebytes)
- except Exception, e:
- name = self.name_decode_error(e, namebytes, comparator)
- column_types.append(valdtype)
- description.append((name, validator, None, None, None, None,
True))
- name_info.append((namebytes, comptype))
+ try:
+ name = comptype.from_binary(namebytes)
+ except Exception, e:
+ name = self.name_decode_error(e, namebytes,
comptype.cql_parameterized_type())
- return description, name_info, column_types
+ return name, namebytes, valdtype, comptype
- def decode_row(self, row, column_types=None):
- values = []
- if column_types is None:
- column_types = self.decode_metadata_and_types(row)[2]
- for (column, vtype) in zip(row.columns, column_types):
- try:
- value = vtype.from_binary(column.value)
- except Exception, e:
- value = self.value_decode_error(e, column.name,
column.value,
-
vtype.cass_parameterized_type(full=True))
- values.append(value)
+ def decode_value(self, valbytes, vtype, colname):
+ try:
+ value = vtype.from_binary(valbytes)
+ except Exception, e:
+ value = self.value_decode_error(e, colname, valbytes,
+ vtype.cql_parameterized_type())
+ return value
- return values
-
- def decode_metadata_and_types_native(self, row):
- pass
+ def decode_metadata_and_type_native(self, colid):
+ ks, cf, colname, vtype = self.schema[colid]
+ return colname, colname, vtype, 'UTF8Type'
=======================================
--- /cql/native.py Sun Sep 9 23:22:55 2012
+++ /cql/native.py Tue Sep 11 17:31:33 2012
@@ -590,10 +590,6 @@
write_int(f, port)
-class FakeThriftRow:
- def __init__(self, columns):
- self.columns = columns
-
class NativeCursor(Cursor):
def prepare_query(self, query):
pquery, paramnames = prepare_query(query)
@@ -616,12 +612,14 @@
def executemany(self, querylist, argslist):
pass
- def translate_schema(self, metadata):
- print "incoming metadata: %r" % (metadata,)
- pass
+ def get_column_metadata(self, column_id):
+ return self.decoder.decode_metadata_and_type_native(column_id)
- def translate_row(self, row):
- return FakeThriftRow(row)
+ def columninfo(self, row):
+ return xrange(len(row))
+
+ def columnvalues(self, row):
+ return row
def handle_cql_execution_errors(self, response):
if not isinstance(response, ErrorMessage):
@@ -670,9 +668,9 @@
self._connection.keyspace_changed(response.results)
self.description = _VOID_DESCRIPTION
elif response.kind == ResultMessage.KIND_ROWS:
- schema =
self.translate_schema(response.results.column_metadata)
+ schema = response.results.column_metadata
self.decoder = (decoder or self.default_decoder)(schema)
- self.result = map(self.translate_row, response.results.rows)
+ self.result = response.results.rows
if self.result:
self.get_metadata_info(self.result[0])
else:
@@ -780,7 +778,7 @@
c.execute('USE %s' % cql_quote_name(self.keyspace))
c.close()
- def terminate_conn(self):
+ def terminate_connection(self):
self.socketf.close()
self.sockfd.close()
=======================================
--- /cql/thrifteries.py Tue Sep 11 15:53:29 2012
+++ /cql/thrifteries.py Tue Sep 11 17:31:33 2012
@@ -110,6 +110,12 @@
# 'Return values are not defined.'
return True
+ def columnvalues(self, row):
+ return [column.value for column in row]
+
+ def columninfo(self, row):
+ return (column.name for column in row)
+
class ThriftConnection(Connection):
cursorclass = ThriftCursor
@@ -121,7 +127,7 @@
socket.open()
if self.credentials:
-
self.client.login(AuthenticationRequest(credentials=credentials))
+
self.client.login(AuthenticationRequest(credentials=self.credentials))
self.remote_thrift_version = tuple(map(int,
self.client.describe_version().split('.')))
@@ -147,5 +153,5 @@
c.execute('USE %s' % ksname)
c.close()
- def terminate_conn(self):
- transport.close()
+ def terminate_connection(self):
+ self.transport.close()
==============================================================================
Revision: 7b3e9fb54165
Author: paul cannon <pa...@datastax.com>
Date: Tue Sep 11 17:36:56 2012
Log: add inet as a native result type
http://code.google.com/a/apache-extras.org/p/cassandra-dbapi2/source/detail?r=7b3e9fb54165
Modified:
/cql/native.py
=======================================
--- /cql/native.py Tue Sep 11 17:31:33 2012
+++ /cql/native.py Tue Sep 11 17:36:56 2012
@@ -340,6 +340,7 @@
0x000D: 'varchar',
0x000E: 'varint',
0x000F: 'timeuuid',
+ 0x0010: 'inet',
0x0020: 'list',
0x0021: 'map',
0x0022: 'set',
==============================================================================
Revision: 33424f2e0b0e
Author: paul cannon <pa...@datastax.com>
Date: Tue Sep 11 18:05:34 2012
Log: update error handling post CASSANDRA-3979
http://code.google.com/a/apache-extras.org/p/cassandra-dbapi2/source/detail?r=33424f2e0b0e
Modified:
/cql/native.py
=======================================
--- /cql/native.py Tue Sep 11 17:36:56 2012
+++ /cql/native.py Tue Sep 11 18:05:34 2012
@@ -152,21 +152,28 @@
class ErrorMessageSubclass(_register_msg_type):
def __init__(cls, name, bases, dct):
- if not name.startswith('_'):
+ if cls.errorcode is not None:
error_classes[cls.errorcode] = cls
-class _ErrorMessageSub(ErrorMessage):
+class ErrorMessageSub(ErrorMessage):
__metaclass__ = ErrorMessageSubclass
+ errorcode = None
-class ServerErrorMessage(_ErrorMessageSub):
+class RequestExecutionException(ErrorMessageSub):
+ pass
+
+class RequestValidationException(ErrorMessageSub):
+ pass
+
+class ServerError(ErrorMessageSub):
summary = 'Server error'
errorcode = 0x0000
-class ProtocolErrorMessage(_ErrorMessageSub):
+class ProtocolException(ErrorMessageSub):
summary = 'Protocol error'
errorcode = 0x000A
-class UnavailableExceptionErrorMessage(_ErrorMessageSub):
+class UnavailableExceptionErrorMessage(RequestExecutionException):
summary = 'Unavailable exception'
errorcode = 0x1000
@@ -178,19 +185,22 @@
'alive': read_int(f),
}
-class OverloadedErrorMessage(_ErrorMessageSub):
+class OverloadedErrorMessage(RequestExecutionException):
summary = 'Coordinator node overloaded'
errorcode = 0x1001
-class IsBootstrappingErrorMessage(_ErrorMessageSub):
+class IsBootstrappingErrorMessage(RequestExecutionException):
summary = 'Coordinator node is bootstrapping'
errorcode = 0x1002
-class TruncateErrorMessage(_ErrorMessageSub):
+class TruncateError(RequestExecutionException):
summary = 'Error during truncate'
errorcode = 0x1003
-class WriteTimeoutErrorMessage(_ErrorMessageSub):
+class RequestTimeoutException(RequestExecutionException):
+ pass
+
+class WriteTimeoutErrorMessage(RequestTimeoutException):
summary = 'Timeout during write request'
errorcode = 0x1100
@@ -202,7 +212,7 @@
'blockfor': read_int(f),
}
-class ReadTimeoutErrorMessage(_ErrorMessageSub):
+class ReadTimeoutErrorMessage(RequestTimeoutException):
summary = 'Timeout during read request'
errorcode = 0x1200
@@ -215,23 +225,23 @@
'data_present': bool(read_byte(f)),
}
-class SyntaxErrorErrorMessage(_ErrorMessageSub):
+class SyntaxException(RequestValidationException):
summary = 'Syntax error in CQL query'
errorcode = 0x2000
-class UnauthorizedErrorMessage(_ErrorMessageSub):
+class UnauthorizedErrorMessage(RequestValidationException):
summary = 'Unauthorized'
errorcode = 0x2100
-class InvalidQueryErrorMessage(_ErrorMessageSub):
+class InvalidRequestException(RequestValidationException):
summary = 'Invalid query'
errorcode = 0x2200
-class BadConfigErrorMessage(_ErrorMessageSub):
+class ConfigurationException(RequestValidationException):
summary = 'Query invalid because of configuration issue'
errorcode = 0x2300
-class AlreadyExistsErrorMessage(_ErrorMessageSub):
+class AlreadyExistsException(ConfigurationException):
summary = 'Item already exists'
errorcode = 0x2400
@@ -625,31 +635,15 @@
def handle_cql_execution_errors(self, response):
if not isinstance(response, ErrorMessage):
return
- try:
- codemsg = response.error_codes[response.code]
- except KeyError:
- codemsg = '(Unknown error code %04x)' % response.code
- if codemsg == 'Schema disagreement exception':
- eclass = cql.IntegrityError
- elif codemsg == 'Authentication error':
+ if isinstance(response, UnauthorizedErrorMessage):
eclass = cql.NotAuthenticated
- elif codemsg == ('Unavailable exception', 'Timeout exception'):
+ elif isinstance(response, RequestExecutionException):
eclass = cql.OperationalError
- elif codemsg == 'Request exception':
+ elif isinstance(response, RequestValidationException):
eclass = cql.ProgrammingError
else:
eclass = cql.InternalError
- raise eclass('%s: %s' % (codemsg, response.msg))
-
- error_codes = {
- 0x0000: 'Server error',
- 0x0001: 'Protocol error',
- 0x0002: 'Authentication error',
- 0x0100: 'Unavailable exception',
- 0x0101: 'Timeout exception',
- 0x0102: 'Schema disagreement exception',
- 0x0200: 'Request exception',
- }
+ raise eclass(response.summarymsg())
def process_execution_results(self, response, decoder=None):
self.handle_cql_execution_errors(response)
==============================================================================
Revision: 0a2238e01515
Author: paul cannon <pa...@datastax.com>
Date: Tue Sep 11 18:09:18 2012
Log: get rid of bogus NativeCursor.executemany
http://code.google.com/a/apache-extras.org/p/cassandra-dbapi2/source/detail?r=0a2238e01515
Modified:
/cql/native.py
=======================================
--- /cql/native.py Tue Sep 11 18:05:34 2012
+++ /cql/native.py Tue Sep 11 18:09:18 2012
@@ -620,9 +620,6 @@
em = ExecuteMessage(queryid=prepared_query.itemid,
queryparams=params)
return self._connection.wait_for_request(em)
- def executemany(self, querylist, argslist):
- pass
-
def get_column_metadata(self, column_id):
return self.decoder.decode_metadata_and_type_native(column_id)
==============================================================================
Revision: 8862c20c4d3b
Author: paul cannon <pa...@datastax.com>
Date: Tue Sep 11 18:09:31 2012
Log: turn off protocol debugging
http://code.google.com/a/apache-extras.org/p/cassandra-dbapi2/source/detail?r=8862c20c4d3b
Modified:
/cql/native.py
=======================================
--- /cql/native.py Tue Sep 11 18:09:18 2012
+++ /cql/native.py Tue Sep 11 18:09:31 2012
@@ -716,8 +716,7 @@
self.conn_ready = False
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.connect((self.host, self.port))
- #self.socketf = s.makefile(bufsize=0)
- self.socketf = debugsock(s)
+ self.socketf = s.makefile(bufsize=0)
self.sockfd = s
self.open_socket = True
supported = self.wait_for_request(OptionsMessage())
==============================================================================
Revision: e53d73953c9d
Author: paul cannon <pa...@datastax.com>
Date: Tue Sep 11 18:28:03 2012
Log: Don't include literal-quoted classes in cql_types
http://code.google.com/a/apache-extras.org/p/cassandra-dbapi2/source/detail?r=e53d73953c9d
Modified:
/cql/cqltypes.py
=======================================
--- /cql/cqltypes.py Mon Aug 20 16:49:01 2012
+++ /cql/cqltypes.py Tue Sep 11 18:28:03 2012
@@ -648,7 +648,7 @@
cql_type_to_apache_class = dict([(c, t.cassname) for (c, t) in
_cqltypes.items()])
apache_class_to_cql_type = dict([(n, t.typename) for (n, t) in
_casstypes.items()])
-cql_types = sorted(_cqltypes.keys())
+cql_types = sorted([t for t in _cqltypes.keys() if not t.startswith("'")])
def cql_typename(casstypename):
"""
==============================================================================
Revision: 114401688c90
Author: paul cannon <pa...@datastax.com>
Date: Wed Sep 12 13:32:53 2012
Log: ColumnToCollectionType can have multiple subs
http://code.google.com/a/apache-extras.org/p/cassandra-dbapi2/source/detail?r=114401688c90
Modified:
/cql/cqltypes.py
=======================================
--- /cql/cqltypes.py Tue Sep 11 18:28:03 2012
+++ /cql/cqltypes.py Wed Sep 12 13:32:53 2012
@@ -624,7 +624,7 @@
information.
"""
typename = "'org.apache.cassandra.db.marshal.ColumnToCollectionType'"
- num_subtypes = 1
+ num_subtypes = 'UNKNOWN'
class ReversedType(_ParameterizedType):
typename = "'org.apache.cassandra.db.marshal.ReversedType'"
==============================================================================
Revision: 2fcf040212be
Author: paul cannon <pa...@datastax.com>
Date: Wed Sep 12 13:47:23 2012
Log: release 1.2.0
http://code.google.com/a/apache-extras.org/p/cassandra-dbapi2/source/detail?r=2fcf040212be
Modified:
/CHANGES.txt
/setup.py
=======================================
--- /CHANGES.txt Thu Aug 30 22:58:26 2012
+++ /CHANGES.txt Wed Sep 12 13:47:23 2012
@@ -1,3 +1,13 @@
+1.2.0 - 2012/09/12
+ * Changes to SchemaDecoder interface- now decodes one value or column
+ metadata instance at a time
+ * Beta support for Cassandra's thriftless binary protocol- thrift is
+ wholly unneeded if you pass native=True to cql.connect(). Everything
+ should work exactly the same except for a different call is made to
+ get metadata from the SchemaDecoder, in case you have a custom one.
+ * Support for inet CQL type
+ * Support for parsing and processing data of type ReversedType(whatever)
+
1.1.0 - 2012/08/30
* Support for CQL3 collections
* Unified type handling
=======================================
--- /setup.py Thu Aug 30 22:58:26 2012
+++ /setup.py Wed Sep 12 13:47:23 2012
@@ -20,7 +20,7 @@
setup(
name="cql",
- version="1.1.0",
+ version="1.2.0",
description="Cassandra Query Language driver",
long_description=open(abspath(join(dirname(__file__), 'README'))).read(),
maintainer='Cassandra DBAPI-2 Driver Team',