You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by ta...@apache.org on 2018/07/13 06:03:17 UTC
[05/51] [abbrv] impala git commit: IMPALA-7006: Add KRPC folders from
kudu@334ecafd
http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/subprocess.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/subprocess.cc b/be/src/kudu/util/subprocess.cc
new file mode 100644
index 0000000..d68cb7f
--- /dev/null
+++ b/be/src/kudu/util/subprocess.cc
@@ -0,0 +1,815 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "kudu/util/subprocess.h"
+
+#include <dirent.h>
+#include <fcntl.h>
+#include <signal.h>
+#if defined(__linux__)
+#include <sys/prctl.h>
+#endif
+#include <sys/wait.h>
+#include <unistd.h>
+
+#include <cerrno>
+#include <cstdint>
+#include <cstdlib>
+#include <cstring>
+#include <functional>
+#include <memory>
+#include <ostream>
+#include <string>
+#include <utility>
+#include <vector>
+
+#include <ev++.h>
+#include <glog/logging.h>
+#include <glog/stl_logging.h>
+
+#include "kudu/gutil/basictypes.h"
+#include "kudu/gutil/port.h"
+#include "kudu/gutil/strings/join.h"
+#include "kudu/gutil/strings/numbers.h"
+#include "kudu/gutil/strings/split.h"
+#include "kudu/gutil/strings/substitute.h"
+#include "kudu/util/env.h"
+#include "kudu/util/errno.h"
+#include "kudu/util/faststring.h"
+#include "kudu/util/monotime.h"
+#include "kudu/util/path_util.h"
+#include "kudu/util/signal.h"
+#include "kudu/util/status.h"
+#include "kudu/util/stopwatch.h"
+
+using std::map;
+using std::string;
+using std::unique_ptr;
+using std::vector;
+using strings::Split;
+using strings::Substitute;
+using strings::SubstituteAndAppend;
+
+namespace kudu {
+
+// Make glog's STL-compatible operators visible inside this namespace.
+using ::operator<<;
+
+namespace {
+
+static double kProcessWaitTimeoutSeconds = 5.0;
+
+static const char* kProcSelfFd =
+#if defined(__APPLE__)
+ "/dev/fd";
+#else
+ "/proc/self/fd";
+#endif // defined(__APPLE__)
+
+#if defined(__linux__)
+#define READDIR readdir64
+#define DIRENT dirent64
+#else
+#define READDIR readdir
+#define DIRENT dirent
+#endif
+
+// Opens a directory stream over this process' open file descriptors
+// (kProcSelfFd) and stores it in '*dir'.
+//
+// Since opendir() calls malloc(), this must be called before fork().
+// This function is not async-signal-safe.
+//
+// Returns Status::IOError carrying errno if opendir() fails, in which case
+// '*dir' is left as nullptr; returns Status::OK() otherwise.
+Status OpenProcFdDir(DIR** dir) {
+  *dir = opendir(kProcSelfFd);
+  // Test the stream returned by opendir(), not the out-parameter pointer
+  // itself (which was just dereferenced): checking 'dir' would let a failed
+  // opendir() go unnoticed.
+  if (PREDICT_FALSE(*dir == nullptr)) {
+    // Capture errno before building the message so nothing can clobber it.
+    const int err = errno;
+    return Status::IOError(Substitute("opendir(\"$0\") failed", kProcSelfFd),
+                           ErrnoToString(err), err);
+  }
+  return Status::OK();
+}
+
+// Releases a directory stream previously obtained from OpenProcFdDir().
+// A closedir() failure is not fatal: it is only logged as a warning.
+// This function is not async-signal-safe.
+void CloseProcFdDir(DIR* dir) {
+  const int rc = closedir(dir);
+  if (PREDICT_FALSE(rc == -1)) {
+    const Status close_status =
+        Status::IOError(Substitute("closedir(\"$0\") failed", kProcSelfFd),
+                        ErrnoToString(errno), errno);
+    LOG(WARNING) << "Unable to close fd dir: " << close_status.ToString();
+  }
+}
+
+// Closes every open file descriptor except stdin, stdout, stderr and the
+// descriptor backing 'fd_dir' itself.
+// 'fd_dir' must be a directory stream created by OpenProcFdDir().
+// This runs after fork(), so it must not call malloc(); as a rule of thumb,
+// only async-signal-safe functions should be used here whenever possible.
+void CloseNonStandardFDs(DIR* fd_dir) {
+  // The open descriptors are enumerated from the fd directory instead of
+  // looping up to sysconf(SC_OPEN_MAX): the latter is error prone because it
+  // may not reflect the highest open fd if the soft limit changed after the
+  // process started, and walking every possible fd is likely to cost 64k+
+  // syscalls in typical configurations.
+  //
+  // None of the Env utility functions are used, to keep this as lean as
+  // possible: we are in the subprocess right after a fork, so global locks
+  // inside malloc() might be held and allocating memory is off limits.
+  PCHECK(fd_dir != nullptr);
+  const int dir_fd = dirfd(fd_dir);
+
+  // readdir64() is not reentrant (it uses a static buffer) and takes
+  // fd_dir->lock, so it is neither safe in a multi-threaded environment nor
+  // async-signal-safe. It does appear to be safe right after fork(), though,
+  // since the child has exactly one thread at that point, and it does not
+  // call malloc() or free(). readdir64_r() would only add reentrancy, not
+  // async-signal-safety (it also uses dir->lock), so it does not seem worth
+  // the extra lifecycle and plumbing complexity.
+  for (struct DIRENT* entry = READDIR(fd_dir);
+       entry != nullptr;
+       entry = READDIR(fd_dir)) {
+    uint32_t fd;
+    if (!safe_strtou32(entry->d_name, &fd)) continue;
+
+    // Leave the standard streams and the directory's own descriptor open.
+    const bool keep_open = fd == STDIN_FILENO ||
+                           fd == STDOUT_FILENO ||
+                           fd == STDERR_FILENO ||
+                           fd == dir_fd;
+    if (keep_open) continue;
+
+    int ret;
+    RETRY_ON_EINTR(ret, close(fd));
+  }
+}
+
+// Makes 'fd' refer to /dev/null (opened write-only).
+//
+// If /dev/null cannot be opened, a warning is logged and 'fd' is left
+// untouched. A failing dup2() is fatal (PCHECK).
+void RedirectToDevNull(int fd) {
+  // We must not close stderr or stdout, because then when a new file
+  // descriptor is opened, it might reuse the closed file descriptor's number
+  // (we always allocate the lowest available file descriptor number).
+  //
+  // Instead, we open /dev/null as a new file descriptor, then use dup2() to
+  // atomically close 'fd' and reuse its file descriptor number as an open file
+  // handle to /dev/null.
+  //
+  // It is expected that the file descriptor allocated when opening /dev/null
+  // will be closed when the child process closes all of its "non-standard"
+  // file descriptors later on.
+  int dev_null;
+  RETRY_ON_EINTR(dev_null, open("/dev/null", O_WRONLY));
+  if (dev_null < 0) {
+    PLOG(WARNING) << "failed to open /dev/null";
+    return;
+  }
+
+  int ret;
+  RETRY_ON_EINTR(ret, dup2(dev_null, fd));
+  // dup2() returns the target descriptor on success and -1 on failure, so a
+  // bare truthiness check would accept -1 (and reject a legitimate 0).
+  // Compare against 'fd', as the other dup2() call sites in this file do.
+  PCHECK(ret == fd);
+}
+
+// libev read watcher that accumulates everything readable from a single fd.
+// Used by ReadFdsFully(); one instance is created per descriptor.
+class ReadFdsFullyHelper {
+ public:
+  // Registers a READ watcher for 'fd' on 'loop', with this object as the
+  // callback, and starts it. 'progname' is only used in error messages.
+  ReadFdsFullyHelper(string progname, ev::dynamic_loop* loop, int fd)
+      : progname_(std::move(progname)) {
+    watcher_.set(*loop);         // attach to the caller's loop
+    watcher_.set(this);          // dispatch events to operator()
+    watcher_.set(fd, ev::READ);  // watch 'fd' for readability
+
+    // From now on the watcher is polled whenever its loop runs.
+    watcher_.start();
+  }
+
+  // Watcher callback: reads one chunk from the fd. Data is appended to the
+  // output; EOF or a read error stops the watcher (an error is also recorded
+  // in status()).
+  void operator() (ev::io &w, int revents) {
+    DCHECK_EQ(ev::READ, revents);
+
+    char chunk[1024];
+    ssize_t nread;
+    RETRY_ON_EINTR(nread, read(w.fd, chunk, arraysize(chunk)));
+    if (nread > 0) {
+      // Got data: keep it and stay subscribed.
+      output_.append(chunk, nread);
+      return;
+    }
+    if (nread < 0) {
+      // Fatal read error: remember it before unsubscribing.
+      status_ = Status::IOError("IO error reading from " + progname_,
+                                ErrnoToString(errno), errno);
+    }
+    // Either EOF (nread == 0) or an error: nothing more to read.
+    w.stop();
+  }
+
+  // First read error encountered, or OK if none.
+  const Status& status() const { return status_; }
+  // All bytes read so far.
+  const string& output() const { return output_; }
+
+ private:
+  const string progname_;
+
+  ev::io watcher_;
+  string output_;
+  Status status_;
+};
+
+// Drains every descriptor in 'fds' until each one reaches EOF.
+//
+// If any read failed, the first such error (in 'fds' order) is returned and
+// 'out' is left untouched. Otherwise the collected bytes are appended to
+// 'out', one string per fd, in the same order as 'fds'.
+Status ReadFdsFully(const string& progname,
+                    const vector<int>& fds,
+                    vector<string>* out) {
+  ev::dynamic_loop loop;
+
+  // One watcher per descriptor, all registered on the same loop.
+  vector<unique_ptr<ReadFdsFullyHelper>> watchers;
+  for (vector<int>::size_type i = 0; i < fds.size(); ++i) {
+    watchers.emplace_back(new ReadFdsFullyHelper(progname, &loop, fds[i]));
+  }
+
+  // Runs until every watcher has stopped itself, i.e. all fds hit EOF
+  // (or failed).
+  loop.run();
+
+  // Surface the first failure, if any.
+  for (const auto& watcher : watchers) {
+    const Status& read_status = watcher->status();
+    if (!read_status.ok()) {
+      return read_status;
+    }
+  }
+
+  // All reads succeeded: hand the captured output back to the caller.
+  for (const auto& watcher : watchers) {
+    out->push_back(watcher->output());
+  }
+  return Status::OK();
+}
+
+} // anonymous namespace
+
+// Records the command line and the signal to deliver on destruction.
+// Initially stdin is PIPED, stdout/stderr are SHARED, and no parent-side
+// pipe descriptors exist (-1). 'argv' must contain at least the program.
+Subprocess::Subprocess(vector<string> argv, int sig_on_destruct)
+    : program_(argv[0]),
+      argv_(std::move(argv)),
+      state_(kNotStarted),
+      child_pid_(-1),
+      fd_state_(),
+      child_fds_(),
+      sig_on_destruct_(sig_on_destruct) {
+  // By convention, the first argument in argv is the base name of the program.
+  argv_[0] = BaseName(argv_[0]);
+
+  // Set the default mode of each standard stream and mark its parent-side
+  // descriptor as absent.
+  for (int fd = STDIN_FILENO; fd <= STDERR_FILENO; ++fd) {
+    fd_state_[fd] = (fd == STDIN_FILENO) ? PIPED : SHARED;
+    child_fds_[fd] = -1;
+  }
+}
+
+// If the child is still running, signals it with 'sig_on_destruct_' and waits
+// for it (a failure to do so is only logged). Then closes any parent-side
+// pipe descriptors that are still owned by this object.
+Subprocess::~Subprocess() {
+  if (state_ == kRunning) {
+    LOG(WARNING) << Substitute(
+        "Child process $0 ($1) was orphaned. Sending signal $2...",
+        child_pid_, JoinStrings(argv_, " "), sig_on_destruct_);
+    WARN_NOT_OK(KillAndWait(sig_on_destruct_),
+                Substitute("Failed to KillAndWait() with signal $0",
+                           sig_on_destruct_));
+  }
+
+  for (int i = 0; i < 3; ++i) {
+    // Only piped streams whose descriptor has not been released need closing.
+    const bool owns_pipe = fd_state_[i] == PIPED && child_fds_[i] >= 0;
+    if (!owns_pipe) continue;
+    int ret;
+    RETRY_ON_EINTR(ret, close(child_fds_[i]));
+  }
+}
+
+#if defined(__APPLE__)
+// Minimal replacement for pipe2(), compiled only on Apple platforms.
+// Only 'flags' == O_CLOEXEC is supported (enforced with a DCHECK).
+//
+// On success stores the read end in pipefd[0] and the write end in
+// pipefd[1], both marked close-on-exec, and returns 0. On failure returns -1
+// with errno describing the failed pipe()/fcntl() call and leaves 'pipefd'
+// untouched.
+static int pipe2(int pipefd[2], int flags) {
+  DCHECK_EQ(O_CLOEXEC, flags);
+
+  int new_fds[2];
+  if (pipe(new_fds) == -1) {
+    return -1;
+  }
+  for (int fd : new_fds) {
+    // F_SETFD takes descriptor flags, i.e. FD_CLOEXEC. O_CLOEXEC is an open()
+    // flag with a different numeric value, so passing it here would not
+    // actually mark the descriptor close-on-exec.
+    if (fcntl(fd, F_SETFD, FD_CLOEXEC) == -1) {
+      // Keep the fcntl() error visible to the caller across the cleanup.
+      const int saved_errno = errno;
+      int ret;
+      RETRY_ON_EINTR(ret, close(new_fds[0]));
+      RETRY_ON_EINTR(ret, close(new_fds[1]));
+      errno = saved_errno;
+      return -1;
+    }
+  }
+  pipefd[0] = new_fds[0];
+  pipefd[1] = new_fds[1];
+  return 0;
+}
+#endif
+
+Status Subprocess::Start() { // Creates the pipes, forks, and execs program_ in the child.
+ VLOG(2) << "Invoking command: " << argv_;
+ if (state_ != kNotStarted) {
+ const string err_str = Substitute("$0: illegal sub-process state", state_);
+ LOG(DFATAL) << err_str;
+ return Status::IllegalState(err_str);
+ }
+ if (argv_.empty()) {
+ return Status::InvalidArgument("argv must have at least one elem");
+ }
+
+ // We explicitly set SIGPIPE to SIG_IGN here because we are using UNIX pipes.
+ IgnoreSigPipe();
+
+ vector<char*> argv_ptrs; // NULL-terminated view of argv_ in the form execvp() expects.
+ for (const string& arg : argv_) {
+ argv_ptrs.push_back(const_cast<char*>(arg.c_str()));
+ }
+ argv_ptrs.push_back(nullptr);
+
+ // Pipe from caller process to child's stdin
+ // [0] = stdin for child, [1] = how parent writes to it
+ int child_stdin[2] = {-1, -1};
+ if (fd_state_[STDIN_FILENO] == PIPED) {
+ PCHECK(pipe2(child_stdin, O_CLOEXEC) == 0);
+ }
+ // Pipe from child's stdout back to caller process
+ // [0] = how parent reads from child's stdout, [1] = how child writes to it
+ int child_stdout[2] = {-1, -1};
+ if (fd_state_[STDOUT_FILENO] == PIPED) {
+ PCHECK(pipe2(child_stdout, O_CLOEXEC) == 0);
+ }
+ // Pipe from child's stderr back to caller process
+ // [0] = how parent reads from child's stderr, [1] = how child writes to it
+ int child_stderr[2] = {-1, -1};
+ if (fd_state_[STDERR_FILENO] == PIPED) {
+ PCHECK(pipe2(child_stderr, O_CLOEXEC) == 0);
+ }
+ // The synchronization pipe: this trick is to make sure the parent returns
+ // control only after the child process has invoked execvp().
+ int sync_pipe[2];
+ PCHECK(pipe2(sync_pipe, O_CLOEXEC) == 0);
+
+ DIR* fd_dir = nullptr; // Opened before fork() because opendir() allocates memory.
+ RETURN_NOT_OK_PREPEND(OpenProcFdDir(&fd_dir), "Unable to open fd dir");
+ unique_ptr<DIR, std::function<void(DIR*)>> fd_dir_closer(fd_dir,
+ CloseProcFdDir);
+ int ret;
+ RETRY_ON_EINTR(ret, fork());
+ if (ret == -1) {
+ return Status::RuntimeError("Unable to fork", ErrnoToString(errno), errno);
+ }
+ if (ret == 0) { // We are the child
+ // Send the child a SIGKILL when the parent dies. This is done as early
+ // as possible in the child's life to prevent any orphaning whatsoever
+ // (e.g. from KUDU-402).
+#if defined(__linux__)
+ // TODO: prctl(PR_SET_PDEATHSIG) is Linux-specific, look into portable ways
+ // to prevent orphans when parent is killed.
+ prctl(PR_SET_PDEATHSIG, SIGKILL);
+#endif
+
+ // stdin
+ if (fd_state_[STDIN_FILENO] == PIPED) {
+ int dup2_ret;
+ RETRY_ON_EINTR(dup2_ret, dup2(child_stdin[0], STDIN_FILENO));
+ PCHECK(dup2_ret == STDIN_FILENO);
+ } else {
+ DCHECK_EQ(SHARED, fd_state_[STDIN_FILENO]);
+ }
+
+ // stdout
+ switch (fd_state_[STDOUT_FILENO]) {
+ case PIPED: {
+ int dup2_ret;
+ RETRY_ON_EINTR(dup2_ret, dup2(child_stdout[1], STDOUT_FILENO));
+ PCHECK(dup2_ret == STDOUT_FILENO);
+ break;
+ }
+ case DISABLED: {
+ RedirectToDevNull(STDOUT_FILENO);
+ break;
+ }
+ default:
+ DCHECK_EQ(SHARED, fd_state_[STDOUT_FILENO]);
+ break;
+ }
+
+ // stderr
+ switch (fd_state_[STDERR_FILENO]) {
+ case PIPED: {
+ int dup2_ret;
+ RETRY_ON_EINTR(dup2_ret, dup2(child_stderr[1], STDERR_FILENO));
+ PCHECK(dup2_ret == STDERR_FILENO);
+ break;
+ }
+ case DISABLED: {
+ RedirectToDevNull(STDERR_FILENO);
+ break;
+ }
+ default:
+ DCHECK_EQ(SHARED, fd_state_[STDERR_FILENO]);
+ break;
+ }
+
+ // Close the read side of the sync pipe;
+ // the write side should be closed upon execvp().
+ int close_ret;
+ RETRY_ON_EINTR(close_ret, close(sync_pipe[0]));
+ PCHECK(close_ret == 0);
+
+ CloseNonStandardFDs(fd_dir);
+
+ // Ensure we are not ignoring or blocking signals in the child process.
+ ResetAllSignalMasksToUnblocked();
+
+ // Reset the disposition of SIGPIPE to SIG_DFL because we routinely set its
+ // disposition to SIG_IGN via IgnoreSigPipe(). At the time of writing, we
+ // don't explicitly ignore any other signals in Kudu.
+ ResetSigPipeHandlerToDefault();
+
+ // Set the current working directory of the subprocess.
+ if (!cwd_.empty()) {
+ PCHECK(chdir(cwd_.c_str()) == 0);
+ }
+
+ // Set the environment for the subprocess. This is more portable than
+ // using execvpe(), which doesn't exist on OS X. We rely on the 'p'
+ // variant of exec to do $PATH searching if the executable specified
+ // by the caller isn't an absolute path.
+ for (const auto& env : env_) {
+ ignore_result(setenv(env.first.c_str(), env.second.c_str(), 1 /* overwrite */));
+ }
+
+ execvp(program_.c_str(), &argv_ptrs[0]);
+ int err = errno; // Only reached if execvp() failed.
+ PLOG(ERROR) << "Couldn't exec " << program_;
+ _exit(err); // The exec errno becomes the child's exit code.
+ } else {
+ // We are the parent
+ child_pid_ = ret;
+ // Close child's side of the pipes
+ int close_ret;
+ if (fd_state_[STDIN_FILENO] == PIPED) RETRY_ON_EINTR(close_ret, close(child_stdin[0]));
+ if (fd_state_[STDOUT_FILENO] == PIPED) RETRY_ON_EINTR(close_ret, close(child_stdout[1]));
+ if (fd_state_[STDERR_FILENO] == PIPED) RETRY_ON_EINTR(close_ret, close(child_stderr[1]));
+ // Keep parent's side of the pipes
+ child_fds_[STDIN_FILENO] = child_stdin[1]; // These stay -1 for streams that are not PIPED.
+ child_fds_[STDOUT_FILENO] = child_stdout[0];
+ child_fds_[STDERR_FILENO] = child_stderr[0];
+
+ // Wait for the child process to invoke execvp(). The trick involves
+ // a pipe with O_CLOEXEC option for its descriptors. The parent process
+ // performs blocking read from the pipe while the write side of the pipe
+ // is kept open by the child (it does not write any data, though). The write
+ // side of the pipe is closed when the child invokes execvp(). At that
+ // point, the parent should receive EOF, i.e. read() should return 0.
+ {
+ // Close the write side of the sync pipe. It's crucial to make sure
+ // it succeeds otherwise the blocking read() below might wait forever
+ // even if the child process has closed the pipe.
+ RETRY_ON_EINTR(close_ret, close(sync_pipe[1]));
+ PCHECK(close_ret == 0);
+ while (true) { // Every path through the body breaks, returns, or aborts: one pass only.
+ uint8_t buf;
+ int err = 0;
+ int rc;
+ RETRY_ON_EINTR(rc, read(sync_pipe[0], &buf, 1));
+ if (rc == -1) {
+ err = errno; // Saved now because the close() below may overwrite errno.
+ }
+ RETRY_ON_EINTR(close_ret, close(sync_pipe[0]));
+ PCHECK(close_ret == 0);
+ if (rc == 0) {
+ // That's OK -- expecting EOF from the other side of the pipe.
+ break;
+ } else if (rc == -1) {
+ // Other errors besides EINTR are not expected.
+ return Status::RuntimeError("Unexpected error from the sync pipe",
+ ErrnoToString(err), err);
+ }
+ // No data is expected from the sync pipe.
+ LOG(FATAL) << Substitute("$0: unexpected data from the sync pipe", rc);
+ }
+ }
+ }
+
+ state_ = kRunning;
+ return Status::OK();
+}
+
+// Blocking wait: forwards to DoWait() in BLOCKING mode.
+Status Subprocess::Wait(int* wait_status) {
+  const WaitMode mode = BLOCKING;
+  return DoWait(wait_status, mode);
+}
+
+// Non-blocking wait: forwards to DoWait() in NON_BLOCKING mode, which
+// returns TimedOut if the child has not exited yet.
+Status Subprocess::WaitNoBlock(int* wait_status) {
+  const WaitMode mode = NON_BLOCKING;
+  return DoWait(wait_status, mode);
+}
+
+// Reads /proc/<pid>/stat and classifies the process as PAUSED (state letter
+// 'T') or RUNNING (any other state letter), storing the result in '*state'.
+//
+// Returns the read error if the stat file cannot be read, or RuntimeError if
+// its contents do not have the expected layout.
+Status Subprocess::GetProcfsState(int pid, ProcfsState* state) {
+  faststring data;
+  string filename = Substitute("/proc/$0/stat", pid);
+  RETURN_NOT_OK(ReadFileToString(Env::Default(), filename, &data));
+
+  // The part of /proc/<pid>/stat that's relevant for us looks like this:
+  //
+  //   "16009 (subprocess-test) R ..."
+  //
+  // The first number is the PID, the string in the parens is the command, and
+  // the single letter afterwards is the process' state.
+  //
+  // To extract the state, we scan backwards looking for the last ')', then
+  // step past it and the separating space. This is safer than scanning
+  // forward as it properly handles commands containing parens.
+  const string data_str = data.ToString();
+  const string::size_type end_parens = data_str.rfind(')');
+  // The state letter lives two characters after the ')'. Make sure that
+  // position exists, so truncated contents cannot cause a read past the end
+  // of the buffer.
+  if (end_parens == string::npos || end_parens + 2 >= data_str.size()) {
+    return Status::RuntimeError(Substitute("unexpected layout in $0", filename));
+  }
+  const char proc_state = data_str[end_parens + 2];
+
+  *state = (proc_state == 'T') ? ProcfsState::PAUSED : ProcfsState::RUNNING;
+  return Status::OK();
+}
+
+// Sends 'signal' to the child process.
+//
+// Returns IllegalState if the child is not running and RuntimeError if
+// kill() fails. For SIGSTOP and SIGCONT, additionally polls
+// /proc/<pid>/stat (best effort, for at most kProcessWaitTimeoutSeconds)
+// until the child appears to have reached the corresponding state; the
+// result is OK whether or not that state was observed.
+Status Subprocess::Kill(int signal) {
+  if (state_ != kRunning) {
+    const string err_str = "Sub-process is not running";
+    LOG(DFATAL) << err_str;
+    return Status::IllegalState(err_str);
+  }
+  if (kill(child_pid_, signal) != 0) {
+    return Status::RuntimeError("Unable to kill",
+                                ErrnoToString(errno),
+                                errno);
+  }
+
+  // Signal delivery is often asynchronous. For some signals, we try to wait
+  // for the process to actually change state, using /proc/<pid>/stat as a
+  // guide. This is best-effort.
+  ProcfsState desired_state;
+  switch (signal) {
+    case SIGSTOP:
+      desired_state = ProcfsState::PAUSED;
+      break;
+    case SIGCONT:
+      desired_state = ProcfsState::RUNNING;
+      break;
+    default:
+      // No observable state transition to wait for.
+      return Status::OK();
+  }
+  Stopwatch sw;
+  sw.start();
+  do {
+    ProcfsState current_state;
+    if (!GetProcfsState(child_pid_, &current_state).ok()) {
+      // There was some error parsing /proc/<pid>/stat (or perhaps it doesn't
+      // exist on this platform).
+      return Status::OK();
+    }
+    if (current_state == desired_state) {
+      return Status::OK();
+    }
+    SleepFor(MonoDelta::FromMilliseconds(10));
+  } while (sw.elapsed().wall_seconds() < kProcessWaitTimeoutSeconds);
+  // Timed out waiting for the transition; still not treated as an error.
+  return Status::OK();
+}
+
+// Sends 'signal' to the child and waits for it to exit.
+//
+// SIGKILL is followed by a blocking wait. For any other signal the child is
+// polled for up to kProcessWaitTimeoutSeconds; if it still has not exited,
+// the whole procedure is repeated with SIGKILL.
+Status Subprocess::KillAndWait(int signal) {
+  string procname = Substitute("$0 (pid $1)", argv0(), pid());
+
+  // A failure here is fatal: every error Kill() can report is independent of
+  // the signal, so retrying with SIGKILL would be just as likely to fail.
+  RETURN_NOT_OK_PREPEND(
+      Kill(signal), Substitute("Failed to send signal $0 to $1",
+                               signal, procname));
+
+  if (signal == SIGKILL) {
+    RETURN_NOT_OK_PREPEND(
+        Wait(), Substitute("Failed to wait on $0", procname));
+    return Status::OK();
+  }
+
+  Status wait_status;
+  Stopwatch timer;
+  timer.start();
+  do {
+    wait_status = WaitNoBlock();
+    if (wait_status.ok()) {
+      break;
+    }
+    if (!wait_status.IsTimedOut()) {
+      // An unexpected WaitNoBlock() error is likely to repeat itself, so
+      // give up right away instead of retrying.
+      RETURN_NOT_OK_PREPEND(
+          wait_status,
+          Substitute("Unexpected failure while waiting on $0", procname));
+    }
+    SleepFor(MonoDelta::FromMilliseconds(10));
+  } while (timer.elapsed().wall_seconds() < kProcessWaitTimeoutSeconds);
+
+  // Still alive after the grace period: escalate.
+  if (wait_status.IsTimedOut()) {
+    return KillAndWait(SIGKILL);
+  }
+  return Status::OK();
+}
+
+// Decodes the cached wait status of an exited child.
+//
+// '*exit_status' (if non-null) receives the exit code for a normal exit, the
+// signal number if the child was terminated by a signal, or -1 for any other
+// wait status. '*info_str' (if non-null) receives a human-readable summary.
+// Returns IllegalState unless the child's termination was already detected.
+Status Subprocess::GetExitStatus(int* exit_status, string* info_str) const {
+  if (state_ != kExited) {
+    const string err_str = "Sub-process termination hasn't yet been detected";
+    LOG(DFATAL) << err_str;
+    return Status::IllegalState(err_str);
+  }
+
+  int status;
+  string info;
+  if (WIFEXITED(wait_status_)) {
+    status = WEXITSTATUS(wait_status_);
+    info = (status == 0)
+        ? Substitute("$0: process successfully exited", program_)
+        : Substitute("$0: process exited with non-zero status $1",
+                     program_, status);
+  } else if (WIFSIGNALED(wait_status_)) {
+    // The signal number doubles as the exit status.
+    status = WTERMSIG(wait_status_);
+    info = Substitute("$0: process exited on signal $1", program_, status);
+#if defined(WCOREDUMP)
+    if (WCOREDUMP(wait_status_)) {
+      SubstituteAndAppend(&info, " (core dumped)");
+    }
+#endif
+  } else {
+    status = -1;
+    info = Substitute("$0: process reported unexpected wait status $1",
+                      program_, wait_status_);
+    LOG(DFATAL) << info;
+  }
+
+  if (exit_status != nullptr) {
+    *exit_status = status;
+  }
+  if (info_str != nullptr) {
+    *info_str = info;
+  }
+  return Status::OK();
+}
+
+// Convenience overload: splits 'arg_str' on single spaces and runs the
+// resulting command with empty stdin and without capturing any output.
+Status Subprocess::Call(const string& arg_str) {
+  const vector<string> args = Split(arg_str, " ");
+  return Call(args, "", nullptr, nullptr);
+}
+
+// Runs 'argv' to completion.
+//
+// 'stdin_in' is written to the child's stdin, which is then closed. If
+// 'stdout_out' / 'stderr_out' are non-null, the corresponding stream is piped
+// and everything the child wrote to it is stored there.
+//
+// Returns OK only if every step succeeded and the child's exit status was 0;
+// a non-zero exit status is reported as RuntimeError with a descriptive
+// message.
+Status Subprocess::Call(const vector<string>& argv,
+                        const string& stdin_in,
+                        string* stdout_out,
+                        string* stderr_out) {
+  Subprocess p(argv);
+
+  if (stdout_out) {
+    p.ShareParentStdout(false);
+  }
+  if (stderr_out) {
+    p.ShareParentStderr(false);
+  }
+  RETURN_NOT_OK_PREPEND(p.Start(),
+                        "Unable to fork " + argv[0]);
+
+  // Feed the child's stdin. write() may fail (-1) or accept only part of the
+  // buffer, so the signed result is checked explicitly and the remainder is
+  // retried. Comparing the ssize_t result directly against size() would
+  // convert -1 to a huge unsigned value and hide the failure.
+  size_t total_written = 0;
+  while (total_written < stdin_in.size()) {
+    ssize_t written;
+    RETRY_ON_EINTR(written, write(p.to_child_stdin_fd(),
+                                  stdin_in.data() + total_written,
+                                  stdin_in.size() - total_written));
+    if (written < 0) {
+      return Status::IOError("Unable to write to child process stdin",
+                             ErrnoToString(errno), errno);
+    }
+    total_written += static_cast<size_t>(written);
+  }
+
+  // Closing stdin signals EOF to the child.
+  int err;
+  RETRY_ON_EINTR(err, close(p.ReleaseChildStdinFd()));
+  if (PREDICT_FALSE(err != 0)) {
+    return Status::IOError("Unable to close child process stdin", ErrnoToString(errno), errno);
+  }
+
+  vector<int> fds;
+  if (stdout_out) {
+    fds.push_back(p.from_child_stdout_fd());
+  }
+  if (stderr_out) {
+    fds.push_back(p.from_child_stderr_fd());
+  }
+  vector<string> outv;
+  RETURN_NOT_OK(ReadFdsFully(argv[0], fds, &outv));
+
+  // Given that ReadFdsFully captures the strings in the order in which we
+  // had installed 'fds' above, it can be assured that we can receive
+  // as many strings as there were 'fds' in the vector and in that order.
+  // stdout (if requested) is therefore first and stderr (if requested) last.
+  CHECK_EQ(outv.size(), fds.size());
+  if (stdout_out) {
+    *stdout_out = std::move(outv.front());
+  }
+  if (stderr_out) {
+    *stderr_out = std::move(outv.back());
+  }
+
+  RETURN_NOT_OK_PREPEND(p.Wait(), "Unable to wait() for " + argv[0]);
+  int exit_status;
+  string exit_info_str;
+  RETURN_NOT_OK(p.GetExitStatus(&exit_status, &exit_info_str));
+  if (exit_status != 0) {
+    return Status::RuntimeError(exit_info_str);
+  }
+  return Status::OK();
+}
+
+// Returns the child's process id. The child must be running (CHECKed).
+pid_t Subprocess::pid() const {
+  CHECK_EQ(state_, kRunning);
+  const pid_t running_pid = child_pid_;
+  return running_pid;
+}
+
+// Shared implementation of Wait() and WaitNoBlock().
+//
+// If the child was already reaped, the cached wait status is reported again.
+// Otherwise waitpid() is called (with WNOHANG in NON_BLOCKING mode); on
+// success the status is cached and the object moves to kExited. Returns
+// TimedOut in NON_BLOCKING mode if the child has not exited yet, and
+// IllegalState if the process was never started.
+Status Subprocess::DoWait(int* wait_status, WaitMode mode) {
+  if (state_ != kExited) {
+    if (state_ != kRunning) {
+      const string err_str = Substitute("$0: illegal sub-process state", state_);
+      LOG(DFATAL) << err_str;
+      return Status::IllegalState(err_str);
+    }
+
+    int options = 0;
+    if (mode == NON_BLOCKING) {
+      options = WNOHANG;
+    }
+    int status;
+    int rc;
+    RETRY_ON_EINTR(rc, waitpid(child_pid_, &status, options));
+    if (rc == -1) {
+      return Status::RuntimeError("Unable to wait on child",
+                                  ErrnoToString(errno), errno);
+    }
+    if (rc == 0 && mode == NON_BLOCKING) {
+      // The child is still alive.
+      return Status::TimedOut("");
+    }
+    CHECK_EQ(rc, child_pid_);
+    CHECK(WIFEXITED(status) || WIFSIGNALED(status));
+
+    // Remember the outcome so later calls can report it again.
+    child_pid_ = -1;
+    wait_status_ = status;
+    state_ = kExited;
+  }
+
+  if (wait_status != nullptr) {
+    *wait_status = wait_status_;
+  }
+  return Status::OK();
+}
+
+// Replaces the set of environment overrides applied in the child.
+// Only allowed before the process is started (CHECKed).
+void Subprocess::SetEnvVars(map<string, string> env) {
+  CHECK_EQ(state_, kNotStarted);
+  // Take over the caller's map; the previous contents die with 'env'.
+  env_.swap(env);
+}
+
+// Sets the working directory the child switches to before exec.
+// Only allowed before the process is started (CHECKed).
+void Subprocess::SetCurrentDir(string cwd) {
+  CHECK_EQ(state_, kNotStarted);
+  // Take over the caller's string; the previous value dies with 'cwd'.
+  cwd_.swap(cwd);
+}
+
+// Marks standard stream 'stdfd' as SHARED with the parent ('share' == true)
+// or PIPED ('share' == false). Only allowed before the process is started.
+void Subprocess::SetFdShared(int stdfd, bool share) {
+  CHECK_EQ(state_, kNotStarted);
+  const StreamMode mode = share ? SHARED : PIPED;
+  fd_state_[stdfd] = mode;
+}
+
+// Marks the child's stderr as DISABLED. Only allowed before the process is
+// started (CHECKed).
+void Subprocess::DisableStderr() {
+  CHECK_EQ(state_, kNotStarted);
+  StreamMode& stderr_mode = fd_state_[STDERR_FILENO];
+  stderr_mode = DISABLED;
+}
+
+// Marks the child's stdout as DISABLED. Only allowed before the process is
+// started (CHECKed).
+void Subprocess::DisableStdout() {
+  CHECK_EQ(state_, kNotStarted);
+  StreamMode& stdout_mode = fd_state_[STDOUT_FILENO];
+  stdout_mode = DISABLED;
+}
+
+// Returns the parent-side pipe descriptor for standard stream 'stdfd'
+// without giving up ownership. The child must be running and the stream
+// must be PIPED (both CHECKed).
+int Subprocess::CheckAndOffer(int stdfd) const {
+  CHECK_EQ(state_, kRunning);
+  CHECK_EQ(fd_state_[stdfd], PIPED);
+  const int parent_side_fd = child_fds_[stdfd];
+  return parent_side_fd;
+}
+
+// Hands ownership of the parent-side pipe descriptor for 'stdfd' to the
+// caller: the descriptor is returned and this object forgets it (so the
+// destructor will not close it). The child must be running, the stream must
+// be PIPED, and the descriptor must not have been released already.
+int Subprocess::ReleaseChildFd(int stdfd) {
+  CHECK_EQ(state_, kRunning);
+  CHECK_GE(child_fds_[stdfd], 0);
+  CHECK_EQ(fd_state_[stdfd], PIPED);
+  int released = -1;
+  std::swap(released, child_fds_[stdfd]);
+  return released;
+}
+
+} // namespace kudu
http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/subprocess.h
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/subprocess.h b/be/src/kudu/util/subprocess.h
new file mode 100644
index 0000000..4d33c8f
--- /dev/null
+++ b/be/src/kudu/util/subprocess.h
@@ -0,0 +1,219 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#ifndef KUDU_UTIL_SUBPROCESS_H
+#define KUDU_UTIL_SUBPROCESS_H
+
+#include <signal.h>
+#include <unistd.h>
+
+#include <map>
+#include <string>
+#include <vector>
+
+#include <gtest/gtest_prod.h>
+
+#include "kudu/gutil/macros.h"
+#include "kudu/gutil/port.h"
+#include "kudu/util/status.h"
+
+namespace kudu {
+
+// Wrapper around a spawned subprocess.
+//
+// program will be treated as an absolute path unless it begins with a dot or a
+// slash.
+//
+// This takes care of creating pipes to/from the subprocess and offers
+// basic functionality to wait on it or send signals.
+// By default, child process only has stdin captured and separate from the parent.
+// The stdout/stderr streams are shared with the parent by default.
+//
+// The process may only be started and waited on/killed once.
+//
+// Optionally, user may change parent/child stream sharing. Also, a user may disable
+// a subprocess stream. A user cannot do both.
+//
+// Note that, when the Subprocess object is destructed while the child is still
+// running, the child is sent 'sig_on_destruct' (SIGKILL by default) and waited on.
+class Subprocess {
+ public:
+ // Constructs a new Subprocess that will execute 'argv' on Start().
+ //
+ // If the process isn't explicitly killed, 'sig_on_destruct' will be delivered
+ // to it when the Subprocess goes out of scope.
+ explicit Subprocess(std::vector<std::string> argv, int sig_on_destruct = SIGKILL);
+ ~Subprocess();
+
+ // Disables subprocess stream output. Is mutually exclusive with stream sharing.
+ //
+ // Must be called before subprocess starts.
+ void DisableStderr();
+ void DisableStdout();
+
+ // Configures the subprocess to share the parent's stream. Is mutually
+ // exclusive with stream disabling.
+ //
+ // Must be called before subprocess starts.
+ void ShareParentStdin(bool share = true) { SetFdShared(STDIN_FILENO, share); }
+ void ShareParentStdout(bool share = true) { SetFdShared(STDOUT_FILENO, share); }
+ void ShareParentStderr(bool share = true) { SetFdShared(STDERR_FILENO, share); }
+
+ // Add environment variables to be set before executing the subprocess.
+ //
+ // These environment variables are merged into the existing environment
+ // of the parent process. In other words, there is no need to prime this
+ // map with the current environment; instead, just specify any variables
+ // that should be overridden.
+ //
+ // Repeated calls to this function replace earlier calls.
+ void SetEnvVars(std::map<std::string, std::string> env);
+
+ // Set the initial current working directory of the subprocess.
+ //
+ // Must be set before starting the subprocess.
+ void SetCurrentDir(std::string cwd);
+
+ // Start the subprocess. Can only be called once.
+ //
+ // This returns a bad Status if the fork() fails. However,
+ // note that if the executable path was incorrect such that
+ // exec() fails, this will still return Status::OK. You must
+ // use Wait() to check for failure.
+ Status Start() WARN_UNUSED_RESULT;
+
+ // Wait for the subprocess to exit. If non-null, '*wait_status' receives the
+ // status reported by the waitpid() syscall. Only call after starting.
+ //
+ // NOTE: unlike the standard wait(2) call, this may be called multiple
+ // times. If the process has exited, it will repeatedly return the same
+ // exit code.
+ Status Wait(int* wait_status = nullptr) WARN_UNUSED_RESULT;
+
+ // Like the above, but does not block. This returns Status::TimedOut
+ // immediately if the child has not exited. Otherwise returns Status::OK
+ // and sets *wait_status (if non-null). Only call after starting.
+ //
+ // NOTE: unlike the standard wait(2) call, this may be called multiple
+ // times. If the process has exited, it will repeatedly return the same
+ // exit code.
+ Status WaitNoBlock(int* wait_status = nullptr) WARN_UNUSED_RESULT;
+
+ // Send a signal to the subprocess.
+ // Note that this does not reap the process -- you must still Wait()
+ // in order to reap it. Only call after starting.
+ Status Kill(int signal) WARN_UNUSED_RESULT;
+
+ // Sends a signal to the subprocess and waits for it to exit.
+ //
+ // If the signal is not SIGKILL and the process doesn't appear to be exiting,
+ // retries with SIGKILL.
+ Status KillAndWait(int signal);
+
+ // Retrieve exit status of the process awaited by Wait() and/or WaitNoBlock()
+ // methods. Must be called only after calling Wait()/WaitNoBlock().
+ Status GetExitStatus(int* exit_status, std::string* info_str = nullptr) const
+ WARN_UNUSED_RESULT;
+
+ // Helper method that creates a Subprocess, issues a Start() then a Wait().
+ // Expects a blank-separated list of arguments, with the first being the
+ // full path to the executable.
+ // The returned Status will only be OK if all steps were successful and
+ // the return code was 0.
+ static Status Call(const std::string& arg_str) WARN_UNUSED_RESULT;
+
+ // Same as above, but accepts a vector that includes the path to the
+ // executable as argv[0] and the arguments to the program in argv[1..n].
+ //
+ // Writes the value of 'stdin_in' to the subprocess' stdin. The length of
+ // 'stdin_in' should be limited to 64kib.
+ //
+ // Also collects the output from the child process stdout and stderr into
+ // 'stdout_out' and 'stderr_out' respectively.
+ static Status Call(const std::vector<std::string>& argv,
+ const std::string& stdin_in = "",
+ std::string* stdout_out = nullptr,
+ std::string* stderr_out = nullptr) WARN_UNUSED_RESULT;
+
+ // Return the pipe fd to the child's standard stream.
+ // Stream should not be disabled or shared.
+ int to_child_stdin_fd() const { return CheckAndOffer(STDIN_FILENO); }
+ int from_child_stdout_fd() const { return CheckAndOffer(STDOUT_FILENO); }
+ int from_child_stderr_fd() const { return CheckAndOffer(STDERR_FILENO); }
+
+ // Release control of the file descriptor for the child's stream, only if piped.
+ // Writes to this FD show up on stdin in the subprocess
+ int ReleaseChildStdinFd() { return ReleaseChildFd(STDIN_FILENO ); }
+ // Reads from this FD come from stdout of the subprocess
+ int ReleaseChildStdoutFd() { return ReleaseChildFd(STDOUT_FILENO); }
+ // Reads from this FD come from stderr of the subprocess
+ int ReleaseChildStderrFd() { return ReleaseChildFd(STDERR_FILENO); }
+
+ pid_t pid() const; // The child's pid; CHECK-fails unless the child is running.
+ const std::string& argv0() const { return argv_[0]; }
+
+ private:
+ FRIEND_TEST(SubprocessTest, TestGetProcfsState);
+
+ enum State {
+ kNotStarted,
+ kRunning,
+ kExited
+ };
+ enum StreamMode {SHARED, DISABLED, PIPED};
+ enum WaitMode {BLOCKING, NON_BLOCKING};
+
+ // Process state according to /proc/<pid>/stat.
+ enum class ProcfsState {
+ // "T Stopped (on a signal) or (before Linux 2.6.33) trace stopped"
+ PAUSED,
+
+ // Every other process state.
+ RUNNING,
+ };
+
+ // Extracts the process state for /proc/<pid>/stat.
+ //
+ // Returns an error if /proc/<pid>/stat doesn't exist or if parsing failed.
+ static Status GetProcfsState(int pid, ProcfsState* state);
+
+ Status DoWait(int* wait_status, WaitMode mode) WARN_UNUSED_RESULT;
+ void SetFdShared(int stdfd, bool share);
+ int CheckAndOffer(int stdfd) const;
+ int ReleaseChildFd(int stdfd);
+
+ std::string program_; // argv[0] exactly as passed to the constructor; this is what gets exec'd.
+ std::vector<std::string> argv_; // Arguments for the child; argv_[0] is the base name of program_.
+ std::map<std::string, std::string> env_;
+ State state_;
+ int child_pid_; // -1 until Start() forks, and again once the child has been reaped.
+ enum StreamMode fd_state_[3]; // Indexed by STDIN_FILENO / STDOUT_FILENO / STDERR_FILENO.
+ int child_fds_[3]; // Parent-side pipe fds, same indexing; -1 if absent or released.
+ std::string cwd_;
+
+ // The cached wait status if Wait()/WaitNoBlock() has been called.
+ // Only valid if state_ == kExited.
+ int wait_status_;
+
+ // Custom signal to deliver when the subprocess goes out of scope, provided
+ // the process hasn't already been killed.
+ int sig_on_destruct_;
+
+ DISALLOW_COPY_AND_ASSIGN(Subprocess);
+};
+
+} // namespace kudu
+#endif /* KUDU_UTIL_SUBPROCESS_H */
http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/test_graph.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/test_graph.cc b/be/src/kudu/util/test_graph.cc
new file mode 100644
index 0000000..59f4d30
--- /dev/null
+++ b/be/src/kudu/util/test_graph.cc
@@ -0,0 +1,121 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "kudu/util/test_graph.h"
+
+#include <mutex>
+#include <ostream>
+#include <utility>
+
+#include <glog/logging.h>
+
+#include "kudu/gutil/ref_counted.h"
+#include "kudu/gutil/stringprintf.h"
+#include "kudu/gutil/walltime.h"
+#include "kudu/util/faststring.h"
+#include "kudu/util/monotime.h"
+#include "kudu/util/status.h"
+#include "kudu/util/thread.h"
+
+using std::shared_ptr;
+using std::string;
+
+namespace kudu {
+
+// Adds 'val' to the series' current value while holding lock_.
+void TimeSeries::AddValue(double val) {
+  std::unique_lock<simple_spinlock> guard(lock_);
+  val_ = val_ + val;
+}
+
+// Overwrites the series' current value with 'val' while holding lock_.
+void TimeSeries::SetValue(double val) {
+  std::unique_lock<simple_spinlock> guard(lock_);
+  val_ = val;
+}
+
+// Returns a snapshot of the current value, read while holding lock_.
+double TimeSeries::value() const {
+  std::unique_lock<simple_spinlock> guard(lock_);
+  const double snapshot = val_;
+  return snapshot;
+}
+
+// Stops the dumper thread on destruction if it was started and not yet stopped.
+TimeSeriesCollector::~TimeSeriesCollector() {
+  if (!started_) {
+    return;
+  }
+  StopDumperThread();
+}
+
+// Returns the series registered under 'key', creating and registering a new
+// one if none exists yet. The lookup and insertion happen under series_lock_.
+shared_ptr<TimeSeries> TimeSeriesCollector::GetTimeSeries(const string &key) {
+  MutexLock l(series_lock_);
+  auto found = series_map_.find(key);
+  if (found != series_map_.end()) {
+    return found->second;
+  }
+  // First request for this key: create the series and remember it.
+  shared_ptr<TimeSeries> created(new TimeSeries());
+  series_map_[key] = created;
+  return created;
+}
+
+// Launches the background thread that periodically logs all series.
+// Must not be called while the collector is already started.
+void TimeSeriesCollector::StartDumperThread() {
+  LOG(INFO) << "Starting metrics dumper";
+  CHECK(!started_);
+  // Arm the latch and mark the collector as started before the thread is
+  // created, since DumperThread() checks started_ on entry.
+  exit_latch_.Reset(1);
+  started_ = true;
+  const Status create_status = kudu::Thread::Create(
+      "time series", "dumper",
+      &TimeSeriesCollector::DumperThread, this, &dumper_thread_);
+  CHECK_OK(create_status);
+}
+
+// Signals the dumper thread to exit and blocks until it has been joined.
+// Must only be called while the collector is started.
+void TimeSeriesCollector::StopDumperThread() {
+  CHECK(started_);
+  // Counting the latch down ends the timed wait in DumperThread().
+  exit_latch_.CountDown();
+  ThreadJoiner joiner(dumper_thread_.get());
+  CHECK_OK(joiner.Join());
+  started_ = false;
+}
+
+// Body of the dumper thread: logs a metrics line immediately and then again
+// after every 250ms wait, until exit_latch_ has been counted down.
+void TimeSeriesCollector::DumperThread() {
+  CHECK(started_);
+  const WallTime start_time = WallTime_Now();
+
+  faststring metrics_str;
+  do {
+    metrics_str.clear();
+    metrics_str.append("metrics: ");
+    BuildMetricsString(WallTime_Now() - start_time, &metrics_str);
+    LOG(INFO) << metrics_str.ToString();
+
+    // Sleep until the next dump time; the loop ends as soon as the wait
+    // reports that we should exit.
+  } while (!exit_latch_.WaitFor(MonoDelta::FromMilliseconds(250)));
+}
+
+// Appends to 'dst_buf' a JSON-like object holding the collector's scope, the
+// elapsed time 'time_since_start', and one "name": value field per series.
+// The series map is read under series_lock_.
+void TimeSeriesCollector::BuildMetricsString(
+    WallTime time_since_start, faststring *dst_buf) const {
+  MutexLock l(series_lock_);
+
+  const string header = StringPrintf("{ \"scope\": \"%s\", \"time\": %.3f",
+                                     scope_.c_str(), time_since_start);
+  dst_buf->append(header);
+
+  for (auto it = series_map_.begin(); it != series_map_.end(); ++it) {
+    const string field = StringPrintf(", \"%s\": %.3f",
+                                      it->first.c_str(), it->second->value());
+    dst_buf->append(field);
+  }
+  dst_buf->append("}");
+}
+
+
+} // namespace kudu
http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/test_graph.h
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/test_graph.h b/be/src/kudu/util/test_graph.h
new file mode 100644
index 0000000..41df430
--- /dev/null
+++ b/be/src/kudu/util/test_graph.h
@@ -0,0 +1,90 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#ifndef KUDU_TEST_GRAPH_COLLECTOR_H
+#define KUDU_TEST_GRAPH_COLLECTOR_H
+
+#include <memory>
+#include <string>
+#include <unordered_map>
+#include <utility>
+
+#include "kudu/gutil/ref_counted.h"
+#include "kudu/gutil/macros.h"
+#include "kudu/gutil/walltime.h"
+#include "kudu/util/countdown_latch.h"
+#include "kudu/util/locks.h"
+#include "kudu/util/mutex.h"
+
+namespace kudu {
+
+class Thread;
+class faststring;
+
+// A single numeric value with a spinlock member guarding it. The constructor
+// is private; TimeSeriesCollector is a friend and creates the instances.
+class TimeSeries {
+ public:
+  // Adds 'val' to the current value.
+  void AddValue(double val);
+
+  // Replaces the current value with 'val'.
+  void SetValue(double val);
+
+  // Returns the current value.
+  double value() const;
+
+ private:
+  friend class TimeSeriesCollector;
+
+  TimeSeries() : val_(0) {}
+
+  DISALLOW_COPY_AND_ASSIGN(TimeSeries);
+
+  // Declared mutable so that the const accessor value() can take it.
+  mutable simple_spinlock lock_;
+  double val_;
+};
+
+// Owns a set of named TimeSeries and an optional background "dumper" thread
+// (see StartDumperThread/StopDumperThread).
+class TimeSeriesCollector {
+ public:
+  // 'scope' is stored and later passed to BuildMetricsString()'s output.
+  explicit TimeSeriesCollector(std::string scope)
+      : scope_(std::move(scope)),
+        exit_latch_(0),
+        started_(false) {
+  }
+
+  ~TimeSeriesCollector();
+
+  // Returns the series associated with 'key'.
+  std::shared_ptr<TimeSeries> GetTimeSeries(const std::string &key);
+
+  // Start and stop the background dumper thread.
+  void StartDumperThread();
+  void StopDumperThread();
+
+ private:
+  using SeriesMap = std::unordered_map<std::string, std::shared_ptr<TimeSeries> >;
+
+  DISALLOW_COPY_AND_ASSIGN(TimeSeriesCollector);
+
+  // Entry point of dumper_thread_.
+  void DumperThread();
+
+  // Appends a textual dump of all series to 'dst_buf'.
+  void BuildMetricsString(WallTime time_since_start, faststring *dst_buf) const;
+
+  std::string scope_;
+
+  SeriesMap series_map_;
+  // Declared mutable so that const member functions can lock it.
+  mutable Mutex series_lock_;
+
+  scoped_refptr<kudu::Thread> dumper_thread_;
+
+  // Latch used to stop the dumper_thread_. When the thread is started,
+  // this is set to 1, and when the thread should exit, it is counted down.
+  CountDownLatch exit_latch_;
+
+  // True between StartDumperThread() and StopDumperThread().
+  bool started_;
+};
+
+} // namespace kudu
+#endif
http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/test_macros.h
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/test_macros.h b/be/src/kudu/util/test_macros.h
new file mode 100644
index 0000000..63cae5a
--- /dev/null
+++ b/be/src/kudu/util/test_macros.h
@@ -0,0 +1,123 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#ifndef KUDU_UTIL_TEST_MACROS_H
+#define KUDU_UTIL_TEST_MACROS_H
+
+#include <gmock/gmock.h>
+#include <string>
+
+// Shorthand for ASSERT_NO_FATAL_FAILURE(expr), which is just too long to type.
+#define NO_FATALS(expr) \
+ ASSERT_NO_FATAL_FAILURE(expr)
+
+// Detect fatals in the surrounding scope and, if any are pending, return from the enclosing
+// (void) function. NO_FATALS() only checks for fatals in the expression passed to it.
+#define NO_PENDING_FATALS() \
+ if (testing::Test::HasFatalFailure()) { return; }
+
+// Asserts that 'status' is OK: records a success for an OK status, otherwise
+// fails the current test with the status text.
+// NOTE: the expansion already ends in ';'.
+#define ASSERT_OK(status) do { \
+  const Status& _s = status; \
+  if (!_s.ok()) { \
+    FAIL() << "Bad status: " << _s.ToString(); \
+  } else { \
+    SUCCEED(); \
+  } \
+} while (0);
+
+// Like ASSERT_OK, but reports a bad status through ADD_FAILURE() instead of
+// FAIL(), so the surrounding test keeps running.
+// NOTE: the expansion already ends in ';'.
+#define EXPECT_OK(status) do { \
+  const Status& _s = status; \
+  if (!_s.ok()) { \
+    ADD_FAILURE() << "Bad status: " << _s.ToString(); \
+  } else { \
+    SUCCEED(); \
+  } \
+} while (0);
+
+// Like ASSERT_OK, but doesn't record successful
+// tests: nothing is reported unless the status is bad.
+#define ASSERT_OK_FAST(status) do { \
+ const Status& _s = status; \
+ if (!_s.ok()) { \
+ FAIL() << "Bad status: " << _s.ToString(); \
+ } \
+} while (0);
+
+// Substring matches: assert that 'str' does (or does not) contain 'substr'.
+#define ASSERT_STR_CONTAINS(str, substr) \
+ ASSERT_THAT(str, testing::HasSubstr(substr))
+
+#define ASSERT_STR_NOT_CONTAINS(str, substr) \
+ ASSERT_THAT(str, testing::Not(testing::HasSubstr(substr)))
+
+// Substring regular expressions in extended regex (POSIX) syntax, checked via ContainsRegex.
+#define ASSERT_STR_MATCHES(str, pattern) \
+ ASSERT_THAT(str, testing::ContainsRegex(pattern))
+
+#define ASSERT_STR_NOT_MATCHES(str, pattern) \
+ ASSERT_THAT(str, testing::Not(testing::ContainsRegex(pattern)))
+
+// Batched substring regular expressions in extended regex (POSIX) syntax.
+//
+// All strings must match the pattern; the failure message names the index of the offending string.
+#define ASSERT_STRINGS_ALL_MATCH(strings, pattern) do { \
+ const auto& _strings = (strings); \
+ const auto& _pattern = (pattern); \
+ int _str_idx = 0; \
+ for (const auto& str : _strings) { \
+ ASSERT_STR_MATCHES(str, _pattern) \
+ << "string " << _str_idx << ": pattern " << _pattern \
+ << " does not match string " << str; \
+ _str_idx++; \
+ } \
+} while (0)
+
+// Batched substring regular expressions in extended regex (POSIX) syntax.
+//
+// At least one string must match the pattern; iteration stops at the first match.
+#define ASSERT_STRINGS_ANY_MATCH(strings, pattern) do { \
+ const auto& _strings = (strings); \
+ const auto& _pattern = (pattern); \
+ bool matched = false; \
+ for (const auto& str : _strings) { \
+ if (testing::internal::RE::PartialMatch(str, testing::internal::RE(_pattern))) { \
+ matched = true; \
+ break; \
+ } \
+ } \
+ ASSERT_TRUE(matched) \
+ << "not one string matched pattern " << _pattern; \
+} while (0)
+
+// Asserts that 'env'->FileExists('path') is true.
+// Both macro arguments are parenthesized so that arbitrary expressions (for
+// example '&my_env' or a conditional expression) expand correctly.
+// NOTE: the expansion already ends in ';'.
+#define ASSERT_FILE_EXISTS(env, path) do { \
+  const std::string& _s = (path); \
+  ASSERT_TRUE((env)->FileExists(_s)) \
+    << "Expected file to exist: " << _s; \
+} while (0);
+
+// Asserts that 'env'->FileExists('path') is false.
+// Both macro arguments are parenthesized so that arbitrary expressions (for
+// example '&my_env' or a conditional expression) expand correctly.
+// NOTE: the expansion already ends in ';'.
+#define ASSERT_FILE_NOT_EXISTS(env, path) do { \
+  const std::string& _s = (path); \
+  ASSERT_FALSE((env)->FileExists(_s)) \
+    << "Expected file not to exist: " << _s; \
+} while (0);
+
+// Expands to the name() of gtest's current_test_info().
+#define CURRENT_TEST_NAME() \
+ ::testing::UnitTest::GetInstance()->current_test_info()->name()
+
+// Expands to the test_case_name() of gtest's current_test_info().
+#define CURRENT_TEST_CASE_NAME() \
+ ::testing::UnitTest::GetInstance()->current_test_info()->test_case_name()
+
+#endif
http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/test_main.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/test_main.cc b/be/src/kudu/util/test_main.cc
new file mode 100644
index 0000000..c75e5ae
--- /dev/null
+++ b/be/src/kudu/util/test_main.cc
@@ -0,0 +1,109 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <ostream>
+#include <thread>
+
+#include <gflags/gflags.h>
+#include <glog/logging.h>
+#include <gtest/gtest.h>
+
+#include "kudu/gutil/atomicops.h"
+#include "kudu/util/debug/leakcheck_disabler.h"
+#include "kudu/util/flags.h"
+#include "kudu/util/minidump.h"
+#include "kudu/util/monotime.h"
+#include "kudu/util/pstack_watcher.h"
+#include "kudu/util/signal.h"
+#include "kudu/util/status.h"
+#include "kudu/util/test_util.h"
+
+DEFINE_int32(test_timeout_after, 0,
+ "Maximum total seconds allowed for all unit tests in the suite. Default: disabled");
+
+DEFINE_int32(stress_cpu_threads, 0,
+ "Number of threads to start that burn CPU in an attempt to "
+ "stimulate race conditions");
+
+namespace kudu {
+
+// Start thread that kills the process if --test_timeout_after is exceeded before
+// the tests complete.
+static void CreateAndStartTimeoutThread() {
+  // A value of 0 means the timeout is disabled.
+  if (FLAGS_test_timeout_after == 0) {
+    return;
+  }
+
+  // KUDU-1995: if running death tests using EXPECT_EXIT()/ASSERT_EXIT(), LSAN
+  // reports leaks in CreateAndStartTimeoutThread(). A couple of scoped leak
+  // check disablers are used as a workaround since right now it's not clear
+  // what is going on exactly: LSAN does not report those leaks for tests which
+  // run ASSERT_DEATH(). This does not seem harmful or hiding any potential
+  // leaks since it's scoped and targeted only for this utility thread.
+  debug::ScopedLeakCheckDisabler disabler;
+  std::thread watchdog([]() {
+    debug::ScopedLeakCheckDisabler thread_disabler;
+    SleepFor(MonoDelta::FromSeconds(FLAGS_test_timeout_after));
+
+    // Dump a pstack to stdout...
+    WARN_NOT_OK(PstackWatcher::DumpStacks(), "Unable to print pstack");
+
+    // ...and abort.
+    LOG(FATAL) << "Maximum unit test time exceeded (" << FLAGS_test_timeout_after << " sec)";
+  });
+  watchdog.detach();
+}
+} // namespace kudu
+
+
+// Starts --stress_cpu_threads detached threads, each of which spins forever
+// issuing memory barriers.
+static void StartStressThreads() {
+  for (int remaining = FLAGS_stress_cpu_threads; remaining > 0; --remaining) {
+    std::thread spinner([]() {
+      for (;;) {
+        // Do something which won't be optimized out.
+        base::subtle::MemoryBarrier();
+      }
+    });
+    spinner.detach();
+  }
+}
+
+// Test binary entry point: installs signal handling, parses flags, starts the
+// optional timeout and stress threads, and runs all registered tests.
+int main(int argc, char **argv) {
+  google::InstallFailureSignalHandler();
+
+  // We don't use InitGoogleLoggingSafe() because gtest initializes glog, so we
+  // need to block SIGUSR1 explicitly in order to test minidump generation.
+  CHECK_OK(kudu::BlockSigUSR1());
+
+  // Ignore SIGPIPE for all tests so that threads writing to TLS
+  // sockets do not crash when writing to a closed socket. See KUDU-1910.
+  kudu::IgnoreSigPipe();
+
+  // InitGoogleTest() must precede ParseCommandLineFlags(), as the former
+  // removes gtest-related flags from argv that would trip up the latter.
+  ::testing::InitGoogleTest(&argc, argv);
+  kudu::ParseCommandLineFlags(&argc, &argv, true);
+
+  // Create the test-timeout timer, then the CPU stress threads.
+  kudu::CreateAndStartTimeoutThread();
+  StartStressThreads();
+
+  // This is called by the KuduTest setup method, but in case we have
+  // any tests that don't inherit from KuduTest, it's helpful to
+  // cover our bases and call it here too.
+  kudu::KuduTest::OverrideKrb5Environment();
+
+  return RUN_ALL_TESTS();
+}
http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/test_util.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/test_util.cc b/be/src/kudu/util/test_util.cc
new file mode 100644
index 0000000..c960441
--- /dev/null
+++ b/be/src/kudu/util/test_util.cc
@@ -0,0 +1,446 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "kudu/util/test_util.h"
+
+#include <errno.h>
+#include <limits.h>
+#include <unistd.h>
+
+#include <cstdlib>
+#include <cstring>
+#include <limits>
+#include <map>
+#include <memory>
+#include <ostream>
+#include <string>
+#include <utility>
+#include <vector>
+
+#ifdef __APPLE__
+#include <fcntl.h>
+#include <sys/param.h> // for MAXPATHLEN
+#endif
+
+#include <gflags/gflags.h>
+#include <glog/logging.h>
+#include <gtest/gtest-spi.h>
+
+#include "kudu/gutil/strings/numbers.h"
+#include "kudu/gutil/strings/split.h"
+#include "kudu/gutil/strings/strcat.h"
+#include "kudu/gutil/strings/strip.h"
+#include "kudu/gutil/strings/substitute.h"
+#include "kudu/gutil/strings/util.h"
+#include "kudu/gutil/walltime.h"
+#include "kudu/util/env.h"
+#include "kudu/util/faststring.h"
+#include "kudu/util/path_util.h"
+#include "kudu/util/scoped_cleanup.h"
+#include "kudu/util/slice.h"
+#include "kudu/util/spinlock_profiling.h"
+#include "kudu/util/status.h"
+#include "kudu/util/subprocess.h"
+
+DEFINE_string(test_leave_files, "on_failure",
+ "Whether to leave test files around after the test run. "
+ " Valid values are 'always', 'on_failure', or 'never'");
+
+DEFINE_int32(test_random_seed, 0, "Random seed to use for randomized tests");
+
+using std::string;
+using std::vector;
+using strings::Substitute;
+
+namespace kudu {
+
+const char* kInvalidPath = "/dev/invalid-path-for-kudu-tests";
+static const char* const kSlowTestsEnvVariable = "KUDU_ALLOW_SLOW_TESTS";
+
+static const uint64_t kTestBeganAtMicros = Env::Default()->NowMicros();
+
+// Global which production code can check to see if it is running
+// in a GTest environment (assuming the test binary links in this module,
+// which is typically a good assumption).
+//
+// This can be checked using the 'IsGTest()' function from test_util_prod.cc.
+bool g_is_gtest = true;
+
+///////////////////////////////////////////////////
+// KuduTest
+///////////////////////////////////////////////////
+
+// Captures the current flag values (restored in the destructor via
+// flag_saver_), resolves the per-test data directory, and applies
+// test-friendly flag defaults.
+KuduTest::KuduTest()
+  : env_(Env::Default()),
+    flag_saver_(new google::FlagSaver()),
+    test_dir_(GetTestDataDirectory()) {
+  std::map<const char*, const char*> flags_for_tests = {
+    // Disabling fsync() speeds up tests dramatically, and it's safe to do as no
+    // tests rely on cutting power to a machine or equivalent.
+    {"never_fsync", "true"},
+    // Disable redaction.
+    {"redact", "none"},
+    // Reduce default RSA key length for faster tests. We are using strong/high
+    // TLS v1.2 cipher suites, so minimum possible for TLS-related RSA keys is
+    // 768 bits. However, for the external mini cluster we use 1024 bits because
+    // Java default security policies require at least 1024 bits for RSA keys
+    // used in certificates. For uniformity, here 1024 RSA bit keys are used
+    // as well. As for the TSK keys, 512 bits is the minimum since the SHA256
+    // digest is used for token signing/verification.
+    {"ipki_server_key_size", "1024"},
+    {"ipki_ca_key_size", "1024"},
+    {"tsk_num_rsa_bits", "512"},
+  };
+  for (const auto& e : flags_for_tests) {
+    // We don't check for errors here, because we have some default flags that
+    // only apply to certain tests.
+    google::SetCommandLineOptionWithMode(e.first, e.second, google::SET_FLAGS_DEFAULT);
+  }
+  // If the TEST_TMPDIR variable has been set, then glog will automatically use that
+  // as its default log directory. We would prefer that the default log directory
+  // instead be the test-case-specific subdirectory.
+  //
+  // test_dir_ already holds the result of GetTestDataDirectory(), so reuse it
+  // rather than calling the function (and attempting the mkdir) a second time.
+  FLAGS_log_dir = test_dir_;
+}
+
+// Restores the saved flags and then, depending on --test_leave_files, either
+// keeps or deletes the test directory.
+KuduTest::~KuduTest() {
+  // Reset the flags first to prevent them from affecting test directory cleanup.
+  flag_saver_.reset();
+
+  // Clean up the test directory in the destructor instead of a TearDown
+  // method. This is better because it ensures that the child-class
+  // dtor runs first -- so, if the child class is using a minicluster, etc,
+  // we will shut that down before we remove files underneath.
+  if (FLAGS_test_leave_files == "always") {
+    LOG(INFO) << "-----------------------------------------------";
+    LOG(INFO) << "--test_leave_files specified, leaving files in " << test_dir_;
+    return;
+  }
+  if (FLAGS_test_leave_files == "on_failure" && HasFatalFailure()) {
+    LOG(INFO) << "-----------------------------------------------";
+    LOG(INFO) << "Had fatal failures, leaving test files at " << test_dir_;
+    return;
+  }
+
+  VLOG(1) << "Cleaning up temporary test files...";
+  WARN_NOT_OK(env_->DeleteRecursively(test_dir_),
+              "Couldn't remove test files");
+}
+
+// Per-test setup: initializes spinlock contention profiling and points the
+// Kerberos environment variables away from the user's configuration.
+void KuduTest::SetUp() {
+  InitSpinLockContentionProfiling();
+  KuduTest::OverrideKrb5Environment();
+}
+
+// Returns 'relative_path' joined onto this test's data directory.
+string KuduTest::GetTestPath(const string& relative_path) const {
+  string joined = JoinPathSegments(test_dir_, relative_path);
+  return joined;
+}
+
+// Overwrites the Kerberos-related environment variables with kInvalidPath.
+void KuduTest::OverrideKrb5Environment() {
+  // Set these variables to paths that definitely do not exist and
+  // couldn't be accidentally created.
+  //
+  // Note that if we were to set these to /dev/null, we end up triggering a leak in krb5
+  // when it tries to read an empty file as a ticket cache, whereas non-existent files
+  // don't have this issue. See MIT krb5 bug #8509.
+  //
+  // NOTE: we don't simply *unset* the variables, because then we'd still pick up
+  // the user's /etc/krb5.conf and other default locations.
+  static const char* const kKrb5Variables[] = {
+    "KRB5_CONFIG",
+    "KRB5_KTNAME",
+    "KRB5CCNAME",
+  };
+  for (const char* variable : kKrb5Variables) {
+    setenv(variable, kInvalidPath, 1);
+  }
+}
+
+///////////////////////////////////////////////////
+// Test utility functions
+///////////////////////////////////////////////////
+
+// Returns whether slow tests are enabled through the KUDU_ALLOW_SLOW_TESTS
+// environment variable. Unset, empty, "false", "0" and "no" mean disabled;
+// "true", "1" and "yes" mean enabled (all compared case-insensitively). Any
+// other value is a fatal error.
+bool AllowSlowTests() {
+  static const char* const kDisabledValues[] = {"false", "0", "no"};
+  static const char* const kEnabledValues[] = {"true", "1", "yes"};
+
+  const char* value = getenv(kSlowTestsEnvVariable);
+  if (value == nullptr || value[0] == '\0') {
+    return false;
+  }
+  for (const char* candidate : kDisabledValues) {
+    if (strcasecmp(value, candidate) == 0) {
+      return false;
+    }
+  }
+  for (const char* candidate : kEnabledValues) {
+    if (strcasecmp(value, candidate) == 0) {
+      return true;
+    }
+  }
+  LOG(FATAL) << "Unrecognized value for " << kSlowTestsEnvVariable << ": " << value;
+  return false;
+}
+
+// When slow tests are allowed, sets flag 'flag_name' to 'new_value' using
+// SET_FLAG_IF_DEFAULT. Dies if 'flag_name' is not a known flag, whether or
+// not slow tests are allowed.
+void OverrideFlagForSlowTests(const std::string& flag_name,
+                              const std::string& new_value) {
+  // Ensure that the flag is valid.
+  google::GetCommandLineFlagInfoOrDie(flag_name.c_str());
+
+  // Only override when we're running slow tests.
+  if (AllowSlowTests()) {
+    google::SetCommandLineOptionWithMode(flag_name.c_str(), new_value.c_str(),
+                                         google::SET_FLAG_IF_DEFAULT);
+  }
+}
+
+// Seeds rand() and returns the seed used. A --test_random_seed of 0 means the
+// user did not specify one, in which case the current time in microseconds
+// (truncated to int) is used.
+int SeedRandom() {
+  const bool user_specified = FLAGS_test_random_seed != 0;
+  const int seed = user_specified ? FLAGS_test_random_seed
+                                  : static_cast<int>(GetCurrentTimeMicros());
+  LOG(INFO) << "Using random seed: " << seed;
+  srand(seed);
+  return seed;
+}
+
+// Returns the data directory for the currently running gtest test, creating
+// it if necessary. When the directory is newly created, a 'test_metadata' file
+// describing the creating process is written into it.
+string GetTestDataDirectory() {
+  const ::testing::TestInfo* const test_info =
+      ::testing::UnitTest::GetInstance()->current_test_info();
+  CHECK(test_info) << "Must be running in a gtest unit test to call this function";
+  string dir;
+  CHECK_OK(Env::Default()->GetTestDirectory(&dir));
+
+  // The directory name includes some strings for specific reasons:
+  // - program name: identifies the directory to the test invoker
+  // - timestamp and pid: disambiguates with prior runs of the same test
+  //
+  // e.g. "env-test.TestEnv.TestReadFully.1409169025392361-23600"
+  //
+  // If the test is sharded, the shard index is also included so that the test
+  // invoker can more easily identify all directories belonging to each shard.
+  const char* shard_index = getenv("GTEST_SHARD_INDEX");
+  const bool is_sharded = shard_index != nullptr && shard_index[0] != '\0';
+  const string shard_index_infix = is_sharded ? Substitute("$0.", shard_index) : string();
+
+  const string program = StringReplace(google::ProgramInvocationShortName(), "/", "_", true);
+  const string test_case = StringReplace(test_info->test_case_name(), "/", "_", true);
+  const string test_name = StringReplace(test_info->name(), "/", "_", true);
+  dir += Substitute("/$0.$1$2.$3.$4-$5",
+                    program, shard_index_infix, test_case, test_name,
+                    kTestBeganAtMicros, getpid());
+
+  Status s = Env::Default()->CreateDir(dir);
+  CHECK(s.IsAlreadyPresent() || s.ok())
+      << "Could not create directory " << dir << ": " << s.ToString();
+  if (!s.ok()) {
+    // The directory already existed; its metadata file is left untouched.
+    return dir;
+  }
+
+  string metadata = Substitute("PID=$0\nPPID=$1\n", getpid(), getppid());
+  const char* jenkins_build_id = getenv("BUILD_ID");
+  if (jenkins_build_id != nullptr) {
+    StrAppend(&metadata, Substitute("BUILD_ID=$0\n", jenkins_build_id));
+  }
+  CHECK_OK(WriteStringToFile(Env::Default(), metadata,
+                             Substitute("$0/test_metadata", dir)));
+  return dir;
+}
+
+// Returns the directory component of the running executable's path.
+string GetTestExecutableDirectory() {
+  string executable_path;
+  CHECK_OK(Env::Default()->GetExecutablePath(&executable_path));
+  string directory = DirName(executable_path);
+  return directory;
+}
+
+// Repeatedly runs 'f' until it completes without recording any gtest failure
+// or until 'timeout' has elapsed. Failures from attempts made before the
+// deadline are intercepted and discarded; once the deadline has passed, 'f'
+// is run one final time with its failures reported normally.
+// 'backoff' selects the sleep between attempts.
+void AssertEventually(const std::function<void(void)>& f,
+                      const MonoDelta& timeout,
+                      AssertBackoff backoff) {
+  const MonoTime deadline = MonoTime::Now() + timeout;
+  {
+    // Disable --gtest_break_on_failure, or else the assertion failures
+    // inside our attempts will cause the test to SEGV even though we
+    // would like to retry. The previous value is restored on scope exit.
+    const bool saved_break_on_failure = testing::FLAGS_gtest_break_on_failure;
+    auto restore_flag = MakeScopedCleanup([saved_break_on_failure]() {
+      testing::FLAGS_gtest_break_on_failure = saved_break_on_failure;
+    });
+    testing::FLAGS_gtest_break_on_failure = false;
+
+    int attempts = 0;
+    while (MonoTime::Now() < deadline) {
+      // Capture any assertion failures within this scope (i.e. from their function)
+      // into 'results'
+      testing::TestPartResultArray results;
+      testing::ScopedFakeTestPartResultReporter reporter(
+          testing::ScopedFakeTestPartResultReporter::INTERCEPT_ONLY_CURRENT_THREAD,
+          &results);
+      f();
+
+      // Determine whether their function produced any new test failure results.
+      bool has_failures = false;
+      for (int i = 0; i < results.size(); i++) {
+        if (results.GetTestPartResult(i).failed()) {
+          has_failures = true;
+        }
+      }
+      if (!has_failures) {
+        return;
+      }
+
+      // If they had failures, sleep and try again.
+      int sleep_ms;
+      switch (backoff) {
+        case AssertBackoff::EXPONENTIAL:
+          // 1ms, 2ms, 4ms, ... for the first ten attempts, then 1s.
+          sleep_ms = (attempts < 10) ? (1 << attempts) : 1000;
+          break;
+        case AssertBackoff::NONE:
+          sleep_ms = 1;
+          break;
+        default:
+          LOG(FATAL) << "Unknown backoff type";
+      }
+      SleepFor(MonoDelta::FromMilliseconds(sleep_ms));
+      attempts++;
+    }
+  }
+
+  // If we ran out of time looping, run their function one more time
+  // without capturing its assertions. This way the assertions will
+  // propagate back out to the normal test reporter. Of course it's
+  // possible that it will pass on this last attempt, but that's OK
+  // too, since we aren't trying to be that strict about the deadline.
+  f();
+  if (testing::Test::HasFatalFailure()) {
+    ADD_FAILURE() << "Timed out waiting for assertion to pass.";
+  }
+}
+
+// Returns the number of this process's open file descriptors whose resolved
+// path matches 'path_pattern' (as judged by MatchPattern). The descriptors
+// are enumerated by listing /proc/self/fd (or /dev/fd on macOS) through 'env'.
+int CountOpenFds(Env* env, const string& path_pattern) {
+#if defined(__APPLE__)
+  static const char* kProcSelfFd = "/dev/fd";
+#else
+  static const char* kProcSelfFd = "/proc/self/fd";
+#endif // defined(__APPLE__)
+  vector<string> children;
+  CHECK_OK(env->GetChildren(kProcSelfFd, &children));
+
+  faststring path_buf;
+  int num_fds = 0;
+  for (const auto& c : children) {
+    // Skip '.' and '..'.
+    if (c == "." || c == "..") {
+      continue;
+    }
+    int32_t fd;
+    CHECK(safe_strto32(c, &fd)) << "Unexpected file in fd list: " << c;
+#ifdef __APPLE__
+    path_buf.resize(MAXPATHLEN);
+    if (fcntl(fd, F_GETPATH, path_buf.data()) != 0) {
+      if (errno == EBADF) {
+        // The file was closed while we were looping. This is likely the
+        // actual file descriptor used for opening /proc/fd itself.
+        continue;
+      }
+      PLOG(FATAL) << "Unknown error in fcntl(F_GETPATH): " << fd;
+    }
+    // Shrink the buffer to the length of the NUL-terminated path.
+    char* buf_data = reinterpret_cast<char*>(path_buf.data());
+    path_buf.resize(strlen(buf_data));
+#else
+    path_buf.resize(PATH_MAX);
+    char* buf_data = reinterpret_cast<char*>(path_buf.data());
+    auto proc_file = JoinPathSegments(kProcSelfFd, c);
+    int path_len = readlink(proc_file.c_str(), buf_data, path_buf.size());
+    if (path_len < 0) {
+      if (errno == ENOENT) {
+        // The file was closed while we were looping. This is likely the
+        // actual file descriptor used for opening /proc/fd itself.
+        continue;
+      }
+      PLOG(FATAL) << "Unknown error in readlink: " << proc_file;
+    }
+    // Shrink the buffer to the number of bytes readlink() wrote.
+    path_buf.resize(path_len);
+#endif
+    if (MatchPattern(path_buf.ToString(), path_pattern)) {
+      num_fds++;
+    }
+  }
+
+  return num_fds;
+}
+
+namespace {
+// Waits until process 'pid' has bound a socket of the given lsof '-i' selector
+// 'kind' and stores the bound port in '*port'.
+//
+// lsof is invoked repeatedly, with an increasing sleep between attempts, until
+// it succeeds; once 'timeout' has elapsed, the status of the last failed
+// invocation is returned. A RuntimeError is returned if lsof's output does
+// not have the expected shape.
+Status WaitForBind(pid_t pid, uint16_t* port, const char* kind, MonoDelta timeout) {
+  // In general, processes do not expose the port they bind to, and
+  // reimplementing lsof involves parsing a lot of files in /proc/. So,
+  // requiring lsof for tests and parsing its output seems more
+  // straight-forward. We call lsof in a loop since it typically takes a long
+  // time for it to initialize and bind a port.
+
+  string lsof;
+  RETURN_NOT_OK(FindExecutable("lsof", {"/sbin", "/usr/sbin"}, &lsof));
+
+  const vector<string> cmd = {
+    lsof, "-wbnP", "-Ffn",
+    "-p", std::to_string(pid),
+    "-a", "-i", kind
+  };
+
+  MonoTime deadline = MonoTime::Now() + timeout;
+  string lsof_out;
+
+  for (int64_t i = 1; ; i++) {
+    lsof_out.clear();
+    Status s = Subprocess::Call(cmd, "", &lsof_out);
+
+    if (s.ok()) {
+      StripTrailingNewline(&lsof_out);
+      break;
+    }
+    if (deadline < MonoTime::Now()) {
+      return s;
+    }
+
+    // Linear backoff: 10ms, 20ms, 30ms, ...
+    SleepFor(MonoDelta::FromMilliseconds(i * 10));
+  }
+
+  // The '-Ffn' flag gets lsof to output something like:
+  // p19730
+  // f123
+  // n*:41254
+  // The first line is the pid. We ignore it.
+  // The second line is the file descriptor number. We ignore it.
+  // The third line has the bind address and port.
+  // Subsequent lines show active connections.
+  vector<string> lines = strings::Split(lsof_out, "\n");
+  int32_t p = -1;
+  if (lines.size() < 3 ||
+      lines[2].substr(0, 3) != "n*:" ||
+      !safe_strto32(lines[2].substr(3), &p) ||
+      p <= 0) {
+    return Status::RuntimeError("unexpected lsof output", lsof_out);
+  }
+  // 65535 is itself a valid port number, so the upper bound is inclusive.
+  CHECK(p > 0 && p <= std::numeric_limits<uint16_t>::max()) << "parsed invalid port: " << p;
+  VLOG(1) << "Determined bound port: " << p;
+  *port = p;
+  return Status::OK();
+}
+} // anonymous namespace
+
+// Waits for process 'pid' to bind a port, using the lsof selector "4TCP".
+Status WaitForTcpBind(pid_t pid, uint16_t* port, MonoDelta timeout) {
+  static const char kSelector[] = "4TCP";
+  return WaitForBind(pid, port, kSelector, timeout);
+}
+
+// Waits for process 'pid' to bind a port, using the lsof selector "4UDP".
+Status WaitForUdpBind(pid_t pid, uint16_t* port, MonoDelta timeout) {
+  static const char kSelector[] = "4UDP";
+  return WaitForBind(pid, port, kSelector, timeout);
+}
+
+} // namespace kudu
http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/test_util.h
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/test_util.h b/be/src/kudu/util/test_util.h
new file mode 100644
index 0000000..8090fbc
--- /dev/null
+++ b/be/src/kudu/util/test_util.h
@@ -0,0 +1,146 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+// Base test class, with various utility functions.
+#ifndef KUDU_UTIL_TEST_UTIL_H
+#define KUDU_UTIL_TEST_UTIL_H
+
+#include <sys/types.h>
+
+#include <cstdint>
+#include <functional>
+#include <memory>
+#include <string>
+
+#include <gtest/gtest.h>
+
+#include "kudu/gutil/port.h"
+#include "kudu/util/monotime.h"
+
+// Runs AssertEventually(expr) and then NO_PENDING_FATALS(), so the caller
+// does not have to check for pending fatal failures separately.
+#define ASSERT_EVENTUALLY(expr) \
+  do {                          \
+    AssertEventually(expr);     \
+    NO_PENDING_FATALS();        \
+  } while (false)
+
+namespace google {
+class FlagSaver;
+} // namespace google
+
+namespace kudu {
+
+class Env;
+class Status;
+
+extern const char* kInvalidPath;
+
+class KuduTest : public ::testing::Test {  // Base gtest fixture with shared test utilities.
+ public:
+ KuduTest();
+
+ virtual ~KuduTest();
+
+ virtual void SetUp() OVERRIDE;  // Overrides ::testing::Test::SetUp().
+
+ // Tests assume that they run with no outside-provided Kerberos credentials. If the
+ // user happens to have credentials available, tests might fail, e.g. due to being
+ // already kinitted to a different realm. This function overrides the relevant
+ // environment variables so that we don't pick up the user's credentials.
+ static void OverrideKrb5Environment();
+
+ protected:
+ // Returns an absolute path based on a unit test-specific work directory, given
+ // a relative path. Useful for writing test files that should be deleted after
+ // the test ends.
+ std::string GetTestPath(const std::string& relative_path) const;
+
+ Env* env_;  // NOTE(review): presumably the Env used by the test; set outside this header -- confirm.
+
+ // Saves and restores flags around every test. Allocated on the heap so it can
+ // be destroyed (and the flags reset) before test_dir_ is deleted.
+ std::unique_ptr<google::FlagSaver> flag_saver_;
+
+ std::string test_dir_;  // Per-test data directory; see GetTestDataDirectory().
+};
+
+// Returns true if slow tests are runtime-enabled.
+bool AllowSlowTests();
+
+// Override the given gflag to the new value, only in the case that
+// slow tests are enabled and the user hasn't otherwise overridden
+// it on the command line.
+// Example usage:
+//
+// OverrideFlagForSlowTests(
+// "client_inserts_per_thread",
+// strings::Substitute("$0", FLAGS_client_inserts_per_thread * 100));
+//
+void OverrideFlagForSlowTests(const std::string& flag_name,
+ const std::string& new_value);
+
+// Call srand() with a random seed based on the current time, reporting
+// that seed to the logs. The time-based seed may be overridden by passing
+// --test_random_seed= from the CLI in order to reproduce a failed randomized
+// test. Returns the seed.
+int SeedRandom();
+
+// Return a per-test directory in which to store test data. Guaranteed to
+// return the same directory every time for a given unit test.
+//
+// May only be called from within a gtest unit test. Prefer KuduTest::test_dir_
+// if a KuduTest instance is available.
+std::string GetTestDataDirectory();
+
+// Return the directory which contains the test's executable.
+std::string GetTestExecutableDirectory();
+
+// AssertEventually() waits until 'f()' succeeds without adding any GTest
+// 'fatal failures'. For example:
+//
+//   AssertEventually([]() {
+//     ASSERT_GT(ReadValueOfMetric(), 10);
+//   });
+//
+// 'f' is run in a loop with optional back-off; the AssertBackoff enum
+// declared directly below selects the back-off policy.
+//
+// To check whether AssertEventually() eventually succeeded, call
+// NO_PENDING_FATALS() afterward, or use ASSERT_EVENTUALLY() which performs
+// this check automatically.
+enum class AssertBackoff {  // Back-off policy for AssertEventually()'s retry loop.
+ // Use exponential back-off while looping, capped at one second.
+ EXPONENTIAL,
+
+ // No back-off: sleep for a millisecond while looping.
+ NONE,
+};
+void AssertEventually(const std::function<void(void)>& f,
+ const MonoDelta& timeout = MonoDelta::FromSeconds(30),
+ AssertBackoff backoff = AssertBackoff::EXPONENTIAL);
+
+// Count the number of open file descriptors in use by this process.
+// 'path_pattern' is a glob-style pattern. Only paths that match this
+// pattern are included. Note that '*' in this pattern is recursive
+// unlike the usual behavior of path globs.
+int CountOpenFds(Env* env, const std::string& path_pattern);
+
+// Waits for the subprocess to bind to any listening TCP port, and returns the port.
+Status WaitForTcpBind(pid_t pid, uint16_t* port, MonoDelta timeout) WARN_UNUSED_RESULT;
+
+// Waits for the subprocess to bind to any listening UDP port, and returns the port.
+Status WaitForUdpBind(pid_t pid, uint16_t* port, MonoDelta timeout) WARN_UNUSED_RESULT;
+
+} // namespace kudu
+#endif
http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/test_util_prod.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/test_util_prod.cc b/be/src/kudu/util/test_util_prod.cc
new file mode 100644
index 0000000..5523bac
--- /dev/null
+++ b/be/src/kudu/util/test_util_prod.cc
@@ -0,0 +1,28 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "kudu/util/test_util_prod.h"
+
+#include <dlfcn.h>
+
+namespace kudu {
+
+// Looks up the mangled name of kudu::g_is_gtest among the symbols visible to
+// the running process. The lookup succeeds only when the module defining that
+// symbol has been linked in, which is what marks the binary as a gtest.
+bool IsGTest() {
+  const void* const marker = dlsym(RTLD_DEFAULT, "_ZN4kudu10g_is_gtestE");
+  return marker != nullptr;
+}
+
+} // namespace kudu
http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/test_util_prod.h
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/test_util_prod.h b/be/src/kudu/util/test_util_prod.h
new file mode 100644
index 0000000..8b7ea61
--- /dev/null
+++ b/be/src/kudu/util/test_util_prod.h
@@ -0,0 +1,32 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// Test-related utility methods that can be called from non-test
+// code. This module is part of the 'util' module and is built into
+// all binaries, not just tests, whereas 'test_util.cc' is linked
+// only into test binaries.
+
+#pragma once
+
+namespace kudu {
+
+// Return true if the current binary is a gtest. More specifically,
+// returns true if the 'test_util.cc' module has been linked in
+// (either dynamically or statically) to the running process.
+bool IsGTest();
+
+} // namespace kudu
http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/thread-test.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/thread-test.cc b/be/src/kudu/util/thread-test.cc
new file mode 100644
index 0000000..d3ee733
--- /dev/null
+++ b/be/src/kudu/util/thread-test.cc
@@ -0,0 +1,160 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "kudu/util/thread.h"
+
+#include <sys/types.h>
+#include <unistd.h>
+
+#include <ostream>
+#include <string>
+#include <vector>
+
+#include <glog/logging.h>
+#include <gtest/gtest.h>
+
+#include "kudu/gutil/basictypes.h"
+#include "kudu/gutil/ref_counted.h"
+#include "kudu/util/countdown_latch.h"
+#include "kudu/util/env.h"
+#include "kudu/util/status.h"
+#include "kudu/util/stopwatch.h"
+#include "kudu/util/test_macros.h"
+#include "kudu/util/test_util.h"
+#include "kudu/util/thread_restrictions.h"
+
+using std::string;
+
+namespace kudu {
+
+class ThreadTest : public KuduTest {};  // Fixture for the tests below; adds nothing to KuduTest.
+
+// Joins a sleeping thread with join warnings enabled (first warning after
+// 10ms, then one every 100ms). The warnings are not asserted on, so their
+// presence in the log has to be verified manually.
+TEST_F(ThreadTest, TestJoinAndWarn) {
+  if (!AllowSlowTests()) {
+    LOG(INFO) << "Skipping test in quick test mode, since this sleeps";
+    return;
+  }
+
+  scoped_refptr<Thread> sleeper;
+  ASSERT_OK(Thread::Create("test", "sleeper thread", usleep, 1000*1000, &sleeper));
+
+  ThreadJoiner joiner(sleeper.get());
+  ASSERT_OK(joiner.warn_after_ms(10).warn_every_ms(100).Join());
+}
+
+// Joining a sleeping thread with a 50ms give-up limit must return a status
+// whose text reports the timeout and names the thread.
+TEST_F(ThreadTest, TestFailedJoin) {
+  if (!AllowSlowTests()) {
+    LOG(INFO) << "Skipping test in quick test mode, since this sleeps";
+    return;
+  }
+
+  scoped_refptr<Thread> sleeper;
+  ASSERT_OK(Thread::Create("test", "sleeper thread", usleep, 1000*1000, &sleeper));
+
+  ThreadJoiner joiner(sleeper.get());
+  Status join_status = joiner.give_up_after_ms(50).Join();
+  ASSERT_STR_CONTAINS(join_status.ToString(),
+                      "Timed out after 50ms joining on sleeper thread");
+}
+
+// Thread body used by TestJoinOnSelf: joins on the calling thread itself and
+// requires the result to be an InvalidArgument status.
+static void TryJoinOnSelf() {
+  ThreadJoiner self_joiner(Thread::current_thread());
+  Status join_status = self_joiner.Join();
+  // CHECK rather than a gtest ASSERT, because gtest isn't thread-safe.
+  CHECK(join_status.IsInvalidArgument());
+}
+
+// Spawns a thread that tries to join on itself. The spawned thread performs
+// the actual assertion (see TryJoinOnSelf()); this test only waits for it.
+TEST_F(ThreadTest, TestJoinOnSelf) {
+  scoped_refptr<Thread> self_joining_thread;
+  ASSERT_OK(Thread::Create("test", "test", TryJoinOnSelf, &self_joining_thread));
+  self_joining_thread->Join();
+}
+
+// Joining the same thread twice through one ThreadJoiner must succeed both
+// times.
+TEST_F(ThreadTest, TestDoubleJoinIsNoOp) {
+  scoped_refptr<Thread> worker;
+  ASSERT_OK(Thread::Create("test", "sleeper thread", usleep, 0, &worker));
+
+  ThreadJoiner joiner(worker.get());
+  for (int attempt = 0; attempt < 2; ++attempt) {
+    ASSERT_OK(joiner.Join());
+  }
+}
+
+// Logs how long it takes to start 1000 threads, and then how long it takes to
+// call tid() on each of them, before joining them all. Only thread creation
+// is asserted on; the timings are informational.
+TEST_F(ThreadTest, ThreadStartBenchmark) {
+  constexpr int kNumThreads = 1000;
+  std::vector<scoped_refptr<Thread>> workers(kNumThreads);
+
+  LOG_TIMING(INFO, "starting threads") {
+    for (auto& worker : workers) {
+      ASSERT_OK(Thread::Create("test", "TestCallOnExit", usleep, 0, &worker));
+    }
+  }
+
+  LOG_TIMING(INFO, "waiting for all threads to publish TIDs") {
+    for (auto& worker : workers) {
+      worker->tid();
+    }
+  }
+
+  for (auto& worker : workers) {
+    worker->Join();
+  }
+}
+
+// The following tests only run in debug mode, since thread restrictions are no-ops
+// in release builds.
+#ifndef NDEBUG
+TEST_F(ThreadTest, TestThreadRestrictions_IO) {
+ // By default, IO should be allowed on this thread.
+ ThreadRestrictions::AssertIOAllowed();
+
+ ThreadRestrictions::SetIOAllowed(false);  // Disallow IO, then re-allow it only within the scope below.
+ {
+ ThreadRestrictions::ScopedAllowIO allow_io;
+ ASSERT_TRUE(Env::Default()->FileExists("/"));
+ }
+ ThreadRestrictions::SetIOAllowed(true);  // Re-allow IO before the death test.
+
+ // Disallow IO - doing IO should crash the process with the message below.
+ ASSERT_DEATH({
+ ThreadRestrictions::SetIOAllowed(false);
+ ignore_result(Env::Default()->FileExists("/"));
+ },
+ "Function marked as IO-only was called from a thread that disallows IO");
+}
+
+TEST_F(ThreadTest, TestThreadRestrictions_Waiting) {
+ // By default, waiting should be allowed on this thread.
+ ThreadRestrictions::AssertWaitAllowed();
+
+ ThreadRestrictions::SetWaitAllowed(false);  // Disallow waiting, then re-allow it only within the scope below.
+ {
+ ThreadRestrictions::ScopedAllowWait allow_wait;
+ CountDownLatch l(0);
+ l.Wait();
+ }
+ ThreadRestrictions::SetWaitAllowed(true);  // Re-allow waiting before the death test.
+
+ // Disallow waiting - blocking on a latch should crash the process with the message below.
+ ASSERT_DEATH({
+ ThreadRestrictions::SetWaitAllowed(false);
+ CountDownLatch l(0);
+ l.Wait();
+ },
+ "Waiting is not allowed to be used on this thread");
+}
+#endif // NDEBUG
+
+} // namespace kudu