You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by mo...@apache.org on 2021/11/13 09:09:34 UTC

[incubator-doris] branch master updated: [Feature] Add degradate strategy for local_replica_selection. (#7064)

This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 93ccef4  [Feature] Add degradate strategy for local_replica_selection. (#7064)
93ccef4 is described below

commit 93ccef4ec764e9ba153947d1657006ac65907efa
Author: 曹建华 <ca...@bytedance.com>
AuthorDate: Sat Nov 13 17:09:25 2021 +0800

    [Feature] Add degradate strategy for local_replica_selection. (#7064)
    
    When local_replica_selection is turned on, support selecting a non-local BE to serve the query
    when the local BE is unavailable.
---
 docs/en/administrator-guide/config/fe_config.md    | 10 ++++++-
 docs/zh-CN/administrator-guide/config/fe_config.md | 10 ++++++-
 .../main/java/org/apache/doris/catalog/Tablet.java |  7 ++---
 .../main/java/org/apache/doris/common/Config.java  | 14 ++++++++--
 .../org/apache/doris/planner/OlapScanNode.java     | 24 ++++------------
 .../main/java/org/apache/doris/qe/Coordinator.java | 32 ++++++++++++++++++++--
 6 files changed, 67 insertions(+), 30 deletions(-)

diff --git a/docs/en/administrator-guide/config/fe_config.md b/docs/en/administrator-guide/config/fe_config.md
index 2852e88..85f2249 100644
--- a/docs/en/administrator-guide/config/fe_config.md
+++ b/docs/en/administrator-guide/config/fe_config.md
@@ -783,7 +783,15 @@ If set to true, Planner will try to select replica of tablet on same host as thi
 -  N hosts with N Backends and N Frontends deployed. 
 - The data has N replicas. 
 -  High concurrency queries are sent to all Frontends evenly 
--  In this case, all Frontends can only use local replicas to do the query.
+-  In this case, all Frontends can only use local replicas to do the query. If you want to allow fallback to nonlocal replicas when no local replicas are available, set enable_local_replica_selection_fallback to true.
+
+### enable_local_replica_selection_fallback
+
+Default:false
+
+IsMutable:true
+
+Used with enable_local_replica_selection. If the local replicas are not available, fall back to the nonlocal replicas.
 
 ### max_unfinished_load_job
 
diff --git a/docs/zh-CN/administrator-guide/config/fe_config.md b/docs/zh-CN/administrator-guide/config/fe_config.md
index d67952c..9c0fe26 100644
--- a/docs/zh-CN/administrator-guide/config/fe_config.md
+++ b/docs/zh-CN/administrator-guide/config/fe_config.md
@@ -775,7 +775,15 @@ fe 会在每隔 es_state_sync_interval_secs 调用 es api 获取 es 索引分片
 
 3. 高并发查询均匀发送到所有 Frontends
 
-在这种情况下,所有 Frontends 只能使用本地副本进行查询。
+在这种情况下,所有 Frontends 只能使用本地副本进行查询。如果想当本地副本不可用时,使用非本地副本服务查询,请将 enable_local_replica_selection_fallback 设置为 true
+
+### enable_local_replica_selection_fallback
+
+默认值:false
+
+是否可以动态配置:true
+
+与 enable_local_replica_selection 配合使用,当本地副本不可用时,使用非本地副本服务查询。
 
 ### max_unfinished_load_job
 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Tablet.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Tablet.java
index f40216c..fe6b7a7 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Tablet.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Tablet.java
@@ -213,8 +213,8 @@ public class Tablet extends MetaObject implements Writable {
     }
 
     // for query
-    public void getQueryableReplicas(List<Replica> allQuerableReplica, List<Replica> localReplicas,
-            long visibleVersion, long visibleVersionHash, long localBeId, int schemaHash) {
+    public void getQueryableReplicas(List<Replica> allQuerableReplica, long visibleVersion,
+                                     long visibleVersionHash, int schemaHash) {
         for (Replica replica : replicas) {
             if (replica.isBad()) {
                 continue;
@@ -231,9 +231,6 @@ public class Tablet extends MetaObject implements Writable {
                 if (replica.checkVersionCatchUp(visibleVersion, visibleVersionHash, false)
                         && (replica.getSchemaHash() == -1 || replica.getSchemaHash() == schemaHash)) {
                     allQuerableReplica.add(replica);
-                    if (localBeId != -1 && replica.getBackendId() == localBeId) {
-                        localReplicas.add(replica);
-                    }
                 }
             }
         }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/Config.java b/fe/fe-core/src/main/java/org/apache/doris/common/Config.java
index cd02ec1..3ab873e 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/Config.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/Config.java
@@ -168,7 +168,7 @@ public class Config extends ConfigBase {
      */
     @ConfField
     public static int label_clean_interval_second = 1 * 3600; // 1 hours
-    
+
     // Configurations for meta data durability
     /**
      * Doris meta data will be saved here.
@@ -1015,11 +1015,21 @@ public class Config extends ConfigBase {
      * 2. The data has N replicas.
      * 3. High concurrency queries are sent to all Frontends evenly
      * In this case, all Frontends can only use local replicas to do the query.
+     * If you want to allow fallback to nonlocal replicas when no local replicas are available,
+     * set enable_local_replica_selection_fallback to true.
      */
     @ConfField(mutable = true)
     public static boolean enable_local_replica_selection = false;
 
     /**
+     * Used with enable_local_replica_selection.
+     * If the local replicas are not available, fall back to the nonlocal replicas.
+     * */
+    @ConfField(mutable = true)
+    public static boolean enable_local_replica_selection_fallback = false;
+
+
+    /**
      * The timeout of executing async remote fragment.
      * In normal case, the async remote fragment will be executed in a short time. If system are under high load
      * condition,try to set this timeout longer.
@@ -1475,7 +1485,7 @@ public class Config extends ConfigBase {
      */
     @ConfField(mutable = false, masterOnly = true)
     public static int partition_in_memory_update_interval_secs = 300;
-    
+
     @ConfField(masterOnly = true)
     public static boolean enable_concurrent_update = false;
 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java
index d980c28..a1c5728 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java
@@ -455,8 +455,7 @@ public class OlapScanNode extends ScanNode {
 
     private void addScanRangeLocations(Partition partition,
                                        MaterializedIndex index,
-                                       List<Tablet> tablets,
-                                       long localBeId) throws UserException {
+                                       List<Tablet> tablets) throws UserException {
         int logNum = 0;
         int schemaHash = olapTable.getSchemaHashByIndexId(index.getId());
         String schemaHashStr = String.valueOf(schemaHash);
@@ -484,11 +483,9 @@ public class OlapScanNode extends ScanNode {
             paloRange.setTabletId(tabletId);
 
             // random shuffle List && only collect one copy
-            List<Replica> allQueryableReplicas = Lists.newArrayList();
-            List<Replica> localReplicas = Lists.newArrayList();
-            tablet.getQueryableReplicas(allQueryableReplicas, localReplicas,
-                    visibleVersion, visibleVersionHash, localBeId, schemaHash);
-            if (allQueryableReplicas.isEmpty()) {
+            List<Replica> replicas = Lists.newArrayList();
+            tablet.getQueryableReplicas(replicas, visibleVersion, visibleVersionHash, schemaHash);
+            if (replicas.isEmpty()) {
                 LOG.error("no queryable replica found in tablet {}. visible version {}-{}",
                         tabletId, visibleVersion, visibleVersionHash);
                 if (LOG.isDebugEnabled()) {
@@ -499,13 +496,6 @@ public class OlapScanNode extends ScanNode {
                 throw new UserException("Failed to get scan range, no queryable replica found in tablet: " + tabletId);
             }
 
-            List<Replica> replicas = null;
-            if (!localReplicas.isEmpty()) {
-                replicas = localReplicas;
-            } else {
-                replicas = allQueryableReplicas;
-            }
-
             Collections.shuffle(replicas);
             boolean tabletIsNull = true;
             boolean collectedStat = false;
@@ -621,10 +611,6 @@ public class OlapScanNode extends ScanNode {
     }
 
     private void computeTabletInfo() throws UserException {
-        long localBeId = -1;
-        if (Config.enable_local_replica_selection) {
-            localBeId = Catalog.getCurrentSystemInfo().getBackendIdByHost(FrontendOptions.getLocalHostAddress());
-        }
         /**
          * The tablet info could be computed only once.
          * So the scanBackendIds should be empty in the beginning.
@@ -655,7 +641,7 @@ public class OlapScanNode extends ScanNode {
 
             totalTabletsNum += selectedTable.getTablets().size();
             selectedTabletsNum += tablets.size();
-            addScanRangeLocations(partition, selectedTable, tablets, localBeId);
+            addScanRangeLocations(partition, selectedTable, tablets);
         }
     }
 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
index a091d87..1e69a03 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
@@ -1411,17 +1411,45 @@ public class Coordinator {
     public TScanRangeLocation selectBackendsByRoundRobin(TScanRangeLocations seqLocation,
                                           HashMap<TNetworkAddress, Long> assignedBytesPerHost,
                                           Reference<Long> backendIdRef) throws UserException {
+        if (!Config.enable_local_replica_selection) {
+            return selectBackendsByRoundRobin(seqLocation.getLocations(), assignedBytesPerHost, backendIdRef);
+        }
+
+        List<TScanRangeLocation> localLocations = new ArrayList<>();
+        List<TScanRangeLocation> nonlocalLocations = new ArrayList<>();
+        long localBeId = Catalog.getCurrentSystemInfo().getBackendIdByHost(FrontendOptions.getLocalHostAddress());
+        for (final TScanRangeLocation location : seqLocation.getLocations()) {
+            if (location.backend_id == localBeId) {
+                localLocations.add(location);
+            } else {
+                nonlocalLocations.add(location);
+            }
+        }
+
+        try {
+            return selectBackendsByRoundRobin(localLocations, assignedBytesPerHost, backendIdRef);
+        } catch (UserException ue) {
+            if (!Config.enable_local_replica_selection_fallback) {
+                throw ue;
+            }
+            return selectBackendsByRoundRobin(nonlocalLocations, assignedBytesPerHost, backendIdRef);
+        }
+    }
+
+    public TScanRangeLocation selectBackendsByRoundRobin(List<TScanRangeLocation> locations,
+                                                         HashMap<TNetworkAddress, Long> assignedBytesPerHost,
+                                                         Reference<Long> backendIdRef) throws UserException {
         Long minAssignedBytes = Long.MAX_VALUE;
         TScanRangeLocation minLocation = null;
         Long step = 1L;
-        for (final TScanRangeLocation location : seqLocation.getLocations()) {
+        for (final TScanRangeLocation location : locations) {
             Long assignedBytes = findOrInsert(assignedBytesPerHost, location.server, 0L);
             if (assignedBytes < minAssignedBytes) {
                 minAssignedBytes = assignedBytes;
                 minLocation = location;
             }
         }
-        TScanRangeLocation location = SimpleScheduler.getLocation(minLocation, seqLocation.locations, this.idToBackend, backendIdRef);
+        TScanRangeLocation location = SimpleScheduler.getLocation(minLocation, locations, this.idToBackend, backendIdRef);
         if (assignedBytesPerHost.containsKey(location.server)) {
             assignedBytesPerHost.put(location.server,
                     assignedBytesPerHost.get(location.server) + step);

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org