You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by ta...@apache.org on 2018/07/13 06:03:39 UTC
[27/51] [abbrv] impala git commit: IMPALA-7006: Add KRPC folders from
kudu@334ecafd
http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/cow_object.h
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/cow_object.h b/be/src/kudu/util/cow_object.h
new file mode 100644
index 0000000..159a8bb
--- /dev/null
+++ b/be/src/kudu/util/cow_object.h
@@ -0,0 +1,437 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#pragma once
+
+#include <algorithm> // IWYU pragma: keep
+#include <map>
+#include <memory>
+#include <ostream>
+
+#include <glog/logging.h>
+
+#include "kudu/gutil/macros.h"
+#include "kudu/util/rwc_lock.h"
+
+namespace kudu {
+
+// Holds a piece of 'State' and manages it with copy-on-write: readers see the
+// committed copy, while a single mutator edits a private "dirty" copy that is
+// created lazily and swapped in when the mutation is committed.
+//
+// The CowLock helper defined below is the usual way to access one of these.
+//
+// 'State' must be copy-constructible (to create the dirty copy) and
+// swappable using std::swap (to publish it).
+template<class State>
+class CowObject {
+ public:
+  CowObject() {}
+  ~CowObject() {}
+
+  // Takes a read lock. While it is held, a mutator that tries to commit
+  // will block.
+  void ReadLock() const { lock_.ReadLock(); }
+
+  // Returns true if at least one reader currently holds the lock.
+  bool IsReadLocked() const { return lock_.HasReaders(); }
+
+  // Drops a read lock taken with ReadLock(). This may unblock a mutator that
+  // is waiting to commit.
+  void ReadUnlock() const { lock_.ReadUnlock(); }
+
+  // Takes the write lock, which excludes other mutators. No copy of the state
+  // is made here. mutable_dirty() makes it on first use, so a mutation that
+  // never changes anything never pays for the copy.
+  void StartMutation() { lock_.WriteLock(); }
+
+  // Returns true if the write lock is currently held.
+  bool IsWriteLocked() const { return lock_.HasWriteLock(); }
+
+  // Throws away the dirty copy (if any) and releases the write lock. The
+  // committed state is left untouched.
+  void AbortMutation() {
+    DCHECK(lock_.HasWriteLock());
+    dirty_state_.reset();
+    lock_.WriteUnlock();
+  }
+
+  // Publishes the mutation. If a dirty copy exists, the lock is escalated to
+  // the "commit" lock, which blocks concurrent readers and writers. The dirty
+  // copy is swapped into place and the commit lock is released. With no dirty
+  // copy there is nothing to publish, so this behaves like AbortMutation().
+  void CommitMutation() {
+    DCHECK(lock_.HasWriteLock());
+    if (dirty_state_) {
+      lock_.UpgradeToCommitLock();
+      std::swap(state_, *dirty_state_);
+      dirty_state_.reset();
+      lock_.CommitUnlock();
+    } else {
+      AbortMutation();
+    }
+  }
+
+  // The committed state, ignoring any in-progress mutation. The caller must
+  // hold a read lock or the write lock (DCHECKed).
+  State& state() {
+    DCHECK(lock_.HasReaders() || lock_.HasWriteLock());
+    return state_;
+  }
+
+  const State& state() const {
+    DCHECK(lock_.HasReaders() || lock_.HasWriteLock());
+    return state_;
+  }
+
+  // The dirty state, i.e. the state including in-progress changes. The first
+  // call copies the committed state. Only the thread that called
+  // StartMutation() may use this.
+  State* mutable_dirty() {
+    DCHECK(lock_.HasWriteLock());
+    if (dirty_state_ == nullptr) {
+      dirty_state_.reset(new State(state_));
+    }
+    return dirty_state_.get();
+  }
+
+  // Read-only view of the in-progress state. Falls back to the committed
+  // state when nothing has been modified yet.
+  const State& dirty() const {
+    DCHECK(lock_.HasWriteLock());
+    if (dirty_state_) {
+      return *dirty_state_;
+    }
+    return state_;
+  }
+
+ private:
+  mutable RWCLock lock_;
+
+  // Committed state, visible to readers.
+  State state_;
+  // Mutator's private copy. Null until mutable_dirty() is first called.
+  std::unique_ptr<State> dirty_state_;
+
+  DISALLOW_COPY_AND_ASSIGN(CowObject);
+};
+
+// Lock state tracked by the lock-guard-like classes that follow.
+enum class LockMode {
+  READ,      // Held for reading.
+  WRITE,     // Held for reading and writing.
+  RELEASED   // Not held.
+};
+
+// Streams a LockMode, so that values can be used in DCHECK_EQ and similar
+// macros. Declared here and defined elsewhere.
+std::ostream& operator<<(std::ostream& o, LockMode m);
+
+// A lock-guard-like scoped object to acquire the lock on a CowObject,
+// and obtain a pointer to the correct copy to read/write.
+//
+// Example usage:
+//
+//   CowObject<Foo> my_obj;
+//   {
+//     CowLock<Foo> l(&my_obj, LockMode::READ);
+//     l.data().get_foo();
+//     ...
+//   }
+//   {
+//     CowLock<Foo> l(&my_obj, LockMode::WRITE);
+//     l.mutable_data()->set_foo(...);
+//     ...
+//     l.Commit();
+//   }
+template<class State>
+class CowLock {
+ public:
+
+  // An unlocked CowLock. This is useful for default constructing a lock to be
+  // moved in to.
+  CowLock()
+    : cow_(nullptr),
+      mode_(LockMode::RELEASED) {
+  }
+
+  // Lock in either read or write mode.
+  CowLock(CowObject<State>* cow,
+          LockMode mode)
+    : cow_(cow),
+      mode_(mode) {
+    switch (mode) {
+      case LockMode::READ: cow_->ReadLock(); break;
+      case LockMode::WRITE: cow_->StartMutation(); break;
+      default: LOG(FATAL) << "Cannot lock in mode " << mode;
+    }
+  }
+
+  // Lock in read mode.
+  // A const object may not be locked in write mode.
+  CowLock(const CowObject<State>* info,
+          LockMode mode)
+    : cow_(const_cast<CowObject<State>*>(info)),
+      mode_(mode) {
+    switch (mode) {
+      case LockMode::READ: cow_->ReadLock(); break;
+      case LockMode::WRITE: LOG(FATAL) << "Cannot write-lock a const pointer";
+      default: LOG(FATAL) << "Cannot lock in mode " << mode;
+    }
+  }
+
+  // Disable copying.
+  CowLock(const CowLock&) = delete;
+  CowLock& operator=(const CowLock&) = delete;
+
+  // Allow moving. The moved-from lock is left RELEASED and no longer refers
+  // to any object.
+  CowLock(CowLock&& other) noexcept
+    : cow_(other.cow_),
+      mode_(other.mode_) {
+    other.cow_ = nullptr;
+    other.mode_ = LockMode::RELEASED;
+  }
+
+  // Move assignment. First releases whatever this lock currently holds: a
+  // READ lock is dropped and a WRITE lock's mutation is aborted. It then takes
+  // over 'other's lock. Self-assignment is a no-op.
+  CowLock& operator=(CowLock&& other) noexcept {
+    if (this != &other) {
+      // Without this, overwriting cow_/mode_ would leak the lock currently
+      // held, leaving the old object read- or write-locked forever.
+      Unlock();
+      cow_ = other.cow_;
+      mode_ = other.mode_;
+      other.cow_ = nullptr;
+      other.mode_ = LockMode::RELEASED;
+    }
+    return *this;
+  }
+
+  // Commit the underlying object.
+  // Requires that the caller hold the lock in write mode.
+  void Commit() {
+    DCHECK_EQ(LockMode::WRITE, mode_);
+    cow_->CommitMutation();
+    mode_ = LockMode::RELEASED;
+  }
+
+  // Releases the lock if it is held. A READ lock is dropped. A WRITE lock
+  // aborts the mutation, discarding uncommitted changes. Calling this again
+  // on a released lock does nothing.
+  void Unlock() {
+    switch (mode_) {
+      case LockMode::READ: cow_->ReadUnlock(); break;
+      case LockMode::WRITE: cow_->AbortMutation(); break;
+      default: DCHECK_EQ(LockMode::RELEASED, mode_); break;
+    }
+    mode_ = LockMode::RELEASED;
+  }
+
+  // Obtain the underlying data. In WRITE mode, this returns the
+  // same data as mutable_data() (not the safe unchanging copy).
+  const State& data() const {
+    switch (mode_) {
+      case LockMode::READ: return cow_->state();
+      case LockMode::WRITE: return cow_->dirty();
+      default: LOG(FATAL) << "Cannot access data after committing";
+    }
+  }
+
+  // Obtain the mutable data. This may only be called in WRITE mode.
+  State* mutable_data() {
+    switch (mode_) {
+      case LockMode::READ: LOG(FATAL) << "Cannot mutate data with READ lock";
+      case LockMode::WRITE: return cow_->mutable_dirty();
+      default: LOG(FATAL) << "Cannot access data after committing";
+    }
+  }
+
+  bool is_write_locked() const {
+    return mode_ == LockMode::WRITE;
+  }
+
+  // Drop the lock. If the lock is held in WRITE mode, and the
+  // lock has not yet been released, aborts the mutation, restoring
+  // the underlying object to its original data.
+  ~CowLock() {
+    Unlock();
+  }
+
+ private:
+  CowObject<State>* cow_;
+  LockMode mode_;
+};
+
+// Scoped object that locks multiple CowObjects for reading or for writing.
+// When locked for writing and mutations are completed, can also commit those
+// mutations, which releases the lock.
+//
+// CowObjects are stored in an std::map, which provides two important properties:
+// 1. AddObject() can deduplicate CowObjects already inserted.
+// 2. When locking for writing, the deterministic iteration order provided by
+//    std::map prevents deadlocks.
+//
+// The use of std::map forces callers to provide a key for each CowObject. For
+// a key implementation to be usable, an appropriate overload of operator<
+// must be available.
+//
+// Unlike CowLock, does not mediate access to the CowObject data itself;
+// callers should access the data out of band.
+//
+// Unlock() and Commit() both stop tracking every added object. Objects must
+// be added again before the group can be locked again.
+//
+// Sample usage:
+//
+//   struct Foo {
+//     string data_;
+//   };
+//
+//   // Each CowObject paired with the key it is tracked under.
+//   vector<pair<string, CowObject<Foo>*>> foos;
+//
+//   1. Locking a group of CowObjects for reading:
+//
+//   CowGroupLock<string, Foo> l(LockMode::RELEASED);
+//   for (const auto& f : foos) {
+//     l.AddObject(f.first, f.second);
+//   }
+//   l.Lock(LockMode::READ);
+//   for (const auto& f : foos) {
+//     cout << f.second->state().data_ << endl;
+//   }
+//   l.Unlock();
+//
+//   2. Tracking already-write-locked CowObjects for group commit
+//      (each f.second has already had StartMutation() called on it):
+//
+//   CowGroupLock<string, Foo> l(LockMode::WRITE);
+//   for (const auto& f : foos) {
+//     l.AddMutableObject(f.first, f.second);
+//     f.second->mutable_dirty()->data_ = "modified";
+//   }
+//   l.Commit();
+//
+//   3. Aggregating unlocked CowObjects, locking them safely, and committing them together:
+//
+//   CowGroupLock<string, Foo> l(LockMode::RELEASED);
+//   for (const auto& f : foos) {
+//     l.AddMutableObject(f.first, f.second);
+//   }
+//   l.Lock(LockMode::WRITE);
+//   for (const auto& f : foos) {
+//     f.second->mutable_dirty()->data_ = "modified";
+//   }
+//   l.Commit();
+template<class Key, class Value>
+class CowGroupLock {
+ public:
+  // 'mode' is the state that added objects are expected to already be in.
+  // Use RELEASED for objects that will be locked later via Lock(), or
+  // READ/WRITE when adding objects the caller has already locked that way.
+  explicit CowGroupLock(LockMode mode)
+    : mode_(mode) {
+  }
+
+  // Releases anything still held. Uncommitted WRITE mutations are aborted.
+  ~CowGroupLock() {
+    Unlock();
+  }
+
+  // Releases every tracked object: READ locks are dropped and WRITE mutations
+  // are aborted. The group then forgets all objects and becomes RELEASED.
+  void Unlock() {
+    switch (mode_) {
+      case LockMode::READ:
+        for (const auto& e : cows_) {
+          e.second->ReadUnlock();
+        }
+        break;
+      case LockMode::WRITE:
+        for (const auto& e : cows_) {
+          e.second->AbortMutation();
+        }
+        break;
+      default:
+        DCHECK_EQ(LockMode::RELEASED, mode_);
+        break;
+    }
+
+    cows_.clear();
+    mode_ = LockMode::RELEASED;
+  }
+
+  // Locks every tracked object in key order. The group must currently be
+  // RELEASED.
+  void Lock(LockMode new_mode) {
+    DCHECK_EQ(LockMode::RELEASED, mode_);
+
+    switch (new_mode) {
+      case LockMode::READ:
+        for (const auto& e : cows_) {
+          e.second->ReadLock();
+        }
+        break;
+      case LockMode::WRITE:
+        for (const auto& e : cows_) {
+          e.second->StartMutation();
+        }
+        break;
+      default:
+        LOG(FATAL) << "Cannot lock in mode " << new_mode;
+    }
+    mode_ = new_mode;
+  }
+
+  // Commits the mutation of every tracked object, then forgets them all and
+  // becomes RELEASED. Requires WRITE mode.
+  void Commit() {
+    DCHECK_EQ(LockMode::WRITE, mode_);
+    for (const auto& e : cows_) {
+      e.second->CommitMutation();
+    }
+    cows_.clear();
+    mode_ = LockMode::RELEASED;
+  }
+
+  // Adds a new CowObject to be tracked by the lock guard. Does nothing if a
+  // CowObject with the same key was already added.
+  //
+  // It is the responsibility of the caller to ensure:
+  // 1. That 'object' remains alive until the lock is released.
+  // 2. That if 'object' was already added, both objects point to the same
+  //    memory address.
+  // 3. That if the CowGroupLock is already locked in a particular mode,
+  //    'object' is also already locked in that mode.
+  void AddObject(Key key, const CowObject<Value>* object) {
+    // Objects are tracked through mutable pointers so that Lock()/Commit()
+    // can act on them.
+    AddMutableObject(std::move(key), const_cast<CowObject<Value>*>(object));
+  }
+
+  // Like the above, but for mutable objects.
+  void AddMutableObject(Key key, CowObject<Value>* object) {
+    AssertObjectLocked(object);
+    auto r = cows_.emplace(std::move(key), object);
+    // A key that is already present must map to this very same object.
+    DCHECK_EQ(r.first->second, object);
+  }
+
+ private:
+  // Debug-only check that 'object' is locked consistently with mode_.
+  void AssertObjectLocked(const CowObject<Value>* object) const {
+#ifndef NDEBUG
+    switch (mode_) {
+      case LockMode::READ:
+        DCHECK(object->IsReadLocked());
+        break;
+      case LockMode::WRITE:
+        DCHECK(object->IsWriteLocked());
+        break;
+      default:
+        DCHECK_EQ(LockMode::RELEASED, mode_);
+        break;
+    }
+#endif
+  }
+
+  std::map<Key, CowObject<Value>*> cows_;
+  LockMode mode_;
+
+  DISALLOW_COPY_AND_ASSIGN(CowGroupLock);
+};
+
+} // namespace kudu
http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/crc-test.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/crc-test.cc b/be/src/kudu/util/crc-test.cc
new file mode 100644
index 0000000..cf13268
--- /dev/null
+++ b/be/src/kudu/util/crc-test.cc
@@ -0,0 +1,112 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <cstdint>
+#include <cstring>
+#include <ostream>
+#include <string>
+
+#include <glog/logging.h>
+#include <gtest/gtest.h>
+
+#include "kudu/gutil/gscoped_ptr.h"
+#include "kudu/gutil/strings/numbers.h"
+#include "kudu/gutil/strings/substitute.h"
+#include "kudu/util/crc.h"
+#include "kudu/util/stopwatch.h"
+#include "kudu/util/test_util.h"
+
+namespace kudu {
+namespace crc {
+
+using strings::Substitute;
+
+class CrcTest : public KuduTest {
+ protected:
+  // Allocates a buffer with new[] and fills it with the 32-bit integers
+  // 0 .. 999999, consecutively and in native byte order. The buffer is
+  // returned through '*bufptr' and its byte length through '*buflen'.
+  // Ownership passes to the caller, who must release it with delete[].
+  static void GenerateBenchmarkData(const uint8_t** bufptr, size_t* buflen) {
+    const uint32_t kNumNumbers = 1000000;
+    const uint32_t kBytesPerNumber = sizeof(uint32_t);
+    const uint32_t kLength = kNumNumbers * kBytesPerNumber;
+    uint8_t* data = new uint8_t[kLength];
+    uint8_t* cursor = data;
+    for (uint32_t value = 0; value < kNumNumbers; ++value) {
+      memcpy(cursor, &value, kBytesPerNumber);
+      cursor += kBytesPerNumber;
+    }
+    *buflen = kLength;
+    *bufptr = data;
+  }
+};
+
+// Basic functionality test. Checks the CRC32C of a known string computed
+// three ways: through the raw Crc interface, through the one-shot helper, and
+// through the chunked (extending) helper.
+TEST_F(CrcTest, TestCRC32C) {
+  const std::string test_data("abcdefgh");
+  const uint64_t kExpectedCrc = 0xa9421b7; // Known value from crcutil usage test program.
+
+  Crc* crc32c = GetCrc32cInstance();
+  uint64_t data_crc = 0;  // Compute() extends the value passed in, so seed it with 0.
+  crc32c->Compute(test_data.data(), test_data.length(), &data_crc);
+  char buf[kFastToBufferSize];
+  const char* output = FastHex64ToBuffer(data_crc, buf);
+  LOG(INFO) << "CRC32C of " << test_data << " is: 0x" << output << " (full 64 bits)";
+  output = FastHex32ToBuffer(static_cast<uint32_t>(data_crc), buf);
+  LOG(INFO) << "CRC32C of " << test_data << " is: 0x" << output << " (truncated 32 bits)";
+  ASSERT_EQ(kExpectedCrc, data_crc);
+
+  // Using helper
+  uint64_t data_crc2 = Crc32c(test_data.data(), test_data.length());
+  ASSERT_EQ(kExpectedCrc, data_crc2);
+
+  // Using multiple chunks. The second chunk covers everything after the
+  // first, so no trailing byte is dropped if the data has an odd length.
+  const size_t first_len = test_data.length() / 2;
+  const size_t second_len = test_data.length() - first_len;
+  uint64_t data_crc3 = Crc32c(test_data.data(), first_len);
+  data_crc3 = Crc32c(test_data.data() + first_len, second_len, data_crc3);
+  ASSERT_EQ(kExpectedCrc, data_crc3);
+}
+
+// Simple benchmark of CRC32C throughput.
+// We should expect about 8 bytes per cycle in throughput on a single core.
+TEST_F(CrcTest, BenchmarkCRC32C) {
+  gscoped_ptr<const uint8_t[]> data;
+  const uint8_t* buf;
+  size_t buflen;
+  GenerateBenchmarkData(&buf, &buflen);
+  data.reset(buf);  // Takes ownership of the generated buffer.
+  Crc* crc32c = GetCrc32cInstance();
+  const int num_runs = AllowSlowTests() ? 40000 : 1000;
+  const uint64_t kNumBytes = num_runs * buflen;
+  Stopwatch sw;
+  sw.start();
+  for (int i = 0; i < num_runs; i++) {
+    // Compute() extends the CRC already stored in 'cksum', so it must be
+    // seeded. Leaving it uninitialized would read an indeterminate value.
+    uint64_t cksum = 0;
+    crc32c->Compute(buf, buflen, &cksum);
+  }
+  sw.stop();
+  CpuTimes elapsed = sw.elapsed();
+  // Use floating-point division so sub-integer throughput figures are not
+  // truncated.
+  LOG(INFO) << Substitute("$0 runs of CRC32C on $1 bytes of data (total: $2 bytes)"
+                          " in $3 seconds; $4 bytes per millisecond, $5 bytes per nanosecond!",
+                          num_runs, buflen, kNumBytes, elapsed.wall_seconds(),
+                          (static_cast<double>(kNumBytes) / elapsed.wall_millis()),
+                          (static_cast<double>(kNumBytes) / elapsed.wall));
+}
+
+} // namespace crc
+} // namespace kudu
http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/crc.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/crc.cc b/be/src/kudu/util/crc.cc
new file mode 100644
index 0000000..1534b8d
--- /dev/null
+++ b/be/src/kudu/util/crc.cc
@@ -0,0 +1,56 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#include "kudu/util/crc.h"
+
+#include <crcutil/interface.h>
+
+#include "kudu/gutil/once.h"
+#include "kudu/util/debug/leakcheck_disabler.h"
+
+namespace kudu {
+namespace crc {
+
+using debug::ScopedLeakCheckDisabler;
+
+// Process-wide CRC32C instance, created at most once on first use.
+static GoogleOnceType crc32c_once = GOOGLE_ONCE_INIT;
+static Crc* crc32c_instance = nullptr;
+
+// One-time initializer run through 'crc32c_once'. The instance is never
+// freed, so leak checking is suppressed while it is created.
+static void InitCrc32cInstance() {
+  ScopedLeakCheckDisabler disabler;
+  // TODO: Is initial = 0 and roll window = 4 appropriate for all cases?
+  crc32c_instance = crcutil_interface::CRC::CreateCrc32c(true, 0, 4, nullptr);
+}
+
+// Creates the shared instance on the first call and returns it.
+Crc* GetCrc32cInstance() {
+  GoogleOnceInit(&crc32c_once, InitCrc32cInstance);
+  return crc32c_instance;
+}
+
+// One-shot CRC32C of 'length' bytes at 'data'. This is the same as extending
+// a starting CRC of 0.
+uint32_t Crc32c(const void* data, size_t length) {
+  return Crc32c(data, length, 0);
+}
+
+// Extends 'prev_crc32' over the next 'length' bytes at 'data'. The underlying
+// interface uses a 64-bit accumulator; only its low 32 bits are returned.
+uint32_t Crc32c(const void* data, size_t length, uint32_t prev_crc32) {
+  uint64_t running = prev_crc32;
+  GetCrc32cInstance()->Compute(data, length, &running);
+  return static_cast<uint32_t>(running);
+}
+
+} // namespace crc
+} // namespace kudu
http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/crc.h
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/crc.h b/be/src/kudu/util/crc.h
new file mode 100644
index 0000000..a5db4ea
--- /dev/null
+++ b/be/src/kudu/util/crc.h
@@ -0,0 +1,43 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#ifndef KUDU_UTIL_CRC_H_
+#define KUDU_UTIL_CRC_H_
+
+#include <stdint.h>
+#include <stdlib.h>
+
+#include <crcutil/interface.h>
+
+namespace kudu {
+namespace crc {
+
+// Short name for crcutil's CRC interface.
+using Crc = crcutil_interface::CRC;
+
+// Returns the shared CRC32C implementation. It is created on first use and
+// is never freed.
+Crc* GetCrc32cInstance();
+
+// Computes the CRC32C of 'length' bytes starting at 'data'.
+uint32_t Crc32c(const void* data, size_t length);
+
+// Continues a CRC32C: given 'prev_crc32' for the preceding data, extends it
+// over the next 'length' bytes at 'data' and returns the combined value.
+uint32_t Crc32c(const void* data, size_t length, uint32_t prev_crc32);
+
+} // namespace crc
+} // namespace kudu
+
+#endif // KUDU_UTIL_CRC_H_
http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/curl_util.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/curl_util.cc b/be/src/kudu/util/curl_util.cc
new file mode 100644
index 0000000..4eddb64
--- /dev/null
+++ b/be/src/kudu/util/curl_util.cc
@@ -0,0 +1,130 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "kudu/util/curl_util.h"
+
+#include <cstddef>
+#include <cstdint>
+#include <ostream>
+
+#include <curl/curl.h>
+#include <glog/logging.h>
+
+#include "kudu/gutil/strings/substitute.h"
+#include "kudu/security/openssl_util.h"
+#include "kudu/util/faststring.h"
+#include "kudu/util/scoped_cleanup.h"
+
+namespace kudu {
+
+namespace {
+
+// Converts a libcurl result code to a Status. CURLE_OK becomes OK; any other
+// code becomes a NetworkError carrying curl_easy_strerror()'s text.
+inline Status TranslateError(CURLcode code) {
+  if (code != CURLE_OK) {
+    return Status::NetworkError("curl error", curl_easy_strerror(code));
+  }
+  return Status::OK();
+}
+
+extern "C" {
+// Write callback: appends each received chunk of 'size' * 'nmemb' bytes to
+// the faststring passed as 'user_ptr', and reports all of them as consumed.
+size_t WriteCallback(void* buffer, size_t size, size_t nmemb, void* user_ptr) {
+  const size_t num_bytes = size * nmemb;
+  auto* sink = static_cast<faststring*>(user_ptr);
+  CHECK_NOTNULL(sink)->append(static_cast<const uint8_t*>(buffer), num_bytes);
+  return num_bytes;
+}
+} // extern "C"
+
+} // anonymous namespace
+
+// Sets up the process-wide SSL and curl state and acquires this object's easy
+// handle. SSL is initialized by our own code, so curl's SSL initialization is
+// masked out of the global init flags. Both init calls are idempotent.
+EasyCurl::EasyCurl() {
+  security::InitializeOpenSSL();
+  CHECK_EQ(0, curl_global_init(CURL_GLOBAL_DEFAULT & ~CURL_GLOBAL_SSL));
+
+  curl_ = curl_easy_init();
+  CHECK(curl_) << "Could not init curl";
+}
+
+// Releases the easy handle acquired in the constructor.
+EasyCurl::~EasyCurl() {
+  curl_easy_cleanup(curl_);
+}
+
+// GET request: passing no body to DoRequest() selects GET.
+Status EasyCurl::FetchURL(const std::string& url, faststring* dst,
+                          const std::vector<std::string>& headers) {
+  const std::string* no_body = nullptr;
+  return DoRequest(url, no_body, dst, headers);
+}
+
+// POST request carrying 'post_data', sent with no extra headers.
+Status EasyCurl::PostToURL(const std::string& url,
+                           const std::string& post_data,
+                           faststring* dst) {
+  const std::vector<std::string> no_extra_headers;
+  return DoRequest(url, &post_data, dst, no_extra_headers);
+}
+
+// Performs one request on this object's easy handle and stores the response
+// in 'dst'. 'post_data' non-null means POST, null means GET.
+// Returns NetworkError on a curl failure, or RemoteError on a non-200 status.
+//
+// libcurl keeps option values on an easy handle across transfers. Every
+// option that depends on this call or on the set_*() toggles is therefore
+// set explicitly on each request. Otherwise an earlier request's state (a
+// POST body, disabled verification, header echoing) would leak into later
+// requests.
+Status EasyCurl::DoRequest(const std::string& url,
+                           const std::string* post_data,
+                           faststring* dst,
+                           const std::vector<std::string>& headers) {
+  CHECK_NOTNULL(dst)->clear();
+
+  // These options take a 'long'. Passing an int literal through the varargs
+  // setopt call is not portable, hence the L suffixes.
+  RETURN_NOT_OK(TranslateError(curl_easy_setopt(
+      curl_, CURLOPT_SSL_VERIFYHOST, verify_peer_ ? 2L : 0L)));
+  RETURN_NOT_OK(TranslateError(curl_easy_setopt(
+      curl_, CURLOPT_SSL_VERIFYPEER, verify_peer_ ? 1L : 0L)));
+
+  // Add headers if specified. The list is freed when this function returns.
+  // That is safe because CURLOPT_HTTPHEADER is set again on every request.
+  struct curl_slist* curl_headers = nullptr;
+  auto clean_up_curl_slist = MakeScopedCleanup([&]() {
+    curl_slist_free_all(curl_headers);
+  });
+
+  for (const auto& header : headers) {
+    curl_headers = CHECK_NOTNULL(curl_slist_append(curl_headers, header.c_str()));
+  }
+  RETURN_NOT_OK(TranslateError(curl_easy_setopt(curl_, CURLOPT_HTTPHEADER, curl_headers)));
+
+  RETURN_NOT_OK(TranslateError(curl_easy_setopt(curl_, CURLOPT_URL, url.c_str())));
+  RETURN_NOT_OK(TranslateError(curl_easy_setopt(curl_, CURLOPT_HEADER,
+                                                return_headers_ ? 1L : 0L)));
+  RETURN_NOT_OK(TranslateError(curl_easy_setopt(curl_, CURLOPT_WRITEFUNCTION, WriteCallback)));
+  RETURN_NOT_OK(TranslateError(curl_easy_setopt(curl_, CURLOPT_WRITEDATA,
+                                                static_cast<void *>(dst))));
+  if (post_data) {
+    // Pass the explicit size, so that a body containing NUL bytes is sent
+    // whole rather than cut off by strlen().
+    RETURN_NOT_OK(TranslateError(curl_easy_setopt(
+        curl_, CURLOPT_POSTFIELDSIZE, static_cast<long>(post_data->size())))); // NOLINT(*)
+    RETURN_NOT_OK(TranslateError(curl_easy_setopt(curl_, CURLOPT_POSTFIELDS,
+                                                  post_data->c_str())));
+  } else {
+    // Switch the handle back to GET. Without this, a FetchURL() that follows
+    // a PostToURL() on the same EasyCurl would still POST, reading the
+    // previous call's (possibly dangling) body pointer.
+    RETURN_NOT_OK(TranslateError(curl_easy_setopt(curl_, CURLOPT_HTTPGET, 1L)));
+  }
+
+  RETURN_NOT_OK(TranslateError(curl_easy_setopt(curl_, CURLOPT_HTTPAUTH, CURLAUTH_ANY)));
+  if (timeout_.Initialized()) {
+    RETURN_NOT_OK(TranslateError(curl_easy_setopt(curl_, CURLOPT_NOSIGNAL, 1L)));
+    RETURN_NOT_OK(TranslateError(curl_easy_setopt(
+        curl_, CURLOPT_TIMEOUT_MS, static_cast<long>(timeout_.ToMilliseconds())))); // NOLINT(*)
+  }
+  RETURN_NOT_OK(TranslateError(curl_easy_perform(curl_)));
+  long rc = 0; // NOLINT(*) curl wants a long
+  RETURN_NOT_OK(TranslateError(curl_easy_getinfo(curl_, CURLINFO_RESPONSE_CODE, &rc)));
+  if (rc != 200) {
+    return Status::RemoteError(strings::Substitute("HTTP $0", rc));
+  }
+
+  return Status::OK();
+}
+
+} // namespace kudu
http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/curl_util.h
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/curl_util.h b/be/src/kudu/util/curl_util.h
new file mode 100644
index 0000000..cccd2db
--- /dev/null
+++ b/be/src/kudu/util/curl_util.h
@@ -0,0 +1,92 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#ifndef KUDU_UTIL_CURL_UTIL_H
+#define KUDU_UTIL_CURL_UTIL_H
+
+#include <string>
+#include <vector>
+
+#include "kudu/gutil/macros.h"
+#include "kudu/util/monotime.h"
+#include "kudu/util/status.h"
+
+typedef void CURL;
+
+namespace kudu {
+
+class faststring;
+
+// A small blocking wrapper around libcurl's "easy" interface that fetches web
+// pages (or POST responses) into memory.
+//
+// Instances are not thread-safe.
+class EasyCurl {
+ public:
+  EasyCurl();
+  ~EasyCurl();
+
+  // Downloads 'url' into 'dst', replacing anything 'dst' previously held.
+  // 'headers' optionally adds extra request headers,
+  // e.g. {"Accept-Encoding: gzip"}.
+  Status FetchURL(const std::string& url,
+                  faststring* dst,
+                  const std::vector<std::string>& headers = {});
+
+  // Sends 'post_data' to 'url' as an HTTP POST. The response lands in 'dst'
+  // just as for FetchURL().
+  Status PostToURL(const std::string& url,
+                   const std::string& post_data,
+                   faststring* dst);
+
+  // Chooses whether the server's SSL certificate is verified on HTTPS
+  // connections.
+  void set_verify_peer(bool verify) { verify_peer_ = verify; }
+
+  // Chooses whether the HTTP response headers are returned with the body.
+  void set_return_headers(bool v) { return_headers_ = v; }
+
+  // Sets the request timeout. A default-constructed MonoDelta means none.
+  void set_timeout(MonoDelta t) { timeout_ = t; }
+
+ private:
+  // Issues a request: a POST carrying '*post_data' when it is non-NULL,
+  // otherwise a GET.
+  Status DoRequest(const std::string& url,
+                   const std::string* post_data,
+                   faststring* dst,
+                   const std::vector<std::string>& headers = {});
+
+  CURL* curl_;
+
+  // Verify the server certificate? Defaults to yes.
+  bool verify_peer_ = true;
+
+  // Include HTTP headers in the response? Defaults to no.
+  bool return_headers_ = false;
+
+  // Request timeout, applied only if it has been initialized.
+  MonoDelta timeout_;
+
+  DISALLOW_COPY_AND_ASSIGN(EasyCurl);
+};
+
+} // namespace kudu
+
+#endif /* KUDU_UTIL_CURL_UTIL_H */
http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/debug-util-test.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/debug-util-test.cc b/be/src/kudu/util/debug-util-test.cc
new file mode 100644
index 0000000..25e4ae0
--- /dev/null
+++ b/be/src/kudu/util/debug-util-test.cc
@@ -0,0 +1,458 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <dlfcn.h>
+#ifdef __linux__
+#include <link.h>
+#endif
+#include <unistd.h>
+
+#include <algorithm>
+#include <csignal>
+#include <cstddef>
+#include <cstdint>
+#include <memory>
+#include <ostream>
+#include <string>
+#include <vector>
+
+#include <gflags/gflags_declare.h>
+#include <glog/logging.h>
+#include <glog/stl_logging.h>
+#include <gtest/gtest.h>
+
+#include "kudu/gutil/ref_counted.h"
+#include "kudu/gutil/walltime.h"
+#include "kudu/util/array_view.h"
+#include "kudu/util/countdown_latch.h"
+#include "kudu/util/debug-util.h"
+#include "kudu/util/kernel_stack_watchdog.h"
+#include "kudu/util/monotime.h"
+#include "kudu/util/scoped_cleanup.h"
+#include "kudu/util/status.h"
+#include "kudu/util/test_macros.h"
+#include "kudu/util/test_util.h"
+#include "kudu/util/thread.h"
+
+using std::string;
+using std::vector;
+
+DECLARE_int32(test_timeout_after);
+DECLARE_int32(stress_cpu_threads);
+
+namespace kudu {
+
+// Test fixture for the debug-util tests; it adds nothing on top of KuduTest.
+class DebugUtilTest : public KuduTest {};
+
+// Collecting the current thread's stack should show this test body once
+// symbolized.
+TEST_F(DebugUtilTest, TestStackTrace) {
+  StackTrace stack;
+  stack.Collect(0);
+  const string symbolized = stack.Symbolize();
+  ASSERT_STR_CONTAINS(symbolized, "kudu::DebugUtilTest_TestStackTrace_Test::TestBody");
+}
+
+// DumpThreadStack is only supported on Linux, since the implementation relies
+// on the tgkill syscall which is not portable.
+#if defined(__linux__)
+
+namespace {
+// Thread body that blocks until 'latch' reaches zero.
+//
+// We poll with a short WaitFor() rather than calling Wait() so that this test
+// passes in TSAN. A plain Wait() runs into this TSAN bug, which prevents the
+// sleeping thread from handling signals:
+// https://code.google.com/p/thread-sanitizer/issues/detail?id=91
+void SleeperThread(CountDownLatch* latch) {
+  for (;;) {
+    if (latch->WaitFor(MonoDelta::FromMilliseconds(10))) {
+      break;
+    }
+  }
+}
+
+// No-op handler, installed by the tests to make a signal look "in use".
+void fake_signal_handler(int /*signum*/) {}
+
+// Returns true if 'signum' currently has any disposition other than SIG_DFL
+// (so SIG_IGN also counts as registered).
+bool IsSignalHandlerRegistered(int signum) {
+  struct sigaction current;
+  CHECK_EQ(0, sigaction(signum, nullptr, &current));
+  const bool is_default = (current.sa_handler == SIG_DFL);
+  return !is_default;
+}
+} // anonymous namespace
+
+// Dumping thread id 1 is expected to fail at signal delivery and to report
+// the failure in the returned string instead of a stack.
+TEST_F(DebugUtilTest, TestStackTraceInvalidTid) {
+  const string dump = DumpThreadStack(1);
+  ASSERT_STR_CONTAINS(dump, "unable to deliver signal");
+}
+
+// A thread can dump its own stack through the signal-based mechanism.
+TEST_F(DebugUtilTest, TestStackTraceSelf) {
+  const string dump = DumpThreadStack(Thread::CurrentThreadId());
+  ASSERT_STR_CONTAINS(dump, "kudu::DebugUtilTest_TestStackTraceSelf_Test::TestBody()");
+}
+
+// Dumping the thread whose tid equals getpid() (the process's main thread)
+// should show this test body.
+TEST_F(DebugUtilTest, TestStackTraceMainThread) {
+  const string dump = DumpThreadStack(getpid());
+  ASSERT_STR_CONTAINS(dump, "kudu::DebugUtilTest_TestStackTraceMainThread_Test::TestBody()");
+}
+
+// Switches the stack-trace signal between SIGUSR2 and SIGHUP, checking the
+// registrations and that dumps keep working. Also verifies that a signal
+// already owned by another handler is rejected.
+TEST_F(DebugUtilTest, TestSignalStackTrace) {
+  CountDownLatch latch(1);
+  scoped_refptr<Thread> sleeper;
+  ASSERT_OK(Thread::Create("test", "test thread", &SleeperThread, &latch, &sleeper));
+  SCOPED_CLEANUP({
+    // Let the sleeper exit, then reap it.
+    latch.CountDown();
+    sleeper->Join();
+  });
+
+  // The new thread needs a moment to start up and enter SleeperThread(), so
+  // retry until its stack shows up.
+  ASSERT_EVENTUALLY([&]() {
+    ASSERT_STR_CONTAINS(DumpThreadStack(sleeper->tid()), "SleeperThread");
+  });
+
+  // Move the stack-trace signal to SIGHUP. SIGHUP becomes registered,
+  // SIGUSR2 is relinquished, and dumps still work on the new signal.
+  ASSERT_FALSE(IsSignalHandlerRegistered(SIGHUP));
+  ASSERT_OK(SetStackTraceSignal(SIGHUP));
+  ASSERT_TRUE(IsSignalHandlerRegistered(SIGHUP));
+  ASSERT_FALSE(IsSignalHandlerRegistered(SIGUSR2));
+  ASSERT_STR_CONTAINS(DumpThreadStack(sleeper->tid()), "SleeperThread");
+
+  // Move it back to SIGUSR2 and check that the registrations flip back.
+  ASSERT_OK(SetStackTraceSignal(SIGUSR2));
+  ASSERT_TRUE(IsSignalHandlerRegistered(SIGUSR2));
+  ASSERT_FALSE(IsSignalHandlerRegistered(SIGHUP));
+  ASSERT_STR_CONTAINS(DumpThreadStack(sleeper->tid()), "SleeperThread");
+
+  // With our own handler occupying SIGHUP, switching to it must fail...
+  signal(SIGHUP, &fake_signal_handler);
+  ASSERT_STR_CONTAINS(SetStackTraceSignal(SIGHUP).ToString(),
+                      "unable to install signal handler");
+  signal(SIGHUP, SIG_DFL);
+
+  // ...and stack traces are disabled afterwards.
+  ASSERT_STR_CONTAINS(DumpThreadStack(sleeper->tid()), "unable to take thread stack");
+
+  // Re-enable the default signal so that other tests pass.
+  ASSERT_OK(SetStackTraceSignal(SIGUSR2));
+}
+
+// Test which dumps all known threads within this process.
+// We don't validate the stacks in any way -- but this verifies that we can
+// dump library threads such as the libc timer_thread and properly time out.
+TEST_F(DebugUtilTest, TestSnapshot) {
+  // Ensure the Kernel Stack Watchdog thread is running, so that it is part of
+  // the baseline counted below.
+  KernelStackWatchdog::GetInstance();
+
+  // The test and runtime environment run various utility threads (the main
+  // thread, the kernel stack watchdog, the test timeout thread, CPU stress
+  // threads, the TSAN runtime thread). Count them before we start any
+  // additional threads for this test.
+  int initial_thread_count =
+      1                                          // main thread
+      + 1                                        // KernelStackWatchdog
+      + (FLAGS_test_timeout_after > 0 ? 1 : 0)   // test timeout thread if running
+      + FLAGS_stress_cpu_threads;
+  int tsan_threads = 0;
+#ifdef THREAD_SANITIZER
+  // TSAN starts an extra thread of its own (its signal thread). It is also the
+  // only thread expected to fail in the snapshot (see num_failed() below).
+  tsan_threads++;
+#endif
+  initial_thread_count += tsan_threads;
+
+  // HACK: prior tests in this suite start threads. Even though they Join on the
+  // threads before the test case finishes, there is a very short period of time
+  // after Join() returns but before the thread has exited and removed itself
+  // from /proc/self/task/. So ListThreads() can briefly still report threads
+  // from prior test cases. Wait for the number of running threads to level off
+  // to the expected value.
+  ASSERT_EVENTUALLY([&]{
+    vector<pid_t> tids;
+    ASSERT_OK(ListThreads(&tids));
+    ASSERT_EQ(initial_thread_count, static_cast<int>(tids.size())) << tids;
+  });
+
+  // Start a bunch of sleeping threads.
+  const int kNumThreads = 30;
+  CountDownLatch l(1);
+  vector<scoped_refptr<Thread>> threads(kNumThreads);
+  // Register the cleanup before starting any thread. If a later
+  // Thread::Create() fails, ASSERT_OK returns early. The threads already
+  // started must still be released and joined before 'l' is destroyed;
+  // entries that were never created stay null and are skipped.
+  SCOPED_CLEANUP({
+    l.CountDown();
+    for (auto& t : threads) {
+      if (t.get() != nullptr) {
+        t->Join();
+      }
+    }
+  });
+  for (int i = 0; i < kNumThreads; i++) {
+    ASSERT_OK(Thread::Create("test", "test thread", &SleeperThread, &l, &threads[i]));
+  }
+
+  StackTraceSnapshot snap;
+  ASSERT_OK(snap.SnapshotAllStacks());
+  int count = 0;
+  int groups = 0;
+  snap.VisitGroups([&](ArrayView<StackTraceSnapshot::ThreadInfo> group) {
+    groups++;
+    for (const auto& info : group) {
+      count++;
+      LOG(INFO) << info.tid << " " << info.thread_name
+                << " (" << info.status.ToString() << ")";
+    }
+    LOG(INFO) << group[0].stack.ToHexString();
+  });
+  ASSERT_EQ(kNumThreads + initial_thread_count, count);
+  // The threads might not have exactly identical stacks, but
+  // we should have far fewer groups than the total number
+  // of threads.
+  ASSERT_LE(groups, kNumThreads / 2);
+  ASSERT_EQ(tsan_threads, snap.num_failed());
+}
+
+// Logs how many remote stack dumps per second can be taken from a sleeping
+// thread, first without and then with symbolization.
+TEST_F(DebugUtilTest, Benchmark) {
+  CountDownLatch latch(1);
+  scoped_refptr<Thread> sleeper;
+  ASSERT_OK(Thread::Create("test", "test thread", &SleeperThread, &latch, &sleeper));
+  auto release_sleeper = MakeScopedCleanup([&]() {
+    // Allow the thread to finish.
+    latch.CountDown();
+    sleeper->Join();
+  });
+
+  const bool kSymbolizeModes[] = {false, true};
+  for (const bool symbolize : kSymbolizeModes) {
+    const MonoTime deadline = MonoTime::Now() + MonoDelta::FromSeconds(1);
+    int dumps = 0;
+    // 'volatile' keeps the compiler from discarding the Symbolize() work.
+    volatile int sink = 0;
+    while (MonoTime::Now() < deadline) {
+      StackTrace trace;
+      GetThreadStack(sleeper->tid(), &trace);
+      if (symbolize) {
+        sink += trace.Symbolize().size();
+      }
+      ++dumps;
+    }
+    LOG(INFO) << "Throughput: " << dumps << " dumps/second (symbolize=" << symbolize << ")";
+  }
+}
+
+// dl_iterate_phdr() callback that collects the current thread's stack into the
+// StackTrace passed through 'data'. Returning 0 lets dl_iterate_phdr() continue
+// with the next loaded object.
+int TakeStackTrace(struct dl_phdr_info* /*info*/, size_t /*size*/, void* data) {
+  // 'data' is a StackTrace* round-tripped through void*; static_cast is the
+  // appropriate (and sufficient) cast to recover it.
+  auto* s = static_cast<StackTrace*>(data);
+  s->Collect(0);
+  return 0;
+}
+
+// Test that if we try to collect a stack trace while inside a libdl function
+// call that we properly return the bogus stack indicating the issue.
+//
+// This doesn't work in ThreadSanitizer since we don't intercept dl_iterate_phdr
+// in those builds (see note in unwind_safeness.cc).
+#ifndef THREAD_SANITIZER
+TEST_F(DebugUtilTest, TestUnwindWhileUnsafe) {
+  // Collect from inside dl_iterate_phdr(). The result should be the bogus
+  // placeholder stack rather than a real unwind.
+  StackTrace trace;
+  dl_iterate_phdr(&TakeStackTrace, &trace);
+  const string symbolized = trace.Symbolize();
+  ASSERT_STR_CONTAINS(symbolized, "CouldNotCollectStackTraceBecauseInsideLibDl");
+}
+#endif
+
+// dl_iterate_phdr() callback that ignores its arguments. Returning 0 lets the
+// iteration visit every loaded object.
+int DoNothingDlCallback(struct dl_phdr_info* /*info*/, size_t /*size*/, void* /*data*/) {
+  const int kContinueIteration = 0;
+  return kContinueIteration;
+}
+
+// Parameterized test in which a worker thread repeatedly performs an operation during
+// which it might be dangerous to collect a stack trace, while the main thread keeps
+// taking that thread's stack trace. Any of these operations may run on normal
+// application threads, so we need to ensure that if we happen to gather the stack of a
+// thread in the middle of one of them, we neither crash nor deadlock.
+//
+// Example self-deadlock if we didn't have the appropriate workarounds in place:
+// #0 __lll_lock_wait ()
+// #1 0x00007ffff6f16e42 in __GI___pthread_mutex_lock
+// #2 0x00007ffff6c8601f in __GI___dl_iterate_phdr
+// #3 0x0000000000695b02 in dl_iterate_phdr
+// #4 0x000000000056d013 in _ULx86_64_dwarf_find_proc_info
+// #5 0x000000000056d1d5 in fetch_proc_info (c=c@ent
+// #6 0x000000000056e2e7 in _ULx86_64_dwarf_find_save_
+// #7 0x000000000056c1b9 in _ULx86_64_dwarf_step (c=c@
+// #8 0x000000000056be21 in _ULx86_64_step
+// #9 0x0000000000566b1d in google::GetStackTrace
+// #10 0x00000000004dc4d1 in kudu::StackTrace::Collect
+// #11 kudu::(anonymous namespace)::HandleStackTraceSignal
+// #12 <signal handler called>
+// #13 0x00007ffff6f16e31 in __GI___pthread_mutex_lock
+// #14 0x00007ffff6c8601f in __GI___dl_iterate_phdr
+// #15 0x0000000000695b02 in dl_iterate_phdr
+// The operation that DangerousOperationThread() repeats while its stack is
+// being sampled by TestStackTraceRaces.
+enum DangerousOp {
+ DLOPEN_AND_CLOSE, // dlopen() then dlclose() libc.so.6
+ DL_ITERATE_PHDR, // walk loaded objects with dl_iterate_phdr()
+ GET_STACK_TRACE, // collect our own stack via GetStackTrace()
+ MALLOC_AND_FREE // allocate and free a 2 MiB buffer
+};
+// Fixture for the race tests; the DangerousOp parameter selects the operation.
+class RaceTest : public DebugUtilTest, public ::testing::WithParamInterface<DangerousOp> {};
+// Run each RaceTest test once per DangerousOp value.
+INSTANTIATE_TEST_CASE_P(DifferentRaces, RaceTest,
+ ::testing::Values(DLOPEN_AND_CLOSE,
+ DL_ITERATE_PHDR,
+ GET_STACK_TRACE,
+ MALLOC_AND_FREE));
+
+// Thread body that performs 'op' over and over until 'latch' reaches zero, so
+// that another thread can sample this thread's stack mid-operation.
+void DangerousOperationThread(DangerousOp op, CountDownLatch* latch) {
+  while (latch->count() != 0) {
+    if (op == DLOPEN_AND_CLOSE) {
+      // Check races against dlopen/dlclose.
+      void* handle = dlopen("libc.so.6", RTLD_LAZY);
+      CHECK(handle);
+      dlclose(handle);
+    } else if (op == DL_ITERATE_PHDR) {
+      // Check for races against dl_iterate_phdr.
+      dl_iterate_phdr(&DoNothingDlCallback, nullptr);
+    } else if (op == GET_STACK_TRACE) {
+      // Check for reentrancy issues.
+      GetStackTrace();
+    } else if (op == MALLOC_AND_FREE) {
+      // Check large (2 MiB) allocations in tcmalloc.
+      volatile char* buf = new char[1024 * 1024 * 2];
+      delete[] buf;
+    } else {
+      LOG(FATAL) << "unknown op";
+    }
+  }
+}
+
+// Starts a thread that performs the parameterized dangerous operation in a
+// loop, then samples its stack for about one second trying to trigger races.
+TEST_P(RaceTest, TestStackTraceRaces) {
+  DangerousOp op = GetParam();
+  CountDownLatch latch(1);
+  scoped_refptr<Thread> worker;
+  ASSERT_OK(Thread::Create("test", "test thread", &DangerousOperationThread, op, &latch, &worker));
+  auto stop_worker = MakeScopedCleanup([&]() {
+    latch.CountDown();
+    // Not being able to join within a reasonable time most likely means a
+    // deadlock, so crash instead of hanging.
+    CHECK_OK(ThreadJoiner(worker.get()).give_up_after_ms(10000).Join());
+  });
+  const MonoTime deadline = MonoTime::Now() + MonoDelta::FromSeconds(1);
+  while (MonoTime::Now() < deadline) {
+    StackTrace trace;
+    GetThreadStack(worker->tid(), &trace);
+  }
+}
+
+// Thread body that toggles blocking of SIGUSR2 (the default stack-trace
+// signal): blocked for ~1s, unblocked for ~1s, then blocked again for ~1s.
+void BlockSignalsThread() {
+  sigset_t set;
+  sigemptyset(&set);
+  sigaddset(&set, SIGUSR2);
+  for (int i = 0; i < 3; i++) {
+    // pthread_sigmask() returns 0 on success or a positive error number on
+    // failure. It never returns -1, so CHECK_ERR (which only checks for -1)
+    // could not detect a failure here; compare against 0 instead.
+    const int how = (i % 2) ? SIG_UNBLOCK : SIG_BLOCK;
+    CHECK_EQ(0, pthread_sigmask(how, &set, nullptr));
+    SleepFor(MonoDelta::FromSeconds(1));
+  }
+}
+
+// Repeatedly dumps a thread that toggles blocking of the stack-trace signal.
+// The loop stops once a dump contains "unable to deliver signal", the error
+// reported when the signal cannot be queued to the thread.
+TEST_F(DebugUtilTest, TestThreadBlockingSignals) {
+  scoped_refptr<Thread> blocker;
+  ASSERT_OK(Thread::Create("test", "test thread", &BlockSignalsThread, &blocker));
+  SCOPED_CLEANUP({ blocker->Join(); });
+  string dump;
+  do {
+    dump = DumpThreadStack(blocker->tid());
+    LOG(INFO) << dump;
+  } while (dump.find("unable to deliver signal") == string::npos);
+}
+
+// Test stack traces which time out even though the destination thread does not
+// block signals.
+TEST_F(DebugUtilTest, TestTimeouts) {
+  const int kRunTimeSecs = AllowSlowTests() ? 5 : 1;
+
+  CountDownLatch latch(1);
+  scoped_refptr<Thread> sleeper;
+  ASSERT_OK(Thread::Create("test", "test thread", &SleeperThread, &latch, &sleeper));
+  SCOPED_CLEANUP({
+    // Allow the thread to finish.
+    latch.CountDown();
+    sleeper->Join();
+  });
+
+  // Time a few ordinary stack traces to learn how long a non-timed-out one
+  // takes.
+  const int kSamples = 20;
+  vector<MicrosecondsInt64> durations;
+  durations.reserve(kSamples);
+  for (int i = 0; i < kSamples; i++) {
+    StackTrace stack;
+    const MicrosecondsInt64 start = GetMonoTimeMicros();
+    ASSERT_OK(GetThreadStack(sleeper->tid(), &stack));
+    durations.push_back(GetMonoTimeMicros() - start);
+  }
+
+  // Use the median so that outliers are thrown out.
+  const size_t mid = durations.size() / 2;
+  std::nth_element(durations.begin(), durations.begin() + mid, durations.end());
+  const auto median_duration = durations[mid];
+  LOG(INFO) << "Median duration: " << median_duration << "us";
+
+  // Take stack traces with timeouts clustered around the expected time. Each
+  // timeout raises the next timeout by 1us (less likely to time out again),
+  // and each success lowers it by 1us (more likely to time out). This triggers
+  // all the interesting cases: (a) success, (b) timeout, (c) timeout exactly
+  // as the signal finishes.
+  int num_timeouts = 0;
+  int num_successes = 0;
+  const MonoTime end_time = MonoTime::Now() + MonoDelta::FromSeconds(kRunTimeSecs);
+  int64_t timeout_us = median_duration;
+  while (MonoTime::Now() < end_time) {
+    StackTraceCollector collector;
+    // Keep the StackTrace on the heap so that a use-after-free is caught more
+    // easily by ASAN.
+    std::unique_ptr<StackTrace> stack(new StackTrace());
+    ASSERT_OK(collector.TriggerAsync(sleeper->tid(), stack.get()));
+    const Status s = collector.AwaitCollection(
+        MonoTime::Now() + MonoDelta::FromMicroseconds(timeout_us));
+    if (s.IsTimedOut()) {
+      num_timeouts++;
+      timeout_us++;
+    } else if (s.ok()) {
+      num_successes++;
+      timeout_us--;
+    } else {
+      FAIL() << "Unexpected status: " << s.ToString();
+    }
+  }
+  LOG(INFO) << "Timed out " << num_timeouts << " times";
+  LOG(INFO) << "Succeeded " << num_successes << " times";
+}
+
+#endif
+} // namespace kudu
http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/debug-util.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/debug-util.cc b/be/src/kudu/util/debug-util.cc
new file mode 100644
index 0000000..03556d6
--- /dev/null
+++ b/be/src/kudu/util/debug-util.cc
@@ -0,0 +1,800 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "kudu/util/debug-util.h"
+
+#include <dirent.h>
+#ifndef __linux__
+#include <sched.h>
+#endif
+#ifdef __linux__
+#include <syscall.h>
+#else
+#include <sys/syscall.h>
+#endif
+#include <unistd.h>
+
+#include <algorithm>
+#include <atomic>
+#include <cerrno>
+#include <climits>
+#include <csignal>
+#include <ctime>
+#include <iterator>
+#include <memory>
+#include <ostream>
+#include <string>
+
+#include <glog/logging.h>
+#include <glog/raw_logging.h>
+#ifdef __linux__
+#define UNW_LOCAL_ONLY
+#include <libunwind.h>
+#endif
+
+#include "kudu/gutil/basictypes.h"
+#include "kudu/gutil/dynamic_annotations.h"
+#include "kudu/gutil/hash/city.h"
+#include "kudu/gutil/linux_syscall_support.h"
+#include "kudu/gutil/macros.h"
+#include "kudu/gutil/once.h"
+#include "kudu/gutil/spinlock.h"
+#include "kudu/gutil/stringprintf.h"
+#include "kudu/gutil/strings/numbers.h"
+#include "kudu/gutil/strings/strip.h"
+#include "kudu/gutil/strings/substitute.h"
+#include "kudu/util/array_view.h"
+#include "kudu/util/debug/leak_annotations.h"
+#ifndef __linux__
+#include "kudu/util/debug/sanitizer_scopes.h"
+#endif
+#include "kudu/util/debug/unwind_safeness.h"
+#include "kudu/util/env.h"
+#include "kudu/util/errno.h"
+#include "kudu/util/faststring.h"
+#include "kudu/util/monotime.h"
+#include "kudu/util/os-util.h"
+#include "kudu/util/scoped_cleanup.h"
+#include "kudu/util/thread.h"
+
+using std::string;
+using std::unique_ptr;
+using std::vector;
+
+#if defined(__APPLE__)
+typedef sig_t sighandler_t;
+#endif
+
+// In coverage builds, this symbol will be defined and allows us to flush coverage info
+// to disk before exiting.
+//
+// In every configuration below, '__gcov_flush' compares equal to nullptr when the
+// coverage runtime is unavailable; IsCoverageBuild() and TryFlushCoverage() check that.
+#if defined(__APPLE__)
+ // OS X does not support weak linking at compile time properly.
+ #if defined(COVERAGE_BUILD)
+extern "C" void __gcov_flush() __attribute__((weak_import));
+ #else
+ // Not a coverage build: a null function pointer stands in for the symbol.
+extern "C" void (*__gcov_flush)() = nullptr;
+ #endif
+#else
+// Weak declaration: the symbol's address is null unless the coverage runtime is linked.
+extern "C" {
+__attribute__((weak))
+void __gcov_flush();
+}
+#endif
+
+// Evil hack to grab a few useful functions from glog
+// (declared by hand here, presumably because glog's public headers don't expose
+// them; NOTE(review): this depends on glog internals -- re-check on glog upgrades).
+namespace google {
+
+// NOTE(review): judging by the parameter names, captures up to 'max_depth' frames
+// into 'result' after skipping 'skip_count' frames -- confirm against glog.
+extern int GetStackTrace(void** result, int max_depth, int skip_count);
+
+// Symbolizes a program counter. On success, returns true and writes the
+// symbol name to "out". The symbol name is demangled if possible
+// (supports symbols generated by GCC 3.x or newer). Otherwise,
+// returns false.
+bool Symbolize(void *pc, char *out, int out_size);
+
+namespace glog_internal_namespace_ {
+// Formats the current stack trace into '*s'; used by kudu::GetStackTrace().
+extern void DumpStackTraceToString(string *s);
+} // namespace glog_internal_namespace_
+} // namespace google
+
+// The %p field width for printf() functions is two characters per byte.
+// For some environments, add two extra bytes for the leading "0x".
+static const int kPrintfPointerFieldWidth = 2 + 2 * sizeof(void*);
+
+// The signal that we'll use to communicate with our other threads.
+// It must not be used by any other library in the process. It can be changed
+// at runtime with SetStackTraceSignal().
+static int g_stack_trace_signum = SIGUSR2;
+
+// Protects g_stack_trace_signum and the installation of the signal
+// handler.
+static base::SpinLock g_signal_handler_lock(base::LINKER_INITIALIZED);
+
+namespace kudu {
+
+// Returns true if the coverage runtime's __gcov_flush() is available, i.e.
+// its address/value is non-null.
+bool IsCoverageBuild() {
+  const bool have_gcov = (__gcov_flush != nullptr);
+  return have_gcov;
+}
+
+// Flushes gcov coverage data if this is a coverage build. Flushing is neither
+// reentrant nor thread-safe, so a caller that cannot immediately grab the lock
+// skips the flush instead of waiting.
+void TryFlushCoverage() {
+  static base::SpinLock lock(base::LINKER_INITIALIZED);
+
+  if (__gcov_flush == nullptr) {
+    return;  // Not a coverage build.
+  }
+  if (!lock.TryLock()) {
+    return;  // A flush is already in progress.
+  }
+  __gcov_flush();
+  lock.Unlock();
+}
+
+
+namespace stack_trace_internal {
+
+// Simple notification mechanism based on futex.
+//
+// We use this instead of a mutex and condvar because we need
+// to signal it from a signal handler, and mutexes are not async-safe.
+//
+// pthread semaphores are async-signal-safe but their timedwait function
+// only supports wall clock waiting, which is a bit dangerous since we
+// need strict timeouts here.
+//
+// On __APPLE__ builds no futex is used: Signal() only sets the flag and
+// WaitUntil() polls it, calling sched_yield() between checks.
+class CompletionFlag {
+ public:
+
+ // Mark the flag as complete, waking all waiters.
+ void Signal() {
+ complete_ = true;
+#ifndef __APPLE__
+ // Wake every thread blocked in FUTEX_WAIT on 'complete_' in WaitUntil().
+ sys_futex(reinterpret_cast<int32_t*>(&complete_),
+ FUTEX_WAKE | FUTEX_PRIVATE_FLAG,
+ INT_MAX, // wake all
+ 0 /* ignored */);
+#endif
+ }
+
+ // Wait for the flag to be marked as complete, up until the given deadline.
+ // Returns true if the flag was marked complete before the deadline.
+ // The flag is checked once more after the deadline passes, so a completion
+ // that races with the deadline is still reported as true.
+ bool WaitUntil(MonoTime deadline) {
+ if (complete_) return true;
+
+ MonoTime now = MonoTime::Now();
+ while (now < deadline) {
+#ifndef __APPLE__
+ // Sleep for at most the remaining time. FUTEX_WAIT returns immediately if
+ // 'complete_' is no longer 0, and may also return early (wakeup, timeout,
+ // interruption), so the flag and the clock are re-checked every iteration.
+ MonoDelta rem = deadline - now;
+ struct timespec ts;
+ rem.ToTimeSpec(&ts);
+ sys_futex(reinterpret_cast<int32_t*>(&complete_),
+ FUTEX_WAIT | FUTEX_PRIVATE_FLAG,
+ 0, // wait if value is still 0
+ reinterpret_cast<struct kernel_timespec *>(&ts));
+#else
+ sched_yield();
+#endif
+ if (complete_) {
+ return true;
+ }
+ now = MonoTime::Now();
+ }
+ return complete_;
+ }
+
+ // Clear the flag. Does not wake anyone.
+ void Reset() {
+ complete_ = false;
+ }
+
+ // Returns whether the flag is currently marked complete.
+ bool complete() const {
+ return complete_;
+ }
+ private:
+ // 0 = not complete, nonzero = complete. Kept as a 32-bit integer because the
+ // futex calls above operate on its address as an int32_t.
+ std::atomic<int32_t> complete_ { 0 };
+};
+
+
+// A pointer to this structure is passed as signal data to a thread when
+// a stack trace is being remotely requested.
+//
+// The state machine is as follows (each state is a tuple of 'queued_to_tid'
+// and 'result_ready' status):
+//
+//   [ kNotInUse, false ]
+//           |
+//           | (A)
+//           v                (D)
+//   [ <target tid>, false ]  --->  [ kNotInUse, false ] (leaked)
+//           |
+//           | (B)
+//           v                (E)
+//   [ kDumpStarted, false ]  --->  [ kNotInUse, false ] (tracer waits for 'result_ready')
+//           |                            |
+//           | (C)                        | (G)
+//           v                (F)         v
+//   [ kDumpStarted, true ]   --->  [ kNotInUse, true ] (already complete)
+//
+// Transitions:
+//    (A): tracer thread sets 'queued_to_tid' to the target tid before sending a signal
+//    (B): target thread CASes 'queued_to_tid' from its own tid to kDumpStarted
+//         (and gives up without collecting on CAS failure)
+//    (C,G): target thread finishes collecting stacks and signals 'result_ready'
+//    (D,E,F): tracer thread exchanges 'kNotInUse' back into 'queued_to_tid' in
+//             RevokeSigData().
+struct SignalData {
+  // The actual destination for the stack trace collected from the target thread.
+  // Null until TriggerAsync() fills it in, which happens before any signal is sent.
+  StackTrace* stack = nullptr;
+
+  static const int kNotInUse = 0;
+  static const int kDumpStarted = -1;
+  // Either one of the above constants, or if the dumper thread
+  // is waiting on a response, the tid that it is waiting on.
+  std::atomic<int64_t> queued_to_tid { kNotInUse };
+
+  // Signaled when the target thread has successfully collected its stack.
+  // The dumper thread waits for this to become true.
+  CompletionFlag result_ready;
+};
+
+} // namespace stack_trace_internal
+
+using stack_trace_internal::SignalData;
+
+namespace {
+
+// Signal handler for our stack trace signal.
+// We expect that the signal is only sent by StackTraceCollector::TriggerAsync()
+// (which GetThreadStack() and DumpThreadStack() use) -- not by a user. The
+// SignalData pointer arrives in the signal's si_value payload.
+void HandleStackTraceSignal(int /*signum*/, siginfo_t* info, void* /*ucontext*/) {
+ // Signal handlers may be invoked at any point, so it's important to preserve
+ // errno.
+ int save_errno = errno;
+ SCOPED_CLEANUP({
+ errno = save_errno;
+ });
+ auto* sig_data = reinterpret_cast<SignalData*>(info->si_value.sival_ptr);
+ // NOTE(review): in debug builds this DCHECK fires before the null check below,
+ // so the early return only protects release builds.
+ DCHECK(sig_data);
+ if (!sig_data) {
+ // Maybe the signal was sent by a user instead of by ourself, ignore it.
+ return;
+ }
+ // Pairs with ANNOTATE_HAPPENS_BEFORE in TriggerAsync() so TSAN sees the
+ // sender's writes to '*sig_data' as visible here.
+ ANNOTATE_HAPPENS_AFTER(sig_data);
+ int64_t my_tid = Thread::CurrentThreadId();
+
+ // If we were slow to process the signal, the sender may have given up and
+ // no longer wants our stack trace. In that case, 'sig_data->queued_to_tid'
+ // will no longer contain our thread id.
+ if (!sig_data->queued_to_tid.compare_exchange_strong(my_tid, SignalData::kDumpStarted)) {
+ return;
+ }
+ // Marking it as kDumpStarted ensures that the caller thread must now wait
+ // for our response, since we are writing directly into their StackTrace object.
+ sig_data->stack->Collect(/*skip_frames=*/1);
+ sig_data->result_ready.Signal();
+}
+
+// Makes HandleStackTraceSignal() the handler for 'signum'. If 'signum' differs
+// from the signal currently in use, the old signal is restored to SIG_DFL
+// (when our handler was installed on it) and g_stack_trace_signum is updated.
+// Callers in this file hold g_signal_handler_lock.
+//
+// Returns true if our handler is installed on the signal. Returns false if
+// another handler already owns it. In that case a warning is logged once, and
+// the failure is remembered until the signal number changes.
+bool InitSignalHandlerUnlocked(int signum) {
+  enum InitState {
+    UNINITIALIZED,
+    INIT_ERROR,
+    INITIALIZED
+  };
+  static InitState state = UNINITIALIZED;
+
+  // If we've already registered a handler, but we're being asked to
+  // change our signal, unregister the old one (only if it is still ours).
+  if (signum != g_stack_trace_signum && state == INITIALIZED) {
+    struct sigaction old_act;
+    PCHECK(sigaction(g_stack_trace_signum, nullptr, &old_act) == 0);
+    if (old_act.sa_sigaction == &HandleStackTraceSignal) {
+      signal(g_stack_trace_signum, SIG_DFL);
+    }
+  }
+
+  // Whether we had previously initialized successfully or hit an error, a
+  // change of signal number means we start over with the new signal.
+  if (signum != g_stack_trace_signum) {
+    g_stack_trace_signum = signum;
+    state = UNINITIALIZED;
+  }
+
+  if (state == UNINITIALIZED) {
+    struct sigaction cur_act;
+    PCHECK(sigaction(g_stack_trace_signum, nullptr, &cur_act) == 0);
+    if (cur_act.sa_handler != SIG_DFL &&
+        cur_act.sa_handler != SIG_IGN) {
+      state = INIT_ERROR;
+      LOG(WARNING) << "signal handler for stack trace signal "
+                   << g_stack_trace_signum
+                   << " is already in use: "
+                   << "Kudu will not produce thread stack traces.";
+    } else {
+      // No one appears to be using the signal. This is racy, but there is no
+      // atomic swap capability.
+      struct sigaction act;
+      memset(&act, 0, sizeof(act));
+      // POSIX only guarantees an empty signal set via sigemptyset(); zeroed
+      // memory is not required to represent one.
+      sigemptyset(&act.sa_mask);
+      act.sa_sigaction = &HandleStackTraceSignal;
+      act.sa_flags = SA_SIGINFO | SA_RESTART;
+      // A distinct name from 'cur_act' above: this is the disposition that was
+      // actually replaced, which tells us whether we raced with another thread.
+      struct sigaction replaced_act;
+      CHECK_ERR(sigaction(g_stack_trace_signum, &act, &replaced_act));
+      sighandler_t old_handler = replaced_act.sa_handler;
+      if (old_handler != SIG_IGN &&
+          old_handler != SIG_DFL) {
+        LOG(FATAL) << "raced against another thread installing a signal handler";
+      }
+      state = INITIALIZED;
+    }
+  }
+  return state == INITIALIZED;
+}
+
+#ifdef __linux__
+GoogleOnceType g_prime_libunwind_once;
+
+// Makes a first, non-concurrent call into libunwind. libunwind's first call
+// does some unsafe double-checked locking for initialization, so this runs
+// once (through g_prime_libunwind_once) before any stack-trace signal is sent.
+void PrimeLibunwind() {
+  unw_context_t context;
+  unw_getcontext(&context);
+  unw_cursor_t cursor;
+  const int rc = unw_init_local(&cursor, &context);
+  RAW_CHECK(rc >= 0, "unw_init_local failed");
+}
+#endif
+} // anonymous namespace
+
+// Switches the signal used for remote stack traces to 'signum'. Returns
+// InvalidArgument if our handler could not be installed on it.
+Status SetStackTraceSignal(int signum) {
+  base::SpinLockHolder guard(&g_signal_handler_lock);
+  const bool installed = InitSignalHandlerUnlocked(signum);
+  return installed ? Status::OK()
+                   : Status::InvalidArgument("unable to install signal handler");
+}
+
+// Takes over 'other's pending request (if any) and leaves 'other' empty, so
+// that its destructor has nothing to revoke.
+StackTraceCollector::StackTraceCollector(StackTraceCollector&& other) noexcept
+    : tid_(other.tid_),
+      sig_data_(other.sig_data_) {
+  other.sig_data_ = nullptr;
+  other.tid_ = 0;
+}
+
+// Revokes a request that is still outstanding, i.e. TriggerAsync() succeeded
+// but AwaitCollection() was never called.
+StackTraceCollector::~StackTraceCollector() {
+  if (sig_data_ == nullptr) {
+    return;
+  }
+  RevokeSigData();
+}
+
+#ifdef __linux__
+// Takes the outstanding SignalData back from the target thread. Requires
+// 'sig_data_' to be non-null; always leaves it null.
+//
+// Returns true if the target thread had already started (or finished)
+// collecting its stack. In that case this waits, with no deadline, for the
+// collection to finish, so the caller's StackTrace is fully populated, and then
+// frees the SignalData. Returns false if the signal had not been handled yet;
+// the SignalData is then intentionally leaked.
+bool StackTraceCollector::RevokeSigData() {
+ // First, exchange the atomic variable back to 'not in use'. This ensures
+ // that, if the signalled thread hasn't started filling in the trace yet,
+ // it will see the 'kNotInUse' value and abort.
+ int64_t old_val = sig_data_->queued_to_tid.exchange(SignalData::kNotInUse);
+
+ // We now have two cases to consider.
+
+ // 1) Timed out, but signal still pending and signal handler not yet invoked.
+ //
+ // In this case, the signal handler hasn't started collecting a stack trace, so when
+ // we exchange 'queued_to_tid', we see that it is still "queued". In case the signal
+ // later gets delivered, we can't free the 'sig_data_' struct itself. We intentionally
+ // leak it. Note, however, that if the signal handler later runs, it will see that we
+ // exchanged out its tid from 'queued_to_tid' and therefore won't attempt to write
+ // into the 'stack' structure.
+ if (old_val == tid_) {
+ // TODO(todd) instead of leaking, we can insert these lost structs into a global
+ // free-list, and then reuse them the next time we want to send a signal. The re-use
+ // is safe since access is limited to a specific tid.
+ DLOG(WARNING) << "Leaking SignalData structure " << sig_data_ << " after lost signal "
+ << "to thread " << tid_;
+ ANNOTATE_LEAKING_OBJECT_PTR(sig_data_);
+ sig_data_ = nullptr;
+ return false;
+ }
+
+ // 2) The signal was delivered. Either the thread is currently collecting its stack
+ // trace (in which case we have to wait for it to finish), or it has already completed
+ // (in which case waiting is a no-op).
+ CHECK_EQ(old_val, SignalData::kDumpStarted);
+ // MonoTime::Max() as the deadline: wait for as long as the handler needs.
+ CHECK(sig_data_->result_ready.WaitUntil(MonoTime::Max()));
+ delete sig_data_;
+ sig_data_ = nullptr;
+ return true;
+}
+
+
+// Queues the stack-trace signal to thread 'tid' of this process, asking it to
+// write its stack into '*stack'. Does not wait for the result; follow up with
+// AwaitCollection(). Must not be called more than once per instance.
+//
+// Returns NotSupported if our signal handler is unavailable, or NotFound if
+// the signal could not be queued to the thread. The NotFound status carries
+// the syscall's errno.
+Status StackTraceCollector::TriggerAsync(int64_t tid, StackTrace* stack) {
+  CHECK(!sig_data_ && tid_ == 0) << "TriggerAsync() must not be called more than once per instance";
+
+  // Ensure that our signal handler is installed.
+  {
+    base::SpinLockHolder h(&g_signal_handler_lock);
+    if (!InitSignalHandlerUnlocked(g_stack_trace_signum)) {
+      return Status::NotSupported("unable to take thread stack: signal handler unavailable");
+    }
+  }
+  // Ensure that libunwind is primed for use before we send any signals. Otherwise
+  // we can hit a deadlock with the following stack:
+  //   GoogleOnceInit()   [waits on the 'once' to finish, but will never finish]
+  //   StackTrace::Collect()
+  //   <signal handler>
+  //   PrimeLibUnwind
+  //   GoogleOnceInit()   [not yet initted, so starts initializing]
+  //   StackTrace::Collect()
+  GoogleOnceInit(&g_prime_libunwind_once, &PrimeLibunwind);
+
+  std::unique_ptr<SignalData> data(new SignalData());
+  // Set the target TID in our communication structure, so if we end up with any
+  // delayed signal reaching some other thread, it will know to ignore it.
+  data->queued_to_tid = tid;
+  data->stack = CHECK_NOTNULL(stack);
+
+  // We use the raw syscall here instead of kill() to ensure that we don't accidentally
+  // send a signal to some other process in the case that the thread has exited and
+  // the TID has been recycled.
+  siginfo_t info;
+  memset(&info, 0, sizeof(info));
+  info.si_signo = g_stack_trace_signum;
+  info.si_code = SI_QUEUE;
+  info.si_pid = getpid();
+  info.si_uid = getuid();
+  info.si_value.sival_ptr = data.get();
+  // Since we're using a signal to pass information between the two threads,
+  // we need to help TSAN out and explicitly tell it about the happens-before
+  // relationship here.
+  ANNOTATE_HAPPENS_BEFORE(data.get());
+  if (syscall(SYS_rt_tgsigqueueinfo, getpid(), tid, g_stack_trace_signum, &info) != 0) {
+    // Capture errno right away and include it in the status, so callers can tell
+    // e.g. a missing thread (ESRCH) from a full signal queue (EAGAIN) or EPERM.
+    // 'data' is freed on return; the signal was never queued, so nothing else
+    // can reference it.
+    int err = errno;
+    return Status::NotFound("unable to deliver signal: process may have exited",
+                            ErrnoToString(err), err);
+  }
+
+  // The signal is now pending to the target thread. We don't store it in a unique_ptr
+  // inside the class since we need to be careful to destruct it safely in case the
+  // target thread hasn't yet received the signal when this instance gets destroyed.
+  sig_data_ = data.release();
+  tid_ = tid;
+
+  return Status::OK();
+}
+
+// Waits until 'deadline' for the target thread to collect its stack, then
+// revokes the request. Requires a prior successful TriggerAsync().
+//
+// Returns OK if the stack was collected into the StackTrace passed to
+// TriggerAsync(). Returns TimedOut if the target thread had not started
+// handling the signal by the time the request was revoked.
+Status StackTraceCollector::AwaitCollection(MonoTime deadline) {
+  CHECK(sig_data_) << "Must successfully call TriggerAsync() first";
+
+  // The caller picks the deadline (GetThreadStack() allows ~1s). In testing,
+  // threads typically respond within a few milliseconds.
+  //
+  // The main reason that a thread would not respond is that it has blocked signals. For
+  // example, glibc's timer_thread doesn't respond to our signal, so we always time out
+  // on that one.
+  ignore_result(sig_data_->result_ready.WaitUntil(deadline));
+
+  // Whether or not we timed out above, revoke the signal data structure.
+  // The wait above may time out just before the handler starts collecting.
+  // In that case RevokeSigData() waits for the collection to finish and returns
+  // true. We can then return a successful result, because the destination stack
+  // trace has in fact been populated.
+  if (!RevokeSigData()) {
+    return Status::TimedOut("thread did not respond: maybe it is blocking signals");
+  }
+  return Status::OK();
+}
+
+#else // #ifdef __linux__ ...
+// Collecting another thread's stack is implemented only on Linux; on other
+// platforms every StackTraceCollector entry point reports NotSupported.
+//
+// Parameters are left unnamed because they are unused here. This also avoids
+// a parameter named 'tid_' shadowing the member of the same name.
+Status StackTraceCollector::TriggerAsync(int64_t /*tid*/, StackTrace* /*stack*/) {
+  return Status::NotSupported("unsupported platform");
+}
+Status StackTraceCollector::AwaitCollection(MonoTime /*deadline*/) {
+  return Status::NotSupported("unsupported platform");
+}
+bool StackTraceCollector::RevokeSigData() {
+  // No request is ever sent on this platform, so nothing was collected.
+  return false;
+}
+#endif // #ifdef __linux__ ... #else ...
+
+// Synchronously captures the stack of thread 'tid' into '*stack'. After the
+// request is sent, the target thread is given one second to respond.
+Status GetThreadStack(int64_t tid, StackTrace* stack) {
+  StackTraceCollector collector;
+  RETURN_NOT_OK(collector.TriggerAsync(tid, stack));
+  const MonoTime deadline = MonoTime::Now() + MonoDelta::FromSeconds(1);
+  return collector.AwaitCollection(deadline);
+}
+
+// Returns the symbolized stack of thread 'tid'. If the stack could not be
+// captured, returns the failure Status text enclosed in angle brackets.
+string DumpThreadStack(int64_t tid) {
+  StackTrace trace;
+  const Status s = GetThreadStack(tid, &trace);
+  return s.ok() ? trace.Symbolize() : strings::Substitute("<$0>", s.ToString());
+}
+
+// Appends the ID of every thread in this process, as listed under
+// /proc/self/task, to '*tids'. Existing contents of '*tids' are kept.
+// Directory entries whose names do not parse as a TID are logged and skipped.
+// Returns NotSupported on platforms other than Linux. Not async-safe.
+Status ListThreads(vector<pid_t>* tids) {
+#ifndef __linux__
+  return Status::NotSupported("unable to list threads on this platform");
+#else
+  DIR* dir = opendir("/proc/self/task/");
+  if (dir == nullptr) {
+    // Snapshot errno before building the Status. The evaluation order of the
+    // arguments below is unspecified, so reading 'errno' alongside
+    // ErrnoToString() could record a value that the call has clobbered.
+    const int err = errno;
+    return Status::IOError("failed to open task dir", ErrnoToString(err), err);
+  }
+  struct dirent* d;
+  while ((d = readdir(dir)) != nullptr) {
+    // Skip "." and ".." (and any other dot-entries).
+    if (d->d_name[0] == '.') {
+      continue;
+    }
+    uint32_t tid;
+    if (!safe_strtou32(d->d_name, &tid)) {
+      LOG(WARNING) << "bad tid found in procfs: " << d->d_name;
+      continue;
+    }
+    tids->push_back(tid);
+  }
+  closedir(dir);
+  return Status::OK();
+#endif // __linux__
+}
+
+// Returns a symbolized trace of the calling thread, produced by glog's
+// internal stack dumper.
+string GetStackTrace() {
+  string result;
+  google::glog_internal_namespace_::DumpStackTraceToString(&result);
+  return result;
+}
+
+// Returns the calling thread's stack as space-separated hex addresses,
+// formatted into a fixed 1 KiB stack buffer before being copied out.
+string GetStackTraceHex() {
+  char buf[1024];
+  // Pass the size derived from the array itself so the two cannot drift apart.
+  HexStackTraceToString(buf, sizeof(buf));
+  return buf;
+}
+
+// Captures the current stack, omitting this function's own frame, and writes
+// it as hex addresses into 'buf', which holds 'size' bytes.
+void HexStackTraceToString(char* buf, size_t size) {
+  StackTrace current;
+  current.Collect(/*skip_frames=*/1);
+  current.StringifyToHex(buf, size);
+}
+
+// Returns the calling thread's stack with one unsymbolized address per line,
+// laid out like StackTrace::Symbolize() output, omitting this function's frame.
+string GetLogFormatStackTraceHex() {
+  StackTrace current;
+  current.Collect(/*skip_frames=*/1);
+  return current.ToLogFormatHexString();
+}
+
+// Intentionally empty placeholder. When unwinding is not safe,
+// StackTrace::Collect() below records an address just inside this function as
+// the only frame. Symbolizing such a trace then prints this self-explanatory
+// name instead of an empty stack. Collect() only takes its address; the
+// function's name is what carries the message.
+void CouldNotCollectStackTraceBecauseInsideLibDl() {
+}
+
+// Records up to kMaxFrames return addresses of the calling thread into
+// frames_, innermost first. Collect() itself is always omitted, and
+// 'skip_frames' further frames above it are skipped as well.
+//
+// On Linux the stack is walked with libunwind; elsewhere glog's
+// google::GetStackTrace() is used. If debug::SafeToUnwindStack() says
+// unwinding is unsafe, a single placeholder frame is recorded instead.
+//
+// NOTE: the skip accounting depends on call depth. Moving parts of this body
+// into helper functions would shift which frames are skipped.
+void StackTrace::Collect(int skip_frames) {
+ if (!debug::SafeToUnwindStack()) {
+ // Build a fake stack so that the user sees an appropriate message upon symbolizing
+ // rather than seeing an empty stack.
+ uintptr_t f_ptr = reinterpret_cast<uintptr_t>(&CouldNotCollectStackTraceBecauseInsideLibDl);
+ // Increase the pointer by one byte since the return address from a function call
+ // would not be the beginning of the function itself. Symbolize() and
+ // StringifyToHex() subtract one again, landing on the placeholder's first byte.
+ frames_[0] = reinterpret_cast<void*>(f_ptr + 1);
+ num_frames_ = 1;
+ return;
+ }
+ const int kMaxDepth = arraysize(frames_);
+
+#ifdef __linux__
+ // One-time libunwind setup before the first unwind in this process.
+ GoogleOnceInit(&g_prime_libunwind_once, &PrimeLibunwind);
+
+ unw_cursor_t cursor;
+ unw_context_t uc;
+ unw_getcontext(&uc);
+ RAW_CHECK(unw_init_local(&cursor, &uc) >= 0, "unw_init_local failed");
+ skip_frames++; // Do not include the "Collect" frame
+
+ num_frames_ = 0;
+ // Walk outward from the current frame until frames_ is full or the walk ends.
+ while (num_frames_ < kMaxDepth) {
+ void *ip;
+ // Read this frame's instruction pointer; an error ends the walk.
+ int ret = unw_get_reg(&cursor, UNW_REG_IP, reinterpret_cast<unw_word_t *>(&ip));
+ if (ret < 0) {
+ break;
+ }
+ if (skip_frames > 0) {
+ skip_frames--;
+ } else {
+ frames_[num_frames_++] = ip;
+ }
+ // Step to the caller; a non-positive result (end of stack or unwind
+ // failure) ends the walk.
+ ret = unw_step(&cursor);
+ if (ret <= 0) {
+ break;
+ }
+ }
+#else
+ // On OSX, use the unwinder from glog. However, that unwinder has an issue where
+ // concurrent invocations will return no frames. See:
+ // https://github.com/google/glog/issues/298
+ // The worst result here is an empty result.
+
+ // google::GetStackTrace has a data race. This is called frequently, so better
+ // to ignore it with an annotation rather than use a suppression.
+ debug::ScopedTSANIgnoreReadsAndWrites ignore_tsan;
+ // '+ 1' omits the Collect frame, mirroring 'skip_frames++' in the Linux path.
+ num_frames_ = google::GetStackTrace(frames_, kMaxDepth, skip_frames + 1);
+#endif
+}
+
+// Writes the collected frames into 'buf' (which holds 'size' bytes) as hex
+// addresses separated by single spaces. Whenever 'size' > 0 the output is
+// nul-terminated. Frames that do not fit completely are dropped, never
+// partially written.
+//
+// Unless NO_FIX_CALLER_ADDRESSES is set, each non-null address is decremented
+// by one so that it points into the call instruction rather than at the
+// return address (see Symbolize()). HEX_0X_PREFIX prepends "0x" to each
+// address.
+//
+// Async-safe: performs no allocation.
+void StackTrace::StringifyToHex(char* buf, size_t size, int flags) const {
+  if (size == 0) {
+    return;  // No room even for the terminator.
+  }
+  const bool with_prefix = (flags & HEX_0X_PREFIX) != 0;
+  char* dst = buf;
+  const char* const end = buf + size;
+  for (int i = 0; i < num_frames_; i++) {
+    // Bytes this entry needs: an optional separating space, an optional "0x",
+    // the hex digits, and one byte for a nul. The nul byte is always reserved,
+    // so the final terminator fits; it also leaves room for any nul that
+    // FastHex64ToBuffer() may append after the digits. Budgeting the prefix
+    // here is what keeps HEX_0X_PREFIX output inside the buffer.
+    const size_t needed =
+        (i != 0 ? 1 : 0) + (with_prefix ? 2 : 0) + kHexEntryLength + 1;
+    if (static_cast<size_t>(end - dst) < needed) {
+      break;
+    }
+    if (i != 0) {
+      *dst++ = ' ';
+    }
+    if (with_prefix) {
+      *dst++ = '0';
+      *dst++ = 'x';
+    }
+    // See note in Symbolize() about why we subtract 1 from each address here.
+    uintptr_t addr = reinterpret_cast<uintptr_t>(frames_[i]);
+    if (addr > 0 && !(flags & NO_FIX_CALLER_ADDRESSES)) {
+      addr--;
+    }
+    FastHex64ToBuffer(addr, dst);
+    dst += kHexEntryLength;
+  }
+  *dst = '\0';
+}
+
+// Same as StringifyToHex(), but returns the result as a std::string.
+// Not async-safe (allocates).
+string StackTrace::ToHexString(int flags) const {
+  // Worst case per frame: kHexEntryLength hex digits, a separating space and
+  // an optional "0x" prefix. One more byte is needed for the final nul.
+  // Sizing for the worst case keeps the buffer a compile-time constant,
+  // instead of a variable-length array (a non-standard C++ extension).
+  constexpr int kMaxLenPerFrame = kHexEntryLength + 1 + 2;
+  char buf[kMaxFrames * kMaxLenPerFrame + 1];
+  StringifyToHex(buf, sizeof(buf), flags);
+  return string(buf);
+}
+
+// Renders the trace with one line per frame: the frame address followed by
+// its symbol, or "(unknown)" when the address cannot be resolved. Adapted
+// from glog's symbolization routine. Not async-safe.
+string StackTrace::Symbolize() const {
+  string out;
+  char sym_buf[1024];
+  for (int idx = 0; idx < num_frames_; ++idx) {
+    void* const pc = frames_[idx];
+
+    // 'pc' is a return address, i.e. the instruction *after* the call. When
+    // the callee is 'noreturn', the call may be the last instruction of the
+    // caller, so the return address can already lie in the next function.
+    // Looking up 'pc - 1' lands inside the call instruction instead. That
+    // yields the right function, and the right line when the logged address
+    // is later fed to addr2line.
+    //
+    // Example: with -O2, a 'main' that only calls abort() compiles to
+    //
+    //   0000000000400440 <main>:
+    //     400440: 48 83 ec 08      sub    $0x8,%rsp
+    //     400444: e8 c7 ff ff ff   callq  400410 <abort@plt>
+    //
+    //   0000000000400449 <_start>:
+    //     400449: 31 ed            xor    %ebp,%ebp
+    //
+    // A trace taken inside abort() holds 0x400449, which is in '_start',
+    // whereas 0x400448 is still inside 'main'.
+    //
+    // A null 'pc' (possible after a bad unwind, KUDU-2433) is never
+    // decremented, to avoid undefined behavior.
+    const char* symbol = "(unknown)";
+    if (pc != nullptr &&
+        google::Symbolize(reinterpret_cast<char*>(pc) - 1, sym_buf, sizeof(sym_buf))) {
+      symbol = sym_buf;
+    }
+    StringAppendF(&out, " @ %*p %s\n", kPrintfPointerFieldWidth, pc, symbol);
+  }
+  return out;
+}
+
+// Renders the trace in the same per-frame line layout as Symbolize(), but with
+// addresses only; no symbol lookup is performed.
+string StackTrace::ToLogFormatHexString() const {
+  string out;
+  for (int idx = 0; idx < num_frames_; ++idx) {
+    StringAppendF(&out, " @ %*p\n", kPrintfPointerFieldWidth, frames_[idx]);
+  }
+  return out;
+}
+
+// Hashes exactly the collected frame addresses, not the unused tail of
+// frames_. These are the same bytes Equals() compares, so equal traces hash
+// identically.
+uint64_t StackTrace::HashCode() const {
+  const char* bytes = reinterpret_cast<const char*>(frames_);
+  const size_t num_bytes = sizeof(frames_[0]) * num_frames_;
+  return util_hash::CityHash64(bytes, num_bytes);
+}
+
+// Ordering for sorting traces: a lexicographic comparison of the collected
+// frame addresses, in which a trace that is a proper prefix of another sorts
+// first. Traces for which Equals() is true compare equivalent, so sorting
+// places identical traces next to each other.
+bool StackTrace::LessThan(const StackTrace& s) const {
+  // Each range must be bounded by its *own* frame count. Bounding 's' by this
+  // trace's num_frames_ would read uncollected (possibly never-initialized)
+  // entries of s.frames_. It would also break the strict weak ordering that
+  // std::sort relies on.
+  return std::lexicographical_compare(frames_, frames_ + num_frames_,
+                                      s.frames_, s.frames_ + s.num_frames_);
+}
+
+// Captures the stacks of all threads in this process.
+//
+// Requests are sent to every thread first and only then awaited, so the
+// threads respond in parallel. Thread names, if enabled, are read from /proc
+// while the requests are in flight. Per-thread failures are recorded in each
+// ThreadInfo's 'status' and counted in num_failed_. The overall result is
+// non-OK only if the snapshot could not be attempted at all (debugger
+// attached, or the thread list is unavailable). On success, infos_ is sorted
+// by stack so that identical traces are adjacent.
+Status StackTraceSnapshot::SnapshotAllStacks() {
+  if (IsBeingDebugged()) {
+    return Status::Incomplete("not collecting stack trace since debugger or strace is attached");
+  }
+
+  vector<pid_t> tids;
+  RETURN_NOT_OK_PREPEND(ListThreads(&tids), "could not list threads");
+
+  const size_t num_threads = tids.size();
+  collectors_.clear();
+  collectors_.resize(num_threads);
+  infos_.clear();
+  infos_.resize(num_threads);
+
+  // Phase 1: fire off every request without waiting on any of them.
+  for (size_t i = 0; i < num_threads; i++) {
+    infos_[i].tid = tids[i];
+    infos_[i].status = collectors_[i].TriggerAsync(tids[i], &infos_[i].stack);
+  }
+
+  // Phase 2: collect the thread names while stack trace collection proceeds.
+  if (capture_thread_names_) {
+    for (auto& info : infos_) {
+      if (!info.status.ok()) continue;
+
+      // Get the thread's name by reading proc.
+      // TODO(todd): should we have the dumped thread fill in its own name using
+      // prctl to avoid having to open and read /proc? Or maybe we should use the
+      // Kudu ThreadMgr to get the thread names for the cases where we are using
+      // the kudu::Thread wrapper at least.
+      faststring buf;
+      Status s = ReadFileToString(Env::Default(),
+                                  strings::Substitute("/proc/self/task/$0/comm", info.tid),
+                                  &buf);
+      if (!s.ok()) {
+        info.thread_name = "<unknown name>";
+      } else {
+        info.thread_name = buf.ToString();
+        StripTrailingNewline(&info.thread_name);
+      }
+    }
+  }
+
+  // Phase 3: await every request against one shared deadline, so the whole
+  // snapshot waits about one second at most rather than one second per thread.
+  num_failed_ = 0;
+  const MonoTime deadline = MonoTime::Now() + MonoDelta::FromSeconds(1);
+  for (size_t i = 0; i < num_threads; i++) {
+    ThreadInfo& info = infos_[i];
+    info.status = info.status.AndThen([&] {
+      return collectors_[i].AwaitCollection(deadline);
+    });
+    if (!info.status.ok()) {
+      num_failed_++;
+      // Invariant: a thread whose collection failed never has a collected stack.
+      CHECK(!info.stack.HasCollected()) << info.status.ToString();
+    }
+  }
+  collectors_.clear();
+
+  std::sort(infos_.begin(), infos_.end(), [](const ThreadInfo& a, const ThreadInfo& b) {
+    return a.stack.LessThan(b.stack);
+  });
+  return Status::OK();
+}
+
+// Calls 'visitor' once for each maximal run of consecutive ThreadInfos whose
+// stacks are equal; every run passed in is non-empty. Assumes infos_ has been
+// sorted by stack (SnapshotAllStacks() does this), so that threads sharing a
+// trace are adjacent.
+void StackTraceSnapshot::VisitGroups(const StackTraceSnapshot::VisitorFunc& visitor) {
+  const auto end = infos_.end();
+  for (auto first = infos_.begin(); first != end;) {
+    auto last = first;
+    ++last;
+    while (last != end && last->stack.Equals(first->stack)) {
+      ++last;
+    }
+    visitor(ArrayView<ThreadInfo>(&*first, std::distance(first, last)));
+    first = last;
+  }
+}
+
+} // namespace kudu
http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/debug-util.h
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/debug-util.h b/be/src/kudu/util/debug-util.h
new file mode 100644
index 0000000..e8c94ea
--- /dev/null
+++ b/be/src/kudu/util/debug-util.h
@@ -0,0 +1,321 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#ifndef KUDU_UTIL_DEBUG_UTIL_H
+#define KUDU_UTIL_DEBUG_UTIL_H
+
+#include <sys/types.h>
+
+#include <cstdint>
+#include <cstring>
+#include <functional>
+#include <string>
+#include <vector>
+
+#include <glog/logging.h>
+
+#include "kudu/gutil/macros.h"
+#include "kudu/gutil/strings/fastmem.h"
+#include "kudu/util/status.h"
+
+namespace kudu {
+
+// Forward declarations.
+template <typename T> class ArrayView;
+class MonoTime;
+class StackTrace;
+class StackTraceCollector;
+
+namespace stack_trace_internal {
+// Per-request state handed to the target thread along with the stack-trace
+// signal (StackTraceCollector holds a pointer to it). Defined in the
+// implementation file.
+struct SignalData;
+}
+
+// Return true if coverage is enabled.
+bool IsCoverageBuild();
+
+// Try to flush coverage info. If another thread is already flushing
+// coverage, this returns without doing anything, since flushing coverage
+// is not thread-safe or re-entrant.
+void TryFlushCoverage();
+
+// Append the IDs of all threads currently running in this process to
+// '*tids'; existing contents of '*tids' are kept. Returns NotSupported on
+// platforms other than Linux.
+// Not async-safe.
+Status ListThreads(std::vector<pid_t>* tids);
+
+// Set which POSIX signal number should be used internally for triggering
+// stack traces. If that signal already has a handler in use, this
+// returns an error, and stack traces will be disabled.
+Status SetStackTraceSignal(int signum);
+
+// Return the stack trace of the given thread, stringified and symbolized.
+//
+// Note that the symbolization happens on the calling thread, not the target
+// thread, so this is relatively low-impact on the target.
+//
+// This is safe to use against the current thread, the main thread, or any other
+// thread. It requires that the target thread has not blocked POSIX signals. If
+// it has, or if the trace cannot be captured for another reason (e.g. the
+// thread exited, or did not answer within about one second), the returned
+// string is the error Status text enclosed in angle brackets.
+//
+// This function is thread-safe.
+//
+// NOTE: if Kudu is running inside a debugger, this can be annoying to a developer since
+// it internally uses signals that will cause the debugger to stop. Consider checking
+// 'IsBeingDebugged()' from os-util.h before using this function for non-critical use
+// cases.
+std::string DumpThreadStack(int64_t tid);
+
+// Capture the stack of thread 'tid' (which may be another thread) into
+// '*stack', waiting up to about one second for the target thread to respond.
+//
+// NOTE: if Kudu is running inside a debugger, this can be annoying to a developer since
+// it internally uses signals that will cause the debugger to stop. Consider checking
+// 'IsBeingDebugged()' from os-util.h before using this function for non-critical use
+// cases.
+Status GetThreadStack(int64_t tid, StackTrace* stack);
+
+// Return the current stack trace, stringified.
+std::string GetStackTrace();
+
+// Return the current stack trace, in hex form. This is significantly
+// faster than GetStackTrace() above, so should be used in performance-critical
+// places like TRACE() calls. If you really need blazing-fast speed, though,
+// use HexStackTraceToString() into a stack-allocated buffer instead --
+// this call causes a heap allocation for the std::string.
+//
+// Note that this is much more useful in the context of a static binary,
+// since addr2line wouldn't know where shared libraries were mapped at
+// runtime.
+std::string GetStackTraceHex();
+
+// This is the same as GetStackTraceHex(), except multi-line in a format that
+// looks very similar to GetStackTrace() but without symbols. Because it's in
+// that format, the tool stacktrace_addr2line.pl in the kudu build-support
+// directory can symbolize it automatically (to the extent that addr2line(1)
+// is able to find the symbols).
+std::string GetLogFormatStackTraceHex();
+
+// Collect the current stack trace in hex form into the given buffer.
+//
+// The resulting trace just includes the hex addresses, space-separated. This is suitable
+// for later stringification by pasting into 'addr2line' for example. 'buf'
+// must provide at least 'size' bytes; for non-zero 'size' the output is
+// nul-terminated.
+//
+// This function is async-safe.
+void HexStackTraceToString(char* buf, size_t size);
+
+// Efficient class for collecting and later stringifying a stack trace.
+//
+// A trace is a fixed-size array of up to kMaxFrames raw return addresses, plus
+// a count of how many of them are valid. It holds no heap memory.
+//
+// Requires external synchronization.
+class StackTrace {
+ public:
+  // Constructs a new (uncollected) stack trace. Only num_frames_ is
+  // initialized; entries of frames_ at or beyond num_frames_ are never
+  // meaningful.
+  StackTrace()
+    : num_frames_(0) {
+  }
+
+  // Resets the stack trace to an uncollected state.
+  void Reset() {
+    num_frames_ = 0;
+  }
+
+  // Returns true if at least one frame is recorded, i.e. Collect() has been
+  // called and Reset() has not been called since.
+  bool HasCollected() const {
+    return num_frames_ > 0;
+  }
+
+  // Copies the contents of 's' into this stack trace. A byte copy suffices
+  // because the object consists only of an int and an array of pointers.
+  void CopyFrom(const StackTrace& s) {
+    memcpy(this, &s, sizeof(s));
+  }
+
+  // Returns true if 's' holds exactly the same collected frames as this trace.
+  bool Equals(const StackTrace& s) const {
+    return s.num_frames_ == num_frames_ &&
+        strings::memeq(frames_, s.frames_,
+                       num_frames_ * sizeof(frames_[0]));
+  }
+
+  // Comparison operator for use in sorting.
+  bool LessThan(const StackTrace& s) const;
+
+  // Collect and store the current stack trace. Skips the top 'skip_frames' frames
+  // from the stack. For example, a value of '1' will skip whichever function
+  // called the 'Collect()' function. The 'Collect' function itself is always skipped.
+  //
+  // This function is async-safe.
+  void Collect(int skip_frames = 0);
+
+  // Returns the number of valid frames (0 if nothing has been collected).
+  int num_frames() const {
+    return num_frames_;
+  }
+
+  // Returns the address of frame 'i'; frame 0 is the innermost one.
+  //
+  // REQUIRES: 0 <= i < num_frames(). Index num_frames() is not valid: it is
+  // an uncollected entry, or past the end of frames_ when the trace is full.
+  void* frame(int i) const {
+    DCHECK_GE(i, 0);
+    DCHECK_LT(i, num_frames_);
+    return frames_[i];
+  }
+
+  enum Flags {
+    // Do not fix up the addresses on the stack to try to point to the 'call'
+    // instructions instead of the return address. This is necessary when dumping
+    // addresses to be interpreted by 'pprof', which does this fix-up itself.
+    NO_FIX_CALLER_ADDRESSES = 1,
+
+    // Prefix each hex address with '0x'. This is required by the go version
+    // of pprof when parsing stack traces.
+    HEX_0X_PREFIX = 1 << 1,
+  };
+
+  // Stringify the trace into the given buffer.
+  // The resulting output is hex addresses suitable for passing into 'addr2line'
+  // later.
+  //
+  // Async-safe.
+  void StringifyToHex(char* buf, size_t size, int flags = 0) const;
+
+  // Same as above, but returning a std::string.
+  // This is not async-safe.
+  std::string ToHexString(int flags = 0) const;
+
+  // Return a string with a symbolized backtrace in a format suitable for
+  // printing to a log file.
+  // This is not async-safe.
+  std::string Symbolize() const;
+
+  // Return a string with a hex-only backtrace in the format typically used in
+  // log files. Similar to the format given by Symbolize(), but symbols are not
+  // resolved (only the hex addresses are given).
+  std::string ToLogFormatHexString() const;
+
+  // Hash of the collected frames; equal traces produce equal hashes.
+  uint64_t HashCode() const;
+
+ private:
+  enum {
+    // The maximum number of stack frames to collect.
+    kMaxFrames = 16,
+
+    // Number of hex digits used to print one 64-bit frame address. This
+    // excludes the optional "0x" prefix and the separating space.
+    kHexEntryLength = 16
+  };
+
+  int num_frames_;
+  void* frames_[kMaxFrames];
+};
+
+// Gathers the stack traces of every thread in the process as one snapshot,
+// and lets callers walk them grouped by identical stack.
+class StackTraceSnapshot {
+ public:
+  // Per-thread result of a snapshot.
+  struct ThreadInfo {
+    // ID of the sampled thread.
+    int64_t tid;
+
+    // Outcome of collecting this thread's stack. This can be an error, for
+    // example if the thread exited mid-snapshot or was blocking signals.
+    Status status;
+
+    // The thread's name. May be empty if 'status' is not OK or if name
+    // capture was turned off.
+    std::string thread_name;
+
+    // The thread's stack. Never collected when 'status' is not OK.
+    StackTrace stack;
+  };
+
+  // Callback for VisitGroups(): receives all threads that share one stack.
+  using VisitorFunc = std::function<void(ArrayView<ThreadInfo> group)>;
+
+  // Turns reading of thread names during SnapshotAllStacks() on or off.
+  void set_capture_thread_names(bool c) {
+    capture_thread_names_ = c;
+  }
+
+  // Takes the snapshot. Returns a bad Status when stack traces are not
+  // supported on this platform, or when the process runs under a debugger.
+  //
+  // NOTE: this can be slow; keep it away from latency-sensitive code paths.
+  Status SnapshotAllStacks();
+
+  // Walks the snapshot, calling 'visitor' once per group of threads that
+  // share a stack trace. Groups are never empty. Threads whose trace could not
+  // be collected all have empty stacks, so they form a single group.
+  //
+  // REQUIRES: a previous successful call to SnapshotAllStacks().
+  void VisitGroups(const VisitorFunc& visitor);
+
+  // Number of threads the snapshot asked for a stack trace, including the
+  // ones whose collection failed.
+  int num_threads() const {
+    return static_cast<int>(infos_.size());
+  }
+
+  // Number of threads whose stack trace could not be collected.
+  int num_failed() const {
+    return num_failed_;
+  }
+
+ private:
+  std::vector<StackTraceSnapshot::ThreadInfo> infos_;
+  std::vector<StackTraceCollector> collectors_;
+  int num_failed_ = 0;
+
+  bool capture_thread_names_ = true;
+};
+
+
+// Collects the stack trace of another thread in this process using an
+// asynchronous trigger/await protocol. DumpThreadStack() handles one thread
+// synchronously. With this class, several instances can be triggered
+// back-to-back and awaited afterwards, so many threads are sampled in parallel.
+class StackTraceCollector {
+ public:
+  StackTraceCollector() = default;
+  StackTraceCollector(StackTraceCollector&& other) noexcept;
+  ~StackTraceCollector();
+
+  // Not copyable: an instance may own the state of an outstanding request.
+  StackTraceCollector(const StackTraceCollector&) = delete;
+  void operator=(const StackTraceCollector&) = delete;
+
+  // Asks thread 'tid' to record its stack trace into '*stack'. Returns OK
+  // once the request signal has been queued successfully. The trace itself
+  // is only guaranteed to be available after AwaitCollection() succeeds.
+  //
+  // NOTE: 'stack' must stay valid until AwaitCollection() has completed.
+  Status TriggerAsync(int64_t tid, StackTrace* stack);
+
+  // Waits until the target thread has recorded its trace or 'deadline'
+  // passes. Returns TimedOut if the trace was not collected.
+  //
+  // REQUIRES: TriggerAsync() has returned successfully.
+  Status AwaitCollection(MonoTime deadline);
+
+ private:
+  // Withdraws an outstanding request and resets 'sig_data_' to nullptr, in a
+  // way that is safe even if the target thread has not handled the signal
+  // yet. See implementation for details.
+  //
+  // Returns whether the stack trace had been collected before revocation.
+  //
+  // POSTCONDITION: sig_data_ == nullptr
+  bool RevokeSigData();
+
+  int64_t tid_ = 0;
+  stack_trace_internal::SignalData* sig_data_ = nullptr;
+};
+
+} // namespace kudu
+
+#endif
http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/debug/leak_annotations.h
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/debug/leak_annotations.h b/be/src/kudu/util/debug/leak_annotations.h
new file mode 100644
index 0000000..2bfc3d8
--- /dev/null
+++ b/be/src/kudu/util/debug/leak_annotations.h
@@ -0,0 +1,84 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#ifndef KUDU_UTIL_DEBUG_LEAK_ANNOTATIONS_H_
+#define KUDU_UTIL_DEBUG_LEAK_ANNOTATIONS_H_
+
+// Ignore a single leaked object, given its pointer.
+// Does nothing if LeakSanitizer is not enabled.
+//
+// Use it as a statement with a trailing semicolon:
+//   ANNOTATE_LEAKING_OBJECT_PTR(p);
+// Neither expansion carries a ';' of its own. An extra ';' would produce an
+// empty statement that orphans the 'else' of an unbraced if/else.
+#define ANNOTATE_LEAKING_OBJECT_PTR(p)
+
+// The checks are nested because '__has_feature(...)' cannot even be parsed
+// by preprocessors that do not define __has_feature.
+#if defined(__has_feature)
+#  if __has_feature(address_sanitizer)
+#    if defined(__linux__)
+
+#undef ANNOTATE_LEAKING_OBJECT_PTR
+#define ANNOTATE_LEAKING_OBJECT_PTR(p) __lsan_ignore_object(p)
+
+#    endif
+#  endif
+#endif
+
+// Declarations of the LeakSanitizer runtime interface, taken from LLVM's
+// lsan_interface.h.
+//
+// NOTE(review): only declarations are provided here. When LeakSanitizer is
+// linked in, its runtime supplies the definitions. Confirm what provides
+// these symbols in non-sanitizer builds before calling them (directly or via
+// ScopedLSANDisabler).
+
+extern "C" {
+ // Allocations made between calls to __lsan_disable() and __lsan_enable() will
+ // be treated as non-leaks. Disable/enable pairs may be nested.
+ void __lsan_disable();
+ void __lsan_enable();
+
+ // The heap object into which p points will be treated as a non-leak.
+ void __lsan_ignore_object(const void *p);
+
+ // The user may optionally provide this function to disallow leak checking
+ // for the program it is linked into (if the return value is non-zero). This
+ // function must be defined as returning a constant value; any behavior beyond
+ // that is unsupported.
+ int __lsan_is_turned_off();
+
+ // Check for leaks now. This function behaves identically to the default
+ // end-of-process leak check. In particular, it will terminate the process if
+ // leaks are found and the exitcode runtime flag is non-zero.
+ // Subsequent calls to this function will have no effect and end-of-process
+ // leak check will not run. Effectively, end-of-process leak check is moved to
+ // the time of first invocation of this function.
+ // By calling this function early during process shutdown, you can instruct
+ // LSan to ignore shutdown-only leaks which happen later on.
+ void __lsan_do_leak_check();
+
+ // Check for leaks now. Returns zero if no leaks have been found or if leak
+ // detection is disabled, non-zero otherwise.
+ // This function may be called repeatedly, e.g. to periodically check a
+ // long-running process. It prints a leak report if appropriate, but does not
+ // terminate the process. It does not affect the behavior of
+ // __lsan_do_leak_check() or the end-of-process leak check, and is not
+ // affected by them.
+ int __lsan_do_recoverable_leak_check();
+} // extern "C"
+
+namespace kudu {
+namespace debug {
+
+// RAII guard: calls __lsan_disable() on construction and __lsan_enable() on
+// destruction. Allocations made while the guard is alive are therefore not
+// reported as leaks. Guards may be nested, since disable/enable pairs nest.
+//
+// Non-copyable: a copy would not call __lsan_disable(), yet its destructor
+// would still call __lsan_enable(), unbalancing the disable/enable nesting.
+class ScopedLSANDisabler {
+ public:
+  ScopedLSANDisabler() { __lsan_disable(); }
+  ~ScopedLSANDisabler() { __lsan_enable(); }
+
+  ScopedLSANDisabler(const ScopedLSANDisabler&) = delete;
+  ScopedLSANDisabler& operator=(const ScopedLSANDisabler&) = delete;
+};
+
+} // namespace debug
+} // namespace kudu
+
+#endif // KUDU_UTIL_DEBUG_LEAK_ANNOTATIONS_H_