You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by ta...@apache.org on 2018/07/13 06:03:18 UTC
[06/51] [abbrv] impala git commit: IMPALA-7006: Add KRPC folders from
kudu@334ecafd
http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/status.h
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/status.h b/be/src/kudu/util/status.h
new file mode 100644
index 0000000..3c8a1d9
--- /dev/null
+++ b/be/src/kudu/util/status.h
@@ -0,0 +1,493 @@
+// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file. See the AUTHORS file for names of contributors.
+//
+// A Status encapsulates the result of an operation. It may indicate success,
+// or it may indicate an error with an associated error message.
+//
+// Multiple threads can invoke const methods on a Status without
+// external synchronization, but if any of the threads may call a
+// non-const method, all threads accessing the same Status must use
+// external synchronization.
+
+#ifndef KUDU_UTIL_STATUS_H_
+#define KUDU_UTIL_STATUS_H_
+
+// NOTE: using stdint.h instead of cstdint and errno.h instead of errno because
+// this file is supposed to be processed by a compiler lacking C++11 support.
+#include <errno.h>
+#include <stdint.h>
+
+#include <cstddef>
+#include <string>
+
+#ifdef KUDU_HEADERS_NO_STUBS
+#include "kudu/gutil/macros.h"
+#include "kudu/gutil/port.h"
+#else
+#include "kudu/client/stubs.h"
+#endif
+
+#include "kudu/util/kudu_export.h"
+#include "kudu/util/slice.h"
+
+/// @brief Return the given status from the enclosing function if it is not @c OK.
+#define KUDU_RETURN_NOT_OK(s) do { \
+ const ::kudu::Status& _s = (s); \
+ if (PREDICT_FALSE(!_s.ok())) return _s; \
+ } while (0);  // NOTE(review): the expansion already ends in ';' (true of every macro below too).
+
+/// @brief Return the given status if it is not OK, but first clone it and
+/// prepend the given message (via Status::CloneAndPrepend()).
+#define KUDU_RETURN_NOT_OK_PREPEND(s, msg) do { \
+ const ::kudu::Status& _s = (s); \
+ if (PREDICT_FALSE(!_s.ok())) return _s.CloneAndPrepend(msg); \
+ } while (0);
+
+/// @brief Return @c to_return if @c to_call returns a bad status.
+/// The substitution for 'to_return' may reference the variable
+/// @c s, which is bound to the bad status inside the expansion.
+#define KUDU_RETURN_NOT_OK_RET(to_call, to_return) do { \
+ const ::kudu::Status& s = (to_call); \
+ if (PREDICT_FALSE(!s.ok())) return (to_return); \
+ } while (0);
+
+/// @brief Return the given status if it is not OK, evaluating @c on_error first if so.
+#define KUDU_RETURN_NOT_OK_EVAL(s, on_error) do { \
+ const ::kudu::Status& _s = (s); \
+ if (PREDICT_FALSE(!_s.ok())) { \
+ (on_error); \
+ return _s; \
+ } \
+ } while (0);
+
+/// @brief Log "<warning_prefix>: <status>" as a warning if @c to_call returns a bad status.
+#define KUDU_WARN_NOT_OK(to_call, warning_prefix) do { \
+ const ::kudu::Status& _s = (to_call); \
+ if (PREDICT_FALSE(!_s.ok())) { \
+ KUDU_LOG(WARNING) << (warning_prefix) << ": " << _s.ToString(); \
+ } \
+ } while (0);
+
+/// @brief Log the given status at @c level and return it unconditionally (even when it is OK).
+#define KUDU_LOG_AND_RETURN(level, status) do { \
+ const ::kudu::Status& _s = (status); \
+ KUDU_LOG(level) << _s.ToString(); \
+ return _s; \
+ } while (0);
+
+/// @brief If the given status is not OK, log "Status: <status> <msg>" at 'level' and return it.
+#define KUDU_RETURN_NOT_OK_LOG(s, level, msg) do { \
+ const ::kudu::Status& _s = (s); \
+ if (PREDICT_FALSE(!_s.ok())) { \
+ KUDU_LOG(level) << "Status: " << _s.ToString() << " " << (msg); \
+ return _s; \
+ } \
+ } while (0);
+
+/// @brief If @c to_call returns a bad status, CHECK immediately with
+/// a logged message of @c msg followed by ": " and the status.
+#define KUDU_CHECK_OK_PREPEND(to_call, msg) do { \
+ const ::kudu::Status& _s = (to_call); \
+ KUDU_CHECK(_s.ok()) << (msg) << ": " << _s.ToString(); \
+ } while (0);
+
+/// @brief If the status is bad, CHECK immediately, appending the status to the
+/// logged 'Bad status' message.
+#define KUDU_CHECK_OK(s) KUDU_CHECK_OK_PREPEND(s, "Bad status")
+
+/// @brief If @c to_call returns a bad status, DCHECK immediately with
+/// a logged message of @c msg followed by ": " and the status.
+#define KUDU_DCHECK_OK_PREPEND(to_call, msg) do { \
+ const ::kudu::Status& _s = (to_call); \
+ KUDU_DCHECK(_s.ok()) << (msg) << ": " << _s.ToString(); \
+ } while (0);
+
+/// @brief If the status is bad, DCHECK immediately, appending the status to the
+/// logged 'Bad status' message.
+#define KUDU_DCHECK_OK(s) KUDU_DCHECK_OK_PREPEND(s, "Bad status")
+
+/// @file status.h
+///
+/// This header is used in both the Kudu build as well as in builds of
+/// applications that use the Kudu C++ client. In the latter we need to be
+/// careful to "namespace" our macros, to avoid colliding or overriding with
+/// similarly named macros belonging to the application.
+///
+/// KUDU_HEADERS_USE_SHORT_STATUS_MACROS handles this behavioral change. When
+/// defined, we're building Kudu and:
+/// @li Non-namespaced macros are allowed and mapped to the namespaced versions
+/// defined above.
+/// @li Namespaced versions of glog macros are mapped to the real glog macros
+/// (otherwise the macros are defined in the C++ client stubs).
+#ifdef KUDU_HEADERS_USE_SHORT_STATUS_MACROS
+#define RETURN_NOT_OK KUDU_RETURN_NOT_OK
+#define RETURN_NOT_OK_PREPEND KUDU_RETURN_NOT_OK_PREPEND
+#define RETURN_NOT_OK_RET KUDU_RETURN_NOT_OK_RET
+#define RETURN_NOT_OK_EVAL KUDU_RETURN_NOT_OK_EVAL
+#define WARN_NOT_OK KUDU_WARN_NOT_OK
+#define LOG_AND_RETURN KUDU_LOG_AND_RETURN
+#define RETURN_NOT_OK_LOG KUDU_RETURN_NOT_OK_LOG
+#define CHECK_OK_PREPEND KUDU_CHECK_OK_PREPEND
+#define CHECK_OK KUDU_CHECK_OK
+#define DCHECK_OK_PREPEND KUDU_DCHECK_OK_PREPEND
+#define DCHECK_OK KUDU_DCHECK_OK
+
+// Map the namespaced logging/check macros onto the standard glog macros.
+#define KUDU_LOG LOG
+#define KUDU_CHECK CHECK
+#define KUDU_DCHECK DCHECK
+#endif // KUDU_HEADERS_USE_SHORT_STATUS_MACROS
+
+namespace kudu {
+
+/// @brief A representation of an operation's outcome.
+class KUDU_EXPORT Status {
+ public:
+ /// Create an object representing success status.
+ Status() : state_(NULL) { }
+
+ ~Status() { delete[] state_; } // No-op for an OK status, whose state_ is NULL.
+
+ /// Copy the specified status.
+ ///
+ /// @param [in] s
+ /// The status object to copy from.
+ Status(const Status& s);
+
+ /// Assign the specified status.
+ ///
+ /// @param [in] s
+ /// The status object to assign from.
+ /// @return The reference to the modified object.
+ Status& operator=(const Status& s);
+
+#if __cplusplus >= 201103L
+ /// Move the specified status (C++11).
+ ///
+ /// @param [in] s
+ /// rvalue reference to a Status object.
+ Status(Status&& s) noexcept;
+
+ /// Assign the specified status using move semantics (C++11).
+ ///
+ /// @param [in] s
+ /// rvalue reference to a Status object.
+ /// @return The reference to the modified object.
+ Status& operator=(Status&& s) noexcept;
+
+ /// If this status is OK, calls 'op' and returns the result, otherwise returns
+ /// this status.
+ ///
+ /// This method can be used to chain together multiple Status-returning
+ /// operations, short circuiting after the first one to fail.
+ ///
+ /// Example:
+ ///
+ /// @code
+ /// unique_ptr<SequentialFile> file;
+ /// Status s = Env::Default()
+ /// ->NewSequentialFile("/tmp/example.txt", &file)
+ /// .AndThen([&] {
+ /// return file->Write(0, "some data")
+ /// .CloneAndPrepend("failed to write to example file");
+ /// });
+ /// @endcode
+ ///
+ /// @param [in] op
+ /// Status-returning closure or function to run.
+ /// @return A copy of this status if it is not OK, otherwise the result of running op.
+ template<typename F>
+ Status AndThen(F op) {
+ if (ok()) {
+ return op();
+ }
+ return *this;
+ }
+#endif
+
+ /// @return A success status.
+ static Status OK() { return Status(); }
+
+
+ /// @name Methods to build status objects for various types of errors.
+ ///
+ /// @param [in] msg
+ /// The informational message on the error.
+ /// @param [in] msg2
+ /// Additional information on the error (optional).
+ /// @param [in] posix_code
+ /// POSIX error code, if applicable (optional; defaults to -1, meaning none).
+ /// @return The error status of an appropriate type.
+ ///
+ ///@{
+ static Status NotFound(const Slice& msg, const Slice& msg2 = Slice(),
+ int16_t posix_code = -1) {
+ return Status(kNotFound, msg, msg2, posix_code);
+ }
+ static Status Corruption(const Slice& msg, const Slice& msg2 = Slice(),
+ int16_t posix_code = -1) {
+ return Status(kCorruption, msg, msg2, posix_code);
+ }
+ static Status NotSupported(const Slice& msg, const Slice& msg2 = Slice(),
+ int16_t posix_code = -1) {
+ return Status(kNotSupported, msg, msg2, posix_code);
+ }
+ static Status InvalidArgument(const Slice& msg, const Slice& msg2 = Slice(),
+ int16_t posix_code = -1) {
+ return Status(kInvalidArgument, msg, msg2, posix_code);
+ }
+ static Status IOError(const Slice& msg, const Slice& msg2 = Slice(),
+ int16_t posix_code = -1) {
+ return Status(kIOError, msg, msg2, posix_code);
+ }
+ static Status AlreadyPresent(const Slice& msg, const Slice& msg2 = Slice(),
+ int16_t posix_code = -1) {
+ return Status(kAlreadyPresent, msg, msg2, posix_code);
+ }
+ static Status RuntimeError(const Slice& msg, const Slice& msg2 = Slice(),
+ int16_t posix_code = -1) {
+ return Status(kRuntimeError, msg, msg2, posix_code);
+ }
+ static Status NetworkError(const Slice& msg, const Slice& msg2 = Slice(),
+ int16_t posix_code = -1) {
+ return Status(kNetworkError, msg, msg2, posix_code);
+ }
+ static Status IllegalState(const Slice& msg, const Slice& msg2 = Slice(),
+ int16_t posix_code = -1) {
+ return Status(kIllegalState, msg, msg2, posix_code);
+ }
+ static Status NotAuthorized(const Slice& msg, const Slice& msg2 = Slice(),
+ int16_t posix_code = -1) {
+ return Status(kNotAuthorized, msg, msg2, posix_code);
+ }
+ static Status Aborted(const Slice& msg, const Slice& msg2 = Slice(),
+ int16_t posix_code = -1) {
+ return Status(kAborted, msg, msg2, posix_code);
+ }
+ static Status RemoteError(const Slice& msg, const Slice& msg2 = Slice(),
+ int16_t posix_code = -1) {
+ return Status(kRemoteError, msg, msg2, posix_code);
+ }
+ static Status ServiceUnavailable(const Slice& msg, const Slice& msg2 = Slice(),
+ int16_t posix_code = -1) {
+ return Status(kServiceUnavailable, msg, msg2, posix_code);
+ }
+ static Status TimedOut(const Slice& msg, const Slice& msg2 = Slice(),
+ int16_t posix_code = -1) {
+ return Status(kTimedOut, msg, msg2, posix_code);
+ }
+ static Status Uninitialized(const Slice& msg, const Slice& msg2 = Slice(),
+ int16_t posix_code = -1) {
+ return Status(kUninitialized, msg, msg2, posix_code);
+ }
+ static Status ConfigurationError(const Slice& msg, const Slice& msg2 = Slice(),
+ int16_t posix_code = -1) {
+ return Status(kConfigurationError, msg, msg2, posix_code);
+ }
+ static Status Incomplete(const Slice& msg, const Slice& msg2 = Slice(),
+ int64_t posix_code = -1) { // NOTE(review): int64_t here, int16_t in the siblings and the ctor.
+ return Status(kIncomplete, msg, msg2, posix_code);
+ }
+ static Status EndOfFile(const Slice& msg, const Slice& msg2 = Slice(),
+ int64_t posix_code = -1) { // NOTE(review): int64_t here, int16_t in the siblings and the ctor.
+ return Status(kEndOfFile, msg, msg2, posix_code);
+ }
+ ///@}
+
+ /// @return @c true iff the status indicates success.
+ bool ok() const { return (state_ == NULL); }
+
+ /// @return @c true iff the status indicates a NotFound error.
+ bool IsNotFound() const { return code() == kNotFound; }
+
+ /// @return @c true iff the status indicates a Corruption error.
+ bool IsCorruption() const { return code() == kCorruption; }
+
+ /// @return @c true iff the status indicates a NotSupported error.
+ bool IsNotSupported() const { return code() == kNotSupported; }
+
+ /// @return @c true iff the status indicates an IOError.
+ bool IsIOError() const { return code() == kIOError; }
+
+ /// @return @c true iff the status indicates an InvalidArgument error.
+ bool IsInvalidArgument() const { return code() == kInvalidArgument; }
+
+ /// @return @c true iff the status indicates an AlreadyPresent error.
+ bool IsAlreadyPresent() const { return code() == kAlreadyPresent; }
+
+ /// @return @c true iff the status indicates a RuntimeError.
+ bool IsRuntimeError() const { return code() == kRuntimeError; }
+
+ /// @return @c true iff the status indicates a NetworkError.
+ bool IsNetworkError() const { return code() == kNetworkError; }
+
+ /// @return @c true iff the status indicates an IllegalState error.
+ bool IsIllegalState() const { return code() == kIllegalState; }
+
+ /// @return @c true iff the status indicates a NotAuthorized error.
+ bool IsNotAuthorized() const { return code() == kNotAuthorized; }
+
+ /// @return @c true iff the status indicates an Aborted error.
+ bool IsAborted() const { return code() == kAborted; }
+
+ /// @return @c true iff the status indicates a RemoteError.
+ bool IsRemoteError() const { return code() == kRemoteError; }
+
+ /// @return @c true iff the status indicates ServiceUnavailable.
+ bool IsServiceUnavailable() const { return code() == kServiceUnavailable; }
+
+ /// @return @c true iff the status indicates TimedOut.
+ bool IsTimedOut() const { return code() == kTimedOut; }
+
+ /// @return @c true iff the status indicates Uninitialized.
+ bool IsUninitialized() const { return code() == kUninitialized; }
+
+ /// @return @c true iff the status indicates ConfigurationError.
+ bool IsConfigurationError() const { return code() == kConfigurationError; }
+
+ /// @return @c true iff the status indicates Incomplete.
+ bool IsIncomplete() const { return code() == kIncomplete; }
+
+ /// @return @c true iff the status indicates end of file.
+ bool IsEndOfFile() const { return code() == kEndOfFile; }
+
+ /// @return @c true iff the POSIX code is EIO, ENODEV, ENXIO or EROFS (treated as a disk failure).
+ bool IsDiskFailure() const {
+ switch (posix_code()) {
+ case EIO:
+ case ENODEV:
+ case ENXIO:
+ case EROFS:
+ return true;
+ }
+ return false;
+ }
+
+ /// @return A string representation of this status suitable for printing.
+ /// Returns the string "OK" for success.
+ std::string ToString() const;
+
+ /// @return A string representation of the status code, without the message
+ /// text or POSIX code information.
+ std::string CodeAsString() const;
+
+ /// This is similar to ToString, except that it does not include
+ /// the stringified error code or POSIX code.
+ ///
+ /// @note The returned Slice is only valid as long as this Status object
+ /// remains live and unchanged.
+ ///
+ /// @return The message portion of the Status. For @c OK statuses,
+ /// this returns an empty string.
+ Slice message() const;
+
+ /// @return The POSIX code associated with this Status object,
+ /// or @c -1 if there is none.
+ int16_t posix_code() const;
+
+ /// Clone this status and add the specified prefix to the message.
+ ///
+ /// If this status is OK, then an OK status will be returned.
+ ///
+ /// @param [in] msg
+ /// The message to prepend.
+ /// @return A new Status object with the same state plus an additional
+ /// leading message.
+ Status CloneAndPrepend(const Slice& msg) const;
+
+ /// Clone this status and add the specified suffix to the message.
+ ///
+ /// If this status is OK, then an OK status will be returned.
+ ///
+ /// @param [in] msg
+ /// The message to append.
+ /// @return A new Status object with the same state plus an additional
+ /// trailing message.
+ Status CloneAndAppend(const Slice& msg) const;
+
+ /// @return The memory usage of this object without the object itself.
+ /// Should be used when embedded inside another object.
+ size_t memory_footprint_excluding_this() const;
+
+ /// @return The memory usage of this object including the object itself.
+ /// Should be used when allocated on the heap.
+ size_t memory_footprint_including_this() const;
+
+ private:
+ // OK status has a NULL state_. Otherwise, state_ is a new[] array
+ // (released with delete[] by this class) of the following form:
+ // state_[0..3] == length of message
+ // state_[4] == code
+ // state_[5..6] == posix_code
+ // state_[7..] == message
+ const char* state_;
+
+ enum Code {
+ kOk = 0,
+ kNotFound = 1,
+ kCorruption = 2,
+ kNotSupported = 3,
+ kInvalidArgument = 4,
+ kIOError = 5,
+ kAlreadyPresent = 6,
+ kRuntimeError = 7,
+ kNetworkError = 8,
+ kIllegalState = 9,
+ kNotAuthorized = 10,
+ kAborted = 11,
+ kRemoteError = 12,
+ kServiceUnavailable = 13,
+ kTimedOut = 14,
+ kUninitialized = 15,
+ kConfigurationError = 16,
+ kIncomplete = 17,
+ kEndOfFile = 18,
+ // NOTE: Remember to duplicate these constants into wire_protocol.proto
+ // and to add StatusTo/FromPB ser/deser cases in wire_protocol.cc !
+ // Also remember to make the same changes to the java client in Status.java.
+ //
+ // TODO: Move error codes into an error_code.proto or something similar.
+ };
+ COMPILE_ASSERT(sizeof(Code) == 4, code_enum_size_is_part_of_abi);
+
+ Code code() const {
+ return (state_ == NULL) ? kOk : static_cast<Code>(state_[4]); // state_[4] holds the code.
+ }
+
+ Status(Code code, const Slice& msg, const Slice& msg2, int16_t posix_code);
+ static const char* CopyState(const char* s);
+};
+
+// Copy constructor: a NULL (OK) source state stays NULL, anything else goes through CopyState().
+inline Status::Status(const Status& s)
+    : state_((s.state_ != NULL) ? CopyState(s.state_) : NULL) {}
+
+inline Status& Status::operator=(const Status& s) {
+  // Identical state pointers mean either self-assignment (this == &s) or that
+  // both statuses are OK (NULL state_); there is nothing to do in either case.
+  if (state_ == s.state_) return *this;
+  // Release the current state before taking a copy of the source's.
+  delete[] state_;
+  state_ = (s.state_ != NULL) ? CopyState(s.state_) : NULL;
+  return *this;
+}
+
+#if __cplusplus >= 201103L
+// Move constructor: take over the source's state pointer and leave the source with a null one.
+inline Status::Status(Status&& s) noexcept
+    : state_(s.state_) { s.state_ = nullptr; }
+
+inline Status& Status::operator=(Status&& s) noexcept {
+  // Same pointer means self-move or both sides OK; otherwise drop our state and steal the source's.
+  if (state_ == s.state_) return *this;
+  delete[] state_;
+  state_ = s.state_;
+  s.state_ = nullptr;
+  return *this;
+}
+#endif
+
+} // namespace kudu
+
+#endif // KUDU_UTIL_STATUS_H_
http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/status_callback.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/status_callback.cc b/be/src/kudu/util/status_callback.cc
new file mode 100644
index 0000000..a3932b5
--- /dev/null
+++ b/be/src/kudu/util/status_callback.cc
@@ -0,0 +1,41 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "kudu/util/status_callback.h"
+
+#include <ostream>
+
+#include <glog/logging.h>
+
+#include "kudu/gutil/port.h"
+#include "kudu/util/status.h"
+
+using std::string;
+
+namespace kudu {
+
+void DoNothingStatusCB(const Status& status) {} // Intentionally empty: 'status' is ignored.
+
+void CrashIfNotOkStatusCB(const string& message, const Status& status) {
+  // Nothing happens on success; a bad status is logged at FATAL severity, prefixed with 'message'.
+  const bool failed = !status.ok();
+  if (PREDICT_FALSE(failed)) LOG(FATAL) << message << ": " << status.ToString();
+}
+
+Status DoNothingStatusClosure() { return Status::OK(); } // Does no work; always reports success.
+
+} // end namespace kudu
http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/status_callback.h
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/status_callback.h b/be/src/kudu/util/status_callback.h
new file mode 100644
index 0000000..70bbb97
--- /dev/null
+++ b/be/src/kudu/util/status_callback.h
@@ -0,0 +1,54 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#ifndef KUDU_UTIL_STATUS_CALLBACK_H
+#define KUDU_UTIL_STATUS_CALLBACK_H
+
+#include <functional>
+#include <string>
+
+#include "kudu/gutil/callback_forward.h"
+
+namespace kudu {
+
+class Status;
+
+// A callback which takes a Status. This is typically used for functions which
+// produce asynchronous results and may fail.
+typedef Callback<void(const Status& status)> StatusCallback;
+
+// Like StatusCallback, but built on std::function instead of Callback.
+//
+// TODO(adar): should eventually replace all StatusCallback usage with this.
+typedef std::function<void(const Status& status)> StdStatusCallback;
+
+// To be used when a function signature requires a StatusCallback but none
+// is needed. The status passed in is ignored.
+extern void DoNothingStatusCB(const Status& status);
+
+// A callback that logs "<message>: <status>" at FATAL (and so crashes) if the Status is not OK.
+extern void CrashIfNotOkStatusCB(const std::string& message, const Status& status);
+
+// A closure (callback without arguments) that returns a Status indicating
+// whether it was successful or not.
+typedef Callback<Status(void)> StatusClosure;
+
+// To be used when setting a StatusClosure is optional; always returns Status::OK().
+extern Status DoNothingStatusClosure();
+
+} // namespace kudu
+
+#endif
http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/stopwatch.h
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/stopwatch.h b/be/src/kudu/util/stopwatch.h
new file mode 100644
index 0000000..f15b597
--- /dev/null
+++ b/be/src/kudu/util/stopwatch.h
@@ -0,0 +1,364 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#ifndef KUDU_UTIL_STOPWATCH_H
+#define KUDU_UTIL_STOPWATCH_H
+
+#include <glog/logging.h>
+#include <sys/resource.h>
+#include <sys/time.h>
+#include <time.h>
+#include <string>
+#if defined(__APPLE__)
+#include <mach/clock.h>
+#include <mach/mach.h>
+#include <mach/thread_info.h>
+#endif // defined(__APPLE__)
+
+#include "kudu/gutil/macros.h"
+#include "kudu/gutil/stringprintf.h"
+#include "kudu/gutil/walltime.h"
+
+namespace kudu {
+
+// Macro for logging timing of a block. Usage:
+// LOG_TIMING_PREFIX_IF(INFO, FLAGS_should_record_time, "Tablet X: ", "doing some task") {
+// ... some task which takes some time
+// }
+// If FLAGS_should_record_time is true, yields a log like:
+// I1102 14:35:51.726186 23082 file.cc:167] Tablet X: Time spent doing some task:
+// real 3.729s user 3.570s sys 0.150s
+// The task will always execute regardless of whether the timing information is
+// printed. (The for-loop below runs its body exactly once: HasRun()/MarkHasRun().)
+#define LOG_TIMING_PREFIX_IF(severity, condition, prefix, description) \
+ for (kudu::sw_internal::LogTiming _l(__FILE__, __LINE__, google::severity, prefix, description, \
+ -1, (condition)); !_l.HasRun(); _l.MarkHasRun())
+
+// Conditionally log, no prefix.
+#define LOG_TIMING_IF(severity, condition, description) \
+ LOG_TIMING_PREFIX_IF(severity, (condition), "", (description))
+
+// Always log, including prefix.
+#define LOG_TIMING_PREFIX(severity, prefix, description) \
+ LOG_TIMING_PREFIX_IF(severity, true, (prefix), (description))
+
+// Always log, no prefix.
+#define LOG_TIMING(severity, description) \
+ LOG_TIMING_IF(severity, true, (description))
+
+// Macro to log the time spent in the rest of the enclosing scope (expansion already ends in ';').
+#define SCOPED_LOG_TIMING(severity, description) \
+ kudu::sw_internal::LogTiming VARNAME_LINENUM(_log_timing)(__FILE__, __LINE__, \
+ google::severity, "", description, -1, true);
+
+// Scoped version of LOG_SLOW_EXECUTION() (no trailing ';' in the expansion).
+#define SCOPED_LOG_SLOW_EXECUTION(severity, max_expected_millis, description) \
+ kudu::sw_internal::LogTiming VARNAME_LINENUM(_log_timing)(__FILE__, __LINE__, \
+ google::severity, "", description, max_expected_millis, true)
+
+// Scoped version of LOG_SLOW_EXECUTION() but with a prefix (no trailing ';' in the expansion).
+#define SCOPED_LOG_SLOW_EXECUTION_PREFIX(severity, max_expected_millis, prefix, description) \
+ kudu::sw_internal::LogTiming VARNAME_LINENUM(_log_timing)(__FILE__, __LINE__, \
+ google::severity, prefix, description, max_expected_millis, true)
+
+// Macro for logging timing of a block, but only when it runs slower than expected. Usage:
+// LOG_SLOW_EXECUTION(INFO, 5, "doing some task") {
+// ... some task which takes some time
+// }
+// When the block takes longer than 5 milliseconds, yields a log like:
+// I1102 14:35:51.726186 23082 file.cc:167] Time spent doing some task:
+// real 3.729s user 3.570s sys 0.150s
+#define LOG_SLOW_EXECUTION(severity, max_expected_millis, description) \
+ for (kudu::sw_internal::LogTiming _l(__FILE__, __LINE__, google::severity, "", description, \
+ max_expected_millis, true); !_l.HasRun(); _l.MarkHasRun())
+
+// Macro for vlogging timing of a block. The execution happens regardless of the vlog_level,
+// it's only the logging that's affected.
+// Usage:
+// VLOG_TIMING(1, "doing some task") {
+// ... some task which takes some time
+// }
+// Yields a log just like LOG_TIMING's (at INFO severity, when VLOG_IS_ON(vlog_level)).
+#define VLOG_TIMING(vlog_level, description) \
+ for (kudu::sw_internal::LogTiming _l(__FILE__, __LINE__, google::INFO, "", description, \
+ -1, VLOG_IS_ON(vlog_level)); !_l.HasRun(); _l.MarkHasRun())
+
+// Macro to vlog the time spent in the rest of the enclosing scope (expansion already ends in ';').
+#define SCOPED_VLOG_TIMING(vlog_level, description) \
+ kudu::sw_internal::LogTiming VARNAME_LINENUM(_log_timing)(__FILE__, __LINE__, \
+ google::INFO, "", description, -1, VLOG_IS_ON(vlog_level));
+
+
+// Workaround for the clang analyzer being confused by the above loop-based macros.
+// The analyzer thinks the macros might loop more than once, and thus generates
+// false positives. So, for its purposes, just make them empty (the block that follows still runs).
+#if defined(CLANG_TIDY) || defined(__clang_analyzer__)
+
+#undef LOG_TIMING_PREFIX_IF
+#define LOG_TIMING_PREFIX_IF(severity, condition, prefix, description)
+
+#undef VLOG_TIMING
+#define VLOG_TIMING(vlog_level, description)
+
+#undef LOG_SLOW_EXECUTION
+#define LOG_SLOW_EXECUTION(severity, max_expected_millis, description)
+#endif // defined(CLANG_TIDY) || defined(__clang_analyzer__)
+
+
+#define NANOS_PER_SECOND 1000000000.0 // As a double, so divisions by it are floating-point.
+#define NANOS_PER_MILLISECOND 1000000.0 // As a double, so divisions by it are floating-point.
+
+class Stopwatch;
+
+typedef int64_t nanosecond_type; // Signed 64-bit count of nanoseconds.
+
+// Structure which contains an elapsed amount of wall/user/sys time.
+struct CpuTimes {
+  nanosecond_type wall;      // wall-clock time, in nanoseconds
+  nanosecond_type user;      // user CPU time, in nanoseconds
+  nanosecond_type system;    // system CPU time, in nanoseconds
+  int64_t context_switches;  // number of context switches
+
+  // Reset every counter to zero.
+  void clear() {
+    wall = 0;
+    user = 0;
+    system = 0;
+    context_switches = 0;
+  }
+
+  // Render the three times, in seconds, similar to the output of the "time" shell command.
+  std::string ToString() const {
+    const double real_s = wall_seconds();
+    const double user_s = user_cpu_seconds();
+    const double sys_s = system_cpu_seconds();
+    return StringPrintf("real %.3fs\tuser %.3fs\tsys %.3fs", real_s, user_s, sys_s);
+  }
+
+  // Conversions of the raw nanosecond counts to floating-point milliseconds/seconds.
+  double wall_millis() const { return static_cast<double>(wall) / NANOS_PER_MILLISECOND; }
+
+  double wall_seconds() const { return static_cast<double>(wall) / NANOS_PER_SECOND; }
+
+  double user_cpu_seconds() const { return static_cast<double>(user) / NANOS_PER_SECOND; }
+
+  double system_cpu_seconds() const { return static_cast<double>(system) / NANOS_PER_SECOND; }
+};
+
+// A Stopwatch is a convenient way of timing a given operation.
+//
+// Wall clock time is based on a monotonic timer, so can be reliably used for
+// determining durations.
+// CPU time is based on either current thread's usage or the usage of the whole
+// process, depending on the value of 'Mode' passed to the constructor.
+//
+// The implementation relies on several syscalls, so should not be used for
+// hot paths, but is useful for timing anything on the granularity of seconds
+// or more.
+//
+// NOTE: the user time reported by this class is based on Linux scheduler ticks
+// and thus has low precision. Use GetThreadCpuTimeMicros() from walltime.h if
+// more accurate per-thread CPU usage timing is required.
+class Stopwatch {
+ public:
+
+  enum Mode {
+    // Collect usage only about the calling thread.
+    // This may not be supported on older versions of Linux.
+    THIS_THREAD,
+    // Collect usage of all threads.
+    ALL_THREADS
+  };
+
+  // Construct a new stopwatch. The stopwatch is initially stopped.
+  explicit Stopwatch(Mode mode = THIS_THREAD)
+    : mode_(mode),
+      stopped_(true) {
+    times_.clear();
+  }
+
+  // Start counting. If the stopwatch is already counting, then resets the
+  // start point at the current time.
+  void start() {
+    stopped_ = false;
+    GetTimes(&times_);
+  }
+
+  // Stop counting. If the stopwatch is already stopped, has no effect.
+  void stop() {
+    if (stopped_) return;
+    stopped_ = true;
+    // While running, times_ holds the readings taken at the start point; turn them into deltas.
+    CpuTimes current;
+    GetTimes(&current);
+    times_.wall = current.wall - times_.wall;
+    times_.user = current.user - times_.user;
+    times_.system = current.system - times_.system;
+    times_.context_switches = current.context_switches - times_.context_switches;
+  }
+
+  // Return the elapsed amount of time. If the stopwatch is running, then returns
+  // the amount of time since it was started. If it is stopped, returns the amount
+  // of time between the most recent start/stop pair. If the stopwatch has never been
+  // started, the elapsed time is considered to be zero.
+  CpuTimes elapsed() const {
+    if (stopped_) return times_;
+
+    CpuTimes current;
+    GetTimes(&current);
+    current.wall -= times_.wall;
+    current.user -= times_.user;
+    current.system -= times_.system;
+    current.context_switches -= times_.context_switches;
+    return current;
+  }
+
+  // Resume a stopped stopwatch, such that the elapsed time continues to grow from
+  // the point where it was last stopped.
+  // For example:
+  // Stopwatch s;
+  // s.start();
+  // sleep(1); // elapsed() is now ~1sec
+  // s.stop();
+  // sleep(1);
+  // s.resume();
+  // sleep(1); // elapsed() is now ~2sec
+  void resume() {
+    if (!stopped_) return;
+    // Move the new start point back by the amounts already elapsed so that they keep counting.
+    CpuTimes current(times_);
+    start();
+    times_.wall -= current.wall;
+    times_.user -= current.user;
+    times_.system -= current.system;
+    times_.context_switches -= current.context_switches;
+  }
+
+  bool is_stopped() const {  // True while the stopwatch is not counting.
+    return stopped_;
+  }
+
+ private:
+  void GetTimes(CpuTimes *times) const {
+    struct rusage usage;
+    struct timespec wall;
+    // Fill *times with the CPU usage selected by mode_ and the current wall-clock reading, in ns.
+#if defined(__APPLE__)
+    if (mode_ == THIS_THREAD) {
+      // Adapted from https://codereview.chromium.org/16818003
+      thread_basic_info_data_t t_info;
+      mach_msg_type_number_t count = THREAD_BASIC_INFO_COUNT;
+      CHECK_EQ(KERN_SUCCESS, thread_info(mach_thread_self(), THREAD_BASIC_INFO,
+                                         (thread_info_t)&t_info, &count));
+      usage.ru_utime.tv_sec = t_info.user_time.seconds;
+      usage.ru_utime.tv_usec = t_info.user_time.microseconds;
+      usage.ru_stime.tv_sec = t_info.system_time.seconds;
+      usage.ru_stime.tv_usec = t_info.system_time.microseconds;
+      usage.ru_nivcsw = t_info.suspend_count;
+      usage.ru_nvcsw = 0;
+    } else {
+      CHECK_EQ(0, getrusage(RUSAGE_SELF, &usage));
+    }
+
+    mach_timespec_t ts;
+    walltime_internal::GetCurrentTime(&ts);
+    wall.tv_sec = ts.tv_sec;
+    wall.tv_nsec = ts.tv_nsec;
+#else
+    CHECK_EQ(0, getrusage((mode_ == THIS_THREAD) ? RUSAGE_THREAD : RUSAGE_SELF, &usage));
+    CHECK_EQ(0, clock_gettime(CLOCK_MONOTONIC, &wall));
+#endif // defined(__APPLE__)
+    times->wall = wall.tv_sec * 1000000000L + wall.tv_nsec;
+    times->user = usage.ru_utime.tv_sec * 1000000000L + usage.ru_utime.tv_usec * 1000L;
+    times->system = usage.ru_stime.tv_sec * 1000000000L + usage.ru_stime.tv_usec * 1000L;
+    times->context_switches = usage.ru_nvcsw + usage.ru_nivcsw;
+  }
+
+  const Mode mode_;   // Whether CPU usage is collected per thread or for all threads.
+  bool stopped_;      // True while not counting.
+  CpuTimes times_;    // Start-point readings while running; elapsed amounts while stopped.
+};
+
+
+namespace sw_internal {
+
+// Internal class used by the LOG_TIMING macro.
+class LogTiming {
+ public:
+  // Starts the internal stopwatch right away. 'file' and 'line' are handed to
+  // google::LogMessage when the timing is eventually logged; a negative
+  // 'max_expected_millis' means the timing is logged no matter how short it was.
+  LogTiming(const char *file, int line, google::LogSeverity severity,
+            std::string prefix, std::string description,
+            int64_t max_expected_millis, bool should_print)
+      : file_(file),
+        line_(line),
+        severity_(severity),
+        prefix_(std::move(prefix)),
+        description_(std::move(description)),
+        max_expected_millis_(max_expected_millis),
+        should_print_(should_print),
+        has_run_(false) {
+    stopwatch_.start();
+  }
+
+  // Logs the timing on destruction, unless 'should_print' was false.
+  ~LogTiming() {
+    if (!should_print_) return;
+    Print(max_expected_millis_);
+  }
+
+  // Allows this object to be used as the loop variable in for-loop macros:
+  // the loop condition is !HasRun(), which holds until MarkHasRun() is called.
+  bool HasRun() { return has_run_; }
+
+  // Allows this object to be used as the loop variable in for-loop macros:
+  // call it in the "increment" section so that the body runs only once.
+  void MarkHasRun() { has_run_ = true; }
+
+ private:
+  // Started in the constructor and stopped in Print().
+  Stopwatch stopwatch_;
+  const char *file_;
+  const int line_;
+  const google::LogSeverity severity_;
+  const std::string prefix_;
+  const std::string description_;
+  const int64_t max_expected_millis_;
+  const bool should_print_;
+  bool has_run_;
+
+  // Stops the stopwatch and logs the elapsed time when the wall time exceeds
+  // 'max_expected_millis'. Passing a negative number implies "always print".
+  void Print(int64_t max_expected_millis) {
+    stopwatch_.stop();
+    const CpuTimes elapsed = stopwatch_.elapsed();
+    // TODO(todd): for some reason, the elapsed wall_millis() sometimes ends up negative
+    // on rare occasion, for unclear reasons, so we have to check max_expected_millis
+    // < 0 to be sure we always print when requested.
+    const bool always_print = max_expected_millis < 0;
+    if (!always_print && !(elapsed.wall_millis() > max_expected_millis)) return;
+    google::LogMessage(file_, line_, severity_).stream()
+        << prefix_ << "Time spent " << description_ << ": "
+        << elapsed.ToString();
+  }
+
+};
+
+} // namespace sw_internal
+} // namespace kudu
+
+#endif
http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/string_case-test.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/string_case-test.cc b/be/src/kudu/util/string_case-test.cc
new file mode 100644
index 0000000..96831a1
--- /dev/null
+++ b/be/src/kudu/util/string_case-test.cc
@@ -0,0 +1,65 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <string>
+
+#include <gtest/gtest.h>
+
+#include "kudu/util/string_case.h"
+
+using std::string;
+
+namespace kudu {
+
+TEST(TestStringCase, TestSnakeToCamel) {
+ string out;
+ SnakeToCamelCase("foo_bar", &out);
+ ASSERT_EQ("FooBar", out);
+
+
+ SnakeToCamelCase("foo-bar", &out);
+ ASSERT_EQ("FooBar", out);
+
+ SnakeToCamelCase("foobar", &out);
+ ASSERT_EQ("Foobar", out);
+}
+
+TEST(TestStringCase, TestToUpperCase) {
+ string out;
+ ToUpperCase(string("foo"), &out);
+ ASSERT_EQ("FOO", out);
+ ToUpperCase(string("foo bar-BaZ"), &out);
+ ASSERT_EQ("FOO BAR-BAZ", out);
+}
+
+TEST(TestStringCase, TestToUpperCaseInPlace) {
+ string in_out = "foo";
+ ToUpperCase(in_out, &in_out);
+ ASSERT_EQ("FOO", in_out);
+}
+
+TEST(TestStringCase, TestCapitalize) {
+ string word = "foo";
+ Capitalize(&word);
+ ASSERT_EQ("Foo", word);
+
+ word = "HiBerNATe";
+ Capitalize(&word);
+ ASSERT_EQ("Hibernate", word);
+}
+
+} // namespace kudu
http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/string_case.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/string_case.cc b/be/src/kudu/util/string_case.cc
new file mode 100644
index 0000000..7cf60ab
--- /dev/null
+++ b/be/src/kudu/util/string_case.cc
@@ -0,0 +1,76 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "kudu/util/string_case.h"
+
+#include <cctype>
+#include <cstdint>
+#include <ostream>
+
+#include <glog/logging.h>
+
+namespace kudu {
+
+using std::string;
+
+void SnakeToCamelCase(const std::string &snake_case,
+                      std::string *camel_case) {
+  DCHECK_NE(camel_case, &snake_case) << "Does not support in-place operation";
+  camel_case->clear();
+  camel_case->reserve(snake_case.size());
+  // '_' and '-' are dropped; the first char and any char after a separator are upper-cased.
+  bool uppercase_next = true;
+  for (char c : snake_case) {
+    if (c == '_' || c == '-') {
+      uppercase_next = true;
+      continue;
+    }
+    if (uppercase_next) {
+      // toupper() is undefined for negative values, so go through unsigned char.
+      camel_case->push_back(static_cast<char>(toupper(static_cast<unsigned char>(c))));
+    } else {
+      camel_case->push_back(c);
+    }
+    uppercase_next = false;
+  }
+}
+
+void ToUpperCase(const std::string &string,
+                 std::string *out) {
+  if (out != &string) {
+    *out = string;
+  }
+  // Upper-case in place; toupper() is undefined for negative values, hence the cast.
+  for (char& c : *out) {
+    c = static_cast<char>(toupper(static_cast<unsigned char>(c)));
+  }
+}
+
+void Capitalize(std::string *word) {
+  if (word->empty()) {
+    return;
+  }
+
+  // toupper()/tolower() are undefined for negative values, so convert each
+  // character to unsigned char first. The index is size_t to match size().
+  (*word)[0] = static_cast<char>(toupper(static_cast<unsigned char>((*word)[0])));
+  for (size_t i = 1; i < word->size(); i++) {
+    (*word)[i] = static_cast<char>(tolower(static_cast<unsigned char>((*word)[i])));
+  }
+}
+
+} // namespace kudu
http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/string_case.h
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/string_case.h b/be/src/kudu/util/string_case.h
new file mode 100644
index 0000000..98f5828
--- /dev/null
+++ b/be/src/kudu/util/string_case.h
@@ -0,0 +1,48 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+// Utility methods for dealing with string case.
+#ifndef KUDU_UTIL_STRING_CASE_H
+#define KUDU_UTIL_STRING_CASE_H
+
+#include <string>
+
+namespace kudu {
+
+// Convert the given snake_case string to camel case.
+// Also treats '-' in a string like a '_'
+// For example:
+// - 'foo_bar' -> FooBar
+// - 'foo-bar' -> FooBar
+//
+// This function cannot operate in-place -- i.e. 'camel_case' must not
+// point to 'snake_case'.
+void SnakeToCamelCase(const std::string &snake_case,
+ std::string *camel_case);
+
+// Upper-case all of the characters in the given string.
+// 'string' and 'out' may refer to the same string to replace in-place.
+void ToUpperCase(const std::string &string,
+ std::string *out);
+
+// Capitalizes a string containing a word in place.
+// For example:
+// - 'hiBerNATe' -> 'Hibernate'
+void Capitalize(std::string *word);
+
+} // namespace kudu
+#endif
http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/striped64-test.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/striped64-test.cc b/be/src/kudu/util/striped64-test.cc
new file mode 100644
index 0000000..c74e165
--- /dev/null
+++ b/be/src/kudu/util/striped64-test.cc
@@ -0,0 +1,163 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+
+#include <cstdint>
+#include <ostream>
+#include <vector>
+
+#include <gflags/gflags.h>
+#include <glog/logging.h>
+#include <gtest/gtest.h>
+
+#include "kudu/gutil/ref_counted.h"
+#include "kudu/gutil/strings/substitute.h"
+#include "kudu/util/atomic.h"
+#include "kudu/util/monotime.h"
+#include "kudu/util/striped64.h"
+#include "kudu/util/test_util.h"
+#include "kudu/util/thread.h"
+
+// These flags are used by the multi-threaded tests, can be used for microbenchmarking.
+DEFINE_int32(num_operations, 10*1000, "Number of operations to perform");
+DEFINE_int32(num_threads, 2, "Number of worker threads");
+
+namespace kudu {
+
+// Test some basic operations
+TEST(Striped64Test, TestBasic) {
+ LongAdder adder;
+ ASSERT_EQ(adder.Value(), 0);
+ adder.IncrementBy(100);
+ ASSERT_EQ(adder.Value(), 100);
+ adder.Increment();
+ ASSERT_EQ(adder.Value(), 101);
+ adder.Decrement();
+ ASSERT_EQ(adder.Value(), 100);
+ adder.IncrementBy(-200);
+ ASSERT_EQ(adder.Value(), -100);
+ adder.Reset();
+ ASSERT_EQ(adder.Value(), 0);
+}
+
+template <class Adder>
+class MultiThreadTest {
+ public:
+  typedef std::vector<scoped_refptr<Thread> > thread_vec_t;
+
+  MultiThreadTest(int64_t num_operations, int64_t num_threads)
+    : num_operations_(num_operations),
+      num_threads_(num_threads) {
+  }
+  // Thread bodies: apply 'num' updates to the shared adder. The index is 64-bit to match 'num'.
+  void IncrementerThread(const int64_t num) {
+    for (int64_t i = 0; i < num; i++) {
+      adder_.Increment();
+    }
+  }
+
+  void DecrementerThread(const int64_t num) {
+    for (int64_t i = 0; i < num; i++) {
+      adder_.Decrement();
+    }
+  }
+  // Runs num_threads_ incrementers to completion, checks the total, then decrements back to zero.
+  void Run() {
+    // Increment
+    for (int i = 0; i < num_threads_; i++) {
+      scoped_refptr<Thread> ref;
+      Thread::Create("Striped64", "Incrementer", &MultiThreadTest::IncrementerThread, this,
+                     num_operations_, &ref);
+      threads_.push_back(ref);
+    }
+    for (const scoped_refptr<Thread> &t : threads_) {
+      t->Join();
+    }
+    ASSERT_EQ(num_threads_*num_operations_, adder_.Value());
+    threads_.clear();
+
+    // Decrement back to zero
+    for (int i = 0; i < num_threads_; i++) {
+      scoped_refptr<Thread> ref;
+      Thread::Create("Striped64", "Decrementer", &MultiThreadTest::DecrementerThread, this,
+                     num_operations_, &ref);
+      threads_.push_back(ref);
+    }
+    for (const scoped_refptr<Thread> &t : threads_) {
+      t->Join();
+    }
+    ASSERT_EQ(0, adder_.Value());
+  }
+
+  Adder adder_;
+  // Number of updates each worker thread performs per phase.
+  int64_t num_operations_;
+  // Number of worker threads started for each phase (increment, then decrement).
+  int32_t num_threads_;
+  thread_vec_t threads_;
+};
+
+// Test adder implemented by a single AtomicInt for comparison
+class BasicAdder {
+ public:
+ BasicAdder() : value_(0) {}
+ void IncrementBy(int64_t x) { value_.IncrementBy(x); }
+ inline void Increment() { IncrementBy(1); }
+ inline void Decrement() { IncrementBy(-1); }
+ int64_t Value() { return value_.Load(); }
+ private:
+ AtomicInt<int64_t> value_;
+};
+
+void RunMultiTest(int64_t num_operations, int64_t num_threads) {
+ MonoTime start = MonoTime::Now();
+ MultiThreadTest<BasicAdder> basicTest(num_operations, num_threads);
+ basicTest.Run();
+ MonoTime end1 = MonoTime::Now();
+ MultiThreadTest<LongAdder> test(num_operations, num_threads);
+ test.Run();
+ MonoTime end2 = MonoTime::Now();
+ MonoDelta basic = end1 - start;
+ MonoDelta striped = end2 - end1;
+ LOG(INFO) << "Basic counter took " << basic.ToMilliseconds() << "ms.";
+ LOG(INFO) << "Striped counter took " << striped.ToMilliseconds() << "ms.";
+}
+
+// Compare a single-thread workload. Demonstrates the overhead of LongAdder over AtomicInt.
+TEST(Striped64Test, TestSingleIncrDecr) {
+ OverrideFlagForSlowTests(
+ "num_operations",
+ strings::Substitute("$0", (FLAGS_num_operations * 100)));
+ RunMultiTest(FLAGS_num_operations, 1);
+}
+
+// Compare a multi-threaded workload. LongAdder should show improvements here.
+TEST(Striped64Test, TestMultiIncrDecr) {
+ OverrideFlagForSlowTests(
+ "num_operations",
+ strings::Substitute("$0", (FLAGS_num_operations * 100)));
+ OverrideFlagForSlowTests(
+ "num_threads",
+ strings::Substitute("$0", (FLAGS_num_threads * 4)));
+ RunMultiTest(FLAGS_num_operations, FLAGS_num_threads);
+}
+
+TEST(Striped64Test, TestSize) {
+ ASSERT_EQ(16, sizeof(LongAdder));
+}
+
+} // namespace kudu
http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/striped64.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/striped64.cc b/be/src/kudu/util/striped64.cc
new file mode 100644
index 0000000..789a395
--- /dev/null
+++ b/be/src/kudu/util/striped64.cc
@@ -0,0 +1,191 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "kudu/util/striped64.h"
+
+#include <mm_malloc.h>
+#include <unistd.h>
+
+#include <cstdlib>
+#include <new>
+#include <ostream>
+#include <glog/logging.h>
+
+#include "kudu/util/monotime.h"
+#include "kudu/util/random.h"
+
+using kudu::striped64::internal::Cell;
+
+namespace kudu {
+
+namespace striped64 {
+namespace internal {
+
+//
+// Cell
+//
+
+Cell::Cell()
+ : value_(0) {
+}
+} // namespace internal
+} // namespace striped64
+
+//
+// Striped64
+//
+__thread uint64_t Striped64::tls_hashcode_ = 0;
+
+namespace {
+const int64_t kNumCpus = sysconf(_SC_NPROCESSORS_ONLN);  // -1 if sysconf() fails
+uint32_t ComputeNumCells() {
+  uint32_t n = 1;
+  // Calculate the size. Nearest power of two >= NCPU.
+  // kNumCpus is signed so a failed sysconf() (-1) yields one cell instead of wrapping around.
+  while (n < (1U << 31) && kNumCpus > n) {
+    n <<= 1;
+  }
+  return n;
+}
+const uint32_t kNumCells = ComputeNumCells();
+const uint32_t kCellMask = kNumCells - 1;
+
+striped64::internal::Cell* const kCellsLocked =
+ reinterpret_cast<striped64::internal::Cell*>(-1L);
+
+} // anonymous namespace
+
+uint64_t Striped64::get_tls_hashcode() {
+ if (PREDICT_FALSE(tls_hashcode_ == 0)) {
+ Random r((MonoTime::Now() - MonoTime::Min()).ToNanoseconds());
+ const uint64_t hash = r.Next64();
+ // Avoid zero to allow xorShift rehash, and because 0 indicates an unset
+ // hashcode above.
+ tls_hashcode_ = (hash == 0) ? 1 : hash;
+ }
+ return tls_hashcode_;
+}
+
+
+Striped64::~Striped64() {
+ // Cell is a POD, so no need to destruct each one.
+ free(cells_);
+}
+
+template<class Updater>
+void Striped64::RetryUpdate(Rehash to_rehash, Updater updater) {
+  uint64_t h = get_tls_hashcode();
+  // There are three operations in this loop.
+  //
+  // 1. Try to add to the Cell hash table entry for the thread if the table exists.
+  //    When there's contention, rehash to try a different Cell.
+  // 2. Try to initialize the hash table.
+  // 3. Try to update the base counter.
+  //
+  // These are predicated on successful CAS operations, which is why it's all wrapped in an
+  // infinite retry loop.
+  while (true) {
+    Cell* cells = cells_.load(std::memory_order_acquire);
+    if (cells && cells != kCellsLocked) {
+      if (to_rehash == kRehash) {
+        // CAS failed already, rehash before trying to increment.
+        to_rehash = kNoRehash;
+      } else {
+        Cell *cell = &(cells[h & kCellMask]);  // index the snapshot loaded above, not cells_
+        int64_t v = cell->value_.load(std::memory_order_relaxed);
+        if (cell->CompareAndSet(v, updater(v))) {
+          // Successfully CAS'd the corresponding cell, done.
+          break;
+        }
+      }
+      // Rehash since we failed to CAS, either previously or just now.
+      h ^= h << 13;
+      h ^= h >> 17;
+      h ^= h << 5;
+    } else if (cells == nullptr &&
+               cells_.compare_exchange_weak(cells, kCellsLocked)) {
+      // Allocate cache-aligned memory for use by the cells_ table.
+      void* cell_buffer = nullptr;
+      int err = posix_memalign(&cell_buffer, CACHELINE_SIZE, sizeof(Cell) * kNumCells);
+      CHECK_EQ(0, err) << "error calling posix_memalign" << std::endl;
+      // Initialize the table
+      cells = new (cell_buffer) Cell[kNumCells];
+      cells_.store(cells, std::memory_order_release);
+    } else {
+      // Fallback to adding to the base value.
+      // Means the table wasn't initialized or we failed to init it.
+      int64_t v = base_.load(std::memory_order_relaxed);
+      if (CasBase(v, updater(v))) {
+        break;
+      }
+    }
+  }
+  // Record index for next time
+  tls_hashcode_ = h;
+}
+
+void Striped64::InternalReset(int64_t initial_value) {
+  base_.store(initial_value);
+  Cell* c;
+  do {  // Spin while cells_ holds the 'locked' sentinel set during table allocation.
+    c = cells_.load(std::memory_order_acquire);
+  } while (c == kCellsLocked);
+  if (c) {
+    for (uint32_t i = 0; i < kNumCells; i++) {  // unsigned index to match kNumCells
+      c[i].value_.store(initial_value);
+    }
+  }
+}
+void LongAdder::IncrementBy(int64_t x) {
+ // Use hash table if present. If that fails, call RetryUpdate to rehash and retry.
+ // If no hash table, try to CAS the base counter. If that fails, RetryUpdate to init the table.
+ Cell* cells = cells_.load(std::memory_order_acquire);
+ if (cells && cells != kCellsLocked) {
+ Cell *cell = &(cells[get_tls_hashcode() & kCellMask]);
+ DCHECK_EQ(0, reinterpret_cast<const uintptr_t>(cell) & (sizeof(Cell) - 1))
+ << " unaligned Cell not allowed for Striped64" << std::endl;
+ const int64_t old = cell->value_.load(std::memory_order_relaxed);
+ if (!cell->CompareAndSet(old, old + x)) {
+ // When we hit a hash table contention, signal RetryUpdate to rehash.
+ RetryUpdate(kRehash, [x](int64_t old) { return old + x; });
+ }
+ } else {
+ int64_t b = base_.load(std::memory_order_relaxed);
+ if (!CasBase(b, b + x)) {
+ // Attempt to initialize the table. No need to rehash since the contention was for the
+ // base counter, not the hash table.
+ RetryUpdate(kNoRehash, [x](int64_t old) { return old + x; });
+ }
+ }
+}
+
+//
+// LongAdder
+//
+
+int64_t LongAdder::Value() const {
+  int64_t sum = base_.load(std::memory_order_relaxed);
+  Cell* c = cells_.load(std::memory_order_acquire);
+  if (c && c != kCellsLocked) {  // table exists and is not mid-allocation
+    for (uint32_t i = 0; i < kNumCells; i++) {  // unsigned index to match kNumCells
+      sum += c[i].value_.load(std::memory_order_relaxed);
+    }
+  }
+  return sum;
+}
+
+} // namespace kudu
http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/striped64.h
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/striped64.h b/be/src/kudu/util/striped64.h
new file mode 100644
index 0000000..48332e1
--- /dev/null
+++ b/be/src/kudu/util/striped64.h
@@ -0,0 +1,168 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#ifndef KUDU_UTIL_STRIPED64_H_
+#define KUDU_UTIL_STRIPED64_H_
+
+#include <atomic>
+#include <cstdint>
+
+#include "kudu/gutil/macros.h"
+#include "kudu/gutil/port.h"
+
+namespace kudu {
+namespace striped64 {
+namespace internal {
+
+// Padded POD container for atomic<int64_t>. This prevents false sharing of cache lines.
+class Cell {
+ public:
+ static constexpr int kAtomicInt64Size = sizeof(std::atomic<int64_t>);
+
+ Cell();
+ inline bool CompareAndSet(int64_t cmp, int64_t value) {
+ return value_.compare_exchange_weak(cmp, value);
+ }
+
+ // Padding advice from Herb Sutter:
+ // http://www.drdobbs.com/parallel/eliminate-false-sharing/217500206?pgno=4
+ std::atomic<int64_t> value_;
+ char pad[CACHELINE_SIZE > kAtomicInt64Size ?
+ CACHELINE_SIZE - kAtomicInt64Size : 1];
+
+ DISALLOW_COPY_AND_ASSIGN(Cell);
+} CACHELINE_ALIGNED;
+#undef ATOMIC_INT_SIZE
+
+} // namespace internal
+} // namespace striped64
+
+// This set of classes is heavily derived from JSR166e, released into the public domain
+// by Doug Lea and the other authors.
+//
+// See: http://gee.cs.oswego.edu/cgi-bin/viewcvs.cgi/jsr166/src/jsr166e/Striped64.java?view=co
+// See: http://gee.cs.oswego.edu/cgi-bin/viewcvs.cgi/jsr166/src/jsr166e/LongAdder.java?view=co
+//
+// The Striped64 and LongAdder implementations here are simplified versions of what's present in
+// JSR166e. However, the core ideas remain the same.
+//
+// Updating a single AtomicInteger in a multi-threaded environment can be quite slow:
+//
+// 1. False sharing of cache lines with other counters.
+// 2. Cache line bouncing from high update rates, especially with many cores.
+//
+// These two problems are addressed by Striped64. When there is no contention, it uses CAS on a
+// single base counter to store updates. However, when Striped64 detects contention
+// (via a failed CAS operation), it will allocate a small, fixed size hashtable of Cells.
+// A Cell is a simple POD that pads out an atomic<int64_t> to 64 bytes to prevent
+// sharing a cache line.
+//
+// Reading the value of a Striped64 requires traversing the hashtable to calculate the true sum.
+//
+// Each updating thread uses a thread-local hashcode to determine its Cell in the hashtable.
+// If a thread fails to CAS its hashed Cell, it will do a lightweight rehash operation to try
+// and find an uncontended bucket. Because the hashcode is thread-local, this rehash affects all
+// Striped64's accessed by the thread. This is good, since contention on one Striped64 is
+// indicative of contention elsewhere too.
+//
+// The hashtable is statically sized to the nearest power of 2 greater than or equal to the
+// number of CPUs. This is sufficient, since this guarantees the existence of a perfect hash
+// function. Due to the random rehashing, the threads should eventually converge to this function.
+// In practice, this scheme has shown to be sufficient.
+//
+// The biggest simplification of this implementation compared to JSR166e is that we do not
+// dynamically grow the table, instead immediately allocating it to the full size.
+// We also do not lazily allocate each Cell, instead allocating the entire array at once.
+// This means we waste some additional memory in low contention scenarios, and initial allocation
+// will also be slower. Some of the micro-optimizations were also elided for readability.
+class Striped64 {
+ public:
+ Striped64() = default;
+
+ protected:
+ // NOTE: the destructor is not virtual so that we can ensure that Striped64
+ // has no vtable, thus reducing its size. We make it protected to ensure that
+ // no one attempts to delete a Striped64* and invokes the wrong destructor.
+ ~Striped64();
+
+ enum Rehash {
+ kRehash,
+ kNoRehash
+ };
+
+ // CAS the base field.
+ bool CasBase(int64_t cmp, int64_t val) { return base_.compare_exchange_weak(cmp, val); }
+
+ // Handles cases of updates involving initialization, resizing, creating new Cells, and/or
+ // contention. See above for further explanation.
+ //
+ // 'Updater' should be a function which takes the current value and returns
+ // the new value.
+ template<class Updater>
+ void RetryUpdate(Rehash to_rehash, Updater updater);
+
+ // Sets base and all cells to the given value.
+ void InternalReset(int64_t initial_value);
+
+ // Base value, used mainly when there is no contention, but also as a fallback during
+ // table initialization races. Updated via CAS.
+ std::atomic<int64_t> base_ { 0 };
+
+ // Table of cells. When non-null, size is the nearest power of 2 >= NCPU.
+ // If this is set to -1, the pointer is 'locked' and some thread is in the
+ // process of allocating the array.
+ std::atomic<striped64::internal::Cell*> cells_ { nullptr };
+
+ protected:
+ static uint64_t get_tls_hashcode();
+
+ private:
+ DISALLOW_COPY_AND_ASSIGN(Striped64);
+
+ // Static hash code per-thread. Shared across all instances to limit thread-local pollution.
+ // Also, if a thread hits a collision on one Striped64, it's also likely to collide on
+ // other Striped64s too.
+ static __thread uint64_t tls_hashcode_;
+};
+
+// A 64-bit number optimized for high-volume concurrent updates.
+// See Striped64 for a longer explanation of the inner workings.
+class LongAdder : Striped64 {
+ public:
+ LongAdder() {}
+ void IncrementBy(int64_t x);
+ void Increment() { IncrementBy(1); }
+ void Decrement() { IncrementBy(-1); }
+
+ // Returns the current value.
+ // Note this is not an atomic snapshot in the presence of concurrent updates.
+ int64_t Value() const;
+
+ // Resets the counter state to zero.
+ void Reset() { InternalReset(0); }
+
+ protected:
+ int64_t CombineValue(int64_t current_value, int64_t new_value) {
+ return current_value + new_value;
+ }
+
+ DISALLOW_COPY_AND_ASSIGN(LongAdder);
+};
+
+} // namespace kudu
+
+#endif
http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/subprocess-test.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/subprocess-test.cc b/be/src/kudu/util/subprocess-test.cc
new file mode 100644
index 0000000..24d7cb3
--- /dev/null
+++ b/be/src/kudu/util/subprocess-test.cc
@@ -0,0 +1,381 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "kudu/util/subprocess.h"
+
+#include <errno.h>
+#include <pthread.h>
+#include <string.h>
+#include <unistd.h>
+
+#include <atomic>
+#include <csignal>
+#include <cstdio>
+#include <cstdlib>
+#include <memory>
+#include <ostream>
+#include <string>
+#include <thread>
+#include <vector>
+
+#include <glog/logging.h>
+#include <gtest/gtest.h>
+
+#include "kudu/gutil/macros.h"
+#include "kudu/gutil/strings/substitute.h"
+#include "kudu/util/env.h"
+#include "kudu/util/monotime.h"
+#include "kudu/util/path_util.h"
+#include "kudu/util/scoped_cleanup.h"
+#include "kudu/util/status.h"
+#include "kudu/util/test_macros.h"
+#include "kudu/util/test_util.h"
+
+using std::atomic;
+using std::string;
+using std::thread;
+using std::vector;
+using strings::Substitute;
+
+namespace kudu {
+
+class SubprocessTest : public KuduTest {};
+
+TEST_F(SubprocessTest, TestSimplePipe) {
+ Subprocess p({ "/usr/bin/tr", "a-z", "A-Z" });
+ p.ShareParentStdout(false);
+ ASSERT_OK(p.Start());
+
+ FILE* out = fdopen(p.ReleaseChildStdinFd(), "w");
+ PCHECK(out);
+ FILE* in = fdopen(p.from_child_stdout_fd(), "r");
+ PCHECK(in);
+
+ fprintf(out, "hello world\n");
+ // We have to close 'out' or else tr won't write any output, since
+ // it enters a buffered mode if it detects that its input is a FIFO.
+ int err;
+ RETRY_ON_EINTR(err, fclose(out));
+
+ char buf[1024];
+ ASSERT_EQ(buf, fgets(buf, sizeof(buf), in));
+ ASSERT_STREQ("HELLO WORLD\n", &buf[0]);
+
+ int wait_status = 0;
+ ASSERT_OK(p.Wait(&wait_status));
+ ASSERT_TRUE(WIFEXITED(wait_status));
+ ASSERT_EQ(0, WEXITSTATUS(wait_status));
+}
+
+TEST_F(SubprocessTest, TestErrPipe) {
+ Subprocess p({ "/usr/bin/tee", "/dev/stderr" });
+ p.ShareParentStderr(false);
+ ASSERT_OK(p.Start());
+
+ FILE* out = fdopen(p.ReleaseChildStdinFd(), "w");
+ PCHECK(out);
+
+ fprintf(out, "Hello, World\n");
+
+ // Same reasoning as above, flush to prevent tee buffering.
+ int err;
+ RETRY_ON_EINTR(err, fclose(out));
+
+ FILE* in = fdopen(p.from_child_stderr_fd(), "r");
+ PCHECK(in);
+
+ char buf[1024];
+ ASSERT_EQ(buf, fgets(buf, sizeof(buf), in));
+ ASSERT_STREQ("Hello, World\n", &buf[0]);
+
+ int wait_status = 0;
+ ASSERT_OK(p.Wait(&wait_status));
+ ASSERT_TRUE(WIFEXITED(wait_status));
+ ASSERT_EQ(0, WEXITSTATUS(wait_status));
+}
+
+TEST_F(SubprocessTest, TestKill) {
+ Subprocess p({ "/bin/cat" });
+ ASSERT_OK(p.Start());
+
+ ASSERT_OK(p.Kill(SIGKILL));
+
+ int wait_status = 0;
+ ASSERT_OK(p.Wait(&wait_status));
+ ASSERT_TRUE(WIFSIGNALED(wait_status));
+ ASSERT_EQ(SIGKILL, WTERMSIG(wait_status));
+
+ // Test that calling Wait() a second time returns the same
+ // cached value instead of trying to wait on some other process
+ // that was assigned the same pid.
+ wait_status = 0;
+ ASSERT_OK(p.Wait(&wait_status));
+ ASSERT_TRUE(WIFSIGNALED(wait_status));
+ ASSERT_EQ(SIGKILL, WTERMSIG(wait_status));
+}
+
+// Writes enough bytes to stdout and stderr concurrently that if Call() were
+// fully reading them one at a time, the test would deadlock.
+TEST_F(SubprocessTest, TestReadFromStdoutAndStderr) {
+  // Set an alarm to break out of any potential deadlocks (if the implementation
+  // regresses).
+  alarm(60);
+  // Register the alarm reset up front so that it also runs if the assertion
+  // below fails and the test returns early.
+  SCOPED_CLEANUP({ alarm(0); })
+
+  string stdout;
+  string stderr;
+  ASSERT_OK(Subprocess::Call({
+    "/bin/bash",
+    "-c",
+    "dd if=/dev/urandom of=/dev/stdout bs=512 count=2048 &"
+    "dd if=/dev/urandom of=/dev/stderr bs=512 count=2048 &"
+    "wait"
+  }, "", &stdout, &stderr));
+}
+
+// Test that environment variables can be passed to the subprocess.
+TEST_F(SubprocessTest, TestEnvVars) {
+ Subprocess p({ "/bin/bash", "-c", "echo $FOO" });
+ p.SetEnvVars({{"FOO", "bar"}});
+ p.ShareParentStdout(false);
+ ASSERT_OK(p.Start());
+ FILE* in = fdopen(p.from_child_stdout_fd(), "r");
+ PCHECK(in);
+ char buf[1024];
+ ASSERT_EQ(buf, fgets(buf, sizeof(buf), in));
+ ASSERT_STREQ("bar\n", &buf[0]);
+ ASSERT_OK(p.Wait());
+}
+
+// Test that the subprocess's CWD can be set.
+TEST_F(SubprocessTest, TestCurrentDir) {
+ string dir_path = GetTestPath("d");
+ string file_path = JoinPathSegments(dir_path, "f");
+ ASSERT_OK(Env::Default()->CreateDir(dir_path));
+ std::unique_ptr<WritableFile> file;
+ ASSERT_OK(Env::Default()->NewWritableFile(file_path, &file));
+
+ Subprocess p({ "/bin/ls", "f" });
+ p.SetCurrentDir(dir_path);
+ p.ShareParentStdout(false);
+ ASSERT_OK(p.Start());
+ ASSERT_OK(p.Wait());
+
+ int rc;
+ ASSERT_OK(p.GetExitStatus(&rc, nullptr));
+ EXPECT_EQ(0, rc);
+}
+
+// Tests writing to the subprocess stdin.
+TEST_F(SubprocessTest, TestCallWithStdin) {
+ string stdout;
+ ASSERT_OK(Subprocess::Call({ "/bin/bash" },
+ "echo \"quick brown fox\"",
+ &stdout));
+ EXPECT_EQ("quick brown fox\n", stdout);
+}
+
+// Test KUDU-1674: the '/bin/sh -c "/bin/echo ..."' command below is expected to
+// write a string to stderr. This test validates that capturing
+// stderr alone doesn't result in SIGSEGV as reported in the bug, and
+// also sanity-checks the captured stderr contents.
+TEST_F(SubprocessTest, TestReadSingleFD) {
+ string stderr;
+ const string str = "ApacheKudu";
+ const string cmd_str = Substitute("/bin/echo -n $0 1>&2", str);
+ ASSERT_OK(Subprocess::Call({"/bin/sh", "-c", cmd_str}, "", nullptr, &stderr));
+ ASSERT_EQ(stderr, str);
+
+ // Also sanity check other combinations.
+ string stdout;
+ ASSERT_OK(Subprocess::Call({"/bin/ls", "/dev/null"}, "", &stdout, nullptr));
+ ASSERT_STR_CONTAINS(stdout, "/dev/null");
+
+ ASSERT_OK(Subprocess::Call({"/bin/ls", "/dev/zero"}, "", nullptr, nullptr));
+}
+
+TEST_F(SubprocessTest, TestGetExitStatusExitSuccess) {
+ Subprocess p({ "/bin/sh", "-c", "exit 0" });
+ ASSERT_OK(p.Start());
+ ASSERT_OK(p.Wait());
+ int exit_status;
+ string exit_info;
+ ASSERT_OK(p.GetExitStatus(&exit_status, &exit_info));
+ ASSERT_EQ(0, exit_status);
+ ASSERT_STR_CONTAINS(exit_info, "process successfully exited");
+}
+
+// A child that exits with a non-zero code must report exactly that code, and
+// the exit description must mention the reported status.
+TEST_F(SubprocessTest, TestGetExitStatusExitFailure) {
+  for (const int expected_code : { 1, 255 }) {
+    Subprocess proc({ "/bin/sh", "-c", Substitute("exit $0", expected_code) });
+    ASSERT_OK(proc.Start());
+    ASSERT_OK(proc.Wait());
+
+    int status;
+    string info;
+    ASSERT_OK(proc.GetExitStatus(&status, &info));
+    ASSERT_EQ(expected_code, status);
+
+    const string expected_info =
+        Substitute("process exited with non-zero status $0", status);
+    ASSERT_STR_CONTAINS(info, expected_info);
+  }
+}
+
+// A child terminated by a signal must report the signal number as its exit
+// status, and the exit description must name that signal.
+TEST_F(SubprocessTest, TestGetExitStatusSignaled) {
+  for (const int sig : { SIGHUP, SIGABRT, SIGKILL, SIGTERM, SIGUSR2 }) {
+    Subprocess proc({ "/bin/cat" });
+    ASSERT_OK(proc.Start());
+    ASSERT_OK(proc.Kill(sig));
+    ASSERT_OK(proc.Wait());
+
+    int status;
+    string info;
+    ASSERT_OK(proc.GetExitStatus(&status, &info));
+    EXPECT_EQ(sig, status);
+
+    const string expected_info = Substitute("process exited on signal $0", sig);
+    ASSERT_STR_CONTAINS(info, expected_info);
+  }
+}
+
+// Checks that the signal delivered to a still-running child when its
+// Subprocess object is destroyed can be chosen through the constructor, by
+// observing whether the child got a chance to run its EXIT trap.
+TEST_F(SubprocessTest, TestSubprocessDestroyWithCustomSignal) {
+  string kTestFile = GetTestPath("foo");
+
+  // Shell script that creates kTestFile immediately and deletes it on exit.
+  //
+  // Note: the shell must not invoke a command while it waits to be killed
+  // (i.e. "sleep 60"). If it did, the signal could arrive just after the
+  // command starts but just before the shell decides to forward signals to
+  // it, and we would wind up with a deadlock.
+  const string script = Substitute(
+      // Delete kTestFile on exit.
+      "trap \"rm $0\" EXIT;"
+      // Create kTestFile on start.
+      "touch $0;"
+      // Spin in a tight loop waiting to be killed.
+      "while true;"
+      " do FOO=$$((FOO + 1));"
+      "done", kTestFile);
+  const vector<string> cmd = { "/bin/bash", "-c", script };
+
+  // Round 1: no explicit signal is passed to the constructor.
+  {
+    Subprocess proc(cmd);
+    ASSERT_OK(proc.Start());
+    AssertEventually([&]{
+      ASSERT_TRUE(env_->FileExists(kTestFile));
+    });
+  }
+
+  // The subprocess went out of scope and was killed with SIGKILL, so its EXIT
+  // trap never ran and kTestFile is still there.
+  ASSERT_TRUE(env_->FileExists(kTestFile));
+  ASSERT_OK(env_->DeleteFile(kTestFile));
+
+  // Round 2: the constructor is told to use SIGTERM.
+  {
+    Subprocess proc(cmd, SIGTERM);
+    ASSERT_OK(proc.Start());
+    AssertEventually([&]{
+      ASSERT_TRUE(env_->FileExists(kTestFile));
+    });
+  }
+
+  // The subprocess was killed with SIGTERM, which gave it a chance to delete
+  // kTestFile.
+  ASSERT_FALSE(env_->FileExists(kTestFile));
+}
+
+// Signal handler used by the KUDU-2208 subprocess interruption test below.
+// Deliberately empty: it ignores the signal number it is handed.
+void handler(int /* signum */) {}
+
+// Regression test for KUDU-2208 (subprocess interruption handling).
+//
+// A helper thread starts "/bin/sleep 1" and waits for it, applying CHECK_OK to
+// both Start() and Wait(). Meanwhile the main thread keeps sending SIGUSR2,
+// for which the no-op 'handler' is installed, to the helper thread until the
+// helper reports that it has finished.
+TEST_F(SubprocessTest, TestSubprocessInterruptionHandling) {
+  // Exclusive upper bound, in microseconds, of the random pause between two
+  // consecutive signals. It must be greater than 1: 'rand() % 1' is always 0,
+  // which would turn the pause into a constant zero-length sleep.
+  constexpr int kMaxSignalJitterMicros = 10;
+
+  // Create Subprocess thread
+  pthread_t t;
+  Subprocess p({ "/bin/sleep", "1" });
+  atomic<bool> t_started(false);
+  atomic<bool> t_finished(false);
+  thread subprocess_thread([&]() {
+    // Record the thread id before raising 't_started', so the signalling loop
+    // below only reads 't' after it has been set.
+    t = pthread_self();
+    t_started = true;
+    SleepFor(MonoDelta::FromMilliseconds(50));
+    CHECK_OK(p.Start());
+    CHECK_OK(p.Wait());
+    t_finished = true;
+  });
+
+  // Set up a no-op signal handler for SIGUSR2.
+  struct sigaction sa, sa_old;
+  memset(&sa, 0, sizeof(sa));
+  sa.sa_handler = &handler;
+  sigaction(SIGUSR2, &sa, &sa_old);
+
+  // NOTE(review): presumably these cleanups run at scope exit in reverse order
+  // of declaration, i.e. the thread is joined before the previous SIGUSR2
+  // disposition is restored -- confirm against SCOPED_CLEANUP before
+  // reordering them.
+  SCOPED_CLEANUP({ sigaction(SIGUSR2, &sa_old, nullptr); });
+  SCOPED_CLEANUP({ subprocess_thread.join(); });
+
+  // Send kill signals to Subprocess thread
+  LOG(INFO) << "Start sending kill signals to Subprocess thread";
+  while (!t_finished) {
+    if (t_started) {
+      int err = pthread_kill(t, SIGUSR2);
+      ASSERT_TRUE(err == 0 || err == ESRCH);
+      if (err == ESRCH) {
+        // The only acceptable reason for ESRCH is that the helper thread is
+        // already done.
+        LOG(INFO) << "Async kill signal failed with err=" << err <<
+            " because it tried to kill vanished subprocess_thread";
+        ASSERT_TRUE(t_finished);
+      }
+      // Pause for a few random microseconds between signals so that they do
+      // not all arrive at the same rate.
+      SleepFor(MonoDelta::FromMicroseconds(rand() % kMaxSignalJitterMicros));
+    }
+  }
+}
+
+#ifdef __linux__
+// Stops and resumes a child process and checks the state that
+// Subprocess::GetProcfsState() reports at each step.
+//
+// This test requires a system with /proc/<pid>/stat.
+TEST_F(SubprocessTest, TestGetProcfsState) {
+  Subprocess::ProcfsState observed;
+
+  // The test process itself should be RUNNING.
+  ASSERT_OK(Subprocess::GetProcfsState(getpid(), &observed));
+  ASSERT_EQ(Subprocess::ProcfsState::RUNNING, observed);
+
+  // A freshly started /bin/sleep is RUNNING too (even though it's asleep).
+  Subprocess child({"/bin/sleep", "1000"});
+  ASSERT_OK(child.Start());
+  ASSERT_OK(Subprocess::GetProcfsState(child.pid(), &observed));
+  ASSERT_EQ(Subprocess::ProcfsState::RUNNING, observed);
+
+  // SIGSTOP should move it to PAUSED.
+  ASSERT_OK(child.Kill(SIGSTOP));
+  ASSERT_OK(Subprocess::GetProcfsState(child.pid(), &observed));
+  ASSERT_EQ(Subprocess::ProcfsState::PAUSED, observed);
+
+  // SIGCONT should bring it back to RUNNING.
+  ASSERT_OK(child.Kill(SIGCONT));
+  ASSERT_OK(Subprocess::GetProcfsState(child.pid(), &observed));
+  ASSERT_EQ(Subprocess::ProcfsState::RUNNING, observed);
+}
+#endif
+
+} // namespace kudu