You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by st...@apache.org on 2016/06/10 21:00:28 UTC

[02/10] cassandra git commit: cqlsh COPY FROM: shutdown parent cluster after forking, to avoid corrupting SSL connections

cqlsh COPY FROM: shutdown parent cluster after forking, to avoid corrupting SSL connections

patch by Stefania Alborghetti; reviewed by Tyler Hobbs for CASSANDRA-11749


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/68319f7c
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/68319f7c
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/68319f7c

Branch: refs/heads/cassandra-2.2
Commit: 68319f7c3be232a58e68ca91206283076aa3dedb
Parents: 06bb6b9
Author: Stefania Alborghetti <st...@datastax.com>
Authored: Fri May 27 11:00:27 2016 +0800
Committer: Stefania Alborghetti <st...@datastax.com>
Committed: Fri Jun 10 15:49:51 2016 -0500

----------------------------------------------------------------------
 CHANGES.txt                |  1 +
 pylib/cqlshlib/copyutil.py | 38 +++++++++++++++++++++++++++++++++++---
 2 files changed, 36 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/68319f7c/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 619dc61..af641e1 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 2.1.15
+ * cqlsh COPY FROM: shutdown parent cluster after forking, to avoid corrupting SSL connections (CASSANDRA-11749)
  * Updated cqlsh Python driver to fix DESCRIBE problem for legacy tables (CASSANDRA-11055)
  * cqlsh: apply current keyspace to source command (CASSANDRA-11152)
  * Backport CASSANDRA-11578 (CASSANDRA-11750)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/68319f7c/pylib/cqlshlib/copyutil.py
----------------------------------------------------------------------
diff --git a/pylib/cqlshlib/copyutil.py b/pylib/cqlshlib/copyutil.py
index d68812c..0016dfd 100644
--- a/pylib/cqlshlib/copyutil.py
+++ b/pylib/cqlshlib/copyutil.py
@@ -59,6 +59,7 @@ PROFILE_ON = False
 STRACE_ON = False
 DEBUG = False  # This may be set to True when initializing the task
 IS_LINUX = platform.system() == 'Linux'
+IS_WINDOWS = platform.system() == 'Windows'
 
 CopyOptions = namedtuple('CopyOptions', 'copy dialect unrecognized')
 
@@ -421,9 +422,13 @@ class CopyTask(object):
     def make_params(self):
         """
         Return a dictionary of parameters to be used by the worker processes.
-        On Windows this dictionary must be pickle-able.
+        On Windows this dictionary must be pickle-able; therefore we do not pass the
+        parent connection, since it may not be pickle-able. Also, on Windows child
+        processes are spawned rather than forked, so we don't need to shut down
+        the parent connection anyway; see CASSANDRA-11749 for more details.
         """
         shell = self.shell
+
         return dict(ks=self.ks,
                     table=self.table,
                     local_dc=self.local_dc,
@@ -434,6 +439,7 @@ class CopyTask(object):
                     port=shell.port,
                     ssl=shell.ssl,
                     auth_provider=shell.auth_provider,
+                    parent_cluster=shell.conn if not IS_WINDOWS else None,
                     cql_version=shell.conn.cql_version,
                     config_file=self.config_file,
                     protocol_version=self.protocol_version,
@@ -1072,7 +1078,8 @@ class ImportTask(CopyTask):
                 self.processes.append(ImportProcess(self.update_params(params, i)))
 
             feeder = FeedingProcess(self.outmsg.channels[-1], self.inmsg.channels[-1],
-                                    self.outmsg.channels[:-1], self.fname, self.options)
+                                    self.outmsg.channels[:-1], self.fname, self.options,
+                                    self.shell.conn if not IS_WINDOWS else None)
             self.processes.append(feeder)
 
             self.start_processes()
@@ -1179,7 +1186,7 @@ class FeedingProcess(mp.Process):
     """
     A process that reads from import sources and sends chunks to worker processes.
     """
-    def __init__(self, inmsg, outmsg, worker_channels, fname, options):
+    def __init__(self, inmsg, outmsg, worker_channels, fname, options, parent_cluster):
         mp.Process.__init__(self, target=self.run)
         self.inmsg = inmsg
         self.outmsg = outmsg
@@ -1189,6 +1196,15 @@ class FeedingProcess(mp.Process):
         self.ingest_rate = options.copy['ingestrate']
         self.num_worker_processes = options.copy['numprocesses']
         self.chunk_id = 0
+        self.parent_cluster = parent_cluster
+
+    def on_fork(self):
+        """
+        Release any parent connections after forking, see CASSANDRA-11749 for details.
+        """
+        if self.parent_cluster:
+            printdebugmsg("Closing parent cluster sockets")
+            self.parent_cluster.shutdown()
 
     def run(self):
         pr = profile_on() if PROFILE_ON else None
@@ -1205,6 +1221,9 @@ class FeedingProcess(mp.Process):
         here we throttle using the ingest rate in the feeding process because of memory usage concerns.
         When finished we send back to the parent process the total number of rows sent.
         """
+
+        self.on_fork()
+
         reader = self.reader
         reader.start()
         channels = self.worker_channels
@@ -1268,6 +1287,7 @@ class ChildProcess(mp.Process):
         self.connect_timeout = params['connect_timeout']
         self.cql_version = params['cql_version']
         self.auth_provider = params['auth_provider']
+        self.parent_cluster = params['parent_cluster']
         self.ssl = params['ssl']
         self.protocol_version = params['protocol_version']
         self.config_file = params['config_file']
@@ -1285,6 +1305,14 @@ class ChildProcess(mp.Process):
         else:
             self.test_failures = None
 
+    def on_fork(self):
+        """
+        Release any parent connections after forking, see CASSANDRA-11749 for details.
+        """
+        if self.parent_cluster:
+            printdebugmsg("Closing parent cluster sockets")
+            self.parent_cluster.shutdown()
+
     def close(self):
         printdebugmsg("Closing queues...")
         self.inmsg.close()
@@ -1411,6 +1439,9 @@ class ExportProcess(ChildProcess):
         we can signal a global error by sending (None, error).
         We terminate when the inbound queue is closed.
         """
+
+        self.on_fork()
+
         while True:
             if self.num_requests() > self.max_requests:
                 time.sleep(0.001)  # 1 millisecond
@@ -2059,6 +2090,7 @@ class ImportProcess(ChildProcess):
         try:
             pr = profile_on() if PROFILE_ON else None
 
+            self.on_fork()
             self.inner_run(*self.make_params())
 
             if pr: