You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by sl...@apache.org on 2016/03/08 10:27:01 UTC

[22/23] cassandra git commit: Merge commit '49c616cf0fc9c8d4649e2ec71a07fb9fd7831318' into cassandra-3.5

Merge commit '49c616cf0fc9c8d4649e2ec71a07fb9fd7831318' into cassandra-3.5


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/6329d54a
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/6329d54a
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/6329d54a

Branch: refs/heads/trunk
Commit: 6329d54a6c802e703902ee6cf842e983820d144b
Parents: c9e9b62 49c616c
Author: Sylvain Lebresne <sy...@datastax.com>
Authored: Tue Mar 8 10:25:32 2016 +0100
Committer: Sylvain Lebresne <sy...@datastax.com>
Committed: Tue Mar 8 10:25:54 2016 +0100

----------------------------------------------------------------------
 CHANGES.txt                |    1 +
 bin/cqlsh.py               |   33 +-
 pylib/cqlshlib/copyutil.py | 1173 +++++++++++++++++++++++++--------------
 pylib/cqlshlib/util.py     |   35 +-
 pylib/setup.py             |    2 +
 5 files changed, 824 insertions(+), 420 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/6329d54a/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index c791139,5b92143..c88ae54
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -7,42 -7,11 +7,43 @@@ Merged from 2.2
   * Only log yaml config once, at startup (CASSANDRA-11217)
   * Reference leak with parallel repairs on the same table (CASSANDRA-11215)
  Merged from 2.1:
 - * InvalidateKeys should have a weak ref to key cache (CASSANDRA-11176)
+  * COPY FROM on large datasets: fix progress report and debug performance (CASSANDRA-11053)
 + * InvalidateKeys should have a weak ref to key cache (CASSANDRA-11176)
  
 -3.0.4
 - * Preserve order for preferred SSL cipher suites (CASSANDRA-11164)
 +
 +3.4
 + * (cqlsh) add cqlshrc option to always connect using ssl (CASSANDRA-10458)
 + * Cleanup a few resource warnings (CASSANDRA-11085)
 + * Allow custom tracing implementations (CASSANDRA-10392)
 + * Extract LoaderOptions to be able to be used from outside (CASSANDRA-10637)
 + * fix OnDiskIndexTest to properly treat empty ranges (CASSANDRA-11205)
 + * fix TrackerTest to handle new notifications (CASSANDRA-11178)
 + * add SASI validation for partitioner and complex columns (CASSANDRA-11169)
 + * Add caching of encrypted credentials in PasswordAuthenticator (CASSANDRA-7715)
 + * fix SASI memtable switching on flush (CASSANDRA-11159)
 + * Remove duplicate offline compaction tracking (CASSANDRA-11148)
 + * fix EQ semantics of analyzed SASI indexes (CASSANDRA-11130)
 + * Support long name output for nodetool commands (CASSANDRA-7950)
 + * Encrypted hints (CASSANDRA-11040)
 + * SASI index options validation (CASSANDRA-11136)
 + * Optimize disk seek using min/max column name meta data when the LIMIT clause is used
 +   (CASSANDRA-8180)
 + * Add LIKE support to CQL3 (CASSANDRA-11067)
 + * Generic Java UDF types (CASSANDRA-10819)
 + * cqlsh: Include sub-second precision in timestamps by default (CASSANDRA-10428)
 + * Set javac encoding to utf-8 (CASSANDRA-11077)
 + * Integrate SASI index into Cassandra (CASSANDRA-10661)
 + * Add --skip-flush option to nodetool snapshot
 + * Skip values for non-queried columns (CASSANDRA-10657)
 + * Add support for secondary indexes on static columns (CASSANDRA-8103)
 + * CommitLogUpgradeTestMaker creates broken commit logs (CASSANDRA-11051)
 + * Add metric for number of dropped mutations (CASSANDRA-10866)
 + * Simplify row cache invalidation code (CASSANDRA-10396)
 + * Support user-defined compaction through nodetool (CASSANDRA-10660)
 + * Stripe view locks by key and table ID to reduce contention (CASSANDRA-10981)
 + * Add nodetool gettimeout and settimeout commands (CASSANDRA-10953)
 + * Add 3.0 metadata to sstablemetadata output (CASSANDRA-10838)
 +Merged from 3.0:
   * MV should only query complex columns included in the view (CASSANDRA-11069)
   * Failed aggregate creation breaks server permanently (CASSANDRA-11064)
   * Add sstabledump tool (CASSANDRA-7464)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/6329d54a/bin/cqlsh.py
----------------------------------------------------------------------
diff --cc bin/cqlsh.py
index 78fedeb,83dbeed..d007d75
--- a/bin/cqlsh.py
+++ b/bin/cqlsh.py
@@@ -480,7 -475,7 +480,7 @@@ COPY_COMMON_OPTIONS = ['DELIMITER', 'QU
                         'MAXATTEMPTS', 'REPORTFREQUENCY', 'DECIMALSEP', 'THOUSANDSSEP', 'BOOLSTYLE',
                         'NUMPROCESSES', 'CONFIGFILE', 'RATEFILE']
  COPY_FROM_OPTIONS = ['CHUNKSIZE', 'INGESTRATE', 'MAXBATCHSIZE', 'MINBATCHSIZE', 'MAXROWS',
-                      'SKIPROWS', 'SKIPCOLS', 'MAXPARSEERRORS', 'MAXINSERTERRORS', 'ERRFILE', 'TTL']
 -                     'SKIPROWS', 'SKIPCOLS', 'MAXPARSEERRORS', 'MAXINSERTERRORS', 'ERRFILE', 'PREPAREDSTATEMENTS']
++                     'SKIPROWS', 'SKIPCOLS', 'MAXPARSEERRORS', 'MAXINSERTERRORS', 'ERRFILE', 'PREPAREDSTATEMENTS', 'TTL']
  COPY_TO_OPTIONS = ['ENCODING', 'PAGESIZE', 'PAGETIMEOUT', 'BEGINTOKEN', 'ENDTOKEN', 'MAXOUTPUTSIZE', 'MAXREQUESTS']
  
  
@@@ -1865,7 -1884,11 +1885,12 @@@ class Shell(cmd.Cmd)
            MAXINSERTERRORS=-1      - the maximum global number of insert errors, -1 means no maximum
            ERRFILE=''              - a file where to store all rows that could not be imported, by default this is
                                      import_ks_table.err where <ks> is your keyspace and <table> is your table name.
+           PREPAREDSTATEMENTS=True - whether to use prepared statements when importing, by default True. Set this to
+                                     False if you don't mind shifting data parsing to the cluster. The cluster will also
+                                     have to compile every batch statement. For large and oversized clusters
+                                     this will result in a faster import but for smaller clusters it may generate
+                                     timeouts.
 +          TTL=3600                - the time to live in seconds, by default data will not expire
  
          Available COPY TO options and defaults:
  

http://git-wip-us.apache.org/repos/asf/cassandra/blob/6329d54a/pylib/cqlshlib/copyutil.py
----------------------------------------------------------------------
diff --cc pylib/cqlshlib/copyutil.py
index 95da679,6be990d..e690e82
--- a/pylib/cqlshlib/copyutil.py
+++ b/pylib/cqlshlib/copyutil.py
@@@ -187,7 -275,7 +275,8 @@@ class CopyTask(object)
          copy_options['errfile'] = safe_normpath(opts.pop('errfile', 'import_%s_%s.err' % (self.ks, self.table,)))
          copy_options['ratefile'] = safe_normpath(opts.pop('ratefile', ''))
          copy_options['maxoutputsize'] = int(opts.pop('maxoutputsize', '-1'))
+         copy_options['preparedstatements'] = bool(opts.pop('preparedstatements', 'true').lower() == 'true')
 +        copy_options['ttl'] = int(opts.pop('ttl', -1))
  
          self.check_options(copy_options)
          return CopyOptions(copy=copy_options, dialect=dialect_options, unrecognized=opts)
@@@ -1655,8 -1965,12 +1969,13 @@@ class ImportProcess(ChildProcess)
          self.max_attempts = options.copy['maxattempts']
          self.min_batch_size = options.copy['minbatchsize']
          self.max_batch_size = options.copy['maxbatchsize']
+         self.use_prepared_statements = options.copy['preparedstatements']
 +        self.ttl = options.copy['ttl']
+         self.dialect_options = options.dialect
          self._session = None
+         self.query = None
+         self.conv = None
+         self.make_statement = None
  
      @property
      def session(self):
@@@ -1700,69 -2014,88 +2019,91 @@@
              self._session.cluster.shutdown()
          ChildProcess.close(self)
  
-     def run_counter(self, table_meta):
-         """
-         Main run method for tables that contain counter columns.
-         """
-         query = 'UPDATE %s.%s SET %%s WHERE %%s' % (protect_name(self.ks), protect_name(self.table))
- 
-         # We prepare a query statement to find out the types of the partition key columns so we can
-         # route the update query to the correct replicas. As far as I understood this is the easiest
-         # way to find out the types of the partition columns, we will never use this prepared statement
-         where_clause = ' AND '.join(['%s = ?' % (protect_name(c.name)) for c in table_meta.partition_key])
-         select_query = 'SELECT * FROM %s.%s WHERE %s' % (protect_name(self.ks), protect_name(self.table), where_clause)
-         conv = ImportConversion(self, table_meta, self.session.prepare(select_query))
- 
-         while True:
-             batch = self.inmsg.get()
-             try:
-                 for b in self.split_batches(batch, conv):
-                     self.send_counter_batch(query, conv, b)
+     def make_params(self):
+         metadata = self.session.cluster.metadata
+         table_meta = metadata.keyspaces[self.ks].tables[self.table]
+ 
+         prepared_statement = None
+         is_counter = ("counter" in [table_meta.columns[name].cql_type for name in self.valid_columns])
+         if is_counter:
+             query = 'UPDATE %s.%s SET %%s WHERE %%s' % (protect_name(self.ks), protect_name(self.table))
+             make_statement = self.wrap_make_statement(self.make_counter_batch_statement)
+         elif self.use_prepared_statements:
+             query = 'INSERT INTO %s.%s (%s) VALUES (%s)' % (protect_name(self.ks),
+                                                             protect_name(self.table),
+                                                             ', '.join(protect_names(self.valid_columns),),
+                                                             ', '.join(['?' for _ in self.valid_columns]))
 -
++            if self.ttl >= 0:
++                query += 'USING TTL %s' % (self.ttl,)
+             query = self.session.prepare(query)
+             query.consistency_level = self.consistency_level
+             prepared_statement = query
+             make_statement = self.wrap_make_statement(self.make_prepared_batch_statement)
+         else:
+             query = 'INSERT INTO %s.%s (%s) VALUES (%%s)' % (protect_name(self.ks),
+                                                              protect_name(self.table),
+                                                              ', '.join(protect_names(self.valid_columns),))
++            if self.ttl >= 0:
++                query += 'USING TTL %s' % (self.ttl,)
+             make_statement = self.wrap_make_statement(self.make_non_prepared_batch_statement)
  
-             except Exception, exc:
-                 self.outmsg.put((batch, '%s - %s' % (exc.__class__.__name__, exc.message)))
-                 if self.debug:
-                     traceback.print_exc(exc)
+         conv = ImportConversion(self, table_meta, prepared_statement)
+         tm = TokenMap(self.ks, self.hostname, self.local_dc, self.session)
+         return query, conv, tm, make_statement
  
-     def run_normal(self, table_meta):
+     def inner_run(self, query, conv, tm, make_statement):
          """
-         Main run method for normal tables, i.e. tables that do not contain counter columns.
+         Main run method. Note that we bind self methods that are called inside loops
+         for performance reasons.
          """
-         query = 'INSERT INTO %s.%s (%s) VALUES (%s)' % (protect_name(self.ks),
-                                                         protect_name(self.table),
-                                                         ', '.join(protect_names(self.valid_columns),),
-                                                         ', '.join(['?' for _ in self.valid_columns]))
-         if self.ttl >= 0:
-             query += 'USING TTL %s' % (self.ttl,)
+         self.query = query
+         self.conv = conv
+         self.make_statement = make_statement
  
-         query_statement = self.session.prepare(query)
-         query_statement.consistency_level = self.consistency_level
-         conv = ImportConversion(self, table_meta, query_statement)
+         convert_rows = self.convert_rows
+         split_into_batches = self.split_into_batches
+         result_callback = self.result_callback
+         err_callback = self.err_callback
+         session = self.session
  
          while True:
-             batch = self.inmsg.get()
+             chunk = self.inmsg.recv()
+             if chunk is None:
+                 break
+ 
              try:
-                 for b in self.split_batches(batch, conv):
-                     self.send_normal_batch(conv, query_statement, b)
+                 chunk['rows'] = convert_rows(conv, chunk)
+                 for replicas, batch in split_into_batches(chunk, conv, tm):
+                     statement = make_statement(query, conv, chunk, batch, replicas)
+                     future = session.execute_async(statement)
+                     future.add_callbacks(callback=result_callback, callback_args=(batch, chunk),
+                                          errback=err_callback, errback_args=(batch, chunk, replicas))
  
              except Exception, exc:
-                 self.outmsg.put((batch, '%s - %s' % (exc.__class__.__name__, exc.message)))
-                 if self.debug:
-                     traceback.print_exc(exc)
+                 self.report_error(exc, chunk, chunk['rows'])
  
-     def send_counter_batch(self, query_text, conv, batch):
-         if self.test_failures and self.maybe_inject_failures(batch):
-             return
+     def wrap_make_statement(self, inner_make_statement):
+         def make_statement(query, conv, chunk, batch, replicas):
+             try:
+                 return inner_make_statement(query, conv, batch, replicas)
+             except Exception, exc:
+                 print "Failed to make batch statement: {}".format(exc)
+                 self.report_error(exc, chunk, batch['rows'])
+                 return None
  
-         error_rows = []
-         batch_statement = BatchStatement(batch_type=BatchType.COUNTER, consistency_level=self.consistency_level)
+         def make_statement_with_failures(query, conv, chunk, batch, replicas):
+             failed_batch = self.maybe_inject_failures(batch)
+             if failed_batch:
+                 return failed_batch
+             return make_statement(query, conv, chunk, batch, replicas)
  
-         for r in batch['rows']:
-             row = self.filter_row_values(r)
-             if len(row) != len(self.valid_columns):
-                 error_rows.append(row)
-                 continue
+         return make_statement_with_failures if self.test_failures else make_statement
  
+     def make_counter_batch_statement(self, query, conv, batch, replicas):
+         statement = BatchStatement(batch_type=BatchType.COUNTER, consistency_level=self.consistency_level)
+         statement.replicas = replicas
+         statement.keyspace = self.ks
+         for row in batch['rows']:
              where_clause = []
              set_clause = []
              for i, value in enumerate(row):