You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by br...@apache.org on 2012/07/25 19:09:23 UTC
[3/4] git commit: cqlsh: add a COPY TO command Patch by paul cannon,
reviewed by brandonwilliams for CASSANDRA-4434
cqlsh: add a COPY TO command
Patch by paul cannon, reviewed by brandonwilliams for CASSANDRA-4434
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/9a633947
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/9a633947
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/9a633947
Branch: refs/heads/trunk
Commit: 9a63394765de28160d576c9285be68587e222a86
Parents: 41c9ba6
Author: Brandon Williams <br...@apache.org>
Authored: Tue Jul 24 13:57:19 2012 -0500
Committer: Brandon Williams <br...@apache.org>
Committed: Tue Jul 24 13:57:19 2012 -0500
----------------------------------------------------------------------
CHANGES.txt | 1 +
bin/cqlsh | 126 ++++++++++++++++++++++++++++++++++++++++++++---------
2 files changed, 105 insertions(+), 22 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9a633947/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 0885387..638574c 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -23,6 +23,7 @@ Merged from 1.0:
* Fix LCS splitting sstable base on uncompressed size (CASSANDRA-4419)
* Bootstraps that fail are detected upon restart and will retry safely without
needing to delete existing data first (CASSANDRA-4427)
+ * (cqlsh) add a COPY TO command to copy a CF to a CSV file (CASSANDRA-4434)
1.1.2
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9a633947/bin/cqlsh
----------------------------------------------------------------------
diff --git a/bin/cqlsh b/bin/cqlsh
index 574d49b..c67a818 100755
--- a/bin/cqlsh
+++ b/bin/cqlsh
@@ -224,7 +224,8 @@ cqlsh_extra_syntax_rules = r'''
<copyCommand> ::= "COPY" cf=<columnFamilyName>
( "(" [colnames]=<colname> ( "," [colnames]=<colname> )* ")" )?
- "FROM" ( fname=<stringLiteral> | "STDIN" )
+ ( dir="FROM" ( fname=<stringLiteral> | "STDIN" )
+ | dir="TO" ( fname=<stringLiteral> | "STDOUT" ) )
( "WITH" <copyOption> ( "AND" <copyOption> )* )?
;
@@ -303,12 +304,16 @@ def complete_copy_column_names(ctxt, cqlsh):
return [colnames[0]]
return set(colnames[1:]) - set(existcols)
-COPY_OPTIONS = ('DELIMITER', 'QUOTE', 'ESCAPE', 'HEADER')
+COPY_OPTIONS = ('DELIMITER', 'QUOTE', 'ESCAPE', 'HEADER', 'ENCODING', 'NULL')  # option names offered by COPY completion; the COPY help text marks ENCODING and NULL as COPY TO only
@cqlsh_syntax_completer('copyOption', 'optnames')
def complete_copy_options(ctxt, cqlsh):
+ """
+ Offer the COPY option names not yet used in this statement. ENCODING
+ and NULL are withheld for COPY ... FROM, since they apply to COPY ... TO only.
+ """
optnames = map(str.upper, ctxt.get_binding('optnames', ()))
- return set(COPY_OPTIONS) - set(optnames)
+ direction = ctxt.get_binding('dir').upper()
+ opts = set(COPY_OPTIONS) - set(optnames)
+ if direction == 'FROM':
+ # difference_update() accepts any iterable; the in-place "-=" operator only
+ # accepts another set and raises TypeError when handed a tuple.
+ opts.difference_update(('ENCODING', 'NULL'))
+ return opts
@cqlsh_syntax_completer('copyOption', 'optvals')
def complete_copy_opt_values(ctxt, cqlsh):
@@ -448,13 +453,13 @@ def unix_time_from_uuid1(u):
return (u.get_time() - 0x01B21DD213814000) / 10000000.0
def format_value(val, casstype, output_encoding, addcolor=False, time_format='',
- float_precision=3, colormap=DEFAULT_VALUE_COLORS):
+ float_precision=3, colormap=DEFAULT_VALUE_COLORS, nullval='null'):
color = colormap['default']
coloredval = None
displaywidth = None
if val is None:
- bval = 'null'
+ bval = nullval
color = colormap['error']
elif isinstance(val, DecodeError):
casstype = 'BytesType'
@@ -727,7 +732,7 @@ class Shell(cmd.Cmd):
def get_column_names(self, ksname, cfname):  # returns the result of one of two lookup helpers for cfname in ksname
if ksname is None:
ksname = self.current_keyspace  # no keyspace given: fall back to the shell's current keyspace
- if self.cqlver_atleast(3):
+ if ksname != 'system' and self.cqlver_atleast(3):  # the 'system' keyspace always takes the cfdef path below, whatever the CQL version
return self.get_column_names_from_layout(ksname, cfname)
else:
return self.get_column_names_from_cfdef(ksname, cfname)
@@ -1433,6 +1438,9 @@ class Shell(cmd.Cmd):
COPY <table_name> [ ( column [, ...] ) ]
FROM ( '<filename>' | STDIN )
[ WITH <option>='value' [AND ...] ];
+ COPY <table_name> [ ( column [, ...] ) ]
+ TO ( '<filename>' | STDOUT )
+ [ WITH <option>='value' [AND ...] ];
Available options and defaults:
@@ -1440,6 +1448,8 @@ class Shell(cmd.Cmd):
QUOTE='"' - quoting character to be used to quote fields
ESCAPE='\' - character to appear before the QUOTE char when quoted
HEADER=false - whether to ignore the first line
+ ENCODING='utf8' - encoding for CSV output (COPY TO only)
+ NULL='' - string that represents a null value (COPY TO only)
When entering CSV data on STDIN, you can use the sequence "\."
on a line by itself to end the data input.
@@ -1448,12 +1458,11 @@ class Shell(cmd.Cmd):
ks = self.cql_unprotect_name(parsed.get_binding('ksname', None))
if ks is None:
ks = self.current_keyspace
+ if ks is None:
+ raise NoKeyspaceError("Not in any keyspace.")
cf = self.cql_unprotect_name(parsed.get_binding('cfname'))
columns = parsed.get_binding('colnames', None)
- if columns is None:
- # default to all known columns
- columns = self.get_column_names(ks, cf)
- else:
+ if columns is not None:
columns = map(self.cql_unprotect_name, columns)
fname = parsed.get_binding('fname', None)
if fname is not None:
@@ -1462,14 +1471,20 @@ class Shell(cmd.Cmd):
copyoptvals = map(self.cql_unprotect_value, parsed.get_binding('optvals', ()))
opts = dict(zip(copyoptnames, copyoptvals))
- # when/if COPY TO is supported, this would be a good place to branch
- # on direction.
-
timestart = time.time()
- rows = self.perform_csv_import(ks, cf, columns, fname, opts)
- timeend = time.time()
- print "%d rows imported in %s." % (rows, describe_interval(timeend - timestart))
+ direction = parsed.get_binding('dir').upper()
+ if direction == 'FROM':
+ rows = self.perform_csv_import(ks, cf, columns, fname, opts)
+ verb = 'imported'
+ elif direction == 'TO':
+ rows = self.perform_csv_export(ks, cf, columns, fname, opts)
+ verb = 'exported'
+ else:
+ raise SyntaxError("Unknown direction %s" % direction)
+
+ timeend = time.time()
+ print "%d rows %s in %s." % (rows, verb, describe_interval(timeend - timestart))
def perform_csv_import(self, ks, cf, columns, fname, opts):
dialect_options = self.csv_dialect_defaults.copy()
@@ -1483,7 +1498,6 @@ class Shell(cmd.Cmd):
if dialect_options['quotechar'] == dialect_options['escapechar']:
dialect_options['doublequote'] = True
del dialect_options['escapechar']
-
if opts:
self.printerr('Unrecognized COPY FROM options: %s'
% ', '.join(opts.keys()))
@@ -1496,15 +1510,15 @@ class Shell(cmd.Cmd):
else:
do_close = True
try:
- linesource = open(fname, 'r')
+ linesource = open(fname, 'rb')
except IOError, e:
self.printerr("Can't open %r for reading: %s" % (fname, e))
return 0
- if header:
- linesource.next()
-
- prepq = self.prep_import_insert(ks, cf, columns)
try:
+ if header:
+ linesource.next()
+ prepq = self.prep_import_insert(ks, cf, columns)
+ rownum = -1
reader = csv.reader(linesource, **dialect_options)
for rownum, row in enumerate(reader):
if len(row) != len(columns):
@@ -1525,6 +1539,10 @@ class Shell(cmd.Cmd):
return rownum + 1
def prep_import_insert(self, ks, cf, columns):
+ if columns is None:
+ # default to all known columns
+ columns = self.get_column_names(ks, cf)
+
# would be nice to be able to use a prepared query here, but in order
# to use that interface, we'd need to have all the input as native
# values already, reading them from text just like the various
@@ -1543,6 +1561,70 @@ class Shell(cmd.Cmd):
print "Import using CQL: %s" % cql
return self.perform_statement(cql)
+ def perform_csv_export(self, ks, cf, columns, fname, opts):  # write the rows of ks.cf as CSV to fname (sys.stdout when fname is None); returns the number of rows written, or 0 on unknown options / unopenable file
+ dialect_options = self.csv_dialect_defaults.copy()  # work on a copy so per-command overrides do not modify the defaults
+ if 'quote' in opts:
+ dialect_options['quotechar'] = opts.pop('quote')
+ if 'escape' in opts:
+ dialect_options['escapechar'] = opts.pop('escape')
+ if 'delimiter' in opts:
+ dialect_options['delimiter'] = opts.pop('delimiter')
+ encoding = opts.pop('encoding', 'utf8')  # every recognized option is popped, so whatever remains in opts is unrecognized
+ nullval = opts.pop('null', '')
+ header = bool(opts.pop('header', '').lower() == 'true')  # only the string 'true' (any letter case) enables the header row
+ if dialect_options['quotechar'] == dialect_options['escapechar']:  # same character for both: use doubled quotes and drop the escape char
+ dialect_options['doublequote'] = True
+ del dialect_options['escapechar']
+
+ if opts:
+ self.printerr('Unrecognized COPY TO options: %s'
+ % ', '.join(opts.keys()))
+ return 0
+
+ if fname is None:
+ do_close = False  # sys.stdout is not ours to close
+ csvdest = sys.stdout
+ else:
+ do_close = True
+ try:
+ csvdest = open(fname, 'wb')  # binary mode, as the Python 2 csv module expects for its file objects
+ except IOError, e:
+ self.printerr("Can't open %r for writing: %s" % (fname, e))
+ return 0
+ try:
+ self.prep_export_dump(ks, cf, columns)  # executes the SELECT on self.cursor; the rows are fetched from it below
+ writer = csv.writer(csvdest, **dialect_options)
+ if header:
+ writer.writerow([d[0] for d in self.cursor.description])  # d[0] is presumably the column name of each description entry -- confirm
+ rows = 0
+ while True:
+ row = self.cursor.fetchone()
+ if row is None:  # fetchone() returning None ends the export
+ break
+ fmt = lambda v, d: \
+ format_value(v, d[1], output_encoding=encoding, nullval=nullval,
+ time_format=self.display_time_format,
+ float_precision=self.display_float_precision).strval  # NOTE(review): fmt does not depend on the row, yet it is re-created on every iteration
+ writer.writerow(map(fmt, row, self.cursor.description))  # pairs each value with its description entry; d[1] is passed as format_value's casstype
+ rows += 1
+ finally:
+ if do_close:  # close only a file opened by this method
+ csvdest.close()
+ return rows
+
+ def prep_export_dump(self, ks, cf, columns):  # execute on self.cursor a SELECT of the given columns (all columns when columns is None) from ks.cf
+ if columns is None:
+ columnlist = '*'
+ else:
+ columnlist = ', '.join(map(self.cql_protect_name, columns))  # cql_protect_name presumably quotes identifiers where needed -- confirm
+ # this limit is pretty awful. would be better to use row-key-paging, so
+ # that the dump could be pretty easily aborted if necessary, but that
+ # can be kind of tricky with cql3. Punt for now, until the real cursor
+ # API is added in CASSANDRA-4415.
+ query = 'SELECT %s FROM %s.%s LIMIT 99999999' \
+ % (columnlist, self.cql_protect_name(ks), self.cql_protect_name(cf))  # NOTE(review): the fixed LIMIT caps how many rows an export can return
+ self.cursor.execute(query)  # nothing is returned; the caller reads the result from self.cursor
+
def do_show(self, parsed):
"""
SHOW [cqlsh only]