Posted to commits@impala.apache.org by ta...@apache.org on 2018/07/13 06:03:39 UTC

[27/51] [abbrv] impala git commit: IMPALA-7006: Add KRPC folders from kudu@334ecafd

http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/cow_object.h
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/cow_object.h b/be/src/kudu/util/cow_object.h
new file mode 100644
index 0000000..159a8bb
--- /dev/null
+++ b/be/src/kudu/util/cow_object.h
@@ -0,0 +1,437 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#pragma once
+
+#include <algorithm> // IWYU pragma: keep
+#include <map>
+#include <memory>
+#include <ostream>
+
+#include <glog/logging.h>
+
+#include "kudu/gutil/macros.h"
+#include "kudu/util/rwc_lock.h"
+
+namespace kudu {
+
+// An object which manages its state via copy-on-write.
+//
+// Access to this object can be done more conveniently using the
+// CowLock template class defined below.
+//
+// The 'State' template parameter must be swappable using std::swap.
+template<class State>
+class CowObject {
+ public:
+  CowObject() {}
+  ~CowObject() {}
+
+  // Lock an object for read.
+  //
+  // While locked, a mutator will be blocked when trying to commit its mutation.
+  void ReadLock() const {
+    lock_.ReadLock();
+  }
+
+  // Return whether the object is locked for read.
+  bool IsReadLocked() const {
+    return lock_.HasReaders();
+  }
+
+  // Unlock an object previously locked for read, unblocking a mutator
+  // actively trying to commit its mutation.
+  void ReadUnlock() const {
+    lock_.ReadUnlock();
+  }
+
+  // Lock the object for write (preventing concurrent mutators).
+  //
+  // We defer making a dirty copy of the state to mutable_dirty() so that the
+  // copy can be avoided if no dirty changes are actually made.
+  void StartMutation() {
+    lock_.WriteLock();
+  }
+
+  // Return whether the object is locked for read and write.
+  bool IsWriteLocked() const {
+    return lock_.HasWriteLock();
+  }
+
+  // Abort the current mutation. This drops the write lock without applying any
+  // changes made to the mutable copy.
+  void AbortMutation() {
+    DCHECK(lock_.HasWriteLock());
+    dirty_state_.reset();
+    lock_.WriteUnlock();
+  }
+
+  // Commit the current mutation. This escalates to the "Commit" lock, which
+  // blocks any concurrent readers or writers, swaps in the new version of the
+  // State, and then drops the commit lock.
+  void CommitMutation() {
+    DCHECK(lock_.HasWriteLock());
+    if (!dirty_state_) {
+      AbortMutation();
+      return;
+    }
+    lock_.UpgradeToCommitLock();
+    std::swap(state_, *dirty_state_);
+    dirty_state_.reset();
+    lock_.CommitUnlock();
+  }
+
+  // Return the current state, not reflecting any in-progress mutations.
+  State& state() {
+    DCHECK(lock_.HasReaders() || lock_.HasWriteLock());
+    return state_;
+  }
+
+  const State& state() const {
+    DCHECK(lock_.HasReaders() || lock_.HasWriteLock());
+    return state_;
+  }
+
+  // Returns the current dirty state (i.e. reflecting in-progress mutations).
+  // Should only be called by the thread that previously called StartMutation().
+  State* mutable_dirty() {
+    DCHECK(lock_.HasWriteLock());
+    if (!dirty_state_) {
+      dirty_state_.reset(new State(state_));
+    }
+    return dirty_state_.get();
+  }
+
+  const State& dirty() const {
+    DCHECK(lock_.HasWriteLock());
+    if (!dirty_state_) {
+      return state_;
+    }
+    return *dirty_state_.get();
+  }
+
+ private:
+  mutable RWCLock lock_;
+
+  State state_;
+  std::unique_ptr<State> dirty_state_;
+
+  DISALLOW_COPY_AND_ASSIGN(CowObject);
+};
+
+// Lock state for the following lock-guard-like classes.
+enum class LockMode {
+  // The lock is held for reading.
+  READ,
+
+  // The lock is held for reading and writing.
+  WRITE,
+
+  // The lock is not held.
+  RELEASED
+};
+
+// Defined so LockMode is compatible with DCHECK and the like.
+std::ostream& operator<<(std::ostream& o, LockMode m);
+
+// A lock-guard-like scoped object to acquire the lock on a CowObject,
+// and obtain a pointer to the correct copy to read/write.
+//
+// Example usage:
+//
+//   CowObject<Foo> my_obj;
+//   {
+//     CowLock<Foo> l(&my_obj, LockMode::READ);
+//     l.data().get_foo();
+//     ...
+//   }
+//   {
+//     CowLock<Foo> l(&my_obj, LockMode::WRITE);
+//     l.mutable_data()->set_foo(...);
+//     ...
+//     l.Commit();
+//   }
+template<class State>
+class CowLock {
+ public:
+  // An unlocked CowLock. This is useful for default-constructing a lock to be
+  // moved into.
+  CowLock()
+    : cow_(nullptr),
+      mode_(LockMode::RELEASED) {
+  }
+
+  // Lock in either read or write mode.
+  CowLock(CowObject<State>* cow,
+          LockMode mode)
+    : cow_(cow),
+      mode_(mode) {
+    switch (mode) {
+      case LockMode::READ: cow_->ReadLock(); break;
+      case LockMode::WRITE: cow_->StartMutation(); break;
+      default: LOG(FATAL) << "Cannot lock in mode " << mode;
+    }
+  }
+
+  // Lock in read mode.
+  // A const object may not be locked in write mode.
+  CowLock(const CowObject<State>* info,
+          LockMode mode)
+    : cow_(const_cast<CowObject<State>*>(info)),
+      mode_(mode) {
+    switch (mode) {
+      case LockMode::READ: cow_->ReadLock(); break;
+      case LockMode::WRITE: LOG(FATAL) << "Cannot write-lock a const pointer";
+      default: LOG(FATAL) << "Cannot lock in mode " << mode;
+    }
+  }
+
+  // Disable copying.
+  CowLock(const CowLock&) = delete;
+  CowLock& operator=(const CowLock&) = delete;
+
+  // Allow moving.
+  CowLock(CowLock&& other) noexcept
+    : cow_(other.cow_),
+      mode_(other.mode_) {
+    other.cow_ = nullptr;
+    other.mode_ = LockMode::RELEASED;
+  }
+  CowLock& operator=(CowLock&& other) noexcept {
+    cow_ = other.cow_;
+    mode_ = other.mode_;
+    other.cow_ = nullptr;
+    other.mode_ = LockMode::RELEASED;
+    return *this;
+  }
+
+  // Commit the underlying object.
+  // Requires that the caller hold the lock in write mode.
+  void Commit() {
+    DCHECK_EQ(LockMode::WRITE, mode_);
+    cow_->CommitMutation();
+    mode_ = LockMode::RELEASED;
+  }
+
+  void Unlock() {
+    switch (mode_) {
+      case LockMode::READ: cow_->ReadUnlock(); break;
+      case LockMode::WRITE: cow_->AbortMutation(); break;
+      default: DCHECK_EQ(LockMode::RELEASED, mode_); break;
+    }
+    mode_ = LockMode::RELEASED;
+  }
+
+  // Obtain the underlying data. In WRITE mode, this returns the
+  // same data as mutable_data() (not the safe unchanging copy).
+  const State& data() const {
+    switch (mode_) {
+      case LockMode::READ: return cow_->state();
+      case LockMode::WRITE: return cow_->dirty();
+      default: LOG(FATAL) << "Cannot access data after committing";
+    }
+  }
+
+  // Obtain the mutable data. This may only be called in WRITE mode.
+  State* mutable_data() {
+    switch (mode_) {
+      case LockMode::READ: LOG(FATAL) << "Cannot mutate data with READ lock";
+      case LockMode::WRITE: return cow_->mutable_dirty();
+      default: LOG(FATAL) << "Cannot access data after committing";
+    }
+  }
+
+  bool is_write_locked() const {
+    return mode_ == LockMode::WRITE;
+  }
+
+  // Drop the lock. If the lock is held in WRITE mode, and the
+  // lock has not yet been released, aborts the mutation, restoring
+  // the underlying object to its original data.
+  ~CowLock() {
+    Unlock();
+  }
+
+ private:
+  CowObject<State>* cow_;
+  LockMode mode_;
+};
+
+// Scoped object that locks multiple CowObjects for reading or for writing.
+// When locked for writing and mutations are completed, can also commit those
+// mutations, which releases the lock.
+//
+// CowObjects are stored in an std::map, which provides two important properties:
+// 1. AddObject() can deduplicate CowObjects already inserted.
+// 2. When locking for writing, the deterministic iteration order provided by
+//    std::map prevents deadlocks.
+//
+// The use of std::map forces callers to provide a key for each CowObject. For
+// a key implementation to be usable, an appropriate overload of operator<
+// must be available.
+//
+// Unlike CowLock, does not mediate access to the CowObject data itself;
+// callers should access the data out of band.
+//
+// Sample usage:
+//
+//   struct Foo {
+//     string id_;
+//     string data_;
+//   };
+//
+//   vector<CowObject<Foo>> foos;
+//
+// 1. Locking a group of CowObjects for reading:
+//
+//   CowGroupLock<string, Foo> l(LockMode::RELEASED);
+//   for (const auto& f : foos) {
+//     l.AddObject(f.id_, &f);
+//   }
+//   l.Lock(LockMode::READ);
+//   for (const auto& f : foos) {
+//     cout << f.state().data_ << endl;
+//   }
+//   l.Unlock();
+//
+// 2. Tracking already-write-locked CowObjects for group commit:
+//
+//   CowGroupLock<string, Foo> l(LockMode::WRITE);
+//   for (auto& f : foos) {
+//     l.AddObject(f.id_, &f);
+//     f.mutable_dirty()->data_ = "modified";
+//   }
+//   l.Commit();
+//
+// 3. Aggregating unlocked CowObjects, locking them safely, and committing them together:
+//
+//   CowGroupLock<string, Foo> l(LockMode::RELEASED);
+//   for (const auto& f : foos) {
+//     l.AddObject(f.id_, &f);
+//   }
+//   l.Lock(LockMode::WRITE);
+//   for (auto& f : foos) {
+//     f.mutable_dirty()->data_ = "modified";
+//   }
+//   l.Commit();
+template<class Key, class Value>
+class CowGroupLock {
+ public:
+  explicit CowGroupLock(LockMode mode)
+    : mode_(mode) {
+  }
+
+  ~CowGroupLock() {
+    Unlock();
+  }
+
+  void Unlock() {
+    switch (mode_) {
+      case LockMode::READ:
+        for (const auto& e : cows_) {
+          e.second->ReadUnlock();
+        }
+        break;
+      case LockMode::WRITE:
+        for (const auto& e : cows_) {
+          e.second->AbortMutation();
+        }
+        break;
+      default:
+        DCHECK_EQ(LockMode::RELEASED, mode_);
+        break;
+    }
+
+    cows_.clear();
+    mode_ = LockMode::RELEASED;
+  }
+
+  void Lock(LockMode new_mode) {
+    DCHECK_EQ(LockMode::RELEASED, mode_);
+
+    switch (new_mode) {
+      case LockMode::READ:
+        for (const auto& e : cows_) {
+          e.second->ReadLock();
+        }
+        break;
+      case LockMode::WRITE:
+        for (const auto& e : cows_) {
+          e.second->StartMutation();
+        }
+        break;
+      default:
+        LOG(FATAL) << "Cannot lock in mode " << new_mode;
+    }
+    mode_ = new_mode;
+  }
+
+  void Commit() {
+    DCHECK_EQ(LockMode::WRITE, mode_);
+    for (const auto& e : cows_) {
+      e.second->CommitMutation();
+    }
+    cows_.clear();
+    mode_ = LockMode::RELEASED;
+  }
+
+  // Adds a new CowObject to be tracked by the lock guard. Does nothing if a
+  // CowObject with the same key was already added.
+  //
+  // It is the responsibility of the caller to ensure:
+  // 1. That 'object' remains alive until the lock is released.
+  // 2. That if 'object' was already added, both objects point to the same
+  //    memory address.
+  // 3. That if the CowGroupLock is already locked in a particular mode,
+  //    'object' is also already locked in that mode.
+  void AddObject(Key key, const CowObject<Value>* object) {
+    AssertObjectLocked(object);
+    auto r = cows_.emplace(std::move(key), const_cast<CowObject<Value>*>(object));
+    DCHECK_EQ(r.first->second, object);
+  }
+
+  // Like the above, but for mutable objects.
+  void AddMutableObject(Key key, CowObject<Value>* object) {
+    AssertObjectLocked(object);
+    auto r = cows_.emplace(std::move(key), object);
+    DCHECK_EQ(r.first->second, object);
+  }
+
+ private:
+  void AssertObjectLocked(const CowObject<Value>* object) const {
+#ifndef NDEBUG
+    switch (mode_) {
+      case LockMode::READ:
+        DCHECK(object->IsReadLocked());
+        break;
+      case LockMode::WRITE:
+        DCHECK(object->IsWriteLocked());
+        break;
+      default:
+        DCHECK_EQ(LockMode::RELEASED, mode_);
+        break;
+    }
+#endif
+  }
+
+  std::map<Key, CowObject<Value>*> cows_;
+  LockMode mode_;
+
+  DISALLOW_COPY_AND_ASSIGN(CowGroupLock);
+};
+
+} // namespace kudu
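
For orientation, here is a minimal usage sketch of the CowObject/CowLock pair
added above. It is not part of the commit; the Foo struct and its field are
hypothetical, shown only to illustrate the read/write/commit protocol:

    #include <string>
    #include "kudu/util/cow_object.h"

    struct Foo {                      // hypothetical payload type
      std::string data_;
    };

    kudu::CowObject<Foo> obj;

    void Reader() {
      kudu::CowLock<Foo> l(&obj, kudu::LockMode::READ);
      // Readers always see the last committed state; an in-progress
      // mutation stays invisible until CommitMutation() swaps it in.
      std::string d = l.data().data_;
    }

    void Writer() {
      kudu::CowLock<Foo> l(&obj, kudu::LockMode::WRITE);
      l.mutable_data()->data_ = "new value";  // first call makes the dirty copy
      l.Commit();  // swap in the dirty copy and release the lock
    }

If Writer() returned before calling Commit(), the CowLock destructor would
abort the mutation and the committed state would remain unchanged.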

http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/crc-test.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/crc-test.cc b/be/src/kudu/util/crc-test.cc
new file mode 100644
index 0000000..cf13268
--- /dev/null
+++ b/be/src/kudu/util/crc-test.cc
@@ -0,0 +1,112 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <cstdint>
+#include <cstring>
+#include <ostream>
+#include <string>
+
+#include <glog/logging.h>
+#include <gtest/gtest.h>
+
+#include "kudu/gutil/gscoped_ptr.h"
+#include "kudu/gutil/strings/numbers.h"
+#include "kudu/gutil/strings/substitute.h"
+#include "kudu/util/crc.h"
+#include "kudu/util/stopwatch.h"
+#include "kudu/util/test_util.h"
+
+namespace kudu {
+namespace crc {
+
+using strings::Substitute;
+
+class CrcTest : public KuduTest {
+ protected:
+
+  // Generates benchmark data, returned via 'bufptr'/'buflen'. The caller
+  // takes ownership and must delete[] the buffer.
+  static void GenerateBenchmarkData(const uint8_t** bufptr, size_t* buflen) {
+    const uint32_t kNumNumbers = 1000000;
+    const uint32_t kBytesPerNumber = sizeof(uint32_t);
+    const uint32_t kLength = kNumNumbers * kBytesPerNumber;
+    auto buf = new uint8_t[kLength];
+    for (uint32_t i = 0; i < kNumNumbers; i++) {
+      memcpy(buf + (i * kBytesPerNumber), &i, kBytesPerNumber);
+    }
+    *bufptr = buf;
+    *buflen = kLength;
+  }
+
+};
+
+// Basic functionality test.
+TEST_F(CrcTest, TestCRC32C) {
+  const std::string test_data("abcdefgh");
+  const uint64_t kExpectedCrc = 0xa9421b7; // Known value from crcutil usage test program.
+
+  Crc* crc32c = GetCrc32cInstance();
+  uint64_t data_crc = 0;
+  crc32c->Compute(test_data.data(), test_data.length(), &data_crc);
+  char buf[kFastToBufferSize];
+  const char* output = FastHex64ToBuffer(data_crc, buf);
+  LOG(INFO) << "CRC32C of " << test_data << " is: 0x" << output << " (full 64 bits)";
+  output = FastHex32ToBuffer(static_cast<uint32_t>(data_crc), buf);
+  LOG(INFO) << "CRC32C of " << test_data << " is: 0x" << output << " (truncated 32 bits)";
+  ASSERT_EQ(kExpectedCrc, data_crc);
+
+  // Using helper
+  uint64_t data_crc2 = Crc32c(test_data.data(), test_data.length());
+  ASSERT_EQ(kExpectedCrc, data_crc2);
+
+  // Using multiple chunks
+  size_t half_length = test_data.length() / 2;
+  uint64_t data_crc3 = Crc32c(test_data.data(), half_length);
+  data_crc3 = Crc32c(test_data.data() + half_length, half_length, data_crc3);
+  ASSERT_EQ(kExpectedCrc, data_crc3);
+}
+
+// Simple benchmark of CRC32C throughput.
+// We should expect about 8 bytes per cycle in throughput on a single core.
+TEST_F(CrcTest, BenchmarkCRC32C) {
+  gscoped_ptr<const uint8_t[]> data;
+  const uint8_t* buf;
+  size_t buflen;
+  GenerateBenchmarkData(&buf, &buflen);
+  data.reset(buf);
+  Crc* crc32c = GetCrc32cInstance();
+  int kNumRuns = 1000;
+  if (AllowSlowTests()) {
+    kNumRuns = 40000;
+  }
+  const uint64_t kNumBytes = kNumRuns * buflen;
+  Stopwatch sw;
+  sw.start();
+  for (int i = 0; i < kNumRuns; i++) {
+    uint64_t cksum;
+    crc32c->Compute(buf, buflen, &cksum);
+  }
+  sw.stop();
+  CpuTimes elapsed = sw.elapsed();
+  LOG(INFO) << Substitute("$0 runs of CRC32C on $1 bytes of data (total: $2 bytes)"
+                          " in $3 seconds; $4 bytes per millisecond, $5 bytes per nanosecond!",
+                          kNumRuns, buflen, kNumBytes, elapsed.wall_seconds(),
+                          (kNumBytes / elapsed.wall_millis()),
+                          (kNumBytes / elapsed.wall));
+}
+
+} // namespace crc
+} // namespace kudu

http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/crc.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/crc.cc b/be/src/kudu/util/crc.cc
new file mode 100644
index 0000000..1534b8d
--- /dev/null
+++ b/be/src/kudu/util/crc.cc
@@ -0,0 +1,56 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#include "kudu/util/crc.h"
+
+#include <crcutil/interface.h>
+
+#include "kudu/gutil/once.h"
+#include "kudu/util/debug/leakcheck_disabler.h"
+
+namespace kudu {
+namespace crc {
+
+using debug::ScopedLeakCheckDisabler;
+
+static GoogleOnceType crc32c_once = GOOGLE_ONCE_INIT;
+static Crc* crc32c_instance = nullptr;
+
+static void InitCrc32cInstance() {
+  ScopedLeakCheckDisabler disabler; // CRC instance is never freed.
+  // TODO: Is initial = 0 and roll window = 4 appropriate for all cases?
+  crc32c_instance = crcutil_interface::CRC::CreateCrc32c(true, 0, 4, nullptr);
+}
+
+Crc* GetCrc32cInstance() {
+  GoogleOnceInit(&crc32c_once, &InitCrc32cInstance);
+  return crc32c_instance;
+}
+
+uint32_t Crc32c(const void* data, size_t length) {
+  uint64_t crc32 = 0;
+  GetCrc32cInstance()->Compute(data, length, &crc32);
+  return static_cast<uint32_t>(crc32); // Only uses lower 32 bits.
+}
+
+uint32_t Crc32c(const void* data, size_t length, uint32_t prev_crc32) {
+  uint64_t crc_tmp = static_cast<uint64_t>(prev_crc32);
+  GetCrc32cInstance()->Compute(data, length, &crc_tmp);
+  return static_cast<uint32_t>(crc_tmp); // Only uses lower 32 bits.
+}
+
+} // namespace crc
+} // namespace kudu

http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/crc.h
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/crc.h b/be/src/kudu/util/crc.h
new file mode 100644
index 0000000..a5db4ea
--- /dev/null
+++ b/be/src/kudu/util/crc.h
@@ -0,0 +1,43 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#ifndef KUDU_UTIL_CRC_H_
+#define KUDU_UTIL_CRC_H_
+
+#include <stdint.h>
+#include <stdlib.h>
+
+#include <crcutil/interface.h>
+
+namespace kudu {
+namespace crc {
+
+typedef crcutil_interface::CRC Crc;
+
+// Returns pointer to singleton instance of CRC32C implementation.
+Crc* GetCrc32cInstance();
+
+// Convenience function to calculate the CRC32C of the given data.
+uint32_t Crc32c(const void* data, size_t length);
+
+// Given the CRC32C of a previous chunk of data, extends it over a new
+// chunk and returns the result.
+uint32_t Crc32c(const void* data, size_t length, uint32_t prev_crc32);
+
+} // namespace crc
+} // namespace kudu
+
+#endif // KUDU_UTIL_CRC_H_
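
As a quick illustration (not part of the patch), the two Crc32c() overloads
declared above compose: checksumming a buffer in chunks, threading the previous
CRC through the three-argument overload, yields the same value as a single-shot
computation. This mirrors the "multiple chunks" case in crc-test.cc:

    #include <cassert>
    #include <cstring>
    #include "kudu/util/crc.h"

    void ChunkedCrcExample() {
      const char buf[] = "abcdefgh";
      const size_t len = strlen(buf);

      // Single-shot CRC32C of the whole buffer.
      uint32_t whole = kudu::crc::Crc32c(buf, len);

      // Incremental: checksum the first half, then extend over the second.
      uint32_t part = kudu::crc::Crc32c(buf, len / 2);
      part = kudu::crc::Crc32c(buf + len / 2, len - len / 2, part);

      assert(whole == part);
    }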

http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/curl_util.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/curl_util.cc b/be/src/kudu/util/curl_util.cc
new file mode 100644
index 0000000..4eddb64
--- /dev/null
+++ b/be/src/kudu/util/curl_util.cc
@@ -0,0 +1,130 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "kudu/util/curl_util.h"
+
+#include <cstddef>
+#include <cstdint>
+#include <ostream>
+
+#include <curl/curl.h>
+#include <glog/logging.h>
+
+#include "kudu/gutil/strings/substitute.h"
+#include "kudu/security/openssl_util.h"
+#include "kudu/util/faststring.h"
+#include "kudu/util/scoped_cleanup.h"
+
+namespace kudu {
+
+namespace {
+
+inline Status TranslateError(CURLcode code) {
+  if (code == CURLE_OK) {
+    return Status::OK();
+  }
+  return Status::NetworkError("curl error", curl_easy_strerror(code));
+}
+
+extern "C" {
+size_t WriteCallback(void* buffer, size_t size, size_t nmemb, void* user_ptr) {
+  size_t real_size = size * nmemb;
+  faststring* buf = reinterpret_cast<faststring*>(user_ptr);
+  CHECK_NOTNULL(buf)->append(reinterpret_cast<const uint8_t*>(buffer), real_size);
+  return real_size;
+}
+} // extern "C"
+
+} // anonymous namespace
+
+EasyCurl::EasyCurl() {
+  // Use our own SSL initialization, and disable curl's.
+  // Both of these calls are idempotent.
+  security::InitializeOpenSSL();
+  CHECK_EQ(0, curl_global_init(CURL_GLOBAL_DEFAULT & ~CURL_GLOBAL_SSL));
+  curl_ = curl_easy_init();
+  CHECK(curl_) << "Could not init curl";
+}
+
+EasyCurl::~EasyCurl() {
+  curl_easy_cleanup(curl_);
+}
+
+Status EasyCurl::FetchURL(const std::string& url, faststring* dst,
+                          const std::vector<std::string>& headers) {
+  return DoRequest(url, nullptr, dst, headers);
+}
+
+Status EasyCurl::PostToURL(const std::string& url,
+                           const std::string& post_data,
+                           faststring* dst) {
+  return DoRequest(url, &post_data, dst);
+}
+
+Status EasyCurl::DoRequest(const std::string& url,
+                           const std::string* post_data,
+                           faststring* dst,
+                           const std::vector<std::string>& headers) {
+  CHECK_NOTNULL(dst)->clear();
+
+  if (!verify_peer_) {
+    RETURN_NOT_OK(TranslateError(curl_easy_setopt(
+        curl_, CURLOPT_SSL_VERIFYHOST, 0)));
+    RETURN_NOT_OK(TranslateError(curl_easy_setopt(
+        curl_, CURLOPT_SSL_VERIFYPEER, 0)));
+  }
+
+  // Add headers if specified.
+  struct curl_slist* curl_headers = nullptr;
+  auto clean_up_curl_slist = MakeScopedCleanup([&]() {
+    curl_slist_free_all(curl_headers);
+  });
+
+  for (const auto& header : headers) {
+    curl_headers = CHECK_NOTNULL(curl_slist_append(curl_headers, header.c_str()));
+  }
+  RETURN_NOT_OK(TranslateError(curl_easy_setopt(curl_, CURLOPT_HTTPHEADER, curl_headers)));
+
+  RETURN_NOT_OK(TranslateError(curl_easy_setopt(curl_, CURLOPT_URL, url.c_str())));
+  if (return_headers_) {
+    RETURN_NOT_OK(TranslateError(curl_easy_setopt(curl_, CURLOPT_HEADER, 1)));
+  }
+  RETURN_NOT_OK(TranslateError(curl_easy_setopt(curl_, CURLOPT_WRITEFUNCTION, WriteCallback)));
+  RETURN_NOT_OK(TranslateError(curl_easy_setopt(curl_, CURLOPT_WRITEDATA,
+                                                static_cast<void *>(dst))));
+  if (post_data) {
+    RETURN_NOT_OK(TranslateError(curl_easy_setopt(curl_, CURLOPT_POSTFIELDS,
+                                                  post_data->c_str())));
+  }
+
+  RETURN_NOT_OK(TranslateError(curl_easy_setopt(curl_, CURLOPT_HTTPAUTH, CURLAUTH_ANY)));
+  if (timeout_.Initialized()) {
+    RETURN_NOT_OK(TranslateError(curl_easy_setopt(curl_, CURLOPT_NOSIGNAL, 1)));
+    RETURN_NOT_OK(TranslateError(curl_easy_setopt(curl_, CURLOPT_TIMEOUT_MS,
+        timeout_.ToMilliseconds())));
+  }
+  RETURN_NOT_OK(TranslateError(curl_easy_perform(curl_)));
+  long rc; // NOLINT(*) curl wants a long
+  RETURN_NOT_OK(TranslateError(curl_easy_getinfo(curl_, CURLINFO_RESPONSE_CODE, &rc)));
+  if (rc != 200) {
+    return Status::RemoteError(strings::Substitute("HTTP $0", rc));
+  }
+
+  return Status::OK();
+}
+
+} // namespace kudu

http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/curl_util.h
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/curl_util.h b/be/src/kudu/util/curl_util.h
new file mode 100644
index 0000000..cccd2db
--- /dev/null
+++ b/be/src/kudu/util/curl_util.h
@@ -0,0 +1,92 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#ifndef KUDU_UTIL_CURL_UTIL_H
+#define KUDU_UTIL_CURL_UTIL_H
+
+#include <string>
+#include <vector>
+
+#include "kudu/gutil/macros.h"
+#include "kudu/util/monotime.h"
+#include "kudu/util/status.h"
+
+typedef void CURL;
+
+namespace kudu {
+
+class faststring;
+
+// Simple wrapper around curl's "easy" interface, allowing the user to
+// fetch web pages into memory using a blocking API.
+//
+// This is not thread-safe.
+class EasyCurl {
+ public:
+  EasyCurl();
+  ~EasyCurl();
+
+  // Fetch the given URL into the provided buffer.
+  // Any existing data in the buffer is replaced.
+  // The optional parameter 'headers' holds additional request headers,
+  // e.g. {"Accept-Encoding: gzip"}.
+  Status FetchURL(const std::string& url,
+                  faststring* dst,
+                  const std::vector<std::string>& headers = {});
+
+  // Issue an HTTP POST to the given URL with the given data.
+  // Returns results in 'dst' as above.
+  Status PostToURL(const std::string& url,
+                   const std::string& post_data,
+                   faststring* dst);
+
+  // Set whether to verify the server's SSL certificate in the case of an HTTPS
+  // connection.
+  void set_verify_peer(bool verify) {
+    verify_peer_ = verify;
+  }
+
+  void set_return_headers(bool v) {
+    return_headers_ = v;
+  }
+
+  void set_timeout(MonoDelta t) {
+    timeout_ = t;
+  }
+
+ private:
+  // Do a request. If 'post_data' is non-NULL, does a POST.
+  // Otherwise, does a GET.
+  Status DoRequest(const std::string& url,
+                   const std::string* post_data,
+                   faststring* dst,
+                   const std::vector<std::string>& headers = {});
+  CURL* curl_;
+
+  // Whether to verify the server certificate.
+  bool verify_peer_ = true;
+
+  // Whether to return the HTTP headers with the response.
+  bool return_headers_ = false;
+
+  MonoDelta timeout_;
+
+  DISALLOW_COPY_AND_ASSIGN(EasyCurl);
+};
+
+} // namespace kudu
+
+#endif /* KUDU_UTIL_CURL_UTIL_H */
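
A minimal sketch of using the EasyCurl API declared above (not part of the
commit; the URL is a placeholder):

    #include <string>
    #include "kudu/util/curl_util.h"
    #include "kudu/util/faststring.h"
    #include "kudu/util/monotime.h"
    #include "kudu/util/status.h"

    kudu::Status FetchExample(std::string* out) {
      kudu::EasyCurl curl;
      curl.set_timeout(kudu::MonoDelta::FromSeconds(5));  // bound the request
      // Peer verification is on by default; it can be disabled, e.g. for
      // endpoints with self-signed certificates:
      //   curl.set_verify_peer(false);
      kudu::faststring buf;
      RETURN_NOT_OK(curl.FetchURL("http://example.com/metrics", &buf,
                                  { "Accept-Encoding: gzip" }));
      *out = buf.ToString();
      return kudu::Status::OK();
    }

Note that DoRequest() treats any response code other than 200 as a
RemoteError, so callers only need to check the returned Status.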

http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/debug-util-test.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/debug-util-test.cc b/be/src/kudu/util/debug-util-test.cc
new file mode 100644
index 0000000..25e4ae0
--- /dev/null
+++ b/be/src/kudu/util/debug-util-test.cc
@@ -0,0 +1,458 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <dlfcn.h>
+#ifdef __linux__
+#include <link.h>
+#endif
+#include <unistd.h>
+
+#include <algorithm>
+#include <csignal>
+#include <cstddef>
+#include <cstdint>
+#include <memory>
+#include <ostream>
+#include <string>
+#include <vector>
+
+#include <gflags/gflags_declare.h>
+#include <glog/logging.h>
+#include <glog/stl_logging.h>
+#include <gtest/gtest.h>
+
+#include "kudu/gutil/ref_counted.h"
+#include "kudu/gutil/walltime.h"
+#include "kudu/util/array_view.h"
+#include "kudu/util/countdown_latch.h"
+#include "kudu/util/debug-util.h"
+#include "kudu/util/kernel_stack_watchdog.h"
+#include "kudu/util/monotime.h"
+#include "kudu/util/scoped_cleanup.h"
+#include "kudu/util/status.h"
+#include "kudu/util/test_macros.h"
+#include "kudu/util/test_util.h"
+#include "kudu/util/thread.h"
+
+using std::string;
+using std::vector;
+
+DECLARE_int32(test_timeout_after);
+DECLARE_int32(stress_cpu_threads);
+
+namespace kudu {
+
+class DebugUtilTest : public KuduTest {
+};
+
+TEST_F(DebugUtilTest, TestStackTrace) {
+  StackTrace t;
+  t.Collect(0);
+  string trace = t.Symbolize();
+  ASSERT_STR_CONTAINS(trace, "kudu::DebugUtilTest_TestStackTrace_Test::TestBody");
+}
+
+// DumpThreadStack is only supported on Linux, since the implementation relies
+// on the tgkill syscall which is not portable.
+#if defined(__linux__)
+
+namespace {
+void SleeperThread(CountDownLatch* l) {
+  // We use an infinite loop around WaitFor() instead of a normal Wait()
+  // so that this test passes in TSAN. Without this, we run into this TSAN
+  // bug which prevents the sleeping thread from handling signals:
+  // https://code.google.com/p/thread-sanitizer/issues/detail?id=91
+  while (!l->WaitFor(MonoDelta::FromMilliseconds(10))) {
+  }
+}
+
+void fake_signal_handler(int signum) {}
+
+bool IsSignalHandlerRegistered(int signum) {
+  struct sigaction cur_action;
+  CHECK_EQ(0, sigaction(signum, nullptr, &cur_action));
+  return cur_action.sa_handler != SIG_DFL;
+}
+} // anonymous namespace
+
+TEST_F(DebugUtilTest, TestStackTraceInvalidTid) {
+  string s = DumpThreadStack(1);
+  ASSERT_STR_CONTAINS(s, "unable to deliver signal");
+}
+
+TEST_F(DebugUtilTest, TestStackTraceSelf) {
+  string s = DumpThreadStack(Thread::CurrentThreadId());
+  ASSERT_STR_CONTAINS(s, "kudu::DebugUtilTest_TestStackTraceSelf_Test::TestBody()");
+}
+
+TEST_F(DebugUtilTest, TestStackTraceMainThread) {
+  string s = DumpThreadStack(getpid());
+  ASSERT_STR_CONTAINS(s, "kudu::DebugUtilTest_TestStackTraceMainThread_Test::TestBody()");
+}
+
+TEST_F(DebugUtilTest, TestSignalStackTrace) {
+  CountDownLatch l(1);
+  scoped_refptr<Thread> t;
+  ASSERT_OK(Thread::Create("test", "test thread", &SleeperThread, &l, &t));
+  auto cleanup_thr = MakeScopedCleanup([&]() {
+      // Allow the thread to finish.
+      l.CountDown();
+      t->Join();
+    });
+
+  // We have to loop a little bit because it takes a little while for the thread
+  // to start up and actually call our function.
+  ASSERT_EVENTUALLY([&]() {
+      ASSERT_STR_CONTAINS(DumpThreadStack(t->tid()), "SleeperThread");
+    });
+
+  // Test that we can change the signal and that the stack traces still work,
+  // on the new signal.
+  ASSERT_FALSE(IsSignalHandlerRegistered(SIGHUP));
+  ASSERT_OK(SetStackTraceSignal(SIGHUP));
+
+  // Should now be registered.
+  ASSERT_TRUE(IsSignalHandlerRegistered(SIGHUP));
+
+  // SIGUSR2 should be relinquished.
+  ASSERT_FALSE(IsSignalHandlerRegistered(SIGUSR2));
+
+  // Stack traces should work using the new handler.
+  ASSERT_STR_CONTAINS(DumpThreadStack(t->tid()), "SleeperThread");
+
+  // Switch back to SIGUSR2 and ensure it changes back.
+  ASSERT_OK(SetStackTraceSignal(SIGUSR2));
+  ASSERT_TRUE(IsSignalHandlerRegistered(SIGUSR2));
+  ASSERT_FALSE(IsSignalHandlerRegistered(SIGHUP));
+
+  // Stack traces should work using the new handler.
+  ASSERT_STR_CONTAINS(DumpThreadStack(t->tid()), "SleeperThread");
+
+  // Register our own signal handler on SIGHUP, and ensure that
+  // we get a bad Status if we try to use it.
+  signal(SIGHUP, &fake_signal_handler);
+  ASSERT_STR_CONTAINS(SetStackTraceSignal(SIGHUP).ToString(),
+                      "unable to install signal handler");
+  signal(SIGHUP, SIG_DFL);
+
+  // Stack traces should be disabled
+  ASSERT_STR_CONTAINS(DumpThreadStack(t->tid()), "unable to take thread stack");
+
+  // Re-enable so that other tests pass.
+  ASSERT_OK(SetStackTraceSignal(SIGUSR2));
+}
+
+// Test which dumps all known threads within this process.
+// We don't validate the results in any way -- but this verifies that we can
+// dump library threads such as the libc timer_thread and properly time out.
+TEST_F(DebugUtilTest, TestSnapshot) {
+  // HACK: prior tests in this suite start threads. Even though they Join on the
+  // threads before the test case finishes, there is actually a very short
+  // period of time after Join() returns but before the actual thread has exited
+  // and removed itself from /proc/self/task/. That means that 'ListThreads' below
+  // can sometimes show these threads from prior test cases, and then the assertions
+  // in this test case would fail.
+  //
+  // So, we have to wait here for the number of running threads to level off to the
+  // expected value.
+  // Ensure Kernel Stack Watchdog is running.
+  KernelStackWatchdog::GetInstance();
+  int initial_thread_count =
+      1 // main thread
+      + 1 // KernelStackWatchdog
+      + (FLAGS_test_timeout_after > 0 ? 1 : 0) // test timeout thread if running
+      + FLAGS_stress_cpu_threads;
+#ifdef THREAD_SANITIZER
+  initial_thread_count++; // tsan signal thread
+#endif
+  // The test and runtime environment runs various utility threads (for example,
+  // the kernel stack watchdog, the TSAN runtime thread, the test timeout thread, etc).
+  // Count them before we start any additional threads for this test.
+  ASSERT_EVENTUALLY([&]{
+      vector<pid_t> threads;
+      ASSERT_OK(ListThreads(&threads));
+      ASSERT_EQ(initial_thread_count, threads.size()) << threads;
+    });
+
+  // Start a bunch of sleeping threads.
+  const int kNumThreads = 30;
+  CountDownLatch l(1);
+  vector<scoped_refptr<Thread>> threads(kNumThreads);
+  for (int i = 0; i < kNumThreads; i++) {
+    ASSERT_OK(Thread::Create("test", "test thread", &SleeperThread, &l, &threads[i]));
+  }
+
+  SCOPED_CLEANUP({
+      // Allow the thread to finish.
+      l.CountDown();
+      for (auto& t : threads) {
+        t->Join();
+      }
+    });
+
+  StackTraceSnapshot snap;
+  ASSERT_OK(snap.SnapshotAllStacks());
+  int count = 0;
+  int groups = 0;
+  snap.VisitGroups([&](ArrayView<StackTraceSnapshot::ThreadInfo> group) {
+      groups++;
+      for (const auto& info : group) {
+        count++;
+        LOG(INFO) << info.tid << " " << info.thread_name
+                  << " (" << info.status.ToString() << ")";
+      }
+      LOG(INFO) << group[0].stack.ToHexString();
+    });
+  int tsan_threads = 0;
+#ifdef THREAD_SANITIZER
+  // TSAN starts an extra thread of its own.
+  tsan_threads++;
+#endif
+  ASSERT_EQ(kNumThreads + initial_thread_count, count);
+  // The threads might not have exactly identical stacks, but
+  // we should have far fewer groups than the total number
+  // of threads.
+  ASSERT_LE(groups, kNumThreads / 2);
+  ASSERT_EQ(tsan_threads, snap.num_failed());
+}
+
+TEST_F(DebugUtilTest, Benchmark) {
+  CountDownLatch l(1);
+  scoped_refptr<Thread> t;
+  ASSERT_OK(Thread::Create("test", "test thread", &SleeperThread, &l, &t));
+  SCOPED_CLEANUP({
+      // Allow the thread to finish.
+      l.CountDown();
+      t->Join();
+    });
+
+  for (bool symbolize : {false, true}) {
+    MonoTime end_time = MonoTime::Now() + MonoDelta::FromSeconds(1);
+    int count = 0;
+    volatile int prevent_optimize = 0;
+    while (MonoTime::Now() < end_time) {
+      StackTrace trace;
+      GetThreadStack(t->tid(), &trace);
+      if (symbolize) {
+        prevent_optimize += trace.Symbolize().size();
+      }
+      count++;
+    }
+    LOG(INFO) << "Throughput: " << count << " dumps/second (symbolize=" << symbolize << ")";
+  }
+}
+
+int TakeStackTrace(struct dl_phdr_info* /*info*/, size_t /*size*/, void* data) {
+  StackTrace* s = reinterpret_cast<StackTrace*>(data);
+  s->Collect(0);
+  return 0;
+}
+
+// Test that if we try to collect a stack trace while inside a libdl function
+// call, we properly return the bogus stack indicating the issue.
+//
+// This doesn't work in ThreadSanitizer since we don't intercept dl_iterate_phdr
+// in those builds (see note in unwind_safeness.cc).
+#ifndef THREAD_SANITIZER
+TEST_F(DebugUtilTest, TestUnwindWhileUnsafe) {
+  StackTrace s;
+  dl_iterate_phdr(&TakeStackTrace, &s);
+  ASSERT_STR_CONTAINS(s.Symbolize(), "CouldNotCollectStackTraceBecauseInsideLibDl");
+}
+#endif
+
+int DoNothingDlCallback(struct dl_phdr_info* /*info*/, size_t /*size*/, void* /*data*/) {
+  return 0;
+}
+
+// Parameterized test in which a worker thread performs various operations during
+// which it might be dangerous to collect its stack trace, while the main thread
+// repeatedly takes stack traces.  These operations may all be executed by normal
+// application threads, so we need to ensure that if we happen to gather a thread's
+// stack in the middle of such a function, we don't crash or deadlock.
+//
+// Example self-deadlock if we didn't have the appropriate workarounds in place:
+//  #0  __lll_lock_wait ()
+//  #1  0x00007ffff6f16e42 in __GI___pthread_mutex_lock
+//  #2  0x00007ffff6c8601f in __GI___dl_iterate_phdr
+//  #3  0x0000000000695b02 in dl_iterate_phdr
+//  #4  0x000000000056d013 in _ULx86_64_dwarf_find_proc_info
+//  #5  0x000000000056d1d5 in fetch_proc_info (c=c@ent
+//  #6  0x000000000056e2e7 in _ULx86_64_dwarf_find_save_
+//  #7  0x000000000056c1b9 in _ULx86_64_dwarf_step (c=c@
+//  #8  0x000000000056be21 in _ULx86_64_step
+//  #9  0x0000000000566b1d in google::GetStackTrace
+//  #10 0x00000000004dc4d1 in kudu::StackTrace::Collect
+//  #11 kudu::(anonymous namespace)::HandleStackTraceSignal
+//  #12 <signal handler called>
+//  #13 0x00007ffff6f16e31 in __GI___pthread_mutex_lock
+//  #14 0x00007ffff6c8601f in __GI___dl_iterate_phdr
+//  #15 0x0000000000695b02 in dl_iterate_phdr
+enum DangerousOp {
+  DLOPEN_AND_CLOSE,
+  DL_ITERATE_PHDR,
+  GET_STACK_TRACE,
+  MALLOC_AND_FREE
+};
+class RaceTest : public DebugUtilTest, public ::testing::WithParamInterface<DangerousOp> {};
+INSTANTIATE_TEST_CASE_P(DifferentRaces, RaceTest,
+                        ::testing::Values(DLOPEN_AND_CLOSE,
+                                          DL_ITERATE_PHDR,
+                                          GET_STACK_TRACE,
+                                          MALLOC_AND_FREE));
+
+void DangerousOperationThread(DangerousOp op, CountDownLatch* l) {
+  while (l->count()) {
+    switch (op) {
+      case DLOPEN_AND_CLOSE: {
+        // Check races against dlopen/dlclose.
+        void* v = dlopen("libc.so.6", RTLD_LAZY);
+        CHECK(v);
+        dlclose(v);
+        break;
+      }
+
+      case DL_ITERATE_PHDR: {
+        // Check for races against dl_iterate_phdr.
+        dl_iterate_phdr(&DoNothingDlCallback, nullptr);
+        break;
+      }
+
+      case GET_STACK_TRACE: {
+        // Check for reentrancy issues
+        GetStackTrace();
+        break;
+      }
+
+      case MALLOC_AND_FREE: {
+        // Check large allocations in tcmalloc.
+        volatile char* x = new char[1024 * 1024 * 2];
+        delete[] x;
+        break;
+      }
+      default:
+        LOG(FATAL) << "unknown op";
+    }
+  }
+}
+
+// Starts a thread performing dangerous operations and then gathers
+// its stack trace in a loop trying to trigger races.
+TEST_P(RaceTest, TestStackTraceRaces) {
+  DangerousOp op = GetParam();
+  CountDownLatch l(1);
+  scoped_refptr<Thread> t;
+  ASSERT_OK(Thread::Create("test", "test thread", &DangerousOperationThread, op, &l, &t));
+  SCOPED_CLEANUP({
+      // Allow the thread to finish.
+      l.CountDown();
+      // Crash if we can't join the thread after a reasonable amount of time.
+      // That probably indicates a deadlock.
+      CHECK_OK(ThreadJoiner(t.get()).give_up_after_ms(10000).Join());
+    });
+  MonoTime end_time = MonoTime::Now() + MonoDelta::FromSeconds(1);
+  while (MonoTime::Now() < end_time) {
+    StackTrace trace;
+    GetThreadStack(t->tid(), &trace);
+  }
+}
+
+void BlockSignalsThread() {
+  sigset_t set;
+  sigemptyset(&set);
+  sigaddset(&set, SIGUSR2);
+  for (int i = 0; i < 3; i++) {
+    CHECK_ERR(pthread_sigmask((i % 2) ? SIG_UNBLOCK : SIG_BLOCK, &set, nullptr));
+    SleepFor(MonoDelta::FromSeconds(1));
+  }
+}
+
+TEST_F(DebugUtilTest, TestThreadBlockingSignals) {
+  scoped_refptr<Thread> t;
+  ASSERT_OK(Thread::Create("test", "test thread", &BlockSignalsThread, &t));
+  SCOPED_CLEANUP({ t->Join(); });
+  string ret;
+  while (ret.find("unable to deliver signal") == string::npos) {
+    ret = DumpThreadStack(t->tid());
+    LOG(INFO) << ret;
+  }
+}
+
+// Test stack traces which time out despite the destination thread not blocking
+// signals.
+TEST_F(DebugUtilTest, TestTimeouts) {
+  const int kRunTimeSecs = AllowSlowTests() ? 5 : 1;
+
+  CountDownLatch l(1);
+  scoped_refptr<Thread> t;
+  ASSERT_OK(Thread::Create("test", "test thread", &SleeperThread, &l, &t));
+  auto cleanup_thr = MakeScopedCleanup([&]() {
+      // Allow the thread to finish.
+      l.CountDown();
+      t->Join();
+    });
+
+  // First, time a few stack traces to determine how long a non-timed-out stack
+  // trace takes.
+  vector<MicrosecondsInt64> durations;
+  for (int i = 0; i < 20; i++) {
+    StackTrace stack;
+    auto st = GetMonoTimeMicros();
+    ASSERT_OK(GetThreadStack(t->tid(), &stack));
+    auto dur = GetMonoTimeMicros() - st;
+    durations.push_back(dur);
+  }
+
+  // Compute the median to throw out outliers.
+  std::sort(durations.begin(), durations.end());
+  auto median_duration = durations[durations.size() / 2];
+  LOG(INFO) << "Median duration: " << median_duration << "us";
+
+  // Now take a bunch of stack traces with timeouts clustered around
+  // the expected time. When we time out, we adjust the timeout to be
+  // higher so the next attempt is less likely to time out. Conversely,
+  // when we succeed, we adjust the timeout to be shorter so the next
+  // attempt is more likely to time out. This has the effect of triggering
+  // all the interesting cases: (a) success, (b) timeout, (c) timeout
+  // exactly as the signal finishes.
+  int num_timeouts = 0;
+  int num_successes = 0;
+  auto end_time = MonoTime::Now() + MonoDelta::FromSeconds(kRunTimeSecs);
+  int64_t timeout_us = median_duration;
+  while (MonoTime::Now() < end_time) {
+    StackTraceCollector stc;
+    // Allocate Stack on the heap so that if we get a use-after-free it
+    // will be caught more easily by ASAN.
+    std::unique_ptr<StackTrace> stack(new StackTrace());
+    ASSERT_OK(stc.TriggerAsync(t->tid(), stack.get()));
+    Status s = stc.AwaitCollection(MonoTime::Now() + MonoDelta::FromMicroseconds(timeout_us));
+    if (s.ok()) {
+      num_successes++;
+      timeout_us--;
+    } else if (s.IsTimedOut()) {
+      num_timeouts++;
+      timeout_us++;
+    } else {
+      FAIL() << "Unexpected status: " << s.ToString();
+    }
+  }
+  LOG(INFO) << "Timed out " << num_timeouts << " times";
+  LOG(INFO) << "Succeeded " << num_successes << " times";
+}
+
+#endif
+} // namespace kudu
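
For reference, a sketch (not part of the commit) of the two stack-trace entry
points exercised by the tests above: the synchronous DumpThreadStack(), and the
asynchronous StackTraceCollector for callers that need a strict timeout. As
noted earlier, both are Linux-only since they rely on tgkill:

    #include <memory>
    #include <glog/logging.h>
    #include "kudu/util/debug-util.h"
    #include "kudu/util/monotime.h"
    #include "kudu/util/status.h"

    void DumpExample(int64_t tid) {
      // Synchronous: signal the target thread and block until its stack arrives.
      LOG(INFO) << kudu::DumpThreadStack(tid);

      // Asynchronous with a deadline: trigger collection, wait at most 100ms.
      kudu::StackTraceCollector stc;
      std::unique_ptr<kudu::StackTrace> stack(new kudu::StackTrace());
      kudu::Status s = stc.TriggerAsync(tid, stack.get());
      if (s.ok()) {
        s = stc.AwaitCollection(kudu::MonoTime::Now() +
                                kudu::MonoDelta::FromMilliseconds(100));
      }
      if (s.ok()) {
        LOG(INFO) << stack->Symbolize();
      } else {
        LOG(WARNING) << "could not collect stack: " << s.ToString();
      }
    }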

http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/debug-util.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/debug-util.cc b/be/src/kudu/util/debug-util.cc
new file mode 100644
index 0000000..03556d6
--- /dev/null
+++ b/be/src/kudu/util/debug-util.cc
@@ -0,0 +1,800 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "kudu/util/debug-util.h"
+
+#include <dirent.h>
+#ifndef __linux__
+#include <sched.h>
+#endif
+#ifdef __linux__
+#include <syscall.h>
+#else
+#include <sys/syscall.h>
+#endif
+#include <unistd.h>
+
+#include <algorithm>
+#include <atomic>
+#include <cerrno>
+#include <climits>
+#include <csignal>
+#include <ctime>
+#include <iterator>
+#include <memory>
+#include <ostream>
+#include <string>
+
+#include <glog/logging.h>
+#include <glog/raw_logging.h>
+#ifdef __linux__
+#define UNW_LOCAL_ONLY
+#include <libunwind.h>
+#endif
+
+#include "kudu/gutil/basictypes.h"
+#include "kudu/gutil/dynamic_annotations.h"
+#include "kudu/gutil/hash/city.h"
+#include "kudu/gutil/linux_syscall_support.h"
+#include "kudu/gutil/macros.h"
+#include "kudu/gutil/once.h"
+#include "kudu/gutil/spinlock.h"
+#include "kudu/gutil/stringprintf.h"
+#include "kudu/gutil/strings/numbers.h"
+#include "kudu/gutil/strings/strip.h"
+#include "kudu/gutil/strings/substitute.h"
+#include "kudu/util/array_view.h"
+#include "kudu/util/debug/leak_annotations.h"
+#ifndef __linux__
+#include "kudu/util/debug/sanitizer_scopes.h"
+#endif
+#include "kudu/util/debug/unwind_safeness.h"
+#include "kudu/util/env.h"
+#include "kudu/util/errno.h"
+#include "kudu/util/faststring.h"
+#include "kudu/util/monotime.h"
+#include "kudu/util/os-util.h"
+#include "kudu/util/scoped_cleanup.h"
+#include "kudu/util/thread.h"
+
+using std::string;
+using std::unique_ptr;
+using std::vector;
+
+#if defined(__APPLE__)
+typedef sig_t sighandler_t;
+#endif
+
+// In coverage builds, this symbol will be defined and allows us to flush coverage info
+// to disk before exiting.
+#if defined(__APPLE__)
+  // OS X does not support weak linking at compile time properly.
+  #if defined(COVERAGE_BUILD)
+extern "C" void __gcov_flush() __attribute__((weak_import));
+  #else
+extern "C" void (*__gcov_flush)() = nullptr;
+  #endif
+#else
+extern "C" {
+__attribute__((weak))
+void __gcov_flush();
+}
+#endif
+
+// Evil hack to grab a few useful functions from glog
+namespace google {
+
+extern int GetStackTrace(void** result, int max_depth, int skip_count);
+
+// Symbolizes a program counter.  On success, returns true and writes the
+// symbol name to "out".  The symbol name is demangled if possible
+// (supports symbols generated by GCC 3.x or newer).  Otherwise,
+// returns false.
+bool Symbolize(void *pc, char *out, int out_size);
+
+namespace glog_internal_namespace_ {
+extern void DumpStackTraceToString(string *s);
+} // namespace glog_internal_namespace_
+} // namespace google
+
+// The %p field width for printf() functions is two characters per byte.
+// For some environments, add two extra bytes for the leading "0x".
+static const int kPrintfPointerFieldWidth = 2 + 2 * sizeof(void*);
+
+// The signal that we'll use to communicate with our other threads.
+// This can't be in use by other libraries in the process.
+static int g_stack_trace_signum = SIGUSR2;
+
+// Protects g_stack_trace_signum and the installation of the signal
+// handler.
+static base::SpinLock g_signal_handler_lock(base::LINKER_INITIALIZED);
+
+namespace kudu {
+
+bool IsCoverageBuild() {
+  return __gcov_flush != nullptr;
+}
+
+void TryFlushCoverage() {
+  static base::SpinLock lock(base::LINKER_INITIALIZED);
+
+  // Flushing coverage is not reentrant or thread-safe.
+  if (!__gcov_flush || !lock.TryLock()) {
+    return;
+  }
+
+  __gcov_flush();
+
+  lock.Unlock();
+}
+
+
+namespace stack_trace_internal {
+
+// Simple notification mechanism based on futex.
+//
+// We use this instead of a mutex and condvar because we need
+// to signal it from a signal handler, and mutexes are not async-safe.
+//
+// pthread semaphores are async-signal-safe but their timedwait function
+// only supports wall clock waiting, which is a bit dangerous since we
+// need strict timeouts here.
+class CompletionFlag {
+ public:
+
+  // Mark the flag as complete, waking all waiters.
+  void Signal() {
+    complete_ = true;
+#ifndef __APPLE__
+    sys_futex(reinterpret_cast<int32_t*>(&complete_),
+              FUTEX_WAKE | FUTEX_PRIVATE_FLAG,
+              INT_MAX, // wake all
+              0 /* ignored */);
+#endif
+  }
+
+  // Wait for the flag to be marked as complete, up until the given deadline.
+  // Returns true if the flag was marked complete before the deadline.
+  bool WaitUntil(MonoTime deadline) {
+    if (complete_) return true;
+
+    MonoTime now = MonoTime::Now();
+    while (now < deadline) {
+#ifndef __APPLE__
+      MonoDelta rem = deadline - now;
+      struct timespec ts;
+      rem.ToTimeSpec(&ts);
+      sys_futex(reinterpret_cast<int32_t*>(&complete_),
+                FUTEX_WAIT | FUTEX_PRIVATE_FLAG,
+                0, // wait if value is still 0
+                reinterpret_cast<struct kernel_timespec *>(&ts));
+#else
+      sched_yield();
+#endif
+      if (complete_) {
+        return true;
+      }
+      now = MonoTime::Now();
+    }
+    return complete_;
+  }
+
+  void Reset() {
+    complete_ = false;
+  }
+
+  bool complete() const {
+    return complete_;
+  }
+ private:
+  std::atomic<int32_t> complete_ { 0 };
+};
+
+
+// A pointer to this structure is passed as signal data to a thread when
+// a stack trace is being remotely requested.
+//
+// The state machine is as follows (each state is a tuple of 'queued_to_tid'
+// and 'result_ready' status):
+//
+//   [ kNotInUse, false ]
+//           |
+//           | (A)
+//           v                (D)
+//   [ <target tid>, false ]  --->  [ kNotInUse, false ] (leaked)
+//           |
+//           | (B)
+//           v                (E)
+//   [ kDumpStarted, false ]  --->  [ kNotInUse, false ] (tracer waits for 'result_ready')
+//           |                                 |
+//           | (C)                             | (G)
+//           v                (F)              v
+//   [ kDumpStarted, true ]   --->  [ kNotInUse, true ] (already complete)
+//
+// Transitions:
+//    (A): tracer thread sets 'queued_to_tid' to the target tid before sending a signal
+//    (B): target thread CASes 'queued_to_tid' to kDumpStarted (and aborts on CAS failure)
+//    (C,G): target thread finishes collecting stacks and signals 'result_ready'
+//    (D,E,F): tracer thread exchanges 'kNotInUse' back into queued_to_tid in
+//             RevokeSigData().
+struct SignalData {
+  // The actual destination for the stack trace collected from the target thread.
+  StackTrace* stack;
+
+  static const int kNotInUse = 0;
+  static const int kDumpStarted = -1;
+  // Either one of the above constants, or if the dumper thread
+  // is waiting on a response, the tid that it is waiting on.
+  std::atomic<int64_t> queued_to_tid { kNotInUse };
+
+  // Signaled when the target thread has successfully collected its stack.
+  // The dumper thread waits for this to become true.
+  CompletionFlag result_ready;
+};
+
+} // namespace stack_trace_internal
+
+using stack_trace_internal::SignalData;
+
+namespace {
+
+// Signal handler for our stack trace signal.
+// We expect that the signal is only sent from DumpThreadStack() -- not by a user.
+void HandleStackTraceSignal(int /*signum*/, siginfo_t* info, void* /*ucontext*/) {
+  // Signal handlers may be invoked at any point, so it's important to preserve
+  // errno.
+  int save_errno = errno;
+  SCOPED_CLEANUP({
+      errno = save_errno;
+    });
+  auto* sig_data = reinterpret_cast<SignalData*>(info->si_value.sival_ptr);
+  DCHECK(sig_data);
+  if (!sig_data) {
+    // Maybe the signal was sent by a user instead of by ourselves; ignore it.
+    return;
+  }
+  ANNOTATE_HAPPENS_AFTER(sig_data);
+  int64_t my_tid = Thread::CurrentThreadId();
+
+  // If we were slow to process the signal, the sender may have given up and
+  // no longer wants our stack trace. In that case, 'sig_data->queued_to_tid'
+  // will no longer contain our tid.
+  if (!sig_data->queued_to_tid.compare_exchange_strong(my_tid, SignalData::kDumpStarted)) {
+    return;
+  }
+  // Marking it as kDumpStarted ensures that the caller thread must now wait
+  // for our response, since we are writing directly into their StackTrace object.
+  sig_data->stack->Collect(/*skip_frames=*/1);
+  sig_data->result_ready.Signal();
+}
+
+bool InitSignalHandlerUnlocked(int signum) {
+  enum InitState {
+    UNINITIALIZED,
+    INIT_ERROR,
+    INITIALIZED
+  };
+  static InitState state = UNINITIALIZED;
+
+  // If we've already registered a handler, but we're being asked to
+  // change our signal, unregister the old one.
+  if (signum != g_stack_trace_signum && state == INITIALIZED) {
+    struct sigaction old_act;
+    PCHECK(sigaction(g_stack_trace_signum, nullptr, &old_act) == 0);
+    if (old_act.sa_sigaction == &HandleStackTraceSignal) {
+      signal(g_stack_trace_signum, SIG_DFL);
+    }
+  }
+
+  // If we'd previously had an error, but the signal number
+  // is changing, we should mark ourselves uninitialized.
+  if (signum != g_stack_trace_signum) {
+    g_stack_trace_signum = signum;
+    state = UNINITIALIZED;
+  }
+
+  if (state == UNINITIALIZED) {
+    struct sigaction old_act;
+    PCHECK(sigaction(g_stack_trace_signum, nullptr, &old_act) == 0);
+    if (old_act.sa_handler != SIG_DFL &&
+        old_act.sa_handler != SIG_IGN) {
+      state = INIT_ERROR;
+      LOG(WARNING) << "signal handler for stack trace signal "
+                   << g_stack_trace_signum
+                   << " is already in use: "
+                   << "Kudu will not produce thread stack traces.";
+    } else {
+      // No one appears to be using the signal. This is racy, but there is no
+      // atomic swap capability.
+      struct sigaction act;
+      memset(&act, 0, sizeof(act));
+      act.sa_sigaction = &HandleStackTraceSignal;
+      act.sa_flags = SA_SIGINFO | SA_RESTART;
+      struct sigaction old_act;
+      CHECK_ERR(sigaction(g_stack_trace_signum, &act, &old_act));
+      sighandler_t old_handler = old_act.sa_handler;
+      if (old_handler != SIG_IGN &&
+          old_handler != SIG_DFL) {
+        LOG(FATAL) << "raced against another thread installing a signal handler";
+      }
+      state = INITIALIZED;
+    }
+  }
+  return state == INITIALIZED;
+}
+
+#ifdef __linux__
+GoogleOnceType g_prime_libunwind_once;
+
+void PrimeLibunwind() {
+  // The first call into libunwind does some unsafe double-checked locking
+  // for initialization. So, we make sure that the first call is not concurrent
+  // with any other call.
+  unw_cursor_t cursor;
+  unw_context_t uc;
+  unw_getcontext(&uc);
+  RAW_CHECK(unw_init_local(&cursor, &uc) >= 0, "unw_init_local failed");
+}
+#endif
+} // anonymous namespace
+
+Status SetStackTraceSignal(int signum) {
+  base::SpinLockHolder h(&g_signal_handler_lock);
+  if (!InitSignalHandlerUnlocked(signum)) {
+    return Status::InvalidArgument("unable to install signal handler");
+  }
+  return Status::OK();
+}
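+
+// For example, a process that reserves the default signal for other purposes
+// could select a different one at startup (illustrative sketch; SIGUSR2 is an
+// arbitrary choice):
+//
+//   CHECK_OK(SetStackTraceSignal(SIGUSR2));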
+
+StackTraceCollector::StackTraceCollector(StackTraceCollector&& other) noexcept
+    : tid_(other.tid_),
+      sig_data_(other.sig_data_) {
+  other.tid_ = 0;
+  other.sig_data_ = nullptr;
+}
+
+StackTraceCollector::~StackTraceCollector() {
+  if (sig_data_) {
+    RevokeSigData();
+  }
+}
+
+#ifdef __linux__
+bool StackTraceCollector::RevokeSigData() {
+  // First, exchange the atomic variable back to 'not in use'. This ensures
+  // that, if the signalled thread hasn't started filling in the trace yet,
+  // it will see the 'kNotInUse' value and abort.
+  int64_t old_val = sig_data_->queued_to_tid.exchange(SignalData::kNotInUse);
+
+  // We now have two cases to consider.
+
+  // 1) Timed out, but signal still pending and signal handler not yet invoked.
+  //
+  //    In this case, the signal handler hasn't started collecting a stack trace, so when
+  //    we exchange 'queued_to_tid', we see that it is still "queued". In case the signal
+  //    later gets delivered, we can't free the 'sig_data_' struct itself. We intentionally
+  //    leak it. Note, however, that if the signal handler later runs, it will see that we
+  //    exchanged out its tid from 'queued_to_tid' and therefore won't attempt to write
+  //    into the 'stack' structure.
+  if (old_val == tid_) {
+    // TODO(todd) instead of leaking, we can insert these lost structs into a global
+    // free-list, and then reuse them the next time we want to send a signal. The re-use
+    // is safe since access is limited to a specific tid.
+    DLOG(WARNING) << "Leaking SignalData structure " << sig_data_ << " after lost signal "
+                  << "to thread " << tid_;
+    ANNOTATE_LEAKING_OBJECT_PTR(sig_data_);
+    sig_data_ = nullptr;
+    return false;
+  }
+
+  // 2) The signal was delivered. Either the thread is currently collecting its stack
+  //    trace (in which case we have to wait for it to finish), or it has already completed
+  //    (in which case waiting is a no-op).
+  CHECK_EQ(old_val, SignalData::kDumpStarted);
+  CHECK(sig_data_->result_ready.WaitUntil(MonoTime::Max()));
+  delete sig_data_;
+  sig_data_ = nullptr;
+  return true;
+}
+
+Status StackTraceCollector::TriggerAsync(int64_t tid, StackTrace* stack) {
+  CHECK(!sig_data_ && tid_ == 0) << "TriggerAsync() must not be called more than once per instance";
+
+  // Ensure that our signal handler is installed.
+  {
+    base::SpinLockHolder h(&g_signal_handler_lock);
+    if (!InitSignalHandlerUnlocked(g_stack_trace_signum)) {
+      return Status::NotSupported("unable to take thread stack: signal handler unavailable");
+    }
+  }
+  // Ensure that libunwind is primed for use before we send any signals. Otherwise
+  // we can hit a deadlock with the following stack:
+  //   GoogleOnceInit()   [waits on the 'once' to finish, but will never finish]
+  //   StackTrace::Collect()
+  //   <signal handler>
+  //   PrimeLibUnwind
+  //   GoogleOnceInit()   [not yet initted, so starts initializing]
+  //   StackTrace::Collect()
+  GoogleOnceInit(&g_prime_libunwind_once, &PrimeLibunwind);
+
+  std::unique_ptr<SignalData> data(new SignalData());
+  // Set the target TID in our communication structure, so if we end up with any
+  // delayed signal reaching some other thread, it will know to ignore it.
+  data->queued_to_tid = tid;
+  data->stack = CHECK_NOTNULL(stack);
+
+  // We use the raw syscall here instead of kill() to ensure that we don't accidentally
+  // send a signal to some other process in the case that the thread has exited and
+  // the TID has been recycled.
+  siginfo_t info;
+  memset(&info, 0, sizeof(info));
+  info.si_signo = g_stack_trace_signum;
+  info.si_code = SI_QUEUE;
+  info.si_pid = getpid();
+  info.si_uid = getuid();
+  info.si_value.sival_ptr = data.get();
+  // Since we're using a signal to pass information between the two threads,
+  // we need to help TSAN out and explicitly tell it about the happens-before
+  // relationship here.
+  ANNOTATE_HAPPENS_BEFORE(data.get());
+  if (syscall(SYS_rt_tgsigqueueinfo, getpid(), tid, g_stack_trace_signum, &info) != 0) {
+    return Status::NotFound("unable to deliver signal: process may have exited");
+  }
+
+  // The signal is now pending to the target thread. We don't store it in a unique_ptr
+  // inside the class since we need to be careful to destruct it safely in case the
+  // target thread hasn't yet received the signal when this instance gets destroyed.
+  sig_data_ = data.release();
+  tid_ = tid;
+
+  return Status::OK();
+}
+
+Status StackTraceCollector::AwaitCollection(MonoTime deadline) {
+  CHECK(sig_data_) << "Must successfully call TriggerAsync() first";
+
+  // Callers typically give the thread ~1s to respond (see GetThreadStack()). In
+  // testing, threads usually respond within a few milliseconds, so that timeout
+  // is very conservative.
+  //
+  // The main reason that a thread would not respond is that it has blocked signals. For
+  // example, glibc's timer_thread doesn't respond to our signal, so we always time out
+  // on that one.
+  ignore_result(sig_data_->result_ready.WaitUntil(deadline));
+
+  // Whether or not we timed out above, revoke the signal data structure.
+  // It's possible that the wait above timed out but the target thread completed
+  // its collection just after the timeout. In that case, RevokeSigData() returns
+  // true and we can report success, because the destination stack trace has in
+  // fact been populated.
+  bool completed = RevokeSigData();
+  if (!completed) {
+    return Status::TimedOut("thread did not respond: maybe it is blocking signals");
+  }
+
+  return Status::OK();
+}
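+
+// A sketch of fanning collection out across many threads in parallel
+// (illustrative only; StackTraceSnapshot::SnapshotAllStacks() below does
+// essentially this):
+//
+//   vector<pid_t> tids;
+//   RETURN_NOT_OK(ListThreads(&tids));
+//   vector<StackTrace> traces(tids.size());
+//   vector<StackTraceCollector> collectors(tids.size());
+//   for (int i = 0; i < tids.size(); i++) {
+//     ignore_result(collectors[i].TriggerAsync(tids[i], &traces[i]));
+//   }
+//   MonoTime deadline = MonoTime::Now() + MonoDelta::FromSeconds(1);
+//   for (auto& c : collectors) {
+//     ignore_result(c.AwaitCollection(deadline));
+//   }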
+
+#else  // #ifdef __linux__ ...
+Status StackTraceCollector::TriggerAsync(int64_t /*tid*/, StackTrace* /*stack*/) {
+  return Status::NotSupported("unsupported platform");
+}
+Status StackTraceCollector::AwaitCollection(MonoTime /*deadline*/) {
+  return Status::NotSupported("unsupported platform");
+}
+bool StackTraceCollector::RevokeSigData() {
+  return false;
+}
+#endif // #ifdef __linux__ ... #else ...
+
+Status GetThreadStack(int64_t tid, StackTrace* stack) {
+  StackTraceCollector c;
+  RETURN_NOT_OK(c.TriggerAsync(tid, stack));
+  RETURN_NOT_OK(c.AwaitCollection(MonoTime::Now() + MonoDelta::FromSeconds(1)));
+  return Status::OK();
+}
+
+string DumpThreadStack(int64_t tid) {
+  StackTrace trace;
+  Status s = GetThreadStack(tid, &trace);
+  if (s.ok()) {
+    return trace.Symbolize();
+  }
+  return strings::Substitute("<$0>", s.ToString());
+}
+
+Status ListThreads(vector<pid_t> *tids) {
+#ifndef __linux__
+  return Status::NotSupported("unable to list threads on this platform");
+#else
+  DIR *dir = opendir("/proc/self/task/");
+  if (dir == NULL) {
+    return Status::IOError("failed to open task dir", ErrnoToString(errno), errno);
+  }
+  struct dirent *d;
+  while ((d = readdir(dir)) != NULL) {
+    if (d->d_name[0] != '.') {
+      uint32_t tid;
+      if (!safe_strtou32(d->d_name, &tid)) {
+        LOG(WARNING) << "bad tid found in procfs: " << d->d_name;
+        continue;
+      }
+      tids->push_back(tid);
+    }
+  }
+  closedir(dir);
+  return Status::OK();
+#endif // __linux__
+}
+
+string GetStackTrace() {
+  string s;
+  google::glog_internal_namespace_::DumpStackTraceToString(&s);
+  return s;
+}
+
+string GetStackTraceHex() {
+  char buf[1024];
+  HexStackTraceToString(buf, sizeof(buf));
+  return buf;
+}
+
+void HexStackTraceToString(char* buf, size_t size) {
+  StackTrace trace;
+  trace.Collect(1);
+  trace.StringifyToHex(buf, size);
+}
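+
+// For example, an async-safe, allocation-free capture into a stack buffer
+// (illustrative only):
+//
+//   char buf[1024];
+//   HexStackTraceToString(buf, sizeof(buf));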
+
+string GetLogFormatStackTraceHex() {
+  StackTrace trace;
+  trace.Collect(1);
+  return trace.ToLogFormatHexString();
+}
+
+// Bogus empty function which we use below to fill in the stack trace with
+// something readable to indicate that stack trace collection was unavailable.
+void CouldNotCollectStackTraceBecauseInsideLibDl() {
+}
+
+void StackTrace::Collect(int skip_frames) {
+  if (!debug::SafeToUnwindStack()) {
+    // Build a fake stack so that the user sees an appropriate message upon symbolizing
+    // rather than seeing an empty stack.
+    uintptr_t f_ptr = reinterpret_cast<uintptr_t>(&CouldNotCollectStackTraceBecauseInsideLibDl);
+    // Increase the pointer by one byte since the return address from a function call
+    // would not be the beginning of the function itself.
+    frames_[0] = reinterpret_cast<void*>(f_ptr + 1);
+    num_frames_ = 1;
+    return;
+  }
+  const int kMaxDepth = arraysize(frames_);
+
+#ifdef __linux__
+  GoogleOnceInit(&g_prime_libunwind_once, &PrimeLibunwind);
+
+  unw_cursor_t cursor;
+  unw_context_t uc;
+  unw_getcontext(&uc);
+  RAW_CHECK(unw_init_local(&cursor, &uc) >= 0, "unw_init_local failed");
+  skip_frames++;         // Do not include the "Collect" frame
+
+  num_frames_ = 0;
+  while (num_frames_ < kMaxDepth) {
+    void *ip;
+    int ret = unw_get_reg(&cursor, UNW_REG_IP, reinterpret_cast<unw_word_t *>(&ip));
+    if (ret < 0) {
+      break;
+    }
+    if (skip_frames > 0) {
+      skip_frames--;
+    } else {
+      frames_[num_frames_++] = ip;
+    }
+    ret = unw_step(&cursor);
+    if (ret <= 0) {
+      break;
+    }
+  }
+#else
+  // On OSX, use the unwinder from glog. However, that unwinder has an issue where
+  // concurrent invocations return no frames. See:
+  // https://github.com/google/glog/issues/298
+  // The worst case here is an empty trace.
+
+  // google::GetStackTrace has a data race. This is called frequently, so better
+  // to ignore it with an annotation rather than use a suppression.
+  debug::ScopedTSANIgnoreReadsAndWrites ignore_tsan;
+  num_frames_ = google::GetStackTrace(frames_, kMaxDepth, skip_frames + 1);
+#endif
+}
+
+void StackTrace::StringifyToHex(char* buf, size_t size, int flags) const {
+  char* dst = buf;
+
+  // Reserve kHexEntryLength (plus two bytes for the '0x' prefix, if requested)
+  // for the first iteration of the loop, 1 byte for a space (which we may not
+  // need if there's just one frame), and 1 for a nul terminator.
+  const int prefix_len = (flags & HEX_0X_PREFIX) ? 2 : 0;
+  char* limit = dst + size - kHexEntryLength - prefix_len - 2;
+  for (int i = 0; i < num_frames_ && dst < limit; i++) {
+    if (i != 0) {
+      *dst++ = ' ';
+    }
+    if (flags & HEX_0X_PREFIX) {
+      *dst++ = '0';
+      *dst++ = 'x';
+    }
+    // See note in Symbolize() below about why we subtract 1 from each address here.
+    uintptr_t addr = reinterpret_cast<uintptr_t>(frames_[i]);
+    if (addr > 0 && !(flags & NO_FIX_CALLER_ADDRESSES)) {
+      addr--;
+    }
+    FastHex64ToBuffer(addr, dst);
+    dst += kHexEntryLength;
+  }
+  *dst = '\0';
+}
+
+string StackTrace::ToHexString(int flags) const {
+  // Each frame requires kHexEntryLength, plus a space
+  // We also need one more byte at the end for '\0'
+  int len_per_frame = kHexEntryLength;
+  len_per_frame++; // For the separating space.
+  if (flags & HEX_0X_PREFIX) {
+    len_per_frame += 2;
+  }
+  int buf_len = kMaxFrames * len_per_frame + 1;
+  char buf[buf_len];
+  StringifyToHex(buf, buf_len, flags);
+  return string(buf);
+}
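+
+// For kMaxFrames = 16 and kHexEntryLength = 16, the worst-case buffer above is
+// 16 * (16 + 1 + 2) + 1 = 305 bytes with HEX_0X_PREFIX set, which comfortably
+// fits on the stack.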
+
+// Symbolization function borrowed from glog.
+string StackTrace::Symbolize() const {
+  string ret;
+  for (int i = 0; i < num_frames_; i++) {
+    void* pc = frames_[i];
+
+    char tmp[1024];
+    const char* symbol = "(unknown)";
+
+    // The return address 'pc' on the stack is the address of the instruction
+    // following the 'call' instruction. In the case of calling a function annotated
+    // 'noreturn', this address may actually be the first instruction of the next
+    // function, because the function we care about ends with the 'call'.
+    // So, we subtract 1 from 'pc' so that we're pointing at the 'call' instead
+    // of the return address.
+    //
+    // For example, compiling a C program with -O2 that simply calls 'abort()' yields
+    // the following disassembly:
+    //     Disassembly of section .text:
+    //
+    //     0000000000400440 <main>:
+    //       400440:	48 83 ec 08          	sub    $0x8,%rsp
+    //       400444:	e8 c7 ff ff ff       	callq  400410 <abort@plt>
+    //
+    //     0000000000400449 <_start>:
+    //       400449:	31 ed                	xor    %ebp,%ebp
+    //       ...
+    //
+    // If we were to take a stack trace while inside 'abort', the return pointer
+    // on the stack would be 0x400449 (the first instruction of '_start'). By subtracting
+    // 1, we end up with 0x400448, which is still within 'main'.
+    //
+    // This also ensures that we point at the correct line number when using addr2line
+    // on logged stacks.
+    //
+    // We check that the pc is not 0 to avoid undefined behavior in the case of
+    // invalid unwinding (see KUDU-2433).
+    if (pc && google::Symbolize(
+            reinterpret_cast<char *>(pc) - 1, tmp, sizeof(tmp))) {
+      symbol = tmp;
+    }
+    StringAppendF(&ret, "    @ %*p  %s\n", kPrintfPointerFieldWidth, pc, symbol);
+  }
+  return ret;
+}
+
+string StackTrace::ToLogFormatHexString() const {
+  string ret;
+  for (int i = 0; i < num_frames_; i++) {
+    void* pc = frames_[i];
+    StringAppendF(&ret, "    @ %*p\n", kPrintfPointerFieldWidth, pc);
+  }
+  return ret;
+}
+
+uint64_t StackTrace::HashCode() const {
+  return util_hash::CityHash64(reinterpret_cast<const char*>(frames_),
+                               sizeof(frames_[0]) * num_frames_);
+}
+
+bool StackTrace::LessThan(const StackTrace& s) const {
+  return std::lexicographical_compare(frames_, &frames_[num_frames_],
+                                      s.frames_, &s.frames_[s.num_frames_]);
+}
+
+Status StackTraceSnapshot::SnapshotAllStacks() {
+  if (IsBeingDebugged()) {
+    return Status::Incomplete("not collecting stack trace since debugger or strace is attached");
+  }
+
+  vector<pid_t> tids;
+  RETURN_NOT_OK_PREPEND(ListThreads(&tids), "could not list threads");
+
+  collectors_.clear();
+  collectors_.resize(tids.size());
+  infos_.clear();
+  infos_.resize(tids.size());
+  for (int i = 0; i < tids.size(); i++) {
+    infos_[i].tid = tids[i];
+    infos_[i].status = collectors_[i].TriggerAsync(tids[i], &infos_[i].stack);
+  }
+
+  // Now collect the thread names while we are waiting on stack trace collection.
+  if (capture_thread_names_) {
+    for (auto& info : infos_) {
+      if (!info.status.ok()) continue;
+
+      // Get the thread's name by reading proc.
+      // TODO(todd): should we have the dumped thread fill in its own name using
+      // prctl to avoid having to open and read /proc? Or maybe we should use the
+      // Kudu ThreadMgr to get the thread names for the cases where we are using
+      // the kudu::Thread wrapper at least.
+      faststring buf;
+      Status s = ReadFileToString(Env::Default(),
+                                  strings::Substitute("/proc/self/task/$0/comm", info.tid),
+                                  &buf);
+      if (!s.ok()) {
+        info.thread_name = "<unknown name>";
+      } else {
+        info.thread_name = buf.ToString();
+        StripTrailingNewline(&info.thread_name);
+      }
+    }
+  }
+  num_failed_ = 0;
+  MonoTime deadline = MonoTime::Now() + MonoDelta::FromSeconds(1);
+  for (int i = 0; i < infos_.size(); i++) {
+    infos_[i].status = infos_[i].status.AndThen([&] {
+        return collectors_[i].AwaitCollection(deadline);
+      });
+    if (!infos_[i].status.ok()) {
+      num_failed_++;
+      CHECK(!infos_[i].stack.HasCollected()) << infos_[i].status.ToString();
+    }
+  }
+  collectors_.clear();
+
+  std::sort(infos_.begin(), infos_.end(), [](const ThreadInfo& a, const ThreadInfo& b) {
+      return a.stack.LessThan(b.stack);
+    });
+  return Status::OK();
+}
+
+void StackTraceSnapshot::VisitGroups(const StackTraceSnapshot::VisitorFunc& visitor) {
+  auto group_start = infos_.begin();
+  auto group_end = group_start;
+  while (group_end != infos_.end()) {
+    do {
+      ++group_end;
+    } while (group_end != infos_.end() && group_end->stack.Equals(group_start->stack));
+    visitor(ArrayView<ThreadInfo>(&*group_start, std::distance(group_start, group_end)));
+    group_start = group_end;
+  }
+}
+
+}  // namespace kudu

http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/debug-util.h
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/debug-util.h b/be/src/kudu/util/debug-util.h
new file mode 100644
index 0000000..e8c94ea
--- /dev/null
+++ b/be/src/kudu/util/debug-util.h
@@ -0,0 +1,321 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#ifndef KUDU_UTIL_DEBUG_UTIL_H
+#define KUDU_UTIL_DEBUG_UTIL_H
+
+#include <sys/types.h>
+
+#include <cstdint>
+#include <cstring>
+#include <functional>
+#include <string>
+#include <vector>
+
+#include <glog/logging.h>
+
+#include "kudu/gutil/macros.h"
+#include "kudu/gutil/strings/fastmem.h"
+#include "kudu/util/status.h"
+
+namespace kudu {
+
+template <typename T> class ArrayView;
+class MonoTime;
+class StackTrace;
+class StackTraceCollector;
+
+namespace stack_trace_internal {
+struct SignalData;
+}
+
+// Return true if coverage is enabled.
+bool IsCoverageBuild();
+
+// Try to flush coverage info. If another thread is already flushing
+// coverage, this returns without doing anything, since flushing coverage
+// is not thread-safe or re-entrant.
+void TryFlushCoverage();
+
+// Return a list of all of the thread IDs currently running in this process.
+// Not async-safe.
+Status ListThreads(std::vector<pid_t>* tids);
+
+// Set which POSIX signal number should be used internally for triggering
+// stack traces. If the specified signal handler is already in use, this
+// returns an error, and stack traces will be disabled.
+Status SetStackTraceSignal(int signum);
+
+// Return the stack trace of the given thread, stringified and symbolized.
+//
+// Note that the symbolization happens on the calling thread, not the target
+// thread, so this is relatively low-impact on the target.
+//
+// This is safe to use against the current thread, the main thread, or any other
+// thread. It requires that the target thread has not blocked POSIX signals. If
+// it has, an error message will be returned.
+//
+// This function is thread-safe.
+//
+// NOTE: if Kudu is running inside a debugger, this can be annoying to a developer since
+// it internally uses signals that will cause the debugger to stop. Consider checking
+// 'IsBeingDebugged()' from os-util.h before using this function for non-critical use
+// cases.
+std::string DumpThreadStack(int64_t tid);
+
+// Capture the thread stack of another thread
+//
+// NOTE: if Kudu is running inside a debugger, this can be annoying to a developer since
+// it internally uses signals that will cause the debugger to stop. Consider checking
+// 'IsBeingDebugged()' from os-util.h before using this function for non-critical use
+// cases.
+Status GetThreadStack(int64_t tid, StackTrace* stack);
+
+// Return the current stack trace, stringified.
+std::string GetStackTrace();
+
+// Return the current stack trace, in hex form. This is significantly
+// faster than GetStackTrace() above, so should be used in performance-critical
+// places like TRACE() calls. If you really need blazing-fast speed, though,
+// use HexStackTraceToString() into a stack-allocated buffer instead --
+// this call causes a heap allocation for the std::string.
+//
+// Note that this is much more useful in the context of a static binary,
+// since addr2line wouldn't know where shared libraries were mapped at
+// runtime.
+std::string GetStackTraceHex();
+
+// This is the same as GetStackTraceHex(), except multi-line in a format that
+// looks very similar to GetStackTrace() but without symbols. Because it's in
+// that format, the tool stacktrace_addr2line.pl in the kudu build-support
+// directory can symbolize it automatically (to the extent that addr2line(1)
+// is able to find the symbols).
+std::string GetLogFormatStackTraceHex();
+
+// Collect the current stack trace in hex form into the given buffer.
+//
+// The resulting trace just includes the hex addresses, space-separated. This is suitable
+// for later stringification by pasting into 'addr2line' for example.
+//
+// This function is async-safe.
+void HexStackTraceToString(char* buf, size_t size);
+
+// Efficient class for collecting and later stringifying a stack trace.
+//
+// Requires external synchronization.
+class StackTrace {
+ public:
+
+  // Constructs a new (uncollected) stack trace.
+  StackTrace()
+    : num_frames_(0) {
+  }
+
+  // Resets the stack trace to an uncollected state.
+  void Reset() {
+    num_frames_ = 0;
+  }
+
+  // Returns true if Collect() (but not Reset()) has been called on this stack trace.
+  bool HasCollected() const {
+    return num_frames_ > 0;
+  }
+
+  // Copies the contents of 's' into this stack trace.
+  void CopyFrom(const StackTrace& s) {
+    memcpy(this, &s, sizeof(s));
+  }
+
+  // Returns true if the stack trace 's' matches this trace.
+  bool Equals(const StackTrace& s) const {
+    return s.num_frames_ == num_frames_ &&
+      strings::memeq(frames_, s.frames_,
+                     num_frames_ * sizeof(frames_[0]));
+  }
+
+  // Comparison operator for use in sorting.
+  bool LessThan(const StackTrace& s) const;
+
+  // Collect and store the current stack trace. Skips the top 'skip_frames' frames
+  // from the stack. For example, a value of '1' will skip whichever function
+  // called the 'Collect()' function. The 'Collect' function itself is always skipped.
+  //
+  // This function is async-safe.
+  void Collect(int skip_frames = 0);
+
+  int num_frames() const {
+    return num_frames_;
+  }
+
+  void* frame(int i) const {
+    DCHECK_LT(i, num_frames_);
+    return frames_[i];
+  }
+
+  enum Flags {
+    // Do not fix up the addresses on the stack to try to point to the 'call'
+    // instructions instead of the return address. This is necessary when dumping
+    // addresses to be interpreted by 'pprof', which does this fix-up itself.
+    NO_FIX_CALLER_ADDRESSES = 1,
+
+    // Prefix each hex address with '0x'. This is required by the go version
+    // of pprof when parsing stack traces.
+    HEX_0X_PREFIX = 1 << 1,
+  };
+
+  // Stringify the trace into the given buffer.
+  // The resulting output is hex addresses suitable for passing into 'addr2line'
+  // later.
+  //
+  // Async-safe.
+  void StringifyToHex(char* buf, size_t size, int flags = 0) const;
+
+  // Same as above, but returning a std::string.
+  // This is not async-safe.
+  std::string ToHexString(int flags = 0) const;
+
+  // Return a string with a symbolized backtrace in a format suitable for
+  // printing to a log file.
+  // This is not async-safe.
+  std::string Symbolize() const;
+
+  // Return a string with a hex-only backtrace in the format typically used in
+  // log files. Similar to the format given by Symbolize(), but symbols are not
+  // resolved (only the hex addresses are given).
+  std::string ToLogFormatHexString() const;
+
+  uint64_t HashCode() const;
+
+ private:
+  enum {
+    // The maximum number of stack frames to collect.
+    kMaxFrames = 16,
+
+    // The max number of characters any frame requires in string form
+    // (excluding the optional '0x' prefix and the separating space).
+    kHexEntryLength = 16
+  };
+
+  int num_frames_;
+  void* frames_[kMaxFrames];
+};
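+
+// Typical self-trace usage (an illustrative sketch):
+//
+//   StackTrace st;
+//   st.Collect();
+//   LOG(INFO) << st.Symbolize();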
+
+// Utility class for gathering a process-wide snapshot of the stack traces
+// of all threads.
+class StackTraceSnapshot {
+ public:
+  // The information about each thread will be gathered in a struct.
+  struct ThreadInfo {
+    // The TID of the thread.
+    int64_t tid;
+
+    // The status of collection. If a thread exits during collection or
+    // was blocking signals, it's possible to have an error here.
+    Status status;
+
+    // The name of the thread.
+    // May be missing if 'status' is not OK or if thread name collection was
+    // disabled.
+    std::string thread_name;
+
+    // The current stack trace of the thread.
+    // Always missing if 'status' is not OK.
+    StackTrace stack;
+  };
+  using VisitorFunc = std::function<void(ArrayView<ThreadInfo> group)>;
+
+  void set_capture_thread_names(bool c) {
+    capture_thread_names_ = c;
+  }
+
+  // Snapshot the stack traces of all threads in the process. This may return a bad
+  // Status in the case that stack traces aren't supported on the platform, or if
+  // the process is running inside a debugger.
+  //
+  // NOTE: this may take some time and should not be called in a latency-sensitive
+  // context.
+  Status SnapshotAllStacks();
+
+  // After having collected stacks, visit them, grouped by shared
+  // stack trace. The visitor function will be called once per group.
+  // Each group is guaranteed to be non-empty.
+  //
+  // Any threads which failed to collect traces are returned as a single group
+  // having empty stack traces.
+  //
+  // REQUIRES: a previous successful call to SnapshotAllStacks().
+  void VisitGroups(const VisitorFunc& visitor);
+
+  // Return the number of threads which were interrogated for a stack trace.
+  //
+  // NOTE: this includes threads which failed to collect.
+  int num_threads() const { return infos_.size(); }
+
+  // Return the number of threads which failed to collect a stack trace.
+  int num_failed() const { return num_failed_; }
+
+ private:
+  std::vector<StackTraceSnapshot::ThreadInfo> infos_;
+  std::vector<StackTraceCollector> collectors_;
+  int num_failed_ = 0;
+
+  bool capture_thread_names_ = true;
+};
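+
+// Example of snapshotting and grouping all thread stacks (an illustrative
+// sketch, not part of the class declaration above):
+//
+//   StackTraceSnapshot snap;
+//   RETURN_NOT_OK(snap.SnapshotAllStacks());
+//   snap.VisitGroups([](ArrayView<StackTraceSnapshot::ThreadInfo> group) {
+//     LOG(INFO) << group.size() << " thread(s) with stack:\n"
+//               << group[0].stack.Symbolize();
+//   });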
+
+
+// Class to collect the stack trace of another thread within this process.
+// This allows for more advanced use cases than 'DumpThreadStack(tid)' above.
+// Namely, this provides an asynchronous trigger/collect API so that many
+// stack traces can be collected from many different threads in parallel using
+// different instances of this object.
+class StackTraceCollector {
+ public:
+  StackTraceCollector() = default;
+  StackTraceCollector(StackTraceCollector&& other) noexcept;
+  ~StackTraceCollector();
+
+  // Send the asynchronous request to the thread with TID 'tid'
+  // to collect its stack trace into '*stack'.
+  //
+  // NOTE: 'stack' must remain a valid pointer until AwaitCollection() has
+  // completed.
+  //
+  // Returns OK if the signal was sent successfully.
+  Status TriggerAsync(int64_t tid, StackTrace* stack);
+
+  // Wait for the stack trace to be collected from the target thread.
+  //
+  // REQUIRES: TriggerAsync() has returned successfully.
+  Status AwaitCollection(MonoTime deadline);
+
+ private:
+  DISALLOW_COPY_AND_ASSIGN(StackTraceCollector);
+
+  // Safely sets 'sig_data_' back to nullptr after having sent an asynchronous
+  // stack trace request. See implementation for details.
+  //
+  // Returns true if the stack trace was collected before revocation
+  // and false if it was not.
+  //
+  // POSTCONDITION: sig_data_ == nullptr
+  bool RevokeSigData();
+
+  int64_t tid_ = 0;
+  stack_trace_internal::SignalData* sig_data_ = nullptr;
+};
+
+} // namespace kudu
+
+#endif

http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/debug/leak_annotations.h
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/debug/leak_annotations.h b/be/src/kudu/util/debug/leak_annotations.h
new file mode 100644
index 0000000..2bfc3d8
--- /dev/null
+++ b/be/src/kudu/util/debug/leak_annotations.h
@@ -0,0 +1,84 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#ifndef KUDU_UTIL_DEBUG_LEAK_ANNOTATIONS_H_
+#define KUDU_UTIL_DEBUG_LEAK_ANNOTATIONS_H_
+
+// Ignore a single leaked object, given its pointer.
+// Does nothing if LeakSanitizer is not enabled.
+#define ANNOTATE_LEAKING_OBJECT_PTR(p)
+
+#if defined(__has_feature)
+#  if __has_feature(address_sanitizer)
+#    if defined(__linux__)
+
+#undef ANNOTATE_LEAKING_OBJECT_PTR
+#define ANNOTATE_LEAKING_OBJECT_PTR(p) __lsan_ignore_object(p);
+
+#    endif
+#  endif
+#endif
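+
+// For example, to intentionally leak a process-lifetime singleton without an
+// LSAN report (illustrative; 'Registry' is a hypothetical type):
+//
+//   static Registry* g_registry = new Registry();
+//   ANNOTATE_LEAKING_OBJECT_PTR(g_registry);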
+
+// API definitions from LLVM lsan_interface.h
+
+extern "C" {
+  // Allocations made between calls to __lsan_disable() and __lsan_enable() will
+  // be treated as non-leaks. Disable/enable pairs may be nested.
+  void __lsan_disable();
+  void __lsan_enable();
+
+  // The heap object into which p points will be treated as a non-leak.
+  void __lsan_ignore_object(const void *p);
+
+  // The user may optionally provide this function to disallow leak checking
+  // for the program it is linked into (if the return value is non-zero). This
+  // function must be defined as returning a constant value; any behavior beyond
+  // that is unsupported.
+  int __lsan_is_turned_off();
+
+  // Check for leaks now. This function behaves identically to the default
+  // end-of-process leak check. In particular, it will terminate the process if
+  // leaks are found and the exitcode runtime flag is non-zero.
+  // Subsequent calls to this function will have no effect and end-of-process
+  // leak check will not run. Effectively, end-of-process leak check is moved to
+  // the time of first invocation of this function.
+  // By calling this function early during process shutdown, you can instruct
+  // LSan to ignore shutdown-only leaks which happen later on.
+  void __lsan_do_leak_check();
+
+  // Check for leaks now. Returns zero if no leaks have been found or if leak
+  // detection is disabled, non-zero otherwise.
+  // This function may be called repeatedly, e.g. to periodically check a
+  // long-running process. It prints a leak report if appropriate, but does not
+  // terminate the process. It does not affect the behavior of
+  // __lsan_do_leak_check() or the end-of-process leak check, and is not
+  // affected by them.
+  int __lsan_do_recoverable_leak_check();
+} // extern "C"
+
+namespace kudu {
+namespace debug {
+
+class ScopedLSANDisabler {
+ public:
+  ScopedLSANDisabler() { __lsan_disable(); }
+  ~ScopedLSANDisabler() { __lsan_enable(); }
+};
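+
+// Example (illustrative; InitThirdPartyLib() is a hypothetical function whose
+// one-time allocations should be treated as non-leaks):
+//
+//   {
+//     debug::ScopedLSANDisabler no_leak_check;
+//     InitThirdPartyLib();
+//   }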
+
+} // namespace debug
+} // namespace kudu
+
+#endif  // KUDU_UTIL_DEBUG_LEAK_ANNOTATIONS_H_