You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by mo...@apache.org on 2021/07/17 02:44:10 UTC

[incubator-doris] branch master updated: [Feature] Add an indicator called errorRowsAfterResumed to distinguish between … (#6092)

This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git


The following commit(s) were added to refs/heads/master by this push:
     new e39e157  [Feature] Add an indicator called errorRowsAfterResumed to distinguish between … (#6092)
e39e157 is described below

commit e39e1571ecdbe907fc755c01c268784c64c2377b
Author: Henry2SS <45...@users.noreply.github.com>
AuthorDate: Sat Jul 17 10:43:59 2021 +0800

    [Feature] Add an indicator called errorRowsAfterResumed to distinguish between … (#6092)
    
    1. [enhancement] add an indicator called errorRowsAfterResumed to distinguish between totalErrorRows(called errorRows) and errorRowsAfterResumed. (#6092)
    2. [Refactor] separate some indicators from RoutineLoadJob class to avoid changing FeMetaVersion while modifying indicators of RoutineLoadJob.(#6092)
---
 .../org/apache/doris/common/FeMetaVersion.java     |   4 +-
 .../load/routineload/KafkaRoutineLoadJob.java      |  12 +-
 .../doris/load/routineload/RoutineLoadJob.java     | 122 +++++++++------------
 .../doris/load/routineload/RoutineLoadManager.java |   1 +
 .../load/routineload/RoutineLoadStatistic.java     |  92 ++++++++++++++++
 .../doris/load/routineload/RoutineLoadJobTest.java |  12 +-
 .../transaction/GlobalTransactionMgrTest.java      |  11 +-
 7 files changed, 163 insertions(+), 91 deletions(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/FeMetaVersion.java b/fe/fe-core/src/main/java/org/apache/doris/common/FeMetaVersion.java
index 02a7b30..17b3e3c 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/FeMetaVersion.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/FeMetaVersion.java
@@ -212,6 +212,8 @@ public final class FeMetaVersion {
     public static final int VERSION_99 = 99;
     // for max query instance
     public static final int VERSION_100 = 100;
+    // add errorRowsAfterResumed to distinguish totalErrorRows and currentErrorRows even if the job is paused.
+    public static final int VERSION_101 = 101;
     // note: when increment meta version, should assign the latest version to VERSION_CURRENT
-    public static final int VERSION_CURRENT = VERSION_100;
+    public static final int VERSION_CURRENT = VERSION_101;
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java
index a4e0d8b..0496173 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java
@@ -342,17 +342,7 @@ public class KafkaRoutineLoadJob extends RoutineLoadJob {
 
     @Override
     protected String getStatistic() {
-        Map<String, Object> summary = Maps.newHashMap();
-        summary.put("totalRows", Long.valueOf(totalRows));
-        summary.put("loadedRows", Long.valueOf(totalRows - errorRows - unselectedRows));
-        summary.put("errorRows", Long.valueOf(errorRows));
-        summary.put("unselectedRows", Long.valueOf(unselectedRows));
-        summary.put("receivedBytes", Long.valueOf(receivedBytes));
-        summary.put("taskExecuteTimeMs", Long.valueOf(totalTaskExcutionTimeMs));
-        summary.put("receivedBytesRate", Long.valueOf(receivedBytes / totalTaskExcutionTimeMs * 1000));
-        summary.put("loadRowsRate", Long.valueOf((totalRows - errorRows - unselectedRows) / totalTaskExcutionTimeMs * 1000));
-        summary.put("committedTaskNum", Long.valueOf(committedTaskNum));
-        summary.put("abortedTaskNum", Long.valueOf(abortedTaskNum));
+        Map<String, Object> summary = this.jobStatistic.summary();
         Gson gson = new GsonBuilder().disableHtmlEscaping().create();
         return gson.toJson(summary);
     }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
index 8213514..a9b7083 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
@@ -215,24 +215,7 @@ public abstract class RoutineLoadJob extends AbstractTxnStateChangeCallback impl
     protected long pauseTimestamp = -1;
     protected long endTimestamp = -1;
 
-    /*
-     * The following variables are for statistics
-     * currentErrorRows/currentTotalRows: the row statistics of current sampling period
-     * errorRows/totalRows/receivedBytes: cumulative measurement
-     * totalTaskExcutorTimeMs: cumulative execution time of tasks
-     */
-    /*
-     * Rows will be updated after txn state changed when txn state has been successfully changed.
-     */
-    protected long currentErrorRows = 0;
-    protected long currentTotalRows = 0;
-    protected long errorRows = 0;
-    protected long totalRows = 0;
-    protected long unselectedRows = 0;
-    protected long receivedBytes = 0;
-    protected long totalTaskExcutionTimeMs = 1; // init as 1 to avoid division by zero
-    protected long committedTaskNum = 0;
-    protected long abortedTaskNum = 0;
+    protected RoutineLoadStatistic jobStatistic = new RoutineLoadStatistic();
 
     // The tasks belong to this job
     protected List<RoutineLoadTaskInfo> routineLoadTaskInfoList = Lists.newArrayList();
@@ -710,11 +693,11 @@ public abstract class RoutineLoadJob extends AbstractTxnStateChangeCallback impl
 
     private void updateNumOfData(long numOfTotalRows, long numOfErrorRows, long unselectedRows, long receivedBytes,
             long taskExecutionTime, boolean isReplay) throws UserException {
-        this.totalRows += numOfTotalRows;
-        this.errorRows += numOfErrorRows;
-        this.unselectedRows += unselectedRows;
-        this.receivedBytes += receivedBytes;
-        this.totalTaskExcutionTimeMs += taskExecutionTime;
+        this.jobStatistic.totalRows += numOfTotalRows;
+        this.jobStatistic.errorRows += numOfErrorRows;
+        this.jobStatistic.unselectedRows += unselectedRows;
+        this.jobStatistic.receivedBytes += receivedBytes;
+        this.jobStatistic.totalTaskExcutionTimeMs += taskExecutionTime;
 
         if (MetricRepo.isInit && !isReplay) {
             MetricRepo.COUNTER_ROUTINE_LOAD_ROWS.increase(numOfTotalRows);
@@ -723,16 +706,17 @@ public abstract class RoutineLoadJob extends AbstractTxnStateChangeCallback impl
         }
 
         // check error rate
-        currentErrorRows += numOfErrorRows;
-        currentTotalRows += numOfTotalRows;
-        if (currentTotalRows > maxBatchRows * 10) {
-            if (currentErrorRows > maxErrorNum) {
+        this.jobStatistic.currentErrorRows += numOfErrorRows;
+        this.jobStatistic.currentTotalRows += numOfTotalRows;
+        this.jobStatistic.errorRowsAfterResumed = this.jobStatistic.currentErrorRows;
+        if (this.jobStatistic.currentTotalRows > maxBatchRows * 10) {
+            if (this.jobStatistic.currentErrorRows > maxErrorNum) {
                 LOG.info(new LogBuilder(LogKey.ROUTINE_LOAD_JOB, id)
-                                 .add("current_total_rows", currentTotalRows)
-                                 .add("current_error_rows", currentErrorRows)
-                                 .add("max_error_num", maxErrorNum)
-                                 .add("msg", "current error rows is more than max error num, begin to pause job")
-                                 .build());
+                        .add("current_total_rows", this.jobStatistic.currentTotalRows)
+                        .add("current_error_rows", this.jobStatistic.currentErrorRows)
+                        .add("max_error_num", maxErrorNum)
+                        .add("msg", "current error rows is more than max error num, begin to pause job")
+                        .build());
                 // if this is a replay thread, the update state should already be replayed by OP_CHANGE_ROUTINE_LOAD_JOB
                 if (!isReplay) {
                     // remove all of task in jobs and change job state to paused
@@ -744,23 +728,23 @@ public abstract class RoutineLoadJob extends AbstractTxnStateChangeCallback impl
 
             if (LOG.isDebugEnabled()) {
                 LOG.debug(new LogBuilder(LogKey.ROUTINE_LOAD_JOB, id)
-                                  .add("current_total_rows", currentTotalRows)
-                                  .add("current_error_rows", currentErrorRows)
-                                  .add("max_error_num", maxErrorNum)
-                                  .add("msg", "reset current total rows and current error rows "
-                                          + "when current total rows is more than base")
-                                  .build());
+                        .add("current_total_rows", this.jobStatistic.currentTotalRows)
+                        .add("current_error_rows", this.jobStatistic.currentErrorRows)
+                        .add("max_error_num", maxErrorNum)
+                        .add("msg", "reset current total rows and current error rows "
+                                + "when current total rows is more than base")
+                        .build());
             }
             // reset currentTotalNum and currentErrorNum
-            currentErrorRows = 0;
-            currentTotalRows = 0;
-        } else if (currentErrorRows > maxErrorNum) {
+            this.jobStatistic.currentErrorRows = 0;
+            this.jobStatistic.currentTotalRows = 0;
+        } else if (this.jobStatistic.currentErrorRows > maxErrorNum) {
             LOG.info(new LogBuilder(LogKey.ROUTINE_LOAD_JOB, id)
-                             .add("current_total_rows", currentTotalRows)
-                             .add("current_error_rows", currentErrorRows)
-                             .add("max_error_num", maxErrorNum)
-                             .add("msg", "current error rows is more than max error rows, begin to pause job")
-                             .build());
+                    .add("current_total_rows", this.jobStatistic.currentTotalRows)
+                    .add("current_error_rows", this.jobStatistic.currentErrorRows)
+                    .add("max_error_num", maxErrorNum)
+                    .add("msg", "current error rows is more than max error rows, begin to pause job")
+                    .build());
             if (!isReplay) {
                 // remove all of task in jobs and change job state to paused
                 updateState(JobState.PAUSED,
@@ -768,8 +752,8 @@ public abstract class RoutineLoadJob extends AbstractTxnStateChangeCallback impl
                         isReplay);
             }
             // reset currentTotalNum and currentErrorNum
-            currentErrorRows = 0;
-            currentTotalRows = 0;
+            this.jobStatistic.currentErrorRows = 0;
+            this.jobStatistic.currentTotalRows = 0;
         }
     }
 
@@ -899,7 +883,7 @@ public abstract class RoutineLoadJob extends AbstractTxnStateChangeCallback impl
                 RoutineLoadTaskInfo routineLoadTaskInfo = routineLoadTaskInfoOptional.get();
                 taskBeId = routineLoadTaskInfo.getBeId();
                 executeTaskOnTxnStatusChanged(routineLoadTaskInfo, txnState, TransactionStatus.COMMITTED, null);
-                ++committedTaskNum;
+                ++this.jobStatistic.committedTaskNum;
                 LOG.debug("routine load task committed. task id: {}, job id: {}", txnState.getLabel(), id);
             }
         } catch (Throwable e) {
@@ -918,7 +902,7 @@ public abstract class RoutineLoadJob extends AbstractTxnStateChangeCallback impl
     public void replayOnCommitted(TransactionState txnState) {
         Preconditions.checkNotNull(txnState.getTxnCommitAttachment(), txnState);
         replayUpdateProgress((RLTaskTxnCommitAttachment) txnState.getTxnCommitAttachment());
-        this.committedTaskNum++;
+        this.jobStatistic.committedTaskNum++;
         LOG.debug("replay on committed: {}", txnState);
     }
 
@@ -1009,7 +993,7 @@ public abstract class RoutineLoadJob extends AbstractTxnStateChangeCallback impl
                                       .add("msg", "txn abort with reason " + txnStatusChangeReasonString)
                                       .build());
                 }
-                ++abortedTaskNum;
+                ++this.jobStatistic.abortedTaskNum;
                 TransactionState.TxnStatusChangeReason txnStatusChangeReason = null;
                 if (txnStatusChangeReasonString != null) {
                     txnStatusChangeReason =
@@ -1053,7 +1037,7 @@ public abstract class RoutineLoadJob extends AbstractTxnStateChangeCallback impl
         if (txnState.getTxnCommitAttachment() != null) {
             replayUpdateProgress((RLTaskTxnCommitAttachment) txnState.getTxnCommitAttachment());
         }
-        this.abortedTaskNum++;
+        this.jobStatistic.abortedTaskNum++;
         LOG.debug("replay on aborted: {}, has attachment: {}", txnState, txnState.getTxnCommitAttachment() == null);
     }
 
@@ -1513,15 +1497,8 @@ public abstract class RoutineLoadJob extends AbstractTxnStateChangeCallback impl
         out.writeLong(pauseTimestamp);
         out.writeLong(endTimestamp);
 
-        out.writeLong(currentErrorRows);
-        out.writeLong(currentTotalRows);
-        out.writeLong(errorRows);
-        out.writeLong(totalRows);
-        out.writeLong(unselectedRows);
-        out.writeLong(receivedBytes);
-        out.writeLong(totalTaskExcutionTimeMs);
-        out.writeLong(committedTaskNum);
-        out.writeLong(abortedTaskNum);
+        this.jobStatistic.write(out);
+
         origStmt.write(out);
         out.writeInt(jobProperties.size());
         for (Map.Entry<String, String> entry : jobProperties.entrySet()) {
@@ -1568,15 +1545,20 @@ public abstract class RoutineLoadJob extends AbstractTxnStateChangeCallback impl
         pauseTimestamp = in.readLong();
         endTimestamp = in.readLong();
 
-        currentErrorRows = in.readLong();
-        currentTotalRows = in.readLong();
-        errorRows = in.readLong();
-        totalRows = in.readLong();
-        unselectedRows = in.readLong();
-        receivedBytes = in.readLong();
-        totalTaskExcutionTimeMs = in.readLong();
-        committedTaskNum = in.readLong();
-        abortedTaskNum = in.readLong();
+        if (Catalog.getCurrentCatalogJournalVersion() < FeMetaVersion.VERSION_101) {
+            this.jobStatistic.currentErrorRows = in.readLong();
+            this.jobStatistic.currentTotalRows = in.readLong();
+            this.jobStatistic.errorRows = in.readLong();
+            this.jobStatistic.totalRows = in.readLong();
+            this.jobStatistic.errorRowsAfterResumed = 0;
+            this.jobStatistic.unselectedRows = in.readLong();
+            this.jobStatistic.receivedBytes = in.readLong();
+            this.jobStatistic.totalTaskExcutionTimeMs = in.readLong();
+            this.jobStatistic.committedTaskNum = in.readLong();
+            this.jobStatistic.abortedTaskNum = in.readLong();
+        } else {
+            this.jobStatistic = RoutineLoadStatistic.read(in);
+        }
         if (Catalog.getCurrentCatalogJournalVersion() < FeMetaVersion.VERSION_76) {
             String stmt = Text.readString(in);
             origStmt = new OriginStatement(stmt, 0);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadManager.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadManager.java
index 1338684..05ebeb3 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadManager.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadManager.java
@@ -250,6 +250,7 @@ public class RoutineLoadManager implements Writable {
         RoutineLoadJob routineLoadJob = checkPrivAndGetJob(resumeRoutineLoadStmt.getDbFullName(),
                 resumeRoutineLoadStmt.getName());
 
+        routineLoadJob.jobStatistic.errorRowsAfterResumed = 0;
         routineLoadJob.autoResumeCount = 0;
         routineLoadJob.firstResumeTimestamp = 0;
         routineLoadJob.autoResumeLock = false;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadStatistic.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadStatistic.java
new file mode 100644
index 0000000..c0b3b06
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadStatistic.java
@@ -0,0 +1,92 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.load.routineload;
+
+import org.apache.doris.common.io.Text;
+import org.apache.doris.common.io.Writable;
+import org.apache.doris.persist.gson.GsonUtils;
+
+import com.google.common.collect.Maps;
+import com.google.gson.annotations.SerializedName;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Map;
+
+public class RoutineLoadStatistic implements Writable {
+    /*
+     * The following variables are for statistics
+     * currentErrorRows/currentTotalRows: the row statistics of current sampling period
+     * errorRowsAfterResumed: currentErrorRows that is showed to users in "show routine load;".
+     * errorRows/totalRows/receivedBytes: cumulative measurement
+     * totalTaskExcutorTimeMs: cumulative execution time of tasks
+     */
+    /*
+     * Rows will be updated after txn state changed when txn state has been successfully changed.
+     */
+
+    @SerializedName(value = "currentErrorRows")
+    public long currentErrorRows = 0;
+    @SerializedName(value = "currentTotalRows")
+    public long currentTotalRows = 0;
+    @SerializedName(value = "errorRows")
+    public long errorRows = 0;
+    @SerializedName(value = "totalRows")
+    public long totalRows = 0;
+    @SerializedName(value = "errorRowsAfterResumed")
+    public long errorRowsAfterResumed = 0;
+    @SerializedName(value = "unselectedRows")
+    public long unselectedRows = 0;
+    @SerializedName(value = "receivedBytes")
+    public long receivedBytes = 0;
+    @SerializedName(value = "totalTaskExcutionTimeMs")
+    public long totalTaskExcutionTimeMs = 1; // init as 1 to avoid division by zero
+    @SerializedName(value = "committedTaskNum")
+    public long committedTaskNum = 0;
+    @SerializedName(value = "abortedTaskNum")
+    public long abortedTaskNum = 0;
+
+    @Override
+    public void write(DataOutput out) throws IOException {
+        String json = GsonUtils.GSON.toJson(this);
+        Text.writeString(out, json);
+    }
+
+    public static RoutineLoadStatistic read(DataInput in) throws IOException {
+        String json = Text.readString(in);
+        return GsonUtils.GSON.fromJson(json, RoutineLoadStatistic.class);
+    }
+
+    public Map<String, Object> summary() {
+        Map<String, Object> summary = Maps.newHashMap();
+        summary.put("totalRows", Long.valueOf(totalRows));
+        summary.put("loadedRows", Long.valueOf(totalRows - this.errorRows - this.unselectedRows));
+        summary.put("errorRows", Long.valueOf(this.errorRows));
+        summary.put("errorRowsAfterResumed", Long.valueOf(this.errorRowsAfterResumed));
+        summary.put("unselectedRows", Long.valueOf(this.unselectedRows));
+        summary.put("receivedBytes", Long.valueOf(this.receivedBytes));
+        summary.put("taskExecuteTimeMs", Long.valueOf(this.totalTaskExcutionTimeMs));
+        summary.put("receivedBytesRate", Long.valueOf(this.receivedBytes / this.totalTaskExcutionTimeMs * 1000));
+        summary.put("loadRowsRate", Long.valueOf((this.totalRows - this.errorRows - this.unselectedRows)
+                / this.totalTaskExcutionTimeMs * 1000));
+        summary.put("committedTaskNum", Long.valueOf(this.committedTaskNum));
+        summary.put("abortedTaskNum", Long.valueOf(this.abortedTaskNum));
+        return summary;
+    }
+}
diff --git a/fe/fe-core/src/test/java/org/apache/doris/load/routineload/RoutineLoadJobTest.java b/fe/fe-core/src/test/java/org/apache/doris/load/routineload/RoutineLoadJobTest.java
index 8f049ba..23aeb17 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/load/routineload/RoutineLoadJobTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/load/routineload/RoutineLoadJobTest.java
@@ -139,9 +139,10 @@ public class RoutineLoadJobTest {
         Deencapsulation.setField(routineLoadJob, "routineLoadTaskInfoList", routineLoadTaskInfoList);
         Deencapsulation.setField(routineLoadJob, "progress", currentProgress);
         routineLoadJob.afterAborted(transactionState, true, txnStatusChangeReasonString);
+        RoutineLoadStatistic jobStatistic = Deencapsulation.getField(routineLoadJob, "jobStatistic");
 
         Assert.assertEquals(RoutineLoadJob.JobState.RUNNING, routineLoadJob.getState());
-        Assert.assertEquals(new Long(1), Deencapsulation.getField(routineLoadJob, "abortedTaskNum"));
+        Assert.assertEquals(new Long(1), Deencapsulation.getField(jobStatistic, "abortedTaskNum"));
     }
 
     @Test
@@ -279,13 +280,14 @@ public class RoutineLoadJobTest {
         Deencapsulation.setField(routineLoadJob, "state", RoutineLoadJob.JobState.RUNNING);
         Deencapsulation.setField(routineLoadJob, "maxErrorNum", 10);
         Deencapsulation.setField(routineLoadJob, "maxBatchRows", 10);
-        Deencapsulation.setField(routineLoadJob, "currentErrorRows", 1);
-        Deencapsulation.setField(routineLoadJob, "currentTotalRows", 99);
+        RoutineLoadStatistic jobStatistic = Deencapsulation.getField(routineLoadJob, "jobStatistic");
+        Deencapsulation.setField(jobStatistic, "currentErrorRows", 1);
+        Deencapsulation.setField(jobStatistic, "currentTotalRows", 99);
         Deencapsulation.invoke(routineLoadJob, "updateNumOfData", 2L, 0L, 0L, 1L, 1L, false);
 
         Assert.assertEquals(RoutineLoadJob.JobState.RUNNING, Deencapsulation.getField(routineLoadJob, "state"));
-        Assert.assertEquals(new Long(0), Deencapsulation.getField(routineLoadJob, "currentErrorRows"));
-        Assert.assertEquals(new Long(0), Deencapsulation.getField(routineLoadJob, "currentTotalRows"));
+        Assert.assertEquals(new Long(0), Deencapsulation.getField(jobStatistic, "currentErrorRows"));
+        Assert.assertEquals(new Long(0), Deencapsulation.getField(jobStatistic, "currentTotalRows"));
 
     }
 
diff --git a/fe/fe-core/src/test/java/org/apache/doris/transaction/GlobalTransactionMgrTest.java b/fe/fe-core/src/test/java/org/apache/doris/transaction/GlobalTransactionMgrTest.java
index d891f48..6efa5b7 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/transaction/GlobalTransactionMgrTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/transaction/GlobalTransactionMgrTest.java
@@ -41,6 +41,7 @@ import org.apache.doris.load.routineload.RLTaskTxnCommitAttachment;
 import org.apache.doris.load.routineload.RoutineLoadJob;
 import org.apache.doris.load.routineload.RoutineLoadManager;
 import org.apache.doris.load.routineload.RoutineLoadTaskInfo;
+import org.apache.doris.load.routineload.RoutineLoadStatistic;
 import org.apache.doris.meta.MetaContext;
 import org.apache.doris.persist.EditLog;
 import org.apache.doris.thrift.TKafkaRLTaskProgress;
@@ -361,9 +362,10 @@ public class GlobalTransactionMgrTest {
         Deencapsulation.setField(masterTransMgr.getDatabaseTransactionMgr(CatalogTestUtil.testDbId1), "idToRunningTransactionState", idToTransactionState);
         Table testTable1 = masterCatalog.getDb(CatalogTestUtil.testDbId1).getTable(CatalogTestUtil.testTableId1);
         masterTransMgr.commitTransaction(1L, Lists.newArrayList(testTable1), 1L, transTablets, txnCommitAttachment);
+        RoutineLoadStatistic jobStatistic =  Deencapsulation.getField(routineLoadJob,"jobStatistic");
 
-        Assert.assertEquals(Long.valueOf(101), Deencapsulation.getField(routineLoadJob, "currentTotalRows"));
-        Assert.assertEquals(Long.valueOf(1), Deencapsulation.getField(routineLoadJob, "currentErrorRows"));
+        Assert.assertEquals(Long.valueOf(101), Deencapsulation.getField(jobStatistic, "currentTotalRows"));
+        Assert.assertEquals(Long.valueOf(1), Deencapsulation.getField(jobStatistic, "currentErrorRows"));
         Assert.assertEquals(Long.valueOf(101L), ((KafkaProgress) routineLoadJob.getProgress()).getOffsetByPartition(1));
         // todo(ml): change to assert queue
         // Assert.assertEquals(1, routineLoadManager.getNeedScheduleTasksQueue().size());
@@ -430,8 +432,9 @@ public class GlobalTransactionMgrTest {
         masterTransMgr.commitTransaction(1L, Lists.newArrayList(testTable1), 1L, transTablets, txnCommitAttachment);
 
         // current total rows and error rows will be reset after job pause, so here they should be 0.
-        Assert.assertEquals(Long.valueOf(0), Deencapsulation.getField(routineLoadJob, "currentTotalRows"));
-        Assert.assertEquals(Long.valueOf(0), Deencapsulation.getField(routineLoadJob, "currentErrorRows"));
+        RoutineLoadStatistic jobStatistic =  Deencapsulation.getField(routineLoadJob,"jobStatistic");
+        Assert.assertEquals(Long.valueOf(0), Deencapsulation.getField(jobStatistic, "currentTotalRows"));
+        Assert.assertEquals(Long.valueOf(0), Deencapsulation.getField(jobStatistic, "currentErrorRows"));
         Assert.assertEquals(Long.valueOf(111L),
                 ((KafkaProgress) routineLoadJob.getProgress()).getOffsetByPartition(1));
         // todo(ml): change to assert queue

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org