You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by mo...@apache.org on 2020/11/18 13:52:11 UTC

[incubator-doris] branch master updated: [New Feature]Support udf when loading data (#4863)

This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git


The following commit(s) were added to refs/heads/master by this push:
     new ec9da30  [New Feature]Support udf  when loading data (#4863)
ec9da30 is described below

commit ec9da30c9cad4d552907b83e0b80029918939468
Author: xinghuayu007 <14...@qq.com>
AuthorDate: Wed Nov 18 21:51:59 2020 +0800

    [New Feature]Support udf  when loading data (#4863)
    
    Many times, our users want to use UDFs they developed to ETL the data
    when loading the data into Doris.
    But currently, broker load does not support using UDFs.
    Since a UDF belongs to a database, the load job needs to check whether the user has the SELECT privilege on that database.
    This patch tries to solve this problem.
---
 .../org/apache/doris/analysis/StatementBase.java   | 10 ++++
 .../org/apache/doris/common/FeMetaVersion.java     |  4 +-
 .../apache/doris/load/loadv2/BrokerLoadJob.java    |  9 ++--
 .../org/apache/doris/load/loadv2/BulkLoadJob.java  | 23 +++++++--
 .../apache/doris/load/loadv2/LoadLoadingTask.java  |  6 ++-
 .../doris/load/loadv2/LoadingTaskPlanner.java      | 22 +++++----
 .../org/apache/doris/load/loadv2/SparkLoadJob.java |  6 ++-
 .../java/org/apache/doris/qe/ConnectProcessor.java |  1 +
 .../java/org/apache/doris/qe/StmtExecutor.java     |  1 +
 .../doris/load/loadv2/BrokerLoadJobTest.java       | 56 +++++++++++++++++++++-
 .../apache/doris/load/loadv2/SparkLoadJobTest.java |  7 +--
 .../org/apache/doris/qe/ConnectProcessorTest.java  | 19 ++++++++
 .../java/org/apache/doris/qe/StmtExecutorTest.java | 11 +++++
 13 files changed, 149 insertions(+), 26 deletions(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/StatementBase.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/StatementBase.java
index 0ea0aa9..4fb5f64 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/StatementBase.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/StatementBase.java
@@ -51,6 +51,8 @@ public abstract class StatementBase implements ParseNode {
 
     private OriginStatement origStmt;
 
+    private UserIdentity userInfo;
+
     protected StatementBase() { }
 
     /**
@@ -170,6 +172,14 @@ public abstract class StatementBase implements ParseNode {
         return origStmt;
     }
 
+    public UserIdentity getUserInfo() {
+        return userInfo;
+    }
+
+    public void setUserInfo(UserIdentity userInfo) {
+        this.userInfo = userInfo;
+    }
+
     /**
      * Resets the internal analysis state of this node.
      * For easier maintenance, class members that need to be reset are grouped into
diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/FeMetaVersion.java b/fe/fe-core/src/main/java/org/apache/doris/common/FeMetaVersion.java
index 0eaf61e..e3ea090 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/FeMetaVersion.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/FeMetaVersion.java
@@ -196,6 +196,8 @@ public final class FeMetaVersion {
     public static final int VERSION_91 = 91;
     // for mysql external table support resource
     public static final int VERSION_92 = 92;
+    //jira: 4863 for load job support udf
+    public static final int VERSION_93 = 93;
     // note: when increment meta version, should assign the latest version to VERSION_CURRENT
-    public static final int VERSION_CURRENT = VERSION_92;
+    public static final int VERSION_CURRENT = VERSION_93;
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java
index 719a4fe..cba085f 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java
@@ -18,6 +18,7 @@
 package org.apache.doris.load.loadv2;
 
 import org.apache.doris.analysis.BrokerDesc;
+import org.apache.doris.analysis.UserIdentity;
 import org.apache.doris.catalog.Catalog;
 import org.apache.doris.catalog.Database;
 import org.apache.doris.catalog.OlapTable;
@@ -70,9 +71,10 @@ public class BrokerLoadJob extends BulkLoadJob {
         this.jobType = EtlJobType.BROKER;
     }
 
-    public BrokerLoadJob(long dbId, String label, BrokerDesc brokerDesc, OriginStatement originStmt)
+    public BrokerLoadJob(long dbId, String label, BrokerDesc brokerDesc,
+                         OriginStatement originStmt, UserIdentity userInfo)
             throws MetaNotFoundException {
-        super(dbId, label, originStmt);
+        super(dbId, label, originStmt, userInfo);
         this.timeoutSecond = Config.broker_load_default_timeout_second;
         this.brokerDesc = brokerDesc;
         this.jobType = EtlJobType.BROKER;
@@ -194,7 +196,8 @@ public class BrokerLoadJob extends BulkLoadJob {
                         strictMode, transactionId, this, timezone, timeoutSecond);
                 UUID uuid = UUID.randomUUID();
                 TUniqueId loadId = new TUniqueId(uuid.getMostSignificantBits(), uuid.getLeastSignificantBits());
-                task.init(loadId, attachment.getFileStatusByTable(aggKey), attachment.getFileNumByTable(aggKey));
+                task.init(loadId, attachment.getFileStatusByTable(aggKey),
+                        attachment.getFileNumByTable(aggKey), getUserInfo());
                 idToTasks.put(task.getSignature(), task);
                 // idToTasks contains previous LoadPendingTasks, so idToTasks is just used to save all tasks.
                 // use newLoadingTasks to save new created loading tasks and submit them later.
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BulkLoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BulkLoadJob.java
index c3a6ba5..8ad3c53 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BulkLoadJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BulkLoadJob.java
@@ -22,6 +22,7 @@ import org.apache.doris.analysis.DataDescription;
 import org.apache.doris.analysis.LoadStmt;
 import org.apache.doris.analysis.SqlParser;
 import org.apache.doris.analysis.SqlScanner;
+import org.apache.doris.analysis.UserIdentity;
 import org.apache.doris.catalog.AuthorizationInfo;
 import org.apache.doris.catalog.Catalog;
 import org.apache.doris.catalog.Database;
@@ -74,6 +75,8 @@ public abstract class BulkLoadJob extends LoadJob {
     // the expr of columns will be reanalyze when the log is replayed
     private OriginStatement originStmt;
 
+    private UserIdentity userInfo;
+
     // include broker desc and data desc
     protected BrokerFileGroupAggInfo fileGroupAggInfo = new BrokerFileGroupAggInfo();
     protected List<TabletCommitInfo> commitInfos = Lists.newArrayList();
@@ -87,10 +90,11 @@ public abstract class BulkLoadJob extends LoadJob {
         super();
     }
 
-    public BulkLoadJob(long dbId, String label, OriginStatement originStmt) throws MetaNotFoundException {
+    public BulkLoadJob(long dbId, String label, OriginStatement originStmt, UserIdentity userInfo) throws MetaNotFoundException {
         super(dbId, label);
         this.originStmt = originStmt;
         this.authorizationInfo = gatherAuthInfo();
+        this.userInfo = userInfo;
 
         if (ConnectContext.get() != null) {
             SessionVariable var = ConnectContext.get().getSessionVariable();
@@ -114,11 +118,11 @@ public abstract class BulkLoadJob extends LoadJob {
             switch (stmt.getEtlJobType()) {
                 case BROKER:
                     bulkLoadJob = new BrokerLoadJob(db.getId(), stmt.getLabel().getLabelName(),
-                                                    stmt.getBrokerDesc(), stmt.getOrigStmt());
+                                                    stmt.getBrokerDesc(), stmt.getOrigStmt(), stmt.getUserInfo());
                     break;
                 case SPARK:
                     bulkLoadJob = new SparkLoadJob(db.getId(), stmt.getLabel().getLabelName(),
-                                                   stmt.getResourceDesc(), stmt.getOrigStmt());
+                                                   stmt.getResourceDesc(), stmt.getOrigStmt(), stmt.getUserInfo());
                     break;
                 case MINI:
                 case DELETE:
@@ -304,6 +308,7 @@ public abstract class BulkLoadJob extends LoadJob {
         super.write(out);
         brokerDesc.write(out);
         originStmt.write(out);
+        userInfo.write(out);
 
         out.writeInt(sessionVariables.size());
         for (Map.Entry<String, String> entry : sessionVariables.entrySet()) {
@@ -333,6 +338,11 @@ public abstract class BulkLoadJob extends LoadJob {
         // The reason is that it will thrown MetaNotFoundException when the tableId could not be found by tableName.
         // The origin stmt will be analyzed after the replay is completed.
 
+        if (Catalog.getCurrentCatalogJournalVersion() >= FeMetaVersion.VERSION_93) {
+            userInfo = UserIdentity.read(in);
+        } else {
+            userInfo = new UserIdentity("","");
+        }
         if (Catalog.getCurrentCatalogJournalVersion() >= FeMetaVersion.VERSION_66) {
             int size = in.readInt();
             for (int i = 0; i < size; i++) {
@@ -346,4 +356,11 @@ public abstract class BulkLoadJob extends LoadJob {
         }
     }
 
+    public UserIdentity getUserInfo() {
+        return userInfo;
+    }
+
+    public void setUserInfo(UserIdentity userInfo) {
+        this.userInfo = userInfo;
+    }
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadLoadingTask.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadLoadingTask.java
index 85c939f..33f0049 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadLoadingTask.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadLoadingTask.java
@@ -18,6 +18,7 @@
 package org.apache.doris.load.loadv2;
 
 import org.apache.doris.analysis.BrokerDesc;
+import org.apache.doris.analysis.UserIdentity;
 import org.apache.doris.catalog.Database;
 import org.apache.doris.catalog.OlapTable;
 import org.apache.doris.common.LoadException;
@@ -83,9 +84,10 @@ public class LoadLoadingTask extends LoadTask {
         this.timeoutS = timeoutS;
     }
 
-    public void init(TUniqueId loadId, List<List<TBrokerFileStatus>> fileStatusList, int fileNum) throws UserException {
+    public void init(TUniqueId loadId, List<List<TBrokerFileStatus>> fileStatusList, int fileNum, UserIdentity userInfo) throws UserException {
         this.loadId = loadId;
-        planner = new LoadingTaskPlanner(callback.getCallbackId(), txnId, db.getId(), table, brokerDesc, fileGroups, strictMode, timezone, this.timeoutS);
+        planner = new LoadingTaskPlanner(callback.getCallbackId(), txnId, db.getId(), table,
+                brokerDesc, fileGroups, strictMode, timezone, this.timeoutS, userInfo);
         planner.plan(loadId, fileStatusList, fileNum);
     }
 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadingTaskPlanner.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadingTaskPlanner.java
index ab46d8c..c728701 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadingTaskPlanner.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadingTaskPlanner.java
@@ -22,6 +22,7 @@ import org.apache.doris.analysis.BrokerDesc;
 import org.apache.doris.analysis.DescriptorTable;
 import org.apache.doris.analysis.SlotDescriptor;
 import org.apache.doris.analysis.TupleDescriptor;
+import org.apache.doris.analysis.UserIdentity;
 import org.apache.doris.catalog.Catalog;
 import org.apache.doris.catalog.Column;
 import org.apache.doris.catalog.OlapTable;
@@ -32,6 +33,7 @@ import org.apache.doris.common.NotImplementedException;
 import org.apache.doris.common.UserException;
 import org.apache.doris.common.util.DebugUtil;
 import org.apache.doris.load.BrokerFileGroup;
+import org.apache.doris.mysql.privilege.PrivPredicate;
 import org.apache.doris.planner.BrokerScanNode;
 import org.apache.doris.planner.DataPartition;
 import org.apache.doris.planner.OlapTableSink;
@@ -65,7 +67,7 @@ public class LoadingTaskPlanner {
     private final List<BrokerFileGroup> fileGroups;
     private final boolean strictMode;
     private final long timeoutS;    // timeout of load job, in second
-
+    private UserIdentity userInfo;
     // Something useful
     // ConnectContext here is just a dummy object to avoid some NPE problem, like ctx.getDatabase()
     private Analyzer analyzer = new Analyzer(Catalog.getCurrentCatalog(), new ConnectContext());
@@ -79,7 +81,8 @@ public class LoadingTaskPlanner {
 
     public LoadingTaskPlanner(Long loadJobId, long txnId, long dbId, OlapTable table,
                               BrokerDesc brokerDesc, List<BrokerFileGroup> brokerFileGroups,
-                              boolean strictMode, String timezone, long timeoutS) {
+                              boolean strictMode, String timezone, long timeoutS,
+                              UserIdentity userInfo) {
         this.loadJobId = loadJobId;
         this.txnId = txnId;
         this.dbId = dbId;
@@ -89,14 +92,13 @@ public class LoadingTaskPlanner {
         this.strictMode = strictMode;
         this.analyzer.setTimezone(timezone);
         this.timeoutS = timeoutS;
-
-        /*
-         * TODO(cmy): UDF currently belongs to a database. Therefore, before using UDF,
-         * we need to check whether the user has corresponding permissions on this database.
-         * But here we have lost user information and therefore cannot check permissions.
-         * So here we first prohibit users from using UDF in load. If necessary, improve it later.
-         */
-        this.analyzer.setUDFAllowed(false);
+        this.userInfo = userInfo;
+        if (Catalog.getCurrentCatalog().getAuth().checkDbPriv(userInfo,
+                Catalog.getCurrentCatalog().getDb(dbId).getFullName(), PrivPredicate.SELECT)) {
+            this.analyzer.setUDFAllowed(true);
+        } else {
+            this.analyzer.setUDFAllowed(false);
+        }
     }
 
     public void plan(TUniqueId loadId, List<List<TBrokerFileStatus>> fileStatusesList, int filesAdded)
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLoadJob.java
index 6ee5d87..41da412 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLoadJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLoadJob.java
@@ -25,6 +25,7 @@ import org.apache.doris.analysis.ResourceDesc;
 import org.apache.doris.analysis.SlotDescriptor;
 import org.apache.doris.analysis.SlotRef;
 import org.apache.doris.analysis.TupleDescriptor;
+import org.apache.doris.analysis.UserIdentity;
 import org.apache.doris.catalog.Catalog;
 import org.apache.doris.catalog.Column;
 import org.apache.doris.catalog.Database;
@@ -146,9 +147,10 @@ public class SparkLoadJob extends BulkLoadJob {
         jobType = EtlJobType.SPARK;
     }
 
-    public SparkLoadJob(long dbId, String label, ResourceDesc resourceDesc, OriginStatement originStmt)
+    public SparkLoadJob(long dbId, String label, ResourceDesc resourceDesc,
+                        OriginStatement originStmt, UserIdentity userInfo)
             throws MetaNotFoundException {
-        super(dbId, label, originStmt);
+        super(dbId, label, originStmt, userInfo);
         this.resourceDesc = resourceDesc;
         timeoutSecond = Config.spark_load_default_timeout_second;
         jobType = EtlJobType.SPARK;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectProcessor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectProcessor.java
index b7a9059..e2908c9 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectProcessor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectProcessor.java
@@ -186,6 +186,7 @@ public class ConnectProcessor {
                 }
                 parsedStmt = stmts.get(i);
                 parsedStmt.setOrigStmt(new OriginStatement(originStmt, i));
+                parsedStmt.setUserInfo(ctx.getCurrentUserIdentity());
                 executor = new StmtExecutor(ctx, parsedStmt);
                 ctx.setExecutor(executor);
                 executor.execute();
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
index a2db0bf..0b2338f 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
@@ -406,6 +406,7 @@ public class StmtExecutor {
             try {
                 parsedStmt = SqlParserUtils.getStmt(parser, originStmt.idx);
                 parsedStmt.setOrigStmt(originStmt);
+                parsedStmt.setUserInfo(context.getCurrentUserIdentity());
             } catch (Error e) {
                 LOG.info("error happened when parsing stmt {}, id: {}", originStmt, context.getStmtId(), e);
                 throw new AnalysisException("sql parsing error, please check your sql");
diff --git a/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/BrokerLoadJobTest.java b/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/BrokerLoadJobTest.java
index 4b0286c..10a5732 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/BrokerLoadJobTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/BrokerLoadJobTest.java
@@ -19,14 +19,17 @@ package org.apache.doris.load.loadv2;
 
 import org.apache.doris.analysis.BrokerDesc;
 import org.apache.doris.analysis.DataDescription;
-import org.apache.doris.analysis.ImportColumnDesc;
 import org.apache.doris.analysis.LabelName;
 import org.apache.doris.analysis.LoadStmt;
-import org.apache.doris.analysis.SlotRef;
+import org.apache.doris.analysis.Analyzer;
+import org.apache.doris.analysis.UserIdentity;
 import org.apache.doris.catalog.Catalog;
 import org.apache.doris.catalog.Database;
 import org.apache.doris.catalog.OlapTable;
 import org.apache.doris.catalog.Table;
+import org.apache.doris.catalog.Column;
+import org.apache.doris.catalog.PrimitiveType;
+import org.apache.doris.catalog.BrokerTable;
 import org.apache.doris.common.DdlException;
 import org.apache.doris.common.MetaNotFoundException;
 import org.apache.doris.common.jmockit.Deencapsulation;
@@ -38,6 +41,9 @@ import org.apache.doris.load.EtlStatus;
 import org.apache.doris.load.Load;
 import org.apache.doris.load.Source;
 import org.apache.doris.metric.MetricRepo;
+import org.apache.doris.planner.BrokerScanNode;
+import org.apache.doris.planner.OlapTableSink;
+import org.apache.doris.planner.PlanFragment;
 import org.apache.doris.task.MasterTaskExecutor;
 import org.apache.doris.transaction.TransactionState;
 
@@ -52,12 +58,16 @@ import org.junit.Test;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.UUID;
 
 import mockit.Expectations;
 import mockit.Injectable;
 import mockit.Mock;
 import mockit.MockUp;
 import mockit.Mocked;
+import org.apache.doris.thrift.TUniqueId;
 
 public class BrokerLoadJobTest {
 
@@ -324,6 +334,48 @@ public class BrokerLoadJobTest {
     }
 
     @Test
+    public void testPendingTaskOnFinishedWithUserInfo(@Mocked BrokerPendingTaskAttachment attachment,
+                                          @Mocked Catalog catalog,
+                                          @Injectable BrokerDesc brokerDesc,
+                                          @Injectable LoadTaskCallback callback,
+                                          @Injectable Database database,
+                                          @Injectable FileGroupAggKey aggKey,
+                                          @Mocked OlapTable olapTable,
+                                          @Mocked PlanFragment sinkFragment,
+                                          @Mocked OlapTableSink olapTableSink,
+                                          @Mocked BrokerScanNode scanNode) throws Exception{
+        List<Column> schema = new ArrayList<>();
+        schema.add(new Column("a", PrimitiveType.BIGINT));
+        Map<String, String> properties = new HashMap<>();
+        properties.put("broker_name", "test");
+        properties.put("path", "hdfs://www.test.com");
+        BrokerTable brokerTable = new BrokerTable(123L, "test", schema, properties);
+        BrokerFileGroup brokerFileGroup = new BrokerFileGroup(brokerTable);
+        List<Long> partitionIds = new ArrayList<>();
+        partitionIds.add(123L);
+        Deencapsulation.setField(brokerFileGroup, "partitionIds", partitionIds);
+        List<BrokerFileGroup> fileGroups = Lists.newArrayList();
+        fileGroups.add(brokerFileGroup);
+        UUID uuid = UUID.randomUUID();
+        TUniqueId loadId = new TUniqueId(uuid.getMostSignificantBits(), uuid.getLeastSignificantBits());
+        LoadLoadingTask task = new LoadLoadingTask(database, olapTable,brokerDesc, fileGroups,
+                100, 100,false, 100, callback, "", 100);
+        try {
+            UserIdentity userInfo = new UserIdentity("root", "localhost");
+            userInfo.setIsAnalyzed();
+            task.init(loadId,
+                    attachment.getFileStatusByTable(aggKey),
+                    attachment.getFileNumByTable(aggKey),
+                    userInfo);
+            LoadingTaskPlanner planner = Deencapsulation.getField(task, "planner");
+            Analyzer al = Deencapsulation.getField(planner, "analyzer");
+            Assert.assertFalse(al.isUDFAllowed());
+        } catch (Exception e) {
+            e.printStackTrace();
+        }
+    }
+
+    @Test
     public void testLoadingTaskOnFinishedWithUnfinishedTask(@Injectable BrokerLoadingTaskAttachment attachment,
                                                             @Injectable LoadTask loadTask1,
                                                             @Injectable LoadTask loadTask2) {
diff --git a/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/SparkLoadJobTest.java b/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/SparkLoadJobTest.java
index c3c64d1..6cbd24d 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/SparkLoadJobTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/SparkLoadJobTest.java
@@ -26,6 +26,7 @@ import org.apache.doris.analysis.DataDescription;
 import org.apache.doris.analysis.LabelName;
 import org.apache.doris.analysis.LoadStmt;
 import org.apache.doris.analysis.ResourceDesc;
+import org.apache.doris.analysis.UserIdentity;
 import org.apache.doris.catalog.Catalog;
 import org.apache.doris.catalog.Column;
 import org.apache.doris.catalog.Database;
@@ -218,7 +219,7 @@ public class SparkLoadJobTest {
         };
 
         ResourceDesc resourceDesc = new ResourceDesc(resourceName, Maps.newHashMap());
-        SparkLoadJob job = new SparkLoadJob(dbId, label, resourceDesc, new OriginStatement(originStmt, 0));
+        SparkLoadJob job = new SparkLoadJob(dbId, label, resourceDesc, new OriginStatement(originStmt, 0), new UserIdentity("root", "0.0.0.0"));
         job.execute();
 
         // check transaction id and id to tasks
@@ -229,7 +230,7 @@ public class SparkLoadJobTest {
     @Test
     public void testOnPendingTaskFinished(@Mocked Catalog catalog, @Injectable String originStmt) throws MetaNotFoundException {
         ResourceDesc resourceDesc = new ResourceDesc(resourceName, Maps.newHashMap());
-        SparkLoadJob job = new SparkLoadJob(dbId, label, resourceDesc, new OriginStatement(originStmt, 0));
+        SparkLoadJob job = new SparkLoadJob(dbId, label, resourceDesc, new OriginStatement(originStmt, 0), new UserIdentity("root", "0.0.0.0"));
         SparkPendingTaskAttachment attachment = new SparkPendingTaskAttachment(pendingTaskId);
         attachment.setAppId(appId);
         attachment.setOutputPath(etlOutputPath);
@@ -248,7 +249,7 @@ public class SparkLoadJobTest {
         sparkConfigs.put("spark.master", "yarn");
         sparkConfigs.put("spark.submit.deployMode", "cluster");
         sparkConfigs.put("spark.hadoop.yarn.resourcemanager.address", "127.0.0.1:9999");
-        SparkLoadJob job = new SparkLoadJob(dbId, label, null, new OriginStatement(originStmt, 0));
+        SparkLoadJob job = new SparkLoadJob(dbId, label, null, new OriginStatement(originStmt, 0), new UserIdentity("root", "0.0.0.0"));
         job.state = JobState.ETL;
         job.maxFilterRatio = 0.15;
         job.transactionId = transactionId;
diff --git a/fe/fe-core/src/test/java/org/apache/doris/qe/ConnectProcessorTest.java b/fe/fe-core/src/test/java/org/apache/doris/qe/ConnectProcessorTest.java
index 678751b..11bf6f2 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/qe/ConnectProcessorTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/qe/ConnectProcessorTest.java
@@ -313,6 +313,25 @@ public class ConnectProcessorTest {
     }
 
     @Test
+    public void testQueryWithUserInfo(@Mocked StmtExecutor executor) throws Exception {
+        ConnectContext ctx = initMockContext(mockChannel(queryPacket), AccessTestUtil.fetchAdminCatalog());
+
+        ConnectProcessor processor = new ConnectProcessor(ctx);
+
+        // Mock statement executor
+        new Expectations() {
+            {
+                executor.getQueryStatisticsForAuditLog();
+                minTimes = 0;
+                result = statistics;
+            }
+        };
+        processor.processOnce();
+        StmtExecutor er = Deencapsulation.getField(processor, "executor");
+        Assert.assertTrue(er.getParsedStmt().getUserInfo() != null);
+    }
+
+    @Test
     public void testQueryFail(@Mocked StmtExecutor executor) throws Exception {
         ConnectContext ctx = initMockContext(mockChannel(queryPacket), AccessTestUtil.fetchAdminCatalog());
 
diff --git a/fe/fe-core/src/test/java/org/apache/doris/qe/StmtExecutorTest.java b/fe/fe-core/src/test/java/org/apache/doris/qe/StmtExecutorTest.java
index 26191d7..4c09fdb 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/qe/StmtExecutorTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/qe/StmtExecutorTest.java
@@ -28,6 +28,7 @@ import org.apache.doris.analysis.SetStmt;
 import org.apache.doris.analysis.ShowAuthorStmt;
 import org.apache.doris.analysis.ShowStmt;
 import org.apache.doris.analysis.SqlParser;
+import org.apache.doris.analysis.StatementBase;
 import org.apache.doris.analysis.UseStmt;
 import org.apache.doris.catalog.Catalog;
 import org.apache.doris.common.DdlException;
@@ -528,6 +529,16 @@ public class StmtExecutorTest {
     }
 
     @Test
+    public void testStmtWithUserInfo(@Mocked StatementBase stmt, @Mocked ConnectContext context) throws Exception {
+        StmtExecutor stmtExecutor = new StmtExecutor(ctx, stmt);
+        Deencapsulation.setField(stmtExecutor, "parsedStmt", null);
+        Deencapsulation.setField(stmtExecutor, "originStmt", new OriginStatement("show databases;", 1));
+        stmtExecutor.execute();
+        StatementBase newstmt = (StatementBase)Deencapsulation.getField(stmtExecutor, "parsedStmt");
+        Assert.assertTrue(newstmt.getUserInfo() != null);
+    }
+
+    @Test
     public void testSetFail(@Mocked SetStmt setStmt, @Mocked SqlParser parser, @Mocked SetExecutor executor) throws Exception {
         new Expectations() {
             {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org