You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by zh...@apache.org on 2019/10/15 01:26:30 UTC

[incubator-doris] branch master updated: Limit the memory usage of Loading process (#1954)

This is an automated email from the ASF dual-hosted git repository.

zhaoc pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 62acf5d  Limit the memory usage of Loading process (#1954)
62acf5d is described below

commit 62acf5d098813dc5239c565baca04ecd8143a54e
Author: Mingyu Chen <mo...@163.com>
AuthorDate: Tue Oct 15 09:26:20 2019 +0800

    Limit the memory usage of Loading process (#1954)
---
 be/src/common/config.h                             |  12 +-
 be/src/common/logconfig.cpp                        |   5 +-
 be/src/common/logging.h                            |   4 +-
 be/src/exec/tablet_sink.cpp                        |   3 +
 be/src/exec/tablet_sink.h                          |   3 +
 be/src/http/action/stream_load.cpp                 |   7 +
 be/src/http/http_common.h                          |   1 +
 be/src/olap/delta_writer.cpp                       |  49 ++++-
 be/src/olap/delta_writer.h                         |  15 +-
 be/src/olap/file_stream.cpp                        |   4 +-
 be/src/olap/memtable.cpp                           |   7 +-
 be/src/olap/memtable.h                             |   4 +-
 be/src/olap/memtable_flush_executor.cpp            |   5 +-
 be/src/olap/memtable_flush_executor.h              |  18 +-
 be/src/olap/out_stream.cpp                         |   6 +-
 be/src/olap/rowset/column_reader.cpp               |   2 +-
 be/src/olap/rowset/segment_reader.cpp              |  20 +-
 be/src/olap/rowset/segment_writer.cpp              |   4 +-
 be/src/olap/rowset_graph.cpp                       |   2 +-
 be/src/olap/stream_index_writer.cpp                |   2 +-
 be/src/olap/utils.cpp                              |   2 +-
 be/src/runtime/CMakeLists.txt                      |   3 +-
 be/src/runtime/exec_env.h                          |   6 +-
 be/src/runtime/exec_env_init.cpp                   |   9 +-
 be/src/runtime/load_channel.cpp                    | 157 ++++++++++++++
 be/src/runtime/load_channel.h                      |  90 ++++++++
 be/src/runtime/load_channel_mgr.cpp                | 229 +++++++++++++++++++++
 .../{tablet_writer_mgr.h => load_channel_mgr.h}    |  55 ++---
 be/src/runtime/runtime_state.cpp                   |   2 +-
 be/src/runtime/tablet_writer_mgr.cpp               | 169 ---------------
 be/src/runtime/tablets_channel.cpp                 |  52 ++++-
 be/src/runtime/tablets_channel.h                   |  22 +-
 be/src/service/internal_service.cpp                |  10 +-
 be/test/exec/tablet_sink_test.cpp                  |  16 +-
 be/test/olap/delta_writer_test.cpp                 |   8 +-
 be/test/runtime/CMakeLists.txt                     |   2 +-
 ...iter_mgr_test.cpp => load_channel_mgr_test.cpp} |  58 ++++--
 .../alter-table/alter-table-schema-change.md       |   2 +-
 .../load-data/broker-load-manual.md                |   4 +-
 .../administrator-guide/load-data/load-manual.md   |  19 ++
 .../load-data/stream-load-manual.md                |   4 +
 .../Data Manipulation/BROKER LOAD.md               |   4 +-
 .../Data Manipulation/STREAM LOAD.md               |   2 +
 .../load-data/broker-load-manual_EN.md             |   4 +-
 .../load-data/load-manual_EN.md                    |  17 ++
 .../load-data/stream-load-manual_EN.md             |   4 +
 .../Data Manipulation/BROKER LOAD_EN.md            |   6 +-
 .../Data Manipulation/STREAM LOAD_EN.md            |   6 +-
 .../apache/doris/planner/StreamLoadPlanner.java    |   1 +
 .../java/org/apache/doris/task/StreamLoadTask.java |   8 +
 gensrc/proto/internal_service.proto                |   1 +
 gensrc/thrift/FrontendService.thrift               |   1 +
 run-ut.sh                                          |   2 +-
 53 files changed, 828 insertions(+), 320 deletions(-)

diff --git a/be/src/common/config.h b/be/src/common/config.h
index 13792fd..cb9e7ae 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -124,6 +124,8 @@ namespace config {
     CONF_Int32(sys_log_roll_num, "10");
     // verbose log
     CONF_Strings(sys_log_verbose_modules, "");
+    // verbose log level
+    CONF_Int32(sys_log_verbose_level, "10");
     // log buffer level
     CONF_String(log_buffer_level, "");
 
@@ -397,7 +399,15 @@ namespace config {
     CONF_Int32(memory_max_alignment, "16");
 
     // write buffer size before flush
-    CONF_Int32(write_buffer_size, "104857600");
+    CONF_Int64(write_buffer_size, "104857600");
+
+    // following 2 configs limit the memory consumption of load process on a Backend.
+    // e.g. limit memory to 80% of the mem limit config, but at most 100GB (default).
+    // NOTICE(cmy): set these default values very large because we don't want to
+    // impact the load performance when users upgrade Doris.
+    // Users should set these configs properly if necessary.
+    CONF_Int64(load_process_max_memory_limit_bytes, "107374182400"); // 100GB
+    CONF_Int32(load_process_max_memory_limit_percent, "80");    // 80%
 
     // update interval of tablet stat cache
     CONF_Int32(tablet_stat_cache_update_interval_second, "300");
diff --git a/be/src/common/logconfig.cpp b/be/src/common/logconfig.cpp
index c2c6e0b..bb5a183 100644
--- a/be/src/common/logconfig.cpp
+++ b/be/src/common/logconfig.cpp
@@ -125,12 +125,13 @@ bool init_glog(const char* basename, bool install_signal_handler) {
         return false;
     }
 
-    // set verbose modules. only use vlog(0)
+    // set verbose modules.
     FLAGS_v = -1;
     std::vector<std::string>& verbose_modules = config::sys_log_verbose_modules;
+    int32_t vlog_level = config::sys_log_verbose_level;
     for (size_t i = 0; i < verbose_modules.size(); i++) {
         if (verbose_modules[i].size() != 0) {
-            google::SetVLOGLevel(verbose_modules[i].c_str(), 10);
+            google::SetVLOGLevel(verbose_modules[i].c_str(), vlog_level);
         }
     }
 
diff --git a/be/src/common/logging.h b/be/src/common/logging.h
index 66f5458..b2021e6 100644
--- a/be/src/common/logging.h
+++ b/be/src/common/logging.h
@@ -49,10 +49,10 @@
 // Define VLOG levels.  We want display per-row info less than per-file which
 // is less than per-query.  For now per-connection is the same as per-query.
 #define VLOG_CONNECTION VLOG(1)
-#define VLOG_RPC        VLOG(2)
+#define VLOG_RPC        VLOG(8)
 #define VLOG_QUERY      VLOG(1)
 #define VLOG_FILE       VLOG(2)
-#define VLOG_ROW        VLOG(3)
+#define VLOG_ROW        VLOG(10)
 #define VLOG_PROGRESS   VLOG(2)
 
 #define VLOG_CONNECTION_IS_ON VLOG_IS_ON(1)
diff --git a/be/src/exec/tablet_sink.cpp b/be/src/exec/tablet_sink.cpp
index 90402aa..24b4b5e 100644
--- a/be/src/exec/tablet_sink.cpp
+++ b/be/src/exec/tablet_sink.cpp
@@ -96,6 +96,7 @@ void NodeChannel::open() {
     }
     request.set_num_senders(_parent->_num_senders);
     request.set_need_gen_rollup(_parent->_need_gen_rollup);
+    request.set_load_mem_limit(_parent->_load_mem_limit);
 
     _open_closure = new RefCountClosure<PTabletWriterOpenResult>();
     _open_closure->ref();
@@ -512,6 +513,8 @@ Status OlapTableSink::prepare(RuntimeState* state) {
     _close_timer = ADD_TIMER(_profile, "CloseTime");
     _wait_in_flight_packet_timer = ADD_TIMER(_profile, "WaitInFlightPacketTime");
     _serialize_batch_timer = ADD_TIMER(_profile, "SerializeBatchTime");
+    // use query mem limit as load mem limit for remote load channels
+    _load_mem_limit = state->query_mem_tracker()->limit();
 
     // open all channels
     auto& partitions = _partition->get_partitions();
diff --git a/be/src/exec/tablet_sink.h b/be/src/exec/tablet_sink.h
index 0077604..624caad 100644
--- a/be/src/exec/tablet_sink.h
+++ b/be/src/exec/tablet_sink.h
@@ -283,6 +283,9 @@ private:
 
     // BE id -> add_batch method counter
     std::unordered_map<int64_t, AddBatchCounter> _node_add_batch_counter_map;
+
+    // load mem limit is for remote load channel
+    int64_t _load_mem_limit = -1;
 };
 
 }
diff --git a/be/src/http/action/stream_load.cpp b/be/src/http/action/stream_load.cpp
index 97661aa..c7b49c5 100644
--- a/be/src/http/action/stream_load.cpp
+++ b/be/src/http/action/stream_load.cpp
@@ -347,6 +347,13 @@ Status StreamLoadAction::_process_put(HttpRequest* http_req, StreamLoadContext*
     if (!http_req->header(HTTP_TIMEZONE).empty()) {
         request.__set_timezone(http_req->header(HTTP_TIMEZONE));
     }
+    if (!http_req->header(HTTP_EXEC_MEM_LIMIT).empty()) {
+        try {
+            request.__set_execMemLimit(std::stoll(http_req->header(HTTP_EXEC_MEM_LIMIT))); 
+        } catch (const std::invalid_argument& e) {
+            return Status::InvalidArgument("Invalid mem limit format");
+        }
+    }
 
     // plan this load
     TNetworkAddress master_addr = _exec_env->master_info()->network_address;
diff --git a/be/src/http/http_common.h b/be/src/http/http_common.h
index 1875e1c..676d6da 100644
--- a/be/src/http/http_common.h
+++ b/be/src/http/http_common.h
@@ -34,6 +34,7 @@ static const std::string HTTP_PARTITIONS = "partitions";
 static const std::string HTTP_NEGATIVE = "negative";
 static const std::string HTTP_STRICT_MODE = "strict_mode";
 static const std::string HTTP_TIMEZONE = "timezone";
+static const std::string HTTP_EXEC_MEM_LIMIT = "exec_mem_limit";
 
 static const std::string HTTP_100_CONTINUE = "100-continue";
 
diff --git a/be/src/olap/delta_writer.cpp b/be/src/olap/delta_writer.cpp
index a927045..70dd583 100644
--- a/be/src/olap/delta_writer.cpp
+++ b/be/src/olap/delta_writer.cpp
@@ -28,19 +28,22 @@
 
 namespace doris {
 
-OLAPStatus DeltaWriter::open(WriteRequest* req, DeltaWriter** writer) {
-    *writer = new DeltaWriter(req, StorageEngine::instance());
+OLAPStatus DeltaWriter::open(WriteRequest* req, MemTracker* mem_tracker, DeltaWriter** writer) {
+    *writer = new DeltaWriter(req, mem_tracker, StorageEngine::instance());
     return (*writer)->init();
 }
 
 DeltaWriter::DeltaWriter(
         WriteRequest* req,
+        MemTracker* mem_tracker,
         StorageEngine* storage_engine)
     : _req(*req), _tablet(nullptr),
       _cur_rowset(nullptr), _new_rowset(nullptr), _new_tablet(nullptr),
       _rowset_writer(nullptr), _schema(nullptr), _tablet_schema(nullptr),
       _delta_written_success(false),
       _storage_engine(storage_engine) {
+
+    _mem_tracker.reset(new MemTracker(-1, "delta writer", mem_tracker));
 }
 
 DeltaWriter::~DeltaWriter() {
@@ -50,6 +53,13 @@ DeltaWriter::~DeltaWriter() {
 
     _mem_table.reset();
     SAFE_DELETE(_schema);
+
+    if (_flush_handler != nullptr) {
+        // cancel and wait all memtables in flush queue to be finished
+        _flush_handler->cancel();
+        _flush_handler->wait();
+    }
+
     if (_rowset_writer != nullptr) {
         _rowset_writer->data_dir()->remove_pending_ids(ROWSET_ID_PREFIX + _rowset_writer->rowset_id().to_string());
     }
@@ -132,7 +142,7 @@ OLAPStatus DeltaWriter::init() {
     _tablet_schema = &(_tablet->tablet_schema());
     _schema = new Schema(*_tablet_schema);
     _mem_table = std::make_shared<MemTable>(_tablet->tablet_id(), _schema, _tablet_schema, _req.slots,
-            _req.tuple_desc, _tablet->keys_type(), _rowset_writer.get());
+            _req.tuple_desc, _tablet->keys_type(), _rowset_writer.get(), _mem_tracker.get());
 
     // create flush handler
     RETURN_NOT_OK(_storage_engine->memtable_flush_executor()->create_flush_handler(_tablet->data_dir()->path_hash(), &_flush_handler));
@@ -154,7 +164,7 @@ OLAPStatus DeltaWriter::write(Tuple* tuple) {
         RETURN_NOT_OK(_flush_memtable_async());
         // create a new memtable for new incoming data
         _mem_table.reset(new MemTable(_tablet->tablet_id(), _schema, _tablet_schema, _req.slots,
-                _req.tuple_desc, _tablet->keys_type(), _rowset_writer.get()));
+                    _req.tuple_desc, _tablet->keys_type(), _rowset_writer.get(), _mem_tracker.get()));
     }
     return OLAP_SUCCESS;
 }
@@ -163,6 +173,23 @@ OLAPStatus DeltaWriter::_flush_memtable_async() {
     return _flush_handler->submit(_mem_table);
 }
 
+OLAPStatus DeltaWriter::flush_memtable_and_wait() {
+    if (mem_consumption() == _mem_table->memory_usage()) {
+        // equal means there is no memtable in flush queue, just flush this memtable
+        VLOG(3) << "flush memtable to reduce mem consumption. memtable size: " << _mem_table->memory_usage()
+                << ", tablet: " << _req.tablet_id << ", load id: " << print_id(_req.load_id);
+        _flush_memtable_async();
+        _mem_table.reset(new MemTable(_tablet->tablet_id(), _schema, _tablet_schema, _req.slots,
+                    _req.tuple_desc, _tablet->keys_type(), _rowset_writer.get(), _mem_tracker.get())); 
+    } else {
+        DCHECK(mem_consumption() > _mem_table->memory_usage());
+        // this means there should be at least one memtable in flush queue.
+    }
+    // wait all memtables in flush queue to be flushed.
+    RETURN_NOT_OK(_flush_handler->wait());
+    return OLAP_SUCCESS;
+}
+
 OLAPStatus DeltaWriter::close() {
     if (!_is_init) {
         RETURN_NOT_OK(init());
@@ -232,8 +259,20 @@ OLAPStatus DeltaWriter::close_wait(google::protobuf::RepeatedPtrField<PTabletInf
 }
 
 OLAPStatus DeltaWriter::cancel() {
-    DCHECK(!_is_init);
+    if (_flush_handler != nullptr) {
+        // cancel and wait all memtables in flush queue to be finished
+        _flush_handler->cancel();
+        _flush_handler->wait();
+    }
     return OLAP_SUCCESS;
 }
 
+int64_t DeltaWriter::mem_consumption() const {
+    return _mem_tracker->consumption();
+}
+
+int64_t DeltaWriter::partition_id() const {
+    return _req.partition_id;
+}
+
 } // namespace doris
diff --git a/be/src/olap/delta_writer.h b/be/src/olap/delta_writer.h
index 073f5e5..f9349a2 100644
--- a/be/src/olap/delta_writer.h
+++ b/be/src/olap/delta_writer.h
@@ -30,6 +30,7 @@ namespace doris {
 
 class FlushHandler;
 class MemTable;
+class MemTracker;
 class Schema;
 class SegmentGroup;
 class StorageEngine;
@@ -55,9 +56,9 @@ struct WriteRequest {
 
 class DeltaWriter {
 public:
-    static OLAPStatus open(WriteRequest* req, DeltaWriter** writer);
+    static OLAPStatus open(WriteRequest* req, MemTracker* mem_tracker, DeltaWriter** writer);
 
-    DeltaWriter(WriteRequest* req, StorageEngine* storage_engine);
+    DeltaWriter(WriteRequest* req, MemTracker* mem_tracker, StorageEngine* storage_engine);
 
     OLAPStatus init();
 
@@ -71,7 +72,14 @@ public:
 
     OLAPStatus cancel();
 
-    int64_t partition_id() const { return _req.partition_id; }
+    // submit current memtable to flush queue, and wait all memtables in flush queue
+    // to be flushed.
+    // This is currently for reducing mem consumption of this delta writer.
+    OLAPStatus flush_memtable_and_wait();
+
+    int64_t partition_id() const;
+
+    int64_t mem_consumption() const;
 
 private:
     // push a full memtable to flush executor
@@ -94,6 +102,7 @@ private:
 
     StorageEngine* _storage_engine;
     std::shared_ptr<FlushHandler> _flush_handler;
+    std::unique_ptr<MemTracker> _mem_tracker;
 };
 
 }  // namespace doris
diff --git a/be/src/olap/file_stream.cpp b/be/src/olap/file_stream.cpp
index 4be996b..08ebf2a 100755
--- a/be/src/olap/file_stream.cpp
+++ b/be/src/olap/file_stream.cpp
@@ -61,7 +61,7 @@ OLAPStatus ReadOnlyFileStream::_assure_data() {
     if (OLAP_LIKELY(_uncompressed != NULL && _uncompressed->remaining() > 0)) {
         return OLAP_SUCCESS;
     } else if (_file_cursor.eof()) {
-        VLOG(3) << "STREAM EOF. length=" << _file_cursor.length()
+        VLOG(10) << "STREAM EOF. length=" << _file_cursor.length()
                 << ", used=" << _file_cursor.position();
         return OLAP_ERR_COLUMN_STREAM_EOF;
     }
@@ -130,7 +130,7 @@ OLAPStatus ReadOnlyFileStream::seek(PositionProvider* position) {
         if (OLAP_LIKELY(OLAP_SUCCESS == res)) {
             // assure data will be successful in most case
         } else if (res == OLAP_ERR_COLUMN_STREAM_EOF) {
-            VLOG(3) << "file stream eof.";
+            VLOG(10) << "file stream eof.";
             return res;
         } else {
             OLAP_LOG_WARNING("fail to assure data after seek");
diff --git a/be/src/olap/memtable.cpp b/be/src/olap/memtable.cpp
index 9465070..7edb6f2 100644
--- a/be/src/olap/memtable.cpp
+++ b/be/src/olap/memtable.cpp
@@ -27,7 +27,7 @@ namespace doris {
 
 MemTable::MemTable(int64_t tablet_id, Schema* schema, const TabletSchema* tablet_schema,
                    const std::vector<SlotDescriptor*>* slot_descs, TupleDescriptor* tuple_desc,
-                   KeysType keys_type, RowsetWriter* rowset_writer)
+                   KeysType keys_type, RowsetWriter* rowset_writer, MemTracker* mem_tracker)
     : _tablet_id(tablet_id),
       _schema(schema),
       _tablet_schema(tablet_schema),
@@ -36,9 +36,10 @@ MemTable::MemTable(int64_t tablet_id, Schema* schema, const TabletSchema* tablet
       _keys_type(keys_type),
       _row_comparator(_schema),
       _rowset_writer(rowset_writer) {
+
     _schema_size = _schema->schema_size();
-    _tracker.reset(new MemTracker(config::write_buffer_size * 2));
-    _mem_pool.reset(new MemPool(_tracker.get()));
+    _mem_tracker.reset(new MemTracker(-1, "memtable", mem_tracker));
+    _mem_pool.reset(new MemPool(_mem_tracker.get()));
     _tuple_buf = _mem_pool->allocate(_schema_size);
     _skip_list = new Table(_row_comparator, _mem_pool.get());
 }
diff --git a/be/src/olap/memtable.h b/be/src/olap/memtable.h
index 7ce2374..1688dbd 100644
--- a/be/src/olap/memtable.h
+++ b/be/src/olap/memtable.h
@@ -33,7 +33,7 @@ class MemTable {
 public:
     MemTable(int64_t tablet_id, Schema* schema, const TabletSchema* tablet_schema,
              const std::vector<SlotDescriptor*>* slot_descs, TupleDescriptor* tuple_desc,
-             KeysType keys_type, RowsetWriter* rowset_writer);
+             KeysType keys_type, RowsetWriter* rowset_writer, MemTracker* mem_tracker);
     ~MemTable();
     int64_t tablet_id() { return _tablet_id; }
     size_t memory_usage();
@@ -57,7 +57,7 @@ private:
     };
 
     RowCursorComparator _row_comparator;
-    std::unique_ptr<MemTracker> _tracker;
+    std::unique_ptr<MemTracker> _mem_tracker;
     std::unique_ptr<MemPool> _mem_pool;
     ObjectPool _agg_object_pool;
 
diff --git a/be/src/olap/memtable_flush_executor.cpp b/be/src/olap/memtable_flush_executor.cpp
index a2a2448..1aa25e9 100644
--- a/be/src/olap/memtable_flush_executor.cpp
+++ b/be/src/olap/memtable_flush_executor.cpp
@@ -21,6 +21,7 @@
 #include "olap/delta_writer.h"
 #include "olap/memtable.h"
 #include "runtime/exec_env.h"
+#include "runtime/mem_tracker.h"
 
 namespace doris {
 
@@ -53,7 +54,8 @@ void FlushHandler::on_flush_finished(const FlushResult& res) {
     }
 }
 
-OLAPStatus MemTableFlushExecutor::create_flush_handler(int64_t path_hash, std::shared_ptr<FlushHandler>* flush_handler) {
+OLAPStatus MemTableFlushExecutor::create_flush_handler(
+        int64_t path_hash, std::shared_ptr<FlushHandler>* flush_handler) {
     int32_t flush_queue_idx = _get_queue_idx(path_hash); 
     flush_handler->reset(new FlushHandler(flush_queue_idx, this));
     return OLAP_SUCCESS;
@@ -140,6 +142,7 @@ void MemTableFlushExecutor::_flush_memtable(int32_t queue_idx) {
         timer.start();
         res.flush_status = ctx.memtable->flush();
         res.flush_time_ns = timer.elapsed_time();
+        res.flush_size_bytes = ctx.memtable->memory_usage();
         // callback
         ctx.flush_handler->on_flush_finished(res);
     }
diff --git a/be/src/olap/memtable_flush_executor.h b/be/src/olap/memtable_flush_executor.h
index a38b4ee..421040f 100644
--- a/be/src/olap/memtable_flush_executor.h
+++ b/be/src/olap/memtable_flush_executor.h
@@ -26,6 +26,7 @@
 #include <utility>
 
 #include "olap/olap_define.h"
+#include "runtime/mem_tracker.h"
 #include "util/blocking_queue.hpp"
 #include "util/counter_cond_variable.hpp"
 #include "util/spinlock.h"
@@ -53,7 +54,8 @@ struct MemTableFlushContext {
 // the flush result of a single memtable flush
 struct FlushResult {
     OLAPStatus flush_status;
-    int64_t flush_time_ns;
+    int64_t flush_time_ns = 0;
+    int64_t flush_size_bytes = 0;
 };
 
 // the statistic of a certain flush handler.
@@ -68,13 +70,15 @@ std::ostream& operator<<(std::ostream& os, const FlushStatistic& stat);
 // This class must be wrapped by std::shared_ptr, or you will get bad_weak_ptr exception
 // when calling submit();
 class MemTableFlushExecutor;
+class MemTracker;
 class FlushHandler : public std::enable_shared_from_this<FlushHandler> {
 public:
     FlushHandler(int32_t flush_queue_idx, MemTableFlushExecutor* flush_executor):
         _flush_queue_idx(flush_queue_idx),
         _last_flush_status(OLAP_SUCCESS),
         _counter_cond(0),
-        _flush_executor(flush_executor) {
+        _flush_executor(flush_executor),
+        _is_cancelled(false) {
     }
 
     // submit a memtable to flush. return error if some previous submitted MemTable has failed  
@@ -91,7 +95,9 @@ public:
         _counter_cond.dec_to_zero();
     }
 
-    bool is_cancelled() { return _last_flush_status.load() != OLAP_SUCCESS; }
+    bool is_cancelled() { return _last_flush_status.load() != OLAP_SUCCESS || _is_cancelled.load(); }
+
+    void cancel() { _is_cancelled.store(true); }
 private:
     // flush queue idx in memtable flush executor
     int32_t _flush_queue_idx;
@@ -102,6 +108,10 @@ private:
 
     FlushStatistic _stats;
     MemTableFlushExecutor* _flush_executor;
+
+    // the caller of the flush handler can set this variable to notify that the
+    // upper-level application has already been cancelled.
+    std::atomic<bool> _is_cancelled;
 };
 
 // MemTableFlushExecutor is for flushing memtables to disk.
@@ -111,7 +121,7 @@ private:
 //
 //      ...
 //      std::shared_ptr<FlushHandler> flush_handler;
-//      memTableFlushExecutor.create_flush_handler(path_hash, &flush_handler);
+//      memTableFlushExecutor.create_flush_handler(path_hash, &flush_handler);
 //      ...      
 //      flush_handler->submit(memtable)
 //      ...
diff --git a/be/src/olap/out_stream.cpp b/be/src/olap/out_stream.cpp
index 3b90029..16ab017 100644
--- a/be/src/olap/out_stream.cpp
+++ b/be/src/olap/out_stream.cpp
@@ -351,7 +351,7 @@ OLAPStatus OutStream::write_to_file(FileHandler* file_handle,
 
     for (std::vector<StorageByteBuffer*>::const_iterator it = _output_buffers.begin();
             it != _output_buffers.end(); ++it) {
-        VLOG(3) << "write stream begin:" << file_handle->tell();
+        VLOG(10) << "write stream begin:" << file_handle->tell();
 
         res = file_handle->write((*it)->array(), (*it)->limit());
         if (OLAP_SUCCESS != res) {
@@ -359,7 +359,7 @@ OLAPStatus OutStream::write_to_file(FileHandler* file_handle,
             return res;
         }
 
-        VLOG(3) << "write stream end:" << file_handle->tell();
+        VLOG(10) << "write stream end:" << file_handle->tell();
 
         total_stream_len += (*it)->limit();
         if (write_mbytes_per_sec > 0) {
@@ -367,7 +367,7 @@ OLAPStatus OutStream::write_to_file(FileHandler* file_handle,
             int64_t sleep_time =
                     total_stream_len / write_mbytes_per_sec - delta_time_us;
             if (sleep_time > 0) {
-                VLOG(3) << "sleep to limit merge speed. time=" << sleep_time
+                VLOG(10) << "sleep to limit merge speed. time=" << sleep_time
                         << ", bytes=" << total_stream_len;
                 usleep(sleep_time);
             }
diff --git a/be/src/olap/rowset/column_reader.cpp b/be/src/olap/rowset/column_reader.cpp
index 6a262a0..0898510 100644
--- a/be/src/olap/rowset/column_reader.cpp
+++ b/be/src/olap/rowset/column_reader.cpp
@@ -700,7 +700,7 @@ OLAPStatus ColumnReader::init(
         _present_reader = NULL;
         _value_present = false;
     } else {
-        VLOG(3) << "create null present_stream for column_id:" << _column_unique_id;
+        VLOG(10) << "create null present_stream for column_id:" << _column_unique_id;
         _present_reader = new(std::nothrow) BitFieldReader(present_stream);
 
         if (NULL == _present_reader) {
diff --git a/be/src/olap/rowset/segment_reader.cpp b/be/src/olap/rowset/segment_reader.cpp
index 650544d..2ccc4a2 100644
--- a/be/src/olap/rowset/segment_reader.cpp
+++ b/be/src/olap/rowset/segment_reader.cpp
@@ -378,14 +378,14 @@ OLAPStatus SegmentReader::_pick_columns() {
 }
 
 OLAPStatus SegmentReader::_pick_delete_row_groups(uint32_t first_block, uint32_t last_block) {
-    VLOG(3) << "pick for " << first_block << " to " << last_block << " for delete_condition";
+    VLOG(10) << "pick for " << first_block << " to " << last_block << " for delete_condition";
 
     if (_delete_handler->empty()) {
         return OLAP_SUCCESS;
     }
 
     if (DEL_NOT_SATISFIED == _delete_status) {
-        VLOG(3) << "the segment not satisfy the delete_conditions";
+        VLOG(10) << "the segment not satisfy the delete_conditions";
         return OLAP_SUCCESS;
     }
 
@@ -431,11 +431,11 @@ OLAPStatus SegmentReader::_pick_delete_row_groups(uint32_t first_block, uint32_t
                 }
             } else if (true == del_partial_satisfied) {
                 _include_blocks[j] = DEL_PARTIAL_SATISFIED;
-                VLOG(3) << "filter block partially: " << j;
+                VLOG(10) << "filter block partially: " << j;
             } else {
                 _include_blocks[j] = DEL_SATISFIED;
                 --_remain_block;
-                VLOG(3) << "filter block: " << j;
+                VLOG(10) << "filter block: " << j;
                 if (j < _block_count - 1) {
                     _stats->rows_del_filtered += _num_rows_in_block;
                 } else {
@@ -467,7 +467,7 @@ OLAPStatus SegmentReader::_init_include_blocks(uint32_t first_block, uint32_t la
 }
 
 OLAPStatus SegmentReader::_pick_row_groups(uint32_t first_block, uint32_t last_block) {
-    VLOG(3) << "pick from " << first_block << " to " << last_block;
+    VLOG(10) << "pick from " << first_block << " to " << last_block;
 
     if (first_block > last_block) {
         OLAP_LOG_WARNING("invalid block offset. [first_block=%u last_block=%u]",
@@ -524,7 +524,7 @@ OLAPStatus SegmentReader::_pick_row_groups(uint32_t first_block, uint32_t last_b
     }
 
     if (_remain_block < MIN_FILTER_BLOCK_NUM) {
-        VLOG(3) << "bloom filter is ignored for too few block remained. "
+        VLOG(10) << "bloom filter is ignored for too few block remained. "
                 << "remain_block=" << _remain_block
                 << ", const_time=" << timer.get_elapse_time_us();
         return OLAP_SUCCESS;
@@ -563,7 +563,7 @@ OLAPStatus SegmentReader::_pick_row_groups(uint32_t first_block, uint32_t last_b
         }
     }
 
-    VLOG(3) << "pick row groups finished. remain_block=" << _remain_block
+    VLOG(10) << "pick row groups finished. remain_block=" << _remain_block
             << ", const_time=" << timer.get_elapse_time_us();
     return OLAP_SUCCESS;
 }
@@ -727,7 +727,7 @@ OLAPStatus SegmentReader::_load_index(bool is_using_cache) {
         }
     }
 
-    VLOG(3) << "found index entry count: " << _block_count;
+    VLOG(10) << "found index entry count: " << _block_count;
     return OLAP_SUCCESS;
 }
 
@@ -832,7 +832,7 @@ OLAPStatus SegmentReader::_seek_to_block_directly(
         PositionProvider position(&_column_indices[cid]->entry(block_id));
         if (OLAP_SUCCESS != (res = _column_readers[cid]->seek(&position))) {
             if (OLAP_ERR_COLUMN_STREAM_EOF == res) {
-                VLOG(3) << "Stream EOF. tablet_id=" << _segment_group->get_tablet_id()
+                VLOG(10) << "Stream EOF. tablet_id=" << _segment_group->get_tablet_id()
                         << ", column_id=" << _column_readers[cid]->column_unique_id()
                         << ", block_id=" << block_id;
                 return OLAP_ERR_DATA_EOF;
@@ -850,7 +850,7 @@ OLAPStatus SegmentReader::_seek_to_block_directly(
 }
 
 OLAPStatus SegmentReader::_reset_readers() {
-    VLOG(3) << _streams.size() << " stream in total.";
+    VLOG(10) << _streams.size() << " stream in total.";
 
     for (std::map<StreamName, ReadOnlyFileStream*>::iterator it = _streams.begin();
             it != _streams.end(); ++it) {
diff --git a/be/src/olap/rowset/segment_writer.cpp b/be/src/olap/rowset/segment_writer.cpp
index c7ddef4..66d3d6a 100644
--- a/be/src/olap/rowset/segment_writer.cpp
+++ b/be/src/olap/rowset/segment_writer.cpp
@@ -194,7 +194,7 @@ OLAPStatus SegmentWriter::_make_file_header(ColumnDataHeaderMessage* file_header
             data_length += stream->get_stream_length();
         }
 
-        VLOG(3) << "stream id=" << it->first.unique_column_id()
+        VLOG(10) << "stream id=" << it->first.unique_column_id()
                 << ", type=" << it->first.kind()
                 << ", length=" << stream->get_stream_length();
     }
@@ -260,7 +260,7 @@ OLAPStatus SegmentWriter::finalize(uint32_t* segment_file_size) {
         // 输出没有被掐掉的流
         if (!stream->is_suppressed()) {
             checksum = stream->crc32(checksum);
-            VLOG(3) << "stream id=" << it->first.unique_column_id()
+            VLOG(10) << "stream id=" << it->first.unique_column_id()
                     << ", type=" << it->first.kind();
             res = stream->write_to_file(
                     &file_handle, _write_mbytes_per_sec);
diff --git a/be/src/olap/rowset_graph.cpp b/be/src/olap/rowset_graph.cpp
index d9f0285..7914469 100644
--- a/be/src/olap/rowset_graph.cpp
+++ b/be/src/olap/rowset_graph.cpp
@@ -257,7 +257,7 @@ OLAPStatus RowsetGraph::capture_consistent_versions(
             << (*version_path)[version_path->size() - 1].second << ' ';
     }
 
-    VLOG(3) << "success to find path for spec_version. "
+    VLOG(10) << "success to find path for spec_version. "
             << "spec_version=" << spec_version.first << "-" << spec_version.second
             << ", path=" << shortest_path_for_debug.str();
 
diff --git a/be/src/olap/stream_index_writer.cpp b/be/src/olap/stream_index_writer.cpp
index 6912dee..6b6d7bb 100755
--- a/be/src/olap/stream_index_writer.cpp
+++ b/be/src/olap/stream_index_writer.cpp
@@ -164,7 +164,7 @@ OLAPStatus StreamIndexWriter::write_to_buffer(char* buffer, size_t buffer_size)
     }
 
     _header.block_count = _index_to_write.size();
-    VLOG(3) << "header info. pos: " << _header.position_format
+    VLOG(10) << "header info. pos: " << _header.position_format
             << ", stat:" << _header.statistic_format
             << ", entry_size:" << entry_size;
     memcpy(buffer, reinterpret_cast<char*>(&_header), sizeof(_header));
diff --git a/be/src/olap/utils.cpp b/be/src/olap/utils.cpp
index b6e3432..1d2ad5e 100644
--- a/be/src/olap/utils.cpp
+++ b/be/src/olap/utils.cpp
@@ -254,7 +254,7 @@ OLAPStatus olap_compress(const char* src_buf,
         int lz4_res = LZ4_compress_default(src_buf, dest_buf, src_len, dest_len);
         *written_len = lz4_res;
         if (0 == lz4_res) {
-            VLOG(3) << "compress failed. src_len=" << src_len
+            VLOG(10) << "compress failed. src_len=" << src_len
                     << ", dest_len=" << dest_len
                     << ", written_len=" << *written_len
                     << ", lz4_res=" << lz4_res;
diff --git a/be/src/runtime/CMakeLists.txt b/be/src/runtime/CMakeLists.txt
index ae89fca..1f0bcc6 100644
--- a/be/src/runtime/CMakeLists.txt
+++ b/be/src/runtime/CMakeLists.txt
@@ -82,7 +82,8 @@ set(RUNTIME_FILES
     buffered_tuple_stream3.cc
     #  export_task_mgr.cpp
     export_sink.cpp
-    tablet_writer_mgr.cpp
+    load_channel_mgr.cpp
+    load_channel.cpp
     tablets_channel.cpp
     bufferpool/buffer_allocator.cc
     bufferpool/buffer_pool.cc
diff --git a/be/src/runtime/exec_env.h b/be/src/runtime/exec_env.h
index dc5557d..2659931 100644
--- a/be/src/runtime/exec_env.h
+++ b/be/src/runtime/exec_env.h
@@ -46,7 +46,7 @@ class ReservationTracker;
 class ResultBufferMgr;
 class ResultQueueMgr;
 class TMasterInfo;
-class TabletWriterMgr;
+class LoadChannelMgr;
 class TestExecEnv;
 class ThreadPool;
 class ThreadResourceMgr;
@@ -120,7 +120,7 @@ public:
     BrpcStubCache* brpc_stub_cache() const { return _brpc_stub_cache; }
     ReservationTracker* buffer_reservation() { return _buffer_reservation; }
     BufferPool* buffer_pool() { return _buffer_pool; }
-    TabletWriterMgr* tablet_writer_mgr() { return _tablet_writer_mgr; }
+    LoadChannelMgr* load_channel_mgr() { return _load_channel_mgr; }
     LoadStreamMgr* load_stream_mgr() { return _load_stream_mgr; }
     SmallFileMgr* small_file_mgr() { return _small_file_mgr; }
 
@@ -169,7 +169,7 @@ private:
     BfdParser* _bfd_parser = nullptr;
     PullLoadTaskMgr* _pull_load_task_mgr = nullptr;
     BrokerMgr* _broker_mgr = nullptr;
-    TabletWriterMgr* _tablet_writer_mgr = nullptr;
+    LoadChannelMgr* _load_channel_mgr = nullptr;
     LoadStreamMgr* _load_stream_mgr = nullptr;
     BrpcStubCache* _brpc_stub_cache = nullptr;
 
diff --git a/be/src/runtime/exec_env_init.cpp b/be/src/runtime/exec_env_init.cpp
index 87184a0..081cf96 100644
--- a/be/src/runtime/exec_env_init.cpp
+++ b/be/src/runtime/exec_env_init.cpp
@@ -32,7 +32,7 @@
 #include "runtime/mem_tracker.h"
 #include "runtime/thread_resource_mgr.h"
 #include "runtime/fragment_mgr.h"
-#include "runtime/tablet_writer_mgr.h"
+#include "runtime/load_channel_mgr.h"
 #include "runtime/tmp_file_mgr.h"
 #include "runtime/bufferpool/reservation_tracker.h"
 #include "util/metrics.h"
@@ -99,7 +99,7 @@ Status ExecEnv::_init(const std::vector<StorePath>& store_paths) {
     _bfd_parser = BfdParser::create();
     _pull_load_task_mgr = new PullLoadTaskMgr(config::pull_load_task_dir);
     _broker_mgr = new BrokerMgr(this);
-    _tablet_writer_mgr = new TabletWriterMgr(this);
+    _load_channel_mgr = new LoadChannelMgr();
     _load_stream_mgr = new LoadStreamMgr();
     _brpc_stub_cache = new BrpcStubCache();
     _stream_load_executor = new StreamLoadExecutor(this);
@@ -126,7 +126,8 @@ Status ExecEnv::_init(const std::vector<StorePath>& store_paths) {
     _broker_mgr->init();
     _small_file_mgr->init();
     _init_mem_tracker();
-    RETURN_IF_ERROR(_tablet_writer_mgr->start_bg_worker());
+
+    RETURN_IF_ERROR(_load_channel_mgr->init(_mem_tracker->limit()));
 
     return Status::OK();
 }
@@ -210,7 +211,7 @@ void ExecEnv::_init_buffer_pool(int64_t min_page_size,
 void ExecEnv::_destory() {
     delete _brpc_stub_cache;
     delete _load_stream_mgr;
-    delete _tablet_writer_mgr;
+    delete _load_channel_mgr;
     delete _broker_mgr;
     delete _pull_load_task_mgr;
     delete _bfd_parser;
diff --git a/be/src/runtime/load_channel.cpp b/be/src/runtime/load_channel.cpp
new file mode 100644
index 0000000..44877bb
--- /dev/null
+++ b/be/src/runtime/load_channel.cpp
@@ -0,0 +1,157 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "runtime/load_channel.h"
+
+#include "runtime/mem_tracker.h"
+#include "runtime/tablets_channel.h"
+#include "olap/lru_cache.h"
+
+namespace doris {
+
+// Builds the channel of one load job. A dedicated MemTracker is created with
+// `mem_limit` as its limit and the load id as its label; `mem_tracker` is
+// handed to it as third constructor argument (presumably the parent tracker).
+LoadChannel::LoadChannel(const UniqueId& load_id, int64_t mem_limit, MemTracker* mem_tracker)
+        : _load_id(load_id),
+          _mem_tracker(new MemTracker(mem_limit, _load_id.to_string(), mem_tracker)),
+          // The timestamp has to be valid before load_channel_mgr inserts this
+          // object into _load_channels, otherwise the gc thread may erase the
+          // channel immediately.
+          _last_updated_time(time(nullptr)) {
+}
+
+// Logs the memory statistics gathered by this load's tracker on destruction.
+LoadChannel::~LoadChannel() {
+    const auto peak_usage = _mem_tracker->peak_consumption();
+    const auto tracker_info = _mem_tracker->debug_string();
+    LOG(INFO) << "load channel mem peak usage: " << peak_usage
+        << ", info: " << tracker_info
+        << ", load id: " << _load_id;
+}
+
+// Opens the tablets channel of the index named by params.index_id(). The
+// channel object is created and registered on first use; later requests for
+// the same index reuse it. On success the load channel is marked as opened
+// and its last-updated timestamp is refreshed.
+Status LoadChannel::open(const PTabletWriterOpenRequest& params) {
+    const int64_t index_id = params.index_id();
+    std::shared_ptr<TabletsChannel> tablets_channel;
+    {
+        std::lock_guard<std::mutex> guard(_lock);
+        auto found = _tablets_channels.find(index_id);
+        if (found == _tablets_channels.end()) {
+            // first request for this index: create and register its channel
+            TabletsChannelKey key(params.id(), index_id);
+            tablets_channel = std::make_shared<TabletsChannel>(key, _mem_tracker.get());
+            _tablets_channels.emplace(index_id, tablets_channel);
+        } else {
+            tablets_channel = found->second;
+        }
+    }
+
+    // the (possibly slow) open itself runs outside of the map lock
+    RETURN_IF_ERROR(tablets_channel->open(params));
+
+    _opened = true;
+    _last_updated_time = time(nullptr);
+    return Status::OK();
+}
+
+// Routes one add-batch request to the tablets channel of request.index_id().
+//  - A request for an index whose channel already finished returns OK.
+//  - A request for an index that was never opened returns InternalError.
+//  - The row batch (if any) is added, and an eos request closes the sender;
+//    once the tablets channel reports that it is finished it is removed from
+//    the map and its index id is remembered in _finished_channel_ids.
+Status LoadChannel::add_batch(
+        const PTabletWriterAddBatchRequest& request,
+        google::protobuf::RepeatedPtrField<PTabletInfo>* tablet_vec) {
+
+    const int64_t index_id = request.index_id();
+    // 1. look up the tablets channel of this index
+    std::shared_ptr<TabletsChannel> tablets_channel;
+    {
+        std::lock_guard<std::mutex> guard(_lock);
+        auto found = _tablets_channels.find(index_id);
+        if (found != _tablets_channels.end()) {
+            tablets_channel = found->second;
+        } else if (_finished_channel_ids.count(index_id) > 0) {
+            // this channel is already finished, just return OK
+            return Status::OK();
+        } else {
+            std::stringstream ss;
+            ss << "load channel " << _load_id << " add batch with unknown index id: " << index_id;
+            return Status::InternalError(ss.str());
+        }
+    }
+
+    // 2. give memory back first if this load is over its limit
+    handle_mem_exceed_limit(false);
+
+    // 3. hand the rows (if the request carries any) to the tablets channel
+    if (request.has_row_batch()) {
+        RETURN_IF_ERROR(tablets_channel->add_batch(request));
+    }
+
+    // 4. eos: close this sender; drop the channel once it reports finished
+    if (request.has_eos() && request.eos()) {
+        bool finished = false;
+        RETURN_IF_ERROR(tablets_channel->close(request.sender_id(), &finished, request.partition_ids(), tablet_vec));
+        if (finished) {
+            std::lock_guard<std::mutex> guard(_lock);
+            _tablets_channels.erase(index_id);
+            _finished_channel_ids.emplace(index_id);
+        }
+    }
+    _last_updated_time = time(nullptr);
+    return Status::OK();
+}
+
+// Checks the memory consumption of this load channel. When the tracker limit
+// is exceeded, or unconditionally when `force` is true, the tablets channel
+// with the largest consumption is asked to reduce its memory usage.
+// The whole method runs under _lock.
+void LoadChannel::handle_mem_exceed_limit(bool force) {
+    // lock so that only one thread can check mem limit
+    std::lock_guard<std::mutex> l(_lock);
+    if (!force && !_mem_tracker->limit_exceeded()) {
+        return;
+    }
+
+    VLOG(1) << "mem consumption: " << _mem_tracker->consumption()
+        << " may exceed limit. force: " << force << ", load id: " << _load_id;
+    std::shared_ptr<TabletsChannel> channel;
+    if (!_find_largest_max_consumption_tablets_channel(&channel)) {
+        // should not happen, add log to observe
+        LOG(WARNING) << "failed to find suitable tablets channel when mem limit execeed: " << _load_id;
+        return;
+    }
+
+    // This method returns void, so a failed flush cannot be propagated to the
+    // caller. Log it instead of silently dropping the returned Status.
+    Status st = channel->reduce_mem_usage();
+    if (!st.ok()) {
+        LOG(WARNING) << "failed to reduce mem usage of tablets channel, load id: " << _load_id
+            << ", err: " << st.get_error_msg();
+    }
+}
+
+// Selects the tablets channel with the largest, strictly positive memory
+// consumption. On success the channel is stored in *channel and true is
+// returned; when no channel reports a consumption above zero, *channel is
+// left untouched and false is returned.
+// lock should be held when calling this method
+bool LoadChannel::_find_largest_max_consumption_tablets_channel(std::shared_ptr<TabletsChannel>* channel) {
+    int64_t largest = 0;
+    bool found = false;
+    for (auto& entry : _tablets_channels) {
+        const int64_t consumed = entry.second->mem_consumption();
+        if (consumed <= largest) {
+            continue;
+        }
+        largest = consumed;
+        *channel = entry.second;
+        found = true;
+    }
+    return found;
+}
+
+// True once the channel has been opened and no tablets channel is left in
+// the map; a channel that was never opened is never reported as finished.
+bool LoadChannel::is_finished() {
+    if (_opened) {
+        std::lock_guard<std::mutex> guard(_lock);
+        return _tablets_channels.empty();
+    }
+    return false;
+}
+
+// Cancels every tablets channel that is still registered, under _lock.
+// Always returns OK.
+Status LoadChannel::cancel() {
+    std::lock_guard<std::mutex> guard(_lock);
+    for (auto& entry : _tablets_channels) {
+        entry.second->cancel();
+    }
+    return Status::OK();
+}
+
+}
diff --git a/be/src/runtime/load_channel.h b/be/src/runtime/load_channel.h
new file mode 100644
index 0000000..9e317ab
--- /dev/null
+++ b/be/src/runtime/load_channel.h
@@ -0,0 +1,90 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <ctime>
+#include <memory>
+#include <mutex>
+#include <unordered_map>
+#include <unordered_set>
+
+#include "common/status.h"
+#include "gen_cpp/Types_types.h"
+#include "gen_cpp/PaloInternalService_types.h"
+#include "gen_cpp/internal_service.pb.h"
+#include "runtime/mem_tracker.h"
+#include "util/uid_util.h"
+
+namespace doris {
+
+class Cache;
+class TabletsChannel;
+
+// A LoadChannel manages tablets channels for all indexes
+// corresponding to a certain load job.
+// It owns one MemTracker for the whole load and keeps one TabletsChannel
+// per index id.
+class LoadChannel {
+public:
+    // load_id:     id of the load job this channel belongs to
+    // mem_limit:   limit given to the MemTracker of this load
+    // mem_tracker: passed on to the MemTracker of this load
+    //              (presumably as its parent tracker)
+    LoadChannel(const UniqueId& load_id, int64_t mem_limit, MemTracker* mem_tracker);
+    ~LoadChannel();
+
+    // open the tablets channel of the index named in the request,
+    // creating it if it does not exist yet
+    Status open(const PTabletWriterOpenRequest& request);
+
+    // this batch must belong to a index in one transaction.
+    // An eos request closes the sender on the tablets channel, which
+    // receives tablet_vec as output argument.
+    Status add_batch(
+            const PTabletWriterAddBatchRequest& request,
+            google::protobuf::RepeatedPtrField<PTabletInfo>* tablet_vec);
+
+    // return true if this load channel has been opened and no tablets channel
+    // is left, i.e. all of them have been closed.
+    bool is_finished();
+
+    // cancel all tablets channels of this load channel
+    Status cancel();
+
+    // timestamp set by the constructor and refreshed by open() and add_batch()
+    time_t last_updated_time() const { return _last_updated_time; }
+
+    const UniqueId& load_id() const { return _load_id; }
+
+    // check if this load channel mem consumption exceeds limit.
+    // If yes, it will pick a tablets channel to try to reduce memory consumption.
+    // If force is true, even if this load channel does not exceeds limit, it will still
+    // try to reduce memory.
+    void handle_mem_exceed_limit(bool force);
+
+    // current consumption reported by the MemTracker of this load
+    int64_t mem_consumption() const { return _mem_tracker->consumption(); }
+
+private:
+    // when mem consumption exceeds limit, should call this to find the max mem consumption channel
+    // and try to reduce its mem usage.
+    // Returns false if no tablets channel with a consumption above zero exists.
+    // _lock should be held when calling this method.
+    bool _find_largest_max_consumption_tablets_channel(std::shared_ptr<TabletsChannel>* channel);
+
+private:
+    UniqueId _load_id;
+    // this mem tracker tracks the total mem consumption of this load task
+    std::unique_ptr<MemTracker> _mem_tracker; 
+
+    // lock protect the tablets channel map
+    std::mutex _lock;
+    // index id -> tablets channel
+    std::unordered_map<int64_t, std::shared_ptr<TabletsChannel>> _tablets_channels;
+    // This is to save finished channels id, to handle the retry request.
+    std::unordered_set<int64_t> _finished_channel_ids;
+    // set to true if at least one tablets channel has been opened
+    bool _opened = false;
+
+    // see last_updated_time()
+    time_t _last_updated_time;
+};
+
+}
diff --git a/be/src/runtime/load_channel_mgr.cpp b/be/src/runtime/load_channel_mgr.cpp
new file mode 100644
index 0000000..181499a
--- /dev/null
+++ b/be/src/runtime/load_channel_mgr.cpp
@@ -0,0 +1,229 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "runtime/load_channel_mgr.h"
+
+#include "runtime/load_channel.h"
+#include "runtime/mem_tracker.h"
+#include "service/backend_options.h"
+#include "util/stopwatch.hpp"
+#include "olap/lru_cache.h"
+
+namespace doris {
+
+// Creates the manager together with the LRU cache (created with argument
+// 1024) that remembers the ids of recently finished loads. The background
+// thread is not started here but in init().
+LoadChannelMgr::LoadChannelMgr()
+        : _lastest_success_channel(new_lru_cache(1024)),
+          _is_stopped(false) {
+}
+
+// Derives the total memory limit of all loads from `process_mem_limit`,
+// creates the manager-level MemTracker with it and starts the background
+// clean thread. Returns the status of starting that thread.
+Status LoadChannelMgr::init(int64_t process_mem_limit) {
+    const int64_t total_load_limit = _calc_total_mem_limit(process_mem_limit);
+    _mem_tracker.reset(new MemTracker(total_load_limit, "load channel mgr"));
+    return _start_bg_worker();
+}
+
+// Computes the memory limit shared by all loads on this backend:
+// config::load_process_max_memory_limit_percent percent of the process limit,
+// capped by config::load_process_max_memory_limit_bytes.
+// A process limit of -1 means "no limit" and is passed through unchanged.
+int64_t LoadChannelMgr::_calc_total_mem_limit(int64_t process_mem_limit) {
+    const bool unlimited = (process_mem_limit == -1);
+    if (unlimited) {
+        return -1;
+    }
+    const int64_t limit_by_percent =
+            process_mem_limit * (config::load_process_max_memory_limit_percent / 100.0);
+    const int64_t limit_by_bytes = config::load_process_max_memory_limit_bytes;
+    return std::min<int64_t>(limit_by_percent, limit_by_bytes);
+}
+
+// Stops the background clean thread, waits for it to exit and then frees
+// the cache of finished load ids.
+LoadChannelMgr::~LoadChannelMgr() {
+    // ask the clean thread to leave its loop ...
+    _is_stopped.store(true);
+    // ... and wait for it, if init() ever started it
+    if (_load_channels_clean_thread.joinable()) {
+        _load_channels_clean_thread.join();
+    }
+    delete _lastest_success_channel;
+}
+
+// Opens the load channel of the load named by params.id(). The LoadChannel is
+// created and registered on first use, with a memory limit derived from the
+// request's load_mem_limit (-1 is used when the request carries none); the
+// request is then forwarded to LoadChannel::open().
+Status LoadChannelMgr::open(const PTabletWriterOpenRequest& params) {
+    UniqueId load_id(params.id());
+    std::shared_ptr<LoadChannel> load_channel;
+    {
+        std::lock_guard<std::mutex> guard(_lock);
+        auto found = _load_channels.find(load_id);
+        if (found == _load_channels.end()) {
+            // first request of this load: create and register its channel
+            const int64_t requested_limit = params.has_load_mem_limit() ? params.load_mem_limit() : -1;
+            const int64_t load_mem_limit = _calc_load_mem_limit(requested_limit);
+            load_channel = std::make_shared<LoadChannel>(load_id, load_mem_limit, _mem_tracker.get());
+            _load_channels.emplace(load_id, load_channel);
+        } else {
+            load_channel = found->second;
+        }
+    }
+
+    // the open itself runs outside of the map lock
+    return load_channel->open(params);
+}
+
+// Computes the memory limit of a single load.
+//   mem_limit == -1: the request carries no limit (old request), a default
+//                    of 2GB is returned.
+//   otherwise:       the requested limit is raised to at least
+//                    config::write_buffer_size and, if the total load limit
+//                    of this backend is set, capped by that total limit.
+int64_t LoadChannelMgr::_calc_load_mem_limit(int64_t mem_limit) {
+    // default mem limit is used to be compatible with old request.
+    // new request should be set load_mem_limit.
+    // The first factor is widened so the product is computed in 64 bits.
+    const int64_t default_load_mem_limit = static_cast<int64_t>(2) * 1024 * 1024 * 1024; // 2GB
+    if (mem_limit == -1) {
+        return default_load_mem_limit;
+    }
+
+    // mem limit of a certain load should be between config::write_buffer_size
+    // and the total load mem limit of this backend
+    int64_t load_mem_limit = std::max<int64_t>(mem_limit, config::write_buffer_size);
+    const int64_t total_limit = _mem_tracker->limit();
+    // A total limit of -1 means "no limit". Taking the minimum with it would
+    // turn the requested limit into -1 and leave this load unlimited as well,
+    // so the cap is only applied when a total limit is actually set.
+    if (total_limit != -1) {
+        load_mem_limit = std::min<int64_t>(total_limit, load_mem_limit);
+    }
+    return load_mem_limit;
+}
+
+// No-op deleter for the entries of _lastest_success_channel: they are inserted
+// with a nullptr value, so there is nothing to release.
+static void dummy_deleter(const CacheKey& /*key*/, void* /*value*/) {
+}
+
+// Routes one add-batch request to the load channel of request.id().
+//  - If the load channel is unknown but the load id is found in the cache of
+//    recently finished loads and the request is an eos request, OK is
+//    returned; any other request for an unknown load id is an InternalError.
+//  - Otherwise the request is forwarded to LoadChannel::add_batch(), and the
+//    load channel is removed and remembered as finished once it reports
+//    is_finished().
+// tablet_vec is passed through to LoadChannel::add_batch().
+// wait_lock_time_ns is not written by this function.
+Status LoadChannelMgr::add_batch(
+        const PTabletWriterAddBatchRequest& request,
+        google::protobuf::RepeatedPtrField<PTabletInfo>* tablet_vec,
+        int64_t* wait_lock_time_ns) {
+
+    UniqueId load_id(request.id());
+    // 1. get load channel
+    std::shared_ptr<LoadChannel> channel;
+    {
+        std::lock_guard<std::mutex> l(_lock);
+        auto it = _load_channels.find(load_id);
+        if (it == _load_channels.end()) {
+            auto handle = _lastest_success_channel->lookup(load_id.to_string());
+            // success only when eos be true
+            if (handle != nullptr) {
+                _lastest_success_channel->release(handle);
+                if (request.has_eos() && request.eos()) {
+                    return Status::OK();
+                }
+            }
+            std::stringstream ss;
+            ss << "load channel manager add batch with unknown load id: " << load_id;
+            return Status::InternalError(ss.str());
+        }
+        channel = it->second;
+    }
+
+    // 2. check if mem consumption exceed limit
+    _handle_mem_exceed_limit();
+
+    // 3. add batch to load channel
+    // The request is forwarded even when it carries no row batch: an eos-only
+    // request still has to reach LoadChannel::add_batch(), which is where the
+    // sender is closed and tablet_vec is filled. LoadChannel::add_batch()
+    // checks has_row_batch() itself before touching any rows.
+    RETURN_IF_ERROR(channel->add_batch(request, tablet_vec));
+
+    // 4. handle finish
+    if (channel->is_finished()) {
+        std::lock_guard<std::mutex> l(_lock);
+        _load_channels.erase(load_id);
+        // remember the load id so that a retried eos request still succeeds
+        auto handle = _lastest_success_channel->insert(
+                load_id.to_string(), nullptr, 1, dummy_deleter);
+        _lastest_success_channel->release(handle);
+    }
+    return Status::OK();
+}
+
+// Checks the total memory consumption of all loads. When the manager-level
+// limit is exceeded, the load channel with the largest consumption is forced
+// to reduce its memory usage. The whole method runs under _lock.
+void LoadChannelMgr::_handle_mem_exceed_limit() {
+    // lock so that only one thread can check mem limit
+    std::lock_guard<std::mutex> guard(_lock);
+    if (!_mem_tracker->limit_exceeded()) {
+        return;
+    }
+
+    VLOG(1) << "total load mem consumption: " << _mem_tracker->consumption()
+        << " exceed limit: " << _mem_tracker->limit();
+
+    // find the load channel that currently consumes the most memory
+    std::shared_ptr<LoadChannel> heaviest;
+    int64_t heaviest_consumption = 0;
+    for (auto& entry : _load_channels) {
+        const int64_t consumed = entry.second->mem_consumption();
+        if (consumed > heaviest_consumption) {
+            heaviest_consumption = consumed;
+            heaviest = entry.second;
+        }
+    }
+    if (heaviest_consumption == 0) {
+        // should not happen, add log to observe
+        LOG(WARNING) << "failed to find suitable load channel when total load mem limit execeed";
+        return;
+    }
+    DCHECK(heaviest.get() != nullptr);
+
+    // force the selected channel to give memory back, even if it is within
+    // its own limit
+    heaviest->handle_mem_exceed_limit(true);
+}
+
+// Cancels the load named by params.id(): the load channel is removed from the
+// map and then cancelled, which cancels all of its tablets channels.
+// Cancelling an unknown load id is not an error. Always returns OK.
+Status LoadChannelMgr::cancel(const PTabletWriterCancelRequest& params) {
+    UniqueId load_id(params.id());
+    std::shared_ptr<LoadChannel> cancelled_channel;
+    {
+        std::lock_guard<std::mutex> l(_lock);
+        auto it = _load_channels.find(load_id);
+        if (it != _load_channels.end()) {
+            cancelled_channel = it->second;
+            _load_channels.erase(it);
+        }
+    }
+
+    // A load channel must be cancelled before it is destroyed, exactly as the
+    // timeout clean-up does; erasing it from the map alone is not enough.
+    // The cancel runs outside of the map lock.
+    if (cancelled_channel != nullptr) {
+        cancelled_channel->cancel();
+        LOG(INFO) << "load channel has been cancelled: " << load_id;
+    }
+    return Status::OK();
+}
+
+// Starts the background thread that runs _start_load_channels_clean() once
+// per `interval` seconds until _is_stopped is set. Always returns OK.
+Status LoadChannelMgr::_start_bg_worker() {
+    _load_channels_clean_thread = std::thread(
+        [this] {
+#ifdef GOOGLE_PROFILER
+            ProfilerRegisterThread();
+#endif
+
+            const uint32_t interval = 60;
+            while (!_is_stopped.load()) {
+                _start_load_channels_clean();
+                // Sleep in one-second slices and re-check the stop flag in
+                // between: the destructor joins this thread, and a single
+                // sleep(interval) would make it wait for up to a full interval.
+                for (uint32_t slept = 0; slept < interval && !_is_stopped.load(); ++slept) {
+                    sleep(1);
+                }
+            }
+        });
+    return Status::OK();
+}
+
+// One round of the background clean-up: every load channel that has not been
+// updated for config::streaming_load_rpc_max_alive_time_sec seconds is removed
+// from the map and cancelled. The current load memory statistics are logged
+// at the end of each round. Always returns OK.
+Status LoadChannelMgr::_start_load_channels_clean() {
+    std::vector<std::shared_ptr<LoadChannel>> expired_channels;
+    const int32_t max_alive_time = config::streaming_load_rpc_max_alive_time_sec;
+    const time_t now = time(nullptr);
+    {
+        std::lock_guard<std::mutex> guard(_lock);
+        for (auto it = _load_channels.begin(); it != _load_channels.end();) {
+            if (difftime(now, it->second->last_updated_time()) < max_alive_time) {
+                ++it;
+                continue;
+            }
+            // keep the channel alive beyond the erase; it is cancelled below,
+            // after the lock has been released
+            const UniqueId expired_id = it->first;
+            expired_channels.emplace_back(it->second);
+            it = _load_channels.erase(it);
+            LOG(INFO) << "erase timeout load channel: " << expired_id;
+        }
+    }
+
+    // we must cancel these load channels before destroying them.
+    // or some object may be invalid before trying to visit it.
+    // eg: MemTracker in load channel
+    for (auto& expired : expired_channels) {
+        expired->cancel();
+        LOG(INFO) << "load channel has been safely deleted: " << expired->load_id();
+    }
+
+    // this log print every 1 min, so that we could observe the mem consumption of load process
+    // on this Backend
+    LOG(INFO) << "load mem consumption(bytes). limit: " << _mem_tracker->limit()
+            << ", current: " << _mem_tracker->consumption()
+            << ", peak: " << _mem_tracker->peak_consumption();
+
+    return Status::OK();
+}
+
+}
diff --git a/be/src/runtime/tablet_writer_mgr.h b/be/src/runtime/load_channel_mgr.h
similarity index 59%
rename from be/src/runtime/tablet_writer_mgr.h
rename to be/src/runtime/load_channel_mgr.h
index 8a7f1e1..64e7a37 100644
--- a/be/src/runtime/tablet_writer_mgr.h
+++ b/be/src/runtime/load_channel_mgr.h
@@ -20,8 +20,6 @@
 #include <unordered_map>
 #include <memory>
 #include <mutex>
-#include <ostream>
-#include <sstream>
 #include <thread>
 #include <ctime>
 
@@ -30,56 +28,59 @@
 #include "gen_cpp/PaloInternalService_types.h"
 #include "gen_cpp/internal_service.pb.h"
 #include "runtime/tablets_channel.h"
-#include "util/hash_util.hpp"
 #include "util/uid_util.h"
 
-#include "service/brpc.h"
-
 namespace doris {
 
-class ExecEnv;
-
 class Cache;
+class LoadChannel;
 
-//  Mgr -> load -> tablet
+// LoadChannelMgr -> LoadChannel -> TabletsChannel -> DeltaWriter
 // All dispached load data for this backend is routed from this class
-class TabletWriterMgr {
+class LoadChannelMgr {
 public:
-    TabletWriterMgr(ExecEnv* exec_env);
-    ~TabletWriterMgr();
+    LoadChannelMgr();
+    ~LoadChannelMgr();
 
-    // open a new backend
+    Status init(int64_t process_mem_limit);
+
+    // open a new load channel if not exist
     Status open(const PTabletWriterOpenRequest& request);
 
-    // this batch must belong to a index in one transaction
-    // when batch.
     Status add_batch(const PTabletWriterAddBatchRequest& request,
                      google::protobuf::RepeatedPtrField<PTabletInfo>* tablet_vec,
                      int64_t* wait_lock_time_ns);
 
     // cancel all tablet stream for 'load_id' load
-    // id: stream load's id
     Status cancel(const PTabletWriterCancelRequest& request);
 
-    Status start_bg_worker();
 
 private:
-    ExecEnv* _exec_env;
-    // lock protect the channel map
-    std::mutex _lock;
+    // calculate the total memory limit of all load processes on this Backend
+    int64_t _calc_total_mem_limit(int64_t process_mem_limit);
+    // calculate the memory limit for a single load process.
+    int64_t _calc_load_mem_limit(int64_t mem_limit);
 
-    // A map from load_id|index_id to load channel
-    butil::FlatMap<
-        TabletsChannelKey,
-        std::shared_ptr<TabletsChannel>,
-        TabletsChannelKeyHasher> _tablets_channels;
+    // check if the total load mem consumption exceeds limit.
+    // If yes, it will pick a load channel to try to reduce memory consumption.
+    void _handle_mem_exceed_limit();
 
+    Status _start_bg_worker();
+
+private:
+    // lock protect the load channel map
+    std::mutex _lock;
+    // load id -> load channel
+    std::unordered_map<UniqueId, std::shared_ptr<LoadChannel>> _load_channels;
     Cache* _lastest_success_channel = nullptr;
 
-    // thread to clean timeout tablets_channel
-    std::thread _tablets_channel_clean_thread;
+    // check the total load mem consumption of this Backend
+    std::unique_ptr<MemTracker> _mem_tracker;
 
-    Status _start_tablets_channel_clean();
+    // thread to clean timeout load channels
+    std::thread _load_channels_clean_thread;
+    Status _start_load_channels_clean();
+    std::atomic<bool> _is_stopped;
 };
 
 }
diff --git a/be/src/runtime/runtime_state.cpp b/be/src/runtime/runtime_state.cpp
index 970aec8..0717b84 100644
--- a/be/src/runtime/runtime_state.cpp
+++ b/be/src/runtime/runtime_state.cpp
@@ -164,7 +164,6 @@ RuntimeState::~RuntimeState() {
         _instance_mem_tracker->close();
     }
 
-#endif
     _instance_mem_tracker.reset();
    
     if (_query_mem_tracker.get() != NULL) {
@@ -172,6 +171,7 @@ RuntimeState::~RuntimeState() {
         _query_mem_tracker->close();
     }
     _query_mem_tracker.reset();
+#endif
 }
 
 Status RuntimeState::init(
diff --git a/be/src/runtime/tablet_writer_mgr.cpp b/be/src/runtime/tablet_writer_mgr.cpp
deleted file mode 100644
index 20593ce..0000000
--- a/be/src/runtime/tablet_writer_mgr.cpp
+++ /dev/null
@@ -1,169 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#include "runtime/tablet_writer_mgr.h"
-
-#include <cstdint>
-#include <unordered_map>
-#include <utility>
-
-#include "common/object_pool.h"
-#include "exec/tablet_info.h"
-#include "runtime/descriptors.h"
-#include "runtime/exec_env.h"
-#include "runtime/mem_tracker.h"
-#include "runtime/row_batch.h"
-#include "runtime/tuple_row.h"
-#include "service/backend_options.h"
-#include "util/stopwatch.hpp"
-#include "olap/delta_writer.h"
-#include "olap/lru_cache.h"
-
-namespace doris {
-
-TabletWriterMgr::TabletWriterMgr(ExecEnv* exec_env) :_exec_env(exec_env) {
-    _tablets_channels.init(2011);
-    _lastest_success_channel = new_lru_cache(1024);
-}
-
-TabletWriterMgr::~TabletWriterMgr() {
-    delete _lastest_success_channel;
-}
-
-Status TabletWriterMgr::open(const PTabletWriterOpenRequest& params) {
-    TabletsChannelKey key(params.id(), params.index_id());
-    std::shared_ptr<TabletsChannel> channel;
-    {
-        std::lock_guard<std::mutex> l(_lock);
-        auto val = _tablets_channels.seek(key);
-        if (val != nullptr) {
-            channel = *val;
-        } else {
-            // create a new 
-            channel.reset(new TabletsChannel(key));
-            _tablets_channels.insert(key, channel);
-        }
-    }
-    RETURN_IF_ERROR(channel->open(params));
-    return Status::OK();
-}
-
-static void dummy_deleter(const CacheKey& key, void* value) {
-}
-
-Status TabletWriterMgr::add_batch(
-        const PTabletWriterAddBatchRequest& request,
-        google::protobuf::RepeatedPtrField<PTabletInfo>* tablet_vec,
-        int64_t* wait_lock_time_ns) {
-    TabletsChannelKey key(request.id(), request.index_id());
-    std::shared_ptr<TabletsChannel> channel;
-    {
-        MonotonicStopWatch timer;
-        timer.start();
-        std::lock_guard<std::mutex> l(_lock);
-        *wait_lock_time_ns += timer.elapsed_time();
-        auto value = _tablets_channels.seek(key);
-        if (value == nullptr) {
-            auto handle = _lastest_success_channel->lookup(key.to_string());
-            // success only when eos be true
-            if (handle != nullptr && request.has_eos() && request.eos()) {
-                _lastest_success_channel->release(handle);
-                return Status::OK();
-            }
-            std::stringstream ss;
-            ss << "TabletWriter add batch with unknown id, key=" << key;
-            return Status::InternalError(ss.str());
-        }
-        channel = *value;
-    }
-    if (request.has_row_batch()) {
-        RETURN_IF_ERROR(channel->add_batch(request));
-    }
-    Status st;
-    if (request.has_eos() && request.eos()) {
-        bool finished = false;
-        st = channel->close(request.sender_id(), &finished, request.partition_ids(), tablet_vec);
-        if (!st.ok()) {
-            LOG(WARNING) << "channle close failed, key=" << key
-                << ", sender_id=" << request.sender_id()
-                << ", err_msg=" << st.get_error_msg();
-        }
-        if (finished) {
-            MonotonicStopWatch timer;
-            timer.start();
-            std::lock_guard<std::mutex> l(_lock);
-            *wait_lock_time_ns += timer.elapsed_time();
-            _tablets_channels.erase(key);
-            if (st.ok()) {
-                auto handle = _lastest_success_channel->insert(
-                    key.to_string(), nullptr, 1, dummy_deleter);
-                _lastest_success_channel->release(handle);
-            }
-        }
-    }
-    return st;
-}
-
-Status TabletWriterMgr::cancel(const PTabletWriterCancelRequest& params) {
-    TabletsChannelKey key(params.id(), params.index_id());
-    {
-        std::lock_guard<std::mutex> l(_lock);
-        _tablets_channels.erase(key);
-    }
-    return Status::OK();
-}
-
-Status TabletWriterMgr::start_bg_worker() {
-    _tablets_channel_clean_thread = std::thread(
-        [this] {
-            #ifdef GOOGLE_PROFILER
-                ProfilerRegisterThread();
-            #endif
-
-            uint32_t interval = 60;
-            while (true) {
-                _start_tablets_channel_clean();
-                sleep(interval);
-            }
-        });
-    _tablets_channel_clean_thread.detach();
-    return Status::OK();
-}
-
-Status TabletWriterMgr::_start_tablets_channel_clean() {
-    const int32_t max_alive_time = config::streaming_load_rpc_max_alive_time_sec;
-    time_t now = time(nullptr);
-    {
-        std::lock_guard<std::mutex> l(_lock);
-        std::vector<TabletsChannelKey> need_delete_keys;
-
-        for (auto& kv : _tablets_channels) {
-            time_t last_updated_time = kv.second->last_updated_time();
-            if (difftime(now, last_updated_time) >= max_alive_time) {
-                need_delete_keys.emplace_back(kv.first);
-            }
-        }
-
-        for(auto& key: need_delete_keys) {
-            _tablets_channels.erase(key);
-            LOG(INFO) << "erase timeout tablets channel: " << key;
-        }
-    }
-    return Status::OK();
-}
-
-}
diff --git a/be/src/runtime/tablets_channel.cpp b/be/src/runtime/tablets_channel.cpp
index b52f1aa..731e861 100644
--- a/be/src/runtime/tablets_channel.cpp
+++ b/be/src/runtime/tablets_channel.cpp
@@ -25,12 +25,11 @@
 
 namespace doris {
 
-TabletsChannel::TabletsChannel(const TabletsChannelKey& key): 
+TabletsChannel::TabletsChannel(
+        const TabletsChannelKey& key,
+        MemTracker* mem_tracker): 
     _key(key), _closed_senders(64) {
-    // _last_updated_time should be set before being inserted to
-    // _tablet_channels in tablet_channel_mgr, or it may be erased
-    // immediately by gc thread.
-    _last_updated_time = time(nullptr);
+    _mem_tracker.reset(new MemTracker(-1, "tablets channel", mem_tracker));
 }
 
 TabletsChannel::~TabletsChannel() {
@@ -62,7 +61,6 @@ Status TabletsChannel::open(const PTabletWriterOpenRequest& params) {
     RETURN_IF_ERROR(_open_all_writers(params));
 
     _opened = true;
-    _last_updated_time = time(nullptr);
     return Status::OK();
 }
 
@@ -82,7 +80,7 @@ Status TabletsChannel::add_batch(const PTabletWriterAddBatchRequest& params) {
         return Status::InternalError("lost data packet");
     }
 
-    RowBatch row_batch(*_row_desc, params.row_batch(), &_mem_tracker);
+    RowBatch row_batch(*_row_desc, params.row_batch(), _mem_tracker.get());
 
     // iterator all data
     for (int i = 0; i < params.tablet_ids_size(); ++i) {
@@ -103,7 +101,6 @@ Status TabletsChannel::add_batch(const PTabletWriterAddBatchRequest& params) {
         }
     }
     _next_seqs[params.sender_id()]++;
-    _last_updated_time = time(nullptr);
     return Status::OK();
 }
 
@@ -158,6 +155,35 @@ Status TabletsChannel::close(int sender_id, bool* finished,
     return Status::OK();
 }
 
+Status TabletsChannel::reduce_mem_usage() {  // flush the memtable of the tablet writer that currently reports the largest memory consumption
+    std::lock_guard<std::mutex> l(_lock);
+    // find the tablet writer with the largest memory consumption
+    int64_t max_consume = 0L;
+    DeltaWriter* writer = nullptr;
+    for (auto& it : _tablet_writers) {
+        if (it.second->mem_consumption() > max_consume) {
+            max_consume = it.second->mem_consumption();
+            writer = it.second;
+        } 
+    }
+
+    if (writer == nullptr || max_consume == 0) {
+        // should rarely happen: no writer reports positive memory consumption, so there is nothing to flush; just return OK
+        return Status::OK();
+    }
+    
+    VLOG(3) << "pick the delte writer to flush, with mem consumption: " << max_consume
+            << ", channel key: " << _key;
+    OLAPStatus st = writer->flush_memtable_and_wait();  // NOTE(review): invoked while _lock is still held; presumably blocks until the flush completes - confirm
+    if (st != OLAP_SUCCESS) {
+        // the flush failed: convert the OLAPStatus into an InternalError for the caller
+        std::stringstream ss;
+        ss << "failed to reduce mem consumption by flushing memtable. err: " << st;
+        return Status::InternalError(ss.str());
+    }
+    return Status::OK();
+}
+
 Status TabletsChannel::_open_all_writers(const PTabletWriterOpenRequest& params) {
     std::vector<SlotDescriptor*>* index_slots = nullptr;
     int32_t schema_hash = 0;
@@ -186,7 +212,7 @@ Status TabletsChannel::_open_all_writers(const PTabletWriterOpenRequest& params)
         request.slots = index_slots;
 
         DeltaWriter* writer = nullptr;
-        auto st = DeltaWriter::open(&request, &writer);
+        auto st = DeltaWriter::open(&request, _mem_tracker.get(),  &writer);
         if (st != OLAP_SUCCESS) {
             std::stringstream ss;
             ss << "open delta writer failed, tablet_id=" << tablet.tablet_id()
@@ -202,6 +228,14 @@ Status TabletsChannel::_open_all_writers(const PTabletWriterOpenRequest& params)
     return Status::OK();
 }
 
+Status TabletsChannel::cancel() {  // cancel every tablet writer registered in _tablet_writers
+    std::lock_guard<std::mutex> l(_lock);
+    for (auto& it : _tablet_writers) {
+        it.second->cancel();  // the writer's own return value is not checked here
+    }
+    return Status::OK();  // always reports OK, regardless of the individual cancel results
+}
+
 std::string TabletsChannelKey::to_string() const {
     std::stringstream ss;
     ss << *this;
diff --git a/be/src/runtime/tablets_channel.h b/be/src/runtime/tablets_channel.h
index b893af8..cb63e36 100644
--- a/be/src/runtime/tablets_channel.h
+++ b/be/src/runtime/tablets_channel.h
@@ -55,13 +55,15 @@ struct TabletsChannelKeyHasher {
     }
 };
 
+std::ostream& operator<<(std::ostream& os, const TabletsChannelKey& key);
+
 class DeltaWriter;
 class OlapTableSchemaParam;
 
 // channel that process all data for this load
 class TabletsChannel {
 public:
-    TabletsChannel(const TabletsChannelKey& key);
+    TabletsChannel(const TabletsChannelKey& key, MemTracker* mem_tracker);
 
     ~TabletsChannel();
 
@@ -73,10 +75,15 @@ public:
         const google::protobuf::RepeatedField<int64_t>& partition_ids,
         google::protobuf::RepeatedPtrField<PTabletInfo>* tablet_vec);
 
-    time_t last_updated_time() {
-        return _last_updated_time;
-    }
+    Status cancel();
 
+    // Upper-level callers may invoke this to try to reduce the memory usage of this channel,
+    // e.g. by flushing the largest memtable immediately.
+    // Returns Status::OK if the flush succeeds or there is nothing to flush, an error otherwise.
+    Status reduce_mem_usage();
+
+    int64_t mem_consumption() const { return _mem_tracker->consumption(); }
+    
 private:
     // open all writer
     Status _open_all_writers(const PTabletWriterOpenRequest& params);
@@ -108,13 +115,8 @@ private:
 
     std::unordered_set<int64_t> _partition_ids;
 
-    // TODO(zc): to add this tracker to somewhere
-    MemTracker _mem_tracker;
-
-    //use to erase timeout TabletsChannel in _tablets_channels
-    time_t _last_updated_time;
+    std::unique_ptr<MemTracker> _mem_tracker;
 };
 
-std::ostream& operator<<(std::ostream& os, const TabletsChannelKey& key);
 
 } // end namespace
diff --git a/be/src/service/internal_service.cpp b/be/src/service/internal_service.cpp
index 13dfc97..1897691 100644
--- a/be/src/service/internal_service.cpp
+++ b/be/src/service/internal_service.cpp
@@ -18,11 +18,11 @@
 #include "service/internal_service.h"
 
 #include "common/config.h"
-#include "runtime/tablet_writer_mgr.h"
 #include "gen_cpp/BackendService.h"
 #include "runtime/exec_env.h"
 #include "runtime/data_stream_mgr.h"
 #include "runtime/fragment_mgr.h"
+#include "runtime/load_channel_mgr.h"
 #include "service/brpc.h"
 #include "util/uid_util.h"
 #include "util/thrift_util.h"
@@ -63,9 +63,9 @@ void PInternalServiceImpl<T>::tablet_writer_open(google::protobuf::RpcController
     VLOG_RPC << "tablet writer open, id=" << request->id()
         << ", index_id=" << request->index_id() << ", txn_id=" << request->txn_id();
     brpc::ClosureGuard closure_guard(done);
-    auto st = _exec_env->tablet_writer_mgr()->open(*request);
+    auto st = _exec_env->load_channel_mgr()->open(*request);
     if (!st.ok()) {
-        LOG(WARNING) << "tablet writer open failed, message=" << st.get_error_msg()
+        LOG(WARNING) << "load channel open failed, message=" << st.get_error_msg()
             << ", id=" << request->id()
             << ", index_id=" << request->index_id()
             << ", txn_id=" << request->txn_id();
@@ -106,7 +106,7 @@ void PInternalServiceImpl<T>::tablet_writer_add_batch(google::protobuf::RpcContr
             int64_t wait_lock_time_ns = 0;
             { 
                 SCOPED_RAW_TIMER(&execution_time_ns);
-                auto st = _exec_env->tablet_writer_mgr()->add_batch(*request, response->mutable_tablet_vec(), &wait_lock_time_ns);
+                auto st = _exec_env->load_channel_mgr()->add_batch(*request, response->mutable_tablet_vec(), &wait_lock_time_ns);
                 if (!st.ok()) {
                     LOG(WARNING) << "tablet writer add batch failed, message=" << st.get_error_msg()
                         << ", id=" << request->id()
@@ -129,7 +129,7 @@ void PInternalServiceImpl<T>::tablet_writer_cancel(google::protobuf::RpcControll
         << ", index_id=" << request->index_id()
         << ", sender_id=" << request->sender_id();
     brpc::ClosureGuard closure_guard(done);
-    auto st = _exec_env->tablet_writer_mgr()->cancel(*request);
+    auto st = _exec_env->load_channel_mgr()->cancel(*request);
     if (!st.ok()) {
         LOG(WARNING) << "tablet writer cancel failed, id=" << request->id()
         << ", index_id=" << request->index_id()
diff --git a/be/test/exec/tablet_sink_test.cpp b/be/test/exec/tablet_sink_test.cpp
index 843ef5c..eed036b 100644
--- a/be/test/exec/tablet_sink_test.cpp
+++ b/be/test/exec/tablet_sink_test.cpp
@@ -336,7 +336,9 @@ TEST_F(OlapTableSinkTest, normal) {
     TQueryOptions query_options;
     query_options.batch_size = 1;
     RuntimeState state(fragment_id, query_options, TQueryGlobals(), &_env);
-    state._instance_mem_tracker.reset(new MemTracker());
+    state.init_mem_trackers(TUniqueId());
+    // state._query_mem_tracker.reset(new MemTracker());
+    // state._instance_mem_tracker.reset(new MemTracker(-1, "test", state._query_mem_tracker.get()));
 
     ObjectPool obj_pool;
     TDescriptorTable tdesc_tbl;
@@ -442,7 +444,7 @@ TEST_F(OlapTableSinkTest, convert) {
     TQueryOptions query_options;
     query_options.batch_size = 1024;
     RuntimeState state(fragment_id, query_options, TQueryGlobals(), &_env);
-    state._instance_mem_tracker.reset(new MemTracker());
+    state.init_mem_trackers(TUniqueId());
 
     ObjectPool obj_pool;
     TDescriptorTable tdesc_tbl;
@@ -569,7 +571,7 @@ TEST_F(OlapTableSinkTest, init_fail1) {
     TQueryOptions query_options;
     query_options.batch_size = 1;
     RuntimeState state(fragment_id, query_options, TQueryGlobals(), &_env);
-    state._instance_mem_tracker.reset(new MemTracker());
+    state.init_mem_trackers(TUniqueId());
 
     ObjectPool obj_pool;
     TDescriptorTable tdesc_tbl;
@@ -627,7 +629,7 @@ TEST_F(OlapTableSinkTest, init_fail3) {
     TQueryOptions query_options;
     query_options.batch_size = 1;
     RuntimeState state(fragment_id, query_options, TQueryGlobals(), &_env);
-    state._instance_mem_tracker.reset(new MemTracker());
+    state.init_mem_trackers(TUniqueId());
 
     ObjectPool obj_pool;
     TDescriptorTable tdesc_tbl;
@@ -686,7 +688,7 @@ TEST_F(OlapTableSinkTest, init_fail4) {
     TQueryOptions query_options;
     query_options.batch_size = 1;
     RuntimeState state(fragment_id, query_options, TQueryGlobals(), &_env);
-    state._instance_mem_tracker.reset(new MemTracker());
+    state.init_mem_trackers(TUniqueId());
 
     ObjectPool obj_pool;
     TDescriptorTable tdesc_tbl;
@@ -753,7 +755,7 @@ TEST_F(OlapTableSinkTest, add_batch_failed) {
     TQueryOptions query_options;
     query_options.batch_size = 1;
     RuntimeState state(fragment_id, query_options, TQueryGlobals(), &_env);
-    state._instance_mem_tracker.reset(new MemTracker());
+    state.init_mem_trackers(TUniqueId());
 
     ObjectPool obj_pool;
     TDescriptorTable tdesc_tbl;
@@ -848,7 +850,7 @@ TEST_F(OlapTableSinkTest, decimal) {
     TQueryOptions query_options;
     query_options.batch_size = 1;
     RuntimeState state(fragment_id, query_options, TQueryGlobals(), &_env);
-    state._instance_mem_tracker.reset(new MemTracker());
+    state.init_mem_trackers(TUniqueId());
 
     ObjectPool obj_pool;
     TDescriptorTable tdesc_tbl;
diff --git a/be/test/olap/delta_writer_test.cpp b/be/test/olap/delta_writer_test.cpp
index 9685aeb..495d011 100644
--- a/be/test/olap/delta_writer_test.cpp
+++ b/be/test/olap/delta_writer_test.cpp
@@ -44,6 +44,7 @@ static const uint32_t MAX_RETRY_TIMES = 10;
 static const uint32_t MAX_PATH_LEN = 1024;
 
 StorageEngine* k_engine = nullptr;
+MemTracker* k_mem_tracker = nullptr;
 
 void set_up() {
     char buffer[MAX_PATH_LEN];
@@ -60,6 +61,8 @@ void set_up() {
 
     ExecEnv* exec_env = doris::ExecEnv::GetInstance();
     exec_env->set_storage_engine(k_engine);
+
+    k_mem_tracker = new MemTracker(-1, "delta writer test");
 }
 
 void tear_down() {
@@ -67,6 +70,7 @@ void tear_down() {
     k_engine = nullptr;
     system("rm -rf ./data_test");
     remove_all_dir(std::string(getenv("DORIS_HOME")) + UNUSED_PREFIX);
+    delete k_mem_tracker;
 }
 
 void create_tablet_request(int64_t tablet_id, int32_t schema_hash, TCreateTabletReq* request) {
@@ -303,7 +307,7 @@ TEST_F(TestDeltaWriter, open) {
     WriteRequest write_req = {10003, 270068375, WriteType::LOAD,
                               20001, 30001, load_id, false, tuple_desc};
     DeltaWriter* delta_writer = nullptr;
-    DeltaWriter::open(&write_req, &delta_writer); 
+    DeltaWriter::open(&write_req, k_mem_tracker, &delta_writer); 
     ASSERT_NE(delta_writer, nullptr);
     res = delta_writer->close();
     ASSERT_EQ(OLAP_SUCCESS, res);
@@ -338,7 +342,7 @@ TEST_F(TestDeltaWriter, write) {
                               20002, 30002, load_id, false, tuple_desc,
                               &(tuple_desc->slots())};
     DeltaWriter* delta_writer = nullptr;
-    DeltaWriter::open(&write_req, &delta_writer); 
+    DeltaWriter::open(&write_req, k_mem_tracker, &delta_writer); 
     ASSERT_NE(delta_writer, nullptr);
 
     Arena arena;
diff --git a/be/test/runtime/CMakeLists.txt b/be/test/runtime/CMakeLists.txt
index ccfe08b..9694ab0 100644
--- a/be/test/runtime/CMakeLists.txt
+++ b/be/test/runtime/CMakeLists.txt
@@ -52,7 +52,7 @@ ADD_BE_TEST(mem_limit_test)
 ADD_BE_TEST(buffered_block_mgr2_test)
 ADD_BE_TEST(buffered_tuple_stream2_test)
 ADD_BE_TEST(stream_load_pipe_test)
-ADD_BE_TEST(tablet_writer_mgr_test)
+ADD_BE_TEST(load_channel_mgr_test)
 #ADD_BE_TEST(export_task_mgr_test)
 ADD_BE_TEST(snapshot_loader_test)
 ADD_BE_TEST(user_function_cache_test)
diff --git a/be/test/runtime/tablet_writer_mgr_test.cpp b/be/test/runtime/load_channel_mgr_test.cpp
similarity index 94%
rename from be/test/runtime/tablet_writer_mgr_test.cpp
rename to be/test/runtime/load_channel_mgr_test.cpp
index d42ad84..8997d77 100644
--- a/be/test/runtime/tablet_writer_mgr_test.cpp
+++ b/be/test/runtime/load_channel_mgr_test.cpp
@@ -15,7 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
-#include "runtime/tablet_writer_mgr.h"
+#include "runtime/load_channel_mgr.h"
 
 #include <gtest/gtest.h>
 
@@ -43,7 +43,7 @@ OLAPStatus close_status;
 int64_t wait_lock_time_ns;
 
 // mock
-DeltaWriter::DeltaWriter(WriteRequest* req, StorageEngine* storage_engine) : _req(*req) {
+DeltaWriter::DeltaWriter(WriteRequest* req, MemTracker* mem_tracker, StorageEngine* storage_engine) : _req(*req) {
 }
 
 DeltaWriter::~DeltaWriter() {
@@ -53,11 +53,11 @@ OLAPStatus DeltaWriter::init() {
     return OLAP_SUCCESS;
 }
 
-OLAPStatus DeltaWriter::open(WriteRequest* req, DeltaWriter** writer) {
+OLAPStatus DeltaWriter::open(WriteRequest* req, MemTracker* mem_tracker, DeltaWriter** writer) {
     if (open_status != OLAP_SUCCESS) {
         return open_status;
     }
-    *writer = new DeltaWriter(req, nullptr);
+    *writer = new DeltaWriter(req, mem_tracker, nullptr);
     return open_status;
 }
 
@@ -82,10 +82,17 @@ OLAPStatus DeltaWriter::cancel() {
     return OLAP_SUCCESS;
 }
 
-class TabletWriterMgrTest : public testing::Test {
+OLAPStatus DeltaWriter::flush_memtable_and_wait() {
+    return OLAP_SUCCESS;
+}
+
+int64_t DeltaWriter::partition_id() const { return 1L; }
+int64_t DeltaWriter::mem_consumption() const { return 1024L; }
+
+class LoadChannelMgrTest : public testing::Test {
 public:
-    TabletWriterMgrTest() { }
-    virtual ~TabletWriterMgrTest() { }
+    LoadChannelMgrTest() { }
+    virtual ~LoadChannelMgrTest() { }
     void SetUp() override {
         _k_tablet_recorder.clear();
         open_status = OLAP_SUCCESS;
@@ -95,7 +102,7 @@ public:
 private:
 };
 
-TEST_F(TabletWriterMgrTest, check_builder) {
+TEST_F(LoadChannelMgrTest, check_builder) {
     TDescriptorTableBuilder table_builder;
     {
         TTupleDescriptorBuilder tuple;
@@ -150,9 +157,10 @@ void create_schema(DescriptorTbl* desc_tbl, POlapTableSchemaParam* pschema) {
     indexes->set_schema_hash(123);
 }
 
-TEST_F(TabletWriterMgrTest, normal) {
+TEST_F(LoadChannelMgrTest, normal) {
     ExecEnv env;
-    TabletWriterMgr mgr(&env);
+    LoadChannelMgr mgr;
+    mgr.init(-1);
 
     auto tdesc_tbl = create_descriptor_table();
     ObjectPool obj_pool;
@@ -238,9 +246,10 @@ TEST_F(TabletWriterMgrTest, normal) {
     ASSERT_EQ(_k_tablet_recorder[21], 1);
 }
 
-TEST_F(TabletWriterMgrTest, cancel) {
+TEST_F(LoadChannelMgrTest, cancel) {
     ExecEnv env;
-    TabletWriterMgr mgr(&env);
+    LoadChannelMgr mgr;
+    mgr.init(-1);
 
     auto tdesc_tbl = create_descriptor_table();
     ObjectPool obj_pool;
@@ -280,9 +289,10 @@ TEST_F(TabletWriterMgrTest, cancel) {
     }
 }
 
-TEST_F(TabletWriterMgrTest, open_failed) {
+TEST_F(LoadChannelMgrTest, open_failed) {
     ExecEnv env;
-    TabletWriterMgr mgr(&env);
+    LoadChannelMgr mgr;
+    mgr.init(-1);
 
     auto tdesc_tbl = create_descriptor_table();
     ObjectPool obj_pool;
@@ -313,9 +323,10 @@ TEST_F(TabletWriterMgrTest, open_failed) {
     }
 }
 
-TEST_F(TabletWriterMgrTest, add_failed) {
+TEST_F(LoadChannelMgrTest, add_failed) {
     ExecEnv env;
-    TabletWriterMgr mgr(&env);
+    LoadChannelMgr mgr;
+    mgr.init(-1);
 
     auto tdesc_tbl = create_descriptor_table();
     ObjectPool obj_pool;
@@ -399,9 +410,10 @@ TEST_F(TabletWriterMgrTest, add_failed) {
     }
 }
 
-TEST_F(TabletWriterMgrTest, close_failed) {
+TEST_F(LoadChannelMgrTest, close_failed) {
     ExecEnv env;
-    TabletWriterMgr mgr(&env);
+    LoadChannelMgr mgr;
+    mgr.init(-1);
 
     auto tdesc_tbl = create_descriptor_table();
     ObjectPool obj_pool;
@@ -490,9 +502,10 @@ TEST_F(TabletWriterMgrTest, close_failed) {
     }
 }
 
-TEST_F(TabletWriterMgrTest, unknown_tablet) {
+TEST_F(LoadChannelMgrTest, unknown_tablet) {
     ExecEnv env;
-    TabletWriterMgr mgr(&env);
+    LoadChannelMgr mgr;
+    mgr.init(-1);
 
     auto tdesc_tbl = create_descriptor_table();
     ObjectPool obj_pool;
@@ -575,9 +588,10 @@ TEST_F(TabletWriterMgrTest, unknown_tablet) {
     }
 }
 
-TEST_F(TabletWriterMgrTest, duplicate_packet) {
+TEST_F(LoadChannelMgrTest, duplicate_packet) {
     ExecEnv env;
-    TabletWriterMgr mgr(&env);
+    LoadChannelMgr mgr;
+    mgr.init(-1);
 
     auto tdesc_tbl = create_descriptor_table();
     ObjectPool obj_pool;
diff --git a/docs/documentation/cn/administrator-guide/alter-table/alter-table-schema-change.md b/docs/documentation/cn/administrator-guide/alter-table/alter-table-schema-change.md
index 0ac957e..bffabfc 100644
--- a/docs/documentation/cn/administrator-guide/alter-table/alter-table-schema-change.md
+++ b/docs/documentation/cn/administrator-guide/alter-table/alter-table-schema-change.md
@@ -171,7 +171,7 @@ ADD COLUMN k5 INT default "1" to rollup2;
 
     如修改列 `k1 INT SUM NULL DEFAULT "1"` 类型为 BIGINT,则需执行命令如下:
     
-    ```ALTER TABLE tbl1 MODIFY COLUMN `k1 BIGINT SUM NULL DEFAULT "1";```
+    ```ALTER TABLE tbl1 MODIFY COLUMN `k1` BIGINT SUM NULL DEFAULT "1";```
     
     注意,除新的列类型外,如聚合方式,Nullable 属性,以及默认值都要按照原信息补全。
     
diff --git a/docs/documentation/cn/administrator-guide/load-data/broker-load-manual.md b/docs/documentation/cn/administrator-guide/load-data/broker-load-manual.md
index a50c8c6..fe12579 100644
--- a/docs/documentation/cn/administrator-guide/load-data/broker-load-manual.md
+++ b/docs/documentation/cn/administrator-guide/load-data/broker-load-manual.md
@@ -192,9 +192,7 @@ Label 的另一个作用,是防止用户重复导入相同的数据。**强烈
     
 + exec\_mem\_limit
 
-    导入任务的内存使用上限。当导入任务使用的内存超过设定上限时,导入任务会被 CANCEL。默认为 2G,单位为字节。
-    
-    当导入出现 `Memory exceed limit` 错误时,可以适当调整这个参数,如调整为 4G、8G 等。
+    导入内存限制。默认是 2GB。单位为字节。
 
 + strict\_mode
 
diff --git a/docs/documentation/cn/administrator-guide/load-data/load-manual.md b/docs/documentation/cn/administrator-guide/load-data/load-manual.md
index 6d8bff4..0798c45 100644
--- a/docs/documentation/cn/administrator-guide/load-data/load-manual.md
+++ b/docs/documentation/cn/administrator-guide/load-data/load-manual.md
@@ -118,6 +118,17 @@ Doris 目前的导入方式分为两类,同步和异步。如果是外部程
 ### 注意事项
 无论是异步还是同步的导入类型,都不应该在 Doris 返回导入失败或导入创建失败后,无休止的重试。**外部系统在有限次数重试并失败后,保留失败信息,大部分多次重试均失败问题都是使用方法问题或数据本身问题。**
 
+## 内存限制
+
+用户可以通过设置参数来限制单个导入的内存使用,以防止导入占用过多的内存而导致系统OOM。
+不同导入方式限制内存的方式略有不同,可以参阅各自的导入手册查看。
+
+一个导入作业通常会分布在多个 Backend 上执行,导入内存限制的是一个导入作业,在单个 Backend 上的内存使用,而不是在整个集群的内存使用。
+
+同时,每个 Backend 会设置可用于导入的内存的总体上限。具体配置参阅下面的通用系统配置小节。这个配置限制了所有在该 Backend 上运行的导入任务的总体内存使用上限。
+
+较小的内存限制可能会影响导入效率,因为导入流程可能会因为内存达到上限而频繁的将内存中的数据写回磁盘。而过大的内存限制可能导致当导入并发较高时,系统OOM。所以,需要根据需求,合理的设置导入的内存限制。
+
 ## 最佳实践
 
 用户在接入 Doris 导入时,一般会采用程序接入的方式,这样可以保证数据被定期的导入到 Doris 中。下面主要说明了程序接入 Doris 的最佳实践。
@@ -127,6 +138,7 @@ Doris 目前的导入方式分为两类,同步和异步。如果是外部程
 3. 确定导入方式的类型:导入方式为同步或异步。比如 Broker load 为异步导入方式,则外部系统在提交创建导入后,必须调用查看导入命令,根据查看导入命令的结果来判断导入是否成功。
 4. 制定 Label 生成策略:Label 生成策略需满足,每一批次数据唯一且固定的原则。这样 Doris 就可以保证 At-Most-Once。
 5. 程序自身保证 At-Least-Once:外部系统需要保证自身的 At-Least-Once,这样就可以保证导入流程的 Exactly-Once。
+6. 
 
 ## 通用系统配置
 
@@ -169,3 +181,10 @@ Doris 目前的导入方式分为两类,同步和异步。如果是外部程
 + streaming\_load\_rpc\_max\_alive\_time\_sec
 
     在导入过程中,Doris 会为每一个 Tablet 开启一个 Writer,用于接收数据并写入。这个参数指定了 Writer 的等待超时时间。如果在这个时间内,Writer 没有收到任何数据,则 Writer 会被自动销毁。当系统处理速度较慢时,Writer 可能长时间接收不到下一批数据,导致导入报错:`TabletWriter add batch with unknown id`。此时可适当增大这个配置。默认为 600 秒。
+    
+* load\_process\_max\_memory\_limit\_bytes 和 load\_process\_max\_memory\_limit\_percent
+
+    这两个参数,限制了单个 Backend 上,可用于导入任务的内存上限。分别是最大内存和最大内存百分比。`load_process_max_memory_limit_percent` 默认为 80%,该值为 `mem_limit` 配置的 80%。即假设物理内存为 M,则默认导入内存限制为 M * 80% * 80%。
+
+    `load_process_max_memory_limit_bytes` 默认为 100GB。系统会在两个参数中取较小者,作为最终的 Backend 导入内存使用上限。
+
diff --git a/docs/documentation/cn/administrator-guide/load-data/stream-load-manual.md b/docs/documentation/cn/administrator-guide/load-data/stream-load-manual.md
index 662558e..9dad1f4 100644
--- a/docs/documentation/cn/administrator-guide/load-data/stream-load-manual.md
+++ b/docs/documentation/cn/administrator-guide/load-data/stream-load-manual.md
@@ -114,6 +114,10 @@ Stream load 由于使用的是 HTTP 协议,所以所有导入任务有关的
     其中 tmp_*是一个占位符,代表的是原始文件中的两个原始列。
     ```
 
++ exec\_mem\_limit
+
+    导入内存限制。默认为 2GB,单位为字节。
+
 + strict\_mode
 
     Stream load 导入可以开启 strict mode 模式。开启方式为在 HEADER 中声明 ```strict_mode=true``` 。默认的 strict mode 为开启。
diff --git a/docs/documentation/cn/sql-reference/sql-statements/Data Manipulation/BROKER LOAD.md b/docs/documentation/cn/sql-reference/sql-statements/Data Manipulation/BROKER LOAD.md
index 5dc7ec0..99bc7ed 100644
--- a/docs/documentation/cn/sql-reference/sql-statements/Data Manipulation/BROKER LOAD.md	
+++ b/docs/documentation/cn/sql-reference/sql-statements/Data Manipulation/BROKER LOAD.md	
@@ -135,8 +135,7 @@
         可以指定如下参数:
         timeout:         指定导入操作的超时时间。默认超时为4小时。单位秒。
         max_filter_ratio:最大容忍可过滤(数据不规范等原因)的数据比例。默认零容忍。
-        exec_mem_limit:   设置导入使用的内存上限。默认为2G,单位字节。这里是指单个 BE 节点的内存上限。
-                          一个导入可能分布于多个BE。我们假设 1GB 数据在单个节点处理需要最大5GB内存。那么假设1GB文件分布在2个节点处理,那么理论上,每个节点需要内存为2.5GB。则该参数可以设置为 2684354560,即2.5GB
+        exec_mem_limit:  导入内存限制。默认为 2GB。单位为字节。
         strict mode:     是否对数据进行严格限制。默认为true。
         timezone:         指定某些受时区影响的函数的时区,如 strftime/alignment_timestamp/from_unixtime 等等,具体请查阅 [时区] 文档。如果不指定,则使用 "Asia/Shanghai" 时区。
 
@@ -359,4 +358,5 @@
         )
      
 ## keyword
+
     BROKER,LOAD
diff --git a/docs/documentation/cn/sql-reference/sql-statements/Data Manipulation/STREAM LOAD.md b/docs/documentation/cn/sql-reference/sql-statements/Data Manipulation/STREAM LOAD.md
index 507373b..e871b46 100644
--- a/docs/documentation/cn/sql-reference/sql-statements/Data Manipulation/STREAM LOAD.md	
+++ b/docs/documentation/cn/sql-reference/sql-statements/Data Manipulation/STREAM LOAD.md	
@@ -45,6 +45,8 @@
         strict_mode: 用户指定此次导入是否开启严格模式,默认为开启。关闭方式为 -H "strict_mode: false"。
 
         timezone: 指定本次导入所使用的时区。默认为东八区。该参数会影响所有导入涉及的和时区有关的函数结果。
+        
+        exec_mem_limit: 导入内存限制。默认为 2GB。单位为字节。
 
     RETURN VALUES
         导入完成后,会以Json格式返回这次导入的相关内容。当前包括一下字段
diff --git a/docs/documentation/en/administrator-guide/load-data/broker-load-manual_EN.md b/docs/documentation/en/administrator-guide/load-data/broker-load-manual_EN.md
index eb164df..87304d7 100644
--- a/docs/documentation/en/administrator-guide/load-data/broker-load-manual_EN.md
+++ b/docs/documentation/en/administrator-guide/load-data/broker-load-manual_EN.md
@@ -188,9 +188,7 @@ The following is a detailed explanation of some parameters of the import operati
 
 * exec\_mem\_limit
 
-	The upper limit of memory usage for import tasks. When the memory used by the import task exceeds the set upper limit, the import task will be CANCEL. The default is 2G in bytes.
-
-	When `Memory exceed limit` error occurs in the import, this parameter can be adjusted appropriately, such as 4G, 8G, etc.
+	Memory limit. Default is 2GB. Unit is Bytes.
 
 + strict\_mode
 
diff --git a/docs/documentation/en/administrator-guide/load-data/load-manual_EN.md b/docs/documentation/en/administrator-guide/load-data/load-manual_EN.md
index 4c7f43d..346e2ad 100644
--- a/docs/documentation/en/administrator-guide/load-data/load-manual_EN.md
+++ b/docs/documentation/en/administrator-guide/load-data/load-manual_EN.md
@@ -119,6 +119,17 @@ Operation steps:
 ### Notes
 Neither asynchronous nor synchronous import types should be retried endlessly after Doris returns an import failure or an import creation failure. **After a limited number of retries and failures, the external system retains the failure information. Most of the retries fail because of the problem of using method or data itself.**
 
+## Memory Limit
+
+Users can limit the memory usage of a single load job by setting parameters, which prevents a load from taking up too much memory and causing a system OOM.
+Different load methods limit memory in slightly different ways; refer to the respective load manuals for details.
+
+A load job is usually distributed across multiple Backends. The load memory limit applies to the memory usage of a load job on a single Backend, not to its memory usage across the whole cluster.
+
+At the same time, each Backend sets an overall upper limit on the memory available for loads. See the General System Configuration section below for the specific configuration. This configuration limits the total memory usage of all load tasks running on that Backend.
+
+A smaller memory limit may reduce load efficiency, because the load process may frequently flush in-memory data to disk when memory reaches the limit. An excessively large memory limit may cause a system OOM when load concurrency is high. Therefore, set the load memory limit appropriately according to your needs.
+
 ## Best Practices
 
 When users access Doris import, they usually use program access mode to ensure that data is imported into Doris regularly. Below is a brief description of the best practices for program access to Doris.
@@ -170,3 +181,9 @@ The following configuration belongs to the BE system configuration, which can be
 + streaming\_load\_rpc\_max\_alive\_time\_sec
 
 	During the import process, Doris opens a Writer for each Tablet to receive and write data. This parameter specifies Writer's waiting timeout time. If Writer does not receive any data at this time, Writer will be destroyed automatically. When the system processing speed is slow, Writer may not receive the next batch of data for a long time, resulting in import error: `Tablet Writer add batch with unknown id`. This configuration can be increased appropriately at this time. The default is  [...]
+
+* load\_process\_max\_memory\_limit\_bytes and load\_process\_max\_memory\_limit\_percent
+
+    These two parameters limit the memory that can be used by load tasks on a single Backend. They are the maximum memory in bytes and the maximum memory percentage, respectively. `load_process_max_memory_limit_percent` defaults to 80%, which is 80% of the `mem_limit` configuration. That is, if the physical memory is M, the default load memory limit is M * 80% * 80%.
+
+     `load_process_max_memory_limit_bytes` defaults to 100GB. The system takes the smaller of the two parameters as the final Backend load memory usage limit.
\ No newline at end of file
diff --git a/docs/documentation/en/administrator-guide/load-data/stream-load-manual_EN.md b/docs/documentation/en/administrator-guide/load-data/stream-load-manual_EN.md
index 687a473..a397997 100644
--- a/docs/documentation/en/administrator-guide/load-data/stream-load-manual_EN.md
+++ b/docs/documentation/en/administrator-guide/load-data/stream-load-manual_EN.md
@@ -114,6 +114,10 @@ The number of rows in the original file = `dpp.abnorm.ALL + dpp.norm.ALL`
 	Tmp_* is a placeholder, representing two original columns in the original file.
 	```
 
++ exec\_mem\_limit
+
+    Memory limit. Default is 2GB. Unit is Bytes.
+
 ### Return results
 
 Since Stream load is a synchronous import method, the result of the import is directly returned to the user by creating the return value of the import.
diff --git a/docs/documentation/en/sql-reference/sql-statements/Data Manipulation/BROKER LOAD_EN.md b/docs/documentation/en/sql-reference/sql-statements/Data Manipulation/BROKER LOAD_EN.md
index 2b82084..ffec852 100644
--- a/docs/documentation/en/sql-reference/sql-statements/Data Manipulation/BROKER LOAD_EN.md	
+++ b/docs/documentation/en/sql-reference/sql-statements/Data Manipulation/BROKER LOAD_EN.md	
@@ -146,10 +146,8 @@
 
         max_filter_ratio: Data ratio of maximum tolerance filterable (data irregularity, etc.). Default zero tolerance.
 
-        exc_mem_limit: Sets the upper memory limit for import. The default is 2G, per byte. This is the upper memory limit for a single BE node.
-
-        A load may be distributed over multiple BEs. Let's assume that 1GB data needs up to 5GB of memory for processing at a single node. Suppose 1GB files are distributed over two nodes, then theoretically, each node needs 2.5GB of memory. Then the parameter can be set to 268454560, or 2.5GB.
-
+        exec_mem_limit: Memory limit. Default is 2GB. Unit is Bytes.
+        
         strict_mode: Whether the data is strictly restricted. The default is true.
 
         timezone: Specify time zones for functions affected by time zones, such as strftime/alignment_timestamp/from_unixtime, etc. See the documentation for details. If not specified, use the "Asia/Shanghai" time zone.
diff --git a/docs/documentation/en/sql-reference/sql-statements/Data Manipulation/STREAM LOAD_EN.md b/docs/documentation/en/sql-reference/sql-statements/Data Manipulation/STREAM LOAD_EN.md
index 531c3bd..8217471 100644
--- a/docs/documentation/en/sql-reference/sql-statements/Data Manipulation/STREAM LOAD_EN.md	
+++ b/docs/documentation/en/sql-reference/sql-statements/Data Manipulation/STREAM LOAD_EN.md	
@@ -77,10 +77,14 @@ Specifies the timeout for the load. Unit seconds. The default is 600 seconds. Th
 
 The user specifies whether strict load mode is enabled for this load. The default is enabled. The shutdown mode is `-H "strict_mode: false"`.
 
-`Timezone`
+`timezone`
 
 Specifies the time zone used for this load. The default is East Eight District. This parameter affects all function results related to the time zone involved in the load.
 
+`exec_mem_limit`
+
+Memory limit. Default is 2GB. Unit is Bytes.
+
 RETURN VALUES
 
 After the load is completed, the related content of this load will be returned in Json format. Current field included
diff --git a/fe/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java b/fe/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java
index a2a0b7d..8d26857 100644
--- a/fe/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java
+++ b/fe/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java
@@ -144,6 +144,7 @@ public class StreamLoadPlanner {
         TQueryOptions queryOptions = new TQueryOptions();
         queryOptions.setQuery_type(TQueryType.LOAD);
         queryOptions.setQuery_timeout(streamLoadTask.getTimeout());
+        queryOptions.setMem_limit(streamLoadTask.getMemLimit());
         params.setQuery_options(queryOptions);
         TQueryGlobals queryGlobals = new TQueryGlobals();
         queryGlobals.setNow_string(DATE_FORMAT.format(new Date()));
diff --git a/fe/src/main/java/org/apache/doris/task/StreamLoadTask.java b/fe/src/main/java/org/apache/doris/task/StreamLoadTask.java
index a42b387..faa4987 100644
--- a/fe/src/main/java/org/apache/doris/task/StreamLoadTask.java
+++ b/fe/src/main/java/org/apache/doris/task/StreamLoadTask.java
@@ -62,6 +62,7 @@ public class StreamLoadTask {
     private boolean strictMode = true;
     private String timezone = TimeUtils.DEFAULT_TIME_ZONE;
     private int timeout = Config.stream_load_default_timeout_second;
+    private long execMemLimit = 2 * 1024 * 1024 * 1024L; // default is 2GB
 
     public StreamLoadTask(TUniqueId id, long txnId, TFileType fileType, TFileFormatType formatType) {
         this.id = id;
@@ -162,6 +163,9 @@ public class StreamLoadTask {
             timezone = request.getTimezone();
             TimeUtils.checkTimeZoneValid(timezone);
         }
+        if (request.isSetExecMemLimit()) {
+            execMemLimit = request.getExecMemLimit();
+        }
     }
 
     public static StreamLoadTask fromRoutineLoadJob(RoutineLoadJob routineLoadJob) {
@@ -243,4 +247,8 @@ public class StreamLoadTask {
         columnSeparator = new ColumnSeparator(oriSeparator);
         columnSeparator.analyze();
     }
+
+    public long getMemLimit() {
+        return execMemLimit;
+    }
 }
diff --git a/gensrc/proto/internal_service.proto b/gensrc/proto/internal_service.proto
index 3a1ec7b..ae55901 100644
--- a/gensrc/proto/internal_service.proto
+++ b/gensrc/proto/internal_service.proto
@@ -68,6 +68,7 @@ message PTabletWriterOpenRequest {
     repeated PTabletWithPartition tablets = 5;
     required int32 num_senders = 6;
     required bool need_gen_rollup = 7;
+    optional int64 load_mem_limit = 8;
 };
 
 message PTabletWriterOpenResult {
diff --git a/gensrc/thrift/FrontendService.thrift b/gensrc/thrift/FrontendService.thrift
index 0aa4dbb..aa22875 100644
--- a/gensrc/thrift/FrontendService.thrift
+++ b/gensrc/thrift/FrontendService.thrift
@@ -533,6 +533,7 @@ struct TStreamLoadPutRequest {
     18: optional i32 timeout
     19: optional bool strictMode
     20: optional string timezone
+    21: optional i64 execMemLimit
 }
 
 struct TStreamLoadPutResult {
diff --git a/run-ut.sh b/run-ut.sh
index a915738..6edfa50 100755
--- a/run-ut.sh
+++ b/run-ut.sh
@@ -209,7 +209,7 @@ ${DORIS_TEST_BINARY_DIR}/runtime/string_value_test
 ${DORIS_TEST_BINARY_DIR}/runtime/free_list_test
 ${DORIS_TEST_BINARY_DIR}/runtime/string_buffer_test
 ${DORIS_TEST_BINARY_DIR}/runtime/stream_load_pipe_test
-${DORIS_TEST_BINARY_DIR}/runtime/tablet_writer_mgr_test
+${DORIS_TEST_BINARY_DIR}/runtime/load_channel_mgr_test
 ${DORIS_TEST_BINARY_DIR}/runtime/snapshot_loader_test
 ${DORIS_TEST_BINARY_DIR}/runtime/user_function_cache_test
 ${DORIS_TEST_BINARY_DIR}/runtime/small_file_mgr_test


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org