You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by mo...@apache.org on 2021/04/23 01:47:40 UTC
[incubator-doris] branch master updated: [Optimize] Remove expired
txns in batch to avoid holding lock for too long (#5675)
This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git
The following commit(s) were added to refs/heads/master by this push:
new b93e841 [Optimize] Remove expired txns in batch to avoid holding lock for too long (#5675)
b93e841 is described below
commit b93e8416885ac440e9c294c4bb2089e33bde788b
Author: Mingyu Chen <mo...@gmail.com>
AuthorDate: Fri Apr 23 09:47:30 2021 +0800
[Optimize] Remove expired txns in batch to avoid holding lock for too long (#5675)
This CL mainly changes:
1. Add a config to control the expiration time of load jobs
Add a new FE config "streaming_label_keep_max_second" to control
the expiration time of high-frequency load jobs such as INSERT and STREAM LOAD.
2. Remove expired txns in batches to avoid holding the transaction lock for a long time
---
be/src/olap/delta_writer.cpp | 2 +-
be/src/runtime/mysql_result_writer.cpp | 2 +-
.../java/org/apache/doris/catalog/Catalog.java | 17 +--
.../main/java/org/apache/doris/common/Config.java | 6 +
.../org/apache/doris/journal/JournalEntity.java | 10 +-
.../src/main/java/org/apache/doris/load/Load.java | 3 +-
.../main/java/org/apache/doris/load/LoadJob.java | 6 +-
.../java/org/apache/doris/load/loadv2/LoadJob.java | 14 ++
.../org/apache/doris/load/loadv2/LoadManager.java | 20 +--
.../persist/BatchRemoveTransactionsOperation.java | 57 ++++++++
.../java/org/apache/doris/persist/EditLog.java | 15 ++-
.../org/apache/doris/persist/OperationType.java | 3 +
.../doris/transaction/DatabaseTransactionMgr.java | 145 +++++++++++++++------
.../doris/transaction/GlobalTransactionMgr.java | 24 +++-
.../apache/doris/transaction/TransactionState.java | 16 ++-
.../java/org/apache/doris/catalog/FakeEditLog.java | 6 +
.../apache/doris/load/loadv2/LoadManagerTest.java | 5 +-
.../BatchRemoveTransactionOperationTest.java | 71 ++++++++++
.../transaction/DatabaseTransactionMgrTest.java | 7 +-
19 files changed, 341 insertions(+), 88 deletions(-)
diff --git a/be/src/olap/delta_writer.cpp b/be/src/olap/delta_writer.cpp
index 4ab2fb5..692b136 100644
--- a/be/src/olap/delta_writer.cpp
+++ b/be/src/olap/delta_writer.cpp
@@ -108,7 +108,7 @@ OLAPStatus DeltaWriter::init() {
LOG(WARNING) << "failed to init delta writer. version count: " << _tablet->version_count()
<< ", exceed limit: " << config::max_tablet_version_num
<< ". tablet: " << _tablet->full_name();
- return OLAP_ERR_TABLE_NOT_FOUND;
+ return OLAP_ERR_TOO_MANY_VERSION;
}
{
diff --git a/be/src/runtime/mysql_result_writer.cpp b/be/src/runtime/mysql_result_writer.cpp
index d56e8ac..b593ed6 100644
--- a/be/src/runtime/mysql_result_writer.cpp
+++ b/be/src/runtime/mysql_result_writer.cpp
@@ -61,7 +61,7 @@ Status MysqlResultWriter::init(RuntimeState* state) {
void MysqlResultWriter::_init_profile() {
_append_row_batch_timer = ADD_TIMER(_parent_profile, "AppendBatchTime");
_convert_tuple_timer = ADD_CHILD_TIMER(_parent_profile, "TupleConvertTime", "AppendBatchTime");
- _result_send_timer = ADD_CHILD_TIMER(_parent_profile, "ResultRendTime", "AppendBatchTime");
+ _result_send_timer = ADD_CHILD_TIMER(_parent_profile, "ResultSendTime", "AppendBatchTime");
_sent_rows_counter = ADD_COUNTER(_parent_profile, "NumSentRows", TUnit::UNIT);
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Catalog.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Catalog.java
index 2e0541f..16f24eb 100755
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Catalog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Catalog.java
@@ -148,7 +148,6 @@ import org.apache.doris.load.Load;
import org.apache.doris.load.LoadChecker;
import org.apache.doris.load.LoadErrorHub;
import org.apache.doris.load.LoadJob;
-import org.apache.doris.load.LoadJob.JobState;
import org.apache.doris.load.loadv2.LoadEtlChecker;
import org.apache.doris.load.loadv2.LoadJobScheduler;
import org.apache.doris.load.loadv2.LoadLoadingChecker;
@@ -228,10 +227,14 @@ import com.google.common.collect.Multimap;
import com.google.common.collect.Queues;
import com.google.common.collect.Range;
import com.google.common.collect.Sets;
+import com.sleepycat.je.rep.InsufficientLogException;
+import com.sleepycat.je.rep.NetworkRestore;
+import com.sleepycat.je.rep.NetworkRestoreConfig;
import org.apache.commons.collections.CollectionUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
+import org.codehaus.jackson.map.ObjectMapper;
import java.io.BufferedInputStream;
import java.io.BufferedReader;
@@ -261,11 +264,6 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
-import com.sleepycat.je.rep.InsufficientLogException;
-import com.sleepycat.je.rep.NetworkRestore;
-import com.sleepycat.je.rep.NetworkRestoreConfig;
-import org.codehaus.jackson.map.ObjectMapper;
-
public class Catalog {
private static final Logger LOG = LogManager.getLogger(Catalog.class);
// 0 ~ 9999 used for qe
@@ -1643,16 +1641,15 @@ public class Catalog {
int loadJobCount = dis.readInt();
newChecksum ^= loadJobCount;
+ long currentTimeMs = System.currentTimeMillis();
for (int j = 0; j < loadJobCount; j++) {
LoadJob job = new LoadJob();
job.readFields(dis);
- long currentTimeMs = System.currentTimeMillis();
// Delete the history load jobs that are older than
// LABEL_KEEP_MAX_MS
// This job must be FINISHED or CANCELLED
- if ((currentTimeMs - job.getCreateTimeMs()) / 1000 <= Config.label_keep_max_second
- || (job.getState() != JobState.FINISHED && job.getState() != JobState.CANCELLED)) {
+ if (!job.isExpired(currentTimeMs)) {
load.unprotectAddLoadJob(job, true /* replay */);
}
}
@@ -2212,7 +2209,7 @@ public class Catalog {
}
public void createTxnCleaner() {
- txnCleaner = new MasterDaemon("txnCleaner", Config.transaction_clean_interval_second) {
+ txnCleaner = new MasterDaemon("txnCleaner", Config.transaction_clean_interval_second * 1000L) {
@Override
protected void runAfterCatalogReady() {
globalTransactionMgr.removeExpiredAndTimeoutTxns();
diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/Config.java b/fe/fe-core/src/main/java/org/apache/doris/common/Config.java
index ff28887..4a14c20 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/Config.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/Config.java
@@ -127,6 +127,12 @@ public class Config extends ConfigBase {
*/
@ConfField(mutable = true, masterOnly = true)
public static int label_keep_max_second = 3 * 24 * 3600; // 3 days
+
+    // Label keep time (in seconds) for high-frequency load jobs such as
+    // INSERT, STREAM LOAD and ROUTINE_LOAD_TASK.
+    // A finished job or task is removed once it has been finished for longer than this.
+    @ConfField(mutable = true, masterOnly = true)
+    public static int streaming_label_keep_max_second = 43200; // 12 hours
/**
* The max keep time of some kind of jobs.
diff --git a/fe/fe-core/src/main/java/org/apache/doris/journal/JournalEntity.java b/fe/fe-core/src/main/java/org/apache/doris/journal/JournalEntity.java
index 89547b6..9399ced 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/journal/JournalEntity.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/journal/JournalEntity.java
@@ -52,6 +52,7 @@ import org.apache.doris.persist.BackendIdsUpdateInfo;
import org.apache.doris.persist.BackendTabletsInfo;
import org.apache.doris.persist.BatchDropInfo;
import org.apache.doris.persist.BatchModifyPartitionsInfo;
+import org.apache.doris.persist.BatchRemoveTransactionsOperation;
import org.apache.doris.persist.ClusterInfo;
import org.apache.doris.persist.ColocatePersistInfo;
import org.apache.doris.persist.ConsistencyCheckInfo;
@@ -86,11 +87,11 @@ import org.apache.doris.system.Backend;
import org.apache.doris.system.Frontend;
import org.apache.doris.transaction.TransactionState;
+import com.google.common.base.Preconditions;
+
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
-import com.google.common.base.Preconditions;
-
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
@@ -431,6 +432,11 @@ public class JournalEntity implements Writable {
isRead = true;
break;
}
+ case OperationType.OP_BATCH_REMOVE_TXNS: {
+ data = BatchRemoveTransactionsOperation.read(in);
+ isRead = true;
+ break;
+ }
case OperationType.OP_CREATE_REPOSITORY: {
data = Repository.read(in);
isRead = true;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/Load.java b/fe/fe-core/src/main/java/org/apache/doris/load/Load.java
index 126fae5..aefe73e 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/Load.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/Load.java
@@ -2608,8 +2608,7 @@ public class Load {
while (iter.hasNext()) {
Map.Entry<Long, LoadJob> entry = iter.next();
LoadJob job = entry.getValue();
- if ((currentTimeMs - job.getCreateTimeMs()) / 1000 > Config.label_keep_max_second
- && (job.getState() == JobState.FINISHED || job.getState() == JobState.CANCELLED)) {
+ if (job.isExpired(currentTimeMs)) {
long dbId = job.getDbId();
String label = job.getLabel();
// Remove job from idToLoadJob
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/LoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/LoadJob.java
index 89a81dc..9ba1aeb 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/LoadJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/LoadJob.java
@@ -1012,5 +1012,9 @@ public class LoadJob implements Writable {
return false;
}
-
+    // Return true if this job is FINISHED or CANCELLED and its load finish time is more than label_keep_max_second ago
+    public boolean isExpired(long currentTimeMs) {
+        return (getState() == JobState.FINISHED || getState() == JobState.CANCELLED)
+                && (currentTimeMs - getLoadFinishTimeMs()) / 1000 > Config.label_keep_max_second; // NOTE(review): confirm loadFinishTimeMs is set for CANCELLED jobs
+    }
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadJob.java
index 75bc2f0..85a1a46 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadJob.java
@@ -1191,4 +1191,18 @@ public abstract class LoadJob extends AbstractTxnStateChangeCallback implements
public int getLoadParallelism() {
return (int) jobProperties.get(LoadStmt.LOAD_PARALLELISM);
}
+
+    // Return true if this job is completed and finished longer ago than its label keep time.
+    public boolean isExpired(long currentTimeMs) {
+        if (!isCompleted()) {
+            return false;
+        }
+        // INSERT jobs use streaming_label_keep_max_second; all other job types use label_keep_max_second.
+        long expireTime = Config.label_keep_max_second;
+        if (jobType == EtlJobType.INSERT) {
+            expireTime = Config.streaming_label_keep_max_second;
+        }
+        return (currentTimeMs - getFinishTimestamp()) / 1000 > expireTime;
+    }
+
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadManager.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadManager.java
index 0554952..14c5885 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadManager.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadManager.java
@@ -439,8 +439,7 @@ public class LoadManager implements Writable{
Iterator<Map.Entry<Long, LoadJob>> iter = idToLoadJob.entrySet().iterator();
while (iter.hasNext()) {
LoadJob job = iter.next().getValue();
- if (job.isCompleted()
- && ((currentTimeMs - job.getFinishTimestamp()) / 1000 > Config.label_keep_max_second)) {
+ if (job.isExpired(currentTimeMs)) {
iter.remove();
dbIdToLabelToLoadJobs.get(job.getDbId()).get(job.getLabel()).remove(job);
if (job instanceof SparkLoadJob) {
@@ -676,20 +675,6 @@ public class LoadManager implements Writable{
lock.writeLock().unlock();
}
- // If load job will be removed by cleaner later, it will not be saved in image.
- private boolean needSave(LoadJob loadJob) {
- if (!loadJob.isCompleted()) {
- return true;
- }
-
- long currentTimeMs = System.currentTimeMillis();
- if (loadJob.isCompleted() && ((currentTimeMs - loadJob.getFinishTimestamp()) / 1000 <= Config.label_keep_max_second)) {
- return true;
- }
-
- return false;
- }
-
public void initJobProgress(Long jobId, TUniqueId loadId, Set<TUniqueId> fragmentIds,
List<Long> relatedBackendIds) {
LoadJob job = idToLoadJob.get(jobId);
@@ -804,7 +789,8 @@ public class LoadManager implements Writable{
@Override
public void write(DataOutput out) throws IOException {
- List<LoadJob> loadJobs = idToLoadJob.values().stream().filter(this::needSave).collect(Collectors.toList());
+ long currentTimeMs = System.currentTimeMillis();
+ List<LoadJob> loadJobs = idToLoadJob.values().stream().filter(t -> !t.isExpired(currentTimeMs)).collect(Collectors.toList());
out.writeInt(loadJobs.size());
for (LoadJob loadJob : loadJobs) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/persist/BatchRemoveTransactionsOperation.java b/fe/fe-core/src/main/java/org/apache/doris/persist/BatchRemoveTransactionsOperation.java
new file mode 100644
index 0000000..5b8f5c6
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/persist/BatchRemoveTransactionsOperation.java
@@ -0,0 +1,57 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.persist;
+
+import org.apache.doris.common.io.Text;
+import org.apache.doris.common.io.Writable;
+import org.apache.doris.persist.gson.GsonUtils;
+
+import com.google.gson.annotations.SerializedName;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+
+// Edit log entry (OP_BATCH_REMOVE_TXNS) recording a batch of expired transactions removed from the transaction manager.
+public class BatchRemoveTransactionsOperation implements Writable {
+    // The whole operation is persisted as one Gson JSON string by write() and parsed back by read().
+    // dbId -> ids of the txns removed from that db
+    @SerializedName(value = "dbTxnIds")
+    private Map<Long, List<Long>> dbTxnIds;
+
+    public BatchRemoveTransactionsOperation(Map<Long, List<Long>> dbTxnIds) {
+        this.dbTxnIds = dbTxnIds;
+    }
+
+    public Map<Long, List<Long>> getDbTxnIds() {
+        return dbTxnIds;
+    }
+
+    @Override
+    public void write(DataOutput out) throws IOException {
+        String json = GsonUtils.GSON.toJson(this);
+        Text.writeString(out, json);
+    }
+
+    public static BatchRemoveTransactionsOperation read(DataInput in) throws IOException {
+        String json = Text.readString(in);
+        return GsonUtils.GSON.fromJson(json, BatchRemoveTransactionsOperation.class);
+    }
+}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java b/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java
index 42a240f..4c2718e 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java
@@ -596,6 +596,11 @@ public class EditLog {
LOG.debug("opcode: {}, tid: {}", opCode, state.getTransactionId());
break;
}
+ case OperationType.OP_BATCH_REMOVE_TXNS: {
+ final BatchRemoveTransactionsOperation operation = (BatchRemoveTransactionsOperation) journal.getData();
+ Catalog.getCurrentGlobalTransactionMgr().replayBatchRemoveTransactions(operation);
+ break;
+ }
case OperationType.OP_CREATE_REPOSITORY: {
Repository repository = (Repository) journal.getData();
catalog.getBackupHandler().getRepoMgr().addAndInitRepoIfNotExist(repository, true);
@@ -1205,11 +1210,7 @@ public class EditLog {
public void logInsertTransactionState(TransactionState transactionState) {
logEdit(OperationType.OP_UPSERT_TRANSACTION_STATE, transactionState);
}
-
- public void logDeleteTransactionState(TransactionState transactionState) {
- logEdit(OperationType.OP_DELETE_TRANSACTION_STATE, transactionState);
- }
-
+
public void logBackupJob(BackupJob job) {
logEdit(OperationType.OP_BACKUP_JOB, job);
}
@@ -1373,4 +1374,8 @@ public class EditLog {
public void logReplaceTable(ReplaceTableOperationLog log) {
logEdit(OperationType.OP_REPLACE_TABLE, log);
}
+
+ public void logBatchRemoveTransactions(BatchRemoveTransactionsOperation op) {
+ logEdit(OperationType.OP_BATCH_REMOVE_TXNS, op);
+ }
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/persist/OperationType.java b/fe/fe-core/src/main/java/org/apache/doris/persist/OperationType.java
index fa18b98..1685505 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/persist/OperationType.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/persist/OperationType.java
@@ -140,10 +140,13 @@ public class OperationType {
//real time load 100 -108
public static final short OP_UPSERT_TRANSACTION_STATE = 100;
+ @Deprecated
+ // use OP_BATCH_REMOVE_TXNS instead
public static final short OP_DELETE_TRANSACTION_STATE = 101;
public static final short OP_FINISHING_ROLLUP = 102;
public static final short OP_FINISHING_SCHEMA_CHANGE = 103;
public static final short OP_SAVE_TRANSACTION_ID = 104;
+ public static final short OP_BATCH_REMOVE_TXNS = 105;
// routine load 110~120
public static final short OP_ROUTINE_LOAD_JOB = 110;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java b/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java
index cda888d..455226b 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java
@@ -45,6 +45,7 @@ import org.apache.doris.common.util.TimeUtils;
import org.apache.doris.common.util.Util;
import org.apache.doris.metric.MetricRepo;
import org.apache.doris.mysql.privilege.PrivPredicate;
+import org.apache.doris.persist.BatchRemoveTransactionsOperation;
import org.apache.doris.persist.EditLog;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.task.AgentBatchTask;
@@ -55,10 +56,6 @@ import org.apache.doris.task.PublishVersionTask;
import org.apache.doris.thrift.TTaskType;
import org.apache.doris.thrift.TUniqueId;
-import org.apache.commons.collections.CollectionUtils;
-import org.apache.logging.log4j.LogManager;
-import org.apache.logging.log4j.Logger;
-
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
@@ -66,6 +63,10 @@ import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
import java.io.DataOutput;
import java.io.IOException;
import java.util.ArrayDeque;
@@ -93,6 +94,9 @@ import java.util.stream.Collectors;
public class DatabaseTransactionMgr {
private static final Logger LOG = LogManager.getLogger(DatabaseTransactionMgr.class);
+    // The max number of txns that can be removed per cleaning round.
+    // Bounded to avoid holding the write lock too long when many txns expire at once.
+    private static final int MAX_REMOVE_TXN_PER_ROUND = 10000;
private long dbId;
@@ -106,9 +110,12 @@ public class DatabaseTransactionMgr {
// transactionId -> final status TransactionState
private Map<Long, TransactionState> idToFinalStatusTransactionState = Maps.newHashMap();
-
- // to store transactionStates with final status
- private ArrayDeque<TransactionState> finalStatusTransactionStateDeque = new ArrayDeque<>();
+    // The following 2 queues store transactionStates with final status; txns are appended as they reach it.
+    // They let the cleaner pop expired txns from the queue heads instead of traversing all txns.
+    // The "Short" queue holds txns with isShortTxn() == true, whose expire time is controlled by Config.streaming_label_keep_max_second.
+    // The "Long" queue holds all other txns, whose expire time is controlled by Config.label_keep_max_second.
+    private ArrayDeque<TransactionState> finalStatusTransactionStateDequeShort = new ArrayDeque<>();
+    private ArrayDeque<TransactionState> finalStatusTransactionStateDequeLong = new ArrayDeque<>();
// label -> txn ids
// this is used for checking if label already used. a label may correspond to multiple txns,
@@ -203,7 +210,7 @@ public class DatabaseTransactionMgr {
@VisibleForTesting
protected int getFinishedTxnNums() {
- return finalStatusTransactionStateDeque.size();
+ return idToFinalStatusTransactionState.size();
}
public List<List<String>> getTxnStateInfoList(boolean running, int limit) {
@@ -214,7 +221,7 @@ public class DatabaseTransactionMgr {
if (running) {
transactionStateCollection = idToRunningTransactionState.values();
} else {
- transactionStateCollection = finalStatusTransactionStateDeque;
+ transactionStateCollection = idToFinalStatusTransactionState.values();
}
// get transaction order by txn id desc limit 'limit'
transactionStateCollection.stream()
@@ -590,15 +597,44 @@ public class DatabaseTransactionMgr {
return transactionState.getTransactionStatus() == TransactionStatus.VISIBLE;
}
-    public void deleteTransaction(TransactionState transactionState) {
+    // Use replayBatchRemoveTransaction instead (OP_DELETE_TRANSACTION_STATE is deprecated)
+    @Deprecated
+    public void replayDeleteTransaction(TransactionState transactionState) {
writeLock();
try {
// here we only delete the oldest element, so if element exist in finalStatusTransactionStateDeque,
-            // it must at the front of the finalStatusTransactionStateDeque
-            if (!finalStatusTransactionStateDeque.isEmpty() &&
-                    transactionState.getTransactionId() == finalStatusTransactionStateDeque.getFirst().getTransactionId()) {
-                finalStatusTransactionStateDeque.pop();
-                clearTransactionState(transactionState);
+            // it must be at the front of one of the two final-status queues,
+            // so check the heads of both the "short" and the "long" queue.
+            if (!finalStatusTransactionStateDequeShort.isEmpty() &&
+                    transactionState.getTransactionId() == finalStatusTransactionStateDequeShort.getFirst().getTransactionId()) {
+                finalStatusTransactionStateDequeShort.pop();
+                clearTransactionState(transactionState.getTransactionId());
+            } else if (!finalStatusTransactionStateDequeLong.isEmpty() &&
+                    transactionState.getTransactionId() == finalStatusTransactionStateDequeLong.getFirst().getTransactionId()) {
+                finalStatusTransactionStateDequeLong.pop();
+                clearTransactionState(transactionState.getTransactionId());
+            }
+        } finally {
+            writeUnlock();
+        }
+    }
+
+ public void replayBatchRemoveTransaction(List<Long> txnIds) {
+ writeLock();
+ try {
+ for (Long txnId : txnIds) {
+ // here we only delete the oldest element, so if element exist in finalStatusTransactionStateDeque,
+ // it must at the front of the finalStatusTransactionStateDeque
+ // check both "short" and "long" queue.
+ if (!finalStatusTransactionStateDequeShort.isEmpty() &&
+ txnId == finalStatusTransactionStateDequeShort.getFirst().getTransactionId()) {
+ finalStatusTransactionStateDequeShort.pop();
+ clearTransactionState(txnId);
+ } else if (!finalStatusTransactionStateDequeLong.isEmpty() &&
+ txnId == finalStatusTransactionStateDequeLong.getFirst().getTransactionId()) {
+ finalStatusTransactionStateDequeLong.pop();
+ clearTransactionState(txnId);
+ }
}
} finally {
writeUnlock();
@@ -889,7 +925,11 @@ public class DatabaseTransactionMgr {
}
}
idToFinalStatusTransactionState.put(transactionState.getTransactionId(), transactionState);
- finalStatusTransactionStateDeque.add(transactionState);
+ if (transactionState.isShortTxn()) {
+ finalStatusTransactionStateDequeShort.add(transactionState);
+ } else {
+ finalStatusTransactionStateDequeLong.add(transactionState);
+ }
}
updateTxnLabels(transactionState);
}
@@ -1076,36 +1116,59 @@ public class DatabaseTransactionMgr {
}
public void removeExpiredTxns(long currentMillis) {
+        List<Long> expiredTxnIds = Lists.newArrayList();
+        // remove at most MAX_REMOVE_TXN_PER_ROUND expired txns per call (short queue first) to bound lock hold time
+        int leftNum = MAX_REMOVE_TXN_PER_ROUND;
writeLock();
try {
-            while (!finalStatusTransactionStateDeque.isEmpty()) {
-                TransactionState transactionState = finalStatusTransactionStateDeque.getFirst();
-                if (transactionState.isExpired(currentMillis)) {
-                    finalStatusTransactionStateDeque.pop();
-                    clearTransactionState(transactionState);
-                    editLog.logDeleteTransactionState(transactionState);
-                    LOG.info("transaction [" + transactionState.getTransactionId() + "] is expired, remove it from transaction manager");
-                } else {
-                    break;
-                }
-
-            }
+            leftNum = unprotectedRemoveExpiredTxns(currentMillis, expiredTxnIds, finalStatusTransactionStateDequeShort, leftNum);
+            unprotectedRemoveExpiredTxns(currentMillis, expiredTxnIds, finalStatusTransactionStateDequeLong, leftNum);
+            if (!expiredTxnIds.isEmpty()) { // avoid writing an empty edit log entry on every cleaner round
+                Map<Long, List<Long>> dbExpiredTxnIds = Maps.newHashMap();
+                dbExpiredTxnIds.put(dbId, expiredTxnIds);
+                editLog.logBatchRemoveTransactions(new BatchRemoveTransactionsOperation(dbExpiredTxnIds));
+                LOG.info("Remove {} expired transactions in db {}", expiredTxnIds.size(), dbId);
+            }
} finally {
writeUnlock();
}
}
-    private void clearTransactionState(TransactionState transactionState) {
-        idToFinalStatusTransactionState.remove(transactionState.getTransactionId());
-        Set<Long> txnIds = unprotectedGetTxnIdsByLabel(transactionState.getLabel());
-        txnIds.remove(transactionState.getTransactionId());
-        if (txnIds.isEmpty()) {
-            labelToTxnIds.remove(transactionState.getLabel());
+    // Pop expired txns from the head of the given deque (at most maxNumber), appending their ids to expiredTxnIds; returns the unused budget.
+    private int unprotectedRemoveExpiredTxns(long currentMillis, List<Long> expiredTxnIds,
+                                             ArrayDeque<TransactionState> transactionStateDeque, int maxNumber) {
+        int left = maxNumber;
+        while (!transactionStateDeque.isEmpty() && left > 0) {
+            TransactionState transactionState = transactionStateDeque.getFirst();
+            if (transactionState.isExpired(currentMillis)) {
+                transactionStateDeque.pop();
+                clearTransactionState(transactionState.getTransactionId());
+                expiredTxnIds.add(transactionState.getTransactionId());
+                left--;
+            } else {
+                break; // entries are appended as txns reach a final status, so later ones are assumed unexpired too
+            }
+        }
+        return left;
+    }
+    // Drop a final-status txn from idToFinalStatusTransactionState and the label index (callers in this class hold the write lock).
+    private void clearTransactionState(long txnId) {
+        TransactionState transactionState = idToFinalStatusTransactionState.remove(txnId);
+        if (transactionState != null) {
+            Set<Long> txnIds = unprotectedGetTxnIdsByLabel(transactionState.getLabel());
+            txnIds.remove(transactionState.getTransactionId());
+            if (txnIds.isEmpty()) {
+                labelToTxnIds.remove(transactionState.getLabel());
+            }
+            LOG.info("transaction [" + txnId + "] is expired, remove it from transaction manager");
+        } else {
+            // should not happen, add a warn log to observe it
+            LOG.warn("transaction state is not found when clear transaction: " + txnId);
}
}
public int getTransactionNum() {
- return idToRunningTransactionState.size() + finalStatusTransactionStateDeque.size();
+ return idToRunningTransactionState.size() + idToFinalStatusTransactionState.size();
}
@@ -1117,7 +1180,7 @@ public class DatabaseTransactionMgr {
return txn;
}
}
- for (TransactionState txn : finalStatusTransactionStateDeque) {
+ for (TransactionState txn : idToFinalStatusTransactionState.values()) {
if (txn.getCallbackId() == callbackId && status.contains(txn.getTransactionStatus())) {
return txn;
}
@@ -1136,7 +1199,7 @@ public class DatabaseTransactionMgr {
return txn;
}
}
- for (TransactionState txn : finalStatusTransactionStateDeque) {
+ for (TransactionState txn : idToFinalStatusTransactionState.values()) {
if (txn.getCallbackId() == callbackId) {
return txn;
}
@@ -1436,9 +1499,13 @@ public class DatabaseTransactionMgr {
entry.getValue().write(out);
}
- for (TransactionState transactionState : finalStatusTransactionStateDeque) {
+ // Use 2 queues instead of idToFinalStatusTransactionState to keep the order in queues.
+ for (TransactionState transactionState : finalStatusTransactionStateDequeShort) {
transactionState.write(out);
}
- }
+ for (TransactionState transactionState : finalStatusTransactionStateDequeLong) {
+ transactionState.write(out);
+ }
+ }
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java b/fe/fe-core/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java
index 55795a4..f71d2be 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java
@@ -17,8 +17,6 @@
package org.apache.doris.transaction;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.commons.lang3.time.StopWatch;
import org.apache.doris.catalog.Catalog;
import org.apache.doris.catalog.Database;
import org.apache.doris.catalog.Table;
@@ -30,6 +28,7 @@ import org.apache.doris.common.Pair;
import org.apache.doris.common.UserException;
import org.apache.doris.common.io.Writable;
import org.apache.doris.common.util.MetaLockUtils;
+import org.apache.doris.persist.BatchRemoveTransactionsOperation;
import org.apache.doris.persist.EditLog;
import org.apache.doris.thrift.TUniqueId;
import org.apache.doris.transaction.TransactionState.LoadJobSourceType;
@@ -38,6 +37,8 @@ import org.apache.doris.transaction.TransactionState.TxnCoordinator;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.lang3.time.StopWatch;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@@ -325,18 +326,31 @@ public class GlobalTransactionMgr implements Writable {
} catch (AnalysisException e) {
LOG.warn("replay upsert transaction [" + transactionState.getTransactionId() + "] failed", e);
}
-
}
-
+
+    // Use replayBatchRemoveTransactions instead (OP_DELETE_TRANSACTION_STATE is deprecated)
+    @Deprecated
public void replayDeleteTransactionState(TransactionState transactionState) {
try {
DatabaseTransactionMgr dbTransactionMgr = getDatabaseTransactionMgr(transactionState.getDbId());
-            dbTransactionMgr.deleteTransaction(transactionState);
+            dbTransactionMgr.replayDeleteTransaction(transactionState);
} catch (AnalysisException e) {
LOG.warn("replay delete transaction [" + transactionState.getTransactionId() + "] failed", e);
}
}
+    public void replayBatchRemoveTransactions(BatchRemoveTransactionsOperation operation) {
+        // replay each db independently so that a failure for one db does not stop the others
+        for (Map.Entry<Long, List<Long>> entry : operation.getDbTxnIds().entrySet()) {
+            Long dbId = entry.getKey();
+            try {
+                getDatabaseTransactionMgr(dbId).replayBatchRemoveTransaction(entry.getValue());
+            } catch (AnalysisException e) {
+                LOG.warn("replay batch remove transactions failed. db " + dbId, e);
+            }
+        }
+    }
+
public List<List<Comparable>> getDbInfo() {
List<List<Comparable>> infos = new ArrayList<List<Comparable>>();
List<Long> dbIds = Lists.newArrayList(dbIdToDatabaseTransactionMgrs.keySet());
diff --git a/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionState.java b/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionState.java
index 6ec245c..40c8cfc 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionState.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionState.java
@@ -485,7 +485,21 @@ public class TransactionState implements Writable {
// return true if txn is in final status and label is expired
public boolean isExpired(long currentMillis) {
- return transactionStatus.isFinalStatus() && (currentMillis - finishTime) / 1000 > Config.label_keep_max_second;
+        if (!transactionStatus.isFinalStatus()) {
+            return false;
+        }
+        // Short txns (see isShortTxn) are high-frequency, so their labels use the dedicated
+        // streaming retention setting instead of the general label_keep_max_second.
+        long expireTime = isShortTxn() ? Config.streaming_label_keep_max_second : Config.label_keep_max_second;
+        // currentMillis/finishTime are in milliseconds; the retention settings are in seconds.
+        return (currentMillis - finishTime) / 1000 > expireTime;
+    }
+
+ // Return true if this txn is related to streaming load/insert/routine load task.
+ // We call these tasks "Short" tasks because they will be cleaned up in a short time after they are finished.
+ public boolean isShortTxn() {
+ return sourceType == LoadJobSourceType.BACKEND_STREAMING || sourceType == LoadJobSourceType.INSERT_STREAMING
+ || sourceType == LoadJobSourceType.ROUTINE_LOAD_TASK;
}
// return true if txn is running but timeout
diff --git a/fe/fe-core/src/test/java/org/apache/doris/catalog/FakeEditLog.java b/fe/fe-core/src/test/java/org/apache/doris/catalog/FakeEditLog.java
index d04e8b8..06b7c0f 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/catalog/FakeEditLog.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/catalog/FakeEditLog.java
@@ -22,6 +22,7 @@ import org.apache.doris.alter.BatchAlterJobPersistInfo;
import org.apache.doris.alter.RollupJob;
import org.apache.doris.alter.SchemaChangeJob;
import org.apache.doris.cluster.Cluster;
+import org.apache.doris.persist.BatchRemoveTransactionsOperation;
import org.apache.doris.persist.EditLog;
import org.apache.doris.persist.ModifyTablePropertyOperationLog;
import org.apache.doris.persist.RoutineLoadOperation;
@@ -113,6 +114,11 @@ public class FakeEditLog extends MockUp<EditLog> {
}
+    @Mock
+    public void logBatchRemoveTransactions(BatchRemoveTransactionsOperation operation) {
+        // Intentionally a no-op: this fake does not record batch-remove edit logs.
+    }
+
public TransactionState getTransaction(long transactionId) {
return allTransactionState.get(transactionId);
}
diff --git a/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/LoadManagerTest.java b/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/LoadManagerTest.java
index f6a4e0c..d8ae0a5 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/LoadManagerTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/LoadManagerTest.java
@@ -161,6 +161,9 @@ public class LoadManagerTest {
table.getName();
minTimes = 0;
result = "tablename";
+ Catalog.getCurrentCatalogJournalVersion();
+ minTimes = 0;
+ result = FeMetaVersion.VERSION_CURRENT;
}
};
@@ -169,7 +172,7 @@ public class LoadManagerTest {
Deencapsulation.invoke(loadManager, "addLoadJob", job1);
//make job1 don't serialize
- Config.label_keep_max_second = 1;
+ Config.streaming_label_keep_max_second = 1;
Thread.sleep(2000);
File file = serializeToFile(loadManager);
diff --git a/fe/fe-core/src/test/java/org/apache/doris/persist/BatchRemoveTransactionOperationTest.java b/fe/fe-core/src/test/java/org/apache/doris/persist/BatchRemoveTransactionOperationTest.java
new file mode 100644
index 0000000..85532fb
--- /dev/null
+++ b/fe/fe-core/src/test/java/org/apache/doris/persist/BatchRemoveTransactionOperationTest.java
@@ -0,0 +1,74 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.persist;
+
+import org.apache.doris.common.FeConstants;
+import org.apache.doris.meta.MetaContext;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Round-trip serialization test for {@link BatchRemoveTransactionsOperation}.
+ */
+public class BatchRemoveTransactionOperationTest {
+    @Test
+    public void testSerialization() throws Exception {
+        MetaContext metaContext = new MetaContext();
+        metaContext.setMetaVersion(FeConstants.meta_version);
+        metaContext.setThreadLocalInfo();
+
+        Map<Long, List<Long>> dbTxnIds = Maps.newHashMap();
+        dbTxnIds.put(1000L, Lists.newArrayList(1L, 2L, 3L));
+        BatchRemoveTransactionsOperation op = new BatchRemoveTransactionsOperation(dbTxnIds);
+
+        // Use a temp file so the test neither depends on nor litters the working directory.
+        File file = File.createTempFile("BatchRemoveTransactionOperationTest", ".tmp");
+        try {
+            // 1. Write the operation to the file; closing the stream flushes it.
+            try (DataOutputStream dos = new DataOutputStream(new FileOutputStream(file))) {
+                op.write(dos);
+            }
+
+            // 2. Read it back and check that the db -> txn ids mapping is fully preserved.
+            try (DataInputStream dis = new DataInputStream(new FileInputStream(file))) {
+                BatchRemoveTransactionsOperation op2 = BatchRemoveTransactionsOperation.read(dis);
+                Assert.assertEquals(1, op2.getDbTxnIds().size());
+                List<Long> txnIds = op2.getDbTxnIds().get(1000L);
+                Assert.assertEquals(3, txnIds.size());
+                Assert.assertTrue(txnIds.contains(1L));
+                Assert.assertTrue(txnIds.contains(2L));
+                Assert.assertTrue(txnIds.contains(3L));
+            }
+        } finally {
+            // 3. Always delete the file, even if an assertion above fails.
+            file.delete();
+        }
+    }
+}
diff --git a/fe/fe-core/src/test/java/org/apache/doris/transaction/DatabaseTransactionMgrTest.java b/fe/fe-core/src/test/java/org/apache/doris/transaction/DatabaseTransactionMgrTest.java
index c723fa3..1caf243 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/transaction/DatabaseTransactionMgrTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/transaction/DatabaseTransactionMgrTest.java
@@ -17,8 +17,6 @@
package org.apache.doris.transaction;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
import org.apache.doris.catalog.Catalog;
import org.apache.doris.catalog.CatalogTestUtil;
import org.apache.doris.catalog.FakeCatalog;
@@ -32,6 +30,9 @@ import org.apache.doris.common.UserException;
import org.apache.doris.common.util.TimeUtils;
import org.apache.doris.meta.MetaContext;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
@@ -264,7 +265,7 @@ public class DatabaseTransactionMgrTest {
DatabaseTransactionMgr masterDbTransMgr = masterTransMgr.getDatabaseTransactionMgr(CatalogTestUtil.testDbId1);
long txnId = LabelToTxnId.get(CatalogTestUtil.testTxnLabel1);
TransactionState transactionState = masterDbTransMgr.getTransactionState(txnId);
- masterDbTransMgr.deleteTransaction(transactionState);
+ masterDbTransMgr.replayDeleteTransaction(transactionState);
assertEquals(2, masterDbTransMgr.getRunningTxnNums());
assertEquals(1, masterDbTransMgr.getRunningRoutineLoadTxnNums());
assertEquals(0, masterDbTransMgr.getFinishedTxnNums());
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org