You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by mo...@apache.org on 2020/12/16 15:52:27 UTC
[incubator-doris] branch master updated: [Enhance] Add profile for
load job (#5052)
This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git
The following commit(s) were added to refs/heads/master by this push:
new b640991 [Enhance] Add profile for load job (#5052)
b640991 is described below
commit b640991e43147b8659045cd0a976de047aaf4431
Author: Mingyu Chen <mo...@gmail.com>
AuthorDate: Wed Dec 16 23:52:10 2020 +0800
[Enhance] Add profile for load job (#5052)
Add viewable profile for broker load. Similar to the query profile,
the user can set the session variable is_report_success to true before submitting the import job,
and then view the running profile of the job on the FE web page for easy analysis and debugging.
---
.../load-data/broker-load-manual.md | 8 +++
.../load-data/broker-load-manual.md | 8 +++
.../apache/doris/common/util/RuntimeProfile.java | 19 +++++--
.../apache/doris/load/loadv2/BrokerLoadJob.java | 58 ++++++++++++++++++++--
.../apache/doris/load/loadv2/LoadLoadingTask.java | 25 +++++++++-
.../doris/load/loadv2/BrokerLoadJobTest.java | 35 +++++++------
6 files changed, 127 insertions(+), 26 deletions(-)
diff --git a/docs/en/administrator-guide/load-data/broker-load-manual.md b/docs/en/administrator-guide/load-data/broker-load-manual.md
index 6275576..f6dd7e1 100644
--- a/docs/en/administrator-guide/load-data/broker-load-manual.md
+++ b/docs/en/administrator-guide/load-data/broker-load-manual.md
@@ -456,6 +456,14 @@ We will only discuss the case of a single BE. If the user cluster has more than
```
+### Performance analysis
+
+You can execute `set is_report_success=true` to enable the load job profile before submitting the import job. After the import job is completed, you can view the profile of the import job in the `Queries` tab of the FE web page.
+
+This profile can help analyze the running status of the import job.
+
+Currently, the profile can only be viewed after the job is successfully executed.
+
### Complete examples
Data situation: User data in HDFS, file address is hdfs://abc.com:8888/store_sales, HDFS authentication user name is root, password is password, data size is about 30G, hope to import into database bj_sales table store_sales.
diff --git a/docs/zh-CN/administrator-guide/load-data/broker-load-manual.md b/docs/zh-CN/administrator-guide/load-data/broker-load-manual.md
index ff6b0f0..33969fd 100644
--- a/docs/zh-CN/administrator-guide/load-data/broker-load-manual.md
+++ b/docs/zh-CN/administrator-guide/load-data/broker-load-manual.md
@@ -464,6 +464,14 @@ LoadFinishTime: 2019-07-27 11:50:16
注意:一般用户的环境可能达不到 10M/s 的速度,所以建议超过 500G 的文件都进行文件切分,再导入。
```
+
+### 性能分析
+
+可以在提交 LOAD 作业前,先执行 `set is_report_success=true` 打开会话变量。然后提交导入作业。待导入作业完成后,可以在 FE 的 web 页面的 `Queries` 标签中查看到导入作业的 Profile。
+
+这个 Profile 可以帮助分析导入作业的运行状态。
+
+当前只有作业成功执行后,才能查看 Profile。
### 完整例子
diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/util/RuntimeProfile.java b/fe/fe-core/src/main/java/org/apache/doris/common/util/RuntimeProfile.java
index 8d10831..0eb78d5 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/util/RuntimeProfile.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/util/RuntimeProfile.java
@@ -24,16 +24,17 @@ import org.apache.doris.thrift.TRuntimeProfileNode;
import org.apache.doris.thrift.TRuntimeProfileTree;
import org.apache.doris.thrift.TUnit;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
-import org.apache.logging.log4j.LogManager;
-import org.apache.logging.log4j.Logger;
-
import java.util.Collections;
import java.util.Comparator;
import java.util.Formatter;
+import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -58,7 +59,7 @@ public class RuntimeProfile {
private Map<String, RuntimeProfile> childMap = Maps.newConcurrentMap();
private Map<String, TreeSet<String>> childCounterMap = Maps.newHashMap();
- private List<Pair<RuntimeProfile, Boolean>> childList = Lists.newArrayList();
+ private LinkedList<Pair<RuntimeProfile, Boolean>> childList = Lists.newLinkedList();
private String name;
@@ -318,6 +319,16 @@ public class RuntimeProfile {
this.childList.add(pair);
}
+ public void addFirstChild(RuntimeProfile child) {
+ if (child == null) {
+ return;
+ }
+
+ this.childMap.put(child.name, child);
+ Pair<RuntimeProfile, Boolean> pair = Pair.create(child, true);
+ this.childList.addFirst(pair);
+ }
+
// Because the profile of summary and child fragment is not a real parent-child relationship
// Each child profile needs to calculate the time proportion consumed by itself
public void computeTimeInChildProfile() {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java
index cba085f..aeae05b 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java
@@ -29,13 +29,18 @@ import org.apache.doris.common.DuplicatedRequestException;
import org.apache.doris.common.LabelAlreadyUsedException;
import org.apache.doris.common.MetaNotFoundException;
import org.apache.doris.common.UserException;
+import org.apache.doris.common.util.DebugUtil;
import org.apache.doris.common.util.LogBuilder;
import org.apache.doris.common.util.LogKey;
+import org.apache.doris.common.util.ProfileManager;
+import org.apache.doris.common.util.RuntimeProfile;
+import org.apache.doris.common.util.TimeUtils;
import org.apache.doris.load.BrokerFileGroup;
import org.apache.doris.load.BrokerFileGroupAggInfo.FileGroupAggKey;
import org.apache.doris.load.EtlJobType;
import org.apache.doris.load.FailMsg;
import org.apache.doris.metric.MetricRepo;
+import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.OriginStatement;
import org.apache.doris.service.FrontendOptions;
import org.apache.doris.thrift.TUniqueId;
@@ -44,12 +49,12 @@ import org.apache.doris.transaction.TransactionState;
import org.apache.doris.transaction.TransactionState.TxnCoordinator;
import org.apache.doris.transaction.TransactionState.TxnSourceType;
-import com.google.common.base.Joiner;
-import com.google.common.collect.Lists;
-
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
+import com.google.common.base.Joiner;
+import com.google.common.collect.Lists;
+
import java.util.List;
import java.util.Map;
import java.util.UUID;
@@ -65,7 +70,12 @@ public class BrokerLoadJob extends BulkLoadJob {
private static final Logger LOG = LogManager.getLogger(BrokerLoadJob.class);
- // only for log replay
+ // Profile of this load job, including all tasks' profiles
+ private RuntimeProfile jobProfile;
+ // If set to true, the profile of this load job will be pushed to ProfileManager
+ private boolean isReportSuccess = false;
+
+ // for log replay and unit test
public BrokerLoadJob() {
super();
this.jobType = EtlJobType.BROKER;
@@ -78,6 +88,9 @@ public class BrokerLoadJob extends BulkLoadJob {
this.timeoutSecond = Config.broker_load_default_timeout_second;
this.brokerDesc = brokerDesc;
this.jobType = EtlJobType.BROKER;
+ if (ConnectContext.get() != null && ConnectContext.get().getSessionVariable().isReportSucc()) {
+ isReportSuccess = true;
+ }
}
@Override
@@ -173,6 +186,7 @@ public class BrokerLoadJob extends BulkLoadJob {
private void createLoadingTask(Database db, BrokerPendingTaskAttachment attachment) throws UserException {
// divide job into broker loading task by table
List<LoadLoadingTask> newLoadingTasks = Lists.newArrayList();
+ this.jobProfile = new RuntimeProfile("BrokerLoadJob " + id + ". " + label);
db.readLock();
try {
for (Map.Entry<FileGroupAggKey, List<BrokerFileGroup>> entry : fileGroupAggInfo.getAggKeyToFileGroups().entrySet()) {
@@ -193,7 +207,8 @@ public class BrokerLoadJob extends BulkLoadJob {
// Generate loading task and init the plan of task
LoadLoadingTask task = new LoadLoadingTask(db, table, brokerDesc,
brokerFileGroups, getDeadlineMs(), execMemLimit,
- strictMode, transactionId, this, timezone, timeoutSecond);
+ strictMode, transactionId, this, timezone, timeoutSecond,
+ isReportSuccess ? jobProfile : null);
UUID uuid = UUID.randomUUID();
TUniqueId loadId = new TUniqueId(uuid.getMostSignificantBits(), uuid.getLeastSignificantBits());
task.init(loadId, attachment.getFileStatusByTable(aggKey),
@@ -300,6 +315,32 @@ public class BrokerLoadJob extends BulkLoadJob {
}
}
+ private void writeProfile() {
+ if (!isReportSuccess) {
+ return;
+ }
+
+ RuntimeProfile summaryProfile = new RuntimeProfile("Summary");
+ summaryProfile.addInfoString(ProfileManager.QUERY_ID, String.valueOf(id));
+ summaryProfile.addInfoString(ProfileManager.START_TIME, TimeUtils.longToTimeString(createTimestamp));
+ summaryProfile.addInfoString(ProfileManager.END_TIME, TimeUtils.longToTimeString(finishTimestamp));
+ summaryProfile.addInfoString(ProfileManager.TOTAL_TIME, DebugUtil.getPrettyStringMs(finishTimestamp - createTimestamp));
+
+ summaryProfile.addInfoString(ProfileManager.QUERY_TYPE, "Load");
+ summaryProfile.addInfoString(ProfileManager.QUERY_STATE, "N/A");
+ summaryProfile.addInfoString(ProfileManager.USER, "N/A");
+ summaryProfile.addInfoString(ProfileManager.DEFAULT_DB, "N/A");
+ summaryProfile.addInfoString(ProfileManager.SQL_STATEMENT, "N/A");
+ summaryProfile.addInfoString(ProfileManager.IS_CACHED, "N/A");
+
+ // Add the summary profile as the first child of the job profile
+ jobProfile.addFirstChild(summaryProfile);
+ jobProfile.computeTimeInChildProfile();
+ StringBuilder builder = new StringBuilder();
+ jobProfile.prettyPrint(builder, "");
+ ProfileManager.getInstance().pushProfile(jobProfile);
+ }
+
private void updateLoadingStatus(BrokerLoadingTaskAttachment attachment) {
loadingStatus.replaceCounter(DPP_ABNORMAL_ALL,
increaseCounter(DPP_ABNORMAL_ALL, attachment.getCounter(DPP_ABNORMAL_ALL)));
@@ -327,4 +368,11 @@ public class BrokerLoadJob extends BulkLoadJob {
}
return String.valueOf(value);
}
+
+ @Override
+ public void afterVisible(TransactionState txnState, boolean txnOperated) {
+ super.afterVisible(txnState, txnOperated);
+ writeProfile();
+ }
}
+
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadLoadingTask.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadLoadingTask.java
index 33f0049..0e288ad 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadLoadingTask.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadLoadingTask.java
@@ -27,6 +27,8 @@ import org.apache.doris.common.UserException;
import org.apache.doris.common.util.DebugUtil;
import org.apache.doris.common.util.LogBuilder;
import org.apache.doris.common.util.LogKey;
+import org.apache.doris.common.util.RuntimeProfile;
+import org.apache.doris.common.util.TimeUtils;
import org.apache.doris.load.BrokerFileGroup;
import org.apache.doris.load.FailMsg;
import org.apache.doris.qe.Coordinator;
@@ -64,11 +66,15 @@ public class LoadLoadingTask extends LoadTask {
private LoadingTaskPlanner planner;
+ private RuntimeProfile jobProfile;
+ private RuntimeProfile profile;
+ private long beginTime;
+
public LoadLoadingTask(Database db, OlapTable table,
BrokerDesc brokerDesc, List<BrokerFileGroup> fileGroups,
long jobDeadlineMs, long execMemLimit, boolean strictMode,
long txnId, LoadTaskCallback callback, String timezone,
- long timeoutS) {
+ long timeoutS, RuntimeProfile profile) {
super(callback, TaskType.LOADING);
this.db = db;
this.table = table;
@@ -82,6 +88,7 @@ public class LoadLoadingTask extends LoadTask {
this.retryTime = 2; // 2 times is enough
this.timezone = timezone;
this.timeoutS = timeoutS;
+ this.jobProfile = profile;
}
public void init(TUniqueId loadId, List<List<TBrokerFileStatus>> fileStatusList, int fileNum, UserIdentity userInfo) throws UserException {
@@ -100,6 +107,7 @@ public class LoadLoadingTask extends LoadTask {
LOG.info("begin to execute loading task. load id: {} job: {}. db: {}, tbl: {}. left retry: {}",
DebugUtil.printId(loadId), callback.getCallbackId(), db.getFullName(), table.getName(), retryTime);
retryTime--;
+ beginTime = System.nanoTime();
executeOnce();
}
@@ -148,6 +156,8 @@ public class LoadLoadingTask extends LoadTask {
curCoordinator.getLoadCounters(),
curCoordinator.getTrackingUrl(),
TabletCommitInfo.fromThrift(curCoordinator.getCommitInfos()));
+ // Create profile of this task and add to the job profile.
+ createProfile(curCoordinator);
} else {
throw new LoadException(status.getErrorMsg());
}
@@ -160,6 +170,19 @@ public class LoadLoadingTask extends LoadTask {
return jobDeadlineMs - System.currentTimeMillis();
}
+ public void createProfile(Coordinator coord) {
+ if (jobProfile == null) {
+ // No need to gather profile
+ return;
+ }
+ // Per-task profile; the coordinator's query profile is attached as its child
+ profile = new RuntimeProfile("LoadTask: " + DebugUtil.printId(loadId));
+ coord.getQueryProfile().getCounterTotalTime().setValue(TimeUtils.getEstimatedTime(beginTime));
+ coord.endProfile();
+ profile.addChild(coord.getQueryProfile());
+ jobProfile.addChild(profile);
+ }
+
@Override
public void updateRetryInfo() {
super.updateRetryInfo();
diff --git a/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/BrokerLoadJobTest.java b/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/BrokerLoadJobTest.java
index 10a5732..c23cf95 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/BrokerLoadJobTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/BrokerLoadJobTest.java
@@ -17,22 +17,23 @@
package org.apache.doris.load.loadv2;
+import org.apache.doris.analysis.Analyzer;
import org.apache.doris.analysis.BrokerDesc;
import org.apache.doris.analysis.DataDescription;
import org.apache.doris.analysis.LabelName;
import org.apache.doris.analysis.LoadStmt;
-import org.apache.doris.analysis.Analyzer;
import org.apache.doris.analysis.UserIdentity;
+import org.apache.doris.catalog.BrokerTable;
import org.apache.doris.catalog.Catalog;
+import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.Database;
import org.apache.doris.catalog.OlapTable;
-import org.apache.doris.catalog.Table;
-import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.PrimitiveType;
-import org.apache.doris.catalog.BrokerTable;
+import org.apache.doris.catalog.Table;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.MetaNotFoundException;
import org.apache.doris.common.jmockit.Deencapsulation;
+import org.apache.doris.common.util.RuntimeProfile;
import org.apache.doris.load.BrokerFileGroup;
import org.apache.doris.load.BrokerFileGroupAggInfo;
import org.apache.doris.load.BrokerFileGroupAggInfo.FileGroupAggKey;
@@ -45,29 +46,29 @@ import org.apache.doris.planner.BrokerScanNode;
import org.apache.doris.planner.OlapTableSink;
import org.apache.doris.planner.PlanFragment;
import org.apache.doris.task.MasterTaskExecutor;
+import org.apache.doris.thrift.TUniqueId;
import org.apache.doris.transaction.TransactionState;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
-
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.UUID;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
import mockit.Expectations;
import mockit.Injectable;
import mockit.Mock;
import mockit.MockUp;
import mockit.Mocked;
-import org.apache.doris.thrift.TUniqueId;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
public class BrokerLoadJobTest {
@@ -358,8 +359,10 @@ public class BrokerLoadJobTest {
fileGroups.add(brokerFileGroup);
UUID uuid = UUID.randomUUID();
TUniqueId loadId = new TUniqueId(uuid.getMostSignificantBits(), uuid.getLeastSignificantBits());
+ RuntimeProfile jobProfile = new RuntimeProfile("test");
LoadLoadingTask task = new LoadLoadingTask(database, olapTable,brokerDesc, fileGroups,
- 100, 100,false, 100, callback, "", 100);
+ 100, 100, false, 100, callback, "", 100,
+ jobProfile);
try {
UserIdentity userInfo = new UserIdentity("root", "localhost");
userInfo.setIsAnalyzed();
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org