You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jm...@apache.org on 2016/03/28 20:02:50 UTC
[04/15] cassandra git commit: COPY FROM on large datasets: fixed
problem on single core machines
COPY FROM on large datasets: fixed problem on single core machines
patch by Stefania Alborghetti; reviewed by Adam Holmberg for CASSANDRA-11053
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/a9b54220
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/a9b54220
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/a9b54220
Branch: refs/heads/cassandra-3.0
Commit: a9b5422057054b0ba612164d56d7cce5567e48df
Parents: 42644c3
Author: Stefania Alborghetti <st...@datastax.com>
Authored: Fri Mar 18 13:33:21 2016 +0800
Committer: Josh McKenzie <jo...@datastax.com>
Committed: Mon Mar 28 13:54:37 2016 -0400
----------------------------------------------------------------------
pylib/cqlshlib/copyutil.py | 98 +++++++++++++++++++++++++++++------------
1 file changed, 69 insertions(+), 29 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a9b54220/pylib/cqlshlib/copyutil.py
----------------------------------------------------------------------
diff --git a/pylib/cqlshlib/copyutil.py b/pylib/cqlshlib/copyutil.py
index cd03765..ba2a47b 100644
--- a/pylib/cqlshlib/copyutil.py
+++ b/pylib/cqlshlib/copyutil.py
@@ -29,16 +29,17 @@ import re
import struct
import sys
import time
+import threading
import traceback
from bisect import bisect_right
from calendar import timegm
from collections import defaultdict, namedtuple
from decimal import Decimal
+from Queue import Queue
from random import randrange
from StringIO import StringIO
from select import select
-from threading import Lock
from uuid import UUID
from util import profile_on, profile_off
@@ -161,11 +162,11 @@ class CopyTask(object):
self.options = self.parse_options(opts, direction)
self.num_processes = self.options.copy['numprocesses']
- if direction == 'in':
- self.num_processes += 1 # add the feeder process
-
self.printmsg('Using %d child processes' % (self.num_processes,))
+ if direction == 'from':
+ self.num_processes += 1 # add the feeder process
+
self.processes = []
self.inmsg = OneWayChannels(self.num_processes)
self.outmsg = OneWayChannels(self.num_processes)
@@ -295,17 +296,20 @@ class CopyTask(object):
def get_num_processes(cap):
"""
Pick a reasonable number of child processes. We need to leave at
- least one core for the parent process.
+ least one core for the parent or feeder process.
"""
return max(1, min(cap, CopyTask.get_num_cores() - 1))
@staticmethod
def get_num_cores():
"""
- Return the number of cores if available.
+ Return the number of cores if available. If the test environment variable
+ is set, then return the number carried by this variable. This makes it easier to test
+ single-core machines.
"""
try:
- return mp.cpu_count()
+ num_cores_for_testing = os.environ.get('CQLSH_COPY_TEST_NUM_CORES', '')
+ return int(num_cores_for_testing) if num_cores_for_testing else mp.cpu_count()
except NotImplementedError:
return 1
@@ -690,22 +694,20 @@ class ExportTask(CopyTask):
if token_range is None and result is None: # a request has finished
succeeded += 1
elif isinstance(result, Exception): # an error occurred
- if token_range is None: # the entire process failed
- shell.printerr('Error from worker process: %s' % (result))
- else: # only this token_range failed, retry up to max_attempts if no rows received yet,
- # If rows were already received we'd risk duplicating data.
- # Note that there is still a slight risk of duplicating data, even if we have
- # an error with no rows received yet, it's just less likely. To avoid retrying on
- # all timeouts would however mean we could risk not exporting some rows.
- if ranges[token_range]['attempts'] < max_attempts and ranges[token_range]['rows'] == 0:
- shell.printerr('Error for %s: %s (will try again later attempt %d of %d)'
- % (token_range, result, ranges[token_range]['attempts'], max_attempts))
- self.send_work(ranges, [token_range])
- else:
- shell.printerr('Error for %s: %s (permanently given up after %d rows and %d attempts)'
- % (token_range, result, ranges[token_range]['rows'],
- ranges[token_range]['attempts']))
- failed += 1
+ # This token_range failed: retry up to max_attempts if no rows have been received yet.
+ # If rows were already received, retrying would risk duplicating data.
+ # Note that there is still a slight risk of duplicating data even when the error
+ # occurs before any rows are received; it is just less likely. Never retrying on
+ # timeouts, however, would mean risking that some rows are not exported.
+ if ranges[token_range]['attempts'] < max_attempts and ranges[token_range]['rows'] == 0:
+ shell.printerr('Error for %s: %s (will try again later attempt %d of %d)'
+ % (token_range, result, ranges[token_range]['attempts'], max_attempts))
+ self.send_work(ranges, [token_range])
+ else:
+ shell.printerr('Error for %s: %s (permanently given up after %d rows and %d attempts)'
+ % (token_range, result, ranges[token_range]['rows'],
+ ranges[token_range]['attempts']))
+ failed += 1
else: # partial result received
data, num = result
self.writer.write(data, num)
@@ -1313,7 +1315,7 @@ class ExportSession(object):
self.cluster = cluster
self.session = session
self.requests = 1
- self.lock = Lock()
+ self.lock = threading.Lock()
self.consistency_level = export_process.consistency_level
def add_request(self):
@@ -1351,6 +1353,7 @@ class ExportProcess(ChildProcess):
self.hosts_to_sessions = dict()
self.formatters = dict()
self.options = options
+ self.responses = None
def run(self):
try:
@@ -1368,6 +1371,8 @@ class ExportProcess(ChildProcess):
we can signal a global error by sending (None, error).
We terminate when the inbound queue is closed.
"""
+ self.init_feeder_thread()
+
while True:
if self.num_requests() > self.max_requests:
time.sleep(0.001) # 1 millisecond
@@ -1376,6 +1381,37 @@ class ExportProcess(ChildProcess):
token_range, info = self.inmsg.recv()
self.start_request(token_range, info)
+ def init_feeder_thread(self):
+ """
+ Start a thread to feed response messages to the parent process.
+
+ It is not safe to write on the pipe from the main thread if the parent process is still sending work and
+ not receiving yet. This will in fact block the main thread on the send, which in turn won't be able to call
+ recv(), and will therefore block the parent process on its send().
+
+ It is also not safe to write on the pipe from the driver receiving thread whilst the parent process is
+ sending work, because if the receiving thread stops making progress, then the main thread may no longer
+ call recv() due to the check on the maximum number of requests in inner_run().
+
+ These deadlocks are easiest to reproduce with a single worker process, but may well affect multiple worker
+ processes too.
+
+ It is important that the order of the responses in the queue is respected, or else the parent process may
+ kill off worker processes before it has received all the pages of the last token range.
+ """
+ def feed_errors():
+ while True:
+ try:
+ self.outmsg.send(self.responses.get())
+ except Exception, e:
+ self.printdebugmsg(e.message)
+
+ self.responses = Queue()
+
+ thread = threading.Thread(target=feed_errors)
+ thread.setDaemon(True)
+ thread.start()
+
@staticmethod
def get_error_message(err, print_traceback=False):
if isinstance(err, str):
@@ -1388,10 +1424,13 @@ class ExportProcess(ChildProcess):
msg = str(err)
return msg
- def report_error(self, err, token_range=None):
+ def report_error(self, err, token_range):
msg = self.get_error_message(err, print_traceback=self.debug)
self.printdebugmsg(msg)
- self.outmsg.send((token_range, Exception(msg)))
+ self.send((token_range, Exception(msg)))
+
+ def send(self, response):
+ self.responses.put(response)
def start_request(self, token_range, info):
"""
@@ -1434,7 +1473,8 @@ class ExportProcess(ChildProcess):
self.printdebugmsg("Warning: failed to connect to some replicas: %s" % (errors,))
return ret
- self.report_error("Failed to connect to all replicas %s for %s, errors: %s" % (hosts, token_range, errors))
+ self.report_error("Failed to connect to all replicas %s for %s, errors: %s" % (hosts, token_range, errors),
+ token_range)
return None
def connect(self, host):
@@ -1467,7 +1507,7 @@ class ExportProcess(ChildProcess):
self.write_rows_to_csv(token_range, rows)
else:
self.write_rows_to_csv(token_range, rows)
- self.outmsg.send((None, None))
+ self.send((None, None))
session.complete_request()
def err_callback(err):
@@ -1488,7 +1528,7 @@ class ExportProcess(ChildProcess):
writer.writerow(map(self.format_value, row))
data = (output.getvalue(), len(rows))
- self.outmsg.send((token_range, data))
+ self.send((token_range, data))
output.close()
except Exception, e: