You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by mo...@apache.org on 2021/07/27 05:37:44 UTC
[incubator-doris] branch master updated: [Profile] Support show
load profile for broker load job (#6214)
This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git
The following commit(s) were added to refs/heads/master by this push:
new f26e340 [Profile] Support show load profile for broker load job (#6214)
f26e340 is described below
commit f26e3408b23314db4bf627a8df43795ae04271d4
Author: Mingyu Chen <mo...@gmail.com>
AuthorDate: Tue Jul 27 13:37:34 2021 +0800
[Profile] Support show load profile for broker load job (#6214)
1.
Add new statement:
`SHOW LOAD PROFILE "xxx";`
2.
Improve the read performance of orc scanner
---
be/src/common/config.h | 1 +
be/src/exec/base_scanner.h | 3 +
be/src/exec/broker_scanner.cpp | 6 +-
be/src/exec/buffered_reader.cpp | 27 +++-
be/src/exec/buffered_reader.h | 17 ++-
be/src/exec/json_scanner.cpp | 3 +-
be/src/exec/orc_scanner.cpp | 4 +-
be/src/exec/parquet_scanner.cpp | 4 +-
be/test/exec/buffered_reader_test.cpp | 12 +-
fe/fe-core/src/main/cup/sql_parser.cup | 4 +
.../apache/doris/analysis/ShowLoadProfileStmt.java | 141 +++++++++++++++++++++
.../doris/analysis/ShowQueryProfileStmt.java | 10 +-
.../common/profile/MultiProfileTreeBuilder.java | 139 ++++++++++++++++++++
.../doris/common/profile/PlanTreeBuilder.java | 4 +-
.../doris/common/profile/ProfileTreeBuilder.java | 64 +++++-----
.../apache/doris/common/util/ProfileManager.java | 77 ++++++-----
.../apache/doris/load/loadv2/BrokerLoadJob.java | 2 -
.../apache/doris/load/loadv2/LoadLoadingTask.java | 7 +-
.../org/apache/doris/planner/DataPartition.java | 3 +
.../java/org/apache/doris/planner/PlanNode.java | 11 +-
.../java/org/apache/doris/qe/ShowExecutor.java | 64 +++++++++-
.../org/apache/doris/task/ExportExportingTask.java | 2 +-
22 files changed, 502 insertions(+), 103 deletions(-)
diff --git a/be/src/common/config.h b/be/src/common/config.h
index bec0b9e..970ff46 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -618,6 +618,7 @@ CONF_mDouble(tablet_version_graph_orphan_vertex_ratio, "0.1");
// if set runtime_filter_use_async_rpc true, publish runtime filter will be a async method
// else we will call sync method
CONF_mBool(runtime_filter_use_async_rpc, "true");
+
} // namespace config
} // namespace doris
diff --git a/be/src/exec/base_scanner.h b/be/src/exec/base_scanner.h
index 48cef80..3d3702a 100644
--- a/be/src/exec/base_scanner.h
+++ b/be/src/exec/base_scanner.h
@@ -33,6 +33,9 @@ class MemTracker;
class RuntimeState;
class ExprContext;
+// The counter will be passed to each scanner.
+// Note that this struct is not thread safe.
+// So if we support concurrent scan in the future, we need to modify this struct.
struct ScannerCounter {
ScannerCounter() : num_rows_filtered(0), num_rows_unselected(0) {}
diff --git a/be/src/exec/broker_scanner.cpp b/be/src/exec/broker_scanner.cpp
index 058b3e8..91b2407 100644
--- a/be/src/exec/broker_scanner.cpp
+++ b/be/src/exec/broker_scanner.cpp
@@ -170,8 +170,7 @@ Status BrokerScanner::open_file_reader() {
case TFileType::FILE_HDFS: {
#if defined(__x86_64__)
BufferedReader* file_reader =
- new BufferedReader(new HdfsFileReader(range.hdfs_params, range.path, start_offset),
- config::remote_storage_read_buffer_mb * 1024 * 1024);
+ new BufferedReader(_profile, new HdfsFileReader(range.hdfs_params, range.path, start_offset));
RETURN_IF_ERROR(file_reader->open());
_cur_file_reader = file_reader;
break;
@@ -189,8 +188,7 @@ Status BrokerScanner::open_file_reader() {
}
case TFileType::FILE_S3: {
BufferedReader* s3_reader =
- new BufferedReader(new S3Reader(_params.properties, range.path, start_offset),
- config::remote_storage_read_buffer_mb * 1024 * 1024);
+ new BufferedReader(_profile, new S3Reader(_params.properties, range.path, start_offset));
RETURN_IF_ERROR(s3_reader->open());
_cur_file_reader = s3_reader;
break;
diff --git a/be/src/exec/buffered_reader.cpp b/be/src/exec/buffered_reader.cpp
index a86044d..5c599d4 100644
--- a/be/src/exec/buffered_reader.cpp
+++ b/be/src/exec/buffered_reader.cpp
@@ -20,17 +20,22 @@
#include <algorithm>
#include <sstream>
+#include "common/config.h"
#include "common/logging.h"
namespace doris {
// buffered reader
-BufferedReader::BufferedReader(FileReader* reader, int64_t buffer_size)
- : _reader(reader),
+BufferedReader::BufferedReader(RuntimeProfile* profile, FileReader* reader, int64_t buffer_size)
+ : _profile(profile),
+ _reader(reader),
_buffer_size(buffer_size),
_buffer_offset(0),
_buffer_limit(0),
_cur_offset(0) {
+ if (_buffer_size == -1L) {
+ _buffer_size = config::remote_storage_read_buffer_mb * 1024 * 1024;
+ }
_buffer = new char[_buffer_size];
// set the _cur_offset of this reader as same as the inner reader's,
// to make sure the buffer reader will start to read at right position.
@@ -47,6 +52,14 @@ Status BufferedReader::open() {
ss << "Open buffered reader failed, reader is null";
return Status::InternalError(ss.str());
}
+
+ // the macro ADD_XXX is idempotent.
+ // So although each scanner calls the ADD_XXX method, they all use the same counters.
+ _read_timer = ADD_TIMER(_profile, "FileReadTime");
+ _remote_read_timer = ADD_CHILD_TIMER(_profile, "FileRemoteReadTime", "FileReadTime");
+ _read_counter = ADD_COUNTER(_profile, "FileReadCalls", TUnit::UNIT);
+ _remote_read_counter = ADD_COUNTER(_profile, "FileRemoteReadCalls", TUnit::UNIT);
+
RETURN_IF_ERROR(_reader->open());
return Status::OK();
}
@@ -68,6 +81,7 @@ Status BufferedReader::read(uint8_t* buf, int64_t buf_len, int64_t* bytes_read,
}
Status BufferedReader::readat(int64_t position, int64_t nbytes, int64_t* bytes_read, void* out) {
+ SCOPED_TIMER(_read_timer);
if (nbytes <= 0) {
*bytes_read = 0;
return Status::OK();
@@ -92,6 +106,7 @@ Status BufferedReader::readat(int64_t position, int64_t nbytes, int64_t* bytes_r
Status BufferedReader::_read_once(int64_t position, int64_t nbytes, int64_t* bytes_read,
void* out) {
+ _read_count++;
// requested bytes missed the local buffer
if (position >= _buffer_limit || position < _buffer_offset) {
// if requested length is larger than the capacity of buffer, do not
@@ -121,6 +136,7 @@ Status BufferedReader::_read_once(int64_t position, int64_t nbytes, int64_t* byt
Status BufferedReader::_fill() {
if (_buffer_offset >= 0) {
int64_t bytes_read = 0;
+ SCOPED_TIMER(_remote_read_timer);
RETURN_IF_ERROR(_reader->readat(_buffer_offset, _buffer_size, &bytes_read, _buffer));
_buffer_limit = _buffer_offset + bytes_read;
}
@@ -144,6 +160,13 @@ Status BufferedReader::tell(int64_t* position) {
void BufferedReader::close() {
_reader->close();
SAFE_DELETE_ARRAY(_buffer);
+
+ if (_read_counter != nullptr) {
+ COUNTER_UPDATE(_read_counter, _read_count);
+ }
+ if (_remote_read_counter != nullptr) {
+ COUNTER_UPDATE(_remote_read_counter, _remote_read_count);
+ }
}
bool BufferedReader::closed() {
diff --git a/be/src/exec/buffered_reader.h b/be/src/exec/buffered_reader.h
index b421b1d..937e154 100644
--- a/be/src/exec/buffered_reader.h
+++ b/be/src/exec/buffered_reader.h
@@ -24,6 +24,7 @@
#include "common/status.h"
#include "exec/file_reader.h"
#include "olap/olap_define.h"
+#include "util/runtime_profile.h"
namespace doris {
@@ -35,7 +36,8 @@ public:
// If the reader need the file size, set it when construct FileReader.
// There is no other way to set the file size.
// buffered_reader will acquire reader
- BufferedReader(FileReader* reader, int64_t buffer_size = 1024 * 1024);
+ // -1 means using config buffered_reader_buffer_size_bytes
+ BufferedReader(RuntimeProfile* profile, FileReader* reader, int64_t = -1L);
virtual ~BufferedReader();
virtual Status open() override;
@@ -56,12 +58,25 @@ private:
Status _read_once(int64_t position, int64_t nbytes, int64_t* bytes_read, void* out);
private:
+ RuntimeProfile* _profile;
std::unique_ptr<FileReader> _reader;
char* _buffer;
int64_t _buffer_size;
int64_t _buffer_offset;
int64_t _buffer_limit;
int64_t _cur_offset;
+
+ int64_t _read_count = 0;
+ int64_t _remote_read_count = 0;
+
+ // total time cost in this reader
+ RuntimeProfile::Counter* _read_timer = nullptr;
+ // time cost of "_reader", "remote" because "_reader" is always a remote reader
+ RuntimeProfile::Counter* _remote_read_timer = nullptr;
+ // counter of calling read()
+ RuntimeProfile::Counter* _read_counter = nullptr;
+ // counter of calling "remote read()"
+ RuntimeProfile::Counter* _remote_read_counter = nullptr;
};
} // namespace doris
diff --git a/be/src/exec/json_scanner.cpp b/be/src/exec/json_scanner.cpp
index c47796f..5cba337 100644
--- a/be/src/exec/json_scanner.cpp
+++ b/be/src/exec/json_scanner.cpp
@@ -160,8 +160,7 @@ Status JsonScanner::open_file_reader() {
}
case TFileType::FILE_S3: {
BufferedReader* s3_reader =
- new BufferedReader(new S3Reader(_params.properties, range.path, start_offset),
- config::remote_storage_read_buffer_mb * 1024 * 1024);
+ new BufferedReader(_profile, new S3Reader(_params.properties, range.path, start_offset));
RETURN_IF_ERROR(s3_reader->open());
_cur_file_reader = s3_reader;
break;
diff --git a/be/src/exec/orc_scanner.cpp b/be/src/exec/orc_scanner.cpp
index 74f6457..fafb9e2 100644
--- a/be/src/exec/orc_scanner.cpp
+++ b/be/src/exec/orc_scanner.cpp
@@ -396,13 +396,13 @@ Status ORCScanner::open_next_reader() {
if (range.__isset.file_size) {
file_size = range.file_size;
}
- file_reader.reset(new BufferedReader(new BrokerReader(_state->exec_env(), _broker_addresses,
+ file_reader.reset(new BufferedReader(_profile, new BrokerReader(_state->exec_env(), _broker_addresses,
_params.properties, range.path, range.start_offset,
file_size)));
break;
}
case TFileType::FILE_S3: {
- file_reader.reset(new BufferedReader(
+ file_reader.reset(new BufferedReader(_profile,
new S3Reader(_params.properties, range.path, range.start_offset)));
break;
}
diff --git a/be/src/exec/parquet_scanner.cpp b/be/src/exec/parquet_scanner.cpp
index 72e460d..86107d9 100644
--- a/be/src/exec/parquet_scanner.cpp
+++ b/be/src/exec/parquet_scanner.cpp
@@ -145,13 +145,13 @@ Status ParquetScanner::open_next_reader() {
if (range.__isset.file_size) {
file_size = range.file_size;
}
- file_reader.reset(new BufferedReader(
+ file_reader.reset(new BufferedReader(_profile,
new BrokerReader(_state->exec_env(), _broker_addresses, _params.properties,
range.path, range.start_offset, file_size)));
break;
}
case TFileType::FILE_S3: {
- file_reader.reset(new BufferedReader(
+ file_reader.reset(new BufferedReader(_profile,
new S3Reader(_params.properties, range.path, range.start_offset)));
break;
}
diff --git a/be/test/exec/buffered_reader_test.cpp b/be/test/exec/buffered_reader_test.cpp
index 19525a6..b984d9c 100644
--- a/be/test/exec/buffered_reader_test.cpp
+++ b/be/test/exec/buffered_reader_test.cpp
@@ -33,10 +33,11 @@ protected:
};
TEST_F(BufferedReaderTest, normal_use) {
+ RuntimeProfile profile("test");
// buffered_reader_test_file 950 bytes
auto file_reader = new LocalFileReader(
"./be/test/exec/test_data/buffered_reader/buffered_reader_test_file", 0);
- BufferedReader reader(file_reader, 1024);
+ BufferedReader reader(&profile, file_reader, 1024);
auto st = reader.open();
ASSERT_TRUE(st.ok());
uint8_t buf[1024];
@@ -50,10 +51,11 @@ TEST_F(BufferedReaderTest, normal_use) {
}
TEST_F(BufferedReaderTest, test_validity) {
+ RuntimeProfile profile("test");
// buffered_reader_test_file.txt 45 bytes
auto file_reader = new LocalFileReader(
"./be/test/exec/test_data/buffered_reader/buffered_reader_test_file.txt", 0);
- BufferedReader reader(file_reader, 64);
+ BufferedReader reader(&profile, file_reader, 64);
auto st = reader.open();
ASSERT_TRUE(st.ok());
uint8_t buf[10];
@@ -92,10 +94,11 @@ TEST_F(BufferedReaderTest, test_validity) {
}
TEST_F(BufferedReaderTest, test_seek) {
+ RuntimeProfile profile("test");
// buffered_reader_test_file.txt 45 bytes
auto file_reader = new LocalFileReader(
"./be/test/exec/test_data/buffered_reader/buffered_reader_test_file.txt", 0);
- BufferedReader reader(file_reader, 64);
+ BufferedReader reader(&profile, file_reader, 64);
auto st = reader.open();
ASSERT_TRUE(st.ok());
uint8_t buf[10];
@@ -143,10 +146,11 @@ TEST_F(BufferedReaderTest, test_seek) {
}
TEST_F(BufferedReaderTest, test_miss) {
+ RuntimeProfile profile("test");
// buffered_reader_test_file.txt 45 bytes
auto file_reader = new LocalFileReader(
"./be/test/exec/test_data/buffered_reader/buffered_reader_test_file.txt", 0);
- BufferedReader reader(file_reader, 64);
+ BufferedReader reader(&profile, file_reader, 64);
auto st = reader.open();
ASSERT_TRUE(st.ok());
uint8_t buf[128];
diff --git a/fe/fe-core/src/main/cup/sql_parser.cup b/fe/fe-core/src/main/cup/sql_parser.cup
index c3c3ed1..68cfca3 100644
--- a/fe/fe-core/src/main/cup/sql_parser.cup
+++ b/fe/fe-core/src/main/cup/sql_parser.cup
@@ -2623,6 +2623,10 @@ show_param ::=
{:
RESULT = new ShowQueryProfileStmt(queryIdPath);
:}
+ | KW_LOAD KW_PROFILE STRING_LITERAL:loadIdPath
+ {:
+ RESULT = new ShowLoadProfileStmt(loadIdPath);
+ :}
| KW_ENCRYPTKEYS opt_db:dbName opt_wild_where
{:
RESULT = new ShowEncryptKeysStmt(dbName, parser.wild);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowLoadProfileStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowLoadProfileStmt.java
new file mode 100644
index 0000000..50f8fcc
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowLoadProfileStmt.java
@@ -0,0 +1,141 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.analysis;
+
+import org.apache.doris.catalog.Column;
+import org.apache.doris.catalog.ScalarType;
+import org.apache.doris.common.AnalysisException;
+import org.apache.doris.common.UserException;
+import org.apache.doris.qe.ShowResultSetMetaData;
+
+import com.google.common.base.Strings;
+
+// For stmt like:
+// show load profile "/"; # list all saving load job ids
+// show load profile "/10014" # show task ids of specified job
+// show load profile "/10014/e0f7390f5363419e-b416a2a79996083e/" # show instance list of the task
+// show load profile "/10014/e0f7390f5363419e-b416a2a79996083e/e0f7390f5363419e-b416a2a799960906" # show instance tree graph
+public class ShowLoadProfileStmt extends ShowStmt {
+ private static final ShowResultSetMetaData META_DATA_TASK_IDS =
+ ShowResultSetMetaData.builder()
+ .addColumn(new Column("TaskId", ScalarType.createVarchar(128)))
+ .addColumn(new Column("ActiveTime", ScalarType.createVarchar(64)))
+ .build();
+
+ public enum PathType {
+ JOB_IDS,
+ TASK_IDS,
+ INSTANCES,
+ SINGLE_INSTANCE
+ }
+
+ private String idPath;
+ private PathType pathType;
+
+ private String jobId = "";
+ private String taskId = "";
+ private String instanceId = "";
+
+ public ShowLoadProfileStmt(String idPath) {
+ this.idPath = idPath;
+ }
+
+ public PathType getPathType() {
+ return pathType;
+ }
+
+ public String getJobId() {
+ return jobId;
+ }
+
+ public String getTaskId() {
+ return taskId;
+ }
+
+ public String getInstanceId() {
+ return instanceId;
+ }
+
+ @Override
+ public void analyze(Analyzer analyzer) throws UserException {
+ super.analyze(analyzer);
+ if (Strings.isNullOrEmpty(idPath)) {
+ // list all query ids
+ pathType = PathType.JOB_IDS;
+ return;
+ }
+
+ if (!idPath.startsWith("/")) {
+ throw new AnalysisException("Path must starts with '/'");
+ }
+ pathType = PathType.JOB_IDS;
+ String[] parts = idPath.split("/");
+ if (parts.length > 4) {
+ throw new AnalysisException("Path must in format '/jobId/taskId/instanceId'");
+ }
+
+ for (int i = 0; i < parts.length; i++) {
+ switch (i) {
+ case 0:
+ pathType = PathType.JOB_IDS;
+ continue;
+ case 1:
+ jobId = parts[i];
+ pathType = PathType.TASK_IDS;
+ break;
+ case 2:
+ taskId = parts[i];
+ pathType = PathType.INSTANCES;
+ break;
+ case 3:
+ instanceId = parts[i];
+ pathType = PathType.SINGLE_INSTANCE;
+ break;
+ default:
+ break;
+ }
+ }
+ }
+
+ @Override
+ public String toSql() {
+ StringBuilder sb = new StringBuilder("SHOW LOAD PROFILE ").append(idPath);
+ return sb.toString();
+ }
+
+ @Override
+ public String toString() {
+ return toSql();
+ }
+
+ @Override
+ public ShowResultSetMetaData getMetaData() {
+ switch (pathType) {
+ case JOB_IDS:
+ return ShowQueryProfileStmt.META_DATA_QUERY_IDS;
+ case TASK_IDS:
+ return META_DATA_TASK_IDS;
+ case INSTANCES:
+ return ShowQueryProfileStmt.META_DATA_INSTANCES;
+ case SINGLE_INSTANCE:
+ return ShowQueryProfileStmt.META_DATA_SINGLE_INSTANCE;
+ default:
+ return null;
+ }
+ }
+}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowQueryProfileStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowQueryProfileStmt.java
index ae6a7ff..7c0994c 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowQueryProfileStmt.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowQueryProfileStmt.java
@@ -32,7 +32,7 @@ import com.google.common.base.Strings;
// show query profile "/e0f7390f5363419e-b416a2a79996083e/0/e0f7390f5363419e-b416a2a799960906" # show graph of the instance
public class ShowQueryProfileStmt extends ShowStmt {
// This should be same as ProfileManager.PROFILE_HEADERS
- private static final ShowResultSetMetaData META_DATA_QUERYIDS =
+ public static final ShowResultSetMetaData META_DATA_QUERY_IDS =
ShowResultSetMetaData.builder()
.addColumn(new Column("QueryId", ScalarType.createVarchar(128)))
.addColumn(new Column("User", ScalarType.createVarchar(128)))
@@ -45,17 +45,17 @@ public class ShowQueryProfileStmt extends ShowStmt {
.addColumn(new Column("QueryState", ScalarType.createVarchar(128)))
.build();
- private static final ShowResultSetMetaData META_DATA_FRAGMENTS =
+ public static final ShowResultSetMetaData META_DATA_FRAGMENTS =
ShowResultSetMetaData.builder()
.addColumn(new Column("Fragments", ScalarType.createVarchar(65535)))
.build();
- private static final ShowResultSetMetaData META_DATA_INSTANCES =
+ public static final ShowResultSetMetaData META_DATA_INSTANCES =
ShowResultSetMetaData.builder()
.addColumn(new Column("Instances", ScalarType.createVarchar(128)))
.addColumn(new Column("Host", ScalarType.createVarchar(64)))
.addColumn(new Column("ActiveTime", ScalarType.createVarchar(64)))
.build();
- private static final ShowResultSetMetaData META_DATA_SINGLE_INSTANCE =
+ public static final ShowResultSetMetaData META_DATA_SINGLE_INSTANCE =
ShowResultSetMetaData.builder()
.addColumn(new Column("Instance", ScalarType.createVarchar(65535)))
.build();
@@ -150,7 +150,7 @@ public class ShowQueryProfileStmt extends ShowStmt {
public ShowResultSetMetaData getMetaData() {
switch (pathType) {
case QUERY_IDS:
- return META_DATA_QUERYIDS;
+ return META_DATA_QUERY_IDS;
case FRAGMETNS:
return META_DATA_FRAGMENTS;
case INSTANCES:
diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/profile/MultiProfileTreeBuilder.java b/fe/fe-core/src/main/java/org/apache/doris/common/profile/MultiProfileTreeBuilder.java
new file mode 100644
index 0000000..08af2b9
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/profile/MultiProfileTreeBuilder.java
@@ -0,0 +1,139 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.common.profile;
+
+import org.apache.doris.common.AnalysisException;
+import org.apache.doris.common.Pair;
+import org.apache.doris.common.UserException;
+import org.apache.doris.common.util.Counter;
+import org.apache.doris.common.util.RuntimeProfile;
+
+import com.clearspring.analytics.util.Lists;
+import com.google.common.collect.Maps;
+
+import org.apache.commons.lang3.tuple.Triple;
+import org.glassfish.jersey.internal.guava.Sets;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+// MultiProfileTreeBuilder saves a set of ProfileTreeBuilder.
+// For a query profile, there is usually only one ExecutionProfile node.
+// For a load job profile, it may produce multiple subtasks, so there may be multiple ExecutionProfile nodes.
+//
+// Each ExecutionProfile node corresponds to a ProfileTreeBuilder
+public class MultiProfileTreeBuilder {
+ private static final Set<String> PROFILE_ROOT_NAMES;
+ public static final String PROFILE_NAME_EXECUTION = "Execution Profile";
+
+ private static final String EXECUTION_ID_PATTERN_STR = "^Execution Profile (.*)";
+ private static final Pattern EXECUTION_ID_PATTERN;
+
+ private RuntimeProfile profileRoot;
+ private Map<String, RuntimeProfile> idToSingleProfile = Maps.newHashMap();
+ private Map<String, ProfileTreeBuilder> idToSingleTreeBuilder = Maps.newHashMap();
+
+ static {
+ PROFILE_ROOT_NAMES = Sets.newHashSet();
+ PROFILE_ROOT_NAMES.add("Query");
+ PROFILE_ROOT_NAMES.add("BrokerLoadJob");
+ EXECUTION_ID_PATTERN = Pattern.compile(EXECUTION_ID_PATTERN_STR);
+ }
+
+ public MultiProfileTreeBuilder(RuntimeProfile root) {
+ this.profileRoot = root;
+ }
+
+ public void build() throws UserException {
+ unwrapProfile();
+ buildTrees();
+ }
+
+ private void unwrapProfile() throws UserException {
+ if (PROFILE_ROOT_NAMES.stream().anyMatch(n -> profileRoot.getName().startsWith(n))) {
+ List<Pair<RuntimeProfile, Boolean>> children = profileRoot.getChildList();
+ boolean find = false;
+ for (Pair<RuntimeProfile, Boolean> pair : children) {
+ if (pair.first.getName().startsWith(PROFILE_NAME_EXECUTION)) {
+ String executionProfileId = getExecutionProfileId(pair.first.getName());
+ idToSingleProfile.put(executionProfileId, pair.first);
+ find = true;
+ }
+ }
+ if (!find) {
+ throw new UserException("Invalid profile. Expected " + PROFILE_NAME_EXECUTION);
+ }
+ }
+ }
+
+ private String getExecutionProfileId(String executionName) throws UserException {
+ Matcher m = EXECUTION_ID_PATTERN.matcher(executionName);
+ if (!m.find() || m.groupCount() != 1) {
+ throw new UserException("Invalid execution profile name: " + executionName);
+ }
+ return m.group(1);
+ }
+
+ private void buildTrees() throws UserException {
+ for (Map.Entry<String, RuntimeProfile> entry : idToSingleProfile.entrySet()) {
+ ProfileTreeBuilder builder = new ProfileTreeBuilder(entry.getValue());
+ builder.build();
+ idToSingleTreeBuilder.put(entry.getKey(), builder);
+ }
+ }
+
+ public List<List<String>> getSubTaskInfo() {
+ List<List<String>> rows = Lists.newArrayList();
+ for (Map.Entry<String, RuntimeProfile> entry : idToSingleProfile.entrySet()) {
+ List<String> row = Lists.newArrayList();
+ Counter activeCounter = entry.getValue().getCounterTotalTime();
+ row.add(entry.getKey());
+ row.add(RuntimeProfile.printCounter(activeCounter.getValue(), activeCounter.getType()));
+ rows.add(row);
+ }
+ return rows;
+ }
+
+ public List<Triple<String, String, Long>> getInstanceList(String executionId, String fragmentId)
+ throws AnalysisException {
+ ProfileTreeBuilder singleBuilder = getExecutionProfileTreeBuilder(executionId);
+ return singleBuilder.getInstanceList(fragmentId);
+ }
+
+ public ProfileTreeNode getInstanceTreeRoot(String executionId, String fragmentId, String instanceId)
+ throws AnalysisException {
+ ProfileTreeBuilder singleBuilder = getExecutionProfileTreeBuilder(executionId);
+ return singleBuilder.getInstanceTreeRoot(fragmentId, instanceId);
+ }
+
+ public ProfileTreeNode getFragmentTreeRoot(String executionId) throws AnalysisException {
+ ProfileTreeBuilder singleBuilder = getExecutionProfileTreeBuilder(executionId);
+ return singleBuilder.getFragmentTreeRoot();
+ }
+
+ private ProfileTreeBuilder getExecutionProfileTreeBuilder(String executionId) throws AnalysisException {
+ ProfileTreeBuilder singleBuilder = idToSingleTreeBuilder.get(executionId);
+ if (singleBuilder == null) {
+ throw new AnalysisException("Can not find execution profile: " + executionId);
+ }
+ return singleBuilder;
+ }
+}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/profile/PlanTreeBuilder.java b/fe/fe-core/src/main/java/org/apache/doris/common/profile/PlanTreeBuilder.java
index 39c92c8..bde6597 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/profile/PlanTreeBuilder.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/profile/PlanTreeBuilder.java
@@ -57,7 +57,7 @@ public class PlanTreeBuilder {
if (sink != null) {
StringBuilder sb = new StringBuilder();
sb.append("[").append(sink.getExchNodeId().asInt()).append(": ").append(sink.getClass().getSimpleName()).append("]");
- sb.append("\nFragment: ").append(fragment.getId().asInt());
+ sb.append("\n[Fragment: ").append(fragment.getId().asInt()).append("]");
sb.append("\n").append(sink.getExplainString("", TExplainLevel.BRIEF));
sinkNode = new PlanTreeNode(sink.getExchNodeId(), sb.toString());
if (i == 0) {
@@ -102,7 +102,7 @@ public class PlanTreeBuilder {
}
private void buildForPlanNode(PlanNode planNode, PlanTreeNode parent) {
- PlanTreeNode node = new PlanTreeNode(planNode.getId(), planNode.toString());
+ PlanTreeNode node = new PlanTreeNode(planNode.getId(), planNode.getPlanTreeExplanStr());
if (parent != null) {
parent.addChild(node);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/profile/ProfileTreeBuilder.java b/fe/fe-core/src/main/java/org/apache/doris/common/profile/ProfileTreeBuilder.java
index e4a34e3..eea7574 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/profile/ProfileTreeBuilder.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/profile/ProfileTreeBuilder.java
@@ -23,12 +23,12 @@ import org.apache.doris.common.util.Counter;
import org.apache.doris.common.util.RuntimeProfile;
import org.apache.doris.thrift.TUnit;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
import org.apache.commons.lang3.tuple.ImmutableTriple;
import org.apache.commons.lang3.tuple.Triple;
-import com.clearspring.analytics.util.Lists;
-import com.google.common.collect.Maps;
-
import java.util.Formatter;
import java.util.List;
import java.util.Map;
@@ -46,14 +46,13 @@ import java.util.regex.Pattern;
*/
public class ProfileTreeBuilder {
- private static final String PROFILE_NAME_QUERY = "Query";
- private static final String PROFILE_NAME_EXECUTION = "Execution Profile";
private static final String PROFILE_NAME_DATA_STREAM_SENDER = "DataStreamSender";
private static final String PROFILE_NAME_DATA_BUFFER_SENDER = "DataBufferSender";
+ private static final String PROFILE_NAME_OLAP_TABLE_SINK = "OlapTableSink";
private static final String PROFILE_NAME_BLOCK_MGR = "BlockMgr";
private static final String PROFILE_NAME_BUFFER_POOL = "Buffer pool";
private static final String PROFILE_NAME_EXCHANGE_NODE = "EXCHANGE_NODE";
- public static final String DATA_BUFFER_SENDER_ID = "-1";
+ public static final String FINAL_SENDER_ID = "-1";
public static final String UNKNOWN_ID = "-2";
private RuntimeProfile profileRoot;
@@ -115,7 +114,7 @@ public class ProfileTreeBuilder {
public void build() throws UserException {
reset();
- unwrapProfile();
+ checkProfile();
analyzeAndBuildFragmentTrees();
assembleFragmentTrees();
}
@@ -128,25 +127,9 @@ public class ProfileTreeBuilder {
fragmentTreeRoot = null;
}
- private void unwrapProfile() throws UserException {
- while(true) {
- if (profileRoot.getName().startsWith(PROFILE_NAME_QUERY)) {
- List<Pair<RuntimeProfile, Boolean>> children = profileRoot.getChildList();
- boolean find = false;
- for (Pair<RuntimeProfile, Boolean> pair : children) {
- if (pair.first.getName().startsWith(PROFILE_NAME_EXECUTION)) {
- this.profileRoot = pair.first;
- find = true;
- break;
- }
- }
- if (!find) {
- throw new UserException("Invalid profile. Expected " + PROFILE_NAME_EXECUTION
- + " in " + PROFILE_NAME_QUERY);
- }
- } else {
- break;
- }
+ private void checkProfile() throws UserException {
+ if (!profileRoot.getName().startsWith(MultiProfileTreeBuilder.PROFILE_NAME_EXECUTION)) {
+ throw new UserException("Invalid profile. Expected " + MultiProfileTreeBuilder.PROFILE_NAME_EXECUTION);
}
}
@@ -179,7 +162,7 @@ public class ProfileTreeBuilder {
RuntimeProfile instanceProfile = fragmentChildren.get(0).first;
ProfileTreeNode instanceTreeRoot = buildSingleInstanceTree(instanceProfile, fragmentId, null);
instanceTreeRoot.setMaxInstanceActiveTime(RuntimeProfile.printCounter(maxActiveTimeNs, TUnit.TIME_NS));
- if (instanceTreeRoot.id.equals(DATA_BUFFER_SENDER_ID)) {
+ if (instanceTreeRoot.id.equals(FINAL_SENDER_ID)) {
fragmentTreeRoot = instanceTreeRoot;
}
@@ -195,7 +178,7 @@ public class ProfileTreeBuilder {
this.instanceTreeMap.put(fragmentId, instanceTrees);
}
- // If instanceId is null, which means this profile tree node is for bulding the entire fragment tree.
+ // If instanceId is null, which means this profile tree node is for building the entire fragment tree.
// So that we need to add sender and exchange node to the auxiliary structure.
private ProfileTreeNode buildSingleInstanceTree(RuntimeProfile instanceProfile, String fragmentId,
String instanceId) throws UserException {
@@ -205,7 +188,8 @@ public class ProfileTreeBuilder {
for (Pair<RuntimeProfile, Boolean> pair : instanceChildren) {
RuntimeProfile profile = pair.first;
if (profile.getName().startsWith(PROFILE_NAME_DATA_STREAM_SENDER)
- || profile.getName().startsWith(PROFILE_NAME_DATA_BUFFER_SENDER)) {
+ || profile.getName().startsWith(PROFILE_NAME_DATA_BUFFER_SENDER)
+ || profile.getName().startsWith(PROFILE_NAME_OLAP_TABLE_SINK)) {
senderNode = buildTreeNode(profile, null, fragmentId, instanceId);
if (instanceId == null) {
senderNodes.add(senderNode);
@@ -238,11 +222,11 @@ public class ProfileTreeBuilder {
// skip Buffer pool, and buffer pool does not has child
return null;
}
- boolean isDataBufferSender = name.startsWith(PROFILE_NAME_DATA_BUFFER_SENDER);
+ String finalSenderName = checkAndGetFinalSenderName(name);
Matcher m = EXEC_NODE_NAME_ID_PATTERN.matcher(name);
String extractName;
String extractId;
- if ((!m.find() && !isDataBufferSender) || m.groupCount() != 2) {
+ if ((!m.find() && finalSenderName == null) || m.groupCount() != 2) {
// DataStreamBuffer name like: "DataBufferSender (dst_fragment_instance_id=d95356f9219b4831-986b4602b41683ca):"
// So it has no id.
// Other profile should has id like:
@@ -251,8 +235,8 @@ public class ProfileTreeBuilder {
extractName = name;
extractId = UNKNOWN_ID;
} else {
- extractName = isDataBufferSender ? PROFILE_NAME_DATA_BUFFER_SENDER : m.group(1);
- extractId = isDataBufferSender ? DATA_BUFFER_SENDER_ID : m.group(2);
+ extractName = finalSenderName != null ? finalSenderName : m.group(1);
+ extractId = finalSenderName != null ? FINAL_SENDER_ID : m.group(2);
}
Counter activeCounter = profile.getCounterTotalTime();
ExecNodeNode node = new ExecNodeNode(extractName, extractId);
@@ -286,6 +270,18 @@ public class ProfileTreeBuilder {
return node;
}
+ // Check if the given node name is from final node, like DATA_BUFFER_SENDER or OLAP_TABLE_SINK
+ // If yes, return that name, if not, return null;
+ private String checkAndGetFinalSenderName(String name) {
+ if (name.startsWith(PROFILE_NAME_DATA_BUFFER_SENDER)) {
+ return PROFILE_NAME_DATA_BUFFER_SENDER;
+ } else if (name.startsWith(PROFILE_NAME_OLAP_TABLE_SINK)) {
+ return PROFILE_NAME_OLAP_TABLE_SINK;
+ } else {
+ return null;
+ }
+ }
+
private void buildCounterNode(RuntimeProfile profile, String counterName, CounterNode root) {
Map<String, TreeSet<String>> childCounterMap = profile.getChildCounterMap();
Set<String> childCounterSet = childCounterMap.get(counterName);
@@ -308,7 +304,7 @@ public class ProfileTreeBuilder {
private void assembleFragmentTrees() throws UserException {
for (ProfileTreeNode senderNode : senderNodes) {
- if (senderNode.id.equals(DATA_BUFFER_SENDER_ID)) {
+ if (senderNode.id.equals(FINAL_SENDER_ID)) {
// this is result sender, skip it.
continue;
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/util/ProfileManager.java b/fe/fe-core/src/main/java/org/apache/doris/common/util/ProfileManager.java
index 448090e..28dd935 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/util/ProfileManager.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/util/ProfileManager.java
@@ -18,9 +18,8 @@
package org.apache.doris.common.util;
import org.apache.doris.common.AnalysisException;
-import org.apache.doris.common.profile.ProfileTreeBuilder;
+import org.apache.doris.common.profile.MultiProfileTreeBuilder;
import org.apache.doris.common.profile.ProfileTreeNode;
-import org.apache.doris.common.profile.ProfileTreePrinter;
import com.google.common.base.Strings;
import com.google.common.collect.Lists;
@@ -68,6 +67,11 @@ public class ProfileManager {
public static final String SQL_STATEMENT = "Sql Statement";
public static final String IS_CACHED = "Is Cached";
+ public enum ProfileType {
+ QUERY,
+ LOAD,
+ }
+
public static final ArrayList<String> PROFILE_HEADERS = new ArrayList(
Arrays.asList(QUERY_ID, USER, DEFAULT_DB, SQL_STATEMENT, QUERY_TYPE,
START_TIME, END_TIME, TOTAL_TIME, QUERY_STATE));
@@ -75,7 +79,7 @@ public class ProfileManager {
private class ProfileElement {
public Map<String, String> infoStrings = Maps.newHashMap();
public String profileContent = "";
- public ProfileTreeBuilder builder = null;
+ public MultiProfileTreeBuilder builder = null;
public String errMsg = "";
}
@@ -113,8 +117,9 @@ public class ProfileManager {
for (String header : PROFILE_HEADERS) {
element.infoStrings.put(header, summaryProfile.getInfoString(header));
}
+ element.profileContent = profile.toString();
- ProfileTreeBuilder builder = new ProfileTreeBuilder(profile);
+ MultiProfileTreeBuilder builder = new MultiProfileTreeBuilder(profile);
try {
builder.build();
} catch (Exception e) {
@@ -122,9 +127,7 @@ public class ProfileManager {
LOG.debug("failed to build profile tree", e);
return element;
}
-
element.builder = builder;
- element.profileContent = profile.toString();
return element;
}
@@ -158,8 +161,12 @@ public class ProfileManager {
writeLock.unlock();
}
}
-
+
public List<List<String>> getAllQueries() {
+ return getQueryWithType(null);
+ }
+
+ public List<List<String>> getQueryWithType(ProfileType type) {
List<List<String>> result = Lists.newArrayList();
readLock.lock();
try {
@@ -171,9 +178,12 @@ public class ProfileManager {
continue;
}
Map<String, String> infoStrings = profileElement.infoStrings;
-
+ if (type != null && !infoStrings.get(QUERY_TYPE).equalsIgnoreCase(type.name())) {
+ continue;
+ }
+
List<String> row = Lists.newArrayList();
- for (String str : PROFILE_HEADERS ) {
+ for (String str : PROFILE_HEADERS) {
row.add(infoStrings.get(str));
}
result.add(row);
@@ -183,7 +193,7 @@ public class ProfileManager {
}
return result;
}
-
+
public String getProfile(String queryID) {
readLock.lock();
try {
@@ -191,52 +201,55 @@ public class ProfileManager {
if (element == null) {
return null;
}
-
+
return element.profileContent;
} finally {
readLock.unlock();
}
}
- public String getFragmentProfileTreeString(String queryID) {
+ public ProfileTreeNode getFragmentProfileTree(String queryID, String executionId) throws AnalysisException {
+ MultiProfileTreeBuilder builder;
readLock.lock();
try {
ProfileElement element = queryIdToProfileMap.get(queryID);
if (element == null || element.builder == null) {
- return null;
+ throw new AnalysisException("failed to get fragment profile tree. err: "
+ + (element == null ? "not found" : element.errMsg));
}
- ProfileTreeBuilder builder = element.builder;
- return builder.getFragmentTreeRoot().debugTree(0, ProfileTreePrinter.PrintLevel.INSTANCE);
- } catch (Exception e) {
- LOG.warn("failed to get profile tree", e);
- return null;
+ builder = element.builder;
} finally {
readLock.unlock();
}
+ return builder.getFragmentTreeRoot(executionId);
}
- public ProfileTreeNode getFragmentProfileTree(String queryID) throws AnalysisException {
- ProfileTreeNode tree;
+ public List<Triple<String, String, Long>> getFragmentInstanceList(String queryID, String executionId, String fragmentId)
+ throws AnalysisException {
+ MultiProfileTreeBuilder builder;
readLock.lock();
try {
ProfileElement element = queryIdToProfileMap.get(queryID);
if (element == null || element.builder == null) {
- throw new AnalysisException("failed to get fragment profile tree. err: "
+ throw new AnalysisException("failed to get instance list. err: "
+ (element == null ? "not found" : element.errMsg));
}
- return element.builder.getFragmentTreeRoot();
+ builder = element.builder;
} finally {
readLock.unlock();
}
+
+ return builder.getInstanceList(executionId, fragmentId);
}
- public List<Triple<String, String, Long>> getFragmentInstanceList(String queryID, String fragmentId) throws AnalysisException {
- ProfileTreeBuilder builder;
+ public ProfileTreeNode getInstanceProfileTree(String queryID, String executionId, String fragmentId, String instanceId)
+ throws AnalysisException {
+ MultiProfileTreeBuilder builder;
readLock.lock();
try {
ProfileElement element = queryIdToProfileMap.get(queryID);
if (element == null || element.builder == null) {
- throw new AnalysisException("failed to get instance list. err: "
+ throw new AnalysisException("failed to get instance profile tree. err: "
+ (element == null ? "not found" : element.errMsg));
}
builder = element.builder;
@@ -244,16 +257,18 @@ public class ProfileManager {
readLock.unlock();
}
- return builder.getInstanceList(fragmentId);
+ return builder.getInstanceTreeRoot(executionId, fragmentId, instanceId);
}
- public ProfileTreeNode getInstanceProfileTree(String queryID, String fragmentId, String instanceId) throws AnalysisException {
- ProfileTreeBuilder builder;
+ // Return the tasks info of the specified load job
+ // Columns: TaskId, ActiveTime
+ public List<List<String>> getLoadJobTaskList(String jobId) throws AnalysisException {
+ MultiProfileTreeBuilder builder;
readLock.lock();
try {
- ProfileElement element = queryIdToProfileMap.get(queryID);
+ ProfileElement element = queryIdToProfileMap.get(jobId);
if (element == null || element.builder == null) {
- throw new AnalysisException("failed to get instance profile tree. err: "
+ throw new AnalysisException("failed to get task ids. err: "
+ (element == null ? "not found" : element.errMsg));
}
builder = element.builder;
@@ -261,6 +276,6 @@ public class ProfileManager {
readLock.unlock();
}
- return builder.getInstanceTreeRoot(fragmentId, instanceId);
+ return builder.getSubTaskInfo();
}
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java
index bb3188e..e001b87 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java
@@ -326,8 +326,6 @@ public class BrokerLoadJob extends BulkLoadJob {
// Add the summary profile to the first
jobProfile.addFirstChild(summaryProfile);
jobProfile.computeTimeInChildProfile();
- StringBuilder builder = new StringBuilder();
- jobProfile.prettyPrint(builder, "");
ProfileManager.getInstance().pushProfile(jobProfile);
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadLoadingTask.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadLoadingTask.java
index 2e0c7bb..4e0054c 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadLoadingTask.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadLoadingTask.java
@@ -68,7 +68,6 @@ public class LoadLoadingTask extends LoadTask {
private LoadingTaskPlanner planner;
private RuntimeProfile jobProfile;
- private RuntimeProfile profile;
private long beginTime;
public LoadLoadingTask(Database db, OlapTable table,
@@ -172,17 +171,15 @@ public class LoadLoadingTask extends LoadTask {
return jobDeadlineMs - System.currentTimeMillis();
}
- public void createProfile(Coordinator coord) {
+ private void createProfile(Coordinator coord) {
if (jobProfile == null) {
// No need to gather profile
return;
}
// Summary profile
- profile = new RuntimeProfile("LoadTask: " + DebugUtil.printId(loadId));
coord.getQueryProfile().getCounterTotalTime().setValue(TimeUtils.getEstimatedTime(beginTime));
coord.endProfile();
- profile.addChild(coord.getQueryProfile());
- jobProfile.addChild(profile);
+ jobProfile.addChild(coord.getQueryProfile());
}
@Override
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/DataPartition.java b/fe/fe-core/src/main/java/org/apache/doris/planner/DataPartition.java
index a8ee29a..fabb260 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/DataPartition.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/DataPartition.java
@@ -131,6 +131,9 @@ public class DataPartition {
public String getExplainString(TExplainLevel explainLevel) {
StringBuilder str = new StringBuilder();
str.append(type.toString());
+ if (explainLevel == TExplainLevel.BRIEF) {
+ return str.toString();
+ }
if (!partitionExprs.isEmpty()) {
List<String> strings = Lists.newArrayList();
for (Expr expr : partitionExprs) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/PlanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/PlanNode.java
index dc53c44..983a1a7 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/PlanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/PlanNode.java
@@ -17,7 +17,6 @@
package org.apache.doris.planner;
-import com.google.common.base.Joiner;
import org.apache.doris.analysis.Analyzer;
import org.apache.doris.analysis.Expr;
import org.apache.doris.analysis.ExprId;
@@ -32,6 +31,7 @@ import org.apache.doris.thrift.TExplainLevel;
import org.apache.doris.thrift.TPlan;
import org.apache.doris.thrift.TPlanNode;
+import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
import com.google.common.base.Predicates;
import com.google.common.collect.Lists;
@@ -658,7 +658,6 @@ abstract public class PlanNode extends TreeNode<PlanNode> {
}
}
-
/**
* Returns the estimated combined selectivity of all conjuncts. Uses heuristics to
* address the following estimation challenges:
@@ -739,6 +738,14 @@ abstract public class PlanNode extends TreeNode<PlanNode> {
}
}
+ public String getPlanTreeExplanStr() {
+ StringBuilder sb = new StringBuilder();
+ sb.append("[").append(getId().asInt()).append(": ").append(getPlanNodeName()).append("]");
+ sb.append("\n[Fragment: ").append(getFragmentId().asInt()).append("]");
+ sb.append("\n").append(getNodeExplainString("", TExplainLevel.BRIEF));
+ return sb.toString();
+ }
+
public ScanNode getScanNodeInOneFragmentByTupleId(TupleId tupleId) {
if (this instanceof ScanNode && tupleIds.contains(tupleId)) {
return (ScanNode) this;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/ShowExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/ShowExecutor.java
index 8c54369..92d93ae 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/ShowExecutor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/ShowExecutor.java
@@ -47,6 +47,7 @@ import org.apache.doris.analysis.ShowFunctionsStmt;
import org.apache.doris.analysis.ShowGrantsStmt;
import org.apache.doris.analysis.ShowIndexStmt;
import org.apache.doris.analysis.ShowEncryptKeysStmt;
+import org.apache.doris.analysis.ShowLoadProfileStmt;
import org.apache.doris.analysis.ShowLoadStmt;
import org.apache.doris.analysis.ShowLoadWarningsStmt;
import org.apache.doris.analysis.ShowMigrationsStmt;
@@ -296,6 +297,8 @@ public class ShowExecutor {
handleShowPlugins();
} else if (stmt instanceof ShowQueryProfileStmt) {
handleShowQueryProfile();
+ } else if (stmt instanceof ShowLoadProfileStmt) {
+ handleShowLoadProfile();
} else {
handleEmtpy();
}
@@ -1877,10 +1880,11 @@ public class ShowExecutor {
List<List<String>> rows = Lists.newArrayList();
switch (pathType) {
case QUERY_IDS:
- rows = ProfileManager.getInstance().getAllQueries();
+ rows = ProfileManager.getInstance().getQueryWithType(ProfileManager.ProfileType.QUERY);
break;
case FRAGMETNS: {
- ProfileTreeNode treeRoot = ProfileManager.getInstance().getFragmentProfileTree(showStmt.getQueryId());
+ ProfileTreeNode treeRoot = ProfileManager.getInstance().getFragmentProfileTree(showStmt.getQueryId(),
+ showStmt.getQueryId());
if (treeRoot == null) {
throw new AnalysisException("Failed to get fragment tree for query: " + showStmt.getQueryId());
}
@@ -1889,8 +1893,11 @@ public class ShowExecutor {
break;
}
case INSTANCES: {
+ // For query profile, there should be only one execution profile,
+ // And the execution id is same as query id
List<Triple<String, String, Long>> instanceList
- = ProfileManager.getInstance().getFragmentInstanceList(showStmt.getQueryId(), showStmt.getFragmentId());
+ = ProfileManager.getInstance().getFragmentInstanceList(
+ showStmt.getQueryId(), showStmt.getQueryId(), showStmt.getFragmentId());
if (instanceList == null) {
throw new AnalysisException("Failed to get instance list for fragment: " + showStmt.getFragmentId());
}
@@ -1902,8 +1909,57 @@ public class ShowExecutor {
break;
}
case SINGLE_INSTANCE: {
+ // For query profile, there should be only one execution profile,
+ // And the execution id is same as query id
ProfileTreeNode treeRoot = ProfileManager.getInstance().getInstanceProfileTree(showStmt.getQueryId(),
- showStmt.getFragmentId(), showStmt.getInstanceId());
+ showStmt.getQueryId(), showStmt.getFragmentId(), showStmt.getInstanceId());
+ if (treeRoot == null) {
+ throw new AnalysisException("Failed to get instance tree for instance: " + showStmt.getInstanceId());
+ }
+ List<String> row = Lists.newArrayList(ProfileTreePrinter.printInstanceTree(treeRoot));
+ rows.add(row);
+ break;
+ }
+ default:
+ break;
+ }
+
+ resultSet = new ShowResultSet(showStmt.getMetaData(), rows);
+ }
+
+ private void handleShowLoadProfile() throws AnalysisException {
+ ShowLoadProfileStmt showStmt = (ShowLoadProfileStmt) stmt;
+ ShowLoadProfileStmt.PathType pathType = showStmt.getPathType();
+ List<List<String>> rows = Lists.newArrayList();
+ switch (pathType) {
+ case JOB_IDS:
+ rows = ProfileManager.getInstance().getQueryWithType(ProfileManager.ProfileType.LOAD);
+ break;
+ case TASK_IDS: {
+ rows = ProfileManager.getInstance().getLoadJobTaskList(showStmt.getJobId());
+ break;
+ }
+ case INSTANCES: {
+ // For load profile, there should be only one fragment in each execution profile
+ // And the fragment id is 0.
+ List<Triple<String, String, Long>> instanceList
+ = ProfileManager.getInstance().getFragmentInstanceList(showStmt.getJobId(),
+ showStmt.getTaskId(), "0");
+ if (instanceList == null) {
+ throw new AnalysisException("Failed to get instance list for task: " + showStmt.getTaskId());
+ }
+ for (Triple<String, String, Long> triple : instanceList) {
+ List<String> row = Lists.newArrayList(triple.getLeft(), triple.getMiddle(),
+ RuntimeProfile.printCounter(triple.getRight(), TUnit.TIME_NS));
+ rows.add(row);
+ }
+ break;
+ }
+ case SINGLE_INSTANCE: {
+ // For load profile, there should be only one fragment in each execution profile.
+ // And the fragment id is 0.
+ ProfileTreeNode treeRoot = ProfileManager.getInstance().getInstanceProfileTree(showStmt.getJobId(),
+ showStmt.getTaskId(), "0", showStmt.getInstanceId());
if (treeRoot == null) {
throw new AnalysisException("Failed to get instance tree for instance: " + showStmt.getInstanceId());
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/task/ExportExportingTask.java b/fe/fe-core/src/main/java/org/apache/doris/task/ExportExportingTask.java
index 9b72d5d..2adfb23 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/task/ExportExportingTask.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/task/ExportExportingTask.java
@@ -252,7 +252,7 @@ public class ExportExportingTask extends MasterTask {
}
private void initProfile() {
- profile = new RuntimeProfile("Query");
+ profile = new RuntimeProfile("ExportJob");
RuntimeProfile summaryProfile = new RuntimeProfile("Summary");
summaryProfile.addInfoString(ProfileManager.QUERY_ID, String.valueOf(job.getId()));
summaryProfile.addInfoString(ProfileManager.START_TIME, TimeUtils.longToTimeString(job.getStartTimeMs()));
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org