Posted to commits@impala.apache.org by he...@apache.org on 2017/07/21 02:54:48 UTC
[3/3] incubator-impala git commit: IMPALA-5627: fix dropped statuses in HDFS writers
IMPALA-5627: fix dropped statuses in HDFS writers
The change is mostly mechanical - added Status returns where
needed.
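The mechanical part of the pattern is to annotate the callee with
WARN_UNUSED_RESULT and wrap each call site in RETURN_IF_ERROR instead of
discarding the Status. A minimal sketch, assuming Impala's Status type and
the RETURN_IF_ERROR / WARN_UNUSED_RESULT macros are in scope; 'FlushSketch'
and 'AppendRowsSketch' are hypothetical stand-ins, not code from the patch:

    // Callee: WARN_UNUSED_RESULT turns a silently discarded Status into a
    // compiler warning.
    static Status FlushSketch() WARN_UNUSED_RESULT;
    static Status FlushSketch() { return Status::OK(); }

    // Caller: previously the Status from the flush could be dropped on the
    // floor; now any error is propagated with RETURN_IF_ERROR.
    static Status AppendRowsSketch(bool* new_file) {
      RETURN_IF_ERROR(FlushSketch());
      *new_file = false;
      return Status::OK();
    }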
In one place I restructured the logic around
'current_encoding_' for Parquet to allow a cleaner solution
to the dropped status from the FinalizeCurrentPage() call in
ProcessValue(): after the restructuring the call was no longer
needed. 'current_encoding_' had been overloaded to represent both
the encoding of the current page and the preferred encoding for
subsequent pages; the latter is now tracked separately in
'next_page_encoding_'.
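In outline, the hand-off between the two fields works as in the sketch
below (simplified standalone types; 'PageWriterSketch' is a hypothetical
stand-in for BaseColumnWriter, not code from the patch):

    #include <cstdint>

    enum class Encoding { PLAIN, PLAIN_DICTIONARY };

    struct PageWriterSketch {
      Encoding current_encoding = Encoding::PLAIN_DICTIONARY;
      Encoding next_page_encoding = Encoding::PLAIN_DICTIONARY;

      // Returns false if the value did not fit on the current page. On a
      // dictionary overflow only the encoding of *future* pages is
      // downgraded; the page being written keeps its encoding, and the
      // caller finalizes it and retries the value on a new page.
      bool ProcessValue(int64_t bytes_needed) {
        if (bytes_needed < 0) {  // e.g. the dictionary overflowed
          next_page_encoding = Encoding::PLAIN;
          return false;
        }
        return true;
      }

      // Starting a new page is the only place the encoding changes, so
      // FinalizeCurrentPage() no longer needs to be called from
      // ProcessValue().
      void NewPage() { current_encoding = next_page_encoding; }
    };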
Testing:
Ran exhaustive build.
Change-Id: I753d352c640faf5eaef650cd743e53de53761431
Reviewed-on: http://gerrit.cloudera.org:8080/7372
Reviewed-by: Tim Armstrong <ta...@cloudera.com>
Tested-by: Impala Public Jenkins
Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/daff8eb0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/daff8eb0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/daff8eb0
Branch: refs/heads/master
Commit: daff8eb0ca19aa612c9fc7cc2ddd647735b31266
Parents: 54865c4
Author: Tim Armstrong <ta...@cloudera.com>
Authored: Thu Jul 6 18:41:07 2017 -0700
Committer: Impala Public Jenkins <im...@gerrit.cloudera.org>
Committed: Fri Jul 21 02:51:51 2017 +0000
----------------------------------------------------------------------
be/src/exec/hdfs-avro-table-writer.cc | 2 +-
be/src/exec/hdfs-avro-table-writer.h | 20 +++---
be/src/exec/hdfs-parquet-table-writer.cc | 88 ++++++++++++++++----------
be/src/exec/hdfs-parquet-table-writer.h | 16 ++---
be/src/exec/hdfs-sequence-table-writer.cc | 2 +-
be/src/exec/hdfs-table-writer.h | 11 ++--
be/src/exec/parquet-column-stats.cc | 7 +-
be/src/exec/parquet-column-stats.h | 10 ++-
be/src/exec/parquet-column-stats.inline.h | 7 +-
be/src/runtime/string-buffer-test.cc | 6 +-
be/src/runtime/string-buffer.h | 6 +-
11 files changed, 101 insertions(+), 74 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/daff8eb0/be/src/exec/hdfs-avro-table-writer.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-avro-table-writer.cc b/be/src/exec/hdfs-avro-table-writer.cc
index 46185e8..3ce296d 100644
--- a/be/src/exec/hdfs-avro-table-writer.cc
+++ b/be/src/exec/hdfs-avro-table-writer.cc
@@ -196,7 +196,7 @@ Status HdfsAvroTableWriter::AppendRows(
}
}
- if (out_.Size() > DEFAULT_AVRO_BLOCK_SIZE) Flush();
+ if (out_.Size() > DEFAULT_AVRO_BLOCK_SIZE) RETURN_IF_ERROR(Flush());
*new_file = false;
return Status::OK();
}
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/daff8eb0/be/src/exec/hdfs-avro-table-writer.h
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-avro-table-writer.h b/be/src/exec/hdfs-avro-table-writer.h
index f85659e..6966860 100644
--- a/be/src/exec/hdfs-avro-table-writer.h
+++ b/be/src/exec/hdfs-avro-table-writer.h
@@ -68,17 +68,17 @@ class HdfsAvroTableWriter : public HdfsTableWriter {
virtual ~HdfsAvroTableWriter() { }
- virtual Status Init();
- virtual Status Finalize() { return Flush(); }
- virtual Status InitNewFile() { return WriteFileHeader(); }
- virtual void Close();
- virtual uint64_t default_block_size() const { return 0; }
- virtual std::string file_extension() const { return "avro"; }
+ virtual Status Init() override;
+ virtual Status Finalize() override { return Flush(); }
+ virtual Status InitNewFile() override { return WriteFileHeader(); }
+ virtual void Close() override;
+ virtual uint64_t default_block_size() const override { return 0; }
+ virtual std::string file_extension() const override { return "avro"; }
/// Outputs the given rows into an HDFS sequence file. The rows are buffered
/// to fill a sequence file block.
- virtual Status AppendRows(
- RowBatch* rows, const std::vector<int32_t>& row_group_indices, bool* new_file);
+ virtual Status AppendRows(RowBatch* rows,
+ const std::vector<int32_t>& row_group_indices, bool* new_file) override;
private:
/// Processes a single row, appending to out_
@@ -88,11 +88,11 @@ class HdfsAvroTableWriter : public HdfsTableWriter {
inline void AppendField(const ColumnType& type, const void* value);
/// Writes the Avro file header to HDFS
- Status WriteFileHeader();
+ Status WriteFileHeader() WARN_UNUSED_RESULT;
/// Writes the contents of out_ to HDFS as a single Avro file block.
/// Returns an error if write to HDFS fails.
- Status Flush();
+ Status Flush() WARN_UNUSED_RESULT;
/// Buffer which holds accumulated output
WriteStream out_;
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/daff8eb0/be/src/exec/hdfs-parquet-table-writer.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-parquet-table-writer.cc b/be/src/exec/hdfs-parquet-table-writer.cc
index 5a2d810..04a81f1 100644
--- a/be/src/exec/hdfs-parquet-table-writer.cc
+++ b/be/src/exec/hdfs-parquet-table-writer.cc
@@ -103,8 +103,6 @@ class HdfsParquetTableWriter::BaseColumnWriter {
values_buffer_len_(DEFAULT_DATA_PAGE_SIZE),
page_stats_base_(nullptr),
row_group_stats_base_(nullptr) {
- Codec::CreateCompressor(nullptr, false, codec, &compressor_);
-
def_levels_ = parent_->state_->obj_pool()->Add(
new RleEncoder(parent_->reusable_col_mem_pool_->Allocate(DEFAULT_DATA_PAGE_SIZE),
DEFAULT_DATA_PAGE_SIZE, 1));
@@ -113,13 +111,20 @@ class HdfsParquetTableWriter::BaseColumnWriter {
virtual ~BaseColumnWriter() {}
+ // Called after the constructor to initialize the column writer.
+ Status Init() WARN_UNUSED_RESULT {
+ Reset();
+ RETURN_IF_ERROR(Codec::CreateCompressor(nullptr, false, codec_, &compressor_));
+ return Status::OK();
+ }
+
// Appends the row to this column. This buffers the value into a data page. Returns
// error if the space needed for the encoded value is larger than the data page size.
// TODO: this needs to be batch based, instead of row based for better performance.
// This is a bit trickier to handle the case where only a partial row batch can be
// output to the current file because it reaches the max file size. Enabling codegen
// would also solve this problem.
- Status AppendRow(TupleRow* row);
+ Status AppendRow(TupleRow* row) WARN_UNUSED_RESULT;
// Flushes all buffered data pages to the file.
// *file_pos is an output parameter and will be incremented by
@@ -128,13 +133,14 @@ class HdfsParquetTableWriter::BaseColumnWriter {
// will contain the byte offset for the data page and dictionary page. They
// will be set to -1 if the column does not contain that type of page.
Status Flush(int64_t* file_pos, int64_t* first_data_page,
- int64_t* first_dictionary_page);
+ int64_t* first_dictionary_page) WARN_UNUSED_RESULT;
// Materializes the column statistics to the per-file MemPool so they are available
// after their row batch buffer has been freed.
- void MaterializeStatsValues() {
- row_group_stats_base_->MaterializeStringValuesToInternalBuffers();
- page_stats_base_->MaterializeStringValuesToInternalBuffers();
+ Status MaterializeStatsValues() WARN_UNUSED_RESULT {
+ RETURN_IF_ERROR(row_group_stats_base_->MaterializeStringValuesToInternalBuffers());
+ RETURN_IF_ERROR(page_stats_base_->MaterializeStringValuesToInternalBuffers());
+ return Status::OK();
}
// Encodes the row group statistics into a parquet::Statistics object and attaches it to
@@ -157,6 +163,7 @@ class HdfsParquetTableWriter::BaseColumnWriter {
num_values_ = 0;
total_compressed_byte_size_ = 0;
current_encoding_ = Encoding::PLAIN;
+ next_page_encoding_ = Encoding::PLAIN;
column_encodings_.clear();
dict_encoding_stats_.clear();
data_encoding_stats_.clear();
@@ -184,16 +191,18 @@ class HdfsParquetTableWriter::BaseColumnWriter {
friend class HdfsParquetTableWriter;
// Encodes value into the current page output buffer and updates the column statistics
- // aggregates. Returns true if the value fits on the current page. If this function
- // returned false, the caller should create a new page and try again with the same
- // value.
+ // aggregates. Returns true if the value was appended successfully to the current page.
+ // Returns false if the value was not appended to the current page and the caller can
+ // create a new page and try again with the same value. May change
+ // 'next_page_encoding_' if the encoding for the next page should be different - e.g.
+ // if a dictionary overflowed and dictionary encoding is no longer viable.
// *bytes_needed will contain the (estimated) number of bytes needed to successfully
// encode the value in the page.
// Implemented in the subclass.
- virtual bool ProcessValue(void* value, int64_t* bytes_needed) = 0;
+ virtual bool ProcessValue(void* value, int64_t* bytes_needed) WARN_UNUSED_RESULT = 0;
// Encodes out all data for the current page and updates the metadata.
- virtual void FinalizeCurrentPage();
+ virtual Status FinalizeCurrentPage() WARN_UNUSED_RESULT;
// Update current_page_ to a new page, reusing pages allocated if possible.
void NewPage();
@@ -246,10 +255,16 @@ class HdfsParquetTableWriter::BaseColumnWriter {
// Pointer to the current page in 'pages_'. Not owned.
DataPage* current_page_;
- int64_t num_values_; // Total number of values across all pages, including nullptr.
+ // Total number of values across all pages, including NULL.
+ int64_t num_values_;
int64_t total_compressed_byte_size_;
int64_t total_uncompressed_byte_size_;
+ // Encoding of the current page.
Encoding::type current_encoding_;
+ // Encoding to use for the next page. By default, the same as 'current_encoding_'.
+ // Used by the column writer to switch encoding while writing a column, e.g. if the
+ // dictionary overflows.
+ Encoding::type next_page_encoding_;
// Set of all encodings used in the column chunk
unordered_set<Encoding::type> column_encodings_;
@@ -299,6 +314,7 @@ class HdfsParquetTableWriter::ColumnWriter :
// Default to dictionary encoding. If the cardinality ends up being too high,
// it will fall back to plain.
current_encoding_ = Encoding::PLAIN_DICTIONARY;
+ next_page_encoding_ = Encoding::PLAIN_DICTIONARY;
dict_encoder_.reset(
new DictEncoder<T>(parent_->per_file_mem_pool_.get(), plain_encoded_value_size_));
dict_encoder_base_ = dict_encoder_.get();
@@ -321,10 +337,9 @@ class HdfsParquetTableWriter::ColumnWriter :
++num_values_since_dict_size_check_;
*bytes_needed = dict_encoder_->Put(*CastValue(value));
// If the dictionary contains the maximum number of values, switch to plain
- // encoding. The current dictionary encoded page is written out.
+ // encoding for the next page. The current page is full and must be written out.
if (UNLIKELY(*bytes_needed < 0)) {
- FinalizeCurrentPage();
- current_encoding_ = Encoding::PLAIN;
+ next_page_encoding_ = Encoding::PLAIN;
return false;
}
parent_->file_size_estimate_ += *bytes_needed;
@@ -423,15 +438,16 @@ class HdfsParquetTableWriter::BoolColumnWriter :
return true;
}
- virtual void FinalizeCurrentPage() {
+ virtual Status FinalizeCurrentPage() {
DCHECK(current_page_ != nullptr);
- if (current_page_->finalized) return;
+ if (current_page_->finalized) return Status::OK();
bool_values_->Flush();
int num_bytes = bool_values_->bytes_written();
current_page_->header.uncompressed_page_size += num_bytes;
// Call into superclass to handle the rest.
- BaseColumnWriter::FinalizeCurrentPage();
+ RETURN_IF_ERROR(BaseColumnWriter::FinalizeCurrentPage());
bool_values_->Clear();
+ return Status::OK();
}
private:
@@ -455,7 +471,7 @@ inline Status HdfsParquetTableWriter::BaseColumnWriter::AppendRow(TupleRow* row)
// Ensure that we have enough space for the definition level, but don't write it yet in
// case we don't have enough space for the value.
if (def_levels_->buffer_full()) {
- FinalizeCurrentPage();
+ RETURN_IF_ERROR(FinalizeCurrentPage());
NewPage();
}
@@ -475,11 +491,11 @@ inline Status HdfsParquetTableWriter::BaseColumnWriter::AppendRow(TupleRow* row)
int64_t bytes_needed = 0;
if (ProcessValue(value, &bytes_needed)) {
++current_page_->num_non_null;
- break;
+ break; // Successfully appended, don't need to retry.
}
// Value didn't fit on page, try again on a new page.
- FinalizeCurrentPage();
+ RETURN_IF_ERROR(FinalizeCurrentPage());
// Check how much space is needed to write this value. If that is larger than the
// page size then increase page size and try again.
@@ -534,7 +550,7 @@ Status HdfsParquetTableWriter::BaseColumnWriter::Flush(int64_t* file_pos,
return Status::OK();
}
- FinalizeCurrentPage();
+ RETURN_IF_ERROR(FinalizeCurrentPage());
*first_dictionary_page = -1;
// First write the dictionary page before any of the data pages.
@@ -563,8 +579,8 @@ Status HdfsParquetTableWriter::BaseColumnWriter::Flush(int64_t* file_pos,
uint8_t* compressed_data =
parent_->per_file_mem_pool_->Allocate(max_compressed_size);
header.compressed_page_size = max_compressed_size;
- compressor_->ProcessBlock32(true, header.uncompressed_page_size, dict_buffer,
- &header.compressed_page_size, &compressed_data);
+ RETURN_IF_ERROR(compressor_->ProcessBlock32(true, header.uncompressed_page_size,
+ dict_buffer, &header.compressed_page_size, &compressed_data));
dict_buffer = compressed_data;
// We allocated the output based on the guessed size, return the extra allocated
// bytes back to the mem pool.
@@ -614,11 +630,11 @@ Status HdfsParquetTableWriter::BaseColumnWriter::Flush(int64_t* file_pos,
return Status::OK();
}
-void HdfsParquetTableWriter::BaseColumnWriter::FinalizeCurrentPage() {
+Status HdfsParquetTableWriter::BaseColumnWriter::FinalizeCurrentPage() {
DCHECK(current_page_ != nullptr);
- if (current_page_->finalized) return;
+ if (current_page_->finalized) return Status::OK();
- // If the entire page was nullptr, encode it as PLAIN since there is no
+ // If the entire page was NULL, encode it as PLAIN since there is no
// data anyway. We don't output a useless dictionary page and it works
// around a parquet MR bug (see IMPALA-759 for more details).
if (current_page_->num_non_null == 0) current_encoding_ = Encoding::PLAIN;
@@ -670,8 +686,8 @@ void HdfsParquetTableWriter::BaseColumnWriter::FinalizeCurrentPage() {
DCHECK_GT(max_compressed_size, 0);
uint8_t* compressed_data = parent_->per_file_mem_pool_->Allocate(max_compressed_size);
header.compressed_page_size = max_compressed_size;
- compressor_->ProcessBlock32(true, header.uncompressed_page_size, uncompressed_data,
- &header.compressed_page_size, &compressed_data);
+ RETURN_IF_ERROR(compressor_->ProcessBlock32(true, header.uncompressed_page_size,
+ uncompressed_data, &header.compressed_page_size, &compressed_data));
current_page_->data = compressed_data;
// We allocated the output based on the guessed size, return the extra allocated
@@ -694,14 +710,15 @@ void HdfsParquetTableWriter::BaseColumnWriter::FinalizeCurrentPage() {
// Add the size of the data page header
uint8_t* header_buffer;
uint32_t header_len = 0;
- parent_->thrift_serializer_->Serialize(
- &current_page_->header, &header_len, &header_buffer);
+ RETURN_IF_ERROR(parent_->thrift_serializer_->Serialize(
+ &current_page_->header, &header_len, &header_buffer));
current_page_->finalized = true;
total_compressed_byte_size_ += header_len + header.compressed_page_size;
total_uncompressed_byte_size_ += header_len + header.uncompressed_page_size;
parent_->file_size_estimate_ += header_len + header.compressed_page_size;
def_levels_->Clear();
+ return Status::OK();
}
void HdfsParquetTableWriter::BaseColumnWriter::NewPage() {
@@ -724,6 +741,7 @@ void HdfsParquetTableWriter::BaseColumnWriter::NewPage() {
header.repetition_level_encoding = Encoding::BIT_PACKED;
current_page_->header.__set_data_page_header(header);
}
+ current_encoding_ = next_page_encoding_;
current_page_->finalized = false;
current_page_->num_non_null = 0;
page_stats_base_->Reset();
@@ -828,7 +846,7 @@ Status HdfsParquetTableWriter::Init() {
DCHECK(false);
}
columns_[i].reset(writer);
- columns_[i]->Reset();
+ RETURN_IF_ERROR(columns_[i]->Init());
}
RETURN_IF_ERROR(CreateSchema());
return Status::OK();
@@ -989,7 +1007,9 @@ Status HdfsParquetTableWriter::AppendRows(
}
// We exhausted the batch, so we materialize the statistics before releasing the memory.
- for (unique_ptr<BaseColumnWriter>& column : columns_) column->MaterializeStatsValues();
+ for (unique_ptr<BaseColumnWriter>& column : columns_) {
+ RETURN_IF_ERROR(column->MaterializeStatsValues());
+ }
// Reset the row_idx_ when we exhaust the batch. We can exit before exhausting
// the batch if we run out of file space and will continue from the last index.
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/daff8eb0/be/src/exec/hdfs-parquet-table-writer.h
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-parquet-table-writer.h b/be/src/exec/hdfs-parquet-table-writer.h
index b3d319e..1334b19 100644
--- a/be/src/exec/hdfs-parquet-table-writer.h
+++ b/be/src/exec/hdfs-parquet-table-writer.h
@@ -58,25 +58,25 @@ class HdfsParquetTableWriter : public HdfsTableWriter {
~HdfsParquetTableWriter();
/// Initialize column information.
- virtual Status Init();
+ virtual Status Init() override;
/// Initializes a new file. This resets the file metadata object and writes
/// the file header to the output file.
- virtual Status InitNewFile();
+ virtual Status InitNewFile() override;
/// Appends parquet representation of rows in the batch to the current file.
- virtual Status AppendRows(
- RowBatch* batch, const std::vector<int32_t>& row_group_indices, bool* new_file);
+ virtual Status AppendRows(RowBatch* batch,
+ const std::vector<int32_t>& row_group_indices, bool* new_file) override;
/// Write out all the data.
- virtual Status Finalize();
+ virtual Status Finalize() override;
- virtual void Close();
+ virtual void Close() override;
/// Returns the target HDFS block size to use.
- virtual uint64_t default_block_size() const;
+ virtual uint64_t default_block_size() const override;
- virtual std::string file_extension() const { return "parq"; }
+ virtual std::string file_extension() const override { return "parq"; }
private:
/// Default data page size. In bytes.
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/daff8eb0/be/src/exec/hdfs-sequence-table-writer.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-sequence-table-writer.cc b/be/src/exec/hdfs-sequence-table-writer.cc
index 4a66c5e..42a70f0 100644
--- a/be/src/exec/hdfs-sequence-table-writer.cc
+++ b/be/src/exec/hdfs-sequence-table-writer.cc
@@ -125,7 +125,7 @@ Status HdfsSequenceTableWriter::AppendRows(
out_.WriteBytes(neg1_sync_marker_.size(), neg1_sync_marker_.data());
}
- if (out_.Size() >= approx_block_size_) Flush();
+ if (out_.Size() >= approx_block_size_) RETURN_IF_ERROR(Flush());
*new_file = false;
return Status::OK();
}
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/daff8eb0/be/src/exec/hdfs-table-writer.h
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-table-writer.h b/be/src/exec/hdfs-table-writer.h
index cc08b00..cc6c6cc 100644
--- a/be/src/exec/hdfs-table-writer.h
+++ b/be/src/exec/hdfs-table-writer.h
@@ -64,10 +64,10 @@ class HdfsTableWriter {
/// text), 1) is called once and 2-4) is called repeatedly for each file.
/// Do initialization of writer.
- virtual Status Init() = 0;
+ virtual Status Init() WARN_UNUSED_RESULT = 0;
/// Called when a new file is started.
- virtual Status InitNewFile() = 0;
+ virtual Status InitNewFile() WARN_UNUSED_RESULT = 0;
/// Appends rows of 'batch' to the partition that are selected via 'row_group_indices',
/// and if the latter is empty, appends every row.
@@ -75,13 +75,14 @@ class HdfsTableWriter {
/// *new_file == true. A new file will be opened and the same row batch will be passed
/// again. The writer must track how much of the batch it had already processed asking
/// for a new file. Otherwise the writer will return with *newfile == false.
- virtual Status AppendRows(
- RowBatch* batch, const std::vector<int32_t>& row_group_indices, bool* new_file) = 0;
+ virtual Status AppendRows(RowBatch* batch,
+ const std::vector<int32_t>& row_group_indices,
+ bool* new_file) WARN_UNUSED_RESULT = 0;
/// Finalize this partition. The writer needs to finish processing
/// all data have written out after the return from this call.
/// This is called once for each call to InitNewFile()
- virtual Status Finalize() = 0;
+ virtual Status Finalize() WARN_UNUSED_RESULT = 0;
/// Called once when this writer should cleanup any resources.
virtual void Close() = 0;
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/daff8eb0/be/src/exec/parquet-column-stats.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/parquet-column-stats.cc b/be/src/exec/parquet-column-stats.cc
index bcd6fa4..76b3365 100644
--- a/be/src/exec/parquet-column-stats.cc
+++ b/be/src/exec/parquet-column-stats.cc
@@ -117,11 +117,12 @@ bool ColumnStatsBase::ReadFromThrift(const parquet::ColumnChunk& col_chunk,
return false;
}
-void ColumnStatsBase::CopyToBuffer(StringBuffer* buffer, StringValue* value) {
- if (value->ptr == buffer->buffer()) return;
+Status ColumnStatsBase::CopyToBuffer(StringBuffer* buffer, StringValue* value) {
+ if (value->ptr == buffer->buffer()) return Status::OK();
buffer->Clear();
- buffer->Append(value->ptr, value->len);
+ RETURN_IF_ERROR(buffer->Append(value->ptr, value->len));
value->ptr = buffer->buffer();
+ return Status::OK();
}
bool ColumnStatsBase::CanUseStats(
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/daff8eb0/be/src/exec/parquet-column-stats.h
----------------------------------------------------------------------
diff --git a/be/src/exec/parquet-column-stats.h b/be/src/exec/parquet-column-stats.h
index 11a01f5..7278cdc 100644
--- a/be/src/exec/parquet-column-stats.h
+++ b/be/src/exec/parquet-column-stats.h
@@ -81,7 +81,9 @@ class ColumnStatsBase {
/// data types (e.g. StringValue) need to be copied at the end of processing a row
/// batch, since the batch memory will be released. Overwrite this method in derived
/// classes to provide the functionality.
- virtual void MaterializeStringValuesToInternalBuffers() {}
+ virtual Status MaterializeStringValuesToInternalBuffers() WARN_UNUSED_RESULT {
+ return Status::OK();
+ }
/// Returns the number of bytes needed to encode the current statistics into a
/// parquet::Statistics object.
@@ -100,7 +102,7 @@ class ColumnStatsBase {
protected:
// Copies the memory of 'value' into 'buffer' and make 'value' point to 'buffer'.
// 'buffer' is reset before making the copy.
- static void CopyToBuffer(StringBuffer* buffer, StringValue* value);
+ static Status CopyToBuffer(StringBuffer* buffer, StringValue* value) WARN_UNUSED_RESULT;
/// Stores whether the min and max values of the current object have been initialized.
bool has_min_max_values_;
@@ -163,7 +165,9 @@ class ColumnStats : public ColumnStatsBase {
void Update(const T& v) { Update(v, v); }
virtual void Merge(const ColumnStatsBase& other) override;
- virtual void MaterializeStringValuesToInternalBuffers() override {}
+ virtual Status MaterializeStringValuesToInternalBuffers() override {
+ return Status::OK();
+ }
virtual int64_t BytesNeeded() const override;
virtual void EncodeToThrift(parquet::Statistics* out) const override;
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/daff8eb0/be/src/exec/parquet-column-stats.inline.h
----------------------------------------------------------------------
diff --git a/be/src/exec/parquet-column-stats.inline.h b/be/src/exec/parquet-column-stats.inline.h
index b112db3..9b81ba8 100644
--- a/be/src/exec/parquet-column-stats.inline.h
+++ b/be/src/exec/parquet-column-stats.inline.h
@@ -170,9 +170,10 @@ inline void ColumnStats<StringValue>::Update(
// StringValues need to be copied at the end of processing a row batch, since the batch
// memory will be released.
template <>
-inline void ColumnStats<StringValue>::MaterializeStringValuesToInternalBuffers() {
- if (min_buffer_.IsEmpty()) CopyToBuffer(&min_buffer_, &min_value_);
- if (max_buffer_.IsEmpty()) CopyToBuffer(&max_buffer_, &max_value_);
+inline Status ColumnStats<StringValue>::MaterializeStringValuesToInternalBuffers() {
+ if (min_buffer_.IsEmpty()) RETURN_IF_ERROR(CopyToBuffer(&min_buffer_, &min_value_));
+ if (max_buffer_.IsEmpty()) RETURN_IF_ERROR(CopyToBuffer(&max_buffer_, &max_value_));
+ return Status::OK();
}
} // end ns impala
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/daff8eb0/be/src/runtime/string-buffer-test.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/string-buffer-test.cc b/be/src/runtime/string-buffer-test.cc
index c370728..27d1021 100644
--- a/be/src/runtime/string-buffer-test.cc
+++ b/be/src/runtime/string-buffer-test.cc
@@ -50,12 +50,12 @@ TEST(StringBufferTest, Basic) {
// Append to empty
std_str.append("Hello");
- str.Append("Hello", strlen("Hello"));
+ ASSERT_OK(str.Append("Hello", strlen("Hello")));
ValidateString(std_str, str);
// Append some more
std_str.append("World");
- str.Append("World", strlen("World"));
+ ASSERT_OK(str.Append("World", strlen("World")));
ValidateString(std_str, str);
// Clear
@@ -81,7 +81,7 @@ TEST(StringBufferTest, AppendBoundary) {
std_str.resize(chunk_size, 'a');
int64_t data_size = 0;
while (data_size + chunk_size <= max_data_size) {
- str.Append(std_str.c_str(), chunk_size);
+ ASSERT_OK(str.Append(std_str.c_str(), chunk_size));
data_size += chunk_size;
}
EXPECT_EQ(str.buffer_size(), data_size);
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/daff8eb0/be/src/runtime/string-buffer.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/string-buffer.h b/be/src/runtime/string-buffer.h
index 5682bc7..2188e4d 100644
--- a/be/src/runtime/string-buffer.h
+++ b/be/src/runtime/string-buffer.h
@@ -48,7 +48,7 @@ class StringBuffer {
/// Append 'str' to the current string, allocating a new buffer as necessary.
/// Return error status if memory limit is exceeded.
- Status Append(const char* str, int64_t str_len) {
+ Status Append(const char* str, int64_t str_len) WARN_UNUSED_RESULT {
int64_t new_len = len_ + str_len;
if (UNLIKELY(new_len > buffer_size_)) RETURN_IF_ERROR(GrowBuffer(new_len));
memcpy(buffer_ + len_, str, str_len);
@@ -57,7 +57,7 @@ class StringBuffer {
}
/// Wrapper around append() for input type 'uint8_t'.
- Status Append(const uint8_t* str, int64_t str_len) {
+ Status Append(const uint8_t* str, int64_t str_len) WARN_UNUSED_RESULT {
return Append(reinterpret_cast<const char*>(str), str_len);
}
@@ -78,7 +78,7 @@ class StringBuffer {
/// Grows the buffer to be at least 'new_size', copying over the previous data
/// into the new buffer. The old buffer is not freed. Return an error status if
/// growing the buffer will exceed memory limit.
- Status GrowBuffer(int64_t new_size) {
+ Status GrowBuffer(int64_t new_size) WARN_UNUSED_RESULT {
if (LIKELY(new_size > buffer_size_)) {
int64_t old_size = buffer_size_;
buffer_size_ = std::max<int64_t>(buffer_size_ * 2, new_size);