You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by mo...@apache.org on 2022/05/12 05:31:27 UTC

[incubator-doris] branch master updated: [fix](binlog-load) binlog load fails because txn exceeds the default value (#9471)

This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git


The following commit(s) were added to refs/heads/master by this push:
     new d7705ace65 [fix](binlog-load) binlog load fails because txn exceeds the default value (#9471)
d7705ace65 is described below

commit d7705ace659c297754a1949a15a8950ee208724a
Author: jiafeng.zhang <zh...@gmail.com>
AuthorDate: Thu May 12 13:31:22 2022 +0800

    [fix](binlog-load) binlog load fails because txn exceeds the default value (#9471)
    
    When a binlog load is resumed while the number of running txns exceeds the
    default limit, the resume now fails immediately with a friendly message to the user.
    Previously it reported success and then failed after a while, which left users confused.
    Issue Number: close #9468
---
 .../doris/load/sync/canal/CanalSyncChannel.java    | 94 +++++++++++++---------
 .../doris/transaction/DatabaseTransactionMgr.java  |  2 +-
 2 files changed, 57 insertions(+), 39 deletions(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/sync/canal/CanalSyncChannel.java b/fe/fe-core/src/main/java/org/apache/doris/load/sync/canal/CanalSyncChannel.java
index 2b71619dcf..5d0774b54a 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/sync/canal/CanalSyncChannel.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/sync/canal/CanalSyncChannel.java
@@ -22,6 +22,7 @@ import org.apache.doris.catalog.Catalog;
 import org.apache.doris.catalog.Database;
 import org.apache.doris.catalog.OlapTable;
 import org.apache.doris.common.AnalysisException;
+import org.apache.doris.common.Config;
 import org.apache.doris.common.DuplicatedRequestException;
 import org.apache.doris.common.LabelAlreadyUsedException;
 import org.apache.doris.common.UserException;
@@ -41,6 +42,7 @@ import org.apache.doris.thrift.TStreamLoadPutRequest;
 import org.apache.doris.thrift.TTxnParams;
 import org.apache.doris.thrift.TUniqueId;
 import org.apache.doris.transaction.BeginTransactionException;
+import org.apache.doris.transaction.DatabaseTransactionMgr;
 import org.apache.doris.transaction.GlobalTransactionMgr;
 import org.apache.doris.transaction.TransactionEntry;
 import org.apache.doris.transaction.TransactionState;
@@ -121,53 +123,69 @@ public class CanalSyncChannel extends SyncChannel {
                     + "_batch" + batchId + "_" + currentTime;
             String targetColumn = Joiner.on(",").join(columns) + "," + DELETE_COLUMN;
             GlobalTransactionMgr globalTransactionMgr = Catalog.getCurrentGlobalTransactionMgr();
-            TransactionEntry txnEntry = txnExecutor.getTxnEntry();
-            TTxnParams txnConf = txnEntry.getTxnConf();
-            TransactionState.LoadJobSourceType sourceType = TransactionState.LoadJobSourceType.INSERT_STREAMING;
-            TStreamLoadPutRequest request = null;
-            try {
-                long txnId = globalTransactionMgr.beginTransaction(db.getId(), Lists.newArrayList(tbl.getId()), label,
-                        new TransactionState.TxnCoordinator(TransactionState.TxnSourceType.FE, FrontendOptions.getLocalHostAddress()), sourceType, timeoutSecond);
-                String authCodeUuid = Catalog.getCurrentGlobalTransactionMgr().getTransactionState(
+            DatabaseTransactionMgr databaseTransactionMgr = globalTransactionMgr.getDatabaseTransactionMgr(db.getId());
+            if (databaseTransactionMgr.getRunningTxnNums() < Config.max_running_txn_num_per_db) {
+                TransactionEntry txnEntry = txnExecutor.getTxnEntry();
+                TTxnParams txnConf = txnEntry.getTxnConf();
+                TransactionState.LoadJobSourceType sourceType = TransactionState.LoadJobSourceType.INSERT_STREAMING;
+                TStreamLoadPutRequest request = null;
+                try {
+                    long txnId = globalTransactionMgr.beginTransaction(db.getId(),
+                        Lists.newArrayList(tbl.getId()), label,
+                        new TransactionState.TxnCoordinator(TransactionState.TxnSourceType.FE,
+                            FrontendOptions.getLocalHostAddress()), sourceType, timeoutSecond);
+                    String authCodeUuid = Catalog.getCurrentGlobalTransactionMgr().getTransactionState(
                         db.getId(), txnId).getAuthCode();
-                request = new TStreamLoadPutRequest()
+                    request = new TStreamLoadPutRequest()
                         .setTxnId(txnId).setDb(txnConf.getDb()).setTbl(txnConf.getTbl())
                         .setFileType(TFileType.FILE_STREAM).setFormatType(TFileFormatType.FORMAT_CSV_PLAIN)
                         .setThriftRpcTimeoutMs(5000).setLoadId(txnExecutor.getLoadId())
                         .setMergeType(TMergeType.MERGE).setDeleteCondition(DELETE_CONDITION)
                         .setColumns(targetColumn);
-                txnConf.setTxnId(txnId).setAuthCodeUuid(authCodeUuid);
-                txnEntry.setLabel(label);
-                txnExecutor.setTxnId(txnId);
-            } catch (DuplicatedRequestException e) {
-                LOG.warn("duplicate request for sync channel. channel: {}, request id: {}, txn: {}, table: {}",
+                    txnConf.setTxnId(txnId).setAuthCodeUuid(authCodeUuid);
+                    txnEntry.setLabel(label);
+                    txnExecutor.setTxnId (txnId);
+                } catch (DuplicatedRequestException e) {
+                    LOG.warn ("duplicate request for sync channel. channel: {}, request id: {}, txn: {}, table: {}",
                         id, e.getDuplicatedRequestId(), e.getTxnId(), targetTable);
-                txnExecutor.setTxnId(e.getTxnId());
-            } catch (LabelAlreadyUsedException e) {
-                // this happens when channel re-consume same batch, we should just pass through it without begin a new txn
-                LOG.warn("Label already used in channel {}, label: {}, table: {}, batch: {}", id, label, targetTable, batchId);
-                return;
-            } catch (AnalysisException | BeginTransactionException e) {
-                LOG.warn("encounter an error when beginning txn in channel {}, table: {}", id, targetTable);
-                throw e;
-            } catch (UserException e) {
-                LOG.warn("encounter an error when creating plan in channel {}, table: {}", id, targetTable);
-                throw e;
-            }
-            try {
-                // async exec begin transaction
-                long txnId = txnExecutor.getTxnId();
-                if (txnId != -1L) {
-                    this.txnExecutor.beginTransaction(request);
-                    LOG.info("begin txn in channel {}, table: {}, label:{}, txn id: {}", id, targetTable, label, txnExecutor.getTxnId());
+                    txnExecutor.setTxnId(e.getTxnId());
+                } catch (LabelAlreadyUsedException e) {
+                    // this happens when channel re-consume same batch,
+                    // we should just pass through it without begin a new txn
+                    LOG.warn ("Label already used in channel {}, label: {}, table: {}, batch: {}",
+                        id, label, targetTable, batchId);
+                    return;
+                } catch (AnalysisException | BeginTransactionException e) {
+                    LOG.warn ("encounter an error when beginning txn in channel {}, table: {}",
+                        id, targetTable);
+                    throw e;
+                } catch (UserException e) {
+                    LOG.warn ("encounter an error when creating plan in channel {}, table: {}",
+                        id, targetTable);
+                    throw e;
                 }
-            } catch (TException e) {
-                LOG.warn("Failed to begin txn in channel {}, table: {}, txn: {}, msg:{}", id, targetTable, txnExecutor.getTxnId(), e.getMessage());
-                throw e;
-            } catch (TimeoutException | InterruptedException | ExecutionException e) {
-                LOG.warn("Error occur while waiting begin txn response in channel {}, table: {}, txn: {}, msg:{}",
+                try {
+                    // async exec begin transaction
+                    long txnId = txnExecutor.getTxnId();
+                    if ( txnId != - 1L ) {
+                        this.txnExecutor.beginTransaction (request);
+                        LOG.info ("begin txn in channel {}, table: {}, label:{}, txn id: {}",
+                            id, targetTable, label, txnExecutor.getTxnId());
+                    }
+                } catch ( TException e) {
+                    LOG.warn ("Failed to begin txn in channel {}, table: {}, txn: {}, msg:{}",
+                        id, targetTable, txnExecutor.getTxnId(), e.getMessage());
+                    throw e;
+                } catch ( TimeoutException | InterruptedException | ExecutionException e) {
+                    LOG.warn ("Error occur while waiting begin txn response in channel {}, table: {}, txn: {}, msg:{}",
                         id, targetTable, txnExecutor.getTxnId(), e.getMessage());
-                throw e;
+                    throw e;
+                }
+            } else {
+                String failMsg = "current running txns on db " + db.getId() + " is "
+                    + databaseTransactionMgr.getRunningTxnNums() + ", larger than limit " + Config.max_running_txn_num_per_db;
+                LOG.warn(failMsg);
+                throw new BeginTransactionException(failMsg);
             }
         }
     }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java b/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java
index 9ce906fb79..bf530c5ef4 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java
@@ -195,7 +195,7 @@ public class DatabaseTransactionMgr {
         return labelToTxnIds.get(label);
     }
 
-    protected int getRunningTxnNums() {
+    public int getRunningTxnNums() {
         return runningTxnNums;
     }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org