You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by ta...@apache.org on 2018/07/13 06:03:41 UTC

[29/51] [abbrv] impala git commit: IMPALA-7006: Add KRPC folders from kudu@334ecafd

http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/bit-stream-utils.inline.h
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/bit-stream-utils.inline.h b/be/src/kudu/util/bit-stream-utils.inline.h
new file mode 100644
index 0000000..d168bda
--- /dev/null
+++ b/be/src/kudu/util/bit-stream-utils.inline.h
@@ -0,0 +1,211 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#ifndef IMPALA_UTIL_BIT_STREAM_UTILS_INLINE_H
+#define IMPALA_UTIL_BIT_STREAM_UTILS_INLINE_H
+
+#include <algorithm>
+
+#include "glog/logging.h"
+#include "kudu/util/bit-stream-utils.h"
+#include "kudu/util/alignment.h"
+
+namespace kudu {
+
+// Appends the 'num_bits' least-significant bits of 'v' to the bit stream.
+// Bits accumulate in the 64-bit 'buffered_values_' window; once the window is
+// full its 8 bytes are copied into 'buffer_' and the bits of 'v' that did not
+// fit start the next window.
+inline void BitWriter::PutValue(uint64_t v, int num_bits) {
+  DCHECK_GE(num_bits, 0);
+  DCHECK_LE(num_bits, 64);
+  // A zero-width value contributes no bits. Returning early also avoids the
+  // undefined shift by 64 that the mask computation below would perform.
+  if (PREDICT_FALSE(num_bits == 0)) return;
+
+  // Truncate the higher-order bits. This is necessary to
+  // support signed values.
+  v &= ~0ULL >> (64 - num_bits);
+
+  buffered_values_ |= v << bit_offset_;
+  bit_offset_ += num_bits;
+
+  if (PREDICT_FALSE(bit_offset_ >= 64)) {
+    // Flush buffered_values_ and write out bits of v that did not fit
+    buffer_->reserve(KUDU_ALIGN_UP(byte_offset_ + 8, 8));
+    buffer_->resize(byte_offset_ + 8);
+    DCHECK_LE(byte_offset_ + 8, buffer_->capacity());
+    memcpy(buffer_->data() + byte_offset_, &buffered_values_, 8);
+    byte_offset_ += 8;
+    bit_offset_ -= 64;
+    // 'bit_offset_' is now the number of bits of 'v' that spilled over, so
+    // 'num_bits - bit_offset_' bits were already written. That count can be
+    // 64, hence the overflow-safe shift.
+    buffered_values_ = BitUtil::ShiftRightZeroOnOverflow(v, (num_bits - bit_offset_));
+  }
+  DCHECK_LT(bit_offset_, 64);
+}
+
+// Copies the partially filled bit window into 'buffer_', rounded up to whole
+// bytes. When 'align' is true the window is then reset so the next write
+// starts on a byte boundary; otherwise the writer state is left unchanged.
+inline void BitWriter::Flush(bool align) {
+  const int pending_bytes = BitUtil::Ceil(bit_offset_, 8);
+  const auto flushed_end = byte_offset_ + pending_bytes;
+
+  buffer_->reserve(KUDU_ALIGN_UP(flushed_end, 8));
+  buffer_->resize(flushed_end);
+  DCHECK_LE(flushed_end, buffer_->capacity());
+  memcpy(buffer_->data() + byte_offset_, &buffered_values_, pending_bytes);
+
+  if (!align) return;
+
+  byte_offset_ = flushed_end;
+  bit_offset_ = 0;
+  buffered_values_ = 0;
+}
+
+// Flushes pending bits to a byte boundary, grows 'buffer_' by 'num_bytes' and
+// returns a pointer to the start of that newly reserved region. The write
+// position is advanced past the region.
+inline uint8_t* BitWriter::GetNextBytePtr(int num_bytes) {
+  Flush(/* align */ true);
+  const auto region_start = byte_offset_;
+  byte_offset_ += num_bytes;
+  buffer_->reserve(KUDU_ALIGN_UP(byte_offset_, 8));
+  buffer_->resize(byte_offset_);
+  DCHECK_LE(byte_offset_, buffer_->capacity());
+  return buffer_->data() + region_start;
+}
+
+// Writes the first 'num_bytes' bytes of the in-memory representation of 'val'
+// at the next byte-aligned position of the stream.
+template<typename T>
+inline void BitWriter::PutAligned(T val, int num_bytes) {
+  DCHECK_LE(num_bytes, sizeof(T));
+  memcpy(GetNextBytePtr(num_bytes), &val, num_bytes);
+}
+
+// Writes 'v' as a variable-length quantity: 7 payload bits per byte, least
+// significant group first, with the high bit of each byte set when more bytes
+// follow.
+inline void BitWriter::PutVlqInt(int32_t v) {
+  // Encode the unsigned bit pattern. Shifting the signed value right keeps
+  // replicating the sign bit, so a negative 'v' would never reach zero and
+  // the loop would not terminate.
+  uint32_t remaining = static_cast<uint32_t>(v);
+  while ((remaining & 0xFFFFFF80U) != 0) {
+    PutAligned<uint8_t>((remaining & 0x7F) | 0x80, 1);
+    remaining >>= 7;
+  }
+  PutAligned<uint8_t>(remaining & 0x7F, 1);
+}
+
+
+// Creates a reader over 'buffer_len' bytes starting at 'buffer' and preloads
+// up to the first 8 bytes into the 64-bit read window.
+inline BitReader::BitReader(const uint8_t* buffer, int buffer_len)
+  : buffer_(buffer),
+    max_bytes_(buffer_len),
+    buffered_values_(0),
+    byte_offset_(0),
+    bit_offset_(0) {
+  const int preload_bytes = std::min(8, max_bytes_);
+  memcpy(&buffered_values_, buffer_ + byte_offset_, preload_bytes);
+}
+
+// Reloads the 64-bit read window from the current byte offset, copying 8
+// bytes or however many remain before 'max_bytes_', whichever is smaller.
+inline void BitReader::BufferValues() {
+  const int bytes_remaining = max_bytes_ - byte_offset_;
+  const int bytes_to_load = PREDICT_TRUE(bytes_remaining >= 8) ? 8 : bytes_remaining;
+  memcpy(&buffered_values_, buffer_ + byte_offset_, bytes_to_load);
+}
+
+// Reads the next 'num_bits' bits of the stream into '*v'.
+// Returns false, without touching '*v' or the read position, when the read
+// would extend past 'max_bytes_'; returns true otherwise.
+template<typename T>
+inline bool BitReader::GetValue(int num_bits, T* v) {
+  DCHECK_LE(num_bits, 64);
+  DCHECK_LE(num_bits, sizeof(T) * 8);
+
+  // Bounds check on the absolute bit position where this read would end.
+  if (PREDICT_FALSE(byte_offset_ * 8 + bit_offset_ + num_bits > max_bytes_ * 8)) return false;
+
+  // Take whatever part of the value lies in the current 64-bit window.
+  *v = BitUtil::TrailingBits(buffered_values_, bit_offset_ + num_bits) >> bit_offset_;
+
+  bit_offset_ += num_bits;
+  if (bit_offset_ >= 64) {
+    // The window is exhausted: step to the next 8 bytes and reload it.
+    byte_offset_ += 8;
+    bit_offset_ -= 64;
+    BufferValues();
+    // Read bits of v that crossed into new buffered_values_
+    *v |= BitUtil::ShiftLeftZeroOnOverflow(
+        BitUtil::TrailingBits(buffered_values_, bit_offset_),
+        (num_bits - bit_offset_));
+  }
+  DCHECK_LE(bit_offset_, 64);
+  return true;
+}
+
+// Moves the read position back by 'num_bits' bits. If the new position falls
+// before the start of the current 64-bit window, the window is moved back in
+// steps of up to 8 bytes and reloaded from the buffer.
+inline void BitReader::Rewind(int num_bits) {
+  bit_offset_ -= num_bits;
+  if (bit_offset_ >= 0) {
+    // Still inside the current window: buffered_values_ remains valid.
+    return;
+  }
+  while (bit_offset_ < 0) {
+    const int seek_back = std::min(byte_offset_, 8);
+    if (PREDICT_FALSE(seek_back == 0)) {
+      // Asked to rewind past the start of the stream. Without this guard the
+      // loop could never make progress; clamp to the beginning instead.
+      DCHECK(false) << "Rewind past the start of the stream";
+      bit_offset_ = 0;
+      break;
+    }
+    byte_offset_ -= seek_back;
+    bit_offset_ += seek_back * 8;
+  }
+  DCHECK_GE(byte_offset_, 0); // Check for underflow
+  // Reload through BufferValues() so that no more than the bytes remaining
+  // before 'max_bytes_' are read; a fixed 8-byte copy can overrun a short
+  // buffer.
+  BufferValues();
+}
+
+// Moves the read position to the absolute bit offset 'stream_position'.
+// Backward seeks are delegated to Rewind(); forward seeks advance the window
+// 8 bytes at a time and reload it once the target lies inside it.
+// NOTE(review): position() is declared outside this chunk; presumably it
+// returns the current absolute bit position -- confirm.
+inline void BitReader::SeekToBit(uint stream_position) {
+  DCHECK_LE(stream_position, max_bytes_ * 8);
+
+  int delta = static_cast<int>(stream_position) - position();
+  if (delta == 0) {
+    // Already at the requested position.
+    return;
+  } else if (delta < 0) {
+    Rewind(position() - stream_position);
+  } else {
+    bit_offset_ += delta;
+    while (bit_offset_ >= 64) {
+      byte_offset_ +=8;
+      bit_offset_ -= 64;
+      if (bit_offset_ < 64) {
+        // This should only be executed if seeking to
+        // 'stream_position' makes the existing buffered_values_
+        // invalid.
+        BufferValues();
+      }
+    }
+  }
+}
+
+// Skips to the next byte boundary and copies 'num_bytes' bytes from there
+// into '*v'. Returns false, leaving the reader untouched, if that would read
+// past 'max_bytes_'. On success the read window is reloaded at the new
+// position.
+template<typename T>
+inline bool BitReader::GetAligned(int num_bytes, T* v) {
+  DCHECK_LE(num_bytes, sizeof(T));
+  // Bytes of the current window that are already (partially) consumed.
+  const int consumed_bytes = BitUtil::Ceil(bit_offset_, 8);
+  const auto read_start = byte_offset_ + consumed_bytes;
+  if (PREDICT_FALSE(read_start + num_bytes > max_bytes_)) return false;
+
+  memcpy(v, buffer_ + read_start, num_bytes);
+  byte_offset_ = read_start + num_bytes;
+
+  // Restart the bit window right after the bytes just read.
+  bit_offset_ = 0;
+  BufferValues();
+  return true;
+}
+
+// Decodes a variable-length quantity (7 payload bits per byte, least
+// significant group first, high bit set on every byte except the last) into
+// '*v'. Returns false if the stream ends early or if the encoding is longer
+// than MAX_VLQ_BYTE_LEN bytes; '*v' is 0 in that case.
+inline bool BitReader::GetVlqInt(int32_t* v) {
+  *v = 0;
+  // Accumulate as unsigned: shifting into the sign bit of a signed int is
+  // not portable.
+  uint32_t decoded = 0;
+  int shift = 0;
+  int num_bytes = 0;
+  uint8_t byte = 0;
+  do {
+    // Enforce the length limit in release builds as well. A bound that lives
+    // only inside a DCHECK disappears there, letting 'shift' exceed the
+    // width of the result on malformed input.
+    // NOTE(review): assumes MAX_VLQ_BYTE_LEN <= 5 so 'shift' stays below 32.
+    if (PREDICT_FALSE(num_bytes >= MAX_VLQ_BYTE_LEN)) return false;
+    if (!GetAligned<uint8_t>(1, &byte)) return false;
+    ++num_bytes;
+    decoded |= static_cast<uint32_t>(byte & 0x7F) << shift;
+    shift += 7;
+  } while ((byte & 0x80) != 0);
+  *v = static_cast<int32_t>(decoded);
+  return true;
+}
+
+} // namespace kudu
+
+#endif

http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/bit-util-test.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/bit-util-test.cc b/be/src/kudu/util/bit-util-test.cc
new file mode 100644
index 0000000..0d8eab4
--- /dev/null
+++ b/be/src/kudu/util/bit-util-test.cc
@@ -0,0 +1,45 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#include <boost/utility/binary.hpp>
+#include <gtest/gtest.h>
+#include "kudu/util/bit-util.h"
+
+namespace kudu {
+
+// Checks BitUtil::TrailingBits for zero-width, partial, full-width and
+// over-wide requests.
+TEST(BitUtil, TrailingBits) {
+  // Unsigned constants keep every comparison unsigned-vs-unsigned and avoid
+  // shifting a signed 1 into the sign bit.
+  const uint64_t kLowByte = BOOST_BINARY(1 1 1 1 1 1 1 1);
+  const uint64_t kTopBit = 1ULL << 63;
+
+  EXPECT_EQ(BitUtil::TrailingBits(kLowByte, 0), 0);
+  EXPECT_EQ(BitUtil::TrailingBits(kLowByte, 1), 1);
+  EXPECT_EQ(BitUtil::TrailingBits(kLowByte, 64), kLowByte);
+  // Widths above 64 return the value unchanged.
+  EXPECT_EQ(BitUtil::TrailingBits(kLowByte, 100), kLowByte);
+  EXPECT_EQ(BitUtil::TrailingBits(0, 1), 0);
+  EXPECT_EQ(BitUtil::TrailingBits(0, 64), 0);
+  EXPECT_EQ(BitUtil::TrailingBits(kTopBit, 0), 0);
+  EXPECT_EQ(BitUtil::TrailingBits(kTopBit, 63), 0);
+  EXPECT_EQ(BitUtil::TrailingBits(kTopBit, 64), kTopBit);
+}
+
+// Checks that the overflow-safe shifts yield 0 for a shift of 64 and behave
+// like ordinary shifts for a shift of 32.
+TEST(BitUtil, ShiftBits) {
+  const uint64_t kAllOnes = 0xFFFFFFFFFFFFFFFFULL;
+
+  // Left shifts.
+  EXPECT_EQ(BitUtil::ShiftLeftZeroOnOverflow(1ULL, 64), 0ULL);
+  EXPECT_EQ(BitUtil::ShiftLeftZeroOnOverflow(kAllOnes, 32), 0xFFFFFFFF00000000ULL);
+
+  // Right shifts.
+  EXPECT_EQ(BitUtil::ShiftRightZeroOnOverflow(1ULL, 64), 0ULL);
+  EXPECT_EQ(BitUtil::ShiftRightZeroOnOverflow(kAllOnes, 32), 0x00000000FFFFFFFFULL);
+}
+
+} // namespace kudu

http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/bit-util.h
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/bit-util.h b/be/src/kudu/util/bit-util.h
new file mode 100644
index 0000000..5f36887
--- /dev/null
+++ b/be/src/kudu/util/bit-util.h
@@ -0,0 +1,57 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#ifndef IMPALA_BIT_UTIL_H
+#define IMPALA_BIT_UTIL_H
+
+#include <stdint.h>
+#include "kudu/gutil/port.h"
+
+namespace kudu {
+
+// Utility class to do standard bit tricks
+// TODO: is this in boost or something else like that?
+class BitUtil {
+ public:
+  // Returns value / divisor, plus one when the division leaves a remainder.
+  static inline int Ceil(int value, int divisor) {
+    const int quotient = value / divisor;
+    return (value % divisor != 0) ? quotient + 1 : quotient;
+  }
+
+  // Returns the 'num_bits' least-significant bits of 'v'. A width of 0 yields
+  // 0 and a width of 64 or more yields 'v' unchanged.
+  static inline uint64_t TrailingBits(uint64_t v, int num_bits) {
+    if (PREDICT_FALSE(num_bits == 0)) return 0;
+    if (PREDICT_FALSE(num_bits >= 64)) return v;
+    const uint64_t low_mask = ~0ULL >> (64 - num_bits);
+    return v & low_mask;
+  }
+
+  // Returns 'v << num_bits', or 0 when 'num_bits' is 64 or more.
+  static inline uint64_t ShiftLeftZeroOnOverflow(uint64_t v, int num_bits) {
+    return PREDICT_FALSE(num_bits >= 64) ? 0 : (v << num_bits);
+  }
+
+  // Returns 'v >> num_bits', or 0 when 'num_bits' is 64 or more.
+  static inline uint64_t ShiftRightZeroOnOverflow(uint64_t v, int num_bits) {
+    return PREDICT_FALSE(num_bits >= 64) ? 0 : (v >> num_bits);
+  }
+};
+
+} // namespace kudu
+
+#endif

http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/bitmap-test.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/bitmap-test.cc b/be/src/kudu/util/bitmap-test.cc
new file mode 100644
index 0000000..089ed3b
--- /dev/null
+++ b/be/src/kudu/util/bitmap-test.cc
@@ -0,0 +1,230 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <cstdint>
+#include <cstring>
+#include <vector>
+
+#include <gtest/gtest.h>
+
+#include "kudu/gutil/strings/join.h"
+#include "kudu/util/bitmap.h"
+
+namespace kudu {
+
+// Appends the index of every set bit among the first 'bits' bits of 'bm' to
+// 'result', in iteration order, and returns how many were appended.
+static int ReadBackBitmap(uint8_t *bm, size_t bits,
+                           std::vector<size_t> *result) {
+  int iters = 0;
+  TrueBitIterator iter(bm, bits);
+  while (!iter.done()) {
+    result->push_back(*iter);
+    ++iters;
+    ++iter;
+  }
+  return iters;
+}
+
+// Sets a handful of bits in a 64-bit bitmap and checks both its string
+// rendering and the positions reported by TrueBitIterator.
+TEST(TestBitMap, TestIteration) {
+  uint8_t bm[8];
+  memset(bm, 0, sizeof(bm));
+  const size_t kSetBits[] = {0, 8, 31, 32, 33, 63};
+  for (size_t bit : kSetBits) {
+    BitmapSet(bm, bit);
+  }
+  const size_t kNumBits = sizeof(bm) * 8;
+
+  EXPECT_EQ("   0: 10000000 10000000 00000000 00000001 11000000 00000000 00000000 00000001 \n",
+            BitmapToString(bm, kNumBits));
+
+  std::vector<size_t> read_back;
+  const int iters = ReadBackBitmap(bm, kNumBits, &read_back);
+  ASSERT_EQ(6, iters);
+  ASSERT_EQ("0,8,31,32,33,63", JoinElements(read_back, ","));
+}
+
+
+// Iterates over a bitmap whose bit count (3) is not a multiple of 8, with a
+// single bit set.
+TEST(TestBitMap, TestIteration2) {
+  uint8_t bm[1];
+  memset(bm, 0, sizeof(bm));
+  BitmapSet(bm, 1);
+
+  std::vector<size_t> positions;
+  const int iters = ReadBackBitmap(bm, 3, &positions);
+
+  ASSERT_EQ(1, iters);
+  ASSERT_EQ("1", JoinElements(positions, ","));
+}
+
+// Exercises the single-bit primitives (BitmapSet, BitmapClear, BitmapChange,
+// BitmapTest) on a one-byte bitmap.
+TEST(TestBitmap, TestSetAndTestBits) {
+  uint8_t bm[1];
+  memset(bm, 0, sizeof(bm));
+
+  // Each bit in turn: set/clear and change(true)/change(false) must be
+  // visible through BitmapTest and leave the bit cleared afterwards.
+  size_t num_bits = sizeof(bm) * 8;
+  for (size_t i = 0; i < num_bits; i++) {
+    ASSERT_FALSE(BitmapTest(bm, i));
+
+    BitmapSet(bm, i);
+    ASSERT_TRUE(BitmapTest(bm, i));
+
+    BitmapClear(bm, i);
+    ASSERT_FALSE(BitmapTest(bm, i));
+
+    BitmapChange(bm, i, true);
+    ASSERT_TRUE(BitmapTest(bm, i));
+
+    BitmapChange(bm, i, false);
+    ASSERT_FALSE(BitmapTest(bm, i));
+  }
+
+  // Set every odd-indexed bit: 01010101 (listed from bit 0 upward)
+  for (size_t i = 0; i < num_bits; ++i) {
+    ASSERT_FALSE(BitmapTest(bm, i));
+    if (i & 1) BitmapSet(bm, i);
+  }
+
+  // Check the odd-indexed bits are set, then clear them: 00000000
+  for (size_t i = 0; i < num_bits; ++i) {
+    ASSERT_EQ(!!(i & 1), BitmapTest(bm, i));
+    if (i & 1) BitmapClear(bm, i);
+  }
+
+  // Check that all bits are zero and use BitmapChange to set the odd ones
+  for (size_t i = 0; i < num_bits; ++i) {
+    ASSERT_FALSE(BitmapTest(bm, i));
+    BitmapChange(bm, i, i & 1);
+  }
+
+  // Check the odd bits are set, then invert every bit with BitmapChange
+  for (size_t i = 0; i < num_bits; ++i) {
+    ASSERT_EQ(!!(i & 1), BitmapTest(bm, i));
+    BitmapChange(bm, i, !(i & 1));
+  }
+
+  // Check the final state: only the even-indexed bits are set
+  for (size_t i = 0; i < num_bits; ++i) {
+    ASSERT_EQ(!(i & 1), BitmapTest(bm, i));
+  }
+}
+
+// Exercises BitmapChangeBits together with BitMapIsAllSet / BitmapIsAllZero
+// over every (offset, num_bits) range of a 128-bit bitmap, for both bit
+// values.
+TEST(TestBitMap, TestBulkSetAndTestBits) {
+  // Left uninitialized on purpose: every iteration first overwrites the
+  // whole bitmap through BitmapChangeBits.
+  uint8_t bm[16];
+  size_t total_size = sizeof(bm) * 8;
+
+  // Test Bulk change bits and test bits
+  for (int i = 0; i < 4; ++i) {
+    // Alternates false, true, false, true across the four passes.
+    bool value = i & 1;
+    size_t num_bits = total_size;
+    while (num_bits > 0) {
+      for (size_t offset = 0; offset < num_bits; ++offset) {
+        // Fill the whole bitmap with !value, then set [offset, num_bits) to
+        // value.
+        BitmapChangeBits(bm, 0, total_size, !value);
+        BitmapChangeBits(bm, offset, num_bits - offset, value);
+
+        // The range just written must read back uniformly as 'value'.
+        ASSERT_EQ(value, BitMapIsAllSet(bm, offset, num_bits));
+        ASSERT_EQ(!value, BitmapIsAllZero(bm, offset, num_bits));
+
+        // Bits before the written range must still hold !value.
+        if (offset > 1) {
+          ASSERT_EQ(value, BitmapIsAllZero(bm, 0, offset - 1));
+          ASSERT_EQ(!value, BitMapIsAllSet(bm, 0, offset - 1));
+        }
+
+        // Bits from num_bits up to total_size must still hold !value.
+        if ((offset + num_bits) < total_size) {
+          ASSERT_EQ(value, BitmapIsAllZero(bm, num_bits, total_size));
+          ASSERT_EQ(!value, BitMapIsAllSet(bm, num_bits, total_size));
+        }
+      }
+      num_bits--;
+    }
+  }
+}
+
+// Exercises BitmapFindFirstSet / BitmapFindFirstZero for every
+// (offset, num_bits) pair, first on an all-zero bitmap and then on a bitmap
+// in which every fourth bit is zero.
+TEST(TestBitMap, TestFindBit) {
+  uint8_t bm[16];
+
+  size_t num_bits = sizeof(bm) * 8;
+  BitmapChangeBits(bm, 0, num_bits, false);
+  while (num_bits > 0) {
+    for (size_t offset = 0; offset < num_bits; ++offset) {
+      // All zero: no set bit exists, and the first zero is at 'offset'.
+      size_t idx;
+      ASSERT_FALSE(BitmapFindFirstSet(bm, offset, num_bits, &idx));
+      ASSERT_TRUE(BitmapFindFirstZero(bm, offset, num_bits, &idx));
+      ASSERT_EQ(idx, offset);
+    }
+    num_bits--;
+  }
+
+  // Bit i is set iff (i & 3) != 0, i.e. only multiples of four stay zero.
+  // The index is size_t so it compares against 'num_bits' without a
+  // signed/unsigned mismatch.
+  num_bits = sizeof(bm) * 8;
+  for (size_t i = 0; i < num_bits; ++i) {
+    BitmapChange(bm, i, i & 3);
+  }
+
+  for (; num_bits > 0; num_bits--) {
+    for (size_t offset = 0; offset < num_bits; ++offset) {
+      size_t idx;
+
+      // Find a set bit: 'offset' itself unless it is a multiple of four, in
+      // which case the next bit.
+      bool res = BitmapFindFirstSet(bm, offset, num_bits, &idx);
+      size_t expected_set_idx = (offset + !(offset & 3));
+      bool expect_set_found = (expected_set_idx < num_bits);
+      ASSERT_EQ(expect_set_found, res);
+      if (expect_set_found) {
+        ASSERT_EQ(expected_set_idx, idx);
+      }
+
+      // Find a zero bit: the next multiple of four at or after 'offset'.
+      res = BitmapFindFirstZero(bm, offset, num_bits, &idx);
+      size_t expected_zero_idx = offset + ((offset & 3) ? (4 - (offset & 3)) : 0);
+      bool expect_zero_found = (expected_zero_idx < num_bits);
+      ASSERT_EQ(expect_zero_found, res);
+      if (expect_zero_found) {
+        ASSERT_EQ(expected_zero_idx, idx);
+      }
+    }
+  }
+}
+
+// Walks a 64-bit bitmap with BitmapIterator and checks the length and value
+// of every run of equal bits.
+TEST(TestBitMap, TestBitmapIteration) {
+  uint8_t bm[8];
+  memset(bm, 0, sizeof(bm));
+  const size_t kSetBits[] = {0, 8, 31, 32, 33, 63};
+  for (size_t bit : kSetBits) {
+    BitmapSet(bm, bit);
+  }
+
+  BitmapIterator biter(bm, sizeof(bm) * 8);
+
+  // Run lengths alternate set/clear starting with a set run; the trailing 0
+  // is what Next() returns once the bitmap is exhausted.
+  const size_t expected_sizes[] = {1, 7, 1, 22, 3, 29, 1, 0};
+  bool value = false;
+  bool expected_value = true;
+  size_t run = 0;
+  size_t size = biter.Next(&value);
+  while (size > 0) {
+    ASSERT_LT(run, 8);
+    ASSERT_EQ(expected_value, value);
+    ASSERT_EQ(expected_sizes[run], size);
+    expected_value = !expected_value;
+    ++run;
+    size = biter.Next(&value);
+  }
+  ASSERT_EQ(expected_sizes[run], size);
+}
+
+} // namespace kudu

http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/bitmap.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/bitmap.cc b/be/src/kudu/util/bitmap.cc
new file mode 100644
index 0000000..eed7880
--- /dev/null
+++ b/be/src/kudu/util/bitmap.cc
@@ -0,0 +1,136 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "kudu/util/bitmap.h"
+
+#include <cstring>
+#include <string>
+
+#include <glog/logging.h>
+
+#include "kudu/gutil/stringprintf.h"
+
+namespace kudu {
+
+// Sets the 'num_bits' bits starting at bit 'offset' of 'bitmap' to 'value'.
+// 'num_bits' must be greater than zero (checked in debug builds). The range
+// is handled as a partial first byte, a memset over whole middle bytes, and a
+// partial last byte.
+void BitmapChangeBits(uint8_t *bitmap, size_t offset, size_t num_bits, bool value) {
+  DCHECK_GT(num_bits, 0);
+
+  // Indices of the bytes holding the first and the last bit of the range.
+  size_t start_byte = (offset >> 3);
+  size_t end_byte = (offset + num_bits - 1) >> 3;
+  int single_byte = (start_byte == end_byte);
+
+  // Change the last bits of the first byte
+  // 'left' is the first affected bit within the byte, 'right' is one past the
+  // last affected bit; the mask selects bits [left, right).
+  size_t left = offset & 0x7;
+  size_t right = (single_byte) ? (left + num_bits) : 8;
+  uint8_t mask = ((0xff << left) & (0xff >> (8 - right)));
+  if (value) {
+    bitmap[start_byte++] |= mask;
+  } else {
+    bitmap[start_byte++] &= ~mask;
+  }
+
+  // Nothing left... I'm done
+  if (single_byte) {
+    return;
+  }
+
+  // change the middle bits
+  // 'start_byte' was advanced above, so this covers only whole bytes strictly
+  // between the first and the last byte.
+  if (end_byte > start_byte) {
+    const uint8_t pattern8[2] = { 0x00, 0xff };
+    memset(bitmap + start_byte, pattern8[value], end_byte - start_byte);
+  }
+
+  // change the first bits of the last byte
+  // 'right' becomes the number of affected bits in the last byte (1..8).
+  right = offset + num_bits - (end_byte << 3);
+  mask = (0xff >> (8 - right));
+  if (value) {
+    bitmap[end_byte] |= mask;
+  } else {
+    bitmap[end_byte] &= ~mask;
+  }
+}
+
+// Searches bits [offset, bitmap_size) of 'bitmap' for the first bit equal to
+// 'value'. On success stores its index in '*idx' and returns true; otherwise
+// returns false and leaves '*idx' untouched.
+bool BitmapFindFirst(const uint8_t *bitmap, size_t offset, size_t bitmap_size,
+                     bool value, size_t *idx) {
+  // Indexed by 'value': the word/byte contents that hold no bit equal to
+  // 'value' and can therefore be skipped wholesale.
+  const uint64_t pattern64[2] = { 0xffffffffffffffff, 0x0000000000000000 };
+  const uint8_t pattern8[2] = { 0xff, 0x00 };
+  size_t bit;
+
+  DCHECK_LE(offset, bitmap_size);
+
+  // Jump to the byte at specified offset
+  const uint8_t *p = bitmap + (offset >> 3);
+  size_t num_bits = bitmap_size - offset;
+
+  // Find a 'value' bit at the end of the first byte
+  if ((bit = offset & 0x7)) {
+    for (; bit < 8 && num_bits > 0; ++bit) {
+      if (BitmapTest(p, bit) == value) {
+        *idx = ((p - bitmap) << 3) + bit;
+        return true;
+      }
+
+      num_bits--;
+    }
+
+    p++;
+  }
+
+  // check 64bit at the time for a 'value' bit
+  // The word is loaded with memcpy: 'p' has no alignment guarantee, so
+  // reading it through a uint64_t pointer would be a misaligned,
+  // aliasing-violating access.
+  while (num_bits >= 64) {
+    uint64_t word;
+    memcpy(&word, p, sizeof(word));
+    if (word != pattern64[value]) break;
+    num_bits -= 64;
+    p += sizeof(word);
+  }
+
+  // check 8bit at the time for a 'value' bit
+  while (num_bits >= 8 && *p == pattern8[value]) {
+    num_bits -= 8;
+    p++;
+  }
+
+  // Find a 'value' bit at the beginning of the last byte
+  for (bit = 0; num_bits > 0; ++bit) {
+    if (BitmapTest(p, bit) == value) {
+      *idx = ((p - bitmap) << 3) + bit;
+      return true;
+    }
+    num_bits--;
+  }
+
+  return false;
+}
+
+// Renders the first 'num_bits' bits of 'bitmap' as text. Each output line
+// starts with the index of its first bit and holds up to 8 space-terminated
+// groups of up to 8 bits, printed from the lowest bit index upward.
+std::string BitmapToString(const uint8_t *bitmap, size_t num_bits) {
+  std::string out;
+  for (size_t index = 0; index < num_bits; ) {
+    StringAppendF(&out, "%4zu: ", index);
+    for (int group = 0; group < 8 && index < num_bits; ++group) {
+      for (int bit = 0; bit < 8 && index < num_bits; ++bit, ++index) {
+        StringAppendF(&out, "%d", BitmapTest(bitmap, index));
+      }
+      StringAppendF(&out, " ");
+    }
+    StringAppendF(&out, "\n");
+  }
+  return out;
+}
+
+} // namespace kudu

http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/bitmap.h
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/bitmap.h b/be/src/kudu/util/bitmap.h
new file mode 100644
index 0000000..d9f5260
--- /dev/null
+++ b/be/src/kudu/util/bitmap.h
@@ -0,0 +1,219 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+// Utility functions for dealing with a byte array as if it were a bitmap.
+#ifndef KUDU_UTIL_BITMAP_H
+#define KUDU_UTIL_BITMAP_H
+
+#include <cstddef>
+#include <cstdint>
+#include <ostream>
+#include <string>
+
+#include <glog/logging.h>
+
+#include "kudu/gutil/bits.h"
+#include "kudu/gutil/port.h"
+
+namespace kudu {
+
+// Returns how many bytes are needed to hold 'num_bits' bits (rounded up).
+inline size_t BitmapSize(size_t num_bits) {
+  const size_t kBitsPerByte = 8;
+  return (num_bits + (kBitsPerByte - 1)) / kBitsPerByte;
+}
+
+// Sets bit 'idx' of 'bitmap' to 1. Bit 0 is the least-significant bit of
+// byte 0.
+inline void BitmapSet(uint8_t *bitmap, size_t idx) {
+  const size_t byte_idx = idx >> 3;
+  const int bit_mask = 1 << (idx & 7);
+  bitmap[byte_idx] |= bit_mask;
+}
+
+// Sets bit 'idx' of 'bitmap' to 'value', leaving all other bits untouched.
+inline void BitmapChange(uint8_t *bitmap, size_t idx, bool value) {
+  uint8_t &cell = bitmap[idx >> 3];
+  const int bit_mask = 1 << (idx & 7);
+  cell = value ? (cell | bit_mask) : (cell & ~bit_mask);
+}
+
+// Resets bit 'idx' of 'bitmap' to 0.
+inline void BitmapClear(uint8_t *bitmap, size_t idx) {
+  const size_t byte_idx = idx >> 3;
+  const int bit_mask = 1 << (idx & 7);
+  bitmap[byte_idx] &= ~bit_mask;
+}
+
+// Returns true if bit 'idx' of 'bitmap' is set.
+inline bool BitmapTest(const uint8_t *bitmap, size_t idx) {
+  const uint8_t cell = bitmap[idx >> 3];
+  return ((cell >> (idx & 7)) & 1) != 0;
+}
+
+// ORs 'src' into 'dst' byte by byte. Both bitmaps should have at least
+// n_bits valid bits; every byte that holds one of those bits is merged in
+// full.
+inline void BitmapMergeOr(uint8_t *dst, const uint8_t *src, size_t n_bits) {
+  const size_t byte_count = BitmapSize(n_bits);
+  for (size_t pos = 0; pos != byte_count; ++pos) {
+    dst[pos] |= src[pos];
+  }
+}
+
+// Set bits from offset to (offset + num_bits) to the specified value
+void BitmapChangeBits(uint8_t *bitmap, size_t offset, size_t num_bits, bool value);
+
+// Find the first bit of the specified value, starting from the specified offset.
+bool BitmapFindFirst(const uint8_t *bitmap, size_t offset, size_t bitmap_size,
+                     bool value, size_t *idx);
+
+// Find the first set bit in the bitmap, at the specified offset.
+// Forwards to BitmapFindFirst with value = true and returns its result; the
+// arguments 'offset', 'bitmap_size' and 'idx' are passed through unchanged.
+inline bool BitmapFindFirstSet(const uint8_t *bitmap, size_t offset,
+                               size_t bitmap_size, size_t *idx) {
+  return BitmapFindFirst(bitmap, offset, bitmap_size, true, idx);
+}
+
+// Find the first zero bit in the bitmap, at the specified offset.
+// Forwards to BitmapFindFirst with value = false and returns its result; the
+// arguments 'offset', 'bitmap_size' and 'idx' are passed through unchanged.
+inline bool BitmapFindFirstZero(const uint8_t *bitmap, size_t offset,
+                                size_t bitmap_size, size_t *idx) {
+  return BitmapFindFirst(bitmap, offset, bitmap_size, false, idx);
+}
+
+// Returns true if the bitmap contains only ones, i.e. if
+// BitmapFindFirstZero finds no zero bit for the given 'offset' and
+// 'bitmap_size'. Debug builds require offset < bitmap_size.
+inline bool BitMapIsAllSet(const uint8_t *bitmap, size_t offset, size_t bitmap_size) {
+  DCHECK_LT(offset, bitmap_size);
+  size_t idx;  // Required by the search API; the found index is not used.
+  return !BitmapFindFirstZero(bitmap, offset, bitmap_size, &idx);
+}
+
+// Returns true if the bitmap contains only zeros, i.e. if
+// BitmapFindFirstSet finds no set bit for the given 'offset' and
+// 'bitmap_size'. Debug builds require offset < bitmap_size.
+inline bool BitmapIsAllZero(const uint8_t *bitmap, size_t offset, size_t bitmap_size) {
+  DCHECK_LT(offset, bitmap_size);
+  size_t idx;  // Required by the search API; the found index is not used.
+  return !BitmapFindFirstSet(bitmap, offset, bitmap_size, &idx);
+}
+
+std::string BitmapToString(const uint8_t *bitmap, size_t num_bits);
+
+// Iterator which yields ranges of set and unset bits.
+// Example usage:
+//   bool value;
+//   size_t size;
+//   BitmapIterator iter(bitmap, n_bits);
+//   while ((size = iter.Next(&value))) {
+//      printf("bitmap block len=%zu value=%d\n", size, value);
+//   }
+class BitmapIterator {
+ public:
+  // Iterates over the first 'num_bits' bits of 'map', starting at bit 0.
+  BitmapIterator(const uint8_t *map, size_t num_bits)
+    : offset_(0), num_bits_(num_bits), map_(map)
+  {}
+
+  // True once every bit has been consumed.
+  bool done() const {
+    return offset_ == num_bits_;
+  }
+
+  // Repositions the iterator so that the next run starts at 'bit'.
+  void SeekTo(size_t bit) {
+    DCHECK_LE(bit, num_bits_);
+    offset_ = bit;
+  }
+
+  // Stores the value of the bit at the current position in '*value', skips
+  // the whole run of bits sharing that value and returns the run's length.
+  // Returns 0, without touching '*value', when no bits are left.
+  size_t Next(bool *value) {
+    if (PREDICT_FALSE(done())) {
+      return 0;
+    }
+
+    *value = BitmapTest(map_, offset_);
+
+    // The run ends at the first bit of the opposite value, or at the end of
+    // the bitmap when there is none.
+    size_t flip_index;
+    const bool flipped =
+        BitmapFindFirst(map_, offset_, num_bits_, !(*value), &flip_index);
+    const size_t run_end = flipped ? flip_index : num_bits_;
+
+    const size_t run_length = run_end - offset_;
+    offset_ = run_end;
+    return run_length;
+  }
+
+ private:
+  size_t offset_;
+  size_t num_bits_;
+  const uint8_t *map_;
+};
+
+// Iterator which yields the set bits in a bitmap.
+// Example usage:
+//   for (TrueBitIterator iter(bitmap, n_bits);
+//        !iter.done();
+//        ++iter) {
+//     size_t next_onebit_position = *iter;
+//   }
+class TrueBitIterator {
+ public:
+  // Positions the iterator on the first set bit of 'bitmap', which holds
+  // 'n_bits' bits, or marks it done if there is none.
+  TrueBitIterator(const uint8_t *bitmap, size_t n_bits)
+    : bitmap_(bitmap),
+      cur_byte_(0),
+      cur_byte_idx_(0),
+      n_bits_(n_bits),
+      n_bytes_(BitmapSize(n_bits_)),
+      bit_idx_(0) {
+    if (n_bits_ == 0) {
+      cur_byte_idx_ = 1; // sets done
+    } else {
+      cur_byte_ = bitmap[0];
+      AdvanceToNextOneBit();
+    }
+  }
+
+  // Moves to the next set bit. Must not be called once done().
+  TrueBitIterator &operator ++() {
+    DCHECK(!done());
+    DCHECK(cur_byte_ & 1);
+    // Drop the bit just reported (always shifted down to position 0).
+    cur_byte_ &= (~1);
+    AdvanceToNextOneBit();
+    return *this;
+  }
+
+  // True once the iterator has moved past the last byte of the bitmap.
+  bool done() const {
+    return cur_byte_idx_ >= n_bytes_;
+  }
+
+  // Index of the set bit the iterator currently points at.
+  size_t operator *() const {
+    DCHECK(!done());
+    return bit_idx_;
+  }
+
+ private:
+  // Skips zero bytes, then shifts 'cur_byte_' so that its lowest set bit
+  // sits at position 0, updating 'bit_idx_' to that bit's index.
+  void AdvanceToNextOneBit() {
+    while (cur_byte_ == 0) {
+      cur_byte_idx_++;
+      if (cur_byte_idx_ >= n_bytes_) return;
+      cur_byte_ = bitmap_[cur_byte_idx_];
+      bit_idx_ = cur_byte_idx_ * 8;
+    }
+    // Cast so the byte is logged as a number, not as a raw character.
+    DVLOG(2) << "Found next nonzero byte at " << cur_byte_idx_
+             << " val=" << static_cast<int>(cur_byte_);
+
+    DCHECK_NE(cur_byte_, 0);
+    int set_bit = Bits::FindLSBSetNonZero(cur_byte_);
+    bit_idx_ += set_bit;
+    cur_byte_ >>= set_bit;
+  }
+
+  const uint8_t *bitmap_;
+  uint8_t cur_byte_;
+  // A size_t index: an 8-bit one wraps around at 256, so a bitmap larger
+  // than 255 bytes would never reach done().
+  size_t cur_byte_idx_;
+
+  const size_t n_bits_;
+  const size_t n_bytes_;
+  size_t bit_idx_;
+};
+
+} // namespace kudu
+
+#endif

http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/blocking_queue-test.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/blocking_queue-test.cc b/be/src/kudu/util/blocking_queue-test.cc
new file mode 100644
index 0000000..a2271ff
--- /dev/null
+++ b/be/src/kudu/util/blocking_queue-test.cc
@@ -0,0 +1,249 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <cstddef>
+#include <cstdint>
+#include <map>
+#include <string>
+#include <thread>
+#include <vector>
+
+#include <gtest/gtest.h>
+
+#include "kudu/gutil/gscoped_ptr.h"
+#include "kudu/util/countdown_latch.h"
+#include "kudu/util/blocking_queue.h"
+#include "kudu/util/monotime.h"
+#include "kudu/util/mutex.h"
+#include "kudu/util/status.h"
+#include "kudu/util/test_macros.h"
+
+using std::string;
+using std::thread;
+using std::vector;
+
+namespace kudu {
+
+BlockingQueue<int32_t> test1_queue(5);  // shared by InsertSomeThings() and Test1
+
+void InsertSomeThings() {  // producer body; Test1 runs it on a separate thread
+  ASSERT_EQ(test1_queue.Put(1), QUEUE_SUCCESS);
+  ASSERT_EQ(test1_queue.Put(2), QUEUE_SUCCESS);
+  ASSERT_EQ(test1_queue.Put(3), QUEUE_SUCCESS);
+}
+
+TEST(BlockingQueueTest, Test1) {  // values put by another thread arrive in FIFO order
+  thread inserter_thread(InsertSomeThings);
+  int32_t i;
+  ASSERT_TRUE(test1_queue.BlockingGet(&i));  // waits if the inserter has not put yet
+  ASSERT_EQ(1, i);
+  ASSERT_TRUE(test1_queue.BlockingGet(&i));
+  ASSERT_EQ(2, i);
+  ASSERT_TRUE(test1_queue.BlockingGet(&i));
+  ASSERT_EQ(3, i);
+  inserter_thread.join();
+}
+
+TEST(BlockingQueueTest, TestBlockingDrainTo) {
+  BlockingQueue<int32_t> test_queue(3);
+  ASSERT_EQ(test_queue.Put(1), QUEUE_SUCCESS);
+  ASSERT_EQ(test_queue.Put(2), QUEUE_SUCCESS);
+  ASSERT_EQ(test_queue.Put(3), QUEUE_SUCCESS);
+  vector<int32_t> out;
+  ASSERT_OK(test_queue.BlockingDrainTo(&out, MonoTime::Now() + MonoDelta::FromSeconds(30)));
+  ASSERT_EQ(1, out[0]);  // drained elements keep their insertion order
+  ASSERT_EQ(2, out[1]);
+  ASSERT_EQ(3, out[2]);
+
+  // The queue is now empty: with a deadline in the past the drain must time out.
+  Status s = test_queue.BlockingDrainTo(&out, MonoTime::Now() - MonoDelta::FromSeconds(1));
+  ASSERT_TRUE(s.IsTimedOut());
+
+  // Once the empty queue is shut down, a drain reports Aborted, even with a past deadline.
+  test_queue.Shutdown();
+  s = test_queue.BlockingDrainTo(&out, MonoTime::Now() - MonoDelta::FromSeconds(1));
+  ASSERT_TRUE(s.IsAborted());
+}
+
+// Test that, when the queue is shut down with elements still pending,
+// BlockingGet and BlockingDrainTo keep succeeding until the elements are all gone.
+TEST(BlockingQueueTest, TestGetAndDrainAfterShutdown) {
+  // Put some elements into the queue and then shut it down.
+  BlockingQueue<int32_t> q(3);
+  ASSERT_EQ(q.Put(1), QUEUE_SUCCESS);
+  ASSERT_EQ(q.Put(2), QUEUE_SUCCESS);
+
+  q.Shutdown();
+
+  // BlockingGet() should still return the oldest pending element.
+  int i;
+  ASSERT_TRUE(q.BlockingGet(&i));
+  ASSERT_EQ(1, i);
+
+  // Drain should still return OK, since it yielded the remaining element.
+  vector<int32_t> out;
+  ASSERT_OK(q.BlockingDrainTo(&out));
+  ASSERT_EQ(2, out[0]);
+
+  // Now that it's empty, draining returns Aborted and BlockingGet returns false.
+  Status s = q.BlockingDrainTo(&out);
+  ASSERT_TRUE(s.IsAborted()) << s.ToString();
+  ASSERT_FALSE(q.BlockingGet(&i));
+}
+
+TEST(BlockingQueueTest, TestTooManyInsertions) {
+  BlockingQueue<int32_t> test_queue(2);  // room for two elements
+  ASSERT_EQ(test_queue.Put(123), QUEUE_SUCCESS);
+  ASSERT_EQ(test_queue.Put(123), QUEUE_SUCCESS);
+  ASSERT_EQ(test_queue.Put(123), QUEUE_FULL);  // the non-blocking Put refuses the third
+}
+
+namespace {
+// Sizing policy used by TestLogicalSize: a string's logical size is the
+// number of chars it holds.
+struct LengthLogicalSize {
+  static size_t logical_size(const string& s) {
+    return s.size();
+  }
+};
+} // anonymous namespace
+
+TEST(BlockingQueueTest, TestLogicalSize) {
+  BlockingQueue<string, LengthLogicalSize> test_queue(4);  // capacity counted in chars
+  ASSERT_EQ(test_queue.Put("a"), QUEUE_SUCCESS);    // logical size is now 1
+  ASSERT_EQ(test_queue.Put("bcd"), QUEUE_SUCCESS);  // 1 + 3 = 4, which reaches the maximum
+  ASSERT_EQ(test_queue.Put("e"), QUEUE_FULL);
+}
+
+TEST(BlockingQueueTest, TestNonPointerParamsMayBeNonEmptyOnDestruct) {
+  BlockingQueue<int32_t> test_queue(1);
+  ASSERT_EQ(test_queue.Put(123), QUEUE_SUCCESS);
+  // The element type is not a pointer, so destroying a non-empty queue trips no DCHECK.
+}
+
+#ifndef NDEBUG
+TEST(BlockingQueueDeathTest, TestPointerParamsMustBeEmptyOnDestruct) {
+  ::testing::FLAGS_gtest_death_test_style = "threadsafe";
+  ASSERT_DEATH({
+      BlockingQueue<int32_t*> test_queue(1);
+      int32_t element = 123;
+      ASSERT_EQ(test_queue.Put(&element), QUEUE_SUCCESS);
+      // Leaving this scope destroys a non-empty queue of pointers, which trips the DCHECK.
+    },
+    "BlockingQueue holds bare pointers");
+}
+#endif // NDEBUG
+
+TEST(BlockingQueueTest, TestGetFromShutdownQueue) {
+  BlockingQueue<int64_t> test_queue(2);
+  ASSERT_EQ(test_queue.Put(123), QUEUE_SUCCESS);
+  test_queue.Shutdown();
+  ASSERT_EQ(test_queue.Put(456), QUEUE_SHUTDOWN);  // no new elements after Shutdown()
+  int64_t i;
+  ASSERT_TRUE(test_queue.BlockingGet(&i));  // the element put before Shutdown() is still served
+  ASSERT_EQ(123, i);
+  ASSERT_FALSE(test_queue.BlockingGet(&i));  // empty and shut down
+}
+
+TEST(BlockingQueueTest, TestGscopedPtrMethods) {
+  BlockingQueue<int*> test_queue(2);
+  gscoped_ptr<int> input_int(new int(123));
+  ASSERT_EQ(test_queue.Put(&input_int), QUEUE_SUCCESS);  // on success input_int is released
+  gscoped_ptr<int> output_int;
+  ASSERT_TRUE(test_queue.BlockingGet(&output_int));  // output_int now holds the pointer
+  ASSERT_EQ(123, *output_int.get());
+  test_queue.Shutdown();
+}
+
+class MultiThreadTest {  // several inserter and remover threads sharing one queue
+ public:
+  MultiThreadTest()
+   :  puts_(4),
+      blocking_puts_(4),
+      nthreads_(5),
+      queue_(nthreads_ * puts_),
+      num_inserters_(nthreads_),
+      sync_latch_(nthreads_) {
+  }
+
+  void InserterThread(int arg) {
+    for (int i = 0; i < puts_; i++) {
+      ASSERT_EQ(queue_.Put(arg), QUEUE_SUCCESS);  // capacity covers every non-blocking Put
+    }
+    sync_latch_.CountDown();
+    sync_latch_.Wait();  // proceed only once all inserters finished their Put() phase
+    for (int i = 0; i < blocking_puts_; i++) {
+      ASSERT_TRUE(queue_.BlockingPut(arg));
+    }
+    MutexLock guard(lock_);
+    if (--num_inserters_ == 0) {  // the last inserter to finish shuts the queue down
+      queue_.Shutdown();
+    }
+  }
+
+  void RemoverThread() {
+    for (int i = 0; i < puts_ + blocking_puts_; i++) {
+      int32_t arg = 0;
+      bool got = queue_.BlockingGet(&arg);
+      if (!got) {
+        arg = -1;  // failed gets are tallied under key -1
+      }
+      MutexLock guard(lock_);
+      gotten_[arg] = gotten_[arg] + 1;
+    }
+  }
+
+  void Run() {
+    for (int i = 0; i < nthreads_; i++) {
+      threads_.emplace_back(&MultiThreadTest::InserterThread, this, i);
+      threads_.emplace_back(&MultiThreadTest::RemoverThread, this);
+    }
+    // We add an extra thread to ensure that there aren't enough elements in
+    // the queue to go around.  This way, we test removal after Shutdown.
+    threads_.emplace_back(&MultiThreadTest::RemoverThread, this);
+    for (auto& thread : threads_) {
+      thread.join();
+    }
+    // Let's check to make sure we got what we should have.
+    MutexLock guard(lock_);
+    for (int i = 0; i < nthreads_; i++) {
+      ASSERT_EQ(puts_ + blocking_puts_, gotten_[i]);
+    }
+    // There were (nthreads_ + 1) * (puts_ + blocking_puts_) removal
+    // attempts, but only nthreads_ * (puts_ + blocking_puts_) elements
+    // were added.  So one thread's worth of attempts hit the shutdown
+    // case and were recorded under key -1.
+    ASSERT_EQ(puts_ + blocking_puts_, gotten_[-1]);
+  }
+
+  int puts_;           // non-blocking Put() calls per inserter
+  int blocking_puts_;  // BlockingPut() calls per inserter
+  int nthreads_;       // number of inserter threads
+  BlockingQueue<int32_t> queue_;
+  Mutex lock_;         // taken around gotten_ and num_inserters_ updates
+  std::map<int32_t, int> gotten_;  // value -> times removed; -1 counts failed gets
+  vector<thread> threads_;
+  int num_inserters_;  // inserters that have not finished yet
+  CountDownLatch sync_latch_;
+};
+
+TEST(BlockingQueueTest, TestMultipleThreads) {
+  MultiThreadTest test;
+  test.Run();  // spawns the threads, joins them and checks the tallies
+}
+
+}  // namespace kudu

http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/blocking_queue.h
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/blocking_queue.h b/be/src/kudu/util/blocking_queue.h
new file mode 100644
index 0000000..7331c12
--- /dev/null
+++ b/be/src/kudu/util/blocking_queue.h
@@ -0,0 +1,256 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#ifndef KUDU_UTIL_BLOCKING_QUEUE_H
+#define KUDU_UTIL_BLOCKING_QUEUE_H
+
+#include <list>
+#include <string>
+#include <type_traits>
+#include <unistd.h>
+#include <vector>
+
+#include "kudu/gutil/basictypes.h"
+#include "kudu/gutil/gscoped_ptr.h"
+#include "kudu/util/condition_variable.h"
+#include "kudu/util/monotime.h"
+#include "kudu/util/mutex.h"
+#include "kudu/util/status.h"
+
+namespace kudu {
+
+// Return values for BlockingQueue::Put(). Success is 0, i.e. false if converted to bool.
+enum QueueStatus {
+  QUEUE_SUCCESS = 0,   // the element was enqueued
+  QUEUE_SHUTDOWN = 1,  // rejected: Shutdown() had already been called
+  QUEUE_FULL = 2       // rejected: the queue had reached its maximum size
+};
+
+// Default sizing policy for BlockingQueue: every element has a logical
+// size of 1, so the queue's size is simply its element count.
+struct DefaultLogicalSize {
+  // The element itself is never inspected.
+  template<typename Elem>
+  static size_t logical_size(const Elem&) { return 1; }
+};
+
+template <typename T, class LOGICAL_SIZE = DefaultLogicalSize>
+class BlockingQueue {
+ public:
+  // If T is a pointer, this will be the pointee type, which is what the
+  // gscoped_ptr overloads below wrap.  If T is not a pointer, you can ignore
+  // this and the functions which make use of it.
+  typedef typename std::remove_pointer<T>::type T_VAL;
+
+  explicit BlockingQueue(size_t max_size)
+    : shutdown_(false),
+      size_(0),
+      max_size_(max_size),
+      not_empty_(&lock_),
+      not_full_(&lock_) {
+  }
+
+  // If the queue holds a bare pointer, it must be empty on destruction, since
+  // it may have ownership of the pointer.
+  ~BlockingQueue() {
+    DCHECK(list_.empty() || !std::is_pointer<T>::value)
+        << "BlockingQueue holds bare pointers at destruction time";
+  }
+
+  // Get an element from the queue, blocking while it is empty.  Returns false
+  // if the queue is empty and has been shut down.
+  bool BlockingGet(T *out) {
+    MutexLock l(lock_);
+    while (true) {
+      if (!list_.empty()) {
+        *out = list_.front();
+        list_.pop_front();
+        decrement_size_unlocked(*out);
+        not_full_.Signal();
+        return true;
+      }
+      if (shutdown_) {
+        return false;
+      }
+      not_empty_.Wait();
+    }
+  }
+
+  // Same as the BlockingGet() overload above, except that on success the
+  // dequeued pointer is handed to 'out'.  On failure 'out' is left untouched.
+  bool BlockingGet(gscoped_ptr<T_VAL> *out) {
+    T t = nullptr;
+    bool got_element = BlockingGet(&t);
+    if (!got_element) {
+      return false;
+    }
+    out->reset(t);
+    return true;
+  }
+
+  // Get all elements from the queue and append them to a vector, blocking
+  // while the queue is empty.
+  // If 'deadline' passes and no elements have been returned from the
+  // queue, returns Status::TimedOut(). If 'deadline' is uninitialized,
+  // no deadline is used.
+  //
+  // If the queue has been shut down, but there are still elements waiting,
+  // then it returns those elements as if the queue were not yet shut down.
+  //
+  // Returns:
+  // - OK if successful
+  // - TimedOut if the deadline passed
+  // - Aborted if the queue is empty and has been shut down
+  Status BlockingDrainTo(std::vector<T>* out, MonoTime deadline = MonoTime()) {
+    MutexLock l(lock_);
+    while (true) {
+      if (!list_.empty()) {
+        out->reserve(out->size() + list_.size());  // 'out' may already hold elements
+        for (const T& elt : list_) {
+          out->push_back(elt);
+          decrement_size_unlocked(elt);
+        }
+        list_.clear();
+        not_full_.Broadcast();  // many slots were freed at once: wake every blocked producer
+        return Status::OK();
+      }
+      if (PREDICT_FALSE(shutdown_)) {
+        return Status::Aborted("");
+      }
+      if (!deadline.Initialized()) {
+        not_empty_.Wait();
+      } else if (PREDICT_FALSE(!not_empty_.WaitUntil(deadline))) {
+        return Status::TimedOut("");
+      }
+    }
+  }
+
+  // Attempts to put the given value in the queue without blocking.
+  // Returns:
+  //   QUEUE_SUCCESS: if successfully inserted
+  //   QUEUE_FULL: if the queue has reached max_size (checked first, so a full
+  //     queue reports this even after Shutdown())
+  //   QUEUE_SHUTDOWN: if someone has already called Shutdown()
+  QueueStatus Put(const T &val) {
+    MutexLock l(lock_);
+    if (size_ >= max_size_) {
+      return QUEUE_FULL;
+    }
+    if (shutdown_) {
+      return QUEUE_SHUTDOWN;
+    }
+    list_.push_back(val);
+    increment_size_unlocked(val);
+    l.Unlock();
+    not_empty_.Signal();
+    return QUEUE_SUCCESS;
+  }
+
+  // Returns the same as the other Put() overload above.
+  // If the element was inserted, the gscoped_ptr releases its contents.
+  QueueStatus Put(gscoped_ptr<T_VAL> *val) {
+    QueueStatus s = Put(val->get());
+    if (s == QUEUE_SUCCESS) {
+      ignore_result(val->release());
+    }
+    return s;
+  }
+
+  // Puts an element into the queue; if the queue is full, blocks until
+  // space becomes available. Returns false if we were shutdown prior
+  // to enqueueing the element.
+  bool BlockingPut(const T& val) {
+    MutexLock l(lock_);
+    while (true) {
+      if (shutdown_) {
+        return false;
+      }
+      if (size_ < max_size_) {
+        list_.push_back(val);
+        increment_size_unlocked(val);
+        l.Unlock();
+        not_empty_.Signal();
+        return true;
+      }
+      not_full_.Wait();
+    }
+  }
+
+  // Same as other BlockingPut() overload above. If the element was
+  // enqueued, gscoped_ptr releases its contents.
+  bool BlockingPut(gscoped_ptr<T_VAL>* val) {
+    bool ret = BlockingPut(val->get());  // not Put(): its QUEUE_SUCCESS is 0, i.e. false
+    if (ret) {
+      ignore_result(val->release());
+    }
+    return ret;
+  }
+
+  // Shut down the queue and wake every thread blocked on it.
+  // When a blocking queue is shut down, no more elements can be added to it,
+  // and Put() will return QUEUE_SHUTDOWN.
+  // Existing elements will drain out of it, and then BlockingGet will start
+  // returning false.
+  void Shutdown() {
+    MutexLock l(lock_);
+    shutdown_ = true;
+    not_full_.Broadcast();
+    not_empty_.Broadcast();
+  }
+
+  bool empty() const {
+    MutexLock l(lock_);
+    return list_.empty();
+  }
+
+  size_t max_size() const {
+    return max_size_;
+  }
+  // One line per queued element; only compiles if 't->ToString()' is valid for T.
+  std::string ToString() const {
+    std::string ret;
+
+    MutexLock l(lock_);
+    for (const T& t : list_) {
+      ret.append(t->ToString());
+      ret.append("\n");
+    }
+    return ret;
+  }
+
+ private:
+
+  // Increments queue size. Must be called when 'lock_' is held.
+  void increment_size_unlocked(const T& t) {
+    size_ += LOGICAL_SIZE::logical_size(t);
+  }
+
+  // Decrements queue size. Must be called when 'lock_' is held.
+  void decrement_size_unlocked(const T& t) {
+    size_ -= LOGICAL_SIZE::logical_size(t);
+  }
+
+  bool shutdown_;    // set by Shutdown()
+  size_t size_;      // sum of the logical sizes of the queued elements
+  size_t max_size_;  // size_ at or above this means "full"
+  mutable Mutex lock_;
+  ConditionVariable not_empty_;
+  ConditionVariable not_full_;
+  std::list<T> list_;
+};
+
+} // namespace kudu
+
+#endif

http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/bloom_filter-test.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/bloom_filter-test.cc b/be/src/kudu/util/bloom_filter-test.cc
new file mode 100644
index 0000000..788ec36
--- /dev/null
+++ b/be/src/kudu/util/bloom_filter-test.cc
@@ -0,0 +1,92 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <cstdint>
+#include <cstdlib>
+#include <ostream>
+
+#include <glog/logging.h>
+#include <gtest/gtest.h>
+
+#include "kudu/util/bloom_filter.h"
+#include "kudu/util/slice.h"
+
+namespace kudu {
+
+static const int kRandomSeed = 0xdeadbeef;  // NOTE(review): exceeds INT_MAX for 32-bit int
+
+static void AddRandomKeys(int random_seed, int n_keys, BloomFilterBuilder *bf) {
+  srandom(random_seed);  // makes the key sequence reproducible for this seed
+  for (int left = n_keys; left > 0; --left) {
+    const uint64_t key = random();
+    const Slice key_bytes(reinterpret_cast<const uint8_t *>(&key), sizeof(key));
+    const BloomKeyProbe probe(key_bytes);
+    bf->AddKey(probe);
+  }
+}
+
+static void CheckRandomKeys(int random_seed, int n_keys, const BloomFilter &bf) {
+  srandom(random_seed);  // replay the key sequence generated for this seed
+  for (int left = n_keys; left > 0; --left) {
+    const uint64_t key = random();
+    const Slice key_bytes(reinterpret_cast<const uint8_t *>(&key), sizeof(key));
+    const BloomKeyProbe probe(key_bytes);
+    ASSERT_TRUE(bf.MayContainKey(probe));  // every replayed key must be reported
+  }
+}
+
+TEST(TestBloomFilter, TestInsertAndProbe) {
+  int n_keys = 2000;
+  BloomFilterBuilder bfb(
+    BloomFilterSizing::ByCountAndFPRate(n_keys, 0.01));
+
+  // Check that the desired false positive rate is achieved.
+  double expected_fp_rate = bfb.false_positive_rate();
+  ASSERT_NEAR(expected_fp_rate, 0.01, 0.002);
+
+  // 1% FP rate should need about 9 bits per key
+  ASSERT_EQ(9, bfb.n_bits() / n_keys);
+
+  // Enter n_keys random keys into the bloom filter
+  AddRandomKeys(kRandomSeed, n_keys, &bfb);
+
+  // Verify that the keys we inserted all return true when queried.
+  BloomFilter bf(bfb.slice(), bfb.n_hashes());
+  CheckRandomKeys(kRandomSeed, n_keys, bf);
+
+  // Query a bunch of other keys (the next values of the random() sequence),
+  // and verify the false positive rate is within reasonable bounds.
+  uint32_t num_queries = 100000;
+  uint32_t num_positives = 0;
+  for (uint32_t i = 0; i < num_queries; i++) {  // same type as num_queries
+    uint64_t key = random();
+    Slice key_slice(reinterpret_cast<const uint8_t *>(&key), sizeof(key));
+    BloomKeyProbe probe(key_slice);
+    if (bf.MayContainKey(probe)) {
+      num_positives++;
+    }
+  }
+
+  double fp_rate = static_cast<double>(num_positives) / static_cast<double>(num_queries);
+  LOG(INFO) << "FP rate: " << fp_rate << " (" << num_positives << "/" << num_queries << ")";
+  LOG(INFO) << "Expected FP rate: " << expected_fp_rate;
+
+  // Actual FP rate should be within 20% of the estimated FP rate
+  ASSERT_NEAR(fp_rate, expected_fp_rate, 0.20*expected_fp_rate);
+}
+
+} // namespace kudu

http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/bloom_filter.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/bloom_filter.cc b/be/src/kudu/util/bloom_filter.cc
new file mode 100644
index 0000000..b1a2055
--- /dev/null
+++ b/be/src/kudu/util/bloom_filter.cc
@@ -0,0 +1,89 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "kudu/util/bloom_filter.h"
+
+#include <cmath>
+#include <cstring>
+#include <ostream>
+
+#include <glog/logging.h>
+
+namespace kudu {
+
+static const double kNaturalLog2 = 0.69314;  // approximation of ln(2); never modified
+// Hash count for 'n_bits' bits and 'elems' keys: ln(2) * n_bits / elems truncated, at least 1.
+static int ComputeOptimalHashCount(size_t n_bits, size_t elems) {
+  int n_hashes = n_bits * kNaturalLog2 / elems;
+  if (n_hashes < 1) n_hashes = 1;
+  return n_hashes;
+}
+
+BloomFilterSizing BloomFilterSizing::ByCountAndFPRate(
+  size_t expected_count, double fp_rate) {
+  CHECK_GT(fp_rate, 0);  // fp_rate must lie strictly between 0 and 1
+  CHECK_LT(fp_rate, 1);
+  // Bits needed: -expected_count * ln(fp_rate) / ln(2)^2, rounded up to whole bytes below.
+  double n_bits = -static_cast<double>(expected_count) * log(fp_rate)
+    / kNaturalLog2 / kNaturalLog2;
+  int n_bytes = static_cast<int>(ceil(n_bits / 8));  // NOTE(review): int limits the size
+  CHECK_GT(n_bytes, 0)  // fails e.g. when expected_count is 0
+    << "expected_count: " << expected_count
+    << " fp_rate: " << fp_rate;
+  return BloomFilterSizing(n_bytes, expected_count);
+}
+
+BloomFilterSizing BloomFilterSizing::BySizeAndFPRate(size_t n_bytes, double fp_rate) {
+  size_t n_bits = n_bytes * 8;
+  double expected_elems = -static_cast<double>(n_bits) * kNaturalLog2 * kNaturalLog2 /
+    log(fp_rate);  // the ByCountAndFPRate() formula solved for the element count
+  DCHECK_GT(expected_elems, 1);  // NOTE(review): fp_rate itself is not range-checked here
+  return BloomFilterSizing(n_bytes, (size_t)ceil(expected_elems));
+}
+
+
+BloomFilterBuilder::BloomFilterBuilder(const BloomFilterSizing &sizing)
+  : n_bits_(sizing.n_bytes() * 8),
+    bitmap_(new uint8_t[sizing.n_bytes()]),  // contents are zeroed by Clear() below
+    n_hashes_(ComputeOptimalHashCount(n_bits_, sizing.expected_count())),
+    expected_count_(sizing.expected_count()),
+    n_inserted_(0) {
+  Clear();
+}
+
+void BloomFilterBuilder::Clear() {
+  n_inserted_ = 0;  // forget how many keys were added
+  memset(&bitmap_[0], 0, n_bytes());  // zero every byte of the bitmap
+}
+
+double BloomFilterBuilder::false_positive_rate() const {
+  CHECK_NE(expected_count_, 0)
+    << "expected_count_ not initialized: can't call this function on "
+    << "a BloomFilter initialized from external data";
+  // Estimate (1 - e^(-k*n/m))^k with k = n_hashes_, n = expected_count_, m = n_bits_.
+  return pow(1 - exp(-static_cast<double>(n_hashes_) * expected_count_ / n_bits_), n_hashes_);
+}
+
+BloomFilter::BloomFilter(const Slice &data, size_t n_hashes)
+  : n_bits_(data.size() * 8),  // every byte of 'data' contributes 8 bits
+    bitmap_(reinterpret_cast<const uint8_t *>(data.data())),  // pointer only, no copy
+    n_hashes_(n_hashes)
+{}
+
+
+
+} // namespace kudu

http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/bloom_filter.h
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/bloom_filter.h b/be/src/kudu/util/bloom_filter.h
new file mode 100644
index 0000000..ad4e3eb
--- /dev/null
+++ b/be/src/kudu/util/bloom_filter.h
@@ -0,0 +1,254 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#ifndef KUDU_UTIL_BLOOM_FILTER_H
+#define KUDU_UTIL_BLOOM_FILTER_H
+
+#include <cstddef>
+#include <cstdint>
+
+#include "kudu/gutil/gscoped_ptr.h"
+#include "kudu/gutil/hash/city.h"
+#include "kudu/gutil/macros.h"
+#include "kudu/gutil/port.h"
+#include "kudu/util/bitmap.h"
+#include "kudu/util/slice.h"
+
+namespace kudu {
+
+// The hash values of a single key, computed once and cached.
+//
+// Probing a Bloom filter needs these hashes; keeping them in a probe object
+// means that a key which has to be checked against many Bloom filters is
+// only hashed a single time.
+//
+// This follows the double-hashing idea from the following paper:
+//   "Less Hashing, Same Performance: Building a Better Bloom Filter"
+//   Kirsch and Mitzenmacher, ESA 2006
+//   https://www.eecs.harvard.edu/~michaelm/postscripts/tr-02-05.pdf
+//
+// Both hashes come from one 64-bit City Hash of the key.
+// TODO: an SSE CRC32 hash is probably ~20% faster. Come back to this at some point.
+class BloomKeyProbe {
+ public:
+  // Default constructor: the resulting object is only meant to be
+  // overwritten later by assignment from another instance.
+  BloomKeyProbe() {}
+
+  // Builds the probe for 'key'.
+  //
+  // NOTE: the probe keeps a Slice of the key, so the memory it refers to
+  // has to stay valid for as long as the probe is in use.
+  explicit BloomKeyProbe(const Slice &key) : key_(key) {
+    const char *bytes = reinterpret_cast<const char *>(key.data());
+    const uint64_t city = util_hash::CityHash64(bytes, key.size());
+
+    // Split the 64-bit hash in two: the low 32 bits are the starting
+    // hash, and the high 32 bits are the increment that MixHash()
+    // applies to derive each further hash.
+    h_2_ = static_cast<uint32_t>(city >> 32);
+    h_1_ = static_cast<uint32_t>(city);
+  }
+
+  const Slice &key() const { return key_; }
+
+  // First hash of the sequence; feed it to MixHash() to get the next one.
+  uint32_t initial_hash() const {
+    return h_1_;
+  }
+
+  // Returns the hash that follows 'h' in the sequence, i.e. 'h' plus the
+  // second cached hash value. Unsigned wrap-around is expected here, hence
+  // the sanitizer attribute.
+  ATTRIBUTE_NO_SANITIZE_INTEGER
+  uint32_t MixHash(uint32_t h) const {
+    return h_2_ + h;
+  }
+
+ private:
+  Slice key_;
+
+  // Low and high halves of the key's 64-bit hash.
+  uint32_t h_1_;
+  uint32_t h_2_;
+};
+
+// Sizing parameters for the constructor to BloomFilterBuilder: a size in
+// bytes plus the expected number of elements. This is simply to provide a
+// nicer API than a bunch of overloaded constructors.
+class BloomFilterSizing {
+ public:
+  // Size the bloom filter by a fixed size and false positive rate.
+  //
+  // Picks the number of entries to achieve the above.
+  static BloomFilterSizing BySizeAndFPRate(size_t n_bytes, double fp_rate);
+
+  // Size the bloom filter by an expected count and false positive rate.
+  //
+  // Picks the number of bytes to achieve the above.
+  static BloomFilterSizing ByCountAndFPRate(size_t expected_count, double fp_rate);
+
+  size_t n_bytes() const { return n_bytes_; }
+  size_t expected_count() const { return expected_count_; }
+
+ private:
+  BloomFilterSizing(size_t n_bytes, size_t expected_count) :
+    n_bytes_(n_bytes),
+    expected_count_(expected_count)
+  {}
+
+  size_t n_bytes_;
+  size_t expected_count_;
+};
+
+
+// Builder for a BloomFilter structure: owns the bitmap that keys are added to.
+class BloomFilterBuilder {
+ public:
+  // Create a bloom filter.
+  // See BloomFilterSizing static methods to specify this argument.
+  explicit BloomFilterBuilder(const BloomFilterSizing &sizing);
+
+  // Clear all entries, reset insertion count.
+  void Clear();
+
+  // Add the given key to the bloom filter.
+  void AddKey(const BloomKeyProbe &probe);
+
+  // Return an estimate of the false positive rate.
+  double false_positive_rate() const;
+
+  int n_bytes() const {
+    return n_bits_ / 8;  // n_bits_ is set from a byte count times 8 in the constructor
+  }
+
+  int n_bits() const {
+    return n_bits_;
+  }
+
+  // Return a slice view into this Bloom Filter, suitable for
+  // writing out to a file. The slice points at the builder's own bitmap.
+  const Slice slice() const {
+    return Slice(&bitmap_[0], n_bytes());
+  }
+
+  // Return the number of hashes that are calculated for each entry
+  // in the bloom filter.
+  size_t n_hashes() const { return n_hashes_; }
+
+  size_t expected_count() const { return expected_count_; }
+
+  // Return the number of keys inserted.
+  size_t count() const { return n_inserted_; }
+
+ private:
+  DISALLOW_COPY_AND_ASSIGN(BloomFilterBuilder);
+
+  size_t n_bits_;
+  gscoped_array<uint8_t> bitmap_;
+
+  // The number of hash functions to compute.
+  size_t n_hashes_;
+
+  // The expected number of elements, for which the bloom is optimized.
+  size_t expected_count_;
+
+  // The number of elements inserted so far since the last Clear().
+  size_t n_inserted_;
+};
+
+
+// Wrapper around a byte array for reading it as a bloom filter.
+class BloomFilter {
+ public:
+  BloomFilter() : n_bits_(0), bitmap_(nullptr), n_hashes_(0) {}
+  BloomFilter(const Slice &data, size_t n_hashes);
+  // A default-constructed filter has zero hashes, so it reports true for every key.
+  // Return true if the filter may contain the given key.
+  bool MayContainKey(const BloomKeyProbe &probe) const;
+
+ private:
+  friend class BloomFilterBuilder;
+  static uint32_t PickBit(uint32_t hash, size_t n_bits);
+
+  size_t n_bits_;
+  const uint8_t *bitmap_;
+  // Number of bits probed per key.
+  size_t n_hashes_;
+};
+
+
+////////////////////////////////////////////////////////////
+// Inline implementations
+////////////////////////////////////////////////////////////
+
+inline uint32_t BloomFilter::PickBit(uint32_t hash, size_t n_bits) {
+  // Fast path for the default bloom filter block size: it is a power of
+  // two, so a bit mask gives the same result as the division below.
+  const size_t kDefaultBlockBits = 4096 * 8;
+  if (n_bits == kDefaultBlockBits) {
+    return hash & (kDefaultBlockBits - 1);
+  }
+
+  // Any other size: reduce the hash modulo the number of bits.
+  return hash % n_bits;
+}
+
+inline void BloomFilterBuilder::AddKey(const BloomKeyProbe &probe) {
+  // Set one bit per hash function; each further hash comes from MixHash().
+  uint32_t hash = probe.initial_hash();
+  for (size_t left = n_hashes_; left > 0; --left) {
+    BitmapSet(&bitmap_[0], BloomFilter::PickBit(hash, n_bits_));
+    hash = probe.MixHash(hash);
+  }
+  ++n_inserted_;
+}
+
+inline bool BloomFilter::MayContainKey(const BloomKeyProbe &probe) const {
+  uint32_t h = probe.initial_hash();
+  // Probes n_hashes_ bit positions; returns false as soon as one of them is clear.
+  // Basic unrolling by 2s gives a small benefit here since the two bit positions
+  // can be calculated in parallel -- it's a 50% chance that the first will be
+  // set even if it's a bloom miss, in which case we can parallelize the load.
+  int rem_hashes = n_hashes_;
+  while (rem_hashes >= 2) {
+    uint32_t bitpos1 = PickBit(h, n_bits_);
+    h = probe.MixHash(h);
+    uint32_t bitpos2 = PickBit(h, n_bits_);
+    h = probe.MixHash(h);
+
+    if (!BitmapTest(&bitmap_[0], bitpos1) ||
+        !BitmapTest(&bitmap_[0], bitpos2)) {
+      return false;
+    }
+
+    rem_hashes -= 2;
+  }
+  // At most one hash is left over when n_hashes_ is odd.
+  while (rem_hashes) {
+    uint32_t bitpos = PickBit(h, n_bits_);
+    if (!BitmapTest(&bitmap_[0], bitpos)) {
+      return false;
+    }
+    h = probe.MixHash(h);
+    rem_hashes--;
+  }
+  return true;  // every probed bit was set
+}
+
+} // namespace kudu
+
+#endif

http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/boost_mutex_utils.h
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/boost_mutex_utils.h b/be/src/kudu/util/boost_mutex_utils.h
new file mode 100644
index 0000000..6f6390b
--- /dev/null
+++ b/be/src/kudu/util/boost_mutex_utils.h
@@ -0,0 +1,45 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#ifndef KUDU_BOOST_MUTEX_UTILS_H
+#define KUDU_BOOST_MUTEX_UTILS_H
+
+
+// Similar to std::lock_guard except that it takes a lock pointer, and
+// tolerates nullptr: with a null pointer it does nothing, otherwise it
+// holds the lock for its own lifetime. Not copyable, because a copy would
+// call unlock() a second time on the same lock.
+template<class LockType>
+class lock_guard_maybe {
+ public:
+  explicit lock_guard_maybe(LockType *l) : lock_(l) {
+    if (lock_ != nullptr) {
+      lock_->lock();
+    }
+  }
+
+  ~lock_guard_maybe() {
+    if (lock_ != nullptr) lock_->unlock();
+  }
+
+  lock_guard_maybe(const lock_guard_maybe&) = delete;
+  lock_guard_maybe& operator=(const lock_guard_maybe&) = delete;
+
+ private:
+  LockType *lock_;
+};
+
+#endif

http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/cache-bench.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/cache-bench.cc b/be/src/kudu/util/cache-bench.cc
new file mode 100644
index 0000000..1e705be
--- /dev/null
+++ b/be/src/kudu/util/cache-bench.cc
@@ -0,0 +1,191 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <string.h>
+
+#include <atomic>
+#include <cstdint>
+#include <memory>
+#include <ostream>
+#include <string>
+#include <thread>
+#include <utility>
+#include <vector>
+
+#include <gflags/gflags.h>
+#include <glog/logging.h>
+#include <gtest/gtest.h>
+
+#include "kudu/gutil/bits.h"
+#include "kudu/gutil/macros.h"
+#include "kudu/gutil/stringprintf.h"
+#include "kudu/gutil/strings/human_readable.h"
+#include "kudu/util/cache.h"
+#include "kudu/util/monotime.h"
+#include "kudu/util/random.h"
+#include "kudu/util/random_util.h"
+#include "kudu/util/slice.h"
+#include "kudu/util/test_util.h"
+
+DEFINE_int32(num_threads, 16, "The number of threads to access the cache concurrently.");
+DEFINE_int32(run_seconds, 1, "The number of seconds to run the benchmark");
+
+using std::atomic;
+using std::pair;
+using std::string;
+using std::thread;
+using std::unique_ptr;
+using std::vector;
+
+namespace kudu {
+
+// Capacity passed to NewLRUCache() in CacheBench::SetUp(): 1 GiB (2^30).
+static constexpr int kCacheCapacity = 1024 * 1024 * 1024;
+// Value length and charge of every entry the benchmark inserts: 4 KiB.
+static constexpr int kEntrySize = 4 * 1024;
+
+// Parameterization of one benchmark run: access pattern and relative dataset size.
+struct BenchSetup {
+  enum class Pattern {
+    // Zipfian distribution -- a small number of items make up the
+    // vast majority of lookups.
+    ZIPFIAN,
+    // Every item is equally likely to be looked up.
+    UNIFORM
+  };
+  Pattern pattern;
+
+  // Size of the dataset divided by the size of the cache. A value smaller than
+  // 1 ensures that the whole dataset fits in the cache.
+  double dataset_cache_ratio;
+
+  // Returns a log label, e.g. "ZIPFIAN ratio=1.00x n_unique=262144".
+  string ToString() const {
+    string ret;
+    switch (pattern) {
+      case Pattern::ZIPFIAN: ret += "ZIPFIAN"; break;
+      case Pattern::UNIFORM: ret += "UNIFORM"; break;
+    }
+    // max_key() is unsigned, so it is formatted with %u rather than %d.
+    ret += StringPrintf(" ratio=%.2fx n_unique=%u", dataset_cache_ratio, max_key());
+    return ret;
+  }
+
+  // Returns the key bound used for lookups: the number of kEntrySize entries in the dataset.
+  uint32_t max_key() const {
+    return static_cast<int64_t>(kCacheCapacity * dataset_cache_ratio) / kEntrySize;
+  }
+};
+
+// Fixture that owns the cache being benchmarked and drives concurrent lookups against it.
+class CacheBench : public KuduTest,
+                   public testing::WithParamInterface<BenchSetup>{
+ public:
+  // Replaces 'cache_' with a new DRAM cache of kCacheCapacity.
+  void SetUp() override {
+    KuduTest::SetUp();
+    cache_.reset(NewLRUCache(DRAM_CACHE, kCacheCapacity, "test-cache"));
+  }
+
+  // Issues lookups until '*done' becomes true, inserting a kEntrySize entry
+  // on every miss. Returns {hits, lookups} performed by the calling thread.
+  pair<int64_t, int64_t> DoQueries(const atomic<bool>* done) {
+    const BenchSetup& params = GetParam();
+    Random rng(GetRandomSeed32());
+    int64_t num_hits = 0;
+    int64_t num_lookups = 0;
+    for (; !*done; num_lookups++) {
+      uint32_t key;
+      if (params.pattern != BenchSetup::Pattern::ZIPFIAN) {
+        key = rng.Uniform(params.max_key());
+      } else {
+        key = rng.Skewed(Bits::Log2Floor(params.max_key()));
+      }
+      // The cache key is the raw bytes of the integer key.
+      char key_bytes[sizeof(key)];
+      memcpy(key_bytes, &key, sizeof(key_bytes));
+      Slice cache_key(key_bytes, sizeof(key_bytes));
+      Cache::Handle* handle = cache_->Lookup(cache_key, Cache::EXPECT_IN_CACHE);
+      if (handle != nullptr) {
+        num_hits++;
+      } else {
+        // Miss: allocate and insert an entry for the key (no eviction callback).
+        handle = cache_->Insert(
+            cache_->Allocate(cache_key, /* val_len=*/kEntrySize, /* charge=*/kEntrySize),
+            nullptr);
+      }
+      cache_->Release(handle);
+    }
+    return pair<int64_t, int64_t>(num_hits, num_lookups);
+  }
+
+  // Runs DoQueries() on 'n_threads' threads for 'n_seconds' seconds.
+  // Returns {hits, lookups} summed over all of the threads.
+  pair<int64_t, int64_t> RunQueryThreads(int n_threads, int n_seconds) {
+    atomic<bool> done(false);
+    atomic<int64_t> total_hits(0);
+    atomic<int64_t> total_lookups(0);
+    vector<thread> workers(n_threads);
+    for (thread& worker : workers) {
+      worker = thread([this, &done, &total_hits, &total_lookups]() {
+          const pair<int64_t, int64_t> counts = DoQueries(&done);
+          total_hits += counts.first;
+          total_lookups += counts.second;
+        });
+    }
+    // Let the workers run for the requested duration, then stop and join them.
+    SleepFor(MonoDelta::FromSeconds(n_seconds));
+    done = true;
+    for (thread& worker : workers) worker.join();
+    return pair<int64_t, int64_t>(total_hits.load(), total_lookups.load());
+  }
+
+ protected:
+  unique_ptr<Cache> cache_;
+};
+
+// Run both access patterns, each with a dataset exactly as large as the cache
+// (ratio 1.0) and with one three times as large (ratio 3.0).
+INSTANTIATE_TEST_CASE_P(Patterns, CacheBench, testing::ValuesIn(std::vector<BenchSetup>{
+      {BenchSetup::Pattern::ZIPFIAN, 1.0},
+      {BenchSetup::Pattern::ZIPFIAN, 3.0},
+      {BenchSetup::Pattern::UNIFORM, 1.0},
+      {BenchSetup::Pattern::UNIFORM, 3.0}
+    }));
+
+// Benchmarks the cache: warm up, run the timed phase, log lookups/sec and hit rate.
+TEST_P(CacheBench, RunBench) {
+  // The lookups/sec figure divides by the run time, so reject a non-positive flag.
+  ASSERT_GT(FLAGS_run_seconds, 0) << "--run_seconds must be positive";
+  const string test_case = GetParam().ToString();
+
+  // Run a short warmup phase to populate the cache. Otherwise, even when the
+  // dataset is no larger than the cache, the timed phase would count the
+  // misses taken while the cache fills up.
+  LOG(INFO) << "Warming up...";
+  RunQueryThreads(FLAGS_num_threads, 1);
+
+  LOG(INFO) << "Running benchmark...";
+  const pair<int64_t, int64_t> hits_lookups =
+      RunQueryThreads(FLAGS_num_threads, FLAGS_run_seconds);
+  const int64_t l_per_sec = hits_lookups.second / FLAGS_run_seconds;
+  const double hit_rate = static_cast<double>(hits_lookups.first) / hits_lookups.second;
+  LOG(INFO) << test_case << ": " << HumanReadableNum::ToString(l_per_sec) << " lookups/sec";
+  LOG(INFO) << test_case << ": " << StringPrintf("%.1f", hit_rate * 100.0) << "% hit rate";
+}
+
+} // namespace kudu

http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/cache-test.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/cache-test.cc b/be/src/kudu/util/cache-test.cc
new file mode 100644
index 0000000..3fd1d5f
--- /dev/null
+++ b/be/src/kudu/util/cache-test.cc
@@ -0,0 +1,246 @@
+// Some portions Copyright (c) 2011 The LevelDB Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include <cassert>
+#include <cstring>
+#include <memory>
+#include <string>
+#include <vector>
+
+#include <glog/logging.h>
+#include <gflags/gflags.h>
+#include <gflags/gflags_declare.h>
+#include <gtest/gtest.h>
+
+#include "kudu/gutil/gscoped_ptr.h"
+#include "kudu/gutil/port.h"
+#include "kudu/gutil/ref_counted.h"
+#include "kudu/util/cache.h"
+#include "kudu/util/coding.h"
+#include "kudu/util/env.h"
+#include "kudu/util/faststring.h"
+#include "kudu/util/mem_tracker.h"
+#include "kudu/util/metrics.h"
+#include "kudu/util/slice.h"
+#include "kudu/util/test_macros.h"
+#include "kudu/util/test_util.h"
+
+#if defined(__linux__)
+DECLARE_string(nvm_cache_path);
+#endif // defined(__linux__)
+
+DECLARE_double(cache_memtracker_approximation_ratio);
+
+namespace kudu {
+
+// Integer keys and values are stored in the Cache as 4-byte fixed32 strings.
+static std::string EncodeInt(int k) {
+  faststring encoded;
+  PutFixed32(&encoded, k);
+  return encoded.ToString();
+}
+static int DecodeInt(const Slice& k) {
+  assert(k.size() == 4);  // A fixed32 occupies exactly four bytes.
+  return DecodeFixed32(k.data());
+}
+
+// Fixture for the parameterized Cache tests. It also acts as the cache's
+// EvictionCallback so that tests can assert on which entries were evicted.
+class CacheTest : public KuduTest,
+                  public ::testing::WithParamInterface<CacheType>,
+                  public Cache::EvictionCallback {
+ public:
+  // EvictionCallback implementation: records the decoded key and value.
+  void EvictedEntry(Slice key, Slice val) override {
+    evicted_keys_.push_back(DecodeInt(key));
+    evicted_values_.push_back(DecodeInt(val));
+  }
+  std::vector<int> evicted_keys_;    // Keys passed to EvictedEntry(), in call order.
+  std::vector<int> evicted_values_;  // Values passed to EvictedEntry(), in call order.
+  std::shared_ptr<MemTracker> mem_tracker_;  // Looked up in SetUp(); may be null.
+  gscoped_ptr<Cache> cache_;
+  MetricRegistry metric_registry_;
+
+  static const int kCacheSize = 14*1024*1024;  // Capacity passed to NewLRUCache().
+
+  void SetUp() override {
+    KuduTest::SetUp();  // Chain to the base fixture's setup before doing our own.
+#if defined(HAVE_LIB_VMEM)
+    if (google::GetCommandLineFlagInfoOrDie("nvm_cache_path").is_default) {
+      FLAGS_nvm_cache_path = GetTestPath("nvm-cache");
+      ASSERT_OK(Env::Default()->CreateDir(FLAGS_nvm_cache_path));
+    }
+#endif // defined(HAVE_LIB_VMEM)
+
+    // Disable approximate tracking of cache memory since we make specific
+    // assertions on the MemTracker in this test.
+    FLAGS_cache_memtracker_approximation_ratio = 0;
+
+    cache_.reset(NewLRUCache(GetParam(), kCacheSize, "cache_test"));
+
+    MemTracker::FindTracker("cache_test-sharded_lru_cache", &mem_tracker_);
+    // Since nvm cache does not have memtracker due to the use of
+    // tcmalloc for this we only check for it in the DRAM case.
+    if (GetParam() == DRAM_CACHE) {
+      ASSERT_TRUE(mem_tracker_.get());
+    }
+
+    scoped_refptr<MetricEntity> entity = METRIC_ENTITY_server.Instantiate(
+        &metric_registry_, "test");
+    cache_->SetMetrics(entity);
+  }
+
+  // Returns the value cached under 'key', or -1 if the lookup finds no entry.
+  int Lookup(int key) {
+    Cache::Handle* handle = cache_->Lookup(EncodeInt(key), Cache::EXPECT_IN_CACHE);
+    if (handle == nullptr) return -1;
+    const int r = DecodeInt(cache_->Value(handle));
+    cache_->Release(handle);
+    return r;
+  }
+
+  // Inserts 'key' -> 'value' with the given charge; evictions are reported to this fixture.
+  void Insert(int key, int value, int charge = 1) {
+    std::string key_str = EncodeInt(key);
+    std::string val_str = EncodeInt(value);
+    Cache::PendingHandle* handle = CHECK_NOTNULL(cache_->Allocate(key_str, val_str.size(), charge));
+    memcpy(cache_->MutableValue(handle), val_str.data(), val_str.size());
+    cache_->Release(cache_->Insert(handle, this));
+  }
+
+  // Erases the entry stored under 'key', if any.
+  void Erase(int key) { cache_->Erase(EncodeInt(key)); }
+};
+
+#if defined(__linux__)  // Linux builds run every test against both cache types.
+INSTANTIATE_TEST_CASE_P(CacheTypes, CacheTest, ::testing::Values(DRAM_CACHE, NVM_CACHE));
+#else  // Elsewhere only the DRAM cache is tested.
+INSTANTIATE_TEST_CASE_P(CacheTypes, CacheTest, ::testing::Values(DRAM_CACHE));
+#endif // defined(__linux__)
+
+// The cache's MemTracker must follow the charge of inserted and erased entries.
+TEST_P(CacheTest, TrackMemory) {
+  if (!mem_tracker_) return;  // Nothing to verify without a tracker.
+  Insert(100, 100, 1);
+  ASSERT_EQ(1, mem_tracker_->consumption());
+  Erase(100);
+  ASSERT_EQ(0, mem_tracker_->consumption());
+  ASSERT_EQ(1, mem_tracker_->peak_consumption());  // The peak outlives the entry.
+}
+
+TEST_P(CacheTest, HitAndMiss) {  // Lookups succeed only for keys that were inserted.
+  ASSERT_EQ(-1, Lookup(100));  // Nothing has been inserted yet.
+  // After the first insert only key 100 is found.
+  Insert(100, 101);
+  ASSERT_EQ(101, Lookup(100));
+  ASSERT_EQ(-1,  Lookup(200));
+  ASSERT_EQ(-1,  Lookup(300));
+  // Adding a second key does not disturb the first.
+  Insert(200, 201);
+  ASSERT_EQ(101, Lookup(100));
+  ASSERT_EQ(201, Lookup(200));
+  ASSERT_EQ(-1,  Lookup(300));
+  // Re-inserting key 100 replaces its value...
+  Insert(100, 102);
+  ASSERT_EQ(102, Lookup(100));
+  ASSERT_EQ(201, Lookup(200));
+  ASSERT_EQ(-1,  Lookup(300));
+  // ...and reports the replaced entry (100 -> 101) to the eviction callback.
+  ASSERT_EQ(1, evicted_keys_.size());
+  ASSERT_EQ(100, evicted_keys_[0]);
+  ASSERT_EQ(101, evicted_values_[0]);
+}
+
+TEST_P(CacheTest, Erase) {  // Erase removes only the named key and reports it once.
+  Erase(200);  // Erasing a key that was never inserted evicts nothing.
+  ASSERT_EQ(0, evicted_keys_.size());
+  // Erasing 100 removes it, leaves 200 alone and reports 100 -> 101 as evicted.
+  Insert(100, 101);
+  Insert(200, 201);
+  Erase(100);
+  ASSERT_EQ(-1,  Lookup(100));
+  ASSERT_EQ(201, Lookup(200));
+  ASSERT_EQ(1, evicted_keys_.size());
+  ASSERT_EQ(100, evicted_keys_[0]);
+  ASSERT_EQ(101, evicted_values_[0]);
+  // Erasing the same key again changes nothing and reports no further eviction.
+  Erase(100);
+  ASSERT_EQ(-1,  Lookup(100));
+  ASSERT_EQ(201, Lookup(200));
+  ASSERT_EQ(1, evicted_keys_.size());
+}
+
+TEST_P(CacheTest, EntriesArePinned) {  // Held handles delay the eviction of their entries.
+  Insert(100, 101);
+  Cache::Handle* h1 = cache_->Lookup(EncodeInt(100), Cache::EXPECT_IN_CACHE);
+  ASSERT_EQ(101, DecodeInt(cache_->Value(h1)));
+  // Replacing the value while h1 is held must not evict the old entry yet.
+  Insert(100, 102);
+  Cache::Handle* h2 = cache_->Lookup(EncodeInt(100), Cache::EXPECT_IN_CACHE);
+  ASSERT_EQ(102, DecodeInt(cache_->Value(h2)));
+  ASSERT_EQ(0, evicted_keys_.size());
+  // Once h1 is released, the old entry (100 -> 101) is reported as evicted.
+  cache_->Release(h1);
+  ASSERT_EQ(1, evicted_keys_.size());
+  ASSERT_EQ(100, evicted_keys_[0]);
+  ASSERT_EQ(101, evicted_values_[0]);
+  // Erasing the key hides it from lookups, but h2 still holds off the eviction.
+  Erase(100);
+  ASSERT_EQ(-1, Lookup(100));
+  ASSERT_EQ(1, evicted_keys_.size());
+  // Releasing h2 finally evicts the second entry (100 -> 102).
+  cache_->Release(h2);
+  ASSERT_EQ(2, evicted_keys_.size());
+  ASSERT_EQ(100, evicted_keys_[1]);
+  ASSERT_EQ(102, evicted_values_[1]);
+}
+
+// Entries that keep being accessed survive eviction; untouched ones do not.
+TEST_P(CacheTest, EvictionPolicy) {
+  Insert(100, 101);
+  Insert(200, 201);
+
+  // Each filler entry is charged 1/1000th of the capacity, and twice as many
+  // of them are inserted as the capacity can hold.
+  const int kFitCount = 1000;
+  const int kFillerCharge = kCacheSize / kFitCount;
+  for (int n = 0; n < 2 * kFitCount; ++n) {
+    const int filler_key = 1000 + n;
+    const int filler_val = 2000 + n;
+    Insert(filler_key, filler_val, kFillerCharge);
+    ASSERT_EQ(filler_val, Lookup(filler_key));
+    ASSERT_EQ(101, Lookup(100));  // Key 100 is touched on every iteration.
+  }
+  ASSERT_EQ(101, Lookup(100));  // The frequently used entry is still cached.
+  ASSERT_EQ(-1, Lookup(200));   // Key 200 was not accessed in the loop, so it is gone.
+}
+
+TEST_P(CacheTest, HeavyEntries) {
+  // Alternate heavy (even index) and light (odd index) entries until twice the
+  // capacity has been inserted. The combined charge of the entries that are
+  // still cached afterwards must not exceed the capacity by more than 10%.
+  const int kLight = kCacheSize/1000;
+  const int kHeavy = kCacheSize/100;
+  const auto charge_of = [kLight, kHeavy](int idx) { return (idx % 2 != 0) ? kLight : kHeavy; };
+
+  // Insert until the cumulative charge reaches twice the capacity.
+  int num_inserted = 0;
+  for (int total_charge = 0; total_charge < 2*kCacheSize; num_inserted++) {
+    Insert(num_inserted, 1000+num_inserted, charge_of(num_inserted));
+    total_charge += charge_of(num_inserted);
+  }
+
+  // Add up the charge of everything that is still cached, checking each value.
+  int cached_charge = 0;
+  for (int key = 0; key < num_inserted; key++) {
+    const int value = Lookup(key);
+    if (value < 0) continue;  // No longer cached.
+    cached_charge += charge_of(key);
+    ASSERT_EQ(1000+key, value);
+  }
+  // Allow 10% of slack over the nominal capacity.
+  ASSERT_LE(cached_charge, kCacheSize + kCacheSize/10);
+}
+
+}  // namespace kudu