You are viewing a plain text version of this content. The canonical link for it was not preserved in this plain text copy.
Posted to commits@cassandra.apache.org by mi...@apache.org on 2014/08/28 00:44:45 UTC
[2/3] git commit: (cqlsh): Show progress of COPY operations.
(cqlsh): Show progress of COPY operations.
patch by Mikhail Stepura; reviewed by Tyler Hobbs for CASSANDRA-7789
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/d01250d6
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/d01250d6
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/d01250d6
Branch: refs/heads/trunk
Commit: d01250d63acf08c354bc400c957572bbd68f7ea6
Parents: c7f9c8d
Author: Mikhail Stepura <mi...@apache.org>
Authored: Wed Aug 20 11:55:50 2014 -0700
Committer: Mikhail Stepura <mi...@apache.org>
Committed: Wed Aug 27 15:43:42 2014 -0700
----------------------------------------------------------------------
CHANGES.txt | 1 +
bin/cqlsh | 27 +++++++++--------
pylib/cqlshlib/async_insert.py | 13 ++++----
pylib/cqlshlib/meter.py | 59 +++++++++++++++++++++++++++++++++++++
4 files changed, 79 insertions(+), 21 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/d01250d6/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 0f44c91..4fb773d 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
2.1.1
+ * (cqlsh): Show progress of COPY operations (CASSANDRA-7789)
* Add syntax to remove multiple elements from a map (CASSANDRA-6599)
* Support non-equals conditions in lightweight transactions (CASSANDRA-6839)
* Add IF [NOT] EXISTS to create/drop triggers (CASSANDRA-7606)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/d01250d6/bin/cqlsh
----------------------------------------------------------------------
diff --git a/bin/cqlsh b/bin/cqlsh
index c055771..dfce885 100755
--- a/bin/cqlsh
+++ b/bin/cqlsh
@@ -64,8 +64,6 @@ except ImportError:
pass
CQL_LIB_PREFIX = 'cassandra-driver-internal-only-'
-FUTURES_LIB_PREFIX = 'futures-'
-SIX_LIB_PREFIX = 'six-'
CASSANDRA_PATH = os.path.join(os.path.dirname(os.path.realpath(__file__)), '..')
@@ -89,12 +87,13 @@ cql_zip = find_zip(CQL_LIB_PREFIX)
if cql_zip:
ver = os.path.splitext(os.path.basename(cql_zip))[0][len(CQL_LIB_PREFIX):]
sys.path.insert(0, os.path.join(cql_zip, 'cassandra-driver-' + ver))
-futures_zip = find_zip(FUTURES_LIB_PREFIX)
-if futures_zip:
- sys.path.insert(0, futures_zip)
-six_zip = find_zip(SIX_LIB_PREFIX)
-if six_zip:
- sys.path.insert(0, six_zip)
+
+third_parties = ('futures-', 'six-')
+
+for lib in third_parties:
+ lib_zip = find_zip(lib)
+ if lib_zip:
+ sys.path.insert(0, lib_zip)
warnings.filterwarnings("ignore", r".*blist.*")
try:
@@ -118,7 +117,7 @@ cqlshlibdir = os.path.join(CASSANDRA_PATH, 'pylib')
if os.path.isdir(cqlshlibdir):
sys.path.insert(0, cqlshlibdir)
-from cqlshlib import cqlhandling, cql3handling, pylexotron, sslhandling, async_insert
+from cqlshlib import cqlhandling, cql3handling, pylexotron, sslhandling, async_insert, meter
from cqlshlib.displaying import (RED, BLUE, CYAN, ANSI_RESET, COLUMN_NAME_COLORS,
FormattedValue, colorme)
from cqlshlib.formatting import format_by_type, formatter_for, format_value_utype
@@ -1361,7 +1360,7 @@ class Shell(cmd.Cmd):
linesource.close()
elif self.tty:
print
- return rownum-1
+ return rownum
def create_insert_statement(self, columns, nullval, table_meta, row):
@@ -1437,23 +1436,25 @@ class Shell(cmd.Cmd):
except IOError, e:
self.printerr("Can't open %r for writing: %s" % (fname, e))
return 0
+ wmeter = meter.Meter()
try:
+
dump = self.prep_export_dump(ks, cf, columns)
writer = csv.writer(csvdest, **dialect_options)
if header:
writer.writerow(columns)
- rows = 0
for row in dump:
fmt = lambda v: \
format_value(v, output_encoding=encoding, nullval=nullval,
time_format=self.display_time_format,
float_precision=self.display_float_precision).strval
writer.writerow(map(fmt, row.values()))
- rows += 1
+ wmeter.mark_written()
+ wmeter.done()
finally:
if do_close:
csvdest.close()
- return rows
+ return wmeter.num_finished()
def prep_export_dump(self, ks, cf, columns):
if columns is None:
http://git-wip-us.apache.org/repos/asf/cassandra/blob/d01250d6/pylib/cqlshlib/async_insert.py
----------------------------------------------------------------------
diff --git a/pylib/cqlshlib/async_insert.py b/pylib/cqlshlib/async_insert.py
index a4adcd2..d325716 100644
--- a/pylib/cqlshlib/async_insert.py
+++ b/pylib/cqlshlib/async_insert.py
@@ -14,11 +14,10 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-from itertools import count
from threading import Event, Condition
+from . import meter
import sys
-
class _CountDownLatch(object):
def __init__(self, counter=1):
self._count = counter
@@ -47,10 +46,10 @@ class _ChainedWriter(object):
self._session = session
self._cancellation_event = Event()
self._first_error = None
- self._num_finished = count(start=1)
self._task_counter = _CountDownLatch(self.CONCURRENCY)
self._enumerated_reader = enumerated_reader
self._statement_func = statement_func
+ self._meter = meter.Meter()
def insert(self):
if not self._enumerated_reader:
@@ -65,8 +64,9 @@ class _ChainedWriter(object):
self._cancellation_event.set()
sys.stdout.write('Aborting due to keyboard interrupt\n')
self._task_counter.await()
+ self._meter.done()
+ return self._meter.num_finished(), self._first_error
- return next(self._num_finished), self._first_error
def _abort(self, error, failed_record):
if not self._first_error:
@@ -83,10 +83,7 @@ class _ChainedWriter(object):
return
if result is not self._sentinel:
- finished = next(self._num_finished)
- if not finished % 1000:
- sys.stdout.write('Imported %s rows\r' % finished)
- sys.stdout.flush()
+ self._meter.mark_written()
try:
(current_record, row) = next(self._enumerated_reader)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/d01250d6/pylib/cqlshlib/meter.py
----------------------------------------------------------------------
diff --git a/pylib/cqlshlib/meter.py b/pylib/cqlshlib/meter.py
new file mode 100644
index 0000000..e1a6bfc
--- /dev/null
+++ b/pylib/cqlshlib/meter.py
@@ -0,0 +1,59 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from time import time
+import sys
+from threading import RLock
+
+
+class Meter(object):
+
+ def __init__(self):
+ self._num_finished = 0
+ self._last_checkpoint_time = None
+ self._current_rate = 0.0
+ self._lock = RLock()
+
+ def mark_written(self):
+ with self._lock:
+ if not self._last_checkpoint_time:
+ self._last_checkpoint_time = time()
+ self._num_finished += 1
+
+ if self._num_finished % 10000 == 0:
+ previous_checkpoint_time = self._last_checkpoint_time
+ self._last_checkpoint_time = time()
+ new_rate = 10000.0 / (self._last_checkpoint_time - previous_checkpoint_time)
+ if self._current_rate == 0.0:
+ self._current_rate = new_rate
+ else:
+ self._current_rate = (self._current_rate + new_rate) / 2.0
+
+ if self._num_finished % 1000 != 0:
+ return
+ output = 'Processed %s rows; Write: %.2f rows/s\r' % \
+ (self._num_finished, self._current_rate)
+ sys.stdout.write(output)
+ sys.stdout.flush()
+
+ def num_finished(self):
+ with self._lock:
+ return self._num_finished
+
+ def done(self):
+ print ""
+
+