You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by da...@apache.org on 2023/06/15 05:44:08 UTC

[doris] branch master updated: [enhencement](streamload) add on_close callback for httpserver (#20826)

This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 5b6761acb8 [enhencement](streamload) add on_close callback for httpserver (#20826)
5b6761acb8 is described below

commit 5b6761acb86852a93351b7b971eb2049fb567aaf
Author: zhengyu <fr...@gmail.com>
AuthorDate: Thu Jun 15 13:44:02 2023 +0800

    [enhencement](streamload) add on_close callback for httpserver (#20826)
    
    Sometimes a connection cannot be released properly during on_free. We need
    an on_close callback as a last resort.
    
    Signed-off-by: freemandealer <fr...@gmail.com>
---
 be/src/http/action/stream_load.cpp                 |  4 +-
 be/src/http/ev_http_server.cpp                     | 52 +++++++++++++++++++++-
 be/src/http/http_request.cpp                       |  9 +++-
 be/src/io/fs/stream_load_pipe.cpp                  | 25 ++++++++++-
 be/src/io/fs/stream_load_pipe.h                    | 11 ++++-
 be/src/runtime/exec_env.cpp                        |  2 +-
 be/src/runtime/exec_env.h                          |  6 +++
 be/src/runtime/exec_env_init.cpp                   | 38 ++++++++++++++++
 be/src/runtime/fragment_mgr.cpp                    |  2 +-
 be/src/runtime/stream_load/stream_load_context.cpp |  3 ++
 be/src/runtime/stream_load/stream_load_context.h   |  5 +++
 be/src/util/byte_buffer.h                          | 15 ++++++-
 12 files changed, 161 insertions(+), 11 deletions(-)

diff --git a/be/src/http/action/stream_load.cpp b/be/src/http/action/stream_load.cpp
index fdf6f6eaca..33a526cd81 100644
--- a/be/src/http/action/stream_load.cpp
+++ b/be/src/http/action/stream_load.cpp
@@ -243,7 +243,7 @@ int StreamLoadAction::on_header(HttpRequest* req) {
     ctx->two_phase_commit = req->header(HTTP_TWO_PHASE_COMMIT) == "true" ? true : false;
 
     LOG(INFO) << "new income streaming load request." << ctx->brief() << ", db=" << ctx->db
-              << ", tbl=" << ctx->table;
+              << ", tbl=" << ctx->table << "two_phase_commit: " << ctx->two_phase_commit;
 
     auto st = _on_header(req, ctx);
     if (!st.ok()) {
@@ -407,7 +407,7 @@ Status StreamLoadAction::_process_put(HttpRequest* http_req,
     if (ctx->use_streaming) {
         auto pipe = std::make_shared<io::StreamLoadPipe>(
                 io::kMaxPipeBufferedBytes /* max_buffered_bytes */, 64 * 1024 /* min_chunk_size */,
-                ctx->body_bytes /* total_length */);
+                ctx->body_bytes /* total_length */, false, ctx->id);
         request.fileType = TFileType::FILE_STREAM;
         ctx->body_sink = pipe;
         ctx->pipe = pipe;
diff --git a/be/src/http/ev_http_server.cpp b/be/src/http/ev_http_server.cpp
index b0743baee3..bdc6d420a7 100644
--- a/be/src/http/ev_http_server.cpp
+++ b/be/src/http/ev_http_server.cpp
@@ -22,6 +22,7 @@
 #include <butil/fd_utility.h>
 // IWYU pragma: no_include <bthread/errno.h>
 #include <errno.h> // IWYU pragma: keep
+#include <event.h>
 #include <event2/event.h>
 #include <event2/http.h>
 #include <event2/http_struct.h>
@@ -35,12 +36,14 @@
 #include <memory>
 #include <sstream>
 
+#include "bvar/bvar.h"
 #include "common/logging.h"
 #include "http/http_channel.h"
 #include "http/http_handler.h"
 #include "http/http_headers.h"
 #include "http/http_request.h"
 #include "http/http_status.h"
+#include "mutex"
 #include "service/backend_options.h"
 #include "util/threadpool.h"
 
@@ -48,15 +51,50 @@ struct event_base;
 struct evhttp;
 
 namespace doris {
+static std::map<evhttp_connection*, HttpRequest*> g_conn_req_map;
+static std::mutex g_conn_req_map_lock;
 
 static void on_chunked(struct evhttp_request* ev_req, void* param) {
     HttpRequest* request = (HttpRequest*)ev_req->on_free_cb_arg;
     request->handler()->on_chunk_data(request);
 }
 
+static void on_close(evhttp_connection* con, void* arg) { // Connection-close callback: frees the HttpRequest still mapped to `con`, if any.
+    HttpRequest* request = (HttpRequest*)arg; // only used below to detect and log a mismatch with the mapped request
+    {
+        std::lock_guard<std::mutex> l(g_conn_req_map_lock); // guards g_conn_req_map
+        auto itr = g_conn_req_map.find(con);
+        if (itr != g_conn_req_map.end()) {
+            if (itr->second) {
+                if (itr->second != request) { // the map holds a different request than the one passed as `arg`
+                    LOG(WARNING) << "close connection. connection=" << con
+                                 << " current HttpRequest=" << request
+                                 << " but orginal HttpRequest=" << itr->second;
+                }
+                delete itr->second; // the mapped request is what gets freed, not `arg`
+            }
+            g_conn_req_map.erase(con); // drop the entry so a later lookup for this connection finds nothing to delete
+        }
+    }
+}
+
 static void on_free(struct evhttp_request* ev_req, void* arg) {
     HttpRequest* request = (HttpRequest*)arg;
-    delete request;
+    { // Free through g_conn_req_map: nothing is deleted unless the connection still has an entry.
+        std::lock_guard<std::mutex> l(g_conn_req_map_lock);
+        auto itr = g_conn_req_map.find(ev_req->evcon);
+        if (itr != g_conn_req_map.end()) {
+            if (itr->second) {
+                if (itr->second != request) {
+                    LOG(WARNING) << "free request. connection=" << ev_req->evcon
+                                 << " current HttpRequest=" << request
+                                 << " but orginal HttpRequest=" << itr->second;
+                }
+                delete itr->second; // NOTE(review): on a mismatch `request` itself is never deleted here -- confirm that is intended
+            }
+            g_conn_req_map.erase(ev_req->evcon);
+        }
+    }
 }
 
 static void on_request(struct evhttp_request* ev_req, void* arg) {
@@ -125,6 +163,7 @@ void EvHttpServer::start() {
                           std::shared_ptr<evhttp> http(evhttp_new(base.get()),
                                                        [](evhttp* http) { evhttp_free(http); });
                           CHECK(http != nullptr) << "Couldn't create an evhttp.";
+                          evhttp_set_timeout(http.get(), 60 /* timeout in seconds */);
 
                           auto res = evhttp_accept_socket(http.get(), _server_fd);
                           CHECK(res >= 0) << "evhttp accept socket failed, res=" << res;
@@ -261,7 +300,18 @@ int EvHttpServer::on_header(struct evhttp_request* ev_req) {
         evhttp_request_set_chunked_cb(ev_req, on_chunked);
     }
 
+    {
+        std::lock_guard<std::mutex> l(g_conn_req_map_lock);
+        g_conn_req_map.erase(ev_req->evcon);
+        g_conn_req_map[ev_req->evcon] = request.get();
+    }
+
+    struct evhttp_connection* httpcon = evhttp_request_get_connection(ev_req);
+    evhttp_connection_set_closecb(httpcon, on_close, request.get());
     evhttp_request_set_on_free_cb(ev_req, on_free, request.release());
+    struct bufferevent* bufev = evhttp_connection_get_bufferevent(httpcon);
+    if (bufev) bufferevent_enable(bufev, EV_READ);
+
     return 0;
 }
 
diff --git a/be/src/http/http_request.cpp b/be/src/http/http_request.cpp
index 14bde591b4..97274d0e01 100644
--- a/be/src/http/http_request.cpp
+++ b/be/src/http/http_request.cpp
@@ -27,15 +27,22 @@
 #include <unordered_map>
 #include <utility>
 
+#include "bvar/bvar.h"
 #include "http/http_handler.h"
+#include "util/url_coding.h"
 
 namespace doris {
 
 static std::string s_empty = "";
 
-HttpRequest::HttpRequest(evhttp_request* evhttp_request) : _ev_req(evhttp_request) {}
+bvar::Adder<int64_t> g_http_request_cnt("http_request", "request_cnt");
+
+HttpRequest::HttpRequest(evhttp_request* evhttp_request) : _ev_req(evhttp_request) { // Stores the given evhttp_request pointer in _ev_req.
+    g_http_request_cnt << 1; // one more HttpRequest; the destructor adds -1 to the same counter
+}
 
 HttpRequest::~HttpRequest() {
+    g_http_request_cnt << -1;
     if (_handler_ctx != nullptr) {
         DCHECK(_handler != nullptr);
         _handler->free_handler_ctx(_handler_ctx);
diff --git a/be/src/io/fs/stream_load_pipe.cpp b/be/src/io/fs/stream_load_pipe.cpp
index 25099c8796..2940a45e4d 100644
--- a/be/src/io/fs/stream_load_pipe.cpp
+++ b/be/src/io/fs/stream_load_pipe.cpp
@@ -33,20 +33,31 @@ namespace doris {
 namespace io {
 class IOContext;
 
+std::map<UniqueId, io::StreamLoadPipe*> g_streamloadpipes;
+std::mutex g_streamloadpipes_lock;
+
 StreamLoadPipe::StreamLoadPipe(size_t max_buffered_bytes, size_t min_chunk_size,
-                               int64_t total_length, bool use_proto)
+                               int64_t total_length, bool use_proto, UniqueId id) // `id` is the key used in g_streamloadpipes
         : _buffered_bytes(0),
           _proto_buffered_bytes(0),
           _max_buffered_bytes(max_buffered_bytes),
           _min_chunk_size(min_chunk_size),
           _total_length(total_length),
-          _use_proto(use_proto) {}
+          _use_proto(use_proto),
+          _id(id) {
+    std::lock_guard<std::mutex> l(g_streamloadpipes_lock);
+    g_streamloadpipes[_id] = this; // registers this pipe; an existing entry with the same id is overwritten
+}
 
 StreamLoadPipe::~StreamLoadPipe() {
     SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(ExecEnv::GetInstance()->orphan_mem_tracker());
     while (!_buf_queue.empty()) {
         _buf_queue.pop_front();
     }
+    { // Unregister from the global pipe map.
+        std::lock_guard<std::mutex> l(g_streamloadpipes_lock);
+        g_streamloadpipes.erase(_id); // erases by key: removes whatever pipe is currently stored under _id
+    }
 }
 
 Status StreamLoadPipe::read_at_impl(size_t /*offset*/, Slice result, size_t* bytes_read,
@@ -113,6 +124,7 @@ Status StreamLoadPipe::read_one_message(std::unique_ptr<uint8_t[]>* data, size_t
 }
 
 Status StreamLoadPipe::append_and_flush(const char* data, size_t size, size_t proto_byte_size) {
+    _last_active = GetCurrentTimeMicros();
     ByteBufferPtr buf = ByteBuffer::allocate(BitUtil::RoundUpToPowerOfTwo(size + 1));
     buf->put_bytes(data, size);
     buf->flip();
@@ -120,6 +132,7 @@ Status StreamLoadPipe::append_and_flush(const char* data, size_t size, size_t pr
 }
 
 Status StreamLoadPipe::append(std::unique_ptr<PDataRow>&& row) {
+    _last_active = GetCurrentTimeMicros();
     PDataRow* row_ptr = row.get();
     {
         std::unique_lock<std::mutex> l(_lock);
@@ -130,6 +143,7 @@ Status StreamLoadPipe::append(std::unique_ptr<PDataRow>&& row) {
 }
 
 Status StreamLoadPipe::append(const char* data, size_t size) {
+    _last_active = GetCurrentTimeMicros();
     size_t pos = 0;
     if (_write_buf != nullptr) {
         if (size < _write_buf->remaining()) {
@@ -153,6 +167,7 @@ Status StreamLoadPipe::append(const char* data, size_t size) {
 }
 
 Status StreamLoadPipe::append(const ByteBufferPtr& buf) {
+    _last_active = GetCurrentTimeMicros();
     if (_write_buf != nullptr) {
         _write_buf->flip();
         RETURN_IF_ERROR(_append(_write_buf));
@@ -226,6 +241,7 @@ Status StreamLoadPipe::_append(const ByteBufferPtr& buf, size_t proto_byte_size)
 
 // called when producer finished
 Status StreamLoadPipe::finish() {
+    LOG(INFO) << "finish pipe=" << this;
     if (_write_buf != nullptr) {
         _write_buf->flip();
         _append(_write_buf);
@@ -241,11 +257,16 @@ Status StreamLoadPipe::finish() {
 
 // called when producer/consumer failed
 void StreamLoadPipe::cancel(const std::string& reason) {
+    LOG(INFO) << "cancel pipe=" << this;
     {
         std::lock_guard<std::mutex> l(_lock);
         _cancelled = true;
         _cancelled_reason = reason;
     }
+    { // A cancelled pipe is removed from g_streamloadpipes right away rather than at destruction.
+        std::lock_guard<std::mutex> l(g_streamloadpipes_lock); // taken after _lock has been released
+        g_streamloadpipes.erase(_id);
+    }
     _get_cond.notify_all();
     _put_cond.notify_all();
 }
diff --git a/be/src/io/fs/stream_load_pipe.h b/be/src/io/fs/stream_load_pipe.h
index 848175ce9a..e41f5162ac 100644
--- a/be/src/io/fs/stream_load_pipe.h
+++ b/be/src/io/fs/stream_load_pipe.h
@@ -34,10 +34,13 @@
 #include "runtime/message_body_sink.h"
 #include "util/byte_buffer.h"
 #include "util/slice.h"
+#include "util/time.h"
+#include "util/uid_util.h"
 
 namespace doris {
 namespace io {
 class IOContext;
+class StreamLoadPipe;
 
 static inline constexpr size_t kMaxPipeBufferedBytes = 4 * 1024 * 1024;
 
@@ -45,7 +48,7 @@ class StreamLoadPipe : public MessageBodySink, public FileReader {
 public:
     StreamLoadPipe(size_t max_buffered_bytes = kMaxPipeBufferedBytes,
                    size_t min_chunk_size = 64 * 1024, int64_t total_length = -1,
-                   bool use_proto = false);
+                   bool use_proto = false, UniqueId id = UniqueId(0, 0));
     ~StreamLoadPipe() override;
 
     Status append_and_flush(const char* data, size_t size, size_t proto_byte_size = 0);
@@ -78,6 +81,10 @@ public:
 
     size_t get_queue_size() { return _buf_queue.size(); }
 
+    bool is_cancelled() { return _cancelled; } // returns the flag set by cancel(); read here without taking _lock
+    bool is_finished() { return _finished; } // returns the _finished flag as-is
+    uint64_t last_active() { return _last_active; } // GetCurrentTimeMicros() value stored by the append()/append_and_flush() entry points; 0 until one runs
+
 protected:
     Status read_at_impl(size_t offset, Slice result, size_t* bytes_read,
                         const IOContext* io_ctx) override;
@@ -109,6 +116,8 @@ private:
     std::condition_variable _get_cond;
 
     ByteBufferPtr _write_buf;
+    UniqueId _id;
+    uint64_t _last_active = 0;
 
     // no use, only for compatibility with the `Path` interface
     Path _path = "";
diff --git a/be/src/runtime/exec_env.cpp b/be/src/runtime/exec_env.cpp
index 8f63d6fe6a..9215479f62 100644
--- a/be/src/runtime/exec_env.cpp
+++ b/be/src/runtime/exec_env.cpp
@@ -21,7 +21,7 @@
 
 namespace doris {
 
-ExecEnv::ExecEnv() : _is_init(false) {}
+ExecEnv::ExecEnv() : _is_init(false), _check_streamloadpipe_latch(1) {} // latch starts at 1; the check_streamloadpipe thread loops on wait_for() of this latch
 
 ExecEnv::~ExecEnv() {}
 
diff --git a/be/src/runtime/exec_env.h b/be/src/runtime/exec_env.h
index 46e1157820..00a58b956b 100644
--- a/be/src/runtime/exec_env.h
+++ b/be/src/runtime/exec_env.h
@@ -28,6 +28,8 @@
 
 #include "common/status.h"
 #include "olap/options.h"
+#include "util/countdown_latch.h"
+#include "util/thread.h"
 #include "util/threadpool.h"
 
 namespace doris {
@@ -191,6 +193,8 @@ private:
     void _register_metrics();
     void _deregister_metrics();
 
+    void _check_streamloadpipe();
+
     bool _is_init;
     std::vector<StorePath> _store_paths;
     // path => store index
@@ -256,6 +260,8 @@ private:
     BlockSpillManager* _block_spill_mgr = nullptr;
     // To save meta info of external file, such as parquet footer.
     FileMetaCache* _file_meta_cache = nullptr;
+    CountDownLatch _check_streamloadpipe_latch;
+    scoped_refptr<Thread> _check_streamloadpipe_thread;
 };
 
 template <>
diff --git a/be/src/runtime/exec_env_init.cpp b/be/src/runtime/exec_env_init.cpp
index d524fa13e2..40cccf0b77 100644
--- a/be/src/runtime/exec_env_init.cpp
+++ b/be/src/runtime/exec_env_init.cpp
@@ -27,6 +27,7 @@
 #include <limits>
 #include <map>
 #include <memory>
+#include <mutex>
 #include <ostream>
 #include <string>
 #include <unordered_map>
@@ -37,6 +38,7 @@
 #include "common/logging.h"
 #include "common/status.h"
 #include "io/fs/file_meta_cache.h"
+#include "io/fs/stream_load_pipe.h"
 #include "olap/olap_define.h"
 #include "olap/options.h"
 #include "olap/page_cache.h"
@@ -77,6 +79,8 @@
 #include "util/parse_util.h"
 #include "util/pretty_printer.h"
 #include "util/threadpool.h"
+#include "util/time.h"
+#include "util/uid_util.h"
 #include "vec/exec/scan/scanner_scheduler.h"
 #include "vec/runtime/vdata_stream_mgr.h"
 
@@ -85,7 +89,15 @@
 #include "runtime/memory/tcmalloc_hook.h"
 #endif
 
+namespace doris::io {
+extern std::map<UniqueId, StreamLoadPipe*> g_streamloadpipes;
+extern std::mutex g_streamloadpipes_lock;
+} // namespace doris::io
+
 namespace doris {
+
+using namespace io;
+
 class PBackendService_Stub;
 class PFunctionService_Stub;
 
@@ -95,6 +107,9 @@ DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(send_batch_thread_pool_queue_size, MetricUnit
 DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(download_cache_thread_pool_thread_num, MetricUnit::NOUNIT);
 DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(download_cache_thread_pool_queue_size, MetricUnit::NOUNIT);
 
+bvar::Adder<uint64_t> g_byte_buffer_allocate_kb;
+bvar::Adder<uint64_t> g_byte_buffer_cnt;
+
 Status ExecEnv::init(ExecEnv* env, const std::vector<StorePath>& store_paths) {
     return env->_init(store_paths);
 }
@@ -180,6 +195,16 @@ Status ExecEnv::_init(const std::vector<StorePath>& store_paths) {
 
     _init_mem_env();
 
+    RETURN_IF_ERROR(Thread::create(
+            "ExecEnv", "check_streamloadpipe",
+            [this]() {
+                uint32_t interval = 300;
+                while (!_check_streamloadpipe_latch.wait_for(std::chrono::seconds(interval))) {
+                    _check_streamloadpipe();
+                }
+            },
+            &_check_streamloadpipe_thread));
+
     RETURN_IF_ERROR(_load_channel_mgr->init(MemInfo::mem_limit()));
     _heartbeat_flags = new HeartbeatFlags();
     _register_metrics();
@@ -207,6 +232,19 @@ Status ExecEnv::init_pipeline_task_scheduler() {
     return Status::OK();
 }
 
+void ExecEnv::_check_streamloadpipe() { // Logs each registered, non-cancelled StreamLoadPipe with the seconds since its last recorded activity.
+    uint64_t now = GetCurrentTimeMicros();
+    std::lock_guard<std::mutex> l(g_streamloadpipes_lock); // held for the whole scan, including the logging
+    for (auto& pipe : g_streamloadpipes) {
+        if (pipe.second == nullptr || pipe.second->is_cancelled()) { // is_finished() is not consulted here
+            continue;
+        }
+        uint64_t diff_s = abs((int64_t)now - (int64_t)pipe.second->last_active()) / 1000000; // microseconds -> whole seconds
+        LOG(INFO) << "active StreamLoadPipe=" << pipe.second
+                  << " diff_time_from_last_append=" << diff_s;
+    }
+}
+
 Status ExecEnv::_init_mem_env() {
     bool is_percent = false;
     std::stringstream ss;
diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp
index f61eae31b9..c98bebafa1 100644
--- a/be/src/runtime/fragment_mgr.cpp
+++ b/be/src/runtime/fragment_mgr.cpp
@@ -571,7 +571,7 @@ Status FragmentMgr::exec_plan_fragment(const TExecPlanFragmentParams& params) {
         stream_load_ctx->need_rollback = true;
         auto pipe = std::make_shared<io::StreamLoadPipe>(
                 io::kMaxPipeBufferedBytes /* max_buffered_bytes */, 64 * 1024 /* min_chunk_size */,
-                -1 /* total_length */, true /* use_proto */);
+                -1 /* total_length */, true /* use_proto */, stream_load_ctx->id);
         stream_load_ctx->body_sink = pipe;
         stream_load_ctx->pipe = pipe;
         stream_load_ctx->max_filter_ratio = params.txn_conf.max_filter_ratio;
diff --git a/be/src/runtime/stream_load/stream_load_context.cpp b/be/src/runtime/stream_load/stream_load_context.cpp
index f381ba097d..b0002c98dc 100644
--- a/be/src/runtime/stream_load/stream_load_context.cpp
+++ b/be/src/runtime/stream_load/stream_load_context.cpp
@@ -28,11 +28,14 @@
 #include <new>
 #include <sstream>
 
+#include "bvar/bvar.h"
 #include "common/logging.h"
 
 namespace doris {
 using namespace ErrorCode;
 
+bvar::Adder<int64> g_streamloadctx_obj_cnt("streamloadctx", "obj_cnt");
+
 std::string StreamLoadContext::to_json() const {
     rapidjson::StringBuffer s;
     rapidjson::PrettyWriter<rapidjson::StringBuffer> writer(s);
diff --git a/be/src/runtime/stream_load/stream_load_context.h b/be/src/runtime/stream_load/stream_load_context.h
index 0e004b12f5..c35d949780 100644
--- a/be/src/runtime/stream_load/stream_load_context.h
+++ b/be/src/runtime/stream_load/stream_load_context.h
@@ -31,6 +31,7 @@
 #include <utility>
 #include <vector>
 
+#include "bvar/bvar.h"
 #include "common/status.h"
 #include "common/utils.h"
 #include "runtime/exec_env.h"
@@ -86,15 +87,19 @@ public:
 
 class MessageBodySink;
 
+extern bvar::Adder<int64> g_streamloadctx_obj_cnt;
+
 class StreamLoadContext {
     ENABLE_FACTORY_CREATOR(StreamLoadContext);
 
 public:
     StreamLoadContext(ExecEnv* exec_env) : id(UniqueId::gen_uid()), _exec_env(exec_env) {
         start_millis = UnixMillis();
+        g_streamloadctx_obj_cnt << 1;
     }
 
     ~StreamLoadContext() {
+        g_streamloadctx_obj_cnt << -1;
         if (need_rollback) {
             _exec_env->stream_load_executor()->rollback_txn(this);
             need_rollback = false;
diff --git a/be/src/util/byte_buffer.h b/be/src/util/byte_buffer.h
index aab8fd42db..9c296030c5 100644
--- a/be/src/util/byte_buffer.h
+++ b/be/src/util/byte_buffer.h
@@ -22,10 +22,14 @@
 #include <cstddef>
 #include <memory>
 
+#include "bvar/bvar.h"
 #include "common/logging.h"
 
 namespace doris {
 
+extern bvar::Adder<uint64_t> g_byte_buffer_allocate_kb;
+extern bvar::Adder<uint64_t> g_byte_buffer_cnt;
+
 struct ByteBuffer;
 using ByteBufferPtr = std::shared_ptr<ByteBuffer>;
 
@@ -35,7 +39,11 @@ struct ByteBuffer {
         return ptr;
     }
 
-    ~ByteBuffer() { delete[] ptr; }
+    ~ByteBuffer() { // Releases the storage and reverses the counter updates made by the constructor.
+        delete[] ptr;
+        g_byte_buffer_allocate_kb << -(capacity / 1024); // subtracts capacity / 1024, the expression the constructor adds
+        g_byte_buffer_cnt << -1;
+    }
 
     void put_bytes(const char* data, size_t size) {
         memcpy(ptr + pos, data, size);
@@ -63,7 +71,10 @@ struct ByteBuffer {
 
 private:
     ByteBuffer(size_t capacity_)
-            : ptr(new char[capacity_]), pos(0), limit(capacity_), capacity(capacity_) {}
+            : ptr(new char[capacity_]), pos(0), limit(capacity_), capacity(capacity_) {
+        g_byte_buffer_allocate_kb << capacity / 1024; // adds capacity / 1024 to the allocated-KB counter
+        g_byte_buffer_cnt << 1; // one more ByteBuffer constructed
+    }
 };
 
 } // namespace doris


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org