You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by to...@apache.org on 2018/01/17 07:30:59 UTC

kudu git commit: KUDU-2208 Add RETRY_ON_EINTR() to Subprocess

Repository: kudu
Updated Branches:
  refs/heads/master c40e0587b -> e453f7e5a


KUDU-2208 Add RETRY_ON_EINTR() to Subprocess

This patch adds a unit test that creates a thread running a Subprocess.
While that thread is starting and waiting on the child process, another
thread repeatedly sends it SIGUSR2 via pthread_kill(). Because the
signals are sent asynchronously, the target thread may already have
exited, in which case pthread_kill() returns error 3 (ESRCH).
In that case, the unit test logs the incident at INFO level.

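Prior to this fix, waiting on a Subprocess could fail with a
RuntimeError status when waitpid() in Subprocess::DoWait() was
interrupted by a signal (EINTR), because the call was not retried.
To fix the bug, we wrap that waitpid() call (and the fork() call in
Subprocess::Start()) with RETRY_ON_EINTR().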

Furthermore, this patch consolidates the RETRY_ON_EINTR() macro, which
was duplicated in several source files, into os-util.h, and adds an
alarm reset at the end of TestReadFromStdoutAndStderr in
subprocess-test.

Change-Id: I148b4619a8dda8e3e95dd6ea5c6e993a9e37a333
Reviewed-on: http://gerrit.cloudera.org:8080/9015
Tested-by: Kudu Jenkins
Reviewed-by: Todd Lipcon <to...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/e453f7e5
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/e453f7e5
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/e453f7e5

Branch: refs/heads/master
Commit: e453f7e5a20fc9bd9a9c906cca341fac4b16393c
Parents: c40e058
Author: Jeffrey F. Lukman <je...@gmail.com>
Authored: Thu Jan 11 18:52:42 2018 -0800
Committer: Todd Lipcon <to...@apache.org>
Committed: Wed Jan 17 07:30:36 2018 +0000

----------------------------------------------------------------------
 src/kudu/consensus/log_index.cc  |  5 +--
 src/kudu/util/env_posix.cc       |  8 +----
 src/kudu/util/net/socket.cc      | 10 +-----
 src/kudu/util/os-util.h          | 10 ++++++
 src/kudu/util/subprocess-test.cc | 59 +++++++++++++++++++++++++++++++++++
 src/kudu/util/subprocess.cc      |  7 +++--
 6 files changed, 77 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kudu/blob/e453f7e5/src/kudu/consensus/log_index.cc
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/log_index.cc b/src/kudu/consensus/log_index.cc
index 7a77e90..319e9eb 100644
--- a/src/kudu/consensus/log_index.cc
+++ b/src/kudu/consensus/log_index.cc
@@ -50,15 +50,12 @@
 #include "kudu/gutil/stringprintf.h"
 #include "kudu/gutil/strings/substitute.h"
 #include "kudu/util/errno.h"
+#include "kudu/util/os-util.h"
 
 using std::string;
 using std::vector;
 using strings::Substitute;
 
-#define RETRY_ON_EINTR(ret, expr) do {          \
-    (ret) = (expr);                             \
-  } while (((ret) == -1) && (errno == EINTR));
-
 namespace kudu {
 namespace log {
 

http://git-wip-us.apache.org/repos/asf/kudu/blob/e453f7e5/src/kudu/util/env_posix.cc
----------------------------------------------------------------------
diff --git a/src/kudu/util/env_posix.cc b/src/kudu/util/env_posix.cc
index e34e663..28559e7 100644
--- a/src/kudu/util/env_posix.cc
+++ b/src/kudu/util/env_posix.cc
@@ -56,6 +56,7 @@
 #include "kudu/util/logging.h"
 #include "kudu/util/malloc.h"
 #include "kudu/util/monotime.h"
+#include "kudu/util/os-util.h"
 #include "kudu/util/path_util.h"
 #include "kudu/util/scoped_cleanup.h"
 #include "kudu/util/slice.h"
@@ -122,13 +123,6 @@ typedef struct xfs_flock64 {
 #define fread_unlocked fread
 #endif
 
-// Retry on EINTR for functions like read() that return -1 on error.
-#define RETRY_ON_EINTR(err, expr) do { \
-  static_assert(std::is_signed<decltype(err)>::value == true, \
-                #err " must be a signed integer"); \
-  (err) = (expr); \
-} while ((err) == -1 && errno == EINTR)
-
 // Same as the above, but for stream API calls like fread() and fwrite().
 #define STREAM_RETRY_ON_EINTR(nread, stream, expr) do { \
   static_assert(std::is_unsigned<decltype(nread)>::value == true, \

http://git-wip-us.apache.org/repos/asf/kudu/blob/e453f7e5/src/kudu/util/net/socket.cc
----------------------------------------------------------------------
diff --git a/src/kudu/util/net/socket.cc b/src/kudu/util/net/socket.cc
index d2448f6..1389565 100644
--- a/src/kudu/util/net/socket.cc
+++ b/src/kudu/util/net/socket.cc
@@ -30,7 +30,6 @@
 #include <limits>
 #include <ostream>
 #include <string>
-#include <type_traits>
 
 #include <gflags/gflags.h>
 #include <glog/logging.h>
@@ -45,6 +44,7 @@
 #include "kudu/util/monotime.h"
 #include "kudu/util/net/net_util.h"
 #include "kudu/util/net/sockaddr.h"
+#include "kudu/util/os-util.h"
 #include "kudu/util/random.h"
 #include "kudu/util/random_util.h"
 #include "kudu/util/slice.h"
@@ -61,14 +61,6 @@ DEFINE_bool(socket_inject_short_recvs, false,
 TAG_FLAG(socket_inject_short_recvs, hidden);
 TAG_FLAG(socket_inject_short_recvs, unsafe);
 
-// TODO(todd) consolidate with other copies of this!
-// Retry on EINTR for functions like read() that return -1 on error.
-#define RETRY_ON_EINTR(err, expr) do { \
-  static_assert(std::is_signed<decltype(err)>::value == true, \
-                #err " must be a signed integer"); \
-  (err) = (expr); \
-} while ((err) == -1 && errno == EINTR)
-
 namespace kudu {
 
 Socket::Socket()

http://git-wip-us.apache.org/repos/asf/kudu/blob/e453f7e5/src/kudu/util/os-util.h
----------------------------------------------------------------------
diff --git a/src/kudu/util/os-util.h b/src/kudu/util/os-util.h
index bd1a444..04ba8f8 100644
--- a/src/kudu/util/os-util.h
+++ b/src/kudu/util/os-util.h
@@ -23,11 +23,21 @@
 #ifndef KUDU_UTIL_OS_UTIL_H
 #define KUDU_UTIL_OS_UTIL_H
 
+#include <errno.h>
+
 #include <cstdint>
 #include <string>
+#include <type_traits>
 
 #include "kudu/util/status.h"
 
+// For calls like read(): sets 'err' = 'expr', retrying while it is -1 and errno == EINTR.
+#define RETRY_ON_EINTR(err, expr) do { \
+  static_assert(std::is_signed<decltype(err)>::value, \
+                #err " must be a signed integer"); \
+  (err) = (expr); \
+} while ((err) == -1 && errno == EINTR)
+
 namespace kudu {
 
 // Utility methods to read interesting values from /proc.

http://git-wip-us.apache.org/repos/asf/kudu/blob/e453f7e5/src/kudu/util/subprocess-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/util/subprocess-test.cc b/src/kudu/util/subprocess-test.cc
index 78d71db..c7ab804 100644
--- a/src/kudu/util/subprocess-test.cc
+++ b/src/kudu/util/subprocess-test.cc
@@ -17,13 +17,19 @@
 
 #include "kudu/util/subprocess.h"
 
+#include <errno.h>
+#include <pthread.h>
+#include <string.h>
 #include <unistd.h>
 
+#include <atomic>
 #include <csignal>
 #include <cstdio>
 #include <cstdlib>
 #include <memory>
+#include <ostream>
 #include <string>
+#include <thread>
 #include <vector>
 
 #include <glog/logging.h>
@@ -31,11 +37,16 @@
 
 #include "kudu/gutil/strings/substitute.h"
 #include "kudu/util/env.h"
+#include "kudu/util/monotime.h"
 #include "kudu/util/path_util.h"
+#include "kudu/util/scoped_cleanup.h"
+#include "kudu/util/status.h"
 #include "kudu/util/test_macros.h"
 #include "kudu/util/test_util.h"
 
+using std::atomic;
 using std::string;
+using std::thread;
 using std::vector;
 using strings::Substitute;
 
@@ -128,6 +139,9 @@ TEST_F(SubprocessTest, TestReadFromStdoutAndStderr) {
     "dd if=/dev/urandom of=/dev/stderr bs=512 count=2048 &"
     "wait"
   }, "", &stdout, &stderr));
+
+  // Reset the alarm when the test is done
+  SCOPED_CLEANUP({ alarm(0); })
 }
 
 // Test that environment variables can be passed to the subprocess.
@@ -288,6 +302,51 @@ TEST_F(SubprocessTest, TestSubprocessDestroyWithCustomSignal) {
   ASSERT_FALSE(env_->FileExists(kTestFile));
 }
 
+// KUDU-2208: no-op handler whose only job is to interrupt blocking syscalls.
+static void handler(int /* signal */) {
+}
+
+TEST_F(SubprocessTest, TestSubprocessInterruptionHandling) {
+  // Start and wait on a short-lived subprocess from a separate thread.
+  pthread_t t;
+  Subprocess p({ "/bin/sleep", "1" });
+  atomic<bool> t_started(false);
+  atomic<bool> t_finished(false);
+  thread subprocess_thread([&]() {
+    t = pthread_self();
+    t_started = true;  // Publishes 't' to the signalling loop below.
+    SleepFor(MonoDelta::FromMilliseconds(50));
+    CHECK_OK(p.Start());
+    CHECK_OK(p.Wait());
+    t_finished = true;
+  });
+
+  // Install a no-op SIGUSR2 handler. sa_flags is 0 (no SA_RESTART).
+  struct sigaction sa, sa_old;
+  memset(&sa, 0, sizeof(sa));
+  sa.sa_handler = &handler;
+  PCHECK(sigaction(SIGUSR2, &sa, &sa_old) == 0);
+  // Scoped cleanups run in reverse: join the thread, then restore the handler.
+  SCOPED_CLEANUP({ sigaction(SIGUSR2, &sa_old, nullptr); });
+  SCOPED_CLEANUP({ subprocess_thread.join(); });
+
+  // Keep signalling the subprocess thread with SIGUSR2 until it finishes.
+  LOG(INFO) << "Start sending kill signals to Subprocess thread";
+  while (!t_finished) {
+    if (t_started) {
+      int err = pthread_kill(t, SIGUSR2);
+      ASSERT_TRUE(err == 0 || err == ESRCH) << "pthread_kill() returned " << err;
+      if (err == ESRCH) {
+        LOG(INFO) << "Async kill signal failed with err=" << err <<
+            " because it tried to kill vanished subprocess_thread";
+        ASSERT_TRUE(t_finished);
+      }
+      // Pause 0-99us between signals so they arrive at varying points.
+      SleepFor(MonoDelta::FromMicroseconds(rand() % 100));
+    }
+  }
+}
+
 #ifdef __linux__
 // This test requires a system with /proc/<pid>/stat.
 TEST_F(SubprocessTest, TestGetProcfsState) {

http://git-wip-us.apache.org/repos/asf/kudu/blob/e453f7e5/src/kudu/util/subprocess.cc
----------------------------------------------------------------------
diff --git a/src/kudu/util/subprocess.cc b/src/kudu/util/subprocess.cc
index a1e01ba..9e550ce 100644
--- a/src/kudu/util/subprocess.cc
+++ b/src/kudu/util/subprocess.cc
@@ -51,6 +51,7 @@
 #include "kudu/util/errno.h"
 #include "kudu/util/faststring.h"
 #include "kudu/util/monotime.h"
+#include "kudu/util/os-util.h"
 #include "kudu/util/path_util.h"
 #include "kudu/util/signal.h"
 #include "kudu/util/status.h"
@@ -358,7 +359,8 @@ Status Subprocess::Start() {
   RETURN_NOT_OK_PREPEND(OpenProcFdDir(&fd_dir), "Unable to open fd dir");
   unique_ptr<DIR, std::function<void(DIR*)>> fd_dir_closer(fd_dir,
                                                            CloseProcFdDir);
-  int ret = fork();
+  int ret;
+  RETRY_ON_EINTR(ret, fork());
   if (ret == -1) {
     return Status::RuntimeError("Unable to fork", ErrnoToString(errno), errno);
   }
@@ -732,7 +734,8 @@ Status Subprocess::DoWait(int* wait_status, WaitMode mode) {
 
   const int options = (mode == NON_BLOCKING) ? WNOHANG : 0;
   int status;
-  const int rc = waitpid(child_pid_, &status, options);
+  int rc;
+  RETRY_ON_EINTR(rc, waitpid(child_pid_, &status, options));
   if (rc == -1) {
     return Status::RuntimeError("Unable to wait on child",
                                 ErrnoToString(errno), errno);