You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by mi...@apache.org on 2014/08/28 00:44:45 UTC

[2/3] git commit: (cqlsh): Show progress of COPY operations.

(cqlsh): Show progress of COPY operations.

patch by Mikhail Stepura; reviewed by Tyler Hobbs for CASSANDRA-7789


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/d01250d6
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/d01250d6
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/d01250d6

Branch: refs/heads/trunk
Commit: d01250d63acf08c354bc400c957572bbd68f7ea6
Parents: c7f9c8d
Author: Mikhail Stepura <mi...@apache.org>
Authored: Wed Aug 20 11:55:50 2014 -0700
Committer: Mikhail Stepura <mi...@apache.org>
Committed: Wed Aug 27 15:43:42 2014 -0700

----------------------------------------------------------------------
 CHANGES.txt                    |  1 +
 bin/cqlsh                      | 27 +++++++++--------
 pylib/cqlshlib/async_insert.py | 13 ++++----
 pylib/cqlshlib/meter.py        | 59 +++++++++++++++++++++++++++++++++++++
 4 files changed, 79 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/d01250d6/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 0f44c91..4fb773d 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 2.1.1
+ * (cqlsh): Show progress of COPY operations (CASSANDRA-7789)
  * Add syntax to remove multiple elements from a map (CASSANDRA-6599)
  * Support non-equals conditions in lightweight transactions (CASSANDRA-6839)
  * Add IF [NOT] EXISTS to create/drop triggers (CASSANDRA-7606)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d01250d6/bin/cqlsh
----------------------------------------------------------------------
diff --git a/bin/cqlsh b/bin/cqlsh
index c055771..dfce885 100755
--- a/bin/cqlsh
+++ b/bin/cqlsh
@@ -64,8 +64,6 @@ except ImportError:
     pass
 
 CQL_LIB_PREFIX = 'cassandra-driver-internal-only-'
-FUTURES_LIB_PREFIX = 'futures-'
-SIX_LIB_PREFIX = 'six-'
 
 CASSANDRA_PATH = os.path.join(os.path.dirname(os.path.realpath(__file__)), '..')
 
@@ -89,12 +87,13 @@ cql_zip = find_zip(CQL_LIB_PREFIX)
 if cql_zip:
     ver = os.path.splitext(os.path.basename(cql_zip))[0][len(CQL_LIB_PREFIX):]
     sys.path.insert(0, os.path.join(cql_zip, 'cassandra-driver-' + ver))
-futures_zip = find_zip(FUTURES_LIB_PREFIX)
-if futures_zip:
-    sys.path.insert(0, futures_zip)
-six_zip = find_zip(SIX_LIB_PREFIX)
-if six_zip:
-    sys.path.insert(0, six_zip)
+
+third_parties = ('futures-', 'six-')
+
+for lib in third_parties:
+    lib_zip = find_zip(lib)
+    if lib_zip:
+        sys.path.insert(0, lib_zip)
 
 warnings.filterwarnings("ignore", r".*blist.*")
 try:
@@ -118,7 +117,7 @@ cqlshlibdir = os.path.join(CASSANDRA_PATH, 'pylib')
 if os.path.isdir(cqlshlibdir):
     sys.path.insert(0, cqlshlibdir)
 
-from cqlshlib import cqlhandling, cql3handling, pylexotron, sslhandling, async_insert
+from cqlshlib import cqlhandling, cql3handling, pylexotron, sslhandling, async_insert, meter
 from cqlshlib.displaying import (RED, BLUE, CYAN, ANSI_RESET, COLUMN_NAME_COLORS,
                                  FormattedValue, colorme)
 from cqlshlib.formatting import format_by_type, formatter_for, format_value_utype
@@ -1361,7 +1360,7 @@ class Shell(cmd.Cmd):
                 linesource.close()
             elif self.tty:
                 print
-        return rownum-1
+        return rownum
 
     def create_insert_statement(self, columns, nullval, table_meta, row):
 
@@ -1437,23 +1436,25 @@ class Shell(cmd.Cmd):
             except IOError, e:
                 self.printerr("Can't open %r for writing: %s" % (fname, e))
                 return 0
+        wmeter = meter.Meter()
         try:
+
             dump = self.prep_export_dump(ks, cf, columns)
             writer = csv.writer(csvdest, **dialect_options)
             if header:
                 writer.writerow(columns)
-            rows = 0
             for row in dump:
                 fmt = lambda v: \
                     format_value(v, output_encoding=encoding, nullval=nullval,
                                  time_format=self.display_time_format,
                                  float_precision=self.display_float_precision).strval
                 writer.writerow(map(fmt, row.values()))
-                rows += 1
+                wmeter.mark_written()
+            wmeter.done()
         finally:
             if do_close:
                 csvdest.close()
-        return rows
+        return wmeter.num_finished()
 
     def prep_export_dump(self, ks, cf, columns):
         if columns is None:

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d01250d6/pylib/cqlshlib/async_insert.py
----------------------------------------------------------------------
diff --git a/pylib/cqlshlib/async_insert.py b/pylib/cqlshlib/async_insert.py
index a4adcd2..d325716 100644
--- a/pylib/cqlshlib/async_insert.py
+++ b/pylib/cqlshlib/async_insert.py
@@ -14,11 +14,10 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-from itertools import count
 from threading import Event, Condition
+from . import meter
 import sys
 
-
 class _CountDownLatch(object):
     def __init__(self, counter=1):
         self._count = counter
@@ -47,10 +46,10 @@ class _ChainedWriter(object):
         self._session = session
         self._cancellation_event = Event()
         self._first_error = None
-        self._num_finished = count(start=1)
         self._task_counter = _CountDownLatch(self.CONCURRENCY)
         self._enumerated_reader = enumerated_reader
         self._statement_func = statement_func
+        self._meter = meter.Meter()
 
     def insert(self):
         if not self._enumerated_reader:
@@ -65,8 +64,9 @@ class _ChainedWriter(object):
             self._cancellation_event.set()
             sys.stdout.write('Aborting due to keyboard interrupt\n')
             self._task_counter.await()
+        self._meter.done()
+        return self._meter.num_finished(), self._first_error
 
-        return next(self._num_finished), self._first_error
 
     def _abort(self, error, failed_record):
         if not self._first_error:
@@ -83,10 +83,7 @@ class _ChainedWriter(object):
             return
 
         if result is not self._sentinel:
-            finished = next(self._num_finished)
-            if not finished % 1000:
-                sys.stdout.write('Imported %s rows\r' % finished)
-                sys.stdout.flush()
+            self._meter.mark_written()
 
         try:
             (current_record, row) = next(self._enumerated_reader)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d01250d6/pylib/cqlshlib/meter.py
----------------------------------------------------------------------
diff --git a/pylib/cqlshlib/meter.py b/pylib/cqlshlib/meter.py
new file mode 100644
index 0000000..e1a6bfc
--- /dev/null
+++ b/pylib/cqlshlib/meter.py
@@ -0,0 +1,59 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from time import time
+import sys
+from threading import RLock
+
+
+class Meter(object):
+
+    def __init__(self):
+        self._num_finished = 0
+        self._last_checkpoint_time = None
+        self._current_rate = 0.0
+        self._lock = RLock()
+
+    def mark_written(self):
+        with self._lock:
+            if not self._last_checkpoint_time:
+                self._last_checkpoint_time = time()
+            self._num_finished += 1
+
+            if self._num_finished % 10000 == 0:
+                previous_checkpoint_time = self._last_checkpoint_time
+                self._last_checkpoint_time = time()
+                new_rate = 10000.0 / (self._last_checkpoint_time - previous_checkpoint_time)
+                if self._current_rate == 0.0:
+                    self._current_rate = new_rate
+                else:
+                    self._current_rate = (self._current_rate + new_rate) / 2.0
+
+            if self._num_finished % 1000 != 0:
+                return
+            output = 'Processed %s rows; Write: %.2f rows/s\r' % \
+                     (self._num_finished, self._current_rate)
+            sys.stdout.write(output)
+            sys.stdout.flush()
+
+    def num_finished(self):
+        with self._lock:
+            return self._num_finished
+
+    def done(self):
+        print ""
+
+