You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by mo...@apache.org on 2021/09/22 04:33:20 UTC

[incubator-doris] branch branch-0.15 updated: [Bug] Fix some memory bugs (#6699)

This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch branch-0.15
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git


The following commit(s) were added to refs/heads/branch-0.15 by this push:
     new 9c0c129  [Bug] Fix some memory bugs (#6699)
9c0c129 is described below

commit 9c0c129eed4f607d684f5e66651f3e045bbfaa41
Author: Mingyu Chen <mo...@gmail.com>
AuthorDate: Wed Sep 22 12:30:14 2021 +0800

    [Bug] Fix some memory bugs (#6699)
    
    1. Fix a memory leak in `collect_iterator.cpp` (Fix #6700)
    2. Add a new BE config `max_segment_num_per_rowset` to limit the number of segments in a newly created rowset. (Fix #6701)
    3. Make stream load error messages more informative.
---
 be/src/common/config.h                                       |  7 +++++++
 be/src/exec/tablet_sink.cpp                                  |  7 ++++---
 be/src/http/action/mini_load.cpp                             |  4 ++--
 be/src/http/action/stream_load.cpp                           | 10 +++++-----
 be/src/olap/collect_iterator.cpp                             | 11 +++++++++++
 be/src/olap/collect_iterator.h                               |  2 ++
 be/src/olap/delta_writer.cpp                                 |  3 +++
 be/src/olap/delta_writer.h                                   |  3 +++
 be/src/olap/olap_define.h                                    |  1 +
 be/src/runtime/fragment_mgr.cpp                              |  4 ++--
 be/src/runtime/message_body_sink.cpp                         |  2 +-
 be/src/runtime/message_body_sink.h                           |  4 ++--
 be/src/runtime/routine_load/data_consumer_group.cpp          |  2 +-
 be/src/runtime/routine_load/routine_load_task_executor.cpp   |  2 +-
 be/src/runtime/stream_load/stream_load_context.h             |  1 +
 be/src/runtime/stream_load/stream_load_executor.cpp          |  2 +-
 be/src/runtime/stream_load/stream_load_pipe.h                | 12 +++++++-----
 be/src/service/internal_service.cpp                          |  2 +-
 be/test/runtime/stream_load_pipe_test.cpp                    |  2 +-
 docs/en/administrator-guide/config/be_config.md              |  8 +++++++-
 docs/zh-CN/administrator-guide/config/be_config.md           |  6 ++++++
 .../org/apache/doris/load/update/UpdateStmtExecutor.java     |  4 ++--
 22 files changed, 71 insertions(+), 28 deletions(-)

diff --git a/be/src/common/config.h b/be/src/common/config.h
index a7d1847..ef53f70 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -633,6 +633,13 @@ CONF_Int32(send_batch_thread_pool_thread_num, "64");
 // number of send batch thread pool queue size
 CONF_Int32(send_batch_thread_pool_queue_size, "102400");
 
+// Limit the number of segments in a newly created rowset.
+// The newly created rowset may be compacted after loading,
+// so if a rowset contains too many segments, the compaction process
+// may run out of memory.
+// During compaction, each segment may take at least 1MB of buffer memory.
+CONF_mInt32(max_segment_num_per_rowset, "100");
+
 } // namespace config
 
 } // namespace doris
diff --git a/be/src/exec/tablet_sink.cpp b/be/src/exec/tablet_sink.cpp
index 9c1e31c..d282058 100644
--- a/be/src/exec/tablet_sink.cpp
+++ b/be/src/exec/tablet_sink.cpp
@@ -481,9 +481,10 @@ Status IndexChannel::add_row(Tuple* tuple, int64_t tablet_id) {
     }
 
     if (has_intolerable_failure()) {
-        std::stringstream ss;
-        ss << "index channel has intolerable failure. " << BackendOptions::get_localhost();
-        return Status::InternalError(ss.str());
+        std::stringstream ss2;
+        ss2 << "index channel has intolerable failure. " << BackendOptions::get_localhost()
+            << ", err: " << ss.str();
+        return Status::InternalError(ss2.str());
     }
 
     return Status::OK();
diff --git a/be/src/http/action/mini_load.cpp b/be/src/http/action/mini_load.cpp
index b692e84..f7ef79a 100644
--- a/be/src/http/action/mini_load.cpp
+++ b/be/src/http/action/mini_load.cpp
@@ -626,7 +626,7 @@ void MiniLoadAction::free_handler_ctx(void* param) {
             if (streaming_ctx->body_sink != nullptr) {
                 LOG(WARNING) << "cancel stream load " << streaming_ctx->id.to_string()
                              << " because sender failed";
-                streaming_ctx->body_sink->cancel();
+                streaming_ctx->body_sink->cancel("sender failed");
             }
             if (streaming_ctx->unref()) {
                 delete streaming_ctx;
@@ -935,7 +935,7 @@ void MiniLoadAction::_new_handle(HttpRequest* req) {
             ctx->need_rollback = false;
         }
         if (ctx->body_sink.get() != nullptr) {
-            ctx->body_sink->cancel();
+            ctx->body_sink->cancel(ctx->status.get_error_msg());
         }
     }
 
diff --git a/be/src/http/action/stream_load.cpp b/be/src/http/action/stream_load.cpp
index 2177d12..369ce58 100644
--- a/be/src/http/action/stream_load.cpp
+++ b/be/src/http/action/stream_load.cpp
@@ -153,7 +153,7 @@ void StreamLoadAction::handle(HttpRequest* req) {
             ctx->need_rollback = false;
         }
         if (ctx->body_sink.get() != nullptr) {
-            ctx->body_sink->cancel();
+            ctx->body_sink->cancel(ctx->status.get_error_msg());
         }
     }
 
@@ -229,7 +229,7 @@ int StreamLoadAction::on_header(HttpRequest* req) {
             ctx->need_rollback = false;
         }
         if (ctx->body_sink.get() != nullptr) {
-            ctx->body_sink->cancel();
+            ctx->body_sink->cancel(st.get_error_msg());
         }
         auto str = ctx->to_json();
         // add new line at end
@@ -336,7 +336,7 @@ void StreamLoadAction::on_chunk_data(HttpRequest* req) {
         auto st = ctx->body_sink->append(bb);
         if (!st.ok()) {
             LOG(WARNING) << "append body content failed. errmsg=" << st.get_error_msg()
-                         << ctx->brief();
+                         << ", " << ctx->brief();
             ctx->status = st;
             return;
         }
@@ -350,9 +350,9 @@ void StreamLoadAction::free_handler_ctx(void* param) {
     if (ctx == nullptr) {
         return;
     }
-    // sender is going, make receiver know it
+    // sender is gone, make receiver know it
     if (ctx->body_sink != nullptr) {
-        ctx->body_sink->cancel();
+        ctx->body_sink->cancel("sender is gone");
     }
     if (ctx->unref()) {
         delete ctx;
diff --git a/be/src/olap/collect_iterator.cpp b/be/src/olap/collect_iterator.cpp
index 5fce312..9e4b799 100644
--- a/be/src/olap/collect_iterator.cpp
+++ b/be/src/olap/collect_iterator.cpp
@@ -239,6 +239,17 @@ CollectIterator::Level1Iterator::~Level1Iterator() {
             child = nullptr;
         }
     }
+
+    if (_heap) {
+        while (!_heap->empty()) {
+            LevelIterator* it = _heap->top();
+            if (it != nullptr) {
+                delete it;
+                it = nullptr;
+            }
+            _heap->pop();
+        }
+    }
 }
 
 // Read next row into *row.
diff --git a/be/src/olap/collect_iterator.h b/be/src/olap/collect_iterator.h
index 92f3748..269f754 100644
--- a/be/src/olap/collect_iterator.h
+++ b/be/src/olap/collect_iterator.h
@@ -149,6 +149,8 @@ private:
         bool _merge = true;
         bool _reverse = false;
         // used when `_merge == true`
+        // Must be cleared when destructing this Level1Iterator.
+        // Each child LevelIterator should be in either _heap or _children, never both.
         std::unique_ptr<MergeHeap> _heap;
         // used when `_merge == false`
         int _child_idx = 0;
diff --git a/be/src/olap/delta_writer.cpp b/be/src/olap/delta_writer.cpp
index defd345..a5cb6de 100644
--- a/be/src/olap/delta_writer.cpp
+++ b/be/src/olap/delta_writer.cpp
@@ -169,6 +169,9 @@ OLAPStatus DeltaWriter::write(Tuple* tuple) {
     // if memtable is full, push it to the flush executor,
     // and create a new memtable for incoming data
     if (_mem_table->memory_usage() >= config::write_buffer_size) {
+        if (++_segment_counter > config::max_segment_num_per_rowset) {
+            return OLAP_ERR_TOO_MANY_SEGMENTS;
+        }
         RETURN_NOT_OK(_flush_memtable_async());
         // create a new memtable for new incoming data
         _reset_mem_table();
diff --git a/be/src/olap/delta_writer.h b/be/src/olap/delta_writer.h
index 01c1b0d..728e0b2 100644
--- a/be/src/olap/delta_writer.h
+++ b/be/src/olap/delta_writer.h
@@ -115,6 +115,9 @@ private:
     std::shared_ptr<MemTracker> _parent_mem_tracker;
     std::shared_ptr<MemTracker> _mem_tracker;
 
+    // Number of segments (memtable flushes) produced so far by this writer.
+    int64_t _segment_counter = 0;
+
     SpinLock _lock;
 };
 
diff --git a/be/src/olap/olap_define.h b/be/src/olap/olap_define.h
index f140f6b..fd3a794 100644
--- a/be/src/olap/olap_define.h
+++ b/be/src/olap/olap_define.h
@@ -179,6 +179,7 @@ enum OLAPStatus {
     OLAP_ERR_TOO_MANY_VERSION = -235,
     OLAP_ERR_NOT_INITIALIZED = -236,
     OLAP_ERR_ALREADY_CANCELLED = -237,
+    OLAP_ERR_TOO_MANY_SEGMENTS = -238,
 
     // CommandExecutor
     // [-300, -400)
diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp
index e607266..f980e7b 100644
--- a/be/src/runtime/fragment_mgr.cpp
+++ b/be/src/runtime/fragment_mgr.cpp
@@ -248,7 +248,7 @@ Status FragmentExecState::cancel_before_execute() {
     _executor.set_abort();
     _executor.cancel();
     if (_pipe != nullptr) {
-        _pipe->cancel();
+        _pipe->cancel("Execution aborted before start");
     }
     return Status::OK();
 }
@@ -261,7 +261,7 @@ Status FragmentExecState::cancel(const PPlanFragmentCancelReason& reason) {
     }
     _executor.cancel();
     if (_pipe != nullptr) {
-        _pipe->cancel();
+        _pipe->cancel(PPlanFragmentCancelReason_Name(reason));
     }
     return Status::OK();
 }
diff --git a/be/src/runtime/message_body_sink.cpp b/be/src/runtime/message_body_sink.cpp
index 0bdf3df..6538ece 100644
--- a/be/src/runtime/message_body_sink.cpp
+++ b/be/src/runtime/message_body_sink.cpp
@@ -67,7 +67,7 @@ Status MessageBodyFileSink::finish() {
     return Status::OK();
 }
 
-void MessageBodyFileSink::cancel() {
+void MessageBodyFileSink::cancel(const std::string& reason) {
     unlink(_path.data());
 }
 
diff --git a/be/src/runtime/message_body_sink.h b/be/src/runtime/message_body_sink.h
index 8d39f01..d5460a6 100644
--- a/be/src/runtime/message_body_sink.h
+++ b/be/src/runtime/message_body_sink.h
@@ -30,7 +30,7 @@ public:
     // called when all data has been append
     virtual Status finish() { return Status::OK(); }
     // called when read HTTP failed
-    virtual void cancel() {}
+    virtual void cancel(const std::string& reason) {}
 };
 
 // write message to a local file
@@ -43,7 +43,7 @@ public:
 
     Status append(const char* data, size_t size) override;
     Status finish() override;
-    void cancel() override;
+    void cancel(const std::string& reason) override;
 
 private:
     std::string _path;
diff --git a/be/src/runtime/routine_load/data_consumer_group.cpp b/be/src/runtime/routine_load/data_consumer_group.cpp
index 24880d2..9096e15 100644
--- a/be/src/runtime/routine_load/data_consumer_group.cpp
+++ b/be/src/runtime/routine_load/data_consumer_group.cpp
@@ -149,7 +149,7 @@ Status KafkaDataConsumerGroup::start_all(StreamLoadContext* ctx) {
             if (left_bytes == ctx->max_batch_size) {
                 // nothing to be consumed, we have to cancel it, because
                 // we do not allow finishing stream load pipe without data
-                kafka_pipe->cancel();
+                kafka_pipe->cancel("no data");
                 return Status::Cancelled("Cancelled");
             } else {
                 DCHECK(left_bytes < ctx->max_batch_size);
diff --git a/be/src/runtime/routine_load/routine_load_task_executor.cpp b/be/src/runtime/routine_load/routine_load_task_executor.cpp
index a574309..17d13a4 100644
--- a/be/src/runtime/routine_load/routine_load_task_executor.cpp
+++ b/be/src/runtime/routine_load/routine_load_task_executor.cpp
@@ -342,7 +342,7 @@ void RoutineLoadTaskExecutor::err_handler(StreamLoadContext* ctx, const Status&
         ctx->need_rollback = false;
     }
     if (ctx->body_sink.get() != nullptr) {
-        ctx->body_sink->cancel();
+        ctx->body_sink->cancel(err_msg);
     }
 
     return;
diff --git a/be/src/runtime/stream_load/stream_load_context.h b/be/src/runtime/stream_load/stream_load_context.h
index f6e7b4a..cd3b458 100644
--- a/be/src/runtime/stream_load/stream_load_context.h
+++ b/be/src/runtime/stream_load/stream_load_context.h
@@ -194,6 +194,7 @@ public:
     // to identified a specified data consumer.
     int64_t consumer_id;
 
+    // True if this is a transactional insert operation.
     bool need_commit_self = false;
 public:
     ExecEnv* exec_env() { return _exec_env; }
diff --git a/be/src/runtime/stream_load/stream_load_executor.cpp b/be/src/runtime/stream_load/stream_load_executor.cpp
index 781792b..87d8de4 100644
--- a/be/src/runtime/stream_load/stream_load_executor.cpp
+++ b/be/src/runtime/stream_load/stream_load_executor.cpp
@@ -94,7 +94,7 @@ Status StreamLoadExecutor::execute_plan_fragment(StreamLoadContext* ctx, std::sh
                                  << ", err_msg=" << status.get_error_msg() << ", " << ctx->brief();
                     // cancel body_sink, make sender known it
                     if (ctx->body_sink != nullptr) {
-                        ctx->body_sink->cancel();
+                        ctx->body_sink->cancel(status.get_error_msg());
                     }
 
                     switch (ctx->load_src_type) {
diff --git a/be/src/runtime/stream_load/stream_load_pipe.h b/be/src/runtime/stream_load/stream_load_pipe.h
index 3912a0f..cab7908 100644
--- a/be/src/runtime/stream_load/stream_load_pipe.h
+++ b/be/src/runtime/stream_load/stream_load_pipe.h
@@ -123,7 +123,7 @@ public:
             }
             // cancelled
             if (_cancelled) {
-                return Status::InternalError("cancelled");
+                return Status::InternalError("cancelled: " + _cancelled_reason);
             }
             // finished
             if (_buf_queue.empty()) {
@@ -159,7 +159,7 @@ public:
     Status tell(int64_t* position) override { return Status::InternalError("Not implemented"); }
 
     // called when consumer finished
-    void close() override { cancel(); }
+    void close() override { cancel("closed"); }
 
     bool closed() override { return _cancelled; }
 
@@ -179,10 +179,11 @@ public:
     }
 
     // called when producer/consumer failed
-    void cancel() override {
+    void cancel(const std::string& reason) override {
         {
             std::lock_guard<std::mutex> l(_lock);
             _cancelled = true;
+            _cancelled_reason = reason;
         }
         _get_cond.notify_all();
         _put_cond.notify_all();
@@ -197,7 +198,7 @@ private:
         }
         // cancelled
         if (_cancelled) {
-            return Status::InternalError("cancelled");
+            return Status::InternalError("cancelled: " + _cancelled_reason);
         }
         // finished
         if (_buf_queue.empty()) {
@@ -237,7 +238,7 @@ private:
                 }
             }
             if (_cancelled) {
-                return Status::InternalError("cancelled");
+                return Status::InternalError("cancelled: " + _cancelled_reason);
             }
             _buf_queue.push_back(buf);
             if (_use_proto) {
@@ -271,6 +272,7 @@ private:
 
     bool _finished;
     bool _cancelled;
+    std::string _cancelled_reason = "";
 
     ByteBufferPtr _write_buf;
 };
diff --git a/be/src/service/internal_service.cpp b/be/src/service/internal_service.cpp
index 021324f..58d743a 100644
--- a/be/src/service/internal_service.cpp
+++ b/be/src/service/internal_service.cpp
@@ -353,7 +353,7 @@ void PInternalServiceImpl<T>::rollback(google::protobuf::RpcController* controll
         response->mutable_status()->set_status_code(1);
         response->mutable_status()->add_error_msgs("pipe is null");
     } else {
-        pipe->cancel();
+        pipe->cancel("rollback");
         response->mutable_status()->set_status_code(0);
     }
 }
diff --git a/be/test/runtime/stream_load_pipe_test.cpp b/be/test/runtime/stream_load_pipe_test.cpp
index cd7c678..48fc580 100644
--- a/be/test/runtime/stream_load_pipe_test.cpp
+++ b/be/test/runtime/stream_load_pipe_test.cpp
@@ -211,7 +211,7 @@ TEST_F(StreamLoadPipeTest, cancel) {
             pipe.append(&buf, 1);
         }
         SleepFor(MonoDelta::FromMilliseconds(100));
-        pipe.cancel();
+        pipe.cancel("test");
     };
     std::thread t1(appender);
 
diff --git a/docs/en/administrator-guide/config/be_config.md b/docs/en/administrator-guide/config/be_config.md
index a888263..101a235 100644
--- a/docs/en/administrator-guide/config/be_config.md
+++ b/docs/en/administrator-guide/config/be_config.md
@@ -1418,4 +1418,10 @@ The size of the buffer before flashing
     RELEASE = 0
     DEBUG = 1
   ```
-* Default: 0
\ No newline at end of file
+* Default: 0
+
+### `max_segment_num_per_rowset`
+
+* Type: int32
+* Description: Limits the number of segments in a newly generated rowset during loading. If the threshold is exceeded, the load fails with error -238. Too many segments cause compaction to consume a lot of memory, which may lead to OOM errors.
+* Default value: 100
diff --git a/docs/zh-CN/administrator-guide/config/be_config.md b/docs/zh-CN/administrator-guide/config/be_config.md
index 7fb7728..e191671 100644
--- a/docs/zh-CN/administrator-guide/config/be_config.md
+++ b/docs/zh-CN/administrator-guide/config/be_config.md
@@ -1441,3 +1441,9 @@ webserver默认工作线程数
     DEBUG = 1
   ```
 * 默认值: 0
+
+### `max_segment_num_per_rowset`
+
+* 类型: int32
+* 描述: 用于限制导入时,新产生的rowset中的segment数量。如果超过阈值,导入会失败并报错 -238。过多的 segment 会导致compaction占用大量内存引发 OOM 错误。
+* 默认值: 100
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/update/UpdateStmtExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/load/update/UpdateStmtExecutor.java
index b31f544..1c0a01c 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/update/UpdateStmtExecutor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/update/UpdateStmtExecutor.java
@@ -100,7 +100,7 @@ public class UpdateStmtExecutor {
             LOG.warn("failed to plan update stmt, query id:{}", DebugUtil.printId(queryId), e);
             Catalog.getCurrentGlobalTransactionMgr().abortTransaction(dbId, txnId, e.getMessage());
             QeProcessorImpl.INSTANCE.unregisterQuery(queryId);
-            throw new DdlException("failed to execute update stmt, query id:" + DebugUtil.printId(queryId), e);
+            throw new DdlException("failed to plan update stmt, query id: " + DebugUtil.printId(queryId) + ", err: " + e.getMessage());
         } finally {
             targetTable.readUnlock();
         }
@@ -115,7 +115,7 @@ public class UpdateStmtExecutor {
         } catch (Throwable e) {
             LOG.warn("failed to execute update stmt, query id:{}", DebugUtil.printId(queryId), e);
             Catalog.getCurrentGlobalTransactionMgr().abortTransaction(dbId, txnId, e.getMessage());
-            throw new DdlException("failed to execute update stmt, query id:" + DebugUtil.printId(queryId), e);
+            throw new DdlException("failed to execute update stmt, query id: " + DebugUtil.printId(queryId) + ", err: " + e.getMessage());
         } finally {
             QeProcessorImpl.INSTANCE.unregisterQuery(queryId);
         }

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org