You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by mo...@apache.org on 2023/04/12 17:03:01 UTC

[doris] 17/33: [fix](streamload) fix stream load failed when enable profile (#18364)

This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch doris-for-zhongjin
in repository https://gitbox.apache.org/repos/asf/doris.git

commit 5868083b0b60e3c2783338054afa4236d649c90b
Author: gitccl <60...@users.noreply.github.com>
AuthorDate: Wed Apr 5 01:01:46 2023 +0800

    [fix](streamload) fix stream load failed when enable profile (#18364)
    
    #18015 enabled the stream load profile log; however, BE encounters an RPC failure when loading TPC-H data (see #18291). This is because when `is_report_success` is true, BE calls reportExecStatus on FE, but FE cannot find the QueryInfo in `coordinatorMap`, so it returns an error to BE.
---
 be/src/common/config.h                                      |  2 --
 be/src/http/action/stream_load.cpp                          | 13 +++++++------
 be/src/http/http_common.h                                   |  1 +
 be/src/runtime/fragment_mgr.cpp                             |  1 +
 docs/en/docs/admin-manual/config/be-config.md               |  7 -------
 .../data-operate/import/import-way/stream-load-manual.md    |  6 ++++++
 docs/zh-CN/docs/admin-manual/config/be-config.md            |  7 -------
 .../data-operate/import/import-way/stream-load-manual.md    |  6 ++++++
 .../java/org/apache/doris/planner/StreamLoadPlanner.java    |  3 +--
 .../src/main/java/org/apache/doris/qe/QeProcessorImpl.java  | 11 +++++++++--
 .../src/main/java/org/apache/doris/task/LoadTaskInfo.java   |  4 ++++
 .../src/main/java/org/apache/doris/task/StreamLoadTask.java | 10 +++++++++-
 gensrc/thrift/FrontendService.thrift                        |  3 +++
 13 files changed, 47 insertions(+), 27 deletions(-)

diff --git a/be/src/common/config.h b/be/src/common/config.h
index 8f34155280..63d785b41c 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -420,8 +420,6 @@ CONF_mInt32(stream_load_record_batch_size, "50");
 CONF_Int32(stream_load_record_expire_time_secs, "28800");
 // time interval to clean expired stream load records
 CONF_mInt64(clean_stream_load_record_interval_secs, "1800");
-// Whether to enable stream load profile to be printed to the log, the default is false.
-CONF_mBool(enable_stream_load_profile_log, "false");
 
 // OlapTableSink sender's send interval, should be less than the real response time of a tablet writer rpc.
 // You may need to lower the speed when the sink receiver bes are too busy.
diff --git a/be/src/http/action/stream_load.cpp b/be/src/http/action/stream_load.cpp
index e69d22fc45..2520b3a5f4 100644
--- a/be/src/http/action/stream_load.cpp
+++ b/be/src/http/action/stream_load.cpp
@@ -581,6 +581,13 @@ Status StreamLoadAction::_process_put(HttpRequest* http_req,
     if (!http_req->header(HTTP_SKIP_LINES).empty()) {
         request.__set_skip_lines(std::stoi(http_req->header(HTTP_SKIP_LINES)));
     }
+    if (!http_req->header(HTTP_ENABLE_PROFILE).empty()) {
+        if (iequal(http_req->header(HTTP_ENABLE_PROFILE), "true")) {
+            request.__set_enable_profile(true);
+        } else {
+            request.__set_enable_profile(false);
+        }
+    }
 
 #ifndef BE_TEST
     // plan this load
@@ -601,12 +608,6 @@ Status StreamLoadAction::_process_put(HttpRequest* http_req,
         return plan_status;
     }
 
-    auto& query_options = ctx->put_result.params.query_options;
-    if (query_options.__isset.is_report_success && query_options.is_report_success &&
-        !config::enable_stream_load_profile_log) {
-        query_options.is_report_success = false;
-    }
-
     VLOG_NOTICE << "params is " << apache::thrift::ThriftDebugString(ctx->put_result.params);
     // if we not use streaming, we must download total content before we begin
     // to process this load
diff --git a/be/src/http/http_common.h b/be/src/http/http_common.h
index 6dbf93609e..2295731d6a 100644
--- a/be/src/http/http_common.h
+++ b/be/src/http/http_common.h
@@ -54,6 +54,7 @@ static const std::string HTTP_HIDDEN_COLUMNS = "hidden_columns";
 static const std::string HTTP_TRIM_DOUBLE_QUOTES = "trim_double_quotes";
 static const std::string HTTP_SKIP_LINES = "skip_lines";
 static const std::string HTTP_COMMENT = "comment";
+static const std::string HTTP_ENABLE_PROFILE = "enable_profile";
 
 static const std::string HTTP_TWO_PHASE_COMMIT = "two_phase_commit";
 static const std::string HTTP_TXN_ID_KEY = "txn_id";
diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp
index 1558e9b855..f683828c5d 100755
--- a/be/src/runtime/fragment_mgr.cpp
+++ b/be/src/runtime/fragment_mgr.cpp
@@ -354,6 +354,7 @@ void FragmentMgr::coordinator_callback(const ReportStatusRequest& req) {
     params.__set_fragment_id(req.fragment_id);
     exec_status.set_t_status(&params);
     params.__set_done(req.done);
+    params.__set_query_type(req.runtime_state->query_type());
 
     DCHECK(req.runtime_state != nullptr);
     if (req.runtime_state->query_type() == TQueryType::LOAD && !req.done && req.status.ok()) {
diff --git a/docs/en/docs/admin-manual/config/be-config.md b/docs/en/docs/admin-manual/config/be-config.md
index 62904d50f8..62091324ac 100644
--- a/docs/en/docs/admin-manual/config/be-config.md
+++ b/docs/en/docs/admin-manual/config/be-config.md
@@ -769,13 +769,6 @@ Metrics: {"filtered_rows":0,"input_row_num":3346807,"input_rowsets_count":42,"in
 * Default value: 100
 * Dynamically modifiable: Yes
 
-#### `enable_stream_load_profile_log`
-
-* Type: bool
-* Description: Whether to enable stream load profile to be printed to the log.
-* Default value: false
-* Dynamically modifiable: Yes
-
 ### Thread
 
 #### `delete_worker_count`
diff --git a/docs/en/docs/data-operate/import/import-way/stream-load-manual.md b/docs/en/docs/data-operate/import/import-way/stream-load-manual.md
index 91f696b3dd..4fa8494eae 100644
--- a/docs/en/docs/data-operate/import/import-way/stream-load-manual.md
+++ b/docs/en/docs/data-operate/import/import-way/stream-load-manual.md
@@ -181,6 +181,12 @@ The number of rows in the original file = `dpp.abnorm.ALL + dpp.norm.ALL`
 
   Stream load import can enable two-stage transaction commit mode: in the stream load process, the data is written and the information is returned to the user. At this time, the data is invisible and the transaction status is `PRECOMMITTED`. After the user manually triggers the commit operation, the data is visible.
 
++ enable_profile
+  <version since="1.2.4">
+  </version>
+
+  When `enable_profile` is true, the Stream Load profile is printed to the log; otherwise, it is not printed.
+
   Example:
 
     1. Initiate a stream load pre-commit operation
diff --git a/docs/zh-CN/docs/admin-manual/config/be-config.md b/docs/zh-CN/docs/admin-manual/config/be-config.md
index f91c63ec8b..ac3f47043d 100644
--- a/docs/zh-CN/docs/admin-manual/config/be-config.md
+++ b/docs/zh-CN/docs/admin-manual/config/be-config.md
@@ -783,13 +783,6 @@ Metrics: {"filtered_rows":0,"input_row_num":3346807,"input_rowsets_count":42,"in
 * 默认值: 100
 * 可动态修改:是
 
-#### `enable_stream_load_profile_log`
-
-* 类型:bool
-* 描述:是否将 stream load profile 打印到日志。
-* 默认值: false
-* 可动态修改:是
-
 ### 线程
 
 #### `delete_worker_count`
diff --git a/docs/zh-CN/docs/data-operate/import/import-way/stream-load-manual.md b/docs/zh-CN/docs/data-operate/import/import-way/stream-load-manual.md
index bf47e8c8f1..225d5b8544 100644
--- a/docs/zh-CN/docs/data-operate/import/import-way/stream-load-manual.md
+++ b/docs/zh-CN/docs/data-operate/import/import-way/stream-load-manual.md
@@ -191,6 +191,12 @@ Stream Load 由于使用的是 HTTP 协议,所以所有导入任务有关的
 
   Stream load 导入可以开启两阶段事务提交模式:在Stream load过程中,数据写入完成即会返回信息给用户,此时数据不可见,事务状态为`PRECOMMITTED`,用户手动触发commit操作之后,数据才可见。
 
+- enable_profile
+  <version since="1.2.4">
+  </version>
+
+  当 `enable_profile` 为 true 时,Stream Load profile将会打印到日志中。否则不会打印。
+
   示例:
 
   1. 发起stream load预提交操作
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java b/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java
index 16b45d683c..f62c6eeee1 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java
@@ -47,7 +47,6 @@ import org.apache.doris.load.BrokerFileGroup;
 import org.apache.doris.load.loadv2.LoadTask;
 import org.apache.doris.load.routineload.RoutineLoadJob;
 import org.apache.doris.planner.external.ExternalFileScanNode;
-import org.apache.doris.qe.VariableMgr;
 import org.apache.doris.service.FrontendOptions;
 import org.apache.doris.task.LoadTaskInfo;
 import org.apache.doris.thrift.PaloInternalServiceVersion;
@@ -276,7 +275,7 @@ public class StreamLoadPlanner {
         queryOptions.setEnableVectorizedEngine(Config.enable_vectorized_load);
         queryOptions.setEnablePipelineEngine(Config.enable_pipeline_load);
         queryOptions.setBeExecVersion(Config.be_exec_version);
-        queryOptions.setIsReportSuccess(VariableMgr.newSessionVariable().enableProfile());
+        queryOptions.setIsReportSuccess(taskInfo.getEnableProfile());
 
         params.setQueryOptions(queryOptions);
         TQueryGlobals queryGlobals = new TQueryGlobals();
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/QeProcessorImpl.java b/fe/fe-core/src/main/java/org/apache/doris/qe/QeProcessorImpl.java
index a5611ea4ca..9b20fe4f7f 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/QeProcessorImpl.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/QeProcessorImpl.java
@@ -24,6 +24,7 @@ import org.apache.doris.common.util.DebugUtil;
 import org.apache.doris.common.util.ProfileWriter;
 import org.apache.doris.metric.MetricRepo;
 import org.apache.doris.thrift.TNetworkAddress;
+import org.apache.doris.thrift.TQueryType;
 import org.apache.doris.thrift.TReportExecStatusParams;
 import org.apache.doris.thrift.TReportExecStatusResult;
 import org.apache.doris.thrift.TStatus;
@@ -190,8 +191,14 @@ public final class QeProcessorImpl implements QeProcessor {
         final QueryInfo info = coordinatorMap.get(params.query_id);
 
         if (info == null) {
-            result.setStatus(new TStatus(TStatusCode.RUNTIME_ERROR));
-            LOG.info("ReportExecStatus() runtime error, query {} does not exist", DebugUtil.printId(params.query_id));
+            // There is no QueryInfo for StreamLoad, so we return OK
+            if (params.query_type == TQueryType.LOAD) {
+                result.setStatus(new TStatus(TStatusCode.OK));
+            } else {
+                result.setStatus(new TStatus(TStatusCode.RUNTIME_ERROR));
+            }
+            LOG.info("ReportExecStatus() runtime error, query {} with type {} does not exist",
+                    DebugUtil.printId(params.query_id), params.query_type);
             return result;
         }
         try {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/task/LoadTaskInfo.java b/fe/fe-core/src/main/java/org/apache/doris/task/LoadTaskInfo.java
index cb938e84cb..6e3133fbde 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/task/LoadTaskInfo.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/task/LoadTaskInfo.java
@@ -105,6 +105,10 @@ public interface LoadTaskInfo {
         return 0;
     }
 
+    default boolean getEnableProfile() {
+        return false;
+    }
+
     class ImportColumnDescs {
         public List<ImportColumnDesc> descs = Lists.newArrayList();
         public boolean isColumnDescsRewrited = false;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/task/StreamLoadTask.java b/fe/fe-core/src/main/java/org/apache/doris/task/StreamLoadTask.java
index 868835fff8..2d28cc3b5c 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/task/StreamLoadTask.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/task/StreamLoadTask.java
@@ -83,8 +83,8 @@ public class StreamLoadTask implements LoadTaskInfo {
     private String headerType = "";
     private List<String> hiddenColumns;
     private boolean trimDoubleQuotes = false;
-
     private int skipLines = 0;
+    private boolean enableProfile = false;
 
     public StreamLoadTask(TUniqueId id, long txnId, TFileType fileType, TFileFormatType formatType,
             TFileCompressType compressType) {
@@ -263,6 +263,11 @@ public class StreamLoadTask implements LoadTaskInfo {
         return skipLines;
     }
 
+    @Override
+    public boolean getEnableProfile() {
+        return enableProfile;
+    }
+
     public static StreamLoadTask fromTStreamLoadPutRequest(TStreamLoadPutRequest request) throws UserException {
         StreamLoadTask streamLoadTask = new StreamLoadTask(request.getLoadId(), request.getTxnId(),
                 request.getFileType(), request.getFormatType(),
@@ -368,6 +373,9 @@ public class StreamLoadTask implements LoadTaskInfo {
         if (request.isSetSkipLines()) {
             skipLines = request.getSkipLines();
         }
+        if (request.isSetEnableProfile()) {
+            enableProfile = request.isEnableProfile();
+        }
     }
 
     // used for stream load
diff --git a/gensrc/thrift/FrontendService.thrift b/gensrc/thrift/FrontendService.thrift
index e8630b05cb..f1cc722723 100644
--- a/gensrc/thrift/FrontendService.thrift
+++ b/gensrc/thrift/FrontendService.thrift
@@ -429,6 +429,8 @@ struct TReportExecStatusParams {
   18: optional list<Types.TErrorTabletInfo> errorTabletInfos
 
   19: optional i32 fragment_id
+
+  20: optional PaloInternalService.TQueryType query_type
 }
 
 struct TFeResult {
@@ -570,6 +572,7 @@ struct TStreamLoadPutRequest {
     41: optional i64 file_size // only for stream load with parquet or orc
     42: optional bool trim_double_quotes // trim double quotes for csv
     43: optional i32 skip_lines // csv skip line num, only used when csv header_type is not set.
+    44: optional bool enable_profile
 }
 
 struct TStreamLoadPutResult {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org