You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by sl...@apache.org on 2016/03/08 10:26:45 UTC

[06/23] cassandra git commit: Merge commit 'c3d2f26f46c2d37b6cf918cbb5565fe57a5904cc' into cassandra-2.2

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b74ffeaf/pylib/cqlshlib/copyutil.py
----------------------------------------------------------------------
diff --cc pylib/cqlshlib/copyutil.py
index aeb2d0b,cd03765..2755dd5
--- a/pylib/cqlshlib/copyutil.py
+++ b/pylib/cqlshlib/copyutil.py
@@@ -45,9 -51,13 +51,13 @@@ from cassandra.util import Date, Tim
  
  from cql3handling import CqlRuleSet
  from displaying import NO_COLOR_MAP
 -from formatting import format_value_default, EMPTY, get_formatter
 +from formatting import format_value_default, DateTimeFormat, EMPTY, get_formatter
  from sslhandling import ssl_settings
  
+ PROFILE_ON = False
+ STRACE_ON = False
+ IS_LINUX = platform.system() == 'Linux'
+ 
  CopyOptions = namedtuple('CopyOptions', 'copy dialect unrecognized')
  
  
@@@ -164,13 -252,12 +252,13 @@@ class CopyTask(object)
          # in the page size or 10 seconds if pagesize is smaller
          copy_options['pagetimeout'] = int(opts.pop('pagetimeout', max(10, 10 * (copy_options['pagesize'] / 1000))))
          copy_options['maxattempts'] = int(opts.pop('maxattempts', 5))
 -        copy_options['dtformats'] = opts.pop('datetimeformat', shell.display_time_format)
 +        copy_options['dtformats'] = DateTimeFormat(opts.pop('datetimeformat', shell.display_timestamp_format),
 +                                                   shell.display_date_format, shell.display_nanotime_format)
          copy_options['float_precision'] = shell.display_float_precision
-         copy_options['chunksize'] = int(opts.pop('chunksize', 1000))
+         copy_options['chunksize'] = int(opts.pop('chunksize', 5000))
          copy_options['ingestrate'] = int(opts.pop('ingestrate', 100000))
          copy_options['maxbatchsize'] = int(opts.pop('maxbatchsize', 20))
-         copy_options['minbatchsize'] = int(opts.pop('minbatchsize', 2))
+         copy_options['minbatchsize'] = int(opts.pop('minbatchsize', 10))
          copy_options['reportfrequency'] = float(opts.pop('reportfrequency', 0.25))
          copy_options['consistencylevel'] = shell.consistency_level
          copy_options['decimalsep'] = opts.pop('decimalsep', '.')
@@@ -1392,9 -1593,37 +1594,37 @@@ class ImportConversion(object)
          self.primary_key_indexes = [self.columns.index(col.name) for col in self.table_meta.primary_key]
          self.partition_key_indexes = [self.columns.index(col.name) for col in self.table_meta.partition_key]
  
+         if statement is None:
+             self.use_prepared_statements = False
+             statement = self._get_primary_key_statement(parent, table_meta)
+         else:
+             self.use_prepared_statements = True
+ 
          self.proto_version = statement.protocol_version
-         self.cqltypes = dict([(c.name, c.type) for c in statement.column_metadata])
-         self.converters = dict([(c.name, self._get_converter(c.type)) for c in statement.column_metadata])
+ 
+         # the cql types and converters for the prepared statement: either the full statement or only the partition key columns
+         self.cqltypes = [c.type for c in statement.column_metadata]
+         self.converters = [self._get_converter(c.type) for c in statement.column_metadata]
+ 
+         # the cql types for the entire statement; these are the same as the types above, but
+         # only when using prepared statements
 -        self.coltypes = [table_meta.columns[name].typestring for name in parent.valid_columns]
++        self.coltypes = [table_meta.columns[name].cql_type for name in parent.valid_columns]
+         # these functions are used for non-prepared statements to protect values with quotes if required
+         self.protectors = [protect_value if t in ('ascii', 'text', 'timestamp', 'date', 'time', 'inet') else lambda v: v
+                            for t in self.coltypes]
+ 
+     @staticmethod
+     def _get_primary_key_statement(parent, table_meta):
+         """
+         We prepare a query statement to find out the types of the partition key columns so we can
+         route the update query to the correct replicas. As far as I understand, this is the easiest
+         way to find out the types of the partition key columns; we will never execute this prepared statement.
+         """
+         where_clause = ' AND '.join(['%s = ?' % (protect_name(c.name)) for c in table_meta.partition_key])
+         select_query = 'SELECT * FROM %s.%s WHERE %s' % (protect_name(parent.ks),
+                                                          protect_name(parent.table),
+                                                          where_clause)
+         return parent.session.prepare(select_query)
  
      def _get_converter(self, cql_type):
          """
@@@ -1695,67 -2007,88 +2008,88 @@@ class ImportProcess(ChildProcess)
              self._session.cluster.shutdown()
          ChildProcess.close(self)
  
-     def run_counter(self, table_meta):
-         """
-         Main run method for tables that contain counter columns.
-         """
-         query = 'UPDATE %s.%s SET %%s WHERE %%s' % (protect_name(self.ks), protect_name(self.table))
- 
-         # We prepare a query statement to find out the types of the partition key columns so we can
-         # route the update query to the correct replicas. As far as I understood this is the easiest
-         # way to find out the types of the partition columns, we will never use this prepared statement
-         where_clause = ' AND '.join(['%s = ?' % (protect_name(c.name)) for c in table_meta.partition_key])
-         select_query = 'SELECT * FROM %s.%s WHERE %s' % (protect_name(self.ks), protect_name(self.table), where_clause)
-         conv = ImportConversion(self, table_meta, self.session.prepare(select_query))
- 
-         while True:
-             batch = self.inmsg.get()
-             try:
-                 for b in self.split_batches(batch, conv):
-                     self.send_counter_batch(query, conv, b)
+     def make_params(self):
+         metadata = self.session.cluster.metadata
+         table_meta = metadata.keyspaces[self.ks].tables[self.table]
+ 
+         prepared_statement = None
 -        is_counter = ("counter" in [table_meta.columns[name].typestring for name in self.valid_columns])
++        is_counter = ("counter" in [table_meta.columns[name].cql_type for name in self.valid_columns])
+         if is_counter:
+             query = 'UPDATE %s.%s SET %%s WHERE %%s' % (protect_name(self.ks), protect_name(self.table))
+             make_statement = self.wrap_make_statement(self.make_counter_batch_statement)
+         elif self.use_prepared_statements:
+             query = 'INSERT INTO %s.%s (%s) VALUES (%s)' % (protect_name(self.ks),
+                                                             protect_name(self.table),
+                                                             ', '.join(protect_names(self.valid_columns),),
+                                                             ', '.join(['?' for _ in self.valid_columns]))
+ 
+             query = self.session.prepare(query)
+             query.consistency_level = self.consistency_level
+             prepared_statement = query
+             make_statement = self.wrap_make_statement(self.make_prepared_batch_statement)
+         else:
+             query = 'INSERT INTO %s.%s (%s) VALUES (%%s)' % (protect_name(self.ks),
+                                                              protect_name(self.table),
+                                                              ', '.join(protect_names(self.valid_columns),))
+             make_statement = self.wrap_make_statement(self.make_non_prepared_batch_statement)
  
-             except Exception, exc:
-                 self.outmsg.put((batch, '%s - %s' % (exc.__class__.__name__, exc.message)))
-                 if self.debug:
-                     traceback.print_exc(exc)
+         conv = ImportConversion(self, table_meta, prepared_statement)
+         tm = TokenMap(self.ks, self.hostname, self.local_dc, self.session)
+         return query, conv, tm, make_statement
  
-     def run_normal(self, table_meta):
+     def inner_run(self, query, conv, tm, make_statement):
          """
-         Main run method for normal tables, i.e. tables that do not contain counter columns.
+         Main run method. Note that we bind the methods of self that are called inside the loop
+         to local variables for performance reasons.
          """
-         query = 'INSERT INTO %s.%s (%s) VALUES (%s)' % (protect_name(self.ks),
-                                                         protect_name(self.table),
-                                                         ', '.join(protect_names(self.valid_columns),),
-                                                         ', '.join(['?' for _ in self.valid_columns]))
+         self.query = query
+         self.conv = conv
+         self.make_statement = make_statement
  
-         query_statement = self.session.prepare(query)
-         query_statement.consistency_level = self.consistency_level
-         conv = ImportConversion(self, table_meta, query_statement)
+         convert_rows = self.convert_rows
+         split_into_batches = self.split_into_batches
+         result_callback = self.result_callback
+         err_callback = self.err_callback
+         session = self.session
  
          while True:
-             batch = self.inmsg.get()
+             chunk = self.inmsg.recv()
+             if chunk is None:
+                 break
+ 
              try:
-                 for b in self.split_batches(batch, conv):
-                     self.send_normal_batch(conv, query_statement, b)
+                 chunk['rows'] = convert_rows(conv, chunk)
+                 for replicas, batch in split_into_batches(chunk, conv, tm):
+                     statement = make_statement(query, conv, chunk, batch, replicas)
+                     future = session.execute_async(statement)
+                     future.add_callbacks(callback=result_callback, callback_args=(batch, chunk),
+                                          errback=err_callback, errback_args=(batch, chunk, replicas))
  
              except Exception, exc:
-                 self.outmsg.put((batch, '%s - %s' % (exc.__class__.__name__, exc.message)))
-                 if self.debug:
-                     traceback.print_exc(exc)
+                 self.report_error(exc, chunk, chunk['rows'])
  
-     def send_counter_batch(self, query_text, conv, batch):
-         if self.test_failures and self.maybe_inject_failures(batch):
-             return
+     def wrap_make_statement(self, inner_make_statement):
+         def make_statement(query, conv, chunk, batch, replicas):
+             try:
+                 return inner_make_statement(query, conv, batch, replicas)
+             except Exception, exc:
+                 print "Failed to make batch statement: {}".format(exc)
+                 self.report_error(exc, chunk, batch['rows'])
+                 return None
  
-         error_rows = []
-         batch_statement = BatchStatement(batch_type=BatchType.COUNTER, consistency_level=self.consistency_level)
+         def make_statement_with_failures(query, conv, chunk, batch, replicas):
+             failed_batch = self.maybe_inject_failures(batch)
+             if failed_batch:
+                 return failed_batch
+             return make_statement(query, conv, chunk, batch, replicas)
  
-         for r in batch['rows']:
-             row = self.filter_row_values(r)
-             if len(row) != len(self.valid_columns):
-                 error_rows.append(row)
-                 continue
+         return make_statement_with_failures if self.test_failures else make_statement
  
+     def make_counter_batch_statement(self, query, conv, batch, replicas):
+         statement = BatchStatement(batch_type=BatchType.COUNTER, consistency_level=self.consistency_level)
+         statement.replicas = replicas
+         statement.keyspace = self.ks
+         for row in batch['rows']:
              where_clause = []
              set_clause = []
              for i, value in enumerate(row):