You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by ta...@apache.org on 2018/11/17 02:08:45 UTC

[7/7] impala git commit: IMPALA-4123: Columnar decoding in Parquet

IMPALA-4123: Columnar decoding in Parquet

The idea is to optimise the common case where there are long runs of
NULL or non-NULL values (i.e. the def level is repeated). We can
detect this cheaply by keying the decoding loop in the column reader
off the state of the def level RLE decoder - if there's a long run
of repeated levels, we can skip checking the def level for every
value. Whenever the next def level is not in a repeated run, we still
fall back to decoding and caching a batch of def levels and reading them
value-by-value. We still use the old approach for decoding rep
levels. There might be some benefit to using the same approach for rep
levels *if* repeated def and rep level runs line up.

These changes should unlock further optimizations because more time is
spent in simple kernel functions, e.g. UnpackAndDecode32Values() for
dictionary decompression, which is very optimisable using SIMD etc.

Snappy decompression now seems to be the main CPU bottleneck for
decoding snappy-compressed Parquet.

Perf:
Running TPC-H scale factor 60 on uncompressed and snappy parquet
both showed a ~4% speedup overall.

Microbenchmarks on uncompressed Parquet show that scans doing only
dictionary decoding are ~75% faster:

   set mt_dop=1;
   select min(l_returnflag) from lineitem;

Testing:
We already have alltypesagg, which has a mix of NULL and non-NULL values.

Many tables have long runs of non-null values.

Added new test data and coverage:
* a test table manynulls with long runs of null values.
* a large CHAR test table
* missing coverage for materialising pos slot in flattened nested types
  scan.
* Extended dict test to test longer runs.
* A larger version of complextypestbl with interesting collection
  shapes - NULL collections, empty collections, etc, particularly runs
  of collections with the same shape.
* Test interaction of timestamp validation with conversion
* Ran code coverage build to confirm all code paths are tested
* ASAN and exhaustive runs.

Change-Id: I8c03006981c46ef0dae30602f2b73c253d9b49ef
Reviewed-on: http://gerrit.cloudera.org:8080/8319
Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
Tested-by: Impala Public Jenkins <im...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/153663c2
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/153663c2
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/153663c2

Branch: refs/heads/master
Commit: 153663c22f13f175975de9b1f20d0e2a8affbd94
Parents: a47b3b8
Author: Tim Armstrong <ta...@cloudera.com>
Authored: Wed Oct 11 23:58:19 2017 -0700
Committer: Impala Public Jenkins <im...@cloudera.com>
Committed: Sat Nov 17 01:48:05 2018 +0000

----------------------------------------------------------------------
 be/src/exec/hdfs-parquet-scanner.cc             |   1 +
 be/src/exec/parquet-column-readers.cc           | 306 ++++++++++++++++---
 be/src/exec/parquet-column-readers.h            |  26 +-
 be/src/exec/parquet-common.h                    |  31 +-
 be/src/runtime/tuple.cc                         |  11 +
 be/src/runtime/tuple.h                          |  46 ++-
 be/src/util/bit-packing.h                       |  16 +-
 be/src/util/bit-packing.inline.h                |  26 +-
 be/src/util/bit-stream-utils.h                  |   7 +-
 be/src/util/bit-stream-utils.inline.h           |   6 +-
 be/src/util/dict-encoding.h                     |  87 +++++-
 be/src/util/dict-test.cc                        |   6 +-
 be/src/util/mem-util.h                          |  68 +++++
 be/src/util/rle-encoding.h                      |  64 ++--
 testdata/bin/generate-schema-statements.py      |   7 +-
 testdata/data/README                            |  22 ++
 .../out_of_range_timestamp2_hive_211.parquet    | Bin 0 -> 67105 bytes
 .../out_of_range_timestamp_hive_211.parquet     | Bin 0 -> 169096 bytes
 .../functional/functional_schema_template.sql   |  72 +++++
 .../datasets/functional/schema_constraints.csv  |  10 +-
 .../queries/QueryTest/chars.test                |  40 +++
 .../nested-types-scanner-position.test          |  20 ++
 .../queries/QueryTest/nested-types-tpch.test    |  18 ++
 ...-of-range-timestamp-local-tz-conversion.test |  47 +++
 .../queries/QueryTest/scanners-many-nulls.test  |  47 +++
 .../test_hive_parquet_timestamp_conversion.py   |  21 +-
 tests/query_test/test_scanners.py               |  11 +
 27 files changed, 883 insertions(+), 133 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/153663c2/be/src/exec/hdfs-parquet-scanner.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-parquet-scanner.cc b/be/src/exec/hdfs-parquet-scanner.cc
index 7635f31..8a8b700 100644
--- a/be/src/exec/hdfs-parquet-scanner.cc
+++ b/be/src/exec/hdfs-parquet-scanner.cc
@@ -347,6 +347,7 @@ Status HdfsParquetScanner::ProcessSplit() {
 }
 
 Status HdfsParquetScanner::GetNextInternal(RowBatch* row_batch) {
+  DCHECK(parse_status_.ok()) << parse_status_.GetDetail();
   if (scan_node_->optimize_parquet_count_star()) {
     // Populate the single slot with the Parquet num rows statistic.
     int64_t tuple_buf_size;

http://git-wip-us.apache.org/repos/asf/impala/blob/153663c2/be/src/exec/parquet-column-readers.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/parquet-column-readers.cc b/be/src/exec/parquet-column-readers.cc
index 5bf6543..aec1265 100644
--- a/be/src/exec/parquet-column-readers.cc
+++ b/be/src/exec/parquet-column-readers.cc
@@ -82,17 +82,14 @@ namespace impala {
 const string PARQUET_COL_MEM_LIMIT_EXCEEDED =
     "ParquetColumnReader::$0() failed to allocate $1 bytes for $2.";
 
-Status ParquetLevelDecoder::Init(const string& filename,
-    Encoding::type encoding, MemPool* cache_pool, int cache_size,
-    int max_level, int num_buffered_values, uint8_t** data, int* data_size) {
+Status ParquetLevelDecoder::Init(const string& filename, Encoding::type encoding,
+    MemPool* cache_pool, int cache_size, int max_level, uint8_t** data, int* data_size) {
   DCHECK(*data != nullptr);
   DCHECK_GE(*data_size, 0);
-  DCHECK_GE(num_buffered_values, 0);
   DCHECK_GT(cache_size, 0);
   cache_size = BitUtil::RoundUpToPowerOf2(cache_size, 32);
   encoding_ = encoding;
   max_level_ = max_level;
-  num_buffered_values_ = num_buffered_values;
   filename_ = filename;
   RETURN_IF_ERROR(InitCache(cache_pool, cache_size));
 
@@ -162,14 +159,30 @@ inline int16_t ParquetLevelDecoder::ReadLevel() {
   return CacheGetNext();
 }
 
+int32_t ParquetLevelDecoder::NextRepeatedRunLength() {
+  if (CacheHasNext() || encoding_ != Encoding::RLE) return 0;
+  // Treat always-zero levels as an infinitely long run of zeroes. Return the maximum
+  // run length allowed by the Parquet standard.
+  if (max_level_ == 0) return numeric_limits<int32_t>::max();
+  return rle_decoder_.NextNumRepeats();
+}
+
+uint8_t ParquetLevelDecoder::GetRepeatedValue(uint32_t num_to_consume) {
+  DCHECK(!CacheHasNext());
+  DCHECK_EQ(encoding_, Encoding::RLE);
+  // Treat always-zero levels as an infinitely long run of zeroes.
+  if (max_level_ == 0) return 0;
+  return rle_decoder_.GetRepeatedValue(num_to_consume);
+}
+
 Status ParquetLevelDecoder::CacheNextBatch(int vals_remaining) {
   /// Fill the cache completely if there are enough values remaining.
   /// Otherwise don't try to read more values than are left.
   int batch_size = min(vals_remaining, cache_size_);
   if (max_level_ > 0) {
-    if (UNLIKELY(!FillCache(batch_size, &num_cached_levels_) ||
-          num_cached_levels_ < batch_size)) {
-      return Status(decoding_error_code_, num_buffered_values_, filename_);
+    if (UNLIKELY(!FillCache(batch_size, &num_cached_levels_)
+            || num_cached_levels_ < batch_size)) {
+      return Status(decoding_error_code_, vals_remaining, filename_);
     }
   } else {
     // No levels to read, e.g., because the field is required. The cache was
@@ -276,11 +289,35 @@ class ScalarColumnReader : public BaseScalarColumnReader {
   bool MaterializeValueBatch(int max_values, int tuple_size, uint8_t* RESTRICT tuple_mem,
       int* RESTRICT num_values) RESTRICT;
 
-  virtual Status CreateDictionaryDecoder(uint8_t* values, int size,
-      DictDecoderBase** decoder) override {
+  /// Fast path for MaterializeValueBatch() that materializes values for a run of
+  /// repeated definition levels. Read up to 'max_values' values into 'tuple_mem',
+  /// returning the number of values materialised in 'num_values'.
+  bool MaterializeValueBatchRepeatedDefLevel(int max_values, int tuple_size,
+      uint8_t* RESTRICT tuple_mem, int* RESTRICT num_values) RESTRICT;
+
+  /// Read 'num_to_read' values into a batch of tuples starting at 'tuple_mem'.
+  bool ReadSlots(
+      int64_t num_to_read, int tuple_size, uint8_t* RESTRICT tuple_mem) RESTRICT;
+
+  /// Read 'num_to_read' values into a batch of tuples starting at 'tuple_mem', when
+  /// conversion is needed.
+  bool ReadAndConvertSlots(
+      int64_t num_to_read, int tuple_size, uint8_t* RESTRICT tuple_mem) RESTRICT;
+
+  /// Read 'num_to_read' values into a batch of tuples starting at 'tuple_mem', when
+  /// conversion is not needed.
+  bool ReadSlotsNoConversion(
+      int64_t num_to_read, int tuple_size, uint8_t* RESTRICT tuple_mem) RESTRICT;
+
+  /// Read 'num_to_read' position values into a batch of tuples starting at 'tuple_mem'.
+  void ReadPositions(
+      int64_t num_to_read, int tuple_size, uint8_t* RESTRICT tuple_mem) RESTRICT;
+
+  virtual Status CreateDictionaryDecoder(
+      uint8_t* values, int size, DictDecoderBase** decoder) override {
     if (!dict_decoder_.template Reset<PARQUET_TYPE>(values, size, fixed_len_size_)) {
-        return Status(TErrorCode::PARQUET_CORRUPT_DICTIONARY, filename(),
-            slot_desc_->type().DebugString(), "could not decode dictionary");
+      return Status(TErrorCode::PARQUET_CORRUPT_DICTIONARY, filename(),
+          slot_desc_->type().DebugString(), "could not decode dictionary");
     }
     dict_decoder_init_ = true;
     *decoder = &dict_decoder_;
@@ -315,8 +352,13 @@ class ScalarColumnReader : public BaseScalarColumnReader {
   /// past the end of the buffer. Return false and set 'parse_error_' if there is an
   /// error decoding the value.
   template <Encoding::type ENCODING>
-  inline ALWAYS_INLINE bool DecodeValue(uint8_t** data, const uint8_t* data_end,
-      InternalType* RESTRICT val) RESTRICT;
+  inline ALWAYS_INLINE bool DecodeValue(
+      uint8_t** data, const uint8_t* data_end, InternalType* RESTRICT val) RESTRICT;
+
+  /// Decode multiple values into 'out_vals' with a stride of 'stride' bytes. Return
+  /// false and set 'parse_error_' if there is an error decoding any value.
+  inline ALWAYS_INLINE bool DecodeValues(
+      int64_t stride, int64_t count, InternalType* RESTRICT out_vals) RESTRICT;
 
   /// Most column readers never require conversion, so we can avoid branches by
   /// returning constant false. Column readers for types that require conversion
@@ -374,6 +416,9 @@ class ScalarColumnReader : public BaseScalarColumnReader {
 
   /// Contains extra data needed for Timestamp decoding.
   ParquetTimestampDecoder timestamp_decoder_;
+
+  /// Allocated from parent_->perm_pool_ if NeedsConversion() is true and null otherwise.
+  uint8_t* conversion_buffer_ = nullptr;
 };
 
 template <typename InternalType, parquet::Type::type PARQUET_TYPE, bool MATERIALIZED>
@@ -427,6 +472,17 @@ Status ScalarColumnReader<InternalType, PARQUET_TYPE, MATERIALIZED>::InitDataPag
     }
     RETURN_IF_ERROR(dict_decoder_.SetData(data, size));
   }
+  // Allocate a temporary buffer to hold InternalType values if we need to convert
+  // before writing to the final slot.
+  if (NeedsConversionInline() && conversion_buffer_ == nullptr) {
+    int64_t buffer_size = sizeof(InternalType) * parent_->state_->batch_size();
+    conversion_buffer_ =
+        parent_->perm_pool_->TryAllocateAligned(buffer_size, alignof(InternalType));
+    if (conversion_buffer_ == nullptr) {
+      return parent_->perm_pool_->mem_tracker()->MemLimitExceeded(parent_->state_,
+          "Failed to allocate conversion buffer in Parquet scanner", buffer_size);
+    }
+  }
   // TODO: Perform filter selectivity checks here.
   return Status::OK();
 }
@@ -493,6 +549,7 @@ bool ScalarColumnReader<InternalType, PARQUET_TYPE, MATERIALIZED>::ReadValueBatc
       int vals_to_add = min(num_buffered_values_, max_values - val_count);
       val_count += vals_to_add;
       num_buffered_values_ -= vals_to_add;
+      DCHECK_GE(num_buffered_values_, 0);
       continue;
     }
     // Fill the rep level cache if needed. We are flattening out the fields of the
@@ -504,21 +561,29 @@ bool ScalarColumnReader<InternalType, PARQUET_TYPE, MATERIALIZED>::ReadValueBatc
       if (UNLIKELY(!parent_->parse_status_.ok())) return false;
     }
 
-    // Fill def level cache if needed.
-    if (!def_levels_.CacheHasNext()) {
-      // TODO: add a fast path here if there's a run of repeated values.
-      parent_->parse_status_.MergeStatus(
-          def_levels_.CacheNextBatch(num_buffered_values_));
-      if (UNLIKELY(!parent_->parse_status_.ok())) return false;
-    }
-
-    // Read data page and cached levels to materialize values.
+    const int remaining_val_capacity = max_values - val_count;
     uint8_t* next_tuple = tuple_mem + val_count * tuple_size;
-    int remaining_val_capacity = max_values - val_count;
-    int ret_val_count = 0;
-    continue_execution = MaterializeValueBatch<IN_COLLECTION>(
-        remaining_val_capacity, tuple_size, next_tuple, &ret_val_count);
-    val_count += ret_val_count;
+    if (def_levels_.NextRepeatedRunLength() > 0) {
+      // Fast path to materialize a run of values with the same definition level. This
+      // avoids checking for NULL/not-NULL for every value.
+      int ret_val_count = 0;
+      continue_execution = MaterializeValueBatchRepeatedDefLevel(
+          remaining_val_capacity, tuple_size, next_tuple, &ret_val_count);
+      val_count += ret_val_count;
+    } else {
+      // We don't have a repeated run - cache def levels and process value-by-value.
+      if (!def_levels_.CacheHasNext()) {
+        parent_->parse_status_.MergeStatus(
+            def_levels_.CacheNextBatch(num_buffered_values_));
+        if (UNLIKELY(!parent_->parse_status_.ok())) return false;
+      }
+
+      // Read data page and cached levels to materialize values.
+      int ret_val_count = 0;
+      continue_execution = MaterializeValueBatch<IN_COLLECTION>(
+          remaining_val_capacity, tuple_size, next_tuple, &ret_val_count);
+      val_count += ret_val_count;
+    }
     if (SHOULD_TRIGGER_DEBUG_ACTION(val_count)) {
       continue_execution &= ColReaderDebugAction(&val_count);
     }
@@ -539,6 +604,8 @@ bool ScalarColumnReader<InternalType, PARQUET_TYPE, MATERIALIZED>::MaterializeVa
   const int cache_start_idx = def_levels_.CacheCurrIdx();
   uint8_t* curr_tuple = tuple_mem;
   int val_count = 0;
+  DCHECK_LE(def_levels_.CacheRemaining(), num_buffered_values_);
+  max_values = min(max_values, num_buffered_values_);
   while (def_levels_.CacheHasNext() && val_count < max_values) {
     Tuple* tuple = reinterpret_cast<Tuple*>(curr_tuple);
     int def_level = def_levels_.CacheGetNext();
@@ -568,10 +635,66 @@ bool ScalarColumnReader<InternalType, PARQUET_TYPE, MATERIALIZED>::MaterializeVa
     ++val_count;
   }
   num_buffered_values_ -= (def_levels_.CacheCurrIdx() - cache_start_idx);
+  DCHECK_GE(num_buffered_values_, 0);
   *num_values = val_count;
   return true;
 }
 
+// Note that the structure of this function is very similar to MaterializeValueBatch()
+// above, except it is unrolled to operate on multiple values at a time.
+template <typename InternalType, parquet::Type::type PARQUET_TYPE, bool MATERIALIZED>
+bool ScalarColumnReader<InternalType, PARQUET_TYPE,
+    MATERIALIZED>::MaterializeValueBatchRepeatedDefLevel(int max_values, int tuple_size,
+    uint8_t* RESTRICT tuple_mem, int* RESTRICT num_values) RESTRICT {
+  DCHECK_GT(num_buffered_values_, 0);
+  if (pos_slot_desc_ != nullptr) DCHECK(rep_levels_.CacheHasNext());
+  int32_t def_level_repeats = def_levels_.NextRepeatedRunLength();
+  DCHECK_GT(def_level_repeats, 0);
+  // Peek at the def level. The number of def levels we'll consume depends on several
+  // conditions below.
+  uint8_t def_level = def_levels_.GetRepeatedValue(0);
+  int32_t num_def_levels_to_consume = 0;
+
+  if (def_level < def_level_of_immediate_repeated_ancestor()) {
+    DCHECK_GT(max_rep_level_, 0) << "Only possible if in a collection.";
+    // A containing repeated field is empty or NULL. We don't need to return any values
+    // but need to advance any rep levels.
+    if (pos_slot_desc_ != nullptr) {
+      num_def_levels_to_consume =
+          min<uint32_t>(def_level_repeats, rep_levels_.CacheRemaining());
+      rep_levels_.CacheSkipLevels(num_def_levels_to_consume);
+    } else {
+      num_def_levels_to_consume = def_level_repeats;
+    }
+    *num_values = 0;
+  } else {
+    // Cannot consume more levels than allowed by buffered input values and output space.
+    num_def_levels_to_consume =
+        min(num_buffered_values_, min(max_values, def_level_repeats));
+    if (pos_slot_desc_ != nullptr) {
+      num_def_levels_to_consume =
+          min<uint32_t>(num_def_levels_to_consume, rep_levels_.CacheRemaining());
+      ReadPositions(num_def_levels_to_consume, tuple_size, tuple_mem);
+    }
+    if (MATERIALIZED) {
+      if (def_level >= max_def_level()) {
+        if (!ReadSlots(num_def_levels_to_consume, tuple_size, tuple_mem)) {
+          return false;
+        }
+      } else {
+        Tuple::SetNullIndicators(
+            null_indicator_offset_, num_def_levels_to_consume, tuple_size, tuple_mem);
+      }
+    }
+    *num_values = num_def_levels_to_consume;
+  }
+  // We now know how many we actually consumed.
+  def_levels_.GetRepeatedValue(num_def_levels_to_consume);
+  num_buffered_values_ -= num_def_levels_to_consume;
+  DCHECK_GE(num_buffered_values_, 0);
+  return true;
+}
+
 template <typename InternalType, parquet::Type::type PARQUET_TYPE, bool MATERIALIZED>
 template <bool IN_COLLECTION>
 bool ScalarColumnReader<InternalType, PARQUET_TYPE, MATERIALIZED>::MaterializeValueBatch(
@@ -622,6 +745,67 @@ bool ScalarColumnReader<InternalType, PARQUET_TYPE, MATERIALIZED>::ReadSlot(
 }
 
 template <typename InternalType, parquet::Type::type PARQUET_TYPE, bool MATERIALIZED>
+bool ScalarColumnReader<InternalType, PARQUET_TYPE, MATERIALIZED>::ReadSlots(
+    int64_t num_to_read, int tuple_size, uint8_t* RESTRICT tuple_mem) RESTRICT {
+  if (NeedsConversionInline()) {
+    return ReadAndConvertSlots(num_to_read, tuple_size, tuple_mem);
+  } else {
+    return ReadSlotsNoConversion(num_to_read, tuple_size, tuple_mem);
+  }
+}
+
+template <typename InternalType, parquet::Type::type PARQUET_TYPE, bool MATERIALIZED>
+bool ScalarColumnReader<InternalType, PARQUET_TYPE, MATERIALIZED>::ReadAndConvertSlots(
+    int64_t num_to_read, int tuple_size, uint8_t* RESTRICT tuple_mem) RESTRICT {
+  DCHECK(NeedsConversionInline());
+  DCHECK(conversion_buffer_ != nullptr);
+  InternalType* first_val = reinterpret_cast<InternalType*>(conversion_buffer_);
+  // Decode into the conversion buffer before doing the conversion into the output tuples.
+  if (!DecodeValues(sizeof(InternalType), num_to_read, first_val)) return false;
+
+  InternalType* curr_val = first_val;
+  uint8_t* curr_tuple = tuple_mem;
+  for (int64_t i = 0; i < num_to_read; ++i, ++curr_val, curr_tuple += tuple_size) {
+    Tuple* tuple = reinterpret_cast<Tuple*>(curr_tuple);
+    if (NeedsValidationInline() && UNLIKELY(!ValidateValue(curr_val))) {
+      if (UNLIKELY(!parent_->parse_status_.ok())) return false;
+      // The value is invalid but execution should continue - set the null indicator and
+      // skip conversion.
+      tuple->SetNull(null_indicator_offset_);
+      continue;
+    }
+    if (UNLIKELY(!ConvertSlot(curr_val, tuple->GetSlot(tuple_offset_)))) {
+      return false;
+    }
+  }
+  return true;
+}
+
+template <typename InternalType, parquet::Type::type PARQUET_TYPE, bool MATERIALIZED>
+bool ScalarColumnReader<InternalType, PARQUET_TYPE, MATERIALIZED>::ReadSlotsNoConversion(
+    int64_t num_to_read, int tuple_size, uint8_t* RESTRICT tuple_mem) RESTRICT {
+  DCHECK(!NeedsConversionInline());
+  // No conversion needed - decode directly into the output slots.
+  InternalType* first_slot = reinterpret_cast<InternalType*>(tuple_mem + tuple_offset_);
+  if (!DecodeValues(tuple_size, num_to_read, first_slot)) return false;
+  if (NeedsValidationInline()) {
+    // Validate the written slots.
+    uint8_t* curr_tuple = tuple_mem;
+    for (int64_t i = 0; i < num_to_read; ++i, curr_tuple += tuple_size) {
+      Tuple* tuple = reinterpret_cast<Tuple*>(curr_tuple);
+      InternalType* val = static_cast<InternalType*>(tuple->GetSlot(tuple_offset_));
+      if (UNLIKELY(!ValidateValue(val))) {
+        if (UNLIKELY(!parent_->parse_status_.ok())) return false;
+        // The value is invalid but execution should continue - set the null indicator and
+        // skip conversion.
+        tuple->SetNull(null_indicator_offset_);
+      }
+    }
+  }
+  return true;
+}
+
+template <typename InternalType, parquet::Type::type PARQUET_TYPE, bool MATERIALIZED>
 template <Encoding::type ENCODING>
 bool ScalarColumnReader<InternalType, PARQUET_TYPE, MATERIALIZED>::DecodeValue(
     uint8_t** RESTRICT data, const uint8_t* RESTRICT data_end,
@@ -670,16 +854,48 @@ bool ScalarColumnReader<TimestampValue, parquet::Type::INT64, true>::DecodeValue
 }
 
 template <typename InternalType, parquet::Type::type PARQUET_TYPE, bool MATERIALIZED>
-void ScalarColumnReader<InternalType, PARQUET_TYPE, MATERIALIZED>
-    ::ReadPositionBatched(int16_t rep_level, int64_t* pos) {
+bool ScalarColumnReader<InternalType, PARQUET_TYPE, MATERIALIZED>::DecodeValues(
+    int64_t stride, int64_t count, InternalType* RESTRICT out_vals) RESTRICT {
+  if (page_encoding_ == Encoding::PLAIN_DICTIONARY) {
+    if (UNLIKELY(!dict_decoder_.GetNextValues(out_vals, stride, count))) {
+      SetDictDecodeError();
+      return false;
+    }
+  } else {
+    DCHECK_EQ(page_encoding_, Encoding::PLAIN);
+    int64_t encoded_len = ParquetPlainEncoder::DecodeBatch<InternalType, PARQUET_TYPE>(
+        data_, data_end_, fixed_len_size_, count, stride, out_vals);
+    if (UNLIKELY(encoded_len < 0)) {
+      SetPlainDecodeError();
+      return false;
+    }
+    data_ += encoded_len;
+  }
+  return true;
+}
+
+template <typename InternalType, parquet::Type::type PARQUET_TYPE, bool MATERIALIZED>
+void ScalarColumnReader<InternalType, PARQUET_TYPE, MATERIALIZED>::ReadPositionBatched(
+    int16_t rep_level, int64_t* pos) {
   // Reset position counter if we are at the start of a new parent collection.
   if (rep_level <= max_rep_level() - 1) pos_current_value_ = 0;
   *pos = pos_current_value_++;
 }
 
+template <typename InternalType, parquet::Type::type PARQUET_TYPE, bool MATERIALIZED>
+void ScalarColumnReader<InternalType, PARQUET_TYPE, MATERIALIZED>::ReadPositions(
+    int64_t num_to_read, int tuple_size, uint8_t* RESTRICT tuple_mem) RESTRICT {
+  const int pos_slot_offset = pos_slot_desc()->tuple_offset();
+  void* first_slot = reinterpret_cast<Tuple*>(tuple_mem)->GetSlot(pos_slot_offset);
+  StrideWriter<int64_t> out{reinterpret_cast<int64_t*>(first_slot), tuple_size};
+  for (int64_t i = 0; i < num_to_read; ++i) {
+    ReadPositionBatched(rep_levels_.CacheGetNext(), out.Advance());
+  }
+}
+
 template <>
-inline bool ScalarColumnReader<StringValue, parquet::Type::BYTE_ARRAY, true>
-::NeedsConversionInline() const {
+inline bool ScalarColumnReader<StringValue, parquet::Type::BYTE_ARRAY,
+    true>::NeedsConversionInline() const {
   return needs_conversion_;
 }
 
@@ -716,6 +932,9 @@ bool ScalarColumnReader<TimestampValue, parquet::Type::INT96, true>::ConvertSlot
   DCHECK(timestamp_decoder_.NeedsConversion());
   TimestampValue* dst_ts = reinterpret_cast<TimestampValue*>(slot);
   *dst_ts = *src;
+  // TODO: IMPALA-7862: converting timestamps after validating them can move them out of
+  // range. We should either validate after conversion or require conversion to produce an
+  // in-range value.
   timestamp_decoder_.ConvertToLocalTime(dst_ts);
   return true;
 }
@@ -726,6 +945,9 @@ bool ScalarColumnReader<TimestampValue, parquet::Type::INT64, true>::ConvertSlot
   DCHECK(timestamp_decoder_.NeedsConversion());
   TimestampValue* dst_ts = reinterpret_cast<TimestampValue*>(slot);
   *dst_ts = *src;
+  // TODO: IMPALA-7862: converting timestamps after validating them can move them out of
+  // range. We should either validate after conversion or require conversion to produce an
+  // in-range value.
   timestamp_decoder_.ConvertToLocalTime(static_cast<TimestampValue*>(dst_ts));
   return true;
 }
@@ -1339,14 +1561,20 @@ Status BaseScalarColumnReader::ReadDataPage() {
     int data_size = current_page_header_.compressed_page_size;
     if (current_page_header_.type != parquet::PageType::DATA_PAGE) {
       // We can safely skip non-data pages
-      if (!stream_->SkipBytes(data_size, &status)) return status;
+      if (!stream_->SkipBytes(data_size, &status)) {
+        DCHECK(!status.ok());
+        return status;
+      }
       continue;
     }
 
     // Read Data Page
     // TODO: when we start using page statistics, we will need to ignore certain corrupt
     // statistics. See IMPALA-2208 and PARQUET-251.
-    if (!stream_->ReadBytes(data_size, &data_, &status)) return status;
+    if (!stream_->ReadBytes(data_size, &data_, &status)) {
+      DCHECK(!status.ok());
+      return status;
+    }
     data_end_ = data_ + data_size;
     int num_values = current_page_header_.data_page_header.num_values;
     if (num_values < 0) {
@@ -1398,15 +1626,14 @@ Status BaseScalarColumnReader::ReadDataPage() {
     // Initialize the repetition level data
     RETURN_IF_ERROR(rep_levels_.Init(filename(),
         current_page_header_.data_page_header.repetition_level_encoding,
-        parent_->perm_pool_.get(), parent_->state_->batch_size(),
-        max_rep_level(), num_buffered_values_,
-        &data_, &data_size));
+        parent_->perm_pool_.get(), parent_->state_->batch_size(), max_rep_level(), &data_,
+        &data_size));
 
     // Initialize the definition level data
     RETURN_IF_ERROR(def_levels_.Init(filename(),
         current_page_header_.data_page_header.definition_level_encoding,
-        parent_->perm_pool_.get(), parent_->state_->batch_size(),
-        max_def_level(), num_buffered_values_, &data_, &data_size));
+        parent_->perm_pool_.get(), parent_->state_->batch_size(), max_def_level(), &data_,
+        &data_size));
 
     // Data can be empty if the column contains all NULLs
     RETURN_IF_ERROR(InitDataPage(data_, data_size));
@@ -1436,6 +1663,7 @@ bool BaseScalarColumnReader::NextLevels() {
     if (!NextPage()) return parent_->parse_status_.ok();
   }
   --num_buffered_values_;
+  DCHECK_GE(num_buffered_values_, 0);
 
   // Definition level is not present if column and any containing structs are required.
   def_level_ = max_def_level() == 0 ? 0 : def_levels_.ReadLevel();

http://git-wip-us.apache.org/repos/asf/impala/blob/153663c2/be/src/exec/parquet-column-readers.h
----------------------------------------------------------------------
diff --git a/be/src/exec/parquet-column-readers.h b/be/src/exec/parquet-column-readers.h
index bce114a..d689aed 100644
--- a/be/src/exec/parquet-column-readers.h
+++ b/be/src/exec/parquet-column-readers.h
@@ -36,27 +36,32 @@ class MemPool;
 /// Level values are unsigned 8-bit integers because we support a maximum nesting
 /// depth of 100, as enforced by the FE. Using a small type saves memory and speeds up
 /// populating the level cache (e.g., with RLE we can memset() repeated values).
-///
-/// TODO: expose whether we're in a run of repeated values so that callers can
-/// optimise for that case.
 class ParquetLevelDecoder {
  public:
   ParquetLevelDecoder(bool is_def_level_decoder)
-    : decoding_error_code_(is_def_level_decoder ?
-          TErrorCode::PARQUET_DEF_LEVEL_ERROR : TErrorCode::PARQUET_REP_LEVEL_ERROR) {
-  }
+    : decoding_error_code_(is_def_level_decoder ? TErrorCode::PARQUET_DEF_LEVEL_ERROR :
+                                                  TErrorCode::PARQUET_REP_LEVEL_ERROR) {}
 
   /// Initialize the LevelDecoder. Reads and advances the provided data buffer if the
   /// encoding requires reading metadata from the page header. 'cache_size' will be
   /// rounded up to a multiple of 32 internally.
   Status Init(const string& filename, parquet::Encoding::type encoding,
-      MemPool* cache_pool, int cache_size, int max_level, int num_buffered_values,
-      uint8_t** data, int* data_size);
+      MemPool* cache_pool, int cache_size, int max_level, uint8_t** data, int* data_size);
 
   /// Returns the next level or INVALID_LEVEL if there was an error. Not as efficient
   /// as batched methods.
   inline int16_t ReadLevel();
 
+  /// If the next value is part of a repeated run and is not cached, return the length
+  /// of the repeated run. A max level of 0 is treated as an arbitrarily long run of
+  /// zeroes, so this returns numeric_limits<int32_t>::max(). Otherwise return 0.
+  int32_t NextRepeatedRunLength();
+
+  /// Get the value of the repeated run (if NextRepeatedRunLength() > 0) and consume
+  /// 'num_to_consume' items in the run. Not valid to call if there are cached levels
+  /// that have not been consumed.
+  uint8_t GetRepeatedValue(uint32_t num_to_consume);
+
   /// Decodes and caches the next batch of levels given that there are 'vals_remaining'
   /// values left to decode in the page. Resets members associated with the cache.
   /// Returns a non-ok status if there was a problem decoding a level, if a level was
@@ -93,7 +98,7 @@ class ParquetLevelDecoder {
   /// CacheHasNext() is false.
   bool FillCache(int batch_size, int* num_cached_levels);
 
-  /// RLE decoder, used if 'encoding_' is RLE.
+  /// RLE decoder, used if 'encoding_' is RLE and max_level_ > 0.
   RleBatchDecoder<uint8_t> rle_decoder_;
 
   /// Buffer for a batch of levels. The memory is allocated and owned by a pool passed
@@ -116,9 +121,6 @@ class ParquetLevelDecoder {
   /// a multiple of 32 to allow reading directly from 'bit_reader_' in batches.
   int cache_size_ = 0;
 
-  /// Number of remaining data values in the current data page.
-  int num_buffered_values_ = 0;
-
   /// Name of the parquet file. Used for reporting level decoding errors.
   string filename_;
 

http://git-wip-us.apache.org/repos/asf/impala/blob/153663c2/be/src/exec/parquet-common.h
----------------------------------------------------------------------
diff --git a/be/src/exec/parquet-common.h b/be/src/exec/parquet-common.h
index a42ac35..806db45 100644
--- a/be/src/exec/parquet-common.h
+++ b/be/src/exec/parquet-common.h
@@ -27,6 +27,7 @@
 #include "runtime/timestamp-value.inline.h"
 #include "util/bit-util.h"
 #include "util/decimal-util.h"
+#include "util/mem-util.h"
 
 /// This file contains common elements between the parquet Writer and Scanner.
 namespace impala {
@@ -203,6 +204,15 @@ class ParquetPlainEncoder {
     memcpy(v, buffer, byte_size);
     return byte_size;
   }
+
+  /// Batched version of Decode() that tries to decode 'num_values' values from the memory
+  /// range [buffer, buffer_end) and writes them to 'v' with a stride of 'stride' bytes.
+  /// Returns the number of bytes read from 'buffer' or -1 if there was an error
+  /// decoding, e.g. invalid data or running out of input data before reading
+  /// 'num_values'.
+  template <typename InternalType, parquet::Type::type PARQUET_TYPE>
+  static int64_t DecodeBatch(const uint8_t* buffer, const uint8_t* buffer_end,
+      int fixed_len_size, int64_t num_values, int64_t stride, InternalType* v);
 };
 
 /// Calling this with arguments of type ColumnType is certainly a programmer error, so we
@@ -364,9 +374,24 @@ inline int ParquetPlainEncoder::Encode(
   return EncodeDecimal(v, fixed_len_size, buffer);
 }
 
-template<typename T>
-inline int DecodeDecimalFixedLen(const uint8_t* buffer, const uint8_t* buffer_end,
-    int fixed_len_size, T* v) {
+template <typename InternalType, parquet::Type::type PARQUET_TYPE>
+inline int64_t ParquetPlainEncoder::DecodeBatch(const uint8_t* buffer,
+    const uint8_t* buffer_end, int fixed_len_size, int64_t num_values, int64_t stride,
+    InternalType* v) {
+  const uint8_t* buffer_pos = buffer;
+  StrideWriter<InternalType> out(v, stride);
+  for (int64_t i = 0; i < num_values; ++i) {
+    int encoded_len = Decode<InternalType, PARQUET_TYPE>(
+        buffer_pos, buffer_end, fixed_len_size, out.Advance());
+    if (UNLIKELY(encoded_len < 0)) return -1;
+    buffer_pos += encoded_len;
+  }
+  return buffer_pos - buffer;
+}
+
+template <typename T>
+inline int DecodeDecimalFixedLen(
+    const uint8_t* buffer, const uint8_t* buffer_end, int fixed_len_size, T* v) {
   if (UNLIKELY(buffer_end - buffer < fixed_len_size)) return -1;
   DecimalUtil::DecodeFromFixedLenByteArray(buffer, fixed_len_size, v);
   return fixed_len_size;

http://git-wip-us.apache.org/repos/asf/impala/blob/153663c2/be/src/runtime/tuple.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/tuple.cc b/be/src/runtime/tuple.cc
index 0061419..4f9448a 100644
--- a/be/src/runtime/tuple.cc
+++ b/be/src/runtime/tuple.cc
@@ -205,6 +205,17 @@ void Tuple::ConvertOffsetsToPointers(const TupleDescriptor& desc, uint8_t* tuple
   }
 }
 
+void Tuple::SetNullIndicators(NullIndicatorOffset offset, int64_t num_tuples,
+    int64_t tuple_stride, uint8_t* tuple_mem) {
+  // 'offset' is passed by value instead of const reference so that the compiler knows
+  // that it doesn't alias 'tuple_mem' and can keep the offset values in registers in
+  // the loop below.
+  for (int64_t i = 0; i < num_tuples; ++i) {
+    reinterpret_cast<Tuple*>(tuple_mem)->SetNull(offset);
+    tuple_mem += tuple_stride;
+  }
+}
+
 template <bool COLLECT_STRING_VALS, bool NO_POOL>
 void Tuple::MaterializeExprs(TupleRow* row, const TupleDescriptor& desc,
     ScalarExprEvaluator* const* evals, MemPool* pool,

http://git-wip-us.apache.org/repos/asf/impala/blob/153663c2/be/src/runtime/tuple.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/tuple.h b/be/src/runtime/tuple.h
index 91517f1..6a40af8 100644
--- a/be/src/runtime/tuple.h
+++ b/be/src/runtime/tuple.h
@@ -52,22 +52,30 @@ struct SlotOffsets {
 };
 
 /// A tuple is stored as a contiguous sequence of bytes containing a fixed number
-/// of fixed-size slots. The slots are arranged in order of increasing byte length;
-/// the tuple might contain padding between slots in order to align them according
-/// to their type.
-//
-/// The contents of a tuple:
-/// 1) a number of bytes holding a bitvector of null indicators
-/// 2) bool slots
-/// 3) tinyint slots
-/// 4) smallint slots
-/// 5) int slots
-/// 6) float slots
-/// 7) bigint slots
-/// 8) double slots
-/// 9) string slots
-//
-/// A tuple with 0 materialised slots is represented as NULL.
+/// of fixed-size slots along with a bit vector containing null indicators
+/// for each nullable slot. The layout of a tuple is computed by the planner and
+/// represented in a TupleDescriptor. A tuple has a "packed" memory layout - the
+/// start of the tuple can have any alignment and slots within the tuple are not
+/// necessarily aligned.
+///
+/// Tuples are handled as untyped memory throughout much of the runtime, with that
+/// memory reinterpreted as the appropriate slot types when needed. This Tuple class
+/// (which is a zero-length class with no members) provides a convenient abstraction
+/// over this untyped memory for common operations. The untyped tuple memory can be
+/// cast to a Tuple* in order to use the functions below.
+///
+/// NULL and zero-length Tuples
+/// ===========================
+/// Tuples can be logically NULL in some cases to indicate that all slots in the
+/// tuple are NULL. This occurs in rows produced by an outer join where a matching
+/// tuple was not found on one side of the join. In some plans the distinction between
+/// a NULL tuple and a non-NULL tuple with all NULL slots is significant and used by
+/// the planner via TupleIsNullPredicate() to correctly place predicates at certain
+/// places in the plan.
+///
+/// A tuple with 0 materialised slots is represented either as an arbitrary non-NULL
+/// pointer (e.g. POISON) if the tuple is logically non-NULL, or as a NULL pointer
+/// if the tuple is logically NULL.
 ///
 /// TODO: Our projection of collection-typed slots breaks/augments the conventional
 /// semantics of the null bits, because we rely on producers of array values to also
@@ -233,6 +241,12 @@ class Tuple {
     return (*null_indicator_byte & offset.bit_mask) != 0;
   }
 
+  /// Set the null indicators on 'num_tuples' tuples. The first tuple is stored at
+  /// 'tuple_mem' and subsequent tuples must be stored at a stride of 'tuple_stride'
+  /// bytes.
+  static void SetNullIndicators(NullIndicatorOffset offset, int64_t num_tuples,
+      int64_t tuple_stride, uint8_t* tuple_mem);
+
   void* GetSlot(int offset) {
     DCHECK(offset != -1); // -1 offset indicates non-materialized slot
     return reinterpret_cast<char*>(this) + offset;

http://git-wip-us.apache.org/repos/asf/impala/blob/153663c2/be/src/util/bit-packing.h
----------------------------------------------------------------------
diff --git a/be/src/util/bit-packing.h b/be/src/util/bit-packing.h
index 05036db..38b39e2 100644
--- a/be/src/util/bit-packing.h
+++ b/be/src/util/bit-packing.h
@@ -68,19 +68,20 @@ class BitPacking {
       int64_t in_bytes, int64_t num_values, OutType* __restrict__ out);
 
   /// Unpack values as above, treating them as unsigned integers, and decode them
-  /// using the provided dict. Sets 'decode_error' to true if one of the packed
-  /// values was greater than 'dict_len'. Does not modify 'decode_error' on success.
+  /// using the provided dict. Writes them to 'out' with a stride of 'stride' bytes.
+  /// Sets 'decode_error' to true if one of the packed values is not a valid index into
+  /// 'dict', which has 'dict_len' entries. Does not modify 'decode_error' on success.
   template <typename OutType>
   static std::pair<const uint8_t*, int64_t> UnpackAndDecodeValues(int bit_width,
       const uint8_t* __restrict__ in, int64_t in_bytes, OutType* __restrict__ dict,
-      int64_t dict_len, int64_t num_values, OutType* __restrict__ out,
+      int64_t dict_len, int64_t num_values, OutType* __restrict__ out, int64_t stride,
       bool* __restrict__ decode_error);
 
   /// Same as above, templated by BIT_WIDTH.
   template <typename OutType, int BIT_WIDTH>
   static std::pair<const uint8_t*, int64_t> UnpackAndDecodeValues(
       const uint8_t* __restrict__ in, int64_t in_bytes, OutType* __restrict__ dict,
-      int64_t dict_len, int64_t num_values, OutType* __restrict__ out,
+      int64_t dict_len, int64_t num_values, OutType* __restrict__ out, int64_t stride,
       bool* __restrict__ decode_error);
 
   /// Unpack exactly 32 values of 'bit_width' from 'in' to 'out'. 'in' must point to
@@ -100,13 +101,14 @@ class BitPacking {
   template <typename OutType>
   static const uint8_t* UnpackAndDecode32Values(int bit_width,
       const uint8_t* __restrict__ in, int64_t in_bytes, OutType* __restrict__ dict,
-      int64_t dict_len, OutType* __restrict__ out, bool* __restrict__ decode_error);
+      int64_t dict_len, OutType* __restrict__ out, int64_t stride,
+      bool* __restrict__ decode_error);
 
   /// Same as UnpackAndDecode32Values() but templated by BIT_WIDTH.
   template <typename OutType, int BIT_WIDTH>
   static const uint8_t* UnpackAndDecode32Values(const uint8_t* __restrict__ in,
       int64_t in_bytes, OutType* __restrict__ dict, int64_t dict_len,
-      OutType* __restrict__ out, bool* __restrict__ decode_error);
+      OutType* __restrict__ out, int64_t stride, bool* __restrict__ decode_error);
 
   /// Unpacks 'num_values' values with the given BIT_WIDTH from 'in' to 'out'.
   /// 'num_values' must be at most 31. 'in' must point to 'in_bytes' of addressable
@@ -121,7 +123,7 @@ class BitPacking {
   template <typename OutType, int BIT_WIDTH>
   static const uint8_t* UnpackAndDecodeUpTo31Values(const uint8_t* __restrict__ in,
       int64_t in_bytes, OutType* __restrict__ dict, int64_t dict_len, int num_values,
-      OutType* __restrict__ out, bool* __restrict__ decode_error);
+      OutType* __restrict__ out, int64_t stride, bool* __restrict__ decode_error);
 
  private:
   /// Compute the number of values with the given bit width that can be unpacked from

http://git-wip-us.apache.org/repos/asf/impala/blob/153663c2/be/src/util/bit-packing.inline.h
----------------------------------------------------------------------
diff --git a/be/src/util/bit-packing.inline.h b/be/src/util/bit-packing.inline.h
index 9a43644..6fa31cc 100644
--- a/be/src/util/bit-packing.inline.h
+++ b/be/src/util/bit-packing.inline.h
@@ -90,13 +90,13 @@ std::pair<const uint8_t*, int64_t> BitPacking::UnpackValues(
 template <typename OutType>
 std::pair<const uint8_t*, int64_t> BitPacking::UnpackAndDecodeValues(int bit_width,
     const uint8_t* __restrict__ in, int64_t in_bytes, OutType* __restrict__ dict,
-    int64_t dict_len, int64_t num_values, OutType* __restrict__ out,
+    int64_t dict_len, int64_t num_values, OutType* __restrict__ out, int64_t stride,
     bool* __restrict__ decode_error) {
 #pragma push_macro("UNPACK_VALUES_CASE")
 #define UNPACK_VALUES_CASE(ignore1, i, ignore2) \
   case i:                                       \
     return UnpackAndDecodeValues<OutType, i>(   \
-        in, in_bytes, dict, dict_len, num_values, out, decode_error);
+        in, in_bytes, dict, dict_len, num_values, out, stride, decode_error);
 
   switch (bit_width) {
     // Expand cases from 0 to 32.
@@ -111,25 +111,27 @@ std::pair<const uint8_t*, int64_t> BitPacking::UnpackAndDecodeValues(int bit_wid
 template <typename OutType, int BIT_WIDTH>
 std::pair<const uint8_t*, int64_t> BitPacking::UnpackAndDecodeValues(
     const uint8_t* __restrict__ in, int64_t in_bytes, OutType* __restrict__ dict,
-    int64_t dict_len, int64_t num_values, OutType* __restrict__ out,
+    int64_t dict_len, int64_t num_values, OutType* __restrict__ out, int64_t stride,
     bool* __restrict__ decode_error) {
   constexpr int BATCH_SIZE = 32;
   const int64_t values_to_read = NumValuesToUnpack(BIT_WIDTH, in_bytes, num_values);
   const int64_t batches_to_read = values_to_read / BATCH_SIZE;
   const int64_t remainder_values = values_to_read % BATCH_SIZE;
   const uint8_t* in_pos = in;
-  OutType* out_pos = out;
+  uint8_t* out_pos = reinterpret_cast<uint8_t*>(out);
   // First unpack as many full batches as possible.
   for (int64_t i = 0; i < batches_to_read; ++i) {
     in_pos = UnpackAndDecode32Values<OutType, BIT_WIDTH>(
-        in_pos, in_bytes, dict, dict_len, out_pos, decode_error);
-    out_pos += BATCH_SIZE;
+        in_pos, in_bytes, dict, dict_len, reinterpret_cast<OutType*>(out_pos), stride,
+        decode_error);
+    out_pos += stride * BATCH_SIZE;
     in_bytes -= (BATCH_SIZE * BIT_WIDTH) / CHAR_BIT;
   }
   // Then unpack the final partial batch.
   if (remainder_values > 0) {
     in_pos = UnpackAndDecodeUpTo31Values<OutType, BIT_WIDTH>(
-        in_pos, in_bytes, dict, dict_len, remainder_values, out_pos, decode_error);
+        in_pos, in_bytes, dict, dict_len, remainder_values,
+        reinterpret_cast<OutType*>(out_pos), stride, decode_error);
   }
   return std::make_pair(in_pos, values_to_read);
 }
@@ -237,7 +239,7 @@ const uint8_t* BitPacking::Unpack32Values(int bit_width, const uint8_t* __restri
 template <typename OutType, int BIT_WIDTH>
 const uint8_t* BitPacking::UnpackAndDecode32Values(const uint8_t* __restrict__ in,
     int64_t in_bytes, OutType* __restrict__ dict, int64_t dict_len,
-    OutType* __restrict__ out, bool* __restrict__ decode_error) {
+    OutType* __restrict__ out, int64_t stride, bool* __restrict__ decode_error) {
   static_assert(BIT_WIDTH >= 0, "BIT_WIDTH too low");
   static_assert(BIT_WIDTH <= 32, "BIT_WIDTH > 32");
   constexpr int BYTES_TO_READ = BitUtil::RoundUpNumBytes(32 * BIT_WIDTH);
@@ -250,7 +252,8 @@ const uint8_t* BitPacking::UnpackAndDecode32Values(const uint8_t* __restrict__ i
 #define DECODE_VALUE_CALL(ignore1, i, ignore2)               \
   {                                                          \
     uint32_t idx = UnpackValue<BIT_WIDTH, i>(in);            \
-    DecodeValue(dict, dict_len, idx, &out[i], decode_error); \
+    uint8_t* out_pos = reinterpret_cast<uint8_t*>(out) + i * stride; \
+    DecodeValue(dict, dict_len, idx, reinterpret_cast<OutType*>(out_pos), decode_error); \
   }
 
   BOOST_PP_REPEAT_FROM_TO(0, 32, DECODE_VALUE_CALL, ignore);
@@ -301,7 +304,7 @@ const uint8_t* BitPacking::UnpackUpTo31Values(const uint8_t* __restrict__ in,
 template <typename OutType, int BIT_WIDTH>
 const uint8_t* BitPacking::UnpackAndDecodeUpTo31Values(const uint8_t* __restrict__ in,
       int64_t in_bytes, OutType* __restrict__ dict, int64_t dict_len, int num_values,
-      OutType* __restrict__ out, bool* __restrict__ decode_error) {
+      OutType* __restrict__ out, int64_t stride, bool* __restrict__ decode_error) {
   static_assert(BIT_WIDTH >= 0, "BIT_WIDTH too low");
   static_assert(BIT_WIDTH <= 32, "BIT_WIDTH > 32");
   constexpr int MAX_BATCH_SIZE = 31;
@@ -326,7 +329,8 @@ const uint8_t* BitPacking::UnpackAndDecodeUpTo31Values(const uint8_t* __restrict
 #define DECODE_VALUES_CASE(ignore1, i, ignore2)                   \
   case 31 - i: {                                                  \
     uint32_t idx = UnpackValue<BIT_WIDTH, 30 - i>(in_buffer);     \
-    DecodeValue(dict, dict_len, idx, &out[30 - i], decode_error); \
+    uint8_t* out_pos = reinterpret_cast<uint8_t*>(out) + (30 - i) * stride; \
+    DecodeValue(dict, dict_len, idx, reinterpret_cast<OutType*>(out_pos), decode_error); \
   }
 
   // Use switch with fall-through cases to minimise branching.

http://git-wip-us.apache.org/repos/asf/impala/blob/153663c2/be/src/util/bit-stream-utils.h
----------------------------------------------------------------------
diff --git a/be/src/util/bit-stream-utils.h b/be/src/util/bit-stream-utils.h
index 67f1a00..a7ef292 100644
--- a/be/src/util/bit-stream-utils.h
+++ b/be/src/util/bit-stream-utils.h
@@ -136,10 +136,11 @@ class BatchedBitReader {
   /// Unpack bit-packed values in the same way as UnpackBatch() and decode them using the
   /// dictionary 'dict' with 'dict_len' entries. Return -1 if a decoding error is
   /// encountered, i.e. if the bit-packed values are not valid indices in 'dict'.
-  /// Otherwise returns the number of values decoded.
-  template<typename T>
+  /// Otherwise returns the number of values decoded. The values are written to 'v' with
+  /// a stride of 'stride' bytes.
+  template <typename T>
   int UnpackAndDecodeBatch(
-      int bit_width, T* dict, int64_t dict_len, int num_values, T* v);
+      int bit_width, T* dict, int64_t dict_len, int num_values, T* v, int64_t stride);
 
   /// Reads an unpacked 'num_bytes'-sized value from the buffer and stores it in 'v'. T
   /// needs to be a little-endian native type and big enough to store 'num_bytes'.

http://git-wip-us.apache.org/repos/asf/impala/blob/153663c2/be/src/util/bit-stream-utils.inline.h
----------------------------------------------------------------------
diff --git a/be/src/util/bit-stream-utils.inline.h b/be/src/util/bit-stream-utils.inline.h
index 371a81d..aa53c52 100644
--- a/be/src/util/bit-stream-utils.inline.h
+++ b/be/src/util/bit-stream-utils.inline.h
@@ -102,9 +102,9 @@ inline int BatchedBitReader::UnpackBatch(int bit_width, int num_values, T* v) {
   return static_cast<int>(num_read);
 }
 
-template<typename T>
+template <typename T>
 inline int BatchedBitReader::UnpackAndDecodeBatch(
-      int bit_width, T* dict, int64_t dict_len, int num_values, T* v){
+    int bit_width, T* dict, int64_t dict_len, int num_values, T* v, int64_t stride) {
   DCHECK(buffer_pos_ != nullptr);
   DCHECK_GE(bit_width, 0);
   DCHECK_LE(bit_width, MAX_BITWIDTH);
@@ -114,7 +114,7 @@ inline int BatchedBitReader::UnpackAndDecodeBatch(
   int64_t num_read;
   bool decode_error = false;
   std::tie(new_buffer_pos, num_read) = BitPacking::UnpackAndDecodeValues(bit_width,
-      buffer_pos_, bytes_left(), dict, dict_len, num_values, v, &decode_error);
+      buffer_pos_, bytes_left(), dict, dict_len, num_values, v, stride, &decode_error);
   if (UNLIKELY(decode_error)) return -1;
   buffer_pos_ = new_buffer_pos;
   DCHECK_LE(buffer_pos_, buffer_end_);

http://git-wip-us.apache.org/repos/asf/impala/blob/153663c2/be/src/util/dict-encoding.h
----------------------------------------------------------------------
diff --git a/be/src/util/dict-encoding.h b/be/src/util/dict-encoding.h
index 8e0f5df..bf40301 100644
--- a/be/src/util/dict-encoding.h
+++ b/be/src/util/dict-encoding.h
@@ -28,6 +28,7 @@
 #include "runtime/mem-pool.h"
 #include "runtime/string-value.h"
 #include "util/bit-util.h"
+#include "util/mem-util.h"
 #include "util/rle-encoding.h"
 
 namespace impala {
@@ -353,6 +354,11 @@ class DictDecoder : public DictDecoderBase {
   /// the string data is from the dictionary buffer passed into the c'tor.
   bool GetNextValue(T* value) WARN_UNUSED_RESULT;
 
+  /// Batched version of GetNextValue(). Reads the next 'count' values, writing the
+  /// first to 'first_value' and each subsequent value 'stride' bytes after the previous
+  /// one. Returns false if the data was invalid and 'count' values could not be read.
+  bool GetNextValues(T* first_value, int64_t stride, int count) WARN_UNUSED_RESULT;
+
   /// This function returns the size in bytes of the dictionary vector.
   /// It is used by dict-test.cc for validation of bytes consumed against
   /// memory tracked.
@@ -373,6 +379,11 @@ class DictDecoder : public DictDecoderBase {
   /// 'next_literal_idx_'.
   T decoded_values_[DICT_DECODER_BUFFER_SIZE];
 
+  /// Copy as many literal values as possible, up to 'max_to_copy', from
+  /// 'decoded_values_' to '*out'. Return the number copied and advance '*out'.
+  uint32_t CopyLiteralsToOutput(
+      uint32_t max_to_copy, StrideWriter<T>* RESTRICT out) RESTRICT;
+
   /// Slow path for GetNextValue() where we need to decode new values. Should not be
   /// inlined everywhere.
   bool DecodeNextValue(T* value);
@@ -465,6 +476,77 @@ ALWAYS_INLINE inline bool DictDecoder<T>::GetNextValue(T* value) {
 }
 
 template <typename T>
+ALWAYS_INLINE inline bool DictDecoder<T>::GetNextValues(
+    T* first_value, int64_t stride, int count) {
+  DCHECK_GE(count, 0);
+  StrideWriter<T> out(first_value, stride);
+  if (num_repeats_ > 0) {
+    // Consume any already-decoded repeated value.
+    int num_to_copy = std::min<uint32_t>(num_repeats_, count);
+    T repeated_val = decoded_values_[0];
+    out.SetNext(repeated_val, num_to_copy);
+    count -= num_to_copy;
+    num_repeats_ -= num_to_copy;
+  } else if (next_literal_idx_ < num_literal_values_) {
+    // Consume any already-decoded literal values.
+    count -= CopyLiteralsToOutput(count, &out);
+  }
+  DCHECK_GE(count, 0);
+  while (count > 0) {
+    uint32_t num_repeats = data_decoder_.NextNumRepeats();
+    if (num_repeats > 0) {
+      // Decode repeats directly to the output.
+      uint32_t num_repeats_to_consume = std::min<uint32_t>(num_repeats, count);
+      uint32_t idx = data_decoder_.GetRepeatedValue(num_repeats_to_consume);
+      if (UNLIKELY(idx >= dict_.size())) return false;
+      T repeated_val = dict_[idx];
+      out.SetNext(repeated_val, num_repeats_to_consume);
+      count -= num_repeats_to_consume;
+    } else {
+      // Decode as many literals as possible directly to the output, buffer the rest.
+      uint32_t num_literals = data_decoder_.NextNumLiterals();
+      if (UNLIKELY(num_literals == 0)) return false;
+      // Case 1: decode the whole literal run directly to the output.
+      // Case 2: decode none or some of the run to the output, buffer some remaining.
+      if (count >= num_literals) { // Case 1
+        if (UNLIKELY(!data_decoder_.DecodeLiteralValues(
+                num_literals, dict_.data(), dict_.size(), &out))) {
+          return false;
+        }
+        count -= num_literals;
+      } else { // Case 2
+        uint32_t num_to_decode = BitUtil::RoundDown(count, 32);
+        if (UNLIKELY(!data_decoder_.DecodeLiteralValues(
+                num_to_decode, dict_.data(), dict_.size(), &out))) {
+          return false;
+        }
+        count -= num_to_decode;
+        DCHECK_GE(count, 0);
+        if (count > 0) {
+          if (UNLIKELY(!DecodeNextValue(out.Advance()))) return false;
+          --count;
+          // Consume any already-decoded literal values.
+          count -= CopyLiteralsToOutput(count, &out);
+        }
+        return true;
+      }
+    }
+  }
+  return true;
+}
+
+template <typename T>
+uint32_t DictDecoder<T>::CopyLiteralsToOutput(
+    uint32_t max_to_copy, StrideWriter<T>* out) {
+  uint32_t num_to_copy =
+      std::min<uint32_t>(num_literal_values_ - next_literal_idx_, max_to_copy);
+  for (uint32_t i = 0; i < num_to_copy; ++i) {
+    out->SetNext(decoded_values_[next_literal_idx_++]);
+  }
+  return num_to_copy;
+}
+
+template <typename T>
 bool DictDecoder<T>::DecodeNextValue(T* value) {
   // IMPALA-959: Use memcpy() instead of '=' to set *value: addresses are not always 16
   // byte aligned for Decimal16Values.
@@ -483,8 +565,9 @@ bool DictDecoder<T>::DecodeNextValue(T* value) {
 
     DCHECK_GT(num_literals, 0);
     int32_t num_to_decode = std::min(num_literals, DICT_DECODER_BUFFER_SIZE);
-    if (UNLIKELY(!data_decoder_.DecodeLiteralValues(
-            num_to_decode, dict_.data(), dict_.size(), &decoded_values_[0]))) {
+    StrideWriter<T> dst(&decoded_values_[0], sizeof(T));
+    if (UNLIKELY(!data_decoder_.DecodeLiteralValues(num_to_decode, dict_.data(),
+            dict_.size(), &dst))) {
       return false;
     }
     num_literal_values_ = num_to_decode;

http://git-wip-us.apache.org/repos/asf/impala/blob/153663c2/be/src/util/dict-test.cc
----------------------------------------------------------------------
diff --git a/be/src/util/dict-test.cc b/be/src/util/dict-test.cc
index cf438f6..ecf24f5 100644
--- a/be/src/util/dict-test.cc
+++ b/be/src/util/dict-test.cc
@@ -73,10 +73,12 @@ void ValidateDict(const vector<InternalType>& values,
   }
   // Test access to dictionary via internal stream
   ASSERT_OK(decoder.SetData(data_buffer, data_len));
-  for (InternalType i: values) {
+  int count = 0;
+  for (InternalType i : values) {
     InternalType j;
     ASSERT_TRUE(decoder.GetNextValue(&j));
-    EXPECT_EQ(i, j);
+    EXPECT_EQ(i, j) << count;
+    ++count;
   }
   pool.FreeAll();
 }

http://git-wip-us.apache.org/repos/asf/impala/blob/153663c2/be/src/util/mem-util.h
----------------------------------------------------------------------
diff --git a/be/src/util/mem-util.h b/be/src/util/mem-util.h
new file mode 100644
index 0000000..a7125a9
--- /dev/null
+++ b/be/src/util/mem-util.h
@@ -0,0 +1,68 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <cstdint>
+
+#include "common/compiler-util.h"
+
+namespace impala {
+
+/// Utility classes for reading and writing to memory.
+
+
+/// Utility class for writing to unaligned memory containing values of type T separated
+/// by an arbitrary byte stride.
+template <typename T>
+struct StrideWriter {
+  // The next element to write to. May not be aligned to the natural alignment of T.
+  T* current;
+
+  // The stride in bytes between subsequent values.
+  int64_t stride;
+
+  explicit StrideWriter(T* current, int64_t stride) : current(current), stride(stride) {
+    DCHECK(current != nullptr);
+  }
+
+  /// Set the next element to 'val' and advance 'current' to the next element.
+  ALWAYS_INLINE void SetNext(T& val) {
+    // memcpy() is necessary because 'current' may not be aligned.
+    memcpy(current, &val, sizeof(T));
+    SkipNext(1);
+  }
+
+  /// Return a pointer to the current element and advance 'current' to the next element.
+  ALWAYS_INLINE T* Advance() {
+    T* curr = current;
+    SkipNext(1);
+    return curr;
+  }
+
+  /// Set the next 'count' elements to 'val', advancing 'current' by 'count' values.
+  void SetNext(T& val, int64_t count) {
+    for (int64_t i = 0; i < count; ++i) SetNext(val);
+  }
+
+  /// Advance 'current' by 'count' values.
+  ALWAYS_INLINE void SkipNext(int64_t count) {
+    DCHECK_GE(count, 0);
+    current = reinterpret_cast<T*>(reinterpret_cast<uint8_t*>(current) + stride * count);
+  }
+};
+} // namespace impala

http://git-wip-us.apache.org/repos/asf/impala/blob/153663c2/be/src/util/rle-encoding.h
----------------------------------------------------------------------
diff --git a/be/src/util/rle-encoding.h b/be/src/util/rle-encoding.h
index a621c65..7dc05ab 100644
--- a/be/src/util/rle-encoding.h
+++ b/be/src/util/rle-encoding.h
@@ -23,6 +23,7 @@
 #include "common/compiler-util.h"
 #include "util/bit-stream-utils.inline.h"
 #include "util/bit-util.h"
+#include "util/mem-util.h"
 #include "util/test-info.h"
 
 namespace impala {
@@ -96,7 +97,7 @@ class RleBatchDecoder {
   RleBatchDecoder(uint8_t* buffer, int buffer_len, int bit_width) {
     Reset(buffer, buffer_len, bit_width);
   }
-  RleBatchDecoder() : bit_width_(-1) {}
+  RleBatchDecoder() {}
 
   /// Reset the decoder to read from a new buffer.
   void Reset(uint8_t* buffer, int buffer_len, int bit_width);
@@ -107,7 +108,8 @@ class RleBatchDecoder {
 
   /// Get the value of the current repeated run and consume the given number of repeats.
   /// Only valid to call when NextNumRepeats() > 0. The given number of repeats cannot
-  /// be greater than the remaining number of repeats in the run.
+  /// be greater than the remaining number of repeats in the run. 'num_repeats_to_consume'
+  /// can be set to 0 to peek at the value without consuming repeats.
   T GetRepeatedValue(int32_t num_repeats_to_consume);
 
   /// Return the size of the current literal run. Returns zero if the current run is
@@ -122,7 +124,7 @@ class RleBatchDecoder {
   bool GetLiteralValues(int32_t num_literals_to_consume, T* values) WARN_UNUSED_RESULT;
 
   /// Consume 'num_literals_to_consume' literals from the current literal run,
-  /// decoding them using 'dict' and outputting them to 'values'.
+  /// decoding them using 'dict' and outputting them to 'out'.
   /// 'num_literals_to_consume' must be <= NextNumLiterals(). Returns true if
   /// the requested number of literals were successfully read or false if an error
   /// was encountered, e.g. the input was truncated or the value was not present
@@ -130,7 +132,7 @@ class RleBatchDecoder {
   /// to read from a new buffer.
   template <typename OutType>
   bool DecodeLiteralValues(int32_t num_literals_to_consume, OutType* dict,
-      int64_t dict_len, OutType* values) WARN_UNUSED_RESULT;
+      int64_t dict_len, StrideWriter<OutType>* RESTRICT out) WARN_UNUSED_RESULT;
 
   /// Convenience method to get the next value. Not efficient. Returns true on success
   /// or false if no more values can be read from the input or an error was encountered
@@ -144,16 +146,18 @@ class RleBatchDecoder {
  private:
   BatchedBitReader bit_reader_;
 
-  /// Number of bits needed to encode the value. Must be between 0 and 64.
-  int bit_width_;
+  /// Number of bits needed to encode the value. Must be between 0 and 64 after
+  /// the decoder is initialized with a buffer. -1 indicates the decoder was not
+  /// initialized.
+  int bit_width_ = -1;
 
   /// If a repeated run, the number of repeats remaining in the current run to be read.
   /// If the current run is a literal run, this is 0.
-  int32_t repeat_count_;
+  int32_t repeat_count_ = 0;
 
   /// If a literal run, the number of literals remaining in the current run to be read.
   /// If the current run is a repeated run, this is 0.
-  int32_t literal_count_;
+  int32_t literal_count_ = 0;
 
   /// If a repeated run, the current repeated value.
   T repeated_value_;
@@ -166,8 +170,8 @@ class RleBatchDecoder {
   /// Buffer containing 'num_buffered_literals_' values. 'literal_buffer_pos_' is the
   /// position of the next literal to be read from the buffer.
   T literal_buffer_[LITERAL_BUFFER_LEN];
-  int num_buffered_literals_;
-  int literal_buffer_pos_;
+  int num_buffered_literals_ = 0;
+  int literal_buffer_pos_ = 0;
 
   /// Called when both 'literal_count_' and 'repeat_count_' have been exhausted.
   /// Sets either 'literal_count_' or 'repeat_count_' to the size of the next literal
@@ -191,8 +195,8 @@ class RleBatchDecoder {
   /// 'literal_count_'. Returns the number of literals outputted or 0 if a
   /// decoding error is encountered.
   template <typename OutType>
-  int32_t DecodeBufferedLiterals(
-      int32_t max_to_output, OutType* dict, int64_t dict_len, OutType* values);
+  int32_t DecodeBufferedLiterals(int32_t max_to_output, OutType* dict, int64_t dict_len,
+      StrideWriter<OutType>* RESTRICT out);
 };
 
 /// Class to incrementally build the rle data. This class does not allocate any memory.
@@ -608,7 +612,7 @@ inline int32_t RleBatchDecoder<T>::NextNumRepeats() {
 
 template <typename T>
 inline T RleBatchDecoder<T>::GetRepeatedValue(int32_t num_repeats_to_consume) {
-  DCHECK_GT(num_repeats_to_consume, 0);
+  DCHECK_GE(num_repeats_to_consume, 0);
   DCHECK_GE(repeat_count_, num_repeats_to_consume);
   repeat_count_ -= num_repeats_to_consume;
   return repeated_value_;
@@ -660,19 +664,20 @@ inline bool RleBatchDecoder<T>::GetLiteralValues(
 
 template <typename T>
 template <typename OutType>
-inline bool RleBatchDecoder<T>::DecodeLiteralValues(
-    int32_t num_literals_to_consume, OutType* dict, int64_t dict_len, OutType* values) {
+inline bool RleBatchDecoder<T>::DecodeLiteralValues(int32_t num_literals_to_consume,
+    OutType* dict, int64_t dict_len, StrideWriter<OutType>* RESTRICT out) {
   DCHECK_GE(num_literals_to_consume, 0);
   DCHECK_GE(literal_count_, num_literals_to_consume);
-  int32_t num_consumed = 0;
+
+  int32_t num_remaining = num_literals_to_consume;
   // Decode any buffered literals left over from previous calls.
   if (HaveBufferedLiterals()) {
-    num_consumed =
-        DecodeBufferedLiterals(num_literals_to_consume, dict, dict_len, values);
+    int32_t num_consumed = DecodeBufferedLiterals(num_remaining, dict, dict_len, out);
     if (UNLIKELY(num_consumed == 0)) return false;
+    DCHECK_LE(num_consumed, num_remaining);
+    num_remaining -= num_consumed;
   }
 
-  int32_t num_remaining = num_literals_to_consume - num_consumed;
   // Copy literals directly to the output, bypassing 'literal_buffer_' when possible.
   // Need to round to a batch of 32 if the caller is consuming only part of the current
   // run avoid ending on a non-byte boundery.
@@ -680,20 +685,20 @@ inline bool RleBatchDecoder<T>::DecodeLiteralValues(
       std::min<int32_t>(literal_count_, BitUtil::RoundDownToPowerOf2(num_remaining, 32));
   if (num_to_bypass > 0) {
     int num_read = bit_reader_.UnpackAndDecodeBatch(
-        bit_width_, dict, dict_len, num_to_bypass, values + num_consumed);
+        bit_width_, dict, dict_len, num_to_bypass, out->current, out->stride);
     // If we couldn't read the expected number, that means the input was truncated.
     if (num_read < num_to_bypass) return false;
+    DCHECK_EQ(num_read, num_to_bypass);
     literal_count_ -= num_to_bypass;
-    num_consumed += num_to_bypass;
-    num_remaining = num_literals_to_consume - num_consumed;
+    out->SkipNext(num_to_bypass);
+    num_remaining -= num_to_bypass;
   }
 
   if (num_remaining > 0) {
     // We weren't able to copy all the literals requested directly from the input.
     // Buffer literals and copy over the requested number.
     if (UNLIKELY(!FillLiteralBuffer())) return false;
-    int32_t num_copied =
-        DecodeBufferedLiterals(num_remaining, dict, dict_len, values + num_consumed);
+    int32_t num_copied = DecodeBufferedLiterals(num_remaining, dict, dict_len, out);
     if (UNLIKELY(num_copied == 0)) return false;
     DCHECK_EQ(num_copied, num_remaining) << "Should have buffered enough literals";
   }
@@ -716,6 +721,7 @@ inline bool RleBatchDecoder<T>::GetSingleValue(T* val) {
 
 template <typename T>
 inline void RleBatchDecoder<T>::NextCounts() {
+  DCHECK_GE(bit_width_, 0) << "RleBatchDecoder must be initialised";
   DCHECK_EQ(0, literal_count_);
   DCHECK_EQ(0, repeat_count_);
   // Read the next run's indicator int, it could be a literal or repeated run.
@@ -737,10 +743,10 @@ inline void RleBatchDecoder<T>::NextCounts() {
     literal_count_ = literal_count;
   } else {
     if (UNLIKELY(run_len == 0)) return;
-    bool result =
-        bit_reader_.GetBytes<T>(BitUtil::Ceil(bit_width_, 8), &repeated_value_);
+    bool result = bit_reader_.GetBytes<T>(BitUtil::Ceil(bit_width_, 8), &repeated_value_);
     if (UNLIKELY(!result)) return;
     repeat_count_ = run_len;
+    DCHECK_GE(repeat_count_, 0);
   }
 }
 
@@ -769,14 +775,14 @@ inline int32_t RleBatchDecoder<T>::OutputBufferedLiterals(
 
 template <typename T>
 template <typename OutType>
-inline int32_t RleBatchDecoder<T>::DecodeBufferedLiterals(
-    int32_t max_to_output, OutType* dict, int64_t dict_len, OutType* values) {
+inline int32_t RleBatchDecoder<T>::DecodeBufferedLiterals(int32_t max_to_output,
+    OutType* dict, int64_t dict_len, StrideWriter<OutType>* RESTRICT out) {
   int32_t num_to_output =
       std::min<int32_t>(max_to_output, num_buffered_literals_ - literal_buffer_pos_);
   for (int32_t i = 0; i < num_to_output; ++i) {
     T idx = literal_buffer_[literal_buffer_pos_ + i];
     if (UNLIKELY(idx < 0 || idx >= dict_len)) return 0;
-    memcpy(&values[i], &dict[idx], sizeof(OutType));
+    out->SetNext(dict[idx]);
   }
   literal_buffer_pos_ += num_to_output;
   literal_count_ -= num_to_output;

http://git-wip-us.apache.org/repos/asf/impala/blob/153663c2/testdata/bin/generate-schema-statements.py
----------------------------------------------------------------------
diff --git a/testdata/bin/generate-schema-statements.py b/testdata/bin/generate-schema-statements.py
index 2189eff..4d122a6 100755
--- a/testdata/bin/generate-schema-statements.py
+++ b/testdata/bin/generate-schema-statements.py
@@ -390,8 +390,11 @@ def avro_schema(columns):
       type = {"type": "bytes", "logicalType": "decimal", "precision": precision,
               "scale": scale}
     else:
-      hive_type = column_spec.split()[1]
-      type = HIVE_TO_AVRO_TYPE_MAP[hive_type.upper()]
+      hive_type = column_spec.split()[1].upper()
+      if hive_type.startswith('CHAR(') or hive_type.startswith('VARCHAR('):
+        type = 'string'
+      else:
+        type = HIVE_TO_AVRO_TYPE_MAP[hive_type]
 
     record["fields"].append(
       {'name': name,

http://git-wip-us.apache.org/repos/asf/impala/blob/153663c2/testdata/data/README
----------------------------------------------------------------------
diff --git a/testdata/data/README b/testdata/data/README
index 14b37ba..425ee87 100644
--- a/testdata/data/README
+++ b/testdata/data/README
@@ -259,3 +259,25 @@ Example (all dates are 2017-10-29):
 UTC: 00:45:00, 01:00:00, 01:10:00 =>
 CET: 02:45:00, 02:00:00, 02:10:00
 Columns: rawvalue bigint, rowgroup int, millisutc timsestamp, microsutc timestamp
+
+out_of_range_timestamp_hive_211.parquet
+Hive-generated file with an out-of-range timestamp. Generated with Hive 2.1.1 using
+the following query:
+create table alltypes_hive stored as parquet as
+select * from functional.alltypes
+union all
+select -1, false, 0, 0, 0, 0, 0, 0, '', '', cast('1399-01-01 00:00:00' as timestamp), 0, 0
+
+out_of_range_timestamp2_hive_211.parquet
+Hive-generated file with an out-of-range timestamp in every third row, to exercise code
+paths in Parquet scanner for non-repeated runs. Generated with Hive 2.1.1 using
+the following query:
+create table hive_invalid_timestamps stored as parquet as
+select id,
+  case id % 3
+    when 0 then timestamp_col
+    when 1 then NULL
+    when 2 then cast('1300-01-01 9:9:9' as timestamp)
+  end timestamp_col
+from functional.alltypes
+sort by id

http://git-wip-us.apache.org/repos/asf/impala/blob/153663c2/testdata/data/out_of_range_timestamp2_hive_211.parquet
----------------------------------------------------------------------
diff --git a/testdata/data/out_of_range_timestamp2_hive_211.parquet b/testdata/data/out_of_range_timestamp2_hive_211.parquet
new file mode 100644
index 0000000..9255420
Binary files /dev/null and b/testdata/data/out_of_range_timestamp2_hive_211.parquet differ

http://git-wip-us.apache.org/repos/asf/impala/blob/153663c2/testdata/data/out_of_range_timestamp_hive_211.parquet
----------------------------------------------------------------------
diff --git a/testdata/data/out_of_range_timestamp_hive_211.parquet b/testdata/data/out_of_range_timestamp_hive_211.parquet
new file mode 100644
index 0000000..d084eab
Binary files /dev/null and b/testdata/data/out_of_range_timestamp_hive_211.parquet differ

http://git-wip-us.apache.org/repos/asf/impala/blob/153663c2/testdata/datasets/functional/functional_schema_template.sql
----------------------------------------------------------------------
diff --git a/testdata/datasets/functional/functional_schema_template.sql b/testdata/datasets/functional/functional_schema_template.sql
index 0712b7c..117dc33 100644
--- a/testdata/datasets/functional/functional_schema_template.sql
+++ b/testdata/datasets/functional/functional_schema_template.sql
@@ -705,6 +705,22 @@ hadoop fs -put -f ${IMPALA_HOME}/testdata/ComplexTypesTbl/nonnullable.parq \
 ---- DATASET
 functional
 ---- BASE_TABLE_NAME
+complextypestbl_medium
+---- COLUMNS
+id bigint
+int_array array<int>
+int_array_array array<array<int>>
+int_map map<string, int>
+int_map_array array<map<string, int>>
+nested_struct struct<a: int, b: array<int>, c: struct<d: array<array<struct<e: int, f: string>>>>, g: map<string, struct<h: struct<i: array<double>>>>>
+---- DEPENDENT_LOAD_HIVE
+-- This INSERT must run in Hive, because Impala doesn't support inserting into tables
+-- with complex types.
+INSERT OVERWRITE TABLE {db_name}{db_suffix}.{table_name} SELECT c.* FROM functional_parquet.complextypestbl c join functional.alltypes sort by id;
+====
+---- DATASET
+functional
+---- BASE_TABLE_NAME
 complextypes_fileformat
 ---- CREATE_HIVE
 -- Used for positive/negative testing of complex types on various file formats.
@@ -2222,3 +2238,59 @@ INSERT into TABLE {db_name}{db_suffix}.{table_name}
 SELECT s, i
 FROM {db_name}.{table_name};
 ====
+---- DATASET
+functional
+---- BASE_TABLE_NAME
+manynulls
+---- COLUMNS
+id int
+nullcol int
+---- ALTER
+-- Ensure the nulls are clustered together.
+ALTER TABLE {table_name} SORT BY (id);
+---- CREATE_KUDU
+DROP VIEW IF EXISTS {db_name}{db_suffix}.{table_name};
+DROP TABLE IF EXISTS {db_name}{db_suffix}.{table_name}_idx;
+
+CREATE TABLE {db_name}{db_suffix}.{table_name}_idx (
+  kudu_idx BIGINT PRIMARY KEY,
+  id INT,
+  nullcol INT NULL
+)
+PARTITION BY HASH (kudu_idx) PARTITIONS 3 STORED AS KUDU;
+CREATE VIEW {db_name}{db_suffix}.{table_name} AS
+SELECT id, nullcol
+FROM {db_name}{db_suffix}.{table_name}_idx;
+---- DEPENDENT_LOAD
+INSERT OVERWRITE TABLE {db_name}{db_suffix}.{table_name}
+SELECT id, nullcol
+FROM {db_name}.{table_name};
+---- DEPENDENT_LOAD_KUDU
+INSERT into TABLE {db_name}{db_suffix}.{table_name}_idx
+SELECT row_number() over (order by id),
+       id, nullcol
+FROM {db_name}.{table_name};
+---- LOAD
+INSERT OVERWRITE TABLE {db_name}{db_suffix}.{table_name}
+SELECT id, if((id div 500) % 2 = 0, NULL, id) as nullcol
+FROM functional.alltypesagg;
+====
+---- DATASET
+functional
+---- BASE_TABLE_NAME
+chars_medium
+---- COLUMNS
+id int
+date_char_col char(8)
+char_col char(3)
+date_varchar_col varchar(8)
+varchar_col varchar(3)
+---- DEPENDENT_LOAD
+insert overwrite table {db_name}{db_suffix}.{table_name}
+select id, date_char_col, char_col, date_varchar_col, varchar_col
+from {db_name}.{table_name}
+---- LOAD
+insert overwrite table {db_name}{db_suffix}.{table_name}
+select id, date_string_col, case when id % 3 in (0, 1) then string_col end, date_string_col, case when id % 3 = 0 then string_col end
+from functional.alltypesagg
+====

http://git-wip-us.apache.org/repos/asf/impala/blob/153663c2/testdata/datasets/functional/schema_constraints.csv
----------------------------------------------------------------------
diff --git a/testdata/datasets/functional/schema_constraints.csv b/testdata/datasets/functional/schema_constraints.csv
index e09ca36..3b356b2 100644
--- a/testdata/datasets/functional/schema_constraints.csv
+++ b/testdata/datasets/functional/schema_constraints.csv
@@ -71,6 +71,7 @@ table_name:complextypes_multifileformat, constraint:restrict_to, table_format:te
 
 # TODO: Avro
 table_name:complextypestbl, constraint:restrict_to, table_format:parquet/none/none
+table_name:complextypestbl_medium, constraint:restrict_to, table_format:parquet/none/none
 
 table_name:alltypeserror, constraint:exclude, table_format:parquet/none/none
 table_name:alltypeserrornonulls, constraint:exclude, table_format:parquet/none/none
@@ -102,10 +103,11 @@ table_name:view_view, constraint:restrict_to, table_format:seq/snap/block
 table_name:subquery_view, constraint:restrict_to, table_format:seq/snap/block
 table_name:subquery_view, constraint:restrict_to, table_format:rc/none/none
 
-# liketbl and tblwithraggedcolumns all have
+# liketbl, tblwithraggedcolumns and manynulls all have
 # NULLs in primary key columns. hbase does not support
 # writing NULLs to primary key columns.
 table_name:liketbl, constraint:exclude, table_format:hbase/none/none
+table_name:manynulls, constraint:exclude, table_format:hbase/none/none
 table_name:tblwithraggedcolumns, constraint:exclude, table_format:hbase/none/none
 
 # Tables with only one column are not supported in hbase.
@@ -140,8 +142,9 @@ table_name:decimal_tiny, constraint:restrict_to, table_format:orc/def/block
 
 table_name:avro_decimal_tbl, constraint:restrict_to, table_format:avro/snap/block
 
-# TODO first set of tests are for text/none/none
-table_name:chars_tiny, constraint:restrict_to, table_format:text/none/none
+# CHAR is not supported by HBase.
+table_name:chars_tiny, constraint:exclude, table_format:hbase/none/none
+table_name:chars_medium, constraint:exclude, table_format:hbase/none/none
 
 # invalid_decimal_part_tbl[1,2,3] tables are used for testing invalid decimal
 # partition key values (see IMPALA-1040)
@@ -194,6 +197,7 @@ table_name:nullescapedtable, constraint:only, table_format:kudu/none/none
 table_name:decimal_tbl, constraint:only, table_format:kudu/none/none
 table_name:decimal_tiny, constraint:only, table_format:kudu/none/none
 table_name:strings_with_quotes, constraint:only, table_format:kudu/none/none
+table_name:manynulls, constraint:only, table_format:kudu/none/none
 
 # Skipping header lines is only effective with text tables
 table_name:table_with_header, constraint:restrict_to, table_format:text/none/none

http://git-wip-us.apache.org/repos/asf/impala/blob/153663c2/testdata/workloads/functional-query/queries/QueryTest/chars.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-query/queries/QueryTest/chars.test b/testdata/workloads/functional-query/queries/QueryTest/chars.test
index 6802647..e029b7d 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/chars.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/chars.test
@@ -197,3 +197,43 @@ char, char, varchar
 'NULL','b                                                                                                                                           ','NULL'
 'NULL','NULL','NULL'
 ====
+---- QUERY
+# Scan non-NULL CHAR and VARCHAR from larger table.
+select date_char_col, date_varchar_col, count(*)
+from chars_medium
+group by 1, 2
+---- TYPES
+char, string, bigint
+---- HS2_TYPES
+char, varchar, bigint
+---- RESULTS
+'01/01/10','01/01/10',1100
+'01/02/10','01/02/10',1100
+'01/03/10','01/03/10',1100
+'01/10/10','01/10/10',1100
+'01/08/10','01/08/10',1100
+'01/07/10','01/07/10',1100
+'01/04/10','01/04/10',1100
+'01/09/10','01/09/10',1100
+'01/06/10','01/06/10',1100
+'01/05/10','01/05/10',1100
+====
+---- QUERY
+# Scan CHAR and VARCHAR with NULLs from larger table.
+select id, char_col, varchar_col
+from chars_medium
+where id in (3, 1000, 1234, 5678, 1111, 2222, 3333)
+---- TYPES
+int, char, string
+---- HS2_TYPES
+int, char, varchar
+---- RESULTS
+3,'3  ','3'
+1000,'0  ','NULL'
+1111,'111','NULL'
+1234,'234','NULL'
+2222,'NULL','NULL'
+3333,'333','333'
+5678,'NULL','NULL'
+1000,'0  ','NULL'
+====

http://git-wip-us.apache.org/repos/asf/impala/blob/153663c2/testdata/workloads/functional-query/queries/QueryTest/nested-types-scanner-position.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-query/queries/QueryTest/nested-types-scanner-position.test b/testdata/workloads/functional-query/queries/QueryTest/nested-types-scanner-position.test
index 8529b4e..76a6e51 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/nested-types-scanner-position.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/nested-types-scanner-position.test
@@ -644,3 +644,23 @@ where i1.e = 10 or i2.e = 10
 ---- TYPES
 bigint,bigint,bigint,bigint
 ====
+---- QUERY
+# Test materialising position slot in repeated nested collection to exercise
+# code path with repeated definition levels.
+select pos, item, count(*)
+from complextypestbl_medium.int_array
+group by 1, 2
+---- RESULTS
+0,-1,7300
+0,1,7300
+0,NULL,7300
+1,1,7300
+1,2,7300
+2,2,7300
+2,3,7300
+3,NULL,7300
+4,3,7300
+5,NULL,7300
+---- TYPES
+BIGINT, INT, BIGINT
+====

http://git-wip-us.apache.org/repos/asf/impala/blob/153663c2/testdata/workloads/functional-query/queries/QueryTest/nested-types-tpch.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-query/queries/QueryTest/nested-types-tpch.test b/testdata/workloads/functional-query/queries/QueryTest/nested-types-tpch.test
index b4c53f7..41d415a 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/nested-types-tpch.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/nested-types-tpch.test
@@ -189,3 +189,21 @@ order by c_custkey
 ---- TYPES
 bigint, bigint
 ====
+---- QUERY
+# Materialise runs of positions from flattened nested collections. The query is
+# constructed to pick out all of the nations from two of the regions. The position
+# of the nations in the collection may vary, but we expect all positions to be present.
+select pos, count(*)
+from region.r_nations
+where n_name in ('UNITED KINGDOM', 'RUSSIA', 'ROMANIA', 'GERMANY', 'FRANCE',
+                 'SAUDI ARABIA', 'JORDAN', 'IRAQ', 'IRAN', 'EGYPT')
+group by pos
+---- RESULTS
+0,2
+1,2
+2,2
+3,2
+4,2
+---- TYPES
+bigint, bigint
+====

http://git-wip-us.apache.org/repos/asf/impala/blob/153663c2/testdata/workloads/functional-query/queries/QueryTest/out-of-range-timestamp-local-tz-conversion.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-query/queries/QueryTest/out-of-range-timestamp-local-tz-conversion.test b/testdata/workloads/functional-query/queries/QueryTest/out-of-range-timestamp-local-tz-conversion.test
new file mode 100644
index 0000000..0201cea
--- /dev/null
+++ b/testdata/workloads/functional-query/queries/QueryTest/out-of-range-timestamp-local-tz-conversion.test
@@ -0,0 +1,47 @@
+====
+---- QUERY
+set abort_on_error=0;
+# Spot check some values to make sure the correct results are returned.
+SELECT id, timestamp_col FROM out_of_range_timestamp_hive_211 where id < 5;
+---- TYPES
+INT, TIMESTAMP
+---- RESULTS
+0,2009-01-01 00:00:00
+1,2009-01-01 00:01:00
+2,2009-01-01 00:02:00.100000000
+3,2009-01-01 00:03:00.300000000
+4,2009-01-01 00:04:00.600000000
+-1,NULL
+---- ERRORS
+Parquet file '$NAMENODE/test-warehouse/$DATABASE.db/out_of_range_timestamp_hive_211/out_of_range_timestamp_hive_211.parquet' column 'timestamp_col' contains an out of range timestamp. The valid date range is 1400-01-01..9999-12-31.
+====
+---- QUERY
+set abort_on_error=1;
+SELECT id, timestamp_col FROM out_of_range_timestamp_hive_211 where id < 5;
+---- CATCH
+Parquet file '$NAMENODE/test-warehouse/$DATABASE.db/out_of_range_timestamp_hive_211/out_of_range_timestamp_hive_211.parquet' column 'timestamp_col' contains an out of range timestamp. The valid date range is 1400-01-01..9999-12-31.
+====
+---- QUERY
+set abort_on_error=0;
+# Spot check some values to make sure the correct results are returned.
+SELECT id, timestamp_col FROM out_of_range_timestamp2_hive_211 where id in (-1, 5, 2000, 2345, 2346, 2347, 4100, 4101, 4102)
+---- TYPES
+INT, TIMESTAMP
+---- RESULTS
+5,NULL
+2000,NULL
+2345,NULL
+2346,2009-08-23 03:46:10.500000000
+2347,NULL
+4100,NULL
+4101,2010-02-15 02:21:06.300000000
+4102,NULL
+---- ERRORS
+Parquet file '$NAMENODE/test-warehouse/$DATABASE.db/out_of_range_timestamp2_hive_211/out_of_range_timestamp2_hive_211.parquet' column 'timestamp_col' contains an out of range timestamp. The valid date range is 1400-01-01..9999-12-31. (1 of 2433 similar)
+====
+---- QUERY
+set abort_on_error=1;
+SELECT id, timestamp_col FROM out_of_range_timestamp2_hive_211 where id in (-1, 5, 2000, 2345, 2346, 2347, 4100, 4101, 4102)
+---- CATCH
+Parquet file '$NAMENODE/test-warehouse/$DATABASE.db/out_of_range_timestamp2_hive_211/out_of_range_timestamp2_hive_211.parquet' column 'timestamp_col' contains an out of range timestamp. The valid date range is 1400-01-01..9999-12-31.
+====

http://git-wip-us.apache.org/repos/asf/impala/blob/153663c2/testdata/workloads/functional-query/queries/QueryTest/scanners-many-nulls.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-query/queries/QueryTest/scanners-many-nulls.test b/testdata/workloads/functional-query/queries/QueryTest/scanners-many-nulls.test
new file mode 100644
index 0000000..a82af23
--- /dev/null
+++ b/testdata/workloads/functional-query/queries/QueryTest/scanners-many-nulls.test
@@ -0,0 +1,47 @@
+====
+---- QUERY
+# Test that we materialize the right number of nulls.
+select count(*),
+  count(id),
+  count(nullcol),
+  sum(nullcol)
+from manynulls
+---- RESULTS
+11000,11000,5500,28870000
+---- TYPES
+BIGINT,BIGINT,BIGINT,BIGINT
+====
+---- QUERY
+# Spot check some values.
+select id, nullcol
+from manynulls
+where id >= 4490 and id <= 4510
+order by id
+---- RESULTS
+4490,NULL
+4490,NULL
+4491,NULL
+4492,NULL
+4493,NULL
+4494,NULL
+4495,NULL
+4496,NULL
+4497,NULL
+4498,NULL
+4499,NULL
+4500,4500
+4500,4500
+4501,4501
+4502,4502
+4503,4503
+4504,4504
+4505,4505
+4506,4506
+4507,4507
+4508,4508
+4509,4509
+4510,4510
+4510,4510
+---- TYPES
+INT,INT
+====

http://git-wip-us.apache.org/repos/asf/impala/blob/153663c2/tests/custom_cluster/test_hive_parquet_timestamp_conversion.py
----------------------------------------------------------------------
diff --git a/tests/custom_cluster/test_hive_parquet_timestamp_conversion.py b/tests/custom_cluster/test_hive_parquet_timestamp_conversion.py
index 62f4cd4..dab3918 100644
--- a/tests/custom_cluster/test_hive_parquet_timestamp_conversion.py
+++ b/tests/custom_cluster/test_hive_parquet_timestamp_conversion.py
@@ -22,6 +22,7 @@ import pytest
 from subprocess import check_call
 
 from tests.common.custom_cluster_test_suite import CustomClusterTestSuite
+from tests.common.file_utils import create_table_from_parquet
 from tests.util.filesystem_utils import get_fs_path
 
 class TestHiveParquetTimestampConversion(CustomClusterTestSuite):
@@ -42,6 +43,10 @@ class TestHiveParquetTimestampConversion(CustomClusterTestSuite):
         v.get_value('table_format').compression_codec == 'none')
 
   @classmethod
+  def get_workload(self):
+    return 'functional-query'
+
+  @classmethod
   def setup_class(cls):
     super(TestHiveParquetTimestampConversion, cls).setup_class()
     cls._orig_tz_name = os.getenv('TZ')
@@ -79,7 +84,7 @@ class TestHiveParquetTimestampConversion(CustomClusterTestSuite):
   @pytest.mark.execute_serially
   @CustomClusterTestSuite.with_args("-convert_legacy_hive_parquet_utc_timestamps=true "
       "-hdfs_zone_info_zip=%s" % get_fs_path("/test-warehouse/tzdb/2017c.zip"))
-  def test_conversion(self, vector):
+  def test_conversion(self, vector, unique_database):
     tz_name = TestHiveParquetTimestampConversion._test_tz_name
     self.check_sanity(tz_name not in ("UTC", "GMT"))
     # The value read from the Hive table should be the same as reading a UTC converted
@@ -97,6 +102,20 @@ class TestHiveParquetTimestampConversion(CustomClusterTestSuite):
         .get_data()
     assert len(data) == 0
 
+    self._test_conversion_with_validation(vector, unique_database)
+
+  def _test_conversion_with_validation(self, vector, unique_database):
+    """Test that timestamp validation also works as expected when converting timestamps.
+    Runs as part of test_conversion() to avoid restarting the cluster."""
+    create_table_from_parquet(self.client, unique_database,
+                              "out_of_range_timestamp_hive_211")
+    create_table_from_parquet(self.client, unique_database,
+                              "out_of_range_timestamp2_hive_211")
+    # Allow tests to override abort_on_error
+    del vector.get_value('exec_option')['abort_on_error']
+    self.run_test_case('QueryTest/out-of-range-timestamp-local-tz-conversion',
+         vector, unique_database)
+
   @pytest.mark.execute_serially
   @CustomClusterTestSuite.with_args("-convert_legacy_hive_parquet_utc_timestamps=false "
       "-hdfs_zone_info_zip=%s" % get_fs_path("/test-warehouse/tzdb/2017c.zip"))

http://git-wip-us.apache.org/repos/asf/impala/blob/153663c2/tests/query_test/test_scanners.py
----------------------------------------------------------------------
diff --git a/tests/query_test/test_scanners.py b/tests/query_test/test_scanners.py
index bcb200b..dcbead4 100644
--- a/tests/query_test/test_scanners.py
+++ b/tests/query_test/test_scanners.py
@@ -87,10 +87,21 @@ class TestScannersAllTableFormats(ImpalaTestSuite):
 
   def test_scanners(self, vector):
     new_vector = deepcopy(vector)
+    # Copy over test dimensions to the matching query options.
     new_vector.get_value('exec_option')['batch_size'] = vector.get_value('batch_size')
     new_vector.get_value('exec_option')['debug_action'] = vector.get_value('debug_action')
     self.run_test_case('QueryTest/scanners', new_vector)
 
+  def test_many_nulls(self, vector):
+    if vector.get_value('table_format').file_format == 'hbase':
+      # manynulls table not loaded for HBase
+      pytest.skip()
+    # Copy over test dimensions to the matching query options.
+    new_vector = deepcopy(vector)
+    new_vector.get_value('exec_option')['batch_size'] = vector.get_value('batch_size')
+    new_vector.get_value('exec_option')['debug_action'] = vector.get_value('debug_action')
+    self.run_test_case('QueryTest/scanners-many-nulls', new_vector)
+
   def test_hdfs_scanner_profile(self, vector):
     if vector.get_value('table_format').file_format in ('kudu', 'hbase') or \
        vector.get_value('exec_option')['num_nodes'] != 0: