You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by ya...@apache.org on 2021/08/18 01:06:10 UTC
[incubator-doris] branch master updated: Fixed the problem that
there may be redundant retries when the query result export fails (#6436)
This is an automated email from the ASF dual-hosted git repository.
yangzhg pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git
The following commit(s) were added to refs/heads/master by this push:
new 0c5c3f7 Fixed the problem that there may be redundant retries when the query result export fails (#6436)
0c5c3f7 is described below
commit 0c5c3f7d87b9028b65fb3c79355e41226644e19c
Author: Zhengguo Yang <ya...@gmail.com>
AuthorDate: Wed Aug 18 09:06:02 2021 +0800
Fixed the problem that there may be redundant retries when the query result export fails (#6436)
---
be/src/common/status.h | 6 +++---
be/src/runtime/file_result_writer.cpp | 35 +++++++++--------------------------
2 files changed, 12 insertions(+), 29 deletions(-)
diff --git a/be/src/common/status.h b/be/src/common/status.h
index 50cffd1..89bd6fe 100644
--- a/be/src/common/status.h
+++ b/be/src/common/status.h
@@ -286,7 +286,7 @@ private:
if (UNLIKELY(!_s.ok())) { \
LOG(WARNING) << (warning_prefix) << ": " << _s.to_string(); \
} \
- } while (0);
+ } while (false);
#define RETURN_WITH_WARN_IF_ERROR(stmt, ret_code, warning_prefix) \
do { \
@@ -295,7 +295,7 @@ private:
LOG(WARNING) << (warning_prefix) << ", error: " << _s.to_string(); \
return ret_code; \
} \
- } while (0);
+ } while (false);
#define RETURN_NOT_OK_STATUS_WITH_WARN(stmt, warning_prefix) \
do { \
@@ -304,7 +304,7 @@ private:
LOG(WARNING) << (warning_prefix) << ", error: " << _s.to_string(); \
return _s; \
} \
- } while (0);
+ } while (false);
} // namespace doris
#define WARN_UNUSED_RESULT __attribute__((warn_unused_result))
diff --git a/be/src/runtime/file_result_writer.cpp b/be/src/runtime/file_result_writer.cpp
index 9377075..80bbc81 100644
--- a/be/src/runtime/file_result_writer.cpp
+++ b/be/src/runtime/file_result_writer.cpp
@@ -54,9 +54,7 @@ FileResultWriter::~FileResultWriter() {
Status FileResultWriter::init(RuntimeState* state) {
_state = state;
_init_profile();
-
- RETURN_IF_ERROR(_create_next_file_writer());
- return Status::OK();
+ return _create_next_file_writer();
}
void FileResultWriter::_init_profile() {
@@ -73,8 +71,7 @@ Status FileResultWriter::_create_success_file() {
std::string file_name;
RETURN_IF_ERROR(_get_success_file_name(&file_name));
RETURN_IF_ERROR(_create_file_writer(file_name));
- RETURN_IF_ERROR(_close_file_writer(true, true));
- return Status::OK();
+ return _close_file_writer(true, true);
}
Status FileResultWriter::_get_success_file_name(std::string* file_name) {
@@ -179,8 +176,7 @@ Status FileResultWriter::append_row_batch(const RowBatch* batch) {
Status FileResultWriter::_write_parquet_file(const RowBatch& batch) {
RETURN_IF_ERROR(_parquet_writer->write(batch));
// split file if exceed limit
- RETURN_IF_ERROR(_create_new_file_if_exceed_size());
- return Status::OK();
+ return _create_new_file_if_exceed_size();
}
Status FileResultWriter::_write_csv_file(const RowBatch& batch) {
@@ -189,8 +185,7 @@ Status FileResultWriter::_write_csv_file(const RowBatch& batch) {
TupleRow* row = batch.get_row(i);
RETURN_IF_ERROR(_write_one_row_as_csv(row));
}
- _flush_plain_text_outstream(true);
- return Status::OK();
+ return _flush_plain_text_outstream(true);
}
// actually, this logic is same as `ExportSink::gen_row_buffer`
@@ -319,9 +314,7 @@ Status FileResultWriter::_flush_plain_text_outstream(bool eos) {
_plain_text_outstream.clear();
// split file if exceed limit
- RETURN_IF_ERROR(_create_new_file_if_exceed_size());
-
- return Status::OK();
+ return _create_new_file_if_exceed_size();
}
Status FileResultWriter::_create_new_file_if_exceed_size() {
@@ -388,20 +381,11 @@ Status FileResultWriter::_send_result() {
std::string localhost = BackendOptions::get_localhost();
row_buffer.push_string(localhost.c_str(), localhost.length()); // url
- TFetchDataResult* result = new (std::nothrow) TFetchDataResult();
+ std::unique_ptr<TFetchDataResult> result = std::make_unique<TFetchDataResult>();
result->result_batch.rows.resize(1);
result->result_batch.rows[0].assign(row_buffer.buf(), row_buffer.length());
-
- Status st = _sinker->add_batch(result);
- if (st.ok()) {
- result = nullptr;
- } else {
- LOG(WARNING) << "failed to send outfile result: " << st.get_error_msg();
- }
-
- delete result;
- result = nullptr;
- return st;
+ RETURN_NOT_OK_STATUS_WITH_WARN(_sinker->add_batch(result.get()), "failed to send outfile result");
+ return Status::OK();
}
Status FileResultWriter::close() {
@@ -412,8 +396,7 @@ Status FileResultWriter::close() {
// so does the profile in RuntimeState.
COUNTER_SET(_written_rows_counter, _written_rows);
SCOPED_TIMER(_writer_close_timer);
- RETURN_IF_ERROR(_close_file_writer(true));
- return Status::OK();
+ return _close_file_writer(true);
}
} // namespace doris
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org