Posted to commits@doris.apache.org by GitBox <gi...@apache.org> on 2020/04/21 14:20:13 UTC

[GitHub] [incubator-doris] caiconghui opened a new pull request #3369: Support txn management in db level isolation and use ArrayDeque to improve txn task performance

caiconghui opened a new pull request #3369:
URL: https://github.com/apache/incubator-doris/pull/3369


   This PR is the first step toward making Doris stream load more robust and giving it higher concurrent performance (#3368). The main work is to support txn management with db-level isolation, and to use ArrayDeque to store final-status txns.
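A minimal sketch of the two ideas in the description, under simplifying assumptions. Each database gets its own manager with its own `ReentrantReadWriteLock`, so loads into different databases no longer contend on one global lock. Transactions that reach a final status are appended to an `ArrayDeque` in finish order, so expired ones can be popped from the head instead of scanning every transaction. `SketchTxn`, `SketchDbTxnMgr` and `SketchGlobalTxnMgr` are hypothetical names, not the PR's actual `DatabaseTransactionMgr`. The head-only eviction also assumes that one retention window applies to all finished txns.

```java
import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Hypothetical, simplified transaction record (not Doris's TransactionState).
class SketchTxn {
    final long id;
    long finishTimeMs;

    SketchTxn(long id) {
        this.id = id;
    }
}

// One instance per database: its own lock and its own bookkeeping.
class SketchDbTxnMgr {
    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
    private final Map<Long, SketchTxn> runningTxns = new HashMap<>();
    // Final-status txns appended in finish order, so the oldest is at the head.
    private final ArrayDeque<SketchTxn> finalTxns = new ArrayDeque<>();

    void begin(SketchTxn txn) {
        lock.writeLock().lock();
        try {
            runningTxns.put(txn.id, txn);
        } finally {
            lock.writeLock().unlock();
        }
    }

    // Moves a running txn to the final-status queue (VISIBLE or ABORTED in Doris terms).
    void finish(long txnId, long nowMs) {
        lock.writeLock().lock();
        try {
            SketchTxn txn = runningTxns.remove(txnId);
            if (txn != null) {
                txn.finishTimeMs = nowMs;
                finalTxns.addLast(txn);
            }
        } finally {
            lock.writeLock().unlock();
        }
    }

    // Pops expired txns from the head and stops at the first one still within keepMs.
    // Assumes finish times are non-decreasing and one retention window for all txns.
    int removeExpired(long nowMs, long keepMs) {
        lock.writeLock().lock();
        try {
            int removed = 0;
            while (!finalTxns.isEmpty() && nowMs - finalTxns.peekFirst().finishTimeMs > keepMs) {
                finalTxns.pollFirst();
                removed++;
            }
            return removed;
        } finally {
            lock.writeLock().unlock();
        }
    }

    int runningCount() {
        lock.readLock().lock();
        try {
            return runningTxns.size();
        } finally {
            lock.readLock().unlock();
        }
    }

    int finalCount() {
        lock.readLock().lock();
        try {
            return finalTxns.size();
        } finally {
            lock.readLock().unlock();
        }
    }
}

// Routes each call to the owning database's manager.
class SketchGlobalTxnMgr {
    private final Map<Long, SketchDbTxnMgr> dbIdToMgr = new ConcurrentHashMap<>();

    SketchDbTxnMgr getOrCreate(long dbId) {
        return dbIdToMgr.computeIfAbsent(dbId, id -> new SketchDbTxnMgr());
    }
}
```

The global manager keeps only the routing map. All per-txn state and locking live in the per-database object, which is the isolation the PR is after.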


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org


[GitHub] [incubator-doris] morningman commented on a change in pull request #3369: Support txn management in db level isolation and use ArrayDeque to improve txn task performance

Posted by GitBox <gi...@apache.org>.
morningman commented on a change in pull request #3369:
URL: https://github.com/apache/incubator-doris/pull/3369#discussion_r421537636



##########
File path: fe/src/main/java/org/apache/doris/catalog/Catalog.java
##########
@@ -2639,6 +2641,7 @@ public void dropDb(DropDbStmt stmt) throws DdlException {
             fullNameToDb.remove(db.getFullName());
             final Cluster cluster = nameToCluster.get(db.getClusterName());
             cluster.removeDb(dbName, db.getId());
+            globalTransactionMgr.removeDatabaseTransactionMgr(db.getId());

Review comment:
       Actually, the `DropDb` operation only moves the database into CatalogRecycleBin. It does not actually drop it, and the database can be recovered by the `Recover` operation.
   So you should not call `removeDatabaseTransactionMgr()` here. Instead, it should be called in `CatalogRecycleBin.eraseDatabase()`.
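The lifecycle the reviewer describes can be sketched with a toy model. Dropping a database only moves it into a recycle bin, `recover` moves it back, and only the final erase releases the per-database transaction manager. `SketchRecycleBin`, its methods and the `dbIdToTxnMgr` map are hypothetical stand-ins for `CatalogRecycleBin` and `GlobalTransactionMgr`. The real `eraseDatabase()` does considerably more than this.

```java
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Toy model of drop / recover / erase; not Doris's CatalogRecycleBin.
class SketchRecycleBin {
    private final Set<Long> liveDbIds;
    private final Set<Long> recycledDbIds = new HashSet<>();
    // Stand-in for the per-database managers held by GlobalTransactionMgr.
    private final Map<Long, Object> dbIdToTxnMgr;

    SketchRecycleBin(Set<Long> liveDbIds, Map<Long, Object> dbIdToTxnMgr) {
        this.liveDbIds = liveDbIds;
        this.dbIdToTxnMgr = dbIdToTxnMgr;
    }

    // DROP DATABASE: the db moves into the bin; its txn manager is kept.
    void dropDb(long dbId) {
        if (liveDbIds.remove(dbId)) {
            recycledDbIds.add(dbId);
        }
    }

    // RECOVER DATABASE: works because the txn manager was never removed.
    boolean recoverDb(long dbId) {
        if (!recycledDbIds.remove(dbId)) {
            return false;
        }
        liveDbIds.add(dbId);
        return true;
    }

    // Final erase: the db can no longer be recovered, so its txn manager can go.
    void eraseDb(long dbId) {
        if (recycledDbIds.remove(dbId)) {
            dbIdToTxnMgr.remove(dbId);
        }
    }
}
```

If the manager were removed in `dropDb`, a later recover would bring back a database whose transaction state (labels, running txns) had already been discarded.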

##########
File path: fe/src/main/java/org/apache/doris/catalog/Catalog.java
##########
@@ -2686,6 +2689,7 @@ public void replayDropDb(String dbName) throws DdlException {
             idToDb.remove(db.getId());
             final Cluster cluster = nameToCluster.get(db.getClusterName());
             cluster.removeDb(dbName, db.getId());
+            globalTransactionMgr.removeDatabaseTransactionMgr(db.getId());

Review comment:
       Same as `dropDb`: this should be called in `CatalogRecycleBin.replayEraseDatabase()`.

##########
File path: fe/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java
##########
@@ -123,6 +70,22 @@ public TxnStateCallbackFactory getCallbackFactory() {
         return callbackFactory;
     }
 
+    public DatabaseTransactionMgr getDatabaseTransactioMgr(long dbId) throws AnalysisException {

Review comment:
       ```suggestion
       public DatabaseTransactionMgr getDatabaseTransactionMgr(long dbId) throws AnalysisException {
   ```

##########
File path: fe/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java
##########
@@ -123,6 +70,22 @@ public TxnStateCallbackFactory getCallbackFactory() {
         return callbackFactory;
     }
 
+    public DatabaseTransactionMgr getDatabaseTransactioMgr(long dbId) throws AnalysisException {
+        DatabaseTransactionMgr dbTransactionMgr = dbIdToDatabaseTransactionMgrs.get(dbId);
+        if (dbTransactionMgr == null) {
+            throw new AnalysisException("databaseTransactionMgr[" + dbId + "] does not exist");

Review comment:
       AnalysisException is not suitable here,
   but it can be changed in a follow-up.
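One possible shape for that follow-up, sketched under assumptions: a dedicated checked exception for a missing per-database manager. This lets callers tell "unknown database" apart from SQL analysis errors. `TxnMgrNotFoundException` and `SketchTxnMgrRegistry` are hypothetical names, not Doris classes.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical exception type; AnalysisException is meant for query analysis errors.
class TxnMgrNotFoundException extends Exception {
    final long dbId;

    TxnMgrNotFoundException(long dbId) {
        super("databaseTransactionMgr[" + dbId + "] does not exist");
        this.dbId = dbId;
    }
}

// Minimal registry keyed by db id; M stands in for DatabaseTransactionMgr.
class SketchTxnMgrRegistry<M> {
    private final Map<Long, M> dbIdToMgr = new ConcurrentHashMap<>();

    void register(long dbId, M mgr) {
        dbIdToMgr.put(dbId, mgr);
    }

    M getDatabaseTransactionMgr(long dbId) throws TxnMgrNotFoundException {
        M mgr = dbIdToMgr.get(dbId);
        if (mgr == null) {
            throw new TxnMgrNotFoundException(dbId);
        }
        return mgr;
    }
}
```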

##########
File path: fe/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java
##########
@@ -156,114 +119,20 @@ public long beginTransaction(long dbId, List<Long> tableIdList, String label, TU
                     + Config.min_load_timeout_second + " and " + Config.max_load_timeout_second
                     + " seconds");
         }
-        
-        writeLock();
-        try {
-            Preconditions.checkNotNull(coordinator);
-            Preconditions.checkNotNull(label);
-            FeNameFormat.checkLabel(label);
-
-            /*
-             * Check if label already used, by following steps
-             * 1. get all existing transactions
-             * 2. if there is a PREPARE transaction, check if this is a retry request. If yes, return the
-             *    existing txn id.
-             * 3. if there is a non-aborted transaction, throw label already used exception.
-             */
-            Set<Long> existingTxnIds = dbIdToTxnLabels.get(dbId, label);
-            if (existingTxnIds != null && !existingTxnIds.isEmpty()) {
-                List<TransactionState> notAbortedTxns = Lists.newArrayList();
-                for (long txnId : existingTxnIds) {
-                    TransactionState txn = idToTransactionState.get(txnId);
-                    Preconditions.checkNotNull(txn);
-                    if (txn.getTransactionStatus() != TransactionStatus.ABORTED) {
-                        notAbortedTxns.add(txn);
-                    }
-                }
-                // there should be at most 1 txn in PREPARE/COMMITTED/VISIBLE status
-                Preconditions.checkState(notAbortedTxns.size() <= 1, notAbortedTxns);
-                if (!notAbortedTxns.isEmpty()) {
-                    TransactionState notAbortedTxn = notAbortedTxns.get(0);
-                    if (requestId != null && notAbortedTxn.getTransactionStatus() == TransactionStatus.PREPARE
-                            && notAbortedTxn.getRequsetId() != null && notAbortedTxn.getRequsetId().equals(requestId)) {
-                        // this may be a retry request for same job, just return existing txn id.
-                        throw new DuplicatedRequestException(DebugUtil.printId(requestId),
-                                notAbortedTxn.getTransactionId(), "");
-                    }
-                    throw new LabelAlreadyUsedException(label, notAbortedTxn.getTransactionStatus());
-                }
-            }
 
-            checkRunningTxnExceedLimit(dbId, sourceType);
-          
-            long tid = idGenerator.getNextTransactionId();
-            LOG.info("begin transaction: txn id {} with label {} from coordinator {}", tid, label, coordinator);
-            TransactionState transactionState = new TransactionState(dbId, tableIdList, tid, label, requestId, sourceType,
-                    coordinator, listenerId, timeoutSecond * 1000);
-            transactionState.setPrepareTime(System.currentTimeMillis());
-            unprotectUpsertTransactionState(transactionState);
-
-            if (MetricRepo.isInit.get()) {
-                MetricRepo.COUNTER_TXN_BEGIN.increase(1L);
-            }
-
-            return tid;
-        } catch (DuplicatedRequestException e) {
-            throw e;
-        } catch (Exception e) {
-            if (MetricRepo.isInit.get()) {
-                MetricRepo.COUNTER_TXN_REJECT.increase(1L);
-            }
-            throw e;
-        } finally {
-            writeUnlock();
-        }
-    }
-    
-    private void checkRunningTxnExceedLimit(long dbId, LoadJobSourceType sourceType) throws BeginTransactionException {
-        switch (sourceType) {
-            case ROUTINE_LOAD_TASK:
-                // no need to check limit for routine load task:
-                // 1. the number of running routine load tasks is limited by Config.max_routine_load_task_num_per_be
-                // 2. if we add routine load txn to runningTxnNums, runningTxnNums will always be occupied by routine load,
-                //    and other txn may not be able to submitted.
-                break;
-            default:
-                if (runningTxnNums.getOrDefault(dbId, 0) >= Config.max_running_txn_num_per_db) {
-                    throw new BeginTransactionException("current running txns on db " + dbId + " is "
-                            + runningTxnNums.get(dbId) + ", larger than limit " + Config.max_running_txn_num_per_db);
-                }
-                break;
-        }
+        DatabaseTransactionMgr dbTransactionMgr = getDatabaseTransactioMgr(dbId);
+        return dbTransactionMgr.beginTransaction(tableIdList, label, requestId, coordinator, sourceType, listenerId, timeoutSecond);
     }
 
     public TransactionStatus getLabelState(long dbId, String label) {
-        readLock();
         try {
-            Set<Long> existingTxnIds = dbIdToTxnLabels.get(dbId, label);
-            if (existingTxnIds == null || existingTxnIds.isEmpty()) {
-                return TransactionStatus.UNKNOWN;
-            }
-            // find the latest txn (which id is largest)
-            long maxTxnId = existingTxnIds.stream().max(Comparator.comparingLong(Long::valueOf)).get();
-            return idToTransactionState.get(maxTxnId).getTransactionStatus();
-        } finally {
-            readUnlock();
-        }
-    }
-    
-    public void deleteTransaction(long transactionId) {
-        writeLock();
-        try {
-            TransactionState state = idToTransactionState.get(transactionId);
-            if (state == null) {
-                return;
-            }
-            replayDeleteTransactionState(state);
-            editLog.logDeleteTransactionState(state);
-        } finally {
-            writeUnlock();
+            DatabaseTransactionMgr dbTransactionMgr = getDatabaseTransactioMgr(dbId);
+            return dbTransactionMgr.getLabelState(label);
+        } catch (AnalysisException e) {
+            LOG.warn("Get transaction status by label " + label + " failed", e);
+            return null;

Review comment:
       ```suggestion
               return TransactionStatus.UNKNOWN;
   ```
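The suggestion keeps `getLabelState()` from ever returning `null`. An unknown database is reported the same way as an unknown label. Below is a simplified sketch of that lookup. It assumes each label maps to the ids of every transaction that used it, and that the newest attempt (largest id) decides the state, as in the code removed above. `SketchStatus` and `SketchLabelIndex` are hypothetical.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical mirror of TransactionStatus, reduced to what the sketch needs.
enum SketchStatus { UNKNOWN, PREPARE, COMMITTED, VISIBLE, ABORTED }

class SketchLabelIndex {
    // dbId -> label -> ids of all txns that used the label
    private final Map<Long, Map<String, Set<Long>>> labels = new HashMap<>();
    private final Map<Long, SketchStatus> statusById = new HashMap<>();

    void record(long dbId, String label, long txnId, SketchStatus status) {
        labels.computeIfAbsent(dbId, k -> new HashMap<>())
              .computeIfAbsent(label, k -> new TreeSet<>())
              .add(txnId);
        statusById.put(txnId, status);
    }

    SketchStatus getLabelState(long dbId, String label) {
        Map<String, Set<Long>> dbLabels = labels.get(dbId);
        if (dbLabels == null) {
            return SketchStatus.UNKNOWN; // unknown database: UNKNOWN, never null
        }
        Set<Long> ids = dbLabels.getOrDefault(label, Collections.emptySet());
        if (ids.isEmpty()) {
            return SketchStatus.UNKNOWN;
        }
        // the latest attempt (largest txn id) decides the label's state
        return statusById.get(Collections.max(ids));
    }
}
```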

##########
File path: fe/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java
##########
@@ -851,493 +243,114 @@ public boolean isIntersectionNotEmpty(List<Long> sourceTableIdList, List<Long> t
      */
     public void removeExpiredAndTimeoutTxns() {
         long currentMillis = System.currentTimeMillis();
-
-        List<Long> timeoutTxns = Lists.newArrayList();
-        List<Long> expiredTxns = Lists.newArrayList();
-        readLock();
-        try {
-            for (TransactionState transactionState : idToTransactionState.values()) {
-                if (transactionState.isExpired(currentMillis)) {
-                    // remove the txn which labels are expired
-                    expiredTxns.add(transactionState.getTransactionId());
-                } else if (transactionState.isTimeout(currentMillis)) {
-                    // txn is running but timeout, abort it.
-                    timeoutTxns.add(transactionState.getTransactionId());
+        for (DatabaseTransactionMgr dbTransactionMgr : dbIdToDatabaseTransactionMgrs.values()) {
+            dbTransactionMgr.removeExpiredTxns();
+            List<Long> timeoutTxns = dbTransactionMgr.getTimeoutTxns(currentMillis);
+            // abort timeout txns
+            for (Long txnId : timeoutTxns) {

Review comment:
       This logic (aborting the timeout txns) can also be moved into `DatabaseTransactionMgr`, e.g.
   `dbTransactionMgr.removeExpiredAndTimeoutTxns();`
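Here is a sketch of what moving both cleanup steps into the per-database manager could look like, under a simplified model. Running txns carry a deadline, and finished txns sit in finish order in an `ArrayDeque`. The global manager would then only iterate over databases and make one call on each. `SketchDbTxnCleaner` is hypothetical. It uses `synchronized` instead of a read-write lock for brevity, and its `abort` only moves the txn to the finished queue rather than going through Doris's `abortTransaction()`.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class SketchDbTxnCleaner {
    // running txn id -> deadline in ms
    private final Map<Long, Long> runningDeadlines = new LinkedHashMap<>();
    // finished txns as {txnId, finishTimeMs}, oldest first
    private final ArrayDeque<long[]> finished = new ArrayDeque<>();
    private final long labelKeepMs;

    SketchDbTxnCleaner(long labelKeepMs) {
        this.labelKeepMs = labelKeepMs;
    }

    synchronized void begin(long txnId, long deadlineMs) {
        runningDeadlines.put(txnId, deadlineMs);
    }

    synchronized void abort(long txnId, long nowMs) {
        if (runningDeadlines.remove(txnId) != null) {
            finished.addLast(new long[] {txnId, nowMs});
        }
    }

    // Both cleanup steps under one lock; returns the ids of the txns it aborted.
    synchronized List<Long> removeExpiredAndTimeoutTxns(long nowMs) {
        // 1. drop finished txns whose label retention has elapsed, oldest first
        while (!finished.isEmpty() && nowMs - finished.peekFirst()[1] > labelKeepMs) {
            finished.pollFirst();
        }
        // 2. collect timed-out txns first, then abort, to avoid modifying the map mid-iteration
        List<Long> timedOut = new ArrayList<>();
        for (Map.Entry<Long, Long> e : runningDeadlines.entrySet()) {
            if (nowMs > e.getValue()) {
                timedOut.add(e.getKey());
            }
        }
        for (long txnId : timedOut) {
            abort(txnId, nowMs); // reentrant: same monitor
        }
        return timedOut;
    }

    synchronized int runningCount() {
        return runningDeadlines.size();
    }

    synchronized int finishedCount() {
        return finished.size();
    }
}
```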
   

##########
File path: fe/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java
##########
@@ -851,493 +243,114 @@ public boolean isIntersectionNotEmpty(List<Long> sourceTableIdList, List<Long> t
      */
     public void removeExpiredAndTimeoutTxns() {
         long currentMillis = System.currentTimeMillis();
-
-        List<Long> timeoutTxns = Lists.newArrayList();
-        List<Long> expiredTxns = Lists.newArrayList();
-        readLock();
-        try {
-            for (TransactionState transactionState : idToTransactionState.values()) {
-                if (transactionState.isExpired(currentMillis)) {
-                    // remove the txn which labels are expired
-                    expiredTxns.add(transactionState.getTransactionId());
-                } else if (transactionState.isTimeout(currentMillis)) {
-                    // txn is running but timeout, abort it.
-                    timeoutTxns.add(transactionState.getTransactionId());
+        for (DatabaseTransactionMgr dbTransactionMgr : dbIdToDatabaseTransactionMgrs.values()) {
+            dbTransactionMgr.removeExpiredTxns();
+            List<Long> timeoutTxns = dbTransactionMgr.getTimeoutTxns(currentMillis);
+            // abort timeout txns
+            for (Long txnId : timeoutTxns) {
+                try {
+                    dbTransactionMgr.abortTransaction(txnId, "timeout by txn manager", null);
+                    LOG.info("transaction [" + txnId + "] is timeout, abort it by transaction manager");
+                } catch (UserException e) {
+                    // abort may be failed. it is acceptable. just print a log
+                    LOG.warn("abort timeout txn {} failed. msg: {}", txnId, e.getMessage());
                 }
             }
-        } finally {
-            readUnlock();
-        }
 
-        // delete expired txns
-        for (Long txnId : expiredTxns) {
-            deleteTransaction(txnId);
-            LOG.info("transaction [" + txnId + "] is expired, remove it from transaction manager");
-        }
-
-        // abort timeout txns
-        for (Long txnId : timeoutTxns) {
-            try {
-                abortTransaction(txnId, "timeout by txn manager");
-                LOG.info("transaction [" + txnId + "] is timeout, abort it by transaction manager");
-            } catch (UserException e) {
-                // abort may be failed. it is acceptable. just print a log
-                LOG.warn("abort timeout txn {} failed. msg: {}", txnId, e.getMessage());
-            }
         }
     }
 
-    public TransactionState getTransactionState(long transactionId) {
-        readLock();
-        try {
-            return idToTransactionState.get(transactionId);
-        } finally {
-            readUnlock();
-        }
+    public TransactionState getTransactionState(long dbId, long transactionId) {
+        DatabaseTransactionMgr dbTransactionMgr = dbIdToDatabaseTransactionMgrs.get(dbId);
+        return dbTransactionMgr.getTransactionState(transactionId);
     }
     
     public void setEditLog(EditLog editLog) {
-        this.editLog = editLog;
         this.idGenerator.setEditLog(editLog);
     }
-    
-    private void readLock() {
-        this.transactionLock.readLock().lock();
-    }
-    
-    private void readUnlock() {
-        this.transactionLock.readLock().unlock();
-    }
-    
-    private void writeLock() {
-        this.transactionLock.writeLock().lock();
-    }
-    
-    private void writeUnlock() {
-        this.transactionLock.writeLock().unlock();
-    }
-    
-    // for add/update/delete TransactionState
-    private void unprotectUpsertTransactionState(TransactionState transactionState) {
-        if (transactionState.getTransactionStatus() != TransactionStatus.PREPARE
-                || transactionState.getSourceType() == LoadJobSourceType.FRONTEND) {
-            // if this is a prepare txn, and load source type is not FRONTEND
-            // no need to persist it. if prepare txn lost, the following commit will just be failed.
-            // user only need to retry this txn.
-            // The FRONTEND type txn is committed and running asynchronously, so we have to persist it.
-            editLog.logInsertTransactionState(transactionState);
-        }
-        idToTransactionState.put(transactionState.getTransactionId(), transactionState);
-        updateTxnLabels(transactionState);
-        updateDbRunningTxnNum(transactionState.getPreStatus(), transactionState);
-    }
-
-    private void unprotectedCommitTransaction(TransactionState transactionState, Set<Long> errorReplicaIds,
-                                              Map<Long, Set<Long>> tableToPartition, Set<Long> totalInvolvedBackends,
-                                              Database db) {
-        // transaction state is modified during check if the transaction could committed
-        if (transactionState.getTransactionStatus() != TransactionStatus.PREPARE) {
-            return;
-        }
-        // update transaction state version
-        transactionState.setCommitTime(System.currentTimeMillis());
-        transactionState.setTransactionStatus(TransactionStatus.COMMITTED);
-        transactionState.setErrorReplicas(errorReplicaIds);
-        for (long tableId : tableToPartition.keySet()) {
-            TableCommitInfo tableCommitInfo = new TableCommitInfo(tableId);
-            for (long partitionId : tableToPartition.get(tableId)) {
-                OlapTable table = (OlapTable) db.getTable(tableId);
-                Partition partition = table.getPartition(partitionId);
-                PartitionCommitInfo partitionCommitInfo = new PartitionCommitInfo(partitionId,
-                                                                                  partition.getNextVersion(),
-                                                                                  partition.getNextVersionHash());
-                tableCommitInfo.addPartitionCommitInfo(partitionCommitInfo);
-            }
-            transactionState.putIdToTableCommitInfo(tableId, tableCommitInfo);
-        }
-        // persist transactionState
-        unprotectUpsertTransactionState(transactionState);
-
-        // add publish version tasks. set task to null as a placeholder.
-        // tasks will be created when publishing version.
-        for (long backendId : totalInvolvedBackends) {
-            transactionState.addPublishVersionTask(backendId, null);
-        }
-    }
 
-    private boolean unprotectAbortTransaction(long transactionId, String reason)
-            throws UserException {
-        TransactionState transactionState = idToTransactionState.get(transactionId);
-        if (transactionState == null) {
-            throw new UserException("transaction not found");
-        }
-        if (transactionState.getTransactionStatus() == TransactionStatus.ABORTED) {
-            return false;
-        }
-        if (transactionState.getTransactionStatus() == TransactionStatus.COMMITTED
-                || transactionState.getTransactionStatus() == TransactionStatus.VISIBLE) {
-            throw new UserException("transaction's state is already "
-                    + transactionState.getTransactionStatus() + ", could not abort");
-        }
-        transactionState.setFinishTime(System.currentTimeMillis());
-        transactionState.setReason(reason);
-        transactionState.setTransactionStatus(TransactionStatus.ABORTED);
-        unprotectUpsertTransactionState(transactionState);
-        for (PublishVersionTask task : transactionState.getPublishVersionTasks().values()) {
-            AgentTaskQueue.removeTask(task.getBackendId(), TTaskType.PUBLISH_VERSION, task.getSignature());
-        }
-        return true;
-    }
-    
     // for replay idToTransactionState
     // check point also run transaction cleaner, the cleaner maybe concurrently modify id to 
     public void replayUpsertTransactionState(TransactionState transactionState) {
-        writeLock();
         try {
-            // set transaction status will call txn state change listener
-            transactionState.replaySetTransactionStatus();
-            Database db = catalog.getDb(transactionState.getDbId());
-            if (transactionState.getTransactionStatus() == TransactionStatus.COMMITTED) {
-                LOG.info("replay a committed transaction {}", transactionState);
-                updateCatalogAfterCommitted(transactionState, db);
-            } else if (transactionState.getTransactionStatus() == TransactionStatus.VISIBLE) {
-                LOG.info("replay a visible transaction {}", transactionState);
-                updateCatalogAfterVisible(transactionState, db);
-            }
-            TransactionState preTxnState = idToTransactionState.get(transactionState.getTransactionId());
-            idToTransactionState.put(transactionState.getTransactionId(), transactionState);
-            updateTxnLabels(transactionState);
-            updateDbRunningTxnNum(preTxnState == null ? null : preTxnState.getTransactionStatus(),
-                                  transactionState);
-        } finally {
-            writeUnlock();
+            DatabaseTransactionMgr dbTransactionMgr = getDatabaseTransactioMgr(transactionState.getDbId());
+            dbTransactionMgr.replayUpsertTransactionState(transactionState);
+        } catch (AnalysisException e) {
+            LOG.warn("replay upsert transaction failed", e);

Review comment:
       Add the transaction's id to the log for easier debugging.

##########
File path: fe/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java
##########
@@ -1346,81 +359,71 @@ public TransactionIdGenerator getTransactionIDGenerator() {
 
     @Override
     public void write(DataOutput out) throws IOException {
-        int numTransactions = idToTransactionState.size();
+        int numTransactions = getTransactionNum();
         out.writeInt(numTransactions);
-        for (Map.Entry<Long, TransactionState> entry : idToTransactionState.entrySet()) {
-            entry.getValue().write(out);
+        for (DatabaseTransactionMgr dbTransactionMgr : dbIdToDatabaseTransactionMgrs.values()) {
+            dbTransactionMgr.unprotectWriteAllTransactionStates(out);
         }
         idGenerator.write(out);
     }
     
     public void readFields(DataInput in) throws IOException {
-        int numTransactions = in.readInt();
-        for (int i = 0; i < numTransactions; ++i) {
-            TransactionState transactionState = new TransactionState();
-            transactionState.readFields(in);
-            TransactionState preTxnState = idToTransactionState.get(transactionState.getTransactionId());
-            idToTransactionState.put(transactionState.getTransactionId(), transactionState);
-            updateTxnLabels(transactionState);
-            updateDbRunningTxnNum(preTxnState == null ? null : preTxnState.getTransactionStatus(),
-                                  transactionState);
+        try {
+            int numTransactions = in.readInt();
+            for (int i = 0; i < numTransactions; ++i) {
+                TransactionState transactionState = new TransactionState();
+                transactionState.readFields(in);
+                DatabaseTransactionMgr dbTransactionMgr = getDatabaseTransactioMgr(transactionState.getDbId());
+                dbTransactionMgr.unprotectUpsertTransactionState(transactionState, true);
+            }
+            idGenerator.readFields(in);
+        } catch (AnalysisException e) {
+            throw new IOException("Read transaction states failed", e);
         }
-        idGenerator.readFields(in);
+
     }
 
-    public TransactionState getTransactionStateByCallbackIdAndStatus(long callbackId, Set<TransactionStatus> status) {
-        readLock();
+    public TransactionState getTransactionStateByCallbackIdAndStatus(long dbId, long callbackId, Set<TransactionStatus> status) {
         try {
-            for (TransactionState txn : idToTransactionState.values()) {
-                if (txn.getCallbackId() == callbackId && status.contains(txn.getTransactionStatus())) {
-                    return txn;
-                }
-            }
-        } finally {
-            readUnlock();
+            DatabaseTransactionMgr dbTransactionMgr = getDatabaseTransactioMgr(dbId);
+            return dbTransactionMgr.getTransactionStateByCallbackIdAndStatus(callbackId, status);
+        } catch (AnalysisException e) {
+            LOG.warn("Get transaction by callbackId and status failed", e);
+            return null;
         }
-        return null;
     }
 
-    public TransactionState getTransactionStateByCallbackId(long callbackId) {
-        readLock();
+    public TransactionState getTransactionStateByCallbackId(long dbId, long callbackId) {
         try {
-            for (TransactionState txn : idToTransactionState.values()) {
-                if (txn.getCallbackId() == callbackId) {
-                    return txn;
-                }
-            }
-        } finally {
-            readUnlock();
+            DatabaseTransactionMgr dbTransactionMgr = getDatabaseTransactioMgr(dbId);
+            return dbTransactionMgr.getTransactionStateByCallbackId(callbackId);
+        } catch (AnalysisException e) {
+            LOG.warn("Get transaction by callbackId failed", e);
+            return null;
         }
-        return null;
     }
 
-    public List<Long> getTransactionIdByCoordinateBe(String coordinateHost, int limit) {
-        ArrayList<Long> txnIds = new ArrayList<>();
-        readLock();
-        try {
-            idToTransactionState.values().stream()
-                    .filter(t -> (t.getCoordinator().sourceType == TransactionState.TxnSourceType.BE
-                            && t.getCoordinator().ip.equals(coordinateHost)
-                            && (!t.getTransactionStatus().isFinalStatus())))
-                    .limit(limit)
-                    .forEach(t -> txnIds.add(t.getTransactionId()));
-        } finally {
-            readUnlock();
+    public List<Pair<Long, Long>> getTransactionIdByCoordinateBe(String coordinateHost, int limit) {
+        ArrayList<Pair<Long, Long>> txnInfos = new ArrayList<>();
+        for (DatabaseTransactionMgr databaseTransactionMgr : dbIdToDatabaseTransactionMgrs.values()) {
+            txnInfos.addAll(databaseTransactionMgr.getTransactionIdByCoordinateBe(coordinateHost, limit));
+            if (txnInfos.size() > limit) {
+                break;
+            }
         }
-        return txnIds;
+        return txnInfos.size() > limit ? new ArrayList<>(txnInfos.subList(0, limit)) : txnInfos;

Review comment:
       ```suggestion
           return txnInfos.size() > limit ? txnInfos.subList(0, limit) : txnInfos;
   ```
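An alternative avoids the trim altogether: pass each per-database lookup only the remaining budget, and stop once `limit` results have been collected. Note that `List.subList()` returns a view backed by the original list. With the suggested one-liner, the whole `txnInfos` list therefore stays reachable for as long as the caller holds the result. With a remaining-budget loop, the result never grows past `limit`. This is a sketch under assumptions: the `PerDbLookup` interface and the `long[]` pair `{dbId, txnId}` are hypothetical stand-ins for `DatabaseTransactionMgr` and Doris's `Pair<Long, Long>`.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical per-database lookup returning at most maxResults {dbId, txnId} pairs.
interface PerDbLookup {
    List<long[]> findByCoordinator(String host, int maxResults);
}

class SketchCoordinatorScan {
    static List<long[]> getTransactionIdByCoordinateBe(List<PerDbLookup> dbs, String host, int limit) {
        List<long[]> result = new ArrayList<>();
        for (PerDbLookup db : dbs) {
            int remaining = limit - result.size();
            if (remaining <= 0) {
                break; // budget exhausted: skip the remaining databases
            }
            List<long[]> found = db.findByCoordinator(host, remaining);
            // defensive: never exceed the budget even if a lookup ignores maxResults
            result.addAll(found.size() > remaining ? found.subList(0, remaining) : found);
        }
        return result;
    }
}
```

`addAll` copies the elements, so the returned list does not keep any per-database list alive.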

##########
File path: fe/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java
##########
@@ -851,493 +243,114 @@ public boolean isIntersectionNotEmpty(List<Long> sourceTableIdList, List<Long> t
      */
     public void removeExpiredAndTimeoutTxns() {
         long currentMillis = System.currentTimeMillis();
-
-        List<Long> timeoutTxns = Lists.newArrayList();
-        List<Long> expiredTxns = Lists.newArrayList();
-        readLock();
-        try {
-            for (TransactionState transactionState : idToTransactionState.values()) {
-                if (transactionState.isExpired(currentMillis)) {
-                    // remove the txn which labels are expired
-                    expiredTxns.add(transactionState.getTransactionId());
-                } else if (transactionState.isTimeout(currentMillis)) {
-                    // txn is running but timeout, abort it.
-                    timeoutTxns.add(transactionState.getTransactionId());
+        for (DatabaseTransactionMgr dbTransactionMgr : dbIdToDatabaseTransactionMgrs.values()) {
+            dbTransactionMgr.removeExpiredTxns();
+            List<Long> timeoutTxns = dbTransactionMgr.getTimeoutTxns(currentMillis);
+            // abort timeout txns
+            for (Long txnId : timeoutTxns) {
+                try {
+                    dbTransactionMgr.abortTransaction(txnId, "timeout by txn manager", null);
+                    LOG.info("transaction [" + txnId + "] is timeout, abort it by transaction manager");
+                } catch (UserException e) {
+                    // abort may be failed. it is acceptable. just print a log
+                    LOG.warn("abort timeout txn {} failed. msg: {}", txnId, e.getMessage());
                 }
             }
-        } finally {
-            readUnlock();
-        }
 
-        // delete expired txns
-        for (Long txnId : expiredTxns) {
-            deleteTransaction(txnId);
-            LOG.info("transaction [" + txnId + "] is expired, remove it from transaction manager");
-        }
-
-        // abort timeout txns
-        for (Long txnId : timeoutTxns) {
-            try {
-                abortTransaction(txnId, "timeout by txn manager");
-                LOG.info("transaction [" + txnId + "] is timeout, abort it by transaction manager");
-            } catch (UserException e) {
-                // abort may be failed. it is acceptable. just print a log
-                LOG.warn("abort timeout txn {} failed. msg: {}", txnId, e.getMessage());
-            }
         }
     }
 
-    public TransactionState getTransactionState(long transactionId) {
-        readLock();
-        try {
-            return idToTransactionState.get(transactionId);
-        } finally {
-            readUnlock();
-        }
+    public TransactionState getTransactionState(long dbId, long transactionId) {
+        DatabaseTransactionMgr dbTransactionMgr = dbIdToDatabaseTransactionMgrs.get(dbId);

Review comment:
       Why not throw an exception here? `dbTransactionMgr` could be `null`.
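The concern is that `dbIdToDatabaseTransactionMgrs.get(dbId)` returns `null` for a database that has no manager (for example, one already erased). The next line would then throw a `NullPointerException`. Below is a minimal null-safe variant. It assumes callers treat a missing transaction as "not found". `SketchTxnLookup` and the `String` state type are hypothetical placeholders, and throwing a checked exception instead would be an equally reasonable choice.

```java
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;

class SketchTxnLookup {
    // dbId -> (txnId -> state); the inner map stands in for a DatabaseTransactionMgr
    private final Map<Long, Map<Long, String>> dbIdToTxns = new ConcurrentHashMap<>();

    void put(long dbId, long txnId, String state) {
        dbIdToTxns.computeIfAbsent(dbId, k -> new ConcurrentHashMap<>()).put(txnId, state);
    }

    // Empty instead of an NPE when either the database or the transaction is unknown.
    Optional<String> getTransactionState(long dbId, long txnId) {
        Map<Long, String> txns = dbIdToTxns.get(dbId);
        if (txns == null) {
            return Optional.empty();
        }
        return Optional.ofNullable(txns.get(txnId));
    }
}
```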

##########
File path: fe/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java
##########
@@ -851,493 +243,114 @@ public boolean isIntersectionNotEmpty(List<Long> sourceTableIdList, List<Long> t
      */
     public void removeExpiredAndTimeoutTxns() {
         long currentMillis = System.currentTimeMillis();
-
-        List<Long> timeoutTxns = Lists.newArrayList();
-        List<Long> expiredTxns = Lists.newArrayList();
-        readLock();
-        try {
-            for (TransactionState transactionState : idToTransactionState.values()) {
-                if (transactionState.isExpired(currentMillis)) {
-                    // remove the txn which labels are expired
-                    expiredTxns.add(transactionState.getTransactionId());
-                } else if (transactionState.isTimeout(currentMillis)) {
-                    // txn is running but timeout, abort it.
-                    timeoutTxns.add(transactionState.getTransactionId());
+        for (DatabaseTransactionMgr dbTransactionMgr : dbIdToDatabaseTransactionMgrs.values()) {
+            dbTransactionMgr.removeExpiredTxns();
+            List<Long> timeoutTxns = dbTransactionMgr.getTimeoutTxns(currentMillis);
+            // abort timeout txns
+            for (Long txnId : timeoutTxns) {
+                try {
+                    dbTransactionMgr.abortTransaction(txnId, "timeout by txn manager", null);
+                    LOG.info("transaction [" + txnId + "] is timeout, abort it by transaction manager");
+                } catch (UserException e) {
+                    // abort may be failed. it is acceptable. just print a log
+                    LOG.warn("abort timeout txn {} failed. msg: {}", txnId, e.getMessage());
                 }
             }
-        } finally {
-            readUnlock();
-        }
 
-        // delete expired txns
-        for (Long txnId : expiredTxns) {
-            deleteTransaction(txnId);
-            LOG.info("transaction [" + txnId + "] is expired, remove it from transaction manager");
-        }
-
-        // abort timeout txns
-        for (Long txnId : timeoutTxns) {
-            try {
-                abortTransaction(txnId, "timeout by txn manager");
-                LOG.info("transaction [" + txnId + "] is timeout, abort it by transaction manager");
-            } catch (UserException e) {
-                // abort may be failed. it is acceptable. just print a log
-                LOG.warn("abort timeout txn {} failed. msg: {}", txnId, e.getMessage());
-            }
         }
     }
 
-    public TransactionState getTransactionState(long transactionId) {
-        readLock();
-        try {
-            return idToTransactionState.get(transactionId);
-        } finally {
-            readUnlock();
-        }
+    public TransactionState getTransactionState(long dbId, long transactionId) {
+        DatabaseTransactionMgr dbTransactionMgr = dbIdToDatabaseTransactionMgrs.get(dbId);
+        return dbTransactionMgr.getTransactionState(transactionId);
     }
     
     public void setEditLog(EditLog editLog) {
-        this.editLog = editLog;
         this.idGenerator.setEditLog(editLog);
     }
-    
-    private void readLock() {
-        this.transactionLock.readLock().lock();
-    }
-    
-    private void readUnlock() {
-        this.transactionLock.readLock().unlock();
-    }
-    
-    private void writeLock() {
-        this.transactionLock.writeLock().lock();
-    }
-    
-    private void writeUnlock() {
-        this.transactionLock.writeLock().unlock();
-    }
-    
-    // for add/update/delete TransactionState
-    private void unprotectUpsertTransactionState(TransactionState transactionState) {
-        if (transactionState.getTransactionStatus() != TransactionStatus.PREPARE
-                || transactionState.getSourceType() == LoadJobSourceType.FRONTEND) {
-            // if this is a prepare txn, and load source type is not FRONTEND
-            // no need to persist it. if prepare txn lost, the following commit will just be failed.
-            // user only need to retry this txn.
-            // The FRONTEND type txn is committed and running asynchronously, so we have to persist it.
-            editLog.logInsertTransactionState(transactionState);
-        }
-        idToTransactionState.put(transactionState.getTransactionId(), transactionState);
-        updateTxnLabels(transactionState);
-        updateDbRunningTxnNum(transactionState.getPreStatus(), transactionState);
-    }
-
-    private void unprotectedCommitTransaction(TransactionState transactionState, Set<Long> errorReplicaIds,
-                                              Map<Long, Set<Long>> tableToPartition, Set<Long> totalInvolvedBackends,
-                                              Database db) {
-        // transaction state is modified during check if the transaction could committed
-        if (transactionState.getTransactionStatus() != TransactionStatus.PREPARE) {
-            return;
-        }
-        // update transaction state version
-        transactionState.setCommitTime(System.currentTimeMillis());
-        transactionState.setTransactionStatus(TransactionStatus.COMMITTED);
-        transactionState.setErrorReplicas(errorReplicaIds);
-        for (long tableId : tableToPartition.keySet()) {
-            TableCommitInfo tableCommitInfo = new TableCommitInfo(tableId);
-            for (long partitionId : tableToPartition.get(tableId)) {
-                OlapTable table = (OlapTable) db.getTable(tableId);
-                Partition partition = table.getPartition(partitionId);
-                PartitionCommitInfo partitionCommitInfo = new PartitionCommitInfo(partitionId,
-                                                                                  partition.getNextVersion(),
-                                                                                  partition.getNextVersionHash());
-                tableCommitInfo.addPartitionCommitInfo(partitionCommitInfo);
-            }
-            transactionState.putIdToTableCommitInfo(tableId, tableCommitInfo);
-        }
-        // persist transactionState
-        unprotectUpsertTransactionState(transactionState);
-
-        // add publish version tasks. set task to null as a placeholder.
-        // tasks will be created when publishing version.
-        for (long backendId : totalInvolvedBackends) {
-            transactionState.addPublishVersionTask(backendId, null);
-        }
-    }
 
-    private boolean unprotectAbortTransaction(long transactionId, String reason)
-            throws UserException {
-        TransactionState transactionState = idToTransactionState.get(transactionId);
-        if (transactionState == null) {
-            throw new UserException("transaction not found");
-        }
-        if (transactionState.getTransactionStatus() == TransactionStatus.ABORTED) {
-            return false;
-        }
-        if (transactionState.getTransactionStatus() == TransactionStatus.COMMITTED
-                || transactionState.getTransactionStatus() == TransactionStatus.VISIBLE) {
-            throw new UserException("transaction's state is already "
-                    + transactionState.getTransactionStatus() + ", could not abort");
-        }
-        transactionState.setFinishTime(System.currentTimeMillis());
-        transactionState.setReason(reason);
-        transactionState.setTransactionStatus(TransactionStatus.ABORTED);
-        unprotectUpsertTransactionState(transactionState);
-        for (PublishVersionTask task : transactionState.getPublishVersionTasks().values()) {
-            AgentTaskQueue.removeTask(task.getBackendId(), TTaskType.PUBLISH_VERSION, task.getSignature());
-        }
-        return true;
-    }
-    
     // for replay idToTransactionState
     // check point also run transaction cleaner, the cleaner maybe concurrently modify id to 
     public void replayUpsertTransactionState(TransactionState transactionState) {
-        writeLock();
         try {
-            // set transaction status will call txn state change listener
-            transactionState.replaySetTransactionStatus();
-            Database db = catalog.getDb(transactionState.getDbId());
-            if (transactionState.getTransactionStatus() == TransactionStatus.COMMITTED) {
-                LOG.info("replay a committed transaction {}", transactionState);
-                updateCatalogAfterCommitted(transactionState, db);
-            } else if (transactionState.getTransactionStatus() == TransactionStatus.VISIBLE) {
-                LOG.info("replay a visible transaction {}", transactionState);
-                updateCatalogAfterVisible(transactionState, db);
-            }
-            TransactionState preTxnState = idToTransactionState.get(transactionState.getTransactionId());
-            idToTransactionState.put(transactionState.getTransactionId(), transactionState);
-            updateTxnLabels(transactionState);
-            updateDbRunningTxnNum(preTxnState == null ? null : preTxnState.getTransactionStatus(),
-                                  transactionState);
-        } finally {
-            writeUnlock();
+            DatabaseTransactionMgr dbTransactionMgr = getDatabaseTransactioMgr(transactionState.getDbId());
+            dbTransactionMgr.replayUpsertTransactionState(transactionState);
+        } catch (AnalysisException e) {
+            LOG.warn("replay upsert transaction failed", e);
         }
+
     }
     
     public void replayDeleteTransactionState(TransactionState transactionState) {
-        writeLock();
         try {
-            idToTransactionState.remove(transactionState.getTransactionId());
-            Set<Long> txnIds = dbIdToTxnLabels.get(transactionState.getDbId(), transactionState.getLabel());
-            txnIds.remove(transactionState.getTransactionId());
-            if (txnIds.isEmpty()) {
-                dbIdToTxnLabels.remove(transactionState.getDbId(), transactionState.getLabel());
-            }
-        } finally {
-            writeUnlock();
-        }
-    }
-    
-    private void updateCatalogAfterCommitted(TransactionState transactionState, Database db) {
-        Set<Long> errorReplicaIds = transactionState.getErrorReplicas();
-        for (TableCommitInfo tableCommitInfo : transactionState.getIdToTableCommitInfos().values()) {
-            long tableId = tableCommitInfo.getTableId();
-            OlapTable table = (OlapTable) db.getTable(tableId);
-            for (PartitionCommitInfo partitionCommitInfo : tableCommitInfo.getIdToPartitionCommitInfo().values()) {
-                long partitionId = partitionCommitInfo.getPartitionId();
-                Partition partition = table.getPartition(partitionId);
-                List<MaterializedIndex> allIndices = partition.getMaterializedIndices(IndexExtState.ALL);
-                for (MaterializedIndex index : allIndices) {
-                    List<Tablet> tablets = index.getTablets();
-                    for (Tablet tablet : tablets) {
-                        for (Replica replica : tablet.getReplicas()) {
-                            if (errorReplicaIds.contains(replica.getId())) {
-                                // should not use partition.getNextVersion and partition.getNextVersionHash because partition's next version hash is generated locally
-                                // should get from transaction state
-                                replica.updateLastFailedVersion(partitionCommitInfo.getVersion(),
-                                                                partitionCommitInfo.getVersionHash());
-                            }
-                        }
-                    }
-                }
-                partition.setNextVersion(partition.getNextVersion() + 1);
-                // Although committed version(hash) is not visible to user,
-                // but they need to be synchronized among Frontends.
-                // because we use committed version(hash) to create clone task, if the first Master FE
-                // send clone task with committed version hash X, and than Master changed, the new Master FE
-                // received the clone task report with version hash X, which not equals to it own committed
-                // version hash, than the clone task is failed.
-                partition.setNextVersionHash(Util.generateVersionHash() /* next version hash */,
-                                             partitionCommitInfo.getVersionHash() /* committed version hash*/);
-            }
+            DatabaseTransactionMgr dbTransactionMgr = getDatabaseTransactioMgr(transactionState.getDbId());
+            dbTransactionMgr.deleteTransaction(transactionState);
+        } catch (AnalysisException e) {
+            LOG.warn("replay delete transaction failed", e);
         }
     }
-    
-    private boolean updateCatalogAfterVisible(TransactionState transactionState, Database db) {
-        Set<Long> errorReplicaIds = transactionState.getErrorReplicas();
-        for (TableCommitInfo tableCommitInfo : transactionState.getIdToTableCommitInfos().values()) {
-            long tableId = tableCommitInfo.getTableId();
-            OlapTable table = (OlapTable) db.getTable(tableId);
-            for (PartitionCommitInfo partitionCommitInfo : tableCommitInfo.getIdToPartitionCommitInfo().values()) {
-                long partitionId = partitionCommitInfo.getPartitionId();
-                long newCommitVersion = partitionCommitInfo.getVersion();
-                long newCommitVersionHash = partitionCommitInfo.getVersionHash();
-                Partition partition = table.getPartition(partitionId);
-                List<MaterializedIndex> allIndices = partition.getMaterializedIndices(IndexExtState.ALL);
-                for (MaterializedIndex index : allIndices) {
-                    for (Tablet tablet : index.getTablets()) {
-                        for (Replica replica : tablet.getReplicas()) {
-                            long lastFailedVersion = replica.getLastFailedVersion();
-                            long lastFailedVersionHash = replica.getLastFailedVersionHash();
-                            long newVersion = newCommitVersion;
-                            long newVersionHash = newCommitVersionHash;
-                            long lastSucessVersion = replica.getLastSuccessVersion();
-                            long lastSuccessVersionHash = replica.getLastSuccessVersionHash();
-                            if (!errorReplicaIds.contains(replica.getId())) {
-                                if (replica.getLastFailedVersion() > 0) {
-                                    // if the replica is a failed replica, then not changing version and version hash
-                                    newVersion = replica.getVersion();
-                                    newVersionHash = replica.getVersionHash();
-                                } else if (!replica.checkVersionCatchUp(partition.getVisibleVersion(),
-                                        partition.getVisibleVersionHash(), true)) {
-                                    // this means the replica has error in the past, but we did not observe it
-                                    // during upgrade, one job maybe in quorum finished state, for example, A,B,C 3 replica
-                                    // A,B 's version is 10, C's version is 10 but C' 10 is abnormal should be rollback
-                                    // then we will detect this and set C's last failed version to 10 and last success version to 11
-                                    // this logic has to be replayed in checkpoint thread
-                                    lastFailedVersion = partition.getVisibleVersion();
-                                    lastFailedVersionHash = partition.getVisibleVersionHash();
-                                    newVersion = replica.getVersion();
-                                    newVersionHash = replica.getVersionHash();
-                                }
 
-                                // success version always move forward
-                                lastSucessVersion = newCommitVersion;
-                                lastSuccessVersionHash = newCommitVersionHash;
-                            } else {
-                                // for example, A,B,C 3 replicas, B,C failed during publish version, then B C will be set abnormal
-                                // all loading will failed, B,C will have to recovery by clone, it is very inefficient and maybe lost data
-                                // Using this method, B,C will publish failed, and fe will publish again, not update their last failed version
-                                // if B is publish successfully in next turn, then B is normal and C will be set abnormal so that quorum is maintained
-                                // and loading will go on.
-                                newVersion = replica.getVersion();
-                                newVersionHash = replica.getVersionHash();
-                                if (newCommitVersion > lastFailedVersion) {
-                                    lastFailedVersion = newCommitVersion;
-                                    lastFailedVersionHash = newCommitVersionHash;
-                                }
-                            }
-                            replica.updateVersionInfo(newVersion, newVersionHash, lastFailedVersion, lastFailedVersionHash, lastSucessVersion, lastSuccessVersionHash);
-                        }
-                    }
-                } // end for indices
-                long version = partitionCommitInfo.getVersion();
-                long versionHash = partitionCommitInfo.getVersionHash();
-                partition.updateVisibleVersionAndVersionHash(version, versionHash);
-                if (LOG.isDebugEnabled()) {
-                    LOG.debug("transaction state {} set partition {}'s version to [{}] and version hash to [{}]",
-                              transactionState, partition.getId(), version, versionHash);
-                }
-            }
-        }
-        return true;
-    }
-    
-    private void updateTxnLabels(TransactionState transactionState) {
-        Set<Long> txnIds = dbIdToTxnLabels.get(transactionState.getDbId(), transactionState.getLabel());
-        if (txnIds == null) {
-            txnIds = Sets.newHashSet();
-            dbIdToTxnLabels.put(transactionState.getDbId(), transactionState.getLabel(), txnIds);
-        }
-        txnIds.add(transactionState.getTransactionId());
-    }
-    
-    private void updateDbRunningTxnNum(TransactionStatus preStatus, TransactionState curTxnState) {
-        Map<Long, Integer> txnNumMap = null;
-        if (curTxnState.getSourceType() == LoadJobSourceType.ROUTINE_LOAD_TASK) {
-            txnNumMap = runningRoutineLoadTxnNums;
-        } else {
-            txnNumMap = runningTxnNums;
-        }
-
-        int txnNum = txnNumMap.getOrDefault(curTxnState.getDbId(), 0);
-        if (preStatus == null
-                && (curTxnState.getTransactionStatus() == TransactionStatus.PREPARE
-                || curTxnState.getTransactionStatus() == TransactionStatus.COMMITTED)) {
-            ++txnNum;
-        } else if ((preStatus == TransactionStatus.PREPARE
-                || preStatus == TransactionStatus.COMMITTED)
-                && (curTxnState.getTransactionStatus() == TransactionStatus.VISIBLE
-                || curTxnState.getTransactionStatus() == TransactionStatus.ABORTED)) {
-            --txnNum;
-        }
-
-        if (txnNum < 1) {
-            txnNumMap.remove(curTxnState.getDbId());
-        } else {
-            txnNumMap.put(curTxnState.getDbId(), txnNum);
-        }
-    }
-    
     public List<List<Comparable>> getDbInfo() {
         List<List<Comparable>> infos = new ArrayList<List<Comparable>>();
-        readLock();
-        try {
-            Set<Long> dbIds = new HashSet<>();
-            for (TransactionState transactionState : idToTransactionState.values()) {
-                dbIds.add(transactionState.getDbId());
-            }
-            for (long dbId : dbIds) {
-                List<Comparable> info = new ArrayList<Comparable>();
-                info.add(dbId);
-                Database db = Catalog.getInstance().getDb(dbId);
-                if (db == null) {
-                    continue;
-                }
-                info.add(db.getFullName());
-                infos.add(info);
+        List<Long> dbIds = Lists.newArrayList();

Review comment:
       ```suggestion
           List<Long> dbIds = Lists.newArrayList(dbIdToDatabaseTransactionMgrs.keySet());
      ```
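Here is a minimal, simplified Java sketch of the suggestion combined with the per-db routing in the new `getTransactionState(long dbId, long transactionId)`. `GlobalTxnMgrSketch`, `DbTxnMgr`, `getOrCreate` and `getTxnLabel` are hypothetical stand-ins, not the Doris classes. The sketch assumes one manager per database, held in a concurrent map. The diff calls `dbIdToDatabaseTransactionMgrs.get(dbId)` and dereferences the result directly. The sketch instead returns `null` for an unknown db id so that it does not throw a `NullPointerException`. Whether the real code should return null or throw a checked exception is the author's call.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical, simplified stand-ins; not the real Doris classes.
public class GlobalTxnMgrSketch {

    // Per-database manager. Its methods synchronize on this instance only,
    // so a txn in one database never waits on another database's lock.
    static class DbTxnMgr {
        private final Map<Long, String> idToLabel = new HashMap<>();

        synchronized void addTxn(long txnId, String label) {
            idToLabel.put(txnId, label);
        }

        synchronized String getTxnLabel(long txnId) {
            return idToLabel.get(txnId);
        }
    }

    private final Map<Long, DbTxnMgr> dbIdToMgr = new ConcurrentHashMap<>();

    DbTxnMgr getOrCreate(long dbId) {
        return dbIdToMgr.computeIfAbsent(dbId, id -> new DbTxnMgr());
    }

    // Same idea as the suggestion: the map's key set already is the set of db ids.
    List<Long> getDbIds() {
        List<Long> dbIds = new ArrayList<>(dbIdToMgr.keySet());
        Collections.sort(dbIds); // only for deterministic output
        return dbIds;
    }

    // Null-safe routing: an unknown dbId yields null instead of an NPE.
    String getTxnLabel(long dbId, long txnId) {
        DbTxnMgr mgr = dbIdToMgr.get(dbId);
        return mgr == null ? null : mgr.getTxnLabel(txnId);
    }

    public static void main(String[] args) {
        GlobalTxnMgrSketch g = new GlobalTxnMgrSketch();
        g.getOrCreate(20L).addTxn(1L, "label_a");
        g.getOrCreate(10L).addTxn(2L, "label_b");
        System.out.println(g.getDbIds());           // [10, 20]
        System.out.println(g.getTxnLabel(10L, 2L)); // label_b
        System.out.println(g.getTxnLabel(99L, 2L)); // null
    }
}
```

The db ids are sorted only to make the output deterministic. The suggested one-liner does not sort, and the original `getDbInfo()` did not either.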

##########
File path: fe/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java
##########
@@ -851,493 +243,114 @@ public boolean isIntersectionNotEmpty(List<Long> sourceTableIdList, List<Long> t
      */
     public void removeExpiredAndTimeoutTxns() {
         long currentMillis = System.currentTimeMillis();
-
-        List<Long> timeoutTxns = Lists.newArrayList();
-        List<Long> expiredTxns = Lists.newArrayList();
-        readLock();
-        try {
-            for (TransactionState transactionState : idToTransactionState.values()) {
-                if (transactionState.isExpired(currentMillis)) {
-                    // remove the txn which labels are expired
-                    expiredTxns.add(transactionState.getTransactionId());
-                } else if (transactionState.isTimeout(currentMillis)) {
-                    // txn is running but timeout, abort it.
-                    timeoutTxns.add(transactionState.getTransactionId());
+        for (DatabaseTransactionMgr dbTransactionMgr : dbIdToDatabaseTransactionMgrs.values()) {
+            dbTransactionMgr.removeExpiredTxns();
+            List<Long> timeoutTxns = dbTransactionMgr.getTimeoutTxns(currentMillis);
+            // abort timeout txns
+            for (Long txnId : timeoutTxns) {
+                try {
+                    dbTransactionMgr.abortTransaction(txnId, "timeout by txn manager", null);
+                    LOG.info("transaction [" + txnId + "] is timeout, abort it by transaction manager");
+                } catch (UserException e) {
+                    // abort may be failed. it is acceptable. just print a log
+                    LOG.warn("abort timeout txn {} failed. msg: {}", txnId, e.getMessage());
                 }
             }
-        } finally {
-            readUnlock();
-        }
 
-        // delete expired txns
-        for (Long txnId : expiredTxns) {
-            deleteTransaction(txnId);
-            LOG.info("transaction [" + txnId + "] is expired, remove it from transaction manager");
-        }
-
-        // abort timeout txns
-        for (Long txnId : timeoutTxns) {
-            try {
-                abortTransaction(txnId, "timeout by txn manager");
-                LOG.info("transaction [" + txnId + "] is timeout, abort it by transaction manager");
-            } catch (UserException e) {
-                // abort may be failed. it is acceptable. just print a log
-                LOG.warn("abort timeout txn {} failed. msg: {}", txnId, e.getMessage());
-            }
         }
     }
 
-    public TransactionState getTransactionState(long transactionId) {
-        readLock();
-        try {
-            return idToTransactionState.get(transactionId);
-        } finally {
-            readUnlock();
-        }
+    public TransactionState getTransactionState(long dbId, long transactionId) {
+        DatabaseTransactionMgr dbTransactionMgr = dbIdToDatabaseTransactionMgrs.get(dbId);
+        return dbTransactionMgr.getTransactionState(transactionId);
     }
     
     public void setEditLog(EditLog editLog) {
-        this.editLog = editLog;
         this.idGenerator.setEditLog(editLog);
     }
-    
-    private void readLock() {
-        this.transactionLock.readLock().lock();
-    }
-    
-    private void readUnlock() {
-        this.transactionLock.readLock().unlock();
-    }
-    
-    private void writeLock() {
-        this.transactionLock.writeLock().lock();
-    }
-    
-    private void writeUnlock() {
-        this.transactionLock.writeLock().unlock();
-    }
-    
-    // for add/update/delete TransactionState
-    private void unprotectUpsertTransactionState(TransactionState transactionState) {
-        if (transactionState.getTransactionStatus() != TransactionStatus.PREPARE
-                || transactionState.getSourceType() == LoadJobSourceType.FRONTEND) {
-            // if this is a prepare txn, and load source type is not FRONTEND
-            // no need to persist it. if prepare txn lost, the following commit will just be failed.
-            // user only need to retry this txn.
-            // The FRONTEND type txn is committed and running asynchronously, so we have to persist it.
-            editLog.logInsertTransactionState(transactionState);
-        }
-        idToTransactionState.put(transactionState.getTransactionId(), transactionState);
-        updateTxnLabels(transactionState);
-        updateDbRunningTxnNum(transactionState.getPreStatus(), transactionState);
-    }
-
-    private void unprotectedCommitTransaction(TransactionState transactionState, Set<Long> errorReplicaIds,
-                                              Map<Long, Set<Long>> tableToPartition, Set<Long> totalInvolvedBackends,
-                                              Database db) {
-        // transaction state is modified during check if the transaction could committed
-        if (transactionState.getTransactionStatus() != TransactionStatus.PREPARE) {
-            return;
-        }
-        // update transaction state version
-        transactionState.setCommitTime(System.currentTimeMillis());
-        transactionState.setTransactionStatus(TransactionStatus.COMMITTED);
-        transactionState.setErrorReplicas(errorReplicaIds);
-        for (long tableId : tableToPartition.keySet()) {
-            TableCommitInfo tableCommitInfo = new TableCommitInfo(tableId);
-            for (long partitionId : tableToPartition.get(tableId)) {
-                OlapTable table = (OlapTable) db.getTable(tableId);
-                Partition partition = table.getPartition(partitionId);
-                PartitionCommitInfo partitionCommitInfo = new PartitionCommitInfo(partitionId,
-                                                                                  partition.getNextVersion(),
-                                                                                  partition.getNextVersionHash());
-                tableCommitInfo.addPartitionCommitInfo(partitionCommitInfo);
-            }
-            transactionState.putIdToTableCommitInfo(tableId, tableCommitInfo);
-        }
-        // persist transactionState
-        unprotectUpsertTransactionState(transactionState);
-
-        // add publish version tasks. set task to null as a placeholder.
-        // tasks will be created when publishing version.
-        for (long backendId : totalInvolvedBackends) {
-            transactionState.addPublishVersionTask(backendId, null);
-        }
-    }
 
-    private boolean unprotectAbortTransaction(long transactionId, String reason)
-            throws UserException {
-        TransactionState transactionState = idToTransactionState.get(transactionId);
-        if (transactionState == null) {
-            throw new UserException("transaction not found");
-        }
-        if (transactionState.getTransactionStatus() == TransactionStatus.ABORTED) {
-            return false;
-        }
-        if (transactionState.getTransactionStatus() == TransactionStatus.COMMITTED
-                || transactionState.getTransactionStatus() == TransactionStatus.VISIBLE) {
-            throw new UserException("transaction's state is already "
-                    + transactionState.getTransactionStatus() + ", could not abort");
-        }
-        transactionState.setFinishTime(System.currentTimeMillis());
-        transactionState.setReason(reason);
-        transactionState.setTransactionStatus(TransactionStatus.ABORTED);
-        unprotectUpsertTransactionState(transactionState);
-        for (PublishVersionTask task : transactionState.getPublishVersionTasks().values()) {
-            AgentTaskQueue.removeTask(task.getBackendId(), TTaskType.PUBLISH_VERSION, task.getSignature());
-        }
-        return true;
-    }
-    
     // for replay idToTransactionState
     // check point also run transaction cleaner, the cleaner maybe concurrently modify id to 
     public void replayUpsertTransactionState(TransactionState transactionState) {
-        writeLock();
         try {
-            // set transaction status will call txn state change listener
-            transactionState.replaySetTransactionStatus();
-            Database db = catalog.getDb(transactionState.getDbId());
-            if (transactionState.getTransactionStatus() == TransactionStatus.COMMITTED) {
-                LOG.info("replay a committed transaction {}", transactionState);
-                updateCatalogAfterCommitted(transactionState, db);
-            } else if (transactionState.getTransactionStatus() == TransactionStatus.VISIBLE) {
-                LOG.info("replay a visible transaction {}", transactionState);
-                updateCatalogAfterVisible(transactionState, db);
-            }
-            TransactionState preTxnState = idToTransactionState.get(transactionState.getTransactionId());
-            idToTransactionState.put(transactionState.getTransactionId(), transactionState);
-            updateTxnLabels(transactionState);
-            updateDbRunningTxnNum(preTxnState == null ? null : preTxnState.getTransactionStatus(),
-                                  transactionState);
-        } finally {
-            writeUnlock();
+            DatabaseTransactionMgr dbTransactionMgr = getDatabaseTransactioMgr(transactionState.getDbId());
+            dbTransactionMgr.replayUpsertTransactionState(transactionState);
+        } catch (AnalysisException e) {
+            LOG.warn("replay upsert transaction failed", e);
         }
+
     }
     
     public void replayDeleteTransactionState(TransactionState transactionState) {
-        writeLock();
         try {
-            idToTransactionState.remove(transactionState.getTransactionId());
-            Set<Long> txnIds = dbIdToTxnLabels.get(transactionState.getDbId(), transactionState.getLabel());
-            txnIds.remove(transactionState.getTransactionId());
-            if (txnIds.isEmpty()) {
-                dbIdToTxnLabels.remove(transactionState.getDbId(), transactionState.getLabel());
-            }
-        } finally {
-            writeUnlock();
-        }
-    }
-    
-    private void updateCatalogAfterCommitted(TransactionState transactionState, Database db) {
-        Set<Long> errorReplicaIds = transactionState.getErrorReplicas();
-        for (TableCommitInfo tableCommitInfo : transactionState.getIdToTableCommitInfos().values()) {
-            long tableId = tableCommitInfo.getTableId();
-            OlapTable table = (OlapTable) db.getTable(tableId);
-            for (PartitionCommitInfo partitionCommitInfo : tableCommitInfo.getIdToPartitionCommitInfo().values()) {
-                long partitionId = partitionCommitInfo.getPartitionId();
-                Partition partition = table.getPartition(partitionId);
-                List<MaterializedIndex> allIndices = partition.getMaterializedIndices(IndexExtState.ALL);
-                for (MaterializedIndex index : allIndices) {
-                    List<Tablet> tablets = index.getTablets();
-                    for (Tablet tablet : tablets) {
-                        for (Replica replica : tablet.getReplicas()) {
-                            if (errorReplicaIds.contains(replica.getId())) {
-                                // should not use partition.getNextVersion and partition.getNextVersionHash because partition's next version hash is generated locally
-                                // should get from transaction state
-                                replica.updateLastFailedVersion(partitionCommitInfo.getVersion(),
-                                                                partitionCommitInfo.getVersionHash());
-                            }
-                        }
-                    }
-                }
-                partition.setNextVersion(partition.getNextVersion() + 1);
-                // Although committed version(hash) is not visible to user,
-                // but they need to be synchronized among Frontends.
-                // because we use committed version(hash) to create clone task, if the first Master FE
-                // send clone task with committed version hash X, and than Master changed, the new Master FE
-                // received the clone task report with version hash X, which not equals to it own committed
-                // version hash, than the clone task is failed.
-                partition.setNextVersionHash(Util.generateVersionHash() /* next version hash */,
-                                             partitionCommitInfo.getVersionHash() /* committed version hash*/);
-            }
+            DatabaseTransactionMgr dbTransactionMgr = getDatabaseTransactioMgr(transactionState.getDbId());
+            dbTransactionMgr.deleteTransaction(transactionState);
+        } catch (AnalysisException e) {
+            LOG.warn("replay delete transaction failed", e);

Review comment:
      Add the txn id to this log message.
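This is a small, self-contained sketch of what the requested log change might look like. It uses `java.util.logging` only so that it runs without dependencies. `ReplayLogSketch` and `replayFailureMessage` are hypothetical names. The `{}` placeholders elsewhere in the diff suggest that `LOG` in `GlobalTransactionMgr` is a Log4j2- or SLF4J-style logger. Assuming that, the equivalent change in the real file would be along the lines of `LOG.warn("replay delete transaction failed, txn id: {}", transactionState.getTransactionId(), e);`. The trailing exception is then logged together with its stack trace.

```java
import java.util.logging.Level;
import java.util.logging.Logger;

public class ReplayLogSketch {
    private static final Logger LOG = Logger.getLogger(ReplayLogSketch.class.getName());

    // Hypothetical helper: builds the warning text so the db id and txn id
    // are always part of the message, as the review asks.
    static String replayFailureMessage(String op, long dbId, long txnId) {
        return String.format("replay %s transaction failed, db id: %d, txn id: %d", op, dbId, txnId);
    }

    public static void main(String[] args) {
        long dbId = 10L;
        long txnId = 12345L;
        try {
            // Stand-in for the per-db manager lookup failing on an unknown db.
            throw new IllegalStateException("database " + dbId + " not found");
        } catch (IllegalStateException e) {
            // Logger.log(Level, String, Throwable) keeps the stack trace.
            LOG.log(Level.WARNING, replayFailureMessage("delete", dbId, txnId), e);
        }
    }
}
```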

##########
File path: fe/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java
##########
@@ -17,69 +17,33 @@
 
 package org.apache.doris.transaction;
 
+import org.apache.commons.lang3.tuple.Pair;

Review comment:
      You can use `org.apache.doris.common.Pair` here instead.
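The `removeExpiredAndTimeoutTxns()` hunks call `dbTransactionMgr.removeExpiredTxns()`, and the PR description says final-status txns are stored in an `ArrayDeque`. The hedged sketch below shows why that can be cheaper than the old full scan over `idToTransactionState`. It assumes txns are appended in the order they reach a final status, and that each txn is kept for the same fixed window after finishing. All names (`FinalTxnQueueSketch`, `labelKeepMs`, `onTxnFinished`, `contains`) are hypothetical and not taken from the PR.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch; names and fields are illustrative, not Doris code.
public class FinalTxnQueueSketch {

    static final class FinishedTxn {
        final long txnId;
        final long finishTimeMs;

        FinishedTxn(long txnId, long finishTimeMs) {
            this.txnId = txnId;
            this.finishTimeMs = finishTimeMs;
        }
    }

    private final long labelKeepMs;
    // Final-status (VISIBLE/ABORTED) txns in the order they finished.
    private final ArrayDeque<FinishedTxn> finalStatusTxns = new ArrayDeque<>();
    // Lookup index, kept in sync with the deque.
    private final Map<Long, FinishedTxn> idToTxn = new HashMap<>();

    FinalTxnQueueSketch(long labelKeepMs) {
        this.labelKeepMs = labelKeepMs;
    }

    // Assumes callers finish txns in non-decreasing finish-time order,
    // so the oldest txn is always at the head of the deque.
    void onTxnFinished(long txnId, long finishTimeMs) {
        FinishedTxn txn = new FinishedTxn(txnId, finishTimeMs);
        finalStatusTxns.addLast(txn);
        idToTxn.put(txnId, txn);
    }

    // Pops expired txns from the head and stops at the first one that is
    // still inside its keep window, instead of scanning every txn.
    List<Long> removeExpiredTxns(long nowMs) {
        List<Long> removed = new ArrayList<>();
        while (!finalStatusTxns.isEmpty()
                && finalStatusTxns.peekFirst().finishTimeMs + labelKeepMs <= nowMs) {
            FinishedTxn txn = finalStatusTxns.pollFirst();
            idToTxn.remove(txn.txnId);
            removed.add(txn.txnId);
        }
        return removed;
    }

    boolean contains(long txnId) {
        return idToTxn.containsKey(txnId);
    }

    int size() {
        return finalStatusTxns.size();
    }

    public static void main(String[] args) {
        FinalTxnQueueSketch mgr = new FinalTxnQueueSketch(1_000L);
        mgr.onTxnFinished(1L, 100L);
        mgr.onTxnFinished(2L, 500L);
        mgr.onTxnFinished(3L, 2_000L);
        System.out.println(mgr.removeExpiredTxns(1_600L)); // [1, 2]
        System.out.println(mgr.size());                    // 1
    }
}
```

The deque is ordered by finish time, so each cleanup round touches only the expired entries plus one more, not every txn in the db. Out-of-order finish times would make the early exit unsafe, for example if the keep window varied per txn. In that case a time-ordered structure such as a priority queue would be needed instead.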

##########
File path: fe/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java
##########
@@ -851,493 +243,114 @@ public boolean isIntersectionNotEmpty(List<Long> sourceTableIdList, List<Long> t
      */
     public void removeExpiredAndTimeoutTxns() {
         long currentMillis = System.currentTimeMillis();
-
-        List<Long> timeoutTxns = Lists.newArrayList();
-        List<Long> expiredTxns = Lists.newArrayList();
-        readLock();
-        try {
-            for (TransactionState transactionState : idToTransactionState.values()) {
-                if (transactionState.isExpired(currentMillis)) {
-                    // remove the txn which labels are expired
-                    expiredTxns.add(transactionState.getTransactionId());
-                } else if (transactionState.isTimeout(currentMillis)) {
-                    // txn is running but timeout, abort it.
-                    timeoutTxns.add(transactionState.getTransactionId());
+        for (DatabaseTransactionMgr dbTransactionMgr : dbIdToDatabaseTransactionMgrs.values()) {
+            dbTransactionMgr.removeExpiredTxns();
+            List<Long> timeoutTxns = dbTransactionMgr.getTimeoutTxns(currentMillis);
+            // abort timeout txns
+            for (Long txnId : timeoutTxns) {
+                try {
+                    dbTransactionMgr.abortTransaction(txnId, "timeout by txn manager", null);
+                    LOG.info("transaction [" + txnId + "] is timeout, abort it by transaction manager");
+                } catch (UserException e) {
+                    // abort may be failed. it is acceptable. just print a log
+                    LOG.warn("abort timeout txn {} failed. msg: {}", txnId, e.getMessage());
                 }
             }
-        } finally {
-            readUnlock();
-        }
 
-        // delete expired txns
-        for (Long txnId : expiredTxns) {
-            deleteTransaction(txnId);
-            LOG.info("transaction [" + txnId + "] is expired, remove it from transaction manager");
-        }
-
-        // abort timeout txns
-        for (Long txnId : timeoutTxns) {
-            try {
-                abortTransaction(txnId, "timeout by txn manager");
-                LOG.info("transaction [" + txnId + "] is timeout, abort it by transaction manager");
-            } catch (UserException e) {
-                // abort may be failed. it is acceptable. just print a log
-                LOG.warn("abort timeout txn {} failed. msg: {}", txnId, e.getMessage());
-            }
         }
     }
 
-    public TransactionState getTransactionState(long transactionId) {
-        readLock();
-        try {
-            return idToTransactionState.get(transactionId);
-        } finally {
-            readUnlock();
-        }
+    public TransactionState getTransactionState(long dbId, long transactionId) {
+        DatabaseTransactionMgr dbTransactionMgr = dbIdToDatabaseTransactionMgrs.get(dbId);
+        return dbTransactionMgr.getTransactionState(transactionId);
     }
     
     public void setEditLog(EditLog editLog) {
-        this.editLog = editLog;
         this.idGenerator.setEditLog(editLog);
     }
-    
-    private void readLock() {
-        this.transactionLock.readLock().lock();
-    }
-    
-    private void readUnlock() {
-        this.transactionLock.readLock().unlock();
-    }
-    
-    private void writeLock() {
-        this.transactionLock.writeLock().lock();
-    }
-    
-    private void writeUnlock() {
-        this.transactionLock.writeLock().unlock();
-    }
-    
-    // for add/update/delete TransactionState
-    private void unprotectUpsertTransactionState(TransactionState transactionState) {
-        if (transactionState.getTransactionStatus() != TransactionStatus.PREPARE
-                || transactionState.getSourceType() == LoadJobSourceType.FRONTEND) {
-            // if this is a prepare txn, and load source type is not FRONTEND
-            // no need to persist it. if prepare txn lost, the following commit will just be failed.
-            // user only need to retry this txn.
-            // The FRONTEND type txn is committed and running asynchronously, so we have to persist it.
-            editLog.logInsertTransactionState(transactionState);
-        }
-        idToTransactionState.put(transactionState.getTransactionId(), transactionState);
-        updateTxnLabels(transactionState);
-        updateDbRunningTxnNum(transactionState.getPreStatus(), transactionState);
-    }
-
-    private void unprotectedCommitTransaction(TransactionState transactionState, Set<Long> errorReplicaIds,
-                                              Map<Long, Set<Long>> tableToPartition, Set<Long> totalInvolvedBackends,
-                                              Database db) {
-        // transaction state is modified during check if the transaction could committed
-        if (transactionState.getTransactionStatus() != TransactionStatus.PREPARE) {
-            return;
-        }
-        // update transaction state version
-        transactionState.setCommitTime(System.currentTimeMillis());
-        transactionState.setTransactionStatus(TransactionStatus.COMMITTED);
-        transactionState.setErrorReplicas(errorReplicaIds);
-        for (long tableId : tableToPartition.keySet()) {
-            TableCommitInfo tableCommitInfo = new TableCommitInfo(tableId);
-            for (long partitionId : tableToPartition.get(tableId)) {
-                OlapTable table = (OlapTable) db.getTable(tableId);
-                Partition partition = table.getPartition(partitionId);
-                PartitionCommitInfo partitionCommitInfo = new PartitionCommitInfo(partitionId,
-                                                                                  partition.getNextVersion(),
-                                                                                  partition.getNextVersionHash());
-                tableCommitInfo.addPartitionCommitInfo(partitionCommitInfo);
-            }
-            transactionState.putIdToTableCommitInfo(tableId, tableCommitInfo);
-        }
-        // persist transactionState
-        unprotectUpsertTransactionState(transactionState);
-
-        // add publish version tasks. set task to null as a placeholder.
-        // tasks will be created when publishing version.
-        for (long backendId : totalInvolvedBackends) {
-            transactionState.addPublishVersionTask(backendId, null);
-        }
-    }
 
-    private boolean unprotectAbortTransaction(long transactionId, String reason)
-            throws UserException {
-        TransactionState transactionState = idToTransactionState.get(transactionId);
-        if (transactionState == null) {
-            throw new UserException("transaction not found");
-        }
-        if (transactionState.getTransactionStatus() == TransactionStatus.ABORTED) {
-            return false;
-        }
-        if (transactionState.getTransactionStatus() == TransactionStatus.COMMITTED
-                || transactionState.getTransactionStatus() == TransactionStatus.VISIBLE) {
-            throw new UserException("transaction's state is already "
-                    + transactionState.getTransactionStatus() + ", could not abort");
-        }
-        transactionState.setFinishTime(System.currentTimeMillis());
-        transactionState.setReason(reason);
-        transactionState.setTransactionStatus(TransactionStatus.ABORTED);
-        unprotectUpsertTransactionState(transactionState);
-        for (PublishVersionTask task : transactionState.getPublishVersionTasks().values()) {
-            AgentTaskQueue.removeTask(task.getBackendId(), TTaskType.PUBLISH_VERSION, task.getSignature());
-        }
-        return true;
-    }
-    
     // for replay idToTransactionState
     // check point also run transaction cleaner, the cleaner maybe concurrently modify id to 
     public void replayUpsertTransactionState(TransactionState transactionState) {
-        writeLock();
         try {
-            // set transaction status will call txn state change listener
-            transactionState.replaySetTransactionStatus();
-            Database db = catalog.getDb(transactionState.getDbId());
-            if (transactionState.getTransactionStatus() == TransactionStatus.COMMITTED) {
-                LOG.info("replay a committed transaction {}", transactionState);
-                updateCatalogAfterCommitted(transactionState, db);
-            } else if (transactionState.getTransactionStatus() == TransactionStatus.VISIBLE) {
-                LOG.info("replay a visible transaction {}", transactionState);
-                updateCatalogAfterVisible(transactionState, db);
-            }
-            TransactionState preTxnState = idToTransactionState.get(transactionState.getTransactionId());
-            idToTransactionState.put(transactionState.getTransactionId(), transactionState);
-            updateTxnLabels(transactionState);
-            updateDbRunningTxnNum(preTxnState == null ? null : preTxnState.getTransactionStatus(),
-                                  transactionState);
-        } finally {
-            writeUnlock();
+            DatabaseTransactionMgr dbTransactionMgr = getDatabaseTransactioMgr(transactionState.getDbId());
+            dbTransactionMgr.replayUpsertTransactionState(transactionState);
+        } catch (AnalysisException e) {
+            LOG.warn("replay upsert transaction failed", e);
         }
+
     }
     
     public void replayDeleteTransactionState(TransactionState transactionState) {
-        writeLock();
         try {
-            idToTransactionState.remove(transactionState.getTransactionId());
-            Set<Long> txnIds = dbIdToTxnLabels.get(transactionState.getDbId(), transactionState.getLabel());
-            txnIds.remove(transactionState.getTransactionId());
-            if (txnIds.isEmpty()) {
-                dbIdToTxnLabels.remove(transactionState.getDbId(), transactionState.getLabel());
-            }
-        } finally {
-            writeUnlock();
-        }
-    }
-    
-    private void updateCatalogAfterCommitted(TransactionState transactionState, Database db) {
-        Set<Long> errorReplicaIds = transactionState.getErrorReplicas();
-        for (TableCommitInfo tableCommitInfo : transactionState.getIdToTableCommitInfos().values()) {
-            long tableId = tableCommitInfo.getTableId();
-            OlapTable table = (OlapTable) db.getTable(tableId);
-            for (PartitionCommitInfo partitionCommitInfo : tableCommitInfo.getIdToPartitionCommitInfo().values()) {
-                long partitionId = partitionCommitInfo.getPartitionId();
-                Partition partition = table.getPartition(partitionId);
-                List<MaterializedIndex> allIndices = partition.getMaterializedIndices(IndexExtState.ALL);
-                for (MaterializedIndex index : allIndices) {
-                    List<Tablet> tablets = index.getTablets();
-                    for (Tablet tablet : tablets) {
-                        for (Replica replica : tablet.getReplicas()) {
-                            if (errorReplicaIds.contains(replica.getId())) {
-                                // should not use partition.getNextVersion and partition.getNextVersionHash because partition's next version hash is generated locally
-                                // should get from transaction state
-                                replica.updateLastFailedVersion(partitionCommitInfo.getVersion(),
-                                                                partitionCommitInfo.getVersionHash());
-                            }
-                        }
-                    }
-                }
-                partition.setNextVersion(partition.getNextVersion() + 1);
-                // Although committed version(hash) is not visible to user,
-                // but they need to be synchronized among Frontends.
-                // because we use committed version(hash) to create clone task, if the first Master FE
-                // send clone task with committed version hash X, and than Master changed, the new Master FE
-                // received the clone task report with version hash X, which not equals to it own committed
-                // version hash, than the clone task is failed.
-                partition.setNextVersionHash(Util.generateVersionHash() /* next version hash */,
-                                             partitionCommitInfo.getVersionHash() /* committed version hash*/);
-            }
+            DatabaseTransactionMgr dbTransactionMgr = getDatabaseTransactioMgr(transactionState.getDbId());
+            dbTransactionMgr.deleteTransaction(transactionState);
+        } catch (AnalysisException e) {
+            LOG.warn("replay delete transaction failed", e);
         }
     }
-    
-    private boolean updateCatalogAfterVisible(TransactionState transactionState, Database db) {
-        Set<Long> errorReplicaIds = transactionState.getErrorReplicas();
-        for (TableCommitInfo tableCommitInfo : transactionState.getIdToTableCommitInfos().values()) {
-            long tableId = tableCommitInfo.getTableId();
-            OlapTable table = (OlapTable) db.getTable(tableId);
-            for (PartitionCommitInfo partitionCommitInfo : tableCommitInfo.getIdToPartitionCommitInfo().values()) {
-                long partitionId = partitionCommitInfo.getPartitionId();
-                long newCommitVersion = partitionCommitInfo.getVersion();
-                long newCommitVersionHash = partitionCommitInfo.getVersionHash();
-                Partition partition = table.getPartition(partitionId);
-                List<MaterializedIndex> allIndices = partition.getMaterializedIndices(IndexExtState.ALL);
-                for (MaterializedIndex index : allIndices) {
-                    for (Tablet tablet : index.getTablets()) {
-                        for (Replica replica : tablet.getReplicas()) {
-                            long lastFailedVersion = replica.getLastFailedVersion();
-                            long lastFailedVersionHash = replica.getLastFailedVersionHash();
-                            long newVersion = newCommitVersion;
-                            long newVersionHash = newCommitVersionHash;
-                            long lastSucessVersion = replica.getLastSuccessVersion();
-                            long lastSuccessVersionHash = replica.getLastSuccessVersionHash();
-                            if (!errorReplicaIds.contains(replica.getId())) {
-                                if (replica.getLastFailedVersion() > 0) {
-                                    // if the replica is a failed replica, then not changing version and version hash
-                                    newVersion = replica.getVersion();
-                                    newVersionHash = replica.getVersionHash();
-                                } else if (!replica.checkVersionCatchUp(partition.getVisibleVersion(),
-                                        partition.getVisibleVersionHash(), true)) {
-                                    // this means the replica has error in the past, but we did not observe it
-                                    // during upgrade, one job maybe in quorum finished state, for example, A,B,C 3 replica
-                                    // A,B 's version is 10, C's version is 10 but C' 10 is abnormal should be rollback
-                                    // then we will detect this and set C's last failed version to 10 and last success version to 11
-                                    // this logic has to be replayed in checkpoint thread
-                                    lastFailedVersion = partition.getVisibleVersion();
-                                    lastFailedVersionHash = partition.getVisibleVersionHash();
-                                    newVersion = replica.getVersion();
-                                    newVersionHash = replica.getVersionHash();
-                                }
 
-                                // success version always move forward
-                                lastSucessVersion = newCommitVersion;
-                                lastSuccessVersionHash = newCommitVersionHash;
-                            } else {
-                                // for example, A,B,C 3 replicas, B,C failed during publish version, then B C will be set abnormal
-                                // all loading will failed, B,C will have to recovery by clone, it is very inefficient and maybe lost data
-                                // Using this method, B,C will publish failed, and fe will publish again, not update their last failed version
-                                // if B is publish successfully in next turn, then B is normal and C will be set abnormal so that quorum is maintained
-                                // and loading will go on.
-                                newVersion = replica.getVersion();
-                                newVersionHash = replica.getVersionHash();
-                                if (newCommitVersion > lastFailedVersion) {
-                                    lastFailedVersion = newCommitVersion;
-                                    lastFailedVersionHash = newCommitVersionHash;
-                                }
-                            }
-                            replica.updateVersionInfo(newVersion, newVersionHash, lastFailedVersion, lastFailedVersionHash, lastSucessVersion, lastSuccessVersionHash);
-                        }
-                    }
-                } // end for indices
-                long version = partitionCommitInfo.getVersion();
-                long versionHash = partitionCommitInfo.getVersionHash();
-                partition.updateVisibleVersionAndVersionHash(version, versionHash);
-                if (LOG.isDebugEnabled()) {
-                    LOG.debug("transaction state {} set partition {}'s version to [{}] and version hash to [{}]",
-                              transactionState, partition.getId(), version, versionHash);
-                }
-            }
-        }
-        return true;
-    }
-    
-    private void updateTxnLabels(TransactionState transactionState) {
-        Set<Long> txnIds = dbIdToTxnLabels.get(transactionState.getDbId(), transactionState.getLabel());
-        if (txnIds == null) {
-            txnIds = Sets.newHashSet();
-            dbIdToTxnLabels.put(transactionState.getDbId(), transactionState.getLabel(), txnIds);
-        }
-        txnIds.add(transactionState.getTransactionId());
-    }
-    
-    private void updateDbRunningTxnNum(TransactionStatus preStatus, TransactionState curTxnState) {
-        Map<Long, Integer> txnNumMap = null;
-        if (curTxnState.getSourceType() == LoadJobSourceType.ROUTINE_LOAD_TASK) {
-            txnNumMap = runningRoutineLoadTxnNums;
-        } else {
-            txnNumMap = runningTxnNums;
-        }
-
-        int txnNum = txnNumMap.getOrDefault(curTxnState.getDbId(), 0);
-        if (preStatus == null
-                && (curTxnState.getTransactionStatus() == TransactionStatus.PREPARE
-                || curTxnState.getTransactionStatus() == TransactionStatus.COMMITTED)) {
-            ++txnNum;
-        } else if ((preStatus == TransactionStatus.PREPARE
-                || preStatus == TransactionStatus.COMMITTED)
-                && (curTxnState.getTransactionStatus() == TransactionStatus.VISIBLE
-                || curTxnState.getTransactionStatus() == TransactionStatus.ABORTED)) {
-            --txnNum;
-        }
-
-        if (txnNum < 1) {
-            txnNumMap.remove(curTxnState.getDbId());
-        } else {
-            txnNumMap.put(curTxnState.getDbId(), txnNum);
-        }
-    }
-    
     public List<List<Comparable>> getDbInfo() {
         List<List<Comparable>> infos = new ArrayList<List<Comparable>>();
-        readLock();
-        try {
-            Set<Long> dbIds = new HashSet<>();
-            for (TransactionState transactionState : idToTransactionState.values()) {
-                dbIds.add(transactionState.getDbId());
-            }
-            for (long dbId : dbIds) {
-                List<Comparable> info = new ArrayList<Comparable>();
-                info.add(dbId);
-                Database db = Catalog.getInstance().getDb(dbId);
-                if (db == null) {
-                    continue;
-                }
-                info.add(db.getFullName());
-                infos.add(info);
+        List<Long> dbIds = Lists.newArrayList();
+        for (Long dbId : dbIdToDatabaseTransactionMgrs.keySet()) {
+            dbIds.add(dbId);
+        }
+
+        for (long dbId : dbIds) {
+            List<Comparable> info = new ArrayList<Comparable>();
+            info.add(dbId);
+            Database db = Catalog.getInstance().getDb(dbId);
+            if (db == null) {
+                continue;
             }
-        } finally {
-            readUnlock();
+            info.add(db.getFullName());
+            infos.add(info);
         }
         return infos;
     }
     
     public List<List<String>> getDbTransStateInfo(long dbId) {
-        List<List<String>> infos = Lists.newArrayList();
-        readLock();
         try {
-            infos.add(Lists.newArrayList("running", String.valueOf(
-                    runningTxnNums.getOrDefault(dbId, 0) + runningRoutineLoadTxnNums.getOrDefault(dbId, 0))));
-            long finishedNum = idToTransactionState.values().stream().filter(
-                    t -> (t.getDbId() == dbId && t.getTransactionStatus().isFinalStatus())).count();
-            infos.add(Lists.newArrayList("finished", String.valueOf(finishedNum)));
-        } finally {
-            readUnlock();
+            DatabaseTransactionMgr dbTransactionMgr = getDatabaseTransactioMgr(dbId);
+            return dbTransactionMgr.getDbTransStateInfo();
+        } catch (AnalysisException e) {
+            LOG.warn("Get db transaction state info failed", e);
+            return Lists.newArrayList();
         }
-        return infos;
     }
 
     public List<List<String>> getDbTransInfo(long dbId, boolean running, int limit) throws AnalysisException {
-        List<List<String>> infos = new ArrayList<>();
-        readLock();
-        try {
-            Database db = Catalog.getInstance().getDb(dbId);
-            if (db == null) {
-                throw new AnalysisException("Database[" + dbId + "] does not exist");
-            }
-
-            // get transaction order by txn id desc limit 'limit'
-            idToTransactionState.values().stream()
-                    .filter(t -> (t.getDbId() == dbId && (running != t.getTransactionStatus().isFinalStatus())))
-                    .sorted(TransactionState.TXN_ID_COMPARATOR)
-                    .limit(limit)
-                    .forEach(t -> {
-                        List<String> info = Lists.newArrayList();
-                        getTxnStateInfo(t, info);
-                        infos.add(info);
-                    });
-        } finally {
-            readUnlock();
-        }
-        return infos;
+        DatabaseTransactionMgr dbTransactionMgr = getDatabaseTransactioMgr(dbId);
+        return dbTransactionMgr.getTxnStateInfoList(running, limit);
     }
     
     // get show info of a specified txnId
     public List<List<String>> getSingleTranInfo(long dbId, long txnId) throws AnalysisException {
-        List<List<String>> infos = new ArrayList<List<String>>();
-        readLock();
-        try {
-            Database db = Catalog.getInstance().getDb(dbId);
-            if (db == null) {
-                throw new AnalysisException("Database[" + dbId + "] does not exist");
-            }
-            
-            TransactionState txnState = idToTransactionState.get(txnId);
-            if (txnState == null) {
-                throw new AnalysisException("transaction with id " + txnId + " does not exist");
-            }
-            
-            if (ConnectContext.get() != null) {
-                // check auth
-                Set<Long> tblIds = txnState.getIdToTableCommitInfos().keySet();
-                for (Long tblId : tblIds) {
-                    Table tbl = db.getTable(tblId);
-                    if (tbl != null) {
-                        if (!Catalog.getCurrentCatalog().getAuth().checkTblPriv(ConnectContext.get(), db.getFullName(),
-                                tbl.getName(), PrivPredicate.SHOW)) {
-                            ErrorReport.reportAnalysisException(ErrorCode.ERR_TABLEACCESS_DENIED_ERROR,
-                                    "SHOW TRANSACTION",
-                                    ConnectContext.get().getQualifiedUser(),
-                                    ConnectContext.get().getRemoteIP(),
-                                    tbl.getName());
-                        }
-                    }
-                }
-            }
-            
-            List<String> info = Lists.newArrayList();
-            getTxnStateInfo(txnState, info);
-            infos.add(info);
-        } finally {
-            readUnlock();
-        }
-        return infos;
+        DatabaseTransactionMgr dbTransactionMgr = getDatabaseTransactioMgr(dbId);
+        return dbTransactionMgr.getSingleTranInfo(dbId, txnId);
     }
-    
-    private void getTxnStateInfo(TransactionState txnState, List<String> info) {
-        info.add(String.valueOf(txnState.getTransactionId()));
-        info.add(txnState.getLabel());
-        info.add(txnState.getCoordinator().toString());
-        info.add(txnState.getTransactionStatus().name());
-        info.add(txnState.getSourceType().name());
-        info.add(TimeUtils.longToTimeString(txnState.getPrepareTime()));
-        info.add(TimeUtils.longToTimeString(txnState.getCommitTime()));
-        info.add(TimeUtils.longToTimeString(txnState.getFinishTime()));
-        info.add(txnState.getReason());
-        info.add(String.valueOf(txnState.getErrorReplicas().size()));
-        info.add(String.valueOf(txnState.getCallbackId()));
-        info.add(String.valueOf(txnState.getTimeoutMs()));
-    }
-
-    public List<List<Comparable>> getTableTransInfo(long txnId) throws AnalysisException {
-        List<List<Comparable>> tableInfos = new ArrayList<>();
-        readLock();
-        try {
-            TransactionState transactionState = idToTransactionState.get(txnId);
-            if (null == transactionState) {
-                throw new AnalysisException("Transaction[" + txnId + "] does not exist.");
-            }
 
-            for (Map.Entry<Long, TableCommitInfo> entry : transactionState.getIdToTableCommitInfos().entrySet()) {
-                List<Comparable> tableInfo = new ArrayList<>();
-                tableInfo.add(entry.getKey());
-                tableInfo.add(Joiner.on(", ").join(entry.getValue().getIdToPartitionCommitInfo().values().stream().map(
-                        PartitionCommitInfo::getPartitionId).collect(Collectors.toList())));
-                tableInfos.add(tableInfo);
-            }
-        } finally {
-            readUnlock();
-        }
-        return tableInfos;
+    public List<List<Comparable>> getTableTransInfo(long dbId, long txnId) throws AnalysisException {
+        DatabaseTransactionMgr dbTransactionMgr = getDatabaseTransactioMgr(dbId);
+        return dbTransactionMgr.getTableTransInfo(txnId);
     }
     
-    public List<List<Comparable>> getPartitionTransInfo(long tid, long tableId)
+    public List<List<Comparable>> getPartitionTransInfo(long dbId, long tid, long tableId)
             throws AnalysisException {
-        List<List<Comparable>> partitionInfos = new ArrayList<List<Comparable>>();
-        readLock();
-        try {
-            TransactionState transactionState = idToTransactionState.get(tid);
-            if (null == transactionState) {
-                throw new AnalysisException("Transaction[" + tid + "] does not exist.");
-            }
-            TableCommitInfo tableCommitInfo = transactionState.getIdToTableCommitInfos().get(tableId);
-            Map<Long, PartitionCommitInfo> idToPartitionCommitInfo = tableCommitInfo.getIdToPartitionCommitInfo();
-            for (Map.Entry<Long, PartitionCommitInfo> entry : idToPartitionCommitInfo.entrySet()) {
-                List<Comparable> partitionInfo = new ArrayList<Comparable>();
-                partitionInfo.add(entry.getKey());
-                partitionInfo.add(entry.getValue().getVersion());
-                partitionInfo.add(entry.getValue().getVersionHash());
-                partitionInfos.add(partitionInfo);
-            }
-        } finally {
-            readUnlock();
-        }
-        return partitionInfos;
+        DatabaseTransactionMgr dbTransactionMgr = getDatabaseTransactioMgr(dbId);
+        return dbTransactionMgr.getPartitionTransInfo(tid, tableId);
     }
-    
+
+    /**
+     * It is a non thread safe method, only invoked by checkpoint thread without any lock or image dump thread with db lock
+     */
     public int getTransactionNum() {
-        return this.idToTransactionState.size();
+        int txnNum = 0;
+        for (DatabaseTransactionMgr dbTransactionMgr : dbIdToDatabaseTransactionMgrs.values()) {
+            txnNum = txnNum + dbTransactionMgr.getTransactionNum();

Review comment:
       ```suggestion
               txnNum += dbTransactionMgr.getTransactionNum();
       ```






[GitHub] [incubator-doris] morningman commented on a change in pull request #3369: Support txn management in db level isolation and use ArrayDeque to improve txn task performance

Posted by GitBox <gi...@apache.org>.
morningman commented on a change in pull request #3369:
URL: https://github.com/apache/incubator-doris/pull/3369#discussion_r419821836



##########
File path: fe/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
##########
@@ -800,8 +800,8 @@ private void loadTxnRollbackImpl(TLoadTxnRollbackRequest request) throws UserExc
             checkPasswordAndPrivs(cluster, request.getUser(), request.getPasswd(), request.getDb(),
                     request.getTbl(), request.getUser_ip(), PrivPredicate.LOAD);
         }
-
-        Catalog.getCurrentGlobalTransactionMgr().abortTransaction(request.getTxnId(),
+        long dbId = Catalog.getInstance().getDb(request.getDb()).getId();

Review comment:
       `getDb()` may return null if the database does not exist, and calling `getId()` on it would then throw a `NullPointerException`.
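A minimal sketch of the guard the comment asks for, resolving the id only after a null check. `Database`, `UserException`, and the `fullNameToDb` map below are hypothetical local stand-ins that mirror the shape of `Catalog.getDb(String)` as used in the diff; they are not the real Doris classes, and the error message text is an assumption.

```java
import java.util.HashMap;
import java.util.Map;

final class DbLookupSketch {
    // Hypothetical stand-in for org.apache.doris.catalog.Database.
    static final class Database {
        private final long id;
        Database(long id) { this.id = id; }
        long getId() { return id; }
    }

    // Hypothetical stand-in for Doris' checked UserException.
    static final class UserException extends Exception {
        UserException(String msg) { super(msg); }
    }

    private final Map<String, Database> fullNameToDb = new HashMap<>();

    void addDb(String fullName, long id) {
        fullNameToDb.put(fullName, new Database(id));
    }

    // Like Catalog.getDb(name): returns null when the database does not exist.
    Database getDb(String fullName) {
        return fullNameToDb.get(fullName);
    }

    // Check for null before calling getId(), so a missing db becomes a user-facing error, not an NPE.
    long getDbIdOrThrow(String fullName) throws UserException {
        Database db = getDb(fullName);
        if (db == null) {
            throw new UserException("unknown database, database=" + fullName);
        }
        return db.getId();
    }
}
```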

##########
File path: fe/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java
##########
@@ -123,6 +95,22 @@ public TxnStateCallbackFactory getCallbackFactory() {
         return callbackFactory;
     }
 
+    public DatabaseTransactionMgr getDatabaseTransactioMgr(long dbId) {
+        DatabaseTransactionMgr dbTransactionMgr = dbIdToDatabaseTransactionMgrs.get(dbId);
+        if (dbTransactionMgr == null) {
+            throw new NullPointerException("databaseTransactionMgr[" + dbId + "] does not exist");
+        }
+        return dbTransactionMgr;
+    }
+
+    public void addDatabaseTransactionMgr(Long dbId, EditLog editLog) {
+        dbIdToDatabaseTransactionMgrs.put(dbId, new DatabaseTransactionMgr(dbId, editLog));

Review comment:
       Would it be safer to use the `putIfAbsent()` method?
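To illustrate the difference the question is about, here is a small sketch assuming `dbIdToDatabaseTransactionMgrs` is a `ConcurrentHashMap` (the thread does not show its declared type). `DbTxnMgr` is a hypothetical stand-in for `DatabaseTransactionMgr` that only carries a txn counter. With `put()`, a second registration for the same db silently replaces the existing manager and drops its state; `putIfAbsent()` keeps the existing one, and `computeIfAbsent()` has the same effect while only constructing a manager when none exists.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

final class TxnMgrRegistrySketch {
    // Hypothetical stand-in for DatabaseTransactionMgr: a db id plus some mutable state.
    static final class DbTxnMgr {
        final long dbId;
        int txnNum;
        DbTxnMgr(long dbId) { this.dbId = dbId; }
    }

    private final ConcurrentMap<Long, DbTxnMgr> dbIdToMgr = new ConcurrentHashMap<>();

    // put(): always installs a fresh manager, discarding any existing one and its state.
    void addWithPut(long dbId) {
        dbIdToMgr.put(dbId, new DbTxnMgr(dbId));
    }

    // putIfAbsent(): keeps the existing manager; the newly built instance is simply dropped.
    void addWithPutIfAbsent(long dbId) {
        dbIdToMgr.putIfAbsent(dbId, new DbTxnMgr(dbId));
    }

    // computeIfAbsent(): same outcome, but constructs a manager only when none is registered.
    DbTxnMgr getOrCreate(long dbId) {
        return dbIdToMgr.computeIfAbsent(dbId, DbTxnMgr::new);
    }

    DbTxnMgr get(long dbId) {
        return dbIdToMgr.get(dbId);
    }
}
```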

##########
File path: fe/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java
##########
@@ -123,6 +95,22 @@ public TxnStateCallbackFactory getCallbackFactory() {
         return callbackFactory;
     }
 
+    public DatabaseTransactionMgr getDatabaseTransactioMgr(long dbId) {
+        DatabaseTransactionMgr dbTransactionMgr = dbIdToDatabaseTransactionMgrs.get(dbId);
+        if (dbTransactionMgr == null) {
+            throw new NullPointerException("databaseTransactionMgr[" + dbId + "] does not exist");

Review comment:
       Don't use `NullPointerException` here; you can use `TransactionException` instead.
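A sketch of the suggested change, assuming a checked exception so that every caller has to handle the missing-manager case explicitly rather than letting an unchecked NPE escape. The `TransactionException` below is a hypothetical local class, not Doris' own type, whose hierarchy is not shown in this thread; `DbTxnMgr` is likewise a stand-in for `DatabaseTransactionMgr`.

```java
import java.util.HashMap;
import java.util.Map;

final class GetDbTxnMgrSketch {
    // Hypothetical checked exception standing in for the TransactionException the reviewer names.
    static final class TransactionException extends Exception {
        TransactionException(String msg) { super(msg); }
    }

    // Hypothetical stand-in for DatabaseTransactionMgr.
    static final class DbTxnMgr {
        final long dbId;
        DbTxnMgr(long dbId) { this.dbId = dbId; }
    }

    private final Map<Long, DbTxnMgr> dbIdToMgr = new HashMap<>();

    void add(long dbId) {
        dbIdToMgr.put(dbId, new DbTxnMgr(dbId));
    }

    // A declared, checked exception instead of NullPointerException: callers must decide
    // how to report a missing db (e.g. log and skip during replay, or fail the RPC).
    DbTxnMgr getDatabaseTransactionMgr(long dbId) throws TransactionException {
        DbTxnMgr mgr = dbIdToMgr.get(dbId);
        if (mgr == null) {
            throw new TransactionException("databaseTransactionMgr[" + dbId + "] does not exist");
        }
        return mgr;
    }
}
```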

##########
File path: fe/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java
##########
@@ -194,14 +184,14 @@ public long beginTransaction(long dbId, List<Long> tableIdList, String label, TU
                 }
             }
 
-            checkRunningTxnExceedLimit(dbId, sourceType);
+            checkRunningTxnExceedLimit(dbTransactionMgr, sourceType);

Review comment:
       I think the `checkRunningTxnExceedLimit()` method can be moved into `DbTransactionMgr`.
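A minimal sketch of what the moved check could look like, assuming the per-database manager keeps its own running-transaction counter (as `runningTxnNums` in the `DatabaseTransactionMgr` diff does). `PerDbTxnLimiter`, `LoadSource`, and `TxnLimitExceededException` are hypothetical names. The limit is passed in rather than read from a Doris config option, and exempting routine-load transactions is an assumption based on the diff counting them in a separate field.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical subset of load source types; only the routine-load distinction matters here.
enum LoadSource { FRONTEND, ROUTINE_LOAD_TASK }

// Hypothetical exception for a rejected begin-transaction request.
class TxnLimitExceededException extends Exception {
    TxnLimitExceededException(String msg) {
        super(msg);
    }
}

// Hypothetical per-database manager fragment that owns its running-txn counter.
class PerDbTxnLimiter {
    private final long dbId;
    private final int maxRunningTxnNum; // assumed limit, passed in instead of read from a config
    private final AtomicInteger runningTxnNums = new AtomicInteger(0);

    PerDbTxnLimiter(long dbId, int maxRunningTxnNum) {
        this.dbId = dbId;
        this.maxRunningTxnNum = maxRunningTxnNum;
    }

    void onTxnBegin() {
        runningTxnNums.incrementAndGet();
    }

    // Only this database's own counter is read, so no dbId lookup or
    // global map access is needed once the check lives here.
    void checkRunningTxnExceedLimit(LoadSource sourceType) throws TxnLimitExceededException {
        if (sourceType == LoadSource.ROUTINE_LOAD_TASK) {
            // assumption: routine-load txns are counted separately and not limited by this check
            return;
        }
        int running = runningTxnNums.get();
        if (running >= maxRunningTxnNum) {
            throw new TxnLimitExceededException("current running txns on db " + dbId + " is "
                    + running + ", larger than limit " + maxRunningTxnNum);
        }
    }
}
```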

##########
File path: fe/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java
##########
@@ -500,8 +485,15 @@ public boolean commitAndPublishTransaction(Database db, long transactionId,
         } finally {
             db.writeUnlock();
         }
-        
-        TransactionState transactionState = idToTransactionState.get(transactionId);
+        DatabaseTransactionMgr dbTransactionMgr = getDatabaseTransactioMgr(db.getId());
+        TransactionState transactionState = null;
+        dbTransactionMgr.readLock();

Review comment:
       In fact, you will find that all the logic in `GlobalTransactionMgr` is now executed in `DbTransactionMgr`, so `GlobalTransactionMgr` has become merely a class for managing `DbTransactionMgr` instances.
   
   Most methods follow this pattern:
   
   ```
   DbTransactionMgr dbMgr = getDbTransactionMgr(dbId);
   dbMgr.lock();
   try {
       dbMgr.doSomeTxnThing();
   } finally {
       dbMgr.unlock();
   }
   ```
   
   Therefore, I think it is more appropriate to move all the main implementation logic into `DbTransactionMgr`. That way, the responsibilities of `GlobalTransactionMgr` and `DbTransactionMgr` become clearer: `GlobalTransactionMgr` only serves as the entry class for transaction operations and manages the `DbTransactionMgr` instances, while `DbTransactionMgr` performs the actual transaction operations.
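The proposed split can be sketched as follows. The sketch assumes a trimmed-down model in which the per-database manager owns its lock and exposes whole operations, and the global class only routes calls by database id. All names (`DbTxnManager`, `GlobalTxnFacade`, `TxnStatus`) are hypothetical; a real implementation would also handle edit-log persistence, callbacks, and publish tasks.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantReadWriteLock;

enum TxnStatus { PREPARE, ABORTED }

// Hypothetical per-database manager: it owns both its lock and its state,
// so callers never lock or unlock it from the outside.
class DbTxnManager {
    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(true);
    private final Map<Long, TxnStatus> idToStatus = new HashMap<>();

    void beginTransaction(long txnId) {
        lock.writeLock().lock();
        try {
            idToStatus.put(txnId, TxnStatus.PREPARE);
        } finally {
            lock.writeLock().unlock();
        }
    }

    void abortTransaction(long txnId) {
        lock.writeLock().lock();
        try {
            // replace() only updates an existing entry, so unknown ids are ignored
            idToStatus.replace(txnId, TxnStatus.ABORTED);
        } finally {
            lock.writeLock().unlock();
        }
    }

    TxnStatus getStatus(long txnId) {
        lock.readLock().lock();
        try {
            return idToStatus.get(txnId);
        } finally {
            lock.readLock().unlock();
        }
    }
}

// Hypothetical entry class: it only routes each call to the right per-database manager.
class GlobalTxnFacade {
    private final Map<Long, DbTxnManager> dbIdToMgr = new ConcurrentHashMap<>();

    void addDb(long dbId) {
        dbIdToMgr.putIfAbsent(dbId, new DbTxnManager());
    }

    private DbTxnManager mgr(long dbId) {
        DbTxnManager m = dbIdToMgr.get(dbId);
        if (m == null) {
            // unchecked only to keep the sketch short; the review above suggests a dedicated exception type
            throw new IllegalStateException("no transaction manager for db " + dbId);
        }
        return m;
    }

    void beginTransaction(long dbId, long txnId) { mgr(dbId).beginTransaction(txnId); }

    void abortTransaction(long dbId, long txnId) { mgr(dbId).abortTransaction(txnId); }

    TxnStatus getStatus(long dbId, long txnId) { return mgr(dbId).getStatus(txnId); }
}
```

With this shape, the lock/try/finally pattern appears once per operation inside the per-database class instead of in every global method, and transactions of different databases never contend on the same lock.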





[GitHub] [incubator-doris] caiconghui commented on a change in pull request #3369: Support txn management in db level isolation and use ArrayDeque to improve txn task performance

Posted by GitBox <gi...@apache.org>.
caiconghui commented on a change in pull request #3369:
URL: https://github.com/apache/incubator-doris/pull/3369#discussion_r415011916



##########
File path: fe/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java
##########
@@ -0,0 +1,556 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.transaction;
+
+import com.google.common.base.Joiner;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import org.apache.commons.lang3.tuple.ImmutablePair;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.doris.catalog.Catalog;
+import org.apache.doris.catalog.Database;
+import org.apache.doris.catalog.OlapTable;
+import org.apache.doris.catalog.Partition;
+import org.apache.doris.catalog.Table;
+import org.apache.doris.common.AnalysisException;
+import org.apache.doris.common.ErrorCode;
+import org.apache.doris.common.ErrorReport;
+import org.apache.doris.common.util.TimeUtils;
+import org.apache.doris.common.UserException;
+import org.apache.doris.mysql.privilege.PrivPredicate;
+import org.apache.doris.persist.EditLog;
+import org.apache.doris.qe.ConnectContext;
+import org.apache.doris.task.AgentBatchTask;
+import org.apache.doris.task.AgentTaskExecutor;
+import org.apache.doris.task.AgentTaskQueue;
+import org.apache.doris.task.ClearTransactionTask;
+import org.apache.doris.task.PublishVersionTask;
+import org.apache.doris.thrift.TTaskType;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.stream.Collectors;
+
+public class DatabaseTransactionMgr {
+
+    private static final Logger LOG = LogManager.getLogger(DatabaseTransactionMgr.class);
+
+    // the lock is used to control the access to transaction states
+    // no other locks should be inside this lock
+    private ReentrantReadWriteLock transactionLock = new ReentrantReadWriteLock(true);
+
+    // transactionId -> running TransactionState
+    private Map<Long, TransactionState> idToRunningTransactionState = Maps.newHashMap();
+
+    // transactionId -> final status TransactionState
+    private Map<Long, TransactionState> idToFinalStatusTransactionState = Maps.newHashMap();
+
+
+    // to store transtactionStates with final status
+    private ArrayDeque<TransactionState> finalStatusTransactionStateDeque = new ArrayDeque<>();
+
+    // label -> txn ids
+    // this is used for checking if label already used. a label may correspond to multiple txns,
+    // and only one is success.
+    // this member should be consistent with idToTransactionState,
+    // which means if a txn exist in idToRunningTransactionState or idToFinalStatusTransactionState
+    // it must exists in dbIdToTxnLabels, and vice versa
+    private Map<String, Set<Long>> labelToTxnIds = Maps.newConcurrentMap();
+
+
+    // count the number of running txns of database, except for the routine load txn
+    private AtomicInteger runningTxnNums = new AtomicInteger(0);
+
+    // count only the number of running routine load txns of database
+    private AtomicInteger runningRoutineLoadTxnNums = new AtomicInteger(0);
+
+    private EditLog editLog;
+
+    private List<ClearTransactionTask> clearTransactionTasks = Lists.newArrayList();
+
+    protected void readLock() {
+        this.transactionLock.readLock().lock();
+    }
+
+    protected void readUnlock() {
+        this.transactionLock.readLock().unlock();
+    }
+
+    protected void writeLock() {
+        this.transactionLock.writeLock().lock();
+    }
+
+    protected void writeUnlock() {
+        this.transactionLock.writeLock().unlock();
+    }
+
+    public DatabaseTransactionMgr(EditLog editLog) {
+        this.editLog = editLog;
+    }
+
+    public TransactionState getTransactionState(Long transactionId) {
+        TransactionState transactionState = idToRunningTransactionState.get(transactionId);
+        if (transactionState != null) {
+            return transactionState;
+        } else {
+            return idToFinalStatusTransactionState.get(transactionId);
+        }
+    }
+
+    public Set<Long> getTxnIdsByLabel(String label) {
+        return labelToTxnIds.get(label);
+    }
+
+    public int getRunningTxnNums() {
+        return runningTxnNums.get();
+    }
+
+    public int getRunningRoutineLoadTxnNums() {
+        return runningRoutineLoadTxnNums.get();
+    }
+
+    public int getFinishedTxnNums() {
+        return finalStatusTransactionStateDeque.size();
+    }
+
+    public List<List<String>> getTxnStateInfoList(boolean running, int limit) {
+        List<List<String>> infos = Lists.newArrayList();
+        Collection<TransactionState> transactionStateCollection = null;
+        readLock();
+        try {
+            if (running) {
+                transactionStateCollection = idToRunningTransactionState.values();
+            } else {
+                transactionStateCollection = finalStatusTransactionStateDeque;
+            }
+            // get transaction order by txn id desc limit 'limit'
+            transactionStateCollection.stream()
+                    .sorted(TransactionState.TXN_ID_COMPARATOR)
+                    .limit(limit)
+                    .forEach(t -> {
+                        List<String> info = Lists.newArrayList();
+                        getTxnStateInfo(t, info);
+                        infos.add(info);
+                    });
+        } finally {
+            readUnlock();
+        }
+        return infos;
+    }
+
+    private void getTxnStateInfo(TransactionState txnState, List<String> info) {
+        info.add(String.valueOf(txnState.getTransactionId()));
+        info.add(txnState.getLabel());
+        info.add(txnState.getCoordinator().toString());
+        info.add(txnState.getTransactionStatus().name());
+        info.add(txnState.getSourceType().name());
+        info.add(TimeUtils.longToTimeString(txnState.getPrepareTime()));
+        info.add(TimeUtils.longToTimeString(txnState.getCommitTime()));
+        info.add(TimeUtils.longToTimeString(txnState.getFinishTime()));
+        info.add(txnState.getReason());
+        info.add(String.valueOf(txnState.getErrorReplicas().size()));
+        info.add(String.valueOf(txnState.getCallbackId()));
+        info.add(String.valueOf(txnState.getTimeoutMs()));
+    }
+
+    public void deleteTransaction(TransactionState transactionState) {
+        writeLock();
+        try {
+            // here we only delete the oldest element, so if element exist in finalStatusTransactionStateDeque,
+            // it must at the front of the finalStatusTransactionStateDeque
+            if (!finalStatusTransactionStateDeque.isEmpty() &&
+            transactionState.getTransactionId() == finalStatusTransactionStateDeque.getFirst().getTransactionId()) {
+                finalStatusTransactionStateDeque.pop();
+                idToFinalStatusTransactionState.remove(transactionState.getTransactionId());
+                Set<Long> txnIds = labelToTxnIds.get(transactionState.getLabel());
+                txnIds.remove(transactionState.getTransactionId());
+                if (txnIds.isEmpty()) {
+                    labelToTxnIds.remove(transactionState.getLabel());
+                }
+            }
+        } finally {
+            writeUnlock();
+        }
+    }
+
+    public Map<Long, TransactionState> getIdToRunningTransactionState() {
+        return idToRunningTransactionState;
+    }
+
+    public ArrayDeque<TransactionState> getFinalStatusTransactionStateDeque() {
+        return finalStatusTransactionStateDeque;
+    }
+
+    protected void  unprotectedCommitTransaction(TransactionState transactionState, Set<Long> errorReplicaIds,
+                                               Map<Long, Set<Long>> tableToPartition, Set<Long> totalInvolvedBackends,
+                                               Database db) {
+        // transaction state is modified during check if the transaction could committed
+        if (transactionState.getTransactionStatus() != TransactionStatus.PREPARE) {
+            return;
+        }
+        // update transaction state version
+        transactionState.setCommitTime(System.currentTimeMillis());
+        transactionState.setTransactionStatus(TransactionStatus.COMMITTED);
+        transactionState.setErrorReplicas(errorReplicaIds);
+        for (long tableId : tableToPartition.keySet()) {
+            TableCommitInfo tableCommitInfo = new TableCommitInfo(tableId);
+            for (long partitionId : tableToPartition.get(tableId)) {
+                OlapTable table = (OlapTable) db.getTable(tableId);
+                Partition partition = table.getPartition(partitionId);
+                PartitionCommitInfo partitionCommitInfo = new PartitionCommitInfo(partitionId,
+                        partition.getNextVersion(),
+                        partition.getNextVersionHash());
+                tableCommitInfo.addPartitionCommitInfo(partitionCommitInfo);
+            }
+            transactionState.putIdToTableCommitInfo(tableId, tableCommitInfo);
+        }
+        // persist transactionState
+        unprotectUpsertTransactionState(transactionState, false);
+
+        // add publish version tasks. set task to null as a placeholder.
+        // tasks will be created when publishing version.
+        for (long backendId : totalInvolvedBackends) {
+            transactionState.addPublishVersionTask(backendId, null);
+        }
+    }
+
+    // for add/update/delete TransactionState
+    protected void unprotectUpsertTransactionState(TransactionState transactionState, boolean isReplay) {
+        // if this is a replay operation, we should not log it
+        if (!isReplay) {
+            if (transactionState.getTransactionStatus() != TransactionStatus.PREPARE
+                    || transactionState.getSourceType() == TransactionState.LoadJobSourceType.FRONTEND) {
+                // if this is a prepare txn, and load source type is not FRONTEND
+                // no need to persist it. if prepare txn lost, the following commit will just be failed.
+                // user only need to retry this txn.
+                // The FRONTEND type txn is committed and running asynchronously, so we have to persist it.
+                editLog.logInsertTransactionState(transactionState);
+            }
+        }
+
+        if (transactionState.isRunning()) {
+            idToRunningTransactionState.put(transactionState.getTransactionId(), transactionState);
+        } else {
+            idToRunningTransactionState.remove(transactionState.getTransactionId());
+            idToFinalStatusTransactionState.put(transactionState.getTransactionId(), transactionState);
+            finalStatusTransactionStateDeque.add(transactionState);
+        }
+
+        updateTxnLabels(transactionState);
+        updateDbRunningTxnNum(transactionState.getPreStatus(), transactionState);
+    }
+
+    private void updateTxnLabels(TransactionState transactionState) {
+        Set<Long> txnIds = labelToTxnIds.get(transactionState.getLabel());
+        if (txnIds == null) {
+            txnIds = Sets.newHashSet();
+            labelToTxnIds.put(transactionState.getLabel(), txnIds);
+        }
+        txnIds.add(transactionState.getTransactionId());
+    }
+
+    private void updateDbRunningTxnNum(TransactionStatus preStatus, TransactionState curTxnState) {
+        AtomicInteger txnNum = null;
+        if (curTxnState.getSourceType() == TransactionState.LoadJobSourceType.ROUTINE_LOAD_TASK) {
+            txnNum = runningRoutineLoadTxnNums;
+        } else {
+            txnNum = runningTxnNums;
+        }
+
+        if (preStatus == null
+                && (curTxnState.getTransactionStatus() == TransactionStatus.PREPARE
+                || curTxnState.getTransactionStatus() == TransactionStatus.COMMITTED)) {
+            txnNum.incrementAndGet();
+        } else if ((preStatus == TransactionStatus.PREPARE
+                || preStatus == TransactionStatus.COMMITTED)
+                && (curTxnState.getTransactionStatus() == TransactionStatus.VISIBLE
+                || curTxnState.getTransactionStatus() == TransactionStatus.ABORTED)) {
+            txnNum.decrementAndGet();
+        }
+    }
+
+    public void abortTransaction(long transactionId, String reason, TxnCommitAttachment txnCommitAttachment) throws UserException {
+        if (transactionId < 0) {
+            LOG.info("transaction id is {}, less than 0, maybe this is an old type load job, ignore abort operation", transactionId);
+            return;
+        }
+        TransactionState transactionState = idToRunningTransactionState.get(transactionId);
+        if (transactionState == null) {
+            throw new UserException("transaction not found");
+        }
+
+        // update transaction state extra if exists
+        if (txnCommitAttachment != null) {
+            transactionState.setTxnCommitAttachment(txnCommitAttachment);
+        }
+
+        // before state transform
+        transactionState.beforeStateTransform(TransactionStatus.ABORTED);
+        boolean txnOperated = false;
+        writeLock();
+        try {
+            txnOperated = unprotectAbortTransaction(transactionId, reason);
+        } finally {
+            writeUnlock();
+            transactionState.afterStateTransform(TransactionStatus.ABORTED, txnOperated, reason);
+        }
+
+        // send clear txn task to BE to clear the transactions on BE.
+        // This is because parts of a txn may succeed in some BE, and these parts of txn should be cleared
+        // explicitly, or it will be remained on BE forever
+        // (However the report process will do the diff and send clear txn tasks to BE, but that is our
+        // last defense)
+        if (txnOperated && transactionState.getTransactionStatus() == TransactionStatus.ABORTED) {
+            clearBackendTransactions(transactionState);
+        }
+    }
+
+    private boolean unprotectAbortTransaction(long transactionId, String reason)
+            throws UserException {
+        TransactionState transactionState = getTransactionState(transactionId);
+        if (transactionState == null) {
+            throw new UserException("transaction not found");
+        }
+        if (transactionState.getTransactionStatus() == TransactionStatus.ABORTED) {
+            return false;
+        }
+        if (transactionState.getTransactionStatus() == TransactionStatus.COMMITTED
+                || transactionState.getTransactionStatus() == TransactionStatus.VISIBLE) {
+            throw new UserException("transaction's state is already "
+                    + transactionState.getTransactionStatus() + ", could not abort");
+        }
+        transactionState.setFinishTime(System.currentTimeMillis());
+        transactionState.setReason(reason);
+        transactionState.setTransactionStatus(TransactionStatus.ABORTED);
+        unprotectUpsertTransactionState(transactionState, false);
+        for (PublishVersionTask task : transactionState.getPublishVersionTasks().values()) {
+            AgentTaskQueue.removeTask(task.getBackendId(), TTaskType.PUBLISH_VERSION, task.getSignature());
+        }
+        return true;
+    }
+
+    private void clearBackendTransactions(TransactionState transactionState) {
+        Preconditions.checkState(transactionState.getTransactionStatus() == TransactionStatus.ABORTED);
+        // for aborted transaction, we don't know which backends are involved, so we have to send clear task
+        // to all backends.
+        List<Long> allBeIds = Catalog.getCurrentSystemInfo().getBackendIds(false);
+        AgentBatchTask batchTask = null;
+        synchronized (clearTransactionTasks) {
+            for (Long beId : allBeIds) {
+                ClearTransactionTask task = new ClearTransactionTask(beId, transactionState.getTransactionId(), Lists.newArrayList());
+                clearTransactionTasks.add(task);
+            }
+
+            // try to group send tasks, not sending every time a txn is aborted. to avoid too many task rpc.
+            if (clearTransactionTasks.size() > allBeIds.size() * 2) {
+                batchTask = new AgentBatchTask();
+                for (ClearTransactionTask clearTransactionTask : clearTransactionTasks) {
+                    batchTask.addTask(clearTransactionTask);
+                }
+                clearTransactionTasks.clear();
+            }
+        }
+
+        if (batchTask != null) {
+            AgentTaskExecutor.submit(batchTask);
+        }
+    }
+
+
+    protected List<List<Comparable>> getTableTransInfo(long txnId) throws AnalysisException {
+        List<List<Comparable>> tableInfos = new ArrayList<>();
+        readLock();
+        try {
+            TransactionState transactionState = getTransactionState(txnId);
+            if (null == transactionState) {
+                throw new AnalysisException("Transaction[" + txnId + "] does not exist.");
+            }
+
+            for (Map.Entry<Long, TableCommitInfo> entry : transactionState.getIdToTableCommitInfos().entrySet()) {
+                List<Comparable> tableInfo = new ArrayList<>();
+                tableInfo.add(entry.getKey());
+                tableInfo.add(Joiner.on(", ").join(entry.getValue().getIdToPartitionCommitInfo().values().stream().map(
+                        PartitionCommitInfo::getPartitionId).collect(Collectors.toList())));
+                tableInfos.add(tableInfo);
+            }
+        } finally {
+            readUnlock();
+        }
+        return tableInfos;
+    }
+
+    protected List<List<Comparable>> getPartitionTransInfo(long txnId, long tableId) throws AnalysisException {
+        List<List<Comparable>> partitionInfos = new ArrayList<List<Comparable>>();
+        readLock();
+        try {
+            TransactionState transactionState = getTransactionState(txnId);
+            if (null == transactionState) {
+                throw new AnalysisException("Transaction[" + txnId + "] does not exist.");
+            }
+
+            TableCommitInfo tableCommitInfo = transactionState.getIdToTableCommitInfos().get(tableId);
+            Map<Long, PartitionCommitInfo> idToPartitionCommitInfo = tableCommitInfo.getIdToPartitionCommitInfo();
+            for (Map.Entry<Long, PartitionCommitInfo> entry : idToPartitionCommitInfo.entrySet()) {
+                List<Comparable> partitionInfo = new ArrayList<Comparable>();
+                partitionInfo.add(entry.getKey());
+                partitionInfo.add(entry.getValue().getVersion());
+                partitionInfo.add(entry.getValue().getVersionHash());
+                partitionInfos.add(partitionInfo);
+            }
+        } finally {
+            readUnlock();
+        }
+        return partitionInfos;
+    }
+
+    public void removeExpiredTxns() {
+        long currentMillis = System.currentTimeMillis();
+        writeLock();
+        try {
+            while (!finalStatusTransactionStateDeque.isEmpty()) {
+                TransactionState transactionState = finalStatusTransactionStateDeque.getFirst();
+                if (transactionState.isExpired(currentMillis)) {
+                    finalStatusTransactionStateDeque.pop();
+                    Set<Long> txnIds = labelToTxnIds.get(transactionState.getLabel());
+                    txnIds.remove(transactionState.getTransactionId());
+                    if (txnIds.isEmpty()) {
+                        labelToTxnIds.remove(transactionState.getLabel());
+                    }
+                    editLog.logDeleteTransactionState(transactionState);
+                    LOG.info("transaction [" + transactionState.getTransactionId() + "] is expired, remove it from transaction manager");
+                } else {
+                    break;
+                }
+
+            }
+        } finally {
+            writeUnlock();
+        }
+    }
+
+    public int getTransactionNum() {
+        return idToRunningTransactionState.size() + finalStatusTransactionStateDeque.size();

Review comment:
       I found that it is only invoked by the checkpoint thread, so it is OK for it not to be thread-safe. Maybe it needs a comment to indicate that.
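On the thread-safety question, one alternative to documenting the single-caller contract is simply to take the read lock. The sketch also illustrates why an `ArrayDeque` suits final-status transactions: they are appended in the order they finish, so with a single fixed retention period the expired ones always form a prefix and can be popped from the head, which is the same property the early `break` in the quoted `removeExpiredTxns` relies on. Note that the quoted `removeExpiredTxns` does not appear to remove the entry from `idToFinalStatusTransactionState` (unlike `deleteTransaction`); the sketch removes from both structures together. `TxnBook`, `FinishedTxn`, and the fixed `retentionMs` are hypothetical simplifications; in the diff, expiry is decided per transaction by `isExpired(currentMillis)`.

```java
import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Hypothetical, stripped-down record of a transaction that reached a final status.
class FinishedTxn {
    final long id;
    final long finishTimeMs;

    FinishedTxn(long id, long finishTimeMs) {
        this.id = id;
        this.finishTimeMs = finishTimeMs;
    }
}

class TxnBook {
    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(true);
    private final Map<Long, Long> runningIdToBeginMs = new HashMap<>();
    private final Map<Long, FinishedTxn> finalIdToTxn = new HashMap<>();
    // Appended in the order transactions finish, so the head is always the oldest one.
    private final ArrayDeque<FinishedTxn> finalDeque = new ArrayDeque<>();

    void begin(long id, long nowMs) {
        lock.writeLock().lock();
        try {
            runningIdToBeginMs.put(id, nowMs);
        } finally {
            lock.writeLock().unlock();
        }
    }

    void finish(long id, long nowMs) {
        lock.writeLock().lock();
        try {
            if (runningIdToBeginMs.remove(id) != null) {
                FinishedTxn txn = new FinishedTxn(id, nowMs);
                finalIdToTxn.put(id, txn);
                finalDeque.addLast(txn);
            }
        } finally {
            lock.writeLock().unlock();
        }
    }

    // With one fixed retention period, expired transactions form a prefix of the deque:
    // pop from the head and stop at the first survivor instead of scanning everything.
    int removeExpired(long nowMs, long retentionMs) {
        lock.writeLock().lock();
        try {
            int removed = 0;
            while (!finalDeque.isEmpty() && nowMs - finalDeque.peekFirst().finishTimeMs > retentionMs) {
                FinishedTxn txn = finalDeque.pollFirst();
                finalIdToTxn.remove(txn.id); // keep the id index in sync with the deque
                removed++;
            }
            return removed;
        } finally {
            lock.writeLock().unlock();
        }
    }

    boolean isFinalTracked(long id) {
        lock.readLock().lock();
        try {
            return finalIdToTxn.containsKey(id);
        } finally {
            lock.readLock().unlock();
        }
    }

    // Holding the read lock makes the count safe for any caller, not only the
    // checkpoint thread, at the cost of one read-lock acquisition.
    int getTransactionNum() {
        lock.readLock().lock();
        try {
            return runningIdToBeginMs.size() + finalDeque.size();
        } finally {
            lock.readLock().unlock();
        }
    }
}
```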





[GitHub] [incubator-doris] morningman commented on pull request #3369: Support txn management in db level isolation and use ArrayDeque to improve txn task performance

Posted by GitBox <gi...@apache.org>.
morningman commented on pull request #3369:
URL: https://github.com/apache/incubator-doris/pull/3369#issuecomment-623799597


   @caiconghui I have left some comments. Could you also resolve the conflicts?



[GitHub] [incubator-doris] caiconghui commented on a change in pull request #3369: Support txn management in db level isolation and use ArrayDeque to improve txn task performance

Posted by GitBox <gi...@apache.org>.
caiconghui commented on a change in pull request #3369:
URL: https://github.com/apache/incubator-doris/pull/3369#discussion_r415011916



##########
File path: fe/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java
##########
@@ -0,0 +1,556 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.transaction;
+
+import com.google.common.base.Joiner;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import org.apache.commons.lang3.tuple.ImmutablePair;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.doris.catalog.Catalog;
+import org.apache.doris.catalog.Database;
+import org.apache.doris.catalog.OlapTable;
+import org.apache.doris.catalog.Partition;
+import org.apache.doris.catalog.Table;
+import org.apache.doris.common.AnalysisException;
+import org.apache.doris.common.ErrorCode;
+import org.apache.doris.common.ErrorReport;
+import org.apache.doris.common.util.TimeUtils;
+import org.apache.doris.common.UserException;
+import org.apache.doris.mysql.privilege.PrivPredicate;
+import org.apache.doris.persist.EditLog;
+import org.apache.doris.qe.ConnectContext;
+import org.apache.doris.task.AgentBatchTask;
+import org.apache.doris.task.AgentTaskExecutor;
+import org.apache.doris.task.AgentTaskQueue;
+import org.apache.doris.task.ClearTransactionTask;
+import org.apache.doris.task.PublishVersionTask;
+import org.apache.doris.thrift.TTaskType;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.stream.Collectors;
+
+public class DatabaseTransactionMgr {
+
+    private static final Logger LOG = LogManager.getLogger(DatabaseTransactionMgr.class);
+
+    // the lock is used to control the access to transaction states
+    // no other locks should be inside this lock
+    private ReentrantReadWriteLock transactionLock = new ReentrantReadWriteLock(true);
+
+    // transactionId -> running TransactionState
+    private Map<Long, TransactionState> idToRunningTransactionState = Maps.newHashMap();
+
+    // transactionId -> final status TransactionState
+    private Map<Long, TransactionState> idToFinalStatusTransactionState = Maps.newHashMap();
+
+
+    // to store transtactionStates with final status
+    private ArrayDeque<TransactionState> finalStatusTransactionStateDeque = new ArrayDeque<>();
+
+    // label -> txn ids
+    // this is used for checking if label already used. a label may correspond to multiple txns,
+    // and only one is success.
+    // this member should be consistent with idToTransactionState,
+    // which means if a txn exist in idToRunningTransactionState or idToFinalStatusTransactionState
+    // it must exists in dbIdToTxnLabels, and vice versa
+    private Map<String, Set<Long>> labelToTxnIds = Maps.newConcurrentMap();
+
+
+    // count the number of running txns of database, except for the routine load txn
+    private AtomicInteger runningTxnNums = new AtomicInteger(0);
+
+    // count only the number of running routine load txns of database
+    private AtomicInteger runningRoutineLoadTxnNums = new AtomicInteger(0);
+
+    private EditLog editLog;
+
+    private List<ClearTransactionTask> clearTransactionTasks = Lists.newArrayList();
+
+    protected void readLock() {
+        this.transactionLock.readLock().lock();
+    }
+
+    protected void readUnlock() {
+        this.transactionLock.readLock().unlock();
+    }
+
+    protected void writeLock() {
+        this.transactionLock.writeLock().lock();
+    }
+
+    protected void writeUnlock() {
+        this.transactionLock.writeLock().unlock();
+    }
+
+    public DatabaseTransactionMgr(EditLog editLog) {
+        this.editLog = editLog;
+    }
+
+    public TransactionState getTransactionState(Long transactionId) {
+        TransactionState transactionState = idToRunningTransactionState.get(transactionId);
+        if (transactionState != null) {
+            return transactionState;
+        } else {
+            return idToFinalStatusTransactionState.get(transactionId);
+        }
+    }
+
+    public Set<Long> getTxnIdsByLabel(String label) {
+        return labelToTxnIds.get(label);
+    }
+
+    public int getRunningTxnNums() {
+        return runningTxnNums.get();
+    }
+
+    public int getRunningRoutineLoadTxnNums() {
+        return runningRoutineLoadTxnNums.get();
+    }
+
+    public int getFinishedTxnNums() {
+        return finalStatusTransactionStateDeque.size();
+    }
+
+    public List<List<String>> getTxnStateInfoList(boolean running, int limit) {
+        List<List<String>> infos = Lists.newArrayList();
+        Collection<TransactionState> transactionStateCollection = null;
+        readLock();
+        try {
+            if (running) {
+                transactionStateCollection = idToRunningTransactionState.values();
+            } else {
+                transactionStateCollection = finalStatusTransactionStateDeque;
+            }
+            // get transaction order by txn id desc limit 'limit'
+            transactionStateCollection.stream()
+                    .sorted(TransactionState.TXN_ID_COMPARATOR)
+                    .limit(limit)
+                    .forEach(t -> {
+                        List<String> info = Lists.newArrayList();
+                        getTxnStateInfo(t, info);
+                        infos.add(info);
+                    });
+        } finally {
+            readUnlock();
+        }
+        return infos;
+    }
+
+    private void getTxnStateInfo(TransactionState txnState, List<String> info) {
+        info.add(String.valueOf(txnState.getTransactionId()));
+        info.add(txnState.getLabel());
+        info.add(txnState.getCoordinator().toString());
+        info.add(txnState.getTransactionStatus().name());
+        info.add(txnState.getSourceType().name());
+        info.add(TimeUtils.longToTimeString(txnState.getPrepareTime()));
+        info.add(TimeUtils.longToTimeString(txnState.getCommitTime()));
+        info.add(TimeUtils.longToTimeString(txnState.getFinishTime()));
+        info.add(txnState.getReason());
+        info.add(String.valueOf(txnState.getErrorReplicas().size()));
+        info.add(String.valueOf(txnState.getCallbackId()));
+        info.add(String.valueOf(txnState.getTimeoutMs()));
+    }
+
+    public void deleteTransaction(TransactionState transactionState) {
+        writeLock();
+        try {
+            // we only ever delete the oldest element, so if the element exists in finalStatusTransactionStateDeque,
+            // it must be at the front of finalStatusTransactionStateDeque
+            if (!finalStatusTransactionStateDeque.isEmpty()
+                    && transactionState.getTransactionId() == finalStatusTransactionStateDeque.getFirst().getTransactionId()) {
+                finalStatusTransactionStateDeque.pop();
+                idToFinalStatusTransactionState.remove(transactionState.getTransactionId());
+                Set<Long> txnIds = labelToTxnIds.get(transactionState.getLabel());
+                txnIds.remove(transactionState.getTransactionId());
+                if (txnIds.isEmpty()) {
+                    labelToTxnIds.remove(transactionState.getLabel());
+                }
+            }
+        } finally {
+            writeUnlock();
+        }
+    }
+
+    public Map<Long, TransactionState> getIdToRunningTransactionState() {
+        return idToRunningTransactionState;
+    }
+
+    public ArrayDeque<TransactionState> getFinalStatusTransactionStateDeque() {
+        return finalStatusTransactionStateDeque;
+    }
+
+    protected void unprotectedCommitTransaction(TransactionState transactionState, Set<Long> errorReplicaIds,
+                                                Map<Long, Set<Long>> tableToPartition, Set<Long> totalInvolvedBackends,
+                                                Database db) {
+        // the transaction state may have been modified while checking whether the transaction could be committed
+        if (transactionState.getTransactionStatus() != TransactionStatus.PREPARE) {
+            return;
+        }
+        // update transaction state version
+        transactionState.setCommitTime(System.currentTimeMillis());
+        transactionState.setTransactionStatus(TransactionStatus.COMMITTED);
+        transactionState.setErrorReplicas(errorReplicaIds);
+        for (long tableId : tableToPartition.keySet()) {
+            TableCommitInfo tableCommitInfo = new TableCommitInfo(tableId);
+            for (long partitionId : tableToPartition.get(tableId)) {
+                OlapTable table = (OlapTable) db.getTable(tableId);
+                Partition partition = table.getPartition(partitionId);
+                PartitionCommitInfo partitionCommitInfo = new PartitionCommitInfo(partitionId,
+                        partition.getNextVersion(),
+                        partition.getNextVersionHash());
+                tableCommitInfo.addPartitionCommitInfo(partitionCommitInfo);
+            }
+            transactionState.putIdToTableCommitInfo(tableId, tableCommitInfo);
+        }
+        // persist transactionState
+        unprotectUpsertTransactionState(transactionState, false);
+
+        // add publish version tasks. set task to null as a placeholder.
+        // tasks will be created when publishing version.
+        for (long backendId : totalInvolvedBackends) {
+            transactionState.addPublishVersionTask(backendId, null);
+        }
+    }
+
+    // for add/update/delete TransactionState
+    protected void unprotectUpsertTransactionState(TransactionState transactionState, boolean isReplay) {
+        // if this is a replay operation, we should not log it
+        if (!isReplay) {
+            if (transactionState.getTransactionStatus() != TransactionStatus.PREPARE
+                    || transactionState.getSourceType() == TransactionState.LoadJobSourceType.FRONTEND) {
+                // A prepared txn whose load source type is not FRONTEND does not need to be persisted:
+                // if such a txn is lost, the following commit simply fails and the user only needs to retry it.
+                // FRONTEND txns are committed and run asynchronously, so they have to be persisted.
+                editLog.logInsertTransactionState(transactionState);
+            }
+        }
+
+        if (transactionState.isRunning()) {
+            idToRunningTransactionState.put(transactionState.getTransactionId(), transactionState);
+        } else {
+            idToRunningTransactionState.remove(transactionState.getTransactionId());
+            idToFinalStatusTransactionState.put(transactionState.getTransactionId(), transactionState);
+            finalStatusTransactionStateDeque.add(transactionState);
+        }
+
+        updateTxnLabels(transactionState);
+        updateDbRunningTxnNum(transactionState.getPreStatus(), transactionState);
+    }
+
+    private void updateTxnLabels(TransactionState transactionState) {
+        Set<Long> txnIds = labelToTxnIds.get(transactionState.getLabel());
+        if (txnIds == null) {
+            txnIds = Sets.newHashSet();
+            labelToTxnIds.put(transactionState.getLabel(), txnIds);
+        }
+        txnIds.add(transactionState.getTransactionId());
+    }
+
+    private void updateDbRunningTxnNum(TransactionStatus preStatus, TransactionState curTxnState) {
+        AtomicInteger txnNum = null;
+        if (curTxnState.getSourceType() == TransactionState.LoadJobSourceType.ROUTINE_LOAD_TASK) {
+            txnNum = runningRoutineLoadTxnNums;
+        } else {
+            txnNum = runningTxnNums;
+        }
+
+        if (preStatus == null
+                && (curTxnState.getTransactionStatus() == TransactionStatus.PREPARE
+                || curTxnState.getTransactionStatus() == TransactionStatus.COMMITTED)) {
+            txnNum.incrementAndGet();
+        } else if ((preStatus == TransactionStatus.PREPARE
+                || preStatus == TransactionStatus.COMMITTED)
+                && (curTxnState.getTransactionStatus() == TransactionStatus.VISIBLE
+                || curTxnState.getTransactionStatus() == TransactionStatus.ABORTED)) {
+            txnNum.decrementAndGet();
+        }
+    }
+
+    public void abortTransaction(long transactionId, String reason, TxnCommitAttachment txnCommitAttachment) throws UserException {
+        if (transactionId < 0) {
+            LOG.info("transaction id is {}, less than 0, maybe this is an old type load job, ignore abort operation", transactionId);
+            return;
+        }
+        TransactionState transactionState = idToRunningTransactionState.get(transactionId);
+        if (transactionState == null) {
+            throw new UserException("transaction not found");
+        }
+
+        // update transaction state extra if exists
+        if (txnCommitAttachment != null) {
+            transactionState.setTxnCommitAttachment(txnCommitAttachment);
+        }
+
+        // before state transform
+        transactionState.beforeStateTransform(TransactionStatus.ABORTED);
+        boolean txnOperated = false;
+        writeLock();
+        try {
+            txnOperated = unprotectAbortTransaction(transactionId, reason);
+        } finally {
+            writeUnlock();
+            transactionState.afterStateTransform(TransactionStatus.ABORTED, txnOperated, reason);
+        }
+
+        // send clear txn task to BE to clear the transactions on BE.
+        // This is because parts of a txn may succeed in some BE, and these parts of txn should be cleared
+        // explicitly, or it will be remained on BE forever
+        // (However the report process will do the diff and send clear txn tasks to BE, but that is our
+        // last defense)
+        if (txnOperated && transactionState.getTransactionStatus() == TransactionStatus.ABORTED) {
+            clearBackendTransactions(transactionState);
+        }
+    }
+
+    private boolean unprotectAbortTransaction(long transactionId, String reason)
+            throws UserException {
+        TransactionState transactionState = getTransactionState(transactionId);
+        if (transactionState == null) {
+            throw new UserException("transaction not found");
+        }
+        if (transactionState.getTransactionStatus() == TransactionStatus.ABORTED) {
+            return false;
+        }
+        if (transactionState.getTransactionStatus() == TransactionStatus.COMMITTED
+                || transactionState.getTransactionStatus() == TransactionStatus.VISIBLE) {
+            throw new UserException("transaction's state is already "
+                    + transactionState.getTransactionStatus() + ", could not abort");
+        }
+        transactionState.setFinishTime(System.currentTimeMillis());
+        transactionState.setReason(reason);
+        transactionState.setTransactionStatus(TransactionStatus.ABORTED);
+        unprotectUpsertTransactionState(transactionState, false);
+        for (PublishVersionTask task : transactionState.getPublishVersionTasks().values()) {
+            AgentTaskQueue.removeTask(task.getBackendId(), TTaskType.PUBLISH_VERSION, task.getSignature());
+        }
+        return true;
+    }
+
+    private void clearBackendTransactions(TransactionState transactionState) {
+        Preconditions.checkState(transactionState.getTransactionStatus() == TransactionStatus.ABORTED);
+        // for aborted transaction, we don't know which backends are involved, so we have to send clear task
+        // to all backends.
+        List<Long> allBeIds = Catalog.getCurrentSystemInfo().getBackendIds(false);
+        AgentBatchTask batchTask = null;
+        synchronized (clearTransactionTasks) {
+            for (Long beId : allBeIds) {
+                ClearTransactionTask task = new ClearTransactionTask(beId, transactionState.getTransactionId(), Lists.newArrayList());
+                clearTransactionTasks.add(task);
+            }
+
+            // Batch the clear tasks instead of sending them every time a txn is aborted, to avoid too many task RPCs.
+            if (clearTransactionTasks.size() > allBeIds.size() * 2) {
+                batchTask = new AgentBatchTask();
+                for (ClearTransactionTask clearTransactionTask : clearTransactionTasks) {
+                    batchTask.addTask(clearTransactionTask);
+                }
+                clearTransactionTasks.clear();
+            }
+        }
+
+        if (batchTask != null) {
+            AgentTaskExecutor.submit(batchTask);
+        }
+    }
+
+
+    protected List<List<Comparable>> getTableTransInfo(long txnId) throws AnalysisException {
+        List<List<Comparable>> tableInfos = new ArrayList<>();
+        readLock();
+        try {
+            TransactionState transactionState = getTransactionState(txnId);
+            if (null == transactionState) {
+                throw new AnalysisException("Transaction[" + txnId + "] does not exist.");
+            }
+
+            for (Map.Entry<Long, TableCommitInfo> entry : transactionState.getIdToTableCommitInfos().entrySet()) {
+                List<Comparable> tableInfo = new ArrayList<>();
+                tableInfo.add(entry.getKey());
+                tableInfo.add(Joiner.on(", ").join(entry.getValue().getIdToPartitionCommitInfo().values().stream().map(
+                        PartitionCommitInfo::getPartitionId).collect(Collectors.toList())));
+                tableInfos.add(tableInfo);
+            }
+        } finally {
+            readUnlock();
+        }
+        return tableInfos;
+    }
+
+    protected List<List<Comparable>> getPartitionTransInfo(long txnId, long tableId) throws AnalysisException {
+        List<List<Comparable>> partitionInfos = new ArrayList<List<Comparable>>();
+        readLock();
+        try {
+            TransactionState transactionState = getTransactionState(txnId);
+            if (null == transactionState) {
+                throw new AnalysisException("Transaction[" + txnId + "] does not exist.");
+            }
+
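+            TableCommitInfo tableCommitInfo = transactionState.getIdToTableCommitInfos().get(tableId);
+            if (tableCommitInfo == null) {
+                throw new AnalysisException("Table[" + tableId + "] does not exist in transaction[" + txnId + "].");
+            }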
+            Map<Long, PartitionCommitInfo> idToPartitionCommitInfo = tableCommitInfo.getIdToPartitionCommitInfo();
+            for (Map.Entry<Long, PartitionCommitInfo> entry : idToPartitionCommitInfo.entrySet()) {
+                List<Comparable> partitionInfo = new ArrayList<Comparable>();
+                partitionInfo.add(entry.getKey());
+                partitionInfo.add(entry.getValue().getVersion());
+                partitionInfo.add(entry.getValue().getVersionHash());
+                partitionInfos.add(partitionInfo);
+            }
+        } finally {
+            readUnlock();
+        }
+        return partitionInfos;
+    }
+
+    public void removeExpiredTxns() {
+        long currentMillis = System.currentTimeMillis();
+        writeLock();
+        try {
+            while (!finalStatusTransactionStateDeque.isEmpty()) {
+                TransactionState transactionState = finalStatusTransactionStateDeque.getFirst();
+                if (transactionState.isExpired(currentMillis)) {
+                    finalStatusTransactionStateDeque.pop();
+                    idToFinalStatusTransactionState.remove(transactionState.getTransactionId());
+                    Set<Long> txnIds = labelToTxnIds.get(transactionState.getLabel());
+                    txnIds.remove(transactionState.getTransactionId());
+                    if (txnIds.isEmpty()) {
+                        labelToTxnIds.remove(transactionState.getLabel());
+                    }
+                    editLog.logDeleteTransactionState(transactionState);
+                    LOG.info("transaction [{}] is expired, remove it from transaction manager",
+                            transactionState.getTransactionId());
+                } else {
+                    break;
+                }
+            }
+        } finally {
+            writeUnlock();
+        }
+    }
+
+    public int getTransactionNum() {
+        return idToRunningTransactionState.size() + finalStatusTransactionStateDeque.size();

Review comment:
       I see it is only invoked by the checkpoint thread, so it is OK for it not to be thread-safe; maybe a comment is needed to explain this.
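A minimal sketch of the kind of comment the reviewer asks for, assuming, as the review states, that the checkpoint thread is the only caller of getTransactionNum(). TxnCounterSketch and its fields are hypothetical stand-ins for DatabaseTransactionMgr, not code from the PR; a lock-protected variant is included for contrast in case other callers are added later.

```java
import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Hypothetical stand-in for DatabaseTransactionMgr: only what the count needs.
public class TxnCounterSketch {
    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(true);
    private final Map<Long, String> idToRunning = new HashMap<>();
    private final ArrayDeque<Long> finalStatusDeque = new ArrayDeque<>();

    public void addRunning(long txnId, String label) {
        lock.writeLock().lock();
        try {
            idToRunning.put(txnId, label);
        } finally {
            lock.writeLock().unlock();
        }
    }

    // Moves a running txn to the final-status deque (appended at the tail).
    public void finish(long txnId) {
        lock.writeLock().lock();
        try {
            if (idToRunning.remove(txnId) != null) {
                finalStatusDeque.addLast(txnId);
            }
        } finally {
            lock.writeLock().unlock();
        }
    }

    /**
     * NOT thread-safe: reads two collections without holding the lock.
     * Assumption (from the review): only the checkpoint thread calls this,
     * so no concurrent writer is expected while it runs.
     */
    public int getTransactionNum() {
        return idToRunning.size() + finalStatusDeque.size();
    }

    // For callers that may race with writers: both sizes come from one consistent snapshot.
    public int getTransactionNumLocked() {
        lock.readLock().lock();
        try {
            return idToRunning.size() + finalStatusDeque.size();
        } finally {
            lock.readLock().unlock();
        }
    }

    public static void main(String[] args) {
        TxnCounterSketch mgr = new TxnCounterSketch();
        mgr.addRunning(1L, "label_a");
        mgr.addRunning(2L, "label_b");
        mgr.finish(1L);
        System.out.println(mgr.getTransactionNum());       // 2
        System.out.println(mgr.getTransactionNumLocked()); // 2
    }
}
```

Moving a txn from running to final status does not change the total, so both variants return the same value here; they only differ when a writer runs concurrently.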






[GitHub] [incubator-doris] caiconghui commented on a change in pull request #3369: Support txn management in db level isolation and use ArrayDeque to improve txn task performance

Posted by GitBox <gi...@apache.org>.
caiconghui commented on a change in pull request #3369:
URL: https://github.com/apache/incubator-doris/pull/3369#discussion_r415015679



##########
File path: fe/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java
##########
@@ -500,8 +485,15 @@ public boolean commitAndPublishTransaction(Database db, long transactionId,
         } finally {
             db.writeUnlock();
         }
-        
-        TransactionState transactionState = idToTransactionState.get(transactionId);
+        DatabaseTransactionMgr dbTransactionMgr = getDatabaseTransactioMgr(db.getId());
+        TransactionState transactionState = null;
+        dbTransactionMgr.readLock();

Review comment:
       It is only exposed to GlobalTransactionMgr. We still use GlobalTransactionMgr to do some txn management work from a global view, and DatabaseTransactionMgr cannot exist independently of GlobalTransactionMgr. It is normal for GlobalTransactionMgr to invoke several of DatabaseTransactionMgr's methods and make them 'atomic' by holding its lock, while DatabaseTransactionMgr only needs to provide basic txn methods. If we did not expose dbTransactionMgr's lock to GlobalTransactionMgr, DatabaseTransactionMgr would have to take on too much work.
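The locking arrangement described above can be sketched as follows, assuming both managers live in the same package so the protected lock helpers are visible (Java's protected access includes package access). LockExposureSketch, DbTxnMgrSketch, GlobalTxnMgrSketch and bothVisible are hypothetical names, not the PR's API. The per-database manager offers only primitive, unlocked accessors plus its lock; the global manager takes that lock around a sequence of primitives so the sequence is atomic with respect to writers.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantReadWriteLock;

public class LockExposureSketch {

    // Per-database manager: primitive accessors plus its lock, nothing more.
    static class DbTxnMgrSketch {
        private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(true);
        private final Map<Long, String> idToStatus = new HashMap<>();

        protected void readLock() { lock.readLock().lock(); }
        protected void readUnlock() { lock.readLock().unlock(); }
        protected void writeLock() { lock.writeLock().lock(); }
        protected void writeUnlock() { lock.writeLock().unlock(); }

        // "unprotected" primitives: the caller must already hold the appropriate lock.
        String unprotectedGetStatus(long txnId) { return idToStatus.get(txnId); }
        void unprotectedSetStatus(long txnId, String status) { idToStatus.put(txnId, status); }
    }

    // Global manager: routes by db id and composes primitives under the db's lock.
    static class GlobalTxnMgrSketch {
        private final Map<Long, DbTxnMgrSketch> dbIdToMgr = new ConcurrentHashMap<>();

        DbTxnMgrSketch getOrCreate(long dbId) {
            return dbIdToMgr.computeIfAbsent(dbId, id -> new DbTxnMgrSketch());
        }

        void setStatus(long dbId, long txnId, String status) {
            DbTxnMgrSketch mgr = getOrCreate(dbId);
            mgr.writeLock();
            try {
                mgr.unprotectedSetStatus(txnId, status);
            } finally {
                mgr.writeUnlock();
            }
        }

        // Two lookups under one read lock: both observe the same state.
        boolean bothVisible(long dbId, long txnA, long txnB) {
            DbTxnMgrSketch mgr = getOrCreate(dbId);
            mgr.readLock();
            try {
                return "VISIBLE".equals(mgr.unprotectedGetStatus(txnA))
                        && "VISIBLE".equals(mgr.unprotectedGetStatus(txnB));
            } finally {
                mgr.readUnlock();
            }
        }
    }

    public static void main(String[] args) {
        GlobalTxnMgrSketch global = new GlobalTxnMgrSketch();
        global.setStatus(10L, 1L, "VISIBLE");
        global.setStatus(10L, 2L, "COMMITTED");
        System.out.println(global.bothVisible(10L, 1L, 2L)); // false
        global.setStatus(10L, 2L, "VISIBLE");
        System.out.println(global.bothVisible(10L, 1L, 2L)); // true
    }
}
```

The trade-off is the one discussed in the thread: the per-database class stays small, but correctness of the unprotected primitives depends on every caller following the locking convention.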






[GitHub] [incubator-doris] morningman commented on a change in pull request #3369: Support txn management in db level isolation and use ArrayDeque to improve txn task performance

Posted by GitBox <gi...@apache.org>.
morningman commented on a change in pull request #3369:
URL: https://github.com/apache/incubator-doris/pull/3369#discussion_r422612957



##########
File path: fe/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java
##########
@@ -1346,81 +359,71 @@ public TransactionIdGenerator getTransactionIDGenerator() {
 
     @Override
     public void write(DataOutput out) throws IOException {
-        int numTransactions = idToTransactionState.size();
+        int numTransactions = getTransactionNum();
         out.writeInt(numTransactions);
-        for (Map.Entry<Long, TransactionState> entry : idToTransactionState.entrySet()) {
-            entry.getValue().write(out);
+        for (DatabaseTransactionMgr dbTransactionMgr : dbIdToDatabaseTransactionMgrs.values()) {
+            dbTransactionMgr.unprotectWriteAllTransactionStates(out);
         }
         idGenerator.write(out);
     }
     
     public void readFields(DataInput in) throws IOException {
-        int numTransactions = in.readInt();
-        for (int i = 0; i < numTransactions; ++i) {
-            TransactionState transactionState = new TransactionState();
-            transactionState.readFields(in);
-            TransactionState preTxnState = idToTransactionState.get(transactionState.getTransactionId());
-            idToTransactionState.put(transactionState.getTransactionId(), transactionState);
-            updateTxnLabels(transactionState);
-            updateDbRunningTxnNum(preTxnState == null ? null : preTxnState.getTransactionStatus(),
-                                  transactionState);
+        try {
+            int numTransactions = in.readInt();
+            for (int i = 0; i < numTransactions; ++i) {
+                TransactionState transactionState = new TransactionState();
+                transactionState.readFields(in);
+                DatabaseTransactionMgr dbTransactionMgr = getDatabaseTransactioMgr(transactionState.getDbId());
+                dbTransactionMgr.unprotectUpsertTransactionState(transactionState, true);
+            }
+            idGenerator.readFields(in);
+        } catch (AnalysisException e) {
+            throw new IOException("Read transaction states failed", e);
         }
-        idGenerator.readFields(in);
+
     }
 
-    public TransactionState getTransactionStateByCallbackIdAndStatus(long callbackId, Set<TransactionStatus> status) {
-        readLock();
+    public TransactionState getTransactionStateByCallbackIdAndStatus(long dbId, long callbackId, Set<TransactionStatus> status) {
         try {
-            for (TransactionState txn : idToTransactionState.values()) {
-                if (txn.getCallbackId() == callbackId && status.contains(txn.getTransactionStatus())) {
-                    return txn;
-                }
-            }
-        } finally {
-            readUnlock();
+            DatabaseTransactionMgr dbTransactionMgr = getDatabaseTransactioMgr(dbId);
+            return dbTransactionMgr.getTransactionStateByCallbackIdAndStatus(callbackId, status);
+        } catch (AnalysisException e) {
+            LOG.warn("Get transaction by callbackId and status failed", e);
+            return null;
         }
-        return null;
     }
 
-    public TransactionState getTransactionStateByCallbackId(long callbackId) {
-        readLock();
+    public TransactionState getTransactionStateByCallbackId(long dbId, long callbackId) {
         try {
-            for (TransactionState txn : idToTransactionState.values()) {
-                if (txn.getCallbackId() == callbackId) {
-                    return txn;
-                }
-            }
-        } finally {
-            readUnlock();
+            DatabaseTransactionMgr dbTransactionMgr = getDatabaseTransactioMgr(dbId);
+            return dbTransactionMgr.getTransactionStateByCallbackId(callbackId);
+        } catch (AnalysisException e) {
+            LOG.warn("Get transaction by callbackId failed", e);
+            return null;
         }
-        return null;
     }
 
-    public List<Long> getTransactionIdByCoordinateBe(String coordinateHost, int limit) {
-        ArrayList<Long> txnIds = new ArrayList<>();
-        readLock();
-        try {
-            idToTransactionState.values().stream()
-                    .filter(t -> (t.getCoordinator().sourceType == TransactionState.TxnSourceType.BE
-                            && t.getCoordinator().ip.equals(coordinateHost)
-                            && (!t.getTransactionStatus().isFinalStatus())))
-                    .limit(limit)
-                    .forEach(t -> txnIds.add(t.getTransactionId()));
-        } finally {
-            readUnlock();
+    public List<Pair<Long, Long>> getTransactionIdByCoordinateBe(String coordinateHost, int limit) {
+        ArrayList<Pair<Long, Long>> txnInfos = new ArrayList<>();
+        for (DatabaseTransactionMgr databaseTransactionMgr : dbIdToDatabaseTransactionMgrs.values()) {
+            txnInfos.addAll(databaseTransactionMgr.getTransactionIdByCoordinateBe(coordinateHost, limit));
+            if (txnInfos.size() > limit) {
+                break;
+            }
         }
-        return txnIds;
+        return txnInfos.size() > limit ? new ArrayList<>(txnInfos.subList(0, limit)) : txnInfos;

Review comment:
       I think it's OK, because no one will modify that list.






[GitHub] [incubator-doris] caiconghui commented on a change in pull request #3369: Support txn management in db level isolation and use ArrayDeque to improve txn task performance

Posted by GitBox <gi...@apache.org>.
caiconghui commented on a change in pull request #3369:
URL: https://github.com/apache/incubator-doris/pull/3369#discussion_r421610744



##########
File path: fe/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java
##########
@@ -1346,81 +359,71 @@ public TransactionIdGenerator getTransactionIDGenerator() {
 
     @Override
     public void write(DataOutput out) throws IOException {
-        int numTransactions = idToTransactionState.size();
+        int numTransactions = getTransactionNum();
         out.writeInt(numTransactions);
-        for (Map.Entry<Long, TransactionState> entry : idToTransactionState.entrySet()) {
-            entry.getValue().write(out);
+        for (DatabaseTransactionMgr dbTransactionMgr : dbIdToDatabaseTransactionMgrs.values()) {
+            dbTransactionMgr.unprotectWriteAllTransactionStates(out);
         }
         idGenerator.write(out);
     }
     
     public void readFields(DataInput in) throws IOException {
-        int numTransactions = in.readInt();
-        for (int i = 0; i < numTransactions; ++i) {
-            TransactionState transactionState = new TransactionState();
-            transactionState.readFields(in);
-            TransactionState preTxnState = idToTransactionState.get(transactionState.getTransactionId());
-            idToTransactionState.put(transactionState.getTransactionId(), transactionState);
-            updateTxnLabels(transactionState);
-            updateDbRunningTxnNum(preTxnState == null ? null : preTxnState.getTransactionStatus(),
-                                  transactionState);
+        try {
+            int numTransactions = in.readInt();
+            for (int i = 0; i < numTransactions; ++i) {
+                TransactionState transactionState = new TransactionState();
+                transactionState.readFields(in);
+                DatabaseTransactionMgr dbTransactionMgr = getDatabaseTransactioMgr(transactionState.getDbId());
+                dbTransactionMgr.unprotectUpsertTransactionState(transactionState, true);
+            }
+            idGenerator.readFields(in);
+        } catch (AnalysisException e) {
+            throw new IOException("Read transaction states failed", e);
         }
-        idGenerator.readFields(in);
+
     }
 
-    public TransactionState getTransactionStateByCallbackIdAndStatus(long callbackId, Set<TransactionStatus> status) {
-        readLock();
+    public TransactionState getTransactionStateByCallbackIdAndStatus(long dbId, long callbackId, Set<TransactionStatus> status) {
         try {
-            for (TransactionState txn : idToTransactionState.values()) {
-                if (txn.getCallbackId() == callbackId && status.contains(txn.getTransactionStatus())) {
-                    return txn;
-                }
-            }
-        } finally {
-            readUnlock();
+            DatabaseTransactionMgr dbTransactionMgr = getDatabaseTransactioMgr(dbId);
+            return dbTransactionMgr.getTransactionStateByCallbackIdAndStatus(callbackId, status);
+        } catch (AnalysisException e) {
+            LOG.warn("Get transaction by callbackId and status failed", e);
+            return null;
         }
-        return null;
     }
 
-    public TransactionState getTransactionStateByCallbackId(long callbackId) {
-        readLock();
+    public TransactionState getTransactionStateByCallbackId(long dbId, long callbackId) {
         try {
-            for (TransactionState txn : idToTransactionState.values()) {
-                if (txn.getCallbackId() == callbackId) {
-                    return txn;
-                }
-            }
-        } finally {
-            readUnlock();
+            DatabaseTransactionMgr dbTransactionMgr = getDatabaseTransactioMgr(dbId);
+            return dbTransactionMgr.getTransactionStateByCallbackId(callbackId);
+        } catch (AnalysisException e) {
+            LOG.warn("Get transaction by callbackId failed", e);
+            return null;
         }
-        return null;
     }
 
-    public List<Long> getTransactionIdByCoordinateBe(String coordinateHost, int limit) {
-        ArrayList<Long> txnIds = new ArrayList<>();
-        readLock();
-        try {
-            idToTransactionState.values().stream()
-                    .filter(t -> (t.getCoordinator().sourceType == TransactionState.TxnSourceType.BE
-                            && t.getCoordinator().ip.equals(coordinateHost)
-                            && (!t.getTransactionStatus().isFinalStatus())))
-                    .limit(limit)
-                    .forEach(t -> txnIds.add(t.getTransactionId()));
-        } finally {
-            readUnlock();
+    public List<Pair<Long, Long>> getTransactionIdByCoordinateBe(String coordinateHost, int limit) {
+        ArrayList<Pair<Long, Long>> txnInfos = new ArrayList<>();
+        for (DatabaseTransactionMgr databaseTransactionMgr : dbIdToDatabaseTransactionMgrs.values()) {
+            txnInfos.addAll(databaseTransactionMgr.getTransactionIdByCoordinateBe(coordinateHost, limit));
+            if (txnInfos.size() > limit) {
+                break;
+            }
         }
-        return txnIds;
+        return txnInfos.size() > limit ? new ArrayList<>(txnInfos.subList(0, limit)) : txnInfos;

Review comment:
       @morningman subList is a view of the original list, so is it a good idea to return the subList directly?
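Background for the question above: List.subList returns a view backed by the original list, so non-structural changes made through either one are visible in the other, and if the backing list is structurally modified (for example by add), the view's behavior becomes undefined; ArrayList's view detects this and throws ConcurrentModificationException. Copying the view into a new ArrayList, as the diff does, detaches the result. A minimal sketch; SubListViewSketch and firstN are hypothetical names.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.ConcurrentModificationException;
import java.util.List;

public class SubListViewSketch {

    // Hypothetical helper mirroring the diff: copy so the result shares no state with 'all'.
    static <T> List<T> firstN(List<T> all, int limit) {
        return all.size() > limit ? new ArrayList<>(all.subList(0, limit)) : all;
    }

    public static void main(String[] args) {
        List<Long> backing = new ArrayList<>(Arrays.asList(1L, 2L, 3L, 4L));

        List<Long> view = backing.subList(0, 2);   // a view, not a copy
        List<Long> copy = firstN(backing, 2);      // an independent ArrayList

        view.set(0, 100L);                         // writes through to the backing list
        System.out.println(backing.get(0));        // 100

        backing.add(5L);                           // structural change to the backing list
        try {
            view.size();                           // ArrayList's view detects the change
        } catch (ConcurrentModificationException e) {
            System.out.println("view invalidated");
        }
        System.out.println(copy);                  // [1, 2]
    }
}
```

The copy costs one allocation of up to limit elements; returning the view directly is safe only as long as nobody modifies the backing list afterwards, and it keeps the whole backing list reachable.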






[GitHub] [incubator-doris] caiconghui commented on a change in pull request #3369: Support txn management in db level isolation and use ArrayDeque to improve txn task performance

Posted by GitBox <gi...@apache.org>.
caiconghui commented on a change in pull request #3369:
URL: https://github.com/apache/incubator-doris/pull/3369#discussion_r415011916



##########
File path: fe/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java
##########
@@ -0,0 +1,556 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.transaction;
+
+import com.google.common.base.Joiner;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import org.apache.commons.lang3.tuple.ImmutablePair;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.doris.catalog.Catalog;
+import org.apache.doris.catalog.Database;
+import org.apache.doris.catalog.OlapTable;
+import org.apache.doris.catalog.Partition;
+import org.apache.doris.catalog.Table;
+import org.apache.doris.common.AnalysisException;
+import org.apache.doris.common.ErrorCode;
+import org.apache.doris.common.ErrorReport;
+import org.apache.doris.common.util.TimeUtils;
+import org.apache.doris.common.UserException;
+import org.apache.doris.mysql.privilege.PrivPredicate;
+import org.apache.doris.persist.EditLog;
+import org.apache.doris.qe.ConnectContext;
+import org.apache.doris.task.AgentBatchTask;
+import org.apache.doris.task.AgentTaskExecutor;
+import org.apache.doris.task.AgentTaskQueue;
+import org.apache.doris.task.ClearTransactionTask;
+import org.apache.doris.task.PublishVersionTask;
+import org.apache.doris.thrift.TTaskType;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.stream.Collectors;
+
+public class DatabaseTransactionMgr {
+
+    private static final Logger LOG = LogManager.getLogger(DatabaseTransactionMgr.class);
+
+    // the lock is used to control the access to transaction states
+    // no other locks should be inside this lock
+    private ReentrantReadWriteLock transactionLock = new ReentrantReadWriteLock(true);
+
+    // transactionId -> running TransactionState
+    private Map<Long, TransactionState> idToRunningTransactionState = Maps.newHashMap();
+
+    // transactionId -> final status TransactionState
+    private Map<Long, TransactionState> idToFinalStatusTransactionState = Maps.newHashMap();
+
+
+    // to store transactionStates with final status
+    private ArrayDeque<TransactionState> finalStatusTransactionStateDeque = new ArrayDeque<>();
+
+    // label -> txn ids
+    // this is used for checking if a label is already used. a label may correspond to multiple txns,
+    // but only one of them can succeed.
+    // this member should be consistent with idToRunningTransactionState and idToFinalStatusTransactionState,
+    // which means if a txn exists in idToRunningTransactionState or idToFinalStatusTransactionState,
+    // it must exist in labelToTxnIds, and vice versa
+    private Map<String, Set<Long>> labelToTxnIds = Maps.newConcurrentMap();
+
+
+    // count the number of running txns of database, except for the routine load txn
+    private AtomicInteger runningTxnNums = new AtomicInteger(0);
+
+    // count only the number of running routine load txns of database
+    private AtomicInteger runningRoutineLoadTxnNums = new AtomicInteger(0);
+
+    private EditLog editLog;
+
+    private List<ClearTransactionTask> clearTransactionTasks = Lists.newArrayList();
+
+    protected void readLock() {
+        this.transactionLock.readLock().lock();
+    }
+
+    protected void readUnlock() {
+        this.transactionLock.readLock().unlock();
+    }
+
+    protected void writeLock() {
+        this.transactionLock.writeLock().lock();
+    }
+
+    protected void writeUnlock() {
+        this.transactionLock.writeLock().unlock();
+    }
+
+    public DatabaseTransactionMgr(EditLog editLog) {
+        this.editLog = editLog;
+    }
+
+    public TransactionState getTransactionState(Long transactionId) {
+        TransactionState transactionState = idToRunningTransactionState.get(transactionId);
+        if (transactionState != null) {
+            return transactionState;
+        } else {
+            return idToFinalStatusTransactionState.get(transactionId);
+        }
+    }
+
+    public Set<Long> getTxnIdsByLabel(String label) {
+        return labelToTxnIds.get(label);
+    }
+
+    public int getRunningTxnNums() {
+        return runningTxnNums.get();
+    }
+
+    public int getRunningRoutineLoadTxnNums() {
+        return runningRoutineLoadTxnNums.get();
+    }
+
+    public int getFinishedTxnNums() {
+        return finalStatusTransactionStateDeque.size();
+    }
+
+    public List<List<String>> getTxnStateInfoList(boolean running, int limit) {
+        List<List<String>> infos = Lists.newArrayList();
+        Collection<TransactionState> transactionStateCollection = null;
+        readLock();
+        try {
+            if (running) {
+                transactionStateCollection = idToRunningTransactionState.values();
+            } else {
+                transactionStateCollection = finalStatusTransactionStateDeque;
+            }
+            // get transaction order by txn id desc limit 'limit'
+            transactionStateCollection.stream()
+                    .sorted(TransactionState.TXN_ID_COMPARATOR)
+                    .limit(limit)
+                    .forEach(t -> {
+                        List<String> info = Lists.newArrayList();
+                        getTxnStateInfo(t, info);
+                        infos.add(info);
+                    });
+        } finally {
+            readUnlock();
+        }
+        return infos;
+    }
+
+    private void getTxnStateInfo(TransactionState txnState, List<String> info) {
+        info.add(String.valueOf(txnState.getTransactionId()));
+        info.add(txnState.getLabel());
+        info.add(txnState.getCoordinator().toString());
+        info.add(txnState.getTransactionStatus().name());
+        info.add(txnState.getSourceType().name());
+        info.add(TimeUtils.longToTimeString(txnState.getPrepareTime()));
+        info.add(TimeUtils.longToTimeString(txnState.getCommitTime()));
+        info.add(TimeUtils.longToTimeString(txnState.getFinishTime()));
+        info.add(txnState.getReason());
+        info.add(String.valueOf(txnState.getErrorReplicas().size()));
+        info.add(String.valueOf(txnState.getCallbackId()));
+        info.add(String.valueOf(txnState.getTimeoutMs()));
+    }
+
+    public void deleteTransaction(TransactionState transactionState) {
+        writeLock();
+        try {
+            // here we only delete the oldest element, so if the element exists in finalStatusTransactionStateDeque,
+            // it must be at the front of finalStatusTransactionStateDeque
+            if (!finalStatusTransactionStateDeque.isEmpty() &&
+                    transactionState.getTransactionId() == finalStatusTransactionStateDeque.getFirst().getTransactionId()) {
+                finalStatusTransactionStateDeque.pop();
+                idToFinalStatusTransactionState.remove(transactionState.getTransactionId());
+                Set<Long> txnIds = labelToTxnIds.get(transactionState.getLabel());
+                txnIds.remove(transactionState.getTransactionId());
+                if (txnIds.isEmpty()) {
+                    labelToTxnIds.remove(transactionState.getLabel());
+                }
+            }
+        } finally {
+            writeUnlock();
+        }
+    }
+
+    public Map<Long, TransactionState> getIdToRunningTransactionState() {
+        return idToRunningTransactionState;
+    }
+
+    public ArrayDeque<TransactionState> getFinalStatusTransactionStateDeque() {
+        return finalStatusTransactionStateDeque;
+    }
+
+    protected void  unprotectedCommitTransaction(TransactionState transactionState, Set<Long> errorReplicaIds,
+                                               Map<Long, Set<Long>> tableToPartition, Set<Long> totalInvolvedBackends,
+                                               Database db) {
+        // the transaction state may have changed while checking whether the transaction could be committed
+        if (transactionState.getTransactionStatus() != TransactionStatus.PREPARE) {
+            return;
+        }
+        // update transaction state version
+        transactionState.setCommitTime(System.currentTimeMillis());
+        transactionState.setTransactionStatus(TransactionStatus.COMMITTED);
+        transactionState.setErrorReplicas(errorReplicaIds);
+        for (long tableId : tableToPartition.keySet()) {
+            TableCommitInfo tableCommitInfo = new TableCommitInfo(tableId);
+            for (long partitionId : tableToPartition.get(tableId)) {
+                OlapTable table = (OlapTable) db.getTable(tableId);
+                Partition partition = table.getPartition(partitionId);
+                PartitionCommitInfo partitionCommitInfo = new PartitionCommitInfo(partitionId,
+                        partition.getNextVersion(),
+                        partition.getNextVersionHash());
+                tableCommitInfo.addPartitionCommitInfo(partitionCommitInfo);
+            }
+            transactionState.putIdToTableCommitInfo(tableId, tableCommitInfo);
+        }
+        // persist transactionState
+        unprotectUpsertTransactionState(transactionState, false);
+
+        // add publish version tasks. set task to null as a placeholder.
+        // tasks will be created when publishing version.
+        for (long backendId : totalInvolvedBackends) {
+            transactionState.addPublishVersionTask(backendId, null);
+        }
+    }
+
+    // for add/update/delete TransactionState
+    protected void unprotectUpsertTransactionState(TransactionState transactionState, boolean isReplay) {
+        // if this is a replay operation, we should not log it
+        if (!isReplay) {
+            if (transactionState.getTransactionStatus() != TransactionStatus.PREPARE
+                    || transactionState.getSourceType() == TransactionState.LoadJobSourceType.FRONTEND) {
+                // if this is a PREPARE txn and its load source type is not FRONTEND,
+                // there is no need to persist it. if a prepared txn is lost, the following commit will simply fail,
+                // and the user only needs to retry this txn.
+                // FRONTEND txns are committed and run asynchronously, so they have to be persisted.
+                editLog.logInsertTransactionState(transactionState);
+            }
+        }
+
+        if (transactionState.isRunning()) {
+            idToRunningTransactionState.put(transactionState.getTransactionId(), transactionState);
+        } else {
+            idToRunningTransactionState.remove(transactionState.getTransactionId());
+            idToFinalStatusTransactionState.put(transactionState.getTransactionId(), transactionState);
+            finalStatusTransactionStateDeque.add(transactionState);
+        }
+
+        updateTxnLabels(transactionState);
+        updateDbRunningTxnNum(transactionState.getPreStatus(), transactionState);
+    }
+
+    private void updateTxnLabels(TransactionState transactionState) {
+        Set<Long> txnIds = labelToTxnIds.get(transactionState.getLabel());
+        if (txnIds == null) {
+            txnIds = Sets.newHashSet();
+            labelToTxnIds.put(transactionState.getLabel(), txnIds);
+        }
+        txnIds.add(transactionState.getTransactionId());
+    }
+
+    private void updateDbRunningTxnNum(TransactionStatus preStatus, TransactionState curTxnState) {
+        AtomicInteger txnNum = null;
+        if (curTxnState.getSourceType() == TransactionState.LoadJobSourceType.ROUTINE_LOAD_TASK) {
+            txnNum = runningRoutineLoadTxnNums;
+        } else {
+            txnNum = runningTxnNums;
+        }
+
+        if (preStatus == null
+                && (curTxnState.getTransactionStatus() == TransactionStatus.PREPARE
+                || curTxnState.getTransactionStatus() == TransactionStatus.COMMITTED)) {
+            txnNum.incrementAndGet();
+        } else if ((preStatus == TransactionStatus.PREPARE
+                || preStatus == TransactionStatus.COMMITTED)
+                && (curTxnState.getTransactionStatus() == TransactionStatus.VISIBLE
+                || curTxnState.getTransactionStatus() == TransactionStatus.ABORTED)) {
+            txnNum.decrementAndGet();
+        }
+    }
+
+    public void abortTransaction(long transactionId, String reason, TxnCommitAttachment txnCommitAttachment) throws UserException {
+        if (transactionId < 0) {
+            LOG.info("transaction id is {}, less than 0, maybe this is an old type load job, ignore abort operation", transactionId);
+            return;
+        }
+        TransactionState transactionState = idToRunningTransactionState.get(transactionId);
+        if (transactionState == null) {
+            throw new UserException("transaction not found");
+        }
+
+        // update transaction state extra if exists
+        if (txnCommitAttachment != null) {
+            transactionState.setTxnCommitAttachment(txnCommitAttachment);
+        }
+
+        // before state transform
+        transactionState.beforeStateTransform(TransactionStatus.ABORTED);
+        boolean txnOperated = false;
+        writeLock();
+        try {
+            txnOperated = unprotectAbortTransaction(transactionId, reason);
+        } finally {
+            writeUnlock();
+            transactionState.afterStateTransform(TransactionStatus.ABORTED, txnOperated, reason);
+        }
+
+        // send clear txn tasks to BE to clear the transactions on BE.
+        // This is because parts of a txn may succeed on some BEs, and these parts of the txn should be cleared
+        // explicitly, or they will remain on the BE forever.
+        // (The report process also does a diff and sends clear txn tasks to BE, but that is our
+        // last line of defense.)
+        if (txnOperated && transactionState.getTransactionStatus() == TransactionStatus.ABORTED) {
+            clearBackendTransactions(transactionState);
+        }
+    }
+
+    private boolean unprotectAbortTransaction(long transactionId, String reason)
+            throws UserException {
+        TransactionState transactionState = getTransactionState(transactionId);
+        if (transactionState == null) {
+            throw new UserException("transaction not found");
+        }
+        if (transactionState.getTransactionStatus() == TransactionStatus.ABORTED) {
+            return false;
+        }
+        if (transactionState.getTransactionStatus() == TransactionStatus.COMMITTED
+                || transactionState.getTransactionStatus() == TransactionStatus.VISIBLE) {
+            throw new UserException("transaction's state is already "
+                    + transactionState.getTransactionStatus() + ", could not abort");
+        }
+        transactionState.setFinishTime(System.currentTimeMillis());
+        transactionState.setReason(reason);
+        transactionState.setTransactionStatus(TransactionStatus.ABORTED);
+        unprotectUpsertTransactionState(transactionState, false);
+        for (PublishVersionTask task : transactionState.getPublishVersionTasks().values()) {
+            AgentTaskQueue.removeTask(task.getBackendId(), TTaskType.PUBLISH_VERSION, task.getSignature());
+        }
+        return true;
+    }
+
+    private void clearBackendTransactions(TransactionState transactionState) {
+        Preconditions.checkState(transactionState.getTransactionStatus() == TransactionStatus.ABORTED);
+        // for aborted transaction, we don't know which backends are involved, so we have to send clear task
+        // to all backends.
+        List<Long> allBeIds = Catalog.getCurrentSystemInfo().getBackendIds(false);
+        AgentBatchTask batchTask = null;
+        synchronized (clearTransactionTasks) {
+            for (Long beId : allBeIds) {
+                ClearTransactionTask task = new ClearTransactionTask(beId, transactionState.getTransactionId(), Lists.newArrayList());
+                clearTransactionTasks.add(task);
+            }
+
+            // batch the tasks instead of sending them every time a txn is aborted, to avoid too many task RPCs.
+            if (clearTransactionTasks.size() > allBeIds.size() * 2) {
+                batchTask = new AgentBatchTask();
+                for (ClearTransactionTask clearTransactionTask : clearTransactionTasks) {
+                    batchTask.addTask(clearTransactionTask);
+                }
+                clearTransactionTasks.clear();
+            }
+        }
+
+        if (batchTask != null) {
+            AgentTaskExecutor.submit(batchTask);
+        }
+    }
+
+
+    protected List<List<Comparable>> getTableTransInfo(long txnId) throws AnalysisException {
+        List<List<Comparable>> tableInfos = new ArrayList<>();
+        readLock();
+        try {
+            TransactionState transactionState = getTransactionState(txnId);
+            if (null == transactionState) {
+                throw new AnalysisException("Transaction[" + txnId + "] does not exist.");
+            }
+
+            for (Map.Entry<Long, TableCommitInfo> entry : transactionState.getIdToTableCommitInfos().entrySet()) {
+                List<Comparable> tableInfo = new ArrayList<>();
+                tableInfo.add(entry.getKey());
+                tableInfo.add(Joiner.on(", ").join(entry.getValue().getIdToPartitionCommitInfo().values().stream().map(
+                        PartitionCommitInfo::getPartitionId).collect(Collectors.toList())));
+                tableInfos.add(tableInfo);
+            }
+        } finally {
+            readUnlock();
+        }
+        return tableInfos;
+    }
+
+    protected List<List<Comparable>> getPartitionTransInfo(long txnId, long tableId) throws AnalysisException {
+        List<List<Comparable>> partitionInfos = new ArrayList<List<Comparable>>();
+        readLock();
+        try {
+            TransactionState transactionState = getTransactionState(txnId);
+            if (null == transactionState) {
+                throw new AnalysisException("Transaction[" + txnId + "] does not exist.");
+            }
+
+            TableCommitInfo tableCommitInfo = transactionState.getIdToTableCommitInfos().get(tableId);
+            Map<Long, PartitionCommitInfo> idToPartitionCommitInfo = tableCommitInfo.getIdToPartitionCommitInfo();
+            for (Map.Entry<Long, PartitionCommitInfo> entry : idToPartitionCommitInfo.entrySet()) {
+                List<Comparable> partitionInfo = new ArrayList<Comparable>();
+                partitionInfo.add(entry.getKey());
+                partitionInfo.add(entry.getValue().getVersion());
+                partitionInfo.add(entry.getValue().getVersionHash());
+                partitionInfos.add(partitionInfo);
+            }
+        } finally {
+            readUnlock();
+        }
+        return partitionInfos;
+    }
+
+    public void removeExpiredTxns() {
+        long currentMillis = System.currentTimeMillis();
+        writeLock();
+        try {
+            while (!finalStatusTransactionStateDeque.isEmpty()) {
+                TransactionState transactionState = finalStatusTransactionStateDeque.getFirst();
+                if (transactionState.isExpired(currentMillis)) {
+                    finalStatusTransactionStateDeque.pop();
+                    // also remove it from idToFinalStatusTransactionState, otherwise expired txns are never released
+                    idToFinalStatusTransactionState.remove(transactionState.getTransactionId());
+                    Set<Long> txnIds = labelToTxnIds.get(transactionState.getLabel());
+                    txnIds.remove(transactionState.getTransactionId());
+                    if (txnIds.isEmpty()) {
+                        labelToTxnIds.remove(transactionState.getLabel());
+                    }
+                    editLog.logDeleteTransactionState(transactionState);
+                    LOG.info("transaction [{}] is expired, remove it from transaction manager",
+                            transactionState.getTransactionId());
+                } else {
+                    break;
+                }
+            }
+        } finally {
+            writeUnlock();
+        }
+    }
+
+    public int getTransactionNum() {
+        return idToRunningTransactionState.size() + finalStatusTransactionStateDeque.size();

Review comment:
       I see it is only invoked by the checkpoint thread or the image dump thread, with the db lock held, so it is OK for it not to be thread-safe. Maybe there should be a comment explaining this.
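A minimal sketch of such a comment, with a locked variant next to it for comparison. `TxnCounterSketch`, `addRunning`, `finish` and `getTransactionNumUnlocked` are hypothetical names, and txn states are reduced to their labels; only the counting logic mirrors the quoted `getTransactionNum()`.

```java
import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Hypothetical, trimmed-down stand-in for DatabaseTransactionMgr: txn states are
// reduced to labels, and only the two collections counted by getTransactionNum() exist.
class TxnCounterSketch {
    private final ReentrantReadWriteLock transactionLock = new ReentrantReadWriteLock(true);
    private final Map<Long, String> idToRunningTransactionState = new HashMap<>();
    private final ArrayDeque<String> finalStatusTransactionStateDeque = new ArrayDeque<>();

    // Registers a running txn under the write lock.
    void addRunning(long txnId, String label) {
        transactionLock.writeLock().lock();
        try {
            idToRunningTransactionState.put(txnId, label);
        } finally {
            transactionLock.writeLock().unlock();
        }
    }

    // Moves a running txn to the final-status deque; unknown ids are ignored.
    void finish(long txnId) {
        transactionLock.writeLock().lock();
        try {
            String label = idToRunningTransactionState.remove(txnId);
            if (label != null) {
                finalStatusTransactionStateDeque.addLast(label);
            }
        } finally {
            transactionLock.writeLock().unlock();
        }
    }

    /**
     * Returns the number of running plus final-status txns.
     *
     * NOTE: this method does not take transactionLock, so it is not thread-safe on
     * its own. It is only meant to be called by the checkpoint thread or the image
     * dump thread, which hold the db lock.
     */
    int getTransactionNumUnlocked() {
        return idToRunningTransactionState.size() + finalStatusTransactionStateDeque.size();
    }

    // Variant that can be called from any thread: same count, under the read lock.
    int getTransactionNum() {
        transactionLock.readLock().lock();
        try {
            return getTransactionNumUnlocked();
        } finally {
            transactionLock.readLock().unlock();
        }
    }
}
```

The unlocked version keeps the checkpoint path free of extra lock traffic; the locked one costs one read-lock acquisition but does not depend on callers holding the db lock.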






[GitHub] [incubator-doris] morningman commented on a change in pull request #3369: Support txn management in db level isolation and use ArrayDeque to improve txn task performance

Posted by GitBox <gi...@apache.org>.
morningman commented on a change in pull request #3369:
URL: https://github.com/apache/incubator-doris/pull/3369#discussion_r412283934



##########
File path: fe/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java
##########
@@ -0,0 +1,556 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.transaction;
+
+import com.google.common.base.Joiner;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import org.apache.commons.lang3.tuple.ImmutablePair;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.doris.catalog.Catalog;
+import org.apache.doris.catalog.Database;
+import org.apache.doris.catalog.OlapTable;
+import org.apache.doris.catalog.Partition;
+import org.apache.doris.catalog.Table;
+import org.apache.doris.common.AnalysisException;
+import org.apache.doris.common.ErrorCode;
+import org.apache.doris.common.ErrorReport;
+import org.apache.doris.common.util.TimeUtils;
+import org.apache.doris.common.UserException;
+import org.apache.doris.mysql.privilege.PrivPredicate;
+import org.apache.doris.persist.EditLog;
+import org.apache.doris.qe.ConnectContext;
+import org.apache.doris.task.AgentBatchTask;
+import org.apache.doris.task.AgentTaskExecutor;
+import org.apache.doris.task.AgentTaskQueue;
+import org.apache.doris.task.ClearTransactionTask;
+import org.apache.doris.task.PublishVersionTask;
+import org.apache.doris.thrift.TTaskType;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.stream.Collectors;
+
+public class DatabaseTransactionMgr {

Review comment:
       Please add a comment describing this class.
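One possible shape for that class comment, shown on a trimmed, hypothetical model (`FinalTxnQueueSketch`, `Txn`, `keepMs`) of the final-status bookkeeping. Because txns enter the deque in the order they finish, expiry only has to pop from the head, and the id and label indexes are updated in the same step so they stay consistent with the deque. This is an illustration under those assumptions, not the PR's code.

```java
import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/**
 * Manages the transactions of a single database, so that transactions of
 * different databases do not contend on one global lock.
 *
 * Running txns are kept in a map keyed by txn id. Txns that reach a final
 * status are appended to a FIFO deque (and indexed by id), so expired txns can
 * be removed from the head without scanning all txns. A label index maps each
 * label to the txn ids that used it, for duplicate-label checks.
 *
 * (Hypothetical wording; this model covers only the final-status part.)
 */
class FinalTxnQueueSketch {
    // Hypothetical minimal txn record: id, label and the time it finished.
    static final class Txn {
        final long id;
        final String label;
        final long finishTimeMs;

        Txn(long id, String label, long finishTimeMs) {
            this.id = id;
            this.label = label;
            this.finishTimeMs = finishTimeMs;
        }
    }

    private final long keepMs; // stand-in for the configured label retention time
    private final ArrayDeque<Txn> finalStatusDeque = new ArrayDeque<>();
    private final Map<Long, Txn> idToFinalStatus = new HashMap<>();
    private final Map<String, Set<Long>> labelToTxnIds = new HashMap<>();

    FinalTxnQueueSketch(long keepMs) {
        this.keepMs = keepMs;
    }

    // Appends at the tail; callers must add txns in finish-time order.
    void addFinished(Txn txn) {
        finalStatusDeque.addLast(txn);
        idToFinalStatus.put(txn.id, txn);
        labelToTxnIds.computeIfAbsent(txn.label, k -> new HashSet<>()).add(txn.id);
    }

    // Pops expired txns from the head, stopping at the first non-expired one,
    // and keeps the id and label indexes consistent with the deque.
    int removeExpired(long nowMs) {
        int removed = 0;
        while (!finalStatusDeque.isEmpty()
                && nowMs - finalStatusDeque.peekFirst().finishTimeMs > keepMs) {
            Txn txn = finalStatusDeque.pollFirst();
            idToFinalStatus.remove(txn.id);
            Set<Long> ids = labelToTxnIds.get(txn.label);
            ids.remove(txn.id);
            if (ids.isEmpty()) {
                labelToTxnIds.remove(txn.label);
            }
            removed++;
        }
        return removed;
    }

    int size() { return finalStatusDeque.size(); }
    boolean hasId(long id) { return idToFinalStatus.containsKey(id); }
    boolean hasLabel(String label) { return labelToTxnIds.containsKey(label); }
}
```

Each call costs time proportional to the number of expired txns rather than the total number of finished txns; that property relies on txns being appended in finish-time order.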

##########
File path: fe/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java
##########
@@ -216,11 +206,11 @@ public long beginTransaction(long dbId, List<Long> tableIdList, String label, TU
             }
             throw e;
         } finally {
-            writeUnlock();
+            dbTransactionMgr.writeUnlock();
         }
     }
     
-    private void checkRunningTxnExceedLimit(long dbId, LoadJobSourceType sourceType) throws BeginTransactionException {
+    private void checkRunningTxnExceedLimit(DatabaseTransactionMgr dbTransactionMgr, Long dbId, LoadJobSourceType sourceType) throws BeginTransactionException {

Review comment:
       Parameters `dbTransactionMgr` and `dbId` are redundant. I think we can get `dbTransactionMgr` from `dbId`, and we can also get `dbId` from `dbTransactionMgr`.
   So only one of them is needed.
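A sketch of the single-parameter version, assuming each per-db manager records its own `dbId` (the quoted `DatabaseTransactionMgr(EditLog)` constructor does not take one yet). `DbTxnMgrSketch`, `GlobalTxnMgrSketch`, `BeginTxnExceptionSketch` and the constructor-supplied limit are hypothetical, and the source-type handling of the real method is omitted.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical per-db manager that remembers which database it belongs to.
class DbTxnMgrSketch {
    private final long dbId;
    final AtomicInteger runningTxnNums = new AtomicInteger(0);

    DbTxnMgrSketch(long dbId) {
        this.dbId = dbId;
    }

    long getDbId() {
        return dbId;
    }
}

// Hypothetical stand-in for BeginTransactionException.
class BeginTxnExceptionSketch extends Exception {
    BeginTxnExceptionSketch(String msg) {
        super(msg);
    }
}

class GlobalTxnMgrSketch {
    private final ConcurrentHashMap<Long, DbTxnMgrSketch> dbIdToDbTxnMgr = new ConcurrentHashMap<>();
    private final int maxRunningTxnNumPerDb; // per-db limit, assumed to be passed in

    GlobalTxnMgrSketch(int maxRunningTxnNumPerDb) {
        this.maxRunningTxnNumPerDb = maxRunningTxnNumPerDb;
    }

    // Looks up (or lazily creates) the manager for a database.
    DbTxnMgrSketch getDbTxnMgr(long dbId) {
        return dbIdToDbTxnMgr.computeIfAbsent(dbId, id -> new DbTxnMgrSketch(id));
    }

    // Only the manager is passed in; the dbId for the error message comes from it.
    void checkRunningTxnExceedLimit(DbTxnMgrSketch dbTxnMgr) throws BeginTxnExceptionSketch {
        int running = dbTxnMgr.runningTxnNums.get();
        if (running >= maxRunningTxnNumPerDb) {
            throw new BeginTxnExceptionSketch("current running txns on db " + dbTxnMgr.getDbId()
                    + " is " + running + ", larger than limit " + maxRunningTxnNumPerDb);
        }
    }
}
```

Passing only the manager also rules out call sites handing in a mismatched `dbId`/manager pair.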

##########
File path: fe/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java
##########
@@ -500,8 +485,15 @@ public boolean commitAndPublishTransaction(Database db, long transactionId,
         } finally {
             db.writeUnlock();
         }
-        
-        TransactionState transactionState = idToTransactionState.get(transactionId);
+        DatabaseTransactionMgr dbTransactionMgr = getDatabaseTransactioMgr(db.getId());
+        TransactionState transactionState = null;
+        dbTransactionMgr.readLock();

Review comment:
       How about not exposing `dbTransactionMgr`'s lock outside the class?
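A sketch of that encapsulation, using hypothetical names (`EncapsulatedDbTxnMgrSketch`, `beginTransaction`, `finishTransaction`) and plain string states: the lock helpers become private, the `unprotected*` lookup stays private, and each public accessor takes the lock itself.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Hypothetical sketch: callers never lock the per-db manager directly;
// every accessor acquires and releases transactionLock internally.
class EncapsulatedDbTxnMgrSketch {
    private final ReentrantReadWriteLock transactionLock = new ReentrantReadWriteLock(true);
    private final Map<Long, String> idToRunning = new HashMap<>();
    private final Map<Long, String> idToFinal = new HashMap<>();

    private void readLock() { transactionLock.readLock().lock(); }
    private void readUnlock() { transactionLock.readLock().unlock(); }
    private void writeLock() { transactionLock.writeLock().lock(); }
    private void writeUnlock() { transactionLock.writeLock().unlock(); }

    // Caller must already hold the lock; private so it cannot be misused outside.
    private String unprotectedGetTransactionState(long txnId) {
        String state = idToRunning.get(txnId);
        return state != null ? state : idToFinal.get(txnId);
    }

    // Safe to call from outside: takes the read lock itself.
    String getTransactionState(long txnId) {
        readLock();
        try {
            return unprotectedGetTransactionState(txnId);
        } finally {
            readUnlock();
        }
    }

    void beginTransaction(long txnId) {
        writeLock();
        try {
            idToRunning.put(txnId, "PREPARE");
        } finally {
            writeUnlock();
        }
    }

    // Moves a running txn to a final status; returns false if it was not running.
    boolean finishTransaction(long txnId, String finalStatus) {
        writeLock();
        try {
            if (idToRunning.remove(txnId) == null) {
                return false;
            }
            idToFinal.put(txnId, finalStatus);
            return true;
        } finally {
            writeUnlock();
        }
    }

    // For tests: true if no thread currently holds the lock.
    boolean isLockFree() {
        return transactionLock.getReadLockCount() == 0 && !transactionLock.isWriteLocked();
    }
}
```

With this shape, `commitAndPublishTransaction` could call `getTransactionState(transactionId)` directly instead of wrapping the lookup in `dbTransactionMgr.readLock()`/`readUnlock()`.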

##########
File path: fe/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java
##########
@@ -0,0 +1,556 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.transaction;
+
+import com.google.common.base.Joiner;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import org.apache.commons.lang3.tuple.ImmutablePair;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.doris.catalog.Catalog;
+import org.apache.doris.catalog.Database;
+import org.apache.doris.catalog.OlapTable;
+import org.apache.doris.catalog.Partition;
+import org.apache.doris.catalog.Table;
+import org.apache.doris.common.AnalysisException;
+import org.apache.doris.common.ErrorCode;
+import org.apache.doris.common.ErrorReport;
+import org.apache.doris.common.util.TimeUtils;
+import org.apache.doris.common.UserException;
+import org.apache.doris.mysql.privilege.PrivPredicate;
+import org.apache.doris.persist.EditLog;
+import org.apache.doris.qe.ConnectContext;
+import org.apache.doris.task.AgentBatchTask;
+import org.apache.doris.task.AgentTaskExecutor;
+import org.apache.doris.task.AgentTaskQueue;
+import org.apache.doris.task.ClearTransactionTask;
+import org.apache.doris.task.PublishVersionTask;
+import org.apache.doris.thrift.TTaskType;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.stream.Collectors;
+
+public class DatabaseTransactionMgr {
+
+    private static final Logger LOG = LogManager.getLogger(DatabaseTransactionMgr.class);
+
+    // the lock is used to control the access to transaction states
+    // no other locks should be inside this lock
+    private ReentrantReadWriteLock transactionLock = new ReentrantReadWriteLock(true);
+
+    // transactionId -> running TransactionState
+    private Map<Long, TransactionState> idToRunningTransactionState = Maps.newHashMap();
+
+    // transactionId -> final status TransactionState
+    private Map<Long, TransactionState> idToFinalStatusTransactionState = Maps.newHashMap();
+
+
+    // to store transactionStates with final status
+    private ArrayDeque<TransactionState> finalStatusTransactionStateDeque = new ArrayDeque<>();
+
+    // label -> txn ids
+    // this is used for checking if a label is already used. a label may correspond to multiple txns,
+    // but only one of them can succeed.
+    // this member should be consistent with idToRunningTransactionState and idToFinalStatusTransactionState,
+    // which means if a txn exists in idToRunningTransactionState or idToFinalStatusTransactionState,
+    // it must exist in labelToTxnIds, and vice versa
+    private Map<String, Set<Long>> labelToTxnIds = Maps.newConcurrentMap();
+
+
+    // count the number of running txns of database, except for the routine load txn
+    private AtomicInteger runningTxnNums = new AtomicInteger(0);
+
+    // count only the number of running routine load txns of database
+    private AtomicInteger runningRoutineLoadTxnNums = new AtomicInteger(0);
+
+    private EditLog editLog;
+
+    private List<ClearTransactionTask> clearTransactionTasks = Lists.newArrayList();
+
+    protected void readLock() {
+        this.transactionLock.readLock().lock();
+    }
+
+    protected void readUnlock() {
+        this.transactionLock.readLock().unlock();
+    }
+
+    protected void writeLock() {
+        this.transactionLock.writeLock().lock();
+    }
+
+    protected void writeUnlock() {
+        this.transactionLock.writeLock().unlock();
+    }
+
+    public DatabaseTransactionMgr(EditLog editLog) {
+        this.editLog = editLog;
+    }
+
+    public TransactionState getTransactionState(Long transactionId) {
+        TransactionState transactionState = idToRunningTransactionState.get(transactionId);
+        if (transactionState != null) {
+            return transactionState;
+        } else {
+            return idToFinalStatusTransactionState.get(transactionId);
+        }
+    }
+
+    public Set<Long> getTxnIdsByLabel(String label) {
+        return labelToTxnIds.get(label);
+    }
+
+    public int getRunningTxnNums() {
+        return runningTxnNums.get();
+    }
+
+    public int getRunningRoutineLoadTxnNums() {
+        return runningRoutineLoadTxnNums.get();
+    }
+
+    public int getFinishedTxnNums() {
+        return finalStatusTransactionStateDeque.size();
+    }
+
+    public List<List<String>> getTxnStateInfoList(boolean running, int limit) {
+        List<List<String>> infos = Lists.newArrayList();
+        Collection<TransactionState> transactionStateCollection = null;
+        readLock();
+        try {
+            if (running) {
+                transactionStateCollection = idToRunningTransactionState.values();
+            } else {
+                transactionStateCollection = finalStatusTransactionStateDeque;
+            }
+            // get transaction order by txn id desc limit 'limit'
+            transactionStateCollection.stream()
+                    .sorted(TransactionState.TXN_ID_COMPARATOR)
+                    .limit(limit)
+                    .forEach(t -> {
+                        List<String> info = Lists.newArrayList();
+                        getTxnStateInfo(t, info);
+                        infos.add(info);
+                    });
+        } finally {
+            readUnlock();
+        }
+        return infos;
+    }
+
+    private void getTxnStateInfo(TransactionState txnState, List<String> info) {
+        info.add(String.valueOf(txnState.getTransactionId()));
+        info.add(txnState.getLabel());
+        info.add(txnState.getCoordinator().toString());
+        info.add(txnState.getTransactionStatus().name());
+        info.add(txnState.getSourceType().name());
+        info.add(TimeUtils.longToTimeString(txnState.getPrepareTime()));
+        info.add(TimeUtils.longToTimeString(txnState.getCommitTime()));
+        info.add(TimeUtils.longToTimeString(txnState.getFinishTime()));
+        info.add(txnState.getReason());
+        info.add(String.valueOf(txnState.getErrorReplicas().size()));
+        info.add(String.valueOf(txnState.getCallbackId()));
+        info.add(String.valueOf(txnState.getTimeoutMs()));
+    }
+
+    public void deleteTransaction(TransactionState transactionState) {
+        writeLock();
+        try {
+            // here we only delete the oldest element, so if the element exists in finalStatusTransactionStateDeque,
+            // it must be at the front of finalStatusTransactionStateDeque
+            if (!finalStatusTransactionStateDeque.isEmpty() &&
+                    transactionState.getTransactionId() == finalStatusTransactionStateDeque.getFirst().getTransactionId()) {
+                finalStatusTransactionStateDeque.pop();
+                idToFinalStatusTransactionState.remove(transactionState.getTransactionId());
+                Set<Long> txnIds = labelToTxnIds.get(transactionState.getLabel());
+                txnIds.remove(transactionState.getTransactionId());
+                if (txnIds.isEmpty()) {
+                    labelToTxnIds.remove(transactionState.getLabel());
+                }
+            }
+        } finally {
+            writeUnlock();
+        }
+    }
+
+    public Map<Long, TransactionState> getIdToRunningTransactionState() {
+        return idToRunningTransactionState;
+    }
+
+    public ArrayDeque<TransactionState> getFinalStatusTransactionStateDeque() {
+        return finalStatusTransactionStateDeque;
+    }
+
+    protected void  unprotectedCommitTransaction(TransactionState transactionState, Set<Long> errorReplicaIds,
+                                               Map<Long, Set<Long>> tableToPartition, Set<Long> totalInvolvedBackends,
+                                               Database db) {
+        // the transaction state may have been modified while checking whether the transaction could be committed
+        if (transactionState.getTransactionStatus() != TransactionStatus.PREPARE) {
+            return;
+        }
+        // update transaction state version
+        transactionState.setCommitTime(System.currentTimeMillis());
+        transactionState.setTransactionStatus(TransactionStatus.COMMITTED);
+        transactionState.setErrorReplicas(errorReplicaIds);
+        for (long tableId : tableToPartition.keySet()) {
+            TableCommitInfo tableCommitInfo = new TableCommitInfo(tableId);
+            for (long partitionId : tableToPartition.get(tableId)) {
+                OlapTable table = (OlapTable) db.getTable(tableId);
+                Partition partition = table.getPartition(partitionId);
+                PartitionCommitInfo partitionCommitInfo = new PartitionCommitInfo(partitionId,
+                        partition.getNextVersion(),
+                        partition.getNextVersionHash());
+                tableCommitInfo.addPartitionCommitInfo(partitionCommitInfo);
+            }
+            transactionState.putIdToTableCommitInfo(tableId, tableCommitInfo);
+        }
+        // persist transactionState
+        unprotectUpsertTransactionState(transactionState, false);
+
+        // add publish version tasks. set task to null as a placeholder.
+        // tasks will be created when publishing version.
+        for (long backendId : totalInvolvedBackends) {
+            transactionState.addPublishVersionTask(backendId, null);
+        }
+    }
+
+    // for add/update/delete TransactionState
+    protected void unprotectUpsertTransactionState(TransactionState transactionState, boolean isReplay) {
+        // if this is a replay operation, we should not log it
+        if (!isReplay) {
+            if (transactionState.getTransactionStatus() != TransactionStatus.PREPARE
+                    || transactionState.getSourceType() == TransactionState.LoadJobSourceType.FRONTEND) {
+                // if this is a prepare txn and the load source type is not FRONTEND,
+                // there is no need to persist it: if the prepare txn is lost, the following commit will simply fail,
+                // and the user only needs to retry this txn.
+                // FRONTEND type txns are committed and run asynchronously, so we have to persist them.
+                editLog.logInsertTransactionState(transactionState);
+            }
+        }
+
+        if (transactionState.isRunning()) {
+            idToRunningTransactionState.put(transactionState.getTransactionId(), transactionState);
+        } else {
+            idToRunningTransactionState.remove(transactionState.getTransactionId());
+            idToFinalStatusTransactionState.put(transactionState.getTransactionId(), transactionState);
+            finalStatusTransactionStateDeque.add(transactionState);
+        }
+
+        updateTxnLabels(transactionState);
+        updateDbRunningTxnNum(transactionState.getPreStatus(), transactionState);
+    }
+
+    private void updateTxnLabels(TransactionState transactionState) {
+        Set<Long> txnIds = labelToTxnIds.get(transactionState.getLabel());
+        if (txnIds == null) {
+            txnIds = Sets.newHashSet();
+            labelToTxnIds.put(transactionState.getLabel(), txnIds);
+        }
+        txnIds.add(transactionState.getTransactionId());
+    }
+
+    private void updateDbRunningTxnNum(TransactionStatus preStatus, TransactionState curTxnState) {
+        AtomicInteger txnNum = null;
+        if (curTxnState.getSourceType() == TransactionState.LoadJobSourceType.ROUTINE_LOAD_TASK) {
+            txnNum = runningRoutineLoadTxnNums;
+        } else {
+            txnNum = runningTxnNums;
+        }
+
+        if (preStatus == null
+                && (curTxnState.getTransactionStatus() == TransactionStatus.PREPARE
+                || curTxnState.getTransactionStatus() == TransactionStatus.COMMITTED)) {
+            txnNum.incrementAndGet();
+        } else if ((preStatus == TransactionStatus.PREPARE
+                || preStatus == TransactionStatus.COMMITTED)
+                && (curTxnState.getTransactionStatus() == TransactionStatus.VISIBLE
+                || curTxnState.getTransactionStatus() == TransactionStatus.ABORTED)) {
+            txnNum.decrementAndGet();
+        }
+    }
+
+    public void abortTransaction(long transactionId, String reason, TxnCommitAttachment txnCommitAttachment) throws UserException {
+        if (transactionId < 0) {
+            LOG.info("transaction id is {}, less than 0, maybe this is an old type load job, ignore abort operation", transactionId);
+            return;
+        }
+        TransactionState transactionState = idToRunningTransactionState.get(transactionId);
+        if (transactionState == null) {
+            throw new UserException("transaction not found");
+        }
+
+        // update transaction state extra if exists
+        if (txnCommitAttachment != null) {
+            transactionState.setTxnCommitAttachment(txnCommitAttachment);
+        }
+
+        // before state transform
+        transactionState.beforeStateTransform(TransactionStatus.ABORTED);
+        boolean txnOperated = false;
+        writeLock();
+        try {
+            txnOperated = unprotectAbortTransaction(transactionId, reason);
+        } finally {
+            writeUnlock();
+            transactionState.afterStateTransform(TransactionStatus.ABORTED, txnOperated, reason);
+        }
+
+        // send clear txn tasks to BEs to clear the transaction on BE.
+        // This is because parts of a txn may succeed on some BEs, and these parts of the txn should be cleared
+        // explicitly, or they will remain on BE forever
+        // (the report process also diffs txns and sends clear txn tasks to BE, but that is our
+        // last line of defense)
+        if (txnOperated && transactionState.getTransactionStatus() == TransactionStatus.ABORTED) {
+            clearBackendTransactions(transactionState);
+        }
+    }
+
+    private boolean unprotectAbortTransaction(long transactionId, String reason)
+            throws UserException {
+        TransactionState transactionState = getTransactionState(transactionId);
+        if (transactionState == null) {
+            throw new UserException("transaction not found");
+        }
+        if (transactionState.getTransactionStatus() == TransactionStatus.ABORTED) {
+            return false;
+        }
+        if (transactionState.getTransactionStatus() == TransactionStatus.COMMITTED
+                || transactionState.getTransactionStatus() == TransactionStatus.VISIBLE) {
+            throw new UserException("transaction's state is already "
+                    + transactionState.getTransactionStatus() + ", could not abort");
+        }
+        transactionState.setFinishTime(System.currentTimeMillis());
+        transactionState.setReason(reason);
+        transactionState.setTransactionStatus(TransactionStatus.ABORTED);
+        unprotectUpsertTransactionState(transactionState, false);
+        for (PublishVersionTask task : transactionState.getPublishVersionTasks().values()) {
+            AgentTaskQueue.removeTask(task.getBackendId(), TTaskType.PUBLISH_VERSION, task.getSignature());
+        }
+        return true;
+    }
+
+    private void clearBackendTransactions(TransactionState transactionState) {
+        Preconditions.checkState(transactionState.getTransactionStatus() == TransactionStatus.ABORTED);
+        // for aborted transaction, we don't know which backends are involved, so we have to send clear task
+        // to all backends.
+        List<Long> allBeIds = Catalog.getCurrentSystemInfo().getBackendIds(false);
+        AgentBatchTask batchTask = null;
+        synchronized (clearTransactionTasks) {
+            for (Long beId : allBeIds) {
+                ClearTransactionTask task = new ClearTransactionTask(beId, transactionState.getTransactionId(), Lists.newArrayList());
+                clearTransactionTasks.add(task);
+            }
+
+            // group tasks instead of sending them every time a txn is aborted, to avoid too many task RPCs.
+            if (clearTransactionTasks.size() > allBeIds.size() * 2) {
+                batchTask = new AgentBatchTask();
+                for (ClearTransactionTask clearTransactionTask : clearTransactionTasks) {
+                    batchTask.addTask(clearTransactionTask);
+                }
+                clearTransactionTasks.clear();
+            }
+        }
+
+        if (batchTask != null) {
+            AgentTaskExecutor.submit(batchTask);
+        }
+    }
+
+
+    protected List<List<Comparable>> getTableTransInfo(long txnId) throws AnalysisException {
+        List<List<Comparable>> tableInfos = new ArrayList<>();
+        readLock();
+        try {
+            TransactionState transactionState = getTransactionState(txnId);
+            if (null == transactionState) {
+                throw new AnalysisException("Transaction[" + txnId + "] does not exist.");
+            }
+
+            for (Map.Entry<Long, TableCommitInfo> entry : transactionState.getIdToTableCommitInfos().entrySet()) {
+                List<Comparable> tableInfo = new ArrayList<>();
+                tableInfo.add(entry.getKey());
+                tableInfo.add(Joiner.on(", ").join(entry.getValue().getIdToPartitionCommitInfo().values().stream().map(
+                        PartitionCommitInfo::getPartitionId).collect(Collectors.toList())));
+                tableInfos.add(tableInfo);
+            }
+        } finally {
+            readUnlock();
+        }
+        return tableInfos;
+    }
+
+    protected List<List<Comparable>> getPartitionTransInfo(long txnId, long tableId) throws AnalysisException {
+        List<List<Comparable>> partitionInfos = new ArrayList<List<Comparable>>();
+        readLock();
+        try {
+            TransactionState transactionState = getTransactionState(txnId);
+            if (null == transactionState) {
+                throw new AnalysisException("Transaction[" + txnId + "] does not exist.");
+            }
+
+            TableCommitInfo tableCommitInfo = transactionState.getIdToTableCommitInfos().get(tableId);
+            Map<Long, PartitionCommitInfo> idToPartitionCommitInfo = tableCommitInfo.getIdToPartitionCommitInfo();
+            for (Map.Entry<Long, PartitionCommitInfo> entry : idToPartitionCommitInfo.entrySet()) {
+                List<Comparable> partitionInfo = new ArrayList<Comparable>();
+                partitionInfo.add(entry.getKey());
+                partitionInfo.add(entry.getValue().getVersion());
+                partitionInfo.add(entry.getValue().getVersionHash());
+                partitionInfos.add(partitionInfo);
+            }
+        } finally {
+            readUnlock();
+        }
+        return partitionInfos;
+    }
+
+    public void removeExpiredTxns() {
+        long currentMillis = System.currentTimeMillis();
+        writeLock();
+        try {
+            while (!finalStatusTransactionStateDeque.isEmpty()) {
+                TransactionState transactionState = finalStatusTransactionStateDeque.getFirst();
+                if (transactionState.isExpired(currentMillis)) {
+                    finalStatusTransactionStateDeque.pop();
+                    Set<Long> txnIds = labelToTxnIds.get(transactionState.getLabel());
+                    txnIds.remove(transactionState.getTransactionId());
+                    if (txnIds.isEmpty()) {
+                        labelToTxnIds.remove(transactionState.getLabel());
+                    }
+                    editLog.logDeleteTransactionState(transactionState);
+                    LOG.info("transaction [" + transactionState.getTransactionId() + "] is expired, remove it from transaction manager");
+                } else {
+                    break;
+                }
+
+            }
+        } finally {
+            writeUnlock();
+        }
+    }
+
+    public int getTransactionNum() {
+        return idToRunningTransactionState.size() + finalStatusTransactionStateDeque.size();

Review comment:
       `finalStatusTransactionStateDeque` is not thread-safe, so is it safe to call this without holding the read lock?
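On the question above: the `ArrayDeque` Javadoc states that array deques are not thread-safe and do not support concurrent access by multiple threads without external synchronization. Summing two sizes without a lock has a second problem as well: a transaction moving from the running map to the final-status deque (as `unprotectUpsertTransactionState` does under the write lock) can be counted twice or not at all by an unlocked reader. A minimal sketch of a read-locked `getTransactionNum()` follows, assuming the `readLock()`/`writeLock()` helpers in the diff wrap a `ReentrantReadWriteLock` (their implementation is not part of the quoted hunk). `LockedTxnCounts`, `addRunning` and `finish` are hypothetical names, and transaction states are reduced to their ids.

```java
import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Hypothetical, simplified stand-in for the per-database transaction manager.
// Transaction states are reduced to their ids.
class LockedTxnCounts {
    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(true);
    private final Map<Long, Long> idToRunning = new HashMap<>();
    private final ArrayDeque<Long> finalStatusDeque = new ArrayDeque<>();

    // Writers mutate the collections only while holding the write lock.
    void addRunning(long txnId) {
        lock.writeLock().lock();
        try {
            idToRunning.put(txnId, txnId);
        } finally {
            lock.writeLock().unlock();
        }
    }

    // Moves a txn from running to final status atomically with respect to readers.
    void finish(long txnId) {
        lock.writeLock().lock();
        try {
            if (idToRunning.remove(txnId) != null) {
                finalStatusDeque.addLast(txnId);
            }
        } finally {
            lock.writeLock().unlock();
        }
    }

    // Reading both sizes under the read lock gives a consistent snapshot: a txn
    // in the middle of finish() is never seen in both collections or in neither,
    // and the unsynchronized HashMap/ArrayDeque are never read during a write.
    int getTransactionNum() {
        lock.readLock().lock();
        try {
            return idToRunning.size() + finalStatusDeque.size();
        } finally {
            lock.readLock().unlock();
        }
    }
}
```

Since the method only reads, the read lock still lets concurrent callers proceed in parallel while excluding the writers.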

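The head-only scan in `removeExpiredTxns` works because final-status transactions are appended to the deque in the order they finish, so the oldest one is always at the front. Note that the quoted `deleteTransaction` also removes the entry from `idToFinalStatusTransactionState`, while the quoted `removeExpiredTxns` loop only updates the deque and `labelToTxnIds`. The sketch below keeps all three indexes in step. It assumes a single expiry window measured from finish time (the real `TransactionState.isExpired` is not shown in this hunk); `FinalTxnStore`, `Txn` and `expireMs` are hypothetical.

```java
import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical, simplified final-status store. Entries are appended in the
// order they reach a final status, so the oldest one is always at the head.
class FinalTxnStore {
    static final class Txn {
        final long id;
        final String label;
        final long finishTimeMs;

        Txn(long id, String label, long finishTimeMs) {
            this.id = id;
            this.label = label;
            this.finishTimeMs = finishTimeMs;
        }
    }

    private final ArrayDeque<Txn> deque = new ArrayDeque<>();
    private final Map<Long, Txn> idToTxn = new HashMap<>();
    private final Map<String, Set<Long>> labelToTxnIds = new HashMap<>();
    private final long expireMs; // assumed single expiry window for all txns

    FinalTxnStore(long expireMs) {
        this.expireMs = expireMs;
    }

    void addFinal(Txn txn) {
        deque.addLast(txn);
        idToTxn.put(txn.id, txn);
        labelToTxnIds.computeIfAbsent(txn.label, k -> new HashSet<>()).add(txn.id);
    }

    // Pops expired entries from the head and stops at the first unexpired one,
    // removing each popped txn from the id index and the label index too.
    int removeExpired(long nowMs) {
        int removed = 0;
        while (!deque.isEmpty() && nowMs - deque.peekFirst().finishTimeMs > expireMs) {
            Txn txn = deque.pollFirst();
            idToTxn.remove(txn.id);
            Set<Long> ids = labelToTxnIds.get(txn.label);
            if (ids != null) {
                ids.remove(txn.id);
                if (ids.isEmpty()) {
                    labelToTxnIds.remove(txn.label);
                }
            }
            removed++;
        }
        return removed;
    }

    boolean contains(long id) { return idToTxn.containsKey(id); }

    boolean hasLabel(String label) { return labelToTxnIds.containsKey(label); }

    int size() { return deque.size(); }
}
```

If different transactions could have different expiry windows, a head-only scan would leave an expired entry in place until every entry ahead of it has also expired.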

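`updateDbRunningTxnNum` only changes a counter on two transitions: a new transaction (no previous status) entering PREPARE or COMMITTED, and a PREPARE or COMMITTED transaction reaching VISIBLE or ABORTED. The pure function below restates that rule so the transitions can be checked in isolation. `RunningTxnCounter`, `Status` and `delta` are hypothetical, and the choice between `runningRoutineLoadTxnNums` and `runningTxnNums` by source type is left out.

```java
// Hypothetical restatement of the counter rule in updateDbRunningTxnNum.
final class RunningTxnCounter {
    // Stand-in for TransactionStatus, limited to the states the rule uses.
    enum Status { PREPARE, COMMITTED, VISIBLE, ABORTED }

    private RunningTxnCounter() {}

    private static boolean isRunning(Status s) {
        return s == Status.PREPARE || s == Status.COMMITTED;
    }

    private static boolean isFinal(Status s) {
        return s == Status.VISIBLE || s == Status.ABORTED;
    }

    // Change to apply to the running-txn counter when a txn moves from
    // preStatus (null for a brand-new txn) to curStatus.
    static int delta(Status preStatus, Status curStatus) {
        if (preStatus == null && isRunning(curStatus)) {
            return 1;   // a new txn starts running
        }
        if (isRunning(preStatus) && isFinal(curStatus)) {
            return -1;  // a running txn reaches a final status
        }
        return 0;       // e.g. PREPARE -> COMMITTED: still running
    }
}
```

In the manager, the result would be applied to the selected `AtomicInteger` with `addAndGet(delta)`; a PREPARE to COMMITTED transition leaves the count unchanged because the transaction is still running.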

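`clearBackendTransactions` queues one `ClearTransactionTask` per backend for every aborted transaction and only submits a batch once the queue holds more than twice as many tasks as there are backends. With a stable set of N backends, each abort adds N tasks, so a batch goes out on every third abort. The sketch isolates that grouping rule. `ClearTaskBatcher` and its `Task` type are hypothetical stand-ins for the agent task classes; as in the diff, the batch is copied out while holding the monitor and sent by the caller after it is released.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical batcher mirroring the grouping rule in clearBackendTransactions.
class ClearTaskBatcher {
    static final class Task {
        final long backendId;
        final long txnId;

        Task(long backendId, long txnId) {
            this.backendId = backendId;
            this.txnId = txnId;
        }
    }

    private final List<Task> pending = new ArrayList<>();

    // Queues one task per backend for the aborted txn. Returns the tasks the
    // caller should send now (empty if the threshold is not yet exceeded).
    List<Task> onAbort(long txnId, List<Long> backendIds) {
        synchronized (pending) {
            for (long beId : backendIds) {
                pending.add(new Task(beId, txnId));
            }
            if (pending.size() > backendIds.size() * 2) {
                List<Task> batch = new ArrayList<>(pending);
                pending.clear();
                return batch;
            }
            return new ArrayList<>();
        }
    }
}
```

Returning the batch instead of sending it inside the `synchronized` block keeps RPC latency out of the critical section, which is the same reason the diff calls `AgentTaskExecutor.submit` after leaving it.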

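The edit-log condition in `unprotectUpsertTransactionState` reduces to a small predicate: replayed states are never logged again, and a PREPARE state is logged only for FRONTEND loads, which according to the diff's comment are committed and run asynchronously. Every other status is always logged. The enums below are hypothetical stand-ins for `TransactionStatus` and `LoadJobSourceType`; `OTHER` represents any non-FRONTEND source type.

```java
// Hypothetical restatement of the edit-log rule in unprotectUpsertTransactionState.
final class EditLogRule {
    enum Status { PREPARE, COMMITTED, VISIBLE, ABORTED }

    // Only FRONTEND matters to the rule; OTHER stands for every other source type.
    enum Source { FRONTEND, OTHER }

    private EditLogRule() {}

    static boolean shouldLog(Status status, Source source, boolean isReplay) {
        if (isReplay) {
            return false; // replayed entries are already in the edit log
        }
        // A lost non-FRONTEND PREPARE txn only makes the later commit fail,
        // and the user can retry, so it is not persisted.
        return status != Status.PREPARE || source == Source.FRONTEND;
    }
}
```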