You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by mo...@apache.org on 2023/04/04 17:02:02 UTC
[doris] branch master updated: [fix](streamload) fix stream load failed when enable profile (#18364)
This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 7f8d92656e [fix](streamload) fix stream load failed when enable profile (#18364)
7f8d92656e is described below
commit 7f8d92656e80c5a507e59dee6bbd70a55cdf6695
Author: gitccl <60...@users.noreply.github.com>
AuthorDate: Wed Apr 5 01:01:46 2023 +0800
[fix](streamload) fix stream load failed when enable profile (#18364)
#18015 enabled the stream load profile log; however, BE then encounters an RPC failure when loading TPC-H data (see #18291). This happens because, when `is_report_success` is true, BE calls reportExecStatus on FE, but FE cannot find the QueryInfo in `coordinatorMap` and therefore returns an error to BE.
---
be/src/common/config.h | 2 --
be/src/http/action/stream_load.cpp | 13 +++++++------
be/src/http/http_common.h | 1 +
be/src/runtime/fragment_mgr.cpp | 1 +
docs/en/docs/admin-manual/config/be-config.md | 7 -------
.../data-operate/import/import-way/stream-load-manual.md | 6 ++++++
docs/zh-CN/docs/admin-manual/config/be-config.md | 7 -------
.../data-operate/import/import-way/stream-load-manual.md | 6 ++++++
.../java/org/apache/doris/planner/StreamLoadPlanner.java | 3 +--
.../src/main/java/org/apache/doris/qe/QeProcessorImpl.java | 11 +++++++++--
.../src/main/java/org/apache/doris/task/LoadTaskInfo.java | 4 ++++
.../src/main/java/org/apache/doris/task/StreamLoadTask.java | 10 +++++++++-
gensrc/thrift/FrontendService.thrift | 3 +++
13 files changed, 47 insertions(+), 27 deletions(-)
diff --git a/be/src/common/config.h b/be/src/common/config.h
index 8f34155280..63d785b41c 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -420,8 +420,6 @@ CONF_mInt32(stream_load_record_batch_size, "50");
CONF_Int32(stream_load_record_expire_time_secs, "28800");
// time interval to clean expired stream load records
CONF_mInt64(clean_stream_load_record_interval_secs, "1800");
-// Whether to enable stream load profile to be printed to the log, the default is false.
-CONF_mBool(enable_stream_load_profile_log, "false");
// OlapTableSink sender's send interval, should be less than the real response time of a tablet writer rpc.
// You may need to lower the speed when the sink receiver bes are too busy.
diff --git a/be/src/http/action/stream_load.cpp b/be/src/http/action/stream_load.cpp
index e69d22fc45..2520b3a5f4 100644
--- a/be/src/http/action/stream_load.cpp
+++ b/be/src/http/action/stream_load.cpp
@@ -581,6 +581,13 @@ Status StreamLoadAction::_process_put(HttpRequest* http_req,
if (!http_req->header(HTTP_SKIP_LINES).empty()) {
request.__set_skip_lines(std::stoi(http_req->header(HTTP_SKIP_LINES)));
}
+ if (!http_req->header(HTTP_ENABLE_PROFILE).empty()) {
+ if (iequal(http_req->header(HTTP_ENABLE_PROFILE), "true")) {
+ request.__set_enable_profile(true);
+ } else {
+ request.__set_enable_profile(false);
+ }
+ }
#ifndef BE_TEST
// plan this load
@@ -601,12 +608,6 @@ Status StreamLoadAction::_process_put(HttpRequest* http_req,
return plan_status;
}
- auto& query_options = ctx->put_result.params.query_options;
- if (query_options.__isset.is_report_success && query_options.is_report_success &&
- !config::enable_stream_load_profile_log) {
- query_options.is_report_success = false;
- }
-
VLOG_NOTICE << "params is " << apache::thrift::ThriftDebugString(ctx->put_result.params);
// if we not use streaming, we must download total content before we begin
// to process this load
diff --git a/be/src/http/http_common.h b/be/src/http/http_common.h
index 6dbf93609e..2295731d6a 100644
--- a/be/src/http/http_common.h
+++ b/be/src/http/http_common.h
@@ -54,6 +54,7 @@ static const std::string HTTP_HIDDEN_COLUMNS = "hidden_columns";
static const std::string HTTP_TRIM_DOUBLE_QUOTES = "trim_double_quotes";
static const std::string HTTP_SKIP_LINES = "skip_lines";
static const std::string HTTP_COMMENT = "comment";
+static const std::string HTTP_ENABLE_PROFILE = "enable_profile";
static const std::string HTTP_TWO_PHASE_COMMIT = "two_phase_commit";
static const std::string HTTP_TXN_ID_KEY = "txn_id";
diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp
index 1558e9b855..f683828c5d 100755
--- a/be/src/runtime/fragment_mgr.cpp
+++ b/be/src/runtime/fragment_mgr.cpp
@@ -354,6 +354,7 @@ void FragmentMgr::coordinator_callback(const ReportStatusRequest& req) {
params.__set_fragment_id(req.fragment_id);
exec_status.set_t_status(&params);
params.__set_done(req.done);
+ params.__set_query_type(req.runtime_state->query_type());
DCHECK(req.runtime_state != nullptr);
if (req.runtime_state->query_type() == TQueryType::LOAD && !req.done && req.status.ok()) {
diff --git a/docs/en/docs/admin-manual/config/be-config.md b/docs/en/docs/admin-manual/config/be-config.md
index 62904d50f8..62091324ac 100644
--- a/docs/en/docs/admin-manual/config/be-config.md
+++ b/docs/en/docs/admin-manual/config/be-config.md
@@ -769,13 +769,6 @@ Metrics: {"filtered_rows":0,"input_row_num":3346807,"input_rowsets_count":42,"in
* Default value: 100
* Dynamically modifiable: Yes
-#### `enable_stream_load_profile_log`
-
-* Type: bool
-* Description: Whether to enable stream load profile to be printed to the log.
-* Default value: false
-* Dynamically modifiable: Yes
-
### Thread
#### `delete_worker_count`
diff --git a/docs/en/docs/data-operate/import/import-way/stream-load-manual.md b/docs/en/docs/data-operate/import/import-way/stream-load-manual.md
index 91f696b3dd..4fa8494eae 100644
--- a/docs/en/docs/data-operate/import/import-way/stream-load-manual.md
+++ b/docs/en/docs/data-operate/import/import-way/stream-load-manual.md
@@ -181,6 +181,12 @@ The number of rows in the original file = `dpp.abnorm.ALL + dpp.norm.ALL`
Stream load import can enable two-stage transaction commit mode: in the stream load process, the data is written and the information is returned to the user. At this time, the data is invisible and the transaction status is `PRECOMMITTED`. After the user manually triggers the commit operation, the data is visible.
++ enable_profile
+ <version since="1.2.4">
+ </version>
+
+ When `enable_profile` is true, the Stream Load profile will be printed to the log. Otherwise, it will not be printed.
+
Example:
1. Initiate a stream load pre-commit operation
diff --git a/docs/zh-CN/docs/admin-manual/config/be-config.md b/docs/zh-CN/docs/admin-manual/config/be-config.md
index f91c63ec8b..ac3f47043d 100644
--- a/docs/zh-CN/docs/admin-manual/config/be-config.md
+++ b/docs/zh-CN/docs/admin-manual/config/be-config.md
@@ -783,13 +783,6 @@ Metrics: {"filtered_rows":0,"input_row_num":3346807,"input_rowsets_count":42,"in
* 默认值: 100
* 可动态修改:是
-#### `enable_stream_load_profile_log`
-
-* 类型:bool
-* 描述:是否将 stream load profile 打印到日志。
-* 默认值: false
-* 可动态修改:是
-
### 线程
#### `delete_worker_count`
diff --git a/docs/zh-CN/docs/data-operate/import/import-way/stream-load-manual.md b/docs/zh-CN/docs/data-operate/import/import-way/stream-load-manual.md
index bf47e8c8f1..225d5b8544 100644
--- a/docs/zh-CN/docs/data-operate/import/import-way/stream-load-manual.md
+++ b/docs/zh-CN/docs/data-operate/import/import-way/stream-load-manual.md
@@ -191,6 +191,12 @@ Stream Load 由于使用的是 HTTP 协议,所以所有导入任务有关的
Stream load 导入可以开启两阶段事务提交模式:在Stream load过程中,数据写入完成即会返回信息给用户,此时数据不可见,事务状态为`PRECOMMITTED`,用户手动触发commit操作之后,数据才可见。
+- enable_profile
+ <version since="1.2.4">
+ </version>
+
+ 当 `enable_profile` 为 true 时,Stream Load profile将会打印到日志中。否则不会打印。
+
示例:
1. 发起stream load预提交操作
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java b/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java
index 16b45d683c..f62c6eeee1 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java
@@ -47,7 +47,6 @@ import org.apache.doris.load.BrokerFileGroup;
import org.apache.doris.load.loadv2.LoadTask;
import org.apache.doris.load.routineload.RoutineLoadJob;
import org.apache.doris.planner.external.ExternalFileScanNode;
-import org.apache.doris.qe.VariableMgr;
import org.apache.doris.service.FrontendOptions;
import org.apache.doris.task.LoadTaskInfo;
import org.apache.doris.thrift.PaloInternalServiceVersion;
@@ -276,7 +275,7 @@ public class StreamLoadPlanner {
queryOptions.setEnableVectorizedEngine(Config.enable_vectorized_load);
queryOptions.setEnablePipelineEngine(Config.enable_pipeline_load);
queryOptions.setBeExecVersion(Config.be_exec_version);
- queryOptions.setIsReportSuccess(VariableMgr.newSessionVariable().enableProfile());
+ queryOptions.setIsReportSuccess(taskInfo.getEnableProfile());
params.setQueryOptions(queryOptions);
TQueryGlobals queryGlobals = new TQueryGlobals();
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/QeProcessorImpl.java b/fe/fe-core/src/main/java/org/apache/doris/qe/QeProcessorImpl.java
index a5611ea4ca..9b20fe4f7f 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/QeProcessorImpl.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/QeProcessorImpl.java
@@ -24,6 +24,7 @@ import org.apache.doris.common.util.DebugUtil;
import org.apache.doris.common.util.ProfileWriter;
import org.apache.doris.metric.MetricRepo;
import org.apache.doris.thrift.TNetworkAddress;
+import org.apache.doris.thrift.TQueryType;
import org.apache.doris.thrift.TReportExecStatusParams;
import org.apache.doris.thrift.TReportExecStatusResult;
import org.apache.doris.thrift.TStatus;
@@ -190,8 +191,14 @@ public final class QeProcessorImpl implements QeProcessor {
final QueryInfo info = coordinatorMap.get(params.query_id);
if (info == null) {
- result.setStatus(new TStatus(TStatusCode.RUNTIME_ERROR));
- LOG.info("ReportExecStatus() runtime error, query {} does not exist", DebugUtil.printId(params.query_id));
+ // There is no QueryInfo for StreamLoad, so we return OK
+ if (params.query_type == TQueryType.LOAD) {
+ result.setStatus(new TStatus(TStatusCode.OK));
+ } else {
+ result.setStatus(new TStatus(TStatusCode.RUNTIME_ERROR));
+ }
+ LOG.info("ReportExecStatus() runtime error, query {} with type {} does not exist",
+ DebugUtil.printId(params.query_id), params.query_type);
return result;
}
try {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/task/LoadTaskInfo.java b/fe/fe-core/src/main/java/org/apache/doris/task/LoadTaskInfo.java
index cb938e84cb..6e3133fbde 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/task/LoadTaskInfo.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/task/LoadTaskInfo.java
@@ -105,6 +105,10 @@ public interface LoadTaskInfo {
return 0;
}
+ default boolean getEnableProfile() {
+ return false;
+ }
+
class ImportColumnDescs {
public List<ImportColumnDesc> descs = Lists.newArrayList();
public boolean isColumnDescsRewrited = false;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/task/StreamLoadTask.java b/fe/fe-core/src/main/java/org/apache/doris/task/StreamLoadTask.java
index 868835fff8..2d28cc3b5c 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/task/StreamLoadTask.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/task/StreamLoadTask.java
@@ -83,8 +83,8 @@ public class StreamLoadTask implements LoadTaskInfo {
private String headerType = "";
private List<String> hiddenColumns;
private boolean trimDoubleQuotes = false;
-
private int skipLines = 0;
+ private boolean enableProfile = false;
public StreamLoadTask(TUniqueId id, long txnId, TFileType fileType, TFileFormatType formatType,
TFileCompressType compressType) {
@@ -263,6 +263,11 @@ public class StreamLoadTask implements LoadTaskInfo {
return skipLines;
}
+ @Override
+ public boolean getEnableProfile() {
+ return enableProfile;
+ }
+
public static StreamLoadTask fromTStreamLoadPutRequest(TStreamLoadPutRequest request) throws UserException {
StreamLoadTask streamLoadTask = new StreamLoadTask(request.getLoadId(), request.getTxnId(),
request.getFileType(), request.getFormatType(),
@@ -368,6 +373,9 @@ public class StreamLoadTask implements LoadTaskInfo {
if (request.isSetSkipLines()) {
skipLines = request.getSkipLines();
}
+ if (request.isSetEnableProfile()) {
+ enableProfile = request.isEnableProfile();
+ }
}
// used for stream load
diff --git a/gensrc/thrift/FrontendService.thrift b/gensrc/thrift/FrontendService.thrift
index e8630b05cb..f1cc722723 100644
--- a/gensrc/thrift/FrontendService.thrift
+++ b/gensrc/thrift/FrontendService.thrift
@@ -429,6 +429,8 @@ struct TReportExecStatusParams {
18: optional list<Types.TErrorTabletInfo> errorTabletInfos
19: optional i32 fragment_id
+
+ 20: optional PaloInternalService.TQueryType query_type
}
struct TFeResult {
@@ -570,6 +572,7 @@ struct TStreamLoadPutRequest {
41: optional i64 file_size // only for stream load with parquet or orc
42: optional bool trim_double_quotes // trim double quotes for csv
43: optional i32 skip_lines // csv skip line num, only used when csv header_type is not set.
+ 44: optional bool enable_profile
}
struct TStreamLoadPutResult {
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org