You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by ta...@apache.org on 2019/01/23 16:36:51 UTC

[impala] 04/04: IMPALA-7979: Enhance decoders to support value-skipping

This is an automated email from the ASF dual-hosted git repository.

tarmstrong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 969dea84f41cfccf863b14904958a5d2be91983f
Author: Zoltan Borok-Nagy <bo...@cloudera.com>
AuthorDate: Mon Jan 7 16:50:36 2019 +0100

    IMPALA-7979: Enhance decoders to support value-skipping
    
    This commit adds value-skipping functionality to the decoders.
    Value-skipping is useful when we know that we won't need the
    following N values, so instead of decoding and dropping them, we
    can just "jump" through them.
    
    This feature is a prerequisite for Parquet page skipping (IMPALA-5843).
    
    I added backend tests for all the decoders. Backend tests related to
    bit-packing are moved to the newly created bit-stream-utils-test.
    
    Change-Id: Ib848f1bd71735fe84e8064daf700417b32589f57
    Reviewed-on: http://gerrit.cloudera.org:8080/12172
    Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 be/src/exec/parquet/CMakeLists.txt               |   1 +
 be/src/exec/parquet/parquet-bool-decoder-test.cc | 108 ++++++++
 be/src/exec/parquet/parquet-bool-decoder.cc      |  28 ++
 be/src/exec/parquet/parquet-bool-decoder.h       |   4 +
 be/src/util/CMakeLists.txt                       |   1 +
 be/src/util/bit-stream-utils-test.cc             | 207 ++++++++++++++
 be/src/util/bit-stream-utils.h                   |   5 +
 be/src/util/bit-stream-utils.inline.h            |  12 +
 be/src/util/dict-encoding.h                      |  20 ++
 be/src/util/dict-test.cc                         |  92 ++++++-
 be/src/util/rle-encoding.h                       |  87 ++++++
 be/src/util/rle-test.cc                          | 330 ++++++++++++++---------
 12 files changed, 765 insertions(+), 130 deletions(-)

diff --git a/be/src/exec/parquet/CMakeLists.txt b/be/src/exec/parquet/CMakeLists.txt
index a3ac2de..7bf24e2 100644
--- a/be/src/exec/parquet/CMakeLists.txt
+++ b/be/src/exec/parquet/CMakeLists.txt
@@ -38,6 +38,7 @@ add_library(Parquet
 
 add_dependencies(Parquet gen-deps)
 
+ADD_BE_LSAN_TEST(parquet-bool-decoder-test)
 ADD_BE_LSAN_TEST(parquet-plain-test)
 ADD_BE_LSAN_TEST(parquet-version-test)
 ADD_BE_LSAN_TEST(hdfs-parquet-scanner-test)
diff --git a/be/src/exec/parquet/parquet-bool-decoder-test.cc b/be/src/exec/parquet/parquet-bool-decoder-test.cc
new file mode 100644
index 0000000..a414546
--- /dev/null
+++ b/be/src/exec/parquet/parquet-bool-decoder-test.cc
@@ -0,0 +1,108 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "exec/parquet/parquet-bool-decoder.h"
+#include "testutil/gtest-util.h"
+
+#include <vector>
+
+#include "common/names.h"
+
+namespace impala {
+
+void EncodeData(const vector<bool>& data, parquet::Encoding::type encoding,
+    uint8_t* buffer, int buffer_len) {
+  if (encoding == parquet::Encoding::PLAIN) {
+    BitWriter writer(buffer, buffer_len);
+    for (int b : data) {
+      ASSERT_TRUE(writer.PutValue(b, 1));
+    }
+    writer.Flush();
+  } else {
+    DCHECK(encoding == parquet::Encoding::RLE);
+    // We need to pass 'buffer + 4' because the ParquetBoolDecoder ignores the first 4
+    // bytes (in Parquet RLE, the first 4 bytes are used to encode the data size).
+    RleEncoder encoder(buffer + 4, buffer_len - 4, 1, 8);
+    for (int b : data) {
+      ASSERT_TRUE(encoder.Put(b));
+    }
+    encoder.Flush();
+  }
+}
+
+// Decodes 'skip_at' values, skips 'skip_count' values, then verifies the rest.
+void TestSkipping(parquet::Encoding::type encoding, uint8_t* encoded_data,
+    int encoded_data_len, const vector<bool>& expected_data, int skip_at, int skip_count) {
+  using namespace parquet;
+  ParquetBoolDecoder decoder;
+  decoder.SetData(encoding, encoded_data, encoded_data_len);
+
+  auto Decode = [encoding, &decoder]() {
+    bool b = false;
+    if (encoding == Encoding::PLAIN) {
+      EXPECT_TRUE(decoder.DecodeValue<Encoding::PLAIN>(&b));
+    } else {
+      EXPECT_TRUE(decoder.DecodeValue<Encoding::RLE>(&b));
+    }
+    return b;
+  };
+
+  for (int i = 0; i < skip_at && i < expected_data.size(); ++i) {
+    EXPECT_EQ(Decode(), expected_data[i]) << i;
+  }
+  EXPECT_TRUE(decoder.SkipValues(skip_count));
+  for (int i = skip_at + skip_count; i < expected_data.size(); ++i) {
+    EXPECT_EQ(Decode(), expected_data[i]) << i;
+  }
+}
+
+TEST(ParquetBoolDecoder, TestDecodeAndSkipping) {
+  vector<bool> expected_data;
+  // Write 100 falses, 100 trues, 100 alternating falses and trues, 100 falses
+  for (int i = 0; i < 100; ++i) expected_data.push_back(false);
+  for (int i = 0; i < 100; ++i) expected_data.push_back(true);
+  for (int i = 0; i < 100; ++i) expected_data.push_back(i % 2);
+  for (int i = 0; i < 100; ++i) expected_data.push_back(false);
+
+  for (auto encoding : {parquet::Encoding::PLAIN, parquet::Encoding::RLE}) {
+    constexpr int buffer_len = 128;
+    uint8_t buffer[buffer_len];
+    EncodeData(expected_data, encoding, buffer, buffer_len);
+    TestSkipping(encoding, buffer, buffer_len, expected_data, 0, 8);
+    TestSkipping(encoding, buffer, buffer_len, expected_data, 0, 79);
+    TestSkipping(encoding, buffer, buffer_len, expected_data, 0, 160);
+    TestSkipping(encoding, buffer, buffer_len, expected_data, 0, 260);
+    TestSkipping(encoding, buffer, buffer_len, expected_data, 0, 370);
+    TestSkipping(encoding, buffer, buffer_len, expected_data, 27, 13);
+    TestSkipping(encoding, buffer, buffer_len, expected_data, 50, 112);
+    TestSkipping(encoding, buffer, buffer_len, expected_data, 50, 183);
+    TestSkipping(encoding, buffer, buffer_len, expected_data, 50, 270);
+    TestSkipping(encoding, buffer, buffer_len, expected_data, 50, 350);
+    TestSkipping(encoding, buffer, buffer_len, expected_data, 123, 8);
+    TestSkipping(encoding, buffer, buffer_len, expected_data, 125, 100);
+    TestSkipping(encoding, buffer, buffer_len, expected_data, 225, 17);
+    TestSkipping(encoding, buffer, buffer_len, expected_data, 225, 70);
+    TestSkipping(encoding, buffer, buffer_len, expected_data, 235, 160);
+    TestSkipping(encoding, buffer, buffer_len, expected_data, 337, 17);
+    TestSkipping(encoding, buffer, buffer_len, expected_data, 337, 60);
+    TestSkipping(encoding, buffer, buffer_len, expected_data, 337, 63);
+  }
+}
+
+}
+
+IMPALA_TEST_MAIN();
diff --git a/be/src/exec/parquet/parquet-bool-decoder.cc b/be/src/exec/parquet/parquet-bool-decoder.cc
index fdcd1d5..31b9998 100644
--- a/be/src/exec/parquet/parquet-bool-decoder.cc
+++ b/be/src/exec/parquet/parquet-bool-decoder.cc
@@ -65,4 +65,32 @@ bool ParquetBoolDecoder::DecodeValues(
   }
   return true;
 }
+
+bool ParquetBoolDecoder::SkipValues(int num_values) {
+  DCHECK_GT(num_values, 0);
+  int skip_cached = min(num_unpacked_values_ - unpacked_value_idx_, num_values);
+  unpacked_value_idx_ += skip_cached;
+  if (skip_cached == num_values) return true;
+  int num_remaining = num_values - skip_cached;
+  if (encoding_ == parquet::Encoding::PLAIN) {
+    // Skip whole batches of 32 values without unpacking them. Fail if the input
+    // holds fewer values instead of silently reporting a skip that did not happen.
+    int num_to_skip = BitUtil::RoundDownToPowerOf2(num_remaining, 32);
+    if (num_to_skip > 0 && !bool_values_.SkipBatch(1, num_to_skip)) return false;
+    num_remaining -= num_to_skip;
+    if (num_remaining > 0) {
+      DCHECK_LE(num_remaining, UNPACKED_BUFFER_LEN);
+      num_unpacked_values_ = bool_values_.UnpackBatch(1, UNPACKED_BUFFER_LEN,
+          &unpacked_values_[0]);
+      if (UNLIKELY(num_unpacked_values_ < num_remaining)) return false;
+      unpacked_value_idx_ = num_remaining;
+    }
+    return true;
+  }
+  // rle_decoder_.SkipValues() might fill its internal 'literal_buffer_', which can make
+  // later decoding sub-optimal, especially when reading a very long literal run.
+  DCHECK_EQ(encoding_, parquet::Encoding::RLE);
+  return rle_decoder_.SkipValues(num_remaining) == num_remaining;
+}
+
 } // namespace impala
diff --git a/be/src/exec/parquet/parquet-bool-decoder.h b/be/src/exec/parquet/parquet-bool-decoder.h
index 6ecdbd2..1498ca6 100644
--- a/be/src/exec/parquet/parquet-bool-decoder.h
+++ b/be/src/exec/parquet/parquet-bool-decoder.h
@@ -40,6 +40,10 @@ class ParquetBoolDecoder {
   /// Batched version of DecodeValue() that decodes multiple values at a time.
   bool DecodeValues(int64_t stride, int64_t count, bool* RESTRICT first_value) RESTRICT;
 
+  /// Skip 'num_values' values from the column data.
+  /// TODO: add e2e tests when page filtering is implemented (IMPALA-5843).
+  bool SkipValues(int num_values) RESTRICT;
+
  private:
   /// Implementation of DecodeValues, templated by ENCODING.
   template <parquet::Encoding::type ENCODING>
diff --git a/be/src/util/CMakeLists.txt b/be/src/util/CMakeLists.txt
index 1405dd8..e80560e 100644
--- a/be/src/util/CMakeLists.txt
+++ b/be/src/util/CMakeLists.txt
@@ -113,6 +113,7 @@ target_link_libraries(loggingsupport ${IMPALA_LINK_LIBS_DYNAMIC_TARGETS})
 ADD_BE_LSAN_TEST(benchmark-test)
 ADD_BE_LSAN_TEST(bitmap-test)
 ADD_BE_LSAN_TEST(bit-packing-test)
+ADD_BE_LSAN_TEST(bit-stream-utils-test)
 ADD_BE_LSAN_TEST(bit-util-test)
 ADD_BE_LSAN_TEST(blocking-queue-test)
 ADD_BE_LSAN_TEST(bloom-filter-test)
diff --git a/be/src/util/bit-stream-utils-test.cc b/be/src/util/bit-stream-utils-test.cc
new file mode 100644
index 0000000..56879e7
--- /dev/null
+++ b/be/src/util/bit-stream-utils-test.cc
@@ -0,0 +1,207 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "testutil/gtest-util.h"
+#include "util/bit-packing.inline.h"
+#include "util/bit-stream-utils.inline.h"
+
+#include "common/names.h"
+
+namespace impala {
+
+const int MAX_WIDTH = BatchedBitReader::MAX_BITWIDTH;
+
+TEST(BitArray, TestBool) {
+  const int len = 8;
+  uint8_t buffer[len];
+
+  BitWriter writer(buffer, len);
+
+  // Write alternating 0's and 1's
+  for (int i = 0; i < 8; ++i) {
+    bool result = writer.PutValue(static_cast<uint64_t>(i % 2), 1);
+    EXPECT_TRUE(result);
+  }
+  writer.Flush();
+  EXPECT_EQ((int)buffer[0], BOOST_BINARY(1 0 1 0 1 0 1 0));
+
+  // Write 00110011
+  for (int i = 0; i < 8; ++i) {
+    bool result;
+    switch (i) {
+      case 0:
+      case 1:
+      case 4:
+      case 5:
+        result = writer.PutValue(0, 1);
+        break;
+      default:
+        result = writer.PutValue(1, 1);
+        break;
+    }
+    EXPECT_TRUE(result);
+  }
+  writer.Flush();
+
+  // Validate the exact bit value
+  EXPECT_EQ((int)buffer[0], BOOST_BINARY(1 0 1 0 1 0 1 0));
+  EXPECT_EQ((int)buffer[1], BOOST_BINARY(1 1 0 0 1 1 0 0));
+
+  // Use the reader and validate
+  BatchedBitReader reader(buffer, len);
+
+  // Ensure it returns the same results after Reset().
+  for (int trial = 0; trial < 2; ++trial) {
+    bool batch_vals[16];
+    EXPECT_EQ(16, reader.UnpackBatch(1, 16, batch_vals));
+    for (int i = 0; i < 8; ++i)  EXPECT_EQ(batch_vals[i], i % 2);
+
+    for (int i = 0; i < 8; ++i) {
+      switch (i) {
+        case 0:
+        case 1:
+        case 4:
+        case 5:
+          EXPECT_EQ(batch_vals[8 + i], false);
+          break;
+        default:
+          EXPECT_EQ(batch_vals[8 + i], true);
+          break;
+      }
+    }
+    reader.Reset(buffer, len);
+  }
+}
+
+// Tests SkipBatch() by comparing a reader that decodes 'skip_at' values, skips
+// 'skip_count' values and decodes the rest against a reader that doesn't skip.
+template <typename T>
+void TestSkipping(const uint8_t* buffer, const int len, const int bit_width,
+    const int skip_at, const int skip_count) {
+  constexpr int MAX_LEN = 512;
+  DCHECK_EQ((skip_at * bit_width) % 8, 0);
+  DCHECK_EQ((skip_count * bit_width) % 8, 0);
+  const int value_count = len * 8 / bit_width;
+  DCHECK_LE(value_count, MAX_LEN);
+  const int num_after_skip = value_count - skip_at - skip_count;
+
+  T all_vals[MAX_LEN];
+  BatchedBitReader expected_reader(buffer, len);
+  EXPECT_EQ(value_count, expected_reader.UnpackBatch(bit_width, value_count, all_vals));
+
+  BatchedBitReader skipping_reader(buffer, len);
+  T vals[MAX_LEN];
+  skipping_reader.UnpackBatch(bit_width, skip_at, vals);
+  EXPECT_TRUE(skipping_reader.SkipBatch(bit_width, skip_count));
+  skipping_reader.UnpackBatch(bit_width, num_after_skip, vals + skip_at);
+
+  for (int i = 0; i < skip_at; ++i) EXPECT_EQ(all_vals[i], vals[i]) << i;
+  // Compare every value after the skipped range, not just the first 'len' indexes.
+  for (int i = skip_at + skip_count; i < value_count; ++i) {
+    EXPECT_EQ(all_vals[i], vals[i - skip_count]) << i;
+  }
+}
+
+TEST(BitArray, TestBoolSkip) {
+  const int len = 4;
+  uint8_t buffer[len];
+
+  BitWriter writer(buffer, len);
+  // Write 00000000 11111111 11111111 00000000
+  for (int i = 0; i < 8; ++i) ASSERT_TRUE(writer.PutValue(0, 1));
+  for (int i = 0; i < 16; ++i) ASSERT_TRUE(writer.PutValue(1, 1));
+  for (int i = 0; i < 8; ++i) ASSERT_TRUE(writer.PutValue(0, 1));
+  writer.Flush();
+
+  TestSkipping<bool>(buffer, len, 1, 0, 8);
+  TestSkipping<bool>(buffer, len, 1, 0, 16);
+  TestSkipping<bool>(buffer, len, 1, 8, 8);
+  TestSkipping<bool>(buffer, len, 1, 8, 16);
+  TestSkipping<bool>(buffer, len, 1, 16, 8);
+  TestSkipping<bool>(buffer, len, 1, 16, 16);
+}
+
+TEST(BitArray, TestIntSkip) {
+  constexpr int len = 512;
+  const int bit_width = 6;
+  uint8_t buffer[len];
+
+  BitWriter writer(buffer, len);
+  for (int i = 0; i < (1 << bit_width); ++i) ASSERT_TRUE(writer.PutValue(i, bit_width));
+  // Flush so that every value is in 'buffer' before the readers look at it.
+  writer.Flush();
+  int bytes_written = writer.bytes_written();
+  TestSkipping<int>(buffer, bytes_written, bit_width, 0, 4);
+  TestSkipping<int>(buffer, bytes_written, bit_width, 4, 4);
+  TestSkipping<int>(buffer, bytes_written, bit_width, 4, 8);
+  TestSkipping<int>(buffer, bytes_written, bit_width, 8, 56);
+}
+
+// Writes 'num_vals' values with width 'bit_width' and reads them back.
+void TestBitArrayValues(int bit_width, int num_vals) {
+  const int len = BitUtil::Ceil(bit_width * num_vals, 8);
+  const int64_t mod = bit_width == 64 ? 1 : 1LL << bit_width;
+
+  uint8_t buffer[(len > 0) ? len : 1];
+  BitWriter writer(buffer, len);
+  for (int i = 0; i < num_vals; ++i) {
+    bool result = writer.PutValue(i % mod, bit_width);
+    EXPECT_TRUE(result);
+  }
+  writer.Flush();
+  EXPECT_EQ(writer.bytes_written(), len);
+
+  BatchedBitReader reader(buffer, len);
+  BatchedBitReader reader2(reader); // Test copy constructor.
+  // Ensure it returns the same results after Reset().
+  for (int trial = 0; trial < 2; ++trial) {
+    // Unpack all values at once with one batched reader and in small batches with the
+    // other batched reader.
+    vector<int64_t> batch_vals(num_vals);
+    const int BATCH_SIZE = 32;
+    vector<int64_t> batch_vals2(BATCH_SIZE);
+    EXPECT_EQ(num_vals,
+        reader.UnpackBatch(bit_width, num_vals, batch_vals.data()));
+    for (int i = 0; i < num_vals; ++i) {
+      if (i % BATCH_SIZE == 0) {
+        int num_to_unpack = min(BATCH_SIZE, num_vals - i);
+        EXPECT_EQ(num_to_unpack,
+           reader2.UnpackBatch(bit_width, num_to_unpack, batch_vals2.data()));
+      }
+      EXPECT_EQ(i % mod, batch_vals[i]);
+      EXPECT_EQ(i % mod, batch_vals2[i % BATCH_SIZE]);
+    }
+    EXPECT_EQ(reader.bytes_left(), 0);
+    EXPECT_EQ(reader2.bytes_left(), 0);
+    reader.Reset(buffer, len);
+    reader2.Reset(buffer, len);
+  }
+}
+
+TEST(BitArray, TestValues) {
+  for (int width = 0; width <= MAX_WIDTH; ++width) {
+    TestBitArrayValues(width, 1);
+    TestBitArrayValues(width, 2);
+    // Don't write too many values
+    TestBitArrayValues(width, (width < 12) ? (1 << width) : 4096);
+    TestBitArrayValues(width, 1024);
+  }
+}
+
+}
+
+IMPALA_TEST_MAIN();
diff --git a/be/src/util/bit-stream-utils.h b/be/src/util/bit-stream-utils.h
index a7ef292..9889f35 100644
--- a/be/src/util/bit-stream-utils.h
+++ b/be/src/util/bit-stream-utils.h
@@ -133,6 +133,11 @@ class BatchedBitReader {
   template<typename T>
   int UnpackBatch(int bit_width, int num_values, T* v);
 
+  /// Skip 'num_values_to_skip' bit-packed values.
+  /// 'num_values_to_skip * bit_width' must either be divisible by 8, or
+  /// 'num_values_to_skip' must equal the count of the remaining bit-packed values.
+  bool SkipBatch(int bit_width, int num_values_to_skip);
+
   /// Unpack bit-packed values in the same way as UnpackBatch() and decode them using the
   /// dictionary 'dict' with 'dict_len' entries. Return -1 if a decoding error is
   /// encountered, i.e. if the bit-packed values are not valid indices in 'dict'.
diff --git a/be/src/util/bit-stream-utils.inline.h b/be/src/util/bit-stream-utils.inline.h
index 48f52da..08f1700 100644
--- a/be/src/util/bit-stream-utils.inline.h
+++ b/be/src/util/bit-stream-utils.inline.h
@@ -102,6 +102,18 @@ inline int BatchedBitReader::UnpackBatch(int bit_width, int num_values, T* v) {
   return static_cast<int>(num_read);
 }
 
+inline bool BatchedBitReader::SkipBatch(int bit_width, int num_values_to_skip) {
+  DCHECK(buffer_pos_ != nullptr);
+  DCHECK_GT(bit_width, 0);
+  DCHECK_LE(bit_width, MAX_BITWIDTH);
+  DCHECK_GT(num_values_to_skip, 0);
+  // Use 64-bit math: 'bit_width * num_values_to_skip' can overflow an int.
+  int64_t skip_bytes = BitUtil::Ceil(int64_t{bit_width} * num_values_to_skip, 8);
+  if (skip_bytes > buffer_end_ - buffer_pos_) return false;
+  buffer_pos_ += skip_bytes;
+  return true;
+}
+
 template <typename T>
 inline int BatchedBitReader::UnpackAndDecodeBatch(
     int bit_width, T* dict, int64_t dict_len, int num_values, T* v, int64_t stride) {
diff --git a/be/src/util/dict-encoding.h b/be/src/util/dict-encoding.h
index cc7ef82..78dc42f 100644
--- a/be/src/util/dict-encoding.h
+++ b/be/src/util/dict-encoding.h
@@ -366,6 +366,9 @@ class DictDecoder : public DictDecoderBase {
     return sizeof(T) * dict_.size();
   }
 
+  /// Skip 'num_values' values from the input.
+  bool SkipValues(int64_t num_values) WARN_UNUSED_RESULT;
+
  private:
   /// List of decoded values stored in the dict_
   std::vector<T> dict_;
@@ -536,6 +539,23 @@ ALWAYS_INLINE inline bool DictDecoder<T>::GetNextValues(
 }
 
 template <typename T>
+ALWAYS_INLINE inline bool DictDecoder<T>::SkipValues(int64_t num_values) {
+  int64_t num_remaining = num_values;
+  if (num_repeats_ > 0) {
+    int64_t num_to_skip = std::min(num_remaining, num_repeats_);
+    num_repeats_ -= num_to_skip;
+    num_remaining -= num_to_skip;
+  } else if (next_literal_idx_ < num_literal_values_) {
+    int64_t num_to_skip = std::min<int64_t>(num_literal_values_ -
+        next_literal_idx_, num_remaining);
+    next_literal_idx_ += num_to_skip;
+    num_remaining -= num_to_skip;
+  }
+  if (num_remaining > 0) return data_decoder_.SkipValues(num_remaining) == num_remaining;
+  return true;
+}
+
+template <typename T>
 uint32_t DictDecoder<T>::CopyLiteralsToOutput(
     uint32_t max_to_copy, StrideWriter<T>* out) {
   uint32_t num_to_copy =
diff --git a/be/src/util/dict-test.cc b/be/src/util/dict-test.cc
index c30c30b..ebfba11 100644
--- a/be/src/util/dict-test.cc
+++ b/be/src/util/dict-test.cc
@@ -32,9 +32,29 @@
 
 namespace impala {
 
+// Helper function to validate that 'decoder' decodes the expected values.
+// If 'skip_at' and 'skip_count' aren't zero, then the value skipping logic
+// is also exercised.
+template<typename InternalType>
+void ValidateValues(DictDecoder<InternalType>& decoder,
+const vector<InternalType>& values, int skip_at, int skip_count, bool skip_success) {
+  for (int i = 0; i < skip_at; ++i) {
+    InternalType j;
+    ASSERT_TRUE(decoder.GetNextValue(&j));
+    EXPECT_EQ(values[i], j) << i;
+  }
+  EXPECT_EQ(decoder.SkipValues(skip_count), skip_success);
+  for (int i = skip_at + skip_count; i < values.size(); ++i) {
+    InternalType j;
+    ASSERT_TRUE(decoder.GetNextValue(&j));
+    EXPECT_EQ(values[i], j) << i;
+  }
+}
+
 template<typename InternalType, parquet::Type::type PARQUET_TYPE>
 void ValidateDict(const vector<InternalType>& values,
-    const vector<InternalType>& dict_values, int fixed_buffer_byte_size) {
+    const vector<InternalType>& dict_values, int fixed_buffer_byte_size,
+    int skip_at = 0, int skip_count = 0, bool skip_success = true) {
   set<InternalType> values_set(values.begin(), values.end());
 
   int bytes_alloc = 0;
@@ -51,7 +71,7 @@ void ValidateDict(const vector<InternalType>& values,
   uint8_t dict_buffer[encoder.dict_encoded_size()];
   encoder.WriteDict(dict_buffer);
 
-  int data_buffer_len = encoder.EstimatedDataEncodedSize();
+  int data_buffer_len = encoder.EstimatedDataEncodedSize() * 2;
   uint8_t data_buffer[data_buffer_len];
   int data_len = encoder.WriteData(data_buffer, data_buffer_len);
   EXPECT_GT(data_len, 0);
@@ -74,13 +94,7 @@ void ValidateDict(const vector<InternalType>& values,
   }
   // Test access to dictionary via internal stream
   ASSERT_OK(decoder.SetData(data_buffer, data_len));
-  int count = 0;
-  for (InternalType i : values) {
-    InternalType j;
-    ASSERT_TRUE(decoder.GetNextValue(&j));
-    EXPECT_EQ(i, j) << count;
-    ++count;
-  }
+  ValidateValues(decoder, values, skip_at, skip_count, skip_success);
   pool.FreeAll();
 }
 
@@ -322,6 +336,66 @@ TEST(DictTest, DecodeErrors) {
   }
 }
 
+TEST(DictTest, TestSkippingValues) {
+  auto ValidateSkipping = [](const vector<int32_t>& values,
+      const vector<int32_t>& dict_values, int skip_at, int skip_count,
+      bool skip_success = true) {
+    const int value_byte_size = ParquetPlainEncoder::EncodedByteSize(
+        ColumnType(TYPE_INT));
+    ValidateDict<int32_t, parquet::Type::INT32>(values, dict_values, value_byte_size,
+        skip_at, skip_count, skip_success);
+  };
+
+  vector<int32_t> literal_values;
+  for (int i = 0; i < 200; ++i) literal_values.push_back(i);
+  ValidateSkipping(literal_values, literal_values, 0, 4);
+  ValidateSkipping(literal_values, literal_values, 0, 130);
+  ValidateSkipping(literal_values, literal_values, 2, 4);
+  ValidateSkipping(literal_values, literal_values, 4, 48);
+  ValidateSkipping(literal_values, literal_values, 7, 130);
+  // Skipping too many values should fail
+  ValidateSkipping(literal_values, literal_values, 4, 300, false);
+
+  vector<int32_t> repeated_values(200, 1000);
+  ValidateSkipping(repeated_values, {1000}, 0, 4);
+  ValidateSkipping(repeated_values, {1000}, 0, 49);
+  ValidateSkipping(repeated_values, {1000}, 0, 145);
+  ValidateSkipping(repeated_values, {1000}, 3, 4);
+  ValidateSkipping(repeated_values, {1000}, 4, 49);
+  ValidateSkipping(repeated_values, {1000}, 4, 150);
+  // Skipping too many values should fail
+  ValidateSkipping(repeated_values, {1000}, 4, 300, false);
+
+  auto Concat = [](const vector<int32_t>& a, const vector<int32_t>& b) {
+    vector<int32_t> ab(a);
+    ab.insert(ab.end(), b.begin(), b.end());
+    return ab;
+  };
+  vector<int32_t> literal_then_repeated = Concat(literal_values, repeated_values);
+  vector<int32_t> literal_then_repeated_dict = Concat(literal_values, {1000});
+  ValidateSkipping(literal_then_repeated, literal_then_repeated_dict, 0, 4);
+  ValidateSkipping(literal_then_repeated, literal_then_repeated_dict, 0, 87);
+  ValidateSkipping(literal_then_repeated, literal_then_repeated_dict, 0, 200);
+  ValidateSkipping(literal_then_repeated, literal_then_repeated_dict, 0, 222);
+  ValidateSkipping(literal_then_repeated, literal_then_repeated_dict, 4, 19);
+  ValidateSkipping(literal_then_repeated, literal_then_repeated_dict, 4, 200);
+  ValidateSkipping(literal_then_repeated, literal_then_repeated_dict, 200, 47);
+  ValidateSkipping(literal_then_repeated, literal_then_repeated_dict, 234, 166);
+
+  vector<int32_t> repeated_then_literal = Concat(repeated_values, literal_values);
+  vector<int32_t> repeated_then_literal_dict = Concat({1000}, literal_values);
+  ValidateSkipping(repeated_then_literal, repeated_then_literal_dict, 0, 4);
+  ValidateSkipping(repeated_then_literal, repeated_then_literal_dict, 0, 89);
+  ValidateSkipping(repeated_then_literal, repeated_then_literal_dict, 0, 200);
+  ValidateSkipping(repeated_then_literal, repeated_then_literal_dict, 0, 232);
+  ValidateSkipping(repeated_then_literal, repeated_then_literal_dict, 4, 8);
+  ValidateSkipping(repeated_then_literal, repeated_then_literal_dict, 4, 88);
+  ValidateSkipping(repeated_then_literal, repeated_then_literal_dict, 4, 288);
+  ValidateSkipping(repeated_then_literal, repeated_then_literal_dict, 230, 11);
+  ValidateSkipping(repeated_then_literal, repeated_then_literal_dict, 230, 79);
+  ValidateSkipping(repeated_then_literal, repeated_then_literal_dict, 230, 170);
+}
+
 }
 
 IMPALA_TEST_MAIN();
diff --git a/be/src/util/rle-encoding.h b/be/src/util/rle-encoding.h
index 7dc05ab..21adc02 100644
--- a/be/src/util/rle-encoding.h
+++ b/be/src/util/rle-encoding.h
@@ -143,7 +143,17 @@ class RleBatchDecoder {
   /// Returns the number of consumed values or 0 if an error occurred.
   int32_t GetValues(int32_t num_values_to_consume, T* values);
 
+  /// Skip 'num_values' values.
+  /// Returns the number of skipped values or 0 if an error occurred.
+  int32_t SkipValues(int32_t num_values);
+
  private:
+  /// Skip 'num_literals_to_skip' literals.
+  bool SkipLiteralValues(int32_t num_literals_to_skip) WARN_UNUSED_RESULT;
+
+  /// Skip 'num_values' repeated values.
+  void SkipRepeatedValues(int32_t num_values);
+
   BatchedBitReader bit_reader_;
 
   /// Number of bits needed to encode the value. Must be between 0 and 64 after
@@ -191,6 +201,9 @@ class RleBatchDecoder {
   /// 'literal_count_'. Returns the number of literals outputted.
   int32_t OutputBufferedLiterals(int32_t max_to_output, T* values);
 
+  /// Skip up to 'max_to_skip' buffered literals. Returns the number of literals skipped.
+  int32_t SkipBufferedLiterals(int32_t max_to_skip);
+
   /// Output buffered literals, advancing 'literal_buffer_pos_' and decrementing
   /// 'literal_count_'. Returns the number of literals outputted or 0 if a
   /// decoding error is encountered.
@@ -619,6 +632,13 @@ inline T RleBatchDecoder<T>::GetRepeatedValue(int32_t num_repeats_to_consume) {
 }
 
 template <typename T>
+inline void RleBatchDecoder<T>::SkipRepeatedValues(int32_t num_values) {
+  DCHECK_GT(num_values, 0);
+  DCHECK_GE(repeat_count_, num_values);
+  repeat_count_ -= num_values;
+}
+
+template <typename T>
 inline int32_t RleBatchDecoder<T>::NextNumLiterals() {
   if (literal_count_ > 0) return literal_count_;
   if (repeat_count_ == 0) NextCounts();
@@ -663,6 +683,34 @@ inline bool RleBatchDecoder<T>::GetLiteralValues(
 }
 
 template <typename T>
+inline bool RleBatchDecoder<T>::SkipLiteralValues(int32_t num_literals_to_skip) {
+  DCHECK_GT(num_literals_to_skip, 0);
+  DCHECK_GE(literal_count_, num_literals_to_skip);
+  DCHECK(!HaveBufferedLiterals());
+
+  int32_t num_remaining = num_literals_to_skip;
+  // Need to round to a batch of 32 if the caller is skipping only part of the current
+  // run to avoid ending on a non-byte boundary.
+  int32_t num_to_skip = std::min<int32_t>(literal_count_,
+      BitUtil::RoundDownToPowerOf2(num_remaining, 32));
+  if (num_to_skip > 0) {
+    // The input may be truncated; report that instead of pretending the skip worked.
+    if (UNLIKELY(!bit_reader_.SkipBatch(bit_width_, num_to_skip))) return false;
+    literal_count_ -= num_to_skip;
+    num_remaining -= num_to_skip;
+  }
+
+  if (num_remaining > 0) {
+    // Earlier we called RoundDownToPowerOf2() to skip literals that fit on byte boundary.
+    // But some literals still need to be skipped. Let's fill the literal buffer
+    // and skip 'num_remaining' values.
+    if (UNLIKELY(!FillLiteralBuffer())) return false;
+    if (SkipBufferedLiterals(num_remaining) != num_remaining) return false;
+  }
+  return true;
+}
+
+template <typename T>
 template <typename OutType>
 inline bool RleBatchDecoder<T>::DecodeLiteralValues(int32_t num_literals_to_consume,
     OutType* dict, int64_t dict_len, StrideWriter<OutType>* RESTRICT out) {
@@ -774,6 +822,16 @@ inline int32_t RleBatchDecoder<T>::OutputBufferedLiterals(
 }
 
 template <typename T>
+inline int32_t RleBatchDecoder<T>::SkipBufferedLiterals(
+    int32_t max_to_skip) {
+  int32_t num_to_skip =
+      std::min<int32_t>(max_to_skip, num_buffered_literals_ - literal_buffer_pos_);
+  literal_buffer_pos_ += num_to_skip;
+  literal_count_ -= num_to_skip;
+  return num_to_skip;
+}
+
+template <typename T>
 template <typename OutType>
 inline int32_t RleBatchDecoder<T>::DecodeBufferedLiterals(int32_t max_to_output,
     OutType* dict, int64_t dict_len, StrideWriter<OutType>* RESTRICT out) {
@@ -823,6 +881,35 @@ inline int32_t RleBatchDecoder<T>::GetValues(int32_t num_values_to_consume, T* v
 }
 
 template <typename T>
+inline int32_t RleBatchDecoder<T>::SkipValues(int32_t num_values) {
+  DCHECK_GT(num_values, 0);
+
+  int32_t num_skipped = 0;
+  if (HaveBufferedLiterals()) {
+    num_skipped = SkipBufferedLiterals(num_values);
+  }
+  while (num_skipped < num_values) {
+    // Skip RLE encoded values
+    int32_t num_repeats = NextNumRepeats();
+    if (num_repeats > 0) {
+       int32_t num_repeats_to_consume =
+           std::min(num_repeats, num_values - num_skipped);
+       SkipRepeatedValues(num_repeats_to_consume);
+       num_skipped += num_repeats_to_consume;
+       continue;
+    }
+
+    // Skip literals
+    int32_t num_literals = NextNumLiterals();
+    if (num_literals == 0) break;
+    int32_t num_literals_to_skip = std::min(num_literals, num_values - num_skipped);
+    if (!SkipLiteralValues(num_literals_to_skip)) return 0;
+    num_skipped += num_literals_to_skip;
+  }
+  return num_skipped;
+}
+
+template <typename T>
 constexpr int RleBatchDecoder<T>::LITERAL_BUFFER_LEN;
 }
 
diff --git a/be/src/util/rle-test.cc b/be/src/util/rle-test.cc
index 4406e46..939fe19 100644
--- a/be/src/util/rle-test.cc
+++ b/be/src/util/rle-test.cc
@@ -21,10 +21,11 @@
 
 #include <boost/utility.hpp>
 #include <math.h>
+#include <random>
 
 #include "testutil/gtest-util.h"
 #include "util/bit-packing.inline.h"
-#include "util/bit-stream-utils.inline.h"
+#include "util/bit-stream-utils.h"
 #include "util/rle-encoding.h"
 
 #include "common/names.h"
@@ -33,119 +34,6 @@ namespace impala {
 
 const int MAX_WIDTH = BatchedBitReader::MAX_BITWIDTH;
 
-TEST(BitArray, TestBool) {
-  const int len = 8;
-  uint8_t buffer[len];
-
-  BitWriter writer(buffer, len);
-
-  // Write alternating 0's and 1's
-  for (int i = 0; i < 8; ++i) {
-    bool result = writer.PutValue(static_cast<uint64_t>(i % 2), 1);
-    EXPECT_TRUE(result);
-  }
-  writer.Flush();
-  EXPECT_EQ((int)buffer[0], BOOST_BINARY(1 0 1 0 1 0 1 0));
-
-  // Write 00110011
-  for (int i = 0; i < 8; ++i) {
-    bool result;
-    switch (i) {
-      case 0:
-      case 1:
-      case 4:
-      case 5:
-        result = writer.PutValue(0, 1);
-        break;
-      default:
-        result = writer.PutValue(1, 1);
-        break;
-    }
-    EXPECT_TRUE(result);
-  }
-  writer.Flush();
-
-  // Validate the exact bit value
-  EXPECT_EQ((int)buffer[0], BOOST_BINARY(1 0 1 0 1 0 1 0));
-  EXPECT_EQ((int)buffer[1], BOOST_BINARY(1 1 0 0 1 1 0 0));
-
-  // Use the reader and validate
-  BatchedBitReader reader(buffer, len);
-
-  // Ensure it returns the same results after Reset().
-  for (int trial = 0; trial < 2; ++trial) {
-    bool batch_vals[16];
-    EXPECT_EQ(16, reader.UnpackBatch(1, 16, batch_vals));
-    for (int i = 0; i < 8; ++i)  EXPECT_EQ(batch_vals[i], i % 2);
-
-    for (int i = 0; i < 8; ++i) {
-      switch (i) {
-        case 0:
-        case 1:
-        case 4:
-        case 5:
-          EXPECT_EQ(batch_vals[8 + i], false);
-          break;
-        default:
-          EXPECT_EQ(batch_vals[8 + i], true);
-          break;
-      }
-    }
-    reader.Reset(buffer, len);
-  }
-}
-
-// Writes 'num_vals' values with width 'bit_width' and reads them back.
-void TestBitArrayValues(int bit_width, int num_vals) {
-  const int len = BitUtil::Ceil(bit_width * num_vals, 8);
-  const int64_t mod = bit_width == 64 ? 1 : 1LL << bit_width;
-
-  uint8_t buffer[(len > 0) ? len : 1];
-  BitWriter writer(buffer, len);
-  for (int i = 0; i < num_vals; ++i) {
-    bool result = writer.PutValue(i % mod, bit_width);
-    EXPECT_TRUE(result);
-  }
-  writer.Flush();
-  EXPECT_EQ(writer.bytes_written(), len);
-
-  BatchedBitReader reader(buffer, len);
-  BatchedBitReader reader2(reader); // Test copy constructor.
-  // Ensure it returns the same results after Reset().
-  for (int trial = 0; trial < 2; ++trial) {
-    // Unpack all values at once with one batched reader and in small batches with the
-    // other batched reader.
-    vector<int64_t> batch_vals(num_vals);
-    const int BATCH_SIZE = 32;
-    vector<int64_t> batch_vals2(BATCH_SIZE);
-    EXPECT_EQ(num_vals,
-        reader.UnpackBatch(bit_width, num_vals, batch_vals.data()));
-    for (int i = 0; i < num_vals; ++i) {
-      if (i % BATCH_SIZE == 0) {
-        int num_to_unpack = min(BATCH_SIZE, num_vals - i);
-        EXPECT_EQ(num_to_unpack,
-           reader2.UnpackBatch(bit_width, num_to_unpack, batch_vals2.data()));
-      }
-      EXPECT_EQ(i % mod, batch_vals[i]);
-      EXPECT_EQ(i % mod, batch_vals2[i % BATCH_SIZE]);
-    }
-    EXPECT_EQ(reader.bytes_left(), 0);
-    EXPECT_EQ(reader2.bytes_left(), 0);
-    reader.Reset(buffer, len);
-    reader2.Reset(buffer, len);
-  }
-}
-
-TEST(BitArray, TestValues) {
-  for (int width = 0; width <= MAX_WIDTH; ++width) {
-    TestBitArrayValues(width, 1);
-    TestBitArrayValues(width, 2);
-    // Don't write too many values
-    TestBitArrayValues(width, (width < 12) ? (1 << width) : 4096);
-    TestBitArrayValues(width, 1024);
-  }
-}
-
 class RleTest : public ::testing::Test {
  protected:
   /// All the legal values for min_repeated_run_length to pass to RleEncoder() in tests.
@@ -188,12 +76,35 @@ class RleTest : public ::testing::Test {
     return true;
   }
 
+  /// Decodes 'skip_at' values via GetRleValues(), skips 'skip_count' values, then
+  /// decodes the rest of the 'num_vals' values (the skipped ones count towards it).
+  template <typename T>
+  static bool GetRleValuesSkip(RleBatchDecoder<T>* decoder, int num_vals, T* vals,
+      int skip_at, int skip_count) {
+    const int num_after_skip = num_vals - (skip_at + skip_count);
+    return GetRleValues(decoder, skip_at, vals)
+        && decoder->SkipValues(skip_count) == skip_count
+        && GetRleValues(decoder, num_after_skip, vals + skip_at);
+  }
+
   /// Get many values from a batch RLE decoder using its GetValues() function.
   template <typename T>
   static bool GetRleValuesBatched(RleBatchDecoder<T>* decoder, int num_vals, T* vals) {
     return num_vals == decoder->GetValues(num_vals, vals);
   }
 
+  /// Like GetRleValuesBatched(), but skips 'skip_count' values after 'skip_at' values.
+  template <typename T>
+  static bool GetRleValuesBatchedSkip(RleBatchDecoder<T>* decoder, int num_vals, T* vals,
+      int skip_at, int skip_count) {
+    const int num_before = skip_at > 0 ? decoder->GetValues(skip_at, vals) : 0;
+    if (decoder->SkipValues(skip_count) != skip_count) return false;
+    const int num_remaining = num_vals - num_before - skip_count;
+    if (num_remaining <= 0) return num_remaining == 0;
+    T* const tail = vals + skip_at;
+    return decoder->GetValues(num_remaining, tail) == num_remaining;
+  }
+
   // Validates encoding of values by encoding and decoding them.
   // If expected_encoding != NULL, validates that the encoded buffer is
   // exactly 'expected_encoding'.
@@ -260,6 +171,72 @@ class RleTest : public ::testing::Test {
     return encoded_len;
   }
 
+  /// Encodes 'values', then checks that decoding 'skip_at' values, skipping 'skip_count'
+  /// values and decoding the rest yields 'values' without the skipped range. Verified
+  /// per value, per run and batched, before and after Reset(). 'seed' only appears in
+  /// failure messages. Returns the encoded length.
+  int ValidateRleSkip(const vector<int>& values, int bit_width,
+      int min_repeated_run_length, int skip_at, int skip_count, unsigned int seed=0) {
+    stringstream ss;
+    ss << "bit_width=" << bit_width
+       << " min_repeated_run_length_=" << min_repeated_run_length
+       << " skip_at=" << skip_at
+       << " skip_count=" << skip_count
+       << " values.size()=" << values.size()
+       << " seed=" << seed;
+    const string& description = ss.str();
+    const int len = 64 * 1024;
+    uint8_t buffer[len];
+
+    RleEncoder encoder(buffer, len, bit_width, min_repeated_run_length);
+    for (int value : values) EXPECT_TRUE(encoder.Put(value));
+    const int encoded_len = encoder.Flush();
+
+    vector<int> expected_values(values.begin(), values.begin() + skip_at);
+    for (int i = skip_at + skip_count; i < values.size(); ++i) {
+      expected_values.push_back(values[i]);
+    }
+
+    // Verify read.
+    RleBatchDecoder<uint64_t> per_value_decoder(buffer, len, bit_width);
+    RleBatchDecoder<uint64_t> per_run_decoder(buffer, len, bit_width);
+    RleBatchDecoder<uint64_t> batch_decoder(buffer, len, bit_width);
+    // Ensure it returns the same results after Reset().
+    for (int trial = 0; trial < 2; ++trial) {
+      for (int i = 0; i < skip_at; ++i) {
+        uint64_t val;
+        EXPECT_TRUE(per_value_decoder.GetSingleValue(&val)) << description;
+        EXPECT_EQ(expected_values[i], val) << description << " i=" << i << " trial="
+            << trial;
+      }
+      // Check the skip itself; a short skip would otherwise only show up as mismatches.
+      EXPECT_EQ(skip_count, per_value_decoder.SkipValues(skip_count)) << description;
+      for (int i = skip_at; i < expected_values.size(); ++i) {
+        uint64_t val;
+        EXPECT_TRUE(per_value_decoder.GetSingleValue(&val)) << description;
+        EXPECT_EQ(expected_values[i], val) << description << " i=" << i << " trial="
+            << trial;
+      }
+      // Unpack everything at once from the other decoders.
+      vector<uint64_t> decoded_values1(expected_values.size());
+      vector<uint64_t> decoded_values2(expected_values.size());
+      EXPECT_TRUE(
+          GetRleValuesSkip(&per_run_decoder, values.size(),
+              decoded_values1.data(), skip_at, skip_count)) << description;
+      EXPECT_TRUE(
+          GetRleValuesBatchedSkip(&batch_decoder, values.size(),
+              decoded_values2.data(), skip_at, skip_count)) << description;
+      for (int i = 0; i < expected_values.size(); ++i) {
+        EXPECT_EQ(expected_values[i], decoded_values1[i]) << description << " i=" << i;
+        EXPECT_EQ(expected_values[i], decoded_values2[i]) << description << " i=" << i;
+      }
+      per_value_decoder.Reset(buffer, len, bit_width);
+      per_run_decoder.Reset(buffer, len, bit_width);
+      batch_decoder.Reset(buffer, len, bit_width);
+    }
+    return encoded_len;
+  }
+
   // ValidateRle on 'num_vals' values with width 'bit_width'. If 'value' != -1, that value
   // is used, otherwise alternating values are used.
   void TestRleValues(int bit_width, int num_vals, int value = -1) {
@@ -282,23 +259,69 @@ class RleTest : public ::testing::Test {
   }
 
   /// Make a sequence of values.
-  /// literalLength1: the length of an initial literal sequence.
-  /// repeatedLength: the length of a repeated sequence.
-  /// literalLength2: the length of a closing literal sequence.
-  vector<int>& MakeSequence(vector<int>& values, int intitial_literal_length,
-      int repeated_length, int trailing_literal_length) {
+  /// intitial_literal_length: the length of an initial literal sequence.
+  /// repeated_length: the length of a repeated sequence.
+  /// trailing_literal_length: the length of a closing literal sequence.
+  /// bit_width: the bit length of the values being used.
+  vector<int>& MakeSequenceBitWidth(vector<int>& values, int intitial_literal_length,
+      int repeated_length, int trailing_literal_length, int bit_width) {
+    const int64_t mod = 1LL << bit_width;
     values.clear();
     for (int i = 0; i < intitial_literal_length; ++i) {
-      values.push_back(i % 2);
+      values.push_back(i % mod);
     }
     for (int i = 0; i < repeated_length; ++i) {
       values.push_back(1);
     }
     for (int i = 0; i < trailing_literal_length; ++i) {
-      values.push_back(i % 2);
+      values.push_back(i % mod);
     }
     return values;
   }
+
+  /// Convenience wrapper: MakeSequenceBitWidth() for values that are 1 bit wide.
+  vector<int>& MakeSequence(vector<int>& values, int intitial_literal_length,
+      int repeated_length, int trailing_literal_length) {
+    return MakeSequenceBitWidth(
+        values, intitial_literal_length, repeated_length, trailing_literal_length, 1);
+  }
+
+  /// Generates 'total_length' values made up of repeated and literal runs whose random
+  /// lengths are at most 'max_run_length'. Values wrap around at 2^'bit_width'. The
+  /// random generation is seeded by 'seed' to allow deterministic behavior.
+  vector<int> MakeRandomSequence(unsigned int seed, int total_length, int max_run_length,
+      int bit_width) {
+    std::default_random_engine random_eng(seed);
+    auto NextRunLength = [&]() {
+      std::uniform_int_distribution<int> uni_dist(1, max_run_length);
+      return uni_dist(random_eng);
+    };
+    auto IsNextRunRepeated = [&random_eng]() {
+      std::uniform_int_distribution<int> uni_dist(0, 1);
+      return uni_dist(random_eng) == 0;
+    };
+    // 64-bit modulus: '1 << bit_width' on an int is undefined for bit_width == 32.
+    const int64_t mod = 1LL << bit_width;
+    auto NextVal = [mod](int val) { return static_cast<int>((val + 1LL) % mod); };
+
+    vector<int> ret;
+    int run_length = 0;
+    int val = 0;
+    bool is_repeated = false;
+    while (static_cast<int>(ret.size()) < total_length) {
+      // Start a new run with a random length and kind once the current one is used up.
+      if (run_length == 0) {
+        run_length = NextRunLength();
+        is_repeated = IsNextRunRepeated();
+        val = NextVal(val);
+      }
+      ret.push_back(val);
+      // Literal runs advance the value after every element; repeated runs keep it.
+      if (!is_repeated) val = NextVal(val);
+      --run_length;
+    }
+    return ret;
+  }
 };
 
 /// Basic test case for literal unpacking - two literals in a run.
@@ -312,6 +335,71 @@ TEST_F(RleTest, TwoLiteralRun) {
   }
 }
 
+TEST_F(RleTest, ValueSkipping) {
+  // {skip_at, skip_count} probes into the 300 values produced below by
+  // MakeSequenceBitWidth(seq, 100, 100, 100, bit_width).
+  static const int SKIP_PROBES[][2] = {
+      // Skips that start at the very first value.
+      {0, 7}, {0, 64},
+      {0, 75}, {0, 100},
+      {0, 105}, {0, 155},
+      {0, 200}, {0, 213},
+      {0, 267}, {0, 300},
+      // Skips that start within the first 100 values.
+      {7, 7}, {35, 64},
+      {55, 75}, {99, 100},
+      // Skips that start within the 100 repeated values.
+      {100, 11}, {101, 55},
+      {102, 155}, {104, 17},
+      {122, 178},
+      // Skips that start within the last 100 values.
+      {200, 3}, {200, 65},
+      {203, 17}, {215, 70},
+      {217, 83},
+  };
+  vector<int> seq;
+  for (int min_run_length : legal_min_run_lengths_) {
+    for (int bit_width : {1, 3, 7, 8, 20, 32}) {
+      MakeSequenceBitWidth(seq, 100, 100, 100, bit_width);
+      for (const auto& probe : SKIP_PROBES) {
+        ValidateRleSkip(seq, bit_width, min_run_length, probe[0], probe[1]);
+      }
+    }
+  }
+}
+
+// Fuzz test for value-skipping: randomly generated sequences are probed at random
+// skip positions with random skip counts. The seed is handed to ValidateRleSkip(),
+// which includes it in its failure messages.
+TEST_F(RleTest, ValueSkippingFuzzy) {
+  constexpr int NUM_SEQUENCES = 10;
+  constexpr int NUM_PROBES = 100;
+  constexpr int SEQUENCE_LENGTH = 2048;
+
+  const unsigned int seed = std::random_device()();
+  std::default_random_engine rng(seed);
+
+  // Draws an integer from the closed interval ['lo', 'hi'].
+  auto draw = [&rng](int lo, int hi) {
+    return std::uniform_int_distribution<int>(lo, hi)(rng);
+  };
+
+  for (int min_run_length : legal_min_run_lengths_) {
+    for (int n = 0; n < NUM_SEQUENCES; ++n) {
+      const int bit_width = draw(1, 32);
+      const int max_run_length = draw(5, 200);
+      const vector<int> seq =
+          MakeRandomSequence(seed, SEQUENCE_LENGTH, max_run_length, bit_width);
+      const int seq_len = seq.size();
+      for (int probe = 0; probe < NUM_PROBES; ++probe) {
+        const int skip_at = draw(0, seq_len - 1);
+        const int skip_count = draw(1, seq_len - skip_at);
+        ValidateRleSkip(seq, bit_width, min_run_length, skip_at, skip_count, seed);
+      }
+    }
+  }
+}
+
 TEST_F(RleTest, SpecificSequences) {
   const int len = 1024;
   uint8_t expected_buffer[len];