Posted to commits@doris.apache.org by mo...@apache.org on 2022/05/11 01:41:02 UTC

[incubator-doris] branch master updated: [refactor](backend) Refactor the logic of selecting Backend in FE. (#9478)

This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 8fa0122ed0 [refactor](backend) Refactor the logic of selecting Backend in FE. (#9478)
8fa0122ed0 is described below

commit 8fa0122ed054369a6d570d2ff9848449ff269ed7
Author: Mingyu Chen <mo...@gmail.com>
AuthorDate: Wed May 11 09:40:57 2022 +0800

    [refactor](backend) Refactor the logic of selecting Backend in FE. (#9478)
    
    There are many places in the FE where a group of BE nodes needs to be selected according to certain requirements, for example:
    1. When creating replicas for a tablet.
    2. When selecting a BE to execute an Insert.
    3. When Stream Load forwards HTTP requests to BE nodes.
    
    These operations all share the same selection logic, so this CL mainly makes the following changes:
    1. Add a new `BeSelectionPolicy` class that describes the set of conditions for selecting BE nodes.
    2. Refactor the BE selection logic in `SystemInfoService` so that the following two methods are used uniformly
       (see the usage sketch after this list):
        1. `selectBackendIdsByPolicy`: Select the required number of BE nodes according to the `BeSelectionPolicy`.
        2. `selectBackendIdsForReplicaCreation`: Select the BE nodes for the replica creation operation.
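    
    As a usage sketch (adapted from the new `selectRedirectBackend` method in `LoadAction` in this patch;
    `clusterName` is assumed to be in scope), a caller that needs a single load-available BE can now write roughly:
    
        BeSelectionPolicy policy = new BeSelectionPolicy.Builder()
                .setCluster(clusterName)     // which cluster to pick from
                .needLoadAvailable()         // only consider BEs that can accept loads
                .build();
        List<Long> backendIds = Catalog.getCurrentSystemInfo().selectBackendIdsByPolicy(policy, 1);
        if (backendIds.isEmpty()) {
            // not enough matching BEs: the method returns an empty list instead of null
            throw new LoadException(SystemInfoService.NO_BACKEND_LOAD_AVAILABLE_MSG + ", policy: " + policy);
        }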
    
    Note that there is a behavior change here:
    For the replica creation operation, BE nodes were previously selected in a round-robin manner,
    but this is now changed to random selection, for the following reasons:
    1. The previous logic was round-robin over a shuffled candidate list, so it was effectively random already.
    2. In practice, the imbalance produced by random selection stays below about 5%, so the random algorithm
         can be considered to distribute the data evenly.
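    
    A back-of-the-envelope way to sanity-check the second point (illustrative only, not part of this patch;
    the backend and tablet counts below are hypothetical):
    
        import java.util.Random;
    
        public class RandomSpreadCheck {
            public static void main(String[] args) {
                int backends = 10;
                int tablets = 100_000;
                int[] counts = new int[backends];
                Random rand = new Random();
                for (int i = 0; i < tablets; i++) {
                    counts[rand.nextInt(backends)]++; // pick a backend uniformly at random
                }
                int min = Integer.MAX_VALUE;
                int max = 0;
                for (int c : counts) {
                    min = Math.min(min, c);
                    max = Math.max(max, c);
                }
                double avg = (double) tablets / backends;
                // With enough tablets per backend, (max - min) / avg typically stays within a few percent.
                System.out.printf("spread: %.2f%%%n", 100.0 * (max - min) / avg);
            }
        }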
---
 .../java/org/apache/doris/backup/RestoreJob.java   |   9 +-
 .../java/org/apache/doris/catalog/Catalog.java     |   8 +-
 .../java/org/apache/doris/catalog/OlapTable.java   |   3 +-
 .../org/apache/doris/httpv2/rest/LoadAction.java   |  53 ++--
 .../apache/doris/httpv2/util/LoadSubmitter.java    |  19 +-
 .../apache/doris/qe/InsertStreamTxnExecutor.java   |  14 +-
 .../java/org/apache/doris/qe/MultiLoadMgr.java     |  14 +-
 .../org/apache/doris/system/BeSelectionPolicy.java | 131 ++++++++++
 .../org/apache/doris/system/SystemInfoService.java | 251 +++++--------------
 .../org/apache/doris/backup/RestoreJobTest.java    | 132 +---------
 .../org/apache/doris/catalog/CreateTableTest.java  |   6 +-
 .../apache/doris/catalog/ModifyBackendTest.java    |   6 +-
 .../doris/load/sync/canal/CanalSyncDataTest.java   |   7 +-
 .../java/org/apache/doris/qe/MultiLoadMgrTest.java |  13 +-
 .../apache/doris/system/SystemInfoServiceTest.java | 268 +++++++++++++++++++++
 15 files changed, 525 insertions(+), 409 deletions(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java b/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java
index 0395b0d320..a70ad7fd57 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java
@@ -74,9 +74,6 @@ import org.apache.doris.thrift.TStorageMedium;
 import org.apache.doris.thrift.TStorageType;
 import org.apache.doris.thrift.TTaskType;
 
-import org.apache.logging.log4j.LogManager;
-import org.apache.logging.log4j.Logger;
-
 import com.google.common.base.Joiner;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.ArrayListMultimap;
@@ -87,6 +84,9 @@ import com.google.common.collect.Maps;
 import com.google.common.collect.Multimap;
 import com.google.common.collect.Table.Cell;
 
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
@@ -1018,7 +1018,8 @@ public class RestoreJob extends AbstractJob {
 
                 // replicas
                 try {
-                    Map<Tag, List<Long>> beIds = Catalog.getCurrentSystemInfo().chooseBackendIdByFilters(replicaAlloc, clusterName, null);
+                    Map<Tag, List<Long>> beIds = Catalog.getCurrentSystemInfo()
+                            .selectBackendIdsForReplicaCreation(replicaAlloc, clusterName, null);
                     for (Map.Entry<Tag, List<Long>> entry : beIds.entrySet()) {
                         for (Long beId : entry.getValue()) {
                             long newReplicaId = catalog.getNextId();
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Catalog.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Catalog.java
index d38ac2df64..d7e3d28e9d 100755
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Catalog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Catalog.java
@@ -271,6 +271,7 @@ import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 import org.codehaus.jackson.map.ObjectMapper;
 
+import javax.annotation.Nullable;
 import java.io.BufferedReader;
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
@@ -296,7 +297,6 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.stream.Collectors;
-import javax.annotation.Nullable;
 
 public class Catalog {
     private static final Logger LOG = LogManager.getLogger(Catalog.class);
@@ -4543,10 +4543,12 @@ public class Catalog {
                 // This is the first colocate table in the group, or just a normal table,
                 // randomly choose backends
                 if (!Config.disable_storage_medium_check) {
-                    chosenBackendIds = getCurrentSystemInfo().chooseBackendIdByFilters(replicaAlloc, clusterName,
+                    chosenBackendIds =
+                            getCurrentSystemInfo().selectBackendIdsForReplicaCreation(replicaAlloc, clusterName,
                             tabletMeta.getStorageMedium());
                 } else {
-                    chosenBackendIds = getCurrentSystemInfo().chooseBackendIdByFilters(replicaAlloc, clusterName, null);
+                    chosenBackendIds =
+                            getCurrentSystemInfo().selectBackendIdsForReplicaCreation(replicaAlloc, clusterName, null);
                 }
 
                 for (Map.Entry<Tag, List<Long>> entry : chosenBackendIds.entrySet()) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java
index b56ea19f30..e3ac7e8f5a 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java
@@ -500,7 +500,8 @@ public class OlapTable extends Table {
 
                     // replicas
                     try {
-                        Map<Tag, List<Long>> tag2beIds = Catalog.getCurrentSystemInfo().chooseBackendIdByFilters(
+                        Map<Tag, List<Long>> tag2beIds =
+                                Catalog.getCurrentSystemInfo().selectBackendIdsForReplicaCreation(
                                 replicaAlloc, db.getClusterName(), null);
                         for (Map.Entry<Tag, List<Long>> entry3 : tag2beIds.entrySet()) {
                             for (Long beId : entry3.getValue()) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/LoadAction.java b/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/LoadAction.java
index b9c85dbdfb..ebdda12926 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/LoadAction.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/LoadAction.java
@@ -21,12 +21,14 @@ import org.apache.doris.catalog.Catalog;
 import org.apache.doris.cluster.ClusterNamespace;
 import org.apache.doris.common.Config;
 import org.apache.doris.common.DdlException;
+import org.apache.doris.common.LoadException;
 import org.apache.doris.httpv2.entity.ResponseEntityBuilder;
 import org.apache.doris.httpv2.entity.RestBaseResult;
 import org.apache.doris.mysql.privilege.PrivPredicate;
 import org.apache.doris.qe.ConnectContext;
 import org.apache.doris.service.ExecuteEnv;
 import org.apache.doris.system.Backend;
+import org.apache.doris.system.BeSelectionPolicy;
 import org.apache.doris.system.SystemInfoService;
 import org.apache.doris.thrift.TNetworkAddress;
 
@@ -41,12 +43,10 @@ import org.springframework.web.bind.annotation.RequestMethod;
 import org.springframework.web.bind.annotation.RestController;
 import org.springframework.web.servlet.view.RedirectView;
 
-import java.util.List;
-
 import javax.servlet.http.HttpServletRequest;
 import javax.servlet.http.HttpServletResponse;
-
 import io.netty.handler.codec.http.HttpHeaderNames;
+import java.util.List;
 
 @RestController
 public class LoadAction extends RestBaseController {
@@ -145,21 +145,7 @@ public class LoadAction extends RestBaseController {
                     return new RestBaseResult(e.getMessage());
                 }
             } else {
-                // Choose a backend sequentially.
-                SystemInfoService.BeAvailablePredicate beAvailablePredicate =
-                        new SystemInfoService.BeAvailablePredicate(false, false, true);
-                List<Long> backendIds = Catalog.getCurrentSystemInfo().seqChooseBackendIdsByStorageMediumAndTag(
-                        1, beAvailablePredicate, false, clusterName, null, null);
-                if (backendIds == null) {
-                    return new RestBaseResult(SystemInfoService.NO_BACKEND_LOAD_AVAILABLE_MSG);
-                }
-
-                Backend backend = Catalog.getCurrentSystemInfo().getBackend(backendIds.get(0));
-                if (backend == null) {
-                    return new RestBaseResult(SystemInfoService.NO_BACKEND_LOAD_AVAILABLE_MSG);
-                }
-
-                redirectAddr = new TNetworkAddress(backend.getHost(), backend.getHttpPort());
+                redirectAddr = selectRedirectBackend(clusterName);
             }
 
             LOG.info("redirect load action to destination={}, stream: {}, db: {}, tbl: {}, label: {}",
@@ -194,22 +180,7 @@ public class LoadAction extends RestBaseController {
                 return new RestBaseResult("No transaction operation(\'commit\' or \'abort\') selected.");
             }
 
-            // Choose a backend sequentially.
-            SystemInfoService.BeAvailablePredicate beAvailablePredicate =
-                    new SystemInfoService.BeAvailablePredicate(false, false, true);
-            List<Long> backendIds = Catalog.getCurrentSystemInfo().seqChooseBackendIdsByStorageMediumAndTag(
-                    1, beAvailablePredicate, false, clusterName, null, null);
-            if (backendIds == null) {
-                return new RestBaseResult(SystemInfoService.NO_BACKEND_LOAD_AVAILABLE_MSG);
-            }
-
-            Backend backend = Catalog.getCurrentSystemInfo().getBackend(backendIds.get(0));
-            if (backend == null) {
-                return new RestBaseResult(SystemInfoService.NO_BACKEND_LOAD_AVAILABLE_MSG);
-            }
-
-            TNetworkAddress redirectAddr = new TNetworkAddress(backend.getHost(), backend.getHttpPort());
-
+            TNetworkAddress redirectAddr = selectRedirectBackend(clusterName);
             LOG.info("redirect stream load 2PC action to destination={}, db: {}, txn: {}, operation: {}",
                     redirectAddr.toString(), dbName, request.getHeader(TXN_ID_KEY), txnOperation);
 
@@ -220,4 +191,18 @@ public class LoadAction extends RestBaseController {
             return new RestBaseResult(e.getMessage());
         }
     }
+
+    private TNetworkAddress selectRedirectBackend(String clusterName) throws LoadException {
+        BeSelectionPolicy policy = new BeSelectionPolicy.Builder().setCluster(clusterName).needLoadAvailable().build();
+        List<Long> backendIds = Catalog.getCurrentSystemInfo().selectBackendIdsByPolicy(policy, 1);
+        if (backendIds.isEmpty()) {
+            throw new LoadException(SystemInfoService.NO_BACKEND_LOAD_AVAILABLE_MSG + ", policy: " + policy);
+        }
+
+        Backend backend = Catalog.getCurrentSystemInfo().getBackend(backendIds.get(0));
+        if (backend == null) {
+            throw new LoadException(SystemInfoService.NO_BACKEND_LOAD_AVAILABLE_MSG + ", policy: " + policy);
+        }
+        return new TNetworkAddress(backend.getHost(), backend.getHttpPort());
+    }
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/httpv2/util/LoadSubmitter.java b/fe/fe-core/src/main/java/org/apache/doris/httpv2/util/LoadSubmitter.java
index 1a715cf0e3..31aeba813d 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/httpv2/util/LoadSubmitter.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/httpv2/util/LoadSubmitter.java
@@ -19,10 +19,11 @@ package org.apache.doris.httpv2.util;
 
 import org.apache.doris.catalog.Catalog;
 import org.apache.doris.cluster.ClusterNamespace;
-import org.apache.doris.common.DdlException;
+import org.apache.doris.common.LoadException;
 import org.apache.doris.common.ThreadPoolManager;
 import org.apache.doris.httpv2.rest.UploadAction;
 import org.apache.doris.system.Backend;
+import org.apache.doris.system.BeSelectionPolicy;
 import org.apache.doris.system.SystemInfoService;
 
 import com.google.common.base.Strings;
@@ -136,19 +137,15 @@ public class LoadSubmitter {
             return file;
         }
 
-        private Backend selectOneBackend() throws DdlException {
-            SystemInfoService.BeAvailablePredicate beAvailablePredicate =
-                    new SystemInfoService.BeAvailablePredicate(false, false, true);
-            List<Long> backendIds = Catalog.getCurrentSystemInfo().seqChooseBackendIdsByStorageMediumAndTag(
-                    1, beAvailablePredicate, false,
-                    SystemInfoService.DEFAULT_CLUSTER, null, null);
-            if (backendIds == null) {
-                throw new DdlException("No alive backend");
+        private Backend selectOneBackend() throws LoadException {
+            BeSelectionPolicy policy = new BeSelectionPolicy.Builder().needLoadAvailable().build();
+            List<Long> backendIds = Catalog.getCurrentSystemInfo().selectBackendIdsByPolicy(policy, 1);
+            if (backendIds.isEmpty()) {
+                throw new LoadException(SystemInfoService.NO_BACKEND_LOAD_AVAILABLE_MSG + ", policy: " + policy);
             }
-
             Backend backend = Catalog.getCurrentSystemInfo().getBackend(backendIds.get(0));
             if (backend == null) {
-                throw new DdlException("No alive backend");
+                throw new LoadException(SystemInfoService.NO_BACKEND_LOAD_AVAILABLE_MSG + ", policy: " + policy);
             }
             return backend;
         }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/InsertStreamTxnExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/InsertStreamTxnExecutor.java
index 2cb8fe4ea6..ea0cb6cfda 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/InsertStreamTxnExecutor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/InsertStreamTxnExecutor.java
@@ -26,7 +26,7 @@ import org.apache.doris.proto.Types;
 import org.apache.doris.rpc.BackendServiceProxy;
 import org.apache.doris.rpc.RpcException;
 import org.apache.doris.system.Backend;
-import org.apache.doris.system.SystemInfoService;
+import org.apache.doris.system.BeSelectionPolicy;
 import org.apache.doris.task.StreamLoadTask;
 import org.apache.doris.thrift.TBrokerRangeDesc;
 import org.apache.doris.thrift.TExecPlanFragmentParams;
@@ -63,13 +63,11 @@ public class InsertStreamTxnExecutor {
         StreamLoadTask streamLoadTask = StreamLoadTask.fromTStreamLoadPutRequest(request);
         StreamLoadPlanner planner = new StreamLoadPlanner(txnEntry.getDb(), (OlapTable) txnEntry.getTable(), streamLoadTask);
         TExecPlanFragmentParams tRequest = planner.plan(streamLoadTask.getId());
-        SystemInfoService.BeAvailablePredicate beAvailablePredicate =
-                new SystemInfoService.BeAvailablePredicate(false, true, true);
-        List<Long> beIds = Catalog.getCurrentSystemInfo().seqChooseBackendIdsByStorageMediumAndTag(
-                1, beAvailablePredicate, false,
-                txnEntry.getDb().getClusterName(), null, null);
-        if (beIds == null || beIds.isEmpty()) {
-            throw new UserException("there is no backend load available or scanNode backend available.");
+        BeSelectionPolicy policy = new BeSelectionPolicy.Builder().setCluster(txnEntry.getDb().getClusterName())
+                .needLoadAvailable().needQueryAvailable().build();
+        List<Long> beIds = Catalog.getCurrentSystemInfo().selectBackendIdsByPolicy(policy, 1);
+        if (beIds.isEmpty()) {
+            throw new UserException("No available backend to match the policy: " + policy);
         }
 
         tRequest.setTxnConf(txnConf).setImportLabel(txnEntry.getLabel());
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/MultiLoadMgr.java b/fe/fe-core/src/main/java/org/apache/doris/qe/MultiLoadMgr.java
index fd8cf80cac..606d68e6c6 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/MultiLoadMgr.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/MultiLoadMgr.java
@@ -41,6 +41,7 @@ import org.apache.doris.load.loadv2.JobState;
 import org.apache.doris.load.loadv2.LoadJob;
 import org.apache.doris.load.loadv2.LoadTask;
 import org.apache.doris.system.Backend;
+import org.apache.doris.system.BeSelectionPolicy;
 import org.apache.doris.system.SystemInfoService;
 import org.apache.doris.thrift.TMiniLoadRequest;
 import org.apache.doris.thrift.TNetworkAddress;
@@ -91,14 +92,13 @@ public class MultiLoadMgr {
             if (infoMap.containsKey(multiLabel)) {
                 throw new LabelAlreadyUsedException(label);
             }
-            MultiLoadDesc multiLoadDesc = new MultiLoadDesc(multiLabel, properties);
-            SystemInfoService.BeAvailablePredicate beAvailablePredicate =
-                    new SystemInfoService.BeAvailablePredicate(false, false, true);
-            List<Long> backendIds = Catalog.getCurrentSystemInfo().seqChooseBackendIdsByStorageMediumAndTag(1,
-                    beAvailablePredicate, false, ConnectContext.get().getClusterName(), null, null);
-            if (backendIds == null) {
-                throw new DdlException(SystemInfoService.NO_BACKEND_LOAD_AVAILABLE_MSG);
+            BeSelectionPolicy policy = new BeSelectionPolicy.Builder().setCluster(ConnectContext.get().getClusterName())
+                    .needLoadAvailable().build();
+            List<Long> backendIds = Catalog.getCurrentSystemInfo().selectBackendIdsByPolicy(policy, 1);
+            if (backendIds.isEmpty()) {
+                throw new DdlException(SystemInfoService.NO_BACKEND_LOAD_AVAILABLE_MSG + " policy: " + policy);
             }
+            MultiLoadDesc multiLoadDesc = new MultiLoadDesc(multiLabel, properties);
             multiLoadDesc.setBackendId(backendIds.get(0));
             infoMap.put(multiLabel, multiLoadDesc);
         } finally {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/system/BeSelectionPolicy.java b/fe/fe-core/src/main/java/org/apache/doris/system/BeSelectionPolicy.java
new file mode 100644
index 0000000000..e995c0aff5
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/system/BeSelectionPolicy.java
@@ -0,0 +1,131 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.system;
+
+import org.apache.doris.resource.Tag;
+import org.apache.doris.thrift.TStorageMedium;
+
+import com.google.common.collect.Sets;
+
+import java.util.Set;
+
+/**
+ * Selection policy for building BE nodes
+ */
+public class BeSelectionPolicy {
+    public String cluster = SystemInfoService.DEFAULT_CLUSTER;
+    public boolean needScheduleAvailable = false;
+    public boolean needQueryAvailable = false;
+    public boolean needLoadAvailable = false;
+    // Resource tag. Empty means no need to consider resource tag.
+    public Set<Tag> resourceTags = Sets.newHashSet();
+    // storage medium. null means no need to consider storage medium.
+    public TStorageMedium storageMedium = null;
+    // Check if disk usage reaches limit. false means no need to check.
+    public boolean checkDiskUsage = false;
+    // If set to false, do not select backends on same host.
+    public boolean allowOnSameHost = false;
+
+    private BeSelectionPolicy() {
+
+    }
+
+    public static class Builder {
+        private BeSelectionPolicy policy;
+        public Builder() {
+            policy = new BeSelectionPolicy();
+        }
+
+        public Builder setCluster(String cluster) {
+            policy.cluster = cluster;
+            return this;
+        }
+
+        public Builder needScheduleAvailable() {
+            policy.needScheduleAvailable = true;
+            return this;
+        }
+
+        public Builder needQueryAvailable() {
+            policy.needQueryAvailable = true;
+            return this;
+        }
+
+        public Builder needLoadAvailable() {
+            policy.needLoadAvailable = true;
+            return this;
+        }
+
+        public Builder addTags(Set<Tag> tags) {
+            policy.resourceTags.addAll(tags);
+            return this;
+        }
+
+        public Builder setStorageMedium(TStorageMedium medium) {
+            policy.storageMedium = medium;
+            return this;
+        }
+
+        public Builder needCheckDiskUsage() {
+            policy.checkDiskUsage = true;
+            return this;
+        }
+
+        public Builder allowOnSameHost() {
+            policy.allowOnSameHost = true;
+            return this;
+        }
+
+        public BeSelectionPolicy build() {
+            return policy;
+        }
+    }
+
+    public boolean isMatch(Backend backend) {
+        if (needScheduleAvailable && !backend.isScheduleAvailable()
+                || needQueryAvailable && !backend.isQueryAvailable()
+                || needLoadAvailable && !backend.isLoadAvailable()
+                || !resourceTags.isEmpty() && !resourceTags.contains(backend.getTag())
+                || storageMedium != null && !backend.hasSpecifiedStorageMedium(storageMedium)) {
+            return false;
+        }
+
+        if (checkDiskUsage) {
+            if (storageMedium == null && backend.diskExceedLimit()) {
+                return false;
+            }
+            if (storageMedium != null && backend.diskExceedLimitByStorageMedium(storageMedium)) {
+                return false;
+            }
+        }
+        return true;
+    }
+
+    @Override
+    public String toString() {
+        StringBuilder sb = new StringBuilder();
+        sb.append("cluster|query|load|schedule|tags|medium: ");
+        sb.append(cluster).append("|");
+        sb.append(needQueryAvailable).append("|");
+        sb.append(needLoadAvailable).append("|");
+        sb.append(needScheduleAvailable).append("|");
+        sb.append(resourceTags).append("|");
+        sb.append(storageMedium);
+        return sb.toString();
+    }
+}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/system/SystemInfoService.java b/fe/fe-core/src/main/java/org/apache/doris/system/SystemInfoService.java
index 39e577f55a..82571cd3d8 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/system/SystemInfoService.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/system/SystemInfoService.java
@@ -40,7 +40,6 @@ import org.apache.doris.thrift.TStorageMedium;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Strings;
 import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.Iterators;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Multimap;
@@ -61,10 +60,8 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.stream.Collectors;
-import java.util.stream.Stream;
 
 public class SystemInfoService {
     private static final Logger LOG = LogManager.getLogger(SystemInfoService.class);
@@ -75,42 +72,16 @@ public class SystemInfoService {
 
     public static final String NO_SCAN_NODE_BACKEND_AVAILABLE_MSG = "There is no scanNode Backend available.";
 
-    public static class BeAvailablePredicate {
-        private boolean scheduleAvailable;
+    private volatile ImmutableMap<Long, Backend> idToBackendRef = ImmutableMap.of();
+    private volatile ImmutableMap<Long, AtomicLong> idToReportVersionRef = ImmutableMap.of();
 
-        private boolean queryAvailable;
-
-        private boolean loadAvailable;
-
-        public BeAvailablePredicate(boolean scheduleAvailable, boolean queryAvailable, boolean loadAvailable) {
-            this.scheduleAvailable = scheduleAvailable;
-            this.queryAvailable = queryAvailable;
-            this.loadAvailable = loadAvailable;
-        }
-
-        public boolean isMatch(Backend backend) {
-            if (scheduleAvailable && !backend.isScheduleAvailable() || queryAvailable && !backend.isQueryAvailable() ||
-                    loadAvailable && !backend.isLoadAvailable()) {
-                return false;
-            }
-            return true;
-        }
-    }
-
-    private volatile ImmutableMap<Long, Backend> idToBackendRef;
-    private volatile ImmutableMap<Long, AtomicLong> idToReportVersionRef;
-
-    // last backend id used by round robin for sequential choosing backends for
-    // tablet creation
-    private ConcurrentHashMap<String, Long> lastBackendIdForCreationMap;
-    // last backend id used by round robin for sequential choosing backends in
-    // other jobs
-    private ConcurrentHashMap<String, Long> lastBackendIdForOtherMap;
+    // last backend id used by round robin for sequential selecting backends for replica creation
+    private Map<Tag, Long> lastBackendIdForReplicaCreation = Maps.newConcurrentMap();
 
     private long lastBackendIdForCreation = -1;
     private long lastBackendIdForOther = -1;
 
-    private volatile ImmutableMap<Long, DiskInfo> pathHashToDishInfoRef;
+    private volatile ImmutableMap<Long, DiskInfo> pathHashToDishInfoRef = ImmutableMap.of();
 
     // sort host backends list by num of backends, descending
     private static final Comparator<List<Backend>> hostBackendsListComparator = new Comparator<List<Backend>>() {
@@ -124,15 +95,6 @@ public class SystemInfoService {
         }
     };
 
-    public SystemInfoService() {
-        idToBackendRef = ImmutableMap.<Long, Backend>of();
-        idToReportVersionRef = ImmutableMap.<Long, AtomicLong>of();
-
-        lastBackendIdForCreationMap = new ConcurrentHashMap<String, Long>();
-        lastBackendIdForOtherMap = new ConcurrentHashMap<String, Long>();
-        pathHashToDishInfoRef = ImmutableMap.<Long, DiskInfo>of();
-    }
-
     // for deploy manager
     public void addBackends(List<Pair<String, Integer>> hostPortPairs, boolean isFree) throws UserException {
         addBackends(hostPortPairs, isFree, "", Tag.DEFAULT_BACKEND_TAG);
@@ -432,9 +394,6 @@ public class SystemInfoService {
             LOG.warn("not enough available backends. require :" + instanceNum + " get:" + chosenBackendIds.size());
             return null;
         }
-
-        lastBackendIdForCreationMap.put(clusterName, (long) -1);
-        lastBackendIdForOtherMap.put(clusterName, (long) -1);
         return chosenBackendIds;
     }
 
@@ -462,9 +421,6 @@ public class SystemInfoService {
                 }
             }
         }
-
-        lastBackendIdForCreationMap.remove(clusterName);
-        lastBackendIdForOtherMap.remove(clusterName);
     }
 
     /**
@@ -779,21 +735,35 @@ public class SystemInfoService {
     }
 
 
-    // Find enough backend to allocate replica of a tablet.
-    // filters include: tag, cluster, storage medium
-    public Map<Tag, List<Long>> chooseBackendIdByFilters(ReplicaAllocation replicaAlloc, String clusterName, TStorageMedium storageMedium)
+    /**
+     * Select a set of backends for replica creation.
+     * The following parameters need to be considered when selecting backends.
+     *
+     * @param replicaAlloc
+     * @param clusterName
+     * @param storageMedium
+     * @return return the selected backend ids group by tag.
+     * @throws DdlException
+     */
+    public Map<Tag, List<Long>> selectBackendIdsForReplicaCreation(
+            ReplicaAllocation replicaAlloc, String clusterName, TStorageMedium storageMedium)
             throws DdlException {
         Map<Tag, List<Long>> chosenBackendIds = Maps.newHashMap();
         Map<Tag, Short> allocMap = replicaAlloc.getAllocMap();
         short totalReplicaNum = 0;
-        BeAvailablePredicate beAvailablePredicate = new BeAvailablePredicate(true, false, false);
+
         for (Map.Entry<Tag, Short> entry : allocMap.entrySet()) {
-            List<Long> beIds = Catalog.getCurrentSystemInfo().seqChooseBackendIdsByStorageMediumAndTag(entry.getValue(),
-                    beAvailablePredicate, true, clusterName, storageMedium, entry.getKey());
-            if (beIds == null) {
-                throw new DdlException("Failed to find enough host with storage medium and tag("
-                        + (storageMedium == null ? "NaN" : storageMedium) + "/" + entry.getKey()
-                        + ") in all backends. need: " + entry.getValue());
+            BeSelectionPolicy.Builder builder = new BeSelectionPolicy.Builder().setCluster(clusterName)
+                    .needScheduleAvailable().needCheckDiskUsage().addTags(Sets.newHashSet(entry.getKey()))
+                    .setStorageMedium(storageMedium);
+            if (FeConstants.runningUnitTest || Config.allow_replica_on_same_host) {
+                builder.allowOnSameHost();
+            }
+
+            BeSelectionPolicy policy = builder.build();
+            List<Long> beIds = selectBackendIdsByPolicy(policy, entry.getValue());
+            if (beIds.isEmpty()) {
+                throw new DdlException("Failed to find " + entry.getValue() + " backends for policy: " + policy);
             }
             chosenBackendIds.put(entry.getKey(), beIds);
             totalReplicaNum += beIds.size();
@@ -802,61 +772,34 @@ public class SystemInfoService {
         return chosenBackendIds;
     }
 
-    public List<Long> seqChooseBackendIdsByStorageMediumAndTag(int backendNum, BeAvailablePredicate beAvailablePredicate,
-                                                               boolean isCreate, String clusterName,
-                                                               TStorageMedium storageMedium, Tag tag) {
-        Stream<Backend> beStream = getClusterBackends(clusterName).stream();
-        if (storageMedium == null) {
-            beStream = beStream.filter(v -> !v.diskExceedLimit());
-        } else {
-            beStream = beStream.filter(v -> !v.diskExceedLimitByStorageMedium(storageMedium));
+    /**
+     * Select a set of backends by the given policy.
+     *
+     * @param policy
+     * @param number number of backends which need to be selected.
+     * @return return #number of backend ids,
+     * or empty set if no backends match the policy, or the number of matched backends is less than "number";
+     */
+    public List<Long> selectBackendIdsByPolicy(BeSelectionPolicy policy, int number) {
+        List<Backend> candidates =
+                idToBackendRef.values().stream().filter(policy::isMatch).collect(Collectors.toList());
+        if (candidates.size() < number) {
+            return Lists.newArrayList();
         }
-        if (tag != null) {
-            beStream = beStream.filter(v -> v.getTag().equals(tag));
+        // If only need one Backend, just return a random one.
+        if (number == 1) {
+            Collections.shuffle(candidates);
+            return Lists.newArrayList(candidates.get(0).getId());
         }
-        final List<Backend> backends = beStream.collect(Collectors.toList());
-        return seqChooseBackendIds(backendNum, beAvailablePredicate, isCreate, clusterName, backends);
-    }
 
-    // choose backends by round robin
-    // return null if not enough backend
-    // use synchronized to run serially
-    public synchronized List<Long> seqChooseBackendIds(int backendNum, BeAvailablePredicate beAvailablePredicate,
-                                                       boolean isCreate, String clusterName,
-                                                       final List<Backend> srcBackends) {
-        long lastBackendId;
-
-        if (clusterName.equals(DEFAULT_CLUSTER)) {
-            if (isCreate) {
-                lastBackendId = lastBackendIdForCreation;
-            } else {
-                lastBackendId = lastBackendIdForOther;
-            }
-        } else {
-            if (isCreate) {
-                if (lastBackendIdForCreationMap.containsKey(clusterName)) {
-                    lastBackendId = lastBackendIdForCreationMap.get(clusterName);
-                } else {
-                    lastBackendId = -1;
-                    lastBackendIdForCreationMap.put(clusterName, lastBackendId);
-                }
-            } else {
-                if (lastBackendIdForOtherMap.containsKey(clusterName)) {
-                    lastBackendId = lastBackendIdForOtherMap.get(clusterName);
-                } else {
-                    lastBackendId = -1;
-                    lastBackendIdForOtherMap.put(clusterName, lastBackendId);
-                }
-            }
+        if (policy.allowOnSameHost) {
+            Collections.shuffle(candidates);
+            return candidates.subList(0, number).stream().map(b -> b.getId()).collect(Collectors.toList());
         }
 
-        // host -> BE list
-        List<Backend> sourceBackend = srcBackends;
-        if (sourceBackend == null) {
-            sourceBackend = getClusterBackends(clusterName);
-        }
+        // for each host, random select one backend.
         Map<String, List<Backend>> backendMaps = Maps.newHashMap();
-        for (Backend backend : sourceBackend) {
+        for (Backend backend : candidates) {
             if (backendMaps.containsKey(backend.getHost())) {
                 backendMaps.get(backend.getHost()).add(backend);
             } else {
@@ -865,94 +808,16 @@ public class SystemInfoService {
                 backendMaps.put(backend.getHost(), list);
             }
         }
-
-        // if more than one backend exists in same host, select a backend at random
-        List<Backend> backends = Lists.newArrayList();
+        candidates.clear();
         for (List<Backend> list : backendMaps.values()) {
-            if (FeConstants.runningUnitTest || Config.allow_replica_on_same_host) {
-                backends.addAll(list);
-            } else {
-                list = list.stream().filter(beAvailablePredicate::isMatch).collect(Collectors.toList());
-                if (list.isEmpty()) {
-                    continue;
-                }
-                Collections.shuffle(list);
-                backends.add(list.get(0));
-            }
+            Collections.shuffle(list);
+            candidates.add(list.get(0));
         }
-
-        Collections.shuffle(backends);
-
-        List<Long> backendIds = Lists.newArrayList();
-        // get last backend index
-        int lastBackendIndex = -1;
-        int index = -1;
-        for (Backend backend : backends) {
-            index++;
-            if (backend.getId() == lastBackendId) {
-                lastBackendIndex = index;
-                break;
-            }
+        if (candidates.size() < number) {
+            return Lists.newArrayList();
         }
-        Iterator<Backend> iterator = Iterators.cycle(backends);
-        index = -1;
-        boolean failed = false;
-        // 2 cycle at most
-        int maxIndex = 2 * backends.size();
-        while (iterator.hasNext() && backendIds.size() < backendNum) {
-            Backend backend = iterator.next();
-            index++;
-            if (index <= lastBackendIndex) {
-                continue;
-            }
-
-            if (index > maxIndex) {
-                failed = true;
-                break;
-            }
-
-            if (!beAvailablePredicate.isMatch(backend)) {
-                continue;
-            }
-
-            long backendId = backend.getId();
-            if (!backendIds.contains(backendId)) {
-                backendIds.add(backendId);
-                lastBackendId = backendId;
-            } else {
-                failed = true;
-                break;
-            }
-        }
-
-        if (clusterName.equals(DEFAULT_CLUSTER)) {
-            if (isCreate) {
-                lastBackendIdForCreation = lastBackendId;
-            } else {
-                lastBackendIdForOther = lastBackendId;
-            }
-        } else {
-            // update last backendId
-            if (isCreate) {
-                lastBackendIdForCreationMap.put(clusterName, lastBackendId);
-            } else {
-                lastBackendIdForOtherMap.put(clusterName, lastBackendId);
-            }
-        }
-        if (backendIds.size() != backendNum) {
-            failed = true;
-        }
-
-        if (!failed) {
-            return backendIds;
-        }
-
-        // debug
-        for (Backend backend : backends) {
-            LOG.debug("random select: {}", backend.toString());
-        }
-
-        return null;
+        Collections.shuffle(candidates);
+        return candidates.subList(0, number).stream().map(b -> b.getId()).collect(Collectors.toList());
     }
 
     public ImmutableMap<Long, Backend> getIdToBackend() {
diff --git a/fe/fe-core/src/test/java/org/apache/doris/backup/RestoreJobTest.java b/fe/fe-core/src/test/java/org/apache/doris/backup/RestoreJobTest.java
index 90fc55a891..aaab4b3155 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/backup/RestoreJobTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/backup/RestoreJobTest.java
@@ -22,7 +22,6 @@ import org.apache.doris.backup.BackupJobInfo.BackupIndexInfo;
 import org.apache.doris.backup.BackupJobInfo.BackupOlapTableInfo;
 import org.apache.doris.backup.BackupJobInfo.BackupPartitionInfo;
 import org.apache.doris.backup.BackupJobInfo.BackupTabletInfo;
-import org.apache.doris.backup.RestoreJob.RestoreJobState;
 import org.apache.doris.catalog.Catalog;
 import org.apache.doris.catalog.Database;
 import org.apache.doris.catalog.MaterializedIndex;
@@ -38,35 +37,19 @@ import org.apache.doris.common.FeConstants;
 import org.apache.doris.common.MarkedCountDownLatch;
 import org.apache.doris.common.jmockit.Deencapsulation;
 import org.apache.doris.persist.EditLog;
-import org.apache.doris.resource.Tag;
 import org.apache.doris.system.SystemInfoService;
-import org.apache.doris.task.AgentTask;
-import org.apache.doris.task.AgentTaskQueue;
-import org.apache.doris.task.DirMoveTask;
-import org.apache.doris.task.DownloadTask;
-import org.apache.doris.task.SnapshotTask;
-import org.apache.doris.thrift.TBackend;
-import org.apache.doris.thrift.TFinishTaskRequest;
-import org.apache.doris.thrift.TStatus;
-import org.apache.doris.thrift.TStatusCode;
 import org.apache.doris.thrift.TStorageMedium;
-import org.apache.doris.thrift.TTaskType;
 
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 
-import org.junit.Assert;
 import org.junit.Before;
-import org.junit.Ignore;
 import org.junit.Test;
 
 import java.util.List;
-import java.util.Map;
-import java.util.Set;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.zip.Adler32;
-
 import mockit.Delegate;
 import mockit.Expectations;
 import mockit.Injectable;
@@ -161,12 +144,12 @@ public class RestoreJobTest {
         
         new Expectations() {
             {
-                systemInfoService.seqChooseBackendIdsByStorageMediumAndTag(anyInt, (SystemInfoService.BeAvailablePredicate) any,
-                        anyBoolean, anyString, (TStorageMedium) any, (Tag) any);
+                systemInfoService.selectBackendIdsForReplicaCreation((ReplicaAllocation) any,
+                        anyString, (TStorageMedium) any);
                 minTimes = 0;
                 result = new Delegate() {
-                    public synchronized List<Long> seqChooseBackendIds(int backendNum, boolean needAlive,
-                                                                       boolean isCreate, String clusterName) {
+                    public synchronized List<Long> selectBackendIdsForReplicaCreation(
+                            ReplicaAllocation replicaAlloc, String clusterName, TStorageMedium medium) {
                         List<Long> beIds = Lists.newArrayList();
                         beIds.add(CatalogMocker.BACKEND1_ID);
                         beIds.add(CatalogMocker.BACKEND2_ID);
@@ -259,113 +242,6 @@ public class RestoreJobTest {
         backupMeta = new BackupMeta(tbls, resources);
     }
 
-    @Ignore
-    @Test
-    public void testRun() {
-        // pending
-        job.run();
-        Assert.assertEquals(Status.OK, job.getStatus());
-        Assert.assertEquals(RestoreJobState.SNAPSHOTING, job.getState());
-        Assert.assertEquals(12, job.getFileMapping().getMapping().size());
-
-        // 2. snapshoting
-        job.run();
-        Assert.assertEquals(Status.OK, job.getStatus());
-        Assert.assertEquals(RestoreJobState.SNAPSHOTING, job.getState());
-        Assert.assertEquals(12 * 2, AgentTaskQueue.getTaskNum());
-
-        // 3. snapshot finished
-        List<AgentTask> agentTasks = Lists.newArrayList();
-        Map<TTaskType, Set<Long>> runningTasks = Maps.newHashMap();
-        agentTasks.addAll(AgentTaskQueue.getDiffTasks(CatalogMocker.BACKEND1_ID, runningTasks));
-        agentTasks.addAll(AgentTaskQueue.getDiffTasks(CatalogMocker.BACKEND2_ID, runningTasks));
-        agentTasks.addAll(AgentTaskQueue.getDiffTasks(CatalogMocker.BACKEND3_ID, runningTasks));
-        Assert.assertEquals(12 * 2, agentTasks.size());
-
-        for (AgentTask agentTask : agentTasks) {
-            if (agentTask.getTaskType() != TTaskType.MAKE_SNAPSHOT) {
-                continue;
-            }
-
-            SnapshotTask task = (SnapshotTask) agentTask;
-            String snapshotPath = "/path/to/snapshot/" + System.currentTimeMillis();
-            TStatus taskStatus = new TStatus(TStatusCode.OK);
-            TBackend tBackend = new TBackend("", 0, 1);
-            TFinishTaskRequest request = new TFinishTaskRequest(tBackend, TTaskType.MAKE_SNAPSHOT,
-                    task.getSignature(), taskStatus);
-            request.setSnapshotPath(snapshotPath);
-            Assert.assertTrue(job.finishTabletSnapshotTask(task, request));
-        }
-
-        job.run();
-        Assert.assertEquals(Status.OK, job.getStatus());
-        Assert.assertEquals(RestoreJobState.DOWNLOAD, job.getState());
-
-        // download
-        AgentTaskQueue.clearAllTasks();
-        job.run();
-        Assert.assertEquals(Status.OK, job.getStatus());
-        Assert.assertEquals(RestoreJobState.DOWNLOADING, job.getState());
-        Assert.assertEquals(9, AgentTaskQueue.getTaskNum());
-
-        // downloading
-        job.run();
-        Assert.assertEquals(Status.OK, job.getStatus());
-        Assert.assertEquals(RestoreJobState.DOWNLOADING, job.getState());
-
-        List<AgentTask> downloadTasks = Lists.newArrayList();
-        runningTasks = Maps.newHashMap();
-        downloadTasks.addAll(AgentTaskQueue.getDiffTasks(CatalogMocker.BACKEND1_ID, runningTasks));
-        downloadTasks.addAll(AgentTaskQueue.getDiffTasks(CatalogMocker.BACKEND2_ID, runningTasks));
-        downloadTasks.addAll(AgentTaskQueue.getDiffTasks(CatalogMocker.BACKEND3_ID, runningTasks));
-        Assert.assertEquals(9, downloadTasks.size());
-        
-        List<Long> downloadedTabletIds = Lists.newArrayList();
-        for (AgentTask agentTask : downloadTasks) {
-            TStatus taskStatus = new TStatus(TStatusCode.OK);
-            TBackend tBackend = new TBackend("", 0, 1);
-            TFinishTaskRequest request = new TFinishTaskRequest(tBackend, TTaskType.MAKE_SNAPSHOT,
-                    agentTask.getSignature(), taskStatus);
-            request.setDownloadedTabletIds(downloadedTabletIds);
-            Assert.assertTrue(job.finishTabletDownloadTask((DownloadTask) agentTask, request));
-        }
-
-        job.run();
-        Assert.assertEquals(Status.OK, job.getStatus());
-        Assert.assertEquals(RestoreJobState.COMMIT, job.getState());
-
-        // commit
-        AgentTaskQueue.clearAllTasks();
-        job.run();
-        Assert.assertEquals(Status.OK, job.getStatus());
-        Assert.assertEquals(RestoreJobState.COMMITTING, job.getState());
-        Assert.assertEquals(12, AgentTaskQueue.getTaskNum());
-
-        // committing
-        job.run();
-        Assert.assertEquals(Status.OK, job.getStatus());
-        Assert.assertEquals(RestoreJobState.COMMITTING, job.getState());
-
-        List<AgentTask> dirMoveTasks = Lists.newArrayList();
-        runningTasks = Maps.newHashMap();
-        dirMoveTasks.addAll(AgentTaskQueue.getDiffTasks(CatalogMocker.BACKEND1_ID, runningTasks));
-        dirMoveTasks.addAll(AgentTaskQueue.getDiffTasks(CatalogMocker.BACKEND2_ID, runningTasks));
-        dirMoveTasks.addAll(AgentTaskQueue.getDiffTasks(CatalogMocker.BACKEND3_ID, runningTasks));
-        Assert.assertEquals(12, dirMoveTasks.size());
-        
-        for (AgentTask agentTask : dirMoveTasks) {
-            TStatus taskStatus = new TStatus(TStatusCode.OK);
-            TBackend tBackend = new TBackend("", 0, 1);
-            TFinishTaskRequest request = new TFinishTaskRequest(tBackend, TTaskType.MAKE_SNAPSHOT,
-                    agentTask.getSignature(), taskStatus);
-            job.finishDirMoveTask((DirMoveTask) agentTask, request);
-        }
-
-        job.run();
-        Assert.assertEquals(Status.OK, job.getStatus());
-        Assert.assertEquals(RestoreJobState.FINISHED, job.getState());
-    }
-
     @Test
     public void testSignature() throws AnalysisException {
         Adler32 sig1 = new Adler32();
diff --git a/fe/fe-core/src/test/java/org/apache/doris/catalog/CreateTableTest.java b/fe/fe-core/src/test/java/org/apache/doris/catalog/CreateTableTest.java
index 67686a7340..1253f22579 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/catalog/CreateTableTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/catalog/CreateTableTest.java
@@ -241,8 +241,7 @@ public class CreateTableTest {
                         + "properties('replication_num' = '1', 'short_key' = '4');"));
 
         ExceptionChecker
-                .expectThrowsWithMsg(DdlException.class, "Failed to find enough host with storage medium and " +
-                                "tag(NaN/{\"location\" : \"default\"}) in all backends. need: 3",
+                .expectThrowsWithMsg(DdlException.class, "Failed to find 3 backends for policy",
                         () -> createTable("create table test.atbl5\n" + "(k1 int, k2 int, k3 int)\n"
                                 + "duplicate key(k1, k2, k3)\n" + "distributed by hash(k1) buckets 1\n"
                                 + "properties('replication_num' = '3');"));
@@ -259,8 +258,7 @@ public class CreateTableTest {
 
         ConfigBase.setMutableConfig("disable_storage_medium_check", "false");
         ExceptionChecker
-                .expectThrowsWithMsg(DdlException.class, "Failed to find enough host with storage medium and " +
-                                "tag(SSD/{\"location\" : \"default\"}) in all backends. need: 1",
+                .expectThrowsWithMsg(DdlException.class, " Failed to find 1 backends for policy:",
                         () -> createTable("create table test.tb7(key1 int, key2 varchar(10)) distributed by hash(key1) \n"
                                 + "buckets 1 properties('replication_num' = '1', 'storage_medium' = 'ssd');"));
 
diff --git a/fe/fe-core/src/test/java/org/apache/doris/catalog/ModifyBackendTest.java b/fe/fe-core/src/test/java/org/apache/doris/catalog/ModifyBackendTest.java
index d755a6a65e..1ded070267 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/catalog/ModifyBackendTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/catalog/ModifyBackendTest.java
@@ -86,7 +86,7 @@ public class ModifyBackendTest {
                 ");";
         CreateTableStmt createStmt = (CreateTableStmt) UtFrameUtils.parseAndAnalyzeStmt(createStr, connectContext);
         ExceptionChecker.expectThrowsWithMsg(DdlException.class,
-                "Failed to find enough host with storage medium and tag(HDD/{\"location\" : \"default\"}) in all backends. need: 1",
+                "Failed to find 1 backends for policy:",
                 () -> DdlExecutor.execute(Catalog.getCurrentCatalog(), createStmt));
 
         createStr = "create table test.tbl1(\n" +
@@ -119,7 +119,7 @@ public class ModifyBackendTest {
         Database db = Catalog.getCurrentCatalog().getDbNullable("default_cluster:test");
         Table tbl3 = db.getTableNullable("tbl3");
         String err = Catalog.getCurrentCatalog().getDynamicPartitionScheduler().getRuntimeInfo(tbl3.getId(), DynamicPartitionScheduler.CREATE_PARTITION_MSG);
-        Assert.assertTrue(err.contains("Failed to find enough host with storage medium and tag"));
+        Assert.assertTrue(err.contains("Failed to find 1 backends for policy:"));
 
         createStr = "create table test.tbl4(\n" +
                 "k1 date, k2 int\n" +
@@ -171,7 +171,7 @@ public class ModifyBackendTest {
                 + " set ('replication_allocation' = 'tag.location.zonex:1')";
         AlterTableStmt alterStmt2 = (AlterTableStmt) UtFrameUtils.parseAndAnalyzeStmt(alterStr, connectContext);
         ExceptionChecker.expectThrowsWithMsg(DdlException.class,
-                "Failed to find enough host with tag({\"location\" : \"zonex\"}) in all backends. need: 1",
+                "Failed to find enough host with tag",
                 () -> DdlExecutor.execute(Catalog.getCurrentCatalog(), alterStmt2));
         tblProperties = tableProperty.getProperties();
         Assert.assertTrue(tblProperties.containsKey("default.replication_allocation"));
diff --git a/fe/fe-core/src/test/java/org/apache/doris/load/sync/canal/CanalSyncDataTest.java b/fe/fe-core/src/test/java/org/apache/doris/load/sync/canal/CanalSyncDataTest.java
index a3051c65dd..3b1b4eddfc 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/load/sync/canal/CanalSyncDataTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/load/sync/canal/CanalSyncDataTest.java
@@ -20,12 +20,12 @@ package org.apache.doris.load.sync.canal;
 import org.apache.doris.catalog.Catalog;
 import org.apache.doris.catalog.Database;
 import org.apache.doris.catalog.OlapTable;
+import org.apache.doris.catalog.ReplicaAllocation;
 import org.apache.doris.common.AnalysisException;
 import org.apache.doris.common.Config;
 import org.apache.doris.planner.StreamLoadPlanner;
 import org.apache.doris.proto.InternalService;
 import org.apache.doris.proto.Types;
-import org.apache.doris.resource.Tag;
 import org.apache.doris.rpc.BackendServiceProxy;
 import org.apache.doris.system.Backend;
 import org.apache.doris.system.SystemInfoService;
@@ -59,7 +59,6 @@ import java.util.Map;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.ReentrantLock;
-
 import mockit.Expectations;
 import mockit.Mock;
 import mockit.MockUp;
@@ -150,8 +149,8 @@ public class CanalSyncDataTest {
                 minTimes = 0;
                 result = execPlanFragmentParams;
 
-                systemInfoService.seqChooseBackendIdsByStorageMediumAndTag(anyInt, (SystemInfoService.BeAvailablePredicate) any, anyBoolean, anyString,
-                        (TStorageMedium) any, (Tag) any);
+                systemInfoService.selectBackendIdsForReplicaCreation((ReplicaAllocation) any,
+                        anyString, (TStorageMedium) any);
                 minTimes = 0;
                 result = backendIds;
 
diff --git a/fe/fe-core/src/test/java/org/apache/doris/qe/MultiLoadMgrTest.java b/fe/fe-core/src/test/java/org/apache/doris/qe/MultiLoadMgrTest.java
index 8d4d09ae13..bd099083b1 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/qe/MultiLoadMgrTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/qe/MultiLoadMgrTest.java
@@ -20,9 +20,8 @@ package org.apache.doris.qe;
 import org.apache.doris.backup.CatalogMocker;
 import org.apache.doris.catalog.Catalog;
 import org.apache.doris.common.DdlException;
-import org.apache.doris.resource.Tag;
+import org.apache.doris.system.BeSelectionPolicy;
 import org.apache.doris.system.SystemInfoService;
-import org.apache.doris.thrift.TStorageMedium;
 
 import com.google.common.collect.Lists;
 
@@ -31,7 +30,6 @@ import org.junit.Before;
 import org.junit.Test;
 
 import java.util.List;
-
 import mockit.Delegate;
 import mockit.Expectations;
 import mockit.Mocked;
@@ -45,7 +43,7 @@ public class MultiLoadMgrTest {
     @Mocked
     private SystemInfoService systemInfoService;
     @Before
-    public void setUp() {
+    public void setUp() throws Exception {
         new Expectations() {
             {
                 ConnectContext.get();
@@ -62,13 +60,10 @@ public class MultiLoadMgrTest {
         };
         new Expectations() {
             {
-                systemInfoService.seqChooseBackendIdsByStorageMediumAndTag(anyInt, (SystemInfoService.BeAvailablePredicate) any,
-                        anyBoolean, anyString, (TStorageMedium) any, (Tag) any);
+                systemInfoService.selectBackendIdsByPolicy((BeSelectionPolicy) any, anyInt);
                 minTimes = 0;
                 result = new Delegate() {
-                    public synchronized List<Long> seqChooseBackendIdsByStorageMediumAndTag(
-                            int backendNum, SystemInfoService.BeAvailablePredicate availablePredicate,
-                            boolean isCreate, String clusterName, TStorageMedium medium, Tag tag) {
+                    public List<Long> selectBackendIdsByPolicy(BeSelectionPolicy policy, int number) {
                         List<Long> beIds = Lists.newArrayList();
                         beIds.add(CatalogMocker.BACKEND1_ID);
                         beIds.add(CatalogMocker.BACKEND2_ID);
diff --git a/fe/fe-core/src/test/java/org/apache/doris/system/SystemInfoServiceTest.java b/fe/fe-core/src/test/java/org/apache/doris/system/SystemInfoServiceTest.java
new file mode 100644
index 0000000000..b2570095a0
--- /dev/null
+++ b/fe/fe-core/src/test/java/org/apache/doris/system/SystemInfoServiceTest.java
@@ -0,0 +1,268 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.system;
+
+import org.apache.doris.catalog.Catalog;
+import org.apache.doris.catalog.DiskInfo;
+import org.apache.doris.catalog.ReplicaAllocation;
+import org.apache.doris.persist.EditLog;
+import org.apache.doris.resource.Tag;
+import org.apache.doris.thrift.TStorageMedium;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import mockit.Expectations;
+import mockit.Mocked;
+
+public class SystemInfoServiceTest {
+
+    @Mocked
+    private Catalog catalog;
+    @Mocked
+    private EditLog editLog;
+
+    private SystemInfoService infoService;
+
+    @Before
+    public void setUp() {
+        new Expectations() {
+            {
+                catalog.getEditLog();
+                minTimes = 0;
+                result = editLog;
+
+                editLog.logAddBackend((Backend) any);
+                minTimes = 0;
+
+                Catalog.getCurrentCatalog();
+                minTimes = 0;
+                result = catalog;
+            }
+        };
+
+        infoService = new SystemInfoService();
+    }
+
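+    // Registers a backend with the given id, host and heartbeat port in the SystemInfoService under test.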
+    private void addBackend(long beId, String host, int hbPort) {
+        Backend backend = new Backend(beId, host, hbPort);
+        infoService.addBackend(backend);
+    }
+
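+    // Exercises selectBackendIdsByPolicy step by step: aliveness, schedule/load/query availability,
+    // tags, storage medium, disk usage and same-host constraints.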
+    @Test
+    public void testSelectBackendIdsByPolicy() throws Exception {
+        // 1. no backend
+        BeSelectionPolicy policy = new BeSelectionPolicy.Builder().needLoadAvailable().build();
+        Assert.assertEquals(0, infoService.selectBackendIdsByPolicy(policy, 1).size());
+        Assert.assertEquals(0, infoService.selectBackendIdsByPolicy(policy, 4).size());
+
+        // 2. add one backend but not alive
+        addBackend(10001, "192.168.1.1", 9050);
+        Backend be1 = infoService.getBackend(10001);
+        Assert.assertEquals(0, infoService.selectBackendIdsByPolicy(policy, 1).size());
+        Assert.assertEquals(0, infoService.selectBackendIdsByPolicy(policy, 0).size());
+        // a policy with no conditions also matches backends that are not alive
+        BeSelectionPolicy policy2 = new BeSelectionPolicy.Builder().build();
+        Assert.assertEquals(1, infoService.selectBackendIdsByPolicy(policy2, 1).size());
+
+        // 3. add more backends
+        addBackend(10002, "192.168.1.2", 9050);
+        Backend be2 = infoService.getBackend(10002);
+        be2.setAlive(true);
+        addBackend(10003, "192.168.1.3", 9050);
+        Backend be3 = infoService.getBackend(10003);
+        be3.setAlive(true);
+        addBackend(10004, "192.168.1.4", 9050);
+        Backend be4 = infoService.getBackend(10004);
+        be4.setAlive(true);
+        addBackend(10005, "192.168.1.5", 9050);
+        Backend be5 = infoService.getBackend(10005);
+
+        // be1 and be5 are dead, be2, be3 and be4 are alive
+        BeSelectionPolicy policy3 = new BeSelectionPolicy.Builder().needScheduleAvailable().build();
+        Assert.assertEquals(1, infoService.selectBackendIdsByPolicy(policy3, 1).size());
+        Assert.assertFalse(infoService.selectBackendIdsByPolicy(policy3, 1).contains(10001L));
+        Assert.assertFalse(infoService.selectBackendIdsByPolicy(policy3, 1).contains(10005L));
+        Assert.assertEquals(2, infoService.selectBackendIdsByPolicy(policy3, 2).size());
+        Assert.assertEquals(3, infoService.selectBackendIdsByPolicy(policy3, 3).size());
+        Assert.assertTrue(infoService.selectBackendIdsByPolicy(policy3, 3).contains(10002L));
+        Assert.assertTrue(infoService.selectBackendIdsByPolicy(policy3, 3).contains(10003L));
+        Assert.assertTrue(infoService.selectBackendIdsByPolicy(policy3, 3).contains(10004L));
+        Assert.assertEquals(0, infoService.selectBackendIdsByPolicy(policy3, 4).size());
+
+        // 4. set be status
+        be2.setLoadDisabled(true);
+        be3.setQueryDisabled(true);
+        be4.setDecommissioned(true);
+        // now, only be3 and be4 are loadable, only be2 and be4 are queryable, and only be2 and be3 are schedulable
+        BeSelectionPolicy policy4 = new BeSelectionPolicy.Builder().needScheduleAvailable().build();
+        Assert.assertEquals(1, infoService.selectBackendIdsByPolicy(policy4, 1).size());
+        Assert.assertFalse(infoService.selectBackendIdsByPolicy(policy4, 1).contains(10001L));
+        Assert.assertFalse(infoService.selectBackendIdsByPolicy(policy4, 1).contains(10004L));
+        Assert.assertFalse(infoService.selectBackendIdsByPolicy(policy4, 1).contains(10005L));
+        Assert.assertEquals(2, infoService.selectBackendIdsByPolicy(policy4, 2).size());
+        Assert.assertTrue(infoService.selectBackendIdsByPolicy(policy4, 2).contains(10002L));
+        Assert.assertTrue(infoService.selectBackendIdsByPolicy(policy4, 2).contains(10003L));
+        Assert.assertEquals(0, infoService.selectBackendIdsByPolicy(policy4, 3).size());
+
+        BeSelectionPolicy policy5 = new BeSelectionPolicy.Builder().needLoadAvailable().build();
+        Assert.assertEquals(1, infoService.selectBackendIdsByPolicy(policy5, 1).size());
+        Assert.assertFalse(infoService.selectBackendIdsByPolicy(policy5, 1).contains(10001L));
+        Assert.assertFalse(infoService.selectBackendIdsByPolicy(policy5, 1).contains(10002L));
+        Assert.assertFalse(infoService.selectBackendIdsByPolicy(policy5, 1).contains(10005L));
+        Assert.assertEquals(2, infoService.selectBackendIdsByPolicy(policy5, 2).size());
+        Assert.assertTrue(infoService.selectBackendIdsByPolicy(policy5, 2).contains(10003L));
+        Assert.assertTrue(infoService.selectBackendIdsByPolicy(policy5, 2).contains(10004L));
+
+        // 5. set tags
+        // reset all backends
+        be1.setAlive(true);
+        be2.setLoadDisabled(false);
+        be3.setQueryDisabled(false);
+        be5.setAlive(true);
+        be3.setAlive(true);
+        be4.setAlive(true);
+        be4.setDecommissioned(false);
+        be5.setAlive(true);
+        BeSelectionPolicy policy6 = new BeSelectionPolicy.Builder().needQueryAvailable().build();
+        Assert.assertEquals(5, infoService.selectBackendIdsByPolicy(policy6, 5).size());
+
+        Tag taga = Tag.create(Tag.TYPE_LOCATION, "taga");
+        Tag tagb = Tag.create(Tag.TYPE_LOCATION, "tagb");
+        be1.setTag(taga);
+        be2.setTag(taga);
+        be3.setTag(tagb);
+        be4.setTag(tagb);
+        be5.setTag(tagb);
+
+        BeSelectionPolicy policy7 = new BeSelectionPolicy.Builder().needQueryAvailable().addTags(Sets.newHashSet(taga)).build();
+        Assert.assertEquals(1, infoService.selectBackendIdsByPolicy(policy7, 1).size());
+        Assert.assertEquals(2, infoService.selectBackendIdsByPolicy(policy7, 2).size());
+        Assert.assertTrue(infoService.selectBackendIdsByPolicy(policy7, 2).contains(10001L));
+        Assert.assertTrue(infoService.selectBackendIdsByPolicy(policy7, 2).contains(10002L));
+        Assert.assertEquals(0, infoService.selectBackendIdsByPolicy(policy7, 3).size());
+
+        BeSelectionPolicy policy8 = new BeSelectionPolicy.Builder().needQueryAvailable().addTags(Sets.newHashSet(tagb)).build();
+        Assert.assertEquals(3, infoService.selectBackendIdsByPolicy(policy8, 3).size());
+        Assert.assertTrue(infoService.selectBackendIdsByPolicy(policy8, 3).contains(10003L));
+        Assert.assertTrue(infoService.selectBackendIdsByPolicy(policy8, 3).contains(10004L));
+        Assert.assertTrue(infoService.selectBackendIdsByPolicy(policy8, 3).contains(10005L));
+
+        BeSelectionPolicy policy9 = new BeSelectionPolicy.Builder().needQueryAvailable().addTags(Sets.newHashSet(taga, tagb)).build();
+        Assert.assertEquals(5, infoService.selectBackendIdsByPolicy(policy9, 5).size());
+
+        // 6. check storage medium
+        addDisk(be1, "path1", TStorageMedium.HDD, 200 * 1024 * 1024L, 1 * 1024 * 1024L);
+        addDisk(be2, "path2", TStorageMedium.SSD, 200 * 1024 * 1024L, 150 * 1024 * 1024L);
+        addDisk(be3, "path3", TStorageMedium.SSD, 200 * 1024 * 1024L, 150 * 1024 * 1024L);
+        addDisk(be4, "path4", TStorageMedium.SSD, 200 * 1024 * 1024L, 150 * 1024 * 1024L);
+        addDisk(be5, "path5", TStorageMedium.SSD, 200 * 1024 * 1024L, 150 * 1024 * 1024L);
+
+        BeSelectionPolicy policy10 = new BeSelectionPolicy.Builder().addTags(Sets.newHashSet(taga, tagb))
+                .setStorageMedium(TStorageMedium.SSD).build();
+        Assert.assertEquals(4, infoService.selectBackendIdsByPolicy(policy10, 4).size());
+        Assert.assertEquals(0, infoService.selectBackendIdsByPolicy(policy10, 5).size());
+
+        BeSelectionPolicy policy11 = new BeSelectionPolicy.Builder().addTags(Sets.newHashSet(tagb))
+                .setStorageMedium(TStorageMedium.HDD).build();
+        Assert.assertEquals(0, infoService.selectBackendIdsByPolicy(policy11, 1).size());
+
+        // 7. check disk usage
+        BeSelectionPolicy policy12 = new BeSelectionPolicy.Builder().addTags(Sets.newHashSet(taga))
+                .setStorageMedium(TStorageMedium.HDD).build();
+        Assert.assertEquals(1, infoService.selectBackendIdsByPolicy(policy12, 1).size());
+        BeSelectionPolicy policy13 = new BeSelectionPolicy.Builder().addTags(Sets.newHashSet(taga))
+                .setStorageMedium(TStorageMedium.HDD).needCheckDiskUsage().build();
+        Assert.assertEquals(0, infoService.selectBackendIdsByPolicy(policy13, 1).size());
+
+        // 8. check same host
+        addBackend(10006, "192.168.1.1", 9051);
+        Backend be6 = infoService.getBackend(10006);
+        be6.setTag(taga);
+        be6.setAlive(true);
+        addDisk(be1, "path1", TStorageMedium.HDD, 200 * 1024 * 1024L, 100 * 1024 * 1024L);
+        addDisk(be6, "path1", TStorageMedium.HDD, 200 * 1024 * 1024L, 100 * 1024 * 1024L);
+        BeSelectionPolicy policy14 = new BeSelectionPolicy.Builder().addTags(Sets.newHashSet(taga))
+                .setStorageMedium(TStorageMedium.HDD).build();
+        Assert.assertEquals(0, infoService.selectBackendIdsByPolicy(policy14, 2).size());
+        BeSelectionPolicy policy15 = new BeSelectionPolicy.Builder().addTags(Sets.newHashSet(taga))
+                .setStorageMedium(TStorageMedium.HDD).allowOnSameHost().build();
+        Assert.assertEquals(2, infoService.selectBackendIdsByPolicy(policy15, 2).size());
+    }
+
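+    // Checks that replica creation returns the expected number of backends per tag and that
+    // the random selection spreads replicas evenly across backends (max/min diff below 5%).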
+    @Test
+    public void testSelectBackendIdsForReplicaCreation() throws Exception {
+        addBackend(10001, "192.168.1.1", 9050);
+        Backend be1 = infoService.getBackend(10001);
+        addDisk(be1, "path1", TStorageMedium.HDD, 200 * 1024 * 1024L, 150 * 1024 * 1024L);
+        be1.setAlive(true);
+        addBackend(10002, "192.168.1.2", 9050);
+        Backend be2 = infoService.getBackend(10002);
+        addDisk(be2, "path1", TStorageMedium.HDD, 200 * 1024 * 1024L, 150 * 1024 * 1024L);
+        be2.setAlive(true);
+        addBackend(10003, "192.168.1.3", 9050);
+        Backend be3 = infoService.getBackend(10003);
+        addDisk(be3, "path1", TStorageMedium.HDD, 200 * 1024 * 1024L, 150 * 1024 * 1024L);
+        be3.setAlive(true);
+        addBackend(10004, "192.168.1.4", 9050);
+        Backend be4 = infoService.getBackend(10004);
+        addDisk(be4, "path1", TStorageMedium.HDD, 200 * 1024 * 1024L, 150 * 1024 * 1024L);
+        be4.setAlive(true);
+        addBackend(10005, "192.168.1.5", 9050);
+        Backend be5 = infoService.getBackend(10005);
+        addDisk(be5, "path1", TStorageMedium.HDD, 200 * 1024 * 1024L, 150 * 1024 * 1024L);
+        be5.setAlive(true);
+
+        ReplicaAllocation replicaAlloc = ReplicaAllocation.DEFAULT_ALLOCATION;
+        // also check whether the random selection logic distributes the replicas evenly.
+        Map<Long, Integer> beCounterMap = Maps.newHashMap();
+        for (int i = 0; i < 10000; ++i) {
+            Map<Tag, List<Long>> res = infoService.selectBackendIdsForReplicaCreation(replicaAlloc,
+                    SystemInfoService.DEFAULT_CLUSTER, TStorageMedium.HDD);
+            Assert.assertEquals(3, res.get(Tag.DEFAULT_BACKEND_TAG).size());
+            for (Long beId : res.get(Tag.DEFAULT_BACKEND_TAG)) {
+                beCounterMap.put(beId, beCounterMap.getOrDefault(beId, 0) + 1);
+            }
+        }
+        System.out.println(beCounterMap);
+        List<Integer> list = Lists.newArrayList(beCounterMap.values());
+        Collections.sort(list);
+        int diff = list.get(list.size() - 1) - list.get(0);
+        // The difference between the max and min replica counts should be less than 5%.
+        Assert.assertTrue((diff * 1.0 / list.get(0)) < 0.05);
+    }
+
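+    // Attaches a single disk with the given path, storage medium, total and available capacity to the backend.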
+    private void addDisk(Backend be, String path, TStorageMedium medium, long totalB, long availB) {
+        DiskInfo diskInfo1 = new DiskInfo(path);
+        diskInfo1.setTotalCapacityB(totalB);
+        diskInfo1.setAvailableCapacityB(availB);
+        diskInfo1.setStorageMedium(medium);
+        Map<String, DiskInfo> map = Maps.newHashMap();
+        map.put(diskInfo1.getRootPath(), diskInfo1);
+        be.setDisks(ImmutableMap.copyOf(map));
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org