Posted to commits@impala.apache.org by he...@apache.org on 2017/07/21 02:54:46 UTC

[1/3] incubator-impala git commit: IMPALA-4276: Profile displays non-default query options set by planner

Repository: incubator-impala
Updated Branches:
  refs/heads/master 9d1e4449c -> daff8eb0c


IMPALA-4276: Profile displays non-default query options set by planner

Populate the non-default query options set by the planner in the
runtime profile.

Added a corresponding test case.
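
For illustration only, a minimal hypothetical C++ sketch of the idea (the
names below are stand-ins, not Impala's actual types or code paths): the
profile line should be built from the options attached to the final exec
request, which include planner-set values, rather than from the original
client request.

  #include <map>
  #include <sstream>
  #include <string>

  // Illustrative stand-in for DebugQueryOptions(): joins non-default options
  // into the "KEY=VALUE,KEY=VALUE" form that appears in the profile.
  std::string DebugQueryOptions(const std::map<std::string, std::string>& opts) {
    std::ostringstream out;
    for (const auto& kv : opts) {
      if (out.tellp() > 0) out << ",";
      out << kv.first << "=" << kv.second;
    }
    return out.str();
  }

  int main() {
    // Options the client set explicitly.
    std::map<std::string, std::string> client_options = {{"MEM_LIMIT", "8589934592"}};
    // Options on the exec request: the client options plus values the planner chose.
    std::map<std::string, std::string> exec_request_options = client_options;
    exec_request_options["NUM_NODES"] = "1";
    exec_request_options["RUNTIME_FILTER_MODE"] = "0";

    // Before the fix the profile was built from client_options and missed the
    // planner-set values; after the fix it reflects exec_request_options.
    std::string profile_line =
        "Query Options (non default): " + DebugQueryOptions(exec_request_options);
    return 0;
  }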

Change-Id: I08e9dc2bebb83101976bbbd903ee48c5068dbaab
Reviewed-on: http://gerrit.cloudera.org:8080/7419
Reviewed-by: Matthew Jacobs <mj...@cloudera.com>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/83bfc142
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/83bfc142
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/83bfc142

Branch: refs/heads/master
Commit: 83bfc142e45e6394b30f712f90d201ea57eecc02
Parents: 9d1e444
Author: Bikramjeet Vig <bi...@cloudera.com>
Authored: Thu Jul 13 15:21:37 2017 -0700
Committer: Impala Public Jenkins <im...@gerrit.cloudera.org>
Committed: Fri Jul 21 01:14:07 2017 +0000

----------------------------------------------------------------------
 be/src/service/client-request-state.cc            |  2 +-
 tests/custom_cluster/test_admission_controller.py |  7 ++++---
 tests/query_test/test_observability.py            | 17 ++++++++++++++++-
 3 files changed, 21 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/83bfc142/be/src/service/client-request-state.cc
----------------------------------------------------------------------
diff --git a/be/src/service/client-request-state.cc b/be/src/service/client-request-state.cc
index c454ebe..6be04f6 100644
--- a/be/src/service/client-request-state.cc
+++ b/be/src/service/client-request-state.cc
@@ -148,7 +148,7 @@ Status ClientRequestState::Exec(TExecRequest* exec_request) {
   profile_.AddChild(&server_profile_);
   summary_profile_.AddInfoString("Query Type", PrintTStmtType(stmt_type()));
   summary_profile_.AddInfoString("Query Options (non default)",
-      DebugQueryOptions(query_ctx_.client_request.query_options));
+      DebugQueryOptions(exec_request_.query_options));
 
   switch (exec_request->stmt_type) {
     case TStmtType::QUERY:

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/83bfc142/tests/custom_cluster/test_admission_controller.py
----------------------------------------------------------------------
diff --git a/tests/custom_cluster/test_admission_controller.py b/tests/custom_cluster/test_admission_controller.py
index e84db52..b9ae427 100644
--- a/tests/custom_cluster/test_admission_controller.py
+++ b/tests/custom_cluster/test_admission_controller.py
@@ -144,15 +144,16 @@ class TestAdmissionController(TestAdmissionControllerBase, HS2TestSuite):
       assert re.search(expected_error_re, str(e))
 
   def __check_query_options(self, profile, expected_query_options):
-    """Validate that the per-pool query options were set on the specified profile.
-    expected_query_options is a list of "KEY=VALUE" strings, e.g. ["MEM_LIMIT=1", ...]"""
+    """Validate that the expected per-pool query options were set on the specified
+    profile. expected_query_options is a list of "KEY=VALUE" strings, e.g.
+    ["MEM_LIMIT=1", ...]"""
     confs = []
     for line in profile.split("\n"):
       if PROFILE_QUERY_OPTIONS_KEY in line:
         rhs = re.split(": ", line)[1]
         confs = re.split(",", rhs)
         break
-    assert len(confs) == len(expected_query_options)
+    assert len(confs) >= len(expected_query_options)
     confs = map(str.lower, confs)
     for expected in expected_query_options:
       assert expected.lower() in confs,\

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/83bfc142/tests/query_test/test_observability.py
----------------------------------------------------------------------
diff --git a/tests/query_test/test_observability.py b/tests/query_test/test_observability.py
index cdb322d..5a82a25 100644
--- a/tests/query_test/test_observability.py
+++ b/tests/query_test/test_observability.py
@@ -79,7 +79,7 @@ class TestObservability(ImpalaTestSuite):
     assert result.exec_summary[scan_idx]['operator'] == '00:SCAN HBASE'
     assert result.exec_summary[scan_idx]['detail'] == 'functional_hbase.alltypestiny'
 
-  def test_get_profile(self):
+  def test_query_states(self):
     """Tests that the query profile shows expected query states."""
     query = "select count(*) from functional.alltypes"
     handle = self.execute_query_async(query, dict())
@@ -93,3 +93,18 @@ class TestObservability(ImpalaTestSuite):
     profile = self.client.get_runtime_profile(handle)
     # After fetching the results, the query must be in state FINISHED.
     assert "Query State: FINISHED" in profile, profile
+
+  def test_query_options(self):
+    """Test that the query profile shows expected non-default query options, both set
+    explicitly through client and those set by planner"""
+    # Set a query option explicitly through client
+    self.execute_query("set mem_limit = 8589934592")
+    # For this query, the planner sets NUM_NODES=1, NUM_SCANNER_THREADS=1 and
+    # RUNTIME_FILTER_MODE=0
+    expected_string = "Query Options (non default): MEM_LIMIT=8589934592,NUM_NODES=1," \
+        "NUM_SCANNER_THREADS=1,RUNTIME_FILTER_MODE=0,MT_DOP=0\n"
+    assert expected_string in self.execute_query("select 1").runtime_profile
+
+    # Make sure explicitly set default values are not shown in the profile
+    self.execute_query("set MAX_IO_BUFFERS = 0")
+    assert expected_string in self.execute_query("select 1").runtime_profile


[2/3] incubator-impala git commit: IMPALA-5275: Avoid printing status stack trace on hot paths

Posted by he...@apache.org.
IMPALA-5275: Avoid printing status stack trace on hot paths

Currently, creating a Status object (non-OK and non-EXPECTED)
prints the stack trace to the log. Fetching the stack trace takes
close to 130ms of CPU time and causes a significant perf hit when
it happens on hot paths.
Five such hot paths were identified and the following changes were
made to fix them:

1. In ImpalaServer::GetExecSummary(), create Status() without holding
the query_log_lock_.
2, 3 and 4. In impala::DeserializeThriftMsg<>(),
PartitionedAggregationNode::CodegenUpdateTuple() and
HdfsScanner::CodegenWriteCompleteTuple(), use Status::Expected()
where appropriate.
5. In Status::MemLimitExceeded(), create the Status object without
printing the stack trace.
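
For illustration, a simplified sketch of the pattern (a stand-in Status
class, not Impala's real one): expected errors skip the expensive
stack-trace capture, so hot paths construct them via Status::Expected()
instead of the plain Status constructor.

  #include <string>
  #include <utility>

  class Status {
   public:
    static Status OK() { return Status(); }
    // Expected error: skip the (expensive) stack-trace capture.
    static Status Expected(std::string msg) { return Status(std::move(msg), false); }
    // Unexpected error: capture the stack trace for debugging.
    explicit Status(std::string msg) : Status(std::move(msg), true) {}
    bool ok() const { return msg_.empty(); }

   private:
    Status() = default;
    Status(std::string msg, bool capture_stack) : msg_(std::move(msg)) {
      if (capture_stack) CaptureStackTrace();
    }
    void CaptureStackTrace() { /* walk and symbolize frames; this is the costly part */ }
    std::string msg_;
  };

  // On a hot path (e.g. a codegen fallback hit per fragment instance), the
  // failure is expected and handled, so don't pay for a stack trace:
  Status CodegenFallback() {
    return Status::Expected("Timestamp not yet supported for codegen.");
  }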

Change-Id: Ief083f558fba587381aa7fe8f99da279da02f1f2
Reviewed-on: http://gerrit.cloudera.org:8080/7449
Reviewed-by: Matthew Jacobs <mj...@cloudera.com>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/54865c4e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/54865c4e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/54865c4e

Branch: refs/heads/master
Commit: 54865c4ef1a1b54bf3fcdea4de29f7aaeb9278e8
Parents: 83bfc14
Author: Bikramjeet Vig <bi...@cloudera.com>
Authored: Mon Jul 17 13:18:15 2017 -0700
Committer: Impala Public Jenkins <im...@gerrit.cloudera.org>
Committed: Fri Jul 21 01:29:24 2017 +0000

----------------------------------------------------------------------
 be/src/common/status.cc                     |  6 +++---
 be/src/exec/hdfs-scanner.cc                 |  4 ++--
 be/src/exec/partitioned-aggregation-node.cc |  8 ++++----
 be/src/rpc/thrift-util.h                    |  2 +-
 be/src/service/impala-server.cc             | 24 ++++++++++++++++++------
 5 files changed, 28 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/54865c4e/be/src/common/status.cc
----------------------------------------------------------------------
diff --git a/be/src/common/status.cc b/be/src/common/status.cc
index 1adc654..3fc2236 100644
--- a/be/src/common/status.cc
+++ b/be/src/common/status.cc
@@ -37,12 +37,12 @@ const Status Status::DEPRECATED_RPC(ErrorMsg::Init(TErrorCode::NOT_IMPLEMENTED_E
     "Deprecated RPC; please update your client"));
 
 Status Status::MemLimitExceeded() {
-  return Status(TErrorCode::MEM_LIMIT_EXCEEDED, "Memory limit exceeded");
+  return Status(ErrorMsg(TErrorCode::MEM_LIMIT_EXCEEDED, "Memory limit exceeded"), true);
 }
 
 Status Status::MemLimitExceeded(const std::string& details) {
-  return Status(TErrorCode::MEM_LIMIT_EXCEEDED,
-        Substitute("Memory limit exceeded: $0", details));
+  return Status(ErrorMsg(TErrorCode::MEM_LIMIT_EXCEEDED,
+      Substitute("Memory limit exceeded: $0", details)), true);
 }
 
 Status::Status(TErrorCode::type code)

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/54865c4e/be/src/exec/hdfs-scanner.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-scanner.cc b/be/src/exec/hdfs-scanner.cc
index 0670f81..10e4edd 100644
--- a/be/src/exec/hdfs-scanner.cc
+++ b/be/src/exec/hdfs-scanner.cc
@@ -312,10 +312,10 @@ Status HdfsScanner::CodegenWriteCompleteTuple(HdfsScanNodeBase* node,
   for (int i = 0; i < node->materialized_slots().size(); ++i) {
     SlotDescriptor* slot_desc = node->materialized_slots()[i];
     if (slot_desc->type().type == TYPE_TIMESTAMP) {
-      return Status("Timestamp not yet supported for codegen.");
+      return Status::Expected("Timestamp not yet supported for codegen.");
     }
     if (slot_desc->type().type == TYPE_DECIMAL) {
-      return Status("Decimal not yet supported for codegen.");
+      return Status::Expected("Decimal not yet supported for codegen.");
     }
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/54865c4e/be/src/exec/partitioned-aggregation-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/partitioned-aggregation-node.cc b/be/src/exec/partitioned-aggregation-node.cc
index 7849394..7067961 100644
--- a/be/src/exec/partitioned-aggregation-node.cc
+++ b/be/src/exec/partitioned-aggregation-node.cc
@@ -1743,14 +1743,14 @@ Status PartitionedAggregationNode::CodegenUpdateTuple(
 
   for (const SlotDescriptor* slot_desc : intermediate_tuple_desc_->slots()) {
     if (slot_desc->type().type == TYPE_CHAR) {
-      return Status("PartitionedAggregationNode::CodegenUpdateTuple(): cannot codegen"
-                    "CHAR in aggregations");
+      return Status::Expected("PartitionedAggregationNode::CodegenUpdateTuple(): cannot "
+          "codegen CHAR in aggregations");
     }
   }
 
   if (intermediate_tuple_desc_->GetLlvmStruct(codegen) == NULL) {
-    return Status("PartitionedAggregationNode::CodegenUpdateTuple(): failed to generate "
-                  "intermediate tuple desc");
+    return Status::Expected("PartitionedAggregationNode::CodegenUpdateTuple(): failed to"
+        " generate intermediate tuple desc");
   }
 
   // Get the types to match the UpdateTuple signature

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/54865c4e/be/src/rpc/thrift-util.h
----------------------------------------------------------------------
diff --git a/be/src/rpc/thrift-util.h b/be/src/rpc/thrift-util.h
index 96a864c..1a66286 100644
--- a/be/src/rpc/thrift-util.h
+++ b/be/src/rpc/thrift-util.h
@@ -126,7 +126,7 @@ Status DeserializeThriftMsg(const uint8_t* buf, uint32_t* len, bool compact,
   } catch (std::exception& e) {
     std::stringstream msg;
     msg << "couldn't deserialize thrift msg:\n" << e.what();
-    return Status(msg.str());
+    return Status::Expected(msg.str());
   } catch (...) {
     /// TODO: Find the right exception for 0 bytes
     return Status("Unknown exception");

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/54865c4e/be/src/service/impala-server.cc
----------------------------------------------------------------------
diff --git a/be/src/service/impala-server.cc b/be/src/service/impala-server.cc
index cf91ba3..a26f0ab 100644
--- a/be/src/service/impala-server.cc
+++ b/be/src/service/impala-server.cc
@@ -658,17 +658,29 @@ Status ImpalaServer::GetExecSummary(const TUniqueId& query_id, const string& use
   }
 
   // Look for the query in completed query log.
+  // IMPALA-5275: Don't create Status while holding query_log_lock_
   {
-    lock_guard<mutex> l(query_log_lock_);
-    QueryLogIndex::const_iterator query_record = query_log_index_.find(query_id);
-    if (query_record == query_log_index_.end()) {
+    string effective_user;
+    bool user_has_profile_access = false;
+    bool is_query_missing = false;
+    TExecSummary exec_summary;
+    {
+      lock_guard<mutex> l(query_log_lock_);
+      QueryLogIndex::const_iterator query_record = query_log_index_.find(query_id);
+      is_query_missing = query_record == query_log_index_.end();
+      if (!is_query_missing) {
+        effective_user = query_record->second->effective_user;
+        user_has_profile_access = query_record->second->user_has_profile_access;
+        exec_summary = query_record->second->exec_summary;
+      }
+    }
+    if (is_query_missing) {
       stringstream ss;
       ss << "Query id " << PrintId(query_id) << " not found.";
       return Status(ss.str());
     }
-    RETURN_IF_ERROR(CheckProfileAccess(user, query_record->second->effective_user,
-        query_record->second->user_has_profile_access));
-    *result = query_record->second->exec_summary;
+    RETURN_IF_ERROR(CheckProfileAccess(user, effective_user, user_has_profile_access));
+    *result = exec_summary;
   }
   return Status::OK();
 }


[3/3] incubator-impala git commit: IMPALA-5627: fix dropped statuses in HDFS writers

Posted by he...@apache.org.
IMPALA-5627: fix dropped statuses in HDFS writers

The change is mostly mechanical - Status returns were added where
needed.

In one place I restructured the logic around
'current_encoding_' for Parquet to allow a cleaner fix for the
dropped status from the FinalizeCurrentPage() call in
ProcessValue(): after the restructuring that call is no longer
needed. 'current_encoding_' was overloaded to represent both the
encoding of the current page and the preferred encoding for
subsequent pages; it is now split into 'current_encoding_' and
'next_page_encoding_'.
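
For illustration, a simplified sketch of the restructuring (the field and
method names mirror the diff, but the class is a stand-in): a dictionary
overflow only requests a different encoding for the next page, so
ProcessValue() no longer needs to finalize the current page itself and no
status can be dropped there.

  enum class Encoding { PLAIN, PLAIN_DICTIONARY };

  struct ColumnWriterSketch {
    Encoding current_encoding_ = Encoding::PLAIN_DICTIONARY;
    Encoding next_page_encoding_ = Encoding::PLAIN_DICTIONARY;

    // Returns false if the value did not fit on the current page; the caller
    // then finalizes the page, calls NewPage() and retries the same value.
    bool ProcessValue(bool dict_overflowed) {
      if (dict_overflowed) {
        // Only request the switch; the current (dictionary-encoded) page is
        // finalized by the caller, which can propagate any error status.
        next_page_encoding_ = Encoding::PLAIN;
        return false;
      }
      return true;
    }

    void NewPage() {
      current_encoding_ = next_page_encoding_;  // pick up any requested switch
    }
  };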

Testing:
Ran exhaustive build.

Change-Id: I753d352c640faf5eaef650cd743e53de53761431
Reviewed-on: http://gerrit.cloudera.org:8080/7372
Reviewed-by: Tim Armstrong <ta...@cloudera.com>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/daff8eb0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/daff8eb0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/daff8eb0

Branch: refs/heads/master
Commit: daff8eb0ca19aa612c9fc7cc2ddd647735b31266
Parents: 54865c4
Author: Tim Armstrong <ta...@cloudera.com>
Authored: Thu Jul 6 18:41:07 2017 -0700
Committer: Impala Public Jenkins <im...@gerrit.cloudera.org>
Committed: Fri Jul 21 02:51:51 2017 +0000

----------------------------------------------------------------------
 be/src/exec/hdfs-avro-table-writer.cc     |  2 +-
 be/src/exec/hdfs-avro-table-writer.h      | 20 +++---
 be/src/exec/hdfs-parquet-table-writer.cc  | 88 ++++++++++++++++----------
 be/src/exec/hdfs-parquet-table-writer.h   | 16 ++---
 be/src/exec/hdfs-sequence-table-writer.cc |  2 +-
 be/src/exec/hdfs-table-writer.h           | 11 ++--
 be/src/exec/parquet-column-stats.cc       |  7 +-
 be/src/exec/parquet-column-stats.h        | 10 ++-
 be/src/exec/parquet-column-stats.inline.h |  7 +-
 be/src/runtime/string-buffer-test.cc      |  6 +-
 be/src/runtime/string-buffer.h            |  6 +-
 11 files changed, 101 insertions(+), 74 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/daff8eb0/be/src/exec/hdfs-avro-table-writer.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-avro-table-writer.cc b/be/src/exec/hdfs-avro-table-writer.cc
index 46185e8..3ce296d 100644
--- a/be/src/exec/hdfs-avro-table-writer.cc
+++ b/be/src/exec/hdfs-avro-table-writer.cc
@@ -196,7 +196,7 @@ Status HdfsAvroTableWriter::AppendRows(
     }
   }
 
-  if (out_.Size() > DEFAULT_AVRO_BLOCK_SIZE) Flush();
+  if (out_.Size() > DEFAULT_AVRO_BLOCK_SIZE) RETURN_IF_ERROR(Flush());
   *new_file = false;
   return Status::OK();
 }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/daff8eb0/be/src/exec/hdfs-avro-table-writer.h
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-avro-table-writer.h b/be/src/exec/hdfs-avro-table-writer.h
index f85659e..6966860 100644
--- a/be/src/exec/hdfs-avro-table-writer.h
+++ b/be/src/exec/hdfs-avro-table-writer.h
@@ -68,17 +68,17 @@ class HdfsAvroTableWriter : public HdfsTableWriter {
 
   virtual ~HdfsAvroTableWriter() { }
 
-  virtual Status Init();
-  virtual Status Finalize() { return Flush(); }
-  virtual Status InitNewFile() { return WriteFileHeader(); }
-  virtual void Close();
-  virtual uint64_t default_block_size() const { return 0; }
-  virtual std::string file_extension() const { return "avro"; }
+  virtual Status Init() override;
+  virtual Status Finalize() override { return Flush(); }
+  virtual Status InitNewFile() override { return WriteFileHeader(); }
+  virtual void Close() override;
+  virtual uint64_t default_block_size() const override { return 0; }
+  virtual std::string file_extension() const override { return "avro"; }
 
   /// Outputs the given rows into an HDFS sequence file. The rows are buffered
   /// to fill a sequence file block.
-  virtual Status AppendRows(
-      RowBatch* rows, const std::vector<int32_t>& row_group_indices, bool* new_file);
+  virtual Status AppendRows(RowBatch* rows,
+      const std::vector<int32_t>& row_group_indices, bool* new_file) override;
 
  private:
   /// Processes a single row, appending to out_
@@ -88,11 +88,11 @@ class HdfsAvroTableWriter : public HdfsTableWriter {
   inline void AppendField(const ColumnType& type, const void* value);
 
   /// Writes the Avro file header to HDFS
-  Status WriteFileHeader();
+  Status WriteFileHeader() WARN_UNUSED_RESULT;
 
   /// Writes the contents of out_ to HDFS as a single Avro file block.
   /// Returns an error if write to HDFS fails.
-  Status Flush();
+  Status Flush() WARN_UNUSED_RESULT;
 
   /// Buffer which holds accumulated output
   WriteStream out_;

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/daff8eb0/be/src/exec/hdfs-parquet-table-writer.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-parquet-table-writer.cc b/be/src/exec/hdfs-parquet-table-writer.cc
index 5a2d810..04a81f1 100644
--- a/be/src/exec/hdfs-parquet-table-writer.cc
+++ b/be/src/exec/hdfs-parquet-table-writer.cc
@@ -103,8 +103,6 @@ class HdfsParquetTableWriter::BaseColumnWriter {
       values_buffer_len_(DEFAULT_DATA_PAGE_SIZE),
       page_stats_base_(nullptr),
       row_group_stats_base_(nullptr) {
-    Codec::CreateCompressor(nullptr, false, codec, &compressor_);
-
     def_levels_ = parent_->state_->obj_pool()->Add(
         new RleEncoder(parent_->reusable_col_mem_pool_->Allocate(DEFAULT_DATA_PAGE_SIZE),
                        DEFAULT_DATA_PAGE_SIZE, 1));
@@ -113,13 +111,20 @@ class HdfsParquetTableWriter::BaseColumnWriter {
 
   virtual ~BaseColumnWriter() {}
 
+  // Called after the constructor to initialize the column writer.
+  Status Init() WARN_UNUSED_RESULT {
+    Reset();
+    RETURN_IF_ERROR(Codec::CreateCompressor(nullptr, false, codec_, &compressor_));
+    return Status::OK();
+  }
+
   // Appends the row to this column.  This buffers the value into a data page.  Returns
   // error if the space needed for the encoded value is larger than the data page size.
   // TODO: this needs to be batch based, instead of row based for better performance.
   // This is a bit trickier to handle the case where only a partial row batch can be
   // output to the current file because it reaches the max file size.  Enabling codegen
   // would also solve this problem.
-  Status AppendRow(TupleRow* row);
+  Status AppendRow(TupleRow* row) WARN_UNUSED_RESULT;
 
   // Flushes all buffered data pages to the file.
   // *file_pos is an output parameter and will be incremented by
@@ -128,13 +133,14 @@ class HdfsParquetTableWriter::BaseColumnWriter {
   // will contain the byte offset for the data page and dictionary page.  They
   // will be set to -1 if the column does not contain that type of page.
   Status Flush(int64_t* file_pos, int64_t* first_data_page,
-      int64_t* first_dictionary_page);
+      int64_t* first_dictionary_page) WARN_UNUSED_RESULT;
 
   // Materializes the column statistics to the per-file MemPool so they are available
   // after their row batch buffer has been freed.
-  void MaterializeStatsValues() {
-    row_group_stats_base_->MaterializeStringValuesToInternalBuffers();
-    page_stats_base_->MaterializeStringValuesToInternalBuffers();
+  Status MaterializeStatsValues() WARN_UNUSED_RESULT {
+    RETURN_IF_ERROR(row_group_stats_base_->MaterializeStringValuesToInternalBuffers());
+    RETURN_IF_ERROR(page_stats_base_->MaterializeStringValuesToInternalBuffers());
+    return Status::OK();
   }
 
   // Encodes the row group statistics into a parquet::Statistics object and attaches it to
@@ -157,6 +163,7 @@ class HdfsParquetTableWriter::BaseColumnWriter {
     num_values_ = 0;
     total_compressed_byte_size_ = 0;
     current_encoding_ = Encoding::PLAIN;
+    next_page_encoding_ = Encoding::PLAIN;
     column_encodings_.clear();
     dict_encoding_stats_.clear();
     data_encoding_stats_.clear();
@@ -184,16 +191,18 @@ class HdfsParquetTableWriter::BaseColumnWriter {
   friend class HdfsParquetTableWriter;
 
   // Encodes value into the current page output buffer and updates the column statistics
-  // aggregates. Returns true if the value fits on the current page. If this function
-  // returned false, the caller should create a new page and try again with the same
-  // value.
+  // aggregates. Returns true if the value was appended successfully to the current page.
+  // Returns false if the value was not appended to the current page and the caller can
+  // create a new page and try again with the same value. May change
+  // 'next_page_encoding_' if the encoding for the next page should be different - e.g.
+  // if a dictionary overflowed and dictionary encoding is no longer viable.
   // *bytes_needed will contain the (estimated) number of bytes needed to successfully
   // encode the value in the page.
   // Implemented in the subclass.
-  virtual bool ProcessValue(void* value, int64_t* bytes_needed) = 0;
+  virtual bool ProcessValue(void* value, int64_t* bytes_needed) WARN_UNUSED_RESULT = 0;
 
   // Encodes out all data for the current page and updates the metadata.
-  virtual void FinalizeCurrentPage();
+  virtual Status FinalizeCurrentPage() WARN_UNUSED_RESULT;
 
   // Update current_page_ to a new page, reusing pages allocated if possible.
   void NewPage();
@@ -246,10 +255,16 @@ class HdfsParquetTableWriter::BaseColumnWriter {
   // Pointer to the current page in 'pages_'. Not owned.
   DataPage* current_page_;
 
-  int64_t num_values_; // Total number of values across all pages, including nullptr.
+  // Total number of values across all pages, including NULL.
+  int64_t num_values_;
   int64_t total_compressed_byte_size_;
   int64_t total_uncompressed_byte_size_;
+  // Encoding of the current page.
   Encoding::type current_encoding_;
+  // Encoding to use for the next page. By default, the same as 'current_encoding_'.
+  // Used by the column writer to switch encoding while writing a column, e.g. if the
+  // dictionary overflows.
+  Encoding::type next_page_encoding_;
 
   // Set of all encodings used in the column chunk
   unordered_set<Encoding::type> column_encodings_;
@@ -299,6 +314,7 @@ class HdfsParquetTableWriter::ColumnWriter :
     // Default to dictionary encoding.  If the cardinality ends up being too high,
     // it will fall back to plain.
     current_encoding_ = Encoding::PLAIN_DICTIONARY;
+    next_page_encoding_ = Encoding::PLAIN_DICTIONARY;
     dict_encoder_.reset(
         new DictEncoder<T>(parent_->per_file_mem_pool_.get(), plain_encoded_value_size_));
     dict_encoder_base_ = dict_encoder_.get();
@@ -321,10 +337,9 @@ class HdfsParquetTableWriter::ColumnWriter :
       ++num_values_since_dict_size_check_;
       *bytes_needed = dict_encoder_->Put(*CastValue(value));
       // If the dictionary contains the maximum number of values, switch to plain
-      // encoding.  The current dictionary encoded page is written out.
+      // encoding for the next page. The current page is full and must be written out.
       if (UNLIKELY(*bytes_needed < 0)) {
-        FinalizeCurrentPage();
-        current_encoding_ = Encoding::PLAIN;
+        next_page_encoding_ = Encoding::PLAIN;
         return false;
       }
       parent_->file_size_estimate_ += *bytes_needed;
@@ -423,15 +438,16 @@ class HdfsParquetTableWriter::BoolColumnWriter :
     return true;
   }
 
-  virtual void FinalizeCurrentPage() {
+  virtual Status FinalizeCurrentPage() {
     DCHECK(current_page_ != nullptr);
-    if (current_page_->finalized) return;
+    if (current_page_->finalized) return Status::OK();
     bool_values_->Flush();
     int num_bytes = bool_values_->bytes_written();
     current_page_->header.uncompressed_page_size += num_bytes;
     // Call into superclass to handle the rest.
-    BaseColumnWriter::FinalizeCurrentPage();
+    RETURN_IF_ERROR(BaseColumnWriter::FinalizeCurrentPage());
     bool_values_->Clear();
+    return Status::OK();
   }
 
  private:
@@ -455,7 +471,7 @@ inline Status HdfsParquetTableWriter::BaseColumnWriter::AppendRow(TupleRow* row)
   // Ensure that we have enough space for the definition level, but don't write it yet in
   // case we don't have enough space for the value.
   if (def_levels_->buffer_full()) {
-    FinalizeCurrentPage();
+    RETURN_IF_ERROR(FinalizeCurrentPage());
     NewPage();
   }
 
@@ -475,11 +491,11 @@ inline Status HdfsParquetTableWriter::BaseColumnWriter::AppendRow(TupleRow* row)
     int64_t bytes_needed = 0;
     if (ProcessValue(value, &bytes_needed)) {
       ++current_page_->num_non_null;
-      break;
+      break; // Successfully appended, don't need to retry.
     }
 
     // Value didn't fit on page, try again on a new page.
-    FinalizeCurrentPage();
+    RETURN_IF_ERROR(FinalizeCurrentPage());
 
     // Check how much space is needed to write this value. If that is larger than the
     // page size then increase page size and try again.
@@ -534,7 +550,7 @@ Status HdfsParquetTableWriter::BaseColumnWriter::Flush(int64_t* file_pos,
     return Status::OK();
   }
 
-  FinalizeCurrentPage();
+  RETURN_IF_ERROR(FinalizeCurrentPage());
 
   *first_dictionary_page = -1;
   // First write the dictionary page before any of the data pages.
@@ -563,8 +579,8 @@ Status HdfsParquetTableWriter::BaseColumnWriter::Flush(int64_t* file_pos,
       uint8_t* compressed_data =
           parent_->per_file_mem_pool_->Allocate(max_compressed_size);
       header.compressed_page_size = max_compressed_size;
-      compressor_->ProcessBlock32(true, header.uncompressed_page_size, dict_buffer,
-          &header.compressed_page_size, &compressed_data);
+      RETURN_IF_ERROR(compressor_->ProcessBlock32(true, header.uncompressed_page_size,
+          dict_buffer, &header.compressed_page_size, &compressed_data));
       dict_buffer = compressed_data;
       // We allocated the output based on the guessed size, return the extra allocated
       // bytes back to the mem pool.
@@ -614,11 +630,11 @@ Status HdfsParquetTableWriter::BaseColumnWriter::Flush(int64_t* file_pos,
   return Status::OK();
 }
 
-void HdfsParquetTableWriter::BaseColumnWriter::FinalizeCurrentPage() {
+Status HdfsParquetTableWriter::BaseColumnWriter::FinalizeCurrentPage() {
   DCHECK(current_page_ != nullptr);
-  if (current_page_->finalized) return;
+  if (current_page_->finalized) return Status::OK();
 
-  // If the entire page was nullptr, encode it as PLAIN since there is no
+  // If the entire page was NULL, encode it as PLAIN since there is no
   // data anyway. We don't output a useless dictionary page and it works
   // around a parquet MR bug (see IMPALA-759 for more details).
   if (current_page_->num_non_null == 0) current_encoding_ = Encoding::PLAIN;
@@ -670,8 +686,8 @@ void HdfsParquetTableWriter::BaseColumnWriter::FinalizeCurrentPage() {
     DCHECK_GT(max_compressed_size, 0);
     uint8_t* compressed_data = parent_->per_file_mem_pool_->Allocate(max_compressed_size);
     header.compressed_page_size = max_compressed_size;
-    compressor_->ProcessBlock32(true, header.uncompressed_page_size, uncompressed_data,
-        &header.compressed_page_size, &compressed_data);
+    RETURN_IF_ERROR(compressor_->ProcessBlock32(true, header.uncompressed_page_size,
+        uncompressed_data, &header.compressed_page_size, &compressed_data));
     current_page_->data = compressed_data;
 
     // We allocated the output based on the guessed size, return the extra allocated
@@ -694,14 +710,15 @@ void HdfsParquetTableWriter::BaseColumnWriter::FinalizeCurrentPage() {
   // Add the size of the data page header
   uint8_t* header_buffer;
   uint32_t header_len = 0;
-  parent_->thrift_serializer_->Serialize(
-      &current_page_->header, &header_len, &header_buffer);
+  RETURN_IF_ERROR(parent_->thrift_serializer_->Serialize(
+      &current_page_->header, &header_len, &header_buffer));
 
   current_page_->finalized = true;
   total_compressed_byte_size_ += header_len + header.compressed_page_size;
   total_uncompressed_byte_size_ += header_len + header.uncompressed_page_size;
   parent_->file_size_estimate_ += header_len + header.compressed_page_size;
   def_levels_->Clear();
+  return Status::OK();
 }
 
 void HdfsParquetTableWriter::BaseColumnWriter::NewPage() {
@@ -724,6 +741,7 @@ void HdfsParquetTableWriter::BaseColumnWriter::NewPage() {
     header.repetition_level_encoding = Encoding::BIT_PACKED;
     current_page_->header.__set_data_page_header(header);
   }
+  current_encoding_ = next_page_encoding_;
   current_page_->finalized = false;
   current_page_->num_non_null = 0;
   page_stats_base_->Reset();
@@ -828,7 +846,7 @@ Status HdfsParquetTableWriter::Init() {
         DCHECK(false);
     }
     columns_[i].reset(writer);
-    columns_[i]->Reset();
+    RETURN_IF_ERROR(columns_[i]->Init());
   }
   RETURN_IF_ERROR(CreateSchema());
   return Status::OK();
@@ -989,7 +1007,9 @@ Status HdfsParquetTableWriter::AppendRows(
   }
 
   // We exhausted the batch, so we materialize the statistics before releasing the memory.
-  for (unique_ptr<BaseColumnWriter>& column : columns_) column->MaterializeStatsValues();
+  for (unique_ptr<BaseColumnWriter>& column : columns_) {
+    RETURN_IF_ERROR(column->MaterializeStatsValues());
+  }
 
   // Reset the row_idx_ when we exhaust the batch.  We can exit before exhausting
   // the batch if we run out of file space and will continue from the last index.

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/daff8eb0/be/src/exec/hdfs-parquet-table-writer.h
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-parquet-table-writer.h b/be/src/exec/hdfs-parquet-table-writer.h
index b3d319e..1334b19 100644
--- a/be/src/exec/hdfs-parquet-table-writer.h
+++ b/be/src/exec/hdfs-parquet-table-writer.h
@@ -58,25 +58,25 @@ class HdfsParquetTableWriter : public HdfsTableWriter {
   ~HdfsParquetTableWriter();
 
   /// Initialize column information.
-  virtual Status Init();
+  virtual Status Init() override;
 
   /// Initializes a new file.  This resets the file metadata object and writes
   /// the file header to the output file.
-  virtual Status InitNewFile();
+  virtual Status InitNewFile() override;
 
   /// Appends parquet representation of rows in the batch to the current file.
-  virtual Status AppendRows(
-      RowBatch* batch, const std::vector<int32_t>& row_group_indices, bool* new_file);
+  virtual Status AppendRows(RowBatch* batch,
+      const std::vector<int32_t>& row_group_indices, bool* new_file) override;
 
   /// Write out all the data.
-  virtual Status Finalize();
+  virtual Status Finalize() override;
 
-  virtual void Close();
+  virtual void Close() override;
 
   /// Returns the target HDFS block size to use.
-  virtual uint64_t default_block_size() const;
+  virtual uint64_t default_block_size() const override;
 
-  virtual std::string file_extension() const { return "parq"; }
+  virtual std::string file_extension() const override { return "parq"; }
 
  private:
   /// Default data page size. In bytes.

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/daff8eb0/be/src/exec/hdfs-sequence-table-writer.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-sequence-table-writer.cc b/be/src/exec/hdfs-sequence-table-writer.cc
index 4a66c5e..42a70f0 100644
--- a/be/src/exec/hdfs-sequence-table-writer.cc
+++ b/be/src/exec/hdfs-sequence-table-writer.cc
@@ -125,7 +125,7 @@ Status HdfsSequenceTableWriter::AppendRows(
     out_.WriteBytes(neg1_sync_marker_.size(), neg1_sync_marker_.data());
   }
 
-  if (out_.Size() >= approx_block_size_) Flush();
+  if (out_.Size() >= approx_block_size_) RETURN_IF_ERROR(Flush());
   *new_file = false;
   return Status::OK();
 }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/daff8eb0/be/src/exec/hdfs-table-writer.h
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-table-writer.h b/be/src/exec/hdfs-table-writer.h
index cc08b00..cc6c6cc 100644
--- a/be/src/exec/hdfs-table-writer.h
+++ b/be/src/exec/hdfs-table-writer.h
@@ -64,10 +64,10 @@ class HdfsTableWriter {
   /// text), 1) is called once and 2-4) is called repeatedly for each file.
 
   /// Do initialization of writer.
-  virtual Status Init() = 0;
+  virtual Status Init() WARN_UNUSED_RESULT = 0;
 
   /// Called when a new file is started.
-  virtual Status InitNewFile() = 0;
+  virtual Status InitNewFile() WARN_UNUSED_RESULT = 0;
 
   /// Appends rows of 'batch' to the partition that are selected via 'row_group_indices',
   /// and if the latter is empty, appends every row.
@@ -75,13 +75,14 @@ class HdfsTableWriter {
   /// *new_file == true. A new file will be opened and the same row batch will be passed
   /// again. The writer must track how much of the batch it had already processed asking
   /// for a new file. Otherwise the writer will return with *newfile == false.
-  virtual Status AppendRows(
-      RowBatch* batch, const std::vector<int32_t>& row_group_indices, bool* new_file) = 0;
+  virtual Status AppendRows(RowBatch* batch,
+      const std::vector<int32_t>& row_group_indices,
+      bool* new_file) WARN_UNUSED_RESULT = 0;
 
   /// Finalize this partition. The writer needs to finish processing
   /// all data have written out after the return from this call.
   /// This is called once for each call to InitNewFile()
-  virtual Status Finalize() = 0;
+  virtual Status Finalize() WARN_UNUSED_RESULT = 0;
 
   /// Called once when this writer should cleanup any resources.
   virtual void Close() = 0;

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/daff8eb0/be/src/exec/parquet-column-stats.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/parquet-column-stats.cc b/be/src/exec/parquet-column-stats.cc
index bcd6fa4..76b3365 100644
--- a/be/src/exec/parquet-column-stats.cc
+++ b/be/src/exec/parquet-column-stats.cc
@@ -117,11 +117,12 @@ bool ColumnStatsBase::ReadFromThrift(const parquet::ColumnChunk& col_chunk,
   return false;
 }
 
-void ColumnStatsBase::CopyToBuffer(StringBuffer* buffer, StringValue* value) {
-  if (value->ptr == buffer->buffer()) return;
+Status ColumnStatsBase::CopyToBuffer(StringBuffer* buffer, StringValue* value) {
+  if (value->ptr == buffer->buffer()) return Status::OK();
   buffer->Clear();
-  buffer->Append(value->ptr, value->len);
+  RETURN_IF_ERROR(buffer->Append(value->ptr, value->len));
   value->ptr = buffer->buffer();
+  return Status::OK();
 }
 
 bool ColumnStatsBase::CanUseStats(

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/daff8eb0/be/src/exec/parquet-column-stats.h
----------------------------------------------------------------------
diff --git a/be/src/exec/parquet-column-stats.h b/be/src/exec/parquet-column-stats.h
index 11a01f5..7278cdc 100644
--- a/be/src/exec/parquet-column-stats.h
+++ b/be/src/exec/parquet-column-stats.h
@@ -81,7 +81,9 @@ class ColumnStatsBase {
   /// data types (e.g. StringValue) need to be copied at the end of processing a row
   /// batch, since the batch memory will be released. Overwrite this method in derived
   /// classes to provide the functionality.
-  virtual void MaterializeStringValuesToInternalBuffers() {}
+  virtual Status MaterializeStringValuesToInternalBuffers() WARN_UNUSED_RESULT {
+    return Status::OK();
+  }
 
   /// Returns the number of bytes needed to encode the current statistics into a
   /// parquet::Statistics object.
@@ -100,7 +102,7 @@ class ColumnStatsBase {
  protected:
   // Copies the memory of 'value' into 'buffer' and make 'value' point to 'buffer'.
   // 'buffer' is reset before making the copy.
-  static void CopyToBuffer(StringBuffer* buffer, StringValue* value);
+  static Status CopyToBuffer(StringBuffer* buffer, StringValue* value) WARN_UNUSED_RESULT;
 
   /// Stores whether the min and max values of the current object have been initialized.
   bool has_min_max_values_;
@@ -163,7 +165,9 @@ class ColumnStats : public ColumnStatsBase {
   void Update(const T& v) { Update(v, v); }
 
   virtual void Merge(const ColumnStatsBase& other) override;
-  virtual void MaterializeStringValuesToInternalBuffers() override {}
+  virtual Status MaterializeStringValuesToInternalBuffers() override {
+    return Status::OK();
+  }
 
   virtual int64_t BytesNeeded() const override;
   virtual void EncodeToThrift(parquet::Statistics* out) const override;

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/daff8eb0/be/src/exec/parquet-column-stats.inline.h
----------------------------------------------------------------------
diff --git a/be/src/exec/parquet-column-stats.inline.h b/be/src/exec/parquet-column-stats.inline.h
index b112db3..9b81ba8 100644
--- a/be/src/exec/parquet-column-stats.inline.h
+++ b/be/src/exec/parquet-column-stats.inline.h
@@ -170,9 +170,10 @@ inline void ColumnStats<StringValue>::Update(
 // StringValues need to be copied at the end of processing a row batch, since the batch
 // memory will be released.
 template <>
-inline void ColumnStats<StringValue>::MaterializeStringValuesToInternalBuffers() {
-  if (min_buffer_.IsEmpty()) CopyToBuffer(&min_buffer_, &min_value_);
-  if (max_buffer_.IsEmpty()) CopyToBuffer(&max_buffer_, &max_value_);
+inline Status ColumnStats<StringValue>::MaterializeStringValuesToInternalBuffers() {
+  if (min_buffer_.IsEmpty()) RETURN_IF_ERROR(CopyToBuffer(&min_buffer_, &min_value_));
+  if (max_buffer_.IsEmpty()) RETURN_IF_ERROR(CopyToBuffer(&max_buffer_, &max_value_));
+  return Status::OK();
 }
 
 } // end ns impala

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/daff8eb0/be/src/runtime/string-buffer-test.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/string-buffer-test.cc b/be/src/runtime/string-buffer-test.cc
index c370728..27d1021 100644
--- a/be/src/runtime/string-buffer-test.cc
+++ b/be/src/runtime/string-buffer-test.cc
@@ -50,12 +50,12 @@ TEST(StringBufferTest, Basic) {
 
   // Append to empty
   std_str.append("Hello");
-  str.Append("Hello", strlen("Hello"));
+  ASSERT_OK(str.Append("Hello", strlen("Hello")));
   ValidateString(std_str, str);
 
   // Append some more
   std_str.append("World");
-  str.Append("World", strlen("World"));
+  ASSERT_OK(str.Append("World", strlen("World")));
   ValidateString(std_str, str);
 
   // Clear
@@ -81,7 +81,7 @@ TEST(StringBufferTest, AppendBoundary) {
   std_str.resize(chunk_size, 'a');
   int64_t data_size = 0;
   while (data_size + chunk_size <= max_data_size) {
-    str.Append(std_str.c_str(), chunk_size);
+    ASSERT_OK(str.Append(std_str.c_str(), chunk_size));
     data_size += chunk_size;
   }
   EXPECT_EQ(str.buffer_size(), data_size);

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/daff8eb0/be/src/runtime/string-buffer.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/string-buffer.h b/be/src/runtime/string-buffer.h
index 5682bc7..2188e4d 100644
--- a/be/src/runtime/string-buffer.h
+++ b/be/src/runtime/string-buffer.h
@@ -48,7 +48,7 @@ class StringBuffer {
 
   /// Append 'str' to the current string, allocating a new buffer as necessary.
   /// Return error status if memory limit is exceeded.
-  Status Append(const char* str, int64_t str_len) {
+  Status Append(const char* str, int64_t str_len) WARN_UNUSED_RESULT {
     int64_t new_len = len_ + str_len;
     if (UNLIKELY(new_len > buffer_size_)) RETURN_IF_ERROR(GrowBuffer(new_len));
     memcpy(buffer_ + len_, str, str_len);
@@ -57,7 +57,7 @@ class StringBuffer {
   }
 
   /// Wrapper around append() for input type 'uint8_t'.
-  Status Append(const uint8_t* str, int64_t str_len) {
+  Status Append(const uint8_t* str, int64_t str_len) WARN_UNUSED_RESULT {
     return Append(reinterpret_cast<const char*>(str), str_len);
   }
 
@@ -78,7 +78,7 @@ class StringBuffer {
   /// Grows the buffer to be at least 'new_size', copying over the previous data
   /// into the new buffer. The old buffer is not freed. Return an error status if
   /// growing the buffer will exceed memory limit.
-  Status GrowBuffer(int64_t new_size) {
+  Status GrowBuffer(int64_t new_size) WARN_UNUSED_RESULT {
     if (LIKELY(new_size > buffer_size_)) {
       int64_t old_size = buffer_size_;
       buffer_size_ = std::max<int64_t>(buffer_size_ * 2, new_size);