You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by yi...@apache.org on 2022/07/11 04:16:53 UTC
[doris] branch master updated: [enhancement](alter) Make alter job more robust by ignoring some task failure (#10719)
This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 81101fc1c5 [enhancement](alter) Make alter job more robust by ignoring some task failure (#10719)
81101fc1c5 is described below
commit 81101fc1c58100a05c16966b1d2b3f408df2a81f
Author: caiconghui <55...@users.noreply.github.com>
AuthorDate: Mon Jul 11 12:16:48 2022 +0800
[enhancement](alter) Make alter job more robust by ignoring some task failure (#10719)
Co-authored-by: caiconghui1 <ca...@jd.com>
---
docs/en/docs/admin-manual/config/fe-config.md | 4 +--
docs/zh-CN/docs/admin-manual/config/fe-config.md | 4 +--
.../java/org/apache/doris/alter/RollupJobV2.java | 34 ++++++++++++++++------
.../org/apache/doris/alter/SchemaChangeJobV2.java | 34 +++++++++++++++-------
.../main/java/org/apache/doris/common/Config.java | 4 +--
5 files changed, 55 insertions(+), 25 deletions(-)
diff --git a/docs/en/docs/admin-manual/config/fe-config.md b/docs/en/docs/admin-manual/config/fe-config.md
index 21e9fe5623..13c6e8374f 100644
--- a/docs/en/docs/admin-manual/config/fe-config.md
+++ b/docs/en/docs/admin-manual/config/fe-config.md
@@ -2084,11 +2084,11 @@ Whether to allow multiple replicas of the same tablet to be distributed on the s
### min_version_count_indicate_replica_compaction_too_slow
-Default: 300
+Default: 200
Dynamically configured: true
-Only for Master FE: true
+Only for Master FE: false
The version count threshold used to judge whether replica compaction is too slow
diff --git a/docs/zh-CN/docs/admin-manual/config/fe-config.md b/docs/zh-CN/docs/admin-manual/config/fe-config.md
index 5aa42bd793..bd09d69e53 100644
--- a/docs/zh-CN/docs/admin-manual/config/fe-config.md
+++ b/docs/zh-CN/docs/admin-manual/config/fe-config.md
@@ -2129,11 +2129,11 @@ load 标签清理器将每隔 `label_clean_interval_second` 运行一次以清
### `min_version_count_indicate_replica_compaction_too_slow`
-默认值:300
+默认值:200
是否可以动态配置:true
-是否为 Master FE 节点独有的配置项:true
+是否为 Master FE 节点独有的配置项:false
版本计数阈值,用来判断副本做 compaction 的速度是否太慢
diff --git a/fe/fe-core/src/main/java/org/apache/doris/alter/RollupJobV2.java b/fe/fe-core/src/main/java/org/apache/doris/alter/RollupJobV2.java
index 79ec2d4107..4fe0c9854e 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/alter/RollupJobV2.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/alter/RollupJobV2.java
@@ -131,6 +131,8 @@ public class RollupJobV2 extends AlterJobV2 implements GsonPostProcessable {
// save all create rollup tasks
private AgentBatchTask rollupBatchTask = new AgentBatchTask();
+ // tasks that have failed at least three times, tabletId -> list of failed agentTasks
+ private Map<Long, List<AgentTask>> failedAgentTasks = Maps.newHashMap();
public RollupJobV2(long jobId, long dbId, long tableId, String tableName, long timeoutMs,
long baseIndexId, long rollupIndexId, String baseIndexName, String rollupIndexName,
@@ -152,10 +154,6 @@ public class RollupJobV2 extends AlterJobV2 implements GsonPostProcessable {
this.origStmt = origStmt;
}
- private RollupJobV2() {
- super(JobType.ROLLUP);
- }
-
public void addTabletIdMap(long partitionId, long rollupTabletId, long baseTabletId) {
Map<Long, Long> tabletIdMap = partitionIdToBaseRollupTabletIdMap
.computeIfAbsent(partitionId, k -> Maps.newHashMap());
@@ -425,13 +423,27 @@ public class RollupJobV2 extends AlterJobV2 implements GsonPostProcessable {
} catch (MetaNotFoundException e) {
throw new AlterCancelException(e.getMessage());
}
-
if (!rollupBatchTask.isFinished()) {
LOG.info("rollup tasks not finished. job: {}", jobId);
List<AgentTask> tasks = rollupBatchTask.getUnfinishedTasks(2000);
for (AgentTask task : tasks) {
if (task.getFailedTimes() >= 3) {
- throw new AlterCancelException("rollup task failed after try three times: " + task.getErrorMsg());
+ task.setFinished(true);
+ AgentTaskQueue.removeTask(task.getBackendId(), TTaskType.ALTER, task.getSignature());
+ LOG.warn("rollup task failed after try three times: " + task.getErrorMsg());
+ if (!failedAgentTasks.containsKey(task.getTabletId())) {
+ failedAgentTasks.put(task.getTabletId(), Lists.newArrayList(task));
+ } else {
+ failedAgentTasks.get(task.getTabletId()).add(task);
+ }
+ int expectSucceedTaskNum = tbl.getPartitionInfo()
+ .getReplicaAllocation(task.getPartitionId()).getTotalReplicaNum();
+ int failedTaskCount = failedAgentTasks.get(task.getTabletId()).size();
+ if (expectSucceedTaskNum - failedTaskCount < expectSucceedTaskNum / 2 + 1) {
+ throw new AlterCancelException(
+ "rollup tasks failed on same tablet reach threshold "
+ + failedAgentTasks.get(task.getTabletId()));
+ }
}
}
return;
@@ -444,6 +456,12 @@ public class RollupJobV2 extends AlterJobV2 implements GsonPostProcessable {
tbl.writeLockOrAlterCancelException();
try {
Preconditions.checkState(tbl.getState() == OlapTableState.ROLLUP);
+ TabletInvertedIndex invertedIndex = Catalog.getCurrentInvertedIndex();
+ for (List<AgentTask> tasks : failedAgentTasks.values()) {
+ for (AgentTask task : tasks) {
+ invertedIndex.getReplica(task.getTabletId(), task.getBackendId()).setBad(true);
+ }
+ }
for (Map.Entry<Long, MaterializedIndex> entry : this.partitionIdToRollupIndex.entrySet()) {
long partitionId = entry.getKey();
Partition partition = tbl.getPartition(partitionId);
@@ -454,14 +472,12 @@ public class RollupJobV2 extends AlterJobV2 implements GsonPostProcessable {
long visiableVersion = partition.getVisibleVersion();
short expectReplicationNum = tbl.getPartitionInfo().getReplicaAllocation(
partitionId).getTotalReplicaNum();
-
-
MaterializedIndex rollupIndex = entry.getValue();
for (Tablet rollupTablet : rollupIndex.getTablets()) {
List<Replica> replicas = rollupTablet.getReplicas();
int healthyReplicaNum = 0;
for (Replica replica : replicas) {
- if (replica.getLastFailedVersion() < 0
+ if (!replica.isBad() && replica.getLastFailedVersion() < 0
&& replica.checkVersionCatchUp(visiableVersion, false)) {
healthyReplicaNum++;
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java b/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java
index d7a0662bb2..1c112d6d28 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java
@@ -129,15 +129,13 @@ public class SchemaChangeJobV2 extends AlterJobV2 {
// save all schema change tasks
private AgentBatchTask schemaChangeBatchTask = new AgentBatchTask();
+ // tasks that have failed at least three times, tabletId -> list of failed agentTasks
+ private Map<Long, List<AgentTask>> failedAgentTasks = Maps.newHashMap();
public SchemaChangeJobV2(long jobId, long dbId, long tableId, String tableName, long timeoutMs) {
super(jobId, JobType.SCHEMA_CHANGE, dbId, tableId, tableName, timeoutMs);
}
- private SchemaChangeJobV2() {
- super(JobType.SCHEMA_CHANGE);
- }
-
public void addTabletIdMap(long partitionId, long shadowIdxId, long shadowTabletId, long originTabletId) {
Map<Long, Long> tabletMap = partitionIndexTabletMap.get(partitionId, shadowIdxId);
if (tabletMap == null) {
@@ -494,14 +492,25 @@ public class SchemaChangeJobV2 extends AlterJobV2 {
List<AgentTask> tasks = schemaChangeBatchTask.getUnfinishedTasks(2000);
for (AgentTask task : tasks) {
if (task.getFailedTimes() >= 3) {
- throw new AlterCancelException("schema change task failed after try three times: "
- + task.getErrorMsg());
+ task.setFinished(true);
+ AgentTaskQueue.removeTask(task.getBackendId(), TTaskType.ALTER, task.getSignature());
+ LOG.warn("schema change task failed after try three times: " + task.getErrorMsg());
+ if (!failedAgentTasks.containsKey(task.getTabletId())) {
+ failedAgentTasks.put(task.getTabletId(), Lists.newArrayList(task));
+ } else {
+ failedAgentTasks.get(task.getTabletId()).add(task);
+ }
+ int expectSucceedTaskNum = tbl.getPartitionInfo()
+ .getReplicaAllocation(task.getPartitionId()).getTotalReplicaNum();
+ int failedTaskCount = failedAgentTasks.get(task.getTabletId()).size();
+ if (expectSucceedTaskNum - failedTaskCount < expectSucceedTaskNum / 2 + 1) {
+ throw new AlterCancelException("schema change tasks failed on same tablet reach threshold "
+ + failedAgentTasks.get(task.getTabletId()));
+ }
}
}
return;
}
-
-
/*
* all tasks are finished. check the integrity.
* we just check whether all new replicas are healthy.
@@ -509,7 +518,12 @@ public class SchemaChangeJobV2 extends AlterJobV2 {
tbl.writeLockOrAlterCancelException();
try {
Preconditions.checkState(tbl.getState() == OlapTableState.SCHEMA_CHANGE);
-
+ TabletInvertedIndex invertedIndex = Catalog.getCurrentInvertedIndex();
+ for (List<AgentTask> tasks : failedAgentTasks.values()) {
+ for (AgentTask task : tasks) {
+ invertedIndex.getReplica(task.getTabletId(), task.getBackendId()).setBad(true);
+ }
+ }
for (long partitionId : partitionIndexMap.rowKeySet()) {
Partition partition = tbl.getPartition(partitionId);
Preconditions.checkNotNull(partition, partitionId);
@@ -526,7 +540,7 @@ public class SchemaChangeJobV2 extends AlterJobV2 {
List<Replica> replicas = shadowTablet.getReplicas();
int healthyReplicaNum = 0;
for (Replica replica : replicas) {
- if (replica.getLastFailedVersion() < 0
+ if (!replica.isBad() && replica.getLastFailedVersion() < 0
&& replica.checkVersionCatchUp(visiableVersion, false)) {
healthyReplicaNum++;
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/Config.java b/fe/fe-core/src/main/java/org/apache/doris/common/Config.java
index c64de8ea6b..4f9231516d 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/Config.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/Config.java
@@ -1583,8 +1583,8 @@ public class Config extends ConfigBase {
/**
* The version count threshold used to judge whether replica compaction is too slow
*/
- @ConfField(mutable = true, masterOnly = true)
- public static int min_version_count_indicate_replica_compaction_too_slow = 300;
+ @ConfField(mutable = true)
+ public static int min_version_count_indicate_replica_compaction_too_slow = 200;
/**
* The valid ratio threshold of the difference between the version count of the slowest replica and the fastest
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org