You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by mo...@apache.org on 2021/07/17 02:44:10 UTC
[incubator-doris] branch master updated: [Feature] Add an indicator called errorRowsAfterResumed to distinguish between … (#6092)
This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git
The following commit(s) were added to refs/heads/master by this push:
new e39e157 [Feature] Add an indicator called errorRowsAfterResumed to distinguish between … (#6092)
e39e157 is described below
commit e39e1571ecdbe907fc755c01c268784c64c2377b
Author: Henry2SS <45...@users.noreply.github.com>
AuthorDate: Sat Jul 17 10:43:59 2021 +0800
[Feature] Add an indicator called errorRowsAfterResumed to distinguish between … (#6092)
1. [enhancement] add an indicator called errorRowsAfterResumed to distinguish between totalErrorRows(called errorRows) and errorRowsAfterResumed. (#6092)
2. [Refactor] separate some indicators from RoutineLoadJob class to avoid changing FeMetaVersion while modifying indicators of RoutineLoadJob.(#6092)
---
.../org/apache/doris/common/FeMetaVersion.java | 4 +-
.../load/routineload/KafkaRoutineLoadJob.java | 12 +-
.../doris/load/routineload/RoutineLoadJob.java | 122 +++++++++------------
.../doris/load/routineload/RoutineLoadManager.java | 1 +
.../load/routineload/RoutineLoadStatistic.java | 92 ++++++++++++++++
.../doris/load/routineload/RoutineLoadJobTest.java | 12 +-
.../transaction/GlobalTransactionMgrTest.java | 11 +-
7 files changed, 163 insertions(+), 91 deletions(-)
diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/FeMetaVersion.java b/fe/fe-core/src/main/java/org/apache/doris/common/FeMetaVersion.java
index 02a7b30..17b3e3c 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/FeMetaVersion.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/FeMetaVersion.java
@@ -212,6 +212,8 @@ public final class FeMetaVersion {
public static final int VERSION_99 = 99;
// for max query instance
public static final int VERSION_100 = 100;
+ // add errorRowsAfterResumed to distinguish totalErrorRows and currentErrorRows even if the job is paused.
+ public static final int VERSION_101 = 101;
// note: when increment meta version, should assign the latest version to VERSION_CURRENT
- public static final int VERSION_CURRENT = VERSION_100;
+ public static final int VERSION_CURRENT = VERSION_101;
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java
index a4e0d8b..0496173 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java
@@ -342,17 +342,7 @@ public class KafkaRoutineLoadJob extends RoutineLoadJob {
@Override
protected String getStatistic() {
- Map<String, Object> summary = Maps.newHashMap();
- summary.put("totalRows", Long.valueOf(totalRows));
- summary.put("loadedRows", Long.valueOf(totalRows - errorRows - unselectedRows));
- summary.put("errorRows", Long.valueOf(errorRows));
- summary.put("unselectedRows", Long.valueOf(unselectedRows));
- summary.put("receivedBytes", Long.valueOf(receivedBytes));
- summary.put("taskExecuteTimeMs", Long.valueOf(totalTaskExcutionTimeMs));
- summary.put("receivedBytesRate", Long.valueOf(receivedBytes / totalTaskExcutionTimeMs * 1000));
- summary.put("loadRowsRate", Long.valueOf((totalRows - errorRows - unselectedRows) / totalTaskExcutionTimeMs * 1000));
- summary.put("committedTaskNum", Long.valueOf(committedTaskNum));
- summary.put("abortedTaskNum", Long.valueOf(abortedTaskNum));
+ Map<String, Object> summary = this.jobStatistic.summary();
Gson gson = new GsonBuilder().disableHtmlEscaping().create();
return gson.toJson(summary);
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
index 8213514..a9b7083 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
@@ -215,24 +215,7 @@ public abstract class RoutineLoadJob extends AbstractTxnStateChangeCallback impl
protected long pauseTimestamp = -1;
protected long endTimestamp = -1;
- /*
- * The following variables are for statistics
- * currentErrorRows/currentTotalRows: the row statistics of current sampling period
- * errorRows/totalRows/receivedBytes: cumulative measurement
- * totalTaskExcutorTimeMs: cumulative execution time of tasks
- */
- /*
- * Rows will be updated after txn state changed when txn state has been successfully changed.
- */
- protected long currentErrorRows = 0;
- protected long currentTotalRows = 0;
- protected long errorRows = 0;
- protected long totalRows = 0;
- protected long unselectedRows = 0;
- protected long receivedBytes = 0;
- protected long totalTaskExcutionTimeMs = 1; // init as 1 to avoid division by zero
- protected long committedTaskNum = 0;
- protected long abortedTaskNum = 0;
+ protected RoutineLoadStatistic jobStatistic = new RoutineLoadStatistic();
// The tasks belong to this job
protected List<RoutineLoadTaskInfo> routineLoadTaskInfoList = Lists.newArrayList();
@@ -710,11 +693,11 @@ public abstract class RoutineLoadJob extends AbstractTxnStateChangeCallback impl
private void updateNumOfData(long numOfTotalRows, long numOfErrorRows, long unselectedRows, long receivedBytes,
long taskExecutionTime, boolean isReplay) throws UserException {
- this.totalRows += numOfTotalRows;
- this.errorRows += numOfErrorRows;
- this.unselectedRows += unselectedRows;
- this.receivedBytes += receivedBytes;
- this.totalTaskExcutionTimeMs += taskExecutionTime;
+ this.jobStatistic.totalRows += numOfTotalRows;
+ this.jobStatistic.errorRows += numOfErrorRows;
+ this.jobStatistic.unselectedRows += unselectedRows;
+ this.jobStatistic.receivedBytes += receivedBytes;
+ this.jobStatistic.totalTaskExcutionTimeMs += taskExecutionTime;
if (MetricRepo.isInit && !isReplay) {
MetricRepo.COUNTER_ROUTINE_LOAD_ROWS.increase(numOfTotalRows);
@@ -723,16 +706,17 @@ public abstract class RoutineLoadJob extends AbstractTxnStateChangeCallback impl
}
// check error rate
- currentErrorRows += numOfErrorRows;
- currentTotalRows += numOfTotalRows;
- if (currentTotalRows > maxBatchRows * 10) {
- if (currentErrorRows > maxErrorNum) {
+ this.jobStatistic.currentErrorRows += numOfErrorRows;
+ this.jobStatistic.currentTotalRows += numOfTotalRows;
+ this.jobStatistic.errorRowsAfterResumed = this.jobStatistic.currentErrorRows;
+ if (this.jobStatistic.currentTotalRows > maxBatchRows * 10) {
+ if (this.jobStatistic.currentErrorRows > maxErrorNum) {
LOG.info(new LogBuilder(LogKey.ROUTINE_LOAD_JOB, id)
- .add("current_total_rows", currentTotalRows)
- .add("current_error_rows", currentErrorRows)
- .add("max_error_num", maxErrorNum)
- .add("msg", "current error rows is more than max error num, begin to pause job")
- .build());
+ .add("current_total_rows", this.jobStatistic.currentTotalRows)
+ .add("current_error_rows", this.jobStatistic.currentErrorRows)
+ .add("max_error_num", maxErrorNum)
+ .add("msg", "current error rows is more than max error num, begin to pause job")
+ .build());
// if this is a replay thread, the update state should already be replayed by OP_CHANGE_ROUTINE_LOAD_JOB
if (!isReplay) {
// remove all of task in jobs and change job state to paused
@@ -744,23 +728,23 @@ public abstract class RoutineLoadJob extends AbstractTxnStateChangeCallback impl
if (LOG.isDebugEnabled()) {
LOG.debug(new LogBuilder(LogKey.ROUTINE_LOAD_JOB, id)
- .add("current_total_rows", currentTotalRows)
- .add("current_error_rows", currentErrorRows)
- .add("max_error_num", maxErrorNum)
- .add("msg", "reset current total rows and current error rows "
- + "when current total rows is more than base")
- .build());
+ .add("current_total_rows", this.jobStatistic.currentTotalRows)
+ .add("current_error_rows", this.jobStatistic.currentErrorRows)
+ .add("max_error_num", maxErrorNum)
+ .add("msg", "reset current total rows and current error rows "
+ + "when current total rows is more than base")
+ .build());
}
// reset currentTotalNum and currentErrorNum
- currentErrorRows = 0;
- currentTotalRows = 0;
- } else if (currentErrorRows > maxErrorNum) {
+ this.jobStatistic.currentErrorRows = 0;
+ this.jobStatistic.currentTotalRows = 0;
+ } else if (this.jobStatistic.currentErrorRows > maxErrorNum) {
LOG.info(new LogBuilder(LogKey.ROUTINE_LOAD_JOB, id)
- .add("current_total_rows", currentTotalRows)
- .add("current_error_rows", currentErrorRows)
- .add("max_error_num", maxErrorNum)
- .add("msg", "current error rows is more than max error rows, begin to pause job")
- .build());
+ .add("current_total_rows", this.jobStatistic.currentTotalRows)
+ .add("current_error_rows", this.jobStatistic.currentErrorRows)
+ .add("max_error_num", maxErrorNum)
+ .add("msg", "current error rows is more than max error rows, begin to pause job")
+ .build());
if (!isReplay) {
// remove all of task in jobs and change job state to paused
updateState(JobState.PAUSED,
@@ -768,8 +752,8 @@ public abstract class RoutineLoadJob extends AbstractTxnStateChangeCallback impl
isReplay);
}
// reset currentTotalNum and currentErrorNum
- currentErrorRows = 0;
- currentTotalRows = 0;
+ this.jobStatistic.currentErrorRows = 0;
+ this.jobStatistic.currentTotalRows = 0;
}
}
@@ -899,7 +883,7 @@ public abstract class RoutineLoadJob extends AbstractTxnStateChangeCallback impl
RoutineLoadTaskInfo routineLoadTaskInfo = routineLoadTaskInfoOptional.get();
taskBeId = routineLoadTaskInfo.getBeId();
executeTaskOnTxnStatusChanged(routineLoadTaskInfo, txnState, TransactionStatus.COMMITTED, null);
- ++committedTaskNum;
+ ++this.jobStatistic.committedTaskNum;
LOG.debug("routine load task committed. task id: {}, job id: {}", txnState.getLabel(), id);
}
} catch (Throwable e) {
@@ -918,7 +902,7 @@ public abstract class RoutineLoadJob extends AbstractTxnStateChangeCallback impl
public void replayOnCommitted(TransactionState txnState) {
Preconditions.checkNotNull(txnState.getTxnCommitAttachment(), txnState);
replayUpdateProgress((RLTaskTxnCommitAttachment) txnState.getTxnCommitAttachment());
- this.committedTaskNum++;
+ this.jobStatistic.committedTaskNum++;
LOG.debug("replay on committed: {}", txnState);
}
@@ -1009,7 +993,7 @@ public abstract class RoutineLoadJob extends AbstractTxnStateChangeCallback impl
.add("msg", "txn abort with reason " + txnStatusChangeReasonString)
.build());
}
- ++abortedTaskNum;
+ ++this.jobStatistic.abortedTaskNum;
TransactionState.TxnStatusChangeReason txnStatusChangeReason = null;
if (txnStatusChangeReasonString != null) {
txnStatusChangeReason =
@@ -1053,7 +1037,7 @@ public abstract class RoutineLoadJob extends AbstractTxnStateChangeCallback impl
if (txnState.getTxnCommitAttachment() != null) {
replayUpdateProgress((RLTaskTxnCommitAttachment) txnState.getTxnCommitAttachment());
}
- this.abortedTaskNum++;
+ this.jobStatistic.abortedTaskNum++;
LOG.debug("replay on aborted: {}, has attachment: {}", txnState, txnState.getTxnCommitAttachment() == null);
}
@@ -1513,15 +1497,8 @@ public abstract class RoutineLoadJob extends AbstractTxnStateChangeCallback impl
out.writeLong(pauseTimestamp);
out.writeLong(endTimestamp);
- out.writeLong(currentErrorRows);
- out.writeLong(currentTotalRows);
- out.writeLong(errorRows);
- out.writeLong(totalRows);
- out.writeLong(unselectedRows);
- out.writeLong(receivedBytes);
- out.writeLong(totalTaskExcutionTimeMs);
- out.writeLong(committedTaskNum);
- out.writeLong(abortedTaskNum);
+ this.jobStatistic.write(out);
+
origStmt.write(out);
out.writeInt(jobProperties.size());
for (Map.Entry<String, String> entry : jobProperties.entrySet()) {
@@ -1568,15 +1545,20 @@ public abstract class RoutineLoadJob extends AbstractTxnStateChangeCallback impl
pauseTimestamp = in.readLong();
endTimestamp = in.readLong();
- currentErrorRows = in.readLong();
- currentTotalRows = in.readLong();
- errorRows = in.readLong();
- totalRows = in.readLong();
- unselectedRows = in.readLong();
- receivedBytes = in.readLong();
- totalTaskExcutionTimeMs = in.readLong();
- committedTaskNum = in.readLong();
- abortedTaskNum = in.readLong();
+ if (Catalog.getCurrentCatalogJournalVersion() < FeMetaVersion.VERSION_101) {
+ this.jobStatistic.currentErrorRows = in.readLong();
+ this.jobStatistic.currentTotalRows = in.readLong();
+ this.jobStatistic.errorRows = in.readLong();
+ this.jobStatistic.totalRows = in.readLong();
+ this.jobStatistic.errorRowsAfterResumed = 0;
+ this.jobStatistic.unselectedRows = in.readLong();
+ this.jobStatistic.receivedBytes = in.readLong();
+ this.jobStatistic.totalTaskExcutionTimeMs = in.readLong();
+ this.jobStatistic.committedTaskNum = in.readLong();
+ this.jobStatistic.abortedTaskNum = in.readLong();
+ } else {
+ this.jobStatistic = RoutineLoadStatistic.read(in);
+ }
if (Catalog.getCurrentCatalogJournalVersion() < FeMetaVersion.VERSION_76) {
String stmt = Text.readString(in);
origStmt = new OriginStatement(stmt, 0);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadManager.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadManager.java
index 1338684..05ebeb3 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadManager.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadManager.java
@@ -250,6 +250,7 @@ public class RoutineLoadManager implements Writable {
RoutineLoadJob routineLoadJob = checkPrivAndGetJob(resumeRoutineLoadStmt.getDbFullName(),
resumeRoutineLoadStmt.getName());
+ routineLoadJob.jobStatistic.errorRowsAfterResumed = 0;
routineLoadJob.autoResumeCount = 0;
routineLoadJob.firstResumeTimestamp = 0;
routineLoadJob.autoResumeLock = false;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadStatistic.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadStatistic.java
new file mode 100644
index 0000000..c0b3b06
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadStatistic.java
@@ -0,0 +1,92 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.load.routineload;
+
+import org.apache.doris.common.io.Text;
+import org.apache.doris.common.io.Writable;
+import org.apache.doris.persist.gson.GsonUtils;
+
+import com.google.common.collect.Maps;
+import com.google.gson.annotations.SerializedName;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Map;
+
+public class RoutineLoadStatistic implements Writable {
+ /*
+ * The following variables are for statistics
+ * currentErrorRows/currentTotalRows: the row statistics of current sampling period
+ * errorRowsAfterResumed: currentErrorRows that is showed to users in "show routine load;".
+ * errorRows/totalRows/receivedBytes: cumulative measurement
+ * totalTaskExcutorTimeMs: cumulative execution time of tasks
+ */
+ /*
+ * Rows will be updated after txn state changed when txn state has been successfully changed.
+ */
+
+ @SerializedName(value = "currentErrorRows")
+ public long currentErrorRows = 0;
+ @SerializedName(value = "currentTotalRows")
+ public long currentTotalRows = 0;
+ @SerializedName(value = "errorRows")
+ public long errorRows = 0;
+ @SerializedName(value = "totalRows")
+ public long totalRows = 0;
+ @SerializedName(value = "errorRowsAfterResumed")
+ public long errorRowsAfterResumed = 0;
+ @SerializedName(value = "unselectedRows")
+ public long unselectedRows = 0;
+ @SerializedName(value = "receivedBytes")
+ public long receivedBytes = 0;
+ @SerializedName(value = "totalTaskExcutionTimeMs")
+ public long totalTaskExcutionTimeMs = 1; // init as 1 to avoid division by zero
+ @SerializedName(value = "committedTaskNum")
+ public long committedTaskNum = 0;
+ @SerializedName(value = "abortedTaskNum")
+ public long abortedTaskNum = 0;
+
+ @Override
+ public void write(DataOutput out) throws IOException {
+ String json = GsonUtils.GSON.toJson(this);
+ Text.writeString(out, json);
+ }
+
+ public static RoutineLoadStatistic read(DataInput in) throws IOException {
+ String json = Text.readString(in);
+ return GsonUtils.GSON.fromJson(json, RoutineLoadStatistic.class);
+ }
+
+ public Map<String, Object> summary() {
+ Map<String, Object> summary = Maps.newHashMap();
+ summary.put("totalRows", Long.valueOf(totalRows));
+ summary.put("loadedRows", Long.valueOf(totalRows - this.errorRows - this.unselectedRows));
+ summary.put("errorRows", Long.valueOf(this.errorRows));
+ summary.put("errorRowsAfterResumed", Long.valueOf(this.errorRowsAfterResumed));
+ summary.put("unselectedRows", Long.valueOf(this.unselectedRows));
+ summary.put("receivedBytes", Long.valueOf(this.receivedBytes));
+ summary.put("taskExecuteTimeMs", Long.valueOf(this.totalTaskExcutionTimeMs));
+ summary.put("receivedBytesRate", Long.valueOf(this.receivedBytes / this.totalTaskExcutionTimeMs * 1000));
+ summary.put("loadRowsRate", Long.valueOf((this.totalRows - this.errorRows - this.unselectedRows)
+ / this.totalTaskExcutionTimeMs * 1000));
+ summary.put("committedTaskNum", Long.valueOf(this.committedTaskNum));
+ summary.put("abortedTaskNum", Long.valueOf(this.abortedTaskNum));
+ return summary;
+ }
+}
diff --git a/fe/fe-core/src/test/java/org/apache/doris/load/routineload/RoutineLoadJobTest.java b/fe/fe-core/src/test/java/org/apache/doris/load/routineload/RoutineLoadJobTest.java
index 8f049ba..23aeb17 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/load/routineload/RoutineLoadJobTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/load/routineload/RoutineLoadJobTest.java
@@ -139,9 +139,10 @@ public class RoutineLoadJobTest {
Deencapsulation.setField(routineLoadJob, "routineLoadTaskInfoList", routineLoadTaskInfoList);
Deencapsulation.setField(routineLoadJob, "progress", currentProgress);
routineLoadJob.afterAborted(transactionState, true, txnStatusChangeReasonString);
+ RoutineLoadStatistic jobStatistic = Deencapsulation.getField(routineLoadJob, "jobStatistic");
Assert.assertEquals(RoutineLoadJob.JobState.RUNNING, routineLoadJob.getState());
- Assert.assertEquals(new Long(1), Deencapsulation.getField(routineLoadJob, "abortedTaskNum"));
+ Assert.assertEquals(new Long(1), Deencapsulation.getField(jobStatistic, "abortedTaskNum"));
}
@Test
@@ -279,13 +280,14 @@ public class RoutineLoadJobTest {
Deencapsulation.setField(routineLoadJob, "state", RoutineLoadJob.JobState.RUNNING);
Deencapsulation.setField(routineLoadJob, "maxErrorNum", 10);
Deencapsulation.setField(routineLoadJob, "maxBatchRows", 10);
- Deencapsulation.setField(routineLoadJob, "currentErrorRows", 1);
- Deencapsulation.setField(routineLoadJob, "currentTotalRows", 99);
+ RoutineLoadStatistic jobStatistic = Deencapsulation.getField(routineLoadJob, "jobStatistic");
+ Deencapsulation.setField(jobStatistic, "currentErrorRows", 1);
+ Deencapsulation.setField(jobStatistic, "currentTotalRows", 99);
Deencapsulation.invoke(routineLoadJob, "updateNumOfData", 2L, 0L, 0L, 1L, 1L, false);
Assert.assertEquals(RoutineLoadJob.JobState.RUNNING, Deencapsulation.getField(routineLoadJob, "state"));
- Assert.assertEquals(new Long(0), Deencapsulation.getField(routineLoadJob, "currentErrorRows"));
- Assert.assertEquals(new Long(0), Deencapsulation.getField(routineLoadJob, "currentTotalRows"));
+ Assert.assertEquals(new Long(0), Deencapsulation.getField(jobStatistic, "currentErrorRows"));
+ Assert.assertEquals(new Long(0), Deencapsulation.getField(jobStatistic, "currentTotalRows"));
}
diff --git a/fe/fe-core/src/test/java/org/apache/doris/transaction/GlobalTransactionMgrTest.java b/fe/fe-core/src/test/java/org/apache/doris/transaction/GlobalTransactionMgrTest.java
index d891f48..6efa5b7 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/transaction/GlobalTransactionMgrTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/transaction/GlobalTransactionMgrTest.java
@@ -41,6 +41,7 @@ import org.apache.doris.load.routineload.RLTaskTxnCommitAttachment;
import org.apache.doris.load.routineload.RoutineLoadJob;
import org.apache.doris.load.routineload.RoutineLoadManager;
import org.apache.doris.load.routineload.RoutineLoadTaskInfo;
+import org.apache.doris.load.routineload.RoutineLoadStatistic;
import org.apache.doris.meta.MetaContext;
import org.apache.doris.persist.EditLog;
import org.apache.doris.thrift.TKafkaRLTaskProgress;
@@ -361,9 +362,10 @@ public class GlobalTransactionMgrTest {
Deencapsulation.setField(masterTransMgr.getDatabaseTransactionMgr(CatalogTestUtil.testDbId1), "idToRunningTransactionState", idToTransactionState);
Table testTable1 = masterCatalog.getDb(CatalogTestUtil.testDbId1).getTable(CatalogTestUtil.testTableId1);
masterTransMgr.commitTransaction(1L, Lists.newArrayList(testTable1), 1L, transTablets, txnCommitAttachment);
+ RoutineLoadStatistic jobStatistic = Deencapsulation.getField(routineLoadJob,"jobStatistic");
- Assert.assertEquals(Long.valueOf(101), Deencapsulation.getField(routineLoadJob, "currentTotalRows"));
- Assert.assertEquals(Long.valueOf(1), Deencapsulation.getField(routineLoadJob, "currentErrorRows"));
+ Assert.assertEquals(Long.valueOf(101), Deencapsulation.getField(jobStatistic, "currentTotalRows"));
+ Assert.assertEquals(Long.valueOf(1), Deencapsulation.getField(jobStatistic, "currentErrorRows"));
Assert.assertEquals(Long.valueOf(101L), ((KafkaProgress) routineLoadJob.getProgress()).getOffsetByPartition(1));
// todo(ml): change to assert queue
// Assert.assertEquals(1, routineLoadManager.getNeedScheduleTasksQueue().size());
@@ -430,8 +432,9 @@ public class GlobalTransactionMgrTest {
masterTransMgr.commitTransaction(1L, Lists.newArrayList(testTable1), 1L, transTablets, txnCommitAttachment);
// current total rows and error rows will be reset after job pause, so here they should be 0.
- Assert.assertEquals(Long.valueOf(0), Deencapsulation.getField(routineLoadJob, "currentTotalRows"));
- Assert.assertEquals(Long.valueOf(0), Deencapsulation.getField(routineLoadJob, "currentErrorRows"));
+ RoutineLoadStatistic jobStatistic = Deencapsulation.getField(routineLoadJob,"jobStatistic");
+ Assert.assertEquals(Long.valueOf(0), Deencapsulation.getField(jobStatistic, "currentTotalRows"));
+ Assert.assertEquals(Long.valueOf(0), Deencapsulation.getField(jobStatistic, "currentErrorRows"));
Assert.assertEquals(Long.valueOf(111L),
((KafkaProgress) routineLoadJob.getProgress()).getOffsetByPartition(1));
// todo(ml): change to assert queue
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org