You are viewing a plain-text version of this content; the hyperlink to the canonical version was not preserved in this rendering.
Posted to commits@cassandra.apache.org by br...@apache.org on 2012/06/18 20:25:30 UTC
[2/3] git commit: cqlsh: add COPY command to load data from CSV flat files.
Patch by paul cannon, reviewed by brandonwilliams for CASSANDRA-4012
cqlsh: add COPY command to load data from CSV flat files
Patch by paul cannon, reviewed by brandonwilliams for CASSANDRA-4012
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/0ba2631e
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/0ba2631e
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/0ba2631e
Branch: refs/heads/trunk
Commit: 0ba2631ee228bdefaba61a53d723a65107ca044d
Parents: 0cc168a
Author: Brandon Williams <br...@apache.org>
Authored: Mon Jun 18 13:24:32 2012 -0500
Committer: Brandon Williams <br...@apache.org>
Committed: Mon Jun 18 13:24:32 2012 -0500
----------------------------------------------------------------------
CHANGES.txt | 1 +
bin/cqlsh | 227 +++++++++++++++++++++++++++++++++--
pylib/cqlshlib/cql3handling.py | 4 +-
pylib/cqlshlib/cqlhandling.py | 15 ++-
4 files changed, 233 insertions(+), 14 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0ba2631e/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 693b03b..ec03ca6 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -10,6 +10,7 @@
* (cql3) Reject (not yet supported) creation of secondary indexes on tables with
composite primary keys (CASSANDRA-4328)
* Set JVM stack size to 160k for java 7 (CASSANDRA-4275)
+ * cqlsh: add COPY command to load data from CSV flat files (CASSANDRA-4012)
Merged from 1.0:
* Set gc_grace on index CF to 0 (CASSANDRA-4314)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0ba2631e/bin/cqlsh
----------------------------------------------------------------------
diff --git a/bin/cqlsh b/bin/cqlsh
index fecd472..842a313 100755
--- a/bin/cqlsh
+++ b/bin/cqlsh
@@ -36,7 +36,7 @@ version = "2.2.0"
from StringIO import StringIO
from itertools import groupby
-from contextlib import contextmanager
+from contextlib import contextmanager, closing
from glob import glob
from functools import partial
from collections import defaultdict
@@ -52,6 +52,7 @@ import locale
import re
import platform
import warnings
+import csv
# cqlsh should run correctly when run out of a Cassandra source tree,
# out of an unpacked Cassandra tarball, and after a proper package install.
@@ -189,6 +190,7 @@ cqlsh_extra_syntax_rules = r'''
| <assumeCommand>
| <sourceCommand>
| <captureCommand>
+ | <copyCommand>
| <debugCommand>
| <helpCommand>
| <exitCommand>
@@ -220,6 +222,15 @@ cqlsh_extra_syntax_rules = r'''
<captureCommand> ::= "CAPTURE" ( fname=( <stringLiteral> | "OFF" ) )?
;
+<copyCommand> ::= "COPY" cf=<columnFamilyName>
+ ( "(" [colnames]=<colname> ( "," [colnames]=<colname> )* ")" )?
+ "FROM" ( fname=<stringLiteral> | "STDIN" )
+ ( "WITH" <copyOption> ( "AND" <copyOption> )* )?
+ ;
+
+<copyOption> ::= [optnames]=<cfOptionName> "=" [optvals]=<cfOptionVal>
+ ;
+
# avoiding just "DEBUG" so that this rule doesn't get treated as a terminal
<debugCommand> ::= "DEBUG" "THINGS"?
;
@@ -272,6 +283,41 @@ cqlsh_syntax_completer('sourceCommand', 'fname') \
cqlsh_syntax_completer('captureCommand', 'fname') \
(complete_source_quoted_filename)
@cqlsh_syntax_completer('copyCommand', 'fname')
def copy_fname_completer(ctxt, cqlsh):
    """
    Complete the FROM target of a COPY command.

    Inside an unterminated string literal, defer to the shared
    quoted-filename completer.  With nothing typed yet, suggest the
    opening single quote.  Otherwise offer nothing.
    """
    if ctxt.get_binding('*LASTTYPE*') == 'unclosedString':
        return complete_source_quoted_filename(ctxt, cqlsh)
    typed_so_far = ctxt.get_binding('partial')
    return ["'"] if typed_so_far == '' else ()
+
@cqlsh_syntax_completer('copyCommand', 'colnames')
def complete_copy_column_names(ctxt, cqlsh):
    """
    Complete an entry of COPY's optional column list.

    The first entry is offered only the first name returned by
    cqlsh.get_column_names.  Later entries are offered the remaining names
    that have not been listed yet (returned as a set).
    """
    already_listed = [cqlsh.cql_unprotect_name(name)
                      for name in ctxt.get_binding('colnames', ())]
    keyspace = cqlsh.cql_unprotect_name(ctxt.get_binding('ksname', None))
    table = cqlsh.cql_unprotect_name(ctxt.get_binding('cfname'))
    all_columns = cqlsh.get_column_names(keyspace, table)
    if not already_listed:
        return [all_columns[0]]
    return set(all_columns[1:]).difference(already_listed)
+
# Option names accepted after COPY ... WITH; complete_copy_options offers
# whichever of these have not been given yet.
COPY_OPTIONS = (
    'DELIMITER',
    'QUOTE',
    'ESCAPE',
    'HEADER',
)
+
@cqlsh_syntax_completer('copyOption', 'optnames')
def complete_copy_options(ctxt, cqlsh):
    """
    Complete a COPY ... WITH option name.

    Returns the names in COPY_OPTIONS that have not been given yet.  Names
    already typed are compared case-insensitively (upper-cased).
    """
    # Call name.upper() rather than map(str.upper, ...): the unbound str
    # method raises TypeError when handed a Python 2 unicode binding.
    already_given = set(name.upper() for name in ctxt.get_binding('optnames', ()))
    return set(COPY_OPTIONS) - already_given
+
@cqlsh_syntax_completer('copyOption', 'optvals')
def complete_copy_opt_values(ctxt, cqlsh):
    """
    Complete the value of the COPY option named most recently.

    HEADER gets the literals 'true' / 'false'.  Every other option gets a
    hint that a single-character string is expected.
    """
    current_option = ctxt.get_binding('optnames', ())[-1].lower()
    if current_option != 'header':
        return [cqlhandling.Hint('<single_character_string>')]
    return ['true', 'false']
+
class NoKeyspaceError(Exception):
pass
@@ -469,6 +515,22 @@ def show_warning_without_quoting_line(message, category, filename, lineno, file=
warnings.showwarning = show_warning_without_quoting_line
warnings.filterwarnings('always', category=cql3handling.UnexpectedTableStructure)
def describe_interval(seconds):
    """
    Render a duration as English text, e.g. for the COPY summary line.

    Whole days, hours and minutes are listed (pluralised, and omitted when
    zero), followed by the leftover seconds to three decimal places:
        3661.5 -> '1 hour, 1 minute, and 1.500 seconds'
        5      -> '5.000 seconds'

    seconds: duration in seconds, int or float (presumably non-negative,
             since callers pass an elapsed time -- confirm for new callers).
    """
    parts = []
    for length, unit in ((86400, 'day'), (3600, 'hour'), (60, 'minute')):
        # Floor division: with '/' under true division (Python 3 or
        # `from __future__ import division`) this would give fractional
        # counts and emit bogus "0 day" / "0 hour" entries.
        num = int(seconds) // length
        if num > 0:
            parts.append('%d %s%s' % (num, unit, 's' if num > 1 else ''))
        seconds %= length
    words = '%.03f seconds' % seconds
    if len(parts) > 1:
        words = ', '.join(parts) + ', and ' + words
    elif parts:
        words = parts[0] + ' and ' + words
    return words
+
class Shell(cmd.Cmd):
default_prompt = "cqlsh> "
continue_prompt = " ... "
@@ -481,6 +543,8 @@ class Shell(cmd.Cmd):
debug = False
stop = False
shunted_query_out = None
+ csv_dialect_defaults = dict(delimiter=',', doublequote=False,
+ escapechar='\\', quotechar='"')
def __init__(self, hostname, port, color=False, username=None,
password=None, encoding=None, stdin=None, tty=True,
@@ -522,7 +586,6 @@ class Shell(cmd.Cmd):
stdin = sys.stdin
self.tty = tty
if tty:
- self.prompt = None
self.reset_prompt()
self.report_connection()
print 'Use HELP for help.'
@@ -661,6 +724,25 @@ class Shell(cmd.Cmd):
filterable.add(cm.name)
return filterable
def get_column_names(self, ksname, cfname):
    """
    Return the column names of ksname.cfname.

    ksname=None means the current keyspace.  On CQL 3 and later the names
    come from the column-family layout; otherwise from the thrift CfDef.
    """
    keyspace = self.current_keyspace if ksname is None else ksname
    if not self.cqlver_atleast(3):
        return self.get_column_names_from_cfdef(keyspace, cfname)
    return self.get_column_names_from_layout(keyspace, cfname)
+
def get_column_names_from_layout(self, ksname, cfname):
    """Return the .name of every column in the layout of ksname.cfname, in layout order."""
    table_layout = self.get_columnfamily_layout(ksname, cfname)
    names = []
    for column in table_layout.columns:
        names.append(column.name)
    return names
+
def get_column_names_from_cfdef(self, ksname, cfname):
    """
    Return [key name] + declared column names for ksname.cfname (CQL 2).

    The key name is the CfDef's key_alias, or 'KEY' when no alias is set.
    """
    cfdef = self.get_columnfamily(cfname, ksname=ksname)
    alias = cfdef.key_alias
    names = ['KEY' if alias is None else alias]
    names.extend(cm.name for cm in cfdef.column_metadata)
    return names
+
# ===== thrift-dependent parts =====
def get_cluster_name(self):
@@ -758,16 +840,25 @@ class Shell(cmd.Cmd):
def get_input_line(self, prompt=''):
if self.tty:
- line = raw_input(self.prompt) + '\n'
+ line = raw_input(prompt) + '\n'
else:
- sys.stdout.write(self.prompt)
- sys.stdout.flush()
line = self.stdin.readline()
if not len(line):
raise EOFError
self.lineno += 1
return line
def use_stdin_reader(self, until='', prompt=''):
    r"""
    Yield lines (newline included) from self.get_input_line until a
    terminator line or EOF.

    until:  text of the terminator line.  It is compared after stripping
            trailing CR/LF characters, so e.g. "\." ends input whether it is
            followed by LF, CRLF, or nothing (the last line of piped
            input).  The terminator itself is not yielded.  The default ''
            means an empty line ends input.
    prompt: passed through to get_input_line for every line read.
    """
    while True:
        try:
            newline = self.get_input_line(prompt=prompt)
        except EOFError:
            return
        # Previously compared against until + '\n' exactly, so a terminator
        # without a trailing LF (or with CRLF) was handed on as data.
        if newline.rstrip('\r\n') == until:
            return
        yield newline
+
def cmdloop(self):
"""
Adapted from cmd.Cmd's version, because there is literally no way with
@@ -1065,8 +1156,7 @@ class Shell(cmd.Cmd):
debug=debug_completion, startsymbol='cqlshCommand')
def set_prompt(self, prompt):
- if self.prompt != '':
- self.prompt = prompt
+ self.prompt = prompt
def cql_protect_name(self, name):
return cqlruleset.maybe_escape_name(name)
@@ -1339,6 +1429,125 @@ class Shell(cmd.Cmd):
do_desc = do_describe
def do_copy(self, parsed):
    r"""
    COPY [cqlsh only]

      Imports CSV data into a Cassandra table.

    COPY <table_name> [ ( column [, ...] ) ]
         FROM ( '<filename>' | STDIN )
         [ WITH <option>='value' [AND ...] ];

    Available options and defaults:

      DELIMITER=','    - character that appears between fields
      QUOTE='"'        - quoting character to be used to quote fields
      ESCAPE='\'       - character to appear before the QUOTE char when quoted
      HEADER=false     - whether to ignore the first line

    If ESCAPE is the same character as QUOTE, a quote inside a quoted
    field is written doubled instead.

    When entering CSV data on STDIN, you can use the sequence "\."
    on a line by itself to end the data input.
    """
    # The docstring above is presumably what HELP COPY shows (cmd.Cmd's
    # default help uses do_* docstrings), so it is user-facing text.

    ks = self.cql_unprotect_name(parsed.get_binding('ksname', None))
    if ks is None:
        ks = self.current_keyspace
    cf = self.cql_unprotect_name(parsed.get_binding('cfname'))
    columns = parsed.get_binding('colnames', None)
    if columns is None:
        # default to every column get_column_names reports for the table
        columns = self.get_column_names(ks, cf)
    else:
        columns = [self.cql_unprotect_name(name) for name in columns]
    fname = parsed.get_binding('fname', None)
    if fname is not None:
        fname = os.path.expanduser(self.cql_unprotect_value(fname))
    # Option names are case-insensitive.  Call .lower() on each name rather
    # than map(str.lower, ...), which rejects Python 2 unicode bindings.
    copyoptnames = [name.lower() for name in parsed.get_binding('optnames', ())]
    copyoptvals = [self.cql_unprotect_value(val)
                   for val in parsed.get_binding('optvals', ())]
    opts = dict(zip(copyoptnames, copyoptvals))

    # when/if COPY TO is supported, this would be a good place to branch
    # on direction.

    timestart = time.time()
    rows = self.perform_csv_import(ks, cf, columns, fname, opts)
    timeend = time.time()

    print("%d rows imported in %s." % (rows, describe_interval(timeend - timestart)))
+
def perform_csv_import(self, ks, cf, columns, fname, opts):
    r"""
    Load CSV records into ks.cf, one INSERT per record.

    ks, cf:   keyspace and column family names (unprotected).
    columns:  column names, in the order the CSV fields appear.
    fname:    path of the CSV file, or None to read from stdin via
              use_stdin_reader until a "\." line (or EOF).
    opts:     lower-cased COPY option names -> unquoted values.  Recognised
              keys ('quote', 'escape', 'delimiter', 'header') are popped
              from this dict; any left over are reported as unrecognized.

    Returns the number of records successfully inserted.  On an error
    (unknown or invalid option, unreadable file, record with the wrong
    field count, failed insert) a message goes to self.printerr and the
    count inserted so far is returned.  Record numbers in messages are
    1-based.  Line numbers are physical input lines, header included.
    """
    dialect_options = self.csv_dialect_defaults.copy()
    # COPY option name -> csv dialect parameter it overrides
    for optname, dialect_key in (('quote', 'quotechar'),
                                 ('escape', 'escapechar'),
                                 ('delimiter', 'delimiter')):
        if optname in opts:
            dialect_options[dialect_key] = opts.pop(optname)
    header = opts.pop('header', '').lower() == 'true'
    if dialect_options['quotechar'] == dialect_options['escapechar']:
        # same char for both roles: an embedded quote is represented by
        # doubling it rather than by an escape prefix
        dialect_options['doublequote'] = True
        del dialect_options['escapechar']

    if opts:
        self.printerr('Unrecognized COPY FROM options: %s'
                      % ', '.join(opts.keys()))
        return 0

    if fname is None:
        do_close = False
        print(r"[Use \. on a line by itself to end input]")
        linesource = self.use_stdin_reader(prompt='[copy] ', until=r'\.')
    else:
        do_close = True
        try:
            linesource = open(fname, 'r')
        except IOError as e:
            self.printerr("Can't open %r for reading: %s" % (fname, e))
            return 0

    # Tracked separately so an input with no records returns 0 instead of
    # hitting an unbound loop variable.
    imported = 0
    try:
        try:
            reader = csv.reader(linesource, **dialect_options)
        except TypeError as e:
            # csv rejects e.g. multi-character DELIMITER/QUOTE/ESCAPE values
            self.printerr('Invalid COPY FROM option value: %s' % (e,))
            return 0
        if header:
            # Skip one CSV record through the reader.  This tolerates empty
            # input, consumes a quoted header spanning lines, and keeps the
            # header counted in reader.line_num for later error messages.
            next(reader, None)
        prepq = self.prep_import_insert(ks, cf, columns)
        for recnum, row in enumerate(reader, 1):
            if len(row) != len(columns):
                self.printerr("Record #%d (line %d) has the wrong number of fields "
                              "(%d instead of %d)."
                              % (recnum, reader.line_num, len(row), len(columns)))
                return imported
            if not self.do_import_insert(prepq, row):
                self.printerr("Aborting import at record #%d (line %d). "
                              "Previously-inserted values still present."
                              % (recnum, reader.line_num))
                return imported
            imported += 1
    finally:
        if do_close:
            linesource.close()
        elif self.tty:
            # same output as the original bare `print` after stdin input
            sys.stdout.write('\n')
    return imported
+
def prep_import_insert(self, ks, cf, columns):
    """
    Build the INSERT template shared by every imported record.

    Returns a %-format string containing exactly one '%s' conversion, where
    the comma-separated, already-quoted value list is later substituted.
    Any literal '%' in the protected keyspace, table or column names is
    doubled so that the later formatting step cannot misread it as a
    conversion specifier.
    """
    # would be nice to be able to use a prepared query here, but in order
    # to use that interface, we'd need to have all the input as native
    # values already, reading them from text just like the various
    # Cassandra cql types do. Better just to submit them all as intact
    # CQL string literals and let Cassandra do its thing.
    def protect(name):
        return self.cql_protect_name(name).replace('%', '%%')

    return 'INSERT INTO %s.%s (%s) VALUES (%%s)' % (
        protect(ks),
        protect(cf),
        ', '.join(protect(col) for col in columns),
    )
+
def do_import_insert(self, prepq, rowvalues):
    """
    Quote each field of one CSV record and run the resulting INSERT.

    prepq is the one-placeholder template from prep_import_insert.  Returns
    whatever self.perform_statement returns for the statement; the import
    loop treats a false result as failure.  In debug mode the statement is
    echoed first.
    """
    quoted_fields = [self.cql_protect_value(field) for field in rowvalues]
    statement = prepq % (', '.join(quoted_fields),)
    if self.debug:
        print("Import using CQL: %s" % (statement,))
    return self.perform_statement(statement)
+
def do_show(self, parsed):
"""
SHOW [cqlsh only]
@@ -1457,7 +1666,7 @@ class Shell(cmd.Cmd):
"""
fname = parsed.get_binding('fname')
- fname = os.path.expanduser(cqlsh.cql_unprotect_value(fname))
+ fname = os.path.expanduser(self.cql_unprotect_value(fname))
try:
f = open(fname, 'r')
except IOError, e:
@@ -1521,7 +1730,7 @@ class Shell(cmd.Cmd):
' to disable.' % (self.query_out.name,))
return
- fname = os.path.expanduser(cqlsh.cql_unprotect_value(fname))
+ fname = os.path.expanduser(self.cql_unprotect_value(fname))
try:
f = open(fname, 'a')
except IOError, e:
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0ba2631e/pylib/cqlshlib/cql3handling.py
----------------------------------------------------------------------
diff --git a/pylib/cqlshlib/cql3handling.py b/pylib/cqlshlib/cql3handling.py
index 8785a65..2bc7ef9 100644
--- a/pylib/cqlshlib/cql3handling.py
+++ b/pylib/cqlshlib/cql3handling.py
@@ -267,8 +267,8 @@ def unreserved_keyword_completer(ctxt, cass):
return ()
def get_cf_layout(ctxt, cass):
- ks = ctxt.get_binding('ksname', None)
- cf = ctxt.get_binding('cfname')
+ ks = dequote_name(ctxt.get_binding('ksname', None))
+ cf = dequote_name(ctxt.get_binding('cfname'))
return cass.get_columnfamily_layout(ks, cf)
syntax_rules += r'''
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0ba2631e/pylib/cqlshlib/cqlhandling.py
----------------------------------------------------------------------
diff --git a/pylib/cqlshlib/cqlhandling.py b/pylib/cqlshlib/cqlhandling.py
index 3866f3c..aa29c60 100644
--- a/pylib/cqlshlib/cqlhandling.py
+++ b/pylib/cqlshlib/cqlhandling.py
@@ -215,22 +215,25 @@ class CqlParsingRuleSet(pylexotron.ParsingRuleSet):
# inside a string literal
prefix = None
dequoter = util.identity
+ lasttype = None
if tokens:
- if tokens[-1][0] == 'unclosedString':
+ lasttype = tokens[-1][0]
+ if lasttype == 'unclosedString':
prefix = self.token_dequote(tokens[-1])
tokens = tokens[:-1]
partial = prefix + partial
dequoter = self.dequote_value
requoter = self.escape_value
- elif tokens[-1][0] == 'unclosedName':
+ elif lasttype == 'unclosedName':
prefix = self.token_dequote(tokens[-1])
tokens = tokens[:-1]
partial = prefix + partial
dequoter = self.dequote_name
requoter = self.escape_name
- elif tokens[-1][0] == 'unclosedComment':
+ elif lasttype == 'unclosedComment':
return []
bindings['partial'] = partial
+ bindings['*LASTTYPE*'] = lasttype
bindings['*SRC*'] = text
# find completions for the position
@@ -302,6 +305,7 @@ class CqlParsingRuleSet(pylexotron.ParsingRuleSet):
init_bindings = {'cassandra_conn': cassandra_conn}
if debug:
init_bindings['*DEBUG*'] = True
+ print "cql_complete(%r, partial=%r)" % (text, partial)
completions, hints = self.cql_complete_single(text, partial, init_bindings,
startsymbol=startsymbol)
@@ -495,6 +499,7 @@ JUNK ::= /([ \t\r\f\v]+|(--|[/][/])[^\n\r]*([\n\r]|$)|[/][*].*?[*][/])/ ;
;
<colname> ::= <term>
| <identifier>
+ | nocomplete=<K_KEY>
;
<statementBody> ::= <useStatement>
@@ -528,6 +533,10 @@ JUNK ::= /([ \t\r\f\v]+|(--|[/][/])[^\n\r]*([\n\r]|$)|[/][*].*?[*][/])/ ;
<columnFamilyName> ::= ( ksname=<name> "." )? cfname=<name> ;
'''
@completer_for('colname', 'nocomplete')
def nocomplete(ctxt, cass):
    """Offer no completions for the <colname> alternative bound as 'nocomplete' (the KEY keyword)."""
    return tuple()
+
@completer_for('consistencylevel', 'cl')
def cl_completer(ctxt, cass):
return CqlRuleSet.consistency_levels