You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by yi...@apache.org on 2022/06/30 15:21:48 UTC

[doris] branch master updated: [refactor](load) Remove mini load (#10520)

This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new aab7dc956f [refactor](load) Remove mini load (#10520)
aab7dc956f is described below

commit aab7dc956f6f47a79d70bafd1ffc76b676a8e2cd
Author: yiguolei <67...@qq.com>
AuthorDate: Thu Jun 30 23:21:41 2022 +0800

    [refactor](load) Remove mini load (#10520)
---
 be/src/agent/agent_server.cpp                      |  42 -
 be/src/agent/agent_server.h                        |   6 -
 be/src/http/CMakeLists.txt                         |   1 -
 be/src/http/action/mini_load.cpp                   | 966 ---------------------
 be/src/http/action/mini_load.h                     | 109 ---
 be/src/runtime/CMakeLists.txt                      |   1 -
 be/src/runtime/etl_job_mgr.cpp                     | 302 -------
 be/src/runtime/etl_job_mgr.h                       |  99 ---
 be/src/runtime/exec_env.h                          |   2 -
 be/src/runtime/exec_env_init.cpp                   |   4 -
 .../runtime/stream_load/stream_load_executor.cpp   |  12 +-
 be/src/service/backend_service.h                   |  21 -
 be/src/service/http_service.cpp                    |   5 +-
 be/test/CMakeLists.txt                             |   1 -
 be/test/runtime/data_stream_test.cpp               |   9 -
 be/test/runtime/etl_job_mgr_test.cpp               | 221 -----
 .../src/main/java/org/apache/doris/load/Load.java  | 157 ----
 .../org/apache/doris/load/loadv2/LoadManager.java  |  77 --
 .../org/apache/doris/load/loadv2/MiniLoadJob.java  |  36 +-
 .../load/loadv2/MiniLoadTxnCommitAttachment.java   |  10 -
 .../java/org/apache/doris/qe/MultiLoadMgr.java     |  16 -
 .../apache/doris/service/FrontendServiceImpl.java  | 237 -----
 .../java/org/apache/doris/task/AgentClient.java    |  55 --
 .../doris/transaction/TxnCommitAttachment.java     |   2 -
 .../org/apache/doris/common/GenericPoolTest.java   |  19 -
 .../apache/doris/utframe/MockedBackendFactory.java |  20 -
 gensrc/thrift/AgentService.thrift                  |  26 -
 gensrc/thrift/BackendService.thrift                |   7 -
 gensrc/thrift/FrontendService.thrift               |  79 +-
 samples/mini_load/python/mini_load_utils.py        | 157 ----
 30 files changed, 5 insertions(+), 2694 deletions(-)

diff --git a/be/src/agent/agent_server.cpp b/be/src/agent/agent_server.cpp
index c907aafd23..58ce4e00b5 100644
--- a/be/src/agent/agent_server.cpp
+++ b/be/src/agent/agent_server.cpp
@@ -29,7 +29,6 @@
 #include "common/status.h"
 #include "gutil/strings/substitute.h"
 #include "olap/snapshot_manager.h"
-#include "runtime/etl_job_mgr.h"
 
 using std::string;
 using std::vector;
@@ -247,45 +246,4 @@ void AgentServer::publish_cluster_state(TAgentResult& t_agent_result,
     status.to_thrift(&t_agent_result.status);
 }
 
-void AgentServer::submit_etl_task(TAgentResult& t_agent_result,
-                                  const TMiniLoadEtlTaskRequest& request) {
-    Status status = _exec_env->etl_job_mgr()->start_job(request);
-    auto fragment_instance_id = request.params.params.fragment_instance_id;
-    if (status.ok()) {
-        VLOG_RPC << "success to submit etl task. id=" << fragment_instance_id;
-    } else {
-        VLOG_RPC << "fail to submit etl task. id=" << fragment_instance_id
-                 << ", err_msg=" << status.get_error_msg();
-    }
-    status.to_thrift(&t_agent_result.status);
-}
-
-void AgentServer::get_etl_status(TMiniLoadEtlStatusResult& t_agent_result,
-                                 const TMiniLoadEtlStatusRequest& request) {
-    Status status = _exec_env->etl_job_mgr()->get_job_state(request.mini_load_id, &t_agent_result);
-    if (!status.ok()) {
-        LOG(WARNING) << "fail to get job state. [id=" << request.mini_load_id << "]";
-    }
-
-    VLOG_RPC << "success to get job state. [id=" << request.mini_load_id
-             << ", status=" << t_agent_result.status.status_code
-             << ", etl_state=" << t_agent_result.etl_state << ", files=";
-    for (auto& item : t_agent_result.file_map) {
-        VLOG_RPC << item.first << ":" << item.second << ";";
-    }
-    VLOG_RPC << "]";
-}
-
-void AgentServer::delete_etl_files(TAgentResult& t_agent_result,
-                                   const TDeleteEtlFilesRequest& request) {
-    Status status = _exec_env->etl_job_mgr()->erase_job(request);
-    if (!status.ok()) {
-        LOG(WARNING) << "fail to delete etl files. because " << status.get_error_msg()
-                     << " with request " << request;
-    }
-
-    VLOG_RPC << "success to delete etl files. request=" << request;
-    status.to_thrift(&t_agent_result.status);
-}
-
 } // namespace doris
diff --git a/be/src/agent/agent_server.h b/be/src/agent/agent_server.h
index 3998a6b258..2897fab586 100644
--- a/be/src/agent/agent_server.h
+++ b/be/src/agent/agent_server.h
@@ -47,12 +47,6 @@ public:
     // [[deprecated]]
     void publish_cluster_state(TAgentResult& agent_result, const TAgentPublishRequest& request);
 
-    // Multi-Load will still use the following 3 methods for now.
-    void submit_etl_task(TAgentResult& agent_result, const TMiniLoadEtlTaskRequest& request);
-    void get_etl_status(TMiniLoadEtlStatusResult& agent_result,
-                        const TMiniLoadEtlStatusRequest& request);
-    void delete_etl_files(TAgentResult& result, const TDeleteEtlFilesRequest& request);
-
 private:
     DISALLOW_COPY_AND_ASSIGN(AgentServer);
 
diff --git a/be/src/http/CMakeLists.txt b/be/src/http/CMakeLists.txt
index 8ca052da65..b956d0982b 100644
--- a/be/src/http/CMakeLists.txt
+++ b/be/src/http/CMakeLists.txt
@@ -34,7 +34,6 @@ add_library(Webserver STATIC
   ev_http_server.cpp
   http_client.cpp
   action/download_action.cpp
-  action/mini_load.cpp
   action/monitor_action.cpp
   action/health_action.cpp
   action/tablet_migration_action.cpp
diff --git a/be/src/http/action/mini_load.cpp b/be/src/http/action/mini_load.cpp
deleted file mode 100644
index 53c1464471..0000000000
--- a/be/src/http/action/mini_load.cpp
+++ /dev/null
@@ -1,966 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#include "http/action/mini_load.h"
-
-#include <event2/buffer.h>
-#include <event2/bufferevent.h>
-#include <event2/http.h>
-#include <fcntl.h>
-#include <string.h>
-#include <sys/stat.h>
-#include <sys/time.h>
-#include <sys/types.h>
-#include <thrift/protocol/TDebugProtocol.h>
-#include <time.h>
-#include <unistd.h>
-
-#include <functional>
-#include <mutex>
-#include <sstream>
-#include <string>
-
-#include "agent/cgroups_mgr.h"
-#include "common/status.h"
-#include "gen_cpp/FrontendService.h"
-#include "gen_cpp/FrontendService_types.h"
-#include "gen_cpp/HeartbeatService_types.h"
-#include "gen_cpp/MasterService_types.h"
-#include "http/http_channel.h"
-#include "http/http_headers.h"
-#include "http/http_parser.h"
-#include "http/http_request.h"
-#include "http/http_response.h"
-#include "http/http_status.h"
-#include "http/utils.h"
-#include "olap/file_helper.h"
-#include "runtime/client_cache.h"
-#include "runtime/exec_env.h"
-#include "runtime/fragment_mgr.h"
-#include "runtime/load_path_mgr.h"
-#include "runtime/stream_load/stream_load_context.h"
-#include "service/backend_options.h"
-#include "util/file_utils.h"
-#include "util/json_util.h"
-#include "util/string_parser.hpp"
-#include "util/string_util.h"
-#include "util/thrift_rpc_helper.h"
-#include "util/time.h"
-#include "util/url_coding.h"
-
-namespace doris {
-
-// context used to handle mini-load in asynchronous mode
-struct MiniLoadAsyncCtx {
-    MiniLoadAsyncCtx(MiniLoadAction* handler_) : handler(handler_) {}
-    ~MiniLoadAsyncCtx() {
-        if (need_remove_handle) {
-            handler->erase_handle(load_handle);
-        }
-        if (fd >= 0) {
-            ::close(fd);
-        }
-    }
-
-    MiniLoadAction* handler;
-
-    // used to check duplicate
-    LoadHandle load_handle;
-    bool need_remove_handle = false;
-
-    // file to save
-    std::string file_path;
-    int fd = -1;
-
-    size_t body_bytes = 0;
-    size_t bytes_written = 0;
-
-    TLoadCheckRequest load_check_req;
-};
-
-struct MiniLoadCtx {
-    MiniLoadCtx(bool is_streaming_) : is_streaming(is_streaming_) {}
-
-    bool is_streaming = false;
-    MiniLoadAsyncCtx* mini_load_async_ctx = nullptr;
-    StreamLoadContext* stream_load_ctx = nullptr;
-};
-
-const std::string CLUSTER_KEY = "cluster";
-const std::string DB_KEY = "db";
-const std::string TABLE_KEY = "table";
-const std::string LABEL_KEY = "label";
-const std::string SUB_LABEL_KEY = "sub_label";
-const std::string FILE_PATH_KEY = "file_path";
-const std::string COLUMNS_KEY = "columns";
-const std::string HLL_KEY = "hll";
-const std::string COLUMN_SEPARATOR_KEY = "column_separator";
-const std::string MAX_FILTER_RATIO_KEY = "max_filter_ratio";
-const std::string STRICT_MODE_KEY = "strict_mode";
-const std::string TIMEOUT_KEY = "timeout";
-const char* k_100_continue = "100-continue";
-
-MiniLoadAction::MiniLoadAction(ExecEnv* exec_env) : _exec_env(exec_env) {}
-
-static bool is_name_valid(const std::string& name) {
-    return !name.empty();
-}
-
-static Status check_request(HttpRequest* req) {
-    auto& params = *req->params();
-
-    // check params
-    if (!is_name_valid(params[DB_KEY])) {
-        return Status::InternalError("Database name is not valid.");
-    }
-    if (!is_name_valid(params[TABLE_KEY])) {
-        return Status::InternalError("Table name is not valid.");
-    }
-    if (!is_name_valid(params[LABEL_KEY])) {
-        return Status::InternalError("Label name is not valid.");
-    }
-
-    return Status::OK();
-}
-
-Status MiniLoadAction::data_saved_dir(const LoadHandle& desc, const std::string& table,
-                                      std::string* file_path) {
-    std::string prefix;
-    RETURN_IF_ERROR(_exec_env->load_path_mgr()->allocate_dir(desc.db, desc.label, &prefix));
-    timeval tv;
-    gettimeofday(&tv, nullptr);
-    struct tm tm;
-    time_t cur_sec = tv.tv_sec;
-    localtime_r(&cur_sec, &tm);
-    char buf[64];
-    strftime(buf, 64, "%Y%m%d%H%M%S", &tm);
-
-    std::stringstream ss;
-    ss << prefix << "/" << table << "." << desc.sub_label << "." << buf << "." << tv.tv_usec;
-    *file_path = ss.str();
-    return Status::OK();
-}
-
-Status MiniLoadAction::_load(HttpRequest* http_req, const std::string& file_path,
-                             const std::string& user, const std::string& cluster,
-                             int64_t file_size) {
-    // Prepare request parameters.
-    std::map<std::string, std::string> params(http_req->query_params().begin(),
-                                              http_req->query_params().end());
-    RETURN_IF_ERROR(_merge_header(http_req, &params));
-    params.erase(LABEL_KEY);
-    params.erase(SUB_LABEL_KEY);
-
-    // put here to log master information
-    const TNetworkAddress& master_address = _exec_env->master_info()->network_address;
-    Status status;
-    FrontendServiceConnection client(_exec_env->frontend_client_cache(), master_address,
-                                     config::thrift_rpc_timeout_ms, &status);
-    if (!status.ok()) {
-        std::stringstream ss;
-        ss << "Connect master failed, with address(" << master_address.hostname << ":"
-           << master_address.port << ")";
-        LOG(WARNING) << ss.str();
-        return status;
-    }
-    TFeResult res;
-    try {
-        TMiniLoadRequest req;
-        req.protocolVersion = FrontendServiceVersion::V1;
-        req.__set_db(http_req->param(DB_KEY));
-        if (!cluster.empty()) {
-            req.__set_cluster(cluster);
-        }
-        req.__set_tbl(http_req->param(TABLE_KEY));
-        req.__set_label(http_req->param(LABEL_KEY));
-        req.__set_user(user);
-        // Belong to a multi-load transaction
-        if (!http_req->param(SUB_LABEL_KEY).empty()) {
-            req.__set_subLabel(http_req->param(SUB_LABEL_KEY));
-        }
-        req.__set_properties(params);
-        req.files.push_back(file_path);
-        req.__isset.file_size = true;
-        req.file_size.push_back(file_size);
-        req.backend.__set_hostname(BackendOptions::get_localhost());
-        req.backend.__set_port(config::be_port);
-
-        req.__set_timestamp(GetCurrentTimeMicros());
-        try {
-            client->miniLoad(res, req);
-        } catch (apache::thrift::transport::TTransportException& e) {
-            LOG(WARNING) << "Retrying mini load from master(" << master_address.hostname << ":"
-                         << master_address.port << ") because: " << e.what();
-            status = client.reopen(config::thrift_rpc_timeout_ms);
-            if (!status.ok()) {
-                LOG(WARNING) << "Client reopen failed. with address(" << master_address.hostname
-                             << ":" << master_address.port << ")";
-                return status;
-            }
-            client->miniLoad(res, req);
-        } catch (apache::thrift::TApplicationException& e) {
-            LOG(WARNING) << "mini load request from master(" << master_address.hostname << ":"
-                         << master_address.port << ") got unknown result: " << e.what();
-
-            status = client.reopen(config::thrift_rpc_timeout_ms);
-            if (!status.ok()) {
-                LOG(WARNING) << "Client reopen failed. with address(" << master_address.hostname
-                             << ":" << master_address.port << ")";
-                return status;
-            }
-            client->miniLoad(res, req);
-        }
-    } catch (apache::thrift::TException& e) {
-        // failed when retry.
-        // reopen to disable this connection
-        client.reopen(config::thrift_rpc_timeout_ms);
-        std::stringstream ss;
-        ss << "Request miniload from master(" << master_address.hostname << ":"
-           << master_address.port << ") because: " << e.what();
-        LOG(WARNING) << ss.str();
-        return Status::InternalError(ss.str());
-    }
-
-    return Status(res.status);
-}
-
-Status MiniLoadAction::_merge_header(HttpRequest* http_req,
-                                     std::map<std::string, std::string>* params) {
-    if (http_req == nullptr || params == nullptr) {
-        return Status::OK();
-    }
-    if (!http_req->header(HTTP_FORMAT_KEY).empty()) {
-        (*params)[HTTP_FORMAT_KEY] = http_req->header(HTTP_FORMAT_KEY);
-    }
-    if (!http_req->header(HTTP_COLUMNS).empty()) {
-        (*params)[HTTP_COLUMNS] = http_req->header(HTTP_COLUMNS);
-    }
-    if (!http_req->header(HTTP_WHERE).empty()) {
-        (*params)[HTTP_WHERE] = http_req->header(HTTP_WHERE);
-    }
-    if (!http_req->header(HTTP_COLUMN_SEPARATOR).empty()) {
-        (*params)[HTTP_COLUMN_SEPARATOR] = http_req->header(HTTP_COLUMN_SEPARATOR);
-    }
-    if (!http_req->header(HTTP_PARTITIONS).empty()) {
-        (*params)[HTTP_PARTITIONS] = http_req->header(HTTP_PARTITIONS);
-        if (!http_req->header(HTTP_TEMP_PARTITIONS).empty()) {
-            return Status::InvalidArgument(
-                    "Can not specify both partitions and temporary partitions");
-        }
-    }
-    if (!http_req->header(HTTP_TEMP_PARTITIONS).empty()) {
-        (*params)[HTTP_TEMP_PARTITIONS] = http_req->header(HTTP_TEMP_PARTITIONS);
-        if (!http_req->header(HTTP_PARTITIONS).empty()) {
-            return Status::InvalidArgument(
-                    "Can not specify both partitions and temporary partitions");
-        }
-    }
-    if (!http_req->header(HTTP_NEGATIVE).empty() &&
-        iequal(http_req->header(HTTP_NEGATIVE), "true")) {
-        (*params)[HTTP_NEGATIVE] = "true";
-    } else {
-        (*params)[HTTP_NEGATIVE] = "false";
-    }
-    if (!http_req->header(HTTP_STRICT_MODE).empty()) {
-        if (iequal(http_req->header(HTTP_STRICT_MODE), "false")) {
-            (*params)[HTTP_STRICT_MODE] = "false";
-        } else if (iequal(http_req->header(HTTP_STRICT_MODE), "true")) {
-            (*params)[HTTP_STRICT_MODE] = "true";
-        } else {
-            return Status::InvalidArgument("Invalid strict mode format. Must be bool type");
-        }
-    }
-    if (!http_req->header(HTTP_TIMEZONE).empty()) {
-        (*params)[HTTP_TIMEZONE] = http_req->header(HTTP_TIMEZONE);
-    }
-    if (!http_req->header(HTTP_EXEC_MEM_LIMIT).empty()) {
-        (*params)[HTTP_EXEC_MEM_LIMIT] = http_req->header(HTTP_EXEC_MEM_LIMIT);
-    }
-    if (!http_req->header(HTTP_JSONPATHS).empty()) {
-        (*params)[HTTP_JSONPATHS] = http_req->header(HTTP_JSONPATHS);
-    }
-    if (!http_req->header(HTTP_JSONROOT).empty()) {
-        (*params)[HTTP_JSONROOT] = http_req->header(HTTP_JSONROOT);
-    }
-    if (!http_req->header(HTTP_STRIP_OUTER_ARRAY).empty()) {
-        if (iequal(http_req->header(HTTP_STRIP_OUTER_ARRAY), "true")) {
-            (*params)[HTTP_STRIP_OUTER_ARRAY] = "true";
-        } else {
-            (*params)[HTTP_STRIP_OUTER_ARRAY] = "false";
-        }
-    } else {
-        (*params)[HTTP_STRIP_OUTER_ARRAY] = "false";
-    }
-    if (!http_req->header(HTTP_FUZZY_PARSE).empty()) {
-        if (iequal(http_req->header(HTTP_FUZZY_PARSE), "true")) {
-            (*params)[HTTP_FUZZY_PARSE] = "true";
-        } else {
-            (*params)[HTTP_FUZZY_PARSE] = "false";
-        }
-    } else {
-        (*params)[HTTP_FUZZY_PARSE] = "false";
-    }
-    if (!http_req->header(HTTP_READ_JSON_BY_LINE).empty()) {
-        if (iequal(http_req->header(HTTP_READ_JSON_BY_LINE), "true")) {
-            (*params)[HTTP_READ_JSON_BY_LINE] = "true";
-        } else {
-            (*params)[HTTP_READ_JSON_BY_LINE] = "false";
-        }
-    } else {
-        (*params)[HTTP_READ_JSON_BY_LINE] = "false";
-    }
-
-    if (!http_req->header(HTTP_FUNCTION_COLUMN + "." + HTTP_SEQUENCE_COL).empty()) {
-        (*params)[HTTP_FUNCTION_COLUMN + "." + HTTP_SEQUENCE_COL] =
-                http_req->header(HTTP_FUNCTION_COLUMN + "." + HTTP_SEQUENCE_COL);
-    }
-    if (params->find(HTTP_MERGE_TYPE) == params->end()) {
-        params->insert(std::make_pair(HTTP_MERGE_TYPE, "APPEND"));
-    }
-    StringCaseMap<TMergeType::type> merge_type_map = {{"APPEND", TMergeType::APPEND},
-                                                      {"DELETE", TMergeType::DELETE},
-                                                      {"MERGE", TMergeType::MERGE}};
-    if (!http_req->header(HTTP_MERGE_TYPE).empty()) {
-        std::string merge_type = http_req->header(HTTP_MERGE_TYPE);
-        auto it = merge_type_map.find(merge_type);
-        if (it != merge_type_map.end()) {
-            (*params)[HTTP_MERGE_TYPE] = it->first;
-        } else {
-            return Status::InvalidArgument("Invalid merge type " + merge_type);
-        }
-    }
-    if (!http_req->header(HTTP_DELETE_CONDITION).empty()) {
-        if ((*params)[HTTP_MERGE_TYPE] == "MERGE") {
-            (*params)[HTTP_DELETE_CONDITION] = http_req->header(HTTP_DELETE_CONDITION);
-        } else {
-            return Status::InvalidArgument("not support delete when merge type is " +
-                                           (*params)[HTTP_MERGE_TYPE] + ".");
-        }
-    }
-    return Status::OK();
-}
-
-static bool parse_auth(const std::string& auth, std::string* user, std::string* passwd,
-                       std::string* cluster) {
-    std::string decoded_auth;
-
-    if (!base64_decode(auth, &decoded_auth)) {
-        return false;
-    }
-    std::string::size_type pos = decoded_auth.find(':');
-    if (pos == std::string::npos) {
-        return false;
-    }
-    user->assign(decoded_auth.c_str(), pos);
-    passwd->assign(decoded_auth.c_str() + pos + 1);
-    const std::string::size_type cluster_pos = user->find('@');
-    if (cluster_pos != std::string::npos) {
-        cluster->assign(user->c_str(), cluster_pos + 1, pos - cluster_pos - 1);
-        user->assign(user->c_str(), cluster_pos);
-    }
-    return true;
-}
-
-Status MiniLoadAction::check_auth(const HttpRequest* http_req,
-                                  const TLoadCheckRequest& check_load_req) {
-    // put here to log master information
-    const TNetworkAddress& master_address = _exec_env->master_info()->network_address;
-    Status status;
-    FrontendServiceConnection client(_exec_env->frontend_client_cache(), master_address,
-                                     config::thrift_rpc_timeout_ms, &status);
-    if (!status.ok()) {
-        std::stringstream ss;
-        ss << "Connect master failed, with address(" << master_address.hostname << ":"
-           << master_address.port << ")";
-        LOG(WARNING) << ss.str();
-        return status;
-    }
-
-    TFeResult res;
-    try {
-        try {
-            client->loadCheck(res, check_load_req);
-        } catch (apache::thrift::transport::TTransportException& e) {
-            LOG(WARNING) << "Retrying mini load from master(" << master_address.hostname << ":"
-                         << master_address.port << ") because: " << e.what();
-            status = client.reopen(config::thrift_rpc_timeout_ms);
-            if (!status.ok()) {
-                LOG(WARNING) << "Client reopen failed. with address(" << master_address.hostname
-                             << ":" << master_address.port << ")";
-                return status;
-            }
-            client->loadCheck(res, check_load_req);
-        } catch (apache::thrift::TApplicationException& e) {
-            LOG(WARNING) << "load check request from master(" << master_address.hostname << ":"
-                         << master_address.port << ") got unknown result: " << e.what();
-
-            status = client.reopen(config::thrift_rpc_timeout_ms);
-            if (!status.ok()) {
-                LOG(WARNING) << "Client reopen failed. with address(" << master_address.hostname
-                             << ":" << master_address.port << ")";
-                return status;
-            }
-            client->loadCheck(res, check_load_req);
-        }
-    } catch (apache::thrift::TException& e) {
-        // failed when retry.
-        // reopen to disable this connection
-        client.reopen(config::thrift_rpc_timeout_ms);
-        std::stringstream ss;
-        ss << "Request miniload from master(" << master_address.hostname << ":"
-           << master_address.port << ") because: " << e.what();
-        LOG(WARNING) << ss.str();
-        return Status::InternalError(ss.str());
-    }
-
-    return Status(res.status);
-}
-
-void MiniLoadAction::erase_handle(const LoadHandle& desc) {
-    // remove
-    std::lock_guard<std::mutex> l(_lock);
-    _current_load.erase(desc);
-}
-
-int MiniLoadAction::on_header(HttpRequest* req) {
-    // check authorization first, make client know what happened
-    if (req->header(HttpHeaders::AUTHORIZATION).empty()) {
-        HttpChannel::send_basic_challenge(req, "mini_load");
-        return -1;
-    }
-
-    Status status;
-    MiniLoadCtx* mini_load_ctx = new MiniLoadCtx(_is_streaming(req));
-    req->set_handler_ctx(mini_load_ctx);
-    if (((MiniLoadCtx*)req->handler_ctx())->is_streaming) {
-        status = _on_new_header(req);
-        StreamLoadContext* ctx = ((MiniLoadCtx*)req->handler_ctx())->stream_load_ctx;
-        if (ctx != nullptr) {
-            ctx->status = status;
-        }
-    } else {
-        status = _on_header(req);
-    }
-    if (!status.ok()) {
-        HttpChannel::send_reply(req, HttpStatus::INTERNAL_SERVER_ERROR, status.get_error_msg());
-        return -1;
-    }
-    return 0;
-}
-
-bool MiniLoadAction::_is_streaming(HttpRequest* req) {
-    // multi load must be non-streaming
-    if (!req->param(SUB_LABEL_KEY).empty()) {
-        return false;
-    }
-
-    TIsMethodSupportedRequest request;
-    request.__set_function_name(_streaming_function_name);
-    const TNetworkAddress& master_address = _exec_env->master_info()->network_address;
-    TFeResult res;
-    Status status = ThriftRpcHelper::rpc<FrontendServiceClient>(
-            master_address.hostname, master_address.port,
-            [&request, &res](FrontendServiceConnection& client) {
-                client->isMethodSupported(res, request);
-            });
-    if (!status.ok()) {
-        std::stringstream ss;
-        ss << "This mini load is not streaming because: " << status.get_error_msg()
-           << " with address(" << master_address.hostname << ":" << master_address.port << ")";
-        LOG(INFO) << ss.str();
-        return false;
-    }
-
-    status = Status(res.status);
-    if (!status.ok()) {
-        std::stringstream ss;
-        ss << "This streaming mini load is not be supportd because: " << status.get_error_msg()
-           << " with address(" << master_address.hostname << ":" << master_address.port << ")";
-        LOG(INFO) << ss.str();
-        return false;
-    }
-    return true;
-}
-
-Status MiniLoadAction::_on_header(HttpRequest* req) {
-    size_t body_bytes = 0;
-    size_t max_body_bytes = config::streaming_load_max_mb * 1024 * 1024;
-    if (!req->header(HttpHeaders::CONTENT_LENGTH).empty()) {
-        body_bytes = std::stol(req->header(HttpHeaders::CONTENT_LENGTH));
-        if (body_bytes > max_body_bytes) {
-            std::stringstream ss;
-            ss << "file size exceed max body size, max_body_bytes=" << max_body_bytes;
-            return Status::InternalError(ss.str());
-        }
-    } else {
-        evhttp_connection_set_max_body_size(
-                evhttp_request_get_connection(req->get_evhttp_request()), max_body_bytes);
-    }
-
-    RETURN_IF_ERROR(check_request(req));
-
-    std::unique_ptr<MiniLoadAsyncCtx> mini_load_async_ctx(new MiniLoadAsyncCtx(this));
-    mini_load_async_ctx->body_bytes = body_bytes;
-    mini_load_async_ctx->load_handle.db = req->param(DB_KEY);
-    mini_load_async_ctx->load_handle.label = req->param(LABEL_KEY);
-    mini_load_async_ctx->load_handle.sub_label = req->param(SUB_LABEL_KEY);
-
-    // check if duplicate
-    // Use this to prevent that two callback function write to one file
-    // that file may be writen bad
-    {
-        std::lock_guard<std::mutex> l(_lock);
-        if (_current_load.find(mini_load_async_ctx->load_handle) != _current_load.end()) {
-            return Status::InternalError("Duplicate mini load request.");
-        }
-        _current_load.insert(mini_load_async_ctx->load_handle);
-        mini_load_async_ctx->need_remove_handle = true;
-    }
-    // generate load check request
-    RETURN_IF_ERROR(generate_check_load_req(req, &mini_load_async_ctx->load_check_req));
-
-    // Check auth
-    RETURN_IF_ERROR(check_auth(req, mini_load_async_ctx->load_check_req));
-
-    // Receive data first, keep things easy.
-    RETURN_IF_ERROR(data_saved_dir(mini_load_async_ctx->load_handle, req->param(TABLE_KEY),
-                                   &mini_load_async_ctx->file_path));
-    // destructor will close the file handle, not depend on DeferOp any more
-    mini_load_async_ctx->fd =
-            open(mini_load_async_ctx->file_path.c_str(), O_WRONLY | O_CREAT | O_TRUNC, 0660);
-    if (mini_load_async_ctx->fd < 0) {
-        char buf[64];
-        LOG(WARNING) << "open file failed, path=" << mini_load_async_ctx->file_path
-                     << ", errno=" << errno << ", errmsg=" << strerror_r(errno, buf, sizeof(buf));
-        return Status::InternalError("open file failed");
-    }
-
-    ((MiniLoadCtx*)req->handler_ctx())->mini_load_async_ctx = mini_load_async_ctx.release();
-    return Status::OK();
-}
-
-void MiniLoadAction::on_chunk_data(HttpRequest* http_req) {
-    MiniLoadCtx* ctx = (MiniLoadCtx*)http_req->handler_ctx();
-    if (ctx->is_streaming) {
-        _on_new_chunk_data(http_req);
-    } else {
-        _on_chunk_data(http_req);
-    }
-}
-
-void MiniLoadAction::_on_chunk_data(HttpRequest* http_req) {
-    MiniLoadAsyncCtx* ctx = ((MiniLoadCtx*)http_req->handler_ctx())->mini_load_async_ctx;
-    if (ctx == nullptr) {
-        return;
-    }
-
-    struct evhttp_request* ev_req = http_req->get_evhttp_request();
-    auto evbuf = evhttp_request_get_input_buffer(ev_req);
-
-    char buf[4096];
-    while (evbuffer_get_length(evbuf) > 0) {
-        auto n = evbuffer_remove(evbuf, buf, sizeof(buf));
-        while (n > 0) {
-            auto res = write(ctx->fd, buf, n);
-            if (res < 0) {
-                char errbuf[64];
-                LOG(WARNING) << "write file failed, path=" << ctx->file_path << ", errno=" << errno
-                             << ", errmsg=" << strerror_r(errno, errbuf, sizeof(errbuf));
-                HttpChannel::send_reply(http_req, HttpStatus::INTERNAL_SERVER_ERROR,
-                                        "write file failed");
-                delete ctx;
-                http_req->set_handler_ctx(nullptr);
-                return;
-            }
-            n -= res;
-            ctx->bytes_written += res;
-        }
-    }
-}
-
-void MiniLoadAction::_on_new_chunk_data(HttpRequest* http_req) {
-    StreamLoadContext* ctx = ((MiniLoadCtx*)http_req->handler_ctx())->stream_load_ctx;
-    if (ctx == nullptr || !ctx->status.ok()) {
-        return;
-    }
-
-    struct evhttp_request* ev_req = http_req->get_evhttp_request();
-    auto evbuf = evhttp_request_get_input_buffer(ev_req);
-
-    while (evbuffer_get_length(evbuf) > 0) {
-        auto bb = ByteBuffer::allocate(4096);
-        auto remove_bytes = evbuffer_remove(evbuf, bb->ptr, bb->capacity);
-        bb->pos = remove_bytes;
-        bb->flip();
-        auto st = ctx->body_sink->append(bb);
-        if (!st.ok()) {
-            LOG(WARNING) << "append body content failed. errmsg=" << st.get_error_msg()
-                         << ctx->brief();
-            ctx->status = st;
-            return;
-        }
-        ctx->receive_bytes += remove_bytes;
-    }
-}
-
-void MiniLoadAction::free_handler_ctx(void* param) {
-    MiniLoadCtx* ctx = (MiniLoadCtx*)param;
-    if (ctx->is_streaming) {
-        StreamLoadContext* streaming_ctx = ((MiniLoadCtx*)param)->stream_load_ctx;
-        if (streaming_ctx != nullptr) {
-            // sender is going, make receiver know it
-            if (streaming_ctx->body_sink != nullptr) {
-                LOG(WARNING) << "cancel stream load " << streaming_ctx->id.to_string()
-                             << " because sender failed";
-                streaming_ctx->body_sink->cancel("sender failed");
-            }
-            if (streaming_ctx->unref()) {
-                delete streaming_ctx;
-            }
-        }
-    } else {
-        MiniLoadAsyncCtx* async_ctx = ((MiniLoadCtx*)param)->mini_load_async_ctx;
-        delete async_ctx;
-    }
-    delete ctx;
-}
-
-void MiniLoadAction::handle(HttpRequest* http_req) {
-    MiniLoadCtx* ctx = (MiniLoadCtx*)http_req->handler_ctx();
-    if (ctx->is_streaming) {
-        _new_handle(http_req);
-    } else {
-        _handle(http_req);
-    }
-}
-
-void MiniLoadAction::_handle(HttpRequest* http_req) {
-    MiniLoadAsyncCtx* ctx = ((MiniLoadCtx*)http_req->handler_ctx())->mini_load_async_ctx;
-    if (ctx == nullptr) {
-        // when ctx is nullptr, there must be error happened when on_chunk_data
-        // and reply is sent, we just return with no operation
-        LOG(WARNING) << "handler context is nullptr when MiniLoad callback execute, uri="
-                     << http_req->uri();
-        return;
-    }
-    if (ctx->body_bytes > 0 && ctx->bytes_written != ctx->body_bytes) {
-        LOG(WARNING) << "bytes written is not equal with body size, uri=" << http_req->uri()
-                     << ", body_bytes=" << ctx->body_bytes
-                     << ", bytes_written=" << ctx->bytes_written;
-        HttpChannel::send_reply(http_req, HttpStatus::INTERNAL_SERVER_ERROR,
-                                "receipt size not equal with body size");
-        return;
-    }
-    auto st = _load(http_req, ctx->file_path, ctx->load_check_req.user, ctx->load_check_req.cluster,
-                    ctx->bytes_written);
-    std::string str = st.to_json();
-    HttpChannel::send_reply(http_req, str);
-}
-
-Status MiniLoadAction::generate_check_load_req(const HttpRequest* http_req,
-                                               TLoadCheckRequest* check_load_req) {
-    const char k_basic[] = "Basic ";
-    const std::string& auth = http_req->header(HttpHeaders::AUTHORIZATION);
-    if (auth.compare(0, sizeof(k_basic) - 1, k_basic, sizeof(k_basic) - 1) != 0) {
-        return Status::InternalError("Not support Basic authorization.");
-    }
-
-    check_load_req->protocolVersion = FrontendServiceVersion::V1;
-    // Skip "Basic "
-    std::string str = auth.substr(sizeof(k_basic) - 1);
-    std::string cluster;
-    if (!parse_auth(str, &(check_load_req->user), &(check_load_req->passwd), &cluster)) {
-        LOG(WARNING) << "parse auth string failed." << auth << " and str " << str;
-        return Status::InternalError("Parse authorization failed.");
-    }
-    if (!cluster.empty()) {
-        check_load_req->__set_cluster(cluster);
-    }
-    check_load_req->db = http_req->param(DB_KEY);
-    check_load_req->__set_tbl(http_req->param(TABLE_KEY));
-    if (http_req->param(SUB_LABEL_KEY).empty()) {
-        check_load_req->__set_label(http_req->param(LABEL_KEY));
-        check_load_req->__set_timestamp(GetCurrentTimeMicros());
-    }
-
-    if (http_req->remote_host() != nullptr) {
-        std::string user_ip(http_req->remote_host());
-        check_load_req->__set_user_ip(user_ip);
-    }
-
-    return Status::OK();
-}
-
-bool LoadHandleCmp::operator()(const LoadHandle& lhs, const LoadHandle& rhs) const {
-    int ret = lhs.label.compare(rhs.label);
-    if (ret < 0) {
-        return true;
-    } else if (ret > 0) {
-        return false;
-    }
-
-    ret = lhs.sub_label.compare(rhs.sub_label);
-    if (ret < 0) {
-        return true;
-    } else if (ret > 0) {
-        return false;
-    }
-
-    ret = lhs.db.compare(rhs.db);
-    if (ret < 0) {
-        return true;
-    }
-
-    return false;
-}
-
-// fe will begin the txn and record the metadata of load
-Status MiniLoadAction::_begin_mini_load(StreamLoadContext* ctx) {
-    // prepare begin mini load request params
-    TMiniLoadBeginRequest request;
-    set_request_auth(&request, ctx->auth);
-    request.db = ctx->db;
-    request.tbl = ctx->table;
-    request.label = ctx->label;
-    if (!ctx->sub_label.empty()) {
-        request.__set_sub_label(ctx->sub_label);
-    }
-    if (ctx->timeout_second != -1) {
-        request.__set_timeout_second(ctx->timeout_second);
-    }
-    if (ctx->max_filter_ratio != 0.0) {
-        request.__set_max_filter_ratio(ctx->max_filter_ratio);
-    }
-    request.__set_create_timestamp(UnixMillis());
-    request.__set_request_id(ctx->id.to_thrift());
-    // begin load by master
-    const TNetworkAddress& master_addr = _exec_env->master_info()->network_address;
-    TMiniLoadBeginResult res;
-    RETURN_IF_ERROR(ThriftRpcHelper::rpc<FrontendServiceClient>(
-            master_addr.hostname, master_addr.port,
-            [&request, &res](FrontendServiceConnection& client) {
-                client->miniLoadBegin(res, request);
-            }));
-    Status begin_status(res.status);
-    if (!begin_status.ok()) {
-        LOG(INFO) << "failed to begin mini load " << ctx->label
-                  << " with error msg:" << begin_status.get_error_msg();
-        return begin_status;
-    }
-    ctx->txn_id = res.txn_id;
-    // txn has been begun in fe
-    ctx->need_rollback = true;
-    LOG(INFO) << "load:" << ctx->label << " txn:" << res.txn_id << " has been begun in fe";
-    return Status::OK();
-}
-
-Status MiniLoadAction::_process_put(HttpRequest* req, StreamLoadContext* ctx) {
-    // prepare request parameters
-    TStreamLoadPutRequest put_request;
-    set_request_auth(&put_request, ctx->auth);
-    put_request.db = ctx->db;
-    put_request.tbl = ctx->table;
-    put_request.txnId = ctx->txn_id;
-    put_request.formatType = ctx->format;
-    put_request.__set_loadId(ctx->id.to_thrift());
-    put_request.fileType = TFileType::FILE_STREAM;
-    std::map<std::string, std::string> params(req->query_params().begin(),
-                                              req->query_params().end());
-    /* merge params of columns and hll
-     * for example:
-     * input: columns=c1,tmp_c2,tmp_c3\&hll=hll_c2,tmp_c2:hll_c3,tmp_c3
-     * output: columns=c1,tmp_c2,tmp_c3,hll_c2=hll_hash(tmp_c2),hll_c3=hll_hash(tmp_c3)
-     */
-    auto columns_it = params.find(COLUMNS_KEY);
-    if (columns_it != params.end()) {
-        std::string columns_value = columns_it->second;
-        auto hll_it = params.find(HLL_KEY);
-        if (hll_it != params.end()) {
-            std::string hll_value = hll_it->second;
-            if (hll_value.empty()) {
-                return Status::InvalidArgument(
-                        "Hll value could not be empty when hll key is exists!");
-            }
-            std::map<std::string, std::string> hll_map;
-            RETURN_IF_ERROR(StringParser::split_string_to_map(hll_value, ":", ",", &hll_map));
-            if (hll_map.empty()) {
-                return Status::InvalidArgument("Hll value could not transform to hll expr: " +
-                                               hll_value);
-            }
-            for (auto& hll_element : hll_map) {
-                columns_value += "," + hll_element.first + "=hll_hash(" + hll_element.second + ")";
-            }
-        }
-        put_request.__set_columns(columns_value);
-    }
-    auto column_separator_it = params.find(COLUMN_SEPARATOR_KEY);
-    if (column_separator_it != params.end()) {
-        put_request.__set_columnSeparator(column_separator_it->second);
-    }
-    if (ctx->timeout_second != -1) {
-        put_request.__set_timeout(ctx->timeout_second);
-    }
-    auto strict_mode_it = params.find(STRICT_MODE_KEY);
-    if (strict_mode_it != params.end()) {
-        std::string strict_mode_value = strict_mode_it->second;
-        if (iequal(strict_mode_value, "false")) {
-            put_request.__set_strictMode(false);
-        } else if (iequal(strict_mode_value, "true")) {
-            put_request.__set_strictMode(true);
-        } else {
-            return Status::InvalidArgument("Invalid strict mode format. Must be bool type");
-        }
-    }
-
-    // plan this load
-    TNetworkAddress master_addr = _exec_env->master_info()->network_address;
-    RETURN_IF_ERROR(ThriftRpcHelper::rpc<FrontendServiceClient>(
-            master_addr.hostname, master_addr.port,
-            [&put_request, ctx](FrontendServiceConnection& client) {
-                client->streamLoadPut(ctx->put_result, put_request);
-            }));
-    Status plan_status(ctx->put_result.status);
-    if (!plan_status.ok()) {
-        LOG(WARNING) << "plan streaming load failed. errmsg=" << plan_status.get_error_msg()
-                     << ctx->brief();
-        return plan_status;
-    }
-    VLOG_NOTICE << "params is " << apache::thrift::ThriftDebugString(ctx->put_result.params);
-    return Status::OK();
-}
-
-// new on_header of mini load
-Status MiniLoadAction::_on_new_header(HttpRequest* req) {
-    size_t body_bytes = 0;
-    size_t max_body_bytes = config::streaming_load_max_mb * 1024 * 1024;
-    if (!req->header(HttpHeaders::CONTENT_LENGTH).empty()) {
-        body_bytes = std::stol(req->header(HttpHeaders::CONTENT_LENGTH));
-        if (body_bytes > max_body_bytes) {
-            std::stringstream ss;
-            ss << "file size exceed max body size, max_body_bytes=" << max_body_bytes;
-            return Status::InvalidArgument(ss.str());
-        }
-    } else {
-        evhttp_connection_set_max_body_size(
-                evhttp_request_get_connection(req->get_evhttp_request()), max_body_bytes);
-    }
-
-    RETURN_IF_ERROR(check_request(req));
-
-    StreamLoadContext* ctx = new StreamLoadContext(_exec_env);
-    ctx->ref();
-    ((MiniLoadCtx*)req->handler_ctx())->stream_load_ctx = ctx;
-
-    // auth information
-    if (!parse_basic_auth(*req, &ctx->auth)) {
-        LOG(WARNING) << "parse basic authorization failed." << ctx->brief();
-        return Status::InvalidArgument("no valid Basic authorization");
-    }
-
-    ctx->load_type = TLoadType::MINI_LOAD;
-    ctx->load_src_type = TLoadSourceType::RAW;
-
-    ctx->db = req->param(DB_KEY);
-    ctx->table = req->param(TABLE_KEY);
-    ctx->label = req->param(LABEL_KEY);
-    if (!req->param(SUB_LABEL_KEY).empty()) {
-        ctx->sub_label = req->param(SUB_LABEL_KEY);
-    }
-    ctx->format = TFileFormatType::FORMAT_CSV_PLAIN;
-    std::map<std::string, std::string> params(req->query_params().begin(),
-                                              req->query_params().end());
-    auto max_filter_ratio_it = params.find(MAX_FILTER_RATIO_KEY);
-    if (max_filter_ratio_it != params.end()) {
-        ctx->max_filter_ratio = strtod(max_filter_ratio_it->second.c_str(), nullptr);
-    }
-    auto timeout_it = params.find(TIMEOUT_KEY);
-    if (timeout_it != params.end()) {
-        try {
-            ctx->timeout_second = std::stoi(timeout_it->second);
-        } catch (const std::invalid_argument& e) {
-            return Status::InvalidArgument("Invalid timeout format");
-        }
-    }
-
-    LOG(INFO) << "new income mini load request." << ctx->brief() << ", db: " << ctx->db
-              << ", tbl: " << ctx->table;
-
-    // record metadata in frontend
-    RETURN_IF_ERROR(_begin_mini_load(ctx));
-
-    // open sink
-    auto pipe = std::make_shared<StreamLoadPipe>();
-    RETURN_IF_ERROR(_exec_env->load_stream_mgr()->put(ctx->id, pipe));
-    ctx->body_sink = pipe;
-
-    // get plan from fe
-    RETURN_IF_ERROR(_process_put(req, ctx));
-
-    // execute plan
-    return _exec_env->stream_load_executor()->execute_plan_fragment(ctx);
-}
-
-void MiniLoadAction::_new_handle(HttpRequest* req) {
-    StreamLoadContext* ctx = ((MiniLoadCtx*)req->handler_ctx())->stream_load_ctx;
-    DCHECK(ctx != nullptr);
-
-    if (ctx->status.ok()) {
-        ctx->status = _on_new_handle(ctx);
-        if (!ctx->status.ok()) {
-            LOG(WARNING) << "handle mini load failed, id=" << ctx->id
-                         << ", errmsg=" << ctx->status.get_error_msg();
-        }
-    }
-
-    // if failed to commit and status is not PUBLISH_TIMEOUT, rollback the txn.
-    // PUBLISH_TIMEOUT is treated as OK in mini load, because user will use show load stmt
-    // to see the final result.
-    if (!ctx->status.ok() && ctx->status.code() != TStatusCode::PUBLISH_TIMEOUT) {
-        if (ctx->need_rollback) {
-            _exec_env->stream_load_executor()->rollback_txn(ctx);
-            ctx->need_rollback = false;
-        }
-        if (ctx->body_sink.get() != nullptr) {
-            ctx->body_sink->cancel(ctx->status.get_error_msg());
-        }
-    }
-
-    std::string str = ctx->to_json_for_mini_load();
-    str += '\n';
-    HttpChannel::send_reply(req, str);
-}
-
-Status MiniLoadAction::_on_new_handle(StreamLoadContext* ctx) {
-    if (ctx->body_bytes > 0 && ctx->receive_bytes != ctx->body_bytes) {
-        LOG(WARNING) << "receive body don't equal with body bytes, body_bytes=" << ctx->body_bytes
-                     << ", receive_bytes=" << ctx->receive_bytes << ", id=" << ctx->id;
-        return Status::InternalError("receive body don't equal with body bytes");
-    }
-
-    // wait stream load sink finish
-    RETURN_IF_ERROR(ctx->body_sink->finish());
-
-    // wait stream load finish
-    RETURN_IF_ERROR(ctx->future.get());
-
-    // commit this load with mini load attachment
-    RETURN_IF_ERROR(_exec_env->stream_load_executor()->commit_txn(ctx));
-
-    return Status::OK();
-}
-
-} // namespace doris
diff --git a/be/src/http/action/mini_load.h b/be/src/http/action/mini_load.h
deleted file mode 100644
index a21a450f19..0000000000
--- a/be/src/http/action/mini_load.h
+++ /dev/null
@@ -1,109 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#pragma once
-
-#include <map>
-#include <mutex>
-#include <set>
-#include <string>
-
-#include "common/status.h"
-#include "gen_cpp/FrontendService.h"
-#include "http/http_handler.h"
-#include "runtime/stream_load/stream_load_context.h"
-#include "util/defer_op.h"
-
-namespace doris {
-
-// Used to identify one mini load job
-struct LoadHandle {
-    std::string db;
-    std::string label;
-    std::string sub_label;
-};
-
-struct LoadHandleCmp {
-    bool operator()(const LoadHandle& lhs, const LoadHandle& rhs) const;
-};
-
-class TMasterResult;
-class ExecEnv;
-class StreamLoadContext;
-
-// This a handler for mini load
-// path is /api/{db}/{table}/_load
-class MiniLoadAction : public HttpHandler {
-public:
-    MiniLoadAction(ExecEnv* exec_env);
-
-    virtual ~MiniLoadAction() {}
-
-    void handle(HttpRequest* req) override;
-
-    bool request_will_be_read_progressively() override { return true; }
-
-    int on_header(HttpRequest* req) override;
-
-    void on_chunk_data(HttpRequest* req) override;
-    void free_handler_ctx(void* ctx) override;
-
-    void erase_handle(const LoadHandle& handle);
-
-private:
-    Status _load(HttpRequest* req, const std::string& file_path, const std::string& user,
-                 const std::string& cluster, int64_t file_size);
-
-    Status data_saved_dir(const LoadHandle& desc, const std::string& table, std::string* file_path);
-
-    Status _on_header(HttpRequest* http_req);
-
-    Status generate_check_load_req(const HttpRequest* http_req, TLoadCheckRequest* load_check_req);
-
-    Status check_auth(const HttpRequest* http_req, const TLoadCheckRequest& load_check_req);
-
-    void _on_chunk_data(HttpRequest* http_req);
-
-    void _handle(HttpRequest* http_req);
-
-    // streaming mini load
-    Status _on_new_header(HttpRequest* req);
-
-    Status _begin_mini_load(StreamLoadContext* ctx);
-
-    Status _process_put(HttpRequest* req, StreamLoadContext* ctx);
-
-    void _on_new_chunk_data(HttpRequest* http_req);
-
-    void _new_handle(HttpRequest* req);
-
-    Status _on_new_handle(StreamLoadContext* ctx);
-
-    bool _is_streaming(HttpRequest* req);
-
-    Status _merge_header(HttpRequest* http_req, std::map<std::string, std::string>* params);
-
-    const std::string _streaming_function_name = "STREAMING_MINI_LOAD";
-
-    ExecEnv* _exec_env;
-
-    std::mutex _lock;
-    // Used to check if load is duplicated in this instance.
-    std::set<LoadHandle, LoadHandleCmp> _current_load;
-};
-
-} // namespace doris
diff --git a/be/src/runtime/CMakeLists.txt b/be/src/runtime/CMakeLists.txt
index 7692ae933f..2cc56e137b 100644
--- a/be/src/runtime/CMakeLists.txt
+++ b/be/src/runtime/CMakeLists.txt
@@ -60,7 +60,6 @@ set(RUNTIME_FILES
     qsorter.cpp
     fragment_mgr.cpp
     dpp_sink_internal.cpp
-    etl_job_mgr.cpp
     load_path_mgr.cpp
     types.cpp
     tmp_file_mgr.cc
diff --git a/be/src/runtime/etl_job_mgr.cpp b/be/src/runtime/etl_job_mgr.cpp
deleted file mode 100644
index ce4029b144..0000000000
--- a/be/src/runtime/etl_job_mgr.cpp
+++ /dev/null
@@ -1,302 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#include "runtime/etl_job_mgr.h"
-
-#include <filesystem>
-#include <functional>
-
-#include "gen_cpp/FrontendService.h"
-#include "gen_cpp/HeartbeatService_types.h"
-#include "gen_cpp/MasterService_types.h"
-#include "gen_cpp/Status_types.h"
-#include "gen_cpp/Types_types.h"
-#include "runtime/client_cache.h"
-#include "runtime/exec_env.h"
-#include "runtime/fragment_mgr.h"
-#include "runtime/plan_fragment_executor.h"
-#include "runtime/runtime_state.h"
-#include "service/backend_options.h"
-#include "util/file_utils.h"
-#include "util/uid_util.h"
-
-namespace doris {
-
-#define VLOG_ETL VLOG_CRITICAL
-
-std::string EtlJobMgr::to_http_path(const std::string& file_name) {
-    std::stringstream url;
-    url << "http://" << BackendOptions::get_localhost() << ":" << config::webserver_port
-        << "/api/_download_load?"
-        << "token=" << _exec_env->token() << "&file=" << file_name;
-    return url.str();
-}
-
-std::string EtlJobMgr::to_load_error_http_path(const std::string& file_name) {
-    std::stringstream url;
-    url << "http://" << BackendOptions::get_localhost() << ":" << config::webserver_port
-        << "/api/_load_error_log?"
-        << "file=" << file_name;
-    return url.str();
-}
-
-const std::string DPP_NORMAL_ALL = "dpp.norm.ALL";
-const std::string DPP_ABNORMAL_ALL = "dpp.abnorm.ALL";
-const std::string ERROR_FILE_PREFIX = "error_log";
-
-EtlJobMgr::EtlJobMgr(ExecEnv* exec_env)
-        : _exec_env(exec_env), _success_jobs(5000), _failed_jobs(5000) {}
-
-EtlJobMgr::~EtlJobMgr() {}
-
-Status EtlJobMgr::init() {
-    return Status::OK();
-}
-
-Status EtlJobMgr::start_job(const TMiniLoadEtlTaskRequest& req) {
-    const TUniqueId& id = req.params.params.fragment_instance_id;
-    std::lock_guard<std::mutex> l(_lock);
-    auto it = _running_jobs.find(id);
-    if (it != _running_jobs.end()) {
-        // Already have this job, return what???
-        LOG(INFO) << "Duplicated etl job(" << id << ")";
-        return Status::OK();
-    }
-
-    // If already success, we return Status::OK()
-    // and wait master ask me success information
-    if (_success_jobs.exists(id)) {
-        // Already success
-        LOG(INFO) << "Already successful etl job(" << id << ")";
-        return Status::OK();
-    }
-
-    RETURN_IF_ERROR(_exec_env->fragment_mgr()->exec_plan_fragment(
-            req.params, std::bind<void>(&EtlJobMgr::finalize_job, this, std::placeholders::_1)));
-
-    // redo this job if failed before
-    if (_failed_jobs.exists(id)) {
-        _failed_jobs.erase(id);
-    }
-
-    VLOG_ETL << "Job id(" << id << ") insert to EtlJobMgr.";
-    _running_jobs.insert(id);
-
-    return Status::OK();
-}
-
-void EtlJobMgr::report_to_master(PlanFragmentExecutor* executor) {
-    TUpdateMiniEtlTaskStatusRequest request;
-    RuntimeState* state = executor->runtime_state();
-    request.protocolVersion = FrontendServiceVersion::V1;
-    request.etlTaskId = state->fragment_instance_id();
-    Status status = get_job_state(state->fragment_instance_id(), &request.etlTaskStatus);
-    if (!status.ok()) {
-        return;
-    }
-    const TNetworkAddress& master_address = _exec_env->master_info()->network_address;
-    FrontendServiceConnection client(_exec_env->frontend_client_cache(), master_address,
-                                     config::thrift_rpc_timeout_ms, &status);
-    if (!status.ok()) {
-        std::stringstream ss;
-        ss << "Connect master failed, with address(" << master_address.hostname << ":"
-           << master_address.port << ")";
-        LOG(WARNING) << ss.str();
-        return;
-    }
-    TFeResult res;
-    try {
-        try {
-            client->updateMiniEtlTaskStatus(res, request);
-        } catch (apache::thrift::transport::TTransportException& e) {
-            LOG(WARNING) << "Retrying report etl jobs status to master(" << master_address.hostname
-                         << ":" << master_address.port << ") because: " << e.what();
-            status = client.reopen(config::thrift_rpc_timeout_ms);
-            if (!status.ok()) {
-                LOG(WARNING) << "Client repoen failed. with address(" << master_address.hostname
-                             << ":" << master_address.port << ")";
-                return;
-            }
-            client->updateMiniEtlTaskStatus(res, request);
-        }
-    } catch (apache::thrift::TException& e) {
-        // failed when retry.
-        // reopen to disable this connection
-        client.reopen(config::thrift_rpc_timeout_ms);
-        std::stringstream ss;
-        ss << "Report etl task to master(" << master_address.hostname << ":" << master_address.port
-           << ") failed because: " << e.what();
-        LOG(WARNING) << ss.str();
-    }
-    // TODO(lingbin): check status of 'res' here.
-    // because there are some checks in updateMiniEtlTaskStatus, for example max_filter_ratio.
-    LOG(INFO) << "Successfully report elt job status to master.id=" << print_id(request.etlTaskId);
-}
-
-void EtlJobMgr::finalize_job(PlanFragmentExecutor* executor) {
-    EtlJobResult result;
-
-    RuntimeState* state = executor->runtime_state();
-    if (executor->status().ok()) {
-        // Get files
-        for (auto& it : state->output_files()) {
-            int64_t file_size = std::filesystem::file_size(it);
-            result.file_map[to_http_path(it)] = file_size;
-        }
-        // set statistics
-        result.process_normal_rows = state->num_rows_load_success();
-        result.process_abnormal_rows = state->num_rows_load_filtered();
-    } else {
-        // get debug path
-        result.process_normal_rows = state->num_rows_load_success();
-        result.process_abnormal_rows = state->num_rows_load_filtered();
-    }
-
-    result.debug_path = state->get_error_log_file_path();
-
-    finish_job(state->fragment_instance_id(), executor->status(), result);
-
-    // Try to report this finished task to master
-    report_to_master(executor);
-}
-
-Status EtlJobMgr::cancel_job(const TUniqueId& id) {
-    std::lock_guard<std::mutex> l(_lock);
-    auto it = _running_jobs.find(id);
-    if (it == _running_jobs.end()) {
-        // Nothing to do
-        LOG(INFO) << "No such job id, just print to info " << id;
-        return Status::OK();
-    }
-    _running_jobs.erase(it);
-    VLOG_ETL << "id(" << id << ") have been removed from EtlJobMgr.";
-    EtlJobCtx job_ctx;
-    job_ctx.finish_status = Status::Cancelled("Cancelled");
-    _failed_jobs.put(id, job_ctx);
-    return Status::OK();
-}
-
-Status EtlJobMgr::finish_job(const TUniqueId& id, const Status& finish_status,
-                             const EtlJobResult& result) {
-    std::lock_guard<std::mutex> l(_lock);
-
-    auto it = _running_jobs.find(id);
-    if (it == _running_jobs.end()) {
-        std::stringstream ss;
-        ss << "Unknown job id(" << id << ").";
-        return Status::InternalError(ss.str());
-    }
-    _running_jobs.erase(it);
-
-    EtlJobCtx ctx;
-    ctx.finish_status = finish_status;
-    ctx.result = result;
-    if (finish_status.ok()) {
-        _success_jobs.put(id, ctx);
-    } else {
-        _failed_jobs.put(id, ctx);
-    }
-
-    VLOG_ETL << "Move job(" << id << ") from running to "
-             << (finish_status.ok() ? "success jobs" : "failed jobs");
-
-    return Status::OK();
-}
-
-Status EtlJobMgr::get_job_state(const TUniqueId& id, TMiniLoadEtlStatusResult* result) {
-    std::lock_guard<std::mutex> l(_lock);
-    auto it = _running_jobs.find(id);
-    if (it != _running_jobs.end()) {
-        result->status.__set_status_code(TStatusCode::OK);
-        result->__set_etl_state(TEtlState::RUNNING);
-        return Status::OK();
-    }
-    // Successful
-    if (_success_jobs.exists(id)) {
-        EtlJobCtx ctx;
-        _success_jobs.get(id, &ctx);
-        result->status.__set_status_code(TStatusCode::OK);
-        result->__set_etl_state(TEtlState::FINISHED);
-        result->__set_file_map(ctx.result.file_map);
-
-        // set counter
-        std::map<std::string, std::string> counter;
-        counter[DPP_NORMAL_ALL] = std::to_string(ctx.result.process_normal_rows);
-        counter[DPP_ABNORMAL_ALL] = std::to_string(ctx.result.process_abnormal_rows);
-        result->__set_counters(counter);
-
-        if (!ctx.result.debug_path.empty()) {
-            result->__set_tracking_url(to_load_error_http_path(ctx.result.debug_path));
-        }
-        return Status::OK();
-    }
-    // failed information
-    if (_failed_jobs.exists(id)) {
-        EtlJobCtx ctx;
-        _failed_jobs.get(id, &ctx);
-        result->status.__set_status_code(TStatusCode::OK);
-        result->__set_etl_state(TEtlState::CANCELLED);
-
-        if (!ctx.result.debug_path.empty()) {
-            result->__set_tracking_url(to_http_path(ctx.result.debug_path));
-        }
-        return Status::OK();
-    }
-    // NO this jobs
-    result->status.__set_status_code(TStatusCode::OK);
-    result->__set_etl_state(TEtlState::CANCELLED);
-    return Status::OK();
-}
-
-Status EtlJobMgr::erase_job(const TDeleteEtlFilesRequest& req) {
-    std::lock_guard<std::mutex> l(_lock);
-    const TUniqueId& id = req.mini_load_id;
-    auto it = _running_jobs.find(id);
-    if (it != _running_jobs.end()) {
-        std::stringstream ss;
-        ss << "Job(" << id << ") is running, can not be deleted.";
-        return Status::InternalError(ss.str());
-    }
-    _success_jobs.erase(id);
-    _failed_jobs.erase(id);
-
-    return Status::OK();
-}
-
-void EtlJobMgr::debug(std::stringstream& ss) {
-    // Make things easy
-    std::lock_guard<std::mutex> l(_lock);
-
-    // Debug summary
-    ss << "we have " << _running_jobs.size() << " jobs Running\n";
-    ss << "we have " << _failed_jobs.size() << " jobs Failed\n";
-    ss << "we have " << _success_jobs.size() << " jobs Successful\n";
-    // Debug running jobs
-    for (auto& it : _running_jobs) {
-        ss << "running jobs: " << it << "\n";
-    }
-    // Debug success jobs
-    for (auto& it : _success_jobs) {
-        ss << "successful jobs: " << it.first << "\n";
-    }
-    // Debug failed jobs
-    for (auto& it : _failed_jobs) {
-        ss << "failed jobs: " << it.first << "\n";
-    }
-}
-
-} // namespace doris
diff --git a/be/src/runtime/etl_job_mgr.h b/be/src/runtime/etl_job_mgr.h
deleted file mode 100644
index 306cf0397f..0000000000
--- a/be/src/runtime/etl_job_mgr.h
+++ /dev/null
@@ -1,99 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#pragma once
-
-#include <pthread.h>
-
-#include <mutex>
-#include <string>
-#include <unordered_set>
-#include <vector>
-
-#include "common/status.h"
-#include "gen_cpp/Types_types.h"
-#include "http/rest_monitor_iface.h"
-#include "util/hash_util.hpp"
-#include "util/lru_cache.hpp"
-
-namespace doris {
-
-// used to report to master
-struct EtlJobResult {
-    EtlJobResult() : process_normal_rows(0), process_abnormal_rows(0) {}
-    std::string debug_path;
-    std::map<std::string, int64_t> file_map;
-    int64_t process_normal_rows;
-    int64_t process_abnormal_rows;
-};
-
-// used to report to master
-struct EtlJobCtx {
-    Status finish_status;
-    EtlJobResult result;
-};
-
-class TMiniLoadEtlStatusResult;
-class TMiniLoadEtlTaskRequest;
-class ExecEnv;
-class PlanFragmentExecutor;
-class TDeleteEtlFilesRequest;
-
-// manager of all the Etl job
-// used this because master may loop be to check if a load job is finished.
-class EtlJobMgr : public RestMonitorIface {
-public:
-    EtlJobMgr(ExecEnv* exec_env);
-
-    virtual ~EtlJobMgr();
-
-    // make trash directory for collect
-    Status init();
-
-    // Make a job to running state
-    // If this job is successful, return OK
-    // If this job is failed, move this job from _failed_jobs to _running_jobs
-    // Otherwise, put it to _running_jobs
-    Status start_job(const TMiniLoadEtlTaskRequest& req);
-
-    // Make a running job to failed job
-    Status cancel_job(const TUniqueId& id);
-
-    Status finish_job(const TUniqueId& id, const Status& finish_status, const EtlJobResult& result);
-
-    Status get_job_state(const TUniqueId& id, TMiniLoadEtlStatusResult* result);
-
-    Status erase_job(const TDeleteEtlFilesRequest& req);
-
-    void finalize_job(PlanFragmentExecutor* executor);
-
-    virtual void debug(std::stringstream& ss);
-
-private:
-    std::string to_http_path(const std::string& file_path);
-    std::string to_load_error_http_path(const std::string& file_path);
-
-    void report_to_master(PlanFragmentExecutor* executor);
-
-    ExecEnv* _exec_env;
-    std::mutex _lock;
-    std::unordered_set<TUniqueId> _running_jobs;
-    LruCache<TUniqueId, EtlJobCtx> _success_jobs;
-    LruCache<TUniqueId, EtlJobCtx> _failed_jobs;
-};
-
-} // namespace doris
diff --git a/be/src/runtime/exec_env.h b/be/src/runtime/exec_env.h
index 6fda2e6a1f..e0e09ae047 100644
--- a/be/src/runtime/exec_env.h
+++ b/be/src/runtime/exec_env.h
@@ -128,7 +128,6 @@ public:
     FragmentMgr* fragment_mgr() { return _fragment_mgr; }
     ResultCache* result_cache() { return _result_cache; }
     TMasterInfo* master_info() { return _master_info; }
-    EtlJobMgr* etl_job_mgr() { return _etl_job_mgr; }
     LoadPathMgr* load_path_mgr() { return _load_path_mgr; }
     DiskIoMgr* disk_io_mgr() { return _disk_io_mgr; }
     TmpFileMgr* tmp_file_mgr() { return _tmp_file_mgr; }
@@ -207,7 +206,6 @@ private:
     FragmentMgr* _fragment_mgr = nullptr;
     ResultCache* _result_cache = nullptr;
     TMasterInfo* _master_info = nullptr;
-    EtlJobMgr* _etl_job_mgr = nullptr;
     LoadPathMgr* _load_path_mgr = nullptr;
     DiskIoMgr* _disk_io_mgr = nullptr;
     TmpFileMgr* _tmp_file_mgr = nullptr;
diff --git a/be/src/runtime/exec_env_init.cpp b/be/src/runtime/exec_env_init.cpp
index 264565e4d6..2347d2bea8 100644
--- a/be/src/runtime/exec_env_init.cpp
+++ b/be/src/runtime/exec_env_init.cpp
@@ -31,7 +31,6 @@
 #include "runtime/client_cache.h"
 #include "runtime/data_stream_mgr.h"
 #include "runtime/disk_io_mgr.h"
-#include "runtime/etl_job_mgr.h"
 #include "runtime/exec_env.h"
 #include "runtime/external_scan_context_mgr.h"
 #include "runtime/fold_constant_executor.h"
@@ -128,7 +127,6 @@ Status ExecEnv::_init(const std::vector<StorePath>& store_paths) {
     _result_cache = new ResultCache(config::query_cache_max_size_mb,
                                     config::query_cache_elasticity_size_mb);
     _master_info = new TMasterInfo();
-    _etl_job_mgr = new EtlJobMgr(this);
     _load_path_mgr = new LoadPathMgr(this);
     _disk_io_mgr = new DiskIoMgr();
     _tmp_file_mgr = new TmpFileMgr(this);
@@ -147,7 +145,6 @@ Status ExecEnv::_init(const std::vector<StorePath>& store_paths) {
     _broker_client_cache->init_metrics("broker");
     _result_mgr->init();
     _cgroups_mgr->init_cgroups();
-    _etl_job_mgr->init();
     Status status = _load_path_mgr->init();
     if (!status.ok()) {
         LOG(ERROR) << "load path mgr init failed." << status.get_error_msg();
@@ -335,7 +332,6 @@ void ExecEnv::_destroy() {
     SAFE_DELETE(_tmp_file_mgr);
     SAFE_DELETE(_disk_io_mgr);
     SAFE_DELETE(_load_path_mgr);
-    SAFE_DELETE(_etl_job_mgr);
     SAFE_DELETE(_master_info);
     SAFE_DELETE(_fragment_mgr);
     SAFE_DELETE(_cgroups_mgr);
diff --git a/be/src/runtime/stream_load/stream_load_executor.cpp b/be/src/runtime/stream_load/stream_load_executor.cpp
index 4f080318b8..37491fc98e 100644
--- a/be/src/runtime/stream_load/stream_load_executor.cpp
+++ b/be/src/runtime/stream_load/stream_load_executor.cpp
@@ -335,17 +335,7 @@ bool StreamLoadExecutor::collect_load_stat(StreamLoadContext* ctx, TTxnCommitAtt
     }
     switch (ctx->load_type) {
     case TLoadType::MINI_LOAD: {
-        attach->loadType = TLoadType::MINI_LOAD;
-
-        TMiniLoadTxnCommitAttachment ml_attach;
-        ml_attach.loadedRows = ctx->number_loaded_rows;
-        ml_attach.filteredRows = ctx->number_filtered_rows;
-        if (!ctx->error_url.empty()) {
-            ml_attach.__set_errorLogUrl(ctx->error_url);
-        }
-
-        attach->mlTxnCommitAttachment = std::move(ml_attach);
-        attach->__isset.mlTxnCommitAttachment = true;
+        LOG(FATAL) << "mini load is not supported any more";
         break;
     }
     case TLoadType::ROUTINE_LOAD: {
diff --git a/be/src/service/backend_service.h b/be/src/service/backend_service.h
index bb47fe9e4e..c3f1261a63 100644
--- a/be/src/service/backend_service.h
+++ b/be/src/service/backend_service.h
@@ -36,10 +36,6 @@ class ThriftServer;
 class TAgentResult;
 class TAgentTaskRequest;
 class TAgentPublishRequest;
-class TMiniLoadEtlTaskRequest;
-class TMiniLoadEtlStatusResult;
-class TMiniLoadEtlStatusRequest;
-class TDeleteEtlFilesRequest;
 class TPlanExecRequest;
 class TPlanExecParams;
 class TExecPlanFragmentParams;
@@ -97,23 +93,6 @@ public:
         _agent_server->publish_cluster_state(result, request);
     }
 
-    virtual void submit_etl_task(TAgentResult& result,
-                                 const TMiniLoadEtlTaskRequest& request) override {
-        VLOG_RPC << "submit_etl_task. request is "
-                 << apache::thrift::ThriftDebugString(request).c_str();
-        _agent_server->submit_etl_task(result, request);
-    }
-
-    virtual void get_etl_status(TMiniLoadEtlStatusResult& result,
-                                const TMiniLoadEtlStatusRequest& request) override {
-        _agent_server->get_etl_status(result, request);
-    }
-
-    virtual void delete_etl_files(TAgentResult& result,
-                                  const TDeleteEtlFilesRequest& request) override {
-        _agent_server->delete_etl_files(result, request);
-    }
-
     // DorisServer service
     virtual void exec_plan_fragment(TExecPlanFragmentResult& return_val,
                                     const TExecPlanFragmentParams& params) override;
diff --git a/be/src/service/http_service.cpp b/be/src/service/http_service.cpp
index 3ce77f6bb6..80dd2612b6 100644
--- a/be/src/service/http_service.cpp
+++ b/be/src/service/http_service.cpp
@@ -25,7 +25,6 @@
 #include "http/action/health_action.h"
 #include "http/action/meta_action.h"
 #include "http/action/metrics_action.h"
-#include "http/action/mini_load.h"
 #include "http/action/pprof_actions.h"
 #include "http/action/reload_tablet_action.h"
 #include "http/action/reset_rpc_channel_action.h"
@@ -57,9 +56,9 @@ Status HttpService::start() {
     add_default_path_handlers(_web_page_handler.get(), MemTracker::get_process_tracker());
 
     // register load
-    MiniLoadAction* miniload_action = _pool.add(new MiniLoadAction(_env));
-    _ev_http_server->register_handler(HttpMethod::PUT, "/api/{db}/{table}/_load", miniload_action);
     StreamLoadAction* streamload_action = _pool.add(new StreamLoadAction(_env));
+    _ev_http_server->register_handler(HttpMethod::PUT, "/api/{db}/{table}/_load",
+                                      streamload_action);
     _ev_http_server->register_handler(HttpMethod::PUT, "/api/{db}/{table}/_stream_load",
                                       streamload_action);
     StreamLoad2PCAction* streamload_2pc_action = _pool.add(new StreamLoad2PCAction(_env));
diff --git a/be/test/CMakeLists.txt b/be/test/CMakeLists.txt
index 879b6f7b24..49c14c49f6 100644
--- a/be/test/CMakeLists.txt
+++ b/be/test/CMakeLists.txt
@@ -215,7 +215,6 @@ set(RUNTIME_TEST_FILES
     # runtime/dpp_sink_internal_test.cpp
     # runtime/dpp_sink_test.cpp
     # runtime/data_spliter_test.cpp
-    # runtime/etl_job_mgr_test.cpp
     # runtime/tmp_file_mgr_test.cpp
     # runtime/disk_io_mgr_test.cpp
     # runtime/thread_resource_mgr_test.cpp
diff --git a/be/test/runtime/data_stream_test.cpp b/be/test/runtime/data_stream_test.cpp
index 39d7cba249..b0a8de6529 100644
--- a/be/test/runtime/data_stream_test.cpp
+++ b/be/test/runtime/data_stream_test.cpp
@@ -94,15 +94,6 @@ public:
     virtual void publish_cluster_state(TAgentResult& return_val,
                                        const TAgentPublishRequest& request) {}
 
-    virtual void submit_etl_task(TAgentResult& return_val, const TMiniLoadEtlTaskRequest& request) {
-    }
-
-    virtual void get_etl_status(TMiniLoadEtlStatusResult& return_val,
-                                const TMiniLoadEtlStatusRequest& request) {}
-
-    virtual void delete_etl_files(TAgentResult& return_val, const TDeleteEtlFilesRequest& request) {
-    }
-
     virtual void register_pull_load_task(TStatus& _return, const TUniqueId& id,
                                          const int32_t num_senders) {}
 
diff --git a/be/test/runtime/etl_job_mgr_test.cpp b/be/test/runtime/etl_job_mgr_test.cpp
deleted file mode 100644
index edb8d140c3..0000000000
--- a/be/test/runtime/etl_job_mgr_test.cpp
+++ /dev/null
@@ -1,221 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#include "runtime/etl_job_mgr.h"
-
-#include <gtest/gtest.h>
-
-#include "gen_cpp/Types_types.h"
-#include "runtime/exec_env.h"
-#include "runtime/fragment_mgr.h"
-#include "util/cpu_info.h"
-
-namespace doris {
-// Mock fragment mgr
-Status FragmentMgr::exec_plan_fragment(const TExecPlanFragmentParams& params, FinishCallback cb) {
-    return Status::OK();
-}
-
-FragmentMgr::FragmentMgr(ExecEnv* exec_env) : _thread_pool(10, 128) {}
-
-FragmentMgr::~FragmentMgr() {}
-
-void FragmentMgr::debug(std::stringstream& ss) {}
-
-class EtlJobMgrTest : public testing::Test {
-public:
-    EtlJobMgrTest() {}
-
-private:
-    ExecEnv _exec_env;
-};
-
-TEST_F(EtlJobMgrTest, NormalCase) {
-    EtlJobMgr mgr(&_exec_env);
-    TUniqueId id;
-    id.lo = 1;
-    id.hi = 1;
-
-    TMiniLoadEtlStatusResult res;
-    TMiniLoadEtlTaskRequest req;
-    TDeleteEtlFilesRequest del_req;
-    del_req.mini_load_id = id;
-    req.params.params.fragment_instance_id = id;
-
-    // make it running
-    EXPECT_TRUE(mgr.start_job(req).ok());
-    EXPECT_TRUE(mgr.get_job_state(id, &res).ok());
-    EXPECT_EQ(TEtlState::RUNNING, res.etl_state);
-    EXPECT_EQ(TStatusCode::OK, res.status.status_code);
-
-    // make it finishing
-    EtlJobResult job_result;
-    job_result.file_map["abc"] = 100L;
-    EXPECT_TRUE(mgr.finish_job(id, Status::OK(), job_result).ok());
-    EXPECT_TRUE(mgr.get_job_state(id, &res).ok());
-    EXPECT_EQ(TEtlState::FINISHED, res.etl_state);
-    EXPECT_EQ(TStatusCode::OK, res.status.status_code);
-    EXPECT_EQ(1, res.file_map.size());
-    EXPECT_EQ(100, res.file_map["abc"]);
-
-    // erase it
-    EXPECT_TRUE(mgr.erase_job(del_req).ok());
-    EXPECT_TRUE(mgr.get_job_state(id, &res).ok());
-    EXPECT_EQ(TEtlState::CANCELLED, res.etl_state);
-    EXPECT_EQ(TStatusCode::OK, res.status.status_code);
-}
-
-TEST_F(EtlJobMgrTest, DuplicateCase) {
-    EtlJobMgr mgr(&_exec_env);
-    TUniqueId id;
-    id.lo = 1;
-    id.hi = 1;
-
-    TMiniLoadEtlStatusResult res;
-    TMiniLoadEtlTaskRequest req;
-    req.params.params.fragment_instance_id = id;
-
-    // make it running
-    EXPECT_TRUE(mgr.start_job(req).ok());
-    EXPECT_TRUE(mgr.get_job_state(id, &res).ok());
-    EXPECT_EQ(TEtlState::RUNNING, res.etl_state);
-    EXPECT_EQ(TStatusCode::OK, res.status.status_code);
-
-    // Put it twice
-    EXPECT_TRUE(mgr.start_job(req).ok());
-    EXPECT_TRUE(mgr.get_job_state(id, &res).ok());
-    EXPECT_EQ(TEtlState::RUNNING, res.etl_state);
-    EXPECT_EQ(TStatusCode::OK, res.status.status_code);
-}
-
-TEST_F(EtlJobMgrTest, RunAfterSuccess) {
-    EtlJobMgr mgr(&_exec_env);
-    TUniqueId id;
-    id.lo = 1;
-    id.hi = 1;
-
-    TMiniLoadEtlStatusResult res;
-    TMiniLoadEtlTaskRequest req;
-    TDeleteEtlFilesRequest del_req;
-    del_req.mini_load_id = id;
-    req.params.params.fragment_instance_id = id;
-
-    // make it running
-    EXPECT_TRUE(mgr.start_job(req).ok());
-    EXPECT_TRUE(mgr.get_job_state(id, &res).ok());
-    EXPECT_EQ(TEtlState::RUNNING, res.etl_state);
-    EXPECT_EQ(TStatusCode::OK, res.status.status_code);
-
-    // make it finishing
-    EtlJobResult job_result;
-    job_result.file_map["abc"] = 100L;
-    EXPECT_TRUE(mgr.finish_job(id, Status::OK(), job_result).ok());
-    EXPECT_TRUE(mgr.get_job_state(id, &res).ok());
-    EXPECT_EQ(TEtlState::FINISHED, res.etl_state);
-    EXPECT_EQ(TStatusCode::OK, res.status.status_code);
-    EXPECT_EQ(1, res.file_map.size());
-    EXPECT_EQ(100, res.file_map["abc"]);
-
-    // Put it twice
-    EXPECT_TRUE(mgr.start_job(req).ok());
-    EXPECT_TRUE(mgr.get_job_state(id, &res).ok());
-    EXPECT_EQ(TEtlState::FINISHED, res.etl_state);
-    EXPECT_EQ(TStatusCode::OK, res.status.status_code);
-    EXPECT_EQ(1, res.file_map.size());
-    EXPECT_EQ(100, res.file_map["abc"]);
-}
-
-TEST_F(EtlJobMgrTest, RunAfterFail) {
-    EtlJobMgr mgr(&_exec_env);
-    TUniqueId id;
-    id.lo = 1;
-    id.hi = 1;
-
-    TMiniLoadEtlStatusResult res;
-    TMiniLoadEtlTaskRequest req;
-    req.params.params.fragment_instance_id = id;
-
-    // make it running
-    EXPECT_TRUE(mgr.start_job(req).ok());
-    EXPECT_TRUE(mgr.get_job_state(id, &res).ok());
-    EXPECT_EQ(TEtlState::RUNNING, res.etl_state);
-    EXPECT_EQ(TStatusCode::OK, res.status.status_code);
-
-    // make it finishing
-    EtlJobResult job_result;
-    job_result.debug_path = "abc";
-    EXPECT_TRUE(mgr.finish_job(id, Status::ThriftRpcError("Thrift rpc error"), job_result).ok());
-    EXPECT_TRUE(mgr.get_job_state(id, &res).ok());
-    EXPECT_EQ(TEtlState::CANCELLED, res.etl_state);
-    EXPECT_EQ(TStatusCode::OK, res.status.status_code);
-
-    // Put it twice
-    EXPECT_TRUE(mgr.start_job(req).ok());
-    EXPECT_TRUE(mgr.get_job_state(id, &res).ok());
-    EXPECT_EQ(TEtlState::RUNNING, res.etl_state);
-    EXPECT_EQ(TStatusCode::OK, res.status.status_code);
-}
-
-TEST_F(EtlJobMgrTest, CancelJob) {
-    EtlJobMgr mgr(&_exec_env);
-    TUniqueId id;
-    id.lo = 1;
-    id.hi = 1;
-
-    TMiniLoadEtlStatusResult res;
-    TMiniLoadEtlTaskRequest req;
-    req.params.params.fragment_instance_id = id;
-
-    // make it running
-    EXPECT_TRUE(mgr.start_job(req).ok());
-    EXPECT_TRUE(mgr.get_job_state(id, &res).ok());
-    EXPECT_EQ(TEtlState::RUNNING, res.etl_state);
-    EXPECT_EQ(TStatusCode::OK, res.status.status_code);
-
-    // make it finishing
-    EtlJobResult job_result;
-    job_result.debug_path = "abc";
-    EXPECT_TRUE(mgr.cancel_job(id).ok());
-    EXPECT_TRUE(mgr.get_job_state(id, &res).ok());
-    EXPECT_EQ(TEtlState::CANCELLED, res.etl_state);
-    EXPECT_EQ(TStatusCode::OK, res.status.status_code);
-
-    // Put it twice
-    EXPECT_TRUE(mgr.start_job(req).ok());
-    EXPECT_TRUE(mgr.get_job_state(id, &res).ok());
-    EXPECT_EQ(TEtlState::RUNNING, res.etl_state);
-    EXPECT_EQ(TStatusCode::OK, res.status.status_code);
-}
-
-TEST_F(EtlJobMgrTest, FinishUnknownJob) {
-    EtlJobMgr mgr(&_exec_env);
-    TUniqueId id;
-    id.lo = 1;
-    id.hi = 1;
-
-    TMiniLoadEtlStatusResult res;
-
-    // make it finishing
-    EtlJobResult job_result;
-    job_result.debug_path = "abc";
-    EXPECT_FALSE(mgr.finish_job(id, Status::ThriftRpcError("Thrift rpc error"), job_result).ok());
-    EXPECT_TRUE(mgr.get_job_state(id, &res).ok());
-    EXPECT_EQ(TEtlState::CANCELLED, res.etl_state);
-    EXPECT_EQ(TStatusCode::OK, res.status.status_code);
-}
-
-} // namespace doris
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/Load.java b/fe/fe-core/src/main/java/org/apache/doris/load/Load.java
index f6b85a0ebe..7278dd53aa 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/Load.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/Load.java
@@ -29,11 +29,9 @@ import org.apache.doris.analysis.FunctionName;
 import org.apache.doris.analysis.FunctionParams;
 import org.apache.doris.analysis.ImportColumnDesc;
 import org.apache.doris.analysis.IsNullPredicate;
-import org.apache.doris.analysis.LabelName;
 import org.apache.doris.analysis.LoadStmt;
 import org.apache.doris.analysis.NullLiteral;
 import org.apache.doris.analysis.PartitionNames;
-import org.apache.doris.analysis.Separator;
 import org.apache.doris.analysis.SlotDescriptor;
 import org.apache.doris.analysis.SlotRef;
 import org.apache.doris.analysis.StorageBackend;
@@ -45,7 +43,6 @@ import org.apache.doris.catalog.AggregateType;
 import org.apache.doris.catalog.Catalog;
 import org.apache.doris.catalog.Column;
 import org.apache.doris.catalog.Database;
-import org.apache.doris.catalog.FunctionSet;
 import org.apache.doris.catalog.KeysType;
 import org.apache.doris.catalog.MaterializedIndex;
 import org.apache.doris.catalog.MaterializedIndex.IndexExtState;
@@ -88,14 +85,12 @@ import org.apache.doris.mysql.privilege.PrivPredicate;
 import org.apache.doris.persist.ReplicaPersistInfo;
 import org.apache.doris.qe.ConnectContext;
 import org.apache.doris.system.Backend;
-import org.apache.doris.task.AgentClient;
 import org.apache.doris.task.AgentTaskQueue;
 import org.apache.doris.task.LoadTaskInfo;
 import org.apache.doris.task.PushTask;
 import org.apache.doris.thrift.TBrokerScanRangeParams;
 import org.apache.doris.thrift.TEtlState;
 import org.apache.doris.thrift.TFileFormatType;
-import org.apache.doris.thrift.TMiniLoadRequest;
 import org.apache.doris.thrift.TNetworkAddress;
 import org.apache.doris.thrift.TPriority;
 import org.apache.doris.transaction.TransactionNotFoundException;
@@ -230,140 +225,6 @@ public class Load {
         lock.writeLock().unlock();
     }
 
-    // return true if we truly add the load job
-    // return false otherwise (eg: a retry request)
-    @Deprecated
-    public boolean addMiniLoadJob(TMiniLoadRequest request) throws DdlException {
-        // get params
-        String fullDbName = request.getDb();
-        String tableName = request.getTbl();
-        String label = request.getLabel();
-        long timestamp = 0;
-        if (request.isSetTimestamp()) {
-            timestamp = request.getTimestamp();
-        }
-        TNetworkAddress beAddr = request.getBackend();
-        String filePathsValue = request.getFiles().get(0);
-        Map<String, String> params = request.getProperties();
-
-        // create load stmt
-        // label name
-        LabelName labelName = new LabelName(fullDbName, label);
-
-        // data descriptions
-        // file paths
-        if (Strings.isNullOrEmpty(filePathsValue)) {
-            throw new DdlException("File paths are not specified");
-        }
-        List<String> filePaths = Arrays.asList(filePathsValue.split(","));
-
-        // partitions | column names | separator | line delimiter
-        List<String> partitionNames = null;
-        List<String> columnNames = null;
-        Separator columnSeparator = null;
-        List<String> hllColumnPairList = null;
-        Separator lineDelimiter = null;
-        String formatType = null;
-        if (params != null) {
-            String specifiedPartitions = params.get(LoadStmt.KEY_IN_PARAM_PARTITIONS);
-            if (!Strings.isNullOrEmpty(specifiedPartitions)) {
-                partitionNames = Arrays.asList(specifiedPartitions.split(","));
-            }
-            String specifiedColumns = params.get(LoadStmt.KEY_IN_PARAM_COLUMNS);
-            if (!Strings.isNullOrEmpty(specifiedColumns)) {
-                columnNames = Arrays.asList(specifiedColumns.split(","));
-            }
-
-            final String hll = params.get(LoadStmt.KEY_IN_PARAM_HLL);
-            if (!Strings.isNullOrEmpty(hll)) {
-                hllColumnPairList = Arrays.asList(hll.split(":"));
-            }
-
-            String columnSeparatorStr = params.get(LoadStmt.KEY_IN_PARAM_COLUMN_SEPARATOR);
-            if (columnSeparatorStr != null) {
-                if (columnSeparatorStr.isEmpty()) {
-                    columnSeparatorStr = "\t";
-                }
-                columnSeparator = new Separator(columnSeparatorStr);
-                try {
-                    columnSeparator.analyze();
-                } catch (AnalysisException e) {
-                    throw new DdlException(e.getMessage());
-                }
-            }
-            String lineDelimiterStr = params.get(LoadStmt.KEY_IN_PARAM_LINE_DELIMITER);
-            if (lineDelimiterStr != null) {
-                if (lineDelimiterStr.isEmpty()) {
-                    lineDelimiterStr = "\n";
-                }
-                lineDelimiter = new Separator(lineDelimiterStr);
-                try {
-                    lineDelimiter.analyze();
-                } catch (AnalysisException e) {
-                    throw new DdlException(e.getMessage());
-                }
-            }
-            formatType = params.get(LoadStmt.KEY_IN_PARAM_FORMAT_TYPE);
-        }
-
-        DataDescription dataDescription = new DataDescription(
-                tableName,
-                partitionNames != null ? new PartitionNames(false, partitionNames) : null,
-                filePaths,
-                columnNames,
-                columnSeparator,
-                formatType,
-                false,
-                null
-        );
-        dataDescription.setLineDelimiter(lineDelimiter);
-        dataDescription.setBeAddr(beAddr);
-        // parse hll param pair
-        if (hllColumnPairList != null) {
-            for (int i = 0; i < hllColumnPairList.size(); i++) {
-                final String pairStr = hllColumnPairList.get(i);
-                final List<String> pairList = Arrays.asList(pairStr.split(","));
-                if (pairList.size() != 2) {
-                    throw new DdlException("hll param format error");
-                }
-
-                final String resultColumn = pairList.get(0);
-                final String hashColumn = pairList.get(1);
-                final Pair<String, List<String>> pair = new Pair<String, List<String>>(FunctionSet.HLL_HASH,
-                        Arrays.asList(hashColumn));
-                dataDescription.addColumnMapping(resultColumn, pair);
-            }
-        }
-
-        List<DataDescription> dataDescriptions = Lists.newArrayList(dataDescription);
-
-        // job properties
-        Map<String, String> properties = Maps.newHashMap();
-        if (params != null) {
-            String maxFilterRatio = params.get(LoadStmt.MAX_FILTER_RATIO_PROPERTY);
-            if (!Strings.isNullOrEmpty(maxFilterRatio)) {
-                properties.put(LoadStmt.MAX_FILTER_RATIO_PROPERTY, maxFilterRatio);
-            }
-            String timeout = params.get(LoadStmt.TIMEOUT_PROPERTY);
-            if (!Strings.isNullOrEmpty(timeout)) {
-                properties.put(LoadStmt.TIMEOUT_PROPERTY, timeout);
-            }
-        }
-        LoadStmt stmt = new LoadStmt(labelName, dataDescriptions, null, null, properties);
-
-        // try to register mini label
-        if (!registerMiniLabel(fullDbName, label, timestamp)) {
-            return false;
-        }
-
-        try {
-            addLoadJob(stmt, EtlJobType.MINI, timestamp);
-            return true;
-        } finally {
-            deregisterMiniLabel(fullDbName, label);
-        }
-    }
-
     public void addLoadJob(LoadStmt stmt, EtlJobType etlJobType, long timestamp) throws DdlException {
         // get db
         String dbName = stmt.getLabel().getDbName();
@@ -2641,24 +2502,6 @@ public class Load {
                 }
                 break;
             case MINI:
-                for (MiniEtlTaskInfo taskInfo : job.getMiniEtlTasks().values()) {
-                    long backendId = taskInfo.getBackendId();
-                    Backend backend = Catalog.getCurrentSystemInfo().getBackend(backendId);
-                    if (backend == null) {
-                        LOG.warn("backend does not exist. id: {}", backendId);
-                        break;
-                    }
-
-                    long dbId = job.getDbId();
-                    Database db = Catalog.getCurrentInternalCatalog().getDbNullable(dbId);
-                    if (db == null) {
-                        LOG.warn("db does not exist. id: {}", dbId);
-                        break;
-                    }
-
-                    AgentClient client = new AgentClient(backend.getHost(), backend.getBePort());
-                    client.deleteEtlFiles(dbId, job.getId(), db.getFullName(), job.getLabel());
-                }
                 break;
             case INSERT:
                 break;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadManager.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadManager.java
index 17b7af2508..a1b2edefdb 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadManager.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadManager.java
@@ -22,14 +22,12 @@ import org.apache.doris.analysis.CompoundPredicate.Operator;
 import org.apache.doris.analysis.LoadStmt;
 import org.apache.doris.catalog.Catalog;
 import org.apache.doris.catalog.Database;
-import org.apache.doris.catalog.Table;
 import org.apache.doris.cluster.ClusterNamespace;
 import org.apache.doris.common.AnalysisException;
 import org.apache.doris.common.CaseSensibility;
 import org.apache.doris.common.Config;
 import org.apache.doris.common.DataQualityException;
 import org.apache.doris.common.DdlException;
-import org.apache.doris.common.DuplicatedRequestException;
 import org.apache.doris.common.LabelAlreadyUsedException;
 import org.apache.doris.common.MetaNotFoundException;
 import org.apache.doris.common.PatternMatcher;
@@ -41,9 +39,6 @@ import org.apache.doris.load.EtlJobType;
 import org.apache.doris.load.FailMsg;
 import org.apache.doris.load.FailMsg.CancelType;
 import org.apache.doris.load.Load;
-import org.apache.doris.system.SystemInfoService;
-import org.apache.doris.thrift.TMiniLoadBeginRequest;
-import org.apache.doris.thrift.TMiniLoadRequest;
 import org.apache.doris.thrift.TUniqueId;
 import org.apache.doris.transaction.TransactionState;
 
@@ -137,53 +132,6 @@ public class LoadManager implements Writable {
                 .filter(j -> (j.getState() != JobState.FINISHED && j.getState() != JobState.CANCELLED)).count();
     }
 
-    /**
-     * This method will be invoked by streaming mini load.
-     * It will begin the txn of mini load immediately without any scheduler .
-     *
-     */
-    public long createLoadJobFromMiniLoad(TMiniLoadBeginRequest request) throws UserException {
-        String cluster = SystemInfoService.DEFAULT_CLUSTER;
-        if (request.isSetCluster()) {
-            cluster = request.getCluster();
-        }
-        Database database = checkDb(ClusterNamespace.getFullName(cluster, request.getDb()));
-        Table table = database.getTableOrDdlException(request.tbl);
-        MiniLoadJob loadJob = null;
-        writeLock();
-        try {
-            loadJob = new MiniLoadJob(database.getId(), table.getId(), request);
-            // call unprotectedExecute before adding load job. so that if job is not started ok, no need to add.
-            // NOTICE(cmy): this order is only for Mini Load, because mini load's
-            // unprotectedExecute() only do beginTxn().
-            // for other kind of load job, execute the job after adding job.
-            // Mini load job must be executed before release write lock.
-            // Otherwise, the duplicated request maybe get the transaction id before transaction of mini load is begun.
-            loadJob.beginTxn();
-            loadJob.unprotectedExecute();
-            createLoadJob(loadJob);
-        } catch (DuplicatedRequestException e) {
-            // this is a duplicate request, just return previous txn id
-            LOG.info("duplicate request for mini load. request id: {}, txn: {}", e.getDuplicatedRequestId(),
-                    e.getTxnId());
-            return e.getTxnId();
-        } catch (UserException e) {
-            if (loadJob != null) {
-                loadJob.cancelJobWithoutCheck(new FailMsg(CancelType.LOAD_RUN_FAIL, e.getMessage()), false,
-                        false /* no need to write edit log, because createLoadJob log is not wrote yet */);
-            }
-            throw e;
-        } finally {
-            writeUnlock();
-        }
-
-        // The persistence of mini load must be the final step of create mini load.
-        // After mini load was executed, the txn id has been set and state has been changed to loading.
-        // Those two need to be record in persistence.
-        Catalog.getCurrentCatalog().getEditLog().logCreateLoadJob(loadJob);
-        return loadJob.getTransactionId();
-    }
-
     /**
      * This method will be invoked by version1 of broker or hadoop load.
      * It is used to check the label of v1 and v2 at the same time.
@@ -209,31 +157,6 @@ public class LoadManager implements Writable {
         }
     }
 
-    /**
-     * This method will be invoked by non-streaming of mini load.
-     * It is used to check the label of v1 and v2 at the same time.
-     * Finally, the non-streaming mini load will belongs to load class.
-     *
-     * @param request request
-     * @return if: mini load is a duplicated load, return false. else: return true.
-     * @deprecated not support mini load
-     */
-    @Deprecated
-    public boolean createLoadJobV1FromRequest(TMiniLoadRequest request) throws DdlException {
-        String cluster = SystemInfoService.DEFAULT_CLUSTER;
-        if (request.isSetCluster()) {
-            cluster = request.getCluster();
-        }
-        Database database = checkDb(ClusterNamespace.getFullName(cluster, request.getDb()));
-        writeLock();
-        try {
-            checkLabelUsed(database.getId(), request.getLabel());
-            return Catalog.getCurrentCatalog().getLoadInstance().addMiniLoadJob(request);
-        } finally {
-            writeUnlock();
-        }
-    }
-
     /**
      * MultiLoadMgr use.
      **/
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/MiniLoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/MiniLoadJob.java
index 4c59ffdaea..2da78ae1c5 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/MiniLoadJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/MiniLoadJob.java
@@ -20,21 +20,11 @@ package org.apache.doris.load.loadv2;
 import org.apache.doris.catalog.AuthorizationInfo;
 import org.apache.doris.catalog.Catalog;
 import org.apache.doris.catalog.Database;
-import org.apache.doris.common.AnalysisException;
-import org.apache.doris.common.DuplicatedRequestException;
-import org.apache.doris.common.LabelAlreadyUsedException;
 import org.apache.doris.common.MetaNotFoundException;
-import org.apache.doris.common.QuotaExceedException;
 import org.apache.doris.common.io.Text;
 import org.apache.doris.load.EtlJobType;
-import org.apache.doris.service.FrontendOptions;
-import org.apache.doris.thrift.TMiniLoadBeginRequest;
-import org.apache.doris.transaction.BeginTransactionException;
 import org.apache.doris.transaction.TransactionState;
-import org.apache.doris.transaction.TransactionState.TxnCoordinator;
-import org.apache.doris.transaction.TransactionState.TxnSourceType;
 
-import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
@@ -55,22 +45,6 @@ public class MiniLoadJob extends LoadJob {
         super(EtlJobType.MINI);
     }
 
-    public MiniLoadJob(long dbId, long tableId, TMiniLoadBeginRequest request) throws MetaNotFoundException {
-        super(EtlJobType.MINI, dbId, request.getLabel());
-        this.tableId = tableId;
-        this.tableName = request.getTbl();
-        if (request.isSetTimeoutSecond()) {
-            setTimeout(request.getTimeoutSecond());
-        }
-        if (request.isSetMaxFilterRatio()) {
-            setMaxFilterRatio(request.getMaxFilterRatio());
-        }
-        this.createTimestamp = request.getCreateTimestamp();
-        this.loadStartTimestamp = createTimestamp;
-        this.authorizationInfo = gatherAuthInfo();
-        this.requestId = request.getRequestId();
-    }
-
     @Override
     public Set<String> getTableNamesForShow() {
         return Sets.newHashSet(tableName);
@@ -87,15 +61,7 @@ public class MiniLoadJob extends LoadJob {
     }
 
     @Override
-    public void beginTxn()
-            throws LabelAlreadyUsedException, BeginTransactionException, AnalysisException, DuplicatedRequestException,
-            QuotaExceedException, MetaNotFoundException {
-        transactionId = Catalog.getCurrentGlobalTransactionMgr()
-                .beginTransaction(dbId, Lists.newArrayList(tableId), label, requestId,
-                                  new TxnCoordinator(TxnSourceType.FE, FrontendOptions.getLocalHostAddress()),
-                                  TransactionState.LoadJobSourceType.BACKEND_STREAMING, id,
-                                  getTimeout());
-    }
+    public void beginTxn() {}
 
     @Override
     protected void replayTxnAttachment(TransactionState txnState) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/MiniLoadTxnCommitAttachment.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/MiniLoadTxnCommitAttachment.java
index de66082e1c..50094c91e8 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/MiniLoadTxnCommitAttachment.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/MiniLoadTxnCommitAttachment.java
@@ -18,7 +18,6 @@
 package org.apache.doris.load.loadv2;
 
 import org.apache.doris.common.io.Text;
-import org.apache.doris.thrift.TMiniLoadTxnCommitAttachment;
 import org.apache.doris.transaction.TransactionState;
 import org.apache.doris.transaction.TxnCommitAttachment;
 
@@ -36,15 +35,6 @@ public class MiniLoadTxnCommitAttachment extends TxnCommitAttachment {
         super(TransactionState.LoadJobSourceType.BACKEND_STREAMING);
     }
 
-    public MiniLoadTxnCommitAttachment(TMiniLoadTxnCommitAttachment tMiniLoadTxnCommitAttachment) {
-        super(TransactionState.LoadJobSourceType.BACKEND_STREAMING);
-        this.loadedRows = tMiniLoadTxnCommitAttachment.getLoadedRows();
-        this.filteredRows = tMiniLoadTxnCommitAttachment.getFilteredRows();
-        if (tMiniLoadTxnCommitAttachment.isSetErrorLogUrl()) {
-            this.errorLogUrl = tMiniLoadTxnCommitAttachment.getErrorLogUrl();
-        }
-    }
-
     public long getLoadedRows() {
         return loadedRows;
     }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/MultiLoadMgr.java b/fe/fe-core/src/main/java/org/apache/doris/qe/MultiLoadMgr.java
index da945704eb..d837c3e1f4 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/MultiLoadMgr.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/MultiLoadMgr.java
@@ -43,15 +43,12 @@ import org.apache.doris.load.loadv2.LoadTask;
 import org.apache.doris.system.Backend;
 import org.apache.doris.system.BeSelectionPolicy;
 import org.apache.doris.system.SystemInfoService;
-import org.apache.doris.thrift.TMiniLoadRequest;
 import org.apache.doris.thrift.TNetworkAddress;
 
 import com.google.common.base.Strings;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
-import com.google.common.collect.Streams;
-import org.apache.commons.collections.CollectionUtils;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 import org.awaitility.Awaitility;
@@ -64,7 +61,6 @@ import java.util.Set;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
-import java.util.stream.Collectors;
 
 // Class used to record state of multi-load operation
 public class MultiLoadMgr {
@@ -107,18 +103,6 @@ public class MultiLoadMgr {
         Catalog.getCurrentCatalog().getLoadManager().createLoadJobV1FromMultiStart(fullDbName, label);
     }
 
-    public void load(TMiniLoadRequest request) throws DdlException {
-        if (CollectionUtils.isNotEmpty(request.getFileSize())
-                && request.getFileSize().size() != request.getFiles().size()) {
-            throw new DdlException("files count and file size count not match: [" + request.getFileSize().size()
-                    + "!=" + request.getFiles().size() + "]");
-        }
-        List<Pair<String, Long>> files = Streams.zip(request.getFiles().stream(),
-                        request.getFileSize().stream(), Pair::create).collect(Collectors.toList());
-        load(request.getDb(), request.getLabel(), request.getSubLabel(), request.getTbl(),
-                files, request.getBackend(), request.getProperties(), request.getTimestamp());
-    }
-
     // Add one load job
     private void load(String fullDbName, String label,
                      String subLabel, String table,
diff --git a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
index fc2bc0785c..31e44cd711 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
@@ -43,16 +43,10 @@ import org.apache.doris.common.UserException;
 import org.apache.doris.common.Version;
 import org.apache.doris.datasource.DataSourceIf;
 import org.apache.doris.datasource.InternalDataSource;
-import org.apache.doris.load.EtlStatus;
-import org.apache.doris.load.LoadJob;
-import org.apache.doris.load.MiniEtlTaskInfo;
 import org.apache.doris.master.MasterImpl;
 import org.apache.doris.metric.MetricRepo;
 import org.apache.doris.mysql.privilege.PrivPredicate;
 import org.apache.doris.planner.StreamLoadPlanner;
-import org.apache.doris.plugin.AuditEvent;
-import org.apache.doris.plugin.AuditEvent.AuditEventBuilder;
-import org.apache.doris.plugin.AuditEvent.EventType;
 import org.apache.doris.qe.ConnectContext;
 import org.apache.doris.qe.ConnectProcessor;
 import org.apache.doris.qe.QeProcessorImpl;
@@ -77,10 +71,8 @@ import org.apache.doris.thrift.TGetDbsParams;
 import org.apache.doris.thrift.TGetDbsResult;
 import org.apache.doris.thrift.TGetTablesParams;
 import org.apache.doris.thrift.TGetTablesResult;
-import org.apache.doris.thrift.TIsMethodSupportedRequest;
 import org.apache.doris.thrift.TListPrivilegesResult;
 import org.apache.doris.thrift.TListTableStatusResult;
-import org.apache.doris.thrift.TLoadCheckRequest;
 import org.apache.doris.thrift.TLoadTxn2PCRequest;
 import org.apache.doris.thrift.TLoadTxn2PCResult;
 import org.apache.doris.thrift.TLoadTxnBeginRequest;
@@ -92,10 +84,6 @@ import org.apache.doris.thrift.TLoadTxnRollbackResult;
 import org.apache.doris.thrift.TMasterOpRequest;
 import org.apache.doris.thrift.TMasterOpResult;
 import org.apache.doris.thrift.TMasterResult;
-import org.apache.doris.thrift.TMiniLoadBeginRequest;
-import org.apache.doris.thrift.TMiniLoadBeginResult;
-import org.apache.doris.thrift.TMiniLoadEtlStatusResult;
-import org.apache.doris.thrift.TMiniLoadRequest;
 import org.apache.doris.thrift.TNetworkAddress;
 import org.apache.doris.thrift.TPrivilegeStatus;
 import org.apache.doris.thrift.TReportExecStatusParams;
@@ -109,9 +97,7 @@ import org.apache.doris.thrift.TStatusCode;
 import org.apache.doris.thrift.TStreamLoadPutRequest;
 import org.apache.doris.thrift.TStreamLoadPutResult;
 import org.apache.doris.thrift.TTableStatus;
-import org.apache.doris.thrift.TUniqueId;
 import org.apache.doris.thrift.TUpdateExportTaskStatusRequest;
-import org.apache.doris.thrift.TUpdateMiniEtlTaskStatusRequest;
 import org.apache.doris.thrift.TWaitingTxnStatusRequest;
 import org.apache.doris.thrift.TWaitingTxnStatusResult;
 import org.apache.doris.transaction.DatabaseTransactionMgr;
@@ -121,7 +107,6 @@ import org.apache.doris.transaction.TransactionState.TxnCoordinator;
 import org.apache.doris.transaction.TransactionState.TxnSourceType;
 import org.apache.doris.transaction.TxnCommitAttachment;
 
-import com.google.common.base.Joiner;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Strings;
 import com.google.common.collect.Lists;
@@ -130,8 +115,6 @@ import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 import org.apache.thrift.TException;
 
-import java.net.InetAddress;
-import java.net.UnknownHostException;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
@@ -479,197 +462,6 @@ public class FrontendServiceImpl implements FrontendService.Iface {
         return masterImpl.fetchResource();
     }
 
-    @Deprecated
-    @Override
-    public TFeResult miniLoad(TMiniLoadRequest request) throws TException {
-        LOG.debug("receive mini load request: label: {}, db: {}, tbl: {}, backend: {}",
-                request.getLabel(), request.getDb(), request.getTbl(), request.getBackend());
-
-        ConnectContext context = new ConnectContext(null);
-        String cluster = SystemInfoService.DEFAULT_CLUSTER;
-        if (request.isSetCluster()) {
-            cluster = request.cluster;
-        }
-
-        final String fullDbName = ClusterNamespace.getFullName(cluster, request.db);
-        request.setDb(fullDbName);
-        context.setCluster(cluster);
-        context.setDatabase(ClusterNamespace.getFullName(cluster, request.db));
-        context.setQualifiedUser(ClusterNamespace.getFullName(cluster, request.user));
-        context.setCatalog(Catalog.getCurrentCatalog());
-        context.getState().reset();
-        context.setThreadLocalInfo();
-
-        TStatus status = new TStatus(TStatusCode.OK);
-        TFeResult result = new TFeResult(FrontendServiceVersion.V1, status);
-        try {
-            if (request.isSetSubLabel()) {
-                ExecuteEnv.getInstance().getMultiLoadMgr().load(request);
-            } else {
-                // try to add load job, label will be checked here.
-                if (Catalog.getCurrentCatalog().getLoadManager().createLoadJobV1FromRequest(request)) {
-                    try {
-                        // generate mini load audit log
-                        logMiniLoadStmt(request);
-                    } catch (Exception e) {
-                        LOG.warn("failed log mini load stmt", e);
-                    }
-                }
-            }
-        } catch (UserException e) {
-            LOG.warn("add mini load error: {}", e.getMessage());
-            status.setStatusCode(TStatusCode.ANALYSIS_ERROR);
-            status.addToErrorMsgs(e.getMessage());
-        } catch (Throwable e) {
-            LOG.warn("unexpected exception when adding mini load", e);
-            status.setStatusCode(TStatusCode.ANALYSIS_ERROR);
-            status.addToErrorMsgs(Strings.nullToEmpty(e.getMessage()));
-        } finally {
-            ConnectContext.remove();
-        }
-
-        LOG.debug("mini load result: {}", result);
-        return result;
-    }
-
-    private void logMiniLoadStmt(TMiniLoadRequest request) throws UnknownHostException {
-        String stmt = getMiniLoadStmt(request);
-        AuditEvent auditEvent = new AuditEventBuilder().setEventType(EventType.AFTER_QUERY)
-                .setClientIp(request.user_ip + ":0")
-                .setUser(request.user)
-                .setDb(request.db)
-                .setState(TStatusCode.OK.name())
-                .setQueryTime(0)
-                .setStmt(stmt).build();
-
-        Catalog.getCurrentAuditEventProcessor().handleAuditEvent(auditEvent);
-    }
-
-    private String getMiniLoadStmt(TMiniLoadRequest request) throws UnknownHostException {
-        StringBuilder stringBuilder = new StringBuilder();
-        stringBuilder.append("curl --location-trusted -u user:passwd -T ");
-
-        if (request.files.size() == 1) {
-            stringBuilder.append(request.files.get(0));
-        } else if (request.files.size() > 1) {
-            stringBuilder.append("\"{").append(Joiner.on(",").join(request.files)).append("}\"");
-        }
-
-        InetAddress masterAddress = FrontendOptions.getLocalHost();
-        stringBuilder.append(" http://").append(masterAddress.getHostAddress()).append(":");
-        stringBuilder.append(Config.http_port).append("/api/").append(request.db).append("/");
-        stringBuilder.append(request.tbl).append("/_load?label=").append(request.label);
-
-        if (!request.properties.isEmpty()) {
-            stringBuilder.append("&");
-            List<String> props = Lists.newArrayList();
-            for (Map.Entry<String, String> entry : request.properties.entrySet()) {
-                String prop = entry.getKey() + "=" + entry.getValue();
-                props.add(prop);
-            }
-            stringBuilder.append(Joiner.on("&").join(props));
-        }
-
-        return stringBuilder.toString();
-    }
-
-    @Override
-    public TFeResult updateMiniEtlTaskStatus(TUpdateMiniEtlTaskStatusRequest request) throws TException {
-        TFeResult result = new TFeResult();
-        result.setProtocolVersion(FrontendServiceVersion.V1);
-        TStatus status = new TStatus(TStatusCode.OK);
-        result.setStatus(status);
-
-        // get job task info
-        TUniqueId etlTaskId = request.getEtlTaskId();
-        long jobId = etlTaskId.getHi();
-        long taskId = etlTaskId.getLo();
-        LoadJob job = Catalog.getCurrentCatalog().getLoadInstance().getLoadJob(jobId);
-        if (job == null) {
-            String failMsg = "job does not exist. id: " + jobId;
-            LOG.warn(failMsg);
-            status.setStatusCode(TStatusCode.CANCELLED);
-            status.addToErrorMsgs(failMsg);
-            return result;
-        }
-
-        MiniEtlTaskInfo taskInfo = job.getMiniEtlTask(taskId);
-        if (taskInfo == null) {
-            String failMsg = "task info does not exist. task id: " + taskId + ", job id: " + jobId;
-            LOG.warn(failMsg);
-            status.setStatusCode(TStatusCode.CANCELLED);
-            status.addToErrorMsgs(failMsg);
-            return result;
-        }
-
-        // update etl task status
-        TMiniLoadEtlStatusResult statusResult = request.getEtlTaskStatus();
-        LOG.debug("load job id: {}, etl task id: {}, status: {}", jobId, taskId, statusResult);
-        EtlStatus taskStatus = taskInfo.getTaskStatus();
-        if (taskStatus.setState(statusResult.getEtlState())) {
-            if (statusResult.isSetCounters()) {
-                taskStatus.setCounters(statusResult.getCounters());
-            }
-            if (statusResult.isSetTrackingUrl()) {
-                taskStatus.setTrackingUrl(statusResult.getTrackingUrl());
-            }
-            if (statusResult.isSetFileMap()) {
-                taskStatus.setFileMap(statusResult.getFileMap());
-            }
-        }
-        return result;
-    }
-
-    @Override
-    public TMiniLoadBeginResult miniLoadBegin(TMiniLoadBeginRequest request) throws TException {
-        LOG.debug("receive mini load begin request. label: {}, user: {}, ip: {}",
-                request.getLabel(), request.getUser(), request.getUserIp());
-
-        TMiniLoadBeginResult result = new TMiniLoadBeginResult();
-        TStatus status = new TStatus(TStatusCode.OK);
-        result.setStatus(status);
-        try {
-            String cluster = SystemInfoService.DEFAULT_CLUSTER;
-            if (request.isSetCluster()) {
-                cluster = request.cluster;
-            }
-            // step1: check password and privs
-            checkPasswordAndPrivs(cluster, request.getUser(), request.getPasswd(), request.getDb(),
-                    request.getTbl(), request.getUserIp(), PrivPredicate.LOAD);
-            // step2: check label and record metadata in load manager
-            if (request.isSetSubLabel()) {
-                // TODO(ml): multi mini load
-            } else {
-                // add load metadata in loadManager
-                result.setTxnId(Catalog.getCurrentCatalog().getLoadManager().createLoadJobFromMiniLoad(request));
-            }
-            return result;
-        } catch (UserException e) {
-            status.setStatusCode(TStatusCode.ANALYSIS_ERROR);
-            status.addToErrorMsgs(e.getMessage());
-            return result;
-        } catch (Throwable e) {
-            LOG.warn("catch unknown result.", e);
-            status.setStatusCode(TStatusCode.INTERNAL_ERROR);
-            status.addToErrorMsgs(Strings.nullToEmpty(e.getMessage()));
-            return result;
-        }
-    }
-
-    @Override
-    public TFeResult isMethodSupported(TIsMethodSupportedRequest request) throws TException {
-        TStatus status = new TStatus(TStatusCode.OK);
-        TFeResult result = new TFeResult(FrontendServiceVersion.V1, status);
-        switch (request.getFunctionName()) {
-            case "STREAMING_MINI_LOAD":
-                break;
-            default:
-                status.setStatusCode(TStatusCode.NOT_IMPLEMENTED_ERROR);
-                break;
-        }
-        return result;
-    }
-
     @Override
     public TMasterOpResult forward(TMasterOpRequest params) throws TException {
         TNetworkAddress clientAddr = getClientAddr();
@@ -723,35 +515,6 @@ public class FrontendServiceImpl implements FrontendService.Iface {
         }
     }
 
-    @Override
-    public TFeResult loadCheck(TLoadCheckRequest request) throws TException {
-        LOG.debug("receive load check request. label: {}, user: {}, ip: {}",
-                request.getLabel(), request.getUser(), request.getUserIp());
-
-        TStatus status = new TStatus(TStatusCode.OK);
-        TFeResult result = new TFeResult(FrontendServiceVersion.V1, status);
-        try {
-            String cluster = SystemInfoService.DEFAULT_CLUSTER;
-            if (request.isSetCluster()) {
-                cluster = request.cluster;
-            }
-
-            checkPasswordAndPrivs(cluster, request.getUser(), request.getPasswd(), request.getDb(),
-                    request.getTbl(), request.getUserIp(), PrivPredicate.LOAD);
-        } catch (UserException e) {
-            status.setStatusCode(TStatusCode.ANALYSIS_ERROR);
-            status.addToErrorMsgs(e.getMessage());
-            return result;
-        } catch (Throwable e) {
-            LOG.warn("catch unknown result.", e);
-            status.setStatusCode(TStatusCode.INTERNAL_ERROR);
-            status.addToErrorMsgs(Strings.nullToEmpty(e.getMessage()));
-            return result;
-        }
-
-        return result;
-    }
-
     @Override
     public TLoadTxnBeginResult loadTxnBegin(TLoadTxnBeginRequest request) throws TException {
         String clientAddr = getClientAddrAsString();
diff --git a/fe/fe-core/src/main/java/org/apache/doris/task/AgentClient.java b/fe/fe-core/src/main/java/org/apache/doris/task/AgentClient.java
index 4eb52dea1d..f90aa67358 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/task/AgentClient.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/task/AgentClient.java
@@ -21,14 +21,9 @@ import org.apache.doris.common.ClientPool;
 import org.apache.doris.common.Status;
 import org.apache.doris.thrift.BackendService;
 import org.apache.doris.thrift.TAgentResult;
-import org.apache.doris.thrift.TAgentServiceVersion;
 import org.apache.doris.thrift.TCheckStorageFormatResult;
-import org.apache.doris.thrift.TDeleteEtlFilesRequest;
 import org.apache.doris.thrift.TExportStatusResult;
 import org.apache.doris.thrift.TExportTaskRequest;
-import org.apache.doris.thrift.TMiniLoadEtlStatusRequest;
-import org.apache.doris.thrift.TMiniLoadEtlStatusResult;
-import org.apache.doris.thrift.TMiniLoadEtlTaskRequest;
 import org.apache.doris.thrift.TNetworkAddress;
 import org.apache.doris.thrift.TSnapshotRequest;
 import org.apache.doris.thrift.TStatus;
@@ -52,22 +47,6 @@ public class AgentClient {
         this.port = port;
     }
 
-    public TAgentResult submitEtlTask(TMiniLoadEtlTaskRequest request) {
-        TAgentResult result = null;
-        LOG.debug("submit etl task. request: {}", request);
-        try {
-            borrowClient();
-            // submit etl task
-            result = client.submitEtlTask(request);
-            ok = true;
-        } catch (Exception e) {
-            LOG.warn("submit etl task error", e);
-        } finally {
-            returnClient();
-        }
-        return result;
-    }
-
     public TAgentResult makeSnapshot(TSnapshotRequest request) {
         TAgentResult result = null;
         LOG.debug("submit make snapshot task. request: {}", request);
@@ -116,24 +95,6 @@ public class AgentClient {
         return result;
     }
 
-    public TMiniLoadEtlStatusResult getEtlStatus(long jobId, long taskId) {
-        TMiniLoadEtlStatusResult result = null;
-        TMiniLoadEtlStatusRequest request = new TMiniLoadEtlStatusRequest(TAgentServiceVersion.V1,
-                new TUniqueId(jobId, taskId));
-        LOG.debug("get mini load etl task status. request: {}", request);
-        try {
-            borrowClient();
-            // get etl status
-            result = client.getEtlStatus(request);
-            ok = true;
-        } catch (Exception e) {
-            LOG.warn("get etl status error", e);
-        } finally {
-            returnClient();
-        }
-        return result;
-    }
-
     public TExportStatusResult getExportStatus(long jobId, long taskId) {
         TExportStatusResult result = null;
         TUniqueId request = new TUniqueId(jobId, taskId);
@@ -183,22 +144,6 @@ public class AgentClient {
         return result;
     }
 
-    public void deleteEtlFiles(long dbId, long jobId, String dbName, String label) {
-        TDeleteEtlFilesRequest request = new TDeleteEtlFilesRequest(TAgentServiceVersion.V1,
-                new TUniqueId(dbId, jobId), dbName, label);
-        LOG.debug("delete etl files. request: {}", request);
-        try {
-            borrowClient();
-            // delete etl files
-            client.deleteEtlFiles(request);
-            ok = true;
-        } catch (Exception e) {
-            LOG.warn("delete etl files error", e);
-        } finally {
-            returnClient();
-        }
-    }
-
     private void borrowClient() throws Exception {
         // create agent client
         ok = false;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/transaction/TxnCommitAttachment.java b/fe/fe-core/src/main/java/org/apache/doris/transaction/TxnCommitAttachment.java
index c167c1a662..4c7d27de5d 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/transaction/TxnCommitAttachment.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/transaction/TxnCommitAttachment.java
@@ -47,8 +47,6 @@ public abstract class TxnCommitAttachment implements Writable {
             switch (txnCommitAttachment.getLoadType()) {
                 case ROUTINE_LOAD:
                     return new RLTaskTxnCommitAttachment(txnCommitAttachment.getRlTaskTxnCommitAttachment());
-                case MINI_LOAD:
-                    return new MiniLoadTxnCommitAttachment(txnCommitAttachment.getMlTxnCommitAttachment());
                 default:
                     return null;
             }
diff --git a/fe/fe-core/src/test/java/org/apache/doris/common/GenericPoolTest.java b/fe/fe-core/src/test/java/org/apache/doris/common/GenericPoolTest.java
index 0fa7161a5e..09793577a1 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/common/GenericPoolTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/common/GenericPoolTest.java
@@ -25,7 +25,6 @@ import org.apache.doris.thrift.TAgentTaskRequest;
 import org.apache.doris.thrift.TCancelPlanFragmentParams;
 import org.apache.doris.thrift.TCancelPlanFragmentResult;
 import org.apache.doris.thrift.TCheckStorageFormatResult;
-import org.apache.doris.thrift.TDeleteEtlFilesRequest;
 import org.apache.doris.thrift.TDiskTrashInfo;
 import org.apache.doris.thrift.TExecPlanFragmentParams;
 import org.apache.doris.thrift.TExecPlanFragmentResult;
@@ -33,9 +32,6 @@ import org.apache.doris.thrift.TExportStatusResult;
 import org.apache.doris.thrift.TExportTaskRequest;
 import org.apache.doris.thrift.TFetchDataParams;
 import org.apache.doris.thrift.TFetchDataResult;
-import org.apache.doris.thrift.TMiniLoadEtlStatusRequest;
-import org.apache.doris.thrift.TMiniLoadEtlStatusResult;
-import org.apache.doris.thrift.TMiniLoadEtlTaskRequest;
 import org.apache.doris.thrift.TNetworkAddress;
 import org.apache.doris.thrift.TResultBatch;
 import org.apache.doris.thrift.TRoutineLoadTask;
@@ -156,21 +152,6 @@ public class GenericPoolTest {
             return null;
         }
 
-        @Override
-        public TAgentResult submitEtlTask(TMiniLoadEtlTaskRequest request) throws TException {
-            return null;
-        }
-
-        @Override
-        public TMiniLoadEtlStatusResult getEtlStatus(TMiniLoadEtlStatusRequest request) throws TException {
-            return null;
-        }
-
-        @Override
-        public TAgentResult deleteEtlFiles(TDeleteEtlFilesRequest request) throws TException {
-            return null;
-        }
-
         @Override
         public TAgentResult makeSnapshot(TSnapshotRequest snapshotRequest) throws TException {
             // TODO Auto-generated method stub
diff --git a/fe/fe-core/src/test/java/org/apache/doris/utframe/MockedBackendFactory.java b/fe/fe-core/src/test/java/org/apache/doris/utframe/MockedBackendFactory.java
index 7c9c513572..e49503ca04 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/utframe/MockedBackendFactory.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/utframe/MockedBackendFactory.java
@@ -34,9 +34,7 @@ import org.apache.doris.thrift.TCancelPlanFragmentParams;
 import org.apache.doris.thrift.TCancelPlanFragmentResult;
 import org.apache.doris.thrift.TCheckStorageFormatResult;
 import org.apache.doris.thrift.TCloneReq;
-import org.apache.doris.thrift.TDeleteEtlFilesRequest;
 import org.apache.doris.thrift.TDiskTrashInfo;
-import org.apache.doris.thrift.TEtlState;
 import org.apache.doris.thrift.TExecPlanFragmentParams;
 import org.apache.doris.thrift.TExecPlanFragmentResult;
 import org.apache.doris.thrift.TExportState;
@@ -47,9 +45,6 @@ import org.apache.doris.thrift.TFetchDataResult;
 import org.apache.doris.thrift.TFinishTaskRequest;
 import org.apache.doris.thrift.THeartbeatResult;
 import org.apache.doris.thrift.TMasterInfo;
-import org.apache.doris.thrift.TMiniLoadEtlStatusRequest;
-import org.apache.doris.thrift.TMiniLoadEtlStatusResult;
-import org.apache.doris.thrift.TMiniLoadEtlTaskRequest;
 import org.apache.doris.thrift.TRoutineLoadTask;
 import org.apache.doris.thrift.TScanBatchResult;
 import org.apache.doris.thrift.TScanCloseParams;
@@ -243,21 +238,6 @@ public class MockedBackendFactory {
             return new TAgentResult(new TStatus(TStatusCode.OK));
         }
 
-        @Override
-        public TAgentResult submitEtlTask(TMiniLoadEtlTaskRequest request) throws TException {
-            return new TAgentResult(new TStatus(TStatusCode.OK));
-        }
-
-        @Override
-        public TMiniLoadEtlStatusResult getEtlStatus(TMiniLoadEtlStatusRequest request) throws TException {
-            return new TMiniLoadEtlStatusResult(new TStatus(TStatusCode.OK), TEtlState.FINISHED);
-        }
-
-        @Override
-        public TAgentResult deleteEtlFiles(TDeleteEtlFilesRequest request) throws TException {
-            return new TAgentResult(new TStatus(TStatusCode.OK));
-        }
-
         @Override
         public TStatus submitExportTask(TExportTaskRequest request) throws TException {
             return new TStatus(TStatusCode.OK);
diff --git a/gensrc/thrift/AgentService.thrift b/gensrc/thrift/AgentService.thrift
index 740db1fd08..83f68834f8 100644
--- a/gensrc/thrift/AgentService.thrift
+++ b/gensrc/thrift/AgentService.thrift
@@ -409,29 +409,3 @@ struct TAgentPublishRequest {
     2: required list<TTopicUpdate> updates
 }
 
-struct TMiniLoadEtlTaskRequest {
-    1: required TAgentServiceVersion protocol_version
-    2: required PaloInternalService.TExecPlanFragmentParams params
-}
-
-struct TMiniLoadEtlStatusRequest {
-    1: required TAgentServiceVersion protocol_version
-    2: required Types.TUniqueId mini_load_id
-}
-
-struct TMiniLoadEtlStatusResult {
-    1: required Status.TStatus status
-    2: required Types.TEtlState etl_state
-    3: optional map<string, i64> file_map
-    4: optional map<string, string> counters
-    5: optional string tracking_url
-    // progress
-}
-
-struct TDeleteEtlFilesRequest {
-    1: required TAgentServiceVersion protocol_version
-    2: required Types.TUniqueId mini_load_id
-    3: required string db_name
-    4: required string label
-}
-
diff --git a/gensrc/thrift/BackendService.thrift b/gensrc/thrift/BackendService.thrift
index 8e534f4677..66a59260fc 100644
--- a/gensrc/thrift/BackendService.thrift
+++ b/gensrc/thrift/BackendService.thrift
@@ -146,13 +146,6 @@ service BackendService {
 
     AgentService.TAgentResult publish_cluster_state(1:AgentService.TAgentPublishRequest request);
 
-    AgentService.TAgentResult submit_etl_task(1:AgentService.TMiniLoadEtlTaskRequest request);
-
-    AgentService.TMiniLoadEtlStatusResult get_etl_status(
-            1:AgentService.TMiniLoadEtlStatusRequest request);
-
-    AgentService.TAgentResult delete_etl_files(1:AgentService.TDeleteEtlFilesRequest request);
-
     Status.TStatus submit_export_task(1:TExportTaskRequest request);
 
     PaloInternalService.TExportStatusResult get_export_status(1:Types.TUniqueId task_id);
diff --git a/gensrc/thrift/FrontendService.thrift b/gensrc/thrift/FrontendService.thrift
index b2ef1a1143..ff59b203ba 100644
--- a/gensrc/thrift/FrontendService.thrift
+++ b/gensrc/thrift/FrontendService.thrift
@@ -411,31 +411,6 @@ struct TFeResult {
     2: required Status.TStatus status
 }
 
-// Submit one table load job
-// if subLabel is set, this job belong to a multi-load transaction
-struct TMiniLoadRequest {
-    1: required FrontendServiceVersion protocolVersion
-    2: required string db
-    3: required string tbl
-    4: required string label
-    5: optional string user
-    6: required Types.TNetworkAddress backend
-    7: required list<string> files
-    8: required map<string, string> properties
-    9: optional string subLabel
-    10: optional string cluster
-    11: optional i64 timestamp
-    12: optional string user_ip
-    13: optional bool is_retry
-    14: optional list<i64> file_size
-}
-
-struct TUpdateMiniEtlTaskStatusRequest {
-    1: required FrontendServiceVersion protocolVersion
-    2: required Types.TUniqueId etlTaskId
-    3: required AgentService.TMiniLoadEtlStatusResult etlTaskStatus
-}
-
 struct TMasterOpRequest {
     1: required string user
     2: required string db
@@ -483,44 +458,6 @@ struct TMasterOpResult {
     4: optional Types.TUniqueId queryId;
 }
 
-struct TLoadCheckRequest {
-    1: required FrontendServiceVersion protocolVersion
-    2: required string user
-    3: required string passwd
-    4: required string db
-    5: optional string label
-    6: optional string cluster
-    7: optional i64 timestamp
-    8: optional string user_ip
-    9: optional string tbl
-}
-
-struct TMiniLoadBeginRequest {
-    1: required string user
-    2: required string passwd
-    3: optional string cluster
-    4: optional string user_ip
-    5: required string db
-    6: required string tbl
-    7: required string label
-    8: optional string sub_label
-    9: optional i64 timeout_second
-    10: optional double max_filter_ratio 
-    11: optional i64 auth_code
-    12: optional i64 create_timestamp
-    13: optional Types.TUniqueId request_id
-    14: optional string auth_code_uuid
-}
-
-struct TIsMethodSupportedRequest {
-    1: optional string function_name
-}
-
-struct TMiniLoadBeginResult {
-    1: required Status.TStatus status
-    2: optional i64 txn_id
-}
-
 struct TUpdateExportTaskStatusRequest {
     1: required FrontendServiceVersion protocolVersion
     2: required Types.TUniqueId taskId
@@ -627,16 +564,10 @@ struct TRLTaskTxnCommitAttachment {
     11: optional string errorLogUrl
 }
 
-struct TMiniLoadTxnCommitAttachment {
-    1: required i64 loadedRows
-    2: required i64 filteredRows
-    3: optional string errorLogUrl
-} 
-
 struct TTxnCommitAttachment {
     1: required Types.TLoadType loadType
     2: optional TRLTaskTxnCommitAttachment rlTaskTxnCommitAttachment
-    3: optional TMiniLoadTxnCommitAttachment mlTxnCommitAttachment 
+//    3: optional TMiniLoadTxnCommitAttachment mlTxnCommitAttachment 
 }
 
 struct TLoadTxnCommitRequest {
@@ -751,14 +682,6 @@ service FrontendService {
     MasterService.TMasterResult finishTask(1: MasterService.TFinishTaskRequest request)
     MasterService.TMasterResult report(1: MasterService.TReportRequest request)
     MasterService.TFetchResourceResult fetchResource()
-    
-    // those three method are used for asynchronous mini load which will be abandoned
-    TFeResult miniLoad(1: TMiniLoadRequest request)
-    TFeResult updateMiniEtlTaskStatus(1: TUpdateMiniEtlTaskStatusRequest request)
-    TFeResult loadCheck(1: TLoadCheckRequest request)
-    // this method is used for streaming mini load
-    TMiniLoadBeginResult miniLoadBegin(1: TMiniLoadBeginRequest request)
-    TFeResult isMethodSupported(1: TIsMethodSupportedRequest request)
 
     TMasterOpResult forward(1: TMasterOpRequest params)
 
diff --git a/samples/mini_load/python/mini_load_utils.py b/samples/mini_load/python/mini_load_utils.py
deleted file mode 100644
index 88753680e0..0000000000
--- a/samples/mini_load/python/mini_load_utils.py
+++ /dev/null
@@ -1,157 +0,0 @@
-#!/usr/bin/env python
-# -*- coding: utf-8 -*-
-"""
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License. You may obtain a copy of the License at
-
-http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-"""
-
-import os
-import subprocess
-
-
-class DorisMiniLoadClient(object):
-    """ load file to doris """
-
-    def __init__(self, db_host, db_port, db_name, 
-                db_user, db_password, file_name, table, load_timeout):
-        """
-        init
-        :param db_host: db host
-        :param db_port: db port
-        :param db_name: db name
-        :param db_user: db user
-        :param db_password: db password
-        :param file_name: local file path
-        :param table: db table
-        :param load_timeout:mini load timeout, defalut 86400 seconds.
-        """
-        self.file_name = file_name
-        self.table = table
-        self.load_host = db_host
-        self.load_port = db_port
-        self.load_database = db_name
-        self.load_user = db_user
-        self.load_password = db_password
-        self.load_timeout = load_timeout
-
-    def get_label(self):
-        """
-        获取label前缀
-        :return: label
-        """
-
-        return '_'.join([self.table, os.path.basename(self.file_name)])
-
-    def load_doris(self):
-        """
-        load file to doris by curl, allow 3 times to retry.
-        :return: mini load label
-        """
-        retry_time = 0
-        label = self.get_label()
-
-        while retry_time < 3:
-            load_cmd = "curl"
-            param_location = "--location-trusted"
-            param_user = "%s:%s" % (self.load_user, self.load_password)
-            param_file = "%s" % self.file_name
-            param_url = "http://%s:%s/api/%s/%s/_load?label=%s&timeout=" % (self.load_host, self.load_port,
-                                                                   self.load_database,
-                                                                   self.table, label, self.load_timeout)
-
-            load_subprocess = subprocess.Popen([load_cmd, param_location,
-                                                "-u", param_user, "-T", param_file, param_url])
-
-            # Wait for child process to terminate.  Returns returncode attribute
-            load_subprocess.wait()
-
-            # check returncode;
-            # If fail, retry 3 times
-            if load_subprocess.returncode != 0:
-                print """Load to doris failed! LABEL is %s, Retry time is %d """ % (label, retry_time)
-                retry_time += 1
-            # If success, print log, and break retry loop
-            if load_subprocess.returncode == 0:
-                print """Load to doris success! LABEL is %s, Retry time is %d """ % (label, retry_time)
-                break
-
-        return label
-
-    @classmethod
-    def check_load_status(cls, label, host, port, user, password, database):
-        """
-        check async mini load process status.
-        :param label:mini load label
-        :param host: db host
-        :param port: db port
-        :param user: db user
-        :param password: db password
-        :param database: db database
-        :return: check async mini load process status.
-        """
-
-        db_conn = MySQLdb.connect(host=host,port=port,user=user,passwd=password,db=database)
-
-        db_cursor = db_conn.cursor()
-        check_status_sql = "show load where label = '%s' order by CreateTime desc limit 1" % label
-
-        db_cursor.execute(check_status_sql)
-        rows = db_cursor.fetchall()
-
-        # timeout config: 60 minutes.
-        timeout = 60 * 60
-
-        while timeout > 0:
-            if len(rows) == 0:
-                print """Load label: %s doesn't exist""" % label
-                return 
-            load_status = rows[0][2]
-            print "mini load status: " + load_status
-            if load_status == 'FINISHED':
-                print """Async mini load to db success! label is %s""" % label
-                break
-            if load_status == 'CANCELLED':
-                print """Async load to db failed! label is %s""" % label
-                break
-            timeout = timeout - 5
-            time.sleep(5)
-            db_cursor.execute(sql)
-            rows = db_cursor.fetchall()
-
-        if time_out <= 0:
-            print """Async load to db timeout! timeout second is: %s, label is %s""" % (time_out, label)
-
-
-if __name__ == '__main__':
-    """
-    mini_load demo.
-    There is no need to install subprocess in Python 2.7. It is a standard module that is built in.
-    You need input db config & load param.
-    """
-    db_host = "db_conn_host"
-    db_port = "port"
-    db_name = "db_name"
-    db_user = "db_user"
-    db_password = "db_password"
-    file_name = "file_name"
-    table = "db_table"
-    # default load_time_out, seconds
-    load_timeout = 86400
-    doris_client = DorisMiniLoadClient(
-        db_host, db_port, db_name, db_user, db_password, file_name, table, load_timeout)
-    doris_client.check_load_status(doris_client.load_doirs(), db_host, db_port, db_user, db_password, db_name)
-    print "load to doris end"


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org