You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by aw...@apache.org on 2020/03/11 03:45:54 UTC

[kudu] 02/03: subprocess: maintain a thread for fork/exec

This is an automated email from the ASF dual-hosted git repository.

awong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git

commit 39bd47462d7c3d57f7207e9eebd4a8102484f568
Author: Andrew Wong <aw...@cloudera.com>
AuthorDate: Tue Mar 10 15:23:58 2020 -0700

    subprocess: maintain a thread for fork/exec
    
    If SubprocessServer::Init() is called from a short-lived thread, because
    it does a fork/exec, the subprocess will silently be reaped when the
    thread exits.
    
    Specifically, we saw this happening upon initializing the
    CatalogManager, which happens in a short-lived thread, when starting up
    the Ranger client. This surfaced as us getting an EOF from the
    subprocess receiver thread.
    
    Change-Id: I803b1613ef1a988df1da4c908c2c37e1fbbdcf81
    Reviewed-on: http://gerrit.cloudera.org:8080/15398
    Tested-by: Kudu Jenkins
    Reviewed-by: Grant Henke <gr...@apache.org>
---
 src/kudu/subprocess/server.cc                 | 20 +++++++++++++++++++-
 src/kudu/subprocess/server.h                  |  7 +++++++
 src/kudu/subprocess/subprocess_server-test.cc | 19 +++++++++++++++++++
 3 files changed, 45 insertions(+), 1 deletion(-)

diff --git a/src/kudu/subprocess/server.cc b/src/kudu/subprocess/server.cc
index 3c7607b..6687e3b 100644
--- a/src/kudu/subprocess/server.cc
+++ b/src/kudu/subprocess/server.cc
@@ -100,9 +100,23 @@ SubprocessServer::~SubprocessServer() {
   Shutdown();
 }
 
+void SubprocessServer::StartSubprocessThread(const StdStatusCallback& cb) {
+  Status s = process_->Start();
+  cb(s);
+  if (PREDICT_TRUE(s.ok())) {
+    // If we successfully started the process, we should stay alive until we
+    // shut down.
+    closing_.Wait();
+  }
+}
+
 Status SubprocessServer::Init() {
   VLOG(2) << "Starting the subprocess";
-  RETURN_NOT_OK_PREPEND(process_->Start(), "Failed to start subprocess");
+  Synchronizer sync;
+  auto cb = sync.AsStdStatusCallback();
+  RETURN_NOT_OK(Thread::Create("subprocess", "start", &SubprocessServer::StartSubprocessThread,
+                               this, cb, &read_thread_));
+  RETURN_NOT_OK_PREPEND(sync.Wait(), "Failed to start subprocess");
 
   // Start the message protocol.
   CHECK(!message_protocol_);
@@ -172,6 +186,9 @@ void SubprocessServer::Shutdown() {
   if (deadline_checker_) {
     deadline_checker_->Join();
   }
+  if (start_thread_) {
+    start_thread_->Join();
+  }
   for (const auto& t : responder_threads_) {
     t->Join();
   }
@@ -197,6 +214,7 @@ void SubprocessServer::ReceiveMessagesThread() {
     if (s.IsEndOfFile()) {
       // The underlying pipe was closed. We're likely shutting down.
       DCHECK_EQ(0, closing_.count());
+      LOG(INFO) << "Received an EOF from the subprocess";
       return;
     }
     // TODO(awong): getting an error here indicates that this server and the
diff --git a/src/kudu/subprocess/server.h b/src/kudu/subprocess/server.h
index 512966b..3c33e5b 100644
--- a/src/kudu/subprocess/server.h
+++ b/src/kudu/subprocess/server.h
@@ -230,6 +230,8 @@ class SubprocessServer {
  private:
   FRIEND_TEST(SubprocessServerTest, TestCallsReturnWhenShuttingDown);
 
+  void StartSubprocessThread(const StdStatusCallback& cb);
+
   // Stop the subprocess and stop processing messages.
   void Shutdown();
 
@@ -274,6 +276,11 @@ class SubprocessServer {
   // Protocol with which to send and receive bytes to and from 'process_'.
   std::shared_ptr<SubprocessProtocol> message_protocol_;
 
+  // Thread that runs the subprocess. Since the subprocess is run via
+  // fork/exec, this thread must stay alive for the lifetime of the server.
+  // Otherwise, the OS may silently kill the spawned child process.
+  scoped_refptr<Thread> start_thread_;
+
   // Pulls requests off the request queue and serializes them via the
   // message protocol.
   scoped_refptr<Thread> write_thread_;
diff --git a/src/kudu/subprocess/subprocess_server-test.cc b/src/kudu/subprocess/subprocess_server-test.cc
index d14293f..857d61f 100644
--- a/src/kudu/subprocess/subprocess_server-test.cc
+++ b/src/kudu/subprocess/subprocess_server-test.cc
@@ -33,6 +33,7 @@
 #include "kudu/subprocess/subprocess.pb.h"
 #include "kudu/util/env.h"
 #include "kudu/util/metrics.h"
+#include "kudu/util/monotime.h"
 #include "kudu/util/path_util.h"
 #include "kudu/util/scoped_cleanup.h"
 #include "kudu/util/status.h"
@@ -260,6 +261,24 @@ TEST_F(SubprocessServerTest, TestCallsReturnWhenShuttingDown) {
   ASSERT_FALSE(s.ok());
 }
 
+// Some usage of a subprocess warrants calling Init() from a short-lived
+// thread. Let's ensure there's no funny business when that happens (e.g.
+// ensure the OS doesn't reap the underlying process when the parent thread
+// exits).
+TEST_F(SubprocessServerTest, TestInitFromThread) {
+  Status s;
+  thread t([&] {
+    s = ResetSubprocessServer();
+  });
+  t.join();
+  ASSERT_OK(s);
+  // Wait a bit to give time for the OS to wreak havoc (though it shouldn't).
+  SleepFor(MonoDelta::FromSeconds(3));
+  SubprocessRequestPB request = CreateEchoSubprocessRequestPB(kHello);
+  SubprocessResponsePB response;
+  ASSERT_OK(server_->Execute(&request, &response));
+}
+
 } // namespace subprocess
 } // namespace kudu