Posted to commits@impala.apache.org by he...@apache.org on 2017/07/21 02:54:48 UTC

[3/3] incubator-impala git commit: IMPALA-5627: fix dropped statuses in HDFS writers

IMPALA-5627: fix dropped statuses in HDFS writers

The change is mostly mechanical - added Status returns where
needed.
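
A minimal, self-contained sketch of the pattern applied throughout
(the Status class and macros below are simplified stand-ins for
Impala's real be/src/common/status.h, shown only for illustration):

#include <iostream>
#include <string>
#include <utility>

#define WARN_UNUSED_RESULT __attribute__((warn_unused_result))

class Status {
 public:
  static Status OK() { return Status(); }
  explicit Status(std::string msg) : ok_(false), msg_(std::move(msg)) {}
  Status() = default;
  bool ok() const { return ok_; }
  const std::string& msg() const { return msg_; }
 private:
  bool ok_ = true;
  std::string msg_;
};

// Propagate an error to the caller instead of dropping it.
#define RETURN_IF_ERROR(expr)          \
  do {                                 \
    Status _s = (expr);                \
    if (!_s.ok()) return _s;           \
  } while (false)

// Before: a 'void Flush()' could fail without the caller noticing.
// After: callers get a compiler warning if the returned Status is ignored.
Status Flush() WARN_UNUSED_RESULT;
Status Flush() { return Status("write to HDFS failed"); }  // Stub for the sketch.

Status AppendRows() {
  // Previously: 'if (too_big) Flush();' -- a dropped status.
  RETURN_IF_ERROR(Flush());
  return Status::OK();
}

int main() {
  Status s = AppendRows();
  std::cout << (s.ok() ? "ok" : s.msg()) << std::endl;
  return s.ok() ? 0 : 1;
}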

In one place I restructured the logic around
'current_encoding_' for Parquet to allow a cleaner solution
to the dropped status from the FinalizeCurrentPage() call in
ProcessValue(): after the restructuring the call was no longer
needed. Previously, 'current_encoding_' was overloaded to
represent both the encoding of the current page and the
preferred encoding for subsequent pages; these are now tracked
separately via 'next_page_encoding_'.
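
A simplified sketch (not the actual Impala classes) of the split
described above, assuming a reduced column writer with only the two
encoding fields: 'current_encoding_' now describes only the page being
written, 'next_page_encoding_' records what the next page should use,
and ProcessValue() merely flips the latter on dictionary overflow, so
it no longer needs to call FinalizeCurrentPage() (which can now fail):

#include <cstdint>

enum class Encoding { PLAIN, PLAIN_DICTIONARY };

class SketchColumnWriter {
 public:
  SketchColumnWriter()
    : current_encoding_(Encoding::PLAIN_DICTIONARY),
      next_page_encoding_(Encoding::PLAIN_DICTIONARY) {}

  // Returns false if the value did not fit on the current page. May set
  // 'next_page_encoding_' so that the following page falls back to PLAIN.
  bool ProcessValue(int64_t bytes_needed) {
    if (bytes_needed < 0) {
      // Dictionary is full: the *next* page must be plain-encoded. The
      // current, dictionary-encoded page is finalized by the caller.
      next_page_encoding_ = Encoding::PLAIN;
      return false;
    }
    return true;
  }

  // The new page adopts whatever encoding was chosen for it.
  void NewPage() { current_encoding_ = next_page_encoding_; }

  Encoding current_encoding() const { return current_encoding_; }

 private:
  Encoding current_encoding_;    // Encoding of the page being written.
  Encoding next_page_encoding_;  // Encoding to use after the next NewPage().
};

int main() {
  SketchColumnWriter w;
  if (!w.ProcessValue(-1)) {  // Dictionary overflowed while appending a value.
    // Caller would FinalizeCurrentPage() here, then start a new page:
    w.NewPage();              // The new page switches to PLAIN encoding.
  }
  return w.current_encoding() == Encoding::PLAIN ? 0 : 1;
}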

Testing:
Ran exhaustive build.

Change-Id: I753d352c640faf5eaef650cd743e53de53761431
Reviewed-on: http://gerrit.cloudera.org:8080/7372
Reviewed-by: Tim Armstrong <ta...@cloudera.com>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/daff8eb0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/daff8eb0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/daff8eb0

Branch: refs/heads/master
Commit: daff8eb0ca19aa612c9fc7cc2ddd647735b31266
Parents: 54865c4
Author: Tim Armstrong <ta...@cloudera.com>
Authored: Thu Jul 6 18:41:07 2017 -0700
Committer: Impala Public Jenkins <im...@gerrit.cloudera.org>
Committed: Fri Jul 21 02:51:51 2017 +0000

----------------------------------------------------------------------
 be/src/exec/hdfs-avro-table-writer.cc     |  2 +-
 be/src/exec/hdfs-avro-table-writer.h      | 20 +++---
 be/src/exec/hdfs-parquet-table-writer.cc  | 88 ++++++++++++++++----------
 be/src/exec/hdfs-parquet-table-writer.h   | 16 ++---
 be/src/exec/hdfs-sequence-table-writer.cc |  2 +-
 be/src/exec/hdfs-table-writer.h           | 11 ++--
 be/src/exec/parquet-column-stats.cc       |  7 +-
 be/src/exec/parquet-column-stats.h        | 10 ++-
 be/src/exec/parquet-column-stats.inline.h |  7 +-
 be/src/runtime/string-buffer-test.cc      |  6 +-
 be/src/runtime/string-buffer.h            |  6 +-
 11 files changed, 101 insertions(+), 74 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/daff8eb0/be/src/exec/hdfs-avro-table-writer.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-avro-table-writer.cc b/be/src/exec/hdfs-avro-table-writer.cc
index 46185e8..3ce296d 100644
--- a/be/src/exec/hdfs-avro-table-writer.cc
+++ b/be/src/exec/hdfs-avro-table-writer.cc
@@ -196,7 +196,7 @@ Status HdfsAvroTableWriter::AppendRows(
     }
   }
 
-  if (out_.Size() > DEFAULT_AVRO_BLOCK_SIZE) Flush();
+  if (out_.Size() > DEFAULT_AVRO_BLOCK_SIZE) RETURN_IF_ERROR(Flush());
   *new_file = false;
   return Status::OK();
 }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/daff8eb0/be/src/exec/hdfs-avro-table-writer.h
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-avro-table-writer.h b/be/src/exec/hdfs-avro-table-writer.h
index f85659e..6966860 100644
--- a/be/src/exec/hdfs-avro-table-writer.h
+++ b/be/src/exec/hdfs-avro-table-writer.h
@@ -68,17 +68,17 @@ class HdfsAvroTableWriter : public HdfsTableWriter {
 
   virtual ~HdfsAvroTableWriter() { }
 
-  virtual Status Init();
-  virtual Status Finalize() { return Flush(); }
-  virtual Status InitNewFile() { return WriteFileHeader(); }
-  virtual void Close();
-  virtual uint64_t default_block_size() const { return 0; }
-  virtual std::string file_extension() const { return "avro"; }
+  virtual Status Init() override;
+  virtual Status Finalize() override { return Flush(); }
+  virtual Status InitNewFile() override { return WriteFileHeader(); }
+  virtual void Close() override;
+  virtual uint64_t default_block_size() const override { return 0; }
+  virtual std::string file_extension() const override { return "avro"; }
 
   /// Outputs the given rows into an HDFS sequence file. The rows are buffered
   /// to fill a sequence file block.
-  virtual Status AppendRows(
-      RowBatch* rows, const std::vector<int32_t>& row_group_indices, bool* new_file);
+  virtual Status AppendRows(RowBatch* rows,
+      const std::vector<int32_t>& row_group_indices, bool* new_file) override;
 
  private:
   /// Processes a single row, appending to out_
@@ -88,11 +88,11 @@ class HdfsAvroTableWriter : public HdfsTableWriter {
   inline void AppendField(const ColumnType& type, const void* value);
 
   /// Writes the Avro file header to HDFS
-  Status WriteFileHeader();
+  Status WriteFileHeader() WARN_UNUSED_RESULT;
 
   /// Writes the contents of out_ to HDFS as a single Avro file block.
   /// Returns an error if write to HDFS fails.
-  Status Flush();
+  Status Flush() WARN_UNUSED_RESULT;
 
   /// Buffer which holds accumulated output
   WriteStream out_;

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/daff8eb0/be/src/exec/hdfs-parquet-table-writer.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-parquet-table-writer.cc b/be/src/exec/hdfs-parquet-table-writer.cc
index 5a2d810..04a81f1 100644
--- a/be/src/exec/hdfs-parquet-table-writer.cc
+++ b/be/src/exec/hdfs-parquet-table-writer.cc
@@ -103,8 +103,6 @@ class HdfsParquetTableWriter::BaseColumnWriter {
       values_buffer_len_(DEFAULT_DATA_PAGE_SIZE),
       page_stats_base_(nullptr),
       row_group_stats_base_(nullptr) {
-    Codec::CreateCompressor(nullptr, false, codec, &compressor_);
-
     def_levels_ = parent_->state_->obj_pool()->Add(
         new RleEncoder(parent_->reusable_col_mem_pool_->Allocate(DEFAULT_DATA_PAGE_SIZE),
                        DEFAULT_DATA_PAGE_SIZE, 1));
@@ -113,13 +111,20 @@ class HdfsParquetTableWriter::BaseColumnWriter {
 
   virtual ~BaseColumnWriter() {}
 
+  // Called after the constructor to initialize the column writer.
+  Status Init() WARN_UNUSED_RESULT {
+    Reset();
+    RETURN_IF_ERROR(Codec::CreateCompressor(nullptr, false, codec_, &compressor_));
+    return Status::OK();
+  }
+
   // Appends the row to this column.  This buffers the value into a data page.  Returns
   // error if the space needed for the encoded value is larger than the data page size.
   // TODO: this needs to be batch based, instead of row based for better performance.
   // This is a bit trickier to handle the case where only a partial row batch can be
   // output to the current file because it reaches the max file size.  Enabling codegen
   // would also solve this problem.
-  Status AppendRow(TupleRow* row);
+  Status AppendRow(TupleRow* row) WARN_UNUSED_RESULT;
 
   // Flushes all buffered data pages to the file.
   // *file_pos is an output parameter and will be incremented by
@@ -128,13 +133,14 @@ class HdfsParquetTableWriter::BaseColumnWriter {
   // will contain the byte offset for the data page and dictionary page.  They
   // will be set to -1 if the column does not contain that type of page.
   Status Flush(int64_t* file_pos, int64_t* first_data_page,
-      int64_t* first_dictionary_page);
+      int64_t* first_dictionary_page) WARN_UNUSED_RESULT;
 
   // Materializes the column statistics to the per-file MemPool so they are available
   // after their row batch buffer has been freed.
-  void MaterializeStatsValues() {
-    row_group_stats_base_->MaterializeStringValuesToInternalBuffers();
-    page_stats_base_->MaterializeStringValuesToInternalBuffers();
+  Status MaterializeStatsValues() WARN_UNUSED_RESULT {
+    RETURN_IF_ERROR(row_group_stats_base_->MaterializeStringValuesToInternalBuffers());
+    RETURN_IF_ERROR(page_stats_base_->MaterializeStringValuesToInternalBuffers());
+    return Status::OK();
   }
 
   // Encodes the row group statistics into a parquet::Statistics object and attaches it to
@@ -157,6 +163,7 @@ class HdfsParquetTableWriter::BaseColumnWriter {
     num_values_ = 0;
     total_compressed_byte_size_ = 0;
     current_encoding_ = Encoding::PLAIN;
+    next_page_encoding_ = Encoding::PLAIN;
     column_encodings_.clear();
     dict_encoding_stats_.clear();
     data_encoding_stats_.clear();
@@ -184,16 +191,18 @@ class HdfsParquetTableWriter::BaseColumnWriter {
   friend class HdfsParquetTableWriter;
 
   // Encodes value into the current page output buffer and updates the column statistics
-  // aggregates. Returns true if the value fits on the current page. If this function
-  // returned false, the caller should create a new page and try again with the same
-  // value.
+  // aggregates. Returns true if the value was appended successfully to the current page.
+  // Returns false if the value was not appended to the current page and the caller can
+  // create a new page and try again with the same value. May change
+  // 'next_page_encoding_' if the encoding for the next page should be different - e.g.
+  // if a dictionary overflowed and dictionary encoding is no longer viable.
   // *bytes_needed will contain the (estimated) number of bytes needed to successfully
   // encode the value in the page.
   // Implemented in the subclass.
-  virtual bool ProcessValue(void* value, int64_t* bytes_needed) = 0;
+  virtual bool ProcessValue(void* value, int64_t* bytes_needed) WARN_UNUSED_RESULT = 0;
 
   // Encodes out all data for the current page and updates the metadata.
-  virtual void FinalizeCurrentPage();
+  virtual Status FinalizeCurrentPage() WARN_UNUSED_RESULT;
 
   // Update current_page_ to a new page, reusing pages allocated if possible.
   void NewPage();
@@ -246,10 +255,16 @@ class HdfsParquetTableWriter::BaseColumnWriter {
   // Pointer to the current page in 'pages_'. Not owned.
   DataPage* current_page_;
 
-  int64_t num_values_; // Total number of values across all pages, including nullptr.
+  // Total number of values across all pages, including NULL.
+  int64_t num_values_;
   int64_t total_compressed_byte_size_;
   int64_t total_uncompressed_byte_size_;
+  // Encoding of the current page.
   Encoding::type current_encoding_;
+  // Encoding to use for the next page. By default, the same as 'current_encoding_'.
+  // Used by the column writer to switch encoding while writing a column, e.g. if the
+  // dictionary overflows.
+  Encoding::type next_page_encoding_;
 
   // Set of all encodings used in the column chunk
   unordered_set<Encoding::type> column_encodings_;
@@ -299,6 +314,7 @@ class HdfsParquetTableWriter::ColumnWriter :
     // Default to dictionary encoding.  If the cardinality ends up being too high,
     // it will fall back to plain.
     current_encoding_ = Encoding::PLAIN_DICTIONARY;
+    next_page_encoding_ = Encoding::PLAIN_DICTIONARY;
     dict_encoder_.reset(
         new DictEncoder<T>(parent_->per_file_mem_pool_.get(), plain_encoded_value_size_));
     dict_encoder_base_ = dict_encoder_.get();
@@ -321,10 +337,9 @@ class HdfsParquetTableWriter::ColumnWriter :
       ++num_values_since_dict_size_check_;
       *bytes_needed = dict_encoder_->Put(*CastValue(value));
       // If the dictionary contains the maximum number of values, switch to plain
-      // encoding.  The current dictionary encoded page is written out.
+      // encoding for the next page. The current page is full and must be written out.
       if (UNLIKELY(*bytes_needed < 0)) {
-        FinalizeCurrentPage();
-        current_encoding_ = Encoding::PLAIN;
+        next_page_encoding_ = Encoding::PLAIN;
         return false;
       }
       parent_->file_size_estimate_ += *bytes_needed;
@@ -423,15 +438,16 @@ class HdfsParquetTableWriter::BoolColumnWriter :
     return true;
   }
 
-  virtual void FinalizeCurrentPage() {
+  virtual Status FinalizeCurrentPage() {
     DCHECK(current_page_ != nullptr);
-    if (current_page_->finalized) return;
+    if (current_page_->finalized) return Status::OK();
     bool_values_->Flush();
     int num_bytes = bool_values_->bytes_written();
     current_page_->header.uncompressed_page_size += num_bytes;
     // Call into superclass to handle the rest.
-    BaseColumnWriter::FinalizeCurrentPage();
+    RETURN_IF_ERROR(BaseColumnWriter::FinalizeCurrentPage());
     bool_values_->Clear();
+    return Status::OK();
   }
 
  private:
@@ -455,7 +471,7 @@ inline Status HdfsParquetTableWriter::BaseColumnWriter::AppendRow(TupleRow* row)
   // Ensure that we have enough space for the definition level, but don't write it yet in
   // case we don't have enough space for the value.
   if (def_levels_->buffer_full()) {
-    FinalizeCurrentPage();
+    RETURN_IF_ERROR(FinalizeCurrentPage());
     NewPage();
   }
 
@@ -475,11 +491,11 @@ inline Status HdfsParquetTableWriter::BaseColumnWriter::AppendRow(TupleRow* row)
     int64_t bytes_needed = 0;
     if (ProcessValue(value, &bytes_needed)) {
       ++current_page_->num_non_null;
-      break;
+      break; // Successfully appended, don't need to retry.
     }
 
     // Value didn't fit on page, try again on a new page.
-    FinalizeCurrentPage();
+    RETURN_IF_ERROR(FinalizeCurrentPage());
 
     // Check how much space is needed to write this value. If that is larger than the
     // page size then increase page size and try again.
@@ -534,7 +550,7 @@ Status HdfsParquetTableWriter::BaseColumnWriter::Flush(int64_t* file_pos,
     return Status::OK();
   }
 
-  FinalizeCurrentPage();
+  RETURN_IF_ERROR(FinalizeCurrentPage());
 
   *first_dictionary_page = -1;
   // First write the dictionary page before any of the data pages.
@@ -563,8 +579,8 @@ Status HdfsParquetTableWriter::BaseColumnWriter::Flush(int64_t* file_pos,
       uint8_t* compressed_data =
           parent_->per_file_mem_pool_->Allocate(max_compressed_size);
       header.compressed_page_size = max_compressed_size;
-      compressor_->ProcessBlock32(true, header.uncompressed_page_size, dict_buffer,
-          &header.compressed_page_size, &compressed_data);
+      RETURN_IF_ERROR(compressor_->ProcessBlock32(true, header.uncompressed_page_size,
+          dict_buffer, &header.compressed_page_size, &compressed_data));
       dict_buffer = compressed_data;
       // We allocated the output based on the guessed size, return the extra allocated
       // bytes back to the mem pool.
@@ -614,11 +630,11 @@ Status HdfsParquetTableWriter::BaseColumnWriter::Flush(int64_t* file_pos,
   return Status::OK();
 }
 
-void HdfsParquetTableWriter::BaseColumnWriter::FinalizeCurrentPage() {
+Status HdfsParquetTableWriter::BaseColumnWriter::FinalizeCurrentPage() {
   DCHECK(current_page_ != nullptr);
-  if (current_page_->finalized) return;
+  if (current_page_->finalized) return Status::OK();
 
-  // If the entire page was nullptr, encode it as PLAIN since there is no
+  // If the entire page was NULL, encode it as PLAIN since there is no
   // data anyway. We don't output a useless dictionary page and it works
   // around a parquet MR bug (see IMPALA-759 for more details).
   if (current_page_->num_non_null == 0) current_encoding_ = Encoding::PLAIN;
@@ -670,8 +686,8 @@ void HdfsParquetTableWriter::BaseColumnWriter::FinalizeCurrentPage() {
     DCHECK_GT(max_compressed_size, 0);
     uint8_t* compressed_data = parent_->per_file_mem_pool_->Allocate(max_compressed_size);
     header.compressed_page_size = max_compressed_size;
-    compressor_->ProcessBlock32(true, header.uncompressed_page_size, uncompressed_data,
-        &header.compressed_page_size, &compressed_data);
+    RETURN_IF_ERROR(compressor_->ProcessBlock32(true, header.uncompressed_page_size,
+        uncompressed_data, &header.compressed_page_size, &compressed_data));
     current_page_->data = compressed_data;
 
     // We allocated the output based on the guessed size, return the extra allocated
@@ -694,14 +710,15 @@ void HdfsParquetTableWriter::BaseColumnWriter::FinalizeCurrentPage() {
   // Add the size of the data page header
   uint8_t* header_buffer;
   uint32_t header_len = 0;
-  parent_->thrift_serializer_->Serialize(
-      &current_page_->header, &header_len, &header_buffer);
+  RETURN_IF_ERROR(parent_->thrift_serializer_->Serialize(
+      &current_page_->header, &header_len, &header_buffer));
 
   current_page_->finalized = true;
   total_compressed_byte_size_ += header_len + header.compressed_page_size;
   total_uncompressed_byte_size_ += header_len + header.uncompressed_page_size;
   parent_->file_size_estimate_ += header_len + header.compressed_page_size;
   def_levels_->Clear();
+  return Status::OK();
 }
 
 void HdfsParquetTableWriter::BaseColumnWriter::NewPage() {
@@ -724,6 +741,7 @@ void HdfsParquetTableWriter::BaseColumnWriter::NewPage() {
     header.repetition_level_encoding = Encoding::BIT_PACKED;
     current_page_->header.__set_data_page_header(header);
   }
+  current_encoding_ = next_page_encoding_;
   current_page_->finalized = false;
   current_page_->num_non_null = 0;
   page_stats_base_->Reset();
@@ -828,7 +846,7 @@ Status HdfsParquetTableWriter::Init() {
         DCHECK(false);
     }
     columns_[i].reset(writer);
-    columns_[i]->Reset();
+    RETURN_IF_ERROR(columns_[i]->Init());
   }
   RETURN_IF_ERROR(CreateSchema());
   return Status::OK();
@@ -989,7 +1007,9 @@ Status HdfsParquetTableWriter::AppendRows(
   }
 
   // We exhausted the batch, so we materialize the statistics before releasing the memory.
-  for (unique_ptr<BaseColumnWriter>& column : columns_) column->MaterializeStatsValues();
+  for (unique_ptr<BaseColumnWriter>& column : columns_) {
+    RETURN_IF_ERROR(column->MaterializeStatsValues());
+  }
 
   // Reset the row_idx_ when we exhaust the batch.  We can exit before exhausting
   // the batch if we run out of file space and will continue from the last index.

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/daff8eb0/be/src/exec/hdfs-parquet-table-writer.h
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-parquet-table-writer.h b/be/src/exec/hdfs-parquet-table-writer.h
index b3d319e..1334b19 100644
--- a/be/src/exec/hdfs-parquet-table-writer.h
+++ b/be/src/exec/hdfs-parquet-table-writer.h
@@ -58,25 +58,25 @@ class HdfsParquetTableWriter : public HdfsTableWriter {
   ~HdfsParquetTableWriter();
 
   /// Initialize column information.
-  virtual Status Init();
+  virtual Status Init() override;
 
   /// Initializes a new file.  This resets the file metadata object and writes
   /// the file header to the output file.
-  virtual Status InitNewFile();
+  virtual Status InitNewFile() override;
 
   /// Appends parquet representation of rows in the batch to the current file.
-  virtual Status AppendRows(
-      RowBatch* batch, const std::vector<int32_t>& row_group_indices, bool* new_file);
+  virtual Status AppendRows(RowBatch* batch,
+      const std::vector<int32_t>& row_group_indices, bool* new_file) override;
 
   /// Write out all the data.
-  virtual Status Finalize();
+  virtual Status Finalize() override;
 
-  virtual void Close();
+  virtual void Close() override;
 
   /// Returns the target HDFS block size to use.
-  virtual uint64_t default_block_size() const;
+  virtual uint64_t default_block_size() const override;
 
-  virtual std::string file_extension() const { return "parq"; }
+  virtual std::string file_extension() const override { return "parq"; }
 
  private:
   /// Default data page size. In bytes.

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/daff8eb0/be/src/exec/hdfs-sequence-table-writer.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-sequence-table-writer.cc b/be/src/exec/hdfs-sequence-table-writer.cc
index 4a66c5e..42a70f0 100644
--- a/be/src/exec/hdfs-sequence-table-writer.cc
+++ b/be/src/exec/hdfs-sequence-table-writer.cc
@@ -125,7 +125,7 @@ Status HdfsSequenceTableWriter::AppendRows(
     out_.WriteBytes(neg1_sync_marker_.size(), neg1_sync_marker_.data());
   }
 
-  if (out_.Size() >= approx_block_size_) Flush();
+  if (out_.Size() >= approx_block_size_) RETURN_IF_ERROR(Flush());
   *new_file = false;
   return Status::OK();
 }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/daff8eb0/be/src/exec/hdfs-table-writer.h
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-table-writer.h b/be/src/exec/hdfs-table-writer.h
index cc08b00..cc6c6cc 100644
--- a/be/src/exec/hdfs-table-writer.h
+++ b/be/src/exec/hdfs-table-writer.h
@@ -64,10 +64,10 @@ class HdfsTableWriter {
   /// text), 1) is called once and 2-4) is called repeatedly for each file.
 
   /// Do initialization of writer.
-  virtual Status Init() = 0;
+  virtual Status Init() WARN_UNUSED_RESULT = 0;
 
   /// Called when a new file is started.
-  virtual Status InitNewFile() = 0;
+  virtual Status InitNewFile() WARN_UNUSED_RESULT = 0;
 
   /// Appends rows of 'batch' to the partition that are selected via 'row_group_indices',
   /// and if the latter is empty, appends every row.
@@ -75,13 +75,14 @@ class HdfsTableWriter {
   /// *new_file == true. A new file will be opened and the same row batch will be passed
   /// again. The writer must track how much of the batch it had already processed asking
   /// for a new file. Otherwise the writer will return with *newfile == false.
-  virtual Status AppendRows(
-      RowBatch* batch, const std::vector<int32_t>& row_group_indices, bool* new_file) = 0;
+  virtual Status AppendRows(RowBatch* batch,
+      const std::vector<int32_t>& row_group_indices,
+      bool* new_file) WARN_UNUSED_RESULT = 0;
 
   /// Finalize this partition. The writer needs to finish processing
   /// all data have written out after the return from this call.
   /// This is called once for each call to InitNewFile()
-  virtual Status Finalize() = 0;
+  virtual Status Finalize() WARN_UNUSED_RESULT = 0;
 
   /// Called once when this writer should cleanup any resources.
   virtual void Close() = 0;

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/daff8eb0/be/src/exec/parquet-column-stats.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/parquet-column-stats.cc b/be/src/exec/parquet-column-stats.cc
index bcd6fa4..76b3365 100644
--- a/be/src/exec/parquet-column-stats.cc
+++ b/be/src/exec/parquet-column-stats.cc
@@ -117,11 +117,12 @@ bool ColumnStatsBase::ReadFromThrift(const parquet::ColumnChunk& col_chunk,
   return false;
 }
 
-void ColumnStatsBase::CopyToBuffer(StringBuffer* buffer, StringValue* value) {
-  if (value->ptr == buffer->buffer()) return;
+Status ColumnStatsBase::CopyToBuffer(StringBuffer* buffer, StringValue* value) {
+  if (value->ptr == buffer->buffer()) return Status::OK();
   buffer->Clear();
-  buffer->Append(value->ptr, value->len);
+  RETURN_IF_ERROR(buffer->Append(value->ptr, value->len));
   value->ptr = buffer->buffer();
+  return Status::OK();
 }
 
 bool ColumnStatsBase::CanUseStats(

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/daff8eb0/be/src/exec/parquet-column-stats.h
----------------------------------------------------------------------
diff --git a/be/src/exec/parquet-column-stats.h b/be/src/exec/parquet-column-stats.h
index 11a01f5..7278cdc 100644
--- a/be/src/exec/parquet-column-stats.h
+++ b/be/src/exec/parquet-column-stats.h
@@ -81,7 +81,9 @@ class ColumnStatsBase {
   /// data types (e.g. StringValue) need to be copied at the end of processing a row
   /// batch, since the batch memory will be released. Overwrite this method in derived
   /// classes to provide the functionality.
-  virtual void MaterializeStringValuesToInternalBuffers() {}
+  virtual Status MaterializeStringValuesToInternalBuffers() WARN_UNUSED_RESULT {
+    return Status::OK();
+  }
 
   /// Returns the number of bytes needed to encode the current statistics into a
   /// parquet::Statistics object.
@@ -100,7 +102,7 @@ class ColumnStatsBase {
  protected:
   // Copies the memory of 'value' into 'buffer' and make 'value' point to 'buffer'.
   // 'buffer' is reset before making the copy.
-  static void CopyToBuffer(StringBuffer* buffer, StringValue* value);
+  static Status CopyToBuffer(StringBuffer* buffer, StringValue* value) WARN_UNUSED_RESULT;
 
   /// Stores whether the min and max values of the current object have been initialized.
   bool has_min_max_values_;
@@ -163,7 +165,9 @@ class ColumnStats : public ColumnStatsBase {
   void Update(const T& v) { Update(v, v); }
 
   virtual void Merge(const ColumnStatsBase& other) override;
-  virtual void MaterializeStringValuesToInternalBuffers() override {}
+  virtual Status MaterializeStringValuesToInternalBuffers() override {
+    return Status::OK();
+  }
 
   virtual int64_t BytesNeeded() const override;
   virtual void EncodeToThrift(parquet::Statistics* out) const override;

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/daff8eb0/be/src/exec/parquet-column-stats.inline.h
----------------------------------------------------------------------
diff --git a/be/src/exec/parquet-column-stats.inline.h b/be/src/exec/parquet-column-stats.inline.h
index b112db3..9b81ba8 100644
--- a/be/src/exec/parquet-column-stats.inline.h
+++ b/be/src/exec/parquet-column-stats.inline.h
@@ -170,9 +170,10 @@ inline void ColumnStats<StringValue>::Update(
 // StringValues need to be copied at the end of processing a row batch, since the batch
 // memory will be released.
 template <>
-inline void ColumnStats<StringValue>::MaterializeStringValuesToInternalBuffers() {
-  if (min_buffer_.IsEmpty()) CopyToBuffer(&min_buffer_, &min_value_);
-  if (max_buffer_.IsEmpty()) CopyToBuffer(&max_buffer_, &max_value_);
+inline Status ColumnStats<StringValue>::MaterializeStringValuesToInternalBuffers() {
+  if (min_buffer_.IsEmpty()) RETURN_IF_ERROR(CopyToBuffer(&min_buffer_, &min_value_));
+  if (max_buffer_.IsEmpty()) RETURN_IF_ERROR(CopyToBuffer(&max_buffer_, &max_value_));
+  return Status::OK();
 }
 
 } // end ns impala

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/daff8eb0/be/src/runtime/string-buffer-test.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/string-buffer-test.cc b/be/src/runtime/string-buffer-test.cc
index c370728..27d1021 100644
--- a/be/src/runtime/string-buffer-test.cc
+++ b/be/src/runtime/string-buffer-test.cc
@@ -50,12 +50,12 @@ TEST(StringBufferTest, Basic) {
 
   // Append to empty
   std_str.append("Hello");
-  str.Append("Hello", strlen("Hello"));
+  ASSERT_OK(str.Append("Hello", strlen("Hello")));
   ValidateString(std_str, str);
 
   // Append some more
   std_str.append("World");
-  str.Append("World", strlen("World"));
+  ASSERT_OK(str.Append("World", strlen("World")));
   ValidateString(std_str, str);
 
   // Clear
@@ -81,7 +81,7 @@ TEST(StringBufferTest, AppendBoundary) {
   std_str.resize(chunk_size, 'a');
   int64_t data_size = 0;
   while (data_size + chunk_size <= max_data_size) {
-    str.Append(std_str.c_str(), chunk_size);
+    ASSERT_OK(str.Append(std_str.c_str(), chunk_size));
     data_size += chunk_size;
   }
   EXPECT_EQ(str.buffer_size(), data_size);

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/daff8eb0/be/src/runtime/string-buffer.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/string-buffer.h b/be/src/runtime/string-buffer.h
index 5682bc7..2188e4d 100644
--- a/be/src/runtime/string-buffer.h
+++ b/be/src/runtime/string-buffer.h
@@ -48,7 +48,7 @@ class StringBuffer {
 
   /// Append 'str' to the current string, allocating a new buffer as necessary.
   /// Return error status if memory limit is exceeded.
-  Status Append(const char* str, int64_t str_len) {
+  Status Append(const char* str, int64_t str_len) WARN_UNUSED_RESULT {
     int64_t new_len = len_ + str_len;
     if (UNLIKELY(new_len > buffer_size_)) RETURN_IF_ERROR(GrowBuffer(new_len));
     memcpy(buffer_ + len_, str, str_len);
@@ -57,7 +57,7 @@ class StringBuffer {
   }
 
   /// Wrapper around append() for input type 'uint8_t'.
-  Status Append(const uint8_t* str, int64_t str_len) {
+  Status Append(const uint8_t* str, int64_t str_len) WARN_UNUSED_RESULT {
     return Append(reinterpret_cast<const char*>(str), str_len);
   }
 
@@ -78,7 +78,7 @@ class StringBuffer {
   /// Grows the buffer to be at least 'new_size', copying over the previous data
   /// into the new buffer. The old buffer is not freed. Return an error status if
   /// growing the buffer will exceed memory limit.
-  Status GrowBuffer(int64_t new_size) {
+  Status GrowBuffer(int64_t new_size) WARN_UNUSED_RESULT {
     if (LIKELY(new_size > buffer_size_)) {
       int64_t old_size = buffer_size_;
       buffer_size_ = std::max<int64_t>(buffer_size_ * 2, new_size);