You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by br...@apache.org on 2020/08/20 20:04:16 UTC

[cassandra] branch trunk updated: stop passing unpicklable Cluster object to spawned child processes in cqlsh

This is an automated email from the ASF dual-hosted git repository.

brandonwilliams pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 0e53379  stop passing unpicklable Cluster object to spawned child processes in cqlsh
0e53379 is described below

commit 0e53379ec945e07c38aed03048ba8f76d42bdd42
Author: Adam Holmberg <ad...@datastax.com>
AuthorDate: Mon Aug 17 15:01:23 2020 -0500

    stop passing unpicklable Cluster object to spawned child processes in
    cqlsh
    
    Patch by Adam Holmberg, reviewed by Ekaterina Dimitrova and
    brandonwilliams for CASSANDRA-16053
---
 CHANGES.txt                |  1 +
 pylib/cqlshlib/copyutil.py | 23 ++++-------------------
 2 files changed, 5 insertions(+), 19 deletions(-)

diff --git a/CHANGES.txt b/CHANGES.txt
index 3272226..f4efcbd 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 4.0-beta2
+ * fix cqlsh COPY functions in Python 3.8 on Mac (CASSANDRA-16053)
  * Strip comment blocks from cqlsh input before processing statements (CASSANDRA-15802)
  * Fix unicode chars error input (CASSANDRA-15990)
  * Improved testability for CacheMetrics and ChunkCacheMetrics (CASSANDRA-15788)
diff --git a/pylib/cqlshlib/copyutil.py b/pylib/cqlshlib/copyutil.py
index 169a6e0..ce4d35d 100644
--- a/pylib/cqlshlib/copyutil.py
+++ b/pylib/cqlshlib/copyutil.py
@@ -67,7 +67,6 @@ PROFILE_ON = False
 STRACE_ON = False
 DEBUG = False  # This may be set to True when initializing the task
 IS_LINUX = platform.system() == 'Linux'
-IS_WINDOWS = platform.system() == 'Windows'
 
 CopyOptions = namedtuple('CopyOptions', 'copy dialect unrecognized')
 
@@ -480,10 +479,8 @@ class CopyTask(object):
     def make_params(self):
         """
         Return a dictionary of parameters to be used by the worker processes.
-        On Windows this dictionary must be pickle-able, therefore we do not pass the
-        parent connection since it may not be pickle-able. Also, on Windows child
-        processes are spawned and not forked, and therefore we don't need to shutdown
-        the parent connection anyway, see CASSANDRA-11749 for more details.
+        On platforms using 'spawn' as the default multiprocessing start method,
+        this dictionary must be picklable.
         """
         shell = self.shell
 
@@ -497,7 +494,6 @@ class CopyTask(object):
                     port=shell.port,
                     ssl=shell.ssl,
                     auth_provider=shell.auth_provider,
-                    parent_cluster=shell.conn if not IS_WINDOWS else None,
                     cql_version=shell.conn.cql_version,
                     config_file=self.config_file,
                     protocol_version=self.protocol_version,
@@ -1171,8 +1167,7 @@ class ImportTask(CopyTask):
                 self.processes.append(ImportProcess(self.update_params(params, i)))
 
             feeder = FeedingProcess(self.outmsg.pipes[-1], self.inmsg.pipes[-1],
-                                    self.outmsg.pipes[:-1], self.fname, self.options,
-                                    self.shell.conn if not IS_WINDOWS else None)
+                                    self.outmsg.pipes[:-1], self.fname, self.options)
             self.processes.append(feeder)
 
             self.start_processes()
@@ -1291,7 +1286,7 @@ class FeedingProcess(mp.Process):
     """
     A process that reads from import sources and sends chunks to worker processes.
     """
-    def __init__(self, inpipe, outpipe, worker_pipes, fname, options, parent_cluster):
+    def __init__(self, inpipe, outpipe, worker_pipes, fname, options):
         super(FeedingProcess, self).__init__(target=self.run)
         self.inpipe = inpipe
         self.outpipe = outpipe
@@ -1305,7 +1300,6 @@ class FeedingProcess(mp.Process):
         self.num_worker_processes = options.copy['numprocesses']
         self.max_pending_chunks = options.copy['maxpendingchunks']
         self.chunk_id = 0
-        self.parent_cluster = parent_cluster
 
     def on_fork(self):
         """
@@ -1316,10 +1310,6 @@ class FeedingProcess(mp.Process):
         self.outmsg = SendingChannel(self.outpipe)
         self.worker_channels = [SendingChannel(p) for p in self.worker_pipes]
 
-        if self.parent_cluster:
-            printdebugmsg("Closing parent cluster sockets")
-            self.parent_cluster.shutdown()
-
     def run(self):
         pr = profile_on() if PROFILE_ON else None
 
@@ -1419,7 +1409,6 @@ class ChildProcess(mp.Process):
         self.connect_timeout = params['connect_timeout']
         self.cql_version = params['cql_version']
         self.auth_provider = params['auth_provider']
-        self.parent_cluster = params['parent_cluster']
         self.ssl = params['ssl']
         self.protocol_version = params['protocol_version']
         self.config_file = params['config_file']
@@ -1451,10 +1440,6 @@ class ChildProcess(mp.Process):
         self.inmsg = ReceivingChannel(self.inpipe)
         self.outmsg = SendingChannel(self.outpipe)
 
-        if self.parent_cluster:
-            printdebugmsg("Closing parent cluster sockets")
-            self.parent_cluster.shutdown()
-
     def close(self):
         printdebugmsg("Closing queues...")
         self.inmsg.close()


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org