Posted to commits@impala.apache.org by ta...@apache.org on 2018/07/13 06:03:20 UTC

[08/51] [abbrv] impala git commit: IMPALA-7006: Add KRPC folders from kudu@334ecafd

http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/random_util.h
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/random_util.h b/be/src/kudu/util/random_util.h
new file mode 100644
index 0000000..bda8c42
--- /dev/null
+++ b/be/src/kudu/util/random_util.h
@@ -0,0 +1,44 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#ifndef KUDU_UTIL_RANDOM_UTIL_H
+#define KUDU_UTIL_RANDOM_UTIL_H
+
+#include <cstdlib>
+#include <stdint.h>
+
+#include <string>
+
+namespace kudu {
+
+class Random;
+
+// Writes exactly n random bytes to dest using the given Random generator.
+// Note that RandomString() does not null-terminate its output, though a '\0'
+// may be written to dest with the same probability as any other byte.
+void RandomString(void* dest, size_t n, Random* rng);
+
+// Same as the above, but returns the string.
+std::string RandomString(size_t n, Random* rng);
+
+// Generate a 32-bit random seed from several sources, including timestamp,
+// pid & tid.
+uint32_t GetRandomSeed32();
+
+} // namespace kudu
+
+#endif // KUDU_UTIL_RANDOM_UTIL_H
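
As an aside for readers of this import, here is a minimal usage sketch for the declarations above. It is an illustration, not part of the commit, and it assumes kudu::Random (from kudu/util/random.h) is constructible from a single 32-bit seed; everything else comes from this header.

#include <string>

#include "kudu/util/random.h"       // assumed location of kudu::Random
#include "kudu/util/random_util.h"

void ExampleRandomBytes() {
  // Seed a PRNG from the timestamp/pid/tid-derived seed declared above.
  // (Assumption: kudu::Random takes a single 32-bit seed.)
  kudu::Random rng(kudu::GetRandomSeed32());

  // Fill a fixed-size buffer with random bytes (not null-terminated).
  char buf[16];
  kudu::RandomString(buf, sizeof(buf), &rng);

  // Or obtain the random bytes as a std::string of the requested length.
  std::string s = kudu::RandomString(16, &rng);
}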

http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/rle-encoding.h
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/rle-encoding.h b/be/src/kudu/util/rle-encoding.h
new file mode 100644
index 0000000..5b16fd9
--- /dev/null
+++ b/be/src/kudu/util/rle-encoding.h
@@ -0,0 +1,523 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#ifndef IMPALA_RLE_ENCODING_H
+#define IMPALA_RLE_ENCODING_H
+
+#include <glog/logging.h>
+
+#include "kudu/gutil/port.h"
+#include "kudu/util/bit-stream-utils.inline.h"
+#include "kudu/util/bit-util.h"
+
+namespace kudu {
+
+// Utility classes to do run length encoding (RLE) for fixed bit width values.  If runs
+// are sufficiently long, RLE is used; otherwise, the values are just bit-packed
+// (literal encoding).
+// For both types of runs, there is a byte-aligned indicator which encodes the length
+// of the run and the type of the run.
+// This encoding has the benefit that when there aren't any long enough runs, values
+// are always decoded at fixed (can be precomputed) bit offsets OR both the value and
+// the run length are byte aligned. This allows for very efficient decoding
+// implementations.
+// The encoding is:
+//    encoded-block := run*
+//    run := literal-run | repeated-run
+//    literal-run := literal-indicator < literal bytes >
+//    repeated-run := repeated-indicator < repeated value. padded to byte boundary >
+//    literal-indicator := varint_encode( number_of_groups << 1 | 1)
+//    repeated-indicator := varint_encode( number_of_repetitions << 1 )
+//
+// Each run is preceded by a varint. The varint's least significant bit is
+// used to indicate whether the run is a literal run or a repeated run. The rest
+// of the varint is used to determine the length of the run (e.g. how many times the
+// value repeats).
+//
+// In the case of literal runs, the run length is always a multiple of 8 (i.e. encode
+// in groups of 8), so that no matter the bit-width of the value, the sequence will end
+// on a byte boundary without padding.
+// Given that we know it is a multiple of 8, we store the number of 8-groups rather than
+// the actual number of encoded ints. (This means that the total number of encoded values
+// cannot be determined from the encoded data, since the number of values in the last
+// group may not be a multiple of 8).
+// There is a break-even point at which run length encoding becomes more storage
+// efficient.  For 1-bit values, that point is 8 values: eight of them take 2 bytes
+// with either the repeated encoding or the literal encoding.  This break-even point
+// can always be computed from the bit-width.
+// TODO: think about how to use this for strings.  The bit packing isn't quite the same.
+//
+// Examples with bit-width 1 (e.g. encoding booleans):
+// ----------------------------------------
+// 50 1s followed by 50 0s:
+// <varint(50 << 1)> <1, padded to 1 byte> <varint(50 << 1)> <0, padded to 1 byte>
+//  - (total 4 bytes)
+//
+// alternating 1s and 0s (200 total):
+// 200 ints = 25 groups of 8
+// <varint((25 << 1) | 1)> <25 bytes of values, bitpacked>
+// (total 26 bytes, 1 byte overhead)
+//
+
+// Decoder class for RLE encoded data.
+//
+// NOTE: the encoded format does not have any length prefix or any other way of
+// indicating that the encoded sequence ends at a certain point, so the Decoder
+// methods may return some extra bits at the end before the read methods start
+// to return 0/false.
+template<typename T>
+class RleDecoder {
+ public:
+  // Create a decoder object. buffer/buffer_len is the RLE-encoded data to decode.
+  // bit_width is the width of each value (before encoding).
+  RleDecoder(const uint8_t* buffer, int buffer_len, int bit_width)
+    : bit_reader_(buffer, buffer_len),
+      bit_width_(bit_width),
+      current_value_(0),
+      repeat_count_(0),
+      literal_count_(0),
+      rewind_state_(CANT_REWIND) {
+    DCHECK_GE(bit_width_, 1);
+    DCHECK_LE(bit_width_, 64);
+  }
+
+  RleDecoder() {}
+
+  // Skips 'to_skip' values and returns the number of non-zero entries skipped.
+  size_t Skip(size_t to_skip);
+
+  // Gets the next value.  Returns false if there are no more.
+  bool Get(T* val);
+
+  // Seek to the previous value.
+  void RewindOne();
+
+  // Gets the next run of the same 'val'. Returns 0 if there is no
+  // more data to be decoded. Will return a run of at most 'max_run'
+  // values. If there are more values than this, the next call to
+  // GetNextRun will return more from the same run.
+  size_t GetNextRun(T* val, size_t max_run);
+
+ private:
+  bool ReadHeader();
+
+  enum RewindState {
+    REWIND_LITERAL,
+    REWIND_RUN,
+    CANT_REWIND
+  };
+
+  BitReader bit_reader_;
+  int bit_width_;
+  uint64_t current_value_;
+  uint32_t repeat_count_;
+  uint32_t literal_count_;
+  RewindState rewind_state_;
+};
+
+// Class to incrementally build the rle data.
+// The encoding has two modes: encoding repeated runs and literal runs.
+// If the run is sufficiently short, it is more efficient to encode as a literal run.
+// This class does so by buffering 8 values at a time.  If they are not all the same
+// they are added to the literal run.  If they are the same, they are added to the
+// repeated run.  When we switch modes, the previous run is flushed out.
+template<typename T>
+class RleEncoder {
+ public:
+  // buffer: buffer to write bits to.
+  // bit_width: max number of bits for value.
+  // TODO: consider adding a min_repeated_run_length so the caller can control
+  // when values should be encoded as repeated runs.  Currently this is derived
+  // based on the bit_width, which can determine a storage optimal choice.
+  explicit RleEncoder(faststring *buffer, int bit_width)
+    : bit_width_(bit_width),
+      bit_writer_(buffer) {
+    DCHECK_GE(bit_width_, 1);
+    DCHECK_LE(bit_width_, 64);
+    Clear();
+  }
+
+  // Reserve 'num_bytes' bytes for a plain encoded header, setting each
+  // byte to 'val': this is used for the RLE-encoded data blocks in
+  // order to be able to store the initial ordinal position and number
+  // of elements. This is a part of RleEncoder in order to maintain the
+  // correct offset in 'buffer'.
+  void Reserve(int num_bytes, uint8_t val);
+
+  // Encode value. This value must be representable with bit_width_ bits.
+  void Put(T value, size_t run_length = 1);
+
+  // Flushes any pending values to the underlying buffer.
+  // Returns the total number of bytes written
+  int Flush();
+
+  // Resets all the state in the encoder.
+  void Clear();
+
+  int32_t len() const { return bit_writer_.bytes_written(); }
+
+ private:
+  // Flushes any buffered values.  If this is part of a repeated run, this is largely
+  // a no-op.
+  // If it is part of a literal run, this will call FlushLiteralRun, which writes
+  // out the buffered literal values.
+  // If 'done' is true, the current run is written out even if it would normally
+  // have continued to be buffered.  This should only happen at the end, once the
+  // encoder has received all of its values.
+  void FlushBufferedValues(bool done);
+
+  // Flushes literal values to the underlying buffer.  If update_indicator_byte,
+  // then the current literal run is complete and the indicator byte is updated.
+  void FlushLiteralRun(bool update_indicator_byte);
+
+  // Flushes a repeated run to the underlying buffer.
+  void FlushRepeatedRun();
+
+  // Number of bits needed to encode the value.
+  const int bit_width_;
+
+  // Underlying buffer.
+  BitWriter bit_writer_;
+
+  // We need to buffer at most 8 values for literals.  This happens when the
+  // bit_width is 1 (so 8 values fit in one byte).
+  // TODO: generalize this to other bit widths
+  uint64_t buffered_values_[8];
+
+  // Number of values in buffered_values_
+  int num_buffered_values_;
+
+  // The current (also last) value that was written and the count of how
+  // many times in a row that value has been seen.  This is maintained even
+  // if we are in a literal run.  If the repeat_count_ gets high enough, we switch
+  // to encoding repeated runs.
+  uint64_t current_value_;
+  int repeat_count_;
+
+  // Number of literals in the current run.  This does not include the literals
+  // that might be in buffered_values_.  Only after we've got a group big enough
+  // can we decide if they should part of the literal_count_ or repeat_count_
+  int literal_count_;
+
+  // Index of a byte in the underlying buffer that stores the indicator byte.
+  // This is reserved as soon as we need a literal run but the value is written
+  // when the literal run is complete. We maintain an index rather than a pointer
+  // into the underlying buffer because the pointer value may become invalid if
+  // the underlying buffer is resized.
+  int literal_indicator_byte_idx_;
+};
+
+template<typename T>
+inline bool RleDecoder<T>::ReadHeader() {
+  DCHECK(bit_reader_.is_initialized());
+  if (PREDICT_FALSE(literal_count_ == 0 && repeat_count_ == 0)) {
+    // Read the next run's indicator int; it could mark a literal or a repeated run.
+    // The int is VLQ-encoded.
+    int32_t indicator_value = 0;
+    bool result = bit_reader_.GetVlqInt(&indicator_value);
+    if (PREDICT_FALSE(!result)) {
+      return false;
+    }
+
+    // lsb indicates if it is a literal run or repeated run
+    bool is_literal = indicator_value & 1;
+    if (is_literal) {
+      literal_count_ = (indicator_value >> 1) * 8;
+      DCHECK_GT(literal_count_, 0);
+    } else {
+      repeat_count_ = indicator_value >> 1;
+      DCHECK_GT(repeat_count_, 0);
+      bool result = bit_reader_.GetAligned<T>(
+          BitUtil::Ceil(bit_width_, 8), reinterpret_cast<T*>(&current_value_));
+      DCHECK(result);
+    }
+  }
+  return true;
+}
+
+template<typename T>
+inline bool RleDecoder<T>::Get(T* val) {
+  DCHECK(bit_reader_.is_initialized());
+  if (PREDICT_FALSE(!ReadHeader())) {
+    return false;
+  }
+
+  if (PREDICT_TRUE(repeat_count_ > 0)) {
+    *val = current_value_;
+    --repeat_count_;
+    rewind_state_ = REWIND_RUN;
+  } else {
+    DCHECK(literal_count_ > 0);
+    bool result = bit_reader_.GetValue(bit_width_, val);
+    DCHECK(result);
+    --literal_count_;
+    rewind_state_ = REWIND_LITERAL;
+  }
+
+  return true;
+}
+
+template<typename T>
+inline void RleDecoder<T>::RewindOne() {
+  DCHECK(bit_reader_.is_initialized());
+
+  switch (rewind_state_) {
+    case CANT_REWIND:
+      LOG(FATAL) << "Can't rewind more than once after each read!";
+      break;
+    case REWIND_RUN:
+      ++repeat_count_;
+      break;
+    case REWIND_LITERAL:
+      {
+        bit_reader_.Rewind(bit_width_);
+        ++literal_count_;
+        break;
+      }
+  }
+
+  rewind_state_ = CANT_REWIND;
+}
+
+template<typename T>
+inline size_t RleDecoder<T>::GetNextRun(T* val, size_t max_run) {
+  DCHECK(bit_reader_.is_initialized());
+  DCHECK_GT(max_run, 0);
+  size_t ret = 0;
+  size_t rem = max_run;
+  while (ReadHeader()) {
+    if (PREDICT_TRUE(repeat_count_ > 0)) {
+      if (PREDICT_FALSE(ret > 0 && *val != current_value_)) {
+        return ret;
+      }
+      *val = current_value_;
+      if (repeat_count_ >= rem) {
+        // The next run is longer than the amount of remaining data
+        // that the caller wants to read. Only consume it partially.
+        repeat_count_ -= rem;
+        ret += rem;
+        return ret;
+      }
+      ret += repeat_count_;
+      rem -= repeat_count_;
+      repeat_count_ = 0;
+    } else {
+      DCHECK(literal_count_ > 0);
+      if (ret == 0) {
+        bool has_more = bit_reader_.GetValue(bit_width_, val);
+        DCHECK(has_more);
+        literal_count_--;
+        ret++;
+        rem--;
+      }
+
+      while (literal_count_ > 0) {
+        bool result = bit_reader_.GetValue(bit_width_, &current_value_);
+        DCHECK(result);
+        if (current_value_ != *val || rem == 0) {
+          bit_reader_.Rewind(bit_width_);
+          return ret;
+        }
+        ret++;
+        rem--;
+        literal_count_--;
+      }
+    }
+  }
+  return ret;
+}
+
+template<typename T>
+inline size_t RleDecoder<T>::Skip(size_t to_skip) {
+  DCHECK(bit_reader_.is_initialized());
+
+  size_t set_count = 0;
+  while (to_skip > 0) {
+    bool result = ReadHeader();
+    DCHECK(result);
+
+    if (PREDICT_TRUE(repeat_count_ > 0)) {
+      size_t nskip = (repeat_count_ < to_skip) ? repeat_count_ : to_skip;
+      repeat_count_ -= nskip;
+      to_skip -= nskip;
+      if (current_value_ != 0) {
+        set_count += nskip;
+      }
+    } else {
+      DCHECK(literal_count_ > 0);
+      size_t nskip = (literal_count_ < to_skip) ? literal_count_ : to_skip;
+      literal_count_ -= nskip;
+      to_skip -= nskip;
+      for (; nskip > 0; nskip--) {
+        T value = 0;
+        bool result = bit_reader_.GetValue(bit_width_, &value);
+        DCHECK(result);
+        if (value != 0) {
+          set_count++;
+        }
+      }
+    }
+  }
+  return set_count;
+}
+
+// This function buffers input values 8 at a time.  After seeing all 8 values,
+// it decides whether they should be encoded as a literal or repeated run.
+template<typename T>
+inline void RleEncoder<T>::Put(T value, size_t run_length) {
+  DCHECK(bit_width_ == 64 || value < (1LL << bit_width_));
+
+  // TODO(perf): remove the loop and use the repeat_count_
+  for (; run_length > 0; run_length--) {
+    if (PREDICT_TRUE(current_value_ == value)) {
+      ++repeat_count_;
+      if (repeat_count_ > 8) {
+        // This is just a continuation of the current run, no need to buffer the
+        // values.
+        // Note that this is the fast path for long repeated runs.
+        continue;
+      }
+    } else {
+      if (repeat_count_ >= 8) {
+        // We had a run that was long enough but it has ended.  Flush the
+        // current repeated run.
+        DCHECK_EQ(literal_count_, 0);
+        FlushRepeatedRun();
+      }
+      repeat_count_ = 1;
+      current_value_ = value;
+    }
+
+    buffered_values_[num_buffered_values_] = value;
+    if (++num_buffered_values_ == 8) {
+      DCHECK_EQ(literal_count_ % 8, 0);
+      FlushBufferedValues(false);
+    }
+  }
+}
+
+template<typename T>
+inline void RleEncoder<T>::FlushLiteralRun(bool update_indicator_byte) {
+  if (literal_indicator_byte_idx_ < 0) {
+    // The literal indicator byte has not been reserved yet, get one now.
+    literal_indicator_byte_idx_ = bit_writer_.GetByteIndexAndAdvance(1);
+    DCHECK_GE(literal_indicator_byte_idx_, 0);
+  }
+
+  // Write all the buffered values as bit packed literals
+  for (int i = 0; i < num_buffered_values_; ++i) {
+    bit_writer_.PutValue(buffered_values_[i], bit_width_);
+  }
+  num_buffered_values_ = 0;
+
+  if (update_indicator_byte) {
+    // At this point we need to write the indicator byte for the literal run.
+    // We only reserve one byte, to allow for streaming writes of literal values.
+    // The logic makes sure we flush literal runs often enough to not overrun
+    // the 1 byte.
+    int num_groups = BitUtil::Ceil(literal_count_, 8);
+    int32_t indicator_value = (num_groups << 1) | 1;
+    DCHECK_EQ(indicator_value & 0xFFFFFF00, 0);
+    bit_writer_.buffer()->data()[literal_indicator_byte_idx_] = indicator_value;
+    literal_indicator_byte_idx_ = -1;
+    literal_count_ = 0;
+  }
+}
+
+template<typename T>
+inline void RleEncoder<T>::FlushRepeatedRun() {
+  DCHECK_GT(repeat_count_, 0);
+  // The lsb of 0 indicates this is a repeated run
+  int32_t indicator_value = repeat_count_ << 1 | 0;
+  bit_writer_.PutVlqInt(indicator_value);
+  bit_writer_.PutAligned(current_value_, BitUtil::Ceil(bit_width_, 8));
+  num_buffered_values_ = 0;
+  repeat_count_ = 0;
+}
+
+// Flush the values that have been buffered.  At this point we decide whether
+// we need to switch between the run types or continue the current one.
+template<typename T>
+inline void RleEncoder<T>::FlushBufferedValues(bool done) {
+  if (repeat_count_ >= 8) {
+    // Clear the buffered values.  They are part of the repeated run now and we
+    // don't want to flush them out as literals.
+    num_buffered_values_ = 0;
+    if (literal_count_ != 0) {
+      // There was a current literal run.  All the values in it have been flushed
+      // but we still need to update the indicator byte.
+      DCHECK_EQ(literal_count_ % 8, 0);
+      DCHECK_EQ(repeat_count_, 8);
+      FlushLiteralRun(true);
+    }
+    DCHECK_EQ(literal_count_, 0);
+    return;
+  }
+
+  literal_count_ += num_buffered_values_;
+  int num_groups = BitUtil::Ceil(literal_count_, 8);
+  if (num_groups + 1 >= (1 << 6)) {
+    // We need to start a new literal run because the indicator byte we've reserved
+    // cannot store more values.
+    DCHECK_GE(literal_indicator_byte_idx_, 0);
+    FlushLiteralRun(true);
+  } else {
+    FlushLiteralRun(done);
+  }
+  repeat_count_ = 0;
+}
+
+template<typename T>
+inline void RleEncoder<T>::Reserve(int num_bytes, uint8_t val) {
+  for (int i = 0; i < num_bytes; ++i) {
+    bit_writer_.PutValue(val, 8);
+  }
+}
+
+template<typename T>
+inline int RleEncoder<T>::Flush() {
+  if (literal_count_ > 0 || repeat_count_ > 0 || num_buffered_values_ > 0) {
+    bool all_repeat = literal_count_ == 0 &&
+        (repeat_count_ == num_buffered_values_ || num_buffered_values_ == 0);
+    // There is something pending, figure out if it's a repeated or literal run
+    if (repeat_count_ > 0 && all_repeat) {
+      FlushRepeatedRun();
+    } else  {
+      literal_count_ += num_buffered_values_;
+      FlushLiteralRun(true);
+      repeat_count_ = 0;
+    }
+  }
+  bit_writer_.Flush();
+  DCHECK_EQ(num_buffered_values_, 0);
+  DCHECK_EQ(literal_count_, 0);
+  DCHECK_EQ(repeat_count_, 0);
+  return bit_writer_.bytes_written();
+}
+
+template<typename T>
+inline void RleEncoder<T>::Clear() {
+  current_value_ = 0;
+  repeat_count_ = 0;
+  num_buffered_values_ = 0;
+  literal_count_ = 0;
+  literal_indicator_byte_idx_ = -1;
+  bit_writer_.Clear();
+}
+
+} // namespace kudu
+#endif
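
To make the format description at the top of this header concrete, here is a minimal round-trip sketch (an illustration, not part of the commit). It uses only the classes declared above plus kudu::faststring, and the expected 4-byte size follows the worked example in the header comment and the SpecificSequences case in rle-test.cc.

#include <cstddef>
#include <cstdint>

#include <glog/logging.h>

#include "kudu/util/faststring.h"
#include "kudu/util/rle-encoding.h"

void ExampleRleRoundTrip() {
  // Encode 50 ones followed by 50 zeros at bit-width 1.
  kudu::faststring buf;
  kudu::RleEncoder<bool> encoder(&buf, /*bit_width=*/1);
  encoder.Put(true, /*run_length=*/50);
  encoder.Put(false, /*run_length=*/50);
  int len = encoder.Flush();

  // Each repeated run costs a one-byte varint indicator (50 << 1 = 100 fits in
  // seven bits) plus the repeated value padded to a byte, so 2 + 2 = 4 bytes.
  CHECK_EQ(4, len);

  // Decode it back one run at a time.
  kudu::RleDecoder<bool> decoder(buf.data(), len, /*bit_width=*/1);
  bool val = false;
  size_t run = decoder.GetNextRun(&val, /*max_run=*/1000);
  CHECK(val);
  CHECK_EQ(50, run);
  run = decoder.GetNextRun(&val, /*max_run=*/1000);
  CHECK(!val);
  CHECK_EQ(50, run);
  CHECK_EQ(0, decoder.GetNextRun(&val, /*max_run=*/1000));  // end of input
}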

http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/rle-test.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/rle-test.cc b/be/src/kudu/util/rle-test.cc
new file mode 100644
index 0000000..20d0a27
--- /dev/null
+++ b/be/src/kudu/util/rle-test.cc
@@ -0,0 +1,546 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <algorithm>
+#include <cstdint>
+#include <cstdlib>
+#include <cstring>
+#include <ostream>
+#include <string>
+#include <vector>
+
+// Must come before gtest.h.
+#include "kudu/gutil/mathlimits.h"
+
+#include <boost/utility/binary.hpp>
+#include <glog/logging.h>
+#include <gtest/gtest.h>
+
+#include "kudu/util/bit-stream-utils.h"
+#include "kudu/util/bit-stream-utils.inline.h"
+#include "kudu/util/bit-util.h"
+#include "kudu/util/faststring.h"
+#include "kudu/util/hexdump.h"
+#include "kudu/util/rle-encoding.h"
+#include "kudu/util/slice.h"
+#include "kudu/util/test_util.h"
+
+using std::string;
+using std::vector;
+
+namespace kudu {
+
+const int kMaxWidth = 64;
+
+class TestRle : public KuduTest {};
+
+TEST(BitArray, TestBool) {
+  const int len_bytes = 2;
+  faststring buffer(len_bytes);
+
+  BitWriter writer(&buffer);
+
+  // Write alternating 0's and 1's
+  for (int i = 0; i < 8; ++i) {
+    writer.PutValue(i % 2, 1);
+  }
+  writer.Flush();
+  EXPECT_EQ(buffer[0], BOOST_BINARY(1 0 1 0 1 0 1 0));
+
+  // Write 00110011
+  for (int i = 0; i < 8; ++i) {
+    switch (i) {
+      case 0:
+      case 1:
+      case 4:
+      case 5:
+        writer.PutValue(0, 1);
+        break;
+      default:
+        writer.PutValue(1, 1);
+        break;
+    }
+  }
+  writer.Flush();
+
+  // Validate the exact bit value
+  EXPECT_EQ(buffer[0], BOOST_BINARY(1 0 1 0 1 0 1 0));
+  EXPECT_EQ(buffer[1], BOOST_BINARY(1 1 0 0 1 1 0 0));
+
+  // Use the reader and validate
+  BitReader reader(buffer.data(), buffer.size());
+  for (int i = 0; i < 8; ++i) {
+    bool val = false;
+    bool result = reader.GetValue(1, &val);
+    EXPECT_TRUE(result);
+    EXPECT_EQ(val, i % 2);
+  }
+
+  for (int i = 0; i < 8; ++i) {
+    bool val = false;
+    bool result = reader.GetValue(1, &val);
+    EXPECT_TRUE(result);
+    switch (i) {
+      case 0:
+      case 1:
+      case 4:
+      case 5:
+        EXPECT_EQ(val, false);
+        break;
+      default:
+        EXPECT_EQ(val, true);
+        break;
+    }
+  }
+}
+
+// Writes 'num_vals' values with width 'bit_width' and reads them back.
+void TestBitArrayValues(int bit_width, int num_vals) {
+  const int kTestLen = BitUtil::Ceil(bit_width * num_vals, 8);
+  const uint64_t mod = bit_width == 64 ? 1 : 1LL << bit_width;
+
+  faststring buffer(kTestLen);
+  BitWriter writer(&buffer);
+  for (int i = 0; i < num_vals; ++i) {
+    writer.PutValue(i % mod, bit_width);
+  }
+  writer.Flush();
+  EXPECT_EQ(writer.bytes_written(), kTestLen);
+
+  BitReader reader(buffer.data(), kTestLen);
+  for (int i = 0; i < num_vals; ++i) {
+    int64_t val = 0;
+    bool result = reader.GetValue(bit_width, &val);
+    EXPECT_TRUE(result);
+    EXPECT_EQ(val, i % mod);
+  }
+  EXPECT_EQ(reader.bytes_left(), 0);
+}
+
+TEST(BitArray, TestValues) {
+  for (int width = 1; width <= kMaxWidth; ++width) {
+    TestBitArrayValues(width, 1);
+    TestBitArrayValues(width, 2);
+    // Don't write too many values
+    TestBitArrayValues(width, (width < 12) ? (1 << width) : 4096);
+    TestBitArrayValues(width, 1024);
+  }
+}
+
+// Test some mixed values
+TEST(BitArray, TestMixed) {
+  const int kTestLenBits = 1024;
+  faststring buffer(kTestLenBits / 8);
+  bool parity = true;
+
+  BitWriter writer(&buffer);
+  for (int i = 0; i < kTestLenBits; ++i) {
+    if (i % 2 == 0) {
+      writer.PutValue(parity, 1);
+      parity = !parity;
+    } else {
+      writer.PutValue(i, 10);
+    }
+  }
+  writer.Flush();
+
+  parity = true;
+  BitReader reader(buffer.data(), buffer.size());
+  for (int i = 0; i < kTestLenBits; ++i) {
+    bool result;
+    if (i % 2 == 0) {
+      bool val = false;
+      result = reader.GetValue(1, &val);
+      EXPECT_EQ(val, parity);
+      parity = !parity;
+    } else {
+      int val;
+      result = reader.GetValue(10, &val);
+      EXPECT_EQ(val, i);
+    }
+    EXPECT_TRUE(result);
+  }
+}
+
+// Validates encoding of values by encoding and decoding them.  If
+// expected_encoding != NULL, also validates that the encoded buffer is
+// exactly 'expected_encoding'.
+// if expected_len is not -1, it will validate the encoded size is correct.
+template<typename T>
+void ValidateRle(const vector<T>& values, int bit_width,
+    uint8_t* expected_encoding, int expected_len) {
+  faststring buffer;
+  RleEncoder<T> encoder(&buffer, bit_width);
+
+  for (const auto& value : values) {
+    encoder.Put(value);
+  }
+  int encoded_len = encoder.Flush();
+
+  if (expected_len != -1) {
+    EXPECT_EQ(encoded_len, expected_len);
+  }
+  if (expected_encoding != nullptr) {
+    EXPECT_EQ(memcmp(buffer.data(), expected_encoding, expected_len), 0)
+      << "\n"
+      << "Expected: " << HexDump(Slice(expected_encoding, expected_len)) << "\n"
+      << "Got:      " << HexDump(Slice(buffer));
+  }
+
+  // Verify read
+  RleDecoder<T> decoder(buffer.data(), encoded_len, bit_width);
+  for (const auto& value : values) {
+    T val = 0;
+    bool result = decoder.Get(&val);
+    EXPECT_TRUE(result);
+    EXPECT_EQ(value, val);
+  }
+}
+
+TEST(Rle, SpecificSequences) {
+  const int kTestLen = 1024;
+  uint8_t expected_buffer[kTestLen];
+  vector<uint64_t> values;
+
+  // Test 50 0's followed by 50 1's
+  values.resize(100);
+  for (int i = 0; i < 50; ++i) {
+    values[i] = 0;
+  }
+  for (int i = 50; i < 100; ++i) {
+    values[i] = 1;
+  }
+
+  // expected_buffer valid for bit width <= 1 byte
+  expected_buffer[0] = (50 << 1);
+  expected_buffer[1] = 0;
+  expected_buffer[2] = (50 << 1);
+  expected_buffer[3] = 1;
+  for (int width = 1; width <= 8; ++width) {
+    ValidateRle(values, width, expected_buffer, 4);
+  }
+
+  for (int width = 9; width <= kMaxWidth; ++width) {
+    ValidateRle(values, width, nullptr, 2 * (1 + BitUtil::Ceil(width, 8)));
+  }
+
+  // Test 100 0's and 1's alternating
+  for (int i = 0; i < 100; ++i) {
+    values[i] = i % 2;
+  }
+  int num_groups = BitUtil::Ceil(100, 8);
+  expected_buffer[0] = (num_groups << 1) | 1;
+  for (int i = 0; i < 100/8; ++i) {
+    expected_buffer[i + 1] = BOOST_BINARY(1 0 1 0 1 0 1 0); // 0xaa
+  }
+  // Values for the last 4 entries: 0 1 0 1, bit-packed
+  expected_buffer[1 + 100/8] = BOOST_BINARY(0 0 0 0 1 0 1 0); // 0x0a
+
+  // num_groups and expected_buffer only valid for bit width = 1
+  ValidateRle(values, 1, expected_buffer, 1 + num_groups);
+  for (int width = 2; width <= kMaxWidth; ++width) {
+    ValidateRle(values, width, nullptr, 1 + BitUtil::Ceil(width * 100, 8));
+  }
+}
+
+// ValidateRle on 'num_vals' values with width 'bit_width'. If 'value' != -1, that value
+// is used for every entry; otherwise incrementing values (wrapping at 2^bit_width) are used.
+void TestRleValues(int bit_width, int num_vals, int value = -1) {
+  const uint64_t mod = bit_width == 64 ? 1ULL : 1ULL << bit_width;
+  vector<uint64_t> values;
+  for (uint64_t v = 0; v < num_vals; ++v) {
+    values.push_back((value != -1) ? value : (bit_width == 64 ? v : (v % mod)));
+  }
+  ValidateRle(values, bit_width, nullptr, -1);
+}
+
+TEST(Rle, TestValues) {
+  for (int width = 1; width <= kMaxWidth; ++width) {
+    TestRleValues(width, 1);
+    TestRleValues(width, 1024);
+    TestRleValues(width, 1024, 0);
+    TestRleValues(width, 1024, 1);
+  }
+}
+
+class BitRle : public KuduTest {
+};
+
+// Tests all true/false values
+TEST_F(BitRle, AllSame) {
+  const int kTestLen = 1024;
+  vector<bool> values;
+
+  for (int v = 0; v < 2; ++v) {
+    values.clear();
+    for (int i = 0; i < kTestLen; ++i) {
+      values.push_back(v ? true : false);
+    }
+
+    ValidateRle(values, 1, nullptr, 3);
+  }
+}
+
+// Test that writes out a repeated group and then a literal
+// group, flushing before finishing.
+TEST_F(BitRle, Flush) {
+  vector<bool> values;
+  for (int i = 0; i < 16; ++i) values.push_back(1);
+  values.push_back(false);
+  ValidateRle(values, 1, nullptr, -1);
+  values.push_back(true);
+  ValidateRle(values, 1, nullptr, -1);
+  values.push_back(true);
+  ValidateRle(values, 1, nullptr, -1);
+  values.push_back(true);
+  ValidateRle(values, 1, nullptr, -1);
+}
+
+// Test some random bool sequences.
+TEST_F(BitRle, RandomBools) {
+  int iters = 0;
+  const int n_iters = AllowSlowTests() ? 1000 : 20;
+  while (iters < n_iters) {
+    srand(iters++);
+    if (iters % 10000 == 0) LOG(ERROR) << "Seed: " << iters;
+    vector<uint64_t > values;
+    bool parity = 0;
+    for (int i = 0; i < 1000; ++i) {
+      int group_size = rand() % 20 + 1; // NOLINT(*)
+      if (group_size > 16) {
+        group_size = 1;
+      }
+      for (int i = 0; i < group_size; ++i) {
+        values.push_back(parity);
+      }
+      parity = !parity;
+    }
+    ValidateRle(values, (iters % kMaxWidth) + 1, nullptr, -1);
+  }
+}
+
+// Test some random 64-bit sequences.
+TEST_F(BitRle, Random64Bit) {
+  int iters = 0;
+  const int n_iters = AllowSlowTests() ? 1000 : 20;
+  while (iters < n_iters) {
+    srand(iters++);
+    if (iters % 10000 == 0) LOG(ERROR) << "Seed: " << iters;
+    vector<uint64_t > values;
+    for (int i = 0; i < 1000; ++i) {
+      int group_size = rand() % 20 + 1; // NOLINT(*)
+      uint64_t cur_value = (static_cast<uint64_t>(rand()) << 32) + static_cast<uint64_t>(rand());
+      if (group_size > 16) {
+        group_size = 1;
+      }
+      for (int i = 0; i < group_size; ++i) {
+        values.push_back(cur_value);
+      }
+
+    }
+    ValidateRle(values, 64, nullptr, -1);
+  }
+}
+
+// Test a sequence of alternating-value runs of increasing length (1, 2, 3, ...),
+// e.g. 1 00 111 0000 11111 ...
+TEST_F(BitRle, RepeatedPattern) {
+  vector<bool> values;
+  const int min_run = 1;
+  const int max_run = 32;
+
+  for (int i = min_run; i <= max_run; ++i) {
+    int v = i % 2;
+    for (int j = 0; j < i; ++j) {
+      values.push_back(v);
+    }
+  }
+
+  // And go back down again
+  for (int i = max_run; i >= min_run; --i) {
+    int v = i % 2;
+    for (int j = 0; j < i; ++j) {
+      values.push_back(v);
+    }
+  }
+
+  ValidateRle(values, 1, nullptr, -1);
+}
+
+TEST_F(TestRle, TestBulkPut) {
+  size_t run_length;
+  bool val = false;
+
+  faststring buffer(1);
+  RleEncoder<bool> encoder(&buffer, 1);
+  encoder.Put(true, 10);
+  encoder.Put(false, 7);
+  encoder.Put(true, 5);
+  encoder.Put(true, 15);
+  encoder.Flush();
+
+  RleDecoder<bool> decoder(buffer.data(), encoder.len(), 1);
+  run_length = decoder.GetNextRun(&val, MathLimits<size_t>::kMax);
+  ASSERT_TRUE(val);
+  ASSERT_EQ(10, run_length);
+
+  run_length = decoder.GetNextRun(&val, MathLimits<size_t>::kMax);
+  ASSERT_FALSE(val);
+  ASSERT_EQ(7, run_length);
+
+  run_length = decoder.GetNextRun(&val, MathLimits<size_t>::kMax);
+  ASSERT_TRUE(val);
+  ASSERT_EQ(20, run_length);
+
+  ASSERT_EQ(0, decoder.GetNextRun(&val, MathLimits<size_t>::kMax));
+}
+
+TEST_F(TestRle, TestGetNextRun) {
+  // Repeat the test with different number of items
+  for (int num_items = 7; num_items < 200; num_items += 13) {
+    // Test different block patterns
+    //    1: 01010101 01010101
+    //    2: 00110011 00110011
+    //    3: 00011100 01110001
+    //    ...
+    for (int block = 1; block <= 20; ++block) {
+      faststring buffer(1);
+      RleEncoder<bool> encoder(&buffer, 1);
+      for (int j = 0; j < num_items; ++j) {
+        encoder.Put(!!(j & 1), block);
+      }
+      encoder.Flush();
+
+      RleDecoder<bool> decoder(buffer.data(), encoder.len(), 1);
+      size_t count = num_items * block;
+      for (int j = 0; j < num_items; ++j) {
+        size_t run_length;
+        bool val = false;
+        DCHECK_GT(count, 0);
+        run_length = decoder.GetNextRun(&val, MathLimits<size_t>::kMax);
+        run_length = std::min(run_length, count);
+
+        ASSERT_EQ(!!(j & 1), val);
+        ASSERT_EQ(block, run_length);
+        count -= run_length;
+      }
+      DCHECK_EQ(count, 0);
+    }
+  }
+}
+
+// Generate a random bit string which consists of 'num_runs' runs,
+// each with a random length in [0, 100). Returns the number
+// of values encoded (i.e. the sum of the run lengths).
+static size_t GenerateRandomBitString(int num_runs, faststring* enc_buf, string* string_rep) {
+  RleEncoder<bool> enc(enc_buf, 1);
+  int num_bits = 0;
+  for (int i = 0; i < num_runs; i++) {
+    int run_length = random() % 100;
+    bool value = static_cast<bool>(i & 1);
+    enc.Put(value, run_length);
+    string_rep->append(run_length, value ? '1' : '0');
+    num_bits += run_length;
+  }
+  enc.Flush();
+  return num_bits;
+}
+
+TEST_F(TestRle, TestRoundTripRandomSequencesWithRuns) {
+  SeedRandom();
+
+  // Test the limiting function of GetNextRun.
+  const int kMaxToReadAtOnce = (random() % 20) + 1;
+
+  // Generate a bunch of random bit sequences, and "round-trip" them
+  // through the encode/decode sequence.
+  for (int rep = 0; rep < 100; rep++) {
+    faststring buf;
+    string string_rep;
+    int num_bits = GenerateRandomBitString(10, &buf, &string_rep);
+    RleDecoder<bool> decoder(buf.data(), buf.size(), 1);
+    string roundtrip_str;
+    int rem_to_read = num_bits;
+    size_t run_len;
+    bool val;
+    while (rem_to_read > 0 &&
+           (run_len = decoder.GetNextRun(&val, std::min(kMaxToReadAtOnce, rem_to_read))) != 0) {
+      ASSERT_LE(run_len, kMaxToReadAtOnce);
+      roundtrip_str.append(run_len, val ? '1' : '0');
+      rem_to_read -= run_len;
+    }
+
+    ASSERT_EQ(string_rep, roundtrip_str);
+  }
+}
+
+TEST_F(TestRle, TestSkip) {
+  faststring buffer(1);
+  RleEncoder<bool> encoder(&buffer, 1);
+
+  // 0101010[1] 01010101 01
+  //        "A"
+  for (int j = 0; j < 18; ++j) {
+    encoder.Put(!!(j & 1));
+  }
+
+  // 0011[00] 11001100 11001100 11001100 11001100
+  //      "B"
+  for (int j = 0; j < 19; ++j) {
+    encoder.Put(!!(j & 1), 2);
+  }
+
+  // 000000000000 11[1111111111] 000000000000 111111111111
+  //                   "C"
+  // 000000000000 111111111111 0[00000000000] 111111111111
+  //                                  "D"
+  // 000000000000 111111111111 000000000000 111111111111
+  for (int j = 0; j < 12; ++j) {
+    encoder.Put(!!(j & 1), 12);
+  }
+  encoder.Flush();
+
+  bool val = false;
+  size_t run_length;
+  RleDecoder<bool> decoder(buffer.data(), encoder.len(), 1);
+
+  // position before "A"
+  ASSERT_EQ(3, decoder.Skip(7));
+  run_length = decoder.GetNextRun(&val, MathLimits<size_t>::kMax);
+  ASSERT_TRUE(val);
+  ASSERT_EQ(1, run_length);
+
+  // position before "B"
+  ASSERT_EQ(7, decoder.Skip(14));
+  run_length = decoder.GetNextRun(&val, MathLimits<size_t>::kMax);
+  ASSERT_FALSE(val);
+  ASSERT_EQ(2, run_length);
+
+  // position before "C"
+  ASSERT_EQ(18, decoder.Skip(46));
+  run_length = decoder.GetNextRun(&val, MathLimits<size_t>::kMax);
+  ASSERT_TRUE(val);
+  ASSERT_EQ(10, run_length);
+
+  // position before "D"
+  ASSERT_EQ(24, decoder.Skip(49));
+  run_length = decoder.GetNextRun(&val, MathLimits<size_t>::kMax);
+  ASSERT_FALSE(val);
+  ASSERT_EQ(11, run_length);
+
+  encoder.Flush();
+}
+} // namespace kudu

http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/rolling_log-test.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/rolling_log-test.cc b/be/src/kudu/util/rolling_log-test.cc
new file mode 100644
index 0000000..f4f8186
--- /dev/null
+++ b/be/src/kudu/util/rolling_log-test.cc
@@ -0,0 +1,147 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "kudu/util/rolling_log.h"
+
+#include <unistd.h>
+
+#include <algorithm>
+#include <cstdint>
+#include <string>
+#include <vector>
+
+#include <glog/stl_logging.h>
+#include <gtest/gtest.h>
+
+#include "kudu/gutil/port.h"
+#include "kudu/gutil/strings/stringpiece.h"
+#include "kudu/gutil/strings/substitute.h"
+#include "kudu/gutil/strings/util.h"
+#include "kudu/util/env.h"
+#include "kudu/util/faststring.h"
+#include "kudu/util/path_util.h"
+#include "kudu/util/test_macros.h"
+#include "kudu/util/test_util.h"
+
+using std::string;
+using std::vector;
+using strings::Substitute;
+
+namespace kudu {
+
+class RollingLogTest : public KuduTest {
+ public:
+  RollingLogTest()
+    : log_dir_(GetTestPath("log_dir")) {
+  }
+
+  virtual void SetUp() OVERRIDE {
+    ASSERT_OK(env_->CreateDir(log_dir_));
+  }
+
+ protected:
+  void AssertLogCount(int expected_count, vector<string>* children) {
+    vector<string> dir_entries;
+    ASSERT_OK(env_->GetChildren(log_dir_, &dir_entries));
+    children->clear();
+
+    for (const string& child : dir_entries) {
+      if (child == "." || child == "..") continue;
+      children->push_back(child);
+      ASSERT_TRUE(HasPrefixString(child, "rolling_log-test."));
+      ASSERT_STR_CONTAINS(child, ".mylog.");
+
+      string pid_suffix = Substitute("$0", getpid());
+      ASSERT_TRUE(HasSuffixString(child, pid_suffix) ||
+                  HasSuffixString(child, pid_suffix + ".gz")) << "bad child: " << child;
+    }
+    std::sort(children->begin(), children->end());
+    ASSERT_EQ(children->size(), expected_count) << *children;
+  }
+
+  const string log_dir_;
+};
+
+// Test with compression off.
+TEST_F(RollingLogTest, TestLog) {
+  RollingLog log(env_, log_dir_, "mylog");
+  log.SetCompressionEnabled(false);
+  log.SetRollThresholdBytes(100);
+
+  // Before writing anything, we shouldn't open a log file.
+  vector<string> children;
+  NO_FATALS(AssertLogCount(0, &children));
+
+  // Appending some data should write a new segment.
+  const string kTestString = "Hello world\n";
+  ASSERT_OK(log.Append(kTestString));
+  NO_FATALS(AssertLogCount(1, &children));
+
+  for (int i = 0; i < 10; i++) {
+    ASSERT_OK(log.Append(kTestString));
+  }
+  NO_FATALS(AssertLogCount(2, &children));
+
+  faststring data;
+  string path = JoinPathSegments(log_dir_, children[0]);
+  ASSERT_OK(ReadFileToString(env_, path, &data));
+  ASSERT_TRUE(HasPrefixString(data.ToString(), kTestString))
+    << "Data missing";
+  ASSERT_LE(data.size(), 100 + kTestString.length())
+      << "Roll threshold not respected";
+}
+
+// Test with compression on.
+TEST_F(RollingLogTest, TestCompression) {
+  RollingLog log(env_, log_dir_, "mylog");
+  ASSERT_OK(log.Open());
+
+  StringPiece data = "Hello world\n";
+  int raw_size = 0;
+  for (int i = 0; i < 1000; i++) {
+    ASSERT_OK(log.Append(data));
+    raw_size += data.size();
+  }
+  ASSERT_OK(log.Close());
+
+  vector<string> children;
+  NO_FATALS(AssertLogCount(1, &children));
+  ASSERT_TRUE(HasSuffixString(children[0], ".gz"));
+
+  // Ensure that the output is actually gzipped.
+  uint64_t size;
+  ASSERT_OK(env_->GetFileSize(JoinPathSegments(log_dir_, children[0]), &size));
+  ASSERT_LT(size, raw_size / 10);
+  ASSERT_GT(size, 0);
+}
+
+TEST_F(RollingLogTest, TestFileCountLimit) {
+  RollingLog log(env_, log_dir_, "mylog");
+  ASSERT_OK(log.Open());
+  log.SetRollThresholdBytes(100);
+  log.SetMaxNumSegments(3);
+
+  for (int i = 0; i < 100; i++) {
+    ASSERT_OK(log.Append("hello world\n"));
+  }
+  ASSERT_OK(log.Close());
+
+  vector<string> children;
+  NO_FATALS(AssertLogCount(3, &children));
+}
+
+} // namespace kudu

http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/rolling_log.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/rolling_log.cc b/be/src/kudu/util/rolling_log.cc
new file mode 100644
index 0000000..50f9fbd
--- /dev/null
+++ b/be/src/kudu/util/rolling_log.cc
@@ -0,0 +1,285 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "kudu/util/rolling_log.h"
+
+#include <unistd.h>
+
+#include <ctime>
+#include <iomanip>
+#include <memory>
+#include <ostream>
+#include <string>
+#include <utility>
+
+#include <gflags/gflags.h>
+#include <gflags/gflags_declare.h>
+#include <glog/logging.h>
+#include <zlib.h>
+
+#include "kudu/gutil/strings/numbers.h"
+#include "kudu/gutil/strings/substitute.h"
+#include "kudu/gutil/walltime.h"
+#include "kudu/util/env.h"
+#include "kudu/util/env_util.h"
+#include "kudu/util/net/net_util.h"
+#include "kudu/util/path_util.h"
+#include "kudu/util/slice.h"
+#include "kudu/util/user.h"
+
+using std::ostringstream;
+using std::setw;
+using std::string;
+using std::unique_ptr;
+using strings::Substitute;
+
+static const int kDefaultRollThresholdBytes = 64 * 1024 * 1024; // 64MB
+
+DECLARE_int32(max_log_files);
+
+namespace kudu {
+
+RollingLog::RollingLog(Env* env, string log_dir, string log_name)
+    : env_(env),
+      log_dir_(std::move(log_dir)),
+      log_name_(std::move(log_name)),
+      roll_threshold_bytes_(kDefaultRollThresholdBytes),
+      max_num_segments_(FLAGS_max_log_files),
+      compress_after_close_(true) {}
+
+RollingLog::~RollingLog() {
+  WARN_NOT_OK(Close(), "Unable to close RollingLog");
+}
+
+void RollingLog::SetRollThresholdBytes(int64_t size) {
+  CHECK_GT(size, 0);
+  roll_threshold_bytes_ = size;
+}
+
+void RollingLog::SetMaxNumSegments(int num_segments) {
+  CHECK_GT(num_segments, 0);
+  max_num_segments_ = num_segments;
+}
+
+void RollingLog::SetCompressionEnabled(bool compress) {
+  compress_after_close_ = compress;
+}
+
+namespace {
+
+string HostnameOrUnknown() {
+  string hostname;
+  Status s = GetHostname(&hostname);
+  if (!s.ok()) {
+    return "unknown_host";
+  }
+  return hostname;
+}
+
+string UsernameOrUnknown() {
+  string user_name;
+  Status s = GetLoggedInUser(&user_name);
+  if (!s.ok()) {
+    return "unknown_user";
+  }
+  return user_name;
+}
+
+string FormattedTimestamp() {
+  // Implementation cribbed from glog/logging.cc
+  time_t time = static_cast<time_t>(WallTime_Now());
+  struct ::tm tm_time;
+  localtime_r(&time, &tm_time);
+
+  ostringstream str;
+  str.fill('0');
+  str << 1900+tm_time.tm_year
+      << setw(2) << 1+tm_time.tm_mon
+      << setw(2) << tm_time.tm_mday
+      << '-'
+      << setw(2) << tm_time.tm_hour
+      << setw(2) << tm_time.tm_min
+      << setw(2) << tm_time.tm_sec;
+  return str.str();
+}
+
+} // anonymous namespace
+
+string RollingLog::GetLogFileName(int sequence) const {
+  return Substitute("$0.$1.$2.$3.$4.$5.$6",
+                    google::ProgramInvocationShortName(),
+                    HostnameOrUnknown(),
+                    UsernameOrUnknown(),
+                    log_name_,
+                    FormattedTimestamp(),
+                    sequence,
+                    getpid());
+}
+
+string RollingLog::GetLogFilePattern() const {
+  return Substitute("$0.$1.$2.$3.$4.$5.$6",
+                    google::ProgramInvocationShortName(),
+                    HostnameOrUnknown(),
+                    UsernameOrUnknown(),
+                    log_name_,
+                    /* any timestamp */'*',
+                    /* any sequence number */'*',
+                    /* any pid */'*');
+}
+
+Status RollingLog::Open() {
+  CHECK(!file_);
+
+  for (int sequence = 0; ; sequence++) {
+
+    string path = JoinPathSegments(log_dir_, GetLogFileName(sequence));
+    // Don't reuse an existing path if there is already a log
+    // or a compressed log with the same name.
+    if (env_->FileExists(path) ||
+        env_->FileExists(path + ".gz")) {
+      continue;
+    }
+
+    WritableFileOptions opts;
+    // Logs aren't worth the performance cost of durability.
+    opts.sync_on_close = false;
+    opts.mode = Env::CREATE_NON_EXISTING;
+
+    RETURN_NOT_OK(env_->NewWritableFile(opts, path, &file_));
+
+    VLOG(1) << "Rolled " << log_name_ << " log to new file: " << path;
+    break;
+  }
+  return Status::OK();
+}
+
+Status RollingLog::Close() {
+  if (!file_) {
+    return Status::OK();
+  }
+  string path = file_->filename();
+  RETURN_NOT_OK_PREPEND(file_->Close(),
+                        Substitute("Unable to close $0", path));
+  file_.reset();
+  if (compress_after_close_) {
+    WARN_NOT_OK(CompressFile(path), "Unable to compress old log file");
+  }
+  auto glob = JoinPathSegments(log_dir_, GetLogFilePattern());
+  WARN_NOT_OK(env_util::DeleteExcessFilesByPattern(env_, glob, max_num_segments_),
+              Substitute("failed to delete old $0 log files", log_name_));
+  return Status::OK();
+}
+
+Status RollingLog::Append(StringPiece s) {
+  if (!file_) {
+    RETURN_NOT_OK_PREPEND(Open(), "Unable to open log");
+  }
+
+  RETURN_NOT_OK(file_->Append(s));
+  if (file_->Size() > roll_threshold_bytes_) {
+    RETURN_NOT_OK_PREPEND(Close(), "Unable to roll log");
+    roll_count_++;
+    RETURN_NOT_OK_PREPEND(Open(), "Unable to roll log");
+  }
+  return Status::OK();
+}
+
+namespace {
+
+Status GzClose(gzFile f) {
+  int err = gzclose(f);
+  switch (err) {
+    case Z_OK:
+      return Status::OK();
+    case Z_STREAM_ERROR:
+      return Status::InvalidArgument("Stream not valid");
+    case Z_ERRNO:
+      return Status::IOError("IO Error closing stream");
+    case Z_MEM_ERROR:
+      return Status::RuntimeError("Out of memory");
+    case Z_BUF_ERROR:
+      return Status::IOError("read ended in the middle of a stream");
+    default:
+      return Status::IOError("Unknown zlib error", SimpleItoa(err));
+  }
+}
+
+class ScopedGzipCloser {
+ public:
+  explicit ScopedGzipCloser(gzFile f)
+    : file_(f) {
+  }
+
+  ~ScopedGzipCloser() {
+    if (file_) {
+      WARN_NOT_OK(GzClose(file_), "Unable to close gzip stream");
+    }
+  }
+
+  void Cancel() {
+    file_ = nullptr;
+  }
+
+ private:
+  gzFile file_;
+};
+} // anonymous namespace
+
+// We implement CompressFile() manually using zlib APIs rather than forking
+// out to '/bin/gzip' since fork() can be expensive on processes that use a large
+// amount of memory. During the time of the fork, other threads could end up
+// blocked. Implementing it using the zlib stream APIs isn't too much code
+// and is less likely to be problematic.
+Status RollingLog::CompressFile(const std::string& path) const {
+  unique_ptr<SequentialFile> in_file;
+  RETURN_NOT_OK_PREPEND(env_->NewSequentialFile(path, &in_file),
+                        "Unable to open input file to compress");
+
+  string gz_path = path + ".gz";
+  gzFile gzf = gzopen(gz_path.c_str(), "w");
+  if (!gzf) {
+    return Status::IOError("Unable to open gzip stream");
+  }
+
+  ScopedGzipCloser closer(gzf);
+
+  // Loop reading data from the input file and writing to the gzip stream.
+  uint8_t buf[32 * 1024];
+  while (true) {
+    Slice result(buf, arraysize(buf));
+    RETURN_NOT_OK_PREPEND(in_file->Read(&result),
+                          "Unable to read from gzip input");
+    if (result.size() == 0) {
+      break;
+    }
+    int n = gzwrite(gzf, result.data(), result.size());
+    if (n == 0) {
+      int errnum;
+      return Status::IOError("Unable to write to gzip output",
+                             gzerror(gzf, &errnum));
+    }
+  }
+  closer.Cancel();
+  RETURN_NOT_OK_PREPEND(GzClose(gzf),
+                        "Unable to close gzip output");
+
+  WARN_NOT_OK(env_->DeleteFile(path),
+              "Unable to delete gzip input file after compression");
+  return Status::OK();
+}
+
+} // namespace kudu

http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/rolling_log.h
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/rolling_log.h b/be/src/kudu/util/rolling_log.h
new file mode 100644
index 0000000..0bb6755
--- /dev/null
+++ b/be/src/kudu/util/rolling_log.h
@@ -0,0 +1,128 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#ifndef KUDU_UTIL_ROLLING_LOG_H
+#define KUDU_UTIL_ROLLING_LOG_H
+
+#include <cstdint>
+#include <memory>
+#include <string>
+
+#include "kudu/gutil/macros.h"
+#include "kudu/gutil/port.h"
+#include "kudu/gutil/strings/stringpiece.h"
+#include "kudu/util/status.h"
+
+namespace kudu {
+
+class Env;
+class WritableFile;
+
+// A simple rolling log.
+//
+// This creates a log which spans multiple files in a specified directory.
+// After a log file reaches a specified size threshold, it automatically rolls
+// to the next file in the sequence.
+//
+// The files are named similarly to glog log files and use the following pattern:
+//
+// <log_dir>/<program-name>.<hostname>.<user-name>.<log-name>.<timestamp>.<sequence>.<pid>
+//   log_dir:      the log_dir specified in the constructor
+//   program-name: argv[0], as determined by google::ProgramInvocationShortName()
+//   hostname:     the local machine hostname
+//   user-name:    the current user name
+//   log-name:     the log_name specified in the constructor
+//   timestamp:    the wall clock time when the log file was created, in
+//                 YYYYmmdd-HHMMSS fixed-length format.
+//   sequence:     a sequence number which is used to disambiguate when the log file is
+//                 rolled multiple times within a second
+//   pid:          the pid of the daemon
+//
+// The log implementation does not ensure durability of the log or its files in any way.
+// This class is not thread-safe and must be externally synchronized.
+class RollingLog {
+ public:
+  RollingLog(Env* env, std::string log_dir, std::string log_name);
+
+  ~RollingLog();
+
+  // Open the log.
+  // It is optional to call this function. Append() will automatically open
+  // the log as necessary if it is not open.
+  Status Open();
+
+  // Set the pre-compression size threshold at which the log file will be rolled.
+  // If the log is already open, this applies to the current and any future
+  // log file.
+  //
+  // NOTE: This is the limit on a single segment of the log, not a limit on the total
+  // size of the log.
+  //
+  // NOTE: The threshold is checked _after_ each call to Append(). So, the size of
+  // the log may overshoot this threshold by as much as the size of a single appended
+  // message.
+  void SetRollThresholdBytes(int64_t size);
+
+  // Set the total number of log segments to be retained. When the log is rolled,
+  // old segments are removed to achieve the targeted number of segments.
+  void SetMaxNumSegments(int num_segments);
+
+  // If compression is enabled, closed log segments are gzip-compressed.
+  // NOTE: this requires that the passed-in Env instance is the local file system.
+  void SetCompressionEnabled(bool compress);
+
+  // Append the given data to the current log file.
+  //
+  // If, after appending this data, the file size has crossed the configured roll
+  // threshold, a new empty log file is created. Note that this is a synchronous API and
+  // causes potentially-blocking IO on the current thread. However, this does not fsync()
+  // or otherwise ensure durability of the appended data.
+  Status Append(StringPiece data) WARN_UNUSED_RESULT;
+
+  // Close the log.
+  Status Close();
+
+  // Return the number of times this log has rolled since it was first opened.
+  int roll_count() const {
+    return roll_count_;
+  }
+
+ private:
+  std::string GetLogFileName(int sequence) const;
+
+  // Get a glob pattern matching all log files written by this instance.
+  std::string GetLogFilePattern() const;
+
+  // Compress the given path, writing a new file '<path>.gz'.
+  Status CompressFile(const std::string& path) const;
+
+  Env* const env_;
+  const std::string log_dir_;
+  const std::string log_name_;
+
+  int64_t roll_threshold_bytes_;
+  int max_num_segments_;
+
+  std::unique_ptr<WritableFile> file_;
+  bool compress_after_close_;
+
+  int roll_count_ = 0;
+
+  DISALLOW_COPY_AND_ASSIGN(RollingLog);
+};
+
+} // namespace kudu
+#endif /* KUDU_UTIL_ROLLING_LOG_H */
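
For orientation, a minimal usage sketch of the API above (an illustration, not part of the commit; it assumes kudu::Env::Default() returns the local-filesystem Env, which is what SetCompressionEnabled() requires):

#include <string>

#include <glog/logging.h>

#include "kudu/util/env.h"
#include "kudu/util/rolling_log.h"
#include "kudu/util/status.h"

void ExampleRollingLog(const std::string& log_dir) {
  // Roll at ~1MB per segment, keep at most 5 segments, gzip closed segments.
  // (Assumption: Env::Default() is the local filesystem Env.)
  kudu::RollingLog log(kudu::Env::Default(), log_dir, "example");
  log.SetRollThresholdBytes(1024 * 1024);
  log.SetMaxNumSegments(5);
  log.SetCompressionEnabled(true);

  // Append() lazily opens the first segment and rolls once the threshold is
  // crossed after a write; it does not fsync() the data.
  for (int i = 0; i < 1000; i++) {
    kudu::Status s = log.Append("example log line\n");
    if (!s.ok()) {
      LOG(WARNING) << "append failed: " << s.ToString();
      break;
    }
  }

  kudu::Status s = log.Close();
  if (!s.ok()) {
    LOG(WARNING) << "failed to close rolling log: " << s.ToString();
  }
}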

http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/rw_mutex-test.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/rw_mutex-test.cc b/be/src/kudu/util/rw_mutex-test.cc
new file mode 100644
index 0000000..c2cb394
--- /dev/null
+++ b/be/src/kudu/util/rw_mutex-test.cc
@@ -0,0 +1,185 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <cstdint>
+#include <mutex>
+#include <ostream>
+#include <thread>
+#include <vector>
+
+#include <glog/logging.h>
+#include <gtest/gtest.h>
+
+#include "kudu/util/atomic.h"
+#include "kudu/util/locks.h"
+#include "kudu/util/monotime.h"
+#include "kudu/util/rw_mutex.h"
+#include "kudu/util/test_util.h"
+
+using std::lock_guard;
+using std::thread;
+using std::try_to_lock;
+using std::unique_lock;
+using std::vector;
+
+namespace kudu {
+
+class RWMutexTest : public KuduTest,
+                    public ::testing::WithParamInterface<RWMutex::Priority> {
+ public:
+  RWMutexTest()
+     : lock_(GetParam()) {
+  }
+ protected:
+  RWMutex lock_;
+};
+
+// Instantiate every test for each kind of RWMutex priority.
+INSTANTIATE_TEST_CASE_P(Priorities, RWMutexTest,
+                        ::testing::Values(RWMutex::Priority::PREFER_READING,
+                                          RWMutex::Priority::PREFER_WRITING));
+
+// Multi-threaded test that tries to find deadlocks in the RWMutex wrapper.
+TEST_P(RWMutexTest, TestDeadlocks) {
+  uint64_t number_of_writes = 0;
+  AtomicInt<uint64_t> number_of_reads(0);
+
+  AtomicBool done(false);
+  vector<thread> threads;
+
+  // Start several blocking and non-blocking read-write workloads.
+  for (int i = 0; i < 2; i++) {
+    threads.emplace_back([&](){
+      while (!done.Load()) {
+        lock_guard<RWMutex> l(lock_);
+        number_of_writes++;
+      }
+    });
+    threads.emplace_back([&](){
+      while (!done.Load()) {
+        unique_lock<RWMutex> l(lock_, try_to_lock);
+        if (l.owns_lock()) {
+          number_of_writes++;
+        }
+      }
+    });
+  }
+
+  // Start several blocking and non-blocking read-only workloads.
+  for (int i = 0; i < 2; i++) {
+    threads.emplace_back([&](){
+      while (!done.Load()) {
+        shared_lock<RWMutex> l(lock_);
+        number_of_reads.Increment();
+      }
+    });
+    threads.emplace_back([&](){
+      while (!done.Load()) {
+        shared_lock<RWMutex> l(lock_, try_to_lock);
+        if (l.owns_lock()) {
+          number_of_reads.Increment();
+        }
+      }
+    });
+  }
+
+  SleepFor(MonoDelta::FromSeconds(1));
+  done.Store(true);
+  for (auto& t : threads) {
+    t.join();
+  }
+
+  shared_lock<RWMutex> l(lock_);
+  LOG(INFO) << "Number of writes: " << number_of_writes;
+  LOG(INFO) << "Number of reads: " << number_of_reads.Load();
+}
+
+#ifndef NDEBUG
+// Tests that the RWMutex wrapper catches basic usage errors. This checking is
+// only enabled in debug builds.
+TEST_P(RWMutexTest, TestLockChecking) {
+  EXPECT_DEATH({
+    lock_.ReadLock();
+    lock_.ReadLock();
+  }, "already holding lock for reading");
+
+  EXPECT_DEATH({
+    CHECK(lock_.TryReadLock());
+    CHECK(lock_.TryReadLock());
+  }, "already holding lock for reading");
+
+  EXPECT_DEATH({
+    lock_.ReadLock();
+    lock_.WriteLock();
+  }, "already holding lock for reading");
+
+  EXPECT_DEATH({
+    CHECK(lock_.TryReadLock());
+    CHECK(lock_.TryWriteLock());
+  }, "already holding lock for reading");
+
+  EXPECT_DEATH({
+    lock_.WriteLock();
+    lock_.ReadLock();
+  }, "already holding lock for writing");
+
+  EXPECT_DEATH({
+    CHECK(lock_.TryWriteLock());
+    CHECK(lock_.TryReadLock());
+  }, "already holding lock for writing");
+
+  EXPECT_DEATH({
+    lock_.WriteLock();
+    lock_.WriteLock();
+  }, "already holding lock for writing");
+
+  EXPECT_DEATH({
+    CHECK(lock_.TryWriteLock());
+    CHECK(lock_.TryWriteLock());
+  }, "already holding lock for writing");
+
+  EXPECT_DEATH({
+    lock_.ReadUnlock();
+  }, "wasn't holding lock for reading");
+
+  EXPECT_DEATH({
+    lock_.WriteUnlock();
+  }, "wasn't holding lock for writing");
+
+  EXPECT_DEATH({
+    lock_.ReadLock();
+    lock_.WriteUnlock();
+  }, "already holding lock for reading");
+
+  EXPECT_DEATH({
+    CHECK(lock_.TryReadLock());
+    lock_.WriteUnlock();
+  }, "already holding lock for reading");
+
+  EXPECT_DEATH({
+    lock_.WriteLock();
+    lock_.ReadUnlock();
+  }, "already holding lock for writing");
+
+  EXPECT_DEATH({
+    CHECK(lock_.TryWriteLock());
+    lock_.ReadUnlock();
+  }, "already holding lock for writing");
+}
+#endif
+
+} // namespace kudu

http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/rw_mutex.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/rw_mutex.cc b/be/src/kudu/util/rw_mutex.cc
new file mode 100644
index 0000000..75fc1e7
--- /dev/null
+++ b/be/src/kudu/util/rw_mutex.cc
@@ -0,0 +1,207 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "kudu/util/rw_mutex.h"
+
+#include <cerrno>
+#include <cstring>
+#include <mutex>
+#include <ostream>
+
+#include <glog/logging.h>
+
+#include "kudu/gutil/map-util.h"
+#include "kudu/util/env.h"
+
+using std::lock_guard;
+
+namespace {
+
+void unlock_rwlock(pthread_rwlock_t* rwlock) {
+  int rv = pthread_rwlock_unlock(rwlock);
+  DCHECK_EQ(0, rv) << strerror(rv);
+}
+
+} // anonymous namespace
+
+namespace kudu {
+
+RWMutex::RWMutex()
+#ifndef NDEBUG
+    : writer_tid_(0)
+#endif
+{
+  Init(Priority::PREFER_READING);
+}
+
+RWMutex::RWMutex(Priority prio)
+#ifndef NDEBUG
+    : writer_tid_(0)
+#endif
+{
+  Init(prio);
+}
+
+void RWMutex::Init(Priority prio) {
+#ifdef __linux__
+  // Adapt from priority to the pthread type.
+  int kind = PTHREAD_RWLOCK_PREFER_READER_NP;
+  switch (prio) {
+    case Priority::PREFER_READING:
+      kind = PTHREAD_RWLOCK_PREFER_READER_NP;
+      break;
+    case Priority::PREFER_WRITING:
+      kind = PTHREAD_RWLOCK_PREFER_WRITER_NONRECURSIVE_NP;
+      break;
+  }
+
+  // Initialize the new rwlock with the user's preference.
+  pthread_rwlockattr_t attr;
+  int rv = pthread_rwlockattr_init(&attr);
+  DCHECK_EQ(0, rv) << strerror(rv);
+  rv = pthread_rwlockattr_setkind_np(&attr, kind);
+  DCHECK_EQ(0, rv) << strerror(rv);
+  rv = pthread_rwlock_init(&native_handle_, &attr);
+  DCHECK_EQ(0, rv) << strerror(rv);
+  rv = pthread_rwlockattr_destroy(&attr);
+  DCHECK_EQ(0, rv) << strerror(rv);
+#else
+  int rv = pthread_rwlock_init(&native_handle_, NULL);
+  DCHECK_EQ(0, rv) << strerror(rv);
+#endif
+}
+
+RWMutex::~RWMutex() {
+  int rv = pthread_rwlock_destroy(&native_handle_);
+  DCHECK_EQ(0, rv) << strerror(rv);
+}
+
+void RWMutex::ReadLock() {
+  CheckLockState(LockState::NEITHER);
+  int rv = pthread_rwlock_rdlock(&native_handle_);
+  DCHECK_EQ(0, rv) << strerror(rv);
+  MarkForReading();
+}
+
+void RWMutex::ReadUnlock() {
+  CheckLockState(LockState::READER);
+  UnmarkForReading();
+  unlock_rwlock(&native_handle_);
+}
+
+bool RWMutex::TryReadLock() {
+  CheckLockState(LockState::NEITHER);
+  int rv = pthread_rwlock_tryrdlock(&native_handle_);
+  if (rv == EBUSY) {
+    return false;
+  }
+  DCHECK_EQ(0, rv) << strerror(rv);
+  MarkForReading();
+  return true;
+}
+
+void RWMutex::WriteLock() {
+  CheckLockState(LockState::NEITHER);
+  int rv = pthread_rwlock_wrlock(&native_handle_);
+  DCHECK_EQ(0, rv) << strerror(rv);
+  MarkForWriting();
+}
+
+void RWMutex::WriteUnlock() {
+  CheckLockState(LockState::WRITER);
+  UnmarkForWriting();
+  unlock_rwlock(&native_handle_);
+}
+
+bool RWMutex::TryWriteLock() {
+  CheckLockState(LockState::NEITHER);
+  int rv = pthread_rwlock_trywrlock(&native_handle_);
+  if (rv == EBUSY) {
+    return false;
+  }
+  DCHECK_EQ(0, rv) << strerror(rv);
+  MarkForWriting();
+  return true;
+}
+
+#ifndef NDEBUG
+
+void RWMutex::AssertAcquired() const {
+  lock_guard<simple_spinlock> l(tid_lock_);
+  CHECK(ContainsKey(reader_tids_, Env::Default()->gettid()) ||
+        Env::Default()->gettid() == writer_tid_);
+}
+
+void RWMutex::AssertAcquiredForReading() const {
+  lock_guard<simple_spinlock> l(tid_lock_);
+  CHECK(ContainsKey(reader_tids_, Env::Default()->gettid()));
+}
+
+void RWMutex::AssertAcquiredForWriting() const {
+  lock_guard<simple_spinlock> l(tid_lock_);
+  CHECK_EQ(Env::Default()->gettid(), writer_tid_);
+}
+
+void RWMutex::CheckLockState(LockState state) const {
+  pid_t my_tid = Env::Default()->gettid();
+  bool is_reader;
+  bool is_writer;
+  {
+    lock_guard<simple_spinlock> l(tid_lock_);
+    is_reader = ContainsKey(reader_tids_, my_tid);
+    is_writer = writer_tid_ == my_tid;
+  }
+
+  switch (state) {
+    case LockState::NEITHER:
+      CHECK(!is_reader) << "Invalid state, already holding lock for reading";
+      CHECK(!is_writer) << "Invalid state, already holding lock for writing";
+      break;
+    case LockState::READER:
+      CHECK(!is_writer) << "Invalid state, already holding lock for writing";
+      CHECK(is_reader) << "Invalid state, wasn't holding lock for reading";
+      break;
+    case LockState::WRITER:
+      CHECK(!is_reader) << "Invalid state, already holding lock for reading";
+      CHECK(is_writer) << "Invalid state, wasn't holding lock for writing";
+      break;
+  }
+}
+
+void RWMutex::MarkForReading() {
+  lock_guard<simple_spinlock> l(tid_lock_);
+  reader_tids_.insert(Env::Default()->gettid());
+}
+
+void RWMutex::MarkForWriting() {
+  lock_guard<simple_spinlock> l(tid_lock_);
+  writer_tid_ = Env::Default()->gettid();
+}
+
+void RWMutex::UnmarkForReading() {
+  lock_guard<simple_spinlock> l(tid_lock_);
+  reader_tids_.erase(Env::Default()->gettid());
+}
+
+void RWMutex::UnmarkForWriting() {
+  lock_guard<simple_spinlock> l(tid_lock_);
+  writer_tid_ = 0;
+}
+
+#endif
+
+} // namespace kudu

http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/rw_mutex.h
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/rw_mutex.h b/be/src/kudu/util/rw_mutex.h
new file mode 100644
index 0000000..bb6168c
--- /dev/null
+++ b/be/src/kudu/util/rw_mutex.h
@@ -0,0 +1,123 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <pthread.h>
+#include <sys/types.h>
+
+#include <unordered_set>
+
+#include "kudu/gutil/macros.h"
+#include "kudu/util/locks.h"
+
+namespace kudu {
+
+// Read/write mutex. Implemented as a thin wrapper around pthread_rwlock_t.
+//
+// Although pthread_rwlock_t allows recursive acquisition, this wrapper does
+// not, and will crash in debug mode if recursive acquisition is detected.
+class RWMutex {
+ public:
+
+  // Possible fairness policies for the RWMutex.
+  enum class Priority {
+    // The lock will prioritize readers at the expense of writers.
+    PREFER_READING,
+
+    // The lock will prioritize writers at the expense of readers.
+    //
+    // Care should be taken when using this fairness policy, as it can lead to
+    // unexpected deadlocks (e.g. a writer waiting on the lock will prevent
+    // additional readers from acquiring it).
+    PREFER_WRITING,
+  };
+
+  // Create an RWMutex that prioritizes readers.
+  RWMutex();
+
+  // Create an RWMutex with customized priority. This is a best effort; the
+  // underlying platform may not support custom priorities.
+  explicit RWMutex(Priority prio);
+
+  ~RWMutex();
+
+  void ReadLock();
+  void ReadUnlock();
+  bool TryReadLock();
+
+  void WriteLock();
+  void WriteUnlock();
+  bool TryWriteLock();
+
+#ifndef NDEBUG
+  void AssertAcquired() const;
+  void AssertAcquiredForReading() const;
+  void AssertAcquiredForWriting() const;
+#else
+  void AssertAcquired() const {}
+  void AssertAcquiredForReading() const {}
+  void AssertAcquiredForWriting() const {}
+#endif
+
+  // Aliases for use with std::lock_guard and kudu::shared_lock.
+  void lock() { WriteLock(); }
+  void unlock() { WriteUnlock(); }
+  bool try_lock() { return TryWriteLock(); }
+  void lock_shared() { ReadLock(); }
+  void unlock_shared() { ReadUnlock(); }
+  bool try_lock_shared() { return TryReadLock(); }
+
+ private:
+  void Init(Priority prio);
+
+  enum class LockState {
+    NEITHER,
+    READER,
+    WRITER,
+  };
+#ifndef NDEBUG
+  void CheckLockState(LockState state) const;
+  void MarkForReading();
+  void MarkForWriting();
+  void UnmarkForReading();
+  void UnmarkForWriting();
+#else
+  void CheckLockState(LockState state) const {}
+  void MarkForReading() {}
+  void MarkForWriting() {}
+  void UnmarkForReading() {}
+  void UnmarkForWriting() {}
+#endif
+
+  pthread_rwlock_t native_handle_;
+
+#ifndef NDEBUG
+  // Protects reader_tids_ and writer_tid_.
+  mutable simple_spinlock tid_lock_;
+
+  // Tracks all current readers by tid.
+  std::unordered_set<pid_t> reader_tids_;
+
+  // Tracks the current writer (if one exists) by tid.
+  pid_t writer_tid_;
+#endif
+
+  DISALLOW_COPY_AND_ASSIGN(RWMutex);
+};
+
+} // namespace kudu
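
A minimal usage sketch of the RWMutex wrapper declared above, using the
std-style adapters together with kudu::shared_lock from locks.h; the class and
its members are hypothetical (illustrative only, not part of the patch):

  #include <cstddef>
  #include <mutex>
  #include <vector>

  #include "kudu/util/locks.h"
  #include "kudu/util/rw_mutex.h"

  class Registry {
   public:
    Registry() : lock_(kudu::RWMutex::Priority::PREFER_WRITING) {}

    void Add(int item) {
      std::lock_guard<kudu::RWMutex> l(lock_);   // exclusive; uses lock()/unlock()
      lock_.AssertAcquiredForWriting();          // debug-only check, no-op in release
      items_.push_back(item);
    }

    std::size_t Size() const {
      kudu::shared_lock<kudu::RWMutex> l(lock_); // shared; uses lock_shared()
      return items_.size();
    }

   private:
    mutable kudu::RWMutex lock_;
    std::vector<int> items_;
  };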

http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/rw_semaphore-test.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/rw_semaphore-test.cc b/be/src/kudu/util/rw_semaphore-test.cc
new file mode 100644
index 0000000..7717608
--- /dev/null
+++ b/be/src/kudu/util/rw_semaphore-test.cc
@@ -0,0 +1,94 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <cstdint>
+#include <mutex>
+#include <thread>
+#include <vector>
+
+#include <glog/logging.h>
+#include <gtest/gtest.h>
+
+#include "kudu/util/locks.h"
+#include "kudu/util/monotime.h"
+#include "kudu/util/rw_semaphore.h"
+
+using std::thread;
+using std::vector;
+
+namespace kudu {
+struct SharedState {
+  SharedState() : done(false), int_var(0) {}
+
+  bool done;
+  int64_t int_var;
+  rw_semaphore sem;
+};
+
+// Thread which increases the value in the shared state under the write lock.
+void Writer(SharedState* state) {
+  int i = 0;
+  while (true) {
+    std::lock_guard<rw_semaphore> l(state->sem);
+    state->int_var += (i++);
+    if (state->done) {
+      break;
+    }
+  }
+}
+
+// Thread which verifies that the value in the shared state only increases.
+void Reader(SharedState* state) {
+  int prev_val = 0;
+  while (true) {
+    shared_lock<rw_semaphore> l(state->sem);
+    // The int var should only be seen to increase.
+    CHECK_GE(state->int_var, prev_val);
+    prev_val = state->int_var;
+    if (state->done) {
+      break;
+    }
+  }
+}
+
+// Test which verifies basic functionality of the semaphore.
+// When run under TSAN this also verifies the barriers.
+TEST(RWSemaphoreTest, TestBasicOperation) {
+  SharedState s;
+  vector<thread*> threads;
+  // Start 5 readers and writers.
+  for (int i = 0; i < 5; i++) {
+    threads.push_back(new thread(Reader, &s));
+    threads.push_back(new thread(Writer, &s));
+  }
+
+  // Let them contend for a short amount of time.
+  SleepFor(MonoDelta::FromMilliseconds(50));
+
+  // Signal them to stop.
+  {
+    std::lock_guard<rw_semaphore> l(s.sem);
+    s.done = true;
+  }
+
+  for (thread* t : threads) {
+    t->join();
+    delete t;
+  }
+}
+
+} // namespace kudu

http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/rw_semaphore.h
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/rw_semaphore.h b/be/src/kudu/util/rw_semaphore.h
new file mode 100644
index 0000000..20c0b57
--- /dev/null
+++ b/be/src/kudu/util/rw_semaphore.h
@@ -0,0 +1,206 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#ifndef KUDU_UTIL_RW_SEMAPHORE_H
+#define KUDU_UTIL_RW_SEMAPHORE_H
+
+// Uncomment for extra debugging information. See below for details.
+//   #define RW_SEMAPHORE_TRACK_HOLDER 1
+
+#include <boost/smart_ptr/detail/yield_k.hpp>
+#include <glog/logging.h>
+
+#include "kudu/gutil/atomicops.h"
+#include "kudu/gutil/macros.h"
+#include "kudu/gutil/port.h"
+#ifdef RW_SEMAPHORE_TRACK_HOLDER
+#include "kudu/util/debug-util.h"
+#endif
+#include "kudu/util/thread.h"
+
+namespace kudu {
+
+// Read-write semaphore implemented as a single 32-bit atomic word: the low 31
+// bits count active readers and the high bit is the write flag. A writer sets
+// the write flag and then waits for the reader count to drop to zero; readers
+// spin while the write flag is set.
+//
+// This rw-semaphore makes no attempt at fairness, though it does avoid write
+// starvation (no new readers may obtain the lock if a write is waiting).
+//
+// NOTE: this means that it is not safe to reentrantly acquire the read lock,
+// due to the following deadlock:
+//   - T1: acquire read lock
+//   - T2: wait for write lock
+//     (blocks waiting for readers)
+//   - T1: try to acquire read-lock reentrantly
+//     (blocks to avoid starving writers)
+//
+// Given that this is currently based only on spinning (and not futex),
+// it should only be used in cases where the lock is held for very short
+// time intervals.
+//
+// If the semaphore is expected to always be released from the same thread
+// that acquired it, use rw_spinlock instead.
+//
+// In order to support easier debugging of leaked locks, this class can track
+// the stack trace of the last thread to lock it in write mode. To do so,
+// uncomment the definition of RW_SEMAPHORE_TRACK_HOLDER at the top of this
+// file. Then, in gdb, print the contents of the semaphore, and you should see
+// the collected stack trace.
+class rw_semaphore {
+ public:
+  rw_semaphore() : state_(0) {
+  }
+  ~rw_semaphore() {}
+
+  void lock_shared() {
+    int loop_count = 0;
+    Atomic32 cur_state = base::subtle::NoBarrier_Load(&state_);
+    while (true) {
+      Atomic32 expected = cur_state & kNumReadersMask;   // I expect no write lock
+      Atomic32 try_new_state = expected + 1;          // Add me as reader
+      cur_state = base::subtle::Acquire_CompareAndSwap(&state_, expected, try_new_state);
+      if (cur_state == expected)
+        break;
+      // Either was already locked by someone else, or CAS failed.
+      boost::detail::yield(loop_count++);
+    }
+  }
+
+  void unlock_shared() {
+    int loop_count = 0;
+    Atomic32 cur_state = base::subtle::NoBarrier_Load(&state_);
+    while (true) {
+      DCHECK_GT(cur_state & kNumReadersMask, 0)
+        << "unlock_shared() called when there are no shared locks held";
+      Atomic32 expected = cur_state;           // current state may include a pending writer and other readers
+      Atomic32 try_new_state = expected - 1;   // Drop me as reader
+      cur_state = base::subtle::Release_CompareAndSwap(&state_, expected, try_new_state);
+      if (cur_state == expected)
+        break;
+      // Either was already locked by someone else, or CAS failed.
+      boost::detail::yield(loop_count++);
+    }
+  }
+
+  // Tries to acquire a write lock, if no one else has it.
+  // This function retries on CAS failure and waits for readers to complete.
+  bool try_lock() {
+    int loop_count = 0;
+    Atomic32 cur_state = base::subtle::NoBarrier_Load(&state_);
+    while (true) {
+      // someone else already holds the write lock
+      if (cur_state & kWriteFlag)
+        return false;
+
+      Atomic32 expected = cur_state & kNumReadersMask;   // I expect some 0+ readers
+      Atomic32 try_new_state = kWriteFlag | expected;    // I want to lock out other writers
+      cur_state = base::subtle::Acquire_CompareAndSwap(&state_, expected, try_new_state);
+      if (cur_state == expected)
+        break;
+      // Either was already locked by someone else, or CAS failed.
+      boost::detail::yield(loop_count++);
+    }
+
+    WaitPendingReaders();
+    RecordLockHolderStack();
+    return true;
+  }
+
+  void lock() {
+    int loop_count = 0;
+    Atomic32 cur_state = base::subtle::NoBarrier_Load(&state_);
+    while (true) {
+      Atomic32 expected = cur_state & kNumReadersMask;   // I expect some 0+ readers
+      Atomic32 try_new_state = kWriteFlag | expected;    // I want to lock out other writers
+      // Note: we use NoBarrier here because we'll do the Acquire barrier down below
+      // in WaitPendingReaders
+      cur_state = base::subtle::NoBarrier_CompareAndSwap(&state_, expected, try_new_state);
+      if (cur_state == expected)
+        break;
+      // Either was already locked by someone else, or CAS failed.
+      boost::detail::yield(loop_count++);
+    }
+
+    WaitPendingReaders();
+
+#ifndef NDEBUG
+    writer_tid_ = Thread::CurrentThreadId();
+#endif // NDEBUG
+    RecordLockHolderStack();
+  }
+
+  void unlock() {
+    // I expect to be the only writer
+    DCHECK_EQ(base::subtle::NoBarrier_Load(&state_), kWriteFlag);
+
+#ifndef NDEBUG
+    writer_tid_ = -1; // Invalid tid.
+#endif // NDEBUG
+
+    ResetLockHolderStack();
+    // Reset: no writers & no readers.
+    Release_Store(&state_, 0);
+  }
+
+  // Return true if the lock is currently held for write by any thread.
+  // See simple_semaphore::is_locked() for details about where this is useful.
+  bool is_write_locked() const {
+    return base::subtle::NoBarrier_Load(&state_) & kWriteFlag;
+  }
+
+  // Return true if the lock is currently held, either for read or write
+  // by any thread.
+  // See simple_semaphore::is_locked() for details about where this is useful.
+  bool is_locked() const {
+    return base::subtle::NoBarrier_Load(&state_);
+  }
+
+ private:
+  static const uint32_t kNumReadersMask = 0x7fffffff;
+  static const uint32_t kWriteFlag = 1 << 31;
+
+#ifdef RW_SEMAPHORE_TRACK_HOLDER
+  StackTrace writer_stack_;
+  void RecordLockHolderStack() {
+    writer_stack_.Collect();
+  }
+  void ResetLockHolderStack() {
+    writer_stack_.Reset();
+  }
+#else
+  void RecordLockHolderStack() {
+  }
+  void ResetLockHolderStack() {
+  }
+#endif
+
+  void WaitPendingReaders() {
+    int loop_count = 0;
+    while ((base::subtle::Acquire_Load(&state_) & kNumReadersMask) > 0) {
+      boost::detail::yield(loop_count++);
+    }
+  }
+
+ private:
+  volatile Atomic32 state_;
+#ifndef NDEBUG
+  int64_t writer_tid_;
+#endif // NDEBUG
+};
+
+} // namespace kudu
+#endif /* KUDU_UTIL_RW_SEMAPHORE_H */
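
A minimal usage sketch of rw_semaphore, mirroring the pattern in
rw_semaphore-test.cc above; the struct and functions are hypothetical
(illustrative only, not part of the patch). Since the semaphore only spins,
it should guard only very short sections like these:

  #include <cstdint>
  #include <mutex>

  #include "kudu/util/locks.h"        // kudu::shared_lock
  #include "kudu/util/rw_semaphore.h"

  struct Counter {
    kudu::rw_semaphore sem;
    int64_t value = 0;
  };

  void Bump(Counter* c) {
    std::lock_guard<kudu::rw_semaphore> l(c->sem);   // write side: lock()/unlock()
    c->value++;
  }

  int64_t Read(Counter* c) {
    kudu::shared_lock<kudu::rw_semaphore> l(c->sem); // read side: lock_shared()
    return c->value;
  }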

http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/rwc_lock-test.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/rwc_lock-test.cc b/be/src/kudu/util/rwc_lock-test.cc
new file mode 100644
index 0000000..17c2b19
--- /dev/null
+++ b/be/src/kudu/util/rwc_lock-test.cc
@@ -0,0 +1,147 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <mutex>
+#include <string>
+#include <thread>
+#include <vector>
+
+#include <glog/logging.h>
+#include <gtest/gtest.h>
+
+#include "kudu/gutil/atomicops.h"
+#include "kudu/util/locks.h"
+#include "kudu/util/monotime.h"
+#include "kudu/util/rwc_lock.h"
+#include "kudu/util/test_util.h"
+
+namespace kudu {
+
+using base::subtle::NoBarrier_Load;
+using base::subtle::Release_Store;
+using std::string;
+using std::thread;
+using std::vector;
+
+class RWCLockTest : public KuduTest {};
+
+// Holds counters of how many threads hold the lock in each of the
+// provided modes.
+struct LockHoldersCount {
+  LockHoldersCount()
+    : num_readers(0),
+      num_writers(0),
+      num_committers(0) {
+  }
+
+  // Check the invariants of the lock counts.
+  void CheckInvariants() {
+    // At no time should we have more than one writer or committer.
+    CHECK_LE(num_writers, 1);
+    CHECK_LE(num_committers, 1);
+
+    // If we have any readers, then we should not have any committers.
+    if (num_readers > 0) {
+      CHECK_EQ(num_committers, 0);
+    }
+  }
+
+  void AdjustReaders(int delta) {
+    std::lock_guard<simple_spinlock> l(lock);
+    num_readers += delta;
+    CheckInvariants();
+  }
+
+  void AdjustWriters(int delta) {
+    std::lock_guard<simple_spinlock> l(lock);
+    num_writers += delta;
+    CheckInvariants();
+  }
+
+  void AdjustCommitters(int delta) {
+    std::lock_guard<simple_spinlock> l(lock);
+    num_committers += delta;
+    CheckInvariants();
+  }
+
+  int num_readers;
+  int num_writers;
+  int num_committers;
+  simple_spinlock lock;
+};
+
+struct SharedState {
+  LockHoldersCount counts;
+  RWCLock rwc_lock;
+  Atomic32 stop;
+};
+
+void ReaderThread(SharedState* state) {
+  while (!NoBarrier_Load(&state->stop)) {
+    state->rwc_lock.ReadLock();
+    state->counts.AdjustReaders(1);
+    state->counts.AdjustReaders(-1);
+    state->rwc_lock.ReadUnlock();
+  }
+}
+
+void WriterThread(SharedState* state) {
+  string local_str;
+  while (!NoBarrier_Load(&state->stop)) {
+    state->rwc_lock.WriteLock();
+    state->counts.AdjustWriters(1);
+
+    state->rwc_lock.UpgradeToCommitLock();
+    state->counts.AdjustWriters(-1);
+    state->counts.AdjustCommitters(1);
+
+    state->counts.AdjustCommitters(-1);
+    state->rwc_lock.CommitUnlock();
+  }
+}
+
+
+TEST_F(RWCLockTest, TestCorrectBehavior) {
+  SharedState state;
+  Release_Store(&state.stop, 0);
+
+  vector<thread> threads;
+
+  const int kNumWriters = 5;
+  const int kNumReaders = 5;
+
+  for (int i = 0; i < kNumWriters; i++) {
+    threads.emplace_back(WriterThread, &state);
+  }
+  for (int i = 0; i < kNumReaders; i++) {
+    threads.emplace_back(ReaderThread, &state);
+  }
+
+  if (AllowSlowTests()) {
+    SleepFor(MonoDelta::FromSeconds(1));
+  } else {
+    SleepFor(MonoDelta::FromMilliseconds(100));
+  }
+
+  Release_Store(&state.stop, 1);
+
+  for (thread& t : threads) {
+    t.join();
+  }
+}
+
+} // namespace kudu

http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/rwc_lock.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/rwc_lock.cc b/be/src/kudu/util/rwc_lock.cc
new file mode 100644
index 0000000..ffe4cbb
--- /dev/null
+++ b/be/src/kudu/util/rwc_lock.cc
@@ -0,0 +1,136 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "kudu/util/rwc_lock.h"
+
+#include <glog/logging.h>
+
+#ifndef NDEBUG
+#include "kudu/gutil/walltime.h"
+#include "kudu/util/debug-util.h"
+#include "kudu/util/thread.h"
+#endif // NDEBUG
+
+namespace kudu {
+
+RWCLock::RWCLock()
+  : no_mutators_(&lock_),
+    no_readers_(&lock_),
+    reader_count_(0),
+#ifdef NDEBUG
+    write_locked_(false) {
+#else
+    write_locked_(false),
+    writer_tid_(0),
+    last_writelock_acquire_time_(0) {
+  last_writer_backtrace_[0] = '\0';
+#endif // NDEBUG
+}
+
+RWCLock::~RWCLock() {
+  DCHECK(!HasReaders());
+  DCHECK(!HasWriteLock());
+}
+
+void RWCLock::ReadLock() {
+  MutexLock l(lock_);
+  reader_count_++;
+}
+
+void RWCLock::ReadUnlock() {
+  MutexLock l(lock_);
+  DCHECK(HasReadersUnlocked());
+  reader_count_--;
+  if (reader_count_ == 0) {
+    no_readers_.Signal();
+  }
+}
+
+bool RWCLock::HasReaders() const {
+  MutexLock l(lock_);
+  return HasReadersUnlocked();
+}
+
+bool RWCLock::HasReadersUnlocked() const {
+  lock_.AssertAcquired();
+  return reader_count_ > 0;
+}
+
+bool RWCLock::HasWriteLock() const {
+  MutexLock l(lock_);
+  return HasWriteLockUnlocked();
+}
+
+bool RWCLock::HasWriteLockUnlocked() const {
+  lock_.AssertAcquired();
+#ifndef NDEBUG
+  return writer_tid_ == Thread::CurrentThreadId();
+#else
+  return write_locked_;
+#endif
+}
+
+void RWCLock::WriteLock() {
+  MutexLock l(lock_);
+  // Wait for any other mutations to finish.
+  while (write_locked_) {
+    no_mutators_.Wait();
+  }
+#ifndef NDEBUG
+  last_writelock_acquire_time_ = GetCurrentTimeMicros();
+  writer_tid_ = Thread::CurrentThreadId();
+  HexStackTraceToString(last_writer_backtrace_, kBacktraceBufSize);
+#endif // NDEBUG
+  write_locked_ = true;
+}
+
+void RWCLock::WriteUnlock() {
+  MutexLock l(lock_);
+  DCHECK(HasWriteLockUnlocked());
+  write_locked_ = false;
+#ifndef NDEBUG
+  writer_tid_ = 0;
+  last_writer_backtrace_[0] = '\0';
+#endif // NDEBUG
+  no_mutators_.Signal();
+}
+
+void RWCLock::UpgradeToCommitLock() {
+  lock_.lock();
+  DCHECK(HasWriteLockUnlocked());
+  while (reader_count_ > 0) {
+    no_readers_.Wait();
+  }
+  DCHECK(HasWriteLockUnlocked());
+
+  // Leaves the lock held, which prevents any new readers
+  // or writers.
+}
+
+void RWCLock::CommitUnlock() {
+  DCHECK(!HasReadersUnlocked());
+  DCHECK(HasWriteLockUnlocked());
+  write_locked_ = false;
+#ifndef NDEBUG
+  writer_tid_ = 0;
+  last_writer_backtrace_[0] = '\0';
+#endif // NDEBUG
+  no_mutators_.Broadcast();
+  lock_.unlock();
+}
+
+} // namespace kudu
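
A minimal usage sketch of the read/write/commit protocol implemented above:
a mutation is prepared under the write lock while readers continue, and the
commit lock is taken only for the brief final publication step. The struct and
fields are hypothetical (illustrative only, not part of the patch):

  #include "kudu/util/rwc_lock.h"

  struct Metadata {
    kudu::RWCLock lock;
    int version = 0;
  };

  void BumpVersion(Metadata* md) {
    md->lock.WriteLock();            // blocks other writers; readers continue
    int next = md->version + 1;      // prepare the change off to the side
    md->lock.UpgradeToCommitLock();  // waits for in-flight readers to drain
    md->version = next;              // publish while no readers are active
    md->lock.CommitUnlock();         // releases both the write and commit locks
  }

  int ReadVersion(Metadata* md) {
    md->lock.ReadLock();
    int v = md->version;
    md->lock.ReadUnlock();
    return v;
  }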