You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by mo...@apache.org on 2022/04/26 14:40:07 UTC

[incubator-doris] branch stream-load-vec updated: [Refactor] Refactor the code of vec stream load (#9157)

This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch stream-load-vec
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git


The following commit(s) were added to refs/heads/stream-load-vec by this push:
     new c0c4ce8e18 [Refactor] Refactor the code of vec stream load (#9157)
c0c4ce8e18 is described below

commit c0c4ce8e18b5f45adcb05c6c71631a4cda513c7b
Author: HappenLee <ha...@hotmail.com>
AuthorDate: Tue Apr 26 22:39:59 2022 +0800

    [Refactor] Refactor the code of vec stream load (#9157)
---
 be/src/common/config.h                             |   2 -
 be/src/exec/tablet_sink.cpp                        |  27 +-
 be/src/exec/tablet_sink.h                          |  46 +-
 be/src/olap/delta_writer.cpp                       |  45 +-
 be/src/olap/delta_writer.h                         |  20 +-
 be/src/olap/memtable.cpp                           | 183 ++---
 be/src/olap/memtable.h                             |  58 +-
 be/src/olap/row_cursor_cell.h                      |  64 +-
 be/src/olap/rowset/beta_rowset_writer.cpp          |  70 +-
 be/src/olap/rowset/segment_v2/segment_writer.cpp   |  13 +-
 be/src/olap/rowset/segment_v2/segment_writer.h     |   8 +-
 be/src/runtime/exec_env_init.cpp                   |   7 +-
 be/src/runtime/load_channel.cpp                    |  39 +-
 be/src/runtime/load_channel.h                      |  60 +-
 be/src/runtime/load_channel_mgr.cpp                |  42 +-
 be/src/runtime/load_channel_mgr.h                  | 107 ++-
 be/src/runtime/tablets_channel.cpp                 |  70 +-
 be/src/runtime/tablets_channel.h                   | 130 +++-
 be/src/runtime/thread_mem_tracker_mgr.h            |   2 +-
 be/src/service/internal_service.cpp                |   2 +-
 be/src/vec/CMakeLists.txt                          |   6 +-
 be/src/vec/exec/vbroker_scan_node.cpp              |   4 +-
 be/src/vec/exec/vbroker_scan_node.h                |  11 +-
 be/src/vec/exec/vbroker_scanner.cpp                |   3 -
 be/src/vec/exec/vbroker_scanner.h                  |   4 +-
 be/src/vec/olap/olap_data_convertor.cpp            |  27 +-
 be/src/vec/olap/olap_data_convertor.h              |  20 +-
 be/src/vec/olap/vdelta_writer.cpp                  |  81 ---
 be/src/vec/olap/vdelta_writer.h                    |  43 --
 be/src/vec/runtime/vload_channel.cpp               |  87 ---
 be/src/vec/runtime/vload_channel.h                 |  42 --
 be/src/vec/runtime/vload_channel_mgr.cpp           |  68 --
 be/src/vec/runtime/vload_channel_mgr.h             |  43 --
 be/src/vec/runtime/vtablets_channel.cpp            | 142 ----
 be/src/vec/runtime/vtablets_channel.h              |  40 --
 be/src/vec/sink/vtablet_sink.cpp                   |  27 +-
 be/src/vec/sink/vtablet_sink.h                     |  13 +-
 be/test/CMakeLists.txt                             |   6 +-
 be/test/vec/exec/vbroker_scan_node_test.cpp        |   4 -
 be/test/vec/exec/vbroker_scanner_test.cpp          |   4 -
 be/test/vec/exec/vtablet_sink_test.cpp             |   5 -
 be/test/vec/runtime/vload_channel_mgr_test.cpp     | 757 ---------------------
 .../apache/doris/planner/StreamLoadPlanner.java    |   9 +-
 gensrc/proto/internal_service.proto                |   1 +
 44 files changed, 474 insertions(+), 1968 deletions(-)

diff --git a/be/src/common/config.h b/be/src/common/config.h
index 5e209573bd..a93229cdf8 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -731,8 +731,6 @@ CONF_mInt32(string_type_length_soft_limit_bytes, "1048576");
 CONF_Validator(string_type_length_soft_limit_bytes,
                [](const int config) -> bool { return config > 0 && config <= 2147483643; });
 
-CONF_mBool(enable_vectorized_load, "false");
-
 } // namespace config
 
 } // namespace doris
diff --git a/be/src/exec/tablet_sink.cpp b/be/src/exec/tablet_sink.cpp
index 4271313d9c..b04883511b 100644
--- a/be/src/exec/tablet_sink.cpp
+++ b/be/src/exec/tablet_sink.cpp
@@ -141,6 +141,7 @@ void NodeChannel::open() {
     request.set_load_channel_timeout_s(_parent->_load_channel_timeout_s);
     request.set_is_high_priority(_parent->_is_high_priority);
     request.set_sender_ip(BackendOptions::get_localhost());
+    request.set_is_vectorized(_is_vectorized);
 
     _open_closure = new RefCountClosure<PTabletWriterOpenResult>();
     _open_closure->ref();
@@ -547,8 +548,6 @@ void NodeChannel::clear_all_batches() {
     _cur_batch.reset();
 }
 
-IndexChannel::~IndexChannel() {}
-
 Status IndexChannel::init(RuntimeState* state, const std::vector<TTabletWithPartition>& tablets) {
     SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER(_index_channel_tracker);
     for (auto& tablet : tablets) {
@@ -586,20 +585,6 @@ Status IndexChannel::init(RuntimeState* state, const std::vector<TTabletWithPart
     return Status::OK();
 }
 
-void IndexChannel::add_row(Tuple* tuple, int64_t tablet_id) {
-    SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER(_index_channel_tracker);
-    auto it = _channels_by_tablet.find(tablet_id);
-    DCHECK(it != _channels_by_tablet.end()) << "unknown tablet, tablet_id=" << tablet_id;
-    for (auto channel : it->second) {
-        // if this node channel is already failed, this add_row will be skipped
-        auto st = channel->add_row(tuple, tablet_id);
-        if (!st.ok()) {
-            mark_as_failed(channel->node_id(), channel->host(), st.get_error_msg(), tablet_id);
-            // continue add row to other node, the error will be checked for every batch outside
-        }
-    }
-}
-
 void IndexChannel::mark_as_failed(int64_t node_id, const std::string& host, const std::string& err,
                                   int64_t tablet_id) {
     const auto& it = _tablets_by_channel.find(node_id);
@@ -823,14 +808,8 @@ Status OlapTableSink::prepare(RuntimeState* state) {
                 tablets.emplace_back(std::move(tablet_with_partition));
             }
         }
-        IndexChannel *index_channel;
-        if (_is_vectorized) {
-            index_channel = new VIndexChannel(this, index->index_id);
-        } else {
-            index_channel = new IndexChannel(this, index->index_id);
-        }
-        RETURN_IF_ERROR(index_channel->init(state, tablets));
-        _channels.emplace_back(index_channel);
+        _channels.emplace_back(new IndexChannel(this, index->index_id, _is_vectorized));
+        RETURN_IF_ERROR(_channels.back()->init(state, tablets));
     }
 
     return Status::OK();
diff --git a/be/src/exec/tablet_sink.h b/be/src/exec/tablet_sink.h
index a2edb73b19..55e1209226 100644
--- a/be/src/exec/tablet_sink.h
+++ b/be/src/exec/tablet_sink.h
@@ -33,6 +33,7 @@
 #include "exec/tablet_info.h"
 #include "gen_cpp/Types_types.h"
 #include "gen_cpp/internal_service.pb.h"
+#include "runtime/thread_context.h"
 #include "util/bitmap.h"
 #include "util/countdown_latch.h"
 #include "util/ref_count_closure.h"
@@ -90,18 +91,18 @@ struct AddBatchCounter {
 // So using create() to get the closure pointer is recommended. We can delete the closure ptr before the capture vars destruction.
 // Delete this point is safe, don't worry about RPC callback will run after ReusableClosure deleted.
 template <typename T>
-class ReusableClosure : public google::protobuf::Closure {
+class ReusableClosure final: public google::protobuf::Closure {
 public:
     ReusableClosure() : cid(INVALID_BTHREAD_ID) {}
-    ~ReusableClosure() {
+    ~ReusableClosure() override {
         // shouldn't delete when Run() is calling or going to be called, wait for current Run() done.
         join();
     }
 
     static ReusableClosure<T>* create() { return new ReusableClosure<T>(); }
 
-    void addFailedHandler(std::function<void(bool)> fn) { failed_handler = fn; }
-    void addSuccessHandler(std::function<void(const T&, bool)> fn) { success_handler = fn; }
+    void addFailedHandler(const std::function<void(bool)>& fn) { failed_handler = fn; }
+    void addSuccessHandler(const std::function<void(const T&, bool)>& fn) { success_handler = fn; }
 
     void join() {
         // We rely on in_flight to assure one rpc is running,
@@ -181,7 +182,7 @@ public:
     virtual Status open_wait();
 
     Status add_row(Tuple* tuple, int64_t tablet_id);
-    virtual Status add_row(BlockRow& block_row, int64_t tablet_id) {
+    virtual Status add_row(const BlockRow& block_row, int64_t tablet_id) {
         LOG(FATAL) << "add block row to NodeChannel not supported";
         return Status::OK();
     }
@@ -326,18 +327,16 @@ private:
 
 class IndexChannel {
 public:
-    IndexChannel(OlapTableSink* parent, int64_t index_id) : _parent(parent), _index_id(index_id) {
+    IndexChannel(OlapTableSink* parent, int64_t index_id, bool is_vec) :
+        _parent(parent), _index_id(index_id), _is_vectorized(is_vec) {
         _index_channel_tracker = MemTracker::create_tracker(-1, "IndexChannel");
     }
-    virtual ~IndexChannel();
+    ~IndexChannel() = default;
 
     Status init(RuntimeState* state, const std::vector<TTabletWithPartition>& tablets);
 
-    void add_row(Tuple* tuple, int64_t tablet_id);
-
-    virtual void add_row(BlockRow& block_row, int64_t tablet_id) {
-        LOG(FATAL) << "add block row to IndexChannel not supported";
-    }
+    template <typename Row>
+    void add_row(const Row& tuple, int64_t tablet_id);
 
     void for_each_node_channel(
             const std::function<void(const std::shared_ptr<NodeChannel>&)>& func) {
@@ -355,13 +354,13 @@ public:
 
     size_t num_node_channels() const { return _node_channels.size(); }
 
-protected:
+private:
     friend class NodeChannel;
     friend class VNodeChannel;
 
-    bool _is_vectorized = false;
     OlapTableSink* _parent;
     int64_t _index_id;
+    bool _is_vectorized = false;
 
     // from backend channel to tablet_id
     // ATTN: must be placed before `_node_channels` and `_channels_by_tablet`.
@@ -386,6 +385,21 @@ protected:
     std::shared_ptr<MemTracker> _index_channel_tracker; // TODO(zxy) use after
 };
 
+template <typename Row>
+void IndexChannel::add_row(const Row& tuple, int64_t tablet_id) {
+    SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER(_index_channel_tracker);
+    auto it = _channels_by_tablet.find(tablet_id);
+    DCHECK(it != _channels_by_tablet.end()) << "unknown tablet, tablet_id=" << tablet_id;
+    for (const auto& channel : it->second) {
+        // if this node channel is already failed, this add_row will be skipped
+        auto st = channel->add_row(tuple, tablet_id);
+        if (!st.ok()) {
+            mark_as_failed(channel->node_id(), channel->host(), st.get_error_msg(), tablet_id);
+            // continue add row to other node, the error will be checked for every batch outside
+        }
+    }
+}
+
 // Write data to Olap Table.
 // When OlapTableSink::open() called, there will be a consumer thread running in the background.
 // When you call OlapTableSink::send(), you will be the producer who products pending batches.
@@ -430,10 +444,8 @@ private:
 
 protected:
     friend class NodeChannel;
-    friend class IndexChannel;
-
     friend class VNodeChannel;
-    friend class VIndexChannel;
+    friend class IndexChannel;
 
     bool _is_vectorized = false;
 
diff --git a/be/src/olap/delta_writer.cpp b/be/src/olap/delta_writer.cpp
index 60ea864282..2a0535b4d5 100644
--- a/be/src/olap/delta_writer.cpp
+++ b/be/src/olap/delta_writer.cpp
@@ -29,19 +29,20 @@
 
 namespace doris {
 
-OLAPStatus DeltaWriter::open(WriteRequest* req, DeltaWriter** writer) {
-    *writer = new DeltaWriter(req, StorageEngine::instance());
+OLAPStatus DeltaWriter::open(WriteRequest* req, DeltaWriter** writer, bool is_vec) {
+    *writer = new DeltaWriter(req, StorageEngine::instance(), is_vec);
     return OLAP_SUCCESS;
 }
 
-DeltaWriter::DeltaWriter(WriteRequest* req, StorageEngine* storage_engine)
+DeltaWriter::DeltaWriter(WriteRequest* req, StorageEngine* storage_engine, bool is_vec)
         : _req(*req),
           _tablet(nullptr),
           _cur_rowset(nullptr),
           _rowset_writer(nullptr),
           _tablet_schema(nullptr),
           _delta_written_success(false),
-          _storage_engine(storage_engine) {}
+          _storage_engine(storage_engine),
+          _is_vec(is_vec) {}
 
 DeltaWriter::~DeltaWriter() {
     if (_is_init && !_delta_written_success) {
@@ -195,6 +196,40 @@ OLAPStatus DeltaWriter::write(const RowBatch* row_batch, const std::vector<int>&
     return OLAP_SUCCESS;
 }
 
+OLAPStatus DeltaWriter::write(const vectorized::Block* block, const std::vector<int>& row_idxs) {
+    if (UNLIKELY(row_idxs.empty())) {
+        return OLAP_SUCCESS;
+    }
+    std::lock_guard<std::mutex> l(_lock);
+    if (!_is_init && !_is_cancelled) {
+        RETURN_NOT_OK(init());
+    }
+
+    if (_is_cancelled) {
+        return OLAP_ERR_ALREADY_CANCELLED;
+    }
+
+    int start = 0, end = 0;
+    const size_t num_rows = row_idxs.size();
+    for (; start < num_rows;) {
+        auto count = end + 1 - start;
+        if (end == num_rows - 1 || (row_idxs[end + 1] - row_idxs[start]) != count) {
+            _mem_table->insert(block, row_idxs[start], count);
+            start += count;
+            end = start;
+        } else {
+            end++;
+        }
+    }
+
+    if (_mem_table->memory_usage() >= config::write_buffer_size) {
+        RETURN_NOT_OK(_flush_memtable_async());
+        _reset_mem_table();
+    }
+
+    return OLAP_SUCCESS;
+}
+
 OLAPStatus DeltaWriter::_flush_memtable_async() {
     if (++_segment_counter > config::max_segment_num_per_rowset) {
         return OLAP_ERR_TOO_MANY_SEGMENTS;
@@ -252,7 +287,7 @@ OLAPStatus DeltaWriter::wait_flush() {
 void DeltaWriter::_reset_mem_table() {
     _mem_table.reset(new MemTable(_tablet->tablet_id(), _schema.get(), _tablet_schema, _req.slots,
                                   _req.tuple_desc, _tablet->keys_type(), _rowset_writer.get(),
-                                  _mem_tracker));
+                                  _mem_tracker, _is_vec));
 }
 
 OLAPStatus DeltaWriter::close() {
diff --git a/be/src/olap/delta_writer.h b/be/src/olap/delta_writer.h
index 9f17beed93..09171c945e 100644
--- a/be/src/olap/delta_writer.h
+++ b/be/src/olap/delta_writer.h
@@ -54,17 +54,15 @@ struct WriteRequest {
 // This class is NOT thread-safe, external synchronization is required.
 class DeltaWriter {
 public:
-    static OLAPStatus open(WriteRequest* req, DeltaWriter** writer);
+    static OLAPStatus open(WriteRequest* req, DeltaWriter** writer, bool is_vec = false);
 
-    virtual ~DeltaWriter();
+    ~DeltaWriter();
 
     OLAPStatus init();
 
     OLAPStatus write(Tuple* tuple);
     OLAPStatus write(const RowBatch* row_batch, const std::vector<int>& row_idxs);
-    virtual OLAPStatus write_block(const vectorized::Block* block, const std::vector<int>& row_idxs) {
-        return OLAP_ERR_READER_INITIALIZE_ERROR;
-    }
+    OLAPStatus write(const vectorized::Block* block, const std::vector<int>& row_idxs);
 
     // flush the last memtable to flush queue, must call it before close_wait()
     OLAPStatus close();
@@ -92,24 +90,23 @@ public:
 
     int64_t tablet_id() { return _tablet->tablet_id(); }
 
-protected:
-    DeltaWriter(WriteRequest* req, StorageEngine* storage_engine);
+private:
+    DeltaWriter(WriteRequest* req, StorageEngine* storage_engine, bool is_vec);
 
     // push a full memtable to flush executor
     OLAPStatus _flush_memtable_async();
 
-private:
     void _garbage_collection();
 
-    virtual void _reset_mem_table();
+    void _reset_mem_table();
 
-protected:
     bool _is_init = false;
     bool _is_cancelled = false;
     WriteRequest _req;
     TabletSharedPtr _tablet;
     RowsetSharedPtr _cur_rowset;
     std::unique_ptr<RowsetWriter> _rowset_writer;
+    // TODO: Recheck the lifetime of _mem_table; it looks like it should only need a unique_ptr
     std::shared_ptr<MemTable> _mem_table;
     std::unique_ptr<Schema> _schema;
     const TabletSchema* _tablet_schema;
@@ -123,6 +120,9 @@ protected:
     int64_t _segment_counter = 0;
 
     std::mutex _lock;
+
+    // use in vectorized load
+    bool _is_vec;
 };
 
 } // namespace doris
diff --git a/be/src/olap/memtable.cpp b/be/src/olap/memtable.cpp
index 2628b34007..be7d2fac86 100644
--- a/be/src/olap/memtable.cpp
+++ b/be/src/olap/memtable.cpp
@@ -19,12 +19,10 @@
 
 #include "common/logging.h"
 #include "olap/row.h"
-#include "olap/row_cursor.h"
 #include "olap/rowset/column_data_writer.h"
 #include "olap/rowset/rowset_writer.h"
 #include "olap/schema.h"
 #include "runtime/tuple.h"
-#include "util/debug_util.h"
 #include "util/doris_metrics.h"
 #include "vec/core/field.h"
 #include "vec/aggregate_functions/aggregate_function_simple_factory.h"
@@ -50,13 +48,14 @@ MemTable::MemTable(int64_t tablet_id, Schema* schema, const TabletSchema* tablet
           _is_first_insertion(true), 
           _agg_functions(schema->num_columns()),
           _mem_usage(0){
-    if (support_vec){
+    if (support_vec) {
         _skip_list = nullptr;
         _vec_row_comparator = std::make_shared<RowInBlockComparator>(_schema);
+        // TODO: Support ZOrderComparator in the future
         _vec_skip_list = new VecTable(_vec_row_comparator.get(), _table_mem_pool.get(),
                                 _keys_type == KeysType::DUP_KEYS);
     }else{
-        _vec_skip_list =nullptr;
+        _vec_skip_list = nullptr;
         if (tablet_schema->sort_type() == SortType::ZORDER) {
             _row_comparator =
                     std::make_shared<TupleRowZOrderComparator>(_schema, tablet_schema->sort_col_num());
@@ -68,45 +67,29 @@ MemTable::MemTable(int64_t tablet_id, Schema* schema, const TabletSchema* tablet
     }
 }
 
-void MemTable::_init_agg_functions(const vectorized::Block* block)
-{
-    
-    for (uint32_t cid = _schema->num_key_columns(); 
-                cid < _schema->num_columns();
-                ++cid) {
-        FieldAggregationMethod agg_method =
-                _tablet_schema
-                        ->column(cid)
-                        .aggregation();
+void MemTable::_init_agg_functions(const vectorized::Block* block) {
+    for (uint32_t cid = _schema->num_key_columns(); cid < _schema->num_columns(); ++cid) {
+        FieldAggregationMethod agg_method = _tablet_schema->column(cid).aggregation();
         std::string agg_name =
                 TabletColumn::get_string_by_aggregation_type(agg_method) + vectorized::AGG_LOAD_SUFFIX;
-        std::transform(agg_name.begin(), agg_name.end(), agg_name.begin(),
-                        [](unsigned char c) { return std::tolower(c); });
+        std::transform(agg_name.begin(), agg_name.end(), agg_name.begin(), [](unsigned char c) { return std::tolower(c); });
 
         // create aggregate function
-        vectorized::DataTypes argument_types;
-        vectorized::DataTypePtr dtptr = block->get_data_type(cid);//Schema::get_data_type_ptr(_schema->column(cid)->type());
-        argument_types.push_back(dtptr);
-        vectorized::Array params;
+        vectorized::DataTypes argument_types{block->get_data_type(cid)};
         vectorized::AggregateFunctionPtr function = vectorized::AggregateFunctionSimpleFactory::instance().get(
-                agg_name, argument_types, params,
-                dtptr->is_nullable());
+                agg_name, argument_types, {}, argument_types.back()->is_nullable());
 
         DCHECK(function != nullptr);
         _agg_functions[cid] = function;
     }
 }
+
 MemTable::~MemTable() {
-    if (_skip_list)
-        delete _skip_list;
-    if (_vec_skip_list)
-        delete _vec_skip_list;
-    for(auto row: rowInBlocks)
-    {
-        if (row != nullptr){
-            delete row;
-        }
-    }
+    delete _skip_list;
+    delete _vec_skip_list;
+
+    std::for_each(_row_in_blocks.begin(), _row_in_blocks.end(),
+            std::default_delete<RowInBlock>());
     _mem_tracker->release(_mem_usage);
 }
 
@@ -124,10 +107,8 @@ int MemTable::RowInBlockComparator::operator()(const RowInBlock* left, const Row
                             *_pblock, -1); 
 }
 
-void MemTable::insert(const vectorized::Block* block, size_t row_pos, size_t num_rows)
-{
-    if (_is_first_insertion)
-    {
+void MemTable::insert(const vectorized::Block* block, size_t row_pos, size_t num_rows) {
+    if (_is_first_insertion) {
         _is_first_insertion = false;
         auto cloneBlock = block->clone_without_columns();
         _input_mutable_block = vectorized::MutableBlock::build_mutable_block(&cloneBlock);
@@ -145,38 +126,34 @@ void MemTable::insert(const vectorized::Block* block, size_t row_pos, size_t num
     _mem_tracker->consume(newsize - oldsize);
 
     for(int i = 0; i < num_rows; i++){       
-        RowInBlock* row_in_block_ptr = new RowInBlock(cursor_in_mutableblock + i);
-        rowInBlocks.push_back(row_in_block_ptr);
-        insert_one_row_from_block(row_in_block_ptr);
+        _row_in_blocks.emplace_back(new RowInBlock{cursor_in_mutableblock + i});
+        _insert_one_row_from_block(_row_in_blocks.back());
     }   
 }
 
-void MemTable::insert_one_row_from_block(RowInBlock* row_in_block_ptr)
-{
+void MemTable::_insert_one_row_from_block(RowInBlock* row_in_block) {
     _rows++;
     bool overwritten = false;
-    if (_keys_type == KeysType::DUP_KEYS)
-    {
-        _vec_skip_list->Insert(row_in_block_ptr, &overwritten);
+    if (_keys_type == KeysType::DUP_KEYS) {
+        // TODO: dup keys only need a sort operation. Rethink whether a skiplist is the best way to sort columns.
+        _vec_skip_list->Insert(row_in_block, &overwritten);
         DCHECK(!overwritten) << "Duplicate key model meet overwrite in SkipList";
         return;
     }
-    bool is_exist = _vec_skip_list->Find(row_in_block_ptr, &_vec_hint);
+
+    bool is_exist = _vec_skip_list->Find(row_in_block, &_vec_hint);
     if (is_exist){
-        _aggregate_two_rowInBlock(row_in_block_ptr, _vec_hint.curr->key);
-    }else{
-        row_in_block_ptr->init_agg_places(_agg_functions, _schema->num_key_columns());
-        for ( auto cid = _schema->num_key_columns(); cid < _schema->num_columns(); cid++){
+        _aggregate_two_row_in_block(row_in_block, _vec_hint.curr->key);
+    } else {
+        row_in_block->init_agg_places(_agg_functions, _schema->num_key_columns());
+        for (auto cid = _schema->num_key_columns(); cid < _schema->num_columns(); cid++){
             auto col_ptr = _input_mutable_block.mutable_columns()[cid].get();
-            auto place = row_in_block_ptr->_agg_places[cid];
-            _agg_functions[cid]->add(place, 
-                    const_cast<const doris::vectorized::IColumn**>( &col_ptr),
-                    row_in_block_ptr->_row_pos,
-                    nullptr
-                    );
+            auto place = row_in_block->_agg_places[cid];
+            _agg_functions[cid]->add(place, const_cast<const doris::vectorized::IColumn**>(&col_ptr),
+                                     row_in_block->_row_pos, nullptr);
         }
         
-        _vec_skip_list->InsertWithHint(row_in_block_ptr, is_exist, &_vec_hint);
+        _vec_skip_list->InsertWithHint(row_in_block, is_exist, &_vec_hint);
     }
 }
 
@@ -239,51 +216,40 @@ void MemTable::_aggregate_two_row(const ContiguousRow& src_row, TableKey row_in_
     }
 }
 
-void MemTable::_aggregate_two_rowInBlock(RowInBlock* new_row, RowInBlock* row_in_skiplist){
-    if (_tablet_schema->has_sequence_col())
-    {
+void MemTable::_aggregate_two_row_in_block(RowInBlock* new_row, RowInBlock* row_in_skiplist){
+    if (_tablet_schema->has_sequence_col()) {
         auto sequence_idx = _tablet_schema->sequence_col_idx();
         auto res = _input_mutable_block.compare_at(row_in_skiplist->_row_pos, new_row->_row_pos, sequence_idx, _input_mutable_block, -1);
         // dst sequence column larger than src, don't need to update
         if (res > 0){
-            return ;
+            return;
         }
     }
-    //dst is non-sequence row, or dst sequence is smaller
-    for (uint32_t cid = _schema->num_key_columns(); 
-                    cid < _schema->num_columns();
-                    ++cid) 
-    {
+    // dst is non-sequence row, or dst sequence is smaller
+    for (uint32_t cid = _schema->num_key_columns(); cid < _schema->num_columns(); ++cid) {
         auto place = row_in_skiplist->_agg_places[cid];
-
         auto col_ptr = _input_mutable_block.mutable_columns()[cid].get();
-
-        _agg_functions[cid]->add(place, 
-                const_cast<const doris::vectorized::IColumn**>( &col_ptr),
-                new_row->_row_pos,
-                nullptr
-                );
+        _agg_functions[cid]->add(place, const_cast<const doris::vectorized::IColumn**>(&col_ptr),
+                new_row->_row_pos, nullptr);
     }   
     
 }
-vectorized::Block MemTable::collect_skiplist_results()
-{
+vectorized::Block MemTable::_collect_vskiplist_results() {
     VecTable::Iterator it(_vec_skip_list);
     vectorized::Block in_block = _input_mutable_block.to_block();
+    // TODO: should try to insert data by column, not by row, to optimize the code
     if (_keys_type == KeysType::DUP_KEYS){
         for (it.SeekToFirst(); it.Valid(); it.Next()) {
             _output_mutable_block.add_row(&in_block, it.key()->_row_pos);
         }
-    }else{
+    } else {
         for (it.SeekToFirst(); it.Valid(); it.Next()) {
-            
             auto& block_data = in_block.get_columns_with_type_and_name();
-            //move key columns
+            // move key columns
             for (size_t i = 0; i < _schema->num_key_columns(); ++i) {
                 _output_mutable_block.get_column_by_position(i)->insert_from(*block_data[i].column.get(), it.key()->_row_pos);
             }
-            //get value columns from agg_places
-            
+            // get value columns from agg_places
             for (size_t i = _schema->num_key_columns(); i < _schema->num_columns(); ++i) {
                 auto function = _agg_functions[i];
                 function->insert_result_into(it.key()->_agg_places[i] , *(_output_mutable_block.get_column_by_position(i)));
@@ -294,43 +260,11 @@ vectorized::Block MemTable::collect_skiplist_results()
     return _output_mutable_block.to_block();
 }
 
-void dump(const vectorized::Block& block, int64_t tablet_id) {
-    std::ofstream out;
-    std::string file_name("/home/englefly/stream_load_test/dump.txt");
-    file_name += std::to_string(tablet_id);
-    out.open(file_name);
-    for (size_t row_num = 0; row_num < block.rows(); ++row_num) {
-        for (size_t i = 0; i < block.columns(); ++i) {
-            if (block.get_by_position(i).column) {
-                out << block.get_by_position(i).to_string(row_num);
-            }
-            if (i != block.columns() - 1) {
-                out << ", ";
-            }
-        }
-        out << "\n";
-    }
-    out.close();
-}
-
-OLAPStatus MemTable::_vflush(){
-    //skip empty tablet
-    if (_rows == 0)
-    {
-        return OLAP_SUCCESS;
-    }
+OLAPStatus MemTable::flush() {
     VLOG_CRITICAL << "begin to flush memtable for tablet: " << _tablet_id
                   << ", memsize: " << memory_usage() << ", rows: " << _rows;
-    size_t _flush_size = 0;
     int64_t duration_ns = 0;
-    {
-        SCOPED_RAW_TIMER(&duration_ns);
-        vectorized::Block block = collect_skiplist_results();
-        OLAPStatus st = _rowset_writer->add_block(&block);
-        RETURN_NOT_OK(st);
-        _flush_size = block.allocated_bytes();
-        _rowset_writer->flush();
-    }
+    RETURN_NOT_OK(_do_flush(duration_ns));
     DorisMetrics::instance()->memtable_flush_total->increment(1);
     DorisMetrics::instance()->memtable_flush_duration_us->increment(duration_ns / 1000);
     VLOG_CRITICAL << "after flush memtable for tablet: " << _tablet_id
@@ -338,16 +272,9 @@ OLAPStatus MemTable::_vflush(){
     return OLAP_SUCCESS;
 }
 
-OLAPStatus MemTable::flush() {
-    if (_vec_skip_list) {
-        return _vflush();
-    }
-
-    VLOG_CRITICAL << "begin to flush memtable for tablet: " << _tablet_id
-                  << ", memsize: " << memory_usage() << ", rows: " << _rows;
-    int64_t duration_ns = 0;
-    {
-        SCOPED_RAW_TIMER(&duration_ns);
+OLAPStatus MemTable::_do_flush(int64_t& duration_ns) {
+    SCOPED_RAW_TIMER(&duration_ns);
+    if (_skip_list) {
         OLAPStatus st = _rowset_writer->flush_single_memtable(this, &_flush_size);
         if (st == OLAP_ERR_FUNC_NOT_IMPLEMENTED) {
             // For alpha rowset, we do not implement "flush_single_memtable".
@@ -363,14 +290,16 @@ OLAPStatus MemTable::flush() {
         } else {
             RETURN_NOT_OK(st);
         }
+    } else {
+        vectorized::Block block = _collect_vskiplist_results();
+        RETURN_NOT_OK(_rowset_writer->add_block(&block));
+        _flush_size = block.allocated_bytes();
+        RETURN_NOT_OK(_rowset_writer->flush());
     }
-    DorisMetrics::instance()->memtable_flush_total->increment(1);
-    DorisMetrics::instance()->memtable_flush_duration_us->increment(duration_ns / 1000);
-    VLOG_CRITICAL << "after flush memtable for tablet: " << _tablet_id
-                  << ", flushsize: " << _flush_size;
     return OLAP_SUCCESS;
 }
 
+
 OLAPStatus MemTable::close() {
     return flush();
 }
diff --git a/be/src/olap/memtable.h b/be/src/olap/memtable.h
index c5f519eeeb..393b4876b4 100644
--- a/be/src/olap/memtable.h
+++ b/be/src/olap/memtable.h
@@ -38,22 +38,22 @@ class SlotDescriptor;
 class TabletSchema;
 class Tuple;
 class TupleDescriptor;
+
 class MemTable {
 public:
-   
     MemTable(int64_t tablet_id, Schema* schema, const TabletSchema* tablet_schema,
              const std::vector<SlotDescriptor*>* slot_descs, TupleDescriptor* tuple_desc,
              KeysType keys_type, RowsetWriter* rowset_writer,
              const std::shared_ptr<MemTracker>& parent_tracker,
-             bool support_vec=false);
+             bool support_vec = false);
     ~MemTable();
 
     int64_t tablet_id() const { return _tablet_id; }
     size_t memory_usage() const { return _mem_tracker->consumption(); }
-    std::shared_ptr<MemTracker> mem_tracker() { return _mem_tracker; }
+    std::shared_ptr<MemTracker>& mem_tracker() { return _mem_tracker; }
     
     void insert(const Tuple* tuple);
-    //insert tuple from (row_pos) to (row_pos+num_rows)
+    // insert tuple from (row_pos) to (row_pos+num_rows)
     void insert(const vectorized::Block* block, size_t row_pos, size_t num_rows);
     
     /// Flush
@@ -63,31 +63,30 @@ public:
     int64_t flush_size() const { return _flush_size; }
 
 private:
-    //flush for vectorized
-    OLAPStatus _vflush();
+    OLAPStatus _do_flush(int64_t& duration_ns);
 
     class RowCursorComparator : public RowComparator {
     public:
         RowCursorComparator(const Schema* schema);
-        virtual int operator()(const char* left, const char* right) const;
+        int operator()(const char* left, const char* right) const;
 
     private:
         const Schema* _schema;
     };
 
-    //row pos in _input_mutable_block
-    struct RowInBlock{
+    // row pos in _input_mutable_block
+    struct RowInBlock {
         size_t _row_pos;
         std::vector<vectorized::AggregateDataPtr> _agg_places;
-        RowInBlock(size_t i):_row_pos(i) {}
+        explicit RowInBlock(size_t i) : _row_pos(i) {}
+
         void init_agg_places(std::vector<vectorized::AggregateFunctionPtr>& agg_functions,
-                            int key_column_count){
+                            int key_column_count) {
             _agg_places.resize(agg_functions.size());
-            for(int cid = 0; cid < agg_functions.size(); cid++)
-            {
+            for(int cid = 0; cid < agg_functions.size(); cid++) {
                 if (cid < key_column_count) {
                     _agg_places[cid] = nullptr;
-                }else{
+                } else {
                     auto function = agg_functions[cid];
                     size_t place_size = function->size_of_data();
                     _agg_places[cid] = new char[place_size];
@@ -96,25 +95,20 @@ private:
             }
         }
 
-        RowCursorCell cell(vectorized::MutableBlock* block, int cid){
-            StringRef ref = block->mutable_columns()[cid]->get_data_at(_row_pos);
-            bool is_null = block->mutable_columns()[cid]->is_null_at(_row_pos);
-            NullState null_state = is_null ? NullState::IS_NULL : NullState::NOT_NULL;
-            return RowCursorCell(ref.data, null_state);
-        }
-
         ~RowInBlock() {
             for (auto agg_place : _agg_places) {
                 delete [] agg_place;
             }
         }
     };
+
     class RowInBlockComparator {
     public:
-        RowInBlockComparator(const Schema* schema):_schema(schema){};
-        //call set_block before operator().
-        //在第一次insert block时创建的 _input_mutable_block, 所以无法在Comparator的构造函数中获得pblock
-        void set_block(vectorized::MutableBlock* pblock){_pblock = pblock;}
+        RowInBlockComparator(const Schema* schema) : _schema(schema) {};
+        // call set_block before operator().
+        // _input_mutable_block is only created on the first block insertion,
+        // so pblock cannot be obtained in the Comparator's constructor.
+        void set_block(vectorized::MutableBlock* pblock) {_pblock = pblock;}
         int operator()(const RowInBlock* left, const RowInBlock* right) const;
     private:
         const Schema* _schema;
@@ -124,7 +118,6 @@ private:
 private:
     typedef SkipList<char*, RowComparator> Table;
     typedef Table::key_type TableKey;
-
     typedef SkipList<RowInBlock*, RowInBlockComparator> VecTable;
 
 public:
@@ -133,7 +126,7 @@ public:
     class Iterator {
     public:
         Iterator(MemTable* mem_table);
-        ~Iterator() {}
+        ~Iterator() = default;
 
         void seek_to_first();
         bool valid();
@@ -149,9 +142,9 @@ public:
 private:
     void _tuple_to_row(const Tuple* tuple, ContiguousRow* row, MemPool* mem_pool);
     void _aggregate_two_row(const ContiguousRow& new_row, TableKey row_in_skiplist);
-    //for vectorized
-    void insert_one_row_from_block(RowInBlock* row_in_block);
-    void _aggregate_two_rowInBlock(RowInBlock* new_row, RowInBlock* row_in_skiplist);
+    // for vectorized
+    void _insert_one_row_from_block(RowInBlock* row_in_block);
+    void _aggregate_two_row_in_block(RowInBlock* new_row, RowInBlock* row_in_skiplist);
 
     int64_t _tablet_id;
     Schema* _schema;
@@ -160,6 +153,7 @@ private:
     const std::vector<SlotDescriptor*>* _slot_descs;
     KeysType _keys_type;
 
+    // TODO: change to unique_ptr of comparator
     std::shared_ptr<RowComparator> _row_comparator;
     
     std::shared_ptr<RowInBlockComparator> _vec_row_comparator;
@@ -197,12 +191,12 @@ private:
     //for vectorized 
     vectorized::MutableBlock _input_mutable_block;
     vectorized::MutableBlock _output_mutable_block;
-    vectorized::Block collect_skiplist_results();
+    vectorized::Block _collect_vskiplist_results();
     bool _is_first_insertion;
 
     void _init_agg_functions(const vectorized::Block* block);
     std::vector<vectorized::AggregateFunctionPtr> _agg_functions;
-    std::vector<RowInBlock*> rowInBlocks;
+    std::vector<RowInBlock*> _row_in_blocks;
     size_t _mem_usage;
 }; // class MemTable
 
diff --git a/be/src/olap/row_cursor_cell.h b/be/src/olap/row_cursor_cell.h
index faaa65b33b..10ef938fce 100644
--- a/be/src/olap/row_cursor_cell.h
+++ b/be/src/olap/row_cursor_cell.h
@@ -19,62 +19,18 @@
 
 namespace doris {
 
-enum class NullState {
-    UNKNOWN = 0, 
-    IS_NULL = 1,
-    NOT_NULL = 2
-};
 struct RowCursorCell {
-    
-    RowCursorCell(void* ptr) : _ptr(ptr), _null_state(NullState::UNKNOWN) {}
-    RowCursorCell(const void* ptr) : _ptr((void*)ptr), _null_state(NullState::UNKNOWN) {}
-    RowCursorCell(void* ptr, NullState null_state) : _ptr((void*)ptr), _null_state(null_state) {}
-    RowCursorCell(const void* ptr, NullState null_state) : _ptr((void*)ptr), _null_state(null_state) {}
-    bool is_null() const { 
-        return _null_state == NullState::UNKNOWN ? *reinterpret_cast<bool*>(_ptr) : _null_state == NullState::IS_NULL; 
-    }
-    void set_is_null(bool is_null) { 
-        if (_null_state == NullState::UNKNOWN)
-            *reinterpret_cast<bool*>(_ptr) = is_null;
-        else{
-            _null_state = (is_null ? NullState::IS_NULL : NullState::NOT_NULL);
-        } 
-    }
-    void set_null(){ 
-        if (_null_state == NullState::UNKNOWN){
-            *reinterpret_cast<bool*>(_ptr) = true;
-        }else{ 
-            _null_state = NullState::IS_NULL; 
-        }
-    }
-    void set_not_null(){ 
-        if (_null_state == NullState::UNKNOWN){
-            *reinterpret_cast<bool*>(_ptr) = false;
-        }else{ 
-            _null_state = NullState::IS_NULL; 
-        }
-    }
-    const void* cell_ptr() const { 
-        if (_null_state == NullState::UNKNOWN){ 
-            return (char*)_ptr + 1; 
-        }else{ 
-            return (char*)_ptr; 
-        }
-    }
-    void* mutable_cell_ptr() const { 
-        if (_null_state == NullState::UNKNOWN){ 
-            return (char*)_ptr + 1; 
-        }else{ 
-            return (char*)_ptr; 
-        }
-    }
+    RowCursorCell(void* ptr) : _ptr(ptr) {}
+    RowCursorCell(const void* ptr) : _ptr((void*)ptr) {}
+    bool is_null() const { return *reinterpret_cast<bool*>(_ptr); }
+    void set_is_null(bool is_null) const { *reinterpret_cast<bool*>(_ptr) = is_null; }
+    void set_null() const { *reinterpret_cast<bool*>(_ptr) = true; }
+    void set_not_null() const { *reinterpret_cast<bool*>(_ptr) = false; }
+    const void* cell_ptr() const { return (char*)_ptr + 1; }
+    void* mutable_cell_ptr() const { return (char*)_ptr + 1; }
+
 private:
     void* _ptr;
-    /**
-     * @brief if _null_state is UNKNOWN, the null flag is the first char of ptr
-     * 
-     */
-    NullState _null_state; 
 };
 
-} // namespace doris
+} // namespace doris
\ No newline at end of file
diff --git a/be/src/olap/rowset/beta_rowset_writer.cpp b/be/src/olap/rowset/beta_rowset_writer.cpp
index 260666552e..46e1e74288 100644
--- a/be/src/olap/rowset/beta_rowset_writer.cpp
+++ b/be/src/olap/rowset/beta_rowset_writer.cpp
@@ -38,10 +38,6 @@
 
 namespace doris {
 
-// TODO(lingbin): Should be a conf that can be dynamically adjusted, or a member in the context
-const uint32_t MAX_SEGMENT_SIZE = static_cast<uint32_t>(OLAP_MAX_COLUMN_SEGMENT_FILE_SIZE *
-                                                        OLAP_COLUMN_FILE_SEGMENT_SIZE_SCALE);
-
 BetaRowsetWriter::BetaRowsetWriter()
         : _rowset_meta(nullptr),
           _num_segment(0),
@@ -110,61 +106,27 @@ OLAPStatus BetaRowsetWriter::add_block(const vectorized::Block* block) {
     size_t block_row_num = block->rows();
     size_t row_avg_size_in_bytes = std::max((size_t)1, block_size_in_bytes / block_row_num);
     size_t row_offset = 0;
-    int64_t segment_capacity_in_bytes = 0;
-    int64_t segment_capacity_in_rows = 0;
-    auto refresh_segment_capacity = [&]() {
-        segment_capacity_in_bytes =
-                (int64_t)MAX_SEGMENT_SIZE - (int64_t)_segment_writer->estimate_segment_size();
-        segment_capacity_in_rows = (int64_t)_context.max_rows_per_segment -
-                                   (int64_t)_segment_writer->num_rows_written();
-    };
-
-    refresh_segment_capacity();
-    if (UNLIKELY(segment_capacity_in_bytes < row_avg_size_in_bytes ||
-                      segment_capacity_in_rows <= 0)) {
-        // no space for another signle row, need flush now
-        RETURN_NOT_OK(_flush_segment_writer(&_segment_writer));
-        RETURN_NOT_OK(_create_segment_writer(&_segment_writer));
-        refresh_segment_capacity();
-    }
 
-    assert(segment_capacity_in_bytes > row_avg_size_in_bytes && segment_capacity_in_rows > 0);
-    if (block_size_in_bytes > segment_capacity_in_bytes ||
-        block_row_num > segment_capacity_in_rows) {
-        size_t segment_max_row_num;
-        size_t input_row_num;
-        do {
-            assert(row_offset < block_row_num);
-            segment_max_row_num =
-                    std::min((size_t)segment_capacity_in_bytes / row_avg_size_in_bytes,
-                             (size_t)segment_capacity_in_rows);
-            input_row_num = std::min(segment_max_row_num, block_row_num - row_offset);
-            assert(input_row_num > 0);
-            auto s = _segment_writer->append_block(block, row_offset, input_row_num);
-            if (UNLIKELY(!s.ok())) {
-                LOG(WARNING) << "failed to append block: " << s.to_string();
-                return OLAP_ERR_WRITER_DATA_WRITE_ERROR;
-            }
+    do {
+        auto max_row_add = _segment_writer->max_row_to_add(row_avg_size_in_bytes);
+        if (UNLIKELY(max_row_add < 1)) {
+            // no space for another single row, need to flush now
+            RETURN_NOT_OK(_flush_segment_writer(&_segment_writer));
+            RETURN_NOT_OK(_create_segment_writer(&_segment_writer));
+            max_row_add = _segment_writer->max_row_to_add(row_avg_size_in_bytes);
+            DCHECK(max_row_add > 0);
+        }
 
-            refresh_segment_capacity();
-            if (LIKELY(segment_capacity_in_bytes < row_avg_size_in_bytes ||
-                segment_capacity_in_rows <= 0)) {
-                RETURN_NOT_OK(_flush_segment_writer(&_segment_writer));
-                RETURN_NOT_OK(_create_segment_writer(&_segment_writer));
-                refresh_segment_capacity();
-            }
-            row_offset += input_row_num;
-            _num_rows_written += input_row_num;
-        } while (row_offset < block_row_num);
-    } else {
-        auto s = _segment_writer->append_block(block, 0, block_row_num);
+        size_t input_row_num = std::min(block_row_num - row_offset, size_t(max_row_add));
+        auto s = _segment_writer->append_block(block, row_offset, input_row_num);
         if (UNLIKELY(!s.ok())) {
             LOG(WARNING) << "failed to append block: " << s.to_string();
             return OLAP_ERR_WRITER_DATA_WRITE_ERROR;
         }
-        refresh_segment_capacity();
-        _num_rows_written += block_row_num;
-    }
+        row_offset += input_row_num;
+    } while (row_offset < block_row_num);
+
+    _num_rows_written += block_row_num;
     return OLAP_SUCCESS;
 }
 
@@ -305,7 +267,7 @@ OLAPStatus BetaRowsetWriter::_create_segment_writer(std::unique_ptr<segment_v2::
     DCHECK(wblock != nullptr);
     segment_v2::SegmentWriterOptions writer_options;
     writer->reset(new segment_v2::SegmentWriter(wblock.get(), _num_segment, _context.tablet_schema,
-                                                _context.data_dir, writer_options));
+                                                _context.data_dir, _context.max_rows_per_segment, writer_options));
     {
         std::lock_guard<SpinLock> l(_lock);
         _wblocks.push_back(std::move(wblock));
diff --git a/be/src/olap/rowset/segment_v2/segment_writer.cpp b/be/src/olap/rowset/segment_v2/segment_writer.cpp
index 08d23865ea..5eebb30a88 100644
--- a/be/src/olap/rowset/segment_v2/segment_writer.cpp
+++ b/be/src/olap/rowset/segment_v2/segment_writer.cpp
@@ -38,11 +38,12 @@ const char* k_segment_magic = "D0R1";
 const uint32_t k_segment_magic_length = 4;
 
 SegmentWriter::SegmentWriter(fs::WritableBlock* wblock, uint32_t segment_id,
-                             const TabletSchema* tablet_schema,
-                             DataDir* data_dir, const SegmentWriterOptions& opts)
+                             const TabletSchema* tablet_schema, DataDir* data_dir,
+                             uint32_t max_row_per_segment, const SegmentWriterOptions& opts)
         : _segment_id(segment_id),
           _tablet_schema(tablet_schema),
           _data_dir(data_dir),
+          _max_row_per_segment(max_row_per_segment),
           _opts(opts),
           _wblock(wblock),
           _mem_tracker(
@@ -156,6 +157,14 @@ Status SegmentWriter::append_block(const vectorized::Block* block, size_t row_po
     return Status::OK();
 }
 
+int64_t SegmentWriter::max_row_to_add(size_t row_avg_size_in_bytes) {
+    int64_t size_rows = ((int64_t)MAX_SEGMENT_SIZE - (int64_t)estimate_segment_size()) / row_avg_size_in_bytes;
+    int64_t count_rows = (int64_t)_max_row_per_segment - _row_count;
+
+    return std::min(size_rows, count_rows);
+}
+
+
 std::string SegmentWriter::encode_short_keys(
         const std::vector<const void*> key_column_fields, bool null_first) {
     size_t num_key_columns = _tablet_schema->num_short_key_columns();
diff --git a/be/src/olap/rowset/segment_v2/segment_writer.h b/be/src/olap/rowset/segment_v2/segment_writer.h
index 45f2850d62..cc047e19c9 100644
--- a/be/src/olap/rowset/segment_v2/segment_writer.h
+++ b/be/src/olap/rowset/segment_v2/segment_writer.h
@@ -30,6 +30,9 @@
 
 namespace doris {
 
+// TODO(lingbin): Should be a conf that can be dynamically adjusted, or a member in the context
+const uint32_t MAX_SEGMENT_SIZE = static_cast<uint32_t>(OLAP_MAX_COLUMN_SEGMENT_FILE_SIZE *
+                                                        OLAP_COLUMN_FILE_SEGMENT_SIZE_SCALE);
 class DataDir;
 class MemTracker;
 class RowBlock;
@@ -58,7 +61,7 @@ class SegmentWriter {
 public:
     explicit SegmentWriter(fs::WritableBlock* block, uint32_t segment_id,
                            const TabletSchema* tablet_schema,
-                           DataDir* data_dir,
+                           DataDir* data_dir, uint32_t max_row_per_segment,
                            const SegmentWriterOptions& opts);
     ~SegmentWriter();
 
@@ -69,6 +72,8 @@ public:
 
     Status append_block(const vectorized::Block* block, size_t row_pos, size_t num_rows);
 
+    int64_t max_row_to_add(size_t row_avg_size_in_bytes);
+
     uint64_t estimate_segment_size();
 
     uint32_t num_rows_written() { return _row_count; }
@@ -95,6 +100,7 @@ private:
     uint32_t _segment_id;
     const TabletSchema* _tablet_schema;
     DataDir* _data_dir;
+    uint32_t _max_row_per_segment;
     SegmentWriterOptions _opts;
 
     // Not owned. owned by RowsetWriter
diff --git a/be/src/runtime/exec_env_init.cpp b/be/src/runtime/exec_env_init.cpp
index a0964afbcf..0d483eaa4c 100644
--- a/be/src/runtime/exec_env_init.cpp
+++ b/be/src/runtime/exec_env_init.cpp
@@ -65,7 +65,6 @@
 #include "util/priority_thread_pool.hpp"
 #include "util/priority_work_stealing_thread_pool.hpp"
 #include "vec/runtime/vdata_stream_mgr.h"
-#include "vec/runtime/vload_channel_mgr.h"
 
 namespace doris {
 
@@ -143,11 +142,7 @@ Status ExecEnv::_init(const std::vector<StorePath>& store_paths) {
     _tmp_file_mgr = new TmpFileMgr(this);
     _bfd_parser = BfdParser::create();
     _broker_mgr = new BrokerMgr(this);
-    if (config::enable_vectorized_load) {
-        _load_channel_mgr = new vectorized::VLoadChannelMgr();
-    } else {
-        _load_channel_mgr = new LoadChannelMgr();
-    }
+    _load_channel_mgr = new LoadChannelMgr();
     _load_stream_mgr = new LoadStreamMgr();
     _internal_client_cache = new BrpcClientCache<PBackendService_Stub>();
     _function_client_cache = new BrpcClientCache<PFunctionService_Stub>();
diff --git a/be/src/runtime/load_channel.cpp b/be/src/runtime/load_channel.cpp
index 59506ad879..eb8756c81a 100644
--- a/be/src/runtime/load_channel.cpp
+++ b/be/src/runtime/load_channel.cpp
@@ -25,9 +25,9 @@
 namespace doris {
 
 LoadChannel::LoadChannel(const UniqueId& load_id, int64_t mem_limit, int64_t timeout_s,
-                         bool is_high_priority, const std::string& sender_ip)
+                         bool is_high_priority, const std::string& sender_ip, bool is_vec)
         : _load_id(load_id), _timeout_s(timeout_s), _is_high_priority(is_high_priority),
-          _sender_ip(sender_ip) {
+          _sender_ip(sender_ip), _is_vec(is_vec) {
     _mem_tracker = MemTracker::create_tracker(
             mem_limit, "LoadChannel:" + _load_id.to_string(), nullptr, MemTrackerLevel::TASK);
     // _last_updated_time should be set before being inserted to
@@ -39,7 +39,7 @@ LoadChannel::LoadChannel(const UniqueId& load_id, int64_t mem_limit, int64_t tim
 LoadChannel::~LoadChannel() {
     LOG(INFO) << "load channel removed. mem peak usage=" << _mem_tracker->peak_consumption()
               << ", info=" << _mem_tracker->debug_string() << ", load_id=" << _load_id
-              << ", is high priority=" << _is_high_priority << ", sender_ip=" << _sender_ip;
+              << ", is high priority=" << _is_high_priority << ", sender_ip=" << _sender_ip << ", is_vec=" << _is_vec;
 }
 
 Status LoadChannel::open(const PTabletWriterOpenRequest& params) {
@@ -54,7 +54,7 @@ Status LoadChannel::open(const PTabletWriterOpenRequest& params) {
         } else {
             // create a new tablets channel
             TabletsChannelKey key(params.id(), index_id);
-            channel.reset(new TabletsChannel(key, _is_high_priority));
+            channel.reset(new TabletsChannel(key, _is_high_priority, _is_vec));
             _tablets_channels.insert({index_id, channel});
         }
     }
@@ -85,37 +85,6 @@ Status LoadChannel::_get_tablets_channel(std::shared_ptr<TabletsChannel>& channe
     return Status::OK();
 }
 
-Status LoadChannel::add_batch(const PTabletWriterAddBatchRequest& request,
-                              PTabletWriterAddBatchResult* response) {
-    SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER(_mem_tracker);
-    int64_t index_id = request.index_id();
-    // 1. get tablets channel
-    std::shared_ptr<TabletsChannel> channel;
-    bool is_finished;
-    Status st = _get_tablets_channel(channel, is_finished, index_id);
-    if (!st.ok() || is_finished) {
-        return st;
-    }
-
-    // 2. check if mem consumption exceed limit
-    handle_mem_exceed_limit(false);
-
-    // 3. add batch to tablets channel
-    if (request.has_row_batch()) {
-        RETURN_IF_ERROR(channel->add_batch(request, response));
-    }
-
-    // 4. handle eos
-    if (request.has_eos() && request.eos()) {
-        st = _handle_eos(channel, request, response);
-        if (!st.ok()) {
-            return st;
-        }
-    }
-    _last_updated_time.store(time(nullptr));
-    return st;
-}
-
 void LoadChannel::handle_mem_exceed_limit(bool force) {
     // lock so that only one thread can check mem limit
     std::lock_guard<std::mutex> l(_lock);
diff --git a/be/src/runtime/load_channel.h b/be/src/runtime/load_channel.h
index 4b4708679d..37ee8453c9 100644
--- a/be/src/runtime/load_channel.h
+++ b/be/src/runtime/load_channel.h
@@ -28,6 +28,7 @@
 #include "gen_cpp/internal_service.pb.h"
 #include "runtime/mem_tracker.h"
 #include "runtime/tablets_channel.h"
+#include "runtime/thread_context.h"
 #include "util/uid_util.h"
 
 namespace doris {
@@ -39,20 +40,16 @@ class Cache;
 class LoadChannel {
 public:
     LoadChannel(const UniqueId& load_id, int64_t mem_limit, int64_t timeout_s,
-                bool is_high_priority, const std::string& sender_ip);
-    virtual ~LoadChannel();
+                bool is_high_priority, const std::string& sender_ip, bool is_vec);
+    ~LoadChannel();
 
     // open a new load channel if not exist
-    virtual Status open(const PTabletWriterOpenRequest& request);
+    Status open(const PTabletWriterOpenRequest& request);
 
     // this batch must belong to a index in one transaction
-    Status add_batch(const PTabletWriterAddBatchRequest& request,
-                     PTabletWriterAddBatchResult* response);
-
-    virtual Status add_block(const PTabletWriterAddBlockRequest& request,
-                             PTabletWriterAddBlockResult* response) {
-        return Status::NotSupported("Not Implemented add_block");
-    }
+    template <typename TabletWriterAddRequest, typename TabletWriterAddResult>
+    Status add_batch(const TabletWriterAddRequest& request,
+                     TabletWriterAddResult* response);
 
     // return true if this load channel has been opened and all tablets channels are closed then.
     bool is_finished();
@@ -98,7 +95,7 @@ protected:
     }
 
 
-protected:
+private:
     // when mem consumption exceeds limit, should call this method to find the channel
     // that consumes the largest memory(, and then we can reduce its memory usage).
     bool _find_largest_consumption_channel(std::shared_ptr<TabletsChannel>* channel);
@@ -127,8 +124,49 @@ protected:
 
     // the ip where tablet sink locate
     std::string _sender_ip = "";
+
+    // true if this load is vectorized
+    bool _is_vec = false;
 };
 
+template <typename TabletWriterAddRequest, typename TabletWriterAddResult>
+Status LoadChannel::add_batch(const TabletWriterAddRequest& request,
+                              TabletWriterAddResult* response) {
+    SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER(_mem_tracker);
+    int64_t index_id = request.index_id();
+    // 1. get tablets channel
+    std::shared_ptr<TabletsChannel> channel;
+    bool is_finished;
+    Status st = _get_tablets_channel(channel, is_finished, index_id);
+    if (!st.ok() || is_finished) {
+        return st;
+    }
+
+    // 2. check if mem consumption exceed limit
+    handle_mem_exceed_limit(false);
+
+    // 3. add batch to tablets channel
+    if constexpr (std::is_same_v<TabletWriterAddRequest, PTabletWriterAddBatchRequest>) {
+        if (request.has_row_batch()) {
+            RETURN_IF_ERROR(channel->add_batch(request, response));
+        }
+    } else {
+        if (request.has_block()) {
+            RETURN_IF_ERROR(channel->add_batch(request, response));
+        }
+    }
+
+    // 4. handle eos
+    if (request.has_eos() && request.eos()) {
+        st = _handle_eos(channel, request, response);
+        if (!st.ok()) {
+            return st;
+        }
+    }
+    _last_updated_time.store(time(nullptr));
+    return st;
+}
+
 inline std::ostream& operator<<(std::ostream& os, const LoadChannel& load_channel) {
     os << "LoadChannel(id=" << load_channel.load_id() << ", mem=" << load_channel.mem_consumption()
         << ", last_update_time=" << static_cast<uint64_t>(load_channel.last_updated_time())
diff --git a/be/src/runtime/load_channel_mgr.cpp b/be/src/runtime/load_channel_mgr.cpp
index da7b950359..35f7c3b82f 100644
--- a/be/src/runtime/load_channel_mgr.cpp
+++ b/be/src/runtime/load_channel_mgr.cpp
@@ -94,10 +94,9 @@ Status LoadChannelMgr::init(int64_t process_mem_limit) {
     return Status::OK();
 }
 
-LoadChannel* 
-LoadChannelMgr::_create_load_channel(const UniqueId& load_id, int64_t mem_limit, int64_t timeout_s,
-                                     bool is_high_priority, const std::string& sender_ip) {
-    return new LoadChannel(load_id, mem_limit, timeout_s, is_high_priority, sender_ip);
+LoadChannel* LoadChannelMgr::_create_load_channel(const UniqueId& load_id, int64_t mem_limit, int64_t timeout_s,
+                                     bool is_high_priority, const std::string& sender_ip, bool is_vec) {
+    return new LoadChannel(load_id, mem_limit, timeout_s, is_high_priority, sender_ip, is_vec);
 }
 
 Status LoadChannelMgr::open(const PTabletWriterOpenRequest& params) {
@@ -121,7 +120,7 @@ Status LoadChannelMgr::open(const PTabletWriterOpenRequest& params) {
 
             bool is_high_priority = (params.has_is_high_priority() && params.is_high_priority());
             channel.reset(_create_load_channel(load_id, job_max_memory, job_timeout_s, is_high_priority,
-                                          params.sender_ip()));
+                                          params.sender_ip(), params.is_vectorized()));
             _load_channels.insert({load_id, channel});
         }
     }
@@ -144,37 +143,6 @@ void LoadChannelMgr::_finish_load_channel(const UniqueId load_id) {
     VLOG_CRITICAL << "removed load channel " << load_id;
 }
 
-Status LoadChannelMgr::add_batch(const PTabletWriterAddBatchRequest& request,
-                                 PTabletWriterAddBatchResult* response) {
-    SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER(_mem_tracker);
-    UniqueId load_id(request.id());
-    // 1. get load channel
-    std::shared_ptr<LoadChannel> channel;
-    bool is_eof;
-    auto status = _get_load_channel(channel, is_eof, load_id, request);
-    if (!status.ok() || is_eof) {
-        return status;
-    }
-
-    if (!channel->is_high_priority()) {
-        // 2. check if mem consumption exceed limit
-        // If this is a high priority load task, do not handle this.
-        // because this may block for a while, which may lead to rpc timeout.
-        _handle_mem_exceed_limit();
-    }
-
-    // 3. add batch to load channel
-    // batch may not exist in request(eg: eos request without batch),
-    // this case will be handled in load channel's add batch method.
-    RETURN_IF_ERROR(channel->add_batch(request, response));
-
-    // 4. handle finish
-    if (channel->is_finished()) {
-        _finish_load_channel(load_id);
-    }
-    return Status::OK();
-}
-
 void LoadChannelMgr::_handle_mem_exceed_limit() {
     // lock so that only one thread can check mem limit
     std::lock_guard<std::mutex> l(_lock);
@@ -220,7 +188,7 @@ Status LoadChannelMgr::cancel(const PTabletWriterCancelRequest& params) {
         }
     }
 
-    if (cancelled_channel.get() != nullptr) {
+    if (cancelled_channel != nullptr) {
         cancelled_channel->cancel();
         LOG(INFO) << "load channel has been cancelled: " << load_id;
     }
diff --git a/be/src/runtime/load_channel_mgr.h b/be/src/runtime/load_channel_mgr.h
index 2bb982cb08..292f3776e8 100644
--- a/be/src/runtime/load_channel_mgr.h
+++ b/be/src/runtime/load_channel_mgr.h
@@ -28,7 +28,9 @@
 #include "gen_cpp/Types_types.h"
 #include "gen_cpp/internal_service.pb.h"
 #include "gutil/ref_counted.h"
+#include "runtime/load_channel.h"
 #include "runtime/tablets_channel.h"
+#include "runtime/thread_context.h"
 #include "util/countdown_latch.h"
 #include "util/thread.h"
 #include "util/uid_util.h"
@@ -37,60 +39,35 @@
 namespace doris {
 
 class Cache;
-class LoadChannel;
 
 // LoadChannelMgr -> LoadChannel -> TabletsChannel -> DeltaWriter
 // All dispatched load data for this backend is routed from this class
 class LoadChannelMgr {
 public:
     LoadChannelMgr();
-    virtual ~LoadChannelMgr();
+    ~LoadChannelMgr();
 
     Status init(int64_t process_mem_limit);
 
     // open a new load channel if not exist
     Status open(const PTabletWriterOpenRequest& request);
 
-    Status add_batch(const PTabletWriterAddBatchRequest& request,
-                     PTabletWriterAddBatchResult* response);
-    
-    virtual Status add_block(const PTabletWriterAddBlockRequest& request,
-                             PTabletWriterAddBlockResult* response) {
-        return Status::NotSupported("Not Implemented add_block");
-    }
+    template <typename TabletWriterAddRequest, typename TabletWriterAddResult>
+    Status add_batch(const TabletWriterAddRequest& request,
+                     TabletWriterAddResult* response);
 
     // cancel all tablet stream for 'load_id' load
     Status cancel(const PTabletWriterCancelRequest& request);
 
-protected:
-    virtual LoadChannel* _create_load_channel(const UniqueId& load_id, int64_t mem_limit, int64_t timeout_s,
-                                              bool is_high_priority, const std::string& sender_ip);
+private:
+    static LoadChannel* _create_load_channel(const UniqueId& load_id, int64_t mem_limit, int64_t timeout_s,
+                                              bool is_high_priority, const std::string& sender_ip, bool is_vec);
 
     template<typename Request>
-    Status _get_load_channel(std::shared_ptr<LoadChannel>& channel,
-                             bool& is_eof,
-                             const UniqueId load_id,
-                             const Request& request) {
-        is_eof = false;
-        std::lock_guard<std::mutex> l(_lock);
-        auto it = _load_channels.find(load_id);
-        if (it == _load_channels.end()) {
-            auto handle = _last_success_channel->lookup(load_id.to_string());
-            // success only when eos be true
-            if (handle != nullptr) {
-                _last_success_channel->release(handle);
-                if (request.has_eos() && request.eos()) {
-                    is_eof = true;
-                    return Status::OK();
-                }
-            }
-            return Status::InternalError(strings::Substitute(
-                    "fail to add batch in load channel. unknown load_id=$0", load_id.to_string()));
-        }
-        channel = it->second;
-        return Status::OK();
-    }
-    void _finish_load_channel(const UniqueId load_id);
+    Status _get_load_channel(std::shared_ptr<LoadChannel>& channel, bool& is_eof,
+                             const UniqueId& load_id, const Request& request);
+
+    void _finish_load_channel(UniqueId load_id);
     // check if the total load mem consumption exceeds limit.
     // If yes, it will pick a load channel to try to reduce memory consumption.
     void _handle_mem_exceed_limit();
@@ -113,4 +90,62 @@ protected:
     Status _start_load_channels_clean();
 };
 
+template<typename Request>
+Status LoadChannelMgr::_get_load_channel(std::shared_ptr<LoadChannel>& channel,
+                         bool& is_eof,
+                         const UniqueId& load_id,
+                         const Request& request) {
+    is_eof = false;
+    std::lock_guard<std::mutex> l(_lock);
+    auto it = _load_channels.find(load_id);
+    if (it == _load_channels.end()) {
+        auto handle = _last_success_channel->lookup(load_id.to_string());
+        // succeed only when eos is true
+        if (handle != nullptr) {
+            _last_success_channel->release(handle);
+            if (request.has_eos() && request.eos()) {
+                is_eof = true;
+                return Status::OK();
+            }
+        }
+        return Status::InternalError(strings::Substitute(
+                "fail to add batch in load channel. unknown load_id=$0", load_id.to_string()));
+    }
+    channel = it->second;
+    return Status::OK();
+}
+
+template <typename TabletWriterAddRequest, typename TabletWriterAddResult>
+Status LoadChannelMgr::add_batch(const TabletWriterAddRequest& request,
+                                 TabletWriterAddResult* response) {
+    SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER(_mem_tracker);
+    UniqueId load_id(request.id());
+    // 1. get load channel
+    std::shared_ptr<LoadChannel> channel;
+    bool is_eof;
+    auto status = _get_load_channel(channel, is_eof, load_id, request);
+    if (!status.ok() || is_eof) {
+        return status;
+    }
+
+    if (!channel->is_high_priority()) {
+        // 2. check if mem consumption exceed limit
+        // If this is a high priority load task, do not handle this.
+        // because this may block for a while, which may lead to rpc timeout.
+        _handle_mem_exceed_limit();
+    }
+
+    // 3. add batch to load channel
+    // batch may not exist in request(eg: eos request without batch),
+    // this case will be handled in load channel's add batch method.
+    RETURN_IF_ERROR(channel->add_batch(request, response));
+
+    // 4. handle finish
+    if (channel->is_finished()) {
+        _finish_load_channel(load_id);
+    }
+    return Status::OK();
+}
+
+
 } // namespace doris
diff --git a/be/src/runtime/tablets_channel.cpp b/be/src/runtime/tablets_channel.cpp
index 3b22653e50..6f6db9d112 100644
--- a/be/src/runtime/tablets_channel.cpp
+++ b/be/src/runtime/tablets_channel.cpp
@@ -18,7 +18,6 @@
 #include "runtime/tablets_channel.h"
 
 #include "exec/tablet_info.h"
-#include "olap/delta_writer.h"
 #include "olap/memtable.h"
 #include "runtime/row_batch.h"
 #include "runtime/tuple_row.h"
@@ -31,8 +30,8 @@ DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(tablet_writer_count, MetricUnit::NOUNIT);
 
 std::atomic<uint64_t> TabletsChannel::_s_tablet_writer_count;
 
-TabletsChannel::TabletsChannel(const TabletsChannelKey& key, bool is_high_priority)
-        : _key(key), _state(kInitialized), _closed_senders(64), _is_high_priority(is_high_priority) {
+TabletsChannel::TabletsChannel(const TabletsChannelKey& key, bool is_high_priority, bool is_vec)
+        : _key(key), _state(kInitialized), _closed_senders(64), _is_high_priority(is_high_priority), _is_vec(is_vec) {
     _mem_tracker = MemTracker::create_tracker(-1, "TabletsChannel:" + std::to_string(key.index_id));
     static std::once_flag once_flag;
     std::call_once(once_flag, [] {
@@ -75,69 +74,6 @@ Status TabletsChannel::open(const PTabletWriterOpenRequest& request) {
     return Status::OK();
 }
 
-Status TabletsChannel::add_batch(const PTabletWriterAddBatchRequest& request,
-        PTabletWriterAddBatchResult* response) {
-    DCHECK(request.tablet_ids_size() == request.row_batch().num_rows());
-    SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER(_mem_tracker);
-    int64_t cur_seq = 0;
-    
-    auto status = _get_current_seq(cur_seq, request);
-    if (UNLIKELY(!status.ok())) {
-        return status;
-    }
-
-    if (request.packet_seq() < cur_seq) {
-        LOG(INFO) << "packet has already recept before, expect_seq=" << cur_seq
-            << ", recept_seq=" << request.packet_seq();
-        return Status::OK();
-    }
-
-    RowBatch row_batch(*_row_desc, request.row_batch());
-    std::unordered_map<int64_t /* tablet_id */, std::vector<int> /* row index */> tablet_to_rowidxs;
-    for (int i = 0; i < request.tablet_ids_size(); ++i) {
-        int64_t tablet_id = request.tablet_ids(i);
-        if (_broken_tablets.find(tablet_id) != _broken_tablets.end()) {
-            // skip broken tablets
-            continue;
-        }
-        auto it = tablet_to_rowidxs.find(tablet_id);
-        if (it == tablet_to_rowidxs.end()) {
-            tablet_to_rowidxs.emplace(tablet_id, std::initializer_list<int>{ i });
-        } else {
-            it->second.emplace_back(i);
-        }
-    }
-
-    google::protobuf::RepeatedPtrField<PTabletError>* tablet_errors = response->mutable_tablet_errors(); 
-    for (const auto& tablet_to_rowidxs_it : tablet_to_rowidxs) {
-        auto tablet_writer_it = _tablet_writers.find(tablet_to_rowidxs_it.first);
-        if (tablet_writer_it == _tablet_writers.end()) {
-            return Status::InternalError(
-                    strings::Substitute("unknown tablet to append data, tablet=$0", tablet_to_rowidxs_it.first));
-        }
-
-        OLAPStatus st = tablet_writer_it->second->write(&row_batch, tablet_to_rowidxs_it.second);
-        if (st != OLAP_SUCCESS) {
-            auto err_msg = strings::Substitute(
-                    "tablet writer write failed, tablet_id=$0, txn_id=$1, err=$2",
-                    tablet_to_rowidxs_it.first, _txn_id, st);
-            LOG(WARNING) << err_msg;
-            PTabletError* error = tablet_errors->Add();
-            error->set_tablet_id(tablet_to_rowidxs_it.first);
-            error->set_msg(err_msg);
-            _broken_tablets.insert(tablet_to_rowidxs_it.first);
-            // continue write to other tablet.
-            // the error will return back to sender.
-        }
-    }
-
-    {
-        std::lock_guard<std::mutex> l(_lock);
-        _next_seqs[request.sender_id()] = cur_seq + 1;
-    }
-    return Status::OK();
-}
-
 Status TabletsChannel::close(int sender_id, int64_t backend_id, bool* finished,
                              const google::protobuf::RepeatedField<int64_t>& partition_ids,
                              google::protobuf::RepeatedPtrField<PTabletInfo>* tablet_vec) {
@@ -278,7 +214,7 @@ Status TabletsChannel::_open_all_writers(const PTabletWriterOpenRequest& request
         wrequest.is_high_priority = _is_high_priority;
 
         DeltaWriter* writer = nullptr;
-        auto st = DeltaWriter::open(&wrequest, &writer);
+        auto st = DeltaWriter::open(&wrequest, &writer, _is_vec);
         if (st != OLAP_SUCCESS) {
             std::stringstream ss;
             ss << "open delta writer failed, tablet_id=" << tablet.tablet_id()
diff --git a/be/src/runtime/tablets_channel.h b/be/src/runtime/tablets_channel.h
index 8c2c9c1939..efc021e79a 100644
--- a/be/src/runtime/tablets_channel.h
+++ b/be/src/runtime/tablets_channel.h
@@ -27,11 +27,15 @@
 #include "gen_cpp/internal_service.pb.h"
 #include "runtime/descriptors.h"
 #include "runtime/mem_tracker.h"
+#include "runtime/thread_context.h"
 #include "util/bitmap.h"
 #include "util/priority_thread_pool.hpp"
 #include "util/uid_util.h"
 #include "gutil/strings/substitute.h"
 
+#include "vec/core/block.h"
+#include "olap/delta_writer.h"
+
 namespace doris {
 
 struct TabletsChannelKey {
@@ -57,18 +61,15 @@ class OlapTableSchemaParam;
 // Write channel for a particular (load, index).
 class TabletsChannel {
 public:
-    TabletsChannel(const TabletsChannelKey& key, bool is_high_priority);
+    TabletsChannel(const TabletsChannelKey& key, bool is_high_priority, bool is_vec);
 
     ~TabletsChannel();
 
     Status open(const PTabletWriterOpenRequest& request);
 
     // no-op when this channel has been closed or cancelled
-    Status add_batch(const PTabletWriterAddBatchRequest& request, PTabletWriterAddBatchResult* response);
-
-    virtual Status add_block(const PTabletWriterAddBlockRequest& request, PTabletWriterAddBlockResult* response) {
-        return Status::NotSupported("Not Implemented add_block");
-    }
+    template <typename TabletWriterAddRequest, typename TabletWriterAddResult>
+    Status add_batch(const TabletWriterAddRequest& request, TabletWriterAddResult* response);
 
     // Mark sender with 'sender_id' as closed.
     // If all senders are closed, close this channel, set '*finished' to true, update 'tablet_vec'
@@ -89,31 +90,13 @@ public:
 
     int64_t mem_consumption() const { return _mem_tracker->consumption(); }
 
-protected:
+private:
     template<typename Request>
-    Status _get_current_seq(int64_t& cur_seq, const Request& request) {
-        std::lock_guard<std::mutex> l(_lock);
-        if (_state != kOpened) {
-            return _state == kFinished
-                ? _close_status
-                : Status::InternalError(strings::Substitute("TabletsChannel $0 state: $1",
-                            _key.to_string(), _state));
-        }
-        cur_seq = _next_seqs[request.sender_id()];
-        // check packet
-        if (request.packet_seq() > cur_seq) {
-            LOG(WARNING) << "lost data packet, expect_seq=" << cur_seq
-                << ", recept_seq=" << request.packet_seq();
-            return Status::InternalError("lost data packet");
-        }
-        return Status::OK();
-    }
+    Status _get_current_seq(int64_t& cur_seq, const Request& request);
 
-private:
     // open all writer
-    virtual Status _open_all_writers(const PTabletWriterOpenRequest& request);
+    Status _open_all_writers(const PTabletWriterOpenRequest& request);
 
-protected:
     // id of this load channel
     TabletsChannelKey _key;
 
@@ -132,12 +115,10 @@ protected:
     int64_t _index_id = -1;
     OlapTableSchemaParam* _schema = nullptr;
 
-private:
     TupleDescriptor* _tuple_desc = nullptr;
     // row_desc used to construct
     RowDescriptor* _row_desc = nullptr;
 
-protected:
     // next sequence we expect
     int _num_remaining_senders = 0;
     std::vector<int64_t> _next_seqs;
@@ -160,6 +141,123 @@ protected:
     static std::atomic<uint64_t> _s_tablet_writer_count;
 
     bool _is_high_priority = false;
+
+    bool _is_vec = false;
 };
 
+// Looks up, under _lock, the packet sequence number expected next from the sender of
+// `request` and stores it in `cur_seq`.
+//
+// Returns _close_status when the channel is finished and InternalError when it is in any
+// other non-open state, when the sender id does not index into _next_seqs, or when the
+// request is ahead of the expected sequence ("lost data packet"). A request that is behind
+// the expected sequence still yields OK; the caller decides how to treat it.
+template<typename Request>
+Status TabletsChannel::_get_current_seq(int64_t& cur_seq, const Request& request) {
+    std::lock_guard<std::mutex> l(_lock);
+    if (_state != kOpened) {
+        return _state == kFinished
+            ? _close_status
+            : Status::InternalError(strings::Substitute("TabletsChannel $0 state: $1",
+                        _key.to_string(), _state));
+    }
+    // The sender id is taken from the request as-is; reject values that do not index into
+    // _next_seqs instead of reading out of bounds.
+    const auto sender_id = request.sender_id();
+    if (sender_id < 0 || static_cast<size_t>(sender_id) >= _next_seqs.size()) {
+        LOG(WARNING) << "invalid sender id, sender_id=" << sender_id
+            << ", num_senders=" << _next_seqs.size();
+        return Status::InternalError("invalid sender id");
+    }
+    cur_seq = _next_seqs[sender_id];
+    // check packet
+    if (request.packet_seq() > cur_seq) {
+        LOG(WARNING) << "lost data packet, expect_seq=" << cur_seq
+            << ", recept_seq=" << request.packet_seq();
+        return Status::InternalError("lost data packet");
+    }
+    return Status::OK();
+}
+
+// Writes the rows carried by `request` to the delta writers of their target tablets.
+//
+// When TabletWriterAddRequest is PTabletWriterAddBatchRequest the payload is deserialized
+// into a RowBatch; for any other request type it is read from request.block() into a
+// vectorized::Block.
+//
+// Returns the error from _get_current_seq() if the channel cannot accept the packet, OK
+// without writing anything if the packet was already received, and InternalError if a row
+// targets a tablet that has no writer. A failed write on one tablet is reported through
+// response->tablet_errors and marks that tablet as broken; the other tablets are still
+// written and the call returns OK.
+template <typename TabletWriterAddRequest, typename TabletWriterAddResult>
+Status TabletsChannel::add_batch(const TabletWriterAddRequest& request,
+                                 TabletWriterAddResult* response) {
+    SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER(_mem_tracker);
+    int64_t cur_seq = 0;
+
+    auto status = _get_current_seq(cur_seq, request);
+    if (UNLIKELY(!status.ok())) {
+        return status;
+    }
+
+    if (request.packet_seq() < cur_seq) {
+        LOG(INFO) << "packet has already recept before, expect_seq=" << cur_seq
+                  << ", recept_seq=" << request.packet_seq();
+        return Status::OK();
+    }
+
+    // Group the row indexes of this packet by destination tablet.
+    std::unordered_map<int64_t /* tablet_id */, std::vector<int> /* row index */> tablet_to_rowidxs;
+    for (int i = 0; i < request.tablet_ids_size(); ++i) {
+        int64_t tablet_id = request.tablet_ids(i);
+        if (_broken_tablets.find(tablet_id) != _broken_tablets.end()) {
+            // skip broken tablets
+            continue;
+        }
+        // operator[] creates the empty index list the first time a tablet is seen.
+        tablet_to_rowidxs[tablet_id].emplace_back(i);
+    }
+
+    // Pick the payload type at compile time from the request type.
+    auto get_send_data = [&] () {
+        if constexpr (std::is_same_v<TabletWriterAddRequest, PTabletWriterAddBatchRequest>) {
+            return RowBatch(*_row_desc, request.row_batch());
+        } else {
+            return vectorized::Block(request.block());
+        }
+    };
+
+    auto send_data = get_send_data();
+    google::protobuf::RepeatedPtrField<PTabletError>* tablet_errors = response->mutable_tablet_errors();
+    for (const auto& [tablet_id, row_idxs] : tablet_to_rowidxs) {
+        auto tablet_writer_it = _tablet_writers.find(tablet_id);
+        if (tablet_writer_it == _tablet_writers.end()) {
+            return Status::InternalError(
+                    strings::Substitute("unknown tablet to append data, tablet=$0", tablet_id));
+        }
+
+        OLAPStatus st = tablet_writer_it->second->write(&send_data, row_idxs);
+        if (st != OLAP_SUCCESS) {
+            auto err_msg = strings::Substitute(
+                    "tablet writer write failed, tablet_id=$0, txn_id=$1, err=$2",
+                    tablet_id, _txn_id, st);
+            LOG(WARNING) << err_msg;
+            PTabletError* error = tablet_errors->Add();
+            error->set_tablet_id(tablet_id);
+            error->set_msg(err_msg);
+            _broken_tablets.insert(tablet_id);
+            // continue write to other tablet.
+            // the error will return back to sender.
+        }
+    }
+
+    // Advance the expected sequence for this sender only after the packet was handled.
+    // The sender id was range-checked in _get_current_seq().
+    {
+        std::lock_guard<std::mutex> l(_lock);
+        _next_seqs[request.sender_id()] = cur_seq + 1;
+    }
+    return Status::OK();
+}
 } // namespace doris
diff --git a/be/src/runtime/thread_mem_tracker_mgr.h b/be/src/runtime/thread_mem_tracker_mgr.h
index 79c152c243..6cdf75f070 100644
--- a/be/src/runtime/thread_mem_tracker_mgr.h
+++ b/be/src/runtime/thread_mem_tracker_mgr.h
@@ -103,7 +103,7 @@ public:
     void update_tracker_id(int64_t tracker_id);
 
     void add_tracker(const std::shared_ptr<MemTracker>& mem_tracker) {
-        DCHECK(_mem_trackers.find(mem_tracker->id()) == _mem_trackers.end());
+        // DCHECK(_mem_trackers.find(mem_tracker->id()) == _mem_trackers.end());
         _mem_trackers[mem_tracker->id()] = mem_tracker;
         DCHECK(_mem_trackers[mem_tracker->id()]);
         _untracked_mems[mem_tracker->id()] = 0;
diff --git a/be/src/service/internal_service.cpp b/be/src/service/internal_service.cpp
index b7917ce97d..7c7bec0862 100644
--- a/be/src/service/internal_service.cpp
+++ b/be/src/service/internal_service.cpp
@@ -128,7 +128,7 @@ void PInternalServiceImpl<T>::tablet_writer_add_block(google::protobuf::RpcContr
             SCOPED_RAW_TIMER(&execution_time_ns);
             brpc::Controller* cntl = static_cast<brpc::Controller*>(cntl_base);
             attachment_transfer_request_block<PTabletWriterAddBlockRequest>(request, cntl);
-            auto st = _exec_env->load_channel_mgr()->add_block(*request, response);
+            auto st = _exec_env->load_channel_mgr()->add_batch(*request, response);
             if (!st.ok()) {
                 LOG(WARNING) << "tablet writer add block failed, message=" << st.get_error_msg()
                              << ", id=" << request->id() << ", index_id=" << request->index_id()
diff --git a/be/src/vec/CMakeLists.txt b/be/src/vec/CMakeLists.txt
index 5d1ca3035a..10c98fb923 100644
--- a/be/src/vec/CMakeLists.txt
+++ b/be/src/vec/CMakeLists.txt
@@ -178,7 +178,6 @@ set(VEC_FILES
   olap/vgeneric_iterators.cpp
   olap/vcollect_iterator.cpp
   olap/block_reader.cpp
-  olap/vdelta_writer.cpp
   olap/olap_data_convertor.cpp
   sink/mysql_result_writer.cpp
   sink/result_sink.cpp
@@ -190,10 +189,7 @@ set(VEC_FILES
   runtime/vdata_stream_recvr.cpp
   runtime/vdata_stream_mgr.cpp
   runtime/vpartition_info.cpp
-  runtime/vsorted_run_merger.cpp
-  runtime/vload_channel.cpp
-  runtime/vload_channel_mgr.cpp
-  runtime/vtablets_channel.cpp)
+  runtime/vsorted_run_merger.cpp)
 
 add_library(Vec STATIC
     ${VEC_FILES}
diff --git a/be/src/vec/exec/vbroker_scan_node.cpp b/be/src/vec/exec/vbroker_scan_node.cpp
index 9cb919228d..e183cdd06c 100644
--- a/be/src/vec/exec/vbroker_scan_node.cpp
+++ b/be/src/vec/exec/vbroker_scan_node.cpp
@@ -35,8 +35,6 @@ VBrokerScanNode::VBrokerScanNode(ObjectPool* pool, const TPlanNode& tnode,
     _vectorized = true;
 }
 
-VBrokerScanNode::~VBrokerScanNode() {}
-
 Status VBrokerScanNode::start_scanners() {
     {
         std::unique_lock<std::mutex> l(_batch_queue_lock);
@@ -146,7 +144,7 @@ Status VBrokerScanNode::scanner_scan(const TBrokerScanRange& scan_range,
             }
         }
 
-        if (columns[0]->size() > 0) {
+        if (!columns[0]->empty()) {
             auto n_columns = 0;
             for (const auto slot_desc : _tuple_desc->slots()) {
                 block->insert(ColumnWithTypeAndName(std::move(columns[n_columns++]),
diff --git a/be/src/vec/exec/vbroker_scan_node.h b/be/src/vec/exec/vbroker_scan_node.h
index 95aa58f822..1a1b8eb4e0 100644
--- a/be/src/vec/exec/vbroker_scan_node.h
+++ b/be/src/vec/exec/vbroker_scan_node.h
@@ -22,25 +22,24 @@
 #include "exec/broker_scan_node.h"
 #include "exec/scan_node.h"
 #include "runtime/descriptors.h"
-//#include "vec/exec/vbroker_scanner.h"
 namespace doris {
 
 class RuntimeState;
 class Status;
 
 namespace vectorized {
-class VBrokerScanNode : public BrokerScanNode {
+class VBrokerScanNode final : public BrokerScanNode {
 public:
     VBrokerScanNode(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs);
-    virtual ~VBrokerScanNode();
+    ~VBrokerScanNode() override = default;
 
-    virtual Status get_next(RuntimeState* state, vectorized::Block* block, bool* eos) override;
+    Status get_next(RuntimeState* state, vectorized::Block* block, bool* eos) override;
 
     // Close the scanner, and report errors.
-    virtual Status close(RuntimeState* state) override;
+    Status close(RuntimeState* state) override;
 
 private:
-    virtual Status start_scanners() override;
+    Status start_scanners() override;
 
     void scanner_worker(int start_idx, int length);
     // Scan one range
diff --git a/be/src/vec/exec/vbroker_scanner.cpp b/be/src/vec/exec/vbroker_scanner.cpp
index 5713a90a03..23ed20493a 100644
--- a/be/src/vec/exec/vbroker_scanner.cpp
+++ b/be/src/vec/exec/vbroker_scanner.cpp
@@ -36,9 +36,6 @@ VBrokerScanner::VBrokerScanner(RuntimeState* state, RuntimeProfile* profile,
 
 }
 
-VBrokerScanner::~VBrokerScanner() {
-}
-
 Status VBrokerScanner::get_next(std::vector<MutableColumnPtr>& columns, bool* eof) {
     SCOPED_TIMER(_read_timer);
 
diff --git a/be/src/vec/exec/vbroker_scanner.h b/be/src/vec/exec/vbroker_scanner.h
index b21a086f3e..89d077168f 100644
--- a/be/src/vec/exec/vbroker_scanner.h
+++ b/be/src/vec/exec/vbroker_scanner.h
@@ -21,13 +21,13 @@
 
 
 namespace doris::vectorized {
-class VBrokerScanner : public BrokerScanner {
+class VBrokerScanner final : public BrokerScanner {
 public:
     VBrokerScanner(RuntimeState* state, RuntimeProfile* profile,
                   const TBrokerScanRangeParams& params, const std::vector<TBrokerRangeDesc>& ranges,
                   const std::vector<TNetworkAddress>& broker_addresses,
                   const std::vector<TExpr>& pre_filter_texprs, ScannerCounter* counter);
-    virtual ~VBrokerScanner();
+    ~VBrokerScanner() override = default;
 
     Status get_next(std::vector<MutableColumnPtr>& columns, bool* eof) override;
 
diff --git a/be/src/vec/olap/olap_data_convertor.cpp b/be/src/vec/olap/olap_data_convertor.cpp
index 7fc103cdbd..60505252c1 100644
--- a/be/src/vec/olap/olap_data_convertor.cpp
+++ b/be/src/vec/olap/olap_data_convertor.cpp
@@ -29,7 +29,7 @@ OlapBlockDataConvertor::OlapBlockDataConvertor(const TabletSchema* tablet_schema
     for (const auto& col : columns) {
         switch (col.type()) {
         case FieldType::OLAP_FIELD_TYPE_OBJECT: {
-            _convertors.emplace_back(std::make_shared<OlapColumnDataConvertorObject>());
+            _convertors.emplace_back(std::make_shared<OlapColumnDataConvertorBitMap>());
             break;
         }
         case FieldType::OLAP_FIELD_TYPE_HLL: {
@@ -179,7 +179,7 @@ const void* OlapBlockDataConvertor::OlapColumnDataConvertorObject::get_data_at(
     return null_flag ? nullptr : _slice.data() + offset;
 }
 
-Status OlapBlockDataConvertor::OlapColumnDataConvertorObject::convert_to_olap() {
+Status OlapBlockDataConvertor::OlapColumnDataConvertorBitMap::convert_to_olap() {
     assert(_typed_column.column);
     const vectorized::ColumnBitmap* column_bitmap = nullptr;
     if (_nullmap) {
@@ -242,29 +242,6 @@ Status OlapBlockDataConvertor::OlapColumnDataConvertorObject::convert_to_olap()
     return Status::OK();
 }
 
-// class OlapBlockDataConvertor::OlapColumnDataConvertorHLL
-void OlapBlockDataConvertor::OlapColumnDataConvertorHLL::set_source_column(
-        const ColumnWithTypeAndName& typed_column, size_t row_pos, size_t num_rows) {
-    OlapBlockDataConvertor::OlapColumnDataConvertorBase::set_source_column(typed_column, row_pos,
-                                                                           num_rows);
-    _raw_data.clear();
-    _slice.resize(num_rows);
-}
-
-const void* OlapBlockDataConvertor::OlapColumnDataConvertorHLL::get_data() const {
-    return _slice.data();
-}
-
-const void* OlapBlockDataConvertor::OlapColumnDataConvertorHLL::get_data_at(size_t offset) const {
-    assert(offset < _num_rows && _num_rows == _slice.size());
-    UInt8 null_flag = 0;
-    auto null_map = get_nullmap();
-    if (null_map) {
-        null_flag = null_map[offset];
-    }
-    return null_flag ? nullptr : _slice.data() + offset;
-}
-
 Status OlapBlockDataConvertor::OlapColumnDataConvertorHLL::convert_to_olap() {
     assert(_typed_column.column);
     const vectorized::ColumnHLL* column_hll = nullptr;
diff --git a/be/src/vec/olap/olap_data_convertor.h b/be/src/vec/olap/olap_data_convertor.h
index dc0ac23c00..5ade3dd0d1 100644
--- a/be/src/vec/olap/olap_data_convertor.h
+++ b/be/src/vec/olap/olap_data_convertor.h
@@ -71,27 +71,19 @@ private:
                                size_t num_rows) override;
         const void* get_data() const override;
         const void* get_data_at(size_t offset) const override;
-        Status convert_to_olap() override;
-
-    private:
+    protected:
         PaddedPODArray<Slice> _slice;
         PaddedPODArray<char> _raw_data;
     };
 
-    class OlapColumnDataConvertorHLL : public OlapColumnDataConvertorBase {
+    class OlapColumnDataConvertorHLL final : public OlapColumnDataConvertorObject{
     public:
-        OlapColumnDataConvertorHLL() = default;
-        ~OlapColumnDataConvertorHLL() override = default;
-
-        void set_source_column(const ColumnWithTypeAndName& typed_column, size_t row_pos,
-                               size_t num_rows) override;
-        const void* get_data() const override;
-        const void* get_data_at(size_t offset) const override;
         Status convert_to_olap() override;
+    };
 
-    private:
-        PaddedPODArray<Slice> _slice;
-        PaddedPODArray<char> _raw_data;
+    class OlapColumnDataConvertorBitMap final : public OlapColumnDataConvertorObject{
+    public:
+        Status convert_to_olap() override;
     };
 
     class OlapColumnDataConvertorChar : public OlapColumnDataConvertorBase {
diff --git a/be/src/vec/olap/vdelta_writer.cpp b/be/src/vec/olap/vdelta_writer.cpp
deleted file mode 100644
index af8c6613d5..0000000000
--- a/be/src/vec/olap/vdelta_writer.cpp
+++ /dev/null
@@ -1,81 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#include "vdelta_writer.h"
-#include "olap/storage_engine.h"
-#include "olap/memtable.h"
-
-namespace doris {
-
-namespace vectorized {
-
-VDeltaWriter::VDeltaWriter(WriteRequest* req, StorageEngine* storage_engine)
-        : DeltaWriter(req, storage_engine) {}
-
-VDeltaWriter::~VDeltaWriter() {
-
-}
-
-OLAPStatus VDeltaWriter::open(WriteRequest* req, VDeltaWriter** writer) {
-    *writer = new VDeltaWriter(req, StorageEngine::instance());
-    return OLAP_SUCCESS;
-}
-
-OLAPStatus VDeltaWriter::write_block(const vectorized::Block* block, const std::vector<int>& row_idxs) {
-    if (UNLIKELY(row_idxs.empty())) {
-        return OLAP_SUCCESS;
-    }
-    std::lock_guard<std::mutex> l(_lock);
-    if (!_is_init && !_is_cancelled) {
-        RETURN_NOT_OK(init());
-    }
-
-    if (_is_cancelled) {
-        return OLAP_ERR_ALREADY_CANCELLED;
-    }
-
-    int start = 0, end = 0;
-
-    const size_t num_rows = row_idxs.size();
-    for (; start < num_rows;) {
-        auto count = end + 1 - start;
-        if (end == num_rows - 1 || (row_idxs[end + 1] - row_idxs[start]) != count) {
-            _mem_table->insert(block, row_idxs[start], count);
-            start += count;
-            end = start;
-        } else {
-            end++;
-        }
-    }
-
-    if (_mem_table->memory_usage() >= config::write_buffer_size) {
-        RETURN_NOT_OK(_flush_memtable_async());
-        _reset_mem_table();
-    }
-
-    return OLAP_SUCCESS;
-}
-
-void VDeltaWriter::_reset_mem_table() {
-    _mem_table.reset(new MemTable(_tablet->tablet_id(), _schema.get(), _tablet_schema, _req.slots,
-                                  _req.tuple_desc, _tablet->keys_type(), _rowset_writer.get(),
-                                  _mem_tracker, true));
-}
-
-} // namespace vectorized
-
-} // namespace doris
diff --git a/be/src/vec/olap/vdelta_writer.h b/be/src/vec/olap/vdelta_writer.h
deleted file mode 100644
index 6d73777751..0000000000
--- a/be/src/vec/olap/vdelta_writer.h
+++ /dev/null
@@ -1,43 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#pragma once
-
-#include "olap/delta_writer.h"
-
-namespace doris {
-
-namespace vectorized {
-
-class VDeltaWriter : public DeltaWriter {
-public:
-    virtual ~VDeltaWriter() override;
-
-    static OLAPStatus open(WriteRequest* req, VDeltaWriter** writer);
-
-    virtual OLAPStatus write_block(const vectorized::Block* block, const std::vector<int>& row_idxs) override;
-
-protected:
-    virtual void _reset_mem_table() override;
-
-private:
-    VDeltaWriter(WriteRequest* req, StorageEngine* storage_engine);
-};
-
-} // namespace vectorized
-
-} // namespace doris
\ No newline at end of file
diff --git a/be/src/vec/runtime/vload_channel.cpp b/be/src/vec/runtime/vload_channel.cpp
deleted file mode 100644
index 5ac1c7d8f2..0000000000
--- a/be/src/vec/runtime/vload_channel.cpp
+++ /dev/null
@@ -1,87 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#include "vload_channel.h"
-#include "vtablets_channel.h"
-
-namespace doris {
-
-namespace vectorized {
-
-VLoadChannel::VLoadChannel(const UniqueId& load_id, int64_t mem_limit, int64_t timeout_s,
-                           bool is_high_priority, const std::string& sender_ip)
-        : LoadChannel(load_id, mem_limit, timeout_s, is_high_priority, sender_ip) {
-}
-
-Status VLoadChannel::open(const PTabletWriterOpenRequest& params) {
-    int64_t index_id = params.index_id();
-    std::shared_ptr<TabletsChannel> channel;
-    {
-        std::lock_guard<std::mutex> l(_lock);
-        auto it = _tablets_channels.find(index_id);
-        if (it != _tablets_channels.end()) {
-            channel = it->second;
-        } else {
-            // create a new tablets channel
-            TabletsChannelKey key(params.id(), index_id);
-            channel.reset(new VTabletsChannel(key, _is_high_priority));
-            _tablets_channels.insert({index_id, channel});
-        }
-    }
-
-    RETURN_IF_ERROR(channel->open(params));
-
-    _opened = true;
-    _last_updated_time.store(time(nullptr));
-    return Status::OK();
-}
-
-Status VLoadChannel::add_block(const PTabletWriterAddBlockRequest& request,
-                               PTabletWriterAddBlockResult* response) {
-    int64_t index_id = request.index_id();
-    // 1. get tablets channel
-    std::shared_ptr<TabletsChannel> channel;
-    bool is_finished;
-    Status st = _get_tablets_channel(channel, is_finished, index_id);
-    if (!st.ok() || is_finished) {
-        return st;
-    }
-
-    // 2. check if mem consumption exceed limit
-    handle_mem_exceed_limit(false);
-
-    // 3. add batch to tablets channel
-    if (request.has_block()) {
-        RETURN_IF_ERROR(channel->add_block(request, response));
-    }
-
-    // 4. handle eos
-    if (request.has_eos() && request.eos()) {
-        if (request.has_eos() && request.eos()) {
-        st = _handle_eos(channel, request, response);
-        if (!st.ok()) {
-            return st;
-        }
-    }
-    }
-    _last_updated_time.store(time(nullptr));
-    return st;
-}
-
-} // namespace vectorized
-
-} // namespace doris
\ No newline at end of file
diff --git a/be/src/vec/runtime/vload_channel.h b/be/src/vec/runtime/vload_channel.h
deleted file mode 100644
index 411625e9fb..0000000000
--- a/be/src/vec/runtime/vload_channel.h
+++ /dev/null
@@ -1,42 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#pragma once
-
-#include "runtime/load_channel.h"
-
-namespace doris {
-
-namespace vectorized {
-
-class VLoadChannel : public LoadChannel {
-public:
-    VLoadChannel(const UniqueId& load_id, int64_t mem_limit, int64_t timeout_s,
-                 bool is_high_priority, const std::string& sender_ip);
-
-    ~VLoadChannel() override {};
-
-    virtual Status open(const PTabletWriterOpenRequest& request) override;
-
-    virtual Status add_block(const PTabletWriterAddBlockRequest& request,
-                             PTabletWriterAddBlockResult* response) override;
-
-};
-
-} // namespace vectorized
-
-} // namespace doris
\ No newline at end of file
diff --git a/be/src/vec/runtime/vload_channel_mgr.cpp b/be/src/vec/runtime/vload_channel_mgr.cpp
deleted file mode 100644
index aa353515f8..0000000000
--- a/be/src/vec/runtime/vload_channel_mgr.cpp
+++ /dev/null
@@ -1,68 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#include "vec/runtime/vload_channel_mgr.h"
-#include "vec/runtime/vload_channel.h"
-
-namespace doris {
-
-namespace vectorized {
-
-VLoadChannelMgr::VLoadChannelMgr() : LoadChannelMgr() {}
-
-VLoadChannelMgr::~VLoadChannelMgr() {}
-
-LoadChannel*
-VLoadChannelMgr::_create_load_channel(const UniqueId& load_id, int64_t mem_limit, int64_t timeout_s,
-                                      bool is_high_priority, const std::string& sender_ip) {
-    return new VLoadChannel(load_id, mem_limit, timeout_s, is_high_priority, sender_ip);
-}
-
-Status VLoadChannelMgr::add_block(const PTabletWriterAddBlockRequest& request,
-                                  PTabletWriterAddBlockResult* response) {
-    UniqueId load_id(request.id());
-    // 1. get load channel
-    std::shared_ptr<LoadChannel> channel;
-    bool is_eof;
-    auto status = _get_load_channel(channel, is_eof, load_id, request);
-    if (!status.ok() || is_eof) {
-        return status;
-    }
-
-    if (!channel->is_high_priority()) {
-        // 2. check if mem consumption exceed limit
-        // If this is a high priority load task, do not handle this.
-        // because this may block for a while, which may lead to rpc timeout.
-        _handle_mem_exceed_limit();
-    }
-
-    // 3. add batch to load channel
-    // batch may not exist in request(eg: eos request without batch),
-    // this case will be handled in load channel's add batch method.
-    RETURN_IF_ERROR(channel->add_block(request, response));
-
-    // 4. handle finish
-    if (channel->is_finished()) {
-        _finish_load_channel(load_id);
-    }
-    return Status::OK();
-
-}
-
-} // namespace vectorized
-
-} // namespace doris
\ No newline at end of file
diff --git a/be/src/vec/runtime/vload_channel_mgr.h b/be/src/vec/runtime/vload_channel_mgr.h
deleted file mode 100644
index cbe53336d7..0000000000
--- a/be/src/vec/runtime/vload_channel_mgr.h
+++ /dev/null
@@ -1,43 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#pragma once
-
-#include "runtime/load_channel_mgr.h"
-
-namespace doris {
-
-class Cache;
-class LoadChannel;
-
-namespace vectorized {
-
-class VLoadChannelMgr : public LoadChannelMgr {
-public:
-    VLoadChannelMgr();
-    virtual ~VLoadChannelMgr() override;
-
-    virtual Status add_block(const PTabletWriterAddBlockRequest& request,
-                             PTabletWriterAddBlockResult* response) override;
-protected:
-    LoadChannel* _create_load_channel(const UniqueId& load_id, int64_t mem_limit, int64_t timeout_s,
-                                      bool is_high_priority, const std::string& sender_ip) override;
-};
-
-} // namespace vectorized
-
-} // namespace doris
\ No newline at end of file
diff --git a/be/src/vec/runtime/vtablets_channel.cpp b/be/src/vec/runtime/vtablets_channel.cpp
deleted file mode 100644
index fb3244d821..0000000000
--- a/be/src/vec/runtime/vtablets_channel.cpp
+++ /dev/null
@@ -1,142 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#include "vtablets_channel.h"
-#include "exec/tablet_info.h"
-#include "gutil/strings/substitute.h"
-#include "vec/olap/vdelta_writer.h"
-#include "olap/memtable.h"
-#include "runtime/row_batch.h"
-#include "runtime/tuple_row.h"
-#include "util/doris_metrics.h"
-
-namespace doris {
-
-namespace vectorized {
-
-VTabletsChannel::VTabletsChannel(const TabletsChannelKey& key,
-                                 bool is_high_priority)
-        : TabletsChannel(key, is_high_priority) {}
-
-Status VTabletsChannel::_open_all_writers(const PTabletWriterOpenRequest& request) {
-    std::vector<SlotDescriptor*>* index_slots = nullptr;
-    int32_t schema_hash = 0;
-    for (auto& index : _schema->indexes()) {
-        if (index->index_id == _index_id) {
-            index_slots = &index->slots;
-            schema_hash = index->schema_hash;
-            break;
-        }
-    }
-    if (index_slots == nullptr) {
-        std::stringstream ss;
-        ss << "unknown index id, key=" << _key;
-        return Status::InternalError(ss.str());
-    }
-    for (auto& tablet : request.tablets()) {
-        WriteRequest wrequest;
-        wrequest.tablet_id = tablet.tablet_id();
-        wrequest.schema_hash = schema_hash;
-        wrequest.write_type = WriteType::LOAD;
-        wrequest.txn_id = _txn_id;
-        wrequest.partition_id = tablet.partition_id();
-        wrequest.load_id = request.id();
-        wrequest.slots = index_slots;
-        wrequest.is_high_priority = _is_high_priority;
-
-        VDeltaWriter* writer = nullptr;
-        auto st = VDeltaWriter::open(&wrequest, &writer);
-        if (st != OLAP_SUCCESS) {
-            std::stringstream ss;
-            ss << "open delta writer failed, tablet_id=" << tablet.tablet_id()
-               << ", txn_id=" << _txn_id << ", partition_id=" << tablet.partition_id()
-               << ", err=" << st;
-            LOG(WARNING) << ss.str();
-            return Status::InternalError(ss.str());
-        }
-        _tablet_writers.emplace(tablet.tablet_id(), writer);
-    }
-    _s_tablet_writer_count += _tablet_writers.size();
-    DCHECK_EQ(_tablet_writers.size(), request.tablets_size());
-    return Status::OK();
-}
-
-Status VTabletsChannel::add_block(const PTabletWriterAddBlockRequest& request,
-                             PTabletWriterAddBlockResult* response) {
-    int64_t cur_seq = 0;
-
-    auto status = _get_current_seq(cur_seq, request);
-    if (UNLIKELY(!status.ok())) {
-        return status;
-    }
-
-    if (request.packet_seq() < cur_seq) {
-        LOG(INFO) << "packet has already recept before, expect_seq=" << cur_seq
-            << ", recept_seq=" << request.packet_seq();
-        return Status::OK();
-    }
-
-    Block block(request.block());
-
-    std::unordered_map<int64_t /* tablet_id */, std::vector<int> /* row index */> tablet_to_rowidxs;
-    for (int i = 0; i < request.tablet_ids_size(); ++i) {
-        int64_t tablet_id = request.tablet_ids(i);
-        if (_broken_tablets.find(tablet_id) != _broken_tablets.end()) {
-            // skip broken tablets
-            continue;
-        }
-        auto it = tablet_to_rowidxs.find(tablet_id);
-        if (it == tablet_to_rowidxs.end()) {
-            tablet_to_rowidxs.emplace(tablet_id, std::initializer_list<int>{ i });
-        } else {
-            it->second.emplace_back(i);
-        }
-    }
-
-    google::protobuf::RepeatedPtrField<PTabletError>* tablet_errors = response->mutable_tablet_errors();
-    for (const auto& tablet_to_rowidxs_it : tablet_to_rowidxs) {
-        auto tablet_writer_it = _tablet_writers.find(tablet_to_rowidxs_it.first);
-        if (tablet_writer_it == _tablet_writers.end()) {
-            return Status::InternalError(
-                    strings::Substitute("unknown tablet to append data, tablet=$0", tablet_to_rowidxs_it.first));
-        }
-
-        OLAPStatus st = tablet_writer_it->second->write_block(&block, tablet_to_rowidxs_it.second);
-        if (st != OLAP_SUCCESS) {
-            auto err_msg = strings::Substitute(
-                    "tablet writer write failed, tablet_id=$0, txn_id=$1, err=$2",
-                    tablet_to_rowidxs_it.first, _txn_id, st);
-            LOG(WARNING) << err_msg;
-            PTabletError* error = tablet_errors->Add();
-            error->set_tablet_id(tablet_to_rowidxs_it.first);
-            error->set_msg(err_msg);
-            _broken_tablets.insert(tablet_to_rowidxs_it.first);
-            // continue write to other tablet.
-            // the error will return back to sender.
-        }
-    }
-
-    {
-        std::lock_guard<std::mutex> l(_lock);
-        _next_seqs[request.sender_id()] = cur_seq + 1;
-    }
-    return Status::OK();
-}
-
-} // namespace vectorized
-
-} // namespace doris
\ No newline at end of file
diff --git a/be/src/vec/runtime/vtablets_channel.h b/be/src/vec/runtime/vtablets_channel.h
deleted file mode 100644
index 45bc652a5e..0000000000
--- a/be/src/vec/runtime/vtablets_channel.h
+++ /dev/null
@@ -1,40 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#pragma once
-
-#include "runtime/tablets_channel.h"
-
-namespace doris {
-
-namespace vectorized {
-
-class VTabletsChannel : public TabletsChannel {
-
-public:
-    VTabletsChannel(const TabletsChannelKey& key, bool is_high_priority);
-
-    virtual Status add_block(const PTabletWriterAddBlockRequest& request,
-                             PTabletWriterAddBlockResult* response) override;
-
-private:
-    virtual Status _open_all_writers(const PTabletWriterOpenRequest& request) override;
-};
-
-} // namespace vectorized
-
-} // namespace doris
\ No newline at end of file
diff --git a/be/src/vec/sink/vtablet_sink.cpp b/be/src/vec/sink/vtablet_sink.cpp
index 997b619bd5..f4d553f9e8 100644
--- a/be/src/vec/sink/vtablet_sink.cpp
+++ b/be/src/vec/sink/vtablet_sink.cpp
@@ -134,7 +134,7 @@ Status VNodeChannel::open_wait() {
     return status;
 }
 
-Status VNodeChannel::add_row(BlockRow& block_row, int64_t tablet_id) {
+Status VNodeChannel::add_row(const BlockRow& block_row, int64_t tablet_id) {
     // If add_row() when _eos_is_produced==true, there must be sth wrong, we can only mark this channel as failed.
     auto st = none_of({_cancelled, _eos_is_produced});
     if (!st.ok()) {
@@ -178,7 +178,7 @@ Status VNodeChannel::add_row(BlockRow& block_row, int64_t tablet_id) {
 
 int VNodeChannel::try_send_and_fetch_status(RuntimeState* state,
                                             std::unique_ptr<ThreadPoolToken>& thread_pool_token) {
-        auto st = none_of({_cancelled, _send_finished});
+    auto st = none_of({_cancelled, _send_finished});
     if (!st.ok()) {
         return 0;
     }
@@ -307,31 +307,10 @@ void VNodeChannel::mark_close() {
     _eos_is_produced = true;
 }
 
-VIndexChannel::VIndexChannel(OlapTableSink* parent, int64_t index_id)
-    : IndexChannel(parent, index_id) {
-    _is_vectorized = true;
-}
-
-VIndexChannel::~VIndexChannel() {}
-
-void VIndexChannel::add_row(BlockRow& block_row, int64_t tablet_id) {
-    auto it = _channels_by_tablet.find(tablet_id);
-    DCHECK(it != _channels_by_tablet.end()) << "unknown tablet, tablet_id=" << tablet_id;
-    for (auto channel : it->second) {
-        // if this node channel is already failed, this add_row will be skipped
-        auto st = channel->add_row(block_row, tablet_id);
-        if (!st.ok()) {
-            mark_as_failed(channel->node_id(), channel->host(), st.get_error_msg(), tablet_id);
-        }
-    }
-}
-
 VOlapTableSink::VOlapTableSink(ObjectPool* pool, const RowDescriptor& row_desc,
                                const std::vector<TExpr>& texprs, Status* status)
         : OlapTableSink(pool, row_desc, texprs, status) {
-
     _is_vectorized = true;
-
     // From the thrift expressions create the real exprs.
     vectorized::VExpr::create_expr_trees(pool, texprs, &_output_vexpr_ctxs);
     _name = "VOlapTableSink";
@@ -341,7 +320,7 @@ VOlapTableSink::~VOlapTableSink() {
     // We clear NodeChannels' batches here, cuz NodeChannels' batches destruction will use
     // OlapTableSink::_mem_tracker and its parents.
     // But their destructions are after OlapTableSink's.
-    for (auto index_channel : _channels) {
+    for (const auto& index_channel : _channels) {
         index_channel->for_each_node_channel([](const std::shared_ptr<NodeChannel>& ch) { ch->clear_all_blocks(); });
     }
 }
diff --git a/be/src/vec/sink/vtablet_sink.h b/be/src/vec/sink/vtablet_sink.h
index 65a90ad505..844a7a7df2 100644
--- a/be/src/vec/sink/vtablet_sink.h
+++ b/be/src/vec/sink/vtablet_sink.h
@@ -28,7 +28,6 @@ class VExprContext;
 
 namespace stream_load {
 
-class VIndexChannel;
 class VNodeChannel : public NodeChannel {
 public:
     VNodeChannel(OlapTableSink* parent, IndexChannel* index_channel, int64_t node_id);
@@ -39,7 +38,7 @@ public:
 
     Status open_wait() override;
 
-    Status add_row(BlockRow& block_row, int64_t tablet_id) override;
+    Status add_row(const BlockRow& block_row, int64_t tablet_id) override;
 
     int try_send_and_fetch_status(RuntimeState* state,
                                   std::unique_ptr<ThreadPoolToken>& thread_pool_token) override;
@@ -68,18 +67,8 @@ private:
     // The data in the buffer is copied to the attachment of the brpc when it is sent,
     // to avoid an extra pb serialization in the brpc.
     std::string _column_values_buffer;
-
 };
 
-class VIndexChannel : public IndexChannel {
-public:
-    VIndexChannel(OlapTableSink* parent, int64_t index_id);
-
-    ~VIndexChannel() override;
-
-    void add_row(BlockRow& block_row, int64_t tablet_id) override;
-
-};
 
 class OlapTableSink;
 
diff --git a/be/test/CMakeLists.txt b/be/test/CMakeLists.txt
index 5e11a9b34f..5d3c350494 100644
--- a/be/test/CMakeLists.txt
+++ b/be/test/CMakeLists.txt
@@ -338,7 +338,7 @@ set(VEC_TEST_FILES
     vec/exec/vgeneric_iterators_test.cpp
     vec/exec/vbroker_scan_node_test.cpp
     vec/exec/vbroker_scanner_test.cpp
-    vec/exec/vtablet_sink_test.cpp
+#    vec/exec/vtablet_sink_test.cpp
     vec/exprs/vexpr_test.cpp
     vec/function/function_array_element_test.cpp
     vec/function/function_array_index_test.cpp
@@ -356,9 +356,9 @@ set(VEC_TEST_FILES
     vec/function/function_geo_test.cpp
     vec/function/function_test_util.cpp
     vec/function/table_function_test.cpp
-    vec/olap/vdelta_writer_test.cpp
+#    vec/olap/vdelta_writer_test.cpp
     vec/runtime/vdata_stream_test.cpp
-    vec/runtime/vload_channel_mgr_test.cpp
+#    vec/runtime/vload_channel_mgr_test.cpp
 )
 
 add_executable(doris_be_test
diff --git a/be/test/vec/exec/vbroker_scan_node_test.cpp b/be/test/vec/exec/vbroker_scan_node_test.cpp
index 0846a26f09..d9d8c3e4b9 100644
--- a/be/test/vec/exec/vbroker_scan_node_test.cpp
+++ b/be/test/vec/exec/vbroker_scan_node_test.cpp
@@ -642,7 +642,3 @@ TEST_F(VBrokerScanNodeTest, where_binary_pre) {
 
 } // namespace vectorized
 } // namespace doris
-int main(int argc, char** argv) {
-    ::testing::InitGoogleTest(&argc, argv);
-    return RUN_ALL_TESTS();
-}
\ No newline at end of file
diff --git a/be/test/vec/exec/vbroker_scanner_test.cpp b/be/test/vec/exec/vbroker_scanner_test.cpp
index 6064abe655..b0d65c7e30 100644
--- a/be/test/vec/exec/vbroker_scanner_test.cpp
+++ b/be/test/vec/exec/vbroker_scanner_test.cpp
@@ -458,7 +458,3 @@ TEST_F(VBrokerScannerTest, normal5) {
 }
 } // namespace vectorized
 } // namespace doris
-int main(int argc, char** argv) {
-    ::testing::InitGoogleTest(&argc, argv);
-    return RUN_ALL_TESTS();
-}
\ No newline at end of file
diff --git a/be/test/vec/exec/vtablet_sink_test.cpp b/be/test/vec/exec/vtablet_sink_test.cpp
index 31d9ed84cb..5be72aad5e 100644
--- a/be/test/vec/exec/vtablet_sink_test.cpp
+++ b/be/test/vec/exec/vtablet_sink_test.cpp
@@ -828,8 +828,3 @@ TEST_F(VOlapTableSinkTest, decimal) {
 } // namespace stream_load
 } // namespace doris
 
-int main(int argc, char** argv) {
-    doris::CpuInfo::init();
-    ::testing::InitGoogleTest(&argc, argv);
-    return RUN_ALL_TESTS();
-}
\ No newline at end of file
diff --git a/be/test/vec/runtime/vload_channel_mgr_test.cpp b/be/test/vec/runtime/vload_channel_mgr_test.cpp
deleted file mode 100644
index 068188cf6a..0000000000
--- a/be/test/vec/runtime/vload_channel_mgr_test.cpp
+++ /dev/null
@@ -1,757 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#include "vec/runtime/vload_channel_mgr.h"
-
-#include <gtest/gtest.h>
-
-#include "common/object_pool.h"
-#include "gen_cpp/Descriptors_types.h"
-#include "gen_cpp/PaloInternalService_types.h"
-#include "gen_cpp/Types_types.h"
-#include "vec/olap/vdelta_writer.h"
-#include "olap/memtable_flush_executor.h"
-#include "olap/schema.h"
-#include "olap/storage_engine.h"
-#include "runtime/descriptor_helper.h"
-#include "runtime/descriptors.h"
-#include "runtime/exec_env.h"
-#include "runtime/mem_tracker.h"
-#include "runtime/primitive_type.h"
-#include "runtime/row_batch.h"
-#include "runtime/tuple_row.h"
-#include "util/thrift_util.h"
-
-namespace doris {
-
-std::unordered_map<int64_t, int> _k_tablet_recorder;
-OLAPStatus open_status;
-OLAPStatus add_status;
-OLAPStatus close_status;
-int64_t wait_lock_time_ns;
-
-// mock
-DeltaWriter::DeltaWriter(WriteRequest* req, const std::shared_ptr<MemTracker>& mem_tracker,
-                         StorageEngine* storage_engine)
-        : _req(*req) {}
-
-DeltaWriter::~DeltaWriter() {}
-
-OLAPStatus DeltaWriter::init() {
-    return OLAP_SUCCESS;
-}
-
-OLAPStatus DeltaWriter::open(WriteRequest* req, const std::shared_ptr<MemTracker>& mem_tracker,
-                             DeltaWriter** writer) {
-    if (open_status != OLAP_SUCCESS) {
-        return open_status;
-    }
-    *writer = new DeltaWriter(req, mem_tracker, nullptr);
-    return open_status;
-}
-
-OLAPStatus DeltaWriter::write(Tuple* tuple) {
-    if (_k_tablet_recorder.find(_req.tablet_id) == std::end(_k_tablet_recorder)) {
-        _k_tablet_recorder[_req.tablet_id] = 1;
-    } else {
-        _k_tablet_recorder[_req.tablet_id]++;
-    }
-    return add_status;
-}
-
-OLAPStatus DeltaWriter::write(const RowBatch* row_batch, const std::vector<int>& row_idxs) {
-    if (_k_tablet_recorder.find(_req.tablet_id) == std::end(_k_tablet_recorder)) {
-        _k_tablet_recorder[_req.tablet_id] = 0;
-    }
-    _k_tablet_recorder[_req.tablet_id] += row_idxs.size();
-    return add_status;
-}
-
-OLAPStatus DeltaWriter::close() {
-    return OLAP_SUCCESS;
-}
-
-OLAPStatus DeltaWriter::close_wait(google::protobuf::RepeatedPtrField<PTabletInfo>* tablet_vec, bool is_broken) {
-    return close_status;
-}
-
-OLAPStatus DeltaWriter::cancel() {
-    return OLAP_SUCCESS;
-}
-
-OLAPStatus DeltaWriter::flush_memtable_and_wait(bool need_wait) {
-    return OLAP_SUCCESS;
-}
-
-OLAPStatus DeltaWriter::wait_flush() {
-    return OLAP_SUCCESS;
-}
-
-int64_t DeltaWriter::partition_id() const {
-    return 1L;
-}
-int64_t DeltaWriter::mem_consumption() const {
-    return 1024L;
-}
-
-namespace vectorized {
-
-VDeltaWriter::VDeltaWriter(WriteRequest* req, const std::shared_ptr<MemTracker>& parent,
-                           StorageEngine* storage_engine)
-        : DeltaWriter(req, parent, storage_engine) {}
-
-VDeltaWriter::~VDeltaWriter() {}
-
-OLAPStatus VDeltaWriter::open(WriteRequest* req, const std::shared_ptr<MemTracker>& mem_tracker,
-                              VDeltaWriter** writer) {
-    if (open_status != OLAP_SUCCESS) {
-        return open_status;
-    }
-    *writer = new VDeltaWriter(req, mem_tracker, nullptr);
-    return open_status;
-}
-
-OLAPStatus VDeltaWriter::write(const Block* block, const std::vector<int>& row_idxs) {
-    if (_k_tablet_recorder.find(_req.tablet_id) == std::end(_k_tablet_recorder)) {
-        _k_tablet_recorder[_req.tablet_id] = 0;
-    }
-    _k_tablet_recorder[_req.tablet_id] += row_idxs.size();
-    return add_status;
-}
-
-}
-
-class VLoadChannelMgrTest : public testing::Test {
-public:
-    VLoadChannelMgrTest() {}
-    virtual ~VLoadChannelMgrTest() {}
-    void SetUp() override {
-        _k_tablet_recorder.clear();
-        open_status = OLAP_SUCCESS;
-        add_status = OLAP_SUCCESS;
-        close_status = OLAP_SUCCESS;
-        config::streaming_load_rpc_max_alive_time_sec = 120;
-    }
-
-private:
-
-    size_t uncompressed_size = 0;
-    size_t compressed_size = 0;
-};
-
-TDescriptorTable create_descriptor_table() {
-    TDescriptorTableBuilder dtb;
-    TTupleDescriptorBuilder tuple_builder;
-
-    tuple_builder.add_slot(
-            TSlotDescriptorBuilder().type(TYPE_INT).column_name("c1").column_pos(0).build());
-    tuple_builder.add_slot(
-            TSlotDescriptorBuilder().type(TYPE_BIGINT).column_name("c2").column_pos(1).build());
-    tuple_builder.build(&dtb);
-
-    return dtb.desc_tbl();
-}
-
-Schema create_schema() {
-    std::vector<TabletColumn> col_schemas;
-    //c1
-    TabletColumn c1(OLAP_FIELD_AGGREGATION_NONE, OLAP_FIELD_TYPE_INT, true);
-    c1.set_name("c1");
-
-    col_schemas.emplace_back(std::move(c1));
-    // c2: int
-    TabletColumn c2(OLAP_FIELD_AGGREGATION_NONE, OLAP_FIELD_TYPE_BIGINT, true);
-    c2.set_name("c2");
-    col_schemas.emplace_back(std::move(c2));
-
-    Schema schema(col_schemas, 2);
-    return schema;
-}
-
-void create_schema(DescriptorTbl* desc_tbl, POlapTableSchemaParam* pschema) {
-    pschema->set_db_id(1);
-    pschema->set_table_id(2);
-    pschema->set_version(0);
-
-    auto tuple_desc = desc_tbl->get_tuple_descriptor(0);
-    tuple_desc->to_protobuf(pschema->mutable_tuple_desc());
-    for (auto slot : tuple_desc->slots()) {
-        slot->to_protobuf(pschema->add_slot_descs());
-    }
-
-    // index schema
-    auto indexes = pschema->add_indexes();
-    indexes->set_id(4);
-    indexes->add_columns("c1");
-    indexes->add_columns("c2");
-    indexes->set_schema_hash(123);
-}
-
-static void create_block(Schema& schema, vectorized::Block& block)
-{
-    for (auto &column_desc : schema.columns()) {
-        ASSERT_TRUE(column_desc);
-        auto data_type = Schema::get_data_type_ptr(column_desc->type());
-        ASSERT_NE(data_type, nullptr);
-        if (column_desc->is_nullable()) {
-            data_type = std::make_shared<vectorized::DataTypeNullable>(std::move(data_type));
-        }
-        auto column = data_type->create_column();
-        vectorized::ColumnWithTypeAndName ctn(std::move(column), data_type, column_desc->name());
-        block.insert(ctn);
-    }
-}
-
-TEST_F(VLoadChannelMgrTest, normal) {
-    ExecEnv env;
-    vectorized::VLoadChannelMgr mgr;
-    mgr.init(-1);
-
-    auto schema = create_schema();
-    auto tdesc_tbl = create_descriptor_table();
-    ObjectPool obj_pool;
-    DescriptorTbl* desc_tbl = nullptr;
-    DescriptorTbl::create(&obj_pool, tdesc_tbl, &desc_tbl);
-    RowDescriptor row_desc(*desc_tbl, {0}, {false});
-    auto tracker = std::make_shared<MemTracker>();
-    PUniqueId load_id;
-    load_id.set_hi(2);
-    load_id.set_lo(3);
-    {
-        PTabletWriterOpenRequest request;
-        request.set_allocated_id(&load_id);
-        request.set_index_id(4);
-        request.set_txn_id(1);
-        create_schema(desc_tbl, request.mutable_schema());
-        for (int i = 0; i < 2; ++i) {
-            auto tablet = request.add_tablets();
-            tablet->set_partition_id(10 + i);
-            tablet->set_tablet_id(20 + i);
-        }
-        request.set_num_senders(1);
-        request.set_need_gen_rollup(false);
-        auto st = mgr.open(request);
-        if (!st.ok()) {
-            LOG(INFO) << "here we go!!!!";
-            LOG(INFO) << st.to_string() << std::endl;
-        }
-        request.release_id();
-        ASSERT_TRUE(st.ok());
-    }
-
-    // add a block
-    {
-        PTabletWriterAddBlockRequest request;
-        request.set_allocated_id(&load_id);
-        request.set_index_id(4);
-        request.set_sender_id(0);
-        request.set_eos(true);
-        request.set_packet_seq(0);
-
-        request.add_tablet_ids(20);
-        request.add_tablet_ids(21);
-        request.add_tablet_ids(20);
-
-        vectorized::Block block;
-        create_block(schema, block);
-
-        auto columns = block.mutate_columns();
-        auto& col1 = columns[0];
-        auto& col2 = columns[1];
-
-        // row1
-        {
-            int value = 987654;
-            int64_t big_value = 1234567899876;
-            col1->insert_data((const char*)&value, sizeof(value));
-            col2->insert_data((const char*)&big_value, sizeof(big_value));
-        }
-        // row2
-        {
-            int value = 12345678;
-            int64_t big_value = 9876567899876;
-            col1->insert_data((const char*)&value, sizeof(value));
-            col2->insert_data((const char*)&big_value, sizeof(big_value));
-        }
-        // row3
-        {
-            int value = 876545678;
-            int64_t big_value = 76543234567;
-            col1->insert_data((const char*)&value, sizeof(value));
-            col2->insert_data((const char*)&big_value, sizeof(big_value));
-        }
-
-        PTabletWriterAddBlockResult response;
-        std::string buffer;
-        block.serialize(request.mutable_block(),  &uncompressed_size, &compressed_size, &buffer);
-        auto st = mgr.add_block(request, &response);
-        if (!st.ok()) {
-            LOG(INFO) << "here we go!!!!";
-            LOG(INFO) << st.to_string() << std::endl;
-        }
-        request.release_id();
-        ASSERT_TRUE(st.ok());
-    }
-    // check content
-    ASSERT_EQ(_k_tablet_recorder[20], 2);
-    ASSERT_EQ(_k_tablet_recorder[21], 1);
-}
-
-TEST_F(VLoadChannelMgrTest, cancel) {
-    ExecEnv env;
-    vectorized::VLoadChannelMgr mgr;
-    mgr.init(-1);
-
-    auto tdesc_tbl = create_descriptor_table();
-    ObjectPool obj_pool;
-    DescriptorTbl* desc_tbl = nullptr;
-    DescriptorTbl::create(&obj_pool, tdesc_tbl, &desc_tbl);
-    RowDescriptor row_desc(*desc_tbl, {0}, {false});
-
-    PUniqueId load_id;
-    load_id.set_hi(2);
-    load_id.set_lo(3);
-    {
-        PTabletWriterOpenRequest request;
-        request.set_allocated_id(&load_id);
-        request.set_index_id(4);
-        request.set_txn_id(1);
-        create_schema(desc_tbl, request.mutable_schema());
-        for (int i = 0; i < 2; ++i) {
-            auto tablet = request.add_tablets();
-            tablet->set_partition_id(10 + i);
-            tablet->set_tablet_id(20 + i);
-        }
-        request.set_num_senders(1);
-        request.set_need_gen_rollup(false);
-        auto st = mgr.open(request);
-        request.release_id();
-        ASSERT_TRUE(st.ok());
-    }
-
-    // add a batch
-    {
-        PTabletWriterCancelRequest request;
-        request.set_allocated_id(&load_id);
-        request.set_index_id(4);
-        auto st = mgr.cancel(request);
-        request.release_id();
-        ASSERT_TRUE(st.ok());
-    }
-}
-
-TEST_F(VLoadChannelMgrTest, open_failed) {
-    ExecEnv env;
-    vectorized::VLoadChannelMgr mgr;
-    mgr.init(-1);
-
-    auto tdesc_tbl = create_descriptor_table();
-    ObjectPool obj_pool;
-    DescriptorTbl* desc_tbl = nullptr;
-    DescriptorTbl::create(&obj_pool, tdesc_tbl, &desc_tbl);
-    RowDescriptor row_desc(*desc_tbl, {0}, {false});
-
-    PUniqueId load_id;
-    load_id.set_hi(2);
-    load_id.set_lo(3);
-    {
-        PTabletWriterOpenRequest request;
-        request.set_allocated_id(&load_id);
-        request.set_index_id(4);
-        request.set_txn_id(1);
-        create_schema(desc_tbl, request.mutable_schema());
-        for (int i = 0; i < 2; ++i) {
-            auto tablet = request.add_tablets();
-            tablet->set_partition_id(10 + i);
-            tablet->set_tablet_id(20 + i);
-        }
-        request.set_num_senders(1);
-        request.set_need_gen_rollup(false);
-        open_status = OLAP_ERR_TABLE_NOT_FOUND;
-        auto st = mgr.open(request);
-        request.release_id();
-        ASSERT_FALSE(st.ok());
-    }
-}
-
-TEST_F(VLoadChannelMgrTest, add_failed) {
-    ExecEnv env;
-    vectorized::VLoadChannelMgr mgr;
-    mgr.init(-1);
-
-    auto schema = create_schema();
-    auto tdesc_tbl = create_descriptor_table();
-    ObjectPool obj_pool;
-    DescriptorTbl* desc_tbl = nullptr;
-    DescriptorTbl::create(&obj_pool, tdesc_tbl, &desc_tbl);
-    RowDescriptor row_desc(*desc_tbl, {0}, {false});
-    auto tracker = std::make_shared<MemTracker>();
-    PUniqueId load_id;
-    load_id.set_hi(2);
-    load_id.set_lo(3);
-    {
-        PTabletWriterOpenRequest request;
-        request.set_allocated_id(&load_id);
-        request.set_index_id(4);
-        request.set_txn_id(1);
-        create_schema(desc_tbl, request.mutable_schema());
-        for (int i = 0; i < 2; ++i) {
-            auto tablet = request.add_tablets();
-            tablet->set_partition_id(10 + i);
-            tablet->set_tablet_id(20 + i);
-        }
-        request.set_num_senders(1);
-        request.set_need_gen_rollup(false);
-        auto st = mgr.open(request);
-        request.release_id();
-        ASSERT_TRUE(st.ok());
-    }
-
-    // add a batch
-    {
-        PTabletWriterAddBlockRequest request;
-        request.set_allocated_id(&load_id);
-        request.set_index_id(4);
-        request.set_sender_id(0);
-        request.set_eos(true);
-        request.set_packet_seq(0);
-
-        request.add_tablet_ids(20);
-        request.add_tablet_ids(21);
-        request.add_tablet_ids(20);
-
-        vectorized::Block block;
-        create_block(schema, block);
-
-        auto columns = block.mutate_columns();
-        auto& col1 = columns[0];
-        auto& col2 = columns[1];
-
-        // row1
-        {
-            int value = 987654;
-            int64_t big_value = 1234567899876;
-            col1->insert_data((const char*)&value, sizeof(value));
-            col2->insert_data((const char*)&big_value, sizeof(big_value));
-        }
-        // row2
-        {
-            int value = 12345678;
-            int64_t big_value = 9876567899876;
-            col1->insert_data((const char*)&value, sizeof(value));
-            col2->insert_data((const char*)&big_value, sizeof(big_value));
-        }
-        // row3
-        {
-            int value = 876545678;
-            int64_t big_value = 76543234567;
-            col1->insert_data((const char*)&value, sizeof(value));
-            col2->insert_data((const char*)&big_value, sizeof(big_value));
-        }
-
-        std::string buffer;
-        block.serialize(request.mutable_block(),  &uncompressed_size, &compressed_size, &buffer);
-        // DeltaWriter's write will return -215
-        add_status = OLAP_ERR_TABLE_NOT_FOUND;
-        PTabletWriterAddBlockResult response;
-        auto st = mgr.add_block(request, &response);
-        request.release_id();
-        // st is still ok.
-        ASSERT_TRUE(st.ok());
-        ASSERT_EQ(2, response.tablet_errors().size());
-    }
-}
-
-
-TEST_F(VLoadChannelMgrTest, close_failed) {
-    ExecEnv env;
-    vectorized::VLoadChannelMgr mgr;
-    mgr.init(-1);
-
-    auto schema = create_schema();
-    auto tdesc_tbl = create_descriptor_table();
-    ObjectPool obj_pool;
-    DescriptorTbl* desc_tbl = nullptr;
-    DescriptorTbl::create(&obj_pool, tdesc_tbl, &desc_tbl);
-    RowDescriptor row_desc(*desc_tbl, {0}, {false});
-    auto tracker = std::make_shared<MemTracker>();
-    PUniqueId load_id;
-    load_id.set_hi(2);
-    load_id.set_lo(3);
-    {
-        PTabletWriterOpenRequest request;
-        request.set_allocated_id(&load_id);
-        request.set_index_id(4);
-        request.set_txn_id(1);
-        create_schema(desc_tbl, request.mutable_schema());
-        for (int i = 0; i < 2; ++i) {
-            auto tablet = request.add_tablets();
-            tablet->set_partition_id(10 + i);
-            tablet->set_tablet_id(20 + i);
-        }
-        request.set_num_senders(1);
-        request.set_need_gen_rollup(false);
-        auto st = mgr.open(request);
-        request.release_id();
-        ASSERT_TRUE(st.ok());
-    }
-
-    // add a batch
-    {
-        PTabletWriterAddBlockRequest request;
-        request.set_allocated_id(&load_id);
-        request.set_index_id(4);
-        request.set_sender_id(0);
-        request.set_eos(true);
-        request.set_packet_seq(0);
-
-        request.add_tablet_ids(20);
-        request.add_tablet_ids(21);
-        request.add_tablet_ids(20);
-
-        request.add_partition_ids(10);
-        request.add_partition_ids(11);
-
-        vectorized::Block block;
-        create_block(schema, block);
-
-        auto columns = block.mutate_columns();
-        auto& col1 = columns[0];
-        auto& col2 = columns[1];
-
-        // row1
-        {
-            int value = 987654;
-            int64_t big_value = 1234567899876;
-            col1->insert_data((const char*)&value, sizeof(value));
-            col2->insert_data((const char*)&big_value, sizeof(big_value));
-        }
-        // row2
-        {
-            int value = 12345678;
-            int64_t big_value = 9876567899876;
-            col1->insert_data((const char*)&value, sizeof(value));
-            col2->insert_data((const char*)&big_value, sizeof(big_value));
-        }
-        // row3
-        {
-            int value = 876545678;
-            int64_t big_value = 76543234567;
-            col1->insert_data((const char*)&value, sizeof(value));
-            col2->insert_data((const char*)&big_value, sizeof(big_value));
-        }
-
-        std::string buffer;
-        block.serialize(request.mutable_block(),  &uncompressed_size, &compressed_size, &buffer);
-        close_status = OLAP_ERR_TABLE_NOT_FOUND;
-        PTabletWriterAddBlockResult response;
-        auto st = mgr.add_block(request, &response);
-        request.release_id();
-        // even if delta close failed, the return status is still ok, but tablet_vec is empty
-        ASSERT_TRUE(st.ok());
-        ASSERT_TRUE(response.tablet_vec().empty());
-    }
-}
-
-TEST_F(VLoadChannelMgrTest, unknown_tablet) {
-    ExecEnv env;
-    vectorized::VLoadChannelMgr mgr;
-    mgr.init(-1);
-
-    auto schema = create_schema();
-    auto tdesc_tbl = create_descriptor_table();
-    ObjectPool obj_pool;
-    DescriptorTbl* desc_tbl = nullptr;
-    DescriptorTbl::create(&obj_pool, tdesc_tbl, &desc_tbl);
-    RowDescriptor row_desc(*desc_tbl, {0}, {false});
-    auto tracker = std::make_shared<MemTracker>();
-    PUniqueId load_id;
-    load_id.set_hi(2);
-    load_id.set_lo(3);
-    {
-        PTabletWriterOpenRequest request;
-        request.set_allocated_id(&load_id);
-        request.set_index_id(4);
-        request.set_txn_id(1);
-        create_schema(desc_tbl, request.mutable_schema());
-        for (int i = 0; i < 2; ++i) {
-            auto tablet = request.add_tablets();
-            tablet->set_partition_id(10 + i);
-            tablet->set_tablet_id(20 + i);
-        }
-        request.set_num_senders(1);
-        request.set_need_gen_rollup(false);
-        auto st = mgr.open(request);
-        request.release_id();
-        ASSERT_TRUE(st.ok());
-    }
-
-    // add a batch
-    {
-        PTabletWriterAddBlockRequest request;
-        request.set_allocated_id(&load_id);
-        request.set_index_id(4);
-        request.set_sender_id(0);
-        request.set_eos(true);
-        request.set_packet_seq(0);
-
-        request.add_tablet_ids(20);
-        request.add_tablet_ids(22);
-        request.add_tablet_ids(20);
-
-        vectorized::Block block;
-        create_block(schema, block);
-
-        auto columns = block.mutate_columns();
-        auto& col1 = columns[0];
-        auto& col2 = columns[1];
-
-        // row1
-        {
-            int value = 987654;
-            int64_t big_value = 1234567899876;
-            col1->insert_data((const char*)&value, sizeof(value));
-            col2->insert_data((const char*)&big_value, sizeof(big_value));
-        }
-        // row2
-        {
-            int value = 12345678;
-            int64_t big_value = 9876567899876;
-            col1->insert_data((const char*)&value, sizeof(value));
-            col2->insert_data((const char*)&big_value, sizeof(big_value));
-        }
-        // row3
-        {
-            int value = 876545678;
-            int64_t big_value = 76543234567;
-            col1->insert_data((const char*)&value, sizeof(value));
-            col2->insert_data((const char*)&big_value, sizeof(big_value));
-        }
-
-        std::string buffer;
-        block.serialize(request.mutable_block(),  &uncompressed_size, &compressed_size, &buffer);
-        PTabletWriterAddBlockResult response;
-        auto st = mgr.add_block(request, &response);
-        request.release_id();
-        ASSERT_FALSE(st.ok());
-    }
-}
-
-TEST_F(VLoadChannelMgrTest, duplicate_packet) {
-    ExecEnv env;
-    vectorized::VLoadChannelMgr mgr;
-    mgr.init(-1);
-
-    auto schema = create_schema();
-    auto tdesc_tbl = create_descriptor_table();
-    ObjectPool obj_pool;
-    DescriptorTbl* desc_tbl = nullptr;
-    DescriptorTbl::create(&obj_pool, tdesc_tbl, &desc_tbl);
-    RowDescriptor row_desc(*desc_tbl, {0}, {false});
-    auto tracker = std::make_shared<MemTracker>();
-    PUniqueId load_id;
-    load_id.set_hi(2);
-    load_id.set_lo(3);
-    {
-        PTabletWriterOpenRequest request;
-        request.set_allocated_id(&load_id);
-        request.set_index_id(4);
-        request.set_txn_id(1);
-        create_schema(desc_tbl, request.mutable_schema());
-        for (int i = 0; i < 2; ++i) {
-            auto tablet = request.add_tablets();
-            tablet->set_partition_id(10 + i);
-            tablet->set_tablet_id(20 + i);
-        }
-        request.set_num_senders(1);
-        request.set_need_gen_rollup(false);
-        auto st = mgr.open(request);
-        request.release_id();
-        ASSERT_TRUE(st.ok());
-    }
-
-    // add a batch
-    {
-        PTabletWriterAddBlockRequest request;
-        request.set_allocated_id(&load_id);
-        request.set_index_id(4);
-        request.set_sender_id(0);
-        request.set_eos(false);
-        request.set_packet_seq(0);
-
-        request.add_tablet_ids(20);
-        request.add_tablet_ids(21);
-        request.add_tablet_ids(20);
-
-        vectorized::Block block;
-        create_block(schema, block);
-
-        auto columns = block.mutate_columns();
-        auto& col1 = columns[0];
-        auto& col2 = columns[1];
-
-        // row1
-        {
-            int value = 987654;
-            int64_t big_value = 1234567899876;
-            col1->insert_data((const char*)&value, sizeof(value));
-            col2->insert_data((const char*)&big_value, sizeof(big_value));
-        }
-        // row2
-        {
-            int value = 12345678;
-            int64_t big_value = 9876567899876;
-            col1->insert_data((const char*)&value, sizeof(value));
-            col2->insert_data((const char*)&big_value, sizeof(big_value));
-        }
-        // row3
-        {
-            int value = 876545678;
-            int64_t big_value = 76543234567;
-            col1->insert_data((const char*)&value, sizeof(value));
-            col2->insert_data((const char*)&big_value, sizeof(big_value));
-        }
-
-        std::string buffer;
-        block.serialize(request.mutable_block(),  &uncompressed_size, &compressed_size, &buffer);
-        PTabletWriterAddBlockResult response;
-        auto st = mgr.add_block(request, &response);
-        ASSERT_TRUE(st.ok());
-        PTabletWriterAddBlockResult response2;
-        st = mgr.add_block(request, &response2);
-        request.release_id();
-        ASSERT_TRUE(st.ok());
-    }
-    // close
-    {
-        PTabletWriterAddBlockRequest request;
-        request.set_allocated_id(&load_id);
-        request.set_index_id(4);
-        request.set_sender_id(0);
-        request.set_eos(true);
-        request.set_packet_seq(0);
-        PTabletWriterAddBlockResult response;
-        auto st = mgr.add_block(request, &response);
-        request.release_id();
-        ASSERT_TRUE(st.ok());
-    }
-    // check content
-    ASSERT_EQ(_k_tablet_recorder[20], 2);
-    ASSERT_EQ(_k_tablet_recorder[21], 1);
-}
-
-} // namespace doris
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java b/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java
index 4862de3d06..c50a0d9926 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java
@@ -141,7 +141,9 @@ public class StreamLoadPlanner {
         scanNode.init(analyzer);
         descTable.computeStatAndMemLayout();
         scanNode.finalize(analyzer);
-        scanNode.convertToVectoriezd();
+        if (Config.enable_vectorized_load) {
+            scanNode.convertToVectoriezd();
+        }
 
         int timeout = taskInfo.getTimeout();
         if (taskInfo instanceof RoutineLoadJob) {
@@ -193,9 +195,8 @@ public class StreamLoadPlanner {
         queryOptions.setMemLimit(taskInfo.getMemLimit());
         // for stream load, we use exec_mem_limit to limit the memory usage of load channel.
         queryOptions.setLoadMemLimit(taskInfo.getMemLimit());
-        if (Config.enable_vectorized_load) {
-            queryOptions.setEnableVectorizedEngine(true);
-        }
+        queryOptions.setEnableVectorizedEngine(Config.enable_vectorized_load);
+
         params.setQueryOptions(queryOptions);
         TQueryGlobals queryGlobals = new TQueryGlobals();
         queryGlobals.setNowString(DATE_FORMAT.format(new Date()));
diff --git a/gensrc/proto/internal_service.proto b/gensrc/proto/internal_service.proto
index 80c41b2261..d8cfcd4839 100644
--- a/gensrc/proto/internal_service.proto
+++ b/gensrc/proto/internal_service.proto
@@ -75,6 +75,7 @@ message PTabletWriterOpenRequest {
     optional int64 load_channel_timeout_s = 9;
     optional bool is_high_priority = 10 [default = false];
     optional string sender_ip = 11 [default = ""];
+    optional bool is_vectorized = 12 [default = false];
 };
 
 message PTabletWriterOpenResult {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org