You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by mo...@apache.org on 2023/06/27 08:23:57 UTC
[doris] branch master updated: [feature-wip](MTMV) optimize lock of mtmv job & task, to avoid dead lock (#21054)
This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 7b93b26b8c [feature-wip](MTMV) optimize lock of mtmv job & task, to avoid dead lock (#21054)
7b93b26b8c is described below
commit 7b93b26b8cc395d652385975a16aab9571906a20
Author: zhangdong <49...@qq.com>
AuthorDate: Tue Jun 27 16:23:50 2023 +0800
[feature-wip](MTMV) optimize lock of mtmv job & task, to avoid dead lock (#21054)
---
.../java/org/apache/doris/mtmv/MTMVJobFactory.java | 8 +-
.../java/org/apache/doris/mtmv/MTMVJobManager.java | 352 +++++++--------------
.../org/apache/doris/mtmv/MTMVTaskExecutor.java | 6 +
.../org/apache/doris/mtmv/MTMVTaskManager.java | 345 +++++++++-----------
.../org/apache/doris/mtmv/metadata/MTMVJob.java | 62 +++-
.../java/org/apache/doris/qe/ShowExecutor.java | 2 +-
.../org/apache/doris/mtmv/MTMVJobManagerTest.java | 69 ++--
.../apache/doris/mtmv/MTMVTaskExecutorTest.java | 19 +-
.../java/org/apache/doris/mtmv/MTMVUtilsTest.java | 26 +-
.../suites/mtmv_p0/test_refresh_mtmv.groovy | 2 +-
10 files changed, 388 insertions(+), 503 deletions(-)
diff --git a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVJobFactory.java b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVJobFactory.java
index ac771c4264..3ac142d063 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVJobFactory.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVJobFactory.java
@@ -22,7 +22,9 @@ import org.apache.doris.analysis.MVRefreshInfo.RefreshMethod;
import org.apache.doris.analysis.MVRefreshInfo.RefreshTrigger;
import org.apache.doris.analysis.MVRefreshIntervalTriggerInfo;
import org.apache.doris.analysis.MVRefreshTriggerInfo;
+import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.MaterializedView;
+import org.apache.doris.common.Config;
import org.apache.doris.common.FeConstants;
import org.apache.doris.common.util.TimeUtils;
import org.apache.doris.mtmv.MTMVUtils.TriggerMode;
@@ -76,6 +78,7 @@ public class MTMVJobFactory {
private static MTMVJob genPeriodicalJob(MaterializedView materializedView, String dbName) {
String uid = UUID.randomUUID().toString();
MTMVJob job = new MTMVJob(materializedView.getName() + "_" + uid);
+ job.setId(Env.getCurrentEnv().getNextId());
job.setTriggerMode(TriggerMode.PERIODICAL);
job.setSchedule(genJobSchedule(materializedView));
job.setDBName(dbName);
@@ -88,11 +91,14 @@ public class MTMVJobFactory {
public static MTMVJob genOnceJob(MaterializedView materializedView, String dbName) {
String uid = UUID.randomUUID().toString();
MTMVJob job = new MTMVJob(materializedView.getName() + "_" + uid);
+ job.setId(Env.getCurrentEnv().getNextId());
job.setTriggerMode(TriggerMode.ONCE);
job.setDBName(dbName);
job.setMVName(materializedView.getName());
job.setQuery(materializedView.getQuery());
- job.setCreateTime(MTMVUtils.getNowTimeStamp());
+ long nowTimeStamp = MTMVUtils.getNowTimeStamp();
+ job.setCreateTime(nowTimeStamp);
+ job.setExpireTime(nowTimeStamp + Config.scheduler_mtmv_job_expired);
return job;
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVJobManager.java b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVJobManager.java
index cae36ab3da..7295f40b60 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVJobManager.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVJobManager.java
@@ -21,7 +21,6 @@ import org.apache.doris.catalog.Database;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.MaterializedView;
import org.apache.doris.catalog.TableIf.TableType;
-import org.apache.doris.common.Config;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.MetaNotFoundException;
import org.apache.doris.common.io.Text;
@@ -30,16 +29,12 @@ import org.apache.doris.metric.Metric;
import org.apache.doris.metric.MetricLabel;
import org.apache.doris.metric.MetricRepo;
import org.apache.doris.mtmv.MTMVUtils.JobState;
-import org.apache.doris.mtmv.MTMVUtils.TaskSubmitStatus;
-import org.apache.doris.mtmv.MTMVUtils.TriggerMode;
import org.apache.doris.mtmv.metadata.ChangeMTMVJob;
import org.apache.doris.mtmv.metadata.MTMVCheckpointData;
import org.apache.doris.mtmv.metadata.MTMVJob;
-import org.apache.doris.mtmv.metadata.MTMVJob.JobSchedule;
import org.apache.doris.mtmv.metadata.MTMVTask;
import org.apache.doris.persist.gson.GsonUtils;
-import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
@@ -55,10 +50,9 @@ import java.util.List;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.locks.ReentrantLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.stream.Collectors;
public class MTMVJobManager {
@@ -69,7 +63,6 @@ public class MTMVJobManager {
private final Map<Long, MTMVJob> idToJobMap;
private final Map<String, MTMVJob> nameToJobMap;
- private final Map<Long, ScheduledFuture<?>> periodFutureMap;
private final MTMVTaskManager taskManager;
@@ -77,16 +70,15 @@ public class MTMVJobManager {
private ScheduledExecutorService cleanerScheduler = Executors.newScheduledThreadPool(1);
- private final ReentrantLock reentrantLock;
+ private final ReentrantReadWriteLock rwLock;
private final AtomicBoolean isStarted = new AtomicBoolean(false);
public MTMVJobManager() {
idToJobMap = Maps.newConcurrentMap();
nameToJobMap = Maps.newConcurrentMap();
- periodFutureMap = Maps.newConcurrentMap();
- reentrantLock = new ReentrantLock(true);
- taskManager = new MTMVTaskManager(this);
+ rwLock = new ReentrantReadWriteLock(true);
+ taskManager = new MTMVTaskManager();
}
public void start() {
@@ -104,19 +96,11 @@ public class MTMVJobManager {
}
cleanerScheduler.scheduleAtFixedRate(() -> {
if (!Env.getCurrentEnv().isMaster()) {
+ LOG.warn("only master can run MTMVJob");
return;
}
- if (!tryLock()) {
- return;
- }
- try {
- removeExpiredJobs();
- taskManager.removeExpiredTasks();
- } catch (Exception ex) {
- LOG.warn("failed remove expired jobs and tasks.", ex);
- } finally {
- unlock();
- }
+ removeExpiredJobs();
+ taskManager.removeExpiredTasks();
}, 0, 1, TimeUnit.MINUTES);
taskManager.startTaskScheduler();
@@ -146,7 +130,13 @@ public class MTMVJobManager {
Metric.MetricUnit.NOUNIT, "Active job number of mtmv.") {
@Override
public Integer getValue() {
- return periodFutureMap.size();
+ int result = 0;
+ for (MTMVJob job : getAllJobsWithLock()) {
+ if (job.getState() == JobState.ACTIVE) {
+ result++;
+ }
+ }
+ return result;
}
};
activeJob.addLabel(new MetricLabel("type", "ACTIVE-JOB"));
@@ -206,126 +196,35 @@ public class MTMVJobManager {
}
private void registerJobs() {
- int num = nameToJobMap.size();
- int periodNum = 0;
- int onceNum = 0;
- for (MTMVJob job : nameToJobMap.values()) {
- if (!job.getState().equals(JobState.ACTIVE)) {
- continue;
- }
- if (job.getTriggerMode() == TriggerMode.PERIODICAL) {
- JobSchedule schedule = job.getSchedule();
- ScheduledFuture<?> future = periodScheduler.scheduleAtFixedRate(() -> submitJobTask(job.getName()),
- MTMVUtils.getDelaySeconds(job), schedule.getSecondPeriod(), TimeUnit.SECONDS);
- periodFutureMap.put(job.getId(), future);
- periodNum++;
- } else if (job.getTriggerMode() == TriggerMode.ONCE) {
- MTMVTaskExecuteParams executeOption = new MTMVTaskExecuteParams();
- submitJobTask(job.getName(), executeOption);
- onceNum++;
- }
+ for (MTMVJob job : getAllJobsWithLock()) {
+ job.start();
}
- LOG.info("Register {} period jobs and {} once jobs in the total {} jobs.", periodNum, onceNum, num);
}
public void createJob(MTMVJob job, boolean isReplay) throws DdlException {
- if (!tryLock()) {
- throw new DdlException("Failed to get job manager lock when create Job [" + job.getName() + "]");
+ createJobWithLock(job, isReplay);
+ if (!isReplay) {
+ job.start();
}
+ }
+
+ private void createJobWithLock(MTMVJob job, boolean isReplay) throws DdlException {
+ writeLock();
try {
if (nameToJobMap.containsKey(job.getName())) {
throw new DdlException("Job [" + job.getName() + "] already exists");
}
+ nameToJobMap.put(job.getName(), job);
+ idToJobMap.put(job.getId(), job);
if (!isReplay) {
- Preconditions.checkArgument(job.getId() == 0);
- job.setId(Env.getCurrentEnv().getNextId());
- }
- if (job.getTriggerMode() == TriggerMode.PERIODICAL) {
- JobSchedule schedule = job.getSchedule();
- if (schedule == null) {
- throw new DdlException("Job [" + job.getName() + "] has no scheduling");
- }
- job.setState(JobState.ACTIVE);
- nameToJobMap.put(job.getName(), job);
- idToJobMap.put(job.getId(), job);
- if (!isReplay) {
- // log job before submit any task.
- Env.getCurrentEnv().getEditLog().logCreateMTMVJob(job);
- ScheduledFuture<?> future = periodScheduler.scheduleAtFixedRate(() -> submitJobTask(job.getName()),
- MTMVUtils.getDelaySeconds(job), schedule.getSecondPeriod(), TimeUnit.SECONDS);
- periodFutureMap.put(job.getId(), future);
- }
- } else if (job.getTriggerMode() == TriggerMode.ONCE) {
- // only change once job state from unknown to active. if job is completed, only put it in map
- if (job.getState() == JobState.UNKNOWN) {
- job.setState(JobState.ACTIVE);
- job.setExpireTime(MTMVUtils.getNowTimeStamp() + Config.scheduler_mtmv_job_expired);
- }
- nameToJobMap.put(job.getName(), job);
- idToJobMap.put(job.getId(), job);
- if (!isReplay) {
- Env.getCurrentEnv().getEditLog().logCreateMTMVJob(job);
- MTMVTaskExecuteParams executeOption = new MTMVTaskExecuteParams();
- MTMVUtils.TaskSubmitStatus status = submitJobTask(job.getName(), executeOption);
- if (status != TaskSubmitStatus.SUBMITTED) {
- throw new DdlException("submit job task with: " + status.toString());
- }
- }
- } else if (job.getTriggerMode() == TriggerMode.MANUAL) {
- // only change once job state from unknown to active. if job is completed, only put it in map
- if (job.getState() == JobState.UNKNOWN) {
- job.setState(JobState.ACTIVE);
- }
- nameToJobMap.put(job.getName(), job);
- idToJobMap.put(job.getId(), job);
- if (!isReplay) {
- Env.getCurrentEnv().getEditLog().logCreateMTMVJob(job);
- }
- } else {
- throw new DdlException("Unsupported trigger mode for multi-table mv.");
+ // log job before submit any task.
+ Env.getCurrentEnv().getEditLog().logCreateMTMVJob(job);
}
} finally {
- unlock();
+ writeUnlock();
}
}
- private boolean stopScheduler(String jobName) {
- MTMVJob job = nameToJobMap.get(jobName);
- if (job.getTriggerMode() != TriggerMode.PERIODICAL) {
- return false;
- }
- if (job.getState() == MTMVUtils.JobState.PAUSE) {
- return true;
- }
- JobSchedule jobSchedule = job.getSchedule();
- // this will not happen
- if (jobSchedule == null) {
- LOG.warn("fail to obtain scheduled info for job [{}]", job.getName());
- return true;
- }
- ScheduledFuture<?> future = periodFutureMap.get(job.getId());
- if (future == null) {
- LOG.warn("fail to obtain scheduled info for job [{}]", job.getName());
- return true;
- }
- // MUST not set true for "mayInterruptIfRunning".
- // Because this thread may doing bdbje write operation, it is interrupted,
- // FE may exit due to bdbje write failure.
- boolean isCancel = future.cancel(false);
- if (!isCancel) {
- LOG.warn("fail to cancel scheduler for job [{}]", job.getName());
- }
- return isCancel;
- }
-
- public boolean killJobTask(String jobName, boolean clearPending) {
- MTMVJob job = nameToJobMap.get(jobName);
- if (job == null) {
- return false;
- }
- return taskManager.killTask(job.getId(), clearPending);
- }
-
public void refreshMTMV(String dbName, String mvName)
throws DdlException, MetaNotFoundException {
Database db = Env.getCurrentInternalCatalog().getDbOrDdlException(dbName);
@@ -334,85 +233,53 @@ public class MTMVJobManager {
createJob(mtmvJob, false);
}
- public MTMVUtils.TaskSubmitStatus submitJobTask(String jobName) {
- return submitJobTask(jobName, new MTMVTaskExecuteParams());
- }
-
- public MTMVUtils.TaskSubmitStatus submitJobTask(String jobName, MTMVTaskExecuteParams param) {
- MTMVJob job = nameToJobMap.get(jobName);
- if (job == null) {
- return MTMVUtils.TaskSubmitStatus.FAILED;
- }
- return taskManager.submitTask(MTMVUtils.buildTask(job), param);
- }
-
- public void updateJob(ChangeMTMVJob changeJob, boolean isReplay) {
- if (!tryLock()) {
- return;
- }
- try {
- MTMVJob job = idToJobMap.get(changeJob.getJobId());
- if (job == null) {
- LOG.warn("change jobId {} failed because job is null", changeJob.getJobId());
- return;
- }
- job.setState(changeJob.getToStatus());
- job.setLastModifyTime(changeJob.getLastModifyTime());
- if (!isReplay) {
- Env.getCurrentEnv().getEditLog().logChangeMTMVJob(changeJob);
- }
- } finally {
- unlock();
- }
- LOG.info("change job:{}", changeJob.getJobId());
- }
-
public void dropJobByName(String dbName, String mvName, boolean isReplay) {
+ List<Long> jobIds = Lists.newArrayList();
for (String jobName : nameToJobMap.keySet()) {
MTMVJob job = nameToJobMap.get(jobName);
if (job.getMVName().equals(mvName) && job.getDBName().equals(dbName)) {
- dropJobs(Collections.singletonList(job.getId()), isReplay);
- return;
+ jobIds.add(job.getId());
}
}
+ dropJobs(jobIds, isReplay);
}
public void dropJobs(List<Long> jobIds, boolean isReplay) {
if (jobIds.isEmpty()) {
return;
}
- if (!tryLock()) {
- return;
+
+ for (long jobId : jobIds) {
+ dropJob(jobId, isReplay);
}
+
+ LOG.info("drop jobs:{}", jobIds);
+ }
+
+ private void dropJob(long jobId, boolean isReplay) {
+ MTMVJob job = dropJobWithLock(jobId, isReplay);
+ if (!isReplay && job != null) {
+ job.stop();
+ }
+ }
+
+ private MTMVJob dropJobWithLock(long jobId, boolean isReplay) {
+ writeLock();
try {
- for (long jobId : jobIds) {
- MTMVJob job = idToJobMap.get(jobId);
- if (job == null) {
- LOG.warn("drop jobId {} failed because job is null", jobId);
- continue;
- }
- if (job.getTriggerMode() == TriggerMode.PERIODICAL && !isReplay) {
- boolean isCancel = stopScheduler(job.getName());
- if (!isCancel) {
- continue;
- }
- periodFutureMap.remove(job.getId());
- }
- killJobTask(job.getName(), true);
- if (!Config.keep_scheduler_mtmv_task_when_job_deleted) {
- taskManager.clearTasksByJobName(job.getName(), isReplay);
- }
- idToJobMap.remove(job.getId());
- nameToJobMap.remove(job.getName());
+ MTMVJob job = idToJobMap.get(jobId);
+ if (job == null) {
+ LOG.warn("drop jobId {} failed because job is null", jobId);
+ return null;
}
-
+ idToJobMap.remove(job.getId());
+ nameToJobMap.remove(job.getName());
if (!isReplay) {
- Env.getCurrentEnv().getEditLog().logDropMTMVJob(jobIds);
+ Env.getCurrentEnv().getEditLog().logDropMTMVJob(Collections.singletonList(jobId));
}
+ return job;
} finally {
- unlock();
+ writeUnlock();
}
- LOG.info("drop jobs:{}", jobIds);
}
public List<MTMVJob> showAllJobs() {
@@ -422,9 +289,9 @@ public class MTMVJobManager {
public List<MTMVJob> showJobs(String dbName) {
List<MTMVJob> jobList = Lists.newArrayList();
if (Strings.isNullOrEmpty(dbName)) {
- jobList.addAll(nameToJobMap.values());
+ jobList.addAll(getAllJobsWithLock());
} else {
- jobList.addAll(nameToJobMap.values().stream().filter(u -> u.getDBName().equals(dbName))
+ jobList.addAll(getAllJobsWithLock().stream().filter(u -> u.getDBName().equals(dbName))
.collect(Collectors.toList()));
}
return jobList.stream().sorted().collect(Collectors.toList());
@@ -434,27 +301,7 @@ public class MTMVJobManager {
return showJobs(dbName).stream().filter(u -> u.getMVName().equals(mvName)).collect(Collectors.toList());
}
- private boolean tryLock() {
- try {
- return reentrantLock.tryLock(5, TimeUnit.SECONDS);
- } catch (InterruptedException e) {
- LOG.warn("got exception while getting job manager lock", e);
- }
- return false;
- }
-
- public void unlock() {
- this.reentrantLock.unlock();
- }
-
public void replayCreateJob(MTMVJob job) {
- if (job.getTriggerMode() == TriggerMode.PERIODICAL) {
- JobSchedule jobSchedule = job.getSchedule();
- if (jobSchedule == null) {
- LOG.warn("replay a null schedule period job [{}]", job.getName());
- return;
- }
- }
if (job.getExpireTime() > 0 && MTMVUtils.getNowTimeStamp() > job.getExpireTime()) {
return;
}
@@ -470,7 +317,10 @@ public class MTMVJobManager {
}
public void replayUpdateJob(ChangeMTMVJob changeJob) {
- updateJob(changeJob, true);
+ MTMVJob mtmvJob = idToJobMap.get(changeJob.getJobId());
+ if (mtmvJob != null) {
+ mtmvJob.updateJob(changeJob, true);
+ }
}
public void replayCreateJobTask(MTMVTask task) {
@@ -478,41 +328,23 @@ public class MTMVJobManager {
}
public void replayDropJobTasks(List<String> taskIds) {
- taskManager.dropTasks(taskIds, true);
+ taskManager.dropHistoryTasks(taskIds, true);
}
- public void removeExpiredJobs() {
+ private void removeExpiredJobs() {
long currentTimeSeconds = MTMVUtils.getNowTimeStamp();
-
List<Long> jobIdsToDelete = Lists.newArrayList();
- if (!tryLock()) {
- return;
- }
- try {
- List<MTMVJob> jobs = showJobs(null);
- for (MTMVJob job : jobs) {
- // active job should not clean
- if (job.getState() == MTMVUtils.JobState.ACTIVE) {
- continue;
- }
- if (job.getTriggerMode() == MTMVUtils.TriggerMode.PERIODICAL) {
- JobSchedule jobSchedule = job.getSchedule();
- if (jobSchedule == null) {
- jobIdsToDelete.add(job.getId());
- LOG.warn("clean up a null schedule periodical Task [{}]", job.getName());
- continue;
- }
-
- }
- long expireTime = job.getExpireTime();
- if (expireTime > 0 && currentTimeSeconds > expireTime) {
- jobIdsToDelete.add(job.getId());
- }
+ List<MTMVJob> jobs = getAllJobsWithLock();
+ for (MTMVJob job : jobs) {
+ // active job should not clean
+ if (job.getState() != MTMVUtils.JobState.COMPLETE) {
+ continue;
+ }
+ long expireTime = job.getExpireTime();
+ if (expireTime > 0 && currentTimeSeconds > expireTime) {
+ jobIdsToDelete.add(job.getId());
}
- } finally {
- unlock();
}
-
dropJobs(jobIdsToDelete, false);
}
@@ -520,6 +352,40 @@ public class MTMVJobManager {
return nameToJobMap.get(jobName);
}
+ private List<MTMVJob> getAllJobsWithLock() {
+ readLock();
+ try {
+ return Lists.newArrayList(nameToJobMap.values());
+ } finally {
+ readUnlock();
+ }
+
+ }
+
+ public MTMVTaskManager getTaskManager() {
+ return taskManager;
+ }
+
+ public ScheduledExecutorService getPeriodScheduler() {
+ return periodScheduler;
+ }
+
+ private void readLock() {
+ this.rwLock.readLock().lock();
+ }
+
+ private void readUnlock() {
+ this.rwLock.readLock().unlock();
+ }
+
+ private void writeLock() {
+ this.rwLock.writeLock().lock();
+ }
+
+ private void writeUnlock() {
+ this.rwLock.writeLock().unlock();
+ }
+
public long write(DataOutputStream dos, long checksum) throws IOException {
MTMVCheckpointData data = new MTMVCheckpointData();
data.jobs = new ArrayList<>(nameToJobMap.values());
@@ -548,8 +414,4 @@ public class MTMVJobManager {
}
return mtmvJobManager;
}
-
- public MTMVTaskManager getTaskManager() {
- return taskManager;
- }
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVTaskExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVTaskExecutor.java
index 55b4949c07..fb2aaf5419 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVTaskExecutor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVTaskExecutor.java
@@ -147,6 +147,12 @@ public class MTMVTaskExecutor implements Comparable<MTMVTaskExecutor> {
return task;
}
+ public void stop() {
+ if (ctx != null) {
+ ctx.kill(false);
+ }
+ }
+
@Override
public int compareTo(@NotNull MTMVTaskExecutor task) {
if (this.getTask().getPriority() != task.getTask().getPriority()) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVTaskManager.java b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVTaskManager.java
index 3396b30bbb..d6e370480b 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVTaskManager.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVTaskManager.java
@@ -20,12 +20,9 @@ package org.apache.doris.mtmv;
import org.apache.doris.catalog.Env;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.Config;
-import org.apache.doris.mtmv.MTMVUtils.JobState;
import org.apache.doris.mtmv.MTMVUtils.TaskState;
-import org.apache.doris.mtmv.MTMVUtils.TriggerMode;
-import org.apache.doris.mtmv.metadata.ChangeMTMVJob;
+import org.apache.doris.mtmv.metadata.MTMVJob;
import org.apache.doris.mtmv.metadata.MTMVTask;
-import org.apache.doris.qe.ConnectContext;
import com.google.common.base.Strings;
import com.google.common.collect.Lists;
@@ -33,7 +30,6 @@ import com.google.common.collect.Maps;
import com.google.common.collect.Queues;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
-import org.jetbrains.annotations.Nullable;
import java.util.Deque;
import java.util.HashSet;
@@ -49,7 +45,7 @@ import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.locks.ReentrantLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.stream.Collectors;
public class MTMVTaskManager {
@@ -64,37 +60,23 @@ public class MTMVTaskManager {
private final MTMVTaskExecutorPool taskExecutorPool = new MTMVTaskExecutorPool();
- private final ReentrantLock reentrantLock = new ReentrantLock(true);
+ private final ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock(true);
// keep track of all the completed tasks
private final Deque<MTMVTask> historyTasks = Queues.newLinkedBlockingDeque();
private ScheduledExecutorService taskScheduler = Executors.newScheduledThreadPool(1);
- private final MTMVJobManager mtmvJobManager;
-
private final AtomicInteger failedTaskCount = new AtomicInteger(0);
- public MTMVTaskManager(MTMVJobManager mtmvJobManager) {
- this.mtmvJobManager = mtmvJobManager;
- }
-
public void startTaskScheduler() {
if (taskScheduler.isShutdown()) {
taskScheduler = Executors.newScheduledThreadPool(1);
}
taskScheduler.scheduleAtFixedRate(() -> {
- if (!tryLock()) {
- return;
- }
- try {
- checkRunningTask();
- scheduledPendingTask();
- } catch (Exception ex) {
- LOG.warn("failed to schedule task.", ex);
- } finally {
- unlock();
- }
+ checkRunningTask();
+ scheduledPendingTask();
+
}, 0, 1, TimeUnit.SECONDS);
}
@@ -102,7 +84,69 @@ public class MTMVTaskManager {
taskScheduler.shutdown();
}
- public MTMVUtils.TaskSubmitStatus submitTask(MTMVTaskExecutor taskExecutor, MTMVTaskExecuteParams params) {
+ private void checkRunningTask() {
+ writeLock();
+ try {
+ Iterator<Long> runningIterator = runningTaskMap.keySet().iterator();
+ while (runningIterator.hasNext()) {
+ Long jobId = runningIterator.next();
+ MTMVTaskExecutor taskExecutor = runningTaskMap.get(jobId);
+ Future<?> future = taskExecutor.getFuture();
+ if (future.isDone()) {
+ runningIterator.remove();
+ addHistory(taskExecutor.getTask());
+ MTMVUtils.TaskState finalState = taskExecutor.getTask().getState();
+ if (finalState == TaskState.FAILURE) {
+ failedTaskCount.incrementAndGet();
+ }
+ //task save final state only
+ Env.getCurrentEnv().getEditLog().logCreateMTMVTask(taskExecutor.getTask());
+ taskExecutor.getJob().taskFinished();
+ }
+ }
+ } finally {
+ writeUnlock();
+ }
+ }
+
+ private void scheduledPendingTask() {
+ writeLock();
+ try {
+ int currentRunning = runningTaskMap.size();
+
+ Iterator<Long> pendingIterator = pendingTaskMap.keySet().iterator();
+ while (pendingIterator.hasNext()) {
+ Long jobId = pendingIterator.next();
+ MTMVTaskExecutor runningTaskExecutor = runningTaskMap.get(jobId);
+ if (runningTaskExecutor == null) {
+ Queue<MTMVTaskExecutor> taskQueue = pendingTaskMap.get(jobId);
+ if (taskQueue.size() == 0) {
+ pendingIterator.remove();
+ } else {
+ if (currentRunning >= Config.max_running_mtmv_scheduler_task_num) {
+ break;
+ }
+ MTMVTaskExecutor pendingTaskExecutor = taskQueue.poll();
+ taskExecutorPool.executeTask(pendingTaskExecutor);
+ runningTaskMap.put(jobId, pendingTaskExecutor);
+ currentRunning++;
+ }
+ }
+ }
+ } finally {
+ writeUnlock();
+ }
+ }
+
+ public MTMVUtils.TaskSubmitStatus submitJobTask(MTMVJob job) {
+ return submitJobTask(job, new MTMVTaskExecuteParams());
+ }
+
+ private MTMVUtils.TaskSubmitStatus submitJobTask(MTMVJob job, MTMVTaskExecuteParams param) {
+ return submitTask(MTMVUtils.buildTask(job), param);
+ }
+
+ private MTMVUtils.TaskSubmitStatus submitTask(MTMVTaskExecutor taskExecutor, MTMVTaskExecuteParams params) {
// duplicate submit
if (taskExecutor.getTask() != null) {
return MTMVUtils.TaskSubmitStatus.FAILED;
@@ -129,132 +173,39 @@ public class MTMVTaskManager {
return MTMVUtils.TaskSubmitStatus.SUBMITTED;
}
- public boolean killTask(Long jobId, boolean clearPending) {
- if (clearPending) {
- if (!tryLock()) {
- return false;
- }
- try {
- getPendingTaskMap().remove(jobId);
- } catch (Exception ex) {
- LOG.warn("failed to kill task.", ex);
- } finally {
- unlock();
- }
- }
- MTMVTaskExecutor task = runningTaskMap.get(jobId);
- if (task == null) {
- return false;
- }
- ConnectContext connectContext = task.getCtx();
- if (connectContext != null) {
- connectContext.kill(false);
- return true;
- }
- return false;
- }
-
- public void arrangeToPendingTask(MTMVTaskExecutor task) {
- if (!tryLock()) {
- return;
- }
+ private void arrangeToPendingTask(MTMVTaskExecutor task) {
+ writeLock();
try {
long jobId = task.getJobId();
PriorityBlockingQueue<MTMVTaskExecutor> tasks =
pendingTaskMap.computeIfAbsent(jobId, u -> Queues.newPriorityBlockingQueue());
tasks.offer(task);
} finally {
- unlock();
+ writeUnlock();
}
}
- @Nullable
- private MTMVTaskExecutor getTask(PriorityBlockingQueue<MTMVTaskExecutor> tasks, MTMVTaskExecutor task) {
- MTMVTaskExecutor oldTask = null;
- for (MTMVTaskExecutor t : tasks) {
- if (t.equals(task)) {
- oldTask = t;
- break;
- }
+ public void dealJobRemoved(MTMVJob job) {
+ removePendingTask(job.getId());
+ removeRunningTask(job.getId());
+ if (!Config.keep_scheduler_mtmv_task_when_job_deleted) {
+ clearHistoryTasksByJobName(job.getName(), false);
}
- return oldTask;
}
- private void checkRunningTask() {
- Iterator<Long> runningIterator = runningTaskMap.keySet().iterator();
- while (runningIterator.hasNext()) {
- Long jobId = runningIterator.next();
- MTMVTaskExecutor taskExecutor = runningTaskMap.get(jobId);
- if (taskExecutor == null) {
- LOG.warn("failed to get running task by jobId:{}", jobId);
- runningIterator.remove();
- return;
- }
- Future<?> future = taskExecutor.getFuture();
- if (future.isDone()) {
- runningIterator.remove();
- addHistory(taskExecutor.getTask());
- MTMVUtils.TaskState finalState = taskExecutor.getTask().getState();
- if (finalState == TaskState.FAILURE) {
- failedTaskCount.incrementAndGet();
- }
- Env.getCurrentEnv().getEditLog().logCreateMTMVTask(taskExecutor.getTask());
-
- TriggerMode triggerMode = taskExecutor.getJob().getTriggerMode();
- if (triggerMode == TriggerMode.ONCE) {
- // update the run once job status
- ChangeMTMVJob changeJob = new ChangeMTMVJob(taskExecutor.getJobId(), JobState.COMPLETE);
- mtmvJobManager.updateJob(changeJob, false);
- } else if (triggerMode == TriggerMode.PERIODICAL) {
- // just update the last modify time.
- ChangeMTMVJob changeJob = new ChangeMTMVJob(taskExecutor.getJobId(), JobState.ACTIVE);
- mtmvJobManager.updateJob(changeJob, false);
- }
- }
- }
+ private void removePendingTask(Long jobId) {
+ pendingTaskMap.remove(jobId);
}
- public int getFailedTaskCount() {
- return failedTaskCount.get();
- }
-
- private void scheduledPendingTask() {
- int currentRunning = runningTaskMap.size();
-
- Iterator<Long> pendingIterator = pendingTaskMap.keySet().iterator();
- while (pendingIterator.hasNext()) {
- Long jobId = pendingIterator.next();
- MTMVTaskExecutor runningTaskExecutor = runningTaskMap.get(jobId);
- if (runningTaskExecutor == null) {
- Queue<MTMVTaskExecutor> taskQueue = pendingTaskMap.get(jobId);
- if (taskQueue.size() == 0) {
- pendingIterator.remove();
- } else {
- if (currentRunning >= Config.max_running_mtmv_scheduler_task_num) {
- break;
- }
- MTMVTaskExecutor pendingTaskExecutor = taskQueue.poll();
- taskExecutorPool.executeTask(pendingTaskExecutor);
- runningTaskMap.put(jobId, pendingTaskExecutor);
- currentRunning++;
- }
- }
+ private void removeRunningTask(Long jobId) {
+ MTMVTaskExecutor task = runningTaskMap.remove(jobId);
+ if (task != null) {
+ task.stop();
}
}
- public boolean tryLock() {
- try {
- return reentrantLock.tryLock(5, TimeUnit.SECONDS);
- } catch (InterruptedException e) {
- LOG.warn("got exception while getting task lock", e);
- Thread.currentThread().interrupt();
- }
- return false;
- }
-
-
- public void unlock() {
- this.reentrantLock.unlock();
+ public int getFailedTaskCount() {
+ return failedTaskCount.get();
}
public Map<Long, PriorityBlockingQueue<MTMVTaskExecutor>> getPendingTaskMap() {
@@ -273,36 +224,50 @@ public class MTMVTaskManager {
return historyTasks;
}
+ public List<MTMVTask> getHistoryTasksByJobName(String jobName) {
+ return getHistoryTasks().stream().filter(u -> u.getJobName().equals(jobName))
+ .collect(Collectors.toList());
+ }
+
public List<MTMVTask> showAllTasks() {
- return showTasks(null);
+ return showTasksWithLock(null);
}
- public List<MTMVTask> showTasks(String dbName) {
+ public List<MTMVTask> showTasksWithLock(String dbName) {
List<MTMVTask> taskList = Lists.newArrayList();
- if (Strings.isNullOrEmpty(dbName)) {
- for (Queue<MTMVTaskExecutor> pTaskQueue : getPendingTaskMap().values()) {
- taskList.addAll(pTaskQueue.stream().map(MTMVTaskExecutor::getTask).collect(Collectors.toList()));
- }
- taskList.addAll(
- getRunningTaskMap().values().stream().map(MTMVTaskExecutor::getTask).collect(Collectors.toList()));
- taskList.addAll(getHistoryTasks());
- } else {
- for (Queue<MTMVTaskExecutor> pTaskQueue : getPendingTaskMap().values()) {
+ readLock();
+ try {
+ if (Strings.isNullOrEmpty(dbName)) {
+ for (Queue<MTMVTaskExecutor> pTaskQueue : getPendingTaskMap().values()) {
+ taskList.addAll(pTaskQueue.stream().map(MTMVTaskExecutor::getTask).collect(Collectors.toList()));
+ }
taskList.addAll(
- pTaskQueue.stream().map(MTMVTaskExecutor::getTask).filter(u -> u.getDBName().equals(dbName))
+ getRunningTaskMap().values().stream().map(MTMVTaskExecutor::getTask)
+ .collect(Collectors.toList()));
+ taskList.addAll(getHistoryTasks());
+ } else {
+ for (Queue<MTMVTaskExecutor> pTaskQueue : getPendingTaskMap().values()) {
+ taskList.addAll(
+ pTaskQueue.stream().map(MTMVTaskExecutor::getTask).filter(u -> u.getDBName().equals(dbName))
+ .collect(Collectors.toList()));
+ }
+ taskList.addAll(getRunningTaskMap().values().stream().map(MTMVTaskExecutor::getTask)
+ .filter(u -> u.getDBName().equals(dbName)).collect(Collectors.toList()));
+ taskList.addAll(
+ getHistoryTasks().stream().filter(u -> u.getDBName().equals(dbName))
.collect(Collectors.toList()));
- }
- taskList.addAll(getRunningTaskMap().values().stream().map(MTMVTaskExecutor::getTask)
- .filter(u -> u.getDBName().equals(dbName)).collect(Collectors.toList()));
- taskList.addAll(
- getHistoryTasks().stream().filter(u -> u.getDBName().equals(dbName)).collect(Collectors.toList()));
+ }
+ } finally {
+ readUnlock();
}
+
return taskList.stream().sorted().collect(Collectors.toList());
}
public List<MTMVTask> showTasks(String dbName, String mvName) {
- return showTasks(dbName).stream().filter(u -> u.getMVName().equals(mvName)).collect(Collectors.toList());
+ return showTasksWithLock(dbName).stream().filter(u -> u.getMVName().equals(mvName))
+ .collect(Collectors.toList());
}
public MTMVTask getTask(String taskId) throws AnalysisException {
@@ -321,75 +286,63 @@ public class MTMVTaskManager {
addHistory(task);
}
- public void clearTasksByJobName(String jobName, boolean isReplay) {
+ private void clearHistoryTasksByJobName(String jobName, boolean isReplay) {
List<String> clearTasks = Lists.newArrayList();
- if (!tryLock()) {
- return;
- }
- try {
- List<MTMVTask> taskHistory = showAllTasks();
- for (MTMVTask task : taskHistory) {
- if (task.getJobName().equals(jobName)) {
- clearTasks.add(task.getTaskId());
- }
+ Deque<MTMVTask> taskHistory = getHistoryTasks();
+ for (MTMVTask task : taskHistory) {
+ if (task.getJobName().equals(jobName)) {
+ clearTasks.add(task.getTaskId());
}
- } finally {
- unlock();
}
- dropTasks(clearTasks, isReplay);
+
+ dropHistoryTasks(clearTasks, isReplay);
}
public void removeExpiredTasks() {
long currentTime = MTMVUtils.getNowTimeStamp();
-
List<String> historyToDelete = Lists.newArrayList();
-
- if (!tryLock()) {
- return;
- }
- try {
- Deque<MTMVTask> taskHistory = getHistoryTasks();
- for (MTMVTask task : taskHistory) {
- long expireTime = task.getExpireTime();
- if (currentTime > expireTime) {
- historyToDelete.add(task.getTaskId());
- }
+ Deque<MTMVTask> taskHistory = getHistoryTasks();
+ for (MTMVTask task : taskHistory) {
+ long expireTime = task.getExpireTime();
+ if (currentTime > expireTime) {
+ historyToDelete.add(task.getTaskId());
}
- } finally {
- unlock();
}
- dropTasks(historyToDelete, false);
+ dropHistoryTasks(historyToDelete, false);
}
- public void dropTasks(List<String> taskIds, boolean isReplay) {
+ public void dropHistoryTasks(List<String> taskIds, boolean isReplay) {
if (taskIds.isEmpty()) {
return;
}
- if (!tryLock()) {
- return;
- }
+ writeLock();
try {
Set<String> taskSet = new HashSet<>(taskIds);
- // Pending tasks will be clear directly. So we don't drop it again here.
- // Check the running task since the task was killed but was not move to the history queue.
- if (!isReplay) {
- for (long key : runningTaskMap.keySet()) {
- MTMVTaskExecutor executor = runningTaskMap.get(key);
- // runningTaskMap may be removed in the runningIterator
- if (executor != null && taskSet.contains(executor.getTask().getTaskId())) {
- runningTaskMap.remove(key);
- }
- }
- }
// Try to remove history tasks.
getHistoryTasks().removeIf(mtmvTask -> taskSet.contains(mtmvTask.getTaskId()));
if (!isReplay) {
Env.getCurrentEnv().getEditLog().logDropMTMVTasks(taskIds);
}
} finally {
- unlock();
+ writeUnlock();
}
LOG.info("drop task history:{}", taskIds);
}
+
+ private void readLock() {
+ this.rwLock.readLock().lock();
+ }
+
+ private void readUnlock() {
+ this.rwLock.readLock().unlock();
+ }
+
+ private void writeLock() {
+ this.rwLock.writeLock().lock();
+ }
+
+ private void writeUnlock() {
+ this.rwLock.writeLock().unlock();
+ }
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/mtmv/metadata/MTMVJob.java b/fe/fe-core/src/main/java/org/apache/doris/mtmv/metadata/MTMVJob.java
index 8a348b7f99..f7e2cadeb6 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/mtmv/metadata/MTMVJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/mtmv/metadata/MTMVJob.java
@@ -17,6 +17,7 @@
package org.apache.doris.mtmv.metadata;
+import org.apache.doris.catalog.Env;
import org.apache.doris.common.io.Text;
import org.apache.doris.common.io.Writable;
import org.apache.doris.mtmv.MTMVUtils;
@@ -28,6 +29,8 @@ import org.apache.doris.persist.gson.GsonUtils;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import com.google.gson.annotations.SerializedName;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
import org.jetbrains.annotations.NotNull;
import java.io.DataInput;
@@ -38,9 +41,13 @@ import java.time.LocalDateTime;
import java.time.ZoneId;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
public class MTMVJob implements Writable, Comparable {
+ private static final Logger LOG = LogManager.getLogger(MTMVJob.class);
+
@SerializedName("id")
private long id;
@@ -51,9 +58,8 @@ public class MTMVJob implements Writable, Comparable {
@SerializedName("triggerMode")
private MTMVUtils.TriggerMode triggerMode = MTMVUtils.TriggerMode.MANUAL;
- // set default to UNKNOWN is for compatibility
@SerializedName("state")
- private MTMVUtils.JobState state = MTMVUtils.JobState.UNKNOWN;
+ private MTMVUtils.JobState state = JobState.ACTIVE;
@SerializedName("schedule")
private JobSchedule schedule;
@@ -86,6 +92,8 @@ public class MTMVJob implements Writable, Comparable {
@SerializedName("lastModifyTime")
private long lastModifyTime;
+ private ScheduledFuture<?> future;
+
public MTMVJob(String name) {
this.name = name;
this.createTime = MTMVUtils.getNowTimeStamp();
@@ -299,6 +307,56 @@ public class MTMVJob implements Writable, Comparable {
return list;
}
+ public synchronized void start() {
+
+ if (state == JobState.COMPLETE || state == JobState.PAUSE) {
+ return;
+ }
+ if (getTriggerMode() == TriggerMode.PERIODICAL) {
+ JobSchedule schedule = getSchedule();
+ ScheduledExecutorService periodScheduler = Env.getCurrentEnv().getMTMVJobManager().getPeriodScheduler();
+ future = periodScheduler.scheduleAtFixedRate(
+ () -> Env.getCurrentEnv().getMTMVJobManager().getTaskManager().submitJobTask(this),
+ MTMVUtils.getDelaySeconds(this), schedule.getSecondPeriod(), TimeUnit.SECONDS);
+
+ } else if (getTriggerMode() == TriggerMode.ONCE) {
+ Env.getCurrentEnv().getMTMVJobManager().getTaskManager().submitJobTask(this);
+ }
+ }
+
+ public synchronized void stop() {
+ // MUST not set true for "mayInterruptIfRunning".
+ // Because this thread may doing bdbje write operation, it is interrupted,
+ // FE may exit due to bdbje write failure.
+ if (future != null) {
+ boolean isCancel = future.cancel(false);
+ if (!isCancel) {
+ LOG.warn("fail to cancel scheduler for job [{}]", name);
+ }
+ }
+ Env.getCurrentEnv().getMTMVJobManager().getTaskManager().dealJobRemoved(this);
+ }
+
+ public void taskFinished() {
+ if (triggerMode == TriggerMode.ONCE) {
+ // update the run once job status
+ ChangeMTMVJob changeJob = new ChangeMTMVJob(id, JobState.COMPLETE);
+ updateJob(changeJob, false);
+ } else if (triggerMode == TriggerMode.PERIODICAL) {
+ // just update the last modify time.
+ ChangeMTMVJob changeJob = new ChangeMTMVJob(id, JobState.ACTIVE);
+ updateJob(changeJob, false);
+ }
+ }
+
+ public void updateJob(ChangeMTMVJob changeJob, boolean isReplay) {
+ setState(changeJob.getToStatus());
+ setLastModifyTime(changeJob.getLastModifyTime());
+ if (!isReplay) {
+ Env.getCurrentEnv().getEditLog().logChangeMTMVJob(changeJob);
+ }
+ }
+
@Override
public int compareTo(@NotNull Object o) {
return (int) (getCreateTime() - ((MTMVJob) o).getCreateTime());
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/ShowExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/ShowExecutor.java
index 1ccf2be1b3..1eee9630d9 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/ShowExecutor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/ShowExecutor.java
@@ -2699,7 +2699,7 @@ public class ShowExecutor {
if (showStmt.isShowAllTasks()) {
tasks.addAll(jobManager.getTaskManager().showAllTasks());
} else if (showStmt.isShowAllTasksFromDb()) {
- tasks.addAll(jobManager.getTaskManager().showTasks(showStmt.getDbName()));
+ tasks.addAll(jobManager.getTaskManager().showTasksWithLock(showStmt.getDbName()));
} else if (showStmt.isShowAllTasksOnMv()) {
tasks.addAll(jobManager.getTaskManager().showTasks(showStmt.getDbName(), showStmt.getMVName()));
} else if (showStmt.isSpecificTask()) {
diff --git a/fe/fe-core/src/test/java/org/apache/doris/mtmv/MTMVJobManagerTest.java b/fe/fe-core/src/test/java/org/apache/doris/mtmv/MTMVJobManagerTest.java
index 9c5b98df11..6993299f85 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/mtmv/MTMVJobManagerTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/mtmv/MTMVJobManagerTest.java
@@ -17,6 +17,7 @@
package org.apache.doris.mtmv;
+import org.apache.doris.catalog.Env;
import org.apache.doris.common.DdlException;
import org.apache.doris.metric.Metric;
import org.apache.doris.metric.MetricRepo;
@@ -36,65 +37,62 @@ public class MTMVJobManagerTest extends TestWithFeService {
@Test
public void testSampleCase() throws DdlException {
- MTMVJobManager jobManager = new MTMVJobManager();
- jobManager.start();
- MTMVJob job = MTMVUtilsTest.createDummyJob();
+ String jobName = "testSampleCaseJob";
+ String mvName = "testSampleCaseMv";
+ MTMVJobManager jobManager = Env.getCurrentEnv().getMTMVJobManager();
+ MTMVJob job = MTMVUtilsTest.createDummyJob(mvName, jobName);
jobManager.createJob(job, false);
- Assertions.assertEquals(1, jobManager.showAllJobs().size());
- MTMVJob resultJob = jobManager.getJob("dummy");
- Assertions.assertEquals("dummy", resultJob.getName());
+ Assertions.assertNotNull(jobManager.getJob(job.getName()));
+ MTMVJob resultJob = jobManager.getJob(jobName);
Assertions.assertEquals(JobState.ACTIVE, resultJob.getState());
long jobId = resultJob.getId();
ChangeMTMVJob changeMTMVJob = new ChangeMTMVJob(jobId, JobState.PAUSE);
- jobManager.updateJob(changeMTMVJob, false);
- resultJob = jobManager.getJob("dummy");
- Assertions.assertEquals("dummy", resultJob.getName());
+ resultJob.updateJob(changeMTMVJob, false);
+ resultJob = jobManager.getJob(jobName);
+ Assertions.assertEquals(jobName, resultJob.getName());
Assertions.assertEquals(JobState.PAUSE, resultJob.getState());
jobManager.dropJobs(Collections.singletonList(jobId), false);
- Assertions.assertEquals(0, jobManager.showAllJobs().size());
+ Assertions.assertNull(jobManager.getJob(jobName));
}
@Test
public void testSchedulerJob() throws DdlException, InterruptedException {
- MTMVJobManager jobManager = new MTMVJobManager();
- jobManager.start();
- Assertions.assertTrue(jobManager.getTaskManager().getHistoryTasks().isEmpty());
- MTMVJob job = MTMVUtilsTest.createSchedulerJob();
+ String jobName = "testSchedulerJob";
+ String mvName = "testSchedulerJobMv";
+ MTMVJobManager jobManager = Env.getCurrentEnv().getMTMVJobManager();
+ MTMVJob job = MTMVUtilsTest.createSchedulerJob(mvName, jobName);
jobManager.createJob(job, false);
- Assertions.assertEquals(1, jobManager.showJobs(MTMVUtilsTest.dbName).size());
- while (jobManager.getTaskManager().getHistoryTasks().isEmpty()) {
+ Assertions.assertNotNull(jobManager.getJob(jobName));
+ while (jobManager.getTaskManager().getHistoryTasksByJobName(jobName).isEmpty()) {
Thread.sleep(1000L);
System.out.println("Loop once");
}
- Assertions.assertTrue(jobManager.getTaskManager().getHistoryTasks().size() > 0);
+ Assertions.assertTrue(jobManager.getTaskManager().getHistoryTasksByJobName(jobName).size() > 0);
}
@Test
public void testOnceJob() throws DdlException, InterruptedException {
- MTMVJobManager jobManager = new MTMVJobManager();
- jobManager.start();
- MTMVJob job = MTMVUtilsTest.createOnceJob();
+ String jobName = "testOnceJob";
+ String mvName = "testOnceJobMv";
+ MTMVJobManager jobManager = Env.getCurrentEnv().getMTMVJobManager();
+ MTMVJob job = MTMVUtilsTest.createOnceJob(mvName, jobName);
jobManager.createJob(job, false);
- Assertions.assertEquals(1, jobManager.showAllJobs().size());
- Assertions.assertEquals(1, jobManager.showJobs(MTMVUtilsTest.dbName).size());
- Assertions.assertEquals(1, jobManager.showJobs(MTMVUtilsTest.dbName, MTMVUtilsTest.MV_NAME).size());
- while (!jobManager.getJob(MTMVUtilsTest.O_JOB).getState().equals(JobState.COMPLETE)) {
+ Assertions.assertNotNull(jobManager.getJob(jobName));
+ while (!jobManager.getJob(jobName).getState().equals(JobState.COMPLETE)) {
Thread.sleep(1000L);
System.out.println("Loop once");
}
- Assertions.assertEquals(1, jobManager.getTaskManager().getHistoryTasks().size());
- Assertions.assertEquals(1, jobManager.getTaskManager().showAllTasks().size());
- Assertions.assertEquals(1, jobManager.getTaskManager().showTasks(MTMVUtilsTest.dbName).size());
+ Assertions.assertEquals(1, jobManager.getTaskManager().getHistoryTasksByJobName(jobName).size());
Assertions.assertEquals(1,
- jobManager.getTaskManager().showTasks(MTMVUtilsTest.dbName, MTMVUtilsTest.MV_NAME).size());
+ jobManager.getTaskManager().showTasks(MTMVUtilsTest.dbName, mvName).size());
// verify job meta
- MTMVJob metaJob = jobManager.showAllJobs().get(0);
+ MTMVJob metaJob = jobManager.getJob(jobName);
List<String> jobRow = metaJob.toStringRow();
Assertions.assertEquals(13, jobRow.size());
// index 1: Name
- Assertions.assertEquals(MTMVUtilsTest.O_JOB, jobRow.get(1));
+ Assertions.assertEquals(jobName, jobRow.get(1));
// index 2: TriggerMode
Assertions.assertEquals("ONCE", jobRow.get(2));
// index 3: Schedule
@@ -102,7 +100,7 @@ public class MTMVJobManagerTest extends TestWithFeService {
// index 4: DBName
Assertions.assertEquals(MTMVUtilsTest.dbName, jobRow.get(4));
// index 5: MVName
- Assertions.assertEquals(MTMVUtilsTest.MV_NAME, jobRow.get(5));
+ Assertions.assertEquals(mvName, jobRow.get(5));
// index 6: Query
Assertions.assertEquals("", jobRow.get(6));
// index 7: User
@@ -113,15 +111,15 @@ public class MTMVJobManagerTest extends TestWithFeService {
Assertions.assertEquals("COMPLETE", jobRow.get(9));
// verify task meta
- MTMVTask metaTask = jobManager.getTaskManager().showAllTasks().get(0);
+ MTMVTask metaTask = jobManager.getTaskManager().getHistoryTasksByJobName(jobName).get(0);
List<String> taskRow = metaTask.toStringRow();
Assertions.assertEquals(14, taskRow.size());
// index 1: JobName
- Assertions.assertEquals(MTMVUtilsTest.O_JOB, taskRow.get(1));
+ Assertions.assertEquals(jobName, taskRow.get(1));
// index 2: DBName
Assertions.assertEquals(MTMVUtilsTest.dbName, taskRow.get(2));
// index 3: MVName
- Assertions.assertEquals(MTMVUtilsTest.MV_NAME, taskRow.get(3));
+ Assertions.assertEquals(mvName, taskRow.get(3));
// index 4: Query
Assertions.assertEquals("", taskRow.get(4));
// index 5: User
@@ -140,9 +138,6 @@ public class MTMVJobManagerTest extends TestWithFeService {
@Test
public void testMetrics() {
- MTMVJobManager jobManager = new MTMVJobManager();
- jobManager.start();
-
int jobMetricCount = 0;
int taskMetricCount = 0;
List<Metric> metrics = MetricRepo.DORIS_METRIC_REGISTER.getMetrics();
diff --git a/fe/fe-core/src/test/java/org/apache/doris/mtmv/MTMVTaskExecutorTest.java b/fe/fe-core/src/test/java/org/apache/doris/mtmv/MTMVTaskExecutorTest.java
index bee3c2e8bd..7bd58b145a 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/mtmv/MTMVTaskExecutorTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/mtmv/MTMVTaskExecutorTest.java
@@ -23,6 +23,7 @@ import org.apache.doris.mtmv.metadata.MTMVJob;
import org.apache.doris.utframe.TestWithFeService;
import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import java.util.UUID;
@@ -31,10 +32,12 @@ import java.util.concurrent.ExecutionException;
public class MTMVTaskExecutorTest extends TestWithFeService {
@Test
public void testSubmitTask() throws InterruptedException, ExecutionException {
+ String mvName = "testSubmitTaskMv";
+ String jobName = "testSubmitTaskJob";
MTMVTaskExecutorPool pool = new MTMVTaskExecutorPool();
MTMVTaskExecutor executor = new MTMVTaskExecutor();
executor.setProcessor(new MTMVTaskProcessor());
- executor.setJob(MTMVUtilsTest.createDummyJob());
+ executor.setJob(MTMVUtilsTest.createDummyJob(mvName, jobName));
executor.initTask(UUID.randomUUID().toString(), System.currentTimeMillis());
pool.executeTask(executor);
executor.getFuture().get();
@@ -44,10 +47,12 @@ public class MTMVTaskExecutorTest extends TestWithFeService {
@Test
public void testFailTask() throws InterruptedException, ExecutionException {
+ String mvName = "testFailTaskMv";
+ String jobName = "testFailTaskJob";
MTMVTaskExecutorPool pool = new MTMVTaskExecutorPool();
MTMVTaskExecutor executor = new MTMVTaskExecutor();
executor.setProcessor(new MTMVTaskProcessorTest(1));
- executor.setJob(MTMVUtilsTest.createDummyJob());
+ executor.setJob(MTMVUtilsTest.createDummyJob(mvName, jobName));
executor.initTask(UUID.randomUUID().toString(), System.currentTimeMillis());
pool.executeTask(executor);
executor.getFuture().get();
@@ -56,12 +61,15 @@ public class MTMVTaskExecutorTest extends TestWithFeService {
}
@Test
+ @Disabled
public void testRetryTask() throws InterruptedException, ExecutionException {
+ String mvName = "testRetryTaskMv";
+ String jobName = "testRetryTaskJob";
MTMVTaskExecutorPool pool = new MTMVTaskExecutorPool();
MTMVTaskExecutor executor = new MTMVTaskExecutor();
executor.setProcessor(new MTMVTaskProcessorTest(3));
- MTMVJob job = MTMVUtilsTest.createDummyJob();
+ MTMVJob job = MTMVUtilsTest.createDummyJob(mvName, jobName);
job.setRetryPolicy(TaskRetryPolicy.TIMES);
executor.setJob(job);
executor.initTask(UUID.randomUUID().toString(), System.currentTimeMillis());
@@ -71,12 +79,15 @@ public class MTMVTaskExecutorTest extends TestWithFeService {
}
@Test
+ @Disabled
public void testRetryFailTask() throws InterruptedException, ExecutionException {
+ String mvName = "testRetryTaskMv";
+ String jobName = "testRetryTaskJob";
MTMVTaskExecutorPool pool = new MTMVTaskExecutorPool();
MTMVTaskExecutor executor = new MTMVTaskExecutor();
executor.setProcessor(new MTMVTaskProcessorTest(4));
- MTMVJob job = MTMVUtilsTest.createDummyJob();
+ MTMVJob job = MTMVUtilsTest.createDummyJob(mvName, jobName);
job.setRetryPolicy(TaskRetryPolicy.TIMES);
executor.setJob(job);
executor.initTask(UUID.randomUUID().toString(), System.currentTimeMillis());
diff --git a/fe/fe-core/src/test/java/org/apache/doris/mtmv/MTMVUtilsTest.java b/fe/fe-core/src/test/java/org/apache/doris/mtmv/MTMVUtilsTest.java
index 5a81173715..e76e26ec0d 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/mtmv/MTMVUtilsTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/mtmv/MTMVUtilsTest.java
@@ -33,41 +33,35 @@ public class MTMVUtilsTest {
public static final String dbName = "test";
public static final String MV_NAME = "mvName";
- public static final String S_JOB = "SchedulerJob";
- public static final String O_JOB = "OnceJob";
-
-
- public static MTMVJob createDummyJob() {
- MTMVJob job = new MTMVJob("dummy");
+ public static MTMVJob createDummyJob(String mvName, String jobName) {
+ MTMVJob job = new MTMVJob(jobName);
job.setDBName(dbName);
- job.setMVName(MV_NAME);
+ job.setMVName(mvName);
return job;
}
- public static MTMVJob createOnceJob() {
- MTMVJob job = new MTMVJob("");
+ public static MTMVJob createOnceJob(String mvName, String jobName) {
+ MTMVJob job = new MTMVJob(jobName);
job.setTriggerMode(TriggerMode.ONCE);
job.setDBName(dbName);
- job.setName(O_JOB);
- job.setMVName(MV_NAME);
+ job.setMVName(mvName);
return job;
}
- public static MTMVJob createSchedulerJob() {
- MTMVJob job = new MTMVJob("");
+ public static MTMVJob createSchedulerJob(String mvName, String jobName) {
+ MTMVJob job = new MTMVJob(jobName);
JobSchedule jobSchedule = new JobSchedule(System.currentTimeMillis() / 1000, 1, TimeUnit.SECONDS);
job.setSchedule(jobSchedule);
job.setTriggerMode(TriggerMode.PERIODICAL);
job.setDBName(dbName);
- job.setName(S_JOB);
- job.setMVName(MV_NAME);
+ job.setMVName(mvName);
return job;
}
@Test
public void testGetDelaySeconds() {
- MTMVJob job = MTMVUtilsTest.createDummyJob();
+ MTMVJob job = MTMVUtilsTest.createDummyJob("testGetDelaySecondsMv", "testGetDelaySecondsJob");
// 2022-10-03 15:00:00
JobSchedule jobSchedule = new JobSchedule(1664780400L, 1, TimeUnit.HOURS);
diff --git a/regression-test/suites/mtmv_p0/test_refresh_mtmv.groovy b/regression-test/suites/mtmv_p0/test_refresh_mtmv.groovy
index 17e7b05abe..70ee205d35 100644
--- a/regression-test/suites/mtmv_p0/test_refresh_mtmv.groovy
+++ b/regression-test/suites/mtmv_p0/test_refresh_mtmv.groovy
@@ -99,7 +99,7 @@ suite("test_refresh_mtmv") {
sql """
REFRESH MATERIALIZED VIEW ${mvNameDemand} COMPLETE
"""
- waitingMTMVTaskFinished(mvName)
+ waitingMTMVTaskFinished(mvNameDemand)
show_task_result = sql "SHOW MTMV TASK ON ${mvNameDemand}"
assertEquals 1, show_task_result.size(), show_task_result.toString()
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org