You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by ta...@apache.org on 2018/07/13 06:03:41 UTC
[29/51] [abbrv] impala git commit: IMPALA-7006: Add KRPC folders from
kudu@334ecafd
http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/bit-stream-utils.inline.h
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/bit-stream-utils.inline.h b/be/src/kudu/util/bit-stream-utils.inline.h
new file mode 100644
index 0000000..d168bda
--- /dev/null
+++ b/be/src/kudu/util/bit-stream-utils.inline.h
@@ -0,0 +1,211 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#ifndef IMPALA_UTIL_BIT_STREAM_UTILS_INLINE_H
+#define IMPALA_UTIL_BIT_STREAM_UTILS_INLINE_H
+
+#include <algorithm>
+
+#include "glog/logging.h"
+#include "kudu/util/bit-stream-utils.h"
+#include "kudu/util/alignment.h"
+
+namespace kudu {
+
+inline void BitWriter::PutValue(uint64_t v, int num_bits) {
+  // Appends the 'num_bits' low-order bits of 'v' to the stream.
+  DCHECK_GE(num_bits, 0);
+  DCHECK_LE(num_bits, 64);
+  // Truncate the higher-order bits (necessary to support signed values).
+  // TrailingBits() is used because '~0ULL >> (64 - num_bits)' is an undefined 64-bit shift when num_bits == 0.
+  v = BitUtil::TrailingBits(v, num_bits);
+  buffered_values_ |= v << bit_offset_;
+  bit_offset_ += num_bits;
+
+  if (PREDICT_FALSE(bit_offset_ >= 64)) {
+    // Flush buffered_values_ and write out bits of v that did not fit
+    buffer_->reserve(KUDU_ALIGN_UP(byte_offset_ + 8, 8));
+    buffer_->resize(byte_offset_ + 8);
+    DCHECK_LE(byte_offset_ + 8, buffer_->capacity());
+    memcpy(buffer_->data() + byte_offset_, &buffered_values_, 8);
+    byte_offset_ += 8;
+    bit_offset_ -= 64;
+    // Start the next word with the high-order bits of v that spilled past the flushed one.
+    buffered_values_ = BitUtil::ShiftRightZeroOnOverflow(v, (num_bits - bit_offset_));
+  }
+  DCHECK_LT(bit_offset_, 64);
+}
+
+inline void BitWriter::Flush(bool align) {
+  // Writes the bytes that hold the buffered bits into the buffer at byte_offset_, growing it first.
+  const int pending_bytes = BitUtil::Ceil(bit_offset_, 8);
+  buffer_->reserve(KUDU_ALIGN_UP(byte_offset_ + pending_bytes, 8));
+  buffer_->resize(byte_offset_ + pending_bytes);
+  DCHECK_LE(byte_offset_ + pending_bytes, buffer_->capacity());
+  memcpy(buffer_->data() + byte_offset_, &buffered_values_, pending_bytes);
+  // Without 'align' the writer state is left as is; with it, the next value starts on a fresh byte.
+  if (!align) return;
+  byte_offset_ += pending_bytes;
+  buffered_values_ = 0;
+  bit_offset_ = 0;
+}
+
+inline uint8_t* BitWriter::GetNextBytePtr(int num_bytes) {
+  Flush(/* align */ true);  // byte-align: pending bits are written out first
+  byte_offset_ += num_bytes;  // claim the region up front
+  buffer_->reserve(KUDU_ALIGN_UP(byte_offset_, 8));
+  buffer_->resize(byte_offset_);
+  DCHECK_LE(byte_offset_, buffer_->capacity());
+  uint8_t* const region_start = buffer_->data() + byte_offset_ - num_bytes;
+  return region_start;  // first of the 'num_bytes' bytes the caller may fill in
+}
+
+template<typename T>
+inline void BitWriter::PutAligned(T val, int num_bytes) {
+  // Byte-aligns the stream, then stores the first 'num_bytes' bytes of 'val'.
+  DCHECK_LE(num_bytes, sizeof(T));
+  memcpy(GetNextBytePtr(num_bytes), &val, num_bytes);
+}
+
+inline void BitWriter::PutVlqInt(int32_t v) {  // 7 bits per byte, low-order first; 0x80 marks "more follows"
+  // Shift as unsigned: '>>' on a negative int32_t keeps the sign bit, so the loop would never terminate.
+  uint32_t remaining = static_cast<uint32_t>(v);
+  for (; (remaining & 0xFFFFFF80) != 0; remaining >>= 7) {
+    PutAligned<uint8_t>((remaining & 0x7F) | 0x80, 1);
+  }
+  PutAligned<uint8_t>(remaining & 0x7F, 1);
+}
+
+inline BitReader::BitReader(const uint8_t* buffer, int buffer_len)
+    : buffer_(buffer),
+      max_bytes_(buffer_len),
+      buffered_values_(0),
+      byte_offset_(0),
+      bit_offset_(0) {
+  // Preload up to the first 8 bytes; with a shorter buffer the rest of buffered_values_ stays zero.
+  memcpy(&buffered_values_, buffer_, std::min(8, max_bytes_));
+}
+
+inline void BitReader::BufferValues() {
+  // Reloads buffered_values_ from the bytes at byte_offset_.
+  // At most 8 bytes are copied, and never more than remain before max_bytes_;
+  // on a short copy the other bytes of buffered_values_ keep their old contents.
+  const int bytes_remaining = max_bytes_ - byte_offset_;
+  const int copy_len = PREDICT_TRUE(bytes_remaining >= 8) ? 8 : bytes_remaining;
+  memcpy(&buffered_values_, buffer_ + byte_offset_, copy_len);
+}
+
+template<typename T>
+inline bool BitReader::GetValue(int num_bits, T* v) {
+ DCHECK_LE(num_bits, 64);
+ DCHECK_LE(num_bits, sizeof(T) * 8);
+ // Reads the next 'num_bits' bits into '*v'. Fails, consuming nothing, if fewer bits remain.
+ if (PREDICT_FALSE(byte_offset_ * 8 + bit_offset_ + num_bits > max_bytes_ * 8)) return false;
+ // Take whatever part of the value lies in the currently buffered 64-bit word.
+ *v = BitUtil::TrailingBits(buffered_values_, bit_offset_ + num_bits) >> bit_offset_;
+
+ bit_offset_ += num_bits;
+ if (bit_offset_ >= 64) {
+ byte_offset_ += 8; // the buffered word is used up; move on to the next 8 bytes
+ bit_offset_ -= 64; // bits of this value that lie in the next word
+ BufferValues();
+ // Read bits of v that crossed into the new buffered_values_ and place them above the bits already read
+ *v |= BitUtil::ShiftLeftZeroOnOverflow(
+ BitUtil::TrailingBits(buffered_values_, bit_offset_),
+ (num_bits - bit_offset_));
+ }
+ DCHECK_LE(bit_offset_, 64);
+ return true;
+}
+
+inline void BitReader::Rewind(int num_bits) {  // moves the read position back by 'num_bits' bits
+  bit_offset_ -= num_bits;
+  if (bit_offset_ >= 0) return;  // still inside the buffered word; nothing to reload
+  while (bit_offset_ < 0) {
+    const int seek_back = std::min(byte_offset_, 8);  // step back whole bytes, at most 8 at a time
+    if (seek_back == 0) {
+      // Asked to rewind past the start of the buffer: clamp to bit 0 instead of looping forever.
+      bit_offset_ = 0;
+      break;
+    }
+    byte_offset_ -= seek_back;
+    bit_offset_ += seek_back * 8;
+  }
+  BufferValues();  // reload the now-stale word; clamps the copy so it cannot read past the buffer end
+}
+
+inline void BitReader::SeekToBit(uint stream_position) {
+ DCHECK_LE(stream_position, max_bytes_ * 8);
+ // Moves the read position to bit 'stream_position', computed relative to the current position().
+ int delta = static_cast<int>(stream_position) - position();
+ if (delta == 0) {
+ return;
+ } else if (delta < 0) {
+ Rewind(position() - stream_position); // backwards: delegate to Rewind()
+ } else {
+ bit_offset_ += delta; // forwards: first just bump the bit offset
+ while (bit_offset_ >= 64) { // then fold each whole 64 bits into byte_offset_
+ byte_offset_ +=8;
+ bit_offset_ -= 64;
+ if (bit_offset_ < 64) {
+ // Reached only on the final iteration, i.e. when seeking to
+ // 'stream_position' has made the existing buffered_values_
+ // invalid; reload it once from the new byte_offset_.
+ BufferValues();
+ }
+ }
+ }
+}
+
+template<typename T>
+inline bool BitReader::GetAligned(int num_bytes, T* v) {
+  // Reads 'num_bytes' bytes into '*v', starting at the next byte boundary.
+  // Returns false, leaving the reader untouched, if the buffer holds too few bytes.
+  DCHECK_LE(num_bytes, sizeof(T));
+
+  // Bytes of the current word that have been at least partly consumed.
+  const int consumed_bytes = BitUtil::Ceil(bit_offset_, 8);
+  const int read_start = byte_offset_ + consumed_bytes;
+  if (PREDICT_FALSE(read_start + num_bytes > max_bytes_)) return false;
+
+  memcpy(v, buffer_ + read_start, num_bytes);
+  byte_offset_ = read_start + num_bytes;
+
+  // Restart bit-level buffering at the byte after the aligned value.
+  // BufferValues() performs the clamped refill (8 bytes, or fewer near
+  // the end of the buffer) from the new byte_offset_.
+  bit_offset_ = 0;
+  BufferValues();
+  return true;
+}
+
+inline bool BitReader::GetVlqInt(int32_t* v) {
+  // Decodes a VLQ int as written by BitWriter::PutVlqInt: 7 payload bits per byte, low-order group first.
+  *v = 0;
+  uint32_t decoded = 0;  // accumulate unsigned; shifting payload into an int's sign bit overflows
+  uint8_t byte = 0;
+  for (int shift = 0; ; shift += 7) {
+    // 32 bits need at most 5 groups; reject longer input in all builds rather than shift by >= 32 (undefined).
+    if (shift >= 32 || !GetAligned<uint8_t>(1, &byte)) return false;
+    decoded |= static_cast<uint32_t>(byte & 0x7F) << shift;
+    *v = static_cast<int32_t>(decoded);
+    if ((byte & 0x80) == 0) return true;  // continuation bit clear: last byte
+  }
+}
+
+} // namespace kudu
+
+#endif
http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/bit-util-test.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/bit-util-test.cc b/be/src/kudu/util/bit-util-test.cc
new file mode 100644
index 0000000..0d8eab4
--- /dev/null
+++ b/be/src/kudu/util/bit-util-test.cc
@@ -0,0 +1,45 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#include <boost/utility/binary.hpp>
+#include <gtest/gtest.h>
+#include "kudu/util/bit-util.h"
+
+namespace kudu {
+
+TEST(BitUtil, TrailingBits) {
+ EXPECT_EQ(BitUtil::TrailingBits(BOOST_BINARY(1 1 1 1 1 1 1 1), 0), 0); // zero bits requested
+ EXPECT_EQ(BitUtil::TrailingBits(BOOST_BINARY(1 1 1 1 1 1 1 1), 1), 1);
+ EXPECT_EQ(BitUtil::TrailingBits(BOOST_BINARY(1 1 1 1 1 1 1 1), 64), // full width: value unchanged
+ BOOST_BINARY(1 1 1 1 1 1 1 1));
+ EXPECT_EQ(BitUtil::TrailingBits(BOOST_BINARY(1 1 1 1 1 1 1 1), 100), // more than 64 behaves like 64
+ BOOST_BINARY(1 1 1 1 1 1 1 1));
+ EXPECT_EQ(BitUtil::TrailingBits(0, 1), 0);
+ EXPECT_EQ(BitUtil::TrailingBits(0, 64), 0);
+ EXPECT_EQ(BitUtil::TrailingBits(1LL << 63, 0), 0);
+ EXPECT_EQ(BitUtil::TrailingBits(1LL << 63, 63), 0); // the top bit is outside the low 63 bits
+ EXPECT_EQ(BitUtil::TrailingBits(1LL << 63, 64), 1LL << 63);
+
+}
+
+TEST(BitUtil, ShiftBits) {
+ EXPECT_EQ(BitUtil::ShiftLeftZeroOnOverflow(1ULL, 64), 0ULL); // a shift by the full width yields 0
+ EXPECT_EQ(BitUtil::ShiftLeftZeroOnOverflow(0xFFFFFFFFFFFFFFFFULL, 32), 0xFFFFFFFF00000000ULL);
+ EXPECT_EQ(BitUtil::ShiftRightZeroOnOverflow(1ULL, 64), 0ULL); // same for the right shift
+ EXPECT_EQ(BitUtil::ShiftRightZeroOnOverflow(0xFFFFFFFFFFFFFFFFULL, 32), 0x00000000FFFFFFFFULL);
+}
+
+} // namespace kudu
http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/bit-util.h
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/bit-util.h b/be/src/kudu/util/bit-util.h
new file mode 100644
index 0000000..5f36887
--- /dev/null
+++ b/be/src/kudu/util/bit-util.h
@@ -0,0 +1,57 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#ifndef IMPALA_BIT_UTIL_H
+#define IMPALA_BIT_UTIL_H
+
+#include <stdint.h>
+#include "kudu/gutil/port.h"
+
+namespace kudu {
+
+// Utility class to do standard bit tricks
+// TODO: is this in boost or something else like that?
+class BitUtil {
+ public:
+  // Returns value / divisor rounded up. NOTE(review): the +1 step assumes a non-negative quotient.
+  static inline int Ceil(int value, int divisor) {
+    const int quotient = value / divisor;
+    return (value % divisor != 0) ? quotient + 1 : quotient;
+  }
+
+  // Returns the 'num_bits' least-significant bits of 'v' (all of 'v' when num_bits >= 64).
+  static inline uint64_t TrailingBits(uint64_t v, int num_bits) {
+    if (PREDICT_FALSE(num_bits == 0)) return 0;
+    if (PREDICT_FALSE(num_bits >= 64)) return v;
+    // For 0 < num_bits < 64 the mask shift below stays within the 64-bit width.
+    return v & (~0ULL >> (64 - num_bits));
+  }
+
+  // Returns v << num_bits, or 0 when num_bits >= 64 (where a plain shift is undefined).
+  static inline uint64_t ShiftLeftZeroOnOverflow(uint64_t v, int num_bits) {
+    return PREDICT_FALSE(num_bits >= 64) ? 0 : v << num_bits;
+  }
+
+  // Returns v >> num_bits, or 0 when num_bits >= 64 (where a plain shift is undefined).
+  static inline uint64_t ShiftRightZeroOnOverflow(uint64_t v, int num_bits) {
+    return PREDICT_FALSE(num_bits >= 64) ? 0 : v >> num_bits;
+  }
+
+};
+
+} // namespace kudu
+
+#endif
http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/bitmap-test.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/bitmap-test.cc b/be/src/kudu/util/bitmap-test.cc
new file mode 100644
index 0000000..089ed3b
--- /dev/null
+++ b/be/src/kudu/util/bitmap-test.cc
@@ -0,0 +1,230 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <cstdint>
+#include <cstring>
+#include <vector>
+
+#include <gtest/gtest.h>
+
+#include "kudu/gutil/strings/join.h"
+#include "kudu/util/bitmap.h"
+
+namespace kudu {
+
+// Appends the index of every set bit that TrueBitIterator yields for 'bm' to
+// 'result', in iteration order, and returns how many indices were appended.
+static int ReadBackBitmap(uint8_t *bm, size_t bits,
+                          std::vector<size_t> *result) {
+  int num_set = 0;
+  TrueBitIterator it(bm, bits);
+  while (!it.done()) {
+    result->push_back(*it);
+    ++it;
+    ++num_set;
+  }
+  return num_set;
+}
+
+TEST(TestBitMap, TestIteration) {
+ uint8_t bm[8];
+ memset(bm, 0, sizeof(bm));
+ BitmapSet(bm, 0);
+ BitmapSet(bm, 8);
+ BitmapSet(bm, 31);
+ BitmapSet(bm, 32);
+ BitmapSet(bm, 33);
+ BitmapSet(bm, 63);
+ // BitmapToString() prints bits in index order, so each 8-digit group is one byte, lowest bit first.
+ EXPECT_EQ(" 0: 10000000 10000000 00000000 00000001 11000000 00000000 00000000 00000001 \n",
+ BitmapToString(bm, sizeof(bm) * 8));
+
+ std::vector<size_t> read_back;
+ // The iterator must visit exactly the six set bits, in ascending order.
+ int iters = ReadBackBitmap(bm, sizeof(bm)*8, &read_back);
+ ASSERT_EQ(6, iters);
+ ASSERT_EQ("0,8,31,32,33,63", JoinElements(read_back, ","));
+}
+
+
+TEST(TestBitMap, TestIteration2) {
+ uint8_t bm[1];
+ memset(bm, 0, sizeof(bm));
+ BitmapSet(bm, 1);
+ // Iterate a bitmap whose length (3 bits) is not a whole number of bytes.
+ std::vector<size_t> read_back;
+
+ int iters = ReadBackBitmap(bm, 3, &read_back);
+ ASSERT_EQ(1, iters);
+ ASSERT_EQ("1", JoinElements(read_back, ","));
+}
+
+TEST(TestBitmap, TestSetAndTestBits) {
+ uint8_t bm[1];
+ memset(bm, 0, sizeof(bm));
+ // Each bit in turn: toggle it with BitmapSet/BitmapClear, then with BitmapChange.
+ size_t num_bits = sizeof(bm) * 8;
+ for (size_t i = 0; i < num_bits; i++) {
+ ASSERT_FALSE(BitmapTest(bm, i));
+
+ BitmapSet(bm, i);
+ ASSERT_TRUE(BitmapTest(bm, i));
+
+ BitmapClear(bm, i);
+ ASSERT_FALSE(BitmapTest(bm, i));
+
+ BitmapChange(bm, i, true);
+ ASSERT_TRUE(BitmapTest(bm, i));
+
+ BitmapChange(bm, i, false);
+ ASSERT_FALSE(BitmapTest(bm, i));
+ }
+
+ // Set every other (odd-indexed) bit; in index order the byte reads 01010101
+ for (size_t i = 0; i < num_bits; ++i) {
+ ASSERT_FALSE(BitmapTest(bm, i));
+ if (i & 1) BitmapSet(bm, i);
+ }
+
+ // Check that pattern and clear the odd bits again: 00000000
+ for (size_t i = 0; i < num_bits; ++i) {
+ ASSERT_EQ(!!(i & 1), BitmapTest(bm, i));
+ if (i & 1) BitmapClear(bm, i);
+ }
+
+ // Check that all bits are zero, then use BitmapChange to set the odd ones
+ for (size_t i = 0; i < num_bits; ++i) {
+ ASSERT_FALSE(BitmapTest(bm, i));
+ BitmapChange(bm, i, i & 1);
+ }
+
+ // Check that the odd bits are set, then invert every bit
+ for (size_t i = 0; i < num_bits; ++i) {
+ ASSERT_EQ(!!(i & 1), BitmapTest(bm, i));
+ BitmapChange(bm, i, !(i & 1));
+ }
+
+ // Now exactly the even bits must be set
+ for (size_t i = 0; i < num_bits; ++i) {
+ ASSERT_EQ(!(i & 1), BitmapTest(bm, i));
+ }
+}
+
+TEST(TestBitMap, TestBulkSetAndTestBits) {
+ uint8_t bm[16]; // left uninitialized; BitmapChangeBits() below overwrites every bit before any read
+ size_t total_size = sizeof(bm) * 8;
+
+ // Test Bulk change bits and test bits
+ for (int i = 0; i < 4; ++i) {
+ bool value = i & 1; // alternate between clearing and setting the range under test
+ size_t num_bits = total_size;
+ while (num_bits > 0) {
+ for (size_t offset = 0; offset < num_bits; ++offset) {
+ BitmapChangeBits(bm, 0, total_size, !value); // fill everything with the opposite value
+ BitmapChangeBits(bm, offset, num_bits - offset, value); // then write [offset, num_bits)
+ // The range just written must hold 'value' throughout.
+ ASSERT_EQ(value, BitMapIsAllSet(bm, offset, num_bits));
+ ASSERT_EQ(!value, BitmapIsAllZero(bm, offset, num_bits));
+ // Bits in front of the range must still hold the opposite value.
+ if (offset > 1) {
+ ASSERT_EQ(value, BitmapIsAllZero(bm, 0, offset - 1));
+ ASSERT_EQ(!value, BitMapIsAllSet(bm, 0, offset - 1));
+ }
+ // Bits behind the range must still hold the opposite value.
+ if ((offset + num_bits) < total_size) {
+ ASSERT_EQ(value, BitmapIsAllZero(bm, num_bits, total_size));
+ ASSERT_EQ(!value, BitMapIsAllSet(bm, num_bits, total_size));
+ }
+ }
+ num_bits--;
+ }
+ }
+}
+
+TEST(TestBitMap, TestFindBit) {
+  uint8_t bm[16];
+  // Phase 1: an all-zero bitmap. No set bit exists and the first zero bit is 'offset' itself.
+  size_t num_bits = sizeof(bm) * 8;
+  BitmapChangeBits(bm, 0, num_bits, false);
+  while (num_bits > 0) {
+    for (size_t offset = 0; offset < num_bits; ++offset) {
+      size_t idx;
+      ASSERT_FALSE(BitmapFindFirstSet(bm, offset, num_bits, &idx));
+      ASSERT_TRUE(BitmapFindFirstZero(bm, offset, num_bits, &idx));
+      ASSERT_EQ(idx, offset);
+    }
+    num_bits--;
+  }
+  // Phase 2: clear every 4th bit (i % 4 == 0) and set all the others.
+  num_bits = sizeof(bm) * 8;
+  for (size_t i = 0; i < num_bits; ++i) {  // size_t: matches num_bits, no signed/unsigned comparison
+    BitmapChange(bm, i, i & 3);
+  }
+
+  for (; num_bits > 0; num_bits--) {
+    for (size_t offset = 0; offset < num_bits; ++offset) {
+      size_t idx;
+      // A set bit is at 'offset' itself unless offset % 4 == 0, in which case it is the next bit.
+      // Find a set bit
+      bool res = BitmapFindFirstSet(bm, offset, num_bits, &idx);
+      size_t expected_set_idx = (offset + !(offset & 3));
+      bool expect_set_found = (expected_set_idx < num_bits);
+      ASSERT_EQ(expect_set_found, res);
+      if (expect_set_found) {
+        ASSERT_EQ(expected_set_idx, idx);
+      }
+
+      // Find a zero bit: the next multiple of 4 at or after 'offset'
+      res = BitmapFindFirstZero(bm, offset, num_bits, &idx);
+      size_t expected_zero_idx = offset + ((offset & 3) ? (4 - (offset & 3)) : 0);
+      bool expect_zero_found = (expected_zero_idx < num_bits);
+      ASSERT_EQ(expect_zero_found, res);
+      if (expect_zero_found) {
+        ASSERT_EQ(expected_zero_idx, idx);
+      }
+    }
+  }
+}
+
+TEST(TestBitMap, TestBitmapIteration) {
+ uint8_t bm[8];
+ memset(bm, 0, sizeof(bm));
+ BitmapSet(bm, 0);
+ BitmapSet(bm, 8);
+ BitmapSet(bm, 31);
+ BitmapSet(bm, 32);
+ BitmapSet(bm, 33);
+ BitmapSet(bm, 63);
+ // Runs alternate set/unset, starting with the set run at bit 0.
+ BitmapIterator biter(bm, sizeof(bm) * 8);
+
+ size_t i = 0;
+ size_t size;
+ bool value = false;
+ bool expected_value = true;
+ size_t expected_sizes[] = {1, 7, 1, 22, 3, 29, 1, 0}; // run lengths in order; the final 0 is Next() at the end
+ while ((size = biter.Next(&value)) > 0) {
+ ASSERT_LT(i, 8);
+ ASSERT_EQ(expected_value, value);
+ ASSERT_EQ(expected_sizes[i], size);
+ expected_value = !expected_value;
+ i++;
+ }
+ ASSERT_EQ(expected_sizes[i], size); // the loop ended on the terminating 0 entry
+}
+
+} // namespace kudu
http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/bitmap.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/bitmap.cc b/be/src/kudu/util/bitmap.cc
new file mode 100644
index 0000000..eed7880
--- /dev/null
+++ b/be/src/kudu/util/bitmap.cc
@@ -0,0 +1,136 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "kudu/util/bitmap.h"
+
+#include <cstring>
+#include <string>
+
+#include <glog/logging.h>
+
+#include "kudu/gutil/stringprintf.h"
+
+namespace kudu {
+
+void BitmapChangeBits(uint8_t *bitmap, size_t offset, size_t num_bits, bool value) {
+ DCHECK_GT(num_bits, 0);
+ // Sets (value == true) or clears (value == false) every bit in [offset, offset + num_bits).
+ size_t start_byte = (offset >> 3);
+ size_t end_byte = (offset + num_bits - 1) >> 3; // byte holding the last bit of the range
+ int single_byte = (start_byte == end_byte);
+
+ // First byte: build a mask covering bits [left, right) of it
+ size_t left = offset & 0x7;
+ size_t right = (single_byte) ? (left + num_bits) : 8;
+ uint8_t mask = ((0xff << left) & (0xff >> (8 - right)));
+ if (value) {
+ bitmap[start_byte++] |= mask;
+ } else {
+ bitmap[start_byte++] &= ~mask;
+ }
+
+ // Nothing left... I'm done
+ if (single_byte) {
+ return;
+ }
+
+ // Middle: whole bytes strictly between the first and the last are filled in one go
+ if (end_byte > start_byte) {
+ const uint8_t pattern8[2] = { 0x00, 0xff };
+ memset(bitmap + start_byte, pattern8[value], end_byte - start_byte);
+ }
+
+ // Last byte: change its low 'right' bits, where 'right' counts the range's bits in that byte
+ right = offset + num_bits - (end_byte << 3);
+ mask = (0xff >> (8 - right));
+ if (value) {
+ bitmap[end_byte] |= mask;
+ } else {
+ bitmap[end_byte] &= ~mask;
+ }
+}
+
+bool BitmapFindFirst(const uint8_t *bitmap, size_t offset, size_t bitmap_size,
+ bool value, size_t *idx) {
+ const uint64_t pattern64[2] = { 0xffffffffffffffff, 0x0000000000000000 }; // indexed by 'value': a word with NO such bit
+ const uint8_t pattern8[2] = { 0xff, 0x00 }; // same, for a single byte
+ size_t bit;
+ // Searches bits [offset, bitmap_size) for the first one equal to 'value'; *idx is written only on success.
+ DCHECK_LE(offset, bitmap_size);
+
+ // Jump to the byte at specified offset
+ const uint8_t *p = bitmap + (offset >> 3);
+ size_t num_bits = bitmap_size - offset;
+
+ // If 'offset' is not byte-aligned, scan the rest of the first byte bit by bit
+ if ((bit = offset & 0x7)) {
+ for (; bit < 8 && num_bits > 0; ++bit) {
+ if (BitmapTest(p, bit) == value) {
+ *idx = ((p - bitmap) << 3) + bit;
+ return true;
+ }
+
+ num_bits--;
+ }
+
+ p++;
+ }
+ // NOTE(review): p need not be 8-byte aligned here, so the uint64_t loads below may be unaligned.
+ // Skip 64 bits at a time while a whole word contains no 'value' bit
+ const uint64_t *u64 = (const uint64_t *)p;
+ while (num_bits >= 64 && *u64 == pattern64[value]) {
+ num_bits -= 64;
+ u64++;
+ }
+
+ // Skip 8 bits at a time while a whole byte contains no 'value' bit
+ p = (const uint8_t *)u64;
+ while (num_bits >= 8 && *p == pattern8[value]) {
+ num_bits -= 8;
+ p++;
+ }
+
+ // Scan the remaining bits one by one, starting at bit 0 of the byte at p
+ for (bit = 0; num_bits > 0; ++bit) {
+ if (BitmapTest(p, bit) == value) {
+ *idx = ((p - bitmap) << 3) + bit;
+ return true;
+ }
+ num_bits--;
+ }
+
+ return false;
+}
+
+std::string BitmapToString(const uint8_t *bitmap, size_t num_bits) {
+  // One output line per 64 bits: the index of the line's first bit, then up to
+  // eight space-terminated groups of up to eight digits, bits in index order.
+  std::string out;
+  for (size_t bit = 0; bit < num_bits; ) {
+    StringAppendF(&out, "%4zu: ", bit);
+    for (int group = 0; group < 8 && bit < num_bits; ++group) {
+      for (int k = 0; k < 8 && bit < num_bits; ++k, ++bit) {
+        StringAppendF(&out, "%d", BitmapTest(bitmap, bit));
+      }
+      StringAppendF(&out, " ");
+    }
+    StringAppendF(&out, "\n");
+  }
+  return out;
+}
+
+} // namespace kudu
http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/bitmap.h
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/bitmap.h b/be/src/kudu/util/bitmap.h
new file mode 100644
index 0000000..d9f5260
--- /dev/null
+++ b/be/src/kudu/util/bitmap.h
@@ -0,0 +1,219 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+// Utility functions for dealing with a byte array as if it were a bitmap.
+#ifndef KUDU_UTIL_BITMAP_H
+#define KUDU_UTIL_BITMAP_H
+
+#include <cstddef>
+#include <cstdint>
+#include <ostream>
+#include <string>
+
+#include <glog/logging.h>
+
+#include "kudu/gutil/bits.h"
+#include "kudu/gutil/port.h"
+
+namespace kudu {
+
+// Returns the number of bytes needed to hold 'num_bits' bits (rounded up to whole bytes).
+inline size_t BitmapSize(size_t num_bits) {
+  return (num_bits + 7) >> 3;
+}
+
+// Sets bit 'idx' to 1. Bit 0 is the least-significant bit of bitmap[0].
+inline void BitmapSet(uint8_t *bitmap, size_t idx) {
+  bitmap[idx / 8] |= 1 << (idx % 8);
+}
+
+// Writes 'value' into bit 'idx', leaving every other bit of that byte untouched.
+inline void BitmapChange(uint8_t *bitmap, size_t idx, bool value) {
+  bitmap[idx / 8] = (bitmap[idx / 8] & ~(1 << (idx % 8))) | ((value ? 1 : 0) << (idx % 8));
+}
+
+// Sets bit 'idx' to 0.
+inline void BitmapClear(uint8_t *bitmap, size_t idx) {
+  bitmap[idx / 8] &= ~(1 << (idx % 8));
+}
+
+// Returns true if bit 'idx' is set.
+inline bool BitmapTest(const uint8_t *bitmap, size_t idx) {
+  return (bitmap[idx / 8] >> (idx % 8)) & 1;
+}
+
+// ORs the first BitmapSize(n_bits) bytes of 'src' into 'dst'. Both bitmaps
+// should have at least n_bits valid bits.
+inline void BitmapMergeOr(uint8_t *dst, const uint8_t *src, size_t n_bits) {
+  const size_t byte_count = BitmapSize(n_bits);
+  for (size_t pos = 0; pos != byte_count; ++pos) {
+    dst[pos] |= src[pos];
+  }
+}
+
+// Set the bits in [offset, offset + num_bits) to the specified value; num_bits must be > 0.
+void BitmapChangeBits(uint8_t *bitmap, size_t offset, size_t num_bits, bool value);
+
+// Find the first bit equal to 'value' in [offset, bitmap_size); on success store its index in *idx and return true.
+bool BitmapFindFirst(const uint8_t *bitmap, size_t offset, size_t bitmap_size,
+ bool value, size_t *idx);
+
+// Find the first set bit in [offset, bitmap_size). Returns true and stores its index in *idx if there is one.
+inline bool BitmapFindFirstSet(const uint8_t *bitmap, size_t offset,
+ size_t bitmap_size, size_t *idx) {
+ return BitmapFindFirst(bitmap, offset, bitmap_size, true, idx);
+}
+
+// Find the first zero bit in [offset, bitmap_size). Returns true and stores its index in *idx if there is one.
+inline bool BitmapFindFirstZero(const uint8_t *bitmap, size_t offset,
+ size_t bitmap_size, size_t *idx) {
+ return BitmapFindFirst(bitmap, offset, bitmap_size, false, idx);
+}
+
+// Returns true if every bit in [offset, bitmap_size) is set.
+inline bool BitMapIsAllSet(const uint8_t *bitmap, size_t offset, size_t bitmap_size) {
+  DCHECK_LT(offset, bitmap_size);
+  size_t first_zero;  // required by the search call; its value is not used
+  return !BitmapFindFirst(bitmap, offset, bitmap_size, false, &first_zero);
+}
+
+// Returns true if every bit in [offset, bitmap_size) is zero.
+inline bool BitmapIsAllZero(const uint8_t *bitmap, size_t offset, size_t bitmap_size) {
+  DCHECK_LT(offset, bitmap_size);
+  size_t first_set;  // required by the search call; its value is not used
+  return !BitmapFindFirst(bitmap, offset, bitmap_size, true, &first_set);
+}
+
+std::string BitmapToString(const uint8_t *bitmap, size_t num_bits);
+
+// Iterator which yields ranges of set and unset bits.
+// Example usage:
+// bool value;
+// size_t size;
+// BitmapIterator iter(bitmap, n_bits);
+// while ((size = iter.Next(&value))) {
+// printf("bitmap block len=%zu value=%d\n", size, value);
+// }
+class BitmapIterator {
+ public:
+  // Stores the 'map' pointer (the bitmap is not copied); 'num_bits' is how many of its bits are iterated.
+  BitmapIterator(const uint8_t *map, size_t num_bits)
+      : offset_(0), num_bits_(num_bits), map_(map)
+  {}
+
+  // True once the position has reached the end of the bitmap.
+  bool done() const {
+    return offset_ == num_bits_;
+  }
+
+  // Repositions the iterator so that the next run starts at bit 'bit'.
+  void SeekTo(size_t bit) {
+    DCHECK_LE(bit, num_bits_);
+    offset_ = bit;
+  }
+
+  // Consumes the run of equal bits starting at the current position: stores the
+  // run's bit value in '*value' and returns its length. Returns 0 at the end.
+  size_t Next(bool *value) {
+    if (PREDICT_FALSE(offset_ == num_bits_)) {
+      return 0;
+    }
+    const size_t run_start = offset_;
+    *value = BitmapTest(map_, run_start);
+    size_t run_end;
+    const bool found = BitmapFindFirst(map_, run_start, num_bits_, !(*value), &run_end);
+    if (!found) run_end = num_bits_;  // no opposite bit follows: the run reaches the end
+    offset_ = run_end;
+    return run_end - run_start;
+  }
+
+ private:
+  size_t offset_;
+  size_t num_bits_;
+  const uint8_t *map_;
+};
+
+// Iterator which yields the set bits in a bitmap.
+// Example usage:
+// for (TrueBitIterator iter(bitmap, n_bits);
+// !iter.done();
+// ++iter) {
+// size_t next_onebit_position = *iter;
+// }
+class TrueBitIterator {
+ public:
+  TrueBitIterator(const uint8_t *bitmap, size_t n_bits)
+      : bitmap_(bitmap),
+        cur_byte_(0),
+        cur_byte_idx_(0),
+        n_bits_(n_bits),
+        n_bytes_(BitmapSize(n_bits_)),
+        bit_idx_(0) {
+    if (n_bits_ == 0) {
+      cur_byte_idx_ = 1; // sets done
+    } else {
+      cur_byte_ = bitmap[0];
+      AdvanceToNextOneBit();
+    }
+  }
+  // Advances to the next set bit; must not be called once done().
+  TrueBitIterator &operator ++() {
+    DCHECK(!done());
+    DCHECK(cur_byte_ & 1);
+    cur_byte_ &= (~1);  // drop the bit that was just reported
+    AdvanceToNextOneBit();
+    return *this;
+  }
+  // True once every byte of the bitmap has been scanned.
+  bool done() const {
+    return cur_byte_idx_ >= n_bytes_;
+  }
+  // Index of the current set bit; only valid while !done().
+  size_t operator *() const {
+    DCHECK(!done());
+    return bit_idx_;
+  }
+
+ private:
+  void AdvanceToNextOneBit() {
+    while (cur_byte_ == 0) {
+      cur_byte_idx_++;
+      if (cur_byte_idx_ >= n_bytes_) return;
+      cur_byte_ = bitmap_[cur_byte_idx_];
+      bit_idx_ = cur_byte_idx_ * 8;
+    }
+    DVLOG(2) << "Found next nonzero byte at " << cur_byte_idx_
+             << " val=" << static_cast<int>(cur_byte_);  // cast: a uint8_t would be streamed as a character
+    // Shift the lowest set bit down to position 0 and advance bit_idx_ by the same distance.
+    DCHECK_NE(cur_byte_, 0);
+    int set_bit = Bits::FindLSBSetNonZero(cur_byte_);
+    bit_idx_ += set_bit;
+    cur_byte_ >>= set_bit;
+  }
+
+  const uint8_t *bitmap_;
+  uint8_t cur_byte_;  // not-yet-reported bits of the current byte, shifted so that bit 0 is bit_idx_
+  size_t cur_byte_idx_;  // size_t, not uint8_t: an 8-bit index wraps at 256, so done() was unreachable past 255 bytes
+
+  const size_t n_bits_;
+  const size_t n_bytes_;
+  size_t bit_idx_;
+};
+
+} // namespace kudu
+
+#endif
http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/blocking_queue-test.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/blocking_queue-test.cc b/be/src/kudu/util/blocking_queue-test.cc
new file mode 100644
index 0000000..a2271ff
--- /dev/null
+++ b/be/src/kudu/util/blocking_queue-test.cc
@@ -0,0 +1,249 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <cstddef>
+#include <cstdint>
+#include <map>
+#include <string>
+#include <thread>
+#include <vector>
+
+#include <gtest/gtest.h>
+
+#include "kudu/gutil/gscoped_ptr.h"
+#include "kudu/util/countdown_latch.h"
+#include "kudu/util/blocking_queue.h"
+#include "kudu/util/monotime.h"
+#include "kudu/util/mutex.h"
+#include "kudu/util/status.h"
+#include "kudu/util/test_macros.h"
+
+using std::string;
+using std::thread;
+using std::vector;
+
+namespace kudu {
+
+// Queue shared between the inserter thread and the consumer in Test1.
+BlockingQueue<int32_t> test1_queue(5);
+
+// Enqueues the values 1, 2 and 3, in that order, onto 'test1_queue'.
+void InsertSomeThings() {
+  for (int32_t value = 1; value <= 3; ++value) {
+    ASSERT_EQ(test1_queue.Put(value), QUEUE_SUCCESS);
+  }
+}
+
+TEST(BlockingQueueTest, Test1) {
+  thread inserter_thread(InsertSomeThings);
+  // The consumer must receive the values in the order they were inserted.
+  for (int32_t expected = 1; expected <= 3; ++expected) {
+    int32_t received;
+    ASSERT_TRUE(test1_queue.BlockingGet(&received));
+    ASSERT_EQ(expected, received);
+  }
+  inserter_thread.join();
+}
+
+// Exercises BlockingDrainTo() on a populated queue, with an expired
+// deadline, and after shutdown.
+TEST(BlockingQueueTest, TestBlockingDrainTo) {
+  BlockingQueue<int32_t> test_queue(3);
+  ASSERT_EQ(test_queue.Put(1), QUEUE_SUCCESS);
+  ASSERT_EQ(test_queue.Put(2), QUEUE_SUCCESS);
+  ASSERT_EQ(test_queue.Put(3), QUEUE_SUCCESS);
+  vector<int32_t> out;
+  ASSERT_OK(test_queue.BlockingDrainTo(&out, MonoTime::Now() + MonoDelta::FromSeconds(30)));
+  // Check the size before indexing, so that a short drain fails the test
+  // cleanly instead of reading past the end of the vector.
+  ASSERT_EQ(3, out.size());
+  ASSERT_EQ(1, out[0]);
+  ASSERT_EQ(2, out[1]);
+  ASSERT_EQ(3, out[2]);
+
+  // Set a deadline in the past and ensure we time out.
+  Status s = test_queue.BlockingDrainTo(&out, MonoTime::Now() - MonoDelta::FromSeconds(1));
+  ASSERT_TRUE(s.IsTimedOut());
+  ASSERT_EQ(3, out.size());
+
+  // Ensure that if the queue is shut down, we get Aborted status.
+  test_queue.Shutdown();
+  s = test_queue.BlockingDrainTo(&out, MonoTime::Now() - MonoDelta::FromSeconds(1));
+  ASSERT_TRUE(s.IsAborted());
+  ASSERT_EQ(3, out.size());
+}
+
+// Test that, when the queue is shut down with elements still pending,
+// Get and Drain still succeed until the elements are all gone.
+TEST(BlockingQueueTest, TestGetAndDrainAfterShutdown) {
+  // Put some elements into the queue and then shut it down.
+  BlockingQueue<int32_t> q(3);
+  ASSERT_EQ(q.Put(1), QUEUE_SUCCESS);
+  ASSERT_EQ(q.Put(2), QUEUE_SUCCESS);
+
+  q.Shutdown();
+
+  // Get() should still return an element. The variable uses the queue's
+  // element type so that its address matches BlockingGet(T*) exactly.
+  int32_t i;
+  ASSERT_TRUE(q.BlockingGet(&i));
+  ASSERT_EQ(1, i);
+
+  // Drain should still return OK, since it yielded elements.
+  vector<int32_t> out;
+  ASSERT_OK(q.BlockingDrainTo(&out));
+  // Check the size before indexing into the vector.
+  ASSERT_EQ(1, out.size());
+  ASSERT_EQ(2, out[0]);
+
+  // Now that it's empty, it should return Aborted and leave 'out' untouched.
+  Status s = q.BlockingDrainTo(&out);
+  ASSERT_TRUE(s.IsAborted()) << s.ToString();
+  ASSERT_EQ(1, out.size());
+  ASSERT_FALSE(q.BlockingGet(&i));
+}
+
+TEST(BlockingQueueTest, TestTooManyInsertions) {
+  BlockingQueue<int32_t> test_queue(2);
+  // Fill the queue up to its capacity of two elements...
+  for (int inserted = 0; inserted < 2; ++inserted) {
+    ASSERT_EQ(test_queue.Put(123), QUEUE_SUCCESS);
+  }
+  // ...after which a non-blocking Put() must be turned away.
+  ASSERT_EQ(test_queue.Put(123), QUEUE_FULL);
+}
+
+namespace {
+
+// Logical-size policy which charges each string its length in characters.
+struct LengthLogicalSize {
+  static size_t logical_size(const std::string& s) {
+    return s.size();
+  }
+};
+
+} // anonymous namespace
+
+TEST(BlockingQueueTest, TestLogicalSize) {
+  // Capacity is counted in characters here: "a" plus "bcd" uses all 4 units.
+  BlockingQueue<string, LengthLogicalSize> test_queue(4);
+
+  const QueueStatus one_char = test_queue.Put("a");
+  ASSERT_EQ(one_char, QUEUE_SUCCESS);
+
+  const QueueStatus three_chars = test_queue.Put("bcd");
+  ASSERT_EQ(three_chars, QUEUE_SUCCESS);
+
+  // The queue is now at its logical limit, so even one more character is
+  // rejected.
+  const QueueStatus overflow = test_queue.Put("e");
+  ASSERT_EQ(overflow, QUEUE_FULL);
+}
+
+TEST(BlockingQueueTest, TestNonPointerParamsMayBeNonEmptyOnDestruct) {
+  BlockingQueue<int32_t> test_queue(1);
+  const QueueStatus status = test_queue.Put(123);
+  ASSERT_EQ(status, QUEUE_SUCCESS);
+  // The queue goes out of scope while still holding an element. Since the
+  // element type is not a pointer, the destructor's DCHECK must not fire.
+}
+
+#ifndef NDEBUG
+// Only meaningful in debug builds: the check under test is a DCHECK.
+TEST(BlockingQueueDeathTest, TestPointerParamsMustBeEmptyOnDestruct) {
+  ::testing::FLAGS_gtest_death_test_style = "threadsafe";
+  ASSERT_DEATH({
+      int32_t pointee = 123;
+      BlockingQueue<int32_t*> pointer_queue(1);
+      ASSERT_EQ(pointer_queue.Put(&pointee), QUEUE_SUCCESS);
+      // Leaving this scope destroys a non-empty queue of bare pointers,
+      // which trips the debug assertion in the destructor.
+    },
+    "BlockingQueue holds bare pointers");
+}
+#endif // NDEBUG
+
+TEST(BlockingQueueTest, TestGetFromShutdownQueue) {
+  BlockingQueue<int64_t> test_queue(2);
+  ASSERT_EQ(test_queue.Put(123), QUEUE_SUCCESS);
+  test_queue.Shutdown();
+
+  // Once shut down, the queue refuses new elements...
+  ASSERT_EQ(test_queue.Put(456), QUEUE_SHUTDOWN);
+
+  // ...but still hands out what was queued before the shutdown.
+  int64_t value;
+  ASSERT_TRUE(test_queue.BlockingGet(&value));
+  ASSERT_EQ(123, value);
+
+  // With nothing left, BlockingGet() reports failure.
+  ASSERT_FALSE(test_queue.BlockingGet(&value));
+}
+
+TEST(BlockingQueueTest, TestGscopedPtrMethods) {
+  BlockingQueue<int*> test_queue(2);
+
+  // Hand an owned int over to the queue through the gscoped_ptr overload.
+  gscoped_ptr<int> producer_side(new int(123));
+  ASSERT_EQ(test_queue.Put(&producer_side), QUEUE_SUCCESS);
+
+  // Take it back out into another gscoped_ptr and check the payload.
+  gscoped_ptr<int> consumer_side;
+  ASSERT_TRUE(test_queue.BlockingGet(&consumer_side));
+  ASSERT_EQ(123, *consumer_side);
+
+  test_queue.Shutdown();
+}
+
+// Drives one BlockingQueue from several inserter and remover threads at
+// once and checks that every inserted element is removed exactly once.
+class MultiThreadTest {
+ public:
+  MultiThreadTest()
+    : puts_(4),
+      blocking_puts_(4),
+      nthreads_(5),
+      // Sized so that all of the non-blocking Put() calls fit even if no
+      // remover has run yet.
+      queue_(nthreads_ * puts_),
+      num_inserters_(nthreads_),
+      sync_latch_(nthreads_) {
+  }
+
+  // Inserts 'arg' puts_ times with Put(), waits until every inserter has done
+  // the same, then inserts it blocking_puts_ more times with BlockingPut().
+  // The last inserter to finish shuts the queue down.
+  void InserterThread(int arg) {
+    for (int i = 0; i < puts_; i++) {
+      ASSERT_EQ(queue_.Put(arg), QUEUE_SUCCESS);
+    }
+    sync_latch_.CountDown();
+    sync_latch_.Wait();
+    for (int i = 0; i < blocking_puts_; i++) {
+      ASSERT_TRUE(queue_.BlockingPut(arg));
+    }
+    MutexLock guard(lock_);
+    if (--num_inserters_ == 0) {
+      queue_.Shutdown();
+    }
+  }
+
+  // Makes puts_ + blocking_puts_ removal attempts and tallies what came out.
+  // An attempt that fails because the queue was shut down and empty is
+  // tallied under the key -1.
+  void RemoverThread() {
+    for (int i = 0; i < puts_ + blocking_puts_; i++) {
+      int32_t arg = 0;
+      bool got = queue_.BlockingGet(&arg);
+      if (!got) {
+        arg = -1;
+      }
+      MutexLock guard(lock_);
+      ++gotten_[arg];
+    }
+  }
+
+  // Starts all threads, waits for them and verifies the tallies.
+  void Run() {
+    for (int i = 0; i < nthreads_; i++) {
+      threads_.emplace_back(&MultiThreadTest::InserterThread, this, i);
+      threads_.emplace_back(&MultiThreadTest::RemoverThread, this);
+    }
+    // We add an extra thread to ensure that there aren't enough elements in
+    // the queue to go around. This way, we test removal after Shutdown.
+    threads_.emplace_back(&MultiThreadTest::RemoverThread, this);
+    for (auto& thread : threads_) {
+      thread.join();
+    }
+    // Let's check to make sure we got what we should have.
+    MutexLock guard(lock_);
+    for (int i = 0; i < nthreads_; i++) {
+      ASSERT_EQ(puts_ + blocking_puts_, gotten_[i]);
+    }
+    // There were (nthreads_ + 1) * (puts_ + blocking_puts_) removal
+    // attempts, but only nthreads_ * (puts_ + blocking_puts_) elements
+    // were added. The difference, one remover's worth of attempts, hit
+    // the shutdown case.
+    ASSERT_EQ(puts_ + blocking_puts_, gotten_[-1]);
+  }
+
+  int puts_;
+  int blocking_puts_;
+  int nthreads_;
+  BlockingQueue<int32_t> queue_;
+  // Protects 'gotten_' and 'num_inserters_'.
+  Mutex lock_;
+  // Element value (or -1 for a failed get) -> number of times it was seen.
+  std::map<int32_t, int> gotten_;
+  vector<thread> threads_;
+  // Inserters which have not finished yet.
+  int num_inserters_;
+  CountDownLatch sync_latch_;
+};
+
+// Runs the multi-threaded insert/remove scenario once.
+TEST(BlockingQueueTest, TestMultipleThreads) {
+  MultiThreadTest().Run();
+}
+
+} // namespace kudu
http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/blocking_queue.h
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/blocking_queue.h b/be/src/kudu/util/blocking_queue.h
new file mode 100644
index 0000000..7331c12
--- /dev/null
+++ b/be/src/kudu/util/blocking_queue.h
@@ -0,0 +1,256 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#ifndef KUDU_UTIL_BLOCKING_QUEUE_H
+#define KUDU_UTIL_BLOCKING_QUEUE_H
+
+#include <list>
+#include <string>
+#include <type_traits>
+#include <unistd.h>
+#include <vector>
+
+#include "kudu/gutil/basictypes.h"
+#include "kudu/gutil/gscoped_ptr.h"
+#include "kudu/util/condition_variable.h"
+#include "kudu/util/monotime.h"
+#include "kudu/util/mutex.h"
+#include "kudu/util/status.h"
+
+namespace kudu {
+
+// Return values for BlockingQueue::Put(). Success is 0, so a QueueStatus converted to bool is 'false' on success.
+enum QueueStatus {
+ QUEUE_SUCCESS = 0, // the element was enqueued
+ QUEUE_SHUTDOWN = 1, // Shutdown() had already been called; nothing was enqueued
+ QUEUE_FULL = 2 // the queue had reached max_size; nothing was enqueued
+};
+
+// Default logical-size policy: every element counts as one unit, whatever
+// its type or value.
+struct DefaultLogicalSize {
+  template<typename T>
+  static size_t logical_size(const T&) {
+    const size_t kOneUnit = 1;
+    return kOneUnit;
+  }
+};
+
+// A bounded FIFO queue shared between producer and consumer threads.
+//
+// Each element counts LOGICAL_SIZE::logical_size(elem) units against
+// 'max_size' (one unit per element with the default policy). An element is
+// admitted whenever the current logical size is below 'max_size', so the
+// total may overshoot the limit by part of the last element's size.
+//
+// Every operation that touches the queue contents takes the internal lock.
+template <typename T, class LOGICAL_SIZE = DefaultLogicalSize>
+class BlockingQueue {
+ public:
+  // If T is a pointer, this will be the base type. If T is not a pointer, you
+  // can ignore this and the functions which make use of it.
+  // Template substitution failure is not an error.
+  typedef typename std::remove_pointer<T>::type T_VAL;
+
+  // Creates an empty queue which holds at most 'max_size' logical units.
+  explicit BlockingQueue(size_t max_size)
+    : shutdown_(false),
+      size_(0),
+      max_size_(max_size),
+      not_empty_(&lock_),
+      not_full_(&lock_) {
+  }
+
+  // If the queue holds a bare pointer, it must be empty on destruction, since
+  // it may have ownership of the pointer.
+  ~BlockingQueue() {
+    DCHECK(list_.empty() || !std::is_pointer<T>::value)
+        << "BlockingQueue holds bare pointers at destruction time";
+  }
+
+  // Get an element from the queue, blocking while it is empty. Returns false
+  // if the queue is empty and has been shut down; elements queued before the
+  // shutdown are still returned.
+  bool BlockingGet(T *out) {
+    MutexLock l(lock_);
+    while (true) {
+      if (!list_.empty()) {
+        *out = list_.front();
+        list_.pop_front();
+        decrement_size_unlocked(*out);
+        not_full_.Signal();
+        return true;
+      }
+      if (shutdown_) {
+        return false;
+      }
+      not_empty_.Wait();
+    }
+  }
+
+  // Get an element from the queue. Returns false if the queue is empty and
+  // we were shut down prior to getting the element. On success, 'out' takes
+  // ownership of the dequeued pointer.
+  bool BlockingGet(gscoped_ptr<T_VAL> *out) {
+    T t = nullptr;
+    bool got_element = BlockingGet(&t);
+    if (!got_element) {
+      return false;
+    }
+    out->reset(t);
+    return true;
+  }
+
+  // Get all elements from the queue and append them to a vector.
+  //
+  // If 'deadline' passes and no elements have been returned from the
+  // queue, returns Status::TimedOut(). If 'deadline' is uninitialized,
+  // no deadline is used.
+  //
+  // If the queue has been shut down, but there are still elements waiting,
+  // then it returns those elements as if the queue were not yet shut down.
+  //
+  // Returns:
+  // - OK if successful
+  // - TimedOut if the deadline passed
+  // - Aborted if the queue shut down
+  Status BlockingDrainTo(std::vector<T>* out, MonoTime deadline = MonoTime()) {
+    MutexLock l(lock_);
+    while (true) {
+      if (!list_.empty()) {
+        out->reserve(list_.size());
+        for (const T& elt : list_) {
+          out->push_back(elt);
+          decrement_size_unlocked(elt);
+        }
+        list_.clear();
+        // The whole queue was just emptied, so several blocked producers may
+        // be able to proceed. Wake all of them: waking only one would leave
+        // the others asleep until some later Get. Each waiter re-checks the
+        // capacity under the lock, so a spurious wake-up is harmless.
+        not_full_.Broadcast();
+        return Status::OK();
+      }
+      if (PREDICT_FALSE(shutdown_)) {
+        return Status::Aborted("");
+      }
+      if (!deadline.Initialized()) {
+        not_empty_.Wait();
+      } else if (PREDICT_FALSE(!not_empty_.WaitUntil(deadline))) {
+        return Status::TimedOut("");
+      }
+    }
+  }
+
+  // Attempts to put the given value in the queue without blocking.
+  // Returns:
+  //   QUEUE_SUCCESS: if successfully inserted
+  //   QUEUE_FULL: if the queue has reached max_size
+  //   QUEUE_SHUTDOWN: if someone has already called Shutdown()
+  QueueStatus Put(const T &val) {
+    MutexLock l(lock_);
+    // Check for shutdown first: a queue which is both full and shut down
+    // must report QUEUE_SHUTDOWN, otherwise a caller retrying on QUEUE_FULL
+    // would never learn that the queue is gone.
+    if (shutdown_) {
+      return QUEUE_SHUTDOWN;
+    }
+    if (size_ >= max_size_) {
+      return QUEUE_FULL;
+    }
+    list_.push_back(val);
+    increment_size_unlocked(val);
+    l.Unlock();
+    not_empty_.Signal();
+    return QUEUE_SUCCESS;
+  }
+
+  // Returns the same as the other Put() overload above.
+  // If the element was inserted, the gscoped_ptr releases its contents.
+  QueueStatus Put(gscoped_ptr<T_VAL> *val) {
+    QueueStatus s = Put(val->get());
+    if (s == QUEUE_SUCCESS) {
+      ignore_result(val->release());
+    }
+    return s;
+  }
+
+  // Puts an element into the queue; if the queue is full, blocks until
+  // space becomes available. Returns false if we were shutdown prior
+  // to enqueueing the element.
+  bool BlockingPut(const T& val) {
+    MutexLock l(lock_);
+    while (true) {
+      if (shutdown_) {
+        return false;
+      }
+      if (size_ < max_size_) {
+        list_.push_back(val);
+        increment_size_unlocked(val);
+        l.Unlock();
+        not_empty_.Signal();
+        return true;
+      }
+      not_full_.Wait();
+    }
+  }
+
+  // Same as other BlockingPut() overload above. If the element was
+  // enqueued, gscoped_ptr releases its contents.
+  bool BlockingPut(gscoped_ptr<T_VAL>* val) {
+    // This must go through the blocking overload. The non-blocking Put()
+    // returns a QueueStatus whose success value is 0, so treating its result
+    // as a bool would read success as failure and vice versa.
+    bool ret = BlockingPut(val->get());
+    if (ret) {
+      ignore_result(val->release());
+    }
+    return ret;
+  }
+
+  // Shut down the queue.
+  // When a blocking queue is shut down, no more elements can be added to it,
+  // and Put() will return QUEUE_SHUTDOWN.
+  // Existing elements will drain out of it, and then BlockingGet will start
+  // returning false.
+  void Shutdown() {
+    MutexLock l(lock_);
+    shutdown_ = true;
+    // Wake every waiter on both sides so each can observe the shutdown.
+    not_full_.Broadcast();
+    not_empty_.Broadcast();
+  }
+
+  // Returns true if the queue currently holds no elements.
+  bool empty() const {
+    MutexLock l(lock_);
+    return list_.empty();
+  }
+
+  // Returns the capacity, in logical units, given at construction.
+  size_t max_size() const {
+    return max_size_;
+  }
+
+  // Returns the ToString() of each queued element, one per line. Elements
+  // are dereferenced with '->', so this only compiles when T is a pointer
+  // (or pointer-like) type whose pointee has a ToString() method.
+  std::string ToString() const {
+    std::string ret;
+
+    MutexLock l(lock_);
+    for (const T& t : list_) {
+      ret.append(t->ToString());
+      ret.append("\n");
+    }
+    return ret;
+  }
+
+ private:
+
+  // Increments queue size. Must be called when 'lock_' is held.
+  void increment_size_unlocked(const T& t) {
+    size_ += LOGICAL_SIZE::logical_size(t);
+  }
+
+  // Decrements queue size. Must be called when 'lock_' is held.
+  void decrement_size_unlocked(const T& t) {
+    size_ -= LOGICAL_SIZE::logical_size(t);
+  }
+
+  // Set once by Shutdown() and never cleared.
+  bool shutdown_;
+  // Sum of the logical sizes of the elements currently in 'list_'.
+  size_t size_;
+  size_t max_size_;
+  mutable Mutex lock_;
+  // Signalled when an element is added, broadcast on shutdown.
+  ConditionVariable not_empty_;
+  // Signalled when elements are removed, broadcast on shutdown.
+  ConditionVariable not_full_;
+  std::list<T> list_;
+};
+
+} // namespace kudu
+
+#endif
http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/bloom_filter-test.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/bloom_filter-test.cc b/be/src/kudu/util/bloom_filter-test.cc
new file mode 100644
index 0000000..788ec36
--- /dev/null
+++ b/be/src/kudu/util/bloom_filter-test.cc
@@ -0,0 +1,92 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <cstdint>
+#include <cstdlib>
+#include <ostream>
+
+#include <glog/logging.h>
+#include <gtest/gtest.h>
+
+#include "kudu/util/bloom_filter.h"
+#include "kudu/util/slice.h"
+
+namespace kudu {
+
+static const int kRandomSeed = 0xdeadbeef;
+
+// Seeds the C library generator with 'random_seed' and adds the first
+// 'n_keys' values it produces to 'bf', each as an 8-byte key.
+static void AddRandomKeys(int random_seed, int n_keys, BloomFilterBuilder *bf) {
+  srandom(random_seed);
+  for (int remaining = n_keys; remaining > 0; --remaining) {
+    uint64_t key = random();
+    const Slice key_bytes(reinterpret_cast<const uint8_t *>(&key), sizeof(key));
+    bf->AddKey(BloomKeyProbe(key_bytes));
+  }
+}
+
+// Re-seeds the C library generator with 'random_seed' and asserts that each
+// of the first 'n_keys' values it produces is reported as possibly present
+// in 'bf'.
+static void CheckRandomKeys(int random_seed, int n_keys, const BloomFilter &bf) {
+  srandom(random_seed);
+  for (int remaining = n_keys; remaining > 0; --remaining) {
+    uint64_t key = random();
+    const Slice key_bytes(reinterpret_cast<const uint8_t *>(&key), sizeof(key));
+    const BloomKeyProbe probe(key_bytes);
+    ASSERT_TRUE(bf.MayContainKey(probe));
+  }
+}
+
+// Builds a filter sized for 2000 keys at a 1% false positive rate, checks
+// that every inserted key is found, and checks that the measured false
+// positive rate is close to the estimate.
+TEST(TestBloomFilter, TestInsertAndProbe) {
+  int n_keys = 2000;
+  BloomFilterBuilder bfb(
+      BloomFilterSizing::ByCountAndFPRate(n_keys, 0.01));
+
+  // Check that the desired false positive rate is achieved.
+  double expected_fp_rate = bfb.false_positive_rate();
+  ASSERT_NEAR(expected_fp_rate, 0.01, 0.002);
+
+  // 1% FP rate should need about 9 bits per key
+  ASSERT_EQ(9, bfb.n_bits() / n_keys);
+
+  // Enter n_keys random keys into the bloom filter
+  AddRandomKeys(kRandomSeed, n_keys, &bfb);
+
+  // Verify that the keys we inserted all return true when queried.
+  BloomFilter bf(bfb.slice(), bfb.n_hashes());
+  CheckRandomKeys(kRandomSeed, n_keys, bf);
+
+  // Query a bunch of other keys, and verify the false positive rate
+  // is within reasonable bounds. The generator is not re-seeded here, so
+  // these keys continue the sequence after the ones that were inserted.
+  uint32_t num_queries = 100000;
+  uint32_t num_positives = 0;
+  // The index has the same type as 'num_queries' to avoid a signed/unsigned
+  // comparison.
+  for (uint32_t i = 0; i < num_queries; i++) {
+    uint64_t key = random();
+    Slice key_slice(reinterpret_cast<const uint8_t *>(&key), sizeof(key));
+    BloomKeyProbe probe(key_slice);
+    if (bf.MayContainKey(probe)) {
+      num_positives++;
+    }
+  }
+
+  double fp_rate = static_cast<double>(num_positives) / static_cast<double>(num_queries);
+  LOG(INFO) << "FP rate: " << fp_rate << " (" << num_positives << "/" << num_queries << ")";
+  LOG(INFO) << "Expected FP rate: " << expected_fp_rate;
+
+  // Actual FP rate should be within 20% of the estimated FP rate
+  ASSERT_NEAR(fp_rate, expected_fp_rate, 0.20*expected_fp_rate);
+}
+
+} // namespace kudu
http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/bloom_filter.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/bloom_filter.cc b/be/src/kudu/util/bloom_filter.cc
new file mode 100644
index 0000000..b1a2055
--- /dev/null
+++ b/be/src/kudu/util/bloom_filter.cc
@@ -0,0 +1,89 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "kudu/util/bloom_filter.h"
+
+#include <cmath>
+#include <cstring>
+#include <ostream>
+
+#include <glog/logging.h>
+
+namespace kudu {
+
+// Approximation of ln(2) used by the sizing formulas in this file. The value
+// is kept as-is so that computed filter sizes do not change; it is 'const'
+// because nothing should ever modify it.
+static const double kNaturalLog2 = 0.69314;
+
+// Returns the number of hash functions, (n_bits / elems) * ln(2) rounded
+// down, to use for a filter of 'n_bits' bits expected to hold 'elems' keys.
+// The result is always at least 1.
+static int ComputeOptimalHashCount(size_t n_bits, size_t elems) {
+  // With no expected elements the division below yields inf or NaN, and
+  // converting either of those to int is undefined. Fall back to the
+  // minimum instead.
+  if (elems == 0) return 1;
+
+  int n_hashes = static_cast<int>(n_bits * kNaturalLog2 / elems);
+  if (n_hashes < 1) n_hashes = 1;
+  return n_hashes;
+}
+
+// Sizes a filter for 'expected_count' keys at false positive rate 'fp_rate',
+// using bits = -n * ln(p) / ln(2)^2 rounded up to whole bytes.
+BloomFilterSizing BloomFilterSizing::ByCountAndFPRate(
+    size_t expected_count, double fp_rate) {
+  // The formula is only meaningful for rates strictly between 0 and 1.
+  CHECK_GT(fp_rate, 0);
+  CHECK_LT(fp_rate, 1);
+
+  const double bits_needed = -static_cast<double>(expected_count) * log(fp_rate)
+      / kNaturalLog2 / kNaturalLog2;
+  const int bytes_needed = static_cast<int>(ceil(bits_needed / 8));
+  CHECK_GT(bytes_needed, 0)
+      << "expected_count: " << expected_count
+      << " fp_rate: " << fp_rate;
+  return BloomFilterSizing(bytes_needed, expected_count);
+}
+
+// Sizes a filter of exactly 'n_bytes' bytes, choosing the expected element
+// count, -bits * ln(2)^2 / ln(p) rounded up, which yields 'fp_rate'.
+BloomFilterSizing BloomFilterSizing::BySizeAndFPRate(size_t n_bytes, double fp_rate) {
+  // Same precondition as ByCountAndFPRate(): outside (0, 1) the logarithm
+  // below is zero, positive or undefined, and the computed count is garbage.
+  CHECK_GT(fp_rate, 0);
+  CHECK_LT(fp_rate, 1);
+
+  size_t n_bits = n_bytes * 8;
+  double expected_elems = -static_cast<double>(n_bits) * kNaturalLog2 * kNaturalLog2 /
+      log(fp_rate);
+  DCHECK_GT(expected_elems, 1);
+  return BloomFilterSizing(n_bytes, static_cast<size_t>(ceil(expected_elems)));
+}
+
+
+BloomFilterBuilder::BloomFilterBuilder(const BloomFilterSizing &sizing) // allocates a bitmap of sizing.n_bytes() bytes
+ : n_bits_(sizing.n_bytes() * 8), // 8 bits per byte
+ bitmap_(new uint8_t[sizing.n_bytes()]), // contents are not zeroed here; Clear() below does that
+ n_hashes_(ComputeOptimalHashCount(n_bits_, sizing.expected_count())), // reads n_bits_, initialized above
+ expected_count_(sizing.expected_count()),
+ n_inserted_(0) {
+ Clear(); // zero the freshly allocated bitmap
+}
+
+// Forgets every inserted key: resets the insertion counter and zeroes the
+// whole bitmap.
+void BloomFilterBuilder::Clear() {
+  n_inserted_ = 0;
+  std::memset(&bitmap_[0], 0, n_bytes());
+}
+
+// Estimates the false positive rate as (1 - e^(-k*n/m))^k, where k is the
+// number of hashes, n the expected element count and m the number of bits.
+double BloomFilterBuilder::false_positive_rate() const {
+  CHECK_NE(expected_count_, 0)
+      << "expected_count_ not initialized: can't call this function on "
+      << "a BloomFilter initialized from external data";
+
+  const double exponent = -static_cast<double>(n_hashes_) * expected_count_ / n_bits_;
+  const double bit_set_probability = 1 - exp(exponent);
+  return pow(bit_set_probability, n_hashes_);
+}
+
+BloomFilter::BloomFilter(const Slice &data, size_t n_hashes) // wraps existing filter bytes for probing
+ : n_bits_(data.size() * 8), // 8 bits per byte of 'data'
+ bitmap_(reinterpret_cast<const uint8_t *>(data.data())), // only the pointer is kept; the bytes are not copied
+ n_hashes_(n_hashes)
+{}
+
+
+
+} // namespace kudu
http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/bloom_filter.h
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/bloom_filter.h b/be/src/kudu/util/bloom_filter.h
new file mode 100644
index 0000000..ad4e3eb
--- /dev/null
+++ b/be/src/kudu/util/bloom_filter.h
@@ -0,0 +1,254 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#ifndef KUDU_UTIL_BLOOM_FILTER_H
+#define KUDU_UTIL_BLOOM_FILTER_H
+
+#include <cstddef>
+#include <cstdint>
+
+#include "kudu/gutil/gscoped_ptr.h"
+#include "kudu/gutil/hash/city.h"
+#include "kudu/gutil/macros.h"
+#include "kudu/gutil/port.h"
+#include "kudu/util/bitmap.h"
+#include "kudu/util/slice.h"
+
+namespace kudu {
+
+// Probe calculated from a given key. This caches the calculated
+// hash values which are necessary for probing into a Bloom Filter,
+// so that when many bloom filters have to be consulted for a given
+// key, we only need to calculate the hashes once.
+//
+// This is implemented based on the idea of double-hashing from the following paper:
+//   "Less Hashing, Same Performance: Building a Better Bloom Filter"
+//   Kirsch and Mitzenmacher, ESA 2006
+//   https://www.eecs.harvard.edu/~michaelm/postscripts/tr-02-05.pdf
+//
+// Currently, the implementation uses the 64-bit City Hash.
+// TODO: an SSE CRC32 hash is probably ~20% faster. Come back to this
+// at some point.
+class BloomKeyProbe {
+ public:
+  // Default constructor - this is only used to instantiate an object
+  // and later reassign by assignment from another instance.
+  //
+  // The hashes are zeroed so that a default-constructed probe holds
+  // determinate values and can be copied or inspected safely.
+  BloomKeyProbe() : h_1_(0), h_2_(0) {}
+
+  // Construct a probe from the given key.
+  //
+  // NOTE: proper operation requires that the referenced memory remain
+  // valid for the lifetime of this object.
+  explicit BloomKeyProbe(const Slice &key) : key_(key) {
+    uint64_t h = util_hash::CityHash64(
+        reinterpret_cast<const char *>(key.data()),
+        key.size());
+
+    // Use the top and bottom halves of the 64-bit hash
+    // as the two independent hash functions for mixing.
+    h_1_ = static_cast<uint32_t>(h);
+    h_2_ = static_cast<uint32_t>(h >> 32);
+  }
+
+  // Returns the key this probe was built from.
+  const Slice &key() const { return key_; }
+
+  // The initial hash value (the low 32 bits of the key's hash). Feed it to
+  // MixHash() to obtain the following hash values.
+  uint32_t initial_hash() const {
+    return h_1_;
+  }
+
+  // Mix the given hash function with the second calculated hash
+  // value. A sequence of independent hashes can be calculated
+  // by repeatedly calling MixHash() on its previous result.
+  //
+  // The addition is expected to wrap around, hence the sanitizer
+  // annotation.
+  ATTRIBUTE_NO_SANITIZE_INTEGER
+  uint32_t MixHash(uint32_t h) const {
+    return h + h_2_;
+  }
+
+ private:
+  Slice key_;
+
+  // The two hashes: low and high 32 bits of the 64-bit key hash.
+  uint32_t h_1_;
+  uint32_t h_2_;
+};
+
+// Sizing parameters for the constructor to BloomFilterBuilder.
+// This is simply to provide a nicer API than a bunch of overloaded
+// constructors. Instances can only be obtained from the static factories below.
+class BloomFilterSizing {
+ public:
+ // Size the bloom filter by a fixed size and false positive rate.
+ //
+ // Picks the number of entries to achieve the above.
+ static BloomFilterSizing BySizeAndFPRate(size_t n_bytes, double fp_rate);
+
+ // Size the bloom filter by an expected count and false positive rate.
+ //
+ // Picks the number of bytes to achieve the above.
+ static BloomFilterSizing ByCountAndFPRate(size_t expected_count, double fp_rate);
+
+ size_t n_bytes() const { return n_bytes_; } // size of the filter bitmap, in bytes
+ size_t expected_count() const { return expected_count_; } // number of keys the filter is sized for
+
+ private:
+ BloomFilterSizing(size_t n_bytes, size_t expected_count) : // private: use the factories above
+ n_bytes_(n_bytes),
+ expected_count_(expected_count)
+ {}
+
+ size_t n_bytes_;
+ size_t expected_count_;
+};
+
+
+// Builder for a BloomFilter structure. Owns the bitmap that keys are added to.
+class BloomFilterBuilder {
+ public:
+ // Create a bloom filter.
+ // See BloomFilterSizing static methods to specify this argument.
+ explicit BloomFilterBuilder(const BloomFilterSizing &sizing);
+
+ // Clear all entries, reset insertion count.
+ void Clear();
+
+ // Add the given key to the bloom filter.
+ void AddKey(const BloomKeyProbe &probe);
+
+ // Return an estimate of the false positive rate for expected_count() keys.
+ double false_positive_rate() const;
+
+ int n_bytes() const { // size of the bitmap in bytes (size_t narrowed to int)
+ return n_bits_ / 8;
+ }
+
+ int n_bits() const { // size of the bitmap in bits (size_t narrowed to int)
+ return n_bits_;
+ }
+
+ // Return a slice view into this Bloom Filter, suitable for
+ // writing out to a file. The slice points at the builder's own bitmap.
+ const Slice slice() const {
+ return Slice(&bitmap_[0], n_bytes());
+ }
+
+ // Return the number of hashes that are calculated for each entry
+ // in the bloom filter.
+ size_t n_hashes() const { return n_hashes_; }
+
+ size_t expected_count() const { return expected_count_; } // number of keys the filter was sized for
+
+ // Return the number of keys inserted since construction or the last Clear().
+ size_t count() const { return n_inserted_; }
+
+ private:
+ DISALLOW_COPY_AND_ASSIGN(BloomFilterBuilder);
+
+ size_t n_bits_; // must stay declared before n_hashes_: the constructor uses it to compute n_hashes_
+ gscoped_array<uint8_t> bitmap_;
+
+ // The number of hash functions to compute.
+ size_t n_hashes_;
+
+ // The expected number of elements, for which the bloom is optimized.
+ size_t expected_count_;
+
+ // The number of elements inserted so far since the last Clear().
+ size_t n_inserted_;
+};
+
+
+// Wrapper around a byte array for reading it as a bloom filter.
+// The wrapper does not own the bytes: it only keeps a pointer to them.
+class BloomFilter {
+ public:
+  // Creates a filter with no bitmap and no hashes. All fields are
+  // initialized, so the object can be copied and probed safely: with zero
+  // hashes MayContainKey() tests no bits and returns true for every key.
+  BloomFilter() : n_bits_(0), bitmap_(nullptr), n_hashes_(0) {}
+
+  // Wraps the filter bytes in 'data', which were built with 'n_hashes' hash
+  // functions per key.
+  BloomFilter(const Slice &data, size_t n_hashes);
+
+  // Return true if the filter may contain the given key.
+  bool MayContainKey(const BloomKeyProbe &probe) const;
+
+ private:
+  friend class BloomFilterBuilder;
+
+  // Maps a 32-bit hash value to a bit position in [0, n_bits).
+  static uint32_t PickBit(uint32_t hash, size_t n_bits);
+
+  size_t n_bits_;
+  const uint8_t *bitmap_;
+
+  size_t n_hashes_;
+};
+
+
+////////////////////////////////////////////////////////////
+// Inline implementations
+////////////////////////////////////////////////////////////
+
+// Reduces 'hash' to a bit position below 'n_bits'.
+inline uint32_t BloomFilter::PickBit(uint32_t hash, size_t n_bits) {
+  // Fast path for the default bloom filter block size (4096 bytes). That
+  // bit count is a power of two, so masking gives the same result as the
+  // modulo below, and bitwise math is much faster than division.
+  const size_t kDefaultBlockBits = 4096 * 8;
+  if (n_bits == kDefaultBlockBits) {
+    return hash & (kDefaultBlockBits - 1);
+  }
+  return hash % n_bits;
+}
+
+// Sets one bit per hash function for the key behind 'probe' and counts the
+// insertion.
+inline void BloomFilterBuilder::AddKey(const BloomKeyProbe &probe) {
+  uint32_t hash = probe.initial_hash();
+  for (size_t remaining = n_hashes_; remaining > 0; --remaining) {
+    BitmapSet(&bitmap_[0], BloomFilter::PickBit(hash, n_bits_));
+    // Derive the next hash in the sequence from the current one.
+    hash = probe.MixHash(hash);
+  }
+  ++n_inserted_;
+}
+
+// Returns false as soon as one of the key's bit positions is found unset,
+// and true if all of them are set.
+inline bool BloomFilter::MayContainKey(const BloomKeyProbe &probe) const {
+  uint32_t hash = probe.initial_hash();
+  int remaining = n_hashes_;
+
+  // Probe two positions per iteration. Computing both positions before
+  // touching the bitmap gives a small benefit: the two loads can proceed in
+  // parallel, and there is a 50% chance the first bit is set even on a miss.
+  for (; remaining >= 2; remaining -= 2) {
+    const uint32_t first_pos = PickBit(hash, n_bits_);
+    hash = probe.MixHash(hash);
+    const uint32_t second_pos = PickBit(hash, n_bits_);
+    hash = probe.MixHash(hash);
+
+    if (!(BitmapTest(&bitmap_[0], first_pos) &&
+          BitmapTest(&bitmap_[0], second_pos))) {
+      return false;
+    }
+  }
+
+  // Handle whatever the pairwise loop left over.
+  for (; remaining != 0; --remaining) {
+    const uint32_t pos = PickBit(hash, n_bits_);
+    if (!BitmapTest(&bitmap_[0], pos)) {
+      return false;
+    }
+    hash = probe.MixHash(hash);
+  }
+  return true;
+}
+
+} // namespace kudu
+
+#endif
http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/boost_mutex_utils.h
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/boost_mutex_utils.h b/be/src/kudu/util/boost_mutex_utils.h
new file mode 100644
index 0000000..6f6390b
--- /dev/null
+++ b/be/src/kudu/util/boost_mutex_utils.h
@@ -0,0 +1,45 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#ifndef KUDU_BOOST_MUTEX_UTILS_H
+#define KUDU_BOOST_MUTEX_UTILS_H
+
+
+// Similar to std::lock_guard except that it takes
+// a lock pointer, and checks against nullptr. If the
+// pointer is NULL, does nothing. Otherwise guards
+// with the lock.
+//
+// LockType must provide lock() and unlock(). Like std::lock_guard, the
+// guard cannot be copied: a copy would unlock the same lock a second time
+// when it is destroyed.
+template<class LockType>
+class lock_guard_maybe {
+ public:
+  // Acquires '*l' unless 'l' is null.
+  explicit lock_guard_maybe(LockType *l) :
+    lock_(l) {
+    if (lock_ != nullptr) {
+      lock_->lock();
+    }
+  }
+
+  // Releases the lock acquired by the constructor, if any.
+  ~lock_guard_maybe() {
+    if (lock_ != nullptr) {
+      lock_->unlock();
+    }
+  }
+
+  lock_guard_maybe(const lock_guard_maybe&) = delete;
+  lock_guard_maybe& operator=(const lock_guard_maybe&) = delete;
+
+ private:
+  LockType *lock_;
+};
+
+#endif
http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/cache-bench.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/cache-bench.cc b/be/src/kudu/util/cache-bench.cc
new file mode 100644
index 0000000..1e705be
--- /dev/null
+++ b/be/src/kudu/util/cache-bench.cc
@@ -0,0 +1,191 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <string.h>
+
+#include <atomic>
+#include <cstdint>
+#include <memory>
+#include <ostream>
+#include <string>
+#include <thread>
+#include <utility>
+#include <vector>
+
+#include <gflags/gflags.h>
+#include <glog/logging.h>
+#include <gtest/gtest.h>
+
+#include "kudu/gutil/bits.h"
+#include "kudu/gutil/macros.h"
+#include "kudu/gutil/stringprintf.h"
+#include "kudu/gutil/strings/human_readable.h"
+#include "kudu/util/cache.h"
+#include "kudu/util/monotime.h"
+#include "kudu/util/random.h"
+#include "kudu/util/random_util.h"
+#include "kudu/util/slice.h"
+#include "kudu/util/test_util.h"
+
+DEFINE_int32(num_threads, 16, "The number of threads to access the cache concurrently.");  // Used for both the warm-up and the measured phase.
+DEFINE_int32(run_seconds, 1, "The number of seconds to run the benchmark");  // Measured phase only; the warm-up always runs for 1 second.
+
+using std::atomic;
+using std::pair;
+using std::string;
+using std::thread;
+using std::unique_ptr;
+using std::vector;
+
+namespace kudu {
+
+// Capacity of the benchmarked cache, in bytes: 2^30 (1 GiB).
+static constexpr int kCacheCapacity = 1 << 30;
+// Size of each cache entry, in bytes: 2^12 (4 KiB).
+static constexpr int kEntrySize = 1 << 12;
+
+// Test parameterization: key distribution and dataset size for one benchmark run.
+struct BenchSetup {
+  enum class Pattern {
+    // Zipfian distribution -- a small number of items make up the
+    // vast majority of lookups.
+    ZIPFIAN,
+    // Every item is equally likely to be looked up.
+    UNIFORM
+  };
+  Pattern pattern;
+
+  // The ratio between the size of the dataset and the cache capacity.
+  // A value of at most 1 means that the whole dataset fits in the cache.
+  double dataset_cache_ratio;
+
+  // Returns a human-readable label for this setup (used in the log output).
+  string ToString() const {
+    string ret;
+    switch (pattern) {
+      case Pattern::ZIPFIAN: ret += "ZIPFIAN"; break;
+      case Pattern::UNIFORM: ret += "UNIFORM"; break;
+    }
+    // max_key() is unsigned, so it is formatted with %u rather than %d.
+    ret += StringPrintf(" ratio=%.2fx n_unique=%u", dataset_cache_ratio, max_key());
+    return ret;
+  }
+
+  // Returns the number of distinct entries (keys) in the dataset, i.e. the
+  // dataset size in bytes divided by the entry size.
+  uint32_t max_key() const {
+    return static_cast<int64_t>(kCacheCapacity * dataset_cache_ratio) / kEntrySize;
+  }
+};
+
+// Benchmark fixture, parameterized by BenchSetup: owns the cache under test and
+// provides helpers that query it from several threads at once.
+class CacheBench : public KuduTest,
+                   public testing::WithParamInterface<BenchSetup> {
+ public:
+  // Creates a new cache of type DRAM_CACHE with capacity kCacheCapacity.
+  void SetUp() override {
+    KuduTest::SetUp();
+    cache_.reset(NewLRUCache(DRAM_CACHE, kCacheCapacity, "test-cache"));
+  }
+
+  // Looks up keys drawn from the configured distribution, inserting an entry on
+  // each miss, until '*done' is true. Returns the pair (cache hits, lookups).
+  pair<int64_t, int64_t> DoQueries(const atomic<bool>* done) {
+    const BenchSetup& params = GetParam();
+    const bool zipfian = params.pattern == BenchSetup::Pattern::ZIPFIAN;
+    Random rng(GetRandomSeed32());
+    int64_t num_lookups = 0;
+    int64_t num_hits = 0;
+    for (; !done->load(); ++num_lookups) {
+      const uint32_t int_key = zipfian ? rng.Skewed(Bits::Log2Floor(params.max_key()))
+                                       : rng.Uniform(params.max_key());
+      // The cache key is the raw bytes of the integer.
+      char raw_key[sizeof(int_key)];
+      memcpy(raw_key, &int_key, sizeof(int_key));
+      Slice key(raw_key, arraysize(raw_key));
+      Cache::Handle* handle = cache_->Lookup(key, Cache::EXPECT_IN_CACHE);
+      if (handle != nullptr) {
+        ++num_hits;
+      } else {
+        // Miss: allocate a kEntrySize-byte entry (its value is never written) and insert it.
+        Cache::PendingHandle* pending = cache_->Allocate(
+            key, /* val_len=*/kEntrySize, /* charge=*/kEntrySize);
+        handle = cache_->Insert(pending, nullptr);
+      }
+      cache_->Release(handle);
+    }
+    return pair<int64_t, int64_t>(num_hits, num_lookups);
+  }
+
+  // Runs DoQueries() concurrently on 'n_threads' threads for 'n_seconds' seconds.
+  // Returns the pair (total cache hits, total lookups) summed over all threads.
+  pair<int64_t, int64_t> RunQueryThreads(int n_threads, int n_seconds) {
+    vector<thread> workers(n_threads);
+    atomic<bool> done(false);
+    atomic<int64_t> total_lookups(0);
+    atomic<int64_t> total_hits(0);
+    for (thread& worker : workers) {
+      worker = thread([&]() {
+        const pair<int64_t, int64_t> result = DoQueries(&done);
+        total_hits += result.first;
+        total_lookups += result.second;
+      });
+    }
+    SleepFor(MonoDelta::FromSeconds(n_seconds));
+    done = true;
+    for (thread& worker : workers) {
+      worker.join();
+    }
+    return pair<int64_t, int64_t>(total_hits.load(), total_lookups.load());
+  }
+
+ protected:
+  // The cache under test; (re)created by SetUp().
+  unique_ptr<Cache> cache_;
+};
+
+// Test both distributions, and for each, test both the case where the dataset
+// is exactly the size of the cache (ratio 1.0) and where it is 3x larger (ratio 3.0).
+INSTANTIATE_TEST_CASE_P(Patterns, CacheBench, testing::ValuesIn(std::vector<BenchSetup>{
+ {BenchSetup::Pattern::ZIPFIAN, 1.0},
+ {BenchSetup::Pattern::ZIPFIAN, 3.0},
+ {BenchSetup::Pattern::UNIFORM, 1.0},
+ {BenchSetup::Pattern::UNIFORM, 3.0}
+ }));
+
+TEST_P(CacheBench, RunBench) {  // Logs lookup throughput and hit rate for one setup.
+  const BenchSetup& setup = GetParam();
+  ASSERT_GT(FLAGS_run_seconds, 0) << "--run_seconds must be positive";  // It is a divisor below.
+  // Run a short warmup phase to try to populate the cache. Otherwise, even if the
+  // dataset fits in the cache, the measured phase would count the misses needed
+  // to populate it.
+  LOG(INFO) << "Warming up...";
+  RunQueryThreads(FLAGS_num_threads, 1);
+
+  LOG(INFO) << "Running benchmark...";
+  const pair<int64_t, int64_t> hits_lookups = RunQueryThreads(FLAGS_num_threads, FLAGS_run_seconds);
+  const int64_t hits = hits_lookups.first;
+  const int64_t lookups = hits_lookups.second;
+  const int64_t l_per_sec = lookups / FLAGS_run_seconds;
+  // Report a 0% hit rate instead of NaN if no lookup ran (e.g. --num_threads=0).
+  const double hit_rate = lookups > 0 ? static_cast<double>(hits) / lookups : 0.0;
+  const string test_case = setup.ToString();
+  LOG(INFO) << test_case << ": " << HumanReadableNum::ToString(l_per_sec) << " lookups/sec";
+  LOG(INFO) << test_case << ": " << StringPrintf("%.1f", hit_rate * 100.0) << "% hit rate";
+}
+
+} // namespace kudu
http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/cache-test.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/cache-test.cc b/be/src/kudu/util/cache-test.cc
new file mode 100644
index 0000000..3fd1d5f
--- /dev/null
+++ b/be/src/kudu/util/cache-test.cc
@@ -0,0 +1,246 @@
+// Some portions Copyright (c) 2011 The LevelDB Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include <cassert>
+#include <cstring>
+#include <memory>
+#include <string>
+#include <vector>
+
+#include <glog/logging.h>
+#include <gflags/gflags.h>
+#include <gflags/gflags_declare.h>
+#include <gtest/gtest.h>
+
+#include "kudu/gutil/gscoped_ptr.h"
+#include "kudu/gutil/port.h"
+#include "kudu/gutil/ref_counted.h"
+#include "kudu/util/cache.h"
+#include "kudu/util/coding.h"
+#include "kudu/util/env.h"
+#include "kudu/util/faststring.h"
+#include "kudu/util/mem_tracker.h"
+#include "kudu/util/metrics.h"
+#include "kudu/util/slice.h"
+#include "kudu/util/test_macros.h"
+#include "kudu/util/test_util.h"
+
+#if defined(__linux__)
+DECLARE_string(nvm_cache_path);
+#endif // defined(__linux__)
+
+DECLARE_double(cache_memtracker_approximation_ratio);
+
+namespace kudu {
+
+// Conversions between the ints used by these tests and the byte strings expected by Cache.
+static std::string EncodeInt(int k) {  // Returns the fixed-width 32-bit encoding of 'k'.
+  faststring encoded;
+  PutFixed32(&encoded, k);
+  return encoded.ToString();
+}
+static int DecodeInt(const Slice& k) {  // Inverse of EncodeInt().
+  assert(k.size() == 4);  // 'k' must hold exactly one fixed-width 32-bit value.
+  return static_cast<int>(DecodeFixed32(k.data()));
+}
+
+// Fixture for the Cache tests, parameterized by cache type; it is also the eviction callback of every entry it inserts.
+class CacheTest : public KuduTest, public ::testing::WithParamInterface<CacheType>,
+                  public Cache::EvictionCallback {
+ public:
+  // EvictionCallback implementation: records the decoded key and value of each
+  // evicted entry, in order, so that tests can assert on what was evicted.
+  void EvictedEntry(Slice key, Slice val) override {
+    evicted_keys_.emplace_back(DecodeInt(key));
+    evicted_values_.emplace_back(DecodeInt(val));
+  }
+  std::vector<int> evicted_keys_;
+  std::vector<int> evicted_values_;
+  std::shared_ptr<MemTracker> mem_tracker_;
+  gscoped_ptr<Cache> cache_;
+  MetricRegistry metric_registry_;
+
+  static const int kCacheSize = 14 * 1024 * 1024;
+
+  void SetUp() override {
+    // When HAVE_LIB_VMEM is defined and --nvm_cache_path was left at its default, use a freshly created test path.
+#if defined(HAVE_LIB_VMEM)
+    if (google::GetCommandLineFlagInfoOrDie("nvm_cache_path").is_default) {
+      FLAGS_nvm_cache_path = GetTestPath("nvm-cache");
+      ASSERT_OK(Env::Default()->CreateDir(FLAGS_nvm_cache_path));
+    }
+#endif // defined(HAVE_LIB_VMEM)
+
+    // Disable approximate tracking of cache memory since we make specific
+    // assertions on the MemTracker in this test.
+    FLAGS_cache_memtracker_approximation_ratio = 0;
+
+    cache_.reset(NewLRUCache(GetParam(), kCacheSize, "cache_test"));
+
+    // Since the NVM cache does not have a MemTracker (due to the use of tcmalloc
+    // for this), we only require the tracker to exist in the DRAM case.
+    MemTracker::FindTracker("cache_test-sharded_lru_cache", &mem_tracker_);
+    if (GetParam() == DRAM_CACHE) {
+      ASSERT_TRUE(mem_tracker_.get());
+    }
+
+    scoped_refptr<MetricEntity> metric_entity =
+        METRIC_ENTITY_server.Instantiate(&metric_registry_, "test");
+    cache_->SetMetrics(metric_entity);
+  }
+
+  // Returns the value cached under 'key', or -1 if the lookup finds no entry.
+  int Lookup(int key) {
+    Cache::Handle* h = cache_->Lookup(EncodeInt(key), Cache::EXPECT_IN_CACHE);
+    if (h == nullptr) return -1;
+    const int cached_value = DecodeInt(cache_->Value(h));
+    cache_->Release(h);
+    return cached_value;
+  }
+
+  // Inserts 'key' -> 'value' with the given charge, registering this fixture as the eviction callback.
+  void Insert(int key, int value, int charge = 1) {
+    const std::string k = EncodeInt(key);
+    const std::string v = EncodeInt(value);
+    Cache::PendingHandle* pending = CHECK_NOTNULL(cache_->Allocate(k, v.size(), charge));
+    memcpy(cache_->MutableValue(pending), v.data(), v.size());
+    cache_->Release(cache_->Insert(pending, this));
+  }
+
+  void Erase(int key) {  // Erases 'key' from the cache.
+    cache_->Erase(EncodeInt(key));
+  }
+};
+
+#if defined(__linux__)  // On Linux, run every CacheTest against both DRAM_CACHE and NVM_CACHE...
+INSTANTIATE_TEST_CASE_P(CacheTypes, CacheTest, ::testing::Values(DRAM_CACHE, NVM_CACHE));
+#else  // ...elsewhere, against DRAM_CACHE only.
+INSTANTIATE_TEST_CASE_P(CacheTypes, CacheTest, ::testing::Values(DRAM_CACHE));
+#endif // defined(__linux__)
+
+TEST_P(CacheTest, TrackMemory) {
+  // Checks that the MemTracker's consumption follows the charge of cached entries.
+  if (!mem_tracker_) return;  // No tracker was found in SetUp(): nothing to check.
+  Insert(100, 100, 1);
+  ASSERT_EQ(1, mem_tracker_->consumption());
+  Erase(100);
+  ASSERT_EQ(0, mem_tracker_->consumption());
+  ASSERT_EQ(1, mem_tracker_->peak_consumption());
+}
+
+TEST_P(CacheTest, HitAndMiss) {  // Basic hits, misses and replacement of an existing key.
+ ASSERT_EQ(-1, Lookup(100));  // Nothing has been inserted yet, so the lookup misses.
+ // After inserting key 100, only that key hits.
+ Insert(100, 101);
+ ASSERT_EQ(101, Lookup(100));
+ ASSERT_EQ(-1, Lookup(200));
+ ASSERT_EQ(-1, Lookup(300));
+ // After inserting key 200 as well, both keys hit.
+ Insert(200, 201);
+ ASSERT_EQ(101, Lookup(100));
+ ASSERT_EQ(201, Lookup(200));
+ ASSERT_EQ(-1, Lookup(300));
+ // Re-inserting key 100 replaces its value; key 200 is unaffected.
+ Insert(100, 102);
+ ASSERT_EQ(102, Lookup(100));
+ ASSERT_EQ(201, Lookup(200));
+ ASSERT_EQ(-1, Lookup(300));
+ // The replaced entry (100 -> 101) is expected to be the only eviction reported.
+ ASSERT_EQ(1, evicted_keys_.size());
+ ASSERT_EQ(100, evicted_keys_[0]);
+ ASSERT_EQ(101, evicted_values_[0]);
+}
+
+TEST_P(CacheTest, Erase) {  // Checks that Erase() removes only the named entry and reports it once.
+ Erase(200);  // Erasing a key that was never inserted...
+ ASSERT_EQ(0, evicted_keys_.size());  // ...is expected to report no eviction.
+ // Erase one of two entries: it no longer hits, the other one still does, and one eviction is recorded.
+ Insert(100, 101);
+ Insert(200, 201);
+ Erase(100);
+ ASSERT_EQ(-1, Lookup(100));
+ ASSERT_EQ(201, Lookup(200));
+ ASSERT_EQ(1, evicted_keys_.size());
+ ASSERT_EQ(100, evicted_keys_[0]);
+ ASSERT_EQ(101, evicted_values_[0]);
+ // Erasing the same key again changes nothing: the eviction count stays at 1.
+ Erase(100);
+ ASSERT_EQ(-1, Lookup(100));
+ ASSERT_EQ(201, Lookup(200));
+ ASSERT_EQ(1, evicted_keys_.size());
+}
+
+TEST_P(CacheTest, EntriesArePinned) {  // Checks that an entry is not reported evicted while a handle to it is held.
+ Insert(100, 101);
+ Cache::Handle* h1 = cache_->Lookup(EncodeInt(100), Cache::EXPECT_IN_CACHE);  // Keep a handle to the first value.
+ ASSERT_EQ(101, DecodeInt(cache_->Value(h1)));
+ // Replace the value while h1 is still held: no eviction is expected yet.
+ Insert(100, 102);
+ Cache::Handle* h2 = cache_->Lookup(EncodeInt(100), Cache::EXPECT_IN_CACHE);
+ ASSERT_EQ(102, DecodeInt(cache_->Value(h2)));
+ ASSERT_EQ(0, evicted_keys_.size());
+ // Only once h1 is released is the old entry (100 -> 101) expected to be reported as evicted.
+ cache_->Release(h1);
+ ASSERT_EQ(1, evicted_keys_.size());
+ ASSERT_EQ(100, evicted_keys_[0]);
+ ASSERT_EQ(101, evicted_values_[0]);
+ // Erasing the key makes lookups miss, but with h2 still held no further eviction is expected.
+ Erase(100);
+ ASSERT_EQ(-1, Lookup(100));
+ ASSERT_EQ(1, evicted_keys_.size());
+ // Releasing h2 is expected to report the second entry (100 -> 102) as evicted.
+ cache_->Release(h2);
+ ASSERT_EQ(2, evicted_keys_.size());
+ ASSERT_EQ(100, evicted_keys_[1]);
+ ASSERT_EQ(102, evicted_values_[1]);
+}
+
+TEST_P(CacheTest, EvictionPolicy) {  // Checks that a frequently-used entry survives while an unused one is evicted.
+ Insert(100, 101);
+ Insert(200, 201);
+ // Each new entry below is charged 1/1000th of the cache capacity, and 2000 of them are inserted.
+ const int kNumElems = 1000;
+ const int kSizePerElem = kCacheSize / kNumElems;
+
+ // Loop adding and looking up new entries, while repeatedly accessing key 100
+ // (whose value is 101). This frequently-used entry should not be evicted.
+ for (int i = 0; i < kNumElems + 1000; i++) {
+ Insert(1000+i, 2000+i, kSizePerElem);
+ ASSERT_EQ(2000+i, Lookup(1000+i));
+ ASSERT_EQ(101, Lookup(100));
+ }
+ ASSERT_EQ(101, Lookup(100));
+ // Since '200' wasn't accessed in the loop above, it should have
+ // been evicted.
+ ASSERT_EQ(-1, Lookup(200));
+}
+
+TEST_P(CacheTest, HeavyEntries) {
+  // Insert alternating heavy (even keys) and light (odd keys) entries totalling
+  // at least twice the cache capacity, then check that the combined charge of the
+  // entries still cached does not exceed the capacity by more than 10%.
+  const int kLight = kCacheSize / 1000;
+  const int kHeavy = kCacheSize / 100;
+  const auto charge_of = [&](int key) { return (key % 2 != 0) ? kLight : kHeavy; };
+
+  // Fill phase: keys are 0, 1, 2, ... and key i maps to value 1000 + i.
+  int num_keys = 0;
+  for (int total_charge = 0; total_charge < 2 * kCacheSize; ++num_keys) {
+    Insert(num_keys, 1000 + num_keys, charge_of(num_keys));
+    total_charge += charge_of(num_keys);
+  }
+
+  // Count phase: sum the charge of every key that still hits.
+  int cached_charge = 0;
+  for (int key = 0; key < num_keys; ++key) {
+    const int value = Lookup(key);
+    if (value < 0) continue;  // Lookup missed: the entry is not cached.
+    cached_charge += charge_of(key);
+    ASSERT_EQ(1000 + key, value);
+  }
+  // Allow the cached charge to exceed the capacity by at most 10%.
+  ASSERT_LE(cached_charge, kCacheSize + kCacheSize / 10);
+}
+
+} // namespace kudu