You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by mo...@apache.org on 2022/05/11 01:41:02 UTC
[incubator-doris] branch master updated: [refactor](backend) Refactor the logic of selecting Backend in FE. (#9478)
This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git
The following commit(s) were added to refs/heads/master by this push:
new 8fa0122ed0 [refactor](backend) Refactor the logic of selecting Backend in FE. (#9478)
8fa0122ed0 is described below
commit 8fa0122ed054369a6d570d2ff9848449ff269ed7
Author: Mingyu Chen <mo...@gmail.com>
AuthorDate: Wed May 11 09:40:57 2022 +0800
[refactor](backend) Refactor the logic of selecting Backend in FE. (#9478)
There are many places in FE where a group of BE nodes needs to be selected according to certain requirements. For example:
1. When creating replicas for a tablet.
2. When selecting a BE to execute Insert.
3. When Stream Load forwards http requests to BE nodes.
These operations all share the same selection logic, so this CL mainly makes the following changes:
1. Create a new `BeSelectionPolicy` class to describe the set of conditions for selecting BE.
2. The logic for selecting BE nodes in `SystemInfoService` has been refactored so that all callers go through the following two methods:
1. `selectBackendIdsByPolicy`: Select the required number of BE nodes according to the `BeSelectionPolicy`.
2. `selectBackendIdsForReplicaCreation`: Select the BE nodes on which to create replicas.
Note the following behavior change:
Previously, BE nodes for replica creation were selected in round-robin order;
they are now selected at random, for the following reasons:
1. Although the previous logic was nominally round-robin, it was effectively random, because the candidate list was shuffled before each round-robin pass.
2. The final imbalance produced by random selection does not exceed 5%, so random selection can be considered
to distribute data evenly.
---
.../java/org/apache/doris/backup/RestoreJob.java | 9 +-
.../java/org/apache/doris/catalog/Catalog.java | 8 +-
.../java/org/apache/doris/catalog/OlapTable.java | 3 +-
.../org/apache/doris/httpv2/rest/LoadAction.java | 53 ++--
.../apache/doris/httpv2/util/LoadSubmitter.java | 19 +-
.../apache/doris/qe/InsertStreamTxnExecutor.java | 14 +-
.../java/org/apache/doris/qe/MultiLoadMgr.java | 14 +-
.../org/apache/doris/system/BeSelectionPolicy.java | 131 ++++++++++
.../org/apache/doris/system/SystemInfoService.java | 251 +++++--------------
.../org/apache/doris/backup/RestoreJobTest.java | 132 +---------
.../org/apache/doris/catalog/CreateTableTest.java | 6 +-
.../apache/doris/catalog/ModifyBackendTest.java | 6 +-
.../doris/load/sync/canal/CanalSyncDataTest.java | 7 +-
.../java/org/apache/doris/qe/MultiLoadMgrTest.java | 13 +-
.../apache/doris/system/SystemInfoServiceTest.java | 268 +++++++++++++++++++++
15 files changed, 525 insertions(+), 409 deletions(-)
diff --git a/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java b/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java
index 0395b0d320..a70ad7fd57 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java
@@ -74,9 +74,6 @@ import org.apache.doris.thrift.TStorageMedium;
import org.apache.doris.thrift.TStorageType;
import org.apache.doris.thrift.TTaskType;
-import org.apache.logging.log4j.LogManager;
-import org.apache.logging.log4j.Logger;
-
import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
import com.google.common.collect.ArrayListMultimap;
@@ -87,6 +84,9 @@ import com.google.common.collect.Maps;
import com.google.common.collect.Multimap;
import com.google.common.collect.Table.Cell;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
@@ -1018,7 +1018,8 @@ public class RestoreJob extends AbstractJob {
// replicas
try {
- Map<Tag, List<Long>> beIds = Catalog.getCurrentSystemInfo().chooseBackendIdByFilters(replicaAlloc, clusterName, null);
+ Map<Tag, List<Long>> beIds = Catalog.getCurrentSystemInfo()
+ .selectBackendIdsForReplicaCreation(replicaAlloc, clusterName, null);
for (Map.Entry<Tag, List<Long>> entry : beIds.entrySet()) {
for (Long beId : entry.getValue()) {
long newReplicaId = catalog.getNextId();
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Catalog.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Catalog.java
index d38ac2df64..d7e3d28e9d 100755
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Catalog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Catalog.java
@@ -271,6 +271,7 @@ import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.codehaus.jackson.map.ObjectMapper;
+import javax.annotation.Nullable;
import java.io.BufferedReader;
import java.io.DataInputStream;
import java.io.DataOutputStream;
@@ -296,7 +297,6 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
-import javax.annotation.Nullable;
public class Catalog {
private static final Logger LOG = LogManager.getLogger(Catalog.class);
@@ -4543,10 +4543,12 @@ public class Catalog {
// This is the first colocate table in the group, or just a normal table,
// randomly choose backends
if (!Config.disable_storage_medium_check) {
- chosenBackendIds = getCurrentSystemInfo().chooseBackendIdByFilters(replicaAlloc, clusterName,
+ chosenBackendIds =
+ getCurrentSystemInfo().selectBackendIdsForReplicaCreation(replicaAlloc, clusterName,
tabletMeta.getStorageMedium());
} else {
- chosenBackendIds = getCurrentSystemInfo().chooseBackendIdByFilters(replicaAlloc, clusterName, null);
+ chosenBackendIds =
+ getCurrentSystemInfo().selectBackendIdsForReplicaCreation(replicaAlloc, clusterName, null);
}
for (Map.Entry<Tag, List<Long>> entry : chosenBackendIds.entrySet()) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java
index b56ea19f30..e3ac7e8f5a 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java
@@ -500,7 +500,8 @@ public class OlapTable extends Table {
// replicas
try {
- Map<Tag, List<Long>> tag2beIds = Catalog.getCurrentSystemInfo().chooseBackendIdByFilters(
+ Map<Tag, List<Long>> tag2beIds =
+ Catalog.getCurrentSystemInfo().selectBackendIdsForReplicaCreation(
replicaAlloc, db.getClusterName(), null);
for (Map.Entry<Tag, List<Long>> entry3 : tag2beIds.entrySet()) {
for (Long beId : entry3.getValue()) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/LoadAction.java b/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/LoadAction.java
index b9c85dbdfb..ebdda12926 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/LoadAction.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/LoadAction.java
@@ -21,12 +21,14 @@ import org.apache.doris.catalog.Catalog;
import org.apache.doris.cluster.ClusterNamespace;
import org.apache.doris.common.Config;
import org.apache.doris.common.DdlException;
+import org.apache.doris.common.LoadException;
import org.apache.doris.httpv2.entity.ResponseEntityBuilder;
import org.apache.doris.httpv2.entity.RestBaseResult;
import org.apache.doris.mysql.privilege.PrivPredicate;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.service.ExecuteEnv;
import org.apache.doris.system.Backend;
+import org.apache.doris.system.BeSelectionPolicy;
import org.apache.doris.system.SystemInfoService;
import org.apache.doris.thrift.TNetworkAddress;
@@ -41,12 +43,10 @@ import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.servlet.view.RedirectView;
-import java.util.List;
-
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
-
import io.netty.handler.codec.http.HttpHeaderNames;
+import java.util.List;
@RestController
public class LoadAction extends RestBaseController {
@@ -145,21 +145,7 @@ public class LoadAction extends RestBaseController {
return new RestBaseResult(e.getMessage());
}
} else {
- // Choose a backend sequentially.
- SystemInfoService.BeAvailablePredicate beAvailablePredicate =
- new SystemInfoService.BeAvailablePredicate(false, false, true);
- List<Long> backendIds = Catalog.getCurrentSystemInfo().seqChooseBackendIdsByStorageMediumAndTag(
- 1, beAvailablePredicate, false, clusterName, null, null);
- if (backendIds == null) {
- return new RestBaseResult(SystemInfoService.NO_BACKEND_LOAD_AVAILABLE_MSG);
- }
-
- Backend backend = Catalog.getCurrentSystemInfo().getBackend(backendIds.get(0));
- if (backend == null) {
- return new RestBaseResult(SystemInfoService.NO_BACKEND_LOAD_AVAILABLE_MSG);
- }
-
- redirectAddr = new TNetworkAddress(backend.getHost(), backend.getHttpPort());
+ redirectAddr = selectRedirectBackend(clusterName);
}
LOG.info("redirect load action to destination={}, stream: {}, db: {}, tbl: {}, label: {}",
@@ -194,22 +180,7 @@ public class LoadAction extends RestBaseController {
return new RestBaseResult("No transaction operation(\'commit\' or \'abort\') selected.");
}
- // Choose a backend sequentially.
- SystemInfoService.BeAvailablePredicate beAvailablePredicate =
- new SystemInfoService.BeAvailablePredicate(false, false, true);
- List<Long> backendIds = Catalog.getCurrentSystemInfo().seqChooseBackendIdsByStorageMediumAndTag(
- 1, beAvailablePredicate, false, clusterName, null, null);
- if (backendIds == null) {
- return new RestBaseResult(SystemInfoService.NO_BACKEND_LOAD_AVAILABLE_MSG);
- }
-
- Backend backend = Catalog.getCurrentSystemInfo().getBackend(backendIds.get(0));
- if (backend == null) {
- return new RestBaseResult(SystemInfoService.NO_BACKEND_LOAD_AVAILABLE_MSG);
- }
-
- TNetworkAddress redirectAddr = new TNetworkAddress(backend.getHost(), backend.getHttpPort());
-
+ TNetworkAddress redirectAddr = selectRedirectBackend(clusterName);
LOG.info("redirect stream load 2PC action to destination={}, db: {}, txn: {}, operation: {}",
redirectAddr.toString(), dbName, request.getHeader(TXN_ID_KEY), txnOperation);
@@ -220,4 +191,18 @@ public class LoadAction extends RestBaseController {
return new RestBaseResult(e.getMessage());
}
}
+
+ private TNetworkAddress selectRedirectBackend(String clusterName) throws LoadException {
+ BeSelectionPolicy policy = new BeSelectionPolicy.Builder().setCluster(clusterName).needLoadAvailable().build();
+ List<Long> backendIds = Catalog.getCurrentSystemInfo().selectBackendIdsByPolicy(policy, 1);
+ if (backendIds.isEmpty()) {
+ throw new LoadException(SystemInfoService.NO_BACKEND_LOAD_AVAILABLE_MSG + ", policy: " + policy);
+ }
+
+ Backend backend = Catalog.getCurrentSystemInfo().getBackend(backendIds.get(0));
+ if (backend == null) {
+ throw new LoadException(SystemInfoService.NO_BACKEND_LOAD_AVAILABLE_MSG + ", policy: " + policy);
+ }
+ return new TNetworkAddress(backend.getHost(), backend.getHttpPort());
+ }
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/httpv2/util/LoadSubmitter.java b/fe/fe-core/src/main/java/org/apache/doris/httpv2/util/LoadSubmitter.java
index 1a715cf0e3..31aeba813d 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/httpv2/util/LoadSubmitter.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/httpv2/util/LoadSubmitter.java
@@ -19,10 +19,11 @@ package org.apache.doris.httpv2.util;
import org.apache.doris.catalog.Catalog;
import org.apache.doris.cluster.ClusterNamespace;
-import org.apache.doris.common.DdlException;
+import org.apache.doris.common.LoadException;
import org.apache.doris.common.ThreadPoolManager;
import org.apache.doris.httpv2.rest.UploadAction;
import org.apache.doris.system.Backend;
+import org.apache.doris.system.BeSelectionPolicy;
import org.apache.doris.system.SystemInfoService;
import com.google.common.base.Strings;
@@ -136,19 +137,15 @@ public class LoadSubmitter {
return file;
}
- private Backend selectOneBackend() throws DdlException {
- SystemInfoService.BeAvailablePredicate beAvailablePredicate =
- new SystemInfoService.BeAvailablePredicate(false, false, true);
- List<Long> backendIds = Catalog.getCurrentSystemInfo().seqChooseBackendIdsByStorageMediumAndTag(
- 1, beAvailablePredicate, false,
- SystemInfoService.DEFAULT_CLUSTER, null, null);
- if (backendIds == null) {
- throw new DdlException("No alive backend");
+ private Backend selectOneBackend() throws LoadException {
+ BeSelectionPolicy policy = new BeSelectionPolicy.Builder().needLoadAvailable().build();
+ List<Long> backendIds = Catalog.getCurrentSystemInfo().selectBackendIdsByPolicy(policy, 1);
+ if (backendIds.isEmpty()) {
+ throw new LoadException(SystemInfoService.NO_BACKEND_LOAD_AVAILABLE_MSG + ", policy: " + policy);
}
-
Backend backend = Catalog.getCurrentSystemInfo().getBackend(backendIds.get(0));
if (backend == null) {
- throw new DdlException("No alive backend");
+ throw new LoadException(SystemInfoService.NO_BACKEND_LOAD_AVAILABLE_MSG + ", policy: " + policy);
}
return backend;
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/InsertStreamTxnExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/InsertStreamTxnExecutor.java
index 2cb8fe4ea6..ea0cb6cfda 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/InsertStreamTxnExecutor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/InsertStreamTxnExecutor.java
@@ -26,7 +26,7 @@ import org.apache.doris.proto.Types;
import org.apache.doris.rpc.BackendServiceProxy;
import org.apache.doris.rpc.RpcException;
import org.apache.doris.system.Backend;
-import org.apache.doris.system.SystemInfoService;
+import org.apache.doris.system.BeSelectionPolicy;
import org.apache.doris.task.StreamLoadTask;
import org.apache.doris.thrift.TBrokerRangeDesc;
import org.apache.doris.thrift.TExecPlanFragmentParams;
@@ -63,13 +63,11 @@ public class InsertStreamTxnExecutor {
StreamLoadTask streamLoadTask = StreamLoadTask.fromTStreamLoadPutRequest(request);
StreamLoadPlanner planner = new StreamLoadPlanner(txnEntry.getDb(), (OlapTable) txnEntry.getTable(), streamLoadTask);
TExecPlanFragmentParams tRequest = planner.plan(streamLoadTask.getId());
- SystemInfoService.BeAvailablePredicate beAvailablePredicate =
- new SystemInfoService.BeAvailablePredicate(false, true, true);
- List<Long> beIds = Catalog.getCurrentSystemInfo().seqChooseBackendIdsByStorageMediumAndTag(
- 1, beAvailablePredicate, false,
- txnEntry.getDb().getClusterName(), null, null);
- if (beIds == null || beIds.isEmpty()) {
- throw new UserException("there is no backend load available or scanNode backend available.");
+ BeSelectionPolicy policy = new BeSelectionPolicy.Builder().setCluster(txnEntry.getDb().getClusterName())
+ .needLoadAvailable().needQueryAvailable().build();
+ List<Long> beIds = Catalog.getCurrentSystemInfo().selectBackendIdsByPolicy(policy, 1);
+ if (beIds.isEmpty()) {
+ throw new UserException("No available backend to match the policy: " + policy);
}
tRequest.setTxnConf(txnConf).setImportLabel(txnEntry.getLabel());
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/MultiLoadMgr.java b/fe/fe-core/src/main/java/org/apache/doris/qe/MultiLoadMgr.java
index fd8cf80cac..606d68e6c6 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/MultiLoadMgr.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/MultiLoadMgr.java
@@ -41,6 +41,7 @@ import org.apache.doris.load.loadv2.JobState;
import org.apache.doris.load.loadv2.LoadJob;
import org.apache.doris.load.loadv2.LoadTask;
import org.apache.doris.system.Backend;
+import org.apache.doris.system.BeSelectionPolicy;
import org.apache.doris.system.SystemInfoService;
import org.apache.doris.thrift.TMiniLoadRequest;
import org.apache.doris.thrift.TNetworkAddress;
@@ -91,14 +92,13 @@ public class MultiLoadMgr {
if (infoMap.containsKey(multiLabel)) {
throw new LabelAlreadyUsedException(label);
}
- MultiLoadDesc multiLoadDesc = new MultiLoadDesc(multiLabel, properties);
- SystemInfoService.BeAvailablePredicate beAvailablePredicate =
- new SystemInfoService.BeAvailablePredicate(false, false, true);
- List<Long> backendIds = Catalog.getCurrentSystemInfo().seqChooseBackendIdsByStorageMediumAndTag(1,
- beAvailablePredicate, false, ConnectContext.get().getClusterName(), null, null);
- if (backendIds == null) {
- throw new DdlException(SystemInfoService.NO_BACKEND_LOAD_AVAILABLE_MSG);
+ BeSelectionPolicy policy = new BeSelectionPolicy.Builder().setCluster(ConnectContext.get().getClusterName())
+ .needLoadAvailable().build();
+ List<Long> backendIds = Catalog.getCurrentSystemInfo().selectBackendIdsByPolicy(policy, 1);
+ if (backendIds.isEmpty()) {
+ throw new DdlException(SystemInfoService.NO_BACKEND_LOAD_AVAILABLE_MSG + " policy: " + policy);
}
+ MultiLoadDesc multiLoadDesc = new MultiLoadDesc(multiLabel, properties);
multiLoadDesc.setBackendId(backendIds.get(0));
infoMap.put(multiLabel, multiLoadDesc);
} finally {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/system/BeSelectionPolicy.java b/fe/fe-core/src/main/java/org/apache/doris/system/BeSelectionPolicy.java
new file mode 100644
index 0000000000..e995c0aff5
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/system/BeSelectionPolicy.java
@@ -0,0 +1,131 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.system;
+
+import org.apache.doris.resource.Tag;
+import org.apache.doris.thrift.TStorageMedium;
+
+import com.google.common.collect.Sets;
+
+import java.util.Set;
+
+/**
+ * Selection policy for building BE nodes
+ */
+public class BeSelectionPolicy {
+ public String cluster = SystemInfoService.DEFAULT_CLUSTER;
+ public boolean needScheduleAvailable = false;
+ public boolean needQueryAvailable = false;
+ public boolean needLoadAvailable = false;
+ // Resource tag. Empty means no need to consider resource tag.
+ public Set<Tag> resourceTags = Sets.newHashSet();
+ // storage medium. null means no need to consider storage medium.
+ public TStorageMedium storageMedium = null;
+ // Check if disk usage reaches limit. false means no need to check.
+ public boolean checkDiskUsage = false;
+ // If set to false, do not select backends on same host.
+ public boolean allowOnSameHost = false;
+
+ private BeSelectionPolicy() {
+
+ }
+
+ public static class Builder {
+ private BeSelectionPolicy policy;
+ public Builder() {
+ policy = new BeSelectionPolicy();
+ }
+
+ public Builder setCluster(String cluster) {
+ policy.cluster = cluster;
+ return this;
+ }
+
+ public Builder needScheduleAvailable() {
+ policy.needScheduleAvailable = true;
+ return this;
+ }
+
+ public Builder needQueryAvailable() {
+ policy.needQueryAvailable = true;
+ return this;
+ }
+
+ public Builder needLoadAvailable() {
+ policy.needLoadAvailable = true;
+ return this;
+ }
+
+ public Builder addTags(Set<Tag> tags) {
+ policy.resourceTags.addAll(tags);
+ return this;
+ }
+
+ public Builder setStorageMedium(TStorageMedium medium) {
+ policy.storageMedium = medium;
+ return this;
+ }
+
+ public Builder needCheckDiskUsage() {
+ policy.checkDiskUsage = true;
+ return this;
+ }
+
+ public Builder allowOnSameHost() {
+ policy.allowOnSameHost = true;
+ return this;
+ }
+
+ public BeSelectionPolicy build() {
+ return policy;
+ }
+ }
+
+ public boolean isMatch(Backend backend) {
+ if (needScheduleAvailable && !backend.isScheduleAvailable()
+ || needQueryAvailable && !backend.isQueryAvailable()
+ || needLoadAvailable && !backend.isLoadAvailable()
+ || !resourceTags.isEmpty() && !resourceTags.contains(backend.getTag())
+ || storageMedium != null && !backend.hasSpecifiedStorageMedium(storageMedium)) {
+ return false;
+ }
+
+ if (checkDiskUsage) {
+ if (storageMedium == null && backend.diskExceedLimit()) {
+ return false;
+ }
+ if (storageMedium != null && backend.diskExceedLimitByStorageMedium(storageMedium)) {
+ return false;
+ }
+ }
+ return true;
+ }
+
+ @Override
+ public String toString() {
+ StringBuilder sb = new StringBuilder();
+ sb.append("cluster|query|load|schedule|tags|medium: ");
+ sb.append(cluster).append("|");
+ sb.append(needQueryAvailable).append("|");
+ sb.append(needLoadAvailable).append("|");
+ sb.append(needScheduleAvailable).append("|");
+ sb.append(resourceTags).append("|");
+ sb.append(storageMedium);
+ return sb.toString();
+ }
+}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/system/SystemInfoService.java b/fe/fe-core/src/main/java/org/apache/doris/system/SystemInfoService.java
index 39e577f55a..82571cd3d8 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/system/SystemInfoService.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/system/SystemInfoService.java
@@ -40,7 +40,6 @@ import org.apache.doris.thrift.TStorageMedium;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.Iterators;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Multimap;
@@ -61,10 +60,8 @@ import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
-import java.util.stream.Stream;
public class SystemInfoService {
private static final Logger LOG = LogManager.getLogger(SystemInfoService.class);
@@ -75,42 +72,16 @@ public class SystemInfoService {
public static final String NO_SCAN_NODE_BACKEND_AVAILABLE_MSG = "There is no scanNode Backend available.";
- public static class BeAvailablePredicate {
- private boolean scheduleAvailable;
+ private volatile ImmutableMap<Long, Backend> idToBackendRef = ImmutableMap.of();
+ private volatile ImmutableMap<Long, AtomicLong> idToReportVersionRef = ImmutableMap.of();
- private boolean queryAvailable;
-
- private boolean loadAvailable;
-
- public BeAvailablePredicate(boolean scheduleAvailable, boolean queryAvailable, boolean loadAvailable) {
- this.scheduleAvailable = scheduleAvailable;
- this.queryAvailable = queryAvailable;
- this.loadAvailable = loadAvailable;
- }
-
- public boolean isMatch(Backend backend) {
- if (scheduleAvailable && !backend.isScheduleAvailable() || queryAvailable && !backend.isQueryAvailable() ||
- loadAvailable && !backend.isLoadAvailable()) {
- return false;
- }
- return true;
- }
- }
-
- private volatile ImmutableMap<Long, Backend> idToBackendRef;
- private volatile ImmutableMap<Long, AtomicLong> idToReportVersionRef;
-
- // last backend id used by round robin for sequential choosing backends for
- // tablet creation
- private ConcurrentHashMap<String, Long> lastBackendIdForCreationMap;
- // last backend id used by round robin for sequential choosing backends in
- // other jobs
- private ConcurrentHashMap<String, Long> lastBackendIdForOtherMap;
+ // last backend id used by round robin for sequential selecting backends for replica creation
+ private Map<Tag, Long> lastBackendIdForReplicaCreation = Maps.newConcurrentMap();
private long lastBackendIdForCreation = -1;
private long lastBackendIdForOther = -1;
- private volatile ImmutableMap<Long, DiskInfo> pathHashToDishInfoRef;
+ private volatile ImmutableMap<Long, DiskInfo> pathHashToDishInfoRef = ImmutableMap.of();
// sort host backends list by num of backends, descending
private static final Comparator<List<Backend>> hostBackendsListComparator = new Comparator<List<Backend>>() {
@@ -124,15 +95,6 @@ public class SystemInfoService {
}
};
- public SystemInfoService() {
- idToBackendRef = ImmutableMap.<Long, Backend>of();
- idToReportVersionRef = ImmutableMap.<Long, AtomicLong>of();
-
- lastBackendIdForCreationMap = new ConcurrentHashMap<String, Long>();
- lastBackendIdForOtherMap = new ConcurrentHashMap<String, Long>();
- pathHashToDishInfoRef = ImmutableMap.<Long, DiskInfo>of();
- }
-
// for deploy manager
public void addBackends(List<Pair<String, Integer>> hostPortPairs, boolean isFree) throws UserException {
addBackends(hostPortPairs, isFree, "", Tag.DEFAULT_BACKEND_TAG);
@@ -432,9 +394,6 @@ public class SystemInfoService {
LOG.warn("not enough available backends. require :" + instanceNum + " get:" + chosenBackendIds.size());
return null;
}
-
- lastBackendIdForCreationMap.put(clusterName, (long) -1);
- lastBackendIdForOtherMap.put(clusterName, (long) -1);
return chosenBackendIds;
}
@@ -462,9 +421,6 @@ public class SystemInfoService {
}
}
}
-
- lastBackendIdForCreationMap.remove(clusterName);
- lastBackendIdForOtherMap.remove(clusterName);
}
/**
@@ -779,21 +735,35 @@ public class SystemInfoService {
}
- // Find enough backend to allocate replica of a tablet.
- // filters include: tag, cluster, storage medium
- public Map<Tag, List<Long>> chooseBackendIdByFilters(ReplicaAllocation replicaAlloc, String clusterName, TStorageMedium storageMedium)
+ /**
+ * Select a set of backends for replica creation.
+ * The following parameters need to be considered when selecting backends.
+ *
+ * @param replicaAlloc
+ * @param clusterName
+ * @param storageMedium
+ * @return return the selected backend ids group by tag.
+ * @throws DdlException
+ */
+ public Map<Tag, List<Long>> selectBackendIdsForReplicaCreation(
+ ReplicaAllocation replicaAlloc, String clusterName, TStorageMedium storageMedium)
throws DdlException {
Map<Tag, List<Long>> chosenBackendIds = Maps.newHashMap();
Map<Tag, Short> allocMap = replicaAlloc.getAllocMap();
short totalReplicaNum = 0;
- BeAvailablePredicate beAvailablePredicate = new BeAvailablePredicate(true, false, false);
+
for (Map.Entry<Tag, Short> entry : allocMap.entrySet()) {
- List<Long> beIds = Catalog.getCurrentSystemInfo().seqChooseBackendIdsByStorageMediumAndTag(entry.getValue(),
- beAvailablePredicate, true, clusterName, storageMedium, entry.getKey());
- if (beIds == null) {
- throw new DdlException("Failed to find enough host with storage medium and tag("
- + (storageMedium == null ? "NaN" : storageMedium) + "/" + entry.getKey()
- + ") in all backends. need: " + entry.getValue());
+ BeSelectionPolicy.Builder builder = new BeSelectionPolicy.Builder().setCluster(clusterName)
+ .needScheduleAvailable().needCheckDiskUsage().addTags(Sets.newHashSet(entry.getKey()))
+ .setStorageMedium(storageMedium);
+ if (FeConstants.runningUnitTest || Config.allow_replica_on_same_host) {
+ builder.allowOnSameHost();
+ }
+
+ BeSelectionPolicy policy = builder.build();
+ List<Long> beIds = selectBackendIdsByPolicy(policy, entry.getValue());
+ if (beIds.isEmpty()) {
+ throw new DdlException("Failed to find " + entry.getValue() + " backends for policy: " + policy);
}
chosenBackendIds.put(entry.getKey(), beIds);
totalReplicaNum += beIds.size();
@@ -802,61 +772,34 @@ public class SystemInfoService {
return chosenBackendIds;
}
- public List<Long> seqChooseBackendIdsByStorageMediumAndTag(int backendNum, BeAvailablePredicate beAvailablePredicate,
- boolean isCreate, String clusterName,
- TStorageMedium storageMedium, Tag tag) {
- Stream<Backend> beStream = getClusterBackends(clusterName).stream();
- if (storageMedium == null) {
- beStream = beStream.filter(v -> !v.diskExceedLimit());
- } else {
- beStream = beStream.filter(v -> !v.diskExceedLimitByStorageMedium(storageMedium));
+ /**
+ * Select a set of backends by the given policy.
+ *
+ * @param policy
+ * @param number number of backends which need to be selected.
+ * @return return #number of backend ids,
+ * or empty set if no backends match the policy, or the number of matched backends is less than "number";
+ */
+ public List<Long> selectBackendIdsByPolicy(BeSelectionPolicy policy, int number) {
+ List<Backend> candidates =
+ idToBackendRef.values().stream().filter(policy::isMatch).collect(Collectors.toList());
+ if (candidates.size() < number) {
+ return Lists.newArrayList();
}
- if (tag != null) {
- beStream = beStream.filter(v -> v.getTag().equals(tag));
+ // If only need one Backend, just return a random one.
+ if (number == 1) {
+ Collections.shuffle(candidates);
+ return Lists.newArrayList(candidates.get(0).getId());
}
- final List<Backend> backends = beStream.collect(Collectors.toList());
- return seqChooseBackendIds(backendNum, beAvailablePredicate, isCreate, clusterName, backends);
- }
- // choose backends by round robin
- // return null if not enough backend
- // use synchronized to run serially
- public synchronized List<Long> seqChooseBackendIds(int backendNum, BeAvailablePredicate beAvailablePredicate,
- boolean isCreate, String clusterName,
- final List<Backend> srcBackends) {
- long lastBackendId;
-
- if (clusterName.equals(DEFAULT_CLUSTER)) {
- if (isCreate) {
- lastBackendId = lastBackendIdForCreation;
- } else {
- lastBackendId = lastBackendIdForOther;
- }
- } else {
- if (isCreate) {
- if (lastBackendIdForCreationMap.containsKey(clusterName)) {
- lastBackendId = lastBackendIdForCreationMap.get(clusterName);
- } else {
- lastBackendId = -1;
- lastBackendIdForCreationMap.put(clusterName, lastBackendId);
- }
- } else {
- if (lastBackendIdForOtherMap.containsKey(clusterName)) {
- lastBackendId = lastBackendIdForOtherMap.get(clusterName);
- } else {
- lastBackendId = -1;
- lastBackendIdForOtherMap.put(clusterName, lastBackendId);
- }
- }
+ if (policy.allowOnSameHost) {
+ Collections.shuffle(candidates);
+ return candidates.subList(0, number).stream().map(b -> b.getId()).collect(Collectors.toList());
}
- // host -> BE list
- List<Backend> sourceBackend = srcBackends;
- if (sourceBackend == null) {
- sourceBackend = getClusterBackends(clusterName);
- }
+ // for each host, random select one backend.
Map<String, List<Backend>> backendMaps = Maps.newHashMap();
- for (Backend backend : sourceBackend) {
+ for (Backend backend : candidates) {
if (backendMaps.containsKey(backend.getHost())) {
backendMaps.get(backend.getHost()).add(backend);
} else {
@@ -865,94 +808,16 @@ public class SystemInfoService {
backendMaps.put(backend.getHost(), list);
}
}
-
- // if more than one backend exists in same host, select a backend at random
- List<Backend> backends = Lists.newArrayList();
+ candidates.clear();
for (List<Backend> list : backendMaps.values()) {
- if (FeConstants.runningUnitTest || Config.allow_replica_on_same_host) {
- backends.addAll(list);
- } else {
- list = list.stream().filter(beAvailablePredicate::isMatch).collect(Collectors.toList());
- if (list.isEmpty()) {
- continue;
- }
- Collections.shuffle(list);
- backends.add(list.get(0));
- }
+ Collections.shuffle(list);
+ candidates.add(list.get(0));
}
-
- Collections.shuffle(backends);
-
- List<Long> backendIds = Lists.newArrayList();
- // get last backend index
- int lastBackendIndex = -1;
- int index = -1;
- for (Backend backend : backends) {
- index++;
- if (backend.getId() == lastBackendId) {
- lastBackendIndex = index;
- break;
- }
+ if (candidates.size() < number) {
+ return Lists.newArrayList();
}
- Iterator<Backend> iterator = Iterators.cycle(backends);
- index = -1;
- boolean failed = false;
- // 2 cycle at most
- int maxIndex = 2 * backends.size();
- while (iterator.hasNext() && backendIds.size() < backendNum) {
- Backend backend = iterator.next();
- index++;
- if (index <= lastBackendIndex) {
- continue;
- }
-
- if (index > maxIndex) {
- failed = true;
- break;
- }
-
- if (!beAvailablePredicate.isMatch(backend)) {
- continue;
- }
-
- long backendId = backend.getId();
- if (!backendIds.contains(backendId)) {
- backendIds.add(backendId);
- lastBackendId = backendId;
- } else {
- failed = true;
- break;
- }
- }
-
- if (clusterName.equals(DEFAULT_CLUSTER)) {
- if (isCreate) {
- lastBackendIdForCreation = lastBackendId;
- } else {
- lastBackendIdForOther = lastBackendId;
- }
- } else {
- // update last backendId
- if (isCreate) {
- lastBackendIdForCreationMap.put(clusterName, lastBackendId);
- } else {
- lastBackendIdForOtherMap.put(clusterName, lastBackendId);
- }
- }
- if (backendIds.size() != backendNum) {
- failed = true;
- }
-
- if (!failed) {
- return backendIds;
- }
-
- // debug
- for (Backend backend : backends) {
- LOG.debug("random select: {}", backend.toString());
- }
-
- return null;
+ Collections.shuffle(candidates);
+ return candidates.subList(0, number).stream().map(b -> b.getId()).collect(Collectors.toList());
}
public ImmutableMap<Long, Backend> getIdToBackend() {
diff --git a/fe/fe-core/src/test/java/org/apache/doris/backup/RestoreJobTest.java b/fe/fe-core/src/test/java/org/apache/doris/backup/RestoreJobTest.java
index 90fc55a891..aaab4b3155 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/backup/RestoreJobTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/backup/RestoreJobTest.java
@@ -22,7 +22,6 @@ import org.apache.doris.backup.BackupJobInfo.BackupIndexInfo;
import org.apache.doris.backup.BackupJobInfo.BackupOlapTableInfo;
import org.apache.doris.backup.BackupJobInfo.BackupPartitionInfo;
import org.apache.doris.backup.BackupJobInfo.BackupTabletInfo;
-import org.apache.doris.backup.RestoreJob.RestoreJobState;
import org.apache.doris.catalog.Catalog;
import org.apache.doris.catalog.Database;
import org.apache.doris.catalog.MaterializedIndex;
@@ -38,35 +37,19 @@ import org.apache.doris.common.FeConstants;
import org.apache.doris.common.MarkedCountDownLatch;
import org.apache.doris.common.jmockit.Deencapsulation;
import org.apache.doris.persist.EditLog;
-import org.apache.doris.resource.Tag;
import org.apache.doris.system.SystemInfoService;
-import org.apache.doris.task.AgentTask;
-import org.apache.doris.task.AgentTaskQueue;
-import org.apache.doris.task.DirMoveTask;
-import org.apache.doris.task.DownloadTask;
-import org.apache.doris.task.SnapshotTask;
-import org.apache.doris.thrift.TBackend;
-import org.apache.doris.thrift.TFinishTaskRequest;
-import org.apache.doris.thrift.TStatus;
-import org.apache.doris.thrift.TStatusCode;
import org.apache.doris.thrift.TStorageMedium;
-import org.apache.doris.thrift.TTaskType;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
-import org.junit.Assert;
import org.junit.Before;
-import org.junit.Ignore;
import org.junit.Test;
import java.util.List;
-import java.util.Map;
-import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.zip.Adler32;
-
import mockit.Delegate;
import mockit.Expectations;
import mockit.Injectable;
@@ -161,12 +144,12 @@ public class RestoreJobTest {
new Expectations() {
{
- systemInfoService.seqChooseBackendIdsByStorageMediumAndTag(anyInt, (SystemInfoService.BeAvailablePredicate) any,
- anyBoolean, anyString, (TStorageMedium) any, (Tag) any);
+ systemInfoService.selectBackendIdsForReplicaCreation((ReplicaAllocation) any,
+ anyString, (TStorageMedium) any);
minTimes = 0;
result = new Delegate() {
- public synchronized List<Long> seqChooseBackendIds(int backendNum, boolean needAlive,
- boolean isCreate, String clusterName) {
+ public synchronized List<Long> selectBackendIdsForReplicaCreation(
+ ReplicaAllocation replicaAlloc, String clusterName, TStorageMedium medium) {
List<Long> beIds = Lists.newArrayList();
beIds.add(CatalogMocker.BACKEND1_ID);
beIds.add(CatalogMocker.BACKEND2_ID);
@@ -259,113 +242,6 @@ public class RestoreJobTest {
backupMeta = new BackupMeta(tbls, resources);
}
- @Ignore
- @Test
- public void testRun() {
- // pending
- job.run();
- Assert.assertEquals(Status.OK, job.getStatus());
- Assert.assertEquals(RestoreJobState.SNAPSHOTING, job.getState());
- Assert.assertEquals(12, job.getFileMapping().getMapping().size());
-
- // 2. snapshoting
- job.run();
- Assert.assertEquals(Status.OK, job.getStatus());
- Assert.assertEquals(RestoreJobState.SNAPSHOTING, job.getState());
- Assert.assertEquals(12 * 2, AgentTaskQueue.getTaskNum());
-
- // 3. snapshot finished
- List<AgentTask> agentTasks = Lists.newArrayList();
- Map<TTaskType, Set<Long>> runningTasks = Maps.newHashMap();
- agentTasks.addAll(AgentTaskQueue.getDiffTasks(CatalogMocker.BACKEND1_ID, runningTasks));
- agentTasks.addAll(AgentTaskQueue.getDiffTasks(CatalogMocker.BACKEND2_ID, runningTasks));
- agentTasks.addAll(AgentTaskQueue.getDiffTasks(CatalogMocker.BACKEND3_ID, runningTasks));
- Assert.assertEquals(12 * 2, agentTasks.size());
-
- for (AgentTask agentTask : agentTasks) {
- if (agentTask.getTaskType() != TTaskType.MAKE_SNAPSHOT) {
- continue;
- }
-
- SnapshotTask task = (SnapshotTask) agentTask;
- String snapshotPath = "/path/to/snapshot/" + System.currentTimeMillis();
- TStatus taskStatus = new TStatus(TStatusCode.OK);
- TBackend tBackend = new TBackend("", 0, 1);
- TFinishTaskRequest request = new TFinishTaskRequest(tBackend, TTaskType.MAKE_SNAPSHOT,
- task.getSignature(), taskStatus);
- request.setSnapshotPath(snapshotPath);
- Assert.assertTrue(job.finishTabletSnapshotTask(task, request));
- }
-
- job.run();
- Assert.assertEquals(Status.OK, job.getStatus());
- Assert.assertEquals(RestoreJobState.DOWNLOAD, job.getState());
-
- // download
- AgentTaskQueue.clearAllTasks();
- job.run();
- Assert.assertEquals(Status.OK, job.getStatus());
- Assert.assertEquals(RestoreJobState.DOWNLOADING, job.getState());
- Assert.assertEquals(9, AgentTaskQueue.getTaskNum());
-
- // downloading
- job.run();
- Assert.assertEquals(Status.OK, job.getStatus());
- Assert.assertEquals(RestoreJobState.DOWNLOADING, job.getState());
-
- List<AgentTask> downloadTasks = Lists.newArrayList();
- runningTasks = Maps.newHashMap();
- downloadTasks.addAll(AgentTaskQueue.getDiffTasks(CatalogMocker.BACKEND1_ID, runningTasks));
- downloadTasks.addAll(AgentTaskQueue.getDiffTasks(CatalogMocker.BACKEND2_ID, runningTasks));
- downloadTasks.addAll(AgentTaskQueue.getDiffTasks(CatalogMocker.BACKEND3_ID, runningTasks));
- Assert.assertEquals(9, downloadTasks.size());
-
- List<Long> downloadedTabletIds = Lists.newArrayList();
- for (AgentTask agentTask : downloadTasks) {
- TStatus taskStatus = new TStatus(TStatusCode.OK);
- TBackend tBackend = new TBackend("", 0, 1);
- TFinishTaskRequest request = new TFinishTaskRequest(tBackend, TTaskType.MAKE_SNAPSHOT,
- agentTask.getSignature(), taskStatus);
- request.setDownloadedTabletIds(downloadedTabletIds);
- Assert.assertTrue(job.finishTabletDownloadTask((DownloadTask) agentTask, request));
- }
-
- job.run();
- Assert.assertEquals(Status.OK, job.getStatus());
- Assert.assertEquals(RestoreJobState.COMMIT, job.getState());
-
- // commit
- AgentTaskQueue.clearAllTasks();
- job.run();
- Assert.assertEquals(Status.OK, job.getStatus());
- Assert.assertEquals(RestoreJobState.COMMITTING, job.getState());
- Assert.assertEquals(12, AgentTaskQueue.getTaskNum());
-
- // committing
- job.run();
- Assert.assertEquals(Status.OK, job.getStatus());
- Assert.assertEquals(RestoreJobState.COMMITTING, job.getState());
-
- List<AgentTask> dirMoveTasks = Lists.newArrayList();
- runningTasks = Maps.newHashMap();
- dirMoveTasks.addAll(AgentTaskQueue.getDiffTasks(CatalogMocker.BACKEND1_ID, runningTasks));
- dirMoveTasks.addAll(AgentTaskQueue.getDiffTasks(CatalogMocker.BACKEND2_ID, runningTasks));
- dirMoveTasks.addAll(AgentTaskQueue.getDiffTasks(CatalogMocker.BACKEND3_ID, runningTasks));
- Assert.assertEquals(12, dirMoveTasks.size());
-
- for (AgentTask agentTask : dirMoveTasks) {
- TStatus taskStatus = new TStatus(TStatusCode.OK);
- TBackend tBackend = new TBackend("", 0, 1);
- TFinishTaskRequest request = new TFinishTaskRequest(tBackend, TTaskType.MAKE_SNAPSHOT,
- agentTask.getSignature(), taskStatus);
- job.finishDirMoveTask((DirMoveTask) agentTask, request);
- }
-
- job.run();
- Assert.assertEquals(Status.OK, job.getStatus());
- Assert.assertEquals(RestoreJobState.FINISHED, job.getState());
- }
-
@Test
public void testSignature() throws AnalysisException {
Adler32 sig1 = new Adler32();
diff --git a/fe/fe-core/src/test/java/org/apache/doris/catalog/CreateTableTest.java b/fe/fe-core/src/test/java/org/apache/doris/catalog/CreateTableTest.java
index 67686a7340..1253f22579 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/catalog/CreateTableTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/catalog/CreateTableTest.java
@@ -241,8 +241,7 @@ public class CreateTableTest {
+ "properties('replication_num' = '1', 'short_key' = '4');"));
ExceptionChecker
- .expectThrowsWithMsg(DdlException.class, "Failed to find enough host with storage medium and " +
- "tag(NaN/{\"location\" : \"default\"}) in all backends. need: 3",
+ .expectThrowsWithMsg(DdlException.class, "Failed to find 3 backends for policy",
() -> createTable("create table test.atbl5\n" + "(k1 int, k2 int, k3 int)\n"
+ "duplicate key(k1, k2, k3)\n" + "distributed by hash(k1) buckets 1\n"
+ "properties('replication_num' = '3');"));
@@ -259,8 +258,7 @@ public class CreateTableTest {
ConfigBase.setMutableConfig("disable_storage_medium_check", "false");
ExceptionChecker
- .expectThrowsWithMsg(DdlException.class, "Failed to find enough host with storage medium and " +
- "tag(SSD/{\"location\" : \"default\"}) in all backends. need: 1",
+ .expectThrowsWithMsg(DdlException.class, " Failed to find 1 backends for policy:",
() -> createTable("create table test.tb7(key1 int, key2 varchar(10)) distributed by hash(key1) \n"
+ "buckets 1 properties('replication_num' = '1', 'storage_medium' = 'ssd');"));
diff --git a/fe/fe-core/src/test/java/org/apache/doris/catalog/ModifyBackendTest.java b/fe/fe-core/src/test/java/org/apache/doris/catalog/ModifyBackendTest.java
index d755a6a65e..1ded070267 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/catalog/ModifyBackendTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/catalog/ModifyBackendTest.java
@@ -86,7 +86,7 @@ public class ModifyBackendTest {
");";
CreateTableStmt createStmt = (CreateTableStmt) UtFrameUtils.parseAndAnalyzeStmt(createStr, connectContext);
ExceptionChecker.expectThrowsWithMsg(DdlException.class,
- "Failed to find enough host with storage medium and tag(HDD/{\"location\" : \"default\"}) in all backends. need: 1",
+ "Failed to find 1 backends for policy:",
() -> DdlExecutor.execute(Catalog.getCurrentCatalog(), createStmt));
createStr = "create table test.tbl1(\n" +
@@ -119,7 +119,7 @@ public class ModifyBackendTest {
Database db = Catalog.getCurrentCatalog().getDbNullable("default_cluster:test");
Table tbl3 = db.getTableNullable("tbl3");
String err = Catalog.getCurrentCatalog().getDynamicPartitionScheduler().getRuntimeInfo(tbl3.getId(), DynamicPartitionScheduler.CREATE_PARTITION_MSG);
- Assert.assertTrue(err.contains("Failed to find enough host with storage medium and tag"));
+ Assert.assertTrue(err.contains("Failed to find 1 backends for policy:"));
createStr = "create table test.tbl4(\n" +
"k1 date, k2 int\n" +
@@ -171,7 +171,7 @@ public class ModifyBackendTest {
+ " set ('replication_allocation' = 'tag.location.zonex:1')";
AlterTableStmt alterStmt2 = (AlterTableStmt) UtFrameUtils.parseAndAnalyzeStmt(alterStr, connectContext);
ExceptionChecker.expectThrowsWithMsg(DdlException.class,
- "Failed to find enough host with tag({\"location\" : \"zonex\"}) in all backends. need: 1",
+ "Failed to find enough host with tag",
() -> DdlExecutor.execute(Catalog.getCurrentCatalog(), alterStmt2));
tblProperties = tableProperty.getProperties();
Assert.assertTrue(tblProperties.containsKey("default.replication_allocation"));
diff --git a/fe/fe-core/src/test/java/org/apache/doris/load/sync/canal/CanalSyncDataTest.java b/fe/fe-core/src/test/java/org/apache/doris/load/sync/canal/CanalSyncDataTest.java
index a3051c65dd..3b1b4eddfc 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/load/sync/canal/CanalSyncDataTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/load/sync/canal/CanalSyncDataTest.java
@@ -20,12 +20,12 @@ package org.apache.doris.load.sync.canal;
import org.apache.doris.catalog.Catalog;
import org.apache.doris.catalog.Database;
import org.apache.doris.catalog.OlapTable;
+import org.apache.doris.catalog.ReplicaAllocation;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.Config;
import org.apache.doris.planner.StreamLoadPlanner;
import org.apache.doris.proto.InternalService;
import org.apache.doris.proto.Types;
-import org.apache.doris.resource.Tag;
import org.apache.doris.rpc.BackendServiceProxy;
import org.apache.doris.system.Backend;
import org.apache.doris.system.SystemInfoService;
@@ -59,7 +59,6 @@ import java.util.Map;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;
-
import mockit.Expectations;
import mockit.Mock;
import mockit.MockUp;
@@ -150,8 +149,8 @@ public class CanalSyncDataTest {
minTimes = 0;
result = execPlanFragmentParams;
- systemInfoService.seqChooseBackendIdsByStorageMediumAndTag(anyInt, (SystemInfoService.BeAvailablePredicate) any, anyBoolean, anyString,
- (TStorageMedium) any, (Tag) any);
+ systemInfoService.selectBackendIdsForReplicaCreation((ReplicaAllocation) any,
+ anyString, (TStorageMedium) any);
minTimes = 0;
result = backendIds;
diff --git a/fe/fe-core/src/test/java/org/apache/doris/qe/MultiLoadMgrTest.java b/fe/fe-core/src/test/java/org/apache/doris/qe/MultiLoadMgrTest.java
index 8d4d09ae13..bd099083b1 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/qe/MultiLoadMgrTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/qe/MultiLoadMgrTest.java
@@ -20,9 +20,8 @@ package org.apache.doris.qe;
import org.apache.doris.backup.CatalogMocker;
import org.apache.doris.catalog.Catalog;
import org.apache.doris.common.DdlException;
-import org.apache.doris.resource.Tag;
+import org.apache.doris.system.BeSelectionPolicy;
import org.apache.doris.system.SystemInfoService;
-import org.apache.doris.thrift.TStorageMedium;
import com.google.common.collect.Lists;
@@ -31,7 +30,6 @@ import org.junit.Before;
import org.junit.Test;
import java.util.List;
-
import mockit.Delegate;
import mockit.Expectations;
import mockit.Mocked;
@@ -45,7 +43,7 @@ public class MultiLoadMgrTest {
@Mocked
private SystemInfoService systemInfoService;
@Before
- public void setUp() {
+ public void setUp() throws Exception {
new Expectations() {
{
ConnectContext.get();
@@ -62,13 +60,10 @@ public class MultiLoadMgrTest {
};
new Expectations() {
{
- systemInfoService.seqChooseBackendIdsByStorageMediumAndTag(anyInt, (SystemInfoService.BeAvailablePredicate) any,
- anyBoolean, anyString, (TStorageMedium) any, (Tag) any);
+ systemInfoService.selectBackendIdsByPolicy((BeSelectionPolicy) any, anyInt);
minTimes = 0;
result = new Delegate() {
- public synchronized List<Long> seqChooseBackendIdsByStorageMediumAndTag(
- int backendNum, SystemInfoService.BeAvailablePredicate availablePredicate,
- boolean isCreate, String clusterName, TStorageMedium medium, Tag tag) {
+ public List<Long> selectBackendIdsByPolicy(BeSelectionPolicy policy, int number) {
List<Long> beIds = Lists.newArrayList();
beIds.add(CatalogMocker.BACKEND1_ID);
beIds.add(CatalogMocker.BACKEND2_ID);
diff --git a/fe/fe-core/src/test/java/org/apache/doris/system/SystemInfoServiceTest.java b/fe/fe-core/src/test/java/org/apache/doris/system/SystemInfoServiceTest.java
new file mode 100644
index 0000000000..b2570095a0
--- /dev/null
+++ b/fe/fe-core/src/test/java/org/apache/doris/system/SystemInfoServiceTest.java
@@ -0,0 +1,268 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.system;
+
+import org.apache.doris.catalog.Catalog;
+import org.apache.doris.catalog.DiskInfo;
+import org.apache.doris.catalog.ReplicaAllocation;
+import org.apache.doris.persist.EditLog;
+import org.apache.doris.resource.Tag;
+import org.apache.doris.thrift.TStorageMedium;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import mockit.Expectations;
+import mockit.Mocked;
+
+public class SystemInfoServiceTest {
+ // Unit tests for SystemInfoService backend selection: selectBackendIdsByPolicy() and selectBackendIdsForReplicaCreation().
+ @Mocked
+ private Catalog catalog;
+ @Mocked
+ private EditLog editLog;
+
+ private SystemInfoService infoService;
+ // Stub Catalog.getCurrentCatalog(), getEditLog() and EditLog.logAddBackend() so backends can be added without a running FE.
+ @Before
+ public void setUp() {
+ new Expectations() {
+ {
+ catalog.getEditLog();
+ minTimes = 0;
+ result = editLog;
+
+ editLog.logAddBackend((Backend) any);
+ minTimes = 0;
+
+ Catalog.getCurrentCatalog();
+ minTimes = 0;
+ result = catalog;
+ }
+ };
+
+ infoService = new SystemInfoService();
+ }
+ // Registers a backend with the given id, host and heartbeat port; the tests below treat a newly added backend as not alive.
+ private void addBackend(long beId, String host, int hbPort) {
+ Backend backend = new Backend(beId, host, hbPort);
+ infoService.addBackend(backend);
+ }
+ // Covers each BeSelectionPolicy condition: liveness, load/query/schedule availability, tags, storage medium, disk usage and same host.
+ @Test
+ public void testSelectBackendIdsByPolicy() throws Exception {
+ // 1. no backend
+ BeSelectionPolicy policy = new BeSelectionPolicy.Builder().needLoadAvailable().build();
+ Assert.assertEquals(0, infoService.selectBackendIdsByPolicy(policy, 1).size());
+ Assert.assertEquals(0, infoService.selectBackendIdsByPolicy(policy, 4).size());
+
+ // 2. add one backend but not alive
+ addBackend(10001, "192.168.1.1", 9050);
+ Backend be1 = infoService.getBackend(10001);
+ Assert.assertEquals(0, infoService.selectBackendIdsByPolicy(policy, 1).size());
+ Assert.assertEquals(0, infoService.selectBackendIdsByPolicy(policy, 0).size());
+ // a policy with no condition also selects the dead backend
+ BeSelectionPolicy policy2 = new BeSelectionPolicy.Builder().build();
+ Assert.assertEquals(1, infoService.selectBackendIdsByPolicy(policy2, 1).size());
+
+ // 3. add more backends
+ addBackend(10002, "192.168.1.2", 9050);
+ Backend be2 = infoService.getBackend(10002);
+ be2.setAlive(true);
+ addBackend(10003, "192.168.1.3", 9050);
+ Backend be3 = infoService.getBackend(10003);
+ be3.setAlive(true);
+ addBackend(10004, "192.168.1.4", 9050);
+ Backend be4 = infoService.getBackend(10004);
+ be4.setAlive(true);
+ addBackend(10005, "192.168.1.5", 9050);
+ Backend be5 = infoService.getBackend(10005);
+
+ // be1 and be5 are dead; be2, be3 and be4 are alive
+ BeSelectionPolicy policy3 = new BeSelectionPolicy.Builder().needScheduleAvailable().build();
+ Assert.assertEquals(1, infoService.selectBackendIdsByPolicy(policy3, 1).size());
+ Assert.assertFalse(infoService.selectBackendIdsByPolicy(policy3, 1).contains(10001L));
+ Assert.assertFalse(infoService.selectBackendIdsByPolicy(policy3, 1).contains(10005L));
+ Assert.assertEquals(2, infoService.selectBackendIdsByPolicy(policy3, 2).size());
+ Assert.assertEquals(3, infoService.selectBackendIdsByPolicy(policy3, 3).size());
+ Assert.assertTrue(infoService.selectBackendIdsByPolicy(policy3, 3).contains(10002L));
+ Assert.assertTrue(infoService.selectBackendIdsByPolicy(policy3, 3).contains(10003L));
+ Assert.assertTrue(infoService.selectBackendIdsByPolicy(policy3, 3).contains(10004L));
+ Assert.assertEquals(0, infoService.selectBackendIdsByPolicy(policy3, 4).size());
+
+ // 4. disable load on be2 and query on be3, and decommission be4
+ be2.setLoadDisabled(true);
+ be3.setQueryDisabled(true);
+ be4.setDecommissioned(true);
+ // now only be3 and be4 are loadable, only be2 and be4 are queryable, and only be2 and be3 are schedulable
+ BeSelectionPolicy policy4 = new BeSelectionPolicy.Builder().needScheduleAvailable().build();
+ Assert.assertEquals(1, infoService.selectBackendIdsByPolicy(policy4, 1).size());
+ Assert.assertFalse(infoService.selectBackendIdsByPolicy(policy4, 1).contains(10001L));
+ Assert.assertFalse(infoService.selectBackendIdsByPolicy(policy4, 1).contains(10004L));
+ Assert.assertFalse(infoService.selectBackendIdsByPolicy(policy4, 1).contains(10005L));
+ Assert.assertEquals(2, infoService.selectBackendIdsByPolicy(policy4, 2).size());
+ Assert.assertTrue(infoService.selectBackendIdsByPolicy(policy4, 2).contains(10002L));
+ Assert.assertTrue(infoService.selectBackendIdsByPolicy(policy4, 2).contains(10003L));
+ Assert.assertEquals(0, infoService.selectBackendIdsByPolicy(policy4, 3).size());
+
+ BeSelectionPolicy policy5 = new BeSelectionPolicy.Builder().needLoadAvailable().build();
+ Assert.assertEquals(1, infoService.selectBackendIdsByPolicy(policy5, 1).size());
+ Assert.assertFalse(infoService.selectBackendIdsByPolicy(policy5, 1).contains(10001L));
+ Assert.assertFalse(infoService.selectBackendIdsByPolicy(policy5, 1).contains(10002L));
+ Assert.assertFalse(infoService.selectBackendIdsByPolicy(policy5, 1).contains(10005L));
+ Assert.assertEquals(2, infoService.selectBackendIdsByPolicy(policy5, 2).size());
+ Assert.assertTrue(infoService.selectBackendIdsByPolicy(policy5, 2).contains(10003L));
+ Assert.assertTrue(infoService.selectBackendIdsByPolicy(policy5, 2).contains(10004L));
+
+ // 5. set tags
+ // first reset every backend to alive and fully enabled
+ be1.setAlive(true);
+ be2.setLoadDisabled(false);
+ be3.setQueryDisabled(false);
+ be5.setAlive(true);
+ be3.setAlive(true);
+ be4.setAlive(true);
+ be4.setDecommissioned(false);
+ be5.setAlive(true);
+ BeSelectionPolicy policy6 = new BeSelectionPolicy.Builder().needQueryAvailable().build();
+ Assert.assertEquals(5, infoService.selectBackendIdsByPolicy(policy6, 5).size());
+
+ Tag taga = Tag.create(Tag.TYPE_LOCATION, "taga");
+ Tag tagb = Tag.create(Tag.TYPE_LOCATION, "tagb");
+ be1.setTag(taga);
+ be2.setTag(taga);
+ be3.setTag(tagb);
+ be4.setTag(tagb);
+ be5.setTag(tagb);
+
+ BeSelectionPolicy policy7 = new BeSelectionPolicy.Builder().needQueryAvailable().addTags(Sets.newHashSet(taga)).build();
+ Assert.assertEquals(1, infoService.selectBackendIdsByPolicy(policy7, 1).size());
+ Assert.assertEquals(2, infoService.selectBackendIdsByPolicy(policy7, 2).size());
+ Assert.assertTrue(infoService.selectBackendIdsByPolicy(policy7, 2).contains(10001L));
+ Assert.assertTrue(infoService.selectBackendIdsByPolicy(policy7, 2).contains(10002L));
+ Assert.assertEquals(0, infoService.selectBackendIdsByPolicy(policy7, 3).size());
+
+ BeSelectionPolicy policy8 = new BeSelectionPolicy.Builder().needQueryAvailable().addTags(Sets.newHashSet(tagb)).build();
+ Assert.assertEquals(3, infoService.selectBackendIdsByPolicy(policy8, 3).size());
+ Assert.assertTrue(infoService.selectBackendIdsByPolicy(policy8, 3).contains(10003L));
+ Assert.assertTrue(infoService.selectBackendIdsByPolicy(policy8, 3).contains(10004L));
+ Assert.assertTrue(infoService.selectBackendIdsByPolicy(policy8, 3).contains(10005L));
+
+ BeSelectionPolicy policy9 = new BeSelectionPolicy.Builder().needQueryAvailable().addTags(Sets.newHashSet(taga, tagb)).build();
+ Assert.assertEquals(5, infoService.selectBackendIdsByPolicy(policy9, 5).size());
+
+ // 6. check storage medium: be1 has an HDD disk, be2-be5 have SSD disks
+ addDisk(be1, "path1", TStorageMedium.HDD, 200 * 1024 * 1024L, 1 * 1024 * 1024L);
+ addDisk(be2, "path2", TStorageMedium.SSD, 200 * 1024 * 1024L, 150 * 1024 * 1024L);
+ addDisk(be3, "path3", TStorageMedium.SSD, 200 * 1024 * 1024L, 150 * 1024 * 1024L);
+ addDisk(be4, "path4", TStorageMedium.SSD, 200 * 1024 * 1024L, 150 * 1024 * 1024L);
+ addDisk(be5, "path5", TStorageMedium.SSD, 200 * 1024 * 1024L, 150 * 1024 * 1024L);
+
+ BeSelectionPolicy policy10 = new BeSelectionPolicy.Builder().addTags(Sets.newHashSet(taga, tagb))
+ .setStorageMedium(TStorageMedium.SSD).build();
+ Assert.assertEquals(4, infoService.selectBackendIdsByPolicy(policy10, 4).size());
+ Assert.assertEquals(0, infoService.selectBackendIdsByPolicy(policy10, 5).size());
+
+ BeSelectionPolicy policy11 = new BeSelectionPolicy.Builder().addTags(Sets.newHashSet(tagb))
+ .setStorageMedium(TStorageMedium.HDD).build();
+ Assert.assertEquals(0, infoService.selectBackendIdsByPolicy(policy11, 1).size());
+
+ // 7. check disk usage: be1's only HDD disk has 1MB of 200MB available, so needCheckDiskUsage() rejects it
+ BeSelectionPolicy policy12 = new BeSelectionPolicy.Builder().addTags(Sets.newHashSet(taga))
+ .setStorageMedium(TStorageMedium.HDD).build();
+ Assert.assertEquals(1, infoService.selectBackendIdsByPolicy(policy12, 1).size());
+ BeSelectionPolicy policy13 = new BeSelectionPolicy.Builder().addTags(Sets.newHashSet(taga))
+ .setStorageMedium(TStorageMedium.HDD).needCheckDiskUsage().build();
+ Assert.assertEquals(0, infoService.selectBackendIdsByPolicy(policy13, 1).size());
+
+ // 8. check same host: be6 shares be1's host, so both can be selected together only with allowOnSameHost()
+ addBackend(10006, "192.168.1.1", 9051);
+ Backend be6 = infoService.getBackend(10006);
+ be6.setTag(taga);
+ be6.setAlive(true);
+ addDisk(be1, "path1", TStorageMedium.HDD, 200 * 1024 * 1024L, 100 * 1024 * 1024L);
+ addDisk(be6, "path1", TStorageMedium.HDD, 200 * 1024 * 1024L, 100 * 1024 * 1024L);
+ BeSelectionPolicy policy14 = new BeSelectionPolicy.Builder().addTags(Sets.newHashSet(taga))
+ .setStorageMedium(TStorageMedium.HDD).build();
+ Assert.assertEquals(0, infoService.selectBackendIdsByPolicy(policy14, 2).size());
+ BeSelectionPolicy policy15 = new BeSelectionPolicy.Builder().addTags(Sets.newHashSet(taga))
+ .setStorageMedium(TStorageMedium.HDD).allowOnSameHost().build();
+ Assert.assertEquals(2, infoService.selectBackendIdsByPolicy(policy15, 2).size());
+ }
+ // Five alive HDD backends on distinct hosts; each call with the default allocation must return 3 backends.
+ @Test
+ public void testSelectBackendIdsForReplicaCreation() throws Exception {
+ addBackend(10001, "192.168.1.1", 9050);
+ Backend be1 = infoService.getBackend(10001);
+ addDisk(be1, "path1", TStorageMedium.HDD, 200 * 1024 * 1024L, 150 * 1024 * 1024L);
+ be1.setAlive(true);
+ addBackend(10002, "192.168.1.2", 9050);
+ Backend be2 = infoService.getBackend(10002);
+ addDisk(be2, "path1", TStorageMedium.HDD, 200 * 1024 * 1024L, 150 * 1024 * 1024L);
+ be2.setAlive(true);
+ addBackend(10003, "192.168.1.3", 9050);
+ Backend be3 = infoService.getBackend(10003);
+ addDisk(be3, "path1", TStorageMedium.HDD, 200 * 1024 * 1024L, 150 * 1024 * 1024L);
+ be3.setAlive(true);
+ addBackend(10004, "192.168.1.4", 9050);
+ Backend be4 = infoService.getBackend(10004);
+ addDisk(be4, "path1", TStorageMedium.HDD, 200 * 1024 * 1024L, 150 * 1024 * 1024L);
+ be4.setAlive(true);
+ addBackend(10005, "192.168.1.5", 9050);
+ Backend be5 = infoService.getBackend(10005);
+ addDisk(be5, "path1", TStorageMedium.HDD, 200 * 1024 * 1024L, 150 * 1024 * 1024L);
+ be5.setAlive(true);
+
+ ReplicaAllocation replicaAlloc = ReplicaAllocation.DEFAULT_ALLOCATION;
+ // Also check that the random selection logic distributes replicas evenly across the backends.
+ Map<Long, Integer> beCounterMap = Maps.newHashMap();
+ for (int i = 0; i < 10000; ++i) {
+ Map<Tag, List<Long>> res = infoService.selectBackendIdsForReplicaCreation(replicaAlloc,
+ SystemInfoService.DEFAULT_CLUSTER, TStorageMedium.HDD);
+ Assert.assertEquals(3, res.get(Tag.DEFAULT_BACKEND_TAG).size());
+ for (Long beId : res.get(Tag.DEFAULT_BACKEND_TAG)) {
+ beCounterMap.put(beId, beCounterMap.getOrDefault(beId, 0) + 1);
+ }
+ }
+ System.out.println(beCounterMap);
+ List<Integer> list = Lists.newArrayList(beCounterMap.values());
+ Collections.sort(list);
+ int diff = list.get(list.size() - 1) - list.get(0);
+ // The gap between the most- and least-selected backend must be under 5% of the smaller count.
+ Assert.assertTrue((diff * 1.0 / list.get(0)) < 0.05);
+ }
+ // Replaces the backend's disks with a single disk of the given path, storage medium, and total/available capacity in bytes.
+ private void addDisk(Backend be, String path, TStorageMedium medium, long totalB, long availB) {
+ DiskInfo diskInfo1 = new DiskInfo(path);
+ diskInfo1.setTotalCapacityB(totalB);
+ diskInfo1.setAvailableCapacityB(availB);
+ diskInfo1.setStorageMedium(medium);
+ Map<String, DiskInfo> map = Maps.newHashMap();
+ map.put(diskInfo1.getRootPath(), diskInfo1);
+ be.setDisks(ImmutableMap.copyOf(map));
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org