Posted to commits@cassandra.apache.org by ca...@codespot.com on 2012/09/12 22:49:13 UTC

[cassandra-dbapi2] 16 new revisions pushed by pcannon@gmail.com on 2012-09-12 20:48 GMT

16 new revisions:

Revision: 1470ef1ddfb4
Author:   paul cannon <pa...@datastax.com>
Date:     Mon Aug 20 16:49:01 2012
Log:      tie in native protocol
http://code.google.com/a/apache-extras.org/p/cassandra-dbapi2/source/detail?r=1470ef1ddfb4

Revision: dcdd3d2c693e
Author:   paul cannon <pa...@datastax.com>
Date:     Thu Sep  6 17:54:49 2012
Log:      MODE_CHANGE message no longer in CASSANDRA-4480
http://code.google.com/a/apache-extras.org/p/cassandra-dbapi2/source/detail?r=dcdd3d2c693e

Revision: 2e02819ac520
Author:   paul cannon <pa...@datastax.com>
Date:     Thu Sep  6 18:27:39 2012
Log:      Update STARTUP/SUPPORTED msgs for CASSANDRA-4539
http://code.google.com/a/apache-extras.org/p/cassandra-dbapi2/source/detail?r=2e02819ac520

Revision: 8433e2abdf4c
Author:   paul cannon <pa...@datastax.com>
Date:     Sun Sep  9 23:22:55 2012
Log:      support new native protocol error codes...
http://code.google.com/a/apache-extras.org/p/cassandra-dbapi2/source/detail?r=8433e2abdf4c

Revision: 377853dfc0f1
Author:   paul cannon <pa...@datastax.com>
Date:     Tue Sep 11 15:53:29 2012
Log:      fix up new thrifteries module
http://code.google.com/a/apache-extras.org/p/cassandra-dbapi2/source/detail?r=377853dfc0f1

Revision: c1e68422f9a7
Author:   paul cannon <pa...@datastax.com>
Date:     Tue Sep 11 15:57:31 2012
Log:      update tests to treat Murmur3Partitioner as random
http://code.google.com/a/apache-extras.org/p/cassandra-dbapi2/source/detail?r=c1e68422f9a7

Revision: e52231f5954d
Author:   paul cannon <pa...@datastax.com>
Date:     Tue Sep 11 16:26:28 2012
Log:      kill outdated test_regex test
http://code.google.com/a/apache-extras.org/p/cassandra-dbapi2/source/detail?r=e52231f5954d

Revision: 693860ff1d3d
Author:   paul cannon <pa...@datastax.com>
Date:     Tue Sep 11 16:26:59 2012
Log:      update cql3-related tests for new option syntax...
http://code.google.com/a/apache-extras.org/p/cassandra-dbapi2/source/detail?r=693860ff1d3d

Revision: 0de872e76b44
Author:   paul cannon <pa...@datastax.com>
Date:     Tue Sep 11 17:31:33 2012
Log:      separate value decoding for native/thrift
http://code.google.com/a/apache-extras.org/p/cassandra-dbapi2/source/detail?r=0de872e76b44

Revision: 7b3e9fb54165
Author:   paul cannon <pa...@datastax.com>
Date:     Tue Sep 11 17:36:56 2012
Log:      add inet as a native result type
http://code.google.com/a/apache-extras.org/p/cassandra-dbapi2/source/detail?r=7b3e9fb54165

Revision: 33424f2e0b0e
Author:   paul cannon <pa...@datastax.com>
Date:     Tue Sep 11 18:05:34 2012
Log:      update error handling post CASSANDRA-3979
http://code.google.com/a/apache-extras.org/p/cassandra-dbapi2/source/detail?r=33424f2e0b0e

Revision: 0a2238e01515
Author:   paul cannon <pa...@datastax.com>
Date:     Tue Sep 11 18:09:18 2012
Log:      get rid of bogus NativeCursor.executemany
http://code.google.com/a/apache-extras.org/p/cassandra-dbapi2/source/detail?r=0a2238e01515

Revision: 8862c20c4d3b
Author:   paul cannon <pa...@datastax.com>
Date:     Tue Sep 11 18:09:31 2012
Log:      turn off protocol debugging
http://code.google.com/a/apache-extras.org/p/cassandra-dbapi2/source/detail?r=8862c20c4d3b

Revision: e53d73953c9d
Author:   paul cannon <pa...@datastax.com>
Date:     Tue Sep 11 18:28:03 2012
Log:      Don't include literal-quoted classes in cql_types
http://code.google.com/a/apache-extras.org/p/cassandra-dbapi2/source/detail?r=e53d73953c9d

Revision: 114401688c90
Author:   paul cannon <pa...@datastax.com>
Date:     Wed Sep 12 13:32:53 2012
Log:      ColumnToCollectionType can have multiple subs
http://code.google.com/a/apache-extras.org/p/cassandra-dbapi2/source/detail?r=114401688c90

Revision: 2fcf040212be
Author:   paul cannon <pa...@datastax.com>
Date:     Wed Sep 12 13:47:23 2012
Log:      release 1.2.0
http://code.google.com/a/apache-extras.org/p/cassandra-dbapi2/source/detail?r=2fcf040212be

==============================================================================
Revision: 1470ef1ddfb4
Author:   paul cannon <pa...@datastax.com>
Date:     Mon Aug 20 16:49:01 2012
Log:      tie in native protocol

http://code.google.com/a/apache-extras.org/p/cassandra-dbapi2/source/detail?r=1470ef1ddfb4

Added:
  /cql/thrifteries.py
Modified:
  /cql/apivalues.py
  /cql/connection.py
  /cql/cqltypes.py
  /cql/cursor.py
  /cql/decoders.py
  /cql/native.py

=======================================
--- /dev/null
+++ /cql/thrifteries.py	Mon Aug 20 16:49:01 2012
@@ -0,0 +1,150 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import zlib
+from cql.cursor import Cursor, _VOID_DESCRIPTION, _COUNT_DESCRIPTION
+from cql.query import cql_quote, cql_quote_name, prepare_query, PreparedQuery
+from cql.connection import Connection
+from cql.cassandra import Cassandra
+from thrift.Thrift import TApplicationException
+from thrift.transport import TTransport, TSocket
+from thrift.protocol import TBinaryProtocol
+from cql.cassandra.ttypes import (AuthenticationRequest, Compression,
+        CqlResultType, InvalidRequestException, UnavailableException,
+        TimedOutException, SchemaDisagreementException)
+
+MIN_THRIFT_FOR_PREPARED_QUERIES = (19, 27, 0)
+
+class ThriftCursor(Cursor):
+    def __init__(self, parent_connection):
+        Cursor.__init__(self, parent_connection)
+
+        if hasattr(parent_connection.client, 'execute_prepared_cql_query') \
+                and parent_connection.remote_thrift_version >= MIN_THRIFT_FOR_PREPARED_QUERIES:
+            self.supports_prepared_queries = True
+
+    def compress_query_text(self, querytext):
+        if self.compression == 'GZIP':
+            compressed_q = zlib.compress(querytext)
+        else:
+            compressed_q = querytext
+        req_compression = getattr(Compression, self.compression)
+        return compressed_q, req_compression
+
+    def prepare_query(self, query):
+        if isinstance(query, unicode):
+            raise ValueError("CQL query must be bytes, not unicode")
+        prepared_q_text, paramnames = prepare_query(query)
+        compressed_q, compression = self.compress_query_text(prepared_q_text)
+        presult = self._connection.client.prepare_cql_query(compressed_q, compression)
+        assert presult.count == len(paramnames)
+        if presult.variable_types is None and presult.count > 0:
+            raise cql.ProgrammingError("Cassandra did not provide types for bound"
+                                       " parameters. Prepared statements are only"
+                                       " supported with cql3.")
+        return PreparedQuery(query, presult.itemId, presult.variable_types, paramnames)
+
+    def get_response(self, cql_query):
+        compressed_q, compress = self.compress_query_text(cql_query)
+        doquery = self._connection.client.execute_cql_query
+        return self.handle_cql_execution_errors(doquery, compressed_q, compress)
+
+    def get_response_prepared(self, prepared_query, params):
+        doquery = self._connection.client.execute_prepared_cql_query
+        paramvals = prepared_query.encode_params(params)
+        return self.handle_cql_execution_errors(doquery, prepared_query.itemid, paramvals)
+
+    def handle_cql_execution_errors(self, executor, *args, **kwargs):
+        try:
+            return executor(*args, **kwargs)
+        except InvalidRequestException, ire:
+            raise cql.ProgrammingError("Bad Request: %s" % ire.why)
+        except SchemaDisagreementException, sde:
+            raise cql.IntegrityError("Schema versions disagree, (try again later).")
+        except UnavailableException:
+            raise cql.OperationalError("Unable to complete request: one or "
+                                       "more nodes were unavailable.")
+        except TimedOutException:
+            raise cql.OperationalError("Request did not complete within rpc_timeout.")
+        except TApplicationException, tapp:
+            raise cql.InternalError("Internal application error")
+
+    def process_execution_results(self, response, decoder=None):
+        if response.type == CqlResultType.ROWS:
+            self.decoder = (decoder or self.default_decoder)(response.schema)
+            self.result = response.rows
+            self.rs_idx = 0
+            self.rowcount = len(self.result)
+            if self.result:
+                self.get_metadata_info(self.result[0])
+        elif response.type == CqlResultType.INT:
+            self.result = [(response.num,)]
+            self.rs_idx = 0
+            self.rowcount = 1
+            # TODO: name could be the COUNT expression
+            self.description = _COUNT_DESCRIPTION
+            self.name_info = None
+        elif response.type == CqlResultType.VOID:
+            self.result = []
+            self.rs_idx = 0
+            self.rowcount = 0
+            self.description = _VOID_DESCRIPTION
+            self.name_info = ()
+        else:
+            raise Exception('unknown result type %s' % response.type)
+
+        # 'Return values are not defined.'
+        return True
+
+class ThriftConnection(Connection):
+    cursorclass = ThriftCursor
+
+    def establish_connection(self):
+        socket = TSocket.TSocket(self.host, self.port)
+        self.transport = TTransport.TFramedTransport(socket)
+        protocol = TBinaryProtocol.TBinaryProtocolAccelerated(self.transport)
+        self.client = Cassandra.Client(protocol)
+        socket.open()
+
+        if self.credentials:
+            self.client.login(AuthenticationRequest(credentials=credentials))
+
+        self.remote_thrift_version = tuple(map(int, self.client.describe_version().split('.')))
+
+        if cql_version:
+            self.set_cql_version(cql_version)
+
+        if keyspace:
+            self.set_initial_keyspace(keyspace)
+
+    def set_cql_version(self, cql_version):
+        self.client.set_cql_version(cql_version)
+        try:
+            self.cql_major_version = int(cql_version.split('.')[0])
+        except ValueError:
+            pass
+
+    def set_initial_keyspace(self, keyspace):
+        c = self.cursor()
+        if self.cql_major_version >= 3:
+            ksname = cql_quote_name(keyspace)
+        else:
+            ksname = cql_quote(keyspace)
+        c.execute('USE %s' % ksname)
+        c.close()
+
+    def terminate_conn(self):
+        transport.close()
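
Not part of the commit: a minimal sketch of the compress-then-tag step that `ThriftCursor.compress_query_text` performs, written for Python 3 rather than the Python 2 of this revision. The `Compression` class here is a hypothetical stand-in for the Thrift-generated enum in `cql.cassandra.ttypes`, and its numeric values are placeholders. Treating an unset compression as `'NONE'` is an assumption of the sketch; elsewhere in this revision the cursor default becomes `None`, which appears to make `getattr(Compression, self.compression)` raise `TypeError`.

```python
import zlib


class Compression(object):
    # Hypothetical stand-in for the Thrift-generated enum; values are placeholders.
    GZIP = 1
    NONE = 2


def compress_query_text(querytext, compression=None):
    """Return (payload, compression_flag) for a CQL query given as bytes."""
    if isinstance(querytext, str):
        raise ValueError("CQL query must be bytes, not str")
    # Assumption: an unset compression means the query is sent uncompressed.
    name = compression or 'NONE'
    if name == 'GZIP':
        payload = zlib.compress(querytext)
    else:
        payload = querytext
    return payload, getattr(Compression, name)


# Usage: the compressed payload round-trips through zlib.decompress.
payload, flag = compress_query_text(b"SELECT * FROM users", 'GZIP')
assert zlib.decompress(payload) == b"SELECT * FROM users"
```

Keeping the `None` handling inside the helper means callers never have to know which enum member stands for "no compression".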
=======================================
--- /cql/apivalues.py	Sun Aug 19 12:02:17 2012
+++ /cql/apivalues.py	Mon Aug 20 16:49:01 2012
@@ -20,7 +20,10 @@
  # dbapi Error hierarchy

  class Warning(exceptions.StandardError): pass
-class Error  (exceptions.StandardError): pass
+class Error  (exceptions.StandardError):
+    def __init__(self, msg, code=None):
+        exceptions.StandardError.__init__(self, msg)
+        self.code = code

  class InterfaceError(Error): pass
  class DatabaseError (Error): pass
@@ -31,6 +34,7 @@
  class InternalError    (DatabaseError): pass
  class ProgrammingError (DatabaseError): pass
  class NotSupportedError(DatabaseError): pass
+class NotAuthenticated (DatabaseError): pass


  # Module constants
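
Not part of the commit: a rough illustration of what the new `code` argument on `Error` allows, assuming Python 3 (where `exceptions.StandardError` no longer exists, so plain `Exception` is used as the base). The `describe` helper, the message, and the value `0x1234` are made up for the example and do not correspond to any real protocol error code.

```python
class Error(Exception):
    def __init__(self, msg, code=None):
        Exception.__init__(self, msg)
        self.code = code  # optional numeric error code; None when unknown


class DatabaseError(Error):
    pass


class ProgrammingError(DatabaseError):
    pass


def describe(exc):
    """Format an Error, appending its code when one was supplied."""
    if exc.code is None:
        return str(exc)
    return "%s (code=0x%04x)" % (exc, exc.code)


# Subclasses inherit the constructor, so callers can catch the base
# class and still inspect the code.
try:
    raise ProgrammingError("Bad Request: example", code=0x1234)
except Error as e:
    summary = describe(e)
```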
=======================================
--- /cql/connection.py	Sun Aug 19 13:16:40 2012
+++ /cql/connection.py	Mon Aug 20 16:49:01 2012
@@ -14,19 +14,13 @@
  # See the License for the specific language governing permissions and
  # limitations under the License.

-from cql.cursor import Cursor
-from cql.query import cql_quote, cql_quote_name
-from cql.cassandra import Cassandra
-from thrift.transport import TTransport, TSocket
-from thrift.protocol import TBinaryProtocol
-from cql.cassandra.ttypes import AuthenticationRequest
  from cql.apivalues import ProgrammingError, NotSupportedError

-
  class Connection(object):
      cql_major_version = 2

-    def __init__(self, host, port, keyspace, user=None, password=None, cql_version=None):
+    def __init__(self, host, port, keyspace, user=None, password=None, cql_version=None,
+                 compression=None):
          """
          Params:
          * host .........: hostname of Cassandra node.
@@ -35,43 +29,30 @@
          * user .........: username used in authentication (optional).
          * password .....: password used in authentication (optional).
          * cql_version...: CQL version to use (optional).
+        * compression...: the sort of compression to use by default;
+        *                 overrideable per Cursor object. (optional).
          """
          self.host = host
          self.port = port
          self.keyspace = keyspace
+        self.cql_version = cql_version
+        self.compression = compression
+        self.open_socket = False

-        socket = TSocket.TSocket(host, port)
-        self.transport = TTransport.TFramedTransport(socket)
-        protocol = TBinaryProtocol.TBinaryProtocolAccelerated(self.transport)
-        self.client = Cassandra.Client(protocol)
+        self.credentials = None
+        if user or password:
+            self.credentials = {"username": user, "password": password}

-        socket.open()
+        self.establish_connection()
          self.open_socket = True

-        if user and password:
-            credentials = {"username": user, "password": password}
-            self.client.login(AuthenticationRequest(credentials=credentials))
-
-        self.remote_thrift_version = tuple(map(int, self.client.describe_version().split('.')))
-
-        if cql_version:
-            self.client.set_cql_version(cql_version)
-            try:
-                self.cql_major_version = int(cql_version.split('.')[0])
-            except ValueError:
-                pass
-
-        if keyspace:
-            c = self.cursor()
-            if self.cql_major_version >= 3:
-                ksname = cql_quote_name(keyspace)
-            else:
-                ksname = cql_quote(keyspace)
-            c.execute('USE %s' % ksname)
-            c.close()
-
      def __str__(self):
-        return "{host: '%s:%s', keyspace: '%s'}"%(self.host,self.port,self.keyspace)
+        return ("%s(host=%r, port=%r, keyspace=%r, %s)"
+                % (self.__class__.__name__, self.host, self.port, self.keyspace,
+                   self.open_socket and 'conn open' or 'conn closed'))
+
+    def keyspace_changed(self, keyspace):
+        self.keyspace = keyspace

      ###
      # Connection API
@@ -80,8 +61,7 @@
      def close(self):
          if not self.open_socket:
              return
-
-        self.transport.close()
+        self.terminate_connection()
          self.open_socket = False

      def commit(self):
@@ -97,8 +77,20 @@
      def cursor(self):
          if not self.open_socket:
              raise ProgrammingError("Connection has been closed.")
-        return Cursor(self)
+        curs = self.cursorclass(self)
+        curs.compression = self.compression
+        return curs
+
+class NativeConnection(Connection):
+    pass

  # TODO: Pull connections out of a pool instead.
-def connect(host, port=9160, keyspace=None, user=None, password=None, cql_version=None):
-    return Connection(host, port, keyspace, user, password, cql_version)
+def connect(host, port=9160, keyspace=None, user=None, password=None,
+            cql_version=None, native=False):
+    if native:
+        from native import NativeConnection
+        connclass = NativeConnection
+    else:
+        from thriftconnection import ThriftConnection
+        connclass = ThriftConnection
+    return connclass(host, port, keyspace, user, password, cql_version)
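
Not part of the commit: the shape of this refactoring, sketched in isolation for Python 3. The base class stores the parameters and calls an `establish_connection` hook, each transport subclass supplies that hook plus a `cursorclass`, and `connect` picks the subclass. Every `Fake*` class below is a stand-in that does no I/O and is not part of the driver. Passing `compression` through `connect` is an assumption of the sketch (the committed `connect` does not accept it); the 9160 default port is the one from the diff.

```python
class Connection(object):
    cursorclass = None  # set by each transport subclass

    def __init__(self, host, port, compression=None):
        self.host = host
        self.port = port
        self.compression = compression
        self.open_socket = False
        self.establish_connection()  # transport-specific hook
        self.open_socket = True

    def establish_connection(self):
        raise NotImplementedError

    def terminate_connection(self):
        raise NotImplementedError

    def cursor(self):
        if not self.open_socket:
            raise RuntimeError("Connection has been closed.")
        curs = self.cursorclass(self)
        curs.compression = self.compression  # connection-wide default
        return curs

    def close(self):
        if self.open_socket:
            self.terminate_connection()
            self.open_socket = False


class FakeCursor(object):
    def __init__(self, parent_connection):
        self._connection = parent_connection
        self.compression = None


class FakeThriftConnection(Connection):
    cursorclass = FakeCursor

    def establish_connection(self):
        self.transport = 'thrift'  # placeholder for opening a Thrift socket

    def terminate_connection(self):
        self.transport = None


class FakeNativeConnection(Connection):
    cursorclass = FakeCursor

    def establish_connection(self):
        self.transport = 'native'  # placeholder for the native-protocol handshake

    def terminate_connection(self):
        self.transport = None


def connect(host, port=9160, native=False, compression=None):
    connclass = FakeNativeConnection if native else FakeThriftConnection
    return connclass(host, port, compression)
```

Because `close()` and `cursor()` live on the base class, a new transport only has to provide the two hooks and a cursor class.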
=======================================
--- /cql/cqltypes.py	Sat Sep  8 21:30:43 2012
+++ /cql/cqltypes.py	Mon Aug 20 16:49:01 2012
@@ -137,6 +137,8 @@

      """

+    if isinstance(casstype, CassandraType):
+        return casstype
      try:
          return parse_casstype_args(casstype)
      except (ValueError, AssertionError, IndexError), e:
@@ -157,6 +159,8 @@

      """

+    if isinstance(cqltype, CassandraType):
+        return cqltype
      args = ()
      if cqltype.startswith("'") and cqltype.endswith("'"):
          return lookup_casstype(cqltype[1:-1].replace("''", "'"))
=======================================
--- /cql/cursor.py	Sun Aug 19 14:09:48 2012
+++ /cql/cursor.py	Mon Aug 20 16:49:01 2012
@@ -14,39 +14,20 @@
  # See the License for the specific language governing permissions and
  # limitations under the License.

-import re
-import zlib
-
  import cql
-from cql.query import prepare_inline, prepare_query, PreparedQuery
  from cql.decoders import SchemaDecoder
-from cql.cassandra.ttypes import (
-    Compression,
-    CqlResultType,
-    InvalidRequestException,
-    UnavailableException,
-    TimedOutException,
-    SchemaDisagreementException)
-from thrift.Thrift import TApplicationException
+from cql.query import prepare_inline

  _COUNT_DESCRIPTION = (None, None, None, None, None, None, None)
-_VOID_DESCRIPTION = (None)
-
-MIN_THRIFT_FOR_PREPARED_QUERIES = (19, 27, 0)
+_VOID_DESCRIPTION = None

  class Cursor:
-    _keyspace_re = re.compile("USE (\w+);?",
-                              re.IGNORECASE | re.MULTILINE)
-    _cfamily_re = re.compile("\s*SELECT\s+.+?\s+FROM\s+[\']?(\w+)",
-                             re.IGNORECASE | re.MULTILINE | re.DOTALL)
-    _ddl_re = re.compile("\s*(CREATE|ALTER|DROP)\s+",
-                         re.IGNORECASE | re.MULTILINE)
+    default_decoder = SchemaDecoder
      supports_prepared_queries = False
      supports_column_types = True
      supports_name_info = True

      def __init__(self, parent_connection):
-        self.open_socket = True
          self._connection = parent_connection
          self.cql_major_version = parent_connection.cql_major_version

@@ -62,48 +43,15 @@

          self.arraysize = 1
          self.rowcount = -1      # Populate on execute()
-        self.compression = 'GZIP'
+        self.compression = None
          self.decoder = None

-        if hasattr(parent_connection.client, 'execute_prepared_cql_query') \
-                and parent_connection.remote_thrift_version >= MIN_THRIFT_FOR_PREPARED_QUERIES:
-            self.supports_prepared_queries = True
-
      ###
      # Cursor API
      ###

      def close(self):
-        self.open_socket = False
-
-    def compress_query_text(self, querytext):
-        if self.compression == 'GZIP':
-            compressed_q = zlib.compress(querytext)
-        else:
-            compressed_q = querytext
-        req_compression = getattr(Compression, self.compression)
-        return compressed_q, req_compression
-
-    def prepare_inline(self, query, params):
-        try:
-            prepared_q_text = prepare_inline(query, params)
-        except KeyError, e:
-            raise cql.ProgrammingError("Unmatched named substitution: " +
-                                       "%s not given for %r" % (e, query))
-        return self.compress_query_text(prepared_q_text)
-
-    def prepare_query(self, query, paramtypes=None):
-        if isinstance(query, unicode):
-            raise ValueError("CQL query must be bytes, not unicode")
-        prepared_q_text, paramnames = prepare_query(query)
-        compressed_q, compression = self.compress_query_text(prepared_q_text)
-        presult = self._connection.client.prepare_cql_query(compressed_q, compression)
-        assert presult.count == len(paramnames)
-        if presult.variable_types is None and presult.count > 0:
-            raise cql.ProgrammingError("Cassandra did not provide types for bound"
-                                       " parameters. Prepared statements are only"
-                                       " supported with cql3.")
-        return PreparedQuery(query, presult.itemId, presult.variable_types, paramnames)
+        self._connection = None

      def pre_execution_setup(self):
          self.__checksock()
@@ -113,85 +61,30 @@
          self.name_info = None
          self.column_types = None

+    def prepare_inline(self, query, params):
+        try:
+            return prepare_inline(query, params)
+        except KeyError, e:
+            raise cql.ProgrammingError("Unmatched named substitution: " +
+                                       "%s not given for %r" % (e, query))
+
      def execute(self, cql_query, params={}, decoder=None):
          if isinstance(cql_query, unicode):
              raise ValueError("CQL query must be bytes, not unicode")
          self.pre_execution_setup()
-
-        prepared_q, compress = self.prepare_inline(cql_query, params)
-        doquery = self._connection.client.execute_cql_query
-        response = self.handle_cql_execution_errors(doquery, prepared_q, compress)
-
+        prepared_q = self.prepare_inline(cql_query, params)
+        response = self.get_response(prepared_q)
          return self.process_execution_results(response, decoder=decoder)

      def execute_prepared(self, prepared_query, params={}, decoder=None):
          self.pre_execution_setup()
-
-        doquery = self._connection.client.execute_prepared_cql_query
-        paramvals = prepared_query.encode_params(params)
-        response = self.handle_cql_execution_errors(doquery, prepared_query.itemid, paramvals)
-
+        response = self.get_response_prepared(prepared_query, params)
          return self.process_execution_results(response, decoder=decoder)

-    def handle_cql_execution_errors(self, executor, *args, **kwargs):
-        try:
-            return executor(*args, **kwargs)
-        except InvalidRequestException, ire:
-            raise cql.ProgrammingError("Bad Request: %s" % ire.why)
-        except SchemaDisagreementException, sde:
-            raise cql.IntegrityError("Schema versions disagree, (try again later).")
-        except UnavailableException:
-            raise cql.OperationalError("Unable to complete request: one or "
-                                       "more nodes were unavailable.")
-        except TimedOutException:
-            raise cql.OperationalError("Request did not complete within rpc_timeout.")
-        except TApplicationException, tapp:
-            raise cql.InternalError("Internal application error")
-
-    def process_execution_results(self, response, decoder=None):
-        if response.type == CqlResultType.ROWS:
-            self.decoder = (decoder or SchemaDecoder)(response.schema)
-            self.result = response.rows
-            self.rs_idx = 0
-            self.rowcount = len(self.result)
-            if self.result:
-                self.get_metadata_info(self.result[0])
-        elif response.type == CqlResultType.INT:
-            self.result = [(response.num,)]
-            self.rs_idx = 0
-            self.rowcount = 1
-            # TODO: name could be the COUNT expression
-            self.description = _COUNT_DESCRIPTION
-            self.name_info = None
-        elif response.type == CqlResultType.VOID:
-            self.result = []
-            self.rs_idx = 0
-            self.rowcount = 0
-            self.description = _VOID_DESCRIPTION
-            self.name_info = ()
-        else:
-            raise Exception('unknown result type %s' % response.type)
-
-        # 'Return values are not defined.'
-        return True
-
      def get_metadata_info(self, row):
          self.description, self.name_info, self.column_types = \
                  self.decoder.decode_metadata_and_types(row)

-    def executemany(self, operation_list, argslist):
-        self.__checksock()
-        opssize = len(operation_list)
-        argsize = len(argslist)
-
-        if opssize > argsize:
-            raise cql.InterfaceError("Operations outnumber args for executemany().")
-        elif opssize < argsize:
-            raise cql.InterfaceError("Args outnumber operations for executemany().")
-
-        for idx in xrange(opssize):
-            self.execute(operation_list[idx], *argslist[idx])
-
      def fetchone(self):
          self.__checksock()
          if self.rs_idx == len(self.result):
@@ -222,6 +115,19 @@
      def fetchall(self):
          return self.fetchmany(len(self.result) - self.rs_idx)

+    def executemany(self, operation_list, argslist):
+        self.__checksock()
+        opssize = len(operation_list)
+        argsize = len(argslist)
+
+        if opssize > argsize:
+            raise cql.InterfaceError("Operations outnumber args for executemany().")
+        elif opssize < argsize:
+            raise cql.InterfaceError("Args outnumber operations for executemany().")
+
+        for idx in xrange(opssize):
+            self.execute(operation_list[idx], *argslist[idx])
+
      ###
      # extra, for cqlsh
      ###
@@ -262,6 +168,5 @@
      ###

      def __checksock(self):
-        if not self.open_socket:
-            raise cql.InternalError("Cursor belonging to %s has been closed." %
-                                    (self._connection, ))
+        if self._connection is None:
+            raise cql.ProgrammingError("Cursor has been closed.")
=======================================
--- /cql/decoders.py	Thu Aug 30 22:47:27 2012
+++ /cql/decoders.py	Mon Aug 20 16:49:01 2012
@@ -61,7 +61,6 @@
          return description, name_info, column_types

      def decode_row(self, row, column_types=None):
-        schema = self.schema
          values = []
          if column_types is None:
              column_types = self.decode_metadata_and_types(row)[2]
@@ -74,3 +73,6 @@
              values.append(value)

          return values
+
+    def decode_metadata_and_types_native(self, row):
+        pass
=======================================
--- /cql/native.py	Sun Aug 19 11:08:00 2012
+++ /cql/native.py	Mon Aug 20 16:49:01 2012
@@ -14,8 +14,15 @@
  # See the License for the specific language governing permissions and
  # limitations under the License.

-from marshal import int32_pack, int32_unpack, uint16_pack, uint16_unpack
-from cqltypes import lookup_casstype
+import cql
+from cql.marshal import int32_pack, int32_unpack, uint16_pack, uint16_unpack
+from cql.cqltypes import lookup_cqltype
+from cql.connection import Connection
+from cql.cursor import Cursor, _VOID_DESCRIPTION, _COUNT_DESCRIPTION
+from cql.apivalues import ProgrammingError, OperationalError
+from cql.query import PreparedQuery, prepare_query, cql_quote_name
+import socket
+import itertools
  try:
      from cStringIO import StringIO
  except ImportError:
@@ -82,12 +89,13 @@
          body = StringIO()
          self.send_body(body)
          body = body.getvalue()
-        write_byte(f, PROTOCOL_VERSION | HEADER_DIRECTION_FROM_CLIENT)
-        write_byte(f, 0) # no compression supported yet
-        write_byte(f, streamid)
-        write_byte(f, self.opcode)
-        write_int(f, len(body))
-        f.write(body)
+        version = PROTOCOL_VERSION | HEADER_DIRECTION_FROM_CLIENT
+        flags = 0 # no compression supported yet
+        msglen = int32_pack(len(body))
+        header = '%c%c%c%c%s' % (version, flags, streamid, self.opcode, msglen)
+        f.write(header)
+        if len(body) > 0:
+            f.write(body)

      def __str__(self):
          paramstrs = ['%s=%r' % (pname, getattr(self, pname)) for pname in self.params]
@@ -95,11 +103,9 @@
      __repr__ = __str__

  def read_frame(f):
-    version = read_byte(f)
-    flags = read_byte(f)
-    stream = read_byte(f)
-    opcode = read_byte(f)
-    body_len = read_int(f)
+    header = f.read(8)
+    version, flags, stream, opcode = map(ord, header[:4])
+    body_len = int32_unpack(header[4:])
      assert version & PROTOCOL_VERSION_MASK == PROTOCOL_VERSION, \
              "Unsupported CQL protocol version %d" % version
      assert version & HEADER_DIRECTION_MASK == HEADER_DIRECTION_TO_CLIENT, \
@@ -113,11 +119,6 @@
      msg.stream_id = stream
      return msg

-def do_request(f, msg):
-    msg.send(f, 0)
-    f.flush()
-    return read_frame(f)
-
  class ErrorMessage(_MessageType):
      opcode = 0x00
      name = 'ERROR'
@@ -139,9 +140,12 @@
          msg = read_string(f)
          return cls(code=code, message=msg)

-    def __str__(self):
-        return '<ErrorMessage code=%04x [%s] message=%r>' \
+    def summary(self):
+        return 'code=%04x [%s] message=%r' \
                 % (self.code, self.error_codes.get(self.code, '(Unknown)'), self.message)
+
+    def __str__(self):
+        return '<ErrorMessage %s>' % self.summary()
      __repr__ = __str__

  class StartupMessage(_MessageType):
@@ -152,7 +156,10 @@
      STARTUP_USE_COMPRESSION = 0x0001

      def send_body(self, f):
+        if isinstance(self.options, dict):
+            self.options = self.options.items()
          write_string(f, self.cqlversion)
+        write_short(f, len(self.options))
          for key, value in self.options:
              write_short(f, key)
              if key == STARTUP_USE_COMPRESSION:
@@ -161,8 +168,8 @@
                  # should be a safe guess
                  write_string(f, value)
              else:
-                raise NotImplemented("Startup option 0x%04x not known; can't send "
-                                     "value to server" % key)
+                raise NotImplementedError("Startup option 0x%04x not known; can't send "
+                                          "value to server" % key)

  class ReadyMessage(_MessageType):
      opcode = 0x02
@@ -279,7 +286,7 @@
      def recv_results_prepared(self, f):
          queryid = read_int(f)
          colspecs = cls.recv_results_metadata(f)
-        return PreparedResult(queryid, colspecs)
+        return (queryid, colspecs)

      @classmethod
      def recv_results_metadata(cls, f):
@@ -304,13 +311,16 @@

      @classmethod
      def read_type(cls, f):
-        # XXX: stubbed out. should really return more useful 'type' objects.
          optid = read_short(f)
-        cqltype = lookup_cqltype(cls.type_codes.get(optid))
-        if cqltypename in ('list', 'set'):
+        try:
+            cqltype = lookup_cqltype(cls.type_codes[optid])
+        except KeyError:
+            raise cql.NotSupportedError("Unknown data type code 0x%x. Have to skip"
+                                        " entire result set." % optid)
+        if cqltype.typename in ('list', 'set'):
              subtype = cls.read_type(f)
              cqltype = cqltype.apply_parameters(subtype)
-        elif cqltypename == 'map':
+        elif cqltype.typename == 'map':
              keysubtype = cls.read_type(f)
              valsubtype = cls.read_type(f)
              cqltype = cqltype.apply_parameters(keysubtype, valsubtype)
@@ -339,6 +349,54 @@
          for param in self.queryparams:
              write_value(f, param)

+class ModeChangeMessage(_MessageType):
+    opcode = 0x0B
+    name = 'MODE_CHANGE'
+    params = ('is_control',)
+
+    def send_body(self, f):
+        write_byte(f, bool(self.is_control))
+
+known_event_types = frozenset((
+    'topology_change',
+    'status_change',
+))
+
+class RegisterMessage(_MessageType):
+    opcode = 0x0C
+    name = 'REGISTER'
+    params = ('eventlist',)
+
+    def send_body(self, f):
+        write_stringlist(f, self.eventlist)
+
+class EventMessage(_MessageType):
+    opcode = 0x0D
+    name = 'EVENT'
+    params = ('eventtype', 'eventargs')
+
+    @classmethod
+    def recv_body(cls, f):
+        eventtype = read_string(f).lower()
+        if eventtype in known_event_types:
+            readmethod = getattr(cls, 'recv_' + eventtype)
+            return cls(eventtype=eventtype, eventargs=readmethod(f))
+        raise cql.NotSupportedError('Unknown event type %r' % eventtype)
+
+    @classmethod
+    def recv_topology_change(cls, f):
+        # "new_node" or "removed_node"
+        changetype = read_string(f)
+        address = read_inet(f)
+        return dict(changetype=changetype, address=address)
+
+    @classmethod
+    def recv_status_change(cls, f):
+        # "up" or "down"
+        changetype = read_string(f)
+        address = read_inet(f)
+        return dict(changetype=changetype, address=address)
+

  def read_byte(f):
      return ord(f.read(1))
@@ -402,12 +460,320 @@
          write_int(f, len(v))
          f.write(v)

-# won't work, unless the change from CASSANDRA-4539 is implemented
-#def read_option(f):
-#    optid = read_short(f)
-#    value = read_value(f)
-#    return (optid, value)
-#
-#def write_option(f, optid, value):
-#    write_short(f, optid)
-#    write_value(f, value)
+def read_inet(f):
+    size = read_byte(f)
+    addrbytes = f.read(size)
+    port = read_int(f)
+    if size == 4:
+        addrfam = socket.AF_INET
+    elif size == 16:
+        addrfam = socket.AF_INET6
+    else:
+        raise cql.InternalError("bad inet address: %r" % (addrbytes,))
+    return (socket.inet_ntop(addrfam, addrbytes), port)
+
+def write_inet(f, addrtuple):
+    addr, port = addrtuple
+    if ':' in addr:
+        addrfam = socket.AF_INET6
+    else:
+        addrfam = socket.AF_INET
+    addrbytes = socket.inet_pton(addrfam, addr)
+    write_byte(f, len(addrbytes))
+    f.write(addrbytes)
+    write_int(f, port)
+
+
+class FakeThriftRow:
+    def __init__(self, columns):
+        self.columns = columns
+
+class NativeCursor(Cursor):
+    def prepare_query(self, query):
+        pquery, paramnames = prepare_query(query)
+        prepared = self._connection.wait_for_request(PrepareMessage(query=pquery))
+        if isinstance(prepared, ErrorMessage):
+            raise cql.Error('Query preparation failed: %s' % prepared.summary())
+        if prepared.kind != ResultMessage.KIND_PREPARED:
+            raise cql.InternalError('Query preparation did not result in prepared query')
+        queryid, colspecs = prepared.results
+        kss, cfs, names, ctypes = zip(*colspecs)
+        return PreparedQuery(query, queryid, ctypes, paramnames)
+
+    def get_response(self, query):
+        return self._connection.wait_for_request(QueryMessage(query=query))
+
+    def get_response_prepared(self, prepared_query, params):
+        em = ExecuteMessage(queryid=prepared_query.itemid, queryparams=params)
+        return self._connection.wait_for_request(em)
+
+    def executemany(self, querylist, argslist):
+        pass
+
+    def translate_schema(self, metadata):
+        print "incoming metadata: %r" % (metadata,)
+        pass
+
+    def translate_row(self, row):
+        return FakeThriftRow(row)
+
+    def handle_cql_execution_errors(self, response):
+        if not isinstance(response, ErrorMessage):
+            return
+        try:
+            codemsg = response.error_codes[response.code]
+        except KeyError:
+            codemsg = '(Unknown error code %04x)' % response.code
+        if codemsg == 'Schema disagreement exception':
+            eclass = cql.IntegrityError
+        elif codemsg == 'Authentication error':
+            eclass = cql.NotAuthenticated
+        elif codemsg in ('Unavailable exception', 'Timeout exception'):
+            eclass = cql.OperationalError
+        elif codemsg == 'Request exception':
+            eclass = cql.ProgrammingError
+        else:
+            eclass = cql.InternalError
+        raise eclass('%s: %s' % (codemsg, response.msg))
+
+    error_codes = {
+        0x0000: 'Server error',
+        0x0001: 'Protocol error',
+        0x0002: 'Authentication error',
+        0x0100: 'Unavailable exception',
+        0x0101: 'Timeout exception',
+        0x0102: 'Schema disagreement exception',
+        0x0200: 'Request exception',
+    }
+
+    def process_execution_results(self, response, decoder=None):
+        self.handle_cql_execution_errors(response)
+        if not isinstance(response, ResultMessage):
+            raise cql.InternalError('Query execution resulted in %s!?' % (response,))
+        if response.kind == ResultMessage.KIND_PREPARED:
+            raise cql.InternalError('Query execution resulted in prepared query!?')
+
+        self.rs_idx = 0
+        self.description = None
+        self.result = []
+        self.name_info = ()
+
+        if response.kind == ResultMessage.KIND_VOID:
+            self.description = _VOID_DESCRIPTION
+        elif response.kind == ResultMessage.KIND_SET_KS:
+            self._connection.keyspace_changed(response.results)
+            self.description = _VOID_DESCRIPTION
+        elif response.kind == ResultMessage.KIND_ROWS:
+            schema = self.translate_schema(response.results.column_metadata)
+            self.decoder = (decoder or self.default_decoder)(schema)
+            self.result = map(self.translate_row, response.results.rows)
+            if self.result:
+                self.get_metadata_info(self.result[0])
+        else:
+            raise Exception('unknown response kind %s: %s' % (response.kind, response))
+        self.rowcount = len(self.result)
+
+    def get_compression(self):
+        return None
+
+    def set_compression(self, val):
+        if val is not None:
+            raise NotImplementedError("Setting per-cursor compression is not "
+                                      "supported in NativeCursor.")
+
+    compression = property(get_compression, set_compression)
+
+class debugsock:
+    def __init__(self, sock):
+        self.sock = sock
+
+    def write(self, data):
+        print '[sending %r]' % (data,)
+        self.sock.send(data)
+
+    def read(self, readlen):
+        data = ''
+        while readlen > 0:
+            add = self.sock.recv(readlen)
+            print '[received %r]' % (add,)
+            if add == '':
+                raise cql.InternalError("short read of %s bytes (%s expected)"
+                                        % (len(data), len(data) + readlen))
+            data += add
+            readlen -= len(add)
+        return data
+
+    def close(self):
+        pass
+
+class NativeConnection(Connection):
+    cursorclass = NativeCursor
+
+    def __init__(self, *args, **kwargs):
+        self.make_reqid = itertools.count().next
+        self.responses = {}
+        self.waiting = {}
+        self.conn_ready = False
+        Connection.__init__(self, *args, **kwargs)
+
+    def establish_connection(self):
+        self.conn_ready = False
+        s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+        s.connect((self.host, self.port))
+        #self.socketf = s.makefile(bufsize=0)
+        self.socketf = debugsock(s)
+        self.sockfd = s
+        self.open_socket = True
+        supported = self.wait_for_request(OptionsMessage())
+        self.supported_cql_versions = supported.cql_versions
+        self.supported_compressions = supported.compressions
+
+        if self.cql_version:
+            if self.cql_version not in self.supported_cql_versions:
+                raise ProgrammingError("cql_version %r is not supported by"
+                                       " remote (w/ native protocol). Supported"
+                                       " versions: %r"
+                                       % (self.cql_version, self.supported_cql_versions))
+        else:
+            self.cql_version = self.supported_cql_versions[0]
+
+        opts = {}
+        if self.compression:
+            if self.compression not in self.supported_compressions:
+                raise ProgrammingError("Compression type %r is not supported by"
+                                       " remote. Supported compression types: %r"
+                                       % (self.compression, self.supported_compressions))
+            # XXX: Remove this once snappy compression is supported
+            raise NotImplementedError("CQL driver does not yet support compression")
+            opts[StartupMessage.STARTUP_USE_COMPRESSION] = self.compression
+
+        sm = StartupMessage(cqlversion=self.cql_version, options=opts)
+        startup_response = self.wait_for_request(sm)
+        while True:
+            if isinstance(startup_response, ReadyMessage):
+                self.conn_ready = True
+                break
+            if isinstance(startup_response, AuthenticateMessage):
+                self.authenticator = startup_response.authenticator
+                if self.credentials is None:
+                    raise ProgrammingError('Remote end requires authentication.')
+                cm = CredentialsMessage(creds=self.credentials)
+                startup_response = self.wait_for_request(cm)
+            elif isinstance(startup_response, ErrorMessage):
+                raise ProgrammingError("Server did not accept credentials. %s"
+                                       % startup_response.summary())
+            else:
+                raise cql.InternalError("Unexpected response %r during connection setup"
+                                        % startup_response)
+
+        if self.keyspace:
+            self.set_initial_keyspace(self.keyspace)
+
+    def set_initial_keyspace(self, keyspace):
+        c = self.cursor()
+        c.execute('USE %s' % cql_quote_name(self.keyspace))
+        c.close()
+
+    def terminate_conn(self):
+        self.socketf.close()
+        self.sockfd.close()
+
+    def wait_for_request(self, msg):
+        """
+        Given a message, send it to the server, wait for a response, and
+        return the response.
+        """
+
+        return self.wait_for_requests(msg)[0]
+
+    def wait_for_requests(self, *msgs):
+        """
+        Given any number of message objects, send them all to the server
+        and wait for responses to each one. Once they arrive, return all
+        of the responses in the same order as the messages to which they
+        respond.
+        """
+
+        reqids = []
+        for msg in msgs:
+            reqid = self.make_reqid()
+            reqids.append(reqid)
+            msg.send(self.socketf, reqid)
+        resultdict = self.wait_for_results(*reqids)
+        return [resultdict[reqid] for reqid in reqids]
+
+    def wait_for_results(self, *reqids):
+        """
+        Given any number of stream-ids, wait until responses have arrived for
+        each one, and return a dictionary mapping the stream-ids to the
+        appropriate results.
+        """
+
+        waiting_for = set(reqids)
+        results = {}
+        for r in reqids:
+            try:
+                result = self.responses.pop(r)
+            except KeyError:
+                pass
+            else:
+                results[r] = result
+                waiting_for.remove(r)
+        while waiting_for:
+            newmsg = read_frame(self.socketf)
+            if newmsg.stream_id in waiting_for:
+                results[newmsg.stream_id] = newmsg
+                waiting_for.remove(newmsg.stream_id)
+            else:
+                self.handle_incoming(newmsg)
+        return results
+
+    def wait_for_result(self, reqid):
+        """
+        Given a stream-id, wait until a response arrives with that stream-id,
+        and return the msg.
+        """
+
+        return self.wait_for_results(reqid)[reqid]
+
+    def handle_incoming(self, msg):
+        if msg.stream_id < 0:
+            self.handle_pushed(msg)
+            return
+        try:
+            cb = self.waiting.pop(msg.stream_id)
+        except KeyError:
+            self.responses[msg.stream_id] = msg
+        else:
+            cb(msg)
+
+    def callback_when(self, reqid, cb):
+        """
+        Callback cb with a message object once a message with a stream-id
+        of reqid is received. The callback may be immediate, if a response
+        is already in the received queue.
+
+        Otherwise, note also that the callback may not be called immediately
+        upon the arrival of the response packet; it may have to wait until
+        something else waits on a result.
+        """
+
+        try:
+            msg = self.responses.pop(reqid)
+        except KeyError:
+            pass
+        else:
+            return cb(msg)
+        self.waiting[reqid] = cb
+
+    def request_and_callback(self, msg, cb):
+        """
+        Given a message msg and a callable cb, send the message to the server
+        and call cb with the result once it arrives. Note that the callback
+        may not be called immediately upon the arrival of the response packet;
+        it may have to wait until something else waits on a result.
+        """
+
+        reqid = self.make_reqid()
+        msg.send(self.socketf, reqid)
+        self.callback_when(reqid, cb)

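Illustrative note (not part of the commit): the read_inet/write_inet
helpers added above encode an address as a one-byte length, the raw
address bytes, and then the port. The following is a minimal Python 3
sketch of that round trip. It assumes write_int emits a 4-byte big-endian
signed integer (its definition is not shown in this hunk) and uses struct
directly instead of the module's own write_byte/write_int helpers; the
roundtrip function is a hypothetical name used only for this example.

```python
import io
import socket
import struct


def write_inet(f, addrtuple):
    # Pick the address family the same way the diff does: a colon means IPv6.
    addr, port = addrtuple
    addrfam = socket.AF_INET6 if ':' in addr else socket.AF_INET
    addrbytes = socket.inet_pton(addrfam, addr)
    f.write(struct.pack('>B', len(addrbytes)))  # one-byte length (4 or 16)
    f.write(addrbytes)
    f.write(struct.pack('>i', port))            # assumed 4-byte big-endian int


def read_inet(f):
    (size,) = struct.unpack('>B', f.read(1))
    addrbytes = f.read(size)
    (port,) = struct.unpack('>i', f.read(4))
    if size == 4:
        addrfam = socket.AF_INET
    elif size == 16:
        addrfam = socket.AF_INET6
    else:
        raise ValueError("bad inet address: %r" % (addrbytes,))
    return (socket.inet_ntop(addrfam, addrbytes), port)


def roundtrip(addrtuple):
    # Hypothetical helper: encode into memory, then decode again.
    buf = io.BytesIO()
    write_inet(buf, addrtuple)
    buf.seek(0)
    return read_inet(buf)
```

Under these assumptions an IPv4 address occupies 9 bytes on the wire
(1 + 4 + 4) and an IPv6 address 21 bytes (1 + 16 + 4).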
==============================================================================
Revision: dcdd3d2c693e
Author:   paul cannon <pa...@datastax.com>
Date:     Thu Sep  6 17:54:49 2012
Log:      MODE_CHANGE message no longer in CASSANDRA-4480

http://code.google.com/a/apache-extras.org/p/cassandra-dbapi2/source/detail?r=dcdd3d2c693e

Modified:
  /cql/native.py

=======================================
--- /cql/native.py	Mon Aug 20 16:49:01 2012
+++ /cql/native.py	Thu Sep  6 17:54:49 2012
@@ -349,21 +349,13 @@
          for param in self.queryparams:
              write_value(f, param)

-class ModeChangeMessage(_MessageType):
-    opcode = 0x0B
-    name = 'MODE_CHANGE'
-    params = ('is_control',)
-
-    def send_body(self, f):
-        write_byte(f, bool(self.is_control))
-
  known_event_types = frozenset((
-    'topology_change',
-    'status_change',
+    'TOPOLOGY_CHANGE',
+    'STATUS_CHANGE',
  ))

  class RegisterMessage(_MessageType):
-    opcode = 0x0C
+    opcode = 0x0B
      name = 'REGISTER'
      params = ('eventlist',)

@@ -371,28 +363,28 @@
          write_stringlist(f, self.eventlist)

  class EventMessage(_MessageType):
-    opcode = 0x0D
+    opcode = 0x0C
      name = 'EVENT'
      params = ('eventtype', 'eventargs')

      @classmethod
      def recv_body(cls, f):
-        eventtype = read_string(f).lower()
+        eventtype = read_string(f).upper()
          if eventtype in known_event_types:
-            readmethod = getattr(cls, 'recv_' + eventtype)
+            readmethod = getattr(cls, 'recv_' + eventtype.lower())
              return cls(eventtype=eventtype, eventargs=readmethod(f))
          raise cql.NotSupportedError('Unknown event type %r' % eventtype)

      @classmethod
      def recv_topology_change(cls, f):
-        # "new_node" or "removed_node"
+        # "NEW_NODE" or "REMOVED_NODE"
          changetype = read_string(f)
          address = read_inet(f)
          return dict(changetype=changetype, address=address)

      @classmethod
      def recv_status_change(cls, f):
-        # "up" or "down"
+        # "UP" or "DOWN"
          changetype = read_string(f)
          address = read_inet(f)
          return dict(changetype=changetype, address=address)

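Illustrative note (not part of the commit): after this change the event
type is normalized to upper case for the membership test and lowered again
only to build the recv_* method name. A minimal Python 3 sketch of that
dispatch follows. It assumes strings are encoded as an unsigned 16-bit
big-endian length followed by UTF-8 bytes, and it leaves out the inet
address that the real handlers also read. EventSketch, encode_string and
decode_event are hypothetical names for this example only.

```python
import io
import struct

KNOWN_EVENT_TYPES = frozenset(('TOPOLOGY_CHANGE', 'STATUS_CHANGE'))


def read_string(f):
    # Assumed wire format: 16-bit big-endian length, then UTF-8 bytes.
    (size,) = struct.unpack('>H', f.read(2))
    return f.read(size).decode('utf8')


def encode_string(s):
    raw = s.encode('utf8')
    return struct.pack('>H', len(raw)) + raw


class EventSketch:
    @classmethod
    def recv_body(cls, f):
        # Compare in upper case, but look up the handler in lower case.
        eventtype = read_string(f).upper()
        if eventtype not in KNOWN_EVENT_TYPES:
            raise ValueError('Unknown event type %r' % eventtype)
        readmethod = getattr(cls, 'recv_' + eventtype.lower())
        return eventtype, readmethod(f)

    @classmethod
    def recv_topology_change(cls, f):
        # "NEW_NODE" or "REMOVED_NODE"; address omitted in this sketch.
        return {'changetype': read_string(f)}

    @classmethod
    def recv_status_change(cls, f):
        # "UP" or "DOWN"; address omitted in this sketch.
        return {'changetype': read_string(f)}


def decode_event(payload):
    return EventSketch.recv_body(io.BytesIO(payload))
```

Because the comparison is done on the upper-cased name, a server that sent
the older lower-case spelling would still be dispatched to the same handler.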
==============================================================================
Revision: 2e02819ac520
Author:   paul cannon <pa...@datastax.com>
Date:     Thu Sep  6 18:27:39 2012
Log:      Update STARTUP/SUPPORTED msgs for CASSANDRA-4539

http://code.google.com/a/apache-extras.org/p/cassandra-dbapi2/source/detail?r=2e02819ac520

Modified:
  /cql/native.py

=======================================
--- /cql/native.py	Thu Sep  6 17:54:49 2012
+++ /cql/native.py	Thu Sep  6 18:27:39 2012
@@ -153,23 +153,15 @@
      name = 'STARTUP'
      params = ('cqlversion', 'options')

-    STARTUP_USE_COMPRESSION = 0x0001
+    KNOWN_OPTION_KEYS = set((
+        'CQL_VERSION',
+        'COMPRESSION',
+    ))

      def send_body(self, f):
-        if isinstance(self.options, dict):
-            self.options = self.options.items()
-        write_string(f, self.cqlversion)
-        write_short(f, len(self.options))
-        for key, value in self.options:
-            write_short(f, key)
-            if key == STARTUP_USE_COMPRESSION:
-                write_string(f, value)
-            elif isinstance(value, str): # not unicode
-                # should be a safe guess
-                write_string(f, value)
-            else:
-                raise NotImplementedError("Startup option 0x%04x not known; can't send "
-                                          "value to server" % key)
+        optmap = self.options.copy()
+        optmap['CQL_VERSION'] = self.cqlversion
+        write_stringmap(f, optmap)

  class ReadyMessage(_MessageType):
      opcode = 0x02
@@ -212,13 +204,13 @@
  class SupportedMessage(_MessageType):
      opcode = 0x06
      name = 'SUPPORTED'
-    params = ('cql_versions', 'compressions')
+    params = ('cqlversions', 'options',)

      @classmethod
      def recv_body(cls, f):
-        cqlvers = read_stringlist(f)
-        compressions = read_stringlist(f)
-        return cls(cql_versions=cqlvers, compressions=compressions)
+        options = read_stringmultimap(f)
+        cqlversions = options.pop('CQL_VERSION')
+        return cls(cqlversions=cqlversions, options=options)

  class QueryMessage(_MessageType):
      opcode = 0x07
@@ -439,6 +431,34 @@
      for s in stringlist:
          write_string(f, s)

+def read_stringmap(f):
+    numpairs = read_short(f)
+    strmap = {}
+    for x in xrange(numpairs):
+        k = read_string(f)
+        strmap[k] = read_string(f)
+    return strmap
+
+def write_stringmap(f, strmap):
+    write_short(f, len(strmap))
+    for k, v in strmap.items():
+        write_string(f, k)
+        write_string(f, v)
+
+def read_stringmultimap(f):
+    numkeys = read_short(f)
+    strmmap = {}
+    for x in xrange(numkeys):
+        k = read_string(f)
+        strmmap[k] = read_stringlist(f)
+    return strmmap
+
+def write_stringmultimap(f, strmmap):
+    write_short(f, len(strmmap))
+    for k, v in strmmap.items():
+        write_string(f, k)
+        write_stringlist(f, v)
+
  def read_value(f):
      size = read_int(f)
      if size < 0:
@@ -617,8 +637,8 @@
          self.sockfd = s
          self.open_socket = True
          supported = self.wait_for_request(OptionsMessage())
-        self.supported_cql_versions = supported.cql_versions
-        self.supported_compressions = supported.compressions
+        self.supported_cql_versions = supported.cqlversions
+        self.supported_compressions = supported.options['COMPRESSION']

          if self.cql_version:
              if self.cql_version not in self.supported_cql_versions:
@@ -635,9 +655,9 @@
                  raise ProgrammingError("Compression type %r is not supported by"
                                         " remote. Supported compression types: %r"
                                         % (self.compression, self.supported_compressions))
-            # XXX: Remove this once snappy compression is supported
+            # XXX: Remove this once some compressions are supported
              raise NotImplementedError("CQL driver does not yet support compression")
-            opts[StartupMessage.STARTUP_USE_COMPRESSION] = self.compression
+            opts['COMPRESSION'] = self.compression

          sm = StartupMessage(cqlversion=self.cql_version, options=opts)
          startup_response = self.wait_for_request(sm)

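Illustrative note (not part of the commit): STARTUP now sends its options
as a string map and SUPPORTED returns a string multimap, from which
CQL_VERSION is popped. A minimal Python 3 sketch of both encodings
follows. It assumes shorts are unsigned 16-bit big-endian and strings are
a short length followed by UTF-8 bytes; the helper definitions are
reimplemented here with struct rather than copied from cql/native.py.

```python
import io
import struct


def write_short(f, n):
    f.write(struct.pack('>H', n))


def read_short(f):
    return struct.unpack('>H', f.read(2))[0]


def write_string(f, s):
    raw = s.encode('utf8')
    write_short(f, len(raw))
    f.write(raw)


def read_string(f):
    return f.read(read_short(f)).decode('utf8')


def write_stringlist(f, strings):
    write_short(f, len(strings))
    for s in strings:
        write_string(f, s)


def read_stringlist(f):
    return [read_string(f) for _ in range(read_short(f))]


def write_stringmap(f, strmap):
    # A pair count, then alternating key and value strings.
    write_short(f, len(strmap))
    for k, v in strmap.items():
        write_string(f, k)
        write_string(f, v)


def read_stringmap(f):
    return dict((read_string(f), read_string(f)) for _ in range(read_short(f)))


def write_stringmultimap(f, strmmap):
    # A key count, then each key followed by a string list.
    write_short(f, len(strmmap))
    for k, v in strmmap.items():
        write_string(f, k)
        write_stringlist(f, v)


def read_stringmultimap(f):
    return dict((read_string(f), read_stringlist(f)) for _ in range(read_short(f)))
```

The same split as in SupportedMessage.recv_body can then be applied to the
decoded multimap: pop 'CQL_VERSION' and treat whatever remains as options.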
==============================================================================
Revision: 8433e2abdf4c
Author:   paul cannon <pa...@datastax.com>
Date:     Sun Sep  9 23:22:55 2012
Log:      support new native protocol error codes

along with the extra info per message. still need to update the error
handling bit

http://code.google.com/a/apache-extras.org/p/cassandra-dbapi2/source/detail?r=8433e2abdf4c

Modified:
  /cql/native.py

=======================================
--- /cql/native.py	Thu Sep  6 18:27:39 2012
+++ /cql/native.py	Sun Sep  9 23:22:55 2012
@@ -118,35 +118,129 @@
      msg = msgclass.recv_body(StringIO(body))
      msg.stream_id = stream
      return msg
+
+error_classes = {}

  class ErrorMessage(_MessageType):
      opcode = 0x00
      name = 'ERROR'
-    params = ('code', 'message')
-
-    error_codes = {
-        0x0000: 'Server error',
-        0x0001: 'Protocol error',
-        0x0002: 'Authentication error',
-        0x0100: 'Unavailable exception',
-        0x0101: 'Timeout exception',
-        0x0102: 'Schema disagreement exception',
-        0x0200: 'Request exception',
-    }
+    params = ('code', 'message', 'info')
+    summary = 'Unknown'

      @classmethod
      def recv_body(cls, f):
          code = read_int(f)
          msg = read_string(f)
-        return cls(code=code, message=msg)
+        subcls = error_classes.get(code, cls)
+        extra_info = subcls.recv_error_info(f)
+        return subcls(code=code, message=msg, info=extra_info)

-    def summary(self):
-        return 'code=%04x [%s] message=%r' \
-               % (self.code, self.error_codes.get(self.code, '(Unknown)'), self.message)
+    def summarymsg(self):
+        msg = 'code=%04x [%s] message="%s"' \
+              % (self.code, self.summary, self.message)
+        if self.info is not None:
+            msg += (' ' + repr(self.info))
+        return msg

      def __str__(self):
-        return '<ErrorMessage %s>' % self.summary()
+        return '<ErrorMessage %s>' % self.summarymsg()
      __repr__ = __str__
+
+    @staticmethod
+    def recv_error_info(f):
+        pass
+
+class ErrorMessageSubclass(_register_msg_type):
+    def __init__(cls, name, bases, dct):
+        if not name.startswith('_'):
+            error_classes[cls.errorcode] = cls
+
+class _ErrorMessageSub(ErrorMessage):
+    __metaclass__ = ErrorMessageSubclass
+
+class ServerErrorMessage(_ErrorMessageSub):
+    summary = 'Server error'
+    errorcode = 0x0000
+
+class ProtocolErrorMessage(_ErrorMessageSub):
+    summary = 'Protocol error'
+    errorcode = 0x000A
+
+class UnavailableExceptionErrorMessage(_ErrorMessageSub):
+    summary = 'Unavailable exception'
+    errorcode = 0x1000
+
+    @staticmethod
+    def recv_error_info(f):
+        return {
+            'consistencylevel': read_string(f),
+            'required': read_int(f),
+            'alive': read_int(f),
+        }
+
+class OverloadedErrorMessage(_ErrorMessageSub):
+    summary = 'Coordinator node overloaded'
+    errorcode = 0x1001
+
+class IsBootstrappingErrorMessage(_ErrorMessageSub):
+    summary = 'Coordinator node is bootstrapping'
+    errorcode = 0x1002
+
+class TruncateErrorMessage(_ErrorMessageSub):
+    summary = 'Error during truncate'
+    errorcode = 0x1003
+
+class WriteTimeoutErrorMessage(_ErrorMessageSub):
+    summary = 'Timeout during write request'
+    errorcode = 0x1100
+
+    @staticmethod
+    def recv_error_info(f):
+        return {
+            'consistencylevel': read_string(f),
+            'received': read_int(f),
+            'blockfor': read_int(f),
+        }
+
+class ReadTimeoutErrorMessage(_ErrorMessageSub):
+    summary = 'Timeout during read request'
+    errorcode = 0x1200
+
+    @staticmethod
+    def recv_error_info(f):
+        return {
+            'consistencylevel': read_string(f),
+            'received': read_int(f),
+            'blockfor': read_int(f),
+            'data_present': bool(read_byte(f)),
+        }
+
+class SyntaxErrorErrorMessage(_ErrorMessageSub):
+    summary = 'Syntax error in CQL query'
+    errorcode = 0x2000
+
+class UnauthorizedErrorMessage(_ErrorMessageSub):
+    summary = 'Unauthorized'
+    errorcode = 0x2100
+
+class InvalidQueryErrorMessage(_ErrorMessageSub):
+    summary = 'Invalid query'
+    errorcode = 0x2200
+
+class BadConfigErrorMessage(_ErrorMessageSub):
+    summary = 'Query invalid because of configuration issue'
+    errorcode = 0x2300
+
+class AlreadyExistsErrorMessage(_ErrorMessageSub):
+    summary = 'Item already exists'
+    errorcode = 0x2400
+
+    @staticmethod
+    def recv_error_info(f):
+        return {
+            'keyspace': read_string(f),
+            'table': read_string(f),
+        }

  class StartupMessage(_MessageType):
      opcode = 0x01
@@ -505,7 +599,7 @@
          pquery, paramnames = prepare_query(query)
          prepared = self._connection.wait_for_request(PrepareMessage(query=pquery))
          if isinstance(prepared, ErrorMessage):
-            raise cql.Error('Query preparation failed: %s' % prepared.summary())
+            raise cql.Error('Query preparation failed: %s' % prepared.summarymsg())
          if prepared.kind != ResultMessage.KIND_PREPARED:
              raise cql.InternalError('Query preparation did not result in prepared query')
          queryid, colspecs = prepared.results
@@ -673,7 +767,7 @@
                  startup_response = self.wait_for_request(cm)
              elif isinstance(startup_response, ErrorMessage):
                  raise ProgrammingError("Server did not accept credentials. %s"
-                                       % startup_response.summary())
+                                       % startup_response.summarymsg())
              else:
                  raise cql.InternalError("Unexpected response %r during connection setup"
                                          % startup_response)

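Illustrative note (not part of the commit): the error subclasses above
register themselves in error_classes through a metaclass, so recv_body can
pick a subclass by error code and fall back to the base class for unknown
codes. A minimal Python 3 sketch of that registry pattern follows. It
assumes a plain type-based metaclass (the real one derives from
_register_msg_type, which is not shown in this hunk) and uses Python 3
metaclass syntax instead of __metaclass__. ErrorRegistry,
UnavailableErrorMessage and lookup_error_class are hypothetical names.

```python
error_classes = {}


class ErrorRegistry(type):
    def __init__(cls, name, bases, dct):
        super().__init__(name, bases, dct)
        # Names starting with an underscore are abstract helpers: skip them.
        if not name.startswith('_'):
            error_classes[cls.errorcode] = cls


class ErrorMessage:
    summary = 'Unknown'

    def __init__(self, code, message, info=None):
        self.code = code
        self.message = message
        self.info = info


class _ErrorMessageSub(ErrorMessage, metaclass=ErrorRegistry):
    pass


class UnavailableErrorMessage(_ErrorMessageSub):
    summary = 'Unavailable exception'
    errorcode = 0x1000


class AlreadyExistsErrorMessage(_ErrorMessageSub):
    summary = 'Item already exists'
    errorcode = 0x2400


def lookup_error_class(code):
    # Unknown codes fall back to the generic ErrorMessage.
    return error_classes.get(code, ErrorMessage)
```

With this arrangement, supporting a further error code only requires
declaring another subclass with its own errorcode; no central table has to
be edited.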
==============================================================================
Revision: 377853dfc0f1
Author:   paul cannon <pa...@datastax.com>
Date:     Tue Sep 11 15:53:29 2012
Log:      fix up new thrifteries module

http://code.google.com/a/apache-extras.org/p/cassandra-dbapi2/source/detail?r=377853dfc0f1

Modified:
  /cql/connection.py
  /cql/thrifteries.py

=======================================
--- /cql/connection.py	Mon Aug 20 16:49:01 2012
+++ /cql/connection.py	Tue Sep 11 15:53:29 2012
@@ -91,6 +91,6 @@
          from native import NativeConnection
          connclass = NativeConnection
      else:
-        from thriftconnection import ThriftConnection
+        from thrifteries import ThriftConnection
          connclass = ThriftConnection
      return connclass(host, port, keyspace, user, password, cql_version)
=======================================
--- /cql/thrifteries.py	Mon Aug 20 16:49:01 2012
+++ /cql/thrifteries.py	Tue Sep 11 15:53:29 2012
@@ -15,6 +15,7 @@
  # limitations under the License.

  import zlib
+import cql
  from cql.cursor import Cursor, _VOID_DESCRIPTION, _COUNT_DESCRIPTION
  from cql.query import cql_quote, cql_quote_name, prepare_query, PreparedQuery
  from cql.connection import Connection
@@ -41,7 +42,7 @@
              compressed_q = zlib.compress(querytext)
          else:
              compressed_q = querytext
-        req_compression = getattr(Compression, self.compression)
+        req_compression = getattr(Compression, self.compression or 'NONE')
          return compressed_q, req_compression

      def prepare_query(self, query):
@@ -85,7 +86,7 @@
      def process_execution_results(self, response, decoder=None):
          if response.type == CqlResultType.ROWS:
              self.decoder = (decoder or self.default_decoder)(response.schema)
-            self.result = response.rows
+            self.result = [r.columns for r in response.rows]
              self.rs_idx = 0
              self.rowcount = len(self.result)
              if self.result:
@@ -124,11 +125,11 @@

          self.remote_thrift_version = tuple(map(int, self.client.describe_version().split('.')))

-        if cql_version:
-            self.set_cql_version(cql_version)
+        if self.cql_version:
+            self.set_cql_version(self.cql_version)

-        if keyspace:
-            self.set_initial_keyspace(keyspace)
+        if self.keyspace:
+            self.set_initial_keyspace(self.keyspace)

      def set_cql_version(self, cql_version):
          self.client.set_cql_version(cql_version)

==============================================================================
Revision: c1e68422f9a7
Author:   paul cannon <pa...@datastax.com>
Date:     Tue Sep 11 15:57:31 2012
Log:      update tests to treat Murmur3Partitioner as random

http://code.google.com/a/apache-extras.org/p/cassandra-dbapi2/source/detail?r=c1e68422f9a7

Modified:
  /test/test_cql.py

=======================================
--- /test/test_cql.py	Thu Aug 30 22:28:30 2012
+++ /test/test_cql.py	Tue Sep 11 15:57:31 2012
@@ -19,7 +19,7 @@
  # PYTHONPATH=test nosetests --tests=test_cql:TestCql.test_column_count

  # Note that some tests will be skipped if run against a cluster with
-# RandomPartitioner.
+# random partitioning semantics.

  # to configure behavior, define $CQL_TEST_HOST to the destination address
  # for Thrift connections, and $CQL_TEST_PORT to the associated port.
@@ -42,6 +42,8 @@
  from cql.cassandra import Cassandra
  from cql.cqltypes import AsciiType, UUIDType

+RANDOM_PARTITIONERS = ('RandomPartitioner', 'Murmur3Partitioner')
+
  def get_thrift_client(host=TEST_HOST, port=TEST_PORT):
      socket = TSocket.TSocket(host, port)
      transport = TTransport.TFramedTransport(socket)
@@ -188,6 +190,15 @@
      def assertIsSubclass(self, class_a, class_b):
          assert issubclass(class_a, class_b), '%r is not a subclass of %r' % (class_a, class_b)

+    def check_ordered_partitioner(self, msg="Key ranges don't make sense under RP"):
+        if self.get_partitioner().split('.')[-1] in RANDOM_PARTITIONERS:
+            # skipTest is Python >= 2.7
+            if hasattr(self, 'skipTest'):
+                self.skipTest(msg)
+            return False
+        return True
+
+

      def test_select_simple(self):
          "single-row named column queries"
@@ -217,12 +228,8 @@
      def test_select_row_range(self):
          "retrieve a range of rows with columns"

-        if self.get_partitioner().split('.')[-1] == 'RandomPartitioner':
-            # skipTest is >= Python 2.7
-            if hasattr(self, 'skipTest'):
-                self.skipTest("Key ranges don't make sense under RP")
-            else: return None
-
+        if not self.check_ordered_partitioner():
+            return
          # everything
          cursor = self.cursor
          cursor.execute("SELECT * FROM StandardLongA")
@@ -381,12 +388,8 @@
      def test_index_scan_with_start_key(self):
          "indexed scan with a starting key"

-        if self.get_partitioner().split('.')[-1] == 'RandomPartitioner':
-            # skipTest is Python >= 2.7
-            if hasattr(self, 'skipTest'):
-                self.skipTest("Key ranges don't make sense under RP")
-            else: return None
-
+        if not self.check_ordered_partitioner():
+            return
          cursor = self.cursor
          cursor.execute("""
              SELECT KEY, 'birthdate' FROM IndexedA
@@ -398,6 +401,9 @@

      def test_no_where_clause(self):
          "empty where clause (range query w/o start key)"
+
+        if not self.check_ordered_partitioner():
+            return
          cursor = self.cursor
          cursor.execute("SELECT KEY, 'col' FROM StandardString1 LIMIT 3")
          self.assertEqual(cursor.rowcount, 3)

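Illustrative note (not part of the commit): the new helper decides whether
range tests apply by comparing the last dotted component of the
partitioner class name against RANDOM_PARTITIONERS. A minimal standalone
Python 3 sketch of that check follows; has_ordered_partitioner is a
hypothetical name, and the unittest skipTest handling from the diff is
left out.

```python
RANDOM_PARTITIONERS = ('RandomPartitioner', 'Murmur3Partitioner')


def has_ordered_partitioner(partitioner_class):
    # Only the final component of the dotted class name is compared, so the
    # package prefix reported by the cluster does not matter.
    return partitioner_class.split('.')[-1] not in RANDOM_PARTITIONERS
```

A test would then return early (or call skipTest where available) whenever
this function reports False for the cluster's partitioner.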
==============================================================================
Revision: e52231f5954d
Author:   paul cannon <pa...@datastax.com>
Date:     Tue Sep 11 16:26:28 2012
Log:      kill outdated test_regex test

http://code.google.com/a/apache-extras.org/p/cassandra-dbapi2/source/detail?r=e52231f5954d

Deleted:
  /test/test_regex.py

=======================================
--- /test/test_regex.py	Tue Dec 27 14:55:01 2011
+++ /dev/null
@@ -1,42 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-import unittest
-from cql.cursor import Cursor
-
-class TestRegex(unittest.TestCase):
-
-    def single_match(self, match, string):
-        groups = match.groups()
-        self.assertEquals(groups, (string, ))
-
-    def test_cfamily_regex(self):
-        cf_re = Cursor._cfamily_re
-
-        m = cf_re.match("SELECT key FROM column_family WHERE key = 'foo'")
-        self.single_match(m, "column_family")
-
-        m = cf_re.match("SELECT key FROM 'column_family' WHERE key = 'foo'")
-        self.single_match(m, "column_family")
-
-        m = cf_re.match("SELECT key FROM column_family WHERE key = 'break from chores'")
-        self.single_match(m, "column_family")
-
-        m = cf_re.match("SELECT key FROM 'from_cf' WHERE key = 'break from chores'")
-        self.single_match(m, "from_cf")
-
-        m = cf_re.match("SELECT '\nkey' FROM 'column_family' WHERE key = 'break \nfrom chores'")
-        self.single_match(m, "column_family")

==============================================================================
Revision: 693860ff1d3d
Author:   paul cannon <pa...@datastax.com>
Date:     Tue Sep 11 16:26:59 2012
Log:      update cql3-related tests for new option syntax

post CASSANDRA-4497

http://code.google.com/a/apache-extras.org/p/cassandra-dbapi2/source/detail?r=693860ff1d3d

Modified:
  /test/test_prepared_queries.py

=======================================
--- /test/test_prepared_queries.py	Sun Aug 19 13:16:40 2012
+++ /test/test_prepared_queries.py	Tue Sep 11 16:26:59 2012
@@ -31,6 +31,7 @@

  import cql

+MIN_THRIFT_FOR_CQL_3_0_0_FINAL = (19, 33, 0)

  class TestPreparedQueries(unittest.TestCase):
      cursor = None
@@ -57,10 +58,15 @@

      def create_schema(self):
          ksname = 'CqlDriverTest_%d' % random.randrange(0x100000000)
-        self.cursor.execute("""create keyspace %s
-                                 with strategy_class='SimpleStrategy'
-                                 and strategy_options:replication_factor=1"""
-                            % ksname)
+        if self.dbconn.remote_thrift_version >= MIN_THRIFT_FOR_CQL_3_0_0_FINAL:
+            create_ks = """create keyspace %s
+                             with replication = {'class': 'SimpleStrategy',
+                                                 'replication_factor': 1};"""
+        else:
+            create_ks = """create keyspace %s
+                             with strategy_class='SimpleStrategy'
+                             and strategy_options:replication_factor=1"""
+        self.cursor.execute(create_ks % ksname)
          self.cursor.execute('use %s' % ksname)
          self.cursor.execute("""create columnfamily abc (thekey timestamp primary key,
                                                          theint int,
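
The version gate in create_schema() above can be sketched in isolation. The helper names create_keyspace_statement and parse_version are hypothetical and not part of the driver; the threshold tuple and both statement forms are copied from the diff. Comparing versions as integer tuples rather than strings matters here, since "19.4.0" would sort after "19.33.0" as text.

```python
# Hypothetical helpers, for illustration only.
MIN_THRIFT_FOR_CQL_3_0_0_FINAL = (19, 33, 0)


def parse_version(version_string):
    """Turn a dotted version such as '19.33.0' into a tuple of ints."""
    return tuple(int(part) for part in version_string.split('.'))


def create_keyspace_statement(ksname, remote_thrift_version):
    """Pick the CREATE KEYSPACE syntax matching the server's Thrift API version."""
    if tuple(remote_thrift_version) >= MIN_THRIFT_FOR_CQL_3_0_0_FINAL:
        # Map-style replication options (post CASSANDRA-4497).
        template = ("create keyspace %s with replication = "
                    "{'class': 'SimpleStrategy', 'replication_factor': 1}")
    else:
        # Older strategy_class / strategy_options syntax.
        template = ("create keyspace %s with strategy_class='SimpleStrategy' "
                    "and strategy_options:replication_factor=1")
    return template % ksname
```

A caller would pass the tuple the connection already computes, for example create_keyspace_statement(ksname, self.dbconn.remote_thrift_version).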

==============================================================================
Revision: 0de872e76b44
Author:   paul cannon <pa...@datastax.com>
Date:     Tue Sep 11 17:31:33 2012
Log:      separate value decoding for native/thrift

http://code.google.com/a/apache-extras.org/p/cassandra-dbapi2/source/detail?r=0de872e76b44

Modified:
  /cql/connection.py
  /cql/cursor.py
  /cql/decoders.py
  /cql/native.py
  /cql/thrifteries.py

=======================================
--- /cql/connection.py	Tue Sep 11 15:53:29 2012
+++ /cql/connection.py	Tue Sep 11 17:31:33 2012
@@ -81,9 +81,6 @@
          curs.compression = self.compression
          return curs

-class NativeConnection(Connection):
-    pass
-
  # TODO: Pull connections out of a pool instead.
  def connect(host, port=9160, keyspace=None, user=None, password=None,
              cql_version=None, native=False):
=======================================
--- /cql/cursor.py	Mon Aug 20 16:49:01 2012
+++ /cql/cursor.py	Tue Sep 11 17:31:33 2012
@@ -69,6 +69,8 @@
                                         "%s not given for %r" % (e, query))

      def execute(self, cql_query, params={}, decoder=None):
+        # note that 'decoder' here is actually the decoder class, not the
+        # instance to be used for decoding. bad naming, but it's in use now.
          if isinstance(cql_query, unicode):
              raise ValueError("CQL query must be bytes, not unicode")
          self.pre_execution_setup()
@@ -77,13 +79,32 @@
          return self.process_execution_results(response, decoder=decoder)

      def execute_prepared(self, prepared_query, params={}, decoder=None):
+        # note that 'decoder' here is actually the decoder class, not the
+        # instance to be used for decoding. bad naming, but it's in use now.
          self.pre_execution_setup()
          response = self.get_response_prepared(prepared_query, params)
          return self.process_execution_results(response, decoder=decoder)

      def get_metadata_info(self, row):
-        self.description, self.name_info, self.column_types = \
-                self.decoder.decode_metadata_and_types(row)
+        self.description = description = []
+        self.name_info = name_info = []
+        self.column_types = column_types = []
+        for colid in self.columninfo(row):
+            name, nbytes, vtype, ctype = self.get_column_metadata(colid)
+            column_types.append(vtype)
+            description.append((name, vtype.cass_parameterized_type(),
+                                None, None, None, None, True))
+            name_info.append((nbytes, ctype))
+
+    def get_column_metadata(self, column_id):
+        return self.decoder.decode_metadata_and_type(column_id)
+
+    def decode_row(self, row):
+        values = []
+        bytevals = self.columnvalues(row)
+        for val, vtype, nameinfo in zip(bytevals, self.column_types, self.name_info):
+            values.append(self.decoder.decode_value(val, vtype, nameinfo[0]))
+        return values

      def fetchone(self):
          self.__checksock()
@@ -98,7 +119,7 @@
              if self.cql_major_version < 3:
                  # (don't bother redecoding descriptions or names otherwise)
                  self.get_metadata_info(row)
-            return self.decoder.decode_row(row, self.column_types)
+            return self.decode_row(row)

      def fetchmany(self, size=None):
          self.__checksock()
@@ -109,7 +130,7 @@
          while len(L) < size and self.rs_idx < len(self.result):
              row = self.result[self.rs_idx]
              self.rs_idx += 1
-            L.append(self.decoder.decode_row(row, self.column_types))
+            L.append(self.decode_row(row))
          return L

      def fetchall(self):
=======================================
--- /cql/decoders.py	Mon Aug 20 16:49:01 2012
+++ /cql/decoders.py	Tue Sep 11 17:31:33 2012
@@ -32,47 +32,28 @@
          raise ProgrammingError("value %r (in col %r) can't be deserialized as %s: %s"
                                 % (valuebytes, namebytes, expectedtype, err))

-    def decode_description(self, row):
-        return self.decode_metadata(row)[0]
-
-    def decode_metadata(self, row):
-        return self.decode_metadata_and_types(row)[:2]
-
-    def decode_metadata_and_types(self, row):
+    def decode_metadata_and_type(self, namebytes):
          schema = self.schema
-        description = []
-        column_types = []
-        name_info = []
-        for column in row.columns:
-            namebytes = column.name
-            comparator = schema.name_types.get(namebytes, schema.default_name_type)
-            comptype = cqltypes.lookup_casstype(comparator)
-            validator = schema.value_types.get(namebytes, schema.default_value_type)
-            valdtype = cqltypes.lookup_casstype(validator)
+        comparator = schema.name_types.get(namebytes, schema.default_name_type)
+        comptype = cqltypes.lookup_casstype(comparator)
+        validator = schema.value_types.get(namebytes, schema.default_value_type)
+        valdtype = cqltypes.lookup_casstype(validator)

-            try:
-                name = comptype.from_binary(namebytes)
-            except Exception, e:
-                name = self.name_decode_error(e, namebytes, comparator)
-            column_types.append(valdtype)
-            description.append((name, validator, None, None, None, None, True))
-            name_info.append((namebytes, comptype))
+        try:
+            name = comptype.from_binary(namebytes)
+        except Exception, e:
+            name = self.name_decode_error(e, namebytes, comptype.cql_parameterized_type())

-        return description, name_info, column_types
+        return name, namebytes, valdtype, comptype

-    def decode_row(self, row, column_types=None):
-        values = []
-        if column_types is None:
-            column_types = self.decode_metadata_and_types(row)[2]
-        for (column, vtype) in zip(row.columns, column_types):
-            try:
-                value = vtype.from_binary(column.value)
-            except Exception, e:
-                value = self.value_decode_error(e, column.name, column.value,
-                                                vtype.cass_parameterized_type(full=True))
-            values.append(value)
+    def decode_value(self, valbytes, vtype, colname):
+        try:
+            value = vtype.from_binary(valbytes)
+        except Exception, e:
+            value = self.value_decode_error(e, colname, valbytes,
+                                            vtype.cql_parameterized_type())
+        return value

-        return values
-
-    def decode_metadata_and_types_native(self, row):
-        pass
+    def decode_metadata_and_type_native(self, colid):
+        ks, cf, colname, vtype = self.schema[colid]
+        return colname, colname, vtype, 'UTF8Type'
=======================================
--- /cql/native.py	Sun Sep  9 23:22:55 2012
+++ /cql/native.py	Tue Sep 11 17:31:33 2012
@@ -590,10 +590,6 @@
      write_int(f, port)


-class FakeThriftRow:
-    def __init__(self, columns):
-        self.columns = columns
-
  class NativeCursor(Cursor):
      def prepare_query(self, query):
          pquery, paramnames = prepare_query(query)
@@ -616,12 +612,14 @@
      def executemany(self, querylist, argslist):
          pass

-    def translate_schema(self, metadata):
-        print "incoming metadata: %r" % (metadata,)
-        pass
+    def get_column_metadata(self, column_id):
+        return self.decoder.decode_metadata_and_type_native(column_id)

-    def translate_row(self, row):
-        return FakeThriftRow(row)
+    def columninfo(self, row):
+        return xrange(len(row))
+
+    def columnvalues(self, row):
+        return row

      def handle_cql_execution_errors(self, response):
          if not isinstance(response, ErrorMessage):
@@ -670,9 +668,9 @@
              self._connection.keyspace_changed(response.results)
              self.description = _VOID_DESCRIPTION
          elif response.kind == ResultMessage.KIND_ROWS:
-            schema = self.translate_schema(response.results.column_metadata)
+            schema = response.results.column_metadata
              self.decoder = (decoder or self.default_decoder)(schema)
-            self.result = map(self.translate_row, response.results.rows)
+            self.result = response.results.rows
              if self.result:
                  self.get_metadata_info(self.result[0])
          else:
@@ -780,7 +778,7 @@
          c.execute('USE %s' % cql_quote_name(self.keyspace))
          c.close()

-    def terminate_conn(self):
+    def terminate_connection(self):
          self.socketf.close()
          self.sockfd.close()

=======================================
--- /cql/thrifteries.py	Tue Sep 11 15:53:29 2012
+++ /cql/thrifteries.py	Tue Sep 11 17:31:33 2012
@@ -110,6 +110,12 @@
          # 'Return values are not defined.'
          return True

+    def columnvalues(self, row):
+        return [column.value for column in row]
+
+    def columninfo(self, row):
+        return (column.name for column in row)
+
  class ThriftConnection(Connection):
      cursorclass = ThriftCursor

@@ -121,7 +127,7 @@
          socket.open()

          if self.credentials:
-            self.client.login(AuthenticationRequest(credentials=credentials))
+            self.client.login(AuthenticationRequest(credentials=self.credentials))

          self.remote_thrift_version = tuple(map(int, self.client.describe_version().split('.')))

@@ -147,5 +153,5 @@
          c.execute('USE %s' % ksname)
          c.close()

-    def terminate_conn(self):
-        transport.close()
+    def terminate_connection(self):
+        self.transport.close()
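
The revision above moves row traversal into the cursor and leaves the decoder to handle one value at a time, with each cursor subclass only saying where names and values live in its row format. A simplified sketch of that split follows. The Sketch* and *StyleCursor classes and the Column tuple are stand-ins invented for illustration, the value types are plain Python callables rather than cqltypes classes, and the metadata step (get_metadata_info) is left out.

```python
from collections import namedtuple

# Stand-in for a Thrift column: a (name, value) pair of byte strings.
Column = namedtuple('Column', ['name', 'value'])


class SketchDecoder(object):
    """Decodes a single value; knows nothing about how rows are laid out."""

    def decode_value(self, valbytes, vtype, colname):
        try:
            return vtype(valbytes.decode('ascii'))
        except ValueError as err:
            raise ValueError("value %r (in col %r) can't be decoded: %s"
                             % (valbytes, colname, err))


class SketchCursor(object):
    """Shared row-walking logic; subclasses supply columninfo/columnvalues."""

    def __init__(self, decoder, column_types):
        self.decoder = decoder
        self.column_types = column_types

    def decode_row(self, row):
        info = self.columninfo(row)
        bytevals = self.columnvalues(row)
        return [self.decoder.decode_value(val, vtype, colid)
                for val, vtype, colid in zip(bytevals, self.column_types, info)]


class ThriftStyleCursor(SketchCursor):
    # Thrift-style rows are sequences of column objects.
    def columninfo(self, row):
        return [column.name for column in row]

    def columnvalues(self, row):
        return [column.value for column in row]


class NativeStyleCursor(SketchCursor):
    # Native-style rows are already flat lists of value bytes;
    # columns are identified by position.
    def columninfo(self, row):
        return list(range(len(row)))

    def columnvalues(self, row):
        return row
```

With this shape, a custom decoder only needs per-value and per-column hooks, which is the interface change the 1.2.0 release notes later in this message describe.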

==============================================================================
Revision: 7b3e9fb54165
Author:   paul cannon <pa...@datastax.com>
Date:     Tue Sep 11 17:36:56 2012
Log:      add inet as a native result type

http://code.google.com/a/apache-extras.org/p/cassandra-dbapi2/source/detail?r=7b3e9fb54165

Modified:
  /cql/native.py

=======================================
--- /cql/native.py	Tue Sep 11 17:31:33 2012
+++ /cql/native.py	Tue Sep 11 17:36:56 2012
@@ -340,6 +340,7 @@
          0x000D: 'varchar',
          0x000E: 'varint',
          0x000F: 'timeuuid',
+        0x0010: 'inet',
          0x0020: 'list',
          0x0021: 'map',
          0x0022: 'set',

==============================================================================
Revision: 33424f2e0b0e
Author:   paul cannon <pa...@datastax.com>
Date:     Tue Sep 11 18:05:34 2012
Log:      update error handling post CASSANDRA-3979

http://code.google.com/a/apache-extras.org/p/cassandra-dbapi2/source/detail?r=33424f2e0b0e

Modified:
  /cql/native.py

=======================================
--- /cql/native.py	Tue Sep 11 17:36:56 2012
+++ /cql/native.py	Tue Sep 11 18:05:34 2012
@@ -152,21 +152,28 @@

  class ErrorMessageSubclass(_register_msg_type):
      def __init__(cls, name, bases, dct):
-        if not name.startswith('_'):
+        if cls.errorcode is not None:
              error_classes[cls.errorcode] = cls

-class _ErrorMessageSub(ErrorMessage):
+class ErrorMessageSub(ErrorMessage):
      __metaclass__ = ErrorMessageSubclass
+    errorcode = None

-class ServerErrorMessage(_ErrorMessageSub):
+class RequestExecutionException(ErrorMessageSub):
+    pass
+
+class RequestValidationException(ErrorMessageSub):
+    pass
+
+class ServerError(ErrorMessageSub):
      summary = 'Server error'
      errorcode = 0x0000

-class ProtocolErrorMessage(_ErrorMessageSub):
+class ProtocolException(ErrorMessageSub):
      summary = 'Protocol error'
      errorcode = 0x000A

-class UnavailableExceptionErrorMessage(_ErrorMessageSub):
+class UnavailableExceptionErrorMessage(RequestExecutionException):
      summary = 'Unavailable exception'
      errorcode = 0x1000

@@ -178,19 +185,22 @@
              'alive': read_int(f),
          }

-class OverloadedErrorMessage(_ErrorMessageSub):
+class OverloadedErrorMessage(RequestExecutionException):
      summary = 'Coordinator node overloaded'
      errorcode = 0x1001

-class IsBootstrappingErrorMessage(_ErrorMessageSub):
+class IsBootstrappingErrorMessage(RequestExecutionException):
      summary = 'Coordinator node is bootstrapping'
      errorcode = 0x1002

-class TruncateErrorMessage(_ErrorMessageSub):
+class TruncateError(RequestExecutionException):
      summary = 'Error during truncate'
      errorcode = 0x1003

-class WriteTimeoutErrorMessage(_ErrorMessageSub):
+class RequestTimeoutException(RequestExecutionException):
+    pass
+
+class WriteTimeoutErrorMessage(RequestTimeoutException):
      summary = 'Timeout during write request'
      errorcode = 0x1100

@@ -202,7 +212,7 @@
              'blockfor': read_int(f),
          }

-class ReadTimeoutErrorMessage(_ErrorMessageSub):
+class ReadTimeoutErrorMessage(RequestTimeoutException):
      summary = 'Timeout during read request'
      errorcode = 0x1200

@@ -215,23 +225,23 @@
              'data_present': bool(read_byte(f)),
          }

-class SyntaxErrorErrorMessage(_ErrorMessageSub):
+class SyntaxException(RequestValidationException):
      summary = 'Syntax error in CQL query'
      errorcode = 0x2000

-class UnauthorizedErrorMessage(_ErrorMessageSub):
+class UnauthorizedErrorMessage(RequestValidationException):
      summary = 'Unauthorized'
      errorcode = 0x2100

-class InvalidQueryErrorMessage(_ErrorMessageSub):
+class InvalidRequestException(RequestValidationException):
      summary = 'Invalid query'
      errorcode = 0x2200

-class BadConfigErrorMessage(_ErrorMessageSub):
+class ConfigurationException(RequestValidationException):
      summary = 'Query invalid because of configuration issue'
      errorcode = 0x2300

-class AlreadyExistsErrorMessage(_ErrorMessageSub):
+class AlreadyExistsException(ConfigurationException):
      summary = 'Item already exists'
      errorcode = 0x2400

@@ -625,31 +635,15 @@
      def handle_cql_execution_errors(self, response):
          if not isinstance(response, ErrorMessage):
              return
-        try:
-            codemsg = response.error_codes[response.code]
-        except KeyError:
-            codemsg = '(Unknown error code %04x)' % response.code
-        if codemsg == 'Schema disagreement exception':
-            eclass = cql.IntegrityError
-        elif codemsg == 'Authentication error':
+        if isinstance(response, UnauthorizedErrorMessage):
              eclass = cql.NotAuthenticated
-        elif codemsg == ('Unavailable exception', 'Timeout exception'):
+        elif isinstance(response, RequestExecutionException):
              eclass = cql.OperationalError
-        elif codemsg == 'Request exception':
+        elif isinstance(response, RequestValidationException):
              eclass = cql.ProgrammingError
          else:
              eclass = cql.InternalError
-        raise eclass('%s: %s' % (codemsg, response.msg))
-
-    error_codes = {
-        0x0000: 'Server error',
-        0x0001: 'Protocol error',
-        0x0002: 'Authentication error',
-        0x0100: 'Unavailable exception',
-        0x0101: 'Timeout exception',
-        0x0102: 'Schema disagreement exception',
-        0x0200: 'Request exception',
-    }
+        raise eclass(response.summarymsg())

      def process_execution_results(self, response, decoder=None):
          self.handle_cql_execution_errors(response)
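
The two ideas in this revision, registering only classes that carry an errorcode and choosing the DB-API exception by isinstance rather than by message string, can be sketched on their own. This is an illustration under assumptions, not the driver's code: it uses Python 3 metaclass syntax (the driver itself is Python 2), the four exception classes stand in for the ones in the cql module, and the summarymsg() format is a guess. Class names, summaries, and error codes are taken from the diff.

```python
error_classes = {}


class ErrorMessageRegistry(type):
    """Registers every class that defines a concrete errorcode."""

    def __init__(cls, name, bases, dct):
        super().__init__(name, bases, dct)
        if cls.errorcode is not None:
            error_classes[cls.errorcode] = cls


class ErrorMessage(metaclass=ErrorMessageRegistry):
    errorcode = None            # abstract bases keep None and stay unregistered
    summary = 'Unknown error'

    def __init__(self, msg):
        self.msg = msg

    def summarymsg(self):
        # Assumed format; the real method is not shown in the diff.
        return '%s: %s' % (self.summary, self.msg)


class RequestExecutionException(ErrorMessage):
    pass


class RequestValidationException(ErrorMessage):
    pass


class ServerError(ErrorMessage):
    summary = 'Server error'
    errorcode = 0x0000


class OverloadedErrorMessage(RequestExecutionException):
    summary = 'Coordinator node overloaded'
    errorcode = 0x1001


class SyntaxException(RequestValidationException):
    summary = 'Syntax error in CQL query'
    errorcode = 0x2000


class UnauthorizedErrorMessage(RequestValidationException):
    summary = 'Unauthorized'
    errorcode = 0x2100


# Stand-ins for the DB-API exception classes exposed by the cql module.
class InternalError(Exception): pass
class OperationalError(Exception): pass
class ProgrammingError(Exception): pass
class NotAuthenticated(Exception): pass


def dbapi_exception_for(response):
    """Map a protocol error message to a DB-API style exception instance."""
    # The most specific check comes first: Unauthorized is also a
    # RequestValidationException.
    if isinstance(response, UnauthorizedErrorMessage):
        eclass = NotAuthenticated
    elif isinstance(response, RequestExecutionException):
        eclass = OperationalError
    elif isinstance(response, RequestValidationException):
        eclass = ProgrammingError
    else:
        eclass = InternalError
    return eclass(response.summarymsg())
```

Because the mapping follows the class hierarchy, a new error code added under RequestExecutionException or RequestValidationException gets a sensible DB-API exception without touching the handler.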

==============================================================================
Revision: 0a2238e01515
Author:   paul cannon <pa...@datastax.com>
Date:     Tue Sep 11 18:09:18 2012
Log:      get rid of bogus NativeCursor.executemany

http://code.google.com/a/apache-extras.org/p/cassandra-dbapi2/source/detail?r=0a2238e01515

Modified:
  /cql/native.py

=======================================
--- /cql/native.py	Tue Sep 11 18:05:34 2012
+++ /cql/native.py	Tue Sep 11 18:09:18 2012
@@ -620,9 +620,6 @@
          em = ExecuteMessage(queryid=prepared_query.itemid, queryparams=params)
          return self._connection.wait_for_request(em)

-    def executemany(self, querylist, argslist):
-        pass
-
      def get_column_metadata(self, column_id):
          return self.decoder.decode_metadata_and_type_native(column_id)


==============================================================================
Revision: 8862c20c4d3b
Author:   paul cannon <pa...@datastax.com>
Date:     Tue Sep 11 18:09:31 2012
Log:      turn off protocol debugging

http://code.google.com/a/apache-extras.org/p/cassandra-dbapi2/source/detail?r=8862c20c4d3b

Modified:
  /cql/native.py

=======================================
--- /cql/native.py	Tue Sep 11 18:09:18 2012
+++ /cql/native.py	Tue Sep 11 18:09:31 2012
@@ -716,8 +716,7 @@
          self.conn_ready = False
          s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
          s.connect((self.host, self.port))
-        #self.socketf = s.makefile(bufsize=0)
-        self.socketf = debugsock(s)
+        self.socketf = s.makefile(bufsize=0)
          self.sockfd = s
          self.open_socket = True
          supported = self.wait_for_request(OptionsMessage())

==============================================================================
Revision: e53d73953c9d
Author:   paul cannon <pa...@datastax.com>
Date:     Tue Sep 11 18:28:03 2012
Log:      Don't include literal-quoted classes in cql_types

http://code.google.com/a/apache-extras.org/p/cassandra-dbapi2/source/detail?r=e53d73953c9d

Modified:
  /cql/cqltypes.py

=======================================
--- /cql/cqltypes.py	Mon Aug 20 16:49:01 2012
+++ /cql/cqltypes.py	Tue Sep 11 18:28:03 2012
@@ -648,7 +648,7 @@
  cql_type_to_apache_class = dict([(c, t.cassname) for (c, t) in _cqltypes.items()])
  apache_class_to_cql_type = dict([(n, t.typename) for (n, t) in _casstypes.items()])

-cql_types = sorted(_cqltypes.keys())
+cql_types = sorted([t for t in _cqltypes.keys() if not t.startswith("'")])

  def cql_typename(casstypename):
      """

==============================================================================
Revision: 114401688c90
Author:   paul cannon <pa...@datastax.com>
Date:     Wed Sep 12 13:32:53 2012
Log:      ColumnToCollectionType can have multiple subs

http://code.google.com/a/apache-extras.org/p/cassandra-dbapi2/source/detail?r=114401688c90

Modified:
  /cql/cqltypes.py

=======================================
--- /cql/cqltypes.py	Tue Sep 11 18:28:03 2012
+++ /cql/cqltypes.py	Wed Sep 12 13:32:53 2012
@@ -624,7 +624,7 @@
      information.
      """
      typename = "'org.apache.cassandra.db.marshal.ColumnToCollectionType'"
-    num_subtypes = 1
+    num_subtypes = 'UNKNOWN'

  class ReversedType(_ParameterizedType):
      typename = "'org.apache.cassandra.db.marshal.ReversedType'"

==============================================================================
Revision: 2fcf040212be
Author:   paul cannon <pa...@datastax.com>
Date:     Wed Sep 12 13:47:23 2012
Log:      release 1.2.0

http://code.google.com/a/apache-extras.org/p/cassandra-dbapi2/source/detail?r=2fcf040212be

Modified:
  /CHANGES.txt
  /setup.py

=======================================
--- /CHANGES.txt	Thu Aug 30 22:58:26 2012
+++ /CHANGES.txt	Wed Sep 12 13:47:23 2012
@@ -1,3 +1,13 @@
+1.2.0 - 2012/09/12
+ * Changes to SchemaDecoder interface: it now decodes one value or column
+   metadata instance at a time
+ * Beta support for Cassandra's thriftless binary protocol: thrift is
+   wholly unneeded if you pass native=True to cql.connect(). Everything
+   should work exactly the same, except that a different call is made to
+   get metadata from the SchemaDecoder, in case you have a custom one.
+ * Support for inet CQL type
+ * Support for parsing and processing data of type ReversedType(whatever)
+
  1.1.0 - 2012/08/30
   * Support for CQL3 collections
   * Unified type handling
=======================================
--- /setup.py	Thu Aug 30 22:58:26 2012
+++ /setup.py	Wed Sep 12 13:47:23 2012
@@ -20,7 +20,7 @@

  setup(
      name="cql",
-    version="1.1.0",
+    version="1.2.0",
      description="Cassandra Query Language driver",
      long_description=open(abspath(join(dirname(__file__), 'README'))).read(),
      maintainer='Cassandra DBAPI-2 Driver Team',