You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by mo...@apache.org on 2021/09/22 04:33:20 UTC
[incubator-doris] branch branch-0.15 updated: [Bug] Fix some memory
bugs (#6699)
This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch branch-0.15
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git
The following commit(s) were added to refs/heads/branch-0.15 by this push:
new 9c0c129 [Bug] Fix some memory bugs (#6699)
9c0c129 is described below
commit 9c0c129eed4f607d684f5e66651f3e045bbfaa41
Author: Mingyu Chen <mo...@gmail.com>
AuthorDate: Wed Sep 22 12:30:14 2021 +0800
[Bug] Fix some memory bugs (#6699)
1. Fix a memory leak in `collect_iterator.cpp` (Fix #6700)
2. Add a new BE config `max_segment_num_per_rowset` to limit the number of segments in a new rowset. (Fix #6701)
3. Make the error msg of stream load more friendly.
---
be/src/common/config.h | 7 +++++++
be/src/exec/tablet_sink.cpp | 7 ++++---
be/src/http/action/mini_load.cpp | 4 ++--
be/src/http/action/stream_load.cpp | 10 +++++-----
be/src/olap/collect_iterator.cpp | 11 +++++++++++
be/src/olap/collect_iterator.h | 2 ++
be/src/olap/delta_writer.cpp | 3 +++
be/src/olap/delta_writer.h | 3 +++
be/src/olap/olap_define.h | 1 +
be/src/runtime/fragment_mgr.cpp | 4 ++--
be/src/runtime/message_body_sink.cpp | 2 +-
be/src/runtime/message_body_sink.h | 4 ++--
be/src/runtime/routine_load/data_consumer_group.cpp | 2 +-
be/src/runtime/routine_load/routine_load_task_executor.cpp | 2 +-
be/src/runtime/stream_load/stream_load_context.h | 1 +
be/src/runtime/stream_load/stream_load_executor.cpp | 2 +-
be/src/runtime/stream_load/stream_load_pipe.h | 12 +++++++-----
be/src/service/internal_service.cpp | 2 +-
be/test/runtime/stream_load_pipe_test.cpp | 2 +-
docs/en/administrator-guide/config/be_config.md | 8 +++++++-
docs/zh-CN/administrator-guide/config/be_config.md | 6 ++++++
.../org/apache/doris/load/update/UpdateStmtExecutor.java | 4 ++--
22 files changed, 71 insertions(+), 28 deletions(-)
diff --git a/be/src/common/config.h b/be/src/common/config.h
index a7d1847..ef53f70 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -633,6 +633,13 @@ CONF_Int32(send_batch_thread_pool_thread_num, "64");
// number of send batch thread pool queue size
CONF_Int32(send_batch_thread_pool_queue_size, "102400");
+// Limit the number of segments of a newly created rowset.
+// The newly created rowset may be compacted after loading,
+// so if there are too many segments in a rowset, the compaction process
+// will run out of memory.
+// When doing compaction, each segment may take at least a 1MB buffer.
+CONF_mInt32(max_segment_num_per_rowset, "100");
+
} // namespace config
} // namespace doris
diff --git a/be/src/exec/tablet_sink.cpp b/be/src/exec/tablet_sink.cpp
index 9c1e31c..d282058 100644
--- a/be/src/exec/tablet_sink.cpp
+++ b/be/src/exec/tablet_sink.cpp
@@ -481,9 +481,10 @@ Status IndexChannel::add_row(Tuple* tuple, int64_t tablet_id) {
}
if (has_intolerable_failure()) {
- std::stringstream ss;
- ss << "index channel has intolerable failure. " << BackendOptions::get_localhost();
- return Status::InternalError(ss.str());
+ std::stringstream ss2;
+ ss2 << "index channel has intolerable failure. " << BackendOptions::get_localhost()
+ << ", err: " << ss.str();
+ return Status::InternalError(ss2.str());
}
return Status::OK();
diff --git a/be/src/http/action/mini_load.cpp b/be/src/http/action/mini_load.cpp
index b692e84..f7ef79a 100644
--- a/be/src/http/action/mini_load.cpp
+++ b/be/src/http/action/mini_load.cpp
@@ -626,7 +626,7 @@ void MiniLoadAction::free_handler_ctx(void* param) {
if (streaming_ctx->body_sink != nullptr) {
LOG(WARNING) << "cancel stream load " << streaming_ctx->id.to_string()
<< " because sender failed";
- streaming_ctx->body_sink->cancel();
+ streaming_ctx->body_sink->cancel("sender failed");
}
if (streaming_ctx->unref()) {
delete streaming_ctx;
@@ -935,7 +935,7 @@ void MiniLoadAction::_new_handle(HttpRequest* req) {
ctx->need_rollback = false;
}
if (ctx->body_sink.get() != nullptr) {
- ctx->body_sink->cancel();
+ ctx->body_sink->cancel(ctx->status.get_error_msg());
}
}
diff --git a/be/src/http/action/stream_load.cpp b/be/src/http/action/stream_load.cpp
index 2177d12..369ce58 100644
--- a/be/src/http/action/stream_load.cpp
+++ b/be/src/http/action/stream_load.cpp
@@ -153,7 +153,7 @@ void StreamLoadAction::handle(HttpRequest* req) {
ctx->need_rollback = false;
}
if (ctx->body_sink.get() != nullptr) {
- ctx->body_sink->cancel();
+ ctx->body_sink->cancel(ctx->status.get_error_msg());
}
}
@@ -229,7 +229,7 @@ int StreamLoadAction::on_header(HttpRequest* req) {
ctx->need_rollback = false;
}
if (ctx->body_sink.get() != nullptr) {
- ctx->body_sink->cancel();
+ ctx->body_sink->cancel(st.get_error_msg());
}
auto str = ctx->to_json();
// add new line at end
@@ -336,7 +336,7 @@ void StreamLoadAction::on_chunk_data(HttpRequest* req) {
auto st = ctx->body_sink->append(bb);
if (!st.ok()) {
LOG(WARNING) << "append body content failed. errmsg=" << st.get_error_msg()
- << ctx->brief();
+ << ", " << ctx->brief();
ctx->status = st;
return;
}
@@ -350,9 +350,9 @@ void StreamLoadAction::free_handler_ctx(void* param) {
if (ctx == nullptr) {
return;
}
- // sender is going, make receiver know it
+ // sender is gone, make receiver know it
if (ctx->body_sink != nullptr) {
- ctx->body_sink->cancel();
+ ctx->body_sink->cancel("sender is gone");
}
if (ctx->unref()) {
delete ctx;
diff --git a/be/src/olap/collect_iterator.cpp b/be/src/olap/collect_iterator.cpp
index 5fce312..9e4b799 100644
--- a/be/src/olap/collect_iterator.cpp
+++ b/be/src/olap/collect_iterator.cpp
@@ -239,6 +239,17 @@ CollectIterator::Level1Iterator::~Level1Iterator() {
child = nullptr;
}
}
+
+ if (_heap) {
+ while (!_heap->empty()) {
+ LevelIterator* it = _heap->top();
+ if (it != nullptr) {
+ delete it;
+ it = nullptr;
+ }
+ _heap->pop();
+ }
+ }
}
// Read next row into *row.
diff --git a/be/src/olap/collect_iterator.h b/be/src/olap/collect_iterator.h
index 92f3748..269f754 100644
--- a/be/src/olap/collect_iterator.h
+++ b/be/src/olap/collect_iterator.h
@@ -149,6 +149,8 @@ private:
bool _merge = true;
bool _reverse = false;
// used when `_merge == true`
+ // Any iterators still in the heap must be deleted when this Level1Iterator is destroyed.
+ // Each child LevelIterator should be either in _heap or in _children.
std::unique_ptr<MergeHeap> _heap;
// used when `_merge == false`
int _child_idx = 0;
diff --git a/be/src/olap/delta_writer.cpp b/be/src/olap/delta_writer.cpp
index defd345..a5cb6de 100644
--- a/be/src/olap/delta_writer.cpp
+++ b/be/src/olap/delta_writer.cpp
@@ -169,6 +169,9 @@ OLAPStatus DeltaWriter::write(Tuple* tuple) {
// if memtable is full, push it to the flush executor,
// and create a new memtable for incoming data
if (_mem_table->memory_usage() >= config::write_buffer_size) {
+ if (++_segment_counter > config::max_segment_num_per_rowset) {
+ return OLAP_ERR_TOO_MANY_SEGMENTS;
+ }
RETURN_NOT_OK(_flush_memtable_async());
// create a new memtable for new incoming data
_reset_mem_table();
diff --git a/be/src/olap/delta_writer.h b/be/src/olap/delta_writer.h
index 01c1b0d..728e0b2 100644
--- a/be/src/olap/delta_writer.h
+++ b/be/src/olap/delta_writer.h
@@ -115,6 +115,9 @@ private:
std::shared_ptr<MemTracker> _parent_mem_tracker;
std::shared_ptr<MemTracker> _mem_tracker;
+ // Counter of the number of segments flushed so far.
+ int64_t _segment_counter = 0;
+
SpinLock _lock;
};
diff --git a/be/src/olap/olap_define.h b/be/src/olap/olap_define.h
index f140f6b..fd3a794 100644
--- a/be/src/olap/olap_define.h
+++ b/be/src/olap/olap_define.h
@@ -179,6 +179,7 @@ enum OLAPStatus {
OLAP_ERR_TOO_MANY_VERSION = -235,
OLAP_ERR_NOT_INITIALIZED = -236,
OLAP_ERR_ALREADY_CANCELLED = -237,
+ OLAP_ERR_TOO_MANY_SEGMENTS = -238,
// CommandExecutor
// [-300, -400)
diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp
index e607266..f980e7b 100644
--- a/be/src/runtime/fragment_mgr.cpp
+++ b/be/src/runtime/fragment_mgr.cpp
@@ -248,7 +248,7 @@ Status FragmentExecState::cancel_before_execute() {
_executor.set_abort();
_executor.cancel();
if (_pipe != nullptr) {
- _pipe->cancel();
+ _pipe->cancel("Execution aborted before start");
}
return Status::OK();
}
@@ -261,7 +261,7 @@ Status FragmentExecState::cancel(const PPlanFragmentCancelReason& reason) {
}
_executor.cancel();
if (_pipe != nullptr) {
- _pipe->cancel();
+ _pipe->cancel(PPlanFragmentCancelReason_Name(reason));
}
return Status::OK();
}
diff --git a/be/src/runtime/message_body_sink.cpp b/be/src/runtime/message_body_sink.cpp
index 0bdf3df..6538ece 100644
--- a/be/src/runtime/message_body_sink.cpp
+++ b/be/src/runtime/message_body_sink.cpp
@@ -67,7 +67,7 @@ Status MessageBodyFileSink::finish() {
return Status::OK();
}
-void MessageBodyFileSink::cancel() {
+void MessageBodyFileSink::cancel(const std::string& reason) {
unlink(_path.data());
}
diff --git a/be/src/runtime/message_body_sink.h b/be/src/runtime/message_body_sink.h
index 8d39f01..d5460a6 100644
--- a/be/src/runtime/message_body_sink.h
+++ b/be/src/runtime/message_body_sink.h
@@ -30,7 +30,7 @@ public:
// called when all data has been append
virtual Status finish() { return Status::OK(); }
// called when read HTTP failed
- virtual void cancel() {}
+ virtual void cancel(const std::string& reason) {}
};
// write message to a local file
@@ -43,7 +43,7 @@ public:
Status append(const char* data, size_t size) override;
Status finish() override;
- void cancel() override;
+ void cancel(const std::string& reason) override;
private:
std::string _path;
diff --git a/be/src/runtime/routine_load/data_consumer_group.cpp b/be/src/runtime/routine_load/data_consumer_group.cpp
index 24880d2..9096e15 100644
--- a/be/src/runtime/routine_load/data_consumer_group.cpp
+++ b/be/src/runtime/routine_load/data_consumer_group.cpp
@@ -149,7 +149,7 @@ Status KafkaDataConsumerGroup::start_all(StreamLoadContext* ctx) {
if (left_bytes == ctx->max_batch_size) {
// nothing to be consumed, we have to cancel it, because
// we do not allow finishing stream load pipe without data
- kafka_pipe->cancel();
+ kafka_pipe->cancel("no data");
return Status::Cancelled("Cancelled");
} else {
DCHECK(left_bytes < ctx->max_batch_size);
diff --git a/be/src/runtime/routine_load/routine_load_task_executor.cpp b/be/src/runtime/routine_load/routine_load_task_executor.cpp
index a574309..17d13a4 100644
--- a/be/src/runtime/routine_load/routine_load_task_executor.cpp
+++ b/be/src/runtime/routine_load/routine_load_task_executor.cpp
@@ -342,7 +342,7 @@ void RoutineLoadTaskExecutor::err_handler(StreamLoadContext* ctx, const Status&
ctx->need_rollback = false;
}
if (ctx->body_sink.get() != nullptr) {
- ctx->body_sink->cancel();
+ ctx->body_sink->cancel(err_msg);
}
return;
diff --git a/be/src/runtime/stream_load/stream_load_context.h b/be/src/runtime/stream_load/stream_load_context.h
index f6e7b4a..cd3b458 100644
--- a/be/src/runtime/stream_load/stream_load_context.h
+++ b/be/src/runtime/stream_load/stream_load_context.h
@@ -194,6 +194,7 @@ public:
// to identified a specified data consumer.
int64_t consumer_id;
+ // True if this is a transactional insert operation.
bool need_commit_self = false;
public:
ExecEnv* exec_env() { return _exec_env; }
diff --git a/be/src/runtime/stream_load/stream_load_executor.cpp b/be/src/runtime/stream_load/stream_load_executor.cpp
index 781792b..87d8de4 100644
--- a/be/src/runtime/stream_load/stream_load_executor.cpp
+++ b/be/src/runtime/stream_load/stream_load_executor.cpp
@@ -94,7 +94,7 @@ Status StreamLoadExecutor::execute_plan_fragment(StreamLoadContext* ctx, std::sh
<< ", err_msg=" << status.get_error_msg() << ", " << ctx->brief();
// cancel body_sink, make sender known it
if (ctx->body_sink != nullptr) {
- ctx->body_sink->cancel();
+ ctx->body_sink->cancel(status.get_error_msg());
}
switch (ctx->load_src_type) {
diff --git a/be/src/runtime/stream_load/stream_load_pipe.h b/be/src/runtime/stream_load/stream_load_pipe.h
index 3912a0f..cab7908 100644
--- a/be/src/runtime/stream_load/stream_load_pipe.h
+++ b/be/src/runtime/stream_load/stream_load_pipe.h
@@ -123,7 +123,7 @@ public:
}
// cancelled
if (_cancelled) {
- return Status::InternalError("cancelled");
+ return Status::InternalError("cancelled: " + _cancelled_reason);
}
// finished
if (_buf_queue.empty()) {
@@ -159,7 +159,7 @@ public:
Status tell(int64_t* position) override { return Status::InternalError("Not implemented"); }
// called when consumer finished
- void close() override { cancel(); }
+ void close() override { cancel("closed"); }
bool closed() override { return _cancelled; }
@@ -179,10 +179,11 @@ public:
}
// called when producer/consumer failed
- void cancel() override {
+ void cancel(const std::string& reason) override {
{
std::lock_guard<std::mutex> l(_lock);
_cancelled = true;
+ _cancelled_reason = reason;
}
_get_cond.notify_all();
_put_cond.notify_all();
@@ -197,7 +198,7 @@ private:
}
// cancelled
if (_cancelled) {
- return Status::InternalError("cancelled");
+ return Status::InternalError("cancelled: " + _cancelled_reason);
}
// finished
if (_buf_queue.empty()) {
@@ -237,7 +238,7 @@ private:
}
}
if (_cancelled) {
- return Status::InternalError("cancelled");
+ return Status::InternalError("cancelled: " + _cancelled_reason);
}
_buf_queue.push_back(buf);
if (_use_proto) {
@@ -271,6 +272,7 @@ private:
bool _finished;
bool _cancelled;
+ std::string _cancelled_reason = "";
ByteBufferPtr _write_buf;
};
diff --git a/be/src/service/internal_service.cpp b/be/src/service/internal_service.cpp
index 021324f..58d743a 100644
--- a/be/src/service/internal_service.cpp
+++ b/be/src/service/internal_service.cpp
@@ -353,7 +353,7 @@ void PInternalServiceImpl<T>::rollback(google::protobuf::RpcController* controll
response->mutable_status()->set_status_code(1);
response->mutable_status()->add_error_msgs("pipe is null");
} else {
- pipe->cancel();
+ pipe->cancel("rollback");
response->mutable_status()->set_status_code(0);
}
}
diff --git a/be/test/runtime/stream_load_pipe_test.cpp b/be/test/runtime/stream_load_pipe_test.cpp
index cd7c678..48fc580 100644
--- a/be/test/runtime/stream_load_pipe_test.cpp
+++ b/be/test/runtime/stream_load_pipe_test.cpp
@@ -211,7 +211,7 @@ TEST_F(StreamLoadPipeTest, cancel) {
pipe.append(&buf, 1);
}
SleepFor(MonoDelta::FromMilliseconds(100));
- pipe.cancel();
+ pipe.cancel("test");
};
std::thread t1(appender);
diff --git a/docs/en/administrator-guide/config/be_config.md b/docs/en/administrator-guide/config/be_config.md
index a888263..101a235 100644
--- a/docs/en/administrator-guide/config/be_config.md
+++ b/docs/en/administrator-guide/config/be_config.md
@@ -1418,4 +1418,10 @@ The size of the buffer before flashing
RELEASE = 0
DEBUG = 1
```
-* Default: 0
\ No newline at end of file
+* Default: 0
+
+### `max_segment_num_per_rowset`
+
+* Type: int32
+* Description: Used to limit the number of segments in the newly generated rowset when importing. If the threshold is exceeded, the import will fail with error -238. Too many segments will cause compaction to take up a lot of memory and cause OOM errors.
+* Default value: 100
diff --git a/docs/zh-CN/administrator-guide/config/be_config.md b/docs/zh-CN/administrator-guide/config/be_config.md
index 7fb7728..e191671 100644
--- a/docs/zh-CN/administrator-guide/config/be_config.md
+++ b/docs/zh-CN/administrator-guide/config/be_config.md
@@ -1441,3 +1441,9 @@ webserver默认工作线程数
DEBUG = 1
```
* 默认值: 0
+
+### `max_segment_num_per_rowset`
+
+* 类型: int32
+* 描述: 用于限制导入时,新产生的rowset中的segment数量。如果超过阈值,导入会失败并报错 -238。过多的 segment 会导致compaction占用大量内存引发 OOM 错误。
+* 默认值: 100
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/update/UpdateStmtExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/load/update/UpdateStmtExecutor.java
index b31f544..1c0a01c 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/update/UpdateStmtExecutor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/update/UpdateStmtExecutor.java
@@ -100,7 +100,7 @@ public class UpdateStmtExecutor {
LOG.warn("failed to plan update stmt, query id:{}", DebugUtil.printId(queryId), e);
Catalog.getCurrentGlobalTransactionMgr().abortTransaction(dbId, txnId, e.getMessage());
QeProcessorImpl.INSTANCE.unregisterQuery(queryId);
- throw new DdlException("failed to execute update stmt, query id:" + DebugUtil.printId(queryId), e);
+ throw new DdlException("failed to plan update stmt, query id: " + DebugUtil.printId(queryId) + ", err: " + e.getMessage());
} finally {
targetTable.readUnlock();
}
@@ -115,7 +115,7 @@ public class UpdateStmtExecutor {
} catch (Throwable e) {
LOG.warn("failed to execute update stmt, query id:{}", DebugUtil.printId(queryId), e);
Catalog.getCurrentGlobalTransactionMgr().abortTransaction(dbId, txnId, e.getMessage());
- throw new DdlException("failed to execute update stmt, query id:" + DebugUtil.printId(queryId), e);
+ throw new DdlException("failed to execute update stmt, query id: " + DebugUtil.printId(queryId) + ", err: " + e.getMessage());
} finally {
QeProcessorImpl.INSTANCE.unregisterQuery(queryId);
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org