You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@brpc.apache.org by ji...@apache.org on 2022/08/17 19:34:10 UTC
[incubator-brpc] branch master updated: Restruct event_dispatcher source file (#1888)
This is an automated email from the ASF dual-hosted git repository.
jiashunzhu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-brpc.git
The following commit(s) were added to refs/heads/master by this push:
new 93038edc Restruct event_dispatcher source file (#1888)
93038edc is described below
commit 93038edc7f1e02ec8646e7a2ed6cb7bec135bbfe
Author: 果冻虾仁 <gu...@apache.org>
AuthorDate: Thu Aug 18 03:34:00 2022 +0800
Restruct event_dispatcher source file (#1888)
Split event_dispatcher into separate operating system specific files
---
BUILD.bazel | 6 +-
CMakeLists.txt | 5 +
Makefile | 3 +-
src/brpc/event_dispatcher.cpp | 323 +--------------------
src/brpc/event_dispatcher.h | 3 -
...t_dispatcher.cpp => event_dispatcher_epoll.cpp} | 147 +---------
..._dispatcher.cpp => event_dispatcher_kqueue.cpp} | 179 +-----------
7 files changed, 35 insertions(+), 631 deletions(-)
diff --git a/BUILD.bazel b/BUILD.bazel
index 7592a186..0cb60fa4 100644
--- a/BUILD.bazel
+++ b/BUILD.bazel
@@ -495,6 +495,8 @@ cc_library(
"src/brpc/thrift_service.cpp",
"src/brpc/thrift_message.cpp",
"src/brpc/policy/thrift_protocol.cpp",
+ "src/brpc/event_dispatcher_epoll.cpp",
+ "src/brpc/event_dispatcher_kqueue.cpp",
]) + select({
":with_thrift" : glob([
"src/brpc/thrift*.cpp",
@@ -503,7 +505,9 @@ cc_library(
}),
hdrs = glob([
"src/brpc/*.h",
- "src/brpc/**/*.h"
+ "src/brpc/**/*.h",
+ "src/brpc/event_dispatcher_epoll.cpp",
+ "src/brpc/event_dispatcher_kqueue.cpp",
]),
includes = [
"src/",
diff --git a/CMakeLists.txt b/CMakeLists.txt
index 5f46dc0e..713a090e 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -392,6 +392,7 @@ file(GLOB_RECURSE BTHREAD_SOURCES "${PROJECT_SOURCE_DIR}/src/bthread/*.cpp")
file(GLOB_RECURSE JSON2PB_SOURCES "${PROJECT_SOURCE_DIR}/src/json2pb/*.cpp")
file(GLOB_RECURSE BRPC_SOURCES "${PROJECT_SOURCE_DIR}/src/brpc/*.cpp")
file(GLOB_RECURSE THRIFT_SOURCES "${PROJECT_SOURCE_DIR}/src/brpc/thrift*.cpp")
+file(GLOB_RECURSE EXCLUDE_SOURCES "${PROJECT_SOURCE_DIR}/src/brpc/event_dispatcher_*.cpp")
if(WITH_THRIFT)
message("brpc compile with thrift protocol")
@@ -403,6 +404,10 @@ else()
set(THRIFT_SOURCES "")
endif()
+foreach(v ${EXCLUDE_SOURCES})
+ list(REMOVE_ITEM BRPC_SOURCES ${v})
+endforeach()
+
set(MCPACK2PB_SOURCES
${PROJECT_SOURCE_DIR}/src/mcpack2pb/field_type.cpp
${PROJECT_SOURCE_DIR}/src/mcpack2pb/mcpack2pb.cpp
diff --git a/Makefile b/Makefile
index a6d2fed4..f4ea71b4 100644
--- a/Makefile
+++ b/Makefile
@@ -196,8 +196,9 @@ JSON2PB_OBJS = $(addsuffix .o, $(basename $(JSON2PB_SOURCES)))
BRPC_DIRS = src/brpc src/brpc/details src/brpc/builtin src/brpc/policy
THRIFT_SOURCES = $(foreach d,$(BRPC_DIRS),$(wildcard $(addprefix $(d)/thrift*,$(SRCEXTS))))
+EXCLUDE_SOURCES = $(foreach d,$(BRPC_DIRS),$(wildcard $(addprefix $(d)/event_dispatcher_*,$(SRCEXTS))))
BRPC_SOURCES_ALL = $(foreach d,$(BRPC_DIRS),$(wildcard $(addprefix $(d)/*,$(SRCEXTS))))
-BRPC_SOURCES = $(filter-out $(THRIFT_SOURCES), $(BRPC_SOURCES_ALL))
+BRPC_SOURCES = $(filter-out $(THRIFT_SOURCES) $(EXCLUDE_SOURCES), $(BRPC_SOURCES_ALL))
BRPC_PROTOS = $(filter %.proto,$(BRPC_SOURCES))
BRPC_CFAMILIES = $(filter-out %.proto %.pb.cc,$(BRPC_SOURCES))
BRPC_OBJS = $(BRPC_PROTOS:.proto=.pb.o) $(addsuffix .o, $(basename $(BRPC_CFAMILIES)))
diff --git a/src/brpc/event_dispatcher.cpp b/src/brpc/event_dispatcher.cpp
index 56b48a76..e6209286 100644
--- a/src/brpc/event_dispatcher.cpp
+++ b/src/brpc/event_dispatcher.cpp
@@ -23,15 +23,7 @@
#include "butil/third_party/murmurhash3/murmurhash3.h"// fmix32
#include "bthread/bthread.h" // bthread_start_background
#include "brpc/event_dispatcher.h"
-#ifdef BRPC_SOCKET_HAS_EOF
-#include "brpc/details/has_epollrdhup.h"
-#endif
#include "brpc/reloadable_flags.h"
-#if defined(OS_MACOSX)
-#include <sys/types.h>
-#include <sys/event.h>
-#include <sys/time.h>
-#endif
namespace brpc {
@@ -40,313 +32,6 @@ DEFINE_int32(event_dispatcher_num, 1, "Number of event dispatcher");
DEFINE_bool(usercode_in_pthread, false,
"Call user's callback in pthreads, use bthreads otherwise");
-EventDispatcher::EventDispatcher()
- : _epfd(-1)
- , _stop(false)
- , _tid(0)
- , _consumer_thread_attr(BTHREAD_ATTR_NORMAL)
-{
-#if defined(OS_LINUX)
- _epfd = epoll_create(1024 * 1024);
- if (_epfd < 0) {
- PLOG(FATAL) << "Fail to create epoll";
- return;
- }
-#elif defined(OS_MACOSX)
- _epfd = kqueue();
- if (_epfd < 0) {
- PLOG(FATAL) << "Fail to create kqueue";
- return;
- }
-#else
- #error Not implemented
-#endif
- CHECK_EQ(0, butil::make_close_on_exec(_epfd));
-
- _wakeup_fds[0] = -1;
- _wakeup_fds[1] = -1;
- if (pipe(_wakeup_fds) != 0) {
- PLOG(FATAL) << "Fail to create pipe";
- return;
- }
-}
-
-EventDispatcher::~EventDispatcher() {
- Stop();
- Join();
- if (_epfd >= 0) {
- close(_epfd);
- _epfd = -1;
- }
- if (_wakeup_fds[0] > 0) {
- close(_wakeup_fds[0]);
- close(_wakeup_fds[1]);
- }
-}
-
-int EventDispatcher::Start(const bthread_attr_t* consumer_thread_attr) {
- if (_epfd < 0) {
-#if defined(OS_LINUX)
- LOG(FATAL) << "epoll was not created";
-#elif defined(OS_MACOSX)
- LOG(FATAL) << "kqueue was not created";
-#endif
- return -1;
- }
-
- if (_tid != 0) {
- LOG(FATAL) << "Already started this dispatcher(" << this
- << ") in bthread=" << _tid;
- return -1;
- }
-
- // Set _consumer_thread_attr before creating epoll/kqueue thread to make sure
- // everyting seems sane to the thread.
- _consumer_thread_attr = (consumer_thread_attr ?
- *consumer_thread_attr : BTHREAD_ATTR_NORMAL);
-
- //_consumer_thread_attr is used in StartInputEvent(), assign flag NEVER_QUIT to it will cause new bthread
- // that created by epoll_wait() never to quit.
- _epoll_thread_attr = _consumer_thread_attr | BTHREAD_NEVER_QUIT;
-
- // Polling thread uses the same attr for consumer threads (NORMAL right
- // now). Previously, we used small stack (32KB) which may be overflowed
- // when the older comlog (e.g. 3.1.85) calls com_openlog_r(). Since this
- // is also a potential issue for consumer threads, using the same attr
- // should be a reasonable solution.
- int rc = bthread_start_background(
- &_tid, &_epoll_thread_attr, RunThis, this);
- if (rc) {
- LOG(FATAL) << "Fail to create epoll/kqueue thread: " << berror(rc);
- return -1;
- }
- return 0;
-}
-
-bool EventDispatcher::Running() const {
- return !_stop && _epfd >= 0 && _tid != 0;
-}
-
-void EventDispatcher::Stop() {
- _stop = true;
-
- if (_epfd >= 0) {
-#if defined(OS_LINUX)
- epoll_event evt = { EPOLLOUT, { NULL } };
- epoll_ctl(_epfd, EPOLL_CTL_ADD, _wakeup_fds[1], &evt);
-#elif defined(OS_MACOSX)
- struct kevent kqueue_event;
- EV_SET(&kqueue_event, _wakeup_fds[1], EVFILT_WRITE, EV_ADD | EV_ENABLE,
- 0, 0, NULL);
- kevent(_epfd, &kqueue_event, 1, NULL, 0, NULL);
-#endif
- }
-}
-
-void EventDispatcher::Join() {
- if (_tid) {
- bthread_join(_tid, NULL);
- _tid = 0;
- }
-}
-
-int EventDispatcher::AddEpollOut(SocketId socket_id, int fd, bool pollin) {
- if (_epfd < 0) {
- errno = EINVAL;
- return -1;
- }
-
-#if defined(OS_LINUX)
- epoll_event evt;
- evt.data.u64 = socket_id;
- evt.events = EPOLLOUT | EPOLLET;
-#ifdef BRPC_SOCKET_HAS_EOF
- evt.events |= has_epollrdhup;
-#endif
- if (pollin) {
- evt.events |= EPOLLIN;
- if (epoll_ctl(_epfd, EPOLL_CTL_MOD, fd, &evt) < 0) {
- // This fd has been removed from epoll via `RemoveConsumer',
- // in which case errno will be ENOENT
- return -1;
- }
- } else {
- if (epoll_ctl(_epfd, EPOLL_CTL_ADD, fd, &evt) < 0) {
- return -1;
- }
- }
-#elif defined(OS_MACOSX)
- struct kevent evt;
- //TODO(zhujiashun): add EV_EOF
- EV_SET(&evt, fd, EVFILT_WRITE, EV_ADD | EV_ENABLE | EV_CLEAR,
- 0, 0, (void*)socket_id);
- if (kevent(_epfd, &evt, 1, NULL, 0, NULL) < 0) {
- return -1;
- }
- if (pollin) {
- EV_SET(&evt, fd, EVFILT_READ, EV_ADD | EV_ENABLE | EV_CLEAR,
- 0, 0, (void*)socket_id);
- if (kevent(_epfd, &evt, 1, NULL, 0, NULL) < 0) {
- return -1;
- }
- }
-#endif
- return 0;
-}
-
-int EventDispatcher::RemoveEpollOut(SocketId socket_id,
- int fd, bool pollin) {
-#if defined(OS_LINUX)
- if (pollin) {
- epoll_event evt;
- evt.data.u64 = socket_id;
- evt.events = EPOLLIN | EPOLLET;
-#ifdef BRPC_SOCKET_HAS_EOF
- evt.events |= has_epollrdhup;
-#endif
- return epoll_ctl(_epfd, EPOLL_CTL_MOD, fd, &evt);
- } else {
- return epoll_ctl(_epfd, EPOLL_CTL_DEL, fd, NULL);
- }
-#elif defined(OS_MACOSX)
- struct kevent evt;
- EV_SET(&evt, fd, EVFILT_WRITE, EV_DELETE, 0, 0, NULL);
- if (kevent(_epfd, &evt, 1, NULL, 0, NULL) < 0) {
- return -1;
- }
- if (pollin) {
- EV_SET(&evt, fd, EVFILT_READ, EV_ADD | EV_ENABLE | EV_CLEAR,
- 0, 0, (void*)socket_id);
- return kevent(_epfd, &evt, 1, NULL, 0, NULL);
- }
- return 0;
-#endif
- return -1;
-}
-
-int EventDispatcher::AddConsumer(SocketId socket_id, int fd) {
- if (_epfd < 0) {
- errno = EINVAL;
- return -1;
- }
-#if defined(OS_LINUX)
- epoll_event evt;
- evt.events = EPOLLIN | EPOLLET;
- evt.data.u64 = socket_id;
-#ifdef BRPC_SOCKET_HAS_EOF
- evt.events |= has_epollrdhup;
-#endif
- return epoll_ctl(_epfd, EPOLL_CTL_ADD, fd, &evt);
-#elif defined(OS_MACOSX)
- struct kevent evt;
- EV_SET(&evt, fd, EVFILT_READ, EV_ADD | EV_ENABLE | EV_CLEAR,
- 0, 0, (void*)socket_id);
- return kevent(_epfd, &evt, 1, NULL, 0, NULL);
-#endif
- return -1;
-}
-
-int EventDispatcher::RemoveConsumer(int fd) {
- if (fd < 0) {
- return -1;
- }
- // Removing the consumer from dispatcher before closing the fd because
- // if process was forked and the fd is not marked as close-on-exec,
- // closing does not set reference count of the fd to 0, thus does not
- // remove the fd from epoll. More badly, the fd will not be removable
- // from epoll again! If the fd was level-triggered and there's data left,
- // epoll_wait will keep returning events of the fd continuously, making
- // program abnormal.
-#if defined(OS_LINUX)
- if (epoll_ctl(_epfd, EPOLL_CTL_DEL, fd, NULL) < 0) {
- PLOG(WARNING) << "Fail to remove fd=" << fd << " from epfd=" << _epfd;
- return -1;
- }
-#elif defined(OS_MACOSX)
- struct kevent evt;
- EV_SET(&evt, fd, EVFILT_READ, EV_DELETE, 0, 0, NULL);
- kevent(_epfd, &evt, 1, NULL, 0, NULL);
- EV_SET(&evt, fd, EVFILT_WRITE, EV_DELETE, 0, 0, NULL);
- kevent(_epfd, &evt, 1, NULL, 0, NULL);
-#endif
- return 0;
-}
-
-void* EventDispatcher::RunThis(void* arg) {
- ((EventDispatcher*)arg)->Run();
- return NULL;
-}
-
-void EventDispatcher::Run() {
- while (!_stop) {
-#if defined(OS_LINUX)
- epoll_event e[32];
-#ifdef BRPC_ADDITIONAL_EPOLL
- // Performance downgrades in examples.
- int n = epoll_wait(_epfd, e, ARRAY_SIZE(e), 0);
- if (n == 0) {
- n = epoll_wait(_epfd, e, ARRAY_SIZE(e), -1);
- }
-#else
- const int n = epoll_wait(_epfd, e, ARRAY_SIZE(e), -1);
-#endif
-#elif defined(OS_MACOSX)
- struct kevent e[32];
- int n = kevent(_epfd, NULL, 0, e, ARRAY_SIZE(e), NULL);
-#endif
- if (_stop) {
- // epoll_ctl/epoll_wait should have some sort of memory fencing
- // guaranteeing that we(after epoll_wait) see _stop set before
- // epoll_ctl.
- break;
- }
- if (n < 0) {
- if (EINTR == errno) {
- // We've checked _stop, no wake-up will be missed.
- continue;
- }
-#if defined(OS_LINUX)
- PLOG(FATAL) << "Fail to epoll_wait epfd=" << _epfd;
-#elif defined(OS_MACOSX)
- PLOG(FATAL) << "Fail to kqueue epfd=" << _epfd;
-#endif
- break;
- }
- for (int i = 0; i < n; ++i) {
-#if defined(OS_LINUX)
- if (e[i].events & (EPOLLIN | EPOLLERR | EPOLLHUP)
-#ifdef BRPC_SOCKET_HAS_EOF
- || (e[i].events & has_epollrdhup)
-#endif
- ) {
- // We don't care about the return value.
- Socket::StartInputEvent(e[i].data.u64, e[i].events,
- _consumer_thread_attr);
- }
-#elif defined(OS_MACOSX)
- if ((e[i].flags & EV_ERROR) || e[i].filter == EVFILT_READ) {
- // We don't care about the return value.
- Socket::StartInputEvent((SocketId)e[i].udata, e[i].filter,
- _consumer_thread_attr);
- }
-#endif
- }
- for (int i = 0; i < n; ++i) {
-#if defined(OS_LINUX)
- if (e[i].events & (EPOLLOUT | EPOLLERR | EPOLLHUP)) {
- // We don't care about the return value.
- Socket::HandleEpollOut(e[i].data.u64);
- }
-#elif defined(OS_MACOSX)
- if ((e[i].flags & EV_ERROR) || e[i].filter == EVFILT_WRITE) {
- // We don't care about the return value.
- Socket::HandleEpollOut((SocketId)e[i].udata);
- }
-#endif
- }
- }
-}
-
static EventDispatcher* g_edisp = NULL;
static pthread_once_t g_edisp_once = PTHREAD_ONCE_INIT;
@@ -378,3 +63,11 @@ EventDispatcher& GetGlobalEventDispatcher(int fd) {
}
} // namespace brpc
+
+#if defined(OS_LINUX)
+ #include "brpc/event_dispatcher_epoll.cpp"
+#elif defined(OS_MACOSX)
+ #include "brpc/event_dispatcher_kqueue.cpp"
+#else
+ #error Not implemented
+#endif
diff --git a/src/brpc/event_dispatcher.h b/src/brpc/event_dispatcher.h
index eaa57e36..b6cae400 100644
--- a/src/brpc/event_dispatcher.h
+++ b/src/brpc/event_dispatcher.h
@@ -94,9 +94,6 @@ private:
// The attribute of bthreads calling user callbacks.
bthread_attr_t _consumer_thread_attr;
- // The attribute of bthread epoll_wait.
- bthread_attr_t _epoll_thread_attr;
-
// Pipe fds to wakeup EventDispatcher from `epoll_wait' in order to quit
int _wakeup_fds[2];
};
diff --git a/src/brpc/event_dispatcher.cpp b/src/brpc/event_dispatcher_epoll.cpp
similarity index 58%
copy from src/brpc/event_dispatcher.cpp
copy to src/brpc/event_dispatcher_epoll.cpp
index 56b48a76..07d485e6 100644
--- a/src/brpc/event_dispatcher.cpp
+++ b/src/brpc/event_dispatcher_epoll.cpp
@@ -16,51 +16,23 @@
// under the License.
-#include <gflags/gflags.h> // DEFINE_int32
-#include "butil/compat.h"
-#include "butil/fd_utility.h" // make_close_on_exec
-#include "butil/logging.h" // LOG
-#include "butil/third_party/murmurhash3/murmurhash3.h"// fmix32
-#include "bthread/bthread.h" // bthread_start_background
-#include "brpc/event_dispatcher.h"
#ifdef BRPC_SOCKET_HAS_EOF
#include "brpc/details/has_epollrdhup.h"
#endif
-#include "brpc/reloadable_flags.h"
-#if defined(OS_MACOSX)
-#include <sys/types.h>
-#include <sys/event.h>
-#include <sys/time.h>
-#endif
namespace brpc {
-DEFINE_int32(event_dispatcher_num, 1, "Number of event dispatcher");
-
-DEFINE_bool(usercode_in_pthread, false,
- "Call user's callback in pthreads, use bthreads otherwise");
-
EventDispatcher::EventDispatcher()
: _epfd(-1)
, _stop(false)
, _tid(0)
, _consumer_thread_attr(BTHREAD_ATTR_NORMAL)
{
-#if defined(OS_LINUX)
_epfd = epoll_create(1024 * 1024);
if (_epfd < 0) {
PLOG(FATAL) << "Fail to create epoll";
return;
}
-#elif defined(OS_MACOSX)
- _epfd = kqueue();
- if (_epfd < 0) {
- PLOG(FATAL) << "Fail to create kqueue";
- return;
- }
-#else
- #error Not implemented
-#endif
CHECK_EQ(0, butil::make_close_on_exec(_epfd));
_wakeup_fds[0] = -1;
@@ -86,11 +58,7 @@ EventDispatcher::~EventDispatcher() {
int EventDispatcher::Start(const bthread_attr_t* consumer_thread_attr) {
if (_epfd < 0) {
-#if defined(OS_LINUX)
LOG(FATAL) << "epoll was not created";
-#elif defined(OS_MACOSX)
- LOG(FATAL) << "kqueue was not created";
-#endif
return -1;
}
@@ -100,14 +68,14 @@ int EventDispatcher::Start(const bthread_attr_t* consumer_thread_attr) {
return -1;
}
- // Set _consumer_thread_attr before creating epoll/kqueue thread to make sure
+ // Set _consumer_thread_attr before creating epoll thread to make sure
// everyting seems sane to the thread.
_consumer_thread_attr = (consumer_thread_attr ?
*consumer_thread_attr : BTHREAD_ATTR_NORMAL);
//_consumer_thread_attr is used in StartInputEvent(), assign flag NEVER_QUIT to it will cause new bthread
// that created by epoll_wait() never to quit.
- _epoll_thread_attr = _consumer_thread_attr | BTHREAD_NEVER_QUIT;
+ bthread_attr_t epoll_thread_attr = _consumer_thread_attr | BTHREAD_NEVER_QUIT;
// Polling thread uses the same attr for consumer threads (NORMAL right
// now). Previously, we used small stack (32KB) which may be overflowed
@@ -115,9 +83,9 @@ int EventDispatcher::Start(const bthread_attr_t* consumer_thread_attr) {
// is also a potential issue for consumer threads, using the same attr
// should be a reasonable solution.
int rc = bthread_start_background(
- &_tid, &_epoll_thread_attr, RunThis, this);
+ &_tid, &epoll_thread_attr, RunThis, this);
if (rc) {
- LOG(FATAL) << "Fail to create epoll/kqueue thread: " << berror(rc);
+ LOG(FATAL) << "Fail to create epoll thread: " << berror(rc);
return -1;
}
return 0;
@@ -131,15 +99,8 @@ void EventDispatcher::Stop() {
_stop = true;
if (_epfd >= 0) {
-#if defined(OS_LINUX)
epoll_event evt = { EPOLLOUT, { NULL } };
epoll_ctl(_epfd, EPOLL_CTL_ADD, _wakeup_fds[1], &evt);
-#elif defined(OS_MACOSX)
- struct kevent kqueue_event;
- EV_SET(&kqueue_event, _wakeup_fds[1], EVFILT_WRITE, EV_ADD | EV_ENABLE,
- 0, 0, NULL);
- kevent(_epfd, &kqueue_event, 1, NULL, 0, NULL);
-#endif
}
}
@@ -156,7 +117,6 @@ int EventDispatcher::AddEpollOut(SocketId socket_id, int fd, bool pollin) {
return -1;
}
-#if defined(OS_LINUX)
epoll_event evt;
evt.data.u64 = socket_id;
evt.events = EPOLLOUT | EPOLLET;
@@ -175,28 +135,11 @@ int EventDispatcher::AddEpollOut(SocketId socket_id, int fd, bool pollin) {
return -1;
}
}
-#elif defined(OS_MACOSX)
- struct kevent evt;
- //TODO(zhujiashun): add EV_EOF
- EV_SET(&evt, fd, EVFILT_WRITE, EV_ADD | EV_ENABLE | EV_CLEAR,
- 0, 0, (void*)socket_id);
- if (kevent(_epfd, &evt, 1, NULL, 0, NULL) < 0) {
- return -1;
- }
- if (pollin) {
- EV_SET(&evt, fd, EVFILT_READ, EV_ADD | EV_ENABLE | EV_CLEAR,
- 0, 0, (void*)socket_id);
- if (kevent(_epfd, &evt, 1, NULL, 0, NULL) < 0) {
- return -1;
- }
- }
-#endif
return 0;
}
int EventDispatcher::RemoveEpollOut(SocketId socket_id,
int fd, bool pollin) {
-#if defined(OS_LINUX)
if (pollin) {
epoll_event evt;
evt.data.u64 = socket_id;
@@ -208,19 +151,6 @@ int EventDispatcher::RemoveEpollOut(SocketId socket_id,
} else {
return epoll_ctl(_epfd, EPOLL_CTL_DEL, fd, NULL);
}
-#elif defined(OS_MACOSX)
- struct kevent evt;
- EV_SET(&evt, fd, EVFILT_WRITE, EV_DELETE, 0, 0, NULL);
- if (kevent(_epfd, &evt, 1, NULL, 0, NULL) < 0) {
- return -1;
- }
- if (pollin) {
- EV_SET(&evt, fd, EVFILT_READ, EV_ADD | EV_ENABLE | EV_CLEAR,
- 0, 0, (void*)socket_id);
- return kevent(_epfd, &evt, 1, NULL, 0, NULL);
- }
- return 0;
-#endif
return -1;
}
@@ -229,7 +159,6 @@ int EventDispatcher::AddConsumer(SocketId socket_id, int fd) {
errno = EINVAL;
return -1;
}
-#if defined(OS_LINUX)
epoll_event evt;
evt.events = EPOLLIN | EPOLLET;
evt.data.u64 = socket_id;
@@ -237,12 +166,6 @@ int EventDispatcher::AddConsumer(SocketId socket_id, int fd) {
evt.events |= has_epollrdhup;
#endif
return epoll_ctl(_epfd, EPOLL_CTL_ADD, fd, &evt);
-#elif defined(OS_MACOSX)
- struct kevent evt;
- EV_SET(&evt, fd, EVFILT_READ, EV_ADD | EV_ENABLE | EV_CLEAR,
- 0, 0, (void*)socket_id);
- return kevent(_epfd, &evt, 1, NULL, 0, NULL);
-#endif
return -1;
}
@@ -257,18 +180,10 @@ int EventDispatcher::RemoveConsumer(int fd) {
// from epoll again! If the fd was level-triggered and there's data left,
// epoll_wait will keep returning events of the fd continuously, making
// program abnormal.
-#if defined(OS_LINUX)
if (epoll_ctl(_epfd, EPOLL_CTL_DEL, fd, NULL) < 0) {
PLOG(WARNING) << "Fail to remove fd=" << fd << " from epfd=" << _epfd;
return -1;
}
-#elif defined(OS_MACOSX)
- struct kevent evt;
- EV_SET(&evt, fd, EVFILT_READ, EV_DELETE, 0, 0, NULL);
- kevent(_epfd, &evt, 1, NULL, 0, NULL);
- EV_SET(&evt, fd, EVFILT_WRITE, EV_DELETE, 0, 0, NULL);
- kevent(_epfd, &evt, 1, NULL, 0, NULL);
-#endif
return 0;
}
@@ -279,7 +194,6 @@ void* EventDispatcher::RunThis(void* arg) {
void EventDispatcher::Run() {
while (!_stop) {
-#if defined(OS_LINUX)
epoll_event e[32];
#ifdef BRPC_ADDITIONAL_EPOLL
// Performance downgrades in examples.
@@ -289,10 +203,6 @@ void EventDispatcher::Run() {
}
#else
const int n = epoll_wait(_epfd, e, ARRAY_SIZE(e), -1);
-#endif
-#elif defined(OS_MACOSX)
- struct kevent e[32];
- int n = kevent(_epfd, NULL, 0, e, ARRAY_SIZE(e), NULL);
#endif
if (_stop) {
// epoll_ctl/epoll_wait should have some sort of memory fencing
@@ -305,15 +215,10 @@ void EventDispatcher::Run() {
// We've checked _stop, no wake-up will be missed.
continue;
}
-#if defined(OS_LINUX)
PLOG(FATAL) << "Fail to epoll_wait epfd=" << _epfd;
-#elif defined(OS_MACOSX)
- PLOG(FATAL) << "Fail to kqueue epfd=" << _epfd;
-#endif
break;
}
for (int i = 0; i < n; ++i) {
-#if defined(OS_LINUX)
if (e[i].events & (EPOLLIN | EPOLLERR | EPOLLHUP)
#ifdef BRPC_SOCKET_HAS_EOF
|| (e[i].events & has_epollrdhup)
@@ -323,58 +228,14 @@ void EventDispatcher::Run() {
Socket::StartInputEvent(e[i].data.u64, e[i].events,
_consumer_thread_attr);
}
-#elif defined(OS_MACOSX)
- if ((e[i].flags & EV_ERROR) || e[i].filter == EVFILT_READ) {
- // We don't care about the return value.
- Socket::StartInputEvent((SocketId)e[i].udata, e[i].filter,
- _consumer_thread_attr);
- }
-#endif
}
for (int i = 0; i < n; ++i) {
-#if defined(OS_LINUX)
if (e[i].events & (EPOLLOUT | EPOLLERR | EPOLLHUP)) {
// We don't care about the return value.
Socket::HandleEpollOut(e[i].data.u64);
}
-#elif defined(OS_MACOSX)
- if ((e[i].flags & EV_ERROR) || e[i].filter == EVFILT_WRITE) {
- // We don't care about the return value.
- Socket::HandleEpollOut((SocketId)e[i].udata);
- }
-#endif
}
}
}
-static EventDispatcher* g_edisp = NULL;
-static pthread_once_t g_edisp_once = PTHREAD_ONCE_INIT;
-
-static void StopAndJoinGlobalDispatchers() {
- for (int i = 0; i < FLAGS_event_dispatcher_num; ++i) {
- g_edisp[i].Stop();
- g_edisp[i].Join();
- }
-}
-void InitializeGlobalDispatchers() {
- g_edisp = new EventDispatcher[FLAGS_event_dispatcher_num];
- for (int i = 0; i < FLAGS_event_dispatcher_num; ++i) {
- const bthread_attr_t attr = FLAGS_usercode_in_pthread ?
- BTHREAD_ATTR_PTHREAD : BTHREAD_ATTR_NORMAL;
- CHECK_EQ(0, g_edisp[i].Start(&attr));
- }
- // This atexit is will be run before g_task_control.stop() because above
- // Start() initializes g_task_control by creating bthread (to run epoll/kqueue).
- CHECK_EQ(0, atexit(StopAndJoinGlobalDispatchers));
-}
-
-EventDispatcher& GetGlobalEventDispatcher(int fd) {
- pthread_once(&g_edisp_once, InitializeGlobalDispatchers);
- if (FLAGS_event_dispatcher_num == 1) {
- return g_edisp[0];
- }
- int index = butil::fmix32(fd) % FLAGS_event_dispatcher_num;
- return g_edisp[index];
-}
-
} // namespace brpc
diff --git a/src/brpc/event_dispatcher.cpp b/src/brpc/event_dispatcher_kqueue.cpp
similarity index 52%
copy from src/brpc/event_dispatcher.cpp
copy to src/brpc/event_dispatcher_kqueue.cpp
index 56b48a76..614cd3bc 100644
--- a/src/brpc/event_dispatcher.cpp
+++ b/src/brpc/event_dispatcher_kqueue.cpp
@@ -16,51 +16,23 @@
// under the License.
-#include <gflags/gflags.h> // DEFINE_int32
-#include "butil/compat.h"
-#include "butil/fd_utility.h" // make_close_on_exec
-#include "butil/logging.h" // LOG
-#include "butil/third_party/murmurhash3/murmurhash3.h"// fmix32
-#include "bthread/bthread.h" // bthread_start_background
-#include "brpc/event_dispatcher.h"
-#ifdef BRPC_SOCKET_HAS_EOF
-#include "brpc/details/has_epollrdhup.h"
-#endif
-#include "brpc/reloadable_flags.h"
-#if defined(OS_MACOSX)
#include <sys/types.h>
#include <sys/event.h>
#include <sys/time.h>
-#endif
namespace brpc {
-DEFINE_int32(event_dispatcher_num, 1, "Number of event dispatcher");
-
-DEFINE_bool(usercode_in_pthread, false,
- "Call user's callback in pthreads, use bthreads otherwise");
-
EventDispatcher::EventDispatcher()
: _epfd(-1)
, _stop(false)
, _tid(0)
, _consumer_thread_attr(BTHREAD_ATTR_NORMAL)
{
-#if defined(OS_LINUX)
- _epfd = epoll_create(1024 * 1024);
- if (_epfd < 0) {
- PLOG(FATAL) << "Fail to create epoll";
- return;
- }
-#elif defined(OS_MACOSX)
_epfd = kqueue();
if (_epfd < 0) {
PLOG(FATAL) << "Fail to create kqueue";
return;
}
-#else
- #error Not implemented
-#endif
CHECK_EQ(0, butil::make_close_on_exec(_epfd));
_wakeup_fds[0] = -1;
@@ -86,11 +58,7 @@ EventDispatcher::~EventDispatcher() {
int EventDispatcher::Start(const bthread_attr_t* consumer_thread_attr) {
if (_epfd < 0) {
-#if defined(OS_LINUX)
- LOG(FATAL) << "epoll was not created";
-#elif defined(OS_MACOSX)
LOG(FATAL) << "kqueue was not created";
-#endif
return -1;
}
@@ -100,14 +68,14 @@ int EventDispatcher::Start(const bthread_attr_t* consumer_thread_attr) {
return -1;
}
- // Set _consumer_thread_attr before creating epoll/kqueue thread to make sure
+ // Set _consumer_thread_attr before creating kqueue thread to make sure
// everyting seems sane to the thread.
_consumer_thread_attr = (consumer_thread_attr ?
*consumer_thread_attr : BTHREAD_ATTR_NORMAL);
//_consumer_thread_attr is used in StartInputEvent(), assign flag NEVER_QUIT to it will cause new bthread
- // that created by epoll_wait() never to quit.
- _epoll_thread_attr = _consumer_thread_attr | BTHREAD_NEVER_QUIT;
+ // that created by kevent() never to quit.
+ bthread_attr_t kqueue_thread_attr = _consumer_thread_attr | BTHREAD_NEVER_QUIT;
// Polling thread uses the same attr for consumer threads (NORMAL right
// now). Previously, we used small stack (32KB) which may be overflowed
@@ -115,9 +83,9 @@ int EventDispatcher::Start(const bthread_attr_t* consumer_thread_attr) {
// is also a potential issue for consumer threads, using the same attr
// should be a reasonable solution.
int rc = bthread_start_background(
- &_tid, &_epoll_thread_attr, RunThis, this);
+ &_tid, &kqueue_thread_attr, RunThis, this);
if (rc) {
- LOG(FATAL) << "Fail to create epoll/kqueue thread: " << berror(rc);
+ LOG(FATAL) << "Fail to create kqueue thread: " << berror(rc);
return -1;
}
return 0;
@@ -131,15 +99,10 @@ void EventDispatcher::Stop() {
_stop = true;
if (_epfd >= 0) {
-#if defined(OS_LINUX)
- epoll_event evt = { EPOLLOUT, { NULL } };
- epoll_ctl(_epfd, EPOLL_CTL_ADD, _wakeup_fds[1], &evt);
-#elif defined(OS_MACOSX)
struct kevent kqueue_event;
EV_SET(&kqueue_event, _wakeup_fds[1], EVFILT_WRITE, EV_ADD | EV_ENABLE,
0, 0, NULL);
kevent(_epfd, &kqueue_event, 1, NULL, 0, NULL);
-#endif
}
}
@@ -156,26 +119,6 @@ int EventDispatcher::AddEpollOut(SocketId socket_id, int fd, bool pollin) {
return -1;
}
-#if defined(OS_LINUX)
- epoll_event evt;
- evt.data.u64 = socket_id;
- evt.events = EPOLLOUT | EPOLLET;
-#ifdef BRPC_SOCKET_HAS_EOF
- evt.events |= has_epollrdhup;
-#endif
- if (pollin) {
- evt.events |= EPOLLIN;
- if (epoll_ctl(_epfd, EPOLL_CTL_MOD, fd, &evt) < 0) {
- // This fd has been removed from epoll via `RemoveConsumer',
- // in which case errno will be ENOENT
- return -1;
- }
- } else {
- if (epoll_ctl(_epfd, EPOLL_CTL_ADD, fd, &evt) < 0) {
- return -1;
- }
- }
-#elif defined(OS_MACOSX)
struct kevent evt;
//TODO(zhujiashun): add EV_EOF
EV_SET(&evt, fd, EVFILT_WRITE, EV_ADD | EV_ENABLE | EV_CLEAR,
@@ -190,25 +133,11 @@ int EventDispatcher::AddEpollOut(SocketId socket_id, int fd, bool pollin) {
return -1;
}
}
-#endif
return 0;
}
int EventDispatcher::RemoveEpollOut(SocketId socket_id,
int fd, bool pollin) {
-#if defined(OS_LINUX)
- if (pollin) {
- epoll_event evt;
- evt.data.u64 = socket_id;
- evt.events = EPOLLIN | EPOLLET;
-#ifdef BRPC_SOCKET_HAS_EOF
- evt.events |= has_epollrdhup;
-#endif
- return epoll_ctl(_epfd, EPOLL_CTL_MOD, fd, &evt);
- } else {
- return epoll_ctl(_epfd, EPOLL_CTL_DEL, fd, NULL);
- }
-#elif defined(OS_MACOSX)
struct kevent evt;
EV_SET(&evt, fd, EVFILT_WRITE, EV_DELETE, 0, 0, NULL);
if (kevent(_epfd, &evt, 1, NULL, 0, NULL) < 0) {
@@ -220,8 +149,6 @@ int EventDispatcher::RemoveEpollOut(SocketId socket_id,
return kevent(_epfd, &evt, 1, NULL, 0, NULL);
}
return 0;
-#endif
- return -1;
}
int EventDispatcher::AddConsumer(SocketId socket_id, int fd) {
@@ -229,21 +156,10 @@ int EventDispatcher::AddConsumer(SocketId socket_id, int fd) {
errno = EINVAL;
return -1;
}
-#if defined(OS_LINUX)
- epoll_event evt;
- evt.events = EPOLLIN | EPOLLET;
- evt.data.u64 = socket_id;
-#ifdef BRPC_SOCKET_HAS_EOF
- evt.events |= has_epollrdhup;
-#endif
- return epoll_ctl(_epfd, EPOLL_CTL_ADD, fd, &evt);
-#elif defined(OS_MACOSX)
struct kevent evt;
EV_SET(&evt, fd, EVFILT_READ, EV_ADD | EV_ENABLE | EV_CLEAR,
0, 0, (void*)socket_id);
return kevent(_epfd, &evt, 1, NULL, 0, NULL);
-#endif
- return -1;
}
int EventDispatcher::RemoveConsumer(int fd) {
@@ -253,22 +169,15 @@ int EventDispatcher::RemoveConsumer(int fd) {
// Removing the consumer from dispatcher before closing the fd because
// if process was forked and the fd is not marked as close-on-exec,
// closing does not set reference count of the fd to 0, thus does not
- // remove the fd from epoll. More badly, the fd will not be removable
- // from epoll again! If the fd was level-triggered and there's data left,
- // epoll_wait will keep returning events of the fd continuously, making
+ // remove the fd from kqueue. Worse, the fd will not be removable
+ // from kqueue again! If the fd was level-triggered and there's data left,
+ // kevent will keep returning events of the fd continuously, making
// program abnormal.
-#if defined(OS_LINUX)
- if (epoll_ctl(_epfd, EPOLL_CTL_DEL, fd, NULL) < 0) {
- PLOG(WARNING) << "Fail to remove fd=" << fd << " from epfd=" << _epfd;
- return -1;
- }
-#elif defined(OS_MACOSX)
struct kevent evt;
EV_SET(&evt, fd, EVFILT_READ, EV_DELETE, 0, 0, NULL);
kevent(_epfd, &evt, 1, NULL, 0, NULL);
EV_SET(&evt, fd, EVFILT_WRITE, EV_DELETE, 0, 0, NULL);
kevent(_epfd, &evt, 1, NULL, 0, NULL);
-#endif
return 0;
}
@@ -279,25 +188,12 @@ void* EventDispatcher::RunThis(void* arg) {
void EventDispatcher::Run() {
while (!_stop) {
-#if defined(OS_LINUX)
- epoll_event e[32];
-#ifdef BRPC_ADDITIONAL_EPOLL
- // Performance downgrades in examples.
- int n = epoll_wait(_epfd, e, ARRAY_SIZE(e), 0);
- if (n == 0) {
- n = epoll_wait(_epfd, e, ARRAY_SIZE(e), -1);
- }
-#else
- const int n = epoll_wait(_epfd, e, ARRAY_SIZE(e), -1);
-#endif
-#elif defined(OS_MACOSX)
struct kevent e[32];
int n = kevent(_epfd, NULL, 0, e, ARRAY_SIZE(e), NULL);
-#endif
if (_stop) {
- // epoll_ctl/epoll_wait should have some sort of memory fencing
- // guaranteeing that we(after epoll_wait) see _stop set before
- // epoll_ctl.
+ // The kevent() call registering the wakeup event in Stop() and the
+ // kevent() wait above should have some sort of memory fencing
+ // guaranteeing that we (after kevent returns) see _stop set.
break;
}
if (n < 0) {
@@ -305,76 +201,23 @@ void EventDispatcher::Run() {
// We've checked _stop, no wake-up will be missed.
continue;
}
-#if defined(OS_LINUX)
- PLOG(FATAL) << "Fail to epoll_wait epfd=" << _epfd;
-#elif defined(OS_MACOSX)
PLOG(FATAL) << "Fail to kqueue epfd=" << _epfd;
-#endif
break;
}
for (int i = 0; i < n; ++i) {
-#if defined(OS_LINUX)
- if (e[i].events & (EPOLLIN | EPOLLERR | EPOLLHUP)
-#ifdef BRPC_SOCKET_HAS_EOF
- || (e[i].events & has_epollrdhup)
-#endif
- ) {
- // We don't care about the return value.
- Socket::StartInputEvent(e[i].data.u64, e[i].events,
- _consumer_thread_attr);
- }
-#elif defined(OS_MACOSX)
if ((e[i].flags & EV_ERROR) || e[i].filter == EVFILT_READ) {
// We don't care about the return value.
Socket::StartInputEvent((SocketId)e[i].udata, e[i].filter,
_consumer_thread_attr);
}
-#endif
}
for (int i = 0; i < n; ++i) {
-#if defined(OS_LINUX)
- if (e[i].events & (EPOLLOUT | EPOLLERR | EPOLLHUP)) {
- // We don't care about the return value.
- Socket::HandleEpollOut(e[i].data.u64);
- }
-#elif defined(OS_MACOSX)
if ((e[i].flags & EV_ERROR) || e[i].filter == EVFILT_WRITE) {
// We don't care about the return value.
Socket::HandleEpollOut((SocketId)e[i].udata);
}
-#endif
}
}
}
-static EventDispatcher* g_edisp = NULL;
-static pthread_once_t g_edisp_once = PTHREAD_ONCE_INIT;
-
-static void StopAndJoinGlobalDispatchers() {
- for (int i = 0; i < FLAGS_event_dispatcher_num; ++i) {
- g_edisp[i].Stop();
- g_edisp[i].Join();
- }
-}
-void InitializeGlobalDispatchers() {
- g_edisp = new EventDispatcher[FLAGS_event_dispatcher_num];
- for (int i = 0; i < FLAGS_event_dispatcher_num; ++i) {
- const bthread_attr_t attr = FLAGS_usercode_in_pthread ?
- BTHREAD_ATTR_PTHREAD : BTHREAD_ATTR_NORMAL;
- CHECK_EQ(0, g_edisp[i].Start(&attr));
- }
- // This atexit is will be run before g_task_control.stop() because above
- // Start() initializes g_task_control by creating bthread (to run epoll/kqueue).
- CHECK_EQ(0, atexit(StopAndJoinGlobalDispatchers));
-}
-
-EventDispatcher& GetGlobalEventDispatcher(int fd) {
- pthread_once(&g_edisp_once, InitializeGlobalDispatchers);
- if (FLAGS_event_dispatcher_num == 1) {
- return g_edisp[0];
- }
- int index = butil::fmix32(fd) % FLAGS_event_dispatcher_num;
- return g_edisp[index];
-}
-
} // namespace brpc
---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@brpc.apache.org
For additional commands, e-mail: dev-help@brpc.apache.org