You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by aw...@apache.org on 2020/03/11 03:45:54 UTC
[kudu] 02/03: subprocess: maintain a thread for fork/exec
This is an automated email from the ASF dual-hosted git repository.
awong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git
commit 39bd47462d7c3d57f7207e9eebd4a8102484f568
Author: Andrew Wong <aw...@cloudera.com>
AuthorDate: Tue Mar 10 15:23:58 2020 -0700
subprocess: maintain a thread for fork/exec
SubprocessServer::Init() does a fork/exec, so if it is called from a
short-lived thread, the subprocess will silently be reaped when that
thread exits.
Specifically, we saw this happening upon initializing the
CatalogManager, which happens in a short-lived thread, when starting up
the Ranger client. This surfaced as us getting an EOF from the
subprocess receiver thread.
Change-Id: I803b1613ef1a988df1da4c908c2c37e1fbbdcf81
Reviewed-on: http://gerrit.cloudera.org:8080/15398
Tested-by: Kudu Jenkins
Reviewed-by: Grant Henke <gr...@apache.org>
---
src/kudu/subprocess/server.cc | 20 +++++++++++++++++++-
src/kudu/subprocess/server.h | 7 +++++++
src/kudu/subprocess/subprocess_server-test.cc | 19 +++++++++++++++++++
3 files changed, 45 insertions(+), 1 deletion(-)
diff --git a/src/kudu/subprocess/server.cc b/src/kudu/subprocess/server.cc
index 3c7607b..6687e3b 100644
--- a/src/kudu/subprocess/server.cc
+++ b/src/kudu/subprocess/server.cc
@@ -100,9 +100,23 @@ SubprocessServer::~SubprocessServer() {
Shutdown();
}
+void SubprocessServer::StartSubprocessThread(const StdStatusCallback& cb) {  // Thread body: starts 'process_' and reports the outcome through 'cb'.
+ Status s = process_->Start();
+ cb(s);  // Report the start status (success or failure) before possibly blocking below.
+ if (PREDICT_TRUE(s.ok())) {
+ // If we successfully started the process, this thread must stay alive until
+ // we shut down; otherwise the forked child may be silently reaped on exit.
+ closing_.Wait();  // Presumably released once shutdown begins -- TODO confirm where 'closing_' is signaled.
+ }
+}
+
Status SubprocessServer::Init() {
VLOG(2) << "Starting the subprocess";
- RETURN_NOT_OK_PREPEND(process_->Start(), "Failed to start subprocess");
+ Synchronizer sync;  // Lets Init() block until the start thread reports the Start() status.
+ auto cb = sync.AsStdStatusCallback();
+ RETURN_NOT_OK(Thread::Create("subprocess", "start", &SubprocessServer::StartSubprocessThread,
+ this, cb, &start_thread_));  // Must be start_thread_: that is the handle Shutdown() joins.
+ RETURN_NOT_OK_PREPEND(sync.Wait(), "Failed to start subprocess");
// Start the message protocol.
CHECK(!message_protocol_);
@@ -172,6 +186,9 @@ void SubprocessServer::Shutdown() {
if (deadline_checker_) {
deadline_checker_->Join();
}
+ if (start_thread_) {
+ start_thread_->Join();
+ }
for (const auto& t : responder_threads_) {
t->Join();
}
@@ -197,6 +214,7 @@ void SubprocessServer::ReceiveMessagesThread() {
if (s.IsEndOfFile()) {
// The underlying pipe was closed. We're likely shutting down.
DCHECK_EQ(0, closing_.count());
+ LOG(INFO) << "Received an EOF from the subprocess";
return;
}
// TODO(awong): getting an error here indicates that this server and the
diff --git a/src/kudu/subprocess/server.h b/src/kudu/subprocess/server.h
index 512966b..3c33e5b 100644
--- a/src/kudu/subprocess/server.h
+++ b/src/kudu/subprocess/server.h
@@ -230,6 +230,8 @@ class SubprocessServer {
private:
FRIEND_TEST(SubprocessServerTest, TestCallsReturnWhenShuttingDown);
+ void StartSubprocessThread(const StdStatusCallback& cb);
+
// Stop the subprocess and stop processing messages.
void Shutdown();
@@ -274,6 +276,11 @@ class SubprocessServer {
// Protocol with which to send and receive bytes to and from 'process_'.
std::shared_ptr<SubprocessProtocol> message_protocol_;
+ // Thread that runs the subprocess. Since the subprocess is run via
+ // fork/exec, this thread must stay alive for the lifetime of the server.
+ // Otherwise, the OS may silently kill the spawned child process.
+ scoped_refptr<Thread> start_thread_;
+
// Pulls requests off the request queue and serializes them via the
// message protocol.
scoped_refptr<Thread> write_thread_;
diff --git a/src/kudu/subprocess/subprocess_server-test.cc b/src/kudu/subprocess/subprocess_server-test.cc
index d14293f..857d61f 100644
--- a/src/kudu/subprocess/subprocess_server-test.cc
+++ b/src/kudu/subprocess/subprocess_server-test.cc
@@ -33,6 +33,7 @@
#include "kudu/subprocess/subprocess.pb.h"
#include "kudu/util/env.h"
#include "kudu/util/metrics.h"
+#include "kudu/util/monotime.h"
#include "kudu/util/path_util.h"
#include "kudu/util/scoped_cleanup.h"
#include "kudu/util/status.h"
@@ -260,6 +261,24 @@ TEST_F(SubprocessServerTest, TestCallsReturnWhenShuttingDown) {
ASSERT_FALSE(s.ok());
}
+// Some usages of a subprocess server call Init() from a short-lived
+// thread. Ensure there's no funny business when that happens (e.g. ensure
+// the OS doesn't reap the underlying process when the parent thread
+// exits).
+TEST_F(SubprocessServerTest, TestInitFromThread) {
+ Status s;
+ thread t([&] {
+ s = ResetSubprocessServer();  // Presumably (re)creates 'server_' and calls Init(); helper is defined outside this hunk.
+ });
+ t.join();  // The initializing thread has exited once join() returns.
+ ASSERT_OK(s);
+ // Wait a bit to give time for the OS to wreak havoc (though it shouldn't).
+ SleepFor(MonoDelta::FromSeconds(3));
+ SubprocessRequestPB request = CreateEchoSubprocessRequestPB(kHello);
+ SubprocessResponsePB response;
+ ASSERT_OK(server_->Execute(&request, &response));  // A request must still succeed after the initializing thread is gone.
+}
+
} // namespace subprocess
} // namespace kudu