You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by mo...@apache.org on 2020/03/27 12:16:37 UTC
[incubator-doris] branch master updated: Support determine
isPreviousLoadFinished for some alter jobs in table level (#3196)
This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git
The following commit(s) were added to refs/heads/master by this push:
new 32c4fc6 Support determine isPreviousLoadFinished for some alter jobs in table level (#3196)
32c4fc6 is described below
commit 32c4fc691cd79ccf47023ed02160485b1906579b
Author: caiconghui <55...@users.noreply.github.com>
AuthorDate: Fri Mar 27 07:16:23 2020 -0500
Support determine isPreviousLoadFinished for some alter jobs in table level (#3196)
This PR is to reduce the time cost for waiting transactions to be completed in same db by filter the running transactions in table level.
NOTICE: Update FE meta version to 79
---
.../main/java/org/apache/doris/alter/AlterJob.java | 3 +-
.../java/org/apache/doris/alter/RollupJobV2.java | 2 +-
.../org/apache/doris/alter/SchemaChangeJobV2.java | 2 +-
.../java/org/apache/doris/analysis/InsertStmt.java | 3 +-
.../org/apache/doris/clone/TabletScheduler.java | 2 +-
.../org/apache/doris/common/FeMetaVersion.java | 5 ++-
fe/src/main/java/org/apache/doris/load/Load.java | 3 +-
.../main/java/org/apache/doris/load/LoadJob.java | 6 +++
.../apache/doris/load/loadv2/BrokerLoadJob.java | 3 +-
.../java/org/apache/doris/load/loadv2/LoadJob.java | 1 -
.../org/apache/doris/load/loadv2/LoadManager.java | 4 +-
.../org/apache/doris/load/loadv2/MiniLoadJob.java | 8 +++-
.../load/routineload/RoutineLoadTaskInfo.java | 3 +-
.../apache/doris/service/FrontendServiceImpl.java | 3 +-
.../org/apache/doris/task/LoadPendingTask.java | 4 +-
.../doris/transaction/GlobalTransactionMgr.java | 51 ++++++++++++++--------
.../apache/doris/transaction/TransactionState.java | 27 +++++++++++-
.../org/apache/doris/load/loadv2/LoadJobTest.java | 3 +-
.../transaction/GlobalTransactionMgrTest.java | 24 +++++-----
19 files changed, 108 insertions(+), 49 deletions(-)
diff --git a/fe/src/main/java/org/apache/doris/alter/AlterJob.java b/fe/src/main/java/org/apache/doris/alter/AlterJob.java
index d0aaaa4..ab927cc 100644
--- a/fe/src/main/java/org/apache/doris/alter/AlterJob.java
+++ b/fe/src/main/java/org/apache/doris/alter/AlterJob.java
@@ -17,6 +17,7 @@
package org.apache.doris.alter;
+import com.google.common.collect.Lists;
import org.apache.doris.catalog.Catalog;
import org.apache.doris.catalog.Database;
import org.apache.doris.catalog.OlapTable;
@@ -291,7 +292,7 @@ public abstract class AlterJob implements Writable {
return true;
} else {
isPreviousLoadFinished = Catalog.getCurrentGlobalTransactionMgr().isPreviousTransactionsFinished(
- transactionId, dbId);
+ transactionId, dbId, Lists.newArrayList(tableId));
return isPreviousLoadFinished;
}
}
diff --git a/fe/src/main/java/org/apache/doris/alter/RollupJobV2.java b/fe/src/main/java/org/apache/doris/alter/RollupJobV2.java
index 031483c..877d955 100644
--- a/fe/src/main/java/org/apache/doris/alter/RollupJobV2.java
+++ b/fe/src/main/java/org/apache/doris/alter/RollupJobV2.java
@@ -498,7 +498,7 @@ public class RollupJobV2 extends AlterJobV2 {
// Check whether transactions of the given database which txnId is less than 'watershedTxnId' are finished.
protected boolean isPreviousLoadFinished() {
- return Catalog.getCurrentGlobalTransactionMgr().isPreviousTransactionsFinished(watershedTxnId, dbId);
+ return Catalog.getCurrentGlobalTransactionMgr().isPreviousTransactionsFinished(watershedTxnId, dbId, Lists.newArrayList(tableId));
}
public static RollupJobV2 read(DataInput in) throws IOException {
diff --git a/fe/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java b/fe/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java
index 96025fd..e18e6f9 100644
--- a/fe/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java
+++ b/fe/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java
@@ -617,7 +617,7 @@ public class SchemaChangeJobV2 extends AlterJobV2 {
// Check whether transactions of the given database which txnId is less than 'watershedTxnId' are finished.
protected boolean isPreviousLoadFinished() {
- return Catalog.getCurrentGlobalTransactionMgr().isPreviousTransactionsFinished(watershedTxnId, dbId);
+ return Catalog.getCurrentGlobalTransactionMgr().isPreviousTransactionsFinished(watershedTxnId, dbId, Lists.newArrayList(tableId));
}
public static SchemaChangeJobV2 read(DataInput in) throws IOException {
diff --git a/fe/src/main/java/org/apache/doris/analysis/InsertStmt.java b/fe/src/main/java/org/apache/doris/analysis/InsertStmt.java
index 2351cb1..50b94e2 100644
--- a/fe/src/main/java/org/apache/doris/analysis/InsertStmt.java
+++ b/fe/src/main/java/org/apache/doris/analysis/InsertStmt.java
@@ -292,7 +292,8 @@ public class InsertStmt extends DdlStmt {
if (targetTable instanceof OlapTable) {
LoadJobSourceType sourceType = LoadJobSourceType.INSERT_STREAMING;
transactionId = Catalog.getCurrentGlobalTransactionMgr().beginTransaction(db.getId(),
- label, "FE: " + FrontendOptions.getLocalHostAddress(), sourceType, timeoutSecond);
+ Lists.newArrayList(targetTable.getId()), label, "FE: " + FrontendOptions.getLocalHostAddress(),
+ sourceType, timeoutSecond);
}
isTransactionBegin = true;
}
diff --git a/fe/src/main/java/org/apache/doris/clone/TabletScheduler.java b/fe/src/main/java/org/apache/doris/clone/TabletScheduler.java
index 0e3e194..036fe6e 100644
--- a/fe/src/main/java/org/apache/doris/clone/TabletScheduler.java
+++ b/fe/src/main/java/org/apache/doris/clone/TabletScheduler.java
@@ -880,7 +880,7 @@ public class TabletScheduler extends MasterDaemon {
} else if (replica.getState() == ReplicaState.DECOMMISSION && replica.getWatermarkTxnId() != -1) {
long watermarkTxnId = replica.getWatermarkTxnId();
if (!Catalog.getCurrentGlobalTransactionMgr().isPreviousTransactionsFinished(watermarkTxnId,
- tabletCtx.getDbId())) {
+ tabletCtx.getDbId(), Lists.newArrayList(tabletCtx.getTblId()))) {
throw new SchedException(Status.SCHEDULE_FAILED, "wait txn before " + watermarkTxnId + " to be finished");
}
}
diff --git a/fe/src/main/java/org/apache/doris/common/FeMetaVersion.java b/fe/src/main/java/org/apache/doris/common/FeMetaVersion.java
index ed32274..19b2f63 100644
--- a/fe/src/main/java/org/apache/doris/common/FeMetaVersion.java
+++ b/fe/src/main/java/org/apache/doris/common/FeMetaVersion.java
@@ -167,6 +167,9 @@ public final class FeMetaVersion {
public static final int VERSION_77 = 77;
// plugin support
public static final int VERSION_78 = 78;
+ // for transaction state in table level
+ public static final int VERSION_79 = 79;
+
// note: when increment meta version, should assign the latest version to VERSION_CURRENT
- public static final int VERSION_CURRENT = VERSION_78;
+ public static final int VERSION_CURRENT = VERSION_79;
}
diff --git a/fe/src/main/java/org/apache/doris/load/Load.java b/fe/src/main/java/org/apache/doris/load/Load.java
index 2990a09..8b5e8b8 100644
--- a/fe/src/main/java/org/apache/doris/load/Load.java
+++ b/fe/src/main/java/org/apache/doris/load/Load.java
@@ -3326,7 +3326,8 @@ public class Load {
}
loadDeleteJob.setIdToTabletLoadInfo(idToTabletLoadInfo);
loadDeleteJob.setState(JobState.LOADING);
- long transactionId = Catalog.getCurrentGlobalTransactionMgr().beginTransaction(db.getId(), jobLabel,
+ long transactionId = Catalog.getCurrentGlobalTransactionMgr().beginTransaction(db.getId(),
+ Lists.newArrayList(table.getId()), jobLabel,
"FE: " + FrontendOptions.getLocalHostAddress(), LoadJobSourceType.FRONTEND,
Config.stream_load_default_timeout_second);
loadDeleteJob.setTransactionId(transactionId);
diff --git a/fe/src/main/java/org/apache/doris/load/LoadJob.java b/fe/src/main/java/org/apache/doris/load/LoadJob.java
index 450dfdb..ef0e26c 100644
--- a/fe/src/main/java/org/apache/doris/load/LoadJob.java
+++ b/fe/src/main/java/org/apache/doris/load/LoadJob.java
@@ -78,6 +78,7 @@ public class LoadJob implements Writable {
private long id;
private long dbId;
+ private long tableId;
private String label;
// when this job is a real time load job, the job is attach with a transaction
private long transactionId = -1;
@@ -144,6 +145,7 @@ public class LoadJob implements Writable {
DeleteInfo deleteInfo) {
this.id = id;
this.dbId = dbId;
+ this.tableId = tableId;
this.label = label;
this.transactionId = -1;
this.timestamp = -1;
@@ -243,6 +245,10 @@ public class LoadJob implements Writable {
return dbId;
}
+ public long getTableId() {
+ return tableId;
+ }
+
public void setDbId(long dbId) {
this.dbId = dbId;
}
diff --git a/fe/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java b/fe/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java
index 62c10b5..eb7938e 100644
--- a/fe/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java
+++ b/fe/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java
@@ -208,7 +208,8 @@ public class BrokerLoadJob extends LoadJob {
public void beginTxn()
throws LabelAlreadyUsedException, BeginTransactionException, AnalysisException, DuplicatedRequestException {
transactionId = Catalog.getCurrentGlobalTransactionMgr()
- .beginTransaction(dbId, label, null, "FE: " + FrontendOptions.getLocalHostAddress(),
+ .beginTransaction(dbId, Lists.newArrayList(fileGroupAggInfo.getAllTableIds()), label, null,
+ "FE: " + FrontendOptions.getLocalHostAddress(),
TransactionState.LoadJobSourceType.BATCH_LOAD_JOB, id,
timeoutSecond);
}
diff --git a/fe/src/main/java/org/apache/doris/load/loadv2/LoadJob.java b/fe/src/main/java/org/apache/doris/load/loadv2/LoadJob.java
index 8f68309..08efaeb 100644
--- a/fe/src/main/java/org/apache/doris/load/loadv2/LoadJob.java
+++ b/fe/src/main/java/org/apache/doris/load/loadv2/LoadJob.java
@@ -222,7 +222,6 @@ public abstract class LoadJob extends AbstractTxnStateChangeCallback implements
public long getDbId() {
return dbId;
}
-
public String getLabel() {
return label;
}
diff --git a/fe/src/main/java/org/apache/doris/load/loadv2/LoadManager.java b/fe/src/main/java/org/apache/doris/load/loadv2/LoadManager.java
index 4490a1b..5104094 100644
--- a/fe/src/main/java/org/apache/doris/load/loadv2/LoadManager.java
+++ b/fe/src/main/java/org/apache/doris/load/loadv2/LoadManager.java
@@ -23,6 +23,7 @@ import org.apache.doris.analysis.CancelLoadStmt;
import org.apache.doris.analysis.LoadStmt;
import org.apache.doris.catalog.Catalog;
import org.apache.doris.catalog.Database;
+import org.apache.doris.catalog.Table;
import org.apache.doris.cluster.ClusterNamespace;
import org.apache.doris.common.Config;
import org.apache.doris.common.DdlException;
@@ -137,11 +138,12 @@ public class LoadManager implements Writable{
cluster = request.getCluster();
}
Database database = checkDb(ClusterNamespace.getFullName(cluster, request.getDb()));
+ Table table = database.getTable(request.tbl);
checkTable(database, request.getTbl());
LoadJob loadJob = null;
writeLock();
try {
- loadJob = new MiniLoadJob(database.getId(), request);
+ loadJob = new MiniLoadJob(database.getId(), table.getId(), request);
// call unprotectedExecute before adding load job. so that if job is not started ok, no need to add.
// NOTICE(cmy): this order is only for Mini Load, because mini load's unprotectedExecute() only do beginTxn().
// for other kind of load job, execute the job after adding job.
diff --git a/fe/src/main/java/org/apache/doris/load/loadv2/MiniLoadJob.java b/fe/src/main/java/org/apache/doris/load/loadv2/MiniLoadJob.java
index 94d4707..68def55 100644
--- a/fe/src/main/java/org/apache/doris/load/loadv2/MiniLoadJob.java
+++ b/fe/src/main/java/org/apache/doris/load/loadv2/MiniLoadJob.java
@@ -17,6 +17,7 @@
package org.apache.doris.load.loadv2;
+import com.google.common.collect.Lists;
import org.apache.doris.catalog.AuthorizationInfo;
import org.apache.doris.catalog.Catalog;
import org.apache.doris.catalog.Database;
@@ -47,14 +48,17 @@ public class MiniLoadJob extends LoadJob {
private String tableName;
+ private long tableId;
+
// only for log replay
public MiniLoadJob() {
super();
this.jobType = EtlJobType.MINI;
}
- public MiniLoadJob(long dbId, TMiniLoadBeginRequest request) throws MetaNotFoundException {
+ public MiniLoadJob(long dbId, long tableId, TMiniLoadBeginRequest request) throws MetaNotFoundException {
super(dbId, request.getLabel());
+ this.tableId = tableId;
this.jobType = EtlJobType.MINI;
this.tableName = request.getTbl();
if (request.isSetTimeout_second()) {
@@ -93,7 +97,7 @@ public class MiniLoadJob extends LoadJob {
public void beginTxn()
throws LabelAlreadyUsedException, BeginTransactionException, AnalysisException, DuplicatedRequestException {
transactionId = Catalog.getCurrentGlobalTransactionMgr()
- .beginTransaction(dbId, label, requestId, "FE: " + FrontendOptions.getLocalHostAddress(),
+ .beginTransaction(dbId, Lists.newArrayList(tableId), label, requestId, "FE: " + FrontendOptions.getLocalHostAddress(),
TransactionState.LoadJobSourceType.BACKEND_STREAMING, id,
timeoutSecond);
}
diff --git a/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskInfo.java b/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskInfo.java
index 3af9fb4..3a7af6d 100644
--- a/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskInfo.java
+++ b/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskInfo.java
@@ -164,7 +164,8 @@ public abstract class RoutineLoadTaskInfo {
RoutineLoadJob routineLoadJob = routineLoadManager.getJob(jobId);
try {
txnId = Catalog.getCurrentGlobalTransactionMgr().beginTransaction(
- routineLoadJob.getDbId(), DebugUtil.printId(id), null, "FE: " + FrontendOptions.getLocalHostAddress(),
+ routineLoadJob.getDbId(), Lists.newArrayList(routineLoadJob.getTableId()), DebugUtil.printId(id), null,
+ "FE: " + FrontendOptions.getLocalHostAddress(),
TransactionState.LoadJobSourceType.ROUTINE_LOAD_TASK, routineLoadJob.getId(),
timeoutMs / 1000);
} catch (DuplicatedRequestException e) {
diff --git a/fe/src/main/java/org/apache/doris/service/FrontendServiceImpl.java b/fe/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
index 9a6075f..78b2138 100644
--- a/fe/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
+++ b/fe/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
@@ -670,6 +670,7 @@ public class FrontendServiceImpl implements FrontendService.Iface {
Catalog catalog = Catalog.getInstance();
String fullDbName = ClusterNamespace.getFullName(cluster, request.getDb());
Database db = catalog.getDb(fullDbName);
+ Table table = db.getTable(request.tbl);
if (db == null) {
String dbName = fullDbName;
if (Strings.isNullOrEmpty(request.getCluster())) {
@@ -681,7 +682,7 @@ public class FrontendServiceImpl implements FrontendService.Iface {
// begin
long timeoutSecond = request.isSetTimeout() ? request.getTimeout() : Config.stream_load_default_timeout_second;
return Catalog.getCurrentGlobalTransactionMgr().beginTransaction(
- db.getId(), request.getLabel(), request.getRequest_id(), "BE: " + clientIp,
+ db.getId(), Lists.newArrayList(table.getId()), request.getLabel(), request.getRequest_id(), "BE: " + clientIp,
TransactionState.LoadJobSourceType.BACKEND_STREAMING, -1, timeoutSecond);
}
diff --git a/fe/src/main/java/org/apache/doris/task/LoadPendingTask.java b/fe/src/main/java/org/apache/doris/task/LoadPendingTask.java
index ef13da8..b9fcc87 100644
--- a/fe/src/main/java/org/apache/doris/task/LoadPendingTask.java
+++ b/fe/src/main/java/org/apache/doris/task/LoadPendingTask.java
@@ -17,6 +17,7 @@
package org.apache.doris.task;
+import com.google.common.collect.Lists;
import org.apache.doris.catalog.Catalog;
import org.apache.doris.catalog.Database;
import org.apache.doris.common.util.DebugUtil;
@@ -67,6 +68,7 @@ public abstract class LoadPendingTask extends MasterTask {
// get db
long dbId = job.getDbId();
+ long tableId = job.getTableId();
db = Catalog.getInstance().getDb(dbId);
if (db == null) {
load.cancelLoadJob(job, CancelType.ETL_SUBMIT_FAIL, "db does not exist. id: " + dbId);
@@ -78,7 +80,7 @@ public abstract class LoadPendingTask extends MasterTask {
// create etl request and make some guarantee for schema change and rollup
if (job.getTransactionId() < 0) {
long transactionId = Catalog.getCurrentGlobalTransactionMgr()
- .beginTransaction(dbId, DebugUtil.printId(UUID.randomUUID()),
+ .beginTransaction(dbId, Lists.newArrayList(tableId), DebugUtil.printId(UUID.randomUUID()),
"FE: " + FrontendOptions.getLocalHostAddress(), LoadJobSourceType.FRONTEND,
job.getTimeoutSecond());
job.setTransactionId(transactionId);
diff --git a/fe/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java b/fe/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java
index 2279ffc..e201f17 100644
--- a/fe/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java
+++ b/fe/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java
@@ -17,6 +17,7 @@
package org.apache.doris.transaction;
+import org.apache.commons.collections.CollectionUtils;
import org.apache.doris.catalog.Catalog;
import org.apache.doris.catalog.Database;
import org.apache.doris.catalog.MaterializedIndex;
@@ -121,10 +122,10 @@ public class GlobalTransactionMgr implements Writable {
return callbackFactory;
}
- public long beginTransaction(long dbId, String label, String coordinator, LoadJobSourceType sourceType,
+ public long beginTransaction(long dbId, List<Long> tableIdList, String label, String coordinator, LoadJobSourceType sourceType,
long timeoutSecond)
throws AnalysisException, LabelAlreadyUsedException, BeginTransactionException, DuplicatedRequestException {
- return beginTransaction(dbId, label, null, coordinator, sourceType, -1, timeoutSecond);
+ return beginTransaction(dbId, tableIdList, label, null, coordinator, sourceType, -1, timeoutSecond);
}
/**
@@ -140,7 +141,7 @@ public class GlobalTransactionMgr implements Writable {
* @throws DuplicatedRequestException
* @throws IllegalTransactionParameterException
*/
- public long beginTransaction(long dbId, String label, TUniqueId requestId,
+ public long beginTransaction(long dbId, List<Long> tableIdList, String label, TUniqueId requestId,
String coordinator, LoadJobSourceType sourceType, long listenerId, long timeoutSecond)
throws AnalysisException, LabelAlreadyUsedException, BeginTransactionException, DuplicatedRequestException {
@@ -196,7 +197,7 @@ public class GlobalTransactionMgr implements Writable {
long tid = idGenerator.getNextTransactionId();
LOG.info("begin transaction: txn id {} with label {} from coordinator {}", tid, label, coordinator);
- TransactionState transactionState = new TransactionState(dbId, tid, label, requestId, sourceType,
+ TransactionState transactionState = new TransactionState(dbId, tableIdList, tid, label, requestId, sourceType,
coordinator, listenerId, timeoutSecond * 1000);
transactionState.setPrepareTime(System.currentTimeMillis());
unprotectUpsertTransactionState(transactionState);
@@ -802,25 +803,37 @@ public class GlobalTransactionMgr implements Writable {
}
// check if there exists a load job before the endTransactionId have all finished
- // load job maybe started but could not know the affected table id, so that we not check by table
- public boolean isPreviousTransactionsFinished(long endTransactionId, long dbId) {
- readLock();
- try {
- for (Map.Entry<Long, TransactionState> entry : idToTransactionState.entrySet()) {
- if (entry.getValue().getDbId() != dbId || !entry.getValue().isRunning()) {
- continue;
- }
- if (entry.getKey() <= endTransactionId) {
- LOG.debug("find a running txn with txn_id={} on db: {}, less than watermark txn_id {}",
- entry.getKey(), dbId, endTransactionId);
- return false;
- }
+ public boolean isPreviousTransactionsFinished(long endTransactionId, long dbId, List<Long> tableIdList) {
+ for (Map.Entry<Long, TransactionState> entry : idToTransactionState.entrySet()) {
+ if (entry.getValue().getDbId() != dbId || !isIntersectionNotEmpty(entry.getValue().getTableIdList(),
+ tableIdList) || !entry.getValue().isRunning()) {
+ continue;
+ }
+ if (entry.getKey() <= endTransactionId) {
+ LOG.debug("find a running txn with txn_id={} on db: {}, less than watermark txn_id {}",
+ entry.getKey(), dbId, endTransactionId);
+ return false;
}
- } finally {
- readUnlock();
}
return true;
}
+
+ // check if there exists a intersection between the source tableId list and target tableId list
+ // if one of them is null or empty, that means that we don't know related tables in tableList,
+ // we think the two lists may have intersection for right ordered txns
+ public boolean isIntersectionNotEmpty(List<Long> sourceTableIdList, List<Long> targetTableIdList) {
+ if (CollectionUtils.isEmpty(sourceTableIdList) || CollectionUtils.isEmpty(targetTableIdList)) {
+ return true;
+ }
+ for (int i = 0; i < sourceTableIdList.size(); i++) {
+ for (int j = 0; j < targetTableIdList.size(); j++) {
+ if (sourceTableIdList.get(i).equals(targetTableIdList.get(j))) {
+ return true;
+ }
+ }
+ }
+ return false;
+ }
/*
* The txn cleaner will run at a fixed interval and try to delete expired and timeout txns:
diff --git a/fe/src/main/java/org/apache/doris/transaction/TransactionState.java b/fe/src/main/java/org/apache/doris/transaction/TransactionState.java
index 0a967ac..e261f7f 100644
--- a/fe/src/main/java/org/apache/doris/transaction/TransactionState.java
+++ b/fe/src/main/java/org/apache/doris/transaction/TransactionState.java
@@ -17,6 +17,8 @@
package org.apache.doris.transaction;
+import com.google.common.collect.Lists;
+import org.apache.commons.lang3.StringUtils;
import org.apache.doris.catalog.Catalog;
import org.apache.doris.catalog.OlapTable;
import org.apache.doris.common.Config;
@@ -40,6 +42,7 @@ import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.Comparator;
+import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
@@ -126,6 +129,7 @@ public class TransactionState implements Writable {
}
private long dbId;
+ private List<Long> tableIdList;
private long transactionId;
private String label;
// requsetId is used to judge whether a begin request is a internal retry request.
@@ -168,6 +172,7 @@ public class TransactionState implements Writable {
public TransactionState() {
this.dbId = -1;
+ this.tableIdList = Lists.newArrayList();
this.transactionId = -1;
this.label = "";
this.idToTableCommitInfos = Maps.newHashMap();
@@ -184,9 +189,10 @@ public class TransactionState implements Writable {
this.latch = new CountDownLatch(1);
}
- public TransactionState(long dbId, long transactionId, String label, TUniqueId requsetId,
+ public TransactionState(long dbId, List<Long> tableIdList, long transactionId, String label, TUniqueId requsetId,
LoadJobSourceType sourceType, String coordinator, long callbackId, long timeoutMs) {
this.dbId = dbId;
+ this.tableIdList = (tableIdList == null ? Lists.newArrayList() : tableIdList);
this.transactionId = transactionId;
this.label = label;
this.requsetId = requsetId;
@@ -408,7 +414,11 @@ public class TransactionState implements Writable {
public long getDbId() {
return dbId;
}
-
+
+ public List<Long> getTableIdList() {
+ return tableIdList;
+ }
+
public Map<Long, TableCommitInfo> getIdToTableCommitInfos() {
return idToTableCommitInfos;
}
@@ -467,6 +477,7 @@ public class TransactionState implements Writable {
sb.append("transaction id: ").append(transactionId);
sb.append(", label: ").append(label);
sb.append(", db id: ").append(dbId);
+ sb.append(", table id list: ").append(StringUtils.join(tableIdList, ","));
sb.append(", callback id: ").append(callbackId);
sb.append(", coordinator: ").append(coordinator);
sb.append(", transaction status: ").append(transactionStatus);
@@ -533,6 +544,10 @@ public class TransactionState implements Writable {
}
out.writeLong(callbackId);
out.writeLong(timeoutMs);
+ out.writeInt(tableIdList.size());
+ for (int i = 0; i < tableIdList.size(); i++) {
+ out.writeLong(tableIdList.get(i));
+ }
}
public void readFields(DataInput in) throws IOException {
@@ -564,5 +579,13 @@ public class TransactionState implements Writable {
callbackId = in.readLong();
timeoutMs = in.readLong();
}
+
+ if (Catalog.getCurrentCatalogJournalVersion() >= FeMetaVersion.VERSION_79) {
+ tableIdList = Lists.newArrayList();
+ int tableListSize = in.readInt();
+ for (int i = 0; i < tableListSize; i++) {
+ tableIdList.add(in.readLong());
+ }
+ }
}
}
diff --git a/fe/src/test/java/org/apache/doris/load/loadv2/LoadJobTest.java b/fe/src/test/java/org/apache/doris/load/loadv2/LoadJobTest.java
index ec4533f..2feecfc 100644
--- a/fe/src/test/java/org/apache/doris/load/loadv2/LoadJobTest.java
+++ b/fe/src/test/java/org/apache/doris/load/loadv2/LoadJobTest.java
@@ -18,6 +18,7 @@
package org.apache.doris.load.loadv2;
+import com.google.common.collect.Lists;
import org.apache.doris.analysis.LoadStmt;
import org.apache.doris.catalog.Catalog;
import org.apache.doris.common.AnalysisException;
@@ -106,7 +107,7 @@ public class LoadJobTest {
LoadJob loadJob = new BrokerLoadJob();
new Expectations() {
{
- globalTransactionMgr.beginTransaction(anyLong, anyString, (TUniqueId) any, anyString,
+ globalTransactionMgr.beginTransaction(anyLong, Lists.newArrayList(), anyString, (TUniqueId) any, anyString,
(TransactionState.LoadJobSourceType) any, anyLong, anyLong);
minTimes = 0;
result = 1;
diff --git a/fe/src/test/java/org/apache/doris/transaction/GlobalTransactionMgrTest.java b/fe/src/test/java/org/apache/doris/transaction/GlobalTransactionMgrTest.java
index e3a5f3f..860553c 100644
--- a/fe/src/test/java/org/apache/doris/transaction/GlobalTransactionMgrTest.java
+++ b/fe/src/test/java/org/apache/doris/transaction/GlobalTransactionMgrTest.java
@@ -106,7 +106,7 @@ public class GlobalTransactionMgrTest {
public void testBeginTransaction() throws LabelAlreadyUsedException, AnalysisException,
BeginTransactionException, DuplicatedRequestException {
FakeCatalog.setCatalog(masterCatalog);
- long transactionId = masterTransMgr.beginTransaction(CatalogTestUtil.testDbId1,
+ long transactionId = masterTransMgr.beginTransaction(CatalogTestUtil.testDbId1, Lists.newArrayList(CatalogTestUtil.testTableId1),
CatalogTestUtil.testTxnLable1,
transactionSource,
LoadJobSourceType.FRONTEND, Config.stream_load_default_timeout_second);
@@ -124,7 +124,7 @@ public class GlobalTransactionMgrTest {
FakeCatalog.setCatalog(masterCatalog);
long transactionId = 0;
try {
- transactionId = masterTransMgr.beginTransaction(CatalogTestUtil.testDbId1,
+ transactionId = masterTransMgr.beginTransaction(CatalogTestUtil.testDbId1, Lists.newArrayList(CatalogTestUtil.testTableId1),
CatalogTestUtil.testTxnLable1,
transactionSource,
LoadJobSourceType.FRONTEND, Config.stream_load_default_timeout_second);
@@ -141,7 +141,7 @@ public class GlobalTransactionMgrTest {
assertEquals(transactionSource, transactionState.getCoordinator());
try {
- transactionId = masterTransMgr.beginTransaction(CatalogTestUtil.testDbId1,
+ transactionId = masterTransMgr.beginTransaction(CatalogTestUtil.testDbId1, Lists.newArrayList(CatalogTestUtil.testTableId1),
CatalogTestUtil.testTxnLable1,
transactionSource,
LoadJobSourceType.FRONTEND, Config.stream_load_default_timeout_second);
@@ -154,7 +154,7 @@ public class GlobalTransactionMgrTest {
@Test
public void testCommitTransaction1() throws UserException {
FakeCatalog.setCatalog(masterCatalog);
- long transactionId = masterTransMgr.beginTransaction(CatalogTestUtil.testDbId1,
+ long transactionId = masterTransMgr.beginTransaction(CatalogTestUtil.testDbId1, Lists.newArrayList(CatalogTestUtil.testTableId1),
CatalogTestUtil.testTxnLable1,
transactionSource,
LoadJobSourceType.FRONTEND, Config.stream_load_default_timeout_second);
@@ -195,7 +195,7 @@ public class GlobalTransactionMgrTest {
public void testCommitTransactionWithOneFailed() throws UserException {
TransactionState transactionState = null;
FakeCatalog.setCatalog(masterCatalog);
- long transactionId = masterTransMgr.beginTransaction(CatalogTestUtil.testDbId1,
+ long transactionId = masterTransMgr.beginTransaction(CatalogTestUtil.testDbId1, Lists.newArrayList(CatalogTestUtil.testTableId1),
CatalogTestUtil.testTxnLable1,
transactionSource,
LoadJobSourceType.FRONTEND, Config.stream_load_default_timeout_second);
@@ -217,7 +217,7 @@ public class GlobalTransactionMgrTest {
FakeCatalog.setCatalog(masterCatalog);
// commit another transaction with 1,3 success
- long transactionId2 = masterTransMgr.beginTransaction(CatalogTestUtil.testDbId1,
+ long transactionId2 = masterTransMgr.beginTransaction(CatalogTestUtil.testDbId1, Lists.newArrayList(CatalogTestUtil.testTableId1),
CatalogTestUtil.testTxnLable2,
transactionSource,
LoadJobSourceType.FRONTEND, Config.stream_load_default_timeout_second);
@@ -320,7 +320,7 @@ public class GlobalTransactionMgrTest {
partitionIdToOffset);
Deencapsulation.setField(routineLoadTaskInfo, "txnId", 1L);
routineLoadTaskInfoList.add(routineLoadTaskInfo);
- TransactionState transactionState = new TransactionState(1L, 1L, "label", null,
+ TransactionState transactionState = new TransactionState(1L, Lists.newArrayList(1L), 1L, "label", null,
LoadJobSourceType.ROUTINE_LOAD_TASK, "be1", routineLoadJob.getId(),
Config.stream_load_default_timeout_second);
transactionState.setTransactionStatus(TransactionStatus.PREPARE);
@@ -387,7 +387,7 @@ public class GlobalTransactionMgrTest {
partitionIdToOffset);
Deencapsulation.setField(routineLoadTaskInfo, "txnId", 1L);
routineLoadTaskInfoList.add(routineLoadTaskInfo);
- TransactionState transactionState = new TransactionState(1L, 1L, "label", null,
+ TransactionState transactionState = new TransactionState(1L, Lists.newArrayList(1L), 1L, "label", null,
LoadJobSourceType.ROUTINE_LOAD_TASK, "be1", routineLoadJob.getId(),
Config.stream_load_default_timeout_second);
transactionState.setTransactionStatus(TransactionStatus.PREPARE);
@@ -431,7 +431,7 @@ public class GlobalTransactionMgrTest {
}
public void testFinishTransaction() throws UserException {
- long transactionId = masterTransMgr.beginTransaction(CatalogTestUtil.testDbId1,
+ long transactionId = masterTransMgr.beginTransaction(CatalogTestUtil.testDbId1, Lists.newArrayList(CatalogTestUtil.testTableId1),
CatalogTestUtil.testTxnLable1,
transactionSource,
LoadJobSourceType.FRONTEND, Config.stream_load_default_timeout_second);
@@ -477,7 +477,7 @@ public class GlobalTransactionMgrTest {
.getPartition(CatalogTestUtil.testPartition1);
Tablet tablet = testPartition.getIndex(CatalogTestUtil.testIndexId1).getTablet(CatalogTestUtil.testTabletId1);
FakeCatalog.setCatalog(masterCatalog);
- long transactionId = masterTransMgr.beginTransaction(CatalogTestUtil.testDbId1,
+ long transactionId = masterTransMgr.beginTransaction(CatalogTestUtil.testDbId1, Lists.newArrayList(CatalogTestUtil.testTableId1),
CatalogTestUtil.testTxnLable1,
transactionSource,
LoadJobSourceType.FRONTEND, Config.stream_load_default_timeout_second);
@@ -531,7 +531,7 @@ public class GlobalTransactionMgrTest {
FakeCatalog.setCatalog(masterCatalog);
// commit another transaction with 1,3 success
- long transactionId2 = masterTransMgr.beginTransaction(CatalogTestUtil.testDbId1,
+ long transactionId2 = masterTransMgr.beginTransaction(CatalogTestUtil.testDbId1, Lists.newArrayList(CatalogTestUtil.testTableId1),
CatalogTestUtil.testTxnLable2,
transactionSource,
LoadJobSourceType.FRONTEND, Config.stream_load_default_timeout_second);
@@ -603,7 +603,7 @@ public class GlobalTransactionMgrTest {
public void testDeleteTransaction() throws LabelAlreadyUsedException,
AnalysisException, BeginTransactionException, DuplicatedRequestException {
- long transactionId = masterTransMgr.beginTransaction(CatalogTestUtil.testDbId1,
+ long transactionId = masterTransMgr.beginTransaction(CatalogTestUtil.testDbId1, Lists.newArrayList(CatalogTestUtil.testTableId1),
CatalogTestUtil.testTxnLable1,
transactionSource,
LoadJobSourceType.FRONTEND, Config.stream_load_default_timeout_second);
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org