You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by mo...@apache.org on 2023/01/17 15:08:22 UTC

[doris] branch master updated: [fix](MTMV) Refine the process of refreshing data (#16006)

This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 388d623506 [fix](MTMV) Refine the process of refreshing data (#16006)
388d623506 is described below

commit 388d6235069eacafa8ea8ebe3b41eb42730805fe
Author: Adonis Ling <ad...@gmail.com>
AuthorDate: Tue Jan 17 23:08:12 2023 +0800

    [fix](MTMV) Refine the process of refreshing data (#16006)
    
    1. Remove some redundant code.
    2. Fix the issue with the state of MTMV task.
    3. Fix the case - test_create_mtmv.
    
    ## Problem summary
    
    1. We used a retry policy to re-run the failed MTMV tasks, but we set the state to `FAILURE` during re-running the tasks.
    We should do this after all the retry runs fail.
    2. There are some redundant code can be removed.
    3. In the case test_create_mtmv, we created many background tasks to refresh the data. Some task may fail due to the concurrency and cause the test fail. Actually, we only need single one task to verify the functionality.
---
 .../main/java/org/apache/doris/alter/Alter.java    |  30 +--
 .../org/apache/doris/analysis/MVRefreshInfo.java   |   2 +-
 .../org/apache/doris/catalog/MaterializedView.java |  26 +-
 .../java/org/apache/doris/mtmv/MTMVJobFactory.java |   8 +-
 .../java/org/apache/doris/mtmv/MTMVJobManager.java |   4 +-
 .../org/apache/doris/mtmv/MTMVTaskExecutor.java    |  16 +-
 .../apache/doris/mtmv/MTMVTaskExecutorPool.java    |  33 ++-
 .../org/apache/doris/mtmv/MTMVTaskManager.java     |  29 +--
 .../org/apache/doris/mtmv/MTMVTaskProcessor.java   | 272 +++++++--------------
 .../main/java/org/apache/doris/mtmv/MTMVUtils.java |   2 +-
 .../org/apache/doris/mtmv/metadata/MTMVJob.java    |  12 +-
 .../org/apache/doris/mtmv/metadata/MTMVTask.java   |  12 +-
 .../org/apache/doris/mtmv/MTMVJobManagerTest.java  |   2 +-
 .../apache/doris/mtmv/MTMVTaskExecutorTest.java    |  11 +-
 .../java/org/apache/doris/mtmv/MTMVUtilsTest.java  |  12 +-
 .../suites/mtmv_p0/test_create_mtmv.groovy         |  10 +-
 16 files changed, 201 insertions(+), 280 deletions(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/alter/Alter.java b/fe/fe-core/src/main/java/org/apache/doris/alter/Alter.java
index d9b7b9bd12..e47021f0cf 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/alter/Alter.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/alter/Alter.java
@@ -137,29 +137,17 @@ public class Alter {
         if (!stmt.isForMTMV() && stmt.getTableName() == null) {
             throw new DdlException("Drop materialized view without table name is unsupported : " + stmt.toSql());
         }
-        TableName tableName = !stmt.isForMTMV() ? stmt.getTableName() : stmt.getMTMVName();
-        Database db;
-        OlapTable olapTable;
-        if (stmt.isIfExists()) {
-            try {
-                String dbName = tableName.getDb();
-                db = Env.getCurrentInternalCatalog().getDbOrDdlException(dbName);
-                String name = tableName.getTbl();
-                olapTable = (OlapTable) db.getTableOrMetaException(name,
-                        !stmt.isForMTMV() ? TableType.OLAP : TableType.MATERIALIZED_VIEW);
-            } catch (Exception e) {
-                LOG.info("db or table not exists, msg={}", e.getMessage());
-                return;
-            }
-        } else {
-            String dbName = tableName.getDb();
-            db = Env.getCurrentInternalCatalog().getDbOrDdlException(dbName);
-            String name = tableName.getTbl();
-            olapTable = (OlapTable) db.getTableOrMetaException(name,
-                    !stmt.isForMTMV() ? TableType.OLAP : TableType.MATERIALIZED_VIEW);
-        }
+
         // drop materialized view
         if (!stmt.isForMTMV()) {
+            TableName tableName = stmt.getTableName();
+
+            // check db
+            String dbName = tableName.getDb();
+            Database db = Env.getCurrentInternalCatalog().getDbOrDdlException(dbName);
+
+            String name = tableName.getTbl();
+            OlapTable olapTable = (OlapTable) db.getTableOrMetaException(name, TableType.OLAP);
             ((MaterializedViewHandler) materializedViewHandler).processDropMaterializedView(stmt, db, olapTable);
         } else {
             DropTableStmt dropTableStmt = new DropTableStmt(stmt.isIfExists(), stmt.getMTMVName(), false);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/MVRefreshInfo.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/MVRefreshInfo.java
index f2a49ee335..93d7389dae 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/MVRefreshInfo.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/MVRefreshInfo.java
@@ -37,7 +37,7 @@ public class MVRefreshInfo {
     }
 
     public MVRefreshInfo(RefreshMethod method, MVRefreshTriggerInfo trigger) {
-        this(false, method, trigger);
+        this(trigger == null, method, trigger);
     }
 
     public MVRefreshInfo(boolean neverRefresh, RefreshMethod method, MVRefreshTriggerInfo trigger) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/MaterializedView.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/MaterializedView.java
index 1f1763c29f..72a810eeee 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/MaterializedView.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/MaterializedView.java
@@ -20,13 +20,19 @@ package org.apache.doris.catalog;
 import org.apache.doris.analysis.MVRefreshInfo;
 import org.apache.doris.analysis.MVRefreshInfo.BuildMode;
 import org.apache.doris.catalog.OlapTableFactory.MaterializedViewParams;
+import org.apache.doris.common.FeConstants;
 import org.apache.doris.common.io.Text;
+import org.apache.doris.meta.MetaContext;
 import org.apache.doris.persist.gson.GsonUtils;
 
 import com.google.gson.annotations.SerializedName;
 
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
 import java.io.DataInput;
+import java.io.DataInputStream;
 import java.io.DataOutput;
+import java.io.DataOutputStream;
 import java.io.IOException;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.ReentrantLock;
@@ -42,7 +48,7 @@ public class MaterializedView extends OlapTable {
 
     private final ReentrantLock mvTaskLock = new ReentrantLock(true);
 
-    public boolean tryMvTaskLock() {
+    public boolean tryLockMVTask() {
         try {
             return mvTaskLock.tryLock(5, TimeUnit.SECONDS);
         } catch (InterruptedException e) {
@@ -50,7 +56,7 @@ public class MaterializedView extends OlapTable {
         }
     }
 
-    public void mvTaskUnLock() {
+    public void unLockMVTask() {
         this.mvTaskLock.unlock();
     }
 
@@ -100,4 +106,20 @@ public class MaterializedView extends OlapTable {
         query = materializedView.query;
         buildMode = materializedView.buildMode;
     }
+
+    public MaterializedView clone(String mvName) throws IOException {
+        MetaContext metaContext = new MetaContext();
+        metaContext.setMetaVersion(FeConstants.meta_version);
+        metaContext.setThreadLocalInfo();
+        try {
+            ByteArrayOutputStream out = new ByteArrayOutputStream(256);
+            MaterializedView cloned = new MaterializedView();
+            this.write(new DataOutputStream(out));
+            cloned.readFields(new DataInputStream(new ByteArrayInputStream(out.toByteArray())));
+            cloned.setName(mvName);
+            return cloned;
+        } finally {
+            MetaContext.remove();
+        }
+    }
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVJobFactory.java b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVJobFactory.java
index c7ad2479df..5af572304e 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVJobFactory.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVJobFactory.java
@@ -73,8 +73,8 @@ public class MTMVJobFactory {
         MTMVJob job = new MTMVJob(materializedView.getName() + "_" + uid);
         job.setTriggerMode(TriggerMode.PERIODICAL);
         job.setSchedule(genJobSchedule(materializedView));
-        job.setDbName(dbName);
-        job.setMvName(materializedView.getName());
+        job.setDBName(dbName);
+        job.setMVName(materializedView.getName());
         job.setQuery(materializedView.getQuery());
         job.setCreateTime(MTMVUtils.getNowTimeStamp());
         return job;
@@ -84,8 +84,8 @@ public class MTMVJobFactory {
         String uid = UUID.randomUUID().toString();
         MTMVJob job = new MTMVJob(materializedView.getName() + "_" + uid);
         job.setTriggerMode(TriggerMode.ONCE);
-        job.setDbName(dbName);
-        job.setMvName(materializedView.getName());
+        job.setDBName(dbName);
+        job.setMVName(materializedView.getName());
         job.setQuery(materializedView.getQuery());
         job.setCreateTime(MTMVUtils.getNowTimeStamp());
         return job;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVJobManager.java b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVJobManager.java
index ca92bd48da..fe77ea4015 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVJobManager.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVJobManager.java
@@ -285,14 +285,14 @@ public class MTMVJobManager {
         if (dbName == null) {
             jobList.addAll(nameToJobMap.values());
         } else {
-            jobList.addAll(nameToJobMap.values().stream().filter(u -> u.getDbName().equals(dbName))
+            jobList.addAll(nameToJobMap.values().stream().filter(u -> u.getDBName().equals(dbName))
                     .collect(Collectors.toList()));
         }
         return jobList.stream().sorted().collect(Collectors.toList());
     }
 
     public List<MTMVJob> showJobs(String dbName, String mvName) {
-        return showJobs(dbName).stream().filter(u -> u.getMvName().equals(mvName)).collect(Collectors.toList());
+        return showJobs(dbName).stream().filter(u -> u.getMVName().equals(mvName)).collect(Collectors.toList());
     }
 
     private boolean tryLock() {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVTaskExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVTaskExecutor.java
index 1fb6128acd..7fddfd91a4 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVTaskExecutor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVTaskExecutor.java
@@ -19,9 +19,8 @@ package org.apache.doris.mtmv;
 
 import org.apache.doris.analysis.UserIdentity;
 import org.apache.doris.catalog.Env;
+import org.apache.doris.cluster.ClusterNamespace;
 import org.apache.doris.common.Config;
-import org.apache.doris.mtmv.MTMVUtils.TaskState;
-import org.apache.doris.mtmv.metadata.ChangeMTMVTask;
 import org.apache.doris.mtmv.metadata.MTMVJob;
 import org.apache.doris.mtmv.metadata.MTMVTask;
 import org.apache.doris.qe.ConnectContext;
@@ -98,7 +97,9 @@ public class MTMVTaskExecutor implements Comparable<MTMVTaskExecutor> {
         MTMVTaskContext taskContext = new MTMVTaskContext();
         taskContext.setQuery(task.getQuery());
         ctx = new ConnectContext();
-        ctx.setDatabase(job.getDbName());
+        ctx.setEnv(Env.getCurrentEnv());
+        ctx.setCluster(ClusterNamespace.getClusterNameFromFullName(job.getDBName()));
+        ctx.setDatabase(job.getDBName());
         ctx.setQualifiedUser(task.getUser());
         ctx.setCurrentUserIdentity(UserIdentity.createAnalyzedUserIdentWithIp(job.getUser(), "%"));
         ctx.getState().reset();
@@ -113,10 +114,7 @@ public class MTMVTaskExecutor implements Comparable<MTMVTaskExecutor> {
 
         Map<String, String> properties = Maps.newHashMap();
         taskContext.setProperties(properties);
-        processor.process(taskContext);
-        ChangeMTMVTask changeTask = new ChangeMTMVTask(job.getId(), task, TaskState.RUNNING, task.getState());
-        Env.getCurrentEnv().getEditLog().logAlterScheduleTask(changeTask);
-        return task.getState() == TaskState.SUCCESS;
+        return processor.process(taskContext);
     }
 
     public ConnectContext getCtx() {
@@ -136,9 +134,9 @@ public class MTMVTaskExecutor implements Comparable<MTMVTaskExecutor> {
         } else {
             task.setCreateTime(createTime);
         }
-        task.setMvName(job.getMvName());
+        task.setMVName(job.getMVName());
         task.setUser(job.getUser());
-        task.setDbName(job.getDbName());
+        task.setDBName(job.getDBName());
         task.setQuery(job.getQuery());
         task.setExpireTime(MTMVUtils.getNowTimeStamp() + Config.scheduler_mtmv_task_expired);
         task.setRetryTimes(job.getRetryPolicy().getTimes());
diff --git a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVTaskExecutorPool.java b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVTaskExecutorPool.java
index b069a2f2af..419412b3c3 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVTaskExecutorPool.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVTaskExecutorPool.java
@@ -17,7 +17,9 @@
 
 package org.apache.doris.mtmv;
 
+import org.apache.doris.catalog.Env;
 import org.apache.doris.mtmv.MTMVUtils.TaskState;
+import org.apache.doris.mtmv.metadata.ChangeMTMVTask;
 import org.apache.doris.mtmv.metadata.MTMVTask;
 
 import org.apache.logging.log4j.LogManager;
@@ -26,9 +28,11 @@ import org.apache.logging.log4j.Logger;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
 
 public class MTMVTaskExecutorPool {
     private static final Logger LOG = LogManager.getLogger(MTMVTaskExecutorPool.class);
+    private static final long RETRY_INTERVAL = TimeUnit.SECONDS.toMillis(30);
     private final ExecutorService taskPool = Executors.newCachedThreadPool();
 
     public void executeTask(MTMVTaskExecutor taskExecutor) {
@@ -39,7 +43,7 @@ public class MTMVTaskExecutorPool {
         if (task == null) {
             return;
         }
-        if (task.getState() == TaskState.SUCCESS || task.getState() == TaskState.FAILED) {
+        if (task.getState() == TaskState.SUCCESS || task.getState() == TaskState.FAILURE) {
             LOG.warn("Task {} is in final status {} ", task.getTaskId(), task.getState());
             return;
         }
@@ -53,20 +57,31 @@ public class MTMVTaskExecutorPool {
                     isSuccess = taskExecutor.executeTask();
                     if (isSuccess) {
                         task.setState(TaskState.SUCCESS);
-                    } else {
-                        task.setState(TaskState.FAILED);
+                        break;
                     }
-                } catch (Exception ex) {
-                    LOG.warn("failed to execute task.", ex);
-                } finally {
-                    task.setFinishTime(MTMVUtils.getNowTimeStamp());
+                } catch (Throwable t) {
+                    LOG.warn("Failed to execute the task, taskId=" + task.getTaskId() + ".", t);
                 }
                 retryTimes--;
-            } while (!isSuccess && retryTimes >= 0);
+
+                if (retryTimes > 0) {
+                    try {
+                        Thread.sleep(RETRY_INTERVAL);
+                    } catch (InterruptedException e) {
+                        LOG.warn("Failed to sleep.", e);
+                        break;
+                    }
+                }
+            } while (!isSuccess && retryTimes > 0);
             if (!isSuccess) {
-                task.setState(TaskState.FAILED);
+                task.setState(TaskState.FAILURE);
                 task.setErrorCode(-1);
             }
+            task.setFinishTime(MTMVUtils.getNowTimeStamp());
+
+            ChangeMTMVTask changeTask = new ChangeMTMVTask(taskExecutor.getJob().getId(), task, TaskState.RUNNING,
+                    task.getState());
+            Env.getCurrentEnv().getEditLog().logAlterScheduleTask(changeTask);
         });
         taskExecutor.setFuture(future);
     }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVTaskManager.java b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVTaskManager.java
index e3682ab5cd..6cb90c262d 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVTaskManager.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVTaskManager.java
@@ -278,20 +278,20 @@ public class MTMVTaskManager {
         } else {
             for (Queue<MTMVTaskExecutor> pTaskQueue : getPendingTaskMap().values()) {
                 taskList.addAll(
-                        pTaskQueue.stream().map(MTMVTaskExecutor::getTask).filter(u -> u.getDbName().equals(dbName))
+                        pTaskQueue.stream().map(MTMVTaskExecutor::getTask).filter(u -> u.getDBName().equals(dbName))
                                 .collect(Collectors.toList()));
             }
             taskList.addAll(getRunningTaskMap().values().stream().map(MTMVTaskExecutor::getTask)
-                    .filter(u -> u.getDbName().equals(dbName)).collect(Collectors.toList()));
+                    .filter(u -> u.getDBName().equals(dbName)).collect(Collectors.toList()));
             taskList.addAll(
-                    getAllHistory().stream().filter(u -> u.getDbName().equals(dbName)).collect(Collectors.toList()));
+                    getAllHistory().stream().filter(u -> u.getDBName().equals(dbName)).collect(Collectors.toList()));
 
         }
         return taskList.stream().sorted().collect(Collectors.toList());
     }
 
     public List<MTMVTask> showTasks(String dbName, String mvName) {
-        return showTasks(dbName).stream().filter(u -> u.getMvName().equals(mvName)).collect(Collectors.toList());
+        return showTasks(dbName).stream().filter(u -> u.getMVName().equals(mvName)).collect(Collectors.toList());
     }
 
     public MTMVTask getTask(String taskId) throws AnalysisException {
@@ -307,7 +307,7 @@ public class MTMVTaskManager {
     }
 
     public void replayCreateJobTask(MTMVTask task) {
-        if (task.getState() == TaskState.SUCCESS || task.getState() == TaskState.FAILED) {
+        if (task.getState() == TaskState.SUCCESS || task.getState() == TaskState.FAILURE) {
             if (MTMVUtils.getNowTimeStamp() > task.getExpireTime()) {
                 return;
             }
@@ -326,10 +326,10 @@ public class MTMVTaskManager {
                 arrangeToPendingTask(taskExecutor);
                 break;
             case RUNNING:
-                task.setState(TaskState.FAILED);
+                task.setState(TaskState.FAILURE);
                 addHistory(task);
                 break;
-            case FAILED:
+            case FAILURE:
             case SUCCESS:
                 addHistory(task);
                 break;
@@ -360,16 +360,17 @@ public class MTMVTaskManager {
                     status.setState(TaskState.RUNNING);
                     getRunningTaskMap().put(jobId, pendingTask);
                 }
-            } else if (toStatus == TaskState.FAILED) {
+            } else if (toStatus == TaskState.FAILURE) {
                 status.setMessage(changeTask.getErrorMessage());
                 status.setErrorCode(changeTask.getErrorCode());
-                status.setState(TaskState.FAILED);
+                status.setState(TaskState.FAILURE);
                 addHistory(status);
             }
             if (taskQueue.size() == 0) {
                 getPendingTaskMap().remove(jobId);
             }
-        } else if (fromStatus == TaskState.RUNNING && (toStatus == TaskState.SUCCESS || toStatus == TaskState.FAILED)) {
+        } else if (fromStatus == TaskState.RUNNING && (toStatus == TaskState.SUCCESS
+                || toStatus == TaskState.FAILURE)) {
             MTMVTaskExecutor runningTask = getRunningTaskMap().remove(jobId);
             if (runningTask == null) {
                 return;
@@ -425,10 +426,10 @@ public class MTMVTaskManager {
                     MTMVTaskExecutor taskExecutor = tasks.poll();
                     taskExecutor.getTask().setMessage("Fe abort the task");
                     taskExecutor.getTask().setErrorCode(-1);
-                    taskExecutor.getTask().setState(TaskState.FAILED);
+                    taskExecutor.getTask().setState(TaskState.FAILURE);
                     addHistory(taskExecutor.getTask());
                     changeAndLogTaskStatus(taskExecutor.getJobId(), taskExecutor.getTask(), TaskState.PENDING,
-                            TaskState.FAILED);
+                            TaskState.FAILURE);
                 }
                 pendingIter.remove();
             }
@@ -437,12 +438,12 @@ public class MTMVTaskManager {
                 MTMVTaskExecutor taskExecutor = getRunningTaskMap().get(runningIter.next());
                 taskExecutor.getTask().setMessage("Fe abort the task");
                 taskExecutor.getTask().setErrorCode(-1);
-                taskExecutor.getTask().setState(TaskState.FAILED);
+                taskExecutor.getTask().setState(TaskState.FAILURE);
                 taskExecutor.getTask().setFinishTime(MTMVUtils.getNowTimeStamp());
                 runningIter.remove();
                 addHistory(taskExecutor.getTask());
                 changeAndLogTaskStatus(taskExecutor.getJobId(), taskExecutor.getTask(), TaskState.RUNNING,
-                        TaskState.FAILED);
+                        TaskState.FAILURE);
             }
         } finally {
             unlock();
diff --git a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVTaskProcessor.java b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVTaskProcessor.java
index 5ba0bccaaf..3e133af6db 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVTaskProcessor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVTaskProcessor.java
@@ -17,31 +17,20 @@
 
 package org.apache.doris.mtmv;
 
-import org.apache.doris.analysis.SqlParser;
-import org.apache.doris.analysis.SqlScanner;
-import org.apache.doris.analysis.StatementBase;
-import org.apache.doris.analysis.UserIdentity;
-import org.apache.doris.catalog.DatabaseIf;
+import org.apache.doris.catalog.Database;
 import org.apache.doris.catalog.Env;
 import org.apache.doris.catalog.MaterializedView;
-import org.apache.doris.catalog.TableIf;
-import org.apache.doris.cluster.ClusterNamespace;
-import org.apache.doris.common.AnalysisException;
-import org.apache.doris.common.DdlException;
+import org.apache.doris.common.ErrorCode;
 import org.apache.doris.common.FeConstants;
-import org.apache.doris.common.util.SqlParserUtils;
-import org.apache.doris.datasource.InternalCatalog;
-import org.apache.doris.mtmv.MTMVUtils.TaskState;
 import org.apache.doris.qe.ConnectContext;
 import org.apache.doris.qe.QueryState;
+import org.apache.doris.qe.QueryState.MysqlStateType;
 import org.apache.doris.qe.StmtExecutor;
-import org.apache.doris.system.SystemInfoService;
 
 import com.google.common.collect.Lists;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
-import java.io.StringReader;
 import java.util.List;
 import java.util.concurrent.atomic.AtomicLong;
 
@@ -49,209 +38,118 @@ import java.util.concurrent.atomic.AtomicLong;
 public class MTMVTaskProcessor {
     private static final Logger LOG = LogManager.getLogger(MTMVTaskProcessor.class);
     private static final AtomicLong STMT_ID_GENERATOR = new AtomicLong(0);
-    private ConnectContext context;
 
-    void process(MTMVTaskContext context) throws Exception {
+    boolean process(MTMVTaskContext context) throws Exception {
         String taskId = context.getTask().getTaskId();
         long jobId = context.getJob().getId();
-        LOG.info("run mtmv logic start, task_id:{}, jobid:{}", taskId, jobId);
-        String tableName = context.getTask().getMvName();
-        String tmpTableName = genTmpTableName(tableName);
-        DatabaseIf db = Env.getCurrentEnv().getCatalogMgr().getCatalog(InternalCatalog.INTERNAL_CATALOG_NAME)
-                .getDbOrAnalysisException(context.getTask().getDbName());
-        MaterializedView table = (MaterializedView) db.getTableOrAnalysisException(tableName);
-        if (!table.tryMvTaskLock()) {
-            LOG.warn("run mtmv task  failed, taskid:{}, jobid:{}, msg:{}", taskId, jobId, "get lock fail");
-            return;
+        LOG.info("Start to run a MTMV task, taskId={}, jobId={}.", taskId, jobId);
+
+        String mvName = context.getTask().getMVName();
+        String temporaryMVName = getTemporaryMVName(mvName);
+        Database db = context.getCtx().getEnv().getInternalCatalog()
+                .getDbOrMetaException(context.getTask().getDBName());
+        MaterializedView mv = (MaterializedView) db.getTableOrAnalysisException(mvName);
+
+        if (!mv.tryLockMVTask()) {
+            LOG.warn("Failed to run the MTMV task, taskId={}, jobId={}, msg={}.", taskId, jobId,
+                    "Failed to get the lock");
+            context.getTask().setMessage("Failed to get the lock.");
+            return false;
         }
         try {
-            //step1 create tmp table
-            String tmpCreateTableStmt = genCreateTempMaterializedViewStmt(context, tableName, tmpTableName);
-            //check whther tmp table exists, if exists means run mtmv task failed before, so need to drop it first
-            if (db.isTableExist(tmpTableName)) {
-                String dropStml = genDropStml(context, tmpTableName);
-                ConnectContext dropResult = execSQL(context, dropStml);
-                LOG.info("exec drop table stmt, taskid:{}, stmt:{}, ret:{}, msg:{}", taskId, dropStml,
-                        dropResult.getState(), dropResult.getState().getInfoMessage());
-            }
-            ConnectContext createTempTableResult = execSQL(context, tmpCreateTableStmt);
-            LOG.info("exec tmp table stmt, taskid:{}, stmt:{}, ret:{}, msg:{}", taskId, tmpCreateTableStmt,
-                    createTempTableResult.getState(), createTempTableResult.getState().getInfoMessage());
-            if (createTempTableResult.getState().getStateType() != QueryState.MysqlStateType.OK) {
-                throw new Throwable("create tmp table failed, sql:" + tmpCreateTableStmt);
+            // Check whether the temporary materialized view exists, we should drop the obsolete materialized view first
+            // because it was created by previous tasks which failed to complete their work.
+            dropMaterializedView(context, temporaryMVName);
+
+            // Step 1: create the temporary materialized view.
+            String createStatement = generateCreateStatement(mv.clone(temporaryMVName));
+            if (!executeSQL(context, createStatement)) {
+                throw new RuntimeException(
+                        "Failed to create the temporary materialized view, sql=" + createStatement + ".");
             }
 
-            //step2 insert data to tmp table
-            String insertStmt = genInsertIntoStmt(context, tmpTableName);
-            ConnectContext insertDataResult = execSQL(context, insertStmt);
-            LOG.info("exec insert into stmt, taskid:{}, stmt:{}, ret:{}, msg:{}, effected_row:{}", taskId, insertStmt,
-                    insertDataResult.getState(), insertDataResult.getState().getInfoMessage(),
-                    insertDataResult.getState().getAffectedRows());
-            if (insertDataResult.getState().getStateType() != QueryState.MysqlStateType.OK) {
-                throw new Throwable("insert data failed, sql:" + insertStmt);
+            // Step 2: insert data to the temporary materialized view.
+            String insertSelectStatement = generateInsertSelectStmt(context, temporaryMVName);
+            if (!executeSQL(context, insertSelectStatement)) {
+                throw new RuntimeException(
+                        "Failed to insert data to the temporary materialized view, sql=" + insertSelectStatement + ".");
             }
-
-            //step3 swap tmp table with origin table
-            String swapStmt = genSwapStmt(context, tableName, tmpTableName);
-            ConnectContext swapResult = execSQL(context, swapStmt);
-            LOG.info("exec swap stmt, taskid:{}, stmt:{}, ret:{}, msg:{}", taskId, swapStmt, swapResult.getState(),
-                    swapResult.getState().getInfoMessage());
-            if (swapResult.getState().getStateType() != QueryState.MysqlStateType.OK) {
-                throw new Throwable("swap table failed, sql:" + swapStmt);
+            String insertInfoMessage = context.getCtx().getState().getInfoMessage();
+
+            // Step 3: swap the temporary materialized view with the original materialized view.
+            String swapStatement = generateSwapStatement(mvName, temporaryMVName);
+            if (!executeSQL(context, swapStatement)) {
+                throw new RuntimeException(
+                        "Failed to swap the temporary materialized view with the original materialized view, sql="
+                                + swapStatement + ".");
             }
-            //step4 update task info
-            context.getTask().setMessage(insertDataResult.getState().getInfoMessage());
-            context.getTask().setState(TaskState.SUCCESS);
-            LOG.info("run mtmv task success, task_id:{},jobid:{}", taskId, jobId);
-        } catch (AnalysisException e) {
-            LOG.warn("run mtmv task failed, taskid:{}, jobid:{}, msg:{}", taskId, jobId, e.getMessage());
-            context.getTask().setMessage("run task failed, caused by " + e.getMessage());
-            context.getTask().setState(TaskState.FAILED);
+
+            context.getTask().setMessage(insertInfoMessage);
+            LOG.info("Run MTMV task successfully, taskId={}, jobId={}.", taskId, jobId);
+            return true;
         } catch (Throwable e) {
-            LOG.warn("run mtmv task failed, taskid:{}, jobid:{}, msg:{}", taskId, jobId, e.getMessage());
-            context.getTask().setMessage("run task failed, caused by " + e.getMessage());
-            context.getTask().setState(TaskState.FAILED);
+            context.getTask().setMessage(e.getMessage());
+            throw e;
         } finally {
-            context.getTask().setFinishTime(MTMVUtils.getNowTimeStamp());
-            table.mvTaskUnLock();
-            //double check
-            if (db.isTableExist(tmpTableName)) {
-                String dropStml = genDropStml(context, tmpTableName);
-                ConnectContext dropResult = execSQL(context, dropStml);
-                LOG.info("exec drop table stmt, taskid:{}, stmt:{}, ret:{}, msg:{}", taskId, dropStml,
-                        dropResult.getState(), dropResult.getState().getInfoMessage());
-            }
+            mv.unLockMVTask();
+            dropMaterializedView(context, temporaryMVName);
         }
     }
 
-    private String genDropStml(MTMVTaskContext context, String tableName) {
-        String stmt = "DROP MATERIALIZED VIEW  if exists " + tableName;
-        LOG.info("gen drop stmt, taskid:{}, stmt:{}", context.getTask().getTaskId(), stmt);
-        return stmt;
-    }
-
-    private String genTmpTableName(String tableName) {
-        String tmpTableName = FeConstants.TEMP_MATERIZLIZE_DVIEW_PREFIX + tableName;
-        return tmpTableName;
-    }
-
-    // ALTER TABLE t1 REPLACE WITH TABLE t1_mirror PROPERTIES('swap' = 'false');
-    private String genSwapStmt(MTMVTaskContext context, String tableName, String tmpTableName) {
-        String stmt = "ALTER TABLE " + tableName + " REPLACE WITH TABLE " + tmpTableName
-                + " PROPERTIES('swap' = 'false');";
-        LOG.info("gen swap stmt, taskid:{}, stmt:{}", context.getTask().getTaskId(), stmt);
-        return stmt;
-    }
-
-    private String genInsertIntoStmt(MTMVTaskContext context, String tmpTableName) {
-        String query = context.getQuery();
-        String stmt = "insert into " + tmpTableName + " " + query;
-        stmt = stmt.replaceAll(SystemInfoService.DEFAULT_CLUSTER + ":", "");
-        LOG.info("gen insert into stmt, taskid:{}, stmt:{}", context.getTask().getTaskId(), stmt);
-        return stmt;
+    private String getTemporaryMVName(String mvName) {
+        return FeConstants.TEMP_MATERIZLIZE_DVIEW_PREFIX + mvName;
     }
 
-    private String genCreateTempMaterializedViewStmt(MTMVTaskContext context, String tableName, String tmpTableName) {
-        try {
-            String dbName = context.getTask().getDbName();
-            String originViewStmt = getCreateViewStmt(dbName, tableName);
-            String tmpViewStmt = convertCreateViewStmt(originViewStmt, tmpTableName);
-            LOG.info("gen tmp table stmt, taskid:{}, originstml:{},  stmt:{}", context.getTask().getTaskId(),
-                    originViewStmt.replaceAll("\n", " "), tmpViewStmt);
-            return tmpViewStmt;
-        } catch (Throwable e) {
-            LOG.warn("fail to gen tmp table stmt, taskid:{}, msg:{}", context.getTask().getTaskId(), e.getMessage());
-            return "";
+    private void dropMaterializedView(MTMVTaskContext context, String mvName) {
+        String dropStatement = generateDropStatement(mvName);
+        if (!executeSQL(context, dropStatement)) {
+            throw new RuntimeException(
+                    "Failed to drop the temporary materialized view, sql=" + dropStatement + ".");
         }
     }
 
-    //Generate temporary view table statement
-    private String convertCreateViewStmt(String stmt, String tmpTable) {
-        stmt = stmt.replace("`", "");
-        String regex = "CREATE MATERIALIZED VIEW.*\n";
-        String replacement = "CREATE MATERIALIZED VIEW " + tmpTable + "\n";
-        stmt = stmt.replaceAll(regex, replacement);
-        // regex = "BUILD.*\n";
-        // stmt = stmt.replaceAll(regex, " BUILD deferred never REFRESH \n");
-        stmt = stmt.replaceAll("\n", " ");
-        stmt = stmt.replaceAll(SystemInfoService.DEFAULT_CLUSTER + ":", "");
-        return stmt;
+    private String generateDropStatement(String mvName) {
+        return "DROP MATERIALIZED VIEW IF EXISTS " + mvName;
     }
 
-    // get origin table create stmt from env
-    private String getCreateViewStmt(String dbName, String tableName) throws AnalysisException {
-        ConnectContext ctx = new ConnectContext();
-        ctx.setEnv(Env.getCurrentEnv());
-        DatabaseIf db = ctx.getEnv().getCatalogMgr().getCatalog(InternalCatalog.INTERNAL_CATALOG_NAME)
-                .getDbOrAnalysisException(dbName);
-        TableIf table = db.getTableOrAnalysisException(tableName);
-        table.readLock();
-        try {
-            List<String> createTableStmt = Lists.newArrayList();
-            Env.getDdlStmt(table, createTableStmt, null, null, false, true /* hide password */, -1L);
-            if (createTableStmt.isEmpty()) {
-                return "";
-            }
-            return createTableStmt.get(0);
-        } catch (Throwable e) {
-            //throw new AnalysisException(e.getMessage());
-        } finally {
-            table.readUnlock();
-        }
-        return "";
-    }
-
-    private ConnectContext execSQL(MTMVTaskContext context, String originStmt) throws AnalysisException, DdlException {
-        ConnectContext ctx = new ConnectContext();
-        ctx.setEnv(Env.getCurrentEnv());
-        ctx.setCluster(SystemInfoService.DEFAULT_CLUSTER);
+    private boolean executeSQL(MTMVTaskContext context, String sql) {
+        ConnectContext ctx = context.getCtx();
         ctx.setThreadLocalInfo();
-        String fullDbName = ClusterNamespace
-                .getFullName(SystemInfoService.DEFAULT_CLUSTER, context.getTask().getDbName());
-        ctx.setDatabase(fullDbName);
-        ctx.setQualifiedUser("root");
-        ctx.setCurrentUserIdentity(UserIdentity.createAnalyzedUserIdentWithIp("root", "%"));
         ctx.getState().reset();
-
-        List<StatementBase> stmts = null;
-        StatementBase parsedStmt = null;
-        stmts = parse(ctx, originStmt);
-        parsedStmt = stmts.get(0);
         try {
-            StmtExecutor executor = new StmtExecutor(ctx, parsedStmt);
+            StmtExecutor executor = new StmtExecutor(ctx, sql);
             ctx.setExecutor(executor);
             executor.execute();
         } catch (Throwable e) {
-            LOG.warn("execSQL failed, taskid:{}, msg:{}, stmt:{}", context.getTask().getTaskId(), e.getMessage(),
-                    originStmt);
+            QueryState queryState = new QueryState();
+            queryState.setError(ErrorCode.ERR_INTERNAL_ERROR, e.getMessage());
+            ctx.setState(queryState);
         } finally {
-            LOG.debug("execSQL succ, taskid:{}, stmt:{}", context.getTask().getTaskId(), originStmt);
+            ConnectContext.remove();
         }
-        return ctx;
-    }
 
-    private List<StatementBase> parse(ConnectContext ctx, String originStmt) throws AnalysisException, DdlException {
-        // Parse statement with parser generated by CUP&FLEX
-        SqlScanner input = new SqlScanner(new StringReader(originStmt), ctx.getSessionVariable().getSqlMode());
-        SqlParser parser = new SqlParser(input);
-        try {
-            return SqlParserUtils.getMultiStmts(parser);
-        } catch (Error e) {
-            throw new AnalysisException("Please check your sql, we meet an error when parsing.", e);
-        } catch (AnalysisException | DdlException e) {
-            String errorMessage = parser.getErrorMsg(originStmt);
-            LOG.debug("origin stmt: {}; Analyze error message: {}", originStmt, parser.getErrorMsg(originStmt), e);
-            if (errorMessage == null) {
-                throw e;
-            } else {
-                throw new AnalysisException(errorMessage, e);
-            }
-        } catch (ArrayStoreException e) {
-            throw new AnalysisException("Sql parser can't convert the result to array, please check your sql.", e);
-        } catch (Exception e) {
-            // TODO(lingbin): we catch 'Exception' to prevent unexpected error,
-            // should be removed this try-catch clause future.
-            throw new AnalysisException("Internal Error, maybe syntax error or this is a bug");
+        if (ctx.getState().getStateType() == MysqlStateType.OK) {
+            LOG.info("Execute SQL successfully, taskId={}, sql={}.", context.getTask().getTaskId(), sql);
+        } else {
+            LOG.warn("Failed to execute SQL, taskId={}, sql={}, errorCode={}, message={}.",
+                    context.getTask().getTaskId(),
+                    sql, ctx.getState().getErrorCode(), ctx.getState().getErrorMessage());
         }
+        return ctx.getState().getStateType() == MysqlStateType.OK;
+    }
+
+    private String generateCreateStatement(MaterializedView mv) {
+        List<String> createStatement = Lists.newArrayList();
+        Env.getDdlStmt(mv, createStatement, null, null, false, true /* hide password */, -1L);
+        return createStatement.stream().findFirst().orElse("");
+    }
+
+    private String generateInsertSelectStmt(MTMVTaskContext context, String temporaryMVName) {
+        return "INSERT INTO " + temporaryMVName + " " + context.getQuery();
+    }
+
+    // ALTER TABLE t1 REPLACE WITH TABLE t1_mirror PROPERTIES('swap' = 'false');
+    private String generateSwapStatement(String mvName, String temporaryMVName) {
+        return "ALTER TABLE " + mvName + " REPLACE WITH TABLE " + temporaryMVName + " PROPERTIES('swap' = 'false')";
     }
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVUtils.java b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVUtils.java
index 027aadfc1b..874eb0510c 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVUtils.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVUtils.java
@@ -60,7 +60,7 @@ public class MTMVUtils {
     }
 
     public enum TaskState {
-        PENDING, RUNNING, FAILED, SUCCESS,
+        PENDING, RUNNING, FAILURE, SUCCESS,
     }
 
     enum TaskSubmitStatus {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/mtmv/metadata/MTMVJob.java b/fe/fe-core/src/main/java/org/apache/doris/mtmv/metadata/MTMVJob.java
index 6ed264ce51..a8dd62b4a9 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/mtmv/metadata/MTMVJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/mtmv/metadata/MTMVJob.java
@@ -150,19 +150,19 @@ public class MTMVJob implements Writable, Comparable {
         this.createTime = createTime;
     }
 
-    public String getDbName() {
+    public String getDBName() {
         return dbName;
     }
 
-    public void setDbName(String dbName) {
+    public void setDBName(String dbName) {
         this.dbName = dbName;
     }
 
-    public String getMvName() {
+    public String getMVName() {
         return mvName;
     }
 
-    public void setMvName(String mvName) {
+    public void setMVName(String mvName) {
         this.mvName = mvName;
     }
 
@@ -287,8 +287,8 @@ public class MTMVJob implements Writable, Comparable {
         list.add(getName());
         list.add(getTriggerMode().toString());
         list.add(getSchedule() == null ? "NULL" : getSchedule().toString());
-        list.add(getDbName());
-        list.add(getMvName());
+        list.add(getDBName());
+        list.add(getMVName());
         list.add(getQuery().length() > 10240 ? getQuery().substring(0, 10240) : getQuery());
         list.add(getUser());
         list.add(getRetryPolicy().toString());
diff --git a/fe/fe-core/src/main/java/org/apache/doris/mtmv/metadata/MTMVTask.java b/fe/fe-core/src/main/java/org/apache/doris/mtmv/metadata/MTMVTask.java
index 98b0566643..603e98a560 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/mtmv/metadata/MTMVTask.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/mtmv/metadata/MTMVTask.java
@@ -117,19 +117,19 @@ public class MTMVTask implements Writable, Comparable {
         this.state = state;
     }
 
-    public String getDbName() {
+    public String getDBName() {
         return dbName;
     }
 
-    public void setDbName(String dbName) {
+    public void setDBName(String dbName) {
         this.dbName = dbName;
     }
 
-    public String getMvName() {
+    public String getMVName() {
         return mvName;
     }
 
-    public void setMvName(String mvName) {
+    public void setMVName(String mvName) {
         this.mvName = mvName;
     }
 
@@ -222,8 +222,8 @@ public class MTMVTask implements Writable, Comparable {
         List<String> list = Lists.newArrayList();
         list.add(getTaskId());
         list.add(getJobName());
-        list.add(getDbName());
-        list.add(getMvName());
+        list.add(getDBName());
+        list.add(getMVName());
         list.add(getQuery().length() > 10240 ? getQuery().substring(0, 10240) : getQuery());
         list.add(getUser());
         list.add(Integer.toString(getPriority()));
diff --git a/fe/fe-core/src/test/java/org/apache/doris/mtmv/MTMVJobManagerTest.java b/fe/fe-core/src/test/java/org/apache/doris/mtmv/MTMVJobManagerTest.java
index 9985669449..c0545000a5 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/mtmv/MTMVJobManagerTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/mtmv/MTMVJobManagerTest.java
@@ -125,7 +125,7 @@ public class MTMVJobManagerTest extends TestWithFeService {
         // index 7: RetryTimes
         Assertions.assertEquals("0", taskRow.get(7));
         // index 8: State
-        Assertions.assertEquals("FAILED", taskRow.get(8));
+        Assertions.assertEquals("FAILURE", taskRow.get(8));
         // index 9: Message
         Assertions.assertEquals("", taskRow.get(9));
         // index 10: ErrorCode
diff --git a/fe/fe-core/src/test/java/org/apache/doris/mtmv/MTMVTaskExecutorTest.java b/fe/fe-core/src/test/java/org/apache/doris/mtmv/MTMVTaskExecutorTest.java
index 9756dd5db2..bee3c2e8bd 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/mtmv/MTMVTaskExecutorTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/mtmv/MTMVTaskExecutorTest.java
@@ -38,7 +38,7 @@ public class MTMVTaskExecutorTest extends TestWithFeService {
         executor.initTask(UUID.randomUUID().toString(), System.currentTimeMillis());
         pool.executeTask(executor);
         executor.getFuture().get();
-        Assertions.assertEquals(TaskState.FAILED, executor.getTask().getState());
+        Assertions.assertEquals(TaskState.FAILURE, executor.getTask().getState());
     }
 
 
@@ -51,7 +51,7 @@ public class MTMVTaskExecutorTest extends TestWithFeService {
         executor.initTask(UUID.randomUUID().toString(), System.currentTimeMillis());
         pool.executeTask(executor);
         executor.getFuture().get();
-        Assertions.assertEquals(TaskState.FAILED, executor.getTask().getState());
+        Assertions.assertEquals(TaskState.FAILURE, executor.getTask().getState());
         //Assertions.assertEquals("java.lang.Exception: my define error 1", executor.getTask().getMessage());
     }
 
@@ -67,7 +67,7 @@ public class MTMVTaskExecutorTest extends TestWithFeService {
         executor.initTask(UUID.randomUUID().toString(), System.currentTimeMillis());
         pool.executeTask(executor);
         executor.getFuture().get();
-        Assertions.assertEquals(TaskState.FAILED, executor.getTask().getState());
+        Assertions.assertEquals(TaskState.FAILURE, executor.getTask().getState());
     }
 
     @Test
@@ -82,7 +82,7 @@ public class MTMVTaskExecutorTest extends TestWithFeService {
         executor.initTask(UUID.randomUUID().toString(), System.currentTimeMillis());
         pool.executeTask(executor);
         executor.getFuture().get();
-        Assertions.assertEquals(TaskState.FAILED, executor.getTask().getState());
+        Assertions.assertEquals(TaskState.FAILURE, executor.getTask().getState());
         //Assertions.assertEquals("java.lang.Exception: my define error 4", executor.getTask().getMessage());
     }
 
@@ -94,11 +94,12 @@ public class MTMVTaskExecutorTest extends TestWithFeService {
             this.times = times;
         }
 
-        void process(MTMVTaskContext context) throws Exception {
+        boolean process(MTMVTaskContext context) throws Exception {
             if (runTimes < times) {
                 runTimes++;
                 throw new Exception("my define error " + runTimes);
             }
+            return true;
         }
     }
 }
diff --git a/fe/fe-core/src/test/java/org/apache/doris/mtmv/MTMVUtilsTest.java b/fe/fe-core/src/test/java/org/apache/doris/mtmv/MTMVUtilsTest.java
index 76a96f45f3..5a81173715 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/mtmv/MTMVUtilsTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/mtmv/MTMVUtilsTest.java
@@ -40,17 +40,17 @@ public class MTMVUtilsTest {
 
     public static MTMVJob createDummyJob() {
         MTMVJob job = new MTMVJob("dummy");
-        job.setDbName(dbName);
-        job.setMvName(MV_NAME);
+        job.setDBName(dbName);
+        job.setMVName(MV_NAME);
         return job;
     }
 
     public static MTMVJob createOnceJob() {
         MTMVJob job = new MTMVJob("");
         job.setTriggerMode(TriggerMode.ONCE);
-        job.setDbName(dbName);
+        job.setDBName(dbName);
         job.setName(O_JOB);
-        job.setMvName(MV_NAME);
+        job.setMVName(MV_NAME);
         return job;
     }
 
@@ -59,9 +59,9 @@ public class MTMVUtilsTest {
         JobSchedule jobSchedule = new JobSchedule(System.currentTimeMillis() / 1000, 1, TimeUnit.SECONDS);
         job.setSchedule(jobSchedule);
         job.setTriggerMode(TriggerMode.PERIODICAL);
-        job.setDbName(dbName);
+        job.setDBName(dbName);
         job.setName(S_JOB);
-        job.setMvName(MV_NAME);
+        job.setMVName(MV_NAME);
         return job;
     }
 
diff --git a/regression-test/suites/mtmv_p0/test_create_mtmv.groovy b/regression-test/suites/mtmv_p0/test_create_mtmv.groovy
index 9d50dd3286..6d3966614e 100644
--- a/regression-test/suites/mtmv_p0/test_create_mtmv.groovy
+++ b/regression-test/suites/mtmv_p0/test_create_mtmv.groovy
@@ -58,10 +58,7 @@ suite("test_create_mtmv") {
     """
     sql """
         CREATE MATERIALIZED VIEW ${mvName}
-        BUILD IMMEDIATE 
-        REFRESH COMPLETE 
-        START WITH "2022-10-27 19:35:00"
-        NEXT  60 second
+        BUILD IMMEDIATE REFRESH COMPLETE
         KEY(username)   
         DISTRIBUTED BY HASH (username)  buckets 1
         PROPERTIES ('replication_num' = '1') 
@@ -72,15 +69,16 @@ suite("test_create_mtmv") {
     def show_task_meta = sql_meta "SHOW MTMV TASK FROM ${dbName}"
     def index = show_task_meta.indexOf(['State', 'CHAR'])
     def query = "SHOW MTMV TASK FROM ${dbName}"
+    def show_task_result
     def state
     do {
-        def show_task_result = sql "${query}"
+        show_task_result = sql "${query}"
         state = show_task_result.last().get(index)
         println "The state of ${query} is ${state}"
         Thread.sleep(1000);
     } while (state.equals('PENDING') || state.equals('RUNNING'))
 
-    assertEquals('SUCCESS', state)
+    assertEquals 'SUCCESS', state, show_task_result.last().toString()
     order_qt_select "SELECT * FROM ${mvName}"
 }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org