You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by ta...@apache.org on 2017/02/02 18:36:35 UTC

[1/3] incubator-impala git commit: IMPALA-4829: Change default Kudu read behavior for "RYW"

Repository: incubator-impala
Updated Branches:
  refs/heads/master e3566ac04 -> 6251d8b4d


IMPALA-4829: Change default Kudu read behavior for "RYW"

Currently the default Kudu read mode is set to "READ_LATEST",
which essentially provides no guarantees on reading except
that any read issued will read the latest value that the
target replica happens to have. This is not necessarily a
time after a previous write operation in the same session.
By changing the read mode to the misleadingly named
"READ_AT_SNAPSHOT", we can ensure that Kudu reads will all
be at times at least or greater than the latest "observed"
time (which Impala already sets on the client). Note that
this does not mean all reads are performed at the same
timestamp (i.e. a snapshot read) because that requires
setting a snapshot timestamp, but doing this will require
more work in the future in both Impala and (mostly) Kudu.
The Kudu team calls this "Read Your Writes".

This means that, after this change, values written within a
session will always be visible to subsequent reads. Before
this change, this was usually the case but not guaranteed.

Testing: Private test run, running an exhaustive job now.
This is otherwise difficult to validate in new tests. This
has plenty of time to bake for 2.9 in case we discover
performance or functional issues.

Change-Id: I4011f8277083982aee2c6c2bfca2f4ae2f8cb31e
Reviewed-on: http://gerrit.cloudera.org:8080/5802
Reviewed-by: Thomas Tauber-Marshall <tm...@cloudera.com>
Reviewed-by: Dan Hecht <dh...@cloudera.com>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/32ff9598
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/32ff9598
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/32ff9598

Branch: refs/heads/master
Commit: 32ff959814646458a34278500bd01fc7741951ce
Parents: e3566ac
Author: Matthew Jacobs <mj...@cloudera.com>
Authored: Thu Jan 26 12:56:19 2017 -0800
Committer: Impala Public Jenkins <im...@gerrit.cloudera.org>
Committed: Wed Feb 1 23:42:50 2017 +0000

----------------------------------------------------------------------
 be/src/exec/kudu-scanner.cc | 14 +++++++-------
 1 file changed, 7 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/32ff9598/be/src/exec/kudu-scanner.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/kudu-scanner.cc b/be/src/exec/kudu-scanner.cc
index 9ec5201..8b6778f 100644
--- a/be/src/exec/kudu-scanner.cc
+++ b/be/src/exec/kudu-scanner.cc
@@ -44,9 +44,9 @@ using kudu::client::KuduScanBatch;
 using kudu::client::KuduSchema;
 using kudu::client::KuduTable;
 
-DEFINE_string(kudu_read_mode, "READ_LATEST", "(Advanced) Sets the Kudu scan ReadMode. "
-    "Supported Kudu read modes are READ_LATEST and READ_AT_SNAPSHOT. Invalid values "
-    "result in using READ_LATEST.");
+DEFINE_string(kudu_read_mode, "READ_AT_SNAPSHOT", "(Advanced) Sets the Kudu scan "
+    "ReadMode. Supported Kudu read modes are READ_LATEST and READ_AT_SNAPSHOT. Invalid "
+    "values result in using READ_AT_SNAPSHOT.");
 DEFINE_bool(pick_only_leaders_for_tests, false,
             "Whether to pick only leader replicas, for tests purposes only.");
 DEFINE_int32(kudu_scanner_keep_alive_period_sec, 15,
@@ -57,7 +57,7 @@ DECLARE_int32(kudu_operation_timeout_ms);
 
 namespace impala {
 
-const string MODE_READ_AT_SNAPSHOT = "READ_AT_SNAPSHOT";
+const string MODE_READ_LATEST = "READ_LATEST";
 
 KuduScanner::KuduScanner(KuduScanNode* scan_node, RuntimeState* state)
   : scan_node_(scan_node),
@@ -138,9 +138,9 @@ Status KuduScanner::OpenNextScanToken(const string& scan_token)  {
                          "Could not set replica selection.");
   }
   kudu::client::KuduScanner::ReadMode mode =
-      MODE_READ_AT_SNAPSHOT == FLAGS_kudu_read_mode ?
-          kudu::client::KuduScanner::READ_AT_SNAPSHOT :
-          kudu::client::KuduScanner::READ_LATEST;
+      MODE_READ_LATEST == FLAGS_kudu_read_mode ?
+          kudu::client::KuduScanner::READ_LATEST :
+          kudu::client::KuduScanner::READ_AT_SNAPSHOT;
   KUDU_RETURN_IF_ERROR(scanner_->SetReadMode(mode), "Could not set scanner ReadMode");
   KUDU_RETURN_IF_ERROR(scanner_->SetTimeoutMillis(FLAGS_kudu_operation_timeout_ms),
       "Could not set scanner timeout");


[3/3] incubator-impala git commit: IMPALA-3909: Populate min/max statistics in Parquet writer

Posted by ta...@apache.org.
IMPALA-3909: Populate min/max statistics in Parquet writer

Change-Id: I8368ee58daa50c07a3b8ef65be70203eb941f619
Reviewed-on: http://gerrit.cloudera.org:8080/5611
Reviewed-by: Lars Volker <lv...@cloudera.com>
Tested-by: Impala Public Jenkins
Reviewed-by: Tim Armstrong <ta...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/6251d8b4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/6251d8b4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/6251d8b4

Branch: refs/heads/master
Commit: 6251d8b4ddac3bdd6fb651f000aea15b7a0d1603
Parents: a5b7689
Author: Lars Volker <lv...@cloudera.com>
Authored: Mon Dec 5 19:18:51 2016 +0100
Committer: Tim Armstrong <ta...@cloudera.com>
Committed: Thu Feb 2 06:44:48 2017 +0000

----------------------------------------------------------------------
 be/src/exec/hdfs-parquet-table-writer.cc | 209 ++++++++++-----
 be/src/exec/hdfs-parquet-table-writer.h  |   5 +
 be/src/exec/parquet-column-stats.h       | 201 +++++++++++++++
 be/src/exec/parquet-common.h             |  19 +-
 be/src/runtime/string-value.h            |   2 +-
 be/src/util/dict-test.cc                 |  14 +-
 tests/query_test/test_insert_parquet.py  | 356 ++++++++++++++++++++++++--
 tests/util/get_parquet_metadata.py       | 105 +++++++-
 8 files changed, 792 insertions(+), 119 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6251d8b4/be/src/exec/hdfs-parquet-table-writer.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-parquet-table-writer.cc b/be/src/exec/hdfs-parquet-table-writer.cc
index 7ae55b7..9e9cb3e 100644
--- a/be/src/exec/hdfs-parquet-table-writer.cc
+++ b/be/src/exec/hdfs-parquet-table-writer.cc
@@ -18,8 +18,10 @@
 #include "exec/hdfs-parquet-table-writer.h"
 
 #include "common/version.h"
-#include "exprs/expr.h"
+#include "exec/parquet-column-stats.h"
 #include "exprs/expr-context.h"
+#include "exprs/expr.h"
+#include "rpc/thrift-util.h"
 #include "runtime/decimal-value.h"
 #include "runtime/mem-tracker.h"
 #include "runtime/raw-value.h"
@@ -34,7 +36,6 @@
 #include "util/dict-encoding.h"
 #include "util/hdfs-util.h"
 #include "util/rle-encoding.h"
-#include "rpc/thrift-util.h"
 
 #include <sstream>
 
@@ -88,14 +89,20 @@ class HdfsParquetTableWriter::BaseColumnWriter {
   // expr - the expression to generate output values for this column.
   BaseColumnWriter(HdfsParquetTableWriter* parent, ExprContext* expr_ctx,
       const THdfsCompression::type& codec)
-    : parent_(parent), expr_ctx_(expr_ctx), codec_(codec),
-      page_size_(DEFAULT_DATA_PAGE_SIZE), current_page_(NULL), num_values_(0),
+    : parent_(parent),
+      expr_ctx_(expr_ctx),
+      codec_(codec),
+      page_size_(DEFAULT_DATA_PAGE_SIZE),
+      current_page_(nullptr),
+      num_values_(0),
       total_compressed_byte_size_(0),
       total_uncompressed_byte_size_(0),
-      dict_encoder_base_(NULL),
-      def_levels_(NULL),
-      values_buffer_len_(DEFAULT_DATA_PAGE_SIZE) {
-    Codec::CreateCompressor(NULL, false, codec, &compressor_);
+      dict_encoder_base_(nullptr),
+      def_levels_(nullptr),
+      values_buffer_len_(DEFAULT_DATA_PAGE_SIZE),
+      page_stats_base_(nullptr),
+      row_group_stats_base_(nullptr) {
+    Codec::CreateCompressor(nullptr, false, codec, &compressor_);
 
     def_levels_ = parent_->state_->obj_pool()->Add(
         new RleEncoder(parent_->reusable_col_mem_pool_->Allocate(DEFAULT_DATA_PAGE_SIZE),
@@ -122,13 +129,24 @@ class HdfsParquetTableWriter::BaseColumnWriter {
   Status Flush(int64_t* file_pos, int64_t* first_data_page,
       int64_t* first_dictionary_page);
 
+  // Encodes the row group statistics into a parquet::Statistics object and attaches it to
+  // 'meta_data'.
+  void EncodeRowGroupStats(ColumnMetaData* meta_data) {
+    DCHECK(row_group_stats_base_ != nullptr);
+    if (row_group_stats_base_->has_values()
+        && row_group_stats_base_->BytesNeeded() <= MAX_COLUMN_STATS_SIZE) {
+      row_group_stats_base_->EncodeToThrift(&meta_data->statistics);
+      meta_data->__isset.statistics = true;
+    }
+  }
+
   // Resets all the data accumulated for this column.  Memory can now be reused for
   // the next row group
   // Any data for previous row groups must be reset (e.g. dictionaries).
   // Subclasses must call this if they override this function.
   virtual void Reset() {
     num_data_pages_ = 0;
-    current_page_ = NULL;
+    current_page_ = nullptr;
     num_values_ = 0;
     total_compressed_byte_size_ = 0;
     current_encoding_ = Encoding::PLAIN;
@@ -137,8 +155,8 @@ class HdfsParquetTableWriter::BaseColumnWriter {
   // Close this writer. This is only called after Flush() and no more rows will
   // be added.
   void Close() {
-    if (compressor_.get() != NULL) compressor_->Close();
-    if (dict_encoder_base_ != NULL) dict_encoder_base_->ClearIndices();
+    if (compressor_.get() != nullptr) compressor_->Close();
+    if (dict_encoder_base_ != nullptr) dict_encoder_base_->ClearIndices();
   }
 
   const ColumnType& type() const { return expr_ctx_->root()->type(); }
@@ -152,13 +170,14 @@ class HdfsParquetTableWriter::BaseColumnWriter {
  protected:
   friend class HdfsParquetTableWriter;
 
-  // Encode value into the current page output buffer. Returns true if the value fits
-  // on the current page. If this function returned false, the caller should create a
-  // new page and try again with the same value.
+  // Encodes value into the current page output buffer and updates the column statistics
+  // aggregates. Returns true if the value fits on the current page. If this function
+  // returned false, the caller should create a new page and try again with the same
+  // value.
   // *bytes_needed will contain the (estimated) number of bytes needed to successfully
   // encode the value in the page.
   // Implemented in the subclass.
-  virtual bool EncodeValue(void* value, int64_t* bytes_needed) = 0;
+  virtual bool ProcessValue(void* value, int64_t* bytes_needed) = 0;
 
   // Encodes out all data for the current page and updates the metadata.
   virtual void FinalizeCurrentPage();
@@ -194,7 +213,8 @@ class HdfsParquetTableWriter::BaseColumnWriter {
 
   THdfsCompression::type codec_;
 
-  // Compression codec for this column.  If NULL, this column is will not be compressed.
+  // Compression codec for this column.  If nullptr, this column is will not be
+  // compressed.
   scoped_ptr<Codec> compressor_;
 
   vector<DataPage> pages_;
@@ -210,25 +230,31 @@ class HdfsParquetTableWriter::BaseColumnWriter {
   // TODO: Consider removing and only creating a single large page as necessary.
   int64_t page_size_;
 
+  // Pointer to the current page in 'pages_'. Not owned.
   DataPage* current_page_;
-  int64_t num_values_; // Total number of values across all pages, including NULLs.
+
+  int64_t num_values_; // Total number of values across all pages, including nullptr.
   int64_t total_compressed_byte_size_;
   int64_t total_uncompressed_byte_size_;
   Encoding::type current_encoding_;
 
-  // Created and set by the base class.
+  // Created, owned, and set by the derived class.
   DictEncoderBase* dict_encoder_base_;
 
-  // Rle encoder object for storing definition levels. For non-nested schemas,
-  // this always uses 1 bit per row.
-  // This is reused across pages since the underlying buffer is copied out when
-  // the page is finalized.
+  // Rle encoder object for storing definition levels, owned by instances of this class.
+  // For non-nested schemas, this always uses 1 bit per row. This is reused across pages
+  // since the underlying buffer is copied out when the page is finalized.
   RleEncoder* def_levels_;
 
-  // Data for buffered values. This is reused across pages.
+  // Data for buffered values. This is owned by instances of this class and gets reused
+  // across pages.
   uint8_t* values_buffer_;
   // The size of values_buffer_.
   int values_buffer_len_;
+
+  // Pointers to statistics, created, owned, and set by the derived class.
+  ColumnStatsBase* page_stats_base_;
+  ColumnStatsBase* row_group_stats_base_;
 };
 
 // Per type column writer.
@@ -237,10 +263,16 @@ class HdfsParquetTableWriter::ColumnWriter :
     public HdfsParquetTableWriter::BaseColumnWriter {
  public:
   ColumnWriter(HdfsParquetTableWriter* parent, ExprContext* ctx,
-      const THdfsCompression::type& codec) : BaseColumnWriter(parent, ctx, codec),
-      num_values_since_dict_size_check_(0) {
+      const THdfsCompression::type& codec)
+    : BaseColumnWriter(parent, ctx, codec),
+      num_values_since_dict_size_check_(0),
+      plain_encoded_value_size_(
+          ParquetPlainEncoder::EncodedByteSize(ctx->root()->type())),
+      page_stats_(plain_encoded_value_size_),
+      row_group_stats_(plain_encoded_value_size_) {
     DCHECK_NE(ctx->root()->type().type, TYPE_BOOLEAN);
-    encoded_value_size_ = ParquetPlainEncoder::ByteSize(ctx->root()->type());
+    page_stats_base_ = &page_stats_;
+    row_group_stats_base_ = &row_group_stats_;
   }
 
   virtual void Reset() {
@@ -249,12 +281,14 @@ class HdfsParquetTableWriter::ColumnWriter :
     // it will fall back to plain.
     current_encoding_ = Encoding::PLAIN_DICTIONARY;
     dict_encoder_.reset(
-        new DictEncoder<T>(parent_->per_file_mem_pool_.get(), encoded_value_size_));
+        new DictEncoder<T>(parent_->per_file_mem_pool_.get(), plain_encoded_value_size_));
     dict_encoder_base_ = dict_encoder_.get();
+    page_stats_.Reset();
+    row_group_stats_.Reset();
   }
 
  protected:
-  virtual bool EncodeValue(void* value, int64_t* bytes_needed) {
+  virtual bool ProcessValue(void* value, int64_t* bytes_needed) {
     if (current_encoding_ == Encoding::PLAIN_DICTIONARY) {
       if (UNLIKELY(num_values_since_dict_size_check_ >=
                    DICTIONARY_DATA_PAGE_SIZE_CHECK_PERIOD)) {
@@ -273,20 +307,22 @@ class HdfsParquetTableWriter::ColumnWriter :
       parent_->file_size_estimate_ += *bytes_needed;
     } else if (current_encoding_ == Encoding::PLAIN) {
       T* v = CastValue(value);
-      *bytes_needed = encoded_value_size_ < 0 ?
-          ParquetPlainEncoder::ByteSize<T>(*v) : encoded_value_size_;
+      *bytes_needed = plain_encoded_value_size_ < 0 ?
+          ParquetPlainEncoder::ByteSize<T>(*v) :
+          plain_encoded_value_size_;
       if (current_page_->header.uncompressed_page_size + *bytes_needed > page_size_) {
         return false;
       }
       uint8_t* dst_ptr = values_buffer_ + current_page_->header.uncompressed_page_size;
       int64_t written_len =
-          ParquetPlainEncoder::Encode(dst_ptr, encoded_value_size_, *v);
+          ParquetPlainEncoder::Encode(dst_ptr, plain_encoded_value_size_, *v);
       DCHECK_EQ(*bytes_needed, written_len);
       current_page_->header.uncompressed_page_size += written_len;
     } else {
       // TODO: support other encodings here
       DCHECK(false);
     }
+    page_stats_.Update(*CastValue(value));
     return true;
   }
 
@@ -306,12 +342,18 @@ class HdfsParquetTableWriter::ColumnWriter :
   // The number of values added since we last checked the dictionary.
   int num_values_since_dict_size_check_;
 
-  // Size of each encoded value. -1 if the size is type is variable-length.
-  int64_t encoded_value_size_;
+  // Size of each encoded value in plain encoding. -1 if the type is variable-length.
+  int64_t plain_encoded_value_size_;
 
   // Temporary string value to hold CHAR(N)
   StringValue temp_;
 
+  // Tracks statistics per page.
+  ColumnStats<T> page_stats_;
+
+  // Tracks statistics per row group. This gets reset when starting a new row group.
+  ColumnStats<T> row_group_stats_;
+
   // Converts a slot pointer to a raw value suitable for encoding
   inline T* CastValue(void* value) {
     return reinterpret_cast<T*>(value);
@@ -334,23 +376,30 @@ class HdfsParquetTableWriter::BoolColumnWriter :
     public HdfsParquetTableWriter::BaseColumnWriter {
  public:
   BoolColumnWriter(HdfsParquetTableWriter* parent, ExprContext* ctx,
-      const THdfsCompression::type& codec) : BaseColumnWriter(parent, ctx, codec) {
+      const THdfsCompression::type& codec)
+    : BaseColumnWriter(parent, ctx, codec), page_stats_(-1), row_group_stats_(-1) {
     DCHECK_EQ(ctx->root()->type().type, TYPE_BOOLEAN);
     bool_values_ = parent_->state_->obj_pool()->Add(
         new BitWriter(values_buffer_, values_buffer_len_));
     // Dictionary encoding doesn't make sense for bools and is not allowed by
     // the format.
     current_encoding_ = Encoding::PLAIN;
-    dict_encoder_base_ = NULL;
+    dict_encoder_base_ = nullptr;
+
+    page_stats_base_ = &page_stats_;
+    row_group_stats_base_ = &row_group_stats_;
   }
 
  protected:
-  virtual bool EncodeValue(void* value, int64_t* bytes_needed) {
-    return bool_values_->PutValue(*reinterpret_cast<bool*>(value), 1);
+  virtual bool ProcessValue(void* value, int64_t* bytes_needed) {
+    bool v = *reinterpret_cast<bool*>(value);
+    if (!bool_values_->PutValue(v, 1)) return false;
+    page_stats_.Update(v);
+    return true;
   }
 
   virtual void FinalizeCurrentPage() {
-    DCHECK(current_page_ != NULL);
+    DCHECK(current_page_ != nullptr);
     if (current_page_->finalized) return;
     bool_values_->Flush();
     int num_bytes = bool_values_->bytes_written();
@@ -363,6 +412,12 @@ class HdfsParquetTableWriter::BoolColumnWriter :
  private:
   // Used to encode bools as single bit values. This is reused across pages.
   BitWriter* bool_values_;
+
+  // Tracks statistics per page.
+  ColumnStats<bool> page_stats_;
+
+  // Tracks statistics per row group. This gets reset when starting a new file.
+  ColumnStats<bool> row_group_stats_;
 };
 
 }
@@ -370,7 +425,7 @@ class HdfsParquetTableWriter::BoolColumnWriter :
 inline Status HdfsParquetTableWriter::BaseColumnWriter::AppendRow(TupleRow* row) {
   ++num_values_;
   void* value = expr_ctx_->GetValue(row);
-  if (current_page_ == NULL) NewPage();
+  if (current_page_ == nullptr) NewPage();
 
   // Ensure that we have enough space for the definition level, but don't write it yet in
   // case we don't have enough space for the value.
@@ -386,10 +441,10 @@ inline Status HdfsParquetTableWriter::BaseColumnWriter::AppendRow(TupleRow* row)
   // this won't loop forever.
   while (true) {
     // Nulls don't get encoded.
-    if (value == NULL) break;
+    if (value == nullptr) break;
 
     int64_t bytes_needed = 0;
-    if (EncodeValue(value, &bytes_needed)) {
+    if (ProcessValue(value, &bytes_needed)) {
       ++current_page_->num_non_null;
       break;
     }
@@ -416,7 +471,7 @@ inline Status HdfsParquetTableWriter::BaseColumnWriter::AppendRow(TupleRow* row)
   }
 
   // Now that the value has been successfully written, write the definition level.
-  bool ret = def_levels_->Put(value != NULL);
+  bool ret = def_levels_->Put(value != nullptr);
   // Writing the def level will succeed because we ensured there was enough space for it
   // above, and new pages will always have space for at least a single def level.
   DCHECK(ret);
@@ -426,7 +481,7 @@ inline Status HdfsParquetTableWriter::BaseColumnWriter::AppendRow(TupleRow* row)
 }
 
 inline void HdfsParquetTableWriter::BaseColumnWriter::WriteDictDataPage() {
-  DCHECK(dict_encoder_base_ != NULL);
+  DCHECK(dict_encoder_base_ != nullptr);
   DCHECK_EQ(current_page_->header.uncompressed_page_size, 0);
   if (current_page_->num_non_null == 0) return;
   int len = dict_encoder_base_->WriteData(values_buffer_, values_buffer_len_);
@@ -443,7 +498,7 @@ inline void HdfsParquetTableWriter::BaseColumnWriter::WriteDictDataPage() {
 
 Status HdfsParquetTableWriter::BaseColumnWriter::Flush(int64_t* file_pos,
    int64_t* first_data_page, int64_t* first_dictionary_page) {
-  if (current_page_ == NULL) {
+  if (current_page_ == nullptr) {
     // This column/file is empty
     *first_data_page = *file_pos;
     *first_dictionary_page = -1;
@@ -454,7 +509,7 @@ Status HdfsParquetTableWriter::BaseColumnWriter::Flush(int64_t* file_pos,
 
   *first_dictionary_page = -1;
   // First write the dictionary page before any of the data pages.
-  if (dict_encoder_base_ != NULL) {
+  if (dict_encoder_base_ != nullptr) {
     *first_dictionary_page = *file_pos;
     // Write dictionary page header
     DictionaryPageHeader dict_header;
@@ -470,7 +525,7 @@ Status HdfsParquetTableWriter::BaseColumnWriter::Flush(int64_t* file_pos,
     uint8_t* dict_buffer = parent_->per_file_mem_pool_->Allocate(
         header.uncompressed_page_size);
     dict_encoder_base_->WriteDict(dict_buffer);
-    if (compressor_.get() != NULL) {
+    if (compressor_.get() != nullptr) {
       SCOPED_TIMER(parent_->parent_->compress_timer());
       int64_t max_compressed_size =
           compressor_->MaxOutputLen(header.uncompressed_page_size);
@@ -515,7 +570,7 @@ Status HdfsParquetTableWriter::BaseColumnWriter::Flush(int64_t* file_pos,
     }
 
     // Write data page header
-    uint8_t* buffer = NULL;
+    uint8_t* buffer = nullptr;
     uint32_t len = 0;
     RETURN_IF_ERROR(
         parent_->thrift_serializer_->Serialize(&page.header, &len, &buffer));
@@ -530,10 +585,10 @@ Status HdfsParquetTableWriter::BaseColumnWriter::Flush(int64_t* file_pos,
 }
 
 void HdfsParquetTableWriter::BaseColumnWriter::FinalizeCurrentPage() {
-  DCHECK(current_page_ != NULL);
+  DCHECK(current_page_ != nullptr);
   if (current_page_->finalized) return;
 
-  // If the entire page was NULL, encode it as PLAIN since there is no
+  // If the entire page was nullptr, encode it as PLAIN since there is no
   // data anyway. We don't output a useless dictionary page and it works
   // around a parquet MR bug (see IMPALA-759 for more details).
   if (current_page_->num_non_null == 0) current_encoding_ = Encoding::PLAIN;
@@ -549,8 +604,8 @@ void HdfsParquetTableWriter::BaseColumnWriter::FinalizeCurrentPage() {
   header.uncompressed_page_size += current_page_->num_def_bytes;
 
   // At this point we know all the data for the data page.  Combine them into one buffer.
-  uint8_t* uncompressed_data = NULL;
-  if (compressor_.get() == NULL) {
+  uint8_t* uncompressed_data = nullptr;
+  if (compressor_.get() == nullptr) {
     uncompressed_data =
         parent_->per_file_mem_pool_->Allocate(header.uncompressed_page_size);
   } else {
@@ -571,8 +626,8 @@ void HdfsParquetTableWriter::BaseColumnWriter::FinalizeCurrentPage() {
   buffer.Append(values_buffer_, buffer.capacity() - buffer.size());
 
   // Apply compression if necessary
-  if (compressor_.get() == NULL) {
-    current_page_->data = reinterpret_cast<uint8_t*>(uncompressed_data);
+  if (compressor_.get() == nullptr) {
+    current_page_->data = uncompressed_data;
     header.compressed_page_size = header.uncompressed_page_size;
   } else {
     SCOPED_TIMER(parent_->parent_->compress_timer());
@@ -591,6 +646,18 @@ void HdfsParquetTableWriter::BaseColumnWriter::FinalizeCurrentPage() {
         max_compressed_size - header.compressed_page_size);
   }
 
+  // Build page statistics and add them to the header.
+  DCHECK(page_stats_base_ != nullptr);
+  if (page_stats_base_->has_values()
+      && page_stats_base_->BytesNeeded() <= MAX_COLUMN_STATS_SIZE) {
+    page_stats_base_->EncodeToThrift(&header.data_page_header.statistics);
+    header.data_page_header.__isset.statistics = true;
+  }
+
+  // Update row group statistics from page statistics.
+  DCHECK(row_group_stats_base_ != nullptr);
+  row_group_stats_base_->Merge(*page_stats_base_);
+
   // Add the size of the data page header
   uint8_t* header_buffer;
   uint32_t header_len = 0;
@@ -623,21 +690,20 @@ void HdfsParquetTableWriter::BaseColumnWriter::NewPage() {
   }
   current_page_->finalized = false;
   current_page_->num_non_null = 0;
+  page_stats_base_->Reset();
 }
 
 HdfsParquetTableWriter::HdfsParquetTableWriter(HdfsTableSink* parent, RuntimeState* state,
     OutputPartition* output, const HdfsPartitionDescriptor* part_desc,
     const HdfsTableDescriptor* table_desc, const vector<ExprContext*>& output_expr_ctxs)
-    : HdfsTableWriter(
-        parent, state, output, part_desc, table_desc, output_expr_ctxs),
-      thrift_serializer_(new ThriftSerializer(true)),
-      current_row_group_(NULL),
-      row_count_(0),
-      file_size_limit_(0),
-      reusable_col_mem_pool_(new MemPool(parent_->mem_tracker())),
-      per_file_mem_pool_(new MemPool(parent_->mem_tracker())),
-      row_idx_(0) {
-}
+  : HdfsTableWriter(parent, state, output, part_desc, table_desc, output_expr_ctxs),
+    thrift_serializer_(new ThriftSerializer(true)),
+    current_row_group_(nullptr),
+    row_count_(0),
+    file_size_limit_(0),
+    reusable_col_mem_pool_(new MemPool(parent_->mem_tracker())),
+    per_file_mem_pool_(new MemPool(parent_->mem_tracker())),
+    row_idx_(0) {}
 
 HdfsParquetTableWriter::~HdfsParquetTableWriter() {
 }
@@ -671,7 +737,7 @@ Status HdfsParquetTableWriter::Init() {
   columns_.resize(table_desc_->num_cols() - table_desc_->num_clustering_cols());
   // Initialize each column structure.
   for (int i = 0; i < columns_.size(); ++i) {
-    BaseColumnWriter* writer = NULL;
+    BaseColumnWriter* writer = nullptr;
     const ColumnType& type = output_expr_ctxs_[i]->root()->type();
     switch (type.type) {
       case TYPE_BOOLEAN:
@@ -776,7 +842,7 @@ Status HdfsParquetTableWriter::CreateSchema() {
 }
 
 Status HdfsParquetTableWriter::AddRowGroup() {
-  if (current_row_group_ != NULL) RETURN_IF_ERROR(FlushCurrentRowGroup());
+  if (current_row_group_ != nullptr) RETURN_IF_ERROR(FlushCurrentRowGroup());
   file_metadata_.row_groups.push_back(RowGroup());
   current_row_group_ = &file_metadata_.row_groups[file_metadata_.row_groups.size() - 1];
 
@@ -825,7 +891,7 @@ uint64_t HdfsParquetTableWriter::default_block_size() const {
 }
 
 Status HdfsParquetTableWriter::InitNewFile() {
-  DCHECK(current_row_group_ == NULL);
+  DCHECK(current_row_group_ == nullptr);
 
   per_file_mem_pool_->Clear();
 
@@ -939,7 +1005,7 @@ Status HdfsParquetTableWriter::WriteFileHeader() {
 }
 
 Status HdfsParquetTableWriter::FlushCurrentRowGroup() {
-  if (current_row_group_ == NULL) return Status::OK();
+  if (current_row_group_ == nullptr) return Status::OK();
 
   int num_clustering_cols = table_desc_->num_clustering_cols();
   for (int i = 0; i < columns_.size(); ++i) {
@@ -965,6 +1031,9 @@ Status HdfsParquetTableWriter::FlushCurrentRowGroup() {
     const string& col_name = table_desc_->col_descs()[i + num_clustering_cols].name();
     parquet_stats_.per_column_size[col_name] += columns_[i]->total_compressed_size();
 
+    // Build column statistics and add them to the header.
+    columns_[i]->EncodeRowGroupStats(&current_row_group_->columns[i].meta_data);
+
     // Since we don't supported complex schemas, all columns should have the same
     // number of values.
     DCHECK_EQ(current_row_group_->columns[0].meta_data.num_values,
@@ -973,7 +1042,7 @@ Status HdfsParquetTableWriter::FlushCurrentRowGroup() {
     // Metadata for this column is complete, write it out to file.  The column metadata
     // goes at the end so that when we have collocated files, the column data can be
     // written without buffering.
-    uint8_t* buffer = NULL;
+    uint8_t* buffer = nullptr;
     uint32_t len = 0;
     RETURN_IF_ERROR(
         thrift_serializer_->Serialize(&current_row_group_->columns[i], &len, &buffer));
@@ -983,14 +1052,14 @@ Status HdfsParquetTableWriter::FlushCurrentRowGroup() {
     columns_[i]->Reset();
   }
 
-  current_row_group_ = NULL;
+  current_row_group_ = nullptr;
   return Status::OK();
 }
 
 Status HdfsParquetTableWriter::WriteFileFooter() {
   // Write file_meta_data
   uint32_t file_metadata_len = 0;
-  uint8_t* buffer = NULL;
+  uint8_t* buffer = nullptr;
   RETURN_IF_ERROR(
       thrift_serializer_->Serialize(&file_metadata_, &file_metadata_len, &buffer));
   RETURN_IF_ERROR(Write(buffer, file_metadata_len));

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6251d8b4/be/src/exec/hdfs-parquet-table-writer.h
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-parquet-table-writer.h b/be/src/exec/hdfs-parquet-table-writer.h
index 4e16707..94ad932 100644
--- a/be/src/exec/hdfs-parquet-table-writer.h
+++ b/be/src/exec/hdfs-parquet-table-writer.h
@@ -100,6 +100,11 @@ class HdfsParquetTableWriter : public HdfsTableWriter {
   /// Minimum file size.  If the configured size is less, fail.
   static const int HDFS_MIN_FILE_SIZE = 8 * 1024 * 1024;
 
+  /// Maximum statistics size. If the size of a single thrift parquet::Statistics struct
+  /// for a page or row group exceed this value, we'll not write it. We use the same value
+  /// as 'parquet-mr'.
+  static const int MAX_COLUMN_STATS_SIZE = 4 * 1024;
+
   /// Per-column information state.  This contains some metadata as well as the
   /// data buffers.
   class BaseColumnWriter;

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6251d8b4/be/src/exec/parquet-column-stats.h
----------------------------------------------------------------------
diff --git a/be/src/exec/parquet-column-stats.h b/be/src/exec/parquet-column-stats.h
new file mode 100644
index 0000000..8bcf4de
--- /dev/null
+++ b/be/src/exec/parquet-column-stats.h
@@ -0,0 +1,201 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#ifndef IMPALA_EXEC_PARQUET_COLUMN_STATS_H
+#define IMPALA_EXEC_PARQUET_COLUMN_STATS_H
+
+#include <type_traits>
+
+namespace impala {
+
+/// This class, together with its derivatives, is used to track column statistics when
+/// writing parquet files. It provides an interface to populate a parquet::Statistics
+/// object and attach it to an object supplied by the caller.
+///
+/// We currently support tracking 'min' and 'max' values for statistics. The other two
+/// statistical values in parquet.thrift, 'null_count' and 'distinct_count' are not
+/// tracked or populated.
+///
+/// Regarding the ordering of values, we follow the parquet-mr reference implementation.
+///
+/// Numeric values (BOOLEAN, INT, FLOAT, DOUBLE) are ordered by their numeric
+/// value (as opposed to their binary representation).
+///
+/// We currently don't write statistics for DECIMAL values and character array values
+/// (CHAR, VARCHAR, STRING) due to several issues with parquet-mr and subsequently, Hive
+/// (PARQUET-251, PARQUET-686). For those types, the Update() method is empty, so that the
+/// stats are not tracked.
+///
+/// NULL values are not considered for min/max statistics, and if a column consists only
+/// of NULL values, then no min/max statistics are written.
+///
+/// Updating the statistics is handled in derived classes to alleviate the need for
+/// virtual function calls.
+///
+/// TODO: Populate null_count and distinct_count.
+class ColumnStatsBase {
+ public:
+  ColumnStatsBase() : has_values_(false) {}
+  virtual ~ColumnStatsBase() {}
+
+  /// Merges this statistics object with values from 'other'. If other has not been
+  /// initialized, then this object will not be changed.
+  virtual void Merge(const ColumnStatsBase& other) = 0;
+
+  /// Returns the number of bytes needed to encode the current statistics into a
+  /// parquet::Statistics object.
+  virtual int64_t BytesNeeded() const = 0;
+
+  /// Encodes the current values into a Statistics thrift message.
+  virtual void EncodeToThrift(parquet::Statistics* out) const = 0;
+
+  /// Resets the state of this object.
+  void Reset() { has_values_ = false; }
+
+  bool has_values() const { return has_values_; }
+
+ protected:
+  /// Stores whether the current object has been initialized with a set of values.
+  bool has_values_;
+};
+
+/// This class contains the type-specific behavior to track statistics per column.
+template <typename T>
+class ColumnStats : public ColumnStatsBase {
+  // We explicitly require types to be listed here in order to support column statistics.
+  // When adding a type here, users of this class need to ensure that the statistics
+  // follow the ordering semantics of parquet's min/max statistics for the new type.
+  // Details on how the values should be ordered can be found in the 'parquet-format'
+  // project in 'parquet.thrift' and 'LogicalTypes.md'.
+  using value_type = typename std::enable_if<
+      std::is_arithmetic<T>::value
+        || std::is_same<bool, T>::value
+        || std::is_same<StringValue, T>::value
+        || std::is_same<TimestampValue, T>::value
+        || std::is_same<Decimal4Value, T>::value
+        || std::is_same<Decimal8Value, T>::value
+        || std::is_same<Decimal16Value, T>::value,
+      T>::type;
+
+ public:
+  ColumnStats(int plain_encoded_value_size)
+    : ColumnStatsBase(), plain_encoded_value_size_(plain_encoded_value_size) {}
+
+  /// Updates the statistics based on the value 'v'. If necessary, initializes the
+  /// statistics.
+  void Update(const T& v) {
+    if (!has_values_) {
+      has_values_ = true;
+      min_value_ = v;
+      max_value_ = v;
+    } else {
+      min_value_ = min(min_value_, v);
+      max_value_ = max(max_value_, v);
+    }
+  }
+
+  virtual void Merge(const ColumnStatsBase& other) override {
+    DCHECK(dynamic_cast<const ColumnStats<T>*>(&other));
+    const ColumnStats<T>* cs = static_cast<const ColumnStats<T>*>(&other);
+    if (!cs->has_values_) return;
+    if (!has_values_) {
+      has_values_ = true;
+      min_value_ = cs->min_value_;
+      max_value_ = cs->max_value_;
+    } else {
+      min_value_ = min(min_value_, cs->min_value_);
+      max_value_ = max(max_value_, cs->max_value_);
+    }
+  }
+
+  virtual int64_t BytesNeeded() const override {
+    return BytesNeededInternal(min_value_) + BytesNeededInternal(max_value_);
+  }
+
+  virtual void EncodeToThrift(parquet::Statistics* out) const override {
+    DCHECK(has_values_);
+    string min_str;
+    EncodeValueToString(min_value_, &min_str);
+    out->__set_min(move(min_str));
+    string max_str;
+    EncodeValueToString(max_value_, &max_str);
+    out->__set_max(move(max_str));
+  }
+
+ protected:
+  /// Encodes a single value using parquet's PLAIN encoding and stores it into the
+  /// binary string 'out'.
+  void EncodeValueToString(const T& v, string* out) const {
+    int64_t bytes_needed = BytesNeededInternal(v);
+    out->resize(bytes_needed);
+    int64_t bytes_written = ParquetPlainEncoder::Encode(
+        reinterpret_cast<uint8_t*>(&(*out)[0]), bytes_needed, v);
+    DCHECK_EQ(bytes_needed, bytes_written);
+  }
+
+  /// Returns the number of bytes needed to encode value 'v'.
+  int64_t BytesNeededInternal(const T& v) const {
+    return plain_encoded_value_size_ < 0 ? ParquetPlainEncoder::ByteSize<T>(v) :
+        plain_encoded_value_size_;
+  }
+
+  // Size of each encoded value in plain encoding, -1 if the type is variable-length.
+  int plain_encoded_value_size_;
+
+  // Minimum value since the last call to Reset().
+  T min_value_;
+
+  // Maximum value since the last call to Reset().
+  T max_value_;
+};
+
+/// Plain encoding for Boolean values is not handled by the ParquetPlainEncoder and thus
+/// needs special handling here.
+template <>
+void ColumnStats<bool>::EncodeValueToString(const bool& v, string* out) const {
+  char c = v;
+  out->assign(1, c);
+}
+
+template <>
+int64_t ColumnStats<bool>::BytesNeededInternal(const bool& v) const {
+  return 1;
+}
+
+/// parquet-mr and subsequently Hive currently do not handle the following types
+/// correctly (PARQUET-251, PARQUET-686), so we disable support for them.
+/// The relevant Impala Jiras are for
+/// - StringValue    IMPALA-4817
+/// - TimestampValue IMPALA-4819
+/// - DecimalValue   IMPALA-4815
+template <>
+void ColumnStats<StringValue>::Update(const StringValue& v) {}
+
+template <>
+void ColumnStats<TimestampValue>::Update(const TimestampValue& v) {}
+
+template <>
+void ColumnStats<Decimal4Value>::Update(const Decimal4Value& v) {}
+
+template <>
+void ColumnStats<Decimal8Value>::Update(const Decimal8Value& v) {}
+
+template <>
+void ColumnStats<Decimal16Value>::Update(const Decimal16Value& v) {}
+
+} // end ns impala
+#endif

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6251d8b4/be/src/exec/parquet-common.h
----------------------------------------------------------------------
diff --git a/be/src/exec/parquet-common.h b/be/src/exec/parquet-common.h
index d4ffa3d..ff82fed 100644
--- a/be/src/exec/parquet-common.h
+++ b/be/src/exec/parquet-common.h
@@ -90,7 +90,7 @@ class ParquetPlainEncoder {
 
   /// Returns the encoded size of values of type t. Returns -1 if it is variable
   /// length. This can be different than the slot size of the types.
-  static int ByteSize(const ColumnType& t) {
+  static int EncodedByteSize(const ColumnType& t) {
     switch (t.type) {
       case TYPE_STRING:
       case TYPE_VARCHAR:
@@ -185,20 +185,13 @@ class ParquetPlainEncoder {
     memcpy(v, buffer, byte_size);
     return byte_size;
   }
-
-  /// Encode 't', which must be in the machine endian, to FIXED_LEN_BYTE_ARRAY
-  /// of 'fixed_len_size'. The result is encoded as big endian.
-  template <typename T>
-  static int EncodeToFixedLenByteArray(uint8_t* buffer, int fixed_len_size, const T& t);
-
-  /// Decodes into v assuming buffer is encoded using FIXED_LEN_BYTE_ARRAY of
-  /// 'fixed_len_size'. The bytes in buffer must be big endian and the result stored in
-  /// v is the machine endian format. The caller is responsible for ensuring that
-  /// 'buffer' is at least 'fixed_len_size' bytes long.
-  template<typename T>
-  static int DecodeFromFixedLenByteArray(uint8_t* buffer, int fixed_len_size, T* v);
 };
 
+/// Calling this with arguments of type ColumnType is certainly a programmer error, so we
+/// disallow it.
+template <>
+int ParquetPlainEncoder::ByteSize(const ColumnType& t);
+
 /// Disable for bools. Plain encoding is not used for booleans.
 template<> int ParquetPlainEncoder::ByteSize(const bool& b);
 template<> int ParquetPlainEncoder::Encode(uint8_t*, int fixed_len_size, const bool&);

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6251d8b4/be/src/runtime/string-value.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/string-value.h b/be/src/runtime/string-value.h
index cc905fe..3fce865 100644
--- a/be/src/runtime/string-value.h
+++ b/be/src/runtime/string-value.h
@@ -31,7 +31,7 @@ namespace impala {
 
 /// The format of a string-typed slot.
 /// The returned StringValue of all functions that return StringValue
-/// shares its buffer the parent.
+/// shares its buffer with the parent.
 /// TODO: rename this to be less confusing with impala_udf::StringVal.
 struct StringValue {
   /// The current limitation for a string instance is 1GB character data.

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6251d8b4/be/src/util/dict-test.cc
----------------------------------------------------------------------
diff --git a/be/src/util/dict-test.cc b/be/src/util/dict-test.cc
index 39cba66..a14922c 100644
--- a/be/src/util/dict-test.cc
+++ b/be/src/util/dict-test.cc
@@ -95,7 +95,7 @@ TEST(DictTest, TestTimestamps) {
   values.push_back(tv1);
   values.push_back(tv1);
 
-  ValidateDict(values, ParquetPlainEncoder::ByteSize(ColumnType(TYPE_TIMESTAMP)));
+  ValidateDict(values, ParquetPlainEncoder::EncodedByteSize(ColumnType(TYPE_TIMESTAMP)));
 }
 
 template<typename T>
@@ -125,12 +125,12 @@ void TestNumbers(int value_byte_size) {
 }
 
 TEST(DictTest, TestNumbers) {
-  TestNumbers<int8_t>(ParquetPlainEncoder::ByteSize(ColumnType(TYPE_TINYINT)));
-  TestNumbers<int16_t>(ParquetPlainEncoder::ByteSize(ColumnType(TYPE_SMALLINT)));
-  TestNumbers<int32_t>(ParquetPlainEncoder::ByteSize(ColumnType(TYPE_INT)));
-  TestNumbers<int64_t>(ParquetPlainEncoder::ByteSize(ColumnType(TYPE_BIGINT)));
-  TestNumbers<float>(ParquetPlainEncoder::ByteSize(ColumnType(TYPE_FLOAT)));
-  TestNumbers<double>(ParquetPlainEncoder::ByteSize(ColumnType(TYPE_DOUBLE)));
+  TestNumbers<int8_t>(ParquetPlainEncoder::EncodedByteSize(ColumnType(TYPE_TINYINT)));
+  TestNumbers<int16_t>(ParquetPlainEncoder::EncodedByteSize(ColumnType(TYPE_SMALLINT)));
+  TestNumbers<int32_t>(ParquetPlainEncoder::EncodedByteSize(ColumnType(TYPE_INT)));
+  TestNumbers<int64_t>(ParquetPlainEncoder::EncodedByteSize(ColumnType(TYPE_BIGINT)));
+  TestNumbers<float>(ParquetPlainEncoder::EncodedByteSize(ColumnType(TYPE_FLOAT)));
+  TestNumbers<double>(ParquetPlainEncoder::EncodedByteSize(ColumnType(TYPE_DOUBLE)));
 
   for (int i = 1; i <= 16; ++i) {
     if (i <= 4) TestNumbers<Decimal4Value>(i);

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6251d8b4/tests/query_test/test_insert_parquet.py
----------------------------------------------------------------------
diff --git a/tests/query_test/test_insert_parquet.py b/tests/query_test/test_insert_parquet.py
index b7cb285..58a3d74 100644
--- a/tests/query_test/test_insert_parquet.py
+++ b/tests/query_test/test_insert_parquet.py
@@ -18,8 +18,8 @@
 # Targeted Impala insert tests
 
 import os
-import pytest
 
+from collections import namedtuple
 from shutil import rmtree
 from subprocess import check_call
 from tempfile import mkdtemp as make_tmp_dir
@@ -30,16 +30,35 @@ from tests.common.parametrize import UniqueDatabase
 from tests.common.skip import SkipIfIsilon, SkipIfLocal
 from tests.common.test_dimensions import create_exec_option_dimension
 from tests.common.test_vector import ImpalaTestDimension
-from tests.util.filesystem_utils import get_fs_path, WAREHOUSE
+from tests.util.filesystem_utils import get_fs_path
+from tests.util.get_parquet_metadata import get_parquet_metadata, decode_stats_value
 
 # TODO: Add Gzip back.  IMPALA-424
 PARQUET_CODECS = ['none', 'snappy']
 
+
+class RoundFloat():
+  """Class to compare floats after rounding them to a specified number of digits. This
+  can be used in scenarios where floating point precision is an issue.
+  """
+  def __init__(self, value, num_digits):
+    self.value = value
+    self.num_digits = num_digits
+
+  def __eq__(self, numeral):
+    """Compares this objects's value to a numeral after rounding it."""
+    return round(self.value, self.num_digits) == round(numeral, self.num_digits)
+
+ColumnStats = namedtuple('ColumnStats', ['name', 'min', 'max'])
+
 # Test a smaller parquet file size as well
 # TODO: these tests take a while so we don't want to go through too many sizes but
 # we should in more exhaustive testing
 PARQUET_FILE_SIZES = [0, 32 * 1024 * 1024]
+
+
 class TestInsertParquetQueries(ImpalaTestSuite):
+
   @classmethod
   def get_workload(self):
     return 'tpch'
@@ -57,14 +76,14 @@ class TestInsertParquetQueries(ImpalaTestSuite):
         sync_ddl=[1]))
 
     cls.ImpalaTestMatrix.add_dimension(
-        ImpalaTestDimension("compression_codec", *PARQUET_CODECS));
+        ImpalaTestDimension("compression_codec", *PARQUET_CODECS))
     cls.ImpalaTestMatrix.add_dimension(
-        ImpalaTestDimension("file_size", *PARQUET_FILE_SIZES));
+        ImpalaTestDimension("file_size", *PARQUET_FILE_SIZES))
 
-    cls.ImpalaTestMatrix.add_constraint(lambda v:\
-        v.get_value('table_format').file_format == 'parquet')
-    cls.ImpalaTestMatrix.add_constraint(lambda v:\
-        v.get_value('table_format').compression_codec == 'none')
+    cls.ImpalaTestMatrix.add_constraint(lambda v:
+                                        v.get_value('table_format').file_format == 'parquet')
+    cls.ImpalaTestMatrix.add_constraint(lambda v:
+                                        v.get_value('table_format').compression_codec == 'none')
 
   @SkipIfLocal.multiple_impalad
   @UniqueDatabase.parametrize(sync_ddl=True)
@@ -75,7 +94,9 @@ class TestInsertParquetQueries(ImpalaTestSuite):
         vector.get_value('compression_codec')
     self.run_test_case('insert_parquet', vector, unique_database, multiple_impalad=True)
 
+
 class TestInsertParquetInvalidCodec(ImpalaTestSuite):
+
   @classmethod
   def get_workload(self):
     return 'functional-query'
@@ -88,21 +109,22 @@ class TestInsertParquetInvalidCodec(ImpalaTestSuite):
         cluster_sizes=[0], disable_codegen_options=[False], batch_sizes=[0],
         sync_ddl=[1]))
     cls.ImpalaTestMatrix.add_dimension(
-        ImpalaTestDimension("compression_codec", 'bzip2'));
-    cls.ImpalaTestMatrix.add_constraint(lambda v:\
-        v.get_value('table_format').file_format == 'parquet')
-    cls.ImpalaTestMatrix.add_constraint(lambda v:\
-        v.get_value('table_format').compression_codec == 'none')
+        ImpalaTestDimension("compression_codec", 'bzip2'))
+    cls.ImpalaTestMatrix.add_constraint(lambda v:
+                                        v.get_value('table_format').file_format == 'parquet')
+    cls.ImpalaTestMatrix.add_constraint(lambda v:
+                                        v.get_value('table_format').compression_codec == 'none')
 
   @SkipIfLocal.multiple_impalad
   def test_insert_parquet_invalid_codec(self, vector):
     vector.get_value('exec_option')['COMPRESSION_CODEC'] = \
         vector.get_value('compression_codec')
-    self.run_test_case('QueryTest/insert_parquet_invalid_codec', vector,\
+    self.run_test_case('QueryTest/insert_parquet_invalid_codec', vector,
                        multiple_impalad=True)
 
 
 class TestInsertParquetVerifySize(ImpalaTestSuite):
+
   @classmethod
   def get_workload(self):
     return 'tpch'
@@ -114,12 +136,12 @@ class TestInsertParquetVerifySize(ImpalaTestSuite):
     cls.ImpalaTestMatrix.add_dimension(create_exec_option_dimension(
         cluster_sizes=[0], disable_codegen_options=[False], batch_sizes=[0],
         sync_ddl=[1]))
-    cls.ImpalaTestMatrix.add_constraint(lambda v:\
-        v.get_value('table_format').file_format == 'parquet')
-    cls.ImpalaTestMatrix.add_constraint(lambda v:\
-        v.get_value('table_format').compression_codec == 'none')
+    cls.ImpalaTestMatrix.add_constraint(lambda v:
+                                        v.get_value('table_format').file_format == 'parquet')
+    cls.ImpalaTestMatrix.add_constraint(lambda v:
+                                        v.get_value('table_format').compression_codec == 'none')
     cls.ImpalaTestMatrix.add_dimension(
-        ImpalaTestDimension("compression_codec", *PARQUET_CODECS));
+        ImpalaTestDimension("compression_codec", *PARQUET_CODECS))
 
   @SkipIfIsilon.hdfs_block_size
   @SkipIfLocal.hdfs_client
@@ -149,10 +171,12 @@ class TestInsertParquetVerifySize(ImpalaTestSuite):
       assert size < block_size, "File size greater than expected.\
           Expected: {0}, Got: {1}".format(block_size, size)
       if size < block_size * 0.80:
-        assert found_small_file == False
+        assert not found_small_file
         found_small_file = True
 
+
 class TestHdfsParquetTableWriter(ImpalaTestSuite):
+
   @classmethod
   def get_workload(cls):
     return 'functional-query'
@@ -171,12 +195,12 @@ class TestHdfsParquetTableWriter(ImpalaTestSuite):
     table_name = "test_hdfs_parquet_table_writer"
     qualified_table_name = "%s.%s" % (unique_database, table_name)
     self.execute_query("create table %s stored as parquet as select l_linenumber from "
-        "tpch_parquet.lineitem limit 180000" % qualified_table_name)
+                       "tpch_parquet.lineitem limit 180000" % qualified_table_name)
 
     tmp_dir = make_tmp_dir()
     try:
       hdfs_file = get_fs_path('/test-warehouse/%s.db/%s/*.parq'
-          % (unique_database, table_name))
+                              % (unique_database, table_name))
       check_call(['hdfs', 'dfs', '-copyToLocal', hdfs_file, tmp_dir])
 
       for root, subdirs, files in os.walk(tmp_dir):
@@ -184,7 +208,293 @@ class TestHdfsParquetTableWriter(ImpalaTestSuite):
           if not f.endswith('parq'):
             continue
           check_call([os.path.join(impalad_basedir, 'util/parquet-reader'), '--file',
-              os.path.join(tmp_dir, str(f))])
+                      os.path.join(tmp_dir, str(f))])
     finally:
       self.execute_query("drop table %s" % qualified_table_name)
       rmtree(tmp_dir)
+
+
+class TestHdfsParquetTableStatsWriter(ImpalaTestSuite):
+
+  @classmethod
+  def get_workload(cls):
+    return 'functional-query'
+
+  @classmethod
+  def add_test_dimensions(cls):
+    super(TestHdfsParquetTableStatsWriter, cls).add_test_dimensions()
+    cls.ImpalaTestMatrix.add_constraint(
+        lambda v: v.get_value('table_format').file_format == 'parquet')
+
+  def _decode_row_group_stats(self, schemas, row_group_stats):
+    """Decodes and return a list of statistics for a single row group."""
+    decoded = []
+    assert len(schemas) == len(row_group_stats)
+    for schema, stats in zip(schemas, row_group_stats):
+      if stats is None:
+        decoded.append(None)
+        continue
+
+      if stats.min is None and stats.max is None:
+        decoded.append(None)
+        continue
+
+      assert stats.min is not None and stats.max is not None
+      min_value = decode_stats_value(schema, stats.min)
+      max_value = decode_stats_value(schema, stats.max)
+      decoded.append(ColumnStats(schema.name, min_value, max_value))
+
+    assert len(decoded) == len(schemas)
+    return decoded
+
+  def _get_row_group_stats_from_file(self, parquet_file):
+    """Returns a list of statistics for each row group in file 'parquet_file'. The result
+    is a two-dimensional list, containing stats by row group and column."""
+    file_meta_data = get_parquet_metadata(parquet_file)
+    # We only support flat schemas, the additional element is the root element.
+    schemas = file_meta_data.schema[1:]
+    file_stats = []
+    for row_group in file_meta_data.row_groups:
+      num_columns = len(row_group.columns)
+      assert num_columns == len(schemas)
+      column_stats = [c.meta_data.statistics for c in row_group.columns]
+      file_stats.append(self._decode_row_group_stats(schemas, column_stats))
+
+    return file_stats
+
+  def _get_row_group_stats_from_hdfs_folder(self, hdfs_path):
+    """Returns a list of statistics for each row group in all parquet files in
+    'hdfs_path'. The result is a two-dimensional list, containing stats by row group and
+    column."""
+    row_group_stats = []
+
+    try:
+      tmp_dir = make_tmp_dir()
+      check_call(['hdfs', 'dfs', '-get', hdfs_path, tmp_dir])
+
+      for root, subdirs, files in os.walk(tmp_dir):
+        for f in files:
+          parquet_file = os.path.join(root, str(f))
+          row_group_stats.extend(self._get_row_group_stats_from_file(parquet_file))
+
+    finally:
+      rmtree(tmp_dir)
+
+    return row_group_stats
+
+  def _validate_min_max_stats(self, hdfs_path, expected_values, skip_col_idxs = None):
+    """Validates that 'hdfs_path' contains exactly one parquet file and that the rowgroup
+    statistics in that file match the values in 'expected_values'. Columns indexed by
+    'skip_col_idx' are excluded from the verification of the expected values.
+    """
+    skip_col_idxs = skip_col_idxs or []
+    # The caller has to make sure that the table fits into a single row group. We enforce
+    # it here to make sure the results are predictable and independent of how the data
+    # could get written across multiple files.
+    row_group_stats = self._get_row_group_stats_from_hdfs_folder(hdfs_path)
+    assert(len(row_group_stats)) == 1
+    table_stats = row_group_stats[0]
+
+    num_columns = len(table_stats)
+    assert num_columns == len(expected_values)
+
+    for col_idx, stats, expected in zip(range(num_columns), table_stats, expected_values):
+      if col_idx in skip_col_idxs:
+        continue
+      if not expected:
+        assert not stats
+        continue
+      assert stats == expected
+
+  def _ctas_table_and_verify_stats(self, vector, unique_database, source_table,
+                                   expected_values, hive_skip_col_idx = None):
+    """Copies 'source_table' into a parquet table and makes sure that the row group
+    statistics in the resulting parquet file match those in 'expected_values'. The
+    comparison is performed against both Hive and Impala. For Hive, columns indexed by
+    'hive_skip_col_idx' are excluded from the verification of the expected values.
+    """
+    table_name = "test_hdfs_parquet_table_writer"
+    qualified_table_name = "{0}.{1}".format(unique_database, table_name)
+    hdfs_path = get_fs_path('/test-warehouse/{0}.db/{1}/'.format(unique_database,
+                                                                 table_name))
+
+    # Validate against Hive.
+    self.execute_query("drop table if exists {0}".format(qualified_table_name))
+    self.run_stmt_in_hive("create table {0} stored as parquet as select * from "
+                          "{1}".format(qualified_table_name, source_table))
+    self.execute_query("invalidate metadata {0}".format(qualified_table_name))
+    self._validate_min_max_stats(hdfs_path, expected_values, hive_skip_col_idx)
+
+    # Validate against Impala. Setting exec_single_node_rows_threshold and adding a limit
+    # clause ensures that the query is executed on the coordinator, resulting in a single
+    # parquet file being written.
+    num_rows = self.execute_scalar("select count(*) from {0}".format(source_table))
+    self.execute_query("drop table {0}".format(qualified_table_name))
+    query = ("create table {0} stored as parquet as select * from {1} limit "
+             "{2}").format(qualified_table_name, source_table, num_rows)
+    vector.get_value('exec_option')['EXEC_SINGLE_NODE_ROWS_THRESHOLD'] = num_rows
+    self.execute_query(query, vector.get_value('exec_option'))
+    self._validate_min_max_stats(hdfs_path, expected_values)
+
+  def test_write_statistics_alltypes(self, vector, unique_database):
+    """Test that writing a parquet file populates the rowgroup statistics with the correct
+    values.
+    """
+    # Expected values for functional.alltypes
+    expected_min_max_values = [
+        ColumnStats('id', 0, 7299),
+        ColumnStats('bool_col', False, True),
+        ColumnStats('tinyint_col', 0, 9),
+        ColumnStats('smallint_col', 0, 9),
+        ColumnStats('int_col', 0, 9),
+        ColumnStats('bigint_col', 0, 90),
+        ColumnStats('float_col', 0, RoundFloat(9.9, 1)),
+        ColumnStats('double_col', 0, RoundFloat(90.9, 1)),
+        None,
+        None,
+        None,
+        ColumnStats('year', 2009, 2010),
+        ColumnStats('month', 1, 12),
+    ]
+
+    # Skip comparison of unsupported columns types with Hive.
+    hive_skip_col_idx = [8, 9, 10]
+
+    self._ctas_table_and_verify_stats(vector, unique_database, "functional.alltypes",
+                                      expected_min_max_values, hive_skip_col_idx)
+
+  def test_write_statistics_decimal(self, vector, unique_database):
+    """Test that Impala does not write statistics for decimal columns."""
+    # Expected values for functional.decimal_tbl
+    expected_min_max_values = [None, None, None, None, None, None]
+
+    # Skip comparison of unsupported columns types with Hive.
+    hive_skip_col_idx = range(len(expected_min_max_values))
+
+    self._ctas_table_and_verify_stats(vector, unique_database, "functional.decimal_tbl",
+                                      expected_min_max_values, hive_skip_col_idx)
+
+  def test_write_statistics_multi_page(self, vector, unique_database):
+    """Test that writing a parquet file populates the rowgroup statistics with the correct
+    values. This test write a single parquet file with several pages per column.
+    """
+    # Expected values for tpch_parquet.customer
+    expected_min_max_values = [
+        ColumnStats('c_custkey', 1, 150000),
+        None,
+        None,
+        ColumnStats('c_nationkey', 0, 24),
+        None,
+        None,
+        None,
+        None,
+    ]
+
+    # Skip comparison of unsupported columns types with Hive.
+    hive_skip_col_idx = [1, 2, 4, 5, 6, 7]
+
+    self._ctas_table_and_verify_stats(vector, unique_database, "tpch_parquet.customer",
+                                      expected_min_max_values, hive_skip_col_idx)
+
+  def test_write_statistics_null(self, vector, unique_database):
+    """Test that we don't write min/max statistics for null columns."""
+    expected_min_max_values = [None, None, None, None, None, None, None]
+
+    # Skip comparison of unsupported columns types with Hive.
+    hive_skip_col_idx = range(len(expected_min_max_values))
+
+    self._ctas_table_and_verify_stats(vector, unique_database, "functional.nulltable",
+                                      expected_min_max_values, hive_skip_col_idx)
+
+  def test_write_statistics_char_types(self, vector, unique_database):
+    """Test that Impala does not write statistics for char columns."""
+    expected_min_max_values = [None, None, None]
+
+    # Skip comparison of unsupported columns types with Hive.
+    hive_skip_col_idx = range(len(expected_min_max_values))
+
+    self._ctas_table_and_verify_stats(vector, unique_database, "functional.chars_formats",
+                                      expected_min_max_values, hive_skip_col_idx)
+
+  def test_write_statistics_negative(self, vector, unique_database):
+    """Test that Impala correctly writes statistics for negative values."""
+    view_name = "test_negative_view"
+    qualified_view_name = "{0}.{1}".format(unique_database, view_name)
+
+    # Create a view to generate test data with negative values by negating every other
+    # row.
+    create_view_stmt = """create view {0} as select
+        id * cast(pow(-1, id % 2) as int) as id,
+        int_col * cast(pow(-1, id % 2) as int) as int_col,
+        bigint_col * cast(pow(-1, id % 2) as bigint) as bigint_col,
+        float_col * pow(-1, id % 2) as float_col,
+        double_col * pow(-1, id % 2) as double_col
+        from functional.alltypes""".format(qualified_view_name)
+    self.execute_query(create_view_stmt)
+
+    expected_min_max_values = [
+        ColumnStats('id', -7299, 7298),
+        ColumnStats('int_col', -9, 8),
+        ColumnStats('bigint_col', -90, 80),
+        ColumnStats('float_col', RoundFloat(-9.9, 1), RoundFloat(8.8, 1)),
+        ColumnStats('double_col', RoundFloat(-90.9, 1), RoundFloat(80.8, 1)),
+    ]
+
+    self._ctas_table_and_verify_stats(vector, unique_database, qualified_view_name,
+                                      expected_min_max_values)
+
+  def test_write_statistics_multiple_row_groups(self, vector, unique_database):
+    """Test that writing multiple row groups works as expected. This is done by inserting
+    into a table using the sortby() hint and then making sure that the min and max values
+    of row groups don't overlap."""
+    source_table = "tpch_parquet.orders"
+    target_table = "test_hdfs_parquet_table_writer"
+    qualified_target_table = "{0}.{1}".format(unique_database, target_table)
+    hdfs_path = get_fs_path("/test-warehouse/{0}.db/{1}/".format(
+        unique_database, target_table))
+
+    # Insert a large amount of data on a single backend with a limited parquet file size.
+    # This will result in several files being written, exercising code that tracks
+    # statistics for row groups.
+    num_rows = self.execute_scalar("select count(*) from {0}".format(source_table))
+    query = "create table {0} like {1} stored as parquet".format(qualified_target_table,
+                                                                 source_table)
+    self.execute_query(query, vector.get_value('exec_option'))
+    query = ("insert into {0} /* +sortby(o_orderkey) */ select * from {1} limit"
+             "{2}").format(qualified_target_table, source_table, num_rows)
+    vector.get_value('exec_option')['EXEC_SINGLE_NODE_ROWS_THRESHOLD'] = num_rows
+    vector.get_value('exec_option')['PARQUET_FILE_SIZE'] = 8 * 1024 * 1024
+    self.execute_query(query, vector.get_value('exec_option'))
+
+    # Get all stats for the o_orderkey column
+    row_group_stats = self._get_row_group_stats_from_hdfs_folder(hdfs_path)
+    assert len(row_group_stats) > 1
+    orderkey_stats = [s[0] for s in row_group_stats]
+
+    # Make sure that they don't overlap by ordering by the min value, then looking at
+    # boundaries.
+    orderkey_stats.sort(key = lambda s: s.min)
+    for l, r in zip(orderkey_stats, orderkey_stats[1:]):
+      assert l.max <= r.min
+
+  def test_write_statistics_float_infinity(self, vector, unique_database):
+    """Test that statistics for -inf and inf are written correctly."""
+    table_name = "test_float_infinity"
+    qualified_table_name = "{0}.{1}".format(unique_database, table_name)
+
+    create_table_stmt = "create table {0} (f float, d double);".format(
+        qualified_table_name)
+    self.execute_query(create_table_stmt)
+
+    insert_stmt = """insert into {0} values
+        (cast('-inf' as float), cast('-inf' as double)),
+        (cast('inf' as float), cast('inf' as double))""".format(qualified_table_name)
+    self.execute_query(insert_stmt)
+
+    expected_min_max_values = [
+        ColumnStats('f', float('-inf'), float('inf')),
+        ColumnStats('d', float('-inf'), float('inf')),
+    ]
+
+    self._ctas_table_and_verify_stats(vector, unique_database, qualified_table_name,
+                                      expected_min_max_values)

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6251d8b4/tests/util/get_parquet_metadata.py
----------------------------------------------------------------------
diff --git a/tests/util/get_parquet_metadata.py b/tests/util/get_parquet_metadata.py
index cb417a5..8cf0405 100644
--- a/tests/util/get_parquet_metadata.py
+++ b/tests/util/get_parquet_metadata.py
@@ -18,15 +18,110 @@
 import os
 import struct
 
-from parquet.ttypes import FileMetaData
+from datetime import date, datetime, time, timedelta
+from decimal import Decimal
+from parquet.ttypes import FileMetaData, Type
 from thrift.protocol import TCompactProtocol
 from thrift.transport import TTransport
 
 PARQUET_VERSION_NUMBER = 'PAR1'
 
-def parse_int(s):
-  """Reinterprets the string 's' as a 4-byte integer."""
-  return struct.unpack('i', s)[0]
+
+def julian_day_to_date(julian_day):
+  """Converts a julian day number into a Gregorian date. The reference date is picked
+  arbitrarily and can be validated with an online converter like
+  http://aa.usno.navy.mil/jdconverter?ID=AA&jd=2457755
+  """
+  return date(2017, 01, 01) + timedelta(julian_day - 2457755)
+
+
+def nanos_to_time(nanos):
+  """Converts nanoseconds to time of day."""
+  micros = nanos // 1000  # integer division
+  seconds, micros = divmod(micros, 10**6)
+  minutes, seconds = divmod(seconds, 60)
+  hours, minutes = divmod(minutes, 60)
+  return time(hours, minutes, seconds, micros)
+
+
+def parse_boolean(s):
+  """Parses a single boolean value from a single byte"""
+  return struct.unpack('<?', s)[0]
+
+
+def parse_int32(s):
+  """Reinterprets the string 's' as a signed 4-byte integer."""
+  return struct.unpack('<i', s)[0]
+
+
+def parse_int64(s):
+  """Reinterprets the string 's' as a signed 8-byte integer."""
+  return struct.unpack('<q', s)[0]
+
+
+def parse_float(s):
+  """Reinterprets the string 's' as an IEEE single precision float."""
+  return struct.unpack('<f', s)[0]
+
+
+def parse_double(s):
+  """Reinterprets the string 's' as an IEEE double precision float."""
+  return struct.unpack('<d', s)[0]
+
+
+def decode_timestamp(s):
+  """Reinterprets the string 's' as a 12-byte timestamp as written by Impala and decode it
+  into a datetime object."""
+  # Impala writes timestamps as 12-byte values. The first 8 byte store a
+  # boost::posix_time::time_duration, which is the time within the current day in
+  # nanoseconds stored as int64. The last 4 bytes store a boost::gregorian::date,
+  # which is the Julian date, stored as utin32.
+  day_nanos, julian_day = struct.unpack('<qI', s)
+  return datetime.combine(julian_day_to_date(julian_day), nanos_to_time(day_nanos))
+
+
+def decode_decimal(schema, value):
+  """Decodes 'value' into a decimal by interpreting its contents according to 'schema'."""
+  assert len(value) > 0
+  assert schema.type_length == len(value)
+  assert schema.type == Type.FIXED_LEN_BYTE_ARRAY
+
+  numeric = Decimal(reduce(lambda x, y: x * 256 + y, map(ord, value)))
+
+  # Compute two's complement for negative values.
+  if (ord(value[0]) > 127):
+    bit_width = 8 * len(value)
+    numeric = numeric - (2 ** bit_width)
+
+  return numeric / 10 ** schema.scale
+
+
+def decode_stats_value(schema, value):
+  """Decodes 'value' according to 'schema. It expects 'value' to be plain encoded. For
+  BOOLEAN values, only the least significant bit is parsed and returned. Binary arrays are
+  expected to be stored as such, without a preceding length."""
+  column_type = schema.type
+  if column_type == Type.BOOLEAN:
+    return parse_boolean(value)
+  elif column_type == Type.INT32:
+    return parse_int32(value)
+  elif column_type == Type.INT64:
+    return parse_int64(value)
+  elif column_type == Type.INT96:
+    # Impala uses INT96 to store timestamps
+    return decode_timestamp(value)
+  elif column_type == Type.FLOAT:
+    return parse_float(value)
+  elif column_type == Type.DOUBLE:
+    return parse_double(value)
+  elif column_type == Type.BYTE_ARRAY:
+    # In parquet::Statistics, strings are stored as is.
+    return value
+  elif column_type == Type.FIXED_LEN_BYTE_ARRAY:
+    return decode_decimal(schema, value)
+  assert False
+  return None
+
 
 def get_parquet_metadata(filename):
   """Returns a FileMetaData as defined in parquet.thrift. 'filename' must be a local
@@ -43,7 +138,7 @@ def get_parquet_metadata(filename):
 
     # Read metadata length
     f.seek(file_size - len(PARQUET_VERSION_NUMBER) - 4)
-    metadata_len = parse_int(f.read(4))
+    metadata_len = parse_int32(f.read(4))
 
     # Read metadata
     f.seek(file_size - len(PARQUET_VERSION_NUMBER) - 4 - metadata_len)


[2/3] incubator-impala git commit: IMPALA-4853: Skip test_kudu_dml_reporting if Kudu is not supported.

Posted by ta...@apache.org.
IMPALA-4853: Skip test_kudu_dml_reporting if Kudu is not supported.

This test is failing on distros that don't support Kudu, but it
shouldn't even be run.

Tested by setting KUDU_IS_SUPPORTED to false, and then trying to
run the test, confirming that it gets skipped. When the env var
KUDU_IS_SUPPORTED is true, the test runs.

Change-Id: Ia36319228d4e9cac9cb675f3207ef2ba39f24e7e
Reviewed-on: http://gerrit.cloudera.org:8080/5854
Reviewed-by: Michael Brown <mi...@cloudera.com>
Reviewed-by: Matthew Jacobs <mj...@cloudera.com>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/a5b76895
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/a5b76895
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/a5b76895

Branch: refs/heads/master
Commit: a5b768953d3cff909f1ed1c5aa4e4607de062d08
Parents: 32ff959
Author: David Knupp <dk...@cloudera.com>
Authored: Wed Feb 1 10:06:06 2017 -0800
Committer: Impala Public Jenkins <im...@gerrit.cloudera.org>
Committed: Thu Feb 2 00:23:50 2017 +0000

----------------------------------------------------------------------
 tests/shell/test_shell_commandline.py | 2 ++
 1 file changed, 2 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a5b76895/tests/shell/test_shell_commandline.py
----------------------------------------------------------------------
diff --git a/tests/shell/test_shell_commandline.py b/tests/shell/test_shell_commandline.py
index 3d1f9e3..62cb376 100644
--- a/tests/shell/test_shell_commandline.py
+++ b/tests/shell/test_shell_commandline.py
@@ -25,6 +25,7 @@ import signal
 from subprocess import call
 from tests.common.impala_service import ImpaladService
 from tests.common.impala_test_suite import ImpalaTestSuite
+from tests.common.skip import SkipIf
 from time import sleep
 from util import IMPALAD, SHELL_CMD
 from util import assert_var_substitution, run_impala_shell_cmd, ImpalaShell
@@ -475,6 +476,7 @@ class TestImpalaShell(ImpalaTestSuite):
         (expected_rows_modified, expected_row_errors)
     assert expected_output in results.stderr
 
+  @SkipIf.kudu_not_supported
   def test_kudu_dml_reporting(self, unique_database):
     db = unique_database
     run_impala_shell_cmd('--query="create table %s.dml_test (id int primary key, '\