You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by he...@apache.org on 2017/07/21 02:54:46 UTC
[1/3] incubator-impala git commit: IMPALA-4276: Profile displays
non-default query options set by planner
Repository: incubator-impala
Updated Branches:
refs/heads/master 9d1e4449c -> daff8eb0c
IMPALA-4276: Profile displays non-default query options set by planner
Fix to populate the runtime profile with the non-default query options
set by the planner.
Added a corresponding test case.
Change-Id: I08e9dc2bebb83101976bbbd903ee48c5068dbaab
Reviewed-on: http://gerrit.cloudera.org:8080/7419
Reviewed-by: Matthew Jacobs <mj...@cloudera.com>
Tested-by: Impala Public Jenkins
Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/83bfc142
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/83bfc142
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/83bfc142
Branch: refs/heads/master
Commit: 83bfc142e45e6394b30f712f90d201ea57eecc02
Parents: 9d1e444
Author: Bikramjeet Vig <bi...@cloudera.com>
Authored: Thu Jul 13 15:21:37 2017 -0700
Committer: Impala Public Jenkins <im...@gerrit.cloudera.org>
Committed: Fri Jul 21 01:14:07 2017 +0000
----------------------------------------------------------------------
be/src/service/client-request-state.cc | 2 +-
tests/custom_cluster/test_admission_controller.py | 7 ++++---
tests/query_test/test_observability.py | 17 ++++++++++++++++-
3 files changed, 21 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/83bfc142/be/src/service/client-request-state.cc
----------------------------------------------------------------------
diff --git a/be/src/service/client-request-state.cc b/be/src/service/client-request-state.cc
index c454ebe..6be04f6 100644
--- a/be/src/service/client-request-state.cc
+++ b/be/src/service/client-request-state.cc
@@ -148,7 +148,7 @@ Status ClientRequestState::Exec(TExecRequest* exec_request) {
profile_.AddChild(&server_profile_);
summary_profile_.AddInfoString("Query Type", PrintTStmtType(stmt_type()));
summary_profile_.AddInfoString("Query Options (non default)",
- DebugQueryOptions(query_ctx_.client_request.query_options));
+ DebugQueryOptions(exec_request_.query_options));
switch (exec_request->stmt_type) {
case TStmtType::QUERY:
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/83bfc142/tests/custom_cluster/test_admission_controller.py
----------------------------------------------------------------------
diff --git a/tests/custom_cluster/test_admission_controller.py b/tests/custom_cluster/test_admission_controller.py
index e84db52..b9ae427 100644
--- a/tests/custom_cluster/test_admission_controller.py
+++ b/tests/custom_cluster/test_admission_controller.py
@@ -144,15 +144,16 @@ class TestAdmissionController(TestAdmissionControllerBase, HS2TestSuite):
assert re.search(expected_error_re, str(e))
def __check_query_options(self, profile, expected_query_options):
- """Validate that the per-pool query options were set on the specified profile.
- expected_query_options is a list of "KEY=VALUE" strings, e.g. ["MEM_LIMIT=1", ...]"""
+ """Validate that the expected per-pool query options were set on the specified
+ profile. expected_query_options is a list of "KEY=VALUE" strings, e.g.
+ ["MEM_LIMIT=1", ...]"""
confs = []
for line in profile.split("\n"):
if PROFILE_QUERY_OPTIONS_KEY in line:
rhs = re.split(": ", line)[1]
confs = re.split(",", rhs)
break
- assert len(confs) == len(expected_query_options)
+ assert len(confs) >= len(expected_query_options)
confs = map(str.lower, confs)
for expected in expected_query_options:
assert expected.lower() in confs,\
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/83bfc142/tests/query_test/test_observability.py
----------------------------------------------------------------------
diff --git a/tests/query_test/test_observability.py b/tests/query_test/test_observability.py
index cdb322d..5a82a25 100644
--- a/tests/query_test/test_observability.py
+++ b/tests/query_test/test_observability.py
@@ -79,7 +79,7 @@ class TestObservability(ImpalaTestSuite):
assert result.exec_summary[scan_idx]['operator'] == '00:SCAN HBASE'
assert result.exec_summary[scan_idx]['detail'] == 'functional_hbase.alltypestiny'
- def test_get_profile(self):
+ def test_query_states(self):
"""Tests that the query profile shows expected query states."""
query = "select count(*) from functional.alltypes"
handle = self.execute_query_async(query, dict())
@@ -93,3 +93,18 @@ class TestObservability(ImpalaTestSuite):
profile = self.client.get_runtime_profile(handle)
# After fetching the results, the query must be in state FINISHED.
assert "Query State: FINISHED" in profile, profile
+
+ def test_query_options(self):
+ """Test that the query profile shows expected non-default query options, both set
+ explicitly through client and those set by planner"""
+ # Set a query option explicitly through client
+ self.execute_query("set mem_limit = 8589934592")
+ # For this query, the planner sets NUM_NODES=1, NUM_SCANNER_THREADS=1 and
+ # RUNTIME_FILTER_MODE=0
+ expected_string = "Query Options (non default): MEM_LIMIT=8589934592,NUM_NODES=1," \
+ "NUM_SCANNER_THREADS=1,RUNTIME_FILTER_MODE=0,MT_DOP=0\n"
+ assert expected_string in self.execute_query("select 1").runtime_profile
+
+ # Make sure explicitly set default values are not shown in the profile
+ self.execute_query("set MAX_IO_BUFFERS = 0")
+ assert expected_string in self.execute_query("select 1").runtime_profile
[2/3] incubator-impala git commit: IMPALA-5275: Avoid printing status
stack trace on hot paths
Posted by he...@apache.org.
IMPALA-5275: Avoid printing status stack trace on hot paths
Currently, creating a Status object (non-OK and non-EXPECTED)
prints the stack trace to the log. Fetching the stack trace takes
a large amount of CPU time (close to 130 ms) and results in a significant
performance hit when encountered on hot paths.
Five such hot paths were identified, and the following changes were
made to fix them:
1. In ImpalaServer::GetExecSummary(), create Status() without holding
the query_log_lock_.
2, 3 and 4. In impala::DeserializeThriftMsg<>(),
PartitionedAggregationNode::CodegenUpdateTuple() and
HdfsScanner::CodegenWriteCompleteTuple(), use Status::Expected where
appropriate.
5. In Status::MemLimitExceeded(), create the Status object without
printing a stack trace.
Change-Id: Ief083f558fba587381aa7fe8f99da279da02f1f2
Reviewed-on: http://gerrit.cloudera.org:8080/7449
Reviewed-by: Matthew Jacobs <mj...@cloudera.com>
Tested-by: Impala Public Jenkins
Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/54865c4e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/54865c4e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/54865c4e
Branch: refs/heads/master
Commit: 54865c4ef1a1b54bf3fcdea4de29f7aaeb9278e8
Parents: 83bfc14
Author: Bikramjeet Vig <bi...@cloudera.com>
Authored: Mon Jul 17 13:18:15 2017 -0700
Committer: Impala Public Jenkins <im...@gerrit.cloudera.org>
Committed: Fri Jul 21 01:29:24 2017 +0000
----------------------------------------------------------------------
be/src/common/status.cc | 6 +++---
be/src/exec/hdfs-scanner.cc | 4 ++--
be/src/exec/partitioned-aggregation-node.cc | 8 ++++----
be/src/rpc/thrift-util.h | 2 +-
be/src/service/impala-server.cc | 24 ++++++++++++++++++------
5 files changed, 28 insertions(+), 16 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/54865c4e/be/src/common/status.cc
----------------------------------------------------------------------
diff --git a/be/src/common/status.cc b/be/src/common/status.cc
index 1adc654..3fc2236 100644
--- a/be/src/common/status.cc
+++ b/be/src/common/status.cc
@@ -37,12 +37,12 @@ const Status Status::DEPRECATED_RPC(ErrorMsg::Init(TErrorCode::NOT_IMPLEMENTED_E
"Deprecated RPC; please update your client"));
Status Status::MemLimitExceeded() {
- return Status(TErrorCode::MEM_LIMIT_EXCEEDED, "Memory limit exceeded");
+ return Status(ErrorMsg(TErrorCode::MEM_LIMIT_EXCEEDED, "Memory limit exceeded"), true);
}
Status Status::MemLimitExceeded(const std::string& details) {
- return Status(TErrorCode::MEM_LIMIT_EXCEEDED,
- Substitute("Memory limit exceeded: $0", details));
+ return Status(ErrorMsg(TErrorCode::MEM_LIMIT_EXCEEDED,
+ Substitute("Memory limit exceeded: $0", details)), true);
}
Status::Status(TErrorCode::type code)
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/54865c4e/be/src/exec/hdfs-scanner.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-scanner.cc b/be/src/exec/hdfs-scanner.cc
index 0670f81..10e4edd 100644
--- a/be/src/exec/hdfs-scanner.cc
+++ b/be/src/exec/hdfs-scanner.cc
@@ -312,10 +312,10 @@ Status HdfsScanner::CodegenWriteCompleteTuple(HdfsScanNodeBase* node,
for (int i = 0; i < node->materialized_slots().size(); ++i) {
SlotDescriptor* slot_desc = node->materialized_slots()[i];
if (slot_desc->type().type == TYPE_TIMESTAMP) {
- return Status("Timestamp not yet supported for codegen.");
+ return Status::Expected("Timestamp not yet supported for codegen.");
}
if (slot_desc->type().type == TYPE_DECIMAL) {
- return Status("Decimal not yet supported for codegen.");
+ return Status::Expected("Decimal not yet supported for codegen.");
}
}
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/54865c4e/be/src/exec/partitioned-aggregation-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/partitioned-aggregation-node.cc b/be/src/exec/partitioned-aggregation-node.cc
index 7849394..7067961 100644
--- a/be/src/exec/partitioned-aggregation-node.cc
+++ b/be/src/exec/partitioned-aggregation-node.cc
@@ -1743,14 +1743,14 @@ Status PartitionedAggregationNode::CodegenUpdateTuple(
for (const SlotDescriptor* slot_desc : intermediate_tuple_desc_->slots()) {
if (slot_desc->type().type == TYPE_CHAR) {
- return Status("PartitionedAggregationNode::CodegenUpdateTuple(): cannot codegen"
- "CHAR in aggregations");
+ return Status::Expected("PartitionedAggregationNode::CodegenUpdateTuple(): cannot "
+ "codegen CHAR in aggregations");
}
}
if (intermediate_tuple_desc_->GetLlvmStruct(codegen) == NULL) {
- return Status("PartitionedAggregationNode::CodegenUpdateTuple(): failed to generate "
- "intermediate tuple desc");
+ return Status::Expected("PartitionedAggregationNode::CodegenUpdateTuple(): failed to"
+ " generate intermediate tuple desc");
}
// Get the types to match the UpdateTuple signature
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/54865c4e/be/src/rpc/thrift-util.h
----------------------------------------------------------------------
diff --git a/be/src/rpc/thrift-util.h b/be/src/rpc/thrift-util.h
index 96a864c..1a66286 100644
--- a/be/src/rpc/thrift-util.h
+++ b/be/src/rpc/thrift-util.h
@@ -126,7 +126,7 @@ Status DeserializeThriftMsg(const uint8_t* buf, uint32_t* len, bool compact,
} catch (std::exception& e) {
std::stringstream msg;
msg << "couldn't deserialize thrift msg:\n" << e.what();
- return Status(msg.str());
+ return Status::Expected(msg.str());
} catch (...) {
/// TODO: Find the right exception for 0 bytes
return Status("Unknown exception");
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/54865c4e/be/src/service/impala-server.cc
----------------------------------------------------------------------
diff --git a/be/src/service/impala-server.cc b/be/src/service/impala-server.cc
index cf91ba3..a26f0ab 100644
--- a/be/src/service/impala-server.cc
+++ b/be/src/service/impala-server.cc
@@ -658,17 +658,29 @@ Status ImpalaServer::GetExecSummary(const TUniqueId& query_id, const string& use
}
// Look for the query in completed query log.
+ // IMPALA-5275: Don't create Status while holding query_log_lock_
{
- lock_guard<mutex> l(query_log_lock_);
- QueryLogIndex::const_iterator query_record = query_log_index_.find(query_id);
- if (query_record == query_log_index_.end()) {
+ string effective_user;
+ bool user_has_profile_access = false;
+ bool is_query_missing = false;
+ TExecSummary exec_summary;
+ {
+ lock_guard<mutex> l(query_log_lock_);
+ QueryLogIndex::const_iterator query_record = query_log_index_.find(query_id);
+ is_query_missing = query_record == query_log_index_.end();
+ if (!is_query_missing) {
+ effective_user = query_record->second->effective_user;
+ user_has_profile_access = query_record->second->user_has_profile_access;
+ exec_summary = query_record->second->exec_summary;
+ }
+ }
+ if (is_query_missing) {
stringstream ss;
ss << "Query id " << PrintId(query_id) << " not found.";
return Status(ss.str());
}
- RETURN_IF_ERROR(CheckProfileAccess(user, query_record->second->effective_user,
- query_record->second->user_has_profile_access));
- *result = query_record->second->exec_summary;
+ RETURN_IF_ERROR(CheckProfileAccess(user, effective_user, user_has_profile_access));
+ *result = exec_summary;
}
return Status::OK();
}
[3/3] incubator-impala git commit: IMPALA-5627: fix dropped statuses
in HDFS writers
Posted by he...@apache.org.
IMPALA-5627: fix dropped statuses in HDFS writers
The change is mostly mechanical: Status returns were added where
needed.
In one place I restructured the logic around
'current_encoding_' for Parquet to allow a cleaner solution
to the dropped status from the FinalizeCurrentPage() call in
ProcessValue(): after the restructuring, the call was no longer
needed. Previously, 'current_encoding_' was overloaded to represent both the
encoding of the current page and the preferred encoding
for subsequent pages; the latter is now tracked separately by
'next_page_encoding_'.
Testing:
Ran an exhaustive build.
Change-Id: I753d352c640faf5eaef650cd743e53de53761431
Reviewed-on: http://gerrit.cloudera.org:8080/7372
Reviewed-by: Tim Armstrong <ta...@cloudera.com>
Tested-by: Impala Public Jenkins
Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/daff8eb0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/daff8eb0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/daff8eb0
Branch: refs/heads/master
Commit: daff8eb0ca19aa612c9fc7cc2ddd647735b31266
Parents: 54865c4
Author: Tim Armstrong <ta...@cloudera.com>
Authored: Thu Jul 6 18:41:07 2017 -0700
Committer: Impala Public Jenkins <im...@gerrit.cloudera.org>
Committed: Fri Jul 21 02:51:51 2017 +0000
----------------------------------------------------------------------
be/src/exec/hdfs-avro-table-writer.cc | 2 +-
be/src/exec/hdfs-avro-table-writer.h | 20 +++---
be/src/exec/hdfs-parquet-table-writer.cc | 88 ++++++++++++++++----------
be/src/exec/hdfs-parquet-table-writer.h | 16 ++---
be/src/exec/hdfs-sequence-table-writer.cc | 2 +-
be/src/exec/hdfs-table-writer.h | 11 ++--
be/src/exec/parquet-column-stats.cc | 7 +-
be/src/exec/parquet-column-stats.h | 10 ++-
be/src/exec/parquet-column-stats.inline.h | 7 +-
be/src/runtime/string-buffer-test.cc | 6 +-
be/src/runtime/string-buffer.h | 6 +-
11 files changed, 101 insertions(+), 74 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/daff8eb0/be/src/exec/hdfs-avro-table-writer.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-avro-table-writer.cc b/be/src/exec/hdfs-avro-table-writer.cc
index 46185e8..3ce296d 100644
--- a/be/src/exec/hdfs-avro-table-writer.cc
+++ b/be/src/exec/hdfs-avro-table-writer.cc
@@ -196,7 +196,7 @@ Status HdfsAvroTableWriter::AppendRows(
}
}
- if (out_.Size() > DEFAULT_AVRO_BLOCK_SIZE) Flush();
+ if (out_.Size() > DEFAULT_AVRO_BLOCK_SIZE) RETURN_IF_ERROR(Flush());
*new_file = false;
return Status::OK();
}
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/daff8eb0/be/src/exec/hdfs-avro-table-writer.h
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-avro-table-writer.h b/be/src/exec/hdfs-avro-table-writer.h
index f85659e..6966860 100644
--- a/be/src/exec/hdfs-avro-table-writer.h
+++ b/be/src/exec/hdfs-avro-table-writer.h
@@ -68,17 +68,17 @@ class HdfsAvroTableWriter : public HdfsTableWriter {
virtual ~HdfsAvroTableWriter() { }
- virtual Status Init();
- virtual Status Finalize() { return Flush(); }
- virtual Status InitNewFile() { return WriteFileHeader(); }
- virtual void Close();
- virtual uint64_t default_block_size() const { return 0; }
- virtual std::string file_extension() const { return "avro"; }
+ virtual Status Init() override;
+ virtual Status Finalize() override { return Flush(); }
+ virtual Status InitNewFile() override { return WriteFileHeader(); }
+ virtual void Close() override;
+ virtual uint64_t default_block_size() const override { return 0; }
+ virtual std::string file_extension() const override { return "avro"; }
/// Outputs the given rows into an HDFS sequence file. The rows are buffered
/// to fill a sequence file block.
- virtual Status AppendRows(
- RowBatch* rows, const std::vector<int32_t>& row_group_indices, bool* new_file);
+ virtual Status AppendRows(RowBatch* rows,
+ const std::vector<int32_t>& row_group_indices, bool* new_file) override;
private:
/// Processes a single row, appending to out_
@@ -88,11 +88,11 @@ class HdfsAvroTableWriter : public HdfsTableWriter {
inline void AppendField(const ColumnType& type, const void* value);
/// Writes the Avro file header to HDFS
- Status WriteFileHeader();
+ Status WriteFileHeader() WARN_UNUSED_RESULT;
/// Writes the contents of out_ to HDFS as a single Avro file block.
/// Returns an error if write to HDFS fails.
- Status Flush();
+ Status Flush() WARN_UNUSED_RESULT;
/// Buffer which holds accumulated output
WriteStream out_;
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/daff8eb0/be/src/exec/hdfs-parquet-table-writer.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-parquet-table-writer.cc b/be/src/exec/hdfs-parquet-table-writer.cc
index 5a2d810..04a81f1 100644
--- a/be/src/exec/hdfs-parquet-table-writer.cc
+++ b/be/src/exec/hdfs-parquet-table-writer.cc
@@ -103,8 +103,6 @@ class HdfsParquetTableWriter::BaseColumnWriter {
values_buffer_len_(DEFAULT_DATA_PAGE_SIZE),
page_stats_base_(nullptr),
row_group_stats_base_(nullptr) {
- Codec::CreateCompressor(nullptr, false, codec, &compressor_);
-
def_levels_ = parent_->state_->obj_pool()->Add(
new RleEncoder(parent_->reusable_col_mem_pool_->Allocate(DEFAULT_DATA_PAGE_SIZE),
DEFAULT_DATA_PAGE_SIZE, 1));
@@ -113,13 +111,20 @@ class HdfsParquetTableWriter::BaseColumnWriter {
virtual ~BaseColumnWriter() {}
+ // Called after the constructor to initialize the column writer.
+ Status Init() WARN_UNUSED_RESULT {
+ Reset();
+ RETURN_IF_ERROR(Codec::CreateCompressor(nullptr, false, codec_, &compressor_));
+ return Status::OK();
+ }
+
// Appends the row to this column. This buffers the value into a data page. Returns
// error if the space needed for the encoded value is larger than the data page size.
// TODO: this needs to be batch based, instead of row based for better performance.
// This is a bit trickier to handle the case where only a partial row batch can be
// output to the current file because it reaches the max file size. Enabling codegen
// would also solve this problem.
- Status AppendRow(TupleRow* row);
+ Status AppendRow(TupleRow* row) WARN_UNUSED_RESULT;
// Flushes all buffered data pages to the file.
// *file_pos is an output parameter and will be incremented by
@@ -128,13 +133,14 @@ class HdfsParquetTableWriter::BaseColumnWriter {
// will contain the byte offset for the data page and dictionary page. They
// will be set to -1 if the column does not contain that type of page.
Status Flush(int64_t* file_pos, int64_t* first_data_page,
- int64_t* first_dictionary_page);
+ int64_t* first_dictionary_page) WARN_UNUSED_RESULT;
// Materializes the column statistics to the per-file MemPool so they are available
// after their row batch buffer has been freed.
- void MaterializeStatsValues() {
- row_group_stats_base_->MaterializeStringValuesToInternalBuffers();
- page_stats_base_->MaterializeStringValuesToInternalBuffers();
+ Status MaterializeStatsValues() WARN_UNUSED_RESULT {
+ RETURN_IF_ERROR(row_group_stats_base_->MaterializeStringValuesToInternalBuffers());
+ RETURN_IF_ERROR(page_stats_base_->MaterializeStringValuesToInternalBuffers());
+ return Status::OK();
}
// Encodes the row group statistics into a parquet::Statistics object and attaches it to
@@ -157,6 +163,7 @@ class HdfsParquetTableWriter::BaseColumnWriter {
num_values_ = 0;
total_compressed_byte_size_ = 0;
current_encoding_ = Encoding::PLAIN;
+ next_page_encoding_ = Encoding::PLAIN;
column_encodings_.clear();
dict_encoding_stats_.clear();
data_encoding_stats_.clear();
@@ -184,16 +191,18 @@ class HdfsParquetTableWriter::BaseColumnWriter {
friend class HdfsParquetTableWriter;
// Encodes value into the current page output buffer and updates the column statistics
- // aggregates. Returns true if the value fits on the current page. If this function
- // returned false, the caller should create a new page and try again with the same
- // value.
+ // aggregates. Returns true if the value was appended successfully to the current page.
+ // Returns false if the value was not appended to the current page and the caller can
+ // create a new page and try again with the same value. May change
+ // 'next_page_encoding_' if the encoding for the next page should be different - e.g.
+ // if a dictionary overflowed and dictionary encoding is no longer viable.
// *bytes_needed will contain the (estimated) number of bytes needed to successfully
// encode the value in the page.
// Implemented in the subclass.
- virtual bool ProcessValue(void* value, int64_t* bytes_needed) = 0;
+ virtual bool ProcessValue(void* value, int64_t* bytes_needed) WARN_UNUSED_RESULT = 0;
// Encodes out all data for the current page and updates the metadata.
- virtual void FinalizeCurrentPage();
+ virtual Status FinalizeCurrentPage() WARN_UNUSED_RESULT;
// Update current_page_ to a new page, reusing pages allocated if possible.
void NewPage();
@@ -246,10 +255,16 @@ class HdfsParquetTableWriter::BaseColumnWriter {
// Pointer to the current page in 'pages_'. Not owned.
DataPage* current_page_;
- int64_t num_values_; // Total number of values across all pages, including nullptr.
+ // Total number of values across all pages, including NULL.
+ int64_t num_values_;
int64_t total_compressed_byte_size_;
int64_t total_uncompressed_byte_size_;
+ // Encoding of the current page.
Encoding::type current_encoding_;
+ // Encoding to use for the next page. By default, the same as 'current_encoding_'.
+ // Used by the column writer to switch encoding while writing a column, e.g. if the
+ // dictionary overflows.
+ Encoding::type next_page_encoding_;
// Set of all encodings used in the column chunk
unordered_set<Encoding::type> column_encodings_;
@@ -299,6 +314,7 @@ class HdfsParquetTableWriter::ColumnWriter :
// Default to dictionary encoding. If the cardinality ends up being too high,
// it will fall back to plain.
current_encoding_ = Encoding::PLAIN_DICTIONARY;
+ next_page_encoding_ = Encoding::PLAIN_DICTIONARY;
dict_encoder_.reset(
new DictEncoder<T>(parent_->per_file_mem_pool_.get(), plain_encoded_value_size_));
dict_encoder_base_ = dict_encoder_.get();
@@ -321,10 +337,9 @@ class HdfsParquetTableWriter::ColumnWriter :
++num_values_since_dict_size_check_;
*bytes_needed = dict_encoder_->Put(*CastValue(value));
// If the dictionary contains the maximum number of values, switch to plain
- // encoding. The current dictionary encoded page is written out.
+ // encoding for the next page. The current page is full and must be written out.
if (UNLIKELY(*bytes_needed < 0)) {
- FinalizeCurrentPage();
- current_encoding_ = Encoding::PLAIN;
+ next_page_encoding_ = Encoding::PLAIN;
return false;
}
parent_->file_size_estimate_ += *bytes_needed;
@@ -423,15 +438,16 @@ class HdfsParquetTableWriter::BoolColumnWriter :
return true;
}
- virtual void FinalizeCurrentPage() {
+ virtual Status FinalizeCurrentPage() {
DCHECK(current_page_ != nullptr);
- if (current_page_->finalized) return;
+ if (current_page_->finalized) return Status::OK();
bool_values_->Flush();
int num_bytes = bool_values_->bytes_written();
current_page_->header.uncompressed_page_size += num_bytes;
// Call into superclass to handle the rest.
- BaseColumnWriter::FinalizeCurrentPage();
+ RETURN_IF_ERROR(BaseColumnWriter::FinalizeCurrentPage());
bool_values_->Clear();
+ return Status::OK();
}
private:
@@ -455,7 +471,7 @@ inline Status HdfsParquetTableWriter::BaseColumnWriter::AppendRow(TupleRow* row)
// Ensure that we have enough space for the definition level, but don't write it yet in
// case we don't have enough space for the value.
if (def_levels_->buffer_full()) {
- FinalizeCurrentPage();
+ RETURN_IF_ERROR(FinalizeCurrentPage());
NewPage();
}
@@ -475,11 +491,11 @@ inline Status HdfsParquetTableWriter::BaseColumnWriter::AppendRow(TupleRow* row)
int64_t bytes_needed = 0;
if (ProcessValue(value, &bytes_needed)) {
++current_page_->num_non_null;
- break;
+ break; // Succesfully appended, don't need to retry.
}
// Value didn't fit on page, try again on a new page.
- FinalizeCurrentPage();
+ RETURN_IF_ERROR(FinalizeCurrentPage());
// Check how much space is needed to write this value. If that is larger than the
// page size then increase page size and try again.
@@ -534,7 +550,7 @@ Status HdfsParquetTableWriter::BaseColumnWriter::Flush(int64_t* file_pos,
return Status::OK();
}
- FinalizeCurrentPage();
+ RETURN_IF_ERROR(FinalizeCurrentPage());
*first_dictionary_page = -1;
// First write the dictionary page before any of the data pages.
@@ -563,8 +579,8 @@ Status HdfsParquetTableWriter::BaseColumnWriter::Flush(int64_t* file_pos,
uint8_t* compressed_data =
parent_->per_file_mem_pool_->Allocate(max_compressed_size);
header.compressed_page_size = max_compressed_size;
- compressor_->ProcessBlock32(true, header.uncompressed_page_size, dict_buffer,
- &header.compressed_page_size, &compressed_data);
+ RETURN_IF_ERROR(compressor_->ProcessBlock32(true, header.uncompressed_page_size,
+ dict_buffer, &header.compressed_page_size, &compressed_data));
dict_buffer = compressed_data;
// We allocated the output based on the guessed size, return the extra allocated
// bytes back to the mem pool.
@@ -614,11 +630,11 @@ Status HdfsParquetTableWriter::BaseColumnWriter::Flush(int64_t* file_pos,
return Status::OK();
}
-void HdfsParquetTableWriter::BaseColumnWriter::FinalizeCurrentPage() {
+Status HdfsParquetTableWriter::BaseColumnWriter::FinalizeCurrentPage() {
DCHECK(current_page_ != nullptr);
- if (current_page_->finalized) return;
+ if (current_page_->finalized) return Status::OK();
- // If the entire page was nullptr, encode it as PLAIN since there is no
+ // If the entire page was NULL, encode it as PLAIN since there is no
// data anyway. We don't output a useless dictionary page and it works
// around a parquet MR bug (see IMPALA-759 for more details).
if (current_page_->num_non_null == 0) current_encoding_ = Encoding::PLAIN;
@@ -670,8 +686,8 @@ void HdfsParquetTableWriter::BaseColumnWriter::FinalizeCurrentPage() {
DCHECK_GT(max_compressed_size, 0);
uint8_t* compressed_data = parent_->per_file_mem_pool_->Allocate(max_compressed_size);
header.compressed_page_size = max_compressed_size;
- compressor_->ProcessBlock32(true, header.uncompressed_page_size, uncompressed_data,
- &header.compressed_page_size, &compressed_data);
+ RETURN_IF_ERROR(compressor_->ProcessBlock32(true, header.uncompressed_page_size,
+ uncompressed_data, &header.compressed_page_size, &compressed_data));
current_page_->data = compressed_data;
// We allocated the output based on the guessed size, return the extra allocated
@@ -694,14 +710,15 @@ void HdfsParquetTableWriter::BaseColumnWriter::FinalizeCurrentPage() {
// Add the size of the data page header
uint8_t* header_buffer;
uint32_t header_len = 0;
- parent_->thrift_serializer_->Serialize(
- &current_page_->header, &header_len, &header_buffer);
+ RETURN_IF_ERROR(parent_->thrift_serializer_->Serialize(
+ &current_page_->header, &header_len, &header_buffer));
current_page_->finalized = true;
total_compressed_byte_size_ += header_len + header.compressed_page_size;
total_uncompressed_byte_size_ += header_len + header.uncompressed_page_size;
parent_->file_size_estimate_ += header_len + header.compressed_page_size;
def_levels_->Clear();
+ return Status::OK();
}
void HdfsParquetTableWriter::BaseColumnWriter::NewPage() {
@@ -724,6 +741,7 @@ void HdfsParquetTableWriter::BaseColumnWriter::NewPage() {
header.repetition_level_encoding = Encoding::BIT_PACKED;
current_page_->header.__set_data_page_header(header);
}
+ current_encoding_ = next_page_encoding_;
current_page_->finalized = false;
current_page_->num_non_null = 0;
page_stats_base_->Reset();
@@ -828,7 +846,7 @@ Status HdfsParquetTableWriter::Init() {
DCHECK(false);
}
columns_[i].reset(writer);
- columns_[i]->Reset();
+ RETURN_IF_ERROR(columns_[i]->Init());
}
RETURN_IF_ERROR(CreateSchema());
return Status::OK();
@@ -989,7 +1007,9 @@ Status HdfsParquetTableWriter::AppendRows(
}
// We exhausted the batch, so we materialize the statistics before releasing the memory.
- for (unique_ptr<BaseColumnWriter>& column : columns_) column->MaterializeStatsValues();
+ for (unique_ptr<BaseColumnWriter>& column : columns_) {
+ RETURN_IF_ERROR(column->MaterializeStatsValues());
+ }
// Reset the row_idx_ when we exhaust the batch. We can exit before exhausting
// the batch if we run out of file space and will continue from the last index.
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/daff8eb0/be/src/exec/hdfs-parquet-table-writer.h
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-parquet-table-writer.h b/be/src/exec/hdfs-parquet-table-writer.h
index b3d319e..1334b19 100644
--- a/be/src/exec/hdfs-parquet-table-writer.h
+++ b/be/src/exec/hdfs-parquet-table-writer.h
@@ -58,25 +58,25 @@ class HdfsParquetTableWriter : public HdfsTableWriter {
~HdfsParquetTableWriter();
/// Initialize column information.
- virtual Status Init();
+ virtual Status Init() override;
/// Initializes a new file. This resets the file metadata object and writes
/// the file header to the output file.
- virtual Status InitNewFile();
+ virtual Status InitNewFile() override;
/// Appends parquet representation of rows in the batch to the current file.
- virtual Status AppendRows(
- RowBatch* batch, const std::vector<int32_t>& row_group_indices, bool* new_file);
+ virtual Status AppendRows(RowBatch* batch,
+ const std::vector<int32_t>& row_group_indices, bool* new_file) override;
/// Write out all the data.
- virtual Status Finalize();
+ virtual Status Finalize() override;
- virtual void Close();
+ virtual void Close() override;
/// Returns the target HDFS block size to use.
- virtual uint64_t default_block_size() const;
+ virtual uint64_t default_block_size() const override;
- virtual std::string file_extension() const { return "parq"; }
+ virtual std::string file_extension() const override { return "parq"; }
private:
/// Default data page size. In bytes.
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/daff8eb0/be/src/exec/hdfs-sequence-table-writer.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-sequence-table-writer.cc b/be/src/exec/hdfs-sequence-table-writer.cc
index 4a66c5e..42a70f0 100644
--- a/be/src/exec/hdfs-sequence-table-writer.cc
+++ b/be/src/exec/hdfs-sequence-table-writer.cc
@@ -125,7 +125,7 @@ Status HdfsSequenceTableWriter::AppendRows(
out_.WriteBytes(neg1_sync_marker_.size(), neg1_sync_marker_.data());
}
- if (out_.Size() >= approx_block_size_) Flush();
+ if (out_.Size() >= approx_block_size_) RETURN_IF_ERROR(Flush());
*new_file = false;
return Status::OK();
}
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/daff8eb0/be/src/exec/hdfs-table-writer.h
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-table-writer.h b/be/src/exec/hdfs-table-writer.h
index cc08b00..cc6c6cc 100644
--- a/be/src/exec/hdfs-table-writer.h
+++ b/be/src/exec/hdfs-table-writer.h
@@ -64,10 +64,10 @@ class HdfsTableWriter {
/// text), 1) is called once and 2-4) is called repeatedly for each file.
/// Do initialization of writer.
- virtual Status Init() = 0;
+ virtual Status Init() WARN_UNUSED_RESULT = 0;
/// Called when a new file is started.
- virtual Status InitNewFile() = 0;
+ virtual Status InitNewFile() WARN_UNUSED_RESULT = 0;
/// Appends rows of 'batch' to the partition that are selected via 'row_group_indices',
/// and if the latter is empty, appends every row.
@@ -75,13 +75,14 @@ class HdfsTableWriter {
/// *new_file == true. A new file will be opened and the same row batch will be passed
/// again. The writer must track how much of the batch it had already processed asking
/// for a new file. Otherwise the writer will return with *newfile == false.
- virtual Status AppendRows(
- RowBatch* batch, const std::vector<int32_t>& row_group_indices, bool* new_file) = 0;
+ virtual Status AppendRows(RowBatch* batch,
+ const std::vector<int32_t>& row_group_indices,
+ bool* new_file) WARN_UNUSED_RESULT = 0;
/// Finalize this partition. The writer needs to finish processing
/// all data have written out after the return from this call.
/// This is called once for each call to InitNewFile()
- virtual Status Finalize() = 0;
+ virtual Status Finalize() WARN_UNUSED_RESULT = 0;
/// Called once when this writer should cleanup any resources.
virtual void Close() = 0;
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/daff8eb0/be/src/exec/parquet-column-stats.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/parquet-column-stats.cc b/be/src/exec/parquet-column-stats.cc
index bcd6fa4..76b3365 100644
--- a/be/src/exec/parquet-column-stats.cc
+++ b/be/src/exec/parquet-column-stats.cc
@@ -117,11 +117,12 @@ bool ColumnStatsBase::ReadFromThrift(const parquet::ColumnChunk& col_chunk,
return false;
}
-void ColumnStatsBase::CopyToBuffer(StringBuffer* buffer, StringValue* value) {
- if (value->ptr == buffer->buffer()) return;
+Status ColumnStatsBase::CopyToBuffer(StringBuffer* buffer, StringValue* value) {
+ if (value->ptr == buffer->buffer()) return Status::OK();
buffer->Clear();
- buffer->Append(value->ptr, value->len);
+ RETURN_IF_ERROR(buffer->Append(value->ptr, value->len));
value->ptr = buffer->buffer();
+ return Status::OK();
}
bool ColumnStatsBase::CanUseStats(
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/daff8eb0/be/src/exec/parquet-column-stats.h
----------------------------------------------------------------------
diff --git a/be/src/exec/parquet-column-stats.h b/be/src/exec/parquet-column-stats.h
index 11a01f5..7278cdc 100644
--- a/be/src/exec/parquet-column-stats.h
+++ b/be/src/exec/parquet-column-stats.h
@@ -81,7 +81,9 @@ class ColumnStatsBase {
/// data types (e.g. StringValue) need to be copied at the end of processing a row
/// batch, since the batch memory will be released. Overwrite this method in derived
/// classes to provide the functionality.
- virtual void MaterializeStringValuesToInternalBuffers() {}
+ virtual Status MaterializeStringValuesToInternalBuffers() WARN_UNUSED_RESULT {
+ return Status::OK();
+ }
/// Returns the number of bytes needed to encode the current statistics into a
/// parquet::Statistics object.
@@ -100,7 +102,7 @@ class ColumnStatsBase {
protected:
// Copies the memory of 'value' into 'buffer' and make 'value' point to 'buffer'.
// 'buffer' is reset before making the copy.
- static void CopyToBuffer(StringBuffer* buffer, StringValue* value);
+ static Status CopyToBuffer(StringBuffer* buffer, StringValue* value) WARN_UNUSED_RESULT;
/// Stores whether the min and max values of the current object have been initialized.
bool has_min_max_values_;
@@ -163,7 +165,9 @@ class ColumnStats : public ColumnStatsBase {
void Update(const T& v) { Update(v, v); }
virtual void Merge(const ColumnStatsBase& other) override;
- virtual void MaterializeStringValuesToInternalBuffers() override {}
+ virtual Status MaterializeStringValuesToInternalBuffers() override {
+ return Status::OK();
+ }
virtual int64_t BytesNeeded() const override;
virtual void EncodeToThrift(parquet::Statistics* out) const override;
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/daff8eb0/be/src/exec/parquet-column-stats.inline.h
----------------------------------------------------------------------
diff --git a/be/src/exec/parquet-column-stats.inline.h b/be/src/exec/parquet-column-stats.inline.h
index b112db3..9b81ba8 100644
--- a/be/src/exec/parquet-column-stats.inline.h
+++ b/be/src/exec/parquet-column-stats.inline.h
@@ -170,9 +170,10 @@ inline void ColumnStats<StringValue>::Update(
// StringValues need to be copied at the end of processing a row batch, since the batch
// memory will be released.
template <>
-inline void ColumnStats<StringValue>::MaterializeStringValuesToInternalBuffers() {
- if (min_buffer_.IsEmpty()) CopyToBuffer(&min_buffer_, &min_value_);
- if (max_buffer_.IsEmpty()) CopyToBuffer(&max_buffer_, &max_value_);
+inline Status ColumnStats<StringValue>::MaterializeStringValuesToInternalBuffers() {
+ if (min_buffer_.IsEmpty()) RETURN_IF_ERROR(CopyToBuffer(&min_buffer_, &min_value_));
+ if (max_buffer_.IsEmpty()) RETURN_IF_ERROR(CopyToBuffer(&max_buffer_, &max_value_));
+ return Status::OK();
}
} // end ns impala
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/daff8eb0/be/src/runtime/string-buffer-test.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/string-buffer-test.cc b/be/src/runtime/string-buffer-test.cc
index c370728..27d1021 100644
--- a/be/src/runtime/string-buffer-test.cc
+++ b/be/src/runtime/string-buffer-test.cc
@@ -50,12 +50,12 @@ TEST(StringBufferTest, Basic) {
// Append to empty
std_str.append("Hello");
- str.Append("Hello", strlen("Hello"));
+ ASSERT_OK(str.Append("Hello", strlen("Hello")));
ValidateString(std_str, str);
// Append some more
std_str.append("World");
- str.Append("World", strlen("World"));
+ ASSERT_OK(str.Append("World", strlen("World")));
ValidateString(std_str, str);
// Clear
@@ -81,7 +81,7 @@ TEST(StringBufferTest, AppendBoundary) {
std_str.resize(chunk_size, 'a');
int64_t data_size = 0;
while (data_size + chunk_size <= max_data_size) {
- str.Append(std_str.c_str(), chunk_size);
+ ASSERT_OK(str.Append(std_str.c_str(), chunk_size));
data_size += chunk_size;
}
EXPECT_EQ(str.buffer_size(), data_size);
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/daff8eb0/be/src/runtime/string-buffer.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/string-buffer.h b/be/src/runtime/string-buffer.h
index 5682bc7..2188e4d 100644
--- a/be/src/runtime/string-buffer.h
+++ b/be/src/runtime/string-buffer.h
@@ -48,7 +48,7 @@ class StringBuffer {
/// Append 'str' to the current string, allocating a new buffer as necessary.
/// Return error status if memory limit is exceeded.
- Status Append(const char* str, int64_t str_len) {
+ Status Append(const char* str, int64_t str_len) WARN_UNUSED_RESULT {
int64_t new_len = len_ + str_len;
if (UNLIKELY(new_len > buffer_size_)) RETURN_IF_ERROR(GrowBuffer(new_len));
memcpy(buffer_ + len_, str, str_len);
@@ -57,7 +57,7 @@ class StringBuffer {
}
/// Wrapper around append() for input type 'uint8_t'.
- Status Append(const uint8_t* str, int64_t str_len) {
+ Status Append(const uint8_t* str, int64_t str_len) WARN_UNUSED_RESULT {
return Append(reinterpret_cast<const char*>(str), str_len);
}
@@ -78,7 +78,7 @@ class StringBuffer {
/// Grows the buffer to be at least 'new_size', copying over the previous data
/// into the new buffer. The old buffer is not freed. Return an error status if
/// growing the buffer will exceed memory limit.
- Status GrowBuffer(int64_t new_size) {
+ Status GrowBuffer(int64_t new_size) WARN_UNUSED_RESULT {
if (LIKELY(new_size > buffer_size_)) {
int64_t old_size = buffer_size_;
buffer_size_ = std::max<int64_t>(buffer_size_ * 2, new_size);