You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by ta...@apache.org on 2018/07/13 06:03:40 UTC

[28/51] [abbrv] impala git commit: IMPALA-7006: Add KRPC folders from kudu@334ecafd

http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/cache.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/cache.cc b/be/src/kudu/util/cache.cc
new file mode 100644
index 0000000..00f2e52
--- /dev/null
+++ b/be/src/kudu/util/cache.cc
@@ -0,0 +1,572 @@
+// Some portions copyright (c) 2011 The LevelDB Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "kudu/util/cache.h"
+
+#include <atomic>
+#include <cstdint>
+#include <cstring>
+#include <memory>
+#include <mutex>
+#include <ostream>
+#include <string>
+#include <vector>
+
+#include <gflags/gflags.h>
+#include <glog/logging.h>
+
+#include "kudu/gutil/bits.h"
+#include "kudu/gutil/gscoped_ptr.h"
+#include "kudu/gutil/hash/city.h"
+#include "kudu/gutil/macros.h"
+#include "kudu/gutil/port.h"
+#include "kudu/gutil/ref_counted.h"
+#include "kudu/gutil/stl_util.h"
+#include "kudu/gutil/strings/substitute.h"
+#include "kudu/gutil/sysinfo.h"
+#include "kudu/util/alignment.h"
+#include "kudu/util/cache_metrics.h"
+#include "kudu/util/flag_tags.h"
+#include "kudu/util/locks.h"
+#include "kudu/util/malloc.h"
+#include "kudu/util/mem_tracker.h"
+#include "kudu/util/metrics.h"
+#include "kudu/util/slice.h"
+#include "kudu/util/test_util_prod.h"
+
+#if !defined(__APPLE__)
+#include "kudu/util/nvm_cache.h"
+#endif
+
+// Useful in tests that require accurate cache capacity accounting.
+DEFINE_bool(cache_force_single_shard, false,
+            "Override all cache implementations to use just one shard");
+TAG_FLAG(cache_force_single_shard, hidden);
+
+DEFINE_double(cache_memtracker_approximation_ratio, 0.01,
+              "The MemTracker associated with a cache can accumulate error up to "
+              "this ratio to improve performance. For tests.");
+TAG_FLAG(cache_memtracker_approximation_ratio, hidden);
+
+using std::atomic;
+using std::shared_ptr;
+using std::string;
+using std::vector;
+
+namespace kudu {
+
+Cache::~Cache() {
+}
+
+namespace {
+
+typedef simple_spinlock MutexType;
+
+// LRU cache implementation
+
+// An entry is a variable length heap-allocated structure.  Entries
+// are kept in a circular doubly linked list ordered by access time.
+struct LRUHandle {
+  Cache::EvictionCallback* eviction_callback;
+  LRUHandle* next_hash;
+  LRUHandle* next;
+  LRUHandle* prev;
+  size_t charge;      // TODO(opt): Only allow uint32_t?
+  uint32_t key_length;
+  uint32_t val_length;
+  std::atomic<int32_t> refs;
+  uint32_t hash;      // Hash of key(); used for fast sharding and comparisons
+
+  // The storage for the key/value pair itself. The data is stored as:
+  //   [key bytes ...] [padding up to 8-byte boundary] [value bytes ...]
+  uint8_t kv_data[1];   // Beginning of key/value pair
+
+  Slice key() const {
+    return Slice(kv_data, key_length);
+  }
+
+  uint8_t* mutable_val_ptr() {
+    int val_offset = KUDU_ALIGN_UP(key_length, sizeof(void*));
+    return &kv_data[val_offset];
+  }
+
+  const uint8_t* val_ptr() const {
+    return const_cast<LRUHandle*>(this)->mutable_val_ptr();
+  }
+
+  Slice value() const {
+    return Slice(val_ptr(), val_length);
+  }
+};
+
+// We provide our own simple hash table since it removes a whole bunch
+// of porting hacks and is also faster than some of the built-in hash
+// table implementations in some of the compiler/runtime combinations
+// we have tested.  E.g., readrandom speeds up by ~5% over the g++
+// 4.4.3's builtin hashtable.
+class HandleTable {
+ public:
+  HandleTable() : length_(0), elems_(0), list_(nullptr) { Resize(); }
+  ~HandleTable() { delete[] list_; }
+
+  LRUHandle* Lookup(const Slice& key, uint32_t hash) {
+    return *FindPointer(key, hash);
+  }
+
+  LRUHandle* Insert(LRUHandle* h) {
+    LRUHandle** ptr = FindPointer(h->key(), h->hash);
+    LRUHandle* old = *ptr;
+    h->next_hash = (old == nullptr ? nullptr : old->next_hash);
+    *ptr = h;
+    if (old == nullptr) {
+      ++elems_;
+      if (elems_ > length_) {
+        // Since each cache entry is fairly large, we aim for a small
+        // average linked list length (<= 1).
+        Resize();
+      }
+    }
+    return old;
+  }
+
+  LRUHandle* Remove(const Slice& key, uint32_t hash) {
+    LRUHandle** ptr = FindPointer(key, hash);
+    LRUHandle* result = *ptr;
+    if (result != nullptr) {
+      *ptr = result->next_hash;
+      --elems_;
+    }
+    return result;
+  }
+
+ private:
+  // The table consists of an array of buckets where each bucket is
+  // a linked list of cache entries that hash into the bucket.
+  uint32_t length_;
+  uint32_t elems_;
+  LRUHandle** list_;
+
+  // Return a pointer to slot that points to a cache entry that
+  // matches key/hash.  If there is no such cache entry, return a
+  // pointer to the trailing slot in the corresponding linked list.
+  LRUHandle** FindPointer(const Slice& key, uint32_t hash) {
+    LRUHandle** ptr = &list_[hash & (length_ - 1)];
+    while (*ptr != nullptr &&
+           ((*ptr)->hash != hash || key != (*ptr)->key())) {
+      ptr = &(*ptr)->next_hash;
+    }
+    return ptr;
+  }
+
+  void Resize() {
+    uint32_t new_length = 16;
+    while (new_length < elems_ * 1.5) {
+      new_length *= 2;
+    }
+    auto new_list = new LRUHandle*[new_length];
+    memset(new_list, 0, sizeof(new_list[0]) * new_length);
+    uint32_t count = 0;
+    for (uint32_t i = 0; i < length_; i++) {
+      LRUHandle* h = list_[i];
+      while (h != nullptr) {
+        LRUHandle* next = h->next_hash;
+        uint32_t hash = h->hash;
+        LRUHandle** ptr = &new_list[hash & (new_length - 1)];
+        h->next_hash = *ptr;
+        *ptr = h;
+        h = next;
+        count++;
+      }
+    }
+    DCHECK_EQ(elems_, count);
+    delete[] list_;
+    list_ = new_list;
+    length_ = new_length;
+  }
+};
+
+// A single shard of sharded cache.
+class LRUCache {
+ public:
+  explicit LRUCache(MemTracker* tracker);
+  ~LRUCache();
+
+  // Separate from constructor so caller can easily make an array of LRUCache
+  void SetCapacity(size_t capacity) {
+    capacity_ = capacity;
+    max_deferred_consumption_ = capacity * FLAGS_cache_memtracker_approximation_ratio;
+  }
+
+  void SetMetrics(CacheMetrics* metrics) { metrics_ = metrics; }
+
+  Cache::Handle* Insert(LRUHandle* handle, Cache::EvictionCallback* eviction_callback);
+  // Like Cache::Lookup, but with an extra "hash" parameter.
+  Cache::Handle* Lookup(const Slice& key, uint32_t hash, bool caching);
+  void Release(Cache::Handle* handle);
+  void Erase(const Slice& key, uint32_t hash);
+
+ private:
+  void LRU_Remove(LRUHandle* e);
+  void LRU_Append(LRUHandle* e);
+  // Just reduce the reference count by 1.
+  // Return true if last reference
+  bool Unref(LRUHandle* e);
+  // Call the user's eviction callback, if it exists, and free the entry.
+  void FreeEntry(LRUHandle* e);
+
+  // Update the memtracker's consumption by the given amount.
+  //
+  // This "buffers" the updates locally in 'deferred_consumption_' until the amount
+  // of accumulated delta is more than ~1% of the cache capacity. This improves
+  // performance under workloads with high eviction rates for a few reasons:
+  //
+  // 1) once the cache reaches its full capacity, we expect it to remain there
+  // in steady state. Each insertion is usually matched by an eviction, and unless
+  // the total size of the evicted item(s) is much different than the size of the
+  // inserted item, each eviction event is unlikely to change the total cache usage
+  // much. So, we expect that the accumulated error will mostly remain around 0
+  // and we can avoid propagating changes to the MemTracker at all.
+  //
+  // 2) because the cache implementation is sharded, we do this tracking in a bunch
+  // of different locations, avoiding bouncing cache-lines between cores. By contrast
+  // the MemTracker is a simple integer, so it doesn't scale as well under concurrency.
+  //
+  // Positive delta indicates an increased memory consumption.
+  void UpdateMemTracker(int64_t delta);
+
+  // Initialized before use.
+  size_t capacity_;
+
+  // mutex_ protects the following state.
+  MutexType mutex_;
+  size_t usage_;
+
+  // Dummy head of LRU list.
+  // lru.prev is newest entry, lru.next is oldest entry.
+  LRUHandle lru_;
+
+  HandleTable table_;
+
+  MemTracker* mem_tracker_;
+  atomic<int64_t> deferred_consumption_ { 0 };
+
+  // Initialized based on capacity_ to ensure an upper bound on the error on the
+  // MemTracker consumption.
+  int64_t max_deferred_consumption_;
+
+  CacheMetrics* metrics_;
+};
+
+LRUCache::LRUCache(MemTracker* tracker)
+ : usage_(0),
+   mem_tracker_(tracker),
+   metrics_(nullptr) {
+  // Make empty circular linked list
+  lru_.next = &lru_;
+  lru_.prev = &lru_;
+}
+
+LRUCache::~LRUCache() {
+  for (LRUHandle* e = lru_.next; e != &lru_; ) {
+    LRUHandle* next = e->next;
+    DCHECK_EQ(e->refs.load(std::memory_order_relaxed), 1)
+        << "caller has an unreleased handle";
+    if (Unref(e)) {
+      FreeEntry(e);
+    }
+    e = next;
+  }
+  mem_tracker_->Consume(deferred_consumption_);
+}
+
+bool LRUCache::Unref(LRUHandle* e) {
+  DCHECK_GT(e->refs.load(std::memory_order_relaxed), 0);
+  return e->refs.fetch_sub(1) == 1;
+}
+
+void LRUCache::FreeEntry(LRUHandle* e) {
+  DCHECK_EQ(e->refs.load(std::memory_order_relaxed), 0);
+  if (e->eviction_callback) {
+    e->eviction_callback->EvictedEntry(e->key(), e->value());
+  }
+  UpdateMemTracker(-static_cast<int64_t>(e->charge));
+  if (PREDICT_TRUE(metrics_)) {
+    metrics_->cache_usage->DecrementBy(e->charge);
+    metrics_->evictions->Increment();
+  }
+  delete [] e;
+}
+
+void LRUCache::UpdateMemTracker(int64_t delta) {
+  int64_t old_deferred = deferred_consumption_.fetch_add(delta);
+  int64_t new_deferred = old_deferred + delta;
+
+  if (new_deferred > max_deferred_consumption_ ||
+      new_deferred < -max_deferred_consumption_) {
+    int64_t to_propagate = deferred_consumption_.exchange(0, std::memory_order_relaxed);
+    mem_tracker_->Consume(to_propagate);
+  }
+}
+
+void LRUCache::LRU_Remove(LRUHandle* e) {
+  e->next->prev = e->prev;
+  e->prev->next = e->next;
+  usage_ -= e->charge;
+}
+
+void LRUCache::LRU_Append(LRUHandle* e) {
+  // Make "e" newest entry by inserting just before lru_
+  e->next = &lru_;
+  e->prev = lru_.prev;
+  e->prev->next = e;
+  e->next->prev = e;
+  usage_ += e->charge;
+}
+
+Cache::Handle* LRUCache::Lookup(const Slice& key, uint32_t hash, bool caching) {
+  LRUHandle* e;
+  {
+    std::lock_guard<MutexType> l(mutex_);
+    e = table_.Lookup(key, hash);
+    if (e != nullptr) {
+      e->refs.fetch_add(1, std::memory_order_relaxed);
+      LRU_Remove(e);
+      LRU_Append(e);
+    }
+  }
+
+  // Do the metrics outside of the lock.
+  if (metrics_) {
+    metrics_->lookups->Increment();
+    bool was_hit = (e != nullptr);
+    if (was_hit) {
+      if (caching) {
+        metrics_->cache_hits_caching->Increment();
+      } else {
+        metrics_->cache_hits->Increment();
+      }
+    } else {
+      if (caching) {
+        metrics_->cache_misses_caching->Increment();
+      } else {
+        metrics_->cache_misses->Increment();
+      }
+    }
+  }
+
+  return reinterpret_cast<Cache::Handle*>(e);
+}
+
+void LRUCache::Release(Cache::Handle* handle) {
+  LRUHandle* e = reinterpret_cast<LRUHandle*>(handle);
+  bool last_reference = Unref(e);
+  if (last_reference) {
+    FreeEntry(e);
+  }
+}
+
+Cache::Handle* LRUCache::Insert(LRUHandle* e, Cache::EvictionCallback *eviction_callback) {
+
+  // Set the remaining LRUHandle members which were not already initialized during
+  // Allocate().
+  e->eviction_callback = eviction_callback;
+  e->refs.store(2, std::memory_order_relaxed);  // One from LRUCache, one for the returned handle
+  UpdateMemTracker(e->charge);
+  if (PREDICT_TRUE(metrics_)) {
+    metrics_->cache_usage->IncrementBy(e->charge);
+    metrics_->inserts->Increment();
+  }
+
+  LRUHandle* to_remove_head = nullptr;
+  {
+    std::lock_guard<MutexType> l(mutex_);
+
+    LRU_Append(e);
+
+    LRUHandle* old = table_.Insert(e);
+    if (old != nullptr) {
+      LRU_Remove(old);
+      if (Unref(old)) {
+        old->next = to_remove_head;
+        to_remove_head = old;
+      }
+    }
+
+    while (usage_ > capacity_ && lru_.next != &lru_) {
+      LRUHandle* old = lru_.next;
+      LRU_Remove(old);
+      table_.Remove(old->key(), old->hash);
+      if (Unref(old)) {
+        old->next = to_remove_head;
+        to_remove_head = old;
+      }
+    }
+  }
+
+  // we free the entries here outside of mutex for
+  // performance reasons
+  while (to_remove_head != nullptr) {
+    LRUHandle* next = to_remove_head->next;
+    FreeEntry(to_remove_head);
+    to_remove_head = next;
+  }
+
+  return reinterpret_cast<Cache::Handle*>(e);
+}
+
+void LRUCache::Erase(const Slice& key, uint32_t hash) {
+  LRUHandle* e;
+  bool last_reference = false;
+  {
+    std::lock_guard<MutexType> l(mutex_);
+    e = table_.Remove(key, hash);
+    if (e != nullptr) {
+      LRU_Remove(e);
+      last_reference = Unref(e);
+    }
+  }
+  // mutex not held here
+  // last_reference will only be true if e != NULL
+  if (last_reference) {
+    FreeEntry(e);
+  }
+}
+
+// Determine the number of bits of the hash that should be used to determine
+// the cache shard. This, in turn, determines the number of shards.
+int DetermineShardBits() {
+  int bits = PREDICT_FALSE(FLAGS_cache_force_single_shard) ?
+      0 : Bits::Log2Ceiling(base::NumCPUs());
+  VLOG(1) << "Will use " << (1 << bits) << " shards for LRU cache.";
+  return bits;
+}
+
+class ShardedLRUCache : public Cache {
+ private:
+  shared_ptr<MemTracker> mem_tracker_;
+  gscoped_ptr<CacheMetrics> metrics_;
+  vector<LRUCache*> shards_;
+
+  // Number of bits of hash used to determine the shard.
+  const int shard_bits_;
+
+  // Protects 'metrics_'. Used only when metrics are set, to ensure
+  // that they are set only once in test environments.
+  MutexType metrics_lock_;
+
+  static inline uint32_t HashSlice(const Slice& s) {
+    return util_hash::CityHash64(
+      reinterpret_cast<const char *>(s.data()), s.size());
+  }
+
+  uint32_t Shard(uint32_t hash) {
+    // Widen to uint64 before shifting, or else on a single CPU,
+    // we would try to shift a uint32_t by 32 bits, which is undefined.
+    return static_cast<uint64_t>(hash) >> (32 - shard_bits_);
+  }
+
+ public:
+  explicit ShardedLRUCache(size_t capacity, const string& id)
+      : shard_bits_(DetermineShardBits()) {
+    // A cache is often a singleton, so:
+    // 1. We reuse its MemTracker if one already exists, and
+    // 2. It is directly parented to the root MemTracker.
+    mem_tracker_ = MemTracker::FindOrCreateGlobalTracker(
+        -1, strings::Substitute("$0-sharded_lru_cache", id));
+
+    int num_shards = 1 << shard_bits_;
+    const size_t per_shard = (capacity + (num_shards - 1)) / num_shards;
+    for (int s = 0; s < num_shards; s++) {
+      gscoped_ptr<LRUCache> shard(new LRUCache(mem_tracker_.get()));
+      shard->SetCapacity(per_shard);
+      shards_.push_back(shard.release());
+    }
+  }
+
+  virtual ~ShardedLRUCache() {
+    STLDeleteElements(&shards_);
+  }
+
+  virtual Handle* Insert(PendingHandle* handle,
+                         Cache::EvictionCallback* eviction_callback) OVERRIDE {
+    LRUHandle* h = reinterpret_cast<LRUHandle*>(DCHECK_NOTNULL(handle));
+    return shards_[Shard(h->hash)]->Insert(h, eviction_callback);
+  }
+  virtual Handle* Lookup(const Slice& key, CacheBehavior caching) OVERRIDE {
+    const uint32_t hash = HashSlice(key);
+    return shards_[Shard(hash)]->Lookup(key, hash, caching == EXPECT_IN_CACHE);
+  }
+  virtual void Release(Handle* handle) OVERRIDE {
+    LRUHandle* h = reinterpret_cast<LRUHandle*>(handle);
+    shards_[Shard(h->hash)]->Release(handle);
+  }
+  virtual void Erase(const Slice& key) OVERRIDE {
+    const uint32_t hash = HashSlice(key);
+    shards_[Shard(hash)]->Erase(key, hash);
+  }
+  virtual Slice Value(Handle* handle) OVERRIDE {
+    return reinterpret_cast<LRUHandle*>(handle)->value();
+  }
+  virtual void SetMetrics(const scoped_refptr<MetricEntity>& entity) OVERRIDE {
+    // TODO(KUDU-2165): reuse of the Cache singleton across multiple MiniCluster servers
+    // causes TSAN errors. So, we'll ensure that metrics only get attached once, from
+    // whichever server starts first. This has the downside that, in test builds, we won't
+    // get accurate cache metrics, but that's probably better than spurious failures.
+    std::lock_guard<simple_spinlock> l(metrics_lock_);
+    if (metrics_) {
+      CHECK(IsGTest()) << "Metrics should only be set once per Cache singleton";
+      return;
+    }
+    metrics_.reset(new CacheMetrics(entity));
+    for (LRUCache* cache : shards_) {
+      cache->SetMetrics(metrics_.get());
+    }
+  }
+
+  virtual PendingHandle* Allocate(Slice key, int val_len, int charge) OVERRIDE {
+    int key_len = key.size();
+    DCHECK_GE(key_len, 0);
+    DCHECK_GE(val_len, 0);
+    int key_len_padded = KUDU_ALIGN_UP(key_len, sizeof(void*));
+    uint8_t* buf = new uint8_t[sizeof(LRUHandle)
+                               + key_len_padded + val_len // the kv_data VLA data
+                               - 1 // (the VLA has a 1-byte placeholder)
+                               ];
+    LRUHandle* handle = reinterpret_cast<LRUHandle*>(buf);
+    handle->key_length = key_len;
+    handle->val_length = val_len;
+    handle->charge = (charge == kAutomaticCharge) ? kudu_malloc_usable_size(buf) : charge;
+    handle->hash = HashSlice(key);
+    memcpy(handle->kv_data, key.data(), key_len);
+
+    return reinterpret_cast<PendingHandle*>(handle);
+  }
+
+  virtual void Free(PendingHandle* h) OVERRIDE {
+    uint8_t* data = reinterpret_cast<uint8_t*>(h);
+    delete [] data;
+  }
+
+  virtual uint8_t* MutableValue(PendingHandle* h) OVERRIDE {
+    return reinterpret_cast<LRUHandle*>(h)->mutable_val_ptr();
+  }
+
+};
+
+}  // end anonymous namespace
+
+Cache* NewLRUCache(CacheType type, size_t capacity, const string& id) {
+  switch (type) {
+    case DRAM_CACHE:
+      return new ShardedLRUCache(capacity, id);
+#if defined(HAVE_LIB_VMEM)
+    case NVM_CACHE:
+      return NewLRUNvmCache(capacity, id);
+#endif
+    default:
+      LOG(FATAL) << "Unsupported LRU cache type: " << type;
+  }
+}
+
+}  // namespace kudu

http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/cache.h
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/cache.h b/be/src/kudu/util/cache.h
new file mode 100644
index 0000000..82ef8c9
--- /dev/null
+++ b/be/src/kudu/util/cache.h
@@ -0,0 +1,216 @@
+// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+//
+// A Cache is an interface that maps keys to values.  It has internal
+// synchronization and may be safely accessed concurrently from
+// multiple threads.  It may automatically evict entries to make room
+// for new entries.  Values have a specified charge against the cache
+// capacity.  For example, a cache where the values are variable
+// length strings, may use the length of the string as the charge for
+// the string.
+//
+// This is taken from LevelDB and evolved to fit the kudu codebase.
+//
+// TODO: this is pretty lock-heavy. Would be good to sub out something
+// a little more concurrent.
+
+#ifndef KUDU_UTIL_CACHE_H_
+#define KUDU_UTIL_CACHE_H_
+
+#include <cstddef>
+#include <cstdint>
+#include <memory>
+#include <string>
+
+#include "kudu/gutil/macros.h"
+#include "kudu/gutil/ref_counted.h"
+#include "kudu/util/slice.h"
+
+namespace kudu {
+
+class Cache;
+class MetricEntity;
+
+enum CacheType {
+  DRAM_CACHE,
+  NVM_CACHE
+};
+
+// Create a new cache with a fixed size capacity.  This implementation
+// of Cache uses a least-recently-used eviction policy.
+Cache* NewLRUCache(CacheType type, size_t capacity, const std::string& id);
+
+class Cache {
+ public:
+  // Callback interface which is called when an entry is evicted from the
+  // cache.
+  class EvictionCallback {
+   public:
+    virtual void EvictedEntry(Slice key, Slice value) = 0;
+    virtual ~EvictionCallback() {}
+  };
+
+  Cache() { }
+
+  // Destroys all existing entries, invoking the eviction callback
+  // (if any) that was passed to Insert() for each entry.
+  virtual ~Cache();
+
+  // Opaque handle to an entry stored in the cache.
+  struct Handle { };
+
+  // Custom handle "deleter", primarily intended for use with std::unique_ptr.
+  //
+  // Sample usage:
+  //
+  //   Cache* cache = NewLRUCache(...);
+  //   ...
+  //   {
+  //     unique_ptr<Cache::Handle, Cache::HandleDeleter> h(
+  //       cache->Lookup(...), Cache::HandleDeleter(cache));
+  //     ...
+  //   } // 'h' is automatically released here
+  //
+  // Or:
+  //
+  //   Cache* cache = NewLRUCache(...);
+  //   ...
+  //   {
+  //     Cache::UniqueHandle h(cache->Lookup(...), Cache::HandleDeleter(cache));
+  //     ...
+  //   } // 'h' is automatically released here
+  //
+  class HandleDeleter {
+   public:
+    explicit HandleDeleter(Cache* c)
+        : c_(c) {
+    }
+
+    void operator()(Cache::Handle* h) const {
+      c_->Release(h);
+    }
+
+   private:
+    Cache* c_;
+  };
+  typedef std::unique_ptr<Handle, HandleDeleter> UniqueHandle;
+
+  // Passing EXPECT_IN_CACHE will increment the hit/miss metrics that track the number of times
+  // blocks were requested that the users were hoping to get the block from the cache, along
+  // with the basic metrics.
+  // Passing NO_EXPECT_IN_CACHE will only increment the basic metrics.
+  // This helps in determining if we are effectively caching the blocks that matter the most.
+  enum CacheBehavior {
+    EXPECT_IN_CACHE,
+    NO_EXPECT_IN_CACHE
+  };
+
+  // If the cache has no mapping for "key", returns NULL.
+  //
+  // Else return a handle that corresponds to the mapping.  The caller
+  // must call this->Release(handle) when the returned mapping is no
+  // longer needed.
+  virtual Handle* Lookup(const Slice& key, CacheBehavior caching) = 0;
+
+  // Release a mapping returned by a previous Lookup().
+  // REQUIRES: handle must not have been released yet.
+  // REQUIRES: handle must have been returned by a method on *this.
+  virtual void Release(Handle* handle) = 0;
+
+  // Return the value encapsulated in a handle returned by a
+  // successful Lookup().
+  // REQUIRES: handle must not have been released yet.
+  // REQUIRES: handle must have been returned by a method on *this.
+  virtual Slice Value(Handle* handle) = 0;
+
+  // If the cache contains entry for key, erase it.  Note that the
+  // underlying entry will be kept around until all existing handles
+  // to it have been released.
+  virtual void Erase(const Slice& key) = 0;
+
+  // Pass a metric entity in order to start recording metrics.
+  virtual void SetMetrics(const scoped_refptr<MetricEntity>& metric_entity) = 0;
+
+  // ------------------------------------------------------------
+  // Insertion path
+  // ------------------------------------------------------------
+  //
+  // Because some cache implementations (eg NVM) manage their own memory, and because we'd
+  // like to read blocks directly into cache-managed memory rather than causing an extra
+  // memcpy, the insertion of a new element into the cache requires two phases. First, a
+  // PendingHandle is allocated with space for the value, and then it is later inserted.
+  //
+  // For example:
+  //
+  //   PendingHandle* ph = cache_->Allocate("my entry", value_size, charge);
+  //   if (!ReadDataFromDisk(cache_->MutableValue(ph)).ok()) {
+  //     cache_->Free(ph);
+  //     ... error handling ...
+  //     return;
+  //   }
+  //   Handle* h = cache_->Insert(ph, my_eviction_callback);
+  //   ...
+  //   cache_->Release(h);
+
+  // Opaque handle to an entry which is being prepared to be added to
+  // the cache.
+  struct PendingHandle { };
+
+  // Indicates that the charge of an item in the cache should be calculated
+  // based on its memory consumption.
+  static constexpr int kAutomaticCharge = -1;
+
+  // Allocate space for a new entry to be inserted into the cache.
+  //
+  // The provided 'key' is copied into the resulting handle object.
+  // The allocated handle has enough space such that the value can
+  // be written into cache_->MutableValue(handle).
+  //
+  // If 'charge' is not 'kAutomaticCharge', then the cache capacity will be charged
+  // the explicit amount. This is useful when caching items that are small but need to
+  // maintain a bounded count (eg file descriptors) rather than caring about their actual
+  // memory usage.
+  //
+  // Note that this does not mutate the cache itself: lookups will
+  // not be able to find the provided key until it is inserted.
+  //
+  // It is possible that this will return NULL if the cache is above its capacity
+  // and eviction fails to free up enough space for the requested allocation.
+  //
+  // NOTE: the returned memory is not automatically freed by the cache: the
+  // caller must either free it using Free(), or insert it using Insert().
+  virtual PendingHandle* Allocate(Slice key, int val_len, int charge) = 0;
+
+  // Default 'charge' should be kAutomaticCharge.
+  // (default arguments on virtual functions are prohibited)
+  PendingHandle* Allocate(Slice key, int val_len) {
+    return Allocate(key, val_len, kAutomaticCharge);
+  }
+
+  virtual uint8_t* MutableValue(PendingHandle* handle) = 0;
+
+  // Commit a prepared entry into the cache.
+  //
+  // Returns a handle that corresponds to the mapping.  The caller
+  // must call this->Release(handle) when the returned mapping is no
+  // longer needed. This method always succeeds and returns a non-null
+  // entry, since the space was reserved above.
+  //
+  // The 'pending' entry passed here should have been allocated using
+  // Cache::Allocate() above.
+  //
+  // If 'eviction_callback' is non-NULL, then it will be called when the
+  // entry is later evicted or when the cache shuts down.
+  virtual Handle* Insert(PendingHandle* pending, EvictionCallback* eviction_callback) = 0;
+
+  // Free 'ptr', which must have been previously allocated using 'Allocate'.
+  virtual void Free(PendingHandle* ptr) = 0;
+
+ private:
+  DISALLOW_COPY_AND_ASSIGN(Cache);
+};
+
+}  // namespace kudu
+
+#endif

http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/cache_metrics.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/cache_metrics.cc b/be/src/kudu/util/cache_metrics.cc
new file mode 100644
index 0000000..ac2fadf
--- /dev/null
+++ b/be/src/kudu/util/cache_metrics.cc
@@ -0,0 +1,69 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "kudu/util/cache_metrics.h"
+
+#include "kudu/util/metrics.h"
+
+METRIC_DEFINE_counter(server, block_cache_inserts,
+                      "Block Cache Inserts", kudu::MetricUnit::kBlocks,
+                      "Number of blocks inserted in the cache");
+METRIC_DEFINE_counter(server, block_cache_lookups,
+                      "Block Cache Lookups", kudu::MetricUnit::kBlocks,
+                      "Number of blocks looked up from the cache");
+METRIC_DEFINE_counter(server, block_cache_evictions,
+                      "Block Cache Evictions", kudu::MetricUnit::kBlocks,
+                      "Number of blocks evicted from the cache");
+METRIC_DEFINE_counter(server, block_cache_misses,
+                      "Block Cache Misses", kudu::MetricUnit::kBlocks,
+                      "Number of lookups that didn't yield a block");
+METRIC_DEFINE_counter(server, block_cache_misses_caching,
+                      "Block Cache Misses (Caching)", kudu::MetricUnit::kBlocks,
+                      "Number of lookups that were expecting a block that didn't yield one. "
+                      "Use this number instead of cache_misses when trying to determine how "
+                      "efficient the cache is");
+METRIC_DEFINE_counter(server, block_cache_hits,
+                      "Block Cache Hits", kudu::MetricUnit::kBlocks,
+                      "Number of lookups that found a block");
+METRIC_DEFINE_counter(server, block_cache_hits_caching,
+                      "Block Cache Hits (Caching)", kudu::MetricUnit::kBlocks,
+                      "Number of lookups that were expecting a block that found one. "
+                      "Use this number instead of cache_hits when trying to determine how "
+                      "efficient the cache is");
+
+METRIC_DEFINE_gauge_uint64(server, block_cache_usage, "Block Cache Memory Usage",
+                           kudu::MetricUnit::kBytes,
+                           "Memory consumed by the block cache");
+
+namespace kudu {
+
+#define MINIT(member, x) member(METRIC_##x.Instantiate(entity))
+#define GINIT(member, x) member(METRIC_##x.Instantiate(entity, 0))
+CacheMetrics::CacheMetrics(const scoped_refptr<MetricEntity>& entity)
+  : MINIT(inserts, block_cache_inserts),
+    MINIT(lookups, block_cache_lookups),
+    MINIT(evictions, block_cache_evictions),
+    MINIT(cache_hits, block_cache_hits),
+    MINIT(cache_hits_caching, block_cache_hits_caching),
+    MINIT(cache_misses, block_cache_misses),
+    MINIT(cache_misses_caching, block_cache_misses_caching),
+    GINIT(cache_usage, block_cache_usage) {
+}
+#undef MINIT
+#undef GINIT
+
+} // namespace kudu

http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/cache_metrics.h
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/cache_metrics.h b/be/src/kudu/util/cache_metrics.h
new file mode 100644
index 0000000..04a546b
--- /dev/null
+++ b/be/src/kudu/util/cache_metrics.h
@@ -0,0 +1,42 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#ifndef KUDU_UTIL_CACHE_METRICS_H
+#define KUDU_UTIL_CACHE_METRICS_H
+
+#include <cstdint>
+
+#include "kudu/gutil/ref_counted.h"
+#include "kudu/util/metrics.h"
+
+namespace kudu {
+
+struct CacheMetrics {
+  explicit CacheMetrics(const scoped_refptr<MetricEntity>& metric_entity);
+
+  scoped_refptr<Counter> inserts;
+  scoped_refptr<Counter> lookups;
+  scoped_refptr<Counter> evictions;
+  scoped_refptr<Counter> cache_hits;
+  scoped_refptr<Counter> cache_hits_caching;
+  scoped_refptr<Counter> cache_misses;
+  scoped_refptr<Counter> cache_misses_caching;
+
+  scoped_refptr<AtomicGauge<uint64_t> > cache_usage;
+};
+
+} // namespace kudu
+#endif /* KUDU_UTIL_CACHE_METRICS_H */

http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/callback_bind-test.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/callback_bind-test.cc b/be/src/kudu/util/callback_bind-test.cc
new file mode 100644
index 0000000..392f496
--- /dev/null
+++ b/be/src/kudu/util/callback_bind-test.cc
@@ -0,0 +1,119 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <ostream>
+#include <string>
+#include <type_traits>
+#include <utility>
+
+#include <glog/logging.h>
+#include <gtest/gtest.h>
+
+#include "kudu/gutil/bind.h"
+#include "kudu/gutil/bind_helpers.h"
+#include "kudu/gutil/callback.h"
+#include "kudu/gutil/gscoped_ptr.h"
+#include "kudu/gutil/macros.h"
+#include "kudu/gutil/ref_counted.h"
+
+namespace kudu {
+
+using std::string;
+
+// Free function used as a bind target in the tests below; always yields 5.
+static int Return5() {
+  const int kFixedResult = 5;
+  return kFixedResult;
+}
+
+// Binding a plain free function yields a callback returning its result.
+TEST(CallbackBindTest, TestFreeFunction) {
+  Callback<int(void)> bound = Bind(&Return5);
+  const int result = bound.Run();
+  ASSERT_EQ(5, result);
+}
+
+// Thread-safe ref-counted type whose only purpose is to provide a member
+// function that can be bound through a scoped_refptr.
+class Ref : public RefCountedThreadSafe<Ref> {
+ public:
+  // Returns a fixed, recognizable value.
+  int Foo() {
+    return 3;
+  }
+};
+
+// Minimal AddRef()/Release() implementation with a publicly readable count,
+// so tests can observe the references taken by a bound callback.
+// Not thread-safe.
+struct RefCountable {
+  RefCountable() : refs(0) {}
+
+  void AddRef() const { ++refs; }
+  void Release() const { --refs; }
+
+  // Logs the current reference count.
+  void Print() const {
+    LOG(INFO) << "Hello. Refs: " << refs;
+  }
+
+  // Mutable so that the const AddRef()/Release() above can update it.
+  mutable int refs;
+  DISALLOW_COPY_AND_ASSIGN(RefCountable);
+};
+
+// A bound scoped_refptr receiver stays usable through the callback after the
+// test drops its own reference.
+TEST(CallbackBindTest, TestClassMethod) {
+  scoped_refptr<Ref> receiver = new Ref();
+  Callback<int(void)> method_cb = Bind(&Ref::Foo, receiver);
+
+  // Drop the local reference before running the callback.
+  receiver = nullptr;
+
+  const int result = method_cb.Run();
+  ASSERT_EQ(3, result);
+}
+
+// Echoes back 'i'. 'str' is ignored; it exists only so that the function has
+// a second parameter left unbound by the partial-bind test.
+int ReturnI(int i, const char* str) {
+  const int echoed = i;
+  return echoed;
+}
+
+// Binding only the first argument leaves the second to be supplied at Run().
+TEST(CallbackBindTest, TestPartialBind) {
+  Callback<int(const char*)> partial = Bind(&ReturnI, 23);
+  const int result = partial.Run("hello world");
+  ASSERT_EQ(23, result);
+}
+
+// Takes ownership of 'in' and returns the character after the one it holds.
+char IncrementChar(gscoped_ptr<char> in) {
+  const char held = *in;
+  return held + 1;
+}
+
+TEST(CallbackBindTest, TestCallScopedPtrArg) {
+  // An unbound gscoped_ptr parameter is supplied at Run() time exactly as for
+  // any other function taking a gscoped_ptr: by moving the pointer in.
+  Callback<char(gscoped_ptr<char>)> increment = Bind(&IncrementChar);
+  gscoped_ptr<char> letter(new char('x'));
+  ASSERT_EQ('y', increment.Run(std::move(letter)));
+}
+
+TEST(CallbackBindTest, TestBindScopedPtrArg) {
+  // A gscoped_ptr argument can only be bound ahead of time via Passed().
+  gscoped_ptr<char> letter(new char('x'));
+  Callback<char(void)> increment = Bind(&IncrementChar, Passed(&letter));
+  const char result = increment.Run();
+  ASSERT_EQ('y', result);
+}
+
+// Checks the reference count of a bound receiver: one reference while the
+// callback exists (running it does not change the count), none afterwards.
+TEST(CallbackBindTest, TestRefCounting) {
+  RefCountable target;
+  ASSERT_EQ(0, target.refs);
+  {
+    Closure print_cb = Bind(&RefCountable::Print, &target);
+    ASSERT_EQ(1, target.refs);
+    print_cb.Run();
+    ASSERT_EQ(1, target.refs);
+  }
+  // The closure went out of scope above.
+  ASSERT_EQ(0, target.refs);
+}
+
+} // namespace kudu

http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/coding-inl.h
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/coding-inl.h b/be/src/kudu/util/coding-inl.h
new file mode 100644
index 0000000..a47e9ce
--- /dev/null
+++ b/be/src/kudu/util/coding-inl.h
@@ -0,0 +1,120 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+// Some portions Copyright (c) 2011 The LevelDB Authors.
+//
+// Endian-neutral encoding:
+// * Fixed-length numbers are encoded with least-significant byte first
+// * In addition we support variable length "varint" encoding
+// * Strings are encoded prefixed by their length in varint format
+
+#ifndef KUDU_UTIL_CODING_INL_H
+#define KUDU_UTIL_CODING_INL_H
+
+#include <cstdint>
+#include <cstring>
+
+#include "kudu/gutil/port.h"  // IWYU pragma: keep
+// IWYU pragma: no_include <endian.h>
+
+namespace kudu {
+
+// Writes 'v' to 'dst' as a varint: 7 payload bits per byte, least-significant
+// group first, with the top bit set on every byte except the last.
+// Writes between 1 and 5 bytes and returns a pointer just past the last one.
+inline uint8_t *InlineEncodeVarint32(uint8_t *dst, uint32_t v) {
+  // Continuation flag marking "more bytes follow".
+  static const uint32_t kMore = 0x80;
+
+  // One branch per encoded length, kept unrolled on purpose.
+  if (v < (1u << 7)) {
+    dst[0] = static_cast<uint8_t>(v);
+    return dst + 1;
+  }
+  if (v < (1u << 14)) {
+    dst[0] = static_cast<uint8_t>(v | kMore);
+    dst[1] = static_cast<uint8_t>(v >> 7);
+    return dst + 2;
+  }
+  if (v < (1u << 21)) {
+    dst[0] = static_cast<uint8_t>(v | kMore);
+    dst[1] = static_cast<uint8_t>((v >> 7) | kMore);
+    dst[2] = static_cast<uint8_t>(v >> 14);
+    return dst + 3;
+  }
+  if (v < (1u << 28)) {
+    dst[0] = static_cast<uint8_t>(v | kMore);
+    dst[1] = static_cast<uint8_t>((v >> 7) | kMore);
+    dst[2] = static_cast<uint8_t>((v >> 14) | kMore);
+    dst[3] = static_cast<uint8_t>(v >> 21);
+    return dst + 4;
+  }
+  dst[0] = static_cast<uint8_t>(v | kMore);
+  dst[1] = static_cast<uint8_t>((v >> 7) | kMore);
+  dst[2] = static_cast<uint8_t>((v >> 14) | kMore);
+  dst[3] = static_cast<uint8_t>((v >> 21) | kMore);
+  dst[4] = static_cast<uint8_t>(v >> 28);
+  return dst + 5;
+}
+
+// Stores 'value' into buf[0..3], least-significant byte first.
+inline void InlineEncodeFixed32(uint8_t *buf, uint32_t value) {
+#if __BYTE_ORDER == __LITTLE_ENDIAN
+  // The in-memory layout already matches the encoding: copy it verbatim.
+  memcpy(buf, &value, sizeof(uint32_t));
+#else
+  for (int i = 0; i < 4; ++i) {
+    buf[i] = (value >> (8 * i)) & 0xff;
+  }
+#endif
+}
+
+// Stores 'value' into buf[0..7], least-significant byte first.
+inline void InlineEncodeFixed64(uint8_t *buf, uint64_t value) {
+#if __BYTE_ORDER == __LITTLE_ENDIAN
+  // The in-memory layout already matches the encoding: copy it verbatim.
+  memcpy(buf, &value, sizeof(uint64_t));
+#else
+  for (int i = 0; i < 8; ++i) {
+    buf[i] = (value >> (8 * i)) & 0xff;
+  }
+#endif
+}
+
+
+// Standard Put... routines append to a string
+//
+// Appends the 4-byte little-endian encoding of 'value' to '*dst'. StrType
+// must provide append(const uint8_t*, size_t).
+template <class StrType>
+inline void InlinePutFixed32(StrType *dst, uint32_t value) {
+  uint8_t encoded[sizeof(uint32_t)];
+  InlineEncodeFixed32(encoded, value);
+  dst->append(encoded, sizeof(encoded));
+}
+
+// Appends the 8-byte little-endian encoding of 'value' to '*dst'. StrType
+// must provide append(const uint8_t*, size_t).
+template <class StrType>
+inline void InlinePutFixed64(StrType *dst, uint64_t value) {
+  uint8_t encoded[sizeof(uint64_t)];
+  InlineEncodeFixed64(encoded, value);
+  dst->append(encoded, sizeof(encoded));
+}
+
+// Appends the varint encoding of 'v' (1 to 5 bytes) to '*dst'.
+//
+// StrType must provide size(), resize() and an operator[] yielding uint8_t&.
+template <class StrType>
+inline void InlinePutVarint32(StrType* dst, uint32_t v) {
+  // We resize the array and then size it back down as appropriate
+  // rather than using append(), since the generated code ends up
+  // being substantially shorter.
+  //
+  // The size is kept as size_t: holding it in an int would truncate it (and
+  // index the wrong position) once the buffer reaches 2 GiB.
+  const size_t old_size = dst->size();
+  dst->resize(old_size + 5);  // 5 == longest possible varint32
+  uint8_t* p = &(*dst)[old_size];
+  uint8_t *ptr = InlineEncodeVarint32(p, v);
+
+  // Trim the bytes that were reserved but not written.
+  dst->resize(old_size + static_cast<size_t>(ptr - p));
+}
+
+} // namespace kudu
+
+#endif

http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/coding.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/coding.cc b/be/src/kudu/util/coding.cc
new file mode 100644
index 0000000..952af28
--- /dev/null
+++ b/be/src/kudu/util/coding.cc
@@ -0,0 +1,142 @@
+// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file. See the AUTHORS file for names of contributors.
+
+#include "kudu/util/coding.h"
+#include "kudu/util/coding-inl.h"
+#include "kudu/util/faststring.h"
+
+namespace kudu {
+
+// Appends the varint encoding of 'v' to '*dst'.
+void PutVarint32(faststring* dst, uint32_t v) {
+  // A 32-bit varint occupies at most 5 bytes.
+  uint8_t scratch[5];
+  const uint8_t* const end = InlineEncodeVarint32(scratch, v);
+  dst->append(scratch, end - scratch);
+}
+
+// Writes 'v' to 'dst' as a varint (7 payload bits per byte, low group first,
+// top bit set on all but the final byte). Writes between 1 and 10 bytes and
+// returns a pointer just past the last one.
+uint8_t* EncodeVarint64(uint8_t* dst, uint64_t v) {
+  const uint64_t kMore = 0x80;
+  for (; v >= kMore; v >>= 7) {
+    *dst++ = static_cast<uint8_t>((v & 0x7f) | kMore);
+  }
+  *dst++ = static_cast<uint8_t>(v);
+  return dst;
+}
+
+// Appends 'value' to '*dst' by forwarding to the inline template
+// InlinePutFixed32, instantiated here for faststring.
+void PutFixed32(faststring *dst, uint32_t value) {
+  InlinePutFixed32(dst, value);
+}
+
+// Appends 'value' to '*dst' by forwarding to the inline template
+// InlinePutFixed64, instantiated here for faststring.
+void PutFixed64(faststring *dst, uint64_t value) {
+  InlinePutFixed64(dst, value);
+}
+
+// Appends the varint encoding of 'v' to '*dst'.
+void PutVarint64(faststring *dst, uint64_t v) {
+  // A 64-bit varint occupies at most 10 bytes.
+  uint8_t scratch[10];
+  const uint8_t* const end = EncodeVarint64(scratch, v);
+  dst->append(scratch, end - scratch);
+}
+
+// Appends 'value' to '*dst', preceded by its length as a varint32.
+void PutLengthPrefixedSlice(faststring* dst, const Slice& value) {
+  const size_t length = value.size();
+  PutVarint32(dst, length);
+  dst->append(value.data(), length);
+}
+
+// Appends 'value' to '*dst', preceded by its length as a fixed 32-bit value.
+void PutFixed32LengthPrefixedSlice(faststring* dst, const Slice& value) {
+  const size_t length = value.size();
+  PutFixed32(dst, length);
+  dst->append(value.data(), length);
+}
+
+// Returns how many bytes the varint encoding of 'v' occupies (1 to 10).
+int VarintLength(uint64_t v) {
+  int bytes = 1;
+  // Every additional non-empty group of 7 bits costs one more byte.
+  for (uint64_t rest = v >> 7; rest != 0; rest >>= 7) {
+    ++bytes;
+  }
+  return bytes;
+}
+
+// General varint32 decoder. Reads at most 5 bytes from [p, limit); on success
+// stores the decoded number in '*value' and returns a pointer just past the
+// bytes consumed. Returns nullptr if the input ends first or if the fifth
+// byte still has its continuation bit set.
+const uint8_t *GetVarint32PtrFallback(const uint8_t *p,
+                                   const uint8_t *limit,
+                                   uint32_t* value) {
+  uint32_t decoded = 0;
+  uint32_t shift = 0;
+  while (shift <= 28 && p < limit) {
+    const uint32_t current = *p++;
+    decoded |= (current & 0x7f) << shift;
+    if ((current & 0x80) == 0) {
+      // Continuation bit clear: this was the final byte.
+      *value = decoded;
+      return p;
+    }
+    shift += 7;
+  }
+  return nullptr;
+}
+
+// Parses a varint32 from the front of '*input'. On success stores it in
+// '*value', advances '*input' past the bytes consumed and returns true;
+// otherwise returns false and leaves '*input' untouched.
+bool GetVarint32(Slice* input, uint32_t* value) {
+  const uint8_t* const begin = input->data();
+  const uint8_t* const end = begin + input->size();
+  const uint8_t* const next = GetVarint32Ptr(begin, end, value);
+  if (next == nullptr) {
+    return false;
+  }
+  *input = Slice(next, end - next);
+  return true;
+}
+
+// Varint64 decoder. Reads at most 10 bytes from [p, limit); on success stores
+// the decoded number in '*value' and returns a pointer just past the bytes
+// consumed. Returns nullptr if the input ends first or if the tenth byte
+// still has its continuation bit set.
+const uint8_t *GetVarint64Ptr(const uint8_t *p, const uint8_t *limit, uint64_t* value) {
+  uint64_t decoded = 0;
+  uint32_t shift = 0;
+  while (shift <= 63 && p < limit) {
+    const uint64_t current = *p++;
+    decoded |= (current & 0x7f) << shift;
+    if ((current & 0x80) == 0) {
+      // Continuation bit clear: this was the final byte.
+      *value = decoded;
+      return p;
+    }
+    shift += 7;
+  }
+  return nullptr;
+}
+
+// Parses a varint64 from the front of '*input'. On success stores it in
+// '*value', advances '*input' past the bytes consumed and returns true;
+// otherwise returns false and leaves '*input' untouched.
+bool GetVarint64(Slice* input, uint64_t* value) {
+  const uint8_t* const begin = input->data();
+  const uint8_t* const end = begin + input->size();
+  const uint8_t* const next = GetVarint64Ptr(begin, end, value);
+  if (next == nullptr) {
+    return false;
+  }
+  *input = Slice(next, end - next);
+  return true;
+}
+
+// Parses a varint32 length followed by that many bytes from [p, limit).
+// On success points '*result' at the payload (no copy is made) and returns a
+// pointer just past it. Returns nullptr if the length prefix is malformed or
+// the payload would extend beyond 'limit'.
+const uint8_t *GetLengthPrefixedSlice(const uint8_t *p, const uint8_t *limit,
+                                   Slice* result) {
+  uint32_t len = 0;
+  p = GetVarint32Ptr(p, limit, &len);
+  if (p == nullptr) return nullptr;
+  // Compare against the bytes remaining rather than computing 'p + len':
+  // with a corrupt, huge length that sum would point outside the buffer
+  // (undefined behavior) and could wrap around and pass the check.
+  if (len > static_cast<size_t>(limit - p)) return nullptr;
+  *result = Slice(p, len);
+  return p + len;
+}
+
+// Parses a varint32 length followed by that many bytes from the front of
+// '*input'. On success points '*result' at the payload, advances '*input'
+// past it and returns true; otherwise returns false.
+bool GetLengthPrefixedSlice(Slice* input, Slice* result) {
+  uint32_t len = 0;
+  if (!GetVarint32(input, &len)) {
+    return false;
+  }
+  if (input->size() < len) {
+    // The length prefix has already been consumed from '*input' here.
+    return false;
+  }
+  *result = Slice(input->data(), len);
+  input->remove_prefix(len);
+  return true;
+}
+
+}  // namespace kudu

http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/coding.h
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/coding.h b/be/src/kudu/util/coding.h
new file mode 100644
index 0000000..0612533
--- /dev/null
+++ b/be/src/kudu/util/coding.h
@@ -0,0 +1,113 @@
+// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file. See the AUTHORS file for names of contributors.
+//
+// Endian-neutral encoding:
+// * Fixed-length numbers are encoded with least-significant byte first
+// * In addition we support variable length "varint" encoding
+// * Strings are encoded prefixed by their length in varint format
+
+#ifndef STORAGE_LEVELDB_UTIL_CODING_H_
+#define STORAGE_LEVELDB_UTIL_CODING_H_
+
+#include <cstdint>
+#include <cstring>
+
+#include "kudu/util/slice.h"
+#include "kudu/gutil/port.h"  // IWYU pragma: keep
+// IWYU pragma: no_include <endian.h>
+
+namespace kudu {
+
+class faststring;
+
+extern void PutFixed32(faststring* dst, uint32_t value);
+extern void PutFixed64(faststring* dst, uint64_t value);
+extern void PutVarint32(faststring* dst, uint32_t value);
+extern void PutVarint64(faststring* dst, uint64_t value);
+
+// Put a length-prefixed Slice into the buffer. The length prefix
+// is varint-encoded.
+extern void PutLengthPrefixedSlice(faststring* dst, const Slice& value);
+
+// Put a length-prefixed Slice into the buffer. The length prefix
+// is 32-bit fixed encoded in little endian.
+extern void PutFixed32LengthPrefixedSlice(faststring* dst, const Slice& value);
+
+// Standard Get... routines parse a value from the beginning of a Slice
+// and advance the slice past the parsed value.
+extern bool GetVarint32(Slice* input, uint32_t* value);
+extern bool GetVarint64(Slice* input, uint64_t* value);
+extern bool GetLengthPrefixedSlice(Slice* input, Slice* result);
+
+// Pointer-based variants of GetVarint...  These either store a value
+// in *v and return a pointer just past the parsed value, or return
+// NULL on error.  These routines only look at bytes in the range
+// [p..limit-1]
+extern const uint8_t *GetVarint32Ptr(const uint8_t *p,const uint8_t *limit, uint32_t* v);
+extern const uint8_t *GetVarint64Ptr(const uint8_t *p,const uint8_t *limit, uint64_t* v);
+
+// Returns the length of the varint32 or varint64 encoding of "v"
+extern int VarintLength(uint64_t v);
+
+// Lower-level versions of Put... that write directly into a character buffer
+// REQUIRES: dst has enough space for the value being written
+extern void EncodeFixed32(uint8_t *dst, uint32_t value);
+extern void EncodeFixed64(uint8_t *dst, uint64_t value);
+
+// Lower-level versions of Put... that write directly into a character buffer
+// and return a pointer just past the last byte written.
+// REQUIRES: dst has enough space for the value being written
+extern uint8_t *EncodeVarint32(uint8_t *dst, uint32_t value);
+extern uint8_t *EncodeVarint64(uint8_t *dst, uint64_t value);
+
+// Lower-level versions of Get... that read directly from a character buffer
+// without any bounds checking.
+
+// Reads a 32-bit little-endian value from ptr[0..3]. No bounds checking.
+inline uint32_t DecodeFixed32(const uint8_t *ptr) {
+#if __BYTE_ORDER == __LITTLE_ENDIAN
+  // The stored layout matches the host: a plain copy is enough.
+  uint32_t decoded;
+  memcpy(&decoded, ptr, sizeof(decoded));
+  return decoded;
+#else
+  // Assemble from the most significant byte (ptr[3]) down to ptr[0].
+  uint32_t decoded = 0;
+  for (int i = 3; i >= 0; --i) {
+    decoded = (decoded << 8) | static_cast<uint32_t>(ptr[i]);
+  }
+  return decoded;
+#endif
+}
+
+// Reads a 64-bit little-endian value from ptr[0..7]. No bounds checking.
+inline uint64_t DecodeFixed64(const uint8_t *ptr) {
+#if __BYTE_ORDER == __LITTLE_ENDIAN
+  // The stored layout matches the host: a plain copy is enough.
+  uint64_t decoded;
+  memcpy(&decoded, ptr, sizeof(decoded));
+  return decoded;
+#else
+  // The low 32 bits are stored first, the high 32 bits second.
+  const uint64_t low_half = DecodeFixed32(ptr);
+  const uint64_t high_half = DecodeFixed32(ptr + 4);
+  return low_half | (high_half << 32);
+#endif
+}
+
+// Out-of-line decoder that GetVarint32Ptr falls back to whenever its
+// single-byte fast path does not apply.
+extern const uint8_t *GetVarint32PtrFallback(const uint8_t *p,
+                                             const uint8_t *limit,
+                                             uint32_t* value);
+
+// Decodes a varint32 from [p, limit). Values that fit in one byte (top bit
+// clear) are handled inline; everything else, including empty input, is
+// delegated to GetVarint32PtrFallback.
+inline const uint8_t *GetVarint32Ptr(const uint8_t *p,
+                                     const uint8_t *limit,
+                                     uint32_t* value) {
+  if (PREDICT_TRUE(p < limit) && PREDICT_TRUE((*p & 128) == 0)) {
+    *value = *p;
+    return p + 1;
+  }
+  return GetVarint32PtrFallback(p, limit, value);
+}
+
+}  // namespace kudu
+
+#endif  // STORAGE_LEVELDB_UTIL_CODING_H_

http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/compression/compression-test.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/compression/compression-test.cc b/be/src/kudu/util/compression/compression-test.cc
new file mode 100644
index 0000000..6b46a4f
--- /dev/null
+++ b/be/src/kudu/util/compression/compression-test.cc
@@ -0,0 +1,90 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <cstdint>
+#include <cstdlib>
+#include <cstring>
+#include <vector>
+
+#include <gtest/gtest.h>
+
+#include "kudu/gutil/gscoped_ptr.h"
+#include "kudu/util/compression/compression.pb.h"
+#include "kudu/util/compression/compression_codec.h"
+#include "kudu/util/slice.h"
+#include "kudu/util/test_macros.h"
+#include "kudu/util/test_util.h"
+
+namespace kudu {
+
+using std::vector;
+
+// Fixture for the codec tests below; adds nothing on top of KuduTest.
+class TestCompression : public KuduTest {};
+
+// Round-trips a small, highly compressible buffer through the codec selected
+// by 'compression': once through the single-Slice Compress() overload and
+// once through the vector<Slice> overload.
+static void TestCompressionCodec(CompressionType compression) {
+  const int kInputSize = 64;
+
+  const CompressionCodec* codec;
+  uint8_t ibuffer[kInputSize];
+  uint8_t ubuffer[kInputSize];
+  size_t compressed;
+
+  // Fill the test input buffer
+  memset(ibuffer, 'Z', kInputSize);
+
+  // Get the specified compression codec
+  ASSERT_OK(GetCompressionCodec(compression, &codec));
+
+  // Allocate the compression buffer
+  size_t max_compressed = codec->MaxCompressedLength(kInputSize);
+  ASSERT_LT(max_compressed, (kInputSize * 2));
+  gscoped_array<uint8_t> cbuffer(new uint8_t[max_compressed]);
+
+  // Compress and uncompress
+  ASSERT_OK(codec->Compress(Slice(ibuffer, kInputSize), cbuffer.get(), &compressed));
+  ASSERT_OK(codec->Uncompress(Slice(cbuffer.get(), compressed), ubuffer, kInputSize));
+  ASSERT_EQ(0, memcmp(ibuffer, ubuffer, kInputSize));
+
+  // Compress slices and uncompress. The pieces (one 1-byte slice followed by
+  // nine 7-byte slices) tile the input exactly: 1 + 9 * 7 == kInputSize.
+  // The loop must stop before kInputSize, otherwise a slice starting one
+  // past the end of 'ibuffer' would be appended.
+  vector<Slice> v;
+  v.emplace_back(ibuffer, 1);
+  for (int i = 1; i < kInputSize; i += 7)
+    v.emplace_back(ibuffer + i, 7);
+  // Clear the output so that bytes left over from the first round cannot
+  // mask a failure of the second one.
+  memset(ubuffer, 0, kInputSize);
+  ASSERT_OK(codec->Compress(v, cbuffer.get(), &compressed));
+  ASSERT_OK(codec->Uncompress(Slice(cbuffer.get(), compressed), ubuffer, kInputSize));
+  ASSERT_EQ(0, memcmp(ibuffer, ubuffer, kInputSize));
+}
+
+// NO_COMPRESSION is a valid type: the lookup succeeds but yields a null
+// codec pointer.
+TEST_F(TestCompression, TestNoCompressionCodec) {
+  const CompressionCodec* codec;
+  ASSERT_OK(GetCompressionCodec(NO_COMPRESSION, &codec));
+  ASSERT_EQ(nullptr, codec);
+}
+
+// Runs the shared round-trip check against the SNAPPY codec.
+TEST_F(TestCompression, TestSnappyCompressionCodec) {
+  TestCompressionCodec(SNAPPY);
+}
+
+// Runs the shared round-trip check against the LZ4 codec.
+TEST_F(TestCompression, TestLz4CompressionCodec) {
+  TestCompressionCodec(LZ4);
+}
+
+// Runs the shared round-trip check against the ZLIB codec.
+TEST_F(TestCompression, TestZlibCompressionCodec) {
+  TestCompressionCodec(ZLIB);
+}
+
+} // namespace kudu

http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/compression/compression.proto
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/compression/compression.proto b/be/src/kudu/util/compression/compression.proto
new file mode 100644
index 0000000..a0f5343
--- /dev/null
+++ b/be/src/kudu/util/compression/compression.proto
@@ -0,0 +1,29 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+syntax = "proto2";
+package kudu;
+
+option java_package = "org.apache.kudu";
+
+// Identifies a block compression algorithm.
+enum CompressionType {
+  // Listed first on purpose: in proto2 the first value of an enum is the
+  // implicit default of a field of this type that has no explicit default.
+  UNKNOWN_COMPRESSION = 999;
+  // NOTE(review): presumably "let the system choose"; how this is resolved
+  // is not visible in this file.
+  DEFAULT_COMPRESSION = 0;
+  NO_COMPRESSION = 1;
+  SNAPPY = 2;
+  LZ4 = 3;
+  ZLIB = 4;
+}

http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/compression/compression_codec.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/compression/compression_codec.cc b/be/src/kudu/util/compression/compression_codec.cc
new file mode 100644
index 0000000..a2231b6
--- /dev/null
+++ b/be/src/kudu/util/compression/compression_codec.cc
@@ -0,0 +1,286 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "kudu/util/compression/compression_codec.h"
+
+#include <memory>
+#include <ostream>
+#include <string>
+#include <vector>
+
+#include <glog/logging.h>
+#include <lz4.h>
+#include <snappy-sinksource.h>
+#include <snappy.h>
+#include <zlib.h>
+
+#include "kudu/gutil/port.h"
+#include "kudu/gutil/singleton.h"
+#include "kudu/gutil/stringprintf.h"
+#include "kudu/util/faststring.h"
+#include "kudu/util/logging.h"
+#include "kudu/util/string_case.h"
+
+namespace kudu {
+
+using std::vector;
+
+// Nothing to initialize in the base class.
+CompressionCodec::CompressionCodec() = default;
+// Nothing to release in the base class.
+CompressionCodec::~CompressionCodec() = default;
+
+// Presents a vector of Slices to snappy as one logical byte stream through
+// the snappy::Source interface.
+//
+// The vector is held by reference and must outlive this object.
+//
+// Invariant: whenever Available() > 0 the cursor (slice_index_,
+// slice_offset_) designates an unread byte. Empty slices are therefore
+// skipped eagerly, so that Peek() never reports a zero-length chunk while
+// data remains.
+class SlicesSource : public snappy::Source {
+ public:
+  explicit SlicesSource(const std::vector<Slice>& slices)
+    : available_(0),
+      slice_index_(0),
+      slice_offset_(0),
+      slices_(slices) {
+    available_ = TotalSize();
+    // The vector may start with empty slices.
+    SkipExhaustedSlices();
+  }
+
+  // Number of bytes not yet consumed.
+  size_t Available() const OVERRIDE {
+    return available_;
+  }
+
+  // Returns the unread remainder of the current slice and stores its length
+  // in '*len'; returns nullptr with '*len' == 0 once everything is consumed.
+  const char* Peek(size_t* len) OVERRIDE {
+    if (available_ == 0) {
+      *len = 0;
+      return nullptr;
+    }
+
+    const Slice& data = slices_[slice_index_];
+    *len = data.size() - slice_offset_;
+    return reinterpret_cast<const char *>(data.data()) + slice_offset_;
+  }
+
+  // Consumes 'n' bytes, which may span several slices.
+  // REQUIRES: n <= Available()
+  void Skip(size_t n) OVERRIDE {
+    DCHECK_LE(n, Available());
+    available_ -= n;
+    while (n > 0) {
+      const size_t left_in_slice = slices_[slice_index_].size() - slice_offset_;
+      if (n < left_in_slice) {
+        slice_offset_ += n;
+        break;
+      }
+      n -= left_in_slice;
+      slice_index_++;
+      slice_offset_ = 0;
+    }
+    SkipExhaustedSlices();
+  }
+
+  // Appends the contents of all slices (not just the unread part) to
+  // '*buffer'.
+  void Dump(faststring *buffer) {
+    buffer->reserve(buffer->size() + TotalSize());
+    for (const Slice& block : slices_) {
+      buffer->append(block.data(), block.size());
+    }
+  }
+
+ private:
+  // Sum of the sizes of all slices.
+  size_t TotalSize(void) const {
+    size_t size = 0;
+    for (const Slice& data : slices_) {
+      size += data.size();
+    }
+    return size;
+  }
+
+  // Moves the cursor forward past slices with no unread bytes left (empty
+  // slices included), stopping at the end of the vector.
+  void SkipExhaustedSlices() {
+    while (slice_index_ < slices_.size() &&
+           slice_offset_ >= slices_[slice_index_].size()) {
+      slice_index_++;
+      slice_offset_ = 0;
+    }
+  }
+
+ private:
+  size_t available_;
+  size_t slice_index_;
+  size_t slice_offset_;
+  const vector<Slice>& slices_;
+};
+
+// CompressionCodec implementation backed by snappy.
+class SnappyCodec : public CompressionCodec {
+ public:
+  static SnappyCodec *GetSingleton() {
+    return Singleton<SnappyCodec>::get();
+  }
+
+  // Compresses 'input' into 'compressed' and stores the number of bytes
+  // written in '*compressed_length'.
+  Status Compress(const Slice& input,
+                  uint8_t *compressed, size_t *compressed_length) const OVERRIDE {
+    snappy::RawCompress(reinterpret_cast<const char *>(input.data()), input.size(),
+                        reinterpret_cast<char *>(compressed), compressed_length);
+    return Status::OK();
+  }
+
+  // Same as above for the concatenation of 'input_slices', streamed to snappy
+  // without first copying the slices into one buffer.
+  Status Compress(const vector<Slice>& input_slices,
+                  uint8_t *compressed, size_t *compressed_length) const OVERRIDE {
+    SlicesSource source(input_slices);
+    snappy::UncheckedByteArraySink sink(reinterpret_cast<char *>(compressed));
+    *compressed_length = snappy::Compress(&source, &sink);
+    if (*compressed_length == 0) {
+      return Status::Corruption("unable to compress the buffer");
+    }
+    return Status::OK();
+  }
+
+  // Decompresses 'compressed' into 'uncompressed', a buffer able to hold
+  // 'uncompressed_length' bytes. Returns Corruption if the data is malformed
+  // or claims to expand to more than 'uncompressed_length' bytes.
+  Status Uncompress(const Slice& compressed,
+                    uint8_t *uncompressed,
+                    size_t uncompressed_length) const OVERRIDE {
+    const char* src = reinterpret_cast<const char *>(compressed.data());
+
+    // RawUncompress() writes as many bytes as the stream header announces
+    // without knowing the size of the destination, so check the announced
+    // length first to avoid overrunning the caller's buffer.
+    size_t announced_length = 0;
+    if (!snappy::GetUncompressedLength(src, compressed.size(), &announced_length) ||
+        announced_length > uncompressed_length) {
+      return Status::Corruption("unable to uncompress the buffer");
+    }
+
+    bool success = snappy::RawUncompress(src, compressed.size(),
+                                         reinterpret_cast<char *>(uncompressed));
+    return success ? Status::OK() : Status::Corruption("unable to uncompress the buffer");
+  }
+
+  size_t MaxCompressedLength(size_t source_bytes) const OVERRIDE {
+    return snappy::MaxCompressedLength(source_bytes);
+  }
+
+  CompressionType type() const override {
+    return SNAPPY;
+  }
+};
+
+// CompressionCodec implementation backed by LZ4.
+class Lz4Codec : public CompressionCodec {
+ public:
+  static Lz4Codec *GetSingleton() {
+    return Singleton<Lz4Codec>::get();
+  }
+
+  // Compresses 'input' into 'compressed' and stores the number of bytes
+  // written in '*compressed_length'.
+  Status Compress(const Slice& input,
+                  uint8_t *compressed, size_t *compressed_length) const OVERRIDE {
+    // The LZ4 API takes sizes as int; reject what it cannot represent
+    // instead of silently truncating the size.
+    if (input.size() > static_cast<size_t>(LZ4_MAX_INPUT_SIZE)) {
+      return Status::InvalidArgument("input too large for LZ4 compression");
+    }
+    int n = LZ4_compress(reinterpret_cast<const char *>(input.data()),
+                         reinterpret_cast<char *>(compressed),
+                         static_cast<int>(input.size()));
+    // LZ4 reports failure by returning 0.
+    if (n <= 0) {
+      return Status::IOError("unable to compress the buffer");
+    }
+    *compressed_length = n;
+    return Status::OK();
+  }
+
+  // Same as above for the concatenation of 'input_slices'. LZ4 needs
+  // contiguous input, so multiple slices are first copied into one buffer.
+  Status Compress(const vector<Slice>& input_slices,
+                  uint8_t *compressed, size_t *compressed_length) const OVERRIDE {
+    if (input_slices.size() == 1) {
+      return Compress(input_slices[0], compressed, compressed_length);
+    }
+
+    SlicesSource source(input_slices);
+    faststring buffer;
+    source.Dump(&buffer);
+    return Compress(Slice(buffer.data(), buffer.size()), compressed, compressed_length);
+  }
+
+  // Decompresses 'compressed' into 'uncompressed'. 'uncompressed_length' must
+  // be the exact size of the original data. Returns Corruption if the input
+  // is malformed or does not decode to exactly that many bytes.
+  Status Uncompress(const Slice& compressed,
+                    uint8_t *uncompressed,
+                    size_t uncompressed_length) const OVERRIDE {
+    // Largest size representable in the int parameters of the LZ4 API.
+    const size_t kMaxInt = 0x7FFFFFFF;
+    if (compressed.size() > kMaxInt || uncompressed_length > kMaxInt) {
+      return Status::InvalidArgument("buffer too large for LZ4 decompression");
+    }
+
+    // LZ4_decompress_safe() is given both buffer sizes, so it never reads or
+    // writes out of bounds even on corrupt input (LZ4_decompress_fast()
+    // cannot bound its reads of the compressed buffer). It returns the
+    // number of bytes decoded, or a negative value on error.
+    int n = LZ4_decompress_safe(reinterpret_cast<const char *>(compressed.data()),
+                                reinterpret_cast<char *>(uncompressed),
+                                static_cast<int>(compressed.size()),
+                                static_cast<int>(uncompressed_length));
+    if (n < 0 || static_cast<size_t>(n) != uncompressed_length) {
+      return Status::Corruption(
+        StringPrintf("unable to uncompress the buffer. error near %d, buffer", -n),
+                     KUDU_REDACT(compressed.ToDebugString(100)));
+    }
+    return Status::OK();
+  }
+
+  size_t MaxCompressedLength(size_t source_bytes) const OVERRIDE {
+    return LZ4_compressBound(source_bytes);
+  }
+
+  CompressionType type() const override {
+    return LZ4;
+  }
+};
+
+/**
+ * CompressionCodec implementation backed by zlib.
+ *
+ * TODO: use an instance-local Arena and pass alloc/free into zlib
+ * so that it allocates from the arena.
+ */
+class ZlibCodec : public CompressionCodec {
+ public:
+  static ZlibCodec *GetSingleton() {
+    return Singleton<ZlibCodec>::get();
+  }
+
+  // Compresses 'input' into 'compressed' and stores the number of bytes
+  // written in '*compressed_length'.
+  Status Compress(const Slice& input,
+                  uint8_t *compressed, size_t *compressed_length) const OVERRIDE {
+    // zlib expects the capacity of the output buffer on entry and replaces
+    // it with the actual compressed size on return.
+    *compressed_length = MaxCompressedLength(input.size());
+    const int rc = ::compress(compressed, compressed_length, input.data(), input.size());
+    if (rc != Z_OK) {
+      return Status::IOError("unable to compress the buffer");
+    }
+    return Status::OK();
+  }
+
+  // Same as above for the concatenation of 'input_slices'; multiple slices
+  // are first copied into one contiguous buffer.
+  Status Compress(const vector<Slice>& input_slices,
+                  uint8_t *compressed, size_t *compressed_length) const OVERRIDE {
+    if (input_slices.size() == 1) {
+      return Compress(input_slices[0], compressed, compressed_length);
+    }
+
+    // TODO: use z_stream
+    faststring joined;
+    SlicesSource source(input_slices);
+    source.Dump(&joined);
+    return Compress(Slice(joined.data(), joined.size()), compressed, compressed_length);
+  }
+
+  // Decompresses 'compressed' into 'uncompressed', a buffer able to hold
+  // 'uncompressed_length' bytes.
+  Status Uncompress(const Slice& compressed,
+                    uint8_t *uncompressed, size_t uncompressed_length) const OVERRIDE {
+    const int rc = ::uncompress(uncompressed, &uncompressed_length,
+                                compressed.data(), compressed.size());
+    if (rc != Z_OK) {
+      return Status::Corruption("unable to uncompress the buffer");
+    }
+    return Status::OK();
+  }
+
+  size_t MaxCompressedLength(size_t source_bytes) const OVERRIDE {
+    // one-time overhead of six bytes for the entire stream plus five bytes per 16 KB block
+    const size_t blocks = (source_bytes + 16383) >> 14;
+    return source_bytes + (6 + (5 * blocks));
+  }
+
+  CompressionType type() const override {
+    return ZLIB;
+  }
+};
+
+// Stores in '*codec' the singleton codec for 'compression' (nullptr for
+// NO_COMPRESSION) and returns OK. For any other type returns NotFound and
+// leaves '*codec' untouched.
+Status GetCompressionCodec(CompressionType compression,
+                           const CompressionCodec** codec) {
+  const CompressionCodec* selected = nullptr;
+  switch (compression) {
+    case NO_COMPRESSION:
+      // No codec object is needed: keep the null pointer.
+      break;
+    case SNAPPY:
+      selected = SnappyCodec::GetSingleton();
+      break;
+    case LZ4:
+      selected = Lz4Codec::GetSingleton();
+      break;
+    case ZLIB:
+      selected = ZlibCodec::GetSingleton();
+      break;
+    default:
+      return Status::NotFound("bad compression type");
+  }
+  *codec = selected;
+  return Status::OK();
+}
+
+// Maps a codec name (matched case-insensitively) to its CompressionType.
+// Unrecognized names are logged and mapped to NO_COMPRESSION.
+CompressionType GetCompressionCodecType(const std::string& name) {
+  std::string upper;
+  ToUpperCase(name, &upper);
+
+  static const struct {
+    const char* name;
+    CompressionType type;
+  } kKnownCodecs[] = {
+    { "SNAPPY", SNAPPY },
+    { "LZ4", LZ4 },
+    { "ZLIB", ZLIB },
+    { "NONE", NO_COMPRESSION },
+  };
+  for (const auto& known : kKnownCodecs) {
+    if (upper == known.name) {
+      return known.type;
+    }
+  }
+
+  LOG(WARNING) << "Unable to recognize the compression codec '" << name
+               << "' using no compression as default.";
+  return NO_COMPRESSION;
+}
+
+} // namespace kudu

http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/compression/compression_codec.h
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/compression/compression_codec.h b/be/src/kudu/util/compression/compression_codec.h
new file mode 100644
index 0000000..4f81fd3
--- /dev/null
+++ b/be/src/kudu/util/compression/compression_codec.h
@@ -0,0 +1,78 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#ifndef KUDU_CFILE_COMPRESSION_CODEC_H
+#define KUDU_CFILE_COMPRESSION_CODEC_H
+
+#include <cstddef>
+#include <cstdint>
+#include <string>
+#include <vector>
+
+#include <snappy-stubs-public.h>
+
+#include "kudu/util/compression/compression.pb.h"
+#include "kudu/util/slice.h"
+#include "kudu/util/status.h"
+
+namespace kudu {
+
+// Abstract interface for a block compression algorithm. Instances are
+// obtained through GetCompressionCodec() and are not copyable.
+class CompressionCodec {
+ public:
+  CompressionCodec();
+  virtual ~CompressionCodec();
+
+  // REQUIRES: "compressed" must point to an area of memory that is at
+  // least "MaxCompressedLength(input.size())" bytes in length.
+  //
+  // Takes the data stored in "input" and stores its compressed form
+  // in the array pointed to by "compressed".
+  //
+  // Success or failure is reported through the returned Status. The length
+  // of the compressed output is presumably written to "*compressed_length"
+  // (implementations are not visible here -- confirm).
+  virtual Status Compress(const Slice& input,
+                          uint8_t *compressed, size_t *compressed_length) const = 0;
+
+  // Same as above, but the input is supplied as a list of slices.
+  // NOTE(review): presumably the slices are compressed as one logical,
+  // concatenated input -- confirm against the implementations.
+  virtual Status Compress(const std::vector<Slice>& input_slices,
+                          uint8_t *compressed, size_t *compressed_length) const = 0;
+
+  // Given data in "compressed" generated by calling the Compress routine,
+  // this routine stores the uncompressed data to
+  // uncompressed[0..uncompressed_length-1].
+  //
+  // Failure is reported through the returned Status (not a bool); e.g. the
+  // zlib implementation returns Status::Corruption when the buffer cannot be
+  // uncompressed.
+  virtual Status Uncompress(const Slice& compressed,
+                            uint8_t *uncompressed, size_t uncompressed_length) const = 0;
+
+  // Returns the maximal size of the compressed representation of
+  // input data that is "source_bytes" bytes in length.
+  virtual size_t MaxCompressedLength(size_t source_bytes) const = 0;
+
+  // Return the type of compression implemented by this codec.
+  virtual CompressionType type() const = 0;
+ private:
+  DISALLOW_COPY_AND_ASSIGN(CompressionCodec);
+};
+
+// Returns the compression codec for the specified type.
+//
+// The returned codec is a singleton and should not be destroyed.
+Status GetCompressionCodec(CompressionType compression,
+                           const CompressionCodec** codec);
+
+// Returns the compression codec type given the name
+CompressionType GetCompressionCodecType(const std::string& name);
+
+} // namespace kudu
+#endif

http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/condition_variable.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/condition_variable.cc b/be/src/kudu/util/condition_variable.cc
new file mode 100644
index 0000000..369d20d
--- /dev/null
+++ b/be/src/kudu/util/condition_variable.cc
@@ -0,0 +1,142 @@
+// Copyright (c) 2011 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "kudu/util/condition_variable.h"
+
+#include <sys/time.h>
+
+#include <cerrno>
+#include <cstdint>
+#include <ctime>
+#include <ostream>
+
+#include <glog/logging.h>
+
+#include "kudu/util/monotime.h"
+#include "kudu/util/mutex.h"
+#include "kudu/util/thread_restrictions.h"
+
+namespace kudu {
+
+// Creates a condition variable bound to 'user_lock'.
+//
+// The native pthread mutex inside 'user_lock' is remembered for the wait
+// calls. In debug builds the Mutex itself is also kept so that its debug
+// lock-state bookkeeping can be adjusted around waits.
+ConditionVariable::ConditionVariable(Mutex* user_lock)
+    : user_mutex_(&user_lock->native_handle_)
+#if !defined(NDEBUG)
+    , user_lock_(user_lock)
+#endif
+{
+  int rv = 0;
+#if defined(__APPLE__)
+  rv = pthread_cond_init(&condition_, nullptr);
+#else
+  // On Linux we can't use relative times like on macOS; reconfiguring the
+  // condition variable to use the monotonic clock means we can support
+  // WaitUntil/WaitFor with our MonoTime implementation.
+  pthread_condattr_t attrs;
+  rv = pthread_condattr_init(&attrs);
+  DCHECK_EQ(0, rv);
+  // The timed waits hand MonoTime-derived deadlines to
+  // pthread_cond_timedwait(), so a failure to select CLOCK_MONOTONIC must
+  // not go unnoticed: the deadlines would be measured against the wrong clock.
+  rv = pthread_condattr_setclock(&attrs, CLOCK_MONOTONIC);
+  DCHECK_EQ(0, rv);
+  rv = pthread_cond_init(&condition_, &attrs);
+  pthread_condattr_destroy(&attrs);
+#endif
+  DCHECK_EQ(0, rv);
+}
+
+// Destroys the underlying pthread condition variable; a failure is only
+// checked in debug builds.
+ConditionVariable::~ConditionVariable() {
+  const int destroy_rv = pthread_cond_destroy(&condition_);
+  DCHECK_EQ(0, destroy_rv);
+}
+
+// Blocks the calling thread on the condition variable, using the user mutex
+// supplied at construction. There is no timeout.
+void ConditionVariable::Wait() const {
+  ThreadRestrictions::AssertWaitAllowed();
+#if !defined(NDEBUG)
+  // Debug builds only: adjust the Mutex's debug lock state before the native
+  // wait. Presumably this verifies the caller holds the lock and clears the
+  // ownership mark -- confirm in Mutex.
+  user_lock_->CheckHeldAndUnmark();
+#endif
+  int rv = pthread_cond_wait(&condition_, user_mutex_);
+  DCHECK_EQ(0, rv);
+#if !defined(NDEBUG)
+  // Debug builds only: restore the Mutex's debug lock state after the wait.
+  user_lock_->CheckUnheldAndMark();
+#endif
+}
+
+// Waits on the condition variable until woken or until the monotonic
+// deadline 'until' is reached.
+//
+// Returns true when the native timed wait returned 0, and false when the
+// deadline had already passed on entry or the wait did not return 0.
+bool ConditionVariable::WaitUntil(const MonoTime& until) const {
+  ThreadRestrictions::AssertWaitAllowed();
+
+  // A deadline in the past is reported as a timeout without waiting.
+  MonoTime start = MonoTime::Now();
+  if (start > until) {
+    return false;
+  }
+
+#if !defined(NDEBUG)
+  user_lock_->CheckHeldAndUnmark();
+#endif
+
+#if defined(__APPLE__)
+  // macOS cannot run pthread_cond_timedwait() against a monotonic clock, so
+  // the deadline is turned into a delta and a relative wait is used instead.
+  MonoDelta remaining = until - start;
+  struct timespec rel_ts;
+  remaining.ToTimeSpec(&rel_ts);
+  int rv = pthread_cond_timedwait_relative_np(
+      &condition_, user_mutex_, &rel_ts);
+#else
+  struct timespec abs_ts;
+  until.ToTimeSpec(&abs_ts);
+  int rv = pthread_cond_timedwait(&condition_, user_mutex_, &abs_ts);
+#endif
+  DCHECK(rv == 0 || rv == ETIMEDOUT)
+    << "unexpected pthread_cond_timedwait return value: " << rv;
+
+  const bool woke_before_deadline = (rv == 0);
+#if !defined(NDEBUG)
+  user_lock_->CheckUnheldAndMark();
+#endif
+  return woke_before_deadline;
+}
+
+// Waits on the condition variable until woken or until 'delta' has elapsed.
+//
+// Returns true when the native timed wait returned 0, and false when 'delta'
+// is negative or the wait did not return 0.
+bool ConditionVariable::WaitFor(const MonoDelta& delta) const {
+  ThreadRestrictions::AssertWaitAllowed();
+
+  // A negative delta counts as having timed out already.
+  if (delta.ToNanoseconds() < 0) {
+    return false;
+  }
+
+#if !defined(NDEBUG)
+  user_lock_->CheckHeldAndUnmark();
+#endif
+
+#if defined(__APPLE__)
+  struct timespec rel_ts;
+  delta.ToTimeSpec(&rel_ts);
+  int rv = pthread_cond_timedwait_relative_np(
+      &condition_, user_mutex_, &rel_ts);
+#else
+  // pthread_cond_timedwait expects an absolute deadline, so derive one from
+  // the current monotonic time.
+  MonoTime deadline = MonoTime::Now() + delta;
+  struct timespec abs_ts;
+  deadline.ToTimeSpec(&abs_ts);
+  int rv = pthread_cond_timedwait(&condition_, user_mutex_, &abs_ts);
+#endif
+
+  DCHECK(rv == 0 || rv == ETIMEDOUT)
+    << "unexpected pthread_cond_timedwait return value: " << rv;
+  const bool woke_before_timeout = (rv == 0);
+#if !defined(NDEBUG)
+  user_lock_->CheckUnheldAndMark();
+#endif
+  return woke_before_timeout;
+}
+
+// Wakes every thread currently waiting on this condition variable.
+void ConditionVariable::Broadcast() {
+  const int broadcast_rv = pthread_cond_broadcast(&condition_);
+  DCHECK_EQ(0, broadcast_rv);
+}
+
+// Wakes one thread waiting on this condition variable.
+void ConditionVariable::Signal() {
+  const int signal_rv = pthread_cond_signal(&condition_);
+  DCHECK_EQ(0, signal_rv);
+}
+
+}  // namespace kudu

http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/condition_variable.h
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/condition_variable.h b/be/src/kudu/util/condition_variable.h
new file mode 100644
index 0000000..1245646
--- /dev/null
+++ b/be/src/kudu/util/condition_variable.h
@@ -0,0 +1,118 @@
+// Copyright (c) 2011 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+// ConditionVariable wraps pthreads condition variable synchronization or, on
+// Windows, simulates it.  This functionality is very helpful for having
+// several threads wait for an event, as is common with a thread pool managed
+// by a master.  The meaning of such an event in the (worker) thread pool
+// scenario is that additional tasks are now available for processing.  It is
+// used in Chrome in the DNS prefetching system to notify worker threads that
+// a queue now has items (tasks) which need to be tended to.  A related use
+// would have a pool manager waiting on a ConditionVariable, waiting for a
+// thread in the pool to announce (signal) that there is now more room in a
+// (bounded size) communications queue for the manager to deposit tasks, or,
+// as a second example, that the queue of tasks is completely empty and all
+// workers are waiting.
+//
+// USAGE NOTE 1: spurious signal events are possible with this and
+// most implementations of condition variables.  As a result, be
+// *sure* to retest your condition before proceeding.  The following
+// is a good example of doing this correctly:
+//
+// while (!work_to_be_done()) Wait(...);
+//
+// In contrast do NOT do the following:
+//
+// if (!work_to_be_done()) Wait(...);  // Don't do this.
+//
+// Especially avoid the above if you are relying on some other thread only
+// issuing a signal up *if* there is work-to-do.  There can/will
+// be spurious signals.  Recheck state on waiting thread before
+// assuming the signal was intentional. Caveat caller ;-).
+//
+// USAGE NOTE 2: Broadcast() frees up all waiting threads at once,
+// which leads to contention for the locks they all held when they
+// called Wait().  This results in POOR performance.  A much better
+// approach to getting a lot of threads out of Wait() is to have each
+// thread (upon exiting Wait()) call Signal() to free up another
+// Wait'ing thread.  Look at condition_variable_unittest.cc for
+// both examples.
+//
+// Broadcast() can be used nicely during teardown, as it gets the job
+// done, and leaves no sleeping threads... and performance is less
+// critical at that point.
+//
+// The semantics of Broadcast() are carefully crafted so that *all*
+// threads that were waiting when the request was made will indeed
+// get signaled.  Some implementations mess up, and don't signal them
+// all, while others allow the wait to be effectively turned off (for
+// a while, as waiting threads come around).  This implementation
+// appears correct, as it will not "lose" any signals, and will guarantee
+// that all threads get signaled by Broadcast().
+//
+// This implementation offers support for "performance" in its selection of
+// which thread to revive.  Performance, in direct contrast with "fairness,"
+// assures that the thread that most recently began to Wait() is selected by
+// Signal to revive.  Fairness would (if publicly supported) assure that the
+// thread that has Wait()ed the longest is selected. The default policy
+// may improve performance, as the selected thread may have a greater chance of
+// having some of its stack data in various CPU caches.
+//
+// For a discussion of the many very subtle implementation details, see the FAQ
+// at the end of condition_variable_win.cc.
+
+#ifndef BASE_SYNCHRONIZATION_CONDITION_VARIABLE_H_
+#define BASE_SYNCHRONIZATION_CONDITION_VARIABLE_H_
+
+#include <pthread.h>
+
+#include "kudu/gutil/macros.h"
+
+namespace kudu {
+
+class MonoDelta;
+class MonoTime;
+class Mutex;
+
+// Thin wrapper around a pthread condition variable that is tied to a single
+// Mutex for its whole lifetime. Not copyable.
+class ConditionVariable {
+ public:
+  // Construct a cv for use with ONLY one user lock.
+  explicit ConditionVariable(Mutex* user_lock);
+
+  ~ConditionVariable();
+
+  // Wait() releases the caller's critical section atomically as it starts to
+  // sleep, and then reacquires it when it is signaled.
+  void Wait() const;
+
+  // Like Wait(), but only waits up to a certain point in time.
+  //
+  // Returns true if the wait ended before 'until' (i.e. we were Signal()'ed
+  // or Broadcast() to; spurious wakeups are possible, so always re-check the
+  // condition), or false if we reached 'until'. Returns false without
+  // waiting if 'until' is already in the past.
+  bool WaitUntil(const MonoTime& until) const;
+
+  // Like Wait(), but only waits up to a limited amount of time.
+  //
+  // Returns true if the wait ended before 'delta' elapsed (see WaitUntil()
+  // regarding spurious wakeups), or false if 'delta' elapsed. Returns false
+  // without waiting if 'delta' is negative.
+  bool WaitFor(const MonoDelta& delta) const;
+
+  // Broadcast() revives all waiting threads.
+  void Broadcast();
+  // Signal() revives one waiting thread.
+  void Signal();
+
+ private:
+
+  // Native condition variable. Mutable because the wait methods are const.
+  mutable pthread_cond_t condition_;
+  // Native handle of the user's Mutex, passed to the pthread wait calls.
+  pthread_mutex_t* user_mutex_;
+
+#if !defined(NDEBUG)
+  Mutex* user_lock_;     // Needed to adjust shadow lock state on wait.
+#endif
+
+  DISALLOW_COPY_AND_ASSIGN(ConditionVariable);
+};
+
+}  // namespace kudu
+
+#endif  // BASE_SYNCHRONIZATION_CONDITION_VARIABLE_H_

http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/countdown_latch-test.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/countdown_latch-test.cc b/be/src/kudu/util/countdown_latch-test.cc
new file mode 100644
index 0000000..adb2623
--- /dev/null
+++ b/be/src/kudu/util/countdown_latch-test.cc
@@ -0,0 +1,74 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <boost/bind.hpp> // IWYU pragma: keep
+#include <gtest/gtest.h>
+
+#include "kudu/gutil/gscoped_ptr.h"
+#include "kudu/gutil/ref_counted.h"
+#include "kudu/util/countdown_latch.h"
+#include "kudu/util/monotime.h"
+#include "kudu/util/test_macros.h"
+#include "kudu/util/thread.h"
+#include "kudu/util/threadpool.h"
+
+namespace kudu {
+
+// Counts 'latch' down by 'amount'. An amount of exactly 1 goes through the
+// argument-less CountDown() overload so that both overloads get exercised.
+static void DecrementLatch(CountDownLatch* latch, int amount) {
+  if (amount != 1) {
+    latch->CountDown(amount);
+  } else {
+    latch->CountDown();
+  }
+}
+
+// Verifies that the latch can be decremented one step at a time as well as
+// by an arbitrary amount.
+TEST(TestCountDownLatch, TestLatch) {
+  const int kInitialCount = 1000;
+
+  gscoped_ptr<ThreadPool> pool;
+  ASSERT_OK(ThreadPoolBuilder("cdl-test").set_max_threads(1).Build(&pool));
+
+  CountDownLatch latch(kInitialCount);
+
+  // A single decrement from another thread must not fire the latch.
+  ASSERT_OK(pool->SubmitFunc(boost::bind(DecrementLatch, &latch, 1)));
+  ASSERT_FALSE(latch.WaitFor(MonoDelta::FromMilliseconds(200)));
+  ASSERT_EQ(kInitialCount - 1, latch.count());
+
+  // Decrementing by the full initial count overshoots the remaining count by
+  // one; the latch must still clamp to 0 and fire.
+  ASSERT_OK(pool->SubmitFunc(boost::bind(DecrementLatch, &latch, kInitialCount)));
+  latch.Wait();
+  ASSERT_EQ(0, latch.count());
+}
+
+// Verifies that Reset(0) releases a thread that is blocked in Wait().
+TEST(TestCountDownLatch, TestResetToZero) {
+  CountDownLatch latch(100);
+  scoped_refptr<Thread> waiter;
+  ASSERT_OK(Thread::Create("test", "cdl-test", &CountDownLatch::Wait, &latch, &waiter));
+
+  // Give the waiter thread a moment so that it is likely already blocked on
+  // the latch when the reset happens.
+  SleepFor(MonoDelta::FromMilliseconds(10));
+  latch.Reset(0);
+  waiter->Join();
+}
+
+} // namespace kudu

http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/countdown_latch.h
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/countdown_latch.h b/be/src/kudu/util/countdown_latch.h
new file mode 100644
index 0000000..9a8000d
--- /dev/null
+++ b/be/src/kudu/util/countdown_latch.h
@@ -0,0 +1,137 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#ifndef KUDU_UTIL_COUNTDOWN_LATCH_H
+#define KUDU_UTIL_COUNTDOWN_LATCH_H
+
+#include "kudu/gutil/macros.h"
+#include "kudu/util/condition_variable.h"
+#include "kudu/util/monotime.h"
+#include "kudu/util/mutex.h"
+#include "kudu/util/thread_restrictions.h"
+
+namespace kudu {
+
+// This is a C++ implementation of the Java CountDownLatch
+// class.
+// See http://docs.oracle.com/javase/6/docs/api/java/util/concurrent/CountDownLatch.html
+class CountDownLatch {
+ public:
+  // Initialize the latch with the given initial count.
+  explicit CountDownLatch(int count)
+    : cond_(&lock_),
+      count_(count) {
+  }
+
+  // Decrement the count of this latch by 'amount', which must be non-negative
+  // (checked in debug builds).
+  // If 'amount' is at least the current count, the count becomes zero and all
+  // waiting threads are woken up.
+  // If the count is already zero, this has no effect.
+  void CountDown(int amount) {
+    DCHECK_GE(amount, 0);
+    MutexLock lock(lock_);
+    if (count_ == 0) {
+      return;
+    }
+
+    // Do the arithmetic explicitly in the counter's unsigned type rather than
+    // relying on an implicit signed-to-unsigned conversion in the comparison.
+    const uint64_t delta = static_cast<uint64_t>(amount);
+    if (delta >= count_) {
+      count_ = 0;
+    } else {
+      count_ -= delta;
+    }
+
+    if (count_ == 0) {
+      // Latch has triggered.
+      cond_.Broadcast();
+    }
+  }
+
+  // Decrement the count of this latch.
+  // If the new count is zero, then all waiting threads are woken up.
+  // If the count is already zero, this has no effect.
+  void CountDown() {
+    CountDown(1);
+  }
+
+  // Wait until the count on the latch reaches zero.
+  // If the count is already zero, this returns immediately.
+  void Wait() const {
+    ThreadRestrictions::AssertWaitAllowed();
+    MutexLock lock(lock_);
+    // Loop to guard against spurious wakeups.
+    while (count_ > 0) {
+      cond_.Wait();
+    }
+  }
+
+  // Waits for the count on the latch to reach zero, or until 'when' time is reached.
+  // Returns true if the count became zero, false otherwise.
+  bool WaitUntil(const MonoTime& when) const {
+    ThreadRestrictions::AssertWaitAllowed();
+    MutexLock lock(lock_);
+    while (count_ > 0) {
+      if (!cond_.WaitUntil(when)) {
+        // The deadline was reached. The lock is held again at this point, so
+        // re-check the count: a CountDown()/Reset(0) may have completed
+        // between the deadline expiring and the lock being re-acquired, in
+        // which case the latch did trigger and we must report success.
+        return count_ == 0;
+      }
+    }
+    return true;
+  }
+
+  // Waits for the count on the latch to reach zero, or until 'delta' time elapses.
+  // Returns true if the count became zero, false otherwise.
+  bool WaitFor(const MonoDelta& delta) const {
+    return WaitUntil(MonoTime::Now() + delta);
+  }
+
+  // Reset the latch with the given count. This is equivalent to reconstructing
+  // the latch. If 'count' is 0, and there are currently waiters, those waiters
+  // will be triggered as if you counted down to 0.
+  void Reset(uint64_t count) {
+    MutexLock lock(lock_);
+    count_ = count;
+    if (count_ == 0) {
+      // Awake any waiters if we reset to 0.
+      cond_.Broadcast();
+    }
+  }
+
+  // Returns the current count, read under the latch's lock.
+  uint64_t count() const {
+    MutexLock lock(lock_);
+    return count_;
+  }
+
+ private:
+  DISALLOW_COPY_AND_ASSIGN(CountDownLatch);
+  // 'lock_' must be declared before 'cond_': the condition variable is
+  // constructed with a pointer to it. Mutable so const waiters can lock.
+  mutable Mutex lock_;
+  ConditionVariable cond_;
+
+  // Remaining count; guarded by 'lock_'.
+  uint64_t count_;
+};
+
+// Utility class which calls latch->CountDown() in its destructor.
+class CountDownOnScopeExit {
+ public:
+  // Remembers 'latch' (which is not deleted by this class) so that it can be
+  // counted down once when this object goes out of scope.
+  explicit CountDownOnScopeExit(CountDownLatch *latch)
+    : latch_(latch) {
+  }
+
+  ~CountDownOnScopeExit() { latch_->CountDown(); }
+
+ private:
+  DISALLOW_COPY_AND_ASSIGN(CountDownOnScopeExit);
+
+  CountDownLatch *latch_;
+};
+
+} // namespace kudu
+#endif

http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/cow_object.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/cow_object.cc b/be/src/kudu/util/cow_object.cc
new file mode 100644
index 0000000..a22393c
--- /dev/null
+++ b/be/src/kudu/util/cow_object.cc
@@ -0,0 +1,34 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "kudu/util/cow_object.h"
+
+using std::ostream;
+
+namespace kudu {
+
+// Streams the symbolic name of a LockMode; any value other than READ, WRITE
+// or RELEASED is printed as "UNKNOWN".
+ostream& operator<<(ostream& o, LockMode m) {
+  const char* label = "UNKNOWN";
+  switch (m) {
+    case LockMode::READ:
+      label = "READ";
+      break;
+    case LockMode::WRITE:
+      label = "WRITE";
+      break;
+    case LockMode::RELEASED:
+      label = "RELEASED";
+      break;
+    default:
+      break;
+  }
+  return o << label;
+}
+
+} // namespace kudu