You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by mo...@apache.org on 2021/04/23 01:47:40 UTC

[incubator-doris] branch master updated: [Optimize] Remove expired txns in batch to avoid holding lock for too long (#5675)

This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git


The following commit(s) were added to refs/heads/master by this push:
     new b93e841  [Optimize] Remove expired txns in batch to avoid holding lock for too long (#5675)
b93e841 is described below

commit b93e8416885ac440e9c294c4bb2089e33bde788b
Author: Mingyu Chen <mo...@gmail.com>
AuthorDate: Fri Apr 23 09:47:30 2021 +0800

    [Optimize] Remove expired txns in batch to avoid holding lock for too long (#5675)
    
    This CL mainly changes:
    
    1. Add a config to control the expiration time of load jobs
    
        Add a new FE config "streaming_label_keep_max_second" to control
        the expiration time of some high-frequency load jobs, such as INSERT and STREAM LOAD.
    
    2. Remove expired txns in batches to avoid holding the transaction lock for a long time
---
 be/src/olap/delta_writer.cpp                       |   2 +-
 be/src/runtime/mysql_result_writer.cpp             |   2 +-
 .../java/org/apache/doris/catalog/Catalog.java     |  17 +--
 .../main/java/org/apache/doris/common/Config.java  |   6 +
 .../org/apache/doris/journal/JournalEntity.java    |  10 +-
 .../src/main/java/org/apache/doris/load/Load.java  |   3 +-
 .../main/java/org/apache/doris/load/LoadJob.java   |   6 +-
 .../java/org/apache/doris/load/loadv2/LoadJob.java |  14 ++
 .../org/apache/doris/load/loadv2/LoadManager.java  |  20 +--
 .../persist/BatchRemoveTransactionsOperation.java  |  57 ++++++++
 .../java/org/apache/doris/persist/EditLog.java     |  15 ++-
 .../org/apache/doris/persist/OperationType.java    |   3 +
 .../doris/transaction/DatabaseTransactionMgr.java  | 145 +++++++++++++++------
 .../doris/transaction/GlobalTransactionMgr.java    |  24 +++-
 .../apache/doris/transaction/TransactionState.java |  16 ++-
 .../java/org/apache/doris/catalog/FakeEditLog.java |   6 +
 .../apache/doris/load/loadv2/LoadManagerTest.java  |   5 +-
 .../BatchRemoveTransactionOperationTest.java       |  71 ++++++++++
 .../transaction/DatabaseTransactionMgrTest.java    |   7 +-
 19 files changed, 341 insertions(+), 88 deletions(-)

diff --git a/be/src/olap/delta_writer.cpp b/be/src/olap/delta_writer.cpp
index 4ab2fb5..692b136 100644
--- a/be/src/olap/delta_writer.cpp
+++ b/be/src/olap/delta_writer.cpp
@@ -108,7 +108,7 @@ OLAPStatus DeltaWriter::init() {
         LOG(WARNING) << "failed to init delta writer. version count: " << _tablet->version_count()
                      << ", exceed limit: " << config::max_tablet_version_num
                      << ". tablet: " << _tablet->full_name();
-        return OLAP_ERR_TABLE_NOT_FOUND;
+        return OLAP_ERR_TOO_MANY_VERSION;
     }
 
     {
diff --git a/be/src/runtime/mysql_result_writer.cpp b/be/src/runtime/mysql_result_writer.cpp
index d56e8ac..b593ed6 100644
--- a/be/src/runtime/mysql_result_writer.cpp
+++ b/be/src/runtime/mysql_result_writer.cpp
@@ -61,7 +61,7 @@ Status MysqlResultWriter::init(RuntimeState* state) {
 void MysqlResultWriter::_init_profile() {
     _append_row_batch_timer = ADD_TIMER(_parent_profile, "AppendBatchTime");
     _convert_tuple_timer = ADD_CHILD_TIMER(_parent_profile, "TupleConvertTime", "AppendBatchTime");
-    _result_send_timer = ADD_CHILD_TIMER(_parent_profile, "ResultRendTime", "AppendBatchTime");
+    _result_send_timer = ADD_CHILD_TIMER(_parent_profile, "ResultSendTime", "AppendBatchTime");
     _sent_rows_counter = ADD_COUNTER(_parent_profile, "NumSentRows", TUnit::UNIT);
 }
 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Catalog.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Catalog.java
index 2e0541f..16f24eb 100755
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Catalog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Catalog.java
@@ -148,7 +148,6 @@ import org.apache.doris.load.Load;
 import org.apache.doris.load.LoadChecker;
 import org.apache.doris.load.LoadErrorHub;
 import org.apache.doris.load.LoadJob;
-import org.apache.doris.load.LoadJob.JobState;
 import org.apache.doris.load.loadv2.LoadEtlChecker;
 import org.apache.doris.load.loadv2.LoadJobScheduler;
 import org.apache.doris.load.loadv2.LoadLoadingChecker;
@@ -228,10 +227,14 @@ import com.google.common.collect.Multimap;
 import com.google.common.collect.Queues;
 import com.google.common.collect.Range;
 import com.google.common.collect.Sets;
+import com.sleepycat.je.rep.InsufficientLogException;
+import com.sleepycat.je.rep.NetworkRestore;
+import com.sleepycat.je.rep.NetworkRestoreConfig;
 
 import org.apache.commons.collections.CollectionUtils;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
+import org.codehaus.jackson.map.ObjectMapper;
 
 import java.io.BufferedInputStream;
 import java.io.BufferedReader;
@@ -261,11 +264,6 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 
-import com.sleepycat.je.rep.InsufficientLogException;
-import com.sleepycat.je.rep.NetworkRestore;
-import com.sleepycat.je.rep.NetworkRestoreConfig;
-import org.codehaus.jackson.map.ObjectMapper;
-
 public class Catalog {
     private static final Logger LOG = LogManager.getLogger(Catalog.class);
     // 0 ~ 9999 used for qe
@@ -1643,16 +1641,15 @@ public class Catalog {
 
             int loadJobCount = dis.readInt();
             newChecksum ^= loadJobCount;
+            long currentTimeMs = System.currentTimeMillis();
             for (int j = 0; j < loadJobCount; j++) {
                 LoadJob job = new LoadJob();
                 job.readFields(dis);
-                long currentTimeMs = System.currentTimeMillis();
 
                 // Delete the history load jobs that are older than
                 // LABEL_KEEP_MAX_MS
                 // This job must be FINISHED or CANCELLED
-                if ((currentTimeMs - job.getCreateTimeMs()) / 1000 <= Config.label_keep_max_second
-                        || (job.getState() != JobState.FINISHED && job.getState() != JobState.CANCELLED)) {
+                if (!job.isExpired(currentTimeMs)) {
                     load.unprotectAddLoadJob(job, true /* replay */);
                 }
             }
@@ -2212,7 +2209,7 @@ public class Catalog {
     }
 
     public void createTxnCleaner() {
-        txnCleaner = new MasterDaemon("txnCleaner", Config.transaction_clean_interval_second) {
+        txnCleaner = new MasterDaemon("txnCleaner", Config.transaction_clean_interval_second * 1000L) {
             @Override
             protected void runAfterCatalogReady() {
                 globalTransactionMgr.removeExpiredAndTimeoutTxns();
diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/Config.java b/fe/fe-core/src/main/java/org/apache/doris/common/Config.java
index ff28887..4a14c20 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/Config.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/Config.java
@@ -127,6 +127,12 @@ public class Config extends ConfigBase {
      */
     @ConfField(mutable = true, masterOnly = true)
     public static int label_keep_max_second = 3 * 24 * 3600; // 3 days
+
+    // Max keep time for some high-frequency load jobs, such as
+    // INSERT, STREAM LOAD and ROUTINE_LOAD_TASK.
+    // Finished jobs or tasks are removed once they have expired.
+    @ConfField(mutable = true, masterOnly = true)
+    public static int streaming_label_keep_max_second = 43200; // 12 hour
   
     /**
      * The max keep time of some kind of jobs.
diff --git a/fe/fe-core/src/main/java/org/apache/doris/journal/JournalEntity.java b/fe/fe-core/src/main/java/org/apache/doris/journal/JournalEntity.java
index 89547b6..9399ced 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/journal/JournalEntity.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/journal/JournalEntity.java
@@ -52,6 +52,7 @@ import org.apache.doris.persist.BackendIdsUpdateInfo;
 import org.apache.doris.persist.BackendTabletsInfo;
 import org.apache.doris.persist.BatchDropInfo;
 import org.apache.doris.persist.BatchModifyPartitionsInfo;
+import org.apache.doris.persist.BatchRemoveTransactionsOperation;
 import org.apache.doris.persist.ClusterInfo;
 import org.apache.doris.persist.ColocatePersistInfo;
 import org.apache.doris.persist.ConsistencyCheckInfo;
@@ -86,11 +87,11 @@ import org.apache.doris.system.Backend;
 import org.apache.doris.system.Frontend;
 import org.apache.doris.transaction.TransactionState;
 
+import com.google.common.base.Preconditions;
+
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
-import com.google.common.base.Preconditions;
-
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
@@ -431,6 +432,11 @@ public class JournalEntity implements Writable {
                 isRead = true;
                 break;
             }
+            case OperationType.OP_BATCH_REMOVE_TXNS: {
+                data = BatchRemoveTransactionsOperation.read(in);
+                isRead = true;
+                break;
+            }
             case OperationType.OP_CREATE_REPOSITORY: {
                 data = Repository.read(in);
                 isRead = true;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/Load.java b/fe/fe-core/src/main/java/org/apache/doris/load/Load.java
index 126fae5..aefe73e 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/Load.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/Load.java
@@ -2608,8 +2608,7 @@ public class Load {
             while (iter.hasNext()) {
                 Map.Entry<Long, LoadJob> entry = iter.next();
                 LoadJob job = entry.getValue();
-                if ((currentTimeMs - job.getCreateTimeMs()) / 1000 > Config.label_keep_max_second
-                        && (job.getState() == JobState.FINISHED || job.getState() == JobState.CANCELLED)) {
+                if (job.isExpired(currentTimeMs)) {
                     long dbId = job.getDbId();
                     String label = job.getLabel();
                     // Remove job from idToLoadJob
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/LoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/LoadJob.java
index 89a81dc..9ba1aeb 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/LoadJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/LoadJob.java
@@ -1012,5 +1012,9 @@ public class LoadJob implements Writable {
         return false;
     }
 
-
+    // Returns true if this job is FINISHED or CANCELLED and finished more than Config.label_keep_max_second ago
+    public boolean isExpired(long currentTimeMs) {
+        return (getState() == JobState.FINISHED || getState() == JobState.CANCELLED)
+            && (currentTimeMs - getLoadFinishTimeMs()) / 1000 > Config.label_keep_max_second;
+    }
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadJob.java
index 75bc2f0..85a1a46 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadJob.java
@@ -1191,4 +1191,18 @@ public abstract class LoadJob extends AbstractTxnStateChangeCallback implements
     public int getLoadParallelism() {
         return (int) jobProperties.get(LoadStmt.LOAD_PARALLELISM);
     }
+
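+    // Returns true if this job is completed and finished longer ago than its keep time (streaming_label_keep_max_second for INSERT, else label_keep_max_second)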
+    public boolean isExpired(long currentTimeMs) {
+        if (!isCompleted()) {
+            return false;
+        }
+        long expireTime = Config.label_keep_max_second;
+        if (jobType == EtlJobType.INSERT) {
+            expireTime = Config.streaming_label_keep_max_second;
+        }
+
+        return (currentTimeMs - getFinishTimestamp()) / 1000 > expireTime;
+    }
+
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadManager.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadManager.java
index 0554952..14c5885 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadManager.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadManager.java
@@ -439,8 +439,7 @@ public class LoadManager implements Writable{
             Iterator<Map.Entry<Long, LoadJob>> iter = idToLoadJob.entrySet().iterator();
             while (iter.hasNext()) {
                 LoadJob job = iter.next().getValue();
-                if (job.isCompleted()
-                        && ((currentTimeMs - job.getFinishTimestamp()) / 1000 > Config.label_keep_max_second)) {
+                if (job.isExpired(currentTimeMs)) {
                     iter.remove();
                     dbIdToLabelToLoadJobs.get(job.getDbId()).get(job.getLabel()).remove(job);
                     if (job instanceof SparkLoadJob) {
@@ -676,20 +675,6 @@ public class LoadManager implements Writable{
         lock.writeLock().unlock();
     }
 
-    // If load job will be removed by cleaner later, it will not be saved in image.
-    private boolean needSave(LoadJob loadJob) {
-        if (!loadJob.isCompleted()) {
-            return true;
-        }
-
-        long currentTimeMs = System.currentTimeMillis();
-        if (loadJob.isCompleted() && ((currentTimeMs - loadJob.getFinishTimestamp()) / 1000 <= Config.label_keep_max_second)) {
-            return true;
-        }
-
-        return false;
-    }
-
     public void initJobProgress(Long jobId, TUniqueId loadId, Set<TUniqueId> fragmentIds,
             List<Long> relatedBackendIds) {
         LoadJob job = idToLoadJob.get(jobId);
@@ -804,7 +789,8 @@ public class LoadManager implements Writable{
 
     @Override
     public void write(DataOutput out) throws IOException {
-        List<LoadJob> loadJobs = idToLoadJob.values().stream().filter(this::needSave).collect(Collectors.toList());
+        long currentTimeMs = System.currentTimeMillis();
+        List<LoadJob> loadJobs = idToLoadJob.values().stream().filter(t -> !t.isExpired(currentTimeMs)).collect(Collectors.toList());
 
         out.writeInt(loadJobs.size());
         for (LoadJob loadJob : loadJobs) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/persist/BatchRemoveTransactionsOperation.java b/fe/fe-core/src/main/java/org/apache/doris/persist/BatchRemoveTransactionsOperation.java
new file mode 100644
index 0000000..5b8f5c6
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/persist/BatchRemoveTransactionsOperation.java
@@ -0,0 +1,57 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.persist;
+
+import org.apache.doris.common.io.Text;
+import org.apache.doris.common.io.Writable;
+import org.apache.doris.persist.gson.GsonUtils;
+
+import com.google.gson.annotations.SerializedName;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+
+// Persists the info needed to replay the removal of a batch of expired txns
+public class BatchRemoveTransactionsOperation implements Writable {
+
+    @SerializedName(value = "dbTxnIds")
+    // dbId -> ids of the txns removed from that db
+    private Map<Long, List<Long>> dbTxnIds;
+
+    public BatchRemoveTransactionsOperation(Map<Long, List<Long>> dbTxnIds) {
+        this.dbTxnIds = dbTxnIds;
+    }
+
+    public Map<Long, List<Long>> getDbTxnIds() {
+        return dbTxnIds;
+    }
+
+    @Override
+    public void write(DataOutput out) throws IOException {
+        String json = GsonUtils.GSON.toJson(this);
+        Text.writeString(out, json);
+    }
+
+    public static BatchRemoveTransactionsOperation read(DataInput in) throws IOException {
+        String json = Text.readString(in);
+        return GsonUtils.GSON.fromJson(json, BatchRemoveTransactionsOperation.class);
+    }
+}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java b/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java
index 42a240f..4c2718e 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java
@@ -596,6 +596,11 @@ public class EditLog {
                     LOG.debug("opcode: {}, tid: {}", opCode, state.getTransactionId());
                     break;
                 }
+                case OperationType.OP_BATCH_REMOVE_TXNS: {
+                    final BatchRemoveTransactionsOperation operation = (BatchRemoveTransactionsOperation) journal.getData();
+                    Catalog.getCurrentGlobalTransactionMgr().replayBatchRemoveTransactions(operation);
+                    break;
+                }
                 case OperationType.OP_CREATE_REPOSITORY: {
                     Repository repository = (Repository) journal.getData();
                     catalog.getBackupHandler().getRepoMgr().addAndInitRepoIfNotExist(repository, true);
@@ -1205,11 +1210,7 @@ public class EditLog {
     public void logInsertTransactionState(TransactionState transactionState) {
         logEdit(OperationType.OP_UPSERT_TRANSACTION_STATE, transactionState);
     }
-
-    public void logDeleteTransactionState(TransactionState transactionState) {
-        logEdit(OperationType.OP_DELETE_TRANSACTION_STATE, transactionState);
-    }
-
+    
     public void logBackupJob(BackupJob job) {
         logEdit(OperationType.OP_BACKUP_JOB, job);
     }
@@ -1373,4 +1374,8 @@ public class EditLog {
     public void logReplaceTable(ReplaceTableOperationLog log) {
         logEdit(OperationType.OP_REPLACE_TABLE, log);
     }
+
+    public void logBatchRemoveTransactions(BatchRemoveTransactionsOperation op) {
+        logEdit(OperationType.OP_BATCH_REMOVE_TXNS, op);
+    }
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/persist/OperationType.java b/fe/fe-core/src/main/java/org/apache/doris/persist/OperationType.java
index fa18b98..1685505 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/persist/OperationType.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/persist/OperationType.java
@@ -140,10 +140,13 @@ public class OperationType {
 
     //real time load 100 -108
     public static final short OP_UPSERT_TRANSACTION_STATE = 100;
+    @Deprecated
+    // use OP_BATCH_REMOVE_TXNS instead
     public static final short OP_DELETE_TRANSACTION_STATE = 101;
     public static final short OP_FINISHING_ROLLUP = 102;
     public static final short OP_FINISHING_SCHEMA_CHANGE = 103;
     public static final short OP_SAVE_TRANSACTION_ID = 104;
+    public static final short OP_BATCH_REMOVE_TXNS = 105;
 
     // routine load 110~120
     public static final short OP_ROUTINE_LOAD_JOB = 110;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java b/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java
index cda888d..455226b 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java
@@ -45,6 +45,7 @@ import org.apache.doris.common.util.TimeUtils;
 import org.apache.doris.common.util.Util;
 import org.apache.doris.metric.MetricRepo;
 import org.apache.doris.mysql.privilege.PrivPredicate;
+import org.apache.doris.persist.BatchRemoveTransactionsOperation;
 import org.apache.doris.persist.EditLog;
 import org.apache.doris.qe.ConnectContext;
 import org.apache.doris.task.AgentBatchTask;
@@ -55,10 +56,6 @@ import org.apache.doris.task.PublishVersionTask;
 import org.apache.doris.thrift.TTaskType;
 import org.apache.doris.thrift.TUniqueId;
 
-import org.apache.commons.collections.CollectionUtils;
-import org.apache.logging.log4j.LogManager;
-import org.apache.logging.log4j.Logger;
-
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Joiner;
 import com.google.common.base.Preconditions;
@@ -66,6 +63,10 @@ import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
 
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
 import java.io.DataOutput;
 import java.io.IOException;
 import java.util.ArrayDeque;
@@ -93,6 +94,9 @@ import java.util.stream.Collectors;
 public class DatabaseTransactionMgr {
 
     private static final Logger LOG = LogManager.getLogger(DatabaseTransactionMgr.class);
+    // The max number of txns that can be removed per round.
+    // This limit avoids holding the lock for too long when there are many expired txns to remove.
+    private static final int MAX_REMOVE_TXN_PER_ROUND = 10000;
 
     private long dbId;
 
@@ -106,9 +110,12 @@ public class DatabaseTransactionMgr {
     // transactionId -> final status TransactionState
     private Map<Long, TransactionState> idToFinalStatusTransactionState = Maps.newHashMap();
 
-
-    // to store transactionStates with final status
-    private ArrayDeque<TransactionState> finalStatusTransactionStateDeque = new ArrayDeque<>();
+    // The following 2 queues store the transactionStates that have reached a final status.
+    // They let the cleaner find expired txns quickly, without traversing all txns.
+    // The "Short" queue stores txns whose expiration time is controlled by Config.streaming_label_keep_max_second.
+    // The "Long" queue stores txns whose expiration time is controlled by Config.label_keep_max_second.
+    private ArrayDeque<TransactionState> finalStatusTransactionStateDequeShort = new ArrayDeque<>();
+    private ArrayDeque<TransactionState> finalStatusTransactionStateDequeLong = new ArrayDeque<>();
 
     // label -> txn ids
     // this is used for checking if label already used. a label may correspond to multiple txns,
@@ -203,7 +210,7 @@ public class DatabaseTransactionMgr {
 
     @VisibleForTesting
     protected int getFinishedTxnNums() {
-        return finalStatusTransactionStateDeque.size();
+        return idToFinalStatusTransactionState.size();
     }
 
     public List<List<String>> getTxnStateInfoList(boolean running, int limit) {
@@ -214,7 +221,7 @@ public class DatabaseTransactionMgr {
             if (running) {
                 transactionStateCollection = idToRunningTransactionState.values();
             } else {
-                transactionStateCollection = finalStatusTransactionStateDeque;
+                transactionStateCollection = idToFinalStatusTransactionState.values();
             }
             // get transaction order by txn id desc limit 'limit'
             transactionStateCollection.stream()
@@ -590,15 +597,44 @@ public class DatabaseTransactionMgr {
         return transactionState.getTransactionStatus() == TransactionStatus.VISIBLE;
     }
 
-    public void deleteTransaction(TransactionState transactionState) {
+    @Deprecated
+    // use replayBatchRemoveTransaction instead
+    public void replayDeleteTransaction(TransactionState transactionState) {
         writeLock();
         try {
             // here we only delete the oldest element, so if element exist in finalStatusTransactionStateDeque,
-            // it must at the front of the finalStatusTransactionStateDeque
-            if (!finalStatusTransactionStateDeque.isEmpty() &&
-            transactionState.getTransactionId() == finalStatusTransactionStateDeque.getFirst().getTransactionId()) {
-                finalStatusTransactionStateDeque.pop();
-                clearTransactionState(transactionState);
+            // it must be at the front of the queue that holds it,
+            // so check the heads of both the "short" and the "long" queue.
+            if (!finalStatusTransactionStateDequeShort.isEmpty() &&
+                    transactionState.getTransactionId() == finalStatusTransactionStateDequeShort.getFirst().getTransactionId()) {
+                finalStatusTransactionStateDequeShort.pop();
+                clearTransactionState(transactionState.getTransactionId());
+            } else if (!finalStatusTransactionStateDequeLong.isEmpty() &&
+                    transactionState.getTransactionId() == finalStatusTransactionStateDequeLong.getFirst().getTransactionId()) {
+                finalStatusTransactionStateDequeLong.pop();
+                clearTransactionState(transactionState.getTransactionId());
+            }
+        } finally {
+            writeUnlock();
+        }
+    }
+
+    public void replayBatchRemoveTransaction(List<Long> txnIds) {
+        writeLock();
+        try {
+            for (Long txnId : txnIds) {
+                // Txns are only ever removed from the head of a queue, so if this txn is still present,
+                // it must be at the front of either the "short" or the "long" queue;
+                // check the heads of both.
+                if (!finalStatusTransactionStateDequeShort.isEmpty() &&
+                        txnId == finalStatusTransactionStateDequeShort.getFirst().getTransactionId()) {
+                    finalStatusTransactionStateDequeShort.pop();
+                    clearTransactionState(txnId);
+                } else if (!finalStatusTransactionStateDequeLong.isEmpty() &&
+                        txnId == finalStatusTransactionStateDequeLong.getFirst().getTransactionId()) {
+                    finalStatusTransactionStateDequeLong.pop();
+                    clearTransactionState(txnId);
+                }
             }
         } finally {
             writeUnlock();
@@ -889,7 +925,11 @@ public class DatabaseTransactionMgr {
                 }
             }
             idToFinalStatusTransactionState.put(transactionState.getTransactionId(), transactionState);
-            finalStatusTransactionStateDeque.add(transactionState);
+            if (transactionState.isShortTxn()) {
+                finalStatusTransactionStateDequeShort.add(transactionState);
+            } else {
+                finalStatusTransactionStateDequeLong.add(transactionState);
+            }
         }
         updateTxnLabels(transactionState);
     }
@@ -1076,36 +1116,59 @@ public class DatabaseTransactionMgr {
     }
 
     public void removeExpiredTxns(long currentMillis) {
+        List<Long> expiredTxnIds = Lists.newArrayList();
+        // delete expired txns
+        int leftNum = MAX_REMOVE_TXN_PER_ROUND;
         writeLock();
         try {
-            while (!finalStatusTransactionStateDeque.isEmpty()) {
-                TransactionState transactionState = finalStatusTransactionStateDeque.getFirst();
-                if (transactionState.isExpired(currentMillis)) {
-                    finalStatusTransactionStateDeque.pop();
-                    clearTransactionState(transactionState);
-                    editLog.logDeleteTransactionState(transactionState);
-                    LOG.info("transaction [" + transactionState.getTransactionId() + "] is expired, remove it from transaction manager");
-                } else {
-                    break;
-                }
-
-            }
+            leftNum = unprotectedRemoveExpiredTxns(currentMillis, expiredTxnIds, finalStatusTransactionStateDequeShort, leftNum);
+            leftNum = unprotectedRemoveExpiredTxns(currentMillis, expiredTxnIds, finalStatusTransactionStateDequeLong, leftNum);
+
+            Map<Long, List<Long>> dbExpiredTxnIds = Maps.newHashMap();
+            dbExpiredTxnIds.put(dbId, expiredTxnIds);
+            BatchRemoveTransactionsOperation op = new BatchRemoveTransactionsOperation(dbExpiredTxnIds);
+            editLog.logBatchRemoveTransactions(op);
+            LOG.info("Remove {} expirted transactions", MAX_REMOVE_TXN_PER_ROUND - leftNum);
         } finally {
             writeUnlock();
         }
     }
 
-    private void clearTransactionState(TransactionState transactionState) {
-        idToFinalStatusTransactionState.remove(transactionState.getTransactionId());
-        Set<Long> txnIds = unprotectedGetTxnIdsByLabel(transactionState.getLabel());
-        txnIds.remove(transactionState.getTransactionId());
-        if (txnIds.isEmpty()) {
-            labelToTxnIds.remove(transactionState.getLabel());
+    private int unprotectedRemoveExpiredTxns(long currentMillis, List<Long> expiredTxnIds,
+                                             ArrayDeque<TransactionState> finalStatusTransactionStateDequeShort,
+                                             int maxNumber) {
+        int left = maxNumber;
+        while (!finalStatusTransactionStateDequeShort.isEmpty() && left > 0) {
+            TransactionState transactionState = finalStatusTransactionStateDequeShort.getFirst();
+            if (transactionState.isExpired(currentMillis)) {
+                finalStatusTransactionStateDequeShort.pop();
+                clearTransactionState(transactionState.getTransactionId());
+                expiredTxnIds.add(transactionState.getTransactionId());
+                left--;
+            } else {
+                break;
+            }
+        }
+        return left;
+    }
+
+    private void clearTransactionState(long txnId) {
+        TransactionState transactionState = idToFinalStatusTransactionState.remove(txnId);
+        if (transactionState != null) {
+            Set<Long> txnIds = unprotectedGetTxnIdsByLabel(transactionState.getLabel());
+            txnIds.remove(transactionState.getTransactionId());
+            if (txnIds.isEmpty()) {
+                labelToTxnIds.remove(transactionState.getLabel());
+            }
+            LOG.info("transaction [" + txnId + "] is expired, remove it from transaction manager");
+        } else {
+            // should not happen; log a warning so that it can be observed
+            LOG.warn("transaction state is not found when clear transaction: " + txnId);
         }
     }
 
     public int getTransactionNum() {
-        return idToRunningTransactionState.size() + finalStatusTransactionStateDeque.size();
+        return idToRunningTransactionState.size() + idToFinalStatusTransactionState.size();
     }
 
 
@@ -1117,7 +1180,7 @@ public class DatabaseTransactionMgr {
                     return txn;
                 }
             }
-            for (TransactionState txn : finalStatusTransactionStateDeque) {
+            for (TransactionState txn : idToFinalStatusTransactionState.values()) {
                 if (txn.getCallbackId() == callbackId && status.contains(txn.getTransactionStatus())) {
                     return txn;
                 }
@@ -1136,7 +1199,7 @@ public class DatabaseTransactionMgr {
                     return txn;
                 }
             }
-            for (TransactionState txn : finalStatusTransactionStateDeque) {
+            for (TransactionState txn : idToFinalStatusTransactionState.values()) {
                 if (txn.getCallbackId() == callbackId) {
                     return txn;
                 }
@@ -1436,9 +1499,13 @@ public class DatabaseTransactionMgr {
             entry.getValue().write(out);
         }
 
-        for (TransactionState transactionState : finalStatusTransactionStateDeque) {
+        // Use 2 queues instead of idToFinalStatusTransactionState to keep the order in queues.
+        for (TransactionState transactionState : finalStatusTransactionStateDequeShort) {
             transactionState.write(out);
         }
-    }
 
+        for (TransactionState transactionState : finalStatusTransactionStateDequeLong) {
+            transactionState.write(out);
+        }
+    }
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java b/fe/fe-core/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java
index 55795a4..f71d2be 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java
@@ -17,8 +17,6 @@
 
 package org.apache.doris.transaction;
 
-import org.apache.commons.lang3.StringUtils;
-import org.apache.commons.lang3.time.StopWatch;
 import org.apache.doris.catalog.Catalog;
 import org.apache.doris.catalog.Database;
 import org.apache.doris.catalog.Table;
@@ -30,6 +28,7 @@ import org.apache.doris.common.Pair;
 import org.apache.doris.common.UserException;
 import org.apache.doris.common.io.Writable;
 import org.apache.doris.common.util.MetaLockUtils;
+import org.apache.doris.persist.BatchRemoveTransactionsOperation;
 import org.apache.doris.persist.EditLog;
 import org.apache.doris.thrift.TUniqueId;
 import org.apache.doris.transaction.TransactionState.LoadJobSourceType;
@@ -38,6 +37,8 @@ import org.apache.doris.transaction.TransactionState.TxnCoordinator;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 
+import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.lang3.time.StopWatch;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
@@ -325,18 +326,31 @@ public class GlobalTransactionMgr implements Writable {
         } catch (AnalysisException e) {
             LOG.warn("replay upsert transaction [" + transactionState.getTransactionId() + "] failed", e);
         }
-
     }
-    
+
+    @Deprecated
+    // Use replayBatchRemoveTransactions instead
     public void replayDeleteTransactionState(TransactionState transactionState) {
         try {
             DatabaseTransactionMgr dbTransactionMgr = getDatabaseTransactionMgr(transactionState.getDbId());
-            dbTransactionMgr.deleteTransaction(transactionState);
+            dbTransactionMgr.replayDeleteTransaction(transactionState);
         } catch (AnalysisException e) {
             LOG.warn("replay delete transaction [" + transactionState.getTransactionId() + "] failed", e);
         }
     }
 
+    public void replayBatchRemoveTransactions(BatchRemoveTransactionsOperation operation) {
+        // Replay per db; an AnalysisException for one db is logged and the remaining dbs are still replayed.
+        for (Map.Entry<Long, List<Long>> entry : operation.getDbTxnIds().entrySet()) {
+            try {
+                DatabaseTransactionMgr dbTransactionMgr = getDatabaseTransactionMgr(entry.getKey());
+                dbTransactionMgr.replayBatchRemoveTransaction(entry.getValue());
+            } catch (AnalysisException e) {
+                LOG.warn("replay batch remove transactions failed. db " + entry.getKey(), e);
+            }
+        }
+    }
+
     public List<List<Comparable>> getDbInfo() {
         List<List<Comparable>> infos = new ArrayList<List<Comparable>>();
         List<Long> dbIds = Lists.newArrayList(dbIdToDatabaseTransactionMgrs.keySet());
diff --git a/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionState.java b/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionState.java
index 6ec245c..40c8cfc 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionState.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionState.java
@@ -485,7 +485,21 @@ public class TransactionState implements Writable {
     
     // return true if txn is in final status and label is expired
     public boolean isExpired(long currentMillis) {
-        return transactionStatus.isFinalStatus() && (currentMillis - finishTime) / 1000 > Config.label_keep_max_second;
+        if (!transactionStatus.isFinalStatus()) {
+            return false;
+        }
+        long expireTime = Config.label_keep_max_second; // retention in seconds for non-short txns
+        if (isShortTxn()) {
+            expireTime = Config.streaming_label_keep_max_second; // short txns use the dedicated streaming-label retention
+        }
+        return (currentMillis - finishTime) / 1000 > expireTime;
+    }
+
+    // Return true if this txn was created by a streaming load, an insert or a routine load task (source types below).
+    // These "short" txns are expected to be cleaned up soon after they finish; isExpired() applies a separate retention to them.
+    public boolean isShortTxn() {
+        return sourceType == LoadJobSourceType.BACKEND_STREAMING || sourceType == LoadJobSourceType.INSERT_STREAMING
+                || sourceType == LoadJobSourceType.ROUTINE_LOAD_TASK;
     }
 
     // return true if txn is running but timeout
diff --git a/fe/fe-core/src/test/java/org/apache/doris/catalog/FakeEditLog.java b/fe/fe-core/src/test/java/org/apache/doris/catalog/FakeEditLog.java
index d04e8b8..06b7c0f 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/catalog/FakeEditLog.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/catalog/FakeEditLog.java
@@ -22,6 +22,7 @@ import org.apache.doris.alter.BatchAlterJobPersistInfo;
 import org.apache.doris.alter.RollupJob;
 import org.apache.doris.alter.SchemaChangeJob;
 import org.apache.doris.cluster.Cluster;
+import org.apache.doris.persist.BatchRemoveTransactionsOperation;
 import org.apache.doris.persist.EditLog;
 import org.apache.doris.persist.ModifyTablePropertyOperationLog;
 import org.apache.doris.persist.RoutineLoadOperation;
@@ -113,6 +114,11 @@ public class FakeEditLog extends MockUp<EditLog> {
 
     }
 
+    @Mock
+    public void logBatchRemoveTransactions(BatchRemoveTransactionsOperation info) {
+        // No-op: the mocked EditLog does not persist batch-remove operations in tests.
+    }
+
     public TransactionState getTransaction(long transactionId) {
         return allTransactionState.get(transactionId);
     }
diff --git a/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/LoadManagerTest.java b/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/LoadManagerTest.java
index f6a4e0c..d8ae0a5 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/LoadManagerTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/LoadManagerTest.java
@@ -161,6 +161,9 @@ public class LoadManagerTest {
                 table.getName();
                 minTimes = 0;
                 result = "tablename";
+                Catalog.getCurrentCatalogJournalVersion();
+                minTimes = 0;
+                result = FeMetaVersion.VERSION_CURRENT;
             }
         };
 
@@ -169,7 +172,7 @@ public class LoadManagerTest {
         Deencapsulation.invoke(loadManager, "addLoadJob", job1);
 
         //make job1 don't serialize
-        Config.label_keep_max_second = 1;
+        Config.streaming_label_keep_max_second = 1;
         Thread.sleep(2000);
 
         File file = serializeToFile(loadManager);
diff --git a/fe/fe-core/src/test/java/org/apache/doris/persist/BatchRemoveTransactionOperationTest.java b/fe/fe-core/src/test/java/org/apache/doris/persist/BatchRemoveTransactionOperationTest.java
new file mode 100644
index 0000000..85532fb
--- /dev/null
+++ b/fe/fe-core/src/test/java/org/apache/doris/persist/BatchRemoveTransactionOperationTest.java
@@ -0,0 +1,71 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.persist;
+
+import org.apache.doris.common.FeConstants;
+import org.apache.doris.meta.MetaContext;
+
+import com.clearspring.analytics.util.Lists;
+import com.google.common.collect.Maps;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.util.List;
+import java.util.Map;
+
+public class BatchRemoveTransactionOperationTest {
+    // Round-trips a BatchRemoveTransactionsOperation through write() and read().
+    @Test
+    public void testSerialization() throws Exception {
+        MetaContext metaContext = new MetaContext();
+        metaContext.setMetaVersion(FeConstants.meta_version);
+        metaContext.setThreadLocalInfo();
+
+        File file = new File("./BatchRemoveTransactionOperationTest");
+        try {
+            // 1. Write an operation holding txn ids 1, 2, 3 of db 1000 to the file
+            Map<Long, List<Long>> dbTxnIds = Maps.newHashMap();
+            List<Long> txnIds = Lists.newArrayList();
+            txnIds.add(1L);
+            txnIds.add(2L);
+            txnIds.add(3L);
+            dbTxnIds.put(1000L, txnIds);
+            try (DataOutputStream dos = new DataOutputStream(new FileOutputStream(file))) {
+                new BatchRemoveTransactionsOperation(dbTxnIds).write(dos);
+                dos.flush();
+            }
+
+            // 2. Read it back; try-with-resources closes the stream even if an assertion fails
+            try (DataInputStream dis = new DataInputStream(new FileInputStream(file))) {
+                BatchRemoveTransactionsOperation op2 = BatchRemoveTransactionsOperation.read(dis);
+                Assert.assertEquals(1, op2.getDbTxnIds().size());
+                Assert.assertEquals(3, op2.getDbTxnIds().get(1000L).size());
+                Assert.assertTrue(op2.getDbTxnIds().get(1000L).contains(1L));
+            }
+        } finally {
+            // 3. Always delete the temp file, even when an assertion above failed
+            file.delete();
+        }
+    }
+}
diff --git a/fe/fe-core/src/test/java/org/apache/doris/transaction/DatabaseTransactionMgrTest.java b/fe/fe-core/src/test/java/org/apache/doris/transaction/DatabaseTransactionMgrTest.java
index c723fa3..1caf243 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/transaction/DatabaseTransactionMgrTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/transaction/DatabaseTransactionMgrTest.java
@@ -17,8 +17,6 @@
 
 package org.apache.doris.transaction;
 
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
 import org.apache.doris.catalog.Catalog;
 import org.apache.doris.catalog.CatalogTestUtil;
 import org.apache.doris.catalog.FakeCatalog;
@@ -32,6 +30,9 @@ import org.apache.doris.common.UserException;
 import org.apache.doris.common.util.TimeUtils;
 import org.apache.doris.meta.MetaContext;
 
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
 import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
@@ -264,7 +265,7 @@ public class DatabaseTransactionMgrTest {
         DatabaseTransactionMgr masterDbTransMgr = masterTransMgr.getDatabaseTransactionMgr(CatalogTestUtil.testDbId1);
         long txnId = LabelToTxnId.get(CatalogTestUtil.testTxnLabel1);
         TransactionState transactionState = masterDbTransMgr.getTransactionState(txnId);
-        masterDbTransMgr.deleteTransaction(transactionState);
+        masterDbTransMgr.replayDeleteTransaction(transactionState);
         assertEquals(2, masterDbTransMgr.getRunningTxnNums());
         assertEquals(1, masterDbTransMgr.getRunningRoutineLoadTxnNums());
         assertEquals(0, masterDbTransMgr.getFinishedTxnNums());

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org