Posted to commits@doris.apache.org by GitBox <gi...@apache.org> on 2021/12/23 12:42:40 UTC

[GitHub] [incubator-doris] weizuo93 opened a new pull request #7473: [Feature][Transaction] Support two phase batch commit for stream load

weizuo93 opened a new pull request #7473:
URL: https://github.com/apache/incubator-doris/pull/7473


   ## Proposed changes
   
   Two-phase batch commit means the following:
   During Stream load, a response is returned to the client once the data has been written. At this point the data is invisible and the transaction status is PRECOMMITTED. The data becomes visible only after the client triggers COMMIT.
       
   1. The user can invoke the following interface to commit a batch of transactions:
   
   ```
   curl --location-trusted -u user:passwd -H "txn:txnId1,txnId2,txnId3,..." http://fe_host:http_port/api/{db}/{table}/_stream_load_commit
   ```
   The batch commit operation succeeds only when every transaction commits successfully. If at least one transaction fails to commit, the batch commit operation fails.
       
   2. The user can invoke the following interface to abort a batch of transactions:
   
   ```
   curl --location-trusted -u user:passwd -H "txn:txnId1,txnId2,txnId3,..." http://fe_host:http_port/api/{db}/{table}/_stream_load_abort
   ```
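
The all-or-nothing rule for a batch commit can be sketched as follows. This is only an illustration of the semantics described above, not the code in this pull request: the class `BatchCommitSketch`, its methods, and the `commitOne` callback are hypothetical names, and whether the real implementation keeps trying the remaining transactions after one failure is an assumption.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.LongPredicate;

// Hypothetical sketch of the batch semantics; not part of Doris.
final class BatchCommitSketch {

    // Parses a header value such as "1,2,3" into transaction ids.
    // Throws IllegalArgumentException (or its subclass NumberFormatException)
    // when an entry is empty or not a number.
    static List<Long> parseTxnHeader(String headerValue) {
        List<Long> ids = new ArrayList<>();
        for (String part : headerValue.split(",")) {
            String trimmed = part.trim();
            if (trimmed.isEmpty()) {
                throw new IllegalArgumentException("empty transaction id in header");
            }
            ids.add(Long.parseLong(trimmed));
        }
        return ids;
    }

    // Returns true only if every single commit succeeds.
    // Assumption: remaining transactions are still attempted after a failure.
    static boolean commitBatch(List<Long> txnIds, LongPredicate commitOne) {
        boolean allSucceeded = true;
        for (long txnId : txnIds) {
            if (!commitOne.test(txnId)) {
                allSucceeded = false;
            }
        }
        return allSucceeded;
    }
}
```

A caller would pass the value of the `txn` header to `parseTxnHeader` and supply its own per-transaction commit logic as `commitOne`.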
   
   
   ## Types of changes
   
   What types of changes does your code introduce to Doris?
   _Put an `x` in the boxes that apply_
   
   - [ ] Bugfix (non-breaking change which fixes an issue)
   - [x] New feature (non-breaking change which adds functionality)
   - [ ] Breaking change (fix or feature that would cause existing functionality to not work as expected)
   - [ ] Documentation Update (if none of the other choices apply)
   - [ ] Code refactor (Modify the code structure, format the code, etc...)
   - [ ] Optimization. Including functional usability improvements and performance improvements.
   - [ ] Dependency. Such as changes related to third-party components.
   - [ ] Other.
   
   ## Checklist
   
   _Put an `x` in the boxes that apply. You can also fill these out after creating the PR. If you're unsure about any of them, don't hesitate to ask. We're here to help! This is simply a reminder of what we are going to look for before merging your code._
   
   - [x] I have created an issue on (Fix #7141 ) and described the bug/feature there in detail
   - [ ] Compiling and unit tests pass locally with my changes
   - [ ] I have added tests that prove my fix is effective or that my feature works
   - [ ] If these changes need document changes, I have updated the document
   - [ ] Any dependent changes have been merged
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org


[GitHub] [incubator-doris] morningman commented on a change in pull request #7473: [Feature][Transaction] Support two phase commit (2PC) for stream load

Posted by GitBox <gi...@apache.org>.
morningman commented on a change in pull request #7473:
URL: https://github.com/apache/incubator-doris/pull/7473#discussion_r805164341



##########
File path: be/src/http/action/stream_load_2pc.cpp
##########
@@ -0,0 +1,96 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "http/action/stream_load_2pc.h"
+
+#include <rapidjson/prettywriter.h>
+#include <rapidjson/stringbuffer.h>
+
+#include "common/status.h"
+#include "http/http_channel.h"
+#include "http/http_headers.h"
+#include "http/http_request.h"
+#include "http/http_status.h"
+#include "http/utils.h"
+#include "runtime/stream_load/stream_load_context.h"
+#include "runtime/stream_load/stream_load_executor.h"
+#include "util/json_util.h"
+
+namespace doris {
+
+const static std::string HEADER_JSON = "application/json";
+
+StreamLoad2PCAction::StreamLoad2PCAction(ExecEnv* exec_env) : _exec_env(exec_env) {}
+
+void StreamLoad2PCAction::handle(HttpRequest* req) {
+    Status status = Status::OK();
+    std::string status_result;
+
+    if (config::disable_stream_load_2pc) {
+        status = Status::InternalError("Two phase commit (2PC) for stream load was disabled");
+        status_result = to_json(status);
+        HttpChannel::send_reply(req, HttpStatus::OK, status_result);
+        return;
+    }
+
+    StreamLoadContext* ctx = new StreamLoadContext(_exec_env);
+    ctx->ref();
+    req->set_handler_ctx(ctx);
+    ctx->db = req->param(HTTP_DB_KEY);
+    std::string req_txn_id = req->header(HTTP_TXN_ID_KEY);
+    try {
+        ctx->txn_id = std::stoull(req_txn_id);
+    } catch (const std::exception& e) {
+        status = Status::InternalError("convert txn_id [" + req_txn_id + "] failed");
+        status_result = to_json(status);
+        HttpChannel::send_reply(req, HttpStatus::OK, status_result);
+        return;
+    }
+    ctx->txn_operation = req->header(HTTP_TXN_OPERATION_KEY);

Review comment:
       Should we check for invalid parameters here?
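
One possible shape for such a check is sketched below in Java, purely as an illustration; the handler under review is C++ and this is not its implementation. The class `TwoPcParamCheckSketch` is hypothetical, and the accepted operation values `commit` and `abort` are an assumption taken from the two endpoints named in the pull request description. Note that `std::stoull` stops at the first non-digit character, so a value such as `12abc` would be accepted as 12 by the code above, whereas the sketch rejects it.

```java
import java.util.Optional;

// Hypothetical validation sketch; names and accepted values are assumptions.
final class TwoPcParamCheckSketch {

    // Returns an error message when a parameter is invalid, or empty when both are acceptable.
    static Optional<String> validate(String txnIdHeader, String operationHeader) {
        if (txnIdHeader == null || txnIdHeader.trim().isEmpty()) {
            return Optional.of("missing txn_id");
        }
        try {
            // Long.parseLong rejects trailing garbage such as "12abc".
            long txnId = Long.parseLong(txnIdHeader.trim());
            if (txnId < 0) {
                return Optional.of("negative txn_id [" + txnIdHeader + "]");
            }
        } catch (NumberFormatException e) {
            return Optional.of("convert txn_id [" + txnIdHeader + "] failed");
        }
        // Assumption: only these two operations are meaningful for 2PC.
        boolean knownOperation = "commit".equalsIgnoreCase(operationHeader)
                || "abort".equalsIgnoreCase(operationHeader);
        if (!knownOperation) {
            return Optional.of("invalid txn_operation [" + operationHeader + "]");
        }
        return Optional.empty();
    }
}
```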

##########
File path: gensrc/thrift/FrontendService.thrift
##########
@@ -656,6 +656,23 @@ struct TLoadTxnCommitResult {
     1: required Status.TStatus status
 }
 
+struct TLoadTxn2PCRequest {
+    1: optional string cluster
+    2: required string user

Review comment:
       use `optional` for all fields

##########
File path: be/src/common/config.h
##########
@@ -353,6 +353,7 @@ CONF_mInt32(stream_load_record_batch_size, "50");
 CONF_Int32(stream_load_record_expire_time_secs, "28800");
 // time interval to clean expired stream load records
 CONF_mInt64(clean_stream_load_record_interval_secs, "1800");
+CONF_mBool(disable_stream_load_2pc, "false");

Review comment:
       I think we can set the default value to true, and set it to false after we complete the Flink connector and everything is OK.






[GitHub] [incubator-doris] github-actions[bot] commented on pull request #7473: [Feature][Transaction] Support two phase commit (2PC) for stream load

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on pull request #7473:
URL: https://github.com/apache/incubator-doris/pull/7473#issuecomment-1040343251








[GitHub] [incubator-doris] morningman merged pull request #7473: [Feature][Transaction] Support two phase commit (2PC) for stream load

Posted by GitBox <gi...@apache.org>.
morningman merged pull request #7473:
URL: https://github.com/apache/incubator-doris/pull/7473


   




[GitHub] [incubator-doris] morningman commented on pull request #7473: [Feature][Transaction] Support two phase batch commit for stream load

Posted by GitBox <gi...@apache.org>.
morningman commented on pull request #7473:
URL: https://github.com/apache/incubator-doris/pull/7473#issuecomment-1000406768


   Thanks for your contribution. Please rebase onto master first so that the tests pass.





[GitHub] [incubator-doris] weizuo93 commented on a change in pull request #7473: [Feature][Transaction] Support two phase commit (2PC) for stream load

Posted by GitBox <gi...@apache.org>.
weizuo93 commented on a change in pull request #7473:
URL: https://github.com/apache/incubator-doris/pull/7473#discussion_r805734082



##########
File path: be/src/common/config.h
##########
@@ -353,6 +353,7 @@ CONF_mInt32(stream_load_record_batch_size, "50");
 CONF_Int32(stream_load_record_expire_time_secs, "28800");
 // time interval to clean expired stream load records
 CONF_mInt64(clean_stream_load_record_interval_secs, "1800");
+CONF_mBool(disable_stream_load_2pc, "false");

Review comment:
       > I think we can set the default value to true, and set it to false after we complete the Flink connector and everything is OK.
   
   OK.






[GitHub] [incubator-doris] morningman commented on a change in pull request #7473: [Feature][Transaction] Support two phase batch commit for stream load

Posted by GitBox <gi...@apache.org>.
morningman commented on a change in pull request #7473:
URL: https://github.com/apache/incubator-doris/pull/7473#discussion_r799920692



##########
File path: docs/zh-CN/administrator-guide/load-data/stream-load-manual.md
##########
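@@ -179,6 +179,22 @@ Since Stream load uses the HTTP protocol, all import-task-related
     3. For an imported column whose type has a range restriction, if the raw data passes type conversion but fails the range restriction, strict mode does not affect it either. For example, if the type is decimal(1,0) and the raw value is 10, the value passes type conversion but falls outside the range declared for the column. Strict mode has no effect on such data.
 + merge\_type
     The merge type of the data. Three types are supported: APPEND, DELETE, and MERGE. APPEND is the default and means that this whole batch is appended to the existing data. DELETE means deleting all rows whose keys match this batch. MERGE must be used together with a delete condition: rows that satisfy the delete condition are handled with DELETE semantics and the rest with APPEND semantics.
+    
++ two\_phase\_commit
+
+    Stream load can enable the two-phase batch transaction commit mode by declaring ```two_phase_commit=true``` in the HEADER. Two-phase batch transaction commit is disabled by default.
+    Two-phase batch transaction commit mode means that, during Stream load, a response is returned to the user as soon as the data has been written. At that point the data is not visible and the transaction status is PRECOMMITTED. The data becomes visible only after the user manually triggers a commit operation.
+    
+    1. The user can call the following interface to trigger a batch commit for multiple stream load transactions: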
+    ```
+    curl --location-trusted -u user:passwd -H "txn:txnId1,txnId2,txnId3,..." http://fe_host:http_port/api/{db}/{table}/_stream_load_commit

Review comment:
       No need for `{table}` here.

##########
File path: fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java
##########
@@ -342,6 +339,344 @@ public void updateDatabaseUsedQuotaData(long usedQuotaDataBytes) {
         this.usedQuotaDataBytes = usedQuotaDataBytes;
     }
 
+    public void preCommitTransaction(List<Table> tableList, long transactionId, List<TabletCommitInfo> tabletCommitInfos,
+                                  TxnCommitAttachment txnCommitAttachment)
+            throws UserException {
+        // 1. check status
+        // the caller method already own db lock, we do not obtain db lock here
+        Database db = catalog.getDbOrMetaException(dbId);
+        TransactionState transactionState;
+        readLock();
+        try {
+            transactionState = unprotectedGetTransactionState(transactionId);
+        } finally {
+            readUnlock();
+        }
+        if (transactionState == null
+                || transactionState.getTransactionStatus() == TransactionStatus.ABORTED) {
+            throw new TransactionCommitFailedException(
+                    transactionState == null ? "transaction not found" : transactionState.getReason());
+        }
+
+        if (transactionState.getTransactionStatus() == TransactionStatus.VISIBLE) {
+            LOG.debug("transaction is already visible: {}", transactionId);
+            throw new TransactionCommitFailedException("transaction is already visible");
+        }
+
+        if (transactionState.getTransactionStatus() == TransactionStatus.COMMITTED) {
+            LOG.debug("transaction is already committed: {}", transactionId);
+            throw new TransactionCommitFailedException("transaction is already committed");
+        }
+
+        if (transactionState.getTransactionStatus() == TransactionStatus.PRECOMMITTED) {
+            LOG.debug("transaction is already pre-committed: {}", transactionId);
+            return;
+        }
+
+        if (tabletCommitInfos == null || tabletCommitInfos.isEmpty()) {
+            throw new TransactionCommitFailedException(TransactionCommitFailedException.NO_DATA_TO_LOAD_MSG);
+        }
+
+        // update transaction state extra if exists
+        if (txnCommitAttachment != null) {
+            transactionState.setTxnCommitAttachment(txnCommitAttachment);
+        }
+
+        TabletInvertedIndex tabletInvertedIndex = catalog.getTabletInvertedIndex();
+        Map<Long, Set<Long>> tabletToBackends = new HashMap<>();
+        Map<Long, Set<Long>> tableToPartition = new HashMap<>();
+        Map<Long, Table> idToTable = new HashMap<>();
+        for (int i = 0; i < tableList.size(); i++) {
+            idToTable.put(tableList.get(i).getId(), tableList.get(i));
+        }
+        // 2. validate potential exists problem: db->table->partition
+        // guarantee exist exception during a transaction
+        // if index is dropped, it does not matter.
+        // if table or partition is dropped during load, just ignore that tablet,
+        // because we should allow dropping rollup or partition during load
+        List<Long> tabletIds = tabletCommitInfos.stream().map(
+                tabletCommitInfo -> tabletCommitInfo.getTabletId()).collect(Collectors.toList());
+        List<TabletMeta> tabletMetaList = tabletInvertedIndex.getTabletMetaList(tabletIds);
+        for (int i = 0; i < tabletMetaList.size(); i++) {
+            TabletMeta tabletMeta = tabletMetaList.get(i);
+            if (tabletMeta == TabletInvertedIndex.NOT_EXIST_TABLET_META) {
+                continue;
+            }
+            long tabletId = tabletIds.get(i);
+            long tableId = tabletMeta.getTableId();
+            OlapTable tbl = (OlapTable) idToTable.get(tableId);
+            if (tbl == null) {
+                // this can happen when tableId == -1 (tablet being dropping)
+                // or table really not exist.
+                continue;
+            }
+
+            if (tbl.getState() == OlapTable.OlapTableState.RESTORE) {
+                throw new LoadException("Table " + tbl.getName() + " is in restore process. "
+                        + "Can not load into it");
+            }
+
+            long partitionId = tabletMeta.getPartitionId();
+            if (tbl.getPartition(partitionId) == null) {
+                // this can happen when partitionId == -1 (tablet being dropping)
+                // or partition really not exist.
+                continue;
+            }
+
+            if (!tableToPartition.containsKey(tableId)) {
+                tableToPartition.put(tableId, new HashSet<>());
+            }
+            tableToPartition.get(tableId).add(partitionId);
+            if (!tabletToBackends.containsKey(tabletId)) {
+                tabletToBackends.put(tabletId, new HashSet<>());
+            }
+            tabletToBackends.get(tabletId).add(tabletCommitInfos.get(i).getBackendId());
+        }
+
+        if (tableToPartition.isEmpty()) {
+            // table or all partitions are being dropped
+            throw new TransactionCommitFailedException(TransactionCommitFailedException.NO_DATA_TO_LOAD_MSG);
+        }
+
+        Set<Long> errorReplicaIds = Sets.newHashSet();
+        Set<Long> totalInvolvedBackends = Sets.newHashSet();
+        for (long tableId : tableToPartition.keySet()) {
+            OlapTable table = (OlapTable) db.getTableOrMetaException(tableId);
+            for (Partition partition : table.getAllPartitions()) {
+                if (!tableToPartition.get(tableId).contains(partition.getId())) {
+                    continue;
+                }
+
+                List<MaterializedIndex> allIndices;
+                if (transactionState.getLoadedTblIndexes().isEmpty()) {
+                    allIndices = partition.getMaterializedIndices(MaterializedIndex.IndexExtState.ALL);
+                } else {
+                    allIndices = Lists.newArrayList();
+                    for (long indexId : transactionState.getLoadedTblIndexes().get(tableId)) {
+                        MaterializedIndex index = partition.getIndex(indexId);
+                        if (index != null) {
+                            allIndices.add(index);
+                        }
+                    }
+                }
+
+                int quorumReplicaNum = table.getPartitionInfo().getReplicaAllocation(partition.getId()).getTotalReplicaNum() / 2 + 1;
+                for (MaterializedIndex index : allIndices) {
+                    for (Tablet tablet : index.getTablets()) {
+                        int successReplicaNum = 0;
+                        long tabletId = tablet.getId();
+                        Set<Long> tabletBackends = tablet.getBackendIds();
+                        totalInvolvedBackends.addAll(tabletBackends);
+                        Set<Long> commitBackends = tabletToBackends.get(tabletId);
+                        // save the error replica ids for current tablet
+                        // this param is used for log
+                        Set<Long> errorBackendIdsForTablet = Sets.newHashSet();
+                        for (long tabletBackend : tabletBackends) {
+                            Replica replica = tabletInvertedIndex.getReplica(tabletId, tabletBackend);
+                            if (replica == null) {
+                                throw new TransactionCommitFailedException("could not find replica for tablet ["
+                                        + tabletId + "], backend [" + tabletBackend + "]");
+                            }
+                            // if the tablet have no replica's to commit or the tablet is a rolling up tablet, the commit backends maybe null
+                            // if the commit backends is null, set all replicas as error replicas
+                            if (commitBackends != null && commitBackends.contains(tabletBackend)) {
+                                // if the backend load success but the backend has some errors previously, then it is not a normal replica
+                                // ignore it but not log it
+                                // for example, a replica is in clone state
+                                if (replica.getLastFailedVersion() < 0) {
+                                    ++successReplicaNum;
+                                }
+                            } else {
+                                errorBackendIdsForTablet.add(tabletBackend);
+                                errorReplicaIds.add(replica.getId());
+                                // not remove rollup task here, because the commit maybe failed
+                                // remove rollup task when commit successfully
+                            }
+                        }
+
+                        if (successReplicaNum < quorumReplicaNum) {
+                            LOG.warn("Failed to pre-commit txn [{}]. "
+                                            + "Tablet [{}] success replica num is {} < quorum replica num {} "
+                                            + "while error backends {}",
+                                    transactionId, tablet.getId(), successReplicaNum, quorumReplicaNum,
+                                    Joiner.on(",").join(errorBackendIdsForTablet));
+                            throw new TabletQuorumFailedException(transactionId, tablet.getId(),
+                                    successReplicaNum, quorumReplicaNum,
+                                    errorBackendIdsForTablet);
+                        }
+                    }
+                }
+            }
+        }
+
+        unprotectedPreCommitTransaction(transactionState, errorReplicaIds, tableToPartition, totalInvolvedBackends, db);
+        LOG.info("transaction:[{}] successfully pre-committed", transactionState);
+    }
+
+    public TransactionState checkPreCommitStatus(long transactionId) throws UserException {

Review comment:
       ```suggestion
       private TransactionState checkPreCommitStatus(long transactionId) throws UserException {
   ```
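
For readers following the hunk above, the two central checks in `preCommitTransaction` (the status guard and the per-tablet quorum test) can be condensed into the sketch below. It is a simplified illustration, not the Doris code: `PreCommitChecksSketch`, `Decision`, and the `PREPARE` status are hypothetical or assumed names, and the sketch leaves out the `getLastFailedVersion()` filter that the real loop applies to each replica.

```java
import java.util.Set;

// Simplified, hypothetical condensation of the checks in the diff above.
final class PreCommitChecksSketch {

    // PREPARE is an assumed name for "not yet pre-committed"; the other values appear in the diff.
    enum Status { PREPARE, PRECOMMITTED, COMMITTED, VISIBLE, ABORTED }

    enum Decision { PROCEED, ALREADY_DONE, REJECT }

    // Mirrors the status guard: missing, aborted, committed, or visible transactions
    // are rejected, and an already pre-committed one is treated as a no-op.
    static Decision checkStatus(Status status) {
        if (status == null || status == Status.ABORTED) {
            return Decision.REJECT;
        }
        switch (status) {
            case VISIBLE:
            case COMMITTED:
                return Decision.REJECT;
            case PRECOMMITTED:
                return Decision.ALREADY_DONE;
            default:
                return Decision.PROCEED;
        }
    }

    // Majority quorum, as in "getTotalReplicaNum() / 2 + 1".
    static int quorum(int totalReplicaNum) {
        return totalReplicaNum / 2 + 1;
    }

    // Counts the tablet's backends that reported a commit and compares with the quorum.
    // A null commitBackends set means no replica reported success.
    static boolean hasQuorum(Set<Long> tabletBackends, Set<Long> commitBackends, int totalReplicaNum) {
        int successReplicaNum = 0;
        for (long backendId : tabletBackends) {
            if (commitBackends != null && commitBackends.contains(backendId)) {
                successReplicaNum++;
            }
        }
        return successReplicaNum >= quorum(totalReplicaNum);
    }
}
```

With three replicas the quorum is 2, so a tablet whose data landed on two of its three backends passes the check, while a tablet with a single successful replica does not.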

##########
File path: fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java
##########
@@ -342,6 +339,344 @@ public void updateDatabaseUsedQuotaData(long usedQuotaDataBytes) {
         this.usedQuotaDataBytes = usedQuotaDataBytes;
     }
 
+    public void preCommitTransaction(List<Table> tableList, long transactionId, List<TabletCommitInfo> tabletCommitInfos,
+                                  TxnCommitAttachment txnCommitAttachment)
+            throws UserException {
+        // 1. check status
+        // the caller method already own db lock, we do not obtain db lock here
+        Database db = catalog.getDbOrMetaException(dbId);
+        TransactionState transactionState;
+        readLock();
+        try {
+            transactionState = unprotectedGetTransactionState(transactionId);
+        } finally {
+            readUnlock();
+        }
+        if (transactionState == null
+                || transactionState.getTransactionStatus() == TransactionStatus.ABORTED) {
+            throw new TransactionCommitFailedException(
+                    transactionState == null ? "transaction not found" : transactionState.getReason());
+        }
+
+        if (transactionState.getTransactionStatus() == TransactionStatus.VISIBLE) {
+            LOG.debug("transaction is already visible: {}", transactionId);
+            throw new TransactionCommitFailedException("transaction is already visible");
+        }
+
+        if (transactionState.getTransactionStatus() == TransactionStatus.COMMITTED) {
+            LOG.debug("transaction is already committed: {}", transactionId);
+            throw new TransactionCommitFailedException("transaction is already committed");
+        }
+
+        if (transactionState.getTransactionStatus() == TransactionStatus.PRECOMMITTED) {
+            LOG.debug("transaction is already pre-committed: {}", transactionId);
+            return;
+        }
+
+        if (tabletCommitInfos == null || tabletCommitInfos.isEmpty()) {
+            throw new TransactionCommitFailedException(TransactionCommitFailedException.NO_DATA_TO_LOAD_MSG);
+        }
+
+        // update transaction state extra if exists
+        if (txnCommitAttachment != null) {
+            transactionState.setTxnCommitAttachment(txnCommitAttachment);
+        }
+
+        TabletInvertedIndex tabletInvertedIndex = catalog.getTabletInvertedIndex();
+        Map<Long, Set<Long>> tabletToBackends = new HashMap<>();
+        Map<Long, Set<Long>> tableToPartition = new HashMap<>();
+        Map<Long, Table> idToTable = new HashMap<>();
+        for (int i = 0; i < tableList.size(); i++) {
+            idToTable.put(tableList.get(i).getId(), tableList.get(i));
+        }
+        // 2. validate potential exists problem: db->table->partition
+        // guarantee exist exception during a transaction
+        // if index is dropped, it does not matter.
+        // if table or partition is dropped during load, just ignore that tablet,
+        // because we should allow dropping rollup or partition during load
+        List<Long> tabletIds = tabletCommitInfos.stream().map(
+                tabletCommitInfo -> tabletCommitInfo.getTabletId()).collect(Collectors.toList());
+        List<TabletMeta> tabletMetaList = tabletInvertedIndex.getTabletMetaList(tabletIds);
+        for (int i = 0; i < tabletMetaList.size(); i++) {
+            TabletMeta tabletMeta = tabletMetaList.get(i);
+            if (tabletMeta == TabletInvertedIndex.NOT_EXIST_TABLET_META) {
+                continue;
+            }
+            long tabletId = tabletIds.get(i);
+            long tableId = tabletMeta.getTableId();
+            OlapTable tbl = (OlapTable) idToTable.get(tableId);
+            if (tbl == null) {
+                // this can happen when tableId == -1 (tablet being dropping)
+                // or table really not exist.
+                continue;
+            }
+
+            if (tbl.getState() == OlapTable.OlapTableState.RESTORE) {
+                throw new LoadException("Table " + tbl.getName() + " is in restore process. "
+                        + "Can not load into it");
+            }
+
+            long partitionId = tabletMeta.getPartitionId();
+            if (tbl.getPartition(partitionId) == null) {
+                // this can happen when partitionId == -1 (tablet being dropping)
+                // or partition really not exist.
+                continue;
+            }
+
+            if (!tableToPartition.containsKey(tableId)) {
+                tableToPartition.put(tableId, new HashSet<>());
+            }
+            tableToPartition.get(tableId).add(partitionId);
+            if (!tabletToBackends.containsKey(tabletId)) {
+                tabletToBackends.put(tabletId, new HashSet<>());
+            }
+            tabletToBackends.get(tabletId).add(tabletCommitInfos.get(i).getBackendId());
+        }
+
+        if (tableToPartition.isEmpty()) {
+            // table or all partitions are being dropped
+            throw new TransactionCommitFailedException(TransactionCommitFailedException.NO_DATA_TO_LOAD_MSG);
+        }
+
+        Set<Long> errorReplicaIds = Sets.newHashSet();
+        Set<Long> totalInvolvedBackends = Sets.newHashSet();
+        for (long tableId : tableToPartition.keySet()) {
+            OlapTable table = (OlapTable) db.getTableOrMetaException(tableId);
+            for (Partition partition : table.getAllPartitions()) {
+                if (!tableToPartition.get(tableId).contains(partition.getId())) {
+                    continue;
+                }
+
+                List<MaterializedIndex> allIndices;
+                if (transactionState.getLoadedTblIndexes().isEmpty()) {
+                    allIndices = partition.getMaterializedIndices(MaterializedIndex.IndexExtState.ALL);
+                } else {
+                    allIndices = Lists.newArrayList();
+                    for (long indexId : transactionState.getLoadedTblIndexes().get(tableId)) {
+                        MaterializedIndex index = partition.getIndex(indexId);
+                        if (index != null) {
+                            allIndices.add(index);
+                        }
+                    }
+                }
+
+                int quorumReplicaNum = table.getPartitionInfo().getReplicaAllocation(partition.getId()).getTotalReplicaNum() / 2 + 1;
+                for (MaterializedIndex index : allIndices) {
+                    for (Tablet tablet : index.getTablets()) {
+                        int successReplicaNum = 0;
+                        long tabletId = tablet.getId();
+                        Set<Long> tabletBackends = tablet.getBackendIds();
+                        totalInvolvedBackends.addAll(tabletBackends);
+                        Set<Long> commitBackends = tabletToBackends.get(tabletId);
+                        // save the error replica ids for current tablet
+                        // this param is used for log
+                        Set<Long> errorBackendIdsForTablet = Sets.newHashSet();
+                        for (long tabletBackend : tabletBackends) {
+                            Replica replica = tabletInvertedIndex.getReplica(tabletId, tabletBackend);
+                            if (replica == null) {
+                                throw new TransactionCommitFailedException("could not find replica for tablet ["
+                                        + tabletId + "], backend [" + tabletBackend + "]");
+                            }
+                            // if the tablet have no replica's to commit or the tablet is a rolling up tablet, the commit backends maybe null
+                            // if the commit backends is null, set all replicas as error replicas
+                            if (commitBackends != null && commitBackends.contains(tabletBackend)) {
+                                // if the backend load success but the backend has some errors previously, then it is not a normal replica
+                                // ignore it but not log it
+                                // for example, a replica is in clone state
+                                if (replica.getLastFailedVersion() < 0) {
+                                    ++successReplicaNum;
+                                }
+                            } else {
+                                errorBackendIdsForTablet.add(tabletBackend);
+                                errorReplicaIds.add(replica.getId());
+                                // not remove rollup task here, because the commit maybe failed
+                                // remove rollup task when commit successfully
+                            }
+                        }
+
+                        if (successReplicaNum < quorumReplicaNum) {
+                            LOG.warn("Failed to pre-commit txn [{}]. "
+                                            + "Tablet [{}] success replica num is {} < quorum replica num {} "
+                                            + "while error backends {}",
+                                    transactionId, tablet.getId(), successReplicaNum, quorumReplicaNum,
+                                    Joiner.on(",").join(errorBackendIdsForTablet));
+                            throw new TabletQuorumFailedException(transactionId, tablet.getId(),
+                                    successReplicaNum, quorumReplicaNum,
+                                    errorBackendIdsForTablet);
+                        }
+                    }
+                }
+            }
+        }
+
+        unprotectedPreCommitTransaction(transactionState, errorReplicaIds, tableToPartition, totalInvolvedBackends, db);
+        LOG.info("transaction:[{}] successfully pre-committed", transactionState);
+    }
+
+    public TransactionState checkPreCommitStatus(long transactionId) throws UserException {
+        Database db = catalog.getDbOrMetaException(dbId);
+        TransactionState transactionState;
+        readLock();
+        try {
+            transactionState = unprotectedGetTransactionState(transactionId);
+        } finally {
+            readUnlock();
+        }
+
+        if (transactionState == null) {
+            LOG.debug("transaction not found: {}", transactionId);
+            throw new TransactionCommitFailedException("transaction {" + transactionId + "} not found.");
+        }
+
+        transactionState.setCheckTimeout(false);
+
+        if (transactionState.getTransactionStatus() == TransactionStatus.ABORTED) {
+            LOG.debug("transaction is already aborted: {}", transactionId);
+            throw new TransactionCommitFailedException("transaction [" + transactionId
+                    + "] is already aborted, not pre-committed" + transactionState.getReason());
+        }
+
+        if (transactionState.getTransactionStatus() == TransactionStatus.VISIBLE) {
+            LOG.debug("transaction is already visible: {}", transactionId);
+            throw new TransactionCommitFailedException("transaction [" + transactionId
+                    + "] is already visible, not pre-committed");
+        }
+
+        if (transactionState.getTransactionStatus() == TransactionStatus.COMMITTED) {
+            LOG.debug("transaction is already committed: {}", transactionId);
+            throw new TransactionCommitFailedException("transaction [" + transactionId
+                    + "] is already committed, not pre-committed");
+        }
+
+        if (transactionState.getTransactionStatus() == TransactionStatus.PREPARE) {
+            LOG.debug("transaction is prepare, not pre-committed: {}", transactionId);
+            throw new TransactionCommitFailedException("transaction [" + transactionId
+                    + "] is prepare, not pre-committed");
+        }
+
+        long currentTimeMillis = System.currentTimeMillis();
+        // Maybe new  invisible version has not been reported to master FE after the transaction is pre-committed
+        // or FE restart, we should wait a little while to commit transaction.
+        if (currentTimeMillis - transactionState.getPreCommitTime() <

Review comment:
       Does this mean that we need to wait at least 60 seconds before committing the pre-committed txn?
   That seems strange to me.
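
For reference, the condition in the hunk above reduces to a simple elapsed-time check. Below is a minimal sketch of that check in isolation. The class and method names are hypothetical, and the 60-second figure used in the usage note comes from the question above, not from a confirmed default of `Config.stream_load_default_wait_report_second`.

```java
// Hypothetical helper, not part of the PR: isolates the wait-interval check
// that checkPreCommitStatus performs before allowing a commit.
final class PreCommitWaitSketch {

    /**
     * Returns true once at least waitSeconds have elapsed since the
     * transaction was pre-committed, i.e. when the commit would no longer
     * be rejected with "The interval is too short".
     */
    static boolean waitElapsed(long preCommitTimeMs, long nowMs, long waitSeconds) {
        // Multiply with a long literal so large second counts cannot overflow int.
        return nowMs - preCommitTimeMs >= waitSeconds * 1000L;
    }
}
```

With a wait of 60 seconds, `waitElapsed(t, t + 59_999L, 60)` is false and `waitElapsed(t, t + 60_000L, 60)` is true, so a client that commits immediately after the load returns would always be asked to retry.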

##########
File path: fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/LoadAction.java
##########
@@ -154,4 +178,125 @@ private Object executeWithoutPassword(HttpServletRequest request,
             return new RestBaseResult(e.getMessage());
         }
     }
+
+    private Object executeStreamLoadCommit(HttpServletRequest request,
+                                          HttpServletResponse response, String db, String table) {
+        try {
+            String dbName = db;
+            String tableName = table;
+
+            final String clusterName = ConnectContext.get().getClusterName();
+            if (Strings.isNullOrEmpty(clusterName)) {
+                return new RestBaseResult("No cluster selected.");
+            }
+
+            if (Strings.isNullOrEmpty(dbName)) {
+                return new RestBaseResult("No database selected.");
+            }
+
+            if (Strings.isNullOrEmpty(tableName)) {
+                return new RestBaseResult("No table selected.");
+            }
+
+            String fullDbName = ClusterNamespace.getFullName(clusterName, dbName);
+
+            List<Long> transactionIds = Lists.newArrayList();
+            String txnIds = request.getHeader(TXN_KEY);
+            if (Strings.isNullOrEmpty(txnIds)) {
+                return new RestBaseResult("No transaction id selected.");
+            } else {
+                for (String txnId : txnIds.split(",")) {

Review comment:
       Trim the whitespace around each txn id?
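
A minimal sketch of what the trimming could look like, assuming the header is a comma-separated list of numeric ids as in the PR description. `TxnIdHeaderParser` is a hypothetical name used only for illustration. `Long.parseLong` does not accept surrounding whitespace, so a header such as `txn:1001, 1002` would otherwise fail with a `NumberFormatException`.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of the PR: parses the value of the "txn"
// header into transaction ids, tolerating spaces around each id.
final class TxnIdHeaderParser {

    static List<Long> parse(String header) {
        List<Long> ids = new ArrayList<>();
        if (header == null || header.trim().isEmpty()) {
            return ids; // caller decides how to report "no transaction id"
        }
        for (String part : header.split(",")) {
            String trimmed = part.trim();
            if (trimmed.isEmpty()) {
                continue; // skip empty segments such as "1001,,1002"
            }
            // Still throws NumberFormatException for non-numeric input.
            ids.add(Long.parseLong(trimmed));
        }
        return ids;
    }
}
```

Whether empty segments should be skipped or rejected is a design choice; skipping is assumed here only to keep the sketch short.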

##########
File path: docs/zh-CN/administrator-guide/load-data/stream-load-manual.md
##########
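@@ -179,6 +179,22 @@ Because Stream load uses the HTTP protocol, all load-job-related
     3. For an imported column whose type has a range limit, strict mode also has no effect on data that passes type conversion but fails the range limit. For example, if the type is decimal(1,0) and the original value is 10, the value passes type conversion but falls outside the range declared for the column. Strict mode has no effect on such data.
 + merge\_type
     The merge type of the data. Three types are supported: APPEND, DELETE, and MERGE. APPEND is the default and means that the whole batch is appended to the existing data. DELETE means that all rows with the same key as this batch are deleted. MERGE must be used together with a delete condition: rows that satisfy the delete condition are handled with DELETE semantics, and the rest are handled with APPEND semantics.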
+    
++ two\_phase\_commit
+
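+    Stream load supports a two-phase batch transaction commit mode. To enable it, declare ```two_phase_commit=true``` in the HEADER. Two-phase batch transaction commit is disabled by default.
+    Two-phase batch transaction commit means that during Stream load, a message is returned to the user as soon as the data has been written. At that point the data is not visible and the transaction status is PRECOMMITTED. The data becomes visible only after the user manually triggers the commit operation.
+    
+    1. Users can call the following interface to trigger commit for multiple stream load transactions in a batch: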
+    ```
+    curl --location-trusted -u user:passwd -H "txn:txnId1,txnId2,txnId3,..." http://fe_host:http_port/api/{db}/{table}/_stream_load_commit
+    ```
+    The batch commit operation succeeds only when every transaction is committed successfully. If even one transaction fails to commit, the batch commit operation fails.
+    
+    2. Users can call the following interface to trigger abort for multiple stream load transactions in a batch:
+    ```
+    curl --location-trusted -u user:passwd -H "txn:txnId1,txnId2,txnId3,..." http://fe_host:http_port/api/{db}/{table}/_stream_load_abort

Review comment:
       No need for `{table}` here.

##########
File path: fe/fe-core/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java
##########
@@ -198,6 +224,16 @@ public void commitTransaction(long dbId, List<Table> tableList, long transaction
         DatabaseTransactionMgr dbTransactionMgr = getDatabaseTransactionMgr(dbId);
         dbTransactionMgr.commitTransaction(tableList, transactionId, tabletCommitInfos, txnCommitAttachment);
     }
+
+    public void commitTransaction(long dbId, List<Long> transactionIds)

Review comment:
       ```suggestion
       private void commitTransaction(long dbId, List<Long> transactionIds)
   ```

##########
File path: fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/LoadAction.java
##########
@@ -73,10 +78,29 @@ public Object streamLoad(HttpServletRequest request,
         return executeWithoutPassword(request, response, db, table);
     }
 
+    @RequestMapping(path = "/api/{" + DB_KEY + "}/{" + TABLE_KEY + "}/_stream_load_commit", method = RequestMethod.GET)
+    public Object streamLoadCommit(HttpServletRequest request,
+                             HttpServletResponse response,
+                             @PathVariable(value = DB_KEY) String db, @PathVariable(value = TABLE_KEY) String table) {
+        this.isStreamLoad = true;
+        executeCheckPassword(request, response);
+        return executeStreamLoadCommit(request, response, db, table);
+    }
+
+    @RequestMapping(path = "/api/{" + DB_KEY + "}/{" + TABLE_KEY + "}/_stream_load_abort", method = RequestMethod.GET)

Review comment:
       better to use `RequestMethod.PUT`?

##########
File path: fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/LoadAction.java
##########
@@ -154,4 +178,125 @@ private Object executeWithoutPassword(HttpServletRequest request,
             return new RestBaseResult(e.getMessage());
         }
     }
+
+    private Object executeStreamLoadCommit(HttpServletRequest request,
+                                          HttpServletResponse response, String db, String table) {
+        try {
+            String dbName = db;
+            String tableName = table;
+
+            final String clusterName = ConnectContext.get().getClusterName();
+            if (Strings.isNullOrEmpty(clusterName)) {
+                return new RestBaseResult("No cluster selected.");
+            }
+
+            if (Strings.isNullOrEmpty(dbName)) {
+                return new RestBaseResult("No database selected.");
+            }
+
+            if (Strings.isNullOrEmpty(tableName)) {
+                return new RestBaseResult("No table selected.");
+            }
+
+            String fullDbName = ClusterNamespace.getFullName(clusterName, dbName);
+
+            List<Long> transactionIds = Lists.newArrayList();
+            String txnIds = request.getHeader(TXN_KEY);
+            if (Strings.isNullOrEmpty(txnIds)) {
+                return new RestBaseResult("No transaction id selected.");
+            } else {
+                for (String txnId : txnIds.split(",")) {
+                    transactionIds.add(Long.parseLong(txnId));
+                }
+            }
+
+            // check auth
+            checkTblAuth(ConnectContext.get().getCurrentUserIdentity(), fullDbName, tableName, PrivPredicate.LOAD);
+            LOG.info("redirect stream load commit request to master FE, txns: {}", txnIds);
+
+            RedirectView redirectView = redirectToMaster(request, response);
+            if (redirectView != null) {
+                return redirectView;
+            }
+            {
+                LOG.info("Master FE received http request to commit txns: {}", txnIds);

Review comment:
       Use `debug` level, or there will be too many logs

##########
File path: gensrc/thrift/MasterService.thrift
##########
@@ -31,13 +31,14 @@ struct TTabletInfo {
     5: required Types.TCount row_count
     6: required Types.TSize data_size
     7: optional Types.TStorageMedium storage_medium
-    8: optional list<Types.TTransactionId> transaction_ids

Review comment:
       Changing field names or order may cause incompatibility.

##########
File path: fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java
##########
@@ -342,6 +339,344 @@ public void updateDatabaseUsedQuotaData(long usedQuotaDataBytes) {
         this.usedQuotaDataBytes = usedQuotaDataBytes;
     }
 
+    public void preCommitTransaction(List<Table> tableList, long transactionId, List<TabletCommitInfo> tabletCommitInfos,

Review comment:
       Most of this method is the same as `commitTransaction`. Can we merge these 2 methods?
   I think the only difference is whether the txn state is set to PRECOMMITTED or COMMITTED.
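
One possible shape for such a merge, as a toy sketch only: the shared validation lives in one method and the target status is passed in. Every name here (`TxnStatusSketch`, `Txn`, `finish`) is hypothetical, the status enum is a simplified stand-in for `TransactionStatus`, and the sketch assumes that only a PREPARE transaction may move to either target. The real tablet, partition, and quorum checks are elided.

```java
// Toy model, not Doris code: one method handles both pre-commit and commit,
// differing only in the status it sets at the end.
final class TxnStatusSketch {

    enum Status { PREPARE, PRECOMMITTED, COMMITTED, VISIBLE, ABORTED }

    static final class Txn {
        Status status = Status.PREPARE;
    }

    static void finish(Txn txn, Status target) {
        if (target != Status.PRECOMMITTED && target != Status.COMMITTED) {
            throw new IllegalArgumentException("unsupported target: " + target);
        }
        if (txn.status == target) {
            return; // already in the requested state, nothing to do
        }
        if (txn.status != Status.PREPARE) {
            // Assumption for this sketch: every other source state is rejected.
            throw new IllegalStateException("transaction is " + txn.status);
        }
        // The shared tablet, partition, and quorum validation would run here.
        txn.status = target;
    }
}
```

`preCommitTransaction` and `commitTransaction` would then become thin wrappers that call the shared method with PRECOMMITTED or COMMITTED.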

##########
File path: fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/LoadAction.java
##########
@@ -73,10 +78,29 @@ public Object streamLoad(HttpServletRequest request,
         return executeWithoutPassword(request, response, db, table);
     }
 
+    @RequestMapping(path = "/api/{" + DB_KEY + "}/{" + TABLE_KEY + "}/_stream_load_commit", method = RequestMethod.GET)
+    public Object streamLoadCommit(HttpServletRequest request,
+                             HttpServletResponse response,
+                             @PathVariable(value = DB_KEY) String db, @PathVariable(value = TABLE_KEY) String table) {
+        this.isStreamLoad = true;
+        executeCheckPassword(request, response);
+        return executeStreamLoadCommit(request, response, db, table);
+    }
+
+    @RequestMapping(path = "/api/{" + DB_KEY + "}/{" + TABLE_KEY + "}/_stream_load_abort", method = RequestMethod.GET)
+    public Object streamLoadAbort(HttpServletRequest request,
+                                   HttpServletResponse response,
+                                   @PathVariable(value = DB_KEY) String db, @PathVariable(value = TABLE_KEY) String table) {
+        this.isStreamLoad = true;
+        executeCheckPassword(request, response);
+        return executeStreamLoadAbort(request, response, db, table);
+    }
+
     // Same as Multi load, to be compatible with http v1's response body,
     // we return error by using RestBaseResult.
     private Object executeWithoutPassword(HttpServletRequest request,
                                           HttpServletResponse response, String db, String table) {
+        LOG.info("received stream load request (precommit  httpv2).");

Review comment:
       This log can be removed.

##########
File path: fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/LoadAction.java
##########
@@ -73,10 +78,29 @@ public Object streamLoad(HttpServletRequest request,
         return executeWithoutPassword(request, response, db, table);
     }
 
+    @RequestMapping(path = "/api/{" + DB_KEY + "}/{" + TABLE_KEY + "}/_stream_load_commit", method = RequestMethod.GET)

Review comment:
       better to use `RequestMethod.PUT`?

##########
File path: fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java
##########
@@ -342,6 +339,344 @@ public void updateDatabaseUsedQuotaData(long usedQuotaDataBytes) {
         this.usedQuotaDataBytes = usedQuotaDataBytes;
     }
 
+    public void preCommitTransaction(List<Table> tableList, long transactionId, List<TabletCommitInfo> tabletCommitInfos,
+                                  TxnCommitAttachment txnCommitAttachment)
+            throws UserException {
+        // 1. check status
+        // the caller method already own db lock, we do not obtain db lock here
+        Database db = catalog.getDbOrMetaException(dbId);
+        TransactionState transactionState;
+        readLock();
+        try {
+            transactionState = unprotectedGetTransactionState(transactionId);
+        } finally {
+            readUnlock();
+        }
+        if (transactionState == null
+                || transactionState.getTransactionStatus() == TransactionStatus.ABORTED) {
+            throw new TransactionCommitFailedException(
+                    transactionState == null ? "transaction not found" : transactionState.getReason());
+        }
+
+        if (transactionState.getTransactionStatus() == TransactionStatus.VISIBLE) {
+            LOG.debug("transaction is already visible: {}", transactionId);
+            throw new TransactionCommitFailedException("transaction is already visible");
+        }
+
+        if (transactionState.getTransactionStatus() == TransactionStatus.COMMITTED) {
+            LOG.debug("transaction is already committed: {}", transactionId);
+            throw new TransactionCommitFailedException("transaction is already committed");
+        }
+
+        if (transactionState.getTransactionStatus() == TransactionStatus.PRECOMMITTED) {
+            LOG.debug("transaction is already pre-committed: {}", transactionId);
+            return;
+        }
+
+        if (tabletCommitInfos == null || tabletCommitInfos.isEmpty()) {
+            throw new TransactionCommitFailedException(TransactionCommitFailedException.NO_DATA_TO_LOAD_MSG);
+        }
+
+        // update transaction state extra if exists
+        if (txnCommitAttachment != null) {
+            transactionState.setTxnCommitAttachment(txnCommitAttachment);
+        }
+
+        TabletInvertedIndex tabletInvertedIndex = catalog.getTabletInvertedIndex();
+        Map<Long, Set<Long>> tabletToBackends = new HashMap<>();
+        Map<Long, Set<Long>> tableToPartition = new HashMap<>();
+        Map<Long, Table> idToTable = new HashMap<>();
+        for (int i = 0; i < tableList.size(); i++) {
+            idToTable.put(tableList.get(i).getId(), tableList.get(i));
+        }
+        // 2. validate potential exists problem: db->table->partition
+        // guarantee exist exception during a transaction
+        // if index is dropped, it does not matter.
+        // if table or partition is dropped during load, just ignore that tablet,
+        // because we should allow dropping rollup or partition during load
+        List<Long> tabletIds = tabletCommitInfos.stream().map(
+                tabletCommitInfo -> tabletCommitInfo.getTabletId()).collect(Collectors.toList());
+        List<TabletMeta> tabletMetaList = tabletInvertedIndex.getTabletMetaList(tabletIds);
+        for (int i = 0; i < tabletMetaList.size(); i++) {
+            TabletMeta tabletMeta = tabletMetaList.get(i);
+            if (tabletMeta == TabletInvertedIndex.NOT_EXIST_TABLET_META) {
+                continue;
+            }
+            long tabletId = tabletIds.get(i);
+            long tableId = tabletMeta.getTableId();
+            OlapTable tbl = (OlapTable) idToTable.get(tableId);
+            if (tbl == null) {
+                // this can happen when tableId == -1 (tablet being dropping)
+                // or table really not exist.
+                continue;
+            }
+
+            if (tbl.getState() == OlapTable.OlapTableState.RESTORE) {
+                throw new LoadException("Table " + tbl.getName() + " is in restore process. "
+                        + "Can not load into it");
+            }
+
+            long partitionId = tabletMeta.getPartitionId();
+            if (tbl.getPartition(partitionId) == null) {
+                // this can happen when partitionId == -1 (tablet being dropping)
+                // or partition really not exist.
+                continue;
+            }
+
+            if (!tableToPartition.containsKey(tableId)) {
+                tableToPartition.put(tableId, new HashSet<>());
+            }
+            tableToPartition.get(tableId).add(partitionId);
+            if (!tabletToBackends.containsKey(tabletId)) {
+                tabletToBackends.put(tabletId, new HashSet<>());
+            }
+            tabletToBackends.get(tabletId).add(tabletCommitInfos.get(i).getBackendId());
+        }
+
+        if (tableToPartition.isEmpty()) {
+            // table or all partitions are being dropped
+            throw new TransactionCommitFailedException(TransactionCommitFailedException.NO_DATA_TO_LOAD_MSG);
+        }
+
+        Set<Long> errorReplicaIds = Sets.newHashSet();
+        Set<Long> totalInvolvedBackends = Sets.newHashSet();
+        for (long tableId : tableToPartition.keySet()) {
+            OlapTable table = (OlapTable) db.getTableOrMetaException(tableId);
+            for (Partition partition : table.getAllPartitions()) {
+                if (!tableToPartition.get(tableId).contains(partition.getId())) {
+                    continue;
+                }
+
+                List<MaterializedIndex> allIndices;
+                if (transactionState.getLoadedTblIndexes().isEmpty()) {
+                    allIndices = partition.getMaterializedIndices(MaterializedIndex.IndexExtState.ALL);
+                } else {
+                    allIndices = Lists.newArrayList();
+                    for (long indexId : transactionState.getLoadedTblIndexes().get(tableId)) {
+                        MaterializedIndex index = partition.getIndex(indexId);
+                        if (index != null) {
+                            allIndices.add(index);
+                        }
+                    }
+                }
+
+                int quorumReplicaNum = table.getPartitionInfo().getReplicaAllocation(partition.getId()).getTotalReplicaNum() / 2 + 1;
+                for (MaterializedIndex index : allIndices) {
+                    for (Tablet tablet : index.getTablets()) {
+                        int successReplicaNum = 0;
+                        long tabletId = tablet.getId();
+                        Set<Long> tabletBackends = tablet.getBackendIds();
+                        totalInvolvedBackends.addAll(tabletBackends);
+                        Set<Long> commitBackends = tabletToBackends.get(tabletId);
+                        // save the error replica ids for current tablet
+                        // this param is used for log
+                        Set<Long> errorBackendIdsForTablet = Sets.newHashSet();
+                        for (long tabletBackend : tabletBackends) {
+                            Replica replica = tabletInvertedIndex.getReplica(tabletId, tabletBackend);
+                            if (replica == null) {
+                                throw new TransactionCommitFailedException("could not find replica for tablet ["
+                                        + tabletId + "], backend [" + tabletBackend + "]");
+                            }
+                            // if the tablet have no replica's to commit or the tablet is a rolling up tablet, the commit backends maybe null
+                            // if the commit backends is null, set all replicas as error replicas
+                            if (commitBackends != null && commitBackends.contains(tabletBackend)) {
+                                // if the backend load success but the backend has some errors previously, then it is not a normal replica
+                                // ignore it but not log it
+                                // for example, a replica is in clone state
+                                if (replica.getLastFailedVersion() < 0) {
+                                    ++successReplicaNum;
+                                }
+                            } else {
+                                errorBackendIdsForTablet.add(tabletBackend);
+                                errorReplicaIds.add(replica.getId());
+                                // not remove rollup task here, because the commit maybe failed
+                                // remove rollup task when commit successfully
+                            }
+                        }
+
+                        if (successReplicaNum < quorumReplicaNum) {
+                            LOG.warn("Failed to pre-commit txn [{}]. "
+                                            + "Tablet [{}] success replica num is {} < quorum replica num {} "
+                                            + "while error backends {}",
+                                    transactionId, tablet.getId(), successReplicaNum, quorumReplicaNum,
+                                    Joiner.on(",").join(errorBackendIdsForTablet));
+                            throw new TabletQuorumFailedException(transactionId, tablet.getId(),
+                                    successReplicaNum, quorumReplicaNum,
+                                    errorBackendIdsForTablet);
+                        }
+                    }
+                }
+            }
+        }
+
+        unprotectedPreCommitTransaction(transactionState, errorReplicaIds, tableToPartition, totalInvolvedBackends, db);
+        LOG.info("transaction:[{}] successfully pre-committed", transactionState);
+    }
+
+    public TransactionState checkPreCommitStatus(long transactionId) throws UserException {
+        Database db = catalog.getDbOrMetaException(dbId);
+        TransactionState transactionState;
+        readLock();
+        try {
+            transactionState = unprotectedGetTransactionState(transactionId);
+        } finally {
+            readUnlock();
+        }
+
+        if (transactionState == null) {
+            LOG.debug("transaction not found: {}", transactionId);
+            throw new TransactionCommitFailedException("transaction {" + transactionId + "} not found.");
+        }
+
+        transactionState.setCheckTimeout(false);
+
+        if (transactionState.getTransactionStatus() == TransactionStatus.ABORTED) {
+            LOG.debug("transaction is already aborted: {}", transactionId);
+            throw new TransactionCommitFailedException("transaction [" + transactionId
+                    + "] is already aborted, not pre-committed" + transactionState.getReason());
+        }
+
+        if (transactionState.getTransactionStatus() == TransactionStatus.VISIBLE) {
+            LOG.debug("transaction is already visible: {}", transactionId);
+            throw new TransactionCommitFailedException("transaction [" + transactionId
+                    + "] is already visible, not pre-committed");
+        }
+
+        if (transactionState.getTransactionStatus() == TransactionStatus.COMMITTED) {
+            LOG.debug("transaction is already committed: {}", transactionId);
+            throw new TransactionCommitFailedException("transaction [" + transactionId
+                    + "] is already committed, not pre-committed");
+        }
+
+        if (transactionState.getTransactionStatus() == TransactionStatus.PREPARE) {
+            LOG.debug("transaction is prepare, not pre-committed: {}", transactionId);
+            throw new TransactionCommitFailedException("transaction [" + transactionId
+                    + "] is prepare, not pre-committed");
+        }
+
+        long currentTimeMillis = System.currentTimeMillis();
+        // Maybe new  invisible version has not been reported to master FE after the transaction is pre-committed
+        // or FE restart, we should wait a little while to commit transaction.
+        if (currentTimeMillis - transactionState.getPreCommitTime() <
+                Config.stream_load_default_wait_report_second * 1000) {
+            throw new TransactionCommitFailedException("The interval is too short after txn [" + transactionId + "] " +
+                    "was pre-committed. Please try again later.");
+        }
+
+        Set<Long> errorReplicaIds = Sets.newHashSet();
+        Iterator<TableCommitInfo> tableCommitInfoIterator = transactionState.getIdToTableCommitInfos().values().iterator();
+        while (tableCommitInfoIterator.hasNext()) {
+            TableCommitInfo tableCommitInfo = tableCommitInfoIterator.next();
+            long tableId = tableCommitInfo.getTableId();
+            OlapTable table = (OlapTable) db.getTableNullable(tableId);
+            // table maybe dropped between pre-commit and commit, ignore this error
+            if (table == null) {
+                tableCommitInfoIterator.remove();
+                LOG.warn("table {} is dropped, skip version check and remove it from transaction state {}",
+                        tableId,
+                        transactionState);
+                continue;
+            }
+            PartitionInfo partitionInfo = table.getPartitionInfo();
+            Iterator<PartitionCommitInfo> partitionCommitInfoIterator = tableCommitInfo.getIdToPartitionCommitInfo().values().iterator();
+            while (partitionCommitInfoIterator.hasNext()) {
+                PartitionCommitInfo partitionCommitInfo = partitionCommitInfoIterator.next();
+                long partitionId = partitionCommitInfo.getPartitionId();
+                Partition partition = table.getPartition(partitionId);
+                // partition maybe dropped between pre-commit and commit, ignore this error
+                if (partition == null) {
+                    partitionCommitInfoIterator.remove();
+                    LOG.warn("partition {} is dropped, skip version check and remove it from transaction state {}",
+                            partitionId,
+                            transactionState);
+                    continue;
+                }
+
+                int quorumReplicaNum = partitionInfo.getReplicaAllocation(partitionId).getTotalReplicaNum() / 2 + 1;
+
+                List<MaterializedIndex> allIndices;
+                if (transactionState.getLoadedTblIndexes().isEmpty()) {
+                    allIndices = partition.getMaterializedIndices(MaterializedIndex.IndexExtState.ALL);
+                } else {
+                    allIndices = Lists.newArrayList();
+                    for (long indexId : transactionState.getLoadedTblIndexes().get(tableId)) {
+                        MaterializedIndex index = partition.getIndex(indexId);
+                        if (index != null) {
+                            allIndices.add(index);
+                        }
+                    }
+                }
+
+                for (MaterializedIndex index : allIndices) {
+                    for (Tablet tablet : index.getTablets()) {
+                        int healthReplicaNum = 0;
+                        for (Replica replica : tablet.getReplicas()) {
+                            List<Long> committedTxnIds = replica.getCommittedTxnIds();
+                            if (committedTxnIds.contains(transactionId)) {
+                                ++healthReplicaNum;
+                            } else {
+                                errorReplicaIds.add(replica.getId());
+                            }
+                        }
+
+                        if (healthReplicaNum < quorumReplicaNum) {
+                            LOG.warn("failed to commit transaction {} on tablet {}, with only {} replicas less than quorum {}, txnId: {}",
+                                    transactionState, tablet, healthReplicaNum, quorumReplicaNum, transactionId);
+                            throw new TransactionCommitFailedException("tablet [" + tablet.getId() +"], with only ["
+                                    +healthReplicaNum+ "] replicas less than quorum [" +quorumReplicaNum+ "], txnId ["
+                                    + transactionId + "]");
+                        }
+                    }
+                }
+            }
+        }
+        transactionState.setErrorReplicas(errorReplicaIds);
+
+        return transactionState;
+    }
+
+    public void commitTransaction(List<Long> transactionIds) throws UserException {
+        try {
+            List<TransactionState> transactionStates = Lists.newArrayList();
+            for (long transactionId : transactionIds) {
+                transactionStates.add(checkPreCommitStatus(transactionId));
+            }
+
+            for (TransactionState transactionState : transactionStates) {
+                commitTransaction(transactionState);
+            }
+        } finally {
+            for (long transactionId : transactionIds) {
+                TransactionState transactionState;
+                readLock();
+                try {
+                    transactionState = unprotectedGetTransactionState(transactionId);
+                } finally {
+                    readUnlock();
+                }
+
+                if (transactionState != null) {
+                    transactionState.setCheckTimeout(true);
+                }
+            }
+        }
+    }
+
+    public void commitTransaction(TransactionState transactionState) throws UserException {

Review comment:
       ```suggestion
       private void commitTransaction(TransactionState transactionState) throws UserException {
   ```

##########
File path: fe/fe-core/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java
##########
@@ -198,6 +224,16 @@ public void commitTransaction(long dbId, List<Table> tableList, long transaction
         DatabaseTransactionMgr dbTransactionMgr = getDatabaseTransactionMgr(dbId);
         dbTransactionMgr.commitTransaction(tableList, transactionId, tabletCommitInfos, txnCommitAttachment);
     }
+
+    public void commitTransaction(long dbId, List<Long> transactionIds)

Review comment:
       Better to rename this method, for example by adding a `2PC` prefix or suffix, to distinguish it from the normal `commitTransaction`. Otherwise the code is confusing to read.
   
   All other `2PC`-related methods in both GlobalTransactionMgr and DatabaseTransactionMgr should be renamed as well.

##########
File path: fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java
##########
@@ -342,6 +339,344 @@ public void updateDatabaseUsedQuotaData(long usedQuotaDataBytes) {
         this.usedQuotaDataBytes = usedQuotaDataBytes;
     }
 
+    public void preCommitTransaction(List<Table> tableList, long transactionId, List<TabletCommitInfo> tabletCommitInfos,
+                                  TxnCommitAttachment txnCommitAttachment)
+            throws UserException {
+        // 1. check status
+        // the caller method already own db lock, we do not obtain db lock here
+        Database db = catalog.getDbOrMetaException(dbId);
+        TransactionState transactionState;
+        readLock();
+        try {
+            transactionState = unprotectedGetTransactionState(transactionId);
+        } finally {
+            readUnlock();
+        }
+        if (transactionState == null
+                || transactionState.getTransactionStatus() == TransactionStatus.ABORTED) {
+            throw new TransactionCommitFailedException(
+                    transactionState == null ? "transaction not found" : transactionState.getReason());
+        }
+
+        if (transactionState.getTransactionStatus() == TransactionStatus.VISIBLE) {
+            LOG.debug("transaction is already visible: {}", transactionId);
+            throw new TransactionCommitFailedException("transaction is already visible");
+        }
+
+        if (transactionState.getTransactionStatus() == TransactionStatus.COMMITTED) {
+            LOG.debug("transaction is already committed: {}", transactionId);
+            throw new TransactionCommitFailedException("transaction is already committed");
+        }
+
+        if (transactionState.getTransactionStatus() == TransactionStatus.PRECOMMITTED) {
+            LOG.debug("transaction is already pre-committed: {}", transactionId);
+            return;
+        }
+
+        if (tabletCommitInfos == null || tabletCommitInfos.isEmpty()) {
+            throw new TransactionCommitFailedException(TransactionCommitFailedException.NO_DATA_TO_LOAD_MSG);
+        }
+
+        // update transaction state extra if exists
+        if (txnCommitAttachment != null) {
+            transactionState.setTxnCommitAttachment(txnCommitAttachment);
+        }
+
+        TabletInvertedIndex tabletInvertedIndex = catalog.getTabletInvertedIndex();
+        Map<Long, Set<Long>> tabletToBackends = new HashMap<>();
+        Map<Long, Set<Long>> tableToPartition = new HashMap<>();
+        Map<Long, Table> idToTable = new HashMap<>();
+        for (int i = 0; i < tableList.size(); i++) {
+            idToTable.put(tableList.get(i).getId(), tableList.get(i));
+        }
+        // 2. validate potential exists problem: db->table->partition
+        // guarantee exist exception during a transaction
+        // if index is dropped, it does not matter.
+        // if table or partition is dropped during load, just ignore that tablet,
+        // because we should allow dropping rollup or partition during load
+        List<Long> tabletIds = tabletCommitInfos.stream().map(
+                tabletCommitInfo -> tabletCommitInfo.getTabletId()).collect(Collectors.toList());
+        List<TabletMeta> tabletMetaList = tabletInvertedIndex.getTabletMetaList(tabletIds);
+        for (int i = 0; i < tabletMetaList.size(); i++) {
+            TabletMeta tabletMeta = tabletMetaList.get(i);
+            if (tabletMeta == TabletInvertedIndex.NOT_EXIST_TABLET_META) {
+                continue;
+            }
+            long tabletId = tabletIds.get(i);
+            long tableId = tabletMeta.getTableId();
+            OlapTable tbl = (OlapTable) idToTable.get(tableId);
+            if (tbl == null) {
+                // this can happen when tableId == -1 (tablet being dropping)
+                // or table really not exist.
+                continue;
+            }
+
+            if (tbl.getState() == OlapTable.OlapTableState.RESTORE) {
+                throw new LoadException("Table " + tbl.getName() + " is in restore process. "
+                        + "Can not load into it");
+            }
+
+            long partitionId = tabletMeta.getPartitionId();
+            if (tbl.getPartition(partitionId) == null) {
+                // this can happen when partitionId == -1 (tablet being dropping)
+                // or partition really not exist.
+                continue;
+            }
+
+            if (!tableToPartition.containsKey(tableId)) {
+                tableToPartition.put(tableId, new HashSet<>());
+            }
+            tableToPartition.get(tableId).add(partitionId);
+            if (!tabletToBackends.containsKey(tabletId)) {
+                tabletToBackends.put(tabletId, new HashSet<>());
+            }
+            tabletToBackends.get(tabletId).add(tabletCommitInfos.get(i).getBackendId());
+        }
+
+        if (tableToPartition.isEmpty()) {
+            // table or all partitions are being dropped
+            throw new TransactionCommitFailedException(TransactionCommitFailedException.NO_DATA_TO_LOAD_MSG);
+        }
+
+        Set<Long> errorReplicaIds = Sets.newHashSet();
+        Set<Long> totalInvolvedBackends = Sets.newHashSet();
+        for (long tableId : tableToPartition.keySet()) {
+            OlapTable table = (OlapTable) db.getTableOrMetaException(tableId);
+            for (Partition partition : table.getAllPartitions()) {
+                if (!tableToPartition.get(tableId).contains(partition.getId())) {
+                    continue;
+                }
+
+                List<MaterializedIndex> allIndices;
+                if (transactionState.getLoadedTblIndexes().isEmpty()) {
+                    allIndices = partition.getMaterializedIndices(MaterializedIndex.IndexExtState.ALL);
+                } else {
+                    allIndices = Lists.newArrayList();
+                    for (long indexId : transactionState.getLoadedTblIndexes().get(tableId)) {
+                        MaterializedIndex index = partition.getIndex(indexId);
+                        if (index != null) {
+                            allIndices.add(index);
+                        }
+                    }
+                }
+
+                int quorumReplicaNum = table.getPartitionInfo().getReplicaAllocation(partition.getId()).getTotalReplicaNum() / 2 + 1;
+                for (MaterializedIndex index : allIndices) {
+                    for (Tablet tablet : index.getTablets()) {
+                        int successReplicaNum = 0;
+                        long tabletId = tablet.getId();
+                        Set<Long> tabletBackends = tablet.getBackendIds();
+                        totalInvolvedBackends.addAll(tabletBackends);
+                        Set<Long> commitBackends = tabletToBackends.get(tabletId);
+                        // save the error replica ids for current tablet
+                        // this param is used for log
+                        Set<Long> errorBackendIdsForTablet = Sets.newHashSet();
+                        for (long tabletBackend : tabletBackends) {
+                            Replica replica = tabletInvertedIndex.getReplica(tabletId, tabletBackend);
+                            if (replica == null) {
+                                throw new TransactionCommitFailedException("could not find replica for tablet ["
+                                        + tabletId + "], backend [" + tabletBackend + "]");
+                            }
+                            // if the tablet have no replica's to commit or the tablet is a rolling up tablet, the commit backends maybe null
+                            // if the commit backends is null, set all replicas as error replicas
+                            if (commitBackends != null && commitBackends.contains(tabletBackend)) {
+                                // if the backend load success but the backend has some errors previously, then it is not a normal replica
+                                // ignore it but not log it
+                                // for example, a replica is in clone state
+                                if (replica.getLastFailedVersion() < 0) {
+                                    ++successReplicaNum;
+                                }
+                            } else {
+                                errorBackendIdsForTablet.add(tabletBackend);
+                                errorReplicaIds.add(replica.getId());
+                                // not remove rollup task here, because the commit maybe failed
+                                // remove rollup task when commit successfully
+                            }
+                        }
+
+                        if (successReplicaNum < quorumReplicaNum) {
+                            LOG.warn("Failed to pre-commit txn [{}]. "
+                                            + "Tablet [{}] success replica num is {} < quorum replica num {} "
+                                            + "while error backends {}",
+                                    transactionId, tablet.getId(), successReplicaNum, quorumReplicaNum,
+                                    Joiner.on(",").join(errorBackendIdsForTablet));
+                            throw new TabletQuorumFailedException(transactionId, tablet.getId(),
+                                    successReplicaNum, quorumReplicaNum,
+                                    errorBackendIdsForTablet);
+                        }
+                    }
+                }
+            }
+        }
+
+        unprotectedPreCommitTransaction(transactionState, errorReplicaIds, tableToPartition, totalInvolvedBackends, db);
+        LOG.info("transaction:[{}] successfully pre-committed", transactionState);
+    }
+
+    public TransactionState checkPreCommitStatus(long transactionId) throws UserException {
+        Database db = catalog.getDbOrMetaException(dbId);
+        TransactionState transactionState;
+        readLock();
+        try {
+            transactionState = unprotectedGetTransactionState(transactionId);
+        } finally {
+            readUnlock();
+        }
+
+        if (transactionState == null) {
+            LOG.debug("transaction not found: {}", transactionId);
+            throw new TransactionCommitFailedException("transaction {" + transactionId + "} not found.");
+        }
+
+        transactionState.setCheckTimeout(false);
+
+        if (transactionState.getTransactionStatus() == TransactionStatus.ABORTED) {
+            LOG.debug("transaction is already aborted: {}", transactionId);
+            throw new TransactionCommitFailedException("transaction [" + transactionId
+                    + "] is already aborted, not pre-committed" + transactionState.getReason());
+        }
+
+        if (transactionState.getTransactionStatus() == TransactionStatus.VISIBLE) {
+            LOG.debug("transaction is already visible: {}", transactionId);
+            throw new TransactionCommitFailedException("transaction [" + transactionId
+                    + "] is already visible, not pre-committed");
+        }
+
+        if (transactionState.getTransactionStatus() == TransactionStatus.COMMITTED) {
+            LOG.debug("transaction is already committed: {}", transactionId);
+            throw new TransactionCommitFailedException("transaction [" + transactionId
+                    + "] is already committed, not pre-committed");
+        }
+
+        if (transactionState.getTransactionStatus() == TransactionStatus.PREPARE) {
+            LOG.debug("transaction is prepare, not pre-committed: {}", transactionId);
+            throw new TransactionCommitFailedException("transaction [" + transactionId
+                    + "] is prepare, not pre-committed");
+        }
+
+        long currentTimeMillis = System.currentTimeMillis();
+        // Maybe new  invisible version has not been reported to master FE after the transaction is pre-committed
+        // or FE restart, we should wait a little while to commit transaction.
+        if (currentTimeMillis - transactionState.getPreCommitTime() <
+                Config.stream_load_default_wait_report_second * 1000) {
+            throw new TransactionCommitFailedException("The interval is too short after txn [" + transactionId + "] " +
+                    "was pre-committed. Please try again later.");
+        }
+
+        Set<Long> errorReplicaIds = Sets.newHashSet();
+        Iterator<TableCommitInfo> tableCommitInfoIterator = transactionState.getIdToTableCommitInfos().values().iterator();
+        while (tableCommitInfoIterator.hasNext()) {
+            TableCommitInfo tableCommitInfo = tableCommitInfoIterator.next();
+            long tableId = tableCommitInfo.getTableId();
+            OlapTable table = (OlapTable) db.getTableNullable(tableId);
+            // table maybe dropped between pre-commit and commit, ignore this error
+            if (table == null) {
+                tableCommitInfoIterator.remove();
+                LOG.warn("table {} is dropped, skip version check and remove it from transaction state {}",
+                        tableId,
+                        transactionState);
+                continue;
+            }
+            PartitionInfo partitionInfo = table.getPartitionInfo();
+            Iterator<PartitionCommitInfo> partitionCommitInfoIterator = tableCommitInfo.getIdToPartitionCommitInfo().values().iterator();
+            while (partitionCommitInfoIterator.hasNext()) {
+                PartitionCommitInfo partitionCommitInfo = partitionCommitInfoIterator.next();
+                long partitionId = partitionCommitInfo.getPartitionId();
+                Partition partition = table.getPartition(partitionId);
+                // partition maybe dropped between pre-commit and commit, ignore this error
+                if (partition == null) {
+                    partitionCommitInfoIterator.remove();
+                    LOG.warn("partition {} is dropped, skip version check and remove it from transaction state {}",
+                            partitionId,
+                            transactionState);
+                    continue;
+                }
+
+                int quorumReplicaNum = partitionInfo.getReplicaAllocation(partitionId).getTotalReplicaNum() / 2 + 1;
+
+                List<MaterializedIndex> allIndices;
+                if (transactionState.getLoadedTblIndexes().isEmpty()) {
+                    allIndices = partition.getMaterializedIndices(MaterializedIndex.IndexExtState.ALL);
+                } else {
+                    allIndices = Lists.newArrayList();
+                    for (long indexId : transactionState.getLoadedTblIndexes().get(tableId)) {
+                        MaterializedIndex index = partition.getIndex(indexId);
+                        if (index != null) {
+                            allIndices.add(index);
+                        }
+                    }
+                }
+
+                for (MaterializedIndex index : allIndices) {
+                    for (Tablet tablet : index.getTablets()) {
+                        int healthReplicaNum = 0;
+                        for (Replica replica : tablet.getReplicas()) {
+                            List<Long> committedTxnIds = replica.getCommittedTxnIds();
+                            if (committedTxnIds.contains(transactionId)) {
+                                ++healthReplicaNum;
+                            } else {
+                                errorReplicaIds.add(replica.getId());
+                            }
+                        }
+
+                        if (healthReplicaNum < quorumReplicaNum) {
+                            LOG.warn("failed to commit transaction {} on tablet {}, with only {} replicas less than quorum {}, txnId: {}",
+                                    transactionState, tablet, healthReplicaNum, quorumReplicaNum, transactionId);
+                            throw new TransactionCommitFailedException("tablet [" + tablet.getId() +"], with only ["
+                                    +healthReplicaNum+ "] replicas less than quorum [" +quorumReplicaNum+ "], txnId ["
+                                    + transactionId + "]");
+                        }
+                    }
+                }
+            }
+        }
+        transactionState.setErrorReplicas(errorReplicaIds);
+
+        return transactionState;
+    }
+
+    public void commitTransaction(List<Long> transactionIds) throws UserException {
+        try {
+            List<TransactionState> transactionStates = Lists.newArrayList();
+            for (long transactionId : transactionIds) {
+                transactionStates.add(checkPreCommitStatus(transactionId));

Review comment:
       If the txn is in PRECOMMITTED, it should be possible to COMMIT it, so why do we still need to call `checkPreCommitStatus()` here?
   The only thing `checkPreCommitStatus()` needs to do is check whether the txn state is PRECOMMITTED, but I see there are other checks in `checkPreCommitStatus()`. Why?
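
For context, the extra checks in `checkPreCommitStatus()` in the hunk above are a wait interval (`Config.stream_load_default_wait_report_second * 1000`) and a per-tablet quorum check on `replica.getCommittedTxnIds()`. The quorum arithmetic can be sketched in isolation as below. This is only an illustration: `QuorumSketch`, `quorum`, `healthyReplicas` and `hasQuorum` are hypothetical names, and plain lists stand in for the Doris `Replica` objects.

```java
import java.util.List;

// Hypothetical stand-alone sketch; not part of the PR.
class QuorumSketch {
    // Majority quorum, mirroring `getTotalReplicaNum() / 2 + 1` in the diff.
    static int quorum(int totalReplicaNum) {
        return totalReplicaNum / 2 + 1;
    }

    // Counts replicas whose committed txn ids contain the given txn id.
    // Each inner list stands in for one replica's getCommittedTxnIds().
    static int healthyReplicas(List<List<Long>> committedTxnIdsPerReplica, long txnId) {
        int healthy = 0;
        for (List<Long> ids : committedTxnIdsPerReplica) {
            if (ids.contains(txnId)) {
                healthy++;
            }
        }
        return healthy;
    }

    // True when enough replicas report the txn to reach the majority quorum.
    // totalReplicaNum is passed in because the diff takes it from the
    // partition's replica allocation, not from the tablet's replica list.
    static boolean hasQuorum(List<List<Long>> committedTxnIdsPerReplica, long txnId,
                             int totalReplicaNum) {
        return healthyReplicas(committedTxnIdsPerReplica, txnId) >= quorum(totalReplicaNum);
    }
}
```

With three replicas the quorum is two, so one replica that has not yet reported the txn does not block the commit, while two such replicas do.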




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org


[GitHub] [incubator-doris] morningman commented on a change in pull request #7473: [Feature][Transaction] Support two phase commit for stream load

Posted by GitBox <gi...@apache.org>.
morningman commented on a change in pull request #7473:
URL: https://github.com/apache/incubator-doris/pull/7473#discussion_r803255419



##########
File path: fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java
##########
@@ -511,16 +521,112 @@ public void commitTransaction(List<Table> tableList, long transactionId, List<Ta
                             LOG.warn("Failed to commit txn [{}]. "
                                             + "Tablet [{}] success replica num is {} < quorum replica num {} "
                                             + "while error backends {}",
-                                    transactionId, tablet.getId(), successReplicaNum, quorumReplicaNum,
+                                    transactionState.getTransactionId(), tablet.getId(), successReplicaNum, quorumReplicaNum,
                                     Joiner.on(",").join(errorBackendIdsForTablet));
-                            throw new TabletQuorumFailedException(transactionId, tablet.getId(),
+                            throw new TabletQuorumFailedException(transactionState.getTransactionId(), tablet.getId(),
                                     successReplicaNum, quorumReplicaNum,
                                     errorBackendIdsForTablet);
                         }
                     }
                 }
             }
         }
+    }
+
+    public void commitTransaction2PC(long transactionId) throws UserException {
+        Database db = catalog.getDbOrMetaException(dbId);
+        TransactionState transactionState;
+        readLock();
+        try {
+            transactionState = unprotectedGetTransactionState(transactionId);
+        } finally {
+            readUnlock();
+        }
+
+        if (transactionState == null) {
+            LOG.debug("transaction not found: {}", transactionId);
+            throw new TransactionCommitFailedException("transaction {" + transactionId + "} not found.");
+        }
+
+        if (transactionState.getTransactionStatus() == TransactionStatus.ABORTED) {

Review comment:
       All the following `if` conditions can be merged into one: `if (status != PRECOMMITTED)`
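
A minimal sketch of what the merged check might look like, assuming the error message only needs to name the actual status. `StatusCheckSketch` and `ensurePreCommitted` are hypothetical names, the enum is a local stand-in that lists only the statuses visible in this diff, and `IllegalStateException` stands in for `TransactionCommitFailedException`.

```java
// Hypothetical stand-alone sketch; not part of the PR.
class StatusCheckSketch {
    // Local stand-in for the statuses referenced in the diff.
    enum TransactionStatus { PREPARE, PRECOMMITTED, COMMITTED, VISIBLE, ABORTED }

    // Single guard replacing the separate ABORTED / VISIBLE / COMMITTED / PREPARE branches.
    static void ensurePreCommitted(long txnId, TransactionStatus status) {
        if (status != TransactionStatus.PRECOMMITTED) {
            // The real code would throw TransactionCommitFailedException instead.
            throw new IllegalStateException("transaction [" + txnId + "] is "
                    + status + ", not pre-committed");
        }
    }
}
```

One trade-off of merging: the ABORTED branch in the original also appends `transactionState.getReason()`, so a merged check would need to keep that detail if it matters to callers.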

##########
File path: fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java
##########
@@ -837,6 +956,30 @@ public void finishTransaction(long transactionId, Set<Long> errorReplicaIds) thr
         LOG.info("finish transaction {} successfully", transactionState);
     }
 
+    protected void unprotectedPreCommitTransaction2PC(TransactionState transactionState, Set<Long> errorReplicaIds,
+                                                Map<Long, Set<Long>> tableToPartition, Set<Long> totalInvolvedBackends,
+                                                Database db) {
+        // transaction state is modified during check if the transaction could committed
+        if (transactionState.getTransactionStatus() != TransactionStatus.PREPARE) {
+            return;
+        }
+        // update transaction state version
+        transactionState.setPreCommitTime(System.currentTimeMillis());
+        transactionState.setTransactionStatus(TransactionStatus.PRECOMMITTED);
+        transactionState.setErrorReplicas(errorReplicaIds);
+        for (long tableId : tableToPartition.keySet()) {
+            TableCommitInfo tableCommitInfo = new TableCommitInfo(tableId);
+            for (long partitionId : tableToPartition.get(tableId)) {
+                OlapTable table = (OlapTable) db.getTableNullable(tableId);

Review comment:
       The `table` variable is not used.






[GitHub] [incubator-doris] weizuo93 commented on a change in pull request #7473: [Feature][Transaction] Support two phase commit (2PC) for stream load

Posted by GitBox <gi...@apache.org>.
weizuo93 commented on a change in pull request #7473:
URL: https://github.com/apache/incubator-doris/pull/7473#discussion_r805733884



##########
File path: be/src/http/action/stream_load_2pc.cpp
##########
@@ -0,0 +1,96 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "http/action/stream_load_2pc.h"
+
+#include <rapidjson/prettywriter.h>
+#include <rapidjson/stringbuffer.h>
+
+#include "common/status.h"
+#include "http/http_channel.h"
+#include "http/http_headers.h"
+#include "http/http_request.h"
+#include "http/http_status.h"
+#include "http/utils.h"
+#include "runtime/stream_load/stream_load_context.h"
+#include "runtime/stream_load/stream_load_executor.h"
+#include "util/json_util.h"
+
+namespace doris {
+
+const static std::string HEADER_JSON = "application/json";
+
+StreamLoad2PCAction::StreamLoad2PCAction(ExecEnv* exec_env) : _exec_env(exec_env) {}
+
+void StreamLoad2PCAction::handle(HttpRequest* req) {
+    Status status = Status::OK();
+    std::string status_result;
+
+    if (config::disable_stream_load_2pc) {
+        status = Status::InternalError("Two phase commit (2PC) for stream load was disabled");
+        status_result = to_json(status);
+        HttpChannel::send_reply(req, HttpStatus::OK, status_result);
+        return;
+    }
+
+    StreamLoadContext* ctx = new StreamLoadContext(_exec_env);
+    ctx->ref();
+    req->set_handler_ctx(ctx);
+    ctx->db = req->param(HTTP_DB_KEY);
+    std::string req_txn_id = req->header(HTTP_TXN_ID_KEY);
+    try {
+        ctx->txn_id = std::stoull(req_txn_id);
+    } catch (const std::exception& e) {
+        status = Status::InternalError("convert txn_id [" + req_txn_id + "] failed");
+        status_result = to_json(status);
+        HttpChannel::send_reply(req, HttpStatus::OK, status_result);
+        return;
+    }
+    ctx->txn_operation = req->header(HTTP_TXN_OPERATION_KEY);

Review comment:
       > check invalid parameter?
   
   OK, thank you.
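
One possible shape for the parameter check discussed here, written in Java for consistency with the FE code in this thread even though the handler itself is C++. It is only a sketch under assumptions: the accepted values `commit` and `abort`, the case-insensitive matching, and the names `TxnOperationSketch`, `Operation` and `parse` are not taken from the PR.

```java
import java.util.Locale;

// Hypothetical stand-alone sketch; not part of the PR.
class TxnOperationSketch {
    enum Operation { COMMIT, ABORT }

    // Rejects a missing, blank, or unknown operation value before any
    // transaction work starts. The accepted values are assumptions.
    static Operation parse(String headerValue) {
        if (headerValue == null || headerValue.trim().isEmpty()) {
            throw new IllegalArgumentException("txn operation header is missing");
        }
        String normalized = headerValue.trim().toLowerCase(Locale.ROOT);
        switch (normalized) {
            case "commit":
                return Operation.COMMIT;
            case "abort":
                return Operation.ABORT;
            default:
                throw new IllegalArgumentException(
                        "invalid txn operation [" + headerValue + "]");
        }
    }
}
```

Validating up front keeps an unrecognized value from reaching the commit or abort path, where it would be harder to report back to the client.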






[GitHub] [incubator-doris] github-actions[bot] commented on pull request #7473: [Feature][Transaction] Support two phase commit (2PC) for stream load

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on pull request #7473:
URL: https://github.com/apache/incubator-doris/pull/7473#issuecomment-1040343251





