Posted to commits@doris.apache.org by GitBox <gi...@apache.org> on 2022/05/09 09:03:04 UTC

[GitHub] [incubator-doris] hf200012 opened a new pull request, #9471: Binlog load fix

hf200012 opened a new pull request, #9471:
URL: https://github.com/apache/incubator-doris/pull/9471

   # Proposed changes
   
   Issue Number: close #9468
   
   ## Problem Summary:
   
   Describe the overview of changes.
   
   ## Checklist(Required)
   
   1. Does it affect the original behavior: (Yes/No/I Don't know)
   2. Has unit tests been added: (Yes/No/No Need)
   3. Has document been added or modified: (Yes/No/No Need)
   4. Does it need to update dependencies: (Yes/No)
   5. Are there any changes that cannot be rolled back: (Yes/No)
   
   ## Further comments
   
   If this is a relatively large or complex change, kick off the discussion at [dev@doris.apache.org](mailto:dev@doris.apache.org) by explaining why you chose the solution you did and what alternatives you considered, etc...
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org


[GitHub] [incubator-doris] pengxiangyu commented on a diff in pull request #9471: [fix]binlog load fails because txn exceeds the default value

Posted by GitBox <gi...@apache.org>.
pengxiangyu commented on code in PR #9471:
URL: https://github.com/apache/incubator-doris/pull/9471#discussion_r867797240


##########
fe/fe-core/src/main/java/org/apache/doris/load/sync/canal/CanalSyncChannel.java:
##########
@@ -121,53 +123,60 @@ public void beginTxn(long batchId) throws UserException, TException, TimeoutExce
                     + "_batch" + batchId + "_" + currentTime;
             String targetColumn = Joiner.on(",").join(columns) + "," + DELETE_COLUMN;
             GlobalTransactionMgr globalTransactionMgr = Catalog.getCurrentGlobalTransactionMgr();
-            TransactionEntry txnEntry = txnExecutor.getTxnEntry();
-            TTxnParams txnConf = txnEntry.getTxnConf();
-            TransactionState.LoadJobSourceType sourceType = TransactionState.LoadJobSourceType.INSERT_STREAMING;
-            TStreamLoadPutRequest request = null;
-            try {
-                long txnId = globalTransactionMgr.beginTransaction(db.getId(), Lists.newArrayList(tbl.getId()), label,
-                        new TransactionState.TxnCoordinator(TransactionState.TxnSourceType.FE, FrontendOptions.getLocalHostAddress()), sourceType, timeoutSecond);
-                String authCodeUuid = Catalog.getCurrentGlobalTransactionMgr().getTransactionState(
-                        db.getId(), txnId).getAuthCode();
-                request = new TStreamLoadPutRequest()
-                        .setTxnId(txnId).setDb(txnConf.getDb()).setTbl(txnConf.getTbl())
-                        .setFileType(TFileType.FILE_STREAM).setFormatType(TFileFormatType.FORMAT_CSV_PLAIN)
-                        .setThriftRpcTimeoutMs(5000).setLoadId(txnExecutor.getLoadId())
-                        .setMergeType(TMergeType.MERGE).setDeleteCondition(DELETE_CONDITION)
-                        .setColumns(targetColumn);
-                txnConf.setTxnId(txnId).setAuthCodeUuid(authCodeUuid);
-                txnEntry.setLabel(label);
-                txnExecutor.setTxnId(txnId);
-            } catch (DuplicatedRequestException e) {
-                LOG.warn("duplicate request for sync channel. channel: {}, request id: {}, txn: {}, table: {}",
-                        id, e.getDuplicatedRequestId(), e.getTxnId(), targetTable);
-                txnExecutor.setTxnId(e.getTxnId());
-            } catch (LabelAlreadyUsedException e) {
-                // this happens when channel re-consume same batch, we should just pass through it without begin a new txn
-                LOG.warn("Label already used in channel {}, label: {}, table: {}, batch: {}", id, label, targetTable, batchId);
-                return;
-            } catch (AnalysisException | BeginTransactionException e) {
-                LOG.warn("encounter an error when beginning txn in channel {}, table: {}", id, targetTable);
-                throw e;
-            } catch (UserException e) {
-                LOG.warn("encounter an error when creating plan in channel {}, table: {}", id, targetTable);
-                throw e;
-            }
-            try {
-                // async exec begin transaction
-                long txnId = txnExecutor.getTxnId();
-                if (txnId != -1L) {
-                    this.txnExecutor.beginTransaction(request);
-                    LOG.info("begin txn in channel {}, table: {}, label:{}, txn id: {}", id, targetTable, label, txnExecutor.getTxnId());
+            DatabaseTransactionMgr databaseTransactionMgr = globalTransactionMgr.getDatabaseTransactionMgr(db.getId());
+            if(databaseTransactionMgr.getRunningTxnNums() < Config.max_running_txn_num_per_db ) {

Review Comment:
   There is an unnecessary space after `max_running_txn_num_per_db`, before the closing parenthesis.



##########
fe/fe-core/src/main/java/org/apache/doris/load/sync/canal/CanalSyncChannel.java:
##########
@@ -121,53 +123,60 @@ public void beginTxn(long batchId) throws UserException, TException, TimeoutExce
                     + "_batch" + batchId + "_" + currentTime;
             String targetColumn = Joiner.on(",").join(columns) + "," + DELETE_COLUMN;
             GlobalTransactionMgr globalTransactionMgr = Catalog.getCurrentGlobalTransactionMgr();
-            TransactionEntry txnEntry = txnExecutor.getTxnEntry();
-            TTxnParams txnConf = txnEntry.getTxnConf();
-            TransactionState.LoadJobSourceType sourceType = TransactionState.LoadJobSourceType.INSERT_STREAMING;
-            TStreamLoadPutRequest request = null;
-            try {
-                long txnId = globalTransactionMgr.beginTransaction(db.getId(), Lists.newArrayList(tbl.getId()), label,
-                        new TransactionState.TxnCoordinator(TransactionState.TxnSourceType.FE, FrontendOptions.getLocalHostAddress()), sourceType, timeoutSecond);
-                String authCodeUuid = Catalog.getCurrentGlobalTransactionMgr().getTransactionState(
-                        db.getId(), txnId).getAuthCode();
-                request = new TStreamLoadPutRequest()
-                        .setTxnId(txnId).setDb(txnConf.getDb()).setTbl(txnConf.getTbl())
-                        .setFileType(TFileType.FILE_STREAM).setFormatType(TFileFormatType.FORMAT_CSV_PLAIN)
-                        .setThriftRpcTimeoutMs(5000).setLoadId(txnExecutor.getLoadId())
-                        .setMergeType(TMergeType.MERGE).setDeleteCondition(DELETE_CONDITION)
-                        .setColumns(targetColumn);
-                txnConf.setTxnId(txnId).setAuthCodeUuid(authCodeUuid);
-                txnEntry.setLabel(label);
-                txnExecutor.setTxnId(txnId);
-            } catch (DuplicatedRequestException e) {
-                LOG.warn("duplicate request for sync channel. channel: {}, request id: {}, txn: {}, table: {}",
-                        id, e.getDuplicatedRequestId(), e.getTxnId(), targetTable);
-                txnExecutor.setTxnId(e.getTxnId());
-            } catch (LabelAlreadyUsedException e) {
-                // this happens when channel re-consume same batch, we should just pass through it without begin a new txn
-                LOG.warn("Label already used in channel {}, label: {}, table: {}, batch: {}", id, label, targetTable, batchId);
-                return;
-            } catch (AnalysisException | BeginTransactionException e) {
-                LOG.warn("encounter an error when beginning txn in channel {}, table: {}", id, targetTable);
-                throw e;
-            } catch (UserException e) {
-                LOG.warn("encounter an error when creating plan in channel {}, table: {}", id, targetTable);
-                throw e;
-            }
-            try {
-                // async exec begin transaction
-                long txnId = txnExecutor.getTxnId();
-                if (txnId != -1L) {
-                    this.txnExecutor.beginTransaction(request);
-                    LOG.info("begin txn in channel {}, table: {}, label:{}, txn id: {}", id, targetTable, label, txnExecutor.getTxnId());
+            DatabaseTransactionMgr databaseTransactionMgr = globalTransactionMgr.getDatabaseTransactionMgr(db.getId());
+            if(databaseTransactionMgr.getRunningTxnNums() < Config.max_running_txn_num_per_db ) {
+                TransactionEntry txnEntry = txnExecutor.getTxnEntry ();
+                TTxnParams txnConf = txnEntry.getTxnConf ();
+                TransactionState.LoadJobSourceType sourceType = TransactionState.LoadJobSourceType.INSERT_STREAMING;
+                TStreamLoadPutRequest request = null;
+                try {
+                    long txnId = globalTransactionMgr.beginTransaction (db.getId (), Lists.newArrayList (tbl.getId ()), label,
+                        new TransactionState.TxnCoordinator (TransactionState.TxnSourceType.FE, FrontendOptions.getLocalHostAddress ()), sourceType, timeoutSecond);
+                    String authCodeUuid = Catalog.getCurrentGlobalTransactionMgr ().getTransactionState (
+                        db.getId (), txnId).getAuthCode ();
+                    request = new TStreamLoadPutRequest ()
+                        .setTxnId (txnId).setDb (txnConf.getDb ()).setTbl (txnConf.getTbl ())
+                        .setFileType (TFileType.FILE_STREAM).setFormatType (TFileFormatType.FORMAT_CSV_PLAIN)
+                        .setThriftRpcTimeoutMs (5000).setLoadId (txnExecutor.getLoadId ())
+                        .setMergeType (TMergeType.MERGE).setDeleteCondition (DELETE_CONDITION)
+                        .setColumns (targetColumn);
+                    txnConf.setTxnId (txnId).setAuthCodeUuid (authCodeUuid);
+                    txnEntry.setLabel (label);
+                    txnExecutor.setTxnId (txnId);
+                } catch ( DuplicatedRequestException e ) {
+                    LOG.warn ("duplicate request for sync channel. channel: {}, request id: {}, txn: {}, table: {}",
+                        id, e.getDuplicatedRequestId (), e.getTxnId (), targetTable);
+                    txnExecutor.setTxnId (e.getTxnId ());
+                } catch ( LabelAlreadyUsedException e ) {
+                    // this happens when channel re-consume same batch, we should just pass through it without begin a new txn
+                    LOG.warn ("Label already used in channel {}, label: {}, table: {}, batch: {}", id, label, targetTable, batchId);
+                    return;
+                } catch ( AnalysisException | BeginTransactionException e ) {
+                    LOG.warn ("encounter an error when beginning txn in channel {}, table: {}", id, targetTable);
+                    throw e;
+                } catch ( UserException e ) {
+                    LOG.warn ("encounter an error when creating plan in channel {}, table: {}", id, targetTable);
+                    throw e;
                 }
-            } catch (TException e) {
-                LOG.warn("Failed to begin txn in channel {}, table: {}, txn: {}, msg:{}", id, targetTable, txnExecutor.getTxnId(), e.getMessage());
-                throw e;
-            } catch (TimeoutException | InterruptedException | ExecutionException e) {
-                LOG.warn("Error occur while waiting begin txn response in channel {}, table: {}, txn: {}, msg:{}",
-                        id, targetTable, txnExecutor.getTxnId(), e.getMessage());
-                throw e;
+                try {
+                    // async exec begin transaction
+                    long txnId = txnExecutor.getTxnId ();
+                    if ( txnId != - 1L ) {
+                        this.txnExecutor.beginTransaction (request);
+                        LOG.info ("begin txn in channel {}, table: {}, label:{}, txn id: {}", id, targetTable, label, txnExecutor.getTxnId ());
+                    }
+                } catch ( TException e ) {
+                    LOG.warn ("Failed to begin txn in channel {}, table: {}, txn: {}, msg:{}", id, targetTable, txnExecutor.getTxnId (), e.getMessage ());
+                    throw e;
+                } catch ( TimeoutException | InterruptedException | ExecutionException e ) {
+                    LOG.warn ("Error occur while waiting begin txn response in channel {}, table: {}, txn: {}, msg:{}",
+                        id, targetTable, txnExecutor.getTxnId (), e.getMessage ());
+                    throw e;
+                }
+            } else {
+                String failMsg = "current running txns on db " + db.getId () + " is "
+                    + databaseTransactionMgr.getRunningTxnNums () + ", larger than limit " + Config.max_running_txn_num_per_db;

Review Comment:
   It would be better to add a LOG.warn() here.
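
   A minimal, standalone sketch of what the guarded branch could look like with the warning added. It is not the code that was merged: the class `TxnLimitSketch`, the method `checkRunningTxnLimit`, and the constant `MAX_RUNNING_TXN_NUM_PER_DB` are hypothetical stand-ins for the Doris classes in the diff, and `java.util.logging` is used only so that the example runs without the FE's own logger. The failure message wording is copied from the diff.

```java
import java.util.Optional;
import java.util.logging.Logger;

public class TxnLimitSketch {
    private static final Logger LOG = Logger.getLogger(TxnLimitSketch.class.getName());

    // Hypothetical stand-in for Config.max_running_txn_num_per_db; the value is arbitrary.
    static final int MAX_RUNNING_TXN_NUM_PER_DB = 100;

    /**
     * Returns an empty Optional when a new txn may begin (running < limit).
     * Otherwise builds the failure message, logs it as a warning, and returns it
     * so that the caller can decide how to fail the batch.
     */
    static Optional<String> checkRunningTxnLimit(long dbId, int runningTxnNum, int limit) {
        if (runningTxnNum < limit) {
            return Optional.empty();
        }
        // Message wording taken from the diff under review.
        String failMsg = "current running txns on db " + dbId + " is "
                + runningTxnNum + ", larger than limit " + limit;
        LOG.warning(failMsg);
        return Optional.of(failMsg);
    }

    public static void main(String[] args) {
        // Below the limit: no message, the txn would proceed.
        System.out.println(checkRunningTxnLimit(10001L, 5, MAX_RUNNING_TXN_NUM_PER_DB).isPresent());
        // At the limit: a warning is logged and the message is returned.
        System.out.println(checkRunningTxnLimit(10001L, 100, MAX_RUNNING_TXN_NUM_PER_DB).isPresent());
    }
}
```

   Logging before returning means the limit breach shows up in the log even if the caller handles or swallows the failure.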





[GitHub] [incubator-doris] github-actions[bot] commented on pull request #9471: [fix]binlog load fails because txn exceeds the default value

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on PR #9471:
URL: https://github.com/apache/incubator-doris/pull/9471#issuecomment-1120851705

   PR approved by anyone and no changes requested.




[GitHub] [incubator-doris] morningman merged pull request #9471: [fix]binlog load fails because txn exceeds the default value

Posted by GitBox <gi...@apache.org>.
morningman merged PR #9471:
URL: https://github.com/apache/incubator-doris/pull/9471




[GitHub] [incubator-doris] github-actions[bot] commented on pull request #9471: [fix]binlog load fails because txn exceeds the default value

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on PR #9471:
URL: https://github.com/apache/incubator-doris/pull/9471#issuecomment-1121385377

   PR approved by at least one committer and no changes requested.




[GitHub] [incubator-doris] github-actions[bot] commented on pull request #9471: [fix]binlog load fails because txn exceeds the default value

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on PR #9471:
URL: https://github.com/apache/incubator-doris/pull/9471#issuecomment-1123202697

   PR approved by at least one committer and no changes requested.

