You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by ta...@apache.org on 2018/07/13 06:03:24 UTC
[12/51] [abbrv] impala git commit: IMPALA-7006: Add KRPC folders from
kudu@334ecafd
http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/monotime.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/monotime.cc b/be/src/kudu/util/monotime.cc
new file mode 100644
index 0000000..89c795d
--- /dev/null
+++ b/be/src/kudu/util/monotime.cc
@@ -0,0 +1,334 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "kudu/util/monotime.h"
+
+#include <sys/time.h>
+
+#include <ctime>
+#include <limits>
+
+#include <glog/logging.h>
+
+#include "kudu/gutil/integral_types.h"
+#include "kudu/gutil/port.h"
+#include "kudu/gutil/stringprintf.h"
+#include "kudu/gutil/sysinfo.h"
+#include "kudu/util/thread_restrictions.h"
+#if defined(__APPLE__)
+#include "kudu/gutil/walltime.h"
+#endif
+
+namespace kudu {
+
+#define MAX_MONOTONIC_SECONDS \
+ (((1ULL<<63) - 1ULL) /(int64_t)MonoTime::kNanosecondsPerSecond)
+
+
+///
+/// MonoDelta
+///
+
+const int64_t MonoDelta::kUninitialized = kint64min;
+
+MonoDelta MonoDelta::FromSeconds(double seconds) {
+ int64_t delta = seconds * MonoTime::kNanosecondsPerSecond;
+ return MonoDelta(delta);
+}
+
+MonoDelta MonoDelta::FromMilliseconds(int64_t ms) {
+ return MonoDelta(ms * MonoTime::kNanosecondsPerMillisecond);
+}
+
+MonoDelta MonoDelta::FromMicroseconds(int64_t us) {
+ return MonoDelta(us * MonoTime::kNanosecondsPerMicrosecond);
+}
+
+MonoDelta MonoDelta::FromNanoseconds(int64_t ns) {
+ return MonoDelta(ns);
+}
+
+MonoDelta::MonoDelta()
+ : nano_delta_(kUninitialized) {
+}
+
+bool MonoDelta::Initialized() const {
+ return nano_delta_ != kUninitialized;
+}
+
+bool MonoDelta::LessThan(const MonoDelta &rhs) const {
+ DCHECK(Initialized());
+ DCHECK(rhs.Initialized());
+ return nano_delta_ < rhs.nano_delta_;
+}
+
+bool MonoDelta::MoreThan(const MonoDelta &rhs) const {
+ DCHECK(Initialized());
+ DCHECK(rhs.Initialized());
+ return nano_delta_ > rhs.nano_delta_;
+}
+
+bool MonoDelta::Equals(const MonoDelta &rhs) const {
+ DCHECK(Initialized());
+ DCHECK(rhs.Initialized());
+ return nano_delta_ == rhs.nano_delta_;
+}
+
+std::string MonoDelta::ToString() const {
+ return StringPrintf("%.3fs", ToSeconds());
+}
+
+MonoDelta::MonoDelta(int64_t delta)
+ : nano_delta_(delta) {
+}
+
+double MonoDelta::ToSeconds() const {
+ DCHECK(Initialized());
+ double d(nano_delta_);
+ d /= MonoTime::kNanosecondsPerSecond;
+ return d;
+}
+
+int64_t MonoDelta::ToNanoseconds() const {
+ DCHECK(Initialized());
+ return nano_delta_;
+}
+
+int64_t MonoDelta::ToMicroseconds() const {
+ DCHECK(Initialized());
+ return nano_delta_ / MonoTime::kNanosecondsPerMicrosecond;
+}
+
+int64_t MonoDelta::ToMilliseconds() const {
+ DCHECK(Initialized());
+ return nano_delta_ / MonoTime::kNanosecondsPerMillisecond;
+}
+
+void MonoDelta::ToTimeVal(struct timeval *tv) const {
+ DCHECK(Initialized());
+ tv->tv_sec = nano_delta_ / MonoTime::kNanosecondsPerSecond;
+ tv->tv_usec = (nano_delta_ - (tv->tv_sec * MonoTime::kNanosecondsPerSecond))
+ / MonoTime::kNanosecondsPerMicrosecond;
+
+ // tv_usec must be between 0 and 999999.
+ // There is little use for negative timevals so wrap it in PREDICT_FALSE.
+ if (PREDICT_FALSE(tv->tv_usec < 0)) {
+ --(tv->tv_sec);
+ tv->tv_usec += 1000000;
+ }
+
+ // Catch positive corner case where we "round down" and could potentially set a timeout of 0.
+ // Make it 1 usec.
+ if (PREDICT_FALSE(tv->tv_usec == 0 && tv->tv_sec == 0 && nano_delta_ > 0)) {
+ tv->tv_usec = 1;
+ }
+
+ // Catch negative corner case where we "round down" and could potentially set a timeout of 0.
+ // Make it -1 usec (but normalized, so tv_usec is not negative).
+ if (PREDICT_FALSE(tv->tv_usec == 0 && tv->tv_sec == 0 && nano_delta_ < 0)) {
+ tv->tv_sec = -1;
+ tv->tv_usec = 999999;
+ }
+}
+
+
+void MonoDelta::NanosToTimeSpec(int64_t nanos, struct timespec* ts) {
+ ts->tv_sec = nanos / MonoTime::kNanosecondsPerSecond;
+ ts->tv_nsec = nanos - (ts->tv_sec * MonoTime::kNanosecondsPerSecond);
+
+ // tv_nsec must be between 0 and 999999999.
+ // There is little use for negative timespecs so wrap it in PREDICT_FALSE.
+ if (PREDICT_FALSE(ts->tv_nsec < 0)) {
+ --(ts->tv_sec);
+ ts->tv_nsec += MonoTime::kNanosecondsPerSecond;
+ }
+}
+
+void MonoDelta::ToTimeSpec(struct timespec *ts) const {
+ DCHECK(Initialized());
+ NanosToTimeSpec(nano_delta_, ts);
+}
+
+///
+/// MonoTime
+///
+
+MonoTime MonoTime::Now() {
+#if defined(__APPLE__)
+ return MonoTime(walltime_internal::GetMonoTimeNanos());
+# else
+ struct timespec ts;
+ PCHECK(clock_gettime(CLOCK_MONOTONIC, &ts) == 0);
+ return MonoTime(ts);
+#endif // defined(__APPLE__)
+}
+
+MonoTime MonoTime::Max() {
+ return MonoTime(std::numeric_limits<int64_t>::max());
+}
+
+MonoTime MonoTime::Min() {
+ return MonoTime(1);
+}
+
+const MonoTime& MonoTime::Earliest(const MonoTime& a, const MonoTime& b) {
+ if (b.nanos_ < a.nanos_) {
+ return b;
+ }
+ return a;
+}
+
+MonoTime::MonoTime()
+ : nanos_(0) {
+}
+
+bool MonoTime::Initialized() const {
+ return nanos_ != 0;
+}
+
+MonoDelta MonoTime::GetDeltaSince(const MonoTime &rhs) const {
+ DCHECK(Initialized());
+ DCHECK(rhs.Initialized());
+ int64_t delta(nanos_);
+ delta -= rhs.nanos_;
+ return MonoDelta(delta);
+}
+
+void MonoTime::AddDelta(const MonoDelta &delta) {
+ DCHECK(Initialized());
+ nanos_ += delta.nano_delta_;
+}
+
+bool MonoTime::ComesBefore(const MonoTime &rhs) const {
+ DCHECK(Initialized());
+ DCHECK(rhs.Initialized());
+ return nanos_ < rhs.nanos_;
+}
+
+std::string MonoTime::ToString() const {
+ return StringPrintf("%.3fs", ToSeconds());
+}
+
+void MonoTime::ToTimeSpec(struct timespec* ts) const {
+ DCHECK(Initialized());
+ MonoDelta::NanosToTimeSpec(nanos_, ts);
+}
+
+bool MonoTime::Equals(const MonoTime& other) const {
+ return nanos_ == other.nanos_;
+}
+
+MonoTime& MonoTime::operator+=(const MonoDelta& delta) {
+ this->AddDelta(delta);
+ return *this;
+}
+
+MonoTime& MonoTime::operator-=(const MonoDelta& delta) {
+ this->AddDelta(MonoDelta(-1 * delta.nano_delta_));
+ return *this;
+}
+
+MonoTime::MonoTime(const struct timespec &ts) {
+ // Monotonic time resets when the machine reboots. The 64-bit limitation
+ // means that we can't represent times larger than 292 years, which should be
+ // adequate.
+ CHECK_LT(ts.tv_sec, MAX_MONOTONIC_SECONDS);
+ nanos_ = ts.tv_sec;
+ nanos_ *= MonoTime::kNanosecondsPerSecond;
+ nanos_ += ts.tv_nsec;
+}
+
+MonoTime::MonoTime(int64_t nanos)
+ : nanos_(nanos) {
+}
+
+double MonoTime::ToSeconds() const {
+ double d(nanos_);
+ d /= MonoTime::kNanosecondsPerSecond;
+ return d;
+}
+
+void SleepFor(const MonoDelta& delta) {
+ ThreadRestrictions::AssertWaitAllowed();
+ base::SleepForNanoseconds(delta.ToNanoseconds());
+}
+
+bool operator==(const MonoDelta &lhs, const MonoDelta &rhs) {
+ return lhs.Equals(rhs);
+}
+
+bool operator!=(const MonoDelta &lhs, const MonoDelta &rhs) {
+ return !lhs.Equals(rhs);
+}
+
+bool operator<(const MonoDelta &lhs, const MonoDelta &rhs) {
+ return lhs.LessThan(rhs);
+}
+
+bool operator<=(const MonoDelta &lhs, const MonoDelta &rhs) {
+ return lhs.LessThan(rhs) || lhs.Equals(rhs);
+}
+
+bool operator>(const MonoDelta &lhs, const MonoDelta &rhs) {
+ return lhs.MoreThan(rhs);
+}
+
+bool operator>=(const MonoDelta &lhs, const MonoDelta &rhs) {
+ return lhs.MoreThan(rhs) || lhs.Equals(rhs);
+}
+
+bool operator==(const MonoTime& lhs, const MonoTime& rhs) {
+ return lhs.Equals(rhs);
+}
+
+bool operator!=(const MonoTime& lhs, const MonoTime& rhs) {
+ return !lhs.Equals(rhs);
+}
+
+bool operator<(const MonoTime& lhs, const MonoTime& rhs) {
+ return lhs.ComesBefore(rhs);
+}
+
+bool operator<=(const MonoTime& lhs, const MonoTime& rhs) {
+ return lhs.ComesBefore(rhs) || lhs.Equals(rhs);
+}
+
+bool operator>(const MonoTime& lhs, const MonoTime& rhs) {
+ return rhs.ComesBefore(lhs);
+}
+
+bool operator>=(const MonoTime& lhs, const MonoTime& rhs) {
+ return rhs.ComesBefore(lhs) || rhs.Equals(lhs);
+}
+
+MonoTime operator+(const MonoTime& t, const MonoDelta& delta) {
+ MonoTime tmp(t);
+ tmp.AddDelta(delta);
+ return tmp;
+}
+
+MonoTime operator-(const MonoTime& t, const MonoDelta& delta) {
+ MonoTime tmp(t);
+ tmp.AddDelta(MonoDelta::FromNanoseconds(-delta.ToNanoseconds()));
+ return tmp;
+}
+
+MonoDelta operator-(const MonoTime& t_end, const MonoTime& t_beg) {
+ return t_end.GetDeltaSince(t_beg);
+}
+
+} // namespace kudu
http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/monotime.h
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/monotime.h b/be/src/kudu/util/monotime.h
new file mode 100644
index 0000000..bb8ec35
--- /dev/null
+++ b/be/src/kudu/util/monotime.h
@@ -0,0 +1,421 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#ifndef KUDU_UTIL_MONOTIME_H
+#define KUDU_UTIL_MONOTIME_H
+
+// NOTE: using stdint.h instead of cstdint because this file is supposed
+// to be processed by a compiler lacking C++11 support.
+#include <stdint.h>
+
+#include <string>
+
+#ifdef KUDU_HEADERS_NO_STUBS
+#include <gtest/gtest_prod.h>
+
+#include "kudu/gutil/port.h"
+#else
+// This is a poor module interdependency, but the stubs are header-only and
+// it's only for exported header builds, so we'll make an exception.
+#include "kudu/client/stubs.h"
+#endif
+
+#include "kudu/util/kudu_export.h"
+
+namespace kudu {
+
+/// @brief A representation of a time interval.
+///
+/// The MonoDelta class represents an elapsed duration of time -- i.e.
+/// the delta between two MonoTime instances.
+class KUDU_EXPORT MonoDelta {
+ public:
+ /// @name Converters from seconds representation (and ubiquitous SI prefixes).
+ ///
+ /// @param [in] seconds/ms/us/ns
+ /// Time interval representation in seconds (with ubiquitous SI prefixes).
+ /// @return The resulting MonoDelta object initialized in accordance with
+ /// the specified parameter.
+ ///
+ ///@{
+ static MonoDelta FromSeconds(double seconds);
+ static MonoDelta FromMilliseconds(int64_t ms);
+ static MonoDelta FromMicroseconds(int64_t us);
+ static MonoDelta FromNanoseconds(int64_t ns);
+ ///@}
+
+ /// Build a MonoDelta object.
+ ///
+ /// @note A MonoDelta instance built with the this default constructor is
+ /// "uninitialized" and may not be used for any operation.
+ MonoDelta();
+
+ /// @return @c true iff this object is initialized.
+ bool Initialized() const;
+
+ /// Check whether this time interval is shorter than the specified one.
+ ///
+ /// @param [in] rhs
+ /// A time interval for comparison.
+ /// @return @c true iff this time interval is strictly shorter
+ /// than the specified one.
+ bool LessThan(const MonoDelta &rhs) const;
+
+ /// Check whether this time interval is longer than the specified one.
+ ///
+ /// @param [in] rhs
+ /// A time interval for comparison.
+ /// @return @c true iff this time interval is strictly longer
+ /// than the specified one.
+ bool MoreThan(const MonoDelta &rhs) const;
+
+ /// Check whether this time interval has the same duration
+ /// as the specified one.
+ ///
+ /// @param [in] rhs
+ /// A time interval for comparison.
+ /// @return @c true iff this time interval has the same duration as the
+ /// the specified one.
+ bool Equals(const MonoDelta &rhs) const;
+
+ /// @return String representation of this interval's duration (in seconds).
+ std::string ToString() const;
+
+ /// @name Converters into seconds representation (and ubiquitous SI prefixes).
+ ///
+ /// @return Representation of the time interval in appropriate SI units.
+ ///
+ ///@{
+ double ToSeconds() const;
+ int64_t ToMilliseconds() const;
+ int64_t ToMicroseconds() const;
+ int64_t ToNanoseconds() const;
+ ///@}
+
+ /// Represent this time interval as a timeval structure, with microsecond
+ /// accuracy.
+ ///
+ /// @param [out] tv
+ /// Placeholder for the result value.
+ void ToTimeVal(struct timeval *tv) const;
+
+ /// Represent this time interval as a timespec structure, with nanosecond
+ /// accuracy.
+ ///
+ /// @param [out] ts
+ /// Placeholder for the result value.
+ void ToTimeSpec(struct timespec *ts) const;
+
+ /// Convert a nanosecond value to a timespec.
+ ///
+ /// @param [in] nanos
+ /// Representation of a relative point in time in nanoseconds.
+ /// @param [out] ts
+ /// Placeholder for the resulting timespec representation.
+ static void NanosToTimeSpec(int64_t nanos, struct timespec* ts);
+
+ private:
+ static const int64_t kUninitialized;
+
+ friend class MonoTime;
+ FRIEND_TEST(TestMonoTime, TestDeltaConversions);
+
+ explicit MonoDelta(int64_t delta);
+ int64_t nano_delta_;
+};
+
+/// @brief Representation of a particular point in time.
+///
+/// The MonoTime class represents a particular point in time,
+/// relative to some fixed but unspecified reference point.
+///
+/// This time is monotonic, meaning that if the user changes his or her system
+/// clock, the monotime does not change.
+class KUDU_EXPORT MonoTime {
+ public:
+ /// @name Conversion constants for ubiquitous time units.
+ ///
+ ///@{
+ static const int64_t kNanosecondsPerSecond = 1000000000L;
+ static const int64_t kNanosecondsPerMillisecond = 1000000L;
+ static const int64_t kNanosecondsPerMicrosecond = 1000L;
+
+ static const int64_t kMicrosecondsPerSecond = 1000000L;
+ ///@}
+
+ /// Get current time in MonoTime representation.
+ ///
+ /// @return Time specification for the moment of the method's invocation.
+ static MonoTime Now();
+
+ /// @return MonoTime equal to farthest possible time into the future.
+ static MonoTime Max();
+
+ /// @return MonoTime equal to farthest possible time into the past.
+ static MonoTime Min();
+
+ /// Select the earliest between the specified time points.
+ ///
+ /// @param [in] a
+ /// The first MonoTime object to select from.
+ /// @param [in] b
+ /// The second MonoTime object to select from.
+ /// @return The earliest (minimum) of the two monotimes.
+ static const MonoTime& Earliest(const MonoTime& a, const MonoTime& b)
+ ATTRIBUTE_DEPRECATED("use std::min() instead");
+
+ /// Build a MonoTime object. The resulting object is not initialized
+ /// and not ready to use.
+ MonoTime();
+
+ /// @return @c true iff the object is initialized.
+ bool Initialized() const;
+
+ /// Compute time interval between the point in time specified by this
+ /// and the specified object.
+ ///
+ /// @param [in] rhs
+ /// The object that corresponds to the left boundary of the time interval,
+ /// where this object corresponds to the right boundary of the interval.
+ /// @return The resulting time interval represented as a MonoDelta object.
+ MonoDelta GetDeltaSince(const MonoTime &rhs) const;
+
+ /// Advance this object's time specification by the specified interval.
+ ///
+ /// @param [in] delta
+ /// The time interval to add.
+ void AddDelta(const MonoDelta &delta);
+
+ /// Check whether the point in time specified by this object is earlier
+ /// than the specified one.
+ ///
+ /// @param [in] rhs
+ /// The other MonoTime object to compare with.
+ /// @return @c true iff the point in time represented by this MonoTime object
+ /// is earlier then the point in time represented by the parameter.
+ bool ComesBefore(const MonoTime &rhs) const;
+
+ /// @return String representation of the object (in seconds).
+ std::string ToString() const;
+
+ /// Represent this point in time as a timespec structure, with nanosecond
+ /// accuracy.
+ ///
+ /// @param [out] ts
+ /// Placeholder for the result value.
+ void ToTimeSpec(struct timespec* ts) const;
+
+ /// Check whether this object represents the same point in time as the other.
+ ///
+ /// @param [in] other
+ /// The other MonoTime object to compare.
+ /// @return @c true iff the point in time represented by this MonoTime object
+ /// is the same as the one represented by the other.
+ bool Equals(const MonoTime& other) const;
+
+ /// @name Syntactic sugar: increment/decrement operators for MonoTime.
+ ///@{
+ ///
+ /// Add a delta to the point in time represented by the object.
+ ///
+ /// @param [in] delta
+ /// The delta to add.
+ /// @return Reference to the modified object.
+ MonoTime& operator+=(const MonoDelta& delta);
+
+ /// Substract a delta from the point in time represented by the object.
+ ///
+ /// @param [in] delta
+ /// The delta to substract.
+ /// @return Reference to the modified object.
+ MonoTime& operator-=(const MonoDelta& delta);
+ ///@}
+
+ private:
+ friend class MonoDelta;
+ FRIEND_TEST(TestMonoTime, TestTimeSpec);
+ FRIEND_TEST(TestMonoTime, TestDeltaConversions);
+
+ explicit MonoTime(const struct timespec &ts);
+ explicit MonoTime(int64_t nanos);
+ double ToSeconds() const;
+ int64_t nanos_;
+};
+
+/// Sleep for an interval specified by a MonoDelta instance.
+///
+/// This is preferred over sleep(3), usleep(3), and nanosleep(3).
+/// It's less prone to mixups with units since it uses the MonoDelta for
+/// interval specification.
+/// Besides, it ignores signals/EINTR, so will reliably sleep at least for the
+/// MonoDelta duration.
+///
+/// @param [in] delta
+/// The time interval to sleep for.
+void KUDU_EXPORT SleepFor(const MonoDelta& delta);
+
+/// @name Syntactic sugar: binary operators for MonoDelta.
+///@{
+///
+/// @param [in] lhs
+/// A time interval for comparison: the left-hand operand.
+/// @param [in] rhs
+/// A time interval for comparison: the right-hand operand.
+/// @return @c true iff the time interval represented by @c lhs is equal
+/// to the time interval represented by @c rhs.
+bool KUDU_EXPORT operator==(const MonoDelta &lhs, const MonoDelta &rhs);
+
+/// @param [in] lhs
+/// A time interval for comparison: the left-hand operand.
+/// @param [in] rhs
+/// A time interval for comparison: the right-hand operand.
+/// @return @c true iff the time interval represented by @c lhs is not equal
+/// to the time interval represented by @c rhs.
+bool KUDU_EXPORT operator!=(const MonoDelta &lhs, const MonoDelta &rhs);
+
+/// @param [in] lhs
+/// A time interval for comparison: the left-hand operand.
+/// @param [in] rhs
+/// A time interval for comparison: the right-hand operand.
+/// @return @c true iff the time interval represented by @c lhs is shorter
+/// than the time interval represented by @c rhs.
+bool KUDU_EXPORT operator<(const MonoDelta &lhs, const MonoDelta &rhs);
+
+/// @param [in] lhs
+/// A time interval for comparison: the left-hand operand.
+/// @param [in] rhs
+/// A time interval for comparison: the right-hand operand.
+/// @return @c true iff the time interval represented by @c lhs is shorter
+/// than or equal to the time interval represented by @c rhs.
+bool KUDU_EXPORT operator<=(const MonoDelta &lhs, const MonoDelta &rhs);
+
+/// @param [in] lhs
+/// A time interval for comparison: the left-hand operand.
+/// @param [in] rhs
+/// A time interval for comparison: the right-hand operand.
+/// @return @c true iff the time interval represented by @c lhs is longer
+/// than the time interval represented by @c rhs.
+bool KUDU_EXPORT operator>(const MonoDelta &lhs, const MonoDelta &rhs);
+
+/// @param [in] lhs
+/// A time interval for comparison: the left-hand operand.
+/// @param [in] rhs
+/// A time interval for comparison: the right-hand operand.
+/// @return @c true iff the time interval represented by @c lhs is longer
+/// than or equal to the time interval represented by @c rhs.
+bool KUDU_EXPORT operator>=(const MonoDelta &lhs, const MonoDelta &rhs);
+///@}
+
+/// @name Syntactic sugar: binary operators for MonoTime.
+///@{
+///
+/// Check if the specified objects represent the same point in time.
+///
+/// This is a handy operator which is semantically equivalent to
+/// MonoTime::Equals().
+///
+/// @param [in] lhs
+/// The left-hand operand.
+/// @param [in] rhs
+/// The right-hand operand.
+/// @return @c true iff the given objects represent the same point in time.
+bool KUDU_EXPORT operator==(const MonoTime& lhs, const MonoTime& rhs);
+
+/// Check if the specified objects represent different points in time.
+///
+/// This is a handy operator which is semantically equivalent to the negation of
+/// MonoTime::Equals().
+///
+/// @param [in] lhs
+/// The left-hand operand.
+/// @param [in] rhs
+/// The right-hand operand.
+/// @return @c true iff the given object represents a different point in time
+/// than the specified one.
+bool KUDU_EXPORT operator!=(const MonoTime& lhs, const MonoTime& rhs);
+
+/// @param [in] lhs
+/// The left-hand operand.
+/// @param [in] rhs
+/// The right-hand operand.
+/// @return @c true iff the @c lhs object represents an earlier point in time
+/// than the @c rhs object.
+bool KUDU_EXPORT operator<(const MonoTime& lhs, const MonoTime& rhs);
+
+/// @param [in] lhs
+/// The left-hand operand.
+/// @param [in] rhs
+/// The right-hand operand.
+/// @return @c true iff the @c lhs object represents an earlier than or
+/// the same point in time as the @c rhs object.
+bool KUDU_EXPORT operator<=(const MonoTime& lhs, const MonoTime& rhs);
+
+/// @param [in] lhs
+/// The left-hand operand.
+/// @param [in] rhs
+/// The right-hand operand.
+/// @return @c true iff the @c lhs object represents a later point in time
+/// than the @c rhs object.
+bool KUDU_EXPORT operator>(const MonoTime& lhs, const MonoTime& rhs);
+
+/// @param [in] lhs
+/// The left-hand operand.
+/// @param [in] rhs
+/// The right-hand operand.
+/// @return @c true iff the @c lhs object represents a later than or
+/// the same point in time as the @c rhs object.
+bool KUDU_EXPORT operator>=(const MonoTime& lhs, const MonoTime& rhs);
+///@}
+
+/// @name Syntactic sugar: mixed binary operators for MonoTime/MonoDelta.
+///@{
+///
+/// Add the specified time interval to the given point in time.
+///
+/// @param [in] t
+/// A MonoTime object representing the given point in time.
+/// @param [in] delta
+/// A MonoDelta object representing the specified time interval.
+/// @return A MonoTime object representing the resulting point in time.
+MonoTime KUDU_EXPORT operator+(const MonoTime& t, const MonoDelta& delta);
+
+/// Subtract the specified time interval from the given point in time.
+///
+/// @param [in] t
+/// A MonoTime object representing the given point in time.
+/// @param [in] delta
+/// A MonoDelta object representing the specified time interval.
+/// @return A MonoTime object representing the resulting point in time.
+MonoTime KUDU_EXPORT operator-(const MonoTime& t, const MonoDelta& delta);
+
+/// Compute the time interval between the specified points in time.
+///
+/// Semantically, this is equivalent to t0.GetDeltaSince(t1).
+///
+/// @param [in] t_end
+/// The second point in time. Semantically corresponds to the end
+/// of the resulting time interval.
+/// @param [in] t_beg
+/// The first point in time. Semantically corresponds to the beginning
+/// of the resulting time interval.
+/// @return A MonoDelta object representing the time interval between the
+/// specified points in time.
+MonoDelta KUDU_EXPORT operator-(const MonoTime& t_end, const MonoTime& t_begin);
+///@}
+
+} // namespace kudu
+
+#endif
http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/mt-hdr_histogram-test.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/mt-hdr_histogram-test.cc b/be/src/kudu/util/mt-hdr_histogram-test.cc
new file mode 100644
index 0000000..7221644
--- /dev/null
+++ b/be/src/kudu/util/mt-hdr_histogram-test.cc
@@ -0,0 +1,116 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <cstdint>
+#include <vector>
+
+#include <gflags/gflags.h>
+#include <glog/logging.h>
+#include <gtest/gtest.h>
+
+#include "kudu/gutil/ref_counted.h"
+#include "kudu/gutil/stl_util.h"
+#include "kudu/gutil/strings/substitute.h"
+#include "kudu/util/hdr_histogram.h"
+#include "kudu/util/monotime.h"
+#include "kudu/util/status.h"
+#include "kudu/util/test_util.h"
+#include "kudu/util/thread.h"
+
+DEFINE_int32(histogram_test_num_threads, 16,
+ "Number of threads to spawn for mt-hdr_histogram test");
+DEFINE_uint64(histogram_test_num_increments_per_thread, 100000LU,
+ "Number of times to call Increment() per thread in mt-hdr_histogram test");
+
+using std::vector;
+
+namespace kudu {
+
+class MtHdrHistogramTest : public KuduTest {
+ public:
+ MtHdrHistogramTest() {
+ num_threads_ = FLAGS_histogram_test_num_threads;
+ num_times_ = FLAGS_histogram_test_num_increments_per_thread;
+ }
+
+ protected:
+ int num_threads_;
+ uint64_t num_times_;
+};
+
+// Increment a counter a bunch of times in the same bucket
+static void IncrementSameHistValue(HdrHistogram* hist, uint64_t value, uint64_t times) {
+ for (uint64_t i = 0; i < times; i++) {
+ hist->Increment(value);
+ }
+}
+
+TEST_F(MtHdrHistogramTest, ConcurrentWriteTest) {
+ const uint64_t kValue = 1LU;
+
+ HdrHistogram hist(100000LU, 3);
+
+ auto threads = new scoped_refptr<kudu::Thread>[num_threads_];
+ for (int i = 0; i < num_threads_; i++) {
+ CHECK_OK(kudu::Thread::Create("test", strings::Substitute("thread-$0", i),
+ IncrementSameHistValue, &hist, kValue, num_times_, &threads[i]));
+ }
+ for (int i = 0; i < num_threads_; i++) {
+ CHECK_OK(ThreadJoiner(threads[i].get()).Join());
+ }
+
+ HdrHistogram snapshot(hist);
+ ASSERT_EQ(num_threads_ * num_times_, snapshot.CountInBucketForValue(kValue));
+
+ delete[] threads;
+}
+
+// Copy while writing, then iterate to ensure copies are consistent.
+TEST_F(MtHdrHistogramTest, ConcurrentCopyWhileWritingTest) {
+ const int kNumCopies = 10;
+ const uint64_t kValue = 1;
+
+ HdrHistogram hist(100000LU, 3);
+
+ auto threads = new scoped_refptr<kudu::Thread>[num_threads_];
+ for (int i = 0; i < num_threads_; i++) {
+ CHECK_OK(kudu::Thread::Create("test", strings::Substitute("thread-$0", i),
+ IncrementSameHistValue, &hist, kValue, num_times_, &threads[i]));
+ }
+
+ // This is somewhat racy but the goal is to catch this issue at least
+ // most of the time. At the time of this writing, before fixing a bug where
+ // the total count stored in a copied histogram may not match its internal
+ // counts (under concurrent writes), this test fails for me on 100/100 runs.
+ vector<HdrHistogram *> snapshots;
+ ElementDeleter deleter(&snapshots);
+ for (int i = 0; i < kNumCopies; i++) {
+ snapshots.push_back(new HdrHistogram(hist));
+ SleepFor(MonoDelta::FromMicroseconds(100));
+ }
+ for (int i = 0; i < kNumCopies; i++) {
+ snapshots[i]->MeanValue(); // Will crash if underlying iterator is inconsistent.
+ }
+
+ for (int i = 0; i < num_threads_; i++) {
+ CHECK_OK(ThreadJoiner(threads[i].get()).Join());
+ }
+
+ delete[] threads;
+}
+
+} // namespace kudu
http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/mt-metrics-test.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/mt-metrics-test.cc b/be/src/kudu/util/mt-metrics-test.cc
new file mode 100644
index 0000000..2480d57
--- /dev/null
+++ b/be/src/kudu/util/mt-metrics-test.cc
@@ -0,0 +1,128 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <cstdint>
+#include <cstring>
+#include <memory>
+#include <string>
+#include <vector>
+
+#include <boost/bind.hpp> // IWYU pragma: keep
+#include <boost/function.hpp> // IWYU pragma: keep
+#include <gflags/gflags.h>
+#include <glog/logging.h>
+#include <gtest/gtest.h>
+
+#include "kudu/gutil/ref_counted.h"
+#include "kudu/gutil/stringprintf.h"
+#include "kudu/gutil/strings/substitute.h"
+#include "kudu/util/debug/leakcheck_disabler.h"
+#include "kudu/util/env.h"
+#include "kudu/util/metrics.h"
+#include "kudu/util/status.h"
+#include "kudu/util/test_macros.h"
+#include "kudu/util/test_util.h"
+#include "kudu/util/thread.h"
+
+DEFINE_int32(mt_metrics_test_num_threads, 4,
+ "Number of threads to spawn in mt metrics tests");
+
+METRIC_DEFINE_entity(test_entity);
+
+namespace kudu {
+
+using debug::ScopedLeakCheckDisabler;
+using std::string;
+using std::vector;
+
+class MultiThreadedMetricsTest : public KuduTest {
+ public:
+ static void RegisterCounters(const scoped_refptr<MetricEntity>& metric_entity,
+ const string& name_prefix, int num_counters);
+
+ MetricRegistry registry_;
+};
+
+// Call increment on a Counter a bunch of times.
+static void CountWithCounter(scoped_refptr<Counter> counter, int num_increments) {
+ for (int i = 0; i < num_increments; i++) {
+ counter->Increment();
+ }
+}
+
+// Helper function that spawns and then joins a bunch of threads.
+static void RunWithManyThreads(boost::function<void()>* f, int num_threads) {
+ vector<scoped_refptr<kudu::Thread> > threads;
+ for (int i = 0; i < num_threads; i++) {
+ scoped_refptr<kudu::Thread> new_thread;
+ CHECK_OK(kudu::Thread::Create("test", StringPrintf("thread%d", i),
+ *f, &new_thread));
+ threads.push_back(new_thread);
+ }
+ for (int i = 0; i < num_threads; i++) {
+ ASSERT_OK(ThreadJoiner(threads[i].get()).Join());
+ }
+}
+
+METRIC_DEFINE_counter(test_entity, test_counter, "Test Counter",
+ MetricUnit::kRequests, "Test counter");
+
+// Ensure that incrementing a counter is thread-safe.
+TEST_F(MultiThreadedMetricsTest, CounterIncrementTest) {
+ scoped_refptr<Counter> counter = new Counter(&METRIC_test_counter);
+ int num_threads = FLAGS_mt_metrics_test_num_threads;
+ int num_increments = 1000;
+ boost::function<void()> f =
+ boost::bind(CountWithCounter, counter, num_increments);
+ RunWithManyThreads(&f, num_threads);
+ ASSERT_EQ(num_threads * num_increments, counter->value());
+}
+
+// Helper function to register a bunch of counters in a loop.
+void MultiThreadedMetricsTest::RegisterCounters(
+ const scoped_refptr<MetricEntity>& metric_entity,
+ const string& name_prefix,
+ int num_counters) {
+ uint64_t tid = Env::Default()->gettid();
+ for (int i = 0; i < num_counters; i++) {
+ // This loop purposefully leaks metrics prototypes, because the metrics system
+ // expects the prototypes and their names to live forever. This is the only
+ // place we dynamically generate them for the purposes of a test, so it's easier
+ // to just leak them than to figure out a way to manage lifecycle of objects that
+ // are typically static.
+ ScopedLeakCheckDisabler disabler;
+
+ string name = strings::Substitute("$0-$1-$2", name_prefix, tid, i);
+ auto proto = new CounterPrototype(MetricPrototype::CtorArgs(
+ "test_entity", strdup(name.c_str()), "Test Counter",
+ MetricUnit::kOperations, "test counter"));
+ proto->Instantiate(metric_entity)->Increment();
+ }
+}
+
+// Ensure that adding a counter to a registry is thread-safe.
+TEST_F(MultiThreadedMetricsTest, AddCounterToRegistryTest) {
+ scoped_refptr<MetricEntity> entity = METRIC_ENTITY_test_entity.Instantiate(®istry_, "my-test");
+ int num_threads = FLAGS_mt_metrics_test_num_threads;
+ int num_counters = 1000;
+ boost::function<void()> f =
+ boost::bind(RegisterCounters, entity, "prefix", num_counters);
+ RunWithManyThreads(&f, num_threads);
+ ASSERT_EQ(num_threads * num_counters, entity->UnsafeMetricsMapForTests().size());
+}
+
+} // namespace kudu
http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/mt-threadlocal-test.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/mt-threadlocal-test.cc b/be/src/kudu/util/mt-threadlocal-test.cc
new file mode 100644
index 0000000..8f0b23b
--- /dev/null
+++ b/be/src/kudu/util/mt-threadlocal-test.cc
@@ -0,0 +1,357 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <cstdint>
+#include <mutex>
+#include <ostream>
+#include <string>
+#include <vector>
+#include <unordered_set>
+
+#include <glog/logging.h>
+#include <gtest/gtest.h>
+
+#include "kudu/gutil/macros.h"
+#include "kudu/gutil/map-util.h"
+#include "kudu/gutil/ref_counted.h"
+#include "kudu/gutil/stl_util.h"
+#include "kudu/gutil/strings/substitute.h"
+#include "kudu/util/countdown_latch.h"
+#include "kudu/util/env.h"
+#include "kudu/util/locks.h"
+#include "kudu/util/monotime.h"
+#include "kudu/util/mutex.h"
+#include "kudu/util/status.h"
+#include "kudu/util/test_util.h"
+#include "kudu/util/thread.h"
+#include "kudu/util/threadlocal.h"
+#include "kudu/util/threadlocal_cache.h"
+
+using std::string;
+using std::unordered_set;
+using std::vector;
+using strings::Substitute;
+
+namespace kudu {
+namespace threadlocal {
+
+class ThreadLocalTest : public KuduTest {};
+
+const int kTargetCounterVal = 1000000;
+
+class Counter;
+typedef unordered_set<Counter*> CounterPtrSet;
+typedef Mutex RegistryLockType;
+typedef simple_spinlock CounterLockType;
+
+// Registry to provide reader access to the thread-local Counters.
+// The methods are only thread-safe if the calling thread holds the lock.
+class CounterRegistry {
+ public:
+ CounterRegistry() {
+ }
+
+ RegistryLockType* get_lock() const {
+ return &lock_;
+ }
+
+ bool RegisterUnlocked(Counter* counter) {
+ LOG(INFO) << "Called RegisterUnlocked()";
+ return InsertIfNotPresent(&counters_, counter);
+ }
+
+ bool UnregisterUnlocked(Counter* counter) {
+ LOG(INFO) << "Called UnregisterUnlocked()";
+ return counters_.erase(counter) > 0;
+ }
+
+ CounterPtrSet* GetCountersUnlocked() {
+ return &counters_;
+ }
+
+ private:
+ mutable RegistryLockType lock_;
+ CounterPtrSet counters_;
+ DISALLOW_COPY_AND_ASSIGN(CounterRegistry);
+};
+
+// A simple Counter class that registers itself with a CounterRegistry.
+class Counter {
+ public:
+ Counter(CounterRegistry* registry, int val)
+ : tid_(Env::Default()->gettid()),
+ registry_(CHECK_NOTNULL(registry)),
+ val_(val) {
+ LOG(INFO) << "Counter::~Counter(): tid = " << tid_ << ", addr = " << this << ", val = " << val_;
+ std::lock_guard<RegistryLockType> reg_lock(*registry_->get_lock());
+ CHECK(registry_->RegisterUnlocked(this));
+ }
+
+ ~Counter() {
+ LOG(INFO) << "Counter::~Counter(): tid = " << tid_ << ", addr = " << this << ", val = " << val_;
+ std::lock_guard<RegistryLockType> reg_lock(*registry_->get_lock());
+ std::lock_guard<CounterLockType> self_lock(lock_);
+ LOG(INFO) << tid_ << ": deleting self from registry...";
+ CHECK(registry_->UnregisterUnlocked(this));
+ }
+
+ uint64_t tid() {
+ return tid_;
+ }
+
+ CounterLockType* get_lock() const {
+ return &lock_;
+ }
+
+ void IncrementUnlocked() {
+ val_++;
+ }
+
+ int GetValueUnlocked() {
+ return val_;
+ }
+
+ private:
+ // We expect that most of the time this lock will be uncontended.
+ mutable CounterLockType lock_;
+
+ // TID of thread that constructed this object.
+ const uint64_t tid_;
+
+ // Register / unregister ourselves with this on construction / destruction.
+ CounterRegistry* const registry_;
+
+ // Current value of the counter.
+ int val_;
+
+ DISALLOW_COPY_AND_ASSIGN(Counter);
+};
+
+// Create a new THREAD_LOCAL Counter and loop an increment operation on it.
+static void RegisterCounterAndLoopIncr(CounterRegistry* registry,
+ CountDownLatch* counters_ready,
+ CountDownLatch* reader_ready,
+ CountDownLatch* counters_done,
+ CountDownLatch* reader_done) {
+ BLOCK_STATIC_THREAD_LOCAL(Counter, counter, registry, 0);
+ // Inform the reader that we are alive.
+ counters_ready->CountDown();
+ // Let the reader initialize before we start counting.
+ reader_ready->Wait();
+ // Now rock & roll on the counting loop.
+ for (int i = 0; i < kTargetCounterVal; i++) {
+ std::lock_guard<CounterLockType> l(*counter->get_lock());
+ counter->IncrementUnlocked();
+ }
+ // Let the reader know we're ready for him to verify our counts.
+ counters_done->CountDown();
+ // Wait until the reader is done before we exit the thread, which will call
+ // delete on the Counter.
+ reader_done->Wait();
+}
+
+// Iterate over the registered counters and their values.
+static uint64_t Iterate(CounterRegistry* registry, int expected_counters) {
+ uint64_t sum = 0;
+ int seen_counters = 0;
+ std::lock_guard<RegistryLockType> l(*registry->get_lock());
+ for (Counter* counter : *registry->GetCountersUnlocked()) {
+ uint64_t value;
+ {
+ std::lock_guard<CounterLockType> l(*counter->get_lock());
+ value = counter->GetValueUnlocked();
+ }
+ LOG(INFO) << "tid " << counter->tid() << " (counter " << counter << "): " << value;
+ sum += value;
+ seen_counters++;
+ }
+ CHECK_EQ(expected_counters, seen_counters);
+ return sum;
+}
+
+static void TestThreadLocalCounters(CounterRegistry* registry, const int num_threads) {
+ LOG(INFO) << "Starting threads...";
+ vector<scoped_refptr<kudu::Thread> > threads;
+
+ CountDownLatch counters_ready(num_threads);
+ CountDownLatch reader_ready(1);
+ CountDownLatch counters_done(num_threads);
+ CountDownLatch reader_done(1);
+ for (int i = 0; i < num_threads; i++) {
+ scoped_refptr<kudu::Thread> new_thread;
+ CHECK_OK(kudu::Thread::Create("test", strings::Substitute("t$0", i),
+ &RegisterCounterAndLoopIncr, registry, &counters_ready, &reader_ready,
+ &counters_done, &reader_done, &new_thread));
+ threads.push_back(new_thread);
+ }
+
+ // Wait for all threads to start and register their Counters.
+ counters_ready.Wait();
+ CHECK_EQ(0, Iterate(registry, num_threads));
+ LOG(INFO) << "--";
+
+ // Let the counters start spinning.
+ reader_ready.CountDown();
+
+ // Try to catch them in the act, just for kicks.
+ for (int i = 0; i < 2; i++) {
+ Iterate(registry, num_threads);
+ LOG(INFO) << "--";
+ SleepFor(MonoDelta::FromMicroseconds(1));
+ }
+
+ // Wait until they're done and assure they sum up properly.
+ counters_done.Wait();
+ LOG(INFO) << "Checking Counter sums...";
+ CHECK_EQ(kTargetCounterVal * num_threads, Iterate(registry, num_threads));
+ LOG(INFO) << "Counter sums add up!";
+ reader_done.CountDown();
+
+ LOG(INFO) << "Joining & deleting threads...";
+ for (scoped_refptr<kudu::Thread> thread : threads) {
+ CHECK_OK(ThreadJoiner(thread.get()).Join());
+ }
+ LOG(INFO) << "Done.";
+}
+
+TEST_F(ThreadLocalTest, TestConcurrentCounters) {
+ // Run this multiple times to ensure we don't leave remnants behind in the
+ // CounterRegistry.
+ CounterRegistry registry;
+ for (int i = 0; i < 3; i++) {
+ TestThreadLocalCounters(®istry, 8);
+ }
+}
+
+// Test class that stores a string in a static thread local member.
+// This class cannot be instantiated. The methods are all static.
+class ThreadLocalString {
+ public:
+ static void set(std::string value);
+ static const std::string& get();
+ private:
+ ThreadLocalString() {
+ }
+ DECLARE_STATIC_THREAD_LOCAL(std::string, value_);
+ DISALLOW_COPY_AND_ASSIGN(ThreadLocalString);
+};
+
+DEFINE_STATIC_THREAD_LOCAL(std::string, ThreadLocalString, value_);
+
+void ThreadLocalString::set(std::string value) {
+ INIT_STATIC_THREAD_LOCAL(std::string, value_);
+ *value_ = value;
+}
+
+const std::string& ThreadLocalString::get() {
+ INIT_STATIC_THREAD_LOCAL(std::string, value_);
+ return *value_;
+}
+
+static void RunAndAssign(CountDownLatch* writers_ready,
+ CountDownLatch *readers_ready,
+ CountDownLatch *all_done,
+ CountDownLatch *threads_exiting,
+ const std::string& in,
+ std::string* out) {
+ writers_ready->Wait();
+ // Ensure it starts off as an empty string.
+ CHECK_EQ("", ThreadLocalString::get());
+ ThreadLocalString::set(in);
+
+ readers_ready->Wait();
+ out->assign(ThreadLocalString::get());
+ all_done->Wait();
+ threads_exiting->CountDown();
+}
+
+TEST_F(ThreadLocalTest, TestTLSMember) {
+ const int num_threads = 8;
+
+ vector<CountDownLatch*> writers_ready;
+ vector<CountDownLatch*> readers_ready;
+ vector<std::string*> out_strings;
+ vector<scoped_refptr<kudu::Thread> > threads;
+
+ ElementDeleter writers_deleter(&writers_ready);
+ ElementDeleter readers_deleter(&readers_ready);
+ ElementDeleter out_strings_deleter(&out_strings);
+
+ CountDownLatch all_done(1);
+ CountDownLatch threads_exiting(num_threads);
+
+ LOG(INFO) << "Starting threads...";
+ for (int i = 0; i < num_threads; i++) {
+ writers_ready.push_back(new CountDownLatch(1));
+ readers_ready.push_back(new CountDownLatch(1));
+ out_strings.push_back(new std::string());
+ scoped_refptr<kudu::Thread> new_thread;
+ CHECK_OK(kudu::Thread::Create("test", strings::Substitute("t$0", i),
+ &RunAndAssign, writers_ready[i], readers_ready[i],
+ &all_done, &threads_exiting, Substitute("$0", i), out_strings[i], &new_thread));
+ threads.push_back(new_thread);
+ }
+
+ // Unlatch the threads in order.
+ LOG(INFO) << "Writing to thread locals...";
+ for (int i = 0; i < num_threads; i++) {
+ writers_ready[i]->CountDown();
+ }
+ LOG(INFO) << "Reading from thread locals...";
+ for (int i = 0; i < num_threads; i++) {
+ readers_ready[i]->CountDown();
+ }
+ all_done.CountDown();
+ // threads_exiting acts as a memory barrier.
+ threads_exiting.Wait();
+ for (int i = 0; i < num_threads; i++) {
+ ASSERT_EQ(Substitute("$0", i), *out_strings[i]);
+ LOG(INFO) << "Read " << *out_strings[i];
+ }
+
+ LOG(INFO) << "Joining & deleting threads...";
+ for (scoped_refptr<kudu::Thread> thread : threads) {
+ CHECK_OK(ThreadJoiner(thread.get()).Join());
+ }
+}
+
+TEST_F(ThreadLocalTest, TestThreadLocalCache) {
+ using TLC = ThreadLocalCache<int, string>;
+ TLC* tlc = TLC::GetInstance();
+
+ // Lookup in an empty cache should return nullptr.
+ ASSERT_EQ(nullptr, tlc->Lookup(0));
+
+ // Insert more items than the cache capacity.
+ const int kLastItem = TLC::kItemCapacity * 2;
+ for (int i = 1; i <= kLastItem ; i++) {
+ auto* item = tlc->EmplaceNew(i);
+ ASSERT_NE(nullptr, item);
+ *item = Substitute("item $0", i);
+ }
+
+ // Looking up the most recent items should return them.
+ string* item = tlc->Lookup(kLastItem);
+ ASSERT_NE(nullptr, item);
+ EXPECT_EQ(*item, Substitute("item $0", kLastItem));
+
+ // Looking up evicted items should return nullptr.
+ ASSERT_EQ(nullptr, tlc->Lookup(1));
+}
+
+} // namespace threadlocal
+} // namespace kudu
http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/mutex.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/mutex.cc b/be/src/kudu/util/mutex.cc
new file mode 100644
index 0000000..d55ccaf
--- /dev/null
+++ b/be/src/kudu/util/mutex.cc
@@ -0,0 +1,164 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+// Portions (c) 2011 The Chromium Authors.
+
+#include "kudu/util/mutex.h"
+
+#include <cerrno>
+#include <cstdint>
+#include <cstring>
+#include <ostream>
+#include <string>
+
+#include <gflags/gflags.h>
+#include <glog/logging.h>
+
+#include "kudu/gutil/port.h"
+#include "kudu/gutil/strings/substitute.h"
+#include "kudu/gutil/walltime.h"
+#include "kudu/util/debug-util.h"
+#include "kudu/util/env.h"
+#include "kudu/util/flag_tags.h"
+#include "kudu/util/trace.h"
+
+using std::string;
+using strings::Substitute;
+using strings::SubstituteAndAppend;
+
+#ifndef NDEBUG
+DEFINE_bool(debug_mutex_collect_stacktrace, false,
+ "Whether to collect a stacktrace on Mutex contention in a DEBUG build");
+TAG_FLAG(debug_mutex_collect_stacktrace, advanced);
+TAG_FLAG(debug_mutex_collect_stacktrace, hidden);
+#endif
+
+namespace kudu {
+
+Mutex::Mutex()
+#ifndef NDEBUG
+ : owning_tid_(0),
+ stack_trace_(new StackTrace())
+#endif
+{
+#ifndef NDEBUG
+ // In debug, setup attributes for lock error checking.
+ pthread_mutexattr_t mta;
+ int rv = pthread_mutexattr_init(&mta);
+ DCHECK_EQ(0, rv) << ". " << strerror(rv);
+ rv = pthread_mutexattr_settype(&mta, PTHREAD_MUTEX_ERRORCHECK);
+ DCHECK_EQ(0, rv) << ". " << strerror(rv);
+ rv = pthread_mutex_init(&native_handle_, &mta);
+ DCHECK_EQ(0, rv) << ". " << strerror(rv);
+ rv = pthread_mutexattr_destroy(&mta);
+ DCHECK_EQ(0, rv) << ". " << strerror(rv);
+#else
+ // In release, go with the default lock attributes.
+ pthread_mutex_init(&native_handle_, NULL);
+#endif
+}
+
+Mutex::~Mutex() {
+ int rv = pthread_mutex_destroy(&native_handle_);
+ DCHECK_EQ(0, rv) << ". " << strerror(rv);
+}
+
+bool Mutex::TryAcquire() {
+ int rv = pthread_mutex_trylock(&native_handle_);
+#ifndef NDEBUG
+ DCHECK(rv == 0 || rv == EBUSY) << ". " << strerror(rv) << ". " << GetOwnerThreadInfo();
+ if (rv == 0) {
+ CheckUnheldAndMark();
+ }
+#endif
+ return rv == 0;
+}
+
+void Mutex::Acquire() {
+ // Optimize for the case when mutexes are uncontended. If they
+ // are contended, we'll have to go to sleep anyway, so the extra
+ // cost of branch mispredictions is moot.
+ //
+ // TryAcquire() is implemented as a simple CompareAndSwap inside
+ // pthreads so this does not require a system call.
+ if (PREDICT_TRUE(TryAcquire())) {
+ return;
+ }
+
+ // If we weren't able to acquire the mutex immediately, then it's
+ // worth gathering timing information about the mutex acquisition.
+ MicrosecondsInt64 start_time = GetMonoTimeMicros();
+ int rv = pthread_mutex_lock(&native_handle_);
+ DCHECK_EQ(0, rv) << ". " << strerror(rv)
+#ifndef NDEBUG
+ << ". " << GetOwnerThreadInfo()
+#endif
+ ; // NOLINT(whitespace/semicolon)
+ MicrosecondsInt64 end_time = GetMonoTimeMicros();
+
+ int64_t wait_time = end_time - start_time;
+ if (wait_time > 0) {
+ TRACE_COUNTER_INCREMENT("mutex_wait_us", wait_time);
+ }
+
+#ifndef NDEBUG
+ CheckUnheldAndMark();
+#endif
+}
+
+void Mutex::Release() {
+#ifndef NDEBUG
+ CheckHeldAndUnmark();
+#endif
+ int rv = pthread_mutex_unlock(&native_handle_);
+ DCHECK_EQ(0, rv) << ". " << strerror(rv);
+}
+
+#ifndef NDEBUG
+void Mutex::AssertAcquired() const {
+ DCHECK_EQ(Env::Default()->gettid(), owning_tid_);
+}
+
+void Mutex::CheckHeldAndUnmark() {
+ AssertAcquired();
+ owning_tid_ = 0;
+ if (FLAGS_debug_mutex_collect_stacktrace) {
+ stack_trace_->Reset();
+ }
+}
+
+void Mutex::CheckUnheldAndMark() {
+ DCHECK_EQ(0, owning_tid_);
+ owning_tid_ = Env::Default()->gettid();
+ if (FLAGS_debug_mutex_collect_stacktrace) {
+ stack_trace_->Collect();
+ }
+}
+
+string Mutex::GetOwnerThreadInfo() const {
+ string str = Substitute("Owner tid: $0; Self tid: $1; ", owning_tid_, Env::Default()->gettid());
+ if (FLAGS_debug_mutex_collect_stacktrace) {
+ SubstituteAndAppend(&str, "Owner stack:\n$0", stack_trace_->Symbolize());
+ } else {
+ str += "To collect the owner stack trace, enable the flag --debug_mutex_collect_stacktrace";
+ }
+ return str;
+}
+
+#endif
+
+} // namespace kudu
http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/mutex.h
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/mutex.h b/be/src/kudu/util/mutex.h
new file mode 100644
index 0000000..9277ac0
--- /dev/null
+++ b/be/src/kudu/util/mutex.h
@@ -0,0 +1,142 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#ifndef KUDU_UTIL_MUTEX_H
+#define KUDU_UTIL_MUTEX_H
+
+#include <pthread.h>
+#include <sys/types.h>
+
+#include <string>
+
+#include <glog/logging.h>
+
+#include "kudu/gutil/gscoped_ptr.h"
+#include "kudu/gutil/macros.h"
+
+namespace kudu {
+
+class StackTrace;
+
+// A lock built around pthread_mutex_t. Does not allow recursion.
+//
+// The following checks will be performed in DEBUG mode:
+// Acquire(), TryAcquire() - the lock isn't already held.
+// Release() - the lock is already held by this thread.
+//
+class Mutex {
+ public:
+ Mutex();
+ ~Mutex();
+
+ void Acquire();
+ void Release();
+ bool TryAcquire();
+
+ void lock() { Acquire(); }
+ void unlock() { Release(); }
+ bool try_lock() { return TryAcquire(); }
+
+#ifndef NDEBUG
+ void AssertAcquired() const;
+#else
+ void AssertAcquired() const {}
+#endif
+
+ private:
+ friend class ConditionVariable;
+
+ pthread_mutex_t native_handle_;
+
+#ifndef NDEBUG
+ // Members and routines taking care of locks assertions.
+ void CheckHeldAndUnmark();
+ void CheckUnheldAndMark();
+ std::string GetOwnerThreadInfo() const;
+
+ // All private data is implicitly protected by native_handle_.
+ // Be VERY careful to only access members under that lock.
+ pid_t owning_tid_;
+ gscoped_ptr<StackTrace> stack_trace_;
+#endif
+
+ DISALLOW_COPY_AND_ASSIGN(Mutex);
+};
+
+// A helper class that acquires the given Lock while the MutexLock is in scope.
+class MutexLock {
+ public:
+ struct AlreadyAcquired {};
+
+ // Acquires 'lock' (must be unheld) and wraps around it.
+ //
+ // Sample usage:
+ // {
+ // MutexLock l(lock_); // acquired
+ // ...
+ // } // released
+ explicit MutexLock(Mutex& lock)
+ : lock_(&lock),
+ owned_(true) {
+ lock_->Acquire();
+ }
+
+ // Wraps around 'lock' (must already be held by this thread).
+ //
+ // Sample usage:
+ // {
+ // lock_.Acquire(); // acquired
+ // ...
+ // MutexLock l(lock_, AlreadyAcquired());
+ // ...
+ // } // released
+ MutexLock(Mutex& lock, const AlreadyAcquired&)
+ : lock_(&lock),
+ owned_(true) {
+ lock_->AssertAcquired();
+ }
+
+ void Lock() {
+ DCHECK(!owned_);
+ lock_->Acquire();
+ owned_ = true;
+ }
+
+ void Unlock() {
+ DCHECK(owned_);
+ lock_->AssertAcquired();
+ lock_->Release();
+ owned_ = false;
+ }
+
+ ~MutexLock() {
+ if (owned_) {
+ Unlock();
+ }
+ }
+
+ bool OwnsLock() const {
+ return owned_;
+ }
+
+ private:
+ Mutex* lock_;
+ bool owned_;
+ DISALLOW_COPY_AND_ASSIGN(MutexLock);
+};
+
+} // namespace kudu
+#endif /* KUDU_UTIL_MUTEX_H */
http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/net/dns_resolver-test.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/net/dns_resolver-test.cc b/be/src/kudu/util/net/dns_resolver-test.cc
new file mode 100644
index 0000000..f08b089
--- /dev/null
+++ b/be/src/kudu/util/net/dns_resolver-test.cc
@@ -0,0 +1,59 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "kudu/util/net/dns_resolver.h"
+
+#include <ostream>
+#include <string>
+#include <vector>
+
+#include <glog/logging.h>
+#include <gtest/gtest.h>
+
+#include "kudu/gutil/strings/util.h"
+#include "kudu/util/async_util.h"
+#include "kudu/util/net/net_util.h"
+#include "kudu/util/net/sockaddr.h"
+#include "kudu/util/test_macros.h"
+#include "kudu/util/test_util.h"
+
+using std::vector;
+
+namespace kudu {
+
+class DnsResolverTest : public KuduTest {
+ protected:
+ DnsResolver resolver_;
+};
+
+TEST_F(DnsResolverTest, TestResolution) {
+ vector<Sockaddr> addrs;
+ Synchronizer s;
+ {
+ HostPort hp("localhost", 12345);
+ resolver_.ResolveAddresses(hp, &addrs, s.AsStatusCallback());
+ }
+ ASSERT_OK(s.Wait());
+ ASSERT_TRUE(!addrs.empty());
+ for (const Sockaddr& addr : addrs) {
+ LOG(INFO) << "Address: " << addr.ToString();
+ EXPECT_TRUE(HasPrefixString(addr.ToString(), "127."));
+ EXPECT_TRUE(HasSuffixString(addr.ToString(), ":12345"));
+ }
+}
+
+} // namespace kudu
http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/net/dns_resolver.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/net/dns_resolver.cc b/be/src/kudu/util/net/dns_resolver.cc
new file mode 100644
index 0000000..4803688
--- /dev/null
+++ b/be/src/kudu/util/net/dns_resolver.cc
@@ -0,0 +1,65 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "kudu/util/net/dns_resolver.h"
+
+#include <vector>
+
+#include <boost/bind.hpp> // IWYU pragma: keep
+#include <gflags/gflags.h>
+#include <glog/logging.h>
+
+#include "kudu/gutil/callback.h"
+#include "kudu/util/flag_tags.h"
+#include "kudu/util/threadpool.h"
+#include "kudu/util/net/net_util.h"
+#include "kudu/util/status.h"
+
+DEFINE_int32(dns_num_resolver_threads, 1, "The number of threads to use for DNS resolution");
+TAG_FLAG(dns_num_resolver_threads, advanced);
+
+using std::vector;
+
+namespace kudu {
+
+DnsResolver::DnsResolver() {
+ CHECK_OK(ThreadPoolBuilder("dns-resolver")
+ .set_max_threads(FLAGS_dns_num_resolver_threads)
+ .Build(&pool_));
+}
+
+DnsResolver::~DnsResolver() {
+ pool_->Shutdown();
+}
+
+namespace {
+void DoResolution(const HostPort &hostport, vector<Sockaddr>* addresses,
+ const StatusCallback& cb) {
+ cb.Run(hostport.ResolveAddresses(addresses));
+}
+} // anonymous namespace
+
+void DnsResolver::ResolveAddresses(const HostPort& hostport,
+ vector<Sockaddr>* addresses,
+ const StatusCallback& cb) {
+ Status s = pool_->SubmitFunc(boost::bind(&DoResolution, hostport, addresses, cb));
+ if (!s.ok()) {
+ cb.Run(s);
+ }
+}
+
+} // namespace kudu
http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/net/dns_resolver.h
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/net/dns_resolver.h b/be/src/kudu/util/net/dns_resolver.h
new file mode 100644
index 0000000..06dfa48
--- /dev/null
+++ b/be/src/kudu/util/net/dns_resolver.h
@@ -0,0 +1,62 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#ifndef KUDU_UTIL_NET_DNS_RESOLVER_H
+#define KUDU_UTIL_NET_DNS_RESOLVER_H
+
+#include <vector>
+
+#include "kudu/gutil/gscoped_ptr.h"
+#include "kudu/gutil/macros.h"
+#include "kudu/util/status_callback.h"
+
+namespace kudu {
+
+class HostPort;
+class Sockaddr;
+class ThreadPool;
+
+// DNS Resolver which supports async address resolution.
+class DnsResolver {
+ public:
+ DnsResolver();
+ ~DnsResolver();
+
+ // Resolve any addresses corresponding to this host:port pair.
+ // Note that a host may resolve to more than one IP address.
+ //
+ // 'addresses' may be NULL, in which case this function simply checks that
+ // the host/port pair can be resolved, without returning anything.
+ //
+ // When the result is available, or an error occurred, 'cb' is called
+ // with the result Status.
+ //
+ // NOTE: the callback should be fast since it is called by the DNS
+ // resolution thread.
+ // NOTE: in some rare cases, the callback may also be called inline
+ // from this function call, on the caller's thread.
+ void ResolveAddresses(const HostPort& hostport,
+ std::vector<Sockaddr>* addresses,
+ const StatusCallback& cb);
+
+ private:
+ gscoped_ptr<ThreadPool> pool_;
+
+ DISALLOW_COPY_AND_ASSIGN(DnsResolver);
+};
+
+} // namespace kudu
+#endif /* KUDU_UTIL_NET_DNS_RESOLVER_H */
http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/net/net_util-test.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/net/net_util-test.cc b/be/src/kudu/util/net/net_util-test.cc
new file mode 100644
index 0000000..202ec6b
--- /dev/null
+++ b/be/src/kudu/util/net/net_util-test.cc
@@ -0,0 +1,170 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <algorithm>
+#include <cstdint>
+#include <ostream>
+#include <string>
+#include <vector>
+
+#include <glog/logging.h>
+#include <gtest/gtest.h>
+
+#include "kudu/gutil/strings/join.h"
+#include "kudu/gutil/strings/util.h"
+#include "kudu/util/net/net_util.h"
+#include "kudu/util/net/socket.h"
+#include "kudu/util/net/sockaddr.h"
+#include "kudu/util/status.h"
+#include "kudu/util/test_macros.h"
+#include "kudu/util/test_util.h"
+
+using std::string;
+using std::vector;
+
+namespace kudu {
+
+class NetUtilTest : public KuduTest {
+ protected:
+ Status DoParseBindAddresses(const string& input, string* result) {
+ vector<Sockaddr> addrs;
+ RETURN_NOT_OK(ParseAddressList(input, kDefaultPort, &addrs));
+ std::sort(addrs.begin(), addrs.end());
+
+ vector<string> addr_strs;
+ for (const Sockaddr& addr : addrs) {
+ addr_strs.push_back(addr.ToString());
+ }
+ *result = JoinStrings(addr_strs, ",");
+ return Status::OK();
+ }
+
+ static const uint16_t kDefaultPort = 7150;
+};
+
+TEST(SockaddrTest, Test) {
+ Sockaddr addr;
+ ASSERT_OK(addr.ParseString("1.1.1.1:12345", 12345));
+ ASSERT_EQ(12345, addr.port());
+ ASSERT_EQ("1.1.1.1", addr.host());
+}
+
+TEST_F(NetUtilTest, TestParseAddresses) {
+ string ret;
+ ASSERT_OK(DoParseBindAddresses("0.0.0.0:12345", &ret));
+ ASSERT_EQ("0.0.0.0:12345", ret);
+
+ ASSERT_OK(DoParseBindAddresses("0.0.0.0", &ret));
+ ASSERT_EQ("0.0.0.0:7150", ret);
+
+ ASSERT_OK(DoParseBindAddresses("0.0.0.0:12345, 0.0.0.0:12346", &ret));
+ ASSERT_EQ("0.0.0.0:12345,0.0.0.0:12346", ret);
+
+ // Test some invalid addresses.
+ Status s = DoParseBindAddresses("0.0.0.0:xyz", &ret);
+ ASSERT_STR_CONTAINS(s.ToString(), "Invalid port");
+
+ s = DoParseBindAddresses("0.0.0.0:100000", &ret);
+ ASSERT_STR_CONTAINS(s.ToString(), "Invalid port");
+
+ s = DoParseBindAddresses("0.0.0.0:", &ret);
+ ASSERT_STR_CONTAINS(s.ToString(), "Invalid port");
+}
+
+TEST_F(NetUtilTest, TestResolveAddresses) {
+ HostPort hp("localhost", 12345);
+ vector<Sockaddr> addrs;
+ ASSERT_OK(hp.ResolveAddresses(&addrs));
+ ASSERT_TRUE(!addrs.empty());
+ for (const Sockaddr& addr : addrs) {
+ LOG(INFO) << "Address: " << addr.ToString();
+ EXPECT_TRUE(HasPrefixString(addr.ToString(), "127."));
+ EXPECT_TRUE(HasSuffixString(addr.ToString(), ":12345"));
+ EXPECT_TRUE(addr.IsAnyLocalAddress());
+ }
+
+ ASSERT_OK(hp.ResolveAddresses(nullptr));
+}
+
+TEST_F(NetUtilTest, TestWithinNetwork) {
+ Sockaddr addr;
+ Network network;
+
+ ASSERT_OK(addr.ParseString("10.0.23.0:12345", 0));
+ ASSERT_OK(network.ParseCIDRString("10.0.0.0/8"));
+ EXPECT_TRUE(network.WithinNetwork(addr));
+
+ ASSERT_OK(addr.ParseString("172.28.3.4:0", 0));
+ ASSERT_OK(network.ParseCIDRString("172.16.0.0/12"));
+ EXPECT_TRUE(network.WithinNetwork(addr));
+
+ ASSERT_OK(addr.ParseString("192.168.0.23", 0));
+ ASSERT_OK(network.ParseCIDRString("192.168.1.14/16"));
+ EXPECT_TRUE(network.WithinNetwork(addr));
+
+ ASSERT_OK(addr.ParseString("8.8.8.8:0", 0));
+ ASSERT_OK(network.ParseCIDRString("0.0.0.0/0"));
+ EXPECT_TRUE(network.WithinNetwork(addr));
+
+ ASSERT_OK(addr.ParseString("192.169.0.23", 0));
+ ASSERT_OK(network.ParseCIDRString("192.168.0.0/16"));
+ EXPECT_FALSE(network.WithinNetwork(addr));
+}
+
+// Ensure that we are able to do a reverse DNS lookup on various IP addresses.
+// The reverse lookups should never fail, but may return numeric strings.
+TEST_F(NetUtilTest, TestReverseLookup) {
+ string host;
+ Sockaddr addr;
+ HostPort hp;
+ ASSERT_OK(addr.ParseString("0.0.0.0:12345", 0));
+ EXPECT_EQ(12345, addr.port());
+ ASSERT_OK(HostPortFromSockaddrReplaceWildcard(addr, &hp));
+ EXPECT_NE("0.0.0.0", hp.host());
+ EXPECT_NE("", hp.host());
+ EXPECT_EQ(12345, hp.port());
+
+ ASSERT_OK(addr.ParseString("127.0.0.1:12345", 0));
+ ASSERT_OK(HostPortFromSockaddrReplaceWildcard(addr, &hp));
+ EXPECT_EQ("127.0.0.1", hp.host());
+ EXPECT_EQ(12345, hp.port());
+}
+
+TEST_F(NetUtilTest, TestLsof) {
+ Socket s;
+ ASSERT_OK(s.Init(0));
+
+ Sockaddr addr; // wildcard
+ ASSERT_OK(s.BindAndListen(addr, 1));
+
+ ASSERT_OK(s.GetSocketAddress(&addr));
+ ASSERT_NE(addr.port(), 0);
+ vector<string> lsof_lines;
+ TryRunLsof(addr, &lsof_lines);
+ SCOPED_TRACE(JoinStrings(lsof_lines, "\n"));
+
+ ASSERT_GE(lsof_lines.size(), 3);
+ ASSERT_STR_CONTAINS(lsof_lines[2], "net_util-test");
+}
+
+TEST_F(NetUtilTest, TestGetFQDN) {
+ string fqdn;
+ ASSERT_OK(GetFQDN(&fqdn));
+ LOG(INFO) << "fqdn is " << fqdn;
+}
+
+} // namespace kudu
http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/net/net_util.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/net/net_util.cc b/be/src/kudu/util/net/net_util.cc
new file mode 100644
index 0000000..520882f
--- /dev/null
+++ b/be/src/kudu/util/net/net_util.cc
@@ -0,0 +1,402 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <sys/socket.h>
+#include <ifaddrs.h>
+#include <limits.h>
+#include <netdb.h>
+#include <netinet/in.h>
+#include <unistd.h>
+
+#include <cerrno>
+#include <cstring>
+#include <functional>
+#include <memory>
+#include <ostream>
+#include <unordered_set>
+#include <utility>
+#include <vector>
+
+#include <boost/functional/hash/hash.hpp>
+#include <gflags/gflags.h>
+#include <glog/logging.h>
+
+#include "kudu/gutil/endian.h"
+#include "kudu/gutil/macros.h"
+#include "kudu/gutil/map-util.h"
+#include "kudu/gutil/port.h"
+#include "kudu/gutil/strings/join.h"
+#include "kudu/gutil/strings/numbers.h"
+#include "kudu/gutil/strings/split.h"
+#include "kudu/gutil/strings/strip.h"
+#include "kudu/gutil/strings/substitute.h"
+#include "kudu/gutil/strings/util.h"
+#include "kudu/util/debug/trace_event.h"
+#include "kudu/util/errno.h"
+#include "kudu/util/flag_tags.h"
+#include "kudu/util/net/net_util.h"
+#include "kudu/util/net/sockaddr.h"
+#include "kudu/util/scoped_cleanup.h"
+#include "kudu/util/stopwatch.h"
+#include "kudu/util/subprocess.h"
+#include "kudu/util/trace.h"
+
+// Mac OS 10.9 does not appear to define HOST_NAME_MAX in unistd.h
+#ifndef HOST_NAME_MAX
+#define HOST_NAME_MAX 64
+#endif
+
+DEFINE_bool(fail_dns_resolution, false, "Whether to fail all dns resolution, for tests.");
+TAG_FLAG(fail_dns_resolution, hidden);
+
+using std::function;
+using std::string;
+using std::unordered_set;
+using std::unique_ptr;
+using std::vector;
+using strings::Substitute;
+
+namespace kudu {
+
+namespace {
+
+using AddrInfo = unique_ptr<addrinfo, function<void(addrinfo*)>>;
+
+// An utility wrapper around getaddrinfo() call to convert the return code
+// of the libc library function into Status.
+Status GetAddrInfo(const string& hostname,
+ const addrinfo& hints,
+ const string& op_description,
+ AddrInfo* info) {
+ addrinfo* res = nullptr;
+ const int rc = getaddrinfo(hostname.c_str(), nullptr, &hints, &res);
+ const int err = errno; // preserving the errno from the getaddrinfo() call
+ AddrInfo result(res, ::freeaddrinfo);
+ if (rc == 0) {
+ if (info != nullptr) {
+ info->swap(result);
+ }
+ return Status::OK();
+ }
+ const string err_msg = Substitute("unable to $0", op_description);
+ if (rc == EAI_SYSTEM) {
+ return Status::NetworkError(err_msg, ErrnoToString(err), err);
+ }
+ return Status::NetworkError(err_msg, gai_strerror(rc));
+}
+
+} // anonymous namespace
+
+HostPort::HostPort()
+ : host_(""),
+ port_(0) {
+}
+
+HostPort::HostPort(std::string host, uint16_t port)
+ : host_(std::move(host)), port_(port) {}
+
+HostPort::HostPort(const Sockaddr& addr)
+ : host_(addr.host()),
+ port_(addr.port()) {
+}
+
+bool operator==(const HostPort& hp1, const HostPort& hp2) {
+ return hp1.port() == hp2.port() && hp1.host() == hp2.host();
+}
+
+size_t HostPort::HashCode() const {
+ size_t seed = 0;
+ boost::hash_combine(seed, host_);
+ boost::hash_combine(seed, port_);
+ return seed;
+}
+
+Status HostPort::ParseString(const string& str, uint16_t default_port) {
+ std::pair<string, string> p = strings::Split(str, strings::delimiter::Limit(":", 1));
+
+ // Strip any whitespace from the host.
+ StripWhiteSpace(&p.first);
+
+ // Parse the port.
+ uint32_t port;
+ if (p.second.empty() && strcount(str, ':') == 0) {
+ // No port specified.
+ port = default_port;
+ } else if (!SimpleAtoi(p.second, &port) ||
+ port > 65535) {
+ return Status::InvalidArgument("Invalid port", str);
+ }
+
+ host_.swap(p.first);
+ port_ = port;
+ return Status::OK();
+}
+
+Status HostPort::ResolveAddresses(vector<Sockaddr>* addresses) const {
+ TRACE_EVENT1("net", "HostPort::ResolveAddresses",
+ "host", host_);
+ TRACE_COUNTER_SCOPE_LATENCY_US("dns_us");
+ struct addrinfo hints;
+ memset(&hints, 0, sizeof(hints));
+ hints.ai_family = AF_INET;
+ hints.ai_socktype = SOCK_STREAM;
+ AddrInfo result;
+ const string op_description = Substitute("resolve address for $0", host_);
+ LOG_SLOW_EXECUTION(WARNING, 200, op_description) {
+ RETURN_NOT_OK(GetAddrInfo(host_, hints, op_description, &result));
+ }
+ for (const addrinfo* ai = result.get(); ai != nullptr; ai = ai->ai_next) {
+ CHECK_EQ(ai->ai_family, AF_INET);
+ struct sockaddr_in* addr = reinterpret_cast<struct sockaddr_in*>(ai->ai_addr);
+ addr->sin_port = htons(port_);
+ Sockaddr sockaddr(*addr);
+ if (addresses) {
+ addresses->push_back(sockaddr);
+ }
+ VLOG(2) << "Resolved address " << sockaddr.ToString()
+ << " for host/port " << ToString();
+ }
+ if (PREDICT_FALSE(FLAGS_fail_dns_resolution)) {
+ return Status::NetworkError("injected DNS resolution failure");
+ }
+ return Status::OK();
+}
+
+Status HostPort::ParseStrings(const string& comma_sep_addrs,
+ uint16_t default_port,
+ vector<HostPort>* res) {
+ vector<string> addr_strings = strings::Split(comma_sep_addrs, ",", strings::SkipEmpty());
+ for (const string& addr_string : addr_strings) {
+ HostPort host_port;
+ RETURN_NOT_OK(host_port.ParseString(addr_string, default_port));
+ res->push_back(host_port);
+ }
+ return Status::OK();
+}
+
+string HostPort::ToString() const {
+ return Substitute("$0:$1", host_, port_);
+}
+
+string HostPort::ToCommaSeparatedString(const vector<HostPort>& hostports) {
+ vector<string> hostport_strs;
+ for (const HostPort& hostport : hostports) {
+ hostport_strs.push_back(hostport.ToString());
+ }
+ return JoinStrings(hostport_strs, ",");
+}
+
+Network::Network()
+ : addr_(0),
+ netmask_(0) {
+}
+
+Network::Network(uint32_t addr, uint32_t netmask)
+ : addr_(addr), netmask_(netmask) {}
+
+bool Network::WithinNetwork(const Sockaddr& addr) const {
+ return ((addr.addr().sin_addr.s_addr & netmask_) ==
+ (addr_ & netmask_));
+}
+
+Status Network::ParseCIDRString(const string& addr) {
+ std::pair<string, string> p = strings::Split(addr, strings::delimiter::Limit("/", 1));
+
+ kudu::Sockaddr sockaddr;
+ Status s = sockaddr.ParseString(p.first, 0);
+
+ uint32_t bits;
+ bool success = SimpleAtoi(p.second, &bits);
+
+ if (!s.ok() || !success || bits > 32) {
+ return Status::NetworkError("Unable to parse CIDR address", addr);
+ }
+
+ // Netmask in network byte order
+ uint32_t netmask = NetworkByteOrder::FromHost32(~(0xffffffff >> bits));
+ addr_ = sockaddr.addr().sin_addr.s_addr;
+ netmask_ = netmask;
+ return Status::OK();
+}
+
+Status Network::ParseCIDRStrings(const string& comma_sep_addrs,
+ vector<Network>* res) {
+ vector<string> addr_strings = strings::Split(comma_sep_addrs, ",", strings::SkipEmpty());
+ for (const string& addr_string : addr_strings) {
+ Network network;
+ RETURN_NOT_OK(network.ParseCIDRString(addr_string));
+ res->push_back(network);
+ }
+ return Status::OK();
+}
+
+bool IsPrivilegedPort(uint16_t port) {
+ return port <= 1024 && port != 0;
+}
+
+Status ParseAddressList(const std::string& addr_list,
+ uint16_t default_port,
+ std::vector<Sockaddr>* addresses) {
+ vector<HostPort> host_ports;
+ RETURN_NOT_OK(HostPort::ParseStrings(addr_list, default_port, &host_ports));
+ if (host_ports.empty()) return Status::InvalidArgument("No address specified");
+ unordered_set<Sockaddr> uniqued;
+ for (const HostPort& host_port : host_ports) {
+ vector<Sockaddr> this_addresses;
+ RETURN_NOT_OK(host_port.ResolveAddresses(&this_addresses));
+
+ // Only add the unique ones -- the user may have specified
+ // some IP addresses in multiple ways
+ for (const Sockaddr& addr : this_addresses) {
+ if (InsertIfNotPresent(&uniqued, addr)) {
+ addresses->push_back(addr);
+ } else {
+ LOG(INFO) << "Address " << addr.ToString() << " for " << host_port.ToString()
+ << " duplicates an earlier resolved entry.";
+ }
+ }
+ }
+ return Status::OK();
+}
+
+Status GetHostname(string* hostname) {
+ TRACE_EVENT0("net", "GetHostname");
+ char name[HOST_NAME_MAX];
+ int ret = gethostname(name, HOST_NAME_MAX);
+ if (ret != 0) {
+ return Status::NetworkError("Unable to determine local hostname",
+ ErrnoToString(errno),
+ errno);
+ }
+ *hostname = name;
+ return Status::OK();
+}
+
+Status GetLocalNetworks(std::vector<Network>* net) {
+ struct ifaddrs *ifap = nullptr;
+
+ int ret = getifaddrs(&ifap);
+ SCOPED_CLEANUP({
+ if (ifap) freeifaddrs(ifap);
+ });
+
+ if (ret != 0) {
+ return Status::NetworkError("Unable to determine local network addresses",
+ ErrnoToString(errno),
+ errno);
+ }
+
+ net->clear();
+ for (struct ifaddrs *ifa = ifap; ifa; ifa = ifa->ifa_next) {
+ if (ifa->ifa_addr == nullptr || ifa->ifa_netmask == nullptr) continue;
+
+ if (ifa->ifa_addr->sa_family == AF_INET) {
+ Sockaddr addr(*reinterpret_cast<struct sockaddr_in*>(ifa->ifa_addr));
+ Sockaddr netmask(*reinterpret_cast<struct sockaddr_in*>(ifa->ifa_netmask));
+ Network network(addr.addr().sin_addr.s_addr, netmask.addr().sin_addr.s_addr);
+ net->push_back(network);
+ }
+ }
+
+ return Status::OK();
+}
+
+Status GetFQDN(string* hostname) {
+ TRACE_EVENT0("net", "GetFQDN");
+ // Start with the non-qualified hostname
+ RETURN_NOT_OK(GetHostname(hostname));
+
+ struct addrinfo hints;
+ memset(&hints, 0, sizeof(hints));
+ hints.ai_socktype = SOCK_DGRAM;
+ hints.ai_flags = AI_CANONNAME;
+ AddrInfo result;
+ const string op_description =
+ Substitute("look up canonical hostname for localhost '$0'", *hostname);
+ LOG_SLOW_EXECUTION(WARNING, 200, op_description) {
+ TRACE_EVENT0("net", "getaddrinfo");
+ RETURN_NOT_OK(GetAddrInfo(*hostname, hints, op_description, &result));
+ }
+
+ *hostname = result->ai_canonname;
+ return Status::OK();
+}
+
+Status SockaddrFromHostPort(const HostPort& host_port, Sockaddr* addr) {
+ vector<Sockaddr> addrs;
+ RETURN_NOT_OK(host_port.ResolveAddresses(&addrs));
+ if (addrs.empty()) {
+ return Status::NetworkError("Unable to resolve address", host_port.ToString());
+ }
+ *addr = addrs[0];
+ if (addrs.size() > 1) {
+ VLOG(1) << "Hostname " << host_port.host() << " resolved to more than one address. "
+ << "Using address: " << addr->ToString();
+ }
+ return Status::OK();
+}
+
+Status HostPortFromSockaddrReplaceWildcard(const Sockaddr& addr, HostPort* hp) {
+ string host;
+ if (addr.IsWildcard()) {
+ RETURN_NOT_OK(GetFQDN(&host));
+ } else {
+ host = addr.host();
+ }
+ hp->set_host(host);
+ hp->set_port(addr.port());
+ return Status::OK();
+}
+
+void TryRunLsof(const Sockaddr& addr, vector<string>* log) {
+#if defined(__APPLE__)
+ string cmd = strings::Substitute(
+ "lsof -n -i 'TCP:$0' -sTCP:LISTEN ; "
+ "for pid in $$(lsof -F p -n -i 'TCP:$0' -sTCP:LISTEN | cut -f 2 -dp) ; do"
+ " pstree $$pid || ps h -p $$pid;"
+ "done",
+ addr.port());
+#else
+ // Little inline bash script prints the full ancestry of any pid listening
+ // on the same port as 'addr'. We could use 'pstree -s', but that option
+ // doesn't exist on el6.
+ string cmd = strings::Substitute(
+ "export PATH=$$PATH:/usr/sbin ; "
+ "lsof -n -i 'TCP:$0' -sTCP:LISTEN ; "
+ "for pid in $$(lsof -F p -n -i 'TCP:$0' -sTCP:LISTEN | grep p | cut -f 2 -dp) ; do"
+ " while [ $$pid -gt 1 ] ; do"
+ " ps h -fp $$pid ;"
+ " stat=($$(</proc/$$pid/stat)) ;"
+ " pid=$${stat[3]} ;"
+ " done ; "
+ "done",
+ addr.port());
+#endif // defined(__APPLE__)
+ LOG_STRING(WARNING, log)
+ << "Trying to use lsof to find any processes listening on "
+ << addr.ToString();
+ LOG_STRING(INFO, log) << "$ " << cmd;
+ vector<string> argv = { "bash", "-c", cmd };
+ string results;
+ Status s = Subprocess::Call(argv, "", &results);
+ if (PREDICT_FALSE(!s.ok())) {
+ LOG_STRING(WARNING, log) << s.ToString();
+ }
+ LOG_STRING(WARNING, log) << results;
+}
+
+} // namespace kudu
http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/net/net_util.h
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/net/net_util.h b/be/src/kudu/util/net/net_util.h
new file mode 100644
index 0000000..c471ae8
--- /dev/null
+++ b/be/src/kudu/util/net/net_util.h
@@ -0,0 +1,166 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#ifndef KUDU_UTIL_NET_NET_UTIL_H
+#define KUDU_UTIL_NET_NET_UTIL_H
+
+#include <cstddef>
+#include <cstdint>
+#include <string>
+#include <vector>
+
+#include "kudu/util/status.h"
+
+namespace kudu {
+
+class Sockaddr;
+
+// A container for a host:port pair.
+class HostPort {
+ public:
+ HostPort();
+ HostPort(std::string host, uint16_t port);
+ explicit HostPort(const Sockaddr& addr);
+
+ bool Initialized() const {
+ return !host_.empty();
+ }
+
+ // Parse a "host:port" pair into this object.
+ // If there is no port specified in the string, then 'default_port' is used.
+ Status ParseString(const std::string& str, uint16_t default_port);
+
+ // Resolve any addresses corresponding to this host:port pair.
+ // Note that a host may resolve to more than one IP address.
+ //
+ // 'addresses' may be NULL, in which case this function simply checks that
+ // the host/port pair can be resolved, without returning anything.
+ Status ResolveAddresses(std::vector<Sockaddr>* addresses) const;
+
+ std::string ToString() const;
+
+ const std::string& host() const { return host_; }
+ void set_host(const std::string& host) { host_ = host; }
+
+ uint16_t port() const { return port_; }
+ void set_port(uint16_t port) { port_ = port; }
+
+ size_t HashCode() const;
+
+ // Parse a comma separated list of "host:port" pairs into a vector
+ // HostPort objects. If no port is specified for an entry in the
+ // comma separated list, 'default_port' is used for that entry's
+ // pair.
+ static Status ParseStrings(
+ const std::string& comma_sep_addrs, uint16_t default_port, std::vector<HostPort>* res);
+
+ // Takes a vector of HostPort objects and returns a comma separated
+ // string containing of "host:port" pairs. This method is the
+ // "inverse" of ParseStrings().
+ static std::string ToCommaSeparatedString(const std::vector<HostPort>& host_ports);
+
+ private:
+ std::string host_;
+ uint16_t port_;
+};
+
+bool operator==(const HostPort& hp1, const HostPort& hp2);
+
+// Hasher of HostPort objects for UnorderedAssociativeContainers.
+struct HostPortHasher {
+ size_t operator()(const HostPort& hp) const {
+ return hp.HashCode();
+ }
+};
+
+// Equality BinaryPredicate of HostPort objects for UnorderedAssociativeContainers.
+struct HostPortEqualityPredicate {
+ bool operator()(const HostPort& hp1, const HostPort& hp2) const {
+ return hp1 == hp2;
+ }
+};
+
+// A container for addr:mask pair.
+// Both addr and netmask are in big-endian byte order
+// (same as network byte order).
+class Network {
+ public:
+ Network();
+ Network(uint32_t addr, uint32_t netmask);
+
+ uint32_t addr() const { return addr_; }
+
+ uint32_t netmask() const { return netmask_; }
+
+ // Returns true if the address is within network.
+ bool WithinNetwork(const Sockaddr& addr) const;
+
+ // Parses a "addr/netmask" (CIDR notation) pair into this object.
+ Status ParseCIDRString(const std::string& addr);
+
+ // Parses a comma separated list of "addr/netmask" (CIDR notation)
+ // pairs into a vector of Network objects.
+ static Status ParseCIDRStrings(
+ const std::string& comma_sep_addrs, std::vector<Network>* res);
+ private:
+ uint32_t addr_;
+ uint32_t netmask_;
+};
+
+// Parse and resolve the given comma-separated list of addresses.
+//
+// The resulting addresses will be resolved, made unique, and added to
+// the 'addresses' vector.
+//
+// Any elements which do not include a port will be assigned 'default_port'.
+Status ParseAddressList(const std::string& addr_list,
+ uint16_t default_port,
+ std::vector<Sockaddr>* addresses);
+
+// Return true if the given port is likely to need root privileges to bind to.
+bool IsPrivilegedPort(uint16_t port);
+
+// Return the local machine's hostname.
+Status GetHostname(std::string* hostname);
+
+// Returns local subnets of all local network interfaces.
+Status GetLocalNetworks(std::vector<Network>* net);
+
+// Return the local machine's FQDN.
+Status GetFQDN(std::string* hostname);
+
+// Returns a single socket address from a HostPort.
+// If the hostname resolves to multiple addresses, returns the first in the
+// list and logs a message in verbose mode.
+Status SockaddrFromHostPort(const HostPort& host_port, Sockaddr* addr);
+
+// Converts the given Sockaddr into a HostPort, substituting the FQDN
+// in the case that the provided address is the wildcard.
+//
+// In the case of other addresses, the returned HostPort will contain just the
+// stringified form of the IP.
+Status HostPortFromSockaddrReplaceWildcard(const Sockaddr& addr, HostPort* hp);
+
+// Try to run 'lsof' to determine which process is preventing binding to
+// the given 'addr'. If pids can be determined, outputs full 'ps' and 'pstree'
+// output for that process.
+//
+// Output is issued to the log at WARNING level, or appended to 'log' if it
+// is non-NULL (mostly useful for testing).
+void TryRunLsof(const Sockaddr& addr, std::vector<std::string>* log = NULL);
+
+} // namespace kudu
+#endif