Posted to commits@doris.apache.org by mo...@apache.org on 2020/04/01 13:05:25 UTC

[incubator-doris] branch master updated: [Alter]Clean SchemaChangeJobV2 when schema change CANCELLED or FINISHED (#3212)

This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 9c93718  [Alter]Clean SchemaChangeJobV2 when schema change CANCELLED or FINISHED (#3212)
9c93718 is described below

commit 9c937180cd68c7c57ff965dda552d814eb228a8c
Author: WingC <10...@qq.com>
AuthorDate: Wed Apr 1 08:05:17 2020 -0500

    [Alter]Clean SchemaChangeJobV2 when schema change CANCELLED or FINISHED (#3212)
    
    SchemaChangeJobV2 can use too much memory in the FE, which may trigger full GC. This data is useless once the job is done, so we need to clean it up.
    
    NOTICE: update FE meta version to 80
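    
    In outline, the change works like this (a minimal sketch with a reduced
    field set; the class name PrunableJob, the single tabletMap field, and the
    journalVersion parameter are simplifications rather than the actual Doris
    code, whose readFields() consults Catalog.getCurrentCatalogJournalVersion()):
    
        import java.io.DataInput;
        import java.io.DataOutput;
        import java.io.IOException;
        import java.util.HashMap;
        import java.util.Map;
    
        abstract class PrunableJob {
            static final int VERSION_80 = 80;
    
            // large per-tablet bookkeeping, only needed while the job runs
            protected Map<Long, Long> tabletMap = new HashMap<>();
            // set once the job reaches FINISHED/CANCELLED and the maps are cleared
            protected boolean isMetaPruned = false;
    
            // called from the FINISHED and CANCELLED transitions
            protected void pruneMeta() {
                tabletMap.clear();
                isMetaPruned = true;
            }
    
            public void write(DataOutput out) throws IOException {
                out.writeBoolean(isMetaPruned);
                if (isMetaPruned) {
                    writeJobFinishedData(out);  // slim form: only what getInfo() needs
                } else {
                    writeJobNotFinishData(out); // full form, as before this change
                }
            }
    
            public void readFields(DataInput in, int journalVersion) throws IOException {
                if (journalVersion >= VERSION_80) {
                    if (in.readBoolean()) {
                        readJobFinishedData(in);
                    } else {
                        readJobNotFinishData(in);
                    }
                } else {
                    // images written before VERSION_80 never carry the flag
                    readJobNotFinishData(in);
                }
            }
    
            protected abstract void writeJobFinishedData(DataOutput out) throws IOException;
            protected abstract void writeJobNotFinishData(DataOutput out) throws IOException;
            protected abstract void readJobFinishedData(DataInput in) throws IOException;
            protected abstract void readJobNotFinishData(DataInput in) throws IOException;
        }
    
    Old images (written before VERSION_80) always take the not-finished read
    path; images written at VERSION_80 or later are self-describing through
    the leading boolean.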
---
 .../java/org/apache/doris/alter/AlterJobV2.java    |   2 +-
 .../java/org/apache/doris/alter/RollupJobV2.java   |  24 ++-
 .../org/apache/doris/alter/SchemaChangeJobV2.java  | 169 +++++++++++++++++++--
 .../org/apache/doris/common/FeMetaVersion.java     |   5 +-
 .../java/org/apache/doris/common/io/Writable.java  |   2 +-
 .../apache/doris/journal/bdbje/BDBJEJournal.java   |   3 +-
 .../org/apache/doris/alter/AlterJobV2Test.java     | 130 ++++++++++++++++
 .../java/org/apache/doris/utframe/DorisAssert.java |   2 +-
 8 files changed, 300 insertions(+), 37 deletions(-)

diff --git a/fe/src/main/java/org/apache/doris/alter/AlterJobV2.java b/fe/src/main/java/org/apache/doris/alter/AlterJobV2.java
index bee6404..62c61cb 100644
--- a/fe/src/main/java/org/apache/doris/alter/AlterJobV2.java
+++ b/fe/src/main/java/org/apache/doris/alter/AlterJobV2.java
@@ -128,7 +128,7 @@ public abstract class AlterJobV2 implements Writable {
         return finishedTimeMs;
     }
 
-    /*
+    /**
      * The keyword 'synchronized' only protects 2 methods:
      * run() and cancel()
      * Only these 2 methods can be visited by different thread(internal working thread and user connection thread)
diff --git a/fe/src/main/java/org/apache/doris/alter/RollupJobV2.java b/fe/src/main/java/org/apache/doris/alter/RollupJobV2.java
index 877d955..4127bd2 100644
--- a/fe/src/main/java/org/apache/doris/alter/RollupJobV2.java
+++ b/fe/src/main/java/org/apache/doris/alter/RollupJobV2.java
@@ -63,7 +63,7 @@ import java.util.Map;
 import java.util.Map.Entry;
 import java.util.concurrent.TimeUnit;
 
-/*
+/**
  * Version 2 of RollupJob.
  * This is for replacing the old RollupJob
  * https://github.com/apache/incubator-doris/issues/1429
@@ -131,7 +131,7 @@ public class RollupJobV2 extends AlterJobV2 {
         this.storageFormat = storageFormat;
     }
 
-    /*
+    /**
      * runPendingJob():
      * 1. Create all rollup replicas and wait them finished.
      * 2. After creating done, add this shadow rollup index to catalog, user can not see this
@@ -276,7 +276,7 @@ public class RollupJobV2 extends AlterJobV2 {
                 rollupSchemaHash, rollupShortKeyColumnCount,TStorageType.COLUMN, rollupKeysType);
     }
 
-    /*
+    /**
      * runWaitingTxnJob():
      * 1. Wait the transactions before the watershedTxnId to be finished.
      * 2. If all previous transactions finished, send create rollup tasks to BE.
@@ -343,7 +343,7 @@ public class RollupJobV2 extends AlterJobV2 {
         LOG.info("transfer rollup job {} state to {}", jobId, this.jobState);
     }
 
-    /*
+    /**
      * runRunningJob()
      * 1. Wait all create rollup tasks to be finished.
      * 2. Check the integrity of the newly created rollup index.
@@ -449,7 +449,7 @@ public class RollupJobV2 extends AlterJobV2 {
         }
     }
 
-    /*
+    /**
      * cancelImpl() can be called any time any place.
      * We need to clean any possible residual of this job.
      */
@@ -552,11 +552,7 @@ public class RollupJobV2 extends AlterJobV2 {
         for (int i = 0; i < size; i++) {
             long partitionId = in.readLong();
             int size2 = in.readInt();
-            Map<Long, Long> tabletIdMap = partitionIdToBaseRollupTabletIdMap.get(partitionId);
-            if (tabletIdMap == null) {
-                tabletIdMap = Maps.newHashMap();
-                partitionIdToBaseRollupTabletIdMap.put(partitionId, tabletIdMap);
-            }
+            Map<Long, Long> tabletIdMap = partitionIdToBaseRollupTabletIdMap.computeIfAbsent(partitionId, k -> Maps.newHashMap());
             for (int j = 0; j < size2; j++) {
                 long rollupTabletId = in.readLong();
                 long baseTabletId = in.readLong();
@@ -585,7 +581,7 @@ public class RollupJobV2 extends AlterJobV2 {
         watershedTxnId = in.readLong();
     }
 
-    /*
+    /**
      * Replay job in PENDING state.
      * Should replay all changes before this job's state transfer to PENDING.
      * These changes should be same as changes in RollupHander.processAddRollup()
@@ -634,7 +630,7 @@ public class RollupJobV2 extends AlterJobV2 {
         }
     }
 
-    /*
+    /**
      * Replay job in WAITING_TXN state.
      * Should replay all changes in runPendingJob()
      */
@@ -664,7 +660,7 @@ public class RollupJobV2 extends AlterJobV2 {
         LOG.info("replay waiting txn rollup job: {}", jobId);
     }
 
-    /*
+    /**
      * Replay job in FINISHED state.
      * Should replay all changes in runRuningJob()
      */
@@ -689,7 +685,7 @@ public class RollupJobV2 extends AlterJobV2 {
         LOG.info("replay finished rollup job: {}", jobId);
     }
 
-    /*
+    /**
      * Replay job in CANCELLED state.
      */
     private void replayCancelled(RollupJobV2 replayedJob) {
diff --git a/fe/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java b/fe/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java
index e18e6f9..d033dbd 100644
--- a/fe/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java
+++ b/fe/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java
@@ -95,6 +95,9 @@ public class SchemaChangeJobV2 extends AlterJobV2 {
     // shadow index id -> shadow index short key count
     private Map<Long, Short> indexShortKeyMap = Maps.newHashMap();
 
+    // whether the job is finished, in which case some data no longer needs to be persisted
+    private boolean isMetaPruned = false;
+
     // bloom filter info
     private boolean hasBfChange;
     private Set<String> bfColumns = null;
@@ -114,7 +117,6 @@ public class SchemaChangeJobV2 extends AlterJobV2 {
 
     public SchemaChangeJobV2(long jobId, long dbId, long tableId, String tableName, long timeoutMs) {
         super(jobId, JobType.SCHEMA_CHANGE, dbId, tableId, tableName, timeoutMs);
-
     }
 
     private SchemaChangeJobV2() {
@@ -159,7 +161,19 @@ public class SchemaChangeJobV2 extends AlterJobV2 {
         this.storageFormat = storageFormat;
     }
 
-    /*
+    /**
+     * Clear some data structures in this job to save memory.
+     * These data structures must not be used in the getInfo() method.
+     */
+    private void pruneMeta() {
+        partitionIndexTabletMap.clear();
+        partitionIndexMap.clear();
+        indexSchemaMap.clear();
+        indexShortKeyMap.clear();
+        isMetaPruned = true;
+    }
+
+    /**
      * runPendingJob():
      * 1. Create all replicas of all shadow indexes and wait them finished.
      * 2. After creating done, add the shadow indexes to catalog, user can not see this
@@ -318,7 +332,7 @@ public class SchemaChangeJobV2 extends AlterJobV2 {
         tbl.rebuildFullSchema();
     }
 
-    /*
+    /**
      * runWaitingTxnJob():
      * 1. Wait the transactions before the watershedTxnId to be finished.
      * 2. If all previous transactions finished, send schema change tasks to BE.
@@ -393,7 +407,7 @@ public class SchemaChangeJobV2 extends AlterJobV2 {
         LOG.info("transfer schema change job {} state to {}", jobId, this.jobState);
     }
 
-    /*
+    /**
      * runRunningJob()
      * 1. Wait all schema change tasks to be finished.
      * 2. Check the integrity of the newly created shadow indexes.
@@ -483,6 +497,7 @@ public class SchemaChangeJobV2 extends AlterJobV2 {
             db.writeUnlock();
         }
 
+        pruneMeta();
         this.jobState = JobState.FINISHED;
         this.finishedTimeMs = System.currentTimeMillis();
 
@@ -571,6 +586,7 @@ public class SchemaChangeJobV2 extends AlterJobV2 {
 
         cancelInternal();
 
+        pruneMeta();
         this.errMsg = errMsg;
         this.finishedTimeMs = System.currentTimeMillis();
         LOG.info("cancel {} job {}, err: {}", this.type, jobId, errMsg);
@@ -626,7 +642,7 @@ public class SchemaChangeJobV2 extends AlterJobV2 {
         return schemaChangeJob;
     }
 
-    /*
+    /**
      * Replay job in PENDING state.
      * Should replay all changes before this job's state transfer to PENDING.
      * These changes should be same as changes in SchemaChangeHandler.createJob()
@@ -675,7 +691,7 @@ public class SchemaChangeJobV2 extends AlterJobV2 {
         LOG.info("replay pending schema change job: {}", jobId);
     }
 
-    /*
+    /**
      * Replay job in WAITING_TXN state.
      * Should replay all changes in runPendingJob()
      */
@@ -704,7 +720,7 @@ public class SchemaChangeJobV2 extends AlterJobV2 {
         LOG.info("replay waiting txn schema change job: {}", jobId);
     }
 
-    /*
+    /**
      * Replay job in FINISHED state.
      * Should replay all changes in runRuningJob()
      */
@@ -726,7 +742,7 @@ public class SchemaChangeJobV2 extends AlterJobV2 {
         LOG.info("replay finished schema change job: {}", jobId);
     }
 
-    /*
+    /**
      * Replay job in CANCELLED state.
      */
     private void replayCancelled(SchemaChangeJobV2 replayedJob) {
@@ -803,11 +819,11 @@ public class SchemaChangeJobV2 extends AlterJobV2 {
         }
         return taskInfos;
     }
-    
-    @Override
-    public void write(DataOutput out) throws IOException {
-        super.write(out);
 
+    /**
+     * Write the data that needs to be persisted while the job is not yet finished.
+     */
+    private void writeJobNotFinishData(DataOutput out) throws IOException {
         out.writeInt(partitionIndexTabletMap.rowKeySet().size());
         for (Long partitionId : partitionIndexTabletMap.rowKeySet()) {
             out.writeLong(partitionId);
@@ -876,10 +892,10 @@ public class SchemaChangeJobV2 extends AlterJobV2 {
         }
     }
 
-    @Override
-    public void readFields(DataInput in) throws IOException {
-        super.readFields(in);
-
+    /**
+     * Read the data that was persisted while the job was not yet finished.
+     */
+    private void readJobNotFinishData(DataInput in) throws IOException {
         int partitionNum = in.readInt();
         for (int i = 0; i < partitionNum; i++) {
             long partitionId = in.readLong();
@@ -953,4 +969,125 @@ public class SchemaChangeJobV2 extends AlterJobV2 {
             }
         }
     }
+
+    /**
+     * Write the data that needs to be persisted after the job has finished.
+     */
+    private void writeJobFinishedData(DataOutput out) throws IOException {
+        // only persist data that will be used in getInfo()
+        out.writeInt(indexIdMap.size());
+        for (Entry<Long, Long> entry : indexIdMap.entrySet()) {
+            long shadowIndexId = entry.getKey();
+            out.writeLong(shadowIndexId);
+            // index id map
+            out.writeLong(entry.getValue());
+            // index name
+            Text.writeString(out, indexIdToName.get(shadowIndexId));
+            // index schema version and hash
+            out.writeInt(indexSchemaVersionAndHashMap.get(shadowIndexId).first);
+            out.writeInt(indexSchemaVersionAndHashMap.get(shadowIndexId).second);
+        }
+
+        // bloom filter
+        out.writeBoolean(hasBfChange);
+        if (hasBfChange) {
+            out.writeInt(bfColumns.size());
+            for (String bfCol : bfColumns) {
+                Text.writeString(out, bfCol);
+            }
+            out.writeDouble(bfFpp);
+        }
+
+        out.writeLong(watershedTxnId);
+
+        // index
+        out.writeBoolean(indexChange);
+        if (indexChange) {
+            if (CollectionUtils.isNotEmpty(indexes)) {
+                out.writeBoolean(true);
+                out.writeInt(indexes.size());
+                for (Index index : indexes) {
+                    index.write(out);
+                }
+            } else {
+                out.writeBoolean(false);
+            }
+        }
+    }
+
+    /**
+     * Read the data that was persisted after the job finished.
+     */
+    private void readJobFinishedData(DataInput in) throws IOException {
+        // shadow index info
+        int indexNum = in.readInt();
+        for (int i = 0; i < indexNum; i++) {
+            long shadowIndexId = in.readLong();
+            long originIndexId = in.readLong();
+            String indexName = Text.readString(in);
+            int schemaVersion = in.readInt();
+            int schemaVersionHash = in.readInt();
+            Pair<Integer, Integer> schemaVersionAndHash = Pair.create(schemaVersion, schemaVersionHash);
+
+            indexIdMap.put(shadowIndexId, originIndexId);
+            indexIdToName.put(shadowIndexId, indexName);
+            indexSchemaVersionAndHashMap.put(shadowIndexId, schemaVersionAndHash);
+        }
+
+        // bloom filter
+        hasBfChange = in.readBoolean();
+        if (hasBfChange) {
+            int bfNum = in.readInt();
+            bfColumns = Sets.newHashSetWithExpectedSize(bfNum);
+            for (int i = 0; i < bfNum; i++) {
+                bfColumns.add(Text.readString(in));
+            }
+            bfFpp = in.readDouble();
+        }
+
+        watershedTxnId = in.readLong();
+
+        // index
+        indexChange = in.readBoolean();
+        if (indexChange) {
+            if (in.readBoolean()) {
+                int indexCount = in.readInt();
+                this.indexes = new ArrayList<>();
+                for (int i = 0; i < indexCount; ++i) {
+                    this.indexes.add(Index.read(in));
+                }
+            } else {
+                this.indexes = null;
+            }
+        }
+    }
+
+    @Override
+    public void write(DataOutput out) throws IOException {
+        super.write(out);
+
+        out.writeBoolean(isMetaPruned);
+        if (isMetaPruned) {
+            writeJobFinishedData(out);
+        } else {
+            writeJobNotFinishData(out);
+        }
+    }
+
+    @Override
+    public void readFields(DataInput in) throws IOException {
+        super.readFields(in);
+
+        if (Catalog.getCurrentCatalogJournalVersion() >= FeMetaVersion.VERSION_80) {
+            boolean isMetaPruned = in.readBoolean();
+            if (isMetaPruned) {
+                readJobFinishedData(in);
+            } else {
+                readJobNotFinishData(in);
+            }
+        } else {
+            readJobNotFinishData(in);
+        }
+    }
+
 }
diff --git a/fe/src/main/java/org/apache/doris/common/FeMetaVersion.java b/fe/src/main/java/org/apache/doris/common/FeMetaVersion.java
index 19b2f63..0e07c73 100644
--- a/fe/src/main/java/org/apache/doris/common/FeMetaVersion.java
+++ b/fe/src/main/java/org/apache/doris/common/FeMetaVersion.java
@@ -169,7 +169,8 @@ public final class FeMetaVersion {
     public static final int VERSION_78 = 78;
     // for transaction state in table level
     public static final int VERSION_79 = 79;
-
+    // optimize AlterJobV2 memory consumption
+    public static final int VERSION_80 = 80;
     // note: when increment meta version, should assign the latest version to VERSION_CURRENT
-    public static final int VERSION_CURRENT = VERSION_79;
+    public static final int VERSION_CURRENT = VERSION_80;
 }
diff --git a/fe/src/main/java/org/apache/doris/common/io/Writable.java b/fe/src/main/java/org/apache/doris/common/io/Writable.java
index a258168..9577dc8 100644
--- a/fe/src/main/java/org/apache/doris/common/io/Writable.java
+++ b/fe/src/main/java/org/apache/doris/common/io/Writable.java
@@ -20,7 +20,7 @@ package org.apache.doris.common.io;
 import java.io.DataOutput;
 import java.io.IOException;
 
-/*
+/**
  * Any class that requires persistence should implement the Writable interface.
  * This interface requires only a uniform writable method "write()",
  * but does not require a uniform read method.
diff --git a/fe/src/main/java/org/apache/doris/journal/bdbje/BDBJEJournal.java b/fe/src/main/java/org/apache/doris/journal/bdbje/BDBJEJournal.java
index 7f527a0..502a9d2 100644
--- a/fe/src/main/java/org/apache/doris/journal/bdbje/BDBJEJournal.java
+++ b/fe/src/main/java/org/apache/doris/journal/bdbje/BDBJEJournal.java
@@ -124,7 +124,6 @@ public class BDBJEJournal implements Journal {
             Util.stdoutWithTime(msg);
             System.exit(-1);
         }
-        return;
     }
 
     @Override
@@ -135,7 +134,7 @@ public class BDBJEJournal implements Journal {
         
         // id is the key
         long id = nextJournalId.getAndIncrement();
-        Long idLong = new Long(id);
+        Long idLong = id;
         DatabaseEntry theKey = new DatabaseEntry();
         TupleBinding<Long> idBinding = TupleBinding.getPrimitiveBinding(Long.class);
         idBinding.objectToEntry(idLong, theKey);
diff --git a/fe/src/test/java/org/apache/doris/alter/AlterJobV2Test.java b/fe/src/test/java/org/apache/doris/alter/AlterJobV2Test.java
new file mode 100644
index 0000000..c2cea4c
--- /dev/null
+++ b/fe/src/test/java/org/apache/doris/alter/AlterJobV2Test.java
@@ -0,0 +1,130 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.alter;
+
+import org.apache.doris.analysis.AlterTableStmt;
+import org.apache.doris.analysis.CreateDbStmt;
+import org.apache.doris.analysis.CreateTableStmt;
+import org.apache.doris.analysis.ShowAlterStmt;
+import org.apache.doris.catalog.Catalog;
+import org.apache.doris.catalog.Database;
+import org.apache.doris.catalog.MaterializedIndexMeta;
+import org.apache.doris.catalog.OlapTable;
+import org.apache.doris.common.DdlException;
+import org.apache.doris.common.FeConstants;
+import org.apache.doris.qe.ConnectContext;
+import org.apache.doris.qe.ShowExecutor;
+import org.apache.doris.qe.ShowResultSet;
+import org.apache.doris.utframe.MockedFrontend.EnvVarNotSetException;
+import org.apache.doris.utframe.MockedFrontend.FeStartException;
+import org.apache.doris.utframe.MockedFrontend.NotInitException;
+
+import org.apache.doris.utframe.UtFrameUtils;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Map;
+import java.util.UUID;
+
+public class AlterJobV2Test {
+    // use a unique dir so that it won't conflict with other unit tests which
+    // may also start a MockedFrontend
+    private static String runningDir = "fe/mocked/AlterJobV2Test/" + UUID.randomUUID().toString() + "/";
+
+    private static ConnectContext connectContext;
+
+    @BeforeClass
+    public static void beforeClass() throws Exception {
+        UtFrameUtils.createMinDorisCluster(runningDir);
+
+        // create connect context
+        connectContext = UtFrameUtils.createDefaultCtx();
+        // create database
+        String createDbStmtStr = "create database test;";
+        CreateDbStmt createDbStmt = (CreateDbStmt) UtFrameUtils.parseAndAnalyzeStmt(createDbStmtStr, connectContext);
+        Catalog.getCurrentCatalog().createDb(createDbStmt);
+
+        createTable("CREATE TABLE test.schema_change_test(k1 int, k2 int, k3 int) distributed by hash(k1) buckets 3 properties('replication_num' = '1');");
+    }
+
+    @AfterClass
+    public static void tearDown() {
+        UtFrameUtils.cleanDorisFeDir(runningDir);
+    }
+
+    private static void createTable(String sql) throws Exception {
+        CreateTableStmt createTableStmt = (CreateTableStmt) UtFrameUtils.parseAndAnalyzeStmt(sql, connectContext);
+        Catalog.getCurrentCatalog().createTable(createTableStmt);
+    }
+
+    @Test
+    public void testSchemaChange() throws Exception {
+        // 1. process a schema change job
+        String alterStmtStr = "alter table test.schema_change_test add column k4 int default '1'";
+        AlterTableStmt alterTableStmt = (AlterTableStmt) UtFrameUtils.parseAndAnalyzeStmt(alterStmtStr, connectContext);
+        Catalog.getCurrentCatalog().getAlterInstance().processAlterTable(alterTableStmt);
+        // 2. check alter job
+        Map<Long, AlterJobV2> alterJobs = Catalog.getCurrentCatalog().getSchemaChangeHandler().getAlterJobsV2();
+        Assert.assertEquals(1, alterJobs.size());
+        for (AlterJobV2 alterJobV2 : alterJobs.values()) {
+            while (!alterJobV2.getJobState().isFinalState()) {
+                System.out.println("alter job " + alterJobV2.getDbId() + " is running. state: " + alterJobV2.getJobState());
+                Thread.sleep(1000);
+            }
+            System.out.println("alter job " + alterJobV2.getDbId() + " is done. state: " + alterJobV2.getJobState());
+            Assert.assertEquals(AlterJobV2.JobState.FINISHED, alterJobV2.getJobState());
+        }
+        // 3. check show alter table column
+        String showAlterStmtStr = "show alter table column from test;";
+        ShowAlterStmt showAlterStmt = (ShowAlterStmt) UtFrameUtils.parseAndAnalyzeStmt(showAlterStmtStr, connectContext);
+        ShowExecutor showExecutor = new ShowExecutor(connectContext, showAlterStmt);
+        ShowResultSet showResultSet = showExecutor.execute();
+        System.out.println(showResultSet.getMetaData());
+        System.out.println(showResultSet.getResultRows());
+    }
+
+    @Test
+    public void testRollup() throws Exception {
+        // 1. process a rollup job
+        String alterStmtStr = "alter table test.schema_change_test add rollup test_rollup(k1, k2);";
+        AlterTableStmt alterTableStmt = (AlterTableStmt) UtFrameUtils.parseAndAnalyzeStmt(alterStmtStr, connectContext);
+        Catalog.getCurrentCatalog().getAlterInstance().processAlterTable(alterTableStmt);
+        // 2. check alter job
+        Map<Long, AlterJobV2> alterJobs = Catalog.getCurrentCatalog().getRollupHandler().getAlterJobsV2();
+        Assert.assertEquals(1, alterJobs.size());
+        for (AlterJobV2 alterJobV2 : alterJobs.values()) {
+            while (!alterJobV2.getJobState().isFinalState()) {
+                System.out.println("alter job " + alterJobV2.getDbId() + " is running. state: " + alterJobV2.getJobState());
+                Thread.sleep(1000);
+            }
+            System.out.println("alter job " + alterJobV2.getDbId() + " is done. state: " + alterJobV2.getJobState());
+            Assert.assertEquals(AlterJobV2.JobState.FINISHED, alterJobV2.getJobState());
+        }
+        // 3. check show alter table rollup
+        String showAlterStmtStr = "show alter table rollup from test;";
+        ShowAlterStmt showAlterStmt = (ShowAlterStmt) UtFrameUtils.parseAndAnalyzeStmt(showAlterStmtStr, connectContext);
+        ShowExecutor showExecutor = new ShowExecutor(connectContext, showAlterStmt);
+        ShowResultSet showResultSet = showExecutor.execute();
+        System.out.println(showResultSet.getMetaData());
+        System.out.println(showResultSet.getResultRows());
+    }
+}
diff --git a/fe/src/test/java/org/apache/doris/utframe/DorisAssert.java b/fe/src/test/java/org/apache/doris/utframe/DorisAssert.java
index 048599c..ed7dd11 100644
--- a/fe/src/test/java/org/apache/doris/utframe/DorisAssert.java
+++ b/fe/src/test/java/org/apache/doris/utframe/DorisAssert.java
@@ -133,7 +133,7 @@ public class DorisAssert {
         }
 
         public void explainContains(String keywords, int count) throws Exception {
-            Assert.assertTrue(StringUtils.countMatches(explainQuery(), keywords) == count);
+            Assert.assertEquals(count, StringUtils.countMatches(explainQuery(), keywords));
         }
 
         public void explainWithout(String s) throws Exception {

