You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by mo...@apache.org on 2020/12/16 15:52:27 UTC

[incubator-doris] branch master updated: [Enhance] Add profile for load job (#5052)

This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git


The following commit(s) were added to refs/heads/master by this push:
     new b640991  [Enhance] Add profile for load job (#5052)
b640991 is described below

commit b640991e43147b8659045cd0a976de047aaf4431
Author: Mingyu Chen <mo...@gmail.com>
AuthorDate: Wed Dec 16 23:52:10 2020 +0800

    [Enhance] Add profile for load job (#5052)
    
    Add viewable profile for broker load. Similar to the query profile,
    the user can set the session variable is_report_success to true before submitting the import job,
    and then view the profile of the job on the FE web page for easy analysis and debugging.
---
 .../load-data/broker-load-manual.md                |  8 +++
 .../load-data/broker-load-manual.md                |  8 +++
 .../apache/doris/common/util/RuntimeProfile.java   | 19 +++++--
 .../apache/doris/load/loadv2/BrokerLoadJob.java    | 58 ++++++++++++++++++++--
 .../apache/doris/load/loadv2/LoadLoadingTask.java  | 25 +++++++++-
 .../doris/load/loadv2/BrokerLoadJobTest.java       | 35 +++++++------
 6 files changed, 127 insertions(+), 26 deletions(-)

diff --git a/docs/en/administrator-guide/load-data/broker-load-manual.md b/docs/en/administrator-guide/load-data/broker-load-manual.md
index 6275576..f6dd7e1 100644
--- a/docs/en/administrator-guide/load-data/broker-load-manual.md
+++ b/docs/en/administrator-guide/load-data/broker-load-manual.md
@@ -456,6 +456,14 @@ We will only discuss the case of a single BE. If the user cluster has more than
 		
 		```
 
+### Performance analysis
+
+You can execute `set is_report_success=true` to enable the load job profile before submitting the import job. After the import job is completed, you can view its profile in the `Queries` tab of the FE web page.
+
+This profile can help analyze the running status of the import job.
+
+Currently, the profile can only be viewed after the job has completed successfully.
+
 ### Complete examples
 
 Data situation: User data in HDFS, file address is hdfs://abc.com:8888/store_sales, HDFS authentication user name is root, password is password, data size is about 30G, hope to import into database bj_sales table store_sales.
diff --git a/docs/zh-CN/administrator-guide/load-data/broker-load-manual.md b/docs/zh-CN/administrator-guide/load-data/broker-load-manual.md
index ff6b0f0..33969fd 100644
--- a/docs/zh-CN/administrator-guide/load-data/broker-load-manual.md
+++ b/docs/zh-CN/administrator-guide/load-data/broker-load-manual.md
@@ -464,6 +464,14 @@ LoadFinishTime: 2019-07-27 11:50:16
         注意:一般用户的环境可能达不到 10M/s 的速度,所以建议超过 500G 的文件都进行文件切分,再导入。
         
         ```
+
+### 性能分析
+
+可以在提交 LOAD 作业前,先执行 `set is_report_success=true` 打开会话变量。然后提交导入作业。待导入作业完成后,可以在 FE 的 web 页面的 `Queries` 标签中查看到导入作业的 Profile。
+
+这个 Profile 可以帮助分析导入作业的运行状态。
+
+当前只有作业成功执行后,才能查看 Profile。
         
 ### 完整例子
 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/util/RuntimeProfile.java b/fe/fe-core/src/main/java/org/apache/doris/common/util/RuntimeProfile.java
index 8d10831..0eb78d5 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/util/RuntimeProfile.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/util/RuntimeProfile.java
@@ -24,16 +24,17 @@ import org.apache.doris.thrift.TRuntimeProfileNode;
 import org.apache.doris.thrift.TRuntimeProfileTree;
 import org.apache.doris.thrift.TUnit;
 
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 
-import org.apache.logging.log4j.LogManager;
-import org.apache.logging.log4j.Logger;
-
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.Formatter;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -58,7 +59,7 @@ public class RuntimeProfile {
     private Map<String, RuntimeProfile> childMap = Maps.newConcurrentMap();
 
     private Map<String, TreeSet<String>> childCounterMap = Maps.newHashMap();
-    private List<Pair<RuntimeProfile, Boolean>> childList = Lists.newArrayList();
+    private LinkedList<Pair<RuntimeProfile, Boolean>> childList = Lists.newLinkedList();
 
     private String name;
     
@@ -318,6 +319,16 @@ public class RuntimeProfile {
         this.childList.add(pair);
     }
 
+    public void addFirstChild(RuntimeProfile child) { // same as addChild(), but inserts at the head of childList
+        if (child == null) { // null children are silently ignored
+            return;
+        }
+
+        this.childMap.put(child.name, child);
+        Pair<RuntimeProfile, Boolean> pair = Pair.create(child, true); // NOTE(review): Boolean flag semantics defined elsewhere; confirm it matches addChild()
+        this.childList.addFirst(pair); // relies on childList being declared as a LinkedList
+    }
+
     // Because the profile of summary and child fragment is not a real parent-child relationship
     // Each child profile needs to calculate the time proportion consumed by itself
     public void computeTimeInChildProfile() {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java
index cba085f..aeae05b 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java
@@ -29,13 +29,18 @@ import org.apache.doris.common.DuplicatedRequestException;
 import org.apache.doris.common.LabelAlreadyUsedException;
 import org.apache.doris.common.MetaNotFoundException;
 import org.apache.doris.common.UserException;
+import org.apache.doris.common.util.DebugUtil;
 import org.apache.doris.common.util.LogBuilder;
 import org.apache.doris.common.util.LogKey;
+import org.apache.doris.common.util.ProfileManager;
+import org.apache.doris.common.util.RuntimeProfile;
+import org.apache.doris.common.util.TimeUtils;
 import org.apache.doris.load.BrokerFileGroup;
 import org.apache.doris.load.BrokerFileGroupAggInfo.FileGroupAggKey;
 import org.apache.doris.load.EtlJobType;
 import org.apache.doris.load.FailMsg;
 import org.apache.doris.metric.MetricRepo;
+import org.apache.doris.qe.ConnectContext;
 import org.apache.doris.qe.OriginStatement;
 import org.apache.doris.service.FrontendOptions;
 import org.apache.doris.thrift.TUniqueId;
@@ -44,12 +49,12 @@ import org.apache.doris.transaction.TransactionState;
 import org.apache.doris.transaction.TransactionState.TxnCoordinator;
 import org.apache.doris.transaction.TransactionState.TxnSourceType;
 
-import com.google.common.base.Joiner;
-import com.google.common.collect.Lists;
-
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
+import com.google.common.base.Joiner;
+import com.google.common.collect.Lists;
+
 import java.util.List;
 import java.util.Map;
 import java.util.UUID;
@@ -65,7 +70,12 @@ public class BrokerLoadJob extends BulkLoadJob {
 
     private static final Logger LOG = LogManager.getLogger(BrokerLoadJob.class);
 
-    // only for log replay
+    // Profile of this load job; each loading task adds its own child profile (created in createLoadingTask()).
+    private RuntimeProfile jobProfile;
+    // If true (session variable is_report_success), the job profile will be pushed to ProfileManager.
+    private boolean isReportSuccess = false;
+
+    // for log replay and unit test
     public BrokerLoadJob() {
         super();
         this.jobType = EtlJobType.BROKER;
@@ -78,6 +88,9 @@ public class BrokerLoadJob extends BulkLoadJob {
         this.timeoutSecond = Config.broker_load_default_timeout_second;
         this.brokerDesc = brokerDesc;
         this.jobType = EtlJobType.BROKER;
+        if (ConnectContext.get() != null && ConnectContext.get().getSessionVariable().isReportSucc()) {
+            isReportSuccess = true;
+        }
     }
 
     @Override
@@ -173,6 +186,7 @@ public class BrokerLoadJob extends BulkLoadJob {
     private void createLoadingTask(Database db, BrokerPendingTaskAttachment attachment) throws UserException {
         // divide job into broker loading task by table
         List<LoadLoadingTask> newLoadingTasks = Lists.newArrayList();
+        this.jobProfile = new RuntimeProfile("BrokerLoadJob " + id + ". " + label);
         db.readLock();
         try {
             for (Map.Entry<FileGroupAggKey, List<BrokerFileGroup>> entry : fileGroupAggInfo.getAggKeyToFileGroups().entrySet()) {
@@ -193,7 +207,8 @@ public class BrokerLoadJob extends BulkLoadJob {
                 // Generate loading task and init the plan of task
                 LoadLoadingTask task = new LoadLoadingTask(db, table, brokerDesc,
                         brokerFileGroups, getDeadlineMs(), execMemLimit,
-                        strictMode, transactionId, this, timezone, timeoutSecond);
+                        strictMode, transactionId, this, timezone, timeoutSecond,
+                        isReportSuccess ? jobProfile : null);
                 UUID uuid = UUID.randomUUID();
                 TUniqueId loadId = new TUniqueId(uuid.getMostSignificantBits(), uuid.getLeastSignificantBits());
                 task.init(loadId, attachment.getFileStatusByTable(aggKey),
@@ -300,6 +315,32 @@ public class BrokerLoadJob extends BulkLoadJob {
         }
     }
 
+    private void writeProfile() { // prepend a "Summary" child to jobProfile and publish it to ProfileManager
+        if (!isReportSuccess || jobProfile == null) {
+            return;
+        }
+
+        RuntimeProfile summaryProfile = new RuntimeProfile("Summary");
+        summaryProfile.addInfoString(ProfileManager.QUERY_ID, String.valueOf(id));
+        summaryProfile.addInfoString(ProfileManager.START_TIME, TimeUtils.longToTimeString(createTimestamp));
+        summaryProfile.addInfoString(ProfileManager.END_TIME, TimeUtils.longToTimeString(finishTimestamp));
+        summaryProfile.addInfoString(ProfileManager.TOTAL_TIME, DebugUtil.getPrettyStringMs(finishTimestamp - createTimestamp));
+
+        summaryProfile.addInfoString(ProfileManager.QUERY_TYPE, "Load");
+        summaryProfile.addInfoString(ProfileManager.QUERY_STATE, "N/A");
+        summaryProfile.addInfoString(ProfileManager.USER, "N/A");
+        summaryProfile.addInfoString(ProfileManager.DEFAULT_DB, "N/A");
+        summaryProfile.addInfoString(ProfileManager.SQL_STATEMENT, "N/A");
+        summaryProfile.addInfoString(ProfileManager.IS_CACHED, "N/A");
+
+        // Put the summary in front of the per-task profiles, then compute each child's time share.
+        jobProfile.addFirstChild(summaryProfile);
+        jobProfile.computeTimeInChildProfile();
+        // jobProfile is only created in createLoadingTask(), hence the null guard above.
+        // Hand the assembled profile to ProfileManager so it can be viewed on the FE web page.
+        ProfileManager.getInstance().pushProfile(jobProfile);
+    }
+
     private void updateLoadingStatus(BrokerLoadingTaskAttachment attachment) {
         loadingStatus.replaceCounter(DPP_ABNORMAL_ALL,
                                      increaseCounter(DPP_ABNORMAL_ALL, attachment.getCounter(DPP_ABNORMAL_ALL)));
@@ -327,4 +368,11 @@ public class BrokerLoadJob extends BulkLoadJob {
         }
         return String.valueOf(value);
     }
+
+    @Override
+    public void afterVisible(TransactionState txnState, boolean txnOperated) {
+        super.afterVisible(txnState, txnOperated); // run the parent implementation before publishing the profile
+        writeProfile(); // no-op unless is_report_success was enabled when the job was created
+    }
 }
+
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadLoadingTask.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadLoadingTask.java
index 33f0049..0e288ad 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadLoadingTask.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadLoadingTask.java
@@ -27,6 +27,8 @@ import org.apache.doris.common.UserException;
 import org.apache.doris.common.util.DebugUtil;
 import org.apache.doris.common.util.LogBuilder;
 import org.apache.doris.common.util.LogKey;
+import org.apache.doris.common.util.RuntimeProfile;
+import org.apache.doris.common.util.TimeUtils;
 import org.apache.doris.load.BrokerFileGroup;
 import org.apache.doris.load.FailMsg;
 import org.apache.doris.qe.Coordinator;
@@ -64,11 +66,15 @@ public class LoadLoadingTask extends LoadTask {
 
     private LoadingTaskPlanner planner;
 
+    private RuntimeProfile jobProfile; // parent profile shared by all tasks of the job; null when profiling is off
+    private RuntimeProfile profile; // this task's own profile, attached under jobProfile by createProfile()
+    private long beginTime; // System.nanoTime() taken at the start of the latest execution attempt
+
     public LoadLoadingTask(Database db, OlapTable table,
                            BrokerDesc brokerDesc, List<BrokerFileGroup> fileGroups,
                            long jobDeadlineMs, long execMemLimit, boolean strictMode,
                            long txnId, LoadTaskCallback callback, String timezone,
-                           long timeoutS) {
+                           long timeoutS, RuntimeProfile profile) {
         super(callback, TaskType.LOADING);
         this.db = db;
         this.table = table;
@@ -82,6 +88,7 @@ public class LoadLoadingTask extends LoadTask {
         this.retryTime = 2; // 2 times is enough
         this.timezone = timezone;
         this.timeoutS = timeoutS;
+        this.jobProfile = profile;
     }
 
     public void init(TUniqueId loadId, List<List<TBrokerFileStatus>> fileStatusList, int fileNum, UserIdentity userInfo) throws UserException {
@@ -100,6 +107,7 @@ public class LoadLoadingTask extends LoadTask {
         LOG.info("begin to execute loading task. load id: {} job: {}. db: {}, tbl: {}. left retry: {}",
                 DebugUtil.printId(loadId), callback.getCallbackId(), db.getFullName(), table.getName(), retryTime);
         retryTime--;
+        beginTime = System.nanoTime();
         executeOnce();
     }
 
@@ -148,6 +156,8 @@ public class LoadLoadingTask extends LoadTask {
                                                              curCoordinator.getLoadCounters(),
                                                              curCoordinator.getTrackingUrl(),
                                                              TabletCommitInfo.fromThrift(curCoordinator.getCommitInfos()));
+                // Create profile of this task and add to the job profile.
+                createProfile(curCoordinator);
             } else {
                 throw new LoadException(status.getErrorMsg());
             }
@@ -160,6 +170,19 @@ public class LoadLoadingTask extends LoadTask {
         return jobDeadlineMs - System.currentTimeMillis();
     }
 
+    public void createProfile(Coordinator coord) { // attach this task's coordinator profile under the job profile
+        if (jobProfile == null) {
+            // No job profile was passed in (e.g. is_report_success is off), so nothing to gather
+            return;
+        }
+        // Per-task profile named after the load id; it wraps the coordinator's query profile
+        profile = new RuntimeProfile("LoadTask: " + DebugUtil.printId(loadId));
+        coord.getQueryProfile().getCounterTotalTime().setValue(TimeUtils.getEstimatedTime(beginTime)); // presumably elapsed time since beginTime
+        coord.endProfile();
+        profile.addChild(coord.getQueryProfile());
+        jobProfile.addChild(profile); // NOTE(review): jobProfile is shared by every task of the job and its childList is a LinkedList; confirm tasks never call this concurrently
+    }
+
     @Override
     public void updateRetryInfo() {
         super.updateRetryInfo();
diff --git a/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/BrokerLoadJobTest.java b/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/BrokerLoadJobTest.java
index 10a5732..c23cf95 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/BrokerLoadJobTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/BrokerLoadJobTest.java
@@ -17,22 +17,23 @@
 
 package org.apache.doris.load.loadv2;
 
+import org.apache.doris.analysis.Analyzer;
 import org.apache.doris.analysis.BrokerDesc;
 import org.apache.doris.analysis.DataDescription;
 import org.apache.doris.analysis.LabelName;
 import org.apache.doris.analysis.LoadStmt;
-import org.apache.doris.analysis.Analyzer;
 import org.apache.doris.analysis.UserIdentity;
+import org.apache.doris.catalog.BrokerTable;
 import org.apache.doris.catalog.Catalog;
+import org.apache.doris.catalog.Column;
 import org.apache.doris.catalog.Database;
 import org.apache.doris.catalog.OlapTable;
-import org.apache.doris.catalog.Table;
-import org.apache.doris.catalog.Column;
 import org.apache.doris.catalog.PrimitiveType;
-import org.apache.doris.catalog.BrokerTable;
+import org.apache.doris.catalog.Table;
 import org.apache.doris.common.DdlException;
 import org.apache.doris.common.MetaNotFoundException;
 import org.apache.doris.common.jmockit.Deencapsulation;
+import org.apache.doris.common.util.RuntimeProfile;
 import org.apache.doris.load.BrokerFileGroup;
 import org.apache.doris.load.BrokerFileGroupAggInfo;
 import org.apache.doris.load.BrokerFileGroupAggInfo.FileGroupAggKey;
@@ -45,29 +46,29 @@ import org.apache.doris.planner.BrokerScanNode;
 import org.apache.doris.planner.OlapTableSink;
 import org.apache.doris.planner.PlanFragment;
 import org.apache.doris.task.MasterTaskExecutor;
+import org.apache.doris.thrift.TUniqueId;
 import org.apache.doris.transaction.TransactionState;
 
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
-
 import org.junit.Assert;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.UUID;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
 
 import mockit.Expectations;
 import mockit.Injectable;
 import mockit.Mock;
 import mockit.MockUp;
 import mockit.Mocked;
-import org.apache.doris.thrift.TUniqueId;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
 
 public class BrokerLoadJobTest {
 
@@ -358,8 +359,10 @@ public class BrokerLoadJobTest {
         fileGroups.add(brokerFileGroup);
         UUID uuid = UUID.randomUUID();
         TUniqueId loadId = new TUniqueId(uuid.getMostSignificantBits(), uuid.getLeastSignificantBits());
+        RuntimeProfile jobProfile = new RuntimeProfile("test");
         LoadLoadingTask task = new LoadLoadingTask(database, olapTable,brokerDesc, fileGroups,
-                100, 100,false, 100, callback, "", 100);
+                100, 100, false, 100, callback, "", 100,
+                jobProfile);
         try {
             UserIdentity userInfo = new UserIdentity("root", "localhost");
             userInfo.setIsAnalyzed();


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org