You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by GitBox <gi...@apache.org> on 2022/05/09 09:10:59 UTC

[GitHub] [incubator-doris] pengxiangyu commented on a diff in pull request #9471: [fix]binlog load fails because txn exceeds the default value

pengxiangyu commented on code in PR #9471:
URL: https://github.com/apache/incubator-doris/pull/9471#discussion_r867797240


##########
fe/fe-core/src/main/java/org/apache/doris/load/sync/canal/CanalSyncChannel.java:
##########
@@ -121,53 +123,60 @@ public void beginTxn(long batchId) throws UserException, TException, TimeoutExce
                     + "_batch" + batchId + "_" + currentTime;
             String targetColumn = Joiner.on(",").join(columns) + "," + DELETE_COLUMN;
             GlobalTransactionMgr globalTransactionMgr = Catalog.getCurrentGlobalTransactionMgr();
-            TransactionEntry txnEntry = txnExecutor.getTxnEntry();
-            TTxnParams txnConf = txnEntry.getTxnConf();
-            TransactionState.LoadJobSourceType sourceType = TransactionState.LoadJobSourceType.INSERT_STREAMING;
-            TStreamLoadPutRequest request = null;
-            try {
-                long txnId = globalTransactionMgr.beginTransaction(db.getId(), Lists.newArrayList(tbl.getId()), label,
-                        new TransactionState.TxnCoordinator(TransactionState.TxnSourceType.FE, FrontendOptions.getLocalHostAddress()), sourceType, timeoutSecond);
-                String authCodeUuid = Catalog.getCurrentGlobalTransactionMgr().getTransactionState(
-                        db.getId(), txnId).getAuthCode();
-                request = new TStreamLoadPutRequest()
-                        .setTxnId(txnId).setDb(txnConf.getDb()).setTbl(txnConf.getTbl())
-                        .setFileType(TFileType.FILE_STREAM).setFormatType(TFileFormatType.FORMAT_CSV_PLAIN)
-                        .setThriftRpcTimeoutMs(5000).setLoadId(txnExecutor.getLoadId())
-                        .setMergeType(TMergeType.MERGE).setDeleteCondition(DELETE_CONDITION)
-                        .setColumns(targetColumn);
-                txnConf.setTxnId(txnId).setAuthCodeUuid(authCodeUuid);
-                txnEntry.setLabel(label);
-                txnExecutor.setTxnId(txnId);
-            } catch (DuplicatedRequestException e) {
-                LOG.warn("duplicate request for sync channel. channel: {}, request id: {}, txn: {}, table: {}",
-                        id, e.getDuplicatedRequestId(), e.getTxnId(), targetTable);
-                txnExecutor.setTxnId(e.getTxnId());
-            } catch (LabelAlreadyUsedException e) {
-                // this happens when channel re-consume same batch, we should just pass through it without begin a new txn
-                LOG.warn("Label already used in channel {}, label: {}, table: {}, batch: {}", id, label, targetTable, batchId);
-                return;
-            } catch (AnalysisException | BeginTransactionException e) {
-                LOG.warn("encounter an error when beginning txn in channel {}, table: {}", id, targetTable);
-                throw e;
-            } catch (UserException e) {
-                LOG.warn("encounter an error when creating plan in channel {}, table: {}", id, targetTable);
-                throw e;
-            }
-            try {
-                // async exec begin transaction
-                long txnId = txnExecutor.getTxnId();
-                if (txnId != -1L) {
-                    this.txnExecutor.beginTransaction(request);
-                    LOG.info("begin txn in channel {}, table: {}, label:{}, txn id: {}", id, targetTable, label, txnExecutor.getTxnId());
+            DatabaseTransactionMgr databaseTransactionMgr = globalTransactionMgr.getDatabaseTransactionMgr(db.getId());
+            if(databaseTransactionMgr.getRunningTxnNums() < Config.max_running_txn_num_per_db ) {

Review Comment:
   There is an unnecessary space after `max_running_txn_num_per_db`.



##########
fe/fe-core/src/main/java/org/apache/doris/load/sync/canal/CanalSyncChannel.java:
##########
@@ -121,53 +123,60 @@ public void beginTxn(long batchId) throws UserException, TException, TimeoutExce
                     + "_batch" + batchId + "_" + currentTime;
             String targetColumn = Joiner.on(",").join(columns) + "," + DELETE_COLUMN;
             GlobalTransactionMgr globalTransactionMgr = Catalog.getCurrentGlobalTransactionMgr();
-            TransactionEntry txnEntry = txnExecutor.getTxnEntry();
-            TTxnParams txnConf = txnEntry.getTxnConf();
-            TransactionState.LoadJobSourceType sourceType = TransactionState.LoadJobSourceType.INSERT_STREAMING;
-            TStreamLoadPutRequest request = null;
-            try {
-                long txnId = globalTransactionMgr.beginTransaction(db.getId(), Lists.newArrayList(tbl.getId()), label,
-                        new TransactionState.TxnCoordinator(TransactionState.TxnSourceType.FE, FrontendOptions.getLocalHostAddress()), sourceType, timeoutSecond);
-                String authCodeUuid = Catalog.getCurrentGlobalTransactionMgr().getTransactionState(
-                        db.getId(), txnId).getAuthCode();
-                request = new TStreamLoadPutRequest()
-                        .setTxnId(txnId).setDb(txnConf.getDb()).setTbl(txnConf.getTbl())
-                        .setFileType(TFileType.FILE_STREAM).setFormatType(TFileFormatType.FORMAT_CSV_PLAIN)
-                        .setThriftRpcTimeoutMs(5000).setLoadId(txnExecutor.getLoadId())
-                        .setMergeType(TMergeType.MERGE).setDeleteCondition(DELETE_CONDITION)
-                        .setColumns(targetColumn);
-                txnConf.setTxnId(txnId).setAuthCodeUuid(authCodeUuid);
-                txnEntry.setLabel(label);
-                txnExecutor.setTxnId(txnId);
-            } catch (DuplicatedRequestException e) {
-                LOG.warn("duplicate request for sync channel. channel: {}, request id: {}, txn: {}, table: {}",
-                        id, e.getDuplicatedRequestId(), e.getTxnId(), targetTable);
-                txnExecutor.setTxnId(e.getTxnId());
-            } catch (LabelAlreadyUsedException e) {
-                // this happens when channel re-consume same batch, we should just pass through it without begin a new txn
-                LOG.warn("Label already used in channel {}, label: {}, table: {}, batch: {}", id, label, targetTable, batchId);
-                return;
-            } catch (AnalysisException | BeginTransactionException e) {
-                LOG.warn("encounter an error when beginning txn in channel {}, table: {}", id, targetTable);
-                throw e;
-            } catch (UserException e) {
-                LOG.warn("encounter an error when creating plan in channel {}, table: {}", id, targetTable);
-                throw e;
-            }
-            try {
-                // async exec begin transaction
-                long txnId = txnExecutor.getTxnId();
-                if (txnId != -1L) {
-                    this.txnExecutor.beginTransaction(request);
-                    LOG.info("begin txn in channel {}, table: {}, label:{}, txn id: {}", id, targetTable, label, txnExecutor.getTxnId());
+            DatabaseTransactionMgr databaseTransactionMgr = globalTransactionMgr.getDatabaseTransactionMgr(db.getId());
+            if(databaseTransactionMgr.getRunningTxnNums() < Config.max_running_txn_num_per_db ) {
+                TransactionEntry txnEntry = txnExecutor.getTxnEntry ();
+                TTxnParams txnConf = txnEntry.getTxnConf ();
+                TransactionState.LoadJobSourceType sourceType = TransactionState.LoadJobSourceType.INSERT_STREAMING;
+                TStreamLoadPutRequest request = null;
+                try {
+                    long txnId = globalTransactionMgr.beginTransaction (db.getId (), Lists.newArrayList (tbl.getId ()), label,
+                        new TransactionState.TxnCoordinator (TransactionState.TxnSourceType.FE, FrontendOptions.getLocalHostAddress ()), sourceType, timeoutSecond);
+                    String authCodeUuid = Catalog.getCurrentGlobalTransactionMgr ().getTransactionState (
+                        db.getId (), txnId).getAuthCode ();
+                    request = new TStreamLoadPutRequest ()
+                        .setTxnId (txnId).setDb (txnConf.getDb ()).setTbl (txnConf.getTbl ())
+                        .setFileType (TFileType.FILE_STREAM).setFormatType (TFileFormatType.FORMAT_CSV_PLAIN)
+                        .setThriftRpcTimeoutMs (5000).setLoadId (txnExecutor.getLoadId ())
+                        .setMergeType (TMergeType.MERGE).setDeleteCondition (DELETE_CONDITION)
+                        .setColumns (targetColumn);
+                    txnConf.setTxnId (txnId).setAuthCodeUuid (authCodeUuid);
+                    txnEntry.setLabel (label);
+                    txnExecutor.setTxnId (txnId);
+                } catch ( DuplicatedRequestException e ) {
+                    LOG.warn ("duplicate request for sync channel. channel: {}, request id: {}, txn: {}, table: {}",
+                        id, e.getDuplicatedRequestId (), e.getTxnId (), targetTable);
+                    txnExecutor.setTxnId (e.getTxnId ());
+                } catch ( LabelAlreadyUsedException e ) {
+                    // this happens when channel re-consume same batch, we should just pass through it without begin a new txn
+                    LOG.warn ("Label already used in channel {}, label: {}, table: {}, batch: {}", id, label, targetTable, batchId);
+                    return;
+                } catch ( AnalysisException | BeginTransactionException e ) {
+                    LOG.warn ("encounter an error when beginning txn in channel {}, table: {}", id, targetTable);
+                    throw e;
+                } catch ( UserException e ) {
+                    LOG.warn ("encounter an error when creating plan in channel {}, table: {}", id, targetTable);
+                    throw e;
                 }
-            } catch (TException e) {
-                LOG.warn("Failed to begin txn in channel {}, table: {}, txn: {}, msg:{}", id, targetTable, txnExecutor.getTxnId(), e.getMessage());
-                throw e;
-            } catch (TimeoutException | InterruptedException | ExecutionException e) {
-                LOG.warn("Error occur while waiting begin txn response in channel {}, table: {}, txn: {}, msg:{}",
-                        id, targetTable, txnExecutor.getTxnId(), e.getMessage());
-                throw e;
+                try {
+                    // async exec begin transaction
+                    long txnId = txnExecutor.getTxnId ();
+                    if ( txnId != - 1L ) {
+                        this.txnExecutor.beginTransaction (request);
+                        LOG.info ("begin txn in channel {}, table: {}, label:{}, txn id: {}", id, targetTable, label, txnExecutor.getTxnId ());
+                    }
+                } catch ( TException e ) {
+                    LOG.warn ("Failed to begin txn in channel {}, table: {}, txn: {}, msg:{}", id, targetTable, txnExecutor.getTxnId (), e.getMessage ());
+                    throw e;
+                } catch ( TimeoutException | InterruptedException | ExecutionException e ) {
+                    LOG.warn ("Error occur while waiting begin txn response in channel {}, table: {}, txn: {}, msg:{}",
+                        id, targetTable, txnExecutor.getTxnId (), e.getMessage ());
+                    throw e;
+                }
+            } else {
+                String failMsg = "current running txns on db " + db.getId () + " is "
+                    + databaseTransactionMgr.getRunningTxnNums () + ", larger than limit " + Config.max_running_txn_num_per_db;

Review Comment:
   It would be better to add a LOG.warn() here.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org