You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by br...@apache.org on 2012/07/24 20:58:07 UTC

git commit: cqlsh: add a COPY TO command Patch by paul cannon, reviewed by brandonwilliams for CASSANDRA-4434

Updated Branches:
  refs/heads/cassandra-1.1 41c9ba63d -> 9a6339476


cqlsh: add a COPY TO command
Patch by paul cannon, reviewed by brandonwilliams for CASSANDRA-4434


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/9a633947
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/9a633947
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/9a633947

Branch: refs/heads/cassandra-1.1
Commit: 9a63394765de28160d576c9285be68587e222a86
Parents: 41c9ba6
Author: Brandon Williams <br...@apache.org>
Authored: Tue Jul 24 13:57:19 2012 -0500
Committer: Brandon Williams <br...@apache.org>
Committed: Tue Jul 24 13:57:19 2012 -0500

----------------------------------------------------------------------
 CHANGES.txt |    1 +
 bin/cqlsh   |  126 ++++++++++++++++++++++++++++++++++++++++++++---------
 2 files changed, 105 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/9a633947/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 0885387..638574c 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -23,6 +23,7 @@ Merged from 1.0:
  * Fix LCS splitting sstable base on uncompressed size (CASSANDRA-4419)
  * Bootstraps that fail are detected upon restart and will retry safely without
    needing to delete existing data first (CASSANDRA-4427)
+ * (cqlsh) add a COPY TO command to copy a CF to a CSV file (CASSANDRA-4434)
 
 
 1.1.2

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9a633947/bin/cqlsh
----------------------------------------------------------------------
diff --git a/bin/cqlsh b/bin/cqlsh
index 574d49b..c67a818 100755
--- a/bin/cqlsh
+++ b/bin/cqlsh
@@ -224,7 +224,8 @@ cqlsh_extra_syntax_rules = r'''
 
 <copyCommand> ::= "COPY" cf=<columnFamilyName>
                          ( "(" [colnames]=<colname> ( "," [colnames]=<colname> )* ")" )?
-                         "FROM" ( fname=<stringLiteral> | "STDIN" )
+                         ( dir="FROM" ( fname=<stringLiteral> | "STDIN" )
+                         | dir="TO"   ( fname=<stringLiteral> | "STDOUT" ) )
                          ( "WITH" <copyOption> ( "AND" <copyOption> )* )?
                 ;
 
@@ -303,12 +304,16 @@ def complete_copy_column_names(ctxt, cqlsh):
         return [colnames[0]]
     return set(colnames[1:]) - set(existcols)
 
-COPY_OPTIONS = ('DELIMITER', 'QUOTE', 'ESCAPE', 'HEADER')
+COPY_OPTIONS = ('DELIMITER', 'QUOTE', 'ESCAPE', 'HEADER', 'ENCODING', 'NULL')
 
 @cqlsh_syntax_completer('copyOption', 'optnames')
 def complete_copy_options(ctxt, cqlsh):
     optnames = map(str.upper, ctxt.get_binding('optnames', ()))
-    return set(COPY_OPTIONS) - set(optnames)
+    direction = ctxt.get_binding('dir').upper()
+    opts = set(COPY_OPTIONS) - set(optnames)
+    if direction == 'FROM':
+        opts -= ('ENCODING', 'NULL')
+    return opts
 
 @cqlsh_syntax_completer('copyOption', 'optvals')
 def complete_copy_opt_values(ctxt, cqlsh):
@@ -448,13 +453,13 @@ def unix_time_from_uuid1(u):
     return (u.get_time() - 0x01B21DD213814000) / 10000000.0
 
 def format_value(val, casstype, output_encoding, addcolor=False, time_format='',
-                 float_precision=3, colormap=DEFAULT_VALUE_COLORS):
+                 float_precision=3, colormap=DEFAULT_VALUE_COLORS, nullval='null'):
     color = colormap['default']
     coloredval = None
     displaywidth = None
 
     if val is None:
-        bval = 'null'
+        bval = nullval
         color = colormap['error']
     elif isinstance(val, DecodeError):
         casstype = 'BytesType'
@@ -727,7 +732,7 @@ class Shell(cmd.Cmd):
     def get_column_names(self, ksname, cfname):
         if ksname is None:
             ksname = self.current_keyspace
-        if self.cqlver_atleast(3):
+        if ksname != 'system' and self.cqlver_atleast(3):
             return self.get_column_names_from_layout(ksname, cfname)
         else:
             return self.get_column_names_from_cfdef(ksname, cfname)
@@ -1433,6 +1438,9 @@ class Shell(cmd.Cmd):
         COPY <table_name> [ ( column [, ...] ) ]
              FROM ( '<filename>' | STDIN )
              [ WITH <option>='value' [AND ...] ];
+        COPY <table_name> [ ( column [, ...] ) ]
+             TO ( '<filename>' | STDOUT )
+             [ WITH <option>='value' [AND ...] ];
 
         Available options and defaults:
 
@@ -1440,6 +1448,8 @@ class Shell(cmd.Cmd):
           QUOTE='"'        - quoting character to be used to quote fields
           ESCAPE='\'       - character to appear before the QUOTE char when quoted
           HEADER=false     - whether to ignore the first line
+          ENCODING='utf8'  - encoding for CSV output (COPY TO only)
+          NULL=''          - string that represents a null value (COPY TO only)
 
         When entering CSV data on STDIN, you can use the sequence "\."
         on a line by itself to end the data input.
@@ -1448,12 +1458,11 @@ class Shell(cmd.Cmd):
         ks = self.cql_unprotect_name(parsed.get_binding('ksname', None))
         if ks is None:
             ks = self.current_keyspace
+            if ks is None:
+                raise NoKeyspaceError("Not in any keyspace.")
         cf = self.cql_unprotect_name(parsed.get_binding('cfname'))
         columns = parsed.get_binding('colnames', None)
-        if columns is None:
-            # default to all known columns
-            columns = self.get_column_names(ks, cf)
-        else:
+        if columns is not None:
             columns = map(self.cql_unprotect_name, columns)
         fname = parsed.get_binding('fname', None)
         if fname is not None:
@@ -1462,14 +1471,20 @@ class Shell(cmd.Cmd):
         copyoptvals = map(self.cql_unprotect_value, parsed.get_binding('optvals', ()))
         opts = dict(zip(copyoptnames, copyoptvals))
 
-        # when/if COPY TO is supported, this would be a good place to branch
-        # on direction.
-
         timestart = time.time()
-        rows = self.perform_csv_import(ks, cf, columns, fname, opts)
-        timeend = time.time()
 
-        print "%d rows imported in %s." % (rows, describe_interval(timeend - timestart))
+        direction = parsed.get_binding('dir').upper()
+        if direction == 'FROM':
+            rows = self.perform_csv_import(ks, cf, columns, fname, opts)
+            verb = 'imported'
+        elif direction == 'TO':
+            rows = self.perform_csv_export(ks, cf, columns, fname, opts)
+            verb = 'exported'
+        else:
+            raise SyntaxError("Unknown direction %s" % direction)
+
+        timeend = time.time()
+        print "%d rows %s in %s." % (rows, verb, describe_interval(timeend - timestart))
 
     def perform_csv_import(self, ks, cf, columns, fname, opts):
         dialect_options = self.csv_dialect_defaults.copy()
@@ -1483,7 +1498,6 @@ class Shell(cmd.Cmd):
         if dialect_options['quotechar'] == dialect_options['escapechar']:
             dialect_options['doublequote'] = True
             del dialect_options['escapechar']
-
         if opts:
             self.printerr('Unrecognized COPY FROM options: %s'
                           % ', '.join(opts.keys()))
@@ -1496,15 +1510,15 @@ class Shell(cmd.Cmd):
         else:
             do_close = True
             try:
-                linesource = open(fname, 'r')
+                linesource = open(fname, 'rb')
             except IOError, e:
                 self.printerr("Can't open %r for reading: %s" % (fname, e))
                 return 0
-        if header:
-            linesource.next()
-
-        prepq = self.prep_import_insert(ks, cf, columns)
         try:
+            if header:
+                linesource.next()
+            prepq = self.prep_import_insert(ks, cf, columns)
+            rownum = -1
             reader = csv.reader(linesource, **dialect_options)
             for rownum, row in enumerate(reader):
                 if len(row) != len(columns):
@@ -1525,6 +1539,10 @@ class Shell(cmd.Cmd):
         return rownum + 1
 
     def prep_import_insert(self, ks, cf, columns):
+        if columns is None:
+            # default to all known columns
+            columns = self.get_column_names(ks, cf)
+
         # would be nice to be able to use a prepared query here, but in order
         # to use that interface, we'd need to have all the input as native
         # values already, reading them from text just like the various
@@ -1543,6 +1561,70 @@ class Shell(cmd.Cmd):
             print "Import using CQL: %s" % cql
         return self.perform_statement(cql)
 
+    def perform_csv_export(self, ks, cf, columns, fname, opts):
+        dialect_options = self.csv_dialect_defaults.copy()
+        if 'quote' in opts:
+            dialect_options['quotechar'] = opts.pop('quote')
+        if 'escape' in opts:
+            dialect_options['escapechar'] = opts.pop('escape')
+        if 'delimiter' in opts:
+            dialect_options['delimiter'] = opts.pop('delimiter')
+        encoding = opts.pop('encoding', 'utf8')
+        nullval = opts.pop('null', '')
+        header = bool(opts.pop('header', '').lower() == 'true')
+        if dialect_options['quotechar'] == dialect_options['escapechar']:
+            dialect_options['doublequote'] = True
+            del dialect_options['escapechar']
+
+        if opts:
+            self.printerr('Unrecognized COPY TO options: %s'
+                          % ', '.join(opts.keys()))
+            return 0
+
+        if fname is None:
+            do_close = False
+            csvdest = sys.stdout
+        else:
+            do_close = True
+            try:
+                csvdest = open(fname, 'wb')
+            except IOError, e:
+                self.printerr("Can't open %r for writing: %s" % (fname, e))
+                return 0
+        try:
+            self.prep_export_dump(ks, cf, columns)
+            writer = csv.writer(csvdest, **dialect_options)
+            if header:
+                writer.writerow([d[0] for d in self.cursor.description])
+            rows = 0
+            while True:
+                row = self.cursor.fetchone()
+                if row is None:
+                    break
+                fmt = lambda v, d: \
+                    format_value(v, d[1], output_encoding=encoding, nullval=nullval,
+                                 time_format=self.display_time_format,
+                                 float_precision=self.display_float_precision).strval
+                writer.writerow(map(fmt, row, self.cursor.description))
+                rows += 1
+        finally:
+            if do_close:
+                csvdest.close()
+        return rows
+
+    def prep_export_dump(self, ks, cf, columns):
+        if columns is None:
+            columnlist = '*'
+        else:
+            columnlist = ', '.join(map(self.cql_protect_name, columns))
+        # this limit is pretty awful. would be better to use row-key-paging, so
+        # that the dump could be pretty easily aborted if necessary, but that
+        # can be kind of tricky with cql3. Punt for now, until the real cursor
+        # API is added in CASSANDRA-4415.
+        query = 'SELECT %s FROM %s.%s LIMIT 99999999' \
+                % (columnlist, self.cql_protect_name(ks), self.cql_protect_name(cf))
+        self.cursor.execute(query)
+
     def do_show(self, parsed):
         """
         SHOW [cqlsh only]