You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by ta...@apache.org on 2018/05/04 15:16:58 UTC

[1/8] impala git commit: IMPALA-6959: [DOCS] Update to HAProxy configuration sample code

Repository: impala
Updated Branches:
  refs/heads/2.x ae9c47bb5 -> dd6f28747


IMPALA-6959: [DOCS] Update to HAProxy configuration sample code

- Replaced the deprecated timeout settings (contimeout, clitimeout, srvtimeout)
  with their "timeout connect", "timeout client" and "timeout server" equivalents
- Changed the sample timeout values to more realistic values
- Added a note that actual timeout values should depend on
  the user cluster

Change-Id: Idff3aa9bbb58c1953cb7e9394ade01c7833c3a34
Reviewed-on: http://gerrit.cloudera.org:8080/10284
Reviewed-by: Alan Choi <al...@cloudera.com>
Reviewed-by: Alex Rodoni <ar...@cloudera.com>
Tested-by: Impala Public Jenkins <im...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/9d36d38a
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/9d36d38a
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/9d36d38a

Branch: refs/heads/2.x
Commit: 9d36d38acc0d03ad48aeacdf2a4e05655c625b56
Parents: d5123b7
Author: Alex Rodoni <ar...@cloudera.com>
Authored: Wed May 2 11:20:08 2018 -0700
Committer: Impala Public Jenkins <im...@gerrit.cloudera.org>
Committed: Thu May 3 19:59:25 2018 +0000

----------------------------------------------------------------------
 docs/topics/impala_proxy.xml | 9 ++++++---
 1 file changed, 6 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/9d36d38a/docs/topics/impala_proxy.xml
----------------------------------------------------------------------
diff --git a/docs/topics/impala_proxy.xml b/docs/topics/impala_proxy.xml
index 588fada..ddf63c3 100644
--- a/docs/topics/impala_proxy.xml
+++ b/docs/topics/impala_proxy.xml
@@ -469,6 +469,9 @@ The setup of the -principal and -be_principal has to be set through safety valve
 # use if not designated in their block
 #
 # You might need to adjust timing values to prevent timeouts.
+#
+# The timeout values should be dependant on how you use the cluster
+# and how long your queries run.
 #---------------------------------------------------------------------
 defaults
     mode                    http
@@ -480,9 +483,9 @@ defaults
     option                  redispatch
     retries                 3
     maxconn                 3000
-    contimeout 5000
-    clitimeout 50000
-    srvtimeout 50000
+    timeout connect 5000
+    timeout client 3600s
+    timeout server 3600s
 
 #
 # This sets up the admin page for HA Proxy at port 25002.


[7/8] impala git commit: IMPALA-4123 (prep): Parquet column reader cleanup

Posted by ta...@apache.org.
IMPALA-4123 (prep): Parquet column reader cleanup

Miscellaneous cleanup to make the Parquet scanner easier to understand
and easier to change in the future.

A lot of the refactoring is about more cleanly separating functions
so that they have clearer purpose, e.g.:
* Functions that strictly do decoding, i.e. materialize values, convert
  and validate them. These are changed to operate on single values, not tuples.
* Functions that are used for the non-batched decoding path (i.e. driven
  by CollectionColumnReader or BoolColumnReader).
* Functions that dispatch to a templated implementation based on one or
  more runtime values.

Other misc changes:
* Move large functions out of class bodies.
* Use parquet::Encoding instead of bool to indicate encoding.
* Add some additional DCHECKs.

Testing:
* Ran exhaustive tests
* Ran fuzz test in a loop

Change-Id: Ibc00352df3a0b2d605f872ae7e43db2dc90faab1
Reviewed-on: http://gerrit.cloudera.org:8080/9799
Reviewed-by: Tim Armstrong <ta...@cloudera.com>
Tested-by: Impala Public Jenkins <im...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/13604bd4
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/13604bd4
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/13604bd4

Branch: refs/heads/2.x
Commit: 13604bd4e1103559f6644828329f4bd3b6a55d54
Parents: 9d36d38
Author: Tim Armstrong <ta...@cloudera.com>
Authored: Wed Oct 11 23:58:19 2017 -0700
Committer: Impala Public Jenkins <im...@gerrit.cloudera.org>
Committed: Thu May 3 19:59:26 2018 +0000

----------------------------------------------------------------------
 be/src/exec/hdfs-parquet-scanner.cc   |  10 +-
 be/src/exec/parquet-column-readers.cc | 833 ++++++++++++++++-------------
 be/src/exec/parquet-column-readers.h  |  25 +-
 be/src/runtime/tuple.h                |   8 +
 be/src/util/bit-stream-utils.h        |   2 +
 be/src/util/rle-encoding.h            |   2 +
 6 files changed, 482 insertions(+), 398 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/13604bd4/be/src/exec/hdfs-parquet-scanner.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-parquet-scanner.cc b/be/src/exec/hdfs-parquet-scanner.cc
index b40816d..8cec3d2 100644
--- a/be/src/exec/hdfs-parquet-scanner.cc
+++ b/be/src/exec/hdfs-parquet-scanner.cc
@@ -362,8 +362,8 @@ Status HdfsParquetScanner::GetNextInternal(RowBatch* row_batch) {
       Tuple* dst_tuple = reinterpret_cast<Tuple*>(tuple_buf);
       TupleRow* dst_row = row_batch->GetRow(row_batch->AddRow());
       InitTuple(template_tuple_, dst_tuple);
-      int64_t* dst_slot = reinterpret_cast<int64_t*>(dst_tuple->GetSlot(
-          scan_node_->parquet_count_star_slot_offset()));
+      int64_t* dst_slot =
+          dst_tuple->GetBigIntSlot(scan_node_->parquet_count_star_slot_offset());
       *dst_slot = file_metadata_.row_groups[row_group_idx_].num_rows;
       row_group_rows_read_ += *dst_slot;
       dst_row->SetTuple(0, dst_tuple);
@@ -1142,7 +1142,11 @@ inline bool HdfsParquetScanner::ReadCollectionItem(
       FILE_CHECK_GE(col_reader->def_level(),
                     col_reader->def_level_of_immediate_repeated_ancestor());
       // Fill in position slot if applicable
-      if (col_reader->pos_slot_desc() != nullptr) col_reader->ReadPosition(tuple);
+      const SlotDescriptor* pos_slot_desc = col_reader->pos_slot_desc();
+      if (pos_slot_desc != nullptr) {
+        col_reader->ReadPositionNonBatched(
+            tuple->GetBigIntSlot(pos_slot_desc->tuple_offset()));
+      }
       continue_execution = col_reader->ReadValue(pool, tuple);
     } else {
       // A containing repeated field is empty or NULL

http://git-wip-us.apache.org/repos/asf/impala/blob/13604bd4/be/src/exec/parquet-column-readers.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/parquet-column-readers.cc b/be/src/exec/parquet-column-readers.cc
index 101f690..f403ce6 100644
--- a/be/src/exec/parquet-column-readers.cc
+++ b/be/src/exec/parquet-column-readers.cc
@@ -75,14 +75,18 @@ static int debug_count = 0;
 
 using namespace impala::io;
 
+using parquet::Encoding;
+
 namespace impala {
 
 const string PARQUET_COL_MEM_LIMIT_EXCEEDED =
     "ParquetColumnReader::$0() failed to allocate $1 bytes for $2.";
 
 Status ParquetLevelDecoder::Init(const string& filename,
-    parquet::Encoding::type encoding, MemPool* cache_pool, int cache_size,
+    Encoding::type encoding, MemPool* cache_pool, int cache_size,
     int max_level, int num_buffered_values, uint8_t** data, int* data_size) {
+  DCHECK(*data != nullptr);
+  DCHECK_GE(*data_size, 0);
   DCHECK_GE(num_buffered_values, 0);
   DCHECK_GT(cache_size, 0);
   cache_size = BitUtil::RoundUpToPowerOf2(cache_size, 32);
@@ -97,7 +101,7 @@ Status ParquetLevelDecoder::Init(const string& filename,
 
   int32_t num_bytes = 0;
   switch (encoding) {
-    case parquet::Encoding::RLE: {
+    case Encoding::RLE: {
       Status status;
       if (!ReadWriteUtil::Read(data, data_size, &num_bytes, &status)) {
         return status;
@@ -136,15 +140,15 @@ Status ParquetLevelDecoder::InitCache(MemPool* pool, int cache_size) {
   num_cached_levels_ = 0;
   cached_level_idx_ = 0;
   // Memory has already been allocated.
-  if (cached_levels_ != NULL) {
+  if (cached_levels_ != nullptr) {
     DCHECK_EQ(cache_size_, cache_size);
     return Status::OK();
   }
 
   cached_levels_ = reinterpret_cast<uint8_t*>(pool->TryAllocate(cache_size));
-  if (cached_levels_ == NULL) {
+  if (cached_levels_ == nullptr) {
     return pool->mem_tracker()->MemLimitExceeded(
-        NULL, "Definition level cache", cache_size);
+        nullptr, "Definition level cache", cache_size);
   }
   memset(cached_levels_, 0, cache_size);
   cache_size_ = cache_size;
@@ -211,98 +215,41 @@ bool ParquetLevelDecoder::FillCache(int batch_size, int* num_cached_levels) {
 /// If MATERIALIZED is true, the column values are materialized into the slot described
 /// by slot_desc. If MATERIALIZED is false, the column values are not materialized, but
 /// the position can be accessed.
-template<typename InternalType, parquet::Type::type PARQUET_TYPE, bool MATERIALIZED>
+template <typename InternalType, parquet::Type::type PARQUET_TYPE, bool MATERIALIZED>
 class ScalarColumnReader : public BaseScalarColumnReader {
  public:
   ScalarColumnReader(HdfsParquetScanner* parent, const SchemaNode& node,
-      const SlotDescriptor* slot_desc)
-    : BaseScalarColumnReader(parent, node, slot_desc),
-      dict_decoder_(parent->scan_node_->mem_tracker()),
-      dict_decoder_init_(false),
-      needs_conversion_(false) {
-    if (!MATERIALIZED) {
-      // We're not materializing any values, just counting them. No need (or ability) to
-      // initialize state used to materialize values.
-      DCHECK(slot_desc_ == NULL);
-      return;
-    }
-
-    DCHECK(slot_desc_ != NULL);
-    DCHECK_NE(slot_desc_->type().type, TYPE_BOOLEAN);
-    if (slot_desc_->type().type == TYPE_DECIMAL
-        && PARQUET_TYPE == parquet::Type::FIXED_LEN_BYTE_ARRAY) {
-      fixed_len_size_ = node.element->type_length;
-    } else if (slot_desc_->type().type == TYPE_VARCHAR) {
-      fixed_len_size_ = slot_desc_->type().len;
-    } else {
-      fixed_len_size_ = -1;
-    }
-    needs_conversion_ = slot_desc_->type().type == TYPE_CHAR ||
-        // TODO: Add logic to detect file versions that have unconverted TIMESTAMP
-        // values. Currently all versions have converted values.
-        (FLAGS_convert_legacy_hive_parquet_utc_timestamps &&
-        slot_desc_->type().type == TYPE_TIMESTAMP &&
-        parent->file_version_.application == "parquet-mr");
-  }
-
+      const SlotDescriptor* slot_desc);
   virtual ~ScalarColumnReader() { }
 
-  virtual bool ReadValue(MemPool* pool, Tuple* tuple) {
-    return ReadValue<true>(pool, tuple);
+  virtual bool ReadValue(MemPool* pool, Tuple* tuple) override {
+    return ReadValue<true>(tuple);
   }
 
-  virtual bool ReadNonRepeatedValue(MemPool* pool, Tuple* tuple) {
-    return ReadValue<false>(pool, tuple);
+  virtual bool ReadNonRepeatedValue(MemPool* pool, Tuple* tuple) override {
+    return ReadValue<false>(tuple);
   }
 
   virtual bool ReadValueBatch(MemPool* pool, int max_values, int tuple_size,
-      uint8_t* tuple_mem, int* num_values) {
-    return ReadValueBatch<true>(pool, max_values, tuple_size, tuple_mem, num_values);
+      uint8_t* tuple_mem, int* num_values) override {
+    return ReadValueBatch<true>(max_values, tuple_size, tuple_mem, num_values);
   }
 
   virtual bool ReadNonRepeatedValueBatch(MemPool* pool, int max_values, int tuple_size,
-      uint8_t* tuple_mem, int* num_values) {
-    return ReadValueBatch<false>(pool, max_values, tuple_size, tuple_mem, num_values);
+      uint8_t* tuple_mem, int* num_values) override {
+    return ReadValueBatch<false>(max_values, tuple_size, tuple_mem, num_values);
   }
 
-  virtual DictDecoderBase* GetDictionaryDecoder() {
+  virtual DictDecoderBase* GetDictionaryDecoder() override {
     return HasDictionaryDecoder() ? &dict_decoder_ : nullptr;
   }
 
-  virtual bool NeedsConversion() { return NeedsConversionInline(); }
-  virtual bool NeedsValidation() { return NeedsValidationInline(); }
+  virtual bool NeedsConversion() override { return NeedsConversionInline(); }
+  virtual bool NeedsValidation() override { return NeedsValidationInline(); }
 
  protected:
-  template<bool IN_COLLECTION>
-  inline bool ReadValue(MemPool* pool, Tuple* tuple) {
-    // NextLevels() should have already been called and def and rep levels should be in
-    // valid range.
-    DCHECK_GE(rep_level_, 0);
-    DCHECK_GE(def_level_, 0);
-    DCHECK_GE(def_level_, def_level_of_immediate_repeated_ancestor()) <<
-        "Caller should have called NextLevels() until we are ready to read a value";
-
-    if (MATERIALIZED) {
-      if (def_level_ >= max_def_level()) {
-        if (page_encoding_ == parquet::Encoding::PLAIN_DICTIONARY) {
-          if (NeedsConversionInline()) {
-            if (!ReadSlot<true, true>(tuple, pool)) return false;
-          } else {
-            if (!ReadSlot<true, false>(tuple, pool)) return false;
-          }
-        } else {
-          if (NeedsConversionInline()) {
-            if (!ReadSlot<false, true>(tuple, pool)) return false;
-          } else {
-            if (!ReadSlot<false, false>(tuple, pool)) return false;
-          }
-        }
-      } else {
-        tuple->SetNull(null_indicator_offset_);
-      }
-    }
-    return NextLevels<IN_COLLECTION>();
-  }
+  template <bool IN_COLLECTION>
+  inline bool ReadValue(Tuple* tuple);
 
   /// Implementation of the ReadValueBatch() functions specialized for this
   /// column reader type. This function drives the reading of data pages and
@@ -315,74 +262,14 @@ class ScalarColumnReader : public BaseScalarColumnReader {
   /// 'def_levels_' 'rep_levels_' buffers may alias 'this', especially with
   /// -fno-strict-alias).
   template <bool IN_COLLECTION>
-  bool ReadValueBatch(MemPool* RESTRICT pool, int max_values, int tuple_size,
-      uint8_t* RESTRICT tuple_mem, int* RESTRICT num_values) RESTRICT {
-    // Repetition level is only present if this column is nested in a collection type.
-    if (!IN_COLLECTION) DCHECK_EQ(max_rep_level(), 0) << slot_desc()->DebugString();
-    if (IN_COLLECTION) DCHECK_GT(max_rep_level(), 0) << slot_desc()->DebugString();
-
-    int val_count = 0;
-    bool continue_execution = true;
-    while (val_count < max_values && !RowGroupAtEnd() && continue_execution) {
-      DCHECK_GE(num_buffered_values_, 0);
-      // Read next page if necessary.
-      if (num_buffered_values_ == 0) {
-        if (!NextPage()) {
-          continue_execution = parent_->parse_status_.ok();
-          continue;
-        }
-      }
-
-      // Not materializing anything - skip decoding any levels and rely on the value
-      // count from page metadata to return the correct number of rows.
-      if (!MATERIALIZED && !IN_COLLECTION) {
-        int vals_to_add = min(num_buffered_values_, max_values - val_count);
-        val_count += vals_to_add;
-        num_buffered_values_ -= vals_to_add;
-        continue;
-      }
-      // Fill the rep level cache if needed. We are flattening out the fields of the
-      // nested collection into the top-level tuple returned by the scan, so we don't
-      // care about the nesting structure unless the position slot is being populated.
-      if (IN_COLLECTION && pos_slot_desc_ != nullptr && !rep_levels_.CacheHasNext()) {
-        parent_->parse_status_.MergeStatus(
-            rep_levels_.CacheNextBatch(num_buffered_values_));
-        if (UNLIKELY(!parent_->parse_status_.ok())) return false;
-      }
-
-      // Fill def level cache if needed.
-      if (!def_levels_.CacheHasNext()) {
-        // TODO: add a fast path here if there's a run of repeated values.
-        parent_->parse_status_.MergeStatus(
-            def_levels_.CacheNextBatch(num_buffered_values_));
-        if (UNLIKELY(!parent_->parse_status_.ok())) return false;
-      }
-
-      // Read data page and cached levels to materialize values.
-      int cache_start_idx = def_levels_.CacheCurrIdx();
-      uint8_t* next_tuple = tuple_mem + val_count * tuple_size;
-      int remaining_val_capacity = max_values - val_count;
-      int ret_val_count = 0;
-      if (page_encoding_ == parquet::Encoding::PLAIN_DICTIONARY) {
-        continue_execution = MaterializeValueBatch<IN_COLLECTION, true>(
-            pool, remaining_val_capacity, tuple_size, next_tuple, &ret_val_count);
-      } else {
-        continue_execution = MaterializeValueBatch<IN_COLLECTION, false>(
-            pool, remaining_val_capacity, tuple_size, next_tuple, &ret_val_count);
-      }
-      val_count += ret_val_count;
-      num_buffered_values_ -= (def_levels_.CacheCurrIdx() - cache_start_idx);
-      if (SHOULD_TRIGGER_DEBUG_ACTION(val_count)) {
-        continue_execution &= ColReaderDebugAction(&val_count);
-      }
-    }
-    *num_values = val_count;
-    return continue_execution;
-  }
+  bool ReadValueBatch(int max_values, int tuple_size, uint8_t* RESTRICT tuple_mem,
+      int* RESTRICT num_values) RESTRICT;
 
   /// Helper function for ReadValueBatch() above that performs value materialization.
   /// It assumes a data page with remaining values is available, and that the def/rep
-  /// level caches have been populated.
+  /// level caches have been populated. Materializes values into 'tuple_mem' with a
+  /// stride of 'tuple_size' and updates 'num_buffered_values_'. Returns the number of
+  /// values materialized in 'num_values'.
   /// For efficiency, the simple special case of !MATERIALIZED && !IN_COLLECTION is not
   /// handled in this function.
   /// Use RESTRICT so that the compiler knows that it is safe to cache member
@@ -390,68 +277,18 @@ class ScalarColumnReader : public BaseScalarColumnReader {
   /// conservatively assumes that buffers like 'tuple_mem', 'num_values' or the
   /// 'def_levels_' 'rep_levels_' buffers may alias 'this', especially with
   /// -fno-strict-alias).
-  template <bool IN_COLLECTION, bool IS_DICT_ENCODED, bool NEEDS_CONVERSION>
-  bool MaterializeValueBatch(MemPool* RESTRICT pool, int max_values, int tuple_size,
-      uint8_t* RESTRICT tuple_mem, int* RESTRICT num_values) RESTRICT {
-    DCHECK(MATERIALIZED || IN_COLLECTION);
-    DCHECK_GT(num_buffered_values_, 0);
-    DCHECK(def_levels_.CacheHasNext());
-    if (IN_COLLECTION && pos_slot_desc_ != NULL) DCHECK(rep_levels_.CacheHasNext());
-
-    uint8_t* curr_tuple = tuple_mem;
-    int val_count = 0;
-    while (def_levels_.CacheHasNext() && val_count < max_values) {
-      Tuple* tuple = reinterpret_cast<Tuple*>(curr_tuple);
-      int def_level = def_levels_.CacheGetNext();
-
-      if (IN_COLLECTION) {
-        if (def_level < def_level_of_immediate_repeated_ancestor()) {
-          // A containing repeated field is empty or NULL. Skip the value but
-          // move to the next repetition level if necessary.
-          if (pos_slot_desc_ != NULL) rep_levels_.CacheGetNext();
-          continue;
-        }
-        if (pos_slot_desc_ != NULL) {
-          int rep_level = rep_levels_.CacheGetNext();
-          // Reset position counter if we are at the start of a new parent collection.
-          if (rep_level <= max_rep_level() - 1) pos_current_value_ = 0;
-          void* pos_slot = tuple->GetSlot(pos_slot_desc()->tuple_offset());
-          *reinterpret_cast<int64_t*>(pos_slot) = pos_current_value_++;
-        }
-      }
+  template <bool IN_COLLECTION, Encoding::type ENCODING, bool NEEDS_CONVERSION>
+  bool MaterializeValueBatch(int max_values, int tuple_size, uint8_t* RESTRICT tuple_mem,
+      int* RESTRICT num_values) RESTRICT;
 
-      if (MATERIALIZED) {
-        if (def_level >= max_def_level()) {
-          bool continue_execution =
-              ReadSlot<IS_DICT_ENCODED, NEEDS_CONVERSION>(tuple, pool);
-          if (UNLIKELY(!continue_execution)) return false;
-        } else {
-          tuple->SetNull(null_indicator_offset_);
-        }
-      }
-      curr_tuple += tuple_size;
-      ++val_count;
-    }
-    *num_values = val_count;
-    return true;
-  }
-
-  // Dispatch to the correct templated implementation of MaterializeValueBatch based
-  // on NeedsConversionInline().
-  template <bool IN_COLLECTION, bool IS_DICT_ENCODED>
-  bool MaterializeValueBatch(MemPool* RESTRICT pool, int max_values, int tuple_size,
-      uint8_t* RESTRICT tuple_mem, int* RESTRICT num_values) RESTRICT {
-    if (NeedsConversionInline()) {
-      return MaterializeValueBatch<IN_COLLECTION, IS_DICT_ENCODED, true>(
-          pool, max_values, tuple_size, tuple_mem, num_values);
-    } else {
-      return MaterializeValueBatch<IN_COLLECTION, IS_DICT_ENCODED, false>(
-          pool, max_values, tuple_size, tuple_mem, num_values);
-    }
-  }
+  /// Same as above, but dispatches to the appropriate templated implementation of
+  /// MaterializeValueBatch() based on 'page_encoding_' and NeedsConversionInline().
+  template <bool IN_COLLECTION>
+  bool MaterializeValueBatch(int max_values, int tuple_size, uint8_t* RESTRICT tuple_mem,
+      int* RESTRICT num_values) RESTRICT;
 
   virtual Status CreateDictionaryDecoder(uint8_t* values, int size,
-      DictDecoderBase** decoder) {
+      DictDecoderBase** decoder) override {
     if (!dict_decoder_.template Reset<PARQUET_TYPE>(values, size, fixed_len_size_)) {
         return Status(TErrorCode::PARQUET_CORRUPT_DICTIONARY, filename(),
             slot_desc_->type().DebugString(), "could not decode dictionary");
@@ -461,78 +298,36 @@ class ScalarColumnReader : public BaseScalarColumnReader {
     return Status::OK();
   }
 
-  virtual bool HasDictionaryDecoder() {
+  virtual bool HasDictionaryDecoder() override {
     return dict_decoder_init_;
   }
 
-  virtual void ClearDictionaryDecoder() {
+  virtual void ClearDictionaryDecoder() override {
     dict_decoder_init_ = false;
   }
 
-  virtual Status InitDataPage(uint8_t* data, int size) {
-    // Data can be empty if the column contains all NULLs
-    DCHECK_GE(size, 0);
-    page_encoding_ = current_page_header_.data_page_header.encoding;
-    if (page_encoding_ != parquet::Encoding::PLAIN_DICTIONARY &&
-        page_encoding_ != parquet::Encoding::PLAIN) {
-      return GetUnsupportedDecodingError();
-    }
-
-    // If slot_desc_ is NULL, dict_decoder_ is uninitialized
-    if (page_encoding_ == parquet::Encoding::PLAIN_DICTIONARY && slot_desc_ != NULL) {
-      if (!dict_decoder_init_) {
-        return Status("File corrupt. Missing dictionary page.");
-      }
-      RETURN_IF_ERROR(dict_decoder_.SetData(data, size));
-    }
-
-    // TODO: Perform filter selectivity checks here.
-    return Status::OK();
-  }
+  virtual Status InitDataPage(uint8_t* data, int size) override;
 
  private:
-  /// Writes the next value into the appropriate destination slot in 'tuple' using pool
-  /// if necessary.
-  ///
-  /// Returns false if execution should be aborted for some reason, e.g. parse_error_ is
-  /// set, the query is cancelled, or the scan node limit was reached. Otherwise returns
-  /// true.
+  /// Writes the next value into the appropriate destination slot in 'tuple'. Returns
+  /// false if execution should be aborted for some reason, e.g. parse_error_ is set, the
+  /// query is cancelled, or the scan node limit was reached. Otherwise returns true.
   ///
   /// Force inlining - GCC does not always inline this into hot loops.
-  template <bool IS_DICT_ENCODED, bool NEEDS_CONVERSION>
-  ALWAYS_INLINE bool ReadSlot(Tuple* tuple, MemPool* pool) {
-    void* slot = tuple->GetSlot(tuple_offset_);
-    // Use an uninitialized stack allocation for temporary value to avoid running
-    // constructors doing work unnecessarily, e.g. if T == StringValue.
-    alignas(InternalType) uint8_t val_buf[sizeof(InternalType)];
-    InternalType* val_ptr = reinterpret_cast<InternalType*>(
-        NEEDS_CONVERSION ? val_buf : slot);
-    if (IS_DICT_ENCODED) {
-      DCHECK_EQ(page_encoding_, parquet::Encoding::PLAIN_DICTIONARY);
-      if (UNLIKELY(!dict_decoder_.GetNextValue(val_ptr))) {
-        SetDictDecodeError();
-        return false;
-      }
-    } else {
-      DCHECK_EQ(page_encoding_, parquet::Encoding::PLAIN);
-      int encoded_len = ParquetPlainEncoder::Decode<InternalType, PARQUET_TYPE>(
-          data_,data_end_,fixed_len_size_, val_ptr);
-      if (UNLIKELY(encoded_len < 0)) {
-        SetPlainDecodeError();
-        return false;
-      }
-      data_ += encoded_len;
-    }
+  template <Encoding::type ENCODING, bool NEEDS_CONVERSION>
+  inline ALWAYS_INLINE bool ReadSlot(Tuple* tuple);
 
-    if (UNLIKELY(NeedsValidationInline() && !ValidateSlot(val_ptr, tuple))) {
-      return false;
-    }
-    if (NEEDS_CONVERSION && !tuple->IsNull(null_indicator_offset_)
-            && UNLIKELY(!ConvertSlot(val_ptr, slot, pool))) {
-      return false;
-    }
-    return true;
-  }
+  /// Reads position into 'pos' and updates 'pos_current_value_' based on 'rep_level'.
+  /// This helper is only used on the batched decoding path where we need to reset
+  /// 'pos_current_value_' to 0 based on 'rep_level'.
+  inline ALWAYS_INLINE void ReadPositionBatched(int16_t rep_level, int64_t* pos);
+
+  /// Decode one value from *data into 'val' and advance *data. 'data_end' is one byte
+  /// past the end of the buffer. Return false and set 'parse_error_' if there is an
+  /// error decoding the value.
+  template <Encoding::type ENCODING>
+  inline ALWAYS_INLINE bool DecodeValue(uint8_t** data, const uint8_t* data_end,
+      InternalType* RESTRICT val) RESTRICT;
 
   /// Most column readers never require conversion, so we can avoid branches by
   /// returning constant false. Column readers for types that require conversion
@@ -551,14 +346,15 @@ class ScalarColumnReader : public BaseScalarColumnReader {
   }
 
   /// Converts and writes 'src' into 'slot' based on desc_->type()
-  bool ConvertSlot(const InternalType* src, void* slot, MemPool* pool) {
+  bool ConvertSlot(const InternalType* src, void* slot) {
     DCHECK(false);
     return false;
   }
 
-  /// Sets error message and returns false if the slot value is invalid, e.g., due to
-  /// being out of the valid value range.
-  bool ValidateSlot(InternalType* src, Tuple* tuple) const {
+  /// Checks if 'val' is invalid, e.g. due to being out of the valid value range. If it
+  /// is invalid, logs the error and returns false. If the error should stop execution,
+  /// sets 'parent_->parse_status_'.
+  bool ValidateValue(InternalType* val) const {
     DCHECK(false);
     return false;
   }
@@ -578,26 +374,301 @@ class ScalarColumnReader : public BaseScalarColumnReader {
   DictDecoder<InternalType> dict_decoder_;
 
   /// True if dict_decoder_ has been initialized with a dictionary page.
-  bool dict_decoder_init_;
+  bool dict_decoder_init_ = false;
 
   /// true if decoded values must be converted before being written to an output tuple.
-  bool needs_conversion_;
+  bool needs_conversion_ = false;
 
   /// The size of this column with plain encoding for FIXED_LEN_BYTE_ARRAY, or
   /// the max length for VARCHAR columns. Unused otherwise.
   int fixed_len_size_;
 };
 
-template<>
+template <typename InternalType, parquet::Type::type PARQUET_TYPE, bool MATERIALIZED>
+ScalarColumnReader<InternalType, PARQUET_TYPE, MATERIALIZED>::ScalarColumnReader(
+    HdfsParquetScanner* parent, const SchemaNode& node, const SlotDescriptor* slot_desc)
+  : BaseScalarColumnReader(parent, node, slot_desc),
+    dict_decoder_(parent->scan_node_->mem_tracker()) {
+  if (!MATERIALIZED) {
+    // We're not materializing any values, just counting them. No need (or ability) to
+    // initialize state used to materialize values.
+    DCHECK(slot_desc_ == nullptr);
+    return;
+  }
+
+  DCHECK(slot_desc_ != nullptr);
+  DCHECK_NE(slot_desc_->type().type, TYPE_BOOLEAN);
+  if (slot_desc_->type().type == TYPE_DECIMAL
+      && PARQUET_TYPE == parquet::Type::FIXED_LEN_BYTE_ARRAY) {
+    fixed_len_size_ = node.element->type_length;
+  } else if (slot_desc_->type().type == TYPE_VARCHAR) {
+    fixed_len_size_ = slot_desc_->type().len;
+  } else {
+    fixed_len_size_ = -1;
+  }
+  needs_conversion_ = slot_desc_->type().type == TYPE_CHAR ||
+      // TODO: Add logic to detect file versions that have unconverted TIMESTAMP
+      // values. Currently all versions have converted values.
+      (FLAGS_convert_legacy_hive_parquet_utc_timestamps &&
+      slot_desc_->type().type == TYPE_TIMESTAMP &&
+      parent->file_version_.application == "parquet-mr");
+}
+
+template <typename InternalType, parquet::Type::type PARQUET_TYPE, bool MATERIALIZED>
+Status ScalarColumnReader<InternalType, PARQUET_TYPE, MATERIALIZED>::InitDataPage(
+    uint8_t* data, int size) {
+  // Data can be empty if the column contains all NULLs
+  DCHECK_GE(size, 0);
+  page_encoding_ = current_page_header_.data_page_header.encoding;
+  if (page_encoding_ != parquet::Encoding::PLAIN_DICTIONARY &&
+      page_encoding_ != parquet::Encoding::PLAIN) {
+    return GetUnsupportedDecodingError();
+  }
+
+  // If slot_desc_ is NULL, we don't need to decode any values so dict_decoder_ does
+  // not need to be initialized.
+  if (page_encoding_ == Encoding::PLAIN_DICTIONARY && slot_desc_ != nullptr) {
+    if (!dict_decoder_init_) {
+      return Status("File corrupt. Missing dictionary page.");
+    }
+    RETURN_IF_ERROR(dict_decoder_.SetData(data, size));
+  }
+  // TODO: Perform filter selectivity checks here.
+  return Status::OK();
+}
+
+template <typename InternalType, parquet::Type::type PARQUET_TYPE, bool MATERIALIZED>
+template <bool IN_COLLECTION>
+bool ScalarColumnReader<InternalType, PARQUET_TYPE, MATERIALIZED>::ReadValue(
+    Tuple* tuple) {
+  // NextLevels() should have already been called and def and rep levels should be in
+  // valid range.
+  DCHECK_GE(rep_level_, 0);
+  DCHECK_GE(def_level_, 0);
+  DCHECK_GE(def_level_, def_level_of_immediate_repeated_ancestor()) <<
+      "Caller should have called NextLevels() until we are ready to read a value";
+
+  if (MATERIALIZED) {
+    if (def_level_ >= max_def_level()) {
+      bool continue_execution;
+      if (page_encoding_ == Encoding::PLAIN_DICTIONARY) {
+        continue_execution = NeedsConversionInline() ?
+            ReadSlot<Encoding::PLAIN_DICTIONARY, true>(tuple) :
+            ReadSlot<Encoding::PLAIN_DICTIONARY, false>(tuple);
+      } else {
+        DCHECK_EQ(page_encoding_, Encoding::PLAIN);
+        continue_execution = NeedsConversionInline() ?
+            ReadSlot<Encoding::PLAIN, true>(tuple) :
+            ReadSlot<Encoding::PLAIN, false>(tuple);
+      }
+      if (!continue_execution) return false;
+    } else {
+      tuple->SetNull(null_indicator_offset_);
+    }
+  }
+  return NextLevels<IN_COLLECTION>();
+}
+
+template <typename InternalType, parquet::Type::type PARQUET_TYPE, bool MATERIALIZED>
+template <bool IN_COLLECTION>
+bool ScalarColumnReader<InternalType, PARQUET_TYPE, MATERIALIZED>::ReadValueBatch(
+    int max_values, int tuple_size, uint8_t* RESTRICT tuple_mem,
+    int* RESTRICT num_values) RESTRICT {
+  // Repetition level is only present if this column is nested in a collection type.
+  if (IN_COLLECTION) {
+    DCHECK_GT(max_rep_level(), 0) << slot_desc()->DebugString();
+  } else {
+    DCHECK_EQ(max_rep_level(), 0) << slot_desc()->DebugString();
+  }
+
+  int val_count = 0;
+  bool continue_execution = true;
+  while (val_count < max_values && !RowGroupAtEnd() && continue_execution) {
+    DCHECK_GE(num_buffered_values_, 0);
+    // Read next page if necessary.
+    if (num_buffered_values_ == 0) {
+      if (!NextPage()) {
+        continue_execution = parent_->parse_status_.ok();
+        continue;
+      }
+    }
+
+    // Not materializing anything - skip decoding any levels and rely on the value
+    // count from page metadata to return the correct number of rows.
+    if (!MATERIALIZED && !IN_COLLECTION) {
+      int vals_to_add = min(num_buffered_values_, max_values - val_count);
+      val_count += vals_to_add;
+      num_buffered_values_ -= vals_to_add;
+      continue;
+    }
+    // Fill the rep level cache if needed. We are flattening out the fields of the
+    // nested collection into the top-level tuple returned by the scan, so we don't
+    // care about the nesting structure unless the position slot is being populated.
+    if (IN_COLLECTION && pos_slot_desc_ != nullptr && !rep_levels_.CacheHasNext()) {
+      parent_->parse_status_.MergeStatus(
+          rep_levels_.CacheNextBatch(num_buffered_values_));
+      if (UNLIKELY(!parent_->parse_status_.ok())) return false;
+    }
+
+    // Fill def level cache if needed.
+    if (!def_levels_.CacheHasNext()) {
+      // TODO: add a fast path here if there's a run of repeated values.
+      parent_->parse_status_.MergeStatus(
+          def_levels_.CacheNextBatch(num_buffered_values_));
+      if (UNLIKELY(!parent_->parse_status_.ok())) return false;
+    }
+
+    // Read data page and cached levels to materialize values.
+    uint8_t* next_tuple = tuple_mem + val_count * tuple_size;
+    int remaining_val_capacity = max_values - val_count;
+    int ret_val_count = 0;
+    continue_execution = MaterializeValueBatch<IN_COLLECTION>(
+        remaining_val_capacity, tuple_size, next_tuple, &ret_val_count);
+    val_count += ret_val_count;
+    if (SHOULD_TRIGGER_DEBUG_ACTION(val_count)) {
+      continue_execution &= ColReaderDebugAction(&val_count);
+    }
+  }
+  *num_values = val_count;
+  return continue_execution;
+}
+
+template <typename InternalType, parquet::Type::type PARQUET_TYPE, bool MATERIALIZED>
+template <bool IN_COLLECTION, Encoding::type ENCODING, bool NEEDS_CONVERSION>
+bool ScalarColumnReader<InternalType, PARQUET_TYPE, MATERIALIZED>::MaterializeValueBatch(
+    int max_values, int tuple_size, uint8_t* RESTRICT tuple_mem,
+    int* RESTRICT num_values) RESTRICT {
+  DCHECK(MATERIALIZED || IN_COLLECTION);
+  DCHECK_GT(num_buffered_values_, 0);
+  DCHECK(def_levels_.CacheHasNext());
+  if (IN_COLLECTION && pos_slot_desc_ != nullptr) DCHECK(rep_levels_.CacheHasNext());
+  const int cache_start_idx = def_levels_.CacheCurrIdx();
+  uint8_t* curr_tuple = tuple_mem;
+  int val_count = 0;
+  while (def_levels_.CacheHasNext() && val_count < max_values) {
+    Tuple* tuple = reinterpret_cast<Tuple*>(curr_tuple);
+    int def_level = def_levels_.CacheGetNext();
+
+    if (IN_COLLECTION) {
+      if (def_level < def_level_of_immediate_repeated_ancestor()) {
+        // A containing repeated field is empty or NULL. Skip the value but
+        // move to the next repetition level if necessary.
+        if (pos_slot_desc_ != nullptr) rep_levels_.CacheSkipLevels(1);
+        continue;
+      }
+      if (pos_slot_desc_ != nullptr) {
+        ReadPositionBatched(rep_levels_.CacheGetNext(),
+            tuple->GetBigIntSlot(pos_slot_desc_->tuple_offset()));
+      }
+    }
+
+    if (MATERIALIZED) {
+      if (def_level >= max_def_level()) {
+        bool continue_execution = ReadSlot<ENCODING, NEEDS_CONVERSION>(tuple);
+        if (UNLIKELY(!continue_execution)) return false;
+      } else {
+        tuple->SetNull(null_indicator_offset_);
+      }
+    }
+    curr_tuple += tuple_size;
+    ++val_count;
+  }
+  num_buffered_values_ -= (def_levels_.CacheCurrIdx() - cache_start_idx);
+  *num_values = val_count;
+  return true;
+}
+
+template <typename InternalType, parquet::Type::type PARQUET_TYPE, bool MATERIALIZED>
+template <bool IN_COLLECTION>
+bool ScalarColumnReader<InternalType, PARQUET_TYPE, MATERIALIZED>::MaterializeValueBatch(
+    int max_values, int tuple_size, uint8_t* RESTRICT tuple_mem,
+    int* RESTRICT num_values) RESTRICT {
+  // Dispatch to the correct templated implementation of MaterializeValueBatch().
+  if (page_encoding_ == Encoding::PLAIN_DICTIONARY) {
+    if (NeedsConversionInline()) {
+      return MaterializeValueBatch<IN_COLLECTION, Encoding::PLAIN_DICTIONARY, true>(
+          max_values, tuple_size, tuple_mem, num_values);
+    } else {
+      return MaterializeValueBatch<IN_COLLECTION, Encoding::PLAIN_DICTIONARY, false>(
+          max_values, tuple_size, tuple_mem, num_values);
+    }
+  } else {
+    DCHECK_EQ(page_encoding_, Encoding::PLAIN);
+    if (NeedsConversionInline()) {
+      return MaterializeValueBatch<IN_COLLECTION, Encoding::PLAIN, true>(
+          max_values, tuple_size, tuple_mem, num_values);
+    } else {
+      return MaterializeValueBatch<IN_COLLECTION, Encoding::PLAIN, false>(
+          max_values, tuple_size, tuple_mem, num_values);
+    }
+  }
+}
+
+template <typename InternalType, parquet::Type::type PARQUET_TYPE, bool MATERIALIZED>
+template <Encoding::type ENCODING, bool NEEDS_CONVERSION>
+bool ScalarColumnReader<InternalType, PARQUET_TYPE, MATERIALIZED>::ReadSlot(
+    Tuple* RESTRICT tuple) RESTRICT {
+  void* slot = tuple->GetSlot(tuple_offset_);
+  // Use an uninitialized stack allocation for temporary value to avoid running
+  // constructors doing work unnecessarily, e.g. if InternalType == StringValue.
+  alignas(InternalType) uint8_t val_buf[sizeof(InternalType)];
+  InternalType* val_ptr =
+      reinterpret_cast<InternalType*>(NEEDS_CONVERSION ? val_buf : slot);
+
+  if (UNLIKELY(!DecodeValue<ENCODING>(&data_, data_end_, val_ptr))) return false;
+  if (UNLIKELY(NeedsValidationInline() && !ValidateValue(val_ptr))) {
+    if (UNLIKELY(!parent_->parse_status_.ok())) return false;
+    // The value is invalid but execution should continue - set the null indicator and
+    // skip conversion.
+    tuple->SetNull(null_indicator_offset_);
+    return true;
+  }
+  if (NEEDS_CONVERSION && UNLIKELY(!ConvertSlot(val_ptr, slot))) return false;
+  return true;
+}
+
+template <typename InternalType, parquet::Type::type PARQUET_TYPE, bool MATERIALIZED>
+template <Encoding::type ENCODING>
+bool ScalarColumnReader<InternalType, PARQUET_TYPE, MATERIALIZED>::DecodeValue(
+    uint8_t** RESTRICT data, const uint8_t* RESTRICT data_end,
+    InternalType* RESTRICT val) RESTRICT {
+  DCHECK_EQ(page_encoding_, ENCODING);
+  if (ENCODING == Encoding::PLAIN_DICTIONARY) {
+    if (UNLIKELY(!dict_decoder_.GetNextValue(val))) {
+      SetDictDecodeError();
+      return false;
+    }
+  } else {
+    DCHECK_EQ(ENCODING, Encoding::PLAIN);
+    int encoded_len = ParquetPlainEncoder::Decode<InternalType, PARQUET_TYPE>(
+        *data, data_end, fixed_len_size_, val);
+    if (UNLIKELY(encoded_len < 0)) {
+      SetPlainDecodeError();
+      return false;
+    }
+    *data += encoded_len;
+  }
+  return true;
+}
+
+template <typename InternalType, parquet::Type::type PARQUET_TYPE, bool MATERIALIZED>
+void ScalarColumnReader<InternalType, PARQUET_TYPE, MATERIALIZED>
+    ::ReadPositionBatched(int16_t rep_level, int64_t* pos) {
+  // Reset position counter if we are at the start of a new parent collection.
+  if (rep_level <= max_rep_level() - 1) pos_current_value_ = 0;
+  *pos = pos_current_value_++;
+}
+
+template <>
 inline bool ScalarColumnReader<StringValue, parquet::Type::BYTE_ARRAY, true>
 ::NeedsConversionInline() const {
   return needs_conversion_;
 }
 
-template<>
+template <>
 bool ScalarColumnReader<StringValue, parquet::Type::BYTE_ARRAY, true>::ConvertSlot(
-    const StringValue* src, void* slot, MemPool* pool) {
-  DCHECK(slot_desc() != NULL);
+    const StringValue* src, void* slot) {
+  DCHECK(slot_desc() != nullptr);
   DCHECK(slot_desc()->type().type == TYPE_CHAR);
   int char_len = slot_desc()->type().len;
   int unpadded_len = min(char_len, src->len);
@@ -607,15 +678,15 @@ bool ScalarColumnReader<StringValue, parquet::Type::BYTE_ARRAY, true>::ConvertSl
   return true;
 }
 
-template<>
+template <>
 inline bool ScalarColumnReader<TimestampValue, parquet::Type::INT96, true>
 ::NeedsConversionInline() const {
   return needs_conversion_;
 }
 
-template<>
+template <>
 bool ScalarColumnReader<TimestampValue, parquet::Type::INT96, true>::ConvertSlot(
-    const TimestampValue* src, void* slot, MemPool* pool) {
+    const TimestampValue* src, void* slot) {
   // Conversion should only happen when this flag is enabled.
   DCHECK(FLAGS_convert_legacy_hive_parquet_utc_timestamps);
   TimestampValue* dst_ts = reinterpret_cast<TimestampValue*>(slot);
@@ -624,24 +695,21 @@ bool ScalarColumnReader<TimestampValue, parquet::Type::INT96, true>::ConvertSlot
   return true;
 }
 
-template<>
+template <>
 inline bool ScalarColumnReader<TimestampValue, parquet::Type::INT96, true>
 ::NeedsValidationInline() const {
   return true;
 }
 
-template<>
-bool ScalarColumnReader<TimestampValue, parquet::Type::INT96, true>::ValidateSlot(
-    TimestampValue* src, Tuple* tuple) const {
-  if (UNLIKELY(!TimestampValue::IsValidDate(src->date()))) {
+template <>
+bool ScalarColumnReader<TimestampValue, parquet::Type::INT96, true>::ValidateValue(
+    TimestampValue* val) const {
+  if (UNLIKELY(!TimestampValue::IsValidDate(val->date()))) {
     ErrorMsg msg(TErrorCode::PARQUET_TIMESTAMP_OUT_OF_RANGE,
         filename(), node_.element->name);
     Status status = parent_->state_->LogOrReturnError(msg);
-    if (!status.ok()) {
-      parent_->parse_status_ = status;
-      return false;
-    }
-    tuple->SetNull(null_indicator_offset_);
+    if (!status.ok()) parent_->parse_status_ = status;
+    return false;
   }
   return true;
 }
@@ -651,107 +719,45 @@ class BoolColumnReader : public BaseScalarColumnReader {
   BoolColumnReader(HdfsParquetScanner* parent, const SchemaNode& node,
       const SlotDescriptor* slot_desc)
     : BaseScalarColumnReader(parent, node, slot_desc) {
-    if (slot_desc_ != NULL) DCHECK_EQ(slot_desc_->type().type, TYPE_BOOLEAN);
+    if (slot_desc_ != nullptr) DCHECK_EQ(slot_desc_->type().type, TYPE_BOOLEAN);
   }
 
   virtual ~BoolColumnReader() { }
 
-  virtual bool ReadValue(MemPool* pool, Tuple* tuple) {
-    return ReadValue<true>(pool, tuple);
+  virtual bool ReadValue(MemPool* pool, Tuple* tuple) override {
+    return ReadValue<true>(tuple);
   }
 
-  virtual bool ReadNonRepeatedValue(MemPool* pool, Tuple* tuple) {
-    return ReadValue<false>(pool, tuple);
+  virtual bool ReadNonRepeatedValue(MemPool* pool, Tuple* tuple) override {
+    return ReadValue<false>(tuple);
   }
 
  protected:
   virtual Status CreateDictionaryDecoder(uint8_t* values, int size,
-      DictDecoderBase** decoder) {
+      DictDecoderBase** decoder) override {
     DCHECK(false) << "Dictionary encoding is not supported for bools. Should never "
                   << "have gotten this far.";
     return Status::OK();
   }
 
-  virtual bool HasDictionaryDecoder() {
+  virtual bool HasDictionaryDecoder() override {
     // Decoder should never be created for bools.
     return false;
   }
 
-  virtual void ClearDictionaryDecoder() { }
+  virtual void ClearDictionaryDecoder() override { }
 
-  virtual Status InitDataPage(uint8_t* data, int size) {
-    page_encoding_ = current_page_header_.data_page_header.encoding;
-    // Only the relevant decoder is initialized for a given data page.
-    switch (page_encoding_) {
-      case parquet::Encoding::PLAIN:
-        bool_values_.Reset(data, size);
-        break;
-      case parquet::Encoding::RLE:
-        // The first 4 bytes contain the size of the encoded data. This information is
-        // redundant, as this is the last part of the data page, and the number of
-        // remaining bytes is already known.
-        rle_decoder_.Reset(data + 4, size - 4, 1);
-        break;
-      default:
-        return GetUnsupportedDecodingError();
-    }
-    num_unpacked_values_ = 0;
-    unpacked_value_idx_ = 0;
-    return Status::OK();
-  }
+  virtual Status InitDataPage(uint8_t* data, int size) override;
 
  private:
-  template<bool IN_COLLECTION>
-  inline bool ReadValue(MemPool* pool, Tuple* tuple) {
-    DCHECK(slot_desc_ != NULL);
-    // Def and rep levels should be in valid range.
-    DCHECK_GE(rep_level_, 0);
-    DCHECK_GE(def_level_, 0);
-    DCHECK_GE(def_level_, def_level_of_immediate_repeated_ancestor()) <<
-        "Caller should have called NextLevels() until we are ready to read a value";
-
-    if (def_level_ >= max_def_level()) {
-      return ReadSlot<IN_COLLECTION>(tuple, pool);
-    } else {
-      // Null value
-      tuple->SetNull(null_indicator_offset_);
-      return NextLevels<IN_COLLECTION>();
-    }
-  }
-
-  /// Writes the next value into the next slot in the *tuple using pool if necessary.
-  /// Also advances def_level_ and rep_level_ via NextLevels().
-  ///
-  /// Returns false if execution should be aborted for some reason, e.g. parse_error_ is
-  /// set, the query is cancelled, or the scan node limit was reached. Otherwise returns
-  /// true.
-  template<bool IN_COLLECTION>
-  inline bool ReadSlot(Tuple* tuple, MemPool* pool)  {
-    void* slot = tuple->GetSlot(tuple_offset_);
-    bool val;
-    if (LIKELY(unpacked_value_idx_ < num_unpacked_values_)) {
-      val = unpacked_values_[unpacked_value_idx_++];
-    } else {
-      // Unpack as many values as we can into the buffer. We expect to read at least one
-      // value.
-      if (page_encoding_ == parquet::Encoding::PLAIN) {
-        num_unpacked_values_ =
-            bool_values_.UnpackBatch(1, UNPACKED_BUFFER_LEN, &unpacked_values_[0]);
-      } else {
-        num_unpacked_values_ =
-            rle_decoder_.GetValues(UNPACKED_BUFFER_LEN, &unpacked_values_[0]);
-      }
+  template <bool IN_COLLECTION>
+  inline bool ReadValue(Tuple* tuple);
 
-      if (UNLIKELY(num_unpacked_values_ == 0)) {
-        parent_->parse_status_ = Status("Invalid bool column.");
-        return false;
-      }
-      val = unpacked_values_[0];
-      unpacked_value_idx_ = 1;
-    }
-    *reinterpret_cast<bool*>(slot) = val;
-    return NextLevels<IN_COLLECTION>();
-  }
+  /// Decodes the next value into 'value'. Returns false and sets
+  /// 'parent_->parse_status_' if an error is encountered decoding the
+  /// value. Otherwise returns true.
+  template <bool IN_COLLECTION>
+  inline bool DecodeValue(bool* value);
 
   /// A buffer to store unpacked values. Must be a multiple of 32 size to use the
   /// batch-oriented interface of BatchedBitReader.
@@ -771,6 +777,71 @@ class BoolColumnReader : public BaseScalarColumnReader {
   RleBatchDecoder<bool> rle_decoder_;
 };
 
+Status BoolColumnReader::InitDataPage(uint8_t* data, int size) {
+  page_encoding_ = current_page_header_.data_page_header.encoding;
+  // Only the relevant decoder is initialized for a given data page.
+  switch (page_encoding_) {
+    case parquet::Encoding::PLAIN:
+      bool_values_.Reset(data, size);
+      break;
+    case parquet::Encoding::RLE:
+      // The first 4 bytes contain the size of the encoded data. This information is
+      // redundant, as this is the last part of the data page, and the number of
+      // remaining bytes is already known.
+      rle_decoder_.Reset(data + 4, size - 4, 1);
+      break;
+    default:
+      return GetUnsupportedDecodingError();
+  }
+  num_unpacked_values_ = 0;
+  unpacked_value_idx_ = 0;
+  return Status::OK();
+}
+
+template <bool IN_COLLECTION>
+bool BoolColumnReader::ReadValue(Tuple* tuple) {
+  DCHECK(slot_desc_ != nullptr);
+  // Def and rep levels should be in valid range.
+  DCHECK_GE(rep_level_, 0);
+  DCHECK_GE(def_level_, 0);
+  DCHECK_GE(def_level_, def_level_of_immediate_repeated_ancestor()) <<
+      "Caller should have called NextLevels() until we are ready to read a value";
+
+  if (def_level_ >= max_def_level()) {
+    bool* slot = tuple->GetBoolSlot(tuple_offset_);
+    if (UNLIKELY(!DecodeValue<IN_COLLECTION>(slot))) return false;
+  } else {
+    // Null value
+    tuple->SetNull(null_indicator_offset_);
+  }
+  return NextLevels<IN_COLLECTION>();
+}
+
+template <bool IN_COLLECTION>
+bool BoolColumnReader::DecodeValue(bool* value) {
+  if (LIKELY(unpacked_value_idx_ < num_unpacked_values_)) {
+    *value = unpacked_values_[unpacked_value_idx_++];
+  } else {
+    // Unpack as many values as we can into the buffer. We expect to read at least one
+    // value.
+    if (page_encoding_ == parquet::Encoding::PLAIN) {
+      num_unpacked_values_ =
+          bool_values_.UnpackBatch(1, UNPACKED_BUFFER_LEN, &unpacked_values_[0]);
+    } else {
+      num_unpacked_values_ =
+          rle_decoder_.GetValues(UNPACKED_BUFFER_LEN, &unpacked_values_[0]);
+    }
+
+    if (UNLIKELY(num_unpacked_values_ == 0)) {
+      parent_->parse_status_ = Status("Invalid bool column.");
+      return false;
+    }
+    *value = unpacked_values_[0];
+    unpacked_value_idx_ = 1;
+  }
+  return true;
+}
+
 // Change 'val_count' to zero to exercise IMPALA-5197. This verifies the error handling
 // path doesn't falsely report that the file is corrupted.
 bool ParquetColumnReader::ColReaderDebugAction(int* val_count) {
@@ -802,7 +873,9 @@ bool ParquetColumnReader::ReadValueBatch(MemPool* pool, int max_values,
       continue;
     }
     // Fill in position slot if applicable
-    if (pos_slot_desc_ != NULL) ReadPosition(tuple);
+    if (pos_slot_desc_ != nullptr) {
+      ReadPositionNonBatched(tuple->GetBigIntSlot(pos_slot_desc()->tuple_offset()));
+    }
     continue_execution = ReadValue(pool, tuple);
     ++val_count;
     if (SHOULD_TRIGGER_DEBUG_ACTION(val_count)) {
@@ -834,17 +907,14 @@ bool ParquetColumnReader::ReadNonRepeatedValueBatch(MemPool* pool,
   return continue_execution;
 }
 
-void ParquetColumnReader::ReadPosition(Tuple* tuple) {
-  DCHECK(pos_slot_desc() != NULL);
+void ParquetColumnReader::ReadPositionNonBatched(int64_t* pos) {
   // NextLevels() should have already been called
   DCHECK_GE(rep_level_, 0);
   DCHECK_GE(def_level_, 0);
   DCHECK_GE(pos_current_value_, 0);
   DCHECK_GE(def_level_, def_level_of_immediate_repeated_ancestor()) <<
       "Caller should have called NextLevels() until we are ready to read a value";
-
-  void* slot = tuple->GetSlot(pos_slot_desc()->tuple_offset());
-  *reinterpret_cast<int64_t*>(slot) = pos_current_value_++;
+  *pos = pos_current_value_++;
 }
 
 // In 1.1, we had a bug where the dictionary page metadata was not set. Returns true
@@ -1068,8 +1138,8 @@ Status BaseScalarColumnReader::InitDictionary() {
     }
   }
   if (dict_header != nullptr &&
-      dict_header->encoding != parquet::Encoding::PLAIN &&
-      dict_header->encoding != parquet::Encoding::PLAIN_DICTIONARY) {
+      dict_header->encoding != Encoding::PLAIN &&
+      dict_header->encoding != Encoding::PLAIN_DICTIONARY) {
     return Status("Only PLAIN and PLAIN_DICTIONARY encodings are supported "
                   "for dictionary pages.");
   }
@@ -1223,7 +1293,7 @@ Status BaseScalarColumnReader::ReadDataPage() {
     num_values_read_ += num_buffered_values_;
 
     int uncompressed_size = current_page_header_.uncompressed_page_size;
-    if (decompressor_.get() != NULL) {
+    if (decompressor_.get() != nullptr) {
       SCOPED_TIMER(parent_->decompress_timer_);
       uint8_t* decompressed_buffer;
       RETURN_IF_ERROR(AllocateUncompressedDataPage(
@@ -1294,7 +1364,7 @@ Status BaseScalarColumnReader::AllocateUncompressedDataPage(int64_t size,
   return Status::OK();
 }
 
-template<bool ADVANCE_REP_LEVEL>
+template <bool ADVANCE_REP_LEVEL>
 bool BaseScalarColumnReader::NextLevels() {
   if (!ADVANCE_REP_LEVEL) DCHECK_EQ(max_rep_level(), 0) << slot_desc()->DebugString();
 
@@ -1382,7 +1452,7 @@ bool CollectionColumnReader::ReadValue(MemPool* pool, Tuple* tuple) {
   if (tuple_offset_ == -1) {
     return CollectionColumnReader::NextLevels();
   } else if (def_level_ >= max_def_level()) {
-    return ReadSlot(tuple, pool);
+    return ReadSlot(tuple->GetCollectionSlot(tuple_offset_), pool);
   } else {
     // Null value
     tuple->SetNull(null_indicator_offset_);
@@ -1390,21 +1460,18 @@ bool CollectionColumnReader::ReadValue(MemPool* pool, Tuple* tuple) {
   }
 }
 
-bool CollectionColumnReader::ReadNonRepeatedValue(
-    MemPool* pool, Tuple* tuple) {
+bool CollectionColumnReader::ReadNonRepeatedValue(MemPool* pool, Tuple* tuple) {
   return CollectionColumnReader::ReadValue(pool, tuple);
 }
 
-bool CollectionColumnReader::ReadSlot(Tuple* tuple, MemPool* pool) {
+bool CollectionColumnReader::ReadSlot(CollectionValue* slot, MemPool* pool) {
   DCHECK(!children_.empty());
   DCHECK_LE(rep_level_, new_collection_rep_level());
 
   // Recursively read the collection into a new CollectionValue.
-  CollectionValue* coll_slot = reinterpret_cast<CollectionValue*>(
-      tuple->GetSlot(tuple_offset_));
-  *coll_slot = CollectionValue();
+  *slot = CollectionValue();
   CollectionValueBuilder builder(
-      coll_slot, *slot_desc_->collection_item_descriptor(), pool, parent_->state_);
+      slot, *slot_desc_->collection_item_descriptor(), pool, parent_->state_);
   bool continue_execution = parent_->AssembleCollection(
       children_, new_collection_rep_level(), &builder);
   if (!continue_execution) return false;
@@ -1483,11 +1550,11 @@ static ParquetColumnReader* GetDecimalColumnReader(const SchemaNode& node,
 ParquetColumnReader* ParquetColumnReader::Create(const SchemaNode& node,
     bool is_collection_field, const SlotDescriptor* slot_desc,
     HdfsParquetScanner* parent) {
-  ParquetColumnReader* reader = NULL;
+  ParquetColumnReader* reader = nullptr;
   if (is_collection_field) {
     // Create collection reader (note this handles both NULL and non-NULL 'slot_desc')
     reader = new CollectionColumnReader(parent, node, slot_desc);
-  } else if (slot_desc != NULL) {
+  } else if (slot_desc != nullptr) {
     // Create the appropriate ScalarColumnReader type to read values into 'slot_desc'
     switch (slot_desc->type().type) {
       case TYPE_BOOLEAN:

http://git-wip-us.apache.org/repos/asf/impala/blob/13604bd4/be/src/exec/parquet-column-readers.h
----------------------------------------------------------------------
diff --git a/be/src/exec/parquet-column-readers.h b/be/src/exec/parquet-column-readers.h
index 28138bd..e720375 100644
--- a/be/src/exec/parquet-column-readers.h
+++ b/be/src/exec/parquet-column-readers.h
@@ -232,10 +232,12 @@ class ParquetColumnReader {
   /// true.
   virtual bool NextLevels() = 0;
 
-  /// Should only be called if pos_slot_desc_ is non-NULL. Writes pos_current_value_ to
-  /// 'tuple' (i.e. "reads" the synthetic position field of the parent collection into
-  /// 'tuple') and increments pos_current_value_.
-  void ReadPosition(Tuple* tuple);
+  /// Writes pos_current_value_ (i.e. "reads" the synthetic position field of the
+  /// parent collection) to 'pos' and increments pos_current_value_. Only valid to
+  /// call when doing non-batched reading, i.e. NextLevels() must have been called
+  /// before each call to this function to advance to the next element in the
+  /// collection.
+  void ReadPositionNonBatched(int64_t* pos);
 
   /// Returns true if this column reader has reached the end of the row group.
   inline bool RowGroupAtEnd() {
@@ -251,7 +253,7 @@ class ParquetColumnReader {
  protected:
   HdfsParquetScanner* parent_;
   const SchemaNode& node_;
-  const SlotDescriptor* slot_desc_;
+  const SlotDescriptor* const slot_desc_;
 
   /// The slot descriptor for the position field of the tuple, if there is one. NULL if
   /// there's not. Only one column reader for a given tuple desc will have this set.
@@ -272,17 +274,17 @@ class ParquetColumnReader {
   /// INVALID_LEVEL and ROW_GROUP_END. The maximum values are cached here because they
   /// are accessed in inner loops.
   int16_t rep_level_;
-  int16_t max_rep_level_;
+  const int16_t max_rep_level_;
   int16_t def_level_;
-  int16_t max_def_level_;
+  const int16_t max_def_level_;
 
   // Cache frequently accessed members of slot_desc_ for perf.
 
   /// slot_desc_->tuple_offset(). -1 if slot_desc_ is NULL.
-  int tuple_offset_;
+  const int tuple_offset_;
 
   /// slot_desc_->null_indicator_offset(). Invalid if slot_desc_ is NULL.
-  NullIndicatorOffset null_indicator_offset_;
+  const NullIndicatorOffset null_indicator_offset_;
 
   ParquetColumnReader(HdfsParquetScanner* parent, const SchemaNode& node,
       const SlotDescriptor* slot_desc)
@@ -579,13 +581,12 @@ class CollectionColumnReader : public ParquetColumnReader {
   void UpdateDerivedState();
 
   /// Recursively reads from children_ to assemble a single CollectionValue into
-  /// the appropriate destination slot in 'tuple'. Also advances rep_level_ and
-  /// def_level_ via NextLevels().
+  /// 'slot'. Also advances rep_level_ and def_level_ via NextLevels().
   ///
   /// Returns false if execution should be aborted for some reason, e.g. parse_error_ is
   /// set, the query is cancelled, or the scan node limit was reached. Otherwise returns
   /// true.
-  inline bool ReadSlot(Tuple* tuple, MemPool* pool);
+  inline bool ReadSlot(CollectionValue* slot, MemPool* pool);
 };
 
 }

http://git-wip-us.apache.org/repos/asf/impala/blob/13604bd4/be/src/runtime/tuple.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/tuple.h b/be/src/runtime/tuple.h
index ae9be32..68f313d 100644
--- a/be/src/runtime/tuple.h
+++ b/be/src/runtime/tuple.h
@@ -243,6 +243,14 @@ class Tuple {
     return reinterpret_cast<const char*>(this) + offset;
   }
 
+  bool* GetBoolSlot(int offset) {
+    return static_cast<bool*>(GetSlot(offset));
+  }
+
+  int64_t* GetBigIntSlot(int offset) {
+    return static_cast<int64_t*>(GetSlot(offset));
+  }
+
   StringValue* GetStringSlot(int offset) {
     DCHECK(offset != -1);  // -1 offset indicates non-materialized slot
     return reinterpret_cast<StringValue*>(reinterpret_cast<char*>(this) + offset);

http://git-wip-us.apache.org/repos/asf/impala/blob/13604bd4/be/src/util/bit-stream-utils.h
----------------------------------------------------------------------
diff --git a/be/src/util/bit-stream-utils.h b/be/src/util/bit-stream-utils.h
index bac127a..e527dc8 100644
--- a/be/src/util/bit-stream-utils.h
+++ b/be/src/util/bit-stream-utils.h
@@ -114,6 +114,8 @@ class BatchedBitReader {
   /// Resets the read to start reading from the start of 'buffer'. The buffer's
   /// length is 'buffer_len'. Does not take ownership of the buffer.
   void Reset(const uint8_t* buffer, int64_t buffer_len) {
+    DCHECK(buffer != nullptr);
+    DCHECK_GE(buffer_len, 0);
     buffer_pos_ = buffer;
     buffer_end_ = buffer + buffer_len;
   }

http://git-wip-us.apache.org/repos/asf/impala/blob/13604bd4/be/src/util/rle-encoding.h
----------------------------------------------------------------------
diff --git a/be/src/util/rle-encoding.h b/be/src/util/rle-encoding.h
index 83b6315..1491066 100644
--- a/be/src/util/rle-encoding.h
+++ b/be/src/util/rle-encoding.h
@@ -471,6 +471,8 @@ inline void RleEncoder::Clear() {
 
 template <typename T>
 inline void RleBatchDecoder<T>::Reset(uint8_t* buffer, int buffer_len, int bit_width) {
+  DCHECK(buffer != nullptr);
+  DCHECK_GE(buffer_len, 0);
   DCHECK_GE(bit_width, 0);
   DCHECK_LE(bit_width, BatchedBitReader::MAX_BITWIDTH);
   bit_reader_.Reset(buffer, buffer_len);


[6/8] impala git commit: Fix minor perf bug in BufferedTupleStream

Posted by ta...@apache.org.
Fix minor perf bug in BufferedTupleStream

The code referenced a member variable when the intent was clearly to use
the template argument.

Change-Id: I05bae0018f79aec9c6014ae228ff28621b548860
Reviewed-on: http://gerrit.cloudera.org:8080/10239
Reviewed-by: Alex Behm <al...@cloudera.com>
Tested-by: Impala Public Jenkins <im...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/ad33cf55
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/ad33cf55
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/ad33cf55

Branch: refs/heads/2.x
Commit: ad33cf55566eae3ca8b305d1c1bf0a9a48165d4c
Parents: ae9c47b
Author: Tim Armstrong <ta...@cloudera.com>
Authored: Mon Apr 30 09:08:11 2018 -0700
Committer: Impala Public Jenkins <im...@gerrit.cloudera.org>
Committed: Thu May 3 19:59:25 2018 +0000

----------------------------------------------------------------------
 be/src/runtime/buffered-tuple-stream.cc | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/ad33cf55/be/src/runtime/buffered-tuple-stream.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/buffered-tuple-stream.cc b/be/src/runtime/buffered-tuple-stream.cc
index f5668c7..9326507 100644
--- a/be/src/runtime/buffered-tuple-stream.cc
+++ b/be/src/runtime/buffered-tuple-stream.cc
@@ -1038,7 +1038,7 @@ template <bool HAS_NULLABLE_TUPLE>
 void BufferedTupleStream::UnflattenTupleRow(uint8_t** data, TupleRow* row) const {
   const int tuples_per_row = desc_->tuple_descriptors().size();
   uint8_t* ptr = *data;
-  if (has_nullable_tuple_) {
+  if (HAS_NULLABLE_TUPLE) {
     // Stitch together the tuples from the page and the NULL ones.
     const uint8_t* null_indicators = ptr;
     ptr += NullIndicatorBytesPerRow();


[8/8] impala git commit: IMPALA-6961: [DOCS] Doc --enable_minidump flag to disable minidumps

Posted by ta...@apache.org.
IMPALA-6961: [DOCS] Doc --enable_minidump flag to disable minidumps

Change-Id: I3412e36272cda0c1502d4643afcdbad01e9548a5
Reviewed-on: http://gerrit.cloudera.org:8080/10285
Reviewed-by: Lars Volker <lv...@cloudera.com>
Tested-by: Impala Public Jenkins <im...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/dd6f2874
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/dd6f2874
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/dd6f2874

Branch: refs/heads/2.x
Commit: dd6f2874704b53ffa01897f761e03500d7ceee6c
Parents: 13604bd
Author: Alex Rodoni <ar...@cloudera.com>
Authored: Wed May 2 14:38:17 2018 -0700
Committer: Impala Public Jenkins <im...@gerrit.cloudera.org>
Committed: Thu May 3 23:46:23 2018 +0000

----------------------------------------------------------------------
 docs/topics/impala_breakpad.xml | 24 ++++++++++++++++++++----
 1 file changed, 20 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/dd6f2874/docs/topics/impala_breakpad.xml
----------------------------------------------------------------------
diff --git a/docs/topics/impala_breakpad.xml b/docs/topics/impala_breakpad.xml
index 58bd093..6bb7da6 100644
--- a/docs/topics/impala_breakpad.xml
+++ b/docs/topics/impala_breakpad.xml
@@ -61,10 +61,26 @@ under the License.
     <title>Enabling or Disabling Minidump Generation</title>
     <conbody>
       <p>
-        By default, a minidump file is generated when an Impala-related daemon crashes.
-        To turn off generation of the minidump files, change the
-        <uicontrol>minidump_path</uicontrol> configuration setting of one or more Impala-related daemons
-        to the empty string, and restart the corresponding services or daemons.
+        By default, a minidump file is generated when an Impala-related daemon
+        crashes.
+      </p>
+
+      <p>
+        To turn off generation of the minidump files, use one of the following
+        options:
+
+        <ul>
+          <li>
+            Set the <codeph>--enable_minidumps</codeph> configuration setting
+            to <codeph>false</codeph>. Restart the corresponding services or
+            daemons.
+          </li>
+
+          <li>
+            Set the <codeph>--minidump_path</codeph> configuration setting to
+            an empty string. Restart the corresponding services or daemons.
+          </li>
+        </ul>
       </p>
 
       <p rev="IMPALA-3677">


[5/8] impala git commit: IMPALA-6954: Fix problems with CTAS into Kudu with an expr rewrite

Posted by ta...@apache.org.
IMPALA-6954: Fix problems with CTAS into Kudu with an expr rewrite

This patch fixes two problems:
- Previously a CTAS into a Kudu table where an expr rewrite occurred
  would create an unpartitioned table, due to the partition info being
  reset in TableDataLayout and then never reconstructed. Since the
  Kudu partition info is set by the parser and never changes, the
  solution is to not reset it.
- Previously a CTAS into a Kudu table with a range partition where an
  expr rewrite occurred would fail with an analysis exception due to
  a Precondition check in RangePartition.analyze that checked that
  the RangePartition wasn't already analyzed, as the analysis can't
  be done twice. Since the state in RangePartition never changes, it
  doesn't need to be reanalyzed and we can just return instead of
  failing on the check.

Testing:
- Added an e2e test that creates a partitioned Kudu table with a CTAS
  with a rewrite, and checks that the expected partitions are created.

Change-Id: I731743bd84cc695119e99342e1b155096147f0ed
Reviewed-on: http://gerrit.cloudera.org:8080/10251
Reviewed-by: Alex Behm <al...@cloudera.com>
Tested-by: Impala Public Jenkins <im...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/bb4a59a7
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/bb4a59a7
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/bb4a59a7

Branch: refs/heads/2.x
Commit: bb4a59a73ca2d1fc93d503b25e560178a09627d8
Parents: 5ba4643
Author: Thomas Tauber-Marshall <tm...@cloudera.com>
Authored: Mon Apr 30 15:32:30 2018 -0700
Committer: Impala Public Jenkins <im...@gerrit.cloudera.org>
Committed: Thu May 3 19:59:25 2018 +0000

----------------------------------------------------------------------
 .../apache/impala/analysis/CreateTableAsSelectStmt.java   |  3 +++
 .../java/org/apache/impala/analysis/RangePartition.java   |  5 +++--
 .../java/org/apache/impala/analysis/TableDataLayout.java  |  5 -----
 fe/src/main/java/org/apache/impala/analysis/TableDef.java |  1 -
 .../functional-query/queries/QueryTest/kudu_create.test   | 10 ++++++++++
 5 files changed, 16 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/bb4a59a7/fe/src/main/java/org/apache/impala/analysis/CreateTableAsSelectStmt.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/analysis/CreateTableAsSelectStmt.java b/fe/src/main/java/org/apache/impala/analysis/CreateTableAsSelectStmt.java
index aac6873..5c8d939 100644
--- a/fe/src/main/java/org/apache/impala/analysis/CreateTableAsSelectStmt.java
+++ b/fe/src/main/java/org/apache/impala/analysis/CreateTableAsSelectStmt.java
@@ -253,6 +253,9 @@ public class CreateTableAsSelectStmt extends StatementBase {
   public void reset() {
     super.reset();
     createStmt_.reset();
+    // This is populated for CTAS in analyze(), so it needs to be cleared here. For other
+    // types of CreateTableStmts it is set by the parser and should not be reset.
+    createStmt_.getPartitionColumnDefs().clear();
     insertStmt_.reset();
   }
 }

http://git-wip-us.apache.org/repos/asf/impala/blob/bb4a59a7/fe/src/main/java/org/apache/impala/analysis/RangePartition.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/analysis/RangePartition.java b/fe/src/main/java/org/apache/impala/analysis/RangePartition.java
index caac462..9ed5da8 100644
--- a/fe/src/main/java/org/apache/impala/analysis/RangePartition.java
+++ b/fe/src/main/java/org/apache/impala/analysis/RangePartition.java
@@ -121,8 +121,9 @@ public class RangePartition implements ParseNode {
   public void analyze(Analyzer analyzer, List<ColumnDef> partColDefs)
       throws AnalysisException {
     // Reanalyzing not supported because TIMESTAMPs are converted to BIGINT (unixtime
-    // micros) in place.
-    Preconditions.checkArgument(!isAnalyzed_);
+    // micros) in place. We can just return because none of the state will have changed
+    // since the first time we did the analysis.
+    if (isAnalyzed_) return;
     analyzeBoundaryValues(lowerBound_, partColDefs, analyzer);
     if (!isSingletonRange_) {
       analyzeBoundaryValues(upperBound_, partColDefs, analyzer);

http://git-wip-us.apache.org/repos/asf/impala/blob/bb4a59a7/fe/src/main/java/org/apache/impala/analysis/TableDataLayout.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/analysis/TableDataLayout.java b/fe/src/main/java/org/apache/impala/analysis/TableDataLayout.java
index fea809d..aef5732 100644
--- a/fe/src/main/java/org/apache/impala/analysis/TableDataLayout.java
+++ b/fe/src/main/java/org/apache/impala/analysis/TableDataLayout.java
@@ -52,9 +52,4 @@ class TableDataLayout {
 
   List<ColumnDef> getPartitionColumnDefs() { return partitionColDefs_; }
   List<KuduPartitionParam> getKuduPartitionParams() { return kuduPartitionParams_; }
-
-  public void reset() {
-    partitionColDefs_.clear();
-    kuduPartitionParams_.clear();
-  }
 }

http://git-wip-us.apache.org/repos/asf/impala/blob/bb4a59a7/fe/src/main/java/org/apache/impala/analysis/TableDef.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/analysis/TableDef.java b/fe/src/main/java/org/apache/impala/analysis/TableDef.java
index 948515b..529594b 100644
--- a/fe/src/main/java/org/apache/impala/analysis/TableDef.java
+++ b/fe/src/main/java/org/apache/impala/analysis/TableDef.java
@@ -161,7 +161,6 @@ class TableDef {
 
   public void reset() {
     primaryKeyColDefs_.clear();
-    dataLayout_.reset();
     columnDefs_.clear();
     isAnalyzed_ = false;
     generatedKuduProperties_.clear();

http://git-wip-us.apache.org/repos/asf/impala/blob/bb4a59a7/testdata/workloads/functional-query/queries/QueryTest/kudu_create.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-query/queries/QueryTest/kudu_create.test b/testdata/workloads/functional-query/queries/QueryTest/kudu_create.test
index 394157c..9d17055 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/kudu_create.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/kudu_create.test
@@ -336,3 +336,13 @@ select * from ctas_decimal;
 ---- TYPES
 DECIMAL,DECIMAL,DECIMAL,DECIMAL,DECIMAL,DECIMAL
 ====
+---- QUERY
+# IMPALA-6954: CTAS with an expr rewrite.
+create table ctas_rewrite primary key(id)
+partition by range(id) (partition 0 <= values < 100) stored as kudu
+as select id, tinyint_col from functional.alltypes
+where id between 0 and 1;
+show range partitions ctas_rewrite;
+---- RESULTS
+'0 <= VALUES < 100'
+====
\ No newline at end of file


[4/8] impala git commit: IMPALA-6931: reduces races in query expiration tests

Posted by ta...@apache.org.
IMPALA-6931: reduces races in query expiration tests

Recent test runs hit flakiness in the query expiration tests.
This patch makes two changes:
1) The query state is retrieved earlier; a flaky test skipped
the expected state.
2) The timeouts are bumped; a flaky test had queries expire before
it could check them.

Change-Id: I93f4ec450fc7e5a685c135b444e90d37e632831d
Reviewed-on: http://gerrit.cloudera.org:8080/10279
Reviewed-by: Dan Hecht <dh...@cloudera.com>
Tested-by: Impala Public Jenkins <im...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/d5123b76
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/d5123b76
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/d5123b76

Branch: refs/heads/2.x
Commit: d5123b76b591a1f94eba74076f5dd8765ada6b52
Parents: bb4a59a
Author: Vuk Ercegovac <ve...@cloudera.com>
Authored: Tue May 1 10:28:47 2018 -0700
Committer: Impala Public Jenkins <im...@gerrit.cloudera.org>
Committed: Thu May 3 19:59:25 2018 +0000

----------------------------------------------------------------------
 tests/custom_cluster/test_query_expiration.py | 12 +++++-------
 1 file changed, 5 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/d5123b76/tests/custom_cluster/test_query_expiration.py
----------------------------------------------------------------------
diff --git a/tests/custom_cluster/test_query_expiration.py b/tests/custom_cluster/test_query_expiration.py
index ed2a636..0096d81 100644
--- a/tests/custom_cluster/test_query_expiration.py
+++ b/tests/custom_cluster/test_query_expiration.py
@@ -38,7 +38,7 @@ class TestQueryExpiration(CustomClusterTestSuite):
     assert actual == expected
 
   @pytest.mark.execute_serially
-  @CustomClusterTestSuite.with_args("--idle_query_timeout=6 --logbuflevel=-1")
+  @CustomClusterTestSuite.with_args("--idle_query_timeout=8 --logbuflevel=-1")
   def test_query_expiration(self, vector):
     """Confirm that single queries expire if not fetched"""
     impalad = self.cluster.get_first_impalad()
@@ -46,7 +46,7 @@ class TestQueryExpiration(CustomClusterTestSuite):
     num_expired = impalad.service.get_metric_value('impala-server.num-queries-expired')
     handles = []
 
-    # This query will time out with the default idle timeout (6s).
+    # This query will time out with the default idle timeout (8s).
     query1 = "SELECT SLEEP(1000000)"
     default_timeout_expire_handle = client.execute_async(query1)
     handles.append(default_timeout_expire_handle)
@@ -109,7 +109,7 @@ class TestQueryExpiration(CustomClusterTestSuite):
 
     # Check that we didn't wait too long to be expired (double the timeout is sufficiently
     # large to avoid most noise in measurement)
-    assert time() - before < 12
+    assert time() - before < 16
 
     client.execute("SET QUERY_TIMEOUT_S=0")
     # Synchronous execution; calls fetch() and query should not time out.
@@ -176,11 +176,9 @@ class TestQueryExpiration(CustomClusterTestSuite):
     """Try to fetch 'expected_state' from 'client' within 'timeout' seconds.
     Fail if unable."""
     start_time = time()
-    actual_state = None
-    while (time() - start_time < timeout):
+    actual_state = client.get_state(handle)
+    while (actual_state != expected_state and time() - start_time < timeout):
       actual_state = client.get_state(handle)
-      if actual_state == expected_state:
-        break
     assert expected_state == actual_state
 
   @pytest.mark.execute_serially


[3/8] impala git commit: Speed up Python dependencies.

Posted by ta...@apache.org.
Speed up Python dependencies.

This parallelizes downloading some Python libraries, which speeds up
$IMPALA_HOME/infra/python/deps/download_requirements.  I've seen it
take 7-15 seconds before this change and 2-5 seconds after.

I also checked that we always have at least Python 2.6 when
building Impala, so I was able to remove the try/except
handling in bootstrap_toolchain.

Change-Id: I7cbf622adb7d037f1a53c519402dcd8ae3c0fe30
Reviewed-on: http://gerrit.cloudera.org:8080/10234
Reviewed-by: Philip Zeyliger <ph...@cloudera.com>
Tested-by: Impala Public Jenkins <im...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/df3b5463
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/df3b5463
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/df3b5463

Branch: refs/heads/2.x
Commit: df3b5463b8d04881cce9da09a8942c047a53c6e5
Parents: ad33cf5
Author: Philip Zeyliger <ph...@cloudera.com>
Authored: Mon Apr 23 11:16:42 2018 -0700
Committer: Impala Public Jenkins <im...@gerrit.cloudera.org>
Committed: Thu May 3 19:59:25 2018 +0000

----------------------------------------------------------------------
 bin/bootstrap_toolchain.py        | 18 +++++----------
 infra/python/deps/pip_download.py | 42 +++++++++++++++++-----------------
 2 files changed, 27 insertions(+), 33 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/df3b5463/bin/bootstrap_toolchain.py
----------------------------------------------------------------------
diff --git a/bin/bootstrap_toolchain.py b/bin/bootstrap_toolchain.py
index f54bf04..6070350 100755
--- a/bin/bootstrap_toolchain.py
+++ b/bin/bootstrap_toolchain.py
@@ -35,6 +35,7 @@
 #
 #     python bootstrap_toolchain.py
 import logging
+import multiprocessing.pool
 import os
 import random
 import re
@@ -350,19 +351,12 @@ extern "C" void %s() {
 
 def execute_many(f, args):
   """
-  Executes f(a) for a in args. If possible, uses a threadpool
-  to execute in parallel. The pool uses the number of CPUs
-  in the system as the default size.
+  Executes f(a) for a in args using a threadpool to execute in parallel.
+  The pool uses the smaller of 4 and the number of CPUs in the system
+  as the pool size.
   """
-  pool = None
-  try:
-    import multiprocessing.pool
-    pool = multiprocessing.pool.ThreadPool(processes=min(multiprocessing.cpu_count(), 4))
-    return pool.map(f, args, 1)
-  except ImportError:
-    # multiprocessing was introduced in Python 2.6.
-    # For older Pythons (CentOS 5), degrade to single-threaded execution:
-    return [ f(a) for a in args ]
+  pool = multiprocessing.pool.ThreadPool(processes=min(multiprocessing.cpu_count(), 4))
+  return pool.map(f, args, 1)
 
 def download_cdh_components(toolchain_root, cdh_components):
   """Downloads and unpacks the CDH components into $CDH_COMPONENTS_HOME if not found."""

http://git-wip-us.apache.org/repos/asf/impala/blob/df3b5463/infra/python/deps/pip_download.py
----------------------------------------------------------------------
diff --git a/infra/python/deps/pip_download.py b/infra/python/deps/pip_download.py
index 0cce9e9..3e593c4 100755
--- a/infra/python/deps/pip_download.py
+++ b/infra/python/deps/pip_download.py
@@ -22,6 +22,7 @@
 # This script requires Python 2.6+.
 
 import hashlib
+import multiprocessing.pool
 import os
 import os.path
 import re
@@ -130,30 +131,29 @@ def main():
     download_package(pkg_name, pkg_version)
     return
 
+  pool = multiprocessing.pool.ThreadPool(processes=min(multiprocessing.cpu_count(), 4))
+  results = []
+
   for requirements_file in REQUIREMENTS_FILES:
     # If the package name and version are not specified in the command line arguments,
     # download the packages that in requirements.txt.
-    f = open(requirements_file, 'r')
-    try:
-      # requirements.txt follows the standard pip grammar.
-      for line in f:
-        # A hash symbol ("#") represents a comment that should be ignored.
-        hash_index = line.find('#')
-        if hash_index != -1:
-          line = line[:hash_index]
-        # A semi colon (";") specifies some additional condition for when the package
-        # should be installed (for example a specific OS). We can ignore this and download
-        # the package anyways because the installation script(bootstrap_virtualenv.py) can
-        # take it into account.
-        semi_colon_index = line.find(';')
-        if semi_colon_index != -1:
-          line = line[:semi_colon_index]
-        l = line.strip()
-        if len(l) > 0:
-          pkg_name, pkg_version = l.split('==')
-          download_package(pkg_name.strip(), pkg_version.strip())
-    finally:
-      f.close()
+    # requirements.txt follows the standard pip grammar.
+    for line in open(requirements_file):
+      # A hash symbol ("#") represents a comment that should be ignored.
+      line = line.split("#")[0]
+      # A semi colon (";") specifies some additional condition for when the package
+      # should be installed (for example a specific OS). We can ignore this and download
+      # the package anyways because the installation script(bootstrap_virtualenv.py) can
+      # take it into account.
+      l = line.split(";")[0].strip()
+      if not l:
+        continue
+      pkg_name, pkg_version = l.split('==')
+      results.append(pool.apply_async(
+        download_package, args=[pkg_name.strip(), pkg_version.strip()]))
+
+    for x in results:
+      x.get()
 
 if __name__ == '__main__':
   main()


[2/8] impala git commit: IMPALA-6882: prevent instr. hoist from CpuInfo::IsSupported()

Posted by ta...@apache.org.
IMPALA-6882: prevent instr. hoist from CpuInfo::IsSupported()

Marking the __asm__ with __volatile__ *should* prevent the compiler from
speculatively executing the instruction before the branch.

Testing:
Added a regression test that tries to emulate the problematic pattern,
but I was unable to reproduce a crash on a system with popcnt support -
there are probably some instruction scheduling differences.

Manually checked that popcnt was inside the expected branch:

  (gdb) disassemble /s impala::HdfsScanNodeBase::StopAndFinalizeCounters
  ...
  160         if (LIKELY(CpuInfo::IsSupported(CpuInfo::POPCNT))) {
     0x0000000000e7261b <+491>:   and    $0x10,%esi
     0x0000000000e7261e <+494>:   je     0xe73031 <impala::HdfsScanNodeBase::StopAndFinalizeCounters()+3073>

  be/src/util/sse-util.h:
  147       __asm__ __volatile__("popcntq %1, %0" : "=r"(result) : "mr"(a) : "cc");
     0x0000000000e72624 <+500>:   popcnt %rdx,%rdx
     0x0000000000e72629 <+505>:   movslq %edx,%rsi
  ...

Change-Id: I9ec51bdfcb9455c93ff69827929a59fcccb81b80
Reviewed-on: http://gerrit.cloudera.org:8080/10243
Reviewed-by: Jim Apple <jb...@apache.org>
Tested-by: Impala Public Jenkins <im...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/5ba46439
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/5ba46439
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/5ba46439

Branch: refs/heads/2.x
Commit: 5ba46439d16464118373982b8d49461f32d73459
Parents: df3b546
Author: Tim Armstrong <ta...@cloudera.com>
Authored: Mon Apr 30 10:09:04 2018 -0700
Committer: Impala Public Jenkins <im...@gerrit.cloudera.org>
Committed: Thu May 3 19:59:25 2018 +0000

----------------------------------------------------------------------
 be/src/util/bit-util-test.cc | 23 +++++++++++++++++++++++
 be/src/util/sse-util.h       | 26 +++++++++++++++-----------
 2 files changed, 38 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/5ba46439/be/src/util/bit-util-test.cc
----------------------------------------------------------------------
diff --git a/be/src/util/bit-util-test.cc b/be/src/util/bit-util-test.cc
index 6f70727..7c70e6e 100644
--- a/be/src/util/bit-util-test.cc
+++ b/be/src/util/bit-util-test.cc
@@ -304,6 +304,29 @@ TEST(BitUtil, RoundUpDown) {
   EXPECT_EQ(BitUtil::RoundDownNumi64(65), 1);
 }
 
+// Prevent inlining so that the compiler can't optimize out the check.
+__attribute__((noinline))
+int CpuInfoIsSupportedHoistHelper(int64_t cpu_info_flag, int arg) {
+  if (CpuInfo::IsSupported(cpu_info_flag)) {
+    // Assembly follows similar pattern to popcnt instruction but executes
+    // illegal instruction.
+    int64_t result;
+    __asm__ __volatile__("ud2" : "=a"(result): "mr"(arg): "cc");
+    return result;
+  } else {
+    return 12345;
+  }
+}
+
+// Regression test for IMPALA-6882 - make sure illegal instruction isn't hoisted out of
+// CpuInfo::IsSupported() checks. This doesn't test the bug precisely but is a canary for
+// this kind of optimization happening.
+TEST(BitUtil, CpuInfoIsSupportedHoist) {
+  constexpr int64_t CPU_INFO_FLAG = CpuInfo::SSSE3;
+  CpuInfo::TempDisable disable_sssse3(CPU_INFO_FLAG);
+  EXPECT_EQ(12345, CpuInfoIsSupportedHoistHelper(CPU_INFO_FLAG, 0));
+}
+
 }
 
 IMPALA_TEST_MAIN();

http://git-wip-us.apache.org/repos/asf/impala/blob/5ba46439/be/src/util/sse-util.h
----------------------------------------------------------------------
diff --git a/be/src/util/sse-util.h b/be/src/util/sse-util.h
index e55a248..8837718 100644
--- a/be/src/util/sse-util.h
+++ b/be/src/util/sse-util.h
@@ -72,9 +72,13 @@ namespace SSEUtil {
   };
 }
 
-/// Define the SSE 4.2 intrinsics.  The caller must first verify at runtime (or codegen
-/// IR load time) that the processor supports SSE 4.2 before calling these.  These are
-/// defined outside the namespace because the IR w/ SSE 4.2 case needs to use macros.
+/// Define the SSE 4.2 intrinsics. The caller must first verify at runtime (or codegen
+/// IR load time) that the processor supports SSE 4.2 before calling these. All __asm__
+/// blocks are marked __volatile__ to prevent hoisting the ASM out of checks for CPU
+/// support (e.g. IMPALA-6882).
+///
+/// These intrinsics are defined outside the namespace because the IR w/ SSE 4.2 case
+/// needs to use macros.
 #ifndef IR_COMPILE
 /// When compiling to native code (i.e. not IR), we cannot use the -msse4.2 compiler
 /// flag.  Otherwise, the compiler will emit SSE 4.2 instructions outside of the runtime
@@ -99,11 +103,11 @@ static inline __m128i SSE4_cmpestrm(__m128i str1, int len1, __m128i str2, int le
   /// Use asm reg rather than Yz output constraint to workaround LLVM bug 13199 -
   /// clang doesn't support Y-prefixed asm constraints.
   register volatile __m128i result asm ("xmm0");
-  __asm__ volatile ("pcmpestrm %5, %2, %1"
+  __asm__ __volatile__ ("pcmpestrm %5, %2, %1"
       : "=x"(result) : "x"(str1), "xm"(str2), "a"(len1), "d"(len2), "i"(MODE) : "cc");
 #else
   __m128i result;
-  __asm__ volatile ("pcmpestrm %5, %2, %1"
+  __asm__ __volatile__ ("pcmpestrm %5, %2, %1"
       : "=Yz"(result) : "x"(str1), "xm"(str2), "a"(len1), "d"(len2), "i"(MODE) : "cc");
 #endif
   return result;
@@ -112,35 +116,35 @@ static inline __m128i SSE4_cmpestrm(__m128i str1, int len1, __m128i str2, int le
 template<int MODE>
 static inline int SSE4_cmpestri(__m128i str1, int len1, __m128i str2, int len2) {
   int result;
-  __asm__("pcmpestri %5, %2, %1"
+  __asm__ __volatile__("pcmpestri %5, %2, %1"
       : "=c"(result) : "x"(str1), "xm"(str2), "a"(len1), "d"(len2), "i"(MODE) : "cc");
   return result;
 }
 
 static inline uint32_t SSE4_crc32_u8(uint32_t crc, uint8_t v) {
-  __asm__("crc32b %1, %0" : "+r"(crc) : "rm"(v));
+  __asm__ __volatile__("crc32b %1, %0" : "+r"(crc) : "rm"(v));
   return crc;
 }
 
 static inline uint32_t SSE4_crc32_u16(uint32_t crc, uint16_t v) {
-  __asm__("crc32w %1, %0" : "+r"(crc) : "rm"(v));
+  __asm__ __volatile__("crc32w %1, %0" : "+r"(crc) : "rm"(v));
   return crc;
 }
 
 static inline uint32_t SSE4_crc32_u32(uint32_t crc, uint32_t v) {
-  __asm__("crc32l %1, %0" : "+r"(crc) : "rm"(v));
+  __asm__ __volatile__("crc32l %1, %0" : "+r"(crc) : "rm"(v));
   return crc;
 }
 
 static inline uint32_t SSE4_crc32_u64(uint32_t crc, uint64_t v) {
   uint64_t result = crc;
-  __asm__("crc32q %1, %0" : "+r"(result) : "rm"(v));
+  __asm__ __volatile__("crc32q %1, %0" : "+r"(result) : "rm"(v));
   return result;
 }
 
 static inline int64_t POPCNT_popcnt_u64(uint64_t a) {
   int64_t result;
-  __asm__("popcntq %1, %0" : "=r"(result) : "mr"(a) : "cc");
+  __asm__ __volatile__("popcntq %1, %0" : "=r"(result) : "mr"(a) : "cc");
   return result;
 }