You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by mo...@apache.org on 2021/11/13 09:09:34 UTC
[incubator-doris] branch master updated: [Feature] Add degradate strategy for local_replica_selection. (#7064)
This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git
The following commit(s) were added to refs/heads/master by this push:
new 93ccef4 [Feature] Add degradate strategy for local_replica_selection. (#7064)
93ccef4 is described below
commit 93ccef4ec764e9ba153947d1657006ac65907efa
Author: 曹建华 <ca...@bytedance.com>
AuthorDate: Sat Nov 13 17:09:25 2021 +0800
[Feature] Add degradate strategy for local_replica_selection. (#7064)
When enable_local_replica_selection is turned on, support selecting a non-local BE to serve the query
when the local BE is unavailable.
---
docs/en/administrator-guide/config/fe_config.md | 10 ++++++-
docs/zh-CN/administrator-guide/config/fe_config.md | 10 ++++++-
.../main/java/org/apache/doris/catalog/Tablet.java | 7 ++---
.../main/java/org/apache/doris/common/Config.java | 14 ++++++++--
.../org/apache/doris/planner/OlapScanNode.java | 24 ++++------------
.../main/java/org/apache/doris/qe/Coordinator.java | 32 ++++++++++++++++++++--
6 files changed, 67 insertions(+), 30 deletions(-)
diff --git a/docs/en/administrator-guide/config/fe_config.md b/docs/en/administrator-guide/config/fe_config.md
index 2852e88..85f2249 100644
--- a/docs/en/administrator-guide/config/fe_config.md
+++ b/docs/en/administrator-guide/config/fe_config.md
@@ -783,7 +783,15 @@ If set to true, Planner will try to select replica of tablet on same host as thi
- N hosts with N Backends and N Frontends deployed.
- The data has N replicas.
- High concurrency queries are sent to all Frontends evenly
-- In this case, all Frontends can only use local replicas to do the query.
+- In this case, all Frontends can only use local replicas to do the query. If you want to allow fallback to non-local replicas when no local replica is available, set enable_local_replica_selection_fallback to true.
+
+### enable_local_replica_selection_fallback
+
+Default:false
+
+IsMutable:true
+
+Used with enable_local_replica_selection. If no local replica is available, fall back to non-local replicas.
### max_unfinished_load_job
diff --git a/docs/zh-CN/administrator-guide/config/fe_config.md b/docs/zh-CN/administrator-guide/config/fe_config.md
index d67952c..9c0fe26 100644
--- a/docs/zh-CN/administrator-guide/config/fe_config.md
+++ b/docs/zh-CN/administrator-guide/config/fe_config.md
@@ -775,7 +775,15 @@ fe 会在每隔 es_state_sync_interval_secs 调用 es api 获取 es 索引分片
3. 高并发查询均匀发送到所有 Frontends
-在这种情况下,所有 Frontends 只能使用本地副本进行查询。
+在这种情况下,所有 Frontends 只能使用本地副本进行查询。如果想当本地副本不可用时,使用非本地副本服务查询,请将 enable_local_replica_selection_fallback 设置为 true。
+
+### enable_local_replica_selection_fallback
+
+默认值:false
+
+是否可以动态配置:true
+
+与 enable_local_replica_selection 配合使用,当本地副本不可用时,使用非本地副本服务查询。
### max_unfinished_load_job
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Tablet.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Tablet.java
index f40216c..fe6b7a7 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Tablet.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Tablet.java
@@ -213,8 +213,8 @@ public class Tablet extends MetaObject implements Writable {
}
// for query
- public void getQueryableReplicas(List<Replica> allQuerableReplica, List<Replica> localReplicas,
- long visibleVersion, long visibleVersionHash, long localBeId, int schemaHash) {
+ public void getQueryableReplicas(List<Replica> allQuerableReplica, long visibleVersion,
+ long visibleVersionHash, int schemaHash) {
for (Replica replica : replicas) {
if (replica.isBad()) {
continue;
@@ -231,9 +231,6 @@ public class Tablet extends MetaObject implements Writable {
if (replica.checkVersionCatchUp(visibleVersion, visibleVersionHash, false)
&& (replica.getSchemaHash() == -1 || replica.getSchemaHash() == schemaHash)) {
allQuerableReplica.add(replica);
- if (localBeId != -1 && replica.getBackendId() == localBeId) {
- localReplicas.add(replica);
- }
}
}
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/Config.java b/fe/fe-core/src/main/java/org/apache/doris/common/Config.java
index cd02ec1..3ab873e 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/Config.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/Config.java
@@ -168,7 +168,7 @@ public class Config extends ConfigBase {
*/
@ConfField
public static int label_clean_interval_second = 1 * 3600; // 1 hours
-
+
// Configurations for meta data durability
/**
* Doris meta data will be saved here.
@@ -1015,11 +1015,21 @@ public class Config extends ConfigBase {
* 2. The data has N replicas.
* 3. High concurrency queries are sent to all Frontends evenly
* In this case, all Frontends can only use local replicas to do the query.
+ * If you want to allow fallback to non-local replicas when no local replica is available,
+ * set enable_local_replica_selection_fallback to true.
*/
@ConfField(mutable = true)
public static boolean enable_local_replica_selection = false;
/**
+ * Used with enable_local_replica_selection.
+ * If no local replica is available, fall back to non-local replicas.
+ * */
+ @ConfField(mutable = true)
+ public static boolean enable_local_replica_selection_fallback = false;
+
+
+ /**
* The timeout of executing async remote fragment.
* In normal case, the async remote fragment will be executed in a short time. If system are under high load
* condition,try to set this timeout longer.
@@ -1475,7 +1485,7 @@ public class Config extends ConfigBase {
*/
@ConfField(mutable = false, masterOnly = true)
public static int partition_in_memory_update_interval_secs = 300;
-
+
@ConfField(masterOnly = true)
public static boolean enable_concurrent_update = false;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java
index d980c28..a1c5728 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java
@@ -455,8 +455,7 @@ public class OlapScanNode extends ScanNode {
private void addScanRangeLocations(Partition partition,
MaterializedIndex index,
- List<Tablet> tablets,
- long localBeId) throws UserException {
+ List<Tablet> tablets) throws UserException {
int logNum = 0;
int schemaHash = olapTable.getSchemaHashByIndexId(index.getId());
String schemaHashStr = String.valueOf(schemaHash);
@@ -484,11 +483,9 @@ public class OlapScanNode extends ScanNode {
paloRange.setTabletId(tabletId);
// random shuffle List && only collect one copy
- List<Replica> allQueryableReplicas = Lists.newArrayList();
- List<Replica> localReplicas = Lists.newArrayList();
- tablet.getQueryableReplicas(allQueryableReplicas, localReplicas,
- visibleVersion, visibleVersionHash, localBeId, schemaHash);
- if (allQueryableReplicas.isEmpty()) {
+ List<Replica> replicas = Lists.newArrayList();
+ tablet.getQueryableReplicas(replicas, visibleVersion, visibleVersionHash, schemaHash);
+ if (replicas.isEmpty()) {
LOG.error("no queryable replica found in tablet {}. visible version {}-{}",
tabletId, visibleVersion, visibleVersionHash);
if (LOG.isDebugEnabled()) {
@@ -499,13 +496,6 @@ public class OlapScanNode extends ScanNode {
throw new UserException("Failed to get scan range, no queryable replica found in tablet: " + tabletId);
}
- List<Replica> replicas = null;
- if (!localReplicas.isEmpty()) {
- replicas = localReplicas;
- } else {
- replicas = allQueryableReplicas;
- }
-
Collections.shuffle(replicas);
boolean tabletIsNull = true;
boolean collectedStat = false;
@@ -621,10 +611,6 @@ public class OlapScanNode extends ScanNode {
}
private void computeTabletInfo() throws UserException {
- long localBeId = -1;
- if (Config.enable_local_replica_selection) {
- localBeId = Catalog.getCurrentSystemInfo().getBackendIdByHost(FrontendOptions.getLocalHostAddress());
- }
/**
* The tablet info could be computed only once.
* So the scanBackendIds should be empty in the beginning.
@@ -655,7 +641,7 @@ public class OlapScanNode extends ScanNode {
totalTabletsNum += selectedTable.getTablets().size();
selectedTabletsNum += tablets.size();
- addScanRangeLocations(partition, selectedTable, tablets, localBeId);
+ addScanRangeLocations(partition, selectedTable, tablets);
}
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
index a091d87..1e69a03 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
@@ -1411,17 +1411,45 @@ public class Coordinator {
public TScanRangeLocation selectBackendsByRoundRobin(TScanRangeLocations seqLocation,
HashMap<TNetworkAddress, Long> assignedBytesPerHost,
Reference<Long> backendIdRef) throws UserException {
+ if (!Config.enable_local_replica_selection) {
+ return selectBackendsByRoundRobin(seqLocation.getLocations(), assignedBytesPerHost, backendIdRef);
+ }
+
+ List<TScanRangeLocation> localLocations = new ArrayList<>();
+ List<TScanRangeLocation> nonlocalLocations = new ArrayList<>();
+ long localBeId = Catalog.getCurrentSystemInfo().getBackendIdByHost(FrontendOptions.getLocalHostAddress());
+ for (final TScanRangeLocation location : seqLocation.getLocations()) {
+ if (location.backend_id == localBeId) {
+ localLocations.add(location);
+ } else {
+ nonlocalLocations.add(location);
+ }
+ }
+
+ try {
+ return selectBackendsByRoundRobin(localLocations, assignedBytesPerHost, backendIdRef);
+ } catch (UserException ue) {
+ if (!Config.enable_local_replica_selection_fallback) {
+ throw ue;
+ }
+ return selectBackendsByRoundRobin(nonlocalLocations, assignedBytesPerHost, backendIdRef);
+ }
+ }
+
+ public TScanRangeLocation selectBackendsByRoundRobin(List<TScanRangeLocation> locations,
+ HashMap<TNetworkAddress, Long> assignedBytesPerHost,
+ Reference<Long> backendIdRef) throws UserException {
Long minAssignedBytes = Long.MAX_VALUE;
TScanRangeLocation minLocation = null;
Long step = 1L;
- for (final TScanRangeLocation location : seqLocation.getLocations()) {
+ for (final TScanRangeLocation location : locations) {
Long assignedBytes = findOrInsert(assignedBytesPerHost, location.server, 0L);
if (assignedBytes < minAssignedBytes) {
minAssignedBytes = assignedBytes;
minLocation = location;
}
}
- TScanRangeLocation location = SimpleScheduler.getLocation(minLocation, seqLocation.locations, this.idToBackend, backendIdRef);
+ TScanRangeLocation location = SimpleScheduler.getLocation(minLocation, locations, this.idToBackend, backendIdRef);
if (assignedBytesPerHost.containsKey(location.server)) {
assignedBytesPerHost.put(location.server,
assignedBytesPerHost.get(location.server) + step);
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org