You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by ta...@apache.org on 2018/07/13 06:03:19 UTC
[07/51] [abbrv] impala git commit: IMPALA-7006: Add KRPC folders from
kudu@334ecafd
http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/rwc_lock.h
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/rwc_lock.h b/be/src/kudu/util/rwc_lock.h
new file mode 100644
index 0000000..7b78e35
--- /dev/null
+++ b/be/src/kudu/util/rwc_lock.h
@@ -0,0 +1,142 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#ifndef KUDU_UTIL_RWC_LOCK_H
+#define KUDU_UTIL_RWC_LOCK_H
+
+#include <cstdint>
+
+#include "kudu/gutil/macros.h"
+#include "kudu/util/condition_variable.h"
+#include "kudu/util/mutex.h"
+
+namespace kudu {
+
+// A read-write-commit lock.
+//
+// This lock has three modes: read, write, and commit.
+// The lock compatibility matrix is as follows:
+//
+//             Read    Write   Commit
+//   Read       X        X
+//   Write      X
+//   Commit
+//
+// An 'X' indicates that the two types of locks may be
+// held at the same time.
+//
+// In prose:
+// - Multiple threads may hold the Read lock at the same time.
+// - A single thread may hold the Write lock, potentially at the
+//   same time as any number of readers.
+// - A single thread may hold the Commit lock, but this lock is completely
+//   exclusive (no concurrent readers or writers).
+//
+// A typical use case for this type of lock is when a structure is read often,
+// occasionally updated, and the update operation can take a long time. In this
+// use case, the readers simply use ReadLock() and ReadUnlock(), while the
+// writer uses a copy-on-write technique like:
+//
+//   obj->lock.WriteLock();
+//   // NOTE: cannot safely mutate obj->state directly here, since readers
+//   // may be concurrent! So, we make a local copy to mutate.
+//   my_local_copy = obj->state;
+//   SomeLengthyMutation(my_local_copy);
+//   obj->lock.UpgradeToCommitLock();
+//   obj->state = my_local_copy;
+//   obj->lock.CommitUnlock();
+//
+// This is more efficient than a standard Reader-Writer lock since the lengthy
+// mutation is only protected against other concurrent mutators, and readers
+// may continue to run with no contention.
+//
+// For the common pattern described above, the 'CowObject<>' template class defined
+// in cow_object.h is more convenient than manual locking.
+//
+// NOTE: this implementation currently does not implement any starvation protection
+// or fairness. If the read lock is being constantly acquired (i.e. the reader count
+// never drops to 0) then UpgradeToCommitLock() may block arbitrarily long.
+class RWCLock {
+ public:
+ RWCLock();
+ ~RWCLock();
+
+ // Acquire the lock in read mode. Upon return, guarantees that:
+ // - Other threads may concurrently hold the lock for Read.
+ // - Either zero or one thread may hold the lock for Write.
+ // - No threads hold the lock for Commit.
+ void ReadLock();
+ // Release a Read lock previously taken with ReadLock().
+ void ReadUnlock();
+
+ // Return true if there are any readers currently holding the lock.
+ // Useful for debug assertions.
+ bool HasReaders() const;
+
+ // Return true if the current thread holds the write lock.
+ //
+ // In DEBUG mode this is accurate -- we track the current holder's tid.
+ // In non-DEBUG mode, this may sometimes return true even if another thread
+ // is in fact the holder.
+ // Thus, this is only really useful in the context of a DCHECK assertion.
+ bool HasWriteLock() const;
+
+ // Boost-like wrappers, so boost lock guards work: a shared-lock guard
+ // (which calls lock_shared()/unlock_shared()) takes the Read lock.
+ void lock_shared() { ReadLock(); }
+ void unlock_shared() { ReadUnlock(); }
+
+ // Acquire the lock in write mode. Upon return, guarantees that:
+ // - Other threads may concurrently hold the lock for Read.
+ // - No other threads hold the lock for Write or Commit.
+ void WriteLock();
+ // Release a Write lock that was NOT upgraded to Commit.
+ void WriteUnlock();
+
+ // Boost-like wrappers. Note that these take the Write lock (not the Commit
+ // lock), so an exclusive lock guard on an RWCLock does NOT exclude readers.
+ void lock() { WriteLock(); }
+ void unlock() { WriteUnlock(); }
+
+ // Upgrade the lock from Write mode to Commit mode.
+ // Requires that the current thread holds the lock in Write mode.
+ // Upon return, guarantees:
+ // - No other thread holds the lock in any mode.
+ // Per the class-level usage example, CommitUnlock() is the only release
+ // call made after a successful upgrade.
+ void UpgradeToCommitLock();
+ void CommitUnlock();
+
+ private:
+ // Variants of the functions above that must be called with lock_ held.
+ // ("Unlocked" means these do not acquire lock_ themselves.)
+ bool HasReadersUnlocked() const;
+ bool HasWriteLockUnlocked() const;
+
+ // Lock which protects reader_count_ and write_locked_.
+ // Additionally, while the commit lock is held, the
+ // locking thread holds this mutex, which prevents any new
+ // threads from obtaining the lock in any mode.
+ mutable Mutex lock_;
+ // NOTE(review): by name, presumably signalled when the writer releases and
+ // when the reader count drops to zero; the signalling lives in the .cc file.
+ ConditionVariable no_mutators_, no_readers_;
+ int reader_count_;
+ bool write_locked_;
+
+ // Debug-only bookkeeping about the write-lock holder; writer_tid_ backs the
+ // accurate DEBUG-mode HasWriteLock(). The acquire time and backtrace are
+ // presumably kept for diagnostics -- confirm in the .cc file.
+#ifndef NDEBUG
+ static const int kBacktraceBufSize = 1024;
+ int64_t writer_tid_;
+ int64_t last_writelock_acquire_time_;
+ char last_writer_backtrace_[kBacktraceBufSize];
+#endif // NDEBUG
+
+ DISALLOW_COPY_AND_ASSIGN(RWCLock);
+};
+
+} // namespace kudu
+#endif /* KUDU_UTIL_RWC_LOCK_H */
http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/safe_math-test.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/safe_math-test.cc b/be/src/kudu/util/safe_math-test.cc
new file mode 100644
index 0000000..d3a81c6
--- /dev/null
+++ b/be/src/kudu/util/safe_math-test.cc
@@ -0,0 +1,56 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <stdint.h>
+
+// Must come before gtest.h.
+#include "kudu/gutil/mathlimits.h"
+
+#include <gtest/gtest.h>
+#include "kudu/util/safe_math.h"
+
+namespace kudu {
+// Adds 'a' and 'b' with AddWithOverflowCheck() and verifies that the overflow
+// flag matches 'expected' and, when no overflow is reported, that the result
+// equals the plain sum.
+template<typename T>
+static void DoTest(T a, T b, bool expected) {
+  SCOPED_TRACE(a);
+  SCOPED_TRACE(b);
+  bool overflow = false;
+  T ret = AddWithOverflowCheck(a, b, &overflow);
+  // Keep the (expected, actual) argument order used elsewhere in the tests so
+  // gtest failure messages label the two values correctly.
+  EXPECT_EQ(expected, overflow);
+  if (!overflow) {
+    // For types narrower than int, 'a + b' is computed in int; cast back to T
+    // so both sides of the comparison have the same type.
+    EXPECT_EQ(static_cast<T>(a + b), ret);
+  }
+}
+
+TEST(TestSafeMath, TestSignedInts) {
+  // Overflow above the max of the range, including the exact boundary
+  // (kMax - 10 + 11 == kMax + 1).
+  DoTest<int32_t>(MathLimits<int32_t>::kMax - 10, 15, true);
+  DoTest<int32_t>(MathLimits<int32_t>::kMax - 10, 11, true);
+  DoTest<int32_t>(MathLimits<int32_t>::kMax - 10, 10, false);
+
+  // Overflow below the min of the range, including the exact boundary
+  // (kMin + 10 - 11 == kMin - 1).
+  DoTest<int32_t>(MathLimits<int32_t>::kMin + 10, -15, true);
+  DoTest<int32_t>(MathLimits<int32_t>::kMin + 10, -11, true);
+  DoTest<int32_t>(MathLimits<int32_t>::kMin + 10, -10, false);
+  DoTest<int32_t>(MathLimits<int32_t>::kMin + 10, -5, false);
+
+  // Operands of opposite sign can never overflow, even at the extremes.
+  DoTest<int32_t>(MathLimits<int32_t>::kMax, MathLimits<int32_t>::kMin, false);
+  DoTest<int32_t>(MathLimits<int32_t>::kMin, MathLimits<int32_t>::kMax, false);
+}
+
+TEST(TestSafeMath, TestUnsignedInts) {
+  // Overflow above max, including the exact boundary (kMax - 10 + 11 wraps to 0).
+  DoTest<uint32_t>(MathLimits<uint32_t>::kMax - 10, 15, true);
+  DoTest<uint32_t>(MathLimits<uint32_t>::kMax - 10, 11, true);
+  DoTest<uint32_t>(MathLimits<uint32_t>::kMax - 10, 10, false);
+
+  // Adding zero never overflows, even at the max.
+  DoTest<uint32_t>(MathLimits<uint32_t>::kMax, 0, false);
+  DoTest<uint32_t>(0, 0, false);
+}
+
+} // namespace kudu
http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/safe_math.h
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/safe_math.h b/be/src/kudu/util/safe_math.h
new file mode 100644
index 0000000..4c126dd
--- /dev/null
+++ b/be/src/kudu/util/safe_math.h
@@ -0,0 +1,69 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+// Inline functions for doing overflow-safe operations on integers.
+// These should be used when doing bounds checks on user-provided data,
+// for example.
+// See also: https://www.securecoding.cert.org/confluence/display/cplusplus/INT32-CPP.+Ensure+that+operations+on+signed+integers+do+not+result+in+overflow
+#ifndef KUDU_UTIL_SAFE_MATH_H
+#define KUDU_UTIL_SAFE_MATH_H
+
+#include <type_traits>
+
+#include "kudu/gutil/mathlimits.h"
+
+namespace kudu {
+
+namespace safe_math_internal {
+
+// Template which is specialized for signed and unsigned types separately.
+template<typename Type, bool is_signed>
+struct WithOverflowCheck {
+};
+
+// Specialization for signed types.
+template<typename Type>
+struct WithOverflowCheck<Type, true> {
+  static inline Type Add(Type a, Type b, bool *overflowed) {
+    // Implementation from the CERT article referenced in the file header:
+    // overflow is detected from the operands *before* adding, because the
+    // signed addition itself is undefined behavior if it overflows. The
+    // short-circuiting '&&' keeps 'kMax - b' / 'kMin - b' in range.
+    *overflowed = (((a > 0) && (b > 0) && (a > (MathLimits<Type>::kMax - b))) ||
+                   ((a < 0) && (b < 0) && (a < (MathLimits<Type>::kMin - b))));
+    if (*overflowed) {
+      // Evaluating 'a + b' in the signed type here would be exactly the
+      // undefined overflow the check above guards against. Unsigned
+      // arithmetic wraps modulo 2^N, so add in the unsigned counterpart and
+      // convert back to obtain the two's-complement wrapped sum. (The
+      // conversion back is implementation-defined before C++20 -- modular on
+      // mainstream compilers -- and fully defined from C++20 on.)
+      typedef typename std::make_unsigned<Type>::type UnsignedType;
+      return static_cast<Type>(static_cast<UnsignedType>(a) +
+                               static_cast<UnsignedType>(b));
+    }
+    return a + b;
+  }
+};
+
+// Specialization for unsigned types.
+template<typename Type>
+struct WithOverflowCheck<Type, false> {
+  static inline Type Add(Type a, Type b, bool *overflowed) {
+    // Unsigned addition is well-defined (it wraps), and it wrapped exactly
+    // when the truncated result is smaller than an operand.
+    const Type ret = a + b;
+    *overflowed = ret < a;
+    return ret;
+  }
+};
+
+} // namespace safe_math_internal
+
+// Add 'a' and 'b', and set *overflowed to true if overflow occurred.
+//
+// The return value is the sum when no overflow occurred; otherwise it is the
+// sum wrapped modulo 2^N, which callers should not rely on.
+template<typename Type>
+inline Type AddWithOverflowCheck(Type a, Type b, bool *overflowed) {
+  // Pick the right specialization based on whether Type is signed.
+  typedef safe_math_internal::WithOverflowCheck<Type, MathLimits<Type>::kIsSigned> my_struct;
+  return my_struct::Add(a, b, overflowed);
+}
+
+} // namespace kudu
+#endif
http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/scoped_cleanup-test.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/scoped_cleanup-test.cc b/be/src/kudu/util/scoped_cleanup-test.cc
new file mode 100644
index 0000000..2e77705
--- /dev/null
+++ b/be/src/kudu/util/scoped_cleanup-test.cc
@@ -0,0 +1,56 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "kudu/util/scoped_cleanup.h"
+
+#include <gtest/gtest.h>
+
+namespace kudu {
+
+TEST(ScopedCleanup, TestCleanup) {
+  // The cleanup lambda restores the original value once the scope exits.
+  int value = 0;
+  {
+    const int original = value;
+    auto restore = MakeScopedCleanup([&]() { value = original; });
+    value = 42;
+  }
+  ASSERT_EQ(0, value);
+}
+
+TEST(ScopedCleanup, TestCleanupMacro) {
+  // Same scenario as TestCleanup, expressed with the SCOPED_CLEANUP macro.
+  int value = 0;
+  {
+    const int original = value;
+    SCOPED_CLEANUP({ value = original; });
+    value = 42;
+  }
+  ASSERT_EQ(0, value);
+}
+
+
+TEST(ScopedCleanup, TestCancelCleanup) {
+  // A cancelled cleanup must not run, so the mutation survives the scope.
+  int value = 0;
+  {
+    const int original = value;
+    auto restore = MakeScopedCleanup([&]() { value = original; });
+    value = 42;
+    restore.cancel();
+  }
+  ASSERT_EQ(42, value);
+}
+
+} // namespace kudu
http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/scoped_cleanup.h
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/scoped_cleanup.h b/be/src/kudu/util/scoped_cleanup.h
new file mode 100644
index 0000000..8ecfbcb
--- /dev/null
+++ b/be/src/kudu/util/scoped_cleanup.h
@@ -0,0 +1,67 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <utility>
+
+#include "kudu/gutil/macros.h"
+
+// Run the given function body (which is typically a block of code surrounded by
+// curly-braces) when the current scope exits.
+//
+// Example:
+//   int fd = open(...);
+//   SCOPED_CLEANUP({ close(fd); });
+//
+// The factory is fully qualified (::kudu::) because the macro expands in the
+// caller's namespace, and argument-dependent lookup on a lambda never finds
+// namespace kudu; without qualification the macro fails outside kudu.
+//
+// NOTE: in the case that you want to cancel the cleanup, use the more verbose
+// (non-macro) form below.
+#define SCOPED_CLEANUP(func_body) \
+  auto VARNAME_LINENUM(scoped_cleanup) = ::kudu::MakeScopedCleanup([&] { func_body });
+
+namespace kudu {
+
+// A scoped object which runs a cleanup function when going out of scope. Can
+// be used for scoped resource cleanup.
+//
+// Use 'MakeScopedCleanup()' below to instantiate.
+//
+// Instances are move-only. Moving transfers responsibility for running the
+// function to the destination, and the moved-from object will not run it.
+// Copying is disallowed because both copies would run the function.
+template<typename F>
+class ScopedCleanup {
+ public:
+  explicit ScopedCleanup(F f)
+      : cancelled_(false),
+        f_(std::move(f)) {
+  }
+
+  ScopedCleanup(ScopedCleanup&& other)
+      : cancelled_(other.cancelled_),
+        f_(std::move(other.f_)) {
+    // The source no longer owns the cleanup; make sure its destructor is a no-op.
+    other.cancel();
+  }
+
+  ScopedCleanup(const ScopedCleanup&) = delete;
+  ScopedCleanup& operator=(const ScopedCleanup&) = delete;
+  ScopedCleanup& operator=(ScopedCleanup&&) = delete;
+
+  ~ScopedCleanup() {
+    if (!cancelled_) {
+      f_();
+    }
+  }
+
+  // Prevent the function from running when this object is destroyed.
+  void cancel() { cancelled_ = true; }
+
+ private:
+  bool cancelled_;
+  F f_;
+};
+
+// Creates a new scoped cleanup instance with the provided function.
+template<typename F>
+ScopedCleanup<F> MakeScopedCleanup(F f) {
+  // Move rather than copy 'f': this avoids copying captured state and allows
+  // move-only callables (e.g. lambdas capturing a std::unique_ptr).
+  return ScopedCleanup<F>(std::move(f));
+}
+
+} // namespace kudu
http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/semaphore.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/semaphore.cc b/be/src/kudu/util/semaphore.cc
new file mode 100644
index 0000000..72ff214
--- /dev/null
+++ b/be/src/kudu/util/semaphore.cc
@@ -0,0 +1,105 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "kudu/util/semaphore.h"
+
+#include <semaphore.h>
+
+#include <cerrno>
+#include <cstdint>
+#include <cstdlib>
+#include <ctime>
+#include <ostream>
+
+#include <glog/logging.h>
+
+#include "kudu/gutil/walltime.h"
+#include "kudu/util/monotime.h"
+
+namespace kudu {
+
+Semaphore::Semaphore(int capacity) {
+  DCHECK_GE(capacity, 0);
+  // pshared == 0: the semaphore is shared between threads of this process only.
+  const int rc = sem_init(&sem_, 0, capacity);
+  if (rc != 0) {
+    Fatal("init");
+  }
+}
+
+Semaphore::~Semaphore() {
+  // A failing sem_destroy() indicates misuse (e.g. an invalid semaphore); crash.
+  const int rc = sem_destroy(&sem_);
+  if (rc != 0) {
+    Fatal("destroy");
+  }
+}
+
+void Semaphore::Acquire() {
+  // Block until the semaphore can be decremented, retrying if a signal
+  // interrupts the wait. Any other failure is fatal (Fatal() never returns).
+  int ret;
+  RETRY_ON_EINTR(ret, sem_wait(&sem_));
+  if (ret != 0) {
+    Fatal("wait");
+  }
+  // TODO(todd): would be nice to track acquisition time, etc.
+}
+
+bool Semaphore::TryAcquire() {
+  int ret;
+  RETRY_ON_EINTR(ret, sem_trywait(&sem_));
+  if (ret != 0) {
+    // EAGAIN only means the value is currently zero; anything else is fatal.
+    if (errno != EAGAIN) {
+      Fatal("trywait");
+    }
+    return false;
+  }
+  return true;
+}
+
+bool Semaphore::TimedAcquire(const MonoDelta& timeout) {
+  // sem_timedwait() takes an absolute deadline measured against
+  // CLOCK_REALTIME, so add the relative timeout to the current time from
+  // GetCurrentTimeMicros() (walltime.h).
+  const int64_t deadline_micros = GetCurrentTimeMicros() + timeout.ToMicroseconds();
+  struct timespec abs_timeout;
+  MonoDelta::NanosToTimeSpec(deadline_micros * MonoTime::kNanosecondsPerMicrosecond,
+                             &abs_timeout);
+
+  int ret;
+  RETRY_ON_EINTR(ret, sem_timedwait(&sem_, &abs_timeout));
+  if (ret == 0) {
+    return true;
+  }
+  // Running out of time is an expected outcome; any other error is fatal.
+  if (errno != ETIMEDOUT) {
+    Fatal("timedwait");
+  }
+  return false;
+}
+
+// Increment the semaphore with sem_post(), which wakes a waiter if one is
+// blocked. Any failure is fatal (PCHECK also logs errno).
+void Semaphore::Release() {
+ PCHECK(sem_post(&sem_) == 0);
+}
+
+// Return the semaphore's current value as reported by sem_getvalue(). With
+// concurrent acquirers/releasers the value may already be stale on return.
+int Semaphore::GetValue() {
+ int val;
+ PCHECK(sem_getvalue(&sem_, &val) == 0);
+ return val;
+}
+
+// Log which 'action' failed (with errno, via PLOG) and the semaphore's address,
+// then terminate. PLOG(FATAL) already aborts; the explicit abort() exists only
+// so the compiler accepts this ATTRIBUTE_NORETURN function as non-returning.
+void Semaphore::Fatal(const char* action) {
+ PLOG(FATAL) << "Could not " << action << " semaphore "
+ << reinterpret_cast<void*>(&sem_);
+ abort(); // unnecessary, but avoids gcc complaining
+}
+
+} // namespace kudu
http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/semaphore.h
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/semaphore.h b/be/src/kudu/util/semaphore.h
new file mode 100644
index 0000000..4f12658
--- /dev/null
+++ b/be/src/kudu/util/semaphore.h
@@ -0,0 +1,77 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#ifndef KUDU_UTIL_SEMAPHORE_H
+#define KUDU_UTIL_SEMAPHORE_H
+
+#include <semaphore.h>
+#if defined(__APPLE__)
+#include <dispatch/dispatch.h>
+#include "kudu/util/atomic.h"
+#endif // define(__APPLE__)
+
+#include "kudu/gutil/macros.h"
+#include "kudu/gutil/port.h"
+
+namespace kudu {
+
+class MonoDelta;
+
+// Counting semaphore. Wraps a POSIX sem_t, or on macOS (__APPLE__) a GCD
+// dispatch semaphore plus a separately maintained counter.
+class Semaphore {
+ public:
+ // Initialize the semaphore with the specified capacity.
+ explicit Semaphore(int capacity);
+ ~Semaphore();
+
+ // Acquire the semaphore, blocking until it is available.
+ void Acquire();
+
+ // Acquire the semaphore within the given timeout. Returns true if successful,
+ // false if the timeout elapsed first.
+ bool TimedAcquire(const MonoDelta& timeout);
+
+ // Try to acquire the semaphore immediately. Returns false if unsuccessful.
+ bool TryAcquire();
+
+ // Release the semaphore.
+ void Release();
+
+ // Get the current value of the semaphore. The result may be stale as soon as
+ // it is returned; on macOS it comes from the separate counter (count_).
+ int GetValue();
+
+ // Boost-compatible wrappers (lock()/unlock()/try_lock() map to
+ // Acquire()/Release()/TryAcquire()), so lock guards can be used.
+ void lock() { Acquire(); }
+ void unlock() { Release(); }
+ bool try_lock() { return TryAcquire(); }
+
+ private:
+#if !defined(__APPLE__)
+ // Log a fatal error message. Separated out to keep the main functions
+ // as small as possible in terms of code size. Never returns.
+ void Fatal(const char* action) ATTRIBUTE_NORETURN;
+#endif // !defined(__APPLE__)
+
+#if defined(__APPLE__)
+ dispatch_semaphore_t sem_;
+ // Mirror of the semaphore's value, used by GetValue().
+ AtomicInt<int32_t> count_;
+#else
+ sem_t sem_;
+#endif // defined(__APPLE__)
+ DISALLOW_COPY_AND_ASSIGN(Semaphore);
+};
+
+} // namespace kudu
+#endif /* KUDU_UTIL_SEMAPHORE_H */
http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/semaphore_macosx.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/semaphore_macosx.cc b/be/src/kudu/util/semaphore_macosx.cc
new file mode 100644
index 0000000..e2d235c
--- /dev/null
+++ b/be/src/kudu/util/semaphore_macosx.cc
@@ -0,0 +1,75 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "kudu/util/semaphore.h"
+
+#include <semaphore.h>
+
+#include <glog/logging.h>
+
+#include "kudu/gutil/walltime.h"
+#include "kudu/util/monotime.h"
+
+namespace kudu {
+
+// Create a dispatch semaphore with the given initial value and mirror that
+// value in count_ for GetValue().
+Semaphore::Semaphore(int capacity)
+ : count_(capacity) {
+ DCHECK_GE(capacity, 0);
+ sem_ = dispatch_semaphore_create(capacity);
+ // dispatch_semaphore_create() returns NULL on failure (e.g. a negative value).
+ CHECK_NOTNULL(sem_);
+}
+
+// Release this object's reference to the dispatch semaphore created in the
+// constructor.
+Semaphore::~Semaphore() {
+ dispatch_release(sem_);
+}
+
+// Block until a unit is available, then record the decrement in count_.
+void Semaphore::Acquire() {
+ // If the timeout is DISPATCH_TIME_FOREVER, then dispatch_semaphore_wait()
+ // waits forever and always returns zero.
+ CHECK(dispatch_semaphore_wait(sem_, DISPATCH_TIME_FOREVER) == 0);
+ count_.IncrementBy(-1);
+}
+
+bool Semaphore::TryAcquire() {
+  // Waiting with DISPATCH_TIME_NOW never blocks: dispatch_semaphore_wait()
+  // returns zero only if a unit could be taken immediately.
+  const bool acquired = (dispatch_semaphore_wait(sem_, DISPATCH_TIME_NOW) == 0);
+  if (acquired) {
+    count_.IncrementBy(-1);
+  }
+  return acquired;
+}
+
+bool Semaphore::TimedAcquire(const MonoDelta& timeout) {
+  // dispatch_time() turns the relative timeout into a deadline counted from now.
+  const dispatch_time_t deadline =
+      dispatch_time(DISPATCH_TIME_NOW, timeout.ToNanoseconds());
+  if (dispatch_semaphore_wait(sem_, deadline) != 0) {
+    return false;  // Non-zero: the deadline passed without acquiring.
+  }
+  count_.IncrementBy(-1);
+  return true;
+}
+
+void Semaphore::Release() {
+  // Bump count_ *before* signalling. A waiter woken by the signal decrements
+  // count_ right after its wait returns; incrementing first guarantees each
+  // decrement is preceded by its matching increment, so GetValue() never
+  // observes a negative value (the previous signal-then-increment order could
+  // briefly expose -1).
+  count_.IncrementBy(1);
+  dispatch_semaphore_signal(sem_);
+}
+
+// GCD provides no public call to read a dispatch semaphore's value, so report
+// the separately maintained count_. Because count_ is updated outside the
+// dispatch calls, the result is only approximate under concurrency.
+int Semaphore::GetValue() {
+ return count_.Load();
+}
+
+} // namespace kudu
http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/signal.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/signal.cc b/be/src/kudu/util/signal.cc
new file mode 100644
index 0000000..e8b6e79
--- /dev/null
+++ b/be/src/kudu/util/signal.cc
@@ -0,0 +1,47 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "kudu/util/signal.h"
+
+#include <glog/logging.h>
+
+namespace kudu {
+
+// Install 'handler' as the process-wide disposition for 'signal' using
+// sigaction(), with an empty sa_mask and no SA_* flags. Crashes (PCHECK, which
+// logs errno) if sigaction() fails.
+void SetSignalHandler(int signal, SignalHandlerCallback handler) {
+  // Value-initialize the whole struct: besides the fields set below it may have
+  // platform-specific members (e.g. sa_restorer on Linux) that would otherwise
+  // be handed to sigaction() uninitialized.
+  struct sigaction act = {};
+  act.sa_handler = handler;
+  sigemptyset(&act.sa_mask);
+  act.sa_flags = 0;
+  PCHECK(sigaction(signal, &act, nullptr) == 0);
+}
+
+// Ignore SIGPIPE so that writing to a closed pipe/socket fails with EPIPE
+// instead of terminating the process.
+void IgnoreSigPipe() {
+ SetSignalHandler(SIGPIPE, SIG_IGN);
+}
+
+// Restore SIGPIPE's default disposition (terminate the process), undoing
+// IgnoreSigPipe().
+void ResetSigPipeHandlerToDefault() {
+ SetSignalHandler(SIGPIPE, SIG_DFL);
+}
+
+// We unblock all signal masks since they are inherited.
+//
+// The signal mask is inherited across fork() and preserved across exec(), so a
+// signal blocked earlier would otherwise stay blocked. POSIX leaves
+// sigprocmask() unspecified in multi-threaded processes (on Linux it affects
+// the calling thread), so this is best called while single-threaded.
+// NOTE(review): presumably used in a freshly forked child; confirm callers.
+void ResetAllSignalMasksToUnblocked() {
+ sigset_t signals;
+ PCHECK(sigfillset(&signals) == 0);
+ PCHECK(sigprocmask(SIG_UNBLOCK, &signals, nullptr) == 0);
+}
+
+} // namespace kudu
http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/signal.h
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/signal.h b/be/src/kudu/util/signal.h
new file mode 100644
index 0000000..0c88a80
--- /dev/null
+++ b/be/src/kudu/util/signal.h
@@ -0,0 +1,42 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <signal.h>
+
+namespace kudu {
+
+// Type of a classic one-argument signal handler (also accepts SIG_IGN and
+// SIG_DFL), spelled with whichever name the platform's <signal.h> provides.
+#if defined(__linux__)
+using SignalHandlerCallback = sighandler_t;
+#else
+using SignalHandlerCallback = sig_t;
+#endif
+
+// Install 'handler' for 'signal' for the whole process.
+void SetSignalHandler(int signal, SignalHandlerCallback handler);
+
+// Set SIGPIPE's disposition to SIG_IGN.
+void IgnoreSigPipe();
+
+// Set SIGPIPE's disposition back to SIG_DFL.
+void ResetSigPipeHandlerToDefault();
+
+// Unblock every signal in the signal mask.
+void ResetAllSignalMasksToUnblocked();
+
+} // namespace kudu
http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/slice-test.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/slice-test.cc b/be/src/kudu/util/slice-test.cc
new file mode 100644
index 0000000..0f7a893
--- /dev/null
+++ b/be/src/kudu/util/slice-test.cc
@@ -0,0 +1,61 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "kudu/util/slice.h"
+
+#include <cstdint>
+#include <map>
+#include <string>
+#include <utility>
+
+#include <gtest/gtest.h>
+
+#include "kudu/gutil/map-util.h"
+
+using std::string;
+
+namespace kudu {
+
+typedef SliceMap<int>::type MySliceMap;
+
+TEST(SliceTest, TestSliceMap) {
+  MySliceMap my_map;
+  Slice a("a");
+  Slice b("b");
+  Slice c("c");
+
+  // Insertion is deliberately out-of-order; the map should restore order.
+  InsertOrDie(&my_map, c, 3);
+  InsertOrDie(&my_map, a, 1);
+  InsertOrDie(&my_map, b, 2);
+
+  // The expected keys are the single bytes 'a', 'b', 'c'. Build each from a
+  // uint8_t: taking the first byte of an int only yields 'a' + n on
+  // little-endian machines.
+  int expected_value = 0;
+  for (const MySliceMap::value_type& pair : my_map) {
+    const uint8_t expected_key = static_cast<uint8_t>('a' + expected_value++);
+    ASSERT_EQ(Slice(&expected_key, 1), pair.first);
+    ASSERT_EQ(expected_value, pair.second);
+  }
+  ASSERT_EQ(3, expected_value);
+
+  expected_value = 0;
+  for (auto iter = my_map.begin(); iter != my_map.end(); ++iter) {
+    const uint8_t expected_key = static_cast<uint8_t>('a' + expected_value++);
+    ASSERT_EQ(Slice(&expected_key, 1), iter->first);
+    ASSERT_EQ(expected_value, iter->second);
+  }
+  ASSERT_EQ(3, expected_value);
+}
+
+} // namespace kudu
http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/slice.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/slice.cc b/be/src/kudu/util/slice.cc
new file mode 100644
index 0000000..775d54a
--- /dev/null
+++ b/be/src/kudu/util/slice.cc
@@ -0,0 +1,97 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "kudu/util/slice.h"
+
+#include <cctype>
+
+#include "kudu/gutil/port.h"
+#include "kudu/gutil/stringprintf.h"
+#include "kudu/util/status.h"
+#include "kudu/util/logging.h"
+
+namespace kudu {
+
+Status Slice::check_size(size_t expected_size) const {
+  // Fast path: the size matches.
+  if (PREDICT_TRUE(size() == expected_size)) {
+    return Status::OK();
+  }
+  // Otherwise report corruption, attaching a (redactable) preview of at most
+  // 100 bytes of the data.
+  return Status::Corruption(
+      StringPrintf("Unexpected Slice size. Expected %zu but got %zu.",
+                   expected_size, size()),
+      KUDU_REDACT(ToDebugString(100)));
+}
+
+// Return a std::string holding a copy of all referenced bytes (including any
+// embedded NULs).
+std::string Slice::ToString() const {
+  const char* const begin = reinterpret_cast<const char*>(data_);
+  return std::string(begin, begin + size_);
+}
+
+// Return a printable rendering of the slice: printable (isgraph) bytes are
+// copied as-is, all others are escaped as "\xNN". If 'max_len' is non-zero and
+// smaller than the slice, only the first 'max_len' bytes are rendered and a
+// "...<N bytes total>" suffix is appended.
+std::string Slice::ToDebugString(size_t max_len) const {
+  size_t bytes_to_print = size_;
+  bool abbreviated = false;
+  if (max_len != 0 && bytes_to_print > max_len) {
+    bytes_to_print = max_len;
+    abbreviated = true;
+  }
+
+  // Pre-compute the output length so the string is allocated once. Counters
+  // are size_t, matching size_, so large slices cannot overflow an int.
+  size_t size = 0;
+  for (size_t i = 0; i < bytes_to_print; i++) {
+    if (!isgraph(data_[i])) {
+      size += 4;  // "\xNN"
+    } else {
+      size++;
+    }
+  }
+  if (abbreviated) {
+    size += 20;  // extra padding
+  }
+
+  std::string ret;
+  ret.reserve(size);
+  for (size_t i = 0; i < bytes_to_print; i++) {
+    if (!isgraph(data_[i])) {
+      StringAppendF(&ret, "\\x%02x", data_[i] & 0xff);
+    } else {
+      ret.push_back(data_[i]);
+    }
+  }
+  if (abbreviated) {
+    // size_ is unsigned (size_t), so %zu -- not the signed %zd -- matches it,
+    // consistent with check_size().
+    StringAppendF(&ret, "...<%zu bytes total>", size_);
+  }
+  return ret;
+}
+
+// Return true if every byte of 's' is zero (trivially true for an empty slice).
+bool IsAllZeros(const Slice& s) {
+  // Walk a pointer through the slice instead of using s[i]
+  // since this is way faster in debug mode builds. We also do some
+  // manual unrolling for the same purpose.
+  //
+  // Start from data() rather than &s[0] so an empty slice never evaluates
+  // s[0] (which may be bounds-checked in debug builds). Keep the remaining
+  // length as size_t: an int would truncate slices over INT_MAX bytes and
+  // could go negative, skipping the scan and wrongly returning true.
+  const uint8_t* p = s.data();
+  size_t rem = s.size();
+
+  while (rem >= 8) {
+    if (UNALIGNED_LOAD64(p) != 0) return false;
+    rem -= 8;
+    p += 8;
+  }
+
+  while (rem > 0) {
+    if (*p++ != '\0') return false;
+    rem--;
+  }
+  return true;
+}
+
+} // namespace kudu
http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/slice.h
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/slice.h b/be/src/kudu/util/slice.h
new file mode 100644
index 0000000..d34c744
--- /dev/null
+++ b/be/src/kudu/util/slice.h
@@ -0,0 +1,332 @@
+//
+// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file. See the AUTHORS file for names of contributors.
+//
+
+#ifndef KUDU_UTIL_SLICE_H_
+#define KUDU_UTIL_SLICE_H_
+
+// NOTE: using stdint.h instead of cstdint because this file is supposed
+// to be processed by a compiler lacking C++11 support.
+#include <stdint.h>
+
+#include <cassert>
+#include <cstddef>
+#include <cstring>
+#include <iosfwd>
+#include <map>
+#include <string>
+
+#ifdef KUDU_HEADERS_USE_RICH_SLICE
+#include "kudu/gutil/strings/fastmem.h"
+#include "kudu/gutil/strings/stringpiece.h"
+#include "kudu/util/faststring.h"
+#endif
+#ifdef KUDU_HEADERS_NO_STUBS
+#include "kudu/gutil/port.h"
+#endif
+#include "kudu/util/kudu_export.h"
+
+namespace kudu {
+
+class Status;
+
+/// @brief A wrapper around externally allocated data.
+///
+/// Slice is a simple structure containing a pointer into some external
+/// storage and a size. The user of a Slice must ensure that the slice
+/// is not used after the corresponding external storage has been
+/// deallocated.
+///
+/// Multiple threads can invoke const methods on a Slice without
+/// external synchronization, but if any of the threads may call a
+/// non-const method, all threads accessing the same Slice must use
+/// external synchronization.
+///
+/// Slices can be built around faststrings and StringPieces using constructors
+/// with implicit casts. Both StringPieces and faststrings depend on a great
+/// deal of gutil code.
+class KUDU_EXPORT Slice {
+ public:
+ /// Create an empty slice.
+ Slice() : data_(reinterpret_cast<const uint8_t *>("")),
+ size_(0) { }
+
+ /// Create a slice that refers to a @c uint8_t byte array.
+ ///
+ /// @param [in] d
+ /// The input array.
+ /// @param [in] n
+ /// Number of bytes in the array.
+ Slice(const uint8_t* d, size_t n) : data_(d), size_(n) { }
+
+ /// Create a slice that refers to a @c char byte array.
+ ///
+ /// @param [in] d
+ /// The input array.
+ /// @param [in] n
+ /// Number of bytes in the array.
+ Slice(const char* d, size_t n) :
+ data_(reinterpret_cast<const uint8_t *>(d)),
+ size_(n) { }
+
+ /// Create a slice that refers to the contents of the given string.
+ ///
+ /// @param [in] s
+ /// The input string.
+ Slice(const std::string& s) : // NOLINT(runtime/explicit)
+ data_(reinterpret_cast<const uint8_t *>(s.data())),
+ size_(s.size()) { }
+
+ /// Create a slice that refers to a C-string s[0,strlen(s)-1].
+ ///
+ /// @param [in] s
+ /// The input C-string.
+ Slice(const char* s) : // NOLINT(runtime/explicit)
+ data_(reinterpret_cast<const uint8_t *>(s)),
+ size_(strlen(s)) { }
+
+#ifdef KUDU_HEADERS_USE_RICH_SLICE
+ /// Create a slice that refers to the contents of a faststring.
+ ///
+ /// @note Further appends to the faststring may invalidate this slice.
+ ///
+ /// @param [in] s
+ /// The input faststring.
+ Slice(const faststring &s) // NOLINT(runtime/explicit)
+ : data_(s.data()),
+ size_(s.size()) {
+ }
+
+ /// Create a slice that refers to the contents of a string piece.
+ ///
+ /// @param [in] s
+ /// The input StringPiece.
+ Slice(const StringPiece& s) // NOLINT(runtime/explicit)
+ : data_(reinterpret_cast<const uint8_t*>(s.data())),
+ size_(s.size()) {
+ }
+#endif
+
+ /// @return A pointer to the beginning of the referenced data.
+ const uint8_t* data() const { return data_; }
+
+ /// @return A mutable pointer to the beginning of the referenced data.
+ uint8_t *mutable_data() { return const_cast<uint8_t *>(data_); }
+
+ /// @return The length (in bytes) of the referenced data.
+ size_t size() const { return size_; }
+
+ /// @return @c true iff the length of the referenced data is zero.
+ bool empty() const { return size_ == 0; }
+
+ /// @pre n < size()
+ ///
+ /// @param [in] n
+ /// The index of the byte.
+ /// @return the n-th byte in the referenced data.
+ const uint8_t &operator[](size_t n) const {
+ assert(n < size());
+ return data_[n];
+ }
+
+ /// Change this slice to refer to an empty array.
+ void clear() {
+ data_ = reinterpret_cast<const uint8_t *>("");
+ size_ = 0;
+ }
+
+ /// Drop the first "n" bytes from this slice.
+ ///
+ /// @pre n <= size()
+ ///
+ /// @note Only the base and bounds of the slice are changed;
+ /// the data is not modified.
+ ///
+ /// @param [in] n
+ /// Number of bytes that should be dropped from the beginning.
+ void remove_prefix(size_t n) {
+ assert(n <= size());
+ data_ += n;
+ size_ -= n;
+ }
+
+ /// Truncate the slice to the given number of bytes.
+ ///
+ /// @pre n <= size()
+ ///
+ /// @note Only the base and bounds of the slice are changed;
+ /// the data is not modified.
+ ///
+ /// @param [in] n
+ /// The new size of the slice.
+ void truncate(size_t n) {
+ assert(n <= size());
+ size_ = n;
+ }
+
+ /// Check that the slice has the expected size.
+ ///
+ /// @param [in] expected_size
+ /// @return Status::Corruption() iff size() != @c expected_size
+ Status check_size(size_t expected_size) const;
+
+ /// @return A string that contains a copy of the referenced data.
+ std::string ToString() const;
+
+ /// Get printable representation of the data in the slice.
+ ///
+ /// @param [in] max_len
+ /// The maximum number of bytes to output in the printable format;
+ /// @c 0 means no limit.
+ /// @return A string with printable representation of the data.
+ std::string ToDebugString(size_t max_len = 0) const;
+
+ /// Do a three-way comparison of the slice's data.
+ ///
+ /// @param [in] b
+ /// The other slice to compare with.
+ /// @return Values are
+ /// @li < 0 iff "*this" < "b"
+ /// @li == 0 iff "*this" == "b"
+ /// @li > 0 iff "*this" > "b"
+ int compare(const Slice& b) const;
+
+ /// Check whether the slice starts with the given prefix.
+ /// @param [in] x
+ /// The slice in question.
+ /// @return @c true iff "x" is a prefix of "*this"
+ bool starts_with(const Slice& x) const {
+ return ((size_ >= x.size_) &&
+ (MemEqual(data_, x.data_, x.size_)));
+ }
+
+ /// @brief Comparator struct, useful for ordered collections (like STL maps).
+ struct Comparator {
+ /// Compare two slices using Slice::compare()
+ ///
+ /// @param [in] a
+ /// The slice to call Slice::compare() at.
+ /// @param [in] b
+ /// The slice to use as a parameter for Slice::compare().
+ /// @return @c true iff @c a is less than @c b by Slice::compare().
+ bool operator()(const Slice& a, const Slice& b) const {
+ return a.compare(b) < 0;
+ }
+ };
+
+ /// Relocate/copy the slice's data into a new location.
+ ///
+ /// @param [in] d
+ /// The new location for the data. If it's the same location, then no
+ /// relocation is done. It is assumed that the new location is
+ /// large enough to fit the data.
+ void relocate(uint8_t* d) {
+ if (data_ != d) {
+ memcpy(d, data_, size_);
+ data_ = d;
+ }
+ }
+
+ private:
+ friend bool operator==(const Slice& x, const Slice& y);
+
+ static bool MemEqual(const void* a, const void* b, size_t n) {
+#ifdef KUDU_HEADERS_USE_RICH_SLICE
+ return strings::memeq(a, b, n);
+#else
+ return memcmp(a, b, n) == 0;
+#endif
+ }
+
+ static int MemCompare(const void* a, const void* b, size_t n) {
+#ifdef KUDU_HEADERS_USE_RICH_SLICE
+ return strings::fastmemcmp_inlined(a, b, n);
+#else
+ return memcmp(a, b, n);
+#endif
+ }
+
+ const uint8_t* data_;
+ size_t size_;
+
+ // Intentionally copyable
+};
+
+/// Check whether two slices are identical.
+///
+/// @param [in] x
+/// One slice.
+/// @param [in] y
+/// Another slice.
+/// @return @c true iff two slices contain byte-for-byte identical data.
+inline bool operator==(const Slice& x, const Slice& y) {
+ return ((x.size() == y.size()) &&
+ (Slice::MemEqual(x.data(), y.data(), x.size())));
+}
+
+/// Check whether two slices are not identical.
+///
+/// @param [in] x
+/// One slice.
+/// @param [in] y
+/// Another slice.
+/// @return @c true iff slices contain different data.
+inline bool operator!=(const Slice& x, const Slice& y) {
+ return !(x == y);
+}
+
+/// Output printable representation of the slice into the given output stream.
+///
+/// @param [out] o
+/// The output stream to print the info.
+/// @param [in] s
+/// The slice to print.
+/// @return Reference to the updated output stream.
+inline std::ostream& operator<<(std::ostream& o, const Slice& s) {
+ return o << s.ToDebugString(16); // should be enough for anyone...
+}
+
+inline int Slice::compare(const Slice& b) const {
+ const int min_len = (size_ < b.size_) ? size_ : b.size_;
+ int r = MemCompare(data_, b.data_, min_len);
+ if (r == 0) {
+ if (size_ < b.size_) r = -1;
+ else if (size_ > b.size_) r = +1;
+ }
+ return r;
+}
+
// Returns true iff every byte of 's' is zero.
//
// We don't run TSAN on this function because it makes it really slow and causes some
// test timeouts. This is only used on local buffers anyway, so we don't lose much
// by not checking it.
//
// The attribute is only applied when KUDU_HEADERS_NO_STUBS is defined, which is
// also the condition under which kudu/gutil/port.h is included above (presumably
// the header that provides ATTRIBUTE_NO_SANITIZE_THREAD).
#ifdef KUDU_HEADERS_NO_STUBS
ATTRIBUTE_NO_SANITIZE_THREAD
#endif
bool IsAllZeros(const Slice& s);
+
/// @brief STL map whose keys are Slices.
///
/// Keys are ordered by Slice::compare() (through Slice::Comparator). The map
/// stores the Slice objects themselves, i.e. pointer and size, not copies of
/// the referenced bytes. The data each key refers to must therefore stay
/// alive for as long as the key is in the map.
///
/// An example of usage:
/// @code
///   typedef SliceMap<int>::type MySliceMap;
///
///   MySliceMap my_map;
///   my_map.insert(MySliceMap::value_type(a, 1));
///   my_map.insert(MySliceMap::value_type(b, 2));
///   my_map.insert(MySliceMap::value_type(c, 3));
///
///   for (const MySliceMap::value_type& pair : my_map) {
///     ...
///   }
/// @endcode
template <typename T>
struct SliceMap {
  /// A handy typedef for the slice map with appropriate comparison operator.
  typedef std::map<Slice, T, Slice::Comparator> type;
};
+
+} // namespace kudu
+
+#endif // KUDU_UTIL_SLICE_H_
http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/sorted_disjoint_interval_list-test.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/sorted_disjoint_interval_list-test.cc b/be/src/kudu/util/sorted_disjoint_interval_list-test.cc
new file mode 100644
index 0000000..8e0fe70
--- /dev/null
+++ b/be/src/kudu/util/sorted_disjoint_interval_list-test.cc
@@ -0,0 +1,98 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <utility>
+#include <vector>
+
+#include <gtest/gtest.h>
+
+#include "kudu/util/sorted_disjoint_interval_list.h"
+#include "kudu/util/status.h"
+#include "kudu/util/test_macros.h"
+#include "kudu/util/test_util.h"
+
+using std::vector;
+
+namespace kudu {
+
+class TestSortedDisjointIntervalList : public KuduTest {
+};
+
+typedef int PointType;
+typedef std::pair<PointType, PointType> ClosedInterval;
+
+TEST_F(TestSortedDisjointIntervalList, TestBasic) {
+ // Coalesce an empty interval list.
+ vector<ClosedInterval> intervals = {};
+ ASSERT_OK(CoalesceIntervals<PointType>(&intervals));
+ vector<ClosedInterval> expected = {};
+ ASSERT_EQ(expected, intervals);
+
+ // Coalesce an interval list with length 0 interval.
+ intervals = {{26, 26}};
+ ASSERT_OK(CoalesceIntervals<PointType>(&intervals));
+ expected = {{26, 26}};
+ ASSERT_EQ(expected, intervals);
+
+ // Coalesce an interval list with a single interval.
+ intervals = {{33, 69}};
+ ASSERT_OK(CoalesceIntervals<PointType>(&intervals));
+ expected = {{33, 69}};
+ ASSERT_EQ(expected, intervals);
+
+ // Coalesce an interval list with adjacent ranges.
+ intervals = {{4, 7}, {3, 4}, {1, 2}, {-23, 1}};
+ ASSERT_OK(CoalesceIntervals<PointType>(&intervals));
+ expected = {{-23, 2}, {3, 7}};
+ ASSERT_EQ(expected, intervals);
+}
+
+TEST_F(TestSortedDisjointIntervalList, TestOverlappedIntervals) {
+ vector<ClosedInterval> intervals = {{4, 7}, {3, 9}};
+ ASSERT_OK(CoalesceIntervals<PointType>(&intervals));
+ vector<ClosedInterval> expected = {{3, 9}};
+ ASSERT_EQ(expected, intervals);
+
+ intervals = {{4, 7}, {3, 9}, {-23, 1},
+ {4, 350}, {369, 400}};
+ ASSERT_OK(CoalesceIntervals<PointType>(&intervals));
+ expected = {{-23, 1}, {3, 350}, {369, 400}};
+ ASSERT_EQ(expected, intervals);
+}
+
+TEST_F(TestSortedDisjointIntervalList, TestDuplicateIntervals) {
+ vector<ClosedInterval> intervals = {{1, 2}, {4, 7},
+ {1, 2}, {1, 2}};
+ ASSERT_OK(CoalesceIntervals<PointType>(&intervals));
+ const vector<ClosedInterval> expected = {{1, 2}, {4, 7}};
+ ASSERT_EQ(expected, intervals);
+}
+
+TEST_F(TestSortedDisjointIntervalList, TestInvalidIntervals) {
+ vector<ClosedInterval> intervals = {{1, 2}, {10, 2},
+ {4, 7}, {40, 7}};
+ ASSERT_TRUE(CoalesceIntervals<PointType>(&intervals).IsInvalidArgument());
+}
+
+TEST_F(TestSortedDisjointIntervalList, TestSingleElementIntervals) {
+ vector<ClosedInterval> intervals = {{0, 0}, {0, 1}, {1, 2}};
+ ASSERT_OK(CoalesceIntervals<PointType>(&intervals));
+ const vector<ClosedInterval> expected = {{0, 2}};
+ ASSERT_EQ(expected, intervals);
+}
+
+} // namespace kudu
http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/sorted_disjoint_interval_list.h
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/sorted_disjoint_interval_list.h b/be/src/kudu/util/sorted_disjoint_interval_list.h
new file mode 100644
index 0000000..d3180a9
--- /dev/null
+++ b/be/src/kudu/util/sorted_disjoint_interval_list.h
@@ -0,0 +1,95 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <algorithm>
+#include <cstdint>
+#include <utility>
+#include <vector>
+
+#include <glog/logging.h>
+
+#include "kudu/gutil/strings/substitute.h"
+#include "kudu/util/status.h"
+
+namespace kudu {
+
+// Constructs a sorted disjoint interval list in-place given a list of intervals.
+// The result is written back to the given 'intervals'.
+//
+// Returns an error if the list contains any invalid intervals.
+//
+// Sorted disjoint interval list is a data structure holding a group of sorted
+// non-overlapping ranges. The operation to construct such one is O(nlg n + n)
+// where 'n' is the number of intervals.
+//
+// For example, given the input interval list:
+//
+// [------2-------) [-----1-----)
+// [--3--) [---5--) [----4----)
+//
+// The output sorted disjoint interval list:
+//
+// [----------1----------) [-----2-----)
+//
+//
+// This method assumes that all intervals are "half-open" intervals -- the
+// intervals are inclusive of their start point and exclusive of end point,
+// e.g., [3, 6). Note that interval with the same start and end point is
+// considered to be valid in this implementation.
+// It also assumes 'PointType' has a proper defined comparator.
+template<typename PointType>
+Status CoalesceIntervals(std::vector<std::pair<PointType, PointType>>* intervals) {
+ if (intervals->empty()) return Status::OK();
+
+ // Sort the intervals to prepare for coalescing overlapped ranges.
+ for (const auto& interval : *intervals) {
+ if (interval.first > interval.second) {
+ return Status::InvalidArgument(strings::Substitute("invalid interval: [$0, $1)",
+ interval.first,
+ interval.second));
+ }
+ }
+ std::sort(intervals->begin(), intervals->end());
+
+ // Traverse the intervals to coalesce overlapped intervals. During the process,
+ // uses 'head', 'tail' to track the start and end point of the current disjoint
+ // interval.
+ auto head = intervals->begin();
+ auto tail = head;
+ while (++tail != intervals->end()) {
+ // If interval 'head' and 'tail' overlap with each other, coalesce them and move
+ // to next. Otherwise, the two intervals are disjoint.
+ if (head->second >= tail->first) {
+ if (tail->second > head->second) head->second = std::move(tail->second);
+ } else {
+ // The two intervals are disjoint. If the 'head' previously already coalesced
+ // some intervals, 'head' and 'tail' will not be adjacent. If so, move 'tail'
+ // to the next 'head' to make sure we do not include any of the previously-coalesced
+ // intervals.
+ ++head;
+ if (head != tail) *head = std::move(*tail);
+ }
+ }
+
+ // Truncate the rest useless elements, if any.
+ intervals->erase(++head, tail);
+ return Status::OK();
+}
+
+} // namespace kudu
http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/spinlock_profiling-test.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/spinlock_profiling-test.cc b/be/src/kudu/util/spinlock_profiling-test.cc
new file mode 100644
index 0000000..d0ef2b4
--- /dev/null
+++ b/be/src/kudu/util/spinlock_profiling-test.cc
@@ -0,0 +1,81 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <cstdint>
+#include <ostream>
+#include <string>
+
+#include <gtest/gtest.h>
+#include <glog/logging.h>
+
+#include "kudu/gutil/integral_types.h"
+#include "kudu/gutil/ref_counted.h"
+#include "kudu/gutil/spinlock.h"
+#include "kudu/util/spinlock_profiling.h"
+#include "kudu/util/test_macros.h"
+#include "kudu/util/test_util.h"
+#include "kudu/util/trace.h"
+
// Can't include gutil/synchronization_profiling.h directly as it'll
// declare a weak symbol directly in this unit test, which the runtime
// linker will prefer over equivalent strong symbols for some reason. By
// declaring the symbol without providing an empty definition, the strong
// symbols are chosen when provided via shared libraries.
//
// Further reading:
// - http://stackoverflow.com/questions/20658809/dynamic-loading-and-weak-symbol-resolution
// - http://notmysock.org/blog/php/weak-symbols-arent.html
namespace gutil {
// The strong definition lives in kudu/util/spinlock_profiling.cc, which forwards
// the call to kudu::SubmitSpinLockProfileData().
extern void SubmitSpinLockProfileData(const void *, int64);
} // namespace gutil
+
+namespace kudu {
+
+class SpinLockProfilingTest : public KuduTest {};
+
+TEST_F(SpinLockProfilingTest, TestSpinlockProfiling) {
+ scoped_refptr<Trace> t(new Trace);
+ base::SpinLock lock;
+ {
+ ADOPT_TRACE(t.get());
+ gutil::SubmitSpinLockProfileData(&lock, 4000000);
+ }
+ std::string result = t->DumpToString();
+ LOG(INFO) << "trace: " << result;
+ ASSERT_STR_CONTAINS(result, "\"spinlock_wait_cycles\":4000000");
+ // We can't assert more specifically because the CyclesPerSecond
+ // on different machines might be different.
+ ASSERT_STR_CONTAINS(result, "Waited ");
+ ASSERT_STR_CONTAINS(result, "on lock ");
+
+ ASSERT_GT(GetSpinLockContentionMicros(), 0);
+}
+
+TEST_F(SpinLockProfilingTest, TestStackCollection) {
+ StartSynchronizationProfiling();
+ base::SpinLock lock;
+ gutil::SubmitSpinLockProfileData(&lock, 12345);
+ StopSynchronizationProfiling();
+ std::ostringstream str;
+ int64_t dropped = 0;
+ FlushSynchronizationProfile(&str, &dropped);
+ std::string s = str.str();
+ ASSERT_STR_CONTAINS(s, "12345 1 @ ");
+ ASSERT_EQ(0, dropped);
+}
+
+} // namespace kudu
http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/spinlock_profiling.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/spinlock_profiling.cc b/be/src/kudu/util/spinlock_profiling.cc
new file mode 100644
index 0000000..e7f93b0
--- /dev/null
+++ b/be/src/kudu/util/spinlock_profiling.cc
@@ -0,0 +1,308 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "kudu/util/spinlock_profiling.h"
+
+#include <sstream>
+#include <string>
+
+#include <glog/logging.h>
+#include <gflags/gflags.h>
+
+#include "kudu/gutil/atomicops.h"
+#include "kudu/gutil/bind.h"
+#include "kudu/gutil/casts.h"
+#include "kudu/gutil/macros.h"
+#include "kudu/gutil/once.h"
+#include "kudu/gutil/port.h"
+#include "kudu/gutil/spinlock.h"
+#include "kudu/gutil/strings/human_readable.h"
+#include "kudu/gutil/sysinfo.h"
+#include "kudu/util/atomic.h"
+#include "kudu/util/debug-util.h"
+#include "kudu/util/flag_tags.h"
+#include "kudu/util/metrics.h"
+#include "kudu/util/striped64.h"
+#include "kudu/util/trace.h"
+
// Waits longer than this many cycles count as "long" in
// SubmitSpinLockProfileData(): the waiting stack is then written to the
// current Trace, if any.
DEFINE_int32(lock_contention_trace_threshold_cycles,
             2000000,  // 2M cycles should be about 1ms
             "If acquiring a spinlock takes more than this number of "
             "cycles, and a Trace is currently active, then the current "
             "stack trace is logged to the trace buffer.");
TAG_FLAG(lock_contention_trace_threshold_cycles, hidden);

// Gauge backed by GetSpinLockContentionMicros(); it is instantiated and
// registered by RegisterSpinLockContentionMetrics().
METRIC_DEFINE_gauge_uint64(server, spinlock_contention_time,
    "Spinlock Contention Time", kudu::MetricUnit::kMicroseconds,
    "Amount of time consumed by contention on internal spinlocks since the server "
    "started. If this increases rapidly, it may indicate a performance issue in Kudu "
    "internals triggered by a particular workload and warrant investigation.",
    kudu::EXPOSE_AS_COUNTER);


using base::SpinLock;
using base::SpinLockHolder;
+
+namespace kudu {
+
static const double kMicrosPerSecond = 1000000.0;

// Running total of cycles spent waiting on contended spinlocks. It stays
// nullptr until DoInit() allocates it and publishes it with a release store;
// SubmitSpinLockProfileData() reads it back with an acquire load.
static LongAdder* g_contended_cycles = nullptr;
+
+namespace {
+
+// Implements a very simple linear-probing hashtable of stack traces with
+// a fixed number of entries.
+//
+// Threads experiencing contention record their stacks into this hashtable,
+// or increment an already-existing entry. Each entry has its own lock,
+// but we can "skip" an entry under contention, and spread out a single stack
+// into multiple buckets if necessary.
+//
+// A thread collecting a profile collects stack traces out of the hash table
+// and resets the counts to 0 as they are collected.
+class ContentionStacks {
+ public:
+ ContentionStacks()
+ : dropped_samples_(0) {
+ }
+
+ // Add a stack trace to the table.
+ void AddStack(const StackTrace& s, int64_t cycles);
+
+ // Flush stacks from the buffer to 'out'. See the docs for FlushSynchronizationProfile()
+ // in spinlock_profiling.h for details on format.
+ //
+ // On return, guarantees that any stack traces that were present at the beginning of
+ // the call have been flushed. However, new stacks can be added concurrently with this call.
+ void Flush(std::ostringstream* out, int64_t* dropped);
+
+ private:
+
+ // Collect the next sample from the underlying buffer, and set it back to 0 count
+ // (thus marking it as "empty").
+ //
+ // 'iterator' serves as a way to keep track of the current position in the buffer.
+ // Callers should initially set it to 0, and then pass the same pointer to each
+ // call to CollectSample. This serves to loop through the collected samples.
+ bool CollectSample(uint64_t* iterator, StackTrace* s, int64_t* trip_count, int64_t* cycles);
+
+ // Hashtable entry.
+ struct Entry {
+ Entry() : trip_count(0),
+ cycle_count(0) {
+ }
+
+ // Protects all other entries.
+ SpinLock lock;
+
+ // The number of times we've experienced contention with a stack trace equal
+ // to 'trace'.
+ //
+ // If this is 0, then the entry is "unclaimed" and the other fields are not
+ // considered valid.
+ int64_t trip_count;
+
+ // The total number of cycles spent waiting at this stack trace.
+ int64_t cycle_count;
+
+ // A cached hashcode of the trace.
+ uint64_t hash;
+
+ // The actual stack trace.
+ StackTrace trace;
+ };
+
+ enum {
+ kNumEntries = 1024,
+ kNumLinearProbeAttempts = 4
+ };
+ Entry entries_[kNumEntries];
+
+ // The number of samples which were dropped due to contention on this structure or
+ // due to the hashtable being too full.
+ AtomicInt<int64_t> dropped_samples_;
+};
+
// Number of StartSynchronizationProfiling() calls not yet matched by a
// StopSynchronizationProfiling() call. Stack collection is on while this is
// non-zero.
Atomic32 g_profiling_enabled = 0;
// Table of collected contention stacks. It stays nullptr until DoInit() runs.
ContentionStacks* g_contention_stacks = nullptr;
+
+void ContentionStacks::AddStack(const StackTrace& s, int64_t cycles) {
+ uint64_t hash = s.HashCode();
+
+ // Linear probe up to 4 attempts before giving up
+ for (int i = 0; i < kNumLinearProbeAttempts; i++) {
+ Entry* e = &entries_[(hash + i) % kNumEntries];
+ if (!e->lock.TryLock()) {
+ // If we fail to lock it, we can safely just use a different slot.
+ // It's OK if a single stack shows up multiple times, because pprof
+ // aggregates them in the end anyway.
+ continue;
+ }
+
+ if (e->trip_count == 0) {
+ // It's an un-claimed slot. Claim it.
+ e->hash = hash;
+ e->trace.CopyFrom(s);
+ } else if (e->hash != hash || !e->trace.Equals(s)) {
+ // It's claimed by a different stack trace.
+ e->lock.Unlock();
+ continue;
+ }
+
+ // Contribute to the stats for this stack.
+ e->cycle_count += cycles;
+ e->trip_count++;
+ e->lock.Unlock();
+ return;
+ }
+
+ // If we failed to find a matching hashtable slot, or we hit lock contention
+ // trying to record our sample, add it to the dropped sample count.
+ dropped_samples_.Increment();
+}
+
+void ContentionStacks::Flush(std::ostringstream* out, int64_t* dropped) {
+ uint64_t iterator = 0;
+ StackTrace t;
+ int64_t cycles;
+ int64_t count;
+ while (g_contention_stacks->CollectSample(&iterator, &t, &count, &cycles)) {
+ *out << cycles << " " << count
+ << " @ " << t.ToHexString(StackTrace::NO_FIX_CALLER_ADDRESSES |
+ StackTrace::HEX_0X_PREFIX)
+ << std::endl;
+ }
+
+ *dropped += dropped_samples_.Exchange(0);
+}
+
+bool ContentionStacks::CollectSample(uint64_t* iterator, StackTrace* s, int64_t* trip_count,
+ int64_t* cycles) {
+ while (*iterator < kNumEntries) {
+ Entry* e = &entries_[(*iterator)++];
+ SpinLockHolder l(&e->lock);
+ if (e->trip_count == 0) continue;
+
+ *trip_count = e->trip_count;
+ *cycles = e->cycle_count;
+ s->CopyFrom(e->trace);
+
+ e->trip_count = 0;
+ e->cycle_count = 0;
+ return true;
+ }
+
+ // Looped through the whole array and found nothing.
+ return false;
+}
+
+
+void SubmitSpinLockProfileData(const void *contendedlock, int64_t wait_cycles) {
+ TRACE_COUNTER_INCREMENT("spinlock_wait_cycles", wait_cycles);
+ bool profiling_enabled = base::subtle::Acquire_Load(&g_profiling_enabled);
+ bool long_wait_time = wait_cycles > FLAGS_lock_contention_trace_threshold_cycles;
+ // Short circuit this function quickly in the common case.
+ if (PREDICT_TRUE(!profiling_enabled && !long_wait_time)) {
+ return;
+ }
+
+ static __thread bool in_func = false;
+ if (in_func) return; // non-re-entrant
+ in_func = true;
+
+ StackTrace stack;
+ stack.Collect();
+
+ if (profiling_enabled) {
+ DCHECK_NOTNULL(g_contention_stacks)->AddStack(stack, wait_cycles);
+ }
+
+ if (PREDICT_FALSE(long_wait_time)) {
+ Trace* t = Trace::CurrentTrace();
+ if (t) {
+ double seconds = static_cast<double>(wait_cycles) / base::CyclesPerSecond();
+ char backtrace_buffer[1024];
+ stack.StringifyToHex(backtrace_buffer, arraysize(backtrace_buffer));
+ TRACE_TO(t, "Waited $0 on lock $1. stack: $2",
+ HumanReadableElapsedTime::ToShortString(seconds), contendedlock,
+ backtrace_buffer);
+ }
+ }
+
+ LongAdder* la = reinterpret_cast<LongAdder*>(
+ base::subtle::Acquire_Load(reinterpret_cast<AtomicWord*>(&g_contended_cycles)));
+ if (la) {
+ la->IncrementBy(wait_cycles);
+ }
+
+ in_func = false;
+}
+
// One-time initializer, run through GoogleOnceInit() by
// InitSpinLockContentionProfiling(). Allocates the contention-stack table and
// the contention-cycle counter and publishes each with a release store.
// SubmitSpinLockProfileData() reads g_contended_cycles back with an acquire
// load. Neither object is freed anywhere in this file.
void DoInit() {
  base::subtle::Release_Store(reinterpret_cast<AtomicWord*>(&g_contention_stacks),
                              reinterpret_cast<uintptr_t>(new ContentionStacks()));
  base::subtle::Release_Store(reinterpret_cast<AtomicWord*>(&g_contended_cycles),
                              reinterpret_cast<uintptr_t>(new LongAdder()));
}
+
+} // anonymous namespace
+
+void InitSpinLockContentionProfiling() {
+ static GoogleOnceType once = GOOGLE_ONCE_INIT;
+ GoogleOnceInit(&once, DoInit);
+}
+
+
+void RegisterSpinLockContentionMetrics(const scoped_refptr<MetricEntity>& entity) {
+ InitSpinLockContentionProfiling();
+ entity->NeverRetire(
+ METRIC_spinlock_contention_time.InstantiateFunctionGauge(
+ entity, Bind(&GetSpinLockContentionMicros)));
+}
+
+uint64_t GetSpinLockContentionMicros() {
+ int64_t wait_cycles = DCHECK_NOTNULL(g_contended_cycles)->Value();
+ double micros = static_cast<double>(wait_cycles) / base::CyclesPerSecond()
+ * kMicrosPerSecond;
+ return implicit_cast<int64_t>(micros);
+}
+
+void StartSynchronizationProfiling() {
+ InitSpinLockContentionProfiling();
+ base::subtle::Barrier_AtomicIncrement(&g_profiling_enabled, 1);
+}
+
+void FlushSynchronizationProfile(std::ostringstream* out,
+ int64_t* drop_count) {
+ CHECK_NOTNULL(g_contention_stacks)->Flush(out, drop_count);
+}
+
+void StopSynchronizationProfiling() {
+ InitSpinLockContentionProfiling();
+ CHECK_GE(base::subtle::Barrier_AtomicIncrement(&g_profiling_enabled, -1), 0);
+}
+
+} // namespace kudu
+
+// The hook expected by gutil is in the gutil namespace. Simply forward into the
+// kudu namespace so we don't need to qualify everything.
+namespace gutil {
+void SubmitSpinLockProfileData(const void *contendedlock, int64_t wait_cycles) {
+ kudu::SubmitSpinLockProfileData(contendedlock, wait_cycles);
+}
+} // namespace gutil
http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/spinlock_profiling.h
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/spinlock_profiling.h b/be/src/kudu/util/spinlock_profiling.h
new file mode 100644
index 0000000..702eb18
--- /dev/null
+++ b/be/src/kudu/util/spinlock_profiling.h
@@ -0,0 +1,72 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#ifndef KUDU_UTIL_SPINLOCK_PROFILING_H
+#define KUDU_UTIL_SPINLOCK_PROFILING_H
+
+#include <cstdint>
+#include <iosfwd>
+
+#include "kudu/gutil/ref_counted.h"
+
+namespace kudu {
+
+class MetricEntity;
+
+// Initialize the process-wide state used for spinlock contention profiling:
+// the buffer of contention stack samples and the counter of contended cycles.
+//
+// The initialization runs at most once (it is guarded by GoogleOnceInit), so
+// calling this repeatedly is harmless. RegisterSpinLockContentionMetrics(),
+// StartSynchronizationProfiling() and StopSynchronizationProfiling() call it
+// themselves.
+//
+// Referencing this function somewhere reachable in your code also ensures
+// that the spinlock_profiling.cc object file gets linked into your
+// executable. Otherwise the linker could omit the module, including the gutil
+// spinlock profiling hook it defines, from the binary.
+void InitSpinLockContentionProfiling();
+
+// Return the total number of microseconds spent in spinlock contention, as
+// accumulated in the contended-cycle counter since it was created by
+// InitSpinLockContentionProfiling(). That function must have been called
+// first (debug builds DCHECK this).
+uint64_t GetSpinLockContentionMicros();
+
+// Register metrics in the given server entity which measure the amount of
+// spinlock contention. This initializes the profiling state if needed, and
+// the registered gauge is never retired.
+void RegisterSpinLockContentionMetrics(const scoped_refptr<MetricEntity>& entity);
+
+// Enable process-wide synchronization profiling.
+//
+// While profiling is enabled, spinlock contention will be recorded in a buffer.
+// The caller should periodically call FlushSynchronizationProfile() to empty
+// the buffer, or else profiles may be dropped.
+//
+// Each call increments a process-wide enable count and should be balanced by
+// exactly one call to StopSynchronizationProfiling().
+void StartSynchronizationProfiling();
+
+// Flush the current buffer of contention profile samples to the given stream.
+//
+// Each stack trace that has been observed results in at least one line of the
+// following format:
+// <cycles> <trip count> @ <hex stack trace>
+//
+// Flushing the data also clears the current buffer of trace samples.
+// This may be called while synchronization profiling is enabled or after it has
+// been disabled. The profiling state must already have been initialized (via
+// InitSpinLockContentionProfiling() or StartSynchronizationProfiling());
+// otherwise this fails a CHECK.
+//
+// *drop_count will be incremented by the number of samples which were dropped
+// due to the contention buffer overflowing. If profiling is enabled during this
+// call, then the 'drop_count' may be slightly out-of-date with respect to the
+// returned samples.
+void FlushSynchronizationProfile(std::ostringstream* out, int64_t* drop_count);
+
+// Stop collecting contention profiles.
+//
+// This undoes one prior StartSynchronizationProfiling() call. Calling it more
+// times than StartSynchronizationProfiling() fails a CHECK.
+void StopSynchronizationProfiling();
+
+} // namespace kudu
+#endif /* KUDU_UTIL_SPINLOCK_PROFILING_H */
http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/stack_watchdog-test.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/stack_watchdog-test.cc b/be/src/kudu/util/stack_watchdog-test.cc
new file mode 100644
index 0000000..aefe220
--- /dev/null
+++ b/be/src/kudu/util/stack_watchdog-test.cc
@@ -0,0 +1,152 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "kudu/util/kernel_stack_watchdog.h"
+
+#include <ostream>
+#include <string>
+#include <thread>
+#include <vector>
+
+#include <gflags/gflags_declare.h>
+#include <glog/logging.h>
+#include <gtest/gtest.h>
+
+#include "kudu/gutil/dynamic_annotations.h"
+#include "kudu/gutil/port.h"
+#include "kudu/gutil/strings/join.h"
+#include "kudu/gutil/strings/substitute.h"
+#include "kudu/util/monotime.h"
+#include "kudu/util/stopwatch.h"
+#include "kudu/util/test_macros.h"
+#include "kudu/util/test_util.h"
+
+using std::string;
+using std::thread;
+using std::vector;
+using strings::Substitute;
+
+DECLARE_int32(hung_task_check_interval_ms);
+DECLARE_int32(inject_latency_on_kernel_stack_lookup_ms);
+
+namespace kudu {
+
+// Fixture that keeps watchdog output in memory (retrievable through
+// LoggedMessagesForTests()) and checks for hung tasks frequently.
+class StackWatchdogTest : public KuduTest {
+ public:
+  void SetUp() override {
+    KuduTest::SetUp();
+    KernelStackWatchdog::GetInstance()->SaveLogsForTests(true);
+    // Tests assign these flags while the watchdog may presumably be reading
+    // them from its own thread; mark those races as intentional.
+    ANNOTATE_BENIGN_RACE(&FLAGS_hung_task_check_interval_ms, "");
+    ANNOTATE_BENIGN_RACE(&FLAGS_inject_latency_on_kernel_stack_lookup_ms, "");
+    FLAGS_hung_task_check_interval_ms = 10;
+  }
+};
+
+// The KernelStackWatchdog is only enabled on Linux, since we can't get kernel
+// stack traces on other platforms.
+#if defined(__linux__)
+TEST_F(StackWatchdogTest, TestWatchdog) {
+  vector<string> messages;
+  {
+    SCOPED_WATCH_STACK(20);
+    // Poll up to 50 times. Wait for several samples, since it's possible that
+    // we get unlucky and the watchdog sees us just before or after a sleep.
+    int attempts = 0;
+    do {
+      SleepFor(MonoDelta::FromMilliseconds(100));
+      messages = KernelStackWatchdog::GetInstance()->LoggedMessagesForTests();
+    } while (messages.size() <= 5 && ++attempts < 50);
+  }
+  const string joined = JoinStrings(messages, "\n");
+  ASSERT_STR_CONTAINS(joined, "TestWatchdog_Test::TestBody()");
+  ASSERT_STR_CONTAINS(joined, "nanosleep");
+}
+#endif
+
+// Test that SCOPED_WATCH_STACK scopes can be nested.
+//
+// Both watches are held while the inner loop sleeps. The test then checks
+// that the logged messages mention the file:line of each watch.
+TEST_F(StackWatchdogTest, TestNestedScopes) {
+ vector<string> log;
+ int line1;
+ int line2;
+ {
+ // Each watch and the __LINE__ capture share one physical line, so the
+ // recorded number matches the "stack_watchdog-test.cc:<line>" tag asserted
+ // below. Keep each pair on a single line.
+ SCOPED_WATCH_STACK(20); line1 = __LINE__;
+ {
+ SCOPED_WATCH_STACK(20); line2 = __LINE__;
+ // Poll up to 50 x 100ms until more than three messages have been logged.
+ for (int i = 0; i < 50; i++) {
+ SleepFor(MonoDelta::FromMilliseconds(100));
+ log = KernelStackWatchdog::GetInstance()->LoggedMessagesForTests();
+ if (log.size() > 3) {
+ break;
+ }
+ }
+ }
+ }
+
+ // Verify that both nested scopes were collected.
+ string s = JoinStrings(log, "\n");
+ ASSERT_STR_CONTAINS(s, Substitute("stack_watchdog-test.cc:$0", line1));
+ ASSERT_STR_CONTAINS(s, Substitute("stack_watchdog-test.cc:$0", line2));
+}
+
+TEST_F(StackWatchdogTest, TestPerformance) {
+  // The fixture's 10ms check interval would have the watchdog thread burning
+  // CPU throughout the benchmark, so relax it first.
+  FLAGS_hung_task_check_interval_ms = 500;
+  LOG_TIMING(INFO, "1M SCOPED_WATCH_STACK()s") {
+    int remaining = 1000000;
+    while (remaining-- > 0) {
+      SCOPED_WATCH_STACK(100);
+    }
+  }
+}
+
+// Stress test for short-lived threads: the watchdog may try to grab the stack
+// of a thread that has already exited, and that must be handled gracefully.
+//
+// This doubles as a benchmark. Stack grabbing is made deliberately slow, and
+// threads must still be started and joined quickly.
+TEST_F(StackWatchdogTest, TestShortLivedThreadsStress) {
+  // Let the watchdog scan without pausing between checks.
+  FLAGS_hung_task_check_interval_ms = 0;
+  // Slow down each kernel stack lookup, mimicking how expensive real
+  // collection (e.g. symbolization) can be.
+  FLAGS_inject_latency_on_kernel_stack_lookup_ms = 1000;
+
+  MonoTime deadline = MonoTime::Now() + MonoDelta::FromSeconds(5);
+  vector<thread> pool(100);
+  int started = 0;
+  for (; MonoTime::Now() < deadline; ++started) {
+    // Reuse pool slots round-robin, joining the previous occupant first.
+    thread& slot = pool[started % pool.size()];
+    if (slot.joinable()) {
+      slot.join();
+    }
+    slot = thread([&]() {
+      // Sleep 2ms under a 1ms watch so every thread trips the watchdog.
+      SCOPED_WATCH_STACK(1);
+      SleepFor(MonoDelta::FromMilliseconds(2));
+    });
+  }
+  for (thread& leftover : pool) {
+    if (leftover.joinable()) {
+      leftover.join();
+    }
+  }
+  LOG(INFO) << "started and joined " << started << " threads";
+}
+} // namespace kudu
http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/status-test.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/status-test.cc b/be/src/kudu/util/status-test.cc
new file mode 100644
index 0000000..a0aef3d
--- /dev/null
+++ b/be/src/kudu/util/status-test.cc
@@ -0,0 +1,119 @@
+// Some portions Copyright (c) 2011 The LevelDB Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include <cerrno>
+#include <string>
+#include <utility>
+
+#include <gtest/gtest.h>
+
+#include "kudu/util/slice.h"
+#include "kudu/util/status.h"
+#include "kudu/util/test_macros.h"
+
+using std::string;
+
+namespace kudu {
+
+TEST(StatusTest, TestPosixCode) {
+  // An OK status reports posix code 0.
+  ASSERT_EQ(0, Status::OK().posix_code());
+  // An error built with an explicit errno reports that errno back.
+  const Status not_a_dir = Status::IOError("file error", Slice(), ENOTDIR);
+  ASSERT_EQ(ENOTDIR, not_a_dir.posix_code());
+}
+
+TEST(StatusTest, TestToString) {
+  // ToString() renders "<code>: <message> (error <errno>)".
+  const string expected = "IO error: file error (error 20)";
+  ASSERT_EQ(expected, Status::IOError("file error", Slice(), ENOTDIR).ToString());
+}
+
+TEST(StatusTest, TestClonePrepend) {
+  // The prepended text goes ahead of both existing message parts. The code
+  // and errno carry over to the clone.
+  const Status base_error = Status::IOError("file error", "msg2", ENOTDIR);
+  const Status with_heading = base_error.CloneAndPrepend("Heading");
+  ASSERT_EQ(string("IO error: Heading: file error: msg2 (error 20)"),
+            with_heading.ToString());
+}
+
+TEST(StatusTest, TestCloneAppend) {
+  // Append another status's full rendering to a remote error's message.
+  const string cause = Status::NotFound("Unknown tablet").ToString();
+  const Status combined = Status::RemoteError("Application error").CloneAndAppend(cause);
+  ASSERT_EQ(string("Remote error: Application error: Not found: Unknown tablet"),
+            combined.ToString());
+}
+
+TEST(StatusTest, TestMemoryUsage) {
+  // An OK status owns no out-of-line state.
+  ASSERT_EQ(0, Status::OK().memory_footprint_excluding_this());
+  // An error status owns a heap buffer holding its message.
+  const Status with_message = Status::IOError("file error", "some other thing", ENOTDIR);
+  ASSERT_GT(with_message.memory_footprint_excluding_this(), 0);
+}
+
+TEST(StatusTest, TestMoveConstructor) {
+  // Moving from an OK status leaves both sides OK.
+  {
+    Status from = Status::OK();
+    Status to(std::move(from));
+    ASSERT_OK(from); // NOLINT(bugprone-use-after-move)
+    ASSERT_OK(to);
+  }
+
+  // Moving from an error transfers the error and resets the source to OK.
+  {
+    Status from = Status::NotFound("foo");
+    Status to(std::move(from));
+    ASSERT_OK(from); // NOLINT(bugprone-use-after-move)
+    ASSERT_EQ("Not found: foo", to.ToString());
+  }
+}
+
+TEST(StatusTest, TestMoveAssignment) {
+  // OK into an error: the target becomes OK and the source stays OK.
+  {
+    Status from = Status::OK();
+    Status to = Status::NotFound("orig dst");
+    to = std::move(from);
+    ASSERT_OK(from); // NOLINT(bugprone-use-after-move)
+    ASSERT_OK(to);
+  }
+
+  // Error into an error: the target takes the source's error and the source
+  // is reset to OK.
+  {
+    Status from = Status::NotFound("orig src");
+    Status to = Status::NotFound("orig dst");
+    to = std::move(from);
+    ASSERT_OK(from); // NOLINT(bugprone-use-after-move)
+    ASSERT_EQ("Not found: orig src", to.ToString());
+  }
+
+  // Error into OK: same as above, with an initially OK target.
+  {
+    Status from = Status::NotFound("orig src");
+    Status to = Status::OK();
+    to = std::move(from);
+    ASSERT_OK(from); // NOLINT(bugprone-use-after-move)
+    ASSERT_EQ("Not found: orig src", to.ToString());
+  }
+}
+
+TEST(StatusTest, TestAndThen) {
+  auto make_illegal_state = [] { return Status::IllegalState(""); };
+  auto make_invalid_argument = [] { return Status::InvalidArgument(""); };
+
+  // A chain of OK steps yields OK.
+  ASSERT_OK(Status::OK().AndThen(Status::OK).AndThen(Status::OK).AndThen(Status::OK));
+
+  // Once a step has failed, the chain keeps reporting that first error.
+  ASSERT_TRUE(Status::InvalidArgument("").AndThen(make_illegal_state).IsInvalidArgument());
+  ASSERT_TRUE(Status::InvalidArgument("").AndThen(Status::OK).IsInvalidArgument());
+  ASSERT_TRUE(Status::OK().AndThen(make_invalid_argument).AndThen(Status::OK)
+                  .IsInvalidArgument());
+
+  // CloneAndPrepend on an OK status is a no-op, so "baz" never shows up; the
+  // step's own status, message included, is what the chain returns.
+  ASSERT_EQ("foo: bar",
+            Status::OK().CloneAndPrepend("baz").AndThen([] {
+              return Status::InvalidArgument("bar").CloneAndPrepend("foo");
+            }).message());
+}
+
+} // namespace kudu
http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/status.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/status.cc b/be/src/kudu/util/status.cc
new file mode 100644
index 0000000..1197682
--- /dev/null
+++ b/be/src/kudu/util/status.cc
@@ -0,0 +1,170 @@
+// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file. See the AUTHORS file for names of contributors.
+
+#include "kudu/util/status.h"
+
+#include <cstdio>
+#include <cstring>
+
+#include <glog/logging.h>
+
+#include "kudu/gutil/strings/fastmem.h"
+#include "kudu/util/malloc.h"
+
+namespace kudu {
+
+// Returns a freshly new[]-allocated duplicate of a state buffer. The buffer's
+// first four bytes hold the message length; the full buffer is that length
+// plus the 7-byte header written by the Status constructor.
+const char* Status::CopyState(const char* state) {
+  uint32_t msg_len;
+  strings::memcpy_inlined(&msg_len, state, sizeof(msg_len));
+  const uint32_t total_len = msg_len + 7;
+  char* duplicate = new char[total_len];
+  strings::memcpy_inlined(duplicate, state, total_len);
+  return duplicate;
+}
+
+// Builds a non-OK status. The state buffer is laid out as:
+//   [0, 4)   uint32_t length of the message payload
+//   [4]      status code
+//   [5, 7)   int16_t posix code
+//   [7, ...) msg, followed by ": " and msg2 when msg2 is non-empty
+Status::Status(Code code, const Slice& msg, const Slice& msg2,
+               int16_t posix_code) {
+  DCHECK(code != kOk);
+  const uint32_t first_len = msg.size();
+  const uint32_t second_len = msg2.size();
+  uint32_t size = first_len;
+  if (second_len != 0) {
+    size += 2 + second_len;
+  }
+  char* const buf = new char[size + 7];
+  memcpy(buf, &size, sizeof(size));
+  buf[4] = static_cast<char>(code);
+  memcpy(buf + 5, &posix_code, sizeof(posix_code));
+  char* cursor = buf + 7;
+  memcpy(cursor, msg.data(), first_len);
+  if (second_len != 0) {
+    cursor += first_len;
+    *cursor++ = ':';
+    *cursor++ = ' ';
+    memcpy(cursor, msg2.data(), second_len);
+  }
+  state_ = buf;
+}
+
+// Returns the human-readable name of this status's code ("OK" for an OK
+// status).
+std::string Status::CodeAsString() const {
+  if (state_ == nullptr) {
+    return "OK";
+  }
+
+  // Give 'type' a value up front. Without it, a code byte matching none of the
+  // cases below (e.g. corrupted state) would leave 'type' uninitialized, and
+  // reading it would be undefined behavior. There is intentionally no
+  // 'default:' label, so -Wswitch (assuming code() returns the Code enum)
+  // still flags enum values that lack a case here.
+  const char* type = "Unknown error code";
+  switch (code()) {
+    case kOk:
+      type = "OK";
+      break;
+    case kNotFound:
+      type = "Not found";
+      break;
+    case kCorruption:
+      type = "Corruption";
+      break;
+    case kNotSupported:
+      type = "Not implemented";
+      break;
+    case kInvalidArgument:
+      type = "Invalid argument";
+      break;
+    case kIOError:
+      type = "IO error";
+      break;
+    case kAlreadyPresent:
+      type = "Already present";
+      break;
+    case kRuntimeError:
+      type = "Runtime error";
+      break;
+    case kNetworkError:
+      type = "Network error";
+      break;
+    case kIllegalState:
+      type = "Illegal state";
+      break;
+    case kNotAuthorized:
+      type = "Not authorized";
+      break;
+    case kAborted:
+      type = "Aborted";
+      break;
+    case kRemoteError:
+      type = "Remote error";
+      break;
+    case kServiceUnavailable:
+      type = "Service unavailable";
+      break;
+    case kTimedOut:
+      type = "Timed out";
+      break;
+    case kUninitialized:
+      type = "Uninitialized";
+      break;
+    case kConfigurationError:
+      type = "Configuration error";
+      break;
+    case kIncomplete:
+      type = "Incomplete";
+      break;
+    case kEndOfFile:
+      type = "End of file";
+      break;
+  }
+  return std::string(type);
+}
+
+// Renders the status as "<code>" for OK, or "<code>: <message>" followed by
+// " (error <posix>)" when a posix code other than -1 was recorded.
+std::string Status::ToString() const {
+  std::string text = CodeAsString();
+  if (state_ != nullptr) {
+    const Slice msg = message();
+    text += ": ";
+    text.append(reinterpret_cast<const char*>(msg.data()), msg.size());
+    const int16_t errnum = posix_code();
+    if (errnum != -1) {
+      char suffix[64];
+      snprintf(suffix, sizeof(suffix), " (error %d)", errnum);
+      text += suffix;
+    }
+  }
+  return text;
+}
+
+// Returns a view of the message payload stored after the 7-byte header, or an
+// empty Slice for an OK status. The view points into this status's state.
+Slice Status::message() const {
+  if (state_ != nullptr) {
+    uint32_t payload_len;
+    memcpy(&payload_len, state_, sizeof(payload_len));
+    return Slice(state_ + 7, payload_len);
+  }
+  return Slice();
+}
+
+// Returns the posix code stored at bytes [5, 7) of the state, or 0 for an OK
+// status.
+int16_t Status::posix_code() const {
+  int16_t errnum = 0;
+  if (state_ != nullptr) {
+    memcpy(&errnum, state_ + 5, sizeof(errnum));
+  }
+  return errnum;
+}
+
+// Returns a copy whose message is "<msg>: <current message>"; an OK status is
+// returned unchanged.
+Status Status::CloneAndPrepend(const Slice& msg) const {
+  return ok() ? *this : Status(code(), msg, message(), posix_code());
+}
+
+// Returns a copy whose message is "<current message>: <msg>"; an OK status is
+// returned unchanged.
+Status Status::CloneAndAppend(const Slice& msg) const {
+  return ok() ? *this : Status(code(), message(), msg, posix_code());
+}
+
+// Usable size of the heap-allocated state buffer; 0 when there is none.
+size_t Status::memory_footprint_excluding_this() const {
+  if (state_ == nullptr) {
+    return 0;
+  }
+  return kudu_malloc_usable_size(state_);
+}
+
+// Usable size of this object's own allocation plus its state buffer.
+// NOTE(review): this passes 'this' to kudu_malloc_usable_size(), which
+// presumably requires the Status itself to be heap-allocated; confirm that
+// callers never use it on stack or embedded instances.
+size_t Status::memory_footprint_including_this() const {
+  const size_t self_bytes = kudu_malloc_usable_size(this);
+  return self_bytes + memory_footprint_excluding_this();
+}
+} // namespace kudu