You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by kx...@apache.org on 2023/07/20 14:06:45 UTC

[doris] 01/21: [Improvement](tablet clone) impr tablet sched speed and fix tablet sched failed too many times (#21856)

This is an automated email from the ASF dual-hosted git repository.

kxiao pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/doris.git

commit b6ab4a2cd881d113fa5dcf606d52e8f6526da8ae
Author: yujun <yu...@gmail.com>
AuthorDate: Tue Jul 18 23:25:22 2023 +0800

    [Improvement](tablet clone) impr tablet sched speed and fix tablet sched failed too many times (#21856)
---
 docker/runtime/doris-compose/database.py           | 241 +++++++++++
 .../main/java/org/apache/doris/common/Config.java  |  16 +-
 .../main/java/org/apache/doris/catalog/Tablet.java |  15 +-
 .../org/apache/doris/clone/BeLoadRebalancer.java   |  50 ++-
 .../clone/ColocateTableCheckerAndBalancer.java     |   5 +-
 .../org/apache/doris/clone/DiskRebalancer.java     |   4 +-
 .../apache/doris/clone/PartitionRebalancer.java    |   6 +-
 .../org/apache/doris/clone/SchedException.java     |  16 +
 .../java/org/apache/doris/clone/TabletChecker.java |   6 +-
 .../org/apache/doris/clone/TabletSchedCtx.java     | 245 +++++------
 .../org/apache/doris/clone/TabletScheduler.java    | 461 +++++++++++++--------
 .../common/proc/TabletSchedulerDetailProcDir.java  |  10 +-
 .../org/apache/doris/clone/TabletSchedCtxTest.java |  12 +-
 .../doris/cluster/DecommissionBackendTest.java     |   2 +-
 .../apache/doris/utframe/TestWithFeService.java    |   5 +-
 15 files changed, 726 insertions(+), 368 deletions(-)

diff --git a/docker/runtime/doris-compose/database.py b/docker/runtime/doris-compose/database.py
new file mode 100644
index 0000000000..57fc90275a
--- /dev/null
+++ b/docker/runtime/doris-compose/database.py
@@ -0,0 +1,241 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import cluster as CLUSTER
+import os.path
+import pymysql
+import time
+import utils
+
+LOG = utils.get_logger()
+
+
+class FEState:
+    """One frontend row as parsed from ``show frontends``.
+
+    ``query_port`` is the host-mapped query port of this FE, or None when
+    no mapped port is known for it.
+    """
+
+    def __init__(self, id, query_port, is_master, alive, last_heartbeat,
+                 err_msg):
+        # Identity of the node and how to reach it for queries.
+        self.id = id
+        self.query_port = query_port
+        # Role / liveness fields copied from the SHOW FRONTENDS row.
+        self.is_master = is_master
+        self.alive = alive
+        self.last_heartbeat = last_heartbeat
+        self.err_msg = err_msg
+
+
+class BEState:
+    """One backend row as parsed from ``show backends``.
+
+    ``tablet_num`` is the backend's tablet count as an int; the other
+    fields are copied from the row as returned by the query.
+    """
+
+    def __init__(self, id, decommissioned, alive, tablet_num, last_heartbeat,
+                 err_msg):
+        # Identity of the node.
+        self.id = id
+        # Membership / liveness flags.
+        self.decommissioned = decommissioned
+        self.alive = alive
+        # Load and heartbeat information.
+        self.tablet_num = tablet_num
+        self.last_heartbeat = last_heartbeat
+        self.err_msg = err_msg
+
+
+class DBManager(object):
+    """Talks to a Doris FE over its MySQL query port to inspect and change
+    cluster membership.
+
+    FE/BE states are cached in ``fe_states`` / ``be_states`` (node id ->
+    FEState / BEState) and refreshed by ``load_states``. The connection is
+    opened lazily on the first query.
+    """
+
+    def __init__(self):
+        self.fe_states = {}
+        self.be_states = {}
+        self.query_port = -1
+        self.conn = None
+
+    def set_query_port(self, query_port):
+        self.query_port = query_port
+
+    def get_fe(self, id):
+        return self.fe_states.get(id, None)
+
+    def get_be(self, id):
+        return self.be_states.get(id, None)
+
+    def load_states(self, query_ports):
+        """Reload FE and BE states.
+
+        query_ports maps FE node id -> host query port; it is attached to
+        each FEState and used to follow the alive master FE.
+        """
+        self._load_fe_states(query_ports)
+        self._load_be_states()
+
+    @staticmethod
+    def _get_id_from_endpoint(endpoint):
+        """Return the node id for an "ip:port" (or bare "ip") endpoint."""
+        # split() keeps the whole string when there is no ":"; slicing with
+        # find(":") == -1 would silently drop the last character of the ip.
+        return CLUSTER.Node.get_id_from_ip(endpoint.split(":", 1)[0])
+
+    def drop_fe(self, fe_endpoint):
+        """Drop follower FE ``fe_endpoint`` ("ip:port").
+
+        Treated as success if the FE error says the frontend does not exist.
+        """
+        id = self._get_id_from_endpoint(fe_endpoint)
+        try:
+            self._exec_query(
+                "ALTER SYSTEM DROP FOLLOWER '{}'".format(fe_endpoint))
+            LOG.info("Drop fe {} with id {} from db succ.".format(
+                fe_endpoint, id))
+        except Exception as e:
+            # Matched against the FE's error text verbatim.
+            if "frontend does not exist" in str(e):
+                LOG.info(
+                    "Drop fe {} with id {} from db succ cause it does not exist in db."
+                    .format(fe_endpoint, id))
+                return
+            raise
+
+    def drop_be(self, be_endpoint):
+        """Drop backend ``be_endpoint`` ("ip:port") via ALTER SYSTEM DROP BACKEND.
+
+        Treated as success if the FE error says the backend does not exist.
+        """
+        id = self._get_id_from_endpoint(be_endpoint)
+        try:
+            self._exec_query(
+                "ALTER SYSTEM DROP BACKEND '{}'".format(be_endpoint))
+            LOG.info("Drop be {} with id {} from db succ.".format(
+                be_endpoint, id))
+        except Exception as e:
+            # Matched against the FE's error text verbatim (note "exists").
+            if "backend does not exists" in str(e):
+                LOG.info(
+                    "Drop be {} with id {} from db succ cause it does not exist in db."
+                    .format(be_endpoint, id))
+                return
+            raise
+
+    def decommission_be(self, be_endpoint):
+        """Decommission backend ``be_endpoint`` and block until it is gone.
+
+        Polls ``show backends`` every 5 seconds until the backend no longer
+        appears, logging its remaining tablet count. There is no timeout.
+
+        Raises:
+            Exception: if the backend is known but not alive, or the
+                DECOMMISSION statement fails for another reason.
+        """
+        old_tablet_num = 0
+        id = self._get_id_from_endpoint(be_endpoint)
+        start_ts = time.time()
+        if id not in self.be_states:
+            self._load_be_states()
+        if id in self.be_states:
+            be = self.be_states[id]
+            old_tablet_num = be.tablet_num
+            if not be.alive:
+                raise Exception(("Decommission be {} with id {} fail "
+                                 "cause it's not alive, maybe you should specify --drop-force "
+                                 "to drop it from db").format(be_endpoint, id))
+        try:
+            self._exec_query(
+                "ALTER SYSTEM DECOMMISSION BACKEND '{}'".format(be_endpoint))
+            LOG.info(("Mark be {} with id {} as decommissioned, start migrate its tablets, "
+                      "wait migrating job finish.").format(be_endpoint, id))
+        except Exception as e:
+            if "Backend does not exist" in str(e):
+                # Must be str.format; a stray "," would call the builtin
+                # format() and raise TypeError instead of logging.
+                LOG.info(("Decommission be {} with id {} from db succ "
+                          "cause it does not exist in db.").format(be_endpoint, id))
+                return
+            raise
+
+        while True:
+            self._load_be_states()
+            be = self.be_states.get(id, None)
+            if not be:
+                LOG.info(("Decommission be {} succ, total migrate {} tablets, "
+                          "has drop it from db.").format(be_endpoint, old_tablet_num))
+                return
+            LOG.info(
+                ("Decommission be {} status: alive {}, decommissioned {}. "
+                 "It is migrating its tablets, left {}/{} tablets. Time elapse {} s."
+                 ).format(be_endpoint, be.alive, be.decommissioned, be.tablet_num,
+                          old_tablet_num, int(time.time() - start_ts)))
+
+            time.sleep(5)
+
+    def _load_fe_states(self, query_ports):
+        """Reload ``fe_states`` from ``show frontends`` and reconnect to the
+        alive master FE when it has a known query port different from the
+        current one."""
+        fe_states = {}
+        alive_master_fe_port = None
+        # NOTE(review): column indices assume this Doris version's
+        # SHOW FRONTENDS layout -- confirm when the FE version changes.
+        for record in self._exec_query("show frontends;"):
+            ip = record[1]
+            is_master = record[7] == "true"
+            alive = record[10] == "true"
+            last_heartbeat = record[12]
+            err_msg = record[14]
+            id = CLUSTER.Node.get_id_from_ip(ip)
+            query_port = query_ports.get(id, None)
+            fe = FEState(id, query_port, is_master, alive, last_heartbeat,
+                         err_msg)
+            fe_states[id] = fe
+            if is_master and alive and query_port:
+                alive_master_fe_port = query_port
+        self.fe_states = fe_states
+        if alive_master_fe_port and alive_master_fe_port != self.query_port:
+            self.query_port = alive_master_fe_port
+            self._reset_conn()
+
+    def _load_be_states(self):
+        """Reload ``be_states`` from ``show backends``."""
+        be_states = {}
+        # NOTE(review): column indices assume this Doris version's
+        # SHOW BACKENDS layout -- confirm when the FE version changes.
+        for record in self._exec_query("show backends;"):
+            ip = record[1]
+            last_heartbeat = record[7]
+            alive = record[8] == "true"
+            decommissioned = record[9] == "true"
+            tablet_num = int(record[10])
+            err_msg = record[18]
+            id = CLUSTER.Node.get_id_from_ip(ip)
+            be = BEState(id, decommissioned, alive, tablet_num, last_heartbeat,
+                         err_msg)
+            be_states[id] = be
+        self.be_states = be_states
+
+    def _exec_query(self, sql):
+        """Run ``sql`` on the (lazily opened) connection and return all rows."""
+        self._prepare_conn()
+        with self.conn.cursor() as cursor:
+            cursor.execute(sql)
+            return cursor.fetchall()
+
+    def _prepare_conn(self):
+        """Open the connection if needed; requires a positive query_port."""
+        if self.conn:
+            return
+        if self.query_port <= 0:
+            raise Exception("Not set query_port")
+        self._reset_conn()
+
+    def _reset_conn(self):
+        """(Re)connect to 127.0.0.1:``query_port`` as root, closing any
+        previous connection first so switching masters does not leak it."""
+        old_conn, self.conn = self.conn, None
+        if old_conn:
+            try:
+                old_conn.close()
+            except Exception:
+                # Best effort: the old connection may already be closed or
+                # broken; a failed close must not block reconnecting.
+                pass
+        self.conn = pymysql.connect(user="root",
+                                    host="127.0.0.1",
+                                    read_timeout=10,
+                                    port=self.query_port)
+
+
+def get_db_mgr(cluster_name, required_load_succ=True):
+    """Build a DBManager connected to the cluster's (best-guess) master FE.
+
+    Only running FE containers with a mapped query port are considered. The
+    master is taken from the cluster's ``master_fe_ip`` status file when it
+    points at one of them; otherwise fe-1 is used, else any running FE.
+
+    Args:
+        cluster_name: name of the doris-compose cluster (must be non-empty).
+        required_load_succ: when True, errors while loading FE/BE states are
+            re-raised; when False they are logged at debug level and a
+            manager with whatever state was loaded is returned.
+
+    Returns:
+        A DBManager; it has no query port set if no FE is running.
+    """
+    assert cluster_name
+    db_mgr = DBManager()
+    containers = utils.get_doris_containers(cluster_name).get(
+        cluster_name, None)
+    if not containers:
+        return db_mgr
+    alive_fe_ports = {}
+    for container in containers:
+        if utils.is_container_running(container):
+            _, node_type, id = utils.parse_service_name(container.name)
+            if node_type == CLUSTER.Node.TYPE_FE:
+                query_port = utils.get_map_ports(container).get(
+                    CLUSTER.FE_QUERY_PORT, None)
+                if query_port:
+                    alive_fe_ports[id] = query_port
+    if not alive_fe_ports:
+        return db_mgr
+
+    master_fe_ip_file = os.path.join(CLUSTER.get_status_path(cluster_name),
+                                     "master_fe_ip")
+    query_port = None
+    if os.path.exists(master_fe_ip_file):
+        with open(master_fe_ip_file, "r") as f:
+            # Strip so a trailing newline in the file does not break the
+            # ip -> id lookup.
+            master_fe_ip = f.read().strip()
+            if master_fe_ip:
+                master_id = CLUSTER.Node.get_id_from_ip(master_fe_ip)
+                query_port = alive_fe_ports.get(master_id, None)
+    if not query_port:
+        # A new cluster's master is fe-1
+        if 1 in alive_fe_ports:
+            query_port = alive_fe_ports[1]
+        else:
+            query_port = next(iter(alive_fe_ports.values()))
+
+    db_mgr.set_query_port(query_port)
+    try:
+        db_mgr.load_states(alive_fe_ports)
+    except Exception as e:
+        if required_load_succ:
+            raise
+        # Best effort: keep going, but leave a trace for debugging.
+        LOG.debug("Load db states of cluster {} failed: {}".format(
+            cluster_name, e))
+
+    return db_mgr
diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
index 373cb777e4..4a7c9e929d 100644
--- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
+++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
@@ -904,7 +904,21 @@ public class Config extends ConfigBase {
      * the default slot number per path in tablet scheduler
      * TODO(cmy): remove this config and dynamically adjust it by clone task statistic
      */
-    @ConfField public static int schedule_slot_num_per_path = 2;
+    @ConfField(mutable = true, masterOnly = true)
+    public static int schedule_slot_num_per_path = 4;
+
+    /**
+     * the default slot number per path in tablet scheduler for decommission backend
+     */
+    @ConfField(mutable = true, masterOnly = true)
+    public static int schedule_decommission_slot_num_per_path = 8;
+
+    /**
+     * the default batch size in tablet scheduler for a single schedule.
+     */
+    @ConfField(mutable = true, masterOnly = true)
+    public static int schedule_batch_size = 50;
+
 
     /**
      * Deprecated after 0.10
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Tablet.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Tablet.java
index 74f1c31cbf..0e93a4dd0f 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Tablet.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Tablet.java
@@ -703,11 +703,24 @@ public class Tablet extends MetaObject implements Writable {
      * NORMAL:  delay Config.tablet_repair_delay_factor_second * 2;
      * LOW:     delay Config.tablet_repair_delay_factor_second * 3;
      */
-    public boolean readyToBeRepaired(TabletSchedCtx.Priority priority) {
+    public boolean readyToBeRepaired(SystemInfoService infoService, TabletSchedCtx.Priority priority) {
         if (priority == Priority.VERY_HIGH) {
             return true;
         }
 
+        boolean allBeAliveOrDecommissioned = true;
+        for (Replica replica : replicas) {
+            Backend backend = infoService.getBackend(replica.getBackendId());
+            if (backend == null || (!backend.isAlive() && !backend.isDecommissioned())) {
+                allBeAliveOrDecommissioned = false;
+                break;
+            }
+        }
+
+        if (allBeAliveOrDecommissioned) {
+            return true;
+        }
+
         long currentTime = System.currentTimeMillis();
 
         // first check, wait for next round
diff --git a/fe/fe-core/src/main/java/org/apache/doris/clone/BeLoadRebalancer.java b/fe/fe-core/src/main/java/org/apache/doris/clone/BeLoadRebalancer.java
index ebbebe6806..5317725881 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/clone/BeLoadRebalancer.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/clone/BeLoadRebalancer.java
@@ -23,6 +23,7 @@ import org.apache.doris.catalog.Replica;
 import org.apache.doris.catalog.TabletInvertedIndex;
 import org.apache.doris.catalog.TabletMeta;
 import org.apache.doris.clone.SchedException.Status;
+import org.apache.doris.clone.SchedException.SubCode;
 import org.apache.doris.clone.TabletSchedCtx.Priority;
 import org.apache.doris.clone.TabletScheduler.PathSlot;
 import org.apache.doris.common.Config;
@@ -166,7 +167,7 @@ public class BeLoadRebalancer extends Rebalancer {
                             System.currentTimeMillis());
                     tabletCtx.setTag(clusterStat.getTag());
                     // balance task's priority is always LOW
-                    tabletCtx.setOrigPriority(Priority.LOW);
+                    tabletCtx.setPriority(Priority.LOW);
 
                     alternativeTablets.add(tabletCtx);
                     if (--numOfLowPaths <= 0) {
@@ -262,7 +263,7 @@ public class BeLoadRebalancer extends Rebalancer {
         }
 
         // Select a low load backend as destination.
-        boolean setDest = false;
+        List<BackendLoadStatistic> candidates = Lists.newArrayList();
         for (BackendLoadStatistic beStat : lowBe) {
             if (beStat.isAvailable() && replicas.stream().noneMatch(r -> r.getBackendId() == beStat.getBeId())) {
                 // check if on same host.
@@ -296,27 +297,36 @@ public class BeLoadRebalancer extends Rebalancer {
                     continue;
                 }
 
-                // classify the paths.
-                // And we only select path from 'low' and 'mid' paths
-                Set<Long> pathLow = Sets.newHashSet();
-                Set<Long> pathMid = Sets.newHashSet();
-                Set<Long> pathHigh = Sets.newHashSet();
-                beStat.getPathStatisticByClass(pathLow, pathMid, pathHigh, tabletCtx.getStorageMedium());
-                pathLow.addAll(pathMid);
-
-                long pathHash = slot.takeAnAvailBalanceSlotFrom(pathLow);
-                if (pathHash == -1) {
-                    LOG.debug("paths has no available balance slot: {}", pathLow);
-                } else {
-                    tabletCtx.setDest(beStat.getBeId(), pathHash);
-                    setDest = true;
-                    break;
-                }
+                candidates.add(beStat);
             }
         }
 
-        if (!setDest) {
-            throw new SchedException(Status.SCHEDULE_FAILED, "unable to find low backend");
+        if (candidates.isEmpty()) {
+            throw new SchedException(Status.UNRECOVERABLE, "unable to find low backend");
         }
+
+        for (BackendLoadStatistic beStat : candidates) {
+            PathSlot slot = backendsWorkingSlots.get(beStat.getBeId());
+            if (slot == null) {
+                continue;
+            }
+
+            // classify the paths.
+            // And we only select path from 'low' and 'mid' paths
+            Set<Long> pathLow = Sets.newHashSet();
+            Set<Long> pathMid = Sets.newHashSet();
+            Set<Long> pathHigh = Sets.newHashSet();
+            beStat.getPathStatisticByClass(pathLow, pathMid, pathHigh, tabletCtx.getStorageMedium());
+            pathLow.addAll(pathMid);
+
+            long pathHash = slot.takeAnAvailBalanceSlotFrom(pathLow);
+            if (pathHash != -1) {
+                tabletCtx.setDest(beStat.getBeId(), pathHash);
+                return;
+            }
+        }
+
+        throw new SchedException(Status.SCHEDULE_FAILED, SubCode.WAITING_SLOT,
+                "unable to find low backend");
     }
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/clone/ColocateTableCheckerAndBalancer.java b/fe/fe-core/src/main/java/org/apache/doris/clone/ColocateTableCheckerAndBalancer.java
index b3fdc3ca71..b34bc926dd 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/clone/ColocateTableCheckerAndBalancer.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/clone/ColocateTableCheckerAndBalancer.java
@@ -202,6 +202,7 @@ public class ColocateTableCheckerAndBalancer extends MasterDaemon {
      */
     private void matchGroup() {
         Env env = Env.getCurrentEnv();
+        SystemInfoService infoService = Env.getCurrentSystemInfo();
         ColocateTableIndex colocateIndex = env.getColocateTableIndex();
         TabletScheduler tabletScheduler = env.getTabletScheduler();
 
@@ -254,7 +255,7 @@ public class ColocateTableCheckerAndBalancer extends MasterDaemon {
                                             + " status: %s", tablet.getId(), st);
                                     LOG.debug(unstableReason);
 
-                                    if (!tablet.readyToBeRepaired(Priority.NORMAL)) {
+                                    if (!tablet.readyToBeRepaired(infoService, Priority.NORMAL)) {
                                         continue;
                                     }
 
@@ -265,7 +266,7 @@ public class ColocateTableCheckerAndBalancer extends MasterDaemon {
                                             System.currentTimeMillis());
                                     // the tablet status will be set again when being scheduled
                                     tabletCtx.setTabletStatus(st);
-                                    tabletCtx.setOrigPriority(Priority.NORMAL);
+                                    tabletCtx.setPriority(Priority.NORMAL);
                                     tabletCtx.setTabletOrderIdx(idx);
 
                                     AddResult res = tabletScheduler.addTablet(tabletCtx, false /* not force */);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/clone/DiskRebalancer.java b/fe/fe-core/src/main/java/org/apache/doris/clone/DiskRebalancer.java
index 9d676d950c..abac0c2d1a 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/clone/DiskRebalancer.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/clone/DiskRebalancer.java
@@ -208,10 +208,10 @@ public class DiskRebalancer extends Rebalancer {
                     tabletCtx.setTag(clusterStat.getTag());
                     if (prioBackends.containsKey(beStat.getBeId())) {
                         // priority of balance task of prio BE is NORMAL
-                        tabletCtx.setOrigPriority(Priority.NORMAL);
+                        tabletCtx.setPriority(Priority.NORMAL);
                     } else {
                         // balance task's default priority is LOW
-                        tabletCtx.setOrigPriority(Priority.LOW);
+                        tabletCtx.setPriority(Priority.LOW);
                     }
                     // we must set balanceType to DISK_BALANCE for create migration task
                     tabletCtx.setBalanceType(BalanceType.DISK_BALANCE);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/clone/PartitionRebalancer.java b/fe/fe-core/src/main/java/org/apache/doris/clone/PartitionRebalancer.java
index 9e3e37ef00..d9d3f27cc7 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/clone/PartitionRebalancer.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/clone/PartitionRebalancer.java
@@ -151,7 +151,7 @@ public class PartitionRebalancer extends Rebalancer {
                     System.currentTimeMillis());
             tabletCtx.setTag(clusterStat.getTag());
             // Balance task's priority is always LOW
-            tabletCtx.setOrigPriority(TabletSchedCtx.Priority.LOW);
+            tabletCtx.setPriority(TabletSchedCtx.Priority.LOW);
             alternativeTablets.add(tabletCtx);
             // Pair<Move, ToDeleteReplicaId>, ToDeleteReplicaId should be -1L before scheduled successfully
             movesInProgress.get().put(pickedTabletId,
@@ -251,7 +251,7 @@ public class PartitionRebalancer extends Rebalancer {
             if (slot.takeBalanceSlot(srcReplica.getPathHash()) != -1) {
                 tabletCtx.setSrc(srcReplica);
             } else {
-                throw new SchedException(SchedException.Status.SCHEDULE_FAILED,
+                throw new SchedException(SchedException.Status.SCHEDULE_FAILED, SchedException.SubCode.WAITING_SLOT,
                         "no slot for src replica " + srcReplica + ", pathHash " + srcReplica.getPathHash());
             }
 
@@ -269,7 +269,7 @@ public class PartitionRebalancer extends Rebalancer {
                     .map(RootPathLoadStatistic::getPathHash).collect(Collectors.toSet());
             long pathHash = slot.takeAnAvailBalanceSlotFrom(availPath);
             if (pathHash == -1) {
-                throw new SchedException(SchedException.Status.SCHEDULE_FAILED,
+                throw new SchedException(SchedException.Status.SCHEDULE_FAILED, SchedException.SubCode.WAITING_SLOT,
                         "paths has no available balance slot: " + availPath);
             } else {
                 tabletCtx.setDest(beStat.getBeId(), pathHash);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/clone/SchedException.java b/fe/fe-core/src/main/java/org/apache/doris/clone/SchedException.java
index 0ad83e8909..a343e6543c 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/clone/SchedException.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/clone/SchedException.java
@@ -27,14 +27,30 @@ public class SchedException extends Exception {
         FINISHED // schedule is done, remove the tablet from tablet scheduler with status FINISHED
     }
 
+    public enum SubCode {
+        NONE,
+        WAITING_DECOMMISSION,
+        WAITING_SLOT,
+    }
+
     private Status status;
+    private SubCode subCode;
 
     public SchedException(Status status, String errorMsg) {
+        this(status, SubCode.NONE, errorMsg);
+    }
+
+    public SchedException(Status status, SubCode subCode, String errorMsg) {
         super(errorMsg);
         this.status = status;
+        this.subCode = subCode;
     }
 
     public Status getStatus() {
         return status;
     }
+
+    public SubCode getSubCode() {
+        return subCode;
+    }
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/clone/TabletChecker.java b/fe/fe-core/src/main/java/org/apache/doris/clone/TabletChecker.java
index 67226eae67..d9330e0f16 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/clone/TabletChecker.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/clone/TabletChecker.java
@@ -372,9 +372,7 @@ public class TabletChecker extends MasterDaemon {
                 }
 
                 counter.unhealthyTabletNum++;
-
-                if (!tablet.readyToBeRepaired(statusWithPrio.second)) {
-                    counter.tabletNotReady++;
+                if (!tablet.readyToBeRepaired(infoService, statusWithPrio.second)) {
                     continue;
                 }
 
@@ -386,7 +384,7 @@ public class TabletChecker extends MasterDaemon {
                         System.currentTimeMillis());
                 // the tablet status will be set again when being scheduled
                 tabletCtx.setTabletStatus(statusWithPrio.first);
-                tabletCtx.setOrigPriority(statusWithPrio.second);
+                tabletCtx.setPriority(statusWithPrio.second);
 
                 AddResult res = tabletScheduler.addTablet(tabletCtx, false /* not force */);
                 if (res == AddResult.LIMIT_EXCEED || res == AddResult.DISABLED) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/clone/TabletSchedCtx.java b/fe/fe-core/src/main/java/org/apache/doris/clone/TabletSchedCtx.java
index 58c411c40f..286b70f65e 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/clone/TabletSchedCtx.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/clone/TabletSchedCtx.java
@@ -29,6 +29,7 @@ import org.apache.doris.catalog.Table;
 import org.apache.doris.catalog.Tablet;
 import org.apache.doris.catalog.Tablet.TabletStatus;
 import org.apache.doris.clone.SchedException.Status;
+import org.apache.doris.clone.SchedException.SubCode;
 import org.apache.doris.clone.TabletScheduler.PathSlot;
 import org.apache.doris.common.Config;
 import org.apache.doris.common.FeConstants;
@@ -102,6 +103,8 @@ public class TabletSchedCtx implements Comparable<TabletSchedCtx> {
      */
     private static final int RUNNING_FAILED_COUNTER_THRESHOLD = 3;
 
+    public static final int FINISHED_COUNTER_THRESHOLD = 3;
+
     private static VersionCountComparator VERSION_COUNTER_COMPARATOR = new VersionCountComparator();
 
     public enum Type {
@@ -117,22 +120,6 @@ public class TabletSchedCtx implements Comparable<TabletSchedCtx> {
         NORMAL,
         HIGH,
         VERY_HIGH;
-
-        // VERY_HIGH can only be downgraded to NORMAL
-        // LOW can only be upgraded to HIGH
-        public Priority adjust(Priority origPriority, boolean isUp) {
-            switch (this) {
-                case VERY_HIGH:
-                    return isUp ? VERY_HIGH : HIGH;
-                case HIGH:
-                    return isUp ? (origPriority == LOW ? HIGH : VERY_HIGH) : NORMAL;
-                case NORMAL:
-                    return isUp ? HIGH : (origPriority == Priority.VERY_HIGH ? NORMAL : LOW);
-                default:
-                    return isUp ? NORMAL : LOW;
-            }
-        }
-
     }
 
     public enum State {
@@ -147,23 +134,19 @@ public class TabletSchedCtx implements Comparable<TabletSchedCtx> {
     private Type type;
     private BalanceType balanceType;
 
-    /*
-     * origPriority is the origin priority being set when this tablet being added to scheduler.
-     * dynamicPriority will be set during tablet schedule processing, it will not be prior than origin priority.
-     * And dynamic priority is also used in priority queue compare in tablet scheduler.
-     */
-    private Priority origPriority;
-    private Priority dynamicPriority;
+    private Priority priority;
 
     // we change the dynamic priority based on how many times it fails to be scheduled
     private int failedSchedCounter = 0;
     // clone task failed counter
     private int failedRunningCounter = 0;
+    // When finish a tablet ctx, it will check the tablet's health status.
+    // If the tablet is unhealthy, it will add a new ctx.
+    // The new ctx's finishedCounter = old ctx's finishedCounter + 1.
+    private int finishedCounter = 0;
 
     // last time this tablet being scheduled
     private long lastSchedTime = 0;
-    // last time the dynamic priority being adjusted
-    private long lastAdjustPrioTime = 0;
 
     // last time this tablet being visited.
     // being visited means:
@@ -180,6 +163,8 @@ public class TabletSchedCtx implements Comparable<TabletSchedCtx> {
     private State state;
     private TabletStatus tabletStatus;
 
+    private long decommissionTime = -1;
+
     private long dbId;
     private long tblId;
     private long partitionId;
@@ -223,6 +208,8 @@ public class TabletSchedCtx implements Comparable<TabletSchedCtx> {
     // tag is only set for BALANCE task, used to identify which workload group this Balance job is in
     private Tag tag;
 
+    private SubCode schedFailedCode;
+
     public TabletSchedCtx(Type type, long dbId, long tblId, long partId,
             long idxId, long tabletId, ReplicaAllocation replicaAlloc, long createTime) {
         this.type = type;
@@ -236,12 +223,17 @@ public class TabletSchedCtx implements Comparable<TabletSchedCtx> {
         this.state = State.PENDING;
         this.replicaAlloc = replicaAlloc;
         this.balanceType = BalanceType.BE_BALANCE;
+        this.schedFailedCode = SubCode.NONE;
     }
 
     public ReplicaAllocation getReplicaAlloc() {
         return replicaAlloc;
     }
 
+    public void setReplicaAlloc(ReplicaAllocation replicaAlloc) {
+        this.replicaAlloc = replicaAlloc;
+    }
+
     public void setTag(Tag tag) {
         this.tag = tag;
     }
@@ -266,21 +258,20 @@ public class TabletSchedCtx implements Comparable<TabletSchedCtx> {
         return balanceType;
     }
 
-    public Priority getOrigPriority() {
-        return origPriority;
+    public Priority getPriority() {
+        return priority;
+    }
+
+    public void setPriority(Priority priority) {
+        this.priority = priority;
     }
 
-    public void setOrigPriority(Priority origPriority) {
-        this.origPriority = origPriority;
-        // reset dynamic priority along with the origin priority being set.
-        this.dynamicPriority = origPriority;
-        this.failedSchedCounter = 0;
-        this.lastSchedTime = 0;
-        this.lastAdjustPrioTime = 0;
+    public int getFinishedCounter() {
+        return finishedCounter;
     }
 
-    public Priority getDynamicPriority() {
-        return dynamicPriority;
+    public void setFinishedCounter(int finishedCounter) {
+        this.finishedCounter = finishedCounter;
     }
 
     public void increaseFailedSchedCounter() {
@@ -295,8 +286,27 @@ public class TabletSchedCtx implements Comparable<TabletSchedCtx> {
         ++failedRunningCounter;
     }
 
-    public int getFailedRunningCounter() {
-        return failedRunningCounter;
+    public boolean isExceedFailedRunningLimit() {
+        return failedRunningCounter >= RUNNING_FAILED_COUNTER_THRESHOLD;
+    }
+
+    public boolean onSchedFailedAndCheckExceedLimit(SubCode code) {
+        schedFailedCode = code;
+        failedSchedCounter++;
+        if (code == SubCode.WAITING_DECOMMISSION) {
+            failedSchedCounter = 0;
+            if (decommissionTime < 0) {
+                decommissionTime = System.currentTimeMillis();
+            }
+            return System.currentTimeMillis() > decommissionTime + 10 * 60 * 1000L;
+        } else {
+            decommissionTime = -1;
+            if (code == SubCode.WAITING_SLOT && type != Type.BALANCE) {
+                return failedSchedCounter > 30 * 1000 / TabletScheduler.SCHEDULE_INTERVAL_MS;
+            } else {
+                return failedSchedCounter > 10;
+            }
+        }
     }
 
     public void setLastSchedTime(long lastSchedTime) {
@@ -311,6 +321,10 @@ public class TabletSchedCtx implements Comparable<TabletSchedCtx> {
         this.finishedTime = finishedTime;
     }
 
+    public void setDecommissionTime(long decommissionTime) {
+        this.decommissionTime = decommissionTime;
+    }
+
     public State getState() {
         return state;
     }
@@ -615,7 +629,8 @@ public class TabletSchedCtx implements Comparable<TabletSchedCtx> {
             setSrc(srcReplica);
             return;
         }
-        throw new SchedException(Status.SCHEDULE_FAILED, "unable to find source slot");
+        throw new SchedException(Status.SCHEDULE_FAILED, SubCode.WAITING_SLOT,
+                "unable to find source slot");
     }
 
     /*
@@ -641,7 +656,7 @@ public class TabletSchedCtx implements Comparable<TabletSchedCtx> {
      */
     public void chooseDestReplicaForVersionIncomplete(Map<Long, PathSlot> backendsWorkingSlots)
             throws SchedException {
-        Replica chosenReplica = null;
+        List<Replica> candidates = Lists.newArrayList();
         for (Replica replica : tablet.getReplicas()) {
             if (replica.isBad()) {
                 LOG.debug("replica {} is bad, skip. tablet: {}",
@@ -660,18 +675,37 @@ public class TabletSchedCtx implements Comparable<TabletSchedCtx> {
             // if the replica's state is DECOMMISSION, it may be chose as dest replica,
             // and its state will be set to NORMAL later.
             if (replica.getLastFailedVersion() <= 0
-                    && ((replica.getVersion() == visibleVersion)
-                    || replica.getVersion() > visibleVersion) && replica.getState() != ReplicaState.DECOMMISSION) {
+                    && replica.getVersion() >= visibleVersion
+                    && replica.getState() != ReplicaState.DECOMMISSION) {
                 // skip healthy replica
                 LOG.debug("replica {} version {} is healthy, visible version {}, replica state {}, skip. tablet: {}",
                         replica.getId(), replica.getVersion(), visibleVersion, replica.getState(), tabletId);
                 continue;
             }
 
+            candidates.add(replica);
+        }
+
+        if (candidates.isEmpty()) {
+            throw new SchedException(Status.UNRECOVERABLE, "unable to choose dest replica");
+        }
+
+        Replica chosenReplica = null;
+        for (Replica replica : candidates) {
+            PathSlot slot = backendsWorkingSlots.get(replica.getBackendId());
+            if (slot == null || !slot.hasAvailableSlot(replica.getPathHash())) {
+                if (!replica.needFurtherRepair()) {
+                    throw new SchedException(Status.SCHEDULE_FAILED, SubCode.WAITING_SLOT,
+                            "replica " + replica + " has not slot");
+                }
+
+                continue;
+            }
+
             if (replica.needFurtherRepair()) {
+                chosenReplica = replica;
                 LOG.debug("replica {} need further repair, choose it. tablet: {}",
                         replica.getId(), tabletId);
-                chosenReplica = replica;
                 break;
             }
 
@@ -686,20 +720,19 @@ public class TabletSchedCtx implements Comparable<TabletSchedCtx> {
             }
         }
 
-        if (chosenReplica == null) {
-            throw new SchedException(Status.SCHEDULE_FAILED, "unable to choose dest replica");
-        }
-
         // check if the dest replica has available slot
+        // it should not happen cause it just check hasAvailableSlot yet.
         PathSlot slot = backendsWorkingSlots.get(chosenReplica.getBackendId());
         if (slot == null) {
-            throw new SchedException(Status.SCHEDULE_FAILED, "backend of dest replica is missing");
+            throw new SchedException(Status.SCHEDULE_FAILED, SubCode.WAITING_SLOT,
+                    "backend of dest replica is missing");
         }
-
         long destPathHash = slot.takeSlot(chosenReplica.getPathHash());
         if (destPathHash == -1) {
-            throw new SchedException(Status.SCHEDULE_FAILED, "unable to take slot of dest path");
+            throw new SchedException(Status.SCHEDULE_FAILED, SubCode.WAITING_SLOT,
+                    "unable to take slot of dest path");
         }
+
         if (chosenReplica.getState() == ReplicaState.DECOMMISSION) {
             // Since this replica is selected as the repair object of VERSION_INCOMPLETE,
             // it means that this replica needs to be able to accept loading data.
@@ -717,6 +750,7 @@ public class TabletSchedCtx implements Comparable<TabletSchedCtx> {
             // forever, because the replica in the DECOMMISSION state will not receive the load task.
             chosenReplica.setWatermarkTxnId(-1);
             chosenReplica.setState(ReplicaState.NORMAL);
+            setDecommissionTime(-1);
             LOG.info("choose replica {} on backend {} of tablet {} as dest replica for version incomplete,"
                     + " and change state from DECOMMISSION to NORMAL",
                     chosenReplica.getId(), chosenReplica.getBackendId(), tabletId);
@@ -941,7 +975,7 @@ public class TabletSchedCtx implements Comparable<TabletSchedCtx> {
                     cloneTask.getDbId(), cloneTask.getTableId(), cloneTask.getPartitionId(),
                     cloneTask.getIndexId(), cloneTask.getTabletId(), cloneTask.getBackendId(),
                     dbId, tblId, partitionId, indexId, tablet.getId(), destBackendId);
-            throw new SchedException(Status.RUNNING_FAILED, msg);
+            throw new SchedException(Status.UNRECOVERABLE, msg);
         }
 
         // 1. check the tablet status first
@@ -1041,13 +1075,6 @@ public class TabletSchedCtx implements Comparable<TabletSchedCtx> {
 
             state = State.FINISHED;
             LOG.info("clone finished: {}", this);
-        } catch (SchedException e) {
-            // if failed to too many times, remove this task
-            ++failedRunningCounter;
-            if (failedRunningCounter > RUNNING_FAILED_COUNTER_THRESHOLD) {
-                throw new SchedException(Status.UNRECOVERABLE, e.getMessage());
-            }
-            throw e;
         } finally {
             olapTable.writeUnlock();
         }
@@ -1061,73 +1088,6 @@ public class TabletSchedCtx implements Comparable<TabletSchedCtx> {
         }
     }
 
-    /*
-     * we try to adjust the priority based on schedule history
-     * 1. If failed counter is larger than FAILED_COUNTER_THRESHOLD, which means this tablet is being scheduled
-     *    at least FAILED_TIME_THRESHOLD times and all are failed. So we downgrade its priority.
-     *    Also reset the failedCounter, or it will be downgraded forever.
-     *
-     * 2. Else, if it has been a long time since last time the tablet being scheduled, we upgrade its
-     *    priority to let it more available to be scheduled.
-     *
-     * The time gap between adjustment should be larger than MIN_ADJUST_PRIORITY_INTERVAL_MS, to avoid
-     * being downgraded too fast.
-     *
-     * eg:
-     *    A tablet has been scheduled for 5 times and all were failed. its priority will be downgraded. And if it is
-     *    scheduled for 5 times and all are failed again, it will be downgraded again, until to the LOW.
-     *    And than, because of LOW, this tablet can not be scheduled for a long time, and it will be upgraded
-     *    to NORMAL, if still not being scheduled, it will be upgraded up to VERY_HIGH.
-     *
-     * return true if dynamic priority changed
-     */
-    public boolean adjustPriority(TabletSchedulerStat stat) {
-        long currentTime = System.currentTimeMillis();
-        if (lastAdjustPrioTime == 0) {
-            // skip the first time we adjust this priority
-            lastAdjustPrioTime = currentTime;
-            return false;
-        } else {
-            if (currentTime - lastAdjustPrioTime < MIN_ADJUST_PRIORITY_INTERVAL_MS) {
-                return false;
-            }
-        }
-
-        boolean isDowngrade = false;
-        boolean isUpgrade = false;
-
-        if (failedSchedCounter > SCHED_FAILED_COUNTER_THRESHOLD) {
-            isDowngrade = true;
-        } else {
-            long lastTime = lastSchedTime == 0 ? createTime : lastSchedTime;
-            if (currentTime - lastTime > MAX_NOT_BEING_SCHEDULED_INTERVAL_MS) {
-                isUpgrade = true;
-            }
-        }
-
-        Priority originDynamicPriority = dynamicPriority;
-        if (isDowngrade) {
-            dynamicPriority = dynamicPriority.adjust(origPriority, false /* downgrade */);
-            failedSchedCounter = 0;
-            if (originDynamicPriority != dynamicPriority) {
-                LOG.debug("downgrade dynamic priority from {} to {}, origin: {}, tablet: {}",
-                        originDynamicPriority.name(), dynamicPriority.name(), origPriority.name(), tabletId);
-                stat.counterTabletPrioDowngraded.incrementAndGet();
-                return true;
-            }
-        } else if (isUpgrade) {
-            dynamicPriority = dynamicPriority.adjust(origPriority, true /* upgrade */);
-            // no need to set lastSchedTime, lastSchedTime is set each time we schedule this tablet
-            if (originDynamicPriority != dynamicPriority) {
-                LOG.debug("upgrade dynamic priority from {} to {}, origin: {}, tablet: {}",
-                        originDynamicPriority.name(), dynamicPriority.name(), origPriority.name(), tabletId);
-                stat.counterTabletPrioUpgraded.incrementAndGet();
-                return true;
-            }
-        }
-        return false;
-    }
-
     public boolean isTimeout() {
         if (state != TabletSchedCtx.State.RUNNING) {
             return false;
@@ -1144,8 +1104,8 @@ public class TabletSchedCtx implements Comparable<TabletSchedCtx> {
         result.add(storageMedium == null ? FeConstants.null_string : storageMedium.name());
         result.add(tabletStatus == null ? FeConstants.null_string : tabletStatus.name());
         result.add(state.name());
-        result.add(origPriority.name());
-        result.add(dynamicPriority.name());
+        result.add(schedFailedCode.name());
+        result.add(priority.name());
         result.add(srcReplica == null ? "-1" : String.valueOf(srcReplica.getBackendId()));
         result.add(String.valueOf(srcPathHash));
         result.add(String.valueOf(destBackendId));
@@ -1158,7 +1118,6 @@ public class TabletSchedCtx implements Comparable<TabletSchedCtx> {
         result.add(copyTimeMs > 0 ? String.valueOf(copySize / copyTimeMs / 1000.0) : FeConstants.null_string);
         result.add(String.valueOf(failedSchedCounter));
         result.add(String.valueOf(failedRunningCounter));
-        result.add(TimeUtils.longToTimeString(lastAdjustPrioTime));
         result.add(String.valueOf(visibleVersion));
         result.add(String.valueOf(committedVersion));
         result.add(Strings.nullToEmpty(errMsg));
@@ -1171,19 +1130,23 @@ public class TabletSchedCtx implements Comparable<TabletSchedCtx> {
      */
     @Override
     public int compareTo(TabletSchedCtx o) {
-        if (dynamicPriority.ordinal() < o.dynamicPriority.ordinal()) {
-            return 1;
-        } else if (dynamicPriority.ordinal() > o.dynamicPriority.ordinal()) {
-            return -1;
-        } else {
-            if (lastVisitedTime < o.lastVisitedTime) {
-                return -1;
-            } else if (lastVisitedTime > o.lastVisitedTime) {
-                return 1;
-            } else {
-                return 0;
-            }
+        return Long.compare(getCompareValue(), o.getCompareValue());
+    }
+
+    private long getCompareValue() {
+        long value = createTime;
+        if (lastVisitedTime > 0) {
+            value = lastVisitedTime;
         }
+
+        value += (Priority.VERY_HIGH.ordinal() - priority.ordinal() + 1) * 60  * 1000L;
+        value += 5000L * (failedSchedCounter / 10);
+
+        if (type == Type.BALANCE) {
+            value += 30 * 60 * 1000L;
+        }
+
+        return value;
     }
 
     @Override
@@ -1227,6 +1190,7 @@ public class TabletSchedCtx implements Comparable<TabletSchedCtx> {
      * call this when releaseTabletCtx()
      */
     public void resetReplicaState() {
+        setDecommissionTime(-1);
         if (tablet != null) {
             for (Replica replica : tablet.getReplicas()) {
                 // To address issue: https://github.com/apache/doris/issues/9422
@@ -1243,5 +1207,4 @@ public class TabletSchedCtx implements Comparable<TabletSchedCtx> {
             }
         }
     }
-
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/clone/TabletScheduler.java b/fe/fe-core/src/main/java/org/apache/doris/clone/TabletScheduler.java
index dbf3d46237..aaaac36a24 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/clone/TabletScheduler.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/clone/TabletScheduler.java
@@ -32,10 +32,12 @@ import org.apache.doris.catalog.Partition;
 import org.apache.doris.catalog.Partition.PartitionState;
 import org.apache.doris.catalog.Replica;
 import org.apache.doris.catalog.Replica.ReplicaState;
+import org.apache.doris.catalog.ReplicaAllocation;
 import org.apache.doris.catalog.Tablet;
 import org.apache.doris.catalog.Tablet.TabletStatus;
 import org.apache.doris.catalog.TabletInvertedIndex;
 import org.apache.doris.clone.SchedException.Status;
+import org.apache.doris.clone.SchedException.SubCode;
 import org.apache.doris.clone.TabletSchedCtx.Priority;
 import org.apache.doris.clone.TabletSchedCtx.Type;
 import org.apache.doris.common.AnalysisException;
@@ -59,6 +61,7 @@ import org.apache.doris.thrift.TStatusCode;
 import org.apache.doris.transaction.DatabaseTransactionMgr;
 import org.apache.doris.transaction.TransactionState;
 
+import com.google.common.base.Joiner;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.EvictingQueue;
 import com.google.common.collect.ImmutableMap;
@@ -100,21 +103,21 @@ public class TabletScheduler extends MasterDaemon {
     // the minimum interval of updating cluster statistics and priority of tablet info
     private static final long STAT_UPDATE_INTERVAL_MS = 20 * 1000; // 20s
 
-    private static final long SCHEDULE_INTERVAL_MS = 1000; // 1s
+    public static final long SCHEDULE_INTERVAL_MS = 100;
 
     /*
-     * Tablet is added to pendingTablets as well it's id in allTabletIds.
-     * TabletScheduler will take tablet from pendingTablets but will not remove it's id from allTabletIds when
+     * Tablet is added to pendingTablets as well it's id in allTabletTypes.
+     * TabletScheduler will take tablet from pendingTablets but will not remove it's id from allTabletTypes when
      * handling a tablet.
      * Tablet' id can only be removed after the clone task or migration task is done(timeout, cancelled or finished).
-     * So if a tablet's id is still in allTabletIds, TabletChecker can not add tablet to TabletScheduler.
+     * So if a tablet's id is still in allTabletTypes, TabletChecker can not add tablet to TabletScheduler.
      *
-     * pendingTablets + runningTablets = allTabletIds
+     * pendingTablets + runningTablets = allTabletTypes
      *
-     * pendingTablets, allTabletIds, runningTablets and schedHistory are protected by 'synchronized'
+     * pendingTablets, allTabletTypes, runningTablets and schedHistory are protected by 'synchronized'
      */
     private PriorityQueue<TabletSchedCtx> pendingTablets = new PriorityQueue<>();
-    private Set<Long> allTabletIds = Sets.newHashSet();
+    private Map<Long, TabletSchedCtx.Type> allTabletTypes = Maps.newHashMap();
     // contains all tabletCtxs which state are RUNNING
     private Map<Long, TabletSchedCtx> runningTablets = Maps.newHashMap();
     // save the latest 1000 scheduled tablet info
@@ -128,6 +131,8 @@ public class TabletScheduler extends MasterDaemon {
 
     private long lastSlotAdjustTime = 0;
 
+    private long lastCheckTimeoutTime = 0;
+
     private Env env;
     private SystemInfoService infoService;
     private TabletInvertedIndex invertedIndex;
@@ -175,7 +180,8 @@ public class TabletScheduler extends MasterDaemon {
                 // when upgrading, backend may not get path info yet. so return false and wait for next round.
                 // and we should check if backend is alive. If backend is dead when upgrading, this backend
                 // will never report its path hash, and tablet scheduler is blocked.
-                LOG.info("not all backends have path info");
+                LOG.info("backend {}:{} with id {} doesn't have path info.", backend.getHost(),
+                        backend.getBePort(), backend.getId());
                 return false;
             }
         }
@@ -204,7 +210,7 @@ public class TabletScheduler extends MasterDaemon {
             if (!backendsWorkingSlots.containsKey(be.getId())) {
                 List<Long> pathHashes = be.getDisks().values().stream()
                         .map(DiskInfo::getPathHash).collect(Collectors.toList());
-                PathSlot slot = new PathSlot(pathHashes, Config.schedule_slot_num_per_path);
+                PathSlot slot = new PathSlot(pathHashes, be.getId());
                 backendsWorkingSlots.put(be.getId(), slot);
                 LOG.info("add new backend {} with slots num: {}", be.getId(), be.getDisks().size());
             }
@@ -225,7 +231,18 @@ public class TabletScheduler extends MasterDaemon {
         if (!force && Config.disable_tablet_scheduler) {
             return AddResult.DISABLED;
         }
-        if (!force && containsTablet(tablet.getTabletId())) {
+
+        // REPAIR has higher priority than BALANCE.
+        // Suppose adding a BALANCE tablet successfully, then adding this tablet's REPAIR ctx will fail.
+        // But we set allTabletTypes[tabletId] to REPAIR. Later at the beginning of scheduling this tablet,
+        // it will reset its type as allTabletTypes[tabletId], so its type will convert to REPAIR.
+
+        long tabletId = tablet.getTabletId();
+        boolean contains = allTabletTypes.containsKey(tabletId);
+        if (contains && !force) {
+            if (tablet.getType() == TabletSchedCtx.Type.REPAIR) {
+                allTabletTypes.put(tabletId, TabletSchedCtx.Type.REPAIR);
+            }
             return AddResult.ALREADY_IN;
         }
 
@@ -238,13 +255,22 @@ public class TabletScheduler extends MasterDaemon {
             return AddResult.LIMIT_EXCEED;
         }
 
-        allTabletIds.add(tablet.getTabletId());
+        if (!contains || tablet.getType() == TabletSchedCtx.Type.REPAIR) {
+            allTabletTypes.put(tabletId, tablet.getType());
+        }
+
         pendingTablets.offer(tablet);
+        if (!contains) {
+            LOG.info("Add tablet to pending queue, tablet id {}, type {}, status {}, priority {}",
+                    tablet.getTabletId(), tablet.getType(), tablet.getTabletStatus(),
+                    tablet.getPriority());
+        }
+
         return AddResult.ADDED;
     }
 
     public synchronized boolean containsTablet(long tabletId) {
-        return allTabletIds.contains(tabletId);
+        return allTabletTypes.containsKey(tabletId);
     }
 
     public synchronized void rebalanceDisk(AdminRebalanceDiskStmt stmt) {
@@ -263,7 +289,7 @@ public class TabletScheduler extends MasterDaemon {
         for (TabletSchedCtx tabletCtx : pendingTablets) {
             if (tabletCtx.getDbId() == dbId && tabletCtx.getTblId() == tblId
                     && partitionIds.contains(tabletCtx.getPartitionId())) {
-                tabletCtx.setOrigPriority(Priority.VERY_HIGH);
+                tabletCtx.setPriority(Priority.VERY_HIGH);
             }
             newPendingTablets.add(tabletCtx);
         }
@@ -293,14 +319,15 @@ public class TabletScheduler extends MasterDaemon {
             return;
         }
 
-        updateLoadStatisticsAndPriorityIfNecessary();
+        if (System.currentTimeMillis() - lastCheckTimeoutTime >= 1000L) {
+            updateLoadStatisticsAndPriorityIfNecessary();
+            handleRunningTablets();
+            selectTabletsForBalance();
+            lastCheckTimeoutTime = System.currentTimeMillis();
+        }
 
         schedulePendingTablets();
 
-        handleRunningTablets();
-
-        selectTabletsForBalance();
-
         stat.counterTabletScheduleRound.incrementAndGet();
     }
 
@@ -314,8 +341,6 @@ public class TabletScheduler extends MasterDaemon {
         rebalancer.updateLoadStatistic(statisticMap);
         diskRebalancer.updateLoadStatistic(statisticMap);
 
-        adjustPriorities();
-
         lastStatUpdateTime = System.currentTimeMillis();
     }
 
@@ -332,7 +357,7 @@ public class TabletScheduler extends MasterDaemon {
             LoadStatisticForTag loadStatistic = new LoadStatisticForTag(tag, infoService, invertedIndex);
             loadStatistic.init();
             newStatisticMap.put(tag, loadStatistic);
-            LOG.debug("update load statistic:\n{}", loadStatistic.getBrief());
+            LOG.debug("update load statistic for tag {}:\n{}", tag, loadStatistic.getBrief());
         }
 
         this.statisticMap = newStatisticMap;
@@ -342,28 +367,6 @@ public class TabletScheduler extends MasterDaemon {
         return statisticMap;
     }
 
-    /**
-     * adjust priorities of all tablet infos
-     */
-    private synchronized void adjustPriorities() {
-        int size = pendingTablets.size();
-        int changedNum = 0;
-        TabletSchedCtx tabletCtx;
-        for (int i = 0; i < size; i++) {
-            tabletCtx = pendingTablets.poll();
-            if (tabletCtx == null) {
-                break;
-            }
-
-            if (tabletCtx.adjustPriority(stat)) {
-                changedNum++;
-            }
-            pendingTablets.add(tabletCtx);
-        }
-
-        LOG.debug("adjust priority for all tablets. changed: {}, total: {}", changedNum, size);
-    }
-
     /**
      * get at most BATCH_NUM tablets from queue, and try to schedule them.
      * After handle, the tablet info should be
@@ -371,7 +374,7 @@ public class TabletScheduler extends MasterDaemon {
      * 2. or in schedHistory with state CANCELLING, if some unrecoverable error happens.
      * 3. or in pendingTablets with state PENDING, if failed to be scheduled.
      *
-     * if in schedHistory, it should be removed from allTabletIds.
+     * if in schedHistory, it should be removed from allTabletTypes.
      */
     private void schedulePendingTablets() {
         long start = System.currentTimeMillis();
@@ -385,36 +388,25 @@ public class TabletScheduler extends MasterDaemon {
                     // do not schedule more tablet is tablet scheduler is disabled.
                     throw new SchedException(Status.FINISHED, "tablet scheduler is disabled");
                 }
+                if (Config.disable_balance && tabletCtx.getType() == Type.BALANCE) {
+                    finalizeTabletCtx(tabletCtx, TabletSchedCtx.State.CANCELLED, Status.UNRECOVERABLE,
+                            "config disable balance");
+                    continue;
+                }
                 scheduleTablet(tabletCtx, batchTask);
             } catch (SchedException e) {
-                tabletCtx.increaseFailedSchedCounter();
                 tabletCtx.setErrMsg(e.getMessage());
-
                 if (e.getStatus() == Status.SCHEDULE_FAILED) {
-                    if (tabletCtx.getType() == Type.BALANCE) {
-                        // if balance is disabled, remove this tablet
-                        if (Config.disable_balance) {
-                            finalizeTabletCtx(tabletCtx, TabletSchedCtx.State.CANCELLED, e.getStatus(),
-                                    "disable balance and " + e.getMessage());
-                        } else {
-                            // remove the balance task if it fails to be scheduled many times
-                            if (tabletCtx.getFailedSchedCounter() > 10) {
-                                finalizeTabletCtx(tabletCtx, TabletSchedCtx.State.CANCELLED, e.getStatus(),
-                                        "schedule failed too many times and " + e.getMessage());
-                            } else {
-                                // we must release resource it current hold, and be scheduled again
-                                tabletCtx.releaseResource(this);
-                                // adjust priority to avoid some higher priority always be the first in pendingTablets
-                                stat.counterTabletScheduledFailed.incrementAndGet();
-                                dynamicAdjustPrioAndAddBackToPendingTablets(tabletCtx, e.getMessage());
-                            }
-                        }
+                    boolean isExceedLimit = tabletCtx.onSchedFailedAndCheckExceedLimit(e.getSubCode());
+                    if (isExceedLimit) {
+                        finalizeTabletCtx(tabletCtx, TabletSchedCtx.State.CANCELLED, e.getStatus(),
+                                "schedule failed too many times and " + e.getMessage());
                     } else {
                         // we must release resource it current hold, and be scheduled again
                         tabletCtx.releaseResource(this);
                         // adjust priority to avoid some higher priority always be the first in pendingTablets
                         stat.counterTabletScheduledFailed.incrementAndGet();
-                        dynamicAdjustPrioAndAddBackToPendingTablets(tabletCtx, e.getMessage());
+                        addBackToPendingTablets(tabletCtx);
                     }
                 } else if (e.getStatus() == Status.FINISHED) {
                     // schedule redundant tablet or scheduler disabled will throw this exception
@@ -485,6 +477,8 @@ public class TabletScheduler extends MasterDaemon {
         tbl.writeLockOrException(new SchedException(Status.UNRECOVERABLE, "table "
                 + tbl.getName() + " does not exist"));
         try {
+            long tabletId = tabletCtx.getTabletId();
+
             boolean isColocateTable = colocateTableIndex.isColocateTable(tbl.getId());
 
             OlapTableState tableState = tbl.getState();
@@ -499,8 +493,9 @@ public class TabletScheduler extends MasterDaemon {
                 throw new SchedException(Status.UNRECOVERABLE, "index does not exist");
             }
 
-            Tablet tablet = idx.getTablet(tabletCtx.getTabletId());
+            Tablet tablet = idx.getTablet(tabletId);
             Preconditions.checkNotNull(tablet);
+            ReplicaAllocation replicaAlloc = tbl.getPartitionInfo().getReplicaAllocation(partition.getId());
 
             if (isColocateTable) {
                 GroupId groupId = colocateTableIndex.getGroup(tbl.getId());
@@ -516,18 +511,26 @@ public class TabletScheduler extends MasterDaemon {
 
                 Set<Long> backendsSet = colocateTableIndex.getTabletBackendsByGroup(groupId, tabletOrderIdx);
                 TabletStatus st = tablet.getColocateHealthStatus(
-                        partition.getVisibleVersion(),
-                        tbl.getPartitionInfo().getReplicaAllocation(partition.getId()),
-                        backendsSet);
+                        partition.getVisibleVersion(), replicaAlloc, backendsSet);
                 statusPair = Pair.of(st, Priority.HIGH);
                 tabletCtx.setColocateGroupBackendIds(backendsSet);
             } else {
                 List<Long> aliveBeIds = infoService.getAllBackendIds(true);
                 statusPair = tablet.getHealthStatusWithPriority(
-                        infoService,
-                        partition.getVisibleVersion(),
-                        tbl.getPartitionInfo().getReplicaAllocation(partition.getId()),
-                        aliveBeIds);
+                        infoService, partition.getVisibleVersion(), replicaAlloc, aliveBeIds);
+            }
+
+            if (tabletCtx.getType() != allTabletTypes.get(tabletId)) {
+                TabletSchedCtx.Type curType = tabletCtx.getType();
+                TabletSchedCtx.Type newType = allTabletTypes.get(tabletId);
+                if (curType == TabletSchedCtx.Type.BALANCE && newType == TabletSchedCtx.Type.REPAIR) {
+                    tabletCtx.setType(newType);
+                    tabletCtx.setReplicaAlloc(replicaAlloc);
+                    tabletCtx.setTag(null);
+                } else {
+                    throw new SchedException(Status.UNRECOVERABLE, "can not convert type of tablet "
+                            + tabletId + " from " + curType.name() + " to " + newType.name());
+                }
             }
 
             if (tabletCtx.getType() == TabletSchedCtx.Type.BALANCE && tableState != OlapTableState.NORMAL) {
@@ -732,11 +735,11 @@ public class TabletScheduler extends MasterDaemon {
     private void handleReplicaVersionIncomplete(TabletSchedCtx tabletCtx, AgentBatchTask batchTask)
             throws SchedException {
         stat.counterReplicaVersionMissingErr.incrementAndGet();
-
         try {
             tabletCtx.chooseDestReplicaForVersionIncomplete(backendsWorkingSlots);
         } catch (SchedException e) {
-            if (e.getMessage().equals("unable to choose dest replica")) {
+            // could not find dest, try add a missing.
+            if (e.getStatus() == Status.UNRECOVERABLE) {
                 // This situation may occur when the BE nodes
                 // where all replicas of a tablet are located are decommission,
                 // and this task is a VERSION_INCOMPLETE task.
@@ -779,25 +782,7 @@ public class TabletScheduler extends MasterDaemon {
     private void handleReplicaRelocating(TabletSchedCtx tabletCtx, AgentBatchTask batchTask)
             throws SchedException {
         stat.counterReplicaUnavailableErr.incrementAndGet();
-        try {
-            handleReplicaVersionIncomplete(tabletCtx, batchTask);
-            LOG.debug("succeed to find version incomplete replica from tablet relocating. tablet id: {}",
-                    tabletCtx.getTabletId());
-        } catch (SchedException e) {
-            if (e.getStatus() == Status.SCHEDULE_FAILED) {
-                LOG.debug("failed to find version incomplete replica from tablet relocating. tablet id: {}, "
-                        + "try to find a new backend", tabletCtx.getTabletId());
-                // the dest or src slot may be taken after calling handleReplicaVersionIncomplete(),
-                // so we need to release these slots first.
-                // and reserve the tablet in TabletSchedCtx so that it can continue to be scheduled.
-                tabletCtx.releaseResource(this, true);
-                tabletCtx.setTabletStatus(TabletStatus.REPLICA_MISSING);
-                handleReplicaMissing(tabletCtx, batchTask);
-                LOG.debug("succeed to find new backend for tablet relocating. tablet id: {}", tabletCtx.getTabletId());
-            } else {
-                throw e;
-            }
-        }
+        handleReplicaVersionIncomplete(tabletCtx, batchTask);
     }
 
     /**
@@ -831,7 +816,7 @@ public class TabletScheduler extends MasterDaemon {
             // to remove this tablet from the pendingTablets(consider it as finished)
             throw new SchedException(Status.FINISHED, "redundant replica is deleted");
         }
-        throw new SchedException(Status.SCHEDULE_FAILED, "unable to delete any redundant replicas");
+        throw new SchedException(Status.UNRECOVERABLE, "unable to delete any redundant replicas");
     }
 
     private boolean deleteBackendDropped(TabletSchedCtx tabletCtx, boolean force) throws SchedException {
@@ -1035,7 +1020,7 @@ public class TabletScheduler extends MasterDaemon {
             deleteReplicaInternal(tabletCtx, replica, "colocate redundant", false);
             throw new SchedException(Status.FINISHED, "colocate redundant replica is deleted");
         }
-        throw new SchedException(Status.SCHEDULE_FAILED, "unable to delete any colocate redundant replicas");
+        throw new SchedException(Status.UNRECOVERABLE, "unable to delete any colocate redundant replicas");
     }
 
     /**
@@ -1102,16 +1087,17 @@ public class TabletScheduler extends MasterDaemon {
             replica.setWatermarkTxnId(nextTxnId);
             replica.setState(ReplicaState.DECOMMISSION);
             // set priority to normal because it may wait for a long time. Remain it as VERY_HIGH may block other task.
-            tabletCtx.setOrigPriority(Priority.NORMAL);
+            tabletCtx.setPriority(Priority.NORMAL);
             LOG.info("set replica {} on backend {} of tablet {} state to DECOMMISSION due to reason {}",
                     replica.getId(), replica.getBackendId(), tabletCtx.getTabletId(), reason);
-            throw new SchedException(Status.SCHEDULE_FAILED, "set watermark txn " + nextTxnId);
+            throw new SchedException(Status.SCHEDULE_FAILED, SubCode.WAITING_DECOMMISSION,
+                    "set watermark txn " + nextTxnId);
         } else if (replica.getState() == ReplicaState.DECOMMISSION && replica.getWatermarkTxnId() != -1) {
             long watermarkTxnId = replica.getWatermarkTxnId();
             try {
                 if (!Env.getCurrentGlobalTransactionMgr().isPreviousTransactionsFinished(watermarkTxnId,
                         tabletCtx.getDbId(), Lists.newArrayList(tabletCtx.getTblId()))) {
-                    throw new SchedException(Status.SCHEDULE_FAILED,
+                    throw new SchedException(Status.SCHEDULE_FAILED, SubCode.WAITING_DECOMMISSION,
                             "wait txn before " + watermarkTxnId + " to be finished");
                 }
             } catch (AnalysisException e) {
@@ -1225,7 +1211,7 @@ public class TabletScheduler extends MasterDaemon {
         }
         for (TabletSchedCtx tabletCtx : diskBalanceTablets) {
             // add if task from prio backend or cluster is balanced
-            if (alternativeTablets.isEmpty() || tabletCtx.getOrigPriority() == TabletSchedCtx.Priority.NORMAL) {
+            if (alternativeTablets.isEmpty() || tabletCtx.getPriority() == TabletSchedCtx.Priority.NORMAL) {
                 addTablet(tabletCtx, false);
             }
         }
@@ -1260,7 +1246,8 @@ public class TabletScheduler extends MasterDaemon {
             LoadStatisticForTag statistic = statisticMap.get(tag);
             if (statistic == null) {
                 throw new SchedException(Status.UNRECOVERABLE,
-                        String.format("tag %s does not exist.", tag));
+                        String.format("tag %s does not exist. available tags: %s", tag,
+                            Joiner.on(",").join(statisticMap.keySet().stream().limit(5).toArray())));
             }
             beStatistics = statistic.getSortedBeLoadStats(null /* sorted ignore medium */);
         } else {
@@ -1336,7 +1323,7 @@ public class TabletScheduler extends MasterDaemon {
         }
 
         if (allFitPaths.isEmpty()) {
-            throw new SchedException(Status.SCHEDULE_FAILED, "unable to find dest path for new replica");
+            throw new SchedException(Status.UNRECOVERABLE, "unable to find dest path for new replica");
         }
 
         // all fit paths has already been sorted by load score in 'allFitPaths' in ascend order.
@@ -1390,25 +1377,109 @@ public class TabletScheduler extends MasterDaemon {
             return rootPathLoadStatistic;
         }
 
-        throw new SchedException(Status.SCHEDULE_FAILED, "unable to find dest path which can be fit in");
+        throw new SchedException(Status.UNRECOVERABLE, "unable to find dest path which can be fit in");
     }
 
-    /**
-     * For some reason, a tablet info failed to be scheduled this time,
-     * So we dynamically change its priority and add back to queue, waiting for next round.
-     */
-    private void dynamicAdjustPrioAndAddBackToPendingTablets(TabletSchedCtx tabletCtx, String message) {
+    /** Re-queues a PENDING tablet ctx by force-adding it (addTablet with force set to true). */
+    private void addBackToPendingTablets(TabletSchedCtx tabletCtx) {
         Preconditions.checkState(tabletCtx.getState() == TabletSchedCtx.State.PENDING);
-        tabletCtx.adjustPriority(stat);
         addTablet(tabletCtx, true /* force */);
     }
 
+    /** Removes the ctx from scheduling, releases its resources and, if it FINISHED, re-checks its tablet. */
     private void finalizeTabletCtx(TabletSchedCtx tabletCtx, TabletSchedCtx.State state, Status status, String reason) {
         // use 2 steps to avoid nested database lock and synchronized.(releaseTabletCtx() may hold db lock)
         // remove the tablet ctx, so that no other process can see it
         removeTabletCtx(tabletCtx, reason);
         // release resources taken by tablet ctx
         releaseTabletCtx(tabletCtx, state, status == Status.UNRECOVERABLE);
+
+        // re-check the tablet right away instead of waiting for the next TabletChecker round (~20s)
+        if (state == TabletSchedCtx.State.FINISHED) {
+            tryAddAfterFinished(tabletCtx);
+        }
+    }
+
+    /**
+     * Re-checks the tablet of a just-finished ctx and, if it is still unhealthy, queues a new REPAIR ctx
+     * right away instead of waiting for the next TabletChecker round. The finished counter is carried
+     * over to the new ctx, so re-adding stops once it reaches FINISHED_COUNTER_THRESHOLD.
+     */
+    private void tryAddAfterFinished(TabletSchedCtx tabletCtx) {
+        int finishedCounter = tabletCtx.getFinishedCounter() + 1;
+        tabletCtx.setFinishedCounter(finishedCounter);
+        if (finishedCounter >= TabletSchedCtx.FINISHED_COUNTER_THRESHOLD) {
+            return;
+        }
+
+        // any of db / table / partition / index / tablet may have been dropped meanwhile: just give up
+        Database db = Env.getCurrentInternalCatalog().getDbNullable(tabletCtx.getDbId());
+        if (db == null) {
+            return;
+        }
+        OlapTable tbl = (OlapTable) db.getTableNullable(tabletCtx.getTblId());
+        if (tbl == null) {
+            return;
+        }
+        Pair<TabletStatus, TabletSchedCtx.Priority> statusPair;
+        ReplicaAllocation replicaAlloc = null;
+        tbl.readLock();
+        try {
+            Partition partition = tbl.getPartition(tabletCtx.getPartitionId());
+            if (partition == null) {
+                return;
+            }
+            MaterializedIndex idx = partition.getIndex(tabletCtx.getIndexId());
+            if (idx == null) {
+                return;
+            }
+            Tablet tablet = idx.getTablet(tabletCtx.getTabletId());
+            if (tablet == null) {
+                return;
+            }
+
+            replicaAlloc = tbl.getPartitionInfo().getReplicaAllocation(partition.getId());
+            boolean isColocateTable = colocateTableIndex.isColocateTable(tbl.getId());
+            if (isColocateTable) {
+                GroupId groupId = colocateTableIndex.getGroup(tbl.getId());
+                if (groupId == null) {
+                    return;
+                }
+                int tabletOrderIdx = tabletCtx.getTabletOrderIdx();
+                if (tabletOrderIdx == -1) {
+                    tabletOrderIdx = idx.getTabletOrderIdx(tablet.getId());
+                }
+                Preconditions.checkState(tabletOrderIdx != -1);
+
+                Set<Long> backendsSet = colocateTableIndex.getTabletBackendsByGroup(groupId, tabletOrderIdx);
+                TabletStatus st = tablet.getColocateHealthStatus(
+                        partition.getVisibleVersion(), replicaAlloc, backendsSet);
+                statusPair = Pair.of(st, Priority.HIGH);
+            } else {
+                List<Long> aliveBeIds = infoService.getAllBackendIds(true);
+                statusPair = tablet.getHealthStatusWithPriority(
+                        infoService, partition.getVisibleVersion(), replicaAlloc, aliveBeIds);
+                // keep the old ctx's priority if it comes later in the Priority enum (presumably higher)
+                if (statusPair.second.compareTo(tabletCtx.getPriority()) < 0) {
+                    statusPair.second = tabletCtx.getPriority();
+                }
+            }
+        } finally {
+            tbl.readUnlock();
+        }
+
+        if (statusPair.first == TabletStatus.HEALTHY) {
+            return;
+        }
+
+        TabletSchedCtx newTabletCtx = new TabletSchedCtx(
+                TabletSchedCtx.Type.REPAIR, tabletCtx.getDbId(), tabletCtx.getTblId(),
+                tabletCtx.getPartitionId(), tabletCtx.getIndexId(), tabletCtx.getTabletId(),
+                replicaAlloc, System.currentTimeMillis());
+        newTabletCtx.setTabletStatus(statusPair.first);
+        newTabletCtx.setPriority(statusPair.second);
+        newTabletCtx.setFinishedCounter(finishedCounter);
+        addTablet(newTabletCtx, false);
     }
 
     private void releaseTabletCtx(TabletSchedCtx tabletCtx, TabletSchedCtx.State state, boolean resetReplicaState) {
@@ -1422,7 +1493,7 @@ public class TabletScheduler extends MasterDaemon {
 
     private synchronized void removeTabletCtx(TabletSchedCtx tabletCtx, String reason) {
         runningTablets.remove(tabletCtx.getTabletId());
-        allTabletIds.remove(tabletCtx.getTabletId());
+        allTabletTypes.remove(tabletCtx.getTabletId());
         schedHistory.add(tabletCtx);
         LOG.info("remove the tablet {}. because: {}", tabletCtx.getTabletId(), reason);
     }
@@ -1430,15 +1501,27 @@ public class TabletScheduler extends MasterDaemon {
     // get next batch of tablets from queue.
     private synchronized List<TabletSchedCtx> getNextTabletCtxBatch() {
         List<TabletSchedCtx> list = Lists.newArrayList();
-        int count = Math.min(MIN_BATCH_NUM, getCurrentAvailableSlotNum());
-        while (count > 0) {
+        int slotNum = getCurrentAvailableSlotNum();
+        // Make slotNum >= 1 so that at least one ctx is returned whenever
+        // the pending queue is not empty.
+        slotNum = Math.max(1, slotNum);
+        // Clamp the batch size the same way, so a non-positive config value cannot stall scheduling.
+        int batchSize = Math.max(1, Config.schedule_batch_size);
+        while (list.size() < batchSize && slotNum > 0) {
             TabletSchedCtx tablet = pendingTablets.poll();
             if (tablet == null) {
                 // no more tablets
                 break;
             }
             list.add(tablet);
-            count--;
+            TabletStatus status = tablet.getTabletStatus();
+            // A clone takes 2 slots (src and dest); redundant-replica and compaction-too-slow
+            // tablets are not charged against slotNum here.
+            if (status != TabletStatus.REDUNDANT && status != TabletStatus.FORCE_REDUNDANT
+                    && status != TabletStatus.COLOCATE_REDUNDANT
+                    && status != TabletStatus.REPLICA_COMPACTION_TOO_SLOW) {
+                slotNum -= 2;
+            }
         }
         return list;
     }
@@ -1469,9 +1552,12 @@ public class TabletScheduler extends MasterDaemon {
             // if we have a success task, then stat must be refreshed before schedule a new task
             updateDiskBalanceLastSuccTime(tabletCtx.getSrcBackendId(), tabletCtx.getSrcPathHash());
             updateDiskBalanceLastSuccTime(tabletCtx.getDestBackendId(), tabletCtx.getDestPathHash());
+            finalizeTabletCtx(tabletCtx, TabletSchedCtx.State.FINISHED, Status.FINISHED, "finished");
+        } else {
+            finalizeTabletCtx(tabletCtx, TabletSchedCtx.State.CANCELLED, Status.UNRECOVERABLE,
+                    request.getTaskStatus().getErrorMsgs().get(0));
         }
-        // we need this function to free slot for this migration task
-        finalizeTabletCtx(tabletCtx, TabletSchedCtx.State.FINISHED, Status.FINISHED, "finished");
+
         return true;
     }
 
@@ -1496,12 +1582,20 @@ public class TabletScheduler extends MasterDaemon {
         try {
             tabletCtx.finishCloneTask(cloneTask, request);
         } catch (SchedException e) {
-            tabletCtx.increaseFailedRunningCounter();
             tabletCtx.setErrMsg(e.getMessage());
             if (e.getStatus() == Status.RUNNING_FAILED) {
-                stat.counterCloneTaskFailed.incrementAndGet();
-                addToRunningTablets(tabletCtx);
-                return false;
+                tabletCtx.increaseFailedRunningCounter();
+                if (!tabletCtx.isExceedFailedRunningLimit()) {
+                    stat.counterCloneTaskFailed.incrementAndGet();
+                    addToRunningTablets(tabletCtx);
+                    return false;
+                } else {
+                    // unrecoverable
+                    stat.counterTabletScheduledDiscard.incrementAndGet();
+                    finalizeTabletCtx(tabletCtx, TabletSchedCtx.State.CANCELLED, Status.UNRECOVERABLE,
+                            e.getMessage());
+                    return true;
+                }
             } else if (e.getStatus() == Status.UNRECOVERABLE) {
                 // unrecoverable
                 stat.counterTabletScheduledDiscard.incrementAndGet();
@@ -1639,7 +1733,7 @@ public class TabletScheduler extends MasterDaemon {
     }
 
     public synchronized int getTotalNum() {
-        return allTabletIds.size();
+        return allTabletTypes.size();
     }
 
     public synchronized long getBalanceTabletsNumber() {
@@ -1655,10 +1749,12 @@ public class TabletScheduler extends MasterDaemon {
     public static class PathSlot {
         // path hash -> slot num
         private Map<Long, Slot> pathSlots = Maps.newConcurrentMap();
+        private final long beId; // passed to every Slot created for these paths
 
-        public PathSlot(List<Long> paths, int initSlotNum) {
+        public PathSlot(List<Long> paths, long beId) {
+            this.beId = beId;
             for (Long pathHash : paths) {
-                pathSlots.put(pathHash, new Slot(initSlotNum));
+                pathSlots.put(pathHash, new Slot(beId));
             }
         }
 
@@ -1670,22 +1766,8 @@ public class TabletScheduler extends MasterDaemon {
             // add new path
             for (Long pathHash : paths) {
                 if (!pathSlots.containsKey(pathHash)) {
-                    pathSlots.put(pathHash, new Slot(Config.schedule_slot_num_per_path));
-                }
-            }
-        }
-
-        // Update the total slots num of specified paths, increase or decrease
-        public synchronized void updateSlot(List<Long> pathHashs, int delta) {
-            for (Long pathHash : pathHashs) {
-                Slot slot = pathSlots.get(pathHash);
-                if (slot == null) {
-                    continue;
+                    pathSlots.put(pathHash, new Slot(beId));
                 }
-
-                slot.total += delta;
-                slot.rectify();
-                LOG.debug("decrease path {} slots num to {}", pathHash, pathSlots.get(pathHash).total);
             }
         }
 
@@ -1701,6 +1783,21 @@ public class TabletScheduler extends MasterDaemon {
             slot.totalCopyTimeMs += copyTimeMs;
         }
 
+        /**
+         * Checks whether the given path currently has at least one free slot.
+         *
+         * @param pathHash hash of the path to check; -1 means the path is not set
+         * @return false if the path is unset or unknown, or if all of its slots are taken
+         */
+        public synchronized boolean hasAvailableSlot(long pathHash) {
+            if (pathHash == -1) {
+                return false;
+            }
+
+            Slot slot = pathSlots.get(pathHash);
+            return slot != null && slot.getAvailable() != 0;
+        }
+
         /**
          * If the specified 'pathHash' has available slot, decrease the slot number and return this path hash
          */
@@ -1709,7 +1806,8 @@ public class TabletScheduler extends MasterDaemon {
                 if (LOG.isDebugEnabled()) {
                     LOG.debug("path hash is not set.", new Exception());
                 }
-                throw new SchedException(Status.SCHEDULE_FAILED, "path hash is not set");
+                throw new SchedException(Status.SCHEDULE_FAILED, SubCode.WAITING_SLOT,
+                        "path hash is not set");
             }
 
             Slot slot = pathSlots.get(pathHash);
@@ -1717,12 +1815,11 @@ public class TabletScheduler extends MasterDaemon {
                 LOG.debug("path {} is not exist", pathHash);
                 return -1;
             }
-            slot.rectify();
-            if (slot.available <= 0) {
+            if (slot.used >= slot.getTotal()) {
                 LOG.debug("path {} has no available slot", pathHash);
                 return -1;
             }
-            slot.available--;
+            slot.used++;
             return pathHash;
         }
 
@@ -1731,23 +1828,15 @@ public class TabletScheduler extends MasterDaemon {
             if (slot == null) {
                 return;
             }
-            slot.available++;
-            slot.rectify();
-        }
-
-        public synchronized int peekSlot(long pathHash) {
-            Slot slot = pathSlots.get(pathHash);
-            if (slot == null) {
-                return -1;
+            if (slot.used > 0) {
+                slot.used--;
             }
-            slot.rectify();
-            return slot.available;
         }
 
         public synchronized int getTotalAvailSlotNum() {
             int total = 0;
             for (Slot slot : pathSlots.values()) {
-                total += slot.available;
+                total += slot.getAvailable();
             }
             return total;
         }
@@ -1758,7 +1847,7 @@ public class TabletScheduler extends MasterDaemon {
         public synchronized Set<Long> getAvailPathsForBalance() {
             Set<Long> pathHashs = Sets.newHashSet();
             for (Map.Entry<Long, Slot> entry : pathSlots.entrySet()) {
-                if (entry.getValue().balanceSlot > 0) {
+                if (entry.getValue().getBalanceAvailable() > 0) {
                     pathHashs.add(entry.getKey());
                 }
             }
@@ -1768,7 +1857,7 @@ public class TabletScheduler extends MasterDaemon {
         public synchronized int getAvailBalanceSlotNum() {
             int num = 0;
             for (Map.Entry<Long, Slot> entry : pathSlots.entrySet()) {
-                num += entry.getValue().balanceSlot;
+                num += entry.getValue().getBalanceAvailable();
             }
             return num;
         }
@@ -1776,13 +1865,12 @@ public class TabletScheduler extends MasterDaemon {
         public synchronized List<List<String>> getSlotInfo(long beId) {
             List<List<String>> results = Lists.newArrayList();
             pathSlots.forEach((key, value) -> {
-                value.rectify();
                 List<String> result = Lists.newArrayList();
                 result.add(String.valueOf(beId));
                 result.add(String.valueOf(key));
-                result.add(String.valueOf(value.available));
-                result.add(String.valueOf(value.total));
-                result.add(String.valueOf(value.balanceSlot));
+                result.add(String.valueOf(value.getAvailable()));
+                result.add(String.valueOf(value.getTotal()));
+                result.add(String.valueOf(value.getBalanceAvailable()));
                 result.add(String.valueOf(value.getAvgRate()));
                 results.add(result);
             });
@@ -1794,8 +1882,8 @@ public class TabletScheduler extends MasterDaemon {
             if (slot == null) {
                 return -1;
             }
-            if (slot.balanceSlot > 0) {
-                slot.balanceSlot--;
+            if (slot.balanceUsed < slot.getBalanceTotal()) {
+                slot.balanceUsed++;
                 return pathHash;
             }
             return -1;
@@ -1807,8 +1895,8 @@ public class TabletScheduler extends MasterDaemon {
                 if (slot == null) {
                     continue;
                 }
-                if (slot.balanceSlot > 0) {
-                    slot.balanceSlot--;
+                if (slot.balanceUsed < slot.getBalanceTotal()) {
+                    slot.balanceUsed++;
                     return pathHash;
                 }
             }
@@ -1820,8 +1908,9 @@ public class TabletScheduler extends MasterDaemon {
             if (slot == null) {
                 return;
             }
-            slot.balanceSlot++;
-            slot.rectify();
+            if (slot.balanceUsed > 0) {
+                slot.balanceUsed--;
+            }
         }
 
         public synchronized void updateDiskBalanceLastSuccTime(long pathHash) {
@@ -1851,10 +1940,8 @@ public class TabletScheduler extends MasterDaemon {
     }
 
     public static class Slot {
-        public int total;
-        public int available;
-        // slot reserved for balance
-        public int balanceSlot;
+        public int used; // slots currently taken on this path
+        public int balanceUsed; // balance slots currently taken on this path
 
         public long totalCopySize = 0;
         public long totalCopyTimeMs = 0;
@@ -1862,23 +1949,35 @@ public class TabletScheduler extends MasterDaemon {
         // for disk balance
         public long diskBalanceLastSuccTime = 0;
 
-        public Slot(int total) {
-            this.total = total;
-            this.available = total;
-            this.balanceSlot = Config.balance_slot_num_per_path;
+        private final long beId; // getTotal() uses it to pick the decommission slot quota
+
+        public Slot(long beId) {
+            this.beId = beId;
+            this.used = 0;
+            this.balanceUsed = 0;
         }
 
-        public void rectify() {
-            if (total <= 0) {
-                total = 1;
-            }
-            if (available > total) {
-                available = total;
-            }
+        public int getAvailable() {
+            return Math.max(0, getTotal() - used); // the total may shrink below 'used'
+        }
+
+        public int getTotal() {
+            int total = Math.max(1, Config.schedule_slot_num_per_path); // re-read on every call
 
-            if (balanceSlot > Config.balance_slot_num_per_path) {
-                balanceSlot = Config.balance_slot_num_per_path;
+            Backend be = Env.getCurrentSystemInfo().getBackend(beId);
+            if (be != null && be.isDecommissioned()) { // decommissioned BEs use a separate quota
+                total = Math.max(1, Config.schedule_decommission_slot_num_per_path);
             }
+
+            return total;
+        }
+
+        public int getBalanceAvailable() {
+            return Math.max(0, getBalanceTotal() - balanceUsed);
+        }
+
+        public int getBalanceTotal() {
+            return Math.max(1, Config.balance_slot_num_per_path);
         }
 
         // return avg rate, Bytes/S
diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/proc/TabletSchedulerDetailProcDir.java b/fe/fe-core/src/main/java/org/apache/doris/common/proc/TabletSchedulerDetailProcDir.java
index e2aada6f1f..4441a99431 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/proc/TabletSchedulerDetailProcDir.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/proc/TabletSchedulerDetailProcDir.java
@@ -29,15 +29,15 @@ import com.google.common.collect.Lists;
 import java.util.List;
 
 /*
- * show proc "/tablet_scheduler/pending_tablets";
- * show proc "/tablet_scheduler/running_tablets";
- * show proc "/tablet_scheduler/history_tablets";
+ * show proc "/cluster_balance/pending_tablets";
+ * show proc "/cluster_balance/running_tablets";
+ * show proc "/cluster_balance/history_tablets";
  */
 public class TabletSchedulerDetailProcDir implements ProcDirInterface {
     public static final ImmutableList<String> TITLE_NAMES = new ImmutableList.Builder<String>().add("TabletId")
-            .add("Type").add("Medium").add("Status").add("State").add("OrigPrio").add("DynmPrio").add("SrcBe")
+            .add("Type").add("Medium").add("Status").add("State").add("SchedCode").add("Priority").add("SrcBe")
             .add("SrcPath").add("DestBe").add("DestPath").add("Timeout").add("Create").add("LstSched").add("LstVisit")
-            .add("Finished").add("Rate").add("FailedSched").add("FailedRunning").add("LstAdjPrio").add("VisibleVer")
+            .add("Finished").add("Rate").add("FailedSched").add("FailedRunning").add("VisibleVer")
             .add("CmtVer").add("ErrMsg")
             .build();
 
diff --git a/fe/fe-core/src/test/java/org/apache/doris/clone/TabletSchedCtxTest.java b/fe/fe-core/src/test/java/org/apache/doris/clone/TabletSchedCtxTest.java
index 41df080a75..d4578e17d7 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/clone/TabletSchedCtxTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/clone/TabletSchedCtxTest.java
@@ -39,17 +39,17 @@ public class TabletSchedCtxTest {
         ReplicaAllocation replicaAlloc = ReplicaAllocation.DEFAULT_ALLOCATION;
         TabletSchedCtx ctx1 = new TabletSchedCtx(Type.REPAIR,
                 1, 2, 3, 4, 1000, replicaAlloc, System.currentTimeMillis());
-        ctx1.setOrigPriority(Priority.NORMAL);
+        ctx1.setPriority(Priority.NORMAL);
         ctx1.setLastVisitedTime(2);
 
         TabletSchedCtx ctx2 = new TabletSchedCtx(Type.REPAIR,
                 1, 2, 3, 4, 1001, replicaAlloc, System.currentTimeMillis());
-        ctx2.setOrigPriority(Priority.NORMAL);
+        ctx2.setPriority(Priority.NORMAL);
         ctx2.setLastVisitedTime(3);
 
         TabletSchedCtx ctx3 = new TabletSchedCtx(Type.REPAIR,
-                1, 2, 3, 4, 1001, replicaAlloc, System.currentTimeMillis());
-        ctx3.setOrigPriority(Priority.NORMAL);
+                1, 2, 3, 4, 1002, replicaAlloc, System.currentTimeMillis());
+        ctx3.setPriority(Priority.NORMAL);
         ctx3.setLastVisitedTime(1);
 
         pendingTablets.add(ctx1);
@@ -62,8 +62,8 @@ public class TabletSchedCtxTest {
 
         // priority is not equal, info2 is HIGH, should ranks ahead
         pendingTablets.clear();
-        ctx1.setOrigPriority(Priority.NORMAL);
-        ctx2.setOrigPriority(Priority.HIGH);
+        ctx1.setPriority(Priority.NORMAL);
+        ctx2.setPriority(Priority.HIGH);
         ctx1.setLastVisitedTime(2);
         ctx2.setLastVisitedTime(2);
         pendingTablets.add(ctx2);
diff --git a/fe/fe-core/src/test/java/org/apache/doris/cluster/DecommissionBackendTest.java b/fe/fe-core/src/test/java/org/apache/doris/cluster/DecommissionBackendTest.java
index 95fb22eac6..209f8a1127 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/cluster/DecommissionBackendTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/cluster/DecommissionBackendTest.java
@@ -33,7 +33,6 @@ import org.junit.jupiter.api.Test;
 import java.util.List;
 
 public class DecommissionBackendTest extends TestWithFeService {
-
     @Override
     protected int backendNum() {
         return 3;
@@ -42,6 +41,7 @@ public class DecommissionBackendTest extends TestWithFeService {
     @Override
     protected void beforeCluster() {
         FeConstants.runningUnitTest = true;
+        needCleanDir = false; // TestWithFeService then skips cleanDorisFeDir()
     }
 
     @BeforeAll
diff --git a/fe/fe-core/src/test/java/org/apache/doris/utframe/TestWithFeService.java b/fe/fe-core/src/test/java/org/apache/doris/utframe/TestWithFeService.java
index ac2fe5978d..f2610738f3 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/utframe/TestWithFeService.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/utframe/TestWithFeService.java
@@ -119,6 +119,7 @@ public abstract class TestWithFeService {
     protected String dorisHome;
     protected String runningDir = "fe/mocked/" + getClass().getSimpleName() + "/" + UUID.randomUUID() + "/";
     protected ConnectContext connectContext;
+    protected boolean needCleanDir = true; // set to false to skip cleanDorisFeDir() during teardown
 
     protected static final String DEFAULT_CLUSTER_PREFIX = "default_cluster:";
 
@@ -140,7 +141,9 @@ public abstract class TestWithFeService {
         runAfterAll();
         Env.getCurrentEnv().clear();
         StatementScopeIdGenerator.clear();
-        cleanDorisFeDir();
+        if (needCleanDir) {
+            cleanDorisFeDir();
+        }
     }
 
     @BeforeEach


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org