You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by br...@apache.org on 2020/08/20 20:04:16 UTC
[cassandra] branch trunk updated: stop passing unpicklable Cluster object to spawned child processes in cqlsh
This is an automated email from the ASF dual-hosted git repository.
brandonwilliams pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/trunk by this push:
new 0e53379 stop passing unpicklable Cluster object to spawned child processes in cqlsh
0e53379 is described below
commit 0e53379ec945e07c38aed03048ba8f76d42bdd42
Author: Adam Holmberg <ad...@datastax.com>
AuthorDate: Mon Aug 17 15:01:23 2020 -0500
stop passing unpicklable Cluster object to spawned child processes in cqlsh
Patch by Adam Holmberg, reviewed by Ekaterina Dimitrova and
brandonwilliams for CASSANDRA-16053
---
CHANGES.txt | 1 +
pylib/cqlshlib/copyutil.py | 23 ++++-------------------
2 files changed, 5 insertions(+), 19 deletions(-)
diff --git a/CHANGES.txt b/CHANGES.txt
index 3272226..f4efcbd 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
4.0-beta2
+ * fix cqlsh COPY functions in Python 3.8 on Mac (CASSANDRA-16053)
* Strip comment blocks from cqlsh input before processing statements (CASSANDRA-15802)
* Fix unicode chars error input (CASSANDRA-15990)
* Improved testability for CacheMetrics and ChunkCacheMetrics (CASSANDRA-15788)
diff --git a/pylib/cqlshlib/copyutil.py b/pylib/cqlshlib/copyutil.py
index 169a6e0..ce4d35d 100644
--- a/pylib/cqlshlib/copyutil.py
+++ b/pylib/cqlshlib/copyutil.py
@@ -67,7 +67,6 @@ PROFILE_ON = False
STRACE_ON = False
DEBUG = False # This may be set to True when initializing the task
IS_LINUX = platform.system() == 'Linux'
-IS_WINDOWS = platform.system() == 'Windows'
CopyOptions = namedtuple('CopyOptions', 'copy dialect unrecognized')
@@ -480,10 +479,8 @@ class CopyTask(object):
def make_params(self):
"""
Return a dictionary of parameters to be used by the worker processes.
- On Windows this dictionary must be pickle-able, therefore we do not pass the
- parent connection since it may not be pickle-able. Also, on Windows child
- processes are spawned and not forked, and therefore we don't need to shutdown
- the parent connection anyway, see CASSANDRA-11749 for more details.
+ On platforms using 'spawn' as the default multiprocessing start method,
+ this dictionary must be picklable.
"""
shell = self.shell
@@ -497,7 +494,6 @@ class CopyTask(object):
port=shell.port,
ssl=shell.ssl,
auth_provider=shell.auth_provider,
- parent_cluster=shell.conn if not IS_WINDOWS else None,
cql_version=shell.conn.cql_version,
config_file=self.config_file,
protocol_version=self.protocol_version,
@@ -1171,8 +1167,7 @@ class ImportTask(CopyTask):
self.processes.append(ImportProcess(self.update_params(params, i)))
feeder = FeedingProcess(self.outmsg.pipes[-1], self.inmsg.pipes[-1],
- self.outmsg.pipes[:-1], self.fname, self.options,
- self.shell.conn if not IS_WINDOWS else None)
+ self.outmsg.pipes[:-1], self.fname, self.options)
self.processes.append(feeder)
self.start_processes()
@@ -1291,7 +1286,7 @@ class FeedingProcess(mp.Process):
"""
A process that reads from import sources and sends chunks to worker processes.
"""
- def __init__(self, inpipe, outpipe, worker_pipes, fname, options, parent_cluster):
+ def __init__(self, inpipe, outpipe, worker_pipes, fname, options):
super(FeedingProcess, self).__init__(target=self.run)
self.inpipe = inpipe
self.outpipe = outpipe
@@ -1305,7 +1300,6 @@ class FeedingProcess(mp.Process):
self.num_worker_processes = options.copy['numprocesses']
self.max_pending_chunks = options.copy['maxpendingchunks']
self.chunk_id = 0
- self.parent_cluster = parent_cluster
def on_fork(self):
"""
@@ -1316,10 +1310,6 @@ class FeedingProcess(mp.Process):
self.outmsg = SendingChannel(self.outpipe)
self.worker_channels = [SendingChannel(p) for p in self.worker_pipes]
- if self.parent_cluster:
- printdebugmsg("Closing parent cluster sockets")
- self.parent_cluster.shutdown()
-
def run(self):
pr = profile_on() if PROFILE_ON else None
@@ -1419,7 +1409,6 @@ class ChildProcess(mp.Process):
self.connect_timeout = params['connect_timeout']
self.cql_version = params['cql_version']
self.auth_provider = params['auth_provider']
- self.parent_cluster = params['parent_cluster']
self.ssl = params['ssl']
self.protocol_version = params['protocol_version']
self.config_file = params['config_file']
@@ -1451,10 +1440,6 @@ class ChildProcess(mp.Process):
self.inmsg = ReceivingChannel(self.inpipe)
self.outmsg = SendingChannel(self.outpipe)
- if self.parent_cluster:
- printdebugmsg("Closing parent cluster sockets")
- self.parent_cluster.shutdown()
-
def close(self):
printdebugmsg("Closing queues...")
self.inmsg.close()
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org