You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by yi...@apache.org on 2023/06/17 02:39:10 UTC

[doris] branch master updated: Revert "[enhencement](streamload) add on_close callback for httpserver (#20826)" (#20927)

This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new aea719627d Revert "[enhencement](streamload) add on_close callback for httpserver (#20826)" (#20927)
aea719627d is described below

commit aea719627def18e9fd9a895a63909b2c83a6d379
Author: zhengyu <fr...@gmail.com>
AuthorDate: Sat Jun 17 10:39:02 2023 +0800

    Revert "[enhencement](streamload) add on_close callback for httpserver (#20826)" (#20927)
    
    This reverts commit 5b6761acb86852a93351b7b971eb2049fb567aaf.
---
 be/src/http/action/stream_load.cpp                 |  4 +-
 be/src/http/ev_http_server.cpp                     | 52 +---------------------
 be/src/http/http_request.cpp                       |  9 +---
 be/src/io/fs/stream_load_pipe.cpp                  | 25 +----------
 be/src/io/fs/stream_load_pipe.h                    | 11 +----
 be/src/runtime/exec_env.cpp                        |  2 +-
 be/src/runtime/exec_env.h                          |  6 ---
 be/src/runtime/exec_env_init.cpp                   | 38 ----------------
 be/src/runtime/fragment_mgr.cpp                    |  2 +-
 be/src/runtime/stream_load/stream_load_context.cpp |  3 --
 be/src/runtime/stream_load/stream_load_context.h   |  5 ---
 be/src/util/byte_buffer.h                          | 15 +------
 12 files changed, 11 insertions(+), 161 deletions(-)

diff --git a/be/src/http/action/stream_load.cpp b/be/src/http/action/stream_load.cpp
index 33a526cd81..fdf6f6eaca 100644
--- a/be/src/http/action/stream_load.cpp
+++ b/be/src/http/action/stream_load.cpp
@@ -243,7 +243,7 @@ int StreamLoadAction::on_header(HttpRequest* req) {
     ctx->two_phase_commit = req->header(HTTP_TWO_PHASE_COMMIT) == "true" ? true : false;
 
     LOG(INFO) << "new income streaming load request." << ctx->brief() << ", db=" << ctx->db
-              << ", tbl=" << ctx->table << "two_phase_commit: " << ctx->two_phase_commit;
+              << ", tbl=" << ctx->table;
 
     auto st = _on_header(req, ctx);
     if (!st.ok()) {
@@ -407,7 +407,7 @@ Status StreamLoadAction::_process_put(HttpRequest* http_req,
     if (ctx->use_streaming) {
         auto pipe = std::make_shared<io::StreamLoadPipe>(
                 io::kMaxPipeBufferedBytes /* max_buffered_bytes */, 64 * 1024 /* min_chunk_size */,
-                ctx->body_bytes /* total_length */, false, ctx->id);
+                ctx->body_bytes /* total_length */);
         request.fileType = TFileType::FILE_STREAM;
         ctx->body_sink = pipe;
         ctx->pipe = pipe;
diff --git a/be/src/http/ev_http_server.cpp b/be/src/http/ev_http_server.cpp
index bdc6d420a7..b0743baee3 100644
--- a/be/src/http/ev_http_server.cpp
+++ b/be/src/http/ev_http_server.cpp
@@ -22,7 +22,6 @@
 #include <butil/fd_utility.h>
 // IWYU pragma: no_include <bthread/errno.h>
 #include <errno.h> // IWYU pragma: keep
-#include <event.h>
 #include <event2/event.h>
 #include <event2/http.h>
 #include <event2/http_struct.h>
@@ -36,14 +35,12 @@
 #include <memory>
 #include <sstream>
 
-#include "bvar/bvar.h"
 #include "common/logging.h"
 #include "http/http_channel.h"
 #include "http/http_handler.h"
 #include "http/http_headers.h"
 #include "http/http_request.h"
 #include "http/http_status.h"
-#include "mutex"
 #include "service/backend_options.h"
 #include "util/threadpool.h"
 
@@ -51,50 +48,15 @@ struct event_base;
 struct evhttp;
 
 namespace doris {
-static std::map<evhttp_connection*, HttpRequest*> g_conn_req_map;
-static std::mutex g_conn_req_map_lock;
 
 static void on_chunked(struct evhttp_request* ev_req, void* param) {
     HttpRequest* request = (HttpRequest*)ev_req->on_free_cb_arg;
     request->handler()->on_chunk_data(request);
 }
 
-static void on_close(evhttp_connection* con, void* arg) {
-    HttpRequest* request = (HttpRequest*)arg;
-    {
-        std::lock_guard<std::mutex> l(g_conn_req_map_lock);
-        auto itr = g_conn_req_map.find(con);
-        if (itr != g_conn_req_map.end()) {
-            if (itr->second) {
-                if (itr->second != request) {
-                    LOG(WARNING) << "close connection. connection=" << con
-                                 << " current HttpRequest=" << request
-                                 << " but orginal HttpRequest=" << itr->second;
-                }
-                delete itr->second;
-            }
-            g_conn_req_map.erase(con);
-        }
-    }
-}
-
 static void on_free(struct evhttp_request* ev_req, void* arg) {
     HttpRequest* request = (HttpRequest*)arg;
-    {
-        std::lock_guard<std::mutex> l(g_conn_req_map_lock);
-        auto itr = g_conn_req_map.find(ev_req->evcon);
-        if (itr != g_conn_req_map.end()) {
-            if (itr->second) {
-                if (itr->second != request) {
-                    LOG(WARNING) << "free request. connection=" << ev_req->evcon
-                                 << " current HttpRequest=" << request
-                                 << " but orginal HttpRequest=" << itr->second;
-                }
-                delete itr->second;
-            }
-            g_conn_req_map.erase(ev_req->evcon);
-        }
-    }
+    delete request;
 }
 
 static void on_request(struct evhttp_request* ev_req, void* arg) {
@@ -163,7 +125,6 @@ void EvHttpServer::start() {
                           std::shared_ptr<evhttp> http(evhttp_new(base.get()),
                                                        [](evhttp* http) { evhttp_free(http); });
                           CHECK(http != nullptr) << "Couldn't create an evhttp.";
-                          evhttp_set_timeout(http.get(), 60 /* timeout in seconds */);
 
                           auto res = evhttp_accept_socket(http.get(), _server_fd);
                           CHECK(res >= 0) << "evhttp accept socket failed, res=" << res;
@@ -300,18 +261,7 @@ int EvHttpServer::on_header(struct evhttp_request* ev_req) {
         evhttp_request_set_chunked_cb(ev_req, on_chunked);
     }
 
-    {
-        std::lock_guard<std::mutex> l(g_conn_req_map_lock);
-        g_conn_req_map.erase(ev_req->evcon);
-        g_conn_req_map[ev_req->evcon] = request.get();
-    }
-
-    struct evhttp_connection* httpcon = evhttp_request_get_connection(ev_req);
-    evhttp_connection_set_closecb(httpcon, on_close, request.get());
     evhttp_request_set_on_free_cb(ev_req, on_free, request.release());
-    struct bufferevent* bufev = evhttp_connection_get_bufferevent(httpcon);
-    if (bufev) bufferevent_enable(bufev, EV_READ);
-
     return 0;
 }
 
diff --git a/be/src/http/http_request.cpp b/be/src/http/http_request.cpp
index 97274d0e01..14bde591b4 100644
--- a/be/src/http/http_request.cpp
+++ b/be/src/http/http_request.cpp
@@ -27,22 +27,15 @@
 #include <unordered_map>
 #include <utility>
 
-#include "bvar/bvar.h"
 #include "http/http_handler.h"
-#include "util/url_coding.h"
 
 namespace doris {
 
 static std::string s_empty = "";
 
-bvar::Adder<int64_t> g_http_request_cnt("http_request", "request_cnt");
-
-HttpRequest::HttpRequest(evhttp_request* evhttp_request) : _ev_req(evhttp_request) {
-    g_http_request_cnt << 1;
-}
+HttpRequest::HttpRequest(evhttp_request* evhttp_request) : _ev_req(evhttp_request) {}
 
 HttpRequest::~HttpRequest() {
-    g_http_request_cnt << -1;
     if (_handler_ctx != nullptr) {
         DCHECK(_handler != nullptr);
         _handler->free_handler_ctx(_handler_ctx);
diff --git a/be/src/io/fs/stream_load_pipe.cpp b/be/src/io/fs/stream_load_pipe.cpp
index 2940a45e4d..25099c8796 100644
--- a/be/src/io/fs/stream_load_pipe.cpp
+++ b/be/src/io/fs/stream_load_pipe.cpp
@@ -33,31 +33,20 @@ namespace doris {
 namespace io {
 class IOContext;
 
-std::map<UniqueId, io::StreamLoadPipe*> g_streamloadpipes;
-std::mutex g_streamloadpipes_lock;
-
 StreamLoadPipe::StreamLoadPipe(size_t max_buffered_bytes, size_t min_chunk_size,
-                               int64_t total_length, bool use_proto, UniqueId id)
+                               int64_t total_length, bool use_proto)
         : _buffered_bytes(0),
           _proto_buffered_bytes(0),
           _max_buffered_bytes(max_buffered_bytes),
           _min_chunk_size(min_chunk_size),
           _total_length(total_length),
-          _use_proto(use_proto),
-          _id(id) {
-    std::lock_guard<std::mutex> l(g_streamloadpipes_lock);
-    g_streamloadpipes[_id] = this;
-}
+          _use_proto(use_proto) {}
 
 StreamLoadPipe::~StreamLoadPipe() {
     SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(ExecEnv::GetInstance()->orphan_mem_tracker());
     while (!_buf_queue.empty()) {
         _buf_queue.pop_front();
     }
-    {
-        std::lock_guard<std::mutex> l(g_streamloadpipes_lock);
-        g_streamloadpipes.erase(_id);
-    }
 }
 
 Status StreamLoadPipe::read_at_impl(size_t /*offset*/, Slice result, size_t* bytes_read,
@@ -124,7 +113,6 @@ Status StreamLoadPipe::read_one_message(std::unique_ptr<uint8_t[]>* data, size_t
 }
 
 Status StreamLoadPipe::append_and_flush(const char* data, size_t size, size_t proto_byte_size) {
-    _last_active = GetCurrentTimeMicros();
     ByteBufferPtr buf = ByteBuffer::allocate(BitUtil::RoundUpToPowerOfTwo(size + 1));
     buf->put_bytes(data, size);
     buf->flip();
@@ -132,7 +120,6 @@ Status StreamLoadPipe::append_and_flush(const char* data, size_t size, size_t pr
 }
 
 Status StreamLoadPipe::append(std::unique_ptr<PDataRow>&& row) {
-    _last_active = GetCurrentTimeMicros();
     PDataRow* row_ptr = row.get();
     {
         std::unique_lock<std::mutex> l(_lock);
@@ -143,7 +130,6 @@ Status StreamLoadPipe::append(std::unique_ptr<PDataRow>&& row) {
 }
 
 Status StreamLoadPipe::append(const char* data, size_t size) {
-    _last_active = GetCurrentTimeMicros();
     size_t pos = 0;
     if (_write_buf != nullptr) {
         if (size < _write_buf->remaining()) {
@@ -167,7 +153,6 @@ Status StreamLoadPipe::append(const char* data, size_t size) {
 }
 
 Status StreamLoadPipe::append(const ByteBufferPtr& buf) {
-    _last_active = GetCurrentTimeMicros();
     if (_write_buf != nullptr) {
         _write_buf->flip();
         RETURN_IF_ERROR(_append(_write_buf));
@@ -241,7 +226,6 @@ Status StreamLoadPipe::_append(const ByteBufferPtr& buf, size_t proto_byte_size)
 
 // called when producer finished
 Status StreamLoadPipe::finish() {
-    LOG(INFO) << "finish pipe=" << this;
     if (_write_buf != nullptr) {
         _write_buf->flip();
         _append(_write_buf);
@@ -257,16 +241,11 @@ Status StreamLoadPipe::finish() {
 
 // called when producer/consumer failed
 void StreamLoadPipe::cancel(const std::string& reason) {
-    LOG(INFO) << "cancel pipe=" << this;
     {
         std::lock_guard<std::mutex> l(_lock);
         _cancelled = true;
         _cancelled_reason = reason;
     }
-    {
-        std::lock_guard<std::mutex> l(g_streamloadpipes_lock);
-        g_streamloadpipes.erase(_id);
-    }
     _get_cond.notify_all();
     _put_cond.notify_all();
 }
diff --git a/be/src/io/fs/stream_load_pipe.h b/be/src/io/fs/stream_load_pipe.h
index e41f5162ac..848175ce9a 100644
--- a/be/src/io/fs/stream_load_pipe.h
+++ b/be/src/io/fs/stream_load_pipe.h
@@ -34,13 +34,10 @@
 #include "runtime/message_body_sink.h"
 #include "util/byte_buffer.h"
 #include "util/slice.h"
-#include "util/time.h"
-#include "util/uid_util.h"
 
 namespace doris {
 namespace io {
 class IOContext;
-class StreamLoadPipe;
 
 static inline constexpr size_t kMaxPipeBufferedBytes = 4 * 1024 * 1024;
 
@@ -48,7 +45,7 @@ class StreamLoadPipe : public MessageBodySink, public FileReader {
 public:
     StreamLoadPipe(size_t max_buffered_bytes = kMaxPipeBufferedBytes,
                    size_t min_chunk_size = 64 * 1024, int64_t total_length = -1,
-                   bool use_proto = false, UniqueId id = UniqueId(0, 0));
+                   bool use_proto = false);
     ~StreamLoadPipe() override;
 
     Status append_and_flush(const char* data, size_t size, size_t proto_byte_size = 0);
@@ -81,10 +78,6 @@ public:
 
     size_t get_queue_size() { return _buf_queue.size(); }
 
-    bool is_cancelled() { return _cancelled; }
-    bool is_finished() { return _finished; }
-    uint64_t last_active() { return _last_active; }
-
 protected:
     Status read_at_impl(size_t offset, Slice result, size_t* bytes_read,
                         const IOContext* io_ctx) override;
@@ -116,8 +109,6 @@ private:
     std::condition_variable _get_cond;
 
     ByteBufferPtr _write_buf;
-    UniqueId _id;
-    uint64_t _last_active = 0;
 
     // no use, only for compatibility with the `Path` interface
     Path _path = "";
diff --git a/be/src/runtime/exec_env.cpp b/be/src/runtime/exec_env.cpp
index 9215479f62..8f63d6fe6a 100644
--- a/be/src/runtime/exec_env.cpp
+++ b/be/src/runtime/exec_env.cpp
@@ -21,7 +21,7 @@
 
 namespace doris {
 
-ExecEnv::ExecEnv() : _is_init(false), _check_streamloadpipe_latch(1) {}
+ExecEnv::ExecEnv() : _is_init(false) {}
 
 ExecEnv::~ExecEnv() {}
 
diff --git a/be/src/runtime/exec_env.h b/be/src/runtime/exec_env.h
index 00a58b956b..46e1157820 100644
--- a/be/src/runtime/exec_env.h
+++ b/be/src/runtime/exec_env.h
@@ -28,8 +28,6 @@
 
 #include "common/status.h"
 #include "olap/options.h"
-#include "util/countdown_latch.h"
-#include "util/thread.h"
 #include "util/threadpool.h"
 
 namespace doris {
@@ -193,8 +191,6 @@ private:
     void _register_metrics();
     void _deregister_metrics();
 
-    void _check_streamloadpipe();
-
     bool _is_init;
     std::vector<StorePath> _store_paths;
     // path => store index
@@ -260,8 +256,6 @@ private:
     BlockSpillManager* _block_spill_mgr = nullptr;
     // To save meta info of external file, such as parquet footer.
     FileMetaCache* _file_meta_cache = nullptr;
-    CountDownLatch _check_streamloadpipe_latch;
-    scoped_refptr<Thread> _check_streamloadpipe_thread;
 };
 
 template <>
diff --git a/be/src/runtime/exec_env_init.cpp b/be/src/runtime/exec_env_init.cpp
index 40cccf0b77..d524fa13e2 100644
--- a/be/src/runtime/exec_env_init.cpp
+++ b/be/src/runtime/exec_env_init.cpp
@@ -27,7 +27,6 @@
 #include <limits>
 #include <map>
 #include <memory>
-#include <mutex>
 #include <ostream>
 #include <string>
 #include <unordered_map>
@@ -38,7 +37,6 @@
 #include "common/logging.h"
 #include "common/status.h"
 #include "io/fs/file_meta_cache.h"
-#include "io/fs/stream_load_pipe.h"
 #include "olap/olap_define.h"
 #include "olap/options.h"
 #include "olap/page_cache.h"
@@ -79,8 +77,6 @@
 #include "util/parse_util.h"
 #include "util/pretty_printer.h"
 #include "util/threadpool.h"
-#include "util/time.h"
-#include "util/uid_util.h"
 #include "vec/exec/scan/scanner_scheduler.h"
 #include "vec/runtime/vdata_stream_mgr.h"
 
@@ -89,15 +85,7 @@
 #include "runtime/memory/tcmalloc_hook.h"
 #endif
 
-namespace doris::io {
-extern std::map<UniqueId, StreamLoadPipe*> g_streamloadpipes;
-extern std::mutex g_streamloadpipes_lock;
-} // namespace doris::io
-
 namespace doris {
-
-using namespace io;
-
 class PBackendService_Stub;
 class PFunctionService_Stub;
 
@@ -107,9 +95,6 @@ DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(send_batch_thread_pool_queue_size, MetricUnit
 DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(download_cache_thread_pool_thread_num, MetricUnit::NOUNIT);
 DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(download_cache_thread_pool_queue_size, MetricUnit::NOUNIT);
 
-bvar::Adder<uint64_t> g_byte_buffer_allocate_kb;
-bvar::Adder<uint64_t> g_byte_buffer_cnt;
-
 Status ExecEnv::init(ExecEnv* env, const std::vector<StorePath>& store_paths) {
     return env->_init(store_paths);
 }
@@ -195,16 +180,6 @@ Status ExecEnv::_init(const std::vector<StorePath>& store_paths) {
 
     _init_mem_env();
 
-    RETURN_IF_ERROR(Thread::create(
-            "ExecEnv", "check_streamloadpipe",
-            [this]() {
-                uint32_t interval = 300;
-                while (!_check_streamloadpipe_latch.wait_for(std::chrono::seconds(interval))) {
-                    _check_streamloadpipe();
-                }
-            },
-            &_check_streamloadpipe_thread));
-
     RETURN_IF_ERROR(_load_channel_mgr->init(MemInfo::mem_limit()));
     _heartbeat_flags = new HeartbeatFlags();
     _register_metrics();
@@ -232,19 +207,6 @@ Status ExecEnv::init_pipeline_task_scheduler() {
     return Status::OK();
 }
 
-void ExecEnv::_check_streamloadpipe() {
-    uint64_t now = GetCurrentTimeMicros();
-    std::lock_guard<std::mutex> l(g_streamloadpipes_lock);
-    for (auto& pipe : g_streamloadpipes) {
-        if (pipe.second == nullptr || pipe.second->is_cancelled()) {
-            continue;
-        }
-        uint64_t diff_s = abs((int64_t)now - (int64_t)pipe.second->last_active()) / 1000000;
-        LOG(INFO) << "active StreamLoadPipe=" << pipe.second
-                  << " diff_time_from_last_append=" << diff_s;
-    }
-}
-
 Status ExecEnv::_init_mem_env() {
     bool is_percent = false;
     std::stringstream ss;
diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp
index 4581768199..648c9373d1 100644
--- a/be/src/runtime/fragment_mgr.cpp
+++ b/be/src/runtime/fragment_mgr.cpp
@@ -571,7 +571,7 @@ Status FragmentMgr::exec_plan_fragment(const TExecPlanFragmentParams& params) {
         stream_load_ctx->need_rollback = true;
         auto pipe = std::make_shared<io::StreamLoadPipe>(
                 io::kMaxPipeBufferedBytes /* max_buffered_bytes */, 64 * 1024 /* min_chunk_size */,
-                -1 /* total_length */, true /* use_proto */, stream_load_ctx->id);
+                -1 /* total_length */, true /* use_proto */);
         stream_load_ctx->body_sink = pipe;
         stream_load_ctx->pipe = pipe;
         stream_load_ctx->max_filter_ratio = params.txn_conf.max_filter_ratio;
diff --git a/be/src/runtime/stream_load/stream_load_context.cpp b/be/src/runtime/stream_load/stream_load_context.cpp
index b0002c98dc..f381ba097d 100644
--- a/be/src/runtime/stream_load/stream_load_context.cpp
+++ b/be/src/runtime/stream_load/stream_load_context.cpp
@@ -28,14 +28,11 @@
 #include <new>
 #include <sstream>
 
-#include "bvar/bvar.h"
 #include "common/logging.h"
 
 namespace doris {
 using namespace ErrorCode;
 
-bvar::Adder<int64> g_streamloadctx_obj_cnt("streamloadctx", "obj_cnt");
-
 std::string StreamLoadContext::to_json() const {
     rapidjson::StringBuffer s;
     rapidjson::PrettyWriter<rapidjson::StringBuffer> writer(s);
diff --git a/be/src/runtime/stream_load/stream_load_context.h b/be/src/runtime/stream_load/stream_load_context.h
index c35d949780..0e004b12f5 100644
--- a/be/src/runtime/stream_load/stream_load_context.h
+++ b/be/src/runtime/stream_load/stream_load_context.h
@@ -31,7 +31,6 @@
 #include <utility>
 #include <vector>
 
-#include "bvar/bvar.h"
 #include "common/status.h"
 #include "common/utils.h"
 #include "runtime/exec_env.h"
@@ -87,19 +86,15 @@ public:
 
 class MessageBodySink;
 
-extern bvar::Adder<int64> g_streamloadctx_obj_cnt;
-
 class StreamLoadContext {
     ENABLE_FACTORY_CREATOR(StreamLoadContext);
 
 public:
     StreamLoadContext(ExecEnv* exec_env) : id(UniqueId::gen_uid()), _exec_env(exec_env) {
         start_millis = UnixMillis();
-        g_streamloadctx_obj_cnt << 1;
     }
 
     ~StreamLoadContext() {
-        g_streamloadctx_obj_cnt << -1;
         if (need_rollback) {
             _exec_env->stream_load_executor()->rollback_txn(this);
             need_rollback = false;
diff --git a/be/src/util/byte_buffer.h b/be/src/util/byte_buffer.h
index 9c296030c5..aab8fd42db 100644
--- a/be/src/util/byte_buffer.h
+++ b/be/src/util/byte_buffer.h
@@ -22,14 +22,10 @@
 #include <cstddef>
 #include <memory>
 
-#include "bvar/bvar.h"
 #include "common/logging.h"
 
 namespace doris {
 
-extern bvar::Adder<uint64_t> g_byte_buffer_allocate_kb;
-extern bvar::Adder<uint64_t> g_byte_buffer_cnt;
-
 struct ByteBuffer;
 using ByteBufferPtr = std::shared_ptr<ByteBuffer>;
 
@@ -39,11 +35,7 @@ struct ByteBuffer {
         return ptr;
     }
 
-    ~ByteBuffer() {
-        delete[] ptr;
-        g_byte_buffer_allocate_kb << -(capacity / 1024);
-        g_byte_buffer_cnt << -1;
-    }
+    ~ByteBuffer() { delete[] ptr; }
 
     void put_bytes(const char* data, size_t size) {
         memcpy(ptr + pos, data, size);
@@ -71,10 +63,7 @@ struct ByteBuffer {
 
 private:
     ByteBuffer(size_t capacity_)
-            : ptr(new char[capacity_]), pos(0), limit(capacity_), capacity(capacity_) {
-        g_byte_buffer_allocate_kb << capacity / 1024;
-        g_byte_buffer_cnt << 1;
-    }
+            : ptr(new char[capacity_]), pos(0), limit(capacity_), capacity(capacity_) {}
 };
 
 } // namespace doris


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org