You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by ta...@apache.org on 2018/07/13 06:03:18 UTC

[06/51] [abbrv] impala git commit: IMPALA-7006: Add KRPC folders from kudu@334ecafd

http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/status.h
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/status.h b/be/src/kudu/util/status.h
new file mode 100644
index 0000000..3c8a1d9
--- /dev/null
+++ b/be/src/kudu/util/status.h
@@ -0,0 +1,493 @@
+// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file. See the AUTHORS file for names of contributors.
+//
+// A Status encapsulates the result of an operation.  It may indicate success,
+// or it may indicate an error with an associated error message.
+//
+// Multiple threads can invoke const methods on a Status without
+// external synchronization, but if any of the threads may call a
+// non-const method, all threads accessing the same Status must use
+// external synchronization.
+
+#ifndef KUDU_UTIL_STATUS_H_
+#define KUDU_UTIL_STATUS_H_
+
+// NOTE: using stdint.h instead of cstdint and errno.h instead of errno because
+// this file is supposed to be processed by a compiler lacking C++11 support.
+#include <errno.h>
+#include <stdint.h>
+
+#include <cstddef>
+#include <string>
+
+#ifdef KUDU_HEADERS_NO_STUBS
+#include "kudu/gutil/macros.h"
+#include "kudu/gutil/port.h"
+#else
+#include "kudu/client/stubs.h"
+#endif
+
+#include "kudu/util/kudu_export.h"
+#include "kudu/util/slice.h"
+
+/// @brief Return the given status if it is not @c OK.
+#define KUDU_RETURN_NOT_OK(s) do { \
+    const ::kudu::Status& _s = (s);             \
+    if (PREDICT_FALSE(!_s.ok())) return _s;     \
+  } while (0);
+
+/// @brief Return the given status if it is not OK, but first clone it and
+///   prepend the given message.
+#define KUDU_RETURN_NOT_OK_PREPEND(s, msg) do { \
+    const ::kudu::Status& _s = (s);                              \
+    if (PREDICT_FALSE(!_s.ok())) return _s.CloneAndPrepend(msg); \
+  } while (0);
+
+/// @brief Return @c to_return if @c to_call returns a bad status.
+///   The substitution for 'to_return' may reference the variable
+///   @c s for the bad status.
+#define KUDU_RETURN_NOT_OK_RET(to_call, to_return) do { \
+    const ::kudu::Status& s = (to_call);                \
+    if (PREDICT_FALSE(!s.ok())) return (to_return);  \
+  } while (0);
+
+/// @brief Return the given status if it is not OK, evaluating `on_error` if so.
+#define KUDU_RETURN_NOT_OK_EVAL(s, on_error) do { \
+    const ::kudu::Status& _s = (s); \
+    if (PREDICT_FALSE(!_s.ok())) { \
+      (on_error); \
+      return _s; \
+    } \
+  } while (0);
+
+/// @brief Emit a warning if @c to_call returns a bad status.
+#define KUDU_WARN_NOT_OK(to_call, warning_prefix) do { \
+    const ::kudu::Status& _s = (to_call);              \
+    if (PREDICT_FALSE(!_s.ok())) { \
+      KUDU_LOG(WARNING) << (warning_prefix) << ": " << _s.ToString();  \
+    } \
+  } while (0);
+
+/// @brief Log the given status and return immediately.
+#define KUDU_LOG_AND_RETURN(level, status) do { \
+    const ::kudu::Status& _s = (status);        \
+    KUDU_LOG(level) << _s.ToString(); \
+    return _s; \
+  } while (0);
+
+/// @brief If the given status is not OK, log it and 'msg' at 'level' and return the status.
+#define KUDU_RETURN_NOT_OK_LOG(s, level, msg) do { \
+    const ::kudu::Status& _s = (s);             \
+    if (PREDICT_FALSE(!_s.ok())) { \
+      KUDU_LOG(level) << "Status: " << _s.ToString() << " " << (msg); \
+      return _s;     \
+    } \
+  } while (0);
+
+/// @brief If @c to_call returns a bad status, CHECK immediately with
+///   a logged message of @c msg followed by the status.
+#define KUDU_CHECK_OK_PREPEND(to_call, msg) do { \
+    const ::kudu::Status& _s = (to_call);                   \
+    KUDU_CHECK(_s.ok()) << (msg) << ": " << _s.ToString();  \
+  } while (0);
+
+/// @brief If the status is bad, CHECK immediately, appending the status to the
+///   logged message.
+#define KUDU_CHECK_OK(s) KUDU_CHECK_OK_PREPEND(s, "Bad status")
+
+/// @brief If @c to_call returns a bad status, DCHECK immediately with
+///   a logged message of @c msg followed by the status.
+#define KUDU_DCHECK_OK_PREPEND(to_call, msg) do { \
+    const ::kudu::Status& _s = (to_call);                   \
+    KUDU_DCHECK(_s.ok()) << (msg) << ": " << _s.ToString();  \
+  } while (0);
+
+/// @brief If the status is bad, DCHECK immediately, appending the status to the
+///   logged 'Bad status' message.
+#define KUDU_DCHECK_OK(s) KUDU_DCHECK_OK_PREPEND(s, "Bad status")
+
+/// @file status.h
+///
+/// This header is used in both the Kudu build as well as in builds of
+/// applications that use the Kudu C++ client. In the latter we need to be
+/// careful to "namespace" our macros, to avoid colliding or overriding with
+/// similarly named macros belonging to the application.
+///
+/// KUDU_HEADERS_USE_SHORT_STATUS_MACROS handles this behavioral change. When
+/// defined, we're building Kudu and:
+/// @li Non-namespaced macros are allowed and mapped to the namespaced versions
+///   defined above.
+/// @li Namespaced versions of glog macros are mapped to the real glog macros
+///   (otherwise the macros are defined in the C++ client stubs).
+#ifdef KUDU_HEADERS_USE_SHORT_STATUS_MACROS
+#define RETURN_NOT_OK         KUDU_RETURN_NOT_OK
+#define RETURN_NOT_OK_PREPEND KUDU_RETURN_NOT_OK_PREPEND
+#define RETURN_NOT_OK_RET     KUDU_RETURN_NOT_OK_RET
+#define RETURN_NOT_OK_EVAL    KUDU_RETURN_NOT_OK_EVAL
+#define WARN_NOT_OK           KUDU_WARN_NOT_OK
+#define LOG_AND_RETURN        KUDU_LOG_AND_RETURN
+#define RETURN_NOT_OK_LOG     KUDU_RETURN_NOT_OK_LOG
+#define CHECK_OK_PREPEND      KUDU_CHECK_OK_PREPEND
+#define CHECK_OK              KUDU_CHECK_OK
+#define DCHECK_OK_PREPEND     KUDU_DCHECK_OK_PREPEND
+#define DCHECK_OK             KUDU_DCHECK_OK
+
+// These are standard glog macros.
+#define KUDU_LOG              LOG
+#define KUDU_CHECK            CHECK
+#define KUDU_DCHECK           DCHECK
+#endif
+
+namespace kudu {
+
+/// @brief A representation of an operation's outcome.
+class KUDU_EXPORT Status {
+ public:
+  /// Create an object representing success status.
+  Status() : state_(NULL) { }
+
+  ~Status() { delete[] state_; }
+
+  /// Copy the specified status.
+  ///
+  /// @param [in] s
+  ///   The status object to copy from.
+  Status(const Status& s);
+
+  /// Assign the specified status.
+  ///
+  /// @param [in] s
+  ///   The status object to assign from.
+  /// @return The reference to the modified object.
+  Status& operator=(const Status& s);
+
+#if __cplusplus >= 201103L
+  /// Move the specified status (C++11).
+  ///
+  /// @param [in] s
+  ///   rvalue reference to a Status object.
+  Status(Status&& s) noexcept;
+
+  /// Assign the specified status using move semantics (C++11).
+  ///
+  /// @param [in] s
+  ///   rvalue reference to a Status object.
+  /// @return The reference to the modified object.
+  Status& operator=(Status&& s) noexcept;
+
+  /// If this status is OK, calls 'op' and returns the result, otherwise returns
+  /// this status.
+  ///
+  /// This method can be used to chain together multiple Status-returning
+  /// operations, short circuiting after the first one to fail.
+  ///
+  /// Example:
+  ///
+  /// @code
+  /// unique_ptr<SequentialFile> file;
+  /// Status s = Env::Default()
+  ///               ->NewSequentialFile("/tmp/example.txt", &file)
+  ///               .AndThen([&] {
+  ///                 return file->Write(0, "some data")
+  ///                             .CloneAndPrepend("failed to write to example file");
+  ///               });
+  /// @endcode
+  ///
+  /// @param [in] op
+  ///   Status-returning closure or function to run.
+  /// @return 'this', if this is not OK, or the result of running op.
+  template<typename F>
+  Status AndThen(F op) {
+    if (ok()) {
+      return op();
+    }
+    return *this;
+  }
+#endif
+
+  /// @return A success status.
+  static Status OK() { return Status(); }
+
+
+  /// @name Methods to build status objects for various types of errors.
+  ///
+  /// @param [in] msg
+  ///   The informational message on the error.
+  /// @param [in] msg2
+  ///   Additional information on the error (optional).
+  /// @param [in] posix_code
+  ///   POSIX error code, if applicable (optional).
+  /// @return The error status of an appropriate type.
+  ///
+  ///@{
+  static Status NotFound(const Slice& msg, const Slice& msg2 = Slice(),
+                         int16_t posix_code = -1) {
+    return Status(kNotFound, msg, msg2, posix_code);
+  }
+  static Status Corruption(const Slice& msg, const Slice& msg2 = Slice(),
+                         int16_t posix_code = -1) {
+    return Status(kCorruption, msg, msg2, posix_code);
+  }
+  static Status NotSupported(const Slice& msg, const Slice& msg2 = Slice(),
+                         int16_t posix_code = -1) {
+    return Status(kNotSupported, msg, msg2, posix_code);
+  }
+  static Status InvalidArgument(const Slice& msg, const Slice& msg2 = Slice(),
+                         int16_t posix_code = -1) {
+    return Status(kInvalidArgument, msg, msg2, posix_code);
+  }
+  static Status IOError(const Slice& msg, const Slice& msg2 = Slice(),
+                         int16_t posix_code = -1) {
+    return Status(kIOError, msg, msg2, posix_code);
+  }
+  static Status AlreadyPresent(const Slice& msg, const Slice& msg2 = Slice(),
+                         int16_t posix_code = -1) {
+    return Status(kAlreadyPresent, msg, msg2, posix_code);
+  }
+  static Status RuntimeError(const Slice& msg, const Slice& msg2 = Slice(),
+                         int16_t posix_code = -1) {
+    return Status(kRuntimeError, msg, msg2, posix_code);
+  }
+  static Status NetworkError(const Slice& msg, const Slice& msg2 = Slice(),
+                         int16_t posix_code = -1) {
+    return Status(kNetworkError, msg, msg2, posix_code);
+  }
+  static Status IllegalState(const Slice& msg, const Slice& msg2 = Slice(),
+                         int16_t posix_code = -1) {
+    return Status(kIllegalState, msg, msg2, posix_code);
+  }
+  static Status NotAuthorized(const Slice& msg, const Slice& msg2 = Slice(),
+                         int16_t posix_code = -1) {
+    return Status(kNotAuthorized, msg, msg2, posix_code);
+  }
+  static Status Aborted(const Slice& msg, const Slice& msg2 = Slice(),
+                         int16_t posix_code = -1) {
+    return Status(kAborted, msg, msg2, posix_code);
+  }
+  static Status RemoteError(const Slice& msg, const Slice& msg2 = Slice(),
+                         int16_t posix_code = -1) {
+    return Status(kRemoteError, msg, msg2, posix_code);
+  }
+  static Status ServiceUnavailable(const Slice& msg, const Slice& msg2 = Slice(),
+                         int16_t posix_code = -1) {
+    return Status(kServiceUnavailable, msg, msg2, posix_code);
+  }
+  static Status TimedOut(const Slice& msg, const Slice& msg2 = Slice(),
+                         int16_t posix_code = -1) {
+    return Status(kTimedOut, msg, msg2, posix_code);
+  }
+  static Status Uninitialized(const Slice& msg, const Slice& msg2 = Slice(),
+                              int16_t posix_code = -1) {
+    return Status(kUninitialized, msg, msg2, posix_code);
+  }
+  static Status ConfigurationError(const Slice& msg, const Slice& msg2 = Slice(),
+                                   int16_t posix_code = -1) {
+    return Status(kConfigurationError, msg, msg2, posix_code);
+  }
+  static Status Incomplete(const Slice& msg, const Slice& msg2 = Slice(),
+                           int64_t posix_code = -1) {
+    return Status(kIncomplete, msg, msg2, posix_code);
+  }
+  static Status EndOfFile(const Slice& msg, const Slice& msg2 = Slice(),
+                          int64_t posix_code = -1) {
+    return Status(kEndOfFile, msg, msg2, posix_code);
+  }
+  ///@}
+
+  /// @return @c true iff the status indicates success.
+  bool ok() const { return (state_ == NULL); }
+
+  /// @return @c true iff the status indicates a NotFound error.
+  bool IsNotFound() const { return code() == kNotFound; }
+
+  /// @return @c true iff the status indicates a Corruption error.
+  bool IsCorruption() const { return code() == kCorruption; }
+
+  /// @return @c true iff the status indicates a NotSupported error.
+  bool IsNotSupported() const { return code() == kNotSupported; }
+
+  /// @return @c true iff the status indicates an IOError.
+  bool IsIOError() const { return code() == kIOError; }
+
+  /// @return @c true iff the status indicates an InvalidArgument error.
+  bool IsInvalidArgument() const { return code() == kInvalidArgument; }
+
+  /// @return @c true iff the status indicates an AlreadyPresent error.
+  bool IsAlreadyPresent() const { return code() == kAlreadyPresent; }
+
+  /// @return @c true iff the status indicates a RuntimeError.
+  bool IsRuntimeError() const { return code() == kRuntimeError; }
+
+  /// @return @c true iff the status indicates a NetworkError.
+  bool IsNetworkError() const { return code() == kNetworkError; }
+
+  /// @return @c true iff the status indicates an IllegalState error.
+  bool IsIllegalState() const { return code() == kIllegalState; }
+
+  /// @return @c true iff the status indicates a NotAuthorized error.
+  bool IsNotAuthorized() const { return code() == kNotAuthorized; }
+
+  /// @return @c true iff the status indicates an Aborted error.
+  bool IsAborted() const { return code() == kAborted; }
+
+  /// @return @c true iff the status indicates a RemoteError.
+  bool IsRemoteError() const { return code() == kRemoteError; }
+
+  /// @return @c true iff the status indicates ServiceUnavailable.
+  bool IsServiceUnavailable() const { return code() == kServiceUnavailable; }
+
+  /// @return @c true iff the status indicates TimedOut.
+  bool IsTimedOut() const { return code() == kTimedOut; }
+
+  /// @return @c true iff the status indicates Uninitialized.
+  bool IsUninitialized() const { return code() == kUninitialized; }
+
+  /// @return @c true iff the status indicates ConfigurationError.
+  bool IsConfigurationError() const { return code() == kConfigurationError; }
+
+  /// @return @c true iff the status indicates Incomplete.
+  bool IsIncomplete() const { return code() == kIncomplete; }
+
+  /// @return @c true iff the status indicates end of file.
+  bool IsEndOfFile() const { return code() == kEndOfFile; }
+
+  /// @return @c true iff the status indicates a disk failure.
+  bool IsDiskFailure() const {
+    switch (posix_code()) {
+      case EIO:
+      case ENODEV:
+      case ENXIO:
+      case EROFS:
+        return true;
+    }
+    return false;
+  }
+
+  /// @return A string representation of this status suitable for printing.
+  ///   Returns the string "OK" for success.
+  std::string ToString() const;
+
+  /// @return A string representation of the status code, without the message
+  ///   text or POSIX code information.
+  std::string CodeAsString() const;
+
+  /// This is similar to ToString, except that it does not include
+  /// the stringified error code or POSIX code.
+  ///
+  /// @note The returned Slice is only valid as long as this Status object
+  ///   remains live and unchanged.
+  ///
+  /// @return The message portion of the Status. For @c OK statuses,
+  ///   this returns an empty string.
+  Slice message() const;
+
+  /// @return The POSIX code associated with this Status object,
+  ///   or @c -1 if there is none.
+  int16_t posix_code() const;
+
+  /// Clone this status and add the specified prefix to the message.
+  ///
+  /// If this status is OK, then an OK status will be returned.
+  ///
+  /// @param [in] msg
+  ///   The message to prepend.
+  /// @return A new Status object with the same state plus an additional
+  ///   leading message.
+  Status CloneAndPrepend(const Slice& msg) const;
+
+  /// Clone this status and add the specified suffix to the message.
+  ///
+  /// If this status is OK, then an OK status will be returned.
+  ///
+  /// @param [in] msg
+  ///   The message to append.
+  /// @return A new Status object with the same state plus an additional
+  ///   trailing message.
+  Status CloneAndAppend(const Slice& msg) const;
+
+  /// @return The memory usage of this object without the object itself.
+  ///   Should be used when embedded inside another object.
+  size_t memory_footprint_excluding_this() const;
+
+  /// @return The memory usage of this object including the object itself.
+  ///   Should be used when allocated on the heap.
+  size_t memory_footprint_including_this() const;
+
+ private:
+  // OK status has a NULL state_.  Otherwise, state_ is a new[] array
+  // of the following form:
+  //    state_[0..3] == length of message
+  //    state_[4]    == code
+  //    state_[5..6] == posix_code
+  //    state_[7..]  == message
+  const char* state_;
+
+  enum Code {
+    kOk = 0,
+    kNotFound = 1,
+    kCorruption = 2,
+    kNotSupported = 3,
+    kInvalidArgument = 4,
+    kIOError = 5,
+    kAlreadyPresent = 6,
+    kRuntimeError = 7,
+    kNetworkError = 8,
+    kIllegalState = 9,
+    kNotAuthorized = 10,
+    kAborted = 11,
+    kRemoteError = 12,
+    kServiceUnavailable = 13,
+    kTimedOut = 14,
+    kUninitialized = 15,
+    kConfigurationError = 16,
+    kIncomplete = 17,
+    kEndOfFile = 18,
+    // NOTE: Remember to duplicate these constants into wire_protocol.proto and
+    // and to add StatusTo/FromPB ser/deser cases in wire_protocol.cc !
+    // Also remember to make the same changes to the java client in Status.java.
+    //
+    // TODO: Move error codes into an error_code.proto or something similar.
+  };
+  COMPILE_ASSERT(sizeof(Code) == 4, code_enum_size_is_part_of_abi);
+
+  Code code() const {
+    return (state_ == NULL) ? kOk : static_cast<Code>(state_[4]);
+  }
+
+  Status(Code code, const Slice& msg, const Slice& msg2, int16_t posix_code);
+  static const char* CopyState(const char* s);
+};
+
+inline Status::Status(const Status& s) {
+  state_ = (s.state_ == NULL) ? NULL : CopyState(s.state_);
+}
+
+inline Status& Status::operator=(const Status& s) {
+  // The following condition catches both aliasing (when this == &s),
+  // and the common case where both s and *this are OK.
+  if (state_ != s.state_) {
+    delete[] state_;
+    state_ = (s.state_ == NULL) ? NULL : CopyState(s.state_);
+  }
+  return *this;
+}
+
+#if __cplusplus >= 201103L
+inline Status::Status(Status&& s) noexcept : state_(s.state_) {
+  s.state_ = nullptr;
+}
+
+inline Status& Status::operator=(Status&& s) noexcept {
+  if (state_ != s.state_) {
+    delete[] state_;
+    state_ = s.state_;
+    s.state_ = nullptr;
+  }
+  return *this;
+}
+#endif
+
+}  // namespace kudu
+
+#endif  // KUDU_UTIL_STATUS_H_

http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/status_callback.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/status_callback.cc b/be/src/kudu/util/status_callback.cc
new file mode 100644
index 0000000..a3932b5
--- /dev/null
+++ b/be/src/kudu/util/status_callback.cc
@@ -0,0 +1,41 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "kudu/util/status_callback.h"
+
+#include <ostream>
+
+#include <glog/logging.h>
+
+#include "kudu/gutil/port.h"
+#include "kudu/util/status.h"
+
+using std::string;
+
+namespace kudu {
+
+void DoNothingStatusCB(const Status& status) {}
+
+void CrashIfNotOkStatusCB(const string& message, const Status& status) {
+  if (PREDICT_FALSE(!status.ok())) {
+    LOG(FATAL) << message << ": " << status.ToString();
+  }
+}
+
+Status DoNothingStatusClosure() { return Status::OK(); }
+
+} // end namespace kudu

http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/status_callback.h
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/status_callback.h b/be/src/kudu/util/status_callback.h
new file mode 100644
index 0000000..70bbb97
--- /dev/null
+++ b/be/src/kudu/util/status_callback.h
@@ -0,0 +1,54 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#ifndef KUDU_UTIL_STATUS_CALLBACK_H
+#define KUDU_UTIL_STATUS_CALLBACK_H
+
+#include <functional>
+#include <string>
+
+#include "kudu/gutil/callback_forward.h"
+
+namespace kudu {
+
+class Status;
+
+// A callback which takes a Status. This is typically used for functions which
+// produce asynchronous results and may fail.
+typedef Callback<void(const Status& status)> StatusCallback;
+
+// Like StatusCallback but uses the STL function objects.
+//
+// TODO(adar): should eventually replace all StatusCallback usage with this.
+typedef std::function<void(const Status& status)> StdStatusCallback;
+
+// To be used when a function signature requires a StatusCallback but none
+// is needed.
+extern void DoNothingStatusCB(const Status& status);
+
+// A callback that crashes with a FATAL log message if the given Status is not OK.
+extern void CrashIfNotOkStatusCB(const std::string& message, const Status& status);
+
+// A closure (callback without arguments) that returns a Status indicating
+// whether it was successful or not.
+typedef Callback<Status(void)> StatusClosure;
+
+// To be used when setting a StatusClosure is optional.
+extern Status DoNothingStatusClosure();
+
+} // namespace kudu
+
+#endif

http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/stopwatch.h
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/stopwatch.h b/be/src/kudu/util/stopwatch.h
new file mode 100644
index 0000000..f15b597
--- /dev/null
+++ b/be/src/kudu/util/stopwatch.h
@@ -0,0 +1,364 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#ifndef KUDU_UTIL_STOPWATCH_H
+#define KUDU_UTIL_STOPWATCH_H
+
+#include <glog/logging.h>
+#include <sys/resource.h>
+#include <sys/time.h>
+#include <time.h>
+#include <string>
+#if defined(__APPLE__)
+#include <mach/clock.h>
+#include <mach/mach.h>
+#include <mach/thread_info.h>
+#endif  // defined(__APPLE__)
+
+#include "kudu/gutil/macros.h"
+#include "kudu/gutil/stringprintf.h"
+#include "kudu/gutil/walltime.h"
+
+namespace kudu {
+
+// Macro for logging timing of a block. Usage:
+//   LOG_TIMING_PREFIX_IF(INFO, FLAGS_should_record_time, "Tablet X: ", "doing some task") {
+//     ... some task which takes some time
+//   }
+// If FLAGS_should_record_time is true, yields a log like:
+// I1102 14:35:51.726186 23082 file.cc:167] Tablet X: Time spent doing some task:
+//   real 3.729s user 3.570s sys 0.150s
+// The task will always execute regardless of whether the timing information is
+// printed.
+#define LOG_TIMING_PREFIX_IF(severity, condition, prefix, description) \
+  for (kudu::sw_internal::LogTiming _l(__FILE__, __LINE__, google::severity, prefix, description, \
+          -1, (condition)); !_l.HasRun(); _l.MarkHasRun())
+
+// Conditionally log, no prefix.
+#define LOG_TIMING_IF(severity, condition, description) \
+  LOG_TIMING_PREFIX_IF(severity, (condition), "", (description))
+
+// Always log, including prefix.
+#define LOG_TIMING_PREFIX(severity, prefix, description) \
+  LOG_TIMING_PREFIX_IF(severity, true, (prefix), (description))
+
+// Always log, no prefix.
+#define LOG_TIMING(severity, description) \
+  LOG_TIMING_IF(severity, true, (description))
+
+// Macro to log the time spent in the rest of the block.
+#define SCOPED_LOG_TIMING(severity, description) \
+  kudu::sw_internal::LogTiming VARNAME_LINENUM(_log_timing)(__FILE__, __LINE__, \
+      google::severity, "", description, -1, true);
+
+// Scoped version of LOG_SLOW_EXECUTION().
+#define SCOPED_LOG_SLOW_EXECUTION(severity, max_expected_millis, description) \
+  kudu::sw_internal::LogTiming VARNAME_LINENUM(_log_timing)(__FILE__, __LINE__, \
+      google::severity, "", description, max_expected_millis, true)
+
+// Scoped version of LOG_SLOW_EXECUTION() but with a prefix.
+#define SCOPED_LOG_SLOW_EXECUTION_PREFIX(severity, max_expected_millis, prefix, description) \
+  kudu::sw_internal::LogTiming VARNAME_LINENUM(_log_timing)(__FILE__, __LINE__, \
+      google::severity, prefix, description, max_expected_millis, true)
+
+// Macro for logging timing of a block. Usage:
+//   LOG_SLOW_EXECUTION(INFO, 5, "doing some task") {
+//     ... some task which takes some time
+//   }
+// when slower than 5 milliseconds, yields a log like:
+// I1102 14:35:51.726186 23082 file.cc:167] Time spent doing some task:
+//   real 3.729s user 3.570s sys 0.150s
+#define LOG_SLOW_EXECUTION(severity, max_expected_millis, description) \
+  for (kudu::sw_internal::LogTiming _l(__FILE__, __LINE__, google::severity, "", description, \
+          max_expected_millis, true); !_l.HasRun(); _l.MarkHasRun())
+
+// Macro for vlogging timing of a block. The execution happens regardless of the vlog_level,
+// it's only the logging that's affected.
+// Usage:
+//   VLOG_TIMING(1, "doing some task") {
+//     ... some task which takes some time
+//   }
+// Yields a log just like LOG_TIMING's.
+#define VLOG_TIMING(vlog_level, description) \
+  for (kudu::sw_internal::LogTiming _l(__FILE__, __LINE__, google::INFO, "", description, \
+          -1, VLOG_IS_ON(vlog_level)); !_l.HasRun(); _l.MarkHasRun())
+
+// Macro to log the time spent in the rest of the block.
+#define SCOPED_VLOG_TIMING(vlog_level, description) \
+  kudu::sw_internal::LogTiming VARNAME_LINENUM(_log_timing)(__FILE__, __LINE__, \
+      google::INFO, "", description, -1, VLOG_IS_ON(vlog_level));
+
+
+// Workaround for the clang analyzer being confused by the above loop-based macros.
+// The analyzer thinks the macros might loop more than once, and thus generates
+// false positives. So, for its purposes, just make them empty.
+#if defined(CLANG_TIDY) || defined(__clang_analyzer__)
+
+#undef LOG_TIMING_PREFIX_IF
+#define LOG_TIMING_PREFIX_IF(severity, condition, prefix, description)
+
+#undef VLOG_TIMING
+#define VLOG_TIMING(vlog_level, description)
+
+#undef LOG_SLOW_EXECUTION
+#define LOG_SLOW_EXECUTION(severity, max_expected_millis, description)
+#endif
+
+
+#define NANOS_PER_SECOND 1000000000.0
+#define NANOS_PER_MILLISECOND 1000000.0
+
+class Stopwatch;
+
+typedef int64_t nanosecond_type;
+
+// Structure which contains an elapsed amount of wall/user/sys time.
+struct CpuTimes {
+  nanosecond_type wall;
+  nanosecond_type user;
+  nanosecond_type system;
+  int64_t context_switches;
+
+  void clear() { wall = user = system = context_switches = 0LL; }
+
+  // Return a string formatted similar to the output of the "time" shell command.
+  std::string ToString() const {
+    return StringPrintf(
+      "real %.3fs\tuser %.3fs\tsys %.3fs",
+      wall_seconds(), user_cpu_seconds(), system_cpu_seconds());
+  }
+
+  double wall_millis() const {
+    return static_cast<double>(wall) / NANOS_PER_MILLISECOND;
+  }
+
+  double wall_seconds() const {
+    return static_cast<double>(wall) / NANOS_PER_SECOND;
+  }
+
+  double user_cpu_seconds() const {
+    return static_cast<double>(user) / NANOS_PER_SECOND;
+  }
+
+  double system_cpu_seconds() const {
+    return static_cast<double>(system) / NANOS_PER_SECOND;
+  }
+};
+
+// A Stopwatch is a convenient way of timing a given operation.
+//
+// Wall clock time is based on a monotonic timer, so can be reliably used for
+// determining durations.
+// CPU time is based on either current thread's usage or the usage of the whole
+// process, depending on the value of 'Mode' passed to the constructor.
+//
+// The implementation relies on several syscalls, so should not be used for
+// hot paths, but is useful for timing anything on the granularity of seconds
+// or more.
+//
+// NOTE: the user time reported by this class is based on Linux scheduler ticks
+// and thus has low precision. Use GetThreadCpuTimeMicros() from walltime.h if
+// more accurate per-thread CPU usage timing is required.
+class Stopwatch {
+ public:
+
+  enum Mode {
+    // Collect usage only about the calling thread.
+    // This may not be supported on older versions of Linux.
+    THIS_THREAD,
+    // Collect usage of all threads.
+    ALL_THREADS
+  };
+
+  // Construct a new stopwatch. The stopwatch is initially stopped.
+  explicit Stopwatch(Mode mode = THIS_THREAD)
+      : mode_(mode),
+        stopped_(true) {
+    times_.clear();
+  }
+
+  // Start counting. If the stopwatch is already counting, then resets the
+  // start point at the current time.
+  void start() {
+    stopped_ = false;
+    GetTimes(&times_);
+  }
+
+  // Stop counting. If the stopwatch is already stopped, has no effect.
+  void stop() {
+    if (stopped_) return;
+    stopped_ = true;
+
+    CpuTimes current;
+    GetTimes(&current);
+    times_.wall = current.wall - times_.wall;
+    times_.user = current.user - times_.user;
+    times_.system = current.system - times_.system;
+    times_.context_switches = current.context_switches - times_.context_switches;
+  }
+
+  // Return the elapsed amount of time. If the stopwatch is running, then returns
+  // the amount of time since it was started. If it is stopped, returns the amount
+  // of time between the most recent start/stop pair. If the stopwatch has never been
+  // started, the elapsed time is considered to be zero.
+  CpuTimes elapsed() const {
+    if (stopped_) return times_;
+
+    CpuTimes current;
+    GetTimes(&current);
+    current.wall -= times_.wall;
+    current.user -= times_.user;
+    current.system -= times_.system;
+    current.context_switches -= times_.context_switches;
+    return current;
+  }
+
+  // Resume a stopped stopwatch, such that the elapsed time continues to grow from
+  // the point where it was last stopped.
+  // For example:
+  //   Stopwatch s;
+  //   s.start();
+  //   sleep(1); // elapsed() is now ~1sec
+  //   s.stop();
+  //   sleep(1);
+  //   s.resume();
+  //   sleep(1); // elapsed() is now ~2sec
+  void resume() {
+    if (!stopped_) return;
+
+    CpuTimes current(times_);
+    start();
+    times_.wall   -= current.wall;
+    times_.user   -= current.user;
+    times_.system -= current.system;
+    times_.context_switches -= current.context_switches;
+  }
+
+  bool is_stopped() const {
+    return stopped_;
+  }
+
+ private:
+  void GetTimes(CpuTimes *times) const {
+    struct rusage usage;
+    struct timespec wall;
+
+#if defined(__APPLE__)
+    if (mode_ == THIS_THREAD) {
+      // Adapted from https://codereview.chromium.org/16818003
+      thread_basic_info_data_t t_info;
+      mach_msg_type_number_t count = THREAD_BASIC_INFO_COUNT;
+      CHECK_EQ(KERN_SUCCESS, thread_info(mach_thread_self(), THREAD_BASIC_INFO,
+                                         (thread_info_t)&t_info, &count));
+      usage.ru_utime.tv_sec = t_info.user_time.seconds;
+      usage.ru_utime.tv_usec = t_info.user_time.microseconds;
+      usage.ru_stime.tv_sec = t_info.system_time.seconds;
+      usage.ru_stime.tv_usec = t_info.system_time.microseconds;
+      usage.ru_nivcsw = t_info.suspend_count;
+      usage.ru_nvcsw = 0;
+    } else {
+      CHECK_EQ(0, getrusage(RUSAGE_SELF, &usage));
+    }
+
+    mach_timespec_t ts;
+    walltime_internal::GetCurrentTime(&ts);
+    wall.tv_sec = ts.tv_sec;
+    wall.tv_nsec = ts.tv_nsec;
+#else
+    CHECK_EQ(0, getrusage((mode_ == THIS_THREAD) ? RUSAGE_THREAD : RUSAGE_SELF, &usage));
+    CHECK_EQ(0, clock_gettime(CLOCK_MONOTONIC, &wall));
+#endif  // defined(__APPLE__)
+    times->wall   = wall.tv_sec * 1000000000L + wall.tv_nsec;
+    times->user   = usage.ru_utime.tv_sec * 1000000000L + usage.ru_utime.tv_usec * 1000L;
+    times->system = usage.ru_stime.tv_sec * 1000000000L + usage.ru_stime.tv_usec * 1000L;
+    times->context_switches = usage.ru_nvcsw + usage.ru_nivcsw;
+  }
+
+  const Mode mode_;
+  bool stopped_;
+  CpuTimes times_;
+};
+
+
+namespace sw_internal {
+
+// Internal class used by the LOG_TIMING macro.
+class LogTiming {
+ public:
+  LogTiming(const char *file, int line, google::LogSeverity severity,
+            std::string prefix, std::string description,
+            int64_t max_expected_millis, bool should_print)
+      : file_(file),
+        line_(line),
+        severity_(severity),
+        prefix_(std::move(prefix)),
+        description_(std::move(description)),
+        max_expected_millis_(max_expected_millis),
+        should_print_(should_print),
+        has_run_(false) {
+    stopwatch_.start();
+  }
+
+  ~LogTiming() {
+    if (should_print_) {
+      Print(max_expected_millis_);
+    }
+  }
+
+  // Allows this object to be used as the loop variable in for-loop macros.
+  // Call HasRun() in the conditional check in the for-loop.
+  bool HasRun() {
+    return has_run_;
+  }
+
+  // Allows this object to be used as the loop variable in for-loop macros.
+  // Call MarkHasRun() in the "increment" section of the for-loop.
+  void MarkHasRun() {
+    has_run_ = true;
+  }
+
+ private:
+  Stopwatch stopwatch_;
+  const char *file_;
+  const int line_;
+  const google::LogSeverity severity_;
+  const std::string prefix_;
+  const std::string description_;
+  const int64_t max_expected_millis_;
+  const bool should_print_;
+  bool has_run_;
+
+  // Print if the number of expected millis exceeds the max.
+  // Passing a negative number implies "always print".
+  void Print(int64_t max_expected_millis) {
+    stopwatch_.stop();
+    CpuTimes times = stopwatch_.elapsed();
+    // TODO(todd): for some reason, times.wall_millis() sometimes ends up negative
+    // on rare occasion, for unclear reasons, so we have to check max_expected_millis
+    // < 0 to be sure we always print when requested.
+    if (max_expected_millis < 0 || times.wall_millis() > max_expected_millis) {
+      google::LogMessage(file_, line_, severity_).stream()
+        << prefix_ << "Time spent " << description_ << ": "
+        << times.ToString();
+    }
+  }
+
+};
+
+} // namespace sw_internal
+} // namespace kudu
+
+#endif

http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/string_case-test.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/string_case-test.cc b/be/src/kudu/util/string_case-test.cc
new file mode 100644
index 0000000..96831a1
--- /dev/null
+++ b/be/src/kudu/util/string_case-test.cc
@@ -0,0 +1,65 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <string>
+
+#include <gtest/gtest.h>
+
+#include "kudu/util/string_case.h"
+
+using std::string;
+
+namespace kudu {
+
+TEST(TestStringCase, TestSnakeToCamel) {
+  string out;
+  SnakeToCamelCase("foo_bar", &out);
+  ASSERT_EQ("FooBar", out);
+
+
+  SnakeToCamelCase("foo-bar", &out);
+  ASSERT_EQ("FooBar", out);
+
+  SnakeToCamelCase("foobar", &out);
+  ASSERT_EQ("Foobar", out);
+}
+
+TEST(TestStringCase, TestToUpperCase) {
+  string out;
+  ToUpperCase(string("foo"), &out);
+  ASSERT_EQ("FOO", out);
+  ToUpperCase(string("foo bar-BaZ"), &out);
+  ASSERT_EQ("FOO BAR-BAZ", out);
+}
+
+TEST(TestStringCase, TestToUpperCaseInPlace) {
+  string in_out = "foo";
+  ToUpperCase(in_out, &in_out);
+  ASSERT_EQ("FOO", in_out);
+}
+
+TEST(TestStringCase, TestCapitalize) {
+  string word = "foo";
+  Capitalize(&word);
+  ASSERT_EQ("Foo", word);
+
+  word = "HiBerNATe";
+  Capitalize(&word);
+  ASSERT_EQ("Hibernate", word);
+}
+
+} // namespace kudu

http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/string_case.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/string_case.cc b/be/src/kudu/util/string_case.cc
new file mode 100644
index 0000000..7cf60ab
--- /dev/null
+++ b/be/src/kudu/util/string_case.cc
@@ -0,0 +1,76 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "kudu/util/string_case.h"
+
+#include <cctype>
+#include <cstdint>
+#include <ostream>
+
+#include <glog/logging.h>
+
+namespace kudu {
+
+using std::string;
+
+void SnakeToCamelCase(const std::string &snake_case,
+                      std::string *camel_case) {
+  DCHECK_NE(camel_case, &snake_case) << "Does not support in-place operation";
+  camel_case->clear();
+  camel_case->reserve(snake_case.size());
+
+  bool uppercase_next = true;
+  for (char c : snake_case) {
+    if ((c == '_') ||
+        (c == '-')) {
+      uppercase_next = true;
+      continue;
+    }
+    if (uppercase_next) {
+      camel_case->push_back(toupper(c));
+    } else {
+      camel_case->push_back(c);
+    }
+    uppercase_next = false;
+  }
+}
+
+void ToUpperCase(const std::string &string,
+                 std::string *out) {
+  if (out != &string) {
+    *out = string;
+  }
+
+  for (char& c : *out) {
+    c = toupper(c);
+  }
+}
+
+void Capitalize(string *word) {
+  uint32_t size = word->size();
+  if (size == 0) {
+    return;
+  }
+
+  (*word)[0] = toupper((*word)[0]);
+
+  for (int i = 1; i < size; i++) {
+    (*word)[i] = tolower((*word)[i]);
+  }
+}
+
+} // namespace kudu

http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/string_case.h
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/string_case.h b/be/src/kudu/util/string_case.h
new file mode 100644
index 0000000..98f5828
--- /dev/null
+++ b/be/src/kudu/util/string_case.h
@@ -0,0 +1,48 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+// Utility methods for dealing with string case.
+#ifndef KUDU_UTIL_STRING_CASE_H
+#define KUDU_UTIL_STRING_CASE_H
+
+#include <string>
+
+namespace kudu {
+
+// Convert the given snake_case string to camel case.
+// Also treats '-' in a string like a '_'
+// For example:
+// - 'foo_bar' -> FooBar
+// - 'foo-bar' -> FooBar
+//
+// This function cannot operate in-place -- i.e. 'camel_case' must not
+// point to 'snake_case'.
+void SnakeToCamelCase(const std::string &snake_case,
+                      std::string *camel_case);
+
+// Upper-case all of the characters in the given string.
+// 'string' and 'out' may refer to the same string to replace in-place.
+void ToUpperCase(const std::string &string,
+                 std::string *out);
+
+// Capitalizes a string containing a word in place.
+// For example:
+// - 'hiBerNATe' -> 'Hibernate'
+void Capitalize(std::string *word);
+
+} // namespace kudu
+#endif

http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/striped64-test.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/striped64-test.cc b/be/src/kudu/util/striped64-test.cc
new file mode 100644
index 0000000..c74e165
--- /dev/null
+++ b/be/src/kudu/util/striped64-test.cc
@@ -0,0 +1,163 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+
+#include <cstdint>
+#include <ostream>
+#include <vector>
+
+#include <gflags/gflags.h>
+#include <glog/logging.h>
+#include <gtest/gtest.h>
+
+#include "kudu/gutil/ref_counted.h"
+#include "kudu/gutil/strings/substitute.h"
+#include "kudu/util/atomic.h"
+#include "kudu/util/monotime.h"
+#include "kudu/util/striped64.h"
+#include "kudu/util/test_util.h"
+#include "kudu/util/thread.h"
+
+// These flags are used by the multi-threaded tests, can be used for microbenchmarking.
+DEFINE_int32(num_operations, 10*1000, "Number of operations to perform");
+DEFINE_int32(num_threads, 2, "Number of worker threads");
+
+namespace kudu {
+
+// Test some basic operations
+TEST(Striped64Test, TestBasic) {
+  LongAdder adder;
+  ASSERT_EQ(adder.Value(), 0);
+  adder.IncrementBy(100);
+  ASSERT_EQ(adder.Value(), 100);
+  adder.Increment();
+  ASSERT_EQ(adder.Value(), 101);
+  adder.Decrement();
+  ASSERT_EQ(adder.Value(), 100);
+  adder.IncrementBy(-200);
+  ASSERT_EQ(adder.Value(), -100);
+  adder.Reset();
+  ASSERT_EQ(adder.Value(), 0);
+}
+
+template <class Adder>
+class MultiThreadTest {
+ public:
+  typedef std::vector<scoped_refptr<Thread> > thread_vec_t;
+
+  MultiThreadTest(int64_t num_operations, int64_t num_threads)
+   :  num_operations_(num_operations),
+      num_threads_(num_threads) {
+  }
+
+  void IncrementerThread(const int64_t num) {
+    for (int i = 0; i < num; i++) {
+      adder_.Increment();
+    }
+  }
+
+  void DecrementerThread(const int64_t num) {
+    for (int i = 0; i < num; i++) {
+      adder_.Decrement();
+    }
+  }
+
+  void Run() {
+    // Increment
+    for (int i = 0; i < num_threads_; i++) {
+      scoped_refptr<Thread> ref;
+      Thread::Create("Striped64", "Incrementer", &MultiThreadTest::IncrementerThread, this,
+                     num_operations_, &ref);
+      threads_.push_back(ref);
+    }
+    for (const scoped_refptr<Thread> &t : threads_) {
+      t->Join();
+    }
+    ASSERT_EQ(num_threads_*num_operations_, adder_.Value());
+    threads_.clear();
+
+    // Decrement back to zero
+    for (int i = 0; i < num_threads_; i++) {
+      scoped_refptr<Thread> ref;
+      Thread::Create("Striped64", "Decrementer", &MultiThreadTest::DecrementerThread, this,
+                     num_operations_, &ref);
+      threads_.push_back(ref);
+    }
+    for (const scoped_refptr<Thread> &t : threads_) {
+      t->Join();
+    }
+    ASSERT_EQ(0, adder_.Value());
+  }
+
+  Adder adder_;
+
+  int64_t num_operations_;
+  // This is rounded down to the nearest even number
+  int32_t num_threads_;
+  thread_vec_t threads_;
+};
+
+// Test adder implemented by a single AtomicInt for comparison
+class BasicAdder {
+ public:
+  BasicAdder() : value_(0) {}
+  void IncrementBy(int64_t x) { value_.IncrementBy(x); }
+  inline void Increment() { IncrementBy(1); }
+  inline void Decrement() { IncrementBy(-1); }
+  int64_t Value() { return value_.Load(); }
+ private:
+  AtomicInt<int64_t> value_;
+};
+
+void RunMultiTest(int64_t num_operations, int64_t num_threads) {
+  MonoTime start = MonoTime::Now();
+  MultiThreadTest<BasicAdder> basicTest(num_operations, num_threads);
+  basicTest.Run();
+  MonoTime end1 = MonoTime::Now();
+  MultiThreadTest<LongAdder> test(num_operations, num_threads);
+  test.Run();
+  MonoTime end2 = MonoTime::Now();
+  MonoDelta basic = end1 - start;
+  MonoDelta striped = end2 - end1;
+  LOG(INFO) << "Basic counter took   " << basic.ToMilliseconds() << "ms.";
+  LOG(INFO) << "Striped counter took " << striped.ToMilliseconds() << "ms.";
+}
+
+// Compare a single-thread workload. Demonstrates the overhead of LongAdder over AtomicInt.
+TEST(Striped64Test, TestSingleIncrDecr) {
+  OverrideFlagForSlowTests(
+      "num_operations",
+      strings::Substitute("$0", (FLAGS_num_operations * 100)));
+  RunMultiTest(FLAGS_num_operations, 1);
+}
+
+// Compare a multi-threaded workload. LongAdder should show improvements here.
+TEST(Striped64Test, TestMultiIncrDecr) {
+  OverrideFlagForSlowTests(
+      "num_operations",
+      strings::Substitute("$0", (FLAGS_num_operations * 100)));
+  OverrideFlagForSlowTests(
+      "num_threads",
+      strings::Substitute("$0", (FLAGS_num_threads * 4)));
+  RunMultiTest(FLAGS_num_operations, FLAGS_num_threads);
+}
+
+TEST(Striped64Test, TestSize) {
+  ASSERT_EQ(16, sizeof(LongAdder));
+}
+
+}  // namespace kudu

http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/striped64.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/striped64.cc b/be/src/kudu/util/striped64.cc
new file mode 100644
index 0000000..789a395
--- /dev/null
+++ b/be/src/kudu/util/striped64.cc
@@ -0,0 +1,191 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "kudu/util/striped64.h"
+
+#include <mm_malloc.h>
+#include <unistd.h>
+
+#include <cstdlib>
+#include <new>
+#include <ostream>
+#include <glog/logging.h>
+
+#include "kudu/util/monotime.h"
+#include "kudu/util/random.h"
+
+using kudu::striped64::internal::Cell;
+
+namespace kudu {
+
+namespace striped64 {
+namespace internal {
+
+//
+// Cell
+//
+
+Cell::Cell()
+    : value_(0) {
+}
+} // namespace internal
+} // namespace striped64
+
+//
+// Striped64
+//
+__thread uint64_t Striped64::tls_hashcode_ = 0;
+
+namespace {
+const uint32_t kNumCpus = sysconf(_SC_NPROCESSORS_ONLN);
+uint32_t ComputeNumCells() {
+  uint32_t n = 1;
+  // Calculate the size. Nearest power of two >= NCPU.
+  // Also handle a negative NCPU, can happen if sysconf name is unknown
+  while (kNumCpus > n) {
+    n <<= 1;
+  }
+  return n;
+}
+const uint32_t kNumCells = ComputeNumCells();
+const uint32_t kCellMask = kNumCells - 1;
+
+striped64::internal::Cell* const kCellsLocked =
+      reinterpret_cast<striped64::internal::Cell*>(-1L);
+
+} // anonymous namespace
+
+uint64_t Striped64::get_tls_hashcode() {
+  if (PREDICT_FALSE(tls_hashcode_ == 0)) {
+    Random r((MonoTime::Now() - MonoTime::Min()).ToNanoseconds());
+    const uint64_t hash = r.Next64();
+    // Avoid zero to allow xorShift rehash, and because 0 indicates an unset
+    // hashcode above.
+    tls_hashcode_ = (hash == 0) ? 1 : hash;
+  }
+  return tls_hashcode_;
+}
+
+
+Striped64::~Striped64() {
+  // Cell is a POD, so no need to destruct each one.
+  free(cells_);
+}
+
+template<class Updater>
+void Striped64::RetryUpdate(Rehash to_rehash, Updater updater) {
+  uint64_t h = get_tls_hashcode();
+  // There are three operations in this loop.
+  //
+  // 1. Try to add to the Cell hash table entry for the thread if the table exists.
+  //    When there's contention, rehash to try a different Cell.
+  // 2. Try to initialize the hash table.
+  // 3. Try to update the base counter.
+  //
+  // These are predicated on successful CAS operations, which is why it's all wrapped in an
+  // infinite retry loop.
+  while (true) {
+    Cell* cells = cells_.load(std::memory_order_acquire);
+    if (cells && cells != kCellsLocked) {
+      if (to_rehash == kRehash) {
+        // CAS failed already, rehash before trying to increment.
+        to_rehash = kNoRehash;
+      } else {
+        Cell *cell = &(cells_[h & kCellMask]);
+        int64_t v = cell->value_.load(std::memory_order_relaxed);
+        if (cell->CompareAndSet(v, updater(v))) {
+          // Successfully CAS'd the corresponding cell, done.
+          break;
+        }
+      }
+      // Rehash since we failed to CAS, either previously or just now.
+      h ^= h << 13;
+      h ^= h >> 17;
+      h ^= h << 5;
+    } else if (cells == nullptr &&
+               cells_.compare_exchange_weak(cells, kCellsLocked)) {
+      // Allocate cache-aligned memory for use by the cells_ table.
+      void* cell_buffer = nullptr;
+      int err = posix_memalign(&cell_buffer, CACHELINE_SIZE, sizeof(Cell) * kNumCells);
+      CHECK_EQ(0, err) << "error calling posix_memalign" << std::endl;
+      // Initialize the table
+      cells = new (cell_buffer) Cell[kNumCells];
+      cells_.store(cells, std::memory_order_release);
+    } else {
+      // Fallback to adding to the base value.
+      // Means the table wasn't initialized or we failed to init it.
+      int64_t v = base_.load(std::memory_order_relaxed);
+      if (CasBase(v, updater(v))) {
+        break;
+      }
+    }
+  }
+  // Record index for next time
+  tls_hashcode_ = h;
+}
+
+void Striped64::InternalReset(int64_t initial_value) {
+  base_.store(initial_value);
+  Cell* c;
+  do {
+    c = cells_.load(std::memory_order_acquire);
+  } while (c == kCellsLocked);
+  if (c) {
+    for (int i = 0; i < kNumCells; i++) {
+      c[i].value_.store(initial_value);
+    }
+  }
+}
+void LongAdder::IncrementBy(int64_t x) {
+  // Use hash table if present. If that fails, call RetryUpdate to rehash and retry.
+  // If no hash table, try to CAS the base counter. If that fails, RetryUpdate to init the table.
+  Cell* cells = cells_.load(std::memory_order_acquire);
+  if (cells && cells != kCellsLocked) {
+    Cell *cell = &(cells[get_tls_hashcode() & kCellMask]);
+    DCHECK_EQ(0, reinterpret_cast<const uintptr_t>(cell) & (sizeof(Cell) - 1))
+        << " unaligned Cell not allowed for Striped64" << std::endl;
+    const int64_t old = cell->value_.load(std::memory_order_relaxed);
+    if (!cell->CompareAndSet(old, old + x)) {
+      // When we hit a hash table contention, signal RetryUpdate to rehash.
+      RetryUpdate(kRehash, [x](int64_t old) { return old + x; });
+    }
+  } else {
+    int64_t b = base_.load(std::memory_order_relaxed);
+    if (!CasBase(b, b + x)) {
+      // Attempt to initialize the table. No need to rehash since the contention was for the
+      // base counter, not the hash table.
+      RetryUpdate(kNoRehash, [x](int64_t old) { return old + x; });
+    }
+  }
+}
+
+//
+// LongAdder
+//
+
+int64_t LongAdder::Value() const {
+  int64_t sum = base_.load(std::memory_order_relaxed);
+  Cell* c = cells_.load(std::memory_order_acquire);
+  if (c && c != kCellsLocked) {
+    for (int i = 0; i < kNumCells; i++) {
+      sum += c[i].value_.load(std::memory_order_relaxed);
+    }
+  }
+  return sum;
+}
+
+} // namespace kudu

http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/striped64.h
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/striped64.h b/be/src/kudu/util/striped64.h
new file mode 100644
index 0000000..48332e1
--- /dev/null
+++ b/be/src/kudu/util/striped64.h
@@ -0,0 +1,168 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#ifndef KUDU_UTIL_STRIPED64_H_
+#define KUDU_UTIL_STRIPED64_H_
+
+#include <atomic>
+#include <cstdint>
+
+#include "kudu/gutil/macros.h"
+#include "kudu/gutil/port.h"
+
+namespace kudu {
+namespace striped64 {
+namespace internal {
+
+// Padded POD container for atomic<int64_t>. This prevents false sharing of cache lines.
+class Cell {
+ public:
+  static constexpr int kAtomicInt64Size = sizeof(std::atomic<int64_t>);
+
+  Cell();
+  inline bool CompareAndSet(int64_t cmp, int64_t value) {
+    return value_.compare_exchange_weak(cmp, value);
+  }
+
+  // Padding advice from Herb Sutter:
+  // http://www.drdobbs.com/parallel/eliminate-false-sharing/217500206?pgno=4
+  std::atomic<int64_t> value_;
+  char pad[CACHELINE_SIZE > kAtomicInt64Size ?
+           CACHELINE_SIZE - kAtomicInt64Size : 1];
+
+  DISALLOW_COPY_AND_ASSIGN(Cell);
+} CACHELINE_ALIGNED;
+#undef ATOMIC_INT_SIZE
+
+} // namespace internal
+} // namespace striped64
+
+// This set of classes is heavily derived from JSR166e, released into the public domain
+// by Doug Lea and the other authors.
+//
+// See: http://gee.cs.oswego.edu/cgi-bin/viewcvs.cgi/jsr166/src/jsr166e/Striped64.java?view=co
+// See: http://gee.cs.oswego.edu/cgi-bin/viewcvs.cgi/jsr166/src/jsr166e/LongAdder.java?view=co
+//
+// The Striped64 and LongAdder implementations here are simplified versions of what's present in
+// JSR166e. However, the core ideas remain the same.
+//
+// Updating a single AtomicInteger in a multi-threaded environment can be quite slow:
+//
+//   1. False sharing of cache lines with other counters.
+//   2. Cache line bouncing from high update rates, especially with many cores.
+//
+// These two problems are addressed by Striped64. When there is no contention, it uses CAS on a
+// single base counter to store updates. However, when Striped64 detects contention
+// (via a failed CAS operation), it will allocate a small, fixed size hashtable of Cells.
+// A Cell is a simple POD that pads out an atomic<int64_t> to 64 bytes to prevent
+// sharing a cache line.
+//
+// Reading the value of a Striped64 requires traversing the hashtable to calculate the true sum.
+//
+// Each updating thread uses a thread-local hashcode to determine its Cell in the hashtable.
+// If a thread fails to CAS its hashed Cell, it will do a lightweight rehash operation to try
+// and find an uncontended bucket. Because the hashcode is thread-local, this rehash affects all
+// Striped64's accessed by the thread. This is good, since contention on one Striped64 is
+// indicative of contention elsewhere too.
+//
+// The hashtable is statically sized to the nearest power of 2 greater than or equal to the
+// number of CPUs. This is sufficient, since this guarantees the existence of a perfect hash
+// function. Due to the random rehashing, the threads should eventually converge to this function.
+// In practice, this scheme has shown to be sufficient.
+//
+// The biggest simplification of this implementation compared to JSR166e is that we do not
+// dynamically grow the table, instead immediately allocating it to the full size.
+// We also do not lazily allocate each Cell, instead allocating the entire array at once.
+// This means we waste some additional memory in low contention scenarios, and initial allocation
+// will also be slower. Some of the micro-optimizations were also elided for readability.
+class Striped64 {
+ public:
+  Striped64() = default;
+
+ protected:
+  // NOTE: the destructor is not virtual so that we can ensure that Striped64
+  // has no vtable, thus reducing its size. We make it protected to ensure that
+  // no one attempts to delete a Striped64* and invokes the wrong destructor.
+  ~Striped64();
+
+  enum Rehash {
+    kRehash,
+    kNoRehash
+  };
+
+  // CAS the base field.
+  bool CasBase(int64_t cmp, int64_t val) { return base_.compare_exchange_weak(cmp, val); }
+
+  // Handles cases of updates involving initialization, resizing, creating new Cells, and/or
+  // contention. See above for further explanation.
+  //
+  // 'Updater' should be a function which takes the current value and returns
+  // the new value.
+  template<class Updater>
+  void RetryUpdate(Rehash to_rehash, Updater updater);
+
+  // Sets base and all cells to the given value.
+  void InternalReset(int64_t initial_value);
+
+  // Base value, used mainly when there is no contention, but also as a fallback during
+  // table initialization races. Updated via CAS.
+  std::atomic<int64_t> base_ { 0 };
+
+  // Table of cells. When non-null, size is the nearest power of 2 >= NCPU.
+  // If this is set to -1, the pointer is 'locked' and some thread is in the
+  // process of allocating the array.
+  std::atomic<striped64::internal::Cell*> cells_ { nullptr };
+
+ protected:
+  static uint64_t get_tls_hashcode();
+
+ private:
+  DISALLOW_COPY_AND_ASSIGN(Striped64);
+
+  // Static hash code per-thread. Shared across all instances to limit thread-local pollution.
+  // Also, if a thread hits a collision on one Striped64, it's also likely to collide on
+  // other Striped64s too.
+  static __thread uint64_t tls_hashcode_;
+};
+
+// A 64-bit number optimized for high-volume concurrent updates.
+// See Striped64 for a longer explanation of the inner workings.
+class LongAdder : Striped64 {
+ public:
+  LongAdder() {}
+  void IncrementBy(int64_t x);
+  void Increment() { IncrementBy(1); }
+  void Decrement() { IncrementBy(-1); }
+
+  // Returns the current value.
+  // Note this is not an atomic snapshot in the presence of concurrent updates.
+  int64_t Value() const;
+
+  // Resets the counter state to zero.
+  void Reset() { InternalReset(0); }
+
+ protected:
+  int64_t CombineValue(int64_t current_value, int64_t new_value) {
+    return current_value + new_value;
+  }
+
+  DISALLOW_COPY_AND_ASSIGN(LongAdder);
+};
+
+} // namespace kudu
+
+#endif

http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/subprocess-test.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/subprocess-test.cc b/be/src/kudu/util/subprocess-test.cc
new file mode 100644
index 0000000..24d7cb3
--- /dev/null
+++ b/be/src/kudu/util/subprocess-test.cc
@@ -0,0 +1,381 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "kudu/util/subprocess.h"
+
+#include <errno.h>
+#include <pthread.h>
+#include <string.h>
+#include <unistd.h>
+
+#include <atomic>
+#include <csignal>
+#include <cstdio>
+#include <cstdlib>
+#include <memory>
+#include <ostream>
+#include <string>
+#include <thread>
+#include <vector>
+
+#include <glog/logging.h>
+#include <gtest/gtest.h>
+
+#include "kudu/gutil/macros.h"
+#include "kudu/gutil/strings/substitute.h"
+#include "kudu/util/env.h"
+#include "kudu/util/monotime.h"
+#include "kudu/util/path_util.h"
+#include "kudu/util/scoped_cleanup.h"
+#include "kudu/util/status.h"
+#include "kudu/util/test_macros.h"
+#include "kudu/util/test_util.h"
+
+using std::atomic;
+using std::string;
+using std::thread;
+using std::vector;
+using strings::Substitute;
+
+namespace kudu {
+
+class SubprocessTest : public KuduTest {};
+
+TEST_F(SubprocessTest, TestSimplePipe) {
+  Subprocess p({ "/usr/bin/tr", "a-z", "A-Z" });
+  p.ShareParentStdout(false);
+  ASSERT_OK(p.Start());
+
+  FILE* out = fdopen(p.ReleaseChildStdinFd(), "w");
+  PCHECK(out);
+  FILE* in = fdopen(p.from_child_stdout_fd(), "r");
+  PCHECK(in);
+
+  fprintf(out, "hello world\n");
+  // We have to close 'out' or else tr won't write any output, since
+  // it enters a buffered mode if it detects that its input is a FIFO.
+  int err;
+  RETRY_ON_EINTR(err, fclose(out));
+
+  char buf[1024];
+  ASSERT_EQ(buf, fgets(buf, sizeof(buf), in));
+  ASSERT_STREQ("HELLO WORLD\n", &buf[0]);
+
+  int wait_status = 0;
+  ASSERT_OK(p.Wait(&wait_status));
+  ASSERT_TRUE(WIFEXITED(wait_status));
+  ASSERT_EQ(0, WEXITSTATUS(wait_status));
+}
+
+TEST_F(SubprocessTest, TestErrPipe) {
+  Subprocess p({ "/usr/bin/tee", "/dev/stderr" });
+  p.ShareParentStderr(false);
+  ASSERT_OK(p.Start());
+
+  FILE* out = fdopen(p.ReleaseChildStdinFd(), "w");
+  PCHECK(out);
+
+  fprintf(out, "Hello, World\n");
+
+  // Same reasoning as above, flush to prevent tee buffering.
+  int err;
+  RETRY_ON_EINTR(err, fclose(out));
+
+  FILE* in = fdopen(p.from_child_stderr_fd(), "r");
+  PCHECK(in);
+
+  char buf[1024];
+  ASSERT_EQ(buf, fgets(buf, sizeof(buf), in));
+  ASSERT_STREQ("Hello, World\n", &buf[0]);
+
+  int wait_status = 0;
+  ASSERT_OK(p.Wait(&wait_status));
+  ASSERT_TRUE(WIFEXITED(wait_status));
+  ASSERT_EQ(0, WEXITSTATUS(wait_status));
+}
+
+TEST_F(SubprocessTest, TestKill) {
+  Subprocess p({ "/bin/cat" });
+  ASSERT_OK(p.Start());
+
+  ASSERT_OK(p.Kill(SIGKILL));
+
+  int wait_status = 0;
+  ASSERT_OK(p.Wait(&wait_status));
+  ASSERT_TRUE(WIFSIGNALED(wait_status));
+  ASSERT_EQ(SIGKILL, WTERMSIG(wait_status));
+
+  // Test that calling Wait() a second time returns the same
+  // cached value instead of trying to wait on some other process
+  // that was assigned the same pid.
+  wait_status = 0;
+  ASSERT_OK(p.Wait(&wait_status));
+  ASSERT_TRUE(WIFSIGNALED(wait_status));
+  ASSERT_EQ(SIGKILL, WTERMSIG(wait_status));
+}
+
+// Writes enough bytes to stdout and stderr concurrently that if Call() were
+// fully reading them one at a time, the test would deadlock.
+TEST_F(SubprocessTest, TestReadFromStdoutAndStderr) {
+  // Set an alarm to break out of any potential deadlocks (if the implementation
+  // regresses).
+  alarm(60);
+
+  string stdout;
+  string stderr;
+  ASSERT_OK(Subprocess::Call({
+    "/bin/bash",
+    "-c",
+    "dd if=/dev/urandom of=/dev/stdout bs=512 count=2048 &"
+    "dd if=/dev/urandom of=/dev/stderr bs=512 count=2048 &"
+    "wait"
+  }, "", &stdout, &stderr));
+
+  // Reset the alarm when the test is done
+  SCOPED_CLEANUP({ alarm(0); })
+}
+
+// Test that environment variables can be passed to the subprocess.
+TEST_F(SubprocessTest, TestEnvVars) {
+  Subprocess p({ "/bin/bash", "-c", "echo $FOO" });
+  p.SetEnvVars({{"FOO", "bar"}});
+  p.ShareParentStdout(false);
+  ASSERT_OK(p.Start());
+  FILE* in = fdopen(p.from_child_stdout_fd(), "r");
+  PCHECK(in);
+  char buf[1024];
+  ASSERT_EQ(buf, fgets(buf, sizeof(buf), in));
+  ASSERT_STREQ("bar\n", &buf[0]);
+  ASSERT_OK(p.Wait());
+}
+
+// Test that the the subprocesses CWD can be set.
+TEST_F(SubprocessTest, TestCurrentDir) {
+  string dir_path = GetTestPath("d");
+  string file_path = JoinPathSegments(dir_path, "f");
+  ASSERT_OK(Env::Default()->CreateDir(dir_path));
+  std::unique_ptr<WritableFile> file;
+  ASSERT_OK(Env::Default()->NewWritableFile(file_path, &file));
+
+  Subprocess p({ "/bin/ls", "f" });
+  p.SetCurrentDir(dir_path);
+  p.ShareParentStdout(false);
+  ASSERT_OK(p.Start());
+  ASSERT_OK(p.Wait());
+
+  int rc;
+  ASSERT_OK(p.GetExitStatus(&rc, nullptr));
+  EXPECT_EQ(0, rc);
+}
+
+// Tests writing to the subprocess stdin.
+TEST_F(SubprocessTest, TestCallWithStdin) {
+  string stdout;
+  ASSERT_OK(Subprocess::Call({ "/bin/bash" },
+                             "echo \"quick brown fox\"",
+                             &stdout));
+  EXPECT_EQ("quick brown fox\n", stdout);
+}
+
+// Test KUDU-1674: '/bin/bash -c "echo"' command below is expected to
+// capture a string on stderr. This test validates that passing
+// stderr alone doesn't result in SIGSEGV as reported in the bug and
+// also check for sanity of stderr in the output.
+TEST_F(SubprocessTest, TestReadSingleFD) {
+  string stderr;
+  const string str = "ApacheKudu";
+  const string cmd_str = Substitute("/bin/echo -n $0 1>&2", str);
+  ASSERT_OK(Subprocess::Call({"/bin/sh", "-c", cmd_str}, "", nullptr, &stderr));
+  ASSERT_EQ(stderr, str);
+
+  // Also sanity check other combinations.
+  string stdout;
+  ASSERT_OK(Subprocess::Call({"/bin/ls", "/dev/null"}, "", &stdout, nullptr));
+  ASSERT_STR_CONTAINS(stdout, "/dev/null");
+
+  ASSERT_OK(Subprocess::Call({"/bin/ls", "/dev/zero"}, "", nullptr, nullptr));
+}
+
+TEST_F(SubprocessTest, TestGetExitStatusExitSuccess) {
+  Subprocess p({ "/bin/sh", "-c", "exit 0" });
+  ASSERT_OK(p.Start());
+  ASSERT_OK(p.Wait());
+  int exit_status;
+  string exit_info;
+  ASSERT_OK(p.GetExitStatus(&exit_status, &exit_info));
+  ASSERT_EQ(0, exit_status);
+  ASSERT_STR_CONTAINS(exit_info, "process successfully exited");
+}
+
+TEST_F(SubprocessTest, TestGetExitStatusExitFailure) {
+  static const vector<int> kStatusCodes = { 1, 255 };
+  for (auto code : kStatusCodes) {
+    Subprocess p({ "/bin/sh", "-c", Substitute("exit $0", code) });
+    ASSERT_OK(p.Start());
+    ASSERT_OK(p.Wait());
+    int exit_status;
+    string exit_info;
+    ASSERT_OK(p.GetExitStatus(&exit_status, &exit_info));
+    ASSERT_EQ(code, exit_status);
+    ASSERT_STR_CONTAINS(exit_info,
+                        Substitute("process exited with non-zero status $0",
+                                   exit_status));
+  }
+}
+
+TEST_F(SubprocessTest, TestGetExitStatusSignaled) {
+  static const vector<int> kSignals = {
+    SIGHUP,
+    SIGABRT,
+    SIGKILL,
+    SIGTERM,
+    SIGUSR2,
+  };
+  for (auto signum : kSignals) {
+    Subprocess p({ "/bin/cat" });
+    ASSERT_OK(p.Start());
+    ASSERT_OK(p.Kill(signum));
+    ASSERT_OK(p.Wait());
+    int exit_status;
+    string exit_info;
+    ASSERT_OK(p.GetExitStatus(&exit_status, &exit_info));
+    EXPECT_EQ(signum, exit_status);
+    ASSERT_STR_CONTAINS(exit_info, Substitute("process exited on signal $0",
+                                              signum));
+  }
+}
+
+TEST_F(SubprocessTest, TestSubprocessDestroyWithCustomSignal) {
+  string kTestFile = GetTestPath("foo");
+
+  // Start a subprocess that creates kTestFile immediately and deletes it on exit.
+  //
+  // Note: it's important that the shell not invoke a command while waiting
+  // to be killed (i.e. "sleep 60"); if it did, the signal could be delivered
+  // just after the command starts but just before the shell decides to forward
+  // signals to it, and we wind up with a deadlock.
+  vector<string> argv = {
+      "/bin/bash",
+      "-c",
+      Substitute(
+          // Delete kTestFile on exit.
+          "trap \"rm $0\" EXIT;"
+          // Create kTestFile on start.
+          "touch $0;"
+          // Spin in a tight loop waiting to be killed.
+          "while true;"
+          "  do FOO=$$((FOO + 1));"
+          "done", kTestFile)
+  };
+
+  {
+    Subprocess s(argv);
+    ASSERT_OK(s.Start());
+    AssertEventually([&]{
+        ASSERT_TRUE(env_->FileExists(kTestFile));
+    });
+  }
+
+  // The subprocess went out of scope and was killed with SIGKILL, so it left
+  // kTestFile behind.
+  ASSERT_TRUE(env_->FileExists(kTestFile));
+
+  ASSERT_OK(env_->DeleteFile(kTestFile));
+  {
+    Subprocess s(argv, SIGTERM);
+    ASSERT_OK(s.Start());
+    AssertEventually([&]{
+        ASSERT_TRUE(env_->FileExists(kTestFile));
+    });
+  }
+
+  // The subprocess was killed with SIGTERM, giving it a chance to delete kTestFile.
+  ASSERT_FALSE(env_->FileExists(kTestFile));
+}
+
+// TEST KUDU-2208: Test subprocess interruption handling
+void handler(int /* signal */) {
+}
+
+TEST_F(SubprocessTest, TestSubprocessInterruptionHandling) {
+  // Create Subprocess thread
+  pthread_t t;
+  Subprocess p({ "/bin/sleep", "1" });
+  atomic<bool> t_started(false);
+  atomic<bool> t_finished(false);
+  thread subprocess_thread([&]() {
+    t = pthread_self();
+    t_started = true;
+    SleepFor(MonoDelta::FromMilliseconds(50));
+    CHECK_OK(p.Start());
+    CHECK_OK(p.Wait());
+    t_finished = true;
+  });
+
+  // Set up a no-op signal handler for SIGUSR2.
+  struct sigaction sa, sa_old;
+  memset(&sa, 0, sizeof(sa));
+  sa.sa_handler = &handler;
+  sigaction(SIGUSR2, &sa, &sa_old);
+
+  SCOPED_CLEANUP({ sigaction(SIGUSR2, &sa_old, nullptr); });
+  SCOPED_CLEANUP({ subprocess_thread.join(); });
+
+  // Send kill signals to Subprocess thread
+  LOG(INFO) << "Start sending kill signals to Subprocess thread";
+  while (!t_finished) {
+    if (t_started) {
+      int err = pthread_kill(t, SIGUSR2);
+      ASSERT_TRUE(err == 0 || err == ESRCH);
+      if (err == ESRCH) {
+        LOG(INFO) << "Async kill signal failed with err=" << err <<
+            " because it tried to kill vanished subprocess_thread";
+        ASSERT_TRUE(t_finished);
+      }
+      // Add microseconds delay to make the unit test runs faster and more reliable
+      SleepFor(MonoDelta::FromMicroseconds(rand() % 1));
+    }
+  }
+}
+
+#ifdef __linux__
+// This test requires a system with /proc/<pid>/stat.
+TEST_F(SubprocessTest, TestGetProcfsState) {
+  // This test should be RUNNING.
+  Subprocess::ProcfsState state;
+  ASSERT_OK(Subprocess::GetProcfsState(getpid(), &state));
+  ASSERT_EQ(Subprocess::ProcfsState::RUNNING, state);
+
+  // When started, /bin/sleep will be RUNNING (even though it's asleep).
+  Subprocess sleep({"/bin/sleep", "1000"});
+  ASSERT_OK(sleep.Start());
+  ASSERT_OK(Subprocess::GetProcfsState(sleep.pid(), &state));
+  ASSERT_EQ(Subprocess::ProcfsState::RUNNING, state);
+
+  // After a SIGSTOP, it should be PAUSED.
+  ASSERT_OK(sleep.Kill(SIGSTOP));
+  ASSERT_OK(Subprocess::GetProcfsState(sleep.pid(), &state));
+  ASSERT_EQ(Subprocess::ProcfsState::PAUSED, state);
+
+  // After a SIGCONT, it should be RUNNING again.
+  ASSERT_OK(sleep.Kill(SIGCONT));
+  ASSERT_OK(Subprocess::GetProcfsState(sleep.pid(), &state));
+  ASSERT_EQ(Subprocess::ProcfsState::RUNNING, state);
+}
+#endif
+
+} // namespace kudu