You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by yi...@apache.org on 2022/07/11 04:16:53 UTC

[doris] branch master updated: [enhancement](alter) Make alter job more robust by ignoring some task failure (#10719)

This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 81101fc1c5 [enhancement](alter) Make alter job more robust by ignoring some task failure (#10719)
81101fc1c5 is described below

commit 81101fc1c58100a05c16966b1d2b3f408df2a81f
Author: caiconghui <55...@users.noreply.github.com>
AuthorDate: Mon Jul 11 12:16:48 2022 +0800

    [enhancement](alter) Make alter job more robust by ignoring some task failure (#10719)
    
    Co-authored-by: caiconghui1 <ca...@jd.com>
---
 docs/en/docs/admin-manual/config/fe-config.md      |  4 +--
 docs/zh-CN/docs/admin-manual/config/fe-config.md   |  4 +--
 .../java/org/apache/doris/alter/RollupJobV2.java   | 34 ++++++++++++++++------
 .../org/apache/doris/alter/SchemaChangeJobV2.java  | 34 +++++++++++++++-------
 .../main/java/org/apache/doris/common/Config.java  |  4 +--
 5 files changed, 55 insertions(+), 25 deletions(-)

diff --git a/docs/en/docs/admin-manual/config/fe-config.md b/docs/en/docs/admin-manual/config/fe-config.md
index 21e9fe5623..13c6e8374f 100644
--- a/docs/en/docs/admin-manual/config/fe-config.md
+++ b/docs/en/docs/admin-manual/config/fe-config.md
@@ -2084,11 +2084,11 @@ Whether to allow multiple replicas of the same tablet to be distributed on the s
 
 ### min_version_count_indicate_replica_compaction_too_slow
 
-Default: 300
+Default: 200
 
 Dynamically configured: true
 
-Only for Master FE: true
+Only for Master FE: false
 
 The version count threshold used to judge whether replica compaction is too slow
 
diff --git a/docs/zh-CN/docs/admin-manual/config/fe-config.md b/docs/zh-CN/docs/admin-manual/config/fe-config.md
index 5aa42bd793..bd09d69e53 100644
--- a/docs/zh-CN/docs/admin-manual/config/fe-config.md
+++ b/docs/zh-CN/docs/admin-manual/config/fe-config.md
@@ -2129,11 +2129,11 @@ load 标签清理器将每隔 `label_clean_interval_second` 运行一次以清
 
 ### `min_version_count_indicate_replica_compaction_too_slow`
 
-默认值:300
+默认值:200
 
 是否可以动态配置:true
 
-是否为 Master FE 节点独有的配置项:true
+是否为 Master FE 节点独有的配置项:false
 
 版本计数阈值,用来判断副本做 compaction 的速度是否太慢
 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/alter/RollupJobV2.java b/fe/fe-core/src/main/java/org/apache/doris/alter/RollupJobV2.java
index 79ec2d4107..4fe0c9854e 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/alter/RollupJobV2.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/alter/RollupJobV2.java
@@ -131,6 +131,8 @@ public class RollupJobV2 extends AlterJobV2 implements GsonPostProcessable {
 
     // save all create rollup tasks
     private AgentBatchTask rollupBatchTask = new AgentBatchTask();
+    // tasks that have failed at least three times, grouped by tablet: tabletId -> list of agentTasks
+    private Map<Long, List<AgentTask>> failedAgentTasks = Maps.newHashMap();
 
     public RollupJobV2(long jobId, long dbId, long tableId, String tableName, long timeoutMs,
             long baseIndexId, long rollupIndexId, String baseIndexName, String rollupIndexName,
@@ -152,10 +154,6 @@ public class RollupJobV2 extends AlterJobV2 implements GsonPostProcessable {
         this.origStmt = origStmt;
     }
 
-    private RollupJobV2() {
-        super(JobType.ROLLUP);
-    }
-
     public void addTabletIdMap(long partitionId, long rollupTabletId, long baseTabletId) {
         Map<Long, Long> tabletIdMap = partitionIdToBaseRollupTabletIdMap
                 .computeIfAbsent(partitionId, k -> Maps.newHashMap());
@@ -425,13 +423,27 @@ public class RollupJobV2 extends AlterJobV2 implements GsonPostProcessable {
         } catch (MetaNotFoundException e) {
             throw new AlterCancelException(e.getMessage());
         }
-
         if (!rollupBatchTask.isFinished()) {
             LOG.info("rollup tasks not finished. job: {}", jobId);
             List<AgentTask> tasks = rollupBatchTask.getUnfinishedTasks(2000);
             for (AgentTask task : tasks) {
                 if (task.getFailedTimes() >= 3) {
-                    throw new AlterCancelException("rollup task failed after try three times: " + task.getErrorMsg());
+                    task.setFinished(true);
+                    AgentTaskQueue.removeTask(task.getBackendId(), TTaskType.ALTER, task.getSignature());
+                    LOG.warn("rollup task failed after try three times: " + task.getErrorMsg());
+                    if (!failedAgentTasks.containsKey(task.getTabletId())) {
+                        failedAgentTasks.put(task.getTabletId(), Lists.newArrayList(task));
+                    } else {
+                        failedAgentTasks.get(task.getTabletId()).add(task);
+                    }
+                    int expectSucceedTaskNum = tbl.getPartitionInfo()
+                            .getReplicaAllocation(task.getPartitionId()).getTotalReplicaNum();
+                    int failedTaskCount = failedAgentTasks.get(task.getTabletId()).size();
+                    if (expectSucceedTaskNum - failedTaskCount < expectSucceedTaskNum / 2 + 1) {
+                        throw new AlterCancelException(
+                                "rollup tasks failed on same tablet reach threshold "
+                                        + failedAgentTasks.get(task.getTabletId()));
+                    }
                 }
             }
             return;
@@ -444,6 +456,12 @@ public class RollupJobV2 extends AlterJobV2 implements GsonPostProcessable {
         tbl.writeLockOrAlterCancelException();
         try {
             Preconditions.checkState(tbl.getState() == OlapTableState.ROLLUP);
+            TabletInvertedIndex invertedIndex = Catalog.getCurrentInvertedIndex();
+            for (List<AgentTask> tasks : failedAgentTasks.values()) {
+                for (AgentTask task : tasks) {
+                    invertedIndex.getReplica(task.getTabletId(), task.getBackendId()).setBad(true);
+                }
+            }
             for (Map.Entry<Long, MaterializedIndex> entry : this.partitionIdToRollupIndex.entrySet()) {
                 long partitionId = entry.getKey();
                 Partition partition = tbl.getPartition(partitionId);
@@ -454,14 +472,12 @@ public class RollupJobV2 extends AlterJobV2 implements GsonPostProcessable {
                 long visiableVersion = partition.getVisibleVersion();
                 short expectReplicationNum = tbl.getPartitionInfo().getReplicaAllocation(
                         partitionId).getTotalReplicaNum();
-
-
                 MaterializedIndex rollupIndex = entry.getValue();
                 for (Tablet rollupTablet : rollupIndex.getTablets()) {
                     List<Replica> replicas = rollupTablet.getReplicas();
                     int healthyReplicaNum = 0;
                     for (Replica replica : replicas) {
-                        if (replica.getLastFailedVersion() < 0
+                        if (!replica.isBad() && replica.getLastFailedVersion() < 0
                                 && replica.checkVersionCatchUp(visiableVersion, false)) {
                             healthyReplicaNum++;
                         }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java b/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java
index d7a0662bb2..1c112d6d28 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java
@@ -129,15 +129,13 @@ public class SchemaChangeJobV2 extends AlterJobV2 {
 
     // save all schema change tasks
     private AgentBatchTask schemaChangeBatchTask = new AgentBatchTask();
+    // tasks that have failed at least three times, grouped by tablet: tabletId -> list of agentTasks
+    private Map<Long, List<AgentTask>> failedAgentTasks = Maps.newHashMap();
 
     public SchemaChangeJobV2(long jobId, long dbId, long tableId, String tableName, long timeoutMs) {
         super(jobId, JobType.SCHEMA_CHANGE, dbId, tableId, tableName, timeoutMs);
     }
 
-    private SchemaChangeJobV2() {
-        super(JobType.SCHEMA_CHANGE);
-    }
-
     public void addTabletIdMap(long partitionId, long shadowIdxId, long shadowTabletId, long originTabletId) {
         Map<Long, Long> tabletMap = partitionIndexTabletMap.get(partitionId, shadowIdxId);
         if (tabletMap == null) {
@@ -494,14 +492,25 @@ public class SchemaChangeJobV2 extends AlterJobV2 {
             List<AgentTask> tasks = schemaChangeBatchTask.getUnfinishedTasks(2000);
             for (AgentTask task : tasks) {
                 if (task.getFailedTimes() >= 3) {
-                    throw new AlterCancelException("schema change task failed after try three times: "
-                            + task.getErrorMsg());
+                    task.setFinished(true);
+                    AgentTaskQueue.removeTask(task.getBackendId(), TTaskType.ALTER, task.getSignature());
+                    LOG.warn("schema change task failed after try three times: " + task.getErrorMsg());
+                    if (!failedAgentTasks.containsKey(task.getTabletId())) {
+                        failedAgentTasks.put(task.getTabletId(), Lists.newArrayList(task));
+                    } else {
+                        failedAgentTasks.get(task.getTabletId()).add(task);
+                    }
+                    int expectSucceedTaskNum = tbl.getPartitionInfo()
+                            .getReplicaAllocation(task.getPartitionId()).getTotalReplicaNum();
+                    int failedTaskCount = failedAgentTasks.get(task.getTabletId()).size();
+                    if (expectSucceedTaskNum - failedTaskCount < expectSucceedTaskNum / 2 + 1) {
+                        throw new AlterCancelException("schema change tasks failed on same tablet reach threshold "
+                                    + failedAgentTasks.get(task.getTabletId()));
+                    }
                 }
             }
             return;
         }
-
-
         /*
          * all tasks are finished. check the integrity.
          * we just check whether all new replicas are healthy.
@@ -509,7 +518,12 @@ public class SchemaChangeJobV2 extends AlterJobV2 {
         tbl.writeLockOrAlterCancelException();
         try {
             Preconditions.checkState(tbl.getState() == OlapTableState.SCHEMA_CHANGE);
-
+            TabletInvertedIndex invertedIndex = Catalog.getCurrentInvertedIndex();
+            for (List<AgentTask> tasks : failedAgentTasks.values()) {
+                for (AgentTask task : tasks) {
+                    invertedIndex.getReplica(task.getTabletId(), task.getBackendId()).setBad(true);
+                }
+            }
             for (long partitionId : partitionIndexMap.rowKeySet()) {
                 Partition partition = tbl.getPartition(partitionId);
                 Preconditions.checkNotNull(partition, partitionId);
@@ -526,7 +540,7 @@ public class SchemaChangeJobV2 extends AlterJobV2 {
                         List<Replica> replicas = shadowTablet.getReplicas();
                         int healthyReplicaNum = 0;
                         for (Replica replica : replicas) {
-                            if (replica.getLastFailedVersion() < 0
+                            if (!replica.isBad() && replica.getLastFailedVersion() < 0
                                     && replica.checkVersionCatchUp(visiableVersion, false)) {
                                 healthyReplicaNum++;
                             }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/Config.java b/fe/fe-core/src/main/java/org/apache/doris/common/Config.java
index c64de8ea6b..4f9231516d 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/Config.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/Config.java
@@ -1583,8 +1583,8 @@ public class Config extends ConfigBase {
     /**
      *  The version count threshold used to judge whether replica compaction is too slow
      */
-    @ConfField(mutable = true, masterOnly = true)
-    public static int min_version_count_indicate_replica_compaction_too_slow = 300;
+    @ConfField(mutable = true)
+    public static int min_version_count_indicate_replica_compaction_too_slow = 200;
 
     /**
     * The valid ratio threshold of the difference between the version count of the slowest replica and the fastest


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org