Posted to commits@cassandra.apache.org by jm...@apache.org on 2016/03/28 20:02:49 UTC

[03/15] cassandra git commit: COPY FROM on large datasets: fixed problem on single core machines

COPY FROM on large datasets: fixed problem on single core machines

patch by Stefania Alborghetti; reviewed by Adam Holmberg for CASSANDRA-11053


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/a9b54220
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/a9b54220
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/a9b54220

Branch: refs/heads/trunk
Commit: a9b5422057054b0ba612164d56d7cce5567e48df
Parents: 42644c3
Author: Stefania Alborghetti <st...@datastax.com>
Authored: Fri Mar 18 13:33:21 2016 +0800
Committer: Josh McKenzie <jo...@datastax.com>
Committed: Mon Mar 28 13:54:37 2016 -0400

----------------------------------------------------------------------
 pylib/cqlshlib/copyutil.py | 98 +++++++++++++++++++++++++++++------------
 1 file changed, 69 insertions(+), 29 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/a9b54220/pylib/cqlshlib/copyutil.py
----------------------------------------------------------------------
diff --git a/pylib/cqlshlib/copyutil.py b/pylib/cqlshlib/copyutil.py
index cd03765..ba2a47b 100644
--- a/pylib/cqlshlib/copyutil.py
+++ b/pylib/cqlshlib/copyutil.py
@@ -29,16 +29,17 @@ import re
 import struct
 import sys
 import time
+import threading
 import traceback
 
 from bisect import bisect_right
 from calendar import timegm
 from collections import defaultdict, namedtuple
 from decimal import Decimal
+from Queue import Queue
 from random import randrange
 from StringIO import StringIO
 from select import select
-from threading import Lock
 from uuid import UUID
 from util import profile_on, profile_off
 
@@ -161,11 +162,11 @@ class CopyTask(object):
         self.options = self.parse_options(opts, direction)
 
         self.num_processes = self.options.copy['numprocesses']
-        if direction == 'in':
-            self.num_processes += 1  # add the feeder process
-
         self.printmsg('Using %d child processes' % (self.num_processes,))
 
+        if direction == 'from':
+            self.num_processes += 1  # add the feeder process
+
         self.processes = []
         self.inmsg = OneWayChannels(self.num_processes)
         self.outmsg = OneWayChannels(self.num_processes)
@@ -295,17 +296,20 @@ class CopyTask(object):
     def get_num_processes(cap):
         """
         Pick a reasonable number of child processes. We need to leave at
-        least one core for the parent process.
+        least one core for the parent or feeder process.
         """
         return max(1, min(cap, CopyTask.get_num_cores() - 1))
 
     @staticmethod
     def get_num_cores():
         """
-        Return the number of cores if available.
+        Return the number of cores if available. If the test environment variable
+        is set, return the number it carries. This makes it easier to test
+        single-core machines.
         """
         try:
-            return mp.cpu_count()
+            num_cores_for_testing = os.environ.get('CQLSH_COPY_TEST_NUM_CORES', '')
+            return int(num_cores_for_testing) if num_cores_for_testing else mp.cpu_count()
         except NotImplementedError:
             return 1
 
@@ -690,22 +694,20 @@ class ExportTask(CopyTask):
                 if token_range is None and result is None:  # a request has finished
                     succeeded += 1
                 elif isinstance(result, Exception):  # an error occurred
-                    if token_range is None:  # the entire process failed
-                        shell.printerr('Error from worker process: %s' % (result))
-                    else:   # only this token_range failed, retry up to max_attempts if no rows received yet,
-                            # If rows were already received we'd risk duplicating data.
-                            # Note that there is still a slight risk of duplicating data, even if we have
-                            # an error with no rows received yet, it's just less likely. To avoid retrying on
-                            # all timeouts would however mean we could risk not exporting some rows.
-                        if ranges[token_range]['attempts'] < max_attempts and ranges[token_range]['rows'] == 0:
-                            shell.printerr('Error for %s: %s (will try again later attempt %d of %d)'
-                                           % (token_range, result, ranges[token_range]['attempts'], max_attempts))
-                            self.send_work(ranges, [token_range])
-                        else:
-                            shell.printerr('Error for %s: %s (permanently given up after %d rows and %d attempts)'
-                                           % (token_range, result, ranges[token_range]['rows'],
-                                              ranges[token_range]['attempts']))
-                            failed += 1
+                    # This token_range failed: retry up to max_attempts if no rows were received yet.
+                    # If rows were already received we'd risk duplicating data.
+                    # Note that there is still a slight risk of duplicating data even if we have
+                    # an error with no rows received yet; it's just less likely. Avoiding retries on
+                    # all timeouts would, however, mean risking not exporting some rows.
+                    if ranges[token_range]['attempts'] < max_attempts and ranges[token_range]['rows'] == 0:
+                        shell.printerr('Error for %s: %s (will try again later, attempt %d of %d)'
+                                       % (token_range, result, ranges[token_range]['attempts'], max_attempts))
+                        self.send_work(ranges, [token_range])
+                    else:
+                        shell.printerr('Error for %s: %s (permanently given up after %d rows and %d attempts)'
+                                       % (token_range, result, ranges[token_range]['rows'],
+                                          ranges[token_range]['attempts']))
+                        failed += 1
                 else:  # partial result received
                     data, num = result
                     self.writer.write(data, num)
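
The retry rule in this hunk is compact enough to restate in isolation. A
hedged sketch of the bookkeeping (the 'attempts'/'rows' dict layout follows
the code above; handle_range_error, max_attempts and the resend callable are
illustrative stand-ins for the surrounding method and self.send_work):

    def handle_range_error(ranges, token_range, result, max_attempts, resend):
        # Retry only while no rows have been received for this range; once
        # rows have arrived, a retry could duplicate exported data.
        state = ranges[token_range]
        if state['attempts'] < max_attempts and state['rows'] == 0:
            state['attempts'] += 1  # illustrative; the patch tracks this elsewhere
            resend([token_range])
            return 'retrying'
        return 'failed permanently'
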
@@ -1313,7 +1315,7 @@ class ExportSession(object):
         self.cluster = cluster
         self.session = session
         self.requests = 1
-        self.lock = Lock()
+        self.lock = threading.Lock()
         self.consistency_level = export_process.consistency_level
 
     def add_request(self):
@@ -1351,6 +1353,7 @@ class ExportProcess(ChildProcess):
         self.hosts_to_sessions = dict()
         self.formatters = dict()
         self.options = options
+        self.responses = None
 
     def run(self):
         try:
@@ -1368,6 +1371,8 @@ class ExportProcess(ChildProcess):
         we can signal a global error by sending (None, error).
         We terminate when the inbound queue is closed.
         """
+        self.init_feeder_thread()
+
         while True:
             if self.num_requests() > self.max_requests:
                 time.sleep(0.001)  # 1 millisecond
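
The (token_range, result) tuples mentioned in the docstring above form a
small three-case protocol, which the ExportTask hunk earlier dispatches on.
A minimal restatement (the function name and return values are illustrative):

    def classify(token_range, result):
        if token_range is None and result is None:
            return 'request finished'             # completion marker
        if isinstance(result, Exception):
            return 'error, maybe retry'           # per-range failure
        csv_text, num_rows = result               # a page of exported rows
        return 'partial result: %d rows' % num_rows
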
@@ -1376,6 +1381,37 @@ class ExportProcess(ChildProcess):
             token_range, info = self.inmsg.recv()
             self.start_request(token_range, info)
 
+    def init_feeder_thread(self):
+        """
+        Start a thread to feed response messages to the parent process.
+
+        It is not safe to write to the pipe from the main thread if the parent process is still sending work and
+        not yet receiving: the send would block the main thread, which in turn could not call
+        recv(), and that would block the parent process on its send().
+
+        It is also not safe to write to the pipe from the driver's receiving thread whilst the parent process is
+        sending work, because if the receiving thread stops making progress, the main thread may no longer
+        call recv() due to the check on the maximum number of requests in inner_run().
+
+        These deadlocks are easiest to reproduce with a single worker process, but may well affect multiple worker
+        processes too.
+
+        It is important that the order of the responses in the queue is respected, or else the parent process may
+        kill off worker processes before it has received all the pages of the last token range.
+        """
+        def feed_errors():
+            while True:
+                try:
+                    self.outmsg.send(self.responses.get())
+                except Exception, e:
+                    self.printdebugmsg(e.message)
+
+        self.responses = Queue()
+
+        thread = threading.Thread(target=feed_errors)
+        thread.setDaemon(True)
+        thread.start()
+
     @staticmethod
     def get_error_message(err, print_traceback=False):
         if isinstance(err, str):
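
The pattern motivated by the docstring above can be shown outside cqlsh in a
few lines: a single daemon thread owns all writes to the outbound pipe, so
callers never block and FIFO ordering of responses is preserved. A hedged
sketch (FeederChannel and pipe_conn are illustrative names, not part of the
patch):

    import threading
    try:
        from Queue import Queue   # Python 2, as in copyutil.py
    except ImportError:
        from queue import Queue   # Python 3

    class FeederChannel(object):
        def __init__(self, pipe_conn):
            self._conn = pipe_conn
            self._queue = Queue()  # unbounded FIFO keeps response order
            feeder = threading.Thread(target=self._drain)
            feeder.setDaemon(True)  # don't keep the process alive on exit
            feeder.start()

        def send(self, response):
            # put() on an unbounded Queue never blocks, so neither the main
            # thread nor the driver's receiving thread can deadlock here.
            self._queue.put(response)

        def _drain(self):
            while True:
                self._conn.send(self._queue.get())  # may block; that's fine
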
@@ -1388,10 +1424,13 @@ class ExportProcess(ChildProcess):
             msg = str(err)
         return msg
 
-    def report_error(self, err, token_range=None):
+    def report_error(self, err, token_range):
         msg = self.get_error_message(err, print_traceback=self.debug)
         self.printdebugmsg(msg)
-        self.outmsg.send((token_range, Exception(msg)))
+        self.send((token_range, Exception(msg)))
+
+    def send(self, response):
+        self.responses.put(response)
 
     def start_request(self, token_range, info):
         """
@@ -1434,7 +1473,8 @@ class ExportProcess(ChildProcess):
                     self.printdebugmsg("Warning: failed to connect to some replicas: %s" % (errors,))
                 return ret
 
-        self.report_error("Failed to connect to all replicas %s for %s, errors: %s" % (hosts, token_range, errors))
+        self.report_error("Failed to connect to all replicas %s for %s, errors: %s" % (hosts, token_range, errors),
+                          token_range)
         return None
 
     def connect(self, host):
@@ -1467,7 +1507,7 @@ class ExportProcess(ChildProcess):
                 self.write_rows_to_csv(token_range, rows)
             else:
                 self.write_rows_to_csv(token_range, rows)
-                self.outmsg.send((None, None))
+                self.send((None, None))
                 session.complete_request()
 
         def err_callback(err):
@@ -1488,7 +1528,7 @@ class ExportProcess(ChildProcess):
                 writer.writerow(map(self.format_value, row))
 
             data = (output.getvalue(), len(rows))
-            self.outmsg.send((token_range, data))
+            self.send((token_range, data))
             output.close()
 
         except Exception, e:
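
Taken together, every worker-side write that previously went straight to
self.outmsg.send() now funnels through self.send() and the response queue. A
condensed sketch of the resulting page flow (export_page and its arguments
are illustrative; the tuple shapes match the patch):

    def export_page(channel, token_range, csv_text, num_rows, last_page):
        # Each page travels as (token_range, (csv_text, num_rows)); a
        # final (None, None) tells the parent this request completed.
        channel.send((token_range, (csv_text, num_rows)))
        if last_page:
            channel.send((None, None))
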