Posted to commits@doris.apache.org by mo...@apache.org on 2022/05/26 00:42:15 UTC

[incubator-doris] branch master updated: [fix](resource-tag) Consider resource tags when assigning tasks for broker & routine load (#9492)

This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 0c70359404 [fix](resource-tag) Consider resource tags when assigning tasks for broker & routine load (#9492)
0c70359404 is described below

commit 0c70359404f7b4ecfb53ec17c994d0a3a6e5c08a
Author: Mingyu Chen <mo...@gmail.com>
AuthorDate: Thu May 26 08:42:09 2022 +0800

    [fix](resource-tag) Consider resource tags when assigning tasks for broker & routine load (#9492)
    
    This CL mainly changes:
    1. Broker Load
        When assigning backends, use the user-level resource tags to find available backends.
        If no user-level resource tag is set, a broker load task can be assigned to any BE node;
        otherwise, the task can only be assigned to BE nodes that match the user-level tags.

    2. Routine Load
        The current routine load job does not carry user info, so it cannot look up user-level tags when assigning tasks.
        This is handled in two ways (see the sketch below):
        1. For old routine load jobs, use the tags of the replica allocation info to select BE nodes.
        2. For new routine load jobs, the user info is added to and persisted in the routine load job.
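
A condensed sketch of the tag-resolution order described above. This is an illustration only, not the actual Doris code; the class and method names are hypothetical:

    import java.util.Set;

    class TagResolutionSketch {
        // Resolve the resource tags a load task may be scheduled on.
        // userTags: tags from the submitting user's property (null/empty when the
        // job predates user-info persistence).
        // replicaTags: tags taken from the table's replica allocation.
        static Set<String> resolveTags(Set<String> userTags, Set<String> replicaTags) {
            if (userTags != null && !userTags.isEmpty()) {
                return userTags;    // new jobs: restrict tasks to the user's resource group
            }
            return replicaTags;     // old jobs: fall back to the replicas' tags
        }
    }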
---
 docs/en/admin-manual/multi-tenant.md               | 12 ++++
 docs/zh-CN/admin-manual/multi-tenant.md            | 14 +++-
 .../org/apache/doris/common/FeMetaVersion.java     |  6 +-
 .../doris/load/loadv2/LoadingTaskPlanner.java      |  2 +-
 .../load/routineload/KafkaRoutineLoadJob.java      |  8 ++-
 .../doris/load/routineload/RoutineLoadJob.java     | 27 +++++++-
 .../doris/load/routineload/RoutineLoadManager.java | 74 ++++++++++++++++++++--
 .../load/routineload/RoutineLoadTaskScheduler.java |  3 +-
 .../mysql/privilege/UserResourceProperty.java      | 31 ---------
 .../org/apache/doris/planner/BrokerScanNode.java   | 45 +++++++++----
 .../load/routineload/KafkaRoutineLoadJobTest.java  | 26 ++++----
 .../doris/load/routineload/RoutineLoadJobTest.java | 16 +++--
 .../load/routineload/RoutineLoadManagerTest.java   | 32 ++++++----
 .../load/routineload/RoutineLoadSchedulerTest.java | 15 +++--
 .../transaction/GlobalTransactionMgrTest.java      | 13 ++--
 15 files changed, 223 insertions(+), 101 deletions(-)

diff --git a/docs/en/admin-manual/multi-tenant.md b/docs/en/admin-manual/multi-tenant.md
index fb121fa191..41663a1e4f 100644
--- a/docs/en/admin-manual/multi-tenant.md
+++ b/docs/en/admin-manual/multi-tenant.md
@@ -133,6 +133,18 @@ Node resource division refers to setting tags for BE nodes in a Doris cluster, a
     In this way, we achieve physical resource isolation for different users' queries by dividing nodes and restricting user resource usage. Furthermore, we can create different users for different business departments and restrict each user to a different resource group, so that different business units do not interfere with each other's resources. For example, there is a business table in the cluster that needs to be shared by all 9 business departments, but it is hoped [...]
     
     On the other hand, resource groups can also be used to isolate online and offline tasks. For example, we can divide nodes into two resource groups, Online and Offline. Table data is still stored with 3 replicas, of which 2 replicas are placed in the Online resource group and 1 replica in the Offline resource group. The Online resource group is mainly used for online data services with high concurrency and low latency, while some large queries or offline ETL opera [...]
+
+4. Resource group assignment for load jobs
+
+    The resource usage of load jobs (including insert, broker load, routine load, stream load, etc.) can be divided into two parts:
+    1. Computing resources: responsible for reading data sources, data transformation, and distribution.
+    2. Write resources: responsible for data encoding, compression, and writing to disk.
+
+    The write resources must be on the nodes where the replicas are located, while the computing resources can in theory be placed on any node. Therefore, resource group allocation for a load job is done in two steps:
+    1. Use the user-level resource tags to limit the resource groups that the computing resources can use.
+    2. Use the replicas' resource tags to limit the resource groups that the write resources can use.
+
+    So if you want all resources used by a load job to be limited to the resource group where the data is located, simply set the user-level resource tag to the same value as the replicas' resource tag.
     
 ## Single query resource limit
 
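
To illustrate the two-step allocation described in the documentation change above, here is a minimal sketch in Java terms, using hypothetical names (beTags, userTags) rather than the Doris API: a BE is eligible for the computing phase whenever no user-level tag is set or its tags intersect the user's, while the write phase is always bound to the replica nodes. Setting the user-level tag equal to the replica tag therefore confines both phases to one resource group.

    import java.util.Collections;
    import java.util.Set;

    class LoadPlacementSketch {
        // Computing phase: any BE qualifies when no user-level tag is set;
        // otherwise the BE must carry at least one of the user's tags.
        static boolean computeEligible(Set<String> beTags, Set<String> userTags) {
            return userTags.isEmpty() || !Collections.disjoint(beTags, userTags);
        }

        // Write phase: always bound to the BEs that hold the replicas; their
        // tags come from the table's replica allocation.
        static boolean writeEligible(boolean holdsReplica) {
            return holdsReplica;
        }
    }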
diff --git a/docs/zh-CN/admin-manual/multi-tenant.md b/docs/zh-CN/admin-manual/multi-tenant.md
index dadabd1a5a..acc1775c0a 100644
--- a/docs/zh-CN/admin-manual/multi-tenant.md
+++ b/docs/zh-CN/admin-manual/multi-tenant.md
@@ -134,6 +134,18 @@ FE 不参与用户数据的处理计算等工作,因此是一个资源消耗
 
   On the other hand, resource groups can also be used to isolate online and offline tasks. For example, we can divide nodes into two resource groups, Online and Offline. Table data is still stored with 3 replicas, of which 2 replicas are placed in the Online resource group and 1 replica in the Offline resource group. The Online resource group is mainly used for online data services with high concurrency and low latency, while large queries or offline ETL operations can be executed on nodes in the Offline resource group. This provides the ability to serve both online and offline workloads within a single cluster.
 
+4. Resource group assignment for load jobs
+
+   The resource usage of load jobs (including insert, broker load, routine load, stream load, etc.) can be divided into two parts:
+   1. Computing resources: responsible for reading data sources, data transformation, and distribution.
+   2. Write resources: responsible for data encoding, compression, and writing to disk.
+
+   The write resources must be on the nodes where the data replicas are located, while the computing resources can in theory be placed on any node. So resource group allocation for a load job is done in two steps:
+   1. Use the user-level resource tags to limit the resource groups that the computing resources can use.
+   2. Use the replicas' resource tags to limit the resource groups that the write resources can use.
+
+   So if you want all resources used by a load job to be limited to the resource group where the data is located, simply set the user-level resource tag to the same value as the replicas' resource tag.
+
 ## Single query resource limit
 
 The resource group approach described above provides node-level resource isolation and restriction. Within a resource group, however, resource contention can still occur. For example, if the 3 business departments mentioned earlier are placed in the same resource group, the degree of resource competition is reduced, but queries from these 3 departments may still affect one another.
@@ -217,4 +229,4 @@ Tag 划分和 CPU 限制是 0.15 版本中的新功能。为了保证可以从
 
   After the data redistribution is complete, we can start setting users' resource tag permissions. By default, a user's `resource_tags.location` property is empty, meaning the user can access BEs with any tag, so the preceding steps do not affect existing users' normal queries. Once the `resource_tags.location` property is non-empty, the user is restricted to accessing BEs with the specified tags.
 
-With the above 4 steps, we can smoothly start using the resource division feature after upgrading an existing cluster.
\ No newline at end of file
+With the above 4 steps, we can smoothly start using the resource division feature after upgrading an existing cluster.
diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/FeMetaVersion.java b/fe/fe-common/src/main/java/org/apache/doris/common/FeMetaVersion.java
index ed107ec857..6158431607 100644
--- a/fe/fe-common/src/main/java/org/apache/doris/common/FeMetaVersion.java
+++ b/fe/fe-common/src/main/java/org/apache/doris/common/FeMetaVersion.java
@@ -38,8 +38,10 @@ public final class FeMetaVersion {
     public static final int VERSION_108 = 108;
     // add row policy
     public static final int VERSION_109 = 109;
-    // note: when increment meta version, should assign the latest version to VERSION_CURRENT
-    public static final int VERSION_CURRENT = VERSION_109;
+    // For routine load user info
+    public static final int VERSION_110 = 110;
+    // NOTE: when incrementing the meta version, assign the latest version to VERSION_CURRENT
+    public static final int VERSION_CURRENT = VERSION_110;
 
     // all log meta versions should be >= the minimum version, so that we can remove many if clauses, for example
     // if (FE_METAVERSION < VERSION_94) ...
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadingTaskPlanner.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadingTaskPlanner.java
index 78fa7a23b3..240edf92fc 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadingTaskPlanner.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadingTaskPlanner.java
@@ -123,7 +123,7 @@ public class LoadingTaskPlanner {
         // 1. Broker scan node
         BrokerScanNode scanNode = new BrokerScanNode(new PlanNodeId(nextNodeId++), destTupleDesc, "BrokerScanNode",
                 fileStatusesList, filesAdded);
-        scanNode.setLoadInfo(loadJobId, txnId, table, brokerDesc, fileGroups, strictMode, loadParallelism);
+        scanNode.setLoadInfo(loadJobId, txnId, table, brokerDesc, fileGroups, strictMode, loadParallelism, userInfo);
         scanNode.init(analyzer);
         scanNode.finalize(analyzer);
         if (Config.enable_vectorized_load) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java
index 8b9da7713d..f645ed93b9 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java
@@ -20,6 +20,7 @@ package org.apache.doris.load.routineload;
 import org.apache.doris.analysis.AlterRoutineLoadStmt;
 import org.apache.doris.analysis.CreateRoutineLoadStmt;
 import org.apache.doris.analysis.RoutineLoadDataSourceProperties;
+import org.apache.doris.analysis.UserIdentity;
 import org.apache.doris.catalog.Catalog;
 import org.apache.doris.catalog.Database;
 import org.apache.doris.catalog.OlapTable;
@@ -102,8 +103,9 @@ public class KafkaRoutineLoadJob extends RoutineLoadJob {
     }
 
     public KafkaRoutineLoadJob(Long id, String name, String clusterName,
-                               long dbId, long tableId, String brokerList, String topic) {
-        super(id, name, clusterName, dbId, tableId, LoadDataSourceType.KAFKA);
+                               long dbId, long tableId, String brokerList, String topic,
+                               UserIdentity userIdentity) {
+        super(id, name, clusterName, dbId, tableId, LoadDataSourceType.KAFKA, userIdentity);
         this.brokerList = brokerList;
         this.topic = topic;
         this.progress = new KafkaProgress();
@@ -393,7 +395,7 @@ public class KafkaRoutineLoadJob extends RoutineLoadJob {
         long id = Catalog.getCurrentCatalog().getNextId();
         KafkaRoutineLoadJob kafkaRoutineLoadJob = new KafkaRoutineLoadJob(id, stmt.getName(),
                 db.getClusterName(), db.getId(), tableId,
-                stmt.getKafkaBrokerList(), stmt.getKafkaTopic());
+                stmt.getKafkaBrokerList(), stmt.getKafkaTopic(), stmt.getUserInfo());
         kafkaRoutineLoadJob.setOptional(stmt);
         kafkaRoutineLoadJob.checkCustomProperties();
         kafkaRoutineLoadJob.checkCustomPartition();
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
index 4ee92ca025..34df317104 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
@@ -27,6 +27,7 @@ import org.apache.doris.analysis.PartitionNames;
 import org.apache.doris.analysis.Separator;
 import org.apache.doris.analysis.SqlParser;
 import org.apache.doris.analysis.SqlScanner;
+import org.apache.doris.analysis.UserIdentity;
 import org.apache.doris.catalog.Catalog;
 import org.apache.doris.catalog.Database;
 import org.apache.doris.catalog.OlapTable;
@@ -228,6 +229,8 @@ public abstract class RoutineLoadJob extends AbstractTxnStateChangeCallback impl
     // this is the origin stmt of CreateRoutineLoadStmt, we use it to persist the RoutineLoadJob,
     // because we can not serialize the Expressions contained in job.
     protected OriginStatement origStmt;
+    // User who submitted this job. May be null for old-version jobs (before v1.1).
+    protected UserIdentity userIdentity;
 
     protected ReentrantReadWriteLock lock = new ReentrantReadWriteLock(true);
     protected LoadTask.MergeType mergeType = LoadTask.MergeType.APPEND; // default is all data is load no delete
@@ -249,13 +252,15 @@ public abstract class RoutineLoadJob extends AbstractTxnStateChangeCallback impl
     }
 
     public RoutineLoadJob(Long id, String name, String clusterName,
-            long dbId, long tableId, LoadDataSourceType dataSourceType) {
+                          long dbId, long tableId, LoadDataSourceType dataSourceType,
+                          UserIdentity userIdentity) {
         this(id, dataSourceType);
         this.name = name;
         this.clusterName = clusterName;
         this.dbId = dbId;
         this.tableId = tableId;
         this.authCode = 0;
+        this.userIdentity = userIdentity;
 
         if (ConnectContext.get() != null) {
             SessionVariable var = ConnectContext.get().getSessionVariable();
@@ -428,6 +433,10 @@ public abstract class RoutineLoadJob extends AbstractTxnStateChangeCallback impl
         return partitions;
     }
 
+    public UserIdentity getUserIdentity() {
+        return userIdentity;
+    }
+
     @Override
     public LoadTask.MergeType getMergeType() {
         return mergeType;
@@ -1531,6 +1540,13 @@ public abstract class RoutineLoadJob extends AbstractTxnStateChangeCallback impl
             Text.writeString(out, entry.getKey());
             Text.writeString(out, entry.getValue());
         }
+
+        if (userIdentity == null) {
+            out.writeBoolean(false);
+        } else {
+            out.writeBoolean(true);
+            userIdentity.write(out);
+        }
     }
 
     public void readFields(DataInput in) throws IOException {
@@ -1606,6 +1622,15 @@ public abstract class RoutineLoadJob extends AbstractTxnStateChangeCallback impl
         } catch (Exception e) {
             throw new IOException("error happens when parsing create routine load stmt: " + origStmt.originStmt, e);
         }
+
+        if (Catalog.getCurrentCatalogJournalVersion() >= FeMetaVersion.VERSION_110) {
+            if (in.readBoolean()) {
+                userIdentity = UserIdentity.read(in);
+                userIdentity.setIsAnalyzed();
+            } else {
+                userIdentity = null;
+            }
+        }
     }
 
     abstract public void modifyProperties(AlterRoutineLoadStmt stmt) throws UserException;
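
The write/readFields changes above follow a common FE persistence idiom: a boolean presence flag guards the nullable field on write, and the read side consumes that flag only when the image's meta version is new enough (VERSION_110 here). A self-contained sketch of the idiom, generic rather than the RoutineLoadJob code itself:

    import java.io.DataInput;
    import java.io.DataOutput;
    import java.io.IOException;

    class OptionalFieldIo {
        // Write side: emit a presence flag, then the field only if present.
        static void writeOptional(DataOutput out, String value) throws IOException {
            if (value == null) {
                out.writeBoolean(false);
            } else {
                out.writeBoolean(true);
                out.writeUTF(value);
            }
        }

        // Read side: images older than minVersion never wrote the flag, so the
        // field simply stays null; newer images are read symmetrically.
        static String readOptional(DataInput in, int imageVersion, int minVersion)
                throws IOException {
            if (imageVersion < minVersion) {
                return null;
            }
            return in.readBoolean() ? in.readUTF() : null;
        }
    }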
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadManager.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadManager.java
index cb07031765..3a2e932bc0 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadManager.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadManager.java
@@ -24,6 +24,11 @@ import org.apache.doris.analysis.ResumeRoutineLoadStmt;
 import org.apache.doris.analysis.StopRoutineLoadStmt;
 import org.apache.doris.catalog.Catalog;
 import org.apache.doris.catalog.Database;
+import org.apache.doris.catalog.OlapTable;
+import org.apache.doris.catalog.Partition;
+import org.apache.doris.catalog.PartitionInfo;
+import org.apache.doris.catalog.ReplicaAllocation;
+import org.apache.doris.catalog.Table;
 import org.apache.doris.common.AnalysisException;
 import org.apache.doris.common.Config;
 import org.apache.doris.common.DdlException;
@@ -38,15 +43,19 @@ import org.apache.doris.common.io.Writable;
 import org.apache.doris.common.util.LogBuilder;
 import org.apache.doris.common.util.LogKey;
 import org.apache.doris.mysql.privilege.PrivPredicate;
+import org.apache.doris.mysql.privilege.UserProperty;
 import org.apache.doris.persist.AlterRoutineLoadJobOperationLog;
 import org.apache.doris.persist.RoutineLoadOperation;
 import org.apache.doris.qe.ConnectContext;
+import org.apache.doris.resource.Tag;
 import org.apache.doris.system.Backend;
+import org.apache.doris.system.BeSelectionPolicy;
 
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
+
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
@@ -410,11 +419,8 @@ public class RoutineLoadManager implements Writable {
     // check if the specified BE is available for running task
     // return true if it is available. return false if otherwise.
     // throw exception if unrecoverable errors happen.
-    public long getAvailableBeForTask(long previousBeId, String clusterName) throws LoadException {
-        List<Long> beIdsInCluster = Catalog.getCurrentSystemInfo().getClusterBackendIds(clusterName, true);
-        if (beIdsInCluster == null) {
-            throw new LoadException("The " + clusterName + " has been deleted");
-        }
+    public long getAvailableBeForTask(long jobId, long previousBeId, String clusterName) throws LoadException {
+        List<Long> availableBeIds = getAvailableBackendIds(jobId, clusterName);
 
         // check if be has idle slot
         readLock();
@@ -422,7 +428,7 @@ public class RoutineLoadManager implements Writable {
             Map<Long, Integer> beIdToConcurrentTasks = getBeCurrentTasksNumMap();
 
             // 1. Find if the given BE id has available slots
-            if (previousBeId != -1L && beIdsInCluster.contains(previousBeId)) {
+            if (previousBeId != -1L && availableBeIds.contains(previousBeId)) {
                 // get the previousBackend info
                 Backend previousBackend = Catalog.getCurrentSystemInfo().getBackend(previousBeId);
                 // check previousBackend is not null && load available
@@ -446,7 +452,7 @@ public class RoutineLoadManager implements Writable {
             int idleTaskNum = 0;
             long resultBeId = -1L;
             int maxIdleSlotNum = 0;
-            for (Long beId : beIdsInCluster) {
+            for (Long beId : availableBeIds) {
                 if (beIdToMaxConcurrentTasks.containsKey(beId)) {
                     if (beIdToConcurrentTasks.containsKey(beId)) {
                         idleTaskNum = beIdToMaxConcurrentTasks.get(beId) - beIdToConcurrentTasks.get(beId);
@@ -467,6 +473,60 @@ public class RoutineLoadManager implements Writable {
         }
     }
 
+    /**
+     * A routine load task can only be scheduled on backends that have the proper resource tags.
+     * The tags should be taken from the user property.
+     * But in older versions the routine load job did not carry user info, so for compatibility,
+     * if there is no user info, we take the tags from the replica allocation of the first partition of the table.
+     *
+     * @param jobId   id of the routine load job
+     * @param cluster name of the cluster
+     * @return ids of the backends available for this job's tasks
+     * @throws LoadException if the job does not exist or the table meta cannot be found
+     */
+    private List<Long> getAvailableBackendIds(long jobId, String cluster) throws LoadException {
+        RoutineLoadJob job = getJob(jobId);
+        if (job == null) {
+            throw new LoadException("job " + jobId + " does not exist");
+        }
+        Set<Tag> tags;
+        if (job.getUserIdentity() == null) {
+            // For old jobs there may be no user info, so we have to use the tags from the replica allocation
+            tags = getTagsFromReplicaAllocation(job.getDbId(), job.getTableId());
+        } else {
+            tags = Catalog.getCurrentCatalog().getAuth().getResourceTags(job.getUserIdentity().getQualifiedUser());
+            if (tags == UserProperty.INVALID_RESOURCE_TAGS) {
+                // the user may have been dropped; fall back to the replica tags
+                tags = getTagsFromReplicaAllocation(job.getDbId(), job.getTableId());
+            }
+        }
+        BeSelectionPolicy policy = new BeSelectionPolicy.Builder().needLoadAvailable().setCluster(cluster)
+                .addTags(tags).build();
+        return Catalog.getCurrentSystemInfo().selectBackendIdsByPolicy(policy, 20000);
+    }
+
+    private Set<Tag> getTagsFromReplicaAllocation(long dbId, long tblId) throws LoadException {
+        try {
+            Database db = Catalog.getCurrentCatalog().getDbOrMetaException(dbId);
+            OlapTable tbl = db.getTableOrMetaException(tblId, Table.TableType.OLAP);
+            tbl.readLock();
+            try {
+                PartitionInfo partitionInfo = tbl.getPartitionInfo();
+                for (Partition partition : tbl.getPartitions()) {
+                    ReplicaAllocation replicaAlloc = partitionInfo.getReplicaAllocation(partition.getId());
+                    // just use the first one
+                    return replicaAlloc.getAllocMap().keySet();
+                }
+                // Should not reach here; just keep the compiler happy.
+                return Sets.newHashSet();
+            } finally {
+                tbl.readUnlock();
+            }
+        } catch (MetaNotFoundException e) {
+            throw new LoadException(e.getMessage());
+        }
+    }
+
     public RoutineLoadJob getJob(long jobId) {
         return idToRoutineLoadJob.get(jobId);
     }
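
With getAvailableBackendIds in place, getAvailableBeForTask keeps its existing slot logic but runs it over the tag-filtered candidates: reuse the previous BE if it still has a free slot, otherwise take the candidate with the most idle slots. A simplified sketch of that pick (hypothetical standalone form; locking and Backend lookups omitted):

    import java.util.List;
    import java.util.Map;

    class TaskSlotPickSketch {
        // candidates: tag-filtered BE ids; maxTasks/curTasks: per-BE slot counts.
        static long pickBe(List<Long> candidates, long previousBeId,
                           Map<Long, Integer> maxTasks, Map<Long, Integer> curTasks) {
            // 1. Reuse the previous BE if it is still a candidate with a free slot.
            if (previousBeId != -1L && candidates.contains(previousBeId)
                    && curTasks.getOrDefault(previousBeId, 0)
                            < maxTasks.getOrDefault(previousBeId, 0)) {
                return previousBeId;
            }
            // 2. Otherwise pick the candidate with the most idle slots.
            long resultBeId = -1L;
            int maxIdleSlotNum = 0;
            for (long beId : candidates) {
                int idle = maxTasks.getOrDefault(beId, 0) - curTasks.getOrDefault(beId, 0);
                if (idle > maxIdleSlotNum) {
                    maxIdleSlotNum = idle;
                    resultBeId = beId;
                }
            }
            return resultBeId; // -1L when every candidate is saturated
        }
    }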
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskScheduler.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskScheduler.java
index ab608393b4..b01b196595 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskScheduler.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskScheduler.java
@@ -286,7 +286,8 @@ public class RoutineLoadTaskScheduler extends MasterDaemon {
     // return true if allocate successfully. return false if failed.
     // throw exception if unrecoverable errors happen.
     private boolean allocateTaskToBe(RoutineLoadTaskInfo routineLoadTaskInfo) throws LoadException {
-        long beId = routineLoadManager.getAvailableBeForTask(routineLoadTaskInfo.getPreviousBeId(), routineLoadTaskInfo.getClusterName());
+        long beId = routineLoadManager.getAvailableBeForTask(routineLoadTaskInfo.getJobId(),
+                routineLoadTaskInfo.getPreviousBeId(), routineLoadTaskInfo.getClusterName());
         if (beId == -1L) {
             return false;
         }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/mysql/privilege/UserResourceProperty.java b/fe/fe-core/src/main/java/org/apache/doris/mysql/privilege/UserResourceProperty.java
deleted file mode 100644
index e4b609de95..0000000000
--- a/fe/fe-core/src/main/java/org/apache/doris/mysql/privilege/UserResourceProperty.java
+++ /dev/null
@@ -1,31 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package org.apache.doris.mysql.privilege;
-
-import org.apache.doris.common.io.Writable;
-
-import java.io.DataOutput;
-import java.io.IOException;
-
-public class UserResourceProperty implements Writable {
-
-    @Override
-    public void write(DataOutput out) throws IOException {
-
-    }
-}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/BrokerScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/BrokerScanNode.java
index f6f137f77f..a824050258 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/BrokerScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/BrokerScanNode.java
@@ -26,6 +26,7 @@ import org.apache.doris.analysis.SlotDescriptor;
 import org.apache.doris.analysis.SlotRef;
 import org.apache.doris.analysis.StorageBackend;
 import org.apache.doris.analysis.TupleDescriptor;
+import org.apache.doris.analysis.UserIdentity;
 import org.apache.doris.catalog.BrokerTable;
 import org.apache.doris.catalog.Catalog;
 import org.apache.doris.catalog.Column;
@@ -41,7 +42,11 @@ import org.apache.doris.common.util.VectorizedUtil;
 import org.apache.doris.load.BrokerFileGroup;
 import org.apache.doris.load.Load;
 import org.apache.doris.load.loadv2.LoadTask;
+import org.apache.doris.mysql.privilege.UserProperty;
+import org.apache.doris.qe.ConnectContext;
+import org.apache.doris.resource.Tag;
 import org.apache.doris.system.Backend;
+import org.apache.doris.system.BeSelectionPolicy;
 import org.apache.doris.task.LoadTaskInfo;
 import org.apache.doris.thrift.TBrokerFileStatus;
 import org.apache.doris.thrift.TBrokerRangeDesc;
@@ -60,6 +65,8 @@ import com.google.common.base.Joiner;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
@@ -70,6 +77,7 @@ import java.util.Comparator;
 import java.util.List;
 import java.util.Map;
 import java.util.Random;
+import java.util.Set;
 import java.util.stream.Collectors;
 
 /**
@@ -115,6 +123,7 @@ public class BrokerScanNode extends LoadScanNode {
     protected List<BrokerFileGroup> fileGroups;
     private boolean strictMode = false;
     private int loadParallelism = 1;
+    private UserIdentity userIdentity;
 
     protected List<List<TBrokerFileStatus>> fileStatusesList;
     // file num
@@ -137,18 +146,26 @@ public class BrokerScanNode extends LoadScanNode {
 
     private List<ParamCreateContext> paramCreateContexts;
 
+    // For broker load and external broker table
     public BrokerScanNode(PlanNodeId id, TupleDescriptor destTupleDesc, String planNodeName,
                           List<List<TBrokerFileStatus>> fileStatusesList, int filesAdded) {
         super(id, destTupleDesc, planNodeName, NodeType.BROKER_SCAN_NODE);
         this.fileStatusesList = fileStatusesList;
         this.filesAdded = filesAdded;
+        if (ConnectContext.get() != null) {
+            this.userIdentity = ConnectContext.get().getUserIdentity();
+        }
     }
 
+    // For hive and iceberg scan node
     public BrokerScanNode(PlanNodeId id, TupleDescriptor destTupleDesc, String planNodeName,
                           List<List<TBrokerFileStatus>> fileStatusesList, int filesAdded, NodeType nodeType) {
         super(id, destTupleDesc, planNodeName, nodeType);
         this.fileStatusesList = fileStatusesList;
         this.filesAdded = filesAdded;
+        if (ConnectContext.get() != null) {
+            this.userIdentity = ConnectContext.get().getUserIdentity();
+        }
     }
 
     @Override
@@ -189,22 +206,14 @@ public class BrokerScanNode extends LoadScanNode {
         return desc.getTable() == null;
     }
 
-    @Deprecated
-    public void setLoadInfo(Table targetTable,
-                            BrokerDesc brokerDesc,
-                            List<BrokerFileGroup> fileGroups) {
-        this.targetTable = targetTable;
-        this.brokerDesc = brokerDesc;
-        this.fileGroups = fileGroups;
-    }
-
     public void setLoadInfo(long loadJobId,
                             long txnId,
                             Table targetTable,
                             BrokerDesc brokerDesc,
                             List<BrokerFileGroup> fileGroups,
                             boolean strictMode,
-                            int loadParallelism) {
+                            int loadParallelism,
+                            UserIdentity userIdentity) {
         this.loadJobId = loadJobId;
         this.txnId = txnId;
         this.targetTable = targetTable;
@@ -212,6 +221,7 @@ public class BrokerScanNode extends LoadScanNode {
         this.fileGroups = fileGroups;
         this.strictMode = strictMode;
         this.loadParallelism = loadParallelism;
+        this.userIdentity = userIdentity;
     }
 
     // Called from init, construct source tuple information
@@ -398,10 +408,21 @@ public class BrokerScanNode extends LoadScanNode {
     }
 
     private void assignBackends() throws UserException {
+        Set<Tag> tags = Sets.newHashSet();
+        if (userIdentity != null) {
+            tags = Catalog.getCurrentCatalog().getAuth().getResourceTags(userIdentity.getQualifiedUser());
+            if (tags == UserProperty.INVALID_RESOURCE_TAGS) {
+                throw new UserException("No valid resource tag for user: " + userIdentity.getQualifiedUser());
+            }
+        } else {
+            LOG.debug("user info in BrokerScanNode should not be null; logging here to observe");
+        }
         backends = Lists.newArrayList();
+        // broker scan node is used for query or load
+        BeSelectionPolicy policy = new BeSelectionPolicy.Builder().needQueryAvailable().needLoadAvailable()
+                .addTags(tags).build();
         for (Backend be : Catalog.getCurrentSystemInfo().getIdToBackend().values()) {
-            // broker scan node is used for query or load
-            if (be.isQueryAvailable() && be.isLoadAvailable()) {
+            if (policy.isMatch(be)) {
                 backends.add(be);
             }
         }
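
Note the empty-tag behavior in assignBackends: when userIdentity is null, the tag set stays empty and the policy places no tag restriction, so selection degrades to the old "any query- and load-available BE" rule. A simplified reading of the tag part of the match (an assumption drawn from that fallback path, not the full BeSelectionPolicy.isMatch):

    import java.util.Set;

    class PolicyMatchSketch {
        // Empty required set: no tag restriction (old behavior).
        // Non-empty: the BE's location tag must be one of the required tags.
        // Query/load availability checks are omitted in this sketch.
        static boolean tagMatch(Set<String> requiredTags, String beLocationTag) {
            return requiredTags.isEmpty() || requiredTags.contains(beLocationTag);
        }
    }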
diff --git a/fe/fe-core/src/test/java/org/apache/doris/load/routineload/KafkaRoutineLoadJobTest.java b/fe/fe-core/src/test/java/org/apache/doris/load/routineload/KafkaRoutineLoadJobTest.java
index deac5298ec..436a93de7a 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/load/routineload/KafkaRoutineLoadJobTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/load/routineload/KafkaRoutineLoadJobTest.java
@@ -24,6 +24,7 @@ import org.apache.doris.analysis.ParseNode;
 import org.apache.doris.analysis.PartitionNames;
 import org.apache.doris.analysis.RoutineLoadDataSourceProperties;
 import org.apache.doris.analysis.Separator;
+import org.apache.doris.analysis.UserIdentity;
 import org.apache.doris.catalog.Catalog;
 import org.apache.doris.catalog.Database;
 import org.apache.doris.catalog.OlapTable;
@@ -47,12 +48,7 @@ import org.apache.doris.transaction.GlobalTransactionMgr;
 import com.google.common.base.Joiner;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
-import mockit.Expectations;
-import mockit.Injectable;
-import mockit.Mock;
-import mockit.MockUp;
-import mockit.Mocked;
-import mockit.Verifications;
+
 import org.apache.kafka.common.PartitionInfo;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
@@ -65,6 +61,12 @@ import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
 import java.util.UUID;
+import mockit.Expectations;
+import mockit.Injectable;
+import mockit.Mock;
+import mockit.MockUp;
+import mockit.Mocked;
+import mockit.Verifications;
 
 public class KafkaRoutineLoadJobTest {
     private static final Logger LOG = LogManager.getLogger(KafkaRoutineLoadJobTest.class);
@@ -129,25 +131,25 @@ public class KafkaRoutineLoadJobTest {
         // 2 partitions, 1 be
         RoutineLoadJob routineLoadJob =
                 new KafkaRoutineLoadJob(1L, "kafka_routine_load_job", clusterName1, 1L,
-                        1L, "127.0.0.1:9020", "topic1");
+                        1L, "127.0.0.1:9020", "topic1", UserIdentity.ADMIN);
         Deencapsulation.setField(routineLoadJob, "currentKafkaPartitions", partitionList1);
         Assert.assertEquals(2, routineLoadJob.calculateCurrentConcurrentTaskNum());
 
         // 3 partitions, 4 be
         routineLoadJob = new KafkaRoutineLoadJob(1L, "kafka_routine_load_job", clusterName2, 1L,
-                1L, "127.0.0.1:9020", "topic1");
+                1L, "127.0.0.1:9020", "topic1", UserIdentity.ADMIN);
         Deencapsulation.setField(routineLoadJob, "currentKafkaPartitions", partitionList2);
         Assert.assertEquals(3, routineLoadJob.calculateCurrentConcurrentTaskNum());
 
         // 4 partitions, 4 be
         routineLoadJob = new KafkaRoutineLoadJob(1L, "kafka_routine_load_job", clusterName2, 1L,
-                1L, "127.0.0.1:9020", "topic1");
+                1L, "127.0.0.1:9020", "topic1", UserIdentity.ADMIN);
         Deencapsulation.setField(routineLoadJob, "currentKafkaPartitions", partitionList3);
         Assert.assertEquals(4, routineLoadJob.calculateCurrentConcurrentTaskNum());
 
         // 7 partitions, 4 be
         routineLoadJob = new KafkaRoutineLoadJob(1L, "kafka_routine_load_job", clusterName2, 1L,
-                1L, "127.0.0.1:9020", "topic1");
+                1L, "127.0.0.1:9020", "topic1", UserIdentity.ADMIN);
         Deencapsulation.setField(routineLoadJob, "currentKafkaPartitions", partitionList4);
         Assert.assertEquals(6, routineLoadJob.calculateCurrentConcurrentTaskNum());
     }
@@ -162,7 +164,7 @@ public class KafkaRoutineLoadJobTest {
 
         RoutineLoadJob routineLoadJob =
                 new KafkaRoutineLoadJob(1L, "kafka_routine_load_job", "default", 1L,
-                        1L, "127.0.0.1:9020", "topic1");
+                        1L, "127.0.0.1:9020", "topic1", UserIdentity.ADMIN);
 
         new Expectations(catalog) {
             {
@@ -207,7 +209,7 @@ public class KafkaRoutineLoadJobTest {
 
         RoutineLoadJob routineLoadJob =
                 new KafkaRoutineLoadJob(1L, "kafka_routine_load_job", "default", 1L,
-                        1L, "127.0.0.1:9020", "topic1");
+                        1L, "127.0.0.1:9020", "topic1", UserIdentity.ADMIN);
         long maxBatchIntervalS = 10;
         Deencapsulation.setField(routineLoadJob, "maxBatchIntervalS", maxBatchIntervalS);
         new Expectations() {
diff --git a/fe/fe-core/src/test/java/org/apache/doris/load/routineload/RoutineLoadJobTest.java b/fe/fe-core/src/test/java/org/apache/doris/load/routineload/RoutineLoadJobTest.java
index 4fd5a03080..f78f5a563b 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/load/routineload/RoutineLoadJobTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/load/routineload/RoutineLoadJobTest.java
@@ -19,6 +19,7 @@ package org.apache.doris.load.routineload;
 
 import org.apache.doris.analysis.CreateRoutineLoadStmt;
 import org.apache.doris.analysis.SqlParser;
+import org.apache.doris.analysis.UserIdentity;
 import org.apache.doris.catalog.Catalog;
 import org.apache.doris.catalog.Database;
 import org.apache.doris.catalog.Table;
@@ -34,18 +35,19 @@ import org.apache.doris.transaction.TransactionState;
 import com.google.common.base.Strings;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
-import java_cup.runtime.Symbol;
-import mockit.Expectations;
-import mockit.Injectable;
-import mockit.Mock;
-import mockit.MockUp;
-import mockit.Mocked;
+
 import org.apache.kafka.common.PartitionInfo;
 import org.junit.Assert;
 import org.junit.Test;
 
 import java.util.List;
 import java.util.Map;
+import java_cup.runtime.Symbol;
+import mockit.Expectations;
+import mockit.Injectable;
+import mockit.Mock;
+import mockit.MockUp;
+import mockit.Mocked;
 
 public class RoutineLoadJobTest {
 
@@ -316,7 +318,7 @@ public class RoutineLoadJobTest {
     @Test
     public void testGetShowCreateInfo() throws UserException {
         KafkaRoutineLoadJob routineLoadJob = new KafkaRoutineLoadJob(111L, "test_load", "test", 1,
-                11, "localhost:9092", "test_topic");
+                11, "localhost:9092", "test_topic", UserIdentity.ADMIN);
         Deencapsulation.setField(routineLoadJob, "maxErrorNum", 10);
         Deencapsulation.setField(routineLoadJob, "maxBatchRows", 10);
         Deencapsulation.setField(routineLoadJob, "maxBatchRows", 10);
diff --git a/fe/fe-core/src/test/java/org/apache/doris/load/routineload/RoutineLoadManagerTest.java b/fe/fe-core/src/test/java/org/apache/doris/load/routineload/RoutineLoadManagerTest.java
index 7b20ed28d0..402205fae0 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/load/routineload/RoutineLoadManagerTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/load/routineload/RoutineLoadManagerTest.java
@@ -25,6 +25,7 @@ import org.apache.doris.analysis.ResumeRoutineLoadStmt;
 import org.apache.doris.analysis.Separator;
 import org.apache.doris.analysis.StopRoutineLoadStmt;
 import org.apache.doris.analysis.TableName;
+import org.apache.doris.analysis.UserIdentity;
 import org.apache.doris.catalog.Catalog;
 import org.apache.doris.catalog.Database;
 import org.apache.doris.common.AnalysisException;
@@ -43,16 +44,13 @@ import org.apache.doris.persist.EditLog;
 import org.apache.doris.persist.RoutineLoadOperation;
 import org.apache.doris.qe.ConnectContext;
 import org.apache.doris.qe.OriginStatement;
+import org.apache.doris.system.BeSelectionPolicy;
 import org.apache.doris.system.SystemInfoService;
 import org.apache.doris.thrift.TResourceInfo;
 
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
-import mockit.Expectations;
-import mockit.Injectable;
-import mockit.Mock;
-import mockit.MockUp;
-import mockit.Mocked;
+
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 import org.junit.Assert;
@@ -62,6 +60,11 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 import java.util.UUID;
+import mockit.Expectations;
+import mockit.Injectable;
+import mockit.Mock;
+import mockit.MockUp;
+import mockit.Mocked;
 
 public class RoutineLoadManagerTest {
 
@@ -98,7 +101,7 @@ public class RoutineLoadManagerTest {
         createRoutineLoadStmt.setOrigStmt(new OriginStatement("dummy", 0));
 
         KafkaRoutineLoadJob kafkaRoutineLoadJob = new KafkaRoutineLoadJob(1L, jobName, "default_cluster", 1L, 1L,
-                serverAddress, topicName);
+                serverAddress, topicName, UserIdentity.ADMIN);
 
         new MockUp<KafkaRoutineLoadJob>() {
             @Mock
@@ -197,7 +200,7 @@ public class RoutineLoadManagerTest {
         String topicName = "topic1";
         String serverAddress = "http://127.0.0.1:8080";
         KafkaRoutineLoadJob kafkaRoutineLoadJob = new KafkaRoutineLoadJob(1L, jobName, "default_cluster", 1L, 1L,
-                serverAddress, topicName);
+                serverAddress, topicName, UserIdentity.ADMIN);
 
         RoutineLoadManager routineLoadManager = new RoutineLoadManager();
 
@@ -205,7 +208,7 @@ public class RoutineLoadManagerTest {
         Map<String, List<RoutineLoadJob>> nameToRoutineLoadJob = Maps.newConcurrentMap();
         List<RoutineLoadJob> routineLoadJobList = Lists.newArrayList();
         KafkaRoutineLoadJob kafkaRoutineLoadJobWithSameName = new KafkaRoutineLoadJob(1L, jobName, "default_cluster",
-                1L, 1L, serverAddress, topicName);
+                1L, 1L, serverAddress, topicName, UserIdentity.ADMIN);
         routineLoadJobList.add(kafkaRoutineLoadJobWithSameName);
         nameToRoutineLoadJob.put(jobName, routineLoadJobList);
         dbToNameToRoutineLoadJob.put(1L, nameToRoutineLoadJob);
@@ -227,7 +230,7 @@ public class RoutineLoadManagerTest {
         String topicName = "topic1";
         String serverAddress = "http://127.0.0.1:8080";
         KafkaRoutineLoadJob kafkaRoutineLoadJob = new KafkaRoutineLoadJob(1L, jobName, "default_cluster", 1L, 1L,
-                serverAddress, topicName);
+                serverAddress, topicName, UserIdentity.ADMIN);
 
         RoutineLoadManager routineLoadManager = new RoutineLoadManager();
 
@@ -243,7 +246,7 @@ public class RoutineLoadManagerTest {
         Map<String, List<RoutineLoadJob>> nameToRoutineLoadJob = Maps.newConcurrentMap();
         List<RoutineLoadJob> routineLoadJobList = Lists.newArrayList();
         KafkaRoutineLoadJob kafkaRoutineLoadJobWithSameName = new KafkaRoutineLoadJob(1L, jobName, "default_cluster",
-                1L, 1L, serverAddress, topicName);
+                1L, 1L, serverAddress, topicName, UserIdentity.ADMIN);
         Deencapsulation.setField(kafkaRoutineLoadJobWithSameName, "state", RoutineLoadJob.JobState.STOPPED);
         routineLoadJobList.add(kafkaRoutineLoadJobWithSameName);
         nameToRoutineLoadJob.put(jobName, routineLoadJobList);
@@ -737,23 +740,26 @@ public class RoutineLoadManagerTest {
 
     @Test
     public void testCheckBeToTask(@Mocked Catalog catalog,
-                                  @Mocked SystemInfoService systemInfoService) throws LoadException {
+                                  @Mocked SystemInfoService systemInfoService) throws LoadException, DdlException {
         List<Long> beIdsInCluster = Lists.newArrayList();
         beIdsInCluster.add(1L);
         Map<Long, Integer> beIdToMaxConcurrentTasks = Maps.newHashMap();
         beIdToMaxConcurrentTasks.put(1L, 10);
         new Expectations() {
             {
-                systemInfoService.getClusterBackendIds("default", true);
+                systemInfoService.selectBackendIdsByPolicy((BeSelectionPolicy) any, anyInt);
                 minTimes = 0;
                 result = beIdsInCluster;
             }
         };
 
         RoutineLoadManager routineLoadManager = new RoutineLoadManager();
+        KafkaRoutineLoadJob job = new KafkaRoutineLoadJob(1L, "testjob", SystemInfoService.DEFAULT_CLUSTER,
+                10000, 10001, "192.168.1.1:9090", "testtopic", UserIdentity.ADMIN);
+        routineLoadManager.addRoutineLoadJob(job, "testdb");
         Config.max_routine_load_task_num_per_be = 10;
         Deencapsulation.setField(routineLoadManager, "beIdToMaxConcurrentTasks", beIdToMaxConcurrentTasks);
-        Assert.assertEquals(1L, routineLoadManager.getAvailableBeForTask(1L, "default"));
+        Assert.assertEquals(1L, routineLoadManager.getAvailableBeForTask(1L, 1L, "default"));
     }
 
     @Test
diff --git a/fe/fe-core/src/test/java/org/apache/doris/load/routineload/RoutineLoadSchedulerTest.java b/fe/fe-core/src/test/java/org/apache/doris/load/routineload/RoutineLoadSchedulerTest.java
index ee49368f02..97afc556af 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/load/routineload/RoutineLoadSchedulerTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/load/routineload/RoutineLoadSchedulerTest.java
@@ -17,6 +17,7 @@
 
 package org.apache.doris.load.routineload;
 
+import org.apache.doris.analysis.UserIdentity;
 import org.apache.doris.catalog.Catalog;
 import org.apache.doris.catalog.Database;
 import org.apache.doris.catalog.OlapTable;
@@ -33,15 +34,17 @@ import org.apache.doris.thrift.TResourceInfo;
 
 import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
-import mockit.Expectations;
-import mockit.Injectable;
-import mockit.Mocked;
+
 import org.junit.Assert;
 import org.junit.Test;
 
 import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.ExecutorService;
+import mockit.Expectations;
+import mockit.Injectable;
+import mockit.Mocked;
+
 
 public class RoutineLoadSchedulerTest {
 
@@ -73,7 +76,7 @@ public class RoutineLoadSchedulerTest {
         Deencapsulation.setField(catalog, "routineLoadTaskScheduler", routineLoadTaskScheduler);
 
         KafkaRoutineLoadJob kafkaRoutineLoadJob = new KafkaRoutineLoadJob(1L, "test", clusterName, 1L, 1L,
-                "xxx", "test");
+                "xxx", "test", UserIdentity.ADMIN);
         Deencapsulation.setField(kafkaRoutineLoadJob,"state", RoutineLoadJob.JobState.NEED_SCHEDULE);
         List<RoutineLoadJob> routineLoadJobList = new ArrayList<>();
         routineLoadJobList.add(kafkaRoutineLoadJob);
@@ -136,7 +139,7 @@ public class RoutineLoadSchedulerTest {
         };
 
         KafkaRoutineLoadJob kafkaRoutineLoadJob = new KafkaRoutineLoadJob(1L, "test", "default_cluster", 1L, 1L,
-                "10.74.167.16:8092", "test");
+                "10.74.167.16:8092", "test", UserIdentity.ADMIN);
         RoutineLoadManager routineLoadManager = new RoutineLoadManager();
         routineLoadManager.addRoutineLoadJob(kafkaRoutineLoadJob, "db");
 
@@ -167,7 +170,7 @@ public class RoutineLoadSchedulerTest {
         executorService.submit(routineLoadTaskScheduler);
 
         KafkaRoutineLoadJob kafkaRoutineLoadJob1 = new KafkaRoutineLoadJob(1L, "test_custom_partition",
-                "default_cluster", 1L, 1L, "xxx", "test_1");
+                "default_cluster", 1L, 1L, "xxx", "test_1", UserIdentity.ADMIN);
         List<Integer> customKafkaPartitions = new ArrayList<>();
         customKafkaPartitions.add(2);
         Deencapsulation.setField(kafkaRoutineLoadJob1, "customKafkaPartitions", customKafkaPartitions);
diff --git a/fe/fe-core/src/test/java/org/apache/doris/transaction/GlobalTransactionMgrTest.java b/fe/fe-core/src/test/java/org/apache/doris/transaction/GlobalTransactionMgrTest.java
index 39749d0965..06acd52dd6 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/transaction/GlobalTransactionMgrTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/transaction/GlobalTransactionMgrTest.java
@@ -17,6 +17,7 @@
 
 package org.apache.doris.transaction;
 
+import org.apache.doris.analysis.UserIdentity;
 import org.apache.doris.catalog.Catalog;
 import org.apache.doris.catalog.CatalogTestUtil;
 import org.apache.doris.catalog.FakeCatalog;
@@ -55,8 +56,7 @@ import org.apache.doris.transaction.TransactionState.TxnSourceType;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
-import mockit.Injectable;
-import mockit.Mocked;
+
 import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.junit.Assert;
 import org.junit.Before;
@@ -67,6 +67,9 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
+import mockit.Injectable;
+import mockit.Mocked;
+
 
 public class GlobalTransactionMgrTest {
 
@@ -313,7 +316,7 @@ public class GlobalTransactionMgrTest {
         transTablets.add(tabletCommitInfo3);
 
         KafkaRoutineLoadJob routineLoadJob = new KafkaRoutineLoadJob(1L, "test", "default_cluster", 1L, 1L, "host:port",
-                "topic");
+                "topic", UserIdentity.ADMIN);
         List<RoutineLoadTaskInfo> routineLoadTaskInfoList = Deencapsulation.getField(routineLoadJob, "routineLoadTaskInfoList");
         Map<Integer, Long> partitionIdToOffset = Maps.newHashMap();
         partitionIdToOffset.put(1, 0L);
@@ -382,7 +385,9 @@ public class GlobalTransactionMgrTest {
         transTablets.add(tabletCommitInfo2);
         transTablets.add(tabletCommitInfo3);
 
-        KafkaRoutineLoadJob routineLoadJob = new KafkaRoutineLoadJob(1L, "test", "default_cluster", 1L, 1L, "host:port", "topic");
+        KafkaRoutineLoadJob routineLoadJob =
+                new KafkaRoutineLoadJob(1L, "test", "default_cluster", 1L, 1L, "host:port", "topic",
+                        UserIdentity.ADMIN);
         List<RoutineLoadTaskInfo> routineLoadTaskInfoList = Deencapsulation.getField(routineLoadJob, "routineLoadTaskInfoList");
         Map<Integer, Long> partitionIdToOffset = Maps.newHashMap();
         partitionIdToOffset.put(1, 0L);


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org