You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by mo...@apache.org on 2021/11/12 03:47:07 UTC
[incubator-doris] 02/09: [RoutineLoad] And "runningTxns" fields in
SHOW ROUTINE LOAD result (#6986)
This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch branch-0.15
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git
commit 42c93ff56bd4c097a178ddcd3757c8f224711704
Author: Mingyu Chen <mo...@gmail.com>
AuthorDate: Thu Nov 11 15:41:13 2021 +0800
[RoutineLoad] And "runningTxns" fields in SHOW ROUTINE LOAD result (#6986)
Add a new field `runningTxns` in the result of `SHOW ROUTINE LOAD`. eg:
```
Id: 11001
Name: test4
CreateTime: 2021-11-02 00:04:54
PauseTime: NULL
EndTime: NULL
DbName: default_cluster:db1
TableName: tbl1
State: RUNNING
DataSourceType: KAFKA
CurrentTaskNum: 1
JobProperties: {xxx}
CustomProperties: {"kafka_default_offsets":"OFFSET_BEGINNING","group.id":"test4"}
Statistic: {"receivedBytes":6,"runningTxns":[1001, 1002],"errorRows":0,"committedTaskNum":1,"loadedRows":2,"loadRowsRate":0,"abortedTaskNum":13,"errorRowsAfterResumed":0,"totalRows":2,"unselectedRows":0,"receivedBytesRate":0,"taskExecuteTimeMs":20965}
Progress: {"0":"10"}
ReasonOfStateChanged:
ErrorLogUrls:
OtherMsg:
```
So that user can view the status of corresponding transactions of this job by executing `show transaction where id=xx`;
---
.../java/org/apache/doris/load/routineload/RoutineLoadJob.java | 2 ++
.../org/apache/doris/load/routineload/RoutineLoadStatistic.java | 7 +++++++
.../org/apache/doris/load/routineload/RoutineLoadTaskInfo.java | 1 +
3 files changed, 10 insertions(+)
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
index b566029..a45b581 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
@@ -912,6 +912,7 @@ public abstract class RoutineLoadJob extends AbstractTxnStateChangeCallback impl
writeLock();
try {
+ this.jobStatistic.runningTxnIds.remove(txnState.getTransactionId());
if (state != JobState.RUNNING) {
// job is not running, nothing need to be done
return;
@@ -963,6 +964,7 @@ public abstract class RoutineLoadJob extends AbstractTxnStateChangeCallback impl
throws UserException {
long taskBeId = -1L;
try {
+ this.jobStatistic.runningTxnIds.remove(txnState.getTransactionId());
if (txnOperated) {
// step0: find task in job
Optional<RoutineLoadTaskInfo> routineLoadTaskInfoOptional = routineLoadTaskInfoList.stream().filter(
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadStatistic.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadStatistic.java
index c0b3b06..7a5ad3a 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadStatistic.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadStatistic.java
@@ -22,12 +22,14 @@ import org.apache.doris.common.io.Writable;
import org.apache.doris.persist.gson.GsonUtils;
import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
import com.google.gson.annotations.SerializedName;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.Map;
+import java.util.Set;
public class RoutineLoadStatistic implements Writable {
/*
@@ -62,6 +64,10 @@ public class RoutineLoadStatistic implements Writable {
@SerializedName(value = "abortedTaskNum")
public long abortedTaskNum = 0;
+ // Save all transactions current running. Including PREPARE, COMMITTED.
+ // No need to persist, only for tracing txn of routine load job.
+ public Set<Long> runningTxnIds = Sets.newHashSet();
+
@Override
public void write(DataOutput out) throws IOException {
String json = GsonUtils.GSON.toJson(this);
@@ -87,6 +93,7 @@ public class RoutineLoadStatistic implements Writable {
/ this.totalTaskExcutionTimeMs * 1000));
summary.put("committedTaskNum", Long.valueOf(this.committedTaskNum));
summary.put("abortedTaskNum", Long.valueOf(this.abortedTaskNum));
+ summary.put("runningTxns", runningTxnIds);
return summary;
}
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskInfo.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskInfo.java
index b535b94..50acd8b 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskInfo.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskInfo.java
@@ -189,6 +189,7 @@ public abstract class RoutineLoadTaskInfo {
DebugUtil.printId(id), jobId, e);
throw e;
}
+ routineLoadJob.jobStatistic.runningTxnIds.add(txnId);
return true;
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org