You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by ad...@apache.org on 2017/08/24 05:48:50 UTC

kudu git commit: subprocess: even more robust fix for asynchronous signals

Repository: kudu
Updated Branches:
  refs/heads/master 0dab9796d -> 58394f9f2


subprocess: even more robust fix for asynchronous signals

Another x1000 loop of raft_consensus-itest yielded the following failure in
TestElectPendingVoter:

  /data/1/adar/kudu/src/kudu/integration-tests/raft_consensus-itest.cc:2035: Failure
  Value of: s.IsTimedOut()
    Actual: false
  Expected: true
  Expected AddServer() to time out. Result: OK

The fix from commit 27435da doesn't work well for AddServer because it's not
an idempotent operation. Instead, let's take a more robust tack and add some
waiting behavior directly into Subprocess, using the /proc/<pid>/stat
process state as a guide.

Change-Id: I99d400e971d6f9b22cc7b4483db94a98ec306e10
Reviewed-on: http://gerrit.cloudera.org:8080/7561
Reviewed-by: Todd Lipcon <to...@apache.org>
Tested-by: Kudu Jenkins


Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/58394f9f
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/58394f9f
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/58394f9f

Branch: refs/heads/master
Commit: 58394f9f2e2fab8e44814a72326649f84b713ae2
Parents: 0dab979
Author: Adar Dembo <ad...@cloudera.com>
Authored: Wed Aug 23 20:10:33 2017 -0700
Committer: Adar Dembo <ad...@cloudera.com>
Committed: Thu Aug 24 05:48:32 2017 +0000

----------------------------------------------------------------------
 .../integration-tests/raft_consensus-itest.cc   | 17 ++----
 src/kudu/util/subprocess-test.cc                | 26 ++++++++
 src/kudu/util/subprocess.cc                     | 63 ++++++++++++++++++++
 src/kudu/util/subprocess.h                      | 18 ++++++
 4 files changed, 113 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kudu/blob/58394f9f/src/kudu/integration-tests/raft_consensus-itest.cc
----------------------------------------------------------------------
diff --git a/src/kudu/integration-tests/raft_consensus-itest.cc b/src/kudu/integration-tests/raft_consensus-itest.cc
index 43bb4ab..91fa7b4 100644
--- a/src/kudu/integration-tests/raft_consensus-itest.cc
+++ b/src/kudu/integration-tests/raft_consensus-itest.cc
@@ -1737,15 +1737,10 @@ void RaftConsensusITest::AssertMajorityRequiredForElectionsAndWrites(
     }
 
     // Ensure writes timeout while only a minority is alive.
-    //
-    // The SIGSTOP issued by Pause() is delivered asynchronously, so we may
-    // need to retry this a few times to see the timeout.
-    ASSERT_EVENTUALLY([&]{
-      Status s = WriteSimpleTestRow(initial_leader, tablet_id_, RowOperationsPB::UPDATE,
-                                    kTestRowKey, kTestRowIntVal, "foo",
-                                    MonoDelta::FromMilliseconds(100));
-      ASSERT_TRUE(s.IsTimedOut()) << s.ToString();
-    });
+    Status s = WriteSimpleTestRow(initial_leader, tablet_id_, RowOperationsPB::UPDATE,
+                                  kTestRowKey, kTestRowIntVal, "foo",
+                                  MonoDelta::FromMilliseconds(100));
+    ASSERT_TRUE(s.IsTimedOut()) << s.ToString();
 
     // Step down.
     ASSERT_OK(LeaderStepDown(initial_leader, tablet_id_, MonoDelta::FromSeconds(10)));
@@ -1753,7 +1748,7 @@ void RaftConsensusITest::AssertMajorityRequiredForElectionsAndWrites(
     // Assert that elections time out without a live majority.
     // We specify a very short timeout here to keep the tests fast.
     ASSERT_OK(StartElection(initial_leader, tablet_id_, MonoDelta::FromSeconds(10)));
-    Status s = WaitUntilLeader(initial_leader, tablet_id_, MonoDelta::FromMilliseconds(100));
+    s = WaitUntilLeader(initial_leader, tablet_id_, MonoDelta::FromMilliseconds(100));
     ASSERT_TRUE(s.IsTimedOut()) << s.ToString();
     LOG(INFO) << "Expected timeout encountered on election with weakened config: " << s.ToString();
 
@@ -1904,7 +1899,7 @@ TEST_F(RaftConsensusITest, TestReplaceChangeConfigOperation) {
   Status s = RemoveServer(leader_tserver, tablet_id_, tservers[1],
                           -1, MonoDelta::FromSeconds(1),
                           &error_code);
-  ASSERT_TRUE(s.IsTimedOut());
+  ASSERT_TRUE(s.IsTimedOut()) << s.ToString();
 
   // Pause the leader, and restart the other servers.
   ASSERT_OK(cluster_->tablet_server_by_uuid(tservers[0]->uuid())->Pause());

http://git-wip-us.apache.org/repos/asf/kudu/blob/58394f9f/src/kudu/util/subprocess-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/util/subprocess-test.cc b/src/kudu/util/subprocess-test.cc
index 6c2235b..78d71db 100644
--- a/src/kudu/util/subprocess-test.cc
+++ b/src/kudu/util/subprocess-test.cc
@@ -288,4 +288,30 @@ TEST_F(SubprocessTest, TestSubprocessDestroyWithCustomSignal) {
   ASSERT_FALSE(env_->FileExists(kTestFile));
 }
 
+#ifdef __linux__
+// This test requires a system with /proc/<pid>/stat.
+TEST_F(SubprocessTest, TestGetProcfsState) {
+  // The test process itself should be RUNNING.
+  Subprocess::ProcfsState state;
+  ASSERT_OK(Subprocess::GetProcfsState(getpid(), &state));
+  ASSERT_EQ(Subprocess::ProcfsState::RUNNING, state);
+
+  // When started, /bin/sleep will be RUNNING (even though it's asleep).
+  Subprocess sleep({"/bin/sleep", "1000"});
+  ASSERT_OK(sleep.Start());
+  ASSERT_OK(Subprocess::GetProcfsState(sleep.pid(), &state));
+  ASSERT_EQ(Subprocess::ProcfsState::RUNNING, state);
+
+  // After a SIGSTOP, it should be PAUSED.
+  ASSERT_OK(sleep.Kill(SIGSTOP));
+  ASSERT_OK(Subprocess::GetProcfsState(sleep.pid(), &state));
+  ASSERT_EQ(Subprocess::ProcfsState::PAUSED, state);
+
+  // After a SIGCONT, it should be RUNNING again.
+  ASSERT_OK(sleep.Kill(SIGCONT));
+  ASSERT_OK(Subprocess::GetProcfsState(sleep.pid(), &state));
+  ASSERT_EQ(Subprocess::ProcfsState::RUNNING, state);
+}
+#endif
+
 } // namespace kudu

http://git-wip-us.apache.org/repos/asf/kudu/blob/58394f9f/src/kudu/util/subprocess.cc
----------------------------------------------------------------------
diff --git a/src/kudu/util/subprocess.cc b/src/kudu/util/subprocess.cc
index 8b6cf60..3163161 100644
--- a/src/kudu/util/subprocess.cc
+++ b/src/kudu/util/subprocess.cc
@@ -47,6 +47,7 @@
 #include "kudu/gutil/strings/numbers.h"
 #include "kudu/gutil/strings/split.h"
 #include "kudu/gutil/strings/substitute.h"
+#include "kudu/util/env.h"
 #include "kudu/util/errno.h"
 #include "kudu/util/make_shared.h"
 #include "kudu/util/monotime.h"
@@ -499,6 +500,39 @@ Status Subprocess::WaitNoBlock(int* wait_status) {
   return DoWait(wait_status, NON_BLOCKING);
 }
 
+Status Subprocess::GetProcfsState(int pid, ProcfsState* state) {
+  faststring data;
+  string filename = Substitute("/proc/$0/stat", pid);
+  RETURN_NOT_OK(ReadFileToString(Env::Default(), filename, &data));
+
+  // The part of /proc/<pid>/stat that's relevant for us looks like this:
+  //
+  //   "16009 (subprocess-test) R ..."
+  //
+  // The first number is the PID, the string in the parens is the command, and
+  // the single letter afterwards is the process' state.
+  //
+  // To extract the state, we scan backwards looking for the last ')', then
+  // increment past it and the separating space. This is safer than scanning
+  // forward as it properly handles commands containing parens.
+  string data_str = data.ToString();
+  const char* end_parens = strrchr(data_str.c_str(), ')');
+  if (end_parens == nullptr) {
+    return Status::RuntimeError(Substitute("unexpected layout in $0", filename));
+  }
+  char proc_state = end_parens[2];
+
+  switch (proc_state) {
+    case 'T':
+      *state = ProcfsState::PAUSED;
+      break;
+    default:
+      *state = ProcfsState::RUNNING;
+      break;
+  }
+  return Status::OK();
+}
+
 Status Subprocess::Kill(int signal) {
   if (state_ != kRunning) {
     const string err_str = "Sub-process is not running";
@@ -510,6 +544,35 @@ Status Subprocess::Kill(int signal) {
                                 ErrnoToString(errno),
                                 errno);
   }
+
+  // Signal delivery is often asynchronous. For some signals, we try to wait
+  // for the process to actually change state, using /proc/<pid>/stat as a
+  // guide. This is best-effort.
+  ProcfsState desired_state;
+  switch (signal) {
+    case SIGSTOP:
+      desired_state = ProcfsState::PAUSED;
+      break;
+    case SIGCONT:
+      desired_state = ProcfsState::RUNNING;
+      break;
+    default:
+      return Status::OK();
+  }
+  Stopwatch sw;
+  sw.start();
+  do {
+    ProcfsState current_state;
+    if (!GetProcfsState(child_pid_, &current_state).ok()) {
+      // There was some error parsing /proc/<pid>/stat (or perhaps it doesn't
+      // exist on this platform).
+      return Status::OK();
+    }
+    if (current_state == desired_state) {
+      return Status::OK();
+    }
+    SleepFor(MonoDelta::FromMilliseconds(10));
+  } while (sw.elapsed().wall_seconds() < kProcessWaitTimeoutSeconds);
   return Status::OK();
 }
 

http://git-wip-us.apache.org/repos/asf/kudu/blob/58394f9f/src/kudu/util/subprocess.h
----------------------------------------------------------------------
diff --git a/src/kudu/util/subprocess.h b/src/kudu/util/subprocess.h
index 3d3a73c..d69d5e2 100644
--- a/src/kudu/util/subprocess.h
+++ b/src/kudu/util/subprocess.h
@@ -24,6 +24,8 @@
 #include <string>
 #include <vector>
 
+#include <gtest/gtest_prod.h>
+
 #include "kudu/gutil/macros.h"
 #include "kudu/gutil/port.h"
 #include "kudu/util/status.h"
@@ -160,6 +162,8 @@ class Subprocess {
   const std::string& argv0() const { return argv_[0]; }
 
  private:
+  FRIEND_TEST(SubprocessTest, TestGetProcfsState);
+
   enum State {
     kNotStarted,
     kRunning,
@@ -168,6 +172,20 @@ class Subprocess {
   enum StreamMode {SHARED, DISABLED, PIPED};
   enum WaitMode {BLOCKING, NON_BLOCKING};
 
+  // Process state according to /proc/<pid>/stat.
+  enum class ProcfsState {
+    // "T  Stopped (on a signal) or (before Linux 2.6.33) trace stopped"
+    PAUSED,
+
+    // Every other process state.
+    RUNNING,
+  };
+
+  // Extracts the process state from /proc/<pid>/stat.
+  //
+  // Returns an error if /proc/<pid>/stat doesn't exist or if parsing failed.
+  static Status GetProcfsState(int pid, ProcfsState* state);
+
   Status DoWait(int* wait_status, WaitMode mode) WARN_UNUSED_RESULT;
   void SetFdShared(int stdfd, bool share);
   int CheckAndOffer(int stdfd) const;