Posted to commits@doris.apache.org by mo...@apache.org on 2020/03/02 11:11:17 UTC

[incubator-doris] branch master updated: [MaterializedView] Fix bug that preAggregation is different between old and new selector (#3018)

This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git


The following commit(s) were added to refs/heads/master by this push:
     new d151718  [MaterializedView] Fix bug that preAggregation is different between old and new selector (#3018)
d151718 is described below

commit d151718e98fc51f9d9a4f5fa9f05470f3c135bd2
Author: EmmyMiao87 <52...@qq.com>
AuthorDate: Mon Mar 2 19:11:10 2020 +0800

    [MaterializedView] Fix bug that preAggregation is different between old and new selector (#3018)
    
    If there is no aggregated column in an aggregate index, the index effectively deduplicates rows on its key columns.
    For example:
    
        aggregate table (k1, k2, v1 sum)
        mv index (k1, k2)
    
    This kind of index is SPJG, which is equivalent to `select k1, k2 from aggregate_table group by k1, k2`.
    Its grouping columns still need to be checked in the following steps.
    
    If there is no aggregated column in a duplicate index, the index will be SPJ and passes the grouping verification directly.
    
    Also, after the candidate indexes are compensated, the output columns of the new candidate indexes must be checked as well.
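    For illustration, the scenario covered by the new test testDeduplicateQueryInAgg can be
    reproduced with the statements below (a minimal sketch taken from that test; per its
    assertion, the EXPLAIN output should show the old_key rollup with pre-aggregation OFF):
    
        create table agg_table (k1 int, k2 int, v1 bigint sum) aggregate key (k1, k2)
            distributed by hash(k1) buckets 3 properties('replication_num' = '1');
        alter table agg_table add rollup old_key (k1, k2) properties ('replication_num' = '1');
        -- old_key keeps only key columns, so rows with equal (k1, k2) are merged;
        -- the query below may be routed to old_key, but pre-aggregation must stay OFF.
        select k1, k2 from agg_table;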
---
 .../java/org/apache/doris/alter/AlterHandler.java  |  3 +-
 .../java/org/apache/doris/catalog/OlapTable.java   |  8 +++-
 .../main/java/org/apache/doris/common/Config.java  |  8 ++--
 .../java/org/apache/doris/common/FeConstants.java  |  3 ++
 .../load/routineload/RoutineLoadScheduler.java     |  4 +-
 .../doris/planner/MaterializedViewSelector.java    | 43 ++++++++++++++++------
 .../org/apache/doris/planner/OlapScanNode.java     | 22 ++++++++++-
 .../org/apache/doris/planner/RollupSelector.java   | 17 +++------
 .../apache/doris/planner/SingleNodePlanner.java    |  4 ++
 .../planner/MaterializedViewFunctionTest.java      | 20 +++++++++-
 .../planner/MaterializedViewSelectorTest.java      | 33 +++++++++++++----
 .../java/org/apache/doris/utframe/DorisAssert.java | 28 +++++++++++---
 12 files changed, 145 insertions(+), 48 deletions(-)

diff --git a/fe/src/main/java/org/apache/doris/alter/AlterHandler.java b/fe/src/main/java/org/apache/doris/alter/AlterHandler.java
index 9e630c2..4a47d68 100644
--- a/fe/src/main/java/org/apache/doris/alter/AlterHandler.java
+++ b/fe/src/main/java/org/apache/doris/alter/AlterHandler.java
@@ -29,6 +29,7 @@ import org.apache.doris.catalog.Replica;
 import org.apache.doris.catalog.Tablet;
 import org.apache.doris.common.Config;
 import org.apache.doris.common.DdlException;
+import org.apache.doris.common.FeConstants;
 import org.apache.doris.common.MetaNotFoundException;
 import org.apache.doris.common.UserException;
 import org.apache.doris.common.util.MasterDaemon;
@@ -83,7 +84,7 @@ public abstract class AlterHandler extends MasterDaemon {
     }
     
     public AlterHandler(String name) {
-        super(name, 10000);
+        super(name, FeConstants.default_scheduler_interval_millisecond);
     }
 
     protected void addAlterJobV2(AlterJobV2 alterJob) {
diff --git a/fe/src/main/java/org/apache/doris/catalog/OlapTable.java b/fe/src/main/java/org/apache/doris/catalog/OlapTable.java
index 3337ffd..5a4785f 100644
--- a/fe/src/main/java/org/apache/doris/catalog/OlapTable.java
+++ b/fe/src/main/java/org/apache/doris/catalog/OlapTable.java
@@ -345,14 +345,18 @@ public class OlapTable extends Table {
 
     public Map<Long, List<Column>> getVisibleIndexIdToSchema() {
         Map<Long, List<Column>> visibleMVs = Maps.newHashMap();
-        Partition partition = idToPartition.values().stream().findFirst().get();
-        List<MaterializedIndex> mvs = partition.getMaterializedIndices(IndexExtState.VISIBLE);
+        List<MaterializedIndex> mvs = getVisibleIndex();
         for (MaterializedIndex mv : mvs) {
             visibleMVs.put(mv.getId(), indexIdToSchema.get(mv.getId()));
         }
         return visibleMVs;
     }
 
+    public List<MaterializedIndex> getVisibleIndex() {
+        Partition partition = idToPartition.values().stream().findFirst().get();
+        return partition.getMaterializedIndices(IndexExtState.VISIBLE);
+    }
+
     // this is only for schema change.
     public void renameIndexForSchemaChange(String name, String newName) {
         long idxId = indexNameToId.remove(name);
diff --git a/fe/src/main/java/org/apache/doris/common/Config.java b/fe/src/main/java/org/apache/doris/common/Config.java
index ee335f3..6018b4c 100644
--- a/fe/src/main/java/org/apache/doris/common/Config.java
+++ b/fe/src/main/java/org/apache/doris/common/Config.java
@@ -361,9 +361,11 @@ public class Config extends ConfigBase {
     public static int max_layout_length_per_row = 100000; // 100k
 
     /*
-     * Load checker's running interval.
-     * A load job will transfer its state from PENDING to ETL to LOADING to FINISHED.
-     * So a load job will cost at least 3 check intervals to finish.
+     * The load scheduler's running interval.
+     * A load job will transfer its state from PENDING to LOADING to FINISHED.
+     * The load scheduler transfers a load job from PENDING to LOADING,
+     *      while the txn callback transfers it from LOADING to FINISHED.
+     * So a load job takes at most one check interval to finish when the concurrency has not reached the upper limit.
      */
     @ConfField public static int load_checker_interval_second = 5;
 
diff --git a/fe/src/main/java/org/apache/doris/common/FeConstants.java b/fe/src/main/java/org/apache/doris/common/FeConstants.java
index 93e3ba4..e4fef6f 100644
--- a/fe/src/main/java/org/apache/doris/common/FeConstants.java
+++ b/fe/src/main/java/org/apache/doris/common/FeConstants.java
@@ -44,6 +44,9 @@ public class FeConstants {
     // set to true to skip some step when running FE unit test
     public static boolean runningUnitTest = false;
 
+    // default scheduler interval is 10 seconds
+    public static int default_scheduler_interval_millisecond = 10000;
+
     // general model
     // Current meta data version. Use this version to write journals and image
     public static int meta_version = FeMetaVersion.VERSION_CURRENT;
diff --git a/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadScheduler.java b/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadScheduler.java
index 9f26f9e..35057be 100644
--- a/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadScheduler.java
+++ b/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadScheduler.java
@@ -18,6 +18,7 @@
 package org.apache.doris.load.routineload;
 
 import org.apache.doris.catalog.Catalog;
+import org.apache.doris.common.FeConstants;
 import org.apache.doris.common.LoadException;
 import org.apache.doris.common.MetaNotFoundException;
 import org.apache.doris.common.UserException;
@@ -36,7 +37,6 @@ import java.util.List;
 public class RoutineLoadScheduler extends MasterDaemon {
 
     private static final Logger LOG = LogManager.getLogger(RoutineLoadScheduler.class);
-    private static final int DEFAULT_INTERVAL_SECONDS = 10;
 
     private RoutineLoadManager routineLoadManager;
 
@@ -47,7 +47,7 @@ public class RoutineLoadScheduler extends MasterDaemon {
     }
 
     public RoutineLoadScheduler(RoutineLoadManager routineLoadManager) {
-        super("Routine load scheduler", DEFAULT_INTERVAL_SECONDS * 1000);
+        super("Routine load scheduler", FeConstants.default_scheduler_interval_millisecond);
         this.routineLoadManager = routineLoadManager;
     }
 
diff --git a/fe/src/main/java/org/apache/doris/planner/MaterializedViewSelector.java b/fe/src/main/java/org/apache/doris/planner/MaterializedViewSelector.java
index f8624e1..8572ea4 100644
--- a/fe/src/main/java/org/apache/doris/planner/MaterializedViewSelector.java
+++ b/fe/src/main/java/org/apache/doris/planner/MaterializedViewSelector.java
@@ -122,7 +122,7 @@ public class MaterializedViewSelector {
         // Step2: check all columns in compensating predicates are available in the view output
         checkCompensatingPredicates(columnNamesInPredicates.get(tableName), candidateIndexIdToSchema);
         // Step3: group by list in query is the subset of group by list in view or view contains no aggregation
-        checkGrouping(columnNamesInGrouping.get(tableName), candidateIndexIdToSchema);
+        checkGrouping(columnNamesInGrouping.get(tableName), candidateIndexIdToSchema, table.getKeysType());
         // Step4: aggregation functions are available in the view output
         checkAggregationFunction(aggregateColumnsInQuery.get(tableName), candidateIndexIdToSchema);
         // Step5: columns required to compute output expr are available in the view output
@@ -144,8 +144,9 @@ public class MaterializedViewSelector {
              * So, we need to compensate those kinds of index in following step.
              *
              */
-            compensateIndex(candidateIndexIdToSchema, scanNode.getOlapTable().getVisibleIndexIdToSchema(),
-                            table.getSchemaByIndexId(table.getBaseIndexId()).size());
+            compensateCandidateIndex(candidateIndexIdToSchema, scanNode.getOlapTable().getVisibleIndexIdToSchema(),
+                            table);
+            checkOutputColumns(columnNamesInQueryOutput.get(tableName), candidateIndexIdToSchema);
         }
         return candidateIndexIdToSchema;
     }
@@ -279,7 +280,8 @@ public class MaterializedViewSelector {
      * @param candidateIndexIdToSchema
      */
 
-    private void checkGrouping(Set<String> columnsInGrouping, Map<Long, List<Column>> candidateIndexIdToSchema) {
+    private void checkGrouping(Set<String> columnsInGrouping, Map<Long, List<Column>> candidateIndexIdToSchema,
+            KeysType keysType) {
         Iterator<Map.Entry<Long, List<Column>>> iterator = candidateIndexIdToSchema.entrySet().iterator();
         while (iterator.hasNext()) {
             Map.Entry<Long, List<Column>> entry = iterator.next();
@@ -287,8 +289,23 @@ public class MaterializedViewSelector {
             List<Column> candidateIndexSchema = entry.getValue();
             candidateIndexSchema.stream().filter(column -> !column.isAggregated())
                     .forEach(column -> indexNonAggregatedColumnNames.add(column.getName()));
-            // When the candidate index is SPJ type, it passes the verification directly
-            if (indexNonAggregatedColumnNames.size() == candidateIndexSchema.size()) {
+            /*
+            If there is no aggregated column in an index of a duplicate table, the index will be SPJ.
+            For example:
+                duplicate table (k1, k2, v1)
+                mv index (k1, v1)
+            When the candidate index is SPJ type, it passes the verification directly
+
+            If there is no aggregated column in an aggregate index, the index effectively deduplicates rows on its key columns.
+            For example:
+                aggregate table (k1, k2, v1 sum)
+                mv index (k1, k2)
+            This kind of index is SPJG, which is equivalent to select k1, k2 from aggregate_table group by k1, k2.
+            Its grouping columns still need to be checked in the following steps.
+
+            ISSUE-3016, MaterializedViewFunctionTest: testDeduplicateQueryInAgg
+             */
+            if (indexNonAggregatedColumnNames.size() == candidateIndexSchema.size() && keysType == KeysType.DUP_KEYS) {
                 continue;
             }
             // When the query is SPJ type but the candidate index is SPJG type, it will not pass directly.
@@ -366,14 +383,16 @@ public class MaterializedViewSelector {
                           + Joiner.on(",").join(candidateIndexIdToSchema.keySet()));
     }
 
-    private void compensateIndex(Map<Long, List<Column>> candidateIndexIdToSchema,
+    private void compensateCandidateIndex(Map<Long, List<Column>> candidateIndexIdToSchema,
                                  Map<Long, List<Column>> allVisibleIndexes,
-                                 int sizeOfBaseIndexSchema) {
+                                 OlapTable table) {
         isPreAggregation = false;
         reasonOfDisable = "The aggregate operator does not match";
+        int keySizeOfBaseIndex = table.getKeyColumnsByIndexId(table.getBaseIndexId()).size();
         for (Map.Entry<Long, List<Column>> index : allVisibleIndexes.entrySet()) {
-            if (index.getValue().size() == sizeOfBaseIndexSchema) {
-                candidateIndexIdToSchema.put(index.getKey(), index.getValue());
+            long mvIndexId = index.getKey();
+            if (table.getKeyColumnsByIndexId(mvIndexId).size() == keySizeOfBaseIndex) {
+                candidateIndexIdToSchema.put(mvIndexId, index.getValue());
             }
         }
         LOG.debug("Those mv pass the test of output columns:"
@@ -415,7 +434,6 @@ public class MaterializedViewSelector {
                 Expr aggChild0 = aggExpr.getChild(0);
                 if (aggChild0 instanceof SlotRef) {
                     SlotRef slotRef = (SlotRef) aggChild0;
-                    Preconditions.checkState(slotRef.getColumnName() != null);
                     Table table = slotRef.getDesc().getParent().getTable();
                     /* If this column come from subquery, the parent table will be null
                      * For example: select k1 from (select name as k1 from tableA) A
@@ -424,11 +442,11 @@ public class MaterializedViewSelector {
                     if (table == null) {
                         continue;
                     }
+                    Preconditions.checkState(slotRef.getColumnName() != null);
                     addAggregatedColumn(slotRef.getColumnName(), aggExpr.getFnName().getFunction(),
                                         table.getName());
                 } else if ((aggChild0 instanceof CastExpr) && (aggChild0.getChild(0) instanceof SlotRef)) {
                     SlotRef slotRef = (SlotRef) aggChild0.getChild(0);
-                    Preconditions.checkState(slotRef.getColumnName() != null);
                     Table table = slotRef.getDesc().getParent().getTable();
                     /*
                      * Same as above
@@ -436,6 +454,7 @@ public class MaterializedViewSelector {
                     if (table == null) {
                         continue;
                     }
+                    Preconditions.checkState(slotRef.getColumnName() != null);
                     addAggregatedColumn(slotRef.getColumnName(), aggExpr.getFnName().getFunction(),
                                         table.getName());
                 } else {
diff --git a/fe/src/main/java/org/apache/doris/planner/OlapScanNode.java b/fe/src/main/java/org/apache/doris/planner/OlapScanNode.java
index 1c5dfab..83849bd 100644
--- a/fe/src/main/java/org/apache/doris/planner/OlapScanNode.java
+++ b/fe/src/main/java/org/apache/doris/planner/OlapScanNode.java
@@ -43,6 +43,7 @@ import org.apache.doris.common.AnalysisException;
 import org.apache.doris.common.Config;
 import org.apache.doris.common.ErrorCode;
 import org.apache.doris.common.ErrorReport;
+import org.apache.doris.common.FeConstants;
 import org.apache.doris.common.UserException;
 import org.apache.doris.qe.ConnectContext;
 import org.apache.doris.qe.SessionVariable;
@@ -85,6 +86,23 @@ public class OlapScanNode extends ScanNode {
     private static final Logger LOG = LogManager.getLogger(OlapScanNode.class);
 
     private List<TScanRangeLocations> result = new ArrayList<>();
+    /*
+     * When the field value is ON, the storage engine can return the data directly without pre-aggregation.
+     * When the field value is OFF, the storage engine needs to aggregate the data before returning it to the scan node.
+     * For example:
+     * Aggregate table: k1, k2, v1 sum
+     * Field value is ON
+     * Query1: select k1, sum(v1) from table group by k1
+     * The aggregation function in the query is the same as the one in the schema,
+     * so the field value is ON and the query can scan the data directly.
+     *
+     * Field value is OFF
+     * Query1: select k1, k2 from table
+     * The aggregation info is null.
+     * Query2: select k1, min(v1) from table group by k1
+     * The aggregation function in the query is min, which differs from the schema.
+     * So the data stored in the storage engine needs to be merged before being returned to the scan node.
+     */
     private boolean isPreAggregation = false;
     private String reasonOfPreAggregation = null;
     private boolean canTurnOnPreAggr = true;
@@ -416,7 +434,9 @@ public class OlapScanNode extends ScanNode {
         selectedPartitionNum = selectedPartitionIds.size();
         LOG.debug("partition prune cost: {} ms, partitions: {}",
                   (System.currentTimeMillis() - start), selectedPartitionIds);
-        if (selectedPartitionIds.size() == 0) {
+        // The FE unit tests need to check the selected index id without any data.
+        // So step 2 must continue even if selectedPartitionIds is empty.
+        if (selectedPartitionIds.size() == 0 && !FeConstants.runningUnitTest) {
             return;
         }
 
diff --git a/fe/src/main/java/org/apache/doris/planner/RollupSelector.java b/fe/src/main/java/org/apache/doris/planner/RollupSelector.java
index d303c29..fc76a86 100644
--- a/fe/src/main/java/org/apache/doris/planner/RollupSelector.java
+++ b/fe/src/main/java/org/apache/doris/planner/RollupSelector.java
@@ -28,9 +28,7 @@ import org.apache.doris.analysis.SlotRef;
 import org.apache.doris.analysis.TupleDescriptor;
 import org.apache.doris.catalog.Column;
 import org.apache.doris.catalog.MaterializedIndex;
-import org.apache.doris.catalog.MaterializedIndex.IndexExtState;
 import org.apache.doris.catalog.OlapTable;
-import org.apache.doris.catalog.Partition;
 import org.apache.doris.common.UserException;
 import org.apache.doris.qe.ConnectContext;
 
@@ -65,14 +63,10 @@ public final class RollupSelector {
     public long selectBestRollup(
             Collection<Long> partitionIds, List<Expr> conjuncts, boolean isPreAggregation)
             throws UserException {
-        Preconditions.checkArgument(partitionIds != null && !partitionIds.isEmpty(),
-                "Paritition can't be null or empty.");
+        Preconditions.checkArgument(partitionIds != null, "Partition can't be null.");
         // Get first partition to select best prefix index rollups, because MaterializedIndex ids in one rollup's partitions are all same.
         final List<Long> bestPrefixIndexRollups =
-                selectBestPrefixIndexRollup(
-                        table.getPartition(partitionIds.iterator().next()),
-                        conjuncts,
-                        isPreAggregation);
+                selectBestPrefixIndexRollup(conjuncts, isPreAggregation);
         return selectBestRowCountRollup(bestPrefixIndexRollups, partitionIds);
     }
 
@@ -121,8 +115,7 @@ public final class RollupSelector {
         return selectedIndexId;
     }
 
-    private List<Long> selectBestPrefixIndexRollup(
-            Partition partition, List<Expr> conjuncts, boolean isPreAggregation) throws UserException {
+    private List<Long> selectBestPrefixIndexRollup(List<Expr> conjuncts, boolean isPreAggregation) throws UserException {
 
         final List<String> outputColumns = Lists.newArrayList();
         for (SlotDescriptor slot : tupleDesc.getMaterializedSlots()) {
@@ -130,12 +123,12 @@ public final class RollupSelector {
             outputColumns.add(col.getName());
         }
 
-        final List<MaterializedIndex> rollups = partition.getMaterializedIndices(IndexExtState.VISIBLE);
+        final List<MaterializedIndex> rollups = table.getVisibleIndex();
         LOG.debug("num of rollup(base included): {}, pre aggr: {}", rollups.size(), isPreAggregation);
 
         // 1. find all rollup indexes which contains all tuple columns
         final List<MaterializedIndex> rollupsContainsOutput = Lists.newArrayList();
-        final List<Column> baseTableColumns = table.getKeyColumnsByIndexId(partition.getBaseIndex().getId());
+        final List<Column> baseTableColumns = table.getKeyColumnsByIndexId(table.getBaseIndexId());
         for (MaterializedIndex rollup : rollups) {
             final Set<String> rollupColumns = Sets.newHashSet();
             table.getSchemaByIndexId(rollup.getId())
diff --git a/fe/src/main/java/org/apache/doris/planner/SingleNodePlanner.java b/fe/src/main/java/org/apache/doris/planner/SingleNodePlanner.java
index 5b7b012..ca88171 100644
--- a/fe/src/main/java/org/apache/doris/planner/SingleNodePlanner.java
+++ b/fe/src/main/java/org/apache/doris/planner/SingleNodePlanner.java
@@ -54,6 +54,7 @@ import org.apache.doris.catalog.FunctionSet;
 import org.apache.doris.catalog.MysqlTable;
 import org.apache.doris.catalog.Table;
 import org.apache.doris.common.AnalysisException;
+import org.apache.doris.common.FeConstants;
 import org.apache.doris.common.Reference;
 import org.apache.doris.common.UserException;
 
@@ -769,6 +770,9 @@ public class SingleNodePlanner {
                     continue;
                 }
                 OlapScanNode olapScanNode = (OlapScanNode) scanNode;
+                if (olapScanNode.getSelectedPartitionIds().size() == 0 && !FeConstants.runningUnitTest) {
+                    continue;
+                }
                 MaterializedViewSelector.BestIndexInfo bestIndexInfo = materializedViewSelector.selectBestMV
                         (olapScanNode);
                 olapScanNode.updateScanRangeInfoByNewMVSelector(bestIndexInfo.getBestIndexId(), bestIndexInfo.isPreAggregation(),
diff --git a/fe/src/test/java/org/apache/doris/planner/MaterializedViewFunctionTest.java b/fe/src/test/java/org/apache/doris/planner/MaterializedViewFunctionTest.java
index 7bee2a8..80e6ad4 100644
--- a/fe/src/test/java/org/apache/doris/planner/MaterializedViewFunctionTest.java
+++ b/fe/src/test/java/org/apache/doris/planner/MaterializedViewFunctionTest.java
@@ -17,6 +17,7 @@
 
 package org.apache.doris.planner;
 
+import org.apache.doris.common.FeConstants;
 import org.apache.doris.utframe.DorisAssert;
 import org.apache.doris.utframe.UtFrameUtils;
 
@@ -48,6 +49,8 @@ public class MaterializedViewFunctionTest {
 
     @BeforeClass
     public static void beforeClass() throws Exception {
+        FeConstants.default_scheduler_interval_millisecond = 10;
+        FeConstants.runningUnitTest = true;
         UtFrameUtils.createMinDorisCluster(runningDir);
         dorisAssert = new DorisAssert();
         dorisAssert.withEnableMV().withDatabase(HR_DB_NAME).useDatabase(HR_DB_NAME);
@@ -56,11 +59,13 @@ public class MaterializedViewFunctionTest {
     @Before
     public void beforeMethod() throws Exception {
         String createTableSQL = "create table " + HR_DB_NAME + "." + EMPS_TABLE_NAME + " (empid int, name varchar, "
-                + "deptno int, salary int, commission int) "
+                + "deptno int, salary int, commission int) partition by range (empid) "
+                + "(partition p1 values less than MAXVALUE) "
                 + "distributed by hash(empid) buckets 3 properties('replication_num' = '1');";
         dorisAssert.withTable(createTableSQL);
         createTableSQL = "create table " + HR_DB_NAME + "." + DEPTS_TABLE_NAME
-                + " (deptno int, name varchar, cost int) "
+                + " (deptno int, name varchar, cost int) partition by range (deptno) "
+                + "(partition p1 values less than MAXVALUE) "
                 + "distributed by hash(deptno) buckets 3 properties('replication_num' = '1');";
         dorisAssert.withTable(createTableSQL);
     }
@@ -568,4 +573,15 @@ public class MaterializedViewFunctionTest {
                 " deptno from " + EMPS_TABLE_NAME + " where empid <0;";
         dorisAssert.withMaterializedView(createEmpsMVSQL).query(query).explainContains(QUERY_USE_EMPS_MV, 2);
     }
+
+    @Test
+    public void testDeduplicateQueryInAgg() throws Exception {
+        String aggregateTable = "create table agg_table (k1 int, k2 int, v1 bigint sum) aggregate key (k1, k2) "
+                + "distributed by hash(k1) buckets 3 properties('replication_num' = '1');";
+        dorisAssert.withTable(aggregateTable);
+        String createRollupSQL = "alter table agg_table add rollup old_key (k1, k2) "
+                + "properties ('replication_num' = '1');";
+        String query = "select k1, k2 from agg_table;";
+        dorisAssert.withRollup(createRollupSQL).query(query).explainContains("OFF", "old_key");
+    }
 }
diff --git a/fe/src/test/java/org/apache/doris/planner/MaterializedViewSelectorTest.java b/fe/src/test/java/org/apache/doris/planner/MaterializedViewSelectorTest.java
index 12aa5ae..25d3548 100644
--- a/fe/src/test/java/org/apache/doris/planner/MaterializedViewSelectorTest.java
+++ b/fe/src/test/java/org/apache/doris/planner/MaterializedViewSelectorTest.java
@@ -27,6 +27,8 @@ import org.apache.doris.analysis.TableName;
 import org.apache.doris.analysis.TupleDescriptor;
 import org.apache.doris.catalog.AggregateType;
 import org.apache.doris.catalog.Column;
+import org.apache.doris.catalog.KeysType;
+import org.apache.doris.catalog.OlapTable;
 import org.apache.doris.catalog.Table;
 import org.apache.doris.catalog.Type;
 import org.apache.doris.common.jmockit.Deencapsulation;
@@ -35,6 +37,7 @@ import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
 
+import org.apache.commons.lang3.builder.ToStringExclude;
 import org.junit.Assert;
 import org.junit.Test;
 
@@ -231,7 +234,8 @@ public class MaterializedViewSelectorTest {
 
         MaterializedViewSelector selector = new MaterializedViewSelector(selectStmt, analyzer);
         Deencapsulation.setField(selector, "isSPJQuery", false);
-        Deencapsulation.invoke(selector, "checkGrouping", tableAColumnNames, candidateIndexIdToSchema);
+        Deencapsulation.invoke(selector, "checkGrouping", tableAColumnNames, candidateIndexIdToSchema,
+                KeysType.DUP_KEYS);
         Assert.assertEquals(2, candidateIndexIdToSchema.size());
         Assert.assertTrue(candidateIndexIdToSchema.keySet().contains(new Long(1)));
         Assert.assertTrue(candidateIndexIdToSchema.keySet().contains(new Long(2)));
@@ -318,37 +322,50 @@ public class MaterializedViewSelectorTest {
     }
 
     @Test
-    public void testCompensateIndex(@Injectable SelectStmt selectStmt, @Injectable Analyzer analyzer) {
+    public void testCompensateIndex(@Injectable SelectStmt selectStmt, @Injectable Analyzer analyzer,
+            @Injectable OlapTable table) {
         Map<Long, List<Column>> candidateIndexIdToSchema = Maps.newHashMap();
         Map<Long, List<Column>> allVisibleIndexes = Maps.newHashMap();
         List<Column> index1Columns = Lists.newArrayList();
-        Column index1Column1 = new Column("c2", Type.INT, true, null, true, "", "");
+        Column index1Column1 = new Column("c2", Type.INT, true, AggregateType.SUM, true, "", "");
         index1Columns.add(index1Column1);
         allVisibleIndexes.put(new Long(1), index1Columns);
         List<Column> index2Columns = Lists.newArrayList();
         Column index2Column1 = new Column("c1", Type.INT, true, null, true, "", "");
         index2Columns.add(index2Column1);
-        Column index2Column2 = new Column("c2", Type.INT, false, AggregateType.NONE, true, "", "");
+        Column index2Column2 = new Column("c2", Type.INT, false, AggregateType.SUM, true, "", "");
         index2Columns.add(index2Column2);
         allVisibleIndexes.put(new Long(2), index2Columns);
         List<Column> index3Columns = Lists.newArrayList();
-        Column index3Column1 = new Column("C2", Type.INT, true, null, true, "", "");
+        Column index3Column1 = new Column("c1", Type.INT, true, null, true, "", "");
         index3Columns.add(index3Column1);
-        Column index3Column2 = new Column("c1", Type.INT, false, AggregateType.SUM, true, "", "");
+        Column index3Column2 = new Column("c3", Type.INT, false, AggregateType.SUM, true, "", "");
         index3Columns.add(index3Column2);
         allVisibleIndexes.put(new Long(3), index3Columns);
+        List<Column> keyColumns = Lists.newArrayList();
+        keyColumns.add(index2Column1);
         new Expectations() {
             {
                 selectStmt.getAggInfo();
                 result = null;
                 selectStmt.getResultExprs();
                 result = Lists.newArrayList();
+                table.getBaseIndexId();
+                result = -1L;
+                table.getKeyColumnsByIndexId(-1L);
+                result = keyColumns;
+                table.getKeyColumnsByIndexId(1L);
+                result = Lists.newArrayList();
+                table.getKeyColumnsByIndexId(2L);
+                result = keyColumns;
+                table.getKeyColumnsByIndexId(3L);
+                result = keyColumns;
             }
         };
 
         MaterializedViewSelector selector = new MaterializedViewSelector(selectStmt, analyzer);
-        Deencapsulation.invoke(selector, "compensateIndex", candidateIndexIdToSchema,
-                               allVisibleIndexes, 2);
+        Deencapsulation.invoke(selector, "compensateCandidateIndex", candidateIndexIdToSchema,
+                               allVisibleIndexes, table);
         Assert.assertEquals(2, candidateIndexIdToSchema.size());
         Assert.assertTrue(candidateIndexIdToSchema.keySet().contains(new Long(2)));
         Assert.assertTrue(candidateIndexIdToSchema.keySet().contains(new Long(3)));
diff --git a/fe/src/test/java/org/apache/doris/utframe/DorisAssert.java b/fe/src/test/java/org/apache/doris/utframe/DorisAssert.java
index 02de02a..048599c 100644
--- a/fe/src/test/java/org/apache/doris/utframe/DorisAssert.java
+++ b/fe/src/test/java/org/apache/doris/utframe/DorisAssert.java
@@ -18,10 +18,13 @@
 package org.apache.doris.utframe;
 
 import org.apache.doris.alter.AlterJobV2;
+import org.apache.doris.analysis.AlterClause;
+import org.apache.doris.analysis.AlterTableStmt;
 import org.apache.doris.analysis.CreateDbStmt;
 import org.apache.doris.analysis.CreateMaterializedViewStmt;
 import org.apache.doris.analysis.CreateTableStmt;
 import org.apache.doris.analysis.DropTableStmt;
+import org.apache.doris.analysis.InsertStmt;
 import org.apache.doris.catalog.Catalog;
 import org.apache.doris.cluster.ClusterNamespace;
 import org.apache.doris.common.Config;
@@ -82,19 +85,34 @@ public class DorisAssert {
         CreateMaterializedViewStmt createMaterializedViewStmt =
                 (CreateMaterializedViewStmt) UtFrameUtils.parseAndAnalyzeStmt(sql, ctx);
         Catalog.getCurrentCatalog().createMaterializedView(createMaterializedViewStmt);
+        checkAlterJob();
+        // waiting table state to normal
+        Thread.sleep(100);
+        return this;
+    }
+
+    // Add rollup
+    public DorisAssert withRollup(String sql) throws Exception {
+        AlterTableStmt alterTableStmt = (AlterTableStmt) UtFrameUtils.parseAndAnalyzeStmt(sql, ctx);
+        Catalog.getCurrentCatalog().alterTable(alterTableStmt);
+        checkAlterJob();
+        // wait for the table state to become normal
+        Thread.sleep(100);
+        return this;
+    }
+
+    private void checkAlterJob() throws InterruptedException {
         // check alter job
         Map<Long, AlterJobV2> alterJobs = Catalog.getCurrentCatalog().getRollupHandler().getAlterJobsV2();
         for (AlterJobV2 alterJobV2 : alterJobs.values()) {
             while (!alterJobV2.getJobState().isFinalState()) {
-                System.out.println("alter job " + alterJobV2.getDbId() + " is running. state: " + alterJobV2.getJobState());
-                Thread.sleep(5000);
+                System.out.println("alter job " + alterJobV2.getDbId()
+                        + " is running. state: " + alterJobV2.getJobState());
+                Thread.sleep(100);
             }
             System.out.println("alter job " + alterJobV2.getDbId() + " is done. state: " + alterJobV2.getJobState());
             Assert.assertEquals(AlterJobV2.JobState.FINISHED, alterJobV2.getJobState());
         }
-        // waiting table state to normal
-        Thread.sleep(5000);
-        return this;
     }
 
     public QueryAssert query(String sql) {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org