You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by mo...@apache.org on 2021/11/11 07:41:27 UTC

[incubator-doris] branch master updated: [RoutineLoad] And "runningTxns" fields in SHOW ROUTINE LOAD result (#6986)

This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git


The following commit(s) were added to refs/heads/master by this push:
     new cf085b8  [RoutineLoad] And "runningTxns" fields in SHOW ROUTINE LOAD result (#6986)
cf085b8 is described below

commit cf085b8b1a32d98144993ab4b6ccb2646af12501
Author: Mingyu Chen <mo...@gmail.com>
AuthorDate: Thu Nov 11 15:41:13 2021 +0800

    [RoutineLoad] And "runningTxns" fields in SHOW ROUTINE LOAD result (#6986)
    
    Add a new field `runningTxns` in the result of `SHOW ROUTINE LOAD`. eg:
    
    ```
                      Id: 11001
                    Name: test4
              CreateTime: 2021-11-02 00:04:54
               PauseTime: NULL
                 EndTime: NULL
                  DbName: default_cluster:db1
               TableName: tbl1
                   State: RUNNING
          DataSourceType: KAFKA
          CurrentTaskNum: 1
           JobProperties: {xxx}
        CustomProperties: {"kafka_default_offsets":"OFFSET_BEGINNING","group.id":"test4"}
               Statistic: {"receivedBytes":6,"runningTxns":[1001, 1002],"errorRows":0,"committedTaskNum":1,"loadedRows":2,"loadRowsRate":0,"abortedTaskNum":13,"errorRowsAfterResumed":0,"totalRows":2,"unselectedRows":0,"receivedBytesRate":0,"taskExecuteTimeMs":20965}
                Progress: {"0":"10"}
    ReasonOfStateChanged:
            ErrorLogUrls:
                OtherMsg:
    ```
    
    So that user can view the status of corresponding transactions of this job by executing `show transaction where id=xx`;
---
 .../java/org/apache/doris/load/routineload/RoutineLoadJob.java     | 2 ++
 .../org/apache/doris/load/routineload/RoutineLoadStatistic.java    | 7 +++++++
 .../org/apache/doris/load/routineload/RoutineLoadTaskInfo.java     | 1 +
 3 files changed, 10 insertions(+)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
index b566029..a45b581 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
@@ -912,6 +912,7 @@ public abstract class RoutineLoadJob extends AbstractTxnStateChangeCallback impl
         
         writeLock();
         try {
+            this.jobStatistic.runningTxnIds.remove(txnState.getTransactionId());
             if (state != JobState.RUNNING) {
                 // job is not running, nothing need to be done
                 return;
@@ -963,6 +964,7 @@ public abstract class RoutineLoadJob extends AbstractTxnStateChangeCallback impl
             throws UserException {
         long taskBeId = -1L;
         try {
+            this.jobStatistic.runningTxnIds.remove(txnState.getTransactionId());
             if (txnOperated) {
                 // step0: find task in job
                 Optional<RoutineLoadTaskInfo> routineLoadTaskInfoOptional = routineLoadTaskInfoList.stream().filter(
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadStatistic.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadStatistic.java
index c0b3b06..7a5ad3a 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadStatistic.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadStatistic.java
@@ -22,12 +22,14 @@ import org.apache.doris.common.io.Writable;
 import org.apache.doris.persist.gson.GsonUtils;
 
 import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
 import com.google.gson.annotations.SerializedName;
 
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
 import java.util.Map;
+import java.util.Set;
 
 public class RoutineLoadStatistic implements Writable {
     /*
@@ -62,6 +64,10 @@ public class RoutineLoadStatistic implements Writable {
     @SerializedName(value = "abortedTaskNum")
     public long abortedTaskNum = 0;
 
+    // Save all transactions current running. Including PREPARE, COMMITTED.
+    // No need to persist, only for tracing txn of routine load job.
+    public Set<Long> runningTxnIds = Sets.newHashSet();
+
     @Override
     public void write(DataOutput out) throws IOException {
         String json = GsonUtils.GSON.toJson(this);
@@ -87,6 +93,7 @@ public class RoutineLoadStatistic implements Writable {
                 / this.totalTaskExcutionTimeMs * 1000));
         summary.put("committedTaskNum", Long.valueOf(this.committedTaskNum));
         summary.put("abortedTaskNum", Long.valueOf(this.abortedTaskNum));
+        summary.put("runningTxns", runningTxnIds);
         return summary;
     }
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskInfo.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskInfo.java
index b535b94..50acd8b 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskInfo.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskInfo.java
@@ -189,6 +189,7 @@ public abstract class RoutineLoadTaskInfo {
                     DebugUtil.printId(id), jobId, e);
             throw e;
         }
+        routineLoadJob.jobStatistic.runningTxnIds.add(txnId);
         return true;
     }
 

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org