Posted to commits@impala.apache.org by ta...@apache.org on 2017/11/07 18:13:57 UTC

[1/3] incubator-impala git commit: IMPALA-2494: Support for byte array encoded decimals in Parquet scanner

Repository: incubator-impala
Updated Branches:
  refs/heads/master e6c3a01b9 -> 216642e28


IMPALA-2494: Support for byte array encoded decimals in Parquet scanner

Extends the Parquet column reader and associated classes to allow for
more than one possible physical type for a given logical type. This
patch only adds support for variable-length byte array encoded
decimals; support for more types will be added in upcoming commits.
Also, column-level metadata verification, which was previously done for
every row group, is now done only once per column per file.
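
As a minimal self-contained sketch of the pattern (illustrative names and a
simplified 4-byte decimal, not the actual Impala classes), decode functions
now take both the internal value type and the Parquet physical type as
template parameters, so one logical type such as DECIMAL can be decoded from
more than one physical encoding:

  #include <cstdint>
  #include <cstring>

  enum class PhysicalType { INT32, BYTE_ARRAY, FIXED_LEN_BYTE_ARRAY };

  // Primary template: plain fixed-width copy of a little-endian value.
  template <typename InternalType, PhysicalType PARQUET_TYPE>
  int Decode(const uint8_t* buf, const uint8_t* end, int fixed_len_size,
      InternalType* v) {
    if (end - buf < static_cast<int>(sizeof(InternalType))) return -1;
    std::memcpy(v, buf, sizeof(InternalType));
    return static_cast<int>(sizeof(InternalType));
  }

  // Full specialization: a decimal (unscaled value held in an int32_t) stored
  // as a length-prefixed byte array of big-endian two's complement bytes.
  template <>
  int Decode<int32_t, PhysicalType::BYTE_ARRAY>(
      const uint8_t* buf, const uint8_t* end, int /*fixed_len_size*/, int32_t* v) {
    int32_t len;
    if (end - buf < static_cast<int>(sizeof(len))) return -1;
    std::memcpy(&len, buf, sizeof(len));
    // Sketch: reject values that cannot fit in the 4-byte internal type.
    if (len < 0 || len > 4 || end - buf < static_cast<int>(sizeof(len)) + len) {
      return -1;
    }
    uint32_t out = (len > 0 && (buf[4] & 0x80)) ? ~0u : 0u;  // sign-extend
    for (int i = 0; i < len; ++i) out = (out << 8) | buf[4 + i];
    *v = static_cast<int32_t>(out);
    return static_cast<int>(sizeof(len)) + len;
  }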

Testing:
Added a backend test verifying that the newly added decimal types are
decoded correctly.
Added a query test that decodes both plain and dictionary-encoded
decimals using binary encoding.

Performance:
Initial perf testing using tpcds_1000 shows no regression.

Change-Id: I2c0e881045109f337fecba53fec21f9cfb9e619e
Reviewed-on: http://gerrit.cloudera.org:8080/7822
Reviewed-by: Bikramjeet Vig <bi...@cloudera.com>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/94236ff2
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/94236ff2
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/94236ff2

Branch: refs/heads/master
Commit: 94236ff2ff6e3d6d25a80150c98d4275914dc8c2
Parents: e6c3a01
Author: Bikramjeet Vig <bi...@cloudera.com>
Authored: Mon Aug 21 15:30:33 2017 -0700
Committer: Impala Public Jenkins <im...@gerrit.cloudera.org>
Committed: Tue Nov 7 04:34:26 2017 +0000

----------------------------------------------------------------------
 be/src/exec/data-source-scan-node.cc            |  16 +-
 be/src/exec/hdfs-parquet-scanner.cc             |   9 +-
 be/src/exec/hdfs-parquet-scanner.h              |   6 +-
 be/src/exec/hdfs-parquet-table-writer.cc        |   4 +-
 be/src/exec/parquet-column-readers.cc           | 134 ++++++++-----
 be/src/exec/parquet-column-stats.cc             |  38 ++--
 be/src/exec/parquet-column-stats.h              |   3 +-
 be/src/exec/parquet-column-stats.inline.h       |  21 +-
 be/src/exec/parquet-common.h                    | 183 ++++++++++++-----
 be/src/exec/parquet-metadata-utils.cc           | 160 ++++++++-------
 be/src/exec/parquet-metadata-utils.h            |  11 +-
 be/src/exec/parquet-plain-test.cc               | 198 ++++++++++++++-----
 be/src/util/dict-encoding.h                     |   6 +-
 be/src/util/dict-test.cc                        |  84 ++++----
 testdata/data/README                            |   7 +
 testdata/data/binary_decimal_dictionary.parquet | Bin 0 -> 1222 bytes
 .../data/binary_decimal_no_dictionary.parquet   | Bin 0 -> 1211 bytes
 .../QueryTest/parquet-decimal-formats.test      |  25 +++
 tests/query_test/test_scanners.py               |  18 ++
 19 files changed, 637 insertions(+), 286 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/94236ff2/be/src/exec/data-source-scan-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/data-source-scan-node.cc b/be/src/exec/data-source-scan-node.cc
index 2639510..a836033 100644
--- a/be/src/exec/data-source-scan-node.cc
+++ b/be/src/exec/data-source-scan-node.cc
@@ -23,6 +23,7 @@
 #include "exec/parquet-common.h"
 #include "exec/read-write-util.h"
 #include "exprs/scalar-expr.h"
+#include "gen-cpp/parquet_types.h"
 #include "runtime/mem-pool.h"
 #include "runtime/mem-tracker.h"
 #include "runtime/runtime-state.h"
@@ -172,24 +173,27 @@ inline Status SetDecimalVal(const ColumnType& type, char* bytes, int len,
   switch (type.GetByteSize()) {
     case 4: {
       Decimal4Value* val = reinterpret_cast<Decimal4Value*>(slot);
-      if (UNLIKELY(len > sizeof(Decimal4Value) ||
-          ParquetPlainEncoder::Decode(buffer, buffer + len, len, val) < 0)) {
+      if (UNLIKELY(len > sizeof(Decimal4Value) || (ParquetPlainEncoder::Decode
+          <Decimal4Value, parquet::Type::FIXED_LEN_BYTE_ARRAY>(buffer, buffer + len, len,
+              val)) < 0)) {
         return Status(ERROR_INVALID_DECIMAL);
       }
       break;
     }
     case 8: {
       Decimal8Value* val = reinterpret_cast<Decimal8Value*>(slot);
-      if (UNLIKELY(len > sizeof(Decimal8Value) ||
-          ParquetPlainEncoder::Decode(buffer, buffer + len, len, val) < 0)) {
+      if (UNLIKELY(len > sizeof(Decimal8Value) || (ParquetPlainEncoder::Decode
+          <Decimal8Value, parquet::Type::FIXED_LEN_BYTE_ARRAY>(buffer, buffer + len, len,
+              val)) < 0)) {
         return Status(ERROR_INVALID_DECIMAL);
       }
       break;
     }
     case 16: {
       Decimal16Value* val = reinterpret_cast<Decimal16Value*>(slot);
-      if (UNLIKELY(len > sizeof(Decimal16Value) ||
-          ParquetPlainEncoder::Decode(buffer, buffer + len, len, val) < 0)) {
+      if (UNLIKELY(len > sizeof(Decimal16Value) || (ParquetPlainEncoder::Decode
+          <Decimal16Value, parquet::Type::FIXED_LEN_BYTE_ARRAY>(buffer, buffer + len, len,
+              val)) < 0)) {
         return Status(ERROR_INVALID_DECIMAL);
       }
       break;

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/94236ff2/be/src/exec/hdfs-parquet-scanner.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-parquet-scanner.cc b/be/src/exec/hdfs-parquet-scanner.cc
index b301ea0..e35f64a 100644
--- a/be/src/exec/hdfs-parquet-scanner.cc
+++ b/be/src/exec/hdfs-parquet-scanner.cc
@@ -1485,6 +1485,9 @@ Status HdfsParquetScanner::CreateColumnReaders(const TupleDescriptor& tuple_desc
       continue;
     }
 
+    RETURN_IF_ERROR(ParquetMetadataUtils::ValidateColumn(filename(), *node->element,
+        slot_desc, state_));
+
     ParquetColumnReader* col_reader = ParquetColumnReader::Create(
         *node, slot_desc->type().IsCollectionType(), slot_desc, this);
     column_readers->push_back(col_reader);
@@ -1620,9 +1623,9 @@ Status HdfsParquetScanner::InitColumns(
           col_chunk.meta_data.num_values, num_values, filename());
     }
 
-    RETURN_IF_ERROR(ParquetMetadataUtils::ValidateColumn(file_metadata_, filename(),
-        row_group_idx, scalar_reader->col_idx(), scalar_reader->schema_element(),
-        scalar_reader->slot_desc_, state_));
+    RETURN_IF_ERROR(ParquetMetadataUtils::ValidateRowGroupColumn(file_metadata_,
+        filename(), row_group_idx, scalar_reader->col_idx(),
+        scalar_reader->schema_element(), state_));
 
     if (col_chunk.meta_data.__isset.dictionary_page_offset) {
       // Already validated in ValidateColumnOffsets()

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/94236ff2/be/src/exec/hdfs-parquet-scanner.h
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-parquet-scanner.h b/be/src/exec/hdfs-parquet-scanner.h
index a5333fe..e4b6ae7 100644
--- a/be/src/exec/hdfs-parquet-scanner.h
+++ b/be/src/exec/hdfs-parquet-scanner.h
@@ -43,7 +43,8 @@ class ParquetLevelDecoder;
 class ParquetColumnReader;
 class CollectionColumnReader;
 class BaseScalarColumnReader;
-template<typename T, bool MATERIALIZED> class ScalarColumnReader;
+template<typename InternalType, parquet::Type::type PARQUET_TYPE, bool MATERIALIZED>
+class ScalarColumnReader;
 class BoolColumnReader;
 
 /// This scanner parses Parquet files located in HDFS, and writes the content as tuples in
@@ -354,7 +355,8 @@ class HdfsParquetScanner : public HdfsScanner {
   friend class ParquetColumnReader;
   friend class CollectionColumnReader;
   friend class BaseScalarColumnReader;
-  template<typename T, bool MATERIALIZED> friend class ScalarColumnReader;
+  template<typename InternalType, parquet::Type::type PARQUET_TYPE, bool MATERIALIZED>
+  friend class ScalarColumnReader;
   friend class BoolColumnReader;
 
   /// Size of the file footer.  This is a guess.  If this value is too little, we will

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/94236ff2/be/src/exec/hdfs-parquet-table-writer.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-parquet-table-writer.cc b/be/src/exec/hdfs-parquet-table-writer.cc
index 4bbadb4..8c9d92e 100644
--- a/be/src/exec/hdfs-parquet-table-writer.cc
+++ b/be/src/exec/hdfs-parquet-table-writer.cc
@@ -859,7 +859,7 @@ Status HdfsParquetTableWriter::CreateSchema() {
     parquet::SchemaElement& node = file_metadata_.schema[i + 1];
     const ColumnType& type = output_expr_evals_[i]->root().type();
     node.name = table_desc_->col_descs()[i + num_clustering_cols].name();
-    node.__set_type(IMPALA_TO_PARQUET_TYPES[type.type]);
+    node.__set_type(INTERNAL_TO_PARQUET_TYPES[type.type]);
     node.__set_repetition_type(FieldRepetitionType::OPTIONAL);
     if (type.type == TYPE_DECIMAL) {
       // This column is type decimal. Update the file metadata to include the
@@ -892,7 +892,7 @@ Status HdfsParquetTableWriter::AddRowGroup() {
   current_row_group_->columns.resize(columns_.size());
   for (int i = 0; i < columns_.size(); ++i) {
     ColumnMetaData metadata;
-    metadata.type = IMPALA_TO_PARQUET_TYPES[columns_[i]->type().type];
+    metadata.type = INTERNAL_TO_PARQUET_TYPES[columns_[i]->type().type];
     metadata.path_in_schema.push_back(
         table_desc_->col_descs()[i + num_clustering_cols].name());
     metadata.codec = columns_[i]->codec();

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/94236ff2/be/src/exec/parquet-column-readers.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/parquet-column-readers.cc b/be/src/exec/parquet-column-readers.cc
index 6d211a6..15c963d 100644
--- a/be/src/exec/parquet-column-readers.cc
+++ b/be/src/exec/parquet-column-readers.cc
@@ -208,10 +208,13 @@ bool ParquetLevelDecoder::FillCache(int batch_size,
   return true;
 }
 
-/// Per column type reader. If MATERIALIZED is true, the column values are materialized
-/// into the slot described by slot_desc. If MATERIALIZED is false, the column values
-/// are not materialized, but the position can be accessed.
-template<typename T, bool MATERIALIZED>
+/// Per column type reader. InternalType is the datatype that Impala uses internally to
+/// store tuple data and PARQUET_TYPE is the corresponding primitive datatype (as defined
+/// in the parquet spec) that is used to store column values in parquet files.
+/// If MATERIALIZED is true, the column values are materialized into the slot described
+/// by slot_desc. If MATERIALIZED is false, the column values are not materialized, but
+/// the position can be accessed.
+template<typename InternalType, parquet::Type::type PARQUET_TYPE, bool MATERIALIZED>
 class ScalarColumnReader : public BaseScalarColumnReader {
  public:
   ScalarColumnReader(HdfsParquetScanner* parent, const SchemaNode& node,
@@ -228,8 +231,9 @@ class ScalarColumnReader : public BaseScalarColumnReader {
 
     DCHECK(slot_desc_ != NULL);
     DCHECK_NE(slot_desc_->type().type, TYPE_BOOLEAN);
-    if (slot_desc_->type().type == TYPE_DECIMAL) {
-      fixed_len_size_ = ParquetPlainEncoder::DecimalSize(slot_desc_->type());
+    if (slot_desc_->type().type == TYPE_DECIMAL
+        && PARQUET_TYPE == parquet::Type::FIXED_LEN_BYTE_ARRAY) {
+      fixed_len_size_ = node.element->type_length;
     } else if (slot_desc_->type().type == TYPE_VARCHAR) {
       fixed_len_size_ = slot_desc_->type().len;
     } else {
@@ -449,7 +453,7 @@ class ScalarColumnReader : public BaseScalarColumnReader {
 
   virtual Status CreateDictionaryDecoder(uint8_t* values, int size,
       DictDecoderBase** decoder) {
-    if (!dict_decoder_.Reset(values, size, fixed_len_size_)) {
+    if (!dict_decoder_.template Reset<PARQUET_TYPE>(values, size, fixed_len_size_)) {
         return Status(TErrorCode::PARQUET_CORRUPT_DICTIONARY, filename(),
             slot_desc_->type().DebugString(), "could not decode dictionary");
     }
@@ -505,8 +509,9 @@ class ScalarColumnReader : public BaseScalarColumnReader {
     void* slot = tuple->GetSlot(tuple_offset_);
     // Use an uninitialized stack allocation for temporary value to avoid running
     // constructors doing work unnecessarily, e.g. if T == StringValue.
-    alignas(T) uint8_t val_buf[sizeof(T)];
-    T* val_ptr = reinterpret_cast<T*>(NEEDS_CONVERSION ? val_buf : slot);
+    alignas(InternalType) uint8_t val_buf[sizeof(InternalType)];
+    InternalType* val_ptr = reinterpret_cast<InternalType*>(
+        NEEDS_CONVERSION ? val_buf : slot);
     if (IS_DICT_ENCODED) {
       DCHECK_EQ(page_encoding_, parquet::Encoding::PLAIN_DICTIONARY);
       if (UNLIKELY(!dict_decoder_.GetNextValue(val_ptr))) {
@@ -515,8 +520,8 @@ class ScalarColumnReader : public BaseScalarColumnReader {
       }
     } else {
       DCHECK_EQ(page_encoding_, parquet::Encoding::PLAIN);
-      int encoded_len =
-          ParquetPlainEncoder::Decode<T>(data_, data_end_, fixed_len_size_, val_ptr);
+      int encoded_len = ParquetPlainEncoder::Decode<InternalType, PARQUET_TYPE>(
+          data_, data_end_, fixed_len_size_, val_ptr);
       if (UNLIKELY(encoded_len < 0)) {
         SetPlainDecodeError();
         return false;
@@ -551,14 +556,14 @@ class ScalarColumnReader : public BaseScalarColumnReader {
   }
 
   /// Converts and writes 'src' into 'slot' based on desc_->type()
-  bool ConvertSlot(const T* src, void* slot, MemPool* pool) {
+  bool ConvertSlot(const InternalType* src, void* slot, MemPool* pool) {
     DCHECK(false);
     return false;
   }
 
   /// Sets error message and returns false if the slot value is invalid, e.g., due to
   /// being out of the valid value range.
-  bool ValidateSlot(T* src, Tuple* tuple) const {
+  bool ValidateSlot(InternalType* src, Tuple* tuple) const {
     DCHECK(false);
     return false;
   }
@@ -574,7 +579,7 @@ class ScalarColumnReader : public BaseScalarColumnReader {
   }
 
   /// Dictionary decoder for decoding column values.
-  DictDecoder<T> dict_decoder_;
+  DictDecoder<InternalType> dict_decoder_;
 
   /// True if dict_decoder_ has been initialized with a dictionary page.
   bool dict_decoder_init_;
@@ -588,12 +593,13 @@ class ScalarColumnReader : public BaseScalarColumnReader {
 };
 
 template<>
-inline bool ScalarColumnReader<StringValue, true>::NeedsConversionInline() const {
+inline bool ScalarColumnReader<StringValue, parquet::Type::BYTE_ARRAY, true>
+::NeedsConversionInline() const {
   return needs_conversion_;
 }
 
 template<>
-bool ScalarColumnReader<StringValue, true>::ConvertSlot(
+bool ScalarColumnReader<StringValue, parquet::Type::BYTE_ARRAY, true>::ConvertSlot(
     const StringValue* src, void* slot, MemPool* pool) {
   DCHECK(slot_desc() != NULL);
   DCHECK(slot_desc()->type().type == TYPE_CHAR);
@@ -606,12 +612,13 @@ bool ScalarColumnReader<StringValue, true>::ConvertSlot(
 }
 
 template<>
-inline bool ScalarColumnReader<TimestampValue, true>::NeedsConversionInline() const {
+inline bool ScalarColumnReader<TimestampValue, parquet::Type::INT96, true>
+::NeedsConversionInline() const {
   return needs_conversion_;
 }
 
 template<>
-bool ScalarColumnReader<TimestampValue, true>::ConvertSlot(
+bool ScalarColumnReader<TimestampValue, parquet::Type::INT96, true>::ConvertSlot(
     const TimestampValue* src, void* slot, MemPool* pool) {
   // Conversion should only happen when this flag is enabled.
   DCHECK(FLAGS_convert_legacy_hive_parquet_utc_timestamps);
@@ -622,12 +629,13 @@ bool ScalarColumnReader<TimestampValue, true>::ConvertSlot(
 }
 
 template<>
-inline bool ScalarColumnReader<TimestampValue, true>::NeedsValidationInline() const {
+inline bool ScalarColumnReader<TimestampValue, parquet::Type::INT96, true>
+::NeedsValidationInline() const {
   return true;
 }
 
 template<>
-bool ScalarColumnReader<TimestampValue, true>::ValidateSlot(
+bool ScalarColumnReader<TimestampValue, parquet::Type::INT96, true>::ValidateSlot(
     TimestampValue* src, Tuple* tuple) const {
   if (UNLIKELY(!TimestampValue::IsValidDate(src->date()))) {
     ErrorMsg msg(TErrorCode::PARQUET_TIMESTAMP_OUT_OF_RANGE,
@@ -808,8 +816,8 @@ Status BaseScalarColumnReader::ReadPageHeader(bool peek,
     DCHECK(stream_->eosr());
     DCHECK_LT(num_values_read_, metadata_->num_values);
     // TODO for 2.3: node_.element->name isn't necessarily useful
-    ErrorMsg msg(TErrorCode::PARQUET_COLUMN_METADATA_INVALID,
-                 metadata_->num_values, num_values_read_, node_.element->name, filename());
+    ErrorMsg msg(TErrorCode::PARQUET_COLUMN_METADATA_INVALID, metadata_->num_values,
+        num_values_read_, node_.element->name, filename());
     RETURN_IF_ERROR(parent_->state_->LogOrReturnError(msg));
     *eos = true;
     return Status::OK();
@@ -1248,8 +1256,46 @@ void CollectionColumnReader::UpdateDerivedState() {
   }
 }
 
+/// Returns a column reader for decimal types based on its size and parquet type.
+static ParquetColumnReader* GetDecimalColumnReader(const SchemaNode& node,
+    const SlotDescriptor* slot_desc, HdfsParquetScanner* parent) {
+  switch (node.element->type) {
+    case parquet::Type::FIXED_LEN_BYTE_ARRAY:
+      switch (slot_desc->type().GetByteSize()) {
+      case 4:
+        return new ScalarColumnReader<Decimal4Value, parquet::Type::FIXED_LEN_BYTE_ARRAY,
+            true>(parent, node, slot_desc);
+      case 8:
+        return new ScalarColumnReader<Decimal8Value, parquet::Type::FIXED_LEN_BYTE_ARRAY,
+            true>(parent, node, slot_desc);
+      case 16:
+        return new ScalarColumnReader<Decimal16Value, parquet::Type::FIXED_LEN_BYTE_ARRAY,
+            true>(parent, node, slot_desc);
+      }
+      break;
+    case parquet::Type::BYTE_ARRAY:
+      switch (slot_desc->type().GetByteSize()) {
+      case 4:
+        return new ScalarColumnReader<Decimal4Value, parquet::Type::BYTE_ARRAY, true>(
+            parent, node, slot_desc);
+      case 8:
+        return new ScalarColumnReader<Decimal8Value, parquet::Type::BYTE_ARRAY, true>(
+            parent, node, slot_desc);
+      case 16:
+        return new ScalarColumnReader<Decimal16Value, parquet::Type::BYTE_ARRAY, true>(
+            parent, node, slot_desc);
+      }
+      break;
+    default:
+      DCHECK(false) << "Invalid decimal primitive type";
+  }
+  DCHECK(false) << "Invalid decimal type";
+  return nullptr;
+}
+
 ParquetColumnReader* ParquetColumnReader::Create(const SchemaNode& node,
-    bool is_collection_field, const SlotDescriptor* slot_desc, HdfsParquetScanner* parent) {
+    bool is_collection_field, const SlotDescriptor* slot_desc,
+    HdfsParquetScanner* parent) {
   ParquetColumnReader* reader = NULL;
   if (is_collection_field) {
     // Create collection reader (note this handles both NULL and non-NULL 'slot_desc')
@@ -1261,46 +1307,41 @@ ParquetColumnReader* ParquetColumnReader::Create(const SchemaNode& node,
         reader = new BoolColumnReader(parent, node, slot_desc);
         break;
       case TYPE_TINYINT:
-        reader = new ScalarColumnReader<int8_t, true>(parent, node, slot_desc);
+        reader = new ScalarColumnReader<int8_t, parquet::Type::INT32, true>(parent, node,
+            slot_desc);
         break;
       case TYPE_SMALLINT:
-        reader = new ScalarColumnReader<int16_t, true>(parent, node, slot_desc);
+        reader = new ScalarColumnReader<int16_t, parquet::Type::INT32, true>(parent, node,
+            slot_desc);
         break;
       case TYPE_INT:
-        reader = new ScalarColumnReader<int32_t, true>(parent, node, slot_desc);
+        reader = new ScalarColumnReader<int32_t, parquet::Type::INT32, true>(parent, node,
+            slot_desc);
         break;
       case TYPE_BIGINT:
-        reader = new ScalarColumnReader<int64_t, true>(parent, node, slot_desc);
+        reader = new ScalarColumnReader<int64_t, parquet::Type::INT64, true>(parent, node,
+            slot_desc);
         break;
       case TYPE_FLOAT:
-        reader = new ScalarColumnReader<float, true>(parent, node, slot_desc);
+        reader = new ScalarColumnReader<float, parquet::Type::FLOAT, true>(parent, node,
+            slot_desc);
         break;
       case TYPE_DOUBLE:
-        reader = new ScalarColumnReader<double, true>(parent, node, slot_desc);
+        reader = new ScalarColumnReader<double, parquet::Type::DOUBLE, true>(parent, node,
+            slot_desc);
         break;
       case TYPE_TIMESTAMP:
-        reader = new ScalarColumnReader<TimestampValue, true>(parent, node, slot_desc);
+        reader = new ScalarColumnReader<TimestampValue, parquet::Type::INT96, true>(
+            parent, node, slot_desc);
         break;
       case TYPE_STRING:
       case TYPE_VARCHAR:
       case TYPE_CHAR:
-        reader = new ScalarColumnReader<StringValue, true>(parent, node, slot_desc);
+        reader = new ScalarColumnReader<StringValue, parquet::Type::BYTE_ARRAY, true>(
+            parent, node, slot_desc);
         break;
       case TYPE_DECIMAL:
-        switch (slot_desc->type().GetByteSize()) {
-          case 4:
-            reader = new ScalarColumnReader<Decimal4Value, true>(
-                parent, node, slot_desc);
-            break;
-          case 8:
-            reader = new ScalarColumnReader<Decimal8Value, true>(
-                parent, node, slot_desc);
-            break;
-          case 16:
-            reader = new ScalarColumnReader<Decimal16Value, true>(
-                parent, node, slot_desc);
-            break;
-        }
+        reader = GetDecimalColumnReader(node, slot_desc, parent);
         break;
       default:
         DCHECK(false) << slot_desc->type().DebugString();
@@ -1309,7 +1350,8 @@ ParquetColumnReader* ParquetColumnReader::Create(const SchemaNode& node,
     // Special case for counting scalar values (e.g. count(*), no materialized columns in
     // the file, only materializing a position slot). We won't actually read any values,
     // only the rep and def levels, so it doesn't matter what kind of reader we make.
-    reader = new ScalarColumnReader<int8_t, false>(parent, node, slot_desc);
+    reader = new ScalarColumnReader<int8_t, parquet::Type::INT32, false>(parent, node,
+        slot_desc);
   }
   return parent->obj_pool_.Add(reader);
 }
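
GetDecimalColumnReader() turns runtime file metadata (the physical type and
the decimal slot's byte size) into a compile-time template choice, so the
per-value decode path carries no runtime branching. A self-contained sketch
of that dispatch shape, with hypothetical names:

  #include <memory>

  enum class PhysicalType { BYTE_ARRAY, FIXED_LEN_BYTE_ARRAY };
  struct Decimal4 {};
  struct Decimal8 {};

  struct ColumnReader { virtual ~ColumnReader() = default; };

  // Each instantiation compiles its own specialized decode loop.
  template <typename DecimalType, PhysicalType PARQUET_TYPE>
  struct ScalarReader : ColumnReader {};

  std::unique_ptr<ColumnReader> MakeDecimalReader(PhysicalType t, int byte_size) {
    switch (t) {
      case PhysicalType::FIXED_LEN_BYTE_ARRAY:
        if (byte_size == 4) {
          return std::make_unique<
              ScalarReader<Decimal4, PhysicalType::FIXED_LEN_BYTE_ARRAY>>();
        }
        return std::make_unique<
            ScalarReader<Decimal8, PhysicalType::FIXED_LEN_BYTE_ARRAY>>();
      case PhysicalType::BYTE_ARRAY:
        if (byte_size == 4) {
          return std::make_unique<ScalarReader<Decimal4, PhysicalType::BYTE_ARRAY>>();
        }
        return std::make_unique<ScalarReader<Decimal8, PhysicalType::BYTE_ARRAY>>();
    }
    return nullptr;  // unreachable for valid metadata
  }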

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/94236ff2/be/src/exec/parquet-column-stats.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/parquet-column-stats.cc b/be/src/exec/parquet-column-stats.cc
index 76b3365..4443309 100644
--- a/be/src/exec/parquet-column-stats.cc
+++ b/be/src/exec/parquet-column-stats.cc
@@ -61,11 +61,13 @@ bool ColumnStatsBase::ReadFromThrift(const parquet::ColumnChunk& col_chunk,
 
   switch (col_type.type) {
     case TYPE_BOOLEAN:
-      return ColumnStats<bool>::DecodePlainValue(*stat_value, slot);
+      return ColumnStats<bool>::DecodePlainValue(*stat_value, slot,
+          parquet::Type::BOOLEAN);
     case TYPE_TINYINT: {
       // parquet::Statistics encodes INT_8 values using 4 bytes.
       int32_t col_stats;
-      bool ret = ColumnStats<int32_t>::DecodePlainValue(*stat_value, &col_stats);
+      bool ret = ColumnStats<int32_t>::DecodePlainValue(*stat_value, &col_stats,
+          parquet::Type::INT32);
       if (!ret || col_stats < std::numeric_limits<int8_t>::min() ||
           col_stats > std::numeric_limits<int8_t>::max()) {
         return false;
@@ -76,7 +78,8 @@ bool ColumnStatsBase::ReadFromThrift(const parquet::ColumnChunk& col_chunk,
     case TYPE_SMALLINT: {
       // parquet::Statistics encodes INT_16 values using 4 bytes.
       int32_t col_stats;
-      bool ret = ColumnStats<int32_t>::DecodePlainValue(*stat_value, &col_stats);
+      bool ret = ColumnStats<int32_t>::DecodePlainValue(*stat_value, &col_stats,
+          parquet::Type::INT32);
       if (!ret || col_stats < std::numeric_limits<int16_t>::min() ||
           col_stats > std::numeric_limits<int16_t>::max()) {
         return false;
@@ -85,18 +88,24 @@ bool ColumnStatsBase::ReadFromThrift(const parquet::ColumnChunk& col_chunk,
       return true;
     }
     case TYPE_INT:
-      return ColumnStats<int32_t>::DecodePlainValue(*stat_value, slot);
+      return ColumnStats<int32_t>::DecodePlainValue(*stat_value, slot,
+          col_chunk.meta_data.type);
     case TYPE_BIGINT:
-      return ColumnStats<int64_t>::DecodePlainValue(*stat_value, slot);
+      return ColumnStats<int64_t>::DecodePlainValue(*stat_value, slot,
+          col_chunk.meta_data.type);
     case TYPE_FLOAT:
-      return ColumnStats<float>::DecodePlainValue(*stat_value, slot);
+      return ColumnStats<float>::DecodePlainValue(*stat_value, slot,
+          col_chunk.meta_data.type);
     case TYPE_DOUBLE:
-      return ColumnStats<double>::DecodePlainValue(*stat_value, slot);
+      return ColumnStats<double>::DecodePlainValue(*stat_value, slot,
+          col_chunk.meta_data.type);
     case TYPE_TIMESTAMP:
-      return ColumnStats<TimestampValue>::DecodePlainValue(*stat_value, slot);
+      return ColumnStats<TimestampValue>::DecodePlainValue(*stat_value, slot,
+          col_chunk.meta_data.type);
     case TYPE_STRING:
     case TYPE_VARCHAR:
-      return ColumnStats<StringValue>::DecodePlainValue(*stat_value, slot);
+      return ColumnStats<StringValue>::DecodePlainValue(*stat_value, slot,
+          col_chunk.meta_data.type);
     case TYPE_CHAR:
       /// We don't read statistics for CHAR columns, since CHAR support is broken in
       /// Impala (IMPALA-1652).
@@ -104,12 +113,15 @@ bool ColumnStatsBase::ReadFromThrift(const parquet::ColumnChunk& col_chunk,
     case TYPE_DECIMAL:
       switch (col_type.GetByteSize()) {
         case 4:
-          return ColumnStats<Decimal4Value>::DecodePlainValue(*stat_value, slot);
+          return ColumnStats<Decimal4Value>::DecodePlainValue(*stat_value, slot,
+              col_chunk.meta_data.type);
         case 8:
-          return ColumnStats<Decimal8Value>::DecodePlainValue(*stat_value, slot);
+          return ColumnStats<Decimal8Value>::DecodePlainValue(*stat_value, slot,
+              col_chunk.meta_data.type);
         case 16:
-          return ColumnStats<Decimal16Value>::DecodePlainValue(*stat_value, slot);
-      }
+          return ColumnStats<Decimal16Value>::DecodePlainValue(*stat_value, slot,
+              col_chunk.meta_data.type);
+        }
       DCHECK(false) << "Unknown decimal byte size: " << col_type.GetByteSize();
     default:
       DCHECK(false) << col_type.DebugString();

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/94236ff2/be/src/exec/parquet-column-stats.h
----------------------------------------------------------------------
diff --git a/be/src/exec/parquet-column-stats.h b/be/src/exec/parquet-column-stats.h
index 7278cdc..0ff277c 100644
--- a/be/src/exec/parquet-column-stats.h
+++ b/be/src/exec/parquet-column-stats.h
@@ -181,7 +181,8 @@ class ColumnStats : public ColumnStatsBase {
   /// Decodes the plain encoded stats value from 'buffer' and writes the result into the
   /// buffer pointed to by 'slot'. Returns true if decoding was successful, false
   /// otherwise. For timestamps, an additional validation will be performed.
-  static bool DecodePlainValue(const std::string& buffer, void* slot);
+  static bool DecodePlainValue(const std::string& buffer, void* slot,
+      parquet::Type::type parquet_type);
 
   /// Returns the number of bytes needed to encode value 'v'.
   int64_t BytesNeeded(const T& v) const;

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/94236ff2/be/src/exec/parquet-column-stats.inline.h
----------------------------------------------------------------------
diff --git a/be/src/exec/parquet-column-stats.inline.h b/be/src/exec/parquet-column-stats.inline.h
index c0e90aa..5b67ee7 100644
--- a/be/src/exec/parquet-column-stats.inline.h
+++ b/be/src/exec/parquet-column-stats.inline.h
@@ -19,6 +19,7 @@
 #define IMPALA_EXEC_PARQUET_COLUMN_STATS_INLINE_H
 
 #include "exec/parquet-common.h"
+#include "gen-cpp/parquet_types.h"
 #include "parquet-column-stats.h"
 #include "runtime/string-value.inline.h"
 
@@ -79,11 +80,15 @@ inline void ColumnStats<T>::EncodePlainValue(
 }
 
 template <typename T>
-inline bool ColumnStats<T>::DecodePlainValue(const std::string& buffer, void* slot) {
+inline bool ColumnStats<T>::DecodePlainValue(const std::string& buffer, void* slot,
+    parquet::Type::type parquet_type) {
   T* result = reinterpret_cast<T*>(slot);
   int size = buffer.size();
   const uint8_t* data = reinterpret_cast<const uint8_t*>(buffer.data());
-  if (ParquetPlainEncoder::Decode(data, data + size, size, result) == -1) return false;
+  if (ParquetPlainEncoder::DecodeByParquetType<T>(data, data + size, size, result,
+      parquet_type) == -1) {
+    return false;
+  }
   return true;
 }
 
@@ -103,7 +108,8 @@ inline void ColumnStats<bool>::EncodePlainValue(
 }
 
 template <>
-inline bool ColumnStats<bool>::DecodePlainValue(const std::string& buffer, void* slot) {
+inline bool ColumnStats<bool>::DecodePlainValue(const std::string& buffer, void* slot,
+    parquet::Type::type parquet_type) {
   bool* result = reinterpret_cast<bool*>(slot);
   DCHECK(buffer.size() == 1);
   *result = (buffer[0] != 0);
@@ -118,11 +124,14 @@ inline int64_t ColumnStats<bool>::BytesNeeded(const bool& v) const {
 /// Timestamp values need validation.
 template <>
 inline bool ColumnStats<TimestampValue>::DecodePlainValue(
-    const std::string& buffer, void* slot) {
+    const std::string& buffer, void* slot, parquet::Type::type parquet_type) {
   TimestampValue* result = reinterpret_cast<TimestampValue*>(slot);
   int size = buffer.size();
   const uint8_t* data = reinterpret_cast<const uint8_t*>(buffer.data());
-  if (ParquetPlainEncoder::Decode(data, data + size, size, result) == -1) return false;
+  if (ParquetPlainEncoder::Decode<TimestampValue, parquet::Type::INT96>(data, data + size,
+      size, result) == -1) {
+    return false;
+  }
   // We don't need to convert the value here, since we don't support reading timestamp
   // statistics written by Hive / old versions of parquet-mr. Should Hive add support for
   // writing new statistics for the deprecated timestamp type, we will have to add support
@@ -139,7 +148,7 @@ inline void ColumnStats<StringValue>::EncodePlainValue(
 
 template <>
 inline bool ColumnStats<StringValue>::DecodePlainValue(
-    const std::string& buffer, void* slot) {
+    const std::string& buffer, void* slot, parquet::Type::type parquet_type) {
   StringValue* result = reinterpret_cast<StringValue*>(slot);
   result->ptr = const_cast<char*>(buffer.data());
   result->len = buffer.size();
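
Statistics are the one place where the physical type is dispatched at runtime
rather than through a template argument: the column chunk's type is only known
from the file metadata, and this path is not per-value hot. A minimal sketch of
a hypothetical caller (DecodeByParquetType() is added to parquet-common.h
below; the wrapper function and its name are illustrative):

  #include <cstdint>
  #include <string>

  #include "exec/parquet-common.h"
  #include "gen-cpp/parquet_types.h"

  // Decodes a plain-encoded statistic whose physical type comes from the
  // file metadata at runtime. Returns false if the value is undecodable.
  bool DecodeStat(const std::string& stat_value, parquet::Type::type physical_type,
      int64_t* v) {
    const uint8_t* data = reinterpret_cast<const uint8_t*>(stat_value.data());
    int size = stat_value.size();
    return impala::ParquetPlainEncoder::DecodeByParquetType<int64_t>(
        data, data + size, size, v, physical_type) != -1;
  }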

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/94236ff2/be/src/exec/parquet-common.h
----------------------------------------------------------------------
diff --git a/be/src/exec/parquet-common.h b/be/src/exec/parquet-common.h
index 53de54a..91712e4 100644
--- a/be/src/exec/parquet-common.h
+++ b/be/src/exec/parquet-common.h
@@ -35,9 +35,9 @@ class TimestampValue;
 const uint8_t PARQUET_VERSION_NUMBER[4] = {'P', 'A', 'R', '1'};
 const uint32_t PARQUET_CURRENT_VERSION = 1;
 
-/// Mapping of impala types to parquet storage types.  This is indexed by
+/// Mapping of impala's internal types to parquet storage types. This is indexed by
 /// PrimitiveType enum
-const parquet::Type::type IMPALA_TO_PARQUET_TYPES[] = {
+const parquet::Type::type INTERNAL_TO_PARQUET_TYPES[] = {
   parquet::Type::BOOLEAN,     // Invalid
   parquet::Type::BOOLEAN,     // NULL type
   parquet::Type::BOOLEAN,
@@ -84,9 +84,10 @@ const parquet::CompressionCodec::type IMPALA_TO_PARQUET_CODEC[] = {
 /// calls.
 class ParquetPlainEncoder {
  public:
-  /// Returns the byte size of 'v'.
-  template <typename T>
-  static int ByteSize(const T& v) { return sizeof(T); }
+  /// Returns the byte size of 'v' where InternalType is the datatype that Impala uses
+  /// internally to store tuple data.
+  template <typename InternalType>
+  static int ByteSize(const InternalType& v) { return sizeof(InternalType); }
 
   /// Returns the encoded size of values of type t. Returns -1 if it is variable
   /// length. This can be different than the slot size of the types.
@@ -167,19 +168,52 @@ class ParquetPlainEncoder {
   /// be preallocated and big enough.  Buffer need not be aligned.
   /// 'fixed_len_size' is only applicable for data encoded using FIXED_LEN_BYTE_ARRAY and
   /// is the number of bytes the plain encoder should use.
-  template <typename T>
-  static int Encode(const T& t, int fixed_len_size, uint8_t* buffer) {
+  template <typename InternalType>
+  static int Encode(const InternalType& t, int fixed_len_size, uint8_t* buffer) {
     memcpy(buffer, &t, ByteSize(t));
     return ByteSize(t);
   }
 
+  template <typename InternalType>
+  static int DecodeByParquetType(const uint8_t* buffer, const uint8_t* buffer_end,
+      int fixed_len_size, InternalType* v, parquet::Type::type parquet_type) {
+    switch (parquet_type) {
+      case parquet::Type::BOOLEAN:
+        return ParquetPlainEncoder::Decode<InternalType, parquet::Type::BOOLEAN>(buffer,
+            buffer_end, fixed_len_size, v);
+      case parquet::Type::INT32:
+        return ParquetPlainEncoder::Decode<InternalType, parquet::Type::INT32>(buffer,
+            buffer_end, fixed_len_size, v);
+      case parquet::Type::INT64:
+        return ParquetPlainEncoder::Decode<InternalType, parquet::Type::INT64>(buffer,
+            buffer_end, fixed_len_size, v);
+      case parquet::Type::INT96:
+        return ParquetPlainEncoder::Decode<InternalType, parquet::Type::INT96>(buffer,
+            buffer_end, fixed_len_size, v);
+      case parquet::Type::FLOAT:
+        return ParquetPlainEncoder::Decode<InternalType, parquet::Type::FLOAT>(buffer,
+            buffer_end, fixed_len_size, v);
+      case parquet::Type::DOUBLE:
+        return ParquetPlainEncoder::Decode<InternalType, parquet::Type::DOUBLE>(buffer,
+            buffer_end, fixed_len_size, v);
+      case parquet::Type::BYTE_ARRAY:
+        return ParquetPlainEncoder::Decode<InternalType,
+            parquet::Type::BYTE_ARRAY>(buffer, buffer_end, fixed_len_size, v);
+      case parquet::Type::FIXED_LEN_BYTE_ARRAY:
+        return ParquetPlainEncoder::Decode<InternalType,
+            parquet::Type::FIXED_LEN_BYTE_ARRAY>(buffer, buffer_end, fixed_len_size, v);
+      default:
+        DCHECK(false) << "Unexpected physical type";
+        return -1;
+    }
+  }
+
   /// Decodes t from 'buffer', reading up to the byte before 'buffer_end'. 'buffer'
-  /// need not be aligned. For types that are stored as FIXED_LEN_BYTE_ARRAY,
-  /// 'fixed_len_size' is the size of the object. Otherwise, it is unused.
+  /// need not be aligned. If PARQUET_TYPE is FIXED_LEN_BYTE_ARRAY then 'fixed_len_size'
+  /// is the size of the object. Otherwise, it is unused.
   /// Returns the number of bytes read or -1 if the value was not decoded successfully.
-  template <typename T>
+  template <typename InternalType, parquet::Type::type PARQUET_TYPE>
   static int Decode(const uint8_t* buffer, const uint8_t* buffer_end, int fixed_len_size,
-      T* v) {
+      InternalType* v) {
     int byte_size = ByteSize(*v);
     if (UNLIKELY(buffer_end - buffer < byte_size)) return -1;
     memcpy(v, buffer, byte_size);
@@ -194,25 +228,27 @@ template <> int ParquetPlainEncoder::ByteSize(const ColumnType& t);
 /// Disable for bools. Plain encoding is not used for booleans.
 template <> int ParquetPlainEncoder::ByteSize(const bool& b);
 template <> int ParquetPlainEncoder::Encode(const bool&, int fixed_len_size, uint8_t*);
-template <> int ParquetPlainEncoder::Decode(const uint8_t*, const uint8_t*,
-    int fixed_len_size, bool* v);
+template <> int ParquetPlainEncoder::Decode<bool, parquet::Type::BOOLEAN>(const uint8_t*,
+    const uint8_t*, int fixed_len_size, bool* v);
 
 /// Not used for decimals since the plain encoding encodes them using
 /// FIXED_LEN_BYTE_ARRAY.
-template <>
-inline int ParquetPlainEncoder::ByteSize(const Decimal4Value&) {
+inline int DecimalByteSize() {
   DCHECK(false);
   return -1;
 }
+
+template <>
+inline int ParquetPlainEncoder::ByteSize(const Decimal4Value&) {
+  return DecimalByteSize();
+}
 template <>
 inline int ParquetPlainEncoder::ByteSize(const Decimal8Value&) {
-  DCHECK(false);
-  return -1;
+  return DecimalByteSize();
 }
 template <>
 inline int ParquetPlainEncoder::ByteSize(const Decimal16Value&) {
-  DCHECK(false);
-  return -1;
+  return DecimalByteSize();
 }
 
 /// Parquet doesn't have 8-bit or 16-bit ints. They are converted to 32-bit.
@@ -232,36 +268,39 @@ inline int ParquetPlainEncoder::ByteSize(const TimestampValue& v) {
 }
 
 template <>
-inline int ParquetPlainEncoder::Decode(const uint8_t* buffer, const uint8_t* buffer_end,
-    int fixed_len_size, int8_t* v) {
+inline int ParquetPlainEncoder::Decode<int8_t, parquet::Type::INT32>(
+    const uint8_t* buffer, const uint8_t* buffer_end, int fixed_len_size, int8_t* v) {
   int byte_size = ByteSize(*v);
   if (UNLIKELY(buffer_end - buffer < byte_size)) return -1;
   *v = *buffer;
   return byte_size;
 }
 template <>
-inline int ParquetPlainEncoder::Decode(const uint8_t* buffer, const uint8_t* buffer_end,
-    int fixed_len_size, int16_t* v) {
+inline int ParquetPlainEncoder::Decode<int16_t, parquet::Type::INT32>(
+    const uint8_t* buffer, const uint8_t* buffer_end, int fixed_len_size, int16_t* v) {
   int byte_size = ByteSize(*v);
   if (UNLIKELY(buffer_end - buffer < byte_size)) return -1;
   memcpy(v, buffer, sizeof(int16_t));
   return byte_size;
 }
 
+template<typename T>
+inline int EncodeToInt32(const T& v, int fixed_len_size, uint8_t* buffer) {
+  int32_t val = v;
+  memcpy(buffer, &val, sizeof(int32_t));
+  return ParquetPlainEncoder::ByteSize(v);
+}
+
 template <>
 inline int ParquetPlainEncoder::Encode(
     const int8_t& v, int fixed_len_size, uint8_t* buffer) {
-  int32_t val = v;
-  memcpy(buffer, &val, sizeof(int32_t));
-  return ByteSize(v);
+  return EncodeToInt32(v, fixed_len_size, buffer);
 }
 
 template <>
 inline int ParquetPlainEncoder::Encode(
     const int16_t& v, int fixed_len_size, uint8_t* buffer) {
-  int32_t val = v;
-  memcpy(buffer, &val, sizeof(int32_t));
-  return ByteSize(v);
+  return EncodeToInt32(v, fixed_len_size, buffer);
 }
 
 template <>
@@ -273,8 +312,9 @@ inline int ParquetPlainEncoder::Encode(
 }
 
 template <>
-inline int ParquetPlainEncoder::Decode(const uint8_t* buffer, const uint8_t* buffer_end,
-    int fixed_len_size, StringValue* v) {
+inline int ParquetPlainEncoder::Decode<StringValue, parquet::Type::BYTE_ARRAY>(
+    const uint8_t* buffer, const uint8_t* buffer_end, int fixed_len_size,
+    StringValue* v) {
   if (UNLIKELY(buffer_end - buffer < sizeof(int32_t))) return -1;
   memcpy(&v->len, buffer, sizeof(int32_t));
   int byte_size = ByteSize(*v);
@@ -290,49 +330,92 @@ inline int ParquetPlainEncoder::Decode(const uint8_t* buffer, const uint8_t* buf
 /// that the value in the in-memory format has leading zeros or negative 1's.
 /// For example, precision 2 fits in 1 byte. All decimals stored as Decimal4Value
 /// will have 3 bytes of leading zeros, we will only store the interesting byte.
+template<typename T>
+inline int EncodeDecimal(const T& v, int fixed_len_size, uint8_t* buffer) {
+  DecimalUtil::EncodeToFixedLenByteArray(buffer, fixed_len_size, v);
+  return fixed_len_size;
+}
+
 template <>
 inline int ParquetPlainEncoder::Encode(
     const Decimal4Value& v, int fixed_len_size, uint8_t* buffer) {
-  DecimalUtil::EncodeToFixedLenByteArray(buffer, fixed_len_size, v);
-  return fixed_len_size;
+  return EncodeDecimal(v, fixed_len_size, buffer);
 }
 
 template <>
 inline int ParquetPlainEncoder::Encode(
     const Decimal8Value& v, int fixed_len_size, uint8_t* buffer) {
-  DecimalUtil::EncodeToFixedLenByteArray(buffer, fixed_len_size, v);
-  return fixed_len_size;
+  return EncodeDecimal(v, fixed_len_size, buffer);
 }
 
 template <>
 inline int ParquetPlainEncoder::Encode(
     const Decimal16Value& v, int fixed_len_size, uint8_t* buffer) {
-  DecimalUtil::EncodeToFixedLenByteArray(buffer, fixed_len_size, v);
-  return fixed_len_size;
+  return EncodeDecimal(v, fixed_len_size, buffer);
 }
 
-template <>
-inline int ParquetPlainEncoder::Decode(const uint8_t* buffer, const uint8_t* buffer_end,
-    int fixed_len_size, Decimal4Value* v) {
+template<typename T>
+inline int DecodeDecimalFixedLen(const uint8_t* buffer, const uint8_t* buffer_end,
+    int fixed_len_size, T* v) {
   if (UNLIKELY(buffer_end - buffer < fixed_len_size)) return -1;
   DecimalUtil::DecodeFromFixedLenByteArray(buffer, fixed_len_size, v);
   return fixed_len_size;
 }
 
 template <>
-inline int ParquetPlainEncoder::Decode(const uint8_t* buffer, const uint8_t* buffer_end,
-    int fixed_len_size, Decimal8Value* v) {
-  if (UNLIKELY(buffer_end - buffer < fixed_len_size)) return -1;
-  DecimalUtil::DecodeFromFixedLenByteArray(buffer, fixed_len_size, v);
-  return fixed_len_size;
+inline int ParquetPlainEncoder::
+Decode<Decimal4Value, parquet::Type::FIXED_LEN_BYTE_ARRAY>(const uint8_t* buffer,
+    const uint8_t* buffer_end, int fixed_len_size, Decimal4Value* v) {
+  return DecodeDecimalFixedLen(buffer, buffer_end, fixed_len_size, v);
 }
 
 template <>
-inline int ParquetPlainEncoder::Decode(const uint8_t* buffer, const uint8_t* buffer_end,
-    int fixed_len_size, Decimal16Value* v) {
-  if (UNLIKELY(buffer_end - buffer < fixed_len_size)) return -1;
-  DecimalUtil::DecodeFromFixedLenByteArray(buffer, fixed_len_size, v);
-  return fixed_len_size;
+inline int ParquetPlainEncoder::
+Decode<Decimal8Value, parquet::Type::FIXED_LEN_BYTE_ARRAY>(const uint8_t* buffer,
+    const uint8_t* buffer_end, int fixed_len_size, Decimal8Value* v) {
+  return DecodeDecimalFixedLen(buffer, buffer_end, fixed_len_size, v);
+}
+
+template <>
+inline int ParquetPlainEncoder::
+Decode<Decimal16Value, parquet::Type::FIXED_LEN_BYTE_ARRAY>(const uint8_t* buffer,
+    const uint8_t* buffer_end, int fixed_len_size, Decimal16Value* v) {
+  return DecodeDecimalFixedLen(buffer, buffer_end, fixed_len_size, v);
+}
+
+/// Helper method to decode Decimal type stored as variable length byte array.
+template<typename T>
+inline int DecodeDecimalByteArray(const uint8_t* buffer, const uint8_t* buffer_end,
+    int fixed_len_size, T* v) {
+  if (UNLIKELY(buffer_end - buffer < sizeof(int32_t))) return -1;
+  int encoded_byte_size;
+  memcpy(&encoded_byte_size, buffer, sizeof(int32_t));
+  int byte_size = sizeof(int32_t) + encoded_byte_size;
+  if (UNLIKELY(encoded_byte_size < 0 || buffer_end - buffer < byte_size)) return -1;
+  uint8_t* val_ptr = const_cast<uint8_t*>(buffer) + sizeof(int32_t);
+  DecimalUtil::DecodeFromFixedLenByteArray(val_ptr, encoded_byte_size, v);
+  return byte_size;
+}
+
+template <>
+inline int ParquetPlainEncoder::Decode<Decimal4Value, parquet::Type::BYTE_ARRAY>(
+    const uint8_t* buffer, const uint8_t* buffer_end, int fixed_len_size,
+    Decimal4Value* v) {
+  return DecodeDecimalByteArray(buffer, buffer_end, fixed_len_size, v);
+}
+
+template <>
+inline int ParquetPlainEncoder::Decode<Decimal8Value, parquet::Type::BYTE_ARRAY>(
+    const uint8_t* buffer, const uint8_t* buffer_end, int fixed_len_size,
+    Decimal8Value* v) {
+  return DecodeDecimalByteArray(buffer, buffer_end, fixed_len_size, v);
+}
+
+template <>
+inline int ParquetPlainEncoder::Decode<Decimal16Value, parquet::Type::BYTE_ARRAY>(
+    const uint8_t* buffer, const uint8_t* buffer_end, int fixed_len_size,
+    Decimal16Value* v) {
+  return DecodeDecimalByteArray(buffer, buffer_end, fixed_len_size, v);
 }
 
 }
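
A worked example of the layout DecodeDecimalByteArray() consumes, assuming the
standard Parquet DECIMAL encoding: the DECIMAL(9,2) value 1234.56 has unscaled
value 123456 = 0x01E240, so its plain BYTE_ARRAY encoding is a 4-byte
little-endian length prefix followed by the big-endian two's complement
unscaled value with leading zeros trimmed:

  03 00 00 00  01 E2 40
  |- len = 3 | |123456 |

DecodeDecimalByteArray() reads the prefix, bounds-checks the payload, hands the
three value bytes to DecimalUtil::DecodeFromFixedLenByteArray(), and returns 7
(sizeof(int32_t) + 3) as the total number of bytes consumed.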

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/94236ff2/be/src/exec/parquet-metadata-utils.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/parquet-metadata-utils.cc b/be/src/exec/parquet-metadata-utils.cc
index fc34eda..3fc6b3e 100644
--- a/be/src/exec/parquet-metadata-utils.cc
+++ b/be/src/exec/parquet-metadata-utils.cc
@@ -40,6 +40,39 @@ using boost::algorithm::token_compress_on;
 
 namespace impala {
 
+namespace {
+
+const map<PrimitiveType, set<parquet::Type::type>> SUPPORTED_PHYSICAL_TYPES = {
+    {PrimitiveType::INVALID_TYPE, {parquet::Type::BOOLEAN}},
+    {PrimitiveType::TYPE_NULL, {parquet::Type::BOOLEAN}},
+    {PrimitiveType::TYPE_BOOLEAN, {parquet::Type::BOOLEAN}},
+    {PrimitiveType::TYPE_TINYINT, {parquet::Type::INT32}},
+    {PrimitiveType::TYPE_SMALLINT, {parquet::Type::INT32}},
+    {PrimitiveType::TYPE_INT, {parquet::Type::INT32}},
+    {PrimitiveType::TYPE_BIGINT, {parquet::Type::INT64}},
+    {PrimitiveType::TYPE_FLOAT, {parquet::Type::FLOAT}},
+    {PrimitiveType::TYPE_DOUBLE, {parquet::Type::DOUBLE}},
+    {PrimitiveType::TYPE_TIMESTAMP, {parquet::Type::INT96}},
+    {PrimitiveType::TYPE_STRING, {parquet::Type::BYTE_ARRAY}},
+    {PrimitiveType::TYPE_DATE, {parquet::Type::BYTE_ARRAY}},
+    {PrimitiveType::TYPE_DATETIME, {parquet::Type::BYTE_ARRAY}},
+    {PrimitiveType::TYPE_BINARY, {parquet::Type::BYTE_ARRAY}},
+    {PrimitiveType::TYPE_DECIMAL, {parquet::Type::FIXED_LEN_BYTE_ARRAY,
+        parquet::Type::BYTE_ARRAY}},
+    {PrimitiveType::TYPE_CHAR, {parquet::Type::BYTE_ARRAY}},
+    {PrimitiveType::TYPE_VARCHAR, {parquet::Type::BYTE_ARRAY}},
+};
+
+/// Returns true if 'parquet_type' is a supported physical encoding for the Impala
+/// primitive type, false otherwise.
+bool IsSupportedPhysicalType(PrimitiveType impala_type,
+    parquet::Type::type parquet_type) {
+  auto encodings = SUPPORTED_PHYSICAL_TYPES.find(impala_type);
+  DCHECK(encodings != SUPPORTED_PHYSICAL_TYPES.end());
+  return encodings->second.find(parquet_type) != encodings->second.end();
+}
+
+}
 // Needs to be in sync with the order of enum values declared in TParquetArrayResolution.
 const std::vector<ParquetSchemaResolver::ArrayEncoding>
     ParquetSchemaResolver::ORDERED_ARRAY_ENCODINGS[] =
@@ -110,97 +143,85 @@ static bool IsEncodingSupported(parquet::Encoding::type e) {
   }
 }
 
-Status ParquetMetadataUtils::ValidateColumn(const parquet::FileMetaData& file_metadata,
-    const char* filename, int row_group_idx, int col_idx,
-    const parquet::SchemaElement& schema_element, const SlotDescriptor* slot_desc,
-    RuntimeState* state) {
-  const parquet::ColumnChunk& file_data =
-      file_metadata.row_groups[row_group_idx].columns[col_idx];
+Status ParquetMetadataUtils::ValidateRowGroupColumn(
+    const parquet::FileMetaData& file_metadata, const char* filename, int row_group_idx,
+    int col_idx, const parquet::SchemaElement& schema_element, RuntimeState* state) {
+  const parquet::ColumnMetaData& col_chunk_metadata =
+      file_metadata.row_groups[row_group_idx].columns[col_idx].meta_data;
 
   // Check the encodings are supported.
-  const vector<parquet::Encoding::type>& encodings = file_data.meta_data.encodings;
+  const vector<parquet::Encoding::type>& encodings = col_chunk_metadata.encodings;
   for (int i = 0; i < encodings.size(); ++i) {
     if (!IsEncodingSupported(encodings[i])) {
-      stringstream ss;
-      ss << "File '" << filename << "' uses an unsupported encoding: "
-         << PrintEncoding(encodings[i]) << " for column '" << schema_element.name
-         << "'.";
-      return Status(ss.str());
+      return Status(Substitute("File '$0' uses an unsupported encoding: $1 for column "
+          "'$2'.", filename, PrintEncoding(encodings[i]), schema_element.name));
     }
   }
 
   // Check the compression is supported.
-  if (file_data.meta_data.codec != parquet::CompressionCodec::UNCOMPRESSED &&
-      file_data.meta_data.codec != parquet::CompressionCodec::SNAPPY &&
-      file_data.meta_data.codec != parquet::CompressionCodec::GZIP) {
-    stringstream ss;
-    ss << "File '" << filename << "' uses an unsupported compression: "
-        << file_data.meta_data.codec << " for column '" << schema_element.name
-        << "'.";
-    return Status(ss.str());
+  if (col_chunk_metadata.codec != parquet::CompressionCodec::UNCOMPRESSED &&
+      col_chunk_metadata.codec != parquet::CompressionCodec::SNAPPY &&
+      col_chunk_metadata.codec != parquet::CompressionCodec::GZIP) {
+    return Status(Substitute("File '$0' uses an unsupported compression: $1 for column "
+        "'$2'.", filename, col_chunk_metadata.codec, schema_element.name));
   }
 
-  // Validation after this point is only if col_reader is reading values.
-  if (slot_desc == NULL) return Status::OK();
-
-  parquet::Type::type type = IMPALA_TO_PARQUET_TYPES[slot_desc->type().type];
-  if (UNLIKELY(type != file_data.meta_data.type)) {
-    return Status(Substitute("Unexpected Parquet type in file '$0' metadata expected $1 "
-        "actual $2: file may be corrupt", filename, type, file_data.meta_data.type));
+  if (col_chunk_metadata.type != schema_element.type) {
+    return Status(Substitute("Mismatched column chunk Parquet type in file '$0' column "
+            "'$1'. Expected $2 actual $3: file may be corrupt", filename,
+            schema_element.name, col_chunk_metadata.type, schema_element.type));
   }
+  return Status::OK();
+}
+
+Status ParquetMetadataUtils::ValidateColumn(const char* filename,
+    const parquet::SchemaElement& schema_element, const SlotDescriptor* slot_desc,
+    RuntimeState* state) {
+  // The following validation logic applies only to non-complex types.
+  if (slot_desc->type().IsComplexType()) return Status::OK();
+
+  if (UNLIKELY(!IsSupportedPhysicalType(slot_desc->type().type, schema_element.type))) {
+    return Status(Substitute("Unsupported Parquet type in file '$0' metadata. Logical "
+        "type: $1, physical type: $2. File may be corrupt.",
+        filename, slot_desc->type().type, schema_element.type));
+  }
 
   // Check the decimal scale in the file matches the metastore scale and precision.
   // We fail the query if the metadata makes it impossible for us to safely read
   // the file. If we don't require the metadata, we will fail the query if
   // abort_on_error is true, otherwise we will just log a warning.
-  bool is_converted_type_decimal = schema_element.__isset.converted_type &&
-      schema_element.converted_type == parquet::ConvertedType::DECIMAL;
+  bool is_converted_type_decimal = schema_element.__isset.converted_type
+      && schema_element.converted_type == parquet::ConvertedType::DECIMAL;
   if (slot_desc->type().type == TYPE_DECIMAL) {
     // We require that the scale and byte length be set.
-    if (schema_element.type != parquet::Type::FIXED_LEN_BYTE_ARRAY) {
-      stringstream ss;
-      ss << "File '" << filename << "' column '" << schema_element.name
-         << "' should be a decimal column encoded using FIXED_LEN_BYTE_ARRAY.";
-      return Status(ss.str());
-    }
-
-    if (!schema_element.__isset.type_length) {
-      stringstream ss;
-      ss << "File '" << filename << "' column '" << schema_element.name
-         << "' does not have type_length set.";
-      return Status(ss.str());
-    }
+    if (schema_element.type == parquet::Type::FIXED_LEN_BYTE_ARRAY) {
+      if (!schema_element.__isset.type_length) {
+        return Status(Substitute("File '$0' column '$1' does not have type_length set.",
+            filename, schema_element.name));
+      }
 
-    int expected_len = ParquetPlainEncoder::DecimalSize(slot_desc->type());
-    if (schema_element.type_length != expected_len) {
-      stringstream ss;
-      ss << "File '" << filename << "' column '" << schema_element.name
-         << "' has an invalid type length. Expecting: " << expected_len
-         << " len in file: " << schema_element.type_length;
-      return Status(ss.str());
+      int expected_len = ParquetPlainEncoder::DecimalSize(slot_desc->type());
+      if (schema_element.type_length != expected_len) {
+        return Status(Substitute("File '$0' column '$1' has an invalid type length. "
+            "Expecting: $2 len in file: $3", filename, schema_element.name, expected_len,
+            schema_element.type_length));
+      }
     }
-
     if (!schema_element.__isset.scale) {
-      stringstream ss;
-      ss << "File '" << filename << "' column '" << schema_element.name
-         << "' does not have the scale set.";
-      return Status(ss.str());
+      return Status(Substitute("File '$0' column '$1' does not have the scale set.",
+          filename, schema_element.name));
     }
 
     if (schema_element.scale != slot_desc->type().scale) {
       // TODO: we could allow a mismatch and do a conversion at this step.
-      stringstream ss;
-      ss << "File '" << filename << "' column '" << schema_element.name
-         << "' has a scale that does not match the table metadata scale."
-         << " File metadata scale: " << schema_element.scale
-         << " Table metadata scale: " << slot_desc->type().scale;
-      return Status(ss.str());
+      return Status(Substitute("File '$0' column '$1' has a scale that does not match "
+          "the table metadata scale. File metadata scale: $2 Table metadata scale: $3",
+          filename, schema_element.name, schema_element.scale, slot_desc->type().scale));
     }
 
     // The other decimal metadata should be there but we don't need it.
     if (!schema_element.__isset.precision) {
-      ErrorMsg msg(TErrorCode::PARQUET_MISSING_PRECISION, filename,
-          schema_element.name);
+      ErrorMsg msg(TErrorCode::PARQUET_MISSING_PRECISION, filename, schema_element.name);
       RETURN_IF_ERROR(state->LogOrReturnError(msg));
     } else {
       if (schema_element.precision != slot_desc->type().precision) {
@@ -218,10 +239,10 @@ Status ParquetMetadataUtils::ValidateColumn(const parquet::FileMetaData& file_me
           schema_element.name);
       RETURN_IF_ERROR(state->LogOrReturnError(msg));
     }
-  } else if (schema_element.__isset.scale || schema_element.__isset.precision ||
-      is_converted_type_decimal) {
-    ErrorMsg msg(TErrorCode::PARQUET_INCOMPATIBLE_DECIMAL, filename,
-        schema_element.name, slot_desc->type().DebugString());
+  } else if (schema_element.__isset.scale || schema_element.__isset.precision
+      || is_converted_type_decimal) {
+    ErrorMsg msg(TErrorCode::PARQUET_INCOMPATIBLE_DECIMAL, filename, schema_element.name,
+        slot_desc->type().DebugString());
     RETURN_IF_ERROR(state->LogOrReturnError(msg));
   }
   return Status::OK();
@@ -350,8 +371,8 @@ Status ParquetSchemaResolver::CreateSchemaTree(
     ++(*col_idx);
   } else if (node->element->num_children > SCHEMA_NODE_CHILDREN_SANITY_LIMIT) {
     // Sanity-check the schema to avoid allocating absurdly large buffers below.
-    return Status(Substitute("Schema in Parquet file '$0' has $1 children, more than limit of "
-        "$2. File is likely corrupt", filename_, node->element->num_children,
+    return Status(Substitute("Schema in Parquet file '$0' has $1 children, more than "
+        "limit of $2. File is likely corrupt", filename_, node->element->num_children,
         SCHEMA_NODE_CHILDREN_SANITY_LIMIT));
   } else if (node->element->num_children < 0) {
     return Status(Substitute("Corrupt Parquet file '$0': schema element has $1 children.",
@@ -668,8 +689,7 @@ Status ParquetSchemaResolver::ValidateScalarNode(const SchemaNode& node,
         PrintSubPath(tbl_desc_, path, idx), col_type.DebugString(), node.DebugString());
     return Status::Expected(msg);
   }
-  parquet::Type::type type = IMPALA_TO_PARQUET_TYPES[col_type.type];
-  if (type != node.element->type) {
+  if (!IsSupportedPhysicalType(col_type.type, node.element->type)) {
     ErrorMsg msg(TErrorCode::PARQUET_UNRECOGNIZED_SCHEMA, filename_,
         PrintSubPath(tbl_desc_, path, idx), col_type.DebugString(), node.DebugString());
     return Status::Expected(msg);
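
Given the SUPPORTED_PHYSICAL_TYPES map above, validation now accepts either
physical encoding for decimals while still rejecting mismatches, e.g.:

  IsSupportedPhysicalType(TYPE_DECIMAL, parquet::Type::FIXED_LEN_BYTE_ARRAY) -> true
  IsSupportedPhysicalType(TYPE_DECIMAL, parquet::Type::BYTE_ARRAY)           -> true
  IsSupportedPhysicalType(TYPE_BIGINT, parquet::Type::INT32)                 -> false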

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/94236ff2/be/src/exec/parquet-metadata-utils.h
----------------------------------------------------------------------
diff --git a/be/src/exec/parquet-metadata-utils.h b/be/src/exec/parquet-metadata-utils.h
index 767a1a3..f3a144d 100644
--- a/be/src/exec/parquet-metadata-utils.h
+++ b/be/src/exec/parquet-metadata-utils.h
@@ -44,10 +44,15 @@ class ParquetMetadataUtils {
   static Status ValidateOffsetInFile(const std::string& filename, int col_idx,
       int64_t file_length, int64_t offset, const std::string& offset_name);
 
-  /// Validates the column metadata to make sure this column is supported (e.g. encoding,
-  /// type, etc) and matches the type of given slot_desc.
-  static Status ValidateColumn(const parquet::FileMetaData& file_metadata,
+  /// Validates the column metadata inside a row group to make sure this column is
+  /// supported (e.g. encoding, type, etc).
+  static Status ValidateRowGroupColumn(const parquet::FileMetaData& file_metadata,
       const char* filename, int row_group_idx, int col_idx,
+      const parquet::SchemaElement& schema_element, RuntimeState* state);
+
+  /// Validates the column metadata to make sure the column is supported and its type
+  /// attributes conform to the parquet spec.
+  static Status ValidateColumn(const char* filename,
       const parquet::SchemaElement& schema_element, const SlotDescriptor* slot_desc,
       RuntimeState* state);
 };

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/94236ff2/be/src/exec/parquet-plain-test.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/parquet-plain-test.cc b/be/src/exec/parquet-plain-test.cc
index c86a33b..37acd2c 100644
--- a/be/src/exec/parquet-plain-test.cc
+++ b/be/src/exec/parquet-plain-test.cc
@@ -29,39 +29,83 @@
 
 namespace impala {
 
+template <typename InternalType>
+int Encode(const InternalType& v, int encoded_byte_size, uint8_t* buffer,
+    parquet::Type::type physical_type) {
+  return ParquetPlainEncoder::Encode(v, encoded_byte_size, buffer);
+}
+
+// Handles the special case of encoding decimal types stored as BYTE_ARRAY,
+// since the Impala writer does not implement that encoding.
+// When parquet_type equals BYTE_ARRAY: 'encoded_byte_size' is the sum of the
+// minimum number of bytes required to store the unscaled value and the bytes
+// required to store the size. The value 'v' passed in should not require leading
+// zero bytes to be stripped, as this method does not strictly conform to the
+// parquet spec and will not remove them.
+template <typename DecimalType>
+int EncodeDecimal(const DecimalType& v, int encoded_byte_size, uint8_t* buffer,
+    parquet::Type::type parquet_type) {
+  if (parquet_type == parquet::Type::FIXED_LEN_BYTE_ARRAY) {
+    return ParquetPlainEncoder::Encode(v, encoded_byte_size, buffer);
+  } else if (parquet_type == parquet::Type::BYTE_ARRAY) {
+    int decimal_size = encoded_byte_size - sizeof(int32_t);
+    memcpy(buffer, &decimal_size, sizeof(int32_t));
+    DecimalUtil::EncodeToFixedLenByteArray(buffer + sizeof(int32_t), decimal_size, v);
+    return encoded_byte_size;
+  }
+  return -1;
+}
+
+template<>
+int Encode(const Decimal4Value& v, int encoded_byte_size, uint8_t* buffer,
+    parquet::Type::type parquet_type) {
+  return EncodeDecimal(v, encoded_byte_size, buffer, parquet_type);
+}
+
+template<>
+int Encode(const Decimal8Value& v, int encoded_byte_size, uint8_t* buffer,
+    parquet::Type::type parquet_type) {
+  return EncodeDecimal(v, encoded_byte_size, buffer, parquet_type);
+}
+
+template<>
+int Encode(const Decimal16Value& v, int encoded_byte_size, uint8_t* buffer,
+    parquet::Type::type parquet_type) {
+  return EncodeDecimal(v, encoded_byte_size, buffer, parquet_type);
+}
+
 /// Test that the decoder fails when asked to decode a truncated value.
-template <typename T>
-void TestTruncate(const T& v, int expected_byte_size) {
+template <typename InternalType, parquet::Type::type PARQUET_TYPE>
+void TestTruncate(const InternalType& v, int expected_byte_size) {
   uint8_t buffer[expected_byte_size];
-  int encoded_size = ParquetPlainEncoder::Encode(v, expected_byte_size, buffer);
+  int encoded_size = Encode(v, expected_byte_size, buffer, PARQUET_TYPE);
   EXPECT_EQ(encoded_size, expected_byte_size);
 
   // Check all possible truncations of the buffer.
   for (int truncated_size = encoded_size - 1; truncated_size >= 0; --truncated_size) {
-    T result;
+    InternalType result;
     /// Copy to heap-allocated buffer so that ASAN can detect buffer overruns.
     uint8_t* truncated_buffer = new uint8_t[truncated_size];
     memcpy(truncated_buffer, buffer, truncated_size);
-    int decoded_size = ParquetPlainEncoder::Decode(truncated_buffer,
-        truncated_buffer + truncated_size, expected_byte_size, &result);
+    int decoded_size = ParquetPlainEncoder::Decode<InternalType, PARQUET_TYPE>(
+        truncated_buffer, truncated_buffer + truncated_size, expected_byte_size, &result);
     EXPECT_EQ(-1, decoded_size);
     delete[] truncated_buffer;
   }
 }
 
-template <typename T>
-void TestType(const T& v, int expected_byte_size) {
+template <typename InternalType, parquet::Type::type PARQUET_TYPE>
+void TestType(const InternalType& v, int expected_byte_size) {
   uint8_t buffer[expected_byte_size];
-  int encoded_size = ParquetPlainEncoder::Encode(v, expected_byte_size, buffer);
+  int encoded_size = Encode(v, expected_byte_size, buffer, PARQUET_TYPE);
   EXPECT_EQ(encoded_size, expected_byte_size);
 
-  T result;
-  int decoded_size = ParquetPlainEncoder::Decode(buffer, buffer + expected_byte_size,
-      expected_byte_size, &result);
+  InternalType result;
+  int decoded_size = ParquetPlainEncoder::Decode<InternalType, PARQUET_TYPE>(buffer,
+      buffer + expected_byte_size, expected_byte_size, &result);
   EXPECT_EQ(decoded_size, expected_byte_size);
   EXPECT_EQ(result, v);
 
-  TestTruncate(v, expected_byte_size);
+  TestTruncate<InternalType, PARQUET_TYPE>(v, expected_byte_size);
 }
 
 TEST(PlainEncoding, Basic) {
@@ -74,42 +118,108 @@ TEST(PlainEncoding, Basic) {
   StringValue sv("Hello");
   TimestampValue tv;
 
-  TestType(i8, sizeof(int32_t));
-  TestType(i16, sizeof(int32_t));
-  TestType(i32, sizeof(int32_t));
-  TestType(i64, sizeof(int64_t));
-  TestType(f, sizeof(float));
-  TestType(d, sizeof(double));
-  TestType(sv, sizeof(int32_t) + sv.len);
-  TestType(tv, 12);
-
-  TestType(Decimal4Value(1234), sizeof(Decimal4Value));
-  TestType(Decimal4Value(-1234), sizeof(Decimal4Value));
-
-  TestType(Decimal8Value(1234), sizeof(Decimal8Value));
-  TestType(Decimal8Value(-1234), sizeof(Decimal8Value));
-  TestType(Decimal8Value(std::numeric_limits<int32_t>::max()), sizeof(Decimal8Value));
-  TestType(Decimal8Value(std::numeric_limits<int32_t>::min()), sizeof(Decimal8Value));
-
-  TestType(Decimal16Value(1234), 16);
-  TestType(Decimal16Value(-1234), 16);
-  TestType(Decimal16Value(std::numeric_limits<int32_t>::max()), sizeof(Decimal16Value));
-  TestType(Decimal16Value(std::numeric_limits<int32_t>::min()), sizeof(Decimal16Value));
-  TestType(Decimal16Value(std::numeric_limits<int64_t>::max()), sizeof(Decimal16Value));
-  TestType(Decimal16Value(std::numeric_limits<int64_t>::min()), sizeof(Decimal16Value));
+  TestType<int8_t, parquet::Type::INT32>(i8, sizeof(int32_t));
+  TestType<int16_t, parquet::Type::INT32>(i16, sizeof(int32_t));
+  TestType<int32_t, parquet::Type::INT32>(i32, sizeof(int32_t));
+  TestType<int64_t, parquet::Type::INT64>(i64, sizeof(int64_t));
+  TestType<float, parquet::Type::FLOAT>(f, sizeof(float));
+  TestType<double, parquet::Type::DOUBLE>(d, sizeof(double));
+  TestType<StringValue, parquet::Type::BYTE_ARRAY>(sv, sizeof(int32_t) + sv.len);
+  TestType<TimestampValue, parquet::Type::INT96>(tv, 12);
+
+  int test_val = 1234;
+  int var_len_decimal_size = sizeof(int32_t)
+      + 2 /*min bytes required for storing test_val*/;
+  // Decimal4Value: General test case
+  TestType<Decimal4Value, parquet::Type::BYTE_ARRAY>(Decimal4Value(test_val),
+      var_len_decimal_size);
+  TestType<Decimal4Value, parquet::Type::BYTE_ARRAY>(Decimal4Value(test_val * -1),
+      var_len_decimal_size);
+  TestType<Decimal4Value, parquet::Type::FIXED_LEN_BYTE_ARRAY>(Decimal4Value(test_val),
+      sizeof(Decimal4Value));
+  TestType<Decimal4Value, parquet::Type::FIXED_LEN_BYTE_ARRAY>(
+      Decimal4Value(test_val * -1), sizeof(Decimal4Value));
+
+  // Decimal8Value: General test case
+  TestType<Decimal8Value, parquet::Type::BYTE_ARRAY>(Decimal8Value(test_val),
+      var_len_decimal_size);
+  TestType<Decimal8Value, parquet::Type::BYTE_ARRAY>(Decimal8Value(test_val * -1),
+      var_len_decimal_size);
+  TestType<Decimal8Value, parquet::Type::FIXED_LEN_BYTE_ARRAY>(Decimal8Value(test_val),
+      sizeof(Decimal8Value));
+  TestType<Decimal8Value, parquet::Type::FIXED_LEN_BYTE_ARRAY>(
+      Decimal8Value(test_val * -1), sizeof(Decimal8Value));
+
+  // Decimal16Value: General test case
+  TestType<Decimal16Value, parquet::Type::BYTE_ARRAY>(Decimal16Value(test_val),
+      var_len_decimal_size);
+  TestType<Decimal16Value, parquet::Type::BYTE_ARRAY>(Decimal16Value(test_val * -1),
+      var_len_decimal_size);
+  TestType<Decimal16Value, parquet::Type::FIXED_LEN_BYTE_ARRAY>(Decimal16Value(test_val),
+      sizeof(Decimal16Value));
+  TestType<Decimal16Value, parquet::Type::FIXED_LEN_BYTE_ARRAY>(
+      Decimal16Value(test_val * -1), sizeof(Decimal16Value));
+
+  // Decimal8Value: int32 limits test
+  TestType<Decimal8Value, parquet::Type::BYTE_ARRAY>(
+      Decimal8Value(std::numeric_limits<int32_t>::max()),
+      sizeof(int32_t) + sizeof(int32_t));
+  TestType<Decimal8Value, parquet::Type::BYTE_ARRAY>(
+      Decimal8Value(std::numeric_limits<int32_t>::min()),
+      sizeof(int32_t) + sizeof(int32_t));
+  TestType<Decimal8Value, parquet::Type::FIXED_LEN_BYTE_ARRAY>(
+      Decimal8Value(std::numeric_limits<int32_t>::max()), sizeof(Decimal8Value));
+  TestType<Decimal8Value, parquet::Type::FIXED_LEN_BYTE_ARRAY>(
+      Decimal8Value(std::numeric_limits<int32_t>::min()), sizeof(Decimal8Value));
+
+  // Decimal16Value: int32 limits test
+  TestType<Decimal16Value, parquet::Type::BYTE_ARRAY>(
+      Decimal16Value(std::numeric_limits<int32_t>::max()),
+      sizeof(int32_t) + sizeof(int32_t));
+  TestType<Decimal16Value, parquet::Type::BYTE_ARRAY>(
+      Decimal16Value(std::numeric_limits<int32_t>::min()),
+      sizeof(int32_t) + sizeof(int32_t));
+  TestType<Decimal16Value, parquet::Type::FIXED_LEN_BYTE_ARRAY>(
+      Decimal16Value(std::numeric_limits<int32_t>::max()), sizeof(Decimal16Value));
+  TestType<Decimal16Value, parquet::Type::FIXED_LEN_BYTE_ARRAY>(
+      Decimal16Value(std::numeric_limits<int32_t>::min()), sizeof(Decimal16Value));
+
+  // Decimal16Value: int64 limits test
+  TestType<Decimal16Value, parquet::Type::BYTE_ARRAY>(
+      Decimal16Value(std::numeric_limits<int64_t>::max()),
+      sizeof(int32_t) + sizeof(int64_t));
+  TestType<Decimal16Value, parquet::Type::BYTE_ARRAY>(
+      Decimal16Value(std::numeric_limits<int64_t>::min()),
+      sizeof(int32_t) + sizeof(int64_t));
+  TestType<Decimal16Value, parquet::Type::FIXED_LEN_BYTE_ARRAY>(
+      Decimal16Value(std::numeric_limits<int64_t>::max()), sizeof(Decimal16Value));
+  TestType<Decimal16Value, parquet::Type::FIXED_LEN_BYTE_ARRAY>(
+      Decimal16Value(std::numeric_limits<int64_t>::min()), sizeof(Decimal16Value));
 
   // two digit values can be encoded with any byte size.
   for (int i = 1; i <=16; ++i) {
     if (i <= 4) {
-      TestType(Decimal4Value(i), i);
-      TestType(Decimal4Value(-i), i);
+      TestType<Decimal4Value, parquet::Type::BYTE_ARRAY>(Decimal4Value(i),
+          i + sizeof(int32_t));
+      TestType<Decimal4Value, parquet::Type::BYTE_ARRAY>(Decimal4Value(-i),
+          i + sizeof(int32_t));
+      TestType<Decimal4Value, parquet::Type::FIXED_LEN_BYTE_ARRAY>(Decimal4Value(i), i);
+      TestType<Decimal4Value, parquet::Type::FIXED_LEN_BYTE_ARRAY>(Decimal4Value(-i), i);
     }
     if (i <= 8) {
-      TestType(Decimal8Value(i), i);
-      TestType(Decimal8Value(-i), i);
+      TestType<Decimal8Value, parquet::Type::BYTE_ARRAY>(Decimal8Value(i),
+          i + sizeof(int32_t));
+      TestType<Decimal8Value, parquet::Type::BYTE_ARRAY>(Decimal8Value(-i),
+          i + sizeof(int32_t));
+      TestType<Decimal8Value, parquet::Type::FIXED_LEN_BYTE_ARRAY>(Decimal8Value(i), i);
+      TestType<Decimal8Value, parquet::Type::FIXED_LEN_BYTE_ARRAY>(Decimal8Value(-i), i);
     }
-    TestType(Decimal16Value(i), i);
-    TestType(Decimal16Value(-i), i);
+    TestType<Decimal16Value, parquet::Type::BYTE_ARRAY>(Decimal16Value(i),
+        i + sizeof(int32_t));
+    TestType<Decimal16Value, parquet::Type::BYTE_ARRAY>(Decimal16Value(-i),
+        i + sizeof(int32_t));
+    TestType<Decimal16Value, parquet::Type::FIXED_LEN_BYTE_ARRAY>(Decimal16Value(i), i);
+    TestType<Decimal16Value, parquet::Type::FIXED_LEN_BYTE_ARRAY>(Decimal16Value(-i), i);
   }
 }
 
@@ -155,8 +265,8 @@ TEST(PlainEncoding, CorruptString) {
   memcpy(buffer, &len, sizeof(int32_t));
 
   StringValue result;
-  int decoded_size =
-      ParquetPlainEncoder::Decode(buffer, buffer + sizeof(buffer), 0, &result);
+  int decoded_size = ParquetPlainEncoder::Decode<StringValue, parquet::Type::BYTE_ARRAY>(
+      buffer, buffer + sizeof(buffer), 0, &result);
   EXPECT_EQ(decoded_size, -1);
 }
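
To make the encoded_byte_size arithmetic in these tests concrete: a plain
BYTE_ARRAY decimal is a 4-byte length prefix (little-endian, as the memcpy in
EncodeDecimal() assumes on x86) followed by the unscaled value in minimal
big-endian two's complement. A sketch for test_val = 1234 (0x04D2, two bytes),
matching var_len_decimal_size = sizeof(int32_t) + 2 above; assumes a gtest
context like the tests in this file:

// Expected wire bytes for Decimal4Value(1234) encoded as BYTE_ARRAY.
uint8_t expected[6] = {
  0x02, 0x00, 0x00, 0x00,  // length prefix = 2
  0x04, 0xD2               // 1234 = 0x04D2, big-endian two's complement
};
uint8_t buffer[6];
int written = Encode(Decimal4Value(1234), sizeof(buffer), buffer,
    parquet::Type::BYTE_ARRAY);
EXPECT_EQ(written, 6);
EXPECT_EQ(memcmp(buffer, expected, sizeof(expected)), 0);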
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/94236ff2/be/src/util/dict-encoding.h
----------------------------------------------------------------------
diff --git a/be/src/util/dict-encoding.h b/be/src/util/dict-encoding.h
index 479f095..62b3d3a 100644
--- a/be/src/util/dict-encoding.h
+++ b/be/src/util/dict-encoding.h
@@ -210,6 +210,7 @@ class DictDecoder : public DictDecoderBase {
   /// dictionary values (values stored using FIXED_LEN_BYTE_ARRAY).
   /// Returns true if the dictionary values were all successfully decoded, or false
   /// if the dictionary was corrupt.
+  template<parquet::Type::type PARQUET_TYPE>
   bool Reset(uint8_t* dict_buffer, int dict_len, int fixed_len_size);
 
   virtual int num_entries() const { return dict_.size(); }
@@ -337,14 +338,15 @@ inline int DictEncoderBase::WriteData(uint8_t* buffer, int buffer_len) {
 }
 
 template<typename T>
+template<parquet::Type::type PARQUET_TYPE>
 inline bool DictDecoder<T>::Reset(uint8_t* dict_buffer, int dict_len,
     int fixed_len_size) {
   dict_.clear();
   uint8_t* end = dict_buffer + dict_len;
   while (dict_buffer < end) {
     T value;
-    int decoded_len =
-        ParquetPlainEncoder::Decode(dict_buffer, end, fixed_len_size, &value);
+    int decoded_len = ParquetPlainEncoder::Decode<T, PARQUET_TYPE>(dict_buffer, end,
+        fixed_len_size, &value);
     if (UNLIKELY(decoded_len < 0)) return false;
     dict_buffer += decoded_len;
     dict_.push_back(value);
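
For illustration, a column reader might now initialize its dictionary for
BYTE_ARRAY-encoded decimals roughly as follows. This is a sketch, not code
from the patch; 'page_buffer' and 'page_len' are hypothetical names for the
raw dictionary page payload, and fixed_len_size is assumed to be unused for
BYTE_ARRAY since each entry carries its own length prefix:

// Decode all dictionary entries up front, then look values up by index.
DictDecoder<Decimal4Value> decoder;
if (!decoder.Reset<parquet::Type::BYTE_ARRAY>(page_buffer, page_len,
    /* fixed_len_size */ -1)) {
  return Status("Corrupt dictionary page");
}
Decimal4Value val;
decoder.GetValue(0, &val);  // direct lookup of dictionary entry 0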

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/94236ff2/be/src/util/dict-test.cc
----------------------------------------------------------------------
diff --git a/be/src/util/dict-test.cc b/be/src/util/dict-test.cc
index 620f431..de0fb11 100644
--- a/be/src/util/dict-test.cc
+++ b/be/src/util/dict-test.cc
@@ -29,15 +29,15 @@
 
 namespace impala {
 
-template<typename T>
-void ValidateDict(const vector<T>& values, const vector<T>& dict_values,
-                  int fixed_buffer_byte_size) {
-  set<T> values_set(values.begin(), values.end());
+template<typename InternalType, parquet::Type::type PARQUET_TYPE>
+void ValidateDict(const vector<InternalType>& values,
+    const vector<InternalType>& dict_values, int fixed_buffer_byte_size) {
+  set<InternalType> values_set(values.begin(), values.end());
 
   MemTracker tracker;
   MemPool pool(&tracker);
-  DictEncoder<T> encoder(&pool, fixed_buffer_byte_size);
-  for (T i: values) encoder.Put(i);
+  DictEncoder<InternalType> encoder(&pool, fixed_buffer_byte_size);
+  for (InternalType i: values) encoder.Put(i);
   EXPECT_EQ(encoder.num_entries(), values_set.size());
 
   uint8_t dict_buffer[encoder.dict_encoded_size()];
@@ -49,22 +49,22 @@ void ValidateDict(const vector<T>& values, const vector<T>& dict_values,
   EXPECT_GT(data_len, 0);
   encoder.ClearIndices();
 
-  DictDecoder<T> decoder;
-  ASSERT_TRUE(
-      decoder.Reset(dict_buffer, encoder.dict_encoded_size(), fixed_buffer_byte_size));
+  DictDecoder<InternalType> decoder;
+  ASSERT_TRUE(decoder.template Reset<PARQUET_TYPE>(dict_buffer,
+      encoder.dict_encoded_size(), fixed_buffer_byte_size));
 
   // Test direct access to the dictionary via indexes
   for (int i = 0; i < dict_values.size(); ++i) {
-    T expected_value = dict_values[i];
-    T out_value;
+    InternalType expected_value = dict_values[i];
+    InternalType out_value;
 
     decoder.GetValue(i, &out_value);
     EXPECT_EQ(expected_value, out_value);
   }
   // Test access to dictionary via internal stream
   ASSERT_OK(decoder.SetData(data_buffer, data_len));
-  for (T i: values) {
-    T j;
+  for (InternalType i: values) {
+    InternalType j;
     decoder.GetNextValue(&j);
     EXPECT_EQ(i, j);
   }
@@ -96,7 +96,7 @@ TEST(DictTest, TestStrings) {
   values.push_back(sv3);
   values.push_back(sv4);
 
-  ValidateDict(values, dict_values, -1);
+  ValidateDict<StringValue, parquet::Type::BYTE_ARRAY>(values, dict_values, -1);
 }
 
 TEST(DictTest, TestTimestamps) {
@@ -117,51 +117,57 @@ TEST(DictTest, TestTimestamps) {
   values.push_back(tv1);
   values.push_back(tv1);
 
-  ValidateDict(values, dict_values,
+  ValidateDict<TimestampValue, parquet::Type::INT96>(values, dict_values,
       ParquetPlainEncoder::EncodedByteSize(ColumnType(TYPE_TIMESTAMP)));
 }
 
-template<typename T>
-void IncrementValue(T* t) { ++(*t); }
+template<typename InternalType>
+void IncrementValue(InternalType* t) { ++(*t); }
 
 template <> void IncrementValue(Decimal4Value* t) { ++(t->value()); }
 template <> void IncrementValue(Decimal8Value* t) { ++(t->value()); }
 template <> void IncrementValue(Decimal16Value* t) { ++(t->value()); }
 
-template<typename T>
+template<typename InternalType, parquet::Type::type PARQUET_TYPE>
 void TestNumbers(int max_value, int repeat, int value_byte_size) {
-  vector<T> values;
-  vector<T> dict_values;
-  for (T val = 0; val < max_value; IncrementValue(&val)) {
+  vector<InternalType> values;
+  vector<InternalType> dict_values;
+  for (InternalType val = 0; val < max_value; IncrementValue(&val)) {
     for (int i = 0; i < repeat; ++i) {
       values.push_back(val);
     }
     dict_values.push_back(val);
   }
 
-  ValidateDict(values, dict_values, value_byte_size);
+  ValidateDict<InternalType, PARQUET_TYPE>(values, dict_values, value_byte_size);
 }
 
-template<typename T>
+template<typename InternalType, parquet::Type::type PARQUET_TYPE>
 void TestNumbers(int value_byte_size) {
-  TestNumbers<T>(100, 1, value_byte_size);
-  TestNumbers<T>(1, 100, value_byte_size);
-  TestNumbers<T>(1, 1, value_byte_size);
-  TestNumbers<T>(1, 2, value_byte_size);
+  TestNumbers<InternalType, PARQUET_TYPE>(100, 1, value_byte_size);
+  TestNumbers<InternalType, PARQUET_TYPE>(1, 100, value_byte_size);
+  TestNumbers<InternalType, PARQUET_TYPE>(1, 1, value_byte_size);
+  TestNumbers<InternalType, PARQUET_TYPE>(1, 2, value_byte_size);
 }
 
 TEST(DictTest, TestNumbers) {
-  TestNumbers<int8_t>(ParquetPlainEncoder::EncodedByteSize(ColumnType(TYPE_TINYINT)));
-  TestNumbers<int16_t>(ParquetPlainEncoder::EncodedByteSize(ColumnType(TYPE_SMALLINT)));
-  TestNumbers<int32_t>(ParquetPlainEncoder::EncodedByteSize(ColumnType(TYPE_INT)));
-  TestNumbers<int64_t>(ParquetPlainEncoder::EncodedByteSize(ColumnType(TYPE_BIGINT)));
-  TestNumbers<float>(ParquetPlainEncoder::EncodedByteSize(ColumnType(TYPE_FLOAT)));
-  TestNumbers<double>(ParquetPlainEncoder::EncodedByteSize(ColumnType(TYPE_DOUBLE)));
+  TestNumbers<int8_t, parquet::Type::INT32>(
+      ParquetPlainEncoder::EncodedByteSize(ColumnType(TYPE_TINYINT)));
+  TestNumbers<int16_t, parquet::Type::INT32>(
+      ParquetPlainEncoder::EncodedByteSize(ColumnType(TYPE_SMALLINT)));
+  TestNumbers<int32_t, parquet::Type::INT32>(
+      ParquetPlainEncoder::EncodedByteSize(ColumnType(TYPE_INT)));
+  TestNumbers<int64_t, parquet::Type::INT64>(
+      ParquetPlainEncoder::EncodedByteSize(ColumnType(TYPE_BIGINT)));
+  TestNumbers<float, parquet::Type::FLOAT>(
+      ParquetPlainEncoder::EncodedByteSize(ColumnType(TYPE_FLOAT)));
+  TestNumbers<double, parquet::Type::DOUBLE>(
+      ParquetPlainEncoder::EncodedByteSize(ColumnType(TYPE_DOUBLE)));
 
   for (int i = 1; i <= 16; ++i) {
-    if (i <= 4) TestNumbers<Decimal4Value>(i);
-    if (i <= 8) TestNumbers<Decimal8Value>(i);
-    TestNumbers<Decimal16Value>(i);
+    if (i <= 4) TestNumbers<Decimal4Value, parquet::Type::FIXED_LEN_BYTE_ARRAY>(i);
+    if (i <= 8) TestNumbers<Decimal8Value, parquet::Type::FIXED_LEN_BYTE_ARRAY>(i);
+    TestNumbers<Decimal16Value, parquet::Type::FIXED_LEN_BYTE_ARRAY>(i);
   }
 }
 
@@ -173,7 +179,8 @@ TEST(DictTest, TestInvalidStrings) {
   // Test a dictionary with a string encoded with negative length. Initializing
   // the decoder should fail.
   DictDecoder<StringValue> decoder;
-  ASSERT_FALSE(decoder.Reset(buffer, sizeof(buffer), 0));
+  ASSERT_FALSE(decoder.template Reset<parquet::Type::BYTE_ARRAY>(buffer, sizeof(buffer),
+      0));
 }
 
 TEST(DictTest, TestStringBufferOverrun) {
@@ -185,7 +192,8 @@ TEST(DictTest, TestStringBufferOverrun) {
   // Initializing the dictionary should fail, since the string would reference
   // invalid memory.
   DictDecoder<StringValue> decoder;
-  ASSERT_FALSE(decoder.Reset(buffer, sizeof(buffer), 0));
+  ASSERT_FALSE(decoder.template Reset<parquet::Type::BYTE_ARRAY>(buffer, sizeof(buffer),
+      0));
 }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/94236ff2/testdata/data/README
----------------------------------------------------------------------
diff --git a/testdata/data/README b/testdata/data/README
index 231a901..e1dc496 100644
--- a/testdata/data/README
+++ b/testdata/data/README
@@ -116,3 +116,10 @@ Reproduction steps:
    file_metadata_.schema[0].__set_repetition_type(FieldRepetitionType::REQUIRED);
 2: Run test_compute_stats and grab the created Parquet file for
    alltypes_parquet table.
+
+binary_decimal_dictionary.parquet,
+binary_decimal_no_dictionary.parquet:
+Generated using parquet-mr and contents verified using parquet-tools-1.9.1.
+Contain decimals stored as variable-sized BYTE_ARRAY, with dictionary and
+non-dictionary encoding respectively.
+

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/94236ff2/testdata/data/binary_decimal_dictionary.parquet
----------------------------------------------------------------------
diff --git a/testdata/data/binary_decimal_dictionary.parquet b/testdata/data/binary_decimal_dictionary.parquet
new file mode 100644
index 0000000..621ed40
Binary files /dev/null and b/testdata/data/binary_decimal_dictionary.parquet differ

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/94236ff2/testdata/data/binary_decimal_no_dictionary.parquet
----------------------------------------------------------------------
diff --git a/testdata/data/binary_decimal_no_dictionary.parquet b/testdata/data/binary_decimal_no_dictionary.parquet
new file mode 100644
index 0000000..3b8b096
Binary files /dev/null and b/testdata/data/binary_decimal_no_dictionary.parquet differ

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/94236ff2/testdata/workloads/functional-query/queries/QueryTest/parquet-decimal-formats.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-query/queries/QueryTest/parquet-decimal-formats.test b/testdata/workloads/functional-query/queries/QueryTest/parquet-decimal-formats.test
new file mode 100644
index 0000000..3c54aa1
--- /dev/null
+++ b/testdata/workloads/functional-query/queries/QueryTest/parquet-decimal-formats.test
@@ -0,0 +1,25 @@
+====
+---- QUERY
+select * from decimal_encodings;
+---- RESULTS
+0.00,0.00,0.00
+255.00,255.00,255.00
+65535.00,65535.00,65535.00
+9999999.99,9999999.99,9999999.99
+0.00,9999999999999999.99,999999999999999999999999999999999999.99
+-255.00,-255.00,-255.00
+-65535.00,-65535.00,-65535.00
+-9999999.99,-9999999.99,-9999999.99
+0.00,-9999999999999999.99,-999999999999999999999999999999999999.99
+0.00,0.00,0.00
+255.00,255.00,255.00
+65535.00,65535.00,65535.00
+9999999.99,9999999.99,9999999.99
+0.00,9999999999999999.99,999999999999999999999999999999999999.99
+-255.00,-255.00,-255.00
+-65535.00,-65535.00,-65535.00
+-9999999.99,-9999999.99,-9999999.99
+0.00,-9999999999999999.99,-999999999999999999999999999999999999.99
+---- TYPES
+DECIMAL,DECIMAL,DECIMAL
+====

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/94236ff2/tests/query_test/test_scanners.py
----------------------------------------------------------------------
diff --git a/tests/query_test/test_scanners.py b/tests/query_test/test_scanners.py
index 954830a..17b9503 100644
--- a/tests/query_test/test_scanners.py
+++ b/tests/query_test/test_scanners.py
@@ -589,6 +589,24 @@ class TestParquet(ImpalaTestSuite):
     self.run_test_case('QueryTest/parquet-resolution-by-name', vector,
                        use_db=unique_database)
 
+  def test_decimal_encodings(self, vector, unique_database):
+    # Create a table and load existing data files containing variable-length
+    # (BYTE_ARRAY) decimals, both with and without dictionary encoding.
+    TABLE_NAME = "decimal_encodings"
+    self.client.execute('''create table if not exists %s.%s
+    (small_dec decimal(9,2), med_dec decimal(18,2), large_dec decimal(38,2))
+    STORED AS PARQUET''' % (unique_database, TABLE_NAME))
+
+    table_loc = get_fs_path(
+      "/test-warehouse/%s.db/%s" % (unique_database, TABLE_NAME))
+    for file_name in ["binary_decimal_dictionary.parquet",
+                      "binary_decimal_no_dictionary.parquet"]:
+      data_file_path = os.path.join(os.environ['IMPALA_HOME'],
+                                    "testdata/data/", file_name)
+      check_call(['hdfs', 'dfs', '-copyFromLocal', data_file_path, table_loc])
+
+    self.run_test_case('QueryTest/parquet-decimal-formats', vector, unique_database)
+
 # We use various scan range lengths to exercise corner cases in the HDFS scanner more
 # thoroughly. In particular, it will exercise:
 # 1. default scan range


[3/3] incubator-impala git commit: IMPALA-6105: Clarify argument order in single_node_perf_run

Posted by ta...@apache.org.
IMPALA-6105: Clarify argument order in single_node_perf_run

single_node_perf_run.py uses git_hash_A vs. git_hash_B, distinguishing
them by their position in the command-line arguments.
single_node_perf_run.py calls report_benchmark_results.py, which uses
"reference vs. input" terminology, distinguished by command-line
flags. The output of report_benchmark_results.py uses
"{empty string} vs Base".

In the long run, I think it would be better to fix all three to use
the same terminology, but this comment hopefully adds clarity.
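
For example (numbers invented for illustration): if performance_result.txt
shows Avg(s) = 10 and BaseAvg(s) = 12 for a query, then git_hash_B averaged
10 seconds, git_hash_A (the "Base") averaged 12 seconds, and Delta(Avg) is
negative because git_hash_B is faster.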

Change-Id: Ib236ce7e83dc193ef1382f6304444ce58759a639
Reviewed-on: http://gerrit.cloudera.org:8080/8470
Tested-by: Impala Public Jenkins
Reviewed-by: Jim Apple <jb...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/216642e2
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/216642e2
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/216642e2

Branch: refs/heads/master
Commit: 216642e28d2454f7078c2e19f84e91e63221ea3c
Parents: fe90867
Author: Jim Apple <jb...@apache.org>
Authored: Sat Nov 4 15:18:31 2017 -0700
Committer: Jim Apple <jb...@apache.org>
Committed: Tue Nov 7 16:16:09 2017 +0000

----------------------------------------------------------------------
 bin/single_node_perf_run.py | 24 ++++++++++++++++++++++--
 1 file changed, 22 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/216642e2/bin/single_node_perf_run.py
----------------------------------------------------------------------
diff --git a/bin/single_node_perf_run.py b/bin/single_node_perf_run.py
index b67f661..c49e8e0 100755
--- a/bin/single_node_perf_run.py
+++ b/bin/single_node_perf_run.py
@@ -22,7 +22,17 @@
 #
 # When one hash is given, measures the performance on the specified workloads.
 # When two hashes are given, compares their performance. Output is in
-# $IMPALA_HOME/perf_results/latest.
+# $IMPALA_HOME/perf_results/latest. In the performance_result.txt file,
+# git_hash_A is referred to as the "Base" result. For example, if you run with
+# git_hash_A = aBad1dea... and git_hash_B = 8675309... the
+# performance_result.txt will say at the top:
+#
+#   Run Description: "aBad1dea... vs 8675309..."
+#
+# The different queries will have their run time statistics in columns
+# "Avg(s)", "StdDev(%)", "BaseAvg(s)", "Base StdDev(%)". The first two refer
+# to git_hash_B, the second two refer to git_hash_A. The column "Delta(Avg)"
+# is negative if git_hash_B is faster and is positive if git_hash_A is faster.
 #
 # WARNING: This script will run git checkout. You should not touch the tree
 # while the script is running. You should start the script from a clean git
@@ -266,7 +276,17 @@ def parse_options():
 
     When one hash is given, measures the performance on the specified workloads.
     When two hashes are given, compares their performance. Output is in
-    $IMPALA_HOME/perf_results/latest.
+    $IMPALA_HOME/perf_results/latest. In the performance_result.txt file,
+    git_hash_A is referred to as the "Base" result. For example, if you run with
+    git_hash_A = aBad1dea... and git_hash_B = 8675309... the
+    performance_result.txt will say at the top:
+
+      Run Description: "aBad1dea... vs 8675309..."
+
+    The different queries will have their run time statistics in columns
+    "Avg(s)", "StdDev(%)", "BaseAvg(s)", "Base StdDev(%)". The first two refer
+    to git_hash_B, the second two refer to git_hash_A. The column "Delta(Avg)"
+    is negative if git_hash_B is faster and is positive if git_hash_A is faster.
 
     WARNING: This script will run git checkout. You should not touch the tree
     while the script is running. You should start the script from a clean git


[2/3] incubator-impala git commit: IMPALA-1575: part 2: yield admission control resources

Posted by ta...@apache.org.
IMPALA-1575: part 2: yield admission control resources

This change releases admission control resources more eagerly,
once the query has finished actively executing. Some resources
(tracked and untracked) are still consumed by the client request
as long as it remains open, e.g. memory for control structures
and the result cache. However, these resources are relatively
small and should not block admission of new queries.

The same as in part 1, query execution is considered to be finished
under any of the following conditions:
1. The query encounters an error and fails
2. The query is cancelled due to the idle query timeout
3. The query reaches eos (or the DML completes)
4. The client cancels the query without closing the query

Admission control resources are released in two ways:
1. by calling AdmissionController::ReleaseQuery() on the coordinator
   promptly after query execution finishes, instead of waiting for
   UnregisterQuery(). This means that the query and its memory is
   no longer considered "admitted".
2. by changing the behaviour of MemTracker::GetPoolMemReserved() so
   that it is aware of when a query has finished executing and does not
   consider its entire memory limit to be "reserved".

The preconditions for releasing an admitted query are subtle because the
queries are being admitted to a distributed system, not just the
coordinator.  The comment for ReleaseAdmissionControlResources()
documents the preconditions and rationale. Note that the preconditions
are not weaker than the preconditions of calling UnregisterQuery()
before this patch.

Testing:
TestAdmissionController is extended to end queries in four ways:
cancellation by client, idle timeout, the last row being fetched,
and the client closing the query. The test uses a mix of all four.
After the query ends, all clients wait for the test to complete
before closing the query or closing the connection. This ensures
that the admission control decisions are based entirely on the
query end behavior. This test works for both query admission control
and mem_limit admission control and can detect both kinds of admission
control resources ("admitted" and "reserved") not being released
promptly.

This is based on an earlier patch by Joe McDonnell.

Change-Id: I80279eb2bda740d7f61420f52db3bfa42a6a51ac
Reviewed-on: http://gerrit.cloudera.org:8080/8323
Reviewed-by: Tim Armstrong <ta...@cloudera.com>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/fe90867d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/fe90867d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/fe90867d

Branch: refs/heads/master
Commit: fe90867d890c71bfdcf8ff941f8ec51e36083f25
Parents: 94236ff
Author: Tim Armstrong <ta...@cloudera.com>
Authored: Tue Oct 17 16:25:24 2017 -0700
Committer: Impala Public Jenkins <im...@gerrit.cloudera.org>
Committed: Tue Nov 7 05:16:11 2017 +0000

----------------------------------------------------------------------
 be/src/runtime/coordinator.cc                   |  29 +++-
 be/src/runtime/coordinator.h                    |  38 ++++-
 be/src/runtime/mem-tracker.cc                   |   6 +-
 be/src/runtime/mem-tracker.h                    |  16 +-
 be/src/runtime/query-state.cc                   |   3 +
 be/src/scheduling/admission-controller.cc       |  13 +-
 be/src/scheduling/admission-controller.h        |  15 +-
 be/src/service/client-request-state.cc          |  11 --
 .../custom_cluster/test_admission_controller.py | 149 ++++++++++++-------
 9 files changed, 189 insertions(+), 91 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/fe90867d/be/src/runtime/coordinator.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/coordinator.cc b/be/src/runtime/coordinator.cc
index d18d658..94b0bdb 100644
--- a/be/src/runtime/coordinator.cc
+++ b/be/src/runtime/coordinator.cc
@@ -37,6 +37,7 @@
 #include "runtime/coordinator-backend-state.h"
 #include "runtime/debug-options.h"
 #include "runtime/query-state.h"
+#include "scheduling/admission-controller.h"
 #include "scheduling/scheduler.h"
 #include "util/bloom-filter.h"
 #include "util/counting-barrier.h"
@@ -79,6 +80,9 @@ Coordinator::Coordinator(
 Coordinator::~Coordinator() {
   DCHECK(released_exec_resources_)
       << "ReleaseExecResources() must be called before Coordinator is destroyed";
+  DCHECK(released_admission_control_resources_)
+      << "ReleaseAdmissionControlResources() must be called before Coordinator is "
+      << "destroyed";
   if (query_state_ != nullptr) {
     ExecEnv::GetInstance()->query_exec_mgr()->ReleaseQueryState(query_state_);
   }
@@ -817,15 +821,16 @@ Status Coordinator::Wait() {
   // Execution of query fragments has finished. We don't need to hold onto query execution
   // resources while we finalize the query.
   ReleaseExecResources();
-
   // Query finalization is required only for HDFS table sinks
   if (needs_finalization_) RETURN_IF_ERROR(FinalizeQuery());
+  // Release admission control resources after we've done the potentially heavyweight
+  // finalization.
+  ReleaseAdmissionControlResources();
 
   query_profile_->AddInfoString(
       "DML Stats", DataSink::OutputDmlStats(per_partition_status_, "\n"));
   // For DML queries, when Wait is done, the query is complete.
   ComputeQuerySummary();
-
   return status;
 }
 
@@ -856,10 +861,11 @@ Status Coordinator::GetNext(QueryResultSet* results, int max_rows, bool* eos) {
     returned_all_results_ = true;
     // release query execution resources here, since we won't be fetching more result rows
     ReleaseExecResources();
-
     // wait for all backends to complete before computing the summary
     // TODO: relocate this so GetNext() won't have to wait for backends to complete?
     RETURN_IF_ERROR(WaitForBackendCompletion());
+    // Release admission control resources after backends are finished.
+    ReleaseAdmissionControlResources();
     // if the query completed successfully, compute the summary
     if (query_status_.ok()) ComputeQuerySummary();
   }
@@ -898,7 +904,7 @@ void Coordinator::CancelInternal() {
   backend_completion_cv_.notify_all();
 
   ReleaseExecResourcesLocked();
-
+  ReleaseAdmissionControlResourcesLocked();
   // Report the summary with whatever progress the query made before being cancelled.
   ComputeQuerySummary();
 }
@@ -1070,6 +1076,21 @@ void Coordinator::ReleaseExecResourcesLocked() {
   // caching. The query MemTracker will be cleaned up later.
 }
 
+void Coordinator::ReleaseAdmissionControlResources() {
+  lock_guard<mutex> l(lock_);
+  ReleaseAdmissionControlResourcesLocked();
+}
+
+void Coordinator::ReleaseAdmissionControlResourcesLocked() {
+  if (released_admission_control_resources_) return;
+  LOG(INFO) << "Release admssion control resources for query "
+            << PrintId(query_ctx_.query_id);
+  AdmissionController* admission_controller =
+      ExecEnv::GetInstance()->admission_controller();
+  if (admission_controller != nullptr) admission_controller->ReleaseQuery(schedule_);
+  released_admission_control_resources_ = true;
+}
+
 void Coordinator::UpdateFilter(const TUpdateFilterParams& params) {
   DCHECK_NE(filter_mode_, TRuntimeFilterMode::OFF)
       << "UpdateFilter() called although runtime filters are disabled";

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/fe90867d/be/src/runtime/coordinator.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/coordinator.h b/be/src/runtime/coordinator.h
index 611e4ae..3549ae9 100644
--- a/be/src/runtime/coordinator.h
+++ b/be/src/runtime/coordinator.h
@@ -189,7 +189,8 @@ class Coordinator { // NOLINT: The member variables could be re-ordered to save
   class FilterState;
   class FragmentStats;
 
-  const QuerySchedule schedule_;
+  /// owned by the ClientRequestState that owns this coordinator
+  const QuerySchedule& schedule_;
 
   /// copied from TQueryExecRequest; constant across all fragments
   TQueryCtx query_ctx_;
@@ -346,6 +347,9 @@ class Coordinator { // NOLINT: The member variables could be re-ordered to save
   /// True if and only if ReleaseExecResources() has been called.
   bool released_exec_resources_ = false;
 
+  /// True if and only if ReleaseAdmissionControlResources() has been called.
+  bool released_admission_control_resources_ = false;
+
   /// Returns a local object pool.
   ObjectPool* obj_pool() { return obj_pool_.get(); }
 
@@ -437,6 +441,38 @@ class Coordinator { // NOLINT: The member variables could be re-ordered to save
 
   /// Same as ReleaseExecResources() except the lock must be held by the caller.
   void ReleaseExecResourcesLocked();
+
+  /// Releases admission control resources for use by other queries.
+  /// This should only be called if one of following preconditions is satisfied for each
+  /// backend on which the query is executing:
+  /// * The backend finished execution.
+  ///   Rationale: the backend isn't consuming resources.
+  //
+  /// * A cancellation RPC was delivered to the backend.
+  ///   Rationale: the backend will be cancelled and release resources soon. By the
+  ///   time a newly admitted query fragment starts up on the backend and starts consuming
+  ///   resources, the resources from this query will probably have been released.
+  //
+  /// * Sending the cancellation RPC to the backend failed
+  ///   Rationale: the backend is either down or will tear itself down when it next tries
+  ///   to send a status RPC to the coordinator. It's possible that the fragment will be
+  ///   slow to tear down and we could overadmit and cause query failures. However, given
+  ///   the communication errors, we need to proceed based on incomplete information about
+  ///   the state of the cluster. We choose to optimistically assume that the backend will
+  ///   tear itself down in a timely manner and admit more queries instead of
+  ///   pessimistically queueing queries while we wait for a response from a backend that
+  ///   may never come.
+  ///
+  /// Calling WaitForBackendCompletion() or CancelInternal() before this function is
+  /// sufficient to satisfy the above preconditions. If the query has an expensive
+  /// finalization step post query execution (e.g. a DML statement), then this should
+  /// be called after that completes to avoid over-admitting queries.
+  ///
+  /// Acquires lock_. Idempotent.
+  void ReleaseAdmissionControlResources();
+
+  /// Same as ReleaseAdmissionControlResources() except lock must be held by caller.
+  void ReleaseAdmissionControlResourcesLocked();
 };
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/fe90867d/be/src/runtime/mem-tracker.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/mem-tracker.cc b/be/src/runtime/mem-tracker.cc
index a8cb37e..4162e8a 100644
--- a/be/src/runtime/mem-tracker.cc
+++ b/be/src/runtime/mem-tracker.cc
@@ -141,11 +141,13 @@ int64_t MemTracker::GetPoolMemReserved() {
   lock_guard<SpinLock> l(child_trackers_lock_);
   for (MemTracker* child : child_trackers_) {
     int64_t child_limit = child->limit();
-    if (child_limit > 0) {
+    bool query_exec_finished = child->query_exec_finished_.Load() != 0;
+    if (child_limit > 0 && !query_exec_finished) {
       // Make sure we don't overflow if the query limits are set to ridiculous values.
       mem_reserved += std::min(child_limit, MemInfo::physical_mem());
     } else {
-      DCHECK_EQ(child_limit, -1) << child->LogUsage(UNLIMITED_DEPTH);
+      DCHECK(query_exec_finished || child_limit == -1)
+          << child->LogUsage(UNLIMITED_DEPTH);
       mem_reserved += child->consumption();
     }
   }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/fe90867d/be/src/runtime/mem-tracker.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/mem-tracker.h b/be/src/runtime/mem-tracker.h
index 1260351..539f973 100644
--- a/be/src/runtime/mem-tracker.h
+++ b/be/src/runtime/mem-tracker.h
@@ -294,8 +294,9 @@ class MemTracker {
 
   /// Returns the memory 'reserved' by this resource pool mem tracker, which is the sum
   /// of the memory reserved by the queries in it (i.e. its child trackers). The mem
-  /// reserved for a query is its limit_, if set (which should be the common case with
-  /// admission control). Otherwise the current consumption is used.
+  /// reserved for a query that is currently executing is its limit_, if set (which
+  /// should be the common case with admission control). Otherwise, if the query has
+  /// no limit or the query is finished executing, the current consumption is used.
   int64_t GetPoolMemReserved();
 
   /// Returns the memory consumed in bytes.
@@ -351,6 +352,11 @@ class MemTracker {
   Status MemLimitExceeded(RuntimeState* state, const std::string& details,
       int64_t failed_allocation = 0) WARN_UNUSED_RESULT;
 
+  void set_query_exec_finished() {
+    DCHECK(is_query_mem_tracker_);
+    query_exec_finished_.Store(1);
+  }
+
   static const std::string COUNTER_NAME;
 
  private:
@@ -386,6 +392,12 @@ class MemTracker {
   /// True if this is a Query MemTracker returned from CreateQueryMemTracker().
   bool is_query_mem_tracker_ = false;
 
+  /// Only used if 'is_query_mem_tracker_' is true.
+  /// 0 if the query is still executing or 1 if it has finished executing. Before
+  /// it has finished executing, the tracker limit is treated as "reserved memory"
+  /// for the purpose of admission control - see GetPoolMemReserved().
+  AtomicInt32 query_exec_finished_{0};
+
   /// Only valid for MemTrackers returned from CreateQueryMemTracker()
   TUniqueId query_id_;
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/fe90867d/be/src/runtime/query-state.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/query-state.cc b/be/src/runtime/query-state.cc
index 6796c82..fd72f6a 100644
--- a/be/src/runtime/query-state.cc
+++ b/be/src/runtime/query-state.cc
@@ -82,6 +82,9 @@ void QueryState::ReleaseExecResources() {
   if (initial_reservations_ != nullptr) initial_reservations_->ReleaseResources();
   if (buffer_reservation_ != nullptr) buffer_reservation_->Close();
   if (desc_tbl_ != nullptr) desc_tbl_->ReleaseResources();
+  // Mark the query as finished on the query MemTracker so that admission control will
+  // not consider the whole query memory limit to be "reserved".
+  query_mem_tracker_->set_query_exec_finished();
   // At this point query execution should not be consuming any resources but some tracked
   // memory may still be used by the ClientRequestState for result caching. The query
   // MemTracker will be closed later when this QueryState is torn down.

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/fe90867d/be/src/scheduling/admission-controller.cc
----------------------------------------------------------------------
diff --git a/be/src/scheduling/admission-controller.cc b/be/src/scheduling/admission-controller.cc
index c29020c..8994a5b 100644
--- a/be/src/scheduling/admission-controller.cc
+++ b/be/src/scheduling/admission-controller.cc
@@ -608,20 +608,19 @@ Status AdmissionController::AdmitQuery(QuerySchedule* schedule) {
   }
 }
 
-Status AdmissionController::ReleaseQuery(QuerySchedule* schedule) {
-  if (!schedule->is_admitted()) return Status::OK(); // No-op if query was not admitted
-  const string& pool_name = schedule->request_pool();
+void AdmissionController::ReleaseQuery(const QuerySchedule& schedule) {
+  if (!schedule.is_admitted()) return; // No-op if query was not admitted
+  const string& pool_name = schedule.request_pool();
   {
     lock_guard<mutex> lock(admission_ctrl_lock_);
     PoolStats* stats = GetPoolStats(pool_name);
-    stats->Release(*schedule);
-    UpdateHostMemAdmitted(*schedule, -schedule->GetPerHostMemoryEstimate());
+    stats->Release(schedule);
+    UpdateHostMemAdmitted(schedule, -schedule.GetPerHostMemoryEstimate());
     pools_for_updates_.insert(pool_name);
-    VLOG_RPC << "Released query id=" << schedule->query_id() << " "
+    VLOG_RPC << "Released query id=" << schedule.query_id() << " "
              << stats->DebugString();
   }
   dequeue_cv_.notify_one();
-  return Status::OK();
 }
 
 // Statestore subscriber callback for IMPALA_REQUEST_QUEUE_TOPIC.

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/fe90867d/be/src/scheduling/admission-controller.h
----------------------------------------------------------------------
diff --git a/be/src/scheduling/admission-controller.h b/be/src/scheduling/admission-controller.h
index 3f18f6d..0cb9f2a 100644
--- a/be/src/scheduling/admission-controller.h
+++ b/be/src/scheduling/admission-controller.h
@@ -95,12 +95,13 @@ class ExecEnv;
 ///  a) Mem Reserved: the amount of memory that has been reported as reserved by all
 ///     backends, which come from the statestore topic updates. The values that are sent
 ///     come from the pool mem trackers in UpdateMemTrackerStats(), which reflects the
-///     memory reserved by fragments that have begun execution. For queries that have mem
-///     limits, the limit is considered to be its reserved memory, otherwise the current
-///     consumption is used (see MemTracker::GetPoolMemReserved()). The per-pool and
-///     per-host aggregates are computed in UpdateClusterAggregates(). This state, once
-///     all updates are fully distributed and aggregated, provides enough information to
-///     make admission decisions by any impalad. However, this requires waiting for both
+///     memory reserved by fragments that have begun execution. For queries that are
+///     executing and have mem limits, the limit is considered to be its reserved memory
+///     because it may consume up to that limit. Otherwise the query's current consumption
+///     is used (see MemTracker::GetPoolMemReserved()). The per-pool and per-host
+///     aggregates are computed in UpdateClusterAggregates(). This state, once all updates
+///     are fully distributed and aggregated, provides enough information to make
+///     admission decisions by any impalad. However, this requires waiting for both
 ///     admitted requests to start all remote fragments and then for the updated state to
 ///     be distributed via the statestore.
 ///  b) Mem Admitted: the amount of memory required (i.e. the value used in admission,
@@ -195,7 +196,7 @@ class AdmissionController {
   /// been submitted via AdmitQuery(). (If the request was not admitted, this is
   /// a no-op.)
   /// This does not block.
-  Status ReleaseQuery(QuerySchedule* schedule);
+  void ReleaseQuery(const QuerySchedule& schedule);
 
   /// Registers the request queue topic with the statestore.
   Status Init();

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/fe90867d/be/src/service/client-request-state.cc
----------------------------------------------------------------------
diff --git a/be/src/service/client-request-state.cc b/be/src/service/client-request-state.cc
index 918fc03..523e4ae 100644
--- a/be/src/service/client-request-state.cc
+++ b/be/src/service/client-request-state.cc
@@ -572,17 +572,6 @@ void ClientRequestState::Done() {
   // Update result set cache metrics, and update mem limit accounting before tearing
   // down the coordinator.
   ClearResultCache();
-
-  if (coord_.get() != NULL) {
-    // Release any reserved resources.
-    if (exec_env_->admission_controller() != nullptr) {
-      Status status = exec_env_->admission_controller()->ReleaseQuery(schedule_.get());
-      if (!status.ok()) {
-        LOG(WARNING) << "Failed to release resources of query " << schedule_->query_id()
-                     << " because of error: " << status.GetDetail();
-      }
-    }
-  }
 }
 
 Status ClientRequestState::Exec(const TMetadataOpRequest& exec_request) {

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/fe90867d/tests/custom_cluster/test_admission_controller.py
----------------------------------------------------------------------
diff --git a/tests/custom_cluster/test_admission_controller.py b/tests/custom_cluster/test_admission_controller.py
index 712ce57..02fd6db 100644
--- a/tests/custom_cluster/test_admission_controller.py
+++ b/tests/custom_cluster/test_admission_controller.py
@@ -40,10 +40,11 @@ from TCLIService import TCLIService
 
 LOG = logging.getLogger('admission_test')
 
-# We set a WAIT debug action so it doesn't complete the execution of this query. The
-# limit is a parameter for debugging purposes; each thread will insert its id so
+# The query used for testing. It is important that this query be able to fetch many
+# rows. This allows a thread to stay active by fetching one row at a time. The
+# where clause is for debugging purposes; each thread will insert its id so
 # that running queries can be correlated with the thread that submitted them.
-QUERY = "select * from alltypes where id != %s"# limit %s"
+QUERY = "select * from alltypes where id != %s"
 
 # Time to sleep (in milliseconds) between issuing queries. The default statestore
 # heartbeat is 500ms, so the lower the delay the more we can submit before the global
@@ -86,6 +87,9 @@ _STATESTORED_ARGS = "-statestore_heartbeat_frequency_ms=%s "\
 # Key in the query profile for the query options.
 PROFILE_QUERY_OPTIONS_KEY = "Query Options (set by configuration): "
 
+# The different ways that a query thread can end its query.
+QUERY_END_BEHAVIORS = ['EOS', 'CLIENT_CANCEL', 'QUERY_TIMEOUT', 'CLIENT_CLOSE']
+
 def impalad_admission_ctrl_flags(max_requests, max_queued, pool_max_mem,
     proc_mem_limit = None):
   if proc_mem_limit is not None:
@@ -365,12 +369,13 @@ class TestAdmissionController(TestAdmissionControllerBase, HS2TestSuite):
 class TestAdmissionControllerStress(TestAdmissionControllerBase):
   """Submits a number of queries (parameterized) with some delay between submissions
   (parameterized) and the ability to submit to one impalad or many in a round-robin
-  fashion. The queries are set with the WAIT debug action so that we have more control
-  over the state that the admission controller uses to make decisions.  Each query is
-  submitted on a separate thread. Depending on the test parameters a varying number of
-  queries will be admitted, queued, and rejected. Once queries are admitted, the query
-  execution blocks and we can cancel the query in order to allow another queued query to
-  be admitted.
+  fashion. Each query is submitted on a separate thread. After admission, the query
+  thread will block with the query open and wait for the main thread to notify it to
+  end its query. The query thread can end its query by fetching to the end, cancelling
+  itself, closing itself, or waiting for the query timeout to take effect. Depending
+  on the test parameters a varying number of queries will be admitted, queued, and
+  rejected. After the queries are admitted, the main thread will request each admitted
+  query thread to end its query and allow queued queries to be admitted.
 
   The test tracks the state of the admission controller using the metrics from each
   impalad to do the following:
@@ -378,13 +383,15 @@ class TestAdmissionControllerStress(TestAdmissionControllerBase):
       queued, and rejected requests should sum to the number of queries and that the
       values are reasonable given the test parameters.
   (2) While there are running queries:
-      * Cancel the currently running queries (they are blocked with the WAIT debug action)
-        and verify the metric for the number of completed queries. The threads that
-        submitted those queries should complete.
+      * Request the currently running queries to end and wait for the queries to end.
+        Verify the metric for the number of completed queries. The threads that
+        submitted those queries will keep their connections open until the entire test
+        completes. This verifies that admission control is tied to the end of the query
+        and does not depend on closing the connection.
       * Check that queued requests are then dequeued and verify using the metric for the
         number of dequeued requests. The threads that were waiting to submit the query
         should then insert themselves into a list of currently running queries and then
-        fetch() the results (which will block).
+        wait for a notification from the main thread.
   (3) After all queries have completed, check that the final number of admitted,
       queued, and rejected requests are reasonable given the test parameters. When
       submitting to a single impalad, we know exactly what the values should be,
@@ -428,6 +435,7 @@ class TestAdmissionControllerStress(TestAdmissionControllerBase):
     self.executing_threads = list()
 
   def teardown(self):
+    # Set shutdown for all threads (cancel if needed)
     for thread in self.all_threads:
       try:
         thread.lock.acquire()
@@ -442,6 +450,9 @@ class TestAdmissionControllerStress(TestAdmissionControllerBase):
             client.close()
       finally:
         thread.lock.release()
+
+    # Wait for all threads to exit
+    for thread in self.all_threads:
       thread.join(5)
       LOG.debug("Join thread for query num %s %s", thread.query_num,
           "TIMED OUT" if thread.isAlive() else "")
@@ -537,36 +548,39 @@ class TestAdmissionControllerStress(TestAdmissionControllerBase):
     LOG.debug("Found all %s admitted threads after %s seconds", num_threads,
         round(time() - start_time, 1))
 
-  def cancel_admitted_queries(self, num_queries):
+  def end_admitted_queries(self, num_queries):
     """
-    Cancels queries on threads that are currently blocked on query execution.
+    Requests each admitted query to end its query.
     """
     assert len(self.executing_threads) >= num_queries
-    LOG.debug("Cancelling %s queries", num_queries)
+    LOG.debug("Requesting {0} clients to end queries".format(num_queries))
+
+    # Request admitted clients to end their queries
+    current_executing_queries = []
     for i in xrange(num_queries):
       # pop() is thread-safe, it's OK if another thread is appending concurrently.
       thread = self.executing_threads.pop(0)
       LOG.debug("Cancelling query %s", thread.query_num)
-      # The other thread sets the query_state before appending itself to the list,
-      # and will not change its state until it is cancelled by this thread.
       assert thread.query_state == 'ADMITTED'
-      client = thread.impalad.service.create_beeswax_client()
-      try:
-        cancel_result = client.cancel(thread.query_handle)
-        assert cancel_result.status_code == 0,\
-            'Unexpected status code from cancel request: %s' % cancel_result
-        # Wait for the query to be cancelled and return
-        thread.join(20)
-        LOG.debug("Cancelled admitted query %s %s",
-            thread.query_num, "TIMED OUT" if thread.isAlive() else "")
-        assert not thread.isAlive()
-        assert thread.query_state == 'COMPLETED'
-      finally:
-        client.close()
+      current_executing_queries.append(thread)
+      thread.query_state = 'REQUEST_QUERY_END'
+
+    # Wait for the queries to end
+    start_time = time()
+    while True:
+      all_done = True
+      for thread in self.all_threads:
+        if thread.query_state == 'REQUEST_QUERY_END':
+          all_done = False
+      if all_done:
+        break
+      assert (time() - start_time < STRESS_TIMEOUT),\
+        "Timed out waiting %s seconds for query end" % (STRESS_TIMEOUT,)
+      sleep(1)
 
   class SubmitQueryThread(threading.Thread):
     def __init__(self, impalad, additional_query_options, vector, query_num,
-        executing_threads):
+        query_end_behavior, executing_threads):
       """
       executing_threads must be provided so that this thread can add itself when the
       query is admitted and begins execution.
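end_admitted_queries above and _end_query further down share the same
poll-until-predicate-or-timeout shape. Were one to factor it out, a hypothetical
helper might look like this; the test inlines the loop instead:

    import time

    def wait_until(predicate, timeout_s, interval_s=1):
        # Poll predicate() until it returns True, asserting if timeout_s
        # elapses first.
        start = time.time()
        while not predicate():
            assert time.time() - start < timeout_s, \
                "Timed out after %s seconds" % timeout_s
            time.sleep(interval_s)

    # The wait loop in end_admitted_queries is then roughly equivalent to:
    #   wait_until(lambda: all(t.query_state != 'REQUEST_QUERY_END'
    #                          for t in self.all_threads), STRESS_TIMEOUT)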
@@ -576,6 +590,7 @@ class TestAdmissionControllerStress(TestAdmissionControllerBase):
       self.vector = vector
       self.additional_query_options = additional_query_options
       self.query_num = query_num
+      self.query_end_behavior = query_end_behavior
       self.impalad = impalad
       self.error = None
       # query_state is defined and used only by the test code, not a property exposed by
@@ -599,7 +614,6 @@ class TestAdmissionControllerStress(TestAdmissionControllerBase):
             return
 
           exec_options = self.vector.get_value('exec_option')
-          exec_options['debug_action'] = '0:GETNEXT:WAIT'
           exec_options.update(self.additional_query_options)
           query = QUERY % (self.query_num,)
           self.query_state = 'SUBMITTING'
@@ -607,6 +621,9 @@ class TestAdmissionControllerStress(TestAdmissionControllerBase):
           ImpalaTestSuite.change_database(client, self.vector.get_value('table_format'))
           client.set_configuration(exec_options)
 
+          if self.query_end_behavior == 'QUERY_TIMEOUT':
+            client.execute("SET QUERY_TIMEOUT_S=5")
+
           LOG.debug("Submitting query %s", self.query_num)
           self.query_handle = client.execute_async(query)
         except ImpalaBeeswaxException as e:
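QUERY_TIMEOUT_S is Impala's idle-query timeout: the backend cancels a query once
it has gone that many seconds without client activity, and any fetch resets the
clock. A toy model of those semantics (an assumption about the server behavior
this test relies on, not Impala code):

    import time

    class ToyQuery(object):
        # Models a query subject to an idle timeout of timeout_s seconds.
        def __init__(self, timeout_s):
            self.timeout_s = timeout_s
            self.last_activity = time.time()

        def fetch_one_row(self):
            self.last_activity = time.time()  # any fetch resets the timer

        def expired(self):
            return time.time() - self.last_activity > self.timeout_s

This is why the worker thread below keeps fetching one row per second while it
waits: the fetches hold the 5-second timeout at bay until the main thread asks
for the query to end.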
@@ -627,22 +644,22 @@ class TestAdmissionControllerStress(TestAdmissionControllerBase):
         # The thread becomes visible to the main thread when it is added to the
         # shared list of executing_threads. append() is atomic and thread-safe.
         self.executing_threads.append(self)
-        try:
-          # fetch() will block until we cancel the query from the main thread
-          # (unless an unexpected error occurs). If an error occurs on the main thread,
-          # it is possible that teardown() cancels this query before we call fetch(). In
-          # that case a different exception is thrown and we handle it gracefully.
-          client.fetch(query, self.query_handle)
-        except ImpalaBeeswaxException as e:
-          if "Cancelled" in str(e):
-            LOG.debug("Query %s completed", self.query_num)
+
+        # Synchronize with the main thread. At this point, the thread is executing a
+        # query. It needs to wait until the main thread requests it to end its query.
+        while not self.shutdown:
+          # For QUERY_TIMEOUT, the query must stay active until the main thread
+          # requests it to end; otherwise the idle timeout would cancel it early.
+          # Fetch a row every second to keep the query from going idle.
+          if self.query_end_behavior == 'QUERY_TIMEOUT' and \
+             self.query_state != 'COMPLETED':
+            client.fetch(query, self.query_handle, 1)
+          if self.query_state == 'REQUEST_QUERY_END':
+            self._end_query(client, query)
+            # The query has released admission control resources
             self.query_state = 'COMPLETED'
             self.query_handle = None
-          elif "Invalid or unknown query handle" in str(e):
-            # May happen if the test is being torn down early (i.e. an error occurred).
-            LOG.debug("Query %s already cancelled in test shutdown.")
-          else:
-            raise e
+          sleep(1)
       except Exception as e:
         LOG.exception(e)
         # Unknown errors will be raised later
@@ -653,6 +670,27 @@ class TestAdmissionControllerStress(TestAdmissionControllerBase):
         if client is not None:
           client.close()
 
+    def _end_query(self, client, query):
+      """Bring the query to the appropriate end state defined by self.query_end_behaviour.
+      Returns once the query has reached that state."""
+      if self.query_end_behavior == 'QUERY_TIMEOUT':
+        # Sleep and wait for the query to be cancelled. The cancellation will
+        # set the state to EXCEPTION.
+        start_time = time()
+        while (client.get_state(self.query_handle) != \
+               client.QUERY_STATES['EXCEPTION']):
+          assert (time() - start_time < STRESS_TIMEOUT),\
+            "Timed out waiting %s seconds for query cancel" % (STRESS_TIMEOUT,)
+          sleep(1)
+      elif self.query_end_behavior == 'EOS':
+        # Fetch all rows so we hit eos.
+        client.fetch(query, self.query_handle)
+      elif self.query_end_behavior == 'CLIENT_CANCEL':
+        client.cancel(self.query_handle)
+      else:
+        assert self.query_end_behavior == 'CLIENT_CLOSE'
+        client.close_query(self.query_handle)
+
   def _check_queries_page_resource_pools(self):
     """Checks that all queries in the '/queries' webpage json have the correct resource
     pool (this is called after all queries have been admitted, queued, or rejected, so
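The four branches of _end_query each release the query's admission control
resources through a different client-visible path:

    QUERY_TIMEOUT   stop fetching and wait for the backend to cancel the
                    idle query (query state reaches EXCEPTION)
    EOS             fetch every remaining row so the query hits end-of-stream
    CLIENT_CANCEL   explicitly cancel the running query
    CLIENT_CLOSE    close the query handle outright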
@@ -693,14 +731,11 @@ class TestAdmissionControllerStress(TestAdmissionControllerBase):
     initial_metrics = self.get_admission_metrics();
     log_metrics("Initial metrics: ", initial_metrics);
 
-    # Want query_num to start at 1 because this gets used as the limit in the query to
-    # help debugging (we can associate a running query with a thread). If we start at 0,
-    # that query would be evaluated as a constant expression and never hit the WAIT debug
-    # action.
-    for query_num in xrange(1, num_queries + 1):
+    for query_num in xrange(num_queries):
       impalad = self.impalads[query_num % len(self.impalads)]
+      query_end_behavior = QUERY_END_BEHAVIORS[query_num % len(QUERY_END_BEHAVIORS)]
       thread = self.SubmitQueryThread(impalad, additional_query_options, vector,
-          query_num, self.executing_threads)
+          query_num, query_end_behavior, self.executing_threads)
       thread.start()
       self.all_threads.append(thread)
       sleep(submission_delay_ms / 1000.0)
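QUERY_END_BEHAVIORS is defined outside the hunks shown here; presumably it is a
module-level list naming the four behaviors _end_query handles. Under that
assumption, the submission loop spreads queries across impalads and end
behaviors by independent round-robin:

    # Assumed definition (not visible in this diff), mirroring _end_query:
    QUERY_END_BEHAVIORS = ['QUERY_TIMEOUT', 'EOS', 'CLIENT_CANCEL', 'CLIENT_CLOSE']

    for query_num in range(num_queries):
        # Each query gets the next impalad and the next end behavior in turn.
        impalad = impalads[query_num % len(impalads)]
        behavior = QUERY_END_BEHAVIORS[query_num % len(QUERY_END_BEHAVIORS)]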
@@ -735,10 +770,10 @@ class TestAdmissionControllerStress(TestAdmissionControllerBase):
     while len(self.executing_threads) > 0:
       curr_metrics = self.get_admission_metrics();
       log_metrics("Main loop, curr_metrics: ", curr_metrics);
-      num_to_cancel = len(self.executing_threads)
-      LOG.debug("Main loop, will cancel %s queries", num_to_cancel)
-      self.cancel_admitted_queries(num_to_cancel)
-      self.wait_for_metric_changes(['released'], curr_metrics, num_to_cancel)
+      num_to_end = len(self.executing_threads)
+      LOG.debug("Main loop, will request %s queries to end", num_to_end)
+      self.end_admitted_queries(num_to_end)
+      self.wait_for_metric_changes(['released'], curr_metrics, num_to_end)
 
       num_queued_remaining =\
           curr_metrics['queued'] - curr_metrics['dequeued'] - curr_metrics['timed-out']