You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by ta...@apache.org on 2018/07/13 06:03:19 UTC

[07/51] [abbrv] impala git commit: IMPALA-7006: Add KRPC folders from kudu@334ecafd

http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/rwc_lock.h
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/rwc_lock.h b/be/src/kudu/util/rwc_lock.h
new file mode 100644
index 0000000..7b78e35
--- /dev/null
+++ b/be/src/kudu/util/rwc_lock.h
@@ -0,0 +1,142 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#ifndef KUDU_UTIL_RWC_LOCK_H
+#define KUDU_UTIL_RWC_LOCK_H
+
+#include <cstdint>
+
+#include "kudu/gutil/macros.h"
+#include "kudu/util/condition_variable.h"
+#include "kudu/util/mutex.h"
+
+namespace kudu {
+
+// A read-write-commit lock.
+//
+// This lock has three modes: read, write, and commit.
+// The lock compatibility matrix is as follows:
+//
+//           Read    Write    Commit
+//  Read      X        X
+//  Write     X
+//  Commit
+//
+// An 'X' indicates that the two types of locks may be
+// held at the same time.
+//
+// In prose:
+// - Multiple threads may hold the Read lock at the same time.
+// - A single thread may hold the Write lock, potentially at the
+//   same time as any number of readers.
+// - A single thread may hold the Commit lock, but this lock is completely
+//   exclusive (no concurrent readers or writers).
+//
+// A typical use case for this type of lock is when a structure is read often,
+// occasionally updated, and the update operation can take a long time. In this
+// use case, the readers simply use ReadLock() and ReadUnlock(), while the
+// writer uses a copy-on-write technique like:
+//
+//   obj->lock.WriteLock();
+//   // NOTE: cannot safely mutate obj->state directly here, since readers
+//   // may be concurrent! So, we make a local copy to mutate.
+//   my_local_copy = obj->state;
+//   SomeLengthyMutation(my_local_copy);
+//   obj->lock.UpgradeToCommitLock();
+//   obj->state = my_local_copy;
+//   obj->lock.CommitUnlock();
+//
+// This is more efficient than a standard Reader-Writer lock since the lengthy
+// mutation is only protected against other concurrent mutators, and readers
+// may continue to run with no contention.
+//
+// For the common pattern described above, the 'CowObject<>' template class defined
+// in cow_object.h is more convenient than manual locking.
+//
+// NOTE: this implementation currently does not implement any starvation protection
+// or fairness. If the read lock is being constantly acquired (i.e. the reader count
+// never drops to 0) then UpgradeToCommitLock() may block arbitrarily long.
+class RWCLock {
+ public:
+  RWCLock();
+  ~RWCLock();
+
+  // Acquire the lock in read mode. Upon return, guarantees that:
+  // - Other threads may concurrently hold the lock for Read.
+  // - Either zero or one thread may hold the lock for Write.
+  // - No threads hold the lock for Commit.
+  void ReadLock();
+  // Release a read lock previously acquired with ReadLock().
+  void ReadUnlock();
+
+  // Return true if there are any readers currently holding the lock.
+  // Useful for debug assertions.
+  bool HasReaders() const;
+
+  // Return true if the current thread holds the write lock.
+  //
+  // In DEBUG mode this is accurate -- we track the current holder's tid.
+  // In non-DEBUG mode, this may sometimes return true even if another thread
+  // is in fact the holder.
+  // Thus, this is only really useful in the context of a DCHECK assertion.
+  bool HasWriteLock() const;
+
+  // Boost-like wrappers, so boost lock guards work.
+  // The shared-lock interface maps onto Read mode.
+  void lock_shared() { ReadLock(); }
+  void unlock_shared() { ReadUnlock(); }
+
+  // Acquire the lock in write mode. Upon return, guarantees that:
+  // - Other threads may concurrently hold the lock for Read.
+  // - No other threads hold the lock for Write or Commit.
+  void WriteLock();
+  // Release the lock held in Write mode (acquired with WriteLock()).
+  void WriteUnlock();
+
+  // Boost-like wrappers.
+  // The exclusive-lock interface maps onto Write mode, so a guard built on
+  // lock()/unlock() excludes other writers but NOT concurrent readers.
+  void lock() { WriteLock(); }
+  void unlock() { WriteUnlock(); }
+
+  // Upgrade the lock from Write mode to Commit mode.
+  // Requires that the current thread holds the lock in Write mode.
+  // Upon return, guarantees:
+  // - No other thread holds the lock in any mode.
+  void UpgradeToCommitLock();
+  // Release the lock after UpgradeToCommitLock(). In the usage pattern shown
+  // in the class comment this is the final call; no WriteUnlock() follows it.
+  void CommitUnlock();
+
+ private:
+  // Variants of the functions above that must be called with lock_ held.
+  bool HasReadersUnlocked() const;
+  bool HasWriteLockUnlocked() const;
+
+  // Lock which protects reader_count_ and write_locked_.
+  // Additionally, while the commit lock is held, the
+  // locking thread holds this mutex, which prevents any new
+  // threads from obtaining the lock in any mode.
+  mutable Mutex lock_;
+  // Condition variables used to block waiting threads. Judging by the names,
+  // presumably signalled when no writer/committer remains and when the reader
+  // count reaches zero, respectively -- TODO confirm against the implementation.
+  ConditionVariable no_mutators_, no_readers_;
+  // Reader count (see the starvation note in the class comment); protected by lock_.
+  int reader_count_;
+  // Presumably true while some thread holds the lock in Write mode -- confirm
+  // against the implementation. Protected by lock_.
+  bool write_locked_;
+
+#ifndef NDEBUG
+  // Debug-only bookkeeping about the writer. Tracking the holder's tid is what
+  // makes HasWriteLock() accurate in DEBUG builds; the acquire time and
+  // backtrace buffer are presumably kept for diagnostics -- confirm in the
+  // implementation.
+  static const int kBacktraceBufSize = 1024;
+  int64_t writer_tid_;
+  int64_t last_writelock_acquire_time_;
+  char last_writer_backtrace_[kBacktraceBufSize];
+#endif // NDEBUG
+
+  DISALLOW_COPY_AND_ASSIGN(RWCLock);
+};
+
+} // namespace kudu
+#endif /* KUDU_UTIL_RWC_LOCK_H */

http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/safe_math-test.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/safe_math-test.cc b/be/src/kudu/util/safe_math-test.cc
new file mode 100644
index 0000000..d3a81c6
--- /dev/null
+++ b/be/src/kudu/util/safe_math-test.cc
@@ -0,0 +1,56 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <stdint.h>
+
+// Must come before gtest.h.
+#include "kudu/gutil/mathlimits.h"
+
+#include <gtest/gtest.h>
+#include "kudu/util/safe_math.h"
+
+namespace kudu {
+// Adds 'a' and 'b' with AddWithOverflowCheck() and expects the reported
+// overflow flag to equal 'expected'. When no overflow is reported, the
+// returned value is additionally expected to equal the plain sum 'a + b'.
+template<typename T>
+static void DoTest(T a, T b, bool expected) {
+  SCOPED_TRACE(a);
+  SCOPED_TRACE(b);
+  bool overflow = false;
+  const T ret = AddWithOverflowCheck(a, b, &overflow);
+  EXPECT_EQ(overflow, expected);
+  if (overflow) {
+    // The returned value is not checked once overflow has been reported.
+    return;
+  }
+  EXPECT_EQ(ret, a + b);
+}
+
+// Exercises both ends of the int32_t range: sums that cross a limit must be
+// flagged, sums that land exactly on or inside the limit must not.
+TEST(TestSafeMath, TestSignedInts) {
+  const int32_t kTenBelowMax = MathLimits<int32_t>::kMax - 10;
+  const int32_t kTenAboveMin = MathLimits<int32_t>::kMin + 10;
+
+  // Overflow above max of range.
+  DoTest<int32_t>(kTenBelowMax, 15, true);
+  DoTest<int32_t>(kTenBelowMax, 10, false);
+
+  // Underflow below min of range.
+  DoTest<int32_t>(kTenAboveMin, -15, true);
+  DoTest<int32_t>(kTenAboveMin, -5, false);
+}
+
+// Exercises the top of the uint32_t range: a sum past the maximum must be
+// flagged, a sum landing exactly on the maximum must not.
+TEST(TestSafeMath, TestUnsignedInts) {
+  const uint32_t kTenBelowMax = MathLimits<uint32_t>::kMax - 10;
+
+  // Overflow above max
+  DoTest<uint32_t>(kTenBelowMax, 15, true);
+  DoTest<uint32_t>(kTenBelowMax, 10, false);
+}
+
+} // namespace kudu

http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/safe_math.h
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/safe_math.h b/be/src/kudu/util/safe_math.h
new file mode 100644
index 0000000..4c126dd
--- /dev/null
+++ b/be/src/kudu/util/safe_math.h
@@ -0,0 +1,69 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+// Inline functions for doing overflow-safe operations on integers.
+// These should be used when doing bounds checks on user-provided data,
+// for example.
+// See also: https://www.securecoding.cert.org/confluence/display/cplusplus/INT32-CPP.+Ensure+that+operations+on+signed+integers+do+not+result+in+overflow
+#ifndef KUDU_UTIL_SAFE_MATH_H
+#define KUDU_UTIL_SAFE_MATH_H
+
+#include "kudu/gutil/mathlimits.h"
+
+namespace kudu {
+
+namespace safe_math_internal {
+
+// Template which is specialized for signed and unsigned types separately.
+//
+// 'is_signed' selects the specialization. The primary template is left empty:
+// only the <Type, true> and <Type, false> specializations below provide the
+// static Add() function.
+template<typename Type, bool is_signed>
+struct WithOverflowCheck {
+};
+
+
+// Specialization for signed types.
+template<typename Type>
+struct WithOverflowCheck<Type, true> {
+  // Returns the sum of 'a' and 'b' and sets *overflowed to true if the
+  // mathematical sum lies outside [MathLimits<Type>::kMin, MathLimits<Type>::kMax],
+  // or to false otherwise. On overflow the returned value is the sum wrapped
+  // around (two's complement), which callers should not rely on.
+  static inline Type Add(Type a, Type b, bool *overflowed) {
+    // Implementation from the CERT article referenced in the file header.
+    *overflowed = (((a > 0) && (b > 0) && (a > (MathLimits<Type>::kMax - b))) ||
+                   ((a < 0) && (b < 0) && (a < (MathLimits<Type>::kMin - b))));
+    // Evaluating 'a + b' directly would be undefined behavior in exactly the
+    // case this function exists to detect. Add in unsigned arithmetic, where
+    // wraparound is well defined, and convert back. The result equals 'a + b'
+    // whenever no overflow occurred. Assumes Type is no wider than
+    // 'unsigned long long'.
+    return static_cast<Type>(static_cast<unsigned long long>(a) +
+                             static_cast<unsigned long long>(b));
+  }
+};
+
+// Specialization for unsigned types.
+template<typename Type>
+struct WithOverflowCheck<Type, false> {
+  // Returns the (possibly wrapped) sum of 'a' and 'b'. Unsigned addition wraps
+  // around, so the sum overflowed exactly when it ends up smaller than 'a'.
+  static inline Type Add(Type a, Type b, bool *overflowed) {
+    const Type sum = a + b;
+    *overflowed = sum < a;
+    return sum;
+  }
+};
+
+} // namespace safe_math_internal
+
+// Returns 'a' + 'b'. *overflowed is set to true if the addition overflowed
+// the range of Type, and to false otherwise.
+template<typename Type>
+inline Type AddWithOverflowCheck(Type a, Type b, bool *overflowed) {
+  // MathLimits<Type>::kIsSigned selects the signed or unsigned implementation.
+  typedef safe_math_internal::WithOverflowCheck<Type, MathLimits<Type>::kIsSigned> Impl;
+  return Impl::Add(a, b, overflowed);
+}
+
+} // namespace kudu
+#endif

http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/scoped_cleanup-test.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/scoped_cleanup-test.cc b/be/src/kudu/util/scoped_cleanup-test.cc
new file mode 100644
index 0000000..2e77705
--- /dev/null
+++ b/be/src/kudu/util/scoped_cleanup-test.cc
@@ -0,0 +1,56 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "kudu/util/scoped_cleanup.h"
+
+#include <gtest/gtest.h>
+
+namespace kudu {
+
+// The cleanup function must run when the guard leaves scope, undoing the
+// modification made to 'var' inside the inner block.
+TEST(ScopedCleanup, TestCleanup) {
+  int var = 0;
+  {
+    const int original = var;
+    auto guard = MakeScopedCleanup([&] () { var = original; });
+    var = 42;
+  }
+  ASSERT_EQ(0, var);
+}
+
+// Same scenario as TestCleanup, but the guard is created through the
+// SCOPED_CLEANUP() macro.
+TEST(ScopedCleanup, TestCleanupMacro) {
+  int var = 0;
+  {
+    const int original = var;
+    SCOPED_CLEANUP({ var = original; });
+    var = 42;
+  }
+  ASSERT_EQ(0, var);
+}
+
+
+// A cancelled guard must not run its cleanup function, so the value assigned
+// inside the inner block survives.
+TEST(ScopedCleanup, TestCancelCleanup) {
+  int var = 0;
+  {
+    const int original = var;
+    auto guard = MakeScopedCleanup([&] () { var = original; });
+    var = 42;
+    guard.cancel();
+  }
+  ASSERT_EQ(42, var);
+}
+
+} // namespace kudu

http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/scoped_cleanup.h
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/scoped_cleanup.h b/be/src/kudu/util/scoped_cleanup.h
new file mode 100644
index 0000000..8ecfbcb
--- /dev/null
+++ b/be/src/kudu/util/scoped_cleanup.h
@@ -0,0 +1,67 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <utility>
+
+#include "kudu/gutil/macros.h"
+
+// Run the given function body (which is typically a block of code surrounded by
+// curly-braces) when the current scope exits.
+//
+// Example:
+//   int fd = open(...);
+//   SCOPED_CLEANUP({ close(fd); });
+//
+// The body is wrapped in a lambda that captures by reference ([&]), so it
+// observes local variables as they are when the scope exits, not as they were
+// when the macro was evaluated.
+//
+// The expansion calls MakeScopedCleanup() without namespace qualification, so
+// the macro must be used where kudu::MakeScopedCleanup is visible to
+// unqualified lookup (e.g. inside namespace kudu).
+//
+// NOTE: in the case that you want to cancel the cleanup, use the more verbose
+// (non-macro) form below.
+#define SCOPED_CLEANUP(func_body) \
+  auto VARNAME_LINENUM(scoped_cleanup) = MakeScopedCleanup([&] { func_body });
+
+namespace kudu {
+
+// A scoped object which runs a cleanup function when going out of scope. Can
+// be used for scoped resource cleanup.
+//
+// The object is movable but not copyable, so that exactly one live instance
+// is ever responsible for running the cleanup function.
+//
+// Use 'MakeScopedCleanup()' below to instantiate.
+template<typename F>
+class ScopedCleanup {
+ public:
+  // Takes ownership of 'f', which is invoked with no arguments on destruction
+  // unless cancel() has been called.
+  explicit ScopedCleanup(F f)
+      : cancelled_(false),
+        f_(std::move(f)) {
+  }
+
+  // Transfers responsibility for the cleanup to the new object. The source is
+  // cancelled so the function cannot run twice, e.g. when the move performed
+  // on return from a factory function is not elided by the compiler.
+  // Declaring this constructor also disables implicit copying.
+  ScopedCleanup(ScopedCleanup&& other)
+      : cancelled_(other.cancelled_),
+        f_(std::move(other.f_)) {
+    other.cancelled_ = true;
+  }
+
+  // Runs the cleanup function unless it was cancelled.
+  ~ScopedCleanup() {
+    if (!cancelled_) {
+      f_();
+    }
+  }
+
+  // Prevents the cleanup function from running at destruction.
+  void cancel() { cancelled_ = true; }
+
+ private:
+  // True if the destructor must not invoke f_.
+  bool cancelled_;
+  // The cleanup function.
+  F f_;
+};
+
+// Creates a new scoped cleanup instance with the provided function.
+//
+// 'f' is taken by value and moved into the returned guard, which avoids an
+// extra copy of the callable (and of anything it captured by value).
+template<typename F>
+ScopedCleanup<F> MakeScopedCleanup(F f) {
+  return ScopedCleanup<F>(std::move(f));
+}
+
+} // namespace kudu

http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/semaphore.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/semaphore.cc b/be/src/kudu/util/semaphore.cc
new file mode 100644
index 0000000..72ff214
--- /dev/null
+++ b/be/src/kudu/util/semaphore.cc
@@ -0,0 +1,105 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "kudu/util/semaphore.h"
+
+#include <semaphore.h>
+
+#include <cerrno>
+#include <cstdint>
+#include <cstdlib>
+#include <ctime>
+#include <ostream>
+
+#include <glog/logging.h>
+
+#include "kudu/gutil/walltime.h"
+#include "kudu/util/monotime.h"
+
+namespace kudu {
+
+// Initializes the POSIX semaphore with 'capacity' as its initial value. The
+// second argument to sem_init() (pshared) is 0. Failure is fatal.
+Semaphore::Semaphore(int capacity) {
+  DCHECK_GE(capacity, 0);
+  const int rc = sem_init(&sem_, 0, capacity);
+  if (rc != 0) Fatal("init");
+}
+
+// Destroys the POSIX semaphore. Failure is fatal.
+Semaphore::~Semaphore() {
+  const int rc = sem_destroy(&sem_);
+  if (rc != 0) Fatal("destroy");
+}
+
+// Blocks until the semaphore has been acquired. The wait is issued through
+// RETRY_ON_EINTR; any failure it reports is fatal.
+void Semaphore::Acquire() {
+  int rc;
+  RETRY_ON_EINTR(rc, sem_wait(&sem_));
+  if (rc != 0) {
+    // Fatal() does not return.
+    Fatal("wait");
+  }
+  // TODO(todd): would be nice to track acquisition time, etc.
+}
+
+// Attempts to acquire the semaphore without blocking. Returns true on
+// success and false if sem_trywait() failed with EAGAIN. Any other error is
+// fatal.
+bool Semaphore::TryAcquire() {
+  int rc;
+  RETRY_ON_EINTR(rc, sem_trywait(&sem_));
+  if (rc != 0 && errno != EAGAIN) {
+    Fatal("trywait");
+  }
+  return rc == 0;
+}
+
+// Attempts to acquire the semaphore, giving up once 'timeout' has elapsed.
+// Returns true if the semaphore was acquired and false if sem_timedwait()
+// failed with ETIMEDOUT. Any other error is fatal.
+bool Semaphore::TimedAcquire(const MonoDelta& timeout) {
+  // sem_timedwait() takes an absolute deadline rather than a duration, so
+  // the timeout is added to the current time.
+  int64_t deadline_us = GetCurrentTimeMicros();
+  deadline_us += timeout.ToMicroseconds();
+
+  struct timespec deadline;
+  MonoDelta::NanosToTimeSpec(deadline_us * MonoTime::kNanosecondsPerMicrosecond,
+                             &deadline);
+
+  int rc;
+  RETRY_ON_EINTR(rc, sem_timedwait(&sem_, &deadline));
+  if (rc == 0) return true;
+  if (errno == ETIMEDOUT) return false;
+  Fatal("timedwait");
+}
+
+// Releases the semaphore by posting to it. PCHECK enforces that sem_post()
+// succeeds.
+void Semaphore::Release() {
+  PCHECK(sem_post(&sem_) == 0);
+}
+
+// Returns the value reported by sem_getvalue(). PCHECK enforces that the
+// call succeeds. The value is a snapshot and may be stale as soon as it is
+// returned if other threads are using the semaphore.
+int Semaphore::GetValue() {
+  int val;
+  PCHECK(sem_getvalue(&sem_, &val) == 0);
+  return val;
+}
+
+// Logs a fatal message naming the failed 'action' and the address of the
+// semaphore (PLOG is used, so the current errno is part of the message),
+// then aborts. Never returns.
+void Semaphore::Fatal(const char* action) {
+  void* const sem_addr = static_cast<void*>(&sem_);
+  PLOG(FATAL) << "Could not " << action << " semaphore " << sem_addr;
+  abort(); // unnecessary, but avoids gcc complaining
+}
+
+} // namespace kudu

http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/semaphore.h
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/semaphore.h b/be/src/kudu/util/semaphore.h
new file mode 100644
index 0000000..4f12658
--- /dev/null
+++ b/be/src/kudu/util/semaphore.h
@@ -0,0 +1,77 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#ifndef KUDU_UTIL_SEMAPHORE_H
+#define KUDU_UTIL_SEMAPHORE_H
+
+#include <semaphore.h>
+#if defined(__APPLE__)
+#include <dispatch/dispatch.h>
+#include "kudu/util/atomic.h"
+#endif  // defined(__APPLE__)
+
+#include "kudu/gutil/macros.h"
+#include "kudu/gutil/port.h"
+
+namespace kudu {
+
+class MonoDelta;
+
+// Wrapper for POSIX semaphores.
+//
+// On Apple platforms the class is instead backed by a dispatch semaphore plus
+// an atomic counter (see the members below); the public interface is the same.
+class Semaphore {
+ public:
+  // Initialize the semaphore with the specified capacity.
+  explicit Semaphore(int capacity);
+  ~Semaphore();
+
+  // Acquire the semaphore.
+  void Acquire();
+
+  // Acquire the semaphore within the given timeout. Returns true if successful.
+  bool TimedAcquire(const MonoDelta& timeout);
+
+  // Try to acquire the semaphore immediately. Returns false if unsuccessful.
+  bool TryAcquire();
+
+  // Release the semaphore.
+  void Release();
+
+  // Get the current value of the semaphore.
+  int GetValue();
+
+  // Boost-compatible wrappers.
+  // lock() maps to Acquire(), unlock() to Release(), try_lock() to TryAcquire().
+  void lock() { Acquire(); }
+  void unlock() { Release(); }
+  bool try_lock() { return TryAcquire(); }
+
+ private:
+#if !defined(__APPLE__)
+  // Log a fatal error message. Separated out to keep the main functions
+  // as small as possible in terms of code size.
+  // 'action' is a short description of the operation that failed.
+  void Fatal(const char* action) ATTRIBUTE_NORETURN;
+#endif  // !defined(__APPLE__)
+
+#if defined(__APPLE__)
+  // The dispatch semaphore, plus a separately maintained counter.
+  dispatch_semaphore_t sem_;
+  AtomicInt<int32_t> count_;
+#else
+  // The underlying POSIX semaphore.
+  sem_t sem_;
+#endif  // defined(__APPLE__)
+  DISALLOW_COPY_AND_ASSIGN(Semaphore);
+};
+
+} // namespace kudu
+#endif /* KUDU_UTIL_SEMAPHORE_H */

http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/semaphore_macosx.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/semaphore_macosx.cc b/be/src/kudu/util/semaphore_macosx.cc
new file mode 100644
index 0000000..e2d235c
--- /dev/null
+++ b/be/src/kudu/util/semaphore_macosx.cc
@@ -0,0 +1,75 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "kudu/util/semaphore.h"
+
+#include <semaphore.h>
+
+#include <glog/logging.h>
+
+#include "kudu/gutil/walltime.h"
+#include "kudu/util/monotime.h"
+
+namespace kudu {
+
+// Creates the dispatch semaphore and brings it up to 'capacity' available
+// units; count_ mirrors that value for GetValue().
+Semaphore::Semaphore(int capacity)
+  : count_(capacity) {
+  DCHECK_GE(capacity, 0);
+  // libdispatch traps if a semaphore is released while its current value is
+  // lower than the value it was created with. Creating it with 'capacity'
+  // would therefore crash the process whenever a Semaphore is destroyed while
+  // some units are still acquired. Create it with 0 and signal it up to
+  // 'capacity' instead, so the creation value can never exceed the current one.
+  sem_ = dispatch_semaphore_create(0);
+  CHECK_NOTNULL(sem_);
+  for (int i = 0; i < capacity; i++) {
+    dispatch_semaphore_signal(sem_);
+  }
+}
+
+// Drops this object's reference to the dispatch semaphore.
+Semaphore::~Semaphore() {
+  dispatch_release(sem_);
+}
+
+// Blocks until the semaphore has been acquired, then decrements the mirrored
+// counter.
+void Semaphore::Acquire() {
+  // If the timeout is DISPATCH_TIME_FOREVER, then dispatch_semaphore_wait()
+  // waits forever and always returns zero.
+  CHECK(dispatch_semaphore_wait(sem_, DISPATCH_TIME_FOREVER) == 0);
+  // count_ is updated after the wait completes, so GetValue() may briefly lag.
+  count_.IncrementBy(-1);
+}
+
+// Attempts to acquire the semaphore without waiting. Returns true and
+// decrements the mirrored counter on success, false otherwise.
+bool Semaphore::TryAcquire() {
+  // The dispatch_semaphore_wait() function returns zero upon success and
+  // non-zero after the timeout expires. DISPATCH_TIME_NOW is passed as the
+  // timeout.
+  if (dispatch_semaphore_wait(sem_, DISPATCH_TIME_NOW) != 0) {
+    return false;
+  }
+  count_.IncrementBy(-1);
+  return true;
+}
+
+// Attempts to acquire the semaphore, waiting at most 'timeout'. Returns true
+// and decrements the mirrored counter on success, false if the wait timed out.
+bool Semaphore::TimedAcquire(const MonoDelta& timeout) {
+  // The deadline is expressed relative to now, in nanoseconds.
+  const dispatch_time_t deadline =
+      dispatch_time(DISPATCH_TIME_NOW, timeout.ToNanoseconds());
+  if (dispatch_semaphore_wait(sem_, deadline) != 0) {
+    return false;
+  }
+  count_.IncrementBy(-1);
+  return true;
+}
+
+// Releases one unit of the semaphore and increments the mirrored counter.
+void Semaphore::Release() {
+  dispatch_semaphore_signal(sem_);
+  // NOTE(review): the counter is bumped after the signal, so a waiter woken by
+  // the signal may decrement count_ first and GetValue() can transiently read
+  // one lower than expected -- confirm that callers tolerate this.
+  count_.IncrementBy(1);
+}
+
+// Returns the mirrored counter. The dispatch semaphore itself is not
+// consulted; the counter is the one maintained by the acquire/release methods.
+int Semaphore::GetValue() {
+  return count_.Load();
+}
+
+} // namespace kudu

http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/signal.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/signal.cc b/be/src/kudu/util/signal.cc
new file mode 100644
index 0000000..e8b6e79
--- /dev/null
+++ b/be/src/kudu/util/signal.cc
@@ -0,0 +1,47 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "kudu/util/signal.h"
+
+#include <glog/logging.h>
+
+namespace kudu {
+
+// Installs 'handler' as the disposition for 'signal' via sigaction().
+// No flags are set and the handler's signal mask is empty. PCHECK enforces
+// that sigaction() succeeds.
+void SetSignalHandler(int signal, SignalHandlerCallback handler) {
+  struct sigaction act;
+  act.sa_flags = 0;
+  sigemptyset(&act.sa_mask);
+  act.sa_handler = handler;
+  // The previous disposition is not retrieved (third argument is null).
+  PCHECK(sigaction(signal, &act, nullptr) == 0);
+}
+
+// Sets the disposition of SIGPIPE to SIG_IGN.
+void IgnoreSigPipe() {
+  SetSignalHandler(SIGPIPE, SIG_IGN);
+}
+
+// Sets the disposition of SIGPIPE back to SIG_DFL.
+void ResetSigPipeHandlerToDefault() {
+  SetSignalHandler(SIGPIPE, SIG_DFL);
+}
+
+// We unblock all signal masks since they are inherited.
+//
+// Builds a full signal set and passes it to sigprocmask() with SIG_UNBLOCK.
+// PCHECK enforces that both calls succeed.
+void ResetAllSignalMasksToUnblocked() {
+  sigset_t signals;
+  PCHECK(sigfillset(&signals) == 0);
+  // The previous mask is not retrieved (third argument is null).
+  PCHECK(sigprocmask(SIG_UNBLOCK, &signals, nullptr) == 0);
+}
+
+} // namespace kudu

http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/signal.h
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/signal.h b/be/src/kudu/util/signal.h
new file mode 100644
index 0000000..0c88a80
--- /dev/null
+++ b/be/src/kudu/util/signal.h
@@ -0,0 +1,42 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <signal.h>
+
+namespace kudu {
+
+// Type of a signal handler function, as accepted by SetSignalHandler().
+// The underlying typedef name differs by platform: sighandler_t on Linux,
+// sig_t elsewhere.
+#if defined(__linux__)
+typedef sighandler_t SignalHandlerCallback;
+#else
+typedef sig_t SignalHandlerCallback;
+#endif
+
+// Set a process-wide signal handler.
+// 'signal' is the signal number and 'handler' the new disposition; the
+// functions below pass SIG_IGN and SIG_DFL as 'handler'.
+void SetSignalHandler(int signal, SignalHandlerCallback handler);
+
+// Set the disposition of SIGPIPE to SIG_IGN.
+void IgnoreSigPipe();
+
+// Set the disposition of SIGPIPE to SIG_DFL.
+void ResetSigPipeHandlerToDefault();
+
+// Unblock all signal masks.
+void ResetAllSignalMasksToUnblocked();
+
+} // namespace kudu

http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/slice-test.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/slice-test.cc b/be/src/kudu/util/slice-test.cc
new file mode 100644
index 0000000..0f7a893
--- /dev/null
+++ b/be/src/kudu/util/slice-test.cc
@@ -0,0 +1,61 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "kudu/util/slice.h"
+
+#include <cstdint>
+#include <map>
+#include <string>
+#include <utility>
+
+#include <gtest/gtest.h>
+
+#include "kudu/gutil/map-util.h"
+
+using std::string;
+
+namespace kudu {
+
+typedef SliceMap<int>::type MySliceMap;
+
+// Verifies that a SliceMap iterates its entries in key order regardless of
+// insertion order, using both range-for and explicit iterators.
+TEST(SliceTest, TestSliceMap) {
+  MySliceMap my_map;
+  Slice a("a");
+  Slice b("b");
+  Slice c("c");
+
+  // Insertion is deliberately out-of-order; the map should restore order.
+  InsertOrDie(&my_map, c, 3);
+  InsertOrDie(&my_map, a, 1);
+  InsertOrDie(&my_map, b, 2);
+
+  // The expected key is held in a single byte. Viewing the first byte of an
+  // 'int' instead would only yield the intended character on little-endian
+  // hosts.
+  int expectedValue = 0;
+  for (const MySliceMap::value_type& pair : my_map) {
+    const uint8_t data = static_cast<uint8_t>('a' + expectedValue++);
+    ASSERT_EQ(Slice(&data, 1), pair.first);
+    ASSERT_EQ(expectedValue, pair.second);
+  }
+
+  expectedValue = 0;
+  for (auto iter = my_map.begin(); iter != my_map.end(); iter++) {
+    const uint8_t data = static_cast<uint8_t>('a' + expectedValue++);
+    ASSERT_EQ(Slice(&data, 1), iter->first);
+    ASSERT_EQ(expectedValue, iter->second);
+  }
+}
+
+} // namespace kudu

http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/slice.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/slice.cc b/be/src/kudu/util/slice.cc
new file mode 100644
index 0000000..775d54a
--- /dev/null
+++ b/be/src/kudu/util/slice.cc
@@ -0,0 +1,97 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "kudu/util/slice.h"
+
+#include <cctype>
+
+#include "kudu/gutil/port.h"
+#include "kudu/gutil/stringprintf.h"
+#include "kudu/util/status.h"
+#include "kudu/util/logging.h"
+
+namespace kudu {
+
+// Returns Status::OK() if the slice holds exactly 'expected_size' bytes.
+// Otherwise returns a Corruption status whose message reports both sizes and
+// whose detail is the slice's debug string (limited to 100 bytes) wrapped in
+// KUDU_REDACT.
+Status Slice::check_size(size_t expected_size) const {
+  if (PREDICT_FALSE(size() != expected_size)) {
+    const std::string msg = StringPrintf("Unexpected Slice size. "
+        "Expected %zu but got %zu.", expected_size, size());
+    return Status::Corruption(msg, KUDU_REDACT(ToDebugString(100)));
+  }
+  return Status::OK();
+}
+
+// Returns a std::string holding a copy of the size_ bytes starting at data_.
+std::string Slice::ToString() const {
+  const char* const bytes = reinterpret_cast<const char *>(data_);
+  return std::string(bytes, size_);
+}
+
+// Returns a printable rendering of the slice. Bytes for which isgraph() is
+// true are emitted as-is; all others are emitted as a "\xNN" hex escape.
+//
+// If 'max_len' is non-zero and the slice is longer than 'max_len' bytes, only
+// the first 'max_len' bytes are rendered, followed by a suffix giving the
+// total size. A 'max_len' of 0 means no limit.
+std::string Slice::ToDebugString(size_t max_len) const {
+  size_t bytes_to_print = size_;
+  bool abbreviated = false;
+  if (max_len != 0 && bytes_to_print > max_len) {
+    bytes_to_print = max_len;
+    abbreviated = true;
+  }
+
+  // First pass: compute the output length so the string is allocated once.
+  // size_t is used throughout so that the index and the length cannot
+  // overflow or be compared across signedness for large slices.
+  size_t reserve_len = 0;
+  for (size_t i = 0; i < bytes_to_print; i++) {
+    // An escaped byte takes four characters ("\xNN"), a printable one takes one.
+    reserve_len += isgraph(data_[i]) ? 1 : 4;
+  }
+  if (abbreviated) {
+    reserve_len += 20;  // extra padding
+  }
+
+  // Second pass: build the output.
+  std::string ret;
+  ret.reserve(reserve_len);
+  for (size_t i = 0; i < bytes_to_print; i++) {
+    if (!isgraph(data_[i])) {
+      StringAppendF(&ret, "\\x%02x", data_[i] & 0xff);
+    } else {
+      ret.push_back(data_[i]);
+    }
+  }
+  if (abbreviated) {
+    // size_ is unsigned, so it is formatted with %zu.
+    StringAppendF(&ret, "...<%zu bytes total>", size_);
+  }
+  return ret;
+}
+
+// Returns true if every byte of 's' is zero, false as soon as a non-zero byte
+// is found.
+bool IsAllZeros(const Slice& s) {
+  // Walk a pointer through the slice instead of using s[i]
+  // since this is way faster in debug mode builds. We also do some
+  // manual unrolling for the same purpose.
+  // NOTE(review): &s[0] still goes through operator[] once; confirm that this
+  // is acceptable for an empty slice.
+  const uint8_t* p = &s[0];
+  // Keep the remaining count in size_t: an 'int' would truncate the size of
+  // slices larger than INT_MAX and give a wrong answer.
+  size_t rem = s.size();
+
+  // Check eight bytes at a time while at least eight remain.
+  while (rem >= 8) {
+    if (UNALIGNED_LOAD64(p) != 0) return false;
+    rem -= 8;
+    p += 8;
+  }
+
+  // Check the trailing bytes (fewer than eight) one by one.
+  while (rem > 0) {
+    if (*p++ != '\0') return false;
+    rem--;
+  }
+  return true;
+}
+
+}  // namespace kudu

http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/slice.h
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/slice.h b/be/src/kudu/util/slice.h
new file mode 100644
index 0000000..d34c744
--- /dev/null
+++ b/be/src/kudu/util/slice.h
@@ -0,0 +1,332 @@
+//
+// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file. See the AUTHORS file for names of contributors.
+//
+
+#ifndef KUDU_UTIL_SLICE_H_
+#define KUDU_UTIL_SLICE_H_
+
+// NOTE: using stdint.h instead of cstdint because this file is supposed
+//       to be processed by a compiler lacking C++11 support.
+#include <stdint.h>
+
+#include <cassert>
+#include <cstddef>
+#include <cstring>
+#include <iosfwd>
+#include <map>
+#include <string>
+
+#ifdef KUDU_HEADERS_USE_RICH_SLICE
+#include "kudu/gutil/strings/fastmem.h"
+#include "kudu/gutil/strings/stringpiece.h"
+#include "kudu/util/faststring.h"
+#endif
+#ifdef KUDU_HEADERS_NO_STUBS
+#include "kudu/gutil/port.h"
+#endif
+#include "kudu/util/kudu_export.h"
+
+namespace kudu {
+
+class Status;
+
+/// @brief A wrapper around externally allocated data.
+///
+/// Slice is a simple structure containing a pointer into some external
+/// storage and a size. The user of a Slice must ensure that the slice
+/// is not used after the corresponding external storage has been
+/// deallocated.
+///
+/// Multiple threads can invoke const methods on a Slice without
+/// external synchronization, but if any of the threads may call a
+/// non-const method, all threads accessing the same Slice must use
+/// external synchronization.
+///
+/// Slices can be built around faststrings and StringPieces using constructors
+/// with implicit casts. Both StringPieces and faststrings depend on a great
+/// deal of gutil code.
+class KUDU_EXPORT Slice {
+ public:
+  /// Create an empty slice.
+  ///
+  /// @note The data pointer refers to an empty string literal rather than
+  ///   being null.
+  Slice() : data_(reinterpret_cast<const uint8_t *>("")),
+            size_(0) { }
+
+  /// Create a slice that refers to a @c uint8_t byte array.
+  ///
+  /// @param [in] d
+  ///   The input array.
+  /// @param [in] n
+  ///   Number of bytes in the array.
+  Slice(const uint8_t* d, size_t n) : data_(d), size_(n) { }
+
+  /// Create a slice that refers to a @c char byte array.
+  ///
+  /// @param [in] d
+  ///   The input array.
+  /// @param [in] n
+  ///   Number of bytes in the array.
+  Slice(const char* d, size_t n) :
+    data_(reinterpret_cast<const uint8_t *>(d)),
+    size_(n) { }
+
+  /// Create a slice that refers to the contents of the given string.
+  ///
+  /// @param [in] s
+  ///   The input string.
+  Slice(const std::string& s) : // NOLINT(runtime/explicit)
+    data_(reinterpret_cast<const uint8_t *>(s.data())),
+    size_(s.size()) { }
+
+  /// Create a slice that refers to a C-string s[0,strlen(s)-1].
+  ///
+  /// @note The length is computed with strlen(), so @c s must be a
+  ///   non-null, NUL-terminated string.
+  ///
+  /// @param [in] s
+  ///   The input C-string.
+  Slice(const char* s) : // NOLINT(runtime/explicit)
+    data_(reinterpret_cast<const uint8_t *>(s)),
+    size_(strlen(s)) { }
+
+#ifdef KUDU_HEADERS_USE_RICH_SLICE
+  /// Create a slice that refers to the contents of a faststring.
+  ///
+  /// @note Further appends to the faststring may invalidate this slice.
+  ///
+  /// @param [in] s
+  ///   The input faststring.
+  Slice(const faststring &s) // NOLINT(runtime/explicit)
+    : data_(s.data()),
+      size_(s.size()) {
+  }
+
+  /// Create a slice that refers to the contents of a string piece.
+  ///
+  /// @param [in] s
+  ///   The input StringPiece.
+  Slice(const StringPiece& s) // NOLINT(runtime/explicit)
+    : data_(reinterpret_cast<const uint8_t*>(s.data())),
+      size_(s.size()) {
+  }
+#endif
+
+  /// @return A pointer to the beginning of the referenced data.
+  const uint8_t* data() const { return data_; }
+
+  /// @note This casts away the constness of the stored pointer; writing
+  ///   through the result is only valid if the underlying storage is
+  ///   actually writable.
+  ///
+  /// @return A mutable pointer to the beginning of the referenced data.
+  uint8_t *mutable_data() { return const_cast<uint8_t *>(data_); }
+
+  /// @return The length (in bytes) of the referenced data.
+  size_t size() const { return size_; }
+
+  /// @return @c true iff the length of the referenced data is zero.
+  bool empty() const { return size_ == 0; }
+
+  /// @pre n < size()
+  ///
+  /// @note The precondition is enforced with assert(), so indexing an empty
+  ///   slice (even just to take the address of element 0) trips the
+  ///   assertion in builds where assertions are enabled.
+  ///
+  /// @param [in] n
+  ///   The index of the byte.
+  /// @return the n-th byte in the referenced data.
+  const uint8_t &operator[](size_t n) const {
+    assert(n < size());
+    return data_[n];
+  }
+
+  /// Change this slice to refer to an empty array.
+  void clear() {
+    data_ = reinterpret_cast<const uint8_t *>("");
+    size_ = 0;
+  }
+
+  /// Drop the first "n" bytes from this slice.
+  ///
+  /// @pre n <= size()
+  ///
+  /// @note Only the base and bounds of the slice are changed;
+  ///   the data is not modified.
+  ///
+  /// @param [in] n
+  ///   Number of bytes that should be dropped from the beginning.
+  void remove_prefix(size_t n) {
+    assert(n <= size());
+    data_ += n;
+    size_ -= n;
+  }
+
+  /// Truncate the slice to the given number of bytes.
+  ///
+  /// @pre n <= size()
+  ///
+  /// @note Only the base and bounds of the slice are changed;
+  ///   the data is not modified.
+  ///
+  /// @param [in] n
+  ///   The new size of the slice.
+  void truncate(size_t n) {
+    assert(n <= size());
+    size_ = n;
+  }
+
+  /// Check that the slice has the expected size.
+  ///
+  /// @param [in] expected_size
+  ///   The size (in bytes) the slice is required to have.
+  /// @return Status::Corruption() iff size() != @c expected_size
+  Status check_size(size_t expected_size) const;
+
+  /// @return A string that contains a copy of the referenced data.
+  std::string ToString() const;
+
+  /// Get printable representation of the data in the slice.
+  ///
+  /// @param [in] max_len
+  ///   The maximum number of bytes to output in the printable format;
+  ///   @c 0 means no limit.
+  /// @return A string with printable representation of the data.
+  std::string ToDebugString(size_t max_len = 0) const;
+
+  /// Do a three-way comparison of the slice's data.
+  ///
+  /// @param [in] b
+  ///   The other slice to compare with.
+  /// @return Values are
+  ///   @li <  0 iff "*this" <  "b"
+  ///   @li == 0 iff "*this" == "b"
+  ///   @li >  0 iff "*this" >  "b"
+  int compare(const Slice& b) const;
+
+  /// Check whether the slice starts with the given prefix.
+  /// @param [in] x
+  ///   The slice in question.
+  /// @return @c true iff "x" is a prefix of "*this"
+  bool starts_with(const Slice& x) const {
+    // The size check comes first so MemEqual() never reads past this slice.
+    return ((size_ >= x.size_) &&
+            (MemEqual(data_, x.data_, x.size_)));
+  }
+
+  /// @brief Comparator struct, useful for ordered collections (like STL maps).
+  struct Comparator {
+    /// Compare two slices using Slice::compare()
+    ///
+    /// @param [in] a
+    ///   The slice to call Slice::compare() at.
+    /// @param [in] b
+    ///   The slice to use as a parameter for Slice::compare().
+    /// @return @c true iff @c a is less than @c b by Slice::compare().
+    bool operator()(const Slice& a, const Slice& b) const {
+      return a.compare(b) < 0;
+    }
+  };
+
+  /// Relocate/copy the slice's data into a new location.
+  ///
+  /// @note The bytes are copied with memcpy(), so the destination must not
+  ///   partially overlap the current data.
+  ///
+  /// @param [in] d
+  ///   The new location for the data. If it's the same location, then no
+  ///   relocation is done. It is assumed that the new location is
+  ///   large enough to fit the data.
+  void relocate(uint8_t* d) {
+    if (data_ != d) {
+      memcpy(d, data_, size_);
+      data_ = d;
+    }
+  }
+
+ private:
+  // operator== calls the private MemEqual() helper.
+  friend bool operator==(const Slice& x, const Slice& y);
+
+  // Returns true iff the first 'n' bytes at 'a' and 'b' are identical.
+  // Uses strings::memeq() when KUDU_HEADERS_USE_RICH_SLICE is defined and
+  // plain memcmp() otherwise.
+  static bool MemEqual(const void* a, const void* b, size_t n) {
+#ifdef KUDU_HEADERS_USE_RICH_SLICE
+    return strings::memeq(a, b, n);
+#else
+    return memcmp(a, b, n) == 0;
+#endif
+  }
+
+  // Three-way comparison of the first 'n' bytes at 'a' and 'b'.
+  // Uses strings::fastmemcmp_inlined() when KUDU_HEADERS_USE_RICH_SLICE is
+  // defined and plain memcmp() otherwise.
+  static int MemCompare(const void* a, const void* b, size_t n) {
+#ifdef KUDU_HEADERS_USE_RICH_SLICE
+    return strings::fastmemcmp_inlined(a, b, n);
+#else
+    return memcmp(a, b, n);
+#endif
+  }
+
+  // Start of the referenced (externally owned) bytes.
+  const uint8_t* data_;
+  // Number of referenced bytes.
+  size_t size_;
+
+  // Intentionally copyable
+};
+
+/// Compare two slices for equality.
+///
+/// @param [in] x
+///   The left-hand slice.
+/// @param [in] y
+///   The right-hand slice.
+/// @return @c true iff both slices have the same length and the same bytes.
+inline bool operator==(const Slice& x, const Slice& y) {
+  // Slices of different lengths can never be equal; skip the byte comparison.
+  if (x.size() != y.size()) {
+    return false;
+  }
+  return Slice::MemEqual(x.data(), y.data(), x.size());
+}
+
+/// Compare two slices for inequality.
+///
+/// @param [in] x
+///   The left-hand slice.
+/// @param [in] y
+///   The right-hand slice.
+/// @return @c true iff the slices differ in length or in content.
+inline bool operator!=(const Slice& x, const Slice& y) {
+  const bool identical = (x == y);
+  return !identical;
+}
+
+/// Write a printable, abbreviated representation of a slice to a stream.
+///
+/// At most 16 bytes of the slice are rendered (see Slice::ToDebugString()).
+///
+/// @param [out] o
+///   The stream to write to.
+/// @param [in] s
+///   The slice to print.
+/// @return Reference to @c o.
+inline std::ostream& operator<<(std::ostream& o, const Slice& s) {
+  const size_t kMaxBytesToPrint = 16; // should be enough for anyone...
+  o << s.ToDebugString(kMaxBytesToPrint);
+  return o;
+}
+
+// Three-way byte-wise comparison. The common prefix of both slices is compared
+// first; if it is identical, the shorter slice orders before the longer one.
+inline int Slice::compare(const Slice& b) const {
+  // Keep the common length as size_t: narrowing it to 'int' would pass a
+  // wrong (possibly huge) length to MemCompare() for slices larger than
+  // INT_MAX bytes.
+  const size_t min_len = (size_ < b.size_) ? size_ : b.size_;
+  int r = MemCompare(data_, b.data_, min_len);
+  if (r == 0) {
+    // Equal prefixes: decide by length.
+    if (size_ < b.size_) r = -1;
+    else if (size_ > b.size_) r = +1;
+  }
+  return r;
+}
+
+// Returns true iff every byte referenced by 's' is zero.
+//
+// We don't run TSAN on this function because it makes it really slow and causes some
+// test timeouts. This is only used on local buffers anyway, so we don't lose much
+// by not checking it.
+#ifdef KUDU_HEADERS_NO_STUBS
+ATTRIBUTE_NO_SANITIZE_THREAD
+#endif
+bool IsAllZeros(const Slice& s);
+
+/// @brief STL map whose keys are Slices.
+///
+/// Keys are ordered by Slice::Comparator, i.e. by Slice::compare().
+///
+/// @note A Slice key only refers to external storage, so that storage must
+///   stay valid for as long as the key is in the map.
+///
+/// An example of usage:
+/// @code
+///   typedef SliceMap<int>::type MySliceMap;
+///
+///   MySliceMap my_map;
+///   my_map.insert(MySliceMap::value_type(a, 1));
+///   my_map.insert(MySliceMap::value_type(b, 2));
+///   my_map.insert(MySliceMap::value_type(c, 3));
+///
+///   for (const MySliceMap::value_type& pair : my_map) {
+///     ...
+///   }
+/// @endcode
+template <typename T>
+struct SliceMap {
+  /// A handy typedef for the slice map with appropriate comparison operator.
+  typedef std::map<Slice, T, Slice::Comparator> type;
+};
+
+}  // namespace kudu
+
+#endif  // KUDU_UTIL_SLICE_H_

http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/sorted_disjoint_interval_list-test.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/sorted_disjoint_interval_list-test.cc b/be/src/kudu/util/sorted_disjoint_interval_list-test.cc
new file mode 100644
index 0000000..8e0fe70
--- /dev/null
+++ b/be/src/kudu/util/sorted_disjoint_interval_list-test.cc
@@ -0,0 +1,98 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <utility>
+#include <vector>
+
+#include <gtest/gtest.h>
+
+#include "kudu/util/sorted_disjoint_interval_list.h"
+#include "kudu/util/status.h"
+#include "kudu/util/test_macros.h"
+#include "kudu/util/test_util.h"
+
+using std::vector;
+
+namespace kudu {
+
+// Test fixture for CoalesceIntervals(); adds nothing on top of KuduTest.
+class TestSortedDisjointIntervalList : public KuduTest {
+};
+
+// Point type and interval representation used by every test in this file.
+// NOTE(review): the alias is called 'ClosedInterval', while the comment on
+// CoalesceIntervals() describes its intervals as half-open -- confirm which
+// reading is intended.
+typedef int PointType;
+typedef std::pair<PointType, PointType> ClosedInterval;
+
+// Basic scenarios, each run in its own scope with fresh inputs.
+TEST_F(TestSortedDisjointIntervalList, TestBasic) {
+  // An empty list stays empty.
+  {
+    vector<ClosedInterval> intervals;
+    const vector<ClosedInterval> expected;
+    ASSERT_OK(CoalesceIntervals<PointType>(&intervals));
+    ASSERT_EQ(expected, intervals);
+  }
+
+  // A single zero-length interval is valid and is kept as is.
+  {
+    vector<ClosedInterval> intervals = {{26, 26}};
+    const vector<ClosedInterval> expected = {{26, 26}};
+    ASSERT_OK(CoalesceIntervals<PointType>(&intervals));
+    ASSERT_EQ(expected, intervals);
+  }
+
+  // A single ordinary interval is kept as is.
+  {
+    vector<ClosedInterval> intervals = {{33, 69}};
+    const vector<ClosedInterval> expected = {{33, 69}};
+    ASSERT_OK(CoalesceIntervals<PointType>(&intervals));
+    ASSERT_EQ(expected, intervals);
+  }
+
+  // Unsorted input whose intervals touch at their end points: touching
+  // intervals are merged, the gap between 2 and 3 is preserved.
+  {
+    vector<ClosedInterval> intervals = {{4, 7}, {3, 4}, {1, 2}, {-23, 1}};
+    const vector<ClosedInterval> expected = {{-23, 2}, {3, 7}};
+    ASSERT_OK(CoalesceIntervals<PointType>(&intervals));
+    ASSERT_EQ(expected, intervals);
+  }
+}
+
+// Overlapping intervals collapse into their union.
+TEST_F(TestSortedDisjointIntervalList, TestOverlappedIntervals) {
+  // One interval fully contains the other.
+  {
+    vector<ClosedInterval> intervals = {{4, 7}, {3, 9}};
+    const vector<ClosedInterval> expected = {{3, 9}};
+    ASSERT_OK(CoalesceIntervals<PointType>(&intervals));
+    ASSERT_EQ(expected, intervals);
+  }
+
+  // A mix of overlapping and disjoint intervals.
+  {
+    vector<ClosedInterval> intervals = {
+        {4, 7}, {3, 9}, {-23, 1}, {4, 350}, {369, 400}};
+    const vector<ClosedInterval> expected = {{-23, 1}, {3, 350}, {369, 400}};
+    ASSERT_OK(CoalesceIntervals<PointType>(&intervals));
+    ASSERT_EQ(expected, intervals);
+  }
+}
+
+// Repeated copies of the same interval collapse into a single entry.
+TEST_F(TestSortedDisjointIntervalList, TestDuplicateIntervals) {
+  const vector<ClosedInterval> expected = {{1, 2}, {4, 7}};
+  vector<ClosedInterval> intervals = {{1, 2}, {4, 7}, {1, 2}, {1, 2}};
+
+  ASSERT_OK(CoalesceIntervals<PointType>(&intervals));
+  ASSERT_EQ(expected, intervals);
+}
+
+// An interval whose start is greater than its end ({10, 2} and {40, 7} here)
+// makes CoalesceIntervals() fail with an InvalidArgument status.
+TEST_F(TestSortedDisjointIntervalList, TestInvalidIntervals) {
+  vector<ClosedInterval> intervals = {{1, 2}, {10, 2},
+                                      {4, 7}, {40, 7}};
+  ASSERT_TRUE(CoalesceIntervals<PointType>(&intervals).IsInvalidArgument());
+}
+
+// A zero-length interval merges with the intervals it touches.
+TEST_F(TestSortedDisjointIntervalList, TestSingleElementIntervals) {
+  const vector<ClosedInterval> expected = {{0, 2}};
+  vector<ClosedInterval> intervals = {
+      {0, 0},
+      {0, 1},
+      {1, 2},
+  };
+
+  ASSERT_OK(CoalesceIntervals<PointType>(&intervals));
+  ASSERT_EQ(expected, intervals);
+}
+
+} // namespace kudu

http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/sorted_disjoint_interval_list.h
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/sorted_disjoint_interval_list.h b/be/src/kudu/util/sorted_disjoint_interval_list.h
new file mode 100644
index 0000000..d3180a9
--- /dev/null
+++ b/be/src/kudu/util/sorted_disjoint_interval_list.h
@@ -0,0 +1,95 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <algorithm>
+#include <cstdint>
+#include <utility>
+#include <vector>
+
+#include <glog/logging.h>
+
+#include "kudu/gutil/strings/substitute.h"
+#include "kudu/util/status.h"
+
+namespace kudu {
+
+// Rewrites 'intervals' in place as a sorted disjoint interval list: a list of
+// intervals sorted by start point in which no two intervals overlap or touch.
+//
+// Returns InvalidArgument, without sorting or merging anything, if any
+// interval has a start point greater than its end point.
+//
+// The cost is one sort plus one linear pass, i.e. O(nlg n + n) where 'n' is
+// the number of intervals.
+//
+// For example, given the input interval list:
+//
+//   [------2-------)         [-----1-----)
+//       [--3--)    [---5--)    [----4----)
+//
+// The output sorted disjoint interval list:
+//
+//   [----------1----------)  [-----2-----)
+//
+//
+// Intervals are treated as "half-open" -- inclusive of their start point and
+// exclusive of their end point, e.g., [3, 6). An interval whose start and end
+// point are equal is accepted as valid.
+// 'PointType' must provide the comparison operators used below.
+template<typename PointType>
+Status CoalesceIntervals(std::vector<std::pair<PointType, PointType>>* intervals) {
+  if (intervals->empty()) return Status::OK();
+
+  // Validate every interval before touching the list.
+  for (const auto& candidate : *intervals) {
+    if (candidate.first > candidate.second) {
+      return Status::InvalidArgument(strings::Substitute("invalid interval: [$0, $1)",
+                                                         candidate.first,
+                                                         candidate.second));
+    }
+  }
+
+  // Sort the intervals so that overlapping ones end up next to each other.
+  std::sort(intervals->begin(), intervals->end());
+
+  // 'merged' is the last interval of the already-coalesced prefix of the list;
+  // 'cur' walks over the remaining, not yet processed intervals.
+  auto merged = intervals->begin();
+  auto cur = merged;
+  for (++cur; cur != intervals->end(); ++cur) {
+    if (merged->second >= cur->first) {
+      // 'cur' overlaps or touches 'merged': absorb it, extending the end point
+      // if 'cur' reaches further.
+      if (cur->second > merged->second) merged->second = std::move(cur->second);
+      continue;
+    }
+
+    // 'cur' is disjoint from 'merged', so it starts the next output interval.
+    // If earlier intervals were absorbed, the next output slot is not 'cur'
+    // itself; move 'cur' down into that slot so the output stays contiguous.
+    ++merged;
+    if (merged != cur) *merged = std::move(*cur);
+  }
+
+  // Everything after the last output interval is leftover; drop it.
+  intervals->erase(++merged, cur);
+  return Status::OK();
+}
+
+} // namespace kudu

http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/spinlock_profiling-test.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/spinlock_profiling-test.cc b/be/src/kudu/util/spinlock_profiling-test.cc
new file mode 100644
index 0000000..d0ef2b4
--- /dev/null
+++ b/be/src/kudu/util/spinlock_profiling-test.cc
@@ -0,0 +1,81 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <cstdint>
+#include <ostream>
+#include <string>
+
+#include <gtest/gtest.h>
+#include <glog/logging.h>
+
+#include "kudu/gutil/integral_types.h"
+#include "kudu/gutil/ref_counted.h"
+#include "kudu/gutil/spinlock.h"
+#include "kudu/util/spinlock_profiling.h"
+#include "kudu/util/test_macros.h"
+#include "kudu/util/test_util.h"
+#include "kudu/util/trace.h"
+
+// Can't include gutil/synchronization_profiling.h directly as it'll
+// declare a weak symbol directly in this unit test, which the runtime
+// linker will prefer over equivalent strong symbols for some reason. By
+// declaring the symbol without providing an empty definition, the strong
+// symbols are chosen when provided via shared libraries.
+//
+// Further reading:
+// - http://stackoverflow.com/questions/20658809/dynamic-loading-and-weak-symbol-resolution
+// - http://notmysock.org/blog/php/weak-symbols-arent.html
+namespace gutil {
+extern void SubmitSpinLockProfileData(const void *, int64);
+} // namespace gutil
+
+namespace kudu {
+
+// Test fixture for the spinlock profiling hooks; adds nothing on top of KuduTest.
+class SpinLockProfilingTest : public KuduTest {};
+
+// Reports a contention event while a Trace is adopted and checks that it shows
+// up both in the trace output and in the contention-time total.
+TEST_F(SpinLockProfilingTest, TestSpinlockProfiling) {
+  scoped_refptr<Trace> t(new Trace);
+  base::SpinLock lock;
+  {
+    ADOPT_TRACE(t.get());
+    // 4000000 cycles is above the default value (2000000) of
+    // --lock_contention_trace_threshold_cycles, so the wait is also logged
+    // to the adopted trace as a "Waited ... on lock ..." message.
+    gutil::SubmitSpinLockProfileData(&lock, 4000000);
+  }
+  std::string result = t->DumpToString();
+  LOG(INFO) << "trace: " << result;
+  ASSERT_STR_CONTAINS(result, "\"spinlock_wait_cycles\":4000000");
+  // We can't assert more specifically because the CyclesPerSecond
+  // on different machines might be different.
+  ASSERT_STR_CONTAINS(result, "Waited ");
+  ASSERT_STR_CONTAINS(result, "on lock ");
+
+  ASSERT_GT(GetSpinLockContentionMicros(), 0);
+}
+
+// Reports one contention event while synchronization profiling is enabled and
+// checks that flushing the profile yields exactly that sample.
+TEST_F(SpinLockProfilingTest, TestStackCollection) {
+  StartSynchronizationProfiling();
+  base::SpinLock lock;
+  gutil::SubmitSpinLockProfileData(&lock, 12345);
+  StopSynchronizationProfiling();
+  std::ostringstream str;
+  int64_t dropped = 0;
+  FlushSynchronizationProfile(&str, &dropped);
+  std::string s = str.str();
+  // Each flushed line is "<cycles> <count> @ <stack>": 12345 cycles, 1 sample.
+  ASSERT_STR_CONTAINS(s, "12345 1 @ ");
+  ASSERT_EQ(0, dropped);
+}
+
+} // namespace kudu

http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/spinlock_profiling.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/spinlock_profiling.cc b/be/src/kudu/util/spinlock_profiling.cc
new file mode 100644
index 0000000..e7f93b0
--- /dev/null
+++ b/be/src/kudu/util/spinlock_profiling.cc
@@ -0,0 +1,308 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "kudu/util/spinlock_profiling.h"
+
+#include <sstream>
+#include <string>
+
+#include <glog/logging.h>
+#include <gflags/gflags.h>
+
+#include "kudu/gutil/atomicops.h"
+#include "kudu/gutil/bind.h"
+#include "kudu/gutil/casts.h"
+#include "kudu/gutil/macros.h"
+#include "kudu/gutil/once.h"
+#include "kudu/gutil/port.h"
+#include "kudu/gutil/spinlock.h"
+#include "kudu/gutil/strings/human_readable.h"
+#include "kudu/gutil/sysinfo.h"
+#include "kudu/util/atomic.h"
+#include "kudu/util/debug-util.h"
+#include "kudu/util/flag_tags.h"
+#include "kudu/util/metrics.h"
+#include "kudu/util/striped64.h"
+#include "kudu/util/trace.h"
+
+DEFINE_int32(lock_contention_trace_threshold_cycles,
+             2000000, // 2M cycles should be about 1ms
+             "If acquiring a spinlock takes more than this number of "
+             "cycles, and a Trace is currently active, then the current "
+             "stack trace is logged to the trace buffer.");
+TAG_FLAG(lock_contention_trace_threshold_cycles, hidden);
+
+METRIC_DEFINE_gauge_uint64(server, spinlock_contention_time,
+    "Spinlock Contention Time", kudu::MetricUnit::kMicroseconds,
+    "Amount of time consumed by contention on internal spinlocks since the server "
+    "started. If this increases rapidly, it may indicate a performance issue in Kudu "
+    "internals triggered by a particular workload and warrant investigation.",
+    kudu::EXPOSE_AS_COUNTER);
+
+
+using base::SpinLock;
+using base::SpinLockHolder;
+
+namespace kudu {
+
+// Conversion factor from seconds to microseconds.
+static const double kMicrosPerSecond = 1000000.0;
+
+// Accumulates the wait cycles added by SubmitSpinLockProfileData() and is read
+// by GetSpinLockContentionMicros(). Allocated by DoInit(); null before that.
+static LongAdder* g_contended_cycles = nullptr;
+
+namespace {
+
+// Implements a very simple linear-probing hashtable of stack traces with
+// a fixed number of entries.
+//
+// Threads experiencing contention record their stacks into this hashtable,
+// or increment an already-existing entry. Each entry has its own lock,
+// but we can "skip" an entry under contention, and spread out a single stack
+// into multiple buckets if necessary.
+//
+// A thread collecting a profile collects stack traces out of the hash table
+// and resets the counts to 0 as they are collected.
+class ContentionStacks {
+ public:
+  ContentionStacks()
+    : dropped_samples_(0) {
+  }
+
+  // Add a stack trace to the table, attributing 'cycles' wait cycles to it.
+  // If no usable slot is found within the probe limit, the sample is counted
+  // as dropped instead.
+  void AddStack(const StackTrace& s, int64_t cycles);
+
+  // Flush stacks from the buffer to 'out'. See the docs for FlushSynchronizationProfile()
+  // in spinlock_profiling.h for details on format.
+  //
+  // The number of samples dropped since the previous flush is added to '*dropped'.
+  //
+  // On return, guarantees that any stack traces that were present at the beginning of
+  // the call have been flushed. However, new stacks can be added concurrently with this call.
+  void Flush(std::ostringstream* out, int64_t* dropped);
+
+ private:
+
+  // Collect the next sample from the underlying buffer, and set it back to 0 count
+  // (thus marking it as "empty").
+  //
+  // 'iterator' serves as a way to keep track of the current position in the buffer.
+  // Callers should initially set it to 0, and then pass the same pointer to each
+  // call to CollectSample. This serves to loop through the collected samples.
+  //
+  // Returns true and fills in 's', 'trip_count' and 'cycles' if a sample was
+  // found; returns false once the end of the buffer has been reached.
+  bool CollectSample(uint64_t* iterator, StackTrace* s, int64_t* trip_count, int64_t* cycles);
+
+  // Hashtable entry.
+  struct Entry {
+    // 'hash' and 'trace' are left unset here: they are only meaningful once
+    // the entry has been claimed (trip_count != 0).
+    Entry() : trip_count(0),
+              cycle_count(0) {
+    }
+
+    // Protects all other fields of this entry.
+    SpinLock lock;
+
+    // The number of times we've experienced contention with a stack trace equal
+    // to 'trace'.
+    //
+    // If this is 0, then the entry is "unclaimed" and the other fields are not
+    // considered valid.
+    int64_t trip_count;
+
+    // The total number of cycles spent waiting at this stack trace.
+    int64_t cycle_count;
+
+    // A cached hashcode of the trace.
+    uint64_t hash;
+
+    // The actual stack trace.
+    StackTrace trace;
+  };
+
+  enum {
+    // Total number of slots in the table.
+    kNumEntries = 1024,
+    // Maximum number of consecutive slots AddStack() tries for one sample.
+    kNumLinearProbeAttempts = 4
+  };
+  Entry entries_[kNumEntries];
+
+  // The number of samples which were dropped due to contention on this structure or
+  // due to the hashtable being too full.
+  AtomicInt<int64_t> dropped_samples_;
+};
+
+// Incremented by StartSynchronizationProfiling() and decremented by
+// StopSynchronizationProfiling(); SubmitSpinLockProfileData() records stacks
+// while this is non-zero.
+Atomic32 g_profiling_enabled = 0;
+// The table that contended stacks are recorded into. Allocated by DoInit();
+// null before that.
+ContentionStacks* g_contention_stacks = nullptr;
+
+// Records one contention sample for stack 's', adding 'cycles' to the matching
+// entry's totals. Tries up to kNumLinearProbeAttempts consecutive slots and
+// counts the sample as dropped if none of them can be used.
+void ContentionStacks::AddStack(const StackTrace& s, int64_t cycles) {
+  const uint64_t hash = s.HashCode();
+
+  for (int attempt = 0; attempt < kNumLinearProbeAttempts; attempt++) {
+    Entry& slot = entries_[(hash + attempt) % kNumEntries];
+    if (!slot.lock.TryLock()) {
+      // Somebody else holds this slot; rather than wait, move on to the next
+      // one. The same stack may therefore occupy several slots, which is fine
+      // because pprof aggregates them in the end anyway.
+      continue;
+    }
+
+    const bool unclaimed = (slot.trip_count == 0);
+    if (!unclaimed && (slot.hash != hash || !slot.trace.Equals(s))) {
+      // The slot already belongs to a different stack trace.
+      slot.lock.Unlock();
+      continue;
+    }
+    if (unclaimed) {
+      // First sample for this slot: claim it for our stack.
+      slot.hash = hash;
+      slot.trace.CopyFrom(s);
+    }
+
+    // The slot now holds our stack; account for this sample.
+    slot.cycle_count += cycles;
+    slot.trip_count++;
+    slot.lock.Unlock();
+    return;
+  }
+
+  // Every probed slot was either locked by another thread or claimed by a
+  // different stack, so the sample is dropped.
+  dropped_samples_.Increment();
+}
+
+// Writes every sample currently stored in this table to 'out', one line per
+// sample in the form "<cycles> <count> @ <hex stack>", resetting each entry as
+// it is collected. The number of samples dropped since the previous flush is
+// added to '*dropped' and the internal drop counter is reset to zero.
+void ContentionStacks::Flush(std::ostringstream* out, int64_t* dropped) {
+  uint64_t iterator = 0;
+  StackTrace t;
+  int64_t cycles = 0;
+  int64_t count = 0;
+  // Drain this object's own entries (not the global table), matching the
+  // use of this object's 'dropped_samples_' below.
+  while (CollectSample(&iterator, &t, &count, &cycles)) {
+    *out << cycles << " " << count
+         << " @ " << t.ToHexString(StackTrace::NO_FIX_CALLER_ADDRESSES |
+                                   StackTrace::HEX_0X_PREFIX)
+         << std::endl;
+  }
+
+  *dropped += dropped_samples_.Exchange(0);
+}
+
+// Scans forward from position '*iterator' for the next claimed entry. If one
+// is found, copies its stack and counters into the out-parameters, resets the
+// entry to "unclaimed" and returns true. '*iterator' is always advanced past
+// every entry that was examined. Returns false once the end is reached.
+bool ContentionStacks::CollectSample(uint64_t* iterator, StackTrace* s, int64_t* trip_count,
+                                     int64_t* cycles) {
+  while (*iterator < kNumEntries) {
+    Entry& entry = entries_[*iterator];
+    ++(*iterator);
+
+    // Hold the entry's lock while inspecting and resetting it.
+    SpinLockHolder guard(&entry.lock);
+    if (entry.trip_count != 0) {
+      *trip_count = entry.trip_count;
+      *cycles = entry.cycle_count;
+      s->CopyFrom(entry.trace);
+
+      // Zero counts mark the entry as empty again.
+      entry.trip_count = 0;
+      entry.cycle_count = 0;
+      return true;
+    }
+  }
+
+  // Reached the end of the array without finding another sample.
+  return false;
+}
+
+
+// Handles one reported spinlock contention event.
+//
+// 'contendedlock' is the address of the contended lock (only used for the
+// trace message) and 'wait_cycles' is the number of cycles spent waiting.
+//
+// The "spinlock_wait_cycles" trace counter is always incremented. Everything
+// else (stack collection, trace message, contention-cycle total) only happens
+// if profiling is enabled or the wait exceeded
+// --lock_contention_trace_threshold_cycles.
+void SubmitSpinLockProfileData(const void *contendedlock, int64_t wait_cycles) {
+  TRACE_COUNTER_INCREMENT("spinlock_wait_cycles", wait_cycles);
+  bool profiling_enabled = base::subtle::Acquire_Load(&g_profiling_enabled);
+  bool long_wait_time = wait_cycles > FLAGS_lock_contention_trace_threshold_cycles;
+  // Short circuit this function quickly in the common case.
+  if (PREDICT_TRUE(!profiling_enabled && !long_wait_time)) {
+    return;
+  }
+
+  // Per-thread guard: if this function is entered again on the same thread
+  // while it is already running, the nested call returns immediately.
+  static __thread bool in_func = false;
+  if (in_func) return; // non-re-entrant
+  in_func = true;
+
+  StackTrace stack;
+  stack.Collect();
+
+  if (profiling_enabled) {
+    DCHECK_NOTNULL(g_contention_stacks)->AddStack(stack, wait_cycles);
+  }
+
+  if (PREDICT_FALSE(long_wait_time)) {
+    // Only log to a trace if one is active on this thread.
+    Trace* t = Trace::CurrentTrace();
+    if (t) {
+      double seconds = static_cast<double>(wait_cycles) / base::CyclesPerSecond();
+      char backtrace_buffer[1024];
+      stack.StringifyToHex(backtrace_buffer, arraysize(backtrace_buffer));
+      TRACE_TO(t, "Waited $0 on lock $1. stack: $2",
+               HumanReadableElapsedTime::ToShortString(seconds), contendedlock,
+               backtrace_buffer);
+    }
+  }
+
+  // The counter is null until DoInit() has run; skip the update in that case.
+  LongAdder* la = reinterpret_cast<LongAdder*>(
+      base::subtle::Acquire_Load(reinterpret_cast<AtomicWord*>(&g_contended_cycles)));
+  if (la) {
+    la->IncrementBy(wait_cycles);
+  }
+
+  in_func = false;
+}
+
+// Allocates the global stack table and the global contended-cycles counter and
+// publishes each pointer with a release store. The objects are never freed.
+void DoInit() {
+  ContentionStacks* stacks = new ContentionStacks();
+  base::subtle::Release_Store(reinterpret_cast<AtomicWord*>(&g_contention_stacks),
+                              reinterpret_cast<uintptr_t>(stacks));
+
+  LongAdder* cycles = new LongAdder();
+  base::subtle::Release_Store(reinterpret_cast<AtomicWord*>(&g_contended_cycles),
+                              reinterpret_cast<uintptr_t>(cycles));
+}
+
+} // anonymous namespace
+
+// Runs DoInit() through GoogleOnceInit(), so the global profiling state is
+// set up at most once no matter how often this function is called.
+void InitSpinLockContentionProfiling() {
+  static GoogleOnceType once = GOOGLE_ONCE_INIT;
+  GoogleOnceInit(&once, DoInit);
+}
+
+
+// Makes sure the profiling state exists, then registers a function gauge on
+// 'entity' that reports GetSpinLockContentionMicros() and marks it as never
+// to be retired.
+void RegisterSpinLockContentionMetrics(const scoped_refptr<MetricEntity>& entity) {
+  InitSpinLockContentionProfiling();
+
+  auto contention_gauge = METRIC_spinlock_contention_time.InstantiateFunctionGauge(
+      entity, Bind(&GetSpinLockContentionMicros));
+  entity->NeverRetire(contention_gauge);
+}
+
+// Converts the accumulated contended cycles into microseconds using
+// base::CyclesPerSecond(). Requires that DoInit() has already run.
+uint64_t GetSpinLockContentionMicros() {
+  const int64_t wait_cycles = DCHECK_NOTNULL(g_contended_cycles)->Value();
+  const double seconds = static_cast<double>(wait_cycles) / base::CyclesPerSecond();
+  const double micros = seconds * kMicrosPerSecond;
+  return implicit_cast<int64_t>(micros);
+}
+
+// Ensures the profiling state is initialized, then increments the
+// profiling-enabled counter so contended stacks start being recorded.
+void StartSynchronizationProfiling() {
+  InitSpinLockContentionProfiling();
+  base::subtle::Barrier_AtomicIncrement(&g_profiling_enabled, 1);
+}
+
+// Flushes the recorded stacks to 'out' and adds the number of dropped samples
+// to '*drop_count'. Crashes (CHECK) if the profiling state has not been
+// initialized yet.
+void FlushSynchronizationProfile(std::ostringstream* out,
+                                 int64_t* drop_count) {
+  CHECK_NOTNULL(g_contention_stacks)->Flush(out, drop_count);
+}
+
+// Decrements the profiling-enabled counter. Crashes (CHECK) if the counter
+// would become negative, i.e. on a Stop without a matching Start.
+void StopSynchronizationProfiling() {
+  InitSpinLockContentionProfiling();
+  CHECK_GE(base::subtle::Barrier_AtomicIncrement(&g_profiling_enabled, -1), 0);
+}
+
+} // namespace kudu
+
+// The hook expected by gutil is in the gutil namespace. Simply forward into the
+// kudu namespace so we don't need to qualify everything.
+namespace gutil {
+// 'contendedlock' is the address of the contended lock and 'wait_cycles' the
+// number of cycles spent waiting for it; both are passed through unchanged.
+void SubmitSpinLockProfileData(const void *contendedlock, int64_t wait_cycles) {
+  kudu::SubmitSpinLockProfileData(contendedlock, wait_cycles);
+}
+} // namespace gutil

http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/spinlock_profiling.h
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/spinlock_profiling.h b/be/src/kudu/util/spinlock_profiling.h
new file mode 100644
index 0000000..702eb18
--- /dev/null
+++ b/be/src/kudu/util/spinlock_profiling.h
@@ -0,0 +1,72 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#ifndef KUDU_UTIL_SPINLOCK_PROFILING_H
+#define KUDU_UTIL_SPINLOCK_PROFILING_H
+
+#include <cstdint>
+#include <iosfwd>
+
+#include "kudu/gutil/ref_counted.h"
+
+namespace kudu {
+
+class MetricEntity;
+
+// Enable instrumentation of spinlock contention.
+//
+// Calling this method runs the one-time setup of the profiling state and
+// ensures that the spinlock_profiling.cc object file gets linked into your
+// executable. It needs to be somewhere reachable in your code,
+// just so that gcc doesn't omit the underlying module from the binary.
+void InitSpinLockContentionProfiling();
+
+// Return the total number of microseconds spent in spinlock contention
+// since the server started.
+uint64_t GetSpinLockContentionMicros();
+
+// Register metrics in the given server entity which measure the amount of
+// spinlock contention.
+void RegisterSpinLockContentionMetrics(const scoped_refptr<MetricEntity>& entity);
+
+// Enable process-wide synchronization profiling.
+//
+// While profiling is enabled, spinlock contention will be recorded in a buffer.
+// The caller should periodically call FlushSynchronizationProfile() to empty
+// the buffer, or else profiles may be dropped.
+void StartSynchronizationProfiling();
+
+// Flush the current buffer of contention profile samples to the given stream.
+//
+// Each stack trace that has been observed results in at least one line of the
+// following format:
+//   <cycles> <trip count> @ <hex stack trace>
+//
+// Flushing the data also clears the current buffer of trace samples.
+// This may be called while synchronization profiling is enabled or after it has
+// been disabled.
+//
+// *drop_count will be incremented by the number of samples which were dropped
+// due to the contention buffer overflowing. If profiling is enabled during this
+// call, then the 'drop_count' may be slightly out-of-date with respect to the
+// returned samples.
+void FlushSynchronizationProfile(std::ostringstream* out, int64_t* drop_count);
+
+// Stop collecting contention profiles.
+void StopSynchronizationProfiling();
+
+} // namespace kudu
+#endif /* KUDU_UTIL_SPINLOCK_PROFILING_H */

http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/stack_watchdog-test.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/stack_watchdog-test.cc b/be/src/kudu/util/stack_watchdog-test.cc
new file mode 100644
index 0000000..aefe220
--- /dev/null
+++ b/be/src/kudu/util/stack_watchdog-test.cc
@@ -0,0 +1,152 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "kudu/util/kernel_stack_watchdog.h"
+
+#include <ostream>
+#include <string>
+#include <thread>
+#include <vector>
+
+#include <gflags/gflags_declare.h>
+#include <glog/logging.h>
+#include <gtest/gtest.h>
+
+#include "kudu/gutil/dynamic_annotations.h"
+#include "kudu/gutil/port.h"
+#include "kudu/gutil/strings/join.h"
+#include "kudu/gutil/strings/substitute.h"
+#include "kudu/util/monotime.h"
+#include "kudu/util/stopwatch.h"
+#include "kudu/util/test_macros.h"
+#include "kudu/util/test_util.h"
+
+using std::string;
+using std::thread;
+using std::vector;
+using strings::Substitute;
+
+DECLARE_int32(hung_task_check_interval_ms);
+DECLARE_int32(inject_latency_on_kernel_stack_lookup_ms);
+
+namespace kudu {
+
+class StackWatchdogTest : public KuduTest {
+ public:
+  virtual void SetUp() OVERRIDE {
+    KuduTest::SetUp();
+    KernelStackWatchdog::GetInstance()->SaveLogsForTests(true);  // tests read them via LoggedMessagesForTests()
+    ANNOTATE_BENIGN_RACE(&FLAGS_hung_task_check_interval_ms, "");  // the tests assign this flag at runtime
+    ANNOTATE_BENIGN_RACE(&FLAGS_inject_latency_on_kernel_stack_lookup_ms, "");  // likewise assigned by a test
+    FLAGS_hung_task_check_interval_ms = 10;  // fixture-wide value; some tests override it
+  }
+};
+
+// The KernelStackWatchdog is only enabled on Linux, since we can't get kernel
+// stack traces on other platforms.
+#if defined(__linux__)
+TEST_F(StackWatchdogTest, TestWatchdog) {
+  vector<string> log;
+  {
+    SCOPED_WATCH_STACK(20);  // threshold is presumably in ms -- confirm in kernel_stack_watchdog.h
+    for (int i = 0; i < 50; i++) {  // poll up to 50 times, 100ms apart
+      SleepFor(MonoDelta::FromMilliseconds(100));
+      log = KernelStackWatchdog::GetInstance()->LoggedMessagesForTests();
+      // Wait for several samples, since it's possible that we get unlucky
+      // and the watchdog sees us just before or after a sleep.
+      if (log.size() > 5) {
+        break;
+      }
+    }
+  }
+  string s = JoinStrings(log, "\n");
+  ASSERT_STR_CONTAINS(s, "TestWatchdog_Test::TestBody()");  // this test's own frame
+  ASSERT_STR_CONTAINS(s, "nanosleep");  // presumably the call SleepFor() blocks in
+}
+#endif
+
+// Test that SCOPED_WATCH_STACK scopes can be nested.
+TEST_F(StackWatchdogTest, TestNestedScopes) {  // NOTE(review): not behind the __linux__ guard -- intended?
+  vector<string> log;
+  int line1;
+  int line2;
+  {
+    SCOPED_WATCH_STACK(20); line1 = __LINE__;  // captured on the macro's own line, to match the logged location
+    {
+      SCOPED_WATCH_STACK(20); line2 = __LINE__;  // same for the inner scope
+      for (int i = 0; i < 50; i++) {  // poll up to 50 times, 100ms apart
+        SleepFor(MonoDelta::FromMilliseconds(100));
+        log = KernelStackWatchdog::GetInstance()->LoggedMessagesForTests();
+        if (log.size() > 3) {
+          break;
+        }
+      }
+    }
+  }
+
+  // Verify that both nested scopes were collected, identified by file name and line.
+  string s = JoinStrings(log, "\n");
+  ASSERT_STR_CONTAINS(s, Substitute("stack_watchdog-test.cc:$0", line1));
+  ASSERT_STR_CONTAINS(s, Substitute("stack_watchdog-test.cc:$0", line2));
+}
+
+TEST_F(StackWatchdogTest, TestPerformance) {
+  // Reset the check interval to be reasonable. Otherwise the benchmark
+  // wastes a lot of CPU running the watchdog thread too often.
+  FLAGS_hung_task_check_interval_ms = 500;  // the fixture's SetUp() had set it to 10
+  LOG_TIMING(INFO, "1M SCOPED_WATCH_STACK()s") {
+    for (int i = 0; i < 1000000; i++) {
+      SCOPED_WATCH_STACK(100);  // one watch scope per iteration, ended at the closing brace
+    }
+  }
+}
+
+// Stress test for short-lived threads: the watchdog may attempt to grab the stack
+// of a thread that has already exited, and must cope with that.
+//
+// It doubles as a benchmark -- stack collection is made artificially slow, and
+// starting and joining threads must remain fast regardless.
+TEST_F(StackWatchdogTest, TestShortLivedThreadsStress) {
+  // A zero check interval keeps the stack watchdog running continuously.
+  FLAGS_hung_task_check_interval_ms = 0;
+
+  // Inject latency into every stack lookup, imitating the time that symbolization
+  // and similar work add to real stack trace collection.
+  FLAGS_inject_latency_on_kernel_stack_lookup_ms = 1000;
+
+  MonoTime deadline = MonoTime::Now() + MonoDelta::FromSeconds(5);
+  vector<thread> pool(100);
+  int num_started = 0;
+  for (; MonoTime::Now() < deadline; num_started++) {
+    // Slots are reused round-robin; the previous occupant is joined first.
+    thread& slot = pool[num_started % pool.size()];
+    if (slot.joinable()) {
+      slot.join();
+    }
+    slot = thread([]() {
+        // Trigger the watchdog at 1ms but sleep for 2ms, so that the
+        // watchdog has plenty of work to do.
+        SCOPED_WATCH_STACK(1);
+        SleepFor(MonoDelta::FromMilliseconds(2));
+      });
+  }
+  for (auto& worker : pool) {
+    if (worker.joinable()) worker.join();
+  }
+  LOG(INFO) << "started and joined " << num_started << " threads";
+}
+} // namespace kudu

http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/status-test.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/status-test.cc b/be/src/kudu/util/status-test.cc
new file mode 100644
index 0000000..a0aef3d
--- /dev/null
+++ b/be/src/kudu/util/status-test.cc
@@ -0,0 +1,119 @@
+// Some portions Copyright (c) 2011 The LevelDB Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include <cerrno>
+#include <string>
+#include <utility>
+
+#include <gtest/gtest.h>
+
+#include "kudu/util/slice.h"
+#include "kudu/util/status.h"
+#include "kudu/util/test_macros.h"
+
+using std::string;
+
+namespace kudu {
+
+TEST(StatusTest, TestPosixCode) {
+  Status ok = Status::OK();
+  ASSERT_EQ(0, ok.posix_code());  // an OK status reports posix code 0
+  Status file_error = Status::IOError("file error", Slice(), ENOTDIR);
+  ASSERT_EQ(ENOTDIR, file_error.posix_code());  // the errno given at construction comes back unchanged
+}
+
+TEST(StatusTest, TestToString) {
+  Status file_error = Status::IOError("file error", Slice(), ENOTDIR);  // empty second message
+  ASSERT_EQ(string("IO error: file error (error 20)"), file_error.ToString());  // assumes ENOTDIR == 20
+}
+
+TEST(StatusTest, TestClonePrepend) {
+  const Status original = Status::IOError("file error", "msg2", ENOTDIR);
+  const Status prepended = original.CloneAndPrepend("Heading");  // "Heading: " goes before the message
+  ASSERT_EQ(string("IO error: Heading: file error: msg2 (error 20)"), prepended.ToString());
+}
+
+TEST(StatusTest, TestCloneAppend) {
+  const Status base = Status::RemoteError("Application error");
+  const Status combined = base.CloneAndAppend(Status::NotFound("Unknown tablet").ToString());
+  ASSERT_EQ(string("Remote error: Application error: Not found: Unknown tablet"),
+            combined.ToString());  // the appended text follows the original message after ": "
+}
+
+TEST(StatusTest, TestMemoryUsage) {
+  ASSERT_EQ(0, Status::OK().memory_footprint_excluding_this());  // OK owns no state buffer
+  ASSERT_GT(Status::IOError(  // a non-OK status owns a buffer, so its footprint is positive
+      "file error", "some other thing", ENOTDIR).memory_footprint_excluding_this(), 0);
+}
+
+TEST(StatusTest, TestMoveConstructor) {
+  // OK->OK move should do nothing: both statuses are OK afterwards.
+  {
+    Status src = Status::OK();
+    Status dst = std::move(src);  // move construction, not assignment
+    ASSERT_OK(src); // NOLINT(bugprone-use-after-move)
+    ASSERT_OK(dst);
+  }
+
+  // Moving a not-OK status into a new one should make the moved status
+  // "OK", while the new status carries the original error.
+  {
+    Status src = Status::NotFound("foo");
+    Status dst = std::move(src);  // move construction, not assignment
+    ASSERT_OK(src); // NOLINT(bugprone-use-after-move)
+    ASSERT_EQ("Not found: foo", dst.ToString());
+  }
+}
+
+TEST(StatusTest, TestMoveAssignment) {
+  // OK->Bad move should clear the source status and also make the
+  // destination status OK.
+  {
+    Status src = Status::OK();
+    Status dst = Status::NotFound("orig dst");
+    dst = std::move(src);  // move assignment over an existing error
+    ASSERT_OK(src); // NOLINT(bugprone-use-after-move)
+    ASSERT_OK(dst);
+  }
+
+  // Bad->Bad move: the destination takes the source's error, the source becomes OK.
+  {
+    Status src = Status::NotFound("orig src");
+    Status dst = Status::NotFound("orig dst");
+    dst = std::move(src);  // "orig dst" is replaced
+    ASSERT_OK(src); // NOLINT(bugprone-use-after-move)
+    ASSERT_EQ("Not found: orig src", dst.ToString());
+  }
+
+  // Bad->OK move: the destination takes the source's error, the source becomes OK.
+  {
+    Status src = Status::NotFound("orig src");
+    Status dst = Status::OK();
+    dst = std::move(src);
+    ASSERT_OK(src); // NOLINT(bugprone-use-after-move)
+    ASSERT_EQ("Not found: orig src", dst.ToString());
+  }
+}
+
+TEST(StatusTest, TestAndThen) {
+  ASSERT_OK(Status::OK().AndThen(Status::OK)  // an all-OK chain stays OK
+                        .AndThen(Status::OK)
+                        .AndThen(Status::OK));
+
+  ASSERT_TRUE(Status::InvalidArgument("").AndThen([] { return Status::IllegalState(""); })
+                                         .IsInvalidArgument());  // the initial error is what comes out
+  ASSERT_TRUE(Status::InvalidArgument("").AndThen(Status::OK)
+                                         .IsInvalidArgument());  // an OK step does not clear the error
+  ASSERT_TRUE(Status::OK().AndThen([] { return Status::InvalidArgument(""); })
+                          .AndThen(Status::OK)
+                          .IsInvalidArgument());  // an error produced mid-chain survives later steps
+
+  ASSERT_EQ("foo: bar",
+            Status::OK().CloneAndPrepend("baz")  // expected message has no "baz": prepending to OK adds nothing
+                        .AndThen([] {
+                          return Status::InvalidArgument("bar").CloneAndPrepend("foo");
+                        }).message());
+}
+
+}  // namespace kudu

http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/status.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/status.cc b/be/src/kudu/util/status.cc
new file mode 100644
index 0000000..1197682
--- /dev/null
+++ b/be/src/kudu/util/status.cc
@@ -0,0 +1,170 @@
+// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file. See the AUTHORS file for names of contributors.
+
+#include "kudu/util/status.h"
+
+#include <cstdio>
+#include <cstring>
+
+#include <glog/logging.h>
+
+#include "kudu/gutil/strings/fastmem.h"
+#include "kudu/util/malloc.h"
+
+namespace kudu {
+
+const char* Status::CopyState(const char* state) {
+  uint32_t msg_len;  // the leading 4 bytes of 'state' hold the length of the message text
+  strings::memcpy_inlined(&msg_len, state, sizeof(msg_len));
+  char* copy = new char[msg_len + 7];  // message text plus the 7 bytes stored ahead of it
+  strings::memcpy_inlined(copy, state, msg_len + 7);
+  return copy;
+}
+
+Status::Status(Code code, const Slice& msg, const Slice& msg2,
+               int16_t posix_code) {
+  DCHECK(code != kOk);  // this constructor always allocates a state buffer
+  const uint32_t head_len = msg.size();
+  const uint32_t tail_len = msg2.size();
+  const uint32_t size = (tail_len == 0) ? head_len : head_len + 2 + tail_len;  // 2 for ": "
+  char* buf = new char[size + 7];
+  memcpy(buf, &size, sizeof(size));  // bytes 0-3: message length
+  buf[4] = static_cast<char>(code);  // byte 4: status code
+  memcpy(buf + 5, &posix_code, sizeof(posix_code));  // bytes 5-6: posix code
+  memcpy(buf + 7, msg.data(), head_len);  // byte 7 onwards: message text
+  if (tail_len != 0) {
+    buf[7 + head_len] = ':';
+    buf[8 + head_len] = ' ';
+    memcpy(buf + 9 + head_len, msg2.data(), tail_len);
+  }
+  state_ = buf;
+}
+
+std::string Status::CodeAsString() const {  // name of the status code only, without the message
+  if (state_ == nullptr) {  // no state buffer means the status is OK
+    return "OK";
+  }
+
+  const char* type = "Unknown";  // fallback so an unmatched code never reads an uninitialized pointer
+  switch (code()) {  // deliberately no 'default:' so missing-enumerator warnings stay effective
+    case kOk:
+      type = "OK";
+      break;
+    case kNotFound:
+      type = "Not found";
+      break;
+    case kCorruption:
+      type = "Corruption";
+      break;
+    case kNotSupported:
+      type = "Not implemented";
+      break;
+    case kInvalidArgument:
+      type = "Invalid argument";
+      break;
+    case kIOError:
+      type = "IO error";
+      break;
+    case kAlreadyPresent:
+      type = "Already present";
+      break;
+    case kRuntimeError:
+      type = "Runtime error";
+      break;
+    case kNetworkError:
+      type = "Network error";
+      break;
+    case kIllegalState:
+      type = "Illegal state";
+      break;
+    case kNotAuthorized:
+      type = "Not authorized";
+      break;
+    case kAborted:
+      type = "Aborted";
+      break;
+    case kRemoteError:
+      type = "Remote error";
+      break;
+    case kServiceUnavailable:
+      type = "Service unavailable";
+      break;
+    case kTimedOut:
+      type = "Timed out";
+      break;
+    case kUninitialized:
+      type = "Uninitialized";
+      break;
+    case kConfigurationError:
+      type = "Configuration error";
+      break;
+    case kIncomplete:
+      type = "Incomplete";
+      break;
+    case kEndOfFile:
+      type = "End of file";
+      break;
+  }
+  return std::string(type);
+}
+
+std::string Status::ToString() const {
+  std::string text(CodeAsString());
+  if (state_ == nullptr) {  // OK: the code name is the whole string
+    return text;
+  }
+
+  const Slice detail = message();
+  text.append(": ");
+  text.append(reinterpret_cast<const char*>(detail.data()), detail.size());
+  const int16_t err = posix_code();
+  if (err != -1) {  // -1 is treated as "no posix code": no suffix is added
+    char suffix[64];
+    snprintf(suffix, sizeof(suffix), " (error %d)", err);
+    text.append(suffix);
+  }
+  return text;
+}
+
+Slice Status::message() const {
+  // Without a state buffer there is no message; return an empty Slice.
+  if (!state_) return Slice();
+
+  // The length lives in the first 4 bytes; the text starts at offset 7.
+  uint32_t msg_len;
+  memcpy(&msg_len, state_, sizeof(msg_len));
+  return Slice(state_ + 7, msg_len);
+}
+
+int16_t Status::posix_code() const {
+  // Without a state buffer there is nothing stored; report 0.
+  if (!state_) return 0;
+  // The code is stored as an int16_t at offset 5 of the state buffer.
+  int16_t stored;
+  memcpy(&stored, state_ + 5, sizeof(stored));
+  return stored;
+}
+
+Status Status::CloneAndPrepend(const Slice& msg) const {
+  if (!ok()) {  // same code and posix code; 'msg' becomes the first part of the message
+    return Status(code(), msg, message(), posix_code());
+  }
+  return *this;  // an OK status is copied unchanged
+}
+
+Status Status::CloneAndAppend(const Slice& msg) const {
+  if (!ok()) {  // same code and posix code; 'msg' becomes the second part of the message
+    return Status(code(), message(), msg, posix_code());
+  }
+  return *this;  // an OK status is copied unchanged
+}
+
+size_t Status::memory_footprint_excluding_this() const {  // size of the state buffer only
+  return state_ ? kudu_malloc_usable_size(state_) : 0;  // null state_ (OK status): nothing allocated
+}
+
+size_t Status::memory_footprint_including_this() const {  // state buffer plus the Status object itself
+  return kudu_malloc_usable_size(this) + memory_footprint_excluding_this();  // NOTE(review): presumably expects a heap-allocated Status -- confirm
+}
+}  // namespace kudu