Posted to commits@cassandra.apache.org by sl...@apache.org on 2016/03/08 10:26:45 UTC
[06/23] cassandra git commit: Merge commit 'c3d2f26f46c2d37b6cf918cbb5565fe57a5904cc' into cassandra-2.2
http://git-wip-us.apache.org/repos/asf/cassandra/blob/b74ffeaf/pylib/cqlshlib/copyutil.py
----------------------------------------------------------------------
diff --cc pylib/cqlshlib/copyutil.py
index aeb2d0b,cd03765..2755dd5
--- a/pylib/cqlshlib/copyutil.py
+++ b/pylib/cqlshlib/copyutil.py
@@@ -45,9 -51,13 +51,13 @@@ from cassandra.util import Date, Tim
from cql3handling import CqlRuleSet
from displaying import NO_COLOR_MAP
-from formatting import format_value_default, EMPTY, get_formatter
+from formatting import format_value_default, DateTimeFormat, EMPTY, get_formatter
from sslhandling import ssl_settings
+ PROFILE_ON = False
+ STRACE_ON = False
+ IS_LINUX = platform.system() == 'Linux'
+
CopyOptions = namedtuple('CopyOptions', 'copy dialect unrecognized')
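[Editor's note: for orientation, a rough sketch of how the three CopyOptions buckets might be populated; the field contents below are hypothetical illustrations, not taken from this commit.]

    # Hypothetical example: recognized COPY options, csv dialect options, and
    # anything unrecognized travel in separate dicts on the namedtuple.
    opts = CopyOptions(copy={'chunksize': 5000, 'ingestrate': 100000},
                       dialect={'delimiter': ','},
                       unrecognized={'notanoption': '1'})
    print opts.copy['chunksize']   # 5000
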
@@@ -164,13 -252,12 +252,13 @@@ class CopyTask(object)
# in the page size or 10 seconds if pagesize is smaller
copy_options['pagetimeout'] = int(opts.pop('pagetimeout', max(10, 10 * (copy_options['pagesize'] / 1000))))
copy_options['maxattempts'] = int(opts.pop('maxattempts', 5))
- copy_options['dtformats'] = opts.pop('datetimeformat', shell.display_time_format)
+ copy_options['dtformats'] = DateTimeFormat(opts.pop('datetimeformat', shell.display_timestamp_format),
+ shell.display_date_format, shell.display_nanotime_format)
copy_options['float_precision'] = shell.display_float_precision
- copy_options['chunksize'] = int(opts.pop('chunksize', 1000))
+ copy_options['chunksize'] = int(opts.pop('chunksize', 5000))
copy_options['ingestrate'] = int(opts.pop('ingestrate', 100000))
copy_options['maxbatchsize'] = int(opts.pop('maxbatchsize', 20))
- copy_options['minbatchsize'] = int(opts.pop('minbatchsize', 2))
+ copy_options['minbatchsize'] = int(opts.pop('minbatchsize', 10))
copy_options['reportfrequency'] = float(opts.pop('reportfrequency', 0.25))
copy_options['consistencylevel'] = shell.consistency_level
copy_options['decimalsep'] = opts.pop('decimalsep', '.')
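[Editor's note: a worked example of the pagetimeout default set above; the pagesize values are hypothetical, and note the Python 2 integer division.]

    # 10 seconds per 1000 rows of page size, with a 10-second floor
    for pagesize in (500, 1000, 5000):
        print pagesize, max(10, 10 * (pagesize / 1000))
    # 500 10
    # 1000 10
    # 5000 50
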
@@@ -1392,9 -1593,37 +1594,37 @@@ class ImportConversion(object)
self.primary_key_indexes = [self.columns.index(col.name) for col in self.table_meta.primary_key]
self.partition_key_indexes = [self.columns.index(col.name) for col in self.table_meta.partition_key]
+ if statement is None:
+ self.use_prepared_statements = False
+ statement = self._get_primary_key_statement(parent, table_meta)
+ else:
+ self.use_prepared_statements = True
+
self.proto_version = statement.protocol_version
- self.cqltypes = dict([(c.name, c.type) for c in statement.column_metadata])
- self.converters = dict([(c.name, self._get_converter(c.type)) for c in statement.column_metadata])
+
+ # the cql types and converters for the prepared statement (either the full statement or only the primary keys)
+ self.cqltypes = [c.type for c in statement.column_metadata]
+ self.converters = [self._get_converter(c.type) for c in statement.column_metadata]
+
+ # the cql types for the entire statement; these match the types above, but
+ # only when using prepared statements
- self.coltypes = [table_meta.columns[name].typestring for name in parent.valid_columns]
++ self.coltypes = [table_meta.columns[name].cql_type for name in parent.valid_columns]
+ # these functions are used for non-prepared statements to protect values with quotes if required
+ self.protectors = [protect_value if t in ('ascii', 'text', 'timestamp', 'date', 'time', 'inet') else lambda v: v
+ for t in self.coltypes]
+
+ @staticmethod
+ def _get_primary_key_statement(parent, table_meta):
+ """
+ We prepare a query statement to find out the types of the partition key columns so we can
+ route the update query to the correct replicas. This appears to be the easiest way to
+ determine the partition column types; the prepared statement itself is never executed.
+ """
+ where_clause = ' AND '.join(['%s = ?' % (protect_name(c.name)) for c in table_meta.partition_key])
+ select_query = 'SELECT * FROM %s.%s WHERE %s' % (protect_name(parent.ks),
+ protect_name(parent.table),
+ where_clause)
+ return parent.session.prepare(select_query)
def _get_converter(self, cql_type):
"""
@@@ -1695,67 -2007,88 +2008,88 @@@ class ImportProcess(ChildProcess)
self._session.cluster.shutdown()
ChildProcess.close(self)
- def run_counter(self, table_meta):
- """
- Main run method for tables that contain counter columns.
- """
- query = 'UPDATE %s.%s SET %%s WHERE %%s' % (protect_name(self.ks), protect_name(self.table))
-
- # We prepare a query statement to find out the types of the partition key columns so we can
- # route the update query to the correct replicas. As far as I understood this is the easiest
- # way to find out the types of the partition columns, we will never use this prepared statement
- where_clause = ' AND '.join(['%s = ?' % (protect_name(c.name)) for c in table_meta.partition_key])
- select_query = 'SELECT * FROM %s.%s WHERE %s' % (protect_name(self.ks), protect_name(self.table), where_clause)
- conv = ImportConversion(self, table_meta, self.session.prepare(select_query))
-
- while True:
- batch = self.inmsg.get()
- try:
- for b in self.split_batches(batch, conv):
- self.send_counter_batch(query, conv, b)
+ def make_params(self):
+ metadata = self.session.cluster.metadata
+ table_meta = metadata.keyspaces[self.ks].tables[self.table]
+
+ prepared_statement = None
- is_counter = ("counter" in [table_meta.columns[name].typestring for name in self.valid_columns])
++ is_counter = ("counter" in [table_meta.columns[name].cql_type for name in self.valid_columns])
+ if is_counter:
+ query = 'UPDATE %s.%s SET %%s WHERE %%s' % (protect_name(self.ks), protect_name(self.table))
+ make_statement = self.wrap_make_statement(self.make_counter_batch_statement)
+ elif self.use_prepared_statements:
+ query = 'INSERT INTO %s.%s (%s) VALUES (%s)' % (protect_name(self.ks),
+ protect_name(self.table),
+ ', '.join(protect_names(self.valid_columns),),
+ ', '.join(['?' for _ in self.valid_columns]))
+
+ query = self.session.prepare(query)
+ query.consistency_level = self.consistency_level
+ prepared_statement = query
+ make_statement = self.wrap_make_statement(self.make_prepared_batch_statement)
+ else:
+ query = 'INSERT INTO %s.%s (%s) VALUES (%%s)' % (protect_name(self.ks),
+ protect_name(self.table),
+ ', '.join(protect_names(self.valid_columns),))
+ make_statement = self.wrap_make_statement(self.make_non_prepared_batch_statement)
- except Exception, exc:
- self.outmsg.put((batch, '%s - %s' % (exc.__class__.__name__, exc.message)))
- if self.debug:
- traceback.print_exc(exc)
+ conv = ImportConversion(self, table_meta, prepared_statement)
+ tm = TokenMap(self.ks, self.hostname, self.local_dc, self.session)
+ return query, conv, tm, make_statement
- def run_normal(self, table_meta):
+ def inner_run(self, query, conv, tm, make_statement):
"""
- Main run method for normal tables, i.e. tables that do not contain counter columns.
+ Main run method. Note that methods called inside loops are bound to local
+ names beforehand, for performance reasons.
"""
- query = 'INSERT INTO %s.%s (%s) VALUES (%s)' % (protect_name(self.ks),
- protect_name(self.table),
- ', '.join(protect_names(self.valid_columns),),
- ', '.join(['?' for _ in self.valid_columns]))
+ self.query = query
+ self.conv = conv
+ self.make_statement = make_statement
- query_statement = self.session.prepare(query)
- query_statement.consistency_level = self.consistency_level
- conv = ImportConversion(self, table_meta, query_statement)
+ convert_rows = self.convert_rows
+ split_into_batches = self.split_into_batches
+ result_callback = self.result_callback
+ err_callback = self.err_callback
+ session = self.session
while True:
- batch = self.inmsg.get()
+ chunk = self.inmsg.recv()
+ if chunk is None:
+ break
+
try:
- for b in self.split_batches(batch, conv):
- self.send_normal_batch(conv, query_statement, b)
+ chunk['rows'] = convert_rows(conv, chunk)
+ for replicas, batch in split_into_batches(chunk, conv, tm):
+ statement = make_statement(query, conv, chunk, batch, replicas)
+ future = session.execute_async(statement)
+ future.add_callbacks(callback=result_callback, callback_args=(batch, chunk),
+ errback=err_callback, errback_args=(batch, chunk, replicas))
except Exception, exc:
- self.outmsg.put((batch, '%s - %s' % (exc.__class__.__name__, exc.message)))
- if self.debug:
- traceback.print_exc(exc)
+ self.report_error(exc, chunk, chunk['rows'])
- def send_counter_batch(self, query_text, conv, batch):
- if self.test_failures and self.maybe_inject_failures(batch):
- return
+ def wrap_make_statement(self, inner_make_statement):
+ def make_statement(query, conv, chunk, batch, replicas):
+ try:
+ return inner_make_statement(query, conv, batch, replicas)
+ except Exception, exc:
+ print "Failed to make batch statement: {}".format(exc)
+ self.report_error(exc, chunk, batch['rows'])
+ return None
- error_rows = []
- batch_statement = BatchStatement(batch_type=BatchType.COUNTER, consistency_level=self.consistency_level)
+ def make_statement_with_failures(query, conv, chunk, batch, replicas):
+ failed_batch = self.maybe_inject_failures(batch)
+ if failed_batch:
+ return failed_batch
+ return make_statement(query, conv, chunk, batch, replicas)
- for r in batch['rows']:
- row = self.filter_row_values(r)
- if len(row) != len(self.valid_columns):
- error_rows.append(row)
- continue
+ return make_statement_with_failures if self.test_failures else make_statement
+ def make_counter_batch_statement(self, query, conv, batch, replicas):
+ statement = BatchStatement(batch_type=BatchType.COUNTER, consistency_level=self.consistency_level)
+ statement.replicas = replicas
+ statement.keyspace = self.ks
+ for row in batch['rows']:
where_clause = []
set_clause = []
for i, value in enumerate(row):