You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by kx...@apache.org on 2023/07/20 14:06:45 UTC
[doris] 01/21: [Improvement](tablet clone) impr tablet sched speed and fix tablet sched failed too many times (#21856)
This is an automated email from the ASF dual-hosted git repository.
kxiao pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/doris.git
commit b6ab4a2cd881d113fa5dcf606d52e8f6526da8ae
Author: yujun <yu...@gmail.com>
AuthorDate: Tue Jul 18 23:25:22 2023 +0800
[Improvement](tablet clone) impr tablet sched speed and fix tablet sched failed too many times (#21856)
---
docker/runtime/doris-compose/database.py | 241 +++++++++++
.../main/java/org/apache/doris/common/Config.java | 16 +-
.../main/java/org/apache/doris/catalog/Tablet.java | 15 +-
.../org/apache/doris/clone/BeLoadRebalancer.java | 50 ++-
.../clone/ColocateTableCheckerAndBalancer.java | 5 +-
.../org/apache/doris/clone/DiskRebalancer.java | 4 +-
.../apache/doris/clone/PartitionRebalancer.java | 6 +-
.../org/apache/doris/clone/SchedException.java | 16 +
.../java/org/apache/doris/clone/TabletChecker.java | 6 +-
.../org/apache/doris/clone/TabletSchedCtx.java | 245 +++++------
.../org/apache/doris/clone/TabletScheduler.java | 461 +++++++++++++--------
.../common/proc/TabletSchedulerDetailProcDir.java | 10 +-
.../org/apache/doris/clone/TabletSchedCtxTest.java | 12 +-
.../doris/cluster/DecommissionBackendTest.java | 2 +-
.../apache/doris/utframe/TestWithFeService.java | 5 +-
15 files changed, 726 insertions(+), 368 deletions(-)
diff --git a/docker/runtime/doris-compose/database.py b/docker/runtime/doris-compose/database.py
new file mode 100644
index 0000000000..57fc90275a
--- /dev/null
+++ b/docker/runtime/doris-compose/database.py
@@ -0,0 +1,241 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import cluster as CLUSTER
+import os.path
+import pymysql
+import time
+import utils
+
+LOG = utils.get_logger()
+
+
+class FEState(object):
+ """Snapshot of one frontend (FE) node.
+
+ Instances are built by DBManager._load_fe_states from one row of
+ "show frontends;". is_master and alive are booleans; query_port is the
+ port looked up for this node id and may be None; last_heartbeat and
+ err_msg are stored exactly as returned by the query.
+ """
+
+ def __init__(self, id, query_port, is_master, alive, last_heartbeat,
+ err_msg):
+ self.id = id
+ self.query_port = query_port
+ self.is_master = is_master
+ self.alive = alive
+ self.last_heartbeat = last_heartbeat
+ self.err_msg = err_msg
+
+
+class BEState(object):
+ """Snapshot of one backend (BE) node.
+
+ Instances are built by DBManager._load_be_states from one row of
+ "show backends;". decommissioned and alive are booleans, tablet_num is an
+ int; last_heartbeat and err_msg are stored exactly as returned by the
+ query.
+ """
+
+ def __init__(self, id, decommissioned, alive, tablet_num, last_heartbeat,
+ err_msg):
+ self.id = id
+ self.decommissioned = decommissioned
+ self.alive = alive
+ self.tablet_num = tablet_num
+ self.last_heartbeat = last_heartbeat
+ self.err_msg = err_msg
+
+
+class DBManager(object):
+
+ def __init__(self):
+ self.fe_states = {}
+ self.be_states = {}
+ self.query_port = -1
+ self.conn = None
+
+ def set_query_port(self, query_port):
+ self.query_port = query_port
+
+ def get_fe(self, id):
+ return self.fe_states.get(id, None)
+
+ def get_be(self, id):
+ return self.be_states.get(id, None)
+
+ def load_states(self, query_ports):
+ self._load_fe_states(query_ports)
+ self._load_be_states()
+
+ def drop_fe(self, fe_endpoint):
+ id = CLUSTER.Node.get_id_from_ip(fe_endpoint[:fe_endpoint.find(":")])
+ try:
+ self._exec_query(
+ "ALTER SYSTEM DROP FOLLOWER '{}'".format(fe_endpoint))
+ LOG.info("Drop fe {} with id {} from db succ.".format(
+ fe_endpoint, id))
+ except Exception as e:
+ if str(e).find("frontend does not exist") >= 0:
+ LOG.info(
+ "Drop fe {} with id {} from db succ cause it does not exist in db."
+ .format(fe_endpoint, id))
+ return
+ raise e
+
+ def drop_be(self, be_endpoint):
+ id = CLUSTER.Node.get_id_from_ip(be_endpoint[:be_endpoint.find(":")])
+ try:
+ self._exec_query(
+ "ALTER SYSTEM DROPP BACKEND '{}'".format(be_endpoint))
+ LOG.info("Drop be {} with id {} from db succ.".format(
+ be_endpoint, id))
+ except Exception as e:
+ if str(e).find("backend does not exists") >= 0:
+ LOG.info(
+ "Drop be {} with id {} from db succ cause it does not exist in db."
+ .format(be_endpoint, id))
+ return
+ raise e
+
+ def decommission_be(self, be_endpoint):
+ old_tablet_num = 0
+ id = CLUSTER.Node.get_id_from_ip(be_endpoint[:be_endpoint.find(":")])
+ start_ts = time.time()
+ if id not in self.be_states:
+ self._load_be_states()
+ if id in self.be_states:
+ be = self.be_states[id]
+ old_tablet_num = be.tablet_num
+ if not be.alive:
+ raise Exception("Decommission be {} with id {} fail " \
+ "cause it's not alive, maybe you should specific --drop-force " \
+ " to dropp it from db".format(be_endpoint, id))
+ try:
+ self._exec_query(
+ "ALTER SYSTEM DECOMMISSION BACKEND '{}'".format(be_endpoint))
+ LOG.info("Mark be {} with id {} as decommissioned, start migrate its tablets, " \
+ "wait migrating job finish.".format(be_endpoint, id))
+ except Exception as e:
+ if str(e).find("Backend does not exist") >= 0:
+ LOG.info("Decommission be {} with id {} from db succ " \
+ "cause it does not exist in db.",format(be_endpoint, id))
+ return
+ raise e
+
+ while True:
+ self._load_be_states()
+ be = self.be_states.get(id, None)
+ if not be:
+ LOG.info("Decommission be {} succ, total migrate {} tablets, " \
+ "has drop it from db.".format(be_endpoint, old_tablet_num))
+ return
+ LOG.info(
+ "Decommission be {} status: alive {}, decommissioned {}. " \
+ "It is migrating its tablets, left {}/{} tablets. Time elapse {} s."
+ .format(be_endpoint, be.alive, be.decommissioned, be.tablet_num, old_tablet_num,
+ int(time.time() - start_ts)))
+
+ time.sleep(5)
+
+ def _load_fe_states(self, query_ports):
+ fe_states = {}
+ alive_master_fe_port = None
+ for record in self._exec_query("show frontends;"):
+ ip = record[1]
+ is_master = record[7] == "true"
+ alive = record[10] == "true"
+ last_heartbeat = record[12]
+ err_msg = record[14]
+ id = CLUSTER.Node.get_id_from_ip(ip)
+ query_port = query_ports.get(id, None)
+ fe = FEState(id, query_port, is_master, alive, last_heartbeat,
+ err_msg)
+ fe_states[id] = fe
+ if is_master and alive and query_port:
+ alive_master_fe_port = query_port
+ self.fe_states = fe_states
+ if alive_master_fe_port and alive_master_fe_port != self.query_port:
+ self.query_port = alive_master_fe_port
+ self._reset_conn()
+
+ def _load_be_states(self):
+ be_states = {}
+ for record in self._exec_query("show backends;"):
+ ip = record[1]
+ last_heartbeat = record[7]
+ alive = record[8] == "true"
+ decommissioned = record[9] == "true"
+ tablet_num = int(record[10])
+ err_msg = record[18]
+ id = CLUSTER.Node.get_id_from_ip(ip)
+ be = BEState(id, decommissioned, alive, tablet_num, last_heartbeat,
+ err_msg)
+ be_states[id] = be
+ self.be_states = be_states
+
+ def _exec_query(self, sql):
+ self._prepare_conn()
+ with self.conn.cursor() as cursor:
+ cursor.execute(sql)
+ return cursor.fetchall()
+
+ def _prepare_conn(self):
+ if self.conn:
+ return
+ if self.query_port <= 0:
+ raise Exception("Not set query_port")
+ self._reset_conn()
+
+ def _reset_conn(self):
+ self.conn = pymysql.connect(user="root",
+ host="127.0.0.1",
+ read_timeout=10,
+ port=self.query_port)
+
+
+def get_db_mgr(cluster_name, required_load_succ=True):
+ """Build a DBManager for cluster_name and try to load its node states.
+
+ A DBManager with no query port and no loaded states is returned when the
+ cluster has no containers, or no running FE container with a mapped
+ query port. When loading the states raises, the error is re-raised only
+ if required_load_succ is true; otherwise it is ignored and the manager
+ is returned as is.
+ """
+ assert cluster_name
+ db_mgr = DBManager()
+ containers = utils.get_doris_containers(cluster_name).get(
+ cluster_name, None)
+ if not containers:
+ return db_mgr
+ # FE node id -> mapped query port, for running FE containers only.
+ alive_fe_ports = {}
+ for container in containers:
+ if utils.is_container_running(container):
+ _, node_type, id = utils.parse_service_name(container.name)
+ if node_type == CLUSTER.Node.TYPE_FE:
+ query_port = utils.get_map_ports(container).get(
+ CLUSTER.FE_QUERY_PORT, None)
+ if query_port:
+ alive_fe_ports[id] = query_port
+ if not alive_fe_ports:
+ return db_mgr
+
+ # Prefer the FE whose ip is recorded in the "master_fe_ip" status file,
+ # provided that FE is among the running ones.
+ master_fe_ip_file = os.path.join(CLUSTER.get_status_path(cluster_name),
+ "master_fe_ip")
+ query_port = None
+ if os.path.exists(master_fe_ip_file):
+ with open(master_fe_ip_file, "r") as f:
+ master_fe_ip = f.read()
+ if master_fe_ip:
+ master_id = CLUSTER.Node.get_id_from_ip(master_fe_ip)
+ query_port = alive_fe_ports.get(master_id, None)
+ # Fallback: FE with id 1 if it is running, else the first running FE found.
+ if not query_port:
+ # A new cluster's master is fe-1
+ if 1 in alive_fe_ports:
+ query_port = alive_fe_ports[1]
+ else:
+ query_port = list(alive_fe_ports.values())[0]
+
+ db_mgr.set_query_port(query_port)
+ try:
+ db_mgr.load_states(alive_fe_ports)
+ except Exception as e:
+ # Best effort unless the caller requires the states to be loaded.
+ if required_load_succ:
+ raise e
+ #LOG.exception(e)
+
+ return db_mgr
diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
index 373cb777e4..4a7c9e929d 100644
--- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
+++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
@@ -904,7 +904,21 @@ public class Config extends ConfigBase {
* the default slot number per path in tablet scheduler
* TODO(cmy): remove this config and dynamically adjust it by clone task statistic
*/
- @ConfField public static int schedule_slot_num_per_path = 2;
+ @ConfField(mutable = true, masterOnly = true)
+ public static int schedule_slot_num_per_path = 4;
+
+ /**
+ * the default slot number per path in tablet scheduler for decommission backend
+ */
+ @ConfField(mutable = true, masterOnly = true)
+ public static int schedule_decommission_slot_num_per_path = 8;
+
+ /**
+ * the default batch size in tablet scheduler for a single schedule.
+ */
+ @ConfField(mutable = true, masterOnly = true)
+ public static int schedule_batch_size = 50;
+
/**
* Deprecated after 0.10
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Tablet.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Tablet.java
index 74f1c31cbf..0e93a4dd0f 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Tablet.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Tablet.java
@@ -703,11 +703,24 @@ public class Tablet extends MetaObject implements Writable {
* NORMAL: delay Config.tablet_repair_delay_factor_second * 2;
* LOW: delay Config.tablet_repair_delay_factor_second * 3;
*/
- public boolean readyToBeRepaired(TabletSchedCtx.Priority priority) {
+ public boolean readyToBeRepaired(SystemInfoService infoService, TabletSchedCtx.Priority priority) {
if (priority == Priority.VERY_HIGH) {
return true;
}
+ boolean allBeAliveOrDecommissioned = true;
+ for (Replica replica : replicas) {
+ Backend backend = infoService.getBackend(replica.getBackendId());
+ if (backend == null || (!backend.isAlive() && !backend.isDecommissioned())) {
+ allBeAliveOrDecommissioned = false;
+ break;
+ }
+ }
+
+ if (allBeAliveOrDecommissioned) {
+ return true;
+ }
+
long currentTime = System.currentTimeMillis();
// first check, wait for next round
diff --git a/fe/fe-core/src/main/java/org/apache/doris/clone/BeLoadRebalancer.java b/fe/fe-core/src/main/java/org/apache/doris/clone/BeLoadRebalancer.java
index ebbebe6806..5317725881 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/clone/BeLoadRebalancer.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/clone/BeLoadRebalancer.java
@@ -23,6 +23,7 @@ import org.apache.doris.catalog.Replica;
import org.apache.doris.catalog.TabletInvertedIndex;
import org.apache.doris.catalog.TabletMeta;
import org.apache.doris.clone.SchedException.Status;
+import org.apache.doris.clone.SchedException.SubCode;
import org.apache.doris.clone.TabletSchedCtx.Priority;
import org.apache.doris.clone.TabletScheduler.PathSlot;
import org.apache.doris.common.Config;
@@ -166,7 +167,7 @@ public class BeLoadRebalancer extends Rebalancer {
System.currentTimeMillis());
tabletCtx.setTag(clusterStat.getTag());
// balance task's priority is always LOW
- tabletCtx.setOrigPriority(Priority.LOW);
+ tabletCtx.setPriority(Priority.LOW);
alternativeTablets.add(tabletCtx);
if (--numOfLowPaths <= 0) {
@@ -262,7 +263,7 @@ public class BeLoadRebalancer extends Rebalancer {
}
// Select a low load backend as destination.
- boolean setDest = false;
+ List<BackendLoadStatistic> candidates = Lists.newArrayList();
for (BackendLoadStatistic beStat : lowBe) {
if (beStat.isAvailable() && replicas.stream().noneMatch(r -> r.getBackendId() == beStat.getBeId())) {
// check if on same host.
@@ -296,27 +297,36 @@ public class BeLoadRebalancer extends Rebalancer {
continue;
}
- // classify the paths.
- // And we only select path from 'low' and 'mid' paths
- Set<Long> pathLow = Sets.newHashSet();
- Set<Long> pathMid = Sets.newHashSet();
- Set<Long> pathHigh = Sets.newHashSet();
- beStat.getPathStatisticByClass(pathLow, pathMid, pathHigh, tabletCtx.getStorageMedium());
- pathLow.addAll(pathMid);
-
- long pathHash = slot.takeAnAvailBalanceSlotFrom(pathLow);
- if (pathHash == -1) {
- LOG.debug("paths has no available balance slot: {}", pathLow);
- } else {
- tabletCtx.setDest(beStat.getBeId(), pathHash);
- setDest = true;
- break;
- }
+ candidates.add(beStat);
}
}
- if (!setDest) {
- throw new SchedException(Status.SCHEDULE_FAILED, "unable to find low backend");
+ if (candidates.isEmpty()) {
+ throw new SchedException(Status.UNRECOVERABLE, "unable to find low backend");
}
+
+ for (BackendLoadStatistic beStat : candidates) {
+ PathSlot slot = backendsWorkingSlots.get(beStat.getBeId());
+ if (slot == null) {
+ continue;
+ }
+
+ // classify the paths.
+ // And we only select path from 'low' and 'mid' paths
+ Set<Long> pathLow = Sets.newHashSet();
+ Set<Long> pathMid = Sets.newHashSet();
+ Set<Long> pathHigh = Sets.newHashSet();
+ beStat.getPathStatisticByClass(pathLow, pathMid, pathHigh, tabletCtx.getStorageMedium());
+ pathLow.addAll(pathMid);
+
+ long pathHash = slot.takeAnAvailBalanceSlotFrom(pathLow);
+ if (pathHash != -1) {
+ tabletCtx.setDest(beStat.getBeId(), pathHash);
+ return;
+ }
+ }
+
+ throw new SchedException(Status.SCHEDULE_FAILED, SubCode.WAITING_SLOT,
+ "unable to find low backend");
}
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/clone/ColocateTableCheckerAndBalancer.java b/fe/fe-core/src/main/java/org/apache/doris/clone/ColocateTableCheckerAndBalancer.java
index b3fdc3ca71..b34bc926dd 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/clone/ColocateTableCheckerAndBalancer.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/clone/ColocateTableCheckerAndBalancer.java
@@ -202,6 +202,7 @@ public class ColocateTableCheckerAndBalancer extends MasterDaemon {
*/
private void matchGroup() {
Env env = Env.getCurrentEnv();
+ SystemInfoService infoService = Env.getCurrentSystemInfo();
ColocateTableIndex colocateIndex = env.getColocateTableIndex();
TabletScheduler tabletScheduler = env.getTabletScheduler();
@@ -254,7 +255,7 @@ public class ColocateTableCheckerAndBalancer extends MasterDaemon {
+ " status: %s", tablet.getId(), st);
LOG.debug(unstableReason);
- if (!tablet.readyToBeRepaired(Priority.NORMAL)) {
+ if (!tablet.readyToBeRepaired(infoService, Priority.NORMAL)) {
continue;
}
@@ -265,7 +266,7 @@ public class ColocateTableCheckerAndBalancer extends MasterDaemon {
System.currentTimeMillis());
// the tablet status will be set again when being scheduled
tabletCtx.setTabletStatus(st);
- tabletCtx.setOrigPriority(Priority.NORMAL);
+ tabletCtx.setPriority(Priority.NORMAL);
tabletCtx.setTabletOrderIdx(idx);
AddResult res = tabletScheduler.addTablet(tabletCtx, false /* not force */);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/clone/DiskRebalancer.java b/fe/fe-core/src/main/java/org/apache/doris/clone/DiskRebalancer.java
index 9d676d950c..abac0c2d1a 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/clone/DiskRebalancer.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/clone/DiskRebalancer.java
@@ -208,10 +208,10 @@ public class DiskRebalancer extends Rebalancer {
tabletCtx.setTag(clusterStat.getTag());
if (prioBackends.containsKey(beStat.getBeId())) {
// priority of balance task of prio BE is NORMAL
- tabletCtx.setOrigPriority(Priority.NORMAL);
+ tabletCtx.setPriority(Priority.NORMAL);
} else {
// balance task's default priority is LOW
- tabletCtx.setOrigPriority(Priority.LOW);
+ tabletCtx.setPriority(Priority.LOW);
}
// we must set balanceType to DISK_BALANCE for create migration task
tabletCtx.setBalanceType(BalanceType.DISK_BALANCE);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/clone/PartitionRebalancer.java b/fe/fe-core/src/main/java/org/apache/doris/clone/PartitionRebalancer.java
index 9e3e37ef00..d9d3f27cc7 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/clone/PartitionRebalancer.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/clone/PartitionRebalancer.java
@@ -151,7 +151,7 @@ public class PartitionRebalancer extends Rebalancer {
System.currentTimeMillis());
tabletCtx.setTag(clusterStat.getTag());
// Balance task's priority is always LOW
- tabletCtx.setOrigPriority(TabletSchedCtx.Priority.LOW);
+ tabletCtx.setPriority(TabletSchedCtx.Priority.LOW);
alternativeTablets.add(tabletCtx);
// Pair<Move, ToDeleteReplicaId>, ToDeleteReplicaId should be -1L before scheduled successfully
movesInProgress.get().put(pickedTabletId,
@@ -251,7 +251,7 @@ public class PartitionRebalancer extends Rebalancer {
if (slot.takeBalanceSlot(srcReplica.getPathHash()) != -1) {
tabletCtx.setSrc(srcReplica);
} else {
- throw new SchedException(SchedException.Status.SCHEDULE_FAILED,
+ throw new SchedException(SchedException.Status.SCHEDULE_FAILED, SchedException.SubCode.WAITING_SLOT,
"no slot for src replica " + srcReplica + ", pathHash " + srcReplica.getPathHash());
}
@@ -269,7 +269,7 @@ public class PartitionRebalancer extends Rebalancer {
.map(RootPathLoadStatistic::getPathHash).collect(Collectors.toSet());
long pathHash = slot.takeAnAvailBalanceSlotFrom(availPath);
if (pathHash == -1) {
- throw new SchedException(SchedException.Status.SCHEDULE_FAILED,
+ throw new SchedException(SchedException.Status.SCHEDULE_FAILED, SchedException.SubCode.WAITING_SLOT,
"paths has no available balance slot: " + availPath);
} else {
tabletCtx.setDest(beStat.getBeId(), pathHash);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/clone/SchedException.java b/fe/fe-core/src/main/java/org/apache/doris/clone/SchedException.java
index 0ad83e8909..a343e6543c 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/clone/SchedException.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/clone/SchedException.java
@@ -27,14 +27,30 @@ public class SchedException extends Exception {
FINISHED // schedule is done, remove the tablet from tablet scheduler with status FINISHED
}
+ /**
+ * Extra detail attached to a SchedException in addition to its Status.
+ */
+ public enum SubCode {
+ // Default when no detail is given (used by the two-argument constructor).
+ NONE,
+ // NOTE(review): presumably "waiting for a decommission to finish"; the code
+ // that throws it is not part of this excerpt - confirm against TabletScheduler.
+ WAITING_DECOMMISSION,
+ // Thrown together with SCHEDULE_FAILED when no source/destination slot could be taken.
+ WAITING_SLOT,
+ }
+
private Status status;
+ private SubCode subCode;
public SchedException(Status status, String errorMsg) {
+ this(status, SubCode.NONE, errorMsg);
+ }
+
+ public SchedException(Status status, SubCode subCode, String errorMsg) {
super(errorMsg);
this.status = status;
+ this.subCode = subCode;
}
public Status getStatus() {
return status;
}
+
+ public SubCode getSubCode() {
+ return subCode;
+ }
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/clone/TabletChecker.java b/fe/fe-core/src/main/java/org/apache/doris/clone/TabletChecker.java
index 67226eae67..d9330e0f16 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/clone/TabletChecker.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/clone/TabletChecker.java
@@ -372,9 +372,7 @@ public class TabletChecker extends MasterDaemon {
}
counter.unhealthyTabletNum++;
-
- if (!tablet.readyToBeRepaired(statusWithPrio.second)) {
- counter.tabletNotReady++;
+ if (!tablet.readyToBeRepaired(infoService, statusWithPrio.second)) {
continue;
}
@@ -386,7 +384,7 @@ public class TabletChecker extends MasterDaemon {
System.currentTimeMillis());
// the tablet status will be set again when being scheduled
tabletCtx.setTabletStatus(statusWithPrio.first);
- tabletCtx.setOrigPriority(statusWithPrio.second);
+ tabletCtx.setPriority(statusWithPrio.second);
AddResult res = tabletScheduler.addTablet(tabletCtx, false /* not force */);
if (res == AddResult.LIMIT_EXCEED || res == AddResult.DISABLED) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/clone/TabletSchedCtx.java b/fe/fe-core/src/main/java/org/apache/doris/clone/TabletSchedCtx.java
index 58c411c40f..286b70f65e 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/clone/TabletSchedCtx.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/clone/TabletSchedCtx.java
@@ -29,6 +29,7 @@ import org.apache.doris.catalog.Table;
import org.apache.doris.catalog.Tablet;
import org.apache.doris.catalog.Tablet.TabletStatus;
import org.apache.doris.clone.SchedException.Status;
+import org.apache.doris.clone.SchedException.SubCode;
import org.apache.doris.clone.TabletScheduler.PathSlot;
import org.apache.doris.common.Config;
import org.apache.doris.common.FeConstants;
@@ -102,6 +103,8 @@ public class TabletSchedCtx implements Comparable<TabletSchedCtx> {
*/
private static final int RUNNING_FAILED_COUNTER_THRESHOLD = 3;
+ public static final int FINISHED_COUNTER_THRESHOLD = 3;
+
private static VersionCountComparator VERSION_COUNTER_COMPARATOR = new VersionCountComparator();
public enum Type {
@@ -117,22 +120,6 @@ public class TabletSchedCtx implements Comparable<TabletSchedCtx> {
NORMAL,
HIGH,
VERY_HIGH;
-
- // VERY_HIGH can only be downgraded to NORMAL
- // LOW can only be upgraded to HIGH
- public Priority adjust(Priority origPriority, boolean isUp) {
- switch (this) {
- case VERY_HIGH:
- return isUp ? VERY_HIGH : HIGH;
- case HIGH:
- return isUp ? (origPriority == LOW ? HIGH : VERY_HIGH) : NORMAL;
- case NORMAL:
- return isUp ? HIGH : (origPriority == Priority.VERY_HIGH ? NORMAL : LOW);
- default:
- return isUp ? NORMAL : LOW;
- }
- }
-
}
public enum State {
@@ -147,23 +134,19 @@ public class TabletSchedCtx implements Comparable<TabletSchedCtx> {
private Type type;
private BalanceType balanceType;
- /*
- * origPriority is the origin priority being set when this tablet being added to scheduler.
- * dynamicPriority will be set during tablet schedule processing, it will not be prior than origin priority.
- * And dynamic priority is also used in priority queue compare in tablet scheduler.
- */
- private Priority origPriority;
- private Priority dynamicPriority;
+ private Priority priority;
// we change the dynamic priority based on how many times it fails to be scheduled
private int failedSchedCounter = 0;
// clone task failed counter
private int failedRunningCounter = 0;
+ // When finish a tablet ctx, it will check the tablet's health status.
+ // If the tablet is unhealthy, it will add a new ctx.
+ // The new ctx's finishedCounter = old ctx's finishedCounter + 1.
+ private int finishedCounter = 0;
// last time this tablet being scheduled
private long lastSchedTime = 0;
- // last time the dynamic priority being adjusted
- private long lastAdjustPrioTime = 0;
// last time this tablet being visited.
// being visited means:
@@ -180,6 +163,8 @@ public class TabletSchedCtx implements Comparable<TabletSchedCtx> {
private State state;
private TabletStatus tabletStatus;
+ private long decommissionTime = -1;
+
private long dbId;
private long tblId;
private long partitionId;
@@ -223,6 +208,8 @@ public class TabletSchedCtx implements Comparable<TabletSchedCtx> {
// tag is only set for BALANCE task, used to identify which workload group this Balance job is in
private Tag tag;
+ private SubCode schedFailedCode;
+
public TabletSchedCtx(Type type, long dbId, long tblId, long partId,
long idxId, long tabletId, ReplicaAllocation replicaAlloc, long createTime) {
this.type = type;
@@ -236,12 +223,17 @@ public class TabletSchedCtx implements Comparable<TabletSchedCtx> {
this.state = State.PENDING;
this.replicaAlloc = replicaAlloc;
this.balanceType = BalanceType.BE_BALANCE;
+ this.schedFailedCode = SubCode.NONE;
}
public ReplicaAllocation getReplicaAlloc() {
return replicaAlloc;
}
+ public void setReplicaAlloc(ReplicaAllocation replicaAlloc) {
+ this.replicaAlloc = replicaAlloc;
+ }
+
public void setTag(Tag tag) {
this.tag = tag;
}
@@ -266,21 +258,20 @@ public class TabletSchedCtx implements Comparable<TabletSchedCtx> {
return balanceType;
}
- public Priority getOrigPriority() {
- return origPriority;
+ public Priority getPriority() {
+ return priority;
+ }
+
+ public void setPriority(Priority priority) {
+ this.priority = priority;
}
- public void setOrigPriority(Priority origPriority) {
- this.origPriority = origPriority;
- // reset dynamic priority along with the origin priority being set.
- this.dynamicPriority = origPriority;
- this.failedSchedCounter = 0;
- this.lastSchedTime = 0;
- this.lastAdjustPrioTime = 0;
+ public int getFinishedCounter() {
+ return finishedCounter;
}
- public Priority getDynamicPriority() {
- return dynamicPriority;
+ public void setFinishedCounter(int finishedCounter) {
+ this.finishedCounter = finishedCounter;
}
public void increaseFailedSchedCounter() {
@@ -295,8 +286,27 @@ public class TabletSchedCtx implements Comparable<TabletSchedCtx> {
++failedRunningCounter;
}
- public int getFailedRunningCounter() {
- return failedRunningCounter;
+ public boolean isExceedFailedRunningLimit() {
+ return failedRunningCounter >= RUNNING_FAILED_COUNTER_THRESHOLD;
+ }
+
+ /**
+ * Record one schedule failure and tell whether the failure limit is exceeded.
+ *
+ * @param code sub code of the failure; remembered in schedFailedCode
+ * @return true when the limit that applies to this kind of failure is exceeded
+ */
+ public boolean onSchedFailedAndCheckExceedLimit(SubCode code) {
+ schedFailedCode = code;
+ failedSchedCounter++;
+ if (code == SubCode.WAITING_DECOMMISSION) {
+ // This case is limited by elapsed time, not by attempts: the counter is
+ // reset and the first such failure starts the clock.
+ failedSchedCounter = 0;
+ if (decommissionTime < 0) {
+ decommissionTime = System.currentTimeMillis();
+ }
+ // Exceeded once more than 10 minutes have passed since the clock started.
+ return System.currentTimeMillis() > decommissionTime + 10 * 60 * 1000L;
+ } else {
+ // Any other failure clears the decommission clock.
+ decommissionTime = -1;
+ if (code == SubCode.WAITING_SLOT && type != Type.BALANCE) {
+ // Non-balance tablets waiting for a slot: limit is 30 * 1000 divided by
+ // SCHEDULE_INTERVAL_MS failures (presumably ~30 seconds of schedule
+ // rounds - confirm the constant's unit in TabletScheduler).
+ return failedSchedCounter > 30 * 1000 / TabletScheduler.SCHEDULE_INTERVAL_MS;
+ } else {
+ // Everything else (including balance tasks): more than 10 failures.
+ return failedSchedCounter > 10;
+ }
+ }
+ }
public void setLastSchedTime(long lastSchedTime) {
@@ -311,6 +321,10 @@ public class TabletSchedCtx implements Comparable<TabletSchedCtx> {
this.finishedTime = finishedTime;
}
+ public void setDecommissionTime(long decommissionTime) {
+ this.decommissionTime = decommissionTime;
+ }
+
public State getState() {
return state;
}
@@ -615,7 +629,8 @@ public class TabletSchedCtx implements Comparable<TabletSchedCtx> {
setSrc(srcReplica);
return;
}
- throw new SchedException(Status.SCHEDULE_FAILED, "unable to find source slot");
+ throw new SchedException(Status.SCHEDULE_FAILED, SubCode.WAITING_SLOT,
+ "unable to find source slot");
}
/*
@@ -641,7 +656,7 @@ public class TabletSchedCtx implements Comparable<TabletSchedCtx> {
*/
public void chooseDestReplicaForVersionIncomplete(Map<Long, PathSlot> backendsWorkingSlots)
throws SchedException {
- Replica chosenReplica = null;
+ List<Replica> candidates = Lists.newArrayList();
for (Replica replica : tablet.getReplicas()) {
if (replica.isBad()) {
LOG.debug("replica {} is bad, skip. tablet: {}",
@@ -660,18 +675,37 @@ public class TabletSchedCtx implements Comparable<TabletSchedCtx> {
// if the replica's state is DECOMMISSION, it may be chose as dest replica,
// and its state will be set to NORMAL later.
if (replica.getLastFailedVersion() <= 0
- && ((replica.getVersion() == visibleVersion)
- || replica.getVersion() > visibleVersion) && replica.getState() != ReplicaState.DECOMMISSION) {
+ && replica.getVersion() >= visibleVersion
+ && replica.getState() != ReplicaState.DECOMMISSION) {
// skip healthy replica
LOG.debug("replica {} version {} is healthy, visible version {}, replica state {}, skip. tablet: {}",
replica.getId(), replica.getVersion(), visibleVersion, replica.getState(), tabletId);
continue;
}
+ candidates.add(replica);
+ }
+
+ if (candidates.isEmpty()) {
+ throw new SchedException(Status.UNRECOVERABLE, "unable to choose dest replica");
+ }
+
+ Replica chosenReplica = null;
+ for (Replica replica : candidates) {
+ PathSlot slot = backendsWorkingSlots.get(replica.getBackendId());
+ if (slot == null || !slot.hasAvailableSlot(replica.getPathHash())) {
+ if (!replica.needFurtherRepair()) {
+ throw new SchedException(Status.SCHEDULE_FAILED, SubCode.WAITING_SLOT,
+ "replica " + replica + " has not slot");
+ }
+
+ continue;
+ }
+
if (replica.needFurtherRepair()) {
+ chosenReplica = replica;
LOG.debug("replica {} need further repair, choose it. tablet: {}",
replica.getId(), tabletId);
- chosenReplica = replica;
break;
}
@@ -686,20 +720,19 @@ public class TabletSchedCtx implements Comparable<TabletSchedCtx> {
}
}
- if (chosenReplica == null) {
- throw new SchedException(Status.SCHEDULE_FAILED, "unable to choose dest replica");
- }
-
// check if the dest replica has available slot
+ // it should not happen cause it just check hasAvailableSlot yet.
PathSlot slot = backendsWorkingSlots.get(chosenReplica.getBackendId());
if (slot == null) {
- throw new SchedException(Status.SCHEDULE_FAILED, "backend of dest replica is missing");
+ throw new SchedException(Status.SCHEDULE_FAILED, SubCode.WAITING_SLOT,
+ "backend of dest replica is missing");
}
-
long destPathHash = slot.takeSlot(chosenReplica.getPathHash());
if (destPathHash == -1) {
- throw new SchedException(Status.SCHEDULE_FAILED, "unable to take slot of dest path");
+ throw new SchedException(Status.SCHEDULE_FAILED, SubCode.WAITING_SLOT,
+ "unable to take slot of dest path");
}
+
if (chosenReplica.getState() == ReplicaState.DECOMMISSION) {
// Since this replica is selected as the repair object of VERSION_INCOMPLETE,
// it means that this replica needs to be able to accept loading data.
@@ -717,6 +750,7 @@ public class TabletSchedCtx implements Comparable<TabletSchedCtx> {
// forever, because the replica in the DECOMMISSION state will not receive the load task.
chosenReplica.setWatermarkTxnId(-1);
chosenReplica.setState(ReplicaState.NORMAL);
+ setDecommissionTime(-1);
LOG.info("choose replica {} on backend {} of tablet {} as dest replica for version incomplete,"
+ " and change state from DECOMMISSION to NORMAL",
chosenReplica.getId(), chosenReplica.getBackendId(), tabletId);
@@ -941,7 +975,7 @@ public class TabletSchedCtx implements Comparable<TabletSchedCtx> {
cloneTask.getDbId(), cloneTask.getTableId(), cloneTask.getPartitionId(),
cloneTask.getIndexId(), cloneTask.getTabletId(), cloneTask.getBackendId(),
dbId, tblId, partitionId, indexId, tablet.getId(), destBackendId);
- throw new SchedException(Status.RUNNING_FAILED, msg);
+ throw new SchedException(Status.UNRECOVERABLE, msg);
}
// 1. check the tablet status first
@@ -1041,13 +1075,6 @@ public class TabletSchedCtx implements Comparable<TabletSchedCtx> {
state = State.FINISHED;
LOG.info("clone finished: {}", this);
- } catch (SchedException e) {
- // if failed to too many times, remove this task
- ++failedRunningCounter;
- if (failedRunningCounter > RUNNING_FAILED_COUNTER_THRESHOLD) {
- throw new SchedException(Status.UNRECOVERABLE, e.getMessage());
- }
- throw e;
} finally {
olapTable.writeUnlock();
}
@@ -1061,73 +1088,6 @@ public class TabletSchedCtx implements Comparable<TabletSchedCtx> {
}
}
- /*
- * we try to adjust the priority based on schedule history
- * 1. If failed counter is larger than FAILED_COUNTER_THRESHOLD, which means this tablet is being scheduled
- * at least FAILED_TIME_THRESHOLD times and all are failed. So we downgrade its priority.
- * Also reset the failedCounter, or it will be downgraded forever.
- *
- * 2. Else, if it has been a long time since last time the tablet being scheduled, we upgrade its
- * priority to let it more available to be scheduled.
- *
- * The time gap between adjustment should be larger than MIN_ADJUST_PRIORITY_INTERVAL_MS, to avoid
- * being downgraded too fast.
- *
- * eg:
- * A tablet has been scheduled for 5 times and all were failed. its priority will be downgraded. And if it is
- * scheduled for 5 times and all are failed again, it will be downgraded again, until to the LOW.
- * And than, because of LOW, this tablet can not be scheduled for a long time, and it will be upgraded
- * to NORMAL, if still not being scheduled, it will be upgraded up to VERY_HIGH.
- *
- * return true if dynamic priority changed
- */
- public boolean adjustPriority(TabletSchedulerStat stat) {
- long currentTime = System.currentTimeMillis();
- if (lastAdjustPrioTime == 0) {
- // skip the first time we adjust this priority
- lastAdjustPrioTime = currentTime;
- return false;
- } else {
- if (currentTime - lastAdjustPrioTime < MIN_ADJUST_PRIORITY_INTERVAL_MS) {
- return false;
- }
- }
-
- boolean isDowngrade = false;
- boolean isUpgrade = false;
-
- if (failedSchedCounter > SCHED_FAILED_COUNTER_THRESHOLD) {
- isDowngrade = true;
- } else {
- long lastTime = lastSchedTime == 0 ? createTime : lastSchedTime;
- if (currentTime - lastTime > MAX_NOT_BEING_SCHEDULED_INTERVAL_MS) {
- isUpgrade = true;
- }
- }
-
- Priority originDynamicPriority = dynamicPriority;
- if (isDowngrade) {
- dynamicPriority = dynamicPriority.adjust(origPriority, false /* downgrade */);
- failedSchedCounter = 0;
- if (originDynamicPriority != dynamicPriority) {
- LOG.debug("downgrade dynamic priority from {} to {}, origin: {}, tablet: {}",
- originDynamicPriority.name(), dynamicPriority.name(), origPriority.name(), tabletId);
- stat.counterTabletPrioDowngraded.incrementAndGet();
- return true;
- }
- } else if (isUpgrade) {
- dynamicPriority = dynamicPriority.adjust(origPriority, true /* upgrade */);
- // no need to set lastSchedTime, lastSchedTime is set each time we schedule this tablet
- if (originDynamicPriority != dynamicPriority) {
- LOG.debug("upgrade dynamic priority from {} to {}, origin: {}, tablet: {}",
- originDynamicPriority.name(), dynamicPriority.name(), origPriority.name(), tabletId);
- stat.counterTabletPrioUpgraded.incrementAndGet();
- return true;
- }
- }
- return false;
- }
-
public boolean isTimeout() {
if (state != TabletSchedCtx.State.RUNNING) {
return false;
@@ -1144,8 +1104,8 @@ public class TabletSchedCtx implements Comparable<TabletSchedCtx> {
result.add(storageMedium == null ? FeConstants.null_string : storageMedium.name());
result.add(tabletStatus == null ? FeConstants.null_string : tabletStatus.name());
result.add(state.name());
- result.add(origPriority.name());
- result.add(dynamicPriority.name());
+ result.add(schedFailedCode.name());
+ result.add(priority.name());
result.add(srcReplica == null ? "-1" : String.valueOf(srcReplica.getBackendId()));
result.add(String.valueOf(srcPathHash));
result.add(String.valueOf(destBackendId));
@@ -1158,7 +1118,6 @@ public class TabletSchedCtx implements Comparable<TabletSchedCtx> {
result.add(copyTimeMs > 0 ? String.valueOf(copySize / copyTimeMs / 1000.0) : FeConstants.null_string);
result.add(String.valueOf(failedSchedCounter));
result.add(String.valueOf(failedRunningCounter));
- result.add(TimeUtils.longToTimeString(lastAdjustPrioTime));
result.add(String.valueOf(visibleVersion));
result.add(String.valueOf(committedVersion));
result.add(Strings.nullToEmpty(errMsg));
@@ -1171,19 +1130,23 @@ public class TabletSchedCtx implements Comparable<TabletSchedCtx> {
*/
@Override
public int compareTo(TabletSchedCtx o) {
- if (dynamicPriority.ordinal() < o.dynamicPriority.ordinal()) {
- return 1;
- } else if (dynamicPriority.ordinal() > o.dynamicPriority.ordinal()) {
- return -1;
- } else {
- if (lastVisitedTime < o.lastVisitedTime) {
- return -1;
- } else if (lastVisitedTime > o.lastVisitedTime) {
- return 1;
- } else {
- return 0;
- }
+ return Long.compare(getCompareValue(), o.getCompareValue());
+ }
+
+ private long getCompareValue() {
+ long value = createTime;
+ if (lastVisitedTime > 0) {
+ value = lastVisitedTime;
}
+
+ value += (Priority.VERY_HIGH.ordinal() - priority.ordinal() + 1) * 60 * 1000L;
+ value += 5000L * (failedSchedCounter / 10);
+
+ if (type == Type.BALANCE) {
+ value += 30 * 60 * 1000L;
+ }
+
+ return value;
}
@Override
@@ -1227,6 +1190,7 @@ public class TabletSchedCtx implements Comparable<TabletSchedCtx> {
* call this when releaseTabletCtx()
*/
public void resetReplicaState() {
+ setDecommissionTime(-1);
if (tablet != null) {
for (Replica replica : tablet.getReplicas()) {
// To address issue: https://github.com/apache/doris/issues/9422
@@ -1243,5 +1207,4 @@ public class TabletSchedCtx implements Comparable<TabletSchedCtx> {
}
}
}
-
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/clone/TabletScheduler.java b/fe/fe-core/src/main/java/org/apache/doris/clone/TabletScheduler.java
index dbf3d46237..aaaac36a24 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/clone/TabletScheduler.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/clone/TabletScheduler.java
@@ -32,10 +32,12 @@ import org.apache.doris.catalog.Partition;
import org.apache.doris.catalog.Partition.PartitionState;
import org.apache.doris.catalog.Replica;
import org.apache.doris.catalog.Replica.ReplicaState;
+import org.apache.doris.catalog.ReplicaAllocation;
import org.apache.doris.catalog.Tablet;
import org.apache.doris.catalog.Tablet.TabletStatus;
import org.apache.doris.catalog.TabletInvertedIndex;
import org.apache.doris.clone.SchedException.Status;
+import org.apache.doris.clone.SchedException.SubCode;
import org.apache.doris.clone.TabletSchedCtx.Priority;
import org.apache.doris.clone.TabletSchedCtx.Type;
import org.apache.doris.common.AnalysisException;
@@ -59,6 +61,7 @@ import org.apache.doris.thrift.TStatusCode;
import org.apache.doris.transaction.DatabaseTransactionMgr;
import org.apache.doris.transaction.TransactionState;
+import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
import com.google.common.collect.EvictingQueue;
import com.google.common.collect.ImmutableMap;
@@ -100,21 +103,21 @@ public class TabletScheduler extends MasterDaemon {
// the minimum interval of updating cluster statistics and priority of tablet info
private static final long STAT_UPDATE_INTERVAL_MS = 20 * 1000; // 20s
- private static final long SCHEDULE_INTERVAL_MS = 1000; // 1s
+ public static final long SCHEDULE_INTERVAL_MS = 100;
/*
- * Tablet is added to pendingTablets as well it's id in allTabletIds.
- * TabletScheduler will take tablet from pendingTablets but will not remove it's id from allTabletIds when
+ * Tablet is added to pendingTablets as well as its id in allTabletTypes.
+ * TabletScheduler will take tablet from pendingTablets but will not remove its id from allTabletTypes when
* handling a tablet.
* Tablet' id can only be removed after the clone task or migration task is done(timeout, cancelled or finished).
- * So if a tablet's id is still in allTabletIds, TabletChecker can not add tablet to TabletScheduler.
+ * So if a tablet's id is still in allTabletTypes, TabletChecker can not add tablet to TabletScheduler.
*
- * pendingTablets + runningTablets = allTabletIds
+ * pendingTablets + runningTablets = allTabletTypes
*
- * pendingTablets, allTabletIds, runningTablets and schedHistory are protected by 'synchronized'
+ * pendingTablets, allTabletTypes, runningTablets and schedHistory are protected by 'synchronized'
*/
private PriorityQueue<TabletSchedCtx> pendingTablets = new PriorityQueue<>();
- private Set<Long> allTabletIds = Sets.newHashSet();
+ private Map<Long, TabletSchedCtx.Type> allTabletTypes = Maps.newHashMap();
// contains all tabletCtxs which state are RUNNING
private Map<Long, TabletSchedCtx> runningTablets = Maps.newHashMap();
// save the latest 1000 scheduled tablet info
@@ -128,6 +131,8 @@ public class TabletScheduler extends MasterDaemon {
private long lastSlotAdjustTime = 0;
+ private long lastCheckTimeoutTime = 0;
+
private Env env;
private SystemInfoService infoService;
private TabletInvertedIndex invertedIndex;
@@ -175,7 +180,8 @@ public class TabletScheduler extends MasterDaemon {
// when upgrading, backend may not get path info yet. so return false and wait for next round.
// and we should check if backend is alive. If backend is dead when upgrading, this backend
// will never report its path hash, and tablet scheduler is blocked.
- LOG.info("not all backends have path info");
+ LOG.info("backend {}:{} with id {} doesn't have path info.", backend.getHost(),
+ backend.getBePort(), backend.getId());
return false;
}
}
@@ -204,7 +210,7 @@ public class TabletScheduler extends MasterDaemon {
if (!backendsWorkingSlots.containsKey(be.getId())) {
List<Long> pathHashes = be.getDisks().values().stream()
.map(DiskInfo::getPathHash).collect(Collectors.toList());
- PathSlot slot = new PathSlot(pathHashes, Config.schedule_slot_num_per_path);
+ PathSlot slot = new PathSlot(pathHashes, be.getId());
backendsWorkingSlots.put(be.getId(), slot);
LOG.info("add new backend {} with slots num: {}", be.getId(), be.getDisks().size());
}
@@ -225,7 +231,18 @@ public class TabletScheduler extends MasterDaemon {
if (!force && Config.disable_tablet_scheduler) {
return AddResult.DISABLED;
}
- if (!force && containsTablet(tablet.getTabletId())) {
+
+ // REPAIR has higher priority than BALANCE.
+ // Suppose a BALANCE tablet has been added successfully; adding this tablet's REPAIR ctx will then fail.
+ // But we still set allTabletTypes[tabletId] to REPAIR. Later, at the beginning of scheduling this tablet,
+ // its type is reset to allTabletTypes[tabletId], so it is converted to REPAIR.
+
+ long tabletId = tablet.getTabletId();
+ boolean contains = allTabletTypes.containsKey(tabletId);
+ if (contains && !force) {
+ if (tablet.getType() == TabletSchedCtx.Type.REPAIR) {
+ allTabletTypes.put(tabletId, TabletSchedCtx.Type.REPAIR);
+ }
return AddResult.ALREADY_IN;
}
@@ -238,13 +255,22 @@ public class TabletScheduler extends MasterDaemon {
return AddResult.LIMIT_EXCEED;
}
- allTabletIds.add(tablet.getTabletId());
+ if (!contains || tablet.getType() == TabletSchedCtx.Type.REPAIR) {
+ allTabletTypes.put(tabletId, tablet.getType());
+ }
+
pendingTablets.offer(tablet);
+ if (!contains) {
+ LOG.info("Add tablet to pending queue, tablet id {}, type {}, status {}, priority {}",
+ tablet.getTabletId(), tablet.getType(), tablet.getTabletStatus(),
+ tablet.getPriority());
+ }
+
return AddResult.ADDED;
}
public synchronized boolean containsTablet(long tabletId) {
- return allTabletIds.contains(tabletId);
+ return allTabletTypes.containsKey(tabletId);
}
public synchronized void rebalanceDisk(AdminRebalanceDiskStmt stmt) {
@@ -263,7 +289,7 @@ public class TabletScheduler extends MasterDaemon {
for (TabletSchedCtx tabletCtx : pendingTablets) {
if (tabletCtx.getDbId() == dbId && tabletCtx.getTblId() == tblId
&& partitionIds.contains(tabletCtx.getPartitionId())) {
- tabletCtx.setOrigPriority(Priority.VERY_HIGH);
+ tabletCtx.setPriority(Priority.VERY_HIGH);
}
newPendingTablets.add(tabletCtx);
}
@@ -293,14 +319,15 @@ public class TabletScheduler extends MasterDaemon {
return;
}
- updateLoadStatisticsAndPriorityIfNecessary();
+ if (System.currentTimeMillis() - lastCheckTimeoutTime >= 1000L) {
+ updateLoadStatisticsAndPriorityIfNecessary();
+ handleRunningTablets();
+ selectTabletsForBalance();
+ lastCheckTimeoutTime = System.currentTimeMillis();
+ }
schedulePendingTablets();
- handleRunningTablets();
-
- selectTabletsForBalance();
-
stat.counterTabletScheduleRound.incrementAndGet();
}
@@ -314,8 +341,6 @@ public class TabletScheduler extends MasterDaemon {
rebalancer.updateLoadStatistic(statisticMap);
diskRebalancer.updateLoadStatistic(statisticMap);
- adjustPriorities();
-
lastStatUpdateTime = System.currentTimeMillis();
}
@@ -332,7 +357,7 @@ public class TabletScheduler extends MasterDaemon {
LoadStatisticForTag loadStatistic = new LoadStatisticForTag(tag, infoService, invertedIndex);
loadStatistic.init();
newStatisticMap.put(tag, loadStatistic);
- LOG.debug("update load statistic:\n{}", loadStatistic.getBrief());
+ LOG.debug("update load statistic for tag {}:\n{}", tag, loadStatistic.getBrief());
}
this.statisticMap = newStatisticMap;
@@ -342,28 +367,6 @@ public class TabletScheduler extends MasterDaemon {
return statisticMap;
}
- /**
- * adjust priorities of all tablet infos
- */
- private synchronized void adjustPriorities() {
- int size = pendingTablets.size();
- int changedNum = 0;
- TabletSchedCtx tabletCtx;
- for (int i = 0; i < size; i++) {
- tabletCtx = pendingTablets.poll();
- if (tabletCtx == null) {
- break;
- }
-
- if (tabletCtx.adjustPriority(stat)) {
- changedNum++;
- }
- pendingTablets.add(tabletCtx);
- }
-
- LOG.debug("adjust priority for all tablets. changed: {}, total: {}", changedNum, size);
- }
-
/**
* get at most BATCH_NUM tablets from queue, and try to schedule them.
* After handle, the tablet info should be
@@ -371,7 +374,7 @@ public class TabletScheduler extends MasterDaemon {
* 2. or in schedHistory with state CANCELLING, if some unrecoverable error happens.
* 3. or in pendingTablets with state PENDING, if failed to be scheduled.
*
- * if in schedHistory, it should be removed from allTabletIds.
+ * if in schedHistory, it should be removed from allTabletTypes.
*/
private void schedulePendingTablets() {
long start = System.currentTimeMillis();
@@ -385,36 +388,25 @@ public class TabletScheduler extends MasterDaemon {
// do not schedule more tablet is tablet scheduler is disabled.
throw new SchedException(Status.FINISHED, "tablet scheduler is disabled");
}
+ if (Config.disable_balance && tabletCtx.getType() == Type.BALANCE) {
+ finalizeTabletCtx(tabletCtx, TabletSchedCtx.State.CANCELLED, Status.UNRECOVERABLE,
+ "config disable balance");
+ continue;
+ }
scheduleTablet(tabletCtx, batchTask);
} catch (SchedException e) {
- tabletCtx.increaseFailedSchedCounter();
tabletCtx.setErrMsg(e.getMessage());
-
if (e.getStatus() == Status.SCHEDULE_FAILED) {
- if (tabletCtx.getType() == Type.BALANCE) {
- // if balance is disabled, remove this tablet
- if (Config.disable_balance) {
- finalizeTabletCtx(tabletCtx, TabletSchedCtx.State.CANCELLED, e.getStatus(),
- "disable balance and " + e.getMessage());
- } else {
- // remove the balance task if it fails to be scheduled many times
- if (tabletCtx.getFailedSchedCounter() > 10) {
- finalizeTabletCtx(tabletCtx, TabletSchedCtx.State.CANCELLED, e.getStatus(),
- "schedule failed too many times and " + e.getMessage());
- } else {
- // we must release resource it current hold, and be scheduled again
- tabletCtx.releaseResource(this);
- // adjust priority to avoid some higher priority always be the first in pendingTablets
- stat.counterTabletScheduledFailed.incrementAndGet();
- dynamicAdjustPrioAndAddBackToPendingTablets(tabletCtx, e.getMessage());
- }
- }
+ boolean isExceedLimit = tabletCtx.onSchedFailedAndCheckExceedLimit(e.getSubCode());
+ if (isExceedLimit) {
+ finalizeTabletCtx(tabletCtx, TabletSchedCtx.State.CANCELLED, e.getStatus(),
+ "schedule failed too many times and " + e.getMessage());
} else {
// we must release resource it current hold, and be scheduled again
tabletCtx.releaseResource(this);
// adjust priority to avoid some higher priority always be the first in pendingTablets
stat.counterTabletScheduledFailed.incrementAndGet();
- dynamicAdjustPrioAndAddBackToPendingTablets(tabletCtx, e.getMessage());
+ addBackToPendingTablets(tabletCtx);
}
} else if (e.getStatus() == Status.FINISHED) {
// schedule redundant tablet or scheduler disabled will throw this exception
@@ -485,6 +477,8 @@ public class TabletScheduler extends MasterDaemon {
tbl.writeLockOrException(new SchedException(Status.UNRECOVERABLE, "table "
+ tbl.getName() + " does not exist"));
try {
+ long tabletId = tabletCtx.getTabletId();
+
boolean isColocateTable = colocateTableIndex.isColocateTable(tbl.getId());
OlapTableState tableState = tbl.getState();
@@ -499,8 +493,9 @@ public class TabletScheduler extends MasterDaemon {
throw new SchedException(Status.UNRECOVERABLE, "index does not exist");
}
- Tablet tablet = idx.getTablet(tabletCtx.getTabletId());
+ Tablet tablet = idx.getTablet(tabletId);
Preconditions.checkNotNull(tablet);
+ ReplicaAllocation replicaAlloc = tbl.getPartitionInfo().getReplicaAllocation(partition.getId());
if (isColocateTable) {
GroupId groupId = colocateTableIndex.getGroup(tbl.getId());
@@ -516,18 +511,26 @@ public class TabletScheduler extends MasterDaemon {
Set<Long> backendsSet = colocateTableIndex.getTabletBackendsByGroup(groupId, tabletOrderIdx);
TabletStatus st = tablet.getColocateHealthStatus(
- partition.getVisibleVersion(),
- tbl.getPartitionInfo().getReplicaAllocation(partition.getId()),
- backendsSet);
+ partition.getVisibleVersion(), replicaAlloc, backendsSet);
statusPair = Pair.of(st, Priority.HIGH);
tabletCtx.setColocateGroupBackendIds(backendsSet);
} else {
List<Long> aliveBeIds = infoService.getAllBackendIds(true);
statusPair = tablet.getHealthStatusWithPriority(
- infoService,
- partition.getVisibleVersion(),
- tbl.getPartitionInfo().getReplicaAllocation(partition.getId()),
- aliveBeIds);
+ infoService, partition.getVisibleVersion(), replicaAlloc, aliveBeIds);
+ }
+
+ if (tabletCtx.getType() != allTabletTypes.get(tabletId)) {
+ TabletSchedCtx.Type curType = tabletCtx.getType();
+ TabletSchedCtx.Type newType = allTabletTypes.get(tabletId);
+ if (curType == TabletSchedCtx.Type.BALANCE && newType == TabletSchedCtx.Type.REPAIR) {
+ tabletCtx.setType(newType);
+ tabletCtx.setReplicaAlloc(replicaAlloc);
+ tabletCtx.setTag(null);
+ } else {
+ throw new SchedException(Status.UNRECOVERABLE, "can not convert type of tablet "
+ + tabletId + " from " + curType.name() + " to " + newType.name());
+ }
}
if (tabletCtx.getType() == TabletSchedCtx.Type.BALANCE && tableState != OlapTableState.NORMAL) {
@@ -732,11 +735,11 @@ public class TabletScheduler extends MasterDaemon {
private void handleReplicaVersionIncomplete(TabletSchedCtx tabletCtx, AgentBatchTask batchTask)
throws SchedException {
stat.counterReplicaVersionMissingErr.incrementAndGet();
-
try {
tabletCtx.chooseDestReplicaForVersionIncomplete(backendsWorkingSlots);
} catch (SchedException e) {
- if (e.getMessage().equals("unable to choose dest replica")) {
+ // could not find a dest replica, try to add a missing replica.
+ if (e.getStatus() == Status.UNRECOVERABLE) {
// This situation may occur when the BE nodes
// where all replicas of a tablet are located are decommission,
// and this task is a VERSION_INCOMPLETE task.
@@ -779,25 +782,7 @@ public class TabletScheduler extends MasterDaemon {
private void handleReplicaRelocating(TabletSchedCtx tabletCtx, AgentBatchTask batchTask)
throws SchedException {
stat.counterReplicaUnavailableErr.incrementAndGet();
- try {
- handleReplicaVersionIncomplete(tabletCtx, batchTask);
- LOG.debug("succeed to find version incomplete replica from tablet relocating. tablet id: {}",
- tabletCtx.getTabletId());
- } catch (SchedException e) {
- if (e.getStatus() == Status.SCHEDULE_FAILED) {
- LOG.debug("failed to find version incomplete replica from tablet relocating. tablet id: {}, "
- + "try to find a new backend", tabletCtx.getTabletId());
- // the dest or src slot may be taken after calling handleReplicaVersionIncomplete(),
- // so we need to release these slots first.
- // and reserve the tablet in TabletSchedCtx so that it can continue to be scheduled.
- tabletCtx.releaseResource(this, true);
- tabletCtx.setTabletStatus(TabletStatus.REPLICA_MISSING);
- handleReplicaMissing(tabletCtx, batchTask);
- LOG.debug("succeed to find new backend for tablet relocating. tablet id: {}", tabletCtx.getTabletId());
- } else {
- throw e;
- }
- }
+ handleReplicaVersionIncomplete(tabletCtx, batchTask);
}
/**
@@ -831,7 +816,7 @@ public class TabletScheduler extends MasterDaemon {
// to remove this tablet from the pendingTablets(consider it as finished)
throw new SchedException(Status.FINISHED, "redundant replica is deleted");
}
- throw new SchedException(Status.SCHEDULE_FAILED, "unable to delete any redundant replicas");
+ throw new SchedException(Status.UNRECOVERABLE, "unable to delete any redundant replicas");
}
private boolean deleteBackendDropped(TabletSchedCtx tabletCtx, boolean force) throws SchedException {
@@ -1035,7 +1020,7 @@ public class TabletScheduler extends MasterDaemon {
deleteReplicaInternal(tabletCtx, replica, "colocate redundant", false);
throw new SchedException(Status.FINISHED, "colocate redundant replica is deleted");
}
- throw new SchedException(Status.SCHEDULE_FAILED, "unable to delete any colocate redundant replicas");
+ throw new SchedException(Status.UNRECOVERABLE, "unable to delete any colocate redundant replicas");
}
/**
@@ -1102,16 +1087,17 @@ public class TabletScheduler extends MasterDaemon {
replica.setWatermarkTxnId(nextTxnId);
replica.setState(ReplicaState.DECOMMISSION);
// set priority to normal because it may wait for a long time. Remain it as VERY_HIGH may block other task.
- tabletCtx.setOrigPriority(Priority.NORMAL);
+ tabletCtx.setPriority(Priority.NORMAL);
LOG.info("set replica {} on backend {} of tablet {} state to DECOMMISSION due to reason {}",
replica.getId(), replica.getBackendId(), tabletCtx.getTabletId(), reason);
- throw new SchedException(Status.SCHEDULE_FAILED, "set watermark txn " + nextTxnId);
+ throw new SchedException(Status.SCHEDULE_FAILED, SubCode.WAITING_DECOMMISSION,
+ "set watermark txn " + nextTxnId);
} else if (replica.getState() == ReplicaState.DECOMMISSION && replica.getWatermarkTxnId() != -1) {
long watermarkTxnId = replica.getWatermarkTxnId();
try {
if (!Env.getCurrentGlobalTransactionMgr().isPreviousTransactionsFinished(watermarkTxnId,
tabletCtx.getDbId(), Lists.newArrayList(tabletCtx.getTblId()))) {
- throw new SchedException(Status.SCHEDULE_FAILED,
+ throw new SchedException(Status.SCHEDULE_FAILED, SubCode.WAITING_DECOMMISSION,
"wait txn before " + watermarkTxnId + " to be finished");
}
} catch (AnalysisException e) {
@@ -1225,7 +1211,7 @@ public class TabletScheduler extends MasterDaemon {
}
for (TabletSchedCtx tabletCtx : diskBalanceTablets) {
// add if task from prio backend or cluster is balanced
- if (alternativeTablets.isEmpty() || tabletCtx.getOrigPriority() == TabletSchedCtx.Priority.NORMAL) {
+ if (alternativeTablets.isEmpty() || tabletCtx.getPriority() == TabletSchedCtx.Priority.NORMAL) {
addTablet(tabletCtx, false);
}
}
@@ -1260,7 +1246,8 @@ public class TabletScheduler extends MasterDaemon {
LoadStatisticForTag statistic = statisticMap.get(tag);
if (statistic == null) {
throw new SchedException(Status.UNRECOVERABLE,
- String.format("tag %s does not exist.", tag));
+ String.format("tag %s does not exist. available tags: %s", tag,
+ Joiner.on(",").join(statisticMap.keySet().stream().limit(5).toArray())));
}
beStatistics = statistic.getSortedBeLoadStats(null /* sorted ignore medium */);
} else {
@@ -1336,7 +1323,7 @@ public class TabletScheduler extends MasterDaemon {
}
if (allFitPaths.isEmpty()) {
- throw new SchedException(Status.SCHEDULE_FAILED, "unable to find dest path for new replica");
+ throw new SchedException(Status.UNRECOVERABLE, "unable to find dest path for new replica");
}
// all fit paths has already been sorted by load score in 'allFitPaths' in ascend order.
@@ -1390,25 +1377,109 @@ public class TabletScheduler extends MasterDaemon {
return rootPathLoadStatistic;
}
- throw new SchedException(Status.SCHEDULE_FAILED, "unable to find dest path which can be fit in");
+ throw new SchedException(Status.UNRECOVERABLE, "unable to find dest path which can be fit in");
}
- /**
- * For some reason, a tablet info failed to be scheduled this time,
- * So we dynamically change its priority and add back to queue, waiting for next round.
- */
- private void dynamicAdjustPrioAndAddBackToPendingTablets(TabletSchedCtx tabletCtx, String message) {
+
+ private void addBackToPendingTablets(TabletSchedCtx tabletCtx) {
Preconditions.checkState(tabletCtx.getState() == TabletSchedCtx.State.PENDING);
- tabletCtx.adjustPriority(stat);
addTablet(tabletCtx, true /* force */);
}
+
private void finalizeTabletCtx(TabletSchedCtx tabletCtx, TabletSchedCtx.State state, Status status, String reason) {
// use 2 steps to avoid nested database lock and synchronized.(releaseTabletCtx() may hold db lock)
// remove the tablet ctx, so that no other process can see it
removeTabletCtx(tabletCtx, reason);
// release resources taken by tablet ctx
releaseTabletCtx(tabletCtx, state, status == Status.UNRECOVERABLE);
+
+ // check again immediately, so there is no need to wait for TabletChecker's 20s
+ if (state == TabletSchedCtx.State.FINISHED) {
+ tryAddAfterFinished(tabletCtx);
+ }
+ }
+
+ private void tryAddAfterFinished(TabletSchedCtx tabletCtx) {
+ int finishedCounter = tabletCtx.getFinishedCounter();
+ finishedCounter++;
+ tabletCtx.setFinishedCounter(finishedCounter);
+ if (finishedCounter >= TabletSchedCtx.FINISHED_COUNTER_THRESHOLD) {
+ return;
+ }
+
+ Database db = Env.getCurrentInternalCatalog().getDbNullable(tabletCtx.getDbId());
+ if (db == null) {
+ return;
+ }
+ OlapTable tbl = (OlapTable) db.getTableNullable(tabletCtx.getTblId());
+ if (tbl == null) {
+ return;
+ }
+ Pair<TabletStatus, TabletSchedCtx.Priority> statusPair;
+ ReplicaAllocation replicaAlloc = null;
+ tbl.readLock();
+ try {
+ Partition partition = tbl.getPartition(tabletCtx.getPartitionId());
+ if (partition == null) {
+ return;
+ }
+
+ MaterializedIndex idx = partition.getIndex(tabletCtx.getIndexId());
+ if (idx == null) {
+ return;
+ }
+
+ Tablet tablet = idx.getTablet(tabletCtx.getTabletId());
+ if (tablet == null) {
+ return;
+ }
+
+ replicaAlloc = tbl.getPartitionInfo().getReplicaAllocation(partition.getId());
+ boolean isColocateTable = colocateTableIndex.isColocateTable(tbl.getId());
+ if (isColocateTable) {
+ GroupId groupId = colocateTableIndex.getGroup(tbl.getId());
+ if (groupId == null) {
+ return;
+ }
+
+ int tabletOrderIdx = tabletCtx.getTabletOrderIdx();
+ if (tabletOrderIdx == -1) {
+ tabletOrderIdx = idx.getTabletOrderIdx(tablet.getId());
+ }
+ Preconditions.checkState(tabletOrderIdx != -1);
+
+ Set<Long> backendsSet = colocateTableIndex.getTabletBackendsByGroup(groupId, tabletOrderIdx);
+ TabletStatus st = tablet.getColocateHealthStatus(
+ partition.getVisibleVersion(), replicaAlloc, backendsSet);
+ statusPair = Pair.of(st, Priority.HIGH);
+ } else {
+ List<Long> aliveBeIds = infoService.getAllBackendIds(true);
+ statusPair = tablet.getHealthStatusWithPriority(
+ infoService, partition.getVisibleVersion(), replicaAlloc, aliveBeIds);
+
+ if (statusPair.second.ordinal() < tabletCtx.getPriority().ordinal()) {
+ statusPair.second = tabletCtx.getPriority();
+ }
+ }
+ } finally {
+ tbl.readUnlock();
+ }
+
+ if (statusPair.first == TabletStatus.HEALTHY) {
+ return;
+ }
+
+ TabletSchedCtx newTabletCtx = new TabletSchedCtx(
+ TabletSchedCtx.Type.REPAIR, tabletCtx.getDbId(), tabletCtx.getTblId(),
+ tabletCtx.getPartitionId(), tabletCtx.getIndexId(), tabletCtx.getTabletId(),
+ replicaAlloc, System.currentTimeMillis());
+
+ newTabletCtx.setTabletStatus(statusPair.first);
+ newTabletCtx.setPriority(statusPair.second);
+ newTabletCtx.setFinishedCounter(finishedCounter);
+
+ addTablet(newTabletCtx, false);
}
private void releaseTabletCtx(TabletSchedCtx tabletCtx, TabletSchedCtx.State state, boolean resetReplicaState) {
@@ -1422,7 +1493,7 @@ public class TabletScheduler extends MasterDaemon {
private synchronized void removeTabletCtx(TabletSchedCtx tabletCtx, String reason) {
runningTablets.remove(tabletCtx.getTabletId());
- allTabletIds.remove(tabletCtx.getTabletId());
+ allTabletTypes.remove(tabletCtx.getTabletId());
schedHistory.add(tabletCtx);
LOG.info("remove the tablet {}. because: {}", tabletCtx.getTabletId(), reason);
}
@@ -1430,15 +1501,27 @@ public class TabletScheduler extends MasterDaemon {
// get next batch of tablets from queue.
private synchronized List<TabletSchedCtx> getNextTabletCtxBatch() {
List<TabletSchedCtx> list = Lists.newArrayList();
- int count = Math.min(MIN_BATCH_NUM, getCurrentAvailableSlotNum());
- while (count > 0) {
+ int slotNum = getCurrentAvailableSlotNum();
+ // Make slotNum >= 1 to ensure that it could return at least 1 ctx
+ // when the pending list is not empty.
+ if (slotNum < 1) {
+ slotNum = 1;
+ }
+ while (list.size() < Config.schedule_batch_size && slotNum > 0) {
TabletSchedCtx tablet = pendingTablets.poll();
if (tablet == null) {
// no more tablets
break;
}
list.add(tablet);
- count--;
+ TabletStatus status = tablet.getTabletStatus();
+ // for a clone, it will take 2 slots: src slot and dst slot.
+ if (!(status == TabletStatus.REDUNDANT
+ || status == TabletStatus.FORCE_REDUNDANT
+ || status == TabletStatus.COLOCATE_REDUNDANT
+ || status == TabletStatus.REPLICA_COMPACTION_TOO_SLOW)) {
+ slotNum -= 2;
+ }
}
return list;
}
@@ -1469,9 +1552,12 @@ public class TabletScheduler extends MasterDaemon {
// if we have a success task, then stat must be refreshed before schedule a new task
updateDiskBalanceLastSuccTime(tabletCtx.getSrcBackendId(), tabletCtx.getSrcPathHash());
updateDiskBalanceLastSuccTime(tabletCtx.getDestBackendId(), tabletCtx.getDestPathHash());
+ finalizeTabletCtx(tabletCtx, TabletSchedCtx.State.FINISHED, Status.FINISHED, "finished");
+ } else {
+ finalizeTabletCtx(tabletCtx, TabletSchedCtx.State.CANCELLED, Status.UNRECOVERABLE,
+ request.getTaskStatus().getErrorMsgs().get(0));
}
- // we need this function to free slot for this migration task
- finalizeTabletCtx(tabletCtx, TabletSchedCtx.State.FINISHED, Status.FINISHED, "finished");
+
return true;
}
@@ -1496,12 +1582,20 @@ public class TabletScheduler extends MasterDaemon {
try {
tabletCtx.finishCloneTask(cloneTask, request);
} catch (SchedException e) {
- tabletCtx.increaseFailedRunningCounter();
tabletCtx.setErrMsg(e.getMessage());
if (e.getStatus() == Status.RUNNING_FAILED) {
- stat.counterCloneTaskFailed.incrementAndGet();
- addToRunningTablets(tabletCtx);
- return false;
+ tabletCtx.increaseFailedRunningCounter();
+ if (!tabletCtx.isExceedFailedRunningLimit()) {
+ stat.counterCloneTaskFailed.incrementAndGet();
+ addToRunningTablets(tabletCtx);
+ return false;
+ } else {
+ // unrecoverable
+ stat.counterTabletScheduledDiscard.incrementAndGet();
+ finalizeTabletCtx(tabletCtx, TabletSchedCtx.State.CANCELLED, Status.UNRECOVERABLE,
+ e.getMessage());
+ return true;
+ }
} else if (e.getStatus() == Status.UNRECOVERABLE) {
// unrecoverable
stat.counterTabletScheduledDiscard.incrementAndGet();
@@ -1639,7 +1733,7 @@ public class TabletScheduler extends MasterDaemon {
}
public synchronized int getTotalNum() {
- return allTabletIds.size();
+ return allTabletTypes.size();
}
public synchronized long getBalanceTabletsNumber() {
@@ -1655,10 +1749,12 @@ public class TabletScheduler extends MasterDaemon {
public static class PathSlot {
// path hash -> slot num
private Map<Long, Slot> pathSlots = Maps.newConcurrentMap();
+ private long beId;
- public PathSlot(List<Long> paths, int initSlotNum) {
+ public PathSlot(List<Long> paths, long beId) {
+ this.beId = beId;
for (Long pathHash : paths) {
- pathSlots.put(pathHash, new Slot(initSlotNum));
+ pathSlots.put(pathHash, new Slot(beId));
}
}
@@ -1670,22 +1766,8 @@ public class TabletScheduler extends MasterDaemon {
// add new path
for (Long pathHash : paths) {
if (!pathSlots.containsKey(pathHash)) {
- pathSlots.put(pathHash, new Slot(Config.schedule_slot_num_per_path));
- }
- }
- }
-
- // Update the total slots num of specified paths, increase or decrease
- public synchronized void updateSlot(List<Long> pathHashs, int delta) {
- for (Long pathHash : pathHashs) {
- Slot slot = pathSlots.get(pathHash);
- if (slot == null) {
- continue;
+ pathSlots.put(pathHash, new Slot(beId));
}
-
- slot.total += delta;
- slot.rectify();
- LOG.debug("decrease path {} slots num to {}", pathHash, pathSlots.get(pathHash).total);
}
}
@@ -1701,6 +1783,21 @@ public class TabletScheduler extends MasterDaemon {
slot.totalCopyTimeMs += copyTimeMs;
}
+ public synchronized boolean hasAvailableSlot(long pathHash) {
+ if (pathHash == -1) {
+ return false;
+ }
+
+ Slot slot = pathSlots.get(pathHash);
+ if (slot == null) {
+ return false;
+ }
+ if (slot.getAvailable() == 0) {
+ return false;
+ }
+ return true;
+ }
+
/**
* If the specified 'pathHash' has available slot, decrease the slot number and return this path hash
*/
@@ -1709,7 +1806,8 @@ public class TabletScheduler extends MasterDaemon {
if (LOG.isDebugEnabled()) {
LOG.debug("path hash is not set.", new Exception());
}
- throw new SchedException(Status.SCHEDULE_FAILED, "path hash is not set");
+ throw new SchedException(Status.SCHEDULE_FAILED, SubCode.WAITING_SLOT,
+ "path hash is not set");
}
Slot slot = pathSlots.get(pathHash);
@@ -1717,12 +1815,11 @@ public class TabletScheduler extends MasterDaemon {
LOG.debug("path {} is not exist", pathHash);
return -1;
}
- slot.rectify();
- if (slot.available <= 0) {
+ if (slot.used >= slot.getTotal()) {
LOG.debug("path {} has no available slot", pathHash);
return -1;
}
- slot.available--;
+ slot.used++;
return pathHash;
}
@@ -1731,23 +1828,15 @@ public class TabletScheduler extends MasterDaemon {
if (slot == null) {
return;
}
- slot.available++;
- slot.rectify();
- }
-
- public synchronized int peekSlot(long pathHash) {
- Slot slot = pathSlots.get(pathHash);
- if (slot == null) {
- return -1;
+ if (slot.used > 0) {
+ slot.used--;
}
- slot.rectify();
- return slot.available;
}
public synchronized int getTotalAvailSlotNum() {
int total = 0;
for (Slot slot : pathSlots.values()) {
- total += slot.available;
+ total += slot.getAvailable();
}
return total;
}
@@ -1758,7 +1847,7 @@ public class TabletScheduler extends MasterDaemon {
public synchronized Set<Long> getAvailPathsForBalance() {
Set<Long> pathHashs = Sets.newHashSet();
for (Map.Entry<Long, Slot> entry : pathSlots.entrySet()) {
- if (entry.getValue().balanceSlot > 0) {
+ if (entry.getValue().getBalanceAvailable() > 0) {
pathHashs.add(entry.getKey());
}
}
@@ -1768,7 +1857,7 @@ public class TabletScheduler extends MasterDaemon {
public synchronized int getAvailBalanceSlotNum() {
int num = 0;
for (Map.Entry<Long, Slot> entry : pathSlots.entrySet()) {
- num += entry.getValue().balanceSlot;
+ num += entry.getValue().getBalanceAvailable();
}
return num;
}
@@ -1776,13 +1865,12 @@ public class TabletScheduler extends MasterDaemon {
public synchronized List<List<String>> getSlotInfo(long beId) {
List<List<String>> results = Lists.newArrayList();
pathSlots.forEach((key, value) -> {
- value.rectify();
List<String> result = Lists.newArrayList();
result.add(String.valueOf(beId));
result.add(String.valueOf(key));
- result.add(String.valueOf(value.available));
- result.add(String.valueOf(value.total));
- result.add(String.valueOf(value.balanceSlot));
+ result.add(String.valueOf(value.getAvailable()));
+ result.add(String.valueOf(value.getTotal()));
+ result.add(String.valueOf(value.getBalanceAvailable()));
result.add(String.valueOf(value.getAvgRate()));
results.add(result);
});
@@ -1794,8 +1882,8 @@ public class TabletScheduler extends MasterDaemon {
if (slot == null) {
return -1;
}
- if (slot.balanceSlot > 0) {
- slot.balanceSlot--;
+ if (slot.balanceUsed < slot.getBalanceTotal()) {
+ slot.balanceUsed++;
return pathHash;
}
return -1;
@@ -1807,8 +1895,8 @@ public class TabletScheduler extends MasterDaemon {
if (slot == null) {
continue;
}
- if (slot.balanceSlot > 0) {
- slot.balanceSlot--;
+ if (slot.balanceUsed < slot.getBalanceTotal()) {
+ slot.balanceUsed++;
return pathHash;
}
}
@@ -1820,8 +1908,9 @@ public class TabletScheduler extends MasterDaemon {
if (slot == null) {
return;
}
- slot.balanceSlot++;
- slot.rectify();
+ if (slot.balanceUsed > 0) {
+ slot.balanceUsed--;
+ }
}
public synchronized void updateDiskBalanceLastSuccTime(long pathHash) {
@@ -1851,10 +1940,8 @@ public class TabletScheduler extends MasterDaemon {
}
public static class Slot {
- public int total;
- public int available;
- // slot reserved for balance
- public int balanceSlot;
+ public int used;
+ public int balanceUsed;
public long totalCopySize = 0;
public long totalCopyTimeMs = 0;
@@ -1862,23 +1949,35 @@ public class TabletScheduler extends MasterDaemon {
// for disk balance
public long diskBalanceLastSuccTime = 0;
- public Slot(int total) {
- this.total = total;
- this.available = total;
- this.balanceSlot = Config.balance_slot_num_per_path;
+ private long beId;
+
+ public Slot(long beId) {
+ this.beId = beId;
+ this.used = 0;
+ this.balanceUsed = 0;
}
- public void rectify() {
- if (total <= 0) {
- total = 1;
- }
- if (available > total) {
- available = total;
- }
+ public int getAvailable() {
+ return Math.max(0, getTotal() - used);
+ }
+
+ public int getTotal() {
+ int total = Math.max(1, Config.schedule_slot_num_per_path);
- if (balanceSlot > Config.balance_slot_num_per_path) {
- balanceSlot = Config.balance_slot_num_per_path;
+ Backend be = Env.getCurrentSystemInfo().getBackend(beId);
+ if (be != null && be.isDecommissioned()) {
+ total = Math.max(1, Config.schedule_decommission_slot_num_per_path);
}
+
+ return total;
+ }
+
+ public int getBalanceAvailable() {
+ return Math.max(0, getBalanceTotal() - balanceUsed);
+ }
+
+ public int getBalanceTotal() {
+ return Math.max(1, Config.balance_slot_num_per_path);
}
// return avg rate, Bytes/S
diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/proc/TabletSchedulerDetailProcDir.java b/fe/fe-core/src/main/java/org/apache/doris/common/proc/TabletSchedulerDetailProcDir.java
index e2aada6f1f..4441a99431 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/proc/TabletSchedulerDetailProcDir.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/proc/TabletSchedulerDetailProcDir.java
@@ -29,15 +29,15 @@ import com.google.common.collect.Lists;
import java.util.List;
/*
- * show proc "/tablet_scheduler/pending_tablets";
- * show proc "/tablet_scheduler/running_tablets";
- * show proc "/tablet_scheduler/history_tablets";
+ * show proc "/cluster_balance/pending_tablets";
+ * show proc "/cluster_balance/running_tablets";
+ * show proc "/cluster_balance/history_tablets";
*/
public class TabletSchedulerDetailProcDir implements ProcDirInterface {
public static final ImmutableList<String> TITLE_NAMES = new ImmutableList.Builder<String>().add("TabletId")
- .add("Type").add("Medium").add("Status").add("State").add("OrigPrio").add("DynmPrio").add("SrcBe")
+ .add("Type").add("Medium").add("Status").add("State").add("SchedCode").add("Priority").add("SrcBe")
.add("SrcPath").add("DestBe").add("DestPath").add("Timeout").add("Create").add("LstSched").add("LstVisit")
- .add("Finished").add("Rate").add("FailedSched").add("FailedRunning").add("LstAdjPrio").add("VisibleVer")
+ .add("Finished").add("Rate").add("FailedSched").add("FailedRunning").add("VisibleVer")
.add("CmtVer").add("ErrMsg")
.build();
diff --git a/fe/fe-core/src/test/java/org/apache/doris/clone/TabletSchedCtxTest.java b/fe/fe-core/src/test/java/org/apache/doris/clone/TabletSchedCtxTest.java
index 41df080a75..d4578e17d7 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/clone/TabletSchedCtxTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/clone/TabletSchedCtxTest.java
@@ -39,17 +39,17 @@ public class TabletSchedCtxTest {
ReplicaAllocation replicaAlloc = ReplicaAllocation.DEFAULT_ALLOCATION;
TabletSchedCtx ctx1 = new TabletSchedCtx(Type.REPAIR,
1, 2, 3, 4, 1000, replicaAlloc, System.currentTimeMillis());
- ctx1.setOrigPriority(Priority.NORMAL);
+ ctx1.setPriority(Priority.NORMAL);
ctx1.setLastVisitedTime(2);
TabletSchedCtx ctx2 = new TabletSchedCtx(Type.REPAIR,
1, 2, 3, 4, 1001, replicaAlloc, System.currentTimeMillis());
- ctx2.setOrigPriority(Priority.NORMAL);
+ ctx2.setPriority(Priority.NORMAL);
ctx2.setLastVisitedTime(3);
TabletSchedCtx ctx3 = new TabletSchedCtx(Type.REPAIR,
- 1, 2, 3, 4, 1001, replicaAlloc, System.currentTimeMillis());
- ctx3.setOrigPriority(Priority.NORMAL);
+ 1, 2, 3, 4, 1002, replicaAlloc, System.currentTimeMillis());
+ ctx3.setPriority(Priority.NORMAL);
ctx3.setLastVisitedTime(1);
pendingTablets.add(ctx1);
@@ -62,8 +62,8 @@ public class TabletSchedCtxTest {
// priority is not equal, info2 is HIGH, should ranks ahead
pendingTablets.clear();
- ctx1.setOrigPriority(Priority.NORMAL);
- ctx2.setOrigPriority(Priority.HIGH);
+ ctx1.setPriority(Priority.NORMAL);
+ ctx2.setPriority(Priority.HIGH);
ctx1.setLastVisitedTime(2);
ctx2.setLastVisitedTime(2);
pendingTablets.add(ctx2);
diff --git a/fe/fe-core/src/test/java/org/apache/doris/cluster/DecommissionBackendTest.java b/fe/fe-core/src/test/java/org/apache/doris/cluster/DecommissionBackendTest.java
index 95fb22eac6..209f8a1127 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/cluster/DecommissionBackendTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/cluster/DecommissionBackendTest.java
@@ -33,7 +33,6 @@ import org.junit.jupiter.api.Test;
import java.util.List;
public class DecommissionBackendTest extends TestWithFeService {
-
@Override
protected int backendNum() {
return 3;
@@ -42,6 +41,7 @@ public class DecommissionBackendTest extends TestWithFeService {
@Override
protected void beforeCluster() {
FeConstants.runningUnitTest = true;
+ needCleanDir = false;
}
@BeforeAll
diff --git a/fe/fe-core/src/test/java/org/apache/doris/utframe/TestWithFeService.java b/fe/fe-core/src/test/java/org/apache/doris/utframe/TestWithFeService.java
index ac2fe5978d..f2610738f3 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/utframe/TestWithFeService.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/utframe/TestWithFeService.java
@@ -119,6 +119,7 @@ public abstract class TestWithFeService {
protected String dorisHome;
protected String runningDir = "fe/mocked/" + getClass().getSimpleName() + "/" + UUID.randomUUID() + "/";
protected ConnectContext connectContext;
+ protected boolean needCleanDir = true;
protected static final String DEFAULT_CLUSTER_PREFIX = "default_cluster:";
@@ -140,7 +141,9 @@ public abstract class TestWithFeService {
runAfterAll();
Env.getCurrentEnv().clear();
StatementScopeIdGenerator.clear();
- cleanDorisFeDir();
+ if (needCleanDir) {
+ cleanDorisFeDir();
+ }
}
@BeforeEach
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org