You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by mo...@apache.org on 2022/05/12 05:31:27 UTC
[incubator-doris] branch master updated: [fix](binlog-load) binlog load fails because txn exceeds the default value (#9471)
This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git
The following commit(s) were added to refs/heads/master by this push:
new d7705ace65 [fix](binlog-load) binlog load fails because txn exceeds the default value (#9471)
d7705ace65 is described below
commit d7705ace659c297754a1949a15a8950ee208724a
Author: jiafeng.zhang <zh...@gmail.com>
AuthorDate: Thu May 12 13:31:22 2022 +0800
[fix](binlog-load) binlog load fails because txn exceeds the default value (#9471)
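Resuming a binlog load job fails when the number of running transactions on the
database exceeds the default limit (max_running_txn_num_per_db). Previously the resume
reported success and the job only failed a while later, which left the user confused.
The sync channel now checks the running transaction count before beginning a
transaction and fails with a clear message that states the current count and the limit.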
Issue Number: close #9468
---
.../doris/load/sync/canal/CanalSyncChannel.java | 94 +++++++++++++---------
.../doris/transaction/DatabaseTransactionMgr.java | 2 +-
2 files changed, 57 insertions(+), 39 deletions(-)
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/sync/canal/CanalSyncChannel.java b/fe/fe-core/src/main/java/org/apache/doris/load/sync/canal/CanalSyncChannel.java
index 2b71619dcf..5d0774b54a 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/sync/canal/CanalSyncChannel.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/sync/canal/CanalSyncChannel.java
@@ -22,6 +22,7 @@ import org.apache.doris.catalog.Catalog;
import org.apache.doris.catalog.Database;
import org.apache.doris.catalog.OlapTable;
import org.apache.doris.common.AnalysisException;
+import org.apache.doris.common.Config;
import org.apache.doris.common.DuplicatedRequestException;
import org.apache.doris.common.LabelAlreadyUsedException;
import org.apache.doris.common.UserException;
@@ -41,6 +42,7 @@ import org.apache.doris.thrift.TStreamLoadPutRequest;
import org.apache.doris.thrift.TTxnParams;
import org.apache.doris.thrift.TUniqueId;
import org.apache.doris.transaction.BeginTransactionException;
+import org.apache.doris.transaction.DatabaseTransactionMgr;
import org.apache.doris.transaction.GlobalTransactionMgr;
import org.apache.doris.transaction.TransactionEntry;
import org.apache.doris.transaction.TransactionState;
@@ -121,53 +123,69 @@ public class CanalSyncChannel extends SyncChannel {
+ "_batch" + batchId + "_" + currentTime;
String targetColumn = Joiner.on(",").join(columns) + "," + DELETE_COLUMN;
GlobalTransactionMgr globalTransactionMgr = Catalog.getCurrentGlobalTransactionMgr();
- TransactionEntry txnEntry = txnExecutor.getTxnEntry();
- TTxnParams txnConf = txnEntry.getTxnConf();
- TransactionState.LoadJobSourceType sourceType = TransactionState.LoadJobSourceType.INSERT_STREAMING;
- TStreamLoadPutRequest request = null;
- try {
- long txnId = globalTransactionMgr.beginTransaction(db.getId(), Lists.newArrayList(tbl.getId()), label,
- new TransactionState.TxnCoordinator(TransactionState.TxnSourceType.FE, FrontendOptions.getLocalHostAddress()), sourceType, timeoutSecond);
- String authCodeUuid = Catalog.getCurrentGlobalTransactionMgr().getTransactionState(
+ DatabaseTransactionMgr databaseTransactionMgr = globalTransactionMgr.getDatabaseTransactionMgr(db.getId());
+ if (databaseTransactionMgr.getRunningTxnNums() < Config.max_running_txn_num_per_db) {
+ TransactionEntry txnEntry = txnExecutor.getTxnEntry();
+ TTxnParams txnConf = txnEntry.getTxnConf();
+ TransactionState.LoadJobSourceType sourceType = TransactionState.LoadJobSourceType.INSERT_STREAMING;
+ TStreamLoadPutRequest request = null;
+ try {
+ long txnId = globalTransactionMgr.beginTransaction(db.getId(),
+ Lists.newArrayList(tbl.getId()), label,
+ new TransactionState.TxnCoordinator(TransactionState.TxnSourceType.FE,
+ FrontendOptions.getLocalHostAddress()), sourceType, timeoutSecond);
+ String authCodeUuid = Catalog.getCurrentGlobalTransactionMgr().getTransactionState(
db.getId(), txnId).getAuthCode();
- request = new TStreamLoadPutRequest()
+ request = new TStreamLoadPutRequest()
.setTxnId(txnId).setDb(txnConf.getDb()).setTbl(txnConf.getTbl())
.setFileType(TFileType.FILE_STREAM).setFormatType(TFileFormatType.FORMAT_CSV_PLAIN)
.setThriftRpcTimeoutMs(5000).setLoadId(txnExecutor.getLoadId())
.setMergeType(TMergeType.MERGE).setDeleteCondition(DELETE_CONDITION)
.setColumns(targetColumn);
- txnConf.setTxnId(txnId).setAuthCodeUuid(authCodeUuid);
- txnEntry.setLabel(label);
- txnExecutor.setTxnId(txnId);
- } catch (DuplicatedRequestException e) {
- LOG.warn("duplicate request for sync channel. channel: {}, request id: {}, txn: {}, table: {}",
+ txnConf.setTxnId(txnId).setAuthCodeUuid(authCodeUuid);
+ txnEntry.setLabel(label);
+ txnExecutor.setTxnId (txnId);
+ } catch (DuplicatedRequestException e) {
+ LOG.warn ("duplicate request for sync channel. channel: {}, request id: {}, txn: {}, table: {}",
id, e.getDuplicatedRequestId(), e.getTxnId(), targetTable);
- txnExecutor.setTxnId(e.getTxnId());
- } catch (LabelAlreadyUsedException e) {
- // this happens when channel re-consume same batch, we should just pass through it without begin a new txn
- LOG.warn("Label already used in channel {}, label: {}, table: {}, batch: {}", id, label, targetTable, batchId);
- return;
- } catch (AnalysisException | BeginTransactionException e) {
- LOG.warn("encounter an error when beginning txn in channel {}, table: {}", id, targetTable);
- throw e;
- } catch (UserException e) {
- LOG.warn("encounter an error when creating plan in channel {}, table: {}", id, targetTable);
- throw e;
- }
- try {
- // async exec begin transaction
- long txnId = txnExecutor.getTxnId();
- if (txnId != -1L) {
- this.txnExecutor.beginTransaction(request);
- LOG.info("begin txn in channel {}, table: {}, label:{}, txn id: {}", id, targetTable, label, txnExecutor.getTxnId());
+ txnExecutor.setTxnId(e.getTxnId());
+ } catch (LabelAlreadyUsedException e) {
+ // this happens when channel re-consume same batch,
+ // we should just pass through it without begin a new txn
+ LOG.warn ("Label already used in channel {}, label: {}, table: {}, batch: {}",
+ id, label, targetTable, batchId);
+ return;
+ } catch (AnalysisException | BeginTransactionException e) {
+ LOG.warn ("encounter an error when beginning txn in channel {}, table: {}",
+ id, targetTable);
+ throw e;
+ } catch (UserException e) {
+ LOG.warn ("encounter an error when creating plan in channel {}, table: {}",
+ id, targetTable);
+ throw e;
}
- } catch (TException e) {
- LOG.warn("Failed to begin txn in channel {}, table: {}, txn: {}, msg:{}", id, targetTable, txnExecutor.getTxnId(), e.getMessage());
- throw e;
- } catch (TimeoutException | InterruptedException | ExecutionException e) {
- LOG.warn("Error occur while waiting begin txn response in channel {}, table: {}, txn: {}, msg:{}",
+ try {
+ // async exec begin transaction
+ long txnId = txnExecutor.getTxnId();
+ if ( txnId != - 1L ) {
+ this.txnExecutor.beginTransaction (request);
+ LOG.info ("begin txn in channel {}, table: {}, label:{}, txn id: {}",
+ id, targetTable, label, txnExecutor.getTxnId());
+ }
+ } catch ( TException e) {
+ LOG.warn ("Failed to begin txn in channel {}, table: {}, txn: {}, msg:{}",
+ id, targetTable, txnExecutor.getTxnId(), e.getMessage());
+ throw e;
+ } catch ( TimeoutException | InterruptedException | ExecutionException e) {
+ LOG.warn ("Error occur while waiting begin txn response in channel {}, table: {}, txn: {}, msg:{}",
id, targetTable, txnExecutor.getTxnId(), e.getMessage());
- throw e;
+ throw e;
+ }
+ } else {
+ String failMsg = "current running txns on db " + db.getId() + " is "
+ + databaseTransactionMgr.getRunningTxnNums() + ", larger than limit " + Config.max_running_txn_num_per_db;
+ LOG.warn(failMsg);
+ throw new BeginTransactionException(failMsg);
}
}
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java b/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java
index 9ce906fb79..bf530c5ef4 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java
@@ -195,7 +195,7 @@ public class DatabaseTransactionMgr {
return labelToTxnIds.get(label);
}
- protected int getRunningTxnNums() {
+ public int getRunningTxnNums() {
return runningTxnNums;
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org