You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by yi...@apache.org on 2023/06/17 02:39:10 UTC
[doris] branch master updated: Revert "[enhencement](streamload) add on_close callback for httpserver (#20826)" (#20927)
This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new aea719627d Revert "[enhencement](streamload) add on_close callback for httpserver (#20826)" (#20927)
aea719627d is described below
commit aea719627def18e9fd9a895a63909b2c83a6d379
Author: zhengyu <fr...@gmail.com>
AuthorDate: Sat Jun 17 10:39:02 2023 +0800
Revert "[enhencement](streamload) add on_close callback for httpserver (#20826)" (#20927)
This reverts commit 5b6761acb86852a93351b7b971eb2049fb567aaf.
---
be/src/http/action/stream_load.cpp | 4 +-
be/src/http/ev_http_server.cpp | 52 +---------------------
be/src/http/http_request.cpp | 9 +---
be/src/io/fs/stream_load_pipe.cpp | 25 +----------
be/src/io/fs/stream_load_pipe.h | 11 +----
be/src/runtime/exec_env.cpp | 2 +-
be/src/runtime/exec_env.h | 6 ---
be/src/runtime/exec_env_init.cpp | 38 ----------------
be/src/runtime/fragment_mgr.cpp | 2 +-
be/src/runtime/stream_load/stream_load_context.cpp | 3 --
be/src/runtime/stream_load/stream_load_context.h | 5 ---
be/src/util/byte_buffer.h | 15 +------
12 files changed, 11 insertions(+), 161 deletions(-)
diff --git a/be/src/http/action/stream_load.cpp b/be/src/http/action/stream_load.cpp
index 33a526cd81..fdf6f6eaca 100644
--- a/be/src/http/action/stream_load.cpp
+++ b/be/src/http/action/stream_load.cpp
@@ -243,7 +243,7 @@ int StreamLoadAction::on_header(HttpRequest* req) {
ctx->two_phase_commit = req->header(HTTP_TWO_PHASE_COMMIT) == "true" ? true : false;
LOG(INFO) << "new income streaming load request." << ctx->brief() << ", db=" << ctx->db
- << ", tbl=" << ctx->table << "two_phase_commit: " << ctx->two_phase_commit;
+ << ", tbl=" << ctx->table;
auto st = _on_header(req, ctx);
if (!st.ok()) {
@@ -407,7 +407,7 @@ Status StreamLoadAction::_process_put(HttpRequest* http_req,
if (ctx->use_streaming) {
auto pipe = std::make_shared<io::StreamLoadPipe>(
io::kMaxPipeBufferedBytes /* max_buffered_bytes */, 64 * 1024 /* min_chunk_size */,
- ctx->body_bytes /* total_length */, false, ctx->id);
+ ctx->body_bytes /* total_length */);
request.fileType = TFileType::FILE_STREAM;
ctx->body_sink = pipe;
ctx->pipe = pipe;
diff --git a/be/src/http/ev_http_server.cpp b/be/src/http/ev_http_server.cpp
index bdc6d420a7..b0743baee3 100644
--- a/be/src/http/ev_http_server.cpp
+++ b/be/src/http/ev_http_server.cpp
@@ -22,7 +22,6 @@
#include <butil/fd_utility.h>
// IWYU pragma: no_include <bthread/errno.h>
#include <errno.h> // IWYU pragma: keep
-#include <event.h>
#include <event2/event.h>
#include <event2/http.h>
#include <event2/http_struct.h>
@@ -36,14 +35,12 @@
#include <memory>
#include <sstream>
-#include "bvar/bvar.h"
#include "common/logging.h"
#include "http/http_channel.h"
#include "http/http_handler.h"
#include "http/http_headers.h"
#include "http/http_request.h"
#include "http/http_status.h"
-#include "mutex"
#include "service/backend_options.h"
#include "util/threadpool.h"
@@ -51,50 +48,15 @@ struct event_base;
struct evhttp;
namespace doris {
-static std::map<evhttp_connection*, HttpRequest*> g_conn_req_map;
-static std::mutex g_conn_req_map_lock;
static void on_chunked(struct evhttp_request* ev_req, void* param) {
HttpRequest* request = (HttpRequest*)ev_req->on_free_cb_arg;
request->handler()->on_chunk_data(request);
}
-static void on_close(evhttp_connection* con, void* arg) {
- HttpRequest* request = (HttpRequest*)arg;
- {
- std::lock_guard<std::mutex> l(g_conn_req_map_lock);
- auto itr = g_conn_req_map.find(con);
- if (itr != g_conn_req_map.end()) {
- if (itr->second) {
- if (itr->second != request) {
- LOG(WARNING) << "close connection. connection=" << con
- << " current HttpRequest=" << request
- << " but orginal HttpRequest=" << itr->second;
- }
- delete itr->second;
- }
- g_conn_req_map.erase(con);
- }
- }
-}
-
static void on_free(struct evhttp_request* ev_req, void* arg) {
HttpRequest* request = (HttpRequest*)arg;
- {
- std::lock_guard<std::mutex> l(g_conn_req_map_lock);
- auto itr = g_conn_req_map.find(ev_req->evcon);
- if (itr != g_conn_req_map.end()) {
- if (itr->second) {
- if (itr->second != request) {
- LOG(WARNING) << "free request. connection=" << ev_req->evcon
- << " current HttpRequest=" << request
- << " but orginal HttpRequest=" << itr->second;
- }
- delete itr->second;
- }
- g_conn_req_map.erase(ev_req->evcon);
- }
- }
+ delete request;
}
static void on_request(struct evhttp_request* ev_req, void* arg) {
@@ -163,7 +125,6 @@ void EvHttpServer::start() {
std::shared_ptr<evhttp> http(evhttp_new(base.get()),
[](evhttp* http) { evhttp_free(http); });
CHECK(http != nullptr) << "Couldn't create an evhttp.";
- evhttp_set_timeout(http.get(), 60 /* timeout in seconds */);
auto res = evhttp_accept_socket(http.get(), _server_fd);
CHECK(res >= 0) << "evhttp accept socket failed, res=" << res;
@@ -300,18 +261,7 @@ int EvHttpServer::on_header(struct evhttp_request* ev_req) {
evhttp_request_set_chunked_cb(ev_req, on_chunked);
}
- {
- std::lock_guard<std::mutex> l(g_conn_req_map_lock);
- g_conn_req_map.erase(ev_req->evcon);
- g_conn_req_map[ev_req->evcon] = request.get();
- }
-
- struct evhttp_connection* httpcon = evhttp_request_get_connection(ev_req);
- evhttp_connection_set_closecb(httpcon, on_close, request.get());
evhttp_request_set_on_free_cb(ev_req, on_free, request.release());
- struct bufferevent* bufev = evhttp_connection_get_bufferevent(httpcon);
- if (bufev) bufferevent_enable(bufev, EV_READ);
-
return 0;
}
diff --git a/be/src/http/http_request.cpp b/be/src/http/http_request.cpp
index 97274d0e01..14bde591b4 100644
--- a/be/src/http/http_request.cpp
+++ b/be/src/http/http_request.cpp
@@ -27,22 +27,15 @@
#include <unordered_map>
#include <utility>
-#include "bvar/bvar.h"
#include "http/http_handler.h"
-#include "util/url_coding.h"
namespace doris {
static std::string s_empty = "";
-bvar::Adder<int64_t> g_http_request_cnt("http_request", "request_cnt");
-
-HttpRequest::HttpRequest(evhttp_request* evhttp_request) : _ev_req(evhttp_request) {
- g_http_request_cnt << 1;
-}
+HttpRequest::HttpRequest(evhttp_request* evhttp_request) : _ev_req(evhttp_request) {}
HttpRequest::~HttpRequest() {
- g_http_request_cnt << -1;
if (_handler_ctx != nullptr) {
DCHECK(_handler != nullptr);
_handler->free_handler_ctx(_handler_ctx);
diff --git a/be/src/io/fs/stream_load_pipe.cpp b/be/src/io/fs/stream_load_pipe.cpp
index 2940a45e4d..25099c8796 100644
--- a/be/src/io/fs/stream_load_pipe.cpp
+++ b/be/src/io/fs/stream_load_pipe.cpp
@@ -33,31 +33,20 @@ namespace doris {
namespace io {
class IOContext;
-std::map<UniqueId, io::StreamLoadPipe*> g_streamloadpipes;
-std::mutex g_streamloadpipes_lock;
-
StreamLoadPipe::StreamLoadPipe(size_t max_buffered_bytes, size_t min_chunk_size,
- int64_t total_length, bool use_proto, UniqueId id)
+ int64_t total_length, bool use_proto)
: _buffered_bytes(0),
_proto_buffered_bytes(0),
_max_buffered_bytes(max_buffered_bytes),
_min_chunk_size(min_chunk_size),
_total_length(total_length),
- _use_proto(use_proto),
- _id(id) {
- std::lock_guard<std::mutex> l(g_streamloadpipes_lock);
- g_streamloadpipes[_id] = this;
-}
+ _use_proto(use_proto) {}
StreamLoadPipe::~StreamLoadPipe() {
SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(ExecEnv::GetInstance()->orphan_mem_tracker());
while (!_buf_queue.empty()) {
_buf_queue.pop_front();
}
- {
- std::lock_guard<std::mutex> l(g_streamloadpipes_lock);
- g_streamloadpipes.erase(_id);
- }
}
Status StreamLoadPipe::read_at_impl(size_t /*offset*/, Slice result, size_t* bytes_read,
@@ -124,7 +113,6 @@ Status StreamLoadPipe::read_one_message(std::unique_ptr<uint8_t[]>* data, size_t
}
Status StreamLoadPipe::append_and_flush(const char* data, size_t size, size_t proto_byte_size) {
- _last_active = GetCurrentTimeMicros();
ByteBufferPtr buf = ByteBuffer::allocate(BitUtil::RoundUpToPowerOfTwo(size + 1));
buf->put_bytes(data, size);
buf->flip();
@@ -132,7 +120,6 @@ Status StreamLoadPipe::append_and_flush(const char* data, size_t size, size_t pr
}
Status StreamLoadPipe::append(std::unique_ptr<PDataRow>&& row) {
- _last_active = GetCurrentTimeMicros();
PDataRow* row_ptr = row.get();
{
std::unique_lock<std::mutex> l(_lock);
@@ -143,7 +130,6 @@ Status StreamLoadPipe::append(std::unique_ptr<PDataRow>&& row) {
}
Status StreamLoadPipe::append(const char* data, size_t size) {
- _last_active = GetCurrentTimeMicros();
size_t pos = 0;
if (_write_buf != nullptr) {
if (size < _write_buf->remaining()) {
@@ -167,7 +153,6 @@ Status StreamLoadPipe::append(const char* data, size_t size) {
}
Status StreamLoadPipe::append(const ByteBufferPtr& buf) {
- _last_active = GetCurrentTimeMicros();
if (_write_buf != nullptr) {
_write_buf->flip();
RETURN_IF_ERROR(_append(_write_buf));
@@ -241,7 +226,6 @@ Status StreamLoadPipe::_append(const ByteBufferPtr& buf, size_t proto_byte_size)
// called when producer finished
Status StreamLoadPipe::finish() {
- LOG(INFO) << "finish pipe=" << this;
if (_write_buf != nullptr) {
_write_buf->flip();
_append(_write_buf);
@@ -257,16 +241,11 @@ Status StreamLoadPipe::finish() {
// called when producer/consumer failed
void StreamLoadPipe::cancel(const std::string& reason) {
- LOG(INFO) << "cancel pipe=" << this;
{
std::lock_guard<std::mutex> l(_lock);
_cancelled = true;
_cancelled_reason = reason;
}
- {
- std::lock_guard<std::mutex> l(g_streamloadpipes_lock);
- g_streamloadpipes.erase(_id);
- }
_get_cond.notify_all();
_put_cond.notify_all();
}
diff --git a/be/src/io/fs/stream_load_pipe.h b/be/src/io/fs/stream_load_pipe.h
index e41f5162ac..848175ce9a 100644
--- a/be/src/io/fs/stream_load_pipe.h
+++ b/be/src/io/fs/stream_load_pipe.h
@@ -34,13 +34,10 @@
#include "runtime/message_body_sink.h"
#include "util/byte_buffer.h"
#include "util/slice.h"
-#include "util/time.h"
-#include "util/uid_util.h"
namespace doris {
namespace io {
class IOContext;
-class StreamLoadPipe;
static inline constexpr size_t kMaxPipeBufferedBytes = 4 * 1024 * 1024;
@@ -48,7 +45,7 @@ class StreamLoadPipe : public MessageBodySink, public FileReader {
public:
StreamLoadPipe(size_t max_buffered_bytes = kMaxPipeBufferedBytes,
size_t min_chunk_size = 64 * 1024, int64_t total_length = -1,
- bool use_proto = false, UniqueId id = UniqueId(0, 0));
+ bool use_proto = false);
~StreamLoadPipe() override;
Status append_and_flush(const char* data, size_t size, size_t proto_byte_size = 0);
@@ -81,10 +78,6 @@ public:
size_t get_queue_size() { return _buf_queue.size(); }
- bool is_cancelled() { return _cancelled; }
- bool is_finished() { return _finished; }
- uint64_t last_active() { return _last_active; }
-
protected:
Status read_at_impl(size_t offset, Slice result, size_t* bytes_read,
const IOContext* io_ctx) override;
@@ -116,8 +109,6 @@ private:
std::condition_variable _get_cond;
ByteBufferPtr _write_buf;
- UniqueId _id;
- uint64_t _last_active = 0;
// no use, only for compatibility with the `Path` interface
Path _path = "";
diff --git a/be/src/runtime/exec_env.cpp b/be/src/runtime/exec_env.cpp
index 9215479f62..8f63d6fe6a 100644
--- a/be/src/runtime/exec_env.cpp
+++ b/be/src/runtime/exec_env.cpp
@@ -21,7 +21,7 @@
namespace doris {
-ExecEnv::ExecEnv() : _is_init(false), _check_streamloadpipe_latch(1) {}
+ExecEnv::ExecEnv() : _is_init(false) {}
ExecEnv::~ExecEnv() {}
diff --git a/be/src/runtime/exec_env.h b/be/src/runtime/exec_env.h
index 00a58b956b..46e1157820 100644
--- a/be/src/runtime/exec_env.h
+++ b/be/src/runtime/exec_env.h
@@ -28,8 +28,6 @@
#include "common/status.h"
#include "olap/options.h"
-#include "util/countdown_latch.h"
-#include "util/thread.h"
#include "util/threadpool.h"
namespace doris {
@@ -193,8 +191,6 @@ private:
void _register_metrics();
void _deregister_metrics();
- void _check_streamloadpipe();
-
bool _is_init;
std::vector<StorePath> _store_paths;
// path => store index
@@ -260,8 +256,6 @@ private:
BlockSpillManager* _block_spill_mgr = nullptr;
// To save meta info of external file, such as parquet footer.
FileMetaCache* _file_meta_cache = nullptr;
- CountDownLatch _check_streamloadpipe_latch;
- scoped_refptr<Thread> _check_streamloadpipe_thread;
};
template <>
diff --git a/be/src/runtime/exec_env_init.cpp b/be/src/runtime/exec_env_init.cpp
index 40cccf0b77..d524fa13e2 100644
--- a/be/src/runtime/exec_env_init.cpp
+++ b/be/src/runtime/exec_env_init.cpp
@@ -27,7 +27,6 @@
#include <limits>
#include <map>
#include <memory>
-#include <mutex>
#include <ostream>
#include <string>
#include <unordered_map>
@@ -38,7 +37,6 @@
#include "common/logging.h"
#include "common/status.h"
#include "io/fs/file_meta_cache.h"
-#include "io/fs/stream_load_pipe.h"
#include "olap/olap_define.h"
#include "olap/options.h"
#include "olap/page_cache.h"
@@ -79,8 +77,6 @@
#include "util/parse_util.h"
#include "util/pretty_printer.h"
#include "util/threadpool.h"
-#include "util/time.h"
-#include "util/uid_util.h"
#include "vec/exec/scan/scanner_scheduler.h"
#include "vec/runtime/vdata_stream_mgr.h"
@@ -89,15 +85,7 @@
#include "runtime/memory/tcmalloc_hook.h"
#endif
-namespace doris::io {
-extern std::map<UniqueId, StreamLoadPipe*> g_streamloadpipes;
-extern std::mutex g_streamloadpipes_lock;
-} // namespace doris::io
-
namespace doris {
-
-using namespace io;
-
class PBackendService_Stub;
class PFunctionService_Stub;
@@ -107,9 +95,6 @@ DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(send_batch_thread_pool_queue_size, MetricUnit
DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(download_cache_thread_pool_thread_num, MetricUnit::NOUNIT);
DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(download_cache_thread_pool_queue_size, MetricUnit::NOUNIT);
-bvar::Adder<uint64_t> g_byte_buffer_allocate_kb;
-bvar::Adder<uint64_t> g_byte_buffer_cnt;
-
Status ExecEnv::init(ExecEnv* env, const std::vector<StorePath>& store_paths) {
return env->_init(store_paths);
}
@@ -195,16 +180,6 @@ Status ExecEnv::_init(const std::vector<StorePath>& store_paths) {
_init_mem_env();
- RETURN_IF_ERROR(Thread::create(
- "ExecEnv", "check_streamloadpipe",
- [this]() {
- uint32_t interval = 300;
- while (!_check_streamloadpipe_latch.wait_for(std::chrono::seconds(interval))) {
- _check_streamloadpipe();
- }
- },
- &_check_streamloadpipe_thread));
-
RETURN_IF_ERROR(_load_channel_mgr->init(MemInfo::mem_limit()));
_heartbeat_flags = new HeartbeatFlags();
_register_metrics();
@@ -232,19 +207,6 @@ Status ExecEnv::init_pipeline_task_scheduler() {
return Status::OK();
}
-void ExecEnv::_check_streamloadpipe() {
- uint64_t now = GetCurrentTimeMicros();
- std::lock_guard<std::mutex> l(g_streamloadpipes_lock);
- for (auto& pipe : g_streamloadpipes) {
- if (pipe.second == nullptr || pipe.second->is_cancelled()) {
- continue;
- }
- uint64_t diff_s = abs((int64_t)now - (int64_t)pipe.second->last_active()) / 1000000;
- LOG(INFO) << "active StreamLoadPipe=" << pipe.second
- << " diff_time_from_last_append=" << diff_s;
- }
-}
-
Status ExecEnv::_init_mem_env() {
bool is_percent = false;
std::stringstream ss;
diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp
index 4581768199..648c9373d1 100644
--- a/be/src/runtime/fragment_mgr.cpp
+++ b/be/src/runtime/fragment_mgr.cpp
@@ -571,7 +571,7 @@ Status FragmentMgr::exec_plan_fragment(const TExecPlanFragmentParams& params) {
stream_load_ctx->need_rollback = true;
auto pipe = std::make_shared<io::StreamLoadPipe>(
io::kMaxPipeBufferedBytes /* max_buffered_bytes */, 64 * 1024 /* min_chunk_size */,
- -1 /* total_length */, true /* use_proto */, stream_load_ctx->id);
+ -1 /* total_length */, true /* use_proto */);
stream_load_ctx->body_sink = pipe;
stream_load_ctx->pipe = pipe;
stream_load_ctx->max_filter_ratio = params.txn_conf.max_filter_ratio;
diff --git a/be/src/runtime/stream_load/stream_load_context.cpp b/be/src/runtime/stream_load/stream_load_context.cpp
index b0002c98dc..f381ba097d 100644
--- a/be/src/runtime/stream_load/stream_load_context.cpp
+++ b/be/src/runtime/stream_load/stream_load_context.cpp
@@ -28,14 +28,11 @@
#include <new>
#include <sstream>
-#include "bvar/bvar.h"
#include "common/logging.h"
namespace doris {
using namespace ErrorCode;
-bvar::Adder<int64> g_streamloadctx_obj_cnt("streamloadctx", "obj_cnt");
-
std::string StreamLoadContext::to_json() const {
rapidjson::StringBuffer s;
rapidjson::PrettyWriter<rapidjson::StringBuffer> writer(s);
diff --git a/be/src/runtime/stream_load/stream_load_context.h b/be/src/runtime/stream_load/stream_load_context.h
index c35d949780..0e004b12f5 100644
--- a/be/src/runtime/stream_load/stream_load_context.h
+++ b/be/src/runtime/stream_load/stream_load_context.h
@@ -31,7 +31,6 @@
#include <utility>
#include <vector>
-#include "bvar/bvar.h"
#include "common/status.h"
#include "common/utils.h"
#include "runtime/exec_env.h"
@@ -87,19 +86,15 @@ public:
class MessageBodySink;
-extern bvar::Adder<int64> g_streamloadctx_obj_cnt;
-
class StreamLoadContext {
ENABLE_FACTORY_CREATOR(StreamLoadContext);
public:
StreamLoadContext(ExecEnv* exec_env) : id(UniqueId::gen_uid()), _exec_env(exec_env) {
start_millis = UnixMillis();
- g_streamloadctx_obj_cnt << 1;
}
~StreamLoadContext() {
- g_streamloadctx_obj_cnt << -1;
if (need_rollback) {
_exec_env->stream_load_executor()->rollback_txn(this);
need_rollback = false;
diff --git a/be/src/util/byte_buffer.h b/be/src/util/byte_buffer.h
index 9c296030c5..aab8fd42db 100644
--- a/be/src/util/byte_buffer.h
+++ b/be/src/util/byte_buffer.h
@@ -22,14 +22,10 @@
#include <cstddef>
#include <memory>
-#include "bvar/bvar.h"
#include "common/logging.h"
namespace doris {
-extern bvar::Adder<uint64_t> g_byte_buffer_allocate_kb;
-extern bvar::Adder<uint64_t> g_byte_buffer_cnt;
-
struct ByteBuffer;
using ByteBufferPtr = std::shared_ptr<ByteBuffer>;
@@ -39,11 +35,7 @@ struct ByteBuffer {
return ptr;
}
- ~ByteBuffer() {
- delete[] ptr;
- g_byte_buffer_allocate_kb << -(capacity / 1024);
- g_byte_buffer_cnt << -1;
- }
+ ~ByteBuffer() { delete[] ptr; }
void put_bytes(const char* data, size_t size) {
memcpy(ptr + pos, data, size);
@@ -71,10 +63,7 @@ struct ByteBuffer {
private:
ByteBuffer(size_t capacity_)
- : ptr(new char[capacity_]), pos(0), limit(capacity_), capacity(capacity_) {
- g_byte_buffer_allocate_kb << capacity / 1024;
- g_byte_buffer_cnt << 1;
- }
+ : ptr(new char[capacity_]), pos(0), limit(capacity_), capacity(capacity_) {}
};
} // namespace doris
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org