You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by ta...@apache.org on 2018/07/13 06:03:17 UTC

[05/51] [abbrv] impala git commit: IMPALA-7006: Add KRPC folders from kudu@334ecafd

http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/subprocess.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/subprocess.cc b/be/src/kudu/util/subprocess.cc
new file mode 100644
index 0000000..d68cb7f
--- /dev/null
+++ b/be/src/kudu/util/subprocess.cc
@@ -0,0 +1,815 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "kudu/util/subprocess.h"
+
+#include <dirent.h>
+#include <fcntl.h>
+#include <signal.h>
+#if defined(__linux__)
+#include <sys/prctl.h>
+#endif
+#include <sys/wait.h>
+#include <unistd.h>
+
+#include <cerrno>
+#include <cstdint>
+#include <cstdlib>
+#include <cstring>
+#include <functional>
+#include <memory>
+#include <ostream>
+#include <string>
+#include <utility>
+#include <vector>
+
+#include <ev++.h>
+#include <glog/logging.h>
+#include <glog/stl_logging.h>
+
+#include "kudu/gutil/basictypes.h"
+#include "kudu/gutil/port.h"
+#include "kudu/gutil/strings/join.h"
+#include "kudu/gutil/strings/numbers.h"
+#include "kudu/gutil/strings/split.h"
+#include "kudu/gutil/strings/substitute.h"
+#include "kudu/util/env.h"
+#include "kudu/util/errno.h"
+#include "kudu/util/faststring.h"
+#include "kudu/util/monotime.h"
+#include "kudu/util/path_util.h"
+#include "kudu/util/signal.h"
+#include "kudu/util/status.h"
+#include "kudu/util/stopwatch.h"
+
+using std::map;
+using std::string;
+using std::unique_ptr;
+using std::vector;
+using strings::Split;
+using strings::Substitute;
+using strings::SubstituteAndAppend;
+
+namespace kudu {
+
+// Make glog's STL-compatible operators visible inside this namespace.
+using ::operator<<;
+
+namespace {
+
+static double kProcessWaitTimeoutSeconds = 5.0;
+
+static const char* kProcSelfFd =
+#if defined(__APPLE__)
+  "/dev/fd";
+#else
+  "/proc/self/fd";
+#endif // defined(__APPLE__)
+
+#if defined(__linux__)
+#define READDIR readdir64
+#define DIRENT dirent64
+#else
+#define READDIR readdir
+#define DIRENT dirent
+#endif
+
+// Since opendir() calls malloc(), this must be called before fork().
+// This function is not async-signal-safe.
+Status OpenProcFdDir(DIR** dir) {
+  *dir = opendir(kProcSelfFd);
+  if (PREDICT_FALSE(dir == nullptr)) {
+    return Status::IOError(Substitute("opendir(\"$0\") failed", kProcSelfFd),
+                           ErrnoToString(errno), errno);
+  }
+  return Status::OK();
+}
+
+// Close the directory stream opened by OpenProcFdDir().
+// This function is not async-signal-safe.
+void CloseProcFdDir(DIR* dir) {
+  if (PREDICT_FALSE(closedir(dir) == -1)) {
+    LOG(WARNING) << "Unable to close fd dir: "
+                 << Status::IOError(Substitute("closedir(\"$0\") failed", kProcSelfFd),
+                                    ErrnoToString(errno), errno).ToString();
+  }
+}
+
+// Close all open file descriptors other than stdin, stderr, stdout.
+// Expects a directory stream created by OpenProdFdDir() as a parameter.
+// This function is called after fork() and must not call malloc().
+// The rule of thumb is to only call async-signal-safe functions in such cases
+// if at all possible.
+void CloseNonStandardFDs(DIR* fd_dir) {
+  // This is implemented by iterating over the open file descriptors
+  // rather than using sysconf(SC_OPEN_MAX) -- the latter is error prone
+  // since it may not represent the highest open fd if the fd soft limit
+  // has changed since the process started. This should also be faster
+  // since iterating over all possible fds is likely to cause 64k+ syscalls
+  // in typical configurations.
+  //
+  // Note also that this doesn't use any of the Env utility functions, to
+  // make it as lean and mean as possible -- this runs in the subprocess
+  // after a fork, so there's some possibility that various global locks
+  // inside malloc() might be held, so allocating memory is a no-no.
+  PCHECK(fd_dir != nullptr);
+  int dir_fd = dirfd(fd_dir);
+
+  struct DIRENT* ent;
+  // readdir64() is not reentrant (it uses a static buffer) and it also
+  // locks fd_dir->lock, so it must not be called in a multi-threaded
+  // environment and is certainly not async-signal-safe.
+  // However, it appears to be safe to call right after fork(), since only one
+  // thread exists in the child process at that time. It also does not call
+  // malloc() or free(). We could use readdir64_r() instead, but all that
+  // buys us is reentrancy, and not async-signal-safety, due to the use of
+  // dir->lock, so seems not worth the added complexity in lifecycle & plumbing.
+  while ((ent = READDIR(fd_dir)) != nullptr) {
+    uint32_t fd;
+    if (!safe_strtou32(ent->d_name, &fd)) continue;
+    if (!(fd == STDIN_FILENO  ||
+          fd == STDOUT_FILENO ||
+          fd == STDERR_FILENO ||
+          fd == dir_fd))  {
+      int ret;
+      RETRY_ON_EINTR(ret, close(fd));
+    }
+  }
+}
+
+void RedirectToDevNull(int fd) {
+  // We must not close stderr or stdout, because then when a new file
+  // descriptor is opened, it might reuse the closed file descriptor's number
+  // (we always allocate the lowest available file descriptor number).
+  //
+  // Instead, we open /dev/null as a new file descriptor, then use dup2() to
+  // atomically close 'fd' and reuse its file descriptor number as an open file
+  // handle to /dev/null.
+  //
+  // It is expected that the file descriptor allocated when opening /dev/null
+  // will be closed when the child process closes all of its "non-standard"
+  // file descriptors later on.
+  int dev_null;
+  RETRY_ON_EINTR(dev_null, open("/dev/null", O_WRONLY));
+  if (dev_null < 0) {
+    PLOG(WARNING) << "failed to open /dev/null";
+  } else {
+    int ret;
+    RETRY_ON_EINTR(ret, dup2(dev_null, fd));
+    PCHECK(ret);
+  }
+}
+
+// Stateful libev watcher to help ReadFdsFully().
+class ReadFdsFullyHelper {
+ public:
+  ReadFdsFullyHelper(string progname, ev::dynamic_loop* loop, int fd)
+      : progname_(std::move(progname)) {
+    // Bind the watcher to the provided loop, to this functor, and to the
+    // readable fd.
+    watcher_.set(*loop);
+    watcher_.set(this);
+    watcher_.set(fd, ev::READ);
+
+    // The watcher will now be polled when its loop is run.
+    watcher_.start();
+  }
+
+  void operator() (ev::io &w, int revents) {
+    DCHECK_EQ(ev::READ, revents);
+
+    char buf[1024];
+    ssize_t n;
+    RETRY_ON_EINTR(n, read(w.fd, buf, arraysize(buf)));
+    if (n == 0) {
+      // EOF, stop watching.
+      w.stop();
+    } else if (n < 0) {
+      // A fatal error. Store it and stop watching.
+      status_ = Status::IOError("IO error reading from " + progname_,
+                                ErrnoToString(errno), errno);
+      w.stop();
+    } else {
+      // Add our bytes and keep watching.
+      output_.append(buf, n);
+    }
+  }
+
+  const Status& status() const { return status_; }
+  const string& output() const { return output_; }
+
+ private:
+  const string progname_;
+
+  ev::io watcher_;
+  string output_;
+  Status status_;
+};
+
+// Reads from all descriptors in 'fds' until EOF on all of them. If any read
+// yields an error, it is returned. Otherwise, 'out' contains the bytes read
+// for each fd, in the same order as was in 'fds'.
+Status ReadFdsFully(const string& progname,
+                    const vector<int>& fds,
+                    vector<string>* out) {
+  ev::dynamic_loop loop;
+
+  // Set up a watcher for each fd.
+  vector<unique_ptr<ReadFdsFullyHelper>> helpers;
+  for (int fd : fds) {
+    helpers.emplace_back(new ReadFdsFullyHelper(progname, &loop, fd));
+  }
+
+  // This will read until all fds return EOF.
+  loop.run();
+
+  // Check for failures.
+  for (const auto& h : helpers) {
+    if (!h->status().ok()) {
+      return h->status();
+    }
+  }
+
+  // No failures; write the output to the caller.
+  for (const auto& h : helpers) {
+    out->push_back(h->output());
+  }
+  return Status::OK();
+}
+
+} // anonymous namespace
+
+Subprocess::Subprocess(vector<string> argv, int sig_on_destruct)
+    : program_(argv[0]),
+      argv_(std::move(argv)),
+      state_(kNotStarted),
+      child_pid_(-1),
+      fd_state_(),
+      child_fds_(),
+      sig_on_destruct_(sig_on_destruct) {
+  // By convention, the first argument in argv is the base name of the program.
+  argv_[0] = BaseName(argv_[0]);
+
+  fd_state_[STDIN_FILENO]   = PIPED;
+  fd_state_[STDOUT_FILENO]  = SHARED;
+  fd_state_[STDERR_FILENO]  = SHARED;
+  child_fds_[STDIN_FILENO]  = -1;
+  child_fds_[STDOUT_FILENO] = -1;
+  child_fds_[STDERR_FILENO] = -1;
+}
+
+Subprocess::~Subprocess() {
+  if (state_ == kRunning) {
+    LOG(WARNING) << Substitute(
+        "Child process $0 ($1) was orphaned. Sending signal $2...",
+        child_pid_, JoinStrings(argv_, " "), sig_on_destruct_);
+    WARN_NOT_OK(KillAndWait(sig_on_destruct_),
+                Substitute("Failed to KillAndWait() with signal $0",
+                           sig_on_destruct_));
+  }
+
+  for (int i = 0; i < 3; ++i) {
+    if (fd_state_[i] == PIPED && child_fds_[i] >= 0) {
+      int ret;
+      RETRY_ON_EINTR(ret, close(child_fds_[i]));
+    }
+  }
+}
+
+#if defined(__APPLE__)
+static int pipe2(int pipefd[2], int flags) {
+  DCHECK_EQ(O_CLOEXEC, flags);
+
+  int new_fds[2];
+  if (pipe(new_fds) == -1) {
+    return -1;
+  }
+  if (fcntl(new_fds[0], F_SETFD, O_CLOEXEC) == -1) {
+    int ret;
+    RETRY_ON_EINTR(ret, close(new_fds[0]));
+    RETRY_ON_EINTR(ret, close(new_fds[1]));
+    return -1;
+  }
+  if (fcntl(new_fds[1], F_SETFD, O_CLOEXEC) == -1) {
+    int ret;
+    RETRY_ON_EINTR(ret, close(new_fds[0]));
+    RETRY_ON_EINTR(ret, close(new_fds[1]));
+    return -1;
+  }
+  pipefd[0] = new_fds[0];
+  pipefd[1] = new_fds[1];
+  return 0;
+}
+#endif
+
+Status Subprocess::Start() {
+  VLOG(2) << "Invoking command: " << argv_;
+  if (state_ != kNotStarted) {
+    const string err_str = Substitute("$0: illegal sub-process state", state_);
+    LOG(DFATAL) << err_str;
+    return Status::IllegalState(err_str);
+  }
+  if (argv_.empty()) {
+    return Status::InvalidArgument("argv must have at least one elem");
+  }
+
+  // We explicitly set SIGPIPE to SIG_IGN here because we are using UNIX pipes.
+  IgnoreSigPipe();
+
+  vector<char*> argv_ptrs;
+  for (const string& arg : argv_) {
+    argv_ptrs.push_back(const_cast<char*>(arg.c_str()));
+  }
+  argv_ptrs.push_back(nullptr);
+
+  // Pipe from caller process to child's stdin
+  // [0] = stdin for child, [1] = how parent writes to it
+  int child_stdin[2] = {-1, -1};
+  if (fd_state_[STDIN_FILENO] == PIPED) {
+    PCHECK(pipe2(child_stdin, O_CLOEXEC) == 0);
+  }
+  // Pipe from child's stdout back to caller process
+  // [0] = how parent reads from child's stdout, [1] = how child writes to it
+  int child_stdout[2] = {-1, -1};
+  if (fd_state_[STDOUT_FILENO] == PIPED) {
+    PCHECK(pipe2(child_stdout, O_CLOEXEC) == 0);
+  }
+  // Pipe from child's stderr back to caller process
+  // [0] = how parent reads from child's stderr, [1] = how child writes to it
+  int child_stderr[2] = {-1, -1};
+  if (fd_state_[STDERR_FILENO] == PIPED) {
+    PCHECK(pipe2(child_stderr, O_CLOEXEC) == 0);
+  }
+  // The synchronization pipe: this trick is to make sure the parent returns
+  // control only after the child process has invoked execvp().
+  int sync_pipe[2];
+  PCHECK(pipe2(sync_pipe, O_CLOEXEC) == 0);
+
+  DIR* fd_dir = nullptr;
+  RETURN_NOT_OK_PREPEND(OpenProcFdDir(&fd_dir), "Unable to open fd dir");
+  unique_ptr<DIR, std::function<void(DIR*)>> fd_dir_closer(fd_dir,
+                                                           CloseProcFdDir);
+  int ret;
+  RETRY_ON_EINTR(ret, fork());
+  if (ret == -1) {
+    return Status::RuntimeError("Unable to fork", ErrnoToString(errno), errno);
+  }
+  if (ret == 0) { // We are the child
+    // Send the child a SIGTERM when the parent dies. This is done as early
+    // as possible in the child's life to prevent any orphaning whatsoever
+    // (e.g. from KUDU-402).
+#if defined(__linux__)
+    // TODO: prctl(PR_SET_PDEATHSIG) is Linux-specific, look into portable ways
+    // to prevent orphans when parent is killed.
+    prctl(PR_SET_PDEATHSIG, SIGKILL);
+#endif
+
+    // stdin
+    if (fd_state_[STDIN_FILENO] == PIPED) {
+      int dup2_ret;
+      RETRY_ON_EINTR(dup2_ret, dup2(child_stdin[0], STDIN_FILENO));
+      PCHECK(dup2_ret == STDIN_FILENO);
+    } else {
+      DCHECK_EQ(SHARED, fd_state_[STDIN_FILENO]);
+    }
+
+    // stdout
+    switch (fd_state_[STDOUT_FILENO]) {
+      case PIPED: {
+        int dup2_ret;
+        RETRY_ON_EINTR(dup2_ret, dup2(child_stdout[1], STDOUT_FILENO));
+        PCHECK(dup2_ret == STDOUT_FILENO);
+        break;
+      }
+      case DISABLED: {
+        RedirectToDevNull(STDOUT_FILENO);
+        break;
+      }
+      default:
+        DCHECK_EQ(SHARED, fd_state_[STDOUT_FILENO]);
+        break;
+    }
+
+    // stderr
+    switch (fd_state_[STDERR_FILENO]) {
+      case PIPED: {
+        int dup2_ret;
+        RETRY_ON_EINTR(dup2_ret, dup2(child_stderr[1], STDERR_FILENO));
+        PCHECK(dup2_ret == STDERR_FILENO);
+        break;
+      }
+      case DISABLED: {
+        RedirectToDevNull(STDERR_FILENO);
+        break;
+      }
+      default:
+        DCHECK_EQ(SHARED, fd_state_[STDERR_FILENO]);
+        break;
+    }
+
+    // Close the read side of the sync pipe;
+    // the write side should be closed upon execvp().
+    int close_ret;
+    RETRY_ON_EINTR(close_ret, close(sync_pipe[0]));
+    PCHECK(close_ret == 0);
+
+    CloseNonStandardFDs(fd_dir);
+
+    // Ensure we are not ignoring or blocking signals in the child process.
+    ResetAllSignalMasksToUnblocked();
+
+    // Reset the disposition of SIGPIPE to SIG_DFL because we routinely set its
+    // disposition to SIG_IGN via IgnoreSigPipe(). At the time of writing, we
+    // don't explicitly ignore any other signals in Kudu.
+    ResetSigPipeHandlerToDefault();
+
+    // Set the current working directory of the subprocess.
+    if (!cwd_.empty()) {
+      PCHECK(chdir(cwd_.c_str()) == 0);
+    }
+
+    // Set the environment for the subprocess. This is more portable than
+    // using execvpe(), which doesn't exist on OS X. We rely on the 'p'
+    // variant of exec to do $PATH searching if the executable specified
+    // by the caller isn't an absolute path.
+    for (const auto& env : env_) {
+      ignore_result(setenv(env.first.c_str(), env.second.c_str(), 1 /* overwrite */));
+    }
+
+    execvp(program_.c_str(), &argv_ptrs[0]);
+    int err = errno;
+    PLOG(ERROR) << "Couldn't exec " << program_;
+    _exit(err);
+  } else {
+    // We are the parent
+    child_pid_ = ret;
+    // Close child's side of the pipes
+    int close_ret;
+    if (fd_state_[STDIN_FILENO]  == PIPED) RETRY_ON_EINTR(close_ret, close(child_stdin[0]));
+    if (fd_state_[STDOUT_FILENO] == PIPED) RETRY_ON_EINTR(close_ret, close(child_stdout[1]));
+    if (fd_state_[STDERR_FILENO] == PIPED) RETRY_ON_EINTR(close_ret, close(child_stderr[1]));
+    // Keep parent's side of the pipes
+    child_fds_[STDIN_FILENO]  = child_stdin[1];
+    child_fds_[STDOUT_FILENO] = child_stdout[0];
+    child_fds_[STDERR_FILENO] = child_stderr[0];
+
+    // Wait for the child process to invoke execvp(). The trick involves
+    // a pipe with O_CLOEXEC option for its descriptors. The parent process
+    // performs blocking read from the pipe while the write side of the pipe
+    // is kept open by the child (it does not write any data, though). The write
+    // side of the pipe is closed when the child invokes execvp(). At that
+    // point, the parent should receive EOF, i.e. read() should return 0.
+    {
+      // Close the write side of the sync pipe. It's crucial to make sure
+      // it succeeds otherwise the blocking read() below might wait forever
+      // even if the child process has closed the pipe.
+      RETRY_ON_EINTR(close_ret, close(sync_pipe[1]));
+      PCHECK(close_ret == 0);
+      while (true) {
+        uint8_t buf;
+        int err = 0;
+        int rc;
+        RETRY_ON_EINTR(rc, read(sync_pipe[0], &buf, 1));
+        if (rc == -1) {
+          err = errno;
+        }
+        RETRY_ON_EINTR(close_ret, close(sync_pipe[0]));
+        PCHECK(close_ret == 0);
+        if (rc == 0) {
+          // That's OK -- expecting EOF from the other side of the pipe.
+          break;
+        } else if (rc == -1) {
+          // Other errors besides EINTR are not expected.
+          return Status::RuntimeError("Unexpected error from the sync pipe",
+                                      ErrnoToString(err), err);
+        }
+        // No data is expected from the sync pipe.
+        LOG(FATAL) << Substitute("$0: unexpected data from the sync pipe", rc);
+      }
+    }
+  }
+
+  state_ = kRunning;
+  return Status::OK();
+}
+
+Status Subprocess::Wait(int* wait_status) {
+  return DoWait(wait_status, BLOCKING);
+}
+
+Status Subprocess::WaitNoBlock(int* wait_status) {
+  return DoWait(wait_status, NON_BLOCKING);
+}
+
+Status Subprocess::GetProcfsState(int pid, ProcfsState* state) {
+  faststring data;
+  string filename = Substitute("/proc/$0/stat", pid);
+  RETURN_NOT_OK(ReadFileToString(Env::Default(), filename, &data));
+
+  // The part of /proc/<pid>/stat that's relevant for us looks like this:
+  //
+  //   "16009 (subprocess-test) R ..."
+  //
+  // The first number is the PID, the string in the parens in the command, and
+  // the single letter afterwards is the process' state.
+  //
+  // To extract the state, we scan backwards looking for the last ')', then
+  // increment past it and the separating space. This is safer than scanning
+  // forward as it properly handles commands containing parens.
+  string data_str = data.ToString();
+  const char* end_parens = strrchr(data_str.c_str(), ')');
+  if (end_parens == nullptr) {
+    return Status::RuntimeError(Substitute("unexpected layout in $0", filename));
+  }
+  char proc_state = end_parens[2];
+
+  switch (proc_state) {
+    case 'T':
+      *state = ProcfsState::PAUSED;
+      break;
+    default:
+      *state = ProcfsState::RUNNING;
+      break;
+  }
+  return Status::OK();
+}
+
+Status Subprocess::Kill(int signal) {
+  if (state_ != kRunning) {
+    const string err_str = "Sub-process is not running";
+    LOG(DFATAL) << err_str;
+    return Status::IllegalState(err_str);
+  }
+  if (kill(child_pid_, signal) != 0) {
+    return Status::RuntimeError("Unable to kill",
+                                ErrnoToString(errno),
+                                errno);
+  }
+
+  // Signal delivery is often asynchronous. For some signals, we try to wait
+  // for the process to actually change state, using /proc/<pid>/stat as a
+  // guide. This is best-effort.
+  ProcfsState desired_state;
+  switch (signal) {
+    case SIGSTOP:
+      desired_state = ProcfsState::PAUSED;
+      break;
+    case SIGCONT:
+      desired_state = ProcfsState::RUNNING;
+      break;
+    default:
+      return Status::OK();
+  }
+  Stopwatch sw;
+  sw.start();
+  do {
+    ProcfsState current_state;
+    if (!GetProcfsState(child_pid_, &current_state).ok()) {
+      // There was some error parsing /proc/<pid>/stat (or perhaps it doesn't
+      // exist on this platform).
+      return Status::OK();
+    }
+    if (current_state == desired_state) {
+      return Status::OK();
+    }
+    SleepFor(MonoDelta::FromMilliseconds(10));
+  } while (sw.elapsed().wall_seconds() < kProcessWaitTimeoutSeconds);
+  return Status::OK();
+}
+
+Status Subprocess::KillAndWait(int signal) {
+  string procname = Substitute("$0 (pid $1)", argv0(), pid());
+
+  // This is a fatal error because all errors in Kill() are signal-independent,
+  // so Kill(SIGKILL) is just as likely to fail if this did.
+  RETURN_NOT_OK_PREPEND(
+      Kill(signal), Substitute("Failed to send signal $0 to $1",
+                               signal, procname));
+  if (signal == SIGKILL) {
+    RETURN_NOT_OK_PREPEND(
+        Wait(), Substitute("Failed to wait on $0", procname));
+  } else {
+    Status s;
+    Stopwatch sw;
+    sw.start();
+    do {
+      s = WaitNoBlock();
+      if (s.ok()) {
+        break;
+      } else if (!s.IsTimedOut()) {
+        // An unexpected error in WaitNoBlock() is likely to manifest repeatedly,
+        // so there's no point in retrying this.
+        RETURN_NOT_OK_PREPEND(
+            s, Substitute("Unexpected failure while waiting on $0", procname));
+      }
+      SleepFor(MonoDelta::FromMilliseconds(10));
+    } while (sw.elapsed().wall_seconds() < kProcessWaitTimeoutSeconds);
+    if (s.IsTimedOut()) {
+      return KillAndWait(SIGKILL);
+    }
+  }
+  return Status::OK();
+}
+
+Status Subprocess::GetExitStatus(int* exit_status, string* info_str) const {
+  if (state_ != kExited) {
+    const string err_str = "Sub-process termination hasn't yet been detected";
+    LOG(DFATAL) << err_str;
+    return Status::IllegalState(err_str);
+  }
+  string info;
+  int status;
+  if (WIFEXITED(wait_status_)) {
+    status = WEXITSTATUS(wait_status_);
+    if (status == 0) {
+      info = Substitute("$0: process successfully exited", program_);
+    } else {
+      info = Substitute("$0: process exited with non-zero status $1",
+                        program_, status);
+    }
+  } else if (WIFSIGNALED(wait_status_)) {
+    // Using signal number as exit status.
+    status = WTERMSIG(wait_status_);
+    info = Substitute("$0: process exited on signal $1", program_, status);
+#if defined(WCOREDUMP)
+    if (WCOREDUMP(wait_status_)) {
+      SubstituteAndAppend(&info, " (core dumped)");
+    }
+#endif
+  } else {
+    status = -1;
+    info = Substitute("$0: process reported unexpected wait status $1",
+                      program_, wait_status_);
+    LOG(DFATAL) << info;
+  }
+  if (exit_status) {
+    *exit_status = status;
+  }
+  if (info_str) {
+    *info_str = info;
+  }
+  return Status::OK();
+}
+
+Status Subprocess::Call(const string& arg_str) {
+  vector<string> argv = Split(arg_str, " ");
+  return Call(argv, "", nullptr, nullptr);
+}
+
+Status Subprocess::Call(const vector<string>& argv,
+                        const string& stdin_in,
+                        string* stdout_out,
+                        string* stderr_out) {
+  Subprocess p(argv);
+
+  if (stdout_out) {
+    p.ShareParentStdout(false);
+  }
+  if (stderr_out) {
+    p.ShareParentStderr(false);
+  }
+  RETURN_NOT_OK_PREPEND(p.Start(),
+                        "Unable to fork " + argv[0]);
+
+  if (!stdin_in.empty()) {
+    ssize_t written;
+    RETRY_ON_EINTR(written, write(p.to_child_stdin_fd(), stdin_in.data(), stdin_in.size()));
+    if (written < stdin_in.size()) {
+      return Status::IOError("Unable to write to child process stdin",
+                             ErrnoToString(errno), errno);
+    }
+  }
+
+  int err;
+  RETRY_ON_EINTR(err, close(p.ReleaseChildStdinFd()));
+  if (PREDICT_FALSE(err != 0)) {
+    return Status::IOError("Unable to close child process stdin", ErrnoToString(errno), errno);
+  }
+
+  vector<int> fds;
+  if (stdout_out) {
+    fds.push_back(p.from_child_stdout_fd());
+  }
+  if (stderr_out) {
+    fds.push_back(p.from_child_stderr_fd());
+  }
+  vector<string> outv;
+  RETURN_NOT_OK(ReadFdsFully(argv[0], fds, &outv));
+
+  // Given that ReadFdsFully captures the strings in the order in which we
+  // had installed 'fds' above, it can be assured that we can receive
+  // as many strings as there were 'fds' in the vector and in that order.
+  CHECK_EQ(outv.size(), fds.size());
+  if (stdout_out) {
+    *stdout_out = std::move(outv.front());
+  }
+  if (stderr_out) {
+    *stderr_out = std::move(outv.back());
+  }
+
+  RETURN_NOT_OK_PREPEND(p.Wait(), "Unable to wait() for " + argv[0]);
+  int exit_status;
+  string exit_info_str;
+  RETURN_NOT_OK(p.GetExitStatus(&exit_status, &exit_info_str));
+  if (exit_status != 0) {
+    return Status::RuntimeError(exit_info_str);
+  }
+  return Status::OK();
+}
+
+pid_t Subprocess::pid() const {
+  CHECK_EQ(state_, kRunning);
+  return child_pid_;
+}
+
+Status Subprocess::DoWait(int* wait_status, WaitMode mode) {
+  if (state_ == kExited) {
+    if (wait_status) {
+      *wait_status = wait_status_;
+    }
+    return Status::OK();
+  }
+  if (state_ != kRunning) {
+    const string err_str = Substitute("$0: illegal sub-process state", state_);
+    LOG(DFATAL) << err_str;
+    return Status::IllegalState(err_str);
+  }
+
+  const int options = (mode == NON_BLOCKING) ? WNOHANG : 0;
+  int status;
+  int rc;
+  RETRY_ON_EINTR(rc, waitpid(child_pid_, &status, options));
+  if (rc == -1) {
+    return Status::RuntimeError("Unable to wait on child",
+                                ErrnoToString(errno), errno);
+  }
+  if (mode == NON_BLOCKING && rc == 0) {
+    return Status::TimedOut("");
+  }
+  CHECK_EQ(rc, child_pid_);
+  CHECK(WIFEXITED(status) || WIFSIGNALED(status));
+
+  child_pid_ = -1;
+  wait_status_ = status;
+  state_ = kExited;
+  if (wait_status) {
+    *wait_status = status;
+  }
+  return Status::OK();
+}
+
+void Subprocess::SetEnvVars(map<string, string> env) {
+  CHECK_EQ(state_, kNotStarted);
+  env_ = std::move(env);
+}
+
+void Subprocess::SetCurrentDir(string cwd) {
+  CHECK_EQ(state_, kNotStarted);
+  cwd_ = std::move(cwd);
+}
+
+void Subprocess::SetFdShared(int stdfd, bool share) {
+  CHECK_EQ(state_, kNotStarted);
+  fd_state_[stdfd] = share ? SHARED : PIPED;
+}
+
+void Subprocess::DisableStderr() {
+  CHECK_EQ(state_, kNotStarted);
+  fd_state_[STDERR_FILENO] = DISABLED;
+}
+
+void Subprocess::DisableStdout() {
+  CHECK_EQ(state_, kNotStarted);
+  fd_state_[STDOUT_FILENO] = DISABLED;
+}
+
+int Subprocess::CheckAndOffer(int stdfd) const {
+  CHECK_EQ(state_, kRunning);
+  CHECK_EQ(fd_state_[stdfd], PIPED);
+  return child_fds_[stdfd];
+}
+
+int Subprocess::ReleaseChildFd(int stdfd) {
+  CHECK_EQ(state_, kRunning);
+  CHECK_GE(child_fds_[stdfd], 0);
+  CHECK_EQ(fd_state_[stdfd], PIPED);
+  int ret = child_fds_[stdfd];
+  child_fds_[stdfd] = -1;
+  return ret;
+}
+
+} // namespace kudu

http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/subprocess.h
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/subprocess.h b/be/src/kudu/util/subprocess.h
new file mode 100644
index 0000000..4d33c8f
--- /dev/null
+++ b/be/src/kudu/util/subprocess.h
@@ -0,0 +1,219 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#ifndef KUDU_UTIL_SUBPROCESS_H
+#define KUDU_UTIL_SUBPROCESS_H
+
+#include <signal.h>
+#include <unistd.h>
+
+#include <map>
+#include <string>
+#include <vector>
+
+#include <gtest/gtest_prod.h>
+
+#include "kudu/gutil/macros.h"
+#include "kudu/gutil/port.h"
+#include "kudu/util/status.h"
+
+namespace kudu {
+
+// Wrapper around a spawned subprocess.
+//
+// program will be treated as an absolute path unless it begins with a dot or a
+// slash.
+//
+// This takes care of creating pipes to/from the subprocess and offers
+// basic functionality to wait on it or send signals.
+// By default, child process only has stdin captured and separate from the parent.
+// The stdout/stderr streams are shared with the parent by default.
+//
+// The process may only be started and waited on/killed once.
+//
+// Optionally, user may change parent/child stream sharing. Also, a user may disable
+// a subprocess stream. A user cannot do both.
+//
+// Note that, when the Subprocess object is destructed, the child process
+// will be forcibly SIGKILLed to avoid orphaning processes.
+class Subprocess {
+ public:
+  // Constructs a new Subprocess that will execute 'argv' on Start().
+  //
+  // If the process isn't explicitly killed, 'sig_on_destroy' will be delivered
+  // to it when the Subprocess goes out of scope.
+  explicit Subprocess(std::vector<std::string> argv, int sig_on_destruct = SIGKILL);
+  ~Subprocess();
+
+  // Disables subprocess stream output. Is mutually exclusive with stream sharing.
+  //
+  // Must be called before subprocess starts.
+  void DisableStderr();
+  void DisableStdout();
+
+  // Configures the subprocess to share the parent's stream. Is mutually
+  // exclusive with stream disabling.
+  //
+  // Must be called before subprocess starts.
+  void ShareParentStdin(bool  share = true) { SetFdShared(STDIN_FILENO,  share); }
+  void ShareParentStdout(bool share = true) { SetFdShared(STDOUT_FILENO, share); }
+  void ShareParentStderr(bool share = true) { SetFdShared(STDERR_FILENO, share); }
+
+  // Add environment variables to be set before executing the subprocess.
+  //
+  // These environment variables are merged into the existing environment
+  // of the parent process. In other words, there is no need to prime this
+  // map with the current environment; instead, just specify any variables
+  // that should be overridden.
+  //
+  // Repeated calls to this function replace earlier calls.
+  void SetEnvVars(std::map<std::string, std::string> env);
+
+  // Set the initial current working directory of the subprocess.
+  //
+  // Must be set before starting the subprocess.
+  void SetCurrentDir(std::string cwd);
+
+  // Start the subprocess. Can only be called once.
+  //
+  // This returns a bad Status if the fork() fails. However,
+  // note that if the executable path was incorrect such that
+  // exec() fails, this will still return Status::OK. You must
+  // use Wait() to check for failure.
+  Status Start() WARN_UNUSED_RESULT;
+
+  // Wait for the subprocess to exit. The return value is the same as
+  // that of the waitpid() syscall. Only call after starting.
+  //
+  // NOTE: unlike the standard wait(2) call, this may be called multiple
+  // times. If the process has exited, it will repeatedly return the same
+  // exit code.
+  Status Wait(int* wait_status = nullptr) WARN_UNUSED_RESULT;
+
+  // Like the above, but does not block. This returns Status::TimedOut
+  // immediately if the child has not exited. Otherwise returns Status::OK
+  // and sets *ret. Only call after starting.
+  //
+  // NOTE: unlike the standard wait(2) call, this may be called multiple
+  // times. If the process has exited, it will repeatedly return the same
+  // exit code.
+  Status WaitNoBlock(int* wait_status = nullptr) WARN_UNUSED_RESULT;
+
+  // Send a signal to the subprocess.
+  // Note that this does not reap the process -- you must still Wait()
+  // in order to reap it. Only call after starting.
+  Status Kill(int signal) WARN_UNUSED_RESULT;
+
+  // Sends a signal to the subprocess and waits for it to exit.
+  //
+  // If the signal is not SIGKILL and the process doesn't appear to be exiting,
+  // retries with SIGKILL.
+  Status KillAndWait(int signal);
+
+  // Retrieve exit status of the process awaited by Wait() and/or WaitNoBlock()
+  // methods. Must be called only after calling Wait()/WaitNoBlock().
+  Status GetExitStatus(int* exit_status, std::string* info_str = nullptr) const
+      WARN_UNUSED_RESULT;
+
+  // Helper method that creates a Subprocess, issues a Start() then a Wait().
+  // Expects a blank-separated list of arguments, with the first being the
+  // full path to the executable.
+  // The returned Status will only be OK if all steps were successful and
+  // the return code was 0.
+  static Status Call(const std::string& arg_str) WARN_UNUSED_RESULT;
+
+  // Same as above, but accepts a vector that includes the path to the
+  // executable as argv[0] and the arguments to the program in argv[1..n].
+  //
+  // Writes the value of 'stdin_in' to the subprocess' stdin. The length of
+  // 'stdin_in' should be limited to 64kib.
+  //
+  // Also collects the output from the child process stdout and stderr into
+  // 'stdout_out' and 'stderr_out' respectively.
+  static Status Call(const std::vector<std::string>& argv,
+                     const std::string& stdin_in = "",
+                     std::string* stdout_out = nullptr,
+                     std::string* stderr_out = nullptr) WARN_UNUSED_RESULT;
+
+  // Return the pipe fd to the child's standard stream.
+  // Stream should not be disabled or shared.
+  int to_child_stdin_fd()    const { return CheckAndOffer(STDIN_FILENO); }
+  int from_child_stdout_fd() const { return CheckAndOffer(STDOUT_FILENO); }
+  int from_child_stderr_fd() const { return CheckAndOffer(STDERR_FILENO); }
+
+  // Release control of the file descriptor for the child's stream, only if piped.
+  // Writes to this FD show up on stdin in the subprocess
+  int ReleaseChildStdinFd()  { return ReleaseChildFd(STDIN_FILENO ); }
+  // Reads from this FD come from stdout of the subprocess
+  int ReleaseChildStdoutFd() { return ReleaseChildFd(STDOUT_FILENO); }
+  // Reads from this FD come from stderr of the subprocess
+  int ReleaseChildStderrFd() { return ReleaseChildFd(STDERR_FILENO); }
+
+  pid_t pid() const;
+  const std::string& argv0() const { return argv_[0]; }
+
+ private:
+  FRIEND_TEST(SubprocessTest, TestGetProcfsState);
+
+  enum State {
+    kNotStarted,
+    kRunning,
+    kExited
+  };
+  enum StreamMode {SHARED, DISABLED, PIPED};
+  enum WaitMode {BLOCKING, NON_BLOCKING};
+
+  // Process state according to /proc/<pid>/stat.
+  enum class ProcfsState {
+    // "T  Stopped (on a signal) or (before Linux 2.6.33) trace stopped"
+    PAUSED,
+
+    // Every other process state.
+    RUNNING,
+  };
+
+  // Extracts the process state for /proc/<pid>/stat.
+  //
+  // Returns an error if /proc/</pid>/stat doesn't exist or if parsing failed.
+  static Status GetProcfsState(int pid, ProcfsState* state);
+
+  Status DoWait(int* wait_status, WaitMode mode) WARN_UNUSED_RESULT;
+  void SetFdShared(int stdfd, bool share);
+  int CheckAndOffer(int stdfd) const;
+  int ReleaseChildFd(int stdfd);
+
+  std::string program_;
+  std::vector<std::string> argv_;
+  std::map<std::string, std::string> env_;
+  State state_;
+  int child_pid_;
+  enum StreamMode fd_state_[3];
+  int child_fds_[3];
+  std::string cwd_;
+
+  // The cached wait status if Wait()/WaitNoBlock() has been called.
+  // Only valid if state_ == kExited.
+  int wait_status_;
+
+  // Custom signal to deliver when the subprocess goes out of scope, provided
+  // the process hasn't already been killed.
+  int sig_on_destruct_;
+
+  DISALLOW_COPY_AND_ASSIGN(Subprocess);
+};
+
+} // namespace kudu
+#endif /* KUDU_UTIL_SUBPROCESS_H */

http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/test_graph.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/test_graph.cc b/be/src/kudu/util/test_graph.cc
new file mode 100644
index 0000000..59f4d30
--- /dev/null
+++ b/be/src/kudu/util/test_graph.cc
@@ -0,0 +1,121 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "kudu/util/test_graph.h"
+
+#include <mutex>
+#include <ostream>
+#include <utility>
+
+#include <glog/logging.h>
+
+#include "kudu/gutil/ref_counted.h"
+#include "kudu/gutil/stringprintf.h"
+#include "kudu/gutil/walltime.h"
+#include "kudu/util/faststring.h"
+#include "kudu/util/monotime.h"
+#include "kudu/util/status.h"
+#include "kudu/util/thread.h"
+
+using std::shared_ptr;
+using std::string;
+
+namespace kudu {
+
+void TimeSeries::AddValue(double val) {
+  std::lock_guard<simple_spinlock> l(lock_);
+  val_ += val;
+}
+
+void TimeSeries::SetValue(double val) {
+  std::lock_guard<simple_spinlock> l(lock_);
+  val_ = val;
+}
+
+double TimeSeries::value() const {
+  std::lock_guard<simple_spinlock> l(lock_);
+  return val_;
+}
+
+TimeSeriesCollector::~TimeSeriesCollector() {
+  if (started_) {
+    StopDumperThread();
+  }
+}
+
+shared_ptr<TimeSeries> TimeSeriesCollector::GetTimeSeries(const string &key) {
+  MutexLock l(series_lock_);
+  SeriesMap::const_iterator it = series_map_.find(key);
+  if (it == series_map_.end()) {
+    shared_ptr<TimeSeries> ts(new TimeSeries());
+    series_map_[key] = ts;
+    return ts;
+  } else {
+    return (*it).second;
+  }
+}
+
+void TimeSeriesCollector::StartDumperThread() {
+  LOG(INFO) << "Starting metrics dumper";
+  CHECK(!started_);
+  exit_latch_.Reset(1);
+  started_ = true;
+  CHECK_OK(kudu::Thread::Create("time series", "dumper",
+      &TimeSeriesCollector::DumperThread, this, &dumper_thread_));
+}
+
+void TimeSeriesCollector::StopDumperThread() {
+  CHECK(started_);
+  exit_latch_.CountDown();
+  CHECK_OK(ThreadJoiner(dumper_thread_.get()).Join());
+  started_ = false;
+}
+
+void TimeSeriesCollector::DumperThread() {
+  CHECK(started_);
+  WallTime start_time = WallTime_Now();
+
+  faststring metrics_str;
+  while (true) {
+    metrics_str.clear();
+    metrics_str.append("metrics: ");
+    BuildMetricsString(WallTime_Now() - start_time, &metrics_str);
+    LOG(INFO) << metrics_str.ToString();
+
+    // Sleep until next dump time, or return if we should exit
+    if (exit_latch_.WaitFor(MonoDelta::FromMilliseconds(250))) {
+      return;
+    }
+  }
+}
+
+void TimeSeriesCollector::BuildMetricsString(
+  WallTime time_since_start, faststring *dst_buf) const {
+  MutexLock l(series_lock_);
+
+  dst_buf->append(StringPrintf("{ \"scope\": \"%s\", \"time\": %.3f",
+                               scope_.c_str(), time_since_start));
+
+  for (SeriesMap::const_reference entry : series_map_) {
+    dst_buf->append(StringPrintf(", \"%s\": %.3f",
+                                 entry.first.c_str(),  entry.second->value()));
+  }
+  dst_buf->append("}");
+}
+
+
+} // namespace kudu

http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/test_graph.h
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/test_graph.h b/be/src/kudu/util/test_graph.h
new file mode 100644
index 0000000..41df430
--- /dev/null
+++ b/be/src/kudu/util/test_graph.h
@@ -0,0 +1,90 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#ifndef KUDU_TEST_GRAPH_COLLECTOR_H
+#define KUDU_TEST_GRAPH_COLLECTOR_H
+
+#include <memory>
+#include <string>
+#include <unordered_map>
+#include <utility>
+
+#include "kudu/gutil/ref_counted.h"
+#include "kudu/gutil/macros.h"
+#include "kudu/gutil/walltime.h"
+#include "kudu/util/countdown_latch.h"
+#include "kudu/util/locks.h"
+#include "kudu/util/mutex.h"
+
+namespace kudu {
+
+class Thread;
+class faststring;
+
+class TimeSeries {
+ public:
+  void AddValue(double val);
+  void SetValue(double val);
+
+  double value() const;
+
+ private:
+  friend class TimeSeriesCollector;
+
+  DISALLOW_COPY_AND_ASSIGN(TimeSeries);
+
+  TimeSeries() :
+    val_(0)
+  {}
+
+  mutable simple_spinlock lock_;
+  double val_;
+};
+
+class TimeSeriesCollector {
+ public:
+  explicit TimeSeriesCollector(std::string scope)
+      : scope_(std::move(scope)), exit_latch_(0), started_(false) {}
+
+  ~TimeSeriesCollector();
+
+  std::shared_ptr<TimeSeries> GetTimeSeries(const std::string &key);
+  void StartDumperThread();
+  void StopDumperThread();
+
+ private:
+  DISALLOW_COPY_AND_ASSIGN(TimeSeriesCollector);
+
+  void DumperThread();
+  void BuildMetricsString(WallTime time_since_start, faststring *dst_buf) const;
+
+  std::string scope_;
+
+  typedef std::unordered_map<std::string, std::shared_ptr<TimeSeries> > SeriesMap;
+  SeriesMap series_map_;
+  mutable Mutex series_lock_;
+
+  scoped_refptr<kudu::Thread> dumper_thread_;
+
+  // Latch used to stop the dumper_thread_. When the thread is started,
+  // this is set to 1, and when the thread should exit, it is counted down.
+  CountDownLatch exit_latch_;
+
+  bool started_;
+};
+
+} // namespace kudu
+#endif

http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/test_macros.h
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/test_macros.h b/be/src/kudu/util/test_macros.h
new file mode 100644
index 0000000..63cae5a
--- /dev/null
+++ b/be/src/kudu/util/test_macros.h
@@ -0,0 +1,123 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#ifndef KUDU_UTIL_TEST_MACROS_H
+#define KUDU_UTIL_TEST_MACROS_H
+
+#include <gmock/gmock.h>
+#include <string>
+
+// ASSERT_NO_FATAL_FAILURE is just too long to type.
+#define NO_FATALS(expr) \
+  ASSERT_NO_FATAL_FAILURE(expr)
+
+// Detect fatals in the surrounding scope. NO_FATALS() only checks for fatals
+// in the expression passed to it.
+#define NO_PENDING_FATALS() \
+  if (testing::Test::HasFatalFailure()) { return; }
+
+#define ASSERT_OK(status) do { \
+  const Status& _s = status;        \
+  if (_s.ok()) { \
+    SUCCEED(); \
+  } else { \
+    FAIL() << "Bad status: " << _s.ToString();  \
+  } \
+} while (0);
+
+#define EXPECT_OK(status) do { \
+  const Status& _s = status; \
+  if (_s.ok()) { \
+    SUCCEED(); \
+  } else { \
+    ADD_FAILURE() << "Bad status: " << _s.ToString();  \
+  } \
+} while (0);
+
+// Like the above, but doesn't record successful
+// tests.
+#define ASSERT_OK_FAST(status) do { \
+  const Status& _s = status; \
+  if (!_s.ok()) { \
+    FAIL() << "Bad status: " << _s.ToString(); \
+  } \
+} while (0);
+
+// Substring matches.
+#define ASSERT_STR_CONTAINS(str, substr) \
+  ASSERT_THAT(str, testing::HasSubstr(substr))
+
+#define ASSERT_STR_NOT_CONTAINS(str, substr) \
+  ASSERT_THAT(str, testing::Not(testing::HasSubstr(substr)))
+
+// Substring regular expressions in extended regex (POSIX) syntax.
+#define ASSERT_STR_MATCHES(str, pattern) \
+  ASSERT_THAT(str, testing::ContainsRegex(pattern))
+
+#define ASSERT_STR_NOT_MATCHES(str, pattern) \
+  ASSERT_THAT(str, testing::Not(testing::ContainsRegex(pattern)))
+
+// Batched substring regular expressions in extended regex (POSIX) syntax.
+//
+// All strings must match the pattern.
+#define ASSERT_STRINGS_ALL_MATCH(strings, pattern) do { \
+  const auto& _strings = (strings); \
+  const auto& _pattern = (pattern); \
+  int _str_idx = 0; \
+  for (const auto& str : _strings) { \
+    ASSERT_STR_MATCHES(str, _pattern) \
+        << "string " << _str_idx << ": pattern " << _pattern \
+        << " does not match string " << str; \
+    _str_idx++; \
+  } \
+} while (0)
+
+// Batched substring regular expressions in extended regex (POSIX) syntax.
+//
+// At least one string must match the pattern.
+#define ASSERT_STRINGS_ANY_MATCH(strings, pattern) do { \
+  const auto& _strings = (strings); \
+  const auto& _pattern = (pattern); \
+  bool matched = false; \
+  for (const auto& str : _strings) { \
+    if (testing::internal::RE::PartialMatch(str, testing::internal::RE(_pattern))) { \
+      matched = true; \
+      break; \
+    } \
+  } \
+  ASSERT_TRUE(matched) \
+      << "not one string matched pattern " << _pattern; \
+} while (0)
+
+#define ASSERT_FILE_EXISTS(env, path) do { \
+  const std::string& _s = path; \
+  ASSERT_TRUE(env->FileExists(_s)) \
+    << "Expected file to exist: " << _s; \
+} while (0);
+
+#define ASSERT_FILE_NOT_EXISTS(env, path) do { \
+  const std::string& _s = path; \
+  ASSERT_FALSE(env->FileExists(_s)) \
+    << "Expected file not to exist: " << _s; \
+} while (0);
+
+#define CURRENT_TEST_NAME() \
+  ::testing::UnitTest::GetInstance()->current_test_info()->name()
+
+#define CURRENT_TEST_CASE_NAME() \
+  ::testing::UnitTest::GetInstance()->current_test_info()->test_case_name()
+
+#endif

http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/test_main.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/test_main.cc b/be/src/kudu/util/test_main.cc
new file mode 100644
index 0000000..c75e5ae
--- /dev/null
+++ b/be/src/kudu/util/test_main.cc
@@ -0,0 +1,109 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <ostream>
+#include <thread>
+
+#include <gflags/gflags.h>
+#include <glog/logging.h>
+#include <gtest/gtest.h>
+
+#include "kudu/gutil/atomicops.h"
+#include "kudu/util/debug/leakcheck_disabler.h"
+#include "kudu/util/flags.h"
+#include "kudu/util/minidump.h"
+#include "kudu/util/monotime.h"
+#include "kudu/util/pstack_watcher.h"
+#include "kudu/util/signal.h"
+#include "kudu/util/status.h"
+#include "kudu/util/test_util.h"
+
+DEFINE_int32(test_timeout_after, 0,
+             "Maximum total seconds allowed for all unit tests in the suite. Default: disabled");
+
+DEFINE_int32(stress_cpu_threads, 0,
+             "Number of threads to start that burn CPU in an attempt to "
+             "stimulate race conditions");
+
+namespace kudu {
+
+// Start thread that kills the process if --test_timeout_after is exceeded before
+// the tests complete.
+static void CreateAndStartTimeoutThread() {
+  if (FLAGS_test_timeout_after == 0) return;
+
+  // KUDU-1995: if running death tests using EXPECT_EXIT()/ASSERT_EXIT(), LSAN
+  // reports leaks in CreateAndStartTimeoutThread(). Adding a couple of scoped
+  // leak check disablers as a workaround since right now it's not clear what
+  // is going on exactly: LSAN does not report those leaks for tests which run
+  // ASSERT_DEATH(). This does not seem harmful or hiding any potential leaks
+  // since it's scoped and targeted only for this utility thread.
+  debug::ScopedLeakCheckDisabler disabler;
+  std::thread([=](){
+      debug::ScopedLeakCheckDisabler disabler;
+      SleepFor(MonoDelta::FromSeconds(FLAGS_test_timeout_after));
+      // Dump a pstack to stdout.
+      WARN_NOT_OK(PstackWatcher::DumpStacks(), "Unable to print pstack");
+
+      // ...and abort.
+      LOG(FATAL) << "Maximum unit test time exceeded (" << FLAGS_test_timeout_after << " sec)";
+    }).detach();
+}
+} // namespace kudu
+
+
+static void StartStressThreads() {
+  for (int i = 0; i < FLAGS_stress_cpu_threads; i++) {
+    std::thread([]{
+        while (true) {
+          // Do something which won't be optimized out.
+          base::subtle::MemoryBarrier();
+        }
+      }).detach();
+  }
+}
+
+int main(int argc, char **argv) {
+  google::InstallFailureSignalHandler();
+
+  // We don't use InitGoogleLoggingSafe() because gtest initializes glog, so we
+  // need to block SIGUSR1 explicitly in order to test minidump generation.
+  CHECK_OK(kudu::BlockSigUSR1());
+
+  // Ignore SIGPIPE for all tests so that threads writing to TLS
+  // sockets do not crash when writing to a closed socket. See KUDU-1910.
+  kudu::IgnoreSigPipe();
+
+  // InitGoogleTest() must precede ParseCommandLineFlags(), as the former
+  // removes gtest-related flags from argv that would trip up the latter.
+  ::testing::InitGoogleTest(&argc, argv);
+  kudu::ParseCommandLineFlags(&argc, &argv, true);
+
+  // Create the test-timeout timer.
+  kudu::CreateAndStartTimeoutThread();
+
+  StartStressThreads();
+
+  // This is called by the KuduTest setup method, but in case we have
+  // any tests that don't inherit from KuduTest, it's helpful to
+  // cover our bases and call it here too.
+  kudu::KuduTest::OverrideKrb5Environment();
+
+  int ret = RUN_ALL_TESTS();
+
+  return ret;
+}

http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/test_util.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/test_util.cc b/be/src/kudu/util/test_util.cc
new file mode 100644
index 0000000..c960441
--- /dev/null
+++ b/be/src/kudu/util/test_util.cc
@@ -0,0 +1,446 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "kudu/util/test_util.h"
+
+#include <errno.h>
+#include <limits.h>
+#include <unistd.h>
+
+#include <cstdlib>
+#include <cstring>
+#include <limits>
+#include <map>
+#include <memory>
+#include <ostream>
+#include <string>
+#include <utility>
+#include <vector>
+
+#ifdef __APPLE__
+#include <fcntl.h>
+#include <sys/param.h> // for MAXPATHLEN
+#endif
+
+#include <gflags/gflags.h>
+#include <glog/logging.h>
+#include <gtest/gtest-spi.h>
+
+#include "kudu/gutil/strings/numbers.h"
+#include "kudu/gutil/strings/split.h"
+#include "kudu/gutil/strings/strcat.h"
+#include "kudu/gutil/strings/strip.h"
+#include "kudu/gutil/strings/substitute.h"
+#include "kudu/gutil/strings/util.h"
+#include "kudu/gutil/walltime.h"
+#include "kudu/util/env.h"
+#include "kudu/util/faststring.h"
+#include "kudu/util/path_util.h"
+#include "kudu/util/scoped_cleanup.h"
+#include "kudu/util/slice.h"
+#include "kudu/util/spinlock_profiling.h"
+#include "kudu/util/status.h"
+#include "kudu/util/subprocess.h"
+
+DEFINE_string(test_leave_files, "on_failure",
+              "Whether to leave test files around after the test run. "
+              " Valid values are 'always', 'on_failure', or 'never'");
+
+DEFINE_int32(test_random_seed, 0, "Random seed to use for randomized tests");
+
+using std::string;
+using std::vector;
+using strings::Substitute;
+
+namespace kudu {
+
+const char* kInvalidPath = "/dev/invalid-path-for-kudu-tests";
+static const char* const kSlowTestsEnvVariable = "KUDU_ALLOW_SLOW_TESTS";
+
+static const uint64_t kTestBeganAtMicros = Env::Default()->NowMicros();
+
+// Global which production code can check to see if it is running
+// in a GTest environment (assuming the test binary links in this module,
+// which is typically a good assumption).
+//
+// This can be checked using the 'IsGTest()' function from test_util_prod.cc.
+bool g_is_gtest = true;
+
+///////////////////////////////////////////////////
+// KuduTest
+///////////////////////////////////////////////////
+
+KuduTest::KuduTest()
+  : env_(Env::Default()),
+    flag_saver_(new google::FlagSaver()),
+    test_dir_(GetTestDataDirectory()) {
+  std::map<const char*, const char*> flags_for_tests = {
+    // Disabling fsync() speeds up tests dramatically, and it's safe to do as no
+    // tests rely on cutting power to a machine or equivalent.
+    {"never_fsync", "true"},
+    // Disable redaction.
+    {"redact", "none"},
+    // Reduce default RSA key length for faster tests. We are using strong/high
+    // TLS v1.2 cipher suites, so minimum possible for TLS-related RSA keys is
+    // 768 bits. However, for the external mini cluster we use 1024 bits because
+    // Java default security policies require at least 1024 bits for RSA keys
+    // used in certificates. For uniformity, here 1024 RSA bit keys are used
+    // as well. As for the TSK keys, 512 bits is the minimum since the SHA256
+    // digest is used for token signing/verification.
+    {"ipki_server_key_size", "1024"},
+    {"ipki_ca_key_size", "1024"},
+    {"tsk_num_rsa_bits", "512"},
+  };
+  for (const auto& e : flags_for_tests) {
+    // We don't check for errors here, because we have some default flags that
+    // only apply to certain tests.
+    google::SetCommandLineOptionWithMode(e.first, e.second, google::SET_FLAGS_DEFAULT);
+  }
+  // If the TEST_TMPDIR variable has been set, then glog will automatically use that
+  // as its default log directory. We would prefer that the default log directory
+  // instead be the test-case-specific subdirectory.
+  FLAGS_log_dir = GetTestDataDirectory();
+}
+
+KuduTest::~KuduTest() {
+  // Reset the flags first to prevent them from affecting test directory cleanup.
+  flag_saver_.reset();
+
+  // Clean up the test directory in the destructor instead of a TearDown
+  // method. This is better because it ensures that the child-class
+  // dtor runs first -- so, if the child class is using a minicluster, etc,
+  // we will shut that down before we remove files underneath.
+  if (FLAGS_test_leave_files == "always") {
+    LOG(INFO) << "-----------------------------------------------";
+    LOG(INFO) << "--test_leave_files specified, leaving files in " << test_dir_;
+  } else if (FLAGS_test_leave_files == "on_failure" && HasFatalFailure()) {
+    LOG(INFO) << "-----------------------------------------------";
+    LOG(INFO) << "Had fatal failures, leaving test files at " << test_dir_;
+  } else {
+    VLOG(1) << "Cleaning up temporary test files...";
+    WARN_NOT_OK(env_->DeleteRecursively(test_dir_),
+                "Couldn't remove test files");
+  }
+}
+
+void KuduTest::SetUp() {
+  InitSpinLockContentionProfiling();
+  OverrideKrb5Environment();
+}
+
+string KuduTest::GetTestPath(const string& relative_path) const {
+  return JoinPathSegments(test_dir_, relative_path);
+}
+
+void KuduTest::OverrideKrb5Environment() {
+  // Set these variables to paths that definitely do not exist and
+  // couldn't be accidentally created.
+  //
+  // Note that if we were to set these to /dev/null, we end up triggering a leak in krb5
+  // when it tries to read an empty file as a ticket cache, whereas non-existent files
+  // don't have this issue. See MIT krb5 bug #8509.
+  //
+  // NOTE: we don't simply *unset* the variables, because then we'd still pick up
+  // the user's /etc/krb5.conf and other default locations.
+  setenv("KRB5_CONFIG", kInvalidPath, 1);
+  setenv("KRB5_KTNAME", kInvalidPath, 1);
+  setenv("KRB5CCNAME", kInvalidPath, 1);
+}
+
+///////////////////////////////////////////////////
+// Test utility functions
+///////////////////////////////////////////////////
+
+bool AllowSlowTests() {
+  char *e = getenv(kSlowTestsEnvVariable);
+  if ((e == nullptr) ||
+      (strlen(e) == 0) ||
+      (strcasecmp(e, "false") == 0) ||
+      (strcasecmp(e, "0") == 0) ||
+      (strcasecmp(e, "no") == 0)) {
+    return false;
+  }
+  if ((strcasecmp(e, "true") == 0) ||
+      (strcasecmp(e, "1") == 0) ||
+      (strcasecmp(e, "yes") == 0)) {
+    return true;
+  }
+  LOG(FATAL) << "Unrecognized value for " << kSlowTestsEnvVariable << ": " << e;
+  return false;
+}
+
+void OverrideFlagForSlowTests(const std::string& flag_name,
+                              const std::string& new_value) {
+  // Ensure that the flag is valid.
+  google::GetCommandLineFlagInfoOrDie(flag_name.c_str());
+
+  // If we're not running slow tests, don't override it.
+  if (!AllowSlowTests()) {
+    return;
+  }
+  google::SetCommandLineOptionWithMode(flag_name.c_str(), new_value.c_str(),
+                                       google::SET_FLAG_IF_DEFAULT);
+}
+
+int SeedRandom() {
+  int seed;
+  // Initialize random seed
+  if (FLAGS_test_random_seed == 0) {
+    // Not specified by user
+    seed = static_cast<int>(GetCurrentTimeMicros());
+  } else {
+    seed = FLAGS_test_random_seed;
+  }
+  LOG(INFO) << "Using random seed: " << seed;
+  srand(seed);
+  return seed;
+}
+
+string GetTestDataDirectory() {
+  const ::testing::TestInfo* const test_info =
+    ::testing::UnitTest::GetInstance()->current_test_info();
+  CHECK(test_info) << "Must be running in a gtest unit test to call this function";
+  string dir;
+  CHECK_OK(Env::Default()->GetTestDirectory(&dir));
+
+  // The directory name includes some strings for specific reasons:
+  // - program name: identifies the directory to the test invoker
+  // - timestamp and pid: disambiguates with prior runs of the same test
+  //
+  // e.g. "env-test.TestEnv.TestReadFully.1409169025392361-23600"
+  //
+  // If the test is sharded, the shard index is also included so that the test
+  // invoker can more easily identify all directories belonging to each shard.
+  string shard_index_infix;
+  const char* shard_index = getenv("GTEST_SHARD_INDEX");
+  if (shard_index && shard_index[0] != '\0') {
+    shard_index_infix = Substitute("$0.", shard_index);
+  }
+  dir += Substitute("/$0.$1$2.$3.$4-$5",
+    StringReplace(google::ProgramInvocationShortName(), "/", "_", true),
+    shard_index_infix,
+    StringReplace(test_info->test_case_name(), "/", "_", true),
+    StringReplace(test_info->name(), "/", "_", true),
+    kTestBeganAtMicros,
+    getpid());
+  Status s = Env::Default()->CreateDir(dir);
+  CHECK(s.IsAlreadyPresent() || s.ok())
+    << "Could not create directory " << dir << ": " << s.ToString();
+  if (s.ok()) {
+    string metadata;
+
+    StrAppend(&metadata, Substitute("PID=$0\n", getpid()));
+
+    StrAppend(&metadata, Substitute("PPID=$0\n", getppid()));
+
+    char* jenkins_build_id = getenv("BUILD_ID");
+    if (jenkins_build_id) {
+      StrAppend(&metadata, Substitute("BUILD_ID=$0\n", jenkins_build_id));
+    }
+
+    CHECK_OK(WriteStringToFile(Env::Default(), metadata,
+                               Substitute("$0/test_metadata", dir)));
+  }
+  return dir;
+}
+
+string GetTestExecutableDirectory() {
+  string exec;
+  CHECK_OK(Env::Default()->GetExecutablePath(&exec));
+  return DirName(exec);
+}
+
+void AssertEventually(const std::function<void(void)>& f,
+                      const MonoDelta& timeout,
+                      AssertBackoff backoff) {
+  const MonoTime deadline = MonoTime::Now() + timeout;
+  {
+    // Disable --gtest_break_on_failure, or else the assertion failures
+    // inside our attempts will cause the test to SEGV even though we
+    // would like to retry.
+    bool old_break_on_failure = testing::FLAGS_gtest_break_on_failure;
+    auto c = MakeScopedCleanup([old_break_on_failure]() {
+      testing::FLAGS_gtest_break_on_failure = old_break_on_failure;
+    });
+    testing::FLAGS_gtest_break_on_failure = false;
+
+    for (int attempts = 0; MonoTime::Now() < deadline; attempts++) {
+      // Capture any assertion failures within this scope (i.e. from their function)
+      // into 'results'
+      testing::TestPartResultArray results;
+      testing::ScopedFakeTestPartResultReporter reporter(
+          testing::ScopedFakeTestPartResultReporter::INTERCEPT_ONLY_CURRENT_THREAD,
+          &results);
+      f();
+
+      // Determine whether their function produced any new test failure results.
+      bool has_failures = false;
+      for (int i = 0; i < results.size(); i++) {
+        has_failures |= results.GetTestPartResult(i).failed();
+      }
+      if (!has_failures) {
+        return;
+      }
+
+      // If they had failures, sleep and try again.
+      int sleep_ms;
+      switch (backoff) {
+        case AssertBackoff::EXPONENTIAL:
+          sleep_ms = (attempts < 10) ? (1 << attempts) : 1000;
+          break;
+        case AssertBackoff::NONE:
+          sleep_ms = 1;
+          break;
+        default:
+          LOG(FATAL) << "Unknown backoff type";
+      }
+      SleepFor(MonoDelta::FromMilliseconds(sleep_ms));
+    }
+  }
+
+  // If we ran out of time looping, run their function one more time
+  // without capturing its assertions. This way the assertions will
+  // propagate back out to the normal test reporter. Of course it's
+  // possible that it will pass on this last attempt, but that's OK
+  // too, since we aren't trying to be that strict about the deadline.
+  f();
+  if (testing::Test::HasFatalFailure()) {
+    ADD_FAILURE() << "Timed out waiting for assertion to pass.";
+  }
+}
+
+int CountOpenFds(Env* env, const string& path_pattern) {
+  static const char* kProcSelfFd =
+#if defined(__APPLE__)
+    "/dev/fd";
+#else
+    "/proc/self/fd";
+#endif // defined(__APPLE__)
+  faststring path_buf;
+  vector<string> children;
+  CHECK_OK(env->GetChildren(kProcSelfFd, &children));
+  int num_fds = 0;
+  for (const auto& c : children) {
+    // Skip '.' and '..'.
+    if (c == "." || c == "..") {
+      continue;
+    }
+    int32_t fd;
+    CHECK(safe_strto32(c, &fd)) << "Unexpected file in fd list: " << c;
+#ifdef __APPLE__
+    path_buf.resize(MAXPATHLEN);
+    if (fcntl(fd, F_GETPATH, path_buf.data()) != 0) {
+      if (errno == EBADF) {
+        // The file was closed while we were looping. This is likely the
+        // actual file descriptor used for opening /proc/fd itself.
+        continue;
+      }
+      PLOG(FATAL) << "Unknown error in fcntl(F_GETPATH): " << fd;
+    }
+    char* buf_data = reinterpret_cast<char*>(path_buf.data());
+    path_buf.resize(strlen(buf_data));
+#else
+    path_buf.resize(PATH_MAX);
+    char* buf_data = reinterpret_cast<char*>(path_buf.data());
+    auto proc_file = JoinPathSegments(kProcSelfFd, c);
+    int path_len = readlink(proc_file.c_str(), buf_data, path_buf.size());
+    if (path_len < 0) {
+      if (errno == ENOENT) {
+        // The file was closed while we were looping. This is likely the
+        // actual file descriptor used for opening /proc/fd itself.
+        continue;
+      }
+      PLOG(FATAL) << "Unknown error in readlink: " << proc_file;
+    }
+    path_buf.resize(path_len);
+#endif
+    if (!MatchPattern(path_buf.ToString(), path_pattern)) {
+      continue;
+    }
+    num_fds++;
+  }
+
+  return num_fds;
+}
+
+namespace {
+Status WaitForBind(pid_t pid, uint16_t* port, const char* kind, MonoDelta timeout) {
+  // In general, processes do not expose the port they bind to, and
+  // reimplementing lsof involves parsing a lot of files in /proc/. So,
+  // requiring lsof for tests and parsing its output seems more
+  // straight-forward. We call lsof in a loop since it typically takes a long
+  // time for it to initialize and bind a port.
+
+  string lsof;
+  RETURN_NOT_OK(FindExecutable("lsof", {"/sbin", "/usr/sbin"}, &lsof));
+
+  const vector<string> cmd = {
+    lsof, "-wbnP", "-Ffn",
+    "-p", std::to_string(pid),
+    "-a", "-i", kind
+  };
+
+  MonoTime deadline = MonoTime::Now() + timeout;
+  string lsof_out;
+
+  for (int64_t i = 1; ; i++) {
+    lsof_out.clear();
+    Status s = Subprocess::Call(cmd, "", &lsof_out);
+
+    if (s.ok()) {
+      StripTrailingNewline(&lsof_out);
+      break;
+    }
+    if (deadline < MonoTime::Now()) {
+      return s;
+    }
+
+    SleepFor(MonoDelta::FromMilliseconds(i * 10));
+  }
+
+  // The '-Ffn' flag gets lsof to output something like:
+  //   p19730
+  //   f123
+  //   n*:41254
+  // The first line is the pid. We ignore it.
+  // The second line is the file descriptor number. We ignore it.
+  // The third line has the bind address and port.
+  // Subsequent lines show active connections.
+  vector<string> lines = strings::Split(lsof_out, "\n");
+  int32_t p = -1;
+  if (lines.size() < 3 ||
+      lines[2].substr(0, 3) != "n*:" ||
+      !safe_strto32(lines[2].substr(3), &p) ||
+      p <= 0) {
+    return Status::RuntimeError("unexpected lsof output", lsof_out);
+  }
+  CHECK(p > 0 && p < std::numeric_limits<uint16_t>::max()) << "parsed invalid port: " << p;
+  VLOG(1) << "Determined bound port: " << p;
+  *port = p;
+  return Status::OK();
+}
+} // anonymous namespace
+
+Status WaitForTcpBind(pid_t pid, uint16_t* port, MonoDelta timeout) {
+  return WaitForBind(pid, port, "4TCP", timeout);
+}
+
+Status WaitForUdpBind(pid_t pid, uint16_t* port, MonoDelta timeout) {
+  return WaitForBind(pid, port, "4UDP", timeout);
+}
+
+} // namespace kudu

http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/test_util.h
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/test_util.h b/be/src/kudu/util/test_util.h
new file mode 100644
index 0000000..8090fbc
--- /dev/null
+++ b/be/src/kudu/util/test_util.h
@@ -0,0 +1,146 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+// Base test class, with various utility functions.
+#ifndef KUDU_UTIL_TEST_UTIL_H
+#define KUDU_UTIL_TEST_UTIL_H
+
+#include <sys/types.h>
+
+#include <cstdint>
+#include <functional>
+#include <memory>
+#include <string>
+
+#include <gtest/gtest.h>
+
+#include "kudu/gutil/port.h"
+#include "kudu/util/monotime.h"
+
+#define ASSERT_EVENTUALLY(expr) do { \
+  AssertEventually(expr); \
+  NO_PENDING_FATALS(); \
+} while (0)
+
+namespace google {
+class FlagSaver;
+} // namespace google
+
+namespace kudu {
+
+class Env;
+class Status;
+
+extern const char* kInvalidPath;
+
+class KuduTest : public ::testing::Test {
+ public:
+  KuduTest();
+
+  virtual ~KuduTest();
+
+  virtual void SetUp() OVERRIDE;
+
+  // Tests assume that they run with no outside-provided kerberos credentials, and if the
+  // user happened to have some credentials available they might fail due to being already
+  // kinitted to a different realm, etc. This function overrides the relevant environment
+  // variables so that we don't pick up the user's credentials.
+  static void OverrideKrb5Environment();
+
+ protected:
+  // Returns absolute path based on a unit test-specific work directory, given
+  // a relative path. Useful for writing test files that should be deleted after
+  // the test ends.
+  std::string GetTestPath(const std::string& relative_path) const;
+
+  Env* env_;
+
+  // Reset flags on every test. Allocated on the heap so it can be destroyed
+  // (and the flags reset) before test_dir_ is deleted.
+  std::unique_ptr<google::FlagSaver> flag_saver_;
+
+  std::string test_dir_;
+};
+
+// Returns true if slow tests are runtime-enabled.
+bool AllowSlowTests();
+
+// Override the given gflag to the new value, only in the case that
+// slow tests are enabled and the user hasn't otherwise overridden
+// it on the command line.
+// Example usage:
+//
+// OverrideFlagForSlowTests(
+//     "client_inserts_per_thread",
+//     strings::Substitute("$0", FLAGS_client_inserts_per_thread * 100));
+//
+void OverrideFlagForSlowTests(const std::string& flag_name,
+                              const std::string& new_value);
+
+// Call srand() with a random seed based on the current time, reporting
+// that seed to the logs. The time-based seed may be overridden by passing
+// --test_random_seed= from the CLI in order to reproduce a failed randomized
+// test. Returns the seed.
+int SeedRandom();
+
+// Return a per-test directory in which to store test data. Guaranteed to
+// return the same directory every time for a given unit test.
+//
+// May only be called from within a gtest unit test. Prefer KuduTest::test_dir_
+// if a KuduTest instance is available.
+std::string GetTestDataDirectory();
+
+// Return the directory which contains the test's executable.
+std::string GetTestExecutableDirectory();
+
+// Wait until 'f()' succeeds without adding any GTest 'fatal failures'.
+// For example:
+//
+//   AssertEventually([]() {
+//     ASSERT_GT(ReadValueOfMetric(), 10);
+//   });
+//
+// The function is run in a loop with optional back-off.
+//
+// To check whether AssertEventually() eventually succeeded, call
+// NO_PENDING_FATALS() afterward, or use ASSERT_EVENTUALLY() which performs
+// this check automatically.
+enum class AssertBackoff {
+  // Use exponential back-off while looping, capped at one second.
+  EXPONENTIAL,
+
+  // Sleep for a millisecond while looping.
+  NONE,
+};
+void AssertEventually(const std::function<void(void)>& f,
+                      const MonoDelta& timeout = MonoDelta::FromSeconds(30),
+                      AssertBackoff backoff = AssertBackoff::EXPONENTIAL);
+
+// Count the number of open file descriptors in use by this process.
+// 'path_pattern' is a glob-style pattern. Only paths that match this
+// pattern are included. Note that '*' in this pattern is recursive
+// unlike the usual behavior of path globs.
+int CountOpenFds(Env* env, const std::string& path_pattern);
+
+// Waits for the subprocess to bind to any listening TCP port, and returns the port.
+Status WaitForTcpBind(pid_t pid, uint16_t* port, MonoDelta timeout) WARN_UNUSED_RESULT;
+
+// Waits for the subprocess to bind to any listening UDP port, and returns the port.
+Status WaitForUdpBind(pid_t pid, uint16_t* port, MonoDelta timeout) WARN_UNUSED_RESULT;
+
+} // namespace kudu
+#endif

http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/test_util_prod.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/test_util_prod.cc b/be/src/kudu/util/test_util_prod.cc
new file mode 100644
index 0000000..5523bac
--- /dev/null
+++ b/be/src/kudu/util/test_util_prod.cc
@@ -0,0 +1,28 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "kudu/util/test_util_prod.h"
+
+#include <dlfcn.h>
+
+namespace kudu {
+
+bool IsGTest() {
+  return dlsym(RTLD_DEFAULT, "_ZN4kudu10g_is_gtestE") != nullptr;
+}
+
+} // namespace kudu

http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/test_util_prod.h
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/test_util_prod.h b/be/src/kudu/util/test_util_prod.h
new file mode 100644
index 0000000..8b7ea61
--- /dev/null
+++ b/be/src/kudu/util/test_util_prod.h
@@ -0,0 +1,32 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// Test-related utility methods that can be called from non-test
+// code. This module is part of the 'util' module and is built into
+// all binaries, not just tests, whereas 'test_util.cc' is linked
+// only into test binaries.
+
+#pragma once
+
+namespace kudu {
+
+// Return true if the current binary is a gtest. More specifically,
+// returns true if the 'test_util.cc' module has been linked in
+// (either dynamically or statically) to the running process.
+bool IsGTest();
+
+} // namespace kudu

http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/thread-test.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/thread-test.cc b/be/src/kudu/util/thread-test.cc
new file mode 100644
index 0000000..d3ee733
--- /dev/null
+++ b/be/src/kudu/util/thread-test.cc
@@ -0,0 +1,160 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "kudu/util/thread.h"
+
+#include <sys/types.h>
+#include <unistd.h>
+
+#include <ostream>
+#include <string>
+#include <vector>
+
+#include <glog/logging.h>
+#include <gtest/gtest.h>
+
+#include "kudu/gutil/basictypes.h"
+#include "kudu/gutil/ref_counted.h"
+#include "kudu/util/countdown_latch.h"
+#include "kudu/util/env.h"
+#include "kudu/util/status.h"
+#include "kudu/util/stopwatch.h"
+#include "kudu/util/test_macros.h"
+#include "kudu/util/test_util.h"
+#include "kudu/util/thread_restrictions.h"
+
+using std::string;
+
+namespace kudu {
+
+class ThreadTest : public KuduTest {};
+
+// Join with a thread and emit warnings while waiting to join.
+// This has to be manually verified.
+TEST_F(ThreadTest, TestJoinAndWarn) {
+  if (!AllowSlowTests()) {
+    LOG(INFO) << "Skipping test in quick test mode, since this sleeps";
+    return;
+  }
+
+  scoped_refptr<Thread> holder;
+  ASSERT_OK(Thread::Create("test", "sleeper thread", usleep, 1000*1000, &holder));
+  ASSERT_OK(ThreadJoiner(holder.get())
+                   .warn_after_ms(10)
+                   .warn_every_ms(100)
+                   .Join());
+}
+
+TEST_F(ThreadTest, TestFailedJoin) {
+  if (!AllowSlowTests()) {
+    LOG(INFO) << "Skipping test in quick test mode, since this sleeps";
+    return;
+  }
+
+  scoped_refptr<Thread> holder;
+  ASSERT_OK(Thread::Create("test", "sleeper thread", usleep, 1000*1000, &holder));
+  Status s = ThreadJoiner(holder.get())
+    .give_up_after_ms(50)
+    .Join();
+  ASSERT_STR_CONTAINS(s.ToString(), "Timed out after 50ms joining on sleeper thread");
+}
+
+static void TryJoinOnSelf() {
+  Status s = ThreadJoiner(Thread::current_thread()).Join();
+  // Use CHECK instead of ASSERT because gtest isn't thread-safe.
+  CHECK(s.IsInvalidArgument());
+}
+
+// Try to join on the thread that is currently running.
+TEST_F(ThreadTest, TestJoinOnSelf) {
+  scoped_refptr<Thread> holder;
+  ASSERT_OK(Thread::Create("test", "test", TryJoinOnSelf, &holder));
+  holder->Join();
+  // Actual assertion is done by the thread spawned above.
+}
+
+TEST_F(ThreadTest, TestDoubleJoinIsNoOp) {
+  scoped_refptr<Thread> holder;
+  ASSERT_OK(Thread::Create("test", "sleeper thread", usleep, 0, &holder));
+  ThreadJoiner joiner(holder.get());
+  ASSERT_OK(joiner.Join());
+  ASSERT_OK(joiner.Join());
+}
+
+TEST_F(ThreadTest, ThreadStartBenchmark) {
+  std::vector<scoped_refptr<Thread>> threads(1000);
+  LOG_TIMING(INFO, "starting threads") {
+    for (auto& t : threads) {
+      ASSERT_OK(Thread::Create("test", "TestCallOnExit", usleep, 0, &t));
+    }
+  }
+  LOG_TIMING(INFO, "waiting for all threads to publish TIDs") {
+    for (auto& t : threads) {
+      t->tid();
+    }
+  }
+
+  for (auto& t : threads) {
+    t->Join();
+  }
+}
+
+// The following tests only run in debug mode, since thread restrictions are no-ops
+// in release builds.
+#ifndef NDEBUG
+TEST_F(ThreadTest, TestThreadRestrictions_IO) {
+  // Default should be to allow IO
+  ThreadRestrictions::AssertIOAllowed();
+
+  ThreadRestrictions::SetIOAllowed(false);
+  {
+    ThreadRestrictions::ScopedAllowIO allow_io;
+    ASSERT_TRUE(Env::Default()->FileExists("/"));
+  }
+  ThreadRestrictions::SetIOAllowed(true);
+
+  // Disallow IO - doing IO should crash the process.
+  ASSERT_DEATH({
+      ThreadRestrictions::SetIOAllowed(false);
+      ignore_result(Env::Default()->FileExists("/"));
+    },
+    "Function marked as IO-only was called from a thread that disallows IO");
+}
+
+TEST_F(ThreadTest, TestThreadRestrictions_Waiting) {
+  // Default should be to allow IO
+  ThreadRestrictions::AssertWaitAllowed();
+
+  ThreadRestrictions::SetWaitAllowed(false);
+  {
+    ThreadRestrictions::ScopedAllowWait allow_wait;
+    CountDownLatch l(0);
+    l.Wait();
+  }
+  ThreadRestrictions::SetWaitAllowed(true);
+
+  // Disallow waiting - blocking on a latch should crash the process.
+  ASSERT_DEATH({
+      ThreadRestrictions::SetWaitAllowed(false);
+      CountDownLatch l(0);
+      l.Wait();
+    },
+    "Waiting is not allowed to be used on this thread");
+}
+#endif // NDEBUG
+
+} // namespace kudu