You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jm...@apache.org on 2016/03/28 20:02:47 UTC

[01/15] cassandra git commit: COPY FROM on large datasets: fixed problem on single core machines

Repository: cassandra
Updated Branches:
  refs/heads/cassandra-2.1 42644c324 -> a9b542205
  refs/heads/cassandra-2.2 cab3d5d12 -> 6c1ef2ba4
  refs/heads/cassandra-3.0 70eab633f -> 3efc609e0
  refs/heads/cassandra-3.5 acc2f89c1 -> c7ef7c91c
  refs/heads/trunk 5beedbc66 -> 860291872


COPY FROM on large datasets: fixed problem on single core machines

patch by Stefania Alborghetti; reviewed by Adam Holmberg for CASSANDRA-11053


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/a9b54220
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/a9b54220
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/a9b54220

Branch: refs/heads/cassandra-2.1
Commit: a9b5422057054b0ba612164d56d7cce5567e48df
Parents: 42644c3
Author: Stefania Alborghetti <st...@datastax.com>
Authored: Fri Mar 18 13:33:21 2016 +0800
Committer: Josh McKenzie <jo...@datastax.com>
Committed: Mon Mar 28 13:54:37 2016 -0400

----------------------------------------------------------------------
 pylib/cqlshlib/copyutil.py | 98 +++++++++++++++++++++++++++++------------
 1 file changed, 69 insertions(+), 29 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/a9b54220/pylib/cqlshlib/copyutil.py
----------------------------------------------------------------------
diff --git a/pylib/cqlshlib/copyutil.py b/pylib/cqlshlib/copyutil.py
index cd03765..ba2a47b 100644
--- a/pylib/cqlshlib/copyutil.py
+++ b/pylib/cqlshlib/copyutil.py
@@ -29,16 +29,17 @@ import re
 import struct
 import sys
 import time
+import threading
 import traceback
 
 from bisect import bisect_right
 from calendar import timegm
 from collections import defaultdict, namedtuple
 from decimal import Decimal
+from Queue import Queue
 from random import randrange
 from StringIO import StringIO
 from select import select
-from threading import Lock
 from uuid import UUID
 from util import profile_on, profile_off
 
@@ -161,11 +162,11 @@ class CopyTask(object):
         self.options = self.parse_options(opts, direction)
 
         self.num_processes = self.options.copy['numprocesses']
-        if direction == 'in':
-            self.num_processes += 1  # add the feeder process
-
         self.printmsg('Using %d child processes' % (self.num_processes,))
 
+        if direction == 'from':
+            self.num_processes += 1  # add the feeder process
+
         self.processes = []
         self.inmsg = OneWayChannels(self.num_processes)
         self.outmsg = OneWayChannels(self.num_processes)
@@ -295,17 +296,20 @@ class CopyTask(object):
     def get_num_processes(cap):
         """
         Pick a reasonable number of child processes. We need to leave at
-        least one core for the parent process.
+        least one core for the parent or feeder process.
         """
         return max(1, min(cap, CopyTask.get_num_cores() - 1))
 
     @staticmethod
     def get_num_cores():
         """
-        Return the number of cores if available.
+        Return the number of cores if available. If the test environment variable
+        CQLSH_COPY_TEST_NUM_CORES is set, then return the number carried by this variable.
+        This makes it easier to test single-core machines.
         """
         try:
-            return mp.cpu_count()
+            num_cores_for_testing = os.environ.get('CQLSH_COPY_TEST_NUM_CORES', '')
+            return int(num_cores_for_testing) if num_cores_for_testing else mp.cpu_count()
         except NotImplementedError:
             return 1
 
@@ -690,22 +694,20 @@ class ExportTask(CopyTask):
                 if token_range is None and result is None:  # a request has finished
                     succeeded += 1
                 elif isinstance(result, Exception):  # an error occurred
-                    if token_range is None:  # the entire process failed
-                        shell.printerr('Error from worker process: %s' % (result))
-                    else:   # only this token_range failed, retry up to max_attempts if no rows received yet,
-                            # If rows were already received we'd risk duplicating data.
-                            # Note that there is still a slight risk of duplicating data, even if we have
-                            # an error with no rows received yet, it's just less likely. To avoid retrying on
-                            # all timeouts would however mean we could risk not exporting some rows.
-                        if ranges[token_range]['attempts'] < max_attempts and ranges[token_range]['rows'] == 0:
-                            shell.printerr('Error for %s: %s (will try again later attempt %d of %d)'
-                                           % (token_range, result, ranges[token_range]['attempts'], max_attempts))
-                            self.send_work(ranges, [token_range])
-                        else:
-                            shell.printerr('Error for %s: %s (permanently given up after %d rows and %d attempts)'
-                                           % (token_range, result, ranges[token_range]['rows'],
-                                              ranges[token_range]['attempts']))
-                            failed += 1
+                    # This token_range failed: retry up to max_attempts if no rows were received yet.
+                    # If rows were already received, retrying would risk duplicating data.
+                    # Note that there is still a slight risk of duplicating data even when the
+                    # error arrives before any rows; it is just less likely. Never retrying on
+                    # timeouts, however, would mean we could risk not exporting some rows.
+                    if ranges[token_range]['attempts'] < max_attempts and ranges[token_range]['rows'] == 0:
+                        shell.printerr('Error for %s: %s (will try again later attempt %d of %d)'
+                                       % (token_range, result, ranges[token_range]['attempts'], max_attempts))
+                        self.send_work(ranges, [token_range])
+                    else:
+                        shell.printerr('Error for %s: %s (permanently given up after %d rows and %d attempts)'
+                                       % (token_range, result, ranges[token_range]['rows'],
+                                          ranges[token_range]['attempts']))
+                        failed += 1
                 else:  # partial result received
                     data, num = result
                     self.writer.write(data, num)
@@ -1313,7 +1315,7 @@ class ExportSession(object):
         self.cluster = cluster
         self.session = session
         self.requests = 1
-        self.lock = Lock()
+        self.lock = threading.Lock()
         self.consistency_level = export_process.consistency_level
 
     def add_request(self):
@@ -1351,6 +1353,7 @@ class ExportProcess(ChildProcess):
         self.hosts_to_sessions = dict()
         self.formatters = dict()
         self.options = options
+        self.responses = None
 
     def run(self):
         try:
@@ -1368,6 +1371,8 @@ class ExportProcess(ChildProcess):
         we can signal a global error by sending (None, error).
         We terminate when the inbound queue is closed.
         """
+        self.init_feeder_thread()
+
         while True:
             if self.num_requests() > self.max_requests:
                 time.sleep(0.001)  # 1 millisecond
@@ -1376,6 +1381,37 @@ class ExportProcess(ChildProcess):
             token_range, info = self.inmsg.recv()
             self.start_request(token_range, info)
 
+    def init_feeder_thread(self):
+        """
+        Start a thread to feed response messages to the parent process.
+
+        It is not safe to write on the pipe from the main thread if the parent process is still sending work and
+        not receiving yet. This will in fact block the main thread on the send, which in turn won't be able to call
+        recv(), and will therefore block the parent process on its send().
+
+        It is also not safe to write on the pipe from the driver receiving thread whilst the parent process is
+        sending work, because if the receiving thread stops making progress, then the main thread may no longer
+        call recv() due to the check on the maximum number of requests in inner_run().
+
+        These deadlocks are easiest to reproduce with a single worker process, but may well affect multiple worker
+        processes too.
+
+        It is important that the order of the responses in the queue is respected, or else the parent process may
+        kill off worker processes before it has received all the pages of the last token range.
+        """
+        def feed_errors():
+            while True:
+                try:
+                    self.outmsg.send(self.responses.get())
+                except Exception, e:
+                    self.printdebugmsg(e.message)
+
+        self.responses = Queue()
+
+        thread = threading.Thread(target=feed_errors)
+        thread.setDaemon(True)
+        thread.start()
+
     @staticmethod
     def get_error_message(err, print_traceback=False):
         if isinstance(err, str):
@@ -1388,10 +1424,13 @@ class ExportProcess(ChildProcess):
             msg = str(err)
         return msg
 
-    def report_error(self, err, token_range=None):
+    def report_error(self, err, token_range):
         msg = self.get_error_message(err, print_traceback=self.debug)
         self.printdebugmsg(msg)
-        self.outmsg.send((token_range, Exception(msg)))
+        self.send((token_range, Exception(msg)))
+
+    def send(self, response):
+        self.responses.put(response)
 
     def start_request(self, token_range, info):
         """
@@ -1434,7 +1473,8 @@ class ExportProcess(ChildProcess):
                     self.printdebugmsg("Warning: failed to connect to some replicas: %s" % (errors,))
                 return ret
 
-        self.report_error("Failed to connect to all replicas %s for %s, errors: %s" % (hosts, token_range, errors))
+        self.report_error("Failed to connect to all replicas %s for %s, errors: %s" % (hosts, token_range, errors),
+                          token_range)
         return None
 
     def connect(self, host):
@@ -1467,7 +1507,7 @@ class ExportProcess(ChildProcess):
                 self.write_rows_to_csv(token_range, rows)
             else:
                 self.write_rows_to_csv(token_range, rows)
-                self.outmsg.send((None, None))
+                self.send((None, None))
                 session.complete_request()
 
         def err_callback(err):
@@ -1488,7 +1528,7 @@ class ExportProcess(ChildProcess):
                 writer.writerow(map(self.format_value, row))
 
             data = (output.getvalue(), len(rows))
-            self.outmsg.send((token_range, data))
+            self.send((token_range, data))
             output.close()
 
         except Exception, e:


[07/15] cassandra git commit: Merge branch 'cassandra-2.1' into cassandra-2.2

Posted by jm...@apache.org.
Merge branch 'cassandra-2.1' into cassandra-2.2


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/6c1ef2ba
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/6c1ef2ba
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/6c1ef2ba

Branch: refs/heads/cassandra-2.2
Commit: 6c1ef2ba4e98d17d7f8b409c2a8c08189b777da9
Parents: cab3d5d a9b5422
Author: Josh McKenzie <jo...@datastax.com>
Authored: Mon Mar 28 13:54:54 2016 -0400
Committer: Josh McKenzie <jo...@datastax.com>
Committed: Mon Mar 28 13:54:54 2016 -0400

----------------------------------------------------------------------
 pylib/cqlshlib/copyutil.py | 98 +++++++++++++++++++++++++++++------------
 1 file changed, 69 insertions(+), 29 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/6c1ef2ba/pylib/cqlshlib/copyutil.py
----------------------------------------------------------------------


[06/15] cassandra git commit: Merge branch 'cassandra-2.1' into cassandra-2.2

Posted by jm...@apache.org.
Merge branch 'cassandra-2.1' into cassandra-2.2


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/6c1ef2ba
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/6c1ef2ba
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/6c1ef2ba

Branch: refs/heads/trunk
Commit: 6c1ef2ba4e98d17d7f8b409c2a8c08189b777da9
Parents: cab3d5d a9b5422
Author: Josh McKenzie <jo...@datastax.com>
Authored: Mon Mar 28 13:54:54 2016 -0400
Committer: Josh McKenzie <jo...@datastax.com>
Committed: Mon Mar 28 13:54:54 2016 -0400

----------------------------------------------------------------------
 pylib/cqlshlib/copyutil.py | 98 +++++++++++++++++++++++++++++------------
 1 file changed, 69 insertions(+), 29 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/6c1ef2ba/pylib/cqlshlib/copyutil.py
----------------------------------------------------------------------


[14/15] cassandra git commit: Merge branch 'cassandra-3.0' into cassandra-3.5

Posted by jm...@apache.org.
Merge branch 'cassandra-3.0' into cassandra-3.5


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/c7ef7c91
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/c7ef7c91
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/c7ef7c91

Branch: refs/heads/trunk
Commit: c7ef7c91c24036e2fdfbc94b5681844c37be1e33
Parents: acc2f89 3efc609
Author: Josh McKenzie <jo...@datastax.com>
Authored: Mon Mar 28 13:55:54 2016 -0400
Committer: Josh McKenzie <jo...@datastax.com>
Committed: Mon Mar 28 13:55:54 2016 -0400

----------------------------------------------------------------------
 pylib/cqlshlib/copyutil.py | 98 +++++++++++++++++++++++++++++------------
 1 file changed, 69 insertions(+), 29 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/c7ef7c91/pylib/cqlshlib/copyutil.py
----------------------------------------------------------------------


[13/15] cassandra git commit: Merge branch 'cassandra-3.0' into cassandra-3.5

Posted by jm...@apache.org.
Merge branch 'cassandra-3.0' into cassandra-3.5


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/c7ef7c91
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/c7ef7c91
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/c7ef7c91

Branch: refs/heads/cassandra-3.5
Commit: c7ef7c91c24036e2fdfbc94b5681844c37be1e33
Parents: acc2f89 3efc609
Author: Josh McKenzie <jo...@datastax.com>
Authored: Mon Mar 28 13:55:54 2016 -0400
Committer: Josh McKenzie <jo...@datastax.com>
Committed: Mon Mar 28 13:55:54 2016 -0400

----------------------------------------------------------------------
 pylib/cqlshlib/copyutil.py | 98 +++++++++++++++++++++++++++++------------
 1 file changed, 69 insertions(+), 29 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/c7ef7c91/pylib/cqlshlib/copyutil.py
----------------------------------------------------------------------


[15/15] cassandra git commit: Merge branch 'cassandra-3.5' into trunk

Posted by jm...@apache.org.
Merge branch 'cassandra-3.5' into trunk


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/86029187
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/86029187
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/86029187

Branch: refs/heads/trunk
Commit: 8602918722c96f3ffa6b970597d4ff6927b24707
Parents: 5beedbc c7ef7c9
Author: Josh McKenzie <jo...@datastax.com>
Authored: Mon Mar 28 13:56:13 2016 -0400
Committer: Josh McKenzie <jo...@datastax.com>
Committed: Mon Mar 28 13:56:13 2016 -0400

----------------------------------------------------------------------
 pylib/cqlshlib/copyutil.py | 98 +++++++++++++++++++++++++++++------------
 1 file changed, 69 insertions(+), 29 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/86029187/pylib/cqlshlib/copyutil.py
----------------------------------------------------------------------
diff --cc pylib/cqlshlib/copyutil.py
index 1da5d14,0cae396..f03a1a3
--- a/pylib/cqlshlib/copyutil.py
+++ b/pylib/cqlshlib/copyutil.py
@@@ -1496,10 -1529,10 +1536,10 @@@ class ExportProcess(ChildProcess)
          def result_callback(rows):
              if future.has_more_pages:
                  future.start_fetching_next_page()
 -                self.write_rows_to_csv(token_range, rows)
 +                self.write_rows_to_csv(token_range, rows, cql_types)
              else:
 -                self.write_rows_to_csv(token_range, rows)
 +                self.write_rows_to_csv(token_range, rows, cql_types)
-                 self.outmsg.send((None, None))
+                 self.send((None, None))
                  session.complete_request()
  
          def err_callback(err):
@@@ -1517,10 -1550,10 +1557,10 @@@
              writer = csv.writer(output, **self.options.dialect)
  
              for row in rows:
 -                writer.writerow(map(self.format_value, row))
 +                writer.writerow(map(self.format_value, row, cql_types))
  
              data = (output.getvalue(), len(rows))
-             self.outmsg.send((token_range, data))
+             self.send((token_range, data))
              output.close()
  
          except Exception, e:


[09/15] cassandra git commit: Merge branch 'cassandra-2.1' into cassandra-2.2

Posted by jm...@apache.org.
Merge branch 'cassandra-2.1' into cassandra-2.2


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/6c1ef2ba
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/6c1ef2ba
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/6c1ef2ba

Branch: refs/heads/cassandra-3.5
Commit: 6c1ef2ba4e98d17d7f8b409c2a8c08189b777da9
Parents: cab3d5d a9b5422
Author: Josh McKenzie <jo...@datastax.com>
Authored: Mon Mar 28 13:54:54 2016 -0400
Committer: Josh McKenzie <jo...@datastax.com>
Committed: Mon Mar 28 13:54:54 2016 -0400

----------------------------------------------------------------------
 pylib/cqlshlib/copyutil.py | 98 +++++++++++++++++++++++++++++------------
 1 file changed, 69 insertions(+), 29 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/6c1ef2ba/pylib/cqlshlib/copyutil.py
----------------------------------------------------------------------


[08/15] cassandra git commit: Merge branch 'cassandra-2.1' into cassandra-2.2

Posted by jm...@apache.org.
Merge branch 'cassandra-2.1' into cassandra-2.2


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/6c1ef2ba
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/6c1ef2ba
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/6c1ef2ba

Branch: refs/heads/cassandra-3.0
Commit: 6c1ef2ba4e98d17d7f8b409c2a8c08189b777da9
Parents: cab3d5d a9b5422
Author: Josh McKenzie <jo...@datastax.com>
Authored: Mon Mar 28 13:54:54 2016 -0400
Committer: Josh McKenzie <jo...@datastax.com>
Committed: Mon Mar 28 13:54:54 2016 -0400

----------------------------------------------------------------------
 pylib/cqlshlib/copyutil.py | 98 +++++++++++++++++++++++++++++------------
 1 file changed, 69 insertions(+), 29 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/6c1ef2ba/pylib/cqlshlib/copyutil.py
----------------------------------------------------------------------


[05/15] cassandra git commit: COPY FROM on large datasets: fixed problem on single core machines

Posted by jm...@apache.org.
COPY FROM on large datasets: fixed problem on single core machines

patch by Stefania Alborghetti; reviewed by Adam Holmberg for CASSANDRA-11053


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/a9b54220
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/a9b54220
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/a9b54220

Branch: refs/heads/cassandra-3.5
Commit: a9b5422057054b0ba612164d56d7cce5567e48df
Parents: 42644c3
Author: Stefania Alborghetti <st...@datastax.com>
Authored: Fri Mar 18 13:33:21 2016 +0800
Committer: Josh McKenzie <jo...@datastax.com>
Committed: Mon Mar 28 13:54:37 2016 -0400

----------------------------------------------------------------------
 pylib/cqlshlib/copyutil.py | 98 +++++++++++++++++++++++++++++------------
 1 file changed, 69 insertions(+), 29 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/a9b54220/pylib/cqlshlib/copyutil.py
----------------------------------------------------------------------
diff --git a/pylib/cqlshlib/copyutil.py b/pylib/cqlshlib/copyutil.py
index cd03765..ba2a47b 100644
--- a/pylib/cqlshlib/copyutil.py
+++ b/pylib/cqlshlib/copyutil.py
@@ -29,16 +29,17 @@ import re
 import struct
 import sys
 import time
+import threading
 import traceback
 
 from bisect import bisect_right
 from calendar import timegm
 from collections import defaultdict, namedtuple
 from decimal import Decimal
+from Queue import Queue
 from random import randrange
 from StringIO import StringIO
 from select import select
-from threading import Lock
 from uuid import UUID
 from util import profile_on, profile_off
 
@@ -161,11 +162,11 @@ class CopyTask(object):
         self.options = self.parse_options(opts, direction)
 
         self.num_processes = self.options.copy['numprocesses']
-        if direction == 'in':
-            self.num_processes += 1  # add the feeder process
-
         self.printmsg('Using %d child processes' % (self.num_processes,))
 
+        if direction == 'from':
+            self.num_processes += 1  # add the feeder process
+
         self.processes = []
         self.inmsg = OneWayChannels(self.num_processes)
         self.outmsg = OneWayChannels(self.num_processes)
@@ -295,17 +296,20 @@ class CopyTask(object):
     def get_num_processes(cap):
         """
         Pick a reasonable number of child processes. We need to leave at
-        least one core for the parent process.
+        least one core for the parent or feeder process.
         """
         return max(1, min(cap, CopyTask.get_num_cores() - 1))
 
     @staticmethod
     def get_num_cores():
         """
-        Return the number of cores if available.
+        Return the number of cores if available. If the test environment variable
+        CQLSH_COPY_TEST_NUM_CORES is set, then return the number carried by this variable.
+        This makes it easier to test single-core machines.
         """
         try:
-            return mp.cpu_count()
+            num_cores_for_testing = os.environ.get('CQLSH_COPY_TEST_NUM_CORES', '')
+            return int(num_cores_for_testing) if num_cores_for_testing else mp.cpu_count()
         except NotImplementedError:
             return 1
 
@@ -690,22 +694,20 @@ class ExportTask(CopyTask):
                 if token_range is None and result is None:  # a request has finished
                     succeeded += 1
                 elif isinstance(result, Exception):  # an error occurred
-                    if token_range is None:  # the entire process failed
-                        shell.printerr('Error from worker process: %s' % (result))
-                    else:   # only this token_range failed, retry up to max_attempts if no rows received yet,
-                            # If rows were already received we'd risk duplicating data.
-                            # Note that there is still a slight risk of duplicating data, even if we have
-                            # an error with no rows received yet, it's just less likely. To avoid retrying on
-                            # all timeouts would however mean we could risk not exporting some rows.
-                        if ranges[token_range]['attempts'] < max_attempts and ranges[token_range]['rows'] == 0:
-                            shell.printerr('Error for %s: %s (will try again later attempt %d of %d)'
-                                           % (token_range, result, ranges[token_range]['attempts'], max_attempts))
-                            self.send_work(ranges, [token_range])
-                        else:
-                            shell.printerr('Error for %s: %s (permanently given up after %d rows and %d attempts)'
-                                           % (token_range, result, ranges[token_range]['rows'],
-                                              ranges[token_range]['attempts']))
-                            failed += 1
+                    # This token_range failed: retry up to max_attempts if no rows were received yet.
+                    # If rows were already received, retrying would risk duplicating data.
+                    # Note that there is still a slight risk of duplicating data even when the
+                    # error arrives before any rows; it is just less likely. Never retrying on
+                    # timeouts, however, would mean we could risk not exporting some rows.
+                    if ranges[token_range]['attempts'] < max_attempts and ranges[token_range]['rows'] == 0:
+                        shell.printerr('Error for %s: %s (will try again later attempt %d of %d)'
+                                       % (token_range, result, ranges[token_range]['attempts'], max_attempts))
+                        self.send_work(ranges, [token_range])
+                    else:
+                        shell.printerr('Error for %s: %s (permanently given up after %d rows and %d attempts)'
+                                       % (token_range, result, ranges[token_range]['rows'],
+                                          ranges[token_range]['attempts']))
+                        failed += 1
                 else:  # partial result received
                     data, num = result
                     self.writer.write(data, num)
@@ -1313,7 +1315,7 @@ class ExportSession(object):
         self.cluster = cluster
         self.session = session
         self.requests = 1
-        self.lock = Lock()
+        self.lock = threading.Lock()
         self.consistency_level = export_process.consistency_level
 
     def add_request(self):
@@ -1351,6 +1353,7 @@ class ExportProcess(ChildProcess):
         self.hosts_to_sessions = dict()
         self.formatters = dict()
         self.options = options
+        self.responses = None
 
     def run(self):
         try:
@@ -1368,6 +1371,8 @@ class ExportProcess(ChildProcess):
         we can signal a global error by sending (None, error).
         We terminate when the inbound queue is closed.
         """
+        self.init_feeder_thread()
+
         while True:
             if self.num_requests() > self.max_requests:
                 time.sleep(0.001)  # 1 millisecond
@@ -1376,6 +1381,37 @@ class ExportProcess(ChildProcess):
             token_range, info = self.inmsg.recv()
             self.start_request(token_range, info)
 
+    def init_feeder_thread(self):
+        """
+        Start a thread to feed response messages to the parent process.
+
+        It is not safe to write on the pipe from the main thread if the parent process is still sending work and
+        not receiving yet. This will in fact block the main thread on the send, which in turn won't be able to call
+        recv(), and will therefore block the parent process on its send().
+
+        It is also not safe to write on the pipe from the driver receiving thread whilst the parent process is
+        sending work, because if the receiving thread stops making progress, then the main thread may no longer
+        call recv() due to the check on the maximum number of requests in inner_run().
+
+        These deadlocks are easiest to reproduce with a single worker process, but may well affect multiple worker
+        processes too.
+
+        It is important that the order of the responses in the queue is respected, or else the parent process may
+        kill off worker processes before it has received all the pages of the last token range.
+        """
+        def feed_errors():
+            while True:
+                try:
+                    self.outmsg.send(self.responses.get())
+                except Exception, e:
+                    self.printdebugmsg(e.message)
+
+        self.responses = Queue()
+
+        thread = threading.Thread(target=feed_errors)
+        thread.setDaemon(True)
+        thread.start()
+
     @staticmethod
     def get_error_message(err, print_traceback=False):
         if isinstance(err, str):
@@ -1388,10 +1424,13 @@ class ExportProcess(ChildProcess):
             msg = str(err)
         return msg
 
-    def report_error(self, err, token_range=None):
+    def report_error(self, err, token_range):
         msg = self.get_error_message(err, print_traceback=self.debug)
         self.printdebugmsg(msg)
-        self.outmsg.send((token_range, Exception(msg)))
+        self.send((token_range, Exception(msg)))
+
+    def send(self, response):
+        self.responses.put(response)
 
     def start_request(self, token_range, info):
         """
@@ -1434,7 +1473,8 @@ class ExportProcess(ChildProcess):
                     self.printdebugmsg("Warning: failed to connect to some replicas: %s" % (errors,))
                 return ret
 
-        self.report_error("Failed to connect to all replicas %s for %s, errors: %s" % (hosts, token_range, errors))
+        self.report_error("Failed to connect to all replicas %s for %s, errors: %s" % (hosts, token_range, errors),
+                          token_range)
         return None
 
     def connect(self, host):
@@ -1467,7 +1507,7 @@ class ExportProcess(ChildProcess):
                 self.write_rows_to_csv(token_range, rows)
             else:
                 self.write_rows_to_csv(token_range, rows)
-                self.outmsg.send((None, None))
+                self.send((None, None))
                 session.complete_request()
 
         def err_callback(err):
@@ -1488,7 +1528,7 @@ class ExportProcess(ChildProcess):
                 writer.writerow(map(self.format_value, row))
 
             data = (output.getvalue(), len(rows))
-            self.outmsg.send((token_range, data))
+            self.send((token_range, data))
             output.close()
 
         except Exception, e:


[02/15] cassandra git commit: COPY FROM on large datasets: fixed problem on single core machines

Posted by jm...@apache.org.
COPY FROM on large datasets: fixed problem on single core machines

patch by Stefania Alborghetti; reviewed by Adam Holmberg for CASSANDRA-11053


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/a9b54220
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/a9b54220
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/a9b54220

Branch: refs/heads/cassandra-2.2
Commit: a9b5422057054b0ba612164d56d7cce5567e48df
Parents: 42644c3
Author: Stefania Alborghetti <st...@datastax.com>
Authored: Fri Mar 18 13:33:21 2016 +0800
Committer: Josh McKenzie <jo...@datastax.com>
Committed: Mon Mar 28 13:54:37 2016 -0400

----------------------------------------------------------------------
 pylib/cqlshlib/copyutil.py | 98 +++++++++++++++++++++++++++++------------
 1 file changed, 69 insertions(+), 29 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/a9b54220/pylib/cqlshlib/copyutil.py
----------------------------------------------------------------------
diff --git a/pylib/cqlshlib/copyutil.py b/pylib/cqlshlib/copyutil.py
index cd03765..ba2a47b 100644
--- a/pylib/cqlshlib/copyutil.py
+++ b/pylib/cqlshlib/copyutil.py
@@ -29,16 +29,17 @@ import re
 import struct
 import sys
 import time
+import threading
 import traceback
 
 from bisect import bisect_right
 from calendar import timegm
 from collections import defaultdict, namedtuple
 from decimal import Decimal
+from Queue import Queue
 from random import randrange
 from StringIO import StringIO
 from select import select
-from threading import Lock
 from uuid import UUID
 from util import profile_on, profile_off
 
@@ -161,11 +162,11 @@ class CopyTask(object):
         self.options = self.parse_options(opts, direction)
 
         self.num_processes = self.options.copy['numprocesses']
-        if direction == 'in':
-            self.num_processes += 1  # add the feeder process
-
         self.printmsg('Using %d child processes' % (self.num_processes,))
 
+        if direction == 'from':
+            self.num_processes += 1  # add the feeder process
+
         self.processes = []
         self.inmsg = OneWayChannels(self.num_processes)
         self.outmsg = OneWayChannels(self.num_processes)
@@ -295,17 +296,20 @@ class CopyTask(object):
     def get_num_processes(cap):
         """
         Pick a reasonable number of child processes. We need to leave at
-        least one core for the parent process.
+        least one core for the parent or feeder process.
         """
         return max(1, min(cap, CopyTask.get_num_cores() - 1))
 
     @staticmethod
     def get_num_cores():
         """
-        Return the number of cores if available.
+        Return the number of cores if available. If the test environment variable
+        is set, then return the number carried by this variable. This makes it easier
+        to test single-core machines.
         """
         try:
-            return mp.cpu_count()
+            num_cores_for_testing = os.environ.get('CQLSH_COPY_TEST_NUM_CORES', '')
+            return int(num_cores_for_testing) if num_cores_for_testing else mp.cpu_count()
         except NotImplementedError:
             return 1
 
@@ -690,22 +694,20 @@ class ExportTask(CopyTask):
                 if token_range is None and result is None:  # a request has finished
                     succeeded += 1
                 elif isinstance(result, Exception):  # an error occurred
-                    if token_range is None:  # the entire process failed
-                        shell.printerr('Error from worker process: %s' % (result))
-                    else:   # only this token_range failed, retry up to max_attempts if no rows received yet,
-                            # If rows were already received we'd risk duplicating data.
-                            # Note that there is still a slight risk of duplicating data, even if we have
-                            # an error with no rows received yet, it's just less likely. To avoid retrying on
-                            # all timeouts would however mean we could risk not exporting some rows.
-                        if ranges[token_range]['attempts'] < max_attempts and ranges[token_range]['rows'] == 0:
-                            shell.printerr('Error for %s: %s (will try again later attempt %d of %d)'
-                                           % (token_range, result, ranges[token_range]['attempts'], max_attempts))
-                            self.send_work(ranges, [token_range])
-                        else:
-                            shell.printerr('Error for %s: %s (permanently given up after %d rows and %d attempts)'
-                                           % (token_range, result, ranges[token_range]['rows'],
-                                              ranges[token_range]['attempts']))
-                            failed += 1
+                    # This token_range failed, retry up to max_attempts if no rows received yet,
+                    # If rows were already received we'd risk duplicating data.
+                    # Note that there is still a slight risk of duplicating data, even if we have
+                    # an error with no rows received yet, it's just less likely. To avoid retrying on
+                    # all timeouts would however mean we could risk not exporting some rows.
+                    if ranges[token_range]['attempts'] < max_attempts and ranges[token_range]['rows'] == 0:
+                        shell.printerr('Error for %s: %s (will try again later attempt %d of %d)'
+                                       % (token_range, result, ranges[token_range]['attempts'], max_attempts))
+                        self.send_work(ranges, [token_range])
+                    else:
+                        shell.printerr('Error for %s: %s (permanently given up after %d rows and %d attempts)'
+                                       % (token_range, result, ranges[token_range]['rows'],
+                                          ranges[token_range]['attempts']))
+                        failed += 1
                 else:  # partial result received
                     data, num = result
                     self.writer.write(data, num)
@@ -1313,7 +1315,7 @@ class ExportSession(object):
         self.cluster = cluster
         self.session = session
         self.requests = 1
-        self.lock = Lock()
+        self.lock = threading.Lock()
         self.consistency_level = export_process.consistency_level
 
     def add_request(self):
@@ -1351,6 +1353,7 @@ class ExportProcess(ChildProcess):
         self.hosts_to_sessions = dict()
         self.formatters = dict()
         self.options = options
+        self.responses = None
 
     def run(self):
         try:
@@ -1368,6 +1371,8 @@ class ExportProcess(ChildProcess):
         we can signal a global error by sending (None, error).
         We terminate when the inbound queue is closed.
         """
+        self.init_feeder_thread()
+
         while True:
             if self.num_requests() > self.max_requests:
                 time.sleep(0.001)  # 1 millisecond
@@ -1376,6 +1381,37 @@ class ExportProcess(ChildProcess):
             token_range, info = self.inmsg.recv()
             self.start_request(token_range, info)
 
+    def init_feeder_thread(self):
+        """
+        Start a thread to feed response messages to the parent process.
+
+        It is not safe to write on the pipe from the main thread if the parent process is still sending work and
+        not receiving yet. This will in fact block the main thread on the send, which in turn won't be able to call
+        recv(), and will therefore block the parent process on its send().
+
+        It is also not safe to write on the pipe from the driver receiving thread whilst the parent process is
+        sending work, because if the receiving thread stops making progress, then the main thread may no longer
+        call recv() due to the check on the maximum number of requests in inner_run().
+
+        These deadlocks are easiest to reproduce with a single worker process, but may well affect multiple worker
+        processes too.
+
+        It is important that the order of the responses in the queue is respected, or else the parent process may
+        kill off worker processes before it has received all the pages of the last token range.
+        """
+        def feed_errors():
+            while True:
+                try:
+                    self.outmsg.send(self.responses.get())
+                except Exception, e:
+                    self.printdebugmsg(e.message)
+
+        self.responses = Queue()
+
+        thread = threading.Thread(target=feed_errors)
+        thread.setDaemon(True)
+        thread.start()
+
     @staticmethod
     def get_error_message(err, print_traceback=False):
         if isinstance(err, str):
@@ -1388,10 +1424,13 @@ class ExportProcess(ChildProcess):
             msg = str(err)
         return msg
 
-    def report_error(self, err, token_range=None):
+    def report_error(self, err, token_range):
         msg = self.get_error_message(err, print_traceback=self.debug)
         self.printdebugmsg(msg)
-        self.outmsg.send((token_range, Exception(msg)))
+        self.send((token_range, Exception(msg)))
+
+    def send(self, response):
+        self.responses.put(response)
 
     def start_request(self, token_range, info):
         """
@@ -1434,7 +1473,8 @@ class ExportProcess(ChildProcess):
                     self.printdebugmsg("Warning: failed to connect to some replicas: %s" % (errors,))
                 return ret
 
-        self.report_error("Failed to connect to all replicas %s for %s, errors: %s" % (hosts, token_range, errors))
+        self.report_error("Failed to connect to all replicas %s for %s, errors: %s" % (hosts, token_range, errors),
+                          token_range)
         return None
 
     def connect(self, host):
@@ -1467,7 +1507,7 @@ class ExportProcess(ChildProcess):
                 self.write_rows_to_csv(token_range, rows)
             else:
                 self.write_rows_to_csv(token_range, rows)
-                self.outmsg.send((None, None))
+                self.send((None, None))
                 session.complete_request()
 
         def err_callback(err):
@@ -1488,7 +1528,7 @@ class ExportProcess(ChildProcess):
                 writer.writerow(map(self.format_value, row))
 
             data = (output.getvalue(), len(rows))
-            self.outmsg.send((token_range, data))
+            self.send((token_range, data))
             output.close()
 
         except Exception, e:


[12/15] cassandra git commit: Merge branch 'cassandra-2.2' into cassandra-3.0

Posted by jm...@apache.org.
Merge branch 'cassandra-2.2' into cassandra-3.0


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/3efc609e
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/3efc609e
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/3efc609e

Branch: refs/heads/cassandra-3.0
Commit: 3efc609e01f95cfdcaae3a6d153291c15607455a
Parents: 70eab63 6c1ef2b
Author: Josh McKenzie <jo...@datastax.com>
Authored: Mon Mar 28 13:55:11 2016 -0400
Committer: Josh McKenzie <jo...@datastax.com>
Committed: Mon Mar 28 13:55:11 2016 -0400

----------------------------------------------------------------------
 pylib/cqlshlib/copyutil.py | 98 +++++++++++++++++++++++++++++------------
 1 file changed, 69 insertions(+), 29 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/3efc609e/pylib/cqlshlib/copyutil.py
----------------------------------------------------------------------


[03/15] cassandra git commit: COPY FROM on large datasets: fixed problem on single core machines

Posted by jm...@apache.org.
COPY FROM on large datasets: fixed problem on single core machines

patch by Stefania Alborghetti; reviewed by Adam Holmberg for CASSANDRA-11053


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/a9b54220
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/a9b54220
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/a9b54220

Branch: refs/heads/trunk
Commit: a9b5422057054b0ba612164d56d7cce5567e48df
Parents: 42644c3
Author: Stefania Alborghetti <st...@datastax.com>
Authored: Fri Mar 18 13:33:21 2016 +0800
Committer: Josh McKenzie <jo...@datastax.com>
Committed: Mon Mar 28 13:54:37 2016 -0400

----------------------------------------------------------------------
 pylib/cqlshlib/copyutil.py | 98 +++++++++++++++++++++++++++++------------
 1 file changed, 69 insertions(+), 29 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/a9b54220/pylib/cqlshlib/copyutil.py
----------------------------------------------------------------------
diff --git a/pylib/cqlshlib/copyutil.py b/pylib/cqlshlib/copyutil.py
index cd03765..ba2a47b 100644
--- a/pylib/cqlshlib/copyutil.py
+++ b/pylib/cqlshlib/copyutil.py
@@ -29,16 +29,17 @@ import re
 import struct
 import sys
 import time
+import threading
 import traceback
 
 from bisect import bisect_right
 from calendar import timegm
 from collections import defaultdict, namedtuple
 from decimal import Decimal
+from Queue import Queue
 from random import randrange
 from StringIO import StringIO
 from select import select
-from threading import Lock
 from uuid import UUID
 from util import profile_on, profile_off
 
@@ -161,11 +162,11 @@ class CopyTask(object):
         self.options = self.parse_options(opts, direction)
 
         self.num_processes = self.options.copy['numprocesses']
-        if direction == 'in':
-            self.num_processes += 1  # add the feeder process
-
         self.printmsg('Using %d child processes' % (self.num_processes,))
 
+        if direction == 'from':
+            self.num_processes += 1  # add the feeder process
+
         self.processes = []
         self.inmsg = OneWayChannels(self.num_processes)
         self.outmsg = OneWayChannels(self.num_processes)
@@ -295,17 +296,20 @@ class CopyTask(object):
     def get_num_processes(cap):
         """
         Pick a reasonable number of child processes. We need to leave at
-        least one core for the parent process.
+        least one core for the parent or feeder process.
         """
         return max(1, min(cap, CopyTask.get_num_cores() - 1))
 
     @staticmethod
     def get_num_cores():
         """
-        Return the number of cores if available.
+        Return the number of cores if available. If the test environment variable
+        is set, then return the number carried by this variable. This makes it easier
+        to test single-core machines.
         """
         try:
-            return mp.cpu_count()
+            num_cores_for_testing = os.environ.get('CQLSH_COPY_TEST_NUM_CORES', '')
+            return int(num_cores_for_testing) if num_cores_for_testing else mp.cpu_count()
         except NotImplementedError:
             return 1
 
@@ -690,22 +694,20 @@ class ExportTask(CopyTask):
                 if token_range is None and result is None:  # a request has finished
                     succeeded += 1
                 elif isinstance(result, Exception):  # an error occurred
-                    if token_range is None:  # the entire process failed
-                        shell.printerr('Error from worker process: %s' % (result))
-                    else:   # only this token_range failed, retry up to max_attempts if no rows received yet,
-                            # If rows were already received we'd risk duplicating data.
-                            # Note that there is still a slight risk of duplicating data, even if we have
-                            # an error with no rows received yet, it's just less likely. To avoid retrying on
-                            # all timeouts would however mean we could risk not exporting some rows.
-                        if ranges[token_range]['attempts'] < max_attempts and ranges[token_range]['rows'] == 0:
-                            shell.printerr('Error for %s: %s (will try again later attempt %d of %d)'
-                                           % (token_range, result, ranges[token_range]['attempts'], max_attempts))
-                            self.send_work(ranges, [token_range])
-                        else:
-                            shell.printerr('Error for %s: %s (permanently given up after %d rows and %d attempts)'
-                                           % (token_range, result, ranges[token_range]['rows'],
-                                              ranges[token_range]['attempts']))
-                            failed += 1
+                    # This token_range failed, retry up to max_attempts if no rows received yet,
+                    # If rows were already received we'd risk duplicating data.
+                    # Note that there is still a slight risk of duplicating data, even if we have
+                    # an error with no rows received yet, it's just less likely. To avoid retrying on
+                    # all timeouts would however mean we could risk not exporting some rows.
+                    if ranges[token_range]['attempts'] < max_attempts and ranges[token_range]['rows'] == 0:
+                        shell.printerr('Error for %s: %s (will try again later attempt %d of %d)'
+                                       % (token_range, result, ranges[token_range]['attempts'], max_attempts))
+                        self.send_work(ranges, [token_range])
+                    else:
+                        shell.printerr('Error for %s: %s (permanently given up after %d rows and %d attempts)'
+                                       % (token_range, result, ranges[token_range]['rows'],
+                                          ranges[token_range]['attempts']))
+                        failed += 1
                 else:  # partial result received
                     data, num = result
                     self.writer.write(data, num)
@@ -1313,7 +1315,7 @@ class ExportSession(object):
         self.cluster = cluster
         self.session = session
         self.requests = 1
-        self.lock = Lock()
+        self.lock = threading.Lock()
         self.consistency_level = export_process.consistency_level
 
     def add_request(self):
@@ -1351,6 +1353,7 @@ class ExportProcess(ChildProcess):
         self.hosts_to_sessions = dict()
         self.formatters = dict()
         self.options = options
+        self.responses = None
 
     def run(self):
         try:
@@ -1368,6 +1371,8 @@ class ExportProcess(ChildProcess):
         we can signal a global error by sending (None, error).
         We terminate when the inbound queue is closed.
         """
+        self.init_feeder_thread()
+
         while True:
             if self.num_requests() > self.max_requests:
                 time.sleep(0.001)  # 1 millisecond
@@ -1376,6 +1381,37 @@ class ExportProcess(ChildProcess):
             token_range, info = self.inmsg.recv()
             self.start_request(token_range, info)
 
+    def init_feeder_thread(self):
+        """
+        Start a thread to feed response messages to the parent process.
+
+        It is not safe to write on the pipe from the main thread if the parent process is still sending work and
+        not receiving yet. This will in fact block the main thread on the send, which in turn won't be able to call
+        recv(), and will therefore block the parent process on its send().
+
+        It is also not safe to write on the pipe from the driver receiving thread whilst the parent process is
+        sending work, because if the receiving thread stops making progress, then the main thread may no longer
+        call recv() due to the check on the maximum number of requests in inner_run().
+
+        These deadlocks are easiest to reproduce with a single worker process, but may well affect multiple worker
+        processes too.
+
+        It is important that the order of the responses in the queue is respected, or else the parent process may
+        kill off worker processes before it has received all the pages of the last token range.
+        """
+        def feed_errors():
+            while True:
+                try:
+                    self.outmsg.send(self.responses.get())
+                except Exception, e:
+                    self.printdebugmsg(e.message)
+
+        self.responses = Queue()
+
+        thread = threading.Thread(target=feed_errors)
+        thread.setDaemon(True)
+        thread.start()
+
     @staticmethod
     def get_error_message(err, print_traceback=False):
         if isinstance(err, str):
@@ -1388,10 +1424,13 @@ class ExportProcess(ChildProcess):
             msg = str(err)
         return msg
 
-    def report_error(self, err, token_range=None):
+    def report_error(self, err, token_range):
         msg = self.get_error_message(err, print_traceback=self.debug)
         self.printdebugmsg(msg)
-        self.outmsg.send((token_range, Exception(msg)))
+        self.send((token_range, Exception(msg)))
+
+    def send(self, response):
+        self.responses.put(response)
 
     def start_request(self, token_range, info):
         """
@@ -1434,7 +1473,8 @@ class ExportProcess(ChildProcess):
                     self.printdebugmsg("Warning: failed to connect to some replicas: %s" % (errors,))
                 return ret
 
-        self.report_error("Failed to connect to all replicas %s for %s, errors: %s" % (hosts, token_range, errors))
+        self.report_error("Failed to connect to all replicas %s for %s, errors: %s" % (hosts, token_range, errors),
+                          token_range)
         return None
 
     def connect(self, host):
@@ -1467,7 +1507,7 @@ class ExportProcess(ChildProcess):
                 self.write_rows_to_csv(token_range, rows)
             else:
                 self.write_rows_to_csv(token_range, rows)
-                self.outmsg.send((None, None))
+                self.send((None, None))
                 session.complete_request()
 
         def err_callback(err):
@@ -1488,7 +1528,7 @@ class ExportProcess(ChildProcess):
                 writer.writerow(map(self.format_value, row))
 
             data = (output.getvalue(), len(rows))
-            self.outmsg.send((token_range, data))
+            self.send((token_range, data))
             output.close()
 
         except Exception, e:


[11/15] cassandra git commit: Merge branch 'cassandra-2.2' into cassandra-3.0

Posted by jm...@apache.org.
Merge branch 'cassandra-2.2' into cassandra-3.0


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/3efc609e
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/3efc609e
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/3efc609e

Branch: refs/heads/cassandra-3.5
Commit: 3efc609e01f95cfdcaae3a6d153291c15607455a
Parents: 70eab63 6c1ef2b
Author: Josh McKenzie <jo...@datastax.com>
Authored: Mon Mar 28 13:55:11 2016 -0400
Committer: Josh McKenzie <jo...@datastax.com>
Committed: Mon Mar 28 13:55:11 2016 -0400

----------------------------------------------------------------------
 pylib/cqlshlib/copyutil.py | 98 +++++++++++++++++++++++++++++------------
 1 file changed, 69 insertions(+), 29 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/3efc609e/pylib/cqlshlib/copyutil.py
----------------------------------------------------------------------


[04/15] cassandra git commit: COPY FROM on large datasets: fixed problem on single core machines

Posted by jm...@apache.org.
COPY FROM on large datasets: fixed problem on single core machines

patch by Stefania Alborghetti; reviewed by Adam Holmberg for CASSANDRA-11053


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/a9b54220
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/a9b54220
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/a9b54220

Branch: refs/heads/cassandra-3.0
Commit: a9b5422057054b0ba612164d56d7cce5567e48df
Parents: 42644c3
Author: Stefania Alborghetti <st...@datastax.com>
Authored: Fri Mar 18 13:33:21 2016 +0800
Committer: Josh McKenzie <jo...@datastax.com>
Committed: Mon Mar 28 13:54:37 2016 -0400

----------------------------------------------------------------------
 pylib/cqlshlib/copyutil.py | 98 +++++++++++++++++++++++++++++------------
 1 file changed, 69 insertions(+), 29 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/a9b54220/pylib/cqlshlib/copyutil.py
----------------------------------------------------------------------
diff --git a/pylib/cqlshlib/copyutil.py b/pylib/cqlshlib/copyutil.py
index cd03765..ba2a47b 100644
--- a/pylib/cqlshlib/copyutil.py
+++ b/pylib/cqlshlib/copyutil.py
@@ -29,16 +29,17 @@ import re
 import struct
 import sys
 import time
+import threading
 import traceback
 
 from bisect import bisect_right
 from calendar import timegm
 from collections import defaultdict, namedtuple
 from decimal import Decimal
+from Queue import Queue
 from random import randrange
 from StringIO import StringIO
 from select import select
-from threading import Lock
 from uuid import UUID
 from util import profile_on, profile_off
 
@@ -161,11 +162,11 @@ class CopyTask(object):
         self.options = self.parse_options(opts, direction)
 
         self.num_processes = self.options.copy['numprocesses']
-        if direction == 'in':
-            self.num_processes += 1  # add the feeder process
-
         self.printmsg('Using %d child processes' % (self.num_processes,))
 
+        if direction == 'from':
+            self.num_processes += 1  # add the feeder process
+
         self.processes = []
         self.inmsg = OneWayChannels(self.num_processes)
         self.outmsg = OneWayChannels(self.num_processes)
@@ -295,17 +296,20 @@ class CopyTask(object):
     def get_num_processes(cap):
         """
         Pick a reasonable number of child processes. We need to leave at
-        least one core for the parent process.
+        least one core for the parent or feeder process.
         """
         return max(1, min(cap, CopyTask.get_num_cores() - 1))
 
     @staticmethod
     def get_num_cores():
         """
-        Return the number of cores if available.
+        Return the number of cores if available. If the test environment variable
+        is set, then return the number carried by this variable. This makes it easier
+        to test single-core machines.
         """
         try:
-            return mp.cpu_count()
+            num_cores_for_testing = os.environ.get('CQLSH_COPY_TEST_NUM_CORES', '')
+            return int(num_cores_for_testing) if num_cores_for_testing else mp.cpu_count()
         except NotImplementedError:
             return 1
 
@@ -690,22 +694,20 @@ class ExportTask(CopyTask):
                 if token_range is None and result is None:  # a request has finished
                     succeeded += 1
                 elif isinstance(result, Exception):  # an error occurred
-                    if token_range is None:  # the entire process failed
-                        shell.printerr('Error from worker process: %s' % (result))
-                    else:   # only this token_range failed, retry up to max_attempts if no rows received yet,
-                            # If rows were already received we'd risk duplicating data.
-                            # Note that there is still a slight risk of duplicating data, even if we have
-                            # an error with no rows received yet, it's just less likely. To avoid retrying on
-                            # all timeouts would however mean we could risk not exporting some rows.
-                        if ranges[token_range]['attempts'] < max_attempts and ranges[token_range]['rows'] == 0:
-                            shell.printerr('Error for %s: %s (will try again later attempt %d of %d)'
-                                           % (token_range, result, ranges[token_range]['attempts'], max_attempts))
-                            self.send_work(ranges, [token_range])
-                        else:
-                            shell.printerr('Error for %s: %s (permanently given up after %d rows and %d attempts)'
-                                           % (token_range, result, ranges[token_range]['rows'],
-                                              ranges[token_range]['attempts']))
-                            failed += 1
+                    # This token_range failed, retry up to max_attempts if no rows received yet,
+                    # If rows were already received we'd risk duplicating data.
+                    # Note that there is still a slight risk of duplicating data, even if we have
+                    # an error with no rows received yet, it's just less likely. To avoid retrying on
+                    # all timeouts would however mean we could risk not exporting some rows.
+                    if ranges[token_range]['attempts'] < max_attempts and ranges[token_range]['rows'] == 0:
+                        shell.printerr('Error for %s: %s (will try again later attempt %d of %d)'
+                                       % (token_range, result, ranges[token_range]['attempts'], max_attempts))
+                        self.send_work(ranges, [token_range])
+                    else:
+                        shell.printerr('Error for %s: %s (permanently given up after %d rows and %d attempts)'
+                                       % (token_range, result, ranges[token_range]['rows'],
+                                          ranges[token_range]['attempts']))
+                        failed += 1
                 else:  # partial result received
                     data, num = result
                     self.writer.write(data, num)
@@ -1313,7 +1315,7 @@ class ExportSession(object):
         self.cluster = cluster
         self.session = session
         self.requests = 1
-        self.lock = Lock()
+        self.lock = threading.Lock()
         self.consistency_level = export_process.consistency_level
 
     def add_request(self):
@@ -1351,6 +1353,7 @@ class ExportProcess(ChildProcess):
         self.hosts_to_sessions = dict()
         self.formatters = dict()
         self.options = options
+        self.responses = None
 
     def run(self):
         try:
@@ -1368,6 +1371,8 @@ class ExportProcess(ChildProcess):
         we can signal a global error by sending (None, error).
         We terminate when the inbound queue is closed.
         """
+        self.init_feeder_thread()
+
         while True:
             if self.num_requests() > self.max_requests:
                 time.sleep(0.001)  # 1 millisecond
@@ -1376,6 +1381,37 @@ class ExportProcess(ChildProcess):
             token_range, info = self.inmsg.recv()
             self.start_request(token_range, info)
 
+    def init_feeder_thread(self):
+        """
+        Start a thread to feed response messages to the parent process.
+
+        It is not safe to write on the pipe from the main thread if the parent process is still sending work and
+        not receiving yet. This will in fact block the main thread on the send, which in turn won't be able to call
+        recv(), and will therefore block the parent process on its send().
+
+        It is also not safe to write on the pipe from the driver receiving thread whilst the parent process is
+        sending work, because if the receiving thread stops making progress, then the main thread may no longer
+        call recv() due to the check on the maximum number of requests in inner_run().
+
+        These deadlocks are easiest to reproduce with a single worker process, but may well affect multiple worker
+        processes too.
+
+        It is important that the order of the responses in the queue is respected, or else the parent process may
+        kill off worker processes before it has received all the pages of the last token range.
+        """
+        def feed_errors():
+            while True:
+                try:
+                    self.outmsg.send(self.responses.get())
+                except Exception, e:
+                    self.printdebugmsg(e.message)
+
+        self.responses = Queue()
+
+        thread = threading.Thread(target=feed_errors)
+        thread.setDaemon(True)
+        thread.start()
+
     @staticmethod
     def get_error_message(err, print_traceback=False):
         if isinstance(err, str):
@@ -1388,10 +1424,13 @@ class ExportProcess(ChildProcess):
             msg = str(err)
         return msg
 
-    def report_error(self, err, token_range=None):
+    def report_error(self, err, token_range):
         msg = self.get_error_message(err, print_traceback=self.debug)
         self.printdebugmsg(msg)
-        self.outmsg.send((token_range, Exception(msg)))
+        self.send((token_range, Exception(msg)))
+
+    def send(self, response):
+        self.responses.put(response)
 
     def start_request(self, token_range, info):
         """
@@ -1434,7 +1473,8 @@ class ExportProcess(ChildProcess):
                     self.printdebugmsg("Warning: failed to connect to some replicas: %s" % (errors,))
                 return ret
 
-        self.report_error("Failed to connect to all replicas %s for %s, errors: %s" % (hosts, token_range, errors))
+        self.report_error("Failed to connect to all replicas %s for %s, errors: %s" % (hosts, token_range, errors),
+                          token_range)
         return None
 
     def connect(self, host):
@@ -1467,7 +1507,7 @@ class ExportProcess(ChildProcess):
                 self.write_rows_to_csv(token_range, rows)
             else:
                 self.write_rows_to_csv(token_range, rows)
-                self.outmsg.send((None, None))
+                self.send((None, None))
                 session.complete_request()
 
         def err_callback(err):
@@ -1488,7 +1528,7 @@ class ExportProcess(ChildProcess):
                 writer.writerow(map(self.format_value, row))
 
             data = (output.getvalue(), len(rows))
-            self.outmsg.send((token_range, data))
+            self.send((token_range, data))
             output.close()
 
         except Exception, e:


[10/15] cassandra git commit: Merge branch 'cassandra-2.2' into cassandra-3.0

Posted by jm...@apache.org.
Merge branch 'cassandra-2.2' into cassandra-3.0


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/3efc609e
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/3efc609e
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/3efc609e

Branch: refs/heads/trunk
Commit: 3efc609e01f95cfdcaae3a6d153291c15607455a
Parents: 70eab63 6c1ef2b
Author: Josh McKenzie <jo...@datastax.com>
Authored: Mon Mar 28 13:55:11 2016 -0400
Committer: Josh McKenzie <jo...@datastax.com>
Committed: Mon Mar 28 13:55:11 2016 -0400

----------------------------------------------------------------------
 pylib/cqlshlib/copyutil.py | 98 +++++++++++++++++++++++++++++------------
 1 file changed, 69 insertions(+), 29 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/3efc609e/pylib/cqlshlib/copyutil.py
----------------------------------------------------------------------