You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by al...@apache.org on 2015/12/16 12:39:30 UTC

[1/4] cassandra git commit: (cqlsh) further optimise COPY FROM

Repository: cassandra
Updated Branches:
  refs/heads/cassandra-2.2 de55c39cd -> 57d558fc1


(cqlsh) further optimise COPY FROM

patch by Stefania Alborghetti; reviewed by Adam Holmberg for
CASSANDRA-9302


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/124f1bd2
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/124f1bd2
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/124f1bd2

Branch: refs/heads/cassandra-2.2
Commit: 124f1bd2613e400f69f8369ada0ad15c28738530
Parents: 994250c
Author: Stefania Alborghetti <st...@datastax.com>
Authored: Thu Oct 22 17:16:50 2015 +0800
Committer: Aleksey Yeschenko <al...@apache.org>
Committed: Tue Dec 15 21:03:31 2015 +0000

----------------------------------------------------------------------
 CHANGES.txt                |   4 +-
 bin/cqlsh                  | 285 ++-----------
 pylib/cqlshlib/copyutil.py | 910 ++++++++++++++++++++++++++++++++++------
 pylib/cqlshlib/util.py     |  19 +
 4 files changed, 838 insertions(+), 380 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/124f1bd2/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 8e58703..90f1bca 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,12 +1,10 @@
 2.1.13
-<<<<<<< HEAD
+ * (cqlsh) further optimise COPY FROM (CASSANDRA-9302)
  * Allow CREATE TABLE WITH ID (CASSANDRA-9179)
  * Make Stress compiles within eclipse (CASSANDRA-10807)
  * Cassandra Daemon should print JVM arguments (CASSANDRA-10764)
  * Allow cancellation of index summary redistribution (CASSANDRA-8805)
-=======
  * sstableloader will fail if there are collections in the schema tables (CASSANDRA-10700)
->>>>>>> 5377183... stableloader will fail if there are collections in the schema tables
  * Disable reloading of GossipingPropertyFileSnitch (CASSANDRA-9474)
  * Fix Stress profile parsing on Windows (CASSANDRA-10808)
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/124f1bd2/bin/cqlsh
----------------------------------------------------------------------
diff --git a/bin/cqlsh b/bin/cqlsh
index e72624a..651420d 100755
--- a/bin/cqlsh
+++ b/bin/cqlsh
@@ -37,7 +37,6 @@ import ConfigParser
 import csv
 import getpass
 import locale
-import multiprocessing as mp
 import optparse
 import os
 import platform
@@ -48,7 +47,6 @@ import warnings
 
 from StringIO import StringIO
 from contextlib import contextmanager
-from functools import partial
 from glob import glob
 from uuid import UUID
 
@@ -110,10 +108,10 @@ except ImportError, e:
 
 from cassandra.auth import PlainTextAuthProvider
 from cassandra.cluster import Cluster, PagedResult
-from cassandra.metadata import protect_name, protect_names, protect_value
+from cassandra.metadata import protect_name, protect_names
 from cassandra.policies import WhiteListRoundRobinPolicy
-from cassandra.protocol import QueryMessage, ResultMessage
-from cassandra.query import SimpleStatement, ordered_dict_factory
+from cassandra.protocol import ResultMessage
+from cassandra.query import SimpleStatement, ordered_dict_factory, tuple_factory
 
 # cqlsh should run correctly when run out of a Cassandra source tree,
 # out of an unpacked Cassandra tarball, and after a proper package install.
@@ -334,7 +332,7 @@ cqlsh_extra_syntax_rules = r'''
 
 <copyOptionVal> ::= <identifier>
                   | <reserved_identifier>
-                  | <stringLiteral>
+                  | <term>
                   ;
 
 # avoiding just "DEBUG" so that this rule doesn't get treated as a terminal
@@ -412,17 +410,20 @@ def complete_copy_column_names(ctxt, cqlsh):
     return set(colnames[1:]) - set(existcols)
 
 
-COPY_OPTIONS = ['DELIMITER', 'QUOTE', 'ESCAPE', 'HEADER', 'NULL', 'ENCODING',
-                'TIMEFORMAT', 'JOBS', 'PAGESIZE', 'PAGETIMEOUT', 'MAXATTEMPTS']
+COPY_COMMON_OPTIONS = ['DELIMITER', 'QUOTE', 'ESCAPE', 'HEADER', 'NULL',
+                       'MAXATTEMPTS', 'REPORTFREQUENCY']
+COPY_FROM_OPTIONS = ['CHUNKSIZE', 'INGESTRATE', 'MAXBATCHSIZE', 'MINBATCHSIZE']
+COPY_TO_OPTIONS = ['ENCODING', 'TIMEFORMAT', 'PAGESIZE', 'PAGETIMEOUT', 'MAXREQUESTS']
 
 
 @cqlsh_syntax_completer('copyOption', 'optnames')
 def complete_copy_options(ctxt, cqlsh):
     optnames = map(str.upper, ctxt.get_binding('optnames', ()))
     direction = ctxt.get_binding('dir').upper()
-    opts = set(COPY_OPTIONS) - set(optnames)
     if direction == 'FROM':
-        opts -= set(['ENCODING', 'TIMEFORMAT', 'JOBS', 'PAGESIZE', 'PAGETIMEOUT', 'MAXATTEMPTS'])
+        opts = set(COPY_COMMON_OPTIONS + COPY_FROM_OPTIONS) - set(optnames)
+    elif direction == 'TO':
+        opts = set(COPY_COMMON_OPTIONS + COPY_TO_OPTIONS) - set(optnames)
     return opts
 
 
@@ -1520,12 +1521,18 @@ class Shell(cmd.Cmd):
           ESCAPE='\'              - character to appear before the QUOTE char when quoted
           HEADER=false            - whether to ignore the first line
           NULL=''                 - string that represents a null value
-          ENCODING='utf8'         - encoding for CSV output (COPY TO only)
-          TIMEFORMAT=             - timestamp strftime format (COPY TO only)
+          ENCODING='utf8'         - encoding for CSV output (COPY TO)
+          TIMEFORMAT=             - timestamp strftime format (COPY TO)
             '%Y-%m-%d %H:%M:%S%z'   defaults to time_format value in cqlshrc
-          PAGESIZE='1000'         - the page size for fetching results (COPY TO only)
-          PAGETIMEOUT=10          - the page timeout for fetching results (COPY TO only)
-          MAXATTEMPTS='5'         - the maximum number of attempts for errors (COPY TO only)
+          MAXREQUESTS=6           - the maximum number of requests each worker process can work on in parallel (COPY TO)
+          PAGESIZE=1000           - the page size for fetching results (COPY TO)
+          PAGETIMEOUT=10          - the page timeout for fetching results (COPY TO)
+          MAXATTEMPTS=5           - the maximum number of attempts for errors
+          CHUNKSIZE=1000          - the size of chunks passed to worker processes (COPY FROM)
+          INGESTRATE=100000       - an approximate ingest rate in rows per second (COPY FROM)
+          MAXBATCHSIZE=20         - the maximum size of an import batch (COPY FROM)
+          MINBATCHSIZE=2          - the minimum size of an import batch (COPY FROM)
+          REPORTFREQUENCY=0.25    - the frequency with which we display status updates in seconds
 
         When entering CSV data on STDIN, you can use the sequence "\."
         on a line by itself to end the data input.
@@ -1571,253 +1578,11 @@ class Shell(cmd.Cmd):
     def perform_csv_import(self, ks, cf, columns, fname, opts):
         csv_options, dialect_options, unrecognized_options = copyutil.parse_options(self, opts)
         if unrecognized_options:
-            self.printerr('Unrecognized COPY FROM options: %s'
-                          % ', '.join(unrecognized_options.keys()))
+            self.printerr('Unrecognized COPY FROM options: %s' % ', '.join(unrecognized_options.keys()))
             return 0
-        nullval, header = csv_options['nullval'], csv_options['header']
 
-        if fname is None:
-            do_close = False
-            print "[Use \. on a line by itself to end input]"
-            linesource = self.use_stdin_reader(prompt='[copy] ', until=r'\.')
-        else:
-            do_close = True
-            try:
-                linesource = open(fname, 'rb')
-            except IOError, e:
-                self.printerr("Can't open %r for reading: %s" % (fname, e))
-                return 0
-
-        current_record = None
-        processes, pipes = [], [],
-        try:
-            if header:
-                linesource.next()
-            reader = csv.reader(linesource, **dialect_options)
-
-            num_processes = copyutil.get_num_processes(cap=4)
-
-            for i in range(num_processes):
-                parent_conn, child_conn = mp.Pipe()
-                pipes.append(parent_conn)
-                proc_args = (child_conn, ks, cf, columns, nullval)
-                processes.append(mp.Process(target=self.multiproc_import, args=proc_args))
-
-            for process in processes:
-                process.start()
-
-            meter = copyutil.RateMeter(10000)
-            for current_record, row in enumerate(reader, start=1):
-                # write to the child process
-                pipes[current_record % num_processes].send((current_record, row))
-
-                # update the progress and current rate periodically
-                meter.increment()
-
-                # check for any errors reported by the children
-                if (current_record % 100) == 0:
-                    if self._check_import_processes(current_record, pipes):
-                        # no errors seen, continue with outer loop
-                        continue
-                    else:
-                        # errors seen, break out of outer loop
-                        break
-        except Exception, exc:
-            if current_record is None:
-                # we failed before we started
-                self.printerr("\nError starting import process:\n")
-                self.printerr(str(exc))
-                if self.debug:
-                    traceback.print_exc()
-            else:
-                self.printerr("\n" + str(exc))
-                self.printerr("\nAborting import at record #%d. "
-                              "Previously inserted records and some records after "
-                              "this number may be present."
-                              % (current_record,))
-                if self.debug:
-                    traceback.print_exc()
-        finally:
-            # send a message that indicates we're done
-            for pipe in pipes:
-                pipe.send((None, None))
-
-            for process in processes:
-                process.join()
-
-            self._check_import_processes(current_record, pipes)
-
-            for pipe in pipes:
-                pipe.close()
-
-            if do_close:
-                linesource.close()
-            elif self.tty:
-                print
-
-        return current_record
-
-    def _check_import_processes(self, current_record, pipes):
-        for pipe in pipes:
-            if pipe.poll():
-                try:
-                    (record_num, error) = pipe.recv()
-                    self.printerr("\n" + str(error))
-                    self.printerr(
-                        "Aborting import at record #%d. "
-                        "Previously inserted records are still present, "
-                        "and some records after that may be present as well."
-                        % (record_num,))
-                    return False
-                except EOFError:
-                    # pipe is closed, nothing to read
-                    self.printerr("\nChild process died without notification, "
-                                  "aborting import at record #%d. Previously "
-                                  "inserted records are probably still present, "
-                                  "and some records after that may be present "
-                                  "as well." % (current_record,))
-                    return False
-        return True
-
-    def multiproc_import(self, pipe, ks, cf, columns, nullval):
-        """
-        This method is where child processes start when doing a COPY FROM
-        operation.  The child process will open one connection to the node and
-        interact directly with the connection, bypassing most of the driver
-        code.  Because we don't need retries, connection pooling, thread safety,
-        and other fancy features, this is okay.
-        """
-
-        # open a new connection for this subprocess
-        new_cluster = Cluster(
-            contact_points=(self.hostname,),
-            port=self.port,
-            cql_version=self.conn.cql_version,
-            protocol_version=DEFAULT_PROTOCOL_VERSION,
-            auth_provider=self.auth_provider,
-            ssl_options=sslhandling.ssl_settings(self.hostname, CONFIG_FILE) if self.ssl else None,
-            load_balancing_policy=WhiteListRoundRobinPolicy([self.hostname]),
-            compression=None,
-            connect_timeout=self.conn.connect_timeout)
-        session = new_cluster.connect(self.keyspace)
-        conn = session._pools.values()[0]._connection
-
-        # pre-build as much of the query as we can
-        table_meta = self.get_table_meta(ks, cf)
-        pk_cols = [col.name for col in table_meta.primary_key]
-        cqltypes = [table_meta.columns[name].typestring for name in columns]
-        pk_indexes = [columns.index(col.name) for col in table_meta.primary_key]
-        is_counter_table = ("counter" in cqltypes)
-        if is_counter_table:
-            query = 'Update %s.%s SET %%s WHERE %%s' % (
-                protect_name(ks),
-                protect_name(cf))
-        else:
-            query = 'INSERT INTO %s.%s (%s) VALUES (%%s)' % (
-                protect_name(ks),
-                protect_name(cf),
-                ', '.join(protect_names(columns)))
-
-        # we need to handle some types specially
-        should_escape = [t in ('ascii', 'text', 'timestamp', 'date', 'time', 'inet') for t in cqltypes]
-
-        insert_timestamp = int(time.time() * 1e6)
-
-        def callback(record_num, response):
-            # This is the callback we register for all inserts.  Because this
-            # is run on the event-loop thread, we need to hold a lock when
-            # adjusting in_flight.
-            with conn.lock:
-                conn.in_flight -= 1
-
-            if not isinstance(response, ResultMessage):
-                # It's an error. Notify the parent process and let it send
-                # a stop signal to all child processes (including this one).
-                pipe.send((record_num, str(response)))
-                if isinstance(response, Exception) and self.debug:
-                    traceback.print_exc(response)
-
-        current_record = 0
-        insert_num = 0
-        try:
-            while True:
-                # To avoid totally maxing out the connection,
-                # defer to the reactor thread when we're close
-                # to capacity
-                if conn.in_flight > (conn.max_request_id * 0.9):
-                    conn._readable = True
-                    time.sleep(0.05)
-                    continue
-
-                try:
-                    (current_record, row) = pipe.recv()
-                except EOFError:
-                    # the pipe was closed and there's nothing to receive
-                    sys.stdout.write('Failed to read from pipe:\n\n')
-                    sys.stdout.flush()
-                    conn._writable = True
-                    conn._readable = True
-                    break
-
-                # see if the parent process has signaled that we are done
-                if (current_record, row) == (None, None):
-                    conn._writable = True
-                    conn._readable = True
-                    pipe.close()
-                    break
-
-                # format the values in the row
-                for i, value in enumerate(row):
-                    if value != nullval:
-                        if should_escape[i]:
-                            row[i] = protect_value(value)
-                    elif i in pk_indexes:
-                        # By default, nullval is an empty string. See CASSANDRA-7792 for details.
-                        message = "Cannot insert null value for primary key column '%s'." % (pk_cols[i],)
-                        if nullval == '':
-                            message += " If you want to insert empty strings, consider using " \
-                                       "the WITH NULL=<marker> option for COPY."
-                        pipe.send((current_record, message))
-                        return
-                    else:
-                        row[i] = 'null'
-                if is_counter_table:
-                    where_clause = []
-                    set_clause = []
-                    for i, value in enumerate(row):
-                        if i in pk_indexes:
-                            where_clause.append("%s=%s" % (columns[i], value))
-                        else:
-                            set_clause.append("%s=%s+%s" % (columns[i], columns[i], value))
-                    full_query = query % (','.join(set_clause), ' AND '.join(where_clause))
-                else:
-                    full_query = query % (','.join(row),)
-                query_message = QueryMessage(
-                    full_query, self.consistency_level, serial_consistency_level=None,
-                    fetch_size=None, paging_state=None, timestamp=insert_timestamp)
-
-                request_id = conn.get_request_id()
-                conn.send_msg(query_message, request_id=request_id, cb=partial(callback, current_record))
-
-                with conn.lock:
-                    conn.in_flight += 1
-
-                # every 50 records, clear the pending writes queue and read
-                # any responses we have
-                if insert_num % 50 == 0:
-                    conn._writable = True
-                    conn._readable = True
-
-                insert_num += 1
-        except Exception, exc:
-            pipe.send((current_record, exc))
-        finally:
-            # wait for any pending requests to finish
-            while conn.in_flight > 0:
-                conn._readable = True
-                time.sleep(0.01)
-
-            new_cluster.shutdown()
+        return copyutil.ImportTask(self, ks, cf, columns, fname, csv_options, dialect_options,
+                                   DEFAULT_PROTOCOL_VERSION, CONFIG_FILE).run()
 
     def perform_csv_export(self, ks, cf, columns, fname, opts):
         csv_options, dialect_options, unrecognized_options = copyutil.parse_options(self, opts)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/124f1bd2/pylib/cqlshlib/copyutil.py
----------------------------------------------------------------------
diff --git a/pylib/cqlshlib/copyutil.py b/pylib/cqlshlib/copyutil.py
index 8534b98..f699e64 100644
--- a/pylib/cqlshlib/copyutil.py
+++ b/pylib/cqlshlib/copyutil.py
@@ -19,23 +19,32 @@ import json
 import multiprocessing as mp
 import os
 import Queue
+import random
+import re
+import struct
 import sys
 import time
 import traceback
 
-from StringIO import StringIO
+from calendar import timegm
+from collections import defaultdict, deque, namedtuple
+from decimal import Decimal
 from random import randrange
+from StringIO import StringIO
 from threading import Lock
+from uuid import UUID
 
 from cassandra.cluster import Cluster
+from cassandra.cqltypes import ReversedType, UserType
 from cassandra.metadata import protect_name, protect_names
-from cassandra.policies import RetryPolicy, WhiteListRoundRobinPolicy, TokenAwarePolicy
-from cassandra.query import tuple_factory
+from cassandra.policies import RetryPolicy, WhiteListRoundRobinPolicy, TokenAwarePolicy, DCAwareRoundRobinPolicy
+from cassandra.query import BatchStatement, BatchType, SimpleStatement, tuple_factory
+from cassandra.util import Date, Time
 
-
-import sslhandling
+from cql3handling import CqlRuleSet
 from displaying import NO_COLOR_MAP
 from formatting import format_value_default, EMPTY, get_formatter
+from sslhandling import ssl_settings
 
 
 def parse_options(shell, opts):
@@ -60,13 +69,18 @@ def parse_options(shell, opts):
     csv_options['nullval'] = opts.pop('null', '')
     csv_options['header'] = bool(opts.pop('header', '').lower() == 'true')
     csv_options['encoding'] = opts.pop('encoding', 'utf8')
-    csv_options['jobs'] = int(opts.pop('jobs', 12))
+    csv_options['maxrequests'] = int(opts.pop('maxrequests', 6))
     csv_options['pagesize'] = int(opts.pop('pagesize', 1000))
     # by default the page timeout is 10 seconds per 1000 entries in the page size or 10 seconds if pagesize is smaller
     csv_options['pagetimeout'] = int(opts.pop('pagetimeout', max(10, 10 * (csv_options['pagesize'] / 1000))))
     csv_options['maxattempts'] = int(opts.pop('maxattempts', 5))
     csv_options['dtformats'] = opts.pop('timeformat', shell.display_time_format)
     csv_options['float_precision'] = shell.display_float_precision
+    csv_options['chunksize'] = int(opts.pop('chunksize', 1000))
+    csv_options['ingestrate'] = int(opts.pop('ingestrate', 100000))
+    csv_options['maxbatchsize'] = int(opts.pop('maxbatchsize', 20))
+    csv_options['minbatchsize'] = int(opts.pop('minbatchsize', 2))
+    csv_options['reportfrequency'] = float(opts.pop('reportfrequency', 0.25))
 
     return csv_options, dialect_options, opts
 
@@ -86,9 +100,9 @@ def get_num_processes(cap):
         return 1
 
 
-class ExportTask(object):
+class CopyTask(object):
     """
-    A class that exports data to .csv by instantiating one or more processes that work in parallel (ExportProcess).
+    A base class for ImportTask and ExportTask
     """
     def __init__(self, shell, ks, cf, columns, fname, csv_options, dialect_options, protocol_version, config_file):
         self.shell = shell
@@ -101,6 +115,55 @@ class ExportTask(object):
         self.protocol_version = protocol_version
         self.config_file = config_file
 
+        self.processes = []
+        self.inmsg = mp.Queue()
+        self.outmsg = mp.Queue()
+
+    def close(self):
+        for process in self.processes:
+            process.terminate()
+
+        self.inmsg.close()
+        self.outmsg.close()
+
+    def num_live_processes(self):
+        return sum(1 for p in self.processes if p.is_alive())
+
+    def make_params(self):
+        """
+        Return a dictionary of parameters to be used by the worker processes.
+        On Windows this dictionary must be pickle-able.
+
+        inmsg is the message queue flowing from parent to child process, so outmsg from the parent point
+        of view and, vice-versa,  outmsg is the message queue flowing from child to parent, so inmsg
+        from the parent point of view, hence the two are swapped below.
+        """
+        shell = self.shell
+        return dict(inmsg=self.outmsg,  # see comment above
+                    outmsg=self.inmsg,  # see comment above
+                    ks=self.ks,
+                    cf=self.cf,
+                    columns=self.columns,
+                    csv_options=self.csv_options,
+                    dialect_options=self.dialect_options,
+                    consistency_level=shell.consistency_level,
+                    connect_timeout=shell.conn.connect_timeout,
+                    hostname=shell.hostname,
+                    port=shell.port,
+                    ssl=shell.ssl,
+                    auth_provider=shell.auth_provider,
+                    cql_version=shell.conn.cql_version,
+                    config_file=self.config_file,
+                    protocol_version=self.protocol_version,
+                    debug=shell.debug
+                    )
+
+
+class ExportTask(CopyTask):
+    """
+    A class that exports data to .csv by instantiating one or more processes that work in parallel (ExportProcess).
+    """
+
     def run(self):
         """
         Initiates the export by creating the processes.
@@ -125,25 +188,18 @@ class ExportTask(object):
 
         ranges = self.get_ranges()
         num_processes = get_num_processes(cap=min(16, len(ranges)))
+        params = self.make_params()
 
-        inmsg = mp.Queue()
-        outmsg = mp.Queue()
-        processes = []
         for i in xrange(num_processes):
-            process = ExportProcess(outmsg, inmsg, self.ks, self.cf, self.columns, self.dialect_options,
-                                    self.csv_options, shell.debug, shell.port, shell.conn.cql_version,
-                                    shell.auth_provider, shell.ssl, self.protocol_version, self.config_file)
+            self.processes.append(ExportProcess(params))
+
+        for process in self.processes:
             process.start()
-            processes.append(process)
 
         try:
-            return self.check_processes(csvdest, ranges, inmsg, outmsg, processes)
+            return self.check_processes(csvdest, ranges)
         finally:
-            for process in processes:
-                process.terminate()
-
-            inmsg.close()
-            outmsg.close()
+            self.close()
             if do_close:
                 csvdest.close()
 
@@ -183,9 +239,9 @@ class ExportTask(object):
 
             hosts = []
             for host in replicas:
-                if host.datacenter == local_dc:
+                if host.is_up and host.datacenter == local_dc:
                     hosts.append(host.address)
-            if len(hosts) == 0:
+            if not hosts:
                 hosts.append(hostname)  # fallback to default host if no replicas in current dc
             ranges[(previous, token.value)] = make_range(hosts)
             previous_previous = previous
@@ -194,7 +250,7 @@ class ExportTask(object):
         #  If the ring is empty we get the entire ring from the
         #  host we are currently connected to, otherwise for the last ring interval
         #  we query the same replicas that hold the last token in the ring
-        if len(ranges) == 0:
+        if not ranges:
             ranges[(None, None)] = make_range([hostname])
         else:
             ranges[(previous, None)] = ranges[(previous_previous, previous)].copy()
@@ -217,32 +273,32 @@ class ExportTask(object):
         else:
             return None
 
-    @staticmethod
-    def send_work(ranges, tokens_to_send, queue):
+    def send_work(self, ranges, tokens_to_send):
         for token_range in tokens_to_send:
-            queue.put((token_range, ranges[token_range]))
+            self.outmsg.put((token_range, ranges[token_range]))
             ranges[token_range]['attempts'] += 1
 
-    def check_processes(self, csvdest, ranges, inmsg, outmsg, processes):
+    def check_processes(self, csvdest, ranges):
         """
         Here we monitor all child processes by collecting their results
         or any errors. We terminate when we have processed all the ranges or when there
         are no more processes.
         """
         shell = self.shell
-        meter = RateMeter(10000)
-        total_jobs = len(ranges)
+        processes = self.processes
+        meter = RateMeter(update_interval=self.csv_options['reportfrequency'])
+        total_requests = len(ranges)
         max_attempts = self.csv_options['maxattempts']
 
-        self.send_work(ranges, ranges.keys(), outmsg)
+        self.send_work(ranges, ranges.keys())
 
         num_processes = len(processes)
         succeeded = 0
         failed = 0
-        while (failed + succeeded) < total_jobs and self.num_live_processes(processes) == num_processes:
+        while (failed + succeeded) < total_requests and self.num_live_processes() == num_processes:
             try:
-                token_range, result = inmsg.get(timeout=1.0)
-                if token_range is None and result is None:  # a job has finished
+                token_range, result = self.inmsg.get(timeout=1.0)
+                if token_range is None and result is None:  # a request has finished
                     succeeded += 1
                 elif isinstance(result, Exception):  # an error occurred
                     if token_range is None:  # the entire process failed
@@ -253,7 +309,7 @@ class ExportTask(object):
                         if ranges[token_range]['attempts'] < max_attempts and ranges[token_range]['rows'] == 0:
                             shell.printerr('Error for %s: %s (will try again later attempt %d of %d)'
                                            % (token_range, result, ranges[token_range]['attempts'], max_attempts))
-                            self.send_work(ranges, [token_range], outmsg)
+                            self.send_work(ranges, [token_range])
                         else:
                             shell.printerr('Error for %s: %s (permanently given up after %d rows and %d attempts)'
                                            % (token_range, result, ranges[token_range]['rows'],
@@ -267,34 +323,257 @@ class ExportTask(object):
             except Queue.Empty:
                 pass
 
-        if self.num_live_processes(processes) < len(processes):
+        if self.num_live_processes() < len(processes):
             for process in processes:
                 if not process.is_alive():
                     shell.printerr('Child process %d died with exit code %d' % (process.pid, process.exitcode))
 
-        if succeeded < total_jobs:
+        if succeeded < total_requests:
             shell.printerr('Exported %d ranges out of %d total ranges, some records might be missing'
-                           % (succeeded, total_jobs))
+                           % (succeeded, total_requests))
 
         return meter.get_total_records()
 
+
+class ImportReader(object):
+    """
+    A wrapper around a csv reader to keep track of when we have
+    exhausted reading input records.
+    """
+    def __init__(self, linesource, chunksize, dialect_options):
+        self.linesource = linesource
+        self.chunksize = chunksize
+        self.reader = csv.reader(linesource, **dialect_options)
+        self.exhausted = False
+
+    def read_rows(self):
+        if self.exhausted:
+            return []
+
+        rows = list(next(self.reader) for _ in xrange(self.chunksize))
+        self.exhausted = len(rows) < self.chunksize
+        return rows
+
+
+class ImportTask(CopyTask):
+    """
+    A class to import data from .csv by instantiating one or more processes
+    that work in parallel (ImportProcess).
+    """
+    def __init__(self, shell, ks, cf, columns, fname, csv_options, dialect_options, protocol_version, config_file):
+        CopyTask.__init__(self, shell, ks, cf, columns, fname,
+                          csv_options, dialect_options, protocol_version, config_file)
+
+        self.num_processes = get_num_processes(cap=4)
+        self.chunk_size = csv_options['chunksize']
+        self.ingest_rate = csv_options['ingestrate']
+        self.max_attempts = csv_options['maxattempts']
+        self.header = self.csv_options['header']
+        self.table_meta = self.shell.get_table_meta(self.ks, self.cf)
+        self.batch_id = 0
+        self.receive_meter = RateMeter(update_interval=csv_options['reportfrequency'])
+        self.send_meter = RateMeter(update_interval=1, log=False)
+        self.retries = deque([])
+        self.failed = 0
+        self.succeeded = 0
+        self.sent = 0
+
+    def run(self):
+        shell = self.shell
+
+        if self.fname is None:
+            do_close = False
+            print "[Use \. on a line by itself to end input]"
+            linesource = shell.use_stdin_reader(prompt='[copy] ', until=r'\.')
+        else:
+            do_close = True
+            try:
+                linesource = open(self.fname, 'rb')
+            except IOError, e:
+                shell.printerr("Can't open %r for reading: %s" % (self.fname, e))
+                return 0
+
+        try:
+            if self.header:
+                linesource.next()
+
+            reader = ImportReader(linesource, self.chunk_size, self.dialect_options)
+            params = self.make_params()
+
+            for i in range(self.num_processes):
+                self.processes.append(ImportProcess(params))
+
+            for process in self.processes:
+                process.start()
+
+            return self.process_records(reader)
+
+        except Exception, exc:
+            shell.printerr(str(exc))
+            if shell.debug:
+                traceback.print_exc()
+            return 0
+        finally:
+            self.close()
+            if do_close:
+                linesource.close()
+            elif shell.tty:
+                print
+
+    def process_records(self, reader):
+        """
+        Keep on running while we have stuff to receive or send and while all processes are running.
+        Send data (batches or retries) up to the max ingest rate. If we are waiting for stuff to
+        receive, check the incoming queue.
+        """
+        while (self.has_more_to_send(reader) or self.has_more_to_receive()) and self.all_processes_running():
+            if self.has_more_to_send(reader):
+                if self.send_meter.current_record <= self.ingest_rate:
+                    self.send_batches(reader)
+                else:
+                    self.send_meter.maybe_update()
+
+            if self.has_more_to_receive():
+                self.receive()
+
+        if self.succeeded < self.sent:
+            self.shell.printerr("Failed to process %d batches" % (self.sent - self.succeeded))
+
+        return self.receive_meter.get_total_records()
+
+    def has_more_to_receive(self):
+        return (self.succeeded + self.failed) < self.sent
+
+    def has_more_to_send(self, reader):
+        return (not reader.exhausted) or self.retries
+
+    def all_processes_running(self):
+        return self.num_live_processes() == self.num_processes
+
+    def receive(self):
+        shell = self.shell
+        start_time = time.time()
+
+        while time.time() - start_time < 0.01:  # 10 millis
+            try:
+                batch, err = self.inmsg.get(timeout=0.001)  # 1 millisecond
+
+                if err is None:
+                    self.succeeded += batch['imported']
+                    self.receive_meter.increment(batch['imported'])
+                else:
+                    err = str(err)
+
+                    if err.startswith('ValueError') or err.startswith('TypeError') or err.startswith('IndexError') \
+                            or batch['attempts'] >= self.max_attempts:
+                        shell.printerr("Failed to import %d rows: %s -  given up after %d attempts"
+                                       % (len(batch['rows']), err, batch['attempts']))
+                        self.failed += len(batch['rows'])
+                    else:
+                        shell.printerr("Failed to import %d rows: %s -  will retry later, attempt %d of %d"
+                                       % (len(batch['rows']), err, batch['attempts'],
+                                          self.max_attempts))
+                        self.retries.append(self.reset_batch(batch))
+            except Queue.Empty:
+                break
+
+    def send_batches(self, reader):
+        """
+        Send batches to the queue until we have exceeded the ingest rate. In the export case we queue
+        everything and let the worker processes throttle using max_requests, here we throttle
+        in the parent process because of memory usage concerns.
+
+        When we have finished reading the csv file, then send any retries.
+        """
+        while self.send_meter.current_record <= self.ingest_rate:
+            if not reader.exhausted:
+                rows = reader.read_rows()
+                if rows:
+                    self.sent += self.send_batch(self.new_batch(rows))
+            elif self.retries:
+                batch = self.retries.popleft()
+                self.send_batch(batch)
+            else:
+                break
+
+    def send_batch(self, batch):
+        batch['attempts'] += 1
+        num_rows = len(batch['rows'])
+        self.send_meter.increment(num_rows)
+        self.outmsg.put(batch)
+        return num_rows
+
+    def new_batch(self, rows):
+        self.batch_id += 1
+        return self.make_batch(self.batch_id, rows, 0)
+
+    @staticmethod
+    def reset_batch(batch):
+        batch['imported'] = 0
+        return batch
+
     @staticmethod
-    def num_live_processes(processes):
-        return sum(1 for p in processes if p.is_alive())
+    def make_batch(batch_id, rows, attempts):
+        return {'id': batch_id, 'rows': rows, 'attempts': attempts, 'imported': 0}
+
+
+class ChildProcess(mp.Process):
+    """
+    A child worker process; this holds the functionality common to ImportProcess and ExportProcess.
+    """
+
+    def __init__(self, params, target):
+        mp.Process.__init__(self, target=target)
+        self.inmsg = params['inmsg']
+        self.outmsg = params['outmsg']
+        self.ks = params['ks']
+        self.cf = params['cf']
+        self.columns = params['columns']
+        self.debug = params['debug']
+        self.port = params['port']
+        self.hostname = params['hostname']
+        self.consistency_level = params['consistency_level']
+        self.connect_timeout = params['connect_timeout']
+        self.cql_version = params['cql_version']
+        self.auth_provider = params['auth_provider']
+        self.ssl = params['ssl']
+        self.protocol_version = params['protocol_version']
+        self.config_file = params['config_file']
+
+        # Here we inject some failures for testing purposes, only if this environment variable is set
+        if os.environ.get('CQLSH_COPY_TEST_FAILURES', ''):
+            self.test_failures = json.loads(os.environ.get('CQLSH_COPY_TEST_FAILURES', ''))
+        else:
+            self.test_failures = None
+
+    def printmsg(self, text):
+        if self.debug:
+            sys.stderr.write(text + os.linesep)
+
+    def close(self):
+        self.printmsg("Closing queues...")
+        self.inmsg.close()
+        self.outmsg.close()
 
 
 class ExpBackoffRetryPolicy(RetryPolicy):
     """
-    A retry policy with exponential back-off for read timeouts,
-    see ExportProcess.
+    A retry policy with exponential back-off for read timeouts and write timeouts
     """
-    def __init__(self, export_process):
+    def __init__(self, parent_process):
         RetryPolicy.__init__(self)
-        self.max_attempts = export_process.csv_options['maxattempts']
-        self.printmsg = lambda txt: export_process.printmsg(txt)
+        self.max_attempts = parent_process.max_attempts
+        self.printmsg = parent_process.printmsg
 
     def on_read_timeout(self, query, consistency, required_responses,
                         received_responses, data_retrieved, retry_num):
+        return self._handle_timeout(consistency, retry_num)
+
+    def on_write_timeout(self, query, consistency, write_type,
+                         required_responses, received_responses, retry_num):
+        return self._handle_timeout(consistency, retry_num)
+
+    def _handle_timeout(self, consistency, retry_num):
         delay = self.backoff(retry_num)
         if delay > 0:
             self.printmsg("Timeout received, retrying after %d seconds" % (delay))
@@ -327,7 +606,7 @@ class ExpBackoffRetryPolicy(RetryPolicy):
 class ExportSession(object):
     """
     A class for connecting to a cluster and storing the number
-    of jobs that this connection is processing. It wraps the methods
+    of requests that this connection is processing. It wraps the methods
     for executing a query asynchronously and for shutting down the
     connection to the cluster.
     """
@@ -342,20 +621,20 @@ class ExportSession(object):
 
         self.cluster = cluster
         self.session = session
-        self.jobs = 1
+        self.requests = 1
         self.lock = Lock()
 
-    def add_job(self):
+    def add_request(self):
         with self.lock:
-            self.jobs += 1
+            self.requests += 1
 
-    def complete_job(self):
+    def complete_request(self):
         with self.lock:
-            self.jobs -= 1
+            self.requests -= 1
 
-    def num_jobs(self):
+    def num_requests(self):
         with self.lock:
-            return self.jobs
+            return self.requests
 
     def execute_async(self, query):
         return self.session.execute_async(query)
@@ -364,48 +643,26 @@ class ExportSession(object):
         self.cluster.shutdown()
 
 
-class ExportProcess(mp.Process):
+class ExportProcess(ChildProcess):
     """
     An child worker process for the export task, ExportTask.
     """
 
-    def __init__(self, inmsg, outmsg, ks, cf, columns, dialect_options, csv_options,
-                 debug, port, cql_version, auth_provider, ssl, protocol_version, config_file):
-        mp.Process.__init__(self, target=self.run)
-        self.inmsg = inmsg
-        self.outmsg = outmsg
-        self.ks = ks
-        self.cf = cf
-        self.columns = columns
-        self.dialect_options = dialect_options
+    def __init__(self, params):
+        ChildProcess.__init__(self, params=params, target=self.run)
+        self.dialect_options = params['dialect_options']
         self.hosts_to_sessions = dict()
 
-        self.debug = debug
-        self.port = port
-        self.cql_version = cql_version
-        self.auth_provider = auth_provider
-        self.ssl = ssl
-        self.protocol_version = protocol_version
-        self.config_file = config_file
-
+        csv_options = params['csv_options']
         self.encoding = csv_options['encoding']
         self.time_format = csv_options['dtformats']
         self.float_precision = csv_options['float_precision']
         self.nullval = csv_options['nullval']
-        self.maxjobs = csv_options['jobs']
+        self.max_attempts = csv_options['maxattempts']
+        self.max_requests = csv_options['maxrequests']
         self.csv_options = csv_options
         self.formatters = dict()
 
-        # Here we inject some failures for testing purposes, only if this environment variable is set
-        if os.environ.get('CQLSH_COPY_TEST_FAILURES', ''):
-            self.test_failures = json.loads(os.environ.get('CQLSH_COPY_TEST_FAILURES', ''))
-        else:
-            self.test_failures = None
-
-    def printmsg(self, text):
-        if self.debug:
-            sys.stderr.write(text + os.linesep)
-
     def run(self):
         try:
             self.inner_run()
@@ -423,12 +680,12 @@ class ExportProcess(mp.Process):
         We terminate when the inbound queue is closed.
         """
         while True:
-            if self.num_jobs() > self.maxjobs:
+            if self.num_requests() > self.max_requests:
                 time.sleep(0.001)  # 1 millisecond
                 continue
 
             token_range, info = self.inmsg.get()
-            self.start_job(token_range, info)
+            self.start_request(token_range, info)
 
     def report_error(self, err, token_range=None):
         if isinstance(err, str):
@@ -443,7 +700,7 @@ class ExportProcess(mp.Process):
         self.printmsg(msg)
         self.outmsg.put((token_range, Exception(msg)))
 
-    def start_job(self, token_range, info):
+    def start_request(self, token_range, info):
         """
         Begin querying a range by executing an async query that
         will later on invoke the callbacks attached in attach_callbacks.
@@ -454,14 +711,14 @@ class ExportProcess(mp.Process):
         future = session.execute_async(query)
         self.attach_callbacks(token_range, future, session)
 
-    def num_jobs(self):
-        return sum(session.num_jobs() for session in self.hosts_to_sessions.values())
+    def num_requests(self):
+        return sum(session.num_requests() for session in self.hosts_to_sessions.values())
 
     def get_session(self, hosts):
         """
         We select a host to connect to. If we have no connections to one of the hosts
         yet then we select this host, else we pick the one with the smallest number
-        of jobs.
+        of requests.
 
         :return: An ExportSession connected to the chosen host.
         """
@@ -474,19 +731,18 @@ class ExportProcess(mp.Process):
                 cql_version=self.cql_version,
                 protocol_version=self.protocol_version,
                 auth_provider=self.auth_provider,
-                ssl_options=sslhandling.ssl_settings(host, self.config_file) if self.ssl else None,
+                ssl_options=ssl_settings(host, self.config_file) if self.ssl else None,
                 load_balancing_policy=TokenAwarePolicy(WhiteListRoundRobinPolicy(hosts)),
                 default_retry_policy=ExpBackoffRetryPolicy(self),
-                compression=None,
-                executor_threads=max(2, self.csv_options['jobs'] / 2))
+                compression=None)
 
             session = ExportSession(new_cluster, self)
             self.hosts_to_sessions[host] = session
             return session
         else:
-            host = min(hosts, key=lambda h: self.hosts_to_sessions[h].jobs)
+            host = min(hosts, key=lambda h: self.hosts_to_sessions[h].requests)
             session = self.hosts_to_sessions[host]
-            session.add_job()
+            session.add_request()
             return session
 
     def attach_callbacks(self, token_range, future, session):
@@ -497,16 +753,16 @@ class ExportProcess(mp.Process):
             else:
                 self.write_rows_to_csv(token_range, rows)
                 self.outmsg.put((None, None))
-                session.complete_job()
+                session.complete_request()
 
         def err_callback(err):
             self.report_error(err, token_range)
-            session.complete_job()
+            session.complete_request()
 
         future.add_callbacks(callback=result_callback, errback=err_callback)
 
     def write_rows_to_csv(self, token_range, rows):
-        if len(rows) == 0:
+        if not rows:
             return  # no rows in this range
 
         try:
@@ -537,12 +793,9 @@ class ExportProcess(mp.Process):
                          float_precision=self.float_precision, nullval=self.nullval, quote=False)
 
     def close(self):
-        self.printmsg("Export process terminating...")
-        self.inmsg.close()
-        self.outmsg.close()
+        ChildProcess.close(self)
         for session in self.hosts_to_sessions.values():
             session.shutdown()
-        self.printmsg("Export process terminated")
 
     def prepare_query(self, partition_key, token_range, attempts):
         """
@@ -598,26 +851,439 @@ class ExportProcess(mp.Process):
         return query
 
 
+class ImportConversion(object):
+    """
+    A class for converting strings to values when importing from csv, used by ImportProcess,
+    the parent.
+    """
+    def __init__(self, parent, table_meta, statement):
+        self.ks = parent.ks
+        self.cf = parent.cf
+        self.columns = parent.columns
+        self.nullval = parent.nullval
+        self.printmsg = parent.printmsg
+        self.table_meta = table_meta
+        self.primary_key_indexes = [self.columns.index(col.name) for col in self.table_meta.primary_key]
+        self.partition_key_indexes = [self.columns.index(col.name) for col in self.table_meta.partition_key]
+
+        self.proto_version = statement.protocol_version
+        self.cqltypes = dict([(c.name, c.type) for c in statement.column_metadata])
+        self.converters = dict([(c.name, self._get_converter(c.type)) for c in statement.column_metadata])
+
+    def _get_converter(self, cql_type):
+        """
+        Return a function that converts a string into a value that can be passed
+        into BoundStatement.bind() for the given cql type. See cassandra.cqltypes
+        for more details.
+        """
+        def unprotect(v):
+            if v is not None:
+                return CqlRuleSet.dequote_value(v)
+
+        def convert(t, v):
+            return converters.get(t.typename, convert_unknown)(unprotect(v), ct=t)
+
+        def split(val, sep=','):
+            """
+            Split into a list of values whenever we encounter a separator but
+            ignore separators inside parentheses or single quotes, except for the two
+            outermost parentheses, which will be ignored. We expect val to be at least
+            2 characters long (the two outer parentheses).
+            """
+            ret = []
+            last = 1
+            level = 0
+            quote = False
+            for i, c in enumerate(val):
+                if c == '{' or c == '[' or c == '(':
+                    level += 1
+                elif c == '}' or c == ']' or c == ')':
+                    level -= 1
+                elif c == '\'':
+                    quote = not quote
+                elif c == sep and level == 1 and not quote:
+                    ret.append(val[last:i])
+                    last = i + 1
+            else:
+                if last < len(val) - 1:
+                    ret.append(val[last:-1])
+
+            return ret
+
+        # this should match all possible CQL datetime formats
+        p = re.compile("(\d{4})\-(\d{2})\-(\d{2})\s?(?:'T')?" +  # YYYY-MM-DD[( |'T')]
+                       "(?:(\d{2}):(\d{2})(?::(\d{2}))?)?" +  # [HH:MM[:SS]]
+                       "(?:([+\-])(\d{2}):?(\d{2}))?")  # [(+|-)HH[:]MM]]
+
+        def convert_date(val, **_):
+            m = p.match(val)
+            if not m:
+                raise ValueError("can't interpret %r as a date" % (val,))
+
+            # https://docs.python.org/2/library/time.html#time.struct_time
+            tval = time.struct_time((int(m.group(1)), int(m.group(2)), int(m.group(3)),  # year, month, day
+                                     int(m.group(4)) if m.group(4) else 0,  # hour
+                                     int(m.group(5)) if m.group(5) else 0,  # minute
+                                     int(m.group(6)) if m.group(6) else 0,  # second
+                                     0, 1, -1))  # day of week, day of year, dst-flag
+
+            if m.group(7):
+                offset = (int(m.group(8)) * 3600 + int(m.group(9)) * 60) * int(m.group(7) + '1')
+            else:
+                offset = -time.timezone
+
+            # scale seconds to millis for the raw value
+            return (timegm(tval) + offset) * 1e3
+
+        def convert_tuple(val, ct=cql_type):
+            return tuple(convert(t, v) for t, v in zip(ct.subtypes, split(val)))
+
+        def convert_list(val, ct=cql_type):
+            return list(convert(ct.subtypes[0], v) for v in split(val))
+
+        def convert_set(val, ct=cql_type):
+            return frozenset(convert(ct.subtypes[0], v) for v in split(val))
+
+        def convert_map(val, ct=cql_type):
+            """
+            We need to pass to BoundStatement.bind() a dict() because it calls iteritems(),
+            except we can't create a dict with another dict as the key, hence we use a class
+            that adds iteritems to a frozen set of tuples (which is how dict are normally made
+            immutable in python).
+            """
+            class ImmutableDict(frozenset):
+                iteritems = frozenset.__iter__
+
+            return ImmutableDict(frozenset((convert(ct.subtypes[0], v[0]), convert(ct.subtypes[1], v[1]))
+                                 for v in [split('{%s}' % vv, sep=':') for vv in split(val)]))
+
+        def convert_user_type(val, ct=cql_type):
+            """
+            A user type is a dictionary except that we must convert each key into
+            an attribute, so we are using named tuples. It must also be hashable,
+            so we cannot use dictionaries. Maybe there is a way to instantiate ct
+            directly but I could not work it out.
+            """
+            vals = [v for v in [split('{%s}' % vv, sep=':') for vv in split(val)]]
+            ret_type = namedtuple(ct.typename, [unprotect(v[0]) for v in vals])
+            return ret_type(*tuple(convert(t, v[1]) for t, v in zip(ct.subtypes, vals)))
+
+        def convert_single_subtype(val, ct=cql_type):
+            return converters.get(ct.subtypes[0].typename, convert_unknown)(val, ct=ct.subtypes[0])
+
+        def convert_unknown(val, ct=cql_type):
+            if issubclass(ct, UserType):
+                return convert_user_type(val, ct=ct)
+            elif issubclass(ct, ReversedType):
+                return convert_single_subtype(val, ct=ct)
+
+            self.printmsg("Unknown type %s (%s) for val %s" % (ct, ct.typename, val))
+            return val
+
+        converters = {
+            'blob': (lambda v, ct=cql_type: bytearray.fromhex(v[2:])),
+            'decimal': (lambda v, ct=cql_type: Decimal(v)),
+            'uuid': (lambda v, ct=cql_type: UUID(v)),
+            'boolean': (lambda v, ct=cql_type: bool(v)),
+            'tinyint': (lambda v, ct=cql_type: int(v)),
+            'ascii': (lambda v, ct=cql_type: v),
+            'float': (lambda v, ct=cql_type: float(v)),
+            'double': (lambda v, ct=cql_type: float(v)),
+            'bigint': (lambda v, ct=cql_type: long(v)),
+            'int': (lambda v, ct=cql_type: int(v)),
+            'varint': (lambda v, ct=cql_type: int(v)),
+            'inet': (lambda v, ct=cql_type: v),
+            'counter': (lambda v, ct=cql_type: long(v)),
+            'timestamp': convert_date,
+            'timeuuid': (lambda v, ct=cql_type: UUID(v)),
+            'date': (lambda v, ct=cql_type: Date(v)),
+            'smallint': (lambda v, ct=cql_type: int(v)),
+            'time': (lambda v, ct=cql_type: Time(v)),
+            'text': (lambda v, ct=cql_type: v),
+            'varchar': (lambda v, ct=cql_type: v),
+            'list': convert_list,
+            'set': convert_set,
+            'map': convert_map,
+            'tuple': convert_tuple,
+            'frozen': convert_single_subtype,
+        }
+
+        return converters.get(cql_type.typename, convert_unknown)
+
+    def get_row_values(self, row):
+        """
+        Parse the row into a list of row values to be returned
+        """
+        ret = [None] * len(row)
+        for i, val in enumerate(row):
+            if val != self.nullval:
+                ret[i] = self.converters[self.columns[i]](val)
+            else:
+                if i in self.primary_key_indexes:
+                    message = "Cannot insert null value for primary key column '%s'." % (self.columns[i],)
+                    if self.nullval == '':
+                        message += " If you want to insert empty strings, consider using" \
+                                   " the WITH NULL=<marker> option for COPY."
+                    raise Exception(message=message)
+
+                ret[i] = None
+
+        return ret
+
+    def get_row_partition_key_values(self, row):
+        """
+        Return a string composed of the partition key values, serialized and binary packed -
+        as expected by metadata.get_replicas(), see also BoundStatement.routing_key.
+        """
+        def serialize(n):
+            c, v = self.columns[n], row[n]
+            return self.cqltypes[c].serialize(self.converters[c](v), self.proto_version)
+
+        partition_key_indexes = self.partition_key_indexes
+        if len(partition_key_indexes) == 1:
+            return serialize(partition_key_indexes[0])
+        else:
+            pk_values = []
+            for i in partition_key_indexes:
+                val = serialize(i)
+                l = len(val)
+                pk_values.append(struct.pack(">H%dsB" % l, l, val, 0))
+            return b"".join(pk_values)
+
+
+class ImportProcess(ChildProcess):
+
+    def __init__(self, params):
+        ChildProcess.__init__(self, params=params, target=self.run)
+
+        csv_options = params['csv_options']
+        self.nullval = csv_options['nullval']
+        self.max_attempts = csv_options['maxattempts']
+        self.min_batch_size = csv_options['minbatchsize']
+        self.max_batch_size = csv_options['maxbatchsize']
+        self._session = None
+
+    @property
+    def session(self):
+        if not self._session:
+            cluster = Cluster(
+                contact_points=(self.hostname,),
+                port=self.port,
+                cql_version=self.cql_version,
+                protocol_version=self.protocol_version,
+                auth_provider=self.auth_provider,
+                load_balancing_policy=TokenAwarePolicy(DCAwareRoundRobinPolicy()),
+                ssl_options=ssl_settings(self.hostname, self.config_file) if self.ssl else None,
+                default_retry_policy=ExpBackoffRetryPolicy(self),
+                compression=None,
+                connect_timeout=self.connect_timeout)
+
+            self._session = cluster.connect(self.ks)
+            self._session.default_timeout = None
+        return self._session
+
+    def run(self):
+        try:
+            table_meta = self.session.cluster.metadata.keyspaces[self.ks].tables[self.cf]
+            is_counter = ("counter" in [table_meta.columns[name].typestring for name in self.columns])
+
+            if is_counter:
+                self.run_counter(table_meta)
+            else:
+                self.run_normal(table_meta)
+
+        except Exception, exc:
+            if self.debug:
+                traceback.print_exc(exc)
+
+        finally:
+            self.close()
+
+    def close(self):
+        if self._session:
+            self._session.cluster.shutdown()
+        ChildProcess.close(self)
+
+    def run_counter(self, table_meta):
+        """
+        Main run method for tables that contain counter columns.
+        """
+        query = 'UPDATE %s.%s SET %%s WHERE %%s' % (protect_name(self.ks), protect_name(self.cf))
+
+        # We prepare a query statement to find out the types of the partition key columns so we can
+        # route the update query to the correct replicas. As far as I understood this is the easiest
+        # way to find out the types of the partition columns; we will never execute this prepared statement.
+        where_clause = ' AND '.join(['%s = ?' % (protect_name(c.name)) for c in table_meta.partition_key])
+        select_query = 'SELECT * FROM %s.%s WHERE %s' % (protect_name(self.ks), protect_name(self.cf), where_clause)
+        conv = ImportConversion(self, table_meta, self.session.prepare(select_query))
+
+        while True:
+            try:
+                batch = self.inmsg.get()
+
+                for batches in self.split_batches(batch, conv):
+                    for b in batches:
+                        self.send_counter_batch(query, conv, b)
+
+            except Exception, exc:
+                self.outmsg.put((batch, '%s - %s' % (exc.__class__.__name__, exc.message)))
+                if self.debug:
+                    traceback.print_exc(exc)
+
+    def run_normal(self, table_meta):
+        """
+        Main run method for normal tables, i.e. tables that do not contain counter columns.
+        """
+        query = 'INSERT INTO %s.%s (%s) VALUES (%s)' % (protect_name(self.ks),
+                                                        protect_name(self.cf),
+                                                        ', '.join(protect_names(self.columns),),
+                                                        ', '.join(['?' for _ in self.columns]))
+        query_statement = self.session.prepare(query)
+        conv = ImportConversion(self, table_meta, query_statement)
+
+        while True:
+            try:
+                batch = self.inmsg.get()
+
+                for batches in self.split_batches(batch, conv):
+                    for b in batches:
+                        self.send_normal_batch(conv, query_statement, b)
+
+            except Exception, exc:
+                self.outmsg.put((batch, '%s - %s' % (exc.__class__.__name__, exc.message)))
+                if self.debug:
+                    traceback.print_exc(exc)
+
+    def send_counter_batch(self, query_text, conv, batch):
+        if self.test_failures and self.maybe_inject_failures(batch):
+            return
+
+        columns = self.columns
+        batch_statement = BatchStatement(batch_type=BatchType.COUNTER, consistency_level=self.consistency_level)
+        for row in batch['rows']:
+            where_clause = []
+            set_clause = []
+            for i, value in enumerate(row):
+                if i in conv.primary_key_indexes:
+                    where_clause.append("%s=%s" % (columns[i], value))
+                else:
+                    set_clause.append("%s=%s+%s" % (columns[i], columns[i], value))
+
+            full_query_text = query_text % (','.join(set_clause), ' AND '.join(where_clause))
+            batch_statement.add(full_query_text)
+
+        self.execute_statement(batch_statement, batch)
+
+    def send_normal_batch(self, conv, query_statement, batch):
+        try:
+            if self.test_failures and self.maybe_inject_failures(batch):
+                return
+
+            batch_statement = BatchStatement(batch_type=BatchType.UNLOGGED, consistency_level=self.consistency_level)
+            for row in batch['rows']:
+                batch_statement.add(query_statement, conv.get_row_values(row))
+
+            self.execute_statement(batch_statement, batch)
+
+        except Exception, exc:
+            self.err_callback(exc, batch)
+
+    def maybe_inject_failures(self, batch):
+        """
+        Examine self.test_failures and see if this batch is either a batch supposed
+        to cause a failure (failing_batch) or to terminate the worker process
+        (exit_batch). Return True if a test statement against 'badtable' was executed
+        in place of the batch, or False if the caller should carry on as normal.
+        """
+        if 'failing_batch' in self.test_failures:
+            failing_batch = self.test_failures['failing_batch']
+            if failing_batch['id'] == batch['id']:
+                if batch['attempts'] < failing_batch['failures']:
+                    statement = SimpleStatement("INSERT INTO badtable (a, b) VALUES (1, 2)",
+                                                consistency_level=self.consistency_level)
+                    self.execute_statement(statement, batch)
+                    return True
+
+        if 'exit_batch' in self.test_failures:
+            exit_batch = self.test_failures['exit_batch']
+            if exit_batch['id'] == batch['id']:
+                sys.exit(1)
+
+        return False  # carry on as normal
+
+    def execute_statement(self, statement, batch):
+        future = self.session.execute_async(statement)
+        future.add_callbacks(callback=self.result_callback, callback_args=(batch, ),
+                             errback=self.err_callback, errback_args=(batch, ))
+
+    def split_batches(self, batch, conv):
+        """
+        Split a batch into sub-batches with the same
+        partition key, if possible. If there are at least
+        batch_size rows with the same partition key value then
+        create a sub-batch with that partition key value, else
+        aggregate all remaining rows in a single 'left-overs' batch
+        """
+        rows_by_pk = defaultdict(list)
+
+        for row in batch['rows']:
+            pk = conv.get_row_partition_key_values(row)
+            rows_by_pk[pk].append(row)
+
+        ret = dict()
+        remaining_rows = []
+
+        for pk, rows in rows_by_pk.items():
+            if len(rows) >= self.min_batch_size:
+                ret[pk] = self.batches(rows, batch)
+            else:
+                remaining_rows.extend(rows)
+
+        if remaining_rows:
+            ret[self.hostname] = self.batches(remaining_rows, batch)
+
+        return ret.itervalues()
+
+    def batches(self, rows, batch):
+        for i in xrange(0, len(rows), self.max_batch_size):
+            yield ImportTask.make_batch(batch['id'], rows[i:i + self.max_batch_size], batch['attempts'])
+
+    def result_callback(self, result, batch):
+        batch['imported'] = len(batch['rows'])
+        batch['rows'] = []  # no need to resend these
+        self.outmsg.put((batch, None))
+
+    def err_callback(self, response, batch):
+        batch['imported'] = len(batch['rows'])
+        self.outmsg.put((batch, '%s - %s' % (response.__class__.__name__, response.message)))
+        if self.debug:
+            traceback.print_exc(response)
+
+
 class RateMeter(object):
 
-    def __init__(self, log_threshold):
-        self.log_threshold = log_threshold  # number of records after which we log
-        self.last_checkpoint_time = time.time()  # last time we logged
+    def __init__(self, update_interval=0.25, log=True):
+        self.log = log  # true if we should log
+        self.update_interval = update_interval  # how often we update in seconds
+        self.start_time = time.time()  # the start time
+        self.last_checkpoint_time = self.start_time  # last time we logged
         self.current_rate = 0.0  # rows per second
-        self.current_record = 0  # number of records since we last logged
+        self.current_record = 0  # number of records since we last updated
         self.total_records = 0   # total number of records
 
     def increment(self, n=1):
         self.current_record += n
+        self.maybe_update()
 
-        if self.current_record >= self.log_threshold:
-            self.update()
-            self.log()
-
-    def update(self):
+    def maybe_update(self):
         new_checkpoint_time = time.time()
+        if new_checkpoint_time - self.last_checkpoint_time >= self.update_interval:
+            self.update(new_checkpoint_time)
+            self.log_message()
+
+    def update(self, new_checkpoint_time):
         time_difference = new_checkpoint_time - self.last_checkpoint_time
-        if time_difference != 0.0:
+        if time_difference >= 1e-09:
             self.current_rate = self.get_new_rate(self.current_record / time_difference)
 
         self.last_checkpoint_time = new_checkpoint_time
@@ -626,19 +1292,29 @@ class RateMeter(object):
 
     def get_new_rate(self, new_rate):
         """
-         return the previous rate averaged with the new rate to smooth a bit
+         return the rate of the last period: this is the new rate but
+         averaged with the last rate to smooth a bit
         """
         if self.current_rate == 0.0:
             return new_rate
         else:
             return (self.current_rate + new_rate) / 2.0
 
-    def log(self):
-        output = 'Processed %d rows; Written: %f rows/s\r' % (self.total_records, self.current_rate,)
-        sys.stdout.write(output)
-        sys.stdout.flush()
+    def get_avg_rate(self):
+        """
+         return the average rate since we started measuring
+        """
+        time_difference = time.time() - self.start_time
+        return self.total_records / time_difference if time_difference >= 1e-09 else 0
+
+    def log_message(self):
+        if self.log:
+            output = 'Processed: %d rows; Rate: %7.0f rows/s; Avg. rage: %7.0f rows/s\r' % \
+                     (self.total_records, self.current_rate, self.get_avg_rate())
+            sys.stdout.write(output)
+            sys.stdout.flush()
 
     def get_total_records(self):
-        self.update()
-        self.log()
+        self.update(time.time())
+        self.log_message()
         return self.total_records

http://git-wip-us.apache.org/repos/asf/cassandra/blob/124f1bd2/pylib/cqlshlib/util.py
----------------------------------------------------------------------
diff --git a/pylib/cqlshlib/util.py b/pylib/cqlshlib/util.py
index 4d6cf8a..281aad6 100644
--- a/pylib/cqlshlib/util.py
+++ b/pylib/cqlshlib/util.py
@@ -14,9 +14,14 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+
+import cProfile
 import codecs
+import pstats
+
 from itertools import izip
 from datetime import timedelta, tzinfo
+from StringIO import StringIO
 
 ZERO = timedelta(0)
 
@@ -122,3 +127,17 @@ def get_file_encoding_bomsize(filename):
         file_encoding, size = "utf-8", 0
 
     return (file_encoding, size)
+
+
+def profile_on():
+    pr = cProfile.Profile()
+    pr.enable()
+    return pr
+
+
+def profile_off(pr):
+    pr.disable()
+    s = StringIO()
+    ps = pstats.Stats(pr, stream=s).sort_stats('cumulative')
+    ps.print_stats()
+    print s.getvalue()


[4/4] cassandra git commit: Merge branch 'cassandra-2.1' into cassandra-2.2

Posted by al...@apache.org.
Merge branch 'cassandra-2.1' into cassandra-2.2


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/57d558fc
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/57d558fc
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/57d558fc

Branch: refs/heads/cassandra-2.2
Commit: 57d558fc1ef8117c41567541b967e2b782d04a50
Parents: de55c39 124f1bd
Author: Aleksey Yeschenko <al...@apache.org>
Authored: Tue Dec 15 21:37:09 2015 +0000
Committer: Aleksey Yeschenko <al...@apache.org>
Committed: Tue Dec 15 21:37:09 2015 +0000

----------------------------------------------------------------------
 CHANGES.txt                |   4 +
 bin/cqlsh.py               | 329 ++-------------
 pylib/cqlshlib/copyutil.py | 912 ++++++++++++++++++++++++++++++++++------
 pylib/cqlshlib/util.py     |  19 +
 4 files changed, 843 insertions(+), 421 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/57d558fc/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index c9074fc,90f1bca..c969a4d
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,36 -1,15 +1,40 @@@
 -2.1.13
 +2.2.5
 + * Add property to allow listening on broadcast interface (CASSANDRA-9748)
 + * Fix regression in split size on CqlInputFormat (CASSANDRA-10835)
 + * Better handling of SSL connection errors inter-node (CASSANDRA-10816)
 + * Disable reloading of GossipingPropertyFileSnitch (CASSANDRA-9474)
 + * Verify tables in pseudo-system keyspaces at startup (CASSANDRA-10761)
 +Merged from 2.1:
+  * (cqlsh) further optimise COPY FROM (CASSANDRA-9302)
   * Allow CREATE TABLE WITH ID (CASSANDRA-9179)
   * Make Stress compiles within eclipse (CASSANDRA-10807)
   * Cassandra Daemon should print JVM arguments (CASSANDRA-10764)
   * Allow cancellation of index summary redistribution (CASSANDRA-8805)
+  * sstableloader will fail if there are collections in the schema tables (CASSANDRA-10700)
+  * Disable reloading of GossipingPropertyFileSnitch (CASSANDRA-9474)
   * Fix Stress profile parsing on Windows (CASSANDRA-10808)
  
+ 
 -2.1.12
 +2.2.4
 + * Show CQL help in cqlsh in web browser (CASSANDRA-7225)
 + * Serialize on disk the proper SSTable compression ratio (CASSANDRA-10775)
 + * Reject index queries while the index is building (CASSANDRA-8505)
 + * CQL.textile syntax incorrectly includes optional keyspace for aggregate SFUNC and FINALFUNC (CASSANDRA-10747)
 + * Fix JSON update with prepared statements (CASSANDRA-10631)
 + * Don't do anticompaction after subrange repair (CASSANDRA-10422)
 + * Fix SimpleDateType type compatibility (CASSANDRA-10027)
 + * (Hadoop) fix splits calculation (CASSANDRA-10640)
 + * (Hadoop) ensure that Cluster instances are always closed (CASSANDRA-10058)
 + * (cqlsh) show partial trace if incomplete after max_trace_wait (CASSANDRA-7645)
 + * Use most up-to-date version of schema for system tables (CASSANDRA-10652)
 + * Deprecate memory_allocator in cassandra.yaml (CASSANDRA-10581,10628)
 + * Expose phi values from failure detector via JMX and tweak debug
 +   and trace logging (CASSANDRA-9526)
 + * Fix RangeNamesQueryPager (CASSANDRA-10509)
 + * Deprecate Pig support (CASSANDRA-10542)
 + * Reduce contention getting instances of CompositeType (CASSANDRA-10433)
 + * Fix IllegalArgumentException in DataOutputBuffer.reallocate for large buffers (CASSANDRA-10592)
 +Merged from 2.1:
   * Fix incremental repair hang when replica is down (CASSANDRA-10288)
   * Avoid writing range tombstones after END_OF_ROW marker (CASSANDRA-10791)
   * Optimize the way we check if a token is repaired in anticompaction (CASSANDRA-10768)


[3/4] cassandra git commit: Merge branch 'cassandra-2.1' into cassandra-2.2

Posted by al...@apache.org.
http://git-wip-us.apache.org/repos/asf/cassandra/blob/57d558fc/bin/cqlsh.py
----------------------------------------------------------------------
diff --cc bin/cqlsh.py
index a3fa666,0000000..42c2923
mode 100644,000000..100644
--- a/bin/cqlsh.py
+++ b/bin/cqlsh.py
@@@ -1,2763 -1,0 +1,2486 @@@
 +#!/bin/sh
 +# -*- mode: Python -*-
 +
 +# Licensed to the Apache Software Foundation (ASF) under one
 +# or more contributor license agreements.  See the NOTICE file
 +# distributed with this work for additional information
 +# regarding copyright ownership.  The ASF licenses this file
 +# to you under the Apache License, Version 2.0 (the
 +# "License"); you may not use this file except in compliance
 +# with the License.  You may obtain a copy of the License at
 +#
 +#     http://www.apache.org/licenses/LICENSE-2.0
 +#
 +# Unless required by applicable law or agreed to in writing, software
 +# distributed under the License is distributed on an "AS IS" BASIS,
 +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 +# See the License for the specific language governing permissions and
 +# limitations under the License.
 +
 +""":"
 +# bash code here; finds a suitable python interpreter and execs this file.
 +# prefer unqualified "python" if suitable:
 +python -c 'import sys; sys.exit(not (0x020500b0 < sys.hexversion < 0x03000000))' 2>/dev/null \
 +    && exec python "$0" "$@"
 +for pyver in 2.6 2.7 2.5; do
 +    which python$pyver > /dev/null 2>&1 && exec python$pyver "$0" "$@"
 +done
 +echo "No appropriate python interpreter found." >&2
 +exit 1
 +":"""
 +
 +from __future__ import with_statement
 +
 +import cmd
 +import codecs
 +import ConfigParser
 +import csv
 +import getpass
 +import locale
- import multiprocessing as mp
 +import optparse
 +import os
 +import platform
 +import sys
 +import time
 +import traceback
 +import warnings
 +import webbrowser
++from StringIO import StringIO
 +from contextlib import contextmanager
- from functools import partial
 +from glob import glob
- from StringIO import StringIO
 +from uuid import UUID
 +
 +if sys.version_info[0] != 2 or sys.version_info[1] != 7:
 +    sys.exit("\nCQL Shell supports only Python 2.7\n")
 +
 +description = "CQL Shell for Apache Cassandra"
 +version = "5.0.1"
 +
 +readline = None
 +try:
 +    # check if tty first, cause readline doesn't check, and only cares
 +    # about $TERM. we don't want the funky escape code stuff to be
 +    # output if not a tty.
 +    if sys.stdin.isatty():
 +        import readline
 +except ImportError:
 +    pass
 +
 +CQL_LIB_PREFIX = 'cassandra-driver-internal-only-'
 +
 +CASSANDRA_PATH = os.path.join(os.path.dirname(os.path.realpath(__file__)), '..')
 +CASSANDRA_CQL_HTML_FALLBACK = 'https://cassandra.apache.org/doc/cql3/CQL-2.2.html'
 +
 +if os.path.exists(CASSANDRA_PATH + '/doc/cql3/CQL.html'):
 +    # default location of local CQL.html
 +    CASSANDRA_CQL_HTML = 'file://' + CASSANDRA_PATH + '/doc/cql3/CQL.html'
 +elif os.path.exists('/usr/share/doc/cassandra/CQL.html'):
 +    # fallback to package file
 +    CASSANDRA_CQL_HTML = 'file:///usr/share/doc/cassandra/CQL.html'
 +else:
 +    # fallback to online version
 +    CASSANDRA_CQL_HTML = CASSANDRA_CQL_HTML_FALLBACK
 +
 +# On Linux, the Python webbrowser module uses the 'xdg-open' executable
 +# to open a file/URL. But that only works, if the current session has been
 +# opened from _within_ a desktop environment. I.e. 'xdg-open' will fail,
 +# if the session's been opened via ssh to a remote box.
 +#
 +# Use 'python' to get some information about the detected browsers.
 +# >>> import webbrowser
 +# >>> webbrowser._tryorder
 +# >>> webbrowser._browser
 +#
 +if len(webbrowser._tryorder) == 0:
 +    CASSANDRA_CQL_HTML = CASSANDRA_CQL_HTML_FALLBACK
 +elif webbrowser._tryorder[0] == 'xdg-open' and os.environ.get('XDG_DATA_DIRS', '') == '':
 +    # only on Linux (some OS with xdg-open)
 +    webbrowser._tryorder.remove('xdg-open')
 +    webbrowser._tryorder.append('xdg-open')
 +
 +# use bundled libs for python-cql and thrift, if available. if there
 +# is a ../lib dir, use bundled libs there preferentially.
 +ZIPLIB_DIRS = [os.path.join(CASSANDRA_PATH, 'lib')]
 +myplatform = platform.system()
 +if myplatform == 'Linux':
 +    ZIPLIB_DIRS.append('/usr/share/cassandra/lib')
 +
 +if os.environ.get('CQLSH_NO_BUNDLED', ''):
 +    ZIPLIB_DIRS = ()
 +
 +
 +def find_zip(libprefix):
 +    for ziplibdir in ZIPLIB_DIRS:
 +        zips = glob(os.path.join(ziplibdir, libprefix + '*.zip'))
 +        if zips:
 +            return max(zips)   # probably the highest version, if multiple
 +
 +cql_zip = find_zip(CQL_LIB_PREFIX)
 +if cql_zip:
 +    ver = os.path.splitext(os.path.basename(cql_zip))[0][len(CQL_LIB_PREFIX):]
 +    sys.path.insert(0, os.path.join(cql_zip, 'cassandra-driver-' + ver))
 +
 +third_parties = ('futures-', 'six-')
 +
 +for lib in third_parties:
 +    lib_zip = find_zip(lib)
 +    if lib_zip:
 +        sys.path.insert(0, lib_zip)
 +
 +warnings.filterwarnings("ignore", r".*blist.*")
 +try:
 +    import cassandra
 +except ImportError, e:
 +    sys.exit("\nPython Cassandra driver not installed, or not on PYTHONPATH.\n"
 +             'You might try "pip install cassandra-driver".\n\n'
 +             'Python: %s\n'
 +             'Module load path: %r\n\n'
 +             'Error: %s\n' % (sys.executable, sys.path, e))
 +
 +from cassandra.auth import PlainTextAuthProvider
 +from cassandra.cluster import Cluster
 +from cassandra.metadata import (ColumnMetadata, KeyspaceMetadata,
-                                 TableMetadata, protect_name, protect_names,
-                                 protect_value)
++                                TableMetadata, protect_name, protect_names)
 +from cassandra.policies import WhiteListRoundRobinPolicy
- from cassandra.protocol import QueryMessage, ResultMessage
++from cassandra.protocol import ResultMessage
 +from cassandra.query import SimpleStatement, ordered_dict_factory, TraceUnavailable
 +
 +# cqlsh should run correctly when run out of a Cassandra source tree,
 +# out of an unpacked Cassandra tarball, and after a proper package install.
 +cqlshlibdir = os.path.join(CASSANDRA_PATH, 'pylib')
 +if os.path.isdir(cqlshlibdir):
 +    sys.path.insert(0, cqlshlibdir)
 +
 +from cqlshlib import cql3handling, cqlhandling, copyutil, pylexotron, sslhandling
 +from cqlshlib.displaying import (ANSI_RESET, BLUE, COLUMN_NAME_COLORS, CYAN,
 +                                 RED, FormattedValue, colorme)
 +from cqlshlib.formatting import (DEFAULT_DATE_FORMAT, DEFAULT_NANOTIME_FORMAT,
 +                                 DEFAULT_TIMESTAMP_FORMAT, DateTimeFormat,
 +                                 format_by_type, format_value_utype,
 +                                 formatter_for)
 +from cqlshlib.tracing import print_trace, print_trace_session
 +from cqlshlib.util import get_file_encoding_bomsize, trim_if_present
 +
 +DEFAULT_HOST = '127.0.0.1'
 +DEFAULT_PORT = 9042
 +DEFAULT_CQLVER = '3.3.1'
 +DEFAULT_PROTOCOL_VERSION = 4
 +DEFAULT_CONNECT_TIMEOUT_SECONDS = 5
 +
 +DEFAULT_FLOAT_PRECISION = 5
 +DEFAULT_MAX_TRACE_WAIT = 10
 +
 +if readline is not None and readline.__doc__ is not None and 'libedit' in readline.__doc__:
 +    DEFAULT_COMPLETEKEY = '\t'
 +else:
 +    DEFAULT_COMPLETEKEY = 'tab'
 +
 +cqldocs = None
 +cqlruleset = None
 +
 +epilog = """Connects to %(DEFAULT_HOST)s:%(DEFAULT_PORT)d by default. These
 +defaults can be changed by setting $CQLSH_HOST and/or $CQLSH_PORT. When a
 +host (and optional port number) are given on the command line, they take
 +precedence over any defaults.""" % globals()
 +
 +parser = optparse.OptionParser(description=description, epilog=epilog,
 +                               usage="Usage: %prog [options] [host [port]]",
 +                               version='cqlsh ' + version)
 +parser.add_option("-C", "--color", action='store_true', dest='color',
 +                  help='Always use color output')
 +parser.add_option("--no-color", action='store_false', dest='color',
 +                  help='Never use color output')
 +parser.add_option("--browser", dest='browser', help="""The browser to use to display CQL help, where BROWSER can be:
 +                                                    - one of the supported browsers in https://docs.python.org/2/library/webbrowser.html.
 +                                                    - browser path followed by %s, example: /usr/bin/google-chrome-stable %s""")
 +parser.add_option('--ssl', action='store_true', help='Use SSL', default=False)
 +parser.add_option("-u", "--username", help="Authenticate as user.")
 +parser.add_option("-p", "--password", help="Authenticate using password.")
 +parser.add_option('-k', '--keyspace', help='Authenticate to the given keyspace.')
 +parser.add_option("-f", "--file", help="Execute commands from FILE, then exit")
 +parser.add_option('--debug', action='store_true',
 +                  help='Show additional debugging information')
 +parser.add_option("--encoding", help="Specify a non-default encoding for output.  If you are " +
 +                  "experiencing problems with unicode characters, using utf8 may fix the problem." +
 +                  " (Default from system preferences: %s)" % (locale.getpreferredencoding(),))
 +parser.add_option("--cqlshrc", help="Specify an alternative cqlshrc file location.")
 +parser.add_option('--cqlversion', default=DEFAULT_CQLVER,
 +                  help='Specify a particular CQL version (default: %default).'
 +                       ' Examples: "3.0.3", "3.1.0"')
 +parser.add_option("-e", "--execute", help='Execute the statement and quit.')
 +parser.add_option("--connect-timeout", default=DEFAULT_CONNECT_TIMEOUT_SECONDS, dest='connect_timeout',
 +                  help='Specify the connection timeout in seconds (default: %default seconds).')
 +
 +optvalues = optparse.Values()
 +(options, arguments) = parser.parse_args(sys.argv[1:], values=optvalues)
 +
 +# BEGIN history/config definition
 +HISTORY_DIR = os.path.expanduser(os.path.join('~', '.cassandra'))
 +
 +if hasattr(options, 'cqlshrc'):
 +    CONFIG_FILE = options.cqlshrc
 +    if not os.path.exists(CONFIG_FILE):
 +        print '\nWarning: Specified cqlshrc location `%s` does not exist.  Using `%s` instead.\n' % (CONFIG_FILE, HISTORY_DIR)
 +        CONFIG_FILE = os.path.join(HISTORY_DIR, 'cqlshrc')
 +else:
 +    CONFIG_FILE = os.path.join(HISTORY_DIR, 'cqlshrc')
 +
 +HISTORY = os.path.join(HISTORY_DIR, 'cqlsh_history')
 +if not os.path.exists(HISTORY_DIR):
 +    try:
 +        os.mkdir(HISTORY_DIR)
 +    except OSError:
 +        print '\nWarning: Cannot create directory at `%s`. Command history will not be saved.\n' % HISTORY_DIR
 +
 +OLD_CONFIG_FILE = os.path.expanduser(os.path.join('~', '.cqlshrc'))
 +if os.path.exists(OLD_CONFIG_FILE):
 +    if os.path.exists(CONFIG_FILE):
 +        print '\nWarning: cqlshrc config files were found at both the old location (%s) and \
 +                the new location (%s), the old config file will not be migrated to the new \
 +                location, and the new location will be used for now.  You should manually \
 +                consolidate the config files at the new location and remove the old file.' \
 +                % (OLD_CONFIG_FILE, CONFIG_FILE)
 +    else:
 +        os.rename(OLD_CONFIG_FILE, CONFIG_FILE)
 +OLD_HISTORY = os.path.expanduser(os.path.join('~', '.cqlsh_history'))
 +if os.path.exists(OLD_HISTORY):
 +    os.rename(OLD_HISTORY, HISTORY)
 +# END history/config definition
 +
 +CQL_ERRORS = (
 +    cassandra.AlreadyExists, cassandra.AuthenticationFailed, cassandra.InvalidRequest,
 +    cassandra.Timeout, cassandra.Unauthorized, cassandra.OperationTimedOut,
 +    cassandra.cluster.NoHostAvailable,
 +    cassandra.connection.ConnectionBusy, cassandra.connection.ProtocolError, cassandra.connection.ConnectionException,
 +    cassandra.protocol.ErrorMessage, cassandra.protocol.InternalError, cassandra.query.TraceUnavailable
 +)
 +
 +debug_completion = bool(os.environ.get('CQLSH_DEBUG_COMPLETION', '') == 'YES')
 +
 +# we want the cql parser to understand our cqlsh-specific commands too
 +my_commands_ending_with_newline = (
 +    'help',
 +    '?',
 +    'consistency',
 +    'serial',
 +    'describe',
 +    'desc',
 +    'show',
 +    'source',
 +    'capture',
 +    'login',
 +    'debug',
 +    'tracing',
 +    'expand',
 +    'paging',
 +    'exit',
 +    'quit',
 +    'clear',
 +    'cls'
 +)
 +
 +
 +cqlsh_syntax_completers = []
 +
 +
 +def cqlsh_syntax_completer(rulename, termname):
 +    def registrator(f):
 +        cqlsh_syntax_completers.append((rulename, termname, f))
 +        return f
 +    return registrator
 +
 +
 +cqlsh_extra_syntax_rules = r'''
 +<cqlshCommand> ::= <CQL_Statement>
 +                 | <specialCommand> ( ";" | "\n" )
 +                 ;
 +
 +<specialCommand> ::= <describeCommand>
 +                   | <consistencyCommand>
 +                   | <serialConsistencyCommand>
 +                   | <showCommand>
 +                   | <sourceCommand>
 +                   | <captureCommand>
 +                   | <copyCommand>
 +                   | <loginCommand>
 +                   | <debugCommand>
 +                   | <helpCommand>
 +                   | <tracingCommand>
 +                   | <expandCommand>
 +                   | <exitCommand>
 +                   | <pagingCommand>
 +                   | <clearCommand>
 +                   ;
 +
 +<describeCommand> ::= ( "DESCRIBE" | "DESC" )
 +                                  ( "FUNCTIONS"
 +                                  | "FUNCTION" udf=<anyFunctionName>
 +                                  | "AGGREGATES"
 +                                  | "AGGREGATE" uda=<userAggregateName>
 +                                  | "KEYSPACES"
 +                                  | "KEYSPACE" ksname=<keyspaceName>?
 +                                  | ( "COLUMNFAMILY" | "TABLE" ) cf=<columnFamilyName>
 +                                  | "INDEX" idx=<indexName>
 +                                  | ( "COLUMNFAMILIES" | "TABLES" )
 +                                  | "FULL"? "SCHEMA"
 +                                  | "CLUSTER"
 +                                  | "TYPES"
 +                                  | "TYPE" ut=<userTypeName>
 +                                  | (ksname=<keyspaceName> | cf=<columnFamilyName> | idx=<indexName>))
 +                    ;
 +
 +<consistencyCommand> ::= "CONSISTENCY" ( level=<consistencyLevel> )?
 +                       ;
 +
 +<consistencyLevel> ::= "ANY"
 +                     | "ONE"
 +                     | "TWO"
 +                     | "THREE"
 +                     | "QUORUM"
 +                     | "ALL"
 +                     | "LOCAL_QUORUM"
 +                     | "EACH_QUORUM"
 +                     | "SERIAL"
 +                     | "LOCAL_SERIAL"
 +                     | "LOCAL_ONE"
 +                     ;
 +
 +<serialConsistencyCommand> ::= "SERIAL" "CONSISTENCY" ( level=<serialConsistencyLevel> )?
 +                             ;
 +
 +<serialConsistencyLevel> ::= "SERIAL"
 +                           | "LOCAL_SERIAL"
 +                           ;
 +
 +<showCommand> ::= "SHOW" what=( "VERSION" | "HOST" | "SESSION" sessionid=<uuid> )
 +                ;
 +
 +<sourceCommand> ::= "SOURCE" fname=<stringLiteral>
 +                  ;
 +
 +<captureCommand> ::= "CAPTURE" ( fname=( <stringLiteral> | "OFF" ) )?
 +                   ;
 +
 +<copyCommand> ::= "COPY" cf=<columnFamilyName>
 +                         ( "(" [colnames]=<colname> ( "," [colnames]=<colname> )* ")" )?
 +                         ( dir="FROM" ( fname=<stringLiteral> | "STDIN" )
 +                         | dir="TO"   ( fname=<stringLiteral> | "STDOUT" ) )
 +                         ( "WITH" <copyOption> ( "AND" <copyOption> )* )?
 +                ;
 +
 +<copyOption> ::= [optnames]=(<identifier>|<reserved_identifier>) "=" [optvals]=<copyOptionVal>
 +               ;
 +
 +<copyOptionVal> ::= <identifier>
 +                  | <reserved_identifier>
-                   | <stringLiteral>
++                  | <term>
 +                  ;
 +
 +# avoiding just "DEBUG" so that this rule doesn't get treated as a terminal
 +<debugCommand> ::= "DEBUG" "THINGS"?
 +                 ;
 +
 +<helpCommand> ::= ( "HELP" | "?" ) [topic]=( /[a-z_]*/ )*
 +                ;
 +
 +<tracingCommand> ::= "TRACING" ( switch=( "ON" | "OFF" ) )?
 +                   ;
 +
 +<expandCommand> ::= "EXPAND" ( switch=( "ON" | "OFF" ) )?
 +                   ;
 +
 +<pagingCommand> ::= "PAGING" ( switch=( "ON" | "OFF" | /[0-9]+/) )?
 +                  ;
 +
 +<loginCommand> ::= "LOGIN" username=<username> (password=<stringLiteral>)?
 +                 ;
 +
 +<exitCommand> ::= "exit" | "quit"
 +                ;
 +
 +<clearCommand> ::= "CLEAR" | "CLS"
 +                 ;
 +
 +<qmark> ::= "?" ;
 +'''
 +
 +
 +@cqlsh_syntax_completer('helpCommand', 'topic')
 +def complete_help(ctxt, cqlsh):
 +    return sorted([t.upper() for t in cqldocs.get_help_topics() + cqlsh.get_help_topics()])
 +
 +
 +def complete_source_quoted_filename(ctxt, cqlsh):
 +    partial_path = ctxt.get_binding('partial', '')
 +    head, tail = os.path.split(partial_path)
 +    exhead = os.path.expanduser(head)
 +    try:
 +        contents = os.listdir(exhead or '.')
 +    except OSError:
 +        return ()
 +    matches = filter(lambda f: f.startswith(tail), contents)
 +    annotated = []
 +    for f in matches:
 +        match = os.path.join(head, f)
 +        if os.path.isdir(os.path.join(exhead, f)):
 +            match += '/'
 +        annotated.append(match)
 +    return annotated
 +
 +
 +cqlsh_syntax_completer('sourceCommand', 'fname')(complete_source_quoted_filename)
 +cqlsh_syntax_completer('captureCommand', 'fname')(complete_source_quoted_filename)
 +
 +
 +@cqlsh_syntax_completer('copyCommand', 'fname')
 +def copy_fname_completer(ctxt, cqlsh):
 +    lasttype = ctxt.get_binding('*LASTTYPE*')
 +    if lasttype == 'unclosedString':
 +        return complete_source_quoted_filename(ctxt, cqlsh)
 +    partial_path = ctxt.get_binding('partial')
 +    if partial_path == '':
 +        return ["'"]
 +    return ()
 +
 +
 +@cqlsh_syntax_completer('copyCommand', 'colnames')
 +def complete_copy_column_names(ctxt, cqlsh):
 +    existcols = map(cqlsh.cql_unprotect_name, ctxt.get_binding('colnames', ()))
 +    ks = cqlsh.cql_unprotect_name(ctxt.get_binding('ksname', None))
 +    cf = cqlsh.cql_unprotect_name(ctxt.get_binding('cfname'))
 +    colnames = cqlsh.get_column_names(ks, cf)
 +    if len(existcols) == 0:
 +        return [colnames[0]]
 +    return set(colnames[1:]) - set(existcols)
 +
 +
- COPY_OPTIONS = ['DELIMITER', 'QUOTE', 'ESCAPE', 'HEADER', 'NULL', 'ENCODING',
-                 'TIMEFORMAT', 'JOBS', 'PAGESIZE', 'PAGETIMEOUT', 'MAXATTEMPTS']
++COPY_COMMON_OPTIONS = ['DELIMITER', 'QUOTE', 'ESCAPE', 'HEADER', 'NULL',
++                       'MAXATTEMPTS', 'REPORTFREQUENCY']
++COPY_FROM_OPTIONS = ['CHUNKSIZE', 'INGESTRATE', 'MAXBATCHSIZE', 'MINBATCHSIZE']
++COPY_TO_OPTIONS = ['ENCODING', 'TIMEFORMAT', 'PAGESIZE', 'PAGETIMEOUT', 'MAXREQUESTS']
 +
 +
 +@cqlsh_syntax_completer('copyOption', 'optnames')
 +def complete_copy_options(ctxt, cqlsh):
 +    optnames = map(str.upper, ctxt.get_binding('optnames', ()))
 +    direction = ctxt.get_binding('dir').upper()
-     opts = set(COPY_OPTIONS) - set(optnames)
 +    if direction == 'FROM':
-         opts -= set(['ENCODING', 'TIMEFORMAT', 'JOBS', 'PAGESIZE', 'PAGETIMEOUT', 'MAXATTEMPTS'])
++        opts = set(COPY_COMMON_OPTIONS + COPY_FROM_OPTIONS) - set(optnames)
++    elif direction == 'TO':
++        opts = set(COPY_COMMON_OPTIONS + COPY_TO_OPTIONS) - set(optnames)
 +    return opts
 +
 +
 +@cqlsh_syntax_completer('copyOption', 'optvals')
 +def complete_copy_opt_values(ctxt, cqlsh):
 +    optnames = ctxt.get_binding('optnames', ())
 +    lastopt = optnames[-1].lower()
 +    if lastopt == 'header':
 +        return ['true', 'false']
 +    return [cqlhandling.Hint('<single_character_string>')]
 +
 +
 +class NoKeyspaceError(Exception):
 +    pass
 +
 +
 +class KeyspaceNotFound(Exception):
 +    pass
 +
 +
 +class ColumnFamilyNotFound(Exception):
 +    pass
 +
 +
 +class IndexNotFound(Exception):
 +    pass
 +
 +
 +class ObjectNotFound(Exception):
 +    pass
 +
 +
 +# Marker exception type with no behaviour of its own.
 +# NOTE(review): presumably signals an unsupported CQL/server version -- confirm at the raise sites.
 +class VersionNotSupported(Exception):
 +    pass
 +
 +
 +# Raised by Shell.get_usertype_layout when the user type is not defined in
 +# the keyspace metadata.
 +class UserTypeNotFound(Exception):
 +    pass
 +
 +
 +# Marker exception type with no behaviour of its own.
 +# NOTE(review): presumably signals a missing user-defined function -- confirm at the raise sites.
 +class FunctionNotFound(Exception):
 +    pass
 +
 +
 +# Marker exception type with no behaviour of its own.
 +# NOTE(review): presumably signals a missing user-defined aggregate -- confirm at the raise sites.
 +class AggregateNotFound(Exception):
 +    pass
 +
 +
 +class DecodeError(Exception):
 +    """
 +    Carries a value that could not be processed together with the error
 +    that was hit, so it can be reported later instead of aborting output.
 +
 +    thebytes -- the offending value
 +    err      -- the underlying error
 +    colname  -- optional column name, included in message() when given
 +    """
 +    # Verb used in message(); subclasses override it.
 +    verb = 'decode'
 +
 +    def __init__(self, thebytes, err, colname=None):
 +        self.thebytes = thebytes
 +        self.err = err
 +        self.colname = colname
 +
 +    def __str__(self):
 +        # Only the offending value; use message() for the full description.
 +        return str(self.thebytes)
 +
 +    def message(self):
 +        # Builds "Failed to <verb> value <repr> [(for column <repr>)] : <err>".
 +        what = 'value %r' % (self.thebytes,)
 +        if self.colname is not None:
 +            what = 'value %r (for column %r)' % (self.thebytes, self.colname)
 +        return 'Failed to %s %s : %s' \
 +               % (self.verb, what, self.err)
 +
 +    def __repr__(self):
 +        return '<%s %s>' % (self.__class__.__name__, self.message())
 +
 +
 +# Same payload as DecodeError; only the verb used in message() differs.
 +class FormatError(DecodeError):
 +    verb = 'format'
 +
 +
 +def full_cql_version(ver):
 +    """
 +    Pad a CQL version string to at least three dot-separated components.
 +
 +    Returns (ver, vertuple): the padded string, and a tuple made of the
 +    integer components of the text before the first '-' followed by the
 +    text after it ('' when there is no '-').
 +    """
 +    # '.0' is appended at the very end of the string until it holds two dots.
 +    while ver.count('.') < 2:
 +        ver += '.0'
 +    # The trailing [''] guarantees ver_parts[1] exists when there is no '-'.
 +    ver_parts = ver.split('-', 1) + ['']
 +    # List concatenation: relies on map() returning a list (Python 2).
 +    vertuple = tuple(map(int, ver_parts[0].split('.')) + [ver_parts[1]])
 +    return ver, vertuple
 +
 +
 +def format_value(val, output_encoding, addcolor=False, date_time_format=None,
 +                 float_precision=None, colormap=None, nullval=None):
 +    """
 +    Format a single value for display.
 +
 +    A DecodeError is rendered as the repr() of its raw value (coloured as
 +    'error' when addcolor is set); every other value is dispatched to
 +    format_by_type using the value's concrete type.
 +    """
 +    if isinstance(val, DecodeError):
 +        if addcolor:
 +            return colorme(repr(val.thebytes), colormap, 'error')
 +        else:
 +            return FormattedValue(repr(val.thebytes))
 +    return format_by_type(type(val), val, output_encoding, colormap=colormap,
 +                          addcolor=addcolor, nullval=nullval, date_time_format=date_time_format,
 +                          float_precision=float_precision)
 +
 +
 +def show_warning_without_quoting_line(message, category, filename, lineno, file=None, line=None):
 +    """
 +    Replacement for warnings.showwarning that omits the source line.
 +
 +    Writes the formatted warning to `file` (sys.stderr when None). The
 +    `line` argument is accepted but ignored: an empty string is always
 +    passed to formatwarning. IOError while writing is deliberately ignored.
 +    """
 +    if file is None:
 +        file = sys.stderr
 +    try:
 +        file.write(warnings.formatwarning(message, category, filename, lineno, line=''))
 +    except IOError:
 +        pass
 +warnings.showwarning = show_warning_without_quoting_line
 +warnings.filterwarnings('always', category=cql3handling.UnexpectedTableStructure)
 +
 +
 +def describe_interval(seconds):
 +    """
 +    Describe a duration given in seconds in words.
 +
 +    Whole days, hours and minutes are listed when non-zero, followed by the
 +    remaining seconds with three decimals, e.g. 3661.5 gives
 +    "1 hour, 1 minute, and 1.500 seconds".
 +    """
 +    desc = []
 +    for length, unit in ((86400, 'day'), (3600, 'hour'), (60, 'minute')):
 +        # Integer division of two ints (Python 2 semantics).
 +        num = int(seconds) / length
 +        if num > 0:
 +            desc.append('%d %s' % (num, unit))
 +            if num > 1:
 +                desc[-1] += 's'
 +        # Keep only what is left over for the smaller units.
 +        seconds %= length
 +    words = '%.03f seconds' % seconds
 +    if len(desc) > 1:
 +        words = ', '.join(desc) + ', and ' + words
 +    elif len(desc) == 1:
 +        words = desc[0] + ' and ' + words
 +    return words
 +
 +
 +def insert_driver_hooks():
 +    # Applies both driver patches: BLOB deserialization and UDT formatting.
 +    extend_cql_deserialization()
 +    auto_format_udts()
 +
 +
 +def extend_cql_deserialization():
 +    """
 +    The python driver returns BLOBs as string, but we expect them as bytearrays
 +    """
 +    # Replace the driver's BytesType deserializer so it wraps the raw bytes
 +    # in a bytearray; protocol_version is accepted but unused.
 +    cassandra.cqltypes.BytesType.deserialize = staticmethod(lambda byts, protocol_version: bytearray(byts))
 +    # Set on the base type, so it applies to every type that does not override it.
 +    cassandra.cqltypes.CassandraType.support_empty_values = True
 +
 +
 +def auto_format_udts():
 +    # when we see a new user defined type, set up the shell formatting for it
 +    # Keep a reference to the driver's original classmethod so the wrapper
 +    # below can delegate to it.
 +    udt_apply_params = cassandra.cqltypes.UserType.apply_parameters
 +
 +    def new_apply_params(cls, *args, **kwargs):
 +        # Delegate, then register format_value_utype for the new type's name.
 +        udt_class = udt_apply_params(*args, **kwargs)
 +        formatter_for(udt_class.typename)(format_value_utype)
 +        return udt_class
 +
 +    # NOTE(review): this assigns to `udt_apply_parameters`, not
 +    # `apply_parameters`, so the original `apply_parameters` read above is
 +    # left in place -- confirm whether that is intended.
 +    cassandra.cqltypes.UserType.udt_apply_parameters = classmethod(new_apply_params)
 +
 +    make_udt_class = cassandra.cqltypes.UserType.make_udt_class
 +
 +    def new_make_udt_class(cls, *args, **kwargs):
 +        # Delegate, then register the formatter under the tuple type's name.
 +        udt_class = make_udt_class(*args, **kwargs)
 +        formatter_for(udt_class.tuple_type.__name__)(format_value_utype)
 +        return udt_class
 +
 +    cassandra.cqltypes.UserType.make_udt_class = classmethod(new_make_udt_class)
 +
 +
 +class FrozenType(cassandra.cqltypes._ParameterizedType):
 +    """
 +    Needed until the bundled python driver adds FrozenType.
 +    """
 +    typename = "frozen"
 +    # Exactly one wrapped subtype; both methods below unpack it as such.
 +    num_subtypes = 1
 +
 +    @classmethod
 +    def deserialize_safe(cls, byts, protocol_version):
 +        # Delegates to the single wrapped subtype.
 +        # NOTE(review): protocol_version is not forwarded here, unlike in
 +        # serialize_safe -- confirm against the driver's from_binary signature.
 +        subtype, = cls.subtypes
 +        return subtype.from_binary(byts)
 +
 +    @classmethod
 +    def serialize_safe(cls, val, protocol_version):
 +        # Delegates to the single wrapped subtype.
 +        subtype, = cls.subtypes
 +        return subtype.to_binary(val, protocol_version)
 +
 +
 +class Shell(cmd.Cmd):
 +    custom_prompt = os.getenv('CQLSH_PROMPT', '')
 +    if custom_prompt is not '':
 +        custom_prompt += "\n"
 +    default_prompt = custom_prompt + "cqlsh> "
 +    continue_prompt = "   ... "
 +    keyspace_prompt = custom_prompt + "cqlsh:%s> "
 +    keyspace_continue_prompt = "%s    ... "
 +    show_line_nums = False
 +    debug = False
 +    stop = False
 +    last_hist = None
 +    shunted_query_out = None
 +    use_paging = True
 +    csv_dialect_defaults = dict(delimiter=',', doublequote=False,
 +                                escapechar='\\', quotechar='"')
 +    default_page_size = 100
 +
 +    def __init__(self, hostname, port, color=False,
 +                 username=None, password=None, encoding=None, stdin=None, tty=True,
 +                 completekey=DEFAULT_COMPLETEKEY, browser=None, use_conn=None,
 +                 cqlver=DEFAULT_CQLVER, keyspace=None,
 +                 tracing_enabled=False, expand_enabled=False,
 +                 display_nanotime_format=DEFAULT_NANOTIME_FORMAT,
 +                 display_timestamp_format=DEFAULT_TIMESTAMP_FORMAT,
 +                 display_date_format=DEFAULT_DATE_FORMAT,
 +                 display_float_precision=DEFAULT_FLOAT_PRECISION,
 +                 max_trace_wait=DEFAULT_MAX_TRACE_WAIT,
 +                 ssl=False,
 +                 single_statement=None,
 +                 client_timeout=10,
 +                 protocol_version=DEFAULT_PROTOCOL_VERSION,
 +                 connect_timeout=DEFAULT_CONNECT_TIMEOUT_SECONDS):
 +        """
 +        Set up the shell: connect to the cluster (or adopt `use_conn`), open a
 +        session, record display settings and initialise the input state.
 +
 +        When `username` is given without `password`, the password is read
 +        interactively. When `tty` is true the prompt is set and the
 +        connection banner is printed; otherwise line numbers are enabled for
 +        error reporting.
 +        """
 +        cmd.Cmd.__init__(self, completekey=completekey)
 +        self.hostname = hostname
 +        self.port = port
 +        self.auth_provider = None
 +        if username:
 +            if not password:
 +                password = getpass.getpass()
 +            self.auth_provider = PlainTextAuthProvider(username=username, password=password)
 +        self.username = username
 +        self.keyspace = keyspace
 +        self.ssl = ssl
 +        self.tracing_enabled = tracing_enabled
 +        self.page_size = self.default_page_size
 +        self.expand_enabled = expand_enabled
 +        if use_conn:
 +            self.conn = use_conn
 +        else:
 +            # The whitelist policy restricts the driver to the single host given.
 +            self.conn = Cluster(contact_points=(self.hostname,), port=self.port, cql_version=cqlver,
 +                                protocol_version=protocol_version,
 +                                auth_provider=self.auth_provider,
 +                                ssl_options=sslhandling.ssl_settings(hostname, CONFIG_FILE) if ssl else None,
 +                                load_balancing_policy=WhiteListRoundRobinPolicy([self.hostname]),
 +                                connect_timeout=connect_timeout)
 +        # True only when the Cluster object was created here.
 +        self.owns_connection = not use_conn
 +        self.set_expanded_cql_version(cqlver)
 +
 +        if keyspace:
 +            self.session = self.conn.connect(keyspace)
 +        else:
 +            self.session = self.conn.connect()
 +
 +        # An empty browser setting is treated the same as no setting.
 +        if browser == "":
 +            browser = None
 +        self.browser = browser
 +        self.color = color
 +
 +        self.display_nanotime_format = display_nanotime_format
 +        self.display_timestamp_format = display_timestamp_format
 +        self.display_date_format = display_date_format
 +
 +        self.display_float_precision = display_float_precision
 +
 +        # If there is no schema metadata present (due to a schema mismatch), force schema refresh
 +        if not self.conn.metadata.keyspaces:
 +            self.refresh_schema_metadata_best_effort()
 +
 +        self.session.default_timeout = client_timeout
 +        self.session.row_factory = ordered_dict_factory
 +        self.session.default_consistency_level = cassandra.ConsistencyLevel.ONE
 +        self.get_connection_versions()
 +
 +        self.current_keyspace = keyspace
 +
 +        # NOTE(review): these three assignments repeat the ones made earlier
 +        # in this method with the same values.
 +        self.display_timestamp_format = display_timestamp_format
 +        self.display_nanotime_format = display_nanotime_format
 +        self.display_date_format = display_date_format
 +
 +        self.max_trace_wait = max_trace_wait
 +        self.session.max_trace_wait = max_trace_wait
 +        if encoding is None:
 +            encoding = locale.getpreferredencoding()
 +        self.encoding = encoding
 +        self.output_codec = codecs.lookup(encoding)
 +
 +        # Buffer accumulating the (possibly multi-line) statement being typed.
 +        self.statement = StringIO()
 +        self.lineno = 1
 +        self.in_comment = False
 +
 +        self.prompt = ''
 +        if stdin is None:
 +            stdin = sys.stdin
 +        self.tty = tty
 +        if tty:
 +            self.reset_prompt()
 +            self.report_connection()
 +            print 'Use HELP for help.'
 +        else:
 +            self.show_line_nums = True
 +        self.stdin = stdin
 +        self.query_out = sys.stdout
 +        self.consistency_level = cassandra.ConsistencyLevel.ONE
 +        self.serial_consistency_level = cassandra.ConsistencyLevel.SERIAL
 +
 +        self.empty_lines = 0
 +        self.statement_error = False
 +        self.single_statement = single_statement
 +
 +    def refresh_schema_metadata_best_effort(self):
 +        """
 +        Refresh the driver's schema metadata, tolerating a failed first try.
 +
 +        If the first refresh raises, a warning is printed and the refresh is
 +        retried with argument 0; an exception from the retry propagates.
 +        """
 +        # NOTE(review): the numeric argument is presumably the driver's
 +        # schema-agreement wait in seconds -- confirm against the driver API.
 +        try:
 +            self.conn.refresh_schema_metadata(5)  # will throw exception if there is a schema mismatch
 +        except Exception:
 +            self.printerr("Warning: schema version mismatch detected, which might be caused by DOWN nodes; if "
 +                          "this is not the case, check the schema versions of your nodes in system.local and "
 +                          "system.peers.")
 +            self.conn.refresh_schema_metadata(0)
 +
 +    def set_expanded_cql_version(self, ver):
 +        # Stores both forms produced by full_cql_version: the padded version
 +        # string and its tuple form.
 +        ver, vertuple = full_cql_version(ver)
 +        self.cql_version = ver
 +        self.cql_ver_tuple = vertuple
 +
 +    def cqlver_atleast(self, major, minor=0, patch=0):
 +        # Tuple comparison on the first three components of the stored
 +        # version tuple; the remaining elements are ignored.
 +        return self.cql_ver_tuple[:3] >= (major, minor, patch)
 +
 +    def myformat_value(self, val, **kwargs):
 +        """
 +        Format `val` with this shell's colour, codec and display settings.
 +
 +        DecodeError values are recorded in self.decoding_errors. If
 +        formatting raises, the failure is wrapped in a FormatError, recorded
 +        as well, and the formatted error is returned instead.
 +        """
 +        # NOTE(review): self.decoding_errors is assigned in print_result and
 +        # not in __init__; confirm it is always set before this point.
 +        if isinstance(val, DecodeError):
 +            self.decoding_errors.append(val)
 +        try:
 +            dtformats = DateTimeFormat(timestamp_format=self.display_timestamp_format,
 +                                       date_format=self.display_date_format, nanotime_format=self.display_nanotime_format)
 +            return format_value(val, self.output_codec.name,
 +                                addcolor=self.color, date_time_format=dtformats,
 +                                float_precision=self.display_float_precision, **kwargs)
 +        except Exception, e:
 +            err = FormatError(val, e)
 +            self.decoding_errors.append(err)
 +            return format_value(err, self.output_codec.name, addcolor=self.color)
 +
 +    def myformat_colname(self, name, table_meta=None):
 +        """
 +        Format a column name for the result header.
 +
 +        With table metadata, the default colour becomes RED for partition key
 +        columns and CYAN for clustering key columns.
 +        """
 +        # Work on a copy so the shared colour map is not modified.
 +        column_colors = COLUMN_NAME_COLORS.copy()
 +        # check column role and color appropriately
 +        if table_meta:
 +            if name in [col.name for col in table_meta.partition_key]:
 +                column_colors.default_factory = lambda: RED
 +            elif name in [col.name for col in table_meta.clustering_key]:
 +                column_colors.default_factory = lambda: CYAN
 +        return self.myformat_value(name, colormap=column_colors)
 +
 +    def report_connection(self):
 +        # Prints the connection banner: host line first, then version line.
 +        self.show_host()
 +        self.show_version()
 +
 +    def show_host(self):
 +        # Prints the cluster name (coloured BLUE via applycolor), host and port.
 +        print "Connected to %s at %s:%d." % \
 +            (self.applycolor(self.get_cluster_name(), BLUE),
 +              self.hostname,
 +              self.port)
 +
 +    def show_version(self):
 +        # Prints cqlsh, Cassandra, CQL spec and native protocol versions.
 +        # Works on a copy so the cached connection_versions is not modified.
 +        vers = self.connection_versions.copy()
 +        vers['shver'] = version
 +        # system.Versions['cql'] apparently does not reflect changes with
 +        # set_cql_version.
 +        vers['cql'] = self.cql_version
 +        print "[cqlsh %(shver)s | Cassandra %(build)s | CQL spec %(cql)s | Native protocol v%(protocol)s]" % vers
 +
 +    def show_session(self, sessionid, partial_session=False):
 +        # Thin wrapper: delegates to print_trace_session with this shell and session.
 +        print_trace_session(self, self.session, sessionid, partial_session)
 +
 +    def get_connection_versions(self):
 +        """
 +        Read the server's release, native protocol and CQL versions from
 +        system.local and cache them in self.connection_versions.
 +        """
 +        # The single-element unpacking raises ValueError unless exactly one
 +        # row comes back.
 +        result, = self.session.execute("select * from system.local where key = 'local'")
 +        vers = {
 +            'build': result['release_version'],
 +            'protocol': result['native_protocol_version'],
 +            'cql': result['cql_version'],
 +        }
 +        self.connection_versions = vers
 +
 +    def get_keyspace_names(self):
 +        # Names of all keyspaces in the cluster metadata, converted with str().
 +        return map(str, self.conn.metadata.keyspaces.keys())
 +
 +    def get_columnfamily_names(self, ksname=None):
 +        # Table names of the given keyspace (current keyspace when None),
 +        # converted with str(). Raises KeyspaceNotFound for an unknown keyspace.
 +        if ksname is None:
 +            ksname = self.current_keyspace
 +
 +        return map(str, self.get_keyspace_meta(ksname).tables.keys())
 +
 +    def get_index_names(self, ksname=None):
 +        # Index names of the given keyspace (current keyspace when None),
 +        # converted with str(). Raises KeyspaceNotFound for an unknown keyspace.
 +        if ksname is None:
 +            ksname = self.current_keyspace
 +
 +        return map(str, self.get_keyspace_meta(ksname).indexes.keys())
 +
 +    def get_column_names(self, ksname, cfname):
 +        # Column names of a table, in the iteration order of the table
 +        # metadata's `columns` mapping; ksname None means the current keyspace.
 +        if ksname is None:
 +            ksname = self.current_keyspace
 +        layout = self.get_table_meta(ksname, cfname)
 +        return [str(col) for col in layout.columns]
 +
 +    def get_usertype_names(self, ksname=None):
 +        # User type names of the given keyspace (current keyspace when None).
 +        # Unlike the sibling name getters, the keys are returned without str().
 +        if ksname is None:
 +            ksname = self.current_keyspace
 +
 +        return self.get_keyspace_meta(ksname).user_types.keys()
 +
 +    def get_usertype_layout(self, ksname, typename):
 +        """
 +        Return the fields of a user type as a list of
 +        (field_name, parameterized CQL type string) pairs.
 +
 +        ksname None means the current keyspace. Raises UserTypeNotFound when
 +        the type is not defined in that keyspace.
 +        """
 +        if ksname is None:
 +            ksname = self.current_keyspace
 +
 +        ks_meta = self.get_keyspace_meta(ksname)
 +
 +        try:
 +            user_type = ks_meta.user_types[typename]
 +        except KeyError:
 +            raise UserTypeNotFound("User type %r not found" % typename)
 +
 +        return [(field_name, field_type.cql_parameterized_type())
 +                for field_name, field_type in zip(user_type.field_names, user_type.field_types)]
 +
 +    def get_userfunction_names(self, ksname=None):
 +        # The `name` of every function in the given keyspace's metadata
 +        # (current keyspace when None).
 +        if ksname is None:
 +            ksname = self.current_keyspace
 +
 +        return map(lambda f: f.name, self.get_keyspace_meta(ksname).functions.values())
 +
 +    def get_useraggregate_names(self, ksname=None):
 +        # The `name` of every aggregate in the given keyspace's metadata
 +        # (current keyspace when None).
 +        if ksname is None:
 +            ksname = self.current_keyspace
 +
 +        return map(lambda f: f.name, self.get_keyspace_meta(ksname).aggregates.values())
 +
 +    def get_cluster_name(self):
 +        # Cluster name as reported by the driver's metadata.
 +        return self.conn.metadata.cluster_name
 +
 +    def get_partitioner(self):
 +        # Partitioner as reported by the driver's metadata.
 +        return self.conn.metadata.partitioner
 +
 +    def get_keyspace_meta(self, ksname):
 +        # Returns the metadata object for `ksname`; raises KeyspaceNotFound
 +        # when the name is not in the cluster metadata.
 +        if ksname not in self.conn.metadata.keyspaces:
 +            raise KeyspaceNotFound('Keyspace %r not found.' % ksname)
 +        return self.conn.metadata.keyspaces[ksname]
 +
 +    def get_keyspaces(self):
 +        # Metadata objects (not names) of all keyspaces.
 +        return self.conn.metadata.keyspaces.values()
 +
 +    def get_ring(self, ks):
 +        # Asks the driver to (re)build the token map for keyspace `ks`, then
 +        # returns that keyspace's entry of the tokens-to-hosts mapping.
 +        self.conn.metadata.token_map.rebuild_keyspace(ks, build_if_absent=True)
 +        return self.conn.metadata.token_map.tokens_to_hosts_by_ks[ks]
 +
 +    def get_table_meta(self, ksname, tablename):
 +        if ksname is None:
 +            ksname = self.current_keyspace
 +        ksmeta = self.get_keyspace_meta(ksname)
 +
 +        if tablename not in ksmeta.tables:
 +            if ksname == 'system_auth' and tablename in ['roles', 'role_permissions']:
 +                self.get_fake_auth_table_meta(ksname, tablename)
 +            else:
 +                raise ColumnFamilyNotFound("Column family %r not found" % tablename)
 +        else:
 +            return ksmeta.tables[tablename]
 +
 +    def get_fake_auth_table_meta(self, ksname, tablename):
 +        # may be using external auth implementation so internal tables
 +        # aren't actually defined in schema. In this case, we'll fake
 +        # them up
 +        if tablename == 'roles':
 +            ks_meta = KeyspaceMetadata(ksname, True, None, None)
 +            table_meta = TableMetadata(ks_meta, 'roles')
 +            table_meta.columns['role'] = ColumnMetadata(table_meta, 'role', cassandra.cqltypes.UTF8Type)
 +            table_meta.columns['is_superuser'] = ColumnMetadata(table_meta, 'is_superuser', cassandra.cqltypes.BooleanType)
 +            table_meta.columns['can_login'] = ColumnMetadata(table_meta, 'can_login', cassandra.cqltypes.BooleanType)
 +        elif tablename == 'role_permissions':
 +            ks_meta = KeyspaceMetadata(ksname, True, None, None)
 +            table_meta = TableMetadata(ks_meta, 'role_permissions')
 +            table_meta.columns['role'] = ColumnMetadata(table_meta, 'role', cassandra.cqltypes.UTF8Type)
 +            table_meta.columns['resource'] = ColumnMetadata(table_meta, 'resource', cassandra.cqltypes.UTF8Type)
 +            table_meta.columns['permission'] = ColumnMetadata(table_meta, 'permission', cassandra.cqltypes.UTF8Type)
 +        else:
 +            raise ColumnFamilyNotFound("Column family %r not found" % tablename)
 +
 +    def get_index_meta(self, ksname, idxname):
 +        # Returns the metadata of index `idxname`; ksname None means the
 +        # current keyspace. Raises IndexNotFound for an unknown index.
 +        if ksname is None:
 +            ksname = self.current_keyspace
 +        ksmeta = self.get_keyspace_meta(ksname)
 +
 +        if idxname not in ksmeta.indexes:
 +            raise IndexNotFound("Index %r not found" % idxname)
 +
 +        return ksmeta.indexes[idxname]
 +
 +    def get_object_meta(self, ks, name):
 +        """
 +        Resolve a possibly-qualified name to keyspace, table or index metadata.
 +
 +        When `name` is None, `ks` is tried as a keyspace name first and
 +        otherwise reinterpreted as an object name in the current keyspace.
 +        Tables take precedence over indexes. Raises ObjectNotFound when
 +        nothing matches.
 +        """
 +        if name is None:
 +            if ks and ks in self.conn.metadata.keyspaces:
 +                return self.conn.metadata.keyspaces[ks]
 +            elif self.current_keyspace is None:
 +                raise ObjectNotFound("%r not found in keyspaces" % (ks))
 +            else:
 +                # The single identifier is an object in the current keyspace.
 +                name = ks
 +                ks = self.current_keyspace
 +
 +        if ks is None:
 +            ks = self.current_keyspace
 +
 +        ksmeta = self.get_keyspace_meta(ks)
 +
 +        if name in ksmeta.tables:
 +            return ksmeta.tables[name]
 +        elif name in ksmeta.indexes:
 +            return ksmeta.indexes[name]
 +
 +        raise ObjectNotFound("%r not found in keyspace %r" % (name, ks))
 +
 +    def get_usertypes_meta(self):
 +        # Builds a UserTypesMeta from the rows of system.schema_usertypes, or
 +        # an empty one when the query result is falsy.
 +        data = self.session.execute("select * from system.schema_usertypes")
 +        if not data:
 +            return cql3handling.UserTypesMeta({})
 +
 +        return cql3handling.UserTypesMeta.from_layout(data)
 +
 +    def get_trigger_names(self, ksname=None):
 +        # Names of all triggers across every table of the given keyspace
 +        # (current keyspace when None).
 +        if ksname is None:
 +            ksname = self.current_keyspace
 +
 +        return [trigger.name
 +                for table in self.get_keyspace_meta(ksname).tables.values()
 +                for trigger in table.triggers.values()]
 +
 +    def reset_statement(self):
 +        # Restores the primary prompt, empties the statement buffer and
 +        # resets the empty-line counter.
 +        self.reset_prompt()
 +        self.statement.truncate(0)
 +        self.empty_lines = 0
 +
 +    def reset_prompt(self):
 +        # Sets the primary prompt: the default one, or the keyspace prompt
 +        # filled with the current keyspace name.
 +        if self.current_keyspace is None:
 +            self.set_prompt(self.default_prompt, True)
 +        else:
 +            self.set_prompt(self.keyspace_prompt % self.current_keyspace, True)
 +
 +    def set_continue_prompt(self):
 +        """
 +        Switch to the continuation prompt for an unfinished statement.
 +
 +        Once the empty-line counter reaches three, a reminder about the
 +        terminating ';' is used as the prompt instead and the counter resets.
 +        """
 +        if self.empty_lines >= 3:
 +            self.set_prompt("Statements are terminated with a ';'.  You can press CTRL-C to cancel an incomplete statement.")
 +            self.empty_lines = 0
 +            return
 +        if self.current_keyspace is None:
 +            self.set_prompt(self.continue_prompt)
 +        else:
 +            # Pad so the continuation marker lines up under the keyspace prompt.
 +            spaces = ' ' * len(str(self.current_keyspace))
 +            self.set_prompt(self.keyspace_continue_prompt % spaces)
 +        # Parses as (empty_lines + 1) if the last input was empty, else 0.
 +        self.empty_lines = self.empty_lines + 1 if not self.lastcmd else 0
 +
 +    @contextmanager
 +    def prepare_loop(self):
 +        """
 +        Context manager that installs tab completion for the command loop
 +        and restores the previous completer on exit.
 +
 +        Completion is only set up for a tty with a completion key and when
 +        the readline module can be imported.
 +        """
 +        # Stays None when readline is not used, which tells the cleanup in
 +        # the finally clause there is nothing to restore.
 +        readline = None
 +        if self.tty and self.completekey:
 +            try:
 +                import readline
 +            except ImportError:
 +                if myplatform == 'Windows':
 +                    print "WARNING: pyreadline dependency missing.  Install to enable tab completion."
 +                pass
 +            else:
 +                old_completer = readline.get_completer()
 +                readline.set_completer(self.complete)
 +                # A libedit-backed readline needs its own binding syntax.
 +                if readline.__doc__ is not None and 'libedit' in readline.__doc__:
 +                    readline.parse_and_bind("bind -e")
 +                    readline.parse_and_bind("bind '" + self.completekey + "' rl_complete")
 +                    readline.parse_and_bind("bind ^R em-inc-search-prev")
 +                else:
 +                    readline.parse_and_bind(self.completekey + ": complete")
 +        try:
 +            yield
 +        finally:
 +            if readline is not None:
 +                readline.set_completer(old_completer)
 +
 +    def get_input_line(self, prompt=''):
 +        """
 +        Read one line of input and return it terminated as read.
 +
 +        On a tty the line comes from raw_input and a newline is appended;
 +        otherwise it is read from self.stdin, and an empty read raises
 +        EOFError. Increments self.lineno and updates self.lastcmd.
 +        """
 +        if self.tty:
 +            self.lastcmd = raw_input(prompt)
 +            line = self.lastcmd + '\n'
 +        else:
 +            self.lastcmd = self.stdin.readline()
 +            line = self.lastcmd
 +            # readline() returns an empty string only at end of file.
 +            if not len(line):
 +                raise EOFError
 +        self.lineno += 1
 +        return line
 +
 +    def use_stdin_reader(self, until='', prompt=''):
 +        """
 +        Generator yielding input lines until end of input or until a line
 +        equal to `until` (followed by a newline) is read; the terminator
 +        line itself is not yielded.
 +        """
 +        until += '\n'
 +        while True:
 +            try:
 +                newline = self.get_input_line(prompt=prompt)
 +            except EOFError:
 +                return
 +            if newline == until:
 +                return
 +            yield newline
 +
 +    def cmdloop(self):
 +        """
 +        Adapted from cmd.Cmd's version, because there is literally no way with
 +        cmd.Cmd.cmdloop() to tell the difference between "EOF" showing up in
 +        input and an actual EOF.
 +        """
 +        with self.prepare_loop():
 +            while not self.stop:
 +                try:
 +                    if self.single_statement:
 +                        # One-shot mode: run the given statement, then stop.
 +                        line = self.single_statement
 +                        self.stop = True
 +                    else:
 +                        line = self.get_input_line(self.prompt)
 +                    # Lines accumulate until onecmd reports the statement as handled.
 +                    self.statement.write(line)
 +                    if self.onecmd(self.statement.getvalue()):
 +                        self.reset_statement()
 +                except EOFError:
 +                    self.handle_eof()
 +                except CQL_ERRORS, cqlerr:
 +                    self.printerr(str(cqlerr))
 +                except KeyboardInterrupt:
 +                    # Discard the partial statement and move to a fresh line.
 +                    self.reset_statement()
 +                    print
 +
 +    def onecmd(self, statementtext):
 +        """
 +        Returns true if the statement is complete and was handled (meaning it
 +        can be reset).
 +        """
 +
 +        try:
 +            statements, in_batch = cqlruleset.cql_split_statements(statementtext)
 +        except pylexotron.LexingError, e:
 +            if self.show_line_nums:
 +                self.printerr('Invalid syntax at char %d' % (e.charnum,))
 +            else:
 +                self.printerr('Invalid syntax at line %d, char %d'
 +                              % (e.linenum, e.charnum))
 +            # Echo the offending line with a caret under the error position.
 +            statementline = statementtext.split('\n')[e.linenum - 1]
 +            self.printerr('  %s' % statementline)
 +            self.printerr(' %s^' % (' ' * e.charnum))
 +            return True
 +
 +        # Drop trailing empty statements.
 +        while statements and not statements[-1]:
 +            statements = statements[:-1]
 +        if not statements:
 +            return True
 +        # Inside a batch, or last statement not terminated: ask for more
 +        # input. The bare return yields None, i.e. "not complete".
 +        if in_batch or statements[-1][-1][0] != 'endtoken':
 +            self.set_continue_prompt()
 +            return
 +        for st in statements:
 +            try:
 +                self.handle_statement(st, statementtext)
 +            except Exception, e:
 +                # A failing statement does not stop the remaining ones.
 +                if self.debug:
 +                    traceback.print_exc()
 +                else:
 +                    self.printerr(e)
 +        return True
 +
 +    def handle_eof(self):
 +        # At end of input: run whatever is left in the statement buffer,
 +        # report it if it is incomplete, then exit the shell.
 +        if self.tty:
 +            print
 +        statement = self.statement.getvalue()
 +        if statement.strip():
 +            if not self.onecmd(statement):
 +                self.printerr('Incomplete statement at end of file')
 +        self.do_exit()
 +
 +    def handle_statement(self, tokens, srcstr):
 +        """
 +        Dispatch one tokenized statement.
 +
 +        If a do_<command> method exists for the first word, the statement is
 +        parsed as a cqlsh command and handed to it (or to handle_parse_error
 +        when the parse is incomplete); otherwise the original text is sent
 +        to the server via perform_statement.
 +        """
 +        # Concat multi-line statements and insert into history
 +        if readline is not None:
 +            nl_count = srcstr.count("\n")
 +
 +            new_hist = srcstr.replace("\n", " ").rstrip()
 +
 +            if nl_count > 1 and self.last_hist != new_hist:
 +                readline.add_history(new_hist)
 +
 +            self.last_hist = new_hist
 +        cmdword = tokens[0][1]
 +        # '?' is an alias for the help command.
 +        if cmdword == '?':
 +            cmdword = 'help'
 +        custom_handler = getattr(self, 'do_' + cmdword.lower(), None)
 +        if custom_handler:
 +            parsed = cqlruleset.cql_whole_parse_tokens(tokens, srcstr=srcstr,
 +                                                       startsymbol='cqlshCommand')
 +            if parsed and not parsed.remainder:
 +                # successful complete parse
 +                return custom_handler(parsed)
 +            else:
 +                return self.handle_parse_error(cmdword, tokens, parsed, srcstr)
 +        return self.perform_statement(cqlruleset.cql_extract_orig(tokens, srcstr))
 +
 +    def handle_parse_error(self, cmdword, tokens, parsed, srcstr):
 +        # Called when a command did not parse completely. Listed CQL verbs
 +        # are forwarded to the server anyway; anything else is reported.
 +        if cmdword.lower() in ('select', 'insert', 'update', 'delete', 'truncate',
 +                               'create', 'drop', 'alter', 'grant', 'revoke',
 +                               'batch', 'list'):
 +            # hey, maybe they know about some new syntax we don't. type
 +            # assumptions won't work, but maybe the query will.
 +            return self.perform_statement(cqlruleset.cql_extract_orig(tokens, srcstr))
 +        if parsed:
 +            self.printerr('Improper %s command (problem at %r).' % (cmdword, parsed.remainder[0]))
 +        else:
 +            self.printerr('Improper %s command.' % cmdword)
 +
 +    def do_use(self, parsed):
 +        # Runs USE on the server and, on success, records the new current
 +        # keyspace: double-quoted names are unquoted, others are lower-cased.
 +        ksname = parsed.get_binding('ksname')
 +        success, _ = self.perform_simple_statement(SimpleStatement(parsed.extract_orig()))
 +        if success:
 +            if ksname[0] == '"' and ksname[-1] == '"':
 +                self.current_keyspace = self.cql_unprotect_name(ksname)
 +            else:
 +                self.current_keyspace = ksname.lower()
 +
 +    def do_select(self, parsed):
 +        # Runs a SELECT, with tracing switched off for the duration when the
 +        # query targets the system_traces keyspace.
 +        # NOTE(review): the previous tracing flag is not restored if
 +        # perform_statement raises.
 +        tracing_was_enabled = self.tracing_enabled
 +        ksname = parsed.get_binding('ksname')
 +        stop_tracing = ksname == 'system_traces' or (ksname is None and self.current_keyspace == 'system_traces')
 +        self.tracing_enabled = self.tracing_enabled and not stop_tracing
 +        statement = parsed.extract_orig()
 +        self.perform_statement(statement)
 +        self.tracing_enabled = tracing_was_enabled
 +
 +    def perform_statement(self, statement):
 +        """
 +        Execute a CQL string with the shell's consistency and paging settings.
 +
 +        Prints server warnings and, when tracing is enabled, the query
 +        traces. Returns the success flag from perform_simple_statement.
 +        """
 +        stmt = SimpleStatement(statement, consistency_level=self.consistency_level, serial_consistency_level=self.serial_consistency_level, fetch_size=self.page_size if self.use_paging else None)
 +        success, future = self.perform_simple_statement(stmt)
 +
 +        if future:
 +            if future.warnings:
 +                self.print_warnings(future.warnings)
 +
 +            if self.tracing_enabled:
 +                try:
 +                    for trace in future.get_all_query_traces(self.max_trace_wait):
 +                        print_trace(self, trace)
 +                except TraceUnavailable:
 +                    # Fall back to showing whatever trace data exists so far.
 +                    msg = "Statement trace did not complete within %d seconds; trace data may be incomplete." % (self.session.max_trace_wait,)
 +                    self.writeresult(msg, color=RED)
 +                    for trace_id in future.get_query_trace_ids():
 +                        self.show_session(trace_id, partial_session=True)
 +                except Exception, err:
 +                    self.printerr("Unable to fetch query trace: %s" % (str(err),))
 +
 +        return success
 +
 +    def parse_for_table_meta(self, query_string):
 +        # Parses the query to find the keyspace/table it names and returns
 +        # that table's metadata, or None when the parse result has no
 +        # element at index 1.
 +        try:
 +            parsed = cqlruleset.cql_parse(query_string)[1]
 +        except IndexError:
 +            return None
 +        ks = self.cql_unprotect_name(parsed.get_binding('ksname', None))
 +        cf = self.cql_unprotect_name(parsed.get_binding('cfname'))
 +        return self.get_table_meta(ks, cf)
 +
 +    def perform_simple_statement(self, statement):
 +        """
 +        Execute a statement object and print its result.
 +
 +        Returns (success, future): (False, None) for a falsy statement or
 +        any execution error (which is printed), otherwise (True, future).
 +        """
 +        if not statement:
 +            return False, None
 +        # NOTE(review): `rows` is never read afterwards.
 +        rows = None
 +        # Every path through the body breaks or returns, so this runs once.
 +        while True:
 +            try:
 +                future = self.session.execute_async(statement, trace=self.tracing_enabled)
 +                result = future.result()
 +                break
 +            except cassandra.OperationTimedOut, err:
 +                self.refresh_schema_metadata_best_effort()
 +                self.printerr(str(err.__class__.__name__) + ": " + str(err))
 +                return False, None
 +            except CQL_ERRORS, err:
 +                self.printerr(str(err.__class__.__name__) + ": " + str(err))
 +                return False, None
 +            except Exception, err:
 +                import traceback
 +                self.printerr(traceback.format_exc())
 +                return False, None
 +
 +        # Choose the table metadata used for the header by statement kind.
 +        if statement.query_string[:6].lower() == 'select':
 +            self.print_result(result, self.parse_for_table_meta(statement.query_string))
 +        elif statement.query_string.lower().startswith("list users") or statement.query_string.lower().startswith("list roles"):
 +            self.print_result(result, self.get_table_meta('system_auth', 'roles'))
 +        elif statement.query_string.lower().startswith("list"):
 +            self.print_result(result, self.get_table_meta('system_auth', 'role_permissions'))
 +        elif result:
 +            # CAS INSERT/UPDATE
 +            self.writeresult("")
 +            self.print_static_result(list(result), self.parse_for_table_meta(statement.query_string))
 +        self.flush_output()
 +        return True, future
 +
 +    def print_result(self, result, table_meta):
 +        """
 +        Print a query result followed by its row count.
 +
 +        On a tty with more pages pending, pages are printed one at a time
 +        with a "---MORE---" pause in between; otherwise all rows are printed
 +        at once. At most two decoding errors are reported afterwards.
 +        """
 +        # Fresh error list for this result; myformat_value appends to it.
 +        self.decoding_errors = []
 +
 +        self.writeresult("")
 +        if result.has_more_pages and self.tty:
 +            num_rows = 0
 +            while True:
 +                page = result.current_rows
 +                if page:
 +                    num_rows += len(page)
 +                    self.print_static_result(page, table_meta)
 +                if result.has_more_pages:
 +                    raw_input("---MORE---")
 +                    result.fetch_next_page()
 +                else:
 +                    break
 +        else:
 +            rows = list(result)
 +            num_rows = len(rows)
 +            self.print_static_result(rows, table_meta)
 +        self.writeresult("(%d rows)" % num_rows)
 +
 +        if self.decoding_errors:
 +            for err in self.decoding_errors[:2]:
 +                self.writeresult(err.message(), color=RED)
 +            if len(self.decoding_errors) > 2:
 +                self.writeresult('%d more decoding errors suppressed.'
 +                                 % (len(self.decoding_errors) - 2), color=RED)
 +
 +    def print_static_result(self, rows, table_meta):
 +        """
 +        Format and print a list of rows.
 +
 +        With no rows, only the header built from the table metadata is
 +        printed (nothing at all without metadata). Otherwise column names
 +        come from the first row, and the layout follows expand_enabled.
 +        """
 +        if not rows:
 +            if not table_meta:
 +                return
 +            # print header only
 +            colnames = table_meta.columns.keys()  # full header
 +            formatted_names = [self.myformat_colname(name, table_meta) for name in colnames]
 +            self.print_formatted_result(formatted_names, None)
 +            return
 +
 +        colnames = rows[0].keys()
 +        formatted_names = [self.myformat_colname(name, table_meta) for name in colnames]
 +        formatted_values = [map(self.myformat_value, row.values()) for row in rows]
 +
 +        if self.expand_enabled:
 +            self.print_formatted_result_vertically(formatted_names, formatted_values)
 +        else:
 +            self.print_formatted_result(formatted_names, formatted_values)
 +
 +    def print_formatted_result(self, formatted_names, formatted_values):
 +        """
 +        Print a horizontal table: a header line, a dashed rule, then one line
 +        per row. formatted_values may be None, in which case only the header
 +        and the rule are printed.
 +        """
 +        have_rows = formatted_values is not None
 +
 +        # every column is as wide as its widest cell, header included
 +        widths = [name.displaywidth for name in formatted_names]
 +        if have_rows:
 +            for cells in formatted_values:
 +                for idx, cell in enumerate(cells):
 +                    if cell.displaywidth > widths[idx]:
 +                        widths[idx] = cell.displaywidth
 +
 +        # header (left-justified) and the rule underneath it
 +        padded_names = [name.ljust(width, color=self.color)
 +                        for (name, width) in zip(formatted_names, widths)]
 +        self.writeresult(' ' + ' | '.join(padded_names).rstrip())
 +        rule = '-+-'.join(['-' * width for width in widths])
 +        self.writeresult('-%s-' % rule)
 +
 +        # row data (right-justified), if there is any
 +        if have_rows:
 +            for cells in formatted_values:
 +                padded_cells = [cell.rjust(width, color=self.color)
 +                                for (cell, width) in zip(cells, widths)]
 +                self.writeresult(' ' + ' | '.join(padded_cells))
 +
 +        self.writeresult("")
 +
 +    def print_formatted_result_vertically(self, formatted_names, formatted_values):
 +        """
 +        Print rows in expanded form: one "@ Row N" section per row, listing
 +        each column name next to its value.
 +        """
 +        name_width = max(name.displaywidth for name in formatted_names)
 +        value_width = max(cell.displaywidth for cells in formatted_values for cell in cells)
 +        # the rule is the same for every row, so build it once
 +        rule = '-%s-' % '-+-'.join(['-' * name_width, '-' * value_width])
 +
 +        for row_number, cells in enumerate(formatted_values, 1):
 +            self.writeresult("@ Row %d" % row_number)
 +            self.writeresult(rule)
 +            for idx, cell in enumerate(cells):
 +                label = formatted_names[idx].ljust(name_width, color=self.color)
 +                text = cell.ljust(cell.displaywidth, color=self.color)
 +                self.writeresult(' ' + " | ".join([label, text]))
 +            self.writeresult('')
 +
 +    def print_warnings(self, warnings):
 +        """
 +        Print each warning under a 'Warnings :' heading; does nothing when
 +        warnings is None or empty.
 +        """
 +        if warnings is not None and len(warnings) > 0:
 +            self.writeresult('')
 +            self.writeresult('Warnings :')
 +            for message in warnings:
 +                self.writeresult(message)
 +                self.writeresult('')
 +
 +    def emptyline(self):
 +        # Deliberately a no-op. NOTE(review): this looks like an override of
 +        # cmd.Cmd.emptyline, whose stock behaviour is to repeat the last
 +        # command -- confirm against the class definition.
 +        pass
 +
 +    def parseline(self, line):
 +        # NOTE(review): presumably replaces cmd.Cmd.parseline; any call fails
 +        # loudly with NotImplementedError rather than being mis-parsed.
 +        # this shouldn't be needed
 +        raise NotImplementedError
 +
 +    def complete(self, text, state):
 +        """
 +        Return the state-th completion for text, or None when there are no
 +        more. The candidate list is computed on the first call (state == 0)
 +        and reused for the following states.
 +        """
 +        if readline is None:
 +            return None
 +        if state == 0:
 +            try:
 +                self.completion_matches = self.find_completions(text)
 +            except Exception:
 +                # outside of completion debugging the error is passed on
 +                if not debug_completion:
 +                    raise
 +                import traceback
 +                traceback.print_exc()
 +        try:
 +            candidate = self.completion_matches[state]
 +        except IndexError:
 +            candidate = None
 +        return candidate
 +
 +    def find_completions(self, text):
 +        """
 +        Ask the CQL rule set for completions of text, giving it everything
 +        typed so far in the statement up to the start of the word being
 +        completed.
 +        """
 +        typed = readline.get_line_buffer()
 +        earlier = self.statement.getvalue()
 +        # readline offsets are relative to the current line only
 +        prefix_end = readline.get_begidx() + len(earlier)
 +        prefix = (earlier + typed)[:prefix_end]
 +        return cqlruleset.cql_complete(prefix, text, cassandra_conn=self,
 +                                       debug=debug_completion, startsymbol='cqlshCommand')
 +
 +    def set_prompt(self, prompt, prepend_user=False):
 +        """
 +        Set the shell prompt, prefixed with "<username>@" when prepend_user
 +        is set and a username is known.
 +        """
 +        if prepend_user and self.username:
 +            prompt = "%s@%s" % (self.username, prompt)
 +        self.prompt = prompt
 +
 +    def cql_unprotect_name(self, namestr):
 +        """
 +        Return namestr passed through cqlruleset.dequote_name, or None when
 +        namestr is None.
 +        """
 +        if namestr is not None:
 +            return cqlruleset.dequote_name(namestr)
 +
 +    def cql_unprotect_value(self, valstr):
 +        """
 +        Return valstr passed through cqlruleset.dequote_value, or None when
 +        valstr is None.
 +        """
 +        if valstr is None:
 +            return None
 +        return cqlruleset.dequote_value(valstr)
 +
 +    def print_recreate_keyspace(self, ksdef, out):
 +        """
 +        Write the exported string form of the keyspace definition ksdef to
 +        the out stream, followed by a newline.
 +        """
 +        exported = ksdef.export_as_string()
 +        out.write(exported)
 +        out.write("\n")
 +
 +    def print_recreate_columnfamily(self, ksname, cfname, out):
 +        """
 +        Write to the out stream the CQL commands that recreate table cfname
 +        of keyspace ksname, in a form that can be pasted back into a CQL
 +        session.
 +        """
 +        table_meta = self.get_table_meta(ksname, cfname)
 +        exported = table_meta.export_as_string()
 +        out.write(exported)
 +        out.write("\n")
 +
 +    def print_recreate_index(self, ksname, idxname, out):
 +        """
 +        Write to the out stream the CQL commands that recreate index idxname
 +        of keyspace ksname, in a form that can be pasted back into a CQL
 +        session.
 +        """
 +        index_meta = self.get_index_meta(ksname, idxname)
 +        exported = index_meta.export_as_string()
 +        out.write(exported)
 +        out.write("\n")
 +
 +    def print_recreate_object(self, ks, name, out):
 +        """
 +        Write to the out stream the CQL commands that recreate the object
 +        (ks, table or index) called name, in a form that can be pasted back
 +        into a CQL session.
 +        """
 +        object_meta = self.get_object_meta(ks, name)
 +        exported = object_meta.export_as_string()
 +        out.write(exported)
 +        out.write("\n")
 +
 +    def describe_keyspaces(self):
 +        """Print the names of all keyspaces in columns, framed by blank lines."""
 +        print
 +        names = protect_names(self.get_keyspace_names())
 +        cmd.Cmd.columnize(self, names)
 +        print
 +
 +    def describe_keyspace(self, ksname):
 +        """Print to stdout the CQL that recreates keyspace ksname, framed by blank lines."""
 +        print
 +        ksmeta = self.get_keyspace_meta(ksname)
 +        self.print_recreate_keyspace(ksmeta, sys.stdout)
 +        print
 +
 +    def describe_columnfamily(self, ksname, cfname):
 +        """
 +        Print to stdout the CQL that recreates table cfname. ksname falls
 +        back to the current keyspace; NoKeyspaceError is raised when neither
 +        is available.
 +        """
 +        if ksname is None:
 +            ksname = self.current_keyspace
 +            if ksname is None:
 +                raise NoKeyspaceError("No keyspace specified and no current keyspace")
 +        print
 +        out = sys.stdout
 +        self.print_recreate_columnfamily(ksname, cfname, out)
 +        print
 +
 +    def describe_index(self, ksname, idxname):
 +        """Print to stdout the CQL that recreates index idxname, framed by blank lines."""
 +        print
 +        out = sys.stdout
 +        self.print_recreate_index(ksname, idxname, out)
 +        print
 +
 +    def describe_object(self, ks, name):
 +        """Print to stdout the CQL that recreates the object called name, framed by blank lines."""
 +        print
 +        out = sys.stdout
 +        self.print_recreate_object(ks, name, out)
 +        print
 +
 +    def describe_columnfamilies(self, ksname):
 +        """
 +        Print the table names of keyspace ksname in columns, or, when ksname
 +        is None, the table names of every keyspace under a per-keyspace
 +        heading.
 +        """
 +        print
 +        if ksname is not None:
 +            cmd.Cmd.columnize(self, protect_names(self.get_columnfamily_names(ksname)))
 +            print
 +            return
 +        for ksmeta in self.get_keyspaces():
 +            heading = protect_name(ksmeta.name)
 +            print 'Keyspace %s' % (heading,)
 +            print '---------%s' % ('-' * len(heading))
 +            tables = protect_names(self.get_columnfamily_names(ksmeta.name))
 +            cmd.Cmd.columnize(self, tables)
 +            print
 +
 +    def describe_functions(self, ksname):
 +        """
 +        Print the user-defined function names of keyspace ksname in columns,
 +        or, when ksname is None, those of every keyspace under a per-keyspace
 +        heading.
 +        """
 +        print
 +        if ksname is not None:
 +            single = self.get_keyspace_meta(ksname)
 +            cmd.Cmd.columnize(self, protect_names(single.functions.keys()))
 +            print
 +            return
 +        for each in self.get_keyspaces():
 +            heading = protect_name(each.name)
 +            print 'Keyspace %s' % (heading,)
 +            print '---------%s' % ('-' * len(heading))
 +            cmd.Cmd.columnize(self, protect_names(each.functions.keys()))
 +            print
 +
 +    def describe_function(self, ksname, functionname):
 +        """
 +        Print the CQL of every user-defined function called functionname in
 +        keyspace ksname (default: the current keyspace). Raises
 +        NoKeyspaceError when no keyspace can be determined and
 +        FunctionNotFound when nothing matches.
 +        """
 +        if ksname is None:
 +            ksname = self.current_keyspace
 +            if ksname is None:
 +                raise NoKeyspaceError("No keyspace specified and no current keyspace")
 +        print
 +        ksmeta = self.get_keyspace_meta(ksname)
 +        # several entries can carry the same name, so collect all of them
 +        matches = [func for func in ksmeta.functions.values() if func.name == functionname]
 +        if not matches:
 +            raise FunctionNotFound("User defined function %r not found" % functionname)
 +        print "\n\n".join(func.as_cql_query(formatted=True) for func in matches)
 +        print
 +
 +    def describe_aggregates(self, ksname):
 +        """
 +        Print the user-defined aggregate names of keyspace ksname in columns,
 +        or, when ksname is None, those of every keyspace under a per-keyspace
 +        heading.
 +        """
 +        print
 +        if ksname is not None:
 +            single = self.get_keyspace_meta(ksname)
 +            cmd.Cmd.columnize(self, protect_names(single.aggregates.keys()))
 +            print
 +            return
 +        for each in self.get_keyspaces():
 +            heading = protect_name(each.name)
 +            print 'Keyspace %s' % (heading,)
 +            print '---------%s' % ('-' * len(heading))
 +            cmd.Cmd.columnize(self, protect_names(each.aggregates.keys()))
 +            print
 +
 +    def describe_aggregate(self, ksname, aggregatename):
 +        """
 +        Print the CQL of every user-defined aggregate called aggregatename in
 +        keyspace ksname (default: the current keyspace). Raises
 +        NoKeyspaceError when no keyspace can be determined and
 +        FunctionNotFound when nothing matches.
 +        """
 +        if ksname is None:
 +            ksname = self.current_keyspace
 +            if ksname is None:
 +                raise NoKeyspaceError("No keyspace specified and no current keyspace")
 +        print
 +        ksmeta = self.get_keyspace_meta(ksname)
 +        # several entries can carry the same name, so collect all of them
 +        matches = [aggr for aggr in ksmeta.aggregates.values() if aggr.name == aggregatename]
 +        if not matches:
 +            raise FunctionNotFound("User defined aggregate %r not found" % aggregatename)
 +        print "\n\n".join(aggr.as_cql_query(formatted=True) for aggr in matches)
 +        print
 +
 +    def describe_usertypes(self, ksname):
 +        """
 +        Print the user type names of keyspace ksname in columns, or, when
 +        ksname is None, those of every keyspace under a per-keyspace heading.
 +        """
 +        print
 +        if ksname is not None:
 +            single = self.get_keyspace_meta(ksname)
 +            cmd.Cmd.columnize(self, protect_names(single.user_types.keys()))
 +            print
 +            return
 +        for each in self.get_keyspaces():
 +            heading = protect_name(each.name)
 +            print 'Keyspace %s' % (heading,)
 +            print '---------%s' % ('-' * len(heading))
 +            cmd.Cmd.columnize(self, protect_names(each.user_types.keys()))
 +            print
 +
 +    def describe_usertype(self, ksname, typename):
 +        """
 +        Print the CQL of user type typename in keyspace ksname (default: the
 +        current keyspace). Raises NoKeyspaceError when no keyspace can be
 +        determined and UserTypeNotFound when the type is unknown.
 +        """
 +        if ksname is None:
 +            ksname = self.current_keyspace
 +            if ksname is None:
 +                raise NoKeyspaceError("No keyspace specified and no current keyspace")
 +        print
 +        ksmeta = self.get_keyspace_meta(ksname)
 +        try:
 +            usertype = ksmeta.user_types[typename]
 +        except KeyError:
 +            raise UserTypeNotFound("User type %r not found" % typename)
 +        else:
 +            print usertype.as_cql_query(formatted=True)
 +            print
 +
 +    def describe_cluster(self):
 +        """
 +        Print the cluster name and partitioner and, when the current keyspace
 +        is set and is not 'system', the hosts owning each ring entry.
 +        """
 +        print '\nCluster: %s' % self.get_cluster_name()
 +        partitioner = trim_if_present(self.get_partitioner(), 'org.apache.cassandra.dht.')
 +        print 'Partitioner: %s\n' % partitioner
 +        # TODO: snitch?
 +        # snitch = trim_if_present(self.get_snitch(), 'org.apache.cassandra.locator.')
 +        # print 'Snitch: %s\n' % snitch
 +        if self.current_keyspace is None or self.current_keyspace == 'system':
 +            return
 +        print "Range ownership:"
 +        ring = self.get_ring(self.current_keyspace)
 +        for token, hosts in ring.items():
 +            addresses = ', '.join([host.address for host in hosts])
 +            print ' %39s  [%s]' % (str(token.value), addresses)
 +        print
 +
 +    def describe_schema(self, include_system=False):
 +        """
 +        Print to stdout the CQL that recreates every keyspace; keyspaces
 +        listed in cql3handling.SYSTEM_KEYSPACES are left out unless
 +        include_system is set.
 +        """
 +        print
 +        for ksmeta in self.get_keyspaces():
 +            if not include_system and ksmeta.name in cql3handling.SYSTEM_KEYSPACES:
 +                continue
 +            self.print_recreate_keyspace(ksmeta, sys.stdout)
 +            print
 +
 +    def do_describe(self, parsed):
 +        """
 +        DESCRIBE [cqlsh only]
 +
 +        (DESC may be used as a shorthand.)
 +
 +          Outputs information about the connected Cassandra cluster, or about
 +          the data objects stored in the cluster. Use in one of the following ways:
 +
 +        DESCRIBE KEYSPACES
 +
 +          Output the names of all keyspaces.
 +
 +        DESCRIBE KEYSPACE [<keyspacename>]
 +
 +          Output CQL commands that could be used to recreate the given keyspace,
 +          and the objects in it (such as tables, types, functions, etc.).
 +          In some cases, as the CQL interface matures, there will be some metadata
 +          about a keyspace that is not representable with CQL. That metadata will not be shown.
 +
 +          The '<keyspacename>' argument may be omitted, in which case the current
 +          keyspace will be described.
 +
 +        DESCRIBE TABLES
 +
 +          Output the names of all tables in the current keyspace, or in all
 +          keyspaces if there is no current keyspace.
 +
 +        DESCRIBE TABLE [<keyspace>.]<tablename>
 +
 +          Output CQL commands that could be used to recreate the given table.
 +          In some cases, as above, there may be table metadata which is not
 +          representable and which will not be shown.
 +
 +        DESCRIBE INDEX <indexname>
 +
 +          Output the CQL command that could be used to recreate the given index.
 +          In some cases, there may be index metadata which is not representable
 +          and which will not be shown.
 +
 +        DESCRIBE CLUSTER
 +
 +          Output information about the connected Cassandra cluster, such as the
 +          cluster name, and the partitioner and snitch in use. When you are
 +          connected to a non-system keyspace, also shows endpoint-range
 +          ownership information for the Cassandra ring.
 +
 +        DESCRIBE [FULL] SCHEMA
 +
 +          Output CQL commands that could be used to recreate the entire (non-system) schema.
 +          Works as though "DESCRIBE KEYSPACE k" was invoked for each non-system keyspace
 +          k. Use DESCRIBE FULL SCHEMA to include the system keyspaces.
 +
 +        DESCRIBE TYPES
 +
 +          Output the names of all user-defined-types in the current keyspace, or in all
 +          keyspaces if there is no current keyspace.
 +
 +        DESCRIBE TYPE [<keyspace>.]<type>
 +
 +          Output the CQL command that could be used to recreate the given user-defined-type.
 +
 +        DESCRIBE FUNCTIONS
 +
 +          Output the names of all user-defined-functions in the current keyspace, or in all
 +          keyspaces if there is no current keyspace.
 +
 +        DESCRIBE FUNCTION [<keyspace>.]<function>
 +
 +          Output the CQL command that could be used to recreate the given user-defined-function.
 +
 +        DESCRIBE AGGREGATES
 +
 +          Output the names of all user-defined-aggregates in the current keyspace, or in all
 +          keyspaces if there is no current keyspace.
 +
 +        DESCRIBE AGGREGATE [<keyspace>.]<aggregate>
 +
 +          Output the CQL command that could be used to recreate the given user-defined-aggregate.
 +
 +        DESCRIBE <objname>
 +
 +          Output CQL commands that could be used to recreate the entire object schema,
 +          where object can be either a keyspace or a table or an index (in this order).
 +  """
 +        # the second matched token selects the sub-command; presumably it is the
 +        # word right after DESCRIBE/DESC -- confirm against the grammar
 +        what = parsed.matched[1][1].lower()
 +        if what == 'functions':
 +            self.describe_functions(self.current_keyspace)
 +        elif what == 'function':
 +            ksname = self.cql_unprotect_name(parsed.get_binding('ksname', None))
 +            functionname = self.cql_unprotect_name(parsed.get_binding('udfname'))
 +            self.describe_function(ksname, functionname)
 +        elif what == 'aggregates':
 +            self.describe_aggregates(self.current_keyspace)
 +        elif what == 'aggregate':
 +            ksname = self.cql_unprotect_name(parsed.get_binding('ksname', None))
 +            aggregatename = self.cql_unprotect_name(parsed.get_binding('udaname'))
 +            self.describe_aggregate(ksname, aggregatename)
 +        elif what == 'keyspaces':
 +            self.describe_keyspaces()
 +        elif what == 'keyspace':
 +            ksname = self.cql_unprotect_name(parsed.get_binding('ksname', ''))
 +            # no name given: describe the current keyspace, if there is one
 +            if not ksname:
 +                ksname = self.current_keyspace
 +                if ksname is None:
 +                    self.printerr('Not in any keyspace.')
 +                    return
 +            self.describe_keyspace(ksname)
 +        elif what in ('columnfamily', 'table'):
 +            ks = self.cql_unprotect_name(parsed.get_binding('ksname', None))
 +            cf = self.cql_unprotect_name(parsed.get_binding('cfname'))
 +            self.describe_columnfamily(ks, cf)
 +        elif what == 'index':
 +            ks = self.cql_unprotect_name(parsed.get_binding('ksname', None))
 +            idx = self.cql_unprotect_name(parsed.get_binding('idxname', None))
 +            self.describe_index(ks, idx)
 +        elif what in ('columnfamilies', 'tables'):
 +            self.describe_columnfamilies(self.current_keyspace)
 +        elif what == 'types':
 +            self.describe_usertypes(self.current_keyspace)
 +        elif what == 'type':
 +            ks = self.cql_unprotect_name(parsed.get_binding('ksname', None))
 +            ut = self.cql_unprotect_name(parsed.get_binding('utname'))
 +            self.describe_usertype(ks, ut)
 +        elif what == 'cluster':
 +            self.describe_cluster()
 +        elif what == 'schema':
 +            self.describe_schema(False)
 +        elif what == 'full' and parsed.matched[2][1].lower() == 'schema':
 +            # DESCRIBE FULL SCHEMA: include the system keyspaces as well
 +            self.describe_schema(True)
 +        elif what:
 +            # none of the keywords above: treat the token as an object name,
 +            # trying the 'cfname' binding first and 'idxname' second
 +            ks = self.cql_unprotect_name(parsed.get_binding('ksname', None))
 +            name = self.cql_unprotect_name(parsed.get_binding('cfname'))
 +            if not name:
 +                name = self.cql_unprotect_name(parsed.get_binding('idxname', None))
 +            self.describe_object(ks, name)
 +    do_desc = do_describe
 +
 +    def do_copy(self, parsed):
 +        r"""
 +        COPY [cqlsh only]
 +
 +          COPY x FROM: Imports CSV data into a Cassandra table
 +          COPY x TO: Exports data from a Cassandra table in CSV format.
 +
 +        COPY <table_name> [ ( column [, ...] ) ]
 +             FROM ( '<filename>' | STDIN )
 +             [ WITH <option>='value' [AND ...] ];
 +
 +        COPY <table_name> [ ( column [, ...] ) ]
 +             TO ( '<filename>' | STDOUT )
 +             [ WITH <option>='value' [AND ...] ];
 +
 +        Available options and defaults:
 +
 +          DELIMITER=','           - character that appears between records
 +          QUOTE='"'               - quoting character to be used to quote fields
 +          ESCAPE='\'              - character to appear before the QUOTE char when quoted
 +          HEADER=false            - whether to ignore the first line
 +          NULL=''                 - string that represents a null value
-           ENCODING='utf8'         - encoding for CSV output (COPY TO only)
-           TIMEFORMAT=             - timestamp strftime format (COPY TO only)
++          ENCODING='utf8'         - encoding for CSV output (COPY TO)
++          TIMEFORMAT=             - timestamp strftime format (COPY TO)
 +            '%Y-%m-%d %H:%M:%S%z'   defaults to time_format value in cqlshrc
++          MAXREQUESTS=6           - the maximum number of requests each worker process can work on in parallel (COPY TO)
++          PAGESIZE=1000           - the page size for fetching results (COPY TO)
++          PAGETIMEOUT=10          - the page timeout for fetching results (COPY TO)
++          MAXATTEMPTS=5           - the maximum number of attempts for errors
++          CHUNKSIZE=1000          - the size of chunks passed to worker processes (COPY FROM)
++          INGESTRATE=100000       - an approximate ingest rate in rows per second (COPY FROM)
++          MAXBATCHSIZE=20         - the maximum size of an import batch (COPY FROM)
++          MINBATCHSIZE=2          - the minimum size of an import batch (COPY FROM)
++          REPORTFREQUENCY=0.25    - the frequency with which we display status updates in seconds
 +
 +        When entering CSV data on STDIN, you can use the sequence "\."
 +        on a line by itself to end the data input.
 +        """
++
 +        ks = self.cql_unprotect_name(parsed.get_binding('ksname', None))
 +        if ks is None:
 +            ks = self.current_keyspace
 +            if ks is None:
 +                raise NoKeyspaceError("Not in any keyspace.")
 +        cf = self.cql_unprotect_name(parsed.get_binding('cfname'))
 +        columns = parsed.get_binding('colnames', None)
 +        if columns is not None:
 +            columns = map(self.cql_unprotect_name, columns)
 +        else:
 +            # default to all known columns
 +            columns = self.get_column_names(ks, cf)
 +        fname = parsed.get_binding('fname', None)
 +        if fname is not None:
 +            fname = os.path.expanduser(self.cql_unprotect_value(fname))
 +        copyoptnames = map(str.lower, parsed.get_binding('optnames', ()))
 +        copyoptvals = map(self.cql_unprotect_value, parsed.get_binding('optvals', ()))
 +        cleancopyoptvals = [optval.decode('string-escape') for optval in copyoptvals]
 +        opts = dict(zip(copyoptnames, cleancopyoptvals))
 +
 +        print "\nStarting copy of %s.%s with columns %s." % (ks, cf, columns)
 +
 +        timestart = time.time()
 +
 +        direction = parsed.get_binding('dir').upper()
 +        if direction == 'FROM':
 +            rows = self.perform_csv_import(ks, cf, columns, fname, opts)
 +            verb = 'imported'
 +        elif direction == 'TO':
 +            rows = self.perform_csv_export(ks, cf, columns, fname, opts)
 +            verb = 'exported'
 +        else:
 +            raise SyntaxError("Unknown direction %s" % direction)
 +
 +        timeend = time.time()
 +        print "\n%d rows %s in %s." % (rows, verb, describe_interval(timeend - timestart))
 +
 +    def perform_csv_import(self, ks, cf, columns, fname, opts):
 +        csv_options, dialect_options, unrecognized_options = copyutil.parse_options(self, opts)
 +        if unrecognized_options:
-             self.printerr('Unrecognized COPY FROM options: %s'
-                           % ', '.join(unrecognized_options.keys()))
++            self.printerr('Unrecognized COPY FROM options: %s' % ', '.join(unrecognized_options.keys()))
 +            return 0
-         nullval, header = csv_options['nullval'], csv_options['header']
- 
-         if fname is None:
-             do_close = False
-             print "[Use \. on a line by itself to end input]"
-             linesource = self.use_stdin_reader(prompt='[copy] ', until=r'\.')
-         else:
-             do_close = True
-             try:
-                 linesource = open(fname, 'rb')
-             except IOError, e:
-                 self.printerr("Can't open %r for reading: %s" % (fname, e))
-                 return 0
 +
-         current_record = None
-         processes, pipes = [], [],
-         try:
-             if header:
-                 linesource.next()
-             reader = csv.reader(linesource, **dialect_options)
- 
-             num_processes = copyutil.get_num_processes(cap=4)
- 
-             for i in range(num_processes):
-                 parent_conn, child_conn = mp.Pipe()
-                 pipes.append(parent_conn)
-                 processes.append(ImportProcess(self, child_conn, ks, cf, columns, nullval))
- 
-             for process in processes:
-                 process.start()
- 
-             meter = copyutil.RateMeter(10000)
-             for current_record, row in enumerate(reader, start=1):
-                 # write to the child process
-                 pipes[current_record % num_processes].send((current_record, row))
- 
-                 # update the progress and current rate periodically
-                 meter.increment()
- 
-                 # check for any errors reported by the children
-                 if (current_record % 100) == 0:
-                     if self._check_import_processes(current_record, pipes):
-                         # no errors seen, continue with outer loop
-                         continue
-                     else:
-                         # errors seen, break out of outer loop
-                         break
-         except Exception, exc:
-             if current_record is None:
-                 # we failed before we started
-                 self.printerr("\nError starting import process:\n")
-                 self.printerr(str(exc))
-                 if self.debug:
-                     traceback.print_exc()
-             else:
-                 self.printerr("\n" + str(exc))
-                 self.printerr("\nAborting import at record #%d. "
-                               "Previously inserted records and some records after "
-                               "this number may be present."
-                               % (current_record,))
-                 if self.debug:
-                     traceback.print_exc()
-         finally:
-             # send a message that indicates we're done
-             for pipe in pipes:
-                 pipe.send((None, None))
- 
-             for process in processes:
-                 process.join()
- 
-             self._check_import_processes(current_record, pipes)
- 
-             for pipe in pipes:
-                 pipe.close()
- 
-             if do_close:
-                 linesource.close()
-             elif self.tty:
-                 print
- 
-         return current_record
- 
-     def _check_import_processes(self, current_record, pipes):
-         for pipe in pipes:
-             if pipe.poll():
-                 try:
-                     (record_num, error) = pipe.recv()
-                     self.printerr("\n" + str(error))
-                     self.printerr(
-                         "Aborting import at record #%d. "
-                         "Previously inserted records are still present, "
-                         "and some records after that may be present as well."
-                         % (record_num,))
-                     return False
-                 except EOFError:
-                     # pipe is closed, nothing to read
-                     self.printerr("\nChild process died without notification, "
-                                   "aborting import at record #%d. Previously "
-                                   "inserted records are probably still present, "
-                                   "and some records after that may be present "
-                                   "as well." % (current_record,))
-                     return False
-         return True
++        return copyutil.ImportTask(self, ks, cf, columns, fname, csv_options, dialect_options,
++                                   DEFAULT_PROTOCOL_VERSION, CONFIG_FILE).run()
 +
 +    def perform_csv_export(self, ks, cf, columns, fname, opts):
 +        # Parse the COPY TO options and hand the export over to
 +        # copyutil.ExportTask; returns whatever its run() returns, or 0 when
 +        # an option is not recognized.
 +        csv_options, dialect_options, unrecognized_options = copyutil.parse_options(self, opts)
 +        if not unrecognized_options:
 +            task = copyutil.ExportTask(self, ks, cf, columns, fname, csv_options, dialect_options,
 +                                       DEFAULT_PROTOCOL_VERSION, CONFIG_FILE)
 +            return task.run()
 +
 +        unknown = ', '.join(unrecognized_options.keys())
 +        self.printerr('Unrecognized COPY TO options: %s' % unknown)
 +        return 0
 +
 +    def do_show(self, parsed):
 +        """
 +        SHOW [cqlsh only]
 +
 +          Displays information about the current cqlsh session. Can be called in
 +          the following ways:
 +
 +        SHOW VERSION
 +
 +          Shows the version and build of the connected Cassandra instance, as
 +          well as the versions of the CQL spec and the Thrift protocol that
 +          the connected Cassandra instance understands.
 +
 +        SHOW HOST
 +
 +          Shows where cqlsh is currently connected.
 +
 +        SHOW SESSION <sessionid>
 +
 +          Pretty-prints the requested tracing session.
 +        """
 +        topic = parsed.get_binding('what').lower()
 +
 +        if topic == 'version':
 +            self.get_connection_versions()
 +            self.show_version()
 +            return
 +
 +        if topic == 'host':
 +            self.show_host()
 +            return
 +
 +        if topic.startswith('session'):
 +            trace_id = UUID(parsed.get_binding('sessionid').lower())
 +            self.show_session(trace_id)
 +            return
 +
 +        self.printerr('Wait, how do I show %r?' % (topic,))
 +
 +    def do_source(self, parsed):
 +        """
 +        SOURCE [cqlsh only]
 +
 +        Executes a file containing CQL statements. Gives the output for each
 +        statement in turn, if any, or any errors that occur along the way.
 +
 +        Errors do NOT abort execution of the CQL source file.
 +
 +        Usage:
 +
 +          SOURCE '<file>';
 +
 +        That is, the path to the file to be executed must be given inside a
 +        string literal. The path is interpreted relative to the current working
 +        directory. The tilde shorthand notation ('~/mydir') is supported for
 +        referring to $HOME.
 +
 +        See also the --file option to cqlsh.
 +        """
 +        fname = parsed.get_binding('fname')
 +        fname = os.path.expanduser(self.cql_unprotect_value(fname))
 +        try:
 +            encoding, bom_size = get_file_encoding_bomsize(fname)
 +            f = codecs.open(fname, 'r', encoding)
 +        except IOError, e:
 +            self.printerr('Could not open %r: %s' % (fname, e))
 +            return
 +        # From here on the file is open: make sure it is closed again on every
 +        # path, including a failing seek or an exception from the sub-shell.
 +        try:
 +            try:
 +                # skip the byte-order mark, if any
 +                f.seek(bom_size)
 +            except IOError, e:
 +                self.printerr('Could not open %r: %s' % (fname, e))
 +                return
 +            # run the file through a non-interactive shell that shares this
 +            # shell's connection and display settings
 +            subshell = Shell(self.hostname, self.port,
 +                             color=self.color, encoding=self.encoding, stdin=f,
 +                             tty=False, use_conn=self.conn, cqlver=self.cql_version,
 +                             display_timestamp_format=self.display_timestamp_format,
 +                             display_date_format=self.display_date_format,
 +                             display_nanotime_format=self.display_nanotime_format,
 +                             display_float_precision=self.display_float_precision,
 +                             max_trace_wait=self.max_trace_wait)
 +            subshell.cmdloop()
 +        finally:
 +            f.close()
 +
 +    def do_capture(self, parsed):
 +        """
 +        CAPTURE [cqlsh only]
 +
 +        Begins capturing command output and appending it to a specified file.
 +        Output will not be shown at the console while it is captured.
 +
 +        Usage:
 +
 +          CAPTURE '<file>';
 +          CAPTURE OFF;
 +          CAPTURE;
 +
 +        That is, the path to the file to be appended to must be given inside a
 +        string literal. The path is interpreted relative to the current working
 +        directory. The tilde shorthand notation ('~/mydir') is supported for
 +        referring to $HOME.
 +
 +        Only query result output is captured. Errors and output from cqlsh-only
 +        commands will still be shown in the cqlsh session.
 +
 +        To stop capturing output and show it in the cqlsh session again, use
 +        CAPTURE OFF.
 +
 +        To inspect the current capture configuration, use CAPTURE with no
 +        arguments.
 +        """
 +        fname = parsed.get_binding('fname')
 +        # bare CAPTURE: only report the current state.
 +        # shunted_query_out holds the stream that was replaced, so a non-None
 +        # value means a capture is in progress
 +        if fname is None:
 +            if self.shunted_query_out is not None:
 +                print "Currently capturing query output to %r." % (self.query_out.name,)
 +            else:
 +                print "Currently not capturing query output."
 +            return
 +
 +        # CAPTURE OFF: close the capture file and restore the saved stream
 +        # and colour setting
 +        if fname.upper() == 'OFF':
 +            if self.shunted_query_out is None:
 +                self.printerr('Not currently capturing output.')
 +                return
 +            self.query_out.close()
 +            self.query_out = self.shunted_query_out
 +            self.color = self.shunted_color
 +            self.shunted_query_out = None
 +            del self.shunted_color
 +            return
 +
 +        # a second CAPTURE '<file>' while one is active is refused
 +        if self.shunted_query_out is not None:
 +            self.printerr('Already capturing output to %s. Use CAPTURE OFF'
 +                          ' to disable.' % (self.query_out.name,))
 +            return
 +
 +        fname = os.path.expanduser(self.cql_unprotect_value(fname))
 +        try:
 +            f = open(fname, 'a')
 +        except IOError, e:
 +            self.printerr('Could not open %r for append: %s' % (fname, e))
 +            return
 +        # remember the current stream and colour setting, then redirect query
 +        # output to the file with colour switched off
 +        self.shunted_query_out = self.query_out
 +        self.shunted_color = self.color
 +        self.query_out = f
 +        self.color = False
 +        print 'Now capturing query output to %r.' % (fname,)
 +
 +    def do_tracing(self, parsed):
 +        """
 +        TRACING [cqlsh]
 +
 +          Enables or disables request tracing.
 +
 +        TRACING ON
 +
 +          Enables tracing for all further requests.
 +
 +        TRACING OFF
 +
 +          Disables tracing.
 +
 +        TRACING
 +
 +          TRACING with no arguments shows the current tracing status.
 +        """
 +        self.tracing_enabled = SwitchCommand("TRACING", "Tracing").execute(self.tracing_enabled, parsed, self.printerr)
 +
 +    def do_expand(self, parsed):
 +        """
 +        EXPAND [cqlsh]
 +
 +          Enables or disables expanded (vertical) output.
 +
 +        EXPAND ON
 +
 +          Enables expanded (vertical) output.
 +
 +        EXPAND OFF
 +
 +          Disables expanded (vertical) output.
 +
 +        EXPAND
 +
 +          EXPAND with no arguments shows the current value of expand setting.
 +        """
 +        self.expand_enabled = SwitchCommand("EXPAND", "Expanded output").execute(self.expand_enabled, parsed, self.printerr)
 +
 +    def do_consistency(self, parsed):
 +        """
 +        CONSISTENCY [cqlsh only]
 +
 +           Overrides default consistency level (default level is ONE).
 +
 +        CONSISTENCY <level>
 +
 +           Sets consistency level for future requests.
 +
 +           Valid consistency levels:
 +
 +           ANY, ONE, TWO, THREE, QUORUM, ALL, LOCAL_ONE, LOCAL_QUORUM, EACH_QUORUM, SERIAL and LOCAL_SERIAL.
 +
 +           SERIAL and LOCAL_SERIAL may be used only for SELECTs; will be rejected with updates.
 +
 +        CONSISTENCY
 +
 +           CONSISTENCY with no arguments shows the current consistency level.
 +        """
 +        level = parsed.get_binding('level')
 +        if level is None:
 +            print 'Current consistency level is %s.' % (cassandra.ConsistencyLevel.value_to_name[self.consistency_level])
 +            return
 +
 +        self.consistency_level = cassandra.ConsistencyLevel.name_to_value[level.upper()]
 +        print 'Consistency level set to %s.' % (level.upper(),)
 +
 +    def do_serial(self, parsed):
 +        """
 +        SERIAL CONSISTENCY [cqlsh only]
 +
 +           Overrides serial consistency level (default level is SERIAL).
 +
 +        SERIAL CONSISTENCY <level>
 +
 +           Sets consistency level for future conditional updates.
 +
 +           Valid consistency levels:
 +
 +           SERIAL, LOCAL_SERIAL.
 +
 +        SERIAL CONSISTENCY
 +
 +           SERIAL CONSISTENCY with no arguments shows the current consistency level.
 +        """
 +        level = parsed.get_binding('level')
 +        if level is None:
 +            print 'Current serial consistency level is %s.' % (cassandra.ConsistencyLevel.value_to_name[self.serial_consistency_level])
 +            return
 +
 +        self.serial_consistency_level = cassandra.ConsistencyLevel.name_to_value[level.upper()]
 +        print 'Serial consistency level set to %s.' % (level.upper(),)
 +
 +    def do_login(self, parsed):
 +        """
 +        LOGIN [cqlsh only]
 +
 +           Changes login information without requiring restart.
 +
 +        LOGIN <username> (<password>)
 +
 +           Login using the specified username. If password is specified, it will be used
 +           otherwise, you will be prompted to enter.
 +        """
 +        username = parsed.get_binding('username')
 +        password = parsed.get_binding('password')
 +        if password is None:
 +            password = getpass.getpass()
 +        else:
 +            password = password[1:-1]
 +
 +        auth_provider = PlainTextAuthProvider(username=username, password=password)
 +
 +        conn = Cluster(contact_points=(self.hostname,), port=self.port, cql_version=self.conn.cql_version,
 +                       protocol_version=self.conn.protocol_version,
 +                       auth_provider=auth_provider,
 +                       ssl_options=self.conn.ssl_options,
 +                       load_balancing_policy=WhiteListRoundRobinPolicy([self.hostname]),
 +                       connect_timeout=self.conn.connect_timeout)
 +
 +        if self.current_keyspace:
 +            session = conn.connect(self.current_keyspace)
 +        else:
 +            session = conn.connect()
 +
 +        # Update after we've connected in case we fail to authenticate
 +        self.conn = conn
 +        self.auth_provider = auth_provider
 +        self.username = username
 +        self.session = session
 +
 +    def do_exit(self, parsed=None):
 +        """
 +        EXIT/QUIT [cqlsh only]
 +
 +        Exits cqlsh.
 +        """
 +        self.stop = True
 +        if se

<TRUNCATED>

[2/4] cassandra git commit: Merge branch 'cassandra-2.1' into cassandra-2.2

Posted by al...@apache.org.
http://git-wip-us.apache.org/repos/asf/cassandra/blob/57d558fc/pylib/cqlshlib/copyutil.py
----------------------------------------------------------------------
diff --cc pylib/cqlshlib/copyutil.py
index a2fab00,f699e64..a117ec3
--- a/pylib/cqlshlib/copyutil.py
+++ b/pylib/cqlshlib/copyutil.py
@@@ -23,19 -26,25 +26,25 @@@ import sy
  import time
  import traceback
  
- from StringIO import StringIO
+ from calendar import timegm
+ from collections import defaultdict, deque, namedtuple
+ from decimal import Decimal
  from random import randrange
+ from StringIO import StringIO
  from threading import Lock
+ from uuid import UUID
  
  from cassandra.cluster import Cluster
+ from cassandra.cqltypes import ReversedType, UserType
  from cassandra.metadata import protect_name, protect_names
- from cassandra.policies import RetryPolicy, WhiteListRoundRobinPolicy, TokenAwarePolicy
- from cassandra.query import tuple_factory
+ from cassandra.policies import RetryPolicy, WhiteListRoundRobinPolicy, TokenAwarePolicy, DCAwareRoundRobinPolicy
+ from cassandra.query import BatchStatement, BatchType, SimpleStatement, tuple_factory
+ from cassandra.util import Date, Time
  
- 
- import sslhandling
+ from cql3handling import CqlRuleSet
  from displaying import NO_COLOR_MAP
 -from formatting import format_value_default, EMPTY, get_formatter
 +from formatting import format_value_default, DateTimeFormat, EMPTY, get_formatter
+ from sslhandling import ssl_settings
  
  
  def parse_options(shell, opts):
@@@ -65,10 -74,13 +74,15 @@@
      # by default the page timeout is 10 seconds per 1000 entries in the page size or 10 seconds if pagesize is smaller
      csv_options['pagetimeout'] = int(opts.pop('pagetimeout', max(10, 10 * (csv_options['pagesize'] / 1000))))
      csv_options['maxattempts'] = int(opts.pop('maxattempts', 5))
-     csv_options['float_precision'] = shell.display_float_precision
 -    csv_options['dtformats'] = opts.pop('timeformat', shell.display_time_format)
 +    csv_options['dtformats'] = DateTimeFormat(opts.pop('timeformat', shell.display_timestamp_format),
 +                                              shell.display_date_format,
 +                                              shell.display_nanotime_format)
+     csv_options['float_precision'] = shell.display_float_precision
+     csv_options['chunksize'] = int(opts.pop('chunksize', 1000))
+     csv_options['ingestrate'] = int(opts.pop('ingestrate', 100000))
+     csv_options['maxbatchsize'] = int(opts.pop('maxbatchsize', 20))
+     csv_options['minbatchsize'] = int(opts.pop('minbatchsize', 2))
+     csv_options['reportfrequency'] = float(opts.pop('reportfrequency', 0.25))
  
      return csv_options, dialect_options, opts
  
@@@ -371,30 -648,18 +650,18 @@@ class ExportProcess(ChildProcess)
      An child worker process for the export task, ExportTask.
      """
  
-     def __init__(self, inmsg, outmsg, ks, cf, columns, dialect_options, csv_options,
-                  debug, port, cql_version, auth_provider, ssl, protocol_version, config_file):
-         mp.Process.__init__(self, target=self.run)
-         self.inmsg = inmsg
-         self.outmsg = outmsg
-         self.ks = ks
-         self.cf = cf
-         self.columns = columns
-         self.dialect_options = dialect_options
+     def __init__(self, params):
+         ChildProcess.__init__(self, params=params, target=self.run)
+         self.dialect_options = params['dialect_options']
          self.hosts_to_sessions = dict()
  
-         self.debug = debug
-         self.port = port
-         self.cql_version = cql_version
-         self.auth_provider = auth_provider
-         self.ssl = ssl
-         self.protocol_version = protocol_version
-         self.config_file = config_file
- 
+         csv_options = params['csv_options']
          self.encoding = csv_options['encoding']
 -        self.time_format = csv_options['dtformats']
 +        self.date_time_format = csv_options['dtformats']
          self.float_precision = csv_options['float_precision']
          self.nullval = csv_options['nullval']
-         self.maxjobs = csv_options['jobs']
+         self.max_attempts = csv_options['maxattempts']
+         self.max_requests = csv_options['maxrequests']
          self.csv_options = csv_options
          self.formatters = dict()
  
@@@ -600,13 -851,424 +853,424 @@@
          return query
  
  
+ class ImportConversion(object):
+     """
+     A class for converting strings to values when importing from csv, used by ImportProcess,
+     the parent.
+     """
+     def __init__(self, parent, table_meta, statement):
+         self.ks = parent.ks
+         self.cf = parent.cf
+         self.columns = parent.columns
+         self.nullval = parent.nullval
+         self.printmsg = parent.printmsg
+         self.table_meta = table_meta
+         self.primary_key_indexes = [self.columns.index(col.name) for col in self.table_meta.primary_key]
+         self.partition_key_indexes = [self.columns.index(col.name) for col in self.table_meta.partition_key]
+ 
+         self.proto_version = statement.protocol_version
+         self.cqltypes = dict([(c.name, c.type) for c in statement.column_metadata])
+         self.converters = dict([(c.name, self._get_converter(c.type)) for c in statement.column_metadata])
+ 
+     def _get_converter(self, cql_type):
+         """
+         Return a function that converts a string into a value that can be passed
+         into BoundStatement.bind() for the given cql type. See cassandra.cqltypes
+         for more details.
+         """
+         def unprotect(v):
+             if v is not None:
+                 return CqlRuleSet.dequote_value(v)
+ 
+         def convert(t, v):
+             return converters.get(t.typename, convert_unknown)(unprotect(v), ct=t)
+ 
+         def split(val, sep=','):
+             """
+             Split into a list of values whenever we encounter a separator but
+             ignore separators inside parentheses or single quotes, except for the two
+             outermost parentheses, which will be ignored. We expect val to be at least
+             2 characters long (the two outer parentheses).
+             """
+             ret = []
+             last = 1
+             level = 0
+             quote = False
+             for i, c in enumerate(val):
+                 if c == '{' or c == '[' or c == '(':
+                     level += 1
+                 elif c == '}' or c == ']' or c == ')':
+                     level -= 1
+                 elif c == '\'':
+                     quote = not quote
+                 elif c == sep and level == 1 and not quote:
+                     ret.append(val[last:i])
+                     last = i + 1
+             else:
+                 if last < len(val) - 1:
+                     ret.append(val[last:-1])
+ 
+             return ret
+ 
+         # this should match all possible CQL datetime formats
+         p = re.compile("(\d{4})\-(\d{2})\-(\d{2})\s?(?:'T')?" +  # YYYY-MM-DD[( |'T')]
+                        "(?:(\d{2}):(\d{2})(?::(\d{2}))?)?" +  # [HH:MM[:SS]]
+                        "(?:([+\-])(\d{2}):?(\d{2}))?")  # [(+|-)HH[:]MM]]
+ 
+         def convert_date(val, **_):
+             m = p.match(val)
+             if not m:
+                 raise ValueError("can't interpret %r as a date" % (val,))
+ 
+             # https://docs.python.org/2/library/time.html#time.struct_time
+             tval = time.struct_time((int(m.group(1)), int(m.group(2)), int(m.group(3)),  # year, month, day
+                                      int(m.group(4)) if m.group(4) else 0,  # hour
+                                      int(m.group(5)) if m.group(5) else 0,  # minute
+                                      int(m.group(6)) if m.group(6) else 0,  # second
+                                      0, 1, -1))  # day of week, day of year, dst-flag
+ 
+             if m.group(7):
+                 offset = (int(m.group(8)) * 3600 + int(m.group(9)) * 60) * int(m.group(7) + '1')
+             else:
+                 offset = -time.timezone
+ 
+             # scale seconds to millis for the raw value
+             return (timegm(tval) + offset) * 1e3
+ 
+         def convert_tuple(val, ct=cql_type):
+             return tuple(convert(t, v) for t, v in zip(ct.subtypes, split(val)))
+ 
+         def convert_list(val, ct=cql_type):
+             return list(convert(ct.subtypes[0], v) for v in split(val))
+ 
+         def convert_set(val, ct=cql_type):
+             return frozenset(convert(ct.subtypes[0], v) for v in split(val))
+ 
+         def convert_map(val, ct=cql_type):
+             """
+             We need to pass to BoundStatement.bind() a dict() because it calls iteritems(),
+             except we can't create a dict with another dict as the key, hence we use a class
+             that adds iteritems to a frozen set of tuples (which is how dicts are normally made
+             immutable in python).
+             """
+             class ImmutableDict(frozenset):
+                 iteritems = frozenset.__iter__
+ 
+             return ImmutableDict(frozenset((convert(ct.subtypes[0], v[0]), convert(ct.subtypes[1], v[1]))
+                                  for v in [split('{%s}' % vv, sep=':') for vv in split(val)]))
+ 
+         def convert_user_type(val, ct=cql_type):
+             """
+             A user type is a dictionary except that we must convert each key into
+             an attribute, so we are using named tuples. It must also be hashable,
+             so we cannot use dictionaries. Maybe there is a way to instantiate ct
+             directly but I could not work it out.
+             """
+             vals = [v for v in [split('{%s}' % vv, sep=':') for vv in split(val)]]
+             ret_type = namedtuple(ct.typename, [unprotect(v[0]) for v in vals])
+             return ret_type(*tuple(convert(t, v[1]) for t, v in zip(ct.subtypes, vals)))
+ 
+         def convert_single_subtype(val, ct=cql_type):
+             return converters.get(ct.subtypes[0].typename, convert_unknown)(val, ct=ct.subtypes[0])
+ 
+         def convert_unknown(val, ct=cql_type):
+             if issubclass(ct, UserType):
+                 return convert_user_type(val, ct=ct)
+             elif issubclass(ct, ReversedType):
+                 return convert_single_subtype(val, ct=ct)
+ 
+             self.printmsg("Unknown type %s (%s) for val %s" % (ct, ct.typename, val))
+             return val
+ 
+         converters = {
+             'blob': (lambda v, ct=cql_type: bytearray.fromhex(v[2:])),
+             'decimal': (lambda v, ct=cql_type: Decimal(v)),
+             'uuid': (lambda v, ct=cql_type: UUID(v)),
+             'boolean': (lambda v, ct=cql_type: bool(v)),
+             'tinyint': (lambda v, ct=cql_type: int(v)),
+             'ascii': (lambda v, ct=cql_type: v),
+             'float': (lambda v, ct=cql_type: float(v)),
+             'double': (lambda v, ct=cql_type: float(v)),
+             'bigint': (lambda v, ct=cql_type: long(v)),
+             'int': (lambda v, ct=cql_type: int(v)),
+             'varint': (lambda v, ct=cql_type: int(v)),
+             'inet': (lambda v, ct=cql_type: v),
+             'counter': (lambda v, ct=cql_type: long(v)),
+             'timestamp': convert_date,
+             'timeuuid': (lambda v, ct=cql_type: UUID(v)),
+             'date': (lambda v, ct=cql_type: Date(v)),
+             'smallint': (lambda v, ct=cql_type: int(v)),
+             'time': (lambda v, ct=cql_type: Time(v)),
+             'text': (lambda v, ct=cql_type: v),
+             'varchar': (lambda v, ct=cql_type: v),
+             'list': convert_list,
+             'set': convert_set,
+             'map': convert_map,
+             'tuple': convert_tuple,
+             'frozen': convert_single_subtype,
+         }
+ 
+         return converters.get(cql_type.typename, convert_unknown)
+ 
+     def get_row_values(self, row):
+         """
+         Parse the row into a list of row values to be returned
+         """
+         ret = [None] * len(row)
+         for i, val in enumerate(row):
+             if val != self.nullval:
+                 ret[i] = self.converters[self.columns[i]](val)
+             else:
+                 if i in self.primary_key_indexes:
+                     message = "Cannot insert null value for primary key column '%s'." % (self.columns[i],)
+                     if self.nullval == '':
+                         message += " If you want to insert empty strings, consider using" \
+                                    " the WITH NULL=<marker> option for COPY."
+                     raise Exception(message=message)
+ 
+                 ret[i] = None
+ 
+         return ret
+ 
+     def get_row_partition_key_values(self, row):
+         """
+         Return a string composed of the partition key values, serialized and binary packed -
+         as expected by metadata.get_replicas(), see also BoundStatement.routing_key.
+         """
+         def serialize(n):
+             c, v = self.columns[n], row[n]
+             return self.cqltypes[c].serialize(self.converters[c](v), self.proto_version)
+ 
+         partition_key_indexes = self.partition_key_indexes
+         if len(partition_key_indexes) == 1:
+             return serialize(partition_key_indexes[0])
+         else:
+             pk_values = []
+             for i in partition_key_indexes:
+                 val = serialize(i)
+                 l = len(val)
+                 pk_values.append(struct.pack(">H%dsB" % l, l, val, 0))
+             return b"".join(pk_values)
+ 
+ 
+ class ImportProcess(ChildProcess):
+ 
+     def __init__(self, params):
+         ChildProcess.__init__(self, params=params, target=self.run)
+ 
+         csv_options = params['csv_options']
+         self.nullval = csv_options['nullval']
+         self.max_attempts = csv_options['maxattempts']
+         self.min_batch_size = csv_options['minbatchsize']
+         self.max_batch_size = csv_options['maxbatchsize']
+         self._session = None
+ 
+     @property
+     def session(self):
+         if not self._session:
+             cluster = Cluster(
+                 contact_points=(self.hostname,),
+                 port=self.port,
+                 cql_version=self.cql_version,
+                 protocol_version=self.protocol_version,
+                 auth_provider=self.auth_provider,
+                 load_balancing_policy=TokenAwarePolicy(DCAwareRoundRobinPolicy()),
+                 ssl_options=ssl_settings(self.hostname, self.config_file) if self.ssl else None,
+                 default_retry_policy=ExpBackoffRetryPolicy(self),
+                 compression=None,
+                 connect_timeout=self.connect_timeout)
+ 
+             self._session = cluster.connect(self.ks)
+             self._session.default_timeout = None
+         return self._session
+ 
+     def run(self):
+         try:
+             table_meta = self.session.cluster.metadata.keyspaces[self.ks].tables[self.cf]
 -            is_counter = ("counter" in [table_meta.columns[name].typestring for name in self.columns])
++            is_counter = ("counter" in [table_meta.columns[name].cql_type for name in self.columns])
+ 
+             if is_counter:
+                 self.run_counter(table_meta)
+             else:
+                 self.run_normal(table_meta)
+ 
+         except Exception, exc:
+             if self.debug:
+                 traceback.print_exc(exc)
+ 
+         finally:
+             self.close()
+ 
+     def close(self):
+         if self._session:
+             self._session.cluster.shutdown()
+         ChildProcess.close(self)
+ 
+     def run_counter(self, table_meta):
+         """
+         Main run method for tables that contain counter columns.
+         """
+         query = 'UPDATE %s.%s SET %%s WHERE %%s' % (protect_name(self.ks), protect_name(self.cf))
+ 
+         # We prepare a query statement to find out the types of the partition key columns so we can
+         # route the update query to the correct replicas. As far as I understood this is the easiest
+         # way to find out the types of the partition columns, we will never use this prepared statement
+         where_clause = ' AND '.join(['%s = ?' % (protect_name(c.name)) for c in table_meta.partition_key])
+         select_query = 'SELECT * FROM %s.%s WHERE %s' % (protect_name(self.ks), protect_name(self.cf), where_clause)
+         conv = ImportConversion(self, table_meta, self.session.prepare(select_query))
+ 
+         while True:
+             try:
+                 batch = self.inmsg.get()
+ 
+                 for batches in self.split_batches(batch, conv):
+                     for b in batches:
+                         self.send_counter_batch(query, conv, b)
+ 
+             except Exception, exc:
+                 self.outmsg.put((batch, '%s - %s' % (exc.__class__.__name__, exc.message)))
+                 if self.debug:
+                     traceback.print_exc(exc)
+ 
+     def run_normal(self, table_meta):
+         """
+         Main run method for normal tables, i.e. tables that do not contain counter columns.
+         """
+         query = 'INSERT INTO %s.%s (%s) VALUES (%s)' % (protect_name(self.ks),
+                                                         protect_name(self.cf),
+                                                         ', '.join(protect_names(self.columns),),
+                                                         ', '.join(['?' for _ in self.columns]))
+         query_statement = self.session.prepare(query)
+         conv = ImportConversion(self, table_meta, query_statement)
+ 
+         while True:
+             try:
+                 batch = self.inmsg.get()
+ 
+                 for batches in self.split_batches(batch, conv):
+                     for b in batches:
+                         self.send_normal_batch(conv, query_statement, b)
+ 
+             except Exception, exc:
+                 self.outmsg.put((batch, '%s - %s' % (exc.__class__.__name__, exc.message)))
+                 if self.debug:
+                     traceback.print_exc(exc)
+ 
+     def send_counter_batch(self, query_text, conv, batch):
+         if self.test_failures and self.maybe_inject_failures(batch):
+             return
+ 
+         columns = self.columns
+         batch_statement = BatchStatement(batch_type=BatchType.COUNTER, consistency_level=self.consistency_level)
+         for row in batch['rows']:
+             where_clause = []
+             set_clause = []
+             for i, value in enumerate(row):
+                 if i in conv.primary_key_indexes:
+                     where_clause.append("%s=%s" % (columns[i], value))
+                 else:
+                     set_clause.append("%s=%s+%s" % (columns[i], columns[i], value))
+ 
+             full_query_text = query_text % (','.join(set_clause), ' AND '.join(where_clause))
+             batch_statement.add(full_query_text)
+ 
+         self.execute_statement(batch_statement, batch)
+ 
+     def send_normal_batch(self, conv, query_statement, batch):
+         try:
+             if self.test_failures and self.maybe_inject_failures(batch):
+                 return
+ 
+             batch_statement = BatchStatement(batch_type=BatchType.UNLOGGED, consistency_level=self.consistency_level)
+             for row in batch['rows']:
+                 batch_statement.add(query_statement, conv.get_row_values(row))
+ 
+             self.execute_statement(batch_statement, batch)
+ 
+         except Exception, exc:
+             self.err_callback(exc, batch)
+ 
+     def maybe_inject_failures(self, batch):
+         """
+         Examine self.test_failures and see if the batch is either a batch
+         supposed to cause a failure (failing_batch) or to terminate the worker process
+         (exit_batch). Return True if a failure was injected for this batch, or False
+         if the caller should carry on as normal.
+         """
+         if 'failing_batch' in self.test_failures:
+             failing_batch = self.test_failures['failing_batch']
+             if failing_batch['id'] == batch['id']:
+                 if batch['attempts'] < failing_batch['failures']:
+                     statement = SimpleStatement("INSERT INTO badtable (a, b) VALUES (1, 2)",
+                                                 consistency_level=self.consistency_level)
+                     self.execute_statement(statement, batch)
+                     return True
+ 
+         if 'exit_batch' in self.test_failures:
+             exit_batch = self.test_failures['exit_batch']
+             if exit_batch['id'] == batch['id']:
+                 sys.exit(1)
+ 
+         return False  # carry on as normal
+ 
+     def execute_statement(self, statement, batch):
+         future = self.session.execute_async(statement)
+         future.add_callbacks(callback=self.result_callback, callback_args=(batch, ),
+                              errback=self.err_callback, errback_args=(batch, ))
+ 
+     def split_batches(self, batch, conv):
+         """
+         Split a batch into sub-batches with the same
+         partition key, if possible. If there are at least
+         min_batch_size rows with the same partition key value then
+         create a sub-batch with that partition key value, else
+         aggregate all remaining rows in a single 'left-overs' batch
+         """
+         rows_by_pk = defaultdict(list)
+ 
+         for row in batch['rows']:
+             pk = conv.get_row_partition_key_values(row)
+             rows_by_pk[pk].append(row)
+ 
+         ret = dict()
+         remaining_rows = []
+ 
+         for pk, rows in rows_by_pk.items():
+             if len(rows) >= self.min_batch_size:
+                 ret[pk] = self.batches(rows, batch)
+             else:
+                 remaining_rows.extend(rows)
+ 
+         if remaining_rows:
+             ret[self.hostname] = self.batches(remaining_rows, batch)
+ 
+         return ret.itervalues()
+ 
+     def batches(self, rows, batch):
+         for i in xrange(0, len(rows), self.max_batch_size):
+             yield ImportTask.make_batch(batch['id'], rows[i:i + self.max_batch_size], batch['attempts'])
+ 
+     def result_callback(self, result, batch):
+         batch['imported'] = len(batch['rows'])
+         batch['rows'] = []  # no need to resend these
+         self.outmsg.put((batch, None))
+ 
+     def err_callback(self, response, batch):
+         batch['imported'] = len(batch['rows'])
+         self.outmsg.put((batch, '%s - %s' % (response.__class__.__name__, response.message)))
+         if self.debug:
+             traceback.print_exc(response)
+ 
+ 
  class RateMeter(object):
  
-     def __init__(self, log_threshold):
-         self.log_threshold = log_threshold  # number of records after which we log
-         self.last_checkpoint_time = time.time()  # last time we logged
+     def __init__(self, update_interval=0.25, log=True):
+         self.log = log  # true if we should log
+         self.update_interval = update_interval  # how often we update in seconds
+         self.start_time = time.time()  # the start time
+         self.last_checkpoint_time = self.start_time  # last time we logged
          self.current_rate = 0.0  # rows per second
-         self.current_record = 0  # number of records since we last logged
+         self.current_record = 0  # number of records since we last updated
          self.total_records = 0   # total number of records
  
      def increment(self, n=1):