You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@brpc.apache.org by ji...@apache.org on 2022/08/17 19:34:10 UTC

[incubator-brpc] branch master updated: Restruct event_dispatcher source file (#1888)

This is an automated email from the ASF dual-hosted git repository.

jiashunzhu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-brpc.git


The following commit(s) were added to refs/heads/master by this push:
     new 93038edc Restruct event_dispatcher source file (#1888)
93038edc is described below

commit 93038edc7f1e02ec8646e7a2ed6cb7bec135bbfe
Author: 果冻虾仁 <gu...@apache.org>
AuthorDate: Thu Aug 18 03:34:00 2022 +0800

    Restruct event_dispatcher source file (#1888)
    
    Split event_dispatcher into separate operating system specific files
---
 BUILD.bazel                                        |   6 +-
 CMakeLists.txt                                     |   5 +
 Makefile                                           |   3 +-
 src/brpc/event_dispatcher.cpp                      | 323 +--------------------
 src/brpc/event_dispatcher.h                        |   3 -
 ...t_dispatcher.cpp => event_dispatcher_epoll.cpp} | 147 +---------
 ..._dispatcher.cpp => event_dispatcher_kqueue.cpp} | 179 +-----------
 7 files changed, 35 insertions(+), 631 deletions(-)

diff --git a/BUILD.bazel b/BUILD.bazel
index 7592a186..0cb60fa4 100644
--- a/BUILD.bazel
+++ b/BUILD.bazel
@@ -495,6 +495,8 @@ cc_library(
         "src/brpc/thrift_service.cpp",
         "src/brpc/thrift_message.cpp",
         "src/brpc/policy/thrift_protocol.cpp",
+        "src/brpc/event_dispatcher_epoll.cpp",
+        "src/brpc/event_dispatcher_kqueue.cpp",
     ]) + select({
         ":with_thrift" : glob([
             "src/brpc/thrift*.cpp",
@@ -503,7 +505,9 @@ cc_library(
     }),
     hdrs = glob([
         "src/brpc/*.h",
-        "src/brpc/**/*.h"
+        "src/brpc/**/*.h",
+        "src/brpc/event_dispatcher_epoll.cpp",
+        "src/brpc/event_dispatcher_kqueue.cpp",
     ]),
     includes = [
         "src/",
diff --git a/CMakeLists.txt b/CMakeLists.txt
index 5f46dc0e..713a090e 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -392,6 +392,7 @@ file(GLOB_RECURSE BTHREAD_SOURCES "${PROJECT_SOURCE_DIR}/src/bthread/*.cpp")
 file(GLOB_RECURSE JSON2PB_SOURCES "${PROJECT_SOURCE_DIR}/src/json2pb/*.cpp")
 file(GLOB_RECURSE BRPC_SOURCES "${PROJECT_SOURCE_DIR}/src/brpc/*.cpp")
 file(GLOB_RECURSE THRIFT_SOURCES "${PROJECT_SOURCE_DIR}/src/brpc/thrift*.cpp")
+file(GLOB_RECURSE EXCLUDE_SOURCES "${PROJECT_SOURCE_DIR}/src/brpc/event_dispatcher_*.cpp")
 
 if(WITH_THRIFT)
     message("brpc compile with thrift protocol")
@@ -403,6 +404,10 @@ else()
     set(THRIFT_SOURCES "")
 endif()
 
+foreach(v ${EXCLUDE_SOURCES})
+    list(REMOVE_ITEM BRPC_SOURCES ${v})
+endforeach()
+
 set(MCPACK2PB_SOURCES
     ${PROJECT_SOURCE_DIR}/src/mcpack2pb/field_type.cpp
     ${PROJECT_SOURCE_DIR}/src/mcpack2pb/mcpack2pb.cpp
diff --git a/Makefile b/Makefile
index a6d2fed4..f4ea71b4 100644
--- a/Makefile
+++ b/Makefile
@@ -196,8 +196,9 @@ JSON2PB_OBJS = $(addsuffix .o, $(basename $(JSON2PB_SOURCES)))
 
 BRPC_DIRS = src/brpc src/brpc/details src/brpc/builtin src/brpc/policy
 THRIFT_SOURCES = $(foreach d,$(BRPC_DIRS),$(wildcard $(addprefix $(d)/thrift*,$(SRCEXTS))))
+EXCLUDE_SOURCES = $(foreach d,$(BRPC_DIRS),$(wildcard $(addprefix $(d)/event_dispatcher_*,$(SRCEXTS))))
 BRPC_SOURCES_ALL = $(foreach d,$(BRPC_DIRS),$(wildcard $(addprefix $(d)/*,$(SRCEXTS))))
-BRPC_SOURCES = $(filter-out $(THRIFT_SOURCES), $(BRPC_SOURCES_ALL))
+BRPC_SOURCES = $(filter-out $(THRIFT_SOURCES) $(EXCLUDE_SOURCES), $(BRPC_SOURCES_ALL))
 BRPC_PROTOS = $(filter %.proto,$(BRPC_SOURCES))
 BRPC_CFAMILIES = $(filter-out %.proto %.pb.cc,$(BRPC_SOURCES))
 BRPC_OBJS = $(BRPC_PROTOS:.proto=.pb.o) $(addsuffix .o, $(basename $(BRPC_CFAMILIES)))
diff --git a/src/brpc/event_dispatcher.cpp b/src/brpc/event_dispatcher.cpp
index 56b48a76..e6209286 100644
--- a/src/brpc/event_dispatcher.cpp
+++ b/src/brpc/event_dispatcher.cpp
@@ -23,15 +23,7 @@
 #include "butil/third_party/murmurhash3/murmurhash3.h"// fmix32
 #include "bthread/bthread.h"                          // bthread_start_background
 #include "brpc/event_dispatcher.h"
-#ifdef BRPC_SOCKET_HAS_EOF
-#include "brpc/details/has_epollrdhup.h"
-#endif
 #include "brpc/reloadable_flags.h"
-#if defined(OS_MACOSX)
-#include <sys/types.h>
-#include <sys/event.h>
-#include <sys/time.h>
-#endif
 
 namespace brpc {
 
@@ -40,313 +32,6 @@ DEFINE_int32(event_dispatcher_num, 1, "Number of event dispatcher");
 DEFINE_bool(usercode_in_pthread, false, 
             "Call user's callback in pthreads, use bthreads otherwise");
 
-EventDispatcher::EventDispatcher()
-    : _epfd(-1)
-    , _stop(false)
-    , _tid(0)
-    , _consumer_thread_attr(BTHREAD_ATTR_NORMAL)
-{
-#if defined(OS_LINUX)
-    _epfd = epoll_create(1024 * 1024);
-    if (_epfd < 0) {
-        PLOG(FATAL) << "Fail to create epoll";
-        return;
-    }
-#elif defined(OS_MACOSX)
-    _epfd = kqueue();
-    if (_epfd < 0) {
-        PLOG(FATAL) << "Fail to create kqueue";
-        return;
-    }
-#else
-    #error Not implemented
-#endif
-    CHECK_EQ(0, butil::make_close_on_exec(_epfd));
-
-    _wakeup_fds[0] = -1;
-    _wakeup_fds[1] = -1;
-    if (pipe(_wakeup_fds) != 0) {
-        PLOG(FATAL) << "Fail to create pipe";
-        return;
-    }
-}
-
-EventDispatcher::~EventDispatcher() {
-    Stop();
-    Join();
-    if (_epfd >= 0) {
-        close(_epfd);
-        _epfd = -1;
-    }
-    if (_wakeup_fds[0] > 0) {
-        close(_wakeup_fds[0]);
-        close(_wakeup_fds[1]);
-    }
-}
-
-int EventDispatcher::Start(const bthread_attr_t* consumer_thread_attr) {
-    if (_epfd < 0) {
-#if defined(OS_LINUX)
-        LOG(FATAL) << "epoll was not created";
-#elif defined(OS_MACOSX)
-        LOG(FATAL) << "kqueue was not created";
-#endif
-        return -1;
-    }
-    
-    if (_tid != 0) {
-        LOG(FATAL) << "Already started this dispatcher(" << this 
-                   << ") in bthread=" << _tid;
-        return -1;
-    }
-
-    // Set _consumer_thread_attr before creating epoll/kqueue thread to make sure
-    // everyting seems sane to the thread.
-    _consumer_thread_attr = (consumer_thread_attr  ?
-                             *consumer_thread_attr : BTHREAD_ATTR_NORMAL);
-
-    //_consumer_thread_attr is used in StartInputEvent(), assign flag NEVER_QUIT to it will cause new bthread
-    // that created by epoll_wait() never to quit.
-    _epoll_thread_attr = _consumer_thread_attr | BTHREAD_NEVER_QUIT;
-
-    // Polling thread uses the same attr for consumer threads (NORMAL right
-    // now). Previously, we used small stack (32KB) which may be overflowed
-    // when the older comlog (e.g. 3.1.85) calls com_openlog_r(). Since this
-    // is also a potential issue for consumer threads, using the same attr
-    // should be a reasonable solution.
-    int rc = bthread_start_background(
-        &_tid, &_epoll_thread_attr, RunThis, this);
-    if (rc) {
-        LOG(FATAL) << "Fail to create epoll/kqueue thread: " << berror(rc);
-        return -1;
-    }
-    return 0;
-}
-
-bool EventDispatcher::Running() const {
-    return !_stop  && _epfd >= 0 && _tid != 0;
-}
-
-void EventDispatcher::Stop() {
-    _stop = true;
-
-    if (_epfd >= 0) {
-#if defined(OS_LINUX)
-        epoll_event evt = { EPOLLOUT,  { NULL } };
-        epoll_ctl(_epfd, EPOLL_CTL_ADD, _wakeup_fds[1], &evt);
-#elif defined(OS_MACOSX)
-        struct kevent kqueue_event;
-        EV_SET(&kqueue_event, _wakeup_fds[1], EVFILT_WRITE, EV_ADD | EV_ENABLE,
-                    0, 0, NULL);
-        kevent(_epfd, &kqueue_event, 1, NULL, 0, NULL);
-#endif
-    }
-}
-
-void EventDispatcher::Join() {
-    if (_tid) {
-        bthread_join(_tid, NULL);
-        _tid = 0;
-    }
-}
-
-int EventDispatcher::AddEpollOut(SocketId socket_id, int fd, bool pollin) {
-    if (_epfd < 0) {
-        errno = EINVAL;
-        return -1;
-    }
-
-#if defined(OS_LINUX)
-    epoll_event evt;
-    evt.data.u64 = socket_id;
-    evt.events = EPOLLOUT | EPOLLET;
-#ifdef BRPC_SOCKET_HAS_EOF
-    evt.events |= has_epollrdhup;
-#endif
-    if (pollin) {
-        evt.events |= EPOLLIN;
-        if (epoll_ctl(_epfd, EPOLL_CTL_MOD, fd, &evt) < 0) {
-            // This fd has been removed from epoll via `RemoveConsumer',
-            // in which case errno will be ENOENT
-            return -1;
-        }
-    } else {
-        if (epoll_ctl(_epfd, EPOLL_CTL_ADD, fd, &evt) < 0) {
-            return -1;
-        }
-    }
-#elif defined(OS_MACOSX)
-    struct kevent evt;
-    //TODO(zhujiashun): add EV_EOF
-    EV_SET(&evt, fd, EVFILT_WRITE, EV_ADD | EV_ENABLE | EV_CLEAR,
-                0, 0, (void*)socket_id);
-    if (kevent(_epfd, &evt, 1, NULL, 0, NULL) < 0) {
-        return -1;
-    }
-    if (pollin) {
-        EV_SET(&evt, fd, EVFILT_READ, EV_ADD | EV_ENABLE | EV_CLEAR,
-                    0, 0, (void*)socket_id);
-        if (kevent(_epfd, &evt, 1, NULL, 0, NULL) < 0) {
-            return -1;
-        }
-    }
-#endif
-    return 0;
-}
-
-int EventDispatcher::RemoveEpollOut(SocketId socket_id, 
-                                    int fd, bool pollin) {
-#if defined(OS_LINUX)
-    if (pollin) {
-        epoll_event evt;
-        evt.data.u64 = socket_id;
-        evt.events = EPOLLIN | EPOLLET;
-#ifdef BRPC_SOCKET_HAS_EOF
-        evt.events |= has_epollrdhup;
-#endif
-        return epoll_ctl(_epfd, EPOLL_CTL_MOD, fd, &evt);
-    } else {
-        return epoll_ctl(_epfd, EPOLL_CTL_DEL, fd, NULL);
-    }
-#elif defined(OS_MACOSX)
-    struct kevent evt;
-    EV_SET(&evt, fd, EVFILT_WRITE, EV_DELETE, 0, 0, NULL);
-    if (kevent(_epfd, &evt, 1, NULL, 0, NULL) < 0) {
-        return -1;
-    }
-    if (pollin) {
-        EV_SET(&evt, fd, EVFILT_READ, EV_ADD | EV_ENABLE | EV_CLEAR,
-                    0, 0, (void*)socket_id);
-        return kevent(_epfd, &evt, 1, NULL, 0, NULL);
-    }
-    return 0;
-#endif
-    return -1;
-}
-
-int EventDispatcher::AddConsumer(SocketId socket_id, int fd) {
-    if (_epfd < 0) {
-        errno = EINVAL;
-        return -1;
-    }
-#if defined(OS_LINUX)
-    epoll_event evt;
-    evt.events = EPOLLIN | EPOLLET;
-    evt.data.u64 = socket_id;
-#ifdef BRPC_SOCKET_HAS_EOF
-    evt.events |= has_epollrdhup;
-#endif
-    return epoll_ctl(_epfd, EPOLL_CTL_ADD, fd, &evt);
-#elif defined(OS_MACOSX)
-    struct kevent evt;
-    EV_SET(&evt, fd, EVFILT_READ, EV_ADD | EV_ENABLE | EV_CLEAR,
-                0, 0, (void*)socket_id);
-    return kevent(_epfd, &evt, 1, NULL, 0, NULL);
-#endif
-    return -1;
-}
-
-int EventDispatcher::RemoveConsumer(int fd) {
-    if (fd < 0) {
-        return -1;
-    }
-    // Removing the consumer from dispatcher before closing the fd because
-    // if process was forked and the fd is not marked as close-on-exec,
-    // closing does not set reference count of the fd to 0, thus does not
-    // remove the fd from epoll. More badly, the fd will not be removable
-    // from epoll again! If the fd was level-triggered and there's data left,
-    // epoll_wait will keep returning events of the fd continuously, making
-    // program abnormal.
-#if defined(OS_LINUX)
-    if (epoll_ctl(_epfd, EPOLL_CTL_DEL, fd, NULL) < 0) {
-        PLOG(WARNING) << "Fail to remove fd=" << fd << " from epfd=" << _epfd;
-        return -1;
-    }
-#elif defined(OS_MACOSX)
-    struct kevent evt;
-    EV_SET(&evt, fd, EVFILT_READ, EV_DELETE, 0, 0, NULL);
-    kevent(_epfd, &evt, 1, NULL, 0, NULL);
-    EV_SET(&evt, fd, EVFILT_WRITE, EV_DELETE, 0, 0, NULL);
-    kevent(_epfd, &evt, 1, NULL, 0, NULL);
-#endif
-    return 0;
-}
-
-void* EventDispatcher::RunThis(void* arg) {
-    ((EventDispatcher*)arg)->Run();
-    return NULL;
-}
-
-void EventDispatcher::Run() {
-    while (!_stop) {
-#if defined(OS_LINUX)
-        epoll_event e[32];
-#ifdef BRPC_ADDITIONAL_EPOLL
-        // Performance downgrades in examples.
-        int n = epoll_wait(_epfd, e, ARRAY_SIZE(e), 0);
-        if (n == 0) {
-            n = epoll_wait(_epfd, e, ARRAY_SIZE(e), -1);
-        }
-#else
-        const int n = epoll_wait(_epfd, e, ARRAY_SIZE(e), -1);
-#endif
-#elif defined(OS_MACOSX)
-        struct kevent e[32];
-        int n = kevent(_epfd, NULL, 0, e, ARRAY_SIZE(e), NULL);
-#endif
-        if (_stop) {
-            // epoll_ctl/epoll_wait should have some sort of memory fencing
-            // guaranteeing that we(after epoll_wait) see _stop set before
-            // epoll_ctl.
-            break;
-        }
-        if (n < 0) {
-            if (EINTR == errno) {
-                // We've checked _stop, no wake-up will be missed.
-                continue;
-            }
-#if defined(OS_LINUX)
-            PLOG(FATAL) << "Fail to epoll_wait epfd=" << _epfd;
-#elif defined(OS_MACOSX)
-            PLOG(FATAL) << "Fail to kqueue epfd=" << _epfd;
-#endif
-            break;
-        }
-        for (int i = 0; i < n; ++i) {
-#if defined(OS_LINUX)
-            if (e[i].events & (EPOLLIN | EPOLLERR | EPOLLHUP)
-#ifdef BRPC_SOCKET_HAS_EOF
-                || (e[i].events & has_epollrdhup)
-#endif
-                ) {
-                // We don't care about the return value.
-                Socket::StartInputEvent(e[i].data.u64, e[i].events,
-                                        _consumer_thread_attr);
-            }
-#elif defined(OS_MACOSX)
-            if ((e[i].flags & EV_ERROR) || e[i].filter == EVFILT_READ) {
-                // We don't care about the return value.
-                Socket::StartInputEvent((SocketId)e[i].udata, e[i].filter,
-                                        _consumer_thread_attr);
-            }
-#endif
-        }
-        for (int i = 0; i < n; ++i) {
-#if defined(OS_LINUX)
-            if (e[i].events & (EPOLLOUT | EPOLLERR | EPOLLHUP)) {
-                // We don't care about the return value.
-                Socket::HandleEpollOut(e[i].data.u64);
-            }
-#elif defined(OS_MACOSX)
-            if ((e[i].flags & EV_ERROR) || e[i].filter == EVFILT_WRITE) {
-                // We don't care about the return value.
-                Socket::HandleEpollOut((SocketId)e[i].udata);
-            }
-#endif
-        }
-    }
-}
-
 static EventDispatcher* g_edisp = NULL;
 static pthread_once_t g_edisp_once = PTHREAD_ONCE_INIT;
 
@@ -378,3 +63,11 @@ EventDispatcher& GetGlobalEventDispatcher(int fd) {
 }
 
 } // namespace brpc
+
+#if defined(OS_LINUX)
+    #include "brpc/event_dispatcher_epoll.cpp"
+#elif defined(OS_MACOSX)
+    #include "brpc/event_dispatcher_kqueue.cpp"
+#else
+    #error Not implemented
+#endif
diff --git a/src/brpc/event_dispatcher.h b/src/brpc/event_dispatcher.h
index eaa57e36..b6cae400 100644
--- a/src/brpc/event_dispatcher.h
+++ b/src/brpc/event_dispatcher.h
@@ -94,9 +94,6 @@ private:
     // The attribute of bthreads calling user callbacks.
     bthread_attr_t _consumer_thread_attr;
 
-    // The attribute of bthread epoll_wait.
-    bthread_attr_t _epoll_thread_attr;
-
     // Pipe fds to wakeup EventDispatcher from `epoll_wait' in order to quit
     int _wakeup_fds[2];
 };
diff --git a/src/brpc/event_dispatcher.cpp b/src/brpc/event_dispatcher_epoll.cpp
similarity index 58%
copy from src/brpc/event_dispatcher.cpp
copy to src/brpc/event_dispatcher_epoll.cpp
index 56b48a76..07d485e6 100644
--- a/src/brpc/event_dispatcher.cpp
+++ b/src/brpc/event_dispatcher_epoll.cpp
@@ -16,51 +16,23 @@
 // under the License.
 
 
-#include <gflags/gflags.h>                            // DEFINE_int32
-#include "butil/compat.h"
-#include "butil/fd_utility.h"                         // make_close_on_exec
-#include "butil/logging.h"                            // LOG
-#include "butil/third_party/murmurhash3/murmurhash3.h"// fmix32
-#include "bthread/bthread.h"                          // bthread_start_background
-#include "brpc/event_dispatcher.h"
 #ifdef BRPC_SOCKET_HAS_EOF
 #include "brpc/details/has_epollrdhup.h"
 #endif
-#include "brpc/reloadable_flags.h"
-#if defined(OS_MACOSX)
-#include <sys/types.h>
-#include <sys/event.h>
-#include <sys/time.h>
-#endif
 
 namespace brpc {
 
-DEFINE_int32(event_dispatcher_num, 1, "Number of event dispatcher");
-
-DEFINE_bool(usercode_in_pthread, false, 
-            "Call user's callback in pthreads, use bthreads otherwise");
-
 EventDispatcher::EventDispatcher()
     : _epfd(-1)
     , _stop(false)
     , _tid(0)
     , _consumer_thread_attr(BTHREAD_ATTR_NORMAL)
 {
-#if defined(OS_LINUX)
     _epfd = epoll_create(1024 * 1024);
     if (_epfd < 0) {
         PLOG(FATAL) << "Fail to create epoll";
         return;
     }
-#elif defined(OS_MACOSX)
-    _epfd = kqueue();
-    if (_epfd < 0) {
-        PLOG(FATAL) << "Fail to create kqueue";
-        return;
-    }
-#else
-    #error Not implemented
-#endif
     CHECK_EQ(0, butil::make_close_on_exec(_epfd));
 
     _wakeup_fds[0] = -1;
@@ -86,11 +58,7 @@ EventDispatcher::~EventDispatcher() {
 
 int EventDispatcher::Start(const bthread_attr_t* consumer_thread_attr) {
     if (_epfd < 0) {
-#if defined(OS_LINUX)
         LOG(FATAL) << "epoll was not created";
-#elif defined(OS_MACOSX)
-        LOG(FATAL) << "kqueue was not created";
-#endif
         return -1;
     }
     
@@ -100,14 +68,14 @@ int EventDispatcher::Start(const bthread_attr_t* consumer_thread_attr) {
         return -1;
     }
 
-    // Set _consumer_thread_attr before creating epoll/kqueue thread to make sure
+    // Set _consumer_thread_attr before creating epoll thread to make sure
     // everyting seems sane to the thread.
     _consumer_thread_attr = (consumer_thread_attr  ?
                              *consumer_thread_attr : BTHREAD_ATTR_NORMAL);
 
     //_consumer_thread_attr is used in StartInputEvent(), assign flag NEVER_QUIT to it will cause new bthread
     // that created by epoll_wait() never to quit.
-    _epoll_thread_attr = _consumer_thread_attr | BTHREAD_NEVER_QUIT;
+    bthread_attr_t epoll_thread_attr = _consumer_thread_attr | BTHREAD_NEVER_QUIT;
 
     // Polling thread uses the same attr for consumer threads (NORMAL right
     // now). Previously, we used small stack (32KB) which may be overflowed
@@ -115,9 +83,9 @@ int EventDispatcher::Start(const bthread_attr_t* consumer_thread_attr) {
     // is also a potential issue for consumer threads, using the same attr
     // should be a reasonable solution.
     int rc = bthread_start_background(
-        &_tid, &_epoll_thread_attr, RunThis, this);
+        &_tid, &epoll_thread_attr, RunThis, this);
     if (rc) {
-        LOG(FATAL) << "Fail to create epoll/kqueue thread: " << berror(rc);
+        LOG(FATAL) << "Fail to create epoll thread: " << berror(rc);
         return -1;
     }
     return 0;
@@ -131,15 +99,8 @@ void EventDispatcher::Stop() {
     _stop = true;
 
     if (_epfd >= 0) {
-#if defined(OS_LINUX)
         epoll_event evt = { EPOLLOUT,  { NULL } };
         epoll_ctl(_epfd, EPOLL_CTL_ADD, _wakeup_fds[1], &evt);
-#elif defined(OS_MACOSX)
-        struct kevent kqueue_event;
-        EV_SET(&kqueue_event, _wakeup_fds[1], EVFILT_WRITE, EV_ADD | EV_ENABLE,
-                    0, 0, NULL);
-        kevent(_epfd, &kqueue_event, 1, NULL, 0, NULL);
-#endif
     }
 }
 
@@ -156,7 +117,6 @@ int EventDispatcher::AddEpollOut(SocketId socket_id, int fd, bool pollin) {
         return -1;
     }
 
-#if defined(OS_LINUX)
     epoll_event evt;
     evt.data.u64 = socket_id;
     evt.events = EPOLLOUT | EPOLLET;
@@ -175,28 +135,11 @@ int EventDispatcher::AddEpollOut(SocketId socket_id, int fd, bool pollin) {
             return -1;
         }
     }
-#elif defined(OS_MACOSX)
-    struct kevent evt;
-    //TODO(zhujiashun): add EV_EOF
-    EV_SET(&evt, fd, EVFILT_WRITE, EV_ADD | EV_ENABLE | EV_CLEAR,
-                0, 0, (void*)socket_id);
-    if (kevent(_epfd, &evt, 1, NULL, 0, NULL) < 0) {
-        return -1;
-    }
-    if (pollin) {
-        EV_SET(&evt, fd, EVFILT_READ, EV_ADD | EV_ENABLE | EV_CLEAR,
-                    0, 0, (void*)socket_id);
-        if (kevent(_epfd, &evt, 1, NULL, 0, NULL) < 0) {
-            return -1;
-        }
-    }
-#endif
     return 0;
 }
 
 int EventDispatcher::RemoveEpollOut(SocketId socket_id, 
                                     int fd, bool pollin) {
-#if defined(OS_LINUX)
     if (pollin) {
         epoll_event evt;
         evt.data.u64 = socket_id;
@@ -208,19 +151,6 @@ int EventDispatcher::RemoveEpollOut(SocketId socket_id,
     } else {
         return epoll_ctl(_epfd, EPOLL_CTL_DEL, fd, NULL);
     }
-#elif defined(OS_MACOSX)
-    struct kevent evt;
-    EV_SET(&evt, fd, EVFILT_WRITE, EV_DELETE, 0, 0, NULL);
-    if (kevent(_epfd, &evt, 1, NULL, 0, NULL) < 0) {
-        return -1;
-    }
-    if (pollin) {
-        EV_SET(&evt, fd, EVFILT_READ, EV_ADD | EV_ENABLE | EV_CLEAR,
-                    0, 0, (void*)socket_id);
-        return kevent(_epfd, &evt, 1, NULL, 0, NULL);
-    }
-    return 0;
-#endif
     return -1;
 }
 
@@ -229,7 +159,6 @@ int EventDispatcher::AddConsumer(SocketId socket_id, int fd) {
         errno = EINVAL;
         return -1;
     }
-#if defined(OS_LINUX)
     epoll_event evt;
     evt.events = EPOLLIN | EPOLLET;
     evt.data.u64 = socket_id;
@@ -237,12 +166,6 @@ int EventDispatcher::AddConsumer(SocketId socket_id, int fd) {
     evt.events |= has_epollrdhup;
 #endif
     return epoll_ctl(_epfd, EPOLL_CTL_ADD, fd, &evt);
-#elif defined(OS_MACOSX)
-    struct kevent evt;
-    EV_SET(&evt, fd, EVFILT_READ, EV_ADD | EV_ENABLE | EV_CLEAR,
-                0, 0, (void*)socket_id);
-    return kevent(_epfd, &evt, 1, NULL, 0, NULL);
-#endif
     return -1;
 }
 
@@ -257,18 +180,10 @@ int EventDispatcher::RemoveConsumer(int fd) {
     // from epoll again! If the fd was level-triggered and there's data left,
     // epoll_wait will keep returning events of the fd continuously, making
     // program abnormal.
-#if defined(OS_LINUX)
     if (epoll_ctl(_epfd, EPOLL_CTL_DEL, fd, NULL) < 0) {
         PLOG(WARNING) << "Fail to remove fd=" << fd << " from epfd=" << _epfd;
         return -1;
     }
-#elif defined(OS_MACOSX)
-    struct kevent evt;
-    EV_SET(&evt, fd, EVFILT_READ, EV_DELETE, 0, 0, NULL);
-    kevent(_epfd, &evt, 1, NULL, 0, NULL);
-    EV_SET(&evt, fd, EVFILT_WRITE, EV_DELETE, 0, 0, NULL);
-    kevent(_epfd, &evt, 1, NULL, 0, NULL);
-#endif
     return 0;
 }
 
@@ -279,7 +194,6 @@ void* EventDispatcher::RunThis(void* arg) {
 
 void EventDispatcher::Run() {
     while (!_stop) {
-#if defined(OS_LINUX)
         epoll_event e[32];
 #ifdef BRPC_ADDITIONAL_EPOLL
         // Performance downgrades in examples.
@@ -289,10 +203,6 @@ void EventDispatcher::Run() {
         }
 #else
         const int n = epoll_wait(_epfd, e, ARRAY_SIZE(e), -1);
-#endif
-#elif defined(OS_MACOSX)
-        struct kevent e[32];
-        int n = kevent(_epfd, NULL, 0, e, ARRAY_SIZE(e), NULL);
 #endif
         if (_stop) {
             // epoll_ctl/epoll_wait should have some sort of memory fencing
@@ -305,15 +215,10 @@ void EventDispatcher::Run() {
                 // We've checked _stop, no wake-up will be missed.
                 continue;
             }
-#if defined(OS_LINUX)
             PLOG(FATAL) << "Fail to epoll_wait epfd=" << _epfd;
-#elif defined(OS_MACOSX)
-            PLOG(FATAL) << "Fail to kqueue epfd=" << _epfd;
-#endif
             break;
         }
         for (int i = 0; i < n; ++i) {
-#if defined(OS_LINUX)
             if (e[i].events & (EPOLLIN | EPOLLERR | EPOLLHUP)
 #ifdef BRPC_SOCKET_HAS_EOF
                 || (e[i].events & has_epollrdhup)
@@ -323,58 +228,14 @@ void EventDispatcher::Run() {
                 Socket::StartInputEvent(e[i].data.u64, e[i].events,
                                         _consumer_thread_attr);
             }
-#elif defined(OS_MACOSX)
-            if ((e[i].flags & EV_ERROR) || e[i].filter == EVFILT_READ) {
-                // We don't care about the return value.
-                Socket::StartInputEvent((SocketId)e[i].udata, e[i].filter,
-                                        _consumer_thread_attr);
-            }
-#endif
         }
         for (int i = 0; i < n; ++i) {
-#if defined(OS_LINUX)
             if (e[i].events & (EPOLLOUT | EPOLLERR | EPOLLHUP)) {
                 // We don't care about the return value.
                 Socket::HandleEpollOut(e[i].data.u64);
             }
-#elif defined(OS_MACOSX)
-            if ((e[i].flags & EV_ERROR) || e[i].filter == EVFILT_WRITE) {
-                // We don't care about the return value.
-                Socket::HandleEpollOut((SocketId)e[i].udata);
-            }
-#endif
         }
     }
 }
 
-static EventDispatcher* g_edisp = NULL;
-static pthread_once_t g_edisp_once = PTHREAD_ONCE_INIT;
-
-static void StopAndJoinGlobalDispatchers() {
-    for (int i = 0; i < FLAGS_event_dispatcher_num; ++i) {
-        g_edisp[i].Stop();
-        g_edisp[i].Join();
-    }
-}
-void InitializeGlobalDispatchers() {
-    g_edisp = new EventDispatcher[FLAGS_event_dispatcher_num];
-    for (int i = 0; i < FLAGS_event_dispatcher_num; ++i) {
-        const bthread_attr_t attr = FLAGS_usercode_in_pthread ?
-            BTHREAD_ATTR_PTHREAD : BTHREAD_ATTR_NORMAL;
-        CHECK_EQ(0, g_edisp[i].Start(&attr));
-    }
-    // This atexit is will be run before g_task_control.stop() because above
-    // Start() initializes g_task_control by creating bthread (to run epoll/kqueue).
-    CHECK_EQ(0, atexit(StopAndJoinGlobalDispatchers));
-}
-
-EventDispatcher& GetGlobalEventDispatcher(int fd) {
-    pthread_once(&g_edisp_once, InitializeGlobalDispatchers);
-    if (FLAGS_event_dispatcher_num == 1) {
-        return g_edisp[0];
-    }
-    int index = butil::fmix32(fd) % FLAGS_event_dispatcher_num;
-    return g_edisp[index];
-}
-
 } // namespace brpc
diff --git a/src/brpc/event_dispatcher.cpp b/src/brpc/event_dispatcher_kqueue.cpp
similarity index 52%
copy from src/brpc/event_dispatcher.cpp
copy to src/brpc/event_dispatcher_kqueue.cpp
index 56b48a76..614cd3bc 100644
--- a/src/brpc/event_dispatcher.cpp
+++ b/src/brpc/event_dispatcher_kqueue.cpp
@@ -16,51 +16,23 @@
 // under the License.
 
 
-#include <gflags/gflags.h>                            // DEFINE_int32
-#include "butil/compat.h"
-#include "butil/fd_utility.h"                         // make_close_on_exec
-#include "butil/logging.h"                            // LOG
-#include "butil/third_party/murmurhash3/murmurhash3.h"// fmix32
-#include "bthread/bthread.h"                          // bthread_start_background
-#include "brpc/event_dispatcher.h"
-#ifdef BRPC_SOCKET_HAS_EOF
-#include "brpc/details/has_epollrdhup.h"
-#endif
-#include "brpc/reloadable_flags.h"
-#if defined(OS_MACOSX)
 #include <sys/types.h>
 #include <sys/event.h>
 #include <sys/time.h>
-#endif
 
 namespace brpc {
 
-DEFINE_int32(event_dispatcher_num, 1, "Number of event dispatcher");
-
-DEFINE_bool(usercode_in_pthread, false, 
-            "Call user's callback in pthreads, use bthreads otherwise");
-
 EventDispatcher::EventDispatcher()
     : _epfd(-1)
     , _stop(false)
     , _tid(0)
     , _consumer_thread_attr(BTHREAD_ATTR_NORMAL)
 {
-#if defined(OS_LINUX)
-    _epfd = epoll_create(1024 * 1024);
-    if (_epfd < 0) {
-        PLOG(FATAL) << "Fail to create epoll";
-        return;
-    }
-#elif defined(OS_MACOSX)
     _epfd = kqueue();
     if (_epfd < 0) {
         PLOG(FATAL) << "Fail to create kqueue";
         return;
     }
-#else
-    #error Not implemented
-#endif
     CHECK_EQ(0, butil::make_close_on_exec(_epfd));
 
     _wakeup_fds[0] = -1;
@@ -86,11 +58,7 @@ EventDispatcher::~EventDispatcher() {
 
 int EventDispatcher::Start(const bthread_attr_t* consumer_thread_attr) {
     if (_epfd < 0) {
-#if defined(OS_LINUX)
-        LOG(FATAL) << "epoll was not created";
-#elif defined(OS_MACOSX)
         LOG(FATAL) << "kqueue was not created";
-#endif
         return -1;
     }
     
@@ -100,14 +68,14 @@ int EventDispatcher::Start(const bthread_attr_t* consumer_thread_attr) {
         return -1;
     }
 
-    // Set _consumer_thread_attr before creating epoll/kqueue thread to make sure
+    // Set _consumer_thread_attr before creating kqueue thread to make sure
     // everyting seems sane to the thread.
     _consumer_thread_attr = (consumer_thread_attr  ?
                              *consumer_thread_attr : BTHREAD_ATTR_NORMAL);
 
     //_consumer_thread_attr is used in StartInputEvent(), assign flag NEVER_QUIT to it will cause new bthread
-    // that created by epoll_wait() never to quit.
-    _epoll_thread_attr = _consumer_thread_attr | BTHREAD_NEVER_QUIT;
+    // that created by kevent() never to quit.
+    bthread_attr_t kqueue_thread_attr = _consumer_thread_attr | BTHREAD_NEVER_QUIT;
 
     // Polling thread uses the same attr for consumer threads (NORMAL right
     // now). Previously, we used small stack (32KB) which may be overflowed
@@ -115,9 +83,9 @@ int EventDispatcher::Start(const bthread_attr_t* consumer_thread_attr) {
     // is also a potential issue for consumer threads, using the same attr
     // should be a reasonable solution.
     int rc = bthread_start_background(
-        &_tid, &_epoll_thread_attr, RunThis, this);
+        &_tid, &kqueue_thread_attr, RunThis, this);
     if (rc) {
-        LOG(FATAL) << "Fail to create epoll/kqueue thread: " << berror(rc);
+        LOG(FATAL) << "Fail to create kqueue thread: " << berror(rc);
         return -1;
     }
     return 0;
@@ -131,15 +99,10 @@ void EventDispatcher::Stop() {
     _stop = true;
 
     if (_epfd >= 0) {
-#if defined(OS_LINUX)
-        epoll_event evt = { EPOLLOUT,  { NULL } };
-        epoll_ctl(_epfd, EPOLL_CTL_ADD, _wakeup_fds[1], &evt);
-#elif defined(OS_MACOSX)
         struct kevent kqueue_event;
         EV_SET(&kqueue_event, _wakeup_fds[1], EVFILT_WRITE, EV_ADD | EV_ENABLE,
                     0, 0, NULL);
         kevent(_epfd, &kqueue_event, 1, NULL, 0, NULL);
-#endif
     }
 }
 
@@ -156,26 +119,6 @@ int EventDispatcher::AddEpollOut(SocketId socket_id, int fd, bool pollin) {
         return -1;
     }
 
-#if defined(OS_LINUX)
-    epoll_event evt;
-    evt.data.u64 = socket_id;
-    evt.events = EPOLLOUT | EPOLLET;
-#ifdef BRPC_SOCKET_HAS_EOF
-    evt.events |= has_epollrdhup;
-#endif
-    if (pollin) {
-        evt.events |= EPOLLIN;
-        if (epoll_ctl(_epfd, EPOLL_CTL_MOD, fd, &evt) < 0) {
-            // This fd has been removed from epoll via `RemoveConsumer',
-            // in which case errno will be ENOENT
-            return -1;
-        }
-    } else {
-        if (epoll_ctl(_epfd, EPOLL_CTL_ADD, fd, &evt) < 0) {
-            return -1;
-        }
-    }
-#elif defined(OS_MACOSX)
     struct kevent evt;
     //TODO(zhujiashun): add EV_EOF
     EV_SET(&evt, fd, EVFILT_WRITE, EV_ADD | EV_ENABLE | EV_CLEAR,
@@ -190,25 +133,11 @@ int EventDispatcher::AddEpollOut(SocketId socket_id, int fd, bool pollin) {
             return -1;
         }
     }
-#endif
     return 0;
 }
 
 int EventDispatcher::RemoveEpollOut(SocketId socket_id, 
                                     int fd, bool pollin) {
-#if defined(OS_LINUX)
-    if (pollin) {
-        epoll_event evt;
-        evt.data.u64 = socket_id;
-        evt.events = EPOLLIN | EPOLLET;
-#ifdef BRPC_SOCKET_HAS_EOF
-        evt.events |= has_epollrdhup;
-#endif
-        return epoll_ctl(_epfd, EPOLL_CTL_MOD, fd, &evt);
-    } else {
-        return epoll_ctl(_epfd, EPOLL_CTL_DEL, fd, NULL);
-    }
-#elif defined(OS_MACOSX)
     struct kevent evt;
     EV_SET(&evt, fd, EVFILT_WRITE, EV_DELETE, 0, 0, NULL);
     if (kevent(_epfd, &evt, 1, NULL, 0, NULL) < 0) {
@@ -220,8 +149,6 @@ int EventDispatcher::RemoveEpollOut(SocketId socket_id,
         return kevent(_epfd, &evt, 1, NULL, 0, NULL);
     }
     return 0;
-#endif
-    return -1;
 }
 
 int EventDispatcher::AddConsumer(SocketId socket_id, int fd) {
@@ -229,21 +156,10 @@ int EventDispatcher::AddConsumer(SocketId socket_id, int fd) {
         errno = EINVAL;
         return -1;
     }
-#if defined(OS_LINUX)
-    epoll_event evt;
-    evt.events = EPOLLIN | EPOLLET;
-    evt.data.u64 = socket_id;
-#ifdef BRPC_SOCKET_HAS_EOF
-    evt.events |= has_epollrdhup;
-#endif
-    return epoll_ctl(_epfd, EPOLL_CTL_ADD, fd, &evt);
-#elif defined(OS_MACOSX)
     struct kevent evt;
     EV_SET(&evt, fd, EVFILT_READ, EV_ADD | EV_ENABLE | EV_CLEAR,
                 0, 0, (void*)socket_id);
     return kevent(_epfd, &evt, 1, NULL, 0, NULL);
-#endif
-    return -1;
 }
 
 int EventDispatcher::RemoveConsumer(int fd) {
@@ -253,22 +169,15 @@ int EventDispatcher::RemoveConsumer(int fd) {
     // Removing the consumer from dispatcher before closing the fd because
     // if process was forked and the fd is not marked as close-on-exec,
     // closing does not set reference count of the fd to 0, thus does not
-    // remove the fd from epoll. More badly, the fd will not be removable
-    // from epoll again! If the fd was level-triggered and there's data left,
-    // epoll_wait will keep returning events of the fd continuously, making
+    // remove the fd from kqueue. More badly, the fd will not be removable
+    // from kqueue again! If the fd was level-triggered and there's data left,
+    // kevent will keep returning events of the fd continuously, making
     // program abnormal.
-#if defined(OS_LINUX)
-    if (epoll_ctl(_epfd, EPOLL_CTL_DEL, fd, NULL) < 0) {
-        PLOG(WARNING) << "Fail to remove fd=" << fd << " from epfd=" << _epfd;
-        return -1;
-    }
-#elif defined(OS_MACOSX)
     struct kevent evt;
     EV_SET(&evt, fd, EVFILT_READ, EV_DELETE, 0, 0, NULL);
     kevent(_epfd, &evt, 1, NULL, 0, NULL);
     EV_SET(&evt, fd, EVFILT_WRITE, EV_DELETE, 0, 0, NULL);
     kevent(_epfd, &evt, 1, NULL, 0, NULL);
-#endif
     return 0;
 }
 
@@ -279,25 +188,12 @@ void* EventDispatcher::RunThis(void* arg) {
 
 void EventDispatcher::Run() {
     while (!_stop) {
-#if defined(OS_LINUX)
-        epoll_event e[32];
-#ifdef BRPC_ADDITIONAL_EPOLL
-        // Performance downgrades in examples.
-        int n = epoll_wait(_epfd, e, ARRAY_SIZE(e), 0);
-        if (n == 0) {
-            n = epoll_wait(_epfd, e, ARRAY_SIZE(e), -1);
-        }
-#else
-        const int n = epoll_wait(_epfd, e, ARRAY_SIZE(e), -1);
-#endif
-#elif defined(OS_MACOSX)
         struct kevent e[32];
         int n = kevent(_epfd, NULL, 0, e, ARRAY_SIZE(e), NULL);
-#endif
         if (_stop) {
-            // epoll_ctl/epoll_wait should have some sort of memory fencing
-            // guaranteeing that we(after epoll_wait) see _stop set before
-            // epoll_ctl.
+            // EV_SET/kevent should have some sort of memory fencing
+            // guaranteeing that we (after kevent) see _stop set before
+            // EV_SET.
             break;
         }
         if (n < 0) {
@@ -305,76 +201,23 @@ void EventDispatcher::Run() {
                 // We've checked _stop, no wake-up will be missed.
                 continue;
             }
-#if defined(OS_LINUX)
-            PLOG(FATAL) << "Fail to epoll_wait epfd=" << _epfd;
-#elif defined(OS_MACOSX)
             PLOG(FATAL) << "Fail to kqueue epfd=" << _epfd;
-#endif
             break;
         }
         for (int i = 0; i < n; ++i) {
-#if defined(OS_LINUX)
-            if (e[i].events & (EPOLLIN | EPOLLERR | EPOLLHUP)
-#ifdef BRPC_SOCKET_HAS_EOF
-                || (e[i].events & has_epollrdhup)
-#endif
-                ) {
-                // We don't care about the return value.
-                Socket::StartInputEvent(e[i].data.u64, e[i].events,
-                                        _consumer_thread_attr);
-            }
-#elif defined(OS_MACOSX)
             if ((e[i].flags & EV_ERROR) || e[i].filter == EVFILT_READ) {
                 // We don't care about the return value.
                 Socket::StartInputEvent((SocketId)e[i].udata, e[i].filter,
                                         _consumer_thread_attr);
             }
-#endif
         }
         for (int i = 0; i < n; ++i) {
-#if defined(OS_LINUX)
-            if (e[i].events & (EPOLLOUT | EPOLLERR | EPOLLHUP)) {
-                // We don't care about the return value.
-                Socket::HandleEpollOut(e[i].data.u64);
-            }
-#elif defined(OS_MACOSX)
             if ((e[i].flags & EV_ERROR) || e[i].filter == EVFILT_WRITE) {
                 // We don't care about the return value.
                 Socket::HandleEpollOut((SocketId)e[i].udata);
             }
-#endif
         }
     }
 }
 
-static EventDispatcher* g_edisp = NULL;
-static pthread_once_t g_edisp_once = PTHREAD_ONCE_INIT;
-
-static void StopAndJoinGlobalDispatchers() {
-    for (int i = 0; i < FLAGS_event_dispatcher_num; ++i) {
-        g_edisp[i].Stop();
-        g_edisp[i].Join();
-    }
-}
-void InitializeGlobalDispatchers() {
-    g_edisp = new EventDispatcher[FLAGS_event_dispatcher_num];
-    for (int i = 0; i < FLAGS_event_dispatcher_num; ++i) {
-        const bthread_attr_t attr = FLAGS_usercode_in_pthread ?
-            BTHREAD_ATTR_PTHREAD : BTHREAD_ATTR_NORMAL;
-        CHECK_EQ(0, g_edisp[i].Start(&attr));
-    }
-    // This atexit is will be run before g_task_control.stop() because above
-    // Start() initializes g_task_control by creating bthread (to run epoll/kqueue).
-    CHECK_EQ(0, atexit(StopAndJoinGlobalDispatchers));
-}
-
-EventDispatcher& GetGlobalEventDispatcher(int fd) {
-    pthread_once(&g_edisp_once, InitializeGlobalDispatchers);
-    if (FLAGS_event_dispatcher_num == 1) {
-        return g_edisp[0];
-    }
-    int index = butil::fmix32(fd) % FLAGS_event_dispatcher_num;
-    return g_edisp[index];
-}
-
 } // namespace brpc


---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@brpc.apache.org
For additional commands, e-mail: dev-help@brpc.apache.org