You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by ad...@apache.org on 2023/06/08 02:46:32 UTC
[doris] branch master updated: [feature-wip](MTMV) Sync finish status only for tasks (#20441)
This is an automated email from the ASF dual-hosted git repository.
adonisling pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 46c68d11aa [feature-wip](MTMV) Sync finish status only for tasks (#20441)
46c68d11aa is described below
commit 46c68d11aaa8ca9b8d129a2ff509f82e428f5208
Author: zhangdong <49...@qq.com>
AuthorDate: Thu Jun 8 10:46:25 2023 +0800
[feature-wip](MTMV) Sync finish status only for tasks (#20441)
MTMV tasks are now written to the edit log only once they have finished, which reduces the overhead caused by logging.
After this change, unfinished tasks are simply lost when the FE master restarts.
---
.../org/apache/doris/journal/JournalEntity.java | 3 +-
.../java/org/apache/doris/mtmv/MTMVJobManager.java | 19 +--
.../apache/doris/mtmv/MTMVTaskExecutorPool.java | 6 -
.../org/apache/doris/mtmv/MTMVTaskManager.java | 141 ++-------------------
.../apache/doris/mtmv/metadata/ChangeMTMVTask.java | 131 -------------------
.../doris/mtmv/metadata/MTMVCheckpointData.java | 20 +--
.../java/org/apache/doris/persist/EditLog.java | 7 -
.../org/apache/doris/persist/OperationType.java | 1 +
8 files changed, 16 insertions(+), 312 deletions(-)
diff --git a/fe/fe-core/src/main/java/org/apache/doris/journal/JournalEntity.java b/fe/fe-core/src/main/java/org/apache/doris/journal/JournalEntity.java
index 37a7f1a1a6..6b1dc9e06c 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/journal/JournalEntity.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/journal/JournalEntity.java
@@ -55,7 +55,6 @@ import org.apache.doris.load.loadv2.LoadJobFinalOperation;
import org.apache.doris.load.routineload.RoutineLoadJob;
import org.apache.doris.load.sync.SyncJob;
import org.apache.doris.mtmv.metadata.ChangeMTMVJob;
-import org.apache.doris.mtmv.metadata.ChangeMTMVTask;
import org.apache.doris.mtmv.metadata.DropMTMVJob;
import org.apache.doris.mtmv.metadata.DropMTMVTask;
import org.apache.doris.mtmv.metadata.MTMVJob;
@@ -762,7 +761,7 @@ public class JournalEntity implements Writable {
break;
}
case OperationType.OP_CHANGE_MTMV_TASK: {
- data = ChangeMTMVTask.read(in);
+ Text.readString(in);
isRead = true;
break;
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVJobManager.java b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVJobManager.java
index ae3d9007a6..f1d976a7f0 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVJobManager.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVJobManager.java
@@ -26,10 +26,8 @@ import org.apache.doris.metric.Metric;
import org.apache.doris.metric.MetricLabel;
import org.apache.doris.metric.MetricRepo;
import org.apache.doris.mtmv.MTMVUtils.JobState;
-import org.apache.doris.mtmv.MTMVUtils.TaskRetryPolicy;
import org.apache.doris.mtmv.MTMVUtils.TriggerMode;
import org.apache.doris.mtmv.metadata.ChangeMTMVJob;
-import org.apache.doris.mtmv.metadata.ChangeMTMVTask;
import org.apache.doris.mtmv.metadata.MTMVCheckpointData;
import org.apache.doris.mtmv.metadata.MTMVJob;
import org.apache.doris.mtmv.metadata.MTMVJob.JobSchedule;
@@ -88,8 +86,6 @@ public class MTMVJobManager {
public void start() {
if (isStarted.compareAndSet(false, true)) {
- taskManager.clearUnfinishedTasks();
-
// check the scheduler before using it
// since it may be shutdown when master change to follower without process shutdown.
if (periodScheduler.isShutdown()) {
@@ -219,11 +215,9 @@ public class MTMVJobManager {
periodFutureMap.put(job.getId(), future);
periodNum++;
} else if (job.getTriggerMode() == TriggerMode.ONCE) {
- if (job.getRetryPolicy() == TaskRetryPolicy.ALWAYS || job.getRetryPolicy() == TaskRetryPolicy.TIMES) {
- MTMVTaskExecuteParams executeOption = new MTMVTaskExecuteParams();
- submitJobTask(job.getName(), executeOption);
- onceNum++;
- }
+ MTMVTaskExecuteParams executeOption = new MTMVTaskExecuteParams();
+ submitJobTask(job.getName(), executeOption);
+ onceNum++;
}
}
LOG.info("Register {} period jobs and {} once jobs in the total {} jobs.", periodNum, onceNum, num);
@@ -477,10 +471,6 @@ public class MTMVJobManager {
taskManager.replayCreateJobTask(task);
}
- public void replayUpdateTask(ChangeMTMVTask changeTask) {
- taskManager.replayUpdateTask(changeTask);
- }
-
public void replayDropJobTasks(List<String> taskIds) {
taskManager.dropTasks(taskIds, true);
}
@@ -527,7 +517,7 @@ public class MTMVJobManager {
public long write(DataOutputStream dos, long checksum) throws IOException {
MTMVCheckpointData data = new MTMVCheckpointData();
data.jobs = new ArrayList<>(nameToJobMap.values());
- data.tasks = taskManager.showTasks(null);
+ data.tasks = Lists.newArrayList(taskManager.getHistoryTasks());
String s = GsonUtils.GSON.toJson(data);
Text.writeString(dos, s);
return checksum;
@@ -553,7 +543,6 @@ public class MTMVJobManager {
return mtmvJobManager;
}
- // for test only
public MTMVTaskManager getTaskManager() {
return taskManager;
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVTaskExecutorPool.java b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVTaskExecutorPool.java
index f68f68cbaf..0c63c287e1 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVTaskExecutorPool.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVTaskExecutorPool.java
@@ -17,9 +17,7 @@
package org.apache.doris.mtmv;
-import org.apache.doris.catalog.Env;
import org.apache.doris.mtmv.MTMVUtils.TaskState;
-import org.apache.doris.mtmv.metadata.ChangeMTMVTask;
import org.apache.doris.mtmv.metadata.MTMVTask;
import org.apache.logging.log4j.LogManager;
@@ -78,10 +76,6 @@ public class MTMVTaskExecutorPool {
task.setErrorCode(-1);
}
task.setFinishTime(MTMVUtils.getNowTimeStamp());
-
- ChangeMTMVTask changeTask = new ChangeMTMVTask(taskExecutor.getJob().getId(), task, TaskState.RUNNING,
- task.getState());
- Env.getCurrentEnv().getEditLog().logChangeMTMVTask(changeTask);
});
taskExecutor.setFuture(future);
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVTaskManager.java b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVTaskManager.java
index cbac71b9bb..3396b30bbb 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVTaskManager.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVTaskManager.java
@@ -24,8 +24,6 @@ import org.apache.doris.mtmv.MTMVUtils.JobState;
import org.apache.doris.mtmv.MTMVUtils.TaskState;
import org.apache.doris.mtmv.MTMVUtils.TriggerMode;
import org.apache.doris.mtmv.metadata.ChangeMTMVJob;
-import org.apache.doris.mtmv.metadata.ChangeMTMVTask;
-import org.apache.doris.mtmv.metadata.MTMVJob;
import org.apache.doris.mtmv.metadata.MTMVTask;
import org.apache.doris.qe.ConnectContext;
@@ -127,7 +125,6 @@ public class MTMVTaskManager {
MTMVTask task = taskExecutor.initTask(taskId, MTMVUtils.getNowTimeStamp());
task.setPriority(params.getPriority());
LOG.info("Submit a mtmv task with id: {} of the job {}.", taskId, taskExecutor.getJob().getName());
- Env.getCurrentEnv().getEditLog().logCreateMTMVTask(task);
arrangeToPendingTask(taskExecutor);
return MTMVUtils.TaskSubmitStatus.SUBMITTED;
}
@@ -201,7 +198,7 @@ public class MTMVTaskManager {
if (finalState == TaskState.FAILURE) {
failedTaskCount.incrementAndGet();
}
- changeAndLogTaskStatus(taskExecutor.getJobId(), taskExecutor.getTask(), TaskState.RUNNING, finalState);
+ Env.getCurrentEnv().getEditLog().logCreateMTMVTask(taskExecutor.getTask());
TriggerMode triggerMode = taskExecutor.getJob().getTriggerMode();
if (triggerMode == TriggerMode.ONCE) {
@@ -239,19 +236,12 @@ public class MTMVTaskManager {
MTMVTaskExecutor pendingTaskExecutor = taskQueue.poll();
taskExecutorPool.executeTask(pendingTaskExecutor);
runningTaskMap.put(jobId, pendingTaskExecutor);
- // change status from PENDING to Running
- changeAndLogTaskStatus(jobId, pendingTaskExecutor.getTask(), TaskState.PENDING, TaskState.RUNNING);
currentRunning++;
}
}
}
}
- private void changeAndLogTaskStatus(long jobId, MTMVTask task, TaskState fromStatus, TaskState toStatus) {
- ChangeMTMVTask changeTask = new ChangeMTMVTask(jobId, task, fromStatus, toStatus);
- Env.getCurrentEnv().getEditLog().logChangeMTMVTask(changeTask);
- }
-
public boolean tryLock() {
try {
return reentrantLock.tryLock(5, TimeUnit.SECONDS);
@@ -328,86 +318,7 @@ public class MTMVTaskManager {
}
public void replayCreateJobTask(MTMVTask task) {
- if (task.getState() == TaskState.SUCCESS || task.getState() == TaskState.FAILURE) {
- if (MTMVUtils.getNowTimeStamp() > task.getExpireTime()) {
- return;
- }
- }
-
- switch (task.getState()) {
- case PENDING:
- String jobName = task.getJobName();
- MTMVJob job = mtmvJobManager.getJob(jobName);
- if (job == null) {
- LOG.warn("fail to obtain task name {} because task is null", jobName);
- return;
- }
- MTMVTaskExecutor taskExecutor = MTMVUtils.buildTask(job);
- taskExecutor.setTask(task);
- arrangeToPendingTask(taskExecutor);
- break;
- case RUNNING:
- task.setState(TaskState.FAILURE);
- addHistory(task);
- break;
- case FAILURE:
- case SUCCESS:
- addHistory(task);
- break;
- default:
- break;
- }
- }
-
- public void replayUpdateTask(ChangeMTMVTask changeTask) {
- TaskState fromStatus = changeTask.getFromStatus();
- TaskState toStatus = changeTask.getToStatus();
- Long jobId = changeTask.getJobId();
- if (fromStatus == TaskState.PENDING) {
- Queue<MTMVTaskExecutor> taskQueue = getPendingTaskMap().get(jobId);
- if (taskQueue == null) {
- return;
- }
- if (taskQueue.size() == 0) {
- getPendingTaskMap().remove(jobId);
- return;
- }
-
- MTMVTaskExecutor pendingTask = taskQueue.poll();
- MTMVTask status = pendingTask.getTask();
-
- if (toStatus == TaskState.RUNNING) {
- if (status.getTaskId().equals(changeTask.getTaskId())) {
- status.setState(TaskState.RUNNING);
- getRunningTaskMap().put(jobId, pendingTask);
- }
- } else if (toStatus == TaskState.FAILURE) {
- status.setMessage(changeTask.getErrorMessage());
- status.setErrorCode(changeTask.getErrorCode());
- status.setState(TaskState.FAILURE);
- addHistory(status);
- }
- if (taskQueue.size() == 0) {
- getPendingTaskMap().remove(jobId);
- }
- } else if (fromStatus == TaskState.RUNNING && (toStatus == TaskState.SUCCESS
- || toStatus == TaskState.FAILURE)) {
- MTMVTaskExecutor runningTask = getRunningTaskMap().remove(jobId);
- if (runningTask == null) {
- return;
- }
- MTMVTask status = runningTask.getTask();
- if (status.getTaskId().equals(changeTask.getTaskId())) {
- status.setMessage(changeTask.getErrorMessage());
- status.setErrorCode(changeTask.getErrorCode());
- status.setState(toStatus);
- status.setFinishTime(changeTask.getFinishTime());
- addHistory(status);
- }
- } else {
- LOG.warn("Illegal Task taskId:{} status transform from {} to {}", changeTask.getTaskId(), fromStatus,
- toStatus);
- }
+ addHistory(task);
}
public void clearTasksByJobName(String jobName, boolean isReplay) {
@@ -462,11 +373,13 @@ public class MTMVTaskManager {
Set<String> taskSet = new HashSet<>(taskIds);
// Pending tasks will be clear directly. So we don't drop it again here.
// Check the running task since the task was killed but was not move to the history queue.
- for (long key : runningTaskMap.keySet()) {
- MTMVTaskExecutor executor = runningTaskMap.get(key);
- // runningTaskMap may be removed in the runningIterator
- if (executor != null && taskSet.contains(executor.getTask().getTaskId())) {
- runningTaskMap.remove(key);
+ if (!isReplay) {
+ for (long key : runningTaskMap.keySet()) {
+ MTMVTaskExecutor executor = runningTaskMap.get(key);
+ // runningTaskMap may be removed in the runningIterator
+ if (executor != null && taskSet.contains(executor.getTask().getTaskId())) {
+ runningTaskMap.remove(key);
+ }
}
}
// Try to remove history tasks.
@@ -479,40 +392,4 @@ public class MTMVTaskManager {
}
LOG.info("drop task history:{}", taskIds);
}
-
- public void clearUnfinishedTasks() {
- if (!tryLock()) {
- return;
- }
- try {
- Iterator<Long> pendingIter = getPendingTaskMap().keySet().iterator();
- while (pendingIter.hasNext()) {
- Queue<MTMVTaskExecutor> tasks = getPendingTaskMap().get(pendingIter.next());
- while (!tasks.isEmpty()) {
- MTMVTaskExecutor taskExecutor = tasks.poll();
- taskExecutor.getTask().setMessage("Fe abort the task");
- taskExecutor.getTask().setErrorCode(-1);
- taskExecutor.getTask().setState(TaskState.FAILURE);
- addHistory(taskExecutor.getTask());
- changeAndLogTaskStatus(taskExecutor.getJobId(), taskExecutor.getTask(), TaskState.PENDING,
- TaskState.FAILURE);
- }
- pendingIter.remove();
- }
- Iterator<Long> runningIter = getRunningTaskMap().keySet().iterator();
- while (runningIter.hasNext()) {
- MTMVTaskExecutor taskExecutor = getRunningTaskMap().get(runningIter.next());
- taskExecutor.getTask().setMessage("Fe abort the task");
- taskExecutor.getTask().setErrorCode(-1);
- taskExecutor.getTask().setState(TaskState.FAILURE);
- taskExecutor.getTask().setFinishTime(MTMVUtils.getNowTimeStamp());
- runningIter.remove();
- addHistory(taskExecutor.getTask());
- changeAndLogTaskStatus(taskExecutor.getJobId(), taskExecutor.getTask(), TaskState.RUNNING,
- TaskState.FAILURE);
- }
- } finally {
- unlock();
- }
- }
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/mtmv/metadata/ChangeMTMVTask.java b/fe/fe-core/src/main/java/org/apache/doris/mtmv/metadata/ChangeMTMVTask.java
deleted file mode 100644
index 416cc123d9..0000000000
--- a/fe/fe-core/src/main/java/org/apache/doris/mtmv/metadata/ChangeMTMVTask.java
+++ /dev/null
@@ -1,131 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package org.apache.doris.mtmv.metadata;
-
-import org.apache.doris.common.io.Text;
-import org.apache.doris.common.io.Writable;
-import org.apache.doris.mtmv.MTMVUtils.TaskState;
-import org.apache.doris.persist.gson.GsonUtils;
-
-import com.google.gson.annotations.SerializedName;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-
-public class ChangeMTMVTask implements Writable {
-
- @SerializedName("jobId")
- private long jobId;
-
- @SerializedName("taskId")
- private String taskId;
-
- @SerializedName("finishTime")
- private long finishTime;
-
- @SerializedName("fromStatus")
- TaskState fromStatus;
-
- @SerializedName("toStatus")
- TaskState toStatus;
-
- @SerializedName("errorCode")
- private int errorCode = -1;
-
- @SerializedName("errorMessage")
- private String errorMessage = "";
-
-
- public ChangeMTMVTask(long jobId, MTMVTask task, TaskState fromStatus, TaskState toStatus) {
- this.jobId = jobId;
- this.taskId = task.getTaskId();
- this.fromStatus = fromStatus;
- this.toStatus = toStatus;
- this.finishTime = task.getFinishTime();
- errorCode = task.getErrorCode();
- errorMessage = task.getMessage();
- }
-
- public long getJobId() {
- return jobId;
- }
-
- public void setJobId(long jobId) {
- this.jobId = jobId;
- }
-
- public String getTaskId() {
- return taskId;
- }
-
- public void setTaskId(String taskId) {
- this.taskId = taskId;
- }
-
- public TaskState getFromStatus() {
- return fromStatus;
- }
-
- public void setFromStatus(TaskState fromStatus) {
- this.fromStatus = fromStatus;
- }
-
- public TaskState getToStatus() {
- return toStatus;
- }
-
- public void setToStatus(TaskState toStatus) {
- this.toStatus = toStatus;
- }
-
- public int getErrorCode() {
- return errorCode;
- }
-
- public void setErrorCode(int errorCode) {
- this.errorCode = errorCode;
- }
-
- public String getErrorMessage() {
- return errorMessage;
- }
-
- public void setErrorMessage(String errorMessage) {
- this.errorMessage = errorMessage;
- }
-
- public long getFinishTime() {
- return finishTime;
- }
-
- public void setFinishTime(long finishTime) {
- this.finishTime = finishTime;
- }
-
- public static ChangeMTMVTask read(DataInput in) throws IOException {
- String json = Text.readString(in);
- return GsonUtils.GSON.fromJson(json, ChangeMTMVTask.class);
- }
-
- @Override
- public void write(DataOutput out) throws IOException {
- String json = GsonUtils.GSON.toJson(this);
- Text.writeString(out, json);
- }
-}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/mtmv/metadata/MTMVCheckpointData.java b/fe/fe-core/src/main/java/org/apache/doris/mtmv/metadata/MTMVCheckpointData.java
index 163faed034..933ce31cf2 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/mtmv/metadata/MTMVCheckpointData.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/mtmv/metadata/MTMVCheckpointData.java
@@ -17,32 +17,14 @@
package org.apache.doris.mtmv.metadata;
-import org.apache.doris.common.io.Text;
-import org.apache.doris.common.io.Writable;
-import org.apache.doris.persist.gson.GsonUtils;
-
import com.google.gson.annotations.SerializedName;
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
import java.util.List;
-public class MTMVCheckpointData implements Writable {
+public class MTMVCheckpointData {
@SerializedName("jobs")
public List<MTMVJob> jobs;
@SerializedName("tasks")
public List<MTMVTask> tasks;
-
- @Override
- public void write(DataOutput out) throws IOException {
- String json = GsonUtils.GSON.toJson(this);
- Text.writeString(out, json);
- }
-
- public static MTMVCheckpointData read(DataInput in) throws IOException {
- String json = Text.readString(in);
- return GsonUtils.GSON.fromJson(json, MTMVCheckpointData.class);
- }
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java b/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java
index c64882632f..4f3d137442 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java
@@ -69,7 +69,6 @@ import org.apache.doris.load.sync.SyncJob;
import org.apache.doris.meta.MetaContext;
import org.apache.doris.metric.MetricRepo;
import org.apache.doris.mtmv.metadata.ChangeMTMVJob;
-import org.apache.doris.mtmv.metadata.ChangeMTMVTask;
import org.apache.doris.mtmv.metadata.DropMTMVJob;
import org.apache.doris.mtmv.metadata.DropMTMVTask;
import org.apache.doris.mtmv.metadata.MTMVJob;
@@ -917,8 +916,6 @@ public class EditLog {
break;
}
case OperationType.OP_CHANGE_MTMV_TASK: {
- final ChangeMTMVTask changeTask = (ChangeMTMVTask) journal.getData();
- env.getMTMVJobManager().replayUpdateTask(changeTask);
break;
}
case OperationType.OP_DROP_MTMV_TASK: {
@@ -1694,10 +1691,6 @@ public class EditLog {
logEdit(OperationType.OP_CREATE_MTMV_TASK, task);
}
- public void logChangeMTMVTask(ChangeMTMVTask changeTaskRecord) {
- logEdit(OperationType.OP_CHANGE_MTMV_TASK, changeTaskRecord);
- }
-
public void logDropMTMVTasks(List<String> taskIds) {
logEdit(OperationType.OP_DROP_MTMV_TASK, new DropMTMVTask(taskIds));
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/persist/OperationType.java b/fe/fe-core/src/main/java/org/apache/doris/persist/OperationType.java
index 32f7d958e8..e24055445d 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/persist/OperationType.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/persist/OperationType.java
@@ -266,6 +266,7 @@ public class OperationType {
public static final short OP_CREATE_MTMV_TASK = 340;
public static final short OP_DROP_MTMV_TASK = 341;
+ @Deprecated
public static final short OP_CHANGE_MTMV_TASK = 342;
public static final short OP_ALTER_MTMV_STMT = 345;
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org