You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by mo...@apache.org on 2021/07/21 02:54:23 UTC

[incubator-doris] branch master updated: [Feature][Insert] Add transaction for the operation of insert #6244 (#6245)

This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 7592f52  [Feature][Insert] Add transaction for the operation of insert #6244 (#6245)
7592f52 is described below

commit 7592f52d2e20266e2755cee74242e90226355f57
Author: pengxiangyu <di...@163.com>
AuthorDate: Wed Jul 21 10:54:11 2021 +0800

    [Feature][Insert] Add transaction for the operation of insert #6244 (#6245)
    
    ## Proposed changes
    Add transaction support for the insert operation. When you want to insert a large number of rows, inserting inside a transaction costs far less time than non-transactional inserts (about 1/1000 of the time).
    ### Syntax
    
    ```
    BEGIN [ WITH LABEL label];
    INSERT INTO table_name ...
    [COMMIT | ROLLBACK];
    ```
    
    ### Example
    commit a transaction:
    ```
    begin;
    insert into Tbl values(11, 22, 33);
    commit;
    ```
    rollback a transaction:
    ```
    begin;
    insert into Tbl values(11, 22, 33);
    rollback;
    ```
    commit a transaction with label:
    ```
    begin with label test_label;
    insert into Tbl values(11, 22, 33);
    commit;
    ```
    
    ### Description
    ```
    begin:  begin a transaction, the next insert will execute in the transaction until commit/rollback;
    commit:  commit the transaction, the data in the transaction will be inserted into the table;
    rollback:  abort the transaction, nothing will be inserted into the table;
    ```
    ### The main implementation principle:
    ```
    1. begin a transaction in the session. next sql is executed in the transaction;
    2. the insert sql is parsed to get the database name and table name, which are used to select a BE and create a pipe to accept data;
    3. all inserted values are sent to the BE and written into the pipe;
    4. a thread will get the data from the pipe, then write them to disk;
    5. commit will complete this transaction and make these data visible;
    6. rollback will abort this transaction
    ```
    
    ### Some restrictions on the use of the transaction syntax.
    1. Only ```insert``` can be called in a transaction.
    2. If an error occurs, ```commit``` will not succeed; the transaction will ```rollback``` directly;
    3. By default, if some of the inserts in the transaction are invalid, ```commit``` will only insert the other, correct data into the table.
    4. If you need ```commit``` to fail when any insert in the transaction is invalid, execute ```set enable_insert_strict = true``` before ```begin```.
---
 be/src/common/utils.h                              |   4 +
 be/src/exec/CMakeLists.txt                         |   1 +
 be/src/exec/broker_scanner.cpp                     |  93 ++--
 be/src/exec/broker_scanner.h                       |   2 +
 .../plain_binary_line_reader.cpp}                  |  52 +--
 .../plain_binary_line_reader.h}                    |  26 +-
 be/src/runtime/fragment_mgr.cpp                    |  72 ++-
 be/src/runtime/fragment_mgr.h                      |   5 +
 be/src/runtime/stream_load/stream_load_context.h   |   2 +
 .../runtime/stream_load/stream_load_executor.cpp   |  27 +-
 be/src/runtime/stream_load/stream_load_executor.h  |   4 +
 be/src/runtime/stream_load/stream_load_pipe.h      |  36 +-
 be/src/service/internal_service.cpp                |  62 +++
 be/src/service/internal_service.h                  |  12 +
 .../sql-statements/Data Manipulation/BEGIN.md      |  92 ++++
 .../Data Manipulation/SHOW TRANSACTION.md          |  10 +-
 .../sql-statements/Data Manipulation/BEGIN.md      |  93 ++++
 .../Data Manipulation/SHOW TRANSACTION.md          |   9 +-
 fe/fe-core/src/main/cup/sql_parser.cup             |  35 +-
 .../main/java/org/apache/doris/analysis/Expr.java  |   3 +
 .../java/org/apache/doris/analysis/InsertStmt.java |  18 +-
 .../java/org/apache/doris/analysis/IntLiteral.java |  62 +--
 .../apache/doris/analysis/ShowTransactionStmt.java |  24 +-
 .../doris/analysis/TransactionBeginStmt.java       |  45 ++
 .../doris/analysis/TransactionCommitStmt.java      |  32 +-
 .../doris/analysis/TransactionRollbackStmt.java    |  32 +-
 .../org/apache/doris/analysis/TransactionStmt.java |  40 +-
 .../main/java/org/apache/doris/common/Config.java  |   7 +
 .../java/org/apache/doris/planner/Planner.java     |   2 +-
 .../java/org/apache/doris/qe/ConnectContext.java   |  48 ++
 .../java/org/apache/doris/qe/ConnectScheduler.java |   1 +
 .../apache/doris/qe/InsertStreamTxnExecutor.java   | 188 ++++++++
 .../org/apache/doris/qe/MasterTxnExecutor.java     | 141 ++++++
 .../java/org/apache/doris/qe/ShowExecutor.java     |   9 +-
 .../java/org/apache/doris/qe/StmtExecutor.java     | 498 ++++++++++++++++-----
 .../org/apache/doris/rpc/BackendServiceClient.java |  12 +
 .../org/apache/doris/rpc/BackendServiceProxy.java  |  43 ++
 .../apache/doris/service/FrontendServiceImpl.java  |  76 +++-
 .../doris/transaction/DatabaseTransactionMgr.java  |  14 +
 .../doris/transaction/GlobalTransactionMgr.java    |  53 +++
 .../apache/doris/transaction/TransactionEntry.java | 119 +++++
 .../apache/doris/transaction/TransactionState.java |  13 +-
 .../java/org/apache/doris/qe/StmtExecutorTest.java |   3 +-
 gensrc/proto/internal_service.proto                |  36 ++
 gensrc/thrift/FrontendService.thrift               |  21 +
 gensrc/thrift/PaloInternalService.thrift           |  13 +
 gensrc/thrift/PlanNodes.thrift                     |   1 +
 47 files changed, 1826 insertions(+), 365 deletions(-)

diff --git a/be/src/common/utils.h b/be/src/common/utils.h
index 4798483..50c6344 100644
--- a/be/src/common/utils.h
+++ b/be/src/common/utils.h
@@ -28,6 +28,7 @@ struct AuthInfo {
     std::string user_ip;
     // -1 as unset
     int64_t auth_code = -1;
+    std::string auth_code_uuid = "";
 };
 
 template <class T>
@@ -39,6 +40,9 @@ void set_request_auth(T* req, const AuthInfo& auth) {
         // so they have to be set.
         req->user = "";
         req->passwd = "";
+    } else if (auth.auth_code_uuid != "") {
+        req->__isset.auth_code_uuid = true;
+        req->auth_code_uuid = auth.auth_code_uuid;
     } else {
         req->user = auth.user;
         req->passwd = auth.passwd;
diff --git a/be/src/exec/CMakeLists.txt b/be/src/exec/CMakeLists.txt
index 27ef5ed..9e826cc 100644
--- a/be/src/exec/CMakeLists.txt
+++ b/be/src/exec/CMakeLists.txt
@@ -54,6 +54,7 @@ set(EXEC_FILES
     olap_common.cpp
     tablet_info.cpp
     tablet_sink.cpp
+    plain_binary_line_reader.cpp
     plain_text_line_reader.cpp
     csv_scan_node.cpp
     csv_scanner.cpp
diff --git a/be/src/exec/broker_scanner.cpp b/be/src/exec/broker_scanner.cpp
index b275647..058b3e8 100644
--- a/be/src/exec/broker_scanner.cpp
+++ b/be/src/exec/broker_scanner.cpp
@@ -25,6 +25,7 @@
 #include "exec/decompressor.h"
 #include "exec/exec_node.h"
 #include "exec/local_file_reader.h"
+#include "exec/plain_binary_line_reader.h"
 #include "exec/plain_text_line_reader.h"
 #include "exec/s3_reader.h"
 #include "exec/text_converter.h"
@@ -222,6 +223,7 @@ Status BrokerScanner::create_decompressor(TFileFormatType::type type) {
     switch (type) {
     case TFileFormatType::FORMAT_CSV_PLAIN:
     case TFileFormatType::FORMAT_JSON:
+    case TFileFormatType::FORMAT_PROTO:
         compress_type = CompressType::UNCOMPRESSED;
         break;
     case TFileFormatType::FORMAT_CSV_GZ:
@@ -279,6 +281,7 @@ Status BrokerScanner::open_line_reader() {
     // _decompressor may be NULL if this is not a compressed file
     RETURN_IF_ERROR(create_decompressor(range.format_type));
 
+    _file_format_type = range.format_type;
     // open line reader
     switch (range.format_type) {
     case TFileFormatType::FORMAT_CSV_PLAIN:
@@ -290,6 +293,9 @@ Status BrokerScanner::open_line_reader() {
         _cur_line_reader = new PlainTextLineReader(_profile, _cur_file_reader, _cur_decompressor,
                                                    size, _line_delimiter, _line_delimiter_length);
         break;
+    case TFileFormatType::FORMAT_PROTO:
+        _cur_line_reader = new PlainBinaryLineReader(_cur_file_reader);
+        break;
     default: {
         std::stringstream ss;
         ss << "Unknown format type, cannot init line reader, type=" << range.format_type;
@@ -326,40 +332,53 @@ void BrokerScanner::close() {
 
 void BrokerScanner::split_line(const Slice& line) {
     _split_values.clear();
-    const char* value = line.data;
-    size_t start = 0;  // point to the start pos of next col value.
-    size_t curpos = 0; // point to the start pos of separator matching sequence.
-    size_t p1 = 0;     // point to the current pos of separator matching sequence.
-
-    // Separator: AAAA
-    //
-    //   curpos
-    //     ▼
-    //     AAAA
-    //   1000AAAA2000AAAA
-    //   ▲   ▲
-    // Start │
-    //       p1
-
-    while (curpos < line.size) {
-        if (*(value + curpos + p1) != _value_separator[p1]) {
-            // Not match, move forward:
-            curpos += (p1 == 0 ? 1 : p1);
-            p1 = 0;
-        } else {
-            p1++;
-            if (p1 == _value_separator_length) {
-                // Match a separator
-                _split_values.emplace_back(value + start, curpos - start);
-                start = curpos + _value_separator_length;
-                curpos = start;
+    if (_file_format_type == TFileFormatType::FORMAT_PROTO) {
+        PDataRow** ptr = reinterpret_cast<PDataRow**>(line.data);
+        PDataRow* row = *ptr;
+        for (const PDataColumn& col : (row)->col()) {
+            int len = col.value().size();
+            uint8_t* buf = new uint8_t[len];
+            memcpy(buf, col.value().c_str(), len);
+            _split_values.emplace_back(buf, len);
+        }
+        delete row;
+        delete ptr;
+    } else {
+        const char *value = line.data;
+        size_t start = 0;  // point to the start pos of next col value.
+        size_t curpos = 0; // point to the start pos of separator matching sequence.
+        size_t p1 = 0;     // point to the current pos of separator matching sequence.
+
+        // Separator: AAAA
+        //
+        //   curpos
+        //     ▼
+        //     AAAA
+        //   1000AAAA2000AAAA
+        //   ▲   ▲
+        // Start │
+        //       p1
+
+        while (curpos < line.size) {
+            if (*(value + curpos + p1) != _value_separator[p1]) {
+                // Not match, move forward:
+                curpos += (p1 == 0 ? 1 : p1);
                 p1 = 0;
+            } else {
+                p1++;
+                if (p1 == _value_separator_length) {
+                    // Match a separator
+                    _split_values.emplace_back(value + start, curpos - start);
+                    start = curpos + _value_separator_length;
+                    curpos = start;
+                    p1 = 0;
+                }
             }
         }
-    }
 
-    CHECK(curpos == line.size) << curpos << " vs " << line.size;
-    _split_values.emplace_back(value + start, curpos - start);
+        CHECK(curpos == line.size) << curpos << " vs " << line.size;
+        _split_values.emplace_back(value + start, curpos - start);
+    }
 }
 
 void BrokerScanner::fill_fix_length_string(const Slice& value, MemPool* pool, char** new_value_p,
@@ -454,7 +473,7 @@ bool BrokerScanner::convert_one_row(const Slice& line, Tuple* tuple, MemPool* tu
 
 // Convert one row to this tuple
 bool BrokerScanner::line_to_src_tuple(const Slice& line) {
-    if (!validate_utf8(line.data, line.size)) {
+    if (_file_format_type != TFileFormatType::FORMAT_PROTO && !validate_utf8(line.data, line.size)) {
         std::stringstream error_msg;
         error_msg << "data is not encoded by UTF-8";
         _state->append_error_msg_to_file("Unable to display", error_msg.str());
@@ -474,7 +493,11 @@ bool BrokerScanner::line_to_src_tuple(const Slice& line) {
                   << _value_separator << "], "
                   << "line delimiter: [" << _line_delimiter << "], "
                   << "schema number: " << _src_slot_descs.size() << "; ";
-        _state->append_error_msg_to_file(std::string(line.data, line.size), error_msg.str());
+        if (_file_format_type == TFileFormatType::FORMAT_PROTO) {
+            _state->append_error_msg_to_file("", error_msg.str());
+        } else {
+            _state->append_error_msg_to_file(std::string(line.data, line.size), error_msg.str());
+        }
         _counter->num_rows_filtered++;
         return false;
     } else if (_split_values.size() + columns_from_path.size() > _src_slot_descs.size()) {
@@ -484,7 +507,11 @@ bool BrokerScanner::line_to_src_tuple(const Slice& line) {
                   << _value_separator << "], "
                   << "line delimiter: [" << _line_delimiter << "], "
                   << "schema number: " << _src_slot_descs.size() << "; ";
-        _state->append_error_msg_to_file(std::string(line.data, line.size), error_msg.str());
+        if (_file_format_type == TFileFormatType::FORMAT_PROTO) {
+            _state->append_error_msg_to_file("", error_msg.str());
+        } else {
+            _state->append_error_msg_to_file(std::string(line.data, line.size), error_msg.str());
+        }
         _counter->num_rows_filtered++;
         return false;
     }
diff --git a/be/src/exec/broker_scanner.h b/be/src/exec/broker_scanner.h
index 4578a77..1da3e11 100644
--- a/be/src/exec/broker_scanner.h
+++ b/be/src/exec/broker_scanner.h
@@ -27,6 +27,7 @@
 #include "exec/base_scanner.h"
 #include "gen_cpp/PlanNodes_types.h"
 #include "gen_cpp/Types_types.h"
+#include "gen_cpp/internal_service.pb.h"
 #include "runtime/mem_pool.h"
 #include "util/runtime_profile.h"
 #include "util/slice.h"
@@ -99,6 +100,7 @@ private:
 
     std::string _value_separator;
     std::string _line_delimiter;
+    TFileFormatType::type _file_format_type;
     int _value_separator_length;
     int _line_delimiter_length;
 
diff --git a/be/src/runtime/stream_load/stream_load_executor.h b/be/src/exec/plain_binary_line_reader.cpp
similarity index 55%
copy from be/src/runtime/stream_load/stream_load_executor.h
copy to be/src/exec/plain_binary_line_reader.cpp
index 127e710..e680e15 100644
--- a/be/src/runtime/stream_load/stream_load_executor.h
+++ b/be/src/exec/plain_binary_line_reader.cpp
@@ -15,34 +15,34 @@
 // specific language governing permissions and limitations
 // under the License.
 
-#pragma once
+#include "exec/plain_binary_line_reader.h"
 
-namespace doris {
-
-class ExecEnv;
-class StreamLoadContext;
-class Status;
-class TTxnCommitAttachment;
-
-class StreamLoadExecutor {
-public:
-    StreamLoadExecutor(ExecEnv* exec_env) : _exec_env(exec_env) {}
-
-    Status begin_txn(StreamLoadContext* ctx);
+#include "common/status.h"
+#include "exec/file_reader.h"
 
-    Status commit_txn(StreamLoadContext* ctx);
-
-    void rollback_txn(StreamLoadContext* ctx);
-
-    Status execute_plan_fragment(StreamLoadContext* ctx);
-
-private:
-    // collect the load statistics from context and set them to stat
-    // return true if stat is set, otherwise, return false
-    bool collect_load_stat(StreamLoadContext* ctx, TTxnCommitAttachment* attachment);
+namespace doris {
 
-private:
-    ExecEnv* _exec_env;
-};
+PlainBinaryLineReader::PlainBinaryLineReader(FileReader* file_reader)
+        : _file_reader(file_reader) {
+}
+
+PlainBinaryLineReader::~PlainBinaryLineReader() {
+    close();
+}
+
+void PlainBinaryLineReader::close() {
+}
+
+Status PlainBinaryLineReader::read_line(const uint8_t** ptr, size_t* size, bool* eof) {
+    std::unique_ptr<uint8_t[]> file_buf;
+    int64_t read_size = 0;
+    RETURN_IF_ERROR(_file_reader->read_one_message(&file_buf, &read_size));
+    *ptr = file_buf.release();
+    *size = read_size;
+    if (read_size == 0) {
+        *eof = true;
+    }
+    return Status::OK();
+}
 
 } // namespace doris
diff --git a/be/src/runtime/stream_load/stream_load_executor.h b/be/src/exec/plain_binary_line_reader.h
similarity index 58%
copy from be/src/runtime/stream_load/stream_load_executor.h
copy to be/src/exec/plain_binary_line_reader.h
index 127e710..9e1143b 100644
--- a/be/src/runtime/stream_load/stream_load_executor.h
+++ b/be/src/exec/plain_binary_line_reader.h
@@ -17,32 +17,24 @@
 
 #pragma once
 
+#include "exec/line_reader.h"
+
 namespace doris {
 
-class ExecEnv;
-class StreamLoadContext;
-class Status;
-class TTxnCommitAttachment;
+class FileReader;
 
-class StreamLoadExecutor {
+class PlainBinaryLineReader : public LineReader {
 public:
-    StreamLoadExecutor(ExecEnv* exec_env) : _exec_env(exec_env) {}
-
-    Status begin_txn(StreamLoadContext* ctx);
+    PlainBinaryLineReader(FileReader* file_reader);
 
-    Status commit_txn(StreamLoadContext* ctx);
+    virtual ~PlainBinaryLineReader();
 
-    void rollback_txn(StreamLoadContext* ctx);
+    virtual Status read_line(const uint8_t** ptr, size_t* size, bool* eof) override;
 
-    Status execute_plan_fragment(StreamLoadContext* ctx);
-
-private:
-    // collect the load statistics from context and set them to stat
-    // return true if stat is set, otherwise, return false
-    bool collect_load_stat(StreamLoadContext* ctx, TTxnCommitAttachment* attachment);
+    virtual void close() override;
 
 private:
-    ExecEnv* _exec_env;
+    FileReader* _file_reader;
 };
 
 } // namespace doris
diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp
index a7832f8..415e188 100644
--- a/be/src/runtime/fragment_mgr.cpp
+++ b/be/src/runtime/fragment_mgr.cpp
@@ -40,6 +40,9 @@
 #include "runtime/exec_env.h"
 #include "runtime/plan_fragment_executor.h"
 #include "runtime/runtime_filter_mgr.h"
+#include "runtime/stream_load/stream_load_pipe.h"
+#include "runtime/stream_load/load_stream_mgr.h"
+#include "runtime/stream_load/stream_load_context.h"
 #include "service/backend_options.h"
 #include "util/debug_util.h"
 #include "util/doris_metrics.h"
@@ -136,6 +139,9 @@ public:
 
     std::shared_ptr<QueryFragmentsCtx> get_fragments_ctx() { return _fragments_ctx; }
 
+    void set_pipe(std::shared_ptr<StreamLoadPipe> pipe) { _pipe = pipe; }
+    std::shared_ptr<StreamLoadPipe> get_pipe() const { return _pipe; }
+
 private:
     void coordinator_callback(const Status& status, RuntimeProfile* profile, bool done);
 
@@ -166,6 +172,8 @@ private:
     std::shared_ptr<QueryFragmentsCtx> _fragments_ctx;
 
     std::shared_ptr<RuntimeFilterMergeControllerEntity> _merge_controller_handler;
+    // The pipe for data transfering, such as insert.
+    std::shared_ptr<StreamLoadPipe> _pipe;
 };
 
 FragmentExecState::FragmentExecState(const TUniqueId& query_id,
@@ -239,6 +247,9 @@ Status FragmentExecState::cancel_before_execute() {
     // set status as 'abort', cuz cancel() won't effect the status arg of DataSink::close().
     _executor.set_abort();
     _executor.cancel();
+    if (_pipe != nullptr) {
+        _pipe->cancel();
+    }
     return Status::OK();
 }
 
@@ -249,6 +260,9 @@ Status FragmentExecState::cancel(const PPlanFragmentCancelReason& reason) {
         _executor.set_is_report_on_cancel(false);
     }
     _executor.cancel();
+    if (_pipe != nullptr) {
+        _pipe->cancel();
+    }
     return Status::OK();
 }
 
@@ -457,7 +471,63 @@ void FragmentMgr::_exec_actual(std::shared_ptr<FragmentExecState> exec_state, Fi
 }
 
 Status FragmentMgr::exec_plan_fragment(const TExecPlanFragmentParams& params) {
-    return exec_plan_fragment(params, std::bind<void>(&empty_function, std::placeholders::_1));
+    if (params.txn_conf.need_txn) {
+        StreamLoadContext* stream_load_cxt = new StreamLoadContext(_exec_env);
+        stream_load_cxt->db = params.txn_conf.db;
+        stream_load_cxt->db_id = params.txn_conf.db_id;
+        stream_load_cxt->table = params.txn_conf.tbl;
+        stream_load_cxt->txn_id = params.txn_conf.txn_id;
+        stream_load_cxt->id = UniqueId(params.params.query_id);
+        stream_load_cxt->put_result.params = params;
+        stream_load_cxt->use_streaming = true;
+        stream_load_cxt->load_type = TLoadType::MANUL_LOAD;
+        stream_load_cxt->load_src_type = TLoadSourceType::RAW;
+        stream_load_cxt->label = params.import_label;
+        stream_load_cxt->format = TFileFormatType::FORMAT_CSV_PLAIN;
+        stream_load_cxt->timeout_second = 3600;
+        stream_load_cxt->auth.auth_code_uuid = params.txn_conf.auth_code_uuid;
+        stream_load_cxt->need_commit_self = true;
+        stream_load_cxt->need_rollback = true;
+        // total_length == -1 means read one message from pipe in once time, don't care the length.
+        auto pipe = std::make_shared<StreamLoadPipe>(
+                1024 * 1024 /* max_buffered_bytes */,
+                64 * 1024 /* min_chunk_size */,
+                -1 /* total_length */,
+                true /* use_proto */);
+        stream_load_cxt->body_sink = pipe;
+        stream_load_cxt->max_filter_ratio = params.txn_conf.max_filter_ratio;
+
+        RETURN_IF_ERROR(_exec_env->load_stream_mgr()->put(stream_load_cxt->id, pipe));
+
+        RETURN_IF_ERROR(
+                _exec_env->stream_load_executor()->execute_plan_fragment(stream_load_cxt, pipe));
+        set_pipe(params.params.fragment_instance_id, pipe);
+        return Status::OK();
+    } else {
+        return exec_plan_fragment(params, std::bind<void>(&empty_function, std::placeholders::_1));
+    }
+}
+
+void FragmentMgr::set_pipe(const TUniqueId& fragment_instance_id, std::shared_ptr<StreamLoadPipe> pipe) {
+    {
+        std::lock_guard<std::mutex> lock(_lock);
+        auto iter = _fragment_map.find(fragment_instance_id);
+        if (iter != _fragment_map.end()) {
+            _fragment_map[fragment_instance_id]->set_pipe(pipe);
+        }
+    }
+}
+
+std::shared_ptr<StreamLoadPipe> FragmentMgr::get_pipe(const TUniqueId& fragment_instance_id) {
+    {
+        std::lock_guard<std::mutex> lock(_lock);
+        auto iter = _fragment_map.find(fragment_instance_id);
+        if (iter != _fragment_map.end()) {
+            return _fragment_map[fragment_instance_id]->get_pipe();
+        } else {
+            return nullptr;
+        }
+    }
 }
 
 Status FragmentMgr::exec_plan_fragment(const TExecPlanFragmentParams& params, FinishCallback cb) {
diff --git a/be/src/runtime/fragment_mgr.h b/be/src/runtime/fragment_mgr.h
index a6329cb..2ea0cba 100644
--- a/be/src/runtime/fragment_mgr.h
+++ b/be/src/runtime/fragment_mgr.h
@@ -48,6 +48,7 @@ class TExecPlanFragmentParams;
 class TExecPlanFragmentParamsList;
 class TUniqueId;
 class RuntimeFilterMergeController;
+class StreamLoadPipe;
 
 std::string to_load_error_http_path(const std::string& file_name);
 
@@ -88,6 +89,10 @@ public:
 
     Status merge_filter(const PMergeFilterRequest* request, const char* attach_data);
 
+    void set_pipe(const TUniqueId& fragment_instance_id, std::shared_ptr<StreamLoadPipe> pipe);
+
+    std::shared_ptr<StreamLoadPipe> get_pipe(const TUniqueId& fragment_instance_id);
+
 private:
     void _exec_actual(std::shared_ptr<FragmentExecState> exec_state, FinishCallback cb);
 
diff --git a/be/src/runtime/stream_load/stream_load_context.h b/be/src/runtime/stream_load/stream_load_context.h
index aeca593..f6e7b4a 100644
--- a/be/src/runtime/stream_load/stream_load_context.h
+++ b/be/src/runtime/stream_load/stream_load_context.h
@@ -126,6 +126,7 @@ public:
     UniqueId id;
 
     std::string db;
+    int64_t db_id = -1;
     std::string table;
     std::string label;
     // optional
@@ -193,6 +194,7 @@ public:
     // to identified a specified data consumer.
     int64_t consumer_id;
 
+    bool need_commit_self = false;
 public:
     ExecEnv* exec_env() { return _exec_env; }
 
diff --git a/be/src/runtime/stream_load/stream_load_executor.cpp b/be/src/runtime/stream_load/stream_load_executor.cpp
index 3c9a237..781792b 100644
--- a/be/src/runtime/stream_load/stream_load_executor.cpp
+++ b/be/src/runtime/stream_load/stream_load_executor.cpp
@@ -42,6 +42,10 @@ Status k_stream_load_plan_status;
 #endif
 
 Status StreamLoadExecutor::execute_plan_fragment(StreamLoadContext* ctx) {
+    return execute_plan_fragment(ctx, nullptr);
+}
+
+Status StreamLoadExecutor::execute_plan_fragment(StreamLoadContext* ctx, std::shared_ptr<StreamLoadPipe> pipe) {
     DorisMetrics::instance()->txn_exec_plan_total->increment(1);
 // submit this params
 #ifndef BE_TEST
@@ -50,7 +54,7 @@ Status StreamLoadExecutor::execute_plan_fragment(StreamLoadContext* ctx) {
     LOG(INFO) << "begin to execute job. label=" << ctx->label << ", txn_id=" << ctx->txn_id
               << ", query_id=" << print_id(ctx->put_result.params.params.query_id);
     auto st = _exec_env->fragment_mgr()->exec_plan_fragment(
-            ctx->put_result.params, [ctx](PlanFragmentExecutor* executor) {
+            ctx->put_result.params, [ctx, pipe, this](PlanFragmentExecutor* executor) {
                 ctx->commit_infos = std::move(executor->runtime_state()->tablet_commit_infos());
                 Status status = executor->status();
                 if (status.ok()) {
@@ -105,6 +109,15 @@ Status StreamLoadExecutor::execute_plan_fragment(StreamLoadContext* ctx) {
                 ctx->write_data_cost_nanos = MonotonicNanos() - ctx->start_write_data_nanos;
                 ctx->promise.set_value(status);
 
+                if (ctx->need_commit_self && pipe != nullptr) {
+                    if (pipe->closed() || !status.ok()) {
+                        ctx->status = status;
+                        this->rollback_txn(ctx);
+                    } else {
+                        this->commit_txn(ctx);
+                    }
+                }
+
                 if (ctx->unref()) {
                     delete ctx;
                 }
@@ -119,7 +132,6 @@ Status StreamLoadExecutor::execute_plan_fragment(StreamLoadContext* ctx) {
 #endif
     return Status::OK();
 }
-
 Status StreamLoadExecutor::begin_txn(StreamLoadContext* ctx) {
     DorisMetrics::instance()->txn_begin_request_total->increment(1);
 
@@ -156,6 +168,9 @@ Status StreamLoadExecutor::begin_txn(StreamLoadContext* ctx) {
         return status;
     }
     ctx->txn_id = result.txnId;
+    if (result.__isset.db_id) {
+        ctx->db_id = result.db_id;
+    }
     ctx->need_rollback = true;
 
     return Status::OK();
@@ -167,6 +182,10 @@ Status StreamLoadExecutor::commit_txn(StreamLoadContext* ctx) {
     TLoadTxnCommitRequest request;
     set_request_auth(&request, ctx->auth);
     request.db = ctx->db;
+    if (ctx->db_id > 0) {
+        request.db_id = ctx->db_id;
+        request.__isset.db_id = true;
+    }
     request.tbl = ctx->table;
     request.txnId = ctx->txn_id;
     request.sync = true;
@@ -217,6 +236,10 @@ void StreamLoadExecutor::rollback_txn(StreamLoadContext* ctx) {
     TLoadTxnRollbackRequest request;
     set_request_auth(&request, ctx->auth);
     request.db = ctx->db;
+    if (ctx->db_id > 0) {
+        request.db_id = ctx->db_id;
+        request.__isset.db_id = true;
+    }
     request.tbl = ctx->table;
     request.txnId = ctx->txn_id;
     request.__set_reason(ctx->status.get_error_msg());
diff --git a/be/src/runtime/stream_load/stream_load_executor.h b/be/src/runtime/stream_load/stream_load_executor.h
index 127e710..4ba791c 100644
--- a/be/src/runtime/stream_load/stream_load_executor.h
+++ b/be/src/runtime/stream_load/stream_load_executor.h
@@ -17,12 +17,15 @@
 
 #pragma once
 
+#include <memory>
+
 namespace doris {
 
 class ExecEnv;
 class StreamLoadContext;
 class Status;
 class TTxnCommitAttachment;
+class StreamLoadPipe;
 
 class StreamLoadExecutor {
 public:
@@ -36,6 +39,7 @@ public:
 
     Status execute_plan_fragment(StreamLoadContext* ctx);
 
+    Status execute_plan_fragment(StreamLoadContext* ctx, std::shared_ptr<StreamLoadPipe> pipe);
 private:
     // collect the load statistics from context and set them to stat
     // return true if stat is set, otherwise, return false
diff --git a/be/src/runtime/stream_load/stream_load_pipe.h b/be/src/runtime/stream_load/stream_load_pipe.h
index 95b0adf..3912a0f 100644
--- a/be/src/runtime/stream_load/stream_load_pipe.h
+++ b/be/src/runtime/stream_load/stream_load_pipe.h
@@ -25,6 +25,7 @@
 #include "runtime/message_body_sink.h"
 #include "util/bit_util.h"
 #include "util/byte_buffer.h"
+#include "gen_cpp/internal_service.pb.h"
 
 namespace doris {
 
@@ -33,22 +34,24 @@ namespace doris {
 class StreamLoadPipe : public MessageBodySink, public FileReader {
 public:
     StreamLoadPipe(size_t max_buffered_bytes = 1024 * 1024, size_t min_chunk_size = 64 * 1024,
-                   int64_t total_length = -1)
+                   int64_t total_length = -1, bool use_proto = false)
             : _buffered_bytes(0),
+              _proto_buffered_bytes(0),
               _max_buffered_bytes(max_buffered_bytes),
               _min_chunk_size(min_chunk_size),
               _total_length(total_length),
+              _use_proto(use_proto),
               _finished(false),
               _cancelled(false) {}
     virtual ~StreamLoadPipe() {}
 
     Status open() override { return Status::OK(); }
 
-    Status append_and_flush(const char* data, size_t size) {
+    Status append_and_flush(const char* data, size_t size, size_t proto_byte_size = 0) {
         ByteBufferPtr buf = ByteBuffer::allocate(BitUtil::RoundUpToPowerOfTwo(size + 1));
         buf->put_bytes(data, size);
         buf->flip();
-        return _append(buf);
+        return _append(buf, proto_byte_size);
     }
 
     Status append(const char* data, size_t size) override {
@@ -210,23 +213,38 @@ private:
 
         _buf_queue.pop_front();
         _buffered_bytes -= buf->limit;
+        if (_use_proto) {
+            PDataRow** ptr = reinterpret_cast<PDataRow**>(data->get());
+            _proto_buffered_bytes -= (sizeof(PDataRow*) + (*ptr)->GetCachedSize());
+        }
         _put_cond.notify_one();
         return Status::OK();
     }
 
-    Status _append(const ByteBufferPtr& buf) {
+    Status _append(const ByteBufferPtr& buf, size_t proto_byte_size = 0) {
         {
             std::unique_lock<std::mutex> l(_lock);
             // if _buf_queue is empty, we append this buf without size check
-            while (!_cancelled && !_buf_queue.empty() &&
-                   _buffered_bytes + buf->remaining() > _max_buffered_bytes) {
-                _put_cond.wait(l);
+            if (_use_proto) {
+                while (!_cancelled && !_buf_queue.empty() &&
+                       (_proto_buffered_bytes + proto_byte_size > _max_buffered_bytes)) {
+                    _put_cond.wait(l);
+                }
+            } else {
+                while (!_cancelled && !_buf_queue.empty() &&
+                       _buffered_bytes + buf->remaining() > _max_buffered_bytes) {
+                    _put_cond.wait(l);
+                }
             }
             if (_cancelled) {
                 return Status::InternalError("cancelled");
             }
             _buf_queue.push_back(buf);
-            _buffered_bytes += buf->remaining();
+            if (_use_proto) {
+                _proto_buffered_bytes += proto_byte_size;
+            } else {
+                _buffered_bytes += buf->remaining();
+            }
         }
         _get_cond.notify_one();
         return Status::OK();
@@ -235,6 +253,7 @@ private:
     // Blocking queue
     std::mutex _lock;
     size_t _buffered_bytes;
+    size_t _proto_buffered_bytes;
     size_t _max_buffered_bytes;
     size_t _min_chunk_size;
     // The total amount of data expected to be read.
@@ -245,6 +264,7 @@ private:
     // and the length is unknown.
     // size_t is unsigned, so use int64_t
     int64_t _total_length = -1;
+    bool _use_proto = false;
     std::deque<ByteBufferPtr> _buf_queue;
     std::condition_variable _put_cond;
     std::condition_variable _get_cond;
diff --git a/be/src/service/internal_service.cpp b/be/src/service/internal_service.cpp
index a24f356..fbe7cbb 100644
--- a/be/src/service/internal_service.cpp
+++ b/be/src/service/internal_service.cpp
@@ -289,6 +289,67 @@ void PInternalServiceImpl<T>::apply_filter(::google::protobuf::RpcController* co
 }
 
 template<typename T>
+void PInternalServiceImpl<T>::send_data(google::protobuf::RpcController* controller,
+                                        const PSendDataRequest* request,
+                                        PSendDataResult* response,
+                                        google::protobuf::Closure* done) {
+    brpc::ClosureGuard closure_guard(done);
+    TUniqueId fragment_instance_id;
+    fragment_instance_id.hi = request->fragment_instance_id().hi();
+    fragment_instance_id.lo = request->fragment_instance_id().lo();
+    auto pipe = _exec_env->fragment_mgr()->get_pipe(fragment_instance_id);
+    if (pipe == nullptr) {
+        response->mutable_status()->set_status_code(1);
+        response->mutable_status()->add_error_msgs("pipe is null");
+    } else {
+        for (int i = 0; i < request->data_size(); ++i) {
+            PDataRow* row = new PDataRow();
+            row->CopyFrom(request->data(i));
+            pipe->append_and_flush(reinterpret_cast<char*>(&row), sizeof(row), sizeof(row) + row->ByteSize());
+        }
+        response->mutable_status()->set_status_code(0);
+    }
+}
+
+template<typename T>
+void PInternalServiceImpl<T>::commit(google::protobuf::RpcController* controller,
+                                     const PCommitRequest* request,
+                                     PCommitResult* response,
+                                     google::protobuf::Closure* done) {
+    brpc::ClosureGuard closure_guard(done);
+    TUniqueId fragment_instance_id;
+    fragment_instance_id.hi = request->fragment_instance_id().hi();
+    fragment_instance_id.lo = request->fragment_instance_id().lo();
+    auto pipe = _exec_env->fragment_mgr()->get_pipe(fragment_instance_id);
+    if (pipe == nullptr) {
+        response->mutable_status()->set_status_code(1);
+        response->mutable_status()->add_error_msgs("pipe is null");
+    } else {
+        pipe->finish();
+        response->mutable_status()->set_status_code(0);
+    }
+}
+
+template<typename T>
+void PInternalServiceImpl<T>::rollback(google::protobuf::RpcController* controller,
+                                       const PRollbackRequest* request,
+                                       PRollbackResult* response,
+                                       google::protobuf::Closure* done) {
+    brpc::ClosureGuard closure_guard(done);
+    TUniqueId fragment_instance_id;
+    fragment_instance_id.hi = request->fragment_instance_id().hi();
+    fragment_instance_id.lo = request->fragment_instance_id().lo();
+    auto pipe = _exec_env->fragment_mgr()->get_pipe(fragment_instance_id);
+    if (pipe == nullptr) {
+        response->mutable_status()->set_status_code(1);
+        response->mutable_status()->add_error_msgs("pipe is null");
+    } else {
+        pipe->cancel();
+        response->mutable_status()->set_status_code(0);
+    }
+}
+
+template<typename T>
 void PInternalServiceImpl<T>::fold_constant_expr(
     google::protobuf::RpcController* cntl_base,
     const PConstantExprRequest* request,
@@ -324,6 +385,7 @@ Status PInternalServiceImpl<T>::_fold_constant_expr(const std::string& ser_reque
     FoldConstantMgr mgr(_exec_env);
     return mgr.fold_constant_expr(t_request, response);
 }
+
 template class PInternalServiceImpl<PBackendService>;
 template class PInternalServiceImpl<palo::PInternalService>;
 
diff --git a/be/src/service/internal_service.h b/be/src/service/internal_service.h
index b51680d..9a4f240 100644
--- a/be/src/service/internal_service.h
+++ b/be/src/service/internal_service.h
@@ -93,6 +93,18 @@ public:
                       ::doris::PPublishFilterResponse* response,
                       ::google::protobuf::Closure* done) override;
 
+    void send_data(google::protobuf::RpcController* controller,
+                   const PSendDataRequest* request,
+                   PSendDataResult* response,
+                   google::protobuf::Closure* done);
+    void commit(google::protobuf::RpcController* controller,
+                const PCommitRequest* request,
+                PCommitResult* response,
+                google::protobuf::Closure* done);
+    void rollback(google::protobuf::RpcController* controller,
+                  const PRollbackRequest* request,
+                  PRollbackResult* response,
+                  google::protobuf::Closure* done);
     void fold_constant_expr(google::protobuf::RpcController* controller,
                             const PConstantExprRequest* request,
                             PConstantExprResult* response,
diff --git a/docs/en/sql-reference/sql-statements/Data Manipulation/BEGIN.md b/docs/en/sql-reference/sql-statements/Data Manipulation/BEGIN.md
new file mode 100644
index 0000000..069eeeb
--- /dev/null
+++ b/docs/en/sql-reference/sql-statements/Data Manipulation/BEGIN.md	
@@ -0,0 +1,92 @@
+---
+{
+    "title": "BEGIN",
+    "language": "en"
+}
+---
+
+<!-- 
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+# BEGIN, COMMIT, ROLLBACK
+## Description
+### Syntax
+
+```
+BEGIN;
+INSERT INTO table_name ...
+COMMIT;
+```
+```
+BEGIN [ WITH LABEL label];
+INSERT INTO table_name ...
+ROLLBACK;
+```
+### Parameters
+
+> label: an optional label (a string) used to identify this transaction. If it is omitted, a label is generated automatically.
+
+### Note
+
+A transaction can only be used with insert, not with update or delete. You can check the state of the transaction with `SHOW TRANSACTION WHERE LABEL = 'label'`.
+
+## example
+
+1. Begin a transaction without a label, then commit it
+
+```
+BEGIN;
+INSERT INTO test VALUES (1, 2);
+INSERT INTO test (c1, c2) VALUES (1, 2);
+INSERT INTO test (c1, c2) VALUES (1, DEFAULT);
+INSERT INTO test (c1) VALUES (1);
+COMMIT;
+```
+
+All the data in the sql between `begin` and `commit` will be inserted into the table. 
+
+2. Begin a transaction without a label, then abort it
+
+```
+BEGIN;
+INSERT INTO test VALUES (1, 2);
+INSERT INTO test (c1, c2) VALUES (1, 2);
+INSERT INTO test (c1, c2) VALUES (1, DEFAULT);
+INSERT INTO test (c1) VALUES (1);
+ROLLBACK;
+```
+
+All the data in the statements between `begin` and `rollback` will be discarded; nothing will be inserted into the table.
+
+3. Begin a transaction with a label, then commit it
+
+```
+BEGIN WITH LABEL test_label1;
+INSERT INTO test VALUES (1, 2);
+INSERT INTO test (c1, c2) VALUES (1, 2);
+INSERT INTO test (c1, c2) VALUES (1, DEFAULT);
+INSERT INTO test (c1) VALUES (1);
+COMMIT;
+```
+
+All the data in the sql between `begin` and `commit` will be inserted into the table.
+The label of `test_label1` will be set to mark this transaction. You can check this transaction by `SHOW TRANSACTION WHERE LABEL = 'test_label1'`.
+
+## keyword
+BEGIN, COMMIT, ROLLBACK
diff --git a/docs/en/sql-reference/sql-statements/Data Manipulation/SHOW TRANSACTION.md b/docs/en/sql-reference/sql-statements/Data Manipulation/SHOW TRANSACTION.md
index 0639f15..310636d 100644
--- a/docs/en/sql-reference/sql-statements/Data Manipulation/SHOW TRANSACTION.md	
+++ b/docs/en/sql-reference/sql-statements/Data Manipulation/SHOW TRANSACTION.md	
@@ -27,14 +27,16 @@ under the License.
 # SHOW TRANSACTION
 ## description
 
-This syntax is used to view transaction details for the specified transaction id.
+This syntax is used to view transaction details for the specified transaction id or label name.
 
 grammar:
 
 ```
 SHOW TRANSACTION
 [FROM db_name]
-WHERE id = transaction_id;
+WHERE
+[id = transaction_id]
+[label = label_name];
 ```
 
 Example return result:
@@ -81,6 +83,10 @@ ErrorReplicasCount: 0
 
     SHOW TRANSACTION FROM db WHERE ID = 4005;
 
+3. View the transaction with label `label_name`:
+
+    SHOW TRANSACTION WHERE LABEL = 'label_name';
+
 ## keyword
 
     SHOW, TRANSACTION
\ No newline at end of file
diff --git a/docs/zh-CN/sql-reference/sql-statements/Data Manipulation/BEGIN.md b/docs/zh-CN/sql-reference/sql-statements/Data Manipulation/BEGIN.md
new file mode 100644
index 0000000..a286a3f
--- /dev/null
+++ b/docs/zh-CN/sql-reference/sql-statements/Data Manipulation/BEGIN.md	
@@ -0,0 +1,93 @@
+---
+{
+    "title": "BEGIN",
+    "language": "zh-CN"
+}
+---
+
+<!-- 
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+# BEGIN, COMMIT, ROLLBACK
+## Description
+### Syntax
+
+```
+BEGIN;
+INSERT INTO table_name ...
+COMMIT;
+```
+
+```
+BEGIN [ WITH LABEL label];
+INSERT INTO table_name ...
+ROLLBACK;
+```
+### Parameters
+
+> label: 用于指定当前事务的标签名。
+
+### Note
+
+事务只能对insert使用,而不能对update和delete使用,当指定标签时,可通过以下命令检查事务的运行状态: `SHOW TRANSACTION WHERE LABEL = 'label'`
+
+## example
+
+1. 开启一个事务,不指定标签,执行insert后提交。
+
+```
+BEGIN;
+INSERT INTO test VALUES (1, 2);
+INSERT INTO test (c1, c2) VALUES (1, 2);
+INSERT INTO test (c1, c2) VALUES (1, DEFAULT);
+INSERT INTO test (c1) VALUES (1);
+COMMIT;
+```
+
+所有在`begin`和`commit`之间的数据会被插入到test表中。 
+
+2. 开启一个事务,不指定标签,执行insert后,回滚。
+
+```
+BEGIN;
+INSERT INTO test VALUES (1, 2);
+INSERT INTO test (c1, c2) VALUES (1, 2);
+INSERT INTO test (c1, c2) VALUES (1, DEFAULT);
+INSERT INTO test (c1) VALUES (1);
+ROLLBACK;
+```
+
+所有在`begin`和`rollback`之间的数据会取消,没有任何数据插入到test表中。
+
+3. 开启一个事务,指定标签为test_label1,执行insert后提交。
+
+```
+BEGIN WITH LABEL test_label1;
+INSERT INTO test VALUES (1, 2);
+INSERT INTO test (c1, c2) VALUES (1, 2);
+INSERT INTO test (c1, c2) VALUES (1, DEFAULT);
+INSERT INTO test (c1) VALUES (1);
+COMMIT;
+```
+
+所有在`begin`和`commit`之间的数据会被插入到test表中。 
+标签`test_label1`用于标记该事务,可以通过以下命令来检查事务的状态:`SHOW TRANSACTION WHERE LABEL = 'test_label1'`。
+
+## keyword
+BEGIN, COMMIT, ROLLBACK
diff --git a/docs/zh-CN/sql-reference/sql-statements/Data Manipulation/SHOW TRANSACTION.md b/docs/zh-CN/sql-reference/sql-statements/Data Manipulation/SHOW TRANSACTION.md
index ad9dabf..f3e83b5 100644
--- a/docs/zh-CN/sql-reference/sql-statements/Data Manipulation/SHOW TRANSACTION.md	
+++ b/docs/zh-CN/sql-reference/sql-statements/Data Manipulation/SHOW TRANSACTION.md	
@@ -27,14 +27,16 @@ under the License.
 # SHOW TRANSACTION
 ## description
 
-该语法用于查看指定 transaction id 的事务详情。
+该语法用于查看指定 transaction id 或 label 的事务详情。
 
 语法:
     
 ```
 SHOW TRANSACTION
 [FROM db_name]
-WHERE id = transaction_id;
+WHERE
+[id = transaction_id]
+[label = label_name];
 ```
         
 返回结果示例:
@@ -81,6 +83,9 @@ ErrorReplicasCount: 0
 
     SHOW TRANSACTION FROM db WHERE ID=4005;
 
+3. 查看 label 为 label_name的事务: 
+    SHOW TRANSACTION WHERE LABEL = 'label_name';
+
 ## keyword
 
     SHOW, TRANSACTION
diff --git a/fe/fe-core/src/main/cup/sql_parser.cup b/fe/fe-core/src/main/cup/sql_parser.cup
index caa9ade..c3c3ed1 100644
--- a/fe/fe-core/src/main/cup/sql_parser.cup
+++ b/fe/fe-core/src/main/cup/sql_parser.cup
@@ -292,10 +292,11 @@ nonterminal StatementBase stmt, show_stmt, show_param, help_stmt, load_stmt,
     show_routine_load_stmt, show_routine_load_task_stmt, show_create_routine_load_stmt,
     describe_stmt, alter_stmt,
     use_stmt, kill_stmt, drop_stmt, recover_stmt, grant_stmt, revoke_stmt, create_stmt, set_stmt, sync_stmt, cancel_stmt, cancel_param, delete_stmt,
-    link_stmt, migrate_stmt, enter_stmt, unsupported_stmt, export_stmt, admin_stmt, truncate_stmt,
+    link_stmt, migrate_stmt, enter_stmt, transaction_stmt, unsupported_stmt, export_stmt, admin_stmt, truncate_stmt,
     import_columns_stmt, import_delete_on_stmt, import_sequence_stmt, import_where_stmt, install_plugin_stmt, uninstall_plugin_stmt,
     import_preceding_filter_stmt;
 
+nonterminal String transaction_label;
 nonterminal ImportColumnDesc import_column_desc;
 nonterminal List<ImportColumnDesc> import_column_descs;
 
@@ -689,6 +690,8 @@ stmt ::=
     {: RESULT = stmt; :}
     | restore_stmt : stmt
     {: RESULT = stmt; :}
+    | transaction_stmt : stmt
+    {: RESULT = stmt; :}
     | unsupported_stmt : stmt
     {: RESULT = stmt; :}
     | export_stmt : stmt
@@ -4810,21 +4813,39 @@ truncate_stmt ::=
     :}
     ;
 
-unsupported_stmt ::=
-    KW_START KW_TRANSACTION opt_with_consistent_snapshot:v
+transaction_stmt ::=
+    KW_BEGIN
     {:
-        RESULT = new UnsupportedStmt();
+        RESULT = new TransactionBeginStmt();
     :}
-    | KW_BEGIN opt_work:work
+    | KW_BEGIN KW_WITH KW_LABEL transaction_label:label
     {:
-        RESULT = new UnsupportedStmt();
+        RESULT = new TransactionBeginStmt(label);
     :}
     | KW_COMMIT opt_work opt_chain opt_release
     {:
-        RESULT = new UnsupportedStmt();
+        RESULT = new TransactionCommitStmt();
     :}
     | KW_ROLLBACK opt_work opt_chain opt_release
     {:
+        RESULT = new TransactionRollbackStmt();
+    :}
+    ;
+
+transaction_label ::=
+    /* empty */
+    {:
+        RESULT = null;
+    :}
+    | ident:label
+    {:
+        RESULT = label;
+    :}
+    ;
+
+unsupported_stmt ::=
+    KW_START KW_TRANSACTION opt_with_consistent_snapshot:v
+    {:
         RESULT = new UnsupportedStmt();
     :}
     ;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/Expr.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/Expr.java
index 2823d98..b6b503f 100755
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/Expr.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/Expr.java
@@ -279,6 +279,8 @@ abstract public class Expr extends TreeNode<Expr> implements ParseNode, Cloneabl
         return isAnalyzed;
     }
 
+    public void checkValueValid() throws AnalysisException {}
+
     public ExprId getId() {
         return id;
     }
@@ -1779,4 +1781,5 @@ abstract public class Expr extends TreeNode<Expr> implements ParseNode, Cloneabl
         }
         return false;
     }
+
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/InsertStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/InsertStmt.java
index 8c6e0d6..3b961bd 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/InsertStmt.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/InsertStmt.java
@@ -187,6 +187,10 @@ public class InsertStmt extends DdlStmt {
         return tblName.getDb();
     }
 
+    public String getTbl() {
+        return tblName.getTbl();
+    }
+
     public void getTables(Analyzer analyzer, Map<Long, Table> tableMap, Set<String> parentViewNameSet) throws AnalysisException {
         // get dbs of statement
         queryStmt.getTables(analyzer, tableMap, parentViewNameSet);
@@ -291,6 +295,10 @@ public class InsertStmt extends DdlStmt {
 
         analyzePlanHints(analyzer);
 
+        if (analyzer.getContext().isTxnModel()) {
+            return;
+        }
+
         // create data sink
         createDataSink();
 
@@ -298,10 +306,10 @@ public class InsertStmt extends DdlStmt {
 
         // create label and begin transaction
         long timeoutSecond = ConnectContext.get().getSessionVariable().getQueryTimeoutS();
+        if (Strings.isNullOrEmpty(label)) {
+            label = "insert_" + DebugUtil.printId(analyzer.getContext().queryId());
+        }
         if (!isExplain() && !isTransactionBegin) {
-            if (Strings.isNullOrEmpty(label)) {
-                label = "insert_" + DebugUtil.printId(analyzer.getContext().queryId());
-            }
 
             if (targetTable instanceof OlapTable) {
                 LoadJobSourceType sourceType = LoadJobSourceType.INSERT_STREAMING;
@@ -720,7 +728,9 @@ public class InsertStmt extends DdlStmt {
         if (col.getDataType().equals(expr.getType().getPrimitiveType())) {
             return expr;
         }
-        return expr.castTo(col.getType());
+        Expr newExpr = expr.castTo(col.getType());
+        newExpr.checkValueValid();
+        return newExpr;
     }
 
     public void prepareExpressions() throws UserException {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/IntLiteral.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/IntLiteral.java
index 64b3973..65aee31 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/IntLiteral.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/IntLiteral.java
@@ -66,37 +66,7 @@ public class IntLiteral extends LiteralExpr {
 
     public IntLiteral(long longValue, Type type) throws AnalysisException {
         super();
-        boolean valid = true;
-        switch (type.getPrimitiveType()) {
-            case TINYINT:
-                if (longValue < TINY_INT_MIN || longValue > TINY_INT_MAX) {
-                    valid = false;
-                }
-                break;
-            case SMALLINT:
-                if (longValue < SMALL_INT_MIN || longValue > SMALL_INT_MAX) {
-                    valid = false;
-                }
-                break;
-            case INT:
-                if (longValue < INT_MIN || longValue > INT_MAX) {
-                    valid = false;
-                }
-                break;
-            case BIGINT:
-                if (longValue < BIG_INT_MIN) {
-                    valid = false;
-                }
-                // no need to check upper bound
-                break;
-            default:
-                valid = false;
-                break;
-        }
-
-        if (!valid) {
-            throw new AnalysisException("Number out of range[" + value + "]. type: " + type);
-        }
+        checkValueValid(longValue, type);
 
         this.value = longValue;
         this.type = type;
@@ -107,11 +77,28 @@ public class IntLiteral extends LiteralExpr {
         super();
         long longValue = -1L;
         try {
-            longValue = Long.valueOf(value);
+            longValue = Long.parseLong(value);
         } catch (NumberFormatException e) {
             throw new AnalysisException("Invalid number format: " + value);
         }
+        checkValueValid(longValue, type);
+
+        this.value = longValue;
+        this.type = type;
+        analysisDone();
+    }
+
+    protected IntLiteral(IntLiteral other) {
+        super(other);
+        value = other.value;
+    }
+
+    @Override
+    public void checkValueValid() throws AnalysisException {
+        checkValueValid(value, type);
+    }
 
+    private void checkValueValid(long longValue, Type type) throws AnalysisException {
         boolean valid = true;
         switch (type.getPrimitiveType()) {
             case TINYINT:
@@ -141,17 +128,8 @@ public class IntLiteral extends LiteralExpr {
         }
 
         if (!valid) {
-            throw new AnalysisException("Number out of range[" + value + "]. type: " + type);
+            throw new AnalysisException("Number out of range[" + longValue + "]. type: " + type);
         }
-
-        this.value = longValue;
-        this.type = type;
-        analysisDone();
-    }
-
-    protected IntLiteral(IntLiteral other) {
-        super(other);
-        value = other.value;
     }
 
     @Override
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowTransactionStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowTransactionStmt.java
index 0115247..2c42cf5 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowTransactionStmt.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowTransactionStmt.java
@@ -40,7 +40,8 @@ public class ShowTransactionStmt extends ShowStmt {
 
     private String dbName;
     private Expr whereClause;
-    private long txnId;
+    private long txnId = -1;
+    private String label = "";
 
     public ShowTransactionStmt(String dbName, Expr whereClause) {
         this.dbName = dbName;
@@ -55,6 +56,10 @@ public class ShowTransactionStmt extends ShowStmt {
         return txnId;
     }
 
+    public String getLabel() {
+        return label;
+    }
+
     @Override
     public void analyze(Analyzer analyzer) throws AnalysisException, UserException {
         super.analyze(analyzer);
@@ -99,22 +104,17 @@ public class ShowTransactionStmt extends ShowStmt {
                 break CHECK;
             }
             String leftKey = ((SlotRef) whereClause.getChild(0)).getColumnName();
-            if (!leftKey.equalsIgnoreCase("id")) {
-                valid = false;
-                break CHECK;
-            }
-
-            // right child
-            if (!(whereClause.getChild(1) instanceof IntLiteral)) {
+            if (leftKey.equalsIgnoreCase("id") && (whereClause.getChild(1) instanceof IntLiteral)) {
+                txnId = ((IntLiteral) whereClause.getChild(1)).getLongValue();
+            } else if (leftKey.equalsIgnoreCase("label") && (whereClause.getChild(1) instanceof StringLiteral)) {
+                label = ((StringLiteral) whereClause.getChild(1)).getStringValue();
+            } else {
                 valid = false;
-                break CHECK;
             }
-
-            txnId = ((IntLiteral) whereClause.getChild(1)).getLongValue();
         }
 
         if (!valid) {
-            throw new AnalysisException("Where clause should looks like: id = 123");
+            throw new AnalysisException("Where clause should looks like one of them: id = 123 or label = 'label'");
         }
     }
 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/TransactionBeginStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/TransactionBeginStmt.java
new file mode 100644
index 0000000..7e631b5
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/TransactionBeginStmt.java
@@ -0,0 +1,45 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.analysis;
+
+import org.apache.doris.common.AnalysisException;
+import org.apache.doris.common.UserException;
+import org.apache.doris.common.util.DebugUtil;
+import org.apache.doris.transaction.TransactionEntry;
+
+public class TransactionBeginStmt extends TransactionStmt {
+    private String label = null;
+    public TransactionBeginStmt() {
+        this.label = "";
+    }
+    public TransactionBeginStmt(final String label) {
+        this.label = label;
+    }
+    @Override
+    public void analyze(Analyzer analyzer) throws AnalysisException, UserException {
+        if (label == null || label.isEmpty()) {
+            label = "txn_insert_" + DebugUtil.printId(analyzer.getContext().queryId());
+        }
+        if (analyzer.getContext().getTxnEntry() == null) {
+            analyzer.getContext().setTxnEntry(new TransactionEntry());
+        }
+        analyzer.getContext().getTxnEntry().setLabel(label);
+        super.analyze(analyzer);
+    }
+
+}
\ No newline at end of file
diff --git a/be/src/runtime/stream_load/stream_load_executor.h b/fe/fe-core/src/main/java/org/apache/doris/analysis/TransactionCommitStmt.java
similarity index 53%
copy from be/src/runtime/stream_load/stream_load_executor.h
copy to fe/fe-core/src/main/java/org/apache/doris/analysis/TransactionCommitStmt.java
index 127e710..08bca18 100644
--- a/be/src/runtime/stream_load/stream_load_executor.h
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/TransactionCommitStmt.java
@@ -15,34 +15,8 @@
 // specific language governing permissions and limitations
 // under the License.
 
-#pragma once
+package org.apache.doris.analysis;
 
-namespace doris {
+public class TransactionCommitStmt extends TransactionStmt {
 
-class ExecEnv;
-class StreamLoadContext;
-class Status;
-class TTxnCommitAttachment;
-
-class StreamLoadExecutor {
-public:
-    StreamLoadExecutor(ExecEnv* exec_env) : _exec_env(exec_env) {}
-
-    Status begin_txn(StreamLoadContext* ctx);
-
-    Status commit_txn(StreamLoadContext* ctx);
-
-    void rollback_txn(StreamLoadContext* ctx);
-
-    Status execute_plan_fragment(StreamLoadContext* ctx);
-
-private:
-    // collect the load statistics from context and set them to stat
-    // return true if stat is set, otherwise, return false
-    bool collect_load_stat(StreamLoadContext* ctx, TTxnCommitAttachment* attachment);
-
-private:
-    ExecEnv* _exec_env;
-};
-
-} // namespace doris
+}
\ No newline at end of file
diff --git a/be/src/runtime/stream_load/stream_load_executor.h b/fe/fe-core/src/main/java/org/apache/doris/analysis/TransactionRollbackStmt.java
similarity index 53%
copy from be/src/runtime/stream_load/stream_load_executor.h
copy to fe/fe-core/src/main/java/org/apache/doris/analysis/TransactionRollbackStmt.java
index 127e710..d62028e 100644
--- a/be/src/runtime/stream_load/stream_load_executor.h
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/TransactionRollbackStmt.java
@@ -15,34 +15,8 @@
 // specific language governing permissions and limitations
 // under the License.
 
-#pragma once
+package org.apache.doris.analysis;
 
-namespace doris {
+public class TransactionRollbackStmt extends TransactionStmt {
 
-class ExecEnv;
-class StreamLoadContext;
-class Status;
-class TTxnCommitAttachment;
-
-class StreamLoadExecutor {
-public:
-    StreamLoadExecutor(ExecEnv* exec_env) : _exec_env(exec_env) {}
-
-    Status begin_txn(StreamLoadContext* ctx);
-
-    Status commit_txn(StreamLoadContext* ctx);
-
-    void rollback_txn(StreamLoadContext* ctx);
-
-    Status execute_plan_fragment(StreamLoadContext* ctx);
-
-private:
-    // collect the load statistics from context and set them to stat
-    // return true if stat is set, otherwise, return false
-    bool collect_load_stat(StreamLoadContext* ctx, TTxnCommitAttachment* attachment);
-
-private:
-    ExecEnv* _exec_env;
-};
-
-} // namespace doris
+}
\ No newline at end of file
diff --git a/be/src/runtime/stream_load/stream_load_executor.h b/fe/fe-core/src/main/java/org/apache/doris/analysis/TransactionStmt.java
similarity index 53%
copy from be/src/runtime/stream_load/stream_load_executor.h
copy to fe/fe-core/src/main/java/org/apache/doris/analysis/TransactionStmt.java
index 127e710..1aeb635 100644
--- a/be/src/runtime/stream_load/stream_load_executor.h
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/TransactionStmt.java
@@ -15,34 +15,20 @@
 // specific language governing permissions and limitations
 // under the License.
 
-#pragma once
+package org.apache.doris.analysis;
 
-namespace doris {
+import org.apache.doris.common.AnalysisException;
+import org.apache.doris.common.UserException;
 
-class ExecEnv;
-class StreamLoadContext;
-class Status;
-class TTxnCommitAttachment;
+public class TransactionStmt extends StatementBase {
 
-class StreamLoadExecutor {
-public:
-    StreamLoadExecutor(ExecEnv* exec_env) : _exec_env(exec_env) {}
+    @Override
+    public RedirectStatus getRedirectStatus() {
+        return RedirectStatus.NO_FORWARD;
+    }
 
-    Status begin_txn(StreamLoadContext* ctx);
-
-    Status commit_txn(StreamLoadContext* ctx);
-
-    void rollback_txn(StreamLoadContext* ctx);
-
-    Status execute_plan_fragment(StreamLoadContext* ctx);
-
-private:
-    // collect the load statistics from context and set them to stat
-    // return true if stat is set, otherwise, return false
-    bool collect_load_stat(StreamLoadContext* ctx, TTxnCommitAttachment* attachment);
-
-private:
-    ExecEnv* _exec_env;
-};
-
-} // namespace doris
+    @Override
+    public void analyze(Analyzer analyzer) throws AnalysisException, UserException {
+        super.analyze(analyzer);
+    }
+}
\ No newline at end of file
diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/Config.java b/fe/fe-core/src/main/java/org/apache/doris/common/Config.java
index b77a8ce..3f0155e 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/Config.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/Config.java
@@ -433,6 +433,13 @@ public class Config extends ConfigBase {
     public static int publish_version_timeout_second = 30; // 30 seconds
 
     /**
+     * Maximal time to wait for all inserted data to arrive before a transaction is committed.
+     * This is the timeout, in seconds, of the "commit" command.
+     */
+    @ConfField(mutable = true, masterOnly = true)
+    public static int commit_timeout_second = 30; // 30 seconds
+
+    /**
      * minimal intervals between two publish version action
      */
     @ConfField public static int publish_version_interval_ms = 10;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/Planner.java b/fe/fe-core/src/main/java/org/apache/doris/planner/Planner.java
index 6fd8d6e..86b2fd2 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/Planner.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/Planner.java
@@ -225,7 +225,7 @@ public class Planner {
             RuntimeFilterGenerator.generateRuntimeFilters(analyzer, rootFragment.getPlanRoot());
         }
 
-	if (statement instanceof InsertStmt) {
+	    if (statement instanceof InsertStmt && !analyzer.getContext().isTxnModel()) {
             InsertStmt insertStmt = (InsertStmt) statement;
             rootFragment = distributedPlanner.createInsertFragment(rootFragment, insertStmt, fragments);
             rootFragment.setSink(insertStmt.getDataSink());
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java
index d762526..8f1dad8 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java
@@ -19,7 +19,9 @@ package org.apache.doris.qe;
 
 import org.apache.doris.analysis.UserIdentity;
 import org.apache.doris.catalog.Catalog;
+import org.apache.doris.catalog.Database;
 import org.apache.doris.cluster.ClusterNamespace;
+import org.apache.doris.common.UserException;
 import org.apache.doris.mysql.MysqlCapability;
 import org.apache.doris.mysql.MysqlChannel;
 import org.apache.doris.mysql.MysqlCommand;
@@ -27,6 +29,7 @@ import org.apache.doris.mysql.MysqlSerializer;
 import org.apache.doris.plugin.AuditEvent.AuditEventBuilder;
 import org.apache.doris.thrift.TResourceInfo;
 import org.apache.doris.thrift.TUniqueId;
+import org.apache.doris.transaction.TransactionEntry;
 
 import com.google.common.collect.Lists;
 
@@ -64,6 +67,9 @@ public class ConnectContext {
     protected volatile boolean isKilled;
     // Db
     protected volatile String currentDb = "";
+    protected volatile long currentDbId = -1;
+    // Transaction
+    protected volatile TransactionEntry txnEntry = null;
     // cluster name
     protected volatile String clusterName = "";
     // username@host of current login user
@@ -154,6 +160,30 @@ public class ConnectContext {
         queryDetail = null;
     }
 
+    public boolean isTxnModel() {
+        return txnEntry != null && txnEntry.isTxnModel();
+    }
+    public boolean isTxnIniting() {
+        return txnEntry != null && txnEntry.isTxnIniting();
+    }
+    public boolean isTxnBegin() {
+        return txnEntry != null && txnEntry.isTxnBegin();
+    }
+    // Aborts the transaction this session has begun (if any) and then leaves transaction mode.
+    public void closeTxn() {
+        if (isTxnModel()) {
+            if (isTxnBegin()) {
+                try {
+                    Catalog.getCurrentGlobalTransactionMgr().abortTransaction(
+                            currentDbId, txnEntry.getTxnConf().getTxnId(), "timeout");
+                } catch (UserException | RuntimeException e) { // best effort: always reach the reset below
+                    LOG.error("db: {}, txnId: {}, rollback error.", currentDb, txnEntry.getTxnConf().getTxnId(), e);
+                }
+            }
+            txnEntry = null;
+        }
+    }
+
     // Just for unit test
     public void resetSessionVariables() {
         sessionVariable = VariableMgr.newSessionVariable();
@@ -199,6 +229,18 @@ public class ConnectContext {
         threadLocalInfo.set(this);
     }
 
+    public long getCurrentDbId() {
+        return currentDbId;
+    }
+
+    public TransactionEntry getTxnEntry() {
+        return txnEntry;
+    }
+
+    public void setTxnEntry(TransactionEntry txnEntry) {
+        this.txnEntry = txnEntry;
+    }
+
     public TResourceInfo toResourceCtx() {
         return new TResourceInfo(qualifiedUser, sessionVariable.getResourceGroup());
     }
@@ -315,6 +357,12 @@ public class ConnectContext {
 
     public void setDatabase(String db) {
         currentDb = db;
+        // Keep the cached database id in step with the database name:
+        // the id comes from the catalog entry found under that name,
+        // and -1 stands for a name the catalog cannot resolve.
+        Database resolved = Catalog.getCurrentCatalog().getDb(db);
+        currentDbId = (resolved == null)
+                ? -1 : resolved.getId();
     }
 
     public void setExecutor(StmtExecutor executor) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectScheduler.java b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectScheduler.java
index ae838f2..80f7a92 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectScheduler.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectScheduler.java
@@ -113,6 +113,7 @@ public class ConnectScheduler {
     }
 
     public synchronized void unregisterConnection(ConnectContext ctx) {
+        ctx.closeTxn();
         if (connectionMap.remove((long) ctx.getConnectionId()) != null) {
             numberConnection--;
             AtomicInteger conns = connByUser.get(ctx.getQualifiedUser());
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/InsertStreamTxnExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/InsertStreamTxnExecutor.java
new file mode 100644
index 0000000..9106006
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/InsertStreamTxnExecutor.java
@@ -0,0 +1,188 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.qe;
+
+import org.apache.doris.catalog.Catalog;
+import org.apache.doris.catalog.OlapTable;
+import org.apache.doris.common.UserException;
+import org.apache.doris.planner.StreamLoadPlanner;
+import org.apache.doris.proto.InternalService;
+import org.apache.doris.proto.Types;
+import org.apache.doris.rpc.BackendServiceProxy;
+import org.apache.doris.rpc.RpcException;
+import org.apache.doris.system.Backend;
+import org.apache.doris.task.StreamLoadTask;
+import org.apache.doris.thrift.TBrokerRangeDesc;
+import org.apache.doris.thrift.TExecPlanFragmentParams;
+import org.apache.doris.thrift.TFileFormatType;
+import org.apache.doris.thrift.TNetworkAddress;
+import org.apache.doris.thrift.TScanRangeParams;
+import org.apache.doris.thrift.TStatusCode;
+import org.apache.doris.thrift.TStreamLoadPutRequest;
+import org.apache.doris.thrift.TTxnParams;
+import org.apache.doris.thrift.TUniqueId;
+import org.apache.doris.transaction.TransactionEntry;
+
+import org.apache.thrift.TException;
+
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+public class InsertStreamTxnExecutor {
+    private long txnId;
+    private TUniqueId loadId;
+    private TransactionEntry txnEntry;
+
+    public InsertStreamTxnExecutor(TransactionEntry txnEntry) {
+        this.txnEntry = txnEntry;
+    }
+
+    // Plans a stream load for the txn's table and starts its plan fragment on one chosen backend.
+    public void beginTransaction(TStreamLoadPutRequest request) throws UserException, TException, TimeoutException,
+            InterruptedException, ExecutionException {
+        TTxnParams txnConf = txnEntry.getTxnConf();
+        StreamLoadTask streamLoadTask = StreamLoadTask.fromTStreamLoadPutRequest(request);
+        StreamLoadPlanner planner = new StreamLoadPlanner(txnEntry.getDb(), (OlapTable) txnEntry.getTable(), streamLoadTask);
+        TExecPlanFragmentParams tRequest = planner.plan(streamLoadTask.getId());
+        List<Long> beIds = Catalog.getCurrentSystemInfo().seqChooseBackendIds(
+                1, true, true, txnEntry.getDb().getClusterName());
+        if (beIds == null || beIds.isEmpty()) {
+            throw new UserException("there is no scanNode Backend.");
+        }
+        Backend backend = Catalog.getCurrentSystemInfo().getIdToBackend().get(beIds.get(0));
+        if (backend == null) { // lookup may miss, e.g. if the backend was dropped after being chosen
+            throw new UserException("there is no scanNode Backend. backend id: " + beIds.get(0));
+        }
+        tRequest.setTxnConf(txnConf).setImportLabel(txnEntry.getLabel());
+        for (Map.Entry<Integer, List<TScanRangeParams>> entry : tRequest.params.per_node_scan_ranges.entrySet()) {
+            for (TScanRangeParams scanRangeParams : entry.getValue()) {
+                for (TBrokerRangeDesc desc : scanRangeParams.scan_range.broker_scan_range.ranges) {
+                    desc.setFormatType(TFileFormatType.FORMAT_PROTO);
+                }
+            }
+        }
+        txnConf.setFragmentInstanceId(tRequest.params.fragment_instance_id);
+        txnConf.setUserIp(backend.getHost());
+        txnEntry.setBackend(backend);
+        TNetworkAddress address = new TNetworkAddress(backend.getHost(), backend.getBrpcPort());
+        try {
+            InternalService.PExecPlanFragmentResult result = BackendServiceProxy.getInstance()
+                    .execPlanFragmentAsync(address, tRequest).get(5, TimeUnit.SECONDS);
+            if (TStatusCode.findByValue(result.getStatus().getStatusCode()) != TStatusCode.OK) {
+                throw new TException("failed to execute plan fragment: " + result.getStatus().getErrorMsgsList());
+            }
+        } catch (RpcException e) {
+            throw new TException(e);
+        }
+    }
+
+    // Tells the backend stored in txnEntry to commit this transaction's fragment instance.
+    public void commitTransaction() throws TException, TimeoutException,
+            InterruptedException, ExecutionException {
+        TTxnParams conf = txnEntry.getTxnConf();
+        Types.PUniqueId instanceId = Types.PUniqueId.newBuilder()
+                .setHi(conf.getFragmentInstanceId().getHi())
+                .setLo(conf.getFragmentInstanceId().getLo()).build();
+        Backend be = txnEntry.getBackend();
+        TNetworkAddress beAddress = new TNetworkAddress(be.getHost(), be.getBrpcPort());
+        InternalService.PCommitResult commitResult;
+        try {
+            commitResult = BackendServiceProxy.getInstance().commit(beAddress, instanceId).get(5, TimeUnit.SECONDS);
+        } catch (RpcException e) {
+            throw new TException(e);
+        }
+        // Any status other than OK is surfaced as a TException carrying the returned error messages.
+        if (TStatusCode.findByValue(commitResult.getStatus().getStatusCode()) != TStatusCode.OK) {
+            throw new TException("failed to commit txn: " + commitResult.getStatus().getErrorMsgsList());
+        }
+    }
+
+    // Tells the backend stored in txnEntry to roll back this transaction's fragment instance.
+    public void abortTransaction() throws TException, TimeoutException,
+            InterruptedException, ExecutionException {
+        TTxnParams conf = txnEntry.getTxnConf();
+        Types.PUniqueId instanceId = Types.PUniqueId.newBuilder()
+                .setHi(conf.getFragmentInstanceId().getHi())
+                .setLo(conf.getFragmentInstanceId().getLo()).build();
+        Backend target = txnEntry.getBackend();
+        TNetworkAddress targetAddress = new TNetworkAddress(target.getHost(), target.getBrpcPort());
+        InternalService.PRollbackResult rollbackResult;
+        try {
+            rollbackResult = BackendServiceProxy.getInstance().rollback(targetAddress, instanceId)
+                    .get(5, TimeUnit.SECONDS);
+        } catch (RpcException e) {
+            throw new TException(e);
+        }
+        // Any status other than OK is surfaced as a TException carrying the returned error messages.
+        if (TStatusCode.findByValue(rollbackResult.getStatus().getStatusCode()) != TStatusCode.OK) {
+            throw new TException("failed to rollback txn: " + rollbackResult.getStatus().getErrorMsgsList());
+        }
+    }
+
+    // Ships the rows buffered in txnEntry to the transaction's backend and clears the buffer on success.
+    public void sendData() throws TException, TimeoutException,
+            InterruptedException, ExecutionException {
+        // Nothing buffered: return without issuing an RPC.
+        if (txnEntry.getDataToSend() == null || txnEntry.getDataToSend().isEmpty()) {
+            return;
+        }
+        TTxnParams conf = txnEntry.getTxnConf();
+        Types.PUniqueId instanceId = Types.PUniqueId.newBuilder()
+                .setHi(conf.getFragmentInstanceId().getHi())
+                .setLo(conf.getFragmentInstanceId().getLo()).build();
+        Backend be = txnEntry.getBackend();
+        TNetworkAddress beAddress = new TNetworkAddress(be.getHost(), be.getBrpcPort());
+        try {
+            InternalService.PSendDataResult sendResult = BackendServiceProxy.getInstance().sendData(
+                    beAddress, instanceId, txnEntry.getDataToSend()).get(5, TimeUnit.SECONDS);
+            TStatusCode status = TStatusCode.findByValue(sendResult.getStatus().getStatusCode());
+            if (status != TStatusCode.OK) {
+                throw new TException("failed to insert data: " + sendResult.getStatus().getErrorMsgsList());
+            }
+            // The buffer is dropped only after an OK status came back for it.
+            txnEntry.clearDataToSend();
+        } catch (RpcException e) {
+            throw new TException(e);
+        }
+    }
+
+    public TUniqueId getLoadId() {
+        return loadId;
+    }
+
+    public void setLoadId(TUniqueId loadId) {
+        this.loadId = loadId;
+    }
+
+    public long getTxnId() {
+        return txnId;
+    }
+
+    public void setTxnId(long txnId) {
+        this.txnId = txnId;
+    }
+
+    public TransactionEntry getTxnEntry() {
+        return txnEntry;
+    }
+
+}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/MasterTxnExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/MasterTxnExecutor.java
new file mode 100644
index 0000000..917c4cc
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/MasterTxnExecutor.java
@@ -0,0 +1,141 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.qe;
+
+import org.apache.doris.common.ClientPool;
+import org.apache.doris.thrift.FrontendService;
+import org.apache.doris.thrift.TLoadTxnBeginRequest;
+import org.apache.doris.thrift.TLoadTxnBeginResult;
+import org.apache.doris.thrift.TNetworkAddress;
+import org.apache.doris.thrift.TStatusCode;
+import org.apache.doris.thrift.TWaitingTxnStatusRequest;
+import org.apache.doris.thrift.TWaitingTxnStatusResult;
+
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+import org.apache.thrift.TException;
+import org.apache.thrift.transport.TTransportException;
+
+public class MasterTxnExecutor {
+    private static final Logger LOG = LogManager.getLogger(MasterTxnExecutor.class);
+
+    private int waitTimeoutMs;
+    // the total time of thrift connectTime add readTime and writeTime
+    private int thriftTimeoutMs;
+    private ConnectContext ctx;
+
+    public MasterTxnExecutor(ConnectContext ctx) {
+        this.ctx = ctx;
+        this.waitTimeoutMs = ctx.getSessionVariable().getQueryTimeoutS() * 1000;
+        this.thriftTimeoutMs = ctx.getSessionVariable().getQueryTimeoutS() * 1000;
+    }
+
+    // Builds the Master FE's RPC address from the catalog; refuses while the catalog is not ready.
+    private TNetworkAddress getMasterAddress() throws TException {
+        if (ctx.getCatalog().isReady()) {
+            // Host is read before port, as two separate catalog calls.
+            return new TNetworkAddress(ctx.getCatalog().getMasterIp(), ctx.getCatalog().getMasterRpcPort());
+        }
+        throw new TException("Node catalog is not ready, please wait for a while.");
+    }
+
+    private FrontendService.Client getClient(TNetworkAddress thriftAddress) throws TException {
+        try {
+            return ClientPool.frontendPool.borrowObject(thriftAddress, thriftTimeoutMs);
+        } catch (Exception e) {
+            // Borrowing can fail with any exception (even a NullPointerException); wrap it with a clear message.
+            throw new TException("Failed to get master client.", e);
+        }
+    }
+    /**
+     * Sends a begin-transaction request to the Master FE.
+     * A transport failure is retried once on a reopened connection, except for timeouts.
+     */
+    public TLoadTxnBeginResult beginTxn(TLoadTxnBeginRequest request) throws TException {
+        TNetworkAddress thriftAddress = getMasterAddress();
+        FrontendService.Client client = getClient(thriftAddress);
+        LOG.info("Send begin transaction {} to Master {}", ctx.getStmtId(), thriftAddress);
+        boolean isReturnToPool = false;
+        try {
+            TLoadTxnBeginResult result = client.loadTxnBegin(request);
+            isReturnToPool = true;
+            if (result.getStatus().getStatusCode() != TStatusCode.OK) {
+                throw new TException("begin txn failed.");
+            }
+            return result;
+        } catch (TTransportException e) {
+            boolean ok = ClientPool.frontendPool.reopen(client, thriftTimeoutMs);
+            if (!ok || e.getType() == TTransportException.TIMED_OUT) {
+                throw e;
+            }
+            TLoadTxnBeginResult result = client.loadTxnBegin(request);
+            isReturnToPool = true;
+            // The retried call must pass the same status check as the first attempt.
+            if (result.getStatus().getStatusCode() != TStatusCode.OK) {
+                throw new TException("begin txn failed.");
+            }
+            return result;
+        } finally {
+            if (isReturnToPool) {
+                ClientPool.frontendPool.returnObject(thriftAddress, client);
+            } else {
+                ClientPool.frontendPool.invalidateObject(thriftAddress, client);
+            }
+        }
+    }
+
+    /**
+     * Asks the Master FE for the status of a waiting transaction.
+     * A transport failure is retried once on a reopened connection, except for timeouts.
+     *
+     * @throws TException if the Master reports a non-OK status or the RPC cannot be completed
+     */
+    public TWaitingTxnStatusResult getWaitingTxnStatus(TWaitingTxnStatusRequest request) throws TException {
+        TNetworkAddress thriftAddress = getMasterAddress();
+        FrontendService.Client client = getClient(thriftAddress);
+        LOG.info("Send waiting transaction status {} to Master {}", ctx.getStmtId(), thriftAddress);
+        // The client is returned to the pool once an RPC has completed on it; otherwise it is invalidated.
+        boolean isReturnToPool = false;
+        try {
+            TWaitingTxnStatusResult result = client.waitingTxnStatus(request);
+            isReturnToPool = true;
+            if (result.getStatus().getStatusCode() != TStatusCode.OK) {
+                throw new TException("get txn status failed.");
+            }
+            return result;
+        } catch (TTransportException e) {
+            boolean ok = ClientPool.frontendPool.reopen(client, thriftTimeoutMs);
+            if (!ok || e.getType() == TTransportException.TIMED_OUT) {
+                throw e;
+            }
+            TWaitingTxnStatusResult result = client.waitingTxnStatus(request);
+            isReturnToPool = true;
+            if (result.getStatus().getStatusCode() != TStatusCode.OK) {
+                throw new TException("get txn status failed.");
+            }
+            return result;
+        } finally {
+            if (isReturnToPool) {
+                ClientPool.frontendPool.returnObject(thriftAddress, client);
+            } else {
+                ClientPool.frontendPool.invalidateObject(thriftAddress, client);
+            }
+        }
+    }
+}
+
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/ShowExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/ShowExecutor.java
index fecc3e1..8c54369 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/ShowExecutor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/ShowExecutor.java
@@ -1853,8 +1853,15 @@ public class ShowExecutor {
             ErrorReport.reportAnalysisException(ErrorCode.ERR_BAD_DB_ERROR, showStmt.getDbName());
         }
 
-        long txnId = showStmt.getTxnId();
+        Long txnId = showStmt.getTxnId();
+        String label = showStmt.getLabel();
         GlobalTransactionMgr transactionMgr = Catalog.getCurrentGlobalTransactionMgr();
+        if (!label.isEmpty()) {
+            txnId = transactionMgr.getTransactionId(db.getId(), label);
+            if (txnId == null) {
+                throw new AnalysisException("transaction with label " + label + " does not exist");
+            }
+        }
         resultSet = new ShowResultSet(showStmt.getMetaData(), transactionMgr.getSingleTranInfo(db.getId(), txnId));
     }
 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
index f9b0ca9..5938712 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
@@ -26,6 +26,7 @@ import org.apache.doris.analysis.ExportStmt;
 import org.apache.doris.analysis.Expr;
 import org.apache.doris.analysis.InsertStmt;
 import org.apache.doris.analysis.KillStmt;
+import org.apache.doris.analysis.NullLiteral;
 import org.apache.doris.analysis.OutFileClause;
 import org.apache.doris.analysis.QueryStmt;
 import org.apache.doris.analysis.RedirectStatus;
@@ -38,10 +39,15 @@ import org.apache.doris.analysis.SqlScanner;
 import org.apache.doris.analysis.StatementBase;
 import org.apache.doris.analysis.StmtRewriter;
 import org.apache.doris.analysis.StringLiteral;
+import org.apache.doris.analysis.TransactionBeginStmt;
+import org.apache.doris.analysis.TransactionCommitStmt;
+import org.apache.doris.analysis.TransactionRollbackStmt;
+import org.apache.doris.analysis.TransactionStmt;
 import org.apache.doris.analysis.UnsupportedStmt;
 import org.apache.doris.analysis.UseStmt;
 import org.apache.doris.catalog.Catalog;
 import org.apache.doris.catalog.Column;
+import org.apache.doris.catalog.Database;
 import org.apache.doris.catalog.PrimitiveType;
 import org.apache.doris.catalog.ScalarType;
 import org.apache.doris.catalog.Table;
@@ -80,13 +86,25 @@ import org.apache.doris.qe.cache.CacheAnalyzer.CacheMode;
 import org.apache.doris.rewrite.ExprRewriter;
 import org.apache.doris.rewrite.mvrewrite.MVSelectFailedException;
 import org.apache.doris.rpc.RpcException;
+import org.apache.doris.service.FrontendOptions;
 import org.apache.doris.task.LoadEtlTask;
+import org.apache.doris.thrift.TFileFormatType;
+import org.apache.doris.thrift.TFileType;
+import org.apache.doris.thrift.TLoadTxnBeginRequest;
+import org.apache.doris.thrift.TLoadTxnBeginResult;
+import org.apache.doris.thrift.TMergeType;
 import org.apache.doris.thrift.TQueryOptions;
 import org.apache.doris.thrift.TQueryType;
 import org.apache.doris.thrift.TResultBatch;
+import org.apache.doris.thrift.TStreamLoadPutRequest;
+import org.apache.doris.thrift.TTxnParams;
 import org.apache.doris.thrift.TUniqueId;
+import org.apache.doris.thrift.TWaitingTxnStatusRequest;
+import org.apache.doris.thrift.TWaitingTxnStatusResult;
 import org.apache.doris.transaction.TabletCommitInfo;
 import org.apache.doris.transaction.TransactionCommitFailedException;
+import org.apache.doris.transaction.TransactionEntry;
+import org.apache.doris.transaction.TransactionState;
 import org.apache.doris.transaction.TransactionStatus;
 
 import com.google.common.base.Strings;
@@ -96,6 +114,7 @@ import com.google.protobuf.ByteString;
 
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
+import org.apache.thrift.TException;
 import org.glassfish.jersey.internal.guava.Sets;
 
 import java.io.IOException;
@@ -105,6 +124,8 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.stream.Collectors;
 
@@ -115,6 +136,7 @@ public class StmtExecutor implements ProfileWriter {
     private static final Logger LOG = LogManager.getLogger(StmtExecutor.class);
 
     private static final AtomicLong STMT_ID_GENERATOR = new AtomicLong(0);
+    private static final int MAX_DATA_TO_SEND_FOR_TXN = 100;
 
     private ConnectContext context;
     private MysqlSerializer serializer;
@@ -264,22 +286,28 @@ public class StmtExecutor implements ProfileWriter {
         context.setQueryId(queryId);
 
         try {
+            if (context.isTxnModel() && !(parsedStmt instanceof InsertStmt)
+                    && !(parsedStmt instanceof TransactionStmt)) {
+                throw new TException("This is in a transaction, only insert, commit, rollback is acceptable.");
+            }
             // support select hint e.g. select /*+ SET_VAR(query_timeout=1) */ sleep(3);
             analyzeVariablesInStmt();
-            // analyze this query
-            SessionVariable sessionVariable = context.getSessionVariable();
-            analyze(sessionVariable.toThrift());
-
-            if (isForwardToMaster()) {
-                forwardToMaster();
-                if (masterOpExecutor != null && masterOpExecutor.getQueryId() != null) {
-                    // If the query id changed in master, we set it in context.
-                    // WARN: when query timeout, this code may not be reach.
-                    context.setQueryId(masterOpExecutor.getQueryId());
+
+            if (!context.isTxnModel()) {
+                // analyze this query
+                analyze(context.getSessionVariable().toThrift());
+                if (isForwardToMaster()) {
+                    forwardToMaster();
+                    if (masterOpExecutor != null && masterOpExecutor.getQueryId() != null) {
+                        context.setQueryId(masterOpExecutor.getQueryId());
+                    }
+                    return;
+                } else {
+                    LOG.debug("no need to transfer to Master. stmt: {}", context.getStmtId());
                 }
-                return;
             } else {
-                LOG.debug("no need to transfer to Master. stmt: {}", context.getStmtId());
+                analyzer = new Analyzer(context.getCatalog(), context);
+                parsedStmt.analyze(analyzer);
             }
 
             if (parsedStmt instanceof QueryStmt) {
@@ -320,6 +348,8 @@ public class StmtExecutor implements ProfileWriter {
                 handleEnterStmt();
             } else if (parsedStmt instanceof UseStmt) {
                 handleUseStmt();
+            } else if (parsedStmt instanceof TransactionStmt) {
+                handleTransactionStmt();
             } else if (parsedStmt instanceof CreateTableAsSelectStmt) {
                 handleInsertStmt();
             } else if (parsedStmt instanceof InsertStmt) { // Must ahead of DdlStmt because InserStmt is its subclass
@@ -378,7 +408,7 @@ public class StmtExecutor implements ProfileWriter {
                 LOG.warn("failed to revert Session value.", e);
                 context.getState().setError(e.getMessage());
             }
-            if (parsedStmt instanceof InsertStmt) {
+            if (!context.isTxnModel() && parsedStmt instanceof InsertStmt) {
                 InsertStmt insertStmt = (InsertStmt) parsedStmt;
                 // The transaction of a insert operation begin at analyze phase.
                 // So we should abort the transaction at this finally block if it encounter exception.
@@ -852,6 +882,231 @@ public class StmtExecutor implements ProfileWriter {
         plannerProfile.setQueryFetchResultFinishTime();
     }
 
+
+    /**
+     * Looks up the waiting status of a transaction: answered by the local GlobalTransactionMgr when this
+     * FE is the master, otherwise obtained through a MasterTxnExecutor built on the session context.
+     */
+    private TWaitingTxnStatusResult getWaitingTxnStatus(TWaitingTxnStatusRequest request) throws Exception {
+        if (!Catalog.getCurrentCatalog().isMaster()) {
+            return new MasterTxnExecutor(context).getWaitingTxnStatus(request);
+        }
+        // this FE is the master, so the transaction manager can be asked directly
+        return Catalog.getCurrentGlobalTransactionMgr().getWaitingTxnStatus(request);
+    }
+
+    /**
+     * Handles BEGIN / COMMIT / ROLLBACK against the TransactionEntry kept on the connect context.
+     */
+    private void handleTransactionStmt() throws Exception {
+        // Every time set no send flag and clean all data in buffer
+        context.getMysqlChannel().reset();
+        context.getState().setOk(0, 0, "");
+        // COMMIT / ROLLBACK while the entry holds no rows: just drop the entry
+        if (context.getTxnEntry() != null && context.getTxnEntry().getRowsInTransaction() == 0
+                && (parsedStmt instanceof TransactionCommitStmt || parsedStmt instanceof TransactionRollbackStmt)) {
+            context.setTxnEntry(null);
+        } else if (parsedStmt instanceof TransactionBeginStmt) {
+            if (context.isTxnModel()) {
+                LOG.info("A transaction has already begin");
+                return;
+            }
+            TTxnParams txnParams = new TTxnParams();
+            txnParams.setNeedTxn(true).setThriftRpcTimeoutMs(5000).setTxnId(-1).setDb("").setTbl("");
+            if (context.getSessionVariable().getEnableInsertStrict()) {
+                txnParams.setMaxFilterRatio(0);
+            } else {
+                txnParams.setMaxFilterRatio(1.0);
+            }
+            if (context.getTxnEntry() == null) {
+                context.setTxnEntry(new TransactionEntry());
+            }
+            TransactionEntry txnEntry = context.getTxnEntry();
+            txnEntry.setTxnConf(txnParams);
+            StringBuilder sb = new StringBuilder();
+            sb.append("{'label':'").append(context.getTxnEntry().getLabel()).append("', 'status':'")
+                    .append(TransactionStatus.PREPARE.name());
+            sb.append("', 'txnId':'").append("'").append("}"); // txnId is left empty at BEGIN
+            context.getState().setOk(0, 0, sb.toString());
+        } else if (parsedStmt instanceof TransactionCommitStmt) {
+            if (!context.isTxnModel()) {
+                LOG.info("No transaction to commit");
+                return;
+            }
+            TTxnParams txnConf = context.getTxnEntry().getTxnConf();
+            try {
+                InsertStreamTxnExecutor executor = new InsertStreamTxnExecutor(context.getTxnEntry());
+                if (context.getTxnEntry().getDataToSend().size() > 0) {
+                    // send rest data
+                    executor.sendData();
+                }
+                // commit txn
+                executor.commitTransaction();
+
+                // wait txn visible
+                TWaitingTxnStatusRequest request = new TWaitingTxnStatusRequest();
+                request.setDbId(txnConf.getDbId()).setTxnId(txnConf.getTxnId());
+                request.setLabelIsSet(false);
+                request.setTxnIdIsSet(true);
+                TWaitingTxnStatusResult statusResult = getWaitingTxnStatus(request);
+                TransactionStatus txnStatus = TransactionStatus.valueOf(statusResult.getTxnStatusId());
+                if (txnStatus == TransactionStatus.COMMITTED) {
+                    throw new AnalysisException("transaction commit successfully, BUT data will be visible later.");
+                } else if (txnStatus != TransactionStatus.VISIBLE) {
+                    String errMsg = "commit failed, rollback.";
+                    if (statusResult.getStatus().isSetErrorMsgs()
+                            && statusResult.getStatus().getErrorMsgs().size() > 0) {
+                        errMsg = String.join(". ", statusResult.getStatus().getErrorMsgs());
+                    }
+                    throw new AnalysisException(errMsg);
+                }
+                StringBuilder sb = new StringBuilder();
+                sb.append("{'label':'").append(context.getTxnEntry().getLabel()).append("', 'status':'")
+                        .append(txnStatus.name()).append("', 'txnId':'")
+                        .append(context.getTxnEntry().getTxnConf().getTxnId()).append("'").append("}");
+                context.getState().setOk(0, 0, sb.toString());
+            } catch (Exception e) {
+                throw new AnalysisException(e.getMessage(), e); // keep the cause for diagnostics
+            } finally {
+                context.setTxnEntry(null); // the entry is cleared whether or not the commit succeeded
+            }
+        } else if (parsedStmt instanceof TransactionRollbackStmt) {
+            if (!context.isTxnModel()) {
+                LOG.info("No transaction to rollback");
+                return;
+            }
+            try {
+                // abort txn
+                InsertStreamTxnExecutor executor = new InsertStreamTxnExecutor(context.getTxnEntry());
+                executor.abortTransaction();
+                StringBuilder sb = new StringBuilder();
+                sb.append("{'label':'").append(context.getTxnEntry().getLabel()).append("', 'status':'")
+                        .append(TransactionStatus.ABORTED.name()).append("', 'txnId':'")
+                        .append(context.getTxnEntry().getTxnConf().getTxnId()).append("'").append("}");
+                context.getState().setOk(0, 0, sb.toString());
+            } catch (Exception e) {
+                throw new AnalysisException(e.getMessage(), e); // keep the cause for diagnostics
+            } finally {
+                context.setTxnEntry(null); // the entry is cleared whether or not the abort succeeded
+            }
+        } else {
+            throw new TException("parsedStmt type is not TransactionStmt");
+        }
+    }
+
+    /**
+     * Buffers the VALUES rows of {@code insertStmt} into the session's TransactionEntry and returns the
+     * number of rows the statement carried; sendData() runs whenever MAX_DATA_TO_SEND_FOR_TXN rows are buffered.
+     */
+    public int executeForTxn(InsertStmt insertStmt)
+            throws UserException, TException, InterruptedException, ExecutionException, TimeoutException {
+        if (context.isTxnIniting()) { // first insert of the transaction: begin it now
+            beginTxn(insertStmt.getDb(), insertStmt.getTbl());
+        }
+        TransactionEntry txnEntry = context.getTxnEntry();
+        TTxnParams txnConf = txnEntry.getTxnConf();
+        boolean sameTarget = txnConf.getDb().equals(insertStmt.getDb())
+                && txnConf.getTbl().equals(insertStmt.getTbl());
+        if (!sameTarget) {
+            throw new TException("Only one table can be inserted in one transaction.");
+        }
+        QueryStmt source = insertStmt.getQueryStmt();
+        if (!(source instanceof SelectStmt)) {
+            throw new TException("queryStmt is not SelectStmt, insert command error");
+        }
+        SelectStmt valuesStmt = (SelectStmt) source;
+        int rowCount = 0;
+        if (valuesStmt.getValueList() != null) {
+            // value columns are the user-visible ones, hence getBaseSchema() rather than getFullSchema()
+            int expectedCols = txnEntry.getTable().getBaseSchema(false).size();
+            for (List<Expr> candidate : valuesStmt.getValueList().getRows()) {
+                if (candidate.size() != expectedCols) {
+                    throw new TException("Column count doesn't match value count");
+                }
+            }
+            for (List<Expr> exprs : valuesStmt.getValueList().getRows()) {
+                rowCount++;
+                InternalService.PDataRow encoded = getRowStringValue(exprs);
+                if (encoded != null) {
+                    List<InternalService.PDataRow> pending = txnEntry.getDataToSend();
+                    pending.add(encoded);
+                    if (pending.size() >= MAX_DATA_TO_SEND_FOR_TXN) {
+                        new InsertStreamTxnExecutor(txnEntry).sendData();
+                    }
+                }
+            }
+        }
+        txnEntry.setRowsInTransaction(txnEntry.getRowsInTransaction() + rowCount);
+        return rowCount;
+    }
+
+    private void beginTxn(String dbName, String tblName) throws UserException, TException,
+            InterruptedException, ExecutionException, TimeoutException { // begins the txn for dbName.tblName
+        TransactionEntry txnEntry = context.getTxnEntry();
+        TTxnParams txnConf = txnEntry.getTxnConf();
+        long timeoutSecond = ConnectContext.get().getSessionVariable().getQueryTimeoutS();
+        TransactionState.LoadJobSourceType sourceType = TransactionState.LoadJobSourceType.INSERT_STREAMING;
+        Database dbObj = Catalog.getCurrentCatalog().getDb(dbName);
+        if (dbObj == null) {
+            throw new TException("database is invalid for dbName: " + dbName);
+        }
+        Table tblObj = dbObj.getTable(tblName);
+        if (tblObj == null) {
+            throw new TException("table is invalid: " + tblName);
+        }
+        txnConf.setDbId(dbObj.getId()).setTbl(tblName).setDb(dbName);
+        txnEntry.setTable(tblObj);
+        txnEntry.setDb(dbObj);
+        String label = txnEntry.getLabel();
+        if (Catalog.getCurrentCatalog().isMaster()) { // master: begin via the local transaction manager
+            long txnId = Catalog.getCurrentGlobalTransactionMgr().beginTransaction(
+                    txnConf.getDbId(), Lists.newArrayList(tblObj.getId()),
+                    label, new TransactionState.TxnCoordinator(
+                            TransactionState.TxnSourceType.FE, FrontendOptions.getLocalHostAddress()),
+                    sourceType, timeoutSecond); // timeoutSecond is the session's query timeout
+            txnConf.setTxnId(txnId);
+            String authCodeUuid = Catalog.getCurrentGlobalTransactionMgr().getTransactionState(
+                    txnConf.getDbId(), txnConf.getTxnId()).getAuthCode();
+            txnConf.setAuthCodeUuid(authCodeUuid);
+        } else { // non-master: begin via MasterTxnExecutor
+            String authCodeUuid = UUID.randomUUID().toString(); // sent with empty user/passwd fields below
+            MasterTxnExecutor masterTxnExecutor = new MasterTxnExecutor(context);
+            TLoadTxnBeginRequest request = new TLoadTxnBeginRequest();
+            request.setDb(txnConf.getDb()).setTbl(txnConf.getTbl()).setAuthCodeUuid(authCodeUuid)
+                    .setCluster(dbObj.getClusterName()).setLabel(label).setUser("").setUserIp("").setPasswd("");
+            TLoadTxnBeginResult result = masterTxnExecutor.beginTxn(request);
+            txnConf.setTxnId(result.getTxnId());
+            txnConf.setAuthCodeUuid(authCodeUuid);
+        }
+        // build the TStreamLoadPutRequest for the txn id obtained above
+        TStreamLoadPutRequest request = new TStreamLoadPutRequest();
+        request.setTxnId(txnConf.getTxnId()).setDb(txnConf.getDb())
+                .setTbl(txnConf.getTbl())
+                .setFileType(TFileType.FILE_STREAM).setFormatType(TFileFormatType.FORMAT_CSV_PLAIN)
+                .setMergeType(TMergeType.APPEND).setThriftRpcTimeoutMs(5000).setLoadId(context.queryId());
+
+        // execute begin txn
+        InsertStreamTxnExecutor executor = new InsertStreamTxnExecutor(txnEntry);
+        executor.beginTransaction(request);
+    }
+
+    private static final String NULL_VALUE_FOR_LOAD = "\\N";
+
+    /**
+     * Converts one VALUES row to a PDataRow; NULL literals become NULL_VALUE_FOR_LOAD, an empty row yields null.
+     */
+    public static InternalService.PDataRow getRowStringValue(List<Expr> cols) {
+        if (cols.isEmpty()) {
+            return null;
+        }
+        InternalService.PDataRow.Builder rowBuilder = InternalService.PDataRow.newBuilder();
+        for (Expr col : cols) {
+            String cell = (col instanceof NullLiteral) ? NULL_VALUE_FOR_LOAD : col.getStringValue();
+            rowBuilder.addColBuilder().setValue(cell);
+        }
+        return rowBuilder.build();
+    }
+
     // Process a select statement.
     private void handleInsertStmt() throws Exception {
         // Every time set no send flag and clean all data in buffer
@@ -878,131 +1133,145 @@ public class StmtExecutor implements ProfileWriter {
 
         long createTime = System.currentTimeMillis();
         Throwable throwable = null;
-
-        String label = insertStmt.getLabel();
-        LOG.info("Do insert [{}] with query id: {}", label, DebugUtil.printId(context.queryId()));
-
+        long txnId = -1;
+        String label = "";
         long loadedRows = 0;
         int filteredRows = 0;
         TransactionStatus txnStatus = TransactionStatus.ABORTED;
-        try {
-            coord = new Coordinator(context, analyzer, planner);
-            coord.setQueryType(TQueryType.LOAD);
+        String errMsg = "";
+        if (context.isTxnModel()) {
+            if (insertStmt.getQueryStmt() instanceof SelectStmt) {
+                if (((SelectStmt) insertStmt.getQueryStmt()).getTableRefs().size() > 0) {
+                    throw new TException("Insert into ** select is not supported in a transaction");
+                }
+            }
+            txnStatus = TransactionStatus.PREPARE;
+            loadedRows = executeForTxn(insertStmt);
+            label = context.getTxnEntry().getLabel();
+            txnId = context.getTxnEntry().getTxnConf().getTxnId();
+        } else {
+            label = insertStmt.getLabel();
+            LOG.info("Do insert [{}] with query id: {}", label, DebugUtil.printId(context.queryId()));
 
-            QeProcessorImpl.INSTANCE.registerQuery(context.queryId(), coord);
+            try {
+                coord = new Coordinator(context, analyzer, planner);
+                coord.setQueryType(TQueryType.LOAD);
 
-            coord.exec();
+                QeProcessorImpl.INSTANCE.registerQuery(context.queryId(), coord);
 
-            coord.join(context.getSessionVariable().getQueryTimeoutS());
-            if (!coord.isDone()) {
-                coord.cancel();
-                ErrorReport.reportDdlException(ErrorCode.ERR_EXECUTE_TIMEOUT);
-            }
+                coord.exec();
 
-            if (!coord.getExecStatus().ok()) {
-                String errMsg = coord.getExecStatus().getErrorMsg();
-                LOG.warn("insert failed: {}", errMsg);
-                ErrorReport.reportDdlException(errMsg, ErrorCode.ERR_FAILED_WHEN_INSERT);
-            }
+                coord.join(context.getSessionVariable().getQueryTimeoutS());
+                if (!coord.isDone()) {
+                    coord.cancel();
+                    ErrorReport.reportDdlException(ErrorCode.ERR_EXECUTE_TIMEOUT);
+                }
 
-            LOG.debug("delta files is {}", coord.getDeltaUrls());
+                if (!coord.getExecStatus().ok()) {
+                    errMsg = coord.getExecStatus().getErrorMsg();
+                    LOG.warn("insert failed: {}", errMsg);
+                    ErrorReport.reportDdlException(errMsg, ErrorCode.ERR_FAILED_WHEN_INSERT);
+                }
 
-            if (coord.getLoadCounters().get(LoadEtlTask.DPP_NORMAL_ALL) != null) {
-                loadedRows = Long.valueOf(coord.getLoadCounters().get(LoadEtlTask.DPP_NORMAL_ALL));
-            }
-            if (coord.getLoadCounters().get(LoadEtlTask.DPP_ABNORMAL_ALL) != null) {
-                filteredRows = Integer.valueOf(coord.getLoadCounters().get(LoadEtlTask.DPP_ABNORMAL_ALL));
-            }
+                LOG.debug("delta files is {}", coord.getDeltaUrls());
 
-            // if in strict mode, insert will fail if there are filtered rows
-            if (context.getSessionVariable().getEnableInsertStrict()) {
-                if (filteredRows > 0) {
-                    context.getState().setError("Insert has filtered data in strict mode, tracking_url="
-                            + coord.getTrackingUrl());
-                    return;
+                if (coord.getLoadCounters().get(LoadEtlTask.DPP_NORMAL_ALL) != null) {
+                    loadedRows = Long.valueOf(coord.getLoadCounters().get(LoadEtlTask.DPP_NORMAL_ALL));
+                }
+                if (coord.getLoadCounters().get(LoadEtlTask.DPP_ABNORMAL_ALL) != null) {
+                    filteredRows = Integer.valueOf(coord.getLoadCounters().get(LoadEtlTask.DPP_ABNORMAL_ALL));
                 }
-            }
 
-            if (insertStmt.getTargetTable().getType() != TableType.OLAP) {
-                // no need to add load job.
-                // MySQL table is already being inserted.
-                context.getState().setOk(loadedRows, filteredRows, null);
-                return;
-            }
+                // if in strict mode, insert will fail if there are filtered rows
+                if (context.getSessionVariable().getEnableInsertStrict()) {
+                    if (filteredRows > 0) {
+                        context.getState().setError("Insert has filtered data in strict mode, tracking_url="
+                                + coord.getTrackingUrl());
+                        return;
+                    }
+                }
 
-            if (loadedRows == 0 && filteredRows == 0) {
-                // if no data, just abort txn and return ok
-                Catalog.getCurrentGlobalTransactionMgr().abortTransaction(insertStmt.getDbObj().getId(),
-                        insertStmt.getTransactionId(), TransactionCommitFailedException.NO_DATA_TO_LOAD_MSG);
-                context.getState().setOk();
-                return;
-            }
-            if (Catalog.getCurrentGlobalTransactionMgr().commitAndPublishTransaction(
-                    insertStmt.getDbObj(), Lists.newArrayList(insertStmt.getTargetTable()), insertStmt.getTransactionId(),
-                    TabletCommitInfo.fromThrift(coord.getCommitInfos()),
-                    context.getSessionVariable().getInsertVisibleTimeoutMs())) {
-                txnStatus = TransactionStatus.VISIBLE;
-                MetricRepo.COUNTER_LOAD_FINISHED.increase(1L);
-            } else {
-                txnStatus = TransactionStatus.COMMITTED;
-            }
+                if (insertStmt.getTargetTable().getType() != TableType.OLAP) {
+                    // no need to add load job.
+                    // MySQL table is already being inserted.
+                    context.getState().setOk(loadedRows, filteredRows, null);
+                    return;
+                }
 
-        } catch (Throwable t) {
-            // if any throwable being thrown during insert operation, first we should abort this txn
-            LOG.warn("handle insert stmt fail: {}", label, t);
-            try {
-                Catalog.getCurrentGlobalTransactionMgr().abortTransaction(
-                        insertStmt.getDbObj().getId(), insertStmt.getTransactionId(),
-                        t.getMessage() == null ? "unknown reason" : t.getMessage());
-            } catch (Exception abortTxnException) {
-                // just print a log if abort txn failed. This failure do not need to pass to user.
-                // user only concern abort how txn failed.
-                LOG.warn("errors when abort txn", abortTxnException);
-            }
+                if (loadedRows == 0 && filteredRows == 0) {
+                    // if no data, just abort txn and return ok
+                    Catalog.getCurrentGlobalTransactionMgr().abortTransaction(insertStmt.getDbObj().getId(),
+                            insertStmt.getTransactionId(), TransactionCommitFailedException.NO_DATA_TO_LOAD_MSG);
+                    context.getState().setOk();
+                    return;
+                }
+                if (Catalog.getCurrentGlobalTransactionMgr().commitAndPublishTransaction(
+                        insertStmt.getDbObj(), Lists.newArrayList(insertStmt.getTargetTable()), insertStmt.getTransactionId(),
+                        TabletCommitInfo.fromThrift(coord.getCommitInfos()),
+                        context.getSessionVariable().getInsertVisibleTimeoutMs())) {
+                    txnStatus = TransactionStatus.VISIBLE;
+                    MetricRepo.COUNTER_LOAD_FINISHED.increase(1L);
+                } else {
+                    txnStatus = TransactionStatus.COMMITTED;
+                }
 
-            if (!Config.using_old_load_usage_pattern) {
-                // if not using old load usage pattern, error will be returned directly to user
-                StringBuilder sb = new StringBuilder(t.getMessage());
-                if (!Strings.isNullOrEmpty(coord.getTrackingUrl())) {
-                    sb.append(". url: " + coord.getTrackingUrl());
+            } catch (Throwable t) {
+                // if any throwable being thrown during insert operation, first we should abort this txn
+                LOG.warn("handle insert stmt fail: {}", label, t);
+                try {
+                    Catalog.getCurrentGlobalTransactionMgr().abortTransaction(
+                            insertStmt.getDbObj().getId(), insertStmt.getTransactionId(),
+                            t.getMessage() == null ? "unknown reason" : t.getMessage());
+                } catch (Exception abortTxnException) {
+                    // just print a log if abort txn failed. This failure do not need to pass to user.
+                    // user only concern abort how txn failed.
+                    LOG.warn("errors when abort txn", abortTxnException);
                 }
-                context.getState().setError(sb.toString());
-                return;
-            }
 
-            /*
-             * If config 'using_old_load_usage_pattern' is true.
-             * Doris will return a label to user, and user can use this label to check load job's status,
-             * which exactly like the old insert stmt usage pattern.
-             */
-            throwable = t;
-        }
+                if (!Config.using_old_load_usage_pattern) {
+                    // if not using old load usage pattern, error will be returned directly to user
+                    StringBuilder sb = new StringBuilder(t.getMessage());
+                    if (!Strings.isNullOrEmpty(coord.getTrackingUrl())) {
+                        sb.append(". url: " + coord.getTrackingUrl());
+                    }
+                    context.getState().setError(sb.toString());
+                    return;
+                }
 
-        // Go here, which means:
-        // 1. transaction is finished successfully (COMMITTED or VISIBLE), or
-        // 2. transaction failed but Config.using_old_load_usage_pattern is true.
-        // we will record the load job info for these 2 cases
+                /*
+                 * If config 'using_old_load_usage_pattern' is true.
+                 * Doris will return a label to user, and user can use this label to check load job's status,
+                 * which exactly like the old insert stmt usage pattern.
+                 */
+                throwable = t;
+            }
 
-        String errMsg = "";
-        try {
-            context.getCatalog().getLoadManager().recordFinishedLoadJob(
-                    label,
-                    insertStmt.getDb(),
-                    insertStmt.getTargetTable().getId(),
-                    EtlJobType.INSERT,
-                    createTime,
-                    throwable == null ? "" : throwable.getMessage(),
-                    coord.getTrackingUrl());
-        } catch (MetaNotFoundException e) {
-            LOG.warn("Record info of insert load with error {}", e.getMessage(), e);
-            errMsg = "Record info of insert load with error " + e.getMessage();
+            // Go here, which means:
+            // 1. transaction is finished successfully (COMMITTED or VISIBLE), or
+            // 2. transaction failed but Config.using_old_load_usage_pattern is true.
+            // we will record the load job info for these 2 cases
+
+            try {
+                context.getCatalog().getLoadManager().recordFinishedLoadJob(
+                        label,
+                        insertStmt.getDb(),
+                        insertStmt.getTargetTable().getId(),
+                        EtlJobType.INSERT,
+                        createTime,
+                        throwable == null ? "" : throwable.getMessage(),
+                        coord.getTrackingUrl());
+            } catch (MetaNotFoundException e) {
+                LOG.warn("Record info of insert load with error {}", e.getMessage(), e);
+                errMsg = "Record info of insert load with error " + e.getMessage();
+            }
+            txnId = insertStmt.getTransactionId();
         }
 
         // {'label':'my_label1', 'status':'visible', 'txnId':'123'}
         // {'label':'my_label1', 'status':'visible', 'txnId':'123' 'err':'error messages'}
         StringBuilder sb = new StringBuilder();
         sb.append("{'label':'").append(label).append("', 'status':'").append(txnStatus.name());
-        sb.append("', 'txnId':'").append(insertStmt.getTransactionId()).append("'");
+        sb.append("', 'txnId':'").append(txnId).append("'");
         if (!Strings.isNullOrEmpty(errMsg)) {
             sb.append(", 'err':'").append(errMsg).append("'");
         }
@@ -1165,6 +1434,9 @@ public class StmtExecutor implements ProfileWriter {
         if (!statisticsForAuditLog.hasScanRows()) {
             statisticsForAuditLog.setScanRows(0L);
         }
+        if (!statisticsForAuditLog.hasReturnedRows()) { // default only when unset, like the sibling fields
+            statisticsForAuditLog.setReturnedRows(0L);
+        }
         if (!statisticsForAuditLog.hasCpuMs()) {
             statisticsForAuditLog.setCpuMs(0L);
         }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceClient.java b/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceClient.java
index ffd7e25..143b066 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceClient.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceClient.java
@@ -77,6 +77,18 @@ public class BackendServiceClient {
         return stub.getInfo(request);
     }
 
+    public Future<InternalService.PSendDataResult> sendData(InternalService.PSendDataRequest request) {
+        return stub.sendData(request); // forwards to the stub and returns its Future unchanged
+    }
+
+    public Future<InternalService.PRollbackResult> rollback(InternalService.PRollbackRequest request) {
+        return stub.rollback(request); // forwards to the stub and returns its Future unchanged
+    }
+
+    public Future<InternalService.PCommitResult> commit(InternalService.PCommitRequest request) {
+        return stub.commit(request); // forwards to the stub and returns its Future unchanged
+    }
+
     public Future<InternalService.PConstantExprResult> foldConstantExpr(InternalService.PConstantExprRequest request) {
         return stub.foldConstantExpr(request);
     }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceProxy.java b/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceProxy.java
index 40f2897..2210645 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceProxy.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceProxy.java
@@ -32,6 +32,7 @@ import org.apache.thrift.TSerializer;
 import com.google.common.collect.Maps;
 import com.google.protobuf.ByteString;
 
+import java.util.List;
 import java.util.Map;
 import java.util.concurrent.Future;
 
@@ -169,6 +170,48 @@ public class BackendServiceProxy {
         }
     }
 
+    /**
+     * Sends {@code data} for {@code fragmentInstanceId} to the backend at {@code address}; failures become RpcException.
+     */
+    public Future<InternalService.PSendDataResult> sendData(
+            TNetworkAddress address, Types.PUniqueId fragmentInstanceId, List<InternalService.PDataRow> data)
+            throws RpcException {
+        final InternalService.PSendDataRequest.Builder requestBuilder = InternalService.PSendDataRequest
+                .newBuilder().setFragmentInstanceId(fragmentInstanceId).addAllData(data);
+        try {
+            return getProxy(address).sendData(requestBuilder.build());
+        } catch (Throwable t) {
+            LOG.warn("failed to send data, address={}:{}", address.getHostname(), address.getPort(), t);
+            throw new RpcException(address.getHostname(), t.getMessage());
+        }
+    }
+
+    /** Sends a rollback request carrying {@code fragmentInstanceId} to the backend; failures become RpcException. */
+    public Future<InternalService.PRollbackResult> rollback(TNetworkAddress address, Types.PUniqueId fragmentInstanceId)
+            throws RpcException {
+        final InternalService.PRollbackRequest request = InternalService.PRollbackRequest.newBuilder()
+                .setFragmentInstanceId(fragmentInstanceId).build();
+        try {
+            return getProxy(address).rollback(request);
+        } catch (Throwable t) {
+            LOG.warn("failed to rollback, address={}:{}", address.getHostname(), address.getPort(), t);
+            throw new RpcException(address.getHostname(), t.getMessage());
+        }
+    }
+
+    /** Sends a commit request carrying {@code fragmentInstanceId} to the backend; failures become RpcException. */
+    public Future<InternalService.PCommitResult> commit(TNetworkAddress address, Types.PUniqueId fragmentInstanceId)
+            throws RpcException {
+        final InternalService.PCommitRequest request = InternalService.PCommitRequest.newBuilder()
+                .setFragmentInstanceId(fragmentInstanceId).build();
+        try {
+            return getProxy(address).commit(request);
+        } catch (Throwable t) {
+            LOG.warn("failed to commit, address={}:{}", address.getHostname(), address.getPort(), t);
+            throw new RpcException(address.getHostname(), t.getMessage());
+        }
+    }
+
     public Future<InternalService.PConstantExprResult> foldConstantExpr(
             TNetworkAddress address, TFoldConstantParams tParams) throws RpcException, TException {
         final InternalService.PConstantExprRequest pRequest = InternalService.PConstantExprRequest.newBuilder()
diff --git a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
index 1bc16e7..a25f29f 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
@@ -105,6 +105,8 @@ import org.apache.doris.thrift.TPrivilegeStatus;
 import org.apache.doris.thrift.TUniqueId;
 import org.apache.doris.thrift.TUpdateExportTaskStatusRequest;
 import org.apache.doris.thrift.TUpdateMiniEtlTaskStatusRequest;
+import org.apache.doris.thrift.TWaitingTxnStatusRequest;
+import org.apache.doris.thrift.TWaitingTxnStatusResult;
 import org.apache.doris.transaction.TabletCommitInfo;
 import org.apache.doris.transaction.TransactionState;
 import org.apache.doris.transaction.TransactionState.TxnCoordinator;
@@ -126,6 +128,7 @@ import java.net.UnknownHostException;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 
 import static org.apache.doris.thrift.TStatusCode.NOT_IMPLEMENTED_ERROR;
 
@@ -663,6 +666,23 @@ public class FrontendServiceImpl implements FrontendService.Iface {
         return result;
     }
 
+
+    /** Verifies that authCodeUuid equals the auth code stored on transaction txnId of database dbName. */
+    private void checkAuthCodeUuid(String dbName, long txnId, String authCodeUuid) throws AuthenticationException {
+        Database db = Catalog.getCurrentCatalog().getDb(dbName);
+        if (db == null) {
+            throw new AuthenticationException("invalid db name: " + dbName);
+        }
+        TransactionState txnState = Catalog.getCurrentGlobalTransactionMgr().getTransactionState(db.getId(), txnId);
+        if (txnState == null) {
+            throw new AuthenticationException("invalid transactionState: " + txnId);
+        }
+        if (authCodeUuid == null || !authCodeUuid.equals(txnState.getAuthCode())) { // null is denied, not an NPE
+            throw new AuthenticationException(
+                    "Access denied; you need (at least one of) the LOAD privilege(s) for this operation");
+        }
+    }
+
     private void checkPasswordAndPrivs(String cluster, String user, String passwd, String db, String tbl,
                                        String clientIp, PrivPredicate predicate) throws AuthenticationException {
 
@@ -718,7 +738,8 @@ public class FrontendServiceImpl implements FrontendService.Iface {
         TStatus status = new TStatus(TStatusCode.OK);
         result.setStatus(status);
         try {
-            result.setTxnId(loadTxnBeginImpl(request, clientAddr));
+            TLoadTxnBeginResult tmpRes = loadTxnBeginImpl(request, clientAddr);
+            result.setTxnId(tmpRes.getTxnId()).setDbId(tmpRes.getDbId());
         } catch (DuplicatedRequestException e) {
             // this is a duplicate request, just return previous txn id
             LOG.warn("duplicate request for stream load. request id: {}, txn: {}", e.getDuplicatedRequestId(), e.getTxnId());
@@ -740,14 +761,16 @@ public class FrontendServiceImpl implements FrontendService.Iface {
         return result;
     }
 
-    private long loadTxnBeginImpl(TLoadTxnBeginRequest request, String clientIp) throws UserException {
+    private TLoadTxnBeginResult loadTxnBeginImpl(TLoadTxnBeginRequest request, String clientIp) throws UserException {
         String cluster = request.getCluster();
         if (Strings.isNullOrEmpty(cluster)) {
             cluster = SystemInfoService.DEFAULT_CLUSTER;
         }
 
-        checkPasswordAndPrivs(cluster, request.getUser(), request.getPasswd(), request.getDb(),
-                              request.getTbl(), request.getUserIp(), PrivPredicate.LOAD);
+        if (Strings.isNullOrEmpty(request.getAuthCodeUuid())) {
+            checkPasswordAndPrivs(cluster, request.getUser(), request.getPasswd(), request.getDb(),
+                    request.getTbl(), request.getUserIp(), PrivPredicate.LOAD);
+        }
 
         // check label
         if (Strings.isNullOrEmpty(request.getLabel())) {
@@ -769,10 +792,17 @@ public class FrontendServiceImpl implements FrontendService.Iface {
         // begin
         long timeoutSecond = request.isSetTimeout() ? request.getTimeout() : Config.stream_load_default_timeout_second;
         MetricRepo.COUNTER_LOAD_ADD.increase(1L);
-        return Catalog.getCurrentGlobalTransactionMgr().beginTransaction(
+        long txnId = Catalog.getCurrentGlobalTransactionMgr().beginTransaction(
                 db.getId(), Lists.newArrayList(table.getId()), request.getLabel(), request.getRequestId(),
                 new TxnCoordinator(TxnSourceType.BE, clientIp),
                 TransactionState.LoadJobSourceType.BACKEND_STREAMING, -1, timeoutSecond);
+        if (!Strings.isNullOrEmpty(request.getAuthCodeUuid())) {
+            Catalog.getCurrentGlobalTransactionMgr().getTransactionState(db.getId(), txnId)
+                    .setAuthCode(request.getAuthCodeUuid());
+        }
+        TLoadTxnBeginResult result = new TLoadTxnBeginResult();
+        result.setTxnId(txnId).setDbId(db.getId());
+        return result;
     }
 
     @Override
@@ -811,6 +841,8 @@ public class FrontendServiceImpl implements FrontendService.Iface {
 
         if (request.isSetAuthCode()) {
             // TODO(cmy): find a way to check
+        } else if (request.isSetAuthCodeUuid()) {
+            checkAuthCodeUuid(request.getDb(), request.getTxnId(), request.getAuthCodeUuid());
         } else {
             checkPasswordAndPrivs(cluster, request.getUser(), request.getPasswd(), request.getDb(),
                     request.getTbl(), request.getUserIp(), PrivPredicate.LOAD);
@@ -819,7 +851,12 @@ public class FrontendServiceImpl implements FrontendService.Iface {
         // get database
         Catalog catalog = Catalog.getCurrentCatalog();
         String fullDbName = ClusterNamespace.getFullName(cluster, request.getDb());
-        Database db = catalog.getDb(fullDbName);
+        Database db = null;
+        if (request.isSetDbId() && request.getDbId() > 0) {
+            db = catalog.getDb(request.getDbId());
+        } else {
+            db = catalog.getDb(fullDbName);
+        }
         if (db == null) {
             String dbName = fullDbName;
             if (Strings.isNullOrEmpty(request.getCluster())) {
@@ -872,12 +909,19 @@ public class FrontendServiceImpl implements FrontendService.Iface {
 
         if (request.isSetAuthCode()) {
             // TODO(cmy): find a way to check
+        } else if (request.isSetAuthCodeUuid()) {
+            checkAuthCodeUuid(request.getDb(), request.getTxnId(), request.getAuthCodeUuid());
         } else {
             checkPasswordAndPrivs(cluster, request.getUser(), request.getPasswd(), request.getDb(),
                     request.getTbl(), request.getUserIp(), PrivPredicate.LOAD);
         }
         String dbName = ClusterNamespace.getFullName(cluster, request.getDb());
-        Database db = Catalog.getCurrentCatalog().getDb(dbName);
+        Database db = null;
+        if (request.isSetDbId() && request.getDbId() > 0) {
+            db = Catalog.getCurrentCatalog().getDb(request.getDbId());
+        } else {
+            db = Catalog.getCurrentCatalog().getDb(dbName);
+        }
         if (db == null) {
             throw new MetaNotFoundException("db " + request.getDb() + " does not exist");
         }
@@ -1003,6 +1047,24 @@ public class FrontendServiceImpl implements FrontendService.Iface {
         TNetworkAddress addr = getClientAddr();
         return addr == null ? "unknown" : addr.hostname;
     }
+
+    /**
+     * Thrift entry point that delegates to the global transaction manager's
+     * {@code getWaitingTxnStatus} and maps its outcome onto the reply status:
+     * OK on normal return, INCOMPLETE on {@link TimeoutException}, INTERNAL_ERROR on
+     * {@link AnalysisException}. In the two failure cases the exception message is added to the
+     * status error messages.
+     *
+     * @param request the request forwarded unchanged to the transaction manager
+     * @return the manager's result with status OK, or a fresh result carrying only the failure status
+     */
+    @Override
+    public TWaitingTxnStatusResult waitingTxnStatus(TWaitingTxnStatusRequest request) throws TException {
+        TStatusCode failureCode;
+        String failureMsg;
+        try {
+            TWaitingTxnStatusResult waited = Catalog.getCurrentGlobalTransactionMgr().getWaitingTxnStatus(request);
+            waited.status.setStatusCode(TStatusCode.OK);
+            return waited;
+        } catch (TimeoutException e) {
+            failureCode = TStatusCode.INCOMPLETE;
+            failureMsg = e.getMessage();
+        } catch (AnalysisException e) {
+            failureCode = TStatusCode.INTERNAL_ERROR;
+            failureMsg = e.getMessage();
+        }
+
+        // Only reached when the manager threw: reply with a result that holds just the failure status.
+        TStatus failure = new TStatus();
+        failure.setStatusCode(failureCode);
+        failure.addToErrorMsgs(failureMsg);
+        TWaitingTxnStatusResult failed = new TWaitingTxnStatusResult();
+        failed.setStatus(failure);
+        return failed;
+    }
+
 }
 
 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java b/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java
index 121ff60..23b42ee 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java
@@ -654,6 +654,20 @@ public class DatabaseTransactionMgr {
         }
     }
 
+    public Long getTransactionId(String label) {
+        readLock();
+        try {
+            Set<Long> existingTxnIds = unprotectedGetTxnIdsByLabel(label);
+            if (existingTxnIds == null || existingTxnIds.isEmpty()) {
+                return null;
+            }
+            // find the latest txn (which id is largest)
+            return existingTxnIds.stream().max(Comparator.comparingLong(Long::valueOf)).get();
+        } finally {
+            readUnlock();
+        }
+    }
+
     public List<TransactionState> getCommittedTxnList() {
         readLock();
         try {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java b/fe/fe-core/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java
index 6444b6e..83ebf07 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java
@@ -32,7 +32,10 @@ import org.apache.doris.common.io.Writable;
 import org.apache.doris.common.util.MetaLockUtils;
 import org.apache.doris.persist.BatchRemoveTransactionsOperation;
 import org.apache.doris.persist.EditLog;
+import org.apache.doris.thrift.TStatus;
 import org.apache.doris.thrift.TUniqueId;
+import org.apache.doris.thrift.TWaitingTxnStatusRequest;
+import org.apache.doris.thrift.TWaitingTxnStatusResult;
 import org.apache.doris.transaction.TransactionState.LoadJobSourceType;
 import org.apache.doris.transaction.TransactionState.TxnCoordinator;
 
@@ -52,6 +55,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 
 /**
  * Transaction Manager
@@ -159,6 +163,16 @@ public class GlobalTransactionMgr implements Writable {
 
     }
 
+    /**
+     * Resolves the transaction id for {@code label} by delegating to the transaction manager of
+     * database {@code dbId}.
+     *
+     * @param dbId  id of the database whose transaction manager is consulted
+     * @param label the label to look up
+     * @return whatever the database transaction manager returns for the label, or {@code null}
+     *         (after logging a warning) when an {@link AnalysisException} is thrown
+     */
+    public Long getTransactionId(long dbId, String label) {
+        try {
+            return getDatabaseTransactionMgr(dbId).getTransactionId(label);
+        } catch (AnalysisException e) {
+            LOG.warn("Get transaction id by label " + label + " failed", e);
+        }
+        return null;
+    }
+
     public void commitTransaction(long dbId, List<Table> tableList, long transactionId, List<TabletCommitInfo> tabletCommitInfos)
             throws UserException {
         commitTransaction(dbId, tableList, transactionId, tabletCommitInfos, null);
@@ -495,4 +509,43 @@ public class GlobalTransactionMgr implements Writable {
         DatabaseTransactionMgr dbTransactionMgr = getDatabaseTransactionMgr(dbId);
         dbTransactionMgr.updateDatabaseUsedQuotaData(usedQuotaDataBytes);
     }
+
+    /**
+     * Polls the status of one transaction until it is UNKNOWN or final.
+     * <p>
+     * The transaction is identified by {@code request.txn_id} when that field is set, otherwise by
+     * {@code request.label}. The status is checked at most {@code Config.commit_timeout_second}
+     * times with a one second sleep after each unsuccessful check. When looked up by txn id, a
+     * non-blank transaction reason is copied into the error messages of the returned status.
+     * <p>
+     * If the polling thread is interrupted, the interrupt flag is restored and waiting stops
+     * immediately (reported as a timeout) instead of continuing with the interrupt lost.
+     *
+     * @param request carries the db id plus either the txn id or the label
+     * @return a result whose {@code txn_status_id} is the value of the UNKNOWN/final status reached
+     * @throws AnalysisException if the database or (when looked up by id) the transaction does not exist
+     * @throws TimeoutException  if no UNKNOWN/final status was observed within the allowed attempts,
+     *                           or the wait was interrupted
+     */
+    public TWaitingTxnStatusResult getWaitingTxnStatus(TWaitingTxnStatusRequest request) throws AnalysisException, TimeoutException {
+        long dbId = request.getDbId();
+        int commitTimeoutSec = Config.commit_timeout_second;
+        for (int i = 0; i < commitTimeoutSec; ++i) {
+            // Re-checked on every attempt: the loop fails with "invalid db id" as soon as the lookup returns null.
+            Database db = Catalog.getCurrentCatalog().getDb(dbId);
+            if (db == null) {
+                throw new AnalysisException("invalid db id: " + dbId);
+            }
+            TWaitingTxnStatusResult statusResult = new TWaitingTxnStatusResult();
+            statusResult.status = new TStatus();
+            TransactionStatus txnStatus;
+            if (request.isSetTxnId()) {
+                long txnId = request.getTxnId();
+                TransactionState txnState = Catalog.getCurrentGlobalTransactionMgr()
+                        .getTransactionState(dbId, txnId);
+                if (txnState == null) {
+                    throw new AnalysisException("txn does not exist: " + txnId);
+                }
+                txnStatus = txnState.getTransactionStatus();
+                String reason = txnState.getReason();
+                // Guard against a missing reason instead of failing with an NPE on trim().
+                if (reason != null && !reason.trim().isEmpty()) {
+                    statusResult.status.setErrorMsgsIsSet(true);
+                    statusResult.status.addToErrorMsgs(reason);
+                }
+            } else {
+                txnStatus = getLabelState(dbId, request.getLabel());
+            }
+            if (txnStatus == TransactionStatus.UNKNOWN || txnStatus.isFinalStatus()) {
+                statusResult.setTxnStatusId(txnStatus.value());
+                return statusResult;
+            }
+            try {
+                Thread.sleep(1000L);
+            } catch (InterruptedException e) {
+                LOG.info("commit sleep exception.", e);
+                // Keep the interrupt visible to the caller and stop polling; with the flag set every
+                // further sleep would throw immediately anyway.
+                Thread.currentThread().interrupt();
+                break;
+            }
+        }
+        throw new TimeoutException("Operation is timeout");
+    }
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionEntry.java b/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionEntry.java
new file mode 100644
index 0000000..4db596d
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionEntry.java
@@ -0,0 +1,119 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.transaction;
+
+import org.apache.doris.catalog.Database;
+import org.apache.doris.catalog.Table;
+import org.apache.doris.proto.InternalService;
+import org.apache.doris.system.Backend;
+import org.apache.doris.thrift.TTxnParams;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Mutable holder that groups the objects and counters kept for one transaction: a label, the
+ * target database and table, a backend, the thrift transaction parameters, a list of
+ * {@code PDataRow} and a row counter.
+ * <p>
+ * The class performs no validation and no synchronization of its own; all fields are plain
+ * getters/setters. The three {@code isTxn*} predicates are derived from {@link TTxnParams}.
+ */
+public class TransactionEntry {
+
+    // Defaults: empty label, empty row list, zero rows; everything else starts as null.
+    private String label = "";
+    private long rowsInTransaction = 0;
+    private List<InternalService.PDataRow> dataToSend = new ArrayList<>();
+
+    private TTxnParams txnConf;
+    private Database db;
+    private Table table;
+    private Backend backend;
+
+    /** Creates an entry with no transaction parameters, database or table. */
+    public TransactionEntry() {
+        this(null, null, null);
+    }
+
+    /**
+     * Creates an entry bound to the given transaction parameters, database and table.
+     *
+     * @param txnConf transaction parameters, may be null
+     * @param db      database, may be null
+     * @param table   table, may be null
+     */
+    public TransactionEntry(TTxnParams txnConf, Database db, Table table) {
+        this.txnConf = txnConf;
+        this.db = db;
+        this.table = table;
+    }
+
+    // ---- derived transaction state ----
+
+    /** @return true when transaction parameters are present and their need_txn flag is true */
+    public boolean isTxnModel() {
+        if (txnConf == null) {
+            return false;
+        }
+        return txnConf.isNeedTxn();
+    }
+
+    /** @return true when in txn model and the txn id in the parameters is still -1 */
+    public boolean isTxnIniting() {
+        if (!isTxnModel()) {
+            return false;
+        }
+        return txnConf.getTxnId() == -1;
+    }
+
+    /** @return true when in txn model and the txn id in the parameters is anything other than -1 */
+    public boolean isTxnBegin() {
+        if (!isTxnModel()) {
+            return false;
+        }
+        return txnConf.getTxnId() != -1;
+    }
+
+    // ---- plain accessors ----
+
+    public String getLabel() {
+        return this.label;
+    }
+
+    public void setLabel(String label) {
+        this.label = label;
+    }
+
+    public Database getDb() {
+        return this.db;
+    }
+
+    public void setDb(Database db) {
+        this.db = db;
+    }
+
+    public Table getTable() {
+        return this.table;
+    }
+
+    public void setTable(Table table) {
+        this.table = table;
+    }
+
+    public Backend getBackend() {
+        return this.backend;
+    }
+
+    public void setBackend(Backend backend) {
+        this.backend = backend;
+    }
+
+    public TTxnParams getTxnConf() {
+        return this.txnConf;
+    }
+
+    public void setTxnConf(TTxnParams txnConf) {
+        this.txnConf = txnConf;
+    }
+
+    /** @return the internal row list itself (not a copy) */
+    public List<InternalService.PDataRow> getDataToSend() {
+        return this.dataToSend;
+    }
+
+    /** Replaces the internal row list with the given list (stored as-is, not copied). */
+    public void setDataToSend(List<InternalService.PDataRow> dataToSend) {
+        this.dataToSend = dataToSend;
+    }
+
+    /** Removes all rows from the current row list. */
+    public void clearDataToSend() {
+        this.dataToSend.clear();
+    }
+
+    public long getRowsInTransaction() {
+        return this.rowsInTransaction;
+    }
+
+    public void setRowsInTransaction(long rowsInTransaction) {
+        this.rowsInTransaction = rowsInTransaction;
+    }
+}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionState.java b/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionState.java
index 23c8053..28a77e3 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionState.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionState.java
@@ -45,6 +45,7 @@ import java.util.Comparator;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.UUID;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 
@@ -205,6 +206,7 @@ public class TransactionState implements Writable {
     // 3. in afterStateTransform(), callback object can not be found, so the write lock can not be released.
     private TxnStateChangeCallback callback = null;
     private long timeoutMs = Config.stream_load_default_timeout_second;
+    private String authCode = "";
 
     // is set to true, we will double the publish timeout
     private boolean prolongPublishTimeout = false;
@@ -241,6 +243,7 @@ public class TransactionState implements Writable {
         this.publishVersionTasks = Maps.newHashMap();
         this.hasSendTask = false;
         this.latch = new CountDownLatch(1);
+        this.authCode = UUID.randomUUID().toString();
     }
     
     public TransactionState(long dbId, List<Long> tableIdList, long transactionId, String label, TUniqueId requestId,
@@ -264,8 +267,16 @@ public class TransactionState implements Writable {
         this.latch = new CountDownLatch(1);
         this.callbackId = callbackId;
         this.timeoutMs = timeoutMs;
+        this.authCode = UUID.randomUUID().toString();
     }
-    
+
+    /**
+     * Replaces the auth code of this transaction state. The constructors initialise the code to a
+     * random UUID string; this setter overwrites it with the given value, without validation.
+     *
+     * @param authCode the new auth code
+     */
+    public void setAuthCode(String authCode) {
+        this.authCode = authCode;
+    }
+
+    /**
+     * @return the auth code currently stored on this transaction state
+     */
+    public String getAuthCode() {
+        return authCode;
+    }
+
     public void setErrorReplicas(Set<Long> newErrorReplicas) {
         this.errorReplicas = newErrorReplicas;
     }
diff --git a/fe/fe-core/src/test/java/org/apache/doris/qe/StmtExecutorTest.java b/fe/fe-core/src/test/java/org/apache/doris/qe/StmtExecutorTest.java
index 5d7ddb5..506b08c 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/qe/StmtExecutorTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/qe/StmtExecutorTest.java
@@ -67,6 +67,7 @@ public class StmtExecutorTest {
     private ConnectContext ctx;
     private QueryState state;
     private ConnectScheduler scheduler;
+    private MysqlChannel channel = null;
 
     @Mocked
     SocketChannel socketChannel;
@@ -92,7 +93,7 @@ public class StmtExecutorTest {
         MysqlSerializer serializer = MysqlSerializer.newInstance();
         Catalog catalog = AccessTestUtil.fetchAdminCatalog();
 
-        MysqlChannel channel = new MysqlChannel(socketChannel);
+        channel = new MysqlChannel(socketChannel);
         new Expectations(channel) {
             {
                 channel.sendOnePacket((ByteBuffer) any);
diff --git a/gensrc/proto/internal_service.proto b/gensrc/proto/internal_service.proto
index d900073..4ebb671 100644
--- a/gensrc/proto/internal_service.proto
+++ b/gensrc/proto/internal_service.proto
@@ -256,6 +256,39 @@ message PProxyResult {
 	optional PKafkaPartitionOffsets partition_offsets = 3;
 };
 
+// One column value of a row, carried as an optional string.
+message PDataColumn {
+    optional string value = 1;
+}
+
+// One row: a list of PDataColumn entries.
+message PDataRow {
+    repeated PDataColumn col = 1;
+}
+
+// Request of the PBackendService.send_data rpc: a batch of rows addressed to one fragment instance.
+message PSendDataRequest {
+    required PUniqueId fragment_instance_id = 1;
+    repeated PDataRow data = 2;
+}
+
+// Response of the PBackendService.send_data rpc; carries only a status.
+message PSendDataResult {
+    required PStatus status = 1;
+}
+
+// Request of the PBackendService.commit rpc, addressed to one fragment instance.
+message PCommitRequest {
+    required PUniqueId fragment_instance_id = 1;
+}
+
+// Response of the PBackendService.commit rpc; carries only a status.
+message PCommitResult {
+    required PStatus status = 1;
+}
+
+// Request of the PBackendService.rollback rpc, addressed to one fragment instance.
+message PRollbackRequest {
+    required PUniqueId fragment_instance_id = 1;
+}
+
+// Response of the PBackendService.rollback rpc; carries only a status.
+message PRollbackResult {
+    required PStatus status = 1;
+}
+
 message PBloomFilter {
      required bool always_true = 2;
      required int32 filter_length = 1;
@@ -358,6 +391,9 @@ service PBackendService {
     rpc update_cache(PUpdateCacheRequest) returns (PCacheResponse);
     rpc fetch_cache(PFetchCacheRequest) returns (PFetchCacheResult);
     rpc clear_cache(PClearCacheRequest) returns (PCacheResponse);
+    rpc send_data(PSendDataRequest) returns (PSendDataResult);
+    rpc commit(PCommitRequest) returns (PCommitResult);
+    rpc rollback(PRollbackRequest) returns (PRollbackResult);
     rpc merge_filter(PMergeFilterRequest) returns (PMergeFilterResponse);
     rpc apply_filter(PPublishFilterRequest) returns (PPublishFilterResponse);
     rpc fold_constant_expr(PConstantExprRequest) returns (PConstantExprResult);
diff --git a/gensrc/thrift/FrontendService.thrift b/gensrc/thrift/FrontendService.thrift
index 0f8b420..78e4ea3 100644
--- a/gensrc/thrift/FrontendService.thrift
+++ b/gensrc/thrift/FrontendService.thrift
@@ -503,6 +503,7 @@ struct TMiniLoadBeginRequest {
     11: optional i64 auth_code
     12: optional i64 create_timestamp
     13: optional Types.TUniqueId request_id
+    14: optional string auth_code_uuid
 }
 
 struct TIsMethodSupportedRequest {
@@ -533,12 +534,14 @@ struct TLoadTxnBeginRequest {
     // The real value of timeout should be i32. i64 ensures the compatibility of interface.
     10: optional i64 timeout
     11: optional Types.TUniqueId request_id
+    12: optional string auth_code_uuid
 }
 
 struct TLoadTxnBeginResult {
     1: required Status.TStatus status
     2: optional i64 txnId
     3: optional string job_status // if label already used, set status of existing job
+    4: optional i64 db_id
 }
 
 // StreamLoad request, used to load a streaming to engine
@@ -587,6 +590,7 @@ struct TStreamLoadPutRequest {
     31: optional bool fuzzy_parse
     32: optional string line_delimiter
     33: optional bool read_json_by_line
+    34: optional string auth_code_uuid
 }
 
 struct TStreamLoadPutResult {
@@ -638,6 +642,8 @@ struct TLoadTxnCommitRequest {
     10: optional i64 auth_code
     11: optional TTxnCommitAttachment txnCommitAttachment
     12: optional i64 thrift_rpc_timeout_ms
+    13: optional string auth_code_uuid
+    14: optional i64 db_id
 }
 
 struct TLoadTxnCommitResult {
@@ -655,6 +661,8 @@ struct TLoadTxnRollbackRequest {
     8: optional string reason
     9: optional i64 auth_code
     10: optional TTxnCommitAttachment txnCommitAttachment
+    11: optional string auth_code_uuid
+    12: optional i64 db_id
 }
 
 struct TLoadTxnRollbackResult {
@@ -695,6 +703,17 @@ struct TPropertyVal {
     4: optional bool boolVal
 }
 
+// Request of FrontendService.waitingTxnStatus.
+// The FE implementation in this change looks the transaction up by txn_id when that field is set,
+// and by label otherwise; db_id selects the database in both cases.
+struct TWaitingTxnStatusRequest {
+    1: optional i64 db_id
+    2: optional i64 txn_id
+    3: optional string label
+}
+
+// Result of FrontendService.waitingTxnStatus.
+// status: the FE sets OK on success, INCOMPLETE when waiting timed out and INTERNAL_ERROR on an
+//         analysis error.
+// txn_status_id: the FE fills this with the numeric value of the transaction status it observed.
+struct TWaitingTxnStatusResult {
+    1: optional Status.TStatus status
+    2: optional i32 txn_status_id
+}
+
 service FrontendService {
     TGetDbsResult getDbNames(1:TGetDbsParams params)
     TGetTablesResult getTableNames(1:TGetTablesParams params)
@@ -727,6 +746,8 @@ service FrontendService {
     TLoadTxnCommitResult loadTxnCommit(1: TLoadTxnCommitRequest request)
     TLoadTxnRollbackResult loadTxnRollback(1: TLoadTxnRollbackRequest request)
 
+    TWaitingTxnStatusResult waitingTxnStatus(1: TWaitingTxnStatusRequest request)
+
     TStreamLoadPutResult streamLoadPut(1: TStreamLoadPutRequest request)
 
     Status.TStatus snapshotLoaderReport(1: TSnapshotLoaderReportRequest request)
diff --git a/gensrc/thrift/PaloInternalService.thrift b/gensrc/thrift/PaloInternalService.thrift
index ae64ef5..434c661 100644
--- a/gensrc/thrift/PaloInternalService.thrift
+++ b/gensrc/thrift/PaloInternalService.thrift
@@ -244,6 +244,18 @@ enum PaloInternalServiceVersion {
   V1
 }
 
+// Transaction parameters; carried as the optional txn_conf field of TExecPlanFragmentParams.
+// The FE's TransactionEntry treats need_txn == true as "transaction model" and a txn_id of -1 as
+// "transaction not begun yet".
+struct TTxnParams {
+  1: optional bool need_txn
+  2: optional string auth_code_uuid
+  3: optional i64 thrift_rpc_timeout_ms
+  4: optional string db
+  5: optional string tbl
+  6: optional string user_ip
+  7: optional i64 txn_id
+  8: optional Types.TUniqueId fragment_instance_id
+  9: optional i64 db_id
+  10: optional double max_filter_ratio
+}
 
 // ExecPlanFragment
 
@@ -299,6 +311,7 @@ struct TExecPlanFragmentParams {
   // If true, all @Common components is unset and should be got from BE's cache
   // If this field is unset or it set to false, all @Common components is set.
   16: optional bool is_simplified_param
+  17: optional TTxnParams txn_conf
 }
 
 struct TExecPlanFragmentResult {
diff --git a/gensrc/thrift/PlanNodes.thrift b/gensrc/thrift/PlanNodes.thrift
index 8d968dc..029f5c4 100644
--- a/gensrc/thrift/PlanNodes.thrift
+++ b/gensrc/thrift/PlanNodes.thrift
@@ -106,6 +106,7 @@ enum TFileFormatType {
     FORMAT_CSV_DEFLATE,
     FORMAT_ORC,
     FORMAT_JSON,
+    FORMAT_PROTO,
 }
 
 struct THdfsConf {

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org