You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by br...@apache.org on 2012/06/18 20:25:30 UTC

[2/3] git commit: cqlsh: add COPY command to load data from CSV flat files. Patch by paul cannon, reviewed by brandonwilliams for CASSANDRA-4012

cqlsh: add COPY command to load data from CSV flat files
Patch by paul cannon, reviewed by brandonwilliams for CASSANDRA-4012


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/0ba2631e
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/0ba2631e
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/0ba2631e

Branch: refs/heads/trunk
Commit: 0ba2631ee228bdefaba61a53d723a65107ca044d
Parents: 0cc168a
Author: Brandon Williams <br...@apache.org>
Authored: Mon Jun 18 13:24:32 2012 -0500
Committer: Brandon Williams <br...@apache.org>
Committed: Mon Jun 18 13:24:32 2012 -0500

----------------------------------------------------------------------
 CHANGES.txt                    |    1 +
 bin/cqlsh                      |  227 +++++++++++++++++++++++++++++++++--
 pylib/cqlshlib/cql3handling.py |    4 +-
 pylib/cqlshlib/cqlhandling.py  |   15 ++-
 4 files changed, 233 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/0ba2631e/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 693b03b..ec03ca6 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -10,6 +10,7 @@
  * (cql3) Reject (not yet supported) creation of 2ndardy indexes on tables with
    composite primary keys (CASSANDRA-4328)
  * Set JVM stack size to 160k for java 7 (CASSANDRA-4275)
+ * cqlsh: add COPY command to load data from CSV flat files (CASSANDRA-4012)
 Merged from 1.0:
  * Set gc_grace on index CF to 0 (CASSANDRA-4314)
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0ba2631e/bin/cqlsh
----------------------------------------------------------------------
diff --git a/bin/cqlsh b/bin/cqlsh
index fecd472..842a313 100755
--- a/bin/cqlsh
+++ b/bin/cqlsh
@@ -36,7 +36,7 @@ version = "2.2.0"
 
 from StringIO import StringIO
 from itertools import groupby
-from contextlib import contextmanager
+from contextlib import contextmanager, closing
 from glob import glob
 from functools import partial
 from collections import defaultdict
@@ -52,6 +52,7 @@ import locale
 import re
 import platform
 import warnings
+import csv
 
 # cqlsh should run correctly when run out of a Cassandra source tree,
 # out of an unpacked Cassandra tarball, and after a proper package install.
@@ -189,6 +190,7 @@ cqlsh_extra_syntax_rules = r'''
                    | <assumeCommand>
                    | <sourceCommand>
                    | <captureCommand>
+                   | <copyCommand>
                    | <debugCommand>
                    | <helpCommand>
                    | <exitCommand>
@@ -220,6 +222,15 @@ cqlsh_extra_syntax_rules = r'''
 <captureCommand> ::= "CAPTURE" ( fname=( <stringLiteral> | "OFF" ) )?
                    ;
 
+<copyCommand> ::= "COPY" cf=<columnFamilyName>
+                         ( "(" [colnames]=<colname> ( "," [colnames]=<colname> )* ")" )?
+                         "FROM" ( fname=<stringLiteral> | "STDIN" )
+                         ( "WITH" <copyOption> ( "AND" <copyOption> )* )?
+                ;
+
+<copyOption> ::= [optnames]=<cfOptionName> "=" [optvals]=<cfOptionVal>
+               ;
+
 # avoiding just "DEBUG" so that this rule doesn't get treated as a terminal
 <debugCommand> ::= "DEBUG" "THINGS"?
                  ;
@@ -272,6 +283,41 @@ cqlsh_syntax_completer('sourceCommand', 'fname') \
 cqlsh_syntax_completer('captureCommand', 'fname') \
         (complete_source_quoted_filename)
 
+@cqlsh_syntax_completer('copyCommand', 'fname')
+def copy_fname_completer(ctxt, cqlsh):
+    lasttype = ctxt.get_binding('*LASTTYPE*')
+    if lasttype == 'unclosedString':
+        return complete_source_quoted_filename(ctxt, cqlsh)
+    partial = ctxt.get_binding('partial')
+    if partial == '':
+        return ["'"]
+    return ()
+
+@cqlsh_syntax_completer('copyCommand', 'colnames')
+def complete_copy_column_names(ctxt, cqlsh):
+    existcols = map(cqlsh.cql_unprotect_name, ctxt.get_binding('colnames', ()))
+    ks = cqlsh.cql_unprotect_name(ctxt.get_binding('ksname', None))
+    cf = cqlsh.cql_unprotect_name(ctxt.get_binding('cfname'))
+    colnames = cqlsh.get_column_names(ks, cf)
+    if len(existcols) == 0:
+        return [colnames[0]]
+    return set(colnames[1:]) - set(existcols)
+
+COPY_OPTIONS = ('DELIMITER', 'QUOTE', 'ESCAPE', 'HEADER')
+
+@cqlsh_syntax_completer('copyOption', 'optnames')
+def complete_copy_options(ctxt, cqlsh):
+    optnames = map(str.upper, ctxt.get_binding('optnames', ()))
+    return set(COPY_OPTIONS) - set(optnames)
+
+@cqlsh_syntax_completer('copyOption', 'optvals')
+def complete_copy_opt_values(ctxt, cqlsh):
+    optnames = ctxt.get_binding('optnames', ())
+    lastopt = optnames[-1].lower()
+    if lastopt == 'header':
+        return ['true', 'false']
+    return [cqlhandling.Hint('<single_character_string>')]
+
 class NoKeyspaceError(Exception):
     pass
 
@@ -469,6 +515,22 @@ def show_warning_without_quoting_line(message, category, filename, lineno, file=
 warnings.showwarning = show_warning_without_quoting_line
 warnings.filterwarnings('always', category=cql3handling.UnexpectedTableStructure)
 
+def describe_interval(seconds):
+    desc = []
+    for length, unit in ((86400, 'day'), (3600, 'hour'), (60, 'minute')):
+        num = int(seconds) / length
+        if num > 0:
+            desc.append('%d %s' % (num, unit))
+            if num > 1:
+                desc[-1] += 's'
+        seconds %= length
+    words = '%.03f seconds' % seconds
+    if len(desc) > 1:
+        words = ', '.join(desc) + ', and ' + words
+    elif len(desc) == 1:
+        words = desc[0] + ' and ' + words
+    return words
+
 class Shell(cmd.Cmd):
     default_prompt  = "cqlsh> "
     continue_prompt = "   ... "
@@ -481,6 +543,8 @@ class Shell(cmd.Cmd):
     debug = False
     stop = False
     shunted_query_out = None
+    csv_dialect_defaults = dict(delimiter=',', doublequote=False,
+                                escapechar='\\', quotechar='"')
 
     def __init__(self, hostname, port, color=False, username=None,
                  password=None, encoding=None, stdin=None, tty=True,
@@ -522,7 +586,6 @@ class Shell(cmd.Cmd):
             stdin = sys.stdin
         self.tty = tty
         if tty:
-            self.prompt = None
             self.reset_prompt()
             self.report_connection()
             print 'Use HELP for help.'
@@ -661,6 +724,25 @@ class Shell(cmd.Cmd):
                 filterable.add(cm.name)
         return filterable
 
+    def get_column_names(self, ksname, cfname):
+        if ksname is None:
+            ksname = self.current_keyspace
+        if self.cqlver_atleast(3):
+            return self.get_column_names_from_layout(ksname, cfname)
+        else:
+            return self.get_column_names_from_cfdef(ksname, cfname)
+
+    def get_column_names_from_layout(self, ksname, cfname):
+        layout = self.get_columnfamily_layout(ksname, cfname)
+        return [col.name for col in layout.columns]
+
+    def get_column_names_from_cfdef(self, ksname, cfname):
+        cfdef = self.get_columnfamily(cfname, ksname=ksname)
+        key_alias = cfdef.key_alias
+        if key_alias is None:
+            key_alias = 'KEY'
+        return [key_alias] + [cm.name for cm in cfdef.column_metadata]
+
     # ===== thrift-dependent parts =====
 
     def get_cluster_name(self):
@@ -758,16 +840,25 @@ class Shell(cmd.Cmd):
 
     def get_input_line(self, prompt=''):
         if self.tty:
-            line = raw_input(self.prompt) + '\n'
+            line = raw_input(prompt) + '\n'
         else:
-            sys.stdout.write(self.prompt)
-            sys.stdout.flush()
             line = self.stdin.readline()
             if not len(line):
                 raise EOFError
         self.lineno += 1
         return line
 
+    def use_stdin_reader(self, until='', prompt=''):
+        until += '\n'
+        while True:
+            try:
+                newline = self.get_input_line(prompt=prompt)
+            except EOFError:
+                return
+            if newline == until:
+                return
+            yield newline
+
     def cmdloop(self):
         """
         Adapted from cmd.Cmd's version, because there is literally no way with
@@ -1065,8 +1156,7 @@ class Shell(cmd.Cmd):
                                        debug=debug_completion, startsymbol='cqlshCommand')
 
     def set_prompt(self, prompt):
-        if self.prompt != '':
-            self.prompt = prompt
+        self.prompt = prompt
 
     def cql_protect_name(self, name):
         return cqlruleset.maybe_escape_name(name)
@@ -1339,6 +1429,125 @@ class Shell(cmd.Cmd):
 
     do_desc = do_describe
 
+    def do_copy(self, parsed):
+        r"""
+        COPY [cqlsh only]
+
+          Imports CSV data into a Cassandra table.
+
+        COPY <table_name> [ ( column [, ...] ) ]
+             FROM ( '<filename>' | STDIN )
+             [ WITH <option>='value' [AND ...] ];
+
+        Available options and defaults:
+
+          DELIMITER=','    - character that appears between records
+          QUOTE='"'        - quoting character to be used to quote fields
+          ESCAPE='\'       - character to appear before the QUOTE char when quoted
+          HEADER=false     - whether to ignore the first line
+
+        When entering CSV data on STDIN, you can use the sequence "\."
+        on a line by itself to end the data input.
+        """
+
+        ks = self.cql_unprotect_name(parsed.get_binding('ksname', None))
+        if ks is None:
+            ks = self.current_keyspace
+        cf = self.cql_unprotect_name(parsed.get_binding('cfname'))
+        columns = parsed.get_binding('colnames', None)
+        if columns is None:
+            # default to all known columns
+            columns = self.get_column_names(ks, cf)
+        else:
+            columns = map(self.cql_unprotect_name, columns)
+        fname = parsed.get_binding('fname', None)
+        if fname is not None:
+            fname = os.path.expanduser(self.cql_unprotect_value(fname))
+        copyoptnames = map(str.lower, parsed.get_binding('optnames', ()))
+        copyoptvals = map(self.cql_unprotect_value, parsed.get_binding('optvals', ()))
+        opts = dict(zip(copyoptnames, copyoptvals))
+
+        # when/if COPY TO is supported, this would be a good place to branch
+        # on direction.
+
+        timestart = time.time()
+        rows = self.perform_csv_import(ks, cf, columns, fname, opts)
+        timeend = time.time()
+
+        print "%d rows imported in %s." % (rows, describe_interval(timeend - timestart))
+
+    def perform_csv_import(self, ks, cf, columns, fname, opts):
+        dialect_options = self.csv_dialect_defaults.copy()
+        if 'quote' in opts:
+            dialect_options['quotechar'] = opts.pop('quote')
+        if 'escape' in opts:
+            dialect_options['escapechar'] = opts.pop('escape')
+        if 'delimiter' in opts:
+            dialect_options['delimiter'] = opts.pop('delimiter')
+        header = bool(opts.pop('header', '').lower() == 'true')
+        if dialect_options['quotechar'] == dialect_options['escapechar']:
+            dialect_options['doublequote'] = True
+            del dialect_options['escapechar']
+
+        if opts:
+            self.printerr('Unrecognized COPY FROM options: %s'
+                          % ', '.join(opts.keys()))
+            return 0
+
+        if fname is None:
+            do_close = False
+            print "[Use \. on a line by itself to end input]"
+            linesource = self.use_stdin_reader(prompt='[copy] ', until=r'\.')
+        else:
+            do_close = True
+            try:
+                linesource = open(fname, 'r')
+            except IOError, e:
+                self.printerr("Can't open %r for reading: %s" % (fname, e))
+                return 0
+        if header:
+            linesource.next()
+
+        prepq = self.prep_import_insert(ks, cf, columns)
+        try:
+            reader = csv.reader(linesource, **dialect_options)
+            for rownum, row in enumerate(reader):
+                if len(row) != len(columns):
+                    self.printerr("Record #%d (line %d) has the wrong number of fields "
+                                  "(%d instead of %d)."
+                                  % (rownum, reader.line_num, len(row), len(columns)))
+                    return rownum
+                if not self.do_import_insert(prepq, row):
+                    self.printerr("Aborting import at record #%d (line %d). "
+                                  "Previously-inserted values still present."
+                                  % (rownum, reader.line_num))
+                    return rownum
+        finally:
+            if do_close:
+                linesource.close()
+            elif self.tty:
+                print
+        return rownum + 1
+
+    def prep_import_insert(self, ks, cf, columns):
+        # would be nice to be able to use a prepared query here, but in order
+        # to use that interface, we'd need to have all the input as native
+        # values already, reading them from text just like the various
+        # Cassandra cql types do. Better just to submit them all as intact
+        # CQL string literals and let Cassandra do its thing.
+        return 'INSERT INTO %s.%s (%s) VALUES (%%s)' % (
+            self.cql_protect_name(ks),
+            self.cql_protect_name(cf),
+            ', '.join(map(self.cql_protect_name, columns))
+        )
+
+    def do_import_insert(self, prepq, rowvalues):
+        valstring = ', '.join(map(self.cql_protect_value, rowvalues))
+        cql = prepq % valstring
+        if self.debug:
+            print "Import using CQL: %s" % cql
+        return self.perform_statement(cql)
+
     def do_show(self, parsed):
         """
         SHOW [cqlsh only]
@@ -1457,7 +1666,7 @@ class Shell(cmd.Cmd):
         """
 
         fname = parsed.get_binding('fname')
-        fname = os.path.expanduser(cqlsh.cql_unprotect_value(fname))
+        fname = os.path.expanduser(self.cql_unprotect_value(fname))
         try:
             f = open(fname, 'r')
         except IOError, e:
@@ -1521,7 +1730,7 @@ class Shell(cmd.Cmd):
                           ' to disable.' % (self.query_out.name,))
             return
 
-        fname = os.path.expanduser(cqlsh.cql_unprotect_value(fname))
+        fname = os.path.expanduser(self.cql_unprotect_value(fname))
         try:
             f = open(fname, 'a')
         except IOError, e:

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0ba2631e/pylib/cqlshlib/cql3handling.py
----------------------------------------------------------------------
diff --git a/pylib/cqlshlib/cql3handling.py b/pylib/cqlshlib/cql3handling.py
index 8785a65..2bc7ef9 100644
--- a/pylib/cqlshlib/cql3handling.py
+++ b/pylib/cqlshlib/cql3handling.py
@@ -267,8 +267,8 @@ def unreserved_keyword_completer(ctxt, cass):
     return ()
 
 def get_cf_layout(ctxt, cass):
-    ks = ctxt.get_binding('ksname', None)
-    cf = ctxt.get_binding('cfname')
+    ks = dequote_name(ctxt.get_binding('ksname', None))
+    cf = dequote_name(ctxt.get_binding('cfname'))
     return cass.get_columnfamily_layout(ks, cf)
 
 syntax_rules += r'''

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0ba2631e/pylib/cqlshlib/cqlhandling.py
----------------------------------------------------------------------
diff --git a/pylib/cqlshlib/cqlhandling.py b/pylib/cqlshlib/cqlhandling.py
index 3866f3c..aa29c60 100644
--- a/pylib/cqlshlib/cqlhandling.py
+++ b/pylib/cqlshlib/cqlhandling.py
@@ -215,22 +215,25 @@ class CqlParsingRuleSet(pylexotron.ParsingRuleSet):
         # inside a string literal
         prefix = None
         dequoter = util.identity
+        lasttype = None
         if tokens:
-            if tokens[-1][0] == 'unclosedString':
+            lasttype = tokens[-1][0]
+            if lasttype == 'unclosedString':
                 prefix = self.token_dequote(tokens[-1])
                 tokens = tokens[:-1]
                 partial = prefix + partial
                 dequoter = self.dequote_value
                 requoter = self.escape_value
-            elif tokens[-1][0] == 'unclosedName':
+            elif lasttype == 'unclosedName':
                 prefix = self.token_dequote(tokens[-1])
                 tokens = tokens[:-1]
                 partial = prefix + partial
                 dequoter = self.dequote_name
                 requoter = self.escape_name
-            elif tokens[-1][0] == 'unclosedComment':
+            elif lasttype == 'unclosedComment':
                 return []
         bindings['partial'] = partial
+        bindings['*LASTTYPE*'] = lasttype
         bindings['*SRC*'] = text
 
         # find completions for the position
@@ -302,6 +305,7 @@ class CqlParsingRuleSet(pylexotron.ParsingRuleSet):
         init_bindings = {'cassandra_conn': cassandra_conn}
         if debug:
             init_bindings['*DEBUG*'] = True
+            print "cql_complete(%r, partial=%r)" % (text, partial)
 
         completions, hints = self.cql_complete_single(text, partial, init_bindings,
                                                       startsymbol=startsymbol)
@@ -495,6 +499,7 @@ JUNK ::= /([ \t\r\f\v]+|(--|[/][/])[^\n\r]*([\n\r]|$)|[/][*].*?[*][/])/ ;
          ;
 <colname> ::= <term>
             | <identifier>
+            | nocomplete=<K_KEY>
             ;
 
 <statementBody> ::= <useStatement>
@@ -528,6 +533,10 @@ JUNK ::= /([ \t\r\f\v]+|(--|[/][/])[^\n\r]*([\n\r]|$)|[/][*].*?[*][/])/ ;
 <columnFamilyName> ::= ( ksname=<name> "." )? cfname=<name> ;
 '''
 
+@completer_for('colname', 'nocomplete')
+def nocomplete(ctxt, cass):
+    return ()
+
 @completer_for('consistencylevel', 'cl')
 def cl_completer(ctxt, cass):
     return CqlRuleSet.consistency_levels