Posted to commits@doris.apache.org by mo...@apache.org on 2022/05/27 15:08:44 UTC

[incubator-doris] 01/08: [fix](resource-tag) Consider resource tags when assigning tasks for broker & routine load (#9492)

This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch dev-1.0.1
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git

commit d0c4b5c503ffea1b41566e01126ad6522ab11e98
Author: Mingyu Chen <mo...@gmail.com>
AuthorDate: Thu May 26 08:42:09 2022 +0800

    [fix](resource-tag) Consider resource tags when assigning tasks for broker & routine load (#9492)
    
    This CL mainly changes:
    1. Broker Load
        When assigning backends, use the user-level resource tags to find available backends.
        If no user-level resource tag is set, the broker load task can be assigned to any BE node;
        otherwise, the task can only be assigned to BE nodes that match the user-level tags.
    
    2. Routine Load
        Existing routine load jobs do not carry user info, so the user-level tags cannot be
        obtained when assigning tasks. Two cases are therefore handled:
        1. For old routine load jobs, use the tags from the replica allocation info to select BE nodes.
        2. For new routine load jobs, the user info is added and persisted in the routine load job.
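    
    To summarize the selection flow in one place, here is a condensed, illustrative sketch.
    The helper name resolveTags is hypothetical; getResourceTags, INVALID_RESOURCE_TAGS and
    getTagsFromReplicaAllocation are the real names that appear in the diff below, and the
    org.apache.doris imports are assumed to match those already used in RoutineLoadManager:
    
        // Hypothetical condensed sketch of the tag-resolution fallback.
        Set<Tag> resolveTags(RoutineLoadJob job) throws LoadException {
            if (job.getUserIdentity() != null) {
                Set<Tag> tags = Catalog.getCurrentCatalog().getAuth()
                        .getResourceTags(job.getUserIdentity().getQualifiedUser());
                if (tags != UserProperty.INVALID_RESOURCE_TAGS) {
                    return tags; // user-level tags take precedence
                }
            }
            // Old jobs (or dropped users) fall back to the tags of the
            // table's replica allocation.
            return getTagsFromReplicaAllocation(job.getDbId(), job.getTableId());
        }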
---
 docs/en/administrator-guide/multi-tenant.md        | 12 ++++
 .../doris/load/loadv2/LoadingTaskPlanner.java      |  2 +-
 .../load/routineload/KafkaRoutineLoadJob.java      |  8 ++-
 .../doris/load/routineload/RoutineLoadJob.java     | 15 ++++-
 .../doris/load/routineload/RoutineLoadManager.java | 73 +++++++++++++++++++---
 .../load/routineload/RoutineLoadTaskScheduler.java |  3 +-
 .../mysql/privilege/UserResourceProperty.java      | 31 ---------
 .../org/apache/doris/planner/BrokerScanNode.java   | 40 ++++++++----
 .../load/routineload/KafkaRoutineLoadJobTest.java  | 19 ++++--
 .../doris/load/routineload/RoutineLoadJobTest.java |  9 ++-
 .../load/routineload/RoutineLoadManagerTest.java   | 26 +++++---
 .../load/routineload/RoutineLoadSchedulerTest.java |  7 ++-
 .../transaction/GlobalTransactionMgrTest.java      | 10 ++-
 13 files changed, 179 insertions(+), 76 deletions(-)

diff --git a/docs/en/administrator-guide/multi-tenant.md b/docs/en/administrator-guide/multi-tenant.md
index 8c37afad23..dd587feedd 100644
--- a/docs/en/administrator-guide/multi-tenant.md
+++ b/docs/en/administrator-guide/multi-tenant.md
@@ -135,6 +135,18 @@ Node resource division refers to setting tags for BE nodes in a Doris cluster, a
     In this way, we have achieved physical resource isolation for different user queries by dividing nodes and restricting user resource usage. Furthermore, we can create different users for different business departments and restrict each user to a different resource group, so that different business departments do not interfere with each other's resources. For example, there is a business table in the cluster that needs to be shared by all 9 business departments, but it is hoped [...]
     
     On the other hand, we can use resource groups to isolate online and offline tasks. For example, we can divide nodes into two resource groups, Online and Offline. The table data is still stored in 3 copies, of which 2 copies are stored in the Online resource group, and 1 copy is stored in the Offline resource group. The Online resource group is mainly used for online data services with high concurrency and low latency. Some large queries or offline ETL opera [...]
+
+4. Resource group assignment for load jobs
+
+    The resource usage of a load job (insert, broker load, routine load, stream load, etc.) can be divided into two parts:
+    1. Computing resources: responsible for reading the data source, and for data transformation and distribution.
+    2. Write resources: responsible for data encoding, compression, and writing to disk.
+
+    The write resources must be the nodes where the replicas are located, while the computing work can, in principle, be done on any node. Therefore, resource group assignment for a load job happens in two steps:
+    1. Use the user-level resource tags to limit the resource groups that the computing resources can use.
+    2. Use the resource tags of the replicas to limit the resource groups that the write resources can use.
+
+    So if you want all resources used by a load job to be limited to the resource group where the data is located, you only need to set the user-level resource tag to the same value as the replica's resource tag, as sketched below.
     
 ## Single query resource limit
 
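
To make the two-step assignment described in the doc text above concrete, here is a minimal,
illustrative Java sketch of step 1, using the BeSelectionPolicy API that this commit applies in
RoutineLoadManager and BrokerScanNode. userLevelTags is a stand-in for the tags fetched from the
user property; step 2 needs no explicit selection, because writes always go to the BEs hosting
the tablet replicas:

    // Step 1: restrict the computing BEs by user-level resource tags.
    // userLevelTags is assumed to come from
    // Catalog.getCurrentCatalog().getAuth().getResourceTags(qualifiedUser).
    BeSelectionPolicy policy = new BeSelectionPolicy.Builder()
            .needLoadAvailable()    // the BE must be available for load
            .addTags(userLevelTags)
            .build();
    // 20000 follows the usage in the diff below (effectively: all matching BEs).
    List<Long> computeBeIds = Catalog.getCurrentSystemInfo()
            .selectBackendIdsByPolicy(policy, 20000);
    // Step 2 is implicit: rows are written to the BEs that host the
    // replicas, whose tags are fixed by the table's replica allocation.
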
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadingTaskPlanner.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadingTaskPlanner.java
index 903e65ff6d..bf33ab2f7f 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadingTaskPlanner.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadingTaskPlanner.java
@@ -123,7 +123,7 @@ public class LoadingTaskPlanner {
         // 1. Broker scan node
         BrokerScanNode scanNode = new BrokerScanNode(new PlanNodeId(nextNodeId++), destTupleDesc, "BrokerScanNode",
                 fileStatusesList, filesAdded);
-        scanNode.setLoadInfo(loadJobId, txnId, table, brokerDesc, fileGroups, strictMode, loadParallelism);
+        scanNode.setLoadInfo(loadJobId, txnId, table, brokerDesc, fileGroups, strictMode, loadParallelism, userInfo);
         scanNode.init(analyzer);
         scanNode.finalize(analyzer);
         scanNodes.add(scanNode);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java
index f85898f10b..e740d92965 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java
@@ -20,6 +20,7 @@ package org.apache.doris.load.routineload;
 import org.apache.doris.analysis.AlterRoutineLoadStmt;
 import org.apache.doris.analysis.CreateRoutineLoadStmt;
 import org.apache.doris.analysis.RoutineLoadDataSourceProperties;
+import org.apache.doris.analysis.UserIdentity;
 import org.apache.doris.catalog.Catalog;
 import org.apache.doris.catalog.Database;
 import org.apache.doris.catalog.OlapTable;
@@ -103,8 +104,9 @@ public class KafkaRoutineLoadJob extends RoutineLoadJob {
     }
 
     public KafkaRoutineLoadJob(Long id, String name, String clusterName,
-                               long dbId, long tableId, String brokerList, String topic) {
-        super(id, name, clusterName, dbId, tableId, LoadDataSourceType.KAFKA);
+                               long dbId, long tableId, String brokerList, String topic,
+                               UserIdentity userIdentity) {
+        super(id, name, clusterName, dbId, tableId, LoadDataSourceType.KAFKA, userIdentity);
         this.brokerList = brokerList;
         this.topic = topic;
         this.progress = new KafkaProgress();
@@ -394,7 +396,7 @@ public class KafkaRoutineLoadJob extends RoutineLoadJob {
         long id = Catalog.getCurrentCatalog().getNextId();
         KafkaRoutineLoadJob kafkaRoutineLoadJob = new KafkaRoutineLoadJob(id, stmt.getName(),
                 db.getClusterName(), db.getId(), tableId,
-                stmt.getKafkaBrokerList(), stmt.getKafkaTopic());
+                stmt.getKafkaBrokerList(), stmt.getKafkaTopic(), stmt.getUserInfo());
         kafkaRoutineLoadJob.setOptional(stmt);
         kafkaRoutineLoadJob.checkCustomProperties();
         kafkaRoutineLoadJob.checkCustomPartition();
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
index e308371459..7c0b24d69c 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
@@ -27,6 +27,7 @@ import org.apache.doris.analysis.PartitionNames;
 import org.apache.doris.analysis.Separator;
 import org.apache.doris.analysis.SqlParser;
 import org.apache.doris.analysis.SqlScanner;
+import org.apache.doris.analysis.UserIdentity;
 import org.apache.doris.catalog.Catalog;
 import org.apache.doris.catalog.Database;
 import org.apache.doris.catalog.OlapTable;
@@ -229,6 +230,8 @@ public abstract class RoutineLoadJob extends AbstractTxnStateChangeCallback impl
     // this is the origin stmt of CreateRoutineLoadStmt, we use it to persist the RoutineLoadJob,
     // because we can not serialize the Expressions contained in job.
     protected OriginStatement origStmt;
+    // User who submitted this job. May be null for jobs from old versions (before v1.1).
+    protected UserIdentity userIdentity;
 
     protected ReentrantReadWriteLock lock = new ReentrantReadWriteLock(true);
     protected LoadTask.MergeType mergeType = LoadTask.MergeType.APPEND; // default is all data is load no delete
@@ -250,13 +253,15 @@ public abstract class RoutineLoadJob extends AbstractTxnStateChangeCallback impl
     }
 
     public RoutineLoadJob(Long id, String name, String clusterName,
-            long dbId, long tableId, LoadDataSourceType dataSourceType) {
+                          long dbId, long tableId, LoadDataSourceType dataSourceType,
+                          UserIdentity userIdentity) {
         this(id, dataSourceType);
         this.name = name;
         this.clusterName = clusterName;
         this.dbId = dbId;
         this.tableId = tableId;
         this.authCode = 0;
+        this.userIdentity = userIdentity;
 
         if (ConnectContext.get() != null) {
             SessionVariable var = ConnectContext.get().getSessionVariable();
@@ -436,6 +441,10 @@ public abstract class RoutineLoadJob extends AbstractTxnStateChangeCallback impl
         return partitions;
     }
 
+    public UserIdentity getUserIdentity() {
+        return userIdentity;
+    }
+
     @Override
     public LoadTask.MergeType getMergeType() {
         return mergeType;
@@ -1611,6 +1620,10 @@ public abstract class RoutineLoadJob extends AbstractTxnStateChangeCallback impl
         } catch (Exception e) {
             throw new IOException("error happens when parsing create routine load stmt: " + origStmt.originStmt, e);
         }
+
+        // ATTN(cmy): this is temporary code that differs from trunk.
+        // We do not persist userIdentity because we cannot bump the FE meta version for dev-1.0.1.
+        userIdentity = null;
     }
 
     abstract public void modifyProperties(AlterRoutineLoadStmt stmt) throws UserException;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadManager.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadManager.java
index 2d6c13adab..ed220f88b8 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadManager.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadManager.java
@@ -24,6 +24,11 @@ import org.apache.doris.analysis.ResumeRoutineLoadStmt;
 import org.apache.doris.analysis.StopRoutineLoadStmt;
 import org.apache.doris.catalog.Catalog;
 import org.apache.doris.catalog.Database;
+import org.apache.doris.catalog.OlapTable;
+import org.apache.doris.catalog.Partition;
+import org.apache.doris.catalog.PartitionInfo;
+import org.apache.doris.catalog.ReplicaAllocation;
+import org.apache.doris.catalog.Table;
 import org.apache.doris.common.AnalysisException;
 import org.apache.doris.common.Config;
 import org.apache.doris.common.DdlException;
@@ -38,10 +43,13 @@ import org.apache.doris.common.io.Writable;
 import org.apache.doris.common.util.LogBuilder;
 import org.apache.doris.common.util.LogKey;
 import org.apache.doris.mysql.privilege.PrivPredicate;
+import org.apache.doris.mysql.privilege.UserProperty;
 import org.apache.doris.persist.AlterRoutineLoadJobOperationLog;
 import org.apache.doris.persist.RoutineLoadOperation;
 import org.apache.doris.qe.ConnectContext;
+import org.apache.doris.resource.Tag;
 import org.apache.doris.system.Backend;
+import org.apache.doris.system.BeSelectionPolicy;
 
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
@@ -411,11 +419,8 @@ public class RoutineLoadManager implements Writable {
     // check if the specified BE is available for running task
     // return true if it is available. return false if otherwise.
     // throw exception if unrecoverable errors happen.
-    public long getAvailableBeForTask(long previousBeId, String clusterName) throws LoadException {
-        List<Long> beIdsInCluster = Catalog.getCurrentSystemInfo().getClusterBackendIds(clusterName, true);
-        if (beIdsInCluster == null) {
-            throw new LoadException("The " + clusterName + " has been deleted");
-        }
+    public long getAvailableBeForTask(long jobId, long previousBeId, String clusterName) throws LoadException {
+        List<Long> availableBeIds = getAvailableBackendIds(jobId, clusterName);
 
         // check if be has idle slot
         readLock();
@@ -423,7 +428,7 @@ public class RoutineLoadManager implements Writable {
             Map<Long, Integer> beIdToConcurrentTasks = getBeCurrentTasksNumMap();
 
             // 1. Find if the given BE id has available slots
-            if (previousBeId != -1L && beIdsInCluster.contains(previousBeId)) {
+            if (previousBeId != -1L && availableBeIds.contains(previousBeId)) {
                 // get the previousBackend info
                 Backend previousBackend = Catalog.getCurrentSystemInfo().getBackend(previousBeId);
                 // check previousBackend is not null && load available
@@ -447,7 +452,7 @@ public class RoutineLoadManager implements Writable {
             int idleTaskNum = 0;
             long resultBeId = -1L;
             int maxIdleSlotNum = 0;
-            for (Long beId : beIdsInCluster) {
+            for (Long beId : availableBeIds) {
                 if (beIdToMaxConcurrentTasks.containsKey(beId)) {
                     if (beIdToConcurrentTasks.containsKey(beId)) {
                         idleTaskNum = beIdToMaxConcurrentTasks.get(beId) - beIdToConcurrentTasks.get(beId);
@@ -468,6 +473,60 @@ public class RoutineLoadManager implements Writable {
         }
     }
 
+    /**
+     * The routine load task can only be scheduled on backends which have the proper resource tags.
+     * The tags should be taken from the user property.
+     * But in older versions the routine load job does not carry user info, so for compatibility,
+     * if there is no user info, we take the tags from the replica allocation of the first partition of the table.
+     *
+     * @param jobId
+     * @param cluster
+     * @return
+     * @throws LoadException
+     */
+    private List<Long> getAvailableBackendIds(long jobId, String cluster) throws LoadException {
+        RoutineLoadJob job = getJob(jobId);
+        if (job == null) {
+            throw new LoadException("job " + jobId + " does not exist");
+        }
+        Set<Tag> tags;
+        if (job.getUserIdentity() == null) {
+            // Old jobs may carry no user info, so we fall back to tags from the replica allocation.
+            tags = getTagsFromReplicaAllocation(job.getDbId(), job.getTableId());
+        } else {
+            tags = Catalog.getCurrentCatalog().getAuth().getResourceTags(job.getUserIdentity().getQualifiedUser());
+            if (tags == UserProperty.INVALID_RESOURCE_TAGS) {
+                // The user may have been dropped; fall back to the replica tags.
+                tags = getTagsFromReplicaAllocation(job.getDbId(), job.getTableId());
+            }
+        }
+        BeSelectionPolicy policy = new BeSelectionPolicy.Builder().needLoadAvailable().setCluster(cluster)
+                .addTags(tags).build();
+        return Catalog.getCurrentSystemInfo().selectBackendIdsByPolicy(policy, 20000);
+    }
+
+    private Set<Tag> getTagsFromReplicaAllocation(long dbId, long tblId) throws LoadException {
+        try {
+            Database db = Catalog.getCurrentCatalog().getDbOrMetaException(dbId);
+            OlapTable tbl = db.getTableOrMetaException(tblId, Table.TableType.OLAP);
+            tbl.readLock();
+            try {
+                PartitionInfo partitionInfo = tbl.getPartitionInfo();
+                for (Partition partition : tbl.getPartitions()) {
+                    ReplicaAllocation replicaAlloc = partitionInfo.getReplicaAllocation(partition.getId());
+                    // Just use the allocation of the first partition.
+                    return replicaAlloc.getAllocMap().keySet();
+                }
+                // Should never get here; just make the compiler happy.
+                return Sets.newHashSet();
+            } finally {
+                tbl.readUnlock();
+            }
+        } catch (MetaNotFoundException e) {
+            throw new LoadException(e.getMessage());
+        }
+    }
+
     public RoutineLoadJob getJob(long jobId) {
         return idToRoutineLoadJob.get(jobId);
     }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskScheduler.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskScheduler.java
index f2f03a666a..fd07c2617e 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskScheduler.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskScheduler.java
@@ -287,7 +287,8 @@ public class RoutineLoadTaskScheduler extends MasterDaemon {
     // return true if allocate successfully. return false if failed.
     // throw exception if unrecoverable errors happen.
     private boolean allocateTaskToBe(RoutineLoadTaskInfo routineLoadTaskInfo) throws LoadException {
-        long beId = routineLoadManager.getAvailableBeForTask(routineLoadTaskInfo.getPreviousBeId(), routineLoadTaskInfo.getClusterName());
+        long beId = routineLoadManager.getAvailableBeForTask(routineLoadTaskInfo.getJobId(),
+                routineLoadTaskInfo.getPreviousBeId(), routineLoadTaskInfo.getClusterName());
         if (beId == -1L) {
             return false;
         }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/mysql/privilege/UserResourceProperty.java b/fe/fe-core/src/main/java/org/apache/doris/mysql/privilege/UserResourceProperty.java
deleted file mode 100644
index e4b609de95..0000000000
--- a/fe/fe-core/src/main/java/org/apache/doris/mysql/privilege/UserResourceProperty.java
+++ /dev/null
@@ -1,31 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package org.apache.doris.mysql.privilege;
-
-import org.apache.doris.common.io.Writable;
-
-import java.io.DataOutput;
-import java.io.IOException;
-
-public class UserResourceProperty implements Writable {
-
-    @Override
-    public void write(DataOutput out) throws IOException {
-
-    }
-}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/BrokerScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/BrokerScanNode.java
index a6f26c4920..3a99bc0203 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/BrokerScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/BrokerScanNode.java
@@ -26,6 +26,7 @@ import org.apache.doris.analysis.SlotDescriptor;
 import org.apache.doris.analysis.SlotRef;
 import org.apache.doris.analysis.StorageBackend;
 import org.apache.doris.analysis.TupleDescriptor;
+import org.apache.doris.analysis.UserIdentity;
 import org.apache.doris.catalog.BrokerTable;
 import org.apache.doris.catalog.Catalog;
 import org.apache.doris.catalog.Column;
@@ -40,7 +41,11 @@ import org.apache.doris.common.FeConstants;
 import org.apache.doris.load.BrokerFileGroup;
 import org.apache.doris.load.Load;
 import org.apache.doris.load.loadv2.LoadTask;
+import org.apache.doris.mysql.privilege.UserProperty;
+import org.apache.doris.qe.ConnectContext;
+import org.apache.doris.resource.Tag;
 import org.apache.doris.system.Backend;
+import org.apache.doris.system.BeSelectionPolicy;
 import org.apache.doris.task.LoadTaskInfo;
 import org.apache.doris.thrift.TBrokerFileStatus;
 import org.apache.doris.thrift.TBrokerRangeDesc;
@@ -61,6 +66,7 @@ import com.google.common.base.Joiner;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
 
 import java.nio.charset.Charset;
 import java.util.ArrayList;
@@ -69,6 +75,7 @@ import java.util.Comparator;
 import java.util.List;
 import java.util.Map;
 import java.util.Random;
+import java.util.Set;
 import java.util.stream.Collectors;
 
 /**
@@ -114,6 +121,7 @@ public class BrokerScanNode extends LoadScanNode {
     protected List<BrokerFileGroup> fileGroups;
     private boolean strictMode = false;
     private int loadParallelism = 1;
+    private UserIdentity userIdentity;
 
     protected List<List<TBrokerFileStatus>> fileStatusesList;
     // file num
@@ -136,11 +144,15 @@ public class BrokerScanNode extends LoadScanNode {
 
     private List<ParamCreateContext> paramCreateContexts;
 
+    // For broker load and external broker table
     public BrokerScanNode(PlanNodeId id, TupleDescriptor destTupleDesc, String planNodeName,
                           List<List<TBrokerFileStatus>> fileStatusesList, int filesAdded) {
         super(id, destTupleDesc, planNodeName);
         this.fileStatusesList = fileStatusesList;
         this.filesAdded = filesAdded;
+        if (ConnectContext.get() != null) {
+            this.userIdentity = ConnectContext.get().getUserIdentity();
+        }
     }
 
     @Override
@@ -181,22 +193,14 @@ public class BrokerScanNode extends LoadScanNode {
         return desc.getTable() == null;
     }
 
-    @Deprecated
-    public void setLoadInfo(Table targetTable,
-                            BrokerDesc brokerDesc,
-                            List<BrokerFileGroup> fileGroups) {
-        this.targetTable = targetTable;
-        this.brokerDesc = brokerDesc;
-        this.fileGroups = fileGroups;
-    }
-
     public void setLoadInfo(long loadJobId,
                             long txnId,
                             Table targetTable,
                             BrokerDesc brokerDesc,
                             List<BrokerFileGroup> fileGroups,
                             boolean strictMode,
-                            int loadParallelism) {
+                            int loadParallelism,
+                            UserIdentity userIdentity) {
         this.loadJobId = loadJobId;
         this.txnId = txnId;
         this.targetTable = targetTable;
@@ -204,6 +208,7 @@ public class BrokerScanNode extends LoadScanNode {
         this.fileGroups = fileGroups;
         this.strictMode = strictMode;
         this.loadParallelism = loadParallelism;
+        this.userIdentity = userIdentity;
     }
 
     // Called from init, construct source tuple information
@@ -389,10 +394,21 @@ public class BrokerScanNode extends LoadScanNode {
     }
 
     private void assignBackends() throws UserException {
+        Set<Tag> tags = Sets.newHashSet();
+        if (userIdentity != null) {
+            tags = Catalog.getCurrentCatalog().getAuth().getResourceTags(userIdentity.getQualifiedUser());
+            if (tags == UserProperty.INVALID_RESOURCE_TAGS) {
+                throw new UserException("No valid resource tag for user: " + userIdentity.getQualifiedUser());
+            }
+        } else {
+            LOG.debug("user info in BrokerScanNode should not be null; logging for observation");
+        }
         backends = Lists.newArrayList();
+        // broker scan node is used for query or load
+        BeSelectionPolicy policy = new BeSelectionPolicy.Builder().needQueryAvailable().needLoadAvailable()
+                .addTags(tags).build();
         for (Backend be : Catalog.getCurrentSystemInfo().getIdToBackend().values()) {
-            // broker scan node is used for query or load
-            if (be.isQueryAvailable() && be.isLoadAvailable()) {
+            if (policy.isMatch(be)) {
                 backends.add(be);
             }
         }
diff --git a/fe/fe-core/src/test/java/org/apache/doris/load/routineload/KafkaRoutineLoadJobTest.java b/fe/fe-core/src/test/java/org/apache/doris/load/routineload/KafkaRoutineLoadJobTest.java
index 9a820ba1a1..8d46d74baa 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/load/routineload/KafkaRoutineLoadJobTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/load/routineload/KafkaRoutineLoadJobTest.java
@@ -24,6 +24,7 @@ import org.apache.doris.analysis.ParseNode;
 import org.apache.doris.analysis.PartitionNames;
 import org.apache.doris.analysis.RoutineLoadDataSourceProperties;
 import org.apache.doris.analysis.Separator;
+import org.apache.doris.analysis.UserIdentity;
 import org.apache.doris.catalog.Catalog;
 import org.apache.doris.catalog.Database;
 import org.apache.doris.catalog.OlapTable;
@@ -60,6 +61,12 @@ import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
 import java.util.UUID;
+import mockit.Expectations;
+import mockit.Injectable;
+import mockit.Mock;
+import mockit.MockUp;
+import mockit.Mocked;
+import mockit.Verifications;
 
 import mockit.Expectations;
 import mockit.Injectable;
@@ -131,25 +138,25 @@ public class KafkaRoutineLoadJobTest {
         // 2 partitions, 1 be
         RoutineLoadJob routineLoadJob =
                 new KafkaRoutineLoadJob(1L, "kafka_routine_load_job", clusterName1, 1L,
-                        1L, "127.0.0.1:9020", "topic1");
+                        1L, "127.0.0.1:9020", "topic1", UserIdentity.ADMIN);
         Deencapsulation.setField(routineLoadJob, "currentKafkaPartitions", partitionList1);
         Assert.assertEquals(2, routineLoadJob.calculateCurrentConcurrentTaskNum());
 
         // 3 partitions, 4 be
         routineLoadJob = new KafkaRoutineLoadJob(1L, "kafka_routine_load_job", clusterName2, 1L,
-                1L, "127.0.0.1:9020", "topic1");
+                1L, "127.0.0.1:9020", "topic1", UserIdentity.ADMIN);
         Deencapsulation.setField(routineLoadJob, "currentKafkaPartitions", partitionList2);
         Assert.assertEquals(3, routineLoadJob.calculateCurrentConcurrentTaskNum());
 
         // 4 partitions, 4 be
         routineLoadJob = new KafkaRoutineLoadJob(1L, "kafka_routine_load_job", clusterName2, 1L,
-                1L, "127.0.0.1:9020", "topic1");
+                1L, "127.0.0.1:9020", "topic1", UserIdentity.ADMIN);
         Deencapsulation.setField(routineLoadJob, "currentKafkaPartitions", partitionList3);
         Assert.assertEquals(4, routineLoadJob.calculateCurrentConcurrentTaskNum());
 
         // 7 partitions, 4 be
         routineLoadJob = new KafkaRoutineLoadJob(1L, "kafka_routine_load_job", clusterName2, 1L,
-                1L, "127.0.0.1:9020", "topic1");
+                1L, "127.0.0.1:9020", "topic1", UserIdentity.ADMIN);
         Deencapsulation.setField(routineLoadJob, "currentKafkaPartitions", partitionList4);
         Assert.assertEquals(6, routineLoadJob.calculateCurrentConcurrentTaskNum());
     }
@@ -164,7 +171,7 @@ public class KafkaRoutineLoadJobTest {
 
         RoutineLoadJob routineLoadJob =
                 new KafkaRoutineLoadJob(1L, "kafka_routine_load_job", "default", 1L,
-                        1L, "127.0.0.1:9020", "topic1");
+                        1L, "127.0.0.1:9020", "topic1", UserIdentity.ADMIN);
 
         new Expectations(catalog) {
             {
@@ -209,7 +216,7 @@ public class KafkaRoutineLoadJobTest {
 
         RoutineLoadJob routineLoadJob =
                 new KafkaRoutineLoadJob(1L, "kafka_routine_load_job", "default", 1L,
-                        1L, "127.0.0.1:9020", "topic1");
+                        1L, "127.0.0.1:9020", "topic1", UserIdentity.ADMIN);
         long maxBatchIntervalS = 10;
         Deencapsulation.setField(routineLoadJob, "maxBatchIntervalS", maxBatchIntervalS);
         new Expectations() {
diff --git a/fe/fe-core/src/test/java/org/apache/doris/load/routineload/RoutineLoadJobTest.java b/fe/fe-core/src/test/java/org/apache/doris/load/routineload/RoutineLoadJobTest.java
index 8846515334..e791d11749 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/load/routineload/RoutineLoadJobTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/load/routineload/RoutineLoadJobTest.java
@@ -19,6 +19,7 @@ package org.apache.doris.load.routineload;
 
 import org.apache.doris.analysis.CreateRoutineLoadStmt;
 import org.apache.doris.analysis.SqlParser;
+import org.apache.doris.analysis.UserIdentity;
 import org.apache.doris.catalog.Catalog;
 import org.apache.doris.catalog.Database;
 import org.apache.doris.catalog.Table;
@@ -41,6 +42,12 @@ import org.junit.Test;
 
 import java.util.List;
 import java.util.Map;
+import java_cup.runtime.Symbol;
+import mockit.Expectations;
+import mockit.Injectable;
+import mockit.Mock;
+import mockit.MockUp;
+import mockit.Mocked;
 
 import java_cup.runtime.Symbol;
 import mockit.Expectations;
@@ -318,7 +325,7 @@ public class RoutineLoadJobTest {
     @Test
     public void testGetShowCreateInfo() throws UserException {
         KafkaRoutineLoadJob routineLoadJob = new KafkaRoutineLoadJob(111L, "test_load", "test", 1,
-                11, "localhost:9092", "test_topic");
+                11, "localhost:9092", "test_topic", UserIdentity.ADMIN);
         Deencapsulation.setField(routineLoadJob, "maxErrorNum", 10);
         Deencapsulation.setField(routineLoadJob, "maxBatchRows", 10);
         Deencapsulation.setField(routineLoadJob, "maxBatchRows", 10);
diff --git a/fe/fe-core/src/test/java/org/apache/doris/load/routineload/RoutineLoadManagerTest.java b/fe/fe-core/src/test/java/org/apache/doris/load/routineload/RoutineLoadManagerTest.java
index f9d538fb1b..8cc5b80efb 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/load/routineload/RoutineLoadManagerTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/load/routineload/RoutineLoadManagerTest.java
@@ -25,6 +25,7 @@ import org.apache.doris.analysis.ResumeRoutineLoadStmt;
 import org.apache.doris.analysis.Separator;
 import org.apache.doris.analysis.StopRoutineLoadStmt;
 import org.apache.doris.analysis.TableName;
+import org.apache.doris.analysis.UserIdentity;
 import org.apache.doris.catalog.Catalog;
 import org.apache.doris.catalog.Database;
 import org.apache.doris.common.AnalysisException;
@@ -43,6 +44,7 @@ import org.apache.doris.persist.EditLog;
 import org.apache.doris.persist.RoutineLoadOperation;
 import org.apache.doris.qe.ConnectContext;
 import org.apache.doris.qe.OriginStatement;
+import org.apache.doris.system.BeSelectionPolicy;
 import org.apache.doris.system.SystemInfoService;
 import org.apache.doris.thrift.TResourceInfo;
 
@@ -58,6 +60,11 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 import java.util.UUID;
+import mockit.Expectations;
+import mockit.Injectable;
+import mockit.Mock;
+import mockit.MockUp;
+import mockit.Mocked;
 
 import mockit.Expectations;
 import mockit.Injectable;
@@ -100,7 +107,7 @@ public class RoutineLoadManagerTest {
         createRoutineLoadStmt.setOrigStmt(new OriginStatement("dummy", 0));
 
         KafkaRoutineLoadJob kafkaRoutineLoadJob = new KafkaRoutineLoadJob(1L, jobName, "default_cluster", 1L, 1L,
-                serverAddress, topicName);
+                serverAddress, topicName, UserIdentity.ADMIN);
 
         new MockUp<KafkaRoutineLoadJob>() {
             @Mock
@@ -199,7 +206,7 @@ public class RoutineLoadManagerTest {
         String topicName = "topic1";
         String serverAddress = "http://127.0.0.1:8080";
         KafkaRoutineLoadJob kafkaRoutineLoadJob = new KafkaRoutineLoadJob(1L, jobName, "default_cluster", 1L, 1L,
-                serverAddress, topicName);
+                serverAddress, topicName, UserIdentity.ADMIN);
 
         RoutineLoadManager routineLoadManager = new RoutineLoadManager();
 
@@ -207,7 +214,7 @@ public class RoutineLoadManagerTest {
         Map<String, List<RoutineLoadJob>> nameToRoutineLoadJob = Maps.newConcurrentMap();
         List<RoutineLoadJob> routineLoadJobList = Lists.newArrayList();
         KafkaRoutineLoadJob kafkaRoutineLoadJobWithSameName = new KafkaRoutineLoadJob(1L, jobName, "default_cluster",
-                1L, 1L, serverAddress, topicName);
+                1L, 1L, serverAddress, topicName, UserIdentity.ADMIN);
         routineLoadJobList.add(kafkaRoutineLoadJobWithSameName);
         nameToRoutineLoadJob.put(jobName, routineLoadJobList);
         dbToNameToRoutineLoadJob.put(1L, nameToRoutineLoadJob);
@@ -229,7 +236,7 @@ public class RoutineLoadManagerTest {
         String topicName = "topic1";
         String serverAddress = "http://127.0.0.1:8080";
         KafkaRoutineLoadJob kafkaRoutineLoadJob = new KafkaRoutineLoadJob(1L, jobName, "default_cluster", 1L, 1L,
-                serverAddress, topicName);
+                serverAddress, topicName, UserIdentity.ADMIN);
 
         RoutineLoadManager routineLoadManager = new RoutineLoadManager();
 
@@ -245,7 +252,7 @@ public class RoutineLoadManagerTest {
         Map<String, List<RoutineLoadJob>> nameToRoutineLoadJob = Maps.newConcurrentMap();
         List<RoutineLoadJob> routineLoadJobList = Lists.newArrayList();
         KafkaRoutineLoadJob kafkaRoutineLoadJobWithSameName = new KafkaRoutineLoadJob(1L, jobName, "default_cluster",
-                1L, 1L, serverAddress, topicName);
+                1L, 1L, serverAddress, topicName, UserIdentity.ADMIN);
         Deencapsulation.setField(kafkaRoutineLoadJobWithSameName, "state", RoutineLoadJob.JobState.STOPPED);
         routineLoadJobList.add(kafkaRoutineLoadJobWithSameName);
         nameToRoutineLoadJob.put(jobName, routineLoadJobList);
@@ -739,23 +746,26 @@ public class RoutineLoadManagerTest {
 
     @Test
     public void testCheckBeToTask(@Mocked Catalog catalog,
-                                  @Mocked SystemInfoService systemInfoService) throws LoadException {
+                                  @Mocked SystemInfoService systemInfoService) throws LoadException, DdlException {
         List<Long> beIdsInCluster = Lists.newArrayList();
         beIdsInCluster.add(1L);
         Map<Long, Integer> beIdToMaxConcurrentTasks = Maps.newHashMap();
         beIdToMaxConcurrentTasks.put(1L, 10);
         new Expectations() {
             {
-                systemInfoService.getClusterBackendIds("default", true);
+                systemInfoService.selectBackendIdsByPolicy((BeSelectionPolicy) any, anyInt);
                 minTimes = 0;
                 result = beIdsInCluster;
             }
         };
 
         RoutineLoadManager routineLoadManager = new RoutineLoadManager();
+        KafkaRoutineLoadJob job = new KafkaRoutineLoadJob(1L, "testjob", SystemInfoService.DEFAULT_CLUSTER,
+                10000, 10001, "192.168.1.1:9090", "testtopic", UserIdentity.ADMIN);
+        routineLoadManager.addRoutineLoadJob(job, "testdb");
         Config.max_routine_load_task_num_per_be = 10;
         Deencapsulation.setField(routineLoadManager, "beIdToMaxConcurrentTasks", beIdToMaxConcurrentTasks);
-        Assert.assertEquals(1L, routineLoadManager.getAvailableBeForTask(1L, "default"));
+        Assert.assertEquals(1L, routineLoadManager.getAvailableBeForTask(1L, 1L, "default"));
     }
 
     @Test
diff --git a/fe/fe-core/src/test/java/org/apache/doris/load/routineload/RoutineLoadSchedulerTest.java b/fe/fe-core/src/test/java/org/apache/doris/load/routineload/RoutineLoadSchedulerTest.java
index f072804295..16c21fb611 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/load/routineload/RoutineLoadSchedulerTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/load/routineload/RoutineLoadSchedulerTest.java
@@ -17,6 +17,7 @@
 
 package org.apache.doris.load.routineload;
 
+import org.apache.doris.analysis.UserIdentity;
 import org.apache.doris.catalog.Catalog;
 import org.apache.doris.catalog.Database;
 import org.apache.doris.catalog.OlapTable;
@@ -76,7 +77,7 @@ public class RoutineLoadSchedulerTest {
         Deencapsulation.setField(catalog, "routineLoadTaskScheduler", routineLoadTaskScheduler);
 
         KafkaRoutineLoadJob kafkaRoutineLoadJob = new KafkaRoutineLoadJob(1L, "test", clusterName, 1L, 1L,
-                "xxx", "test");
+                "xxx", "test", UserIdentity.ADMIN);
         Deencapsulation.setField(kafkaRoutineLoadJob,"state", RoutineLoadJob.JobState.NEED_SCHEDULE);
         List<RoutineLoadJob> routineLoadJobList = new ArrayList<>();
         routineLoadJobList.add(kafkaRoutineLoadJob);
@@ -139,7 +140,7 @@ public class RoutineLoadSchedulerTest {
         };
 
         KafkaRoutineLoadJob kafkaRoutineLoadJob = new KafkaRoutineLoadJob(1L, "test", "default_cluster", 1L, 1L,
-                "10.74.167.16:8092", "test");
+                "10.74.167.16:8092", "test", UserIdentity.ADMIN);
         RoutineLoadManager routineLoadManager = new RoutineLoadManager();
         routineLoadManager.addRoutineLoadJob(kafkaRoutineLoadJob, "db");
 
@@ -170,7 +171,7 @@ public class RoutineLoadSchedulerTest {
         executorService.submit(routineLoadTaskScheduler);
 
         KafkaRoutineLoadJob kafkaRoutineLoadJob1 = new KafkaRoutineLoadJob(1L, "test_custom_partition",
-                "default_cluster", 1L, 1L, "xxx", "test_1");
+                "default_cluster", 1L, 1L, "xxx", "test_1", UserIdentity.ADMIN);
         List<Integer> customKafkaPartitions = new ArrayList<>();
         customKafkaPartitions.add(2);
         Deencapsulation.setField(kafkaRoutineLoadJob1, "customKafkaPartitions", customKafkaPartitions);
diff --git a/fe/fe-core/src/test/java/org/apache/doris/transaction/GlobalTransactionMgrTest.java b/fe/fe-core/src/test/java/org/apache/doris/transaction/GlobalTransactionMgrTest.java
index abdb01f940..defc2fe1c3 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/transaction/GlobalTransactionMgrTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/transaction/GlobalTransactionMgrTest.java
@@ -17,6 +17,7 @@
 
 package org.apache.doris.transaction;
 
+import org.apache.doris.analysis.UserIdentity;
 import org.apache.doris.catalog.Catalog;
 import org.apache.doris.catalog.CatalogTestUtil;
 import org.apache.doris.catalog.FakeCatalog;
@@ -66,6 +67,9 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
+import mockit.Injectable;
+import mockit.Mocked;
+
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
@@ -319,7 +323,7 @@ public class GlobalTransactionMgrTest {
         transTablets.add(tabletCommitInfo3);
 
         KafkaRoutineLoadJob routineLoadJob = new KafkaRoutineLoadJob(1L, "test", "default_cluster", 1L, 1L, "host:port",
-                "topic");
+                "topic", UserIdentity.ADMIN);
         List<RoutineLoadTaskInfo> routineLoadTaskInfoList = Deencapsulation.getField(routineLoadJob, "routineLoadTaskInfoList");
         Map<Integer, Long> partitionIdToOffset = Maps.newHashMap();
         partitionIdToOffset.put(1, 0L);
@@ -388,7 +392,9 @@ public class GlobalTransactionMgrTest {
         transTablets.add(tabletCommitInfo2);
         transTablets.add(tabletCommitInfo3);
 
-        KafkaRoutineLoadJob routineLoadJob = new KafkaRoutineLoadJob(1L, "test", "default_cluster", 1L, 1L, "host:port", "topic");
+        KafkaRoutineLoadJob routineLoadJob =
+                new KafkaRoutineLoadJob(1L, "test", "default_cluster", 1L, 1L, "host:port", "topic",
+                        UserIdentity.ADMIN);
         List<RoutineLoadTaskInfo> routineLoadTaskInfoList = Deencapsulation.getField(routineLoadJob, "routineLoadTaskInfoList");
         Map<Integer, Long> partitionIdToOffset = Maps.newHashMap();
         partitionIdToOffset.put(1, 0L);

