You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by sl...@apache.org on 2016/03/08 10:27:00 UTC
[21/23] cassandra git commit: Merge commit '49c616cf0fc9c8d4649e2ec71a07fb9fd7831318' into cassandra-3.5
Merge commit '49c616cf0fc9c8d4649e2ec71a07fb9fd7831318' into cassandra-3.5
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/6329d54a
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/6329d54a
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/6329d54a
Branch: refs/heads/cassandra-3.5
Commit: 6329d54a6c802e703902ee6cf842e983820d144b
Parents: c9e9b62 49c616c
Author: Sylvain Lebresne <sy...@datastax.com>
Authored: Tue Mar 8 10:25:32 2016 +0100
Committer: Sylvain Lebresne <sy...@datastax.com>
Committed: Tue Mar 8 10:25:54 2016 +0100
----------------------------------------------------------------------
CHANGES.txt | 1 +
bin/cqlsh.py | 33 +-
pylib/cqlshlib/copyutil.py | 1173 +++++++++++++++++++++++++--------------
pylib/cqlshlib/util.py | 35 +-
pylib/setup.py | 2 +
5 files changed, 824 insertions(+), 420 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/6329d54a/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index c791139,5b92143..c88ae54
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -7,42 -7,11 +7,43 @@@ Merged from 2.2
* Only log yaml config once, at startup (CASSANDRA-11217)
* Reference leak with parallel repairs on the same table (CASSANDRA-11215)
Merged from 2.1:
- * InvalidateKeys should have a weak ref to key cache (CASSANDRA-11176)
+ * COPY FROM on large datasets: fix progress report and debug performance (CASSANDRA-11053)
+ * InvalidateKeys should have a weak ref to key cache (CASSANDRA-11176)
-3.0.4
- * Preserve order for preferred SSL cipher suites (CASSANDRA-11164)
+
+3.4
+ * (cqlsh) add cqlshrc option to always connect using ssl (CASSANDRA-10458)
+ * Cleanup a few resource warnings (CASSANDRA-11085)
+ * Allow custom tracing implementations (CASSANDRA-10392)
+ * Extract LoaderOptions to be able to be used from outside (CASSANDRA-10637)
+ * fix OnDiskIndexTest to properly treat empty ranges (CASSANDRA-11205)
+ * fix TrackerTest to handle new notifications (CASSANDRA-11178)
+ * add SASI validation for partitioner and complex columns (CASSANDRA-11169)
+ * Add caching of encrypted credentials in PasswordAuthenticator (CASSANDRA-7715)
+ * fix SASI memtable switching on flush (CASSANDRA-11159)
+ * Remove duplicate offline compaction tracking (CASSANDRA-11148)
+ * fix EQ semantics of analyzed SASI indexes (CASSANDRA-11130)
+ * Support long name output for nodetool commands (CASSANDRA-7950)
+ * Encrypted hints (CASSANDRA-11040)
+ * SASI index options validation (CASSANDRA-11136)
+ * Optimize disk seek using min/max column name meta data when the LIMIT clause is used
+ (CASSANDRA-8180)
+ * Add LIKE support to CQL3 (CASSANDRA-11067)
+ * Generic Java UDF types (CASSANDRA-10819)
+ * cqlsh: Include sub-second precision in timestamps by default (CASSANDRA-10428)
+ * Set javac encoding to utf-8 (CASSANDRA-11077)
+ * Integrate SASI index into Cassandra (CASSANDRA-10661)
+ * Add --skip-flush option to nodetool snapshot
+ * Skip values for non-queried columns (CASSANDRA-10657)
+ * Add support for secondary indexes on static columns (CASSANDRA-8103)
+ * CommitLogUpgradeTestMaker creates broken commit logs (CASSANDRA-11051)
+ * Add metric for number of dropped mutations (CASSANDRA-10866)
+ * Simplify row cache invalidation code (CASSANDRA-10396)
+ * Support user-defined compaction through nodetool (CASSANDRA-10660)
+ * Stripe view locks by key and table ID to reduce contention (CASSANDRA-10981)
+ * Add nodetool gettimeout and settimeout commands (CASSANDRA-10953)
+ * Add 3.0 metadata to sstablemetadata output (CASSANDRA-10838)
+Merged from 3.0:
* MV should only query complex columns included in the view (CASSANDRA-11069)
* Failed aggregate creation breaks server permanently (CASSANDRA-11064)
* Add sstabledump tool (CASSANDRA-7464)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/6329d54a/bin/cqlsh.py
----------------------------------------------------------------------
diff --cc bin/cqlsh.py
index 78fedeb,83dbeed..d007d75
--- a/bin/cqlsh.py
+++ b/bin/cqlsh.py
@@@ -480,7 -475,7 +480,7 @@@ COPY_COMMON_OPTIONS = ['DELIMITER', 'QU
'MAXATTEMPTS', 'REPORTFREQUENCY', 'DECIMALSEP', 'THOUSANDSSEP', 'BOOLSTYLE',
'NUMPROCESSES', 'CONFIGFILE', 'RATEFILE']
COPY_FROM_OPTIONS = ['CHUNKSIZE', 'INGESTRATE', 'MAXBATCHSIZE', 'MINBATCHSIZE', 'MAXROWS',
- 'SKIPROWS', 'SKIPCOLS', 'MAXPARSEERRORS', 'MAXINSERTERRORS', 'ERRFILE', 'TTL']
- 'SKIPROWS', 'SKIPCOLS', 'MAXPARSEERRORS', 'MAXINSERTERRORS', 'ERRFILE', 'PREPAREDSTATEMENTS']
++ 'SKIPROWS', 'SKIPCOLS', 'MAXPARSEERRORS', 'MAXINSERTERRORS', 'ERRFILE', 'PREPAREDSTATEMENTS', 'TTL']
COPY_TO_OPTIONS = ['ENCODING', 'PAGESIZE', 'PAGETIMEOUT', 'BEGINTOKEN', 'ENDTOKEN', 'MAXOUTPUTSIZE', 'MAXREQUESTS']
@@@ -1865,7 -1884,11 +1885,12 @@@ class Shell(cmd.Cmd)
MAXINSERTERRORS=-1 - the maximum global number of insert errors, -1 means no maximum
ERRFILE='' - a file where to store all rows that could not be imported, by default this is
import_ks_table.err where <ks> is your keyspace and <table> is your table name.
+ PREPAREDSTATEMENTS=True - whether to use prepared statements when importing, by default True. Set this to
+ False if you don't mind shifting data parsing to the cluster. The cluster will also
+ have to compile every batch statement. For large and oversized clusters
+ this will result in a faster import but for smaller clusters it may generate
+ timeouts.
+ TTL=3600 - the time to live in seconds, by default data will not expire
Available COPY TO options and defaults:
http://git-wip-us.apache.org/repos/asf/cassandra/blob/6329d54a/pylib/cqlshlib/copyutil.py
----------------------------------------------------------------------
diff --cc pylib/cqlshlib/copyutil.py
index 95da679,6be990d..e690e82
--- a/pylib/cqlshlib/copyutil.py
+++ b/pylib/cqlshlib/copyutil.py
@@@ -187,7 -275,7 +275,8 @@@ class CopyTask(object)
copy_options['errfile'] = safe_normpath(opts.pop('errfile', 'import_%s_%s.err' % (self.ks, self.table,)))
copy_options['ratefile'] = safe_normpath(opts.pop('ratefile', ''))
copy_options['maxoutputsize'] = int(opts.pop('maxoutputsize', '-1'))
+ copy_options['preparedstatements'] = bool(opts.pop('preparedstatements', 'true').lower() == 'true')
+ copy_options['ttl'] = int(opts.pop('ttl', -1))
self.check_options(copy_options)
return CopyOptions(copy=copy_options, dialect=dialect_options, unrecognized=opts)
@@@ -1655,8 -1965,12 +1969,13 @@@ class ImportProcess(ChildProcess)
self.max_attempts = options.copy['maxattempts']
self.min_batch_size = options.copy['minbatchsize']
self.max_batch_size = options.copy['maxbatchsize']
+ self.use_prepared_statements = options.copy['preparedstatements']
+ self.ttl = options.copy['ttl']
+ self.dialect_options = options.dialect
self._session = None
+ self.query = None
+ self.conv = None
+ self.make_statement = None
@property
def session(self):
@@@ -1700,69 -2014,88 +2019,91 @@@
self._session.cluster.shutdown()
ChildProcess.close(self)
- def run_counter(self, table_meta):
- """
- Main run method for tables that contain counter columns.
- """
- query = 'UPDATE %s.%s SET %%s WHERE %%s' % (protect_name(self.ks), protect_name(self.table))
-
- # We prepare a query statement to find out the types of the partition key columns so we can
- # route the update query to the correct replicas. As far as I understood this is the easiest
- # way to find out the types of the partition columns, we will never use this prepared statement
- where_clause = ' AND '.join(['%s = ?' % (protect_name(c.name)) for c in table_meta.partition_key])
- select_query = 'SELECT * FROM %s.%s WHERE %s' % (protect_name(self.ks), protect_name(self.table), where_clause)
- conv = ImportConversion(self, table_meta, self.session.prepare(select_query))
-
- while True:
- batch = self.inmsg.get()
- try:
- for b in self.split_batches(batch, conv):
- self.send_counter_batch(query, conv, b)
+ def make_params(self):
+ metadata = self.session.cluster.metadata
+ table_meta = metadata.keyspaces[self.ks].tables[self.table]
+
+ prepared_statement = None
+ is_counter = ("counter" in [table_meta.columns[name].cql_type for name in self.valid_columns])
+ if is_counter:
+ query = 'UPDATE %s.%s SET %%s WHERE %%s' % (protect_name(self.ks), protect_name(self.table))
+ make_statement = self.wrap_make_statement(self.make_counter_batch_statement)
+ elif self.use_prepared_statements:
+ query = 'INSERT INTO %s.%s (%s) VALUES (%s)' % (protect_name(self.ks),
+ protect_name(self.table),
+ ', '.join(protect_names(self.valid_columns),),
+ ', '.join(['?' for _ in self.valid_columns]))
-
++ if self.ttl >= 0:
++ query += 'USING TTL %s' % (self.ttl,)
+ query = self.session.prepare(query)
+ query.consistency_level = self.consistency_level
+ prepared_statement = query
+ make_statement = self.wrap_make_statement(self.make_prepared_batch_statement)
+ else:
+ query = 'INSERT INTO %s.%s (%s) VALUES (%%s)' % (protect_name(self.ks),
+ protect_name(self.table),
+ ', '.join(protect_names(self.valid_columns),))
++ if self.ttl >= 0:
++ query += 'USING TTL %s' % (self.ttl,)
+ make_statement = self.wrap_make_statement(self.make_non_prepared_batch_statement)
- except Exception, exc:
- self.outmsg.put((batch, '%s - %s' % (exc.__class__.__name__, exc.message)))
- if self.debug:
- traceback.print_exc(exc)
+ conv = ImportConversion(self, table_meta, prepared_statement)
+ tm = TokenMap(self.ks, self.hostname, self.local_dc, self.session)
+ return query, conv, tm, make_statement
- def run_normal(self, table_meta):
+ def inner_run(self, query, conv, tm, make_statement):
"""
- Main run method for normal tables, i.e. tables that do not contain counter columns.
+ Main run method. Note that we bind self methods that are called inside loops
+ for performance reasons.
"""
- query = 'INSERT INTO %s.%s (%s) VALUES (%s)' % (protect_name(self.ks),
- protect_name(self.table),
- ', '.join(protect_names(self.valid_columns),),
- ', '.join(['?' for _ in self.valid_columns]))
- if self.ttl >= 0:
- query += 'USING TTL %s' % (self.ttl,)
+ self.query = query
+ self.conv = conv
+ self.make_statement = make_statement
- query_statement = self.session.prepare(query)
- query_statement.consistency_level = self.consistency_level
- conv = ImportConversion(self, table_meta, query_statement)
+ convert_rows = self.convert_rows
+ split_into_batches = self.split_into_batches
+ result_callback = self.result_callback
+ err_callback = self.err_callback
+ session = self.session
while True:
- batch = self.inmsg.get()
+ chunk = self.inmsg.recv()
+ if chunk is None:
+ break
+
try:
- for b in self.split_batches(batch, conv):
- self.send_normal_batch(conv, query_statement, b)
+ chunk['rows'] = convert_rows(conv, chunk)
+ for replicas, batch in split_into_batches(chunk, conv, tm):
+ statement = make_statement(query, conv, chunk, batch, replicas)
+ future = session.execute_async(statement)
+ future.add_callbacks(callback=result_callback, callback_args=(batch, chunk),
+ errback=err_callback, errback_args=(batch, chunk, replicas))
except Exception, exc:
- self.outmsg.put((batch, '%s - %s' % (exc.__class__.__name__, exc.message)))
- if self.debug:
- traceback.print_exc(exc)
+ self.report_error(exc, chunk, chunk['rows'])
- def send_counter_batch(self, query_text, conv, batch):
- if self.test_failures and self.maybe_inject_failures(batch):
- return
+ def wrap_make_statement(self, inner_make_statement):
+ def make_statement(query, conv, chunk, batch, replicas):
+ try:
+ return inner_make_statement(query, conv, batch, replicas)
+ except Exception, exc:
+ print "Failed to make batch statement: {}".format(exc)
+ self.report_error(exc, chunk, batch['rows'])
+ return None
- error_rows = []
- batch_statement = BatchStatement(batch_type=BatchType.COUNTER, consistency_level=self.consistency_level)
+ def make_statement_with_failures(query, conv, chunk, batch, replicas):
+ failed_batch = self.maybe_inject_failures(batch)
+ if failed_batch:
+ return failed_batch
+ return make_statement(query, conv, chunk, batch, replicas)
- for r in batch['rows']:
- row = self.filter_row_values(r)
- if len(row) != len(self.valid_columns):
- error_rows.append(row)
- continue
+ return make_statement_with_failures if self.test_failures else make_statement
+ def make_counter_batch_statement(self, query, conv, batch, replicas):
+ statement = BatchStatement(batch_type=BatchType.COUNTER, consistency_level=self.consistency_level)
+ statement.replicas = replicas
+ statement.keyspace = self.ks
+ for row in batch['rows']:
where_clause = []
set_clause = []
for i, value in enumerate(row):