You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by ta...@apache.org on 2018/07/13 06:03:20 UTC
[08/51] [abbrv] impala git commit: IMPALA-7006: Add KRPC folders from
kudu@334ecafd
http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/random_util.h
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/random_util.h b/be/src/kudu/util/random_util.h
new file mode 100644
index 0000000..bda8c42
--- /dev/null
+++ b/be/src/kudu/util/random_util.h
@@ -0,0 +1,44 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#ifndef KUDU_UTIL_RANDOM_UTIL_H
+#define KUDU_UTIL_RANDOM_UTIL_H
+
+#include <cstdlib>
+#include <stdint.h>
+
+#include <string>
+
+namespace kudu {
+
+class Random;
+
+// Writes exactly n random bytes to dest using the parameter Random generator.
+// Note RandomString() does not null-terminate its strings, though '\0' could
+// be written to dest with the same probability as any other byte.
+void RandomString(void* dest, size_t n, Random* rng);
+
+// Same as the above, but returns the string.
+std::string RandomString(size_t n, Random* rng);
+
+// Generate a 32-bit random seed from several sources, including timestamp,
+// pid & tid.
+uint32_t GetRandomSeed32();
+
+} // namespace kudu
+
+#endif // KUDU_UTIL_RANDOM_UTIL_H
http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/rle-encoding.h
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/rle-encoding.h b/be/src/kudu/util/rle-encoding.h
new file mode 100644
index 0000000..5b16fd9
--- /dev/null
+++ b/be/src/kudu/util/rle-encoding.h
@@ -0,0 +1,523 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#ifndef IMPALA_RLE_ENCODING_H
+#define IMPALA_RLE_ENCODING_H
+
+#include <glog/logging.h>
+
+#include "kudu/gutil/port.h"
+#include "kudu/util/bit-stream-utils.inline.h"
+#include "kudu/util/bit-util.h"
+
+namespace kudu {
+
+// Utility classes to do run length encoding (RLE) for fixed bit width values. If runs
+// are sufficiently long, RLE is used, otherwise, the values are just bit-packed
+// (literal encoding).
+// For both types of runs, there is a byte-aligned indicator which encodes the length
+// of the run and the type of the run.
+// This encoding has the benefit that when there aren't any long enough runs, values
+// are always decoded at fixed (can be precomputed) bit offsets OR both the value and
+// the run length are byte aligned. This allows for very efficient decoding
+// implementations.
+// The encoding is:
+// encoded-block := run*
+// run := literal-run | repeated-run
+// literal-run := literal-indicator < literal bytes >
+// repeated-run := repeated-indicator < repeated value. padded to byte boundary >
+// literal-indicator := varint_encode( number_of_groups << 1 | 1)
+// repeated-indicator := varint_encode( number_of_repetitions << 1 )
+//
+// Each run is preceded by a varint. The varint's least significant bit is
+// used to indicate whether the run is a literal run or a repeated run. The rest
+// of the varint is used to determine the length of the run (eg how many times the
+// value repeats).
+//
+// In the case of literal runs, the run length is always a multiple of 8 (i.e. encode
+// in groups of 8), so that no matter the bit-width of the value, the sequence will end
+// on a byte boundary without padding.
+// Given that we know it is a multiple of 8, we store the number of 8-groups rather than
+// the actual number of encoded ints. (This means that the total number of encoded values
+// can not be determined from the encoded data, since the number of values in the last
+// group may not be a multiple of 8).
+// There is a break-even point when it is more storage efficient to do run length
+// encoding. For 1 bit-width values, that point is 8 values. They require 2 bytes
+// for either the repeated encoding or the literal encoding. This value can always
+// be computed based on the bit-width.
+// TODO: think about how to use this for strings. The bit packing isn't quite the same.
+//
+// Examples with bit-width 1 (eg encoding booleans):
+// ----------------------------------------
+// 100 1s followed by 100 0s:
+// <varint(100 << 1)> <1, padded to 1 byte> <varint(100 << 1)> <0, padded to 1 byte>
+// - (total 4 bytes)
+//
+// alternating 1s and 0s (200 total):
+// 200 ints = 25 groups of 8
+// <varint((25 << 1) | 1)> <25 bytes of values, bitpacked>
+// (total 26 bytes, 1 byte overhead)
+//
+
+// Decoder class for RLE encoded data.
+//
+// NOTE: the encoded format does not have any length prefix or any other way of
+// indicating that the encoded sequence ends at a certain point, so the Decoder
+// methods may return some extra bits at the end before the read methods start
+// to return 0/false.
+template<typename T>
+class RleDecoder {
+ public:
+  // Create a decoder object. buffer/buffer_len is the encoded data.
+  // bit_width is the width of each value (before encoding); it is DCHECKed
+  // to lie in the range [1, 64].
+  RleDecoder(const uint8_t* buffer, int buffer_len, int bit_width)
+    : bit_reader_(buffer, buffer_len),
+      bit_width_(bit_width),
+      current_value_(0),
+      repeat_count_(0),
+      literal_count_(0),
+      rewind_state_(CANT_REWIND) {
+    DCHECK_GE(bit_width_, 1);
+    DCHECK_LE(bit_width_, 64);
+  }
+
+  // Create a decoder that has no input buffer. The scalar members are given
+  // defined values so that copying or assigning such an object never reads an
+  // indeterminate member.
+  // NOTE(review): the decoding methods DCHECK bit_reader_.is_initialized(), so
+  // presumably a default-constructed decoder must be replaced by a fully
+  // constructed one before use -- confirm against BitReader.
+  RleDecoder()
+    : bit_width_(0),
+      current_value_(0),
+      repeat_count_(0),
+      literal_count_(0),
+      rewind_state_(CANT_REWIND) {
+  }
+
+  // Skip n values, and returns the number of non-zero entries skipped.
+  size_t Skip(size_t to_skip);
+
+  // Gets the next value. Returns false if there are no more.
+  bool Get(T* val);
+
+  // Seek to the previous value. Only one step back is possible after each
+  // call to Get(); rewinding again without an intervening Get() is fatal.
+  void RewindOne();
+
+  // Gets the next run of the same 'val'. Returns 0 if there is no
+  // more data to be decoded. Will return a run of at most 'max_run'
+  // values. If there are more values than this, the next call to
+  // GetNextRun will return more from the same run.
+  size_t GetNextRun(T* val, size_t max_run);
+
+ private:
+  // Reads the indicator of the next run when the current run is exhausted.
+  // Returns false if the indicator could not be read.
+  bool ReadHeader();
+
+  // Records what the last Get() consumed, so that RewindOne() knows how to
+  // undo it.
+  enum RewindState {
+    REWIND_LITERAL,
+    REWIND_RUN,
+    CANT_REWIND
+  };
+
+  BitReader bit_reader_;
+  // Width in bits of each encoded value.
+  int bit_width_;
+  // Value of the current repeated run (also used as scratch space while
+  // scanning literal runs in GetNextRun()).
+  uint64_t current_value_;
+  // Values left in the current repeated run.
+  uint32_t repeat_count_;
+  // Values left in the current literal run.
+  uint32_t literal_count_;
+  RewindState rewind_state_;
+};
+
+// Class to incrementally build the rle data.
+// The encoding has two modes: encoding repeated runs and literal runs.
+// If the run is sufficiently short, it is more efficient to encode as a literal run.
+// This class does so by buffering 8 values at a time. If they are not all the same
+// they are added to the literal run. If they are the same, they are added to the
+// repeated run. When we switch modes, the previous run is flushed out.
+template<typename T>
+class RleEncoder {
+ public:
+ // buffer: buffer to write bits to.
+ // bit_width: max number of bits for value; DCHECKed to be in [1, 64].
+ // The encoder state is reset via Clear() on construction.
+ // TODO: consider adding a min_repeated_run_length so the caller can control
+ // when values should be encoded as repeated runs. Currently this is derived
+ // based on the bit_width, which can determine a storage optimal choice.
+ explicit RleEncoder(faststring *buffer, int bit_width)
+ : bit_width_(bit_width),
+ bit_writer_(buffer) {
+ DCHECK_GE(bit_width_, 1);
+ DCHECK_LE(bit_width_, 64);
+ Clear();
+ }
+
+ // Reserve 'num_bytes' bytes for a plain encoded header, set each
+ // byte with 'val': this is used for the RLE-encoded data blocks in
+ // order to be able to store the initial ordinal position
+ // and number of elements. This is a part of RleEncoder in order to
+ // maintain the correct offset in 'buffer'.
+ void Reserve(int num_bytes, uint8_t val);
+
+ // Encode value. This value must be representable with bit_width_ bits.
+ // The value is appended 'run_length' times.
+ void Put(T value, size_t run_length = 1);
+
+ // Flushes any pending values to the underlying buffer.
+ // Returns the total number of bytes written.
+ int Flush();
+
+ // Resets all the state in the encoder.
+ void Clear();
+
+ // Returns the number of bytes written so far, as reported by the bit writer.
+ int32_t len() const { return bit_writer_.bytes_written(); }
+
+ private:
+ // Flushes any buffered values. If this is part of a repeated run, this is largely
+ // a no-op.
+ // If it is part of a literal run, this will call FlushLiteralRun, which writes
+ // out the buffered literal values.
+ // If 'done' is true, the current run would be written even if it would normally
+ // have been buffered more. This should only be called at the end, when the
+ // encoder has received all values even if it would normally continue to be
+ // buffered.
+ void FlushBufferedValues(bool done);
+
+ // Flushes literal values to the underlying buffer. If update_indicator_byte,
+ // then the current literal run is complete and the indicator byte is updated.
+ void FlushLiteralRun(bool update_indicator_byte);
+
+ // Flushes a repeated run to the underlying buffer.
+ void FlushRepeatedRun();
+
+ // Number of bits needed to encode the value.
+ const int bit_width_;
+
+ // Underlying buffer.
+ BitWriter bit_writer_;
+
+ // We need to buffer at most 8 values for literals. This happens when the
+ // bit_width is 1 (so 8 values fit in one byte).
+ // TODO: generalize this to other bit widths
+ uint64_t buffered_values_[8];
+
+ // Number of values in buffered_values_
+ int num_buffered_values_;
+
+ // The current (also last) value that was written and the count of how
+ // many times in a row that value has been seen. This is maintained even
+ // if we are in a literal run. If the repeat_count_ gets high enough, we switch
+ // to encoding repeated runs.
+ uint64_t current_value_;
+ int repeat_count_;
+
+ // Number of literals in the current run. This does not include the literals
+ // that might be in buffered_values_. Only after we've got a group big enough
+ // can we decide if they should be part of the literal_count_ or repeat_count_
+ int literal_count_;
+
+ // Index of a byte in the underlying buffer that stores the indicator byte.
+ // This is reserved as soon as we need a literal run but the value is written
+ // when the literal run is complete. We maintain an index rather than a pointer
+ // into the underlying buffer because the pointer value may become invalid if
+ // the underlying buffer is resized.
+ // A negative value means no indicator byte is currently reserved.
+ int literal_indicator_byte_idx_;
+};
+
+// Makes sure a run is available for decoding. When both the literal and the
+// repeated counters are exhausted, reads the next run's indicator and, for a
+// repeated run, the byte-aligned repeated value.
+// Returns false if the indicator could not be read, true otherwise.
+template<typename T>
+inline bool RleDecoder<T>::ReadHeader() {
+  DCHECK(bit_reader_.is_initialized());
+  if (PREDICT_FALSE(literal_count_ == 0 && repeat_count_ == 0)) {
+    // Read the next run's indicator int, it could be a literal or repeated run
+    // The int is encoded as a vlq-encoded value.
+    int32_t indicator_value = 0;
+    if (PREDICT_FALSE(!bit_reader_.GetVlqInt(&indicator_value))) {
+      return false;
+    }
+
+    // lsb indicates if it is a literal run or repeated run
+    const bool is_literal = indicator_value & 1;
+    if (is_literal) {
+      // Literal runs are stored as a count of 8-value groups.
+      literal_count_ = (indicator_value >> 1) * 8;
+      DCHECK_GT(literal_count_, 0);
+    } else {
+      repeat_count_ = indicator_value >> 1;
+      DCHECK_GT(repeat_count_, 0);
+      // The repeated value follows the indicator, padded to whole bytes.
+      bool value_read = bit_reader_.GetAligned<T>(
+          BitUtil::Ceil(bit_width_, 8), reinterpret_cast<T*>(&current_value_));
+      DCHECK(value_read);
+    }
+  }
+  return true;
+}
+
+// Decodes the next value into *val. Returns false if ReadHeader() reports
+// that no run is available, true otherwise. Also records which kind of run the
+// value came from so that RewindOne() can undo this read.
+template<typename T>
+inline bool RleDecoder<T>::Get(T* val) {
+  DCHECK(bit_reader_.is_initialized());
+  if (PREDICT_FALSE(!ReadHeader())) {
+    return false;
+  }
+
+  if (PREDICT_TRUE(repeat_count_ > 0)) {
+    // Repeated run: hand back the cached value.
+    *val = current_value_;
+    --repeat_count_;
+    rewind_state_ = REWIND_RUN;
+    return true;
+  }
+
+  // Literal run: pull the next bit-packed value from the reader.
+  DCHECK(literal_count_ > 0);
+  bool value_read = bit_reader_.GetValue(bit_width_, val);
+  DCHECK(value_read);
+  --literal_count_;
+  rewind_state_ = REWIND_LITERAL;
+  return true;
+}
+
+// Undoes the most recent Get() so that the same value is decoded again.
+// Calling this when no rewind is possible (rewind_state_ is CANT_REWIND) logs
+// a fatal error.
+template<typename T>
+inline void RleDecoder<T>::RewindOne() {
+  DCHECK(bit_reader_.is_initialized());
+
+  switch (rewind_state_) {
+    case REWIND_RUN:
+      // The value came from a repeated run: give the repetition back.
+      ++repeat_count_;
+      break;
+    case REWIND_LITERAL:
+      // The value came from a literal run: move the reader back one value.
+      bit_reader_.Rewind(bit_width_);
+      ++literal_count_;
+      break;
+    case CANT_REWIND:
+      LOG(FATAL) << "Can't rewind more than once after each read!";
+      break;
+  }
+
+  // A second rewind is not allowed until another value has been read.
+  rewind_state_ = CANT_REWIND;
+}
+
+// Gets the next run of identical values. Stores the value in *val and returns
+// the length of the run, which is at most 'max_run' ('max_run' must be > 0).
+// Consecutive encoded runs holding the same value are merged into a single
+// returned run. Returns 0 if there is no more data to be decoded.
+template<typename T>
+inline size_t RleDecoder<T>::GetNextRun(T* val, size_t max_run) {
+  DCHECK(bit_reader_.is_initialized());
+  DCHECK_GT(max_run, 0);
+  size_t ret = 0;
+  size_t rem = max_run;
+  while (ReadHeader()) {
+    if (PREDICT_TRUE(repeat_count_ > 0)) {
+      // A repeated run with a different value ends the run collected so far.
+      if (PREDICT_FALSE(ret > 0 && *val != current_value_)) {
+        return ret;
+      }
+      *val = current_value_;
+      if (repeat_count_ >= rem) {
+        // The next run is longer than the amount of remaining data
+        // that the caller wants to read. Only consume it partially.
+        repeat_count_ -= rem;
+        ret += rem;
+        return ret;
+      }
+      ret += repeat_count_;
+      rem -= repeat_count_;
+      repeat_count_ = 0;
+    } else {
+      DCHECK(literal_count_ > 0);
+      if (ret == 0) {
+        // Nothing collected yet: the first literal defines the run's value.
+        bool has_more = bit_reader_.GetValue(bit_width_, val);
+        DCHECK(has_more);
+        literal_count_--;
+        ret++;
+        rem--;
+      }
+
+      while (literal_count_ > 0) {
+        bool result = bit_reader_.GetValue(bit_width_, &current_value_);
+        DCHECK(result);
+        if (current_value_ != *val || rem == 0) {
+          // The value just read is not part of this run: un-read it so the
+          // next call starts from it.
+          bit_reader_.Rewind(bit_width_);
+          return ret;
+        }
+        ret++;
+        rem--;
+        literal_count_--;
+      }
+    }
+  }
+  return ret;
+}
+
+// Skips up to 'to_skip' values and returns how many of the skipped values were
+// non-zero. Asking to skip more values than remain is a programming error
+// (DCHECKed); if it happens anyway, skipping stops at the end of the data.
+template<typename T>
+inline size_t RleDecoder<T>::Skip(size_t to_skip) {
+  DCHECK(bit_reader_.is_initialized());
+
+  size_t set_count = 0;
+  while (to_skip > 0) {
+    bool has_run = ReadHeader();
+    DCHECK(has_run);
+    if (PREDICT_FALSE(!has_run)) {
+      // No more encoded data. When the DCHECK above is compiled out, neither
+      // branch below could make progress (both counters are zero), so the loop
+      // would never terminate without this exit.
+      break;
+    }
+
+    if (PREDICT_TRUE(repeat_count_ > 0)) {
+      // Repeated run: every skipped value equals current_value_.
+      size_t nskip = (repeat_count_ < to_skip) ? repeat_count_ : to_skip;
+      repeat_count_ -= nskip;
+      to_skip -= nskip;
+      if (current_value_ != 0) {
+        set_count += nskip;
+      }
+    } else {
+      DCHECK(literal_count_ > 0);
+      size_t nskip = (literal_count_ < to_skip) ? literal_count_ : to_skip;
+      literal_count_ -= nskip;
+      to_skip -= nskip;
+      // Literal values have to be read one by one to count the non-zero ones.
+      for (; nskip > 0; nskip--) {
+        T value = 0;
+        bool value_read = bit_reader_.GetValue(bit_width_, &value);
+        DCHECK(value_read);
+        if (value != 0) {
+          set_count++;
+        }
+      }
+    }
+  }
+  return set_count;
+}
+
+// This function buffers input values 8 at a time. After seeing all 8 values,
+// it decides whether they should be encoded as a literal or repeated run.
+// 'value' is appended 'run_length' times; a 'run_length' of 0 appends nothing.
+template<typename T>
+inline void RleEncoder<T>::Put(T value, size_t run_length) {
+ // The value has to fit in bit_width_ bits (any value fits when the width is 64).
+ DCHECK(bit_width_ == 64 || value < (1LL << bit_width_));
+
+ // TODO(perf): remove the loop and use the repeat_count_
+ for (; run_length > 0; run_length--) {
+ if (PREDICT_TRUE(current_value_ == value)) {
+ ++repeat_count_;
+ if (repeat_count_ > 8) {
+ // This is just a continuation of the current run, no need to buffer the
+ // values.
+ // Note that this is the fast path for long repeated runs.
+ continue;
+ }
+ } else {
+ if (repeat_count_ >= 8) {
+ // We had a run that was long enough but it has ended. Flush the
+ // current repeated run.
+ DCHECK_EQ(literal_count_, 0);
+ FlushRepeatedRun();
+ }
+ // Start counting repetitions of the new value.
+ repeat_count_ = 1;
+ current_value_ = value;
+ }
+
+ buffered_values_[num_buffered_values_] = value;
+ if (++num_buffered_values_ == 8) {
+ // A full group of 8 is buffered: decide how it should be encoded.
+ DCHECK_EQ(literal_count_ % 8, 0);
+ FlushBufferedValues(false);
+ }
+ }
+}
+
+// Writes the buffered values to the bit writer as bit-packed literals,
+// reserving the run's indicator byte first if none is reserved yet. If
+// 'update_indicator_byte' is true the literal run is closed: its indicator byte
+// is filled in and the literal state is reset.
+template<typename T>
+inline void RleEncoder<T>::FlushLiteralRun(bool update_indicator_byte) {
+ if (literal_indicator_byte_idx_ < 0) {
+ // The literal indicator byte has not been reserved yet, get one now.
+ literal_indicator_byte_idx_ = bit_writer_.GetByteIndexAndAdvance(1);
+ DCHECK_GE(literal_indicator_byte_idx_, 0);
+ }
+
+ // Write all the buffered values as bit packed literals
+ for (int i = 0; i < num_buffered_values_; ++i) {
+ bit_writer_.PutValue(buffered_values_[i], bit_width_);
+ }
+ num_buffered_values_ = 0;
+
+ if (update_indicator_byte) {
+ // At this point we need to write the indicator byte for the literal run.
+ // We only reserve one byte, to allow for streaming writes of literal values.
+ // The logic makes sure we flush literal runs often enough to not overrun
+ // the 1 byte.
+ int num_groups = BitUtil::Ceil(literal_count_, 8);
+ // The set lsb marks the run as a literal run.
+ int32_t indicator_value = (num_groups << 1) | 1;
+ // The indicator has to fit in the single reserved byte.
+ DCHECK_EQ(indicator_value & 0xFFFFFF00, 0);
+ bit_writer_.buffer()->data()[literal_indicator_byte_idx_] = indicator_value;
+ literal_indicator_byte_idx_ = -1;
+ literal_count_ = 0;
+ }
+}
+
+// Emits the pending repeated run: a VLQ indicator whose lsb is 0 and whose
+// remaining bits hold the repeat count, followed by the repeated value padded
+// to whole bytes. Afterwards nothing is buffered and the repeat count is zero.
+template<typename T>
+inline void RleEncoder<T>::FlushRepeatedRun() {
+  DCHECK_GT(repeat_count_, 0);
+  // Shifting left leaves the lsb clear, which marks a repeated run.
+  const int32_t run_indicator = repeat_count_ << 1;
+  bit_writer_.PutVlqInt(run_indicator);
+  const auto value_bytes = BitUtil::Ceil(bit_width_, 8);
+  bit_writer_.PutAligned(current_value_, value_bytes);
+  repeat_count_ = 0;
+  num_buffered_values_ = 0;
+}
+
+// Flush the values that have been buffered. At this point we decide whether
+// we need to switch between the run types or continue the current one.
+// 'done' is forwarded to FlushLiteralRun() to close the literal run even if its
+// indicator byte could still hold more groups.
+template<typename T>
+inline void RleEncoder<T>::FlushBufferedValues(bool done) {
+ if (repeat_count_ >= 8) {
+ // Clear the buffered values. They are part of the repeated run now and we
+ // don't want to flush them out as literals.
+ num_buffered_values_ = 0;
+ if (literal_count_ != 0) {
+ // There was a current literal run. All the values in it have been flushed
+ // but we still need to update the indicator byte.
+ DCHECK_EQ(literal_count_ % 8, 0);
+ DCHECK_EQ(repeat_count_, 8);
+ FlushLiteralRun(true);
+ }
+ DCHECK_EQ(literal_count_, 0);
+ return;
+ }
+
+ // The buffered values become part of the literal run.
+ literal_count_ += num_buffered_values_;
+ int num_groups = BitUtil::Ceil(literal_count_, 8);
+ if (num_groups + 1 >= (1 << 6)) {
+ // We need to start a new literal run because the indicator byte we've reserved
+ // cannot store more values.
+ DCHECK_GE(literal_indicator_byte_idx_, 0);
+ FlushLiteralRun(true);
+ } else {
+ FlushLiteralRun(done);
+ }
+ // The values were written as literals, so no repeated run is in progress.
+ repeat_count_ = 0;
+}
+
+// Writes 'num_bytes' bytes, each holding 'val', through the bit writer. A
+// non-positive 'num_bytes' writes nothing.
+template<typename T>
+inline void RleEncoder<T>::Reserve(int num_bytes, uint8_t val) {
+  for (int remaining = num_bytes; remaining > 0; --remaining) {
+    bit_writer_.PutValue(val, 8);
+  }
+}
+
+// Writes out everything that is still pending, either as one repeated run or
+// as the tail of a literal run, flushes the bit writer and returns the total
+// number of bytes written.
+template<typename T>
+inline int RleEncoder<T>::Flush() {
+  const bool has_pending =
+      literal_count_ > 0 || repeat_count_ > 0 || num_buffered_values_ > 0;
+  if (has_pending) {
+    // The pending data forms a single repeated run when no literals are open
+    // and the buffer (if non-empty) consists only of the repeated value.
+    const bool buffer_is_one_run =
+        num_buffered_values_ == 0 || repeat_count_ == num_buffered_values_;
+    const bool flush_as_repeat =
+        repeat_count_ > 0 && literal_count_ == 0 && buffer_is_one_run;
+    if (flush_as_repeat) {
+      FlushRepeatedRun();
+    } else {
+      literal_count_ += num_buffered_values_;
+      FlushLiteralRun(true);
+      repeat_count_ = 0;
+    }
+  }
+  bit_writer_.Flush();
+  DCHECK_EQ(num_buffered_values_, 0);
+  DCHECK_EQ(literal_count_, 0);
+  DCHECK_EQ(repeat_count_, 0);
+  return bit_writer_.bytes_written();
+}
+
+// Puts the encoder back into the state it has right after construction and
+// clears the bit writer.
+template<typename T>
+inline void RleEncoder<T>::Clear() {
+  // Run tracking.
+  current_value_ = 0;
+  repeat_count_ = 0;
+  literal_count_ = 0;
+  // Nothing buffered, and no indicator byte reserved (negative index).
+  num_buffered_values_ = 0;
+  literal_indicator_byte_idx_ = -1;
+  bit_writer_.Clear();
+}
+
+} // namespace kudu
+#endif
http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/rle-test.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/rle-test.cc b/be/src/kudu/util/rle-test.cc
new file mode 100644
index 0000000..20d0a27
--- /dev/null
+++ b/be/src/kudu/util/rle-test.cc
@@ -0,0 +1,546 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <algorithm>
+#include <cstdint>
+#include <cstdlib>
+#include <cstring>
+#include <ostream>
+#include <string>
+#include <vector>
+
+// Must come before gtest.h.
+#include "kudu/gutil/mathlimits.h"
+
+#include <boost/utility/binary.hpp>
+#include <glog/logging.h>
+#include <gtest/gtest.h>
+
+#include "kudu/util/bit-stream-utils.h"
+#include "kudu/util/bit-stream-utils.inline.h"
+#include "kudu/util/bit-util.h"
+#include "kudu/util/faststring.h"
+#include "kudu/util/hexdump.h"
+#include "kudu/util/rle-encoding.h"
+#include "kudu/util/slice.h"
+#include "kudu/util/test_util.h"
+
+using std::string;
+using std::vector;
+
+namespace kudu {
+
+const int kMaxWidth = 64;
+
+class TestRle : public KuduTest {};
+
+TEST(BitArray, TestBool) {
+  const int len_bytes = 2;
+  faststring buffer(len_bytes);
+
+  BitWriter writer(&buffer);
+
+  // Bit i of the second byte follows the pattern 0,0,1,1,0,0,1,1.
+  const auto second_byte_bit = [](int i) { return ((i >> 1) & 1) != 0; };
+
+  // First byte: alternating 0's and 1's.
+  for (int i = 0; i < 8; ++i) {
+    writer.PutValue(i % 2, 1);
+  }
+  writer.Flush();
+  EXPECT_EQ(buffer[0], BOOST_BINARY(1 0 1 0 1 0 1 0));
+
+  // Second byte: 00110011.
+  for (int i = 0; i < 8; ++i) {
+    writer.PutValue(second_byte_bit(i) ? 1 : 0, 1);
+  }
+  writer.Flush();
+
+  // Validate the exact bit value
+  EXPECT_EQ(buffer[0], BOOST_BINARY(1 0 1 0 1 0 1 0));
+  EXPECT_EQ(buffer[1], BOOST_BINARY(1 1 0 0 1 1 0 0));
+
+  // Read both bytes back bit by bit and validate.
+  BitReader reader(buffer.data(), buffer.size());
+  for (int i = 0; i < 8; ++i) {
+    bool val = false;
+    bool result = reader.GetValue(1, &val);
+    EXPECT_TRUE(result);
+    EXPECT_EQ(val, i % 2);
+  }
+
+  for (int i = 0; i < 8; ++i) {
+    bool val = false;
+    bool result = reader.GetValue(1, &val);
+    EXPECT_TRUE(result);
+    const bool expected = second_byte_bit(i);
+    EXPECT_EQ(val, expected);
+  }
+}
+
+// Writes 'num_vals' values with width 'bit_width' and reads them back.
+// Value i is (i % mod), where mod is 2^bit_width for widths below 64. For a
+// width of 64, mod is 1, so every value written is 0.
+void TestBitArrayValues(int bit_width, int num_vals) {
+ // Number of bytes needed to hold all the values.
+ const int kTestLen = BitUtil::Ceil(bit_width * num_vals, 8);
+ const uint64_t mod = bit_width == 64? 1 : 1LL << bit_width;
+
+ faststring buffer(kTestLen);
+ BitWriter writer(&buffer);
+ for (int i = 0; i < num_vals; ++i) {
+ writer.PutValue(i % mod, bit_width);
+ }
+ writer.Flush();
+ EXPECT_EQ(writer.bytes_written(), kTestLen);
+
+ // Read everything back and check that the reader consumed the whole buffer.
+ BitReader reader(buffer.data(), kTestLen);
+ for (int i = 0; i < num_vals; ++i) {
+ int64_t val = 0;
+ bool result = reader.GetValue(bit_width, &val);
+ EXPECT_TRUE(result);
+ EXPECT_EQ(val, i % mod);
+ }
+ EXPECT_EQ(reader.bytes_left(), 0);
+}
+
+TEST(BitArray, TestValues) {
+  for (int width = 1; width <= kMaxWidth; ++width) {
+    // Don't write too many values: cap the "all values of this width" case.
+    const int capped_count = (width < 12) ? (1 << width) : 4096;
+    const int value_counts[] = {1, 2, capped_count, 1024};
+    for (int num_vals : value_counts) {
+      TestBitArrayValues(width, num_vals);
+    }
+  }
+}
+
+// Test some mixed values: even slots hold a 1-bit flag that flips every time,
+// odd slots hold the slot index in 10 bits.
+TEST(BitArray, TestMixed) {
+  const int kTestLenBits = 1024;
+  faststring buffer(kTestLenBits / 8);
+
+  BitWriter writer(&buffer);
+  bool flag = true;
+  for (int slot = 0; slot < kTestLenBits; ++slot) {
+    const bool is_flag_slot = (slot % 2 == 0);
+    if (is_flag_slot) {
+      writer.PutValue(flag, 1);
+      flag = !flag;
+    } else {
+      writer.PutValue(slot, 10);
+    }
+  }
+  writer.Flush();
+
+  // Read the same sequence back.
+  BitReader reader(buffer.data(), buffer.size());
+  flag = true;
+  for (int slot = 0; slot < kTestLenBits; ++slot) {
+    const bool is_flag_slot = (slot % 2 == 0);
+    bool result;
+    if (is_flag_slot) {
+      bool val = false;
+      result = reader.GetValue(1, &val);
+      EXPECT_EQ(val, flag);
+      flag = !flag;
+    } else {
+      int val;
+      result = reader.GetValue(10, &val);
+      EXPECT_EQ(val, slot);
+    }
+    EXPECT_TRUE(result);
+  }
+}
+
+// Validates encoding of values by encoding and decoding them. If
+// expected_encoding != NULL, also validates that the encoded buffer is
+// exactly 'expected_encoding'.
+// if expected_len is not -1, it will validate the encoded size is correct.
+// Note that 'expected_len' is also the number of bytes compared against
+// 'expected_encoding', so it must be a real length (not -1) whenever
+// 'expected_encoding' is non-null.
+template<typename T>
+void ValidateRle(const vector<T>& values, int bit_width,
+ uint8_t* expected_encoding, int expected_len) {
+ faststring buffer;
+ RleEncoder<T> encoder(&buffer, bit_width);
+
+ // Encode every value individually (run length 1).
+ for (const auto& value : values) {
+ encoder.Put(value);
+ }
+ int encoded_len = encoder.Flush();
+
+ if (expected_len != -1) {
+ EXPECT_EQ(encoded_len, expected_len);
+ }
+ if (expected_encoding != nullptr) {
+ EXPECT_EQ(memcmp(buffer.data(), expected_encoding, expected_len), 0)
+ << "\n"
+ << "Expected: " << HexDump(Slice(expected_encoding, expected_len)) << "\n"
+ << "Got: " << HexDump(Slice(buffer));
+ }
+
+ // Verify read: decoding must yield the original values in order.
+ RleDecoder<T> decoder(buffer.data(), encoded_len, bit_width);
+ for (const auto& value : values) {
+ T val = 0;
+ bool result = decoder.Get(&val);
+ EXPECT_TRUE(result);
+ EXPECT_EQ(value, val);
+ }
+}
+
+TEST(Rle, SpecificSequences) {
+ const int kTestLen = 1024;
+ uint8_t expected_buffer[kTestLen];
+ vector<uint64_t> values;
+
+ // Test 50 0's followed by 50 1's
+ values.resize(100);
+ for (int i = 0; i < 50; ++i) {
+ values[i] = 0;
+ }
+ for (int i = 50; i < 100; ++i) {
+ values[i] = 1;
+ }
+
+ // expected_buffer valid for bit width <= 1 byte
+ // Two repeated runs: <indicator 50 << 1> <value 0> <indicator 50 << 1> <value 1>.
+ expected_buffer[0] = (50 << 1);
+ expected_buffer[1] = 0;
+ expected_buffer[2] = (50 << 1);
+ expected_buffer[3] = 1;
+ for (int width = 1; width <= 8; ++width) {
+ ValidateRle(values, width, expected_buffer, 4);
+ }
+
+ // For wider values only the length is checked: each of the two runs takes one
+ // indicator byte plus the value padded to whole bytes.
+ for (int width = 9; width <= kMaxWidth; ++width) {
+ ValidateRle(values, width, nullptr, 2 * (1 + BitUtil::Ceil(width, 8)));
+ }
+
+ // Test 100 0's and 1's alternating
+ for (int i = 0; i < 100; ++i) {
+ values[i] = i % 2;
+ }
+ int num_groups = BitUtil::Ceil(100, 8);
+ expected_buffer[0] = (num_groups << 1) | 1;
+ for (int i = 0; i < 100/8; ++i) {
+ expected_buffer[i + 1] = BOOST_BINARY(1 0 1 0 1 0 1 0); // 0xaa
+ }
+ // Values for the last 4 0 and 1's
+ expected_buffer[1 + 100/8] = BOOST_BINARY(0 0 0 0 1 0 1 0); // 0x0a
+
+ // num_groups and expected_buffer only valid for bit width = 1
+ ValidateRle(values, 1, expected_buffer, 1 + num_groups);
+ for (int width = 2; width <= kMaxWidth; ++width) {
+ ValidateRle(values, width, nullptr, 1 + BitUtil::Ceil(width * 100, 8));
+ }
+}
+
+// ValidateRle on 'num_vals' values with width 'bit_width'. If 'value' != -1, that value
+// is used for every element; otherwise element i is i modulo 2^bit_width (or
+// simply i when bit_width is 64).
+void TestRleValues(int bit_width, int num_vals, int value = -1) {
+  const uint64_t mod = bit_width == 64 ? 1ULL : 1ULL << bit_width;
+  vector<uint64_t> values;
+  // Iterate with a signed index: comparing a uint64_t counter against the int
+  // 'num_vals' would turn a negative count into a huge unsigned bound.
+  for (int i = 0; i < num_vals; ++i) {
+    const uint64_t v = static_cast<uint64_t>(i);
+    values.push_back((value != -1) ? value : (bit_width == 64 ? v : (v % mod)));
+  }
+  ValidateRle(values, bit_width, nullptr, -1);
+}
+
+TEST(Rle, TestValues) {
+  for (int width = 1; width <= kMaxWidth; ++width) {
+    // Default value pattern: a single value, then a longer sequence.
+    TestRleValues(width, 1);
+    TestRleValues(width, 1024);
+    // Constant sequences of all 0s and then all 1s.
+    for (int constant = 0; constant <= 1; ++constant) {
+      TestRleValues(width, 1024, constant);
+    }
+  }
+}
+
+class BitRle : public KuduTest {
+};
+
+// Encodes 1024 identical booleans (first all false, then all true) and checks
+// that the result is 3 bytes long.
+TEST_F(BitRle, AllSame) {
+  const int kTestLen = 1024;
+
+  for (int v = 0; v < 2; ++v) {
+    const vector<bool> values(kTestLen, v != 0);
+    ValidateRle(values, 1, nullptr, 3);
+  }
+}
+
+// Test that writes out a repeated group and then a literal
+// group but flush before finishing. The sequence is validated again after each
+// value appended behind the initial run of sixteen 1s.
+TEST_F(BitRle, Flush) {
+  vector<bool> values(16, true);
+  const bool kTail[] = {false, true, true, true};
+  for (bool next : kTail) {
+    values.push_back(next);
+    ValidateRle(values, 1, nullptr, -1);
+  }
+}
+
+// Test some random bool sequences: 1000 runs of alternating 0/1 values with
+// random lengths, encoded with a bit width that varies per iteration.
+TEST_F(BitRle, RandomBools) {
+  const int n_iters = AllowSlowTests() ? 1000 : 20;
+  for (int seed = 0; seed < n_iters; ++seed) {
+    srand(seed);
+    // One-based iteration number; used for logging and to pick the bit width.
+    const int iters = seed + 1;
+    if (iters % 10000 == 0) LOG(ERROR) << "Seed: " << iters;
+    vector<uint64_t> values;
+    bool parity = 0;
+    for (int run = 0; run < 1000; ++run) {
+      int group_size = rand() % 20 + 1;  // NOLINT(*)
+      if (group_size > 16) {
+        group_size = 1;
+      }
+      for (int k = 0; k < group_size; ++k) {
+        values.push_back(parity);
+      }
+      parity = !parity;
+    }
+    ValidateRle(values, (iters % kMaxWidth) + 1, nullptr, -1);
+  }
+}
+
+// Test some random 64-bit sequences: 1000 runs of random length, each
+// repeating one random 64-bit value.
+TEST_F(BitRle, Random64Bit) {
+  const int n_iters = AllowSlowTests() ? 1000 : 20;
+  for (int seed = 0; seed < n_iters; ++seed) {
+    srand(seed);
+    // One-based iteration number, only used for logging.
+    const int iters = seed + 1;
+    if (iters % 10000 == 0) LOG(ERROR) << "Seed: " << iters;
+    vector<uint64_t> values;
+    for (int run = 0; run < 1000; ++run) {
+      int group_size = rand() % 20 + 1;  // NOLINT(*)
+      uint64_t cur_value = (static_cast<uint64_t>(rand()) << 32) + static_cast<uint64_t>(rand());
+      if (group_size > 16) {
+        group_size = 1;
+      }
+      for (int k = 0; k < group_size; ++k) {
+        values.push_back(cur_value);
+      }
+    }
+    ValidateRle(values, 64, nullptr, -1);
+  }
+}
+
+// Test a sequence of 1 1's, 2 0's, 3 1's, etc. up to a run of 32, followed
+// by the same runs in decreasing length.
+TEST_F(BitRle, RepeatedPattern) {
+  const int min_run = 1;
+  const int max_run = 32;
+  vector<bool> values;
+
+  // A run of length 'len' consists of the value (len % 2).
+  const auto append_run = [&values](int len) {
+    const int v = len % 2;
+    for (int j = 0; j < len; ++j) {
+      values.push_back(v);
+    }
+  };
+
+  for (int len = min_run; len <= max_run; ++len) {
+    append_run(len);
+  }
+
+  // And go back down again
+  for (int len = max_run; len >= min_run; --len) {
+    append_run(len);
+  }
+
+  ValidateRle(values, 1, nullptr, -1);
+}
+
+TEST_F(TestRle, TestBulkPut) {
+  faststring buffer(1);
+  RleEncoder<bool> encoder(&buffer, 1);
+  encoder.Put(true, 10);
+  encoder.Put(false, 7);
+  // Two consecutive puts of 'true' are expected to decode as one run of 20.
+  encoder.Put(true, 5);
+  encoder.Put(true, 15);
+  encoder.Flush();
+
+  struct ExpectedRun {
+    bool value;
+    int length;
+  };
+  const ExpectedRun kExpectedRuns[] = {{true, 10}, {false, 7}, {true, 20}};
+
+  RleDecoder<bool> decoder(buffer.data(), encoder.len(), 1);
+  bool val = false;
+  for (const ExpectedRun& expected : kExpectedRuns) {
+    const size_t run_length = decoder.GetNextRun(&val, MathLimits<size_t>::kMax);
+    ASSERT_EQ(expected.value, val);
+    ASSERT_EQ(expected.length, run_length);
+  }
+
+  // Nothing is left after the three runs.
+  ASSERT_EQ(0, decoder.GetNextRun(&val, MathLimits<size_t>::kMax));
+}
+
+TEST_F(TestRle, TestGetNextRun) {
+  // Repeat the test with different number of items
+  for (int num_items = 7; num_items < 200; num_items += 13) {
+    // Test different block patterns
+    // 1: 01010101 01010101
+    // 2: 00110011 00110011
+    // 3: 00011100 01110001
+    // ...
+    for (int block = 1; block <= 20; ++block) {
+      faststring buffer(1);
+      RleEncoder<bool> encoder(&buffer, 1);
+      // Item j is the value (j is odd), repeated 'block' times.
+      for (int j = 0; j < num_items; ++j) {
+        encoder.Put((j & 1) != 0, block);
+      }
+      encoder.Flush();
+
+      RleDecoder<bool> decoder(buffer.data(), encoder.len(), 1);
+      size_t remaining = num_items * block;
+      for (int j = 0; j < num_items; ++j) {
+        bool val = false;
+        DCHECK_GT(remaining, 0);
+        // Cap the returned length at the number of values actually encoded.
+        const size_t decoded = decoder.GetNextRun(&val, MathLimits<size_t>::kMax);
+        const size_t run_length = std::min(decoded, remaining);
+
+        ASSERT_EQ((j & 1) != 0, val);
+        ASSERT_EQ(block, run_length);
+        remaining -= run_length;
+      }
+      DCHECK_EQ(remaining, 0);
+    }
+  }
+}
+
+// Generate a random bit string which consists of 'num_runs' runs,
+// each with a random length between 0 and 99 (random() % 100), so a run may
+// be empty. Run i holds the value 1 when i is odd and 0 when i is even.
+// The encoded data is written to 'enc_buf' and a '0'/'1' character per value
+// is appended to 'string_rep'. Returns the number
+// of values encoded (i.e the sum run length).
+static size_t GenerateRandomBitString(int num_runs, faststring* enc_buf, string* string_rep) {
+ RleEncoder<bool> enc(enc_buf, 1);
+ int num_bits = 0;
+ for (int i = 0; i < num_runs; i++) {
+ int run_length = random() % 100;
+ bool value = static_cast<bool>(i & 1);
+ enc.Put(value, run_length);
+ string_rep->append(run_length, value ? '1' : '0');
+ num_bits += run_length;
+ }
+ enc.Flush();
+ return num_bits;
+}
+
+TEST_F(TestRle, TestRoundTripRandomSequencesWithRuns) {
+  SeedRandom();
+
+  // Test the limiting function of GetNextRun.
+  const int kMaxToReadAtOnce = (random() % 20) + 1;
+
+  // Generate a bunch of random bit sequences, and "round-trip" them
+  // through the encode/decode sequence.
+  for (int rep = 0; rep < 100; rep++) {
+    faststring buf;
+    string string_rep;
+    int num_bits = GenerateRandomBitString(10, &buf, &string_rep);
+    RleDecoder<bool> decoder(buf.data(), buf.size(), 1);
+
+    // Rebuild the textual representation from the decoded runs.
+    string roundtrip_str;
+    int rem_to_read = num_bits;
+    bool val;
+    while (rem_to_read > 0) {
+      const size_t run_len =
+          decoder.GetNextRun(&val, std::min(kMaxToReadAtOnce, rem_to_read));
+      if (run_len == 0) {
+        break;
+      }
+      ASSERT_LE(run_len, kMaxToReadAtOnce);
+      roundtrip_str.append(run_len, val ? '1' : '0');
+      rem_to_read -= run_len;
+    }
+
+    ASSERT_EQ(string_rep, roundtrip_str);
+  }
+}
+// Encodes three differently shaped sections and checks that Skip() returns the
+// number of set bits it skipped and leaves the decoder at the expected position
+// (verified with GetNextRun() at the bracketed positions "A" to "D" below).
+TEST_F(TestRle, TestSkip) {
+ faststring buffer(1);
+ RleEncoder<bool> encoder(&buffer, 1);
+
+ // 0101010[1] 01010101 01
+ // "A"
+ for (int j = 0; j < 18; ++j) {
+ encoder.Put(!!(j & 1));
+ }
+
+ // 0011[00] 11001100 11001100 11001100 11001100
+ // "B"
+ for (int j = 0; j < 19; ++j) {
+ encoder.Put(!!(j & 1), 2);
+ }
+
+ // 000000000000 11[1111111111] 000000000000 111111111111
+ // "C"
+ // 000000000000 111111111111 0[00000000000] 111111111111
+ // "D"
+ // 000000000000 111111111111 000000000000 111111111111
+ for (int j = 0; j < 12; ++j) {
+ encoder.Put(!!(j & 1), 12);
+ }
+ encoder.Flush();
+
+ bool val = false;
+ size_t run_length;
+ RleDecoder<bool> decoder(buffer.data(), encoder.len(), 1);
+
+ // position before "A"
+ ASSERT_EQ(3, decoder.Skip(7));
+ run_length = decoder.GetNextRun(&val, MathLimits<size_t>::kMax);
+ ASSERT_TRUE(val);
+ ASSERT_EQ(1, run_length);
+
+ // position before "B"
+ ASSERT_EQ(7, decoder.Skip(14));
+ run_length = decoder.GetNextRun(&val, MathLimits<size_t>::kMax);
+ ASSERT_FALSE(val);
+ ASSERT_EQ(2, run_length);
+
+ // position before "C"
+ ASSERT_EQ(18, decoder.Skip(46));
+ run_length = decoder.GetNextRun(&val, MathLimits<size_t>::kMax);
+ ASSERT_TRUE(val);
+ ASSERT_EQ(10, run_length);
+
+ // position before "D"
+ ASSERT_EQ(24, decoder.Skip(49));
+ run_length = decoder.GetNextRun(&val, MathLimits<size_t>::kMax);
+ ASSERT_FALSE(val);
+ ASSERT_EQ(11, run_length);
+
+ // NOTE(review): the encoder was already flushed before decoding; this second
+ // Flush() looks redundant -- confirm it is intentional.
+ encoder.Flush();
+}
+} // namespace kudu
http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/rolling_log-test.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/rolling_log-test.cc b/be/src/kudu/util/rolling_log-test.cc
new file mode 100644
index 0000000..f4f8186
--- /dev/null
+++ b/be/src/kudu/util/rolling_log-test.cc
@@ -0,0 +1,147 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "kudu/util/rolling_log.h"
+
+#include <unistd.h>
+
+#include <algorithm>
+#include <cstdint>
+#include <string>
+#include <vector>
+
+#include <glog/stl_logging.h>
+#include <gtest/gtest.h>
+
+#include "kudu/gutil/port.h"
+#include "kudu/gutil/strings/stringpiece.h"
+#include "kudu/gutil/strings/substitute.h"
+#include "kudu/gutil/strings/util.h"
+#include "kudu/util/env.h"
+#include "kudu/util/faststring.h"
+#include "kudu/util/path_util.h"
+#include "kudu/util/test_macros.h"
+#include "kudu/util/test_util.h"
+
+using std::string;
+using std::vector;
+using strings::Substitute;
+
+namespace kudu {
+
+// Fixture for the RollingLog tests: owns a per-test log directory and a helper
+// which validates the files found in it.
+class RollingLogTest : public KuduTest {
+ public:
+  // All log segments of a test are written below "<test path>/log_dir".
+  RollingLogTest() : log_dir_(GetTestPath("log_dir")) {}
+
+  // Creates the log directory before the test body runs.
+  virtual void SetUp() OVERRIDE {
+    ASSERT_OK(env_->CreateDir(log_dir_));
+  }
+
+ protected:
+  // Lists log_dir_ and stores the sorted entry names (without "." and "..")
+  // in '*children'. Each entry must look like a segment written by this test
+  // binary: "rolling_log-test." prefix, ".mylog." infix and this process' pid
+  // as suffix, optionally followed by ".gz". Finally asserts that exactly
+  // 'expected_count' entries were found.
+  void AssertLogCount(int expected_count, vector<string>* children) {
+    vector<string> entries;
+    ASSERT_OK(env_->GetChildren(log_dir_, &entries));
+    children->clear();
+
+    // The pid is the same for every entry, so format it only once.
+    string pid_suffix = Substitute("$0", getpid());
+    for (const string& child : entries) {
+      if (child == "." || child == "..") continue;
+
+      children->push_back(child);
+      ASSERT_TRUE(HasPrefixString(child, "rolling_log-test."));
+      ASSERT_STR_CONTAINS(child, ".mylog.");
+      ASSERT_TRUE(HasSuffixString(child, pid_suffix) ||
+                  HasSuffixString(child, pid_suffix + ".gz")) << "bad child: " << child;
+    }
+
+    std::sort(children->begin(), children->end());
+    ASSERT_EQ(children->size(), expected_count) << *children;
+  }
+
+  // Directory into which the tests write their log segments.
+  const string log_dir_;
+};
+
+// Test with compression off: checks lazy file creation, rolling once the
+// threshold is crossed, and the contents of the first segment.
+TEST_F(RollingLogTest, TestLog) {
+  RollingLog log(env_, log_dir_, "mylog");
+  log.SetRollThresholdBytes(100);
+  log.SetCompressionEnabled(false);
+
+  // Nothing has been appended yet, so no segment may exist.
+  vector<string> children;
+  NO_FATALS(AssertLogCount(0, &children));
+
+  // The first append creates the first segment.
+  const string kTestString = "Hello world\n";
+  ASSERT_OK(log.Append(kTestString));
+  NO_FATALS(AssertLogCount(1, &children));
+
+  // Ten more appends push the log over the 100 byte threshold exactly once.
+  int remaining = 10;
+  while (remaining-- > 0) {
+    ASSERT_OK(log.Append(kTestString));
+  }
+  NO_FATALS(AssertLogCount(2, &children));
+
+  // The oldest segment starts with the test string and overshoots the
+  // threshold by at most one message.
+  faststring data;
+  string path = JoinPathSegments(log_dir_, children[0]);
+  ASSERT_OK(ReadFileToString(env_, path, &data));
+  ASSERT_TRUE(HasPrefixString(data.ToString(), kTestString))
+      << "Data missing";
+  ASSERT_LE(data.size(), 100 + kTestString.length())
+      << "Roll threshold not respected";
+}
+
+// Test with compression on: a closed segment must be replaced by a much
+// smaller ".gz" file.
+TEST_F(RollingLogTest, TestCompression) {
+  RollingLog log(env_, log_dir_, "mylog");
+  ASSERT_OK(log.Open());
+
+  // Append the same short line 1000 times, tracking the uncompressed size.
+  StringPiece data = "Hello world\n";
+  int raw_size = 0;
+  int appended = 0;
+  while (appended < 1000) {
+    ASSERT_OK(log.Append(data));
+    raw_size += data.size();
+    ++appended;
+  }
+  ASSERT_OK(log.Close());
+
+  // Only the compressed segment should be left behind.
+  vector<string> children;
+  NO_FATALS(AssertLogCount(1, &children));
+  ASSERT_TRUE(HasSuffixString(children[0], ".gz"));
+
+  // Ensure that the output is actually gzipped: highly repetitive input must
+  // shrink to less than a tenth of its raw size without becoming empty.
+  uint64_t size;
+  ASSERT_OK(env_->GetFileSize(JoinPathSegments(log_dir_, children[0]), &size));
+  ASSERT_LT(size, raw_size / 10);
+  ASSERT_GT(size, 0);
+}
+
+// Verifies that only the configured number of segments is left on disk after
+// the log has rolled many times.
+TEST_F(RollingLogTest, TestFileCountLimit) {
+  RollingLog log(env_, log_dir_, "mylog");
+  ASSERT_OK(log.Open());
+  log.SetMaxNumSegments(3);
+  log.SetRollThresholdBytes(100);
+
+  // 100 appends of 12 bytes each cross the 100 byte threshold repeatedly.
+  int remaining = 100;
+  while (remaining-- > 0) {
+    ASSERT_OK(log.Append("hello world\n"));
+  }
+  ASSERT_OK(log.Close());
+
+  vector<string> children;
+  NO_FATALS(AssertLogCount(3, &children));
+}
+
+} // namespace kudu
http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/rolling_log.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/rolling_log.cc b/be/src/kudu/util/rolling_log.cc
new file mode 100644
index 0000000..50f9fbd
--- /dev/null
+++ b/be/src/kudu/util/rolling_log.cc
@@ -0,0 +1,285 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "kudu/util/rolling_log.h"
+
+#include <unistd.h>
+
+#include <ctime>
+#include <iomanip>
+#include <memory>
+#include <ostream>
+#include <string>
+#include <utility>
+
+#include <gflags/gflags.h>
+#include <gflags/gflags_declare.h>
+#include <glog/logging.h>
+#include <zlib.h>
+
+#include "kudu/gutil/strings/numbers.h"
+#include "kudu/gutil/strings/substitute.h"
+#include "kudu/gutil/walltime.h"
+#include "kudu/util/env.h"
+#include "kudu/util/env_util.h"
+#include "kudu/util/net/net_util.h"
+#include "kudu/util/path_util.h"
+#include "kudu/util/slice.h"
+#include "kudu/util/user.h"
+
+using std::ostringstream;
+using std::setw;
+using std::string;
+using std::unique_ptr;
+using strings::Substitute;
+
+static const int kDefaultRollThresholdBytes = 64 * 1024 * 1024; // 64MB
+
+DECLARE_int32(max_log_files);
+
+namespace kudu {
+
+// Constructs a closed log: no file is created until Open() or Append() runs.
+// Defaults: roll threshold of kDefaultRollThresholdBytes (64MB), segment limit
+// taken from FLAGS_max_log_files, compression of closed segments enabled.
+RollingLog::RollingLog(Env* env, string log_dir, string log_name)
+ : env_(env),
+ log_dir_(std::move(log_dir)),
+ log_name_(std::move(log_name)),
+ roll_threshold_bytes_(kDefaultRollThresholdBytes),
+ max_num_segments_(FLAGS_max_log_files),
+ compress_after_close_(true) {}
+
+// Closes the current segment, if any. A failure cannot be reported from a
+// destructor, so it is only logged as a warning.
+RollingLog::~RollingLog() {
+ WARN_NOT_OK(Close(), "Unable to close RollingLog");
+}
+
+// Sets the segment size above which Append() rolls to a new file.
+// 'size' must be positive; anything else is a fatal CHECK failure.
+void RollingLog::SetRollThresholdBytes(int64_t size) {
+ CHECK_GT(size, 0);
+ roll_threshold_bytes_ = size;
+}
+
+// Sets the segment count passed to DeleteExcessFilesByPattern() by Close().
+// 'num_segments' must be positive; anything else is a fatal CHECK failure.
+void RollingLog::SetMaxNumSegments(int num_segments) {
+ CHECK_GT(num_segments, 0);
+ max_num_segments_ = num_segments;
+}
+
+// Chooses whether Close() gzips the segment it has just closed.
+void RollingLog::SetCompressionEnabled(bool compress) {
+ compress_after_close_ = compress;
+}
+
+namespace {
+
+// Returns the name reported by GetHostname(), or the placeholder
+// "unknown_host" when the lookup fails.
+string HostnameOrUnknown() {
+  string hostname;
+  const bool resolved = GetHostname(&hostname).ok();
+  return resolved ? hostname : string("unknown_host");
+}
+
+// Returns the name reported by GetLoggedInUser(), or the placeholder
+// "unknown_user" when the lookup fails.
+string UsernameOrUnknown() {
+  string user_name;
+  const bool resolved = GetLoggedInUser(&user_name).ok();
+  return resolved ? user_name : string("unknown_user");
+}
+
+// Formats the current local wall-clock time as "YYYYmmdd-HHMMSS".
+// Implementation cribbed from glog/logging.cc.
+string FormattedTimestamp() {
+  const time_t now = static_cast<time_t>(WallTime_Now());
+  struct ::tm parts;
+  localtime_r(&now, &parts);
+
+  ostringstream out;
+  // setw() only applies to the next insertion, so it is repeated for every
+  // two-digit field; the fill character makes those fields zero-padded.
+  out.fill('0');
+  out << 1900 + parts.tm_year;
+  out << setw(2) << 1 + parts.tm_mon;
+  out << setw(2) << parts.tm_mday;
+  out << '-';
+  out << setw(2) << parts.tm_hour;
+  out << setw(2) << parts.tm_min;
+  out << setw(2) << parts.tm_sec;
+  return out.str();
+}
+
+} // anonymous namespace
+
+// Builds the file name (without directory) of the segment with the given
+// sequence number:
+//   <program>.<hostname>.<user>.<log-name>.<timestamp>.<sequence>.<pid>
+// The timestamp is taken at call time.
+string RollingLog::GetLogFileName(int sequence) const {
+  const string program = google::ProgramInvocationShortName();
+  const string host = HostnameOrUnknown();
+  const string user = UsernameOrUnknown();
+  const string timestamp = FormattedTimestamp();
+  return Substitute("$0.$1.$2.$3.$4.$5.$6",
+                    program, host, user, log_name_, timestamp, sequence, getpid());
+}
+
+// Builds a glob (without directory) with the same layout as GetLogFileName(),
+// in which the timestamp, sequence number and pid fields are wildcards.
+string RollingLog::GetLogFilePattern() const {
+  const char kAny = '*';
+  const string program = google::ProgramInvocationShortName();
+  const string host = HostnameOrUnknown();
+  const string user = UsernameOrUnknown();
+  return Substitute("$0.$1.$2.$3.$4.$5.$6",
+                    program, host, user, log_name_,
+                    /* any timestamp */ kAny,
+                    /* any sequence number */ kAny,
+                    /* any pid */ kAny);
+}
+
+// Creates a new, empty segment in log_dir_ and makes it the current file.
+// It is a fatal error to call this while a segment is already open.
+Status RollingLog::Open() {
+  CHECK(!file_);
+
+  // Probe increasing sequence numbers until a name is found for which neither
+  // a plain log nor a compressed log exists, so nothing is ever reused.
+  int sequence = 0;
+  string path = JoinPathSegments(log_dir_, GetLogFileName(sequence));
+  while (env_->FileExists(path) ||
+         env_->FileExists(path + ".gz")) {
+    sequence++;
+    path = JoinPathSegments(log_dir_, GetLogFileName(sequence));
+  }
+
+  WritableFileOptions opts;
+  // Logs aren't worth the performance cost of durability.
+  opts.sync_on_close = false;
+  opts.mode = Env::CREATE_NON_EXISTING;
+  RETURN_NOT_OK(env_->NewWritableFile(opts, path, &file_));
+
+  VLOG(1) << "Rolled " << log_name_ << " log to new file: " << path;
+  return Status::OK();
+}
+
+// Closes the current segment, if one is open. Afterwards the closed segment is
+// gzipped (when compression is enabled) and excess old segments are deleted;
+// both of these steps are best-effort and only log a warning on failure.
+Status RollingLog::Close() {
+  if (!file_) {
+    // Nothing is open, so there is nothing to close.
+    return Status::OK();
+  }
+
+  // Remember the name first: it is still needed once file_ has been released.
+  const string closed_path = file_->filename();
+  RETURN_NOT_OK_PREPEND(file_->Close(),
+                        Substitute("Unable to close $0", closed_path));
+  file_.reset();
+
+  if (compress_after_close_) {
+    WARN_NOT_OK(CompressFile(closed_path), "Unable to compress old log file");
+  }
+
+  const string pattern = JoinPathSegments(log_dir_, GetLogFilePattern());
+  WARN_NOT_OK(env_util::DeleteExcessFilesByPattern(env_, pattern, max_num_segments_),
+              Substitute("failed to delete old $0 log files", log_name_));
+  return Status::OK();
+}
+
+// Appends 's' to the current segment, opening one first if necessary. When the
+// segment has grown beyond the roll threshold after the write, it is closed
+// and a fresh segment is opened.
+Status RollingLog::Append(StringPiece s) {
+  if (!file_) {
+    RETURN_NOT_OK_PREPEND(Open(), "Unable to open log");
+  }
+  RETURN_NOT_OK(file_->Append(s));
+
+  // The threshold is checked after the write, so a segment may overshoot it by
+  // up to one message.
+  const bool needs_roll = file_->Size() > roll_threshold_bytes_;
+  if (!needs_roll) {
+    return Status::OK();
+  }
+
+  RETURN_NOT_OK_PREPEND(Close(), "Unable to roll log");
+  roll_count_++;
+  RETURN_NOT_OK_PREPEND(Open(), "Unable to roll log");
+  return Status::OK();
+}
+
+namespace {
+
+// Closes the gzip stream 'f' and translates the zlib return code into a
+// Status.
+Status GzClose(gzFile f) {
+  const int rc = gzclose(f);
+  if (rc == Z_OK) {
+    return Status::OK();
+  }
+  if (rc == Z_STREAM_ERROR) {
+    return Status::InvalidArgument("Stream not valid");
+  }
+  if (rc == Z_ERRNO) {
+    return Status::IOError("IO Error closing stream");
+  }
+  if (rc == Z_MEM_ERROR) {
+    return Status::RuntimeError("Out of memory");
+  }
+  if (rc == Z_BUF_ERROR) {
+    return Status::IOError("read ended in the middle of a stream");
+  }
+  // Any other code is not one this function knows how to describe.
+  return Status::IOError("Unknown zlib error", SimpleItoa(rc));
+}
+
+// Scope guard which closes a gzip stream on destruction unless Cancel() was
+// called first. A failure to close is only logged as a warning.
+//
+// The guard is the sole owner of the handle while armed, so it is not
+// copyable: a copy would close the same gzFile a second time.
+class ScopedGzipCloser {
+ public:
+  // Arms the guard for 'f'. A null 'f' yields a guard which does nothing.
+  explicit ScopedGzipCloser(gzFile f)
+    : file_(f) {
+  }
+
+  ScopedGzipCloser(const ScopedGzipCloser&) = delete;
+  ScopedGzipCloser& operator=(const ScopedGzipCloser&) = delete;
+
+  ~ScopedGzipCloser() {
+    if (file_) {
+      WARN_NOT_OK(GzClose(file_), "Unable to close gzip stream");
+    }
+  }
+
+  // Disarms the guard; the caller becomes responsible for closing the stream.
+  void Cancel() {
+    file_ = nullptr;
+  }
+
+ private:
+  // Stream to close on destruction, or null once cancelled.
+  gzFile file_;
+};
+} // anonymous namespace
+
+// Gzips 'path' into '<path>.gz' and then deletes the uncompressed input.
+//
+// We implement CompressFile() manually using zlib APIs rather than forking
+// out to '/bin/gzip' since fork() can be expensive on processes that use a large
+// amount of memory. During the time of the fork, other threads could end up
+// blocked. Implementing it using the zlib stream APIs isn't too much code
+// and is less likely to be problematic.
+Status RollingLog::CompressFile(const std::string& path) const {
+  unique_ptr<SequentialFile> in_file;
+  RETURN_NOT_OK_PREPEND(env_->NewSequentialFile(path, &in_file),
+                        "Unable to open input file to compress");
+
+  const string gz_path = path + ".gz";
+  gzFile gzf = gzopen(gz_path.c_str(), "w");
+  if (!gzf) {
+    return Status::IOError("Unable to open gzip stream");
+  }
+
+  // Closes the stream on every early return below.
+  ScopedGzipCloser closer(gzf);
+
+  // Copy the input into the gzip stream one buffer at a time; a read which
+  // yields no data ends the loop.
+  uint8_t buf[32 * 1024];
+  for (;;) {
+    Slice chunk(buf, arraysize(buf));
+    RETURN_NOT_OK_PREPEND(in_file->Read(&chunk),
+                          "Unable to read from gzip input");
+    if (chunk.size() == 0) {
+      break;
+    }
+
+    const int written = gzwrite(gzf, chunk.data(), chunk.size());
+    if (written == 0) {
+      int errnum;
+      return Status::IOError("Unable to write to gzip output",
+                             gzerror(gzf, &errnum));
+    }
+  }
+
+  // Close explicitly from here on so that a failure is reported to the caller
+  // instead of merely being logged by the guard.
+  closer.Cancel();
+  RETURN_NOT_OK_PREPEND(GzClose(gzf),
+                        "Unable to close gzip output");
+
+  // A leftover input file is harmless, so failing to delete it is not fatal.
+  WARN_NOT_OK(env_->DeleteFile(path),
+              "Unable to delete gzip input file after compression");
+  return Status::OK();
+}
+
+} // namespace kudu
http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/rolling_log.h
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/rolling_log.h b/be/src/kudu/util/rolling_log.h
new file mode 100644
index 0000000..0bb6755
--- /dev/null
+++ b/be/src/kudu/util/rolling_log.h
@@ -0,0 +1,128 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#ifndef KUDU_UTIL_ROLLING_LOG_H
+#define KUDU_UTIL_ROLLING_LOG_H
+
+#include <cstdint>
+#include <memory>
+#include <string>
+
+#include "kudu/gutil/macros.h"
+#include "kudu/gutil/port.h"
+#include "kudu/gutil/strings/stringpiece.h"
+#include "kudu/util/status.h"
+
+namespace kudu {
+
+class Env;
+class WritableFile;
+
+// A simple rolling log.
+//
+// This creates a log which spans multiple files in a specified directory.
+// After a log file reaches a specified size threshold, it automatically rolls
+// to the next file in the sequence.
+//
+// The files are named similarly to glog log files and use the following pattern:
+//
+// <log_dir>/<program-name>.<hostname>.<user-name>.<log-name>.<timestamp>.<sequence>.<pid>
+// log_dir: the log_dir specified in the constructor
+// program-name: argv[0], as determined by google::ProgramInvocationShortName()
+// hostname: the local machine hostname
+// user-name: the current user name
+// log-name: the log_name specified in the constructor
+// timestamp: the wall clock time when the log file was created, in
+// YYYYmmdd-HHMMSS fixed-length format.
+// sequence: a sequence number which is used to disambiguate when the log file is
+// rolled multiple times within a second
+// pid: the pid of the daemon
+//
+// The log implementation does not ensure durability of the log or its files in any way.
+// This class is not thread-safe and must be externally synchronized.
+class RollingLog {
+ public:
+ // Creates a log which writes files named after 'log_name' into 'log_dir',
+ // using 'env' for file system access. No file is created here. The roll
+ // threshold starts at 64MB, the segment limit is taken from the
+ // --max_log_files flag and compression starts out enabled.
+ RollingLog(Env* env, std::string log_dir, std::string log_name);
+
+ // Closes the log; a failure to do so is logged as a warning.
+ ~RollingLog();
+
+ // Open the log.
+ // It is optional to call this function. Append() will automatically open
+ // the log as necessary if it is not open.
+ // It is a fatal error to call this while the log is already open.
+ Status Open();
+
+ // Set the pre-compression size threshold at which the log file will be rolled.
+ // If the log is already open, this applies for the current and any future
+ // log file. 'size' must be greater than zero.
+ //
+ // NOTE: This is the limit on a single segment of the log, not a limit on the total
+ // size of the log.
+ //
+ // NOTE: The threshold is checked _after_ each call to Append(). So, the size of
+ // the log may overshoot this threshold by as much as the size of a single appended
+ // message.
+ void SetRollThresholdBytes(int64_t size);
+
+ // Set the total number of log segments to be retained. When the log is rolled,
+ // old segments are removed to achieve the targeted number of segments.
+ // 'num_segments' must be greater than zero.
+ void SetMaxNumSegments(int num_segments);
+
+ // If compression is enabled, each log file is gzipped to '<file>.gz' after it
+ // has been closed, and the uncompressed file is then deleted.
+ // NOTE: this requires that the passed-in Env instance is the local file system.
+ void SetCompressionEnabled(bool compress);
+
+ // Append the given data to the current log file.
+ //
+ // If, after appending this data, the file size has crossed the configured roll
+ // threshold, a new empty log file is created. Note that this is a synchronous API and
+ // causes potentially-blocking IO on the current thread. However, this does not fsync()
+ // or otherwise ensure durability of the appended data.
+ Status Append(StringPiece data) WARN_UNUSED_RESULT;
+
+ // Close the log.
+ // Does nothing if the log is not open. Failures while compressing the closed
+ // file or deleting old segments are logged but not returned.
+ Status Close();
+
+ // Return the number of times this log has rolled since it was first opened.
+ int roll_count() const {
+ return roll_count_;
+ }
+
+ private:
+ // Get the file name (without directory) for the segment with the given
+ // sequence number, following the pattern described in the class comment.
+ std::string GetLogFileName(int sequence) const;
+
+ // Get a glob pattern matching all log files written by this instance.
+ std::string GetLogFilePattern() const;
+
+ // Compress the given path, writing a new file '<path>.gz'.
+ Status CompressFile(const std::string& path) const;
+
+ // Used for all file system access except writing the gzip output.
+ Env* const env_;
+ // Directory in which the segments are created.
+ const std::string log_dir_;
+ // The <log-name> component of every segment's file name.
+ const std::string log_name_;
+
+ // Segment size above which Append() rolls to a new file.
+ int64_t roll_threshold_bytes_;
+ // Number of segments to retain when pruning in Close().
+ int max_num_segments_;
+
+ // The currently open segment, or null while the log is closed.
+ std::unique_ptr<WritableFile> file_;
+ // Whether Close() compresses the segment it has just closed.
+ bool compress_after_close_;
+
+ // Number of times Append() has rolled the log.
+ int roll_count_ = 0;
+
+ DISALLOW_COPY_AND_ASSIGN(RollingLog);
+};
+
+} // namespace kudu
+#endif /* KUDU_UTIL_ROLLING_LOG_H */
http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/rw_mutex-test.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/rw_mutex-test.cc b/be/src/kudu/util/rw_mutex-test.cc
new file mode 100644
index 0000000..c2cb394
--- /dev/null
+++ b/be/src/kudu/util/rw_mutex-test.cc
@@ -0,0 +1,185 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <cstdint>
+#include <mutex>
+#include <ostream>
+#include <thread>
+#include <vector>
+
+#include <glog/logging.h>
+#include <gtest/gtest.h>
+
+#include "kudu/util/atomic.h"
+#include "kudu/util/locks.h"
+#include "kudu/util/monotime.h"
+#include "kudu/util/rw_mutex.h"
+#include "kudu/util/test_util.h"
+
+using std::lock_guard;
+using std::thread;
+using std::try_to_lock;
+using std::unique_lock;
+using std::vector;
+
+namespace kudu {
+
+// Value-parameterized fixture: every test runs against an RWMutex constructed
+// with the RWMutex::Priority supplied as the test parameter.
+class RWMutexTest : public KuduTest,
+ public ::testing::WithParamInterface<RWMutex::Priority> {
+ public:
+ RWMutexTest()
+ : lock_(GetParam()) {
+ }
+ protected:
+ // The mutex under test.
+ RWMutex lock_;
+};
+
+// Instantiate every test for each kind of RWMutex priority, i.e. once with
+// PREFER_READING and once with PREFER_WRITING.
+INSTANTIATE_TEST_CASE_P(Priorities, RWMutexTest,
+ ::testing::Values(RWMutex::Priority::PREFER_READING,
+ RWMutex::Priority::PREFER_WRITING));
+
+// Multi-threaded test that tries to find deadlocks in the RWMutex wrapper.
+//
+// Eight threads hammer the same mutex for one second: two blocking writers,
+// two try-lock writers, two blocking readers and two try-lock readers. The
+// test has no assertions of its own; it passes if every thread can be joined.
+TEST_P(RWMutexTest, TestDeadlocks) {
+ // Plain integer: it is only modified while the write lock is held.
+ uint64_t number_of_writes = 0;
+ // Atomic: several readers may increment it concurrently under the read lock.
+ AtomicInt<uint64_t> number_of_reads(0);
+
+ AtomicBool done(false);
+ vector<thread> threads;
+
+ // Start several blocking and non-blocking read-write workloads.
+ for (int i = 0; i < 2; i++) {
+ threads.emplace_back([&](){
+ while (!done.Load()) {
+ lock_guard<RWMutex> l(lock_);
+ number_of_writes++;
+ }
+ });
+ threads.emplace_back([&](){
+ while (!done.Load()) {
+ unique_lock<RWMutex> l(lock_, try_to_lock);
+ // Only count the write if the non-blocking attempt got the lock.
+ if (l.owns_lock()) {
+ number_of_writes++;
+ }
+ }
+ });
+ }
+
+ // Start several blocking and non-blocking read-only workloads.
+ for (int i = 0; i < 2; i++) {
+ threads.emplace_back([&](){
+ while (!done.Load()) {
+ shared_lock<RWMutex> l(lock_);
+ number_of_reads.Increment();
+ }
+ });
+ threads.emplace_back([&](){
+ while (!done.Load()) {
+ shared_lock<RWMutex> l(lock_, try_to_lock);
+ // Only count the read if the non-blocking attempt got the lock.
+ if (l.owns_lock()) {
+ number_of_reads.Increment();
+ }
+ }
+ });
+ }
+
+ // Let the workloads run for a while, then ask them to stop and wait for
+ // all of them. A deadlocked thread would make join() hang.
+ SleepFor(MonoDelta::FromSeconds(1));
+ done.Store(true);
+ for (auto& t : threads) {
+ t.join();
+ }
+
+ // Report the counters while holding the read lock.
+ shared_lock<RWMutex> l(lock_);
+ LOG(INFO) << "Number of writes: " << number_of_writes;
+ LOG(INFO) << "Number of reads: " << number_of_reads.Load();
+}
+
+#ifndef NDEBUG
+// Tests that the RWMutex wrapper catches basic usage errors. This checking is
+// only enabled in debug builds.
+//
+// Each EXPECT_DEATH block contains one invalid sequence of calls; the second
+// argument is matched against the message of the fatal check that fires.
+TEST_P(RWMutexTest, TestLockChecking) {
+ // Acquiring for reading while already holding the lock for reading.
+ EXPECT_DEATH({
+ lock_.ReadLock();
+ lock_.ReadLock();
+ }, "already holding lock for reading");
+
+ EXPECT_DEATH({
+ CHECK(lock_.TryReadLock());
+ CHECK(lock_.TryReadLock());
+ }, "already holding lock for reading");
+
+ // Acquiring for writing while already holding the lock for reading.
+ EXPECT_DEATH({
+ lock_.ReadLock();
+ lock_.WriteLock();
+ }, "already holding lock for reading");
+
+ EXPECT_DEATH({
+ CHECK(lock_.TryReadLock());
+ CHECK(lock_.TryWriteLock());
+ }, "already holding lock for reading");
+
+ // Acquiring for reading while already holding the lock for writing.
+ EXPECT_DEATH({
+ lock_.WriteLock();
+ lock_.ReadLock();
+ }, "already holding lock for writing");
+
+ EXPECT_DEATH({
+ CHECK(lock_.TryWriteLock());
+ CHECK(lock_.TryReadLock());
+ }, "already holding lock for writing");
+
+ // Acquiring for writing while already holding the lock for writing.
+ EXPECT_DEATH({
+ lock_.WriteLock();
+ lock_.WriteLock();
+ }, "already holding lock for writing");
+
+ EXPECT_DEATH({
+ CHECK(lock_.TryWriteLock());
+ CHECK(lock_.TryWriteLock());
+ }, "already holding lock for writing");
+
+ // Releasing a lock which is not held at all.
+ EXPECT_DEATH({
+ lock_.ReadUnlock();
+ }, "wasn't holding lock for reading");
+
+ EXPECT_DEATH({
+ lock_.WriteUnlock();
+ }, "wasn't holding lock for writing");
+
+ // Releasing for writing while the lock is held for reading.
+ EXPECT_DEATH({
+ lock_.ReadLock();
+ lock_.WriteUnlock();
+ }, "already holding lock for reading");
+
+ EXPECT_DEATH({
+ CHECK(lock_.TryReadLock());
+ lock_.WriteUnlock();
+ }, "already holding lock for reading");
+
+ // Releasing for reading while the lock is held for writing.
+ EXPECT_DEATH({
+ lock_.WriteLock();
+ lock_.ReadUnlock();
+ }, "already holding lock for writing");
+
+ EXPECT_DEATH({
+ CHECK(lock_.TryWriteLock());
+ lock_.ReadUnlock();
+ }, "already holding lock for writing");
+}
+#endif
+
+} // namespace kudu
http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/rw_mutex.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/rw_mutex.cc b/be/src/kudu/util/rw_mutex.cc
new file mode 100644
index 0000000..75fc1e7
--- /dev/null
+++ b/be/src/kudu/util/rw_mutex.cc
@@ -0,0 +1,207 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "kudu/util/rw_mutex.h"
+
+#include <cerrno>
+#include <cstring>
+#include <mutex>
+#include <ostream>
+
+#include <glog/logging.h>
+
+#include "kudu/gutil/map-util.h"
+#include "kudu/util/env.h"
+
+using std::lock_guard;
+
+namespace {
+
+// Releases 'rwlock'; shared by the read and the write unlock paths. A failure
+// is only detected in debug builds, where DCHECK_EQ is active.
+void unlock_rwlock(pthread_rwlock_t* rwlock) {
+ int rv = pthread_rwlock_unlock(rwlock);
+ DCHECK_EQ(0, rv) << strerror(rv);
+}
+
+} // anonymous namespace
+
+namespace kudu {
+
+// The default mutex prefers readers; construction is delegated to the
+// priority-taking constructor so that both share one initialization path.
+RWMutex::RWMutex()
+  : RWMutex(Priority::PREFER_READING) {
+}
+
+// Creates a mutex with the requested priority. In debug builds the writer
+// bookkeeping starts out as 0, which UnmarkForWriting() also uses for "no
+// writer".
+RWMutex::RWMutex(Priority prio)
+#ifndef NDEBUG
+ : writer_tid_(0)
+#endif
+{
+ Init(prio);
+}
+
+// Initializes native_handle_. On Linux the requested priority is mapped onto
+// the matching non-portable pthread rwlock kind; elsewhere the priority is
+// ignored and the lock is created with default attributes.
+void RWMutex::Init(Priority prio) {
+#ifdef __linux__
+  // Adapt from priority to the pthread type. Anything other than
+  // PREFER_WRITING results in a reader-preferring lock.
+  const int kind = (prio == Priority::PREFER_WRITING)
+      ? PTHREAD_RWLOCK_PREFER_WRITER_NONRECURSIVE_NP
+      : PTHREAD_RWLOCK_PREFER_READER_NP;
+
+  // Initialize the new rwlock with the user's preference. The attribute
+  // object is only needed while the lock is being initialized.
+  pthread_rwlockattr_t attr;
+  int rv = pthread_rwlockattr_init(&attr);
+  DCHECK_EQ(0, rv) << strerror(rv);
+  rv = pthread_rwlockattr_setkind_np(&attr, kind);
+  DCHECK_EQ(0, rv) << strerror(rv);
+  rv = pthread_rwlock_init(&native_handle_, &attr);
+  DCHECK_EQ(0, rv) << strerror(rv);
+  rv = pthread_rwlockattr_destroy(&attr);
+  DCHECK_EQ(0, rv) << strerror(rv);
+#else
+  int rv = pthread_rwlock_init(&native_handle_, nullptr);
+  DCHECK_EQ(0, rv) << strerror(rv);
+#endif
+}
+
+// Destroys the underlying pthread rwlock. A failure is only detected in debug
+// builds, where DCHECK_EQ is active.
+RWMutex::~RWMutex() {
+ int rv = pthread_rwlock_destroy(&native_handle_);
+ DCHECK_EQ(0, rv) << strerror(rv);
+}
+
+// Blocks until the lock is held for reading. In debug builds it is a fatal
+// error if the calling thread already holds the lock in either mode.
+void RWMutex::ReadLock() {
+ CheckLockState(LockState::NEITHER);
+ int rv = pthread_rwlock_rdlock(&native_handle_);
+ DCHECK_EQ(0, rv) << strerror(rv);
+ // Record ownership only once the lock is actually held.
+ MarkForReading();
+}
+
+// Releases a lock held for reading. In debug builds it is a fatal error if
+// the calling thread does not hold the lock for reading.
+void RWMutex::ReadUnlock() {
+ CheckLockState(LockState::READER);
+ // Clear the bookkeeping before the lock itself is released.
+ UnmarkForReading();
+ unlock_rwlock(&native_handle_);
+}
+
+// Attempts to acquire the lock for reading without blocking.
+//
+// Returns true only if the lock is now held for reading. In debug builds it
+// is a fatal error if the calling thread already holds the lock in either
+// mode.
+bool RWMutex::TryReadLock() {
+  CheckLockState(LockState::NEITHER);
+  int rv = pthread_rwlock_tryrdlock(&native_handle_);
+  // EBUSY: a writer holds or is waiting for the lock. EAGAIN: the maximum
+  // number of read locks has been exceeded. Either way, nothing was acquired.
+  if (rv == EBUSY || rv == EAGAIN) {
+    return false;
+  }
+  DCHECK_EQ(0, rv) << strerror(rv);
+  if (rv != 0) {
+    // Only reachable in release builds, where the DCHECK above is compiled
+    // out: never claim success for a lock which was not acquired.
+    return false;
+  }
+  MarkForReading();
+  return true;
+}
+
+// Blocks until the lock is held for writing. In debug builds it is a fatal
+// error if the calling thread already holds the lock in either mode.
+void RWMutex::WriteLock() {
+ CheckLockState(LockState::NEITHER);
+ int rv = pthread_rwlock_wrlock(&native_handle_);
+ DCHECK_EQ(0, rv) << strerror(rv);
+ // Record ownership only once the lock is actually held.
+ MarkForWriting();
+}
+
+// Releases a lock held for writing. In debug builds it is a fatal error if
+// the calling thread does not hold the lock for writing.
+void RWMutex::WriteUnlock() {
+ CheckLockState(LockState::WRITER);
+ // Clear the bookkeeping before the lock itself is released.
+ UnmarkForWriting();
+ unlock_rwlock(&native_handle_);
+}
+
+// Attempts to acquire the lock for writing without blocking.
+//
+// Returns true only if the lock is now held for writing. In debug builds it
+// is a fatal error if the calling thread already holds the lock in either
+// mode.
+bool RWMutex::TryWriteLock() {
+  CheckLockState(LockState::NEITHER);
+  int rv = pthread_rwlock_trywrlock(&native_handle_);
+  // EBUSY: the lock is currently held for reading or writing.
+  if (rv == EBUSY) {
+    return false;
+  }
+  DCHECK_EQ(0, rv) << strerror(rv);
+  if (rv != 0) {
+    // Only reachable in release builds, where the DCHECK above is compiled
+    // out: never claim success for a lock which was not acquired.
+    return false;
+  }
+  MarkForWriting();
+  return true;
+}
+
+#ifndef NDEBUG
+
+// Debug-only: fatal error unless the calling thread is recorded as holding
+// the lock, either as one of the readers or as the writer.
+void RWMutex::AssertAcquired() const {
+ lock_guard<simple_spinlock> l(tid_lock_);
+ CHECK(ContainsKey(reader_tids_, Env::Default()->gettid()) ||
+ Env::Default()->gettid() == writer_tid_);
+}
+
+// Debug-only: fatal error unless the calling thread is recorded as a reader.
+void RWMutex::AssertAcquiredForReading() const {
+ lock_guard<simple_spinlock> l(tid_lock_);
+ CHECK(ContainsKey(reader_tids_, Env::Default()->gettid()));
+}
+
+// Debug-only: fatal error unless the calling thread is recorded as the writer.
+void RWMutex::AssertAcquiredForWriting() const {
+ lock_guard<simple_spinlock> l(tid_lock_);
+ CHECK_EQ(Env::Default()->gettid(), writer_tid_);
+}
+
+// Debug-only: verifies that the calling thread's recorded relationship to the
+// lock matches 'state' and aborts with a descriptive message if it does not.
+void RWMutex::CheckLockState(LockState state) const {
+  const pid_t my_tid = Env::Default()->gettid();
+
+  // Snapshot the bookkeeping under the spinlock; the checks themselves run
+  // after it has been released.
+  bool is_reader = false;
+  bool is_writer = false;
+  {
+    lock_guard<simple_spinlock> l(tid_lock_);
+    is_reader = ContainsKey(reader_tids_, my_tid);
+    is_writer = writer_tid_ == my_tid;
+  }
+
+  if (state == LockState::NEITHER) {
+    // About to acquire: the thread must not hold the lock in any mode.
+    CHECK(!is_reader) << "Invalid state, already holding lock for reading";
+    CHECK(!is_writer) << "Invalid state, already holding lock for writing";
+  } else if (state == LockState::READER) {
+    // About to release a read lock.
+    CHECK(!is_writer) << "Invalid state, already holding lock for writing";
+    CHECK(is_reader) << "Invalid state, wasn't holding lock for reading";
+  } else if (state == LockState::WRITER) {
+    // About to release the write lock.
+    CHECK(!is_reader) << "Invalid state, already holding lock for reading";
+    CHECK(is_writer) << "Invalid state, wasn't holding lock for writing";
+  }
+}
+
+// Debug-only: records the calling thread as a reader of this mutex.
+void RWMutex::MarkForReading() {
+  const pid_t tid = Env::Default()->gettid();
+  lock_guard<simple_spinlock> l(tid_lock_);
+  reader_tids_.insert(tid);
+}
+
+// Debug-only: records the calling thread as the writer of this mutex.
+void RWMutex::MarkForWriting() {
+  const pid_t tid = Env::Default()->gettid();
+  lock_guard<simple_spinlock> l(tid_lock_);
+  writer_tid_ = tid;
+}
+
+// Debug-only: removes the calling thread from the set of recorded readers.
+void RWMutex::UnmarkForReading() {
+  const pid_t tid = Env::Default()->gettid();
+  lock_guard<simple_spinlock> l(tid_lock_);
+  reader_tids_.erase(tid);
+}
+
+// Debug-only: clears the recorded writer; 0 is the "no writer" value the
+// constructors start with.
+void RWMutex::UnmarkForWriting() {
+ lock_guard<simple_spinlock> l(tid_lock_);
+ writer_tid_ = 0;
+}
+
+#endif
+
+} // namespace kudu
http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/rw_mutex.h
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/rw_mutex.h b/be/src/kudu/util/rw_mutex.h
new file mode 100644
index 0000000..bb6168c
--- /dev/null
+++ b/be/src/kudu/util/rw_mutex.h
@@ -0,0 +1,123 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <pthread.h>
+#include <sys/types.h>
+
+#include <unordered_set>
+
+#include "kudu/gutil/macros.h"
+#include "kudu/util/locks.h"
+
+namespace kudu {
+
+// Read/write mutex. Implemented as a thin wrapper around pthread_rwlock_t.
+//
+// Although pthread_rwlock_t allows recursive acquisition, this wrapper does
+// not, and will crash in debug mode if recursive acquisition is detected.
+class RWMutex {
+ public:
+
+ // Possible fairness policies for the RWMutex.
+ enum class Priority {
+ // The lock will prioritize readers at the expense of writers.
+ PREFER_READING,
+
+ // The lock will prioritize writers at the expense of readers.
+ //
+ // Care should be taken when using this fairness policy, as it can lead to
+ // unexpected deadlocks (e.g. a writer waiting on the lock will prevent
+ // additional readers from acquiring it).
+ PREFER_WRITING,
+ };
+
+ // Create an RWMutex that prioritizes readers.
+ RWMutex();
+
+ // Create an RWMutex with customized priority. This is a best effort; the
+ // underlying platform may not support custom priorities.
+ explicit RWMutex(Priority prio);
+
+ ~RWMutex();
+
+ // Shared (read) locking. ReadLock() blocks; TryReadLock() does not block and
+ // returns whether the lock was acquired.
+ void ReadLock();
+ void ReadUnlock();
+ bool TryReadLock();
+
+ // Exclusive (write) locking. WriteLock() blocks; TryWriteLock() does not
+ // block and returns whether the lock was acquired.
+ void WriteLock();
+ void WriteUnlock();
+ bool TryWriteLock();
+
+ // Assertions about the calling thread's ownership of the lock. They are
+ // fatal checks in debug builds and no-ops in release builds.
+#ifndef NDEBUG
+ void AssertAcquired() const;
+ void AssertAcquiredForReading() const;
+ void AssertAcquiredForWriting() const;
+#else
+ void AssertAcquired() const {}
+ void AssertAcquiredForReading() const {}
+ void AssertAcquiredForWriting() const {}
+#endif
+
+ // Aliases for use with std::lock_guard and kudu::shared_lock.
+ void lock() { WriteLock(); }
+ void unlock() { WriteUnlock(); }
+ bool try_lock() { return TryWriteLock(); }
+ void lock_shared() { ReadLock(); }
+ void unlock_shared() { ReadUnlock(); }
+ bool try_lock_shared() { return TryReadLock(); }
+
+ private:
+ // Initializes native_handle_ according to 'prio'.
+ void Init(Priority prio);
+
+ // The relationship to the lock which the calling thread is expected to have
+ // when CheckLockState() is invoked.
+ enum class LockState {
+ NEITHER,
+ READER,
+ WRITER,
+ };
+ // Per-thread ownership bookkeeping. Implemented in debug builds only; the
+ // release variants compile to nothing.
+#ifndef NDEBUG
+ void CheckLockState(LockState state) const;
+ void MarkForReading();
+ void MarkForWriting();
+ void UnmarkForReading();
+ void UnmarkForWriting();
+#else
+ void CheckLockState(LockState state) const {}
+ void MarkForReading() {}
+ void MarkForWriting() {}
+ void UnmarkForReading() {}
+ void UnmarkForWriting() {}
+#endif
+
+ // The wrapped pthread read/write lock.
+ pthread_rwlock_t native_handle_;
+
+#ifndef NDEBUG
+ // Protects reader_tids_ and writer_tid_.
+ mutable simple_spinlock tid_lock_;
+
+ // Tracks all current readers by tid.
+ std::unordered_set<pid_t> reader_tids_;
+
+ // Tracks the current writer (if one exists) by tid.
+ pid_t writer_tid_;
+#endif
+
+ DISALLOW_COPY_AND_ASSIGN(RWMutex);
+};
+
+} // namespace kudu
http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/rw_semaphore-test.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/rw_semaphore-test.cc b/be/src/kudu/util/rw_semaphore-test.cc
new file mode 100644
index 0000000..7717608
--- /dev/null
+++ b/be/src/kudu/util/rw_semaphore-test.cc
@@ -0,0 +1,94 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <cstdint>
+#include <mutex>
+#include <thread>
+#include <vector>
+
+#include <glog/logging.h>
+#include <gtest/gtest.h>
+
+#include "kudu/util/locks.h"
+#include "kudu/util/monotime.h"
+#include "kudu/util/rw_semaphore.h"
+
+using std::thread;
+using std::vector;
+
+namespace kudu {
+// State shared between the reader and writer threads of this test.
+// 'done' and 'int_var' are only touched while 'sem' is held.
+struct SharedState {
+  // Set to true (under the write lock) to ask all threads to exit.
+  bool done = false;
+  // Running total bumped by the writers and observed by the readers.
+  int64_t int_var = 0;
+  // Guards the two fields above.
+  rw_semaphore sem;
+};
+
+// Writer thread body: repeatedly takes the write lock and adds an
+// ever-growing increment to the shared value, exiting once 'done' is set.
+void Writer(SharedState* state) {
+  for (int increment = 0; ; ++increment) {
+    // The guard is released at the end of every iteration.
+    std::lock_guard<rw_semaphore> l(state->sem);
+    state->int_var += increment;
+    if (state->done) {
+      return;
+    }
+  }
+}
+
+// Reader thread body: repeatedly takes the shared lock and verifies that the
+// value in the shared state only increases, exiting once 'done' is set.
+void Reader(SharedState* state) {
+  // Must be as wide as SharedState::int_var: a narrower type would silently
+  // truncate the observed value and weaken the monotonicity check below.
+  int64_t prev_val = 0;
+  while (true) {
+    shared_lock<rw_semaphore> l(state->sem);
+    // The int var should only be seen to increase.
+    CHECK_GE(state->int_var, prev_val);
+    prev_val = state->int_var;
+    if (state->done) {
+      break;
+    }
+  }
+}
+
+// Test which verifies basic functionality of the semaphore.
+// When run under TSAN this also verifies the barriers.
+TEST(RWSemaphoreTest, TestBasicOperation) {
+  SharedState s;
+  // Threads are stored by value so no manual new/delete is required.
+  vector<thread> threads;
+  // Start 5 readers and 5 writers.
+  for (int i = 0; i < 5; i++) {
+    threads.emplace_back(Reader, &s);
+    threads.emplace_back(Writer, &s);
+  }
+
+  // Let them contend for a short amount of time.
+  SleepFor(MonoDelta::FromMilliseconds(50));
+
+  // Signal them to stop. 'done' is written under the write lock because the
+  // worker threads read it while holding the semaphore.
+  {
+    std::lock_guard<rw_semaphore> l(s.sem);
+    s.done = true;
+  }
+
+  for (thread& t : threads) {
+    t.join();
+  }
+}
+
+} // namespace kudu
http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/rw_semaphore.h
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/rw_semaphore.h b/be/src/kudu/util/rw_semaphore.h
new file mode 100644
index 0000000..20c0b57
--- /dev/null
+++ b/be/src/kudu/util/rw_semaphore.h
@@ -0,0 +1,206 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#ifndef KUDU_UTIL_RW_SEMAPHORE_H
+#define KUDU_UTIL_RW_SEMAPHORE_H
+
+// Uncomment for extra debugging information. See below for details.
+// #define RW_SEMAPHORE_TRACK_HOLDER 1
+
+#include <boost/smart_ptr/detail/yield_k.hpp>
+#include <glog/logging.h>
+
+#include "kudu/gutil/atomicops.h"
+#include "kudu/gutil/macros.h"
+#include "kudu/gutil/port.h"
+#ifdef RW_SEMAPHORE_TRACK_HOLDER
+#include "kudu/util/debug-util.h"
+#endif
+#include "kudu/util/thread.h"
+
+namespace kudu {
+
+// Read-Write semaphore. The state is a 32-bit word: the low 31 bits hold the
+// number of readers and the top bit is the write flag. A writer first sets the
+// write flag and then waits until the readers have finished. Readers spin
+// while the write flag is set.
+//
+// This rw-semaphore makes no attempt at fairness, though it does avoid write
+// starvation (no new readers may obtain the lock if a write is waiting).
+//
+// NOTE: this means that it is not safe to reentrantly acquire the read lock,
+// due to the following deadlock:
+// - T1: acquire read lock
+// - T2: wait for write lock
+// (blocks waiting for readers)
+// - T1: try to acquire read-lock reentrantly
+// (blocks to avoid starving writers)
+//
+// Given that this is currently based only on spinning (and not futex),
+// it should only be used in cases where the lock is held for very short
+// time intervals.
+//
+// If the semaphore is expected to always be released from the same thread
+// that acquired it, use rw_spinlock instead.
+//
+// In order to support easier debugging of leaked locks, this class can track
+// the stack trace of the last thread to lock it in write mode. To do so,
+// uncomment the definition of RW_SEMAPHORE_TRACK_HOLDER at the top of this
+// file. Then, in gdb, print the contents of the semaphore, and you should see
+// the collected stack trace.
+class rw_semaphore {
+ public:
+  // Creates an unlocked semaphore (no readers, no writer).
+  rw_semaphore()
+      : state_(0)
+#ifndef NDEBUG
+        ,
+        writer_tid_(-1)  // Invalid tid: nobody holds the write lock yet.
+#endif // NDEBUG
+  {
+  }
+  ~rw_semaphore() {}
+
+  // Acquires the lock in shared (read) mode. Spins, yielding between
+  // attempts, for as long as the write flag is set or the CAS keeps failing.
+  void lock_shared() {
+    int loop_count = 0;
+    Atomic32 cur_state = base::subtle::NoBarrier_Load(&state_);
+    while (true) {
+      Atomic32 expected = cur_state & kNumReadersMask;   // I expect no write lock
+      Atomic32 try_new_state = expected + 1;             // Add me as reader
+      cur_state = base::subtle::Acquire_CompareAndSwap(&state_, expected, try_new_state);
+      if (cur_state == expected)
+        break;
+      // Either was already locked by someone else, or CAS failed.
+      boost::detail::yield(loop_count++);
+    }
+  }
+
+  // Releases one shared (read) hold on the lock. The write flag, if set by a
+  // waiting writer, is left untouched.
+  void unlock_shared() {
+    int loop_count = 0;
+    Atomic32 cur_state = base::subtle::NoBarrier_Load(&state_);
+    while (true) {
+      DCHECK_GT(cur_state & kNumReadersMask, 0)
+          << "unlock_shared() called when there are no shared locks held";
+      Atomic32 expected = cur_state;          // Possibly a write flag and other readers
+      Atomic32 try_new_state = expected - 1;  // Drop me as reader
+      cur_state = base::subtle::Release_CompareAndSwap(&state_, expected, try_new_state);
+      if (cur_state == expected)
+        break;
+      // Either was already locked by someone else, or CAS failed.
+      boost::detail::yield(loop_count++);
+    }
+  }
+
+  // Tries to acquire a write lock, if no one else has it.
+  // Returns false immediately if the write flag is already set. Otherwise
+  // this function retries on CAS failure and, once the write flag has been
+  // set, waits for the current readers to complete before returning true.
+  bool try_lock() {
+    int loop_count = 0;
+    Atomic32 cur_state = base::subtle::NoBarrier_Load(&state_);
+    while (true) {
+      // someone else has already the write lock
+      if (cur_state & kWriteFlag)
+        return false;
+
+      Atomic32 expected = cur_state & kNumReadersMask;   // I expect some 0+ readers
+      Atomic32 try_new_state = kWriteFlag | expected;    // I want to lock the other writers
+      cur_state = base::subtle::Acquire_CompareAndSwap(&state_, expected, try_new_state);
+      if (cur_state == expected)
+        break;
+      // Either was already locked by someone else, or CAS failed.
+      boost::detail::yield(loop_count++);
+    }
+
+    WaitPendingReaders();
+
+#ifndef NDEBUG
+    // Record the holder exactly as lock() does, so that the debug
+    // information is accurate whichever way the write lock was taken.
+    writer_tid_ = Thread::CurrentThreadId();
+#endif // NDEBUG
+    RecordLockHolderStack();
+    return true;
+  }
+
+  // Acquires the lock in exclusive (write) mode: sets the write flag, which
+  // keeps new readers out, then waits for the current readers to drain.
+  void lock() {
+    int loop_count = 0;
+    Atomic32 cur_state = base::subtle::NoBarrier_Load(&state_);
+    while (true) {
+      Atomic32 expected = cur_state & kNumReadersMask;   // I expect some 0+ readers
+      Atomic32 try_new_state = kWriteFlag | expected;    // I want to lock the other writers
+      // Note: we use NoBarrier here because we'll do the Acquire barrier down below
+      // in WaitPendingReaders
+      cur_state = base::subtle::NoBarrier_CompareAndSwap(&state_, expected, try_new_state);
+      if (cur_state == expected)
+        break;
+      // Either was already locked by someone else, or CAS failed.
+      boost::detail::yield(loop_count++);
+    }
+
+    WaitPendingReaders();
+
+#ifndef NDEBUG
+    writer_tid_ = Thread::CurrentThreadId();
+#endif // NDEBUG
+    RecordLockHolderStack();
+  }
+
+  // Releases the write lock, resetting the state to "no writer, no readers".
+  void unlock() {
+    // I expect to be the only writer
+    DCHECK_EQ(base::subtle::NoBarrier_Load(&state_), kWriteFlag);
+
+#ifndef NDEBUG
+    writer_tid_ = -1; // Invalid tid.
+#endif // NDEBUG
+
+    ResetLockHolderStack();
+    // Reset: no writers & no readers.
+    Release_Store(&state_, 0);
+  }
+
+  // Return true if the lock is currently held for write by any thread.
+  // See simple_semaphore::is_locked() for details about where this is useful.
+  bool is_write_locked() const {
+    return base::subtle::NoBarrier_Load(&state_) & kWriteFlag;
+  }
+
+  // Return true if the lock is currently held, either for read or write
+  // by any thread.
+  // See simple_semaphore::is_locked() for details about where this is useful.
+  bool is_locked() const {
+    return base::subtle::NoBarrier_Load(&state_);
+  }
+
+ private:
+  // Low 31 bits of state_: the number of readers.
+  static const uint32_t kNumReadersMask = 0x7fffffff;
+  // Top bit of state_: set while a writer holds or is waiting for the lock.
+  // Uses an unsigned literal so the shift never touches a signed sign bit.
+  static const uint32_t kWriteFlag = 1U << 31;
+
+#ifdef RW_SEMAPHORE_TRACK_HOLDER
+  StackTrace writer_stack_;
+  void RecordLockHolderStack() {
+    writer_stack_.Collect();
+  }
+  void ResetLockHolderStack() {
+    writer_stack_.Reset();
+  }
+#else
+  // Holder tracking is compiled out: these are no-ops.
+  void RecordLockHolderStack() {
+  }
+  void ResetLockHolderStack() {
+  }
+#endif
+
+  // Spins (yielding) until the reader count drops to zero. The Acquire load
+  // here provides the barrier that lock() relies on.
+  void WaitPendingReaders() {
+    int loop_count = 0;
+    while ((base::subtle::Acquire_Load(&state_) & kNumReadersMask) > 0) {
+      boost::detail::yield(loop_count++);
+    }
+  }
+
+ private:
+  // Write flag (top bit) plus reader count (low 31 bits).
+  volatile Atomic32 state_;
+#ifndef NDEBUG
+  // Debug builds only: tid of the thread holding the write lock, or -1.
+  int64_t writer_tid_;
+#endif // NDEBUG
+};
+
+} // namespace kudu
+#endif /* KUDU_UTIL_RW_SEMAPHORE_H */
http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/rwc_lock-test.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/rwc_lock-test.cc b/be/src/kudu/util/rwc_lock-test.cc
new file mode 100644
index 0000000..17c2b19
--- /dev/null
+++ b/be/src/kudu/util/rwc_lock-test.cc
@@ -0,0 +1,147 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <mutex>
+#include <string>
+#include <thread>
+#include <vector>
+
+#include <glog/logging.h>
+#include <gtest/gtest.h>
+
+#include "kudu/gutil/atomicops.h"
+#include "kudu/util/locks.h"
+#include "kudu/util/monotime.h"
+#include "kudu/util/rwc_lock.h"
+#include "kudu/util/test_util.h"
+
+namespace kudu {
+
+using base::subtle::NoBarrier_Load;
+using base::subtle::Release_Store;
+using std::string;
+using std::thread;
+using std::vector;
+
+// Test fixture for the RWCLock tests; adds nothing beyond the KuduTest base.
+class RWCLockTest : public KuduTest {};
+
+// Tracks how many threads currently hold the lock in each of its modes
+// (read, write, commit) and checks the expected invariants on every change.
+struct LockHoldersCount {
+  LockHoldersCount() = default;
+
+  // Check the invariants of the lock counts.
+  void CheckInvariants() {
+    // At no time should we have more than one writer or committer.
+    CHECK_LE(num_writers, 1);
+    CHECK_LE(num_committers, 1);
+
+    // If we have any readers, then we should not have any committers.
+    if (num_readers > 0) {
+      CHECK_EQ(num_committers, 0);
+    }
+  }
+
+  // Each of these adds 'delta' to one counter under 'lock' and then
+  // re-validates the invariants.
+  void AdjustReaders(int delta) { Adjust(&num_readers, delta); }
+  void AdjustWriters(int delta) { Adjust(&num_writers, delta); }
+  void AdjustCommitters(int delta) { Adjust(&num_committers, delta); }
+
+  // Shared implementation of the Adjust*() methods above.
+  void Adjust(int* counter, int delta) {
+    std::lock_guard<simple_spinlock> l(lock);
+    *counter += delta;
+    CheckInvariants();
+  }
+
+  int num_readers = 0;
+  int num_writers = 0;
+  int num_committers = 0;
+  // Protects the three counters above.
+  simple_spinlock lock;
+};
+
+// State shared by all of the threads in this test.
+struct SharedState {
+ // Per-mode counts of current lock holders, used to check invariants.
+ LockHoldersCount counts;
+ // The lock under test.
+ RWCLock rwc_lock;
+ // Non-zero once the worker threads should leave their loops.
+ Atomic32 stop;
+};
+
+// Reader loop: until told to stop, takes the read lock, registers and
+// unregisters itself in the holder counts (each call checks the invariants),
+// and releases the lock again.
+void ReaderThread(SharedState* state) {
+  for (;;) {
+    if (NoBarrier_Load(&state->stop)) {
+      return;
+    }
+    state->rwc_lock.ReadLock();
+    state->counts.AdjustReaders(1);
+    state->counts.AdjustReaders(-1);
+    state->rwc_lock.ReadUnlock();
+  }
+}
+
+// Writer loop: until told to stop, takes the write lock, upgrades it to the
+// commit lock and finally releases it, updating the holder counts (each call
+// checks the invariants) at every step.
+void WriterThread(SharedState* state) {
+  while (!NoBarrier_Load(&state->stop)) {
+    state->rwc_lock.WriteLock();
+    state->counts.AdjustWriters(1);
+
+    // From here on this thread counts as a committer rather than a writer.
+    state->rwc_lock.UpgradeToCommitLock();
+    state->counts.AdjustWriters(-1);
+    state->counts.AdjustCommitters(1);
+
+    state->counts.AdjustCommitters(-1);
+    state->rwc_lock.CommitUnlock();
+  }
+}
+
+
+// Runs several writer and reader threads against a single RWCLock for a
+// short while; the threads themselves CHECK the lock-holder invariants.
+TEST_F(RWCLockTest, TestCorrectBehavior) {
+  const int kNumWriters = 5;
+  const int kNumReaders = 5;
+
+  SharedState state;
+  Release_Store(&state.stop, 0);
+
+  vector<thread> threads;
+  for (int w = 0; w < kNumWriters; w++) {
+    threads.emplace_back(WriterThread, &state);
+  }
+  for (int r = 0; r < kNumReaders; r++) {
+    threads.emplace_back(ReaderThread, &state);
+  }
+
+  // Slow-test mode gets a longer contention window.
+  SleepFor(AllowSlowTests() ? MonoDelta::FromSeconds(1)
+                            : MonoDelta::FromMilliseconds(100));
+
+  // Ask every thread to stop, then wait for all of them.
+  Release_Store(&state.stop, 1);
+
+  for (auto& worker : threads) {
+    worker.join();
+  }
+}
+
+} // namespace kudu
http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/rwc_lock.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/rwc_lock.cc b/be/src/kudu/util/rwc_lock.cc
new file mode 100644
index 0000000..ffe4cbb
--- /dev/null
+++ b/be/src/kudu/util/rwc_lock.cc
@@ -0,0 +1,136 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "kudu/util/rwc_lock.h"
+
+#include <glog/logging.h>
+
+#ifndef NDEBUG
+#include "kudu/gutil/walltime.h"
+#include "kudu/util/debug-util.h"
+#include "kudu/util/thread.h"
+#endif // NDEBUG
+
+namespace kudu {
+
+// Builds an unlocked RWCLock: no readers and no writer. Both no_mutators_
+// and no_readers_ are constructed over lock_.
+RWCLock::RWCLock()
+  : no_mutators_(&lock_),
+    no_readers_(&lock_),
+    reader_count_(0),
+    write_locked_(false)
+#ifndef NDEBUG
+    ,
+    writer_tid_(0),
+    last_writelock_acquire_time_(0)
+#endif // NDEBUG
+{
+#ifndef NDEBUG
+  // Debug builds start out with an empty writer backtrace.
+  last_writer_backtrace_[0] = '\0';
+#endif // NDEBUG
+}
+
+// Debug builds verify that HasReaders() and HasWriteLock() both report false
+// at destruction time, i.e. that the lock is not destroyed while in use.
+RWCLock::~RWCLock() {
+ DCHECK(!HasReaders());
+ DCHECK(!HasWriteLock());
+}
+
+// Registers the caller as a reader. lock_ is only held for the duration of
+// the increment, but acquiring it blocks while another thread holds lock_
+// (for example between UpgradeToCommitLock() and CommitUnlock()).
+void RWCLock::ReadLock() {
+  MutexLock guard(lock_);
+  ++reader_count_;
+}
+
+// Drops one reader. When the last reader leaves, signals no_readers_, which
+// is what UpgradeToCommitLock() waits on.
+void RWCLock::ReadUnlock() {
+  MutexLock guard(lock_);
+  DCHECK(HasReadersUnlocked());
+  if (--reader_count_ == 0) {
+    no_readers_.Signal();
+  }
+}
+
+// Returns true if at least one reader currently holds the lock.
+// Acquires lock_ itself; use HasReadersUnlocked() when lock_ is already held.
+bool RWCLock::HasReaders() const {
+  MutexLock guard(lock_);
+  const bool any_readers = HasReadersUnlocked();
+  return any_readers;
+}
+
+// Returns true if the reader count is positive. Does not take lock_ itself:
+// the caller must already hold it, which is asserted below.
+bool RWCLock::HasReadersUnlocked() const {
+ lock_.AssertAcquired();
+ return reader_count_ > 0;
+}
+
+// Locking wrapper around HasWriteLockUnlocked(); see that function for the
+// exact meaning of the result in debug versus release builds.
+bool RWCLock::HasWriteLock() const {
+  MutexLock guard(lock_);
+  const bool write_held = HasWriteLockUnlocked();
+  return write_held;
+}
+
+// Reports whether the write lock is held; lock_ must already be held.
+// Release builds answer "is the write lock held by any thread", while debug
+// builds answer "is the calling thread the recorded writer".
+bool RWCLock::HasWriteLockUnlocked() const {
+  lock_.AssertAcquired();
+#ifdef NDEBUG
+  return write_locked_;
+#else
+  return writer_tid_ == Thread::CurrentThreadId();
+#endif
+}
+
+// Acquires the write lock, blocking until no other thread holds it.
+// Debug builds additionally record when, by which thread and from where the
+// lock was taken.
+void RWCLock::WriteLock() {
+  MutexLock guard(lock_);
+  // Wait for any other mutations to finish.
+  while (write_locked_) {
+    no_mutators_.Wait();
+  }
+  write_locked_ = true;
+#ifndef NDEBUG
+  // Still under lock_, so this bookkeeping is consistent with write_locked_.
+  last_writelock_acquire_time_ = GetCurrentTimeMicros();
+  writer_tid_ = Thread::CurrentThreadId();
+  HexStackTraceToString(last_writer_backtrace_, kBacktraceBufSize);
+#endif // NDEBUG
+}
+
+// Releases the write lock without committing, clears the debug bookkeeping
+// and signals no_mutators_ so that a waiting writer can proceed.
+void RWCLock::WriteUnlock() {
+  MutexLock guard(lock_);
+  DCHECK(HasWriteLockUnlocked());
+#ifndef NDEBUG
+  writer_tid_ = 0;
+  last_writer_backtrace_[0] = '\0';
+#endif // NDEBUG
+  write_locked_ = false;
+  no_mutators_.Signal();
+}
+
+// Upgrades an already-held write lock to the commit lock: acquires lock_ and
+// waits until every reader has left. NOTE: this deliberately returns with
+// lock_ still held; it is released by CommitUnlock().
+void RWCLock::UpgradeToCommitLock() {
+ lock_.lock();
+ DCHECK(HasWriteLockUnlocked());
+ // Loop rather than wait once: re-check the count after every wakeup.
+ while (reader_count_ > 0) {
+ no_readers_.Wait();
+ }
+ DCHECK(HasWriteLockUnlocked());
+
+ // Leaves the lock held, which prevents any new readers
+ // or writers.
+}
+
+// Releases the commit lock: clears the write-locked state, wakes every
+// thread waiting on no_mutators_ and unlocks lock_. It does not acquire
+// lock_ itself; the checks below expect it to be held already (as left by
+// UpgradeToCommitLock()).
+void RWCLock::CommitUnlock() {
+ DCHECK(!HasReadersUnlocked());
+ DCHECK(HasWriteLockUnlocked());
+ write_locked_ = false;
+#ifndef NDEBUG
+ writer_tid_ = 0;
+ last_writer_backtrace_[0] = '\0';
+#endif // NDEBUG
+ no_mutators_.Broadcast();
+ // Matches the lock_.lock() performed in UpgradeToCommitLock().
+ lock_.unlock();
+}
+
+} // namespace kudu