You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by mo...@apache.org on 2023/06/27 08:23:57 UTC

[doris] branch master updated: [feature-wip](MTMV) optimize lock of mtmv job & task, to avoid dead lock (#21054)

This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 7b93b26b8c [feature-wip](MTMV) optimize lock of mtmv job & task, to avoid dead lock (#21054)
7b93b26b8c is described below

commit 7b93b26b8cc395d652385975a16aab9571906a20
Author: zhangdong <49...@qq.com>
AuthorDate: Tue Jun 27 16:23:50 2023 +0800

    [feature-wip](MTMV) optimize lock of mtmv job & task, to avoid dead lock (#21054)
---
 .../java/org/apache/doris/mtmv/MTMVJobFactory.java |   8 +-
 .../java/org/apache/doris/mtmv/MTMVJobManager.java | 352 +++++++--------------
 .../org/apache/doris/mtmv/MTMVTaskExecutor.java    |   6 +
 .../org/apache/doris/mtmv/MTMVTaskManager.java     | 345 +++++++++-----------
 .../org/apache/doris/mtmv/metadata/MTMVJob.java    |  62 +++-
 .../java/org/apache/doris/qe/ShowExecutor.java     |   2 +-
 .../org/apache/doris/mtmv/MTMVJobManagerTest.java  |  69 ++--
 .../apache/doris/mtmv/MTMVTaskExecutorTest.java    |  19 +-
 .../java/org/apache/doris/mtmv/MTMVUtilsTest.java  |  26 +-
 .../suites/mtmv_p0/test_refresh_mtmv.groovy        |   2 +-
 10 files changed, 388 insertions(+), 503 deletions(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVJobFactory.java b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVJobFactory.java
index ac771c4264..3ac142d063 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVJobFactory.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVJobFactory.java
@@ -22,7 +22,9 @@ import org.apache.doris.analysis.MVRefreshInfo.RefreshMethod;
 import org.apache.doris.analysis.MVRefreshInfo.RefreshTrigger;
 import org.apache.doris.analysis.MVRefreshIntervalTriggerInfo;
 import org.apache.doris.analysis.MVRefreshTriggerInfo;
+import org.apache.doris.catalog.Env;
 import org.apache.doris.catalog.MaterializedView;
+import org.apache.doris.common.Config;
 import org.apache.doris.common.FeConstants;
 import org.apache.doris.common.util.TimeUtils;
 import org.apache.doris.mtmv.MTMVUtils.TriggerMode;
@@ -76,6 +78,7 @@ public class MTMVJobFactory {
     private static MTMVJob genPeriodicalJob(MaterializedView materializedView, String dbName) {
         String uid = UUID.randomUUID().toString();
         MTMVJob job = new MTMVJob(materializedView.getName() + "_" + uid);
+        job.setId(Env.getCurrentEnv().getNextId());
         job.setTriggerMode(TriggerMode.PERIODICAL);
         job.setSchedule(genJobSchedule(materializedView));
         job.setDBName(dbName);
@@ -88,11 +91,14 @@ public class MTMVJobFactory {
     public static MTMVJob genOnceJob(MaterializedView materializedView, String dbName) {
         String uid = UUID.randomUUID().toString();
         MTMVJob job = new MTMVJob(materializedView.getName() + "_" + uid);
+        job.setId(Env.getCurrentEnv().getNextId());
         job.setTriggerMode(TriggerMode.ONCE);
         job.setDBName(dbName);
         job.setMVName(materializedView.getName());
         job.setQuery(materializedView.getQuery());
-        job.setCreateTime(MTMVUtils.getNowTimeStamp());
+        long nowTimeStamp = MTMVUtils.getNowTimeStamp();
+        job.setCreateTime(nowTimeStamp);
+        job.setExpireTime(nowTimeStamp + Config.scheduler_mtmv_job_expired);
         return job;
     }
 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVJobManager.java b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVJobManager.java
index cae36ab3da..7295f40b60 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVJobManager.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVJobManager.java
@@ -21,7 +21,6 @@ import org.apache.doris.catalog.Database;
 import org.apache.doris.catalog.Env;
 import org.apache.doris.catalog.MaterializedView;
 import org.apache.doris.catalog.TableIf.TableType;
-import org.apache.doris.common.Config;
 import org.apache.doris.common.DdlException;
 import org.apache.doris.common.MetaNotFoundException;
 import org.apache.doris.common.io.Text;
@@ -30,16 +29,12 @@ import org.apache.doris.metric.Metric;
 import org.apache.doris.metric.MetricLabel;
 import org.apache.doris.metric.MetricRepo;
 import org.apache.doris.mtmv.MTMVUtils.JobState;
-import org.apache.doris.mtmv.MTMVUtils.TaskSubmitStatus;
-import org.apache.doris.mtmv.MTMVUtils.TriggerMode;
 import org.apache.doris.mtmv.metadata.ChangeMTMVJob;
 import org.apache.doris.mtmv.metadata.MTMVCheckpointData;
 import org.apache.doris.mtmv.metadata.MTMVJob;
-import org.apache.doris.mtmv.metadata.MTMVJob.JobSchedule;
 import org.apache.doris.mtmv.metadata.MTMVTask;
 import org.apache.doris.persist.gson.GsonUtils;
 
-import com.google.common.base.Preconditions;
 import com.google.common.base.Strings;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
@@ -55,10 +50,9 @@ import java.util.List;
 import java.util.Map;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.locks.ReentrantLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
 import java.util.stream.Collectors;
 
 public class MTMVJobManager {
@@ -69,7 +63,6 @@ public class MTMVJobManager {
 
     private final Map<Long, MTMVJob> idToJobMap;
     private final Map<String, MTMVJob> nameToJobMap;
-    private final Map<Long, ScheduledFuture<?>> periodFutureMap;
 
     private final MTMVTaskManager taskManager;
 
@@ -77,16 +70,15 @@ public class MTMVJobManager {
 
     private ScheduledExecutorService cleanerScheduler = Executors.newScheduledThreadPool(1);
 
-    private final ReentrantLock reentrantLock;
+    private final ReentrantReadWriteLock rwLock;
 
     private final AtomicBoolean isStarted = new AtomicBoolean(false);
 
     public MTMVJobManager() {
         idToJobMap = Maps.newConcurrentMap();
         nameToJobMap = Maps.newConcurrentMap();
-        periodFutureMap = Maps.newConcurrentMap();
-        reentrantLock = new ReentrantLock(true);
-        taskManager = new MTMVTaskManager(this);
+        rwLock = new ReentrantReadWriteLock(true);
+        taskManager = new MTMVTaskManager();
     }
 
     public void start() {
@@ -104,19 +96,11 @@ public class MTMVJobManager {
             }
             cleanerScheduler.scheduleAtFixedRate(() -> {
                 if (!Env.getCurrentEnv().isMaster()) {
+                    LOG.warn("only master can run MTMVJob");
                     return;
                 }
-                if (!tryLock()) {
-                    return;
-                }
-                try {
-                    removeExpiredJobs();
-                    taskManager.removeExpiredTasks();
-                } catch (Exception ex) {
-                    LOG.warn("failed remove expired jobs and tasks.", ex);
-                } finally {
-                    unlock();
-                }
+                removeExpiredJobs();
+                taskManager.removeExpiredTasks();
             }, 0, 1, TimeUnit.MINUTES);
 
             taskManager.startTaskScheduler();
@@ -146,7 +130,13 @@ public class MTMVJobManager {
                 Metric.MetricUnit.NOUNIT, "Active job number of mtmv.") {
             @Override
             public Integer getValue() {
-                return periodFutureMap.size();
+                int result = 0;
+                for (MTMVJob job : getAllJobsWithLock()) {
+                    if (job.getState() == JobState.ACTIVE) {
+                        result++;
+                    }
+                }
+                return result;
             }
         };
         activeJob.addLabel(new MetricLabel("type", "ACTIVE-JOB"));
@@ -206,126 +196,35 @@ public class MTMVJobManager {
     }
 
     private void registerJobs() {
-        int num = nameToJobMap.size();
-        int periodNum = 0;
-        int onceNum = 0;
-        for (MTMVJob job : nameToJobMap.values()) {
-            if (!job.getState().equals(JobState.ACTIVE)) {
-                continue;
-            }
-            if (job.getTriggerMode() == TriggerMode.PERIODICAL) {
-                JobSchedule schedule = job.getSchedule();
-                ScheduledFuture<?> future = periodScheduler.scheduleAtFixedRate(() -> submitJobTask(job.getName()),
-                        MTMVUtils.getDelaySeconds(job), schedule.getSecondPeriod(), TimeUnit.SECONDS);
-                periodFutureMap.put(job.getId(), future);
-                periodNum++;
-            } else if (job.getTriggerMode() == TriggerMode.ONCE) {
-                MTMVTaskExecuteParams executeOption = new MTMVTaskExecuteParams();
-                submitJobTask(job.getName(), executeOption);
-                onceNum++;
-            }
+        for (MTMVJob job : getAllJobsWithLock()) {
+            job.start();
         }
-        LOG.info("Register {} period jobs and {} once jobs in the total {} jobs.", periodNum, onceNum, num);
     }
 
     public void createJob(MTMVJob job, boolean isReplay) throws DdlException {
-        if (!tryLock()) {
-            throw new DdlException("Failed to get job manager lock when create Job [" + job.getName() + "]");
+        createJobWithLock(job, isReplay);
+        if (!isReplay) {
+            job.start();
         }
+    }
+
+    private void createJobWithLock(MTMVJob job, boolean isReplay) throws DdlException {
+        writeLock();
         try {
             if (nameToJobMap.containsKey(job.getName())) {
                 throw new DdlException("Job [" + job.getName() + "] already exists");
             }
+            nameToJobMap.put(job.getName(), job);
+            idToJobMap.put(job.getId(), job);
             if (!isReplay) {
-                Preconditions.checkArgument(job.getId() == 0);
-                job.setId(Env.getCurrentEnv().getNextId());
-            }
-            if (job.getTriggerMode() == TriggerMode.PERIODICAL) {
-                JobSchedule schedule = job.getSchedule();
-                if (schedule == null) {
-                    throw new DdlException("Job [" + job.getName() + "] has no scheduling");
-                }
-                job.setState(JobState.ACTIVE);
-                nameToJobMap.put(job.getName(), job);
-                idToJobMap.put(job.getId(), job);
-                if (!isReplay) {
-                    // log job before submit any task.
-                    Env.getCurrentEnv().getEditLog().logCreateMTMVJob(job);
-                    ScheduledFuture<?> future = periodScheduler.scheduleAtFixedRate(() -> submitJobTask(job.getName()),
-                            MTMVUtils.getDelaySeconds(job), schedule.getSecondPeriod(), TimeUnit.SECONDS);
-                    periodFutureMap.put(job.getId(), future);
-                }
-            } else if (job.getTriggerMode() == TriggerMode.ONCE) {
-                // only change once job state from unknown to active. if job is completed, only put it in map
-                if (job.getState() == JobState.UNKNOWN) {
-                    job.setState(JobState.ACTIVE);
-                    job.setExpireTime(MTMVUtils.getNowTimeStamp() + Config.scheduler_mtmv_job_expired);
-                }
-                nameToJobMap.put(job.getName(), job);
-                idToJobMap.put(job.getId(), job);
-                if (!isReplay) {
-                    Env.getCurrentEnv().getEditLog().logCreateMTMVJob(job);
-                    MTMVTaskExecuteParams executeOption = new MTMVTaskExecuteParams();
-                    MTMVUtils.TaskSubmitStatus status = submitJobTask(job.getName(), executeOption);
-                    if (status != TaskSubmitStatus.SUBMITTED) {
-                        throw new DdlException("submit job task with: " + status.toString());
-                    }
-                }
-            } else if (job.getTriggerMode() == TriggerMode.MANUAL) {
-                // only change once job state from unknown to active. if job is completed, only put it in map
-                if (job.getState() == JobState.UNKNOWN) {
-                    job.setState(JobState.ACTIVE);
-                }
-                nameToJobMap.put(job.getName(), job);
-                idToJobMap.put(job.getId(), job);
-                if (!isReplay) {
-                    Env.getCurrentEnv().getEditLog().logCreateMTMVJob(job);
-                }
-            } else {
-                throw new DdlException("Unsupported trigger mode for multi-table mv.");
+                // log job before submit any task.
+                Env.getCurrentEnv().getEditLog().logCreateMTMVJob(job);
             }
         } finally {
-            unlock();
+            writeUnlock();
         }
     }
 
-    private boolean stopScheduler(String jobName) {
-        MTMVJob job = nameToJobMap.get(jobName);
-        if (job.getTriggerMode() != TriggerMode.PERIODICAL) {
-            return false;
-        }
-        if (job.getState() == MTMVUtils.JobState.PAUSE) {
-            return true;
-        }
-        JobSchedule jobSchedule = job.getSchedule();
-        // this will not happen
-        if (jobSchedule == null) {
-            LOG.warn("fail to obtain scheduled info for job [{}]", job.getName());
-            return true;
-        }
-        ScheduledFuture<?> future = periodFutureMap.get(job.getId());
-        if (future == null) {
-            LOG.warn("fail to obtain scheduled info for job [{}]", job.getName());
-            return true;
-        }
-        // MUST not set true for "mayInterruptIfRunning".
-        // Because this thread may doing bdbje write operation, it is interrupted,
-        // FE may exit due to bdbje write failure.
-        boolean isCancel = future.cancel(false);
-        if (!isCancel) {
-            LOG.warn("fail to cancel scheduler for job [{}]", job.getName());
-        }
-        return isCancel;
-    }
-
-    public boolean killJobTask(String jobName, boolean clearPending) {
-        MTMVJob job = nameToJobMap.get(jobName);
-        if (job == null) {
-            return false;
-        }
-        return taskManager.killTask(job.getId(), clearPending);
-    }
-
     public void refreshMTMV(String dbName, String mvName)
             throws DdlException, MetaNotFoundException {
         Database db = Env.getCurrentInternalCatalog().getDbOrDdlException(dbName);
@@ -334,85 +233,53 @@ public class MTMVJobManager {
         createJob(mtmvJob, false);
     }
 
-    public MTMVUtils.TaskSubmitStatus submitJobTask(String jobName) {
-        return submitJobTask(jobName, new MTMVTaskExecuteParams());
-    }
-
-    public MTMVUtils.TaskSubmitStatus submitJobTask(String jobName, MTMVTaskExecuteParams param) {
-        MTMVJob job = nameToJobMap.get(jobName);
-        if (job == null) {
-            return MTMVUtils.TaskSubmitStatus.FAILED;
-        }
-        return taskManager.submitTask(MTMVUtils.buildTask(job), param);
-    }
-
-    public void updateJob(ChangeMTMVJob changeJob, boolean isReplay) {
-        if (!tryLock()) {
-            return;
-        }
-        try {
-            MTMVJob job = idToJobMap.get(changeJob.getJobId());
-            if (job == null) {
-                LOG.warn("change jobId {} failed because job is null", changeJob.getJobId());
-                return;
-            }
-            job.setState(changeJob.getToStatus());
-            job.setLastModifyTime(changeJob.getLastModifyTime());
-            if (!isReplay) {
-                Env.getCurrentEnv().getEditLog().logChangeMTMVJob(changeJob);
-            }
-        } finally {
-            unlock();
-        }
-        LOG.info("change job:{}", changeJob.getJobId());
-    }
-
     public void dropJobByName(String dbName, String mvName, boolean isReplay) {
+        List<Long> jobIds = Lists.newArrayList();
         for (String jobName : nameToJobMap.keySet()) {
             MTMVJob job = nameToJobMap.get(jobName);
             if (job.getMVName().equals(mvName) && job.getDBName().equals(dbName)) {
-                dropJobs(Collections.singletonList(job.getId()), isReplay);
-                return;
+                jobIds.add(job.getId());
             }
         }
+        dropJobs(jobIds, isReplay);
     }
 
     public void dropJobs(List<Long> jobIds, boolean isReplay) {
         if (jobIds.isEmpty()) {
             return;
         }
-        if (!tryLock()) {
-            return;
+
+        for (long jobId : jobIds) {
+            dropJob(jobId, isReplay);
         }
+
+        LOG.info("drop jobs:{}", jobIds);
+    }
+
+    private void dropJob(long jobId, boolean isReplay) {
+        MTMVJob job = dropJobWithLock(jobId, isReplay);
+        if (!isReplay && job != null) {
+            job.stop();
+        }
+    }
+
+    private MTMVJob dropJobWithLock(long jobId, boolean isReplay) {
+        writeLock();
         try {
-            for (long jobId : jobIds) {
-                MTMVJob job = idToJobMap.get(jobId);
-                if (job == null) {
-                    LOG.warn("drop jobId {} failed because job is null", jobId);
-                    continue;
-                }
-                if (job.getTriggerMode() == TriggerMode.PERIODICAL && !isReplay) {
-                    boolean isCancel = stopScheduler(job.getName());
-                    if (!isCancel) {
-                        continue;
-                    }
-                    periodFutureMap.remove(job.getId());
-                }
-                killJobTask(job.getName(), true);
-                if (!Config.keep_scheduler_mtmv_task_when_job_deleted) {
-                    taskManager.clearTasksByJobName(job.getName(), isReplay);
-                }
-                idToJobMap.remove(job.getId());
-                nameToJobMap.remove(job.getName());
+            MTMVJob job = idToJobMap.get(jobId);
+            if (job == null) {
+                LOG.warn("drop jobId {} failed because job is null", jobId);
+                return null;
             }
-
+            idToJobMap.remove(job.getId());
+            nameToJobMap.remove(job.getName());
             if (!isReplay) {
-                Env.getCurrentEnv().getEditLog().logDropMTMVJob(jobIds);
+                Env.getCurrentEnv().getEditLog().logDropMTMVJob(Collections.singletonList(jobId));
             }
+            return job;
         } finally {
-            unlock();
+            writeUnlock();
         }
-        LOG.info("drop jobs:{}", jobIds);
     }
 
     public List<MTMVJob> showAllJobs() {
@@ -422,9 +289,9 @@ public class MTMVJobManager {
     public List<MTMVJob> showJobs(String dbName) {
         List<MTMVJob> jobList = Lists.newArrayList();
         if (Strings.isNullOrEmpty(dbName)) {
-            jobList.addAll(nameToJobMap.values());
+            jobList.addAll(getAllJobsWithLock());
         } else {
-            jobList.addAll(nameToJobMap.values().stream().filter(u -> u.getDBName().equals(dbName))
+            jobList.addAll(getAllJobsWithLock().stream().filter(u -> u.getDBName().equals(dbName))
                     .collect(Collectors.toList()));
         }
         return jobList.stream().sorted().collect(Collectors.toList());
@@ -434,27 +301,7 @@ public class MTMVJobManager {
         return showJobs(dbName).stream().filter(u -> u.getMVName().equals(mvName)).collect(Collectors.toList());
     }
 
-    private boolean tryLock() {
-        try {
-            return reentrantLock.tryLock(5, TimeUnit.SECONDS);
-        } catch (InterruptedException e) {
-            LOG.warn("got exception while getting job manager lock", e);
-        }
-        return false;
-    }
-
-    public void unlock() {
-        this.reentrantLock.unlock();
-    }
-
     public void replayCreateJob(MTMVJob job) {
-        if (job.getTriggerMode() == TriggerMode.PERIODICAL) {
-            JobSchedule jobSchedule = job.getSchedule();
-            if (jobSchedule == null) {
-                LOG.warn("replay a null schedule period job [{}]", job.getName());
-                return;
-            }
-        }
         if (job.getExpireTime() > 0 && MTMVUtils.getNowTimeStamp() > job.getExpireTime()) {
             return;
         }
@@ -470,7 +317,10 @@ public class MTMVJobManager {
     }
 
     public void replayUpdateJob(ChangeMTMVJob changeJob) {
-        updateJob(changeJob, true);
+        MTMVJob mtmvJob = idToJobMap.get(changeJob.getJobId());
+        if (mtmvJob != null) {
+            mtmvJob.updateJob(changeJob, true);
+        }
     }
 
     public void replayCreateJobTask(MTMVTask task) {
@@ -478,41 +328,23 @@ public class MTMVJobManager {
     }
 
     public void replayDropJobTasks(List<String> taskIds) {
-        taskManager.dropTasks(taskIds, true);
+        taskManager.dropHistoryTasks(taskIds, true);
     }
 
-    public void removeExpiredJobs() {
+    private void removeExpiredJobs() {
         long currentTimeSeconds = MTMVUtils.getNowTimeStamp();
-
         List<Long> jobIdsToDelete = Lists.newArrayList();
-        if (!tryLock()) {
-            return;
-        }
-        try {
-            List<MTMVJob> jobs = showJobs(null);
-            for (MTMVJob job : jobs) {
-                // active job should not clean
-                if (job.getState() == MTMVUtils.JobState.ACTIVE) {
-                    continue;
-                }
-                if (job.getTriggerMode() == MTMVUtils.TriggerMode.PERIODICAL) {
-                    JobSchedule jobSchedule = job.getSchedule();
-                    if (jobSchedule == null) {
-                        jobIdsToDelete.add(job.getId());
-                        LOG.warn("clean up a null schedule periodical Task [{}]", job.getName());
-                        continue;
-                    }
-
-                }
-                long expireTime = job.getExpireTime();
-                if (expireTime > 0 && currentTimeSeconds > expireTime) {
-                    jobIdsToDelete.add(job.getId());
-                }
+        List<MTMVJob> jobs = getAllJobsWithLock();
+        for (MTMVJob job : jobs) {
+            // active job should not clean
+            if (job.getState() != MTMVUtils.JobState.COMPLETE) {
+                continue;
+            }
+            long expireTime = job.getExpireTime();
+            if (expireTime > 0 && currentTimeSeconds > expireTime) {
+                jobIdsToDelete.add(job.getId());
             }
-        } finally {
-            unlock();
         }
-
         dropJobs(jobIdsToDelete, false);
     }
 
@@ -520,6 +352,40 @@ public class MTMVJobManager {
         return nameToJobMap.get(jobName);
     }
 
+    private List<MTMVJob> getAllJobsWithLock() {
+        readLock();
+        try {
+            return Lists.newArrayList(nameToJobMap.values());
+        } finally {
+            readUnlock();
+        }
+
+    }
+
+    public MTMVTaskManager getTaskManager() {
+        return taskManager;
+    }
+
+    public ScheduledExecutorService getPeriodScheduler() {
+        return periodScheduler;
+    }
+
+    private void readLock() {
+        this.rwLock.readLock().lock();
+    }
+
+    private void readUnlock() {
+        this.rwLock.readLock().unlock();
+    }
+
+    private void writeLock() {
+        this.rwLock.writeLock().lock();
+    }
+
+    private void writeUnlock() {
+        this.rwLock.writeLock().unlock();
+    }
+
     public long write(DataOutputStream dos, long checksum) throws IOException {
         MTMVCheckpointData data = new MTMVCheckpointData();
         data.jobs = new ArrayList<>(nameToJobMap.values());
@@ -548,8 +414,4 @@ public class MTMVJobManager {
         }
         return mtmvJobManager;
     }
-
-    public MTMVTaskManager getTaskManager() {
-        return taskManager;
-    }
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVTaskExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVTaskExecutor.java
index 55b4949c07..fb2aaf5419 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVTaskExecutor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVTaskExecutor.java
@@ -147,6 +147,12 @@ public class MTMVTaskExecutor implements Comparable<MTMVTaskExecutor> {
         return task;
     }
 
+    public void stop() {
+        if (ctx != null) {
+            ctx.kill(false);
+        }
+    }
+
     @Override
     public int compareTo(@NotNull MTMVTaskExecutor task) {
         if (this.getTask().getPriority() != task.getTask().getPriority()) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVTaskManager.java b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVTaskManager.java
index 3396b30bbb..d6e370480b 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVTaskManager.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVTaskManager.java
@@ -20,12 +20,9 @@ package org.apache.doris.mtmv;
 import org.apache.doris.catalog.Env;
 import org.apache.doris.common.AnalysisException;
 import org.apache.doris.common.Config;
-import org.apache.doris.mtmv.MTMVUtils.JobState;
 import org.apache.doris.mtmv.MTMVUtils.TaskState;
-import org.apache.doris.mtmv.MTMVUtils.TriggerMode;
-import org.apache.doris.mtmv.metadata.ChangeMTMVJob;
+import org.apache.doris.mtmv.metadata.MTMVJob;
 import org.apache.doris.mtmv.metadata.MTMVTask;
-import org.apache.doris.qe.ConnectContext;
 
 import com.google.common.base.Strings;
 import com.google.common.collect.Lists;
@@ -33,7 +30,6 @@ import com.google.common.collect.Maps;
 import com.google.common.collect.Queues;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
-import org.jetbrains.annotations.Nullable;
 
 import java.util.Deque;
 import java.util.HashSet;
@@ -49,7 +45,7 @@ import java.util.concurrent.PriorityBlockingQueue;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.locks.ReentrantLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
 import java.util.stream.Collectors;
 
 public class MTMVTaskManager {
@@ -64,37 +60,23 @@ public class MTMVTaskManager {
 
     private final MTMVTaskExecutorPool taskExecutorPool = new MTMVTaskExecutorPool();
 
-    private final ReentrantLock reentrantLock = new ReentrantLock(true);
+    private final ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock(true);
 
     // keep track of all the completed tasks
     private final Deque<MTMVTask> historyTasks = Queues.newLinkedBlockingDeque();
 
     private ScheduledExecutorService taskScheduler = Executors.newScheduledThreadPool(1);
 
-    private final MTMVJobManager mtmvJobManager;
-
     private final AtomicInteger failedTaskCount = new AtomicInteger(0);
 
-    public MTMVTaskManager(MTMVJobManager mtmvJobManager) {
-        this.mtmvJobManager = mtmvJobManager;
-    }
-
     public void startTaskScheduler() {
         if (taskScheduler.isShutdown()) {
             taskScheduler = Executors.newScheduledThreadPool(1);
         }
         taskScheduler.scheduleAtFixedRate(() -> {
-            if (!tryLock()) {
-                return;
-            }
-            try {
-                checkRunningTask();
-                scheduledPendingTask();
-            } catch (Exception ex) {
-                LOG.warn("failed to schedule task.", ex);
-            } finally {
-                unlock();
-            }
+            checkRunningTask();
+            scheduledPendingTask();
+
         }, 0, 1, TimeUnit.SECONDS);
     }
 
@@ -102,7 +84,69 @@ public class MTMVTaskManager {
         taskScheduler.shutdown();
     }
 
-    public MTMVUtils.TaskSubmitStatus submitTask(MTMVTaskExecutor taskExecutor, MTMVTaskExecuteParams params) {
+    private void checkRunningTask() {
+        writeLock();
+        try {
+            Iterator<Long> runningIterator = runningTaskMap.keySet().iterator();
+            while (runningIterator.hasNext()) {
+                Long jobId = runningIterator.next();
+                MTMVTaskExecutor taskExecutor = runningTaskMap.get(jobId);
+                Future<?> future = taskExecutor.getFuture();
+                if (future.isDone()) {
+                    runningIterator.remove();
+                    addHistory(taskExecutor.getTask());
+                    MTMVUtils.TaskState finalState = taskExecutor.getTask().getState();
+                    if (finalState == TaskState.FAILURE) {
+                        failedTaskCount.incrementAndGet();
+                    }
+                    //task save final state only
+                    Env.getCurrentEnv().getEditLog().logCreateMTMVTask(taskExecutor.getTask());
+                    taskExecutor.getJob().taskFinished();
+                }
+            }
+        } finally {
+            writeUnlock();
+        }
+    }
+
+    private void scheduledPendingTask() {
+        writeLock();
+        try {
+            int currentRunning = runningTaskMap.size();
+
+            Iterator<Long> pendingIterator = pendingTaskMap.keySet().iterator();
+            while (pendingIterator.hasNext()) {
+                Long jobId = pendingIterator.next();
+                MTMVTaskExecutor runningTaskExecutor = runningTaskMap.get(jobId);
+                if (runningTaskExecutor == null) {
+                    Queue<MTMVTaskExecutor> taskQueue = pendingTaskMap.get(jobId);
+                    if (taskQueue.size() == 0) {
+                        pendingIterator.remove();
+                    } else {
+                        if (currentRunning >= Config.max_running_mtmv_scheduler_task_num) {
+                            break;
+                        }
+                        MTMVTaskExecutor pendingTaskExecutor = taskQueue.poll();
+                        taskExecutorPool.executeTask(pendingTaskExecutor);
+                        runningTaskMap.put(jobId, pendingTaskExecutor);
+                        currentRunning++;
+                    }
+                }
+            }
+        } finally {
+            writeUnlock();
+        }
+    }
+
+    public MTMVUtils.TaskSubmitStatus submitJobTask(MTMVJob job) {
+        return submitJobTask(job, new MTMVTaskExecuteParams());
+    }
+
+    private MTMVUtils.TaskSubmitStatus submitJobTask(MTMVJob job, MTMVTaskExecuteParams param) {
+        return submitTask(MTMVUtils.buildTask(job), param);
+    }
+
+    private MTMVUtils.TaskSubmitStatus submitTask(MTMVTaskExecutor taskExecutor, MTMVTaskExecuteParams params) {
         // duplicate submit
         if (taskExecutor.getTask() != null) {
             return MTMVUtils.TaskSubmitStatus.FAILED;
@@ -129,132 +173,39 @@ public class MTMVTaskManager {
         return MTMVUtils.TaskSubmitStatus.SUBMITTED;
     }
 
-    public boolean killTask(Long jobId, boolean clearPending) {
-        if (clearPending) {
-            if (!tryLock()) {
-                return false;
-            }
-            try {
-                getPendingTaskMap().remove(jobId);
-            } catch (Exception ex) {
-                LOG.warn("failed to kill task.", ex);
-            } finally {
-                unlock();
-            }
-        }
-        MTMVTaskExecutor task = runningTaskMap.get(jobId);
-        if (task == null) {
-            return false;
-        }
-        ConnectContext connectContext = task.getCtx();
-        if (connectContext != null) {
-            connectContext.kill(false);
-            return true;
-        }
-        return false;
-    }
-
-    public void arrangeToPendingTask(MTMVTaskExecutor task) {
-        if (!tryLock()) {
-            return;
-        }
+    private void arrangeToPendingTask(MTMVTaskExecutor task) {
+        writeLock();
         try {
             long jobId = task.getJobId();
             PriorityBlockingQueue<MTMVTaskExecutor> tasks =
                     pendingTaskMap.computeIfAbsent(jobId, u -> Queues.newPriorityBlockingQueue());
             tasks.offer(task);
         } finally {
-            unlock();
+            writeUnlock();
         }
     }
 
-    @Nullable
-    private MTMVTaskExecutor getTask(PriorityBlockingQueue<MTMVTaskExecutor> tasks, MTMVTaskExecutor task) {
-        MTMVTaskExecutor oldTask = null;
-        for (MTMVTaskExecutor t : tasks) {
-            if (t.equals(task)) {
-                oldTask = t;
-                break;
-            }
+    public void dealJobRemoved(MTMVJob job) {
+        removePendingTask(job.getId());
+        removeRunningTask(job.getId());
+        if (!Config.keep_scheduler_mtmv_task_when_job_deleted) {
+            clearHistoryTasksByJobName(job.getName(), false);
         }
-        return oldTask;
     }
 
-    private void checkRunningTask() {
-        Iterator<Long> runningIterator = runningTaskMap.keySet().iterator();
-        while (runningIterator.hasNext()) {
-            Long jobId = runningIterator.next();
-            MTMVTaskExecutor taskExecutor = runningTaskMap.get(jobId);
-            if (taskExecutor == null) {
-                LOG.warn("failed to get running task by jobId:{}", jobId);
-                runningIterator.remove();
-                return;
-            }
-            Future<?> future = taskExecutor.getFuture();
-            if (future.isDone()) {
-                runningIterator.remove();
-                addHistory(taskExecutor.getTask());
-                MTMVUtils.TaskState finalState = taskExecutor.getTask().getState();
-                if (finalState == TaskState.FAILURE) {
-                    failedTaskCount.incrementAndGet();
-                }
-                Env.getCurrentEnv().getEditLog().logCreateMTMVTask(taskExecutor.getTask());
-
-                TriggerMode triggerMode = taskExecutor.getJob().getTriggerMode();
-                if (triggerMode == TriggerMode.ONCE) {
-                    // update the run once job status
-                    ChangeMTMVJob changeJob = new ChangeMTMVJob(taskExecutor.getJobId(), JobState.COMPLETE);
-                    mtmvJobManager.updateJob(changeJob, false);
-                } else if (triggerMode == TriggerMode.PERIODICAL) {
-                    // just update the last modify time.
-                    ChangeMTMVJob changeJob = new ChangeMTMVJob(taskExecutor.getJobId(), JobState.ACTIVE);
-                    mtmvJobManager.updateJob(changeJob, false);
-                }
-            }
-        }
+    private void removePendingTask(Long jobId) {
+        pendingTaskMap.remove(jobId);
     }
 
-    public int getFailedTaskCount() {
-        return failedTaskCount.get();
-    }
-
-    private void scheduledPendingTask() {
-        int currentRunning = runningTaskMap.size();
-
-        Iterator<Long> pendingIterator = pendingTaskMap.keySet().iterator();
-        while (pendingIterator.hasNext()) {
-            Long jobId = pendingIterator.next();
-            MTMVTaskExecutor runningTaskExecutor = runningTaskMap.get(jobId);
-            if (runningTaskExecutor == null) {
-                Queue<MTMVTaskExecutor> taskQueue = pendingTaskMap.get(jobId);
-                if (taskQueue.size() == 0) {
-                    pendingIterator.remove();
-                } else {
-                    if (currentRunning >= Config.max_running_mtmv_scheduler_task_num) {
-                        break;
-                    }
-                    MTMVTaskExecutor pendingTaskExecutor = taskQueue.poll();
-                    taskExecutorPool.executeTask(pendingTaskExecutor);
-                    runningTaskMap.put(jobId, pendingTaskExecutor);
-                    currentRunning++;
-                }
-            }
+    private void removeRunningTask(Long jobId) {
+        MTMVTaskExecutor task = runningTaskMap.remove(jobId);
+        if (task != null) {
+            task.stop();
         }
     }
 
-    public boolean tryLock() {
-        try {
-            return reentrantLock.tryLock(5, TimeUnit.SECONDS);
-        } catch (InterruptedException e) {
-            LOG.warn("got exception while getting task lock", e);
-            Thread.currentThread().interrupt();
-        }
-        return false;
-    }
-
-
-    public void unlock() {
-        this.reentrantLock.unlock();
+    public int getFailedTaskCount() {
+        return failedTaskCount.get();
     }
 
     public Map<Long, PriorityBlockingQueue<MTMVTaskExecutor>> getPendingTaskMap() {
@@ -273,36 +224,50 @@ public class MTMVTaskManager {
         return historyTasks;
     }
 
+    public List<MTMVTask> getHistoryTasksByJobName(String jobName) {
+        return getHistoryTasks().stream().filter(u -> u.getJobName().equals(jobName))
+                .collect(Collectors.toList());
+    }
+
     public List<MTMVTask> showAllTasks() {
-        return showTasks(null);
+        return showTasksWithLock(null);
     }
 
-    public List<MTMVTask> showTasks(String dbName) {
+    public List<MTMVTask> showTasksWithLock(String dbName) {
         List<MTMVTask> taskList = Lists.newArrayList();
-        if (Strings.isNullOrEmpty(dbName)) {
-            for (Queue<MTMVTaskExecutor> pTaskQueue : getPendingTaskMap().values()) {
-                taskList.addAll(pTaskQueue.stream().map(MTMVTaskExecutor::getTask).collect(Collectors.toList()));
-            }
-            taskList.addAll(
-                    getRunningTaskMap().values().stream().map(MTMVTaskExecutor::getTask).collect(Collectors.toList()));
-            taskList.addAll(getHistoryTasks());
-        } else {
-            for (Queue<MTMVTaskExecutor> pTaskQueue : getPendingTaskMap().values()) {
+        readLock();
+        try {
+            if (Strings.isNullOrEmpty(dbName)) {
+                for (Queue<MTMVTaskExecutor> pTaskQueue : getPendingTaskMap().values()) {
+                    taskList.addAll(pTaskQueue.stream().map(MTMVTaskExecutor::getTask).collect(Collectors.toList()));
+                }
                 taskList.addAll(
-                        pTaskQueue.stream().map(MTMVTaskExecutor::getTask).filter(u -> u.getDBName().equals(dbName))
+                        getRunningTaskMap().values().stream().map(MTMVTaskExecutor::getTask)
+                                .collect(Collectors.toList()));
+                taskList.addAll(getHistoryTasks());
+            } else {
+                for (Queue<MTMVTaskExecutor> pTaskQueue : getPendingTaskMap().values()) {
+                    taskList.addAll(
+                            pTaskQueue.stream().map(MTMVTaskExecutor::getTask).filter(u -> u.getDBName().equals(dbName))
+                                    .collect(Collectors.toList()));
+                }
+                taskList.addAll(getRunningTaskMap().values().stream().map(MTMVTaskExecutor::getTask)
+                        .filter(u -> u.getDBName().equals(dbName)).collect(Collectors.toList()));
+                taskList.addAll(
+                        getHistoryTasks().stream().filter(u -> u.getDBName().equals(dbName))
                                 .collect(Collectors.toList()));
-            }
-            taskList.addAll(getRunningTaskMap().values().stream().map(MTMVTaskExecutor::getTask)
-                    .filter(u -> u.getDBName().equals(dbName)).collect(Collectors.toList()));
-            taskList.addAll(
-                    getHistoryTasks().stream().filter(u -> u.getDBName().equals(dbName)).collect(Collectors.toList()));
 
+            }
+        } finally {
+            readUnlock();
         }
+
         return taskList.stream().sorted().collect(Collectors.toList());
     }
 
     public List<MTMVTask> showTasks(String dbName, String mvName) {
-        return showTasks(dbName).stream().filter(u -> u.getMVName().equals(mvName)).collect(Collectors.toList());
+        return showTasksWithLock(dbName).stream().filter(u -> u.getMVName().equals(mvName))
+                .collect(Collectors.toList());
     }
 
     public MTMVTask getTask(String taskId) throws AnalysisException {
@@ -321,75 +286,63 @@ public class MTMVTaskManager {
         addHistory(task);
     }
 
-    public void clearTasksByJobName(String jobName, boolean isReplay) {
+    private void clearHistoryTasksByJobName(String jobName, boolean isReplay) {
         List<String> clearTasks = Lists.newArrayList();
 
-        if (!tryLock()) {
-            return;
-        }
-        try {
-            List<MTMVTask> taskHistory = showAllTasks();
-            for (MTMVTask task : taskHistory) {
-                if (task.getJobName().equals(jobName)) {
-                    clearTasks.add(task.getTaskId());
-                }
+        Deque<MTMVTask> taskHistory = getHistoryTasks();
+        for (MTMVTask task : taskHistory) {
+            if (task.getJobName().equals(jobName)) {
+                clearTasks.add(task.getTaskId());
             }
-        } finally {
-            unlock();
         }
-        dropTasks(clearTasks, isReplay);
+
+        dropHistoryTasks(clearTasks, isReplay);
     }
 
     public void removeExpiredTasks() {
         long currentTime = MTMVUtils.getNowTimeStamp();
-
         List<String> historyToDelete = Lists.newArrayList();
-
-        if (!tryLock()) {
-            return;
-        }
-        try {
-            Deque<MTMVTask> taskHistory = getHistoryTasks();
-            for (MTMVTask task : taskHistory) {
-                long expireTime = task.getExpireTime();
-                if (currentTime > expireTime) {
-                    historyToDelete.add(task.getTaskId());
-                }
+        Deque<MTMVTask> taskHistory = getHistoryTasks();
+        for (MTMVTask task : taskHistory) {
+            long expireTime = task.getExpireTime();
+            if (currentTime > expireTime) {
+                historyToDelete.add(task.getTaskId());
             }
-        } finally {
-            unlock();
         }
-        dropTasks(historyToDelete, false);
+        dropHistoryTasks(historyToDelete, false);
     }
 
-    public void dropTasks(List<String> taskIds, boolean isReplay) {
+    public void dropHistoryTasks(List<String> taskIds, boolean isReplay) {
         if (taskIds.isEmpty()) {
             return;
         }
-        if (!tryLock()) {
-            return;
-        }
+        writeLock();
         try {
             Set<String> taskSet = new HashSet<>(taskIds);
-            // Pending tasks will be clear directly. So we don't drop it again here.
-            // Check the running task since the task was killed but was not move to the history queue.
-            if (!isReplay) {
-                for (long key : runningTaskMap.keySet()) {
-                    MTMVTaskExecutor executor = runningTaskMap.get(key);
-                    // runningTaskMap may be removed in the runningIterator
-                    if (executor != null && taskSet.contains(executor.getTask().getTaskId())) {
-                        runningTaskMap.remove(key);
-                    }
-                }
-            }
             // Try to remove history tasks.
             getHistoryTasks().removeIf(mtmvTask -> taskSet.contains(mtmvTask.getTaskId()));
             if (!isReplay) {
                 Env.getCurrentEnv().getEditLog().logDropMTMVTasks(taskIds);
             }
         } finally {
-            unlock();
+            writeUnlock();
         }
         LOG.info("drop task history:{}", taskIds);
     }
+
+    private void readLock() {
+        this.rwLock.readLock().lock();
+    }
+
+    private void readUnlock() {
+        this.rwLock.readLock().unlock();
+    }
+
+    private void writeLock() {
+        this.rwLock.writeLock().lock();
+    }
+
+    private void writeUnlock() {
+        this.rwLock.writeLock().unlock();
+    }
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/mtmv/metadata/MTMVJob.java b/fe/fe-core/src/main/java/org/apache/doris/mtmv/metadata/MTMVJob.java
index 8a348b7f99..f7e2cadeb6 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/mtmv/metadata/MTMVJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/mtmv/metadata/MTMVJob.java
@@ -17,6 +17,7 @@
 
 package org.apache.doris.mtmv.metadata;
 
+import org.apache.doris.catalog.Env;
 import org.apache.doris.common.io.Text;
 import org.apache.doris.common.io.Writable;
 import org.apache.doris.mtmv.MTMVUtils;
@@ -28,6 +29,8 @@ import org.apache.doris.persist.gson.GsonUtils;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Lists;
 import com.google.gson.annotations.SerializedName;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
 import org.jetbrains.annotations.NotNull;
 
 import java.io.DataInput;
@@ -38,9 +41,13 @@ import java.time.LocalDateTime;
 import java.time.ZoneId;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
 
 public class MTMVJob implements Writable, Comparable {
+    private static final Logger LOG = LogManager.getLogger(MTMVJob.class);
+
     @SerializedName("id")
     private long id;
 
@@ -51,9 +58,8 @@ public class MTMVJob implements Writable, Comparable {
     @SerializedName("triggerMode")
     private MTMVUtils.TriggerMode triggerMode = MTMVUtils.TriggerMode.MANUAL;
 
-    // set default to UNKNOWN is for compatibility
     @SerializedName("state")
-    private MTMVUtils.JobState state = MTMVUtils.JobState.UNKNOWN;
+    private MTMVUtils.JobState state = JobState.ACTIVE;
 
     @SerializedName("schedule")
     private JobSchedule schedule;
@@ -86,6 +92,8 @@ public class MTMVJob implements Writable, Comparable {
     @SerializedName("lastModifyTime")
     private long lastModifyTime;
 
+    private ScheduledFuture<?> future;
+
     public MTMVJob(String name) {
         this.name = name;
         this.createTime = MTMVUtils.getNowTimeStamp();
@@ -299,6 +307,56 @@ public class MTMVJob implements Writable, Comparable {
         return list;
     }
 
+    public synchronized void start() {
+
+        if (state == JobState.COMPLETE || state == JobState.PAUSE) {
+            return;
+        }
+        if (getTriggerMode() == TriggerMode.PERIODICAL) {
+            JobSchedule schedule = getSchedule();
+            ScheduledExecutorService periodScheduler = Env.getCurrentEnv().getMTMVJobManager().getPeriodScheduler();
+            future = periodScheduler.scheduleAtFixedRate(
+                    () -> Env.getCurrentEnv().getMTMVJobManager().getTaskManager().submitJobTask(this),
+                    MTMVUtils.getDelaySeconds(this), schedule.getSecondPeriod(), TimeUnit.SECONDS);
+
+        } else if (getTriggerMode() == TriggerMode.ONCE) {
+            Env.getCurrentEnv().getMTMVJobManager().getTaskManager().submitJobTask(this);
+        }
+    }
+
+    public synchronized void stop() {
+        // MUST NOT pass true for "mayInterruptIfRunning".
+        // The scheduler thread may be performing a bdbje write operation; if it is interrupted,
+        // FE may exit due to the bdbje write failure.
+        if (future != null) {
+            boolean isCancel = future.cancel(false);
+            if (!isCancel) {
+                LOG.warn("fail to cancel scheduler for job [{}]", name);
+            }
+        }
+        Env.getCurrentEnv().getMTMVJobManager().getTaskManager().dealJobRemoved(this);
+    }
+
+    public void taskFinished() {
+        if (triggerMode == TriggerMode.ONCE) {
+            // update the run once job status
+            ChangeMTMVJob changeJob = new ChangeMTMVJob(id, JobState.COMPLETE);
+            updateJob(changeJob, false);
+        } else if (triggerMode == TriggerMode.PERIODICAL) {
+            // just update the last modify time.
+            ChangeMTMVJob changeJob = new ChangeMTMVJob(id, JobState.ACTIVE);
+            updateJob(changeJob, false);
+        }
+    }
+
+    public void updateJob(ChangeMTMVJob changeJob, boolean isReplay) {
+        setState(changeJob.getToStatus());
+        setLastModifyTime(changeJob.getLastModifyTime());
+        if (!isReplay) {
+            Env.getCurrentEnv().getEditLog().logChangeMTMVJob(changeJob);
+        }
+    }
+
     @Override
     public int compareTo(@NotNull Object o) {
         return (int) (getCreateTime() - ((MTMVJob) o).getCreateTime());
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/ShowExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/ShowExecutor.java
index 1ccf2be1b3..1eee9630d9 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/ShowExecutor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/ShowExecutor.java
@@ -2699,7 +2699,7 @@ public class ShowExecutor {
         if (showStmt.isShowAllTasks()) {
             tasks.addAll(jobManager.getTaskManager().showAllTasks());
         } else if (showStmt.isShowAllTasksFromDb()) {
-            tasks.addAll(jobManager.getTaskManager().showTasks(showStmt.getDbName()));
+            tasks.addAll(jobManager.getTaskManager().showTasksWithLock(showStmt.getDbName()));
         } else if (showStmt.isShowAllTasksOnMv()) {
             tasks.addAll(jobManager.getTaskManager().showTasks(showStmt.getDbName(), showStmt.getMVName()));
         } else if (showStmt.isSpecificTask()) {
diff --git a/fe/fe-core/src/test/java/org/apache/doris/mtmv/MTMVJobManagerTest.java b/fe/fe-core/src/test/java/org/apache/doris/mtmv/MTMVJobManagerTest.java
index 9c5b98df11..6993299f85 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/mtmv/MTMVJobManagerTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/mtmv/MTMVJobManagerTest.java
@@ -17,6 +17,7 @@
 
 package org.apache.doris.mtmv;
 
+import org.apache.doris.catalog.Env;
 import org.apache.doris.common.DdlException;
 import org.apache.doris.metric.Metric;
 import org.apache.doris.metric.MetricRepo;
@@ -36,65 +37,62 @@ public class MTMVJobManagerTest extends TestWithFeService {
 
     @Test
     public void testSampleCase() throws DdlException {
-        MTMVJobManager jobManager = new MTMVJobManager();
-        jobManager.start();
-        MTMVJob job = MTMVUtilsTest.createDummyJob();
+        String jobName = "testSampleCaseJob";
+        String mvName = "testSampleCaseMv";
+        MTMVJobManager jobManager = Env.getCurrentEnv().getMTMVJobManager();
+        MTMVJob job = MTMVUtilsTest.createDummyJob(mvName, jobName);
         jobManager.createJob(job, false);
-        Assertions.assertEquals(1, jobManager.showAllJobs().size());
-        MTMVJob resultJob = jobManager.getJob("dummy");
-        Assertions.assertEquals("dummy", resultJob.getName());
+        Assertions.assertNotNull(jobManager.getJob(job.getName()));
+        MTMVJob resultJob = jobManager.getJob(jobName);
         Assertions.assertEquals(JobState.ACTIVE, resultJob.getState());
         long jobId = resultJob.getId();
         ChangeMTMVJob changeMTMVJob = new ChangeMTMVJob(jobId, JobState.PAUSE);
-        jobManager.updateJob(changeMTMVJob, false);
-        resultJob = jobManager.getJob("dummy");
-        Assertions.assertEquals("dummy", resultJob.getName());
+        resultJob.updateJob(changeMTMVJob, false);
+        resultJob = jobManager.getJob(jobName);
+        Assertions.assertEquals(jobName, resultJob.getName());
         Assertions.assertEquals(JobState.PAUSE, resultJob.getState());
         jobManager.dropJobs(Collections.singletonList(jobId), false);
-        Assertions.assertEquals(0, jobManager.showAllJobs().size());
+        Assertions.assertNull(jobManager.getJob(jobName));
     }
 
     @Test
     public void testSchedulerJob() throws DdlException, InterruptedException {
-        MTMVJobManager jobManager = new MTMVJobManager();
-        jobManager.start();
-        Assertions.assertTrue(jobManager.getTaskManager().getHistoryTasks().isEmpty());
-        MTMVJob job = MTMVUtilsTest.createSchedulerJob();
+        String jobName = "testSchedulerJob";
+        String mvName = "testSchedulerJobMv";
+        MTMVJobManager jobManager = Env.getCurrentEnv().getMTMVJobManager();
+        MTMVJob job = MTMVUtilsTest.createSchedulerJob(mvName, jobName);
         jobManager.createJob(job, false);
-        Assertions.assertEquals(1, jobManager.showJobs(MTMVUtilsTest.dbName).size());
-        while (jobManager.getTaskManager().getHistoryTasks().isEmpty()) {
+        Assertions.assertNotNull(jobManager.getJob(jobName));
+        while (jobManager.getTaskManager().getHistoryTasksByJobName(jobName).isEmpty()) {
             Thread.sleep(1000L);
             System.out.println("Loop    once");
         }
-        Assertions.assertTrue(jobManager.getTaskManager().getHistoryTasks().size() > 0);
+        Assertions.assertTrue(jobManager.getTaskManager().getHistoryTasksByJobName(jobName).size() > 0);
     }
 
     @Test
     public void testOnceJob() throws DdlException, InterruptedException {
-        MTMVJobManager jobManager = new MTMVJobManager();
-        jobManager.start();
-        MTMVJob job = MTMVUtilsTest.createOnceJob();
+        String jobName = "testOnceJob";
+        String mvName = "testOnceJobMv";
+        MTMVJobManager jobManager = Env.getCurrentEnv().getMTMVJobManager();
+        MTMVJob job = MTMVUtilsTest.createOnceJob(mvName, jobName);
         jobManager.createJob(job, false);
-        Assertions.assertEquals(1, jobManager.showAllJobs().size());
-        Assertions.assertEquals(1, jobManager.showJobs(MTMVUtilsTest.dbName).size());
-        Assertions.assertEquals(1, jobManager.showJobs(MTMVUtilsTest.dbName, MTMVUtilsTest.MV_NAME).size());
-        while (!jobManager.getJob(MTMVUtilsTest.O_JOB).getState().equals(JobState.COMPLETE)) {
+        Assertions.assertNotNull(jobManager.getJob(jobName));
+        while (!jobManager.getJob(jobName).getState().equals(JobState.COMPLETE)) {
             Thread.sleep(1000L);
             System.out.println("Loop    once");
         }
 
-        Assertions.assertEquals(1, jobManager.getTaskManager().getHistoryTasks().size());
-        Assertions.assertEquals(1, jobManager.getTaskManager().showAllTasks().size());
-        Assertions.assertEquals(1, jobManager.getTaskManager().showTasks(MTMVUtilsTest.dbName).size());
+        Assertions.assertEquals(1, jobManager.getTaskManager().getHistoryTasksByJobName(jobName).size());
         Assertions.assertEquals(1,
-                jobManager.getTaskManager().showTasks(MTMVUtilsTest.dbName, MTMVUtilsTest.MV_NAME).size());
+                jobManager.getTaskManager().showTasks(MTMVUtilsTest.dbName, mvName).size());
 
         // verify job meta
-        MTMVJob metaJob = jobManager.showAllJobs().get(0);
+        MTMVJob metaJob = jobManager.getJob(jobName);
         List<String> jobRow = metaJob.toStringRow();
         Assertions.assertEquals(13, jobRow.size());
         // index 1: Name
-        Assertions.assertEquals(MTMVUtilsTest.O_JOB, jobRow.get(1));
+        Assertions.assertEquals(jobName, jobRow.get(1));
         // index 2: TriggerMode
         Assertions.assertEquals("ONCE", jobRow.get(2));
         // index 3: Schedule
@@ -102,7 +100,7 @@ public class MTMVJobManagerTest extends TestWithFeService {
         // index 4: DBName
         Assertions.assertEquals(MTMVUtilsTest.dbName, jobRow.get(4));
         // index 5: MVName
-        Assertions.assertEquals(MTMVUtilsTest.MV_NAME, jobRow.get(5));
+        Assertions.assertEquals(mvName, jobRow.get(5));
         // index 6: Query
         Assertions.assertEquals("", jobRow.get(6));
         // index 7: User
@@ -113,15 +111,15 @@ public class MTMVJobManagerTest extends TestWithFeService {
         Assertions.assertEquals("COMPLETE", jobRow.get(9));
 
         // verify task meta
-        MTMVTask metaTask = jobManager.getTaskManager().showAllTasks().get(0);
+        MTMVTask metaTask = jobManager.getTaskManager().getHistoryTasksByJobName(jobName).get(0);
         List<String> taskRow = metaTask.toStringRow();
         Assertions.assertEquals(14, taskRow.size());
         // index 1: JobName
-        Assertions.assertEquals(MTMVUtilsTest.O_JOB, taskRow.get(1));
+        Assertions.assertEquals(jobName, taskRow.get(1));
         // index 2: DBName
         Assertions.assertEquals(MTMVUtilsTest.dbName, taskRow.get(2));
         // index 3: MVName
-        Assertions.assertEquals(MTMVUtilsTest.MV_NAME, taskRow.get(3));
+        Assertions.assertEquals(mvName, taskRow.get(3));
         // index 4: Query
         Assertions.assertEquals("", taskRow.get(4));
         // index 5: User
@@ -140,9 +138,6 @@ public class MTMVJobManagerTest extends TestWithFeService {
 
     @Test
     public void testMetrics() {
-        MTMVJobManager jobManager = new MTMVJobManager();
-        jobManager.start();
-
         int jobMetricCount = 0;
         int taskMetricCount = 0;
         List<Metric> metrics = MetricRepo.DORIS_METRIC_REGISTER.getMetrics();
diff --git a/fe/fe-core/src/test/java/org/apache/doris/mtmv/MTMVTaskExecutorTest.java b/fe/fe-core/src/test/java/org/apache/doris/mtmv/MTMVTaskExecutorTest.java
index bee3c2e8bd..7bd58b145a 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/mtmv/MTMVTaskExecutorTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/mtmv/MTMVTaskExecutorTest.java
@@ -23,6 +23,7 @@ import org.apache.doris.mtmv.metadata.MTMVJob;
 import org.apache.doris.utframe.TestWithFeService;
 
 import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Disabled;
 import org.junit.jupiter.api.Test;
 
 import java.util.UUID;
@@ -31,10 +32,12 @@ import java.util.concurrent.ExecutionException;
 public class MTMVTaskExecutorTest extends TestWithFeService {
     @Test
     public void testSubmitTask() throws InterruptedException, ExecutionException {
+        String mvName = "testSubmitTaskMv";
+        String jobName = "testSubmitTaskJob";
         MTMVTaskExecutorPool pool = new MTMVTaskExecutorPool();
         MTMVTaskExecutor executor = new MTMVTaskExecutor();
         executor.setProcessor(new MTMVTaskProcessor());
-        executor.setJob(MTMVUtilsTest.createDummyJob());
+        executor.setJob(MTMVUtilsTest.createDummyJob(mvName, jobName));
         executor.initTask(UUID.randomUUID().toString(), System.currentTimeMillis());
         pool.executeTask(executor);
         executor.getFuture().get();
@@ -44,10 +47,12 @@ public class MTMVTaskExecutorTest extends TestWithFeService {
 
     @Test
     public void testFailTask() throws InterruptedException, ExecutionException {
+        String mvName = "testFailTaskMv";
+        String jobName = "testFailTaskJob";
         MTMVTaskExecutorPool pool = new MTMVTaskExecutorPool();
         MTMVTaskExecutor executor = new MTMVTaskExecutor();
         executor.setProcessor(new MTMVTaskProcessorTest(1));
-        executor.setJob(MTMVUtilsTest.createDummyJob());
+        executor.setJob(MTMVUtilsTest.createDummyJob(mvName, jobName));
         executor.initTask(UUID.randomUUID().toString(), System.currentTimeMillis());
         pool.executeTask(executor);
         executor.getFuture().get();
@@ -56,12 +61,15 @@ public class MTMVTaskExecutorTest extends TestWithFeService {
     }
 
     @Test
+    @Disabled
     public void testRetryTask() throws InterruptedException, ExecutionException {
+        String mvName = "testRetryTaskMv";
+        String jobName = "testRetryTaskJob";
         MTMVTaskExecutorPool pool = new MTMVTaskExecutorPool();
 
         MTMVTaskExecutor executor = new MTMVTaskExecutor();
         executor.setProcessor(new MTMVTaskProcessorTest(3));
-        MTMVJob job = MTMVUtilsTest.createDummyJob();
+        MTMVJob job = MTMVUtilsTest.createDummyJob(mvName, jobName);
         job.setRetryPolicy(TaskRetryPolicy.TIMES);
         executor.setJob(job);
         executor.initTask(UUID.randomUUID().toString(), System.currentTimeMillis());
@@ -71,12 +79,15 @@ public class MTMVTaskExecutorTest extends TestWithFeService {
     }
 
     @Test
+    @Disabled
     public void testRetryFailTask() throws InterruptedException, ExecutionException {
+        String mvName = "testRetryTaskMv";
+        String jobName = "testRetryTaskJob";
         MTMVTaskExecutorPool pool = new MTMVTaskExecutorPool();
 
         MTMVTaskExecutor executor = new MTMVTaskExecutor();
         executor.setProcessor(new MTMVTaskProcessorTest(4));
-        MTMVJob job = MTMVUtilsTest.createDummyJob();
+        MTMVJob job = MTMVUtilsTest.createDummyJob(mvName, jobName);
         job.setRetryPolicy(TaskRetryPolicy.TIMES);
         executor.setJob(job);
         executor.initTask(UUID.randomUUID().toString(), System.currentTimeMillis());
diff --git a/fe/fe-core/src/test/java/org/apache/doris/mtmv/MTMVUtilsTest.java b/fe/fe-core/src/test/java/org/apache/doris/mtmv/MTMVUtilsTest.java
index 5a81173715..e76e26ec0d 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/mtmv/MTMVUtilsTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/mtmv/MTMVUtilsTest.java
@@ -33,41 +33,35 @@ public class MTMVUtilsTest {
     public static final String dbName = "test";
 
     public static final String MV_NAME = "mvName";
-    public static final String S_JOB = "SchedulerJob";
-    public static final String O_JOB = "OnceJob";
 
-
-
-    public static MTMVJob createDummyJob() {
-        MTMVJob job = new MTMVJob("dummy");
+    public static MTMVJob createDummyJob(String mvName, String jobName) {
+        MTMVJob job = new MTMVJob(jobName);
         job.setDBName(dbName);
-        job.setMVName(MV_NAME);
+        job.setMVName(mvName);
         return job;
     }
 
-    public static MTMVJob createOnceJob() {
-        MTMVJob job = new MTMVJob("");
+    public static MTMVJob createOnceJob(String mvName, String jobName) {
+        MTMVJob job = new MTMVJob(jobName);
         job.setTriggerMode(TriggerMode.ONCE);
         job.setDBName(dbName);
-        job.setName(O_JOB);
-        job.setMVName(MV_NAME);
+        job.setMVName(mvName);
         return job;
     }
 
-    public static MTMVJob createSchedulerJob() {
-        MTMVJob job = new MTMVJob("");
+    public static MTMVJob createSchedulerJob(String mvName, String jobName) {
+        MTMVJob job = new MTMVJob(jobName);
         JobSchedule jobSchedule = new JobSchedule(System.currentTimeMillis() / 1000, 1, TimeUnit.SECONDS);
         job.setSchedule(jobSchedule);
         job.setTriggerMode(TriggerMode.PERIODICAL);
         job.setDBName(dbName);
-        job.setName(S_JOB);
-        job.setMVName(MV_NAME);
+        job.setMVName(mvName);
         return job;
     }
 
     @Test
     public void testGetDelaySeconds() {
-        MTMVJob job = MTMVUtilsTest.createDummyJob();
+        MTMVJob job = MTMVUtilsTest.createDummyJob("testGetDelaySecondsMv", "testGetDelaySecondsJob");
 
         // 2022-10-03 15:00:00
         JobSchedule jobSchedule = new JobSchedule(1664780400L, 1, TimeUnit.HOURS);
diff --git a/regression-test/suites/mtmv_p0/test_refresh_mtmv.groovy b/regression-test/suites/mtmv_p0/test_refresh_mtmv.groovy
index 17e7b05abe..70ee205d35 100644
--- a/regression-test/suites/mtmv_p0/test_refresh_mtmv.groovy
+++ b/regression-test/suites/mtmv_p0/test_refresh_mtmv.groovy
@@ -99,7 +99,7 @@ suite("test_refresh_mtmv") {
     sql """
         REFRESH MATERIALIZED VIEW ${mvNameDemand} COMPLETE
     """
-    waitingMTMVTaskFinished(mvName)
+    waitingMTMVTaskFinished(mvNameDemand)
 
     show_task_result = sql "SHOW MTMV TASK ON ${mvNameDemand}"
     assertEquals 1, show_task_result.size(), show_task_result.toString()


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org