You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by mo...@apache.org on 2020/04/01 13:05:25 UTC
[incubator-doris] branch master updated: [Alter]Clean SchemaChangeJobV2 when schema change CANCELLED or FINISHED (#3212)
This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git
The following commit(s) were added to refs/heads/master by this push:
new 9c93718 [Alter]Clean SchemaChangeJobV2 when schema change CANCELLED or FINISHED (#3212)
9c93718 is described below
commit 9c937180cd68c7c57ff965dda552d814eb228a8c
Author: WingC <10...@qq.com>
AuthorDate: Wed Apr 1 08:05:17 2020 -0500
[Alter]Clean SchemaChangeJobV2 when schema change CANCELLED or FINISHED (#3212)
SchemaChangeJobV2 uses too much memory in FE, which may cause a Full GC. This data is useless once the job is done, so we need to clean it up.
NOTICE: update FE meta version to 80
---
.../java/org/apache/doris/alter/AlterJobV2.java | 2 +-
.../java/org/apache/doris/alter/RollupJobV2.java | 24 ++-
.../org/apache/doris/alter/SchemaChangeJobV2.java | 170 +++++++++++++++++++--
.../org/apache/doris/common/FeMetaVersion.java | 5 +-
.../java/org/apache/doris/common/io/Writable.java | 2 +-
.../apache/doris/journal/bdbje/BDBJEJournal.java | 3 +-
.../org/apache/doris/alter/AlterJobV2Test.java | 130 ++++++++++++++++
.../java/org/apache/doris/utframe/DorisAssert.java | 2 +-
8 files changed, 301 insertions(+), 37 deletions(-)
diff --git a/fe/src/main/java/org/apache/doris/alter/AlterJobV2.java b/fe/src/main/java/org/apache/doris/alter/AlterJobV2.java
index bee6404..62c61cb 100644
--- a/fe/src/main/java/org/apache/doris/alter/AlterJobV2.java
+++ b/fe/src/main/java/org/apache/doris/alter/AlterJobV2.java
@@ -128,7 +128,7 @@ public abstract class AlterJobV2 implements Writable {
return finishedTimeMs;
}
- /*
+ /**
* The keyword 'synchronized' only protects 2 methods:
* run() and cancel()
* Only these 2 methods can be visited by different thread(internal working thread and user connection thread)
diff --git a/fe/src/main/java/org/apache/doris/alter/RollupJobV2.java b/fe/src/main/java/org/apache/doris/alter/RollupJobV2.java
index 877d955..4127bd2 100644
--- a/fe/src/main/java/org/apache/doris/alter/RollupJobV2.java
+++ b/fe/src/main/java/org/apache/doris/alter/RollupJobV2.java
@@ -63,7 +63,7 @@ import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.TimeUnit;
-/*
+/**
* Version 2 of RollupJob.
* This is for replacing the old RollupJob
* https://github.com/apache/incubator-doris/issues/1429
@@ -131,7 +131,7 @@ public class RollupJobV2 extends AlterJobV2 {
this.storageFormat = storageFormat;
}
- /*
+ /**
* runPendingJob():
* 1. Create all rollup replicas and wait them finished.
* 2. After creating done, add this shadow rollup index to catalog, user can not see this
@@ -276,7 +276,7 @@ public class RollupJobV2 extends AlterJobV2 {
rollupSchemaHash, rollupShortKeyColumnCount,TStorageType.COLUMN, rollupKeysType);
}
- /*
+ /**
* runWaitingTxnJob():
* 1. Wait the transactions before the watershedTxnId to be finished.
* 2. If all previous transactions finished, send create rollup tasks to BE.
@@ -343,7 +343,7 @@ public class RollupJobV2 extends AlterJobV2 {
LOG.info("transfer rollup job {} state to {}", jobId, this.jobState);
}
- /*
+ /**
* runRunningJob()
* 1. Wait all create rollup tasks to be finished.
* 2. Check the integrity of the newly created rollup index.
@@ -449,7 +449,7 @@ public class RollupJobV2 extends AlterJobV2 {
}
}
- /*
+ /**
* cancelImpl() can be called any time any place.
* We need to clean any possible residual of this job.
*/
@@ -552,11 +552,7 @@ public class RollupJobV2 extends AlterJobV2 {
for (int i = 0; i < size; i++) {
long partitionId = in.readLong();
int size2 = in.readInt();
- Map<Long, Long> tabletIdMap = partitionIdToBaseRollupTabletIdMap.get(partitionId);
- if (tabletIdMap == null) {
- tabletIdMap = Maps.newHashMap();
- partitionIdToBaseRollupTabletIdMap.put(partitionId, tabletIdMap);
- }
+ Map<Long, Long> tabletIdMap = partitionIdToBaseRollupTabletIdMap.computeIfAbsent(partitionId, k -> Maps.newHashMap());
for (int j = 0; j < size2; j++) {
long rollupTabletId = in.readLong();
long baseTabletId = in.readLong();
@@ -585,7 +581,7 @@ public class RollupJobV2 extends AlterJobV2 {
watershedTxnId = in.readLong();
}
- /*
+ /**
* Replay job in PENDING state.
* Should replay all changes before this job's state transfer to PENDING.
* These changes should be same as changes in RollupHander.processAddRollup()
@@ -634,7 +630,7 @@ public class RollupJobV2 extends AlterJobV2 {
}
}
- /*
+ /**
* Replay job in WAITING_TXN state.
* Should replay all changes in runPendingJob()
*/
@@ -664,7 +660,7 @@ public class RollupJobV2 extends AlterJobV2 {
LOG.info("replay waiting txn rollup job: {}", jobId);
}
- /*
+ /**
* Replay job in FINISHED state.
* Should replay all changes in runRuningJob()
*/
@@ -689,7 +685,7 @@ public class RollupJobV2 extends AlterJobV2 {
LOG.info("replay finished rollup job: {}", jobId);
}
- /*
+ /**
* Replay job in CANCELLED state.
*/
private void replayCancelled(RollupJobV2 replayedJob) {
diff --git a/fe/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java b/fe/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java
index e18e6f9..d033dbd 100644
--- a/fe/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java
+++ b/fe/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java
@@ -95,6 +95,9 @@ public class SchemaChangeJobV2 extends AlterJobV2 {
// shadow index id -> shadow index short key count
private Map<Long, Short> indexShortKeyMap = Maps.newHashMap();
+ // true once pruneMeta() has cleared the bulky maps; write() then persists only the compact layout
+ private boolean isMetaPruned = false;
+
// bloom filter info
private boolean hasBfChange;
private Set<String> bfColumns = null;
@@ -114,7 +117,6 @@ public class SchemaChangeJobV2 extends AlterJobV2 {
public SchemaChangeJobV2(long jobId, long dbId, long tableId, String tableName, long timeoutMs) {
super(jobId, JobType.SCHEMA_CHANGE, dbId, tableId, tableName, timeoutMs);
-
}
private SchemaChangeJobV2() {
@@ -159,7 +161,19 @@ public class SchemaChangeJobV2 extends AlterJobV2 {
this.storageFormat = storageFormat;
}
- /*
+    /**
+     * Drops the bulky per-job data structures to save memory.
+     * The structures cleared here must not be used by the getInfo method.
+     */
+    private void pruneMeta() {
+        // the four containers are independent of each other, so the clearing order is irrelevant
+        indexShortKeyMap.clear();
+        indexSchemaMap.clear();
+        partitionIndexMap.clear();
+        partitionIndexTabletMap.clear();
+        // raise the flag last; write() consults it to pick the serialization layout
+        isMetaPruned = true;
+    }
+
+ /**
* runPendingJob():
* 1. Create all replicas of all shadow indexes and wait them finished.
* 2. After creating done, add the shadow indexes to catalog, user can not see this
@@ -318,7 +332,7 @@ public class SchemaChangeJobV2 extends AlterJobV2 {
tbl.rebuildFullSchema();
}
- /*
+ /**
* runWaitingTxnJob():
* 1. Wait the transactions before the watershedTxnId to be finished.
* 2. If all previous transactions finished, send schema change tasks to BE.
@@ -393,7 +407,7 @@ public class SchemaChangeJobV2 extends AlterJobV2 {
LOG.info("transfer schema change job {} state to {}", jobId, this.jobState);
}
- /*
+ /**
* runRunningJob()
* 1. Wait all schema change tasks to be finished.
* 2. Check the integrity of the newly created shadow indexes.
@@ -483,6 +497,7 @@ public class SchemaChangeJobV2 extends AlterJobV2 {
db.writeUnlock();
}
+ pruneMeta();
this.jobState = JobState.FINISHED;
this.finishedTimeMs = System.currentTimeMillis();
@@ -571,6 +586,7 @@ public class SchemaChangeJobV2 extends AlterJobV2 {
cancelInternal();
+ pruneMeta();
this.errMsg = errMsg;
this.finishedTimeMs = System.currentTimeMillis();
LOG.info("cancel {} job {}, err: {}", this.type, jobId, errMsg);
@@ -626,7 +642,7 @@ public class SchemaChangeJobV2 extends AlterJobV2 {
return schemaChangeJob;
}
- /*
+ /**
* Replay job in PENDING state.
* Should replay all changes before this job's state transfer to PENDING.
* These changes should be same as changes in SchemaChangeHandler.createJob()
@@ -675,7 +691,7 @@ public class SchemaChangeJobV2 extends AlterJobV2 {
LOG.info("replay pending schema change job: {}", jobId);
}
- /*
+ /**
* Replay job in WAITING_TXN state.
* Should replay all changes in runPendingJob()
*/
@@ -704,7 +720,7 @@ public class SchemaChangeJobV2 extends AlterJobV2 {
LOG.info("replay waiting txn schema change job: {}", jobId);
}
- /*
+ /**
* Replay job in FINISHED state.
* Should replay all changes in runRuningJob()
*/
@@ -726,7 +742,7 @@ public class SchemaChangeJobV2 extends AlterJobV2 {
LOG.info("replay finished schema change job: {}", jobId);
}
- /*
+ /**
* Replay job in CANCELLED state.
*/
private void replayCancelled(SchemaChangeJobV2 replayedJob) {
@@ -803,11 +819,11 @@ public class SchemaChangeJobV2 extends AlterJobV2 {
}
return taskInfos;
}
-
- @Override
- public void write(DataOutput out) throws IOException {
- super.write(out);
+ /**
+ * Write the data that must be persisted while the job has not finished yet.
+ */
+ private void writeJobNotFinishData(DataOutput out) throws IOException {
out.writeInt(partitionIndexTabletMap.rowKeySet().size());
for (Long partitionId : partitionIndexTabletMap.rowKeySet()) {
out.writeLong(partitionId);
@@ -876,10 +892,10 @@ public class SchemaChangeJobV2 extends AlterJobV2 {
}
}
- @Override
- public void readFields(DataInput in) throws IOException {
- super.readFields(in);
-
+ /**
+ * Read the data that was persisted while the job had not finished yet.
+ */
+ private void readJobNotFinishData(DataInput in) throws IOException {
int partitionNum = in.readInt();
for (int i = 0; i < partitionNum; i++) {
long partitionId = in.readLong();
@@ -953,4 +969,126 @@ public class SchemaChangeJobV2 extends AlterJobV2 {
}
}
}
+
+    /**
+     * Serializes the compact form of this job; only data used by getInfo is persisted.
+     * Layout, in order:
+     *   1. number of shadow indexes, then per shadow index: shadow index id, origin index id,
+     *      index name, schema version, schema hash
+     *   2. bloom filter flag, followed (only if set) by the column count, the columns and the fpp
+     *   3. watershed txn id
+     *   4. index-change flag, followed (only if set) by a presence flag and, when present,
+     *      the index count and the indexes
+     *
+     * @param out the stream to write to
+     * @throws IOException if writing to the stream fails
+     */
+    private void writeJobFinishedData(DataOutput out) throws IOException {
+        // shadow index info
+        out.writeInt(indexIdMap.size());
+        for (Entry<Long, Long> idPair : indexIdMap.entrySet()) {
+            long shadowId = idPair.getKey();
+            out.writeLong(shadowId);
+            out.writeLong(idPair.getValue());
+            Text.writeString(out, indexIdToName.get(shadowId));
+            // look the version/hash pair up once and emit both halves
+            Pair<Integer, Integer> versionAndHash = indexSchemaVersionAndHashMap.get(shadowId);
+            out.writeInt(versionAndHash.first);
+            out.writeInt(versionAndHash.second);
+        }
+
+        // bloom filter
+        out.writeBoolean(hasBfChange);
+        if (hasBfChange) {
+            out.writeInt(bfColumns.size());
+            for (String column : bfColumns) {
+                Text.writeString(out, column);
+            }
+            out.writeDouble(bfFpp);
+        }
+
+        out.writeLong(watershedTxnId);
+
+        // index
+        out.writeBoolean(indexChange);
+        if (indexChange) {
+            // the presence flag tells the reader whether an index list follows
+            boolean hasIndexes = CollectionUtils.isNotEmpty(indexes);
+            out.writeBoolean(hasIndexes);
+            if (hasIndexes) {
+                out.writeInt(indexes.size());
+                for (Index idx : indexes) {
+                    idx.write(out);
+                }
+            }
+        }
+    }
+
+    /**
+     * Reads the compact form of a job, i.e. the data produced by writeJobFinishedData().
+     * The fields are consumed in exactly the order they are written there:
+     * shadow index info, bloom filter info, watershed txn id and index info.
+     *
+     * @param in the stream to read from
+     * @throws IOException if reading from the stream fails
+     */
+    private void readJobFinishedData(DataInput in) throws IOException {
+        // shadow index info
+        int indexNum = in.readInt();
+        for (int i = 0; i < indexNum; i++) {
+            long shadowIndexId = in.readLong();
+            long originIndexId = in.readLong();
+            String indexName = Text.readString(in);
+            int schemaVersion = in.readInt();
+            int schemaVersionHash = in.readInt();
+            Pair<Integer, Integer> schemaVersionAndHash = Pair.create(schemaVersion, schemaVersionHash);
+            // The short key count is NOT part of the compact layout (writeJobFinishedData()
+            // never writes it), so nothing else may be consumed for this entry. Reading a
+            // short here would shift every following field by two bytes.
+
+            indexIdMap.put(shadowIndexId, originIndexId);
+            indexIdToName.put(shadowIndexId, indexName);
+            indexSchemaVersionAndHashMap.put(shadowIndexId, schemaVersionAndHash);
+        }
+
+        // bloom filter
+        hasBfChange = in.readBoolean();
+        if (hasBfChange) {
+            int bfNum = in.readInt();
+            bfColumns = Sets.newHashSetWithExpectedSize(bfNum);
+            for (int i = 0; i < bfNum; i++) {
+                bfColumns.add(Text.readString(in));
+            }
+            bfFpp = in.readDouble();
+        }
+
+        watershedTxnId = in.readLong();
+
+        // index
+        indexChange = in.readBoolean();
+        if (indexChange) {
+            // presence flag: true means an index list follows
+            if (in.readBoolean()) {
+                int indexCount = in.readInt();
+                this.indexes = new ArrayList<>();
+                for (int i = 0; i < indexCount; ++i) {
+                    this.indexes.add(Index.read(in));
+                }
+            } else {
+                this.indexes = null;
+            }
+        }
+    }
+
+    /**
+     * Persists this job. After the parent's data, a boolean records whether the meta has been
+     * pruned; it is followed by the compact layout when pruned and by the full layout otherwise.
+     *
+     * @param out the stream to write to
+     * @throws IOException if writing to the stream fails
+     */
+    @Override
+    public void write(DataOutput out) throws IOException {
+        super.write(out);
+
+        // the flag goes first so that the reader knows which layout follows
+        out.writeBoolean(isMetaPruned);
+        if (!isMetaPruned) {
+            writeJobNotFinishData(out);
+            return;
+        }
+        writeJobFinishedData(out);
+    }
+
+    /**
+     * Restores this job from a stream.
+     * Metas older than FeMetaVersion.VERSION_80 always carry the full layout. From VERSION_80 on,
+     * a leading boolean tells whether the compact (pruned) or the full layout follows.
+     *
+     * @param in the stream to read from
+     * @throws IOException if reading from the stream fails
+     */
+    @Override
+    public void readFields(DataInput in) throws IOException {
+        super.readFields(in);
+
+        if (Catalog.getCurrentCatalogJournalVersion() < FeMetaVersion.VERSION_80) {
+            // no flag in old metas; the field keeps its default (false)
+            readJobNotFinishData(in);
+            return;
+        }
+
+        // Store the flag in the FIELD, not in a local that shadows it. Otherwise a pruned job
+        // restored from a stream would claim to be unpruned, and the next write() would pick
+        // the full layout although only the compact data was loaded.
+        isMetaPruned = in.readBoolean();
+        if (isMetaPruned) {
+            readJobFinishedData(in);
+        } else {
+            readJobNotFinishData(in);
+        }
+    }
+
}
diff --git a/fe/src/main/java/org/apache/doris/common/FeMetaVersion.java b/fe/src/main/java/org/apache/doris/common/FeMetaVersion.java
index 19b2f63..0e07c73 100644
--- a/fe/src/main/java/org/apache/doris/common/FeMetaVersion.java
+++ b/fe/src/main/java/org/apache/doris/common/FeMetaVersion.java
@@ -169,7 +169,8 @@ public final class FeMetaVersion {
public static final int VERSION_78 = 78;
// for transaction state in table level
public static final int VERSION_79 = 79;
-
+ // optimize alterJobV2 memory consumption
+ public static final int VERSION_80 = 80;
// note: when increment meta version, should assign the latest version to VERSION_CURRENT
- public static final int VERSION_CURRENT = VERSION_79;
+ public static final int VERSION_CURRENT = VERSION_80;
}
diff --git a/fe/src/main/java/org/apache/doris/common/io/Writable.java b/fe/src/main/java/org/apache/doris/common/io/Writable.java
index a258168..9577dc8 100644
--- a/fe/src/main/java/org/apache/doris/common/io/Writable.java
+++ b/fe/src/main/java/org/apache/doris/common/io/Writable.java
@@ -20,7 +20,7 @@ package org.apache.doris.common.io;
import java.io.DataOutput;
import java.io.IOException;
-/*
+/**
* Any class that requires persistence should implement the Writable interface.
* This interface requires only a uniform writable method "write()",
* but does not require a uniform read method.
diff --git a/fe/src/main/java/org/apache/doris/journal/bdbje/BDBJEJournal.java b/fe/src/main/java/org/apache/doris/journal/bdbje/BDBJEJournal.java
index 7f527a0..502a9d2 100644
--- a/fe/src/main/java/org/apache/doris/journal/bdbje/BDBJEJournal.java
+++ b/fe/src/main/java/org/apache/doris/journal/bdbje/BDBJEJournal.java
@@ -124,7 +124,6 @@ public class BDBJEJournal implements Journal {
Util.stdoutWithTime(msg);
System.exit(-1);
}
- return;
}
@Override
@@ -135,7 +134,7 @@ public class BDBJEJournal implements Journal {
// id is the key
long id = nextJournalId.getAndIncrement();
- Long idLong = new Long(id);
+ Long idLong = id;
DatabaseEntry theKey = new DatabaseEntry();
TupleBinding<Long> idBinding = TupleBinding.getPrimitiveBinding(Long.class);
idBinding.objectToEntry(idLong, theKey);
diff --git a/fe/src/test/java/org/apache/doris/alter/AlterJobV2Test.java b/fe/src/test/java/org/apache/doris/alter/AlterJobV2Test.java
new file mode 100644
index 0000000..c2cea4c
--- /dev/null
+++ b/fe/src/test/java/org/apache/doris/alter/AlterJobV2Test.java
@@ -0,0 +1,130 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.alter;
+
+import org.apache.doris.analysis.AlterTableStmt;
+import org.apache.doris.analysis.CreateDbStmt;
+import org.apache.doris.analysis.CreateTableStmt;
+import org.apache.doris.analysis.ShowAlterStmt;
+import org.apache.doris.catalog.Catalog;
+import org.apache.doris.catalog.Database;
+import org.apache.doris.catalog.MaterializedIndexMeta;
+import org.apache.doris.catalog.OlapTable;
+import org.apache.doris.common.DdlException;
+import org.apache.doris.common.FeConstants;
+import org.apache.doris.qe.ConnectContext;
+import org.apache.doris.qe.ShowExecutor;
+import org.apache.doris.qe.ShowResultSet;
+import org.apache.doris.utframe.MockedFrontend.EnvVarNotSetException;
+import org.apache.doris.utframe.MockedFrontend.FeStartException;
+import org.apache.doris.utframe.MockedFrontend.NotInitException;
+
+import org.apache.doris.utframe.UtFrameUtils;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Map;
+import java.util.UUID;
+
+/**
+ * Runs a schema change job and a rollup job against a mocked Doris cluster, waits until each
+ * job reaches a final state, expects it to be FINISHED and then prints the matching
+ * "show alter table" output.
+ */
+public class AlterJobV2Test {
+    // every run gets its own directory so that it cannot collide with another unit test
+    // which may also start a Mocked Frontend
+    private static String runningDir = "fe/mocked/AlterJobV2Test/" + UUID.randomUUID() + "/";
+
+    private static ConnectContext connectContext;
+
+    @BeforeClass
+    public static void beforeClass() throws Exception {
+        UtFrameUtils.createMinDorisCluster(runningDir);
+
+        // connect context used by every statement below
+        connectContext = UtFrameUtils.createDefaultCtx();
+
+        // database
+        CreateDbStmt dbStmt =
+                (CreateDbStmt) UtFrameUtils.parseAndAnalyzeStmt("create database test;", connectContext);
+        Catalog.getCurrentCatalog().createDb(dbStmt);
+
+        // table the alter jobs operate on
+        createTable("CREATE TABLE test.schema_change_test(k1 int, k2 int, k3 int) distributed by hash(k1) buckets 3 properties('replication_num' = '1');");
+    }
+
+    @AfterClass
+    public static void tearDown() {
+        UtFrameUtils.cleanDorisFeDir(runningDir);
+    }
+
+    /** Parses, analyzes and executes a CREATE TABLE statement. */
+    private static void createTable(String sql) throws Exception {
+        CreateTableStmt stmt = (CreateTableStmt) UtFrameUtils.parseAndAnalyzeStmt(sql, connectContext);
+        Catalog.getCurrentCatalog().createTable(stmt);
+    }
+
+    /** Parses, analyzes and submits an ALTER TABLE statement. */
+    private static void submitAlter(String sql) throws Exception {
+        AlterTableStmt stmt = (AlterTableStmt) UtFrameUtils.parseAndAnalyzeStmt(sql, connectContext);
+        Catalog.getCurrentCatalog().getAlterInstance().processAlterTable(stmt);
+    }
+
+    /**
+     * Expects exactly one job in the given map, polls it once per second until it reaches a
+     * final state and expects that state to be FINISHED.
+     * Note that the number printed in the progress lines is the value of getDbId().
+     */
+    private static void expectSingleFinishedJob(Map<Long, AlterJobV2> jobs) throws Exception {
+        Assert.assertEquals(1, jobs.size());
+        for (AlterJobV2 job : jobs.values()) {
+            while (!job.getJobState().isFinalState()) {
+                System.out.println("alter job " + job.getDbId() + " is running. state: " + job.getJobState());
+                Thread.sleep(1000);
+            }
+            System.out.println("alter job " + job.getDbId() + " is done. state: " + job.getJobState());
+            Assert.assertEquals(AlterJobV2.JobState.FINISHED, job.getJobState());
+        }
+    }
+
+    /** Executes a SHOW ALTER statement and prints its meta data and result rows. */
+    private static void printShowAlter(String sql) throws Exception {
+        ShowAlterStmt stmt = (ShowAlterStmt) UtFrameUtils.parseAndAnalyzeStmt(sql, connectContext);
+        ShowResultSet result = new ShowExecutor(connectContext, stmt).execute();
+        System.out.println(result.getMetaData());
+        System.out.println(result.getResultRows());
+    }
+
+    @Test
+    public void testSchemaChange() throws Exception {
+        // 1. process a schema change job
+        submitAlter("alter table test.schema_change_test add column k4 int default '1'");
+        // 2. check alter job
+        expectSingleFinishedJob(Catalog.getCurrentCatalog().getSchemaChangeHandler().getAlterJobsV2());
+        // 3. check show alter table column
+        printShowAlter("show alter table column from test;");
+    }
+
+    @Test
+    public void testRollup() throws Exception {
+        // 1. process a rollup job
+        submitAlter("alter table test.schema_change_test add rollup test_rollup(k1, k2);");
+        // 2. check alter job
+        expectSingleFinishedJob(Catalog.getCurrentCatalog().getRollupHandler().getAlterJobsV2());
+        // 3. check show alter table rollup
+        printShowAlter("show alter table rollup from test;");
+    }
+}
diff --git a/fe/src/test/java/org/apache/doris/utframe/DorisAssert.java b/fe/src/test/java/org/apache/doris/utframe/DorisAssert.java
index 048599c..ed7dd11 100644
--- a/fe/src/test/java/org/apache/doris/utframe/DorisAssert.java
+++ b/fe/src/test/java/org/apache/doris/utframe/DorisAssert.java
@@ -133,7 +133,7 @@ public class DorisAssert {
}
public void explainContains(String keywords, int count) throws Exception {
- Assert.assertTrue(StringUtils.countMatches(explainQuery(), keywords) == count);
+ // assertEquals takes (expected, actual): the required count goes first so that a failure
+ // reports "expected:<count> but was:<matches found>" and not the other way round
+ Assert.assertEquals(count, StringUtils.countMatches(explainQuery(), keywords));
}
public void explainWithout(String s) throws Exception {
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org