You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by da...@apache.org on 2023/06/15 05:44:08 UTC
[doris] branch master updated: [enhencement](streamload) add on_close callback for httpserver (#20826)
This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 5b6761acb8 [enhencement](streamload) add on_close callback for httpserver (#20826)
5b6761acb8 is described below
commit 5b6761acb86852a93351b7b971eb2049fb567aaf
Author: zhengyu <fr...@gmail.com>
AuthorDate: Thu Jun 15 13:44:02 2023 +0800
[enhencement](streamload) add on_close callback for httpserver (#20826)
Sometimes a connection cannot be released properly during on_free. We need
an on_close callback as a last resort.
Signed-off-by: freemandealer <fr...@gmail.com>
---
be/src/http/action/stream_load.cpp | 4 +-
be/src/http/ev_http_server.cpp | 52 +++++++++++++++++++++-
be/src/http/http_request.cpp | 9 +++-
be/src/io/fs/stream_load_pipe.cpp | 25 ++++++++++-
be/src/io/fs/stream_load_pipe.h | 11 ++++-
be/src/runtime/exec_env.cpp | 2 +-
be/src/runtime/exec_env.h | 6 +++
be/src/runtime/exec_env_init.cpp | 38 ++++++++++++++++
be/src/runtime/fragment_mgr.cpp | 2 +-
be/src/runtime/stream_load/stream_load_context.cpp | 3 ++
be/src/runtime/stream_load/stream_load_context.h | 5 +++
be/src/util/byte_buffer.h | 15 ++++++-
12 files changed, 161 insertions(+), 11 deletions(-)
diff --git a/be/src/http/action/stream_load.cpp b/be/src/http/action/stream_load.cpp
index fdf6f6eaca..33a526cd81 100644
--- a/be/src/http/action/stream_load.cpp
+++ b/be/src/http/action/stream_load.cpp
@@ -243,7 +243,7 @@ int StreamLoadAction::on_header(HttpRequest* req) {
ctx->two_phase_commit = req->header(HTTP_TWO_PHASE_COMMIT) == "true" ? true : false;
LOG(INFO) << "new income streaming load request." << ctx->brief() << ", db=" << ctx->db
- << ", tbl=" << ctx->table;
+ << ", tbl=" << ctx->table << "two_phase_commit: " << ctx->two_phase_commit;
auto st = _on_header(req, ctx);
if (!st.ok()) {
@@ -407,7 +407,7 @@ Status StreamLoadAction::_process_put(HttpRequest* http_req,
if (ctx->use_streaming) {
auto pipe = std::make_shared<io::StreamLoadPipe>(
io::kMaxPipeBufferedBytes /* max_buffered_bytes */, 64 * 1024 /* min_chunk_size */,
- ctx->body_bytes /* total_length */);
+ ctx->body_bytes /* total_length */, false, ctx->id);
request.fileType = TFileType::FILE_STREAM;
ctx->body_sink = pipe;
ctx->pipe = pipe;
diff --git a/be/src/http/ev_http_server.cpp b/be/src/http/ev_http_server.cpp
index b0743baee3..bdc6d420a7 100644
--- a/be/src/http/ev_http_server.cpp
+++ b/be/src/http/ev_http_server.cpp
@@ -22,6 +22,7 @@
#include <butil/fd_utility.h>
// IWYU pragma: no_include <bthread/errno.h>
#include <errno.h> // IWYU pragma: keep
+#include <event.h>
#include <event2/event.h>
#include <event2/http.h>
#include <event2/http_struct.h>
@@ -35,12 +36,14 @@
#include <memory>
#include <sstream>
+#include "bvar/bvar.h"
#include "common/logging.h"
#include "http/http_channel.h"
#include "http/http_handler.h"
#include "http/http_headers.h"
#include "http/http_request.h"
#include "http/http_status.h"
+#include "mutex"
#include "service/backend_options.h"
#include "util/threadpool.h"
@@ -48,15 +51,50 @@ struct event_base;
struct evhttp;
namespace doris {
+static std::map<evhttp_connection*, HttpRequest*> g_conn_req_map;
+static std::mutex g_conn_req_map_lock;
static void on_chunked(struct evhttp_request* ev_req, void* param) {
HttpRequest* request = (HttpRequest*)ev_req->on_free_cb_arg;
request->handler()->on_chunk_data(request);
}
+static void on_close(evhttp_connection* con, void* arg) {
+ HttpRequest* request = (HttpRequest*)arg;
+ {
+ std::lock_guard<std::mutex> l(g_conn_req_map_lock);
+ auto itr = g_conn_req_map.find(con);
+ if (itr != g_conn_req_map.end()) {
+ if (itr->second) {
+ if (itr->second != request) {
+ LOG(WARNING) << "close connection. connection=" << con
+ << " current HttpRequest=" << request
+ << " but orginal HttpRequest=" << itr->second;
+ }
+ delete itr->second;
+ }
+ g_conn_req_map.erase(con);
+ }
+ }
+}
+
static void on_free(struct evhttp_request* ev_req, void* arg) {
HttpRequest* request = (HttpRequest*)arg;
- delete request;
+ {
+ std::lock_guard<std::mutex> l(g_conn_req_map_lock);
+ auto itr = g_conn_req_map.find(ev_req->evcon);
+ if (itr != g_conn_req_map.end()) {
+ if (itr->second) {
+ if (itr->second != request) {
+ LOG(WARNING) << "free request. connection=" << ev_req->evcon
+ << " current HttpRequest=" << request
+ << " but orginal HttpRequest=" << itr->second;
+ }
+ delete itr->second;
+ }
+ g_conn_req_map.erase(ev_req->evcon);
+ }
+ }
}
static void on_request(struct evhttp_request* ev_req, void* arg) {
@@ -125,6 +163,7 @@ void EvHttpServer::start() {
std::shared_ptr<evhttp> http(evhttp_new(base.get()),
[](evhttp* http) { evhttp_free(http); });
CHECK(http != nullptr) << "Couldn't create an evhttp.";
+ evhttp_set_timeout(http.get(), 60 /* timeout in seconds */);
auto res = evhttp_accept_socket(http.get(), _server_fd);
CHECK(res >= 0) << "evhttp accept socket failed, res=" << res;
@@ -261,7 +300,18 @@ int EvHttpServer::on_header(struct evhttp_request* ev_req) {
evhttp_request_set_chunked_cb(ev_req, on_chunked);
}
+ {
+ std::lock_guard<std::mutex> l(g_conn_req_map_lock);
+ g_conn_req_map.erase(ev_req->evcon);
+ g_conn_req_map[ev_req->evcon] = request.get();
+ }
+
+ struct evhttp_connection* httpcon = evhttp_request_get_connection(ev_req);
+ evhttp_connection_set_closecb(httpcon, on_close, request.get());
evhttp_request_set_on_free_cb(ev_req, on_free, request.release());
+ struct bufferevent* bufev = evhttp_connection_get_bufferevent(httpcon);
+ if (bufev) bufferevent_enable(bufev, EV_READ);
+
return 0;
}
diff --git a/be/src/http/http_request.cpp b/be/src/http/http_request.cpp
index 14bde591b4..97274d0e01 100644
--- a/be/src/http/http_request.cpp
+++ b/be/src/http/http_request.cpp
@@ -27,15 +27,22 @@
#include <unordered_map>
#include <utility>
+#include "bvar/bvar.h"
#include "http/http_handler.h"
+#include "util/url_coding.h"
namespace doris {
static std::string s_empty = "";
-HttpRequest::HttpRequest(evhttp_request* evhttp_request) : _ev_req(evhttp_request) {}
+bvar::Adder<int64_t> g_http_request_cnt("http_request", "request_cnt");
+
+HttpRequest::HttpRequest(evhttp_request* evhttp_request) : _ev_req(evhttp_request) {
+ g_http_request_cnt << 1;
+}
HttpRequest::~HttpRequest() {
+ g_http_request_cnt << -1;
if (_handler_ctx != nullptr) {
DCHECK(_handler != nullptr);
_handler->free_handler_ctx(_handler_ctx);
diff --git a/be/src/io/fs/stream_load_pipe.cpp b/be/src/io/fs/stream_load_pipe.cpp
index 25099c8796..2940a45e4d 100644
--- a/be/src/io/fs/stream_load_pipe.cpp
+++ b/be/src/io/fs/stream_load_pipe.cpp
@@ -33,20 +33,31 @@ namespace doris {
namespace io {
class IOContext;
+std::map<UniqueId, io::StreamLoadPipe*> g_streamloadpipes;
+std::mutex g_streamloadpipes_lock;
+
StreamLoadPipe::StreamLoadPipe(size_t max_buffered_bytes, size_t min_chunk_size,
- int64_t total_length, bool use_proto)
+ int64_t total_length, bool use_proto, UniqueId id)
: _buffered_bytes(0),
_proto_buffered_bytes(0),
_max_buffered_bytes(max_buffered_bytes),
_min_chunk_size(min_chunk_size),
_total_length(total_length),
- _use_proto(use_proto) {}
+ _use_proto(use_proto),
+ _id(id) {
+ std::lock_guard<std::mutex> l(g_streamloadpipes_lock);
+ g_streamloadpipes[_id] = this;
+}
StreamLoadPipe::~StreamLoadPipe() {
SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(ExecEnv::GetInstance()->orphan_mem_tracker());
while (!_buf_queue.empty()) {
_buf_queue.pop_front();
}
+ {
+ std::lock_guard<std::mutex> l(g_streamloadpipes_lock);
+ g_streamloadpipes.erase(_id);
+ }
}
Status StreamLoadPipe::read_at_impl(size_t /*offset*/, Slice result, size_t* bytes_read,
@@ -113,6 +124,7 @@ Status StreamLoadPipe::read_one_message(std::unique_ptr<uint8_t[]>* data, size_t
}
Status StreamLoadPipe::append_and_flush(const char* data, size_t size, size_t proto_byte_size) {
+ _last_active = GetCurrentTimeMicros();
ByteBufferPtr buf = ByteBuffer::allocate(BitUtil::RoundUpToPowerOfTwo(size + 1));
buf->put_bytes(data, size);
buf->flip();
@@ -120,6 +132,7 @@ Status StreamLoadPipe::append_and_flush(const char* data, size_t size, size_t pr
}
Status StreamLoadPipe::append(std::unique_ptr<PDataRow>&& row) {
+ _last_active = GetCurrentTimeMicros();
PDataRow* row_ptr = row.get();
{
std::unique_lock<std::mutex> l(_lock);
@@ -130,6 +143,7 @@ Status StreamLoadPipe::append(std::unique_ptr<PDataRow>&& row) {
}
Status StreamLoadPipe::append(const char* data, size_t size) {
+ _last_active = GetCurrentTimeMicros();
size_t pos = 0;
if (_write_buf != nullptr) {
if (size < _write_buf->remaining()) {
@@ -153,6 +167,7 @@ Status StreamLoadPipe::append(const char* data, size_t size) {
}
Status StreamLoadPipe::append(const ByteBufferPtr& buf) {
+ _last_active = GetCurrentTimeMicros();
if (_write_buf != nullptr) {
_write_buf->flip();
RETURN_IF_ERROR(_append(_write_buf));
@@ -226,6 +241,7 @@ Status StreamLoadPipe::_append(const ByteBufferPtr& buf, size_t proto_byte_size)
// called when producer finished
Status StreamLoadPipe::finish() {
+ LOG(INFO) << "finish pipe=" << this;
if (_write_buf != nullptr) {
_write_buf->flip();
_append(_write_buf);
@@ -241,11 +257,16 @@ Status StreamLoadPipe::finish() {
// called when producer/consumer failed
void StreamLoadPipe::cancel(const std::string& reason) {
+ LOG(INFO) << "cancel pipe=" << this;
{
std::lock_guard<std::mutex> l(_lock);
_cancelled = true;
_cancelled_reason = reason;
}
+ {
+ std::lock_guard<std::mutex> l(g_streamloadpipes_lock);
+ g_streamloadpipes.erase(_id);
+ }
_get_cond.notify_all();
_put_cond.notify_all();
}
diff --git a/be/src/io/fs/stream_load_pipe.h b/be/src/io/fs/stream_load_pipe.h
index 848175ce9a..e41f5162ac 100644
--- a/be/src/io/fs/stream_load_pipe.h
+++ b/be/src/io/fs/stream_load_pipe.h
@@ -34,10 +34,13 @@
#include "runtime/message_body_sink.h"
#include "util/byte_buffer.h"
#include "util/slice.h"
+#include "util/time.h"
+#include "util/uid_util.h"
namespace doris {
namespace io {
class IOContext;
+class StreamLoadPipe;
static inline constexpr size_t kMaxPipeBufferedBytes = 4 * 1024 * 1024;
@@ -45,7 +48,7 @@ class StreamLoadPipe : public MessageBodySink, public FileReader {
public:
StreamLoadPipe(size_t max_buffered_bytes = kMaxPipeBufferedBytes,
size_t min_chunk_size = 64 * 1024, int64_t total_length = -1,
- bool use_proto = false);
+ bool use_proto = false, UniqueId id = UniqueId(0, 0));
~StreamLoadPipe() override;
Status append_and_flush(const char* data, size_t size, size_t proto_byte_size = 0);
@@ -78,6 +81,10 @@ public:
size_t get_queue_size() { return _buf_queue.size(); }
+ bool is_cancelled() { return _cancelled; }
+ bool is_finished() { return _finished; }
+ uint64_t last_active() { return _last_active; }
+
protected:
Status read_at_impl(size_t offset, Slice result, size_t* bytes_read,
const IOContext* io_ctx) override;
@@ -109,6 +116,8 @@ private:
std::condition_variable _get_cond;
ByteBufferPtr _write_buf;
+ UniqueId _id;
+ uint64_t _last_active = 0;
// no use, only for compatibility with the `Path` interface
Path _path = "";
diff --git a/be/src/runtime/exec_env.cpp b/be/src/runtime/exec_env.cpp
index 8f63d6fe6a..9215479f62 100644
--- a/be/src/runtime/exec_env.cpp
+++ b/be/src/runtime/exec_env.cpp
@@ -21,7 +21,7 @@
namespace doris {
-ExecEnv::ExecEnv() : _is_init(false) {}
+ExecEnv::ExecEnv() : _is_init(false), _check_streamloadpipe_latch(1) {}
ExecEnv::~ExecEnv() {}
diff --git a/be/src/runtime/exec_env.h b/be/src/runtime/exec_env.h
index 46e1157820..00a58b956b 100644
--- a/be/src/runtime/exec_env.h
+++ b/be/src/runtime/exec_env.h
@@ -28,6 +28,8 @@
#include "common/status.h"
#include "olap/options.h"
+#include "util/countdown_latch.h"
+#include "util/thread.h"
#include "util/threadpool.h"
namespace doris {
@@ -191,6 +193,8 @@ private:
void _register_metrics();
void _deregister_metrics();
+ void _check_streamloadpipe();
+
bool _is_init;
std::vector<StorePath> _store_paths;
// path => store index
@@ -256,6 +260,8 @@ private:
BlockSpillManager* _block_spill_mgr = nullptr;
// To save meta info of external file, such as parquet footer.
FileMetaCache* _file_meta_cache = nullptr;
+ CountDownLatch _check_streamloadpipe_latch;
+ scoped_refptr<Thread> _check_streamloadpipe_thread;
};
template <>
diff --git a/be/src/runtime/exec_env_init.cpp b/be/src/runtime/exec_env_init.cpp
index d524fa13e2..40cccf0b77 100644
--- a/be/src/runtime/exec_env_init.cpp
+++ b/be/src/runtime/exec_env_init.cpp
@@ -27,6 +27,7 @@
#include <limits>
#include <map>
#include <memory>
+#include <mutex>
#include <ostream>
#include <string>
#include <unordered_map>
@@ -37,6 +38,7 @@
#include "common/logging.h"
#include "common/status.h"
#include "io/fs/file_meta_cache.h"
+#include "io/fs/stream_load_pipe.h"
#include "olap/olap_define.h"
#include "olap/options.h"
#include "olap/page_cache.h"
@@ -77,6 +79,8 @@
#include "util/parse_util.h"
#include "util/pretty_printer.h"
#include "util/threadpool.h"
+#include "util/time.h"
+#include "util/uid_util.h"
#include "vec/exec/scan/scanner_scheduler.h"
#include "vec/runtime/vdata_stream_mgr.h"
@@ -85,7 +89,15 @@
#include "runtime/memory/tcmalloc_hook.h"
#endif
+namespace doris::io {
+extern std::map<UniqueId, StreamLoadPipe*> g_streamloadpipes;
+extern std::mutex g_streamloadpipes_lock;
+} // namespace doris::io
+
namespace doris {
+
+using namespace io;
+
class PBackendService_Stub;
class PFunctionService_Stub;
@@ -95,6 +107,9 @@ DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(send_batch_thread_pool_queue_size, MetricUnit
DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(download_cache_thread_pool_thread_num, MetricUnit::NOUNIT);
DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(download_cache_thread_pool_queue_size, MetricUnit::NOUNIT);
+bvar::Adder<uint64_t> g_byte_buffer_allocate_kb;
+bvar::Adder<uint64_t> g_byte_buffer_cnt;
+
Status ExecEnv::init(ExecEnv* env, const std::vector<StorePath>& store_paths) {
return env->_init(store_paths);
}
@@ -180,6 +195,16 @@ Status ExecEnv::_init(const std::vector<StorePath>& store_paths) {
_init_mem_env();
+ RETURN_IF_ERROR(Thread::create(
+ "ExecEnv", "check_streamloadpipe",
+ [this]() {
+ uint32_t interval = 300;
+ while (!_check_streamloadpipe_latch.wait_for(std::chrono::seconds(interval))) {
+ _check_streamloadpipe();
+ }
+ },
+ &_check_streamloadpipe_thread));
+
RETURN_IF_ERROR(_load_channel_mgr->init(MemInfo::mem_limit()));
_heartbeat_flags = new HeartbeatFlags();
_register_metrics();
@@ -207,6 +232,19 @@ Status ExecEnv::init_pipeline_task_scheduler() {
return Status::OK();
}
+void ExecEnv::_check_streamloadpipe() {
+ uint64_t now = GetCurrentTimeMicros();
+ std::lock_guard<std::mutex> l(g_streamloadpipes_lock);
+ for (auto& pipe : g_streamloadpipes) {
+ if (pipe.second == nullptr || pipe.second->is_cancelled()) {
+ continue;
+ }
+ uint64_t diff_s = abs((int64_t)now - (int64_t)pipe.second->last_active()) / 1000000;
+ LOG(INFO) << "active StreamLoadPipe=" << pipe.second
+ << " diff_time_from_last_append=" << diff_s;
+ }
+}
+
Status ExecEnv::_init_mem_env() {
bool is_percent = false;
std::stringstream ss;
diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp
index f61eae31b9..c98bebafa1 100644
--- a/be/src/runtime/fragment_mgr.cpp
+++ b/be/src/runtime/fragment_mgr.cpp
@@ -571,7 +571,7 @@ Status FragmentMgr::exec_plan_fragment(const TExecPlanFragmentParams& params) {
stream_load_ctx->need_rollback = true;
auto pipe = std::make_shared<io::StreamLoadPipe>(
io::kMaxPipeBufferedBytes /* max_buffered_bytes */, 64 * 1024 /* min_chunk_size */,
- -1 /* total_length */, true /* use_proto */);
+ -1 /* total_length */, true /* use_proto */, stream_load_ctx->id);
stream_load_ctx->body_sink = pipe;
stream_load_ctx->pipe = pipe;
stream_load_ctx->max_filter_ratio = params.txn_conf.max_filter_ratio;
diff --git a/be/src/runtime/stream_load/stream_load_context.cpp b/be/src/runtime/stream_load/stream_load_context.cpp
index f381ba097d..b0002c98dc 100644
--- a/be/src/runtime/stream_load/stream_load_context.cpp
+++ b/be/src/runtime/stream_load/stream_load_context.cpp
@@ -28,11 +28,14 @@
#include <new>
#include <sstream>
+#include "bvar/bvar.h"
#include "common/logging.h"
namespace doris {
using namespace ErrorCode;
+bvar::Adder<int64> g_streamloadctx_obj_cnt("streamloadctx", "obj_cnt");
+
std::string StreamLoadContext::to_json() const {
rapidjson::StringBuffer s;
rapidjson::PrettyWriter<rapidjson::StringBuffer> writer(s);
diff --git a/be/src/runtime/stream_load/stream_load_context.h b/be/src/runtime/stream_load/stream_load_context.h
index 0e004b12f5..c35d949780 100644
--- a/be/src/runtime/stream_load/stream_load_context.h
+++ b/be/src/runtime/stream_load/stream_load_context.h
@@ -31,6 +31,7 @@
#include <utility>
#include <vector>
+#include "bvar/bvar.h"
#include "common/status.h"
#include "common/utils.h"
#include "runtime/exec_env.h"
@@ -86,15 +87,19 @@ public:
class MessageBodySink;
+extern bvar::Adder<int64> g_streamloadctx_obj_cnt;
+
class StreamLoadContext {
ENABLE_FACTORY_CREATOR(StreamLoadContext);
public:
StreamLoadContext(ExecEnv* exec_env) : id(UniqueId::gen_uid()), _exec_env(exec_env) {
start_millis = UnixMillis();
+ g_streamloadctx_obj_cnt << 1;
}
~StreamLoadContext() {
+ g_streamloadctx_obj_cnt << -1;
if (need_rollback) {
_exec_env->stream_load_executor()->rollback_txn(this);
need_rollback = false;
diff --git a/be/src/util/byte_buffer.h b/be/src/util/byte_buffer.h
index aab8fd42db..9c296030c5 100644
--- a/be/src/util/byte_buffer.h
+++ b/be/src/util/byte_buffer.h
@@ -22,10 +22,14 @@
#include <cstddef>
#include <memory>
+#include "bvar/bvar.h"
#include "common/logging.h"
namespace doris {
+extern bvar::Adder<uint64_t> g_byte_buffer_allocate_kb;
+extern bvar::Adder<uint64_t> g_byte_buffer_cnt;
+
struct ByteBuffer;
using ByteBufferPtr = std::shared_ptr<ByteBuffer>;
@@ -35,7 +39,11 @@ struct ByteBuffer {
return ptr;
}
- ~ByteBuffer() { delete[] ptr; }
+ ~ByteBuffer() {
+ delete[] ptr;
+ g_byte_buffer_allocate_kb << -(capacity / 1024);
+ g_byte_buffer_cnt << -1;
+ }
void put_bytes(const char* data, size_t size) {
memcpy(ptr + pos, data, size);
@@ -63,7 +71,10 @@ struct ByteBuffer {
private:
ByteBuffer(size_t capacity_)
- : ptr(new char[capacity_]), pos(0), limit(capacity_), capacity(capacity_) {}
+ : ptr(new char[capacity_]), pos(0), limit(capacity_), capacity(capacity_) {
+ g_byte_buffer_allocate_kb << capacity / 1024;
+ g_byte_buffer_cnt << 1;
+ }
};
} // namespace doris
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org