You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by ta...@apache.org on 2017/11/16 21:26:00 UTC

[1/3] incubator-impala git commit: Remove unused/defunct Maven repositories.

Repository: incubator-impala
Updated Branches:
  refs/heads/master 6769220e2 -> ae116b5bf


Remove unused/defunct Maven repositories.

Removes three Maven repositories. The davidtrott and Codehaus
repositories no longer exist, so they are not doing anyone any good. (We
had previously cleaned up Codehaus in IMPALA-5224, but a reference was
resurrected.) The libphonenumber repo was simply misconfigured: the
library is available from Maven Central in the "normal" place, so a
repository pointing at its subdirectory is unnecessary.

To test this, I ran "buildall" after removing ~/.m2/ on my machine.

Change-Id: I79eb6c483561726c7cbaf86874001f1979128720
Reviewed-on: http://gerrit.cloudera.org:8080/8497
Tested-by: Impala Public Jenkins
Reviewed-by: Alex Behm <al...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/155bb776
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/155bb776
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/155bb776

Branch: refs/heads/master
Commit: 155bb7764989c45f8274aac3196852a830402375
Parents: 6769220
Author: Philip Zeyliger <ph...@cloudera.com>
Authored: Mon Nov 6 16:10:58 2017 -0800
Committer: Alex Behm <al...@cloudera.com>
Committed: Thu Nov 16 15:51:48 2017 +0000

----------------------------------------------------------------------
 common/yarn-extras/pom.xml   |  8 --------
 fe/pom.xml                   | 11 -----------
 tests/test-hive-udfs/pom.xml |  5 -----
 3 files changed, 24 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/155bb776/common/yarn-extras/pom.xml
----------------------------------------------------------------------
diff --git a/common/yarn-extras/pom.xml b/common/yarn-extras/pom.xml
index 8f3ba4d..8b34174 100644
--- a/common/yarn-extras/pom.xml
+++ b/common/yarn-extras/pom.xml
@@ -71,14 +71,6 @@
         <enabled>false</enabled>
       </snapshots>
     </repository>
-
-    <repository>
-      <id>Codehaus repository</id>
-      <url>http://repository.codehaus.org/</url>
-      <snapshots>
-        <enabled>false</enabled>
-      </snapshots>
-    </repository>
   </repositories>
 
   <dependencies>

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/155bb776/fe/pom.xml
----------------------------------------------------------------------
diff --git a/fe/pom.xml b/fe/pom.xml
index a0330d7..85b16aa 100644
--- a/fe/pom.xml
+++ b/fe/pom.xml
@@ -743,17 +743,6 @@ under the License.
       </snapshots>
     </pluginRepository>
 
-    <pluginRepository>
-      <id>dtrott</id>
-      <url>http://maven.davidtrott.com/repository</url>
-      <releases>
-        <enabled>true</enabled>
-      </releases>
-      <snapshots>
-        <enabled>false</enabled>
-      </snapshots>
-    </pluginRepository>
-
     <!-- This is needed for the cup maven plugin. TODO add the plugin to our maven repo -->
     <pluginRepository>
       <id>sonatype-nexus-snapshots</id>

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/155bb776/tests/test-hive-udfs/pom.xml
----------------------------------------------------------------------
diff --git a/tests/test-hive-udfs/pom.xml b/tests/test-hive-udfs/pom.xml
index 7f18c7b..17979ed 100644
--- a/tests/test-hive-udfs/pom.xml
+++ b/tests/test-hive-udfs/pom.xml
@@ -99,10 +99,5 @@ under the License.
         <enabled>false</enabled>
       </snapshots>
     </repository>
-    <repository>
-      <id>com.google.libphonenumber</id>
-      <url>http://repo1.maven.org/maven2/com/googlecode/libphonenumber</url>
-      <name>Google phone number library</name>
-    </repository>
   </repositories>
 </project>


[2/3] incubator-impala git commit: IMPALA-4177, IMPALA-6039: batched bit reading and rle decoding

Posted by ta...@apache.org.
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ae116b5b/be/src/util/parquet-reader.cc
----------------------------------------------------------------------
diff --git a/be/src/util/parquet-reader.cc b/be/src/util/parquet-reader.cc
index d5b0d01..3c83c23 100644
--- a/be/src/util/parquet-reader.cc
+++ b/be/src/util/parquet-reader.cc
@@ -49,6 +49,7 @@ using namespace apache::thrift::protocol;
 using namespace apache::thrift::transport;
 using namespace apache::thrift;
 using namespace parquet;
+using impala::RleBatchDecoder;
 using std::min;
 
 using impala::PARQUET_VERSION_NUMBER;
@@ -119,15 +120,6 @@ string GetSchema(const FileMetaData& md) {
   return ss.str();
 }
 
-// Inherit from RleDecoder to get access to repeat_count_, which is protected.
-class ParquetLevelReader : public impala::RleDecoder {
- public:
-  ParquetLevelReader(uint8_t* buffer, int buffer_len, int bit_width)
-    : RleDecoder(buffer, buffer_len, bit_width) {}
-
-  uint32_t repeat_count() const { return repeat_count_; }
-};
-
 // Performs sanity checking on the contents of data pages, to ensure that:
 //   - Compressed pages can be uncompressed successfully.
 //   - The number of def levels matches num_values in the page header when using RLE.
@@ -163,18 +155,19 @@ int CheckDataPage(const ColumnChunk& col, const PageHeader& header, const uint8_
     // Parquet data pages always start with the encoded definition level data, and
     // RLE sections in Parquet always start with a 4 byte length followed by the data.
     int num_def_level_bytes = *reinterpret_cast<const int32_t*>(data);
-    ParquetLevelReader def_levels(const_cast<uint8_t*>(data) + sizeof(int32_t),
+    RleBatchDecoder<uint8_t> def_levels(const_cast<uint8_t*>(data) + sizeof(int32_t),
         num_def_level_bytes, sizeof(uint8_t));
     uint8_t level;
     for (int i = 0; i < header.data_page_header.num_values; ++i) {
-      if (!def_levels.Get(&level)) {
+      if (!def_levels.GetSingleValue(&level)) {
         cerr << "Error: Decoding of def levels failed.\n";
         exit(1);
       }
 
-      if (i + def_levels.repeat_count() + 1 > header.data_page_header.num_values) {
-        cerr << "Error: More def levels encoded (" << (i + def_levels.repeat_count() + 1)
-             << ") than num_values (" << header.data_page_header.num_values << ").\n";
+      if (i + def_levels.NextNumRepeats() + 1 > header.data_page_header.num_values) {
+        cerr << "Error: More def levels encoded ("
+              << (i + def_levels.NextNumRepeats() + 1) << ") than num_values ("
+              << header.data_page_header.num_values << ").\n";
         exit(1);
       }
     }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ae116b5b/be/src/util/rle-encoding.h
----------------------------------------------------------------------
diff --git a/be/src/util/rle-encoding.h b/be/src/util/rle-encoding.h
index 2e2dd7f..861bf8e 100644
--- a/be/src/util/rle-encoding.h
+++ b/be/src/util/rle-encoding.h
@@ -76,49 +76,116 @@ namespace impala {
 /// (total 26 bytes, 1 byte overhead)
 //
 
-/// Decoder class for RLE encoded data.
-class RleDecoder {
+/// RLE decoder with a batch-oriented interface that enables fast decoding.
+/// Users of this class must first initialize the class to point to a buffer of
+/// RLE-encoded data, passed into the constructor or Reset(). Then they can
+/// decode data by checking NextNumRepeats()/NextNumLiterals() to see if the
+/// next run is a repeated or literal run, then calling GetRepeatedValue()
+/// or GetLiteralValues() respectively to read the values.
+///
+/// End-of-input is signalled by NextNumRepeats() == NextNumLiterals() == 0.
+/// Other decoding errors are signalled by functions returning false. If an
+/// error is encountered then it is not valid to read any more data until
+/// Reset() is called.
+template <typename T>
+class RleBatchDecoder {
  public:
-  /// Create a decoder object. buffer/buffer_len is the decoded data.
-  /// bit_width is the width of each value (before encoding).
-  RleDecoder(uint8_t* buffer, int buffer_len, int bit_width)
-    : bit_reader_(buffer, buffer_len),
-      bit_width_(bit_width),
-      current_value_(0),
-      repeat_count_(0),
-      literal_count_(0) {
-    DCHECK_GE(bit_width_, 0);
-    DCHECK_LE(bit_width_, BitReader::MAX_BITWIDTH);
+  RleBatchDecoder(uint8_t* buffer, int buffer_len, int bit_width) {
+    Reset(buffer, buffer_len, bit_width);
   }
+  RleBatchDecoder() : bit_width_(-1) {}
+
+  /// Reset the decoder to read from a new buffer.
+  void Reset(uint8_t* buffer, int buffer_len, int bit_width);
+
+  /// Return the size of the current repeated run. Returns zero if the current run is
+  /// a literal run or if no more runs can be read from the input.
+  int32_t NextNumRepeats();
+
+  /// Get the value of the current repeated run and consume the given number of repeats.
+  /// Only valid to call when NextNumRepeats() > 0. The given number of repeats cannot
+  /// be greater than the remaining number of repeats in the run.
+  T GetRepeatedValue(int32_t num_repeats_to_consume);
+
+  /// Return the size of the current literal run. Returns zero if the current run is
+  /// a repeated run or if no more runs can be read from the input.
+  int32_t NextNumLiterals();
+
+  /// Consume 'num_literals_to_consume' literals from the current literal run,
+  /// copying the values to 'values'. 'num_literals_to_consume' must be <=
+  /// NextNumLiterals(). Returns true if the requested number of literals were
+  /// successfully read or false if an error was encountered, e.g. the input was
+  /// truncated.
+  bool GetLiteralValues(int32_t num_literals_to_consume, T* values) WARN_UNUSED_RESULT;
+
+  /// Consume 'num_literals_to_consume' literals from the current literal run,
+  /// decoding them using 'dict' and outputting them to 'values'.
+  /// 'num_literals_to_consume' must be <= NextNumLiterals(). Returns true if
+  /// the requested number of literals were successfully read or false if an error
+  /// was encountered, e.g. the input was truncated or the value was not present
+  /// in the dictionary. Errors can only be recovered from by calling Reset()
+  /// to read from a new buffer.
+  template <typename OutType>
+  bool DecodeLiteralValues(int32_t num_literals_to_consume, OutType* dict,
+      int64_t dict_len, OutType* values) WARN_UNUSED_RESULT;
+
+  /// Convenience method to get the next value. Not efficient. Returns true on success
+  /// or false if no more values can be read from the input or an error was encountered
+  /// decoding the values.
+  bool GetSingleValue(T* val) WARN_UNUSED_RESULT;
 
-  RleDecoder() : bit_width_(-1) {}
+ private:
+  BatchedBitReader bit_reader_;
 
-  void Reset(uint8_t* buffer, int buffer_len, int bit_width) {
-    DCHECK_GE(bit_width, 0);
-    DCHECK_LE(bit_width, BitReader::MAX_BITWIDTH);
-    bit_reader_.Reset(buffer, buffer_len);
-    bit_width_ = bit_width;
-    current_value_ = 0;
-    repeat_count_ = 0;
-    literal_count_ = 0;
-  }
+  /// Number of bits needed to encode the value. Must be between 0 and 64.
+  int bit_width_;
 
-  /// Gets the next value.  Returns false if there are no more.
-  template<typename T>
-  bool Get(T* val);
+  /// If a repeated run, the number of repeats remaining in the current run to be read.
+  /// If the current run is a literal run, this is 0.
+  int32_t repeat_count_;
 
- protected:
-  /// Fills literal_count_ and repeat_count_ with next values. Returns false if there
-  /// are no more.
-  template<typename T>
-  bool NextCounts();
+  /// If a literal run, the number of literals remaining in the current run to be read.
+  /// If the current run is a repeated run, this is 0.
+  int32_t literal_count_;
 
-  BitReader bit_reader_;
-  /// Number of bits needed to encode the value. Must be between 0 and 64.
-  int bit_width_;
-  uint64_t current_value_;
-  uint32_t repeat_count_;
-  uint32_t literal_count_;
+  /// If a repeated run, the current repeated value.
+  T repeated_value_;
+
+  /// Size of buffer for literal values. Large enough to decode a full batch of 32
+  /// literals. The buffer is needed to allow clients to read in batches that are not
+  /// multiples of 32.
+  static constexpr int LITERAL_BUFFER_LEN = 32;
+
+  /// Buffer containing 'num_buffered_literals_' values. 'literal_buffer_pos_' is the
+  /// position of the next literal to be read from the buffer.
+  T literal_buffer_[LITERAL_BUFFER_LEN];
+  int num_buffered_literals_;
+  int literal_buffer_pos_;
+
+  /// Called when both 'literal_count_' and 'repeat_count_' have been exhausted.
+  /// Sets either 'literal_count_' or 'repeat_count_' to the size of the next literal
+  /// or repeated run, or leaves both at 0 if no more values can be read (either because
+  /// the end of the input was reached or an error was encountered decoding).
+  void NextCounts();
+
+  /// Fill the literal buffer. Invalid to call if there are already buffered literals.
+  /// Return false if the input was truncated. This does not advance 'literal_count_'.
+  bool FillLiteralBuffer() WARN_UNUSED_RESULT;
+
+  bool HaveBufferedLiterals() const {
+    return literal_buffer_pos_ < num_buffered_literals_;
+  }
+
+  /// Output buffered literals, advancing 'literal_buffer_pos_' and decrementing
+  /// 'literal_count_'. Returns the number of literals outputted.
+  int32_t OutputBufferedLiterals(int32_t max_to_output, T* values);
+
+  /// Decode buffered literals as indices into 'dict', writing the looked-up values to
+  /// 'values', advancing 'literal_buffer_pos_' and decrementing 'literal_count_'.
+  /// Returns the number of literals outputted or 0 if an index is outside 'dict'.
+  template <typename OutType>
+  int32_t DecodeBufferedLiterals(
+      int32_t max_to_output, OutType* dict, int64_t dict_len, OutType* values);
 };
 
 /// Class to incrementally build the rle data.   This class does not allocate any memory.
@@ -153,7 +220,8 @@ class RleEncoder {
     int max_literal_run_size = 1 +
         BitUtil::Ceil(MAX_VALUES_PER_LITERAL_RUN * bit_width, 8);
     /// Up to MAX_VLQ_BYTE_LEN indicator and a single 'bit_width' value.
-    int max_repeated_run_size = BitReader::MAX_VLQ_BYTE_LEN + BitUtil::Ceil(bit_width, 8);
+    int max_repeated_run_size =
+        BatchedBitReader::MAX_VLQ_BYTE_LEN + BitUtil::Ceil(bit_width, 8);
     return std::max(max_literal_run_size, max_repeated_run_size);
   }
 
@@ -167,7 +235,7 @@ class RleEncoder {
 
   /// Encode value.  Returns true if the value fits in buffer, false otherwise.
   /// This value must be representable with bit_width_ bits.
-  bool Put(uint64_t value);
+  bool Put(uint64_t value) WARN_UNUSED_RESULT;
 
   /// Flushes any pending values to the underlying buffer.
   /// Returns the total number of bytes written
@@ -245,53 +313,6 @@ class RleEncoder {
   uint8_t* literal_indicator_byte_;
 };
 
-// Force inlining - this is used in perf-critical loops in Parquet and GCC often
-// doesn't inline it in cases where it's beneficial.
-template <typename T>
-ALWAYS_INLINE inline bool RleDecoder::Get(T* val) {
-  DCHECK_GE(bit_width_, 0);
-  // Profiling has shown that the quality and performance of the generated code is very
-  // sensitive to the exact shape of this check. For example, the version below performs
-  // significantly better than UNLIKELY(literal_count_ == 0 && repeat_count_ == 0)
-  if (repeat_count_ == 0) {
-    if (literal_count_ == 0) {
-      if (!NextCounts<T>()) return false;
-    }
-  }
-
-  if (LIKELY(repeat_count_ > 0)) {
-    *val = current_value_;
-    --repeat_count_;
-  } else {
-    DCHECK_GT(literal_count_, 0);
-    if (UNLIKELY(!bit_reader_.GetValue(bit_width_, val))) return false;
-    --literal_count_;
-  }
-
-  return true;
-}
-
-template<typename T>
-bool RleDecoder::NextCounts() {
-  // Read the next run's indicator int, it could be a literal or repeated run.
-  // The int is encoded as a vlq-encoded value.
-  int32_t indicator_value = 0;
-  if (UNLIKELY(!bit_reader_.GetVlqInt(&indicator_value))) return false;
-
-  // lsb indicates if it is a literal run or repeated run
-  bool is_literal = indicator_value & 1;
-  if (is_literal) {
-    literal_count_ = (indicator_value >> 1) * 8;
-    if (UNLIKELY(literal_count_ == 0)) return false;
-  } else {
-    repeat_count_ = indicator_value >> 1;
-    bool result = bit_reader_.GetAligned<T>(
-        BitUtil::Ceil(bit_width_, 8), reinterpret_cast<T*>(&current_value_));
-    if (UNLIKELY(!result || repeat_count_ == 0)) return false;
-  }
-  return true;
-}
-
 /// This function buffers input values 8 at a time.  After seeing all 8 values,
 /// it decides whether they should be encoded as a literal or repeated run.
 inline bool RleEncoder::Put(uint64_t value) {
@@ -444,5 +465,197 @@ inline void RleEncoder::Clear() {
   bit_writer_.Clear();
 }
 
+template <typename T>
+inline void RleBatchDecoder<T>::Reset(uint8_t* buffer, int buffer_len, int bit_width) {
+  DCHECK_GE(bit_width, 0);
+  DCHECK_LE(bit_width, BatchedBitReader::MAX_BITWIDTH);
+  bit_reader_.Reset(buffer, buffer_len);
+  bit_width_ = bit_width;
+  repeat_count_ = 0;
+  literal_count_ = 0;
+  num_buffered_literals_ = 0;
+  literal_buffer_pos_ = 0;
+}
+
+template <typename T>
+inline int32_t RleBatchDecoder<T>::NextNumRepeats() {
+  if (repeat_count_ > 0) return repeat_count_;
+  if (literal_count_ == 0) NextCounts();
+  return repeat_count_;
+}
+
+template <typename T>
+inline T RleBatchDecoder<T>::GetRepeatedValue(int32_t num_repeats_to_consume) {
+  DCHECK_GT(num_repeats_to_consume, 0);
+  DCHECK_GE(repeat_count_, num_repeats_to_consume);
+  repeat_count_ -= num_repeats_to_consume;
+  return repeated_value_;
+}
+
+template <typename T>
+inline int32_t RleBatchDecoder<T>::NextNumLiterals() {
+  if (literal_count_ > 0) return literal_count_;
+  if (repeat_count_ == 0) NextCounts();
+  return literal_count_;
 }
+
+template <typename T>
+inline bool RleBatchDecoder<T>::GetLiteralValues(
+    int32_t num_literals_to_consume, T* values) {
+  DCHECK_GE(num_literals_to_consume, 0);
+  DCHECK_GE(literal_count_, num_literals_to_consume);
+  int32_t num_consumed = 0;
+  // Copy any buffered literals left over from previous calls.
+  if (HaveBufferedLiterals()) {
+    num_consumed = OutputBufferedLiterals(num_literals_to_consume, values);
+  }
+
+  int32_t num_remaining = num_literals_to_consume - num_consumed;
+  // Copy literals directly to the output, bypassing 'literal_buffer_' when possible.
+  // Need to round to a batch of 32 if the caller is consuming only part of the current
+  // run to avoid ending on a non-byte boundary.
+  int32_t num_to_bypass = std::min<int32_t>(literal_count_,
+      BitUtil::RoundDownToPowerOf2(num_remaining, 32));
+  if (num_to_bypass > 0) {
+    int num_read =
+        bit_reader_.UnpackBatch(bit_width_, num_to_bypass, values + num_consumed);
+    // If we couldn't read the expected number, that means the input was truncated.
+    if (num_read < num_to_bypass) return false;
+    literal_count_ -= num_to_bypass;
+    num_consumed += num_to_bypass;
+    num_remaining = num_literals_to_consume - num_consumed;
+  }
+
+  if (num_remaining > 0) {
+    // We weren't able to copy all the literals requested directly from the input.
+    // Buffer literals and copy over the requested number.
+    if (UNLIKELY(!FillLiteralBuffer())) return false;
+    int32_t num_copied = OutputBufferedLiterals(num_remaining, values + num_consumed);
+    DCHECK_EQ(num_copied, num_remaining) << "Should have buffered enough literals";
+  }
+  return true;
+}
+
+template <typename T>
+template <typename OutType>
+inline bool RleBatchDecoder<T>::DecodeLiteralValues(
+    int32_t num_literals_to_consume, OutType* dict, int64_t dict_len, OutType* values) {
+  DCHECK_GE(num_literals_to_consume, 0);
+  DCHECK_GE(literal_count_, num_literals_to_consume);
+  int32_t num_consumed = 0;
+  // Decode any buffered literals left over from previous calls.
+  if (HaveBufferedLiterals()) {
+    num_consumed =
+        DecodeBufferedLiterals(num_literals_to_consume, dict, dict_len, values);
+    if (UNLIKELY(num_consumed == 0)) return false;
+  }
+
+  int32_t num_remaining = num_literals_to_consume - num_consumed;
+  // Decode literals directly into the output, bypassing 'literal_buffer_' when possible.
+  // Need to round to a batch of 32 if the caller is consuming only part of the current
+  // run to avoid ending on a non-byte boundary.
+  int32_t num_to_bypass =
+      std::min<int32_t>(literal_count_, BitUtil::RoundDownToPowerOf2(num_remaining, 32));
+  if (num_to_bypass > 0) {
+    int num_read = bit_reader_.UnpackAndDecodeBatch(
+        bit_width_, dict, dict_len, num_to_bypass, values + num_consumed);
+    // If we couldn't read the expected number, that means the input was truncated.
+    if (num_read < num_to_bypass) return false;
+    literal_count_ -= num_to_bypass;
+    num_consumed += num_to_bypass;
+    num_remaining = num_literals_to_consume - num_consumed;
+  }
+
+  if (num_remaining > 0) {
+    // We weren't able to copy all the literals requested directly from the input.
+    // Buffer literals and decode the requested number into the output.
+    if (UNLIKELY(!FillLiteralBuffer())) return false;
+    int32_t num_copied =
+        DecodeBufferedLiterals(num_remaining, dict, dict_len, values + num_consumed);
+    if (UNLIKELY(num_copied == 0)) return false;
+    DCHECK_EQ(num_copied, num_remaining) << "Should have buffered enough literals";
+  }
+  return true;
+}
+
+template <typename T>
+inline bool RleBatchDecoder<T>::GetSingleValue(T* val) {
+  if (NextNumRepeats() > 0) {
+    DCHECK_EQ(0, NextNumLiterals());
+    *val = GetRepeatedValue(1);
+    return true;
+  }
+  if (NextNumLiterals() > 0) {
+    DCHECK_EQ(0, NextNumRepeats());
+    return GetLiteralValues(1, val);
+  }
+  return false;
+}
+
+template <typename T>
+inline void RleBatchDecoder<T>::NextCounts() {
+  DCHECK_EQ(0, literal_count_);
+  DCHECK_EQ(0, repeat_count_);
+  // Read the next run's indicator int, it could be a literal or repeated run.
+  // The int is encoded as a vlq-encoded value.
+  int32_t indicator_value = 0;
+  if (UNLIKELY(!bit_reader_.GetVlqInt(&indicator_value))) return;
+
+  // lsb indicates if it is a literal run or repeated run
+  bool is_literal = indicator_value & 1;
+  if (is_literal) {
+    literal_count_ = (indicator_value >> 1) * 8;
+  } else {
+    int32_t repeat_count = indicator_value >> 1;
+    if (UNLIKELY(repeat_count == 0)) return;
+    bool result =
+        bit_reader_.GetBytes<T>(BitUtil::Ceil(bit_width_, 8), &repeated_value_);
+    if (UNLIKELY(!result)) return;
+    repeat_count_ = repeat_count;
+  }
+}
+
+template <typename T>
+inline bool RleBatchDecoder<T>::FillLiteralBuffer() {
+  DCHECK(!HaveBufferedLiterals());
+  int32_t num_to_buffer = std::min<int32_t>(LITERAL_BUFFER_LEN, literal_count_);
+  num_buffered_literals_ =
+      bit_reader_.UnpackBatch(bit_width_, num_to_buffer, literal_buffer_);
+  // If we couldn't read the expected number, that means the input was truncated.
+  if (UNLIKELY(num_buffered_literals_ < num_to_buffer)) return false;
+  literal_buffer_pos_ = 0;
+  return true;
+}
+
+template <typename T>
+inline int32_t RleBatchDecoder<T>::OutputBufferedLiterals(
+    int32_t max_to_output, T* values) {
+  int32_t num_to_output =
+      std::min<int32_t>(max_to_output, num_buffered_literals_ - literal_buffer_pos_);
+  memcpy(values, &literal_buffer_[literal_buffer_pos_], sizeof(T) * num_to_output);
+  literal_buffer_pos_ += num_to_output;
+  literal_count_ -= num_to_output;
+  return num_to_output;
+}
+
+template <typename T>
+template <typename OutType>
+inline int32_t RleBatchDecoder<T>::DecodeBufferedLiterals(
+    int32_t max_to_output, OutType* dict, int64_t dict_len, OutType* values) {
+  int32_t num_to_output =
+      std::min<int32_t>(max_to_output, num_buffered_literals_ - literal_buffer_pos_);
+  for (int32_t i = 0; i < num_to_output; ++i) {
+    T idx = literal_buffer_[literal_buffer_pos_ + i];
+    if (UNLIKELY(idx < 0 || idx >= dict_len)) return 0;
+    memcpy(&values[i], &dict[idx], sizeof(OutType));
+  }
+  literal_buffer_pos_ += num_to_output;
+  literal_count_ -= num_to_output;
+  return num_to_output;
+}
+
+template <typename T>
+constexpr int RleBatchDecoder<T>::LITERAL_BUFFER_LEN;
+}
+
 #endif

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ae116b5b/be/src/util/rle-test.cc
----------------------------------------------------------------------
diff --git a/be/src/util/rle-test.cc b/be/src/util/rle-test.cc
index 0501d71..b8a1d94 100644
--- a/be/src/util/rle-test.cc
+++ b/be/src/util/rle-test.cc
@@ -30,7 +30,7 @@
 
 namespace impala {
 
-const int MAX_WIDTH = BitReader::MAX_BITWIDTH;
+const int MAX_WIDTH = BatchedBitReader::MAX_BITWIDTH;
 
 TEST(BitArray, TestBool) {
   const int len = 8;
@@ -69,29 +69,24 @@ TEST(BitArray, TestBool) {
   EXPECT_EQ((int)buffer[1], BOOST_BINARY(1 1 0 0 1 1 0 0));
 
   // Use the reader and validate
-  BitReader reader(buffer, len);
+  BatchedBitReader reader(buffer, len);
+
   // Ensure it returns the same results after Reset().
   for (int trial = 0; trial < 2; ++trial) {
-    for (int i = 0; i < 8; ++i) {
-      bool val = false;
-      bool result = reader.GetValue(1, &val);
-      EXPECT_TRUE(result);
-      EXPECT_EQ(val, i % 2);
-    }
+    bool batch_vals[16];
+    EXPECT_EQ(16, reader.UnpackBatch(1, 16, batch_vals));
+    for (int i = 0; i < 8; ++i)  EXPECT_EQ(batch_vals[i], i % 2);
 
     for (int i = 0; i < 8; ++i) {
-      bool val = false;
-      bool result = reader.GetValue(1, &val);
-      EXPECT_TRUE(result);
       switch (i) {
         case 0:
         case 1:
         case 4:
         case 5:
-          EXPECT_EQ(val, false);
+          EXPECT_EQ(batch_vals[8 + i], false);
           break;
         default:
-          EXPECT_EQ(val, true);
+          EXPECT_EQ(batch_vals[8 + i], true);
           break;
       }
     }
@@ -113,17 +108,30 @@ void TestBitArrayValues(int bit_width, int num_vals) {
   writer.Flush();
   EXPECT_EQ(writer.bytes_written(), len);
 
-  BitReader reader(buffer, len);
+  BatchedBitReader reader(buffer, len);
+  BatchedBitReader reader2(reader); // Test copy constructor.
   // Ensure it returns the same results after Reset().
   for (int trial = 0; trial < 2; ++trial) {
+    // Unpack all values at once with one batched reader and in small batches with the
+    // other batched reader.
+    vector<int64_t> batch_vals(num_vals);
+    const int BATCH_SIZE = 32;
+    vector<int64_t> batch_vals2(BATCH_SIZE);
+    EXPECT_EQ(num_vals,
+        reader.UnpackBatch(bit_width, num_vals, batch_vals.data()));
     for (int i = 0; i < num_vals; ++i) {
-      int64_t val;
-      bool result = reader.GetValue(bit_width, &val);
-      EXPECT_TRUE(result);
-      EXPECT_EQ(val, i % mod);
+      if (i % BATCH_SIZE == 0) {
+        int num_to_unpack = min(BATCH_SIZE, num_vals - i);
+        EXPECT_EQ(num_to_unpack,
+           reader2.UnpackBatch(bit_width, num_to_unpack, batch_vals2.data()));
+      }
+      EXPECT_EQ(i % mod, batch_vals[i]);
+      EXPECT_EQ(i % mod, batch_vals2[i % BATCH_SIZE]);
     }
     EXPECT_EQ(reader.bytes_left(), 0);
+    EXPECT_EQ(reader2.bytes_left(), 0);
     reader.Reset(buffer, len);
+    reader2.Reset(buffer, len);
   }
 }
 
@@ -137,45 +145,32 @@ TEST(BitArray, TestValues) {
   }
 }
 
-// Test some mixed values
-TEST(BitArray, TestMixed) {
-  const int len = 1024;
-  uint8_t buffer[len];
-  bool parity = true;
-
-  BitWriter writer(buffer, len);
-  for (int i = 0; i < len; ++i) {
-    bool result;
-    if (i % 2 == 0) {
-      result = writer.PutValue(parity, 1);
-      parity = !parity;
-    } else {
-      result = writer.PutValue(i, 10);
-    }
-    EXPECT_TRUE(result);
-  }
-  writer.Flush();
-
-  parity = true;
-  BitReader reader(buffer, len);
-  // Ensure it returns the same results after Reset().
-  for (int trial = 0; trial < 2; ++trial) {
-    for (int i = 0; i < len; ++i) {
-      bool result;
-      if (i % 2 == 0) {
-        bool val;
-        result = reader.GetValue(1, &val);
-        EXPECT_EQ(val, parity);
-        parity = !parity;
-      } else {
-        int val;
-        result = reader.GetValue(10, &val);
-        EXPECT_EQ(val, i);
+/// Get many values from a batch RLE decoder.
+template <typename T>
+static bool GetRleValues(RleBatchDecoder<T>* decoder, int num_vals, T* vals) {
+  int decoded = 0;
+  // Decode repeated and literal runs until we've filled the output.
+  while (decoded < num_vals) {
+    if (decoder->NextNumRepeats() > 0) {
+      EXPECT_EQ(0, decoder->NextNumLiterals());
+      int num_repeats_to_output =
+          min<int>(decoder->NextNumRepeats(), num_vals - decoded);
+      T repeated_val = decoder->GetRepeatedValue(num_repeats_to_output);
+      for (int i = 0; i < num_repeats_to_output; ++i) {
+        *vals = repeated_val;
+        ++vals;
       }
-      EXPECT_TRUE(result);
+      decoded += num_repeats_to_output;
+      continue;
     }
-    reader.Reset(buffer, len);
+    int num_literals_to_output =
+          min<int>(decoder->NextNumLiterals(), num_vals - decoded);
+    if (num_literals_to_output == 0) return false;
+    if (!decoder->GetLiteralValues(num_literals_to_output, vals)) return false;
+    decoded += num_literals_to_output;
+    vals += num_literals_to_output;
   }
+  return true;
 }
 
 // Validates encoding of values by encoding and decoding them.  If
@@ -203,16 +198,32 @@ void ValidateRle(const vector<int>& values, int bit_width,
   }
 
   // Verify read
-  RleDecoder decoder(buffer, len, bit_width);
+  RleBatchDecoder<uint64_t> decoder(buffer, len, bit_width);
+  RleBatchDecoder<uint64_t> decoder2(buffer, len, bit_width);
   // Ensure it returns the same results after Reset().
   for (int trial = 0; trial < 2; ++trial) {
     for (int i = 0; i < values.size(); ++i) {
       uint64_t val;
-      bool result = decoder.Get(&val);
-      EXPECT_TRUE(result);
-      EXPECT_EQ(values[i], val);
+      EXPECT_TRUE(decoder.GetSingleValue(&val));
+      EXPECT_EQ(values[i], val) << i;
+    }
+    // Unpack everything at once from the second batch decoder.
+    vector<uint64_t> decoded_values(values.size());
+    EXPECT_TRUE(GetRleValues(&decoder2, values.size(), decoded_values.data()));
+    for (int i = 0; i < values.size(); ++i) {
+      EXPECT_EQ(values[i], decoded_values[i]) << i;
     }
     decoder.Reset(buffer, len, bit_width);
+    decoder2.Reset(buffer, len, bit_width);
+  }
+}
+
+/// Basic test case for literal unpacking - two literals in a run.
+TEST(Rle, TwoLiteralRun) {
+  vector<int> values{1, 0};
+  ValidateRle(values, 1, nullptr, -1);
+  for (int width = 1; width <= MAX_WIDTH; ++width) {
+    ValidateRle(values, width, nullptr, -1);
   }
 }
 
@@ -287,16 +298,22 @@ TEST(Rle, BitWidthZeroRepeated) {
   uint8_t buffer[1];
   const int num_values = 15;
   buffer[0] = num_values << 1; // repeated indicator byte
-  RleDecoder decoder(buffer, sizeof(buffer), 0);
+  RleBatchDecoder<uint8_t> decoder(buffer, sizeof(buffer), 0);
   // Ensure it returns the same results after Reset().
   for (int trial = 0; trial < 2; ++trial) {
     uint8_t val;
     for (int i = 0; i < num_values; ++i) {
-      bool result = decoder.Get(&val);
-      EXPECT_TRUE(result);
-      EXPECT_EQ(val, 0); // can only encode 0s with bit width 0
+      EXPECT_TRUE(decoder.GetSingleValue(&val));
+      EXPECT_EQ(val, 0);
     }
-    EXPECT_FALSE(decoder.Get(&val));
+    EXPECT_FALSE(decoder.GetSingleValue(&val));
+
+    // Test decoding all values in a batch.
+    decoder.Reset(buffer, sizeof(buffer), 0);
+    uint8_t decoded_values[num_values];
+    EXPECT_TRUE(GetRleValues(&decoder, num_values, decoded_values));
+    for (int i = 0; i < num_values; i++) EXPECT_EQ(0, decoded_values[i]) << i;
+    EXPECT_FALSE(decoder.GetSingleValue(&val));
     decoder.Reset(buffer, sizeof(buffer), 0);
   }
 }
@@ -305,17 +322,23 @@ TEST(Rle, BitWidthZeroLiteral) {
   uint8_t buffer[1];
   const int num_groups = 4;
   buffer[0] = num_groups << 1 | 1; // literal indicator byte
-  RleDecoder decoder = RleDecoder(buffer, sizeof(buffer), 0);
+  RleBatchDecoder<uint8_t> decoder(buffer, sizeof(buffer), 0);
   // Ensure it returns the same results after Reset().
   for (int trial = 0; trial < 2; ++trial) {
     const int num_values = num_groups * 8;
     uint8_t val;
     for (int i = 0; i < num_values; ++i) {
-      bool result = decoder.Get(&val);
-      EXPECT_TRUE(result);
+      EXPECT_TRUE(decoder.GetSingleValue(&val));
       EXPECT_EQ(val, 0); // can only encode 0s with bit width 0
     }
-    EXPECT_FALSE(decoder.Get(&val));
+
+    // Test decoding the whole batch at once.
+    decoder.Reset(buffer, sizeof(buffer), 0);
+    uint8_t decoded_values[num_values];
+    EXPECT_TRUE(GetRleValues(&decoder, num_values, decoded_values));
+    for (int i = 0; i < num_values; ++i) EXPECT_EQ(0, decoded_values[i]);
+
+    EXPECT_FALSE(GetRleValues(&decoder, 1, decoded_values));
     decoder.Reset(buffer, sizeof(buffer), 0);
   }
 }
@@ -402,20 +425,25 @@ TEST(BitRle, Overflow) {
     EXPECT_LE(bytes_written, len);
     EXPECT_GT(num_added, 0);
 
-    RleDecoder decoder(buffer, bytes_written, bit_width);
+    RleBatchDecoder<uint32_t> decoder(buffer, bytes_written, bit_width);
     // Ensure it returns the same results after Reset().
     for (int trial = 0; trial < 2; ++trial) {
       parity = true;
       uint32_t v;
       for (int i = 0; i < num_added; ++i) {
-        bool result = decoder.Get(&v);
-        EXPECT_TRUE(result);
+        EXPECT_TRUE(decoder.GetSingleValue(&v));
         EXPECT_EQ(v, parity);
         parity = !parity;
       }
       // Make sure we get false when reading past end a couple times.
-      EXPECT_FALSE(decoder.Get(&v));
-      EXPECT_FALSE(decoder.Get(&v));
+      EXPECT_FALSE(decoder.GetSingleValue(&v));
+      EXPECT_FALSE(decoder.GetSingleValue(&v));
+
+      decoder.Reset(buffer, bytes_written, bit_width);
+      uint32_t decoded_values[num_added];
+      EXPECT_TRUE(GetRleValues(&decoder, num_added, decoded_values));
+      for (int i = 0; i < num_added; ++i) EXPECT_EQ(i % 2 == 0, decoded_values[i]) << i;
+
       decoder.Reset(buffer, bytes_written, bit_width);
     }
   }
@@ -426,21 +454,20 @@ TEST(BitRle, Overflow) {
 TEST(Rle, ZeroLiteralOrRepeatCount) {
   const int len = 1024;
   uint8_t buffer[len];
-  RleDecoder decoder(buffer, len, 0);
-  uint64_t val;
-
+  RleBatchDecoder<uint64_t> decoder(buffer, len, 0);
   // Test the RLE repeated values path.
   memset(buffer, 0, len);
   for (int i = 0; i < 10; ++i) {
-    bool result = decoder.Get(&val);
-    EXPECT_FALSE(result);
+    EXPECT_EQ(0, decoder.NextNumLiterals());
+    EXPECT_EQ(0, decoder.NextNumRepeats());
   }
 
   // Test the RLE literal values path
   memset(buffer, 1, len);
+  decoder.Reset(buffer, len, 0);
   for (int i = 0; i < 10; ++i) {
-    bool result = decoder.Get(&val);
-    EXPECT_FALSE(result);
+    EXPECT_EQ(0, decoder.NextNumLiterals());
+    EXPECT_EQ(0, decoder.NextNumRepeats());
   }
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ae116b5b/testdata/data/README
----------------------------------------------------------------------
diff --git a/testdata/data/README b/testdata/data/README
index e1dc496..8b9db3a 100644
--- a/testdata/data/README
+++ b/testdata/data/README
@@ -123,3 +123,12 @@ Generated using parquet-mr and contents verified using parquet-tools-1.9.1.
 Contains decimals stored as variable sized BYTE_ARRAY with both dictionary
 and non-dictionary encoding respectively.
 
+alltypes_agg_bitpacked_def_levels.parquet:
+Generated by hacking Impala's Parquet writer to write out bitpacked def levels instead
+of the standard RLE-encoded levels. See
+https://github.com/timarmstrong/incubator-impala/tree/hack-bit-packed-levels. This
+is a single file containing all of the alltypesagg data, which includes a mix of
+null and non-null values. This is not actually a valid Parquet file because the
+bit-packed levels are written in the reverse order specified in the Parquet spec
+for BIT_PACKED. However, this is the order that Impala attempts to read the levels
+in - see IMPALA-3006.

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ae116b5b/testdata/data/alltypes_agg_bitpacked_def_levels.parquet
----------------------------------------------------------------------
diff --git a/testdata/data/alltypes_agg_bitpacked_def_levels.parquet b/testdata/data/alltypes_agg_bitpacked_def_levels.parquet
new file mode 100644
index 0000000..bd2d9d8
Binary files /dev/null and b/testdata/data/alltypes_agg_bitpacked_def_levels.parquet differ

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ae116b5b/testdata/workloads/functional-query/queries/QueryTest/parquet-def-levels.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-query/queries/QueryTest/parquet-def-levels.test b/testdata/workloads/functional-query/queries/QueryTest/parquet-def-levels.test
new file mode 100644
index 0000000..d48c333
--- /dev/null
+++ b/testdata/workloads/functional-query/queries/QueryTest/parquet-def-levels.test
@@ -0,0 +1,52 @@
+====
+---- QUERY
+# Verify that total counts of non-null values are correct.
+select count(id), count(tinyint_col), count(smallint_col), count(int_col),
+  count(bigint_col), count(float_col), count(double_col), count(date_string_col),
+  count(string_col), count(timestamp_col), count(year), count(month), count(day)
+from alltypesagg
+---- TYPES
+BIGINT,BIGINT,BIGINT,BIGINT,BIGINT,BIGINT,BIGINT,BIGINT,BIGINT,BIGINT,BIGINT,BIGINT,BIGINT
+---- RESULTS
+11000,9000,10800,10980,10980,10980,10980,11000,11000,11000,11000,11000,10000
+====
+---- QUERY
+# Spot-check a subset of values.
+select *
+from alltypesagg
+where year = 2010 and month = 1 and int_col is null or int_col % 1000 = 77
+order by id, year, month, day
+---- TYPES
+INT,BOOLEAN,TINYINT,SMALLINT,INT,BIGINT,FLOAT,DOUBLE,STRING,STRING,TIMESTAMP,INT,INT,INT
+---- RESULTS
+0,true,NULL,NULL,NULL,NULL,NULL,NULL,'01/01/10','0',2010-01-01 00:00:00,2010,1,1
+0,true,NULL,NULL,NULL,NULL,NULL,NULL,'01/01/10','0',2010-01-01 00:00:00,2010,1,NULL
+77,false,7,77,77,770,84.69999694824219,777.6999999999999,'01/01/10','77',2010-01-01 01:17:29.260000000,2010,1,1
+1000,true,NULL,NULL,NULL,NULL,NULL,NULL,'01/02/10','0',2010-01-02 00:00:00,2010,1,2
+1000,true,NULL,NULL,NULL,NULL,NULL,NULL,'01/02/10','0',2010-01-02 00:00:00,2010,1,NULL
+1077,false,7,77,77,770,84.69999694824219,777.6999999999999,'01/02/10','77',2010-01-02 01:17:29.260000000,2010,1,2
+2000,true,NULL,NULL,NULL,NULL,NULL,NULL,'01/03/10','0',2010-01-03 00:00:00,2010,1,3
+2000,true,NULL,NULL,NULL,NULL,NULL,NULL,'01/03/10','0',2010-01-03 00:00:00,2010,1,NULL
+2077,false,7,77,77,770,84.69999694824219,777.6999999999999,'01/03/10','77',2010-01-03 01:17:29.260000000,2010,1,3
+3000,true,NULL,NULL,NULL,NULL,NULL,NULL,'01/04/10','0',2010-01-04 00:00:00,2010,1,4
+3000,true,NULL,NULL,NULL,NULL,NULL,NULL,'01/04/10','0',2010-01-04 00:00:00,2010,1,NULL
+3077,false,7,77,77,770,84.69999694824219,777.6999999999999,'01/04/10','77',2010-01-04 01:17:29.260000000,2010,1,4
+4000,true,NULL,NULL,NULL,NULL,NULL,NULL,'01/05/10','0',2010-01-05 00:00:00,2010,1,5
+4000,true,NULL,NULL,NULL,NULL,NULL,NULL,'01/05/10','0',2010-01-05 00:00:00,2010,1,NULL
+4077,false,7,77,77,770,84.69999694824219,777.6999999999999,'01/05/10','77',2010-01-05 01:17:29.260000000,2010,1,5
+5000,true,NULL,NULL,NULL,NULL,NULL,NULL,'01/06/10','0',2010-01-06 00:00:00,2010,1,6
+5000,true,NULL,NULL,NULL,NULL,NULL,NULL,'01/06/10','0',2010-01-06 00:00:00,2010,1,NULL
+5077,false,7,77,77,770,84.69999694824219,777.6999999999999,'01/06/10','77',2010-01-06 01:17:29.260000000,2010,1,6
+6000,true,NULL,NULL,NULL,NULL,NULL,NULL,'01/07/10','0',2010-01-07 00:00:00,2010,1,7
+6000,true,NULL,NULL,NULL,NULL,NULL,NULL,'01/07/10','0',2010-01-07 00:00:00,2010,1,NULL
+6077,false,7,77,77,770,84.69999694824219,777.6999999999999,'01/07/10','77',2010-01-07 01:17:29.260000000,2010,1,7
+7000,true,NULL,NULL,NULL,NULL,NULL,NULL,'01/08/10','0',2010-01-08 00:00:00,2010,1,8
+7000,true,NULL,NULL,NULL,NULL,NULL,NULL,'01/08/10','0',2010-01-08 00:00:00,2010,1,NULL
+7077,false,7,77,77,770,84.69999694824219,777.6999999999999,'01/08/10','77',2010-01-08 01:17:29.260000000,2010,1,8
+8000,true,NULL,NULL,NULL,NULL,NULL,NULL,'01/09/10','0',2010-01-09 00:00:00,2010,1,9
+8000,true,NULL,NULL,NULL,NULL,NULL,NULL,'01/09/10','0',2010-01-09 00:00:00,2010,1,NULL
+8077,false,7,77,77,770,84.69999694824219,777.6999999999999,'01/09/10','77',2010-01-09 01:17:29.260000000,2010,1,9
+9000,true,NULL,NULL,NULL,NULL,NULL,NULL,'01/10/10','0',2010-01-10 00:00:00,2010,1,10
+9000,true,NULL,NULL,NULL,NULL,NULL,NULL,'01/10/10','0',2010-01-10 00:00:00,2010,1,NULL
+9077,false,7,77,77,770,84.69999694824219,777.6999999999999,'01/10/10','77',2010-01-10 01:17:29.260000000,2010,1,10
+====

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ae116b5b/tests/query_test/test_scanners.py
----------------------------------------------------------------------
diff --git a/tests/query_test/test_scanners.py b/tests/query_test/test_scanners.py
index 17b9503..fe0577a 100644
--- a/tests/query_test/test_scanners.py
+++ b/tests/query_test/test_scanners.py
@@ -388,6 +388,22 @@ class TestParquet(ImpalaTestSuite):
     self.run_test_case('QueryTest/parquet-corrupt-rle-counts-abort',
                        vector, unique_database)
 
+  def test_bitpacked_def_levels(self, vector, unique_database):
+    """Test that Impala can read a Parquet file with the deprecated bit-packed def
+       level encoding."""
+    self.client.execute(("""CREATE TABLE {0}.alltypesagg (
+          id INT, bool_col BOOLEAN, tinyint_col TINYINT, smallint_col SMALLINT,
+          int_col INT, bigint_col BIGINT, float_col FLOAT, double_col DOUBLE,
+          date_string_col STRING, string_col STRING, timestamp_col TIMESTAMP,
+          year INT, month INT, day INT) STORED AS PARQUET""").format(unique_database))
+    alltypesagg_loc = get_fs_path(
+        "/test-warehouse/{0}.db/{1}".format(unique_database, "alltypesagg"))
+    check_call(['hdfs', 'dfs', '-copyFromLocal', os.environ['IMPALA_HOME'] +
+        "/testdata/data/alltypes_agg_bitpacked_def_levels.parquet", alltypesagg_loc])
+    self.client.execute("refresh {0}.alltypesagg".format(unique_database));
+
+    self.run_test_case('QueryTest/parquet-def-levels', vector, unique_database)
+
   @SkipIfS3.hdfs_block_size
   @SkipIfADLS.hdfs_block_size
   @SkipIfIsilon.hdfs_block_size



[3/3] incubator-impala git commit: IMPALA-4177, IMPALA-6039: batched bit reading and rle decoding

Posted by ta...@apache.org.
IMPALA-4177,IMPALA-6039: batched bit reading and rle decoding

Switch the decoders to using more batch-oriented interfaces. As an
intermediate step, this makes only the lower-level utility classes
batch-oriented; the interfaces of LevelDecoder and DictDecoder are not
yet batch-oriented.

The next step would be to change those interfaces to be batch-oriented
and make corresponding optimisations in Parquet. This could deliver much
larger perf improvements than the current patch.

The high-level changes are:
* BitReader -> BatchedBitReader, which is built to unpack runs of 32
  bit-packed values efficiently.
* RleDecoder -> RleBatchDecoder, which exposes the repeated and literal
  runs to the caller and uses BatchedBitReader to unpack literal runs
  efficiently.
* Dict decoding uses RleBatchDecoder to decode repeated runs efficiently
  and uses the BitPacking utilities to unpack and encode in a single
  step.

Also removes an older benchmark that isn't too interesting (since
the batch-oriented approach to encoding and decoding is so much
faster than the value-by-value approach).

Testing:
* Ran core tests.
* Updated unit tests to exercise new code.
* Added test coverage for the deprecated bit-packed level encoding to
  ensure that it still works (there was no coverage previously).

Perf:
Single-node benchmarks showed a performance gain of a few percent. 16-node
cluster benchmarks only showed a gain for TPC-H nested.

Change-Id: I35de0cf80c86f501c4a39270afc8fb8111552ac6
Reviewed-on: http://gerrit.cloudera.org:8080/8267
Reviewed-by: Tim Armstrong <ta...@cloudera.com>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/ae116b5b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/ae116b5b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/ae116b5b

Branch: refs/heads/master
Commit: ae116b5bf7b8b2514d7a8655d9a6666ad3fd36dd
Parents: 155bb77
Author: Tim Armstrong <ta...@cloudera.com>
Authored: Mon Oct 9 17:09:39 2017 -0700
Committer: Impala Public Jenkins <im...@gerrit.cloudera.org>
Committed: Thu Nov 16 21:23:09 2017 +0000

----------------------------------------------------------------------
 be/src/benchmarks/CMakeLists.txt                |   1 -
 be/src/benchmarks/bit-packing-benchmark.cc      | 120 ++++++
 be/src/benchmarks/rle-benchmark.cc              | 244 ------------
 be/src/exec/parquet-column-readers.cc           | 174 +++++----
 be/src/exec/parquet-column-readers.h            |  90 +++--
 be/src/experiments/bit-stream-utils.8byte.h     | 137 -------
 .../experiments/bit-stream-utils.8byte.inline.h | 145 -------
 be/src/util/bit-packing.h                       |  61 ++-
 be/src/util/bit-packing.inline.h                | 210 ++++++++--
 be/src/util/bit-stream-utils.h                  |  77 ++--
 be/src/util/bit-stream-utils.inline.h           | 106 +++--
 be/src/util/dict-encoding.h                     |  94 +++--
 be/src/util/dict-test.cc                        |  96 ++++-
 be/src/util/parquet-reader.cc                   |  21 +-
 be/src/util/rle-encoding.h                      | 383 +++++++++++++++----
 be/src/util/rle-test.cc                         | 185 +++++----
 testdata/data/README                            |   9 +
 .../alltypes_agg_bitpacked_def_levels.parquet   | Bin 0 -> 208380 bytes
 .../queries/QueryTest/parquet-def-levels.test   |  52 +++
 tests/query_test/test_scanners.py               |  16 +
 20 files changed, 1253 insertions(+), 968 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ae116b5b/be/src/benchmarks/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/be/src/benchmarks/CMakeLists.txt b/be/src/benchmarks/CMakeLists.txt
index 1d67d45..a569a66 100644
--- a/be/src/benchmarks/CMakeLists.txt
+++ b/be/src/benchmarks/CMakeLists.txt
@@ -48,7 +48,6 @@ ADD_BE_BENCHMARK(multiint-benchmark)
 ADD_BE_BENCHMARK(network-perf-benchmark)
 ADD_BE_BENCHMARK(overflow-benchmark)
 ADD_BE_BENCHMARK(parse-timestamp-benchmark)
-ADD_BE_BENCHMARK(rle-benchmark)
 ADD_BE_BENCHMARK(row-batch-serialize-benchmark)
 ADD_BE_BENCHMARK(scheduler-benchmark)
 ADD_BE_BENCHMARK(status-benchmark)

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ae116b5b/be/src/benchmarks/bit-packing-benchmark.cc
----------------------------------------------------------------------
diff --git a/be/src/benchmarks/bit-packing-benchmark.cc b/be/src/benchmarks/bit-packing-benchmark.cc
index 955c0fb..7769182 100644
--- a/be/src/benchmarks/bit-packing-benchmark.cc
+++ b/be/src/benchmarks/bit-packing-benchmark.cc
@@ -285,6 +285,126 @@ struct BenchmarkParams {
   int64_t data_len;
 };
 
+/// Legacy value-at-a-time implementation of bit unpacking. Retained here for
+/// purposes of comparison in the benchmark.
+class BitReader {
+ public:
+  /// 'buffer' is the buffer to read from.  The buffer's length is 'buffer_len'.
+  /// Does not take ownership of the buffer.
+  BitReader(const uint8_t* buffer, int buffer_len) { Reset(buffer, buffer_len); }
+
+  BitReader() : buffer_(NULL), max_bytes_(0) {}
+
+  // The implicit copy constructor is left defined. If a BitReader is copied, the
+  // two copies do not share any state. Invoking functions on either copy continues
+  // reading from the current read position without modifying the state of the other
+  // copy.
+
+  /// Resets the read to start reading from the start of 'buffer'. The buffer's
+  /// length is 'buffer_len'. Does not take ownership of the buffer.
+  void Reset(const uint8_t* buffer, int buffer_len) {
+    buffer_ = buffer;
+    max_bytes_ = buffer_len;
+    byte_offset_ = 0;
+    bit_offset_ = 0;
+    int num_bytes = std::min(8, max_bytes_);
+    memcpy(&buffered_values_, buffer_, num_bytes);
+  }
+
+  /// Gets the next value from the buffer.  Returns true if 'v' could be read or false if
+  /// there are not enough bytes left. num_bits must be <= 32.
+  template<typename T>
+  bool GetValue(int num_bits, T* v);
+
+  /// Reads a 'num_bytes'-sized value from the buffer and stores it in 'v'. T needs to be a
+  /// little-endian native type and big enough to store 'num_bytes'. The value is assumed
+  /// to be byte-aligned so the stream will be advanced to the start of the next byte
+  /// before 'v' is read. Returns false if there are not enough bytes left.
+  template<typename T>
+  bool GetBytes(int num_bytes, T* v);
+
+  /// Returns the number of bytes left in the stream, not including the current byte (i.e.,
+  /// there may be an additional fraction of a byte).
+  int bytes_left() { return max_bytes_ - (byte_offset_ + BitUtil::Ceil(bit_offset_, 8)); }
+
+  /// Maximum supported bitwidth for reader.
+  static const int MAX_BITWIDTH = 32;
+
+ private:
+  const uint8_t* buffer_;
+  int max_bytes_;
+
+  /// Bytes are memcpy'd from buffer_ and values are read from this variable. This is
+  /// faster than reading values byte by byte directly from buffer_.
+  uint64_t buffered_values_;
+
+  int byte_offset_;       // Offset in buffer_
+  int bit_offset_;        // Offset in buffered_values_
+};
+
+template <typename T>
+bool BitReader::GetValue(int num_bits, T* v) {
+  DCHECK(num_bits == 0 || buffer_ != NULL);
+  // TODO: revisit this limit if necessary
+  DCHECK_LE(num_bits, MAX_BITWIDTH);
+  DCHECK_LE(num_bits, sizeof(T) * 8);
+
+  // First do a cheap check to see if we may read past the end of the stream, using
+  // constant upper bounds for 'bit_offset_' and 'num_bits'.
+  if (UNLIKELY(byte_offset_ + sizeof(buffered_values_) + MAX_BITWIDTH / 8 > max_bytes_)) {
+    // Now do the precise check.
+    if (UNLIKELY(byte_offset_ * 8 + bit_offset_ + num_bits > max_bytes_ * 8)) {
+      return false;
+    }
+  }
+
+  DCHECK_GE(bit_offset_, 0);
+  DCHECK_LE(bit_offset_, 64);
+  *v = BitUtil::TrailingBits(buffered_values_, bit_offset_ + num_bits) >> bit_offset_;
+
+  bit_offset_ += num_bits;
+  if (bit_offset_ >= 64) {
+    byte_offset_ += 8;
+    bit_offset_ -= 64;
+
+    int bytes_remaining = max_bytes_ - byte_offset_;
+    if (LIKELY(bytes_remaining >= 8)) {
+      memcpy(&buffered_values_, buffer_ + byte_offset_, 8);
+    } else {
+      memcpy(&buffered_values_, buffer_ + byte_offset_, bytes_remaining);
+    }
+
+    // Read bits of v that crossed into new buffered_values_
+    *v |= BitUtil::TrailingBits(buffered_values_, bit_offset_)
+          << (num_bits - bit_offset_);
+  }
+  DCHECK_LE(bit_offset_, 64);
+  return true;
+}
+
+template<typename T>
+bool BitReader::GetBytes(int num_bytes, T* v) {
+  DCHECK_LE(num_bytes, sizeof(T));
+  int bytes_read = BitUtil::Ceil(bit_offset_, 8);
+  if (UNLIKELY(byte_offset_ + bytes_read + num_bytes > max_bytes_)) return false;
+
+  // Advance byte_offset to next unread byte and read num_bytes
+  byte_offset_ += bytes_read;
+  *v = 0; // Ensure unset bytes are initialized to zero.
+  memcpy(v, buffer_ + byte_offset_, num_bytes);
+  byte_offset_ += num_bytes;
+
+  // Reset buffered_values_
+  bit_offset_ = 0;
+  int bytes_remaining = max_bytes_ - byte_offset_;
+  if (LIKELY(bytes_remaining >= 8)) {
+    memcpy(&buffered_values_, buffer_ + byte_offset_, 8);
+  } else {
+    memcpy(&buffered_values_, buffer_ + byte_offset_, bytes_remaining);
+  }
+  return true;
+}
+
 /// Benchmark calling BitReader::GetValue() in a loop to unpack 32 * 'batch_size' values.
 void BitReaderBenchmark(int batch_size, void* data) {
   const BenchmarkParams* p = reinterpret_cast<BenchmarkParams*>(data);

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ae116b5b/be/src/benchmarks/rle-benchmark.cc
----------------------------------------------------------------------
diff --git a/be/src/benchmarks/rle-benchmark.cc b/be/src/benchmarks/rle-benchmark.cc
deleted file mode 100644
index 439c630..0000000
--- a/be/src/benchmarks/rle-benchmark.cc
+++ /dev/null
@@ -1,244 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#include <iostream>
-#include <sstream>
-
-#include "runtime/mem-pool.h"
-#include "runtime/mem-tracker.h"
-#include "experiments/bit-stream-utils.8byte.inline.h"
-#include "util/benchmark.h"
-#include "util/bit-stream-utils.inline.h"
-#include "util/cpu-info.h"
-
-#include "common/names.h"
-
-// Benchmark to measure how quickly we can do bit encoding and decoding.
-
-// encode:               Function     Rate (iters/ms)          Comparison
-// ----------------------------------------------------------------------
-//     "BitWriter (8 byte) 1-Bit"                66.9                  1X
-//              "BitWriter 1-Bit"               107.3              1.604X
-//     "BitWriter (8 byte) 2-Bit"               74.42                  1X
-//              "BitWriter 2-Bit"               105.6              1.419X
-//     "BitWriter (8 byte) 3-Bit"               76.91                  1X
-//              "BitWriter 3-Bit"                 104              1.353X
-//     "BitWriter (8 byte) 4-Bit"               80.37                  1X
-//              "BitWriter 4-Bit"               102.7              1.278X
-//     "BitWriter (8 byte) 5-Bit"               79.29                  1X
-//              "BitWriter 5-Bit"                 101              1.274X
-//     "BitWriter (8 byte) 6-Bit"               80.37                  1X
-//              "BitWriter 6-Bit"               99.28              1.235X
-//     "BitWriter (8 byte) 7-Bit"               80.19                  1X
-//              "BitWriter 7-Bit"               98.09              1.223X
-//     "BitWriter (8 byte) 8-Bit"               84.93                  1X
-//              "BitWriter 8-Bit"                  97              1.142X
-//     "BitWriter (8 byte) 9-Bit"               79.85                  1X
-//              "BitWriter 9-Bit"               95.09              1.191X
-//    "BitWriter (8 byte) 10-Bit"               80.51                  1X
-//             "BitWriter 10-Bit"               94.17               1.17X
-//    "BitWriter (8 byte) 11-Bit"               79.36                  1X
-//             "BitWriter 11-Bit"                93.2              1.174X
-//    "BitWriter (8 byte) 12-Bit"               80.79                  1X
-//             "BitWriter 12-Bit"               92.09               1.14X
-//    "BitWriter (8 byte) 13-Bit"               78.28                  1X
-//             "BitWriter 13-Bit"               90.83               1.16X
-//    "BitWriter (8 byte) 14-Bit"               78.57                  1X
-//             "BitWriter 14-Bit"               89.71              1.142X
-//    "BitWriter (8 byte) 15-Bit"               77.28                  1X
-//             "BitWriter 15-Bit"                  88              1.139X
-//    "BitWriter (8 byte) 16-Bit"               86.98                  1X
-//             "BitWriter 16-Bit"               88.08              1.013X
-
-// decode:               Function     Rate (iters/ms)          Comparison
-// ----------------------------------------------------------------------
-//     "BitWriter (8 byte) 1-Bit"               132.9                  1X
-//              "BitWriter 1-Bit"               126.9             0.9546X
-//     "BitWriter (8 byte) 2-Bit"               132.9                  1X
-//              "BitWriter 2-Bit"               125.6             0.9448X
-//     "BitWriter (8 byte) 3-Bit"               132.8                  1X
-//              "BitWriter 3-Bit"               122.7             0.9237X
-//     "BitWriter (8 byte) 4-Bit"               133.1                  1X
-//              "BitWriter 4-Bit"               123.6             0.9284X
-//     "BitWriter (8 byte) 5-Bit"               132.2                  1X
-//              "BitWriter 5-Bit"               118.2             0.8942X
-//     "BitWriter (8 byte) 6-Bit"               132.9                  1X
-//              "BitWriter 6-Bit"               117.6              0.885X
-//     "BitWriter (8 byte) 7-Bit"               132.3                  1X
-//              "BitWriter 7-Bit"               112.8             0.8525X
-//     "BitWriter (8 byte) 8-Bit"               132.9                  1X
-//              "BitWriter 8-Bit"               119.2             0.8971X
-//     "BitWriter (8 byte) 9-Bit"               131.8                  1X
-//              "BitWriter 9-Bit"               111.3             0.8447X
-//    "BitWriter (8 byte) 10-Bit"               131.4                  1X
-//             "BitWriter 10-Bit"               108.5             0.8255X
-//    "BitWriter (8 byte) 11-Bit"               131.7                  1X
-//             "BitWriter 11-Bit"               106.9             0.8118X
-//    "BitWriter (8 byte) 12-Bit"               132.9                  1X
-//             "BitWriter 12-Bit"               108.8             0.8189X
-//    "BitWriter (8 byte) 13-Bit"                 131                  1X
-//             "BitWriter 13-Bit"               103.1             0.7873X
-//    "BitWriter (8 byte) 14-Bit"               131.6                  1X
-//             "BitWriter 14-Bit"               101.6             0.7724X
-//    "BitWriter (8 byte) 15-Bit"               131.1                  1X
-//             "BitWriter 15-Bit"               99.91             0.7622X
-//    "BitWriter (8 byte) 16-Bit"                 133                  1X
-//             "BitWriter 16-Bit"               105.2             0.7907X
-
-using namespace impala;
-
-const int BUFFER_LEN = 64 * 4096;
-
-struct TestData {
-  uint8_t* array;
-  uint8_t* buffer;
-  int num_values;
-  int num_bits;
-  int max_value;
-  MemPool* pool;
-  bool result;
-};
-
-void TestBitWriterEncode(int batch_size, void* d) {
-  TestData* data = reinterpret_cast<TestData*>(d);
-  int buffer_size = BitUtil::Ceil(data->num_bits * data->num_values, 8);
-  for (int i = 0; i < batch_size; ++i) {
-    BitWriter writer(data->buffer, buffer_size);
-    // Unroll this to focus more on Put performance.
-    for (int j = 0; j < data->num_values; j += 8) {
-      writer.PutValue(j + 0, data->num_bits);
-      writer.PutValue(j + 1, data->num_bits);
-      writer.PutValue(j + 2, data->num_bits);
-      writer.PutValue(j + 3, data->num_bits);
-      writer.PutValue(j + 4, data->num_bits);
-      writer.PutValue(j + 5, data->num_bits);
-      writer.PutValue(j + 6, data->num_bits);
-      writer.PutValue(j + 7, data->num_bits);
-    }
-    writer.Flush();
-  }
-}
-
-void TestBitWriter8ByteEncode(int batch_size, void* d) {
-  TestData* data = reinterpret_cast<TestData*>(d);
-  int buffer_size = BitUtil::Ceil(data->num_bits * data->num_values, 8);
-  for (int i = 0; i < batch_size; ++i) {
-    BitWriter_8byte writer(data->buffer, buffer_size);
-    // Unroll this to focus more on Put performance.
-    for (int j = 0; j < data->num_values; j += 8) {
-      writer.PutValue(j + 0, data->num_bits);
-      writer.PutValue(j + 1, data->num_bits);
-      writer.PutValue(j + 2, data->num_bits);
-      writer.PutValue(j + 3, data->num_bits);
-      writer.PutValue(j + 4, data->num_bits);
-      writer.PutValue(j + 5, data->num_bits);
-      writer.PutValue(j + 6, data->num_bits);
-      writer.PutValue(j + 7, data->num_bits);
-    }
-  }
-}
-
-void TestBitWriterDecode(int batch_size, void* d) {
-  TestData* data = reinterpret_cast<TestData*>(d);
-  int64_t v;
-  for (int i = 0; i < batch_size; ++i) {
-    BitReader reader(data->buffer, BUFFER_LEN);
-    // Unroll this to focus more on Put performance.
-    for (int j = 0; j < data->num_values; j += 8) {
-      reader.GetValue(data->num_bits, &v);
-      reader.GetValue(data->num_bits, &v);
-      reader.GetValue(data->num_bits, &v);
-      reader.GetValue(data->num_bits, &v);
-      reader.GetValue(data->num_bits, &v);
-      reader.GetValue(data->num_bits, &v);
-      reader.GetValue(data->num_bits, &v);
-      reader.GetValue(data->num_bits, &v);
-    }
-  }
-}
-
-void TestBitWriter8ByteDecode(int batch_size, void* d) {
-  TestData* data = reinterpret_cast<TestData*>(d);
-  data->result = true;
-  int64_t v;
-  for (int i = 0; i < batch_size; ++i) {
-    BitReader_8byte reader(data->buffer, BUFFER_LEN);
-    // Unroll this to focus more on Put performance.
-    for (int j = 0; j < data->num_values; j += 8) {
-      data->result &= reader.GetValue(data->num_bits, &v);
-      data->result &= reader.GetValue(data->num_bits, &v);
-      data->result &= reader.GetValue(data->num_bits, &v);
-      data->result &= reader.GetValue(data->num_bits, &v);
-      data->result &= reader.GetValue(data->num_bits, &v);
-      data->result &= reader.GetValue(data->num_bits, &v);
-      data->result &= reader.GetValue(data->num_bits, &v);
-      data->result &= reader.GetValue(data->num_bits, &v);
-    }
-  }
-  CHECK(data->result);
-}
-
-int main(int argc, char** argv) {
-  CpuInfo::Init();
-
-  MemTracker tracker;
-  MemPool pool(&tracker);
-
-  int num_values = 4096;
-  int max_bits = 16;
-
-  Benchmark encode_suite("encode");
-  TestData data[max_bits];
-  for (int i = 0; i < max_bits; ++i) {
-    data[i].buffer = new uint8_t[BUFFER_LEN];
-    data[i].num_values = num_values;
-    data[i].num_bits = i + 1;
-    data[i].max_value = 1 << i;
-    data[i].pool = &pool;
-
-    stringstream suffix;
-    suffix << " " << (i+1) << "-Bit";
-
-    stringstream name;
-    name << "\"BitWriter (8 byte)" << suffix.str() << "\"";
-    int baseline =
-        encode_suite.AddBenchmark(name.str(), TestBitWriter8ByteEncode, &data[i], -1);
-
-    name.str("");
-    name << "\"BitWriter" << suffix.str() << "\"";
-    encode_suite.AddBenchmark(name.str(), TestBitWriterEncode, &data[i], baseline);
-  }
-  cout << encode_suite.Measure() << endl;
-
-  Benchmark decode_suite("decode");
-  for (int i = 0; i < max_bits; ++i) {
-    stringstream suffix;
-    suffix << " " << (i+1) << "-Bit";
-
-    stringstream name;
-    name << "\"BitWriter (8 byte)" << suffix.str() << "\"";
-    int baseline =
-        decode_suite.AddBenchmark(name.str(), TestBitWriter8ByteDecode, &data[i], -1);
-
-    name.str("");
-    name << "\"BitWriter" << suffix.str() << "\"";
-    decode_suite.AddBenchmark(name.str(), TestBitWriterDecode, &data[i], baseline);
-  }
-  cout << decode_suite.Measure() << endl;
-
-  return 0;
-}

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ae116b5b/be/src/exec/parquet-column-readers.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/parquet-column-readers.cc b/be/src/exec/parquet-column-readers.cc
index 04127e3..5a2e90e 100644
--- a/be/src/exec/parquet-column-readers.cc
+++ b/be/src/exec/parquet-column-readers.cc
@@ -77,6 +77,8 @@ Status ParquetLevelDecoder::Init(const string& filename,
     parquet::Encoding::type encoding, MemPool* cache_pool, int cache_size,
     int max_level, int num_buffered_values, uint8_t** data, int* data_size) {
   DCHECK_GE(num_buffered_values, 0);
+  DCHECK_GT(cache_size, 0);
+  cache_size = BitUtil::RoundUpToPowerOf2(cache_size, 32);
   encoding_ = encoding;
   max_level_ = max_level;
   num_buffered_values_ = num_buffered_values;
@@ -97,7 +99,7 @@ Status ParquetLevelDecoder::Init(const string& filename,
         return Status(TErrorCode::PARQUET_CORRUPT_RLE_BYTES, filename, num_bytes);
       }
       int bit_width = BitUtil::Log2Ceiling64(max_level + 1);
-      Reset(*data, num_bytes, bit_width);
+      rle_decoder_.Reset(*data, num_bytes, bit_width);
       break;
     }
     case parquet::Encoding::BIT_PACKED:
@@ -143,66 +145,80 @@ Status ParquetLevelDecoder::InitCache(MemPool* pool, int cache_size) {
 }
 
 inline int16_t ParquetLevelDecoder::ReadLevel() {
-  bool valid;
-  uint8_t level;
-  if (encoding_ == parquet::Encoding::RLE) {
-    valid = Get(&level);
-  } else {
-    DCHECK_EQ(encoding_, parquet::Encoding::BIT_PACKED);
-    valid = bit_reader_.GetValue(1, &level);
+  if (UNLIKELY(!CacheHasNext())) {
+    if (UNLIKELY(!FillCache(cache_size_, &num_cached_levels_))) {
+      return HdfsParquetScanner::INVALID_LEVEL;
+    }
+    DCHECK_GE(num_cached_levels_, 0);
+    if (UNLIKELY(num_cached_levels_ == 0)) {
+      return HdfsParquetScanner::INVALID_LEVEL;
+    }
   }
-  return LIKELY(valid) ? level : HdfsParquetScanner::INVALID_LEVEL;
+  return CacheGetNext();
 }
 
-Status ParquetLevelDecoder::CacheNextBatch(int batch_size) {
-  DCHECK_LE(batch_size, cache_size_);
-  cached_level_idx_ = 0;
+Status ParquetLevelDecoder::CacheNextBatch(int vals_remaining) {
+  /// Fill the cache completely if there are enough values remaining.
+  /// Otherwise don't try to read more values than are left.
+  int batch_size = min(vals_remaining, cache_size_);
   if (max_level_ > 0) {
-    if (UNLIKELY(!FillCache(batch_size, &num_cached_levels_))) {
+    if (UNLIKELY(!FillCache(batch_size, &num_cached_levels_) ||
+          num_cached_levels_ < batch_size)) {
       return Status(decoding_error_code_, num_buffered_values_, filename_);
     }
   } else {
     // No levels to read, e.g., because the field is required. The cache was
     // already initialized with all zeros, so we can hand out those values.
     DCHECK_EQ(max_level_, 0);
+    cached_level_idx_ = 0;
     num_cached_levels_ = batch_size;
   }
   return Status::OK();
 }
 
-bool ParquetLevelDecoder::FillCache(int batch_size,
-    int* num_cached_levels) {
-  DCHECK(num_cached_levels != NULL);
-  int num_values = 0;
+bool ParquetLevelDecoder::FillCache(int batch_size, int* num_cached_levels) {
+  DCHECK(!CacheHasNext());
+  DCHECK(num_cached_levels != nullptr);
+  DCHECK_GE(max_level_, 0);
+  DCHECK_EQ(num_cached_levels_ % 32, 0) << "Last batch was not a multiple of 32";
+  cached_level_idx_ = 0;
+  if (max_level_ == 0) {
+    // No levels to read, e.g., because the field is required. The cache was
+    // already initialized with all zeros, so we can hand out those values.
+    *num_cached_levels = batch_size;
+    return true;
+  }
   if (encoding_ == parquet::Encoding::RLE) {
-    while (true) {
-      // Add RLE encoded values by repeating the current value this number of times.
-      uint32_t num_repeats_to_set =
-          min<uint32_t>(repeat_count_, batch_size - num_values);
-      memset(cached_levels_ + num_values, current_value_, num_repeats_to_set);
-      num_values += num_repeats_to_set;
-      repeat_count_ -= num_repeats_to_set;
-
-      // Add remaining literal values, if any.
-      uint32_t num_literals_to_set =
-          min<uint32_t>(literal_count_, batch_size - num_values);
-      int num_values_end = min<uint32_t>(num_values + literal_count_, batch_size);
-      for (; num_values < num_values_end; ++num_values) {
-        bool valid = bit_reader_.GetValue(bit_width_, &cached_levels_[num_values]);
-        if (UNLIKELY(!valid || cached_levels_[num_values] > max_level_)) return false;
-      }
-      literal_count_ -= num_literals_to_set;
-
-      if (num_values == batch_size) break;
-      if (UNLIKELY(!NextCounts<int16_t>())) return false;
-      if (repeat_count_ > 0 && current_value_ > max_level_) return false;
-    }
+    return FillCacheRle(batch_size, num_cached_levels);
   } else {
     DCHECK_EQ(encoding_, parquet::Encoding::BIT_PACKED);
-    for (; num_values < batch_size; ++num_values) {
-      bool valid = bit_reader_.GetValue(1, &cached_levels_[num_values]);
-      if (UNLIKELY(!valid || cached_levels_[num_values] > max_level_)) return false;
+    *num_cached_levels = bit_reader_.UnpackBatch(1, batch_size, cached_levels_);
+    return true;
+  }
+}
+
+bool ParquetLevelDecoder::FillCacheRle(int batch_size, int* num_cached_levels) {
+  int num_values = 0;
+  while (num_values < batch_size) {
+    // Add RLE encoded values by repeating the current value this number of times.
+    uint32_t num_repeats = rle_decoder_.NextNumRepeats();
+    if (num_repeats > 0) {
+      uint32_t num_repeats_to_set = min<uint32_t>(num_repeats, batch_size - num_values);
+      uint8_t repeated_value = rle_decoder_.GetRepeatedValue(num_repeats_to_set);
+      memset(cached_levels_ + num_values, repeated_value, num_repeats_to_set);
+      num_values += num_repeats_to_set;
+      continue;
+    }
+
+    // Add remaining literal values, if any.
+    uint32_t num_literals = rle_decoder_.NextNumLiterals();
+    if (num_literals == 0) break;
+    uint32_t num_literals_to_set = min<uint32_t>(num_literals, batch_size - num_values);
+    if (!rle_decoder_.GetLiteralValues(
+          num_literals_to_set, &cached_levels_[num_values])) {
+      return false;
     }
+    num_values += num_literals_to_set;
   }
   *num_cached_levels = num_values;
   return true;
@@ -282,9 +298,7 @@ class ScalarColumnReader : public BaseScalarColumnReader {
     // NextLevels() should have already been called and def and rep levels should be in
     // valid range.
     DCHECK_GE(rep_level_, 0);
-    DCHECK_LE(rep_level_, max_rep_level());
     DCHECK_GE(def_level_, 0);
-    DCHECK_LE(def_level_, max_def_level());
     DCHECK_GE(def_level_, def_level_of_immediate_repeated_ancestor()) <<
         "Caller should have called NextLevels() until we are ready to read a value";
 
@@ -330,6 +344,7 @@ class ScalarColumnReader : public BaseScalarColumnReader {
     int val_count = 0;
     bool continue_execution = true;
     while (val_count < max_values && !RowGroupAtEnd() && continue_execution) {
+      DCHECK_GE(num_buffered_values_, 0);
       // Read next page if necessary.
       if (num_buffered_values_ == 0) {
         if (!NextPage()) {
@@ -338,26 +353,29 @@ class ScalarColumnReader : public BaseScalarColumnReader {
         }
       }
 
-      // Fill def/rep level caches if they are empty.
-      int level_batch_size = min(parent_->state_->batch_size(), num_buffered_values_);
-      if (!def_levels_.CacheHasNext()) {
-        parent_->parse_status_.MergeStatus(def_levels_.CacheNextBatch(level_batch_size));
-      }
-      // We only need the repetition levels for populating the position slot since we
-      // are only populating top-level tuples.
-      if (IN_COLLECTION && pos_slot_desc_ != NULL && !rep_levels_.CacheHasNext()) {
-        parent_->parse_status_.MergeStatus(rep_levels_.CacheNextBatch(level_batch_size));
-      }
-      if (UNLIKELY(!parent_->parse_status_.ok())) return false;
-
-      // This special case is most efficiently handled here directly.
+      // Not materializing anything - skip decoding any levels and rely on the value
+      // count from page metadata to return the correct number of rows.
       if (!MATERIALIZED && !IN_COLLECTION) {
-        int vals_to_add = min(def_levels_.CacheRemaining(), max_values - val_count);
+        int vals_to_add = min(num_buffered_values_, max_values - val_count);
         val_count += vals_to_add;
-        def_levels_.CacheSkipLevels(vals_to_add);
         num_buffered_values_ -= vals_to_add;
         continue;
       }
+      // Fill the rep level cache if needed. We are flattening out the fields of the
+      // nested collection into the top-level tuple returned by the scan, so we don't
+      // care about the nesting structure unless the position slot is being populated.
+      if (IN_COLLECTION && pos_slot_desc_ != nullptr && !rep_levels_.CacheHasNext()) {
+        parent_->parse_status_.MergeStatus(
+            rep_levels_.CacheNextBatch(num_buffered_values_));
+        if (UNLIKELY(!parent_->parse_status_.ok())) return false;
+      }
+
+      // Fill def level cache if needed.
+      if (!def_levels_.CacheHasNext()) {
+        // TODO: add a fast path here if there's a run of repeated values.
+        parent_->parse_status_.MergeStatus(
+            def_levels_.CacheNextBatch(num_buffered_values_));
+      }
 
       // Read data page and cached levels to materialize values.
       int cache_start_idx = def_levels_.CacheCurrIdx();
@@ -685,7 +703,9 @@ class BoolColumnReader : public BaseScalarColumnReader {
 
   virtual Status InitDataPage(uint8_t* data, int size) {
     // Initialize bool decoder
-    bool_values_ = BitReader(data, size);
+    bool_values_.Reset(data, size);
+    num_unpacked_values_ = 0;
+    unpacked_value_idx_ = 0;
     return Status::OK();
   }
 
@@ -695,9 +715,7 @@ class BoolColumnReader : public BaseScalarColumnReader {
     DCHECK(slot_desc_ != NULL);
     // Def and rep levels should be in valid range.
     DCHECK_GE(rep_level_, 0);
-    DCHECK_LE(rep_level_, max_rep_level());
     DCHECK_GE(def_level_, 0);
-    DCHECK_LE(def_level_, max_def_level());
     DCHECK_GE(def_level_, def_level_of_immediate_repeated_ancestor()) <<
         "Caller should have called NextLevels() until we are ready to read a value";
 
@@ -719,14 +737,38 @@ class BoolColumnReader : public BaseScalarColumnReader {
   template<bool IN_COLLECTION>
   inline bool ReadSlot(Tuple* tuple, MemPool* pool)  {
     void* slot = tuple->GetSlot(tuple_offset_);
-    if (!bool_values_.GetValue(1, reinterpret_cast<bool*>(slot))) {
-      parent_->parse_status_ = Status("Invalid bool column.");
-      return false;
+    bool val;
+    if (unpacked_value_idx_ < num_unpacked_values_) {
+      val = unpacked_values_[unpacked_value_idx_++];
+    } else {
+      // Unpack as many values as we can into the buffer. We expect to read at least one
+      // value.
+      int num_unpacked =
+          bool_values_.UnpackBatch(1, UNPACKED_BUFFER_LEN, &unpacked_values_[0]);
+      if (UNLIKELY(num_unpacked == 0)) {
+        parent_->parse_status_ = Status("Invalid bool column.");
+        return false;
+      }
+      val = unpacked_values_[0];
+      num_unpacked_values_ = num_unpacked;
+      unpacked_value_idx_ = 1;
     }
+    *reinterpret_cast<bool*>(slot) = val;
     return NextLevels<IN_COLLECTION>();
   }
 
-  BitReader bool_values_;
+  /// A buffer to store unpacked values. Its length must be a multiple of 32 to use the
+  /// batch-oriented interface of BatchedBitReader.
+  static const int UNPACKED_BUFFER_LEN = 128;
+  bool unpacked_values_[UNPACKED_BUFFER_LEN];
+
+  /// The number of valid values in 'unpacked_values_'.
+  int num_unpacked_values_ = 0;
+
+  /// The next value to return from 'unpacked_values_'.
+  int unpacked_value_idx_ = 0;
+
+  BatchedBitReader bool_values_;
 };
 
 // Change 'val_count' to zero to exercise IMPALA-5197. This verifies the error handling

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ae116b5b/be/src/exec/parquet-column-readers.h
----------------------------------------------------------------------
diff --git a/be/src/exec/parquet-column-readers.h b/be/src/exec/parquet-column-readers.h
index dea84a8..17b8c0f 100644
--- a/be/src/exec/parquet-column-readers.h
+++ b/be/src/exec/parquet-column-readers.h
@@ -22,6 +22,7 @@
 
 #include "exec/hdfs-parquet-scanner.h"
 #include "util/codec.h"
+#include "util/bit-stream-utils.h"
 #include "util/dict-encoding.h"
 #include "util/rle-encoding.h"
 
@@ -36,39 +37,34 @@ class MemPool;
 /// depth of 100, as enforced by the FE. Using a small type saves memory and speeds up
 /// populating the level cache (e.g., with RLE we can memset() repeated values).
 ///
-/// Inherits from RleDecoder instead of containing one for performance reasons.
-/// The containment design would require two BitReaders per column reader. The extra
-/// BitReader causes enough bloat for a column reader to require another cache line.
-/// TODO: It is not clear whether the inheritance vs. containment choice still makes
-/// sense with column-wise materialization. The containment design seems cleaner and
-/// we should revisit.
-class ParquetLevelDecoder : public RleDecoder {
+/// TODO: expose whether we're in a run of repeated values so that callers can
+/// optimise for that case.
+class ParquetLevelDecoder {
  public:
   ParquetLevelDecoder(bool is_def_level_decoder)
-    : cached_levels_(NULL),
-      num_cached_levels_(0),
-      cached_level_idx_(0),
-      encoding_(parquet::Encoding::PLAIN),
-      max_level_(0),
-      cache_size_(0),
-      num_buffered_values_(0),
-      decoding_error_code_(is_def_level_decoder ?
+    : decoding_error_code_(is_def_level_decoder ?
           TErrorCode::PARQUET_DEF_LEVEL_ERROR : TErrorCode::PARQUET_REP_LEVEL_ERROR) {
   }
 
   /// Initialize the LevelDecoder. Reads and advances the provided data buffer if the
-  /// encoding requires reading metadata from the page header.
+  /// encoding requires reading metadata from the page header. 'cache_size' will be
+  /// rounded up to a multiple of 32 internally.
   Status Init(const string& filename, parquet::Encoding::type encoding,
       MemPool* cache_pool, int cache_size, int max_level, int num_buffered_values,
       uint8_t** data, int* data_size);
 
-  /// Returns the next level or INVALID_LEVEL if there was an error.
+  /// Returns the next level or INVALID_LEVEL if there was an error. Not as efficient
+  /// as batched methods.
   inline int16_t ReadLevel();
 
-  /// Decodes and caches the next batch of levels. Resets members associated with the
-  /// cache. Returns a non-ok status if there was a problem decoding a level, or if a
-  /// level was encountered with a value greater than max_level_.
-  Status CacheNextBatch(int batch_size);
+  /// Decodes and caches the next batch of levels given that there are 'vals_remaining'
+  /// values left to decode in the page. Resets members associated with the cache.
+  /// Returns a non-ok status if there was a problem decoding a level, if a level was
+  /// encountered with a value greater than max_level_, or if fewer than
+  /// min(cache capacity, vals_remaining) levels could be read, which indicates that the
+  /// input did not have the expected number of values. Only valid to call when
+  /// the cache has been exhausted, i.e. CacheHasNext() is false.
+  Status CacheNextBatch(int vals_remaining);
 
   /// Functions for working with the level cache.
   inline bool CacheHasNext() const { return cached_level_idx_ < num_cached_levels_; }
@@ -83,33 +79,57 @@ class ParquetLevelDecoder : public RleDecoder {
   inline int CacheSize() const { return num_cached_levels_; }
   inline int CacheRemaining() const { return num_cached_levels_ - cached_level_idx_; }
   inline int CacheCurrIdx() const { return cached_level_idx_; }
-
  private:
   /// Initializes members associated with the level cache. Allocates memory for
   /// the cache from pool, if necessary.
   Status InitCache(MemPool* pool, int cache_size);
 
-  /// Decodes and writes a batch of levels into the cache. Sets the number of
-  /// values written to the cache in *num_cached_levels. Returns false if there was
-  /// an error decoding a level or if there was a level value greater than max_level_.
+  /// Decodes and writes a batch of levels into the cache. Returns true and sets
+  /// the number of values written to the cache via *num_cached_levels if no errors
+  /// are encountered. *num_cached_levels is < 'batch_size' in this case iff the
+  /// end of input was hit without any other errors. Returns false if there was an
+  /// error decoding a level or if there was an invalid level value greater than
+  /// 'max_level_'. Only valid to call when the cache has been exhausted, i.e.
+  /// CacheHasNext() is false.
   bool FillCache(int batch_size, int* num_cached_levels);
 
-  /// Buffer for a batch of levels. The memory is allocated and owned by a pool in
-  /// passed in Init().
-  uint8_t* cached_levels_;
+  /// Implementation of FillCache() for RLE encoding.
+  bool FillCacheRle(int batch_size, int* num_cached_levels);
+
+  /// RLE decoder, used if 'encoding_' is RLE.
+  RleBatchDecoder<uint8_t> rle_decoder_;
+
+  /// Bit unpacker, used if 'encoding_' is BIT_PACKED.
+  BatchedBitReader bit_reader_;
+
+  /// Buffer for a batch of levels. The memory is allocated and owned by a pool passed
+  /// in Init().
+  uint8_t* cached_levels_ = nullptr;
+
   /// Number of valid level values in the cache.
-  int num_cached_levels_;
+  int num_cached_levels_ = 0;
+
   /// Current index into cached_levels_.
-  int cached_level_idx_;
-  parquet::Encoding::type encoding_;
+  int cached_level_idx_ = 0;
+
+  /// The parquet encoding used for the levels. Usually RLE but the deprecated BIT_PACKED
+  /// encoding is also allowed.
+  parquet::Encoding::type encoding_ = parquet::Encoding::PLAIN;
 
   /// For error checking and reporting.
-  int max_level_;
-  /// Number of level values cached_levels_ has memory allocated for.
-  int cache_size_;
+  int max_level_ = 0;
+
+  /// Number of level values cached_levels_ has memory allocated for. Always
+  /// a multiple of 32 to allow reading directly from 'bit_reader_' in batches.
+  int cache_size_ = 0;
+
   /// Number of remaining data values in the current data page.
-  int num_buffered_values_;
+  int num_buffered_values_ = 0;
+
+  /// Name of the parquet file. Used for reporting level decoding errors.
   string filename_;
+
+  /// Error code to use when reporting level decoding errors.
   TErrorCode::type decoding_error_code_;
 };
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ae116b5b/be/src/experiments/bit-stream-utils.8byte.h
----------------------------------------------------------------------
diff --git a/be/src/experiments/bit-stream-utils.8byte.h b/be/src/experiments/bit-stream-utils.8byte.h
deleted file mode 100644
index f9418a5..0000000
--- a/be/src/experiments/bit-stream-utils.8byte.h
+++ /dev/null
@@ -1,137 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-
-#ifndef IMPALA_EXPERIMENTS_BIT_STREAM_UTILS_8BYTE_H
-#define IMPALA_EXPERIMENTS_BIT_STREAM_UTILS_8BYTE_H
-
-#include <boost/cstdint.hpp>
-#include <string.h>
-#include "common/compiler-util.h"
-#include "common/logging.h"
-#include "util/bit-util.h"
-
-namespace impala {
-
-/// Utility class to write bit/byte streams.  This class can write data to either be
-/// bit packed or byte aligned (and a single stream that has a mix of both).
-/// This class does not allocate memory.
-class BitWriter_8byte {
- public:
-  /// buffer: buffer to write bits to.  Buffer should be preallocated with
-  /// 'buffer_len' bytes. 'buffer_len' must be a multiple of 8.
-  BitWriter_8byte(uint8_t* buffer, int buffer_len) :
-      buffer_(reinterpret_cast<uint64_t*>(buffer)),
-      max_bytes_(buffer_len),
-      offset_(0),
-      bit_offset_(0) {
-    DCHECK_EQ(buffer_len % 8, 0);
-  }
-
-  void Clear() {
-    offset_ = 0;
-    bit_offset_ = 0;
-    memset(buffer_, 0, max_bytes_);
-  }
-
-  uint8_t* buffer() const { return reinterpret_cast<uint8_t*>(buffer_); }
-  int buffer_len() const { return max_bytes_; }
-
-  inline int bytes_written() const {
-    return offset_ * 8 + BitUtil::Ceil(bit_offset_, 8);
-  }
-
-  /// Writes a value to the buffer.  This is bit packed.  Returns false if
-  /// there was not enough space.
-  bool PutValue(uint64_t v, int num_bits);
-
-  /// Writes v to the next aligned byte.
-  template<typename T>
-  bool PutAligned(T v, int num_bits);
-
-  /// Write a Vlq encoded int to the buffer.  Returns false if there was not enough
-  /// room.  The value is written byte aligned.
-  /// For more details on vlq:
-  /// en.wikipedia.org/wiki/Variable-length_quantity
-  bool PutVlqInt(int32_t v);
-
-  /// Get a pointer to the next aligned byte and advance the underlying buffer
-  /// by num_bytes.
-  /// Returns NULL if there was not enough space.
-  uint8_t* GetNextBytePtr(int num_bytes = 1);
-
- private:
-  uint64_t* buffer_;
-  int max_bytes_;
-  int offset_;            // Offset into buffer_
-  int bit_offset_;        // Offset into current uint64_t
-};
-
-/// Utility class to read bit/byte stream.  This class can read bits or bytes
-/// that are either byte aligned or not.  It also has utilities to read multiple
-/// bytes in one read (e.g. encoded int).
-class BitReader_8byte {
- public:
-  /// buffer: buffer to read from.  The buffer's length is 'buffer_len' and must be a
-  /// multiple of 8.
-  BitReader_8byte(uint8_t* buffer, int buffer_len) :
-      buffer_(reinterpret_cast<uint64_t*>(buffer)),
-      max_bytes_(buffer_len),
-      offset_(0),
-      bit_offset_(0) {
-    DCHECK_EQ(buffer_len % 8, 0);
-  }
-
-  BitReader_8byte() : buffer_(NULL), max_bytes_(0) {}
-
-  /// Gets the next value from the buffer.
-  /// Returns true if 'v' could be read or false if there are not enough bytes left.
-  template<typename T>
-  bool GetValue(int num_bits, T* v);
-
-  /// Reads a T sized value from the buffer.  T needs to be a native type and little
-  /// endian.  The value is assumed to be byte aligned so the stream will be advance
-  /// to the start of the next byte before v is read.
-  template<typename T>
-  bool GetAligned(int num_bits, T* v);
-
-  /// Reads a vlq encoded int from the stream.  The encoded int must start at the
-  /// beginning of a byte. Return false if there were not enough bytes in the buffer.
-  bool GetVlqInt(int32_t* v);
-
-  /// Returns the number of bytes left in the stream, not including the current byte (i.e.,
-  /// there may be an additional fraction of a byte).
-  inline int bytes_left() {
-    return max_bytes_ - (offset_ * 8 + BitUtil::Ceil(bit_offset_, 8));
-  }
-
-  /// Maximum byte length of a vlq encoded int
-  static const int MAX_VLQ_BYTE_LEN = 5;
- 
- private:
-  uint64_t* buffer_;
-  int max_bytes_;
-  int offset_;            // Offset into buffer_
-  int bit_offset_;        // Offset into current uint64_t
-
-  /// Advances offset_ and/or bit_offset_ to next byte boundary in buffer_.
-  inline void Align();
-};
-
-}
-
-#endif

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ae116b5b/be/src/experiments/bit-stream-utils.8byte.inline.h
----------------------------------------------------------------------
diff --git a/be/src/experiments/bit-stream-utils.8byte.inline.h b/be/src/experiments/bit-stream-utils.8byte.inline.h
deleted file mode 100644
index 1b1c05a..0000000
--- a/be/src/experiments/bit-stream-utils.8byte.inline.h
+++ /dev/null
@@ -1,145 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-
-#ifndef IMPALA_EXPERIMENTS_BIT_STREAM_UTILS_8BYTE_INLINE_H
-#define IMPALA_EXPERIMENTS_BIT_STREAM_UTILS_8BYTE_INLINE_H
-
-#include "experiments/bit-stream-utils.8byte.h"
-
-namespace impala {
-
-inline bool BitWriter_8byte::PutValue(uint64_t v, int num_bits) {
-  DCHECK_LE(num_bits, 64);
-  DCHECK(num_bits == 64 || v >> num_bits == 0)
-      << "v = " << v << ", num_bits = " << num_bits;
-
-  if (UNLIKELY(offset_ * 8 + bit_offset_ > max_bytes_ * 8 - num_bits)) return false;
-
-  buffer_[offset_] |= v << bit_offset_;
-  bit_offset_ += num_bits;
-  if (bit_offset_ >= 64) {
-    ++offset_;
-    bit_offset_ -= 64;
-    if (UNLIKELY(bit_offset_ > 0)) {
-      // Write out bits of v that crossed into new byte offset
-      // We cannot perform v >> num_bits (i.e. when bit_offset_ is 0) because v >> 64 != 0
-      buffer_[offset_] = v >> (num_bits - bit_offset_);
-    }
-  }
-  DCHECK_LT(bit_offset_, 64);
-  return true;
-}
-
-inline uint8_t* BitWriter_8byte::GetNextBytePtr(int num_bytes) {
-  if (UNLIKELY(bytes_written() + num_bytes > max_bytes_)) return NULL;
-
-  // Advance to next aligned byte if necessary
-  if (UNLIKELY(bit_offset_ > 56)) {
-    ++offset_;
-    bit_offset_ = 0;
-  } else {
-    bit_offset_ = BitUtil::RoundUpNumBytes(bit_offset_) * 8;
-  }
-
-  DCHECK_EQ(bit_offset_ % 8, 0);
-  uint8_t* ptr = reinterpret_cast<uint8_t*>(buffer_ + offset_) + bit_offset_ / 8;
-  bit_offset_ += num_bytes * 8;
-  offset_ += bit_offset_ / 64;
-  bit_offset_ %= 64;
-  return ptr;
-}
-
-template<typename T>
-inline bool BitWriter_8byte::PutAligned(T val, int num_bits) {
-  // Align to byte boundary
-  uint8_t* byte_ptr = GetNextBytePtr(0);
-  bool result = PutValue(val, num_bits);
-  if (!result) return false;
-  // Pad to next byte boundary
-  byte_ptr = GetNextBytePtr(0);
-  DCHECK(byte_ptr != NULL);
-  return true;
-}
-
-inline bool BitWriter_8byte::PutVlqInt(int32_t v) {
-  bool result = true;
-  while ((v & 0xFFFFFF80) != 0L) {
-    result &= PutAligned<uint8_t>((v & 0x7F) | 0x80, 8);
-    v >>= 7;
-  }
-  result &= PutAligned<uint8_t>(v & 0x7F, 8);
-  return result;
-}
-
-template<typename T>
-inline bool BitReader_8byte::GetValue(int num_bits, T* v) {
-  int size = sizeof(T) * 8;
-  DCHECK_LE(num_bits, size);
-  if (UNLIKELY(offset_ * 8 + bit_offset_ > max_bytes_ * 8 - num_bits)) return false;
-
-  *v = BitUtil::TrailingBits(buffer_[offset_], bit_offset_ + num_bits) >> bit_offset_;
-  bit_offset_ += num_bits;
-  if (bit_offset_ >= 64) {
-    ++offset_;
-    bit_offset_ -= 64;
-    // Read bits of v that crossed into new byte offset
-    *v |= BitUtil::TrailingBits(buffer_[offset_], bit_offset_)
-        << (num_bits - bit_offset_);
-  }
-  DCHECK_LT(bit_offset_, 64);
-  return true;
-}
-
-template<typename T>
-inline bool BitReader_8byte::GetAligned(int num_bits, T* v) {
- Align();
-  if (UNLIKELY(bytes_left() < BitUtil::Ceil(num_bits, 8))) return false;
-  DCHECK_EQ(bit_offset_ % 8, 0);
-  bool result = GetValue(num_bits, v);
-  DCHECK(result);
-  Align();
-  return true;
-}
-
-inline bool BitReader_8byte::GetVlqInt(int32_t* v) {
-  *v = 0;
-  int shift = 0;
-  int num_bytes = 0;
-  uint8_t byte = 0;
-  do {
-    if (!GetAligned<uint8_t>(8, &byte)) return false;
-    *v |= (byte & 0x7F) << shift;
-    shift += 7;
-    DCHECK_LE(++num_bytes, MAX_VLQ_BYTE_LEN);
-  } while ((byte & 0x80) != 0);
-  return true;
-}
-
-inline void BitReader_8byte::Align() {
-  if (UNLIKELY(bit_offset_ > 56)) {
-    ++offset_;
-    bit_offset_ = 0;
-    DCHECK_LE(offset_, max_bytes_);
-  } else {
-    bit_offset_ = BitUtil::RoundUpNumBytes(bit_offset_) * 8;
-  }
-}
-
-}
-
-#endif

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ae116b5b/be/src/util/bit-packing.h
----------------------------------------------------------------------
diff --git a/be/src/util/bit-packing.h b/be/src/util/bit-packing.h
index 62e5e88..05036db 100644
--- a/be/src/util/bit-packing.h
+++ b/be/src/util/bit-packing.h
@@ -62,6 +62,27 @@ class BitPacking {
       const uint8_t* __restrict__ in, int64_t in_bytes, int64_t num_values,
       OutType* __restrict__ out);
 
+  /// Same as above, templated by BIT_WIDTH.
+  template <typename OutType, int BIT_WIDTH>
+  static std::pair<const uint8_t*, int64_t> UnpackValues(const uint8_t* __restrict__ in,
+      int64_t in_bytes, int64_t num_values, OutType* __restrict__ out);
+
+  /// Unpack values as above, treating them as unsigned integers, and decode them
+  /// using the provided dict. Sets 'decode_error' to true if one of the packed
+  /// values was >= 'dict_len'. Does not modify 'decode_error' on success.
+  template <typename OutType>
+  static std::pair<const uint8_t*, int64_t> UnpackAndDecodeValues(int bit_width,
+      const uint8_t* __restrict__ in, int64_t in_bytes, OutType* __restrict__ dict,
+      int64_t dict_len, int64_t num_values, OutType* __restrict__ out,
+      bool* __restrict__ decode_error);
+
+  /// Same as above, templated by BIT_WIDTH.
+  template <typename OutType, int BIT_WIDTH>
+  static std::pair<const uint8_t*, int64_t> UnpackAndDecodeValues(
+      const uint8_t* __restrict__ in, int64_t in_bytes, OutType* __restrict__ dict,
+      int64_t dict_len, int64_t num_values, OutType* __restrict__ out,
+      bool* __restrict__ decode_error);
+
   /// Unpack exactly 32 values of 'bit_width' from 'in' to 'out'. 'in' must point to
   /// 'in_bytes' of addressable memory, and 'in_bytes' must be at least
   /// (32 * bit_width / 8). 'out' must have space for 32 OutType values.
@@ -70,22 +91,42 @@ class BitPacking {
   static const uint8_t* Unpack32Values(int bit_width, const uint8_t* __restrict__ in,
       int64_t in_bytes, OutType* __restrict__ out);
 
- private:
-  /// Implementation of Unpack32Values() that uses 32-bit integer loads to
-  /// unpack values with the given BIT_WIDTH from 'in' to 'out'.
+  /// Same as Unpack32Values() but templated by BIT_WIDTH.
   template <typename OutType, int BIT_WIDTH>
   static const uint8_t* Unpack32Values(
       const uint8_t* __restrict__ in, int64_t in_bytes, OutType* __restrict__ out);
 
-  /// Function that unpacks 'num_values' values with the given BIT_WIDTH from 'in' to
-  /// 'out'. 'num_values' can be at most 32. The version with 'bit_width' as an argument
-  /// dispatches based on 'bit_width' to the appropriate templated implementation.
-  template <typename OutType, int BIT_WIDTH>
-  static const uint8_t* UnpackUpTo32Values(const uint8_t* __restrict__ in,
-      int64_t in_bytes, int num_values, OutType* __restrict__ out);
+  /// Same as Unpack32Values() but also decodes the unpacked indices using 'dict'.
   template <typename OutType>
-  static const uint8_t* UnpackUpTo32Values(int bit_width, const uint8_t* __restrict__ in,
+  static const uint8_t* UnpackAndDecode32Values(int bit_width,
+      const uint8_t* __restrict__ in, int64_t in_bytes, OutType* __restrict__ dict,
+      int64_t dict_len, OutType* __restrict__ out, bool* __restrict__ decode_error);
+
+  /// Same as UnpackAndDecode32Values() but templated by BIT_WIDTH.
+  template <typename OutType, int BIT_WIDTH>
+  static const uint8_t* UnpackAndDecode32Values(const uint8_t* __restrict__ in,
+      int64_t in_bytes, OutType* __restrict__ dict, int64_t dict_len,
+      OutType* __restrict__ out, bool* __restrict__ decode_error);
+
+  /// Unpacks 'num_values' values with the given BIT_WIDTH from 'in' to 'out'.
+  /// 'num_values' must be at most 31. 'in' must point to 'in_bytes' of addressable
+  /// memory, and 'in_bytes' must be at least ceil(num_values * BIT_WIDTH / 8).
+  /// 'out' must have space for 'num_values' OutType values.
+  /// 0 <= BIT_WIDTH <= 32 and BIT_WIDTH <= # of bits in OutType.
+  template <typename OutType, int BIT_WIDTH>
+  static const uint8_t* UnpackUpTo31Values(const uint8_t* __restrict__ in,
       int64_t in_bytes, int num_values, OutType* __restrict__ out);
+
+  /// Same as UnpackUpTo31Values() but also decodes the unpacked indices using 'dict'.
+  template <typename OutType, int BIT_WIDTH>
+  static const uint8_t* UnpackAndDecodeUpTo31Values(const uint8_t* __restrict__ in,
+      int64_t in_bytes, OutType* __restrict__ dict, int64_t dict_len, int num_values,
+      OutType* __restrict__ out, bool* __restrict__ decode_error);
+
+ private:
+  /// Compute the number of values with the given bit width that can be unpacked from
+  /// an input buffer of 'in_bytes' into an output buffer with space for 'num_values'.
+  static int64_t NumValuesToUnpack(int bit_width, int64_t in_bytes, int64_t num_values);
 };
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ae116b5b/be/src/util/bit-packing.inline.h
----------------------------------------------------------------------
diff --git a/be/src/util/bit-packing.inline.h b/be/src/util/bit-packing.inline.h
index 37d51ab..4b2bb33 100644
--- a/be/src/util/bit-packing.inline.h
+++ b/be/src/util/bit-packing.inline.h
@@ -31,32 +31,109 @@
 
 namespace impala {
 
+inline int64_t BitPacking::NumValuesToUnpack(
+    int bit_width, int64_t in_bytes, int64_t num_values) {
+  // Zero-width values need no input bytes; otherwise check that 'in_bytes' suffices.
+  if (bit_width == 0 || BitUtil::RoundUpNumBytes(num_values * bit_width) <= in_bytes) {
+    // Limited by output space.
+    return num_values;
+  } else {
+    // Limited by the number of input bytes. Compute the number of values that can be
+    // fully unpacked from the input (the integer division rounds down).
+    return (in_bytes * CHAR_BIT) / bit_width;
+  }
+}
+
 template <typename OutType>
 std::pair<const uint8_t*, int64_t> BitPacking::UnpackValues(int bit_width,
     const uint8_t* __restrict__ in, int64_t in_bytes, int64_t num_values,
     OutType* __restrict__ out) {
+#pragma push_macro("UNPACK_VALUES_CASE")
+#define UNPACK_VALUES_CASE(ignore1, i, ignore2) \
+  case i:                                       \
+    return UnpackValues<OutType, i>(in, in_bytes, num_values, out);
+
+  switch (bit_width) {
+    // Expand cases from 0 to 32, dispatching to the BIT_WIDTH-templated version.
+    BOOST_PP_REPEAT_FROM_TO(0, 33, UNPACK_VALUES_CASE, ignore);
+    default:
+      DCHECK(false);
+      return std::make_pair(nullptr, -1); // Invalid bit_width.
+  }
+#pragma pop_macro("UNPACK_VALUES_CASE")
+}
+
+template <typename OutType, int BIT_WIDTH>
+std::pair<const uint8_t*, int64_t> BitPacking::UnpackValues(
+    const uint8_t* __restrict__ in, int64_t in_bytes, int64_t num_values,
+    OutType* __restrict__ out) {
   constexpr int BATCH_SIZE = 32;
-  const int64_t max_input_values =
-      bit_width ? (in_bytes * CHAR_BIT) / bit_width : num_values;
-  const int64_t values_to_read = std::min(num_values, max_input_values);
+  const int64_t values_to_read = NumValuesToUnpack(BIT_WIDTH, in_bytes, num_values);
   const int64_t batches_to_read = values_to_read / BATCH_SIZE;
   const int64_t remainder_values = values_to_read % BATCH_SIZE;
   const uint8_t* in_pos = in;
   OutType* out_pos = out;
   // First unpack as many full batches as possible.
   for (int64_t i = 0; i < batches_to_read; ++i) {
-    in_pos = Unpack32Values<OutType>(bit_width, in_pos, in_bytes, out_pos);
+    in_pos = Unpack32Values<OutType, BIT_WIDTH>(in_pos, in_bytes, out_pos);
     out_pos += BATCH_SIZE;
-    in_bytes -= (BATCH_SIZE * bit_width) / CHAR_BIT;
+    in_bytes -= (BATCH_SIZE * BIT_WIDTH) / CHAR_BIT; // Exactly 4 * BIT_WIDTH bytes.
   }
   // Then unpack the final partial batch.
   if (remainder_values > 0) {
-    in_pos = UnpackUpTo32Values<OutType>(bit_width,
+    in_pos = UnpackUpTo31Values<OutType, BIT_WIDTH>(
         in_pos, in_bytes, remainder_values, out_pos);
   }
   return std::make_pair(in_pos, values_to_read);
 }
 
+template <typename OutType>
+std::pair<const uint8_t*, int64_t> BitPacking::UnpackAndDecodeValues(int bit_width,
+    const uint8_t* __restrict__ in, int64_t in_bytes, OutType* __restrict__ dict,
+    int64_t dict_len, int64_t num_values, OutType* __restrict__ out,
+    bool* __restrict__ decode_error) {
+#pragma push_macro("UNPACK_VALUES_CASE")
+#define UNPACK_VALUES_CASE(ignore1, i, ignore2) \
+  case i:                                       \
+    return UnpackAndDecodeValues<OutType, i>(   \
+        in, in_bytes, dict, dict_len, num_values, out, decode_error);
+
+  switch (bit_width) {
+    // Expand cases from 0 to 32, dispatching to the BIT_WIDTH-templated version.
+    BOOST_PP_REPEAT_FROM_TO(0, 33, UNPACK_VALUES_CASE, ignore);
+    default:
+      DCHECK(false);
+      return std::make_pair(nullptr, -1); // Invalid bit_width.
+  }
+#pragma pop_macro("UNPACK_VALUES_CASE")
+}
+
+template <typename OutType, int BIT_WIDTH>
+std::pair<const uint8_t*, int64_t> BitPacking::UnpackAndDecodeValues(
+    const uint8_t* __restrict__ in, int64_t in_bytes, OutType* __restrict__ dict,
+    int64_t dict_len, int64_t num_values, OutType* __restrict__ out,
+    bool* __restrict__ decode_error) {
+  constexpr int BATCH_SIZE = 32;
+  const int64_t values_to_read = NumValuesToUnpack(BIT_WIDTH, in_bytes, num_values);
+  const int64_t batches_to_read = values_to_read / BATCH_SIZE;
+  const int64_t remainder_values = values_to_read % BATCH_SIZE;
+  const uint8_t* in_pos = in;
+  OutType* out_pos = out;
+  // Unpack and decode full batches. Errors only set '*decode_error'; we keep going.
+  for (int64_t i = 0; i < batches_to_read; ++i) {
+    in_pos = UnpackAndDecode32Values<OutType, BIT_WIDTH>(
+        in_pos, in_bytes, dict, dict_len, out_pos, decode_error);
+    out_pos += BATCH_SIZE;
+    in_bytes -= (BATCH_SIZE * BIT_WIDTH) / CHAR_BIT; // Exactly 4 * BIT_WIDTH bytes.
+  }
+  // Then unpack and decode the final partial batch (at most 31 values).
+  if (remainder_values > 0) {
+    in_pos = UnpackAndDecodeUpTo31Values<OutType, BIT_WIDTH>(
+        in_pos, in_bytes, dict, dict_len, remainder_values, out_pos, decode_error);
+  }
+  return std::make_pair(in_pos, values_to_read);
+}
+
 // Loop body of unrolled loop that unpacks the value. BIT_WIDTH is the bit width of
 // the packed values. 'in_buf' is the start of the input buffer and 'out_vals' is the
 // start of the output values array. This function unpacks the VALUE_IDX'th packed value
@@ -111,60 +188,83 @@ inline uint32_t ALWAYS_INLINE UnpackValue(const uint8_t* __restrict__ in_buf) {
   }
 }
 
+template <typename OutType>
+inline void ALWAYS_INLINE DecodeValue(OutType* __restrict__ dict, int64_t dict_len,
+    uint32_t idx, OutType* __restrict__ out_val, bool* __restrict__ decode_error) {
+  if (UNLIKELY(idx >= dict_len)) { // 'idx' is not a valid index into 'dict'.
+    *decode_error = true; // Never reset here; '*out_val' is left unwritten.
+  } else {
+    // Use memcpy() because we can't assume sufficient alignment in some cases (e.g.
+    // 16 byte decimals).
+    memcpy(out_val, &dict[idx], sizeof(OutType));
+  }
+}
+
 template <typename OutType, int BIT_WIDTH>
 const uint8_t* BitPacking::Unpack32Values(
     const uint8_t* __restrict__ in, int64_t in_bytes, OutType* __restrict__ out) {
   static_assert(BIT_WIDTH >= 0, "BIT_WIDTH too low");
   static_assert(BIT_WIDTH <= 32, "BIT_WIDTH > 32");
-  static_assert(
-      BIT_WIDTH <= sizeof(OutType) * CHAR_BIT, "BIT_WIDTH too high for output type");
+  DCHECK_LE(BIT_WIDTH, sizeof(OutType) * CHAR_BIT) << "BIT_WIDTH too high for output";
   constexpr int BYTES_TO_READ = BitUtil::RoundUpNumBytes(32 * BIT_WIDTH);
   DCHECK_GE(in_bytes, BYTES_TO_READ);
 
-// Call UnpackValue for 0 <= i < 32.
-#pragma push_macro("UNPACK_VALUES_CALL")
+  // Call UnpackValue() for each 0 <= i < 32, unrolled via BOOST_PP_REPEAT_FROM_TO.
+#pragma push_macro("UNPACK_VALUE_CALL")
 #define UNPACK_VALUE_CALL(ignore1, i, ignore2) \
   out[i] = static_cast<OutType>(UnpackValue<BIT_WIDTH, i>(in));
+
   BOOST_PP_REPEAT_FROM_TO(0, 32, UNPACK_VALUE_CALL, ignore);
-#pragma pop_macro("UNPACK_VALUES_CALL")
   return in + BYTES_TO_READ;
+#pragma pop_macro("UNPACK_VALUE_CALL")
 }
 
 template <typename OutType>
 const uint8_t* BitPacking::Unpack32Values(int bit_width, const uint8_t* __restrict__ in,
     int64_t in_bytes, OutType* __restrict__ out) {
-  switch (bit_width) {
-    // Expand cases from 0 to 32.
 #pragma push_macro("UNPACK_VALUES_CASE")
 #define UNPACK_VALUES_CASE(ignore1, i, ignore2) \
     case i: return Unpack32Values<OutType, i>(in, in_bytes, out);
-    BOOST_PP_REPEAT_FROM_TO(0, 33, UNPACK_VALUES_CASE, ignore);
-#pragma pop_macro("UNPACK_VALUES_CASE")
-    default: DCHECK(false); return in;
-  }
-}
 
-template <typename OutType>
-const uint8_t* BitPacking::UnpackUpTo32Values(int bit_width, const uint8_t* __restrict__ in,
-    int64_t in_bytes, int num_values, OutType* __restrict__ out) {
   switch (bit_width) {
     // Expand cases from 0 to 32.
-#pragma push_macro("UNPACK_VALUES_CASE")
-#define UNPACK_VALUES_CASE(ignore1, i, ignore2) \
-    case i: return UnpackUpTo32Values<OutType, i>(in, in_bytes, num_values, out);
     BOOST_PP_REPEAT_FROM_TO(0, 33, UNPACK_VALUES_CASE, ignore);
-#pragma pop_macro("UNPACK_VALUES_CASE")
     default: DCHECK(false); return in;
   }
+#pragma pop_macro("UNPACK_VALUES_CASE")
+}
+
+template <typename OutType, int BIT_WIDTH>
+const uint8_t* BitPacking::UnpackAndDecode32Values(const uint8_t* __restrict__ in,
+    int64_t in_bytes, OutType* __restrict__ dict, int64_t dict_len,
+    OutType* __restrict__ out, bool* __restrict__ decode_error) {
+  static_assert(BIT_WIDTH >= 0, "BIT_WIDTH too low");
+  static_assert(BIT_WIDTH <= 32, "BIT_WIDTH > 32");
+  DCHECK_LE(BIT_WIDTH, sizeof(OutType) * CHAR_BIT) << "BIT_WIDTH too high for output";
+  constexpr int BYTES_TO_READ = BitUtil::RoundUpNumBytes(32 * BIT_WIDTH);
+  DCHECK_GE(in_bytes, BYTES_TO_READ);
+  // TODO: this could be optimised further by using SIMD instructions.
+  // https://lemire.me/blog/2016/08/25/faster-dictionary-decoding-with-simd-instructions/
+
+  // Unpack and decode value i for each 0 <= i < 32; errors only set '*decode_error'.
+#pragma push_macro("DECODE_VALUE_CALL")
+#define DECODE_VALUE_CALL(ignore1, i, ignore2)               \
+  {                                                          \
+    uint32_t idx = UnpackValue<BIT_WIDTH, i>(in);            \
+    DecodeValue(dict, dict_len, idx, &out[i], decode_error); \
+  }
+
+  BOOST_PP_REPEAT_FROM_TO(0, 32, DECODE_VALUE_CALL, ignore);
+  return in + BYTES_TO_READ;
+#pragma pop_macro("DECODE_VALUE_CALL")
+}
 
 template <typename OutType, int BIT_WIDTH>
-const uint8_t* BitPacking::UnpackUpTo32Values(const uint8_t* __restrict__ in,
+const uint8_t* BitPacking::UnpackUpTo31Values(const uint8_t* __restrict__ in,
     int64_t in_bytes, int num_values, OutType* __restrict__ out) {
   static_assert(BIT_WIDTH >= 0, "BIT_WIDTH too low");
   static_assert(BIT_WIDTH <= 32, "BIT_WIDTH > 32");
-  static_assert(
-      BIT_WIDTH <= sizeof(OutType) * CHAR_BIT, "BIT_WIDTH too high for output type");
+  DCHECK_LE(BIT_WIDTH, sizeof(OutType) * CHAR_BIT) << "BIT_WIDTH too high for output";
   constexpr int MAX_BATCH_SIZE = 31;
   const int BYTES_TO_READ = BitUtil::RoundUpNumBytes(num_values * BIT_WIDTH);
   DCHECK_GE(in_bytes, BYTES_TO_READ);
@@ -183,19 +283,65 @@ const uint8_t* BitPacking::UnpackUpTo32Values(const uint8_t* __restrict__ in,
     in_buffer = tmp_buffer;
   }
 
-  // Use switch with fall-through cases to minimise branching.
-  switch (num_values) {
-// Expand cases from 31 down to 1.
 #pragma push_macro("UNPACK_VALUES_CASE")
 #define UNPACK_VALUES_CASE(ignore1, i, ignore2) \
   case 31 - i: out[30 - i] = \
       static_cast<OutType>(UnpackValue<BIT_WIDTH, 30 - i>(in_buffer));
+
+  // Use switch with fall-through cases to minimise branching.
+  switch (num_values) {
+  // Expand cases from 31 down to 1; case N unpacks values N-1 down to 0.
     BOOST_PP_REPEAT_FROM_TO(0, 31, UNPACK_VALUES_CASE, ignore);
-#pragma pop_macro("UNPACK_VALUES_CASE")
     case 0: break;
     default: DCHECK(false);
   }
   return in + BYTES_TO_READ;
+#pragma pop_macro("UNPACK_VALUES_CASE")
+}
+
+template <typename OutType, int BIT_WIDTH>
+const uint8_t* BitPacking::UnpackAndDecodeUpTo31Values(const uint8_t* __restrict__ in,
+      int64_t in_bytes, OutType* __restrict__ dict, int64_t dict_len, int num_values,
+      OutType* __restrict__ out, bool* __restrict__ decode_error) {
+  static_assert(BIT_WIDTH >= 0, "BIT_WIDTH too low");
+  static_assert(BIT_WIDTH <= 32, "BIT_WIDTH > 32");
+  DCHECK_LE(BIT_WIDTH, sizeof(OutType) * CHAR_BIT) << "BIT_WIDTH too high for output";
+  constexpr int MAX_BATCH_SIZE = 31;
+  const int BYTES_TO_READ = BitUtil::RoundUpNumBytes(num_values * BIT_WIDTH);
+  DCHECK_GE(in_bytes, BYTES_TO_READ);
+  DCHECK_LE(num_values, MAX_BATCH_SIZE);
+
+  // Room for 32 values (>= BYTES_TO_READ); at least 1 byte when BIT_WIDTH is 0.
+  constexpr int TMP_BUFFER_SIZE = BIT_WIDTH ?
+    (BIT_WIDTH * (MAX_BATCH_SIZE + 1)) / CHAR_BIT : 1;
+  uint8_t tmp_buffer[TMP_BUFFER_SIZE];
+
+  const uint8_t* in_buffer = in;
+  // Copy into padded temporary buffer to avoid reading past the end of 'in' if the
+  // last 32-bit load would go past the end of the buffer.
+  if (BitUtil::RoundUp(BYTES_TO_READ, sizeof(uint32_t)) > in_bytes) {
+    memcpy(tmp_buffer, in, BYTES_TO_READ);
+    in_buffer = tmp_buffer;
+  }
+
+#pragma push_macro("DECODE_VALUES_CASE")
+#define DECODE_VALUES_CASE(ignore1, i, ignore2)                   \
+  case 31 - i: {                                                  \
+    uint32_t idx = UnpackValue<BIT_WIDTH, 30 - i>(in_buffer);     \
+    DecodeValue(dict, dict_len, idx, &out[30 - i], decode_error); \
+  }
+
+  // Use switch with fall-through cases to minimise branching.
+  switch (num_values) {
+    // Expand cases from 31 down to 1; case N decodes values N-1 down to 0.
+    BOOST_PP_REPEAT_FROM_TO(0, 31, DECODE_VALUES_CASE, ignore);
+    case 0:
+      break;
+    default:
+      DCHECK(false);
+  }
+  return in + BYTES_TO_READ;
+#pragma pop_macro("DECODE_VALUES_CASE")
+}
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ae116b5b/be/src/util/bit-stream-utils.h
----------------------------------------------------------------------
diff --git a/be/src/util/bit-stream-utils.h b/be/src/util/bit-stream-utils.h
index 5acdeee..bac127a 100644
--- a/be/src/util/bit-stream-utils.h
+++ b/be/src/util/bit-stream-utils.h
@@ -92,53 +92,67 @@ class BitWriter {
   int bit_offset_;        // Offset in buffered_values_
 };
 
-/// Utility class to read bit/byte stream.  This class can read bits or bytes
-/// that are either byte aligned or not.  It also has utilities to read multiple
-/// bytes in one read (e.g. encoded int).
-class BitReader {
+/// Utility class to read bit/byte stream. This class can read bits or bytes that are
+/// either byte aligned or not. It also has utilities to read multiple bytes in one
+/// read (e.g. encoded int). Exposes a batch-oriented interface to allow efficient
+/// processing of multiple values at a time.
+class BatchedBitReader {
  public:
   /// 'buffer' is the buffer to read from.  The buffer's length is 'buffer_len'.
   /// Does not take ownership of the buffer.
-  BitReader(const uint8_t* buffer, int buffer_len) { Reset(buffer, buffer_len); }
+  BatchedBitReader(const uint8_t* buffer, int64_t buffer_len) {
+    Reset(buffer, buffer_len);
+  }
 
-  BitReader() : buffer_(NULL), max_bytes_(0) {}
+  BatchedBitReader() {}
 
-  // The implicit copy constructor is left defined. If a BitReader is copied, the
+  // The implicit copy constructor is left defined. If a BatchedBitReader is copied, the
   // two copies do not share any state. Invoking functions on either copy continues
   // reading from the current read position without modifying the state of the other
   // copy.
 
   /// Resets the read to start reading from the start of 'buffer'. The buffer's
   /// length is 'buffer_len'. Does not take ownership of the buffer.
-  void Reset(const uint8_t* buffer, int buffer_len) {
-    buffer_ = buffer;
-    max_bytes_ = buffer_len;
-    byte_offset_ = 0;
-    bit_offset_ = 0;
-    int num_bytes = std::min(8, max_bytes_);
-    memcpy(&buffered_values_, buffer_, num_bytes);
+  void Reset(const uint8_t* buffer, int64_t buffer_len) {
+    buffer_pos_ = buffer;
+    buffer_end_ = buffer + buffer_len;
   }
 
-  /// Gets the next value from the buffer.  Returns true if 'v' could be read or false if
-  /// there are not enough bytes left. num_bits must be <= 32.
+  /// Gets up to 'num_values' bit-packed values, starting from the current byte in the
+  /// buffer, and advances the read position. 'bit_width' must be <= 32.
+  /// If 'bit_width' * 'num_values' is not a multiple of 8, the trailing bits of the
+  /// last byte are skipped and the next UnpackBatch() call starts at the next byte.
+  ///
+  /// If the caller does not want to drop trailing bits, 'num_values' must be exactly the
+  /// total number of values the caller wants to read from a run of bit-packed values, or
+  /// 'bit_width' * 'num_values' must be a multiple of 8. This condition is always
+  /// satisfied if 'num_values' is a multiple of 32.
+  ///
+  /// Returns the number of values read.
   template<typename T>
-  bool GetValue(int num_bits, T* v);
+  int UnpackBatch(int bit_width, int num_values, T* v);
 
-  /// Reads a 'num_bytes'-sized value from the buffer and stores it in 'v'. T needs to be a
-  /// little-endian native type and big enough to store 'num_bytes'. The value is assumed
-  /// to be byte-aligned so the stream will be advanced to the start of the next byte
-  /// before 'v' is read. Returns false if there are not enough bytes left.
+  /// Unpack bit-packed values in the same way as UnpackBatch() and decode them using the
+  /// dictionary 'dict' with 'dict_len' entries. Returns -1 if a decoding error is
+  /// encountered, i.e. if the bit-packed values are not valid indices in 'dict'.
+  /// Otherwise returns the number of values decoded. No bytes are consumed on error.
   template<typename T>
-  bool GetAligned(int num_bytes, T* v);
+  int UnpackAndDecodeBatch(
+      int bit_width, T* dict, int64_t dict_len, int num_values, T* v);
+
+  /// Reads an unpacked 'num_bytes'-sized value from the buffer and stores it in 'v'. T
+  /// needs to be a little-endian native type and big enough to store 'num_bytes'.
+  /// Returns false if there are not enough bytes left.
+  template<typename T>
+  bool GetBytes(int num_bytes, T* v);
 
   /// Reads a vlq encoded int from the stream.  The encoded int must start at the
   /// beginning of a byte. Return false if there were not enough bytes in the buffer or
   /// the int is invalid.
   bool GetVlqInt(int32_t* v);
 
-  /// Returns the number of bytes left in the stream, not including the current byte (i.e.,
-  /// there may be an additional fraction of a byte).
-  int bytes_left() { return max_bytes_ - (byte_offset_ + BitUtil::Ceil(bit_offset_, 8)); }
+  /// Returns the number of bytes left in the stream.
+  int bytes_left() { return buffer_end_ - buffer_pos_; }
 
   /// Maximum byte length of a vlq encoded int
   static const int MAX_VLQ_BYTE_LEN = 5;
@@ -147,17 +161,12 @@ class BitReader {
   static const int MAX_BITWIDTH = 32;
 
  private:
-  const uint8_t* buffer_;
-  int max_bytes_;
+  /// Current read position in the buffer.
+  const uint8_t* buffer_pos_ = nullptr;
 
-  /// Bytes are memcpy'd from buffer_ and values are read from this variable. This is
-  /// faster than reading values byte by byte directly from buffer_.
-  uint64_t buffered_values_;
-
-  int byte_offset_;       // Offset in buffer_
-  int bit_offset_;        // Offset in buffered_values_
+  /// Pointer to the byte after the end of the buffer.
+  const uint8_t* buffer_end_ = nullptr;
 };
-
 }
 
 #endif

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ae116b5b/be/src/util/bit-stream-utils.inline.h
----------------------------------------------------------------------
diff --git a/be/src/util/bit-stream-utils.inline.h b/be/src/util/bit-stream-utils.inline.h
index c8744aa..3974492 100644
--- a/be/src/util/bit-stream-utils.inline.h
+++ b/be/src/util/bit-stream-utils.inline.h
@@ -18,9 +18,11 @@
 #ifndef IMPALA_UTIL_BIT_STREAM_UTILS_INLINE_H
 #define IMPALA_UTIL_BIT_STREAM_UTILS_INLINE_H
 
-#include "common/compiler-util.h"
 #include "util/bit-stream-utils.h"
 
+#include "common/compiler-util.h"
+#include "util/bit-packing.inline.h"
+
 namespace impala {
 
 inline bool BitWriter::PutValue(uint64_t v, int num_bits) {
@@ -84,83 +86,67 @@ inline bool BitWriter::PutVlqInt(int32_t v) {
   return result;
 }
 
-/// Force inlining - this is used in perf-critical loops in Parquet and GCC often doesn't
-/// inline it in cases where it's beneficial.
-template <typename T>
-ALWAYS_INLINE inline bool BitReader::GetValue(int num_bits, T* v) {
-  DCHECK(num_bits == 0 || buffer_ != NULL);
-  // TODO: revisit this limit if necessary
-  DCHECK_LE(num_bits, MAX_BITWIDTH);
-  DCHECK_LE(num_bits, sizeof(T) * 8);
-
-  // First do a cheap check to see if we may read past the end of the stream, using
-  // constant upper bounds for 'bit_offset_' and 'num_bits'.
-  if (UNLIKELY(byte_offset_ + sizeof(buffered_values_) + MAX_BITWIDTH / 8 > max_bytes_)) {
-    // Now do the precise check.
-    if (UNLIKELY(byte_offset_ * 8 + bit_offset_ + num_bits > max_bytes_ * 8)) {
-      return false;
-    }
-  }
-
-  DCHECK_GE(bit_offset_, 0);
-  DCHECK_LE(bit_offset_, 64);
-  *v = BitUtil::TrailingBits(buffered_values_, bit_offset_ + num_bits) >> bit_offset_;
-
-  bit_offset_ += num_bits;
-  if (bit_offset_ >= 64) {
-    byte_offset_ += 8;
-    bit_offset_ -= 64;
-
-    int bytes_remaining = max_bytes_ - byte_offset_;
-    if (LIKELY(bytes_remaining >= 8)) {
-      memcpy(&buffered_values_, buffer_ + byte_offset_, 8);
-    } else {
-      memcpy(&buffered_values_, buffer_ + byte_offset_, bytes_remaining);
-    }
+template<typename T>
+inline int BatchedBitReader::UnpackBatch(int bit_width, int num_values, T* v) {
+  DCHECK(buffer_pos_ != nullptr);
+  DCHECK_GE(bit_width, 0);
+  DCHECK_LE(bit_width, MAX_BITWIDTH);
+  DCHECK_LE(bit_width, sizeof(T) * 8);
+  DCHECK_GE(num_values, 0);
+
+  int64_t num_read; // May be < num_values if the input runs out.
+  std::tie(buffer_pos_, num_read) = BitPacking::UnpackValues(bit_width, buffer_pos_,
+      bytes_left(), num_values, v);
+  DCHECK_LE(buffer_pos_, buffer_end_);
+  DCHECK_LE(num_read, num_values);
+  return static_cast<int>(num_read);
+}
 
-    // Read bits of v that crossed into new buffered_values_
-    *v |= BitUtil::TrailingBits(buffered_values_, bit_offset_)
-          << (num_bits - bit_offset_);
-  }
-  DCHECK_LE(bit_offset_, 64);
-  return true;
+template<typename T>
+inline int BatchedBitReader::UnpackAndDecodeBatch(
+      int bit_width, T* dict, int64_t dict_len, int num_values, T* v){
+  DCHECK(buffer_pos_ != nullptr);
+  DCHECK_GE(bit_width, 0);
+  DCHECK_LE(bit_width, MAX_BITWIDTH);
+  DCHECK_LE(bit_width, sizeof(T) * 8);
+  DCHECK_GE(num_values, 0);
+
+  const uint8_t* new_buffer_pos; // Only committed to buffer_pos_ on success.
+  int64_t num_read;
+  bool decode_error = false;
+  std::tie(new_buffer_pos, num_read) = BitPacking::UnpackAndDecodeValues(bit_width,
+      buffer_pos_, bytes_left(), dict, dict_len, num_values, v, &decode_error);
+  if (UNLIKELY(decode_error)) return -1; // Read position is not advanced.
+  buffer_pos_ = new_buffer_pos;
+  DCHECK_LE(buffer_pos_, buffer_end_);
+  DCHECK_LE(num_read, num_values);
+  return static_cast<int>(num_read);
 }
 
 template<typename T>
-inline bool BitReader::GetAligned(int num_bytes, T* v) {
+inline bool BatchedBitReader::GetBytes(int num_bytes, T* v) {
+  DCHECK(buffer_pos_ != nullptr);
+  DCHECK_GE(num_bytes, 0);
   DCHECK_LE(num_bytes, sizeof(T));
-  int bytes_read = BitUtil::Ceil(bit_offset_, 8);
-  if (UNLIKELY(byte_offset_ + bytes_read + num_bytes > max_bytes_)) return false;
-
-  // Advance byte_offset to next unread byte and read num_bytes
-  byte_offset_ += bytes_read;
-  memcpy(v, buffer_ + byte_offset_, num_bytes);
-  byte_offset_ += num_bytes;
-
-  // Reset buffered_values_
-  bit_offset_ = 0;
-  int bytes_remaining = max_bytes_ - byte_offset_;
-  if (LIKELY(bytes_remaining >= 8)) {
-    memcpy(&buffered_values_, buffer_ + byte_offset_, 8);
-  } else {
-    memcpy(&buffered_values_, buffer_ + byte_offset_, bytes_remaining);
-  }
+  if (UNLIKELY(buffer_pos_ + num_bytes > buffer_end_)) return false;
+  *v = 0; // Zero-fill so bytes of '*v' beyond 'num_bytes' are well-defined.
+  memcpy(v, buffer_pos_, num_bytes);
+  buffer_pos_ += num_bytes;
   return true;
 }
 
-inline bool BitReader::GetVlqInt(int32_t* v) {
+inline bool BatchedBitReader::GetVlqInt(int32_t* v) {
   *v = 0;
   int shift = 0;
   uint8_t byte = 0;
   do {
     if (UNLIKELY(shift >= MAX_VLQ_BYTE_LEN * 7)) return false;
-    if (!GetAligned<uint8_t>(1, &byte)) return false;
+    if (!GetBytes(1, &byte)) return false; // Input ended mid-varint.
     *v |= (byte & 0x7F) << shift;
     shift += 7;
   } while ((byte & 0x80) != 0);
   return true;
 }
-
 }
 
 #endif

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ae116b5b/be/src/util/dict-encoding.h
----------------------------------------------------------------------
diff --git a/be/src/util/dict-encoding.h b/be/src/util/dict-encoding.h
index 62b3d3a..fa5e798 100644
--- a/be/src/util/dict-encoding.h
+++ b/be/src/util/dict-encoding.h
@@ -174,13 +174,16 @@ class DictDecoderBase {
     DCHECK_GE(buffer_len, 0);
     if (UNLIKELY(buffer_len == 0)) return Status("Dictionary cannot be 0 bytes");
     uint8_t bit_width = *buffer;
-    if (UNLIKELY(bit_width < 0 || bit_width > BitReader::MAX_BITWIDTH)) {
+    if (UNLIKELY(bit_width < 0 || bit_width > BatchedBitReader::MAX_BITWIDTH)) {
       return Status(strings::Substitute("Dictionary has invalid or unsupported bit "
           "width: $0", bit_width));
     }
     ++buffer;
     --buffer_len;
     data_decoder_.Reset(buffer, buffer_len, bit_width);
+    num_repeats_ = 0;
+    num_literal_values_ = 0;
+    next_literal_idx_ = 0;
     return Status::OK();
   }
 
@@ -193,7 +196,22 @@ class DictDecoderBase {
   virtual void GetValue(int index, void* buffer) = 0;
 
  protected:
-  RleDecoder data_decoder_;
+  /// Number of decoded values to buffer at a time. A multiple of 32 is chosen to allow
+  /// efficient reading in batches from data_decoder_. Increasing the batch size up to
+  /// 128 seems to improve performance, but increasing further did not make a noticeable
+  /// difference.
+  static const int DECODED_BUFFER_SIZE = 128;
+
+  RleBatchDecoder<uint32_t> data_decoder_;
+
+  /// Number of values still to be returned from the current repeated run, if any.
+  int64_t num_repeats_ = 0;
+
+  /// Number of literal values currently buffered from the current literal run.
+  int num_literal_values_ = 0;
+
+  /// Index of the next buffered literal value to return.
+  int next_literal_idx_ = 0;
 };
 
 template<typename T>
@@ -211,7 +229,7 @@ class DictDecoder : public DictDecoderBase {
   /// Returns true if the dictionary values were all successfully decoded, or false
   /// if the dictionary was corrupt.
   template<parquet::Type::type PARQUET_TYPE>
-  bool Reset(uint8_t* dict_buffer, int dict_len, int fixed_len_size);
+  bool Reset(uint8_t* dict_buffer, int dict_len, int fixed_len_size) WARN_UNUSED_RESULT;
 
   virtual int num_entries() const { return dict_.size(); }
 
@@ -219,17 +237,26 @@ class DictDecoder : public DictDecoderBase {
     T* val_ptr = reinterpret_cast<T*>(buffer);
     DCHECK_GE(index, 0);
     DCHECK_LT(index, dict_.size());
-    // TODO: is there any circumstance where this should be a memcpy?
     *val_ptr = dict_[index];
   }
 
   /// Returns the next value.  Returns false if the data is invalid.
   /// For StringValues, this does not make a copy of the data.  Instead,
   /// the string data is from the dictionary buffer passed into the c'tor.
-  bool GetNextValue(T* value);
+  bool GetNextValue(T* value) WARN_UNUSED_RESULT;
 
  private:
   std::vector<T> dict_;
+
+  /// Decoded values, buffered to allow caller to consume one-by-one. If in the middle of
+  /// a repeated run, the first element is the current dict value. If in a literal run,
+  /// this contains 'num_literal_values_' values, with the next value to be returned at
+  /// 'next_literal_idx_'.
+  T decoded_values_[DECODED_BUFFER_SIZE];
+
+  /// Slow path for GetNextValue(), used once all buffered values are consumed. It is
+  /// deliberately not ALWAYS_INLINE, to keep the inlined GetNextValue() small.
+  bool DecodeNextValue(T* value);
 };
 
 template<typename T>
@@ -290,30 +317,47 @@ inline int DictEncoder<StringValue>::AddToTable(const StringValue& value,
 // Force inlining - GCC does not always inline this into hot loops in Parquet scanner.
 template <typename T>
 ALWAYS_INLINE inline bool DictDecoder<T>::GetNextValue(T* value) {
-  int index = -1; // Initialize to avoid compiler warning.
-  bool result = data_decoder_.Get(&index);
-  // Use & to avoid branches.
-  if (LIKELY(result & (index >= 0) & (index < dict_.size()))) {
-    *value = dict_[index];
+  // IMPALA-959: Use memcpy() instead of '=' to set *value: addresses are not always 16
+  // byte aligned for Decimal16Values.
+  if (num_repeats_ > 0) {
+    --num_repeats_;
+    memcpy(value, &decoded_values_[0], sizeof(T));
+    return true;
+  } else if (next_literal_idx_ < num_literal_values_) {
+    int idx = next_literal_idx_++;
+    memcpy(value, &decoded_values_[idx], sizeof(T));
     return true;
   }
-  return false;
+  // Buffer is exhausted: decode the next run via the out-of-line slow path.
+  return DecodeNextValue(value);
 }
 
-// Force inlining - GCC does not always inline this into hot loops in Parquet scanner.
-template <>
-ALWAYS_INLINE inline bool DictDecoder<Decimal16Value>::GetNextValue(
-    Decimal16Value* value) {
-  int index;
-  bool result = data_decoder_.Get(&index);
-  if (!result) return false;
-  if (index >= dict_.size()) return false;
-  // Workaround for IMPALA-959. Use memcpy instead of '=' so addresses
-  // do not need to be 16 byte aligned.
-  uint8_t* addr = reinterpret_cast<uint8_t*>(dict_.data());
-  addr = addr + index * sizeof(*value);
-  memcpy(value, addr, sizeof(*value));
-  return true;
+template <typename T>
+bool DictDecoder<T>::DecodeNextValue(T* value) {
+  // Decode the next RLE run (repeated or literal) into 'decoded_values_' and return
+  // its first value. memcpy() is used for IMPALA-959 (unaligned Decimal16Values).
+  uint32_t num_repeats = data_decoder_.NextNumRepeats();
+  if (num_repeats > 0) {
+    uint32_t idx = data_decoder_.GetRepeatedValue(num_repeats);
+    if (UNLIKELY(idx >= dict_.size())) return false;
+    memcpy(&decoded_values_[0], &dict_[idx], sizeof(T));
+    memcpy(value, &decoded_values_[0], sizeof(T));
+    num_repeats_ = num_repeats - 1;
+    return true;
+  } else {
+    uint32_t num_literals = data_decoder_.NextNumLiterals();
+    if (UNLIKELY(num_literals == 0)) return false; // No repeated or literal run.
+
+    uint32_t num_to_decode = std::min<uint32_t>(num_literals, DECODED_BUFFER_SIZE);
+    if (UNLIKELY(!data_decoder_.DecodeLiteralValues(
+            num_to_decode, dict_.data(), dict_.size(), &decoded_values_[0]))) {
+      return false;
+    }
+    num_literal_values_ = num_to_decode;
+    memcpy(value, &decoded_values_[0], sizeof(T));
+    next_literal_idx_ = 1; // Value 0 is returned now.
+    return true;
+  }
 }
 
 template<typename T>

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ae116b5b/be/src/util/dict-test.cc
----------------------------------------------------------------------
diff --git a/be/src/util/dict-test.cc b/be/src/util/dict-test.cc
index de0fb11..11043f0 100644
--- a/be/src/util/dict-test.cc
+++ b/be/src/util/dict-test.cc
@@ -17,7 +17,9 @@
 
 #include <stdlib.h>
 #include <stdio.h>
+
 #include <iostream>
+#include <utility>
 
 #include "runtime/mem-tracker.h"
 #include "runtime/string-value.inline.h"
@@ -65,7 +67,7 @@ void ValidateDict(const vector<InternalType>& values,
   ASSERT_OK(decoder.SetData(data_buffer, data_len));
   for (InternalType i: values) {
     InternalType j;
-    decoder.GetNextValue(&j);
+    ASSERT_TRUE(decoder.GetNextValue(&j));
     EXPECT_EQ(i, j);
   }
   pool.FreeAll();
@@ -196,6 +198,98 @@ TEST(DictTest, TestStringBufferOverrun) {
       0));
 }
 
+// Make sure that SetData() resets the dictionary decoder, including the embedded RLE
+// decoder, to a clean state even if the previous input was not fully consumed. Both
+// decoders keep state between GetNextValue() calls that SetData() must discard.
+TEST(DictTest, SetDataAfterPartialRead) {
+  MemTracker tracker;
+  MemPool pool(&tracker);
+  DictEncoder<int> encoder(&pool, sizeof(int));
+
+  // Literal run followed by a repeated run.
+  vector<int> values{1, 2, 3, 4, 5, 6, 7, 8, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9};
+  for (int val: values) encoder.Put(val);
+
+  vector<uint8_t> dict_buffer(encoder.dict_encoded_size());
+  encoder.WriteDict(dict_buffer.data());
+  vector<uint8_t> data_buffer(encoder.EstimatedDataEncodedSize() * 2);
+  int data_len = encoder.WriteData(data_buffer.data(), data_buffer.size());
+  ASSERT_GT(data_len, 0);
+  encoder.ClearIndices();
+
+  DictDecoder<int> decoder;
+  ASSERT_TRUE(decoder.template Reset<parquet::Type::INT32>(
+      dict_buffer.data(), dict_buffer.size(), sizeof(int)));
+
+  // Decode the first 'num_to_decode' values (up to all of them) after each SetData().
+  // If the decoder incorrectly caches state across calls, results will be wrong.
+  for (size_t num_to_decode = 0; num_to_decode <= values.size(); ++num_to_decode) {
+    ASSERT_OK(decoder.SetData(data_buffer.data(), data_len));
+    for (size_t i = 0; i < num_to_decode; ++i) {
+      int val;
+      ASSERT_TRUE(decoder.GetNextValue(&val));
+      EXPECT_EQ(values[i], val) << num_to_decode << " " << i;
+    }
+  }
+}
+
+// Test handling of decode errors from out-of-range values.
+TEST(DictTest, DecodeErrors) {
+  MemTracker tracker;
+  MemPool pool(&tracker);
+  DictEncoder<int> small_dict_encoder(&pool, sizeof(int));
+
+  // Generate a dictionary with 9 values (requires 4 bits to encode).
+  vector<int> small_dict_values{1, 2, 3, 4, 5, 6, 7, 8, 9};
+  for (int val: small_dict_values) small_dict_encoder.Put(val);
+
+  vector<uint8_t> small_dict_buffer(small_dict_encoder.dict_encoded_size());
+  small_dict_encoder.WriteDict(small_dict_buffer.data());
+  small_dict_encoder.ClearIndices();
+
+  DictDecoder<int> small_dict_decoder;
+  ASSERT_TRUE(small_dict_decoder.template Reset<parquet::Type::INT32>(
+      small_dict_buffer.data(), small_dict_buffer.size(), sizeof(int)));
+
+  // Encode data with larger dictionaries (10-15 values, still 4-bit indices) to check
+  // that the decoder detects 4-bit indices that are out of range for the small one.
+  using TestCase = pair<string, vector<int>>;
+  vector<TestCase> test_cases{
+    {"Out-of-range value in a repeated run",
+        {10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10}},
+    {"Out-of-range literal run in the last < 32 element batch",
+      {1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12}},
+    {"Out-of-range literal run in the middle of a 32 element batch",
+      {1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10,
+       11, 12, 13, 14, 15, 1, 2, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1,
+       1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1}}};
+  for (const TestCase& test_case: test_cases) {
+    // Encode the values. This will produce a dictionary with more distinct values than
+    // the small dictionary that we'll use to decode it.
+    DictEncoder<int> large_dict_encoder(&pool, sizeof(int));
+    // Initialize the dictionary with the values already in the small dictionary.
+    for (int val : small_dict_values) large_dict_encoder.Put(val);
+    large_dict_encoder.ClearIndices();
+
+    for (int val: test_case.second) large_dict_encoder.Put(val);
+
+    vector<uint8_t> data_buffer(large_dict_encoder.EstimatedDataEncodedSize() * 2);
+    int data_len = large_dict_encoder.WriteData(data_buffer.data(), data_buffer.size());
+    ASSERT_GT(data_len, 0);
+    large_dict_encoder.ClearIndices();
+
+    ASSERT_OK(small_dict_decoder.SetData(data_buffer.data(), data_len));
+    bool failed = false;
+    for (size_t i = 0; i < test_case.second.size(); ++i) {
+      int val;
+      failed = !small_dict_decoder.GetNextValue(&val);
+      if (failed) break;
+    }
+    EXPECT_TRUE(failed) << "Should have detected out-of-range dict-encoded value in test "
+        << test_case.first;
+  }
+}
+
 }
 
 IMPALA_TEST_MAIN();