You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by yi...@apache.org on 2022/06/30 15:21:48 UTC
[doris] branch master updated: [refactor](load) Remove mini load (#10520)
This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new aab7dc956f [refactor](load) Remove mini load (#10520)
aab7dc956f is described below
commit aab7dc956f6f47a79d70bafd1ffc76b676a8e2cd
Author: yiguolei <67...@qq.com>
AuthorDate: Thu Jun 30 23:21:41 2022 +0800
[refactor](load) Remove mini load (#10520)
---
be/src/agent/agent_server.cpp | 42 -
be/src/agent/agent_server.h | 6 -
be/src/http/CMakeLists.txt | 1 -
be/src/http/action/mini_load.cpp | 966 ---------------------
be/src/http/action/mini_load.h | 109 ---
be/src/runtime/CMakeLists.txt | 1 -
be/src/runtime/etl_job_mgr.cpp | 302 -------
be/src/runtime/etl_job_mgr.h | 99 ---
be/src/runtime/exec_env.h | 2 -
be/src/runtime/exec_env_init.cpp | 4 -
.../runtime/stream_load/stream_load_executor.cpp | 12 +-
be/src/service/backend_service.h | 21 -
be/src/service/http_service.cpp | 5 +-
be/test/CMakeLists.txt | 1 -
be/test/runtime/data_stream_test.cpp | 9 -
be/test/runtime/etl_job_mgr_test.cpp | 221 -----
.../src/main/java/org/apache/doris/load/Load.java | 157 ----
.../org/apache/doris/load/loadv2/LoadManager.java | 77 --
.../org/apache/doris/load/loadv2/MiniLoadJob.java | 36 +-
.../load/loadv2/MiniLoadTxnCommitAttachment.java | 10 -
.../java/org/apache/doris/qe/MultiLoadMgr.java | 16 -
.../apache/doris/service/FrontendServiceImpl.java | 237 -----
.../java/org/apache/doris/task/AgentClient.java | 55 --
.../doris/transaction/TxnCommitAttachment.java | 2 -
.../org/apache/doris/common/GenericPoolTest.java | 19 -
.../apache/doris/utframe/MockedBackendFactory.java | 20 -
gensrc/thrift/AgentService.thrift | 26 -
gensrc/thrift/BackendService.thrift | 7 -
gensrc/thrift/FrontendService.thrift | 79 +-
samples/mini_load/python/mini_load_utils.py | 157 ----
30 files changed, 5 insertions(+), 2694 deletions(-)
diff --git a/be/src/agent/agent_server.cpp b/be/src/agent/agent_server.cpp
index c907aafd23..58ce4e00b5 100644
--- a/be/src/agent/agent_server.cpp
+++ b/be/src/agent/agent_server.cpp
@@ -29,7 +29,6 @@
#include "common/status.h"
#include "gutil/strings/substitute.h"
#include "olap/snapshot_manager.h"
-#include "runtime/etl_job_mgr.h"
using std::string;
using std::vector;
@@ -247,45 +246,4 @@ void AgentServer::publish_cluster_state(TAgentResult& t_agent_result,
status.to_thrift(&t_agent_result.status);
}
-void AgentServer::submit_etl_task(TAgentResult& t_agent_result,
- const TMiniLoadEtlTaskRequest& request) {
- Status status = _exec_env->etl_job_mgr()->start_job(request);
- auto fragment_instance_id = request.params.params.fragment_instance_id;
- if (status.ok()) {
- VLOG_RPC << "success to submit etl task. id=" << fragment_instance_id;
- } else {
- VLOG_RPC << "fail to submit etl task. id=" << fragment_instance_id
- << ", err_msg=" << status.get_error_msg();
- }
- status.to_thrift(&t_agent_result.status);
-}
-
-void AgentServer::get_etl_status(TMiniLoadEtlStatusResult& t_agent_result,
- const TMiniLoadEtlStatusRequest& request) {
- Status status = _exec_env->etl_job_mgr()->get_job_state(request.mini_load_id, &t_agent_result);
- if (!status.ok()) {
- LOG(WARNING) << "fail to get job state. [id=" << request.mini_load_id << "]";
- }
-
- VLOG_RPC << "success to get job state. [id=" << request.mini_load_id
- << ", status=" << t_agent_result.status.status_code
- << ", etl_state=" << t_agent_result.etl_state << ", files=";
- for (auto& item : t_agent_result.file_map) {
- VLOG_RPC << item.first << ":" << item.second << ";";
- }
- VLOG_RPC << "]";
-}
-
-void AgentServer::delete_etl_files(TAgentResult& t_agent_result,
- const TDeleteEtlFilesRequest& request) {
- Status status = _exec_env->etl_job_mgr()->erase_job(request);
- if (!status.ok()) {
- LOG(WARNING) << "fail to delete etl files. because " << status.get_error_msg()
- << " with request " << request;
- }
-
- VLOG_RPC << "success to delete etl files. request=" << request;
- status.to_thrift(&t_agent_result.status);
-}
-
} // namespace doris
diff --git a/be/src/agent/agent_server.h b/be/src/agent/agent_server.h
index 3998a6b258..2897fab586 100644
--- a/be/src/agent/agent_server.h
+++ b/be/src/agent/agent_server.h
@@ -47,12 +47,6 @@ public:
// [[deprecated]]
void publish_cluster_state(TAgentResult& agent_result, const TAgentPublishRequest& request);
- // Multi-Load will still use the following 3 methods for now.
- void submit_etl_task(TAgentResult& agent_result, const TMiniLoadEtlTaskRequest& request);
- void get_etl_status(TMiniLoadEtlStatusResult& agent_result,
- const TMiniLoadEtlStatusRequest& request);
- void delete_etl_files(TAgentResult& result, const TDeleteEtlFilesRequest& request);
-
private:
DISALLOW_COPY_AND_ASSIGN(AgentServer);
diff --git a/be/src/http/CMakeLists.txt b/be/src/http/CMakeLists.txt
index 8ca052da65..b956d0982b 100644
--- a/be/src/http/CMakeLists.txt
+++ b/be/src/http/CMakeLists.txt
@@ -34,7 +34,6 @@ add_library(Webserver STATIC
ev_http_server.cpp
http_client.cpp
action/download_action.cpp
- action/mini_load.cpp
action/monitor_action.cpp
action/health_action.cpp
action/tablet_migration_action.cpp
diff --git a/be/src/http/action/mini_load.cpp b/be/src/http/action/mini_load.cpp
deleted file mode 100644
index 53c1464471..0000000000
--- a/be/src/http/action/mini_load.cpp
+++ /dev/null
@@ -1,966 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#include "http/action/mini_load.h"
-
-#include <event2/buffer.h>
-#include <event2/bufferevent.h>
-#include <event2/http.h>
-#include <fcntl.h>
-#include <string.h>
-#include <sys/stat.h>
-#include <sys/time.h>
-#include <sys/types.h>
-#include <thrift/protocol/TDebugProtocol.h>
-#include <time.h>
-#include <unistd.h>
-
-#include <functional>
-#include <mutex>
-#include <sstream>
-#include <string>
-
-#include "agent/cgroups_mgr.h"
-#include "common/status.h"
-#include "gen_cpp/FrontendService.h"
-#include "gen_cpp/FrontendService_types.h"
-#include "gen_cpp/HeartbeatService_types.h"
-#include "gen_cpp/MasterService_types.h"
-#include "http/http_channel.h"
-#include "http/http_headers.h"
-#include "http/http_parser.h"
-#include "http/http_request.h"
-#include "http/http_response.h"
-#include "http/http_status.h"
-#include "http/utils.h"
-#include "olap/file_helper.h"
-#include "runtime/client_cache.h"
-#include "runtime/exec_env.h"
-#include "runtime/fragment_mgr.h"
-#include "runtime/load_path_mgr.h"
-#include "runtime/stream_load/stream_load_context.h"
-#include "service/backend_options.h"
-#include "util/file_utils.h"
-#include "util/json_util.h"
-#include "util/string_parser.hpp"
-#include "util/string_util.h"
-#include "util/thrift_rpc_helper.h"
-#include "util/time.h"
-#include "util/url_coding.h"
-
-namespace doris {
-
-// context used to handle mini-load in asynchronous mode
-struct MiniLoadAsyncCtx {
- MiniLoadAsyncCtx(MiniLoadAction* handler_) : handler(handler_) {}
- ~MiniLoadAsyncCtx() {
- if (need_remove_handle) {
- handler->erase_handle(load_handle);
- }
- if (fd >= 0) {
- ::close(fd);
- }
- }
-
- MiniLoadAction* handler;
-
- // used to check duplicate
- LoadHandle load_handle;
- bool need_remove_handle = false;
-
- // file to save
- std::string file_path;
- int fd = -1;
-
- size_t body_bytes = 0;
- size_t bytes_written = 0;
-
- TLoadCheckRequest load_check_req;
-};
-
-struct MiniLoadCtx {
- MiniLoadCtx(bool is_streaming_) : is_streaming(is_streaming_) {}
-
- bool is_streaming = false;
- MiniLoadAsyncCtx* mini_load_async_ctx = nullptr;
- StreamLoadContext* stream_load_ctx = nullptr;
-};
-
-const std::string CLUSTER_KEY = "cluster";
-const std::string DB_KEY = "db";
-const std::string TABLE_KEY = "table";
-const std::string LABEL_KEY = "label";
-const std::string SUB_LABEL_KEY = "sub_label";
-const std::string FILE_PATH_KEY = "file_path";
-const std::string COLUMNS_KEY = "columns";
-const std::string HLL_KEY = "hll";
-const std::string COLUMN_SEPARATOR_KEY = "column_separator";
-const std::string MAX_FILTER_RATIO_KEY = "max_filter_ratio";
-const std::string STRICT_MODE_KEY = "strict_mode";
-const std::string TIMEOUT_KEY = "timeout";
-const char* k_100_continue = "100-continue";
-
-MiniLoadAction::MiniLoadAction(ExecEnv* exec_env) : _exec_env(exec_env) {}
-
-static bool is_name_valid(const std::string& name) {
- return !name.empty();
-}
-
-static Status check_request(HttpRequest* req) {
- auto& params = *req->params();
-
- // check params
- if (!is_name_valid(params[DB_KEY])) {
- return Status::InternalError("Database name is not valid.");
- }
- if (!is_name_valid(params[TABLE_KEY])) {
- return Status::InternalError("Table name is not valid.");
- }
- if (!is_name_valid(params[LABEL_KEY])) {
- return Status::InternalError("Label name is not valid.");
- }
-
- return Status::OK();
-}
-
-Status MiniLoadAction::data_saved_dir(const LoadHandle& desc, const std::string& table,
- std::string* file_path) {
- std::string prefix;
- RETURN_IF_ERROR(_exec_env->load_path_mgr()->allocate_dir(desc.db, desc.label, &prefix));
- timeval tv;
- gettimeofday(&tv, nullptr);
- struct tm tm;
- time_t cur_sec = tv.tv_sec;
- localtime_r(&cur_sec, &tm);
- char buf[64];
- strftime(buf, 64, "%Y%m%d%H%M%S", &tm);
-
- std::stringstream ss;
- ss << prefix << "/" << table << "." << desc.sub_label << "." << buf << "." << tv.tv_usec;
- *file_path = ss.str();
- return Status::OK();
-}
-
-Status MiniLoadAction::_load(HttpRequest* http_req, const std::string& file_path,
- const std::string& user, const std::string& cluster,
- int64_t file_size) {
- // Prepare request parameters.
- std::map<std::string, std::string> params(http_req->query_params().begin(),
- http_req->query_params().end());
- RETURN_IF_ERROR(_merge_header(http_req, &params));
- params.erase(LABEL_KEY);
- params.erase(SUB_LABEL_KEY);
-
- // put here to log master information
- const TNetworkAddress& master_address = _exec_env->master_info()->network_address;
- Status status;
- FrontendServiceConnection client(_exec_env->frontend_client_cache(), master_address,
- config::thrift_rpc_timeout_ms, &status);
- if (!status.ok()) {
- std::stringstream ss;
- ss << "Connect master failed, with address(" << master_address.hostname << ":"
- << master_address.port << ")";
- LOG(WARNING) << ss.str();
- return status;
- }
- TFeResult res;
- try {
- TMiniLoadRequest req;
- req.protocolVersion = FrontendServiceVersion::V1;
- req.__set_db(http_req->param(DB_KEY));
- if (!cluster.empty()) {
- req.__set_cluster(cluster);
- }
- req.__set_tbl(http_req->param(TABLE_KEY));
- req.__set_label(http_req->param(LABEL_KEY));
- req.__set_user(user);
- // Belong to a multi-load transaction
- if (!http_req->param(SUB_LABEL_KEY).empty()) {
- req.__set_subLabel(http_req->param(SUB_LABEL_KEY));
- }
- req.__set_properties(params);
- req.files.push_back(file_path);
- req.__isset.file_size = true;
- req.file_size.push_back(file_size);
- req.backend.__set_hostname(BackendOptions::get_localhost());
- req.backend.__set_port(config::be_port);
-
- req.__set_timestamp(GetCurrentTimeMicros());
- try {
- client->miniLoad(res, req);
- } catch (apache::thrift::transport::TTransportException& e) {
- LOG(WARNING) << "Retrying mini load from master(" << master_address.hostname << ":"
- << master_address.port << ") because: " << e.what();
- status = client.reopen(config::thrift_rpc_timeout_ms);
- if (!status.ok()) {
- LOG(WARNING) << "Client reopen failed. with address(" << master_address.hostname
- << ":" << master_address.port << ")";
- return status;
- }
- client->miniLoad(res, req);
- } catch (apache::thrift::TApplicationException& e) {
- LOG(WARNING) << "mini load request from master(" << master_address.hostname << ":"
- << master_address.port << ") got unknown result: " << e.what();
-
- status = client.reopen(config::thrift_rpc_timeout_ms);
- if (!status.ok()) {
- LOG(WARNING) << "Client reopen failed. with address(" << master_address.hostname
- << ":" << master_address.port << ")";
- return status;
- }
- client->miniLoad(res, req);
- }
- } catch (apache::thrift::TException& e) {
- // failed when retry.
- // reopen to disable this connection
- client.reopen(config::thrift_rpc_timeout_ms);
- std::stringstream ss;
- ss << "Request miniload from master(" << master_address.hostname << ":"
- << master_address.port << ") because: " << e.what();
- LOG(WARNING) << ss.str();
- return Status::InternalError(ss.str());
- }
-
- return Status(res.status);
-}
-
-Status MiniLoadAction::_merge_header(HttpRequest* http_req,
- std::map<std::string, std::string>* params) {
- if (http_req == nullptr || params == nullptr) {
- return Status::OK();
- }
- if (!http_req->header(HTTP_FORMAT_KEY).empty()) {
- (*params)[HTTP_FORMAT_KEY] = http_req->header(HTTP_FORMAT_KEY);
- }
- if (!http_req->header(HTTP_COLUMNS).empty()) {
- (*params)[HTTP_COLUMNS] = http_req->header(HTTP_COLUMNS);
- }
- if (!http_req->header(HTTP_WHERE).empty()) {
- (*params)[HTTP_WHERE] = http_req->header(HTTP_WHERE);
- }
- if (!http_req->header(HTTP_COLUMN_SEPARATOR).empty()) {
- (*params)[HTTP_COLUMN_SEPARATOR] = http_req->header(HTTP_COLUMN_SEPARATOR);
- }
- if (!http_req->header(HTTP_PARTITIONS).empty()) {
- (*params)[HTTP_PARTITIONS] = http_req->header(HTTP_PARTITIONS);
- if (!http_req->header(HTTP_TEMP_PARTITIONS).empty()) {
- return Status::InvalidArgument(
- "Can not specify both partitions and temporary partitions");
- }
- }
- if (!http_req->header(HTTP_TEMP_PARTITIONS).empty()) {
- (*params)[HTTP_TEMP_PARTITIONS] = http_req->header(HTTP_TEMP_PARTITIONS);
- if (!http_req->header(HTTP_PARTITIONS).empty()) {
- return Status::InvalidArgument(
- "Can not specify both partitions and temporary partitions");
- }
- }
- if (!http_req->header(HTTP_NEGATIVE).empty() &&
- iequal(http_req->header(HTTP_NEGATIVE), "true")) {
- (*params)[HTTP_NEGATIVE] = "true";
- } else {
- (*params)[HTTP_NEGATIVE] = "false";
- }
- if (!http_req->header(HTTP_STRICT_MODE).empty()) {
- if (iequal(http_req->header(HTTP_STRICT_MODE), "false")) {
- (*params)[HTTP_STRICT_MODE] = "false";
- } else if (iequal(http_req->header(HTTP_STRICT_MODE), "true")) {
- (*params)[HTTP_STRICT_MODE] = "true";
- } else {
- return Status::InvalidArgument("Invalid strict mode format. Must be bool type");
- }
- }
- if (!http_req->header(HTTP_TIMEZONE).empty()) {
- (*params)[HTTP_TIMEZONE] = http_req->header(HTTP_TIMEZONE);
- }
- if (!http_req->header(HTTP_EXEC_MEM_LIMIT).empty()) {
- (*params)[HTTP_EXEC_MEM_LIMIT] = http_req->header(HTTP_EXEC_MEM_LIMIT);
- }
- if (!http_req->header(HTTP_JSONPATHS).empty()) {
- (*params)[HTTP_JSONPATHS] = http_req->header(HTTP_JSONPATHS);
- }
- if (!http_req->header(HTTP_JSONROOT).empty()) {
- (*params)[HTTP_JSONROOT] = http_req->header(HTTP_JSONROOT);
- }
- if (!http_req->header(HTTP_STRIP_OUTER_ARRAY).empty()) {
- if (iequal(http_req->header(HTTP_STRIP_OUTER_ARRAY), "true")) {
- (*params)[HTTP_STRIP_OUTER_ARRAY] = "true";
- } else {
- (*params)[HTTP_STRIP_OUTER_ARRAY] = "false";
- }
- } else {
- (*params)[HTTP_STRIP_OUTER_ARRAY] = "false";
- }
- if (!http_req->header(HTTP_FUZZY_PARSE).empty()) {
- if (iequal(http_req->header(HTTP_FUZZY_PARSE), "true")) {
- (*params)[HTTP_FUZZY_PARSE] = "true";
- } else {
- (*params)[HTTP_FUZZY_PARSE] = "false";
- }
- } else {
- (*params)[HTTP_FUZZY_PARSE] = "false";
- }
- if (!http_req->header(HTTP_READ_JSON_BY_LINE).empty()) {
- if (iequal(http_req->header(HTTP_READ_JSON_BY_LINE), "true")) {
- (*params)[HTTP_READ_JSON_BY_LINE] = "true";
- } else {
- (*params)[HTTP_READ_JSON_BY_LINE] = "false";
- }
- } else {
- (*params)[HTTP_READ_JSON_BY_LINE] = "false";
- }
-
- if (!http_req->header(HTTP_FUNCTION_COLUMN + "." + HTTP_SEQUENCE_COL).empty()) {
- (*params)[HTTP_FUNCTION_COLUMN + "." + HTTP_SEQUENCE_COL] =
- http_req->header(HTTP_FUNCTION_COLUMN + "." + HTTP_SEQUENCE_COL);
- }
- if (params->find(HTTP_MERGE_TYPE) == params->end()) {
- params->insert(std::make_pair(HTTP_MERGE_TYPE, "APPEND"));
- }
- StringCaseMap<TMergeType::type> merge_type_map = {{"APPEND", TMergeType::APPEND},
- {"DELETE", TMergeType::DELETE},
- {"MERGE", TMergeType::MERGE}};
- if (!http_req->header(HTTP_MERGE_TYPE).empty()) {
- std::string merge_type = http_req->header(HTTP_MERGE_TYPE);
- auto it = merge_type_map.find(merge_type);
- if (it != merge_type_map.end()) {
- (*params)[HTTP_MERGE_TYPE] = it->first;
- } else {
- return Status::InvalidArgument("Invalid merge type " + merge_type);
- }
- }
- if (!http_req->header(HTTP_DELETE_CONDITION).empty()) {
- if ((*params)[HTTP_MERGE_TYPE] == "MERGE") {
- (*params)[HTTP_DELETE_CONDITION] = http_req->header(HTTP_DELETE_CONDITION);
- } else {
- return Status::InvalidArgument("not support delete when merge type is " +
- (*params)[HTTP_MERGE_TYPE] + ".");
- }
- }
- return Status::OK();
-}
-
-static bool parse_auth(const std::string& auth, std::string* user, std::string* passwd,
- std::string* cluster) {
- std::string decoded_auth;
-
- if (!base64_decode(auth, &decoded_auth)) {
- return false;
- }
- std::string::size_type pos = decoded_auth.find(':');
- if (pos == std::string::npos) {
- return false;
- }
- user->assign(decoded_auth.c_str(), pos);
- passwd->assign(decoded_auth.c_str() + pos + 1);
- const std::string::size_type cluster_pos = user->find('@');
- if (cluster_pos != std::string::npos) {
- cluster->assign(user->c_str(), cluster_pos + 1, pos - cluster_pos - 1);
- user->assign(user->c_str(), cluster_pos);
- }
- return true;
-}
-
-Status MiniLoadAction::check_auth(const HttpRequest* http_req,
- const TLoadCheckRequest& check_load_req) {
- // put here to log master information
- const TNetworkAddress& master_address = _exec_env->master_info()->network_address;
- Status status;
- FrontendServiceConnection client(_exec_env->frontend_client_cache(), master_address,
- config::thrift_rpc_timeout_ms, &status);
- if (!status.ok()) {
- std::stringstream ss;
- ss << "Connect master failed, with address(" << master_address.hostname << ":"
- << master_address.port << ")";
- LOG(WARNING) << ss.str();
- return status;
- }
-
- TFeResult res;
- try {
- try {
- client->loadCheck(res, check_load_req);
- } catch (apache::thrift::transport::TTransportException& e) {
- LOG(WARNING) << "Retrying mini load from master(" << master_address.hostname << ":"
- << master_address.port << ") because: " << e.what();
- status = client.reopen(config::thrift_rpc_timeout_ms);
- if (!status.ok()) {
- LOG(WARNING) << "Client reopen failed. with address(" << master_address.hostname
- << ":" << master_address.port << ")";
- return status;
- }
- client->loadCheck(res, check_load_req);
- } catch (apache::thrift::TApplicationException& e) {
- LOG(WARNING) << "load check request from master(" << master_address.hostname << ":"
- << master_address.port << ") got unknown result: " << e.what();
-
- status = client.reopen(config::thrift_rpc_timeout_ms);
- if (!status.ok()) {
- LOG(WARNING) << "Client reopen failed. with address(" << master_address.hostname
- << ":" << master_address.port << ")";
- return status;
- }
- client->loadCheck(res, check_load_req);
- }
- } catch (apache::thrift::TException& e) {
- // failed when retry.
- // reopen to disable this connection
- client.reopen(config::thrift_rpc_timeout_ms);
- std::stringstream ss;
- ss << "Request miniload from master(" << master_address.hostname << ":"
- << master_address.port << ") because: " << e.what();
- LOG(WARNING) << ss.str();
- return Status::InternalError(ss.str());
- }
-
- return Status(res.status);
-}
-
-void MiniLoadAction::erase_handle(const LoadHandle& desc) {
- // remove
- std::lock_guard<std::mutex> l(_lock);
- _current_load.erase(desc);
-}
-
-int MiniLoadAction::on_header(HttpRequest* req) {
- // check authorization first, make client know what happened
- if (req->header(HttpHeaders::AUTHORIZATION).empty()) {
- HttpChannel::send_basic_challenge(req, "mini_load");
- return -1;
- }
-
- Status status;
- MiniLoadCtx* mini_load_ctx = new MiniLoadCtx(_is_streaming(req));
- req->set_handler_ctx(mini_load_ctx);
- if (((MiniLoadCtx*)req->handler_ctx())->is_streaming) {
- status = _on_new_header(req);
- StreamLoadContext* ctx = ((MiniLoadCtx*)req->handler_ctx())->stream_load_ctx;
- if (ctx != nullptr) {
- ctx->status = status;
- }
- } else {
- status = _on_header(req);
- }
- if (!status.ok()) {
- HttpChannel::send_reply(req, HttpStatus::INTERNAL_SERVER_ERROR, status.get_error_msg());
- return -1;
- }
- return 0;
-}
-
-bool MiniLoadAction::_is_streaming(HttpRequest* req) {
- // multi load must be non-streaming
- if (!req->param(SUB_LABEL_KEY).empty()) {
- return false;
- }
-
- TIsMethodSupportedRequest request;
- request.__set_function_name(_streaming_function_name);
- const TNetworkAddress& master_address = _exec_env->master_info()->network_address;
- TFeResult res;
- Status status = ThriftRpcHelper::rpc<FrontendServiceClient>(
- master_address.hostname, master_address.port,
- [&request, &res](FrontendServiceConnection& client) {
- client->isMethodSupported(res, request);
- });
- if (!status.ok()) {
- std::stringstream ss;
- ss << "This mini load is not streaming because: " << status.get_error_msg()
- << " with address(" << master_address.hostname << ":" << master_address.port << ")";
- LOG(INFO) << ss.str();
- return false;
- }
-
- status = Status(res.status);
- if (!status.ok()) {
- std::stringstream ss;
- ss << "This streaming mini load is not be supportd because: " << status.get_error_msg()
- << " with address(" << master_address.hostname << ":" << master_address.port << ")";
- LOG(INFO) << ss.str();
- return false;
- }
- return true;
-}
-
-Status MiniLoadAction::_on_header(HttpRequest* req) {
- size_t body_bytes = 0;
- size_t max_body_bytes = config::streaming_load_max_mb * 1024 * 1024;
- if (!req->header(HttpHeaders::CONTENT_LENGTH).empty()) {
- body_bytes = std::stol(req->header(HttpHeaders::CONTENT_LENGTH));
- if (body_bytes > max_body_bytes) {
- std::stringstream ss;
- ss << "file size exceed max body size, max_body_bytes=" << max_body_bytes;
- return Status::InternalError(ss.str());
- }
- } else {
- evhttp_connection_set_max_body_size(
- evhttp_request_get_connection(req->get_evhttp_request()), max_body_bytes);
- }
-
- RETURN_IF_ERROR(check_request(req));
-
- std::unique_ptr<MiniLoadAsyncCtx> mini_load_async_ctx(new MiniLoadAsyncCtx(this));
- mini_load_async_ctx->body_bytes = body_bytes;
- mini_load_async_ctx->load_handle.db = req->param(DB_KEY);
- mini_load_async_ctx->load_handle.label = req->param(LABEL_KEY);
- mini_load_async_ctx->load_handle.sub_label = req->param(SUB_LABEL_KEY);
-
- // check if duplicate
- // Use this to prevent that two callback function write to one file
- // that file may be writen bad
- {
- std::lock_guard<std::mutex> l(_lock);
- if (_current_load.find(mini_load_async_ctx->load_handle) != _current_load.end()) {
- return Status::InternalError("Duplicate mini load request.");
- }
- _current_load.insert(mini_load_async_ctx->load_handle);
- mini_load_async_ctx->need_remove_handle = true;
- }
- // generate load check request
- RETURN_IF_ERROR(generate_check_load_req(req, &mini_load_async_ctx->load_check_req));
-
- // Check auth
- RETURN_IF_ERROR(check_auth(req, mini_load_async_ctx->load_check_req));
-
- // Receive data first, keep things easy.
- RETURN_IF_ERROR(data_saved_dir(mini_load_async_ctx->load_handle, req->param(TABLE_KEY),
- &mini_load_async_ctx->file_path));
- // destructor will close the file handle, not depend on DeferOp any more
- mini_load_async_ctx->fd =
- open(mini_load_async_ctx->file_path.c_str(), O_WRONLY | O_CREAT | O_TRUNC, 0660);
- if (mini_load_async_ctx->fd < 0) {
- char buf[64];
- LOG(WARNING) << "open file failed, path=" << mini_load_async_ctx->file_path
- << ", errno=" << errno << ", errmsg=" << strerror_r(errno, buf, sizeof(buf));
- return Status::InternalError("open file failed");
- }
-
- ((MiniLoadCtx*)req->handler_ctx())->mini_load_async_ctx = mini_load_async_ctx.release();
- return Status::OK();
-}
-
-void MiniLoadAction::on_chunk_data(HttpRequest* http_req) {
- MiniLoadCtx* ctx = (MiniLoadCtx*)http_req->handler_ctx();
- if (ctx->is_streaming) {
- _on_new_chunk_data(http_req);
- } else {
- _on_chunk_data(http_req);
- }
-}
-
-void MiniLoadAction::_on_chunk_data(HttpRequest* http_req) {
- MiniLoadAsyncCtx* ctx = ((MiniLoadCtx*)http_req->handler_ctx())->mini_load_async_ctx;
- if (ctx == nullptr) {
- return;
- }
-
- struct evhttp_request* ev_req = http_req->get_evhttp_request();
- auto evbuf = evhttp_request_get_input_buffer(ev_req);
-
- char buf[4096];
- while (evbuffer_get_length(evbuf) > 0) {
- auto n = evbuffer_remove(evbuf, buf, sizeof(buf));
- while (n > 0) {
- auto res = write(ctx->fd, buf, n);
- if (res < 0) {
- char errbuf[64];
- LOG(WARNING) << "write file failed, path=" << ctx->file_path << ", errno=" << errno
- << ", errmsg=" << strerror_r(errno, errbuf, sizeof(errbuf));
- HttpChannel::send_reply(http_req, HttpStatus::INTERNAL_SERVER_ERROR,
- "write file failed");
- delete ctx;
- http_req->set_handler_ctx(nullptr);
- return;
- }
- n -= res;
- ctx->bytes_written += res;
- }
- }
-}
-
-void MiniLoadAction::_on_new_chunk_data(HttpRequest* http_req) {
- StreamLoadContext* ctx = ((MiniLoadCtx*)http_req->handler_ctx())->stream_load_ctx;
- if (ctx == nullptr || !ctx->status.ok()) {
- return;
- }
-
- struct evhttp_request* ev_req = http_req->get_evhttp_request();
- auto evbuf = evhttp_request_get_input_buffer(ev_req);
-
- while (evbuffer_get_length(evbuf) > 0) {
- auto bb = ByteBuffer::allocate(4096);
- auto remove_bytes = evbuffer_remove(evbuf, bb->ptr, bb->capacity);
- bb->pos = remove_bytes;
- bb->flip();
- auto st = ctx->body_sink->append(bb);
- if (!st.ok()) {
- LOG(WARNING) << "append body content failed. errmsg=" << st.get_error_msg()
- << ctx->brief();
- ctx->status = st;
- return;
- }
- ctx->receive_bytes += remove_bytes;
- }
-}
-
-void MiniLoadAction::free_handler_ctx(void* param) {
- MiniLoadCtx* ctx = (MiniLoadCtx*)param;
- if (ctx->is_streaming) {
- StreamLoadContext* streaming_ctx = ((MiniLoadCtx*)param)->stream_load_ctx;
- if (streaming_ctx != nullptr) {
- // sender is going, make receiver know it
- if (streaming_ctx->body_sink != nullptr) {
- LOG(WARNING) << "cancel stream load " << streaming_ctx->id.to_string()
- << " because sender failed";
- streaming_ctx->body_sink->cancel("sender failed");
- }
- if (streaming_ctx->unref()) {
- delete streaming_ctx;
- }
- }
- } else {
- MiniLoadAsyncCtx* async_ctx = ((MiniLoadCtx*)param)->mini_load_async_ctx;
- delete async_ctx;
- }
- delete ctx;
-}
-
-void MiniLoadAction::handle(HttpRequest* http_req) {
- MiniLoadCtx* ctx = (MiniLoadCtx*)http_req->handler_ctx();
- if (ctx->is_streaming) {
- _new_handle(http_req);
- } else {
- _handle(http_req);
- }
-}
-
-void MiniLoadAction::_handle(HttpRequest* http_req) {
- MiniLoadAsyncCtx* ctx = ((MiniLoadCtx*)http_req->handler_ctx())->mini_load_async_ctx;
- if (ctx == nullptr) {
- // when ctx is nullptr, there must be error happened when on_chunk_data
- // and reply is sent, we just return with no operation
- LOG(WARNING) << "handler context is nullptr when MiniLoad callback execute, uri="
- << http_req->uri();
- return;
- }
- if (ctx->body_bytes > 0 && ctx->bytes_written != ctx->body_bytes) {
- LOG(WARNING) << "bytes written is not equal with body size, uri=" << http_req->uri()
- << ", body_bytes=" << ctx->body_bytes
- << ", bytes_written=" << ctx->bytes_written;
- HttpChannel::send_reply(http_req, HttpStatus::INTERNAL_SERVER_ERROR,
- "receipt size not equal with body size");
- return;
- }
- auto st = _load(http_req, ctx->file_path, ctx->load_check_req.user, ctx->load_check_req.cluster,
- ctx->bytes_written);
- std::string str = st.to_json();
- HttpChannel::send_reply(http_req, str);
-}
-
-Status MiniLoadAction::generate_check_load_req(const HttpRequest* http_req,
- TLoadCheckRequest* check_load_req) {
- const char k_basic[] = "Basic ";
- const std::string& auth = http_req->header(HttpHeaders::AUTHORIZATION);
- if (auth.compare(0, sizeof(k_basic) - 1, k_basic, sizeof(k_basic) - 1) != 0) {
- return Status::InternalError("Not support Basic authorization.");
- }
-
- check_load_req->protocolVersion = FrontendServiceVersion::V1;
- // Skip "Basic "
- std::string str = auth.substr(sizeof(k_basic) - 1);
- std::string cluster;
- if (!parse_auth(str, &(check_load_req->user), &(check_load_req->passwd), &cluster)) {
- LOG(WARNING) << "parse auth string failed." << auth << " and str " << str;
- return Status::InternalError("Parse authorization failed.");
- }
- if (!cluster.empty()) {
- check_load_req->__set_cluster(cluster);
- }
- check_load_req->db = http_req->param(DB_KEY);
- check_load_req->__set_tbl(http_req->param(TABLE_KEY));
- if (http_req->param(SUB_LABEL_KEY).empty()) {
- check_load_req->__set_label(http_req->param(LABEL_KEY));
- check_load_req->__set_timestamp(GetCurrentTimeMicros());
- }
-
- if (http_req->remote_host() != nullptr) {
- std::string user_ip(http_req->remote_host());
- check_load_req->__set_user_ip(user_ip);
- }
-
- return Status::OK();
-}
-
-bool LoadHandleCmp::operator()(const LoadHandle& lhs, const LoadHandle& rhs) const {
- int ret = lhs.label.compare(rhs.label);
- if (ret < 0) {
- return true;
- } else if (ret > 0) {
- return false;
- }
-
- ret = lhs.sub_label.compare(rhs.sub_label);
- if (ret < 0) {
- return true;
- } else if (ret > 0) {
- return false;
- }
-
- ret = lhs.db.compare(rhs.db);
- if (ret < 0) {
- return true;
- }
-
- return false;
-}
-
-// fe will begin the txn and record the metadata of load
-Status MiniLoadAction::_begin_mini_load(StreamLoadContext* ctx) {
- // prepare begin mini load request params
- TMiniLoadBeginRequest request;
- set_request_auth(&request, ctx->auth);
- request.db = ctx->db;
- request.tbl = ctx->table;
- request.label = ctx->label;
- if (!ctx->sub_label.empty()) {
- request.__set_sub_label(ctx->sub_label);
- }
- if (ctx->timeout_second != -1) {
- request.__set_timeout_second(ctx->timeout_second);
- }
- if (ctx->max_filter_ratio != 0.0) {
- request.__set_max_filter_ratio(ctx->max_filter_ratio);
- }
- request.__set_create_timestamp(UnixMillis());
- request.__set_request_id(ctx->id.to_thrift());
- // begin load by master
- const TNetworkAddress& master_addr = _exec_env->master_info()->network_address;
- TMiniLoadBeginResult res;
- RETURN_IF_ERROR(ThriftRpcHelper::rpc<FrontendServiceClient>(
- master_addr.hostname, master_addr.port,
- [&request, &res](FrontendServiceConnection& client) {
- client->miniLoadBegin(res, request);
- }));
- Status begin_status(res.status);
- if (!begin_status.ok()) {
- LOG(INFO) << "failed to begin mini load " << ctx->label
- << " with error msg:" << begin_status.get_error_msg();
- return begin_status;
- }
- ctx->txn_id = res.txn_id;
- // txn has been begun in fe
- ctx->need_rollback = true;
- LOG(INFO) << "load:" << ctx->label << " txn:" << res.txn_id << " has been begun in fe";
- return Status::OK();
-}
-
-Status MiniLoadAction::_process_put(HttpRequest* req, StreamLoadContext* ctx) {
- // prepare request parameters
- TStreamLoadPutRequest put_request;
- set_request_auth(&put_request, ctx->auth);
- put_request.db = ctx->db;
- put_request.tbl = ctx->table;
- put_request.txnId = ctx->txn_id;
- put_request.formatType = ctx->format;
- put_request.__set_loadId(ctx->id.to_thrift());
- put_request.fileType = TFileType::FILE_STREAM;
- std::map<std::string, std::string> params(req->query_params().begin(),
- req->query_params().end());
- /* merge params of columns and hll
- * for example:
- * input: columns=c1,tmp_c2,tmp_c3\&hll=hll_c2,tmp_c2:hll_c3,tmp_c3
- * output: columns=c1,tmp_c2,tmp_c3,hll_c2=hll_hash(tmp_c2),hll_c3=hll_hash(tmp_c3)
- */
- auto columns_it = params.find(COLUMNS_KEY);
- if (columns_it != params.end()) {
- std::string columns_value = columns_it->second;
- auto hll_it = params.find(HLL_KEY);
- if (hll_it != params.end()) {
- std::string hll_value = hll_it->second;
- if (hll_value.empty()) {
- return Status::InvalidArgument(
- "Hll value could not be empty when hll key is exists!");
- }
- std::map<std::string, std::string> hll_map;
- RETURN_IF_ERROR(StringParser::split_string_to_map(hll_value, ":", ",", &hll_map));
- if (hll_map.empty()) {
- return Status::InvalidArgument("Hll value could not transform to hll expr: " +
- hll_value);
- }
- for (auto& hll_element : hll_map) {
- columns_value += "," + hll_element.first + "=hll_hash(" + hll_element.second + ")";
- }
- }
- put_request.__set_columns(columns_value);
- }
- auto column_separator_it = params.find(COLUMN_SEPARATOR_KEY);
- if (column_separator_it != params.end()) {
- put_request.__set_columnSeparator(column_separator_it->second);
- }
- if (ctx->timeout_second != -1) {
- put_request.__set_timeout(ctx->timeout_second);
- }
- auto strict_mode_it = params.find(STRICT_MODE_KEY);
- if (strict_mode_it != params.end()) {
- std::string strict_mode_value = strict_mode_it->second;
- if (iequal(strict_mode_value, "false")) {
- put_request.__set_strictMode(false);
- } else if (iequal(strict_mode_value, "true")) {
- put_request.__set_strictMode(true);
- } else {
- return Status::InvalidArgument("Invalid strict mode format. Must be bool type");
- }
- }
-
- // plan this load
- TNetworkAddress master_addr = _exec_env->master_info()->network_address;
- RETURN_IF_ERROR(ThriftRpcHelper::rpc<FrontendServiceClient>(
- master_addr.hostname, master_addr.port,
- [&put_request, ctx](FrontendServiceConnection& client) {
- client->streamLoadPut(ctx->put_result, put_request);
- }));
- Status plan_status(ctx->put_result.status);
- if (!plan_status.ok()) {
- LOG(WARNING) << "plan streaming load failed. errmsg=" << plan_status.get_error_msg()
- << ctx->brief();
- return plan_status;
- }
- VLOG_NOTICE << "params is " << apache::thrift::ThriftDebugString(ctx->put_result.params);
- return Status::OK();
-}
-
-// new on_header of mini load
-Status MiniLoadAction::_on_new_header(HttpRequest* req) {
- size_t body_bytes = 0;
- size_t max_body_bytes = config::streaming_load_max_mb * 1024 * 1024;
- if (!req->header(HttpHeaders::CONTENT_LENGTH).empty()) {
- body_bytes = std::stol(req->header(HttpHeaders::CONTENT_LENGTH));
- if (body_bytes > max_body_bytes) {
- std::stringstream ss;
- ss << "file size exceed max body size, max_body_bytes=" << max_body_bytes;
- return Status::InvalidArgument(ss.str());
- }
- } else {
- evhttp_connection_set_max_body_size(
- evhttp_request_get_connection(req->get_evhttp_request()), max_body_bytes);
- }
-
- RETURN_IF_ERROR(check_request(req));
-
- StreamLoadContext* ctx = new StreamLoadContext(_exec_env);
- ctx->ref();
- ((MiniLoadCtx*)req->handler_ctx())->stream_load_ctx = ctx;
-
- // auth information
- if (!parse_basic_auth(*req, &ctx->auth)) {
- LOG(WARNING) << "parse basic authorization failed." << ctx->brief();
- return Status::InvalidArgument("no valid Basic authorization");
- }
-
- ctx->load_type = TLoadType::MINI_LOAD;
- ctx->load_src_type = TLoadSourceType::RAW;
-
- ctx->db = req->param(DB_KEY);
- ctx->table = req->param(TABLE_KEY);
- ctx->label = req->param(LABEL_KEY);
- if (!req->param(SUB_LABEL_KEY).empty()) {
- ctx->sub_label = req->param(SUB_LABEL_KEY);
- }
- ctx->format = TFileFormatType::FORMAT_CSV_PLAIN;
- std::map<std::string, std::string> params(req->query_params().begin(),
- req->query_params().end());
- auto max_filter_ratio_it = params.find(MAX_FILTER_RATIO_KEY);
- if (max_filter_ratio_it != params.end()) {
- ctx->max_filter_ratio = strtod(max_filter_ratio_it->second.c_str(), nullptr);
- }
- auto timeout_it = params.find(TIMEOUT_KEY);
- if (timeout_it != params.end()) {
- try {
- ctx->timeout_second = std::stoi(timeout_it->second);
- } catch (const std::invalid_argument& e) {
- return Status::InvalidArgument("Invalid timeout format");
- }
- }
-
- LOG(INFO) << "new income mini load request." << ctx->brief() << ", db: " << ctx->db
- << ", tbl: " << ctx->table;
-
- // record metadata in frontend
- RETURN_IF_ERROR(_begin_mini_load(ctx));
-
- // open sink
- auto pipe = std::make_shared<StreamLoadPipe>();
- RETURN_IF_ERROR(_exec_env->load_stream_mgr()->put(ctx->id, pipe));
- ctx->body_sink = pipe;
-
- // get plan from fe
- RETURN_IF_ERROR(_process_put(req, ctx));
-
- // execute plan
- return _exec_env->stream_load_executor()->execute_plan_fragment(ctx);
-}
-
-void MiniLoadAction::_new_handle(HttpRequest* req) {
- StreamLoadContext* ctx = ((MiniLoadCtx*)req->handler_ctx())->stream_load_ctx;
- DCHECK(ctx != nullptr);
-
- if (ctx->status.ok()) {
- ctx->status = _on_new_handle(ctx);
- if (!ctx->status.ok()) {
- LOG(WARNING) << "handle mini load failed, id=" << ctx->id
- << ", errmsg=" << ctx->status.get_error_msg();
- }
- }
-
- // if failed to commit and status is not PUBLISH_TIMEOUT, rollback the txn.
- // PUBLISH_TIMEOUT is treated as OK in mini load, because user will use show load stmt
- // to see the final result.
- if (!ctx->status.ok() && ctx->status.code() != TStatusCode::PUBLISH_TIMEOUT) {
- if (ctx->need_rollback) {
- _exec_env->stream_load_executor()->rollback_txn(ctx);
- ctx->need_rollback = false;
- }
- if (ctx->body_sink.get() != nullptr) {
- ctx->body_sink->cancel(ctx->status.get_error_msg());
- }
- }
-
- std::string str = ctx->to_json_for_mini_load();
- str += '\n';
- HttpChannel::send_reply(req, str);
-}
-
-Status MiniLoadAction::_on_new_handle(StreamLoadContext* ctx) {
- if (ctx->body_bytes > 0 && ctx->receive_bytes != ctx->body_bytes) {
- LOG(WARNING) << "receive body don't equal with body bytes, body_bytes=" << ctx->body_bytes
- << ", receive_bytes=" << ctx->receive_bytes << ", id=" << ctx->id;
- return Status::InternalError("receive body don't equal with body bytes");
- }
-
- // wait stream load sink finish
- RETURN_IF_ERROR(ctx->body_sink->finish());
-
- // wait stream load finish
- RETURN_IF_ERROR(ctx->future.get());
-
- // commit this load with mini load attachment
- RETURN_IF_ERROR(_exec_env->stream_load_executor()->commit_txn(ctx));
-
- return Status::OK();
-}
-
-} // namespace doris
diff --git a/be/src/http/action/mini_load.h b/be/src/http/action/mini_load.h
deleted file mode 100644
index a21a450f19..0000000000
--- a/be/src/http/action/mini_load.h
+++ /dev/null
@@ -1,109 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#pragma once
-
-#include <map>
-#include <mutex>
-#include <set>
-#include <string>
-
-#include "common/status.h"
-#include "gen_cpp/FrontendService.h"
-#include "http/http_handler.h"
-#include "runtime/stream_load/stream_load_context.h"
-#include "util/defer_op.h"
-
-namespace doris {
-
-// Used to identify one mini load job
-struct LoadHandle {
- std::string db;
- std::string label;
- std::string sub_label;
-};
-
-struct LoadHandleCmp {
- bool operator()(const LoadHandle& lhs, const LoadHandle& rhs) const;
-};
-
-class TMasterResult;
-class ExecEnv;
-class StreamLoadContext;
-
-// This a handler for mini load
-// path is /api/{db}/{table}/_load
-class MiniLoadAction : public HttpHandler {
-public:
- MiniLoadAction(ExecEnv* exec_env);
-
- virtual ~MiniLoadAction() {}
-
- void handle(HttpRequest* req) override;
-
- bool request_will_be_read_progressively() override { return true; }
-
- int on_header(HttpRequest* req) override;
-
- void on_chunk_data(HttpRequest* req) override;
- void free_handler_ctx(void* ctx) override;
-
- void erase_handle(const LoadHandle& handle);
-
-private:
- Status _load(HttpRequest* req, const std::string& file_path, const std::string& user,
- const std::string& cluster, int64_t file_size);
-
- Status data_saved_dir(const LoadHandle& desc, const std::string& table, std::string* file_path);
-
- Status _on_header(HttpRequest* http_req);
-
- Status generate_check_load_req(const HttpRequest* http_req, TLoadCheckRequest* load_check_req);
-
- Status check_auth(const HttpRequest* http_req, const TLoadCheckRequest& load_check_req);
-
- void _on_chunk_data(HttpRequest* http_req);
-
- void _handle(HttpRequest* http_req);
-
- // streaming mini load
- Status _on_new_header(HttpRequest* req);
-
- Status _begin_mini_load(StreamLoadContext* ctx);
-
- Status _process_put(HttpRequest* req, StreamLoadContext* ctx);
-
- void _on_new_chunk_data(HttpRequest* http_req);
-
- void _new_handle(HttpRequest* req);
-
- Status _on_new_handle(StreamLoadContext* ctx);
-
- bool _is_streaming(HttpRequest* req);
-
- Status _merge_header(HttpRequest* http_req, std::map<std::string, std::string>* params);
-
- const std::string _streaming_function_name = "STREAMING_MINI_LOAD";
-
- ExecEnv* _exec_env;
-
- std::mutex _lock;
- // Used to check if load is duplicated in this instance.
- std::set<LoadHandle, LoadHandleCmp> _current_load;
-};
-
-} // namespace doris
diff --git a/be/src/runtime/CMakeLists.txt b/be/src/runtime/CMakeLists.txt
index 7692ae933f..2cc56e137b 100644
--- a/be/src/runtime/CMakeLists.txt
+++ b/be/src/runtime/CMakeLists.txt
@@ -60,7 +60,6 @@ set(RUNTIME_FILES
qsorter.cpp
fragment_mgr.cpp
dpp_sink_internal.cpp
- etl_job_mgr.cpp
load_path_mgr.cpp
types.cpp
tmp_file_mgr.cc
diff --git a/be/src/runtime/etl_job_mgr.cpp b/be/src/runtime/etl_job_mgr.cpp
deleted file mode 100644
index ce4029b144..0000000000
--- a/be/src/runtime/etl_job_mgr.cpp
+++ /dev/null
@@ -1,302 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#include "runtime/etl_job_mgr.h"
-
-#include <filesystem>
-#include <functional>
-
-#include "gen_cpp/FrontendService.h"
-#include "gen_cpp/HeartbeatService_types.h"
-#include "gen_cpp/MasterService_types.h"
-#include "gen_cpp/Status_types.h"
-#include "gen_cpp/Types_types.h"
-#include "runtime/client_cache.h"
-#include "runtime/exec_env.h"
-#include "runtime/fragment_mgr.h"
-#include "runtime/plan_fragment_executor.h"
-#include "runtime/runtime_state.h"
-#include "service/backend_options.h"
-#include "util/file_utils.h"
-#include "util/uid_util.h"
-
-namespace doris {
-
-#define VLOG_ETL VLOG_CRITICAL
-
-std::string EtlJobMgr::to_http_path(const std::string& file_name) {
- std::stringstream url;
- url << "http://" << BackendOptions::get_localhost() << ":" << config::webserver_port
- << "/api/_download_load?"
- << "token=" << _exec_env->token() << "&file=" << file_name;
- return url.str();
-}
-
-std::string EtlJobMgr::to_load_error_http_path(const std::string& file_name) {
- std::stringstream url;
- url << "http://" << BackendOptions::get_localhost() << ":" << config::webserver_port
- << "/api/_load_error_log?"
- << "file=" << file_name;
- return url.str();
-}
-
-const std::string DPP_NORMAL_ALL = "dpp.norm.ALL";
-const std::string DPP_ABNORMAL_ALL = "dpp.abnorm.ALL";
-const std::string ERROR_FILE_PREFIX = "error_log";
-
-EtlJobMgr::EtlJobMgr(ExecEnv* exec_env)
- : _exec_env(exec_env), _success_jobs(5000), _failed_jobs(5000) {}
-
-EtlJobMgr::~EtlJobMgr() {}
-
-Status EtlJobMgr::init() {
- return Status::OK();
-}
-
-Status EtlJobMgr::start_job(const TMiniLoadEtlTaskRequest& req) {
- const TUniqueId& id = req.params.params.fragment_instance_id;
- std::lock_guard<std::mutex> l(_lock);
- auto it = _running_jobs.find(id);
- if (it != _running_jobs.end()) {
- // Already have this job, return what???
- LOG(INFO) << "Duplicated etl job(" << id << ")";
- return Status::OK();
- }
-
- // If already success, we return Status::OK()
- // and wait master ask me success information
- if (_success_jobs.exists(id)) {
- // Already success
- LOG(INFO) << "Already successful etl job(" << id << ")";
- return Status::OK();
- }
-
- RETURN_IF_ERROR(_exec_env->fragment_mgr()->exec_plan_fragment(
- req.params, std::bind<void>(&EtlJobMgr::finalize_job, this, std::placeholders::_1)));
-
- // redo this job if failed before
- if (_failed_jobs.exists(id)) {
- _failed_jobs.erase(id);
- }
-
- VLOG_ETL << "Job id(" << id << ") insert to EtlJobMgr.";
- _running_jobs.insert(id);
-
- return Status::OK();
-}
-
-void EtlJobMgr::report_to_master(PlanFragmentExecutor* executor) {
- TUpdateMiniEtlTaskStatusRequest request;
- RuntimeState* state = executor->runtime_state();
- request.protocolVersion = FrontendServiceVersion::V1;
- request.etlTaskId = state->fragment_instance_id();
- Status status = get_job_state(state->fragment_instance_id(), &request.etlTaskStatus);
- if (!status.ok()) {
- return;
- }
- const TNetworkAddress& master_address = _exec_env->master_info()->network_address;
- FrontendServiceConnection client(_exec_env->frontend_client_cache(), master_address,
- config::thrift_rpc_timeout_ms, &status);
- if (!status.ok()) {
- std::stringstream ss;
- ss << "Connect master failed, with address(" << master_address.hostname << ":"
- << master_address.port << ")";
- LOG(WARNING) << ss.str();
- return;
- }
- TFeResult res;
- try {
- try {
- client->updateMiniEtlTaskStatus(res, request);
- } catch (apache::thrift::transport::TTransportException& e) {
- LOG(WARNING) << "Retrying report etl jobs status to master(" << master_address.hostname
- << ":" << master_address.port << ") because: " << e.what();
- status = client.reopen(config::thrift_rpc_timeout_ms);
- if (!status.ok()) {
- LOG(WARNING) << "Client repoen failed. with address(" << master_address.hostname
- << ":" << master_address.port << ")";
- return;
- }
- client->updateMiniEtlTaskStatus(res, request);
- }
- } catch (apache::thrift::TException& e) {
- // failed when retry.
- // reopen to disable this connection
- client.reopen(config::thrift_rpc_timeout_ms);
- std::stringstream ss;
- ss << "Report etl task to master(" << master_address.hostname << ":" << master_address.port
- << ") failed because: " << e.what();
- LOG(WARNING) << ss.str();
- }
- // TODO(lingbin): check status of 'res' here.
- // because there are some checks in updateMiniEtlTaskStatus, for example max_filter_ratio.
- LOG(INFO) << "Successfully report elt job status to master.id=" << print_id(request.etlTaskId);
-}
-
-void EtlJobMgr::finalize_job(PlanFragmentExecutor* executor) {
- EtlJobResult result;
-
- RuntimeState* state = executor->runtime_state();
- if (executor->status().ok()) {
- // Get files
- for (auto& it : state->output_files()) {
- int64_t file_size = std::filesystem::file_size(it);
- result.file_map[to_http_path(it)] = file_size;
- }
- // set statistics
- result.process_normal_rows = state->num_rows_load_success();
- result.process_abnormal_rows = state->num_rows_load_filtered();
- } else {
- // get debug path
- result.process_normal_rows = state->num_rows_load_success();
- result.process_abnormal_rows = state->num_rows_load_filtered();
- }
-
- result.debug_path = state->get_error_log_file_path();
-
- finish_job(state->fragment_instance_id(), executor->status(), result);
-
- // Try to report this finished task to master
- report_to_master(executor);
-}
-
-Status EtlJobMgr::cancel_job(const TUniqueId& id) {
- std::lock_guard<std::mutex> l(_lock);
- auto it = _running_jobs.find(id);
- if (it == _running_jobs.end()) {
- // Nothing to do
- LOG(INFO) << "No such job id, just print to info " << id;
- return Status::OK();
- }
- _running_jobs.erase(it);
- VLOG_ETL << "id(" << id << ") have been removed from EtlJobMgr.";
- EtlJobCtx job_ctx;
- job_ctx.finish_status = Status::Cancelled("Cancelled");
- _failed_jobs.put(id, job_ctx);
- return Status::OK();
-}
-
-Status EtlJobMgr::finish_job(const TUniqueId& id, const Status& finish_status,
- const EtlJobResult& result) {
- std::lock_guard<std::mutex> l(_lock);
-
- auto it = _running_jobs.find(id);
- if (it == _running_jobs.end()) {
- std::stringstream ss;
- ss << "Unknown job id(" << id << ").";
- return Status::InternalError(ss.str());
- }
- _running_jobs.erase(it);
-
- EtlJobCtx ctx;
- ctx.finish_status = finish_status;
- ctx.result = result;
- if (finish_status.ok()) {
- _success_jobs.put(id, ctx);
- } else {
- _failed_jobs.put(id, ctx);
- }
-
- VLOG_ETL << "Move job(" << id << ") from running to "
- << (finish_status.ok() ? "success jobs" : "failed jobs");
-
- return Status::OK();
-}
-
-Status EtlJobMgr::get_job_state(const TUniqueId& id, TMiniLoadEtlStatusResult* result) {
- std::lock_guard<std::mutex> l(_lock);
- auto it = _running_jobs.find(id);
- if (it != _running_jobs.end()) {
- result->status.__set_status_code(TStatusCode::OK);
- result->__set_etl_state(TEtlState::RUNNING);
- return Status::OK();
- }
- // Successful
- if (_success_jobs.exists(id)) {
- EtlJobCtx ctx;
- _success_jobs.get(id, &ctx);
- result->status.__set_status_code(TStatusCode::OK);
- result->__set_etl_state(TEtlState::FINISHED);
- result->__set_file_map(ctx.result.file_map);
-
- // set counter
- std::map<std::string, std::string> counter;
- counter[DPP_NORMAL_ALL] = std::to_string(ctx.result.process_normal_rows);
- counter[DPP_ABNORMAL_ALL] = std::to_string(ctx.result.process_abnormal_rows);
- result->__set_counters(counter);
-
- if (!ctx.result.debug_path.empty()) {
- result->__set_tracking_url(to_load_error_http_path(ctx.result.debug_path));
- }
- return Status::OK();
- }
- // failed information
- if (_failed_jobs.exists(id)) {
- EtlJobCtx ctx;
- _failed_jobs.get(id, &ctx);
- result->status.__set_status_code(TStatusCode::OK);
- result->__set_etl_state(TEtlState::CANCELLED);
-
- if (!ctx.result.debug_path.empty()) {
- result->__set_tracking_url(to_http_path(ctx.result.debug_path));
- }
- return Status::OK();
- }
- // NO this jobs
- result->status.__set_status_code(TStatusCode::OK);
- result->__set_etl_state(TEtlState::CANCELLED);
- return Status::OK();
-}
-
-Status EtlJobMgr::erase_job(const TDeleteEtlFilesRequest& req) {
- std::lock_guard<std::mutex> l(_lock);
- const TUniqueId& id = req.mini_load_id;
- auto it = _running_jobs.find(id);
- if (it != _running_jobs.end()) {
- std::stringstream ss;
- ss << "Job(" << id << ") is running, can not be deleted.";
- return Status::InternalError(ss.str());
- }
- _success_jobs.erase(id);
- _failed_jobs.erase(id);
-
- return Status::OK();
-}
-
-void EtlJobMgr::debug(std::stringstream& ss) {
- // Make things easy
- std::lock_guard<std::mutex> l(_lock);
-
- // Debug summary
- ss << "we have " << _running_jobs.size() << " jobs Running\n";
- ss << "we have " << _failed_jobs.size() << " jobs Failed\n";
- ss << "we have " << _success_jobs.size() << " jobs Successful\n";
- // Debug running jobs
- for (auto& it : _running_jobs) {
- ss << "running jobs: " << it << "\n";
- }
- // Debug success jobs
- for (auto& it : _success_jobs) {
- ss << "successful jobs: " << it.first << "\n";
- }
- // Debug failed jobs
- for (auto& it : _failed_jobs) {
- ss << "failed jobs: " << it.first << "\n";
- }
-}
-
-} // namespace doris
diff --git a/be/src/runtime/etl_job_mgr.h b/be/src/runtime/etl_job_mgr.h
deleted file mode 100644
index 306cf0397f..0000000000
--- a/be/src/runtime/etl_job_mgr.h
+++ /dev/null
@@ -1,99 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#pragma once
-
-#include <pthread.h>
-
-#include <mutex>
-#include <string>
-#include <unordered_set>
-#include <vector>
-
-#include "common/status.h"
-#include "gen_cpp/Types_types.h"
-#include "http/rest_monitor_iface.h"
-#include "util/hash_util.hpp"
-#include "util/lru_cache.hpp"
-
-namespace doris {
-
-// used to report to master
-struct EtlJobResult {
- EtlJobResult() : process_normal_rows(0), process_abnormal_rows(0) {}
- std::string debug_path;
- std::map<std::string, int64_t> file_map;
- int64_t process_normal_rows;
- int64_t process_abnormal_rows;
-};
-
-// used to report to master
-struct EtlJobCtx {
- Status finish_status;
- EtlJobResult result;
-};
-
-class TMiniLoadEtlStatusResult;
-class TMiniLoadEtlTaskRequest;
-class ExecEnv;
-class PlanFragmentExecutor;
-class TDeleteEtlFilesRequest;
-
-// manager of all the Etl job
-// used this because master may loop be to check if a load job is finished.
-class EtlJobMgr : public RestMonitorIface {
-public:
- EtlJobMgr(ExecEnv* exec_env);
-
- virtual ~EtlJobMgr();
-
- // make trash directory for collect
- Status init();
-
- // Make a job to running state
- // If this job is successful, return OK
- // If this job is failed, move this job from _failed_jobs to _running_jobs
- // Otherwise, put it to _running_jobs
- Status start_job(const TMiniLoadEtlTaskRequest& req);
-
- // Make a running job to failed job
- Status cancel_job(const TUniqueId& id);
-
- Status finish_job(const TUniqueId& id, const Status& finish_status, const EtlJobResult& result);
-
- Status get_job_state(const TUniqueId& id, TMiniLoadEtlStatusResult* result);
-
- Status erase_job(const TDeleteEtlFilesRequest& req);
-
- void finalize_job(PlanFragmentExecutor* executor);
-
- virtual void debug(std::stringstream& ss);
-
-private:
- std::string to_http_path(const std::string& file_path);
- std::string to_load_error_http_path(const std::string& file_path);
-
- void report_to_master(PlanFragmentExecutor* executor);
-
- ExecEnv* _exec_env;
- std::mutex _lock;
- std::unordered_set<TUniqueId> _running_jobs;
- LruCache<TUniqueId, EtlJobCtx> _success_jobs;
- LruCache<TUniqueId, EtlJobCtx> _failed_jobs;
-};
-
-} // namespace doris
diff --git a/be/src/runtime/exec_env.h b/be/src/runtime/exec_env.h
index 6fda2e6a1f..e0e09ae047 100644
--- a/be/src/runtime/exec_env.h
+++ b/be/src/runtime/exec_env.h
@@ -128,7 +128,6 @@ public:
FragmentMgr* fragment_mgr() { return _fragment_mgr; }
ResultCache* result_cache() { return _result_cache; }
TMasterInfo* master_info() { return _master_info; }
- EtlJobMgr* etl_job_mgr() { return _etl_job_mgr; }
LoadPathMgr* load_path_mgr() { return _load_path_mgr; }
DiskIoMgr* disk_io_mgr() { return _disk_io_mgr; }
TmpFileMgr* tmp_file_mgr() { return _tmp_file_mgr; }
@@ -207,7 +206,6 @@ private:
FragmentMgr* _fragment_mgr = nullptr;
ResultCache* _result_cache = nullptr;
TMasterInfo* _master_info = nullptr;
- EtlJobMgr* _etl_job_mgr = nullptr;
LoadPathMgr* _load_path_mgr = nullptr;
DiskIoMgr* _disk_io_mgr = nullptr;
TmpFileMgr* _tmp_file_mgr = nullptr;
diff --git a/be/src/runtime/exec_env_init.cpp b/be/src/runtime/exec_env_init.cpp
index 264565e4d6..2347d2bea8 100644
--- a/be/src/runtime/exec_env_init.cpp
+++ b/be/src/runtime/exec_env_init.cpp
@@ -31,7 +31,6 @@
#include "runtime/client_cache.h"
#include "runtime/data_stream_mgr.h"
#include "runtime/disk_io_mgr.h"
-#include "runtime/etl_job_mgr.h"
#include "runtime/exec_env.h"
#include "runtime/external_scan_context_mgr.h"
#include "runtime/fold_constant_executor.h"
@@ -128,7 +127,6 @@ Status ExecEnv::_init(const std::vector<StorePath>& store_paths) {
_result_cache = new ResultCache(config::query_cache_max_size_mb,
config::query_cache_elasticity_size_mb);
_master_info = new TMasterInfo();
- _etl_job_mgr = new EtlJobMgr(this);
_load_path_mgr = new LoadPathMgr(this);
_disk_io_mgr = new DiskIoMgr();
_tmp_file_mgr = new TmpFileMgr(this);
@@ -147,7 +145,6 @@ Status ExecEnv::_init(const std::vector<StorePath>& store_paths) {
_broker_client_cache->init_metrics("broker");
_result_mgr->init();
_cgroups_mgr->init_cgroups();
- _etl_job_mgr->init();
Status status = _load_path_mgr->init();
if (!status.ok()) {
LOG(ERROR) << "load path mgr init failed." << status.get_error_msg();
@@ -335,7 +332,6 @@ void ExecEnv::_destroy() {
SAFE_DELETE(_tmp_file_mgr);
SAFE_DELETE(_disk_io_mgr);
SAFE_DELETE(_load_path_mgr);
- SAFE_DELETE(_etl_job_mgr);
SAFE_DELETE(_master_info);
SAFE_DELETE(_fragment_mgr);
SAFE_DELETE(_cgroups_mgr);
diff --git a/be/src/runtime/stream_load/stream_load_executor.cpp b/be/src/runtime/stream_load/stream_load_executor.cpp
index 4f080318b8..37491fc98e 100644
--- a/be/src/runtime/stream_load/stream_load_executor.cpp
+++ b/be/src/runtime/stream_load/stream_load_executor.cpp
@@ -335,17 +335,7 @@ bool StreamLoadExecutor::collect_load_stat(StreamLoadContext* ctx, TTxnCommitAtt
}
switch (ctx->load_type) {
case TLoadType::MINI_LOAD: {
- attach->loadType = TLoadType::MINI_LOAD;
-
- TMiniLoadTxnCommitAttachment ml_attach;
- ml_attach.loadedRows = ctx->number_loaded_rows;
- ml_attach.filteredRows = ctx->number_filtered_rows;
- if (!ctx->error_url.empty()) {
- ml_attach.__set_errorLogUrl(ctx->error_url);
- }
-
- attach->mlTxnCommitAttachment = std::move(ml_attach);
- attach->__isset.mlTxnCommitAttachment = true;
+ LOG(FATAL) << "mini load is not supported any more";
break;
}
case TLoadType::ROUTINE_LOAD: {
diff --git a/be/src/service/backend_service.h b/be/src/service/backend_service.h
index bb47fe9e4e..c3f1261a63 100644
--- a/be/src/service/backend_service.h
+++ b/be/src/service/backend_service.h
@@ -36,10 +36,6 @@ class ThriftServer;
class TAgentResult;
class TAgentTaskRequest;
class TAgentPublishRequest;
-class TMiniLoadEtlTaskRequest;
-class TMiniLoadEtlStatusResult;
-class TMiniLoadEtlStatusRequest;
-class TDeleteEtlFilesRequest;
class TPlanExecRequest;
class TPlanExecParams;
class TExecPlanFragmentParams;
@@ -97,23 +93,6 @@ public:
_agent_server->publish_cluster_state(result, request);
}
- virtual void submit_etl_task(TAgentResult& result,
- const TMiniLoadEtlTaskRequest& request) override {
- VLOG_RPC << "submit_etl_task. request is "
- << apache::thrift::ThriftDebugString(request).c_str();
- _agent_server->submit_etl_task(result, request);
- }
-
- virtual void get_etl_status(TMiniLoadEtlStatusResult& result,
- const TMiniLoadEtlStatusRequest& request) override {
- _agent_server->get_etl_status(result, request);
- }
-
- virtual void delete_etl_files(TAgentResult& result,
- const TDeleteEtlFilesRequest& request) override {
- _agent_server->delete_etl_files(result, request);
- }
-
// DorisServer service
virtual void exec_plan_fragment(TExecPlanFragmentResult& return_val,
const TExecPlanFragmentParams& params) override;
diff --git a/be/src/service/http_service.cpp b/be/src/service/http_service.cpp
index 3ce77f6bb6..80dd2612b6 100644
--- a/be/src/service/http_service.cpp
+++ b/be/src/service/http_service.cpp
@@ -25,7 +25,6 @@
#include "http/action/health_action.h"
#include "http/action/meta_action.h"
#include "http/action/metrics_action.h"
-#include "http/action/mini_load.h"
#include "http/action/pprof_actions.h"
#include "http/action/reload_tablet_action.h"
#include "http/action/reset_rpc_channel_action.h"
@@ -57,9 +56,9 @@ Status HttpService::start() {
add_default_path_handlers(_web_page_handler.get(), MemTracker::get_process_tracker());
// register load
- MiniLoadAction* miniload_action = _pool.add(new MiniLoadAction(_env));
- _ev_http_server->register_handler(HttpMethod::PUT, "/api/{db}/{table}/_load", miniload_action);
StreamLoadAction* streamload_action = _pool.add(new StreamLoadAction(_env));
+ _ev_http_server->register_handler(HttpMethod::PUT, "/api/{db}/{table}/_load",
+ streamload_action);
_ev_http_server->register_handler(HttpMethod::PUT, "/api/{db}/{table}/_stream_load",
streamload_action);
StreamLoad2PCAction* streamload_2pc_action = _pool.add(new StreamLoad2PCAction(_env));
diff --git a/be/test/CMakeLists.txt b/be/test/CMakeLists.txt
index 879b6f7b24..49c14c49f6 100644
--- a/be/test/CMakeLists.txt
+++ b/be/test/CMakeLists.txt
@@ -215,7 +215,6 @@ set(RUNTIME_TEST_FILES
# runtime/dpp_sink_internal_test.cpp
# runtime/dpp_sink_test.cpp
# runtime/data_spliter_test.cpp
- # runtime/etl_job_mgr_test.cpp
# runtime/tmp_file_mgr_test.cpp
# runtime/disk_io_mgr_test.cpp
# runtime/thread_resource_mgr_test.cpp
diff --git a/be/test/runtime/data_stream_test.cpp b/be/test/runtime/data_stream_test.cpp
index 39d7cba249..b0a8de6529 100644
--- a/be/test/runtime/data_stream_test.cpp
+++ b/be/test/runtime/data_stream_test.cpp
@@ -94,15 +94,6 @@ public:
virtual void publish_cluster_state(TAgentResult& return_val,
const TAgentPublishRequest& request) {}
- virtual void submit_etl_task(TAgentResult& return_val, const TMiniLoadEtlTaskRequest& request) {
- }
-
- virtual void get_etl_status(TMiniLoadEtlStatusResult& return_val,
- const TMiniLoadEtlStatusRequest& request) {}
-
- virtual void delete_etl_files(TAgentResult& return_val, const TDeleteEtlFilesRequest& request) {
- }
-
virtual void register_pull_load_task(TStatus& _return, const TUniqueId& id,
const int32_t num_senders) {}
diff --git a/be/test/runtime/etl_job_mgr_test.cpp b/be/test/runtime/etl_job_mgr_test.cpp
deleted file mode 100644
index edb8d140c3..0000000000
--- a/be/test/runtime/etl_job_mgr_test.cpp
+++ /dev/null
@@ -1,221 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#include "runtime/etl_job_mgr.h"
-
-#include <gtest/gtest.h>
-
-#include "gen_cpp/Types_types.h"
-#include "runtime/exec_env.h"
-#include "runtime/fragment_mgr.h"
-#include "util/cpu_info.h"
-
-namespace doris {
-// Mock fragment mgr
-Status FragmentMgr::exec_plan_fragment(const TExecPlanFragmentParams& params, FinishCallback cb) {
- return Status::OK();
-}
-
-FragmentMgr::FragmentMgr(ExecEnv* exec_env) : _thread_pool(10, 128) {}
-
-FragmentMgr::~FragmentMgr() {}
-
-void FragmentMgr::debug(std::stringstream& ss) {}
-
-class EtlJobMgrTest : public testing::Test {
-public:
- EtlJobMgrTest() {}
-
-private:
- ExecEnv _exec_env;
-};
-
-TEST_F(EtlJobMgrTest, NormalCase) {
- EtlJobMgr mgr(&_exec_env);
- TUniqueId id;
- id.lo = 1;
- id.hi = 1;
-
- TMiniLoadEtlStatusResult res;
- TMiniLoadEtlTaskRequest req;
- TDeleteEtlFilesRequest del_req;
- del_req.mini_load_id = id;
- req.params.params.fragment_instance_id = id;
-
- // make it running
- EXPECT_TRUE(mgr.start_job(req).ok());
- EXPECT_TRUE(mgr.get_job_state(id, &res).ok());
- EXPECT_EQ(TEtlState::RUNNING, res.etl_state);
- EXPECT_EQ(TStatusCode::OK, res.status.status_code);
-
- // make it finishing
- EtlJobResult job_result;
- job_result.file_map["abc"] = 100L;
- EXPECT_TRUE(mgr.finish_job(id, Status::OK(), job_result).ok());
- EXPECT_TRUE(mgr.get_job_state(id, &res).ok());
- EXPECT_EQ(TEtlState::FINISHED, res.etl_state);
- EXPECT_EQ(TStatusCode::OK, res.status.status_code);
- EXPECT_EQ(1, res.file_map.size());
- EXPECT_EQ(100, res.file_map["abc"]);
-
- // erase it
- EXPECT_TRUE(mgr.erase_job(del_req).ok());
- EXPECT_TRUE(mgr.get_job_state(id, &res).ok());
- EXPECT_EQ(TEtlState::CANCELLED, res.etl_state);
- EXPECT_EQ(TStatusCode::OK, res.status.status_code);
-}
-
-TEST_F(EtlJobMgrTest, DuplicateCase) {
- EtlJobMgr mgr(&_exec_env);
- TUniqueId id;
- id.lo = 1;
- id.hi = 1;
-
- TMiniLoadEtlStatusResult res;
- TMiniLoadEtlTaskRequest req;
- req.params.params.fragment_instance_id = id;
-
- // make it running
- EXPECT_TRUE(mgr.start_job(req).ok());
- EXPECT_TRUE(mgr.get_job_state(id, &res).ok());
- EXPECT_EQ(TEtlState::RUNNING, res.etl_state);
- EXPECT_EQ(TStatusCode::OK, res.status.status_code);
-
- // Put it twice
- EXPECT_TRUE(mgr.start_job(req).ok());
- EXPECT_TRUE(mgr.get_job_state(id, &res).ok());
- EXPECT_EQ(TEtlState::RUNNING, res.etl_state);
- EXPECT_EQ(TStatusCode::OK, res.status.status_code);
-}
-
-TEST_F(EtlJobMgrTest, RunAfterSuccess) {
- EtlJobMgr mgr(&_exec_env);
- TUniqueId id;
- id.lo = 1;
- id.hi = 1;
-
- TMiniLoadEtlStatusResult res;
- TMiniLoadEtlTaskRequest req;
- TDeleteEtlFilesRequest del_req;
- del_req.mini_load_id = id;
- req.params.params.fragment_instance_id = id;
-
- // make it running
- EXPECT_TRUE(mgr.start_job(req).ok());
- EXPECT_TRUE(mgr.get_job_state(id, &res).ok());
- EXPECT_EQ(TEtlState::RUNNING, res.etl_state);
- EXPECT_EQ(TStatusCode::OK, res.status.status_code);
-
- // make it finishing
- EtlJobResult job_result;
- job_result.file_map["abc"] = 100L;
- EXPECT_TRUE(mgr.finish_job(id, Status::OK(), job_result).ok());
- EXPECT_TRUE(mgr.get_job_state(id, &res).ok());
- EXPECT_EQ(TEtlState::FINISHED, res.etl_state);
- EXPECT_EQ(TStatusCode::OK, res.status.status_code);
- EXPECT_EQ(1, res.file_map.size());
- EXPECT_EQ(100, res.file_map["abc"]);
-
- // Put it twice
- EXPECT_TRUE(mgr.start_job(req).ok());
- EXPECT_TRUE(mgr.get_job_state(id, &res).ok());
- EXPECT_EQ(TEtlState::FINISHED, res.etl_state);
- EXPECT_EQ(TStatusCode::OK, res.status.status_code);
- EXPECT_EQ(1, res.file_map.size());
- EXPECT_EQ(100, res.file_map["abc"]);
-}
-
-TEST_F(EtlJobMgrTest, RunAfterFail) {
- EtlJobMgr mgr(&_exec_env);
- TUniqueId id;
- id.lo = 1;
- id.hi = 1;
-
- TMiniLoadEtlStatusResult res;
- TMiniLoadEtlTaskRequest req;
- req.params.params.fragment_instance_id = id;
-
- // make it running
- EXPECT_TRUE(mgr.start_job(req).ok());
- EXPECT_TRUE(mgr.get_job_state(id, &res).ok());
- EXPECT_EQ(TEtlState::RUNNING, res.etl_state);
- EXPECT_EQ(TStatusCode::OK, res.status.status_code);
-
- // make it finishing
- EtlJobResult job_result;
- job_result.debug_path = "abc";
- EXPECT_TRUE(mgr.finish_job(id, Status::ThriftRpcError("Thrift rpc error"), job_result).ok());
- EXPECT_TRUE(mgr.get_job_state(id, &res).ok());
- EXPECT_EQ(TEtlState::CANCELLED, res.etl_state);
- EXPECT_EQ(TStatusCode::OK, res.status.status_code);
-
- // Put it twice
- EXPECT_TRUE(mgr.start_job(req).ok());
- EXPECT_TRUE(mgr.get_job_state(id, &res).ok());
- EXPECT_EQ(TEtlState::RUNNING, res.etl_state);
- EXPECT_EQ(TStatusCode::OK, res.status.status_code);
-}
-
-TEST_F(EtlJobMgrTest, CancelJob) {
- EtlJobMgr mgr(&_exec_env);
- TUniqueId id;
- id.lo = 1;
- id.hi = 1;
-
- TMiniLoadEtlStatusResult res;
- TMiniLoadEtlTaskRequest req;
- req.params.params.fragment_instance_id = id;
-
- // make it running
- EXPECT_TRUE(mgr.start_job(req).ok());
- EXPECT_TRUE(mgr.get_job_state(id, &res).ok());
- EXPECT_EQ(TEtlState::RUNNING, res.etl_state);
- EXPECT_EQ(TStatusCode::OK, res.status.status_code);
-
- // make it finishing
- EtlJobResult job_result;
- job_result.debug_path = "abc";
- EXPECT_TRUE(mgr.cancel_job(id).ok());
- EXPECT_TRUE(mgr.get_job_state(id, &res).ok());
- EXPECT_EQ(TEtlState::CANCELLED, res.etl_state);
- EXPECT_EQ(TStatusCode::OK, res.status.status_code);
-
- // Put it twice
- EXPECT_TRUE(mgr.start_job(req).ok());
- EXPECT_TRUE(mgr.get_job_state(id, &res).ok());
- EXPECT_EQ(TEtlState::RUNNING, res.etl_state);
- EXPECT_EQ(TStatusCode::OK, res.status.status_code);
-}
-
-TEST_F(EtlJobMgrTest, FinishUnknownJob) {
- EtlJobMgr mgr(&_exec_env);
- TUniqueId id;
- id.lo = 1;
- id.hi = 1;
-
- TMiniLoadEtlStatusResult res;
-
- // make it finishing
- EtlJobResult job_result;
- job_result.debug_path = "abc";
- EXPECT_FALSE(mgr.finish_job(id, Status::ThriftRpcError("Thrift rpc error"), job_result).ok());
- EXPECT_TRUE(mgr.get_job_state(id, &res).ok());
- EXPECT_EQ(TEtlState::CANCELLED, res.etl_state);
- EXPECT_EQ(TStatusCode::OK, res.status.status_code);
-}
-
-} // namespace doris
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/Load.java b/fe/fe-core/src/main/java/org/apache/doris/load/Load.java
index f6b85a0ebe..7278dd53aa 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/Load.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/Load.java
@@ -29,11 +29,9 @@ import org.apache.doris.analysis.FunctionName;
import org.apache.doris.analysis.FunctionParams;
import org.apache.doris.analysis.ImportColumnDesc;
import org.apache.doris.analysis.IsNullPredicate;
-import org.apache.doris.analysis.LabelName;
import org.apache.doris.analysis.LoadStmt;
import org.apache.doris.analysis.NullLiteral;
import org.apache.doris.analysis.PartitionNames;
-import org.apache.doris.analysis.Separator;
import org.apache.doris.analysis.SlotDescriptor;
import org.apache.doris.analysis.SlotRef;
import org.apache.doris.analysis.StorageBackend;
@@ -45,7 +43,6 @@ import org.apache.doris.catalog.AggregateType;
import org.apache.doris.catalog.Catalog;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.Database;
-import org.apache.doris.catalog.FunctionSet;
import org.apache.doris.catalog.KeysType;
import org.apache.doris.catalog.MaterializedIndex;
import org.apache.doris.catalog.MaterializedIndex.IndexExtState;
@@ -88,14 +85,12 @@ import org.apache.doris.mysql.privilege.PrivPredicate;
import org.apache.doris.persist.ReplicaPersistInfo;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.system.Backend;
-import org.apache.doris.task.AgentClient;
import org.apache.doris.task.AgentTaskQueue;
import org.apache.doris.task.LoadTaskInfo;
import org.apache.doris.task.PushTask;
import org.apache.doris.thrift.TBrokerScanRangeParams;
import org.apache.doris.thrift.TEtlState;
import org.apache.doris.thrift.TFileFormatType;
-import org.apache.doris.thrift.TMiniLoadRequest;
import org.apache.doris.thrift.TNetworkAddress;
import org.apache.doris.thrift.TPriority;
import org.apache.doris.transaction.TransactionNotFoundException;
@@ -230,140 +225,6 @@ public class Load {
lock.writeLock().unlock();
}
- // return true if we truly add the load job
- // return false otherwise (eg: a retry request)
- @Deprecated
- public boolean addMiniLoadJob(TMiniLoadRequest request) throws DdlException {
- // get params
- String fullDbName = request.getDb();
- String tableName = request.getTbl();
- String label = request.getLabel();
- long timestamp = 0;
- if (request.isSetTimestamp()) {
- timestamp = request.getTimestamp();
- }
- TNetworkAddress beAddr = request.getBackend();
- String filePathsValue = request.getFiles().get(0);
- Map<String, String> params = request.getProperties();
-
- // create load stmt
- // label name
- LabelName labelName = new LabelName(fullDbName, label);
-
- // data descriptions
- // file paths
- if (Strings.isNullOrEmpty(filePathsValue)) {
- throw new DdlException("File paths are not specified");
- }
- List<String> filePaths = Arrays.asList(filePathsValue.split(","));
-
- // partitions | column names | separator | line delimiter
- List<String> partitionNames = null;
- List<String> columnNames = null;
- Separator columnSeparator = null;
- List<String> hllColumnPairList = null;
- Separator lineDelimiter = null;
- String formatType = null;
- if (params != null) {
- String specifiedPartitions = params.get(LoadStmt.KEY_IN_PARAM_PARTITIONS);
- if (!Strings.isNullOrEmpty(specifiedPartitions)) {
- partitionNames = Arrays.asList(specifiedPartitions.split(","));
- }
- String specifiedColumns = params.get(LoadStmt.KEY_IN_PARAM_COLUMNS);
- if (!Strings.isNullOrEmpty(specifiedColumns)) {
- columnNames = Arrays.asList(specifiedColumns.split(","));
- }
-
- final String hll = params.get(LoadStmt.KEY_IN_PARAM_HLL);
- if (!Strings.isNullOrEmpty(hll)) {
- hllColumnPairList = Arrays.asList(hll.split(":"));
- }
-
- String columnSeparatorStr = params.get(LoadStmt.KEY_IN_PARAM_COLUMN_SEPARATOR);
- if (columnSeparatorStr != null) {
- if (columnSeparatorStr.isEmpty()) {
- columnSeparatorStr = "\t";
- }
- columnSeparator = new Separator(columnSeparatorStr);
- try {
- columnSeparator.analyze();
- } catch (AnalysisException e) {
- throw new DdlException(e.getMessage());
- }
- }
- String lineDelimiterStr = params.get(LoadStmt.KEY_IN_PARAM_LINE_DELIMITER);
- if (lineDelimiterStr != null) {
- if (lineDelimiterStr.isEmpty()) {
- lineDelimiterStr = "\n";
- }
- lineDelimiter = new Separator(lineDelimiterStr);
- try {
- lineDelimiter.analyze();
- } catch (AnalysisException e) {
- throw new DdlException(e.getMessage());
- }
- }
- formatType = params.get(LoadStmt.KEY_IN_PARAM_FORMAT_TYPE);
- }
-
- DataDescription dataDescription = new DataDescription(
- tableName,
- partitionNames != null ? new PartitionNames(false, partitionNames) : null,
- filePaths,
- columnNames,
- columnSeparator,
- formatType,
- false,
- null
- );
- dataDescription.setLineDelimiter(lineDelimiter);
- dataDescription.setBeAddr(beAddr);
- // parse hll param pair
- if (hllColumnPairList != null) {
- for (int i = 0; i < hllColumnPairList.size(); i++) {
- final String pairStr = hllColumnPairList.get(i);
- final List<String> pairList = Arrays.asList(pairStr.split(","));
- if (pairList.size() != 2) {
- throw new DdlException("hll param format error");
- }
-
- final String resultColumn = pairList.get(0);
- final String hashColumn = pairList.get(1);
- final Pair<String, List<String>> pair = new Pair<String, List<String>>(FunctionSet.HLL_HASH,
- Arrays.asList(hashColumn));
- dataDescription.addColumnMapping(resultColumn, pair);
- }
- }
-
- List<DataDescription> dataDescriptions = Lists.newArrayList(dataDescription);
-
- // job properties
- Map<String, String> properties = Maps.newHashMap();
- if (params != null) {
- String maxFilterRatio = params.get(LoadStmt.MAX_FILTER_RATIO_PROPERTY);
- if (!Strings.isNullOrEmpty(maxFilterRatio)) {
- properties.put(LoadStmt.MAX_FILTER_RATIO_PROPERTY, maxFilterRatio);
- }
- String timeout = params.get(LoadStmt.TIMEOUT_PROPERTY);
- if (!Strings.isNullOrEmpty(timeout)) {
- properties.put(LoadStmt.TIMEOUT_PROPERTY, timeout);
- }
- }
- LoadStmt stmt = new LoadStmt(labelName, dataDescriptions, null, null, properties);
-
- // try to register mini label
- if (!registerMiniLabel(fullDbName, label, timestamp)) {
- return false;
- }
-
- try {
- addLoadJob(stmt, EtlJobType.MINI, timestamp);
- return true;
- } finally {
- deregisterMiniLabel(fullDbName, label);
- }
- }
-
public void addLoadJob(LoadStmt stmt, EtlJobType etlJobType, long timestamp) throws DdlException {
// get db
String dbName = stmt.getLabel().getDbName();
@@ -2641,24 +2502,6 @@ public class Load {
}
break;
case MINI:
- for (MiniEtlTaskInfo taskInfo : job.getMiniEtlTasks().values()) {
- long backendId = taskInfo.getBackendId();
- Backend backend = Catalog.getCurrentSystemInfo().getBackend(backendId);
- if (backend == null) {
- LOG.warn("backend does not exist. id: {}", backendId);
- break;
- }
-
- long dbId = job.getDbId();
- Database db = Catalog.getCurrentInternalCatalog().getDbNullable(dbId);
- if (db == null) {
- LOG.warn("db does not exist. id: {}", dbId);
- break;
- }
-
- AgentClient client = new AgentClient(backend.getHost(), backend.getBePort());
- client.deleteEtlFiles(dbId, job.getId(), db.getFullName(), job.getLabel());
- }
break;
case INSERT:
break;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadManager.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadManager.java
index 17b7af2508..a1b2edefdb 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadManager.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadManager.java
@@ -22,14 +22,12 @@ import org.apache.doris.analysis.CompoundPredicate.Operator;
import org.apache.doris.analysis.LoadStmt;
import org.apache.doris.catalog.Catalog;
import org.apache.doris.catalog.Database;
-import org.apache.doris.catalog.Table;
import org.apache.doris.cluster.ClusterNamespace;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.CaseSensibility;
import org.apache.doris.common.Config;
import org.apache.doris.common.DataQualityException;
import org.apache.doris.common.DdlException;
-import org.apache.doris.common.DuplicatedRequestException;
import org.apache.doris.common.LabelAlreadyUsedException;
import org.apache.doris.common.MetaNotFoundException;
import org.apache.doris.common.PatternMatcher;
@@ -41,9 +39,6 @@ import org.apache.doris.load.EtlJobType;
import org.apache.doris.load.FailMsg;
import org.apache.doris.load.FailMsg.CancelType;
import org.apache.doris.load.Load;
-import org.apache.doris.system.SystemInfoService;
-import org.apache.doris.thrift.TMiniLoadBeginRequest;
-import org.apache.doris.thrift.TMiniLoadRequest;
import org.apache.doris.thrift.TUniqueId;
import org.apache.doris.transaction.TransactionState;
@@ -137,53 +132,6 @@ public class LoadManager implements Writable {
.filter(j -> (j.getState() != JobState.FINISHED && j.getState() != JobState.CANCELLED)).count();
}
- /**
- * This method will be invoked by streaming mini load.
- * It will begin the txn of mini load immediately without any scheduler .
- *
- */
- public long createLoadJobFromMiniLoad(TMiniLoadBeginRequest request) throws UserException {
- String cluster = SystemInfoService.DEFAULT_CLUSTER;
- if (request.isSetCluster()) {
- cluster = request.getCluster();
- }
- Database database = checkDb(ClusterNamespace.getFullName(cluster, request.getDb()));
- Table table = database.getTableOrDdlException(request.tbl);
- MiniLoadJob loadJob = null;
- writeLock();
- try {
- loadJob = new MiniLoadJob(database.getId(), table.getId(), request);
- // call unprotectedExecute before adding load job. so that if job is not started ok, no need to add.
- // NOTICE(cmy): this order is only for Mini Load, because mini load's
- // unprotectedExecute() only do beginTxn().
- // for other kind of load job, execute the job after adding job.
- // Mini load job must be executed before release write lock.
- // Otherwise, the duplicated request maybe get the transaction id before transaction of mini load is begun.
- loadJob.beginTxn();
- loadJob.unprotectedExecute();
- createLoadJob(loadJob);
- } catch (DuplicatedRequestException e) {
- // this is a duplicate request, just return previous txn id
- LOG.info("duplicate request for mini load. request id: {}, txn: {}", e.getDuplicatedRequestId(),
- e.getTxnId());
- return e.getTxnId();
- } catch (UserException e) {
- if (loadJob != null) {
- loadJob.cancelJobWithoutCheck(new FailMsg(CancelType.LOAD_RUN_FAIL, e.getMessage()), false,
- false /* no need to write edit log, because createLoadJob log is not wrote yet */);
- }
- throw e;
- } finally {
- writeUnlock();
- }
-
- // The persistence of mini load must be the final step of create mini load.
- // After mini load was executed, the txn id has been set and state has been changed to loading.
- // Those two need to be record in persistence.
- Catalog.getCurrentCatalog().getEditLog().logCreateLoadJob(loadJob);
- return loadJob.getTransactionId();
- }
-
/**
* This method will be invoked by version1 of broker or hadoop load.
* It is used to check the label of v1 and v2 at the same time.
@@ -209,31 +157,6 @@ public class LoadManager implements Writable {
}
}
- /**
- * This method will be invoked by non-streaming of mini load.
- * It is used to check the label of v1 and v2 at the same time.
- * Finally, the non-streaming mini load will belongs to load class.
- *
- * @param request request
- * @return if: mini load is a duplicated load, return false. else: return true.
- * @deprecated not support mini load
- */
- @Deprecated
- public boolean createLoadJobV1FromRequest(TMiniLoadRequest request) throws DdlException {
- String cluster = SystemInfoService.DEFAULT_CLUSTER;
- if (request.isSetCluster()) {
- cluster = request.getCluster();
- }
- Database database = checkDb(ClusterNamespace.getFullName(cluster, request.getDb()));
- writeLock();
- try {
- checkLabelUsed(database.getId(), request.getLabel());
- return Catalog.getCurrentCatalog().getLoadInstance().addMiniLoadJob(request);
- } finally {
- writeUnlock();
- }
- }
-
/**
* MultiLoadMgr use.
**/
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/MiniLoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/MiniLoadJob.java
index 4c59ffdaea..2da78ae1c5 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/MiniLoadJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/MiniLoadJob.java
@@ -20,21 +20,11 @@ package org.apache.doris.load.loadv2;
import org.apache.doris.catalog.AuthorizationInfo;
import org.apache.doris.catalog.Catalog;
import org.apache.doris.catalog.Database;
-import org.apache.doris.common.AnalysisException;
-import org.apache.doris.common.DuplicatedRequestException;
-import org.apache.doris.common.LabelAlreadyUsedException;
import org.apache.doris.common.MetaNotFoundException;
-import org.apache.doris.common.QuotaExceedException;
import org.apache.doris.common.io.Text;
import org.apache.doris.load.EtlJobType;
-import org.apache.doris.service.FrontendOptions;
-import org.apache.doris.thrift.TMiniLoadBeginRequest;
-import org.apache.doris.transaction.BeginTransactionException;
import org.apache.doris.transaction.TransactionState;
-import org.apache.doris.transaction.TransactionState.TxnCoordinator;
-import org.apache.doris.transaction.TransactionState.TxnSourceType;
-import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@@ -55,22 +45,6 @@ public class MiniLoadJob extends LoadJob {
super(EtlJobType.MINI);
}
- public MiniLoadJob(long dbId, long tableId, TMiniLoadBeginRequest request) throws MetaNotFoundException {
- super(EtlJobType.MINI, dbId, request.getLabel());
- this.tableId = tableId;
- this.tableName = request.getTbl();
- if (request.isSetTimeoutSecond()) {
- setTimeout(request.getTimeoutSecond());
- }
- if (request.isSetMaxFilterRatio()) {
- setMaxFilterRatio(request.getMaxFilterRatio());
- }
- this.createTimestamp = request.getCreateTimestamp();
- this.loadStartTimestamp = createTimestamp;
- this.authorizationInfo = gatherAuthInfo();
- this.requestId = request.getRequestId();
- }
-
@Override
public Set<String> getTableNamesForShow() {
return Sets.newHashSet(tableName);
@@ -87,15 +61,7 @@ public class MiniLoadJob extends LoadJob {
}
@Override
- public void beginTxn()
- throws LabelAlreadyUsedException, BeginTransactionException, AnalysisException, DuplicatedRequestException,
- QuotaExceedException, MetaNotFoundException {
- transactionId = Catalog.getCurrentGlobalTransactionMgr()
- .beginTransaction(dbId, Lists.newArrayList(tableId), label, requestId,
- new TxnCoordinator(TxnSourceType.FE, FrontendOptions.getLocalHostAddress()),
- TransactionState.LoadJobSourceType.BACKEND_STREAMING, id,
- getTimeout());
- }
+ public void beginTxn() {}
@Override
protected void replayTxnAttachment(TransactionState txnState) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/MiniLoadTxnCommitAttachment.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/MiniLoadTxnCommitAttachment.java
index de66082e1c..50094c91e8 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/MiniLoadTxnCommitAttachment.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/MiniLoadTxnCommitAttachment.java
@@ -18,7 +18,6 @@
package org.apache.doris.load.loadv2;
import org.apache.doris.common.io.Text;
-import org.apache.doris.thrift.TMiniLoadTxnCommitAttachment;
import org.apache.doris.transaction.TransactionState;
import org.apache.doris.transaction.TxnCommitAttachment;
@@ -36,15 +35,6 @@ public class MiniLoadTxnCommitAttachment extends TxnCommitAttachment {
super(TransactionState.LoadJobSourceType.BACKEND_STREAMING);
}
- public MiniLoadTxnCommitAttachment(TMiniLoadTxnCommitAttachment tMiniLoadTxnCommitAttachment) {
- super(TransactionState.LoadJobSourceType.BACKEND_STREAMING);
- this.loadedRows = tMiniLoadTxnCommitAttachment.getLoadedRows();
- this.filteredRows = tMiniLoadTxnCommitAttachment.getFilteredRows();
- if (tMiniLoadTxnCommitAttachment.isSetErrorLogUrl()) {
- this.errorLogUrl = tMiniLoadTxnCommitAttachment.getErrorLogUrl();
- }
- }
-
public long getLoadedRows() {
return loadedRows;
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/MultiLoadMgr.java b/fe/fe-core/src/main/java/org/apache/doris/qe/MultiLoadMgr.java
index da945704eb..d837c3e1f4 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/MultiLoadMgr.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/MultiLoadMgr.java
@@ -43,15 +43,12 @@ import org.apache.doris.load.loadv2.LoadTask;
import org.apache.doris.system.Backend;
import org.apache.doris.system.BeSelectionPolicy;
import org.apache.doris.system.SystemInfoService;
-import org.apache.doris.thrift.TMiniLoadRequest;
import org.apache.doris.thrift.TNetworkAddress;
import com.google.common.base.Strings;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
-import com.google.common.collect.Streams;
-import org.apache.commons.collections.CollectionUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.awaitility.Awaitility;
@@ -64,7 +61,6 @@ import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
-import java.util.stream.Collectors;
// Class used to record state of multi-load operation
public class MultiLoadMgr {
@@ -107,18 +103,6 @@ public class MultiLoadMgr {
Catalog.getCurrentCatalog().getLoadManager().createLoadJobV1FromMultiStart(fullDbName, label);
}
- public void load(TMiniLoadRequest request) throws DdlException {
- if (CollectionUtils.isNotEmpty(request.getFileSize())
- && request.getFileSize().size() != request.getFiles().size()) {
- throw new DdlException("files count and file size count not match: [" + request.getFileSize().size()
- + "!=" + request.getFiles().size() + "]");
- }
- List<Pair<String, Long>> files = Streams.zip(request.getFiles().stream(),
- request.getFileSize().stream(), Pair::create).collect(Collectors.toList());
- load(request.getDb(), request.getLabel(), request.getSubLabel(), request.getTbl(),
- files, request.getBackend(), request.getProperties(), request.getTimestamp());
- }
-
// Add one load job
private void load(String fullDbName, String label,
String subLabel, String table,
diff --git a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
index fc2bc0785c..31e44cd711 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
@@ -43,16 +43,10 @@ import org.apache.doris.common.UserException;
import org.apache.doris.common.Version;
import org.apache.doris.datasource.DataSourceIf;
import org.apache.doris.datasource.InternalDataSource;
-import org.apache.doris.load.EtlStatus;
-import org.apache.doris.load.LoadJob;
-import org.apache.doris.load.MiniEtlTaskInfo;
import org.apache.doris.master.MasterImpl;
import org.apache.doris.metric.MetricRepo;
import org.apache.doris.mysql.privilege.PrivPredicate;
import org.apache.doris.planner.StreamLoadPlanner;
-import org.apache.doris.plugin.AuditEvent;
-import org.apache.doris.plugin.AuditEvent.AuditEventBuilder;
-import org.apache.doris.plugin.AuditEvent.EventType;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.ConnectProcessor;
import org.apache.doris.qe.QeProcessorImpl;
@@ -77,10 +71,8 @@ import org.apache.doris.thrift.TGetDbsParams;
import org.apache.doris.thrift.TGetDbsResult;
import org.apache.doris.thrift.TGetTablesParams;
import org.apache.doris.thrift.TGetTablesResult;
-import org.apache.doris.thrift.TIsMethodSupportedRequest;
import org.apache.doris.thrift.TListPrivilegesResult;
import org.apache.doris.thrift.TListTableStatusResult;
-import org.apache.doris.thrift.TLoadCheckRequest;
import org.apache.doris.thrift.TLoadTxn2PCRequest;
import org.apache.doris.thrift.TLoadTxn2PCResult;
import org.apache.doris.thrift.TLoadTxnBeginRequest;
@@ -92,10 +84,6 @@ import org.apache.doris.thrift.TLoadTxnRollbackResult;
import org.apache.doris.thrift.TMasterOpRequest;
import org.apache.doris.thrift.TMasterOpResult;
import org.apache.doris.thrift.TMasterResult;
-import org.apache.doris.thrift.TMiniLoadBeginRequest;
-import org.apache.doris.thrift.TMiniLoadBeginResult;
-import org.apache.doris.thrift.TMiniLoadEtlStatusResult;
-import org.apache.doris.thrift.TMiniLoadRequest;
import org.apache.doris.thrift.TNetworkAddress;
import org.apache.doris.thrift.TPrivilegeStatus;
import org.apache.doris.thrift.TReportExecStatusParams;
@@ -109,9 +97,7 @@ import org.apache.doris.thrift.TStatusCode;
import org.apache.doris.thrift.TStreamLoadPutRequest;
import org.apache.doris.thrift.TStreamLoadPutResult;
import org.apache.doris.thrift.TTableStatus;
-import org.apache.doris.thrift.TUniqueId;
import org.apache.doris.thrift.TUpdateExportTaskStatusRequest;
-import org.apache.doris.thrift.TUpdateMiniEtlTaskStatusRequest;
import org.apache.doris.thrift.TWaitingTxnStatusRequest;
import org.apache.doris.thrift.TWaitingTxnStatusResult;
import org.apache.doris.transaction.DatabaseTransactionMgr;
@@ -121,7 +107,6 @@ import org.apache.doris.transaction.TransactionState.TxnCoordinator;
import org.apache.doris.transaction.TransactionState.TxnSourceType;
import org.apache.doris.transaction.TxnCommitAttachment;
-import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import com.google.common.collect.Lists;
@@ -130,8 +115,6 @@ import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.thrift.TException;
-import java.net.InetAddress;
-import java.net.UnknownHostException;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
@@ -479,197 +462,6 @@ public class FrontendServiceImpl implements FrontendService.Iface {
return masterImpl.fetchResource();
}
- @Deprecated
- @Override
- public TFeResult miniLoad(TMiniLoadRequest request) throws TException {
- LOG.debug("receive mini load request: label: {}, db: {}, tbl: {}, backend: {}",
- request.getLabel(), request.getDb(), request.getTbl(), request.getBackend());
-
- ConnectContext context = new ConnectContext(null);
- String cluster = SystemInfoService.DEFAULT_CLUSTER;
- if (request.isSetCluster()) {
- cluster = request.cluster;
- }
-
- final String fullDbName = ClusterNamespace.getFullName(cluster, request.db);
- request.setDb(fullDbName);
- context.setCluster(cluster);
- context.setDatabase(ClusterNamespace.getFullName(cluster, request.db));
- context.setQualifiedUser(ClusterNamespace.getFullName(cluster, request.user));
- context.setCatalog(Catalog.getCurrentCatalog());
- context.getState().reset();
- context.setThreadLocalInfo();
-
- TStatus status = new TStatus(TStatusCode.OK);
- TFeResult result = new TFeResult(FrontendServiceVersion.V1, status);
- try {
- if (request.isSetSubLabel()) {
- ExecuteEnv.getInstance().getMultiLoadMgr().load(request);
- } else {
- // try to add load job, label will be checked here.
- if (Catalog.getCurrentCatalog().getLoadManager().createLoadJobV1FromRequest(request)) {
- try {
- // generate mini load audit log
- logMiniLoadStmt(request);
- } catch (Exception e) {
- LOG.warn("failed log mini load stmt", e);
- }
- }
- }
- } catch (UserException e) {
- LOG.warn("add mini load error: {}", e.getMessage());
- status.setStatusCode(TStatusCode.ANALYSIS_ERROR);
- status.addToErrorMsgs(e.getMessage());
- } catch (Throwable e) {
- LOG.warn("unexpected exception when adding mini load", e);
- status.setStatusCode(TStatusCode.ANALYSIS_ERROR);
- status.addToErrorMsgs(Strings.nullToEmpty(e.getMessage()));
- } finally {
- ConnectContext.remove();
- }
-
- LOG.debug("mini load result: {}", result);
- return result;
- }
-
- private void logMiniLoadStmt(TMiniLoadRequest request) throws UnknownHostException {
- String stmt = getMiniLoadStmt(request);
- AuditEvent auditEvent = new AuditEventBuilder().setEventType(EventType.AFTER_QUERY)
- .setClientIp(request.user_ip + ":0")
- .setUser(request.user)
- .setDb(request.db)
- .setState(TStatusCode.OK.name())
- .setQueryTime(0)
- .setStmt(stmt).build();
-
- Catalog.getCurrentAuditEventProcessor().handleAuditEvent(auditEvent);
- }
-
- private String getMiniLoadStmt(TMiniLoadRequest request) throws UnknownHostException {
- StringBuilder stringBuilder = new StringBuilder();
- stringBuilder.append("curl --location-trusted -u user:passwd -T ");
-
- if (request.files.size() == 1) {
- stringBuilder.append(request.files.get(0));
- } else if (request.files.size() > 1) {
- stringBuilder.append("\"{").append(Joiner.on(",").join(request.files)).append("}\"");
- }
-
- InetAddress masterAddress = FrontendOptions.getLocalHost();
- stringBuilder.append(" http://").append(masterAddress.getHostAddress()).append(":");
- stringBuilder.append(Config.http_port).append("/api/").append(request.db).append("/");
- stringBuilder.append(request.tbl).append("/_load?label=").append(request.label);
-
- if (!request.properties.isEmpty()) {
- stringBuilder.append("&");
- List<String> props = Lists.newArrayList();
- for (Map.Entry<String, String> entry : request.properties.entrySet()) {
- String prop = entry.getKey() + "=" + entry.getValue();
- props.add(prop);
- }
- stringBuilder.append(Joiner.on("&").join(props));
- }
-
- return stringBuilder.toString();
- }
-
- @Override
- public TFeResult updateMiniEtlTaskStatus(TUpdateMiniEtlTaskStatusRequest request) throws TException {
- TFeResult result = new TFeResult();
- result.setProtocolVersion(FrontendServiceVersion.V1);
- TStatus status = new TStatus(TStatusCode.OK);
- result.setStatus(status);
-
- // get job task info
- TUniqueId etlTaskId = request.getEtlTaskId();
- long jobId = etlTaskId.getHi();
- long taskId = etlTaskId.getLo();
- LoadJob job = Catalog.getCurrentCatalog().getLoadInstance().getLoadJob(jobId);
- if (job == null) {
- String failMsg = "job does not exist. id: " + jobId;
- LOG.warn(failMsg);
- status.setStatusCode(TStatusCode.CANCELLED);
- status.addToErrorMsgs(failMsg);
- return result;
- }
-
- MiniEtlTaskInfo taskInfo = job.getMiniEtlTask(taskId);
- if (taskInfo == null) {
- String failMsg = "task info does not exist. task id: " + taskId + ", job id: " + jobId;
- LOG.warn(failMsg);
- status.setStatusCode(TStatusCode.CANCELLED);
- status.addToErrorMsgs(failMsg);
- return result;
- }
-
- // update etl task status
- TMiniLoadEtlStatusResult statusResult = request.getEtlTaskStatus();
- LOG.debug("load job id: {}, etl task id: {}, status: {}", jobId, taskId, statusResult);
- EtlStatus taskStatus = taskInfo.getTaskStatus();
- if (taskStatus.setState(statusResult.getEtlState())) {
- if (statusResult.isSetCounters()) {
- taskStatus.setCounters(statusResult.getCounters());
- }
- if (statusResult.isSetTrackingUrl()) {
- taskStatus.setTrackingUrl(statusResult.getTrackingUrl());
- }
- if (statusResult.isSetFileMap()) {
- taskStatus.setFileMap(statusResult.getFileMap());
- }
- }
- return result;
- }
-
- @Override
- public TMiniLoadBeginResult miniLoadBegin(TMiniLoadBeginRequest request) throws TException {
- LOG.debug("receive mini load begin request. label: {}, user: {}, ip: {}",
- request.getLabel(), request.getUser(), request.getUserIp());
-
- TMiniLoadBeginResult result = new TMiniLoadBeginResult();
- TStatus status = new TStatus(TStatusCode.OK);
- result.setStatus(status);
- try {
- String cluster = SystemInfoService.DEFAULT_CLUSTER;
- if (request.isSetCluster()) {
- cluster = request.cluster;
- }
- // step1: check password and privs
- checkPasswordAndPrivs(cluster, request.getUser(), request.getPasswd(), request.getDb(),
- request.getTbl(), request.getUserIp(), PrivPredicate.LOAD);
- // step2: check label and record metadata in load manager
- if (request.isSetSubLabel()) {
- // TODO(ml): multi mini load
- } else {
- // add load metadata in loadManager
- result.setTxnId(Catalog.getCurrentCatalog().getLoadManager().createLoadJobFromMiniLoad(request));
- }
- return result;
- } catch (UserException e) {
- status.setStatusCode(TStatusCode.ANALYSIS_ERROR);
- status.addToErrorMsgs(e.getMessage());
- return result;
- } catch (Throwable e) {
- LOG.warn("catch unknown result.", e);
- status.setStatusCode(TStatusCode.INTERNAL_ERROR);
- status.addToErrorMsgs(Strings.nullToEmpty(e.getMessage()));
- return result;
- }
- }
-
- @Override
- public TFeResult isMethodSupported(TIsMethodSupportedRequest request) throws TException {
- TStatus status = new TStatus(TStatusCode.OK);
- TFeResult result = new TFeResult(FrontendServiceVersion.V1, status);
- switch (request.getFunctionName()) {
- case "STREAMING_MINI_LOAD":
- break;
- default:
- status.setStatusCode(TStatusCode.NOT_IMPLEMENTED_ERROR);
- break;
- }
- return result;
- }
-
@Override
public TMasterOpResult forward(TMasterOpRequest params) throws TException {
TNetworkAddress clientAddr = getClientAddr();
@@ -723,35 +515,6 @@ public class FrontendServiceImpl implements FrontendService.Iface {
}
}
- @Override
- public TFeResult loadCheck(TLoadCheckRequest request) throws TException {
- LOG.debug("receive load check request. label: {}, user: {}, ip: {}",
- request.getLabel(), request.getUser(), request.getUserIp());
-
- TStatus status = new TStatus(TStatusCode.OK);
- TFeResult result = new TFeResult(FrontendServiceVersion.V1, status);
- try {
- String cluster = SystemInfoService.DEFAULT_CLUSTER;
- if (request.isSetCluster()) {
- cluster = request.cluster;
- }
-
- checkPasswordAndPrivs(cluster, request.getUser(), request.getPasswd(), request.getDb(),
- request.getTbl(), request.getUserIp(), PrivPredicate.LOAD);
- } catch (UserException e) {
- status.setStatusCode(TStatusCode.ANALYSIS_ERROR);
- status.addToErrorMsgs(e.getMessage());
- return result;
- } catch (Throwable e) {
- LOG.warn("catch unknown result.", e);
- status.setStatusCode(TStatusCode.INTERNAL_ERROR);
- status.addToErrorMsgs(Strings.nullToEmpty(e.getMessage()));
- return result;
- }
-
- return result;
- }
-
@Override
public TLoadTxnBeginResult loadTxnBegin(TLoadTxnBeginRequest request) throws TException {
String clientAddr = getClientAddrAsString();
diff --git a/fe/fe-core/src/main/java/org/apache/doris/task/AgentClient.java b/fe/fe-core/src/main/java/org/apache/doris/task/AgentClient.java
index 4eb52dea1d..f90aa67358 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/task/AgentClient.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/task/AgentClient.java
@@ -21,14 +21,9 @@ import org.apache.doris.common.ClientPool;
import org.apache.doris.common.Status;
import org.apache.doris.thrift.BackendService;
import org.apache.doris.thrift.TAgentResult;
-import org.apache.doris.thrift.TAgentServiceVersion;
import org.apache.doris.thrift.TCheckStorageFormatResult;
-import org.apache.doris.thrift.TDeleteEtlFilesRequest;
import org.apache.doris.thrift.TExportStatusResult;
import org.apache.doris.thrift.TExportTaskRequest;
-import org.apache.doris.thrift.TMiniLoadEtlStatusRequest;
-import org.apache.doris.thrift.TMiniLoadEtlStatusResult;
-import org.apache.doris.thrift.TMiniLoadEtlTaskRequest;
import org.apache.doris.thrift.TNetworkAddress;
import org.apache.doris.thrift.TSnapshotRequest;
import org.apache.doris.thrift.TStatus;
@@ -52,22 +47,6 @@ public class AgentClient {
this.port = port;
}
- public TAgentResult submitEtlTask(TMiniLoadEtlTaskRequest request) {
- TAgentResult result = null;
- LOG.debug("submit etl task. request: {}", request);
- try {
- borrowClient();
- // submit etl task
- result = client.submitEtlTask(request);
- ok = true;
- } catch (Exception e) {
- LOG.warn("submit etl task error", e);
- } finally {
- returnClient();
- }
- return result;
- }
-
public TAgentResult makeSnapshot(TSnapshotRequest request) {
TAgentResult result = null;
LOG.debug("submit make snapshot task. request: {}", request);
@@ -116,24 +95,6 @@ public class AgentClient {
return result;
}
- public TMiniLoadEtlStatusResult getEtlStatus(long jobId, long taskId) {
- TMiniLoadEtlStatusResult result = null;
- TMiniLoadEtlStatusRequest request = new TMiniLoadEtlStatusRequest(TAgentServiceVersion.V1,
- new TUniqueId(jobId, taskId));
- LOG.debug("get mini load etl task status. request: {}", request);
- try {
- borrowClient();
- // get etl status
- result = client.getEtlStatus(request);
- ok = true;
- } catch (Exception e) {
- LOG.warn("get etl status error", e);
- } finally {
- returnClient();
- }
- return result;
- }
-
public TExportStatusResult getExportStatus(long jobId, long taskId) {
TExportStatusResult result = null;
TUniqueId request = new TUniqueId(jobId, taskId);
@@ -183,22 +144,6 @@ public class AgentClient {
return result;
}
- public void deleteEtlFiles(long dbId, long jobId, String dbName, String label) {
- TDeleteEtlFilesRequest request = new TDeleteEtlFilesRequest(TAgentServiceVersion.V1,
- new TUniqueId(dbId, jobId), dbName, label);
- LOG.debug("delete etl files. request: {}", request);
- try {
- borrowClient();
- // delete etl files
- client.deleteEtlFiles(request);
- ok = true;
- } catch (Exception e) {
- LOG.warn("delete etl files error", e);
- } finally {
- returnClient();
- }
- }
-
private void borrowClient() throws Exception {
// create agent client
ok = false;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/transaction/TxnCommitAttachment.java b/fe/fe-core/src/main/java/org/apache/doris/transaction/TxnCommitAttachment.java
index c167c1a662..4c7d27de5d 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/transaction/TxnCommitAttachment.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/transaction/TxnCommitAttachment.java
@@ -47,8 +47,6 @@ public abstract class TxnCommitAttachment implements Writable {
switch (txnCommitAttachment.getLoadType()) {
case ROUTINE_LOAD:
return new RLTaskTxnCommitAttachment(txnCommitAttachment.getRlTaskTxnCommitAttachment());
- case MINI_LOAD:
- return new MiniLoadTxnCommitAttachment(txnCommitAttachment.getMlTxnCommitAttachment());
default:
return null;
}
diff --git a/fe/fe-core/src/test/java/org/apache/doris/common/GenericPoolTest.java b/fe/fe-core/src/test/java/org/apache/doris/common/GenericPoolTest.java
index 0fa7161a5e..09793577a1 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/common/GenericPoolTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/common/GenericPoolTest.java
@@ -25,7 +25,6 @@ import org.apache.doris.thrift.TAgentTaskRequest;
import org.apache.doris.thrift.TCancelPlanFragmentParams;
import org.apache.doris.thrift.TCancelPlanFragmentResult;
import org.apache.doris.thrift.TCheckStorageFormatResult;
-import org.apache.doris.thrift.TDeleteEtlFilesRequest;
import org.apache.doris.thrift.TDiskTrashInfo;
import org.apache.doris.thrift.TExecPlanFragmentParams;
import org.apache.doris.thrift.TExecPlanFragmentResult;
@@ -33,9 +32,6 @@ import org.apache.doris.thrift.TExportStatusResult;
import org.apache.doris.thrift.TExportTaskRequest;
import org.apache.doris.thrift.TFetchDataParams;
import org.apache.doris.thrift.TFetchDataResult;
-import org.apache.doris.thrift.TMiniLoadEtlStatusRequest;
-import org.apache.doris.thrift.TMiniLoadEtlStatusResult;
-import org.apache.doris.thrift.TMiniLoadEtlTaskRequest;
import org.apache.doris.thrift.TNetworkAddress;
import org.apache.doris.thrift.TResultBatch;
import org.apache.doris.thrift.TRoutineLoadTask;
@@ -156,21 +152,6 @@ public class GenericPoolTest {
return null;
}
- @Override
- public TAgentResult submitEtlTask(TMiniLoadEtlTaskRequest request) throws TException {
- return null;
- }
-
- @Override
- public TMiniLoadEtlStatusResult getEtlStatus(TMiniLoadEtlStatusRequest request) throws TException {
- return null;
- }
-
- @Override
- public TAgentResult deleteEtlFiles(TDeleteEtlFilesRequest request) throws TException {
- return null;
- }
-
@Override
public TAgentResult makeSnapshot(TSnapshotRequest snapshotRequest) throws TException {
// TODO Auto-generated method stub
diff --git a/fe/fe-core/src/test/java/org/apache/doris/utframe/MockedBackendFactory.java b/fe/fe-core/src/test/java/org/apache/doris/utframe/MockedBackendFactory.java
index 7c9c513572..e49503ca04 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/utframe/MockedBackendFactory.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/utframe/MockedBackendFactory.java
@@ -34,9 +34,7 @@ import org.apache.doris.thrift.TCancelPlanFragmentParams;
import org.apache.doris.thrift.TCancelPlanFragmentResult;
import org.apache.doris.thrift.TCheckStorageFormatResult;
import org.apache.doris.thrift.TCloneReq;
-import org.apache.doris.thrift.TDeleteEtlFilesRequest;
import org.apache.doris.thrift.TDiskTrashInfo;
-import org.apache.doris.thrift.TEtlState;
import org.apache.doris.thrift.TExecPlanFragmentParams;
import org.apache.doris.thrift.TExecPlanFragmentResult;
import org.apache.doris.thrift.TExportState;
@@ -47,9 +45,6 @@ import org.apache.doris.thrift.TFetchDataResult;
import org.apache.doris.thrift.TFinishTaskRequest;
import org.apache.doris.thrift.THeartbeatResult;
import org.apache.doris.thrift.TMasterInfo;
-import org.apache.doris.thrift.TMiniLoadEtlStatusRequest;
-import org.apache.doris.thrift.TMiniLoadEtlStatusResult;
-import org.apache.doris.thrift.TMiniLoadEtlTaskRequest;
import org.apache.doris.thrift.TRoutineLoadTask;
import org.apache.doris.thrift.TScanBatchResult;
import org.apache.doris.thrift.TScanCloseParams;
@@ -243,21 +238,6 @@ public class MockedBackendFactory {
return new TAgentResult(new TStatus(TStatusCode.OK));
}
- @Override
- public TAgentResult submitEtlTask(TMiniLoadEtlTaskRequest request) throws TException {
- return new TAgentResult(new TStatus(TStatusCode.OK));
- }
-
- @Override
- public TMiniLoadEtlStatusResult getEtlStatus(TMiniLoadEtlStatusRequest request) throws TException {
- return new TMiniLoadEtlStatusResult(new TStatus(TStatusCode.OK), TEtlState.FINISHED);
- }
-
- @Override
- public TAgentResult deleteEtlFiles(TDeleteEtlFilesRequest request) throws TException {
- return new TAgentResult(new TStatus(TStatusCode.OK));
- }
-
@Override
public TStatus submitExportTask(TExportTaskRequest request) throws TException {
return new TStatus(TStatusCode.OK);
diff --git a/gensrc/thrift/AgentService.thrift b/gensrc/thrift/AgentService.thrift
index 740db1fd08..83f68834f8 100644
--- a/gensrc/thrift/AgentService.thrift
+++ b/gensrc/thrift/AgentService.thrift
@@ -409,29 +409,3 @@ struct TAgentPublishRequest {
2: required list<TTopicUpdate> updates
}
-struct TMiniLoadEtlTaskRequest {
- 1: required TAgentServiceVersion protocol_version
- 2: required PaloInternalService.TExecPlanFragmentParams params
-}
-
-struct TMiniLoadEtlStatusRequest {
- 1: required TAgentServiceVersion protocol_version
- 2: required Types.TUniqueId mini_load_id
-}
-
-struct TMiniLoadEtlStatusResult {
- 1: required Status.TStatus status
- 2: required Types.TEtlState etl_state
- 3: optional map<string, i64> file_map
- 4: optional map<string, string> counters
- 5: optional string tracking_url
- // progress
-}
-
-struct TDeleteEtlFilesRequest {
- 1: required TAgentServiceVersion protocol_version
- 2: required Types.TUniqueId mini_load_id
- 3: required string db_name
- 4: required string label
-}
-
diff --git a/gensrc/thrift/BackendService.thrift b/gensrc/thrift/BackendService.thrift
index 8e534f4677..66a59260fc 100644
--- a/gensrc/thrift/BackendService.thrift
+++ b/gensrc/thrift/BackendService.thrift
@@ -146,13 +146,6 @@ service BackendService {
AgentService.TAgentResult publish_cluster_state(1:AgentService.TAgentPublishRequest request);
- AgentService.TAgentResult submit_etl_task(1:AgentService.TMiniLoadEtlTaskRequest request);
-
- AgentService.TMiniLoadEtlStatusResult get_etl_status(
- 1:AgentService.TMiniLoadEtlStatusRequest request);
-
- AgentService.TAgentResult delete_etl_files(1:AgentService.TDeleteEtlFilesRequest request);
-
Status.TStatus submit_export_task(1:TExportTaskRequest request);
PaloInternalService.TExportStatusResult get_export_status(1:Types.TUniqueId task_id);
diff --git a/gensrc/thrift/FrontendService.thrift b/gensrc/thrift/FrontendService.thrift
index b2ef1a1143..ff59b203ba 100644
--- a/gensrc/thrift/FrontendService.thrift
+++ b/gensrc/thrift/FrontendService.thrift
@@ -411,31 +411,6 @@ struct TFeResult {
2: required Status.TStatus status
}
-// Submit one table load job
-// if subLabel is set, this job belong to a multi-load transaction
-struct TMiniLoadRequest {
- 1: required FrontendServiceVersion protocolVersion
- 2: required string db
- 3: required string tbl
- 4: required string label
- 5: optional string user
- 6: required Types.TNetworkAddress backend
- 7: required list<string> files
- 8: required map<string, string> properties
- 9: optional string subLabel
- 10: optional string cluster
- 11: optional i64 timestamp
- 12: optional string user_ip
- 13: optional bool is_retry
- 14: optional list<i64> file_size
-}
-
-struct TUpdateMiniEtlTaskStatusRequest {
- 1: required FrontendServiceVersion protocolVersion
- 2: required Types.TUniqueId etlTaskId
- 3: required AgentService.TMiniLoadEtlStatusResult etlTaskStatus
-}
-
struct TMasterOpRequest {
1: required string user
2: required string db
@@ -483,44 +458,6 @@ struct TMasterOpResult {
4: optional Types.TUniqueId queryId;
}
-struct TLoadCheckRequest {
- 1: required FrontendServiceVersion protocolVersion
- 2: required string user
- 3: required string passwd
- 4: required string db
- 5: optional string label
- 6: optional string cluster
- 7: optional i64 timestamp
- 8: optional string user_ip
- 9: optional string tbl
-}
-
-struct TMiniLoadBeginRequest {
- 1: required string user
- 2: required string passwd
- 3: optional string cluster
- 4: optional string user_ip
- 5: required string db
- 6: required string tbl
- 7: required string label
- 8: optional string sub_label
- 9: optional i64 timeout_second
- 10: optional double max_filter_ratio
- 11: optional i64 auth_code
- 12: optional i64 create_timestamp
- 13: optional Types.TUniqueId request_id
- 14: optional string auth_code_uuid
-}
-
-struct TIsMethodSupportedRequest {
- 1: optional string function_name
-}
-
-struct TMiniLoadBeginResult {
- 1: required Status.TStatus status
- 2: optional i64 txn_id
-}
-
struct TUpdateExportTaskStatusRequest {
1: required FrontendServiceVersion protocolVersion
2: required Types.TUniqueId taskId
@@ -627,16 +564,10 @@ struct TRLTaskTxnCommitAttachment {
11: optional string errorLogUrl
}
-struct TMiniLoadTxnCommitAttachment {
- 1: required i64 loadedRows
- 2: required i64 filteredRows
- 3: optional string errorLogUrl
-}
-
struct TTxnCommitAttachment {
1: required Types.TLoadType loadType
2: optional TRLTaskTxnCommitAttachment rlTaskTxnCommitAttachment
- 3: optional TMiniLoadTxnCommitAttachment mlTxnCommitAttachment
+// 3: optional TMiniLoadTxnCommitAttachment mlTxnCommitAttachment
}
struct TLoadTxnCommitRequest {
@@ -751,14 +682,6 @@ service FrontendService {
MasterService.TMasterResult finishTask(1: MasterService.TFinishTaskRequest request)
MasterService.TMasterResult report(1: MasterService.TReportRequest request)
MasterService.TFetchResourceResult fetchResource()
-
- // those three method are used for asynchronous mini load which will be abandoned
- TFeResult miniLoad(1: TMiniLoadRequest request)
- TFeResult updateMiniEtlTaskStatus(1: TUpdateMiniEtlTaskStatusRequest request)
- TFeResult loadCheck(1: TLoadCheckRequest request)
- // this method is used for streaming mini load
- TMiniLoadBeginResult miniLoadBegin(1: TMiniLoadBeginRequest request)
- TFeResult isMethodSupported(1: TIsMethodSupportedRequest request)
TMasterOpResult forward(1: TMasterOpRequest params)
diff --git a/samples/mini_load/python/mini_load_utils.py b/samples/mini_load/python/mini_load_utils.py
deleted file mode 100644
index 88753680e0..0000000000
--- a/samples/mini_load/python/mini_load_utils.py
+++ /dev/null
@@ -1,157 +0,0 @@
-#!/usr/bin/env python
-# -*- coding: utf-8 -*-
-"""
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License. You may obtain a copy of the License at
-
-http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied. See the License for the
-specific language governing permissions and limitations
-under the License.
-"""
-
-import os
-import subprocess
-
-
-class DorisMiniLoadClient(object):
- """ load file to doris """
-
- def __init__(self, db_host, db_port, db_name,
- db_user, db_password, file_name, table, load_timeout):
- """
- init
- :param db_host: db host
- :param db_port: db port
- :param db_name: db name
- :param db_user: db user
- :param db_password: db password
- :param file_name: local file path
- :param table: db table
- :param load_timeout:mini load timeout, defalut 86400 seconds.
- """
- self.file_name = file_name
- self.table = table
- self.load_host = db_host
- self.load_port = db_port
- self.load_database = db_name
- self.load_user = db_user
- self.load_password = db_password
- self.load_timeout = load_timeout
-
- def get_label(self):
- """
- 获取label前缀
- :return: label
- """
-
- return '_'.join([self.table, os.path.basename(self.file_name)])
-
- def load_doris(self):
- """
- load file to doris by curl, allow 3 times to retry.
- :return: mini load label
- """
- retry_time = 0
- label = self.get_label()
-
- while retry_time < 3:
- load_cmd = "curl"
- param_location = "--location-trusted"
- param_user = "%s:%s" % (self.load_user, self.load_password)
- param_file = "%s" % self.file_name
- param_url = "http://%s:%s/api/%s/%s/_load?label=%s&timeout=" % (self.load_host, self.load_port,
- self.load_database,
- self.table, label, self.load_timeout)
-
- load_subprocess = subprocess.Popen([load_cmd, param_location,
- "-u", param_user, "-T", param_file, param_url])
-
- # Wait for child process to terminate. Returns returncode attribute
- load_subprocess.wait()
-
- # check returncode;
- # If fail, retry 3 times
- if load_subprocess.returncode != 0:
- print """Load to doris failed! LABEL is %s, Retry time is %d """ % (label, retry_time)
- retry_time += 1
- # If success, print log, and break retry loop
- if load_subprocess.returncode == 0:
- print """Load to doris success! LABEL is %s, Retry time is %d """ % (label, retry_time)
- break
-
- return label
-
- @classmethod
- def check_load_status(cls, label, host, port, user, password, database):
- """
- check async mini load process status.
- :param label:mini load label
- :param host: db host
- :param port: db port
- :param user: db user
- :param password: db password
- :param database: db database
- :return: check async mini load process status.
- """
-
- db_conn = MySQLdb.connect(host=host,port=port,user=user,passwd=password,db=database)
-
- db_cursor = db_conn.cursor()
- check_status_sql = "show load where label = '%s' order by CreateTime desc limit 1" % label
-
- db_cursor.execute(check_status_sql)
- rows = db_cursor.fetchall()
-
- # timeout config: 60 minutes.
- timeout = 60 * 60
-
- while timeout > 0:
- if len(rows) == 0:
- print """Load label: %s doesn't exist""" % label
- return
- load_status = rows[0][2]
- print "mini load status: " + load_status
- if load_status == 'FINISHED':
- print """Async mini load to db success! label is %s""" % label
- break
- if load_status == 'CANCELLED':
- print """Async load to db failed! label is %s""" % label
- break
- timeout = timeout - 5
- time.sleep(5)
- db_cursor.execute(sql)
- rows = db_cursor.fetchall()
-
- if time_out <= 0:
- print """Async load to db timeout! timeout second is: %s, label is %s""" % (time_out, label)
-
-
-if __name__ == '__main__':
- """
- mini_load demo.
- There is no need to install subprocess in Python 2.7. It is a standard module that is built in.
- You need input db config & load param.
- """
- db_host = "db_conn_host"
- db_port = "port"
- db_name = "db_name"
- db_user = "db_user"
- db_password = "db_password"
- file_name = "file_name"
- table = "db_table"
- # default load_time_out, seconds
- load_timeout = 86400
- doris_client = DorisMiniLoadClient(
- db_host, db_port, db_name, db_user, db_password, file_name, table, load_timeout)
- doris_client.check_load_status(doris_client.load_doirs(), db_host, db_port, db_user, db_password, db_name)
- print "load to doris end"
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org