Posted to dev@brpc.apache.org by GitBox <gi...@apache.org> on 2022/07/13 11:19:13 UTC

[GitHub] [incubator-brpc] Tuvie opened a new pull request, #1836: enable brpc use rdma

Tuvie opened a new pull request, #1836:
URL: https://github.com/apache/incubator-brpc/pull/1836

   Trying to merge the RDMA implementation into master.
   This implementation is a little different from the previous rdma branch.
   I removed some code that had proved useless and tried to improve the handshake process.
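
   A minimal usage sketch (illustrative, not part of the original description), assuming this PR
   exposes a use_rdma flag on brpc::ChannelOptions and brpc::ServerOptions, in line with the
   SocketOptions::use_rdma path visible in the socket.cpp hunk quoted later in this thread, and
   that brpc is built with BRPC_WITH_RDMA:

   // Hedged sketch: turning on RDMA for a brpc server and client channel.
   // use_rdma on ServerOptions/ChannelOptions is assumed to be what this PR adds;
   // the handshake is expected to fall back to TCP when RDMA is unavailable.
   #include <brpc/channel.h>
   #include <brpc/server.h>

   int main() {
       brpc::Server server;
       // ... server.AddService(...) as usual ...
       brpc::ServerOptions server_options;
       server_options.use_rdma = true;            // assumption: option added by this PR
       if (server.Start(8002, &server_options) != 0) {
           return -1;
       }

       brpc::ChannelOptions channel_options;
       channel_options.use_rdma = true;           // assumption: option added by this PR
       brpc::Channel channel;
       if (channel.Init("127.0.0.1:8002", &channel_options) != 0) {
           return -1;
       }
       // Issue RPCs through the channel exactly as with plain TCP.
       return 0;
   }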


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@brpc.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@brpc.apache.org
For additional commands, e-mail: dev-help@brpc.apache.org


[GitHub] [incubator-brpc] yanjianglu commented on a diff in pull request #1836: enable brpc use rdma

Posted by GitBox <gi...@apache.org>.
yanjianglu commented on code in PR #1836:
URL: https://github.com/apache/incubator-brpc/pull/1836#discussion_r944078824


##########
src/brpc/rdma/rdma_endpoint.cpp:
##########
@@ -0,0 +1,1467 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#if BRPC_WITH_RDMA
+
+#include <gflags/gflags.h>
+#include "butil/fd_utility.h"
+#include "butil/logging.h"                   // CHECK, LOG
+#include "butil/sys_byteorder.h"             // HostToNet,NetToHost
+#include "bthread/bthread.h"
+#include "brpc/errno.pb.h"
+#include "brpc/event_dispatcher.h"
+#include "brpc/input_messenger.h"
+#include "brpc/socket.h"
+#include "brpc/reloadable_flags.h"
+#include "brpc/rdma/block_pool.h"
+#include "brpc/rdma/rdma_helper.h"
+#include "brpc/rdma/rdma_endpoint.h"
+
+
+namespace brpc {
+namespace rdma {
+
+extern ibv_cq* (*IbvCreateCq)(ibv_context*, int, void*, ibv_comp_channel*, int);
+extern int (*IbvDestroyCq)(ibv_cq*);
+extern ibv_comp_channel* (*IbvCreateCompChannel)(ibv_context*);
+extern int (*IbvDestroyCompChannel)(ibv_comp_channel*);
+extern int (*IbvGetCqEvent)(ibv_comp_channel*, ibv_cq**, void**);
+extern void (*IbvAckCqEvents)(ibv_cq*, unsigned int);
+extern ibv_qp* (*IbvCreateQp)(ibv_pd*, ibv_qp_init_attr*);
+extern int (*IbvModifyQp)(ibv_qp*, ibv_qp_attr*, ibv_qp_attr_mask);
+extern int (*IbvQueryQp)(ibv_qp*, ibv_qp_attr*, ibv_qp_attr_mask, ibv_qp_init_attr*);
+extern int (*IbvDestroyQp)(ibv_qp*);
+extern bool g_skip_rdma_init;
+
+DEFINE_int32(rdma_sq_size, 128, "SQ size for RDMA");
+DEFINE_int32(rdma_rq_size, 128, "RQ size for RDMA");
+DEFINE_bool(rdma_recv_zerocopy, true, "Enable zerocopy for receive side");
+DEFINE_int32(rdma_zerocopy_min_size, 512, "The minimal size for receive zerocopy");
+DEFINE_string(rdma_recv_block_type, "default", "Default size type for recv WR: "
+              "default(8KB - 32B)/large(64KB - 32B)/huge(2MB - 32B)");
+DEFINE_int32(rdma_cqe_poll_once, 32, "The maximum of cqe number polled once.");
+DEFINE_int32(rdma_prepared_qp_size, 128, "SQ and RQ size for prepared QP.");
+DEFINE_int32(rdma_prepared_qp_cnt, 1024, "Initial count of prepared QP.");
+DEFINE_bool(rdma_trace_verbose, false, "Print log message verbosely");
+BRPC_VALIDATE_GFLAG(rdma_trace_verbose, brpc::PassValidate);
+
+static const size_t IOBUF_BLOCK_HEADER_LEN = 32; // implementation-dependent
+static const size_t IOBUF_BLOCK_DEFAULT_PAYLOAD =
+        butil::IOBuf::DEFAULT_BLOCK_SIZE - IOBUF_BLOCK_HEADER_LEN;
+
+// DO NOT change this value unless you know the safe value!!!
+// This is the number of reserved WRs in SQ/RQ for pure ACK.
+static const size_t RESERVED_WR_NUM = 3;
+
+// magic string RDMA (4B)
+// message length (2B)
+// hello version (2B)
+// impl version (2B): 0 means should use tcp
+// block size (2B)
+// sq size (2B)
+// rq size (2B)
+// GID (16B)
+// QP number (4B)
+static const char* MAGIC_STR = "RDMA";
+static const size_t MAGIC_STR_LEN = 4;
+static const size_t HELLO_MSG_LEN_MIN = 38;
+static const size_t HELLO_MSG_LEN_MAX = 4096;
+static const size_t ACK_MSG_LEN = 4;
+static uint16_t g_rdma_hello_msg_len = 38;  // In Byte
+static uint16_t g_rdma_hello_version = 1;
+static uint16_t g_rdma_impl_version = 1;
+static uint16_t g_rdma_recv_block_size = 0;
+
+static const uint32_t MAX_INLINE_DATA = 64;
+static const uint8_t MAX_HOP_LIMIT = 16;
+static const uint8_t TIMEOUT = 14;
+static const uint8_t RETRY_CNT = 7;
+static const uint16_t MIN_QP_SIZE = 16;
+static const uint16_t MIN_BLOCK_SIZE = 1024;
+static const uint32_t ACK_MSG_RDMA_OK = 0x1;
+
+static butil::Mutex* g_rdma_resource_mutex = NULL;
+static RdmaResource* g_rdma_resource_list = NULL;
+
+struct HelloMessage {
+    void Serialize(void* data);
+    void Deserialize(void* data);
+
+    uint16_t msg_len;
+    uint16_t hello_ver;
+    uint16_t impl_ver;
+    uint16_t block_size;
+    uint16_t sq_size;
+    uint16_t rq_size;
+    uint16_t lid;
+    ibv_gid gid;
+    uint32_t qp_num;
+};
+
+void HelloMessage::Serialize(void* data) {
+    uint16_t* current_pos = (uint16_t*)data;
+    *(current_pos++) = butil::HostToNet16(msg_len);
+    *(current_pos++) = butil::HostToNet16(hello_ver);
+    *(current_pos++) = butil::HostToNet16(impl_ver);
+    *(current_pos++) = butil::HostToNet16(block_size);
+    *(current_pos++) = butil::HostToNet16(sq_size);
+    *(current_pos++) = butil::HostToNet16(rq_size);
+    *(current_pos++) = butil::HostToNet16(lid);
+    memcpy(current_pos, gid.raw, 16);
+    uint32_t* qp_num_pos = (uint32_t*)((char*)current_pos + 16);
+    *qp_num_pos = butil::HostToNet32(qp_num);
+}
+
+void HelloMessage::Deserialize(void* data) {
+    uint16_t* current_pos = (uint16_t*)data;
+    msg_len = butil::NetToHost16(*current_pos++);
+    hello_ver = butil::NetToHost16(*current_pos++);
+    impl_ver = butil::NetToHost16(*current_pos++);
+    block_size = butil::NetToHost16(*current_pos++);
+    sq_size = butil::NetToHost16(*current_pos++);
+    rq_size = butil::NetToHost16(*current_pos++);
+    lid = butil::NetToHost16(*current_pos++);
+    memcpy(gid.raw, current_pos, 16);
+    qp_num = butil::NetToHost32(*(uint32_t*)((char*)current_pos + 16));
+}
+
+RdmaResource::RdmaResource() 
+    : qp(NULL)
+    , cq(NULL)
+    , comp_channel(NULL)
+    , next(NULL) { }
+
+RdmaResource::~RdmaResource() {
+    if (qp) {
+        IbvDestroyQp(qp);
+        qp = NULL;
+    }
+    if (cq) {
+        IbvDestroyCq(cq);
+        cq = NULL;
+    }
+    if (comp_channel) {
+        IbvDestroyCompChannel(comp_channel);
+        comp_channel = NULL;
+    }
+}
+
+RdmaEndpoint::RdmaEndpoint(Socket* s)
+    : _socket(s)
+    , _state(UNINIT)
+    , _resource(NULL)
+    , _cq_events(0)
+    , _cq_sid(INVALID_SOCKET_ID)
+    , _sq_size(FLAGS_rdma_sq_size)
+    , _rq_size(FLAGS_rdma_rq_size)
+    , _sbuf()
+    , _rbuf()
+    , _rbuf_data()
+    , _remote_recv_block_size(0)
+    , _accumulated_ack(0)
+    , _unsolicited(0)
+    , _unsolicited_bytes(0)
+    , _sq_current(0)
+    , _sq_unsignaled(0)
+    , _sq_sent(0)
+    , _rq_received(0)
+    , _local_window_capacity(0)
+    , _remote_window_capacity(0)
+    , _window_size(0)
+    , _new_rq_wrs(0)
+{
+    if (_sq_size < MIN_QP_SIZE) {
+        _sq_size = MIN_QP_SIZE;
+    }
+    if (_rq_size < MIN_QP_SIZE) {
+        _rq_size = MIN_QP_SIZE;
+    }
+    _read_butex = bthread::butex_create_checked<butil::atomic<int> >();
+}
+
+RdmaEndpoint::~RdmaEndpoint() {
+    Reset();
+    bthread::butex_destroy(_read_butex);
+}
+
+void RdmaEndpoint::Reset() {
+    DeallocateResources();
+
+    _cq_events = 0;
+    _cq_sid = INVALID_SOCKET_ID;
+    _state = UNINIT;
+    _sbuf.clear();
+    _rbuf.clear();
+    _rbuf_data.clear();
+    _accumulated_ack = 0;
+    _unsolicited = 0;
+    _sq_current = 0;
+    _sq_unsignaled = 0;
+    _local_window_capacity = 0;
+    _remote_window_capacity = 0;
+    _window_size.store(0, butil::memory_order_relaxed);
+    _new_rq_wrs = 0;
+    _sq_sent = 0;
+    _rq_received = 0;
+}
+
+void RdmaConnect::StartConnect(const Socket* socket,
+                               void (*done)(int err, void* data),
+                               void* data) {
+    CHECK(socket->_rdma_ep != NULL);
+    SocketUniquePtr s;
+    if (Socket::Address(socket->id(), &s) != 0) {
+        return;
+    }
+    if (!IsRdmaAvailable()) {
+        socket->_rdma_ep->_state = RdmaEndpoint::FALLBACK_TCP;
+        s->_rdma_state = Socket::RDMA_OFF;
+        done(0, data);
+        return;
+    }
+    _done = done;
+    _data = data;
+    bthread_t tid;
+    if (bthread_start_background(&tid, &BTHREAD_ATTR_NORMAL,
+                RdmaEndpoint::ProcessHandshakeAtClient, socket->_rdma_ep) < 0) {
+        LOG(FATAL) << "Fail to start handshake bthread";
+    } else {
+        s.release();
+    }
+}
+
+void RdmaConnect::StopConnect(Socket* socket) { }
+
+void RdmaConnect::Run() {
+    _done(errno, _data);
+}
+
+static void TryReadOnTcpDuringRdmaEst(Socket* s) {
+    int progress = Socket::PROGRESS_INIT;
+    while (true) {
+        uint8_t tmp;
+        ssize_t nr = read(s->fd(), &tmp, 1);
+        if (nr < 0) {
+            if (errno != EAGAIN) {
+                const int saved_errno = errno;
+                PLOG(WARNING) << "Fail to read from " << s;
+                s->SetFailed(saved_errno, "Fail to read from %s: %s",
+                        s->description().c_str(), berror(saved_errno));
+                return;
+            }
+            if (!s->MoreReadEvents(&progress)) {
+                break;
+            }
+        } else if (nr == 0) {
+            s->SetEOF();
+            return;
+        } else {
+            LOG(WARNING) << "Read unexpected data from " << s;
+            s->SetFailed(EPROTO, "Read unexpected data from %s",
+                    s->description().c_str());
+            return;
+        }
+    }
+}
+
+void RdmaEndpoint::OnNewDataFromTcp(Socket* m) {
+    RdmaEndpoint* ep = m->_rdma_ep;
+    CHECK(ep != NULL);
+
+    int progress = Socket::PROGRESS_INIT;
+    while (true) {
+        if (ep->_state == UNINIT) {
+            if (!m->CreatedByConnect()) {
+                if (!IsRdmaAvailable()) {
+                    ep->_state = FALLBACK_TCP;
+                    m->_rdma_state = Socket::RDMA_OFF;
+                    continue;
+                }
+                bthread_t tid;
+                ep->_state = S_HELLO_WAIT;
+                SocketUniquePtr s;
+                m->ReAddress(&s);
+                if (bthread_start_background(&tid, &BTHREAD_ATTR_NORMAL,
+                            ProcessHandshakeAtServer, ep) < 0) {
+                    ep->_state = UNINIT;
+                    LOG(FATAL) << "Fail to start handshake bthread";
+                } else {
+                    s.release();
+                }
+            } else {
+                // The connection may be closed or reset before the client
+                // starts handshake. This will be handled by client handshake.
+                // Ignore the exception here.
+            }
+        } else if (ep->_state < ESTABLISHED) {  // during handshake
+            ep->_read_butex->fetch_add(1, butil::memory_order_release);
+            bthread::butex_wake(ep->_read_butex);
+        } else if (ep->_state == FALLBACK_TCP){  // handshake finishes
+            InputMessenger::OnNewMessages(m);
+            return;
+        } else if (ep->_state == ESTABLISHED) {
+            TryReadOnTcpDuringRdmaEst(ep->_socket);
+            return;
+        }
+        if (!m->MoreReadEvents(&progress)) {
+            break;
+        }
+    }
+}
+
+bool HelloNegotiationValid(HelloMessage msg) {

Review Comment:
   Why is this a standalone function here instead of a member of the HelloMessage struct?
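
   A minimal sketch of the refactor being asked about (hypothetical, not taken from the PR):
   move the negotiation check into HelloMessage as a const member so it lives next to
   Serialize/Deserialize. The field checks are illustrative only and reuse the constants
   defined earlier in this file.

   // Hypothetical sketch only; the PR's actual validation logic is not shown in this hunk.
   // Assumes it lives in rdma_endpoint.cpp, where <infiniband/verbs.h> and the constants
   // referenced below (HELLO_MSG_LEN_MIN, MIN_BLOCK_SIZE, MIN_QP_SIZE, ...) are visible.
   struct HelloMessage {
       void Serialize(void* data);
       void Deserialize(void* data);

       // Suggested addition: keep validation with the message definition.
       bool Valid() const {
           return msg_len >= HELLO_MSG_LEN_MIN &&
                  hello_ver == g_rdma_hello_version &&
                  block_size >= MIN_BLOCK_SIZE &&
                  sq_size >= MIN_QP_SIZE &&
                  rq_size >= MIN_QP_SIZE;
       }

       uint16_t msg_len;
       uint16_t hello_ver;
       uint16_t impl_ver;
       uint16_t block_size;
       uint16_t sq_size;
       uint16_t rq_size;
       uint16_t lid;
       ibv_gid gid;
       uint32_t qp_num;
   };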





[GitHub] [incubator-brpc] wwbmmm merged pull request #1836: enable brpc use rdma

Posted by GitBox <gi...@apache.org>.
wwbmmm merged PR #1836:
URL: https://github.com/apache/incubator-brpc/pull/1836




[GitHub] [incubator-brpc] yanjianglu commented on a diff in pull request #1836: enable brpc use rdma

Posted by GitBox <gi...@apache.org>.
yanjianglu commented on code in PR #1836:
URL: https://github.com/apache/incubator-brpc/pull/1836#discussion_r943239730


##########
src/brpc/rdma/rdma_endpoint.cpp:
##########
@@ -0,0 +1,1467 @@
(hunk identical to the one quoted earlier in this thread)
+bool HelloNegotiationValid(HelloMessage msg) {

Review Comment:
   Should the parameter be passed by reference (const HelloMessage&) to avoid a copy?
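
   A minimal sketch of the suggested signature change (hypothetical; the function body is not
   visible in this hunk, and the names in the call-site example are illustrative):

   // Pass the message by const reference so the struct (which carries a 16-byte
   // ibv_gid among its fields) is not copied on every call.
   bool HelloNegotiationValid(const HelloMessage& msg);

   // Call sites stay the same, e.g.:
   //     HelloMessage remote_msg;
   //     remote_msg.Deserialize(buf);
   //     if (!HelloNegotiationValid(remote_msg)) { /* fall back to TCP */ }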





[GitHub] [incubator-brpc] Tuvie commented on a diff in pull request #1836: enable brpc use rdma

Posted by GitBox <gi...@apache.org>.
Tuvie commented on code in PR #1836:
URL: https://github.com/apache/incubator-brpc/pull/1836#discussion_r936256208


##########
src/brpc/socket.cpp:
##########
@@ -626,6 +630,23 @@ int Socket::Create(const SocketOptions& options, SocketId* id) {
     m->_ssl_state = (options.initial_ssl_ctx == NULL ? SSL_OFF : SSL_UNKNOWN);
     m->_ssl_session = NULL;
     m->_ssl_ctx = options.initial_ssl_ctx;
+#if BRPC_WITH_RDMA
+    if (options.use_rdma) {
+        m->_rdma_ep = new (std::nothrow)rdma::RdmaEndpoint(m);
+        if (!m->_rdma_ep) {
+            const int saved_errno = errno;
+            PLOG(ERROR) << "Fail to create RdmaEndpoint";
+            m->SetFailed(saved_errno, "Fail to create RdmaEndpoint: %s",
+                         berror(saved_errno));
+            return -1;
+        }
+        m->_rdma_state = RDMA_UNKNOWN;
+    } else {
+        delete m->_rdma_ep;

Review Comment:
   Fixed.





[GitHub] [incubator-brpc] Tuvie commented on a diff in pull request #1836: enable brpc use rdma

Posted by GitBox <gi...@apache.org>.
Tuvie commented on code in PR #1836:
URL: https://github.com/apache/incubator-brpc/pull/1836#discussion_r943382570


##########
src/brpc/rdma/rdma_endpoint.h:
##########
@@ -0,0 +1,258 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#ifndef BRPC_RDMA_ENDPOINT_H
+#define BRPC_RDMA_ENDPOINT_H
+
+#if BRPC_WITH_RDMA
+#include <cstring>
+#include <iostream>
+#include <string>
+#include <vector>
+#include <infiniband/verbs.h>
+#include "butil/atomicops.h"
+#include "butil/iobuf.h"
+#include "butil/macros.h"
+#include "brpc/socket.h"
+
+
+namespace brpc {
+class Socket;
+namespace rdma {
+
+class RdmaConnect : public AppConnect {
+public:
+    void StartConnect(const Socket* socket, 
+            void (*done)(int err, void* data), void* data) override;
+    void StopConnect(Socket*) override;
+    struct RunGuard {
+        RunGuard(RdmaConnect* rc) { this_rc = rc; }
+        ~RunGuard() { if (this_rc) this_rc->Run(); }
+        RdmaConnect* this_rc;
+    };
+
+private:
+    void Run();
+    void (*_done)(int, void*);
+    void* _data;
+};
+
+struct RdmaResource {
+    ibv_qp* qp;
+    ibv_cq* cq;
+    ibv_comp_channel* comp_channel;
+    RdmaResource* next;
+    RdmaResource();
+    ~RdmaResource();
+};
+
+class BAIDU_CACHELINE_ALIGNMENT RdmaEndpoint : public SocketUser {
+friend class RdmaConnect;
+friend class brpc::Socket;
+public:
+    RdmaEndpoint(Socket* s);
+    ~RdmaEndpoint();
+
+    // Global initialization
+    // Return 0 if success, -1 if failed and errno set
+    static int GlobalInitialize();
+
+    static void GlobalRelease();
+
+    // Reset the endpoint (for next use)
+    void Reset();
+
+    // Cut data from the given IOBuf list and use RDMA to send
+    // Return bytes cut if success, -1 if failed and errno set
+    ssize_t CutFromIOBufList(butil::IOBuf** data, size_t ndata);
+
+    // Whether the endpoint can send more data
+    bool IsWritable() const;
+
+    // For debug
+    void DebugInfo(std::ostream& os) const;
+
+    // Callback when there is new epollin event on TCP fd
+    static void OnNewDataFromTcp(Socket* m);
+
+private:
+    enum State {
+        UNINIT = 0x0,
+        C_ALLOC_QPCQ = 0x1,
+        C_HELLO_SEND = 0x2,
+        C_HELLO_WAIT = 0x3,
+        C_BRINGUP_QP = 0x4,
+        C_ACK_SEND = 0x5,
+        S_HELLO_WAIT = 0x11,
+        S_ALLOC_QPCQ = 0x12,
+        S_BRINGUP_QP = 0x13,
+        S_HELLO_SEND = 0x14,
+        S_ACK_WAIT = 0x15,
+        ESTABLISHED = 0x100,
+        FALLBACK_TCP = 0x200,
+        FAILED = 0x300
+    };
+
+    // Process handshake at the client
+    static void* ProcessHandshakeAtClient(void* arg);
+
+    // Process handshake at the server
+    static void* ProcessHandshakeAtServer(void* arg);
+
+    // Allocate resources
+    // Return 0 if success, -1 if failed and errno set
+    int AllocateResources();
+
+    // Release resources
+    void DeallocateResources();
+
+    // Send Imm data to the remote side
+    // Arguments:
+    //     imm: imm data in the WR
+    // Return:
+    //     0:   success
+    //     -1:  failed, errno set
+    int SendImm(uint32_t imm);
+
+    // Try to send pure ACK to the remote side
+    // Arguments:
+    //     num: the number of rq entry received
+    // Return:
+    //     0:   success
+    //     -1:  failed, errno set
+    int SendAck(int num);
+
+    // Handle CQE
+    // If wc is not RDMA RECV event:
+    //     return 0 if success, -1 if failed and errno set
+    // If wc is RDMA RECV event:
+    //     return bytes appended if success, -1 if failed and errno set
+    ssize_t HandleCompletion(ibv_wc& wc);
+
+    // Post a given number of WRs to Recv Queue
+    // If zerocopy is true, reallocate block.
+    // Return 0 if success, -1 if failed and errno set
+    int PostRecv(uint32_t num, bool zerocopy);
+
+    // Post a WR pointing to the block to the local Recv Queue
+    // Arguments:
+    //     block: the addr to receive data (ibv_sge.addr)
+    //     block_size: the maximum length can be received (ibv_sge.length)
+    // Return:
+    //     0:   success
+    //     -1:  failed, errno set
+    int DoPostRecv(void* block, size_t block_size);
+
+    // Read at most len bytes from fd in _socket to data
+    // wait for _read_butex if encounter EAGAIN
+    // return -1 if encounter other errno (including EOF)
+    int ReadFromFd(void* data, size_t len);
+
+
+    // Wrute at most len bytes from data to fd in _socket

Review Comment:
   Thanks



##########
src/brpc/rdma/rdma_endpoint.cpp:
##########
@@ -0,0 +1,1467 @@
(hunk identical to the one quoted earlier in this thread)
+bool HelloNegotiationValid(HelloMessage msg) {

Review Comment:
   ok





[GitHub] [incubator-brpc] wwbmmm commented on pull request #1836: enable brpc use rdma

Posted by GitBox <gi...@apache.org>.
wwbmmm commented on PR #1836:
URL: https://github.com/apache/incubator-brpc/pull/1836#issuecomment-1216228279

   LGTM




[GitHub] [incubator-brpc] Tuvie commented on a diff in pull request #1836: enable brpc use rdma

Posted by GitBox <gi...@apache.org>.
Tuvie commented on code in PR #1836:
URL: https://github.com/apache/incubator-brpc/pull/1836#discussion_r939476215


##########
src/brpc/rdma/block_pool.cpp:
##########
@@ -0,0 +1,564 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#if BRPC_WITH_RDMA
+
+#include <errno.h>
+#include <stdlib.h>
+#include <vector>
+#include <gflags/gflags.h>
+#include "butil/fast_rand.h"
+#include "butil/iobuf.h"
+#include "butil/object_pool.h"
+#include "butil/thread_local.h"
+#include "bthread/bthread.h"
+#include "brpc/rdma/block_pool.h"
+
+
+namespace brpc {
+namespace rdma {
+
+DEFINE_int32(rdma_memory_pool_initial_size_mb, 1024,
+             "Initial size of memory pool for RDMA (MB)");
+DEFINE_int32(rdma_memory_pool_increase_size_mb, 1024,
+             "Increased size of memory pool for RDMA (MB)");
+DEFINE_int32(rdma_memory_pool_max_regions, 4, "Max number of regions");
+DEFINE_int32(rdma_memory_pool_buckets, 4, "Number of buckets to reduce race");
+DEFINE_int32(rdma_memory_pool_tls_cache_num, 128, "Number of cached block in tls");
+
+// This callback is used when extending a new region
+// Generally, it is a memory region register call
+typedef uint32_t (*Callback)(void*, size_t);
+static Callback g_cb = NULL;
+
+// Number of bytes in 1MB
+static const size_t BYTES_IN_MB = 1048576;
+
+static const int BLOCK_DEFAULT = 0; // 8KB
+static const int BLOCK_LARGE = 1;  // 64KB
+static const int BLOCK_HUGE = 2;  // 2MB
+static const int BLOCK_SIZE_COUNT = 3;
+static size_t g_block_size[BLOCK_SIZE_COUNT] = { 8192, 65536, 2 * BYTES_IN_MB };
+
+struct IdleNode {
+    void* start;
+    size_t len;
+    IdleNode* next;
+};
+
+struct Region {
+    Region() { start = 0; }
+    uintptr_t start;
+    size_t size;
+    uint32_t block_type;
+    uint32_t id;  // lkey
+};
+
+static const int32_t RDMA_MEMORY_POOL_MIN_REGIONS = 1;
+static const int32_t RDMA_MEMORY_POOL_MAX_REGIONS = 16;
+static Region g_regions[RDMA_MEMORY_POOL_MAX_REGIONS];
+static int g_region_num = 0;
+
+static const int32_t RDMA_MEMORY_POOL_MIN_SIZE = 32;  // 16MB
+static const int32_t RDMA_MEMORY_POOL_MAX_SIZE = 1048576;  // 1TB
+
+static const int32_t RDMA_MEMORY_POOL_MIN_BUCKETS = 1;
+static const int32_t RDMA_MEMORY_POOL_MAX_BUCKETS = 16;
+static size_t g_buckets = 1;
+
+static bool g_dump_enable = false;
+static butil::Mutex* g_dump_mutex = NULL;
+
+// Only for default block size
+static __thread IdleNode* tls_idle_list = NULL;
+static __thread size_t tls_idle_num = 0;
+static __thread bool tls_inited = false;
+static butil::Mutex* g_tls_info_mutex = NULL;
+static size_t g_tls_info_cnt = 0;
+static size_t* g_tls_info[1024];
+
+// For each block size, there are some buckets of idle list to reduce race.
+struct GlobalInfo {
+    std::vector<IdleNode*> idle_list[BLOCK_SIZE_COUNT];
+    std::vector<butil::Mutex*> lock[BLOCK_SIZE_COUNT];
+    std::vector<size_t> idle_size[BLOCK_SIZE_COUNT];
+    butil::Mutex extend_lock;
+};
+static GlobalInfo* g_info = NULL;
+
+static inline Region* GetRegion(const void* buf) {
+    if (!buf) {
+        errno = EINVAL;
+        return NULL;
+    }
+    Region* r = NULL;
+    uintptr_t addr = (uintptr_t)buf;
+    for (int i = 0; i < FLAGS_rdma_memory_pool_max_regions; ++i) {
+        if (g_regions[i].start == 0) {
+            break;
+        }
+        if (addr >= g_regions[i].start &&
+            addr < g_regions[i].start + g_regions[i].size) {
+            r = &g_regions[i];
+            break;
+        }
+    }
+    return r;
+}
+
+uint32_t GetRegionId(const void* buf) {
+    Region* r = GetRegion(buf);
+    if (!r) {
+        return 0;
+    }
+    return r->id;
+}
+
+// Extend the block pool with a new region (with different region ID)
+static void* ExtendBlockPool(size_t region_size, int block_type) {
+    if (region_size < 1) {
+        errno = EINVAL;
+        return NULL;
+    }
+
+    if (g_region_num == FLAGS_rdma_memory_pool_max_regions) {
+        LOG(INFO) << "Memory pool reaches max regions";
+        errno = ENOMEM;
+        return NULL;
+    }
+
+    // Regularize region size
+    region_size = region_size * BYTES_IN_MB / g_block_size[block_type] / g_buckets;
+    region_size *= g_block_size[block_type] * g_buckets;
+
+    LOG(INFO) << "Start extend rdma memory " << region_size / BYTES_IN_MB << "MB";
+
+    void* region_base = NULL;
+    if (posix_memalign(&region_base, 4096, region_size) != 0) {
+        PLOG_EVERY_SECOND(ERROR) << "Memory not enough";
+        return NULL;
+    }
+
+    uint32_t id = g_cb(region_base, region_size);
+    if (id == 0) {
+        free(region_base);
+        return NULL;
+    }
+
+    IdleNode* node[g_buckets];
+    for (size_t i = 0; i < g_buckets; ++i) {
+        node[i] = butil::get_object<IdleNode>();
+        if (!node[i]) {
+            PLOG_EVERY_SECOND(ERROR) << "Memory not enough";
+            for (size_t j = 0; j < i; ++j) {
+                butil::return_object<IdleNode>(node[j]);
+            }
+            free(region_base);
+            return NULL;
+        }
+    }
+ 
+    Region* region = &g_regions[g_region_num++];
+    region->start = (uintptr_t)region_base;
+    region->size = region_size;
+    region->id = id;
+    region->block_type = block_type;
+
+    for (size_t i = 0; i < g_buckets; ++i) {
+        node[i]->start = (void*)(region->start + i * (region_size / g_buckets));
+        node[i]->len = region_size / g_buckets;
+        node[i]->next = NULL;
+        g_info->idle_list[block_type][i] = node[i];
+        g_info->idle_size[block_type][i] += node[i]->len;
+    }
+
+    return region_base;
+}
+
+void* InitBlockPool(Callback cb) {
+    if (!cb) {
+        errno = EINVAL;
+        return NULL;
+    }
+    if (g_cb) {
+        LOG(WARNING) << "Do not initialize block pool repeatedly";
+        errno = EINVAL;
+        return NULL;
+    }
+    g_cb = cb;
+    if (FLAGS_rdma_memory_pool_max_regions < RDMA_MEMORY_POOL_MIN_REGIONS ||
+        FLAGS_rdma_memory_pool_max_regions > RDMA_MEMORY_POOL_MAX_REGIONS) {
+        LOG(WARNING) << "rdma_memory_pool_max_regions("
+                     << FLAGS_rdma_memory_pool_max_regions << ") not in ["
+                     << RDMA_MEMORY_POOL_MIN_REGIONS << ","
+                     << RDMA_MEMORY_POOL_MAX_REGIONS << "]!";
+        errno = EINVAL;
+        return NULL;
+    }
+    if (FLAGS_rdma_memory_pool_initial_size_mb < RDMA_MEMORY_POOL_MIN_SIZE ||
+        FLAGS_rdma_memory_pool_initial_size_mb > RDMA_MEMORY_POOL_MAX_SIZE) {
+        LOG(WARNING) << "rdma_memory_pool_initial_size_mb("
+                     << FLAGS_rdma_memory_pool_initial_size_mb << ") not in ["
+                     << RDMA_MEMORY_POOL_MIN_SIZE << ","
+                     << RDMA_MEMORY_POOL_MAX_SIZE << "]!";
+        errno = EINVAL;
+        return NULL;
+    }
+    if (FLAGS_rdma_memory_pool_increase_size_mb < RDMA_MEMORY_POOL_MIN_SIZE ||
+        FLAGS_rdma_memory_pool_increase_size_mb > RDMA_MEMORY_POOL_MAX_SIZE) {
+        LOG(WARNING) << "rdma_memory_pool_increase_size_mb("
+                     << FLAGS_rdma_memory_pool_increase_size_mb << ") not in ["
+                     << RDMA_MEMORY_POOL_MIN_SIZE << ","
+                     << RDMA_MEMORY_POOL_MAX_SIZE << "]!";
+        errno = EINVAL;
+        return NULL;
+    }
+    if (FLAGS_rdma_memory_pool_buckets < RDMA_MEMORY_POOL_MIN_BUCKETS ||
+        FLAGS_rdma_memory_pool_buckets > RDMA_MEMORY_POOL_MAX_BUCKETS) {
+        LOG(WARNING) << "rdma_memory_pool_buckets("
+                     << FLAGS_rdma_memory_pool_buckets << ") not in ["
+                     << RDMA_MEMORY_POOL_MIN_BUCKETS << ","
+                     << RDMA_MEMORY_POOL_MAX_BUCKETS << "]!";
+        errno = EINVAL;
+        return NULL;
+    }
+    g_buckets = FLAGS_rdma_memory_pool_buckets;
+
+    g_info = new (std::nothrow) GlobalInfo;
+    if (!g_info) {
+        return NULL;
+    }
+
+    for (int i = 0; i < BLOCK_SIZE_COUNT; ++i) {
+        g_info->idle_list[i].resize(g_buckets, NULL);
+        if (g_info->idle_list[i].size() != g_buckets) {
+            return NULL;
+        }
+        g_info->lock[i].resize(g_buckets, NULL);
+        if (g_info->lock[i].size() != g_buckets) {
+            return NULL;
+        }
+        g_info->idle_size[i].resize(g_buckets, 0);
+        if (g_info->idle_size[i].size() != g_buckets) {
+            return NULL;
+        }
+        for (size_t j = 0; j < g_buckets; ++j) {
+            g_info->lock[i][j] = new (std::nothrow) butil::Mutex;
+            if (!g_info->lock[i][j]) {
+                return NULL;
+            }
+        }
+    }
+
+    g_dump_mutex = new butil::Mutex;
+    g_tls_info_mutex = new butil::Mutex;
+
+    return ExtendBlockPool(FLAGS_rdma_memory_pool_initial_size_mb,
+                           BLOCK_DEFAULT);
+}
+
+static void* AllocBlockFrom(int block_type) {
+    bool locked = false;
+    if (BAIDU_UNLIKELY(g_dump_enable)) {
+        g_dump_mutex->lock();
+        locked = true;
+    }
+    void* ptr = NULL;
+    if (block_type == 0 && tls_idle_list != NULL){
+        CHECK(tls_idle_num > 0);
+        IdleNode* n = tls_idle_list;
+        tls_idle_list = n->next;
+        ptr = n->start;
+        butil::return_object<IdleNode>(n);
+        tls_idle_num--;
+        if (locked) {
+            g_dump_mutex->unlock();
+        }
+        return ptr;
+    }
+
+    uint64_t index = butil::fast_rand() % g_buckets;
+    BAIDU_SCOPED_LOCK(*g_info->lock[block_type][index]);
+    IdleNode* node = g_info->idle_list[block_type][index];
+    if (!node) {
+        BAIDU_SCOPED_LOCK(g_info->extend_lock);
+        node = g_info->idle_list[block_type][index];
+        if (!node) {
+            // There is no block left, extend a new region
+            if (!ExtendBlockPool(FLAGS_rdma_memory_pool_increase_size_mb,

Review Comment:
   The point of having multiple buckets is to minimize contention between threads. Since buckets are picked at random, usage is fairly even across them; when one bucket has run out of space, the others are most likely almost empty as well, so it is simpler to just extend the pool at that point. In practice we do not recommend relying on mid-run extension anyway, because it triggers dynamic lkey lookups, so this path has not been optimized much yet.





[GitHub] [incubator-brpc] yanjianglu commented on a diff in pull request #1836: enable brpc use rdma

Posted by GitBox <gi...@apache.org>.
yanjianglu commented on code in PR #1836:
URL: https://github.com/apache/incubator-brpc/pull/1836#discussion_r943260172


##########
src/brpc/rdma/rdma_endpoint.h:
##########
@@ -0,0 +1,258 @@
(hunk identical to the rdma_endpoint.h excerpt quoted earlier in this thread)
+    // Arguments:
+    //     num: the number of rq entry received
+    // Return:
+    //     0:   success
+    //     -1:  failed, errno set
+    int SendAck(int num);
+
+    // Handle CQE
+    // If wc is not RDMA RECV event:
+    //     return 0 if success, -1 if failed and errno set
+    // If wc is RDMA RECV event:
+    //     return bytes appended if success, -1 if failed and errno set
+    ssize_t HandleCompletion(ibv_wc& wc);
+
+    // Post a given number of WRs to Recv Queue
+    // If zerocopy is true, reallocate block.
+    // Return 0 if success, -1 if failed and errno set
+    int PostRecv(uint32_t num, bool zerocopy);
+
+    // Post a WR pointing to the block to the local Recv Queue
+    // Arguments:
+    //     block: the addr to receive data (ibv_sge.addr)
+    //     block_size: the maximum length can be received (ibv_sge.length)
+    // Return:
+    //     0:   success
+    //     -1:  failed, errno set
+    int DoPostRecv(void* block, size_t block_size);
+
+    // Read at most len bytes from fd in _socket to data
+    // wait for _read_butex if encounter EAGAIN
+    // return -1 if encounter other errno (including EOF)
+    int ReadFromFd(void* data, size_t len);
+
+
+    // Wrute at most len bytes from data to fd in _socket

Review Comment:
   Typo: "Wrute" should be "Write".





[GitHub] [incubator-brpc] Tuvie commented on pull request #1836: enable brpc use rdma

Posted by GitBox <gi...@apache.org>.
Tuvie commented on PR #1836:
URL: https://github.com/apache/incubator-brpc/pull/1836#issuecomment-1236033096

   > LGTM.
   > 
   > @Tuvie Have there been any bugfixes since the last commit? If not, let's wait for @wwbmmm to see how stable it has been running online; if nothing comes up, we can consider merging it.
   
   Nothing for now; we can also wait a little longer.




[GitHub] [incubator-brpc] Tuvie commented on a diff in pull request #1836: enable brpc use rdma

Posted by GitBox <gi...@apache.org>.
Tuvie commented on code in PR #1836:
URL: https://github.com/apache/incubator-brpc/pull/1836#discussion_r939475608


##########
src/brpc/rdma/block_pool.cpp:
##########
@@ -0,0 +1,564 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#if BRPC_WITH_RDMA
+
+#include <errno.h>
+#include <stdlib.h>
+#include <vector>
+#include <gflags/gflags.h>
+#include "butil/fast_rand.h"
+#include "butil/iobuf.h"
+#include "butil/object_pool.h"
+#include "butil/thread_local.h"
+#include "bthread/bthread.h"
+#include "brpc/rdma/block_pool.h"
+
+
+namespace brpc {
+namespace rdma {
+
+DEFINE_int32(rdma_memory_pool_initial_size_mb, 1024,
+             "Initial size of memory pool for RDMA (MB)");
+DEFINE_int32(rdma_memory_pool_increase_size_mb, 1024,
+             "Increased size of memory pool for RDMA (MB)");
+DEFINE_int32(rdma_memory_pool_max_regions, 4, "Max number of regions");
+DEFINE_int32(rdma_memory_pool_buckets, 4, "Number of buckets to reduce race");
+DEFINE_int32(rdma_memory_pool_tls_cache_num, 128, "Number of cached block in tls");
+
+// This callback is used when extending a new region
+// Generally, it is a memory region register call
+typedef uint32_t (*Callback)(void*, size_t);

Review Comment:
   Yes. This has been changed.





[GitHub] [incubator-brpc] Tuvie commented on pull request #1836: enable brpc use rdma

Posted by GitBox <gi...@apache.org>.
Tuvie commented on PR #1836:
URL: https://github.com/apache/incubator-brpc/pull/1836#issuecomment-1236032754

   > LGTM.
   > 
   > @Tuvie Have there been any bugfixes since the last commit? If not, let's wait for @wwbmmm to see how stable it has been running online; if nothing comes up, we can consider merging it.
   




[GitHub] [incubator-brpc] zyearn commented on pull request #1836: enable brpc use rdma

Posted by GitBox <gi...@apache.org>.
zyearn commented on PR #1836:
URL: https://github.com/apache/incubator-brpc/pull/1836#issuecomment-1218423198

   > > @Tuvie Could you briefly describe the code structure and the idea behind the changes? There are quite a few files; an overall overview would make the review easier. Thanks.
   > 
   > A corresponding description has been added to docs.
   
   Thanks. I am reviewing it now; there are quite a few files, so it will take some time to go through them.




[GitHub] [incubator-brpc] Tuvie commented on a diff in pull request #1836: enable brpc use rdma

Posted by GitBox <gi...@apache.org>.
Tuvie commented on code in PR #1836:
URL: https://github.com/apache/incubator-brpc/pull/1836#discussion_r939473930


##########
src/brpc/server.cpp:
##########
@@ -701,6 +703,28 @@ static bool CreateConcurrencyLimiter(const AdaptiveMaxConcurrency& amc,
     return true;
 }
 
+#if BRPC_WITH_RDMA
+static bool OptionsAvailableOverRdma(const ServerOptions* opt) {
+    if (opt->rtmp_service) {
+        LOG(WARNING) << "RTMP is not supported by RDMA";
+        return false;
+    }
+    if (opt->has_ssl_options()) {
+        LOG(WARNING) << "SSL is not supported by RDMA";
+        return false;
+    }
+    if (opt->nshead_service) {

Review Comment:
   It could be supported, and previous versions did allow it. In practice, however, we do not recommend it because the HTTP header itself is quite heavy, so it was dropped in this version.





[GitHub] [incubator-brpc] Tuvie commented on a diff in pull request #1836: enable brpc use rdma

Posted by GitBox <gi...@apache.org>.
Tuvie commented on code in PR #1836:
URL: https://github.com/apache/incubator-brpc/pull/1836#discussion_r945531228


##########
src/brpc/rdma/rdma_endpoint.cpp:
##########
@@ -0,0 +1,1467 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#if BRPC_WITH_RDMA
+
+#include <gflags/gflags.h>
+#include "butil/fd_utility.h"
+#include "butil/logging.h"                   // CHECK, LOG
+#include "butil/sys_byteorder.h"             // HostToNet,NetToHost
+#include "bthread/bthread.h"
+#include "brpc/errno.pb.h"
+#include "brpc/event_dispatcher.h"
+#include "brpc/input_messenger.h"
+#include "brpc/socket.h"
+#include "brpc/reloadable_flags.h"
+#include "brpc/rdma/block_pool.h"
+#include "brpc/rdma/rdma_helper.h"
+#include "brpc/rdma/rdma_endpoint.h"
+
+
+namespace brpc {
+namespace rdma {
+
+extern ibv_cq* (*IbvCreateCq)(ibv_context*, int, void*, ibv_comp_channel*, int);
+extern int (*IbvDestroyCq)(ibv_cq*);
+extern ibv_comp_channel* (*IbvCreateCompChannel)(ibv_context*);
+extern int (*IbvDestroyCompChannel)(ibv_comp_channel*);
+extern int (*IbvGetCqEvent)(ibv_comp_channel*, ibv_cq**, void**);
+extern void (*IbvAckCqEvents)(ibv_cq*, unsigned int);
+extern ibv_qp* (*IbvCreateQp)(ibv_pd*, ibv_qp_init_attr*);
+extern int (*IbvModifyQp)(ibv_qp*, ibv_qp_attr*, ibv_qp_attr_mask);
+extern int (*IbvQueryQp)(ibv_qp*, ibv_qp_attr*, ibv_qp_attr_mask, ibv_qp_init_attr*);
+extern int (*IbvDestroyQp)(ibv_qp*);
+extern bool g_skip_rdma_init;
+
+DEFINE_int32(rdma_sq_size, 128, "SQ size for RDMA");
+DEFINE_int32(rdma_rq_size, 128, "RQ size for RDMA");
+DEFINE_bool(rdma_recv_zerocopy, true, "Enable zerocopy for receive side");
+DEFINE_int32(rdma_zerocopy_min_size, 512, "The minimal size for receive zerocopy");
+DEFINE_string(rdma_recv_block_type, "default", "Default size type for recv WR: "
+              "default(8KB - 32B)/large(64KB - 32B)/huge(2MB - 32B)");
+DEFINE_int32(rdma_cqe_poll_once, 32, "The maximum of cqe number polled once.");
+DEFINE_int32(rdma_prepared_qp_size, 128, "SQ and RQ size for prepared QP.");
+DEFINE_int32(rdma_prepared_qp_cnt, 1024, "Initial count of prepared QP.");
+DEFINE_bool(rdma_trace_verbose, false, "Print log message verbosely");
+BRPC_VALIDATE_GFLAG(rdma_trace_verbose, brpc::PassValidate);
+
+static const size_t IOBUF_BLOCK_HEADER_LEN = 32; // implementation-dependent
+static const size_t IOBUF_BLOCK_DEFAULT_PAYLOAD =
+        butil::IOBuf::DEFAULT_BLOCK_SIZE - IOBUF_BLOCK_HEADER_LEN;
+
+// DO NOT change this value unless you know the safe value!!!
+// This is the number of reserved WRs in SQ/RQ for pure ACK.
+static const size_t RESERVED_WR_NUM = 3;
+
+// magic string RDMA (4B)
+// message length (2B)
+// hello version (2B)
+// impl version (2B): 0 means should use tcp
+// block size (2B)
+// sq size (2B)
+// rq size (2B)
+// GID (16B)
+// QP number (4B)
+static const char* MAGIC_STR = "RDMA";
+static const size_t MAGIC_STR_LEN = 4;
+static const size_t HELLO_MSG_LEN_MIN = 38;
+static const size_t HELLO_MSG_LEN_MAX = 4096;
+static const size_t ACK_MSG_LEN = 4;
+static uint16_t g_rdma_hello_msg_len = 38;  // In Byte
+static uint16_t g_rdma_hello_version = 1;
+static uint16_t g_rdma_impl_version = 1;
+static uint16_t g_rdma_recv_block_size = 0;
+
+static const uint32_t MAX_INLINE_DATA = 64;
+static const uint8_t MAX_HOP_LIMIT = 16;
+static const uint8_t TIMEOUT = 14;
+static const uint8_t RETRY_CNT = 7;
+static const uint16_t MIN_QP_SIZE = 16;
+static const uint16_t MIN_BLOCK_SIZE = 1024;
+static const uint32_t ACK_MSG_RDMA_OK = 0x1;
+
+static butil::Mutex* g_rdma_resource_mutex = NULL;
+static RdmaResource* g_rdma_resource_list = NULL;
+
+struct HelloMessage {
+    void Serialize(void* data);

Review Comment:
   OK





[GitHub] [incubator-brpc] Tuvie commented on pull request #1836: enable brpc use rdma

Posted by GitBox <gi...@apache.org>.
Tuvie commented on PR #1836:
URL: https://github.com/apache/incubator-brpc/pull/1836#issuecomment-1236465891

   > @Tuvie There are conflicts; please merge master and resubmit.
   
   done




[GitHub] [incubator-brpc] yanjianglu commented on a diff in pull request #1836: enable brpc use rdma

Posted by GitBox <gi...@apache.org>.
yanjianglu commented on code in PR #1836:
URL: https://github.com/apache/incubator-brpc/pull/1836#discussion_r944078579


##########
src/brpc/rdma/rdma_endpoint.cpp:
##########
@@ -0,0 +1,1467 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#if BRPC_WITH_RDMA
+
+#include <gflags/gflags.h>
+#include "butil/fd_utility.h"
+#include "butil/logging.h"                   // CHECK, LOG
+#include "butil/sys_byteorder.h"             // HostToNet,NetToHost
+#include "bthread/bthread.h"
+#include "brpc/errno.pb.h"
+#include "brpc/event_dispatcher.h"
+#include "brpc/input_messenger.h"
+#include "brpc/socket.h"
+#include "brpc/reloadable_flags.h"
+#include "brpc/rdma/block_pool.h"
+#include "brpc/rdma/rdma_helper.h"
+#include "brpc/rdma/rdma_endpoint.h"
+
+
+namespace brpc {
+namespace rdma {
+
+extern ibv_cq* (*IbvCreateCq)(ibv_context*, int, void*, ibv_comp_channel*, int);
+extern int (*IbvDestroyCq)(ibv_cq*);
+extern ibv_comp_channel* (*IbvCreateCompChannel)(ibv_context*);
+extern int (*IbvDestroyCompChannel)(ibv_comp_channel*);
+extern int (*IbvGetCqEvent)(ibv_comp_channel*, ibv_cq**, void**);
+extern void (*IbvAckCqEvents)(ibv_cq*, unsigned int);
+extern ibv_qp* (*IbvCreateQp)(ibv_pd*, ibv_qp_init_attr*);
+extern int (*IbvModifyQp)(ibv_qp*, ibv_qp_attr*, ibv_qp_attr_mask);
+extern int (*IbvQueryQp)(ibv_qp*, ibv_qp_attr*, ibv_qp_attr_mask, ibv_qp_init_attr*);
+extern int (*IbvDestroyQp)(ibv_qp*);
+extern bool g_skip_rdma_init;
+
+DEFINE_int32(rdma_sq_size, 128, "SQ size for RDMA");
+DEFINE_int32(rdma_rq_size, 128, "RQ size for RDMA");
+DEFINE_bool(rdma_recv_zerocopy, true, "Enable zerocopy for receive side");
+DEFINE_int32(rdma_zerocopy_min_size, 512, "The minimal size for receive zerocopy");
+DEFINE_string(rdma_recv_block_type, "default", "Default size type for recv WR: "
+              "default(8KB - 32B)/large(64KB - 32B)/huge(2MB - 32B)");
+DEFINE_int32(rdma_cqe_poll_once, 32, "The maximum of cqe number polled once.");
+DEFINE_int32(rdma_prepared_qp_size, 128, "SQ and RQ size for prepared QP.");
+DEFINE_int32(rdma_prepared_qp_cnt, 1024, "Initial count of prepared QP.");
+DEFINE_bool(rdma_trace_verbose, false, "Print log message verbosely");
+BRPC_VALIDATE_GFLAG(rdma_trace_verbose, brpc::PassValidate);
+
+static const size_t IOBUF_BLOCK_HEADER_LEN = 32; // implementation-dependent
+static const size_t IOBUF_BLOCK_DEFAULT_PAYLOAD =
+        butil::IOBuf::DEFAULT_BLOCK_SIZE - IOBUF_BLOCK_HEADER_LEN;
+
+// DO NOT change this value unless you know the safe value!!!
+// This is the number of reserved WRs in SQ/RQ for pure ACK.
+static const size_t RESERVED_WR_NUM = 3;
+
+// magic string RDMA (4B)
+// message length (2B)
+// hello version (2B)
+// impl version (2B): 0 means should use tcp
+// block size (2B)
+// sq size (2B)
+// rq size (2B)
+// GID (16B)
+// QP number (4B)
+static const char* MAGIC_STR = "RDMA";
+static const size_t MAGIC_STR_LEN = 4;
+static const size_t HELLO_MSG_LEN_MIN = 38;
+static const size_t HELLO_MSG_LEN_MAX = 4096;
+static const size_t ACK_MSG_LEN = 4;
+static uint16_t g_rdma_hello_msg_len = 38;  // In Byte
+static uint16_t g_rdma_hello_version = 1;
+static uint16_t g_rdma_impl_version = 1;
+static uint16_t g_rdma_recv_block_size = 0;
+
+static const uint32_t MAX_INLINE_DATA = 64;
+static const uint8_t MAX_HOP_LIMIT = 16;
+static const uint8_t TIMEOUT = 14;
+static const uint8_t RETRY_CNT = 7;
+static const uint16_t MIN_QP_SIZE = 16;
+static const uint16_t MIN_BLOCK_SIZE = 1024;
+static const uint32_t ACK_MSG_RDMA_OK = 0x1;
+
+static butil::Mutex* g_rdma_resource_mutex = NULL;
+static RdmaResource* g_rdma_resource_list = NULL;
+
+struct HelloMessage {
+    void Serialize(void* data);

Review Comment:
   Could Serialize be declared as a const member function?
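
   For reference, the change would just add the qualifier to the declaration; the later revision of the diff quoted further below already shows it this way:

       struct HelloMessage {
           void Serialize(void* data) const;   // const: serializing does not modify the message
           void Deserialize(void* data);
           // ... fields unchanged ...
       };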





[GitHub] [incubator-brpc] Tuvie commented on pull request #1836: enable brpc use rdma

Posted by GitBox <gi...@apache.org>.
Tuvie commented on PR #1836:
URL: https://github.com/apache/incubator-brpc/pull/1836#issuecomment-1211871136

   > Will it later be possible to start multiple brpc servers in the same process, each bound to a different RDMA NIC?
   
   This can be supported in a later version.




[GitHub] [incubator-brpc] Tuvie commented on a diff in pull request #1836: enable brpc use rdma

Posted by GitBox <gi...@apache.org>.
Tuvie commented on code in PR #1836:
URL: https://github.com/apache/incubator-brpc/pull/1836#discussion_r936256078


##########
src/brpc/channel.cpp:
##########
@@ -149,6 +169,16 @@ int Channel::InitChannelOptions(const ChannelOptions* options) {
         LOG(ERROR) << "Channel does not support the protocol";
         return -1;
     }
+
+#if BRPC_WITH_RDMA

Review Comment:
   Changed to report an error.





[GitHub] [incubator-brpc] Tuvie commented on a diff in pull request #1836: enable brpc use rdma

Posted by GitBox <gi...@apache.org>.
Tuvie commented on code in PR #1836:
URL: https://github.com/apache/incubator-brpc/pull/1836#discussion_r936255855


##########
CMakeLists.txt:
##########
@@ -68,6 +69,12 @@ if(WITH_THRIFT)
     set(THRIFT_LIB "thrift")
 endif()
 
+set(WITH_RDMA_VAL "0")
+if(WITH_RDMA)
+    set(WITH_RDMA_VAL "1")
+    set(BRPC_WITH_RDMA 1)

Review Comment:
   It should be unused; I will remove it.





[GitHub] [incubator-brpc] wwbmmm commented on a diff in pull request #1836: enable brpc use rdma

Posted by GitBox <gi...@apache.org>.
wwbmmm commented on code in PR #1836:
URL: https://github.com/apache/incubator-brpc/pull/1836#discussion_r935263314


##########
src/brpc/channel.cpp:
##########
@@ -149,6 +169,16 @@ int Channel::InitChannelOptions(const ChannelOptions* options) {
         LOG(ERROR) << "Channel does not support the protocol";
         return -1;
     }
+
+#if BRPC_WITH_RDMA

Review Comment:
   When the BRPC_WITH_RDMA macro is not defined, shouldn't it report an error if the user sets use_rdma in the options?
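
   A minimal sketch of such a guard inside Channel::InitChannelOptions(); the exact wording of the error message is an assumption, not the committed code:

       #if !BRPC_WITH_RDMA
           if (options->use_rdma) {
               // Fail fast instead of silently ignoring the option.
               LOG(ERROR) << "Channel cannot use rdma: brpc is not compiled with rdma";
               return -1;
           }
       #endif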



##########
CMakeLists.txt:
##########
@@ -68,6 +69,12 @@ if(WITH_THRIFT)
     set(THRIFT_LIB "thrift")
 endif()
 
+set(WITH_RDMA_VAL "0")
+if(WITH_RDMA)
+    set(WITH_RDMA_VAL "1")
+    set(BRPC_WITH_RDMA 1)

Review Comment:
   Is this actually used anywhere?



##########
src/brpc/socket.cpp:
##########
@@ -626,6 +630,23 @@ int Socket::Create(const SocketOptions& options, SocketId* id) {
     m->_ssl_state = (options.initial_ssl_ctx == NULL ? SSL_OFF : SSL_UNKNOWN);
     m->_ssl_session = NULL;
     m->_ssl_ctx = options.initial_ssl_ctx;
+#if BRPC_WITH_RDMA
+    if (options.use_rdma) {
+        m->_rdma_ep = new (std::nothrow)rdma::RdmaEndpoint(m);
+        if (!m->_rdma_ep) {
+            const int saved_errno = errno;
+            PLOG(ERROR) << "Fail to create RdmaEndpoint";
+            m->SetFailed(saved_errno, "Fail to create RdmaEndpoint: %s",
+                         berror(saved_errno));
+            return -1;
+        }
+        m->_rdma_state = RDMA_UNKNOWN;
+    } else {
+        delete m->_rdma_ep;

Review Comment:
   Same as the internal review comment: make sure this is guaranteed to be NULL here.
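
   In other words, something along these lines (a sketch of the suggestion, not the committed code):

       } else {
           // A freshly created Socket without use_rdma should not own an
           // RdmaEndpoint, so assert the invariant instead of deleting.
           CHECK(m->_rdma_ep == NULL);
       }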



##########
src/brpc/input_messenger.cpp:
##########
@@ -85,6 +86,15 @@ ParseResult InputMessenger::CutInputMessage(
                     << " bytes, the connection will be closed."
                     " Set max_body_size to allow bigger messages";
                 return result;
+            } else {
+                if (m->_read_buf.size() >= 4) {
+                char data[4];

Review Comment:
   The indentation is wrong here.



##########
src/brpc/input_messenger.cpp:
##########
@@ -85,6 +86,15 @@ ParseResult InputMessenger::CutInputMessage(
                     << " bytes, the connection will be closed."
                     " Set max_body_size to allow bigger messages";
                 return result;
+            } else {
+                if (m->_read_buf.size() >= 4) {
+                char data[4];
+                m->_read_buf.copy_to_cstr(data, 4);
+                if (strncmp(data, "RDMA", 4) == 0 && m->_rdma_state == Socket::RDMA_OFF) {

Review Comment:
   Could this 4 be defined as a named constant?
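
   For illustration, the check could be reshaped as below. RDMA_MAGIC_STR_LEN is a hypothetical name; rdma_endpoint.cpp already keeps a file-local MAGIC_STR_LEN = 4 for the same "RDMA" magic string:

       static const size_t RDMA_MAGIC_STR_LEN = 4;  // hypothetical constant

       if (m->_read_buf.size() >= RDMA_MAGIC_STR_LEN) {
           char data[RDMA_MAGIC_STR_LEN];
           m->_read_buf.copy_to_cstr(data, RDMA_MAGIC_STR_LEN);
           if (strncmp(data, "RDMA", RDMA_MAGIC_STR_LEN) == 0 &&
               m->_rdma_state == Socket::RDMA_OFF) {
               // same handling as in the original patch
           }
       }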





[GitHub] [incubator-brpc] trevor211 commented on a diff in pull request #1836: enable brpc use rdma

Posted by GitBox <gi...@apache.org>.
trevor211 commented on code in PR #1836:
URL: https://github.com/apache/incubator-brpc/pull/1836#discussion_r974982087


##########
example/rdma_performance/client.cpp:
##########
@@ -0,0 +1,309 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <stdlib.h>
+#include <unistd.h>
+#include <vector>
+#include <gflags/gflags.h>
+#include "butil/atomicops.h"
+#include "butil/fast_rand.h"
+#include "butil/logging.h"
+#include "brpc/rdma/rdma_helper.h"
+#include "brpc/server.h"
+#include "brpc/channel.h"
+#include "bthread/bthread.h"
+#include "bvar/latency_recorder.h"
+#include "bvar/variable.h"
+#include "test.pb.h"
+
+DEFINE_int32(thread_num, 0, "How many threads are used");
+DEFINE_int32(queue_depth, 1, "How many requests can be pending in the queue");
+DEFINE_int32(expected_qps, 0, "The expected QPS");
+DEFINE_int32(max_thread_num, 16, "The max number of threads are used");
+DEFINE_int32(attachment_size, -1, "Attachment size is used (in Bytes)");
+DEFINE_bool(echo_attachment, false, "Select whether attachment should be echo");
+DEFINE_string(connection_type, "single", "Connection type of the channel");
+DEFINE_string(protocol, "baidu_std", "Protocol type.");
+DEFINE_string(servers, "0.0.0.0:8002+0.0.0.0:8002", "IP Address of servers");
+DEFINE_bool(use_rdma, true, "Use RDMA or not");
+DEFINE_int32(rpc_timeout_ms, 2000, "RPC call timeout");
+DEFINE_int32(test_seconds, 20, "Test running time");
+DEFINE_int32(test_iterations, 0, "Test iterations");
+
+bvar::LatencyRecorder g_latency_recorder("client");
+bvar::LatencyRecorder g_server_cpu_recorder("server_cpu");
+bvar::LatencyRecorder g_client_cpu_recorder("client_cpu");
+butil::atomic<uint64_t> g_last_time(0);
+butil::atomic<uint64_t> g_total_bytes;
+butil::atomic<uint64_t> g_total_cnt;
+std::vector<std::string> g_servers;
+int rr_index = 0;
+volatile bool g_stop = false;
+
+butil::atomic<int64_t> g_token(10000);
+
+static void* GenerateToken(void* arg) {
+    int64_t start_time = butil::monotonic_time_ns();
+    int64_t accumulative_token = g_token.load(butil::memory_order_relaxed);
+    while (!g_stop) {
+        bthread_usleep(100000);
+        int64_t now = butil::monotonic_time_ns();
+        if (accumulative_token * 1000000000 / (now - start_time) < FLAGS_expected_qps) {
+            int64_t delta = FLAGS_expected_qps * (now - start_time) / 1000000000 - accumulative_token;
+            g_token.fetch_add(delta, butil::memory_order_relaxed);
+            accumulative_token += delta;
+        }
+    }
+    return NULL;
+}
+
+class PerformanceTest {
+public:
+    PerformanceTest(int attachment_size, bool echo_attachment)
+        : _addr(NULL)
+        , _channel(NULL)
+        , _start_time(0)
+        , _iterations(0)
+        , _stop(false)
+    {
+        if (attachment_size > 0) {
+            _addr = malloc(attachment_size);
+            butil::fast_rand_bytes(_addr, attachment_size);
+            _attachment.append(_addr, attachment_size);
+        }
+        _echo_attachment = echo_attachment;
+    }
+
+    ~PerformanceTest() {
+        if (_addr) {
+            free(_addr);
+        }
+        delete _channel;
+    }
+
+    inline bool IsStop() { return _stop; }
+
+    int Init() {
+        brpc::ChannelOptions options;
+        options.use_rdma = FLAGS_use_rdma;
+        options.protocol = FLAGS_protocol;
+        options.connection_type = FLAGS_connection_type;
+        options.timeout_ms = FLAGS_rpc_timeout_ms;
+        options.max_retry = 0;
+        std::string server = g_servers[(rr_index++) % g_servers.size()];
+        _channel = new brpc::Channel();
+        if (_channel->Init(server.c_str(), &options) != 0) {
+            LOG(ERROR) << "Fail to initialize channel";
+            return -1;
+        }
+        brpc::Controller cntl;
+        test::PerfTestResponse response;
+        test::PerfTestRequest request;
+        request.set_echo_attachment(_echo_attachment);
+        test::PerfTestService_Stub stub(_channel);
+        stub.Test(&cntl, &request, &response, NULL);
+        if (cntl.Failed()) {
+            LOG(ERROR) << "RPC call failed: " << cntl.ErrorText();
+            return -1;
+        }
+        return 0;
+    }
+
+    struct RespClosure {
+        brpc::Controller* cntl;
+        test::PerfTestResponse* resp;
+        PerformanceTest* test;
+    };
+
+    void SendRequest() {
+        if (FLAGS_expected_qps > 0) {
+            while (g_token.load(butil::memory_order_relaxed) <= 0) {
+                bthread_usleep(10);
+            }
+            g_token.fetch_sub(1, butil::memory_order_relaxed);
+        }
+        RespClosure* closure = new RespClosure;
+        test::PerfTestRequest request;
+        closure->resp = new test::PerfTestResponse();
+        closure->cntl = new brpc::Controller();
+        request.set_echo_attachment(_echo_attachment);
+        closure->cntl->request_attachment().append(_attachment);
+        closure->test = this;
+        google::protobuf::Closure* done = brpc::NewCallback(&HandleResponse, closure);
+        test::PerfTestService_Stub stub(_channel);
+        stub.Test(closure->cntl, &request, closure->resp, done);
+    }
+
+    static void HandleResponse(RespClosure* closure) {
+        std::unique_ptr<brpc::Controller> cntl_guard(closure->cntl);
+        std::unique_ptr<test::PerfTestResponse> response_guard(closure->resp);
+        if (closure->cntl->Failed()) {
+            LOG(ERROR) << "RPC call failed: " << closure->cntl->ErrorText();
+            closure->test->_stop = true;
+            return;
+        }
+
+        g_latency_recorder << closure->cntl->latency_us();
+        if (closure->resp->cpu_usage().size() > 0) {
+            g_server_cpu_recorder << atof(closure->resp->cpu_usage().c_str()) * 100;
+        }
+        g_total_bytes.fetch_add(closure->cntl->request_attachment().size(), butil::memory_order_relaxed);
+        g_total_cnt.fetch_add(1, butil::memory_order_relaxed);
+
+        cntl_guard.reset(NULL);
+        response_guard.reset(NULL);
+
+        if (closure->test->_iterations == 0 && FLAGS_test_iterations > 0) {
+            closure->test->_stop = true;
+            return;
+        }
+        --closure->test->_iterations;
+        uint64_t last = g_last_time.load(butil::memory_order_relaxed);
+        uint64_t now = butil::gettimeofday_us();
+        if (now > last && now - last > 100000) {
+            if (g_last_time.exchange(now, butil::memory_order_relaxed) == last) {
+                g_client_cpu_recorder << 
+                    atof(bvar::Variable::describe_exposed("process_cpu_usage").c_str()) * 100;
+            }
+        }
+        if (now - closure->test->_start_time > FLAGS_test_seconds * 1000000u) {
+            closure->test->_stop = true;
+            return;
+        }
+        closure->test->SendRequest();
+    }
+
+    static void* RunTest(void* arg) {
+        PerformanceTest* test = (PerformanceTest*)arg;
+        test->_start_time = butil::gettimeofday_us();
+        test->_iterations = FLAGS_test_iterations;
+        
+        for (int i = 0; i < FLAGS_queue_depth; ++i) {
+            test->SendRequest();
+        }
+
+        return NULL;
+    }
+
+private:
+    void* _addr;
+    brpc::Channel* _channel;
+    uint64_t _start_time;
+    uint32_t _iterations;
+    volatile bool _stop;
+    butil::IOBuf _attachment;
+    bool _echo_attachment;
+};
+
+static void* DeleteTest(void* arg) {
+    PerformanceTest* test = (PerformanceTest*)arg;
+    delete test;
+    return NULL;
+}
+
+void Test(int thread_num, int attachment_size) {
+    std::cout << "[Threads: " << thread_num
+        << ", Depth: " << FLAGS_queue_depth
+        << ", Attachment: " << attachment_size << "B"
+        << ", RDMA: " << (FLAGS_use_rdma ? "yes" : "no")
+        << ", Echo: " << (FLAGS_echo_attachment ? "yes]" : "no]")
+        << std::endl;
+    g_total_bytes.store(0, butil::memory_order_relaxed);
+    g_total_cnt.store(0, butil::memory_order_relaxed);
+    std::vector<PerformanceTest*> tests;
+    for (int k = 0; k < thread_num; ++k) {
+        PerformanceTest* t = new PerformanceTest(attachment_size, FLAGS_echo_attachment);
+        if (t->Init() < 0) {
+            exit(1);
+        }
+        tests.push_back(t);
+    }
+    uint64_t start_time = butil::gettimeofday_us();
+    bthread_t tid[thread_num];
+    if (FLAGS_expected_qps > 0) {
+        bthread_t tid;
+        bthread_start_background(&tid, &BTHREAD_ATTR_NORMAL, GenerateToken, NULL);
+    }
+    for (int k = 0; k < thread_num; ++k) {
+        bthread_start_background(&tid[k], &BTHREAD_ATTR_NORMAL,
+                PerformanceTest::RunTest, tests[k]);
+    }
+    for (int k = 0; k < thread_num; ++k) {
+        while (!tests[k]->IsStop()) {
+            bthread_usleep(10000);
+        }
+    }
+    uint64_t end_time = butil::gettimeofday_us();
+    double throughput = g_total_bytes / 1.048576 / (end_time - start_time);
+    if (FLAGS_test_iterations == 0) {
+        std::cout << "Avg-Latency: " << g_latency_recorder.latency(10)
+            << ", 90th-Latency: " << g_latency_recorder.latency_percentile(0.9)
+            << ", 99th-Latency: " << g_latency_recorder.latency_percentile(0.99)
+            << ", 99.9th-Latency: " << g_latency_recorder.latency_percentile(0.999)
+            << ", Throughput: " << throughput << "MB/s"
+            << ", QPS: " << (g_total_cnt.load(butil::memory_order_relaxed) * 1000 / (end_time - start_time)) << "k"
+            << ", Server CPU-utilization: " << g_server_cpu_recorder.latency(10) << "\%"
+            << ", Client CPU-utilization: " << g_client_cpu_recorder.latency(10) << "\%"
+            << std::endl;
+    } else {
+        std::cout << " Throughput: " << throughput << "MB/s" << std::endl;
+    }
+    g_stop = true;
+    for (int k = 0; k < thread_num; ++k) {
+        bthread_start_background(&tid[k], &BTHREAD_ATTR_NORMAL, DeleteTest, tests[k]);
+    }
+}
+
+int main(int argc, char* argv[]) {
+    GFLAGS_NS::ParseCommandLineFlags(&argc, &argv, true);
+
+    // Initialize RDMA environment in advance.
+    if (FLAGS_use_rdma) {
+        brpc::rdma::GlobalRdmaInitializeOrDie();
+    }
+
+    brpc::StartDummyServerAt(8001);

Review Comment:
   Maybe we can make the dummy port configurable by adding a gflag, say `dummy_port`.
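
   A minimal sketch of that suggestion; the flag name comes from the comment above, while the default and description string are assumptions:

       DEFINE_int32(dummy_port, 8001, "Port of the dummy server");

       // ... and in main(), instead of the hard-coded call:
       brpc::StartDummyServerAt(FLAGS_dummy_port);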





[GitHub] [incubator-brpc] trevor211 commented on a diff in pull request #1836: enable brpc use rdma

Posted by GitBox <gi...@apache.org>.
trevor211 commented on code in PR #1836:
URL: https://github.com/apache/incubator-brpc/pull/1836#discussion_r977579941


##########
src/brpc/rdma/rdma_endpoint.cpp:
##########
@@ -0,0 +1,1467 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#if BRPC_WITH_RDMA
+
+#include <gflags/gflags.h>
+#include "butil/fd_utility.h"
+#include "butil/logging.h"                   // CHECK, LOG
+#include "butil/sys_byteorder.h"             // HostToNet,NetToHost
+#include "bthread/bthread.h"
+#include "brpc/errno.pb.h"
+#include "brpc/event_dispatcher.h"
+#include "brpc/input_messenger.h"
+#include "brpc/socket.h"
+#include "brpc/reloadable_flags.h"
+#include "brpc/rdma/block_pool.h"
+#include "brpc/rdma/rdma_helper.h"
+#include "brpc/rdma/rdma_endpoint.h"
+
+
+namespace brpc {
+namespace rdma {
+
+extern ibv_cq* (*IbvCreateCq)(ibv_context*, int, void*, ibv_comp_channel*, int);
+extern int (*IbvDestroyCq)(ibv_cq*);
+extern ibv_comp_channel* (*IbvCreateCompChannel)(ibv_context*);
+extern int (*IbvDestroyCompChannel)(ibv_comp_channel*);
+extern int (*IbvGetCqEvent)(ibv_comp_channel*, ibv_cq**, void**);
+extern void (*IbvAckCqEvents)(ibv_cq*, unsigned int);
+extern ibv_qp* (*IbvCreateQp)(ibv_pd*, ibv_qp_init_attr*);
+extern int (*IbvModifyQp)(ibv_qp*, ibv_qp_attr*, ibv_qp_attr_mask);
+extern int (*IbvQueryQp)(ibv_qp*, ibv_qp_attr*, ibv_qp_attr_mask, ibv_qp_init_attr*);
+extern int (*IbvDestroyQp)(ibv_qp*);
+extern bool g_skip_rdma_init;
+
+DEFINE_int32(rdma_sq_size, 128, "SQ size for RDMA");
+DEFINE_int32(rdma_rq_size, 128, "RQ size for RDMA");
+DEFINE_bool(rdma_recv_zerocopy, true, "Enable zerocopy for receive side");
+DEFINE_int32(rdma_zerocopy_min_size, 512, "The minimal size for receive zerocopy");
+DEFINE_string(rdma_recv_block_type, "default", "Default size type for recv WR: "
+              "default(8KB - 32B)/large(64KB - 32B)/huge(2MB - 32B)");
+DEFINE_int32(rdma_cqe_poll_once, 32, "The maximum of cqe number polled once.");
+DEFINE_int32(rdma_prepared_qp_size, 128, "SQ and RQ size for prepared QP.");
+DEFINE_int32(rdma_prepared_qp_cnt, 1024, "Initial count of prepared QP.");
+DEFINE_bool(rdma_trace_verbose, false, "Print log message verbosely");
+BRPC_VALIDATE_GFLAG(rdma_trace_verbose, brpc::PassValidate);
+
+static const size_t IOBUF_BLOCK_HEADER_LEN = 32; // implementation-dependent
+static const size_t IOBUF_BLOCK_DEFAULT_PAYLOAD =
+        butil::IOBuf::DEFAULT_BLOCK_SIZE - IOBUF_BLOCK_HEADER_LEN;
+
+// DO NOT change this value unless you know the safe value!!!
+// This is the number of reserved WRs in SQ/RQ for pure ACK.
+static const size_t RESERVED_WR_NUM = 3;
+
+// magic string RDMA (4B)
+// message length (2B)
+// hello version (2B)
+// impl version (2B): 0 means should use tcp
+// block size (2B)
+// sq size (2B)
+// rq size (2B)
+// GID (16B)
+// QP number (4B)
+static const char* MAGIC_STR = "RDMA";
+static const size_t MAGIC_STR_LEN = 4;
+static const size_t HELLO_MSG_LEN_MIN = 38;
+static const size_t HELLO_MSG_LEN_MAX = 4096;
+static const size_t ACK_MSG_LEN = 4;
+static uint16_t g_rdma_hello_msg_len = 38;  // In Byte
+static uint16_t g_rdma_hello_version = 1;
+static uint16_t g_rdma_impl_version = 1;
+static uint16_t g_rdma_recv_block_size = 0;
+
+static const uint32_t MAX_INLINE_DATA = 64;
+static const uint8_t MAX_HOP_LIMIT = 16;
+static const uint8_t TIMEOUT = 14;
+static const uint8_t RETRY_CNT = 7;
+static const uint16_t MIN_QP_SIZE = 16;
+static const uint16_t MIN_BLOCK_SIZE = 1024;
+static const uint32_t ACK_MSG_RDMA_OK = 0x1;
+
+static butil::Mutex* g_rdma_resource_mutex = NULL;
+static RdmaResource* g_rdma_resource_list = NULL;
+
+struct HelloMessage {
+    void Serialize(void* data) const;
+    void Deserialize(void* data);
+
+    uint16_t msg_len;
+    uint16_t hello_ver;
+    uint16_t impl_ver;
+    uint16_t block_size;
+    uint16_t sq_size;
+    uint16_t rq_size;
+    uint16_t lid;
+    ibv_gid gid;
+    uint32_t qp_num;
+};
+
+void HelloMessage::Serialize(void* data) const {
+    uint16_t* current_pos = (uint16_t*)data;
+    *(current_pos++) = butil::HostToNet16(msg_len);
+    *(current_pos++) = butil::HostToNet16(hello_ver);
+    *(current_pos++) = butil::HostToNet16(impl_ver);
+    *(current_pos++) = butil::HostToNet16(block_size);
+    *(current_pos++) = butil::HostToNet16(sq_size);
+    *(current_pos++) = butil::HostToNet16(rq_size);
+    *(current_pos++) = butil::HostToNet16(lid);
+    memcpy(current_pos, gid.raw, 16);
+    uint32_t* qp_num_pos = (uint32_t*)((char*)current_pos + 16);
+    *qp_num_pos = butil::HostToNet32(qp_num);
+}
+
+void HelloMessage::Deserialize(void* data) {
+    uint16_t* current_pos = (uint16_t*)data;
+    msg_len = butil::NetToHost16(*current_pos++);
+    hello_ver = butil::NetToHost16(*current_pos++);
+    impl_ver = butil::NetToHost16(*current_pos++);
+    block_size = butil::NetToHost16(*current_pos++);
+    sq_size = butil::NetToHost16(*current_pos++);
+    rq_size = butil::NetToHost16(*current_pos++);
+    lid = butil::NetToHost16(*current_pos++);
+    memcpy(gid.raw, current_pos, 16);
+    qp_num = butil::NetToHost32(*(uint32_t*)((char*)current_pos + 16));
+}
+
+RdmaResource::RdmaResource() 
+    : qp(NULL)
+    , cq(NULL)
+    , comp_channel(NULL)
+    , next(NULL) { }
+
+RdmaResource::~RdmaResource() {
+    if (qp) {
+        IbvDestroyQp(qp);
+        qp = NULL;
+    }
+    if (cq) {
+        IbvDestroyCq(cq);
+        cq = NULL;
+    }
+    if (comp_channel) {
+        IbvDestroyCompChannel(comp_channel);
+        comp_channel = NULL;
+    }
+}
+
+RdmaEndpoint::RdmaEndpoint(Socket* s)
+    : _socket(s)
+    , _state(UNINIT)
+    , _resource(NULL)
+    , _cq_events(0)
+    , _cq_sid(INVALID_SOCKET_ID)
+    , _sq_size(FLAGS_rdma_sq_size)
+    , _rq_size(FLAGS_rdma_rq_size)
+    , _sbuf()
+    , _rbuf()
+    , _rbuf_data()
+    , _remote_recv_block_size(0)
+    , _accumulated_ack(0)
+    , _unsolicited(0)
+    , _unsolicited_bytes(0)
+    , _sq_current(0)
+    , _sq_unsignaled(0)
+    , _sq_sent(0)
+    , _rq_received(0)
+    , _local_window_capacity(0)
+    , _remote_window_capacity(0)
+    , _window_size(0)
+    , _new_rq_wrs(0)
+{
+    if (_sq_size < MIN_QP_SIZE) {
+        _sq_size = MIN_QP_SIZE;
+    }
+    if (_rq_size < MIN_QP_SIZE) {
+        _rq_size = MIN_QP_SIZE;
+    }
+    _read_butex = bthread::butex_create_checked<butil::atomic<int> >();
+}
+
+RdmaEndpoint::~RdmaEndpoint() {
+    Reset();
+    bthread::butex_destroy(_read_butex);
+}
+
+void RdmaEndpoint::Reset() {
+    DeallocateResources();
+
+    _cq_events = 0;
+    _cq_sid = INVALID_SOCKET_ID;
+    _state = UNINIT;
+    _sbuf.clear();
+    _rbuf.clear();
+    _rbuf_data.clear();
+    _accumulated_ack = 0;
+    _unsolicited = 0;
+    _sq_current = 0;
+    _sq_unsignaled = 0;
+    _local_window_capacity = 0;
+    _remote_window_capacity = 0;
+    _window_size.store(0, butil::memory_order_relaxed);
+    _new_rq_wrs = 0;
+    _sq_sent = 0;
+    _rq_received = 0;
+}
+
+void RdmaConnect::StartConnect(const Socket* socket,
+                               void (*done)(int err, void* data),
+                               void* data) {
+    CHECK(socket->_rdma_ep != NULL);
+    SocketUniquePtr s;
+    if (Socket::Address(socket->id(), &s) != 0) {
+        return;
+    }
+    if (!IsRdmaAvailable()) {
+        socket->_rdma_ep->_state = RdmaEndpoint::FALLBACK_TCP;
+        s->_rdma_state = Socket::RDMA_OFF;
+        done(0, data);
+        return;
+    }
+    _done = done;
+    _data = data;
+    bthread_t tid;
+    if (bthread_start_background(&tid, &BTHREAD_ATTR_NORMAL,
+                RdmaEndpoint::ProcessHandshakeAtClient, socket->_rdma_ep) < 0) {
+        LOG(FATAL) << "Fail to start handshake bthread";
+    } else {
+        s.release();
+    }
+}
+
+void RdmaConnect::StopConnect(Socket* socket) { }
+
+void RdmaConnect::Run() {
+    _done(errno, _data);
+}
+
+static void TryReadOnTcpDuringRdmaEst(Socket* s) {
+    int progress = Socket::PROGRESS_INIT;
+    while (true) {
+        uint8_t tmp;
+        ssize_t nr = read(s->fd(), &tmp, 1);
+        if (nr < 0) {
+            if (errno != EAGAIN) {
+                const int saved_errno = errno;
+                PLOG(WARNING) << "Fail to read from " << s;
+                s->SetFailed(saved_errno, "Fail to read from %s: %s",
+                        s->description().c_str(), berror(saved_errno));
+                return;
+            }
+            if (!s->MoreReadEvents(&progress)) {
+                break;
+            }
+        } else if (nr == 0) {
+            s->SetEOF();
+            return;
+        } else {
+            LOG(WARNING) << "Read unexpected data from " << s;
+            s->SetFailed(EPROTO, "Read unexpected data from %s",
+                    s->description().c_str());
+            return;
+        }
+    }
+}
+
+void RdmaEndpoint::OnNewDataFromTcp(Socket* m) {
+    RdmaEndpoint* ep = m->_rdma_ep;
+    CHECK(ep != NULL);
+
+    int progress = Socket::PROGRESS_INIT;
+    while (true) {
+        if (ep->_state == UNINIT) {
+            if (!m->CreatedByConnect()) {
+                if (!IsRdmaAvailable()) {
+                    ep->_state = FALLBACK_TCP;
+                    m->_rdma_state = Socket::RDMA_OFF;
+                    continue;
+                }
+                bthread_t tid;
+                ep->_state = S_HELLO_WAIT;
+                SocketUniquePtr s;
+                m->ReAddress(&s);
+                if (bthread_start_background(&tid, &BTHREAD_ATTR_NORMAL,
+                            ProcessHandshakeAtServer, ep) < 0) {
+                    ep->_state = UNINIT;
+                    LOG(FATAL) << "Fail to start handshake bthread";
+                } else {
+                    s.release();
+                }
+            } else {
+                // The connection may be closed or reset before the client
+                // starts handshake. This will be handled by client handshake.
+                // Ignore the exception here.
+            }
+        } else if (ep->_state < ESTABLISHED) {  // during handshake
+            ep->_read_butex->fetch_add(1, butil::memory_order_release);
+            bthread::butex_wake(ep->_read_butex);
+        } else if (ep->_state == FALLBACK_TCP){  // handshake finishes
+            InputMessenger::OnNewMessages(m);
+            return;
+        } else if (ep->_state == ESTABLISHED) {
+            TryReadOnTcpDuringRdmaEst(ep->_socket);
+            return;
+        }
+        if (!m->MoreReadEvents(&progress)) {
+            break;
+        }
+    }
+}
+
+bool HelloNegotiationValid(HelloMessage& msg) {
+    if (msg.hello_ver == g_rdma_hello_version &&
+        msg.impl_ver == g_rdma_impl_version &&
+        msg.block_size >= MIN_BLOCK_SIZE &&
+        msg.sq_size >= MIN_QP_SIZE &&
+        msg.rq_size >= MIN_QP_SIZE) {
+        // This can be modified for future compatibility
+        return true;
+    }
+    return false;
+}
+
+static const int WAIT_TIMEOUT_MS = 50;
+
+int RdmaEndpoint::ReadFromFd(void* data, size_t len) {
+    CHECK(data != NULL);
+    int nr = 0;
+    size_t received = 0;
+    do {
+        const int expected_val = _read_butex->load(butil::memory_order_acquire);
+        const timespec duetime = butil::milliseconds_from_now(WAIT_TIMEOUT_MS);
+        nr = read(_socket->fd(), (uint8_t*)data + received, len - received);
+        if (nr < 0) {
+            if (errno == EAGAIN) {
+                if (bthread::butex_wait(_read_butex, expected_val, &duetime) < 0) {
+                    if (errno != EWOULDBLOCK && errno != ETIMEDOUT) {
+                        return -1;
+                    }
+                }
+            } else {
+                return -1;
+            }
+        } else if (nr == 0) {  // Got EOF
+            errno = EEOF;
+            return -1;
+        } else {
+            received += nr;
+        }
+    } while (received < len);
+    return 0;
+}
+
+int RdmaEndpoint::WriteToFd(void* data, size_t len) {
+    CHECK(data != NULL);
+    int nw = 0;
+    size_t written = 0;
+    do {
+        const timespec duetime = butil::milliseconds_from_now(WAIT_TIMEOUT_MS);
+        nw = write(_socket->fd(), (uint8_t*)data + written, len - written);
+        if (nw < 0) {
+            if (errno == EAGAIN) {
+                if (_socket->WaitEpollOut(_socket->fd(), true, &duetime) < 0) {
+                    if (errno != ETIMEDOUT) {
+                        return -1;
+                    }
+                }
+            } else {
+                return -1;
+            }
+        } else {
+            written += nw;
+        }
+    } while (written < len);
+    return 0;
+}
+
+inline void RdmaEndpoint::TryReadOnTcp() {
+    if (_socket->_nevent.fetch_add(1, butil::memory_order_acq_rel) == 0) {
+        if (_state == FALLBACK_TCP) {
+            InputMessenger::OnNewMessages(_socket);
+        } else if (_state == ESTABLISHED) {
+            TryReadOnTcpDuringRdmaEst(_socket);
+        }
+    }
+}
+
+void* RdmaEndpoint::ProcessHandshakeAtClient(void* arg) {
+    RdmaEndpoint* ep = static_cast<RdmaEndpoint*>(arg);
+    SocketUniquePtr s(ep->_socket);
+    RdmaConnect::RunGuard rg((RdmaConnect*)s->_app_connect.get());
+
+    LOG_IF(INFO, FLAGS_rdma_trace_verbose) 
+        << "Start handshake on " << s->_local_side;
+
+    void* data = malloc(g_rdma_hello_msg_len);
+    if (!data) {
+        const int saved_errno = errno;
+        PLOG(WARNING) << "Fail to send hello message to server:" << s->description();
+        s->SetFailed(saved_errno, "Fail to complete rdma handshake from %s: %s",
+                s->description().c_str(), berror(saved_errno));
+        ep->_state = FAILED;
+        return NULL;
+    }
+
+    // First initialize CQ and QP resources
+    ep->_state = C_ALLOC_QPCQ;
+    if (ep->AllocateResources() < 0) {
+        LOG(WARNING) << "Fallback to tcp:" << s->description();
+        s->_rdma_state = Socket::RDMA_OFF;
+        ep->_state = FALLBACK_TCP;
+        return NULL;
+    }
+
+    // Send hello message to server
+    ep->_state = C_HELLO_SEND;
+    HelloMessage local_msg;
+    local_msg.msg_len = g_rdma_hello_msg_len;
+    local_msg.hello_ver = g_rdma_hello_version;
+    local_msg.impl_ver = g_rdma_impl_version;
+    local_msg.block_size = g_rdma_recv_block_size;
+    local_msg.sq_size = ep->_sq_size;
+    local_msg.rq_size = ep->_rq_size;
+    local_msg.lid = GetRdmaLid();
+    local_msg.gid = GetRdmaGid();
+    if (BAIDU_LIKELY(ep->_resource)) {
+        local_msg.qp_num = ep->_resource->qp->qp_num;
+    } else {
+        // Only happens in UT
+        local_msg.qp_num = 0;
+    }
+    memcpy(data, MAGIC_STR, 4);
+    local_msg.Serialize((char*)data + 4);
+    if (ep->WriteToFd(data, g_rdma_hello_msg_len) < 0) {
+        const int saved_errno = errno;
+        PLOG(WARNING) << "Fail to send hello message to server:" << s->description();
+        s->SetFailed(saved_errno, "Fail to complete rdma handshake from %s: %s",
+                s->description().c_str(), berror(saved_errno));
+        ep->_state = FAILED;
+        return NULL;
+    }
+
+    // Check magic str
+    ep->_state = C_HELLO_WAIT;
+    if (ep->ReadFromFd(data, MAGIC_STR_LEN) < 0) {
+        const int saved_errno = errno;
+        PLOG(WARNING) << "Fail to get hello message from server:" << s->description();
+        s->SetFailed(saved_errno, "Fail to complete rdma handshake from %s: %s",
+                s->description().c_str(), berror(saved_errno));
+        ep->_state = FAILED;
+        return NULL;
+    }
+    if (memcmp(data, MAGIC_STR, MAGIC_STR_LEN) != 0) {
+        LOG(WARNING) << "Read unexpected data during handshake:" << s->description();
+        s->SetFailed(EPROTO, "Fail to complete rdma handshake from %s: %s",
+                s->description().c_str(), berror(EPROTO));
+        ep->_state = FAILED;
+        return NULL;
+    }
+
+    // Read hello message from server
+    if (ep->ReadFromFd(data, HELLO_MSG_LEN_MIN - MAGIC_STR_LEN) < 0) {
+        const int saved_errno = errno;
+        PLOG(WARNING) << "Fail to get Hello Message from server:" << s->description();
+        s->SetFailed(saved_errno, "Fail to complete rdma handshake from %s: %s",
+                s->description().c_str(), berror(saved_errno));
+        ep->_state = FAILED;
+        return NULL;
+    }
+    HelloMessage remote_msg;
+    remote_msg.Deserialize(data);
+    if (remote_msg.msg_len < HELLO_MSG_LEN_MIN) {
+        LOG(WARNING) << "Fail to parse Hello Message length from server:"
+                     << s->description();
+        s->SetFailed(EPROTO, "Fail to complete rdma handshake from %s: %s",
+                s->description().c_str(), berror(EPROTO));
+        ep->_state = FAILED;
+        return NULL;
+    }
+
+    if (remote_msg.msg_len > HELLO_MSG_LEN_MIN) {
+        // TODO: Read Hello Message customized data
+        // Just for future use, should not happen now
+    }
+
+    if (!HelloNegotiationValid(remote_msg)) {
+        LOG(WARNING) << "Fail to negotiate with server, fallback to tcp:"
+                     << s->description();
+        s->_rdma_state = Socket::RDMA_OFF;
+    } else {
+        ep->_remote_recv_block_size = remote_msg.block_size;
+        ep->_local_window_capacity = 
+            std::min(ep->_sq_size, remote_msg.rq_size) - RESERVED_WR_NUM;
+        ep->_remote_window_capacity = 
+            std::min(ep->_rq_size, remote_msg.sq_size) - RESERVED_WR_NUM;
+        ep->_window_size.store(ep->_local_window_capacity, butil::memory_order_relaxed);
+
+        ep->_state = C_BRINGUP_QP;
+        if (ep->BringUpQp(remote_msg.lid, remote_msg.gid, remote_msg.qp_num) < 0) {
+            LOG(WARNING) << "Fail to bringup QP, fallback to tcp:" << s->description();
+            s->_rdma_state = Socket::RDMA_OFF;
+        } else {
+            s->_rdma_state = Socket::RDMA_ON;
+        }
+    }
+
+    // Send ACK message to server
+    ep->_state = C_ACK_SEND;
+    uint32_t flags = 0;
+    if (s->_rdma_state != Socket::RDMA_OFF) {
+        flags |= ACK_MSG_RDMA_OK;
+    }
+    *(uint32_t*)data = butil::HostToNet32(flags);
+    if (ep->WriteToFd(data, ACK_MSG_LEN) < 0) {
+        const int saved_errno = errno;
+        PLOG(WARNING) << "Fail to send Ack Message to server:" << s->description();
+        s->SetFailed(saved_errno, "Fail to complete rdma handshake from %s: %s",
+                s->description().c_str(), berror(saved_errno));
+        ep->_state = FAILED;
+        return NULL;
+    }
+
+    if (s->_rdma_state == Socket::RDMA_ON) {
+        ep->_state = ESTABLISHED;
+        LOG_IF(INFO, FLAGS_rdma_trace_verbose) 
+            << "Handshake ends (use rdma) on " << s->description();
+    } else {
+        ep->_state = FALLBACK_TCP;
+        LOG_IF(INFO, FLAGS_rdma_trace_verbose) 
+            << "Handshake ends (use tcp) on " << s->description();
+    }
+
+    errno = 0;
+
+    return NULL;
+}
+
+void* RdmaEndpoint::ProcessHandshakeAtServer(void* arg) {
+    RdmaEndpoint* ep = static_cast<RdmaEndpoint*>(arg);
+    SocketUniquePtr s(ep->_socket);
+
+    LOG_IF(INFO, FLAGS_rdma_trace_verbose) 
+        << "Start handshake on " << s->description();
+
+    void* data = malloc(g_rdma_hello_msg_len);
+    if (!data) {
+        const int saved_errno = errno;
+        PLOG(WARNING) << "Fail to allocate hello message buffer:" << s->description();
+        s->SetFailed(saved_errno, "Fail to complete rdma handshake from %s: %s",
+                s->description().c_str(), berror(saved_errno));
+        ep->_state = FAILED;
+        return NULL;
+    }
+
+    ep->_state = S_HELLO_WAIT;
+    if (ep->ReadFromFd(data, MAGIC_STR_LEN) < 0) {
+        const int saved_errno = errno;
+        PLOG(WARNING) << "Fail to read Hello Message from client:" << s->description() << " " << s->_remote_side;
+        s->SetFailed(saved_errno, "Fail to complete rdma handshake from %s: %s",
+                s->description().c_str(), berror(saved_errno));
+        ep->_state = FAILED;
+        return NULL;
+    }
+
+    if (memcmp(data, MAGIC_STR, MAGIC_STR_LEN) != 0) {
+        LOG_IF(INFO, FLAGS_rdma_trace_verbose) << "It seems that the "
+            << "client does not use RDMA, fallback to TCP:"
+            << s->description();
+        // We need to copy the data already read back into _socket->_read_buf
+        s->_read_buf.append(data, MAGIC_STR_LEN);
+        ep->_state = FALLBACK_TCP;
+        s->_rdma_state = Socket::RDMA_OFF;
+        ep->TryReadOnTcp();
+        return NULL;
+    }
+
+    if (ep->ReadFromFd(data, g_rdma_hello_msg_len - MAGIC_STR_LEN) < 0) {
+        const int saved_errno = errno;
+        PLOG(WARNING) << "Fail to read Hello Message from client:" << s->description();
+        s->SetFailed(saved_errno, "Fail to complete rdma handshake from %s: %s",
+                s->description().c_str(), berror(saved_errno));
+        ep->_state = FAILED;
+        return NULL;
+    }
+
+    HelloMessage remote_msg;
+    remote_msg.Deserialize(data);
+    if (remote_msg.msg_len < HELLO_MSG_LEN_MIN) {
+        LOG(WARNING) << "Fail to parse Hello Message length from client:"
+                     << s->description();
+        s->SetFailed(EPROTO, "Fail to complete rdma handshake from %s: %s",
+                s->description().c_str(), berror(EPROTO));
+        ep->_state = FAILED;
+        return NULL;
+    }
+    if (remote_msg.msg_len > HELLO_MSG_LEN_MIN) {
+        // TODO: Read Hello Message customized header
+        // Just for future use, should not happen now
+    }
+
+    if (!HelloNegotiationValid(remote_msg)) {
+        LOG(WARNING) << "Fail to negotiate with client, fallback to tcp:"
+                     << s->description();
+        s->_rdma_state = Socket::RDMA_OFF;
+    } else {
+        ep->_remote_recv_block_size = remote_msg.block_size;
+        ep->_local_window_capacity = 
+            std::min(ep->_sq_size, remote_msg.rq_size) - RESERVED_WR_NUM;
+        ep->_remote_window_capacity = 
+            std::min(ep->_rq_size, remote_msg.sq_size) - RESERVED_WR_NUM;
+        ep->_window_size.store(ep->_local_window_capacity, butil::memory_order_relaxed);
+
+        ep->_state = S_ALLOC_QPCQ;
+        if (ep->AllocateResources() < 0) {
+            LOG(WARNING) << "Fail to allocate rdma resources, fallback to tcp:"
+                         << s->description();
+            s->_rdma_state = Socket::RDMA_OFF;
+        } else {
+            ep->_state = S_BRINGUP_QP;
+            if (ep->BringUpQp(remote_msg.lid, remote_msg.gid, remote_msg.qp_num) < 0) {
+                LOG(WARNING) << "Fail to bringup QP, fallback to tcp:"
+                             << s->description();
+                s->_rdma_state = Socket::RDMA_OFF;
+            }
+        }
+    }
+
+    // Send hello message to client
+    ep->_state = S_HELLO_SEND;
+    HelloMessage local_msg;
+    local_msg.msg_len = g_rdma_hello_msg_len;
+    if (s->_rdma_state == Socket::RDMA_OFF) {
+        local_msg.impl_ver = 0;
+        local_msg.hello_ver = 0;
+    } else {
+        local_msg.lid = GetRdmaLid();
+        local_msg.gid = GetRdmaGid();
+        local_msg.block_size = g_rdma_recv_block_size;
+        local_msg.sq_size = ep->_sq_size;
+        local_msg.rq_size = ep->_rq_size;
+        local_msg.hello_ver = g_rdma_hello_version;
+        local_msg.impl_ver = g_rdma_impl_version;
+        if (BAIDU_LIKELY(ep->_resource)) {
+            local_msg.qp_num = ep->_resource->qp->qp_num;
+        } else {
+            // Only happens in UT
+            local_msg.qp_num = 0;
+        }
+    }
+    memcpy(data, MAGIC_STR, 4);
+    local_msg.Serialize((char*)data + 4);
+    if (ep->WriteToFd(data, g_rdma_hello_msg_len) < 0) {
+        const int saved_errno = errno;
+        PLOG(WARNING) << "Fail to send Hello Message to client:" << s->description();
+        s->SetFailed(saved_errno, "Fail to complete rdma handshake from %s: %s",
+                s->description().c_str(), berror(saved_errno));
+        ep->_state = FAILED;
+        return NULL;
+    }
+
+    // Recv ACK Message
+    ep->_state = S_ACK_WAIT;
+    if (ep->ReadFromFd(data, ACK_MSG_LEN) < 0) {
+        const int saved_errno = errno;
+        PLOG(WARNING) << "Fail to read ack message from client:" << s->description();
+        s->SetFailed(saved_errno, "Fail to complete rdma handshake from %s: %s",
+                s->description().c_str(), berror(saved_errno));
+        ep->_state = FAILED;
+        return NULL;
+    }
+
+    // Check RDMA enable flag
+    uint32_t flags = butil::NetToHost32(*(uint32_t*)data);
+    if (flags & ACK_MSG_RDMA_OK) {
+        if (s->_rdma_state == Socket::RDMA_OFF) {
+            LOG(WARNING) << "Client acked RDMA although negotiation disabled it:"
+                         << s->description();
+            s->SetFailed(EPROTO, "Fail to complete rdma handshake from %s: %s",
+                    s->description().c_str(), berror(EPROTO));
+            ep->_state = FAILED;
+            return NULL;
+        } else {
+            s->_rdma_state = Socket::RDMA_ON;
+            ep->_state = ESTABLISHED;
+            LOG_IF(INFO, FLAGS_rdma_trace_verbose) 
+                << "Handshake ends (use rdma) on " << s->description();
+        }
+    } else {
+        s->_rdma_state = Socket::RDMA_OFF;
+        ep->_state = FALLBACK_TCP;
+        LOG_IF(INFO, FLAGS_rdma_trace_verbose) 
+            << "Handshake ends (use tcp) on " << s->description();
+    }
+ 
+    ep->TryReadOnTcp();
+
+    return NULL;
+}
+
+bool RdmaEndpoint::IsWritable() const {
+    if (BAIDU_UNLIKELY(g_skip_rdma_init)) {
+        // Just for UT
+        return false;
+    }
+
+    return _window_size.load(butil::memory_order_relaxed) > 0;
+}
+
+// RdmaIOBuf inherits from IOBuf to provide a new function.
+// The reason is that we need to use some protected member functions of IOBuf.
+class RdmaIOBuf : public butil::IOBuf {
+friend class RdmaEndpoint;
+private:
+    // Cut the current IOBuf into the ibv_sge list and `to', taking at most the
+    // first max_sge blocks or the first max_len bytes, whichever limit is hit first.
+    // Return: the number of bytes put into the sglist, or -1 on failure
+    ssize_t cut_into_sglist_and_iobuf(ibv_sge* sglist, size_t* sge_index,
+            butil::IOBuf* to, size_t max_sge, size_t max_len) {
+        size_t len = 0;
+        while (*sge_index < max_sge) {
+            if (len == max_len || _ref_num() == 0) {
+                break;
+            }
+            butil::IOBuf::BlockRef const& r = _ref_at(0);
+            CHECK(r.length > 0);
+            const void* start = fetch1();
+            uint32_t lkey = GetLKey((char*)start - r.offset);
+            if (lkey == 0) {
+                LOG(WARNING) << "Memory not registered for rdma. "
+                             << "Is this iobuf allocated before calling "
+                             << "GlobalRdmaInitializeOrDie? Or just forget to "
+                             << "call RegisterMemoryForRdma for your own buffer?";
+                errno = ERDMAMEM;
+                return -1;
+            }
+            size_t i = *sge_index;
+            if (len + r.length > max_len) {
+                // Split the block to fit the receiver's block size
+                sglist[i].length = max_len - len;
+                len = max_len;
+            } else {
+                sglist[i].length = r.length;
+                len += r.length;
+            }
+            sglist[i].addr = (uint64_t)start;
+            sglist[i].lkey = lkey;
+            cutn(to, sglist[i].length);
+            (*sge_index)++;
+        }
+        return len;
+    }
+};
+
+// Note this function is coupled with the implementation of IOBuf
+ssize_t RdmaEndpoint::CutFromIOBufList(butil::IOBuf** from, size_t ndata) {
+    if (BAIDU_UNLIKELY(g_skip_rdma_init)) {
+        // Just for UT
+        errno = EAGAIN;
+        return -1;
+    }
+
+    CHECK(from != NULL);
+    CHECK(ndata > 0);
+
+    size_t total_len = 0;
+    size_t current = 0;
+    uint32_t window = 0;
+    ibv_send_wr wr;
+    int max_sge = GetRdmaMaxSge();
+    ibv_sge sglist[max_sge];
+    while (current < ndata) {
+        window = _window_size.load(butil::memory_order_relaxed);
+        if (window == 0) {
+            if (total_len > 0) {
+                break;
+            } else {
+                errno = EAGAIN;
+                return -1;
+            }
+        }
+        butil::IOBuf* to = &_sbuf[_sq_current];
+        size_t this_len = 0;
+
+        memset(&wr, 0, sizeof(wr));
+        wr.sg_list = sglist;
+        wr.opcode = IBV_WR_SEND_WITH_IMM;
+
+        RdmaIOBuf* data = (RdmaIOBuf*)from[current];
+        size_t sge_index = 0;
+        while (sge_index < (uint32_t)max_sge &&
+                this_len < _remote_recv_block_size) {
+            if (data->size() == 0) {
+                // The current IOBuf is empty, find next one
+                ++current;
+                if (current == ndata) {
+                    break;
+                }
+                data = (RdmaIOBuf*)from[current];
+                continue;
+            }
+
+            ssize_t len = data->cut_into_sglist_and_iobuf(
+                    sglist, &sge_index, to, max_sge,
+                    _remote_recv_block_size - this_len);
+            if (len < 0) {
+                return -1;
+            }
+            CHECK(len > 0);
+            this_len += len;
+            total_len += len;
+        }
+        if (this_len == 0) {
+            continue;
+        }
+
+        wr.num_sge = sge_index;
+
+        uint32_t imm = _new_rq_wrs.exchange(0, butil::memory_order_relaxed);
+        wr.imm_data = butil::HostToNet32(imm);
+        // Avoid too many recv completion events to reduce the CPU overhead
+        bool solicited = false;
+        if (window == 1 || current + 1 >= ndata) {
+            // Only the last message in the write queue or the last message in
+            // the current window is flagged as solicited.
+            solicited = true;
+        } else {
+            if (_unsolicited > _local_window_capacity / 4) {
+                // Make sure the recv side can be signaled to return ack
+                solicited = true;
+            } else if (_accumulated_ack > _remote_window_capacity / 4) {
+                // Make sure the recv side can be signaled to handle ack
+                solicited = true;
+            } else if (_unsolicited_bytes > 1048576) {
+                // Make sure the recv side can be signaled when it receives enough data
+                solicited = true;
+            } else {
+                ++_unsolicited;
+                _unsolicited_bytes += this_len;
+                _accumulated_ack += imm;
+            }
+        }
+        if (solicited) {
+            wr.send_flags |= IBV_SEND_SOLICITED;
+            _unsolicited = 0;
+            _unsolicited_bytes = 0;
+            _accumulated_ack = 0;
+        }
+
+        // Avoid too many send completion events to reduce the CPU overhead
+        ++_sq_unsignaled;
+        if (_sq_unsignaled >= _local_window_capacity / 4) {
+            // Refer to:
+            // http://www.rdmamojo.com/2014/06/30/working-unsignaled-completions/
+            wr.send_flags |= IBV_SEND_SIGNALED;
+            _sq_unsignaled = 0;
+        }
+
+        ibv_send_wr* bad = NULL;
+        if (ibv_post_send(_resource->qp, &wr, &bad) < 0) {
+            // We use other means to guarantee the Send Queue is not full,
+            // so we treat this error as unrecoverable.
+            PLOG(WARNING) << "Fail to ibv_post_send";
+            return -1;
+        }
+
+        ++_sq_current;
+        if (_sq_current == _sq_size - RESERVED_WR_NUM) {
+            _sq_current = 0;
+        }
+
+        // Update _window_size. Note that _window_size can never become negative,
+        // because at most one thread can enter this function for each Socket,
+        // and the HandleCompletion thread only increases this counter.
+        _window_size.fetch_sub(1, butil::memory_order_relaxed);
+    }
+
+    return total_len;
+}
+
+int RdmaEndpoint::SendAck(int num) {
+    if (_new_rq_wrs.fetch_add(num, butil::memory_order_relaxed) > _remote_window_capacity / 2) {
+        return SendImm(_new_rq_wrs.exchange(0, butil::memory_order_relaxed));
+    }
+    return 0;
+}
+
+int RdmaEndpoint::SendImm(uint32_t imm) {
+    if (imm == 0) {
+        return 0;
+    }
+
+    ibv_send_wr wr;
+    memset(&wr, 0, sizeof(wr));
+    wr.opcode = IBV_WR_SEND_WITH_IMM;
+    wr.imm_data = butil::HostToNet32(imm);
+    wr.send_flags |= IBV_SEND_SOLICITED;
+    wr.send_flags |= IBV_SEND_SIGNALED;
+
+    ibv_send_wr* bad = NULL;
+    if (ibv_post_send(_resource->qp, &wr, &bad) < 0) {
+        // We use other means to guarantee the Send Queue is not full,
+        // so we treat this error as unrecoverable.
+        PLOG(WARNING) << "Fail to ibv_post_send";
+        return -1;
+    }
+    return 0;
+}
+
+ssize_t RdmaEndpoint::HandleCompletion(ibv_wc& wc) {
+    bool zerocopy = FLAGS_rdma_recv_zerocopy;
+    switch (wc.opcode) {
+    case IBV_WC_SEND: {  // send completion
+        // Do nothing
+        break;
+    }
+    case IBV_WC_RECV: {  // recv completion
+        // Please note that only the first wc.byte_len bytes are valid
+        if (wc.byte_len > 0) {
+            if (wc.byte_len < (uint32_t)FLAGS_rdma_zerocopy_min_size) {
+                zerocopy = false;
+            }
+            CHECK(_state != FALLBACK_TCP);
+            if (zerocopy) {
+                butil::IOBuf tmp;
+                _rbuf[_rq_received].cutn(&tmp, wc.byte_len);
+                _socket->_read_buf.append(tmp);
+            } else {
+                // Copy the data when the received payload is small
+                _socket->_read_buf.append(_rbuf_data[_rq_received], wc.byte_len);
+            }
+        }
+        if (wc.imm_data > 0) {
+            // Clear sbuf here because we ignore event wakeup for send completions
+            uint32_t acks = butil::NetToHost32(wc.imm_data);
+            uint32_t num = acks;
+            while (num > 0) {
+                _sbuf[_sq_sent++].clear();
+                if (_sq_sent == _sq_size - RESERVED_WR_NUM) {
+                    _sq_sent = 0;
+                }
+                --num;
+            }
+            butil::subtle::MemoryBarrier();
+
+            // Update window
+            uint32_t wnd_thresh = _local_window_capacity / 8;
+            if (_window_size.fetch_add(acks, butil::memory_order_relaxed) >= wnd_thresh
+                    || acks >= wnd_thresh) {
+                // Do not wake up the writing thread right after _window_size > 0;
+                // otherwise it may switch to the background too quickly.
+                _socket->WakeAsEpollOut();
+            }
+        }
+        // We must re-post recv WR
+        if (PostRecv(1, zerocopy) < 0) {
+            return -1;
+        }
+        if (wc.byte_len > 0) {
+            SendAck(1);
+        }
+        return wc.byte_len;
+    }
+    default:
+        CHECK(false) << "This should not happen";

Review Comment:
   ```suggestion
           // Do not CHECK abort since wc.opcode could be IBV_WC_DRIVER2(136) which is
           // a bug already fixed by rdma-core.
           // FYI: https://github.com/linux-rdma/rdma-core/commit/4c905646de3e75bdccada4abe9f0d273d76eaf50
           LOG(WARNING) << "This should not happen, wc.opcode = " << wc.opcode;
   ```




[GitHub] [incubator-brpc] Tuvie closed pull request #1836: enable brpc use rdma

Posted by GitBox <gi...@apache.org>.
Tuvie closed pull request #1836: enable brpc use rdma
URL: https://github.com/apache/incubator-brpc/pull/1836



[GitHub] [incubator-brpc] Tuvie commented on a diff in pull request #1836: enable brpc use rdma

Posted by GitBox <gi...@apache.org>.
Tuvie commented on code in PR #1836:
URL: https://github.com/apache/incubator-brpc/pull/1836#discussion_r939473980


##########
src/brpc/rdma/rdma_endpoint.cpp:
##########
@@ -0,0 +1,1468 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#if BRPC_WITH_RDMA
+
+#include <gflags/gflags.h>
+#include "butil/fd_utility.h"
+#include "butil/logging.h"                   // CHECK, LOG
+#include "butil/sys_byteorder.h"             // HostToNet,NetToHost
+#include "bthread/bthread.h"
+#include "brpc/errno.pb.h"
+#include "brpc/event_dispatcher.h"
+#include "brpc/input_messenger.h"
+#include "brpc/socket.h"
+#include "brpc/reloadable_flags.h"
+#include "brpc/rdma/block_pool.h"
+#include "brpc/rdma/rdma_helper.h"
+#include "brpc/rdma/rdma_endpoint.h"
+
+
+namespace brpc {
+namespace rdma {
+
+extern ibv_cq* (*IbvCreateCq)(ibv_context*, int, void*, ibv_comp_channel*, int);
+extern int (*IbvDestroyCq)(ibv_cq*);
+extern ibv_comp_channel* (*IbvCreateCompChannel)(ibv_context*);
+extern int (*IbvDestroyCompChannel)(ibv_comp_channel*);
+extern int (*IbvGetCqEvent)(ibv_comp_channel*, ibv_cq**, void**);
+extern void (*IbvAckCqEvents)(ibv_cq*, unsigned int);
+extern ibv_qp* (*IbvCreateQp)(ibv_pd*, ibv_qp_init_attr*);
+extern int (*IbvModifyQp)(ibv_qp*, ibv_qp_attr*, ibv_qp_attr_mask);
+extern int (*IbvQueryQp)(ibv_qp*, ibv_qp_attr*, ibv_qp_attr_mask, ibv_qp_init_attr*);
+extern int (*IbvDestroyQp)(ibv_qp*);
+extern bool g_skip_rdma_init;
+
+DEFINE_int32(rdma_sq_size, 128, "SQ size for RDMA");
+DEFINE_int32(rdma_rq_size, 128, "RQ size for RDMA");
+DEFINE_bool(rdma_recv_zerocopy, true, "Enable zerocopy for receive side");
+DEFINE_int32(rdma_zerocopy_min_size, 512, "The minimal size for receive zerocopy");
+DEFINE_string(rdma_recv_block_type, "default", "Default size type for recv WR: "
+              "default(8KB - 32B)/large(64KB - 32B)/huge(2MB - 32B)");
+DEFINE_int32(rdma_cqe_poll_once, 32, "The maximum of cqe number polled once.");
+DEFINE_int32(rdma_prepared_qp_size, 128, "SQ and RQ size for prepared QP.");
+DEFINE_int32(rdma_prepared_qp_cnt, 1024, "Initial count of prepared QP.");
+DEFINE_bool(rdma_trace_verbose, false, "Print log message verbosely");
+BRPC_VALIDATE_GFLAG(rdma_trace_verbose, brpc::PassValidate);
+
+static const size_t IOBUF_BLOCK_HEADER_LEN = 32; // implementation-dependent
+static const size_t IOBUF_BLOCK_DEFAULT_PAYLOAD =
+        butil::IOBuf::DEFAULT_BLOCK_SIZE - IOBUF_BLOCK_HEADER_LEN;
+
+// DO NOT change this value unless you know the safe value!!!
+// This is the number of reserved WRs in SQ/RQ for pure ACK.
+static const size_t RESERVED_WR_NUM = 3;
+
+// magic string RDMA (4B)
+// message length (2B)
+// hello version (2B)
+// impl version (2B): 0 means should use tcp
+// block size (2B)
+// sq size (2B)
+// rq size (2B)
+// GID (16B)
+// QP number (4B)
+static const char* MAGIC_STR = "RDMA";
+static const size_t MAGIC_STR_LEN = 4;
+static const size_t HELLO_MSG_LEN_MIN = 38;
+static const size_t HELLO_MSG_LEN_MAX = 4096;
+static const size_t ACK_MSG_LEN = 4;
+static uint16_t g_rdma_hello_msg_len = 38;  // In Byte
+static uint16_t g_rdma_hello_version = 1;
+static uint16_t g_rdma_impl_version = 1;
+static uint16_t g_rdma_recv_block_size = 0;
+
+static const uint32_t MAX_INLINE_DATA = 64;
+static const uint8_t MAX_HOP_LIMIT = 16;
+static const uint8_t TIMEOUT = 14;
+static const uint8_t RETRY_CNT = 7;
+static const uint16_t MIN_QP_SIZE = 16;
+static const uint16_t MIN_BLOCK_SIZE = 1024;
+static const uint32_t ACK_MSG_RDMA_OK = 0x1;
+
+static butil::Mutex* g_rdma_resource_mutex = NULL;
+static RdmaResource* g_rdma_resource_list = NULL;
+
+struct HelloMessage {

Review Comment:
   This is the handshake protocol; the first message sent during it is named Hello.
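
   The header comment in this diff (magic "RDMA", message length, hello/impl versions, block/SQ/RQ sizes, GID, QP number) together with HELLO_MSG_LEN_MIN = 38 implies a fixed 38-byte first message. Below is a self-contained sketch of such a layout, for illustration only: it is not the PR's actual HelloMessage::Serialize, and the 2-byte LID position (placed before the GID here) and the use of network byte order are assumptions.

   ```cpp
   // Sketch only: a hypothetical 38-byte hello layout, not the PR's Serialize().
   #include <arpa/inet.h>  // htons, htonl
   #include <cstdint>
   #include <cstring>

   struct HelloSketch {
       uint16_t msg_len, hello_ver, impl_ver, block_size, sq_size, rq_size, lid;
       uint8_t  gid[16];
       uint32_t qp_num;
   };

   // Writes "RDMA" followed by the fields in network byte order into buf,
   // which must hold at least 38 bytes. Returns the number of bytes written.
   inline size_t SerializeHelloSketch(const HelloSketch& m, char* buf) {
       char* p = buf;
       std::memcpy(p, "RDMA", 4); p += 4;
       const uint16_t u16[] = { m.msg_len, m.hello_ver, m.impl_ver,
                                m.block_size, m.sq_size, m.rq_size, m.lid };
       for (uint16_t v : u16) { v = htons(v); std::memcpy(p, &v, 2); p += 2; }
       std::memcpy(p, m.gid, 16); p += 16;
       const uint32_t qp = htonl(m.qp_num);
       std::memcpy(p, &qp, 4); p += 4;
       return p - buf;  // 4 + 7 * 2 + 16 + 4 = 38
   }
   ```

   The receiver checks the 4-byte magic first and falls back to TCP when it does not match, which is what ProcessHandshakeAtServer in this PR does.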




[GitHub] [incubator-brpc] Tuvie commented on pull request #1836: enable brpc use rdma

Posted by GitBox <gi...@apache.org>.
Tuvie commented on PR #1836:
URL: https://github.com/apache/incubator-brpc/pull/1836#issuecomment-1216172836

   > @Tuvie Could you briefly introduce the code structure and the idea behind the changes? There are quite a few files; an overall overview would make the review easier. Thanks.
   
   A corresponding description has been added to the docs.



[GitHub] [incubator-brpc] trevor211 commented on a diff in pull request #1836: enable brpc use rdma

Posted by GitBox <gi...@apache.org>.
trevor211 commented on code in PR #1836:
URL: https://github.com/apache/incubator-brpc/pull/1836#discussion_r974975514


##########
example/rdma_performance/CMakeLists.txt:
##########
@@ -0,0 +1,150 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+cmake_minimum_required(VERSION 2.8.10)
+project(rdma_performance C CXX)
+
+option(LINK_SO "Whether examples are linked dynamically" OFF)
+
+execute_process(
+    COMMAND bash -c "find ${PROJECT_SOURCE_DIR}/../.. -type d -regex \".*output/include$\" | head -n1 | xargs dirname | tr -d '\n'"
+    OUTPUT_VARIABLE OUTPUT_PATH
+)
+
+set(CMAKE_PREFIX_PATH ${OUTPUT_PATH})
+
+include(FindThreads)
+include(FindProtobuf)
+protobuf_generate_cpp(PROTO_SRC PROTO_HEADER test.proto)
+# include PROTO_HEADER
+include_directories(${CMAKE_CURRENT_BINARY_DIR})
+
+# Search for libthrift* by best effort. If it is not found and brpc is
+# compiled with thrift protocol enabled, a link error would be reported.
+find_library(THRIFT_LIB NAMES thrift)
+if (NOT THRIFT_LIB)
+    set(THRIFT_LIB "")
+endif()
+find_library(THRIFTNB_LIB NAMES thriftnb)
+if (NOT THRIFTNB_LIB)
+    set(THRIFTNB_LIB "")
+endif()
+
+find_path(BRPC_INCLUDE_PATH NAMES brpc/server.h)
+if(LINK_SO)
+    find_library(BRPC_LIB NAMES brpc)
+else()
+    find_library(BRPC_LIB NAMES libbrpc.a brpc)
+endif()
+if((NOT BRPC_INCLUDE_PATH) OR (NOT BRPC_LIB))
+    message(FATAL_ERROR "Fail to find brpc")
+endif()
+include_directories(${BRPC_INCLUDE_PATH})
+
+find_path(GFLAGS_INCLUDE_PATH gflags/gflags.h)
+find_library(GFLAGS_LIBRARY NAMES gflags libgflags)
+if((NOT GFLAGS_INCLUDE_PATH) OR (NOT GFLAGS_LIBRARY))
+    message(FATAL_ERROR "Fail to find gflags")
+endif()
+include_directories(${GFLAGS_INCLUDE_PATH})
+
+execute_process(
+    COMMAND bash -c "grep \"namespace [_A-Za-z0-9]\\+ {\" ${GFLAGS_INCLUDE_PATH}/gflags/gflags_declare.h | head -1 | awk '{print $2}' | tr -d '\n'"
+    OUTPUT_VARIABLE GFLAGS_NS
+)
+if(${GFLAGS_NS} STREQUAL "GFLAGS_NAMESPACE")
+    execute_process(
+        COMMAND bash -c "grep \"#define GFLAGS_NAMESPACE [_A-Za-z0-9]\\+\" ${GFLAGS_INCLUDE_PATH}/gflags/gflags_declare.h | head -1 | awk '{print $3}' | tr -d '\n'"
+        OUTPUT_VARIABLE GFLAGS_NS
+    )
+endif()
+if(CMAKE_SYSTEM_NAME STREQUAL "Darwin")
+    include(CheckFunctionExists)
+    CHECK_FUNCTION_EXISTS(clock_gettime HAVE_CLOCK_GETTIME)
+    if(NOT HAVE_CLOCK_GETTIME)
+        set(DEFINE_CLOCK_GETTIME "-DNO_CLOCK_GETTIME_IN_MAC")
+    endif()
+endif()
+
+set(CMAKE_CPP_FLAGS "${DEFINE_CLOCK_GETTIME} -DGFLAGS_NS=${GFLAGS_NS} -DBRPC_WITH_RDMA=1")
+set(CMAKE_CXX_FLAGS "${CMAKE_CPP_FLAGS} -DNDEBUG -O2 -D__const__= -pipe -W -Wall -Wno-unused-parameter -fPIC -fno-omit-frame-pointer")
+
+if(CMAKE_VERSION VERSION_LESS "3.1.3")
+    if(CMAKE_CXX_COMPILER_ID STREQUAL "GNU")
+        set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -std=c++11")
+    endif()
+    if(CMAKE_CXX_COMPILER_ID STREQUAL "Clang")
+        set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -std=c++11")
+    endif()
+else()
+    set(CMAKE_CXX_STANDARD 11)
+    set(CMAKE_CXX_STANDARD_REQUIRED ON)
+endif()
+
+find_path(LEVELDB_INCLUDE_PATH NAMES leveldb/db.h)
+find_library(LEVELDB_LIB NAMES leveldb)
+if ((NOT LEVELDB_INCLUDE_PATH) OR (NOT LEVELDB_LIB))
+    message(FATAL_ERROR "Fail to find leveldb")
+endif()
+include_directories(${LEVELDB_INCLUDE_PATH})
+
+if(CMAKE_SYSTEM_NAME STREQUAL "Darwin")
+    set(OPENSSL_ROOT_DIR
+        "/usr/local/opt/openssl"    # Homebrew installed OpenSSL
+        )
+endif()
+
+find_package(OpenSSL)
+include_directories(${OPENSSL_INCLUDE_DIR})
+
+find_path(RDMA_INCLUDE_PATH NAMES infiniband/verbs.h)
+find_library(RDMA_LIB NAMES ibverbs)
+if ((NOT RDMA_INCLUDE_PATH) OR (NOT RDMA_LIB))
+    message(FATAL_ERROR "Fail to find ibverbs")
+endif()
+
+set(DYNAMIC_LIB
+    ${CMAKE_THREAD_LIBS_INIT}
+    ${GFLAGS_LIBRARY}
+    ${PROTOBUF_LIBRARIES}
+    ${LEVELDB_LIB}
+    ${OPENSSL_CRYPTO_LIBRARY}
+    ${OPENSSL_SSL_LIBRARY}
+    ${THRIFT_LIB}
+    ${THRIFTNB_LIB}

Review Comment:
   We should also consider the glog library. If brpc is compiled with WITH_GLOG set to ON, rdma_performance won't link successfully.




[GitHub] [incubator-brpc] zyearn commented on pull request #1836: enable brpc use rdma

Posted by GitBox <gi...@apache.org>.
zyearn commented on PR #1836:
URL: https://github.com/apache/incubator-brpc/pull/1836#issuecomment-1203666218

   @Tuvie Could you briefly introduce the code structure and the idea behind the changes? There are quite a few files; an overall overview would make the review easier. Thanks.



[GitHub] [incubator-brpc] Tuvie commented on a diff in pull request #1836: enable brpc use rdma

Posted by GitBox <gi...@apache.org>.
Tuvie commented on code in PR #1836:
URL: https://github.com/apache/incubator-brpc/pull/1836#discussion_r939476496


##########
src/brpc/rdma/block_pool.cpp:
##########
@@ -0,0 +1,564 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#if BRPC_WITH_RDMA
+
+#include <errno.h>
+#include <stdlib.h>
+#include <vector>
+#include <gflags/gflags.h>
+#include "butil/fast_rand.h"
+#include "butil/iobuf.h"
+#include "butil/object_pool.h"
+#include "butil/thread_local.h"
+#include "bthread/bthread.h"
+#include "brpc/rdma/block_pool.h"
+
+
+namespace brpc {
+namespace rdma {
+
+DEFINE_int32(rdma_memory_pool_initial_size_mb, 1024,
+             "Initial size of memory pool for RDMA (MB)");
+DEFINE_int32(rdma_memory_pool_increase_size_mb, 1024,
+             "Increased size of memory pool for RDMA (MB)");
+DEFINE_int32(rdma_memory_pool_max_regions, 4, "Max number of regions");
+DEFINE_int32(rdma_memory_pool_buckets, 4, "Number of buckets to reduce race");
+DEFINE_int32(rdma_memory_pool_tls_cache_num, 128, "Number of cached block in tls");
+
+// This callback is used when extending a new region
+// Generally, it is a memory region register call
+typedef uint32_t (*Callback)(void*, size_t);
+static Callback g_cb = NULL;
+
+// Number of bytes in 1MB
+static const size_t BYTES_IN_MB = 1048576;
+
+static const int BLOCK_DEFAULT = 0; // 8KB
+static const int BLOCK_LARGE = 1;  // 64KB
+static const int BLOCK_HUGE = 2;  // 2MB
+static const int BLOCK_SIZE_COUNT = 3;
+static size_t g_block_size[BLOCK_SIZE_COUNT] = { 8192, 65536, 2 * BYTES_IN_MB };
+
+struct IdleNode {
+    void* start;
+    size_t len;
+    IdleNode* next;
+};
+
+struct Region {
+    Region() { start = 0; }
+    uintptr_t start;
+    size_t size;
+    uint32_t block_type;
+    uint32_t id;  // lkey
+};
+
+static const int32_t RDMA_MEMORY_POOL_MIN_REGIONS = 1;
+static const int32_t RDMA_MEMORY_POOL_MAX_REGIONS = 16;
+static Region g_regions[RDMA_MEMORY_POOL_MAX_REGIONS];
+static int g_region_num = 0;
+
+static const int32_t RDMA_MEMORY_POOL_MIN_SIZE = 32;  // 16MB
+static const int32_t RDMA_MEMORY_POOL_MAX_SIZE = 1048576;  // 1TB
+
+static const int32_t RDMA_MEMORY_POOL_MIN_BUCKETS = 1;
+static const int32_t RDMA_MEMORY_POOL_MAX_BUCKETS = 16;
+static size_t g_buckets = 1;
+
+static bool g_dump_enable = false;
+static butil::Mutex* g_dump_mutex = NULL;
+
+// Only for default block size
+static __thread IdleNode* tls_idle_list = NULL;
+static __thread size_t tls_idle_num = 0;
+static __thread bool tls_inited = false;
+static butil::Mutex* g_tls_info_mutex = NULL;
+static size_t g_tls_info_cnt = 0;
+static size_t* g_tls_info[1024];
+
+// For each block size, there are some buckets of idle list to reduce race.
+struct GlobalInfo {
+    std::vector<IdleNode*> idle_list[BLOCK_SIZE_COUNT];
+    std::vector<butil::Mutex*> lock[BLOCK_SIZE_COUNT];
+    std::vector<size_t> idle_size[BLOCK_SIZE_COUNT];
+    butil::Mutex extend_lock;
+};
+static GlobalInfo* g_info = NULL;
+
+static inline Region* GetRegion(const void* buf) {
+    if (!buf) {
+        errno = EINVAL;
+        return NULL;
+    }
+    Region* r = NULL;
+    uintptr_t addr = (uintptr_t)buf;
+    for (int i = 0; i < FLAGS_rdma_memory_pool_max_regions; ++i) {
+        if (g_regions[i].start == 0) {
+            break;
+        }
+        if (addr >= g_regions[i].start &&
+            addr < g_regions[i].start + g_regions[i].size) {
+            r = &g_regions[i];
+            break;
+        }
+    }
+    return r;
+}
+
+uint32_t GetRegionId(const void* buf) {
+    Region* r = GetRegion(buf);
+    if (!r) {
+        return 0;
+    }
+    return r->id;
+}
+
+// Extend the block pool with a new region (with different region ID)
+static void* ExtendBlockPool(size_t region_size, int block_type) {
+    if (region_size < 1) {
+        errno = EINVAL;
+        return NULL;
+    }
+
+    if (g_region_num == FLAGS_rdma_memory_pool_max_regions) {
+        LOG(INFO) << "Memory pool reaches max regions";
+        errno = ENOMEM;
+        return NULL;
+    }
+
+    // Regularize region size
+    region_size = region_size * BYTES_IN_MB / g_block_size[block_type] / g_buckets;
+    region_size *= g_block_size[block_type] * g_buckets;
+
+    LOG(INFO) << "Start extend rdma memory " << region_size / BYTES_IN_MB << "MB";
+
+    void* region_base = NULL;
+    if (posix_memalign(&region_base, 4096, region_size) != 0) {
+        PLOG_EVERY_SECOND(ERROR) << "Memory not enough";
+        return NULL;
+    }
+
+    uint32_t id = g_cb(region_base, region_size);
+    if (id == 0) {
+        free(region_base);
+        return NULL;
+    }
+
+    IdleNode* node[g_buckets];
+    for (size_t i = 0; i < g_buckets; ++i) {
+        node[i] = butil::get_object<IdleNode>();
+        if (!node[i]) {
+            PLOG_EVERY_SECOND(ERROR) << "Memory not enough";
+            for (size_t j = 0; j < i; ++j) {
+                butil::return_object<IdleNode>(node[j]);
+            }
+            free(region_base);
+            return NULL;
+        }
+    }
+ 
+    Region* region = &g_regions[g_region_num++];
+    region->start = (uintptr_t)region_base;
+    region->size = region_size;
+    region->id = id;
+    region->block_type = block_type;
+
+    for (size_t i = 0; i < g_buckets; ++i) {
+        node[i]->start = (void*)(region->start + i * (region_size / g_buckets));
+        node[i]->len = region_size / g_buckets;
+        node[i]->next = NULL;
+        g_info->idle_list[block_type][i] = node[i];
+        g_info->idle_size[block_type][i] += node[i]->len;
+    }
+
+    return region_base;
+}
+
+void* InitBlockPool(Callback cb) {
+    if (!cb) {
+        errno = EINVAL;
+        return NULL;
+    }
+    if (g_cb) {
+        LOG(WARNING) << "Do not initialize block pool repeatedly";
+        errno = EINVAL;
+        return NULL;
+    }
+    g_cb = cb;
+    if (FLAGS_rdma_memory_pool_max_regions < RDMA_MEMORY_POOL_MIN_REGIONS ||
+        FLAGS_rdma_memory_pool_max_regions > RDMA_MEMORY_POOL_MAX_REGIONS) {
+        LOG(WARNING) << "rdma_memory_pool_max_regions("
+                     << FLAGS_rdma_memory_pool_max_regions << ") not in ["
+                     << RDMA_MEMORY_POOL_MIN_REGIONS << ","
+                     << RDMA_MEMORY_POOL_MAX_REGIONS << "]!";
+        errno = EINVAL;
+        return NULL;
+    }
+    if (FLAGS_rdma_memory_pool_initial_size_mb < RDMA_MEMORY_POOL_MIN_SIZE ||
+        FLAGS_rdma_memory_pool_initial_size_mb > RDMA_MEMORY_POOL_MAX_SIZE) {
+        LOG(WARNING) << "rdma_memory_pool_initial_size_mb("
+                     << FLAGS_rdma_memory_pool_initial_size_mb << ") not in ["
+                     << RDMA_MEMORY_POOL_MIN_SIZE << ","
+                     << RDMA_MEMORY_POOL_MAX_SIZE << "]!";
+        errno = EINVAL;
+        return NULL;
+    }
+    if (FLAGS_rdma_memory_pool_increase_size_mb < RDMA_MEMORY_POOL_MIN_SIZE ||
+        FLAGS_rdma_memory_pool_increase_size_mb > RDMA_MEMORY_POOL_MAX_SIZE) {
+        LOG(WARNING) << "rdma_memory_pool_increase_size_mb("
+                     << FLAGS_rdma_memory_pool_increase_size_mb << ") not in ["
+                     << RDMA_MEMORY_POOL_MIN_SIZE << ","
+                     << RDMA_MEMORY_POOL_MAX_SIZE << "]!";
+        errno = EINVAL;
+        return NULL;
+    }
+    if (FLAGS_rdma_memory_pool_buckets < RDMA_MEMORY_POOL_MIN_BUCKETS ||
+        FLAGS_rdma_memory_pool_buckets > RDMA_MEMORY_POOL_MAX_BUCKETS) {
+        LOG(WARNING) << "rdma_memory_pool_buckets("
+                     << FLAGS_rdma_memory_pool_buckets << ") not in ["
+                     << RDMA_MEMORY_POOL_MIN_BUCKETS << ","
+                     << RDMA_MEMORY_POOL_MAX_BUCKETS << "]!";
+        errno = EINVAL;
+        return NULL;
+    }
+    g_buckets = FLAGS_rdma_memory_pool_buckets;
+
+    g_info = new (std::nothrow) GlobalInfo;
+    if (!g_info) {
+        return NULL;
+    }
+
+    for (int i = 0; i < BLOCK_SIZE_COUNT; ++i) {
+        g_info->idle_list[i].resize(g_buckets, NULL);
+        if (g_info->idle_list[i].size() != g_buckets) {
+            return NULL;
+        }
+        g_info->lock[i].resize(g_buckets, NULL);
+        if (g_info->lock[i].size() != g_buckets) {
+            return NULL;
+        }
+        g_info->idle_size[i].resize(g_buckets, 0);
+        if (g_info->idle_size[i].size() != g_buckets) {
+            return NULL;
+        }
+        for (size_t j = 0; j < g_buckets; ++j) {
+            g_info->lock[i][j] = new (std::nothrow) butil::Mutex;
+            if (!g_info->lock[i][j]) {
+                return NULL;
+            }
+        }
+    }
+
+    g_dump_mutex = new butil::Mutex;
+    g_tls_info_mutex = new butil::Mutex;
+
+    return ExtendBlockPool(FLAGS_rdma_memory_pool_initial_size_mb,
+                           BLOCK_DEFAULT);
+}
+
+static void* AllocBlockFrom(int block_type) {
+    bool locked = false;
+    if (BAIDU_UNLIKELY(g_dump_enable)) {
+        g_dump_mutex->lock();
+        locked = true;
+    }
+    void* ptr = NULL;
+    if (block_type == 0 && tls_idle_list != NULL){
+        CHECK(tls_idle_num > 0);
+        IdleNode* n = tls_idle_list;
+        tls_idle_list = n->next;
+        ptr = n->start;
+        butil::return_object<IdleNode>(n);
+        tls_idle_num--;
+        if (locked) {
+            g_dump_mutex->unlock();
+        }
+        return ptr;
+    }
+
+    uint64_t index = butil::fast_rand() % g_buckets;
+    BAIDU_SCOPED_LOCK(*g_info->lock[block_type][index]);
+    IdleNode* node = g_info->idle_list[block_type][index];
+    if (!node) {
+        BAIDU_SCOPED_LOCK(g_info->extend_lock);
+        node = g_info->idle_list[block_type][index];
+        if (!node) {
+            // There is no block left, extend a new region
+            if (!ExtendBlockPool(FLAGS_rdma_memory_pool_increase_size_mb,
+                                 block_type)) {
+                LOG_EVERY_SECOND(ERROR) << "Fail to extend new region. "
+                                        << "You can set the size of memory pool larger. "
+                                        << "Refer to the help message of these flags: "
+                                        << "rdma_memory_pool_initial_size_mb, "
+                                        << "rdma_memory_pool_increase_size_mb, "
+                                        << "rdma_memory_pool_max_regions.";
+                if (locked) {
+                    g_dump_mutex->unlock();
+                }
+                return NULL;
+            }
+            node = g_info->idle_list[block_type][index];
+        }
+    }
+    if (node) {
+        ptr = node->start;
+        if (node->len > g_block_size[block_type]) {
+            node->start = (char*)node->start + g_block_size[block_type];
+            node->len -= g_block_size[block_type];
+        } else {
+            g_info->idle_list[block_type][index] = node->next;
+            butil::return_object<IdleNode>(node);
+        }
+        g_info->idle_size[block_type][index] -= g_block_size[block_type];
+    } else {
+        if (locked) {
+            g_dump_mutex->unlock();
+        }
+        return NULL;
+    }
+
+    // Move more blocks from global list to tls list
+    if (block_type == 0) {
+        node = g_info->idle_list[0][index];
+        tls_idle_list = node;

Review Comment:
   This block pool provides a thread-local cache for blocks before they are handed out, while the TLS inside IOBuf caches blocks after they have been allocated. In fact, quite a few IOBuf operations do not use its internal TLS cache at all.
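
   To make the distinction concrete, below is a minimal sketch (not the PR code) of the two-level idea described above: a per-thread cache of blocks that were already registered with the RDMA device, refilled in batches from a shared, mutex-protected bucket. All names and the batch size are illustrative; the real logic lives in AllocBlockFrom() in this diff.

   ```cpp
   // Sketch only: a thread-local cache over a shared bucket of registered blocks.
   #include <cstddef>
   #include <mutex>
   #include <vector>

   struct SharedBucket {
       std::mutex mu;
       std::vector<void*> idle;  // blocks already registered with the NIC
   };

   static thread_local std::vector<void*> tls_cache;  // hit without taking any lock
   static constexpr size_t kRefillBatch = 16;  // illustrative, cf. rdma_memory_pool_tls_cache_num

   void* AllocRegisteredBlock(SharedBucket& bucket) {
       if (!tls_cache.empty()) {  // fast path: no lock taken
           void* b = tls_cache.back();
           tls_cache.pop_back();
           return b;
       }
       std::lock_guard<std::mutex> lock(bucket.mu);  // slow path: refill from the bucket
       while (!bucket.idle.empty() && tls_cache.size() < kRefillBatch) {
           tls_cache.push_back(bucket.idle.back());
           bucket.idle.pop_back();
       }
       if (tls_cache.empty()) {
           return nullptr;  // the real pool would extend a new region here
       }
       void* b = tls_cache.back();
       tls_cache.pop_back();
       return b;
   }
   ```

   The point is that the fast path never touches IOBuf's own TLS machinery; it only decides which registered block gets handed to IOBuf in the first place.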




[GitHub] [incubator-brpc] wwbmmm commented on a diff in pull request #1836: enable brpc use rdma

Posted by GitBox <gi...@apache.org>.
wwbmmm commented on code in PR #1836:
URL: https://github.com/apache/incubator-brpc/pull/1836#discussion_r935457002


##########
src/brpc/server.cpp:
##########
@@ -701,6 +703,28 @@ static bool CreateConcurrencyLimiter(const AdaptiveMaxConcurrency& amc,
     return true;
 }
 
+#if BRPC_WITH_RDMA
+static bool OptionsAvailableOverRdma(const ServerOptions* opt) {
+    if (opt->rtmp_service) {
+        LOG(WARNING) << "RTMP is not supported by RDMA";
+        return false;
+    }
+    if (opt->has_ssl_options()) {
+        LOG(WARNING) << "SSL is not supported by RDMA";
+        return false;
+    }
+    if (opt->nshead_service) {

Review Comment:
   Isn't http_master_service also unsupported?
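
   If it does need to be rejected as well, a check following the pattern of this hunk could be added. The sketch below is hypothetical: it merely mirrors the rtmp_service/ssl/nshead_service checks quoted above, uses a stub options type to stay self-contained, and whether RDMA actually conflicts with http_master_service is exactly the open question here.

   ```cpp
   // Hypothetical sketch of extending OptionsAvailableOverRdma(); not the PR code.
   #include <cstdio>

   struct ServerOptionsStub {  // stand-in for brpc::ServerOptions
       void* nshead_service = nullptr;
       void* http_master_service = nullptr;
   };

   bool OptionsAvailableOverRdmaSketch(const ServerOptionsStub* opt) {
       if (opt->http_master_service) {  // assumed unsupported, per the question above
           std::fprintf(stderr, "http_master_service is not supported by RDMA\n");
           return false;
       }
       return true;
   }
   ```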



##########
src/brpc/rdma/block_pool.cpp:
##########
@@ -0,0 +1,564 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#if BRPC_WITH_RDMA
+
+#include <errno.h>
+#include <stdlib.h>
+#include <vector>
+#include <gflags/gflags.h>
+#include "butil/fast_rand.h"
+#include "butil/iobuf.h"
+#include "butil/object_pool.h"
+#include "butil/thread_local.h"
+#include "bthread/bthread.h"
+#include "brpc/rdma/block_pool.h"
+
+
+namespace brpc {
+namespace rdma {
+
+DEFINE_int32(rdma_memory_pool_initial_size_mb, 1024,
+             "Initial size of memory pool for RDMA (MB)");
+DEFINE_int32(rdma_memory_pool_increase_size_mb, 1024,
+             "Increased size of memory pool for RDMA (MB)");
+DEFINE_int32(rdma_memory_pool_max_regions, 4, "Max number of regions");
+DEFINE_int32(rdma_memory_pool_buckets, 4, "Number of buckets to reduce race");
+DEFINE_int32(rdma_memory_pool_tls_cache_num, 128, "Number of cached block in tls");
+
+// This callback is used when extending a new region
+// Generally, it is a memory region register call
+typedef uint32_t (*Callback)(void*, size_t);
+static Callback g_cb = NULL;
+
+// Number of bytes in 1MB
+static const size_t BYTES_IN_MB = 1048576;
+
+static const int BLOCK_DEFAULT = 0; // 8KB
+static const int BLOCK_LARGE = 1;  // 64KB
+static const int BLOCK_HUGE = 2;  // 2MB
+static const int BLOCK_SIZE_COUNT = 3;
+static size_t g_block_size[BLOCK_SIZE_COUNT] = { 8192, 65536, 2 * BYTES_IN_MB };
+
+struct IdleNode {
+    void* start;
+    size_t len;
+    IdleNode* next;
+};
+
+struct Region {
+    Region() { start = 0; }
+    uintptr_t start;
+    size_t size;
+    uint32_t block_type;
+    uint32_t id;  // lkey
+};
+
+static const int32_t RDMA_MEMORY_POOL_MIN_REGIONS = 1;
+static const int32_t RDMA_MEMORY_POOL_MAX_REGIONS = 16;
+static Region g_regions[RDMA_MEMORY_POOL_MAX_REGIONS];
+static int g_region_num = 0;
+
+static const int32_t RDMA_MEMORY_POOL_MIN_SIZE = 32;  // 16MB
+static const int32_t RDMA_MEMORY_POOL_MAX_SIZE = 1048576;  // 1TB
+
+static const int32_t RDMA_MEMORY_POOL_MIN_BUCKETS = 1;
+static const int32_t RDMA_MEMORY_POOL_MAX_BUCKETS = 16;
+static size_t g_buckets = 1;
+
+static bool g_dump_enable = false;
+static butil::Mutex* g_dump_mutex = NULL;
+
+// Only for default block size
+static __thread IdleNode* tls_idle_list = NULL;
+static __thread size_t tls_idle_num = 0;
+static __thread bool tls_inited = false;
+static butil::Mutex* g_tls_info_mutex = NULL;
+static size_t g_tls_info_cnt = 0;
+static size_t* g_tls_info[1024];
+
+// For each block size, there are some buckets of idle list to reduce race.
+struct GlobalInfo {
+    std::vector<IdleNode*> idle_list[BLOCK_SIZE_COUNT];
+    std::vector<butil::Mutex*> lock[BLOCK_SIZE_COUNT];
+    std::vector<size_t> idle_size[BLOCK_SIZE_COUNT];
+    butil::Mutex extend_lock;
+};
+static GlobalInfo* g_info = NULL;
+
+static inline Region* GetRegion(const void* buf) {
+    if (!buf) {
+        errno = EINVAL;
+        return NULL;
+    }
+    Region* r = NULL;
+    uintptr_t addr = (uintptr_t)buf;
+    for (int i = 0; i < FLAGS_rdma_memory_pool_max_regions; ++i) {
+        if (g_regions[i].start == 0) {
+            break;
+        }
+        if (addr >= g_regions[i].start &&
+            addr < g_regions[i].start + g_regions[i].size) {
+            r = &g_regions[i];
+            break;
+        }
+    }
+    return r;
+}
+
+uint32_t GetRegionId(const void* buf) {
+    Region* r = GetRegion(buf);
+    if (!r) {
+        return 0;
+    }
+    return r->id;
+}
+
+// Extend the block pool with a new region (with different region ID)
+static void* ExtendBlockPool(size_t region_size, int block_type) {
+    if (region_size < 1) {
+        errno = EINVAL;
+        return NULL;
+    }
+
+    if (g_region_num == FLAGS_rdma_memory_pool_max_regions) {
+        LOG(INFO) << "Memory pool reaches max regions";
+        errno = ENOMEM;
+        return NULL;
+    }
+
+    // Regularize region size
+    region_size = region_size * BYTES_IN_MB / g_block_size[block_type] / g_buckets;
+    region_size *= g_block_size[block_type] * g_buckets;
+
+    LOG(INFO) << "Start extend rdma memory " << region_size / BYTES_IN_MB << "MB";
+
+    void* region_base = NULL;
+    if (posix_memalign(&region_base, 4096, region_size) != 0) {
+        PLOG_EVERY_SECOND(ERROR) << "Memory not enough";
+        return NULL;
+    }
+
+    uint32_t id = g_cb(region_base, region_size);
+    if (id == 0) {
+        free(region_base);
+        return NULL;
+    }
+
+    IdleNode* node[g_buckets];
+    for (size_t i = 0; i < g_buckets; ++i) {
+        node[i] = butil::get_object<IdleNode>();
+        if (!node[i]) {
+            PLOG_EVERY_SECOND(ERROR) << "Memory not enough";
+            for (size_t j = 0; j < i; ++j) {
+                butil::return_object<IdleNode>(node[j]);
+            }
+            free(region_base);
+            return NULL;
+        }
+    }
+ 
+    Region* region = &g_regions[g_region_num++];
+    region->start = (uintptr_t)region_base;
+    region->size = region_size;
+    region->id = id;
+    region->block_type = block_type;
+
+    for (size_t i = 0; i < g_buckets; ++i) {
+        node[i]->start = (void*)(region->start + i * (region_size / g_buckets));
+        node[i]->len = region_size / g_buckets;
+        node[i]->next = NULL;
+        g_info->idle_list[block_type][i] = node[i];
+        g_info->idle_size[block_type][i] += node[i]->len;
+    }
+
+    return region_base;
+}
+
+void* InitBlockPool(Callback cb) {
+    if (!cb) {
+        errno = EINVAL;
+        return NULL;
+    }
+    if (g_cb) {
+        LOG(WARNING) << "Do not initialize block pool repeatedly";
+        errno = EINVAL;
+        return NULL;
+    }
+    g_cb = cb;
+    if (FLAGS_rdma_memory_pool_max_regions < RDMA_MEMORY_POOL_MIN_REGIONS ||
+        FLAGS_rdma_memory_pool_max_regions > RDMA_MEMORY_POOL_MAX_REGIONS) {
+        LOG(WARNING) << "rdma_memory_pool_max_regions("
+                     << FLAGS_rdma_memory_pool_max_regions << ") not in ["
+                     << RDMA_MEMORY_POOL_MIN_REGIONS << ","
+                     << RDMA_MEMORY_POOL_MAX_REGIONS << "]!";
+        errno = EINVAL;
+        return NULL;
+    }
+    if (FLAGS_rdma_memory_pool_initial_size_mb < RDMA_MEMORY_POOL_MIN_SIZE ||
+        FLAGS_rdma_memory_pool_initial_size_mb > RDMA_MEMORY_POOL_MAX_SIZE) {
+        LOG(WARNING) << "rdma_memory_pool_initial_size_mb("
+                     << FLAGS_rdma_memory_pool_initial_size_mb << ") not in ["
+                     << RDMA_MEMORY_POOL_MIN_SIZE << ","
+                     << RDMA_MEMORY_POOL_MAX_SIZE << "]!";
+        errno = EINVAL;
+        return NULL;
+    }
+    if (FLAGS_rdma_memory_pool_increase_size_mb < RDMA_MEMORY_POOL_MIN_SIZE ||
+        FLAGS_rdma_memory_pool_increase_size_mb > RDMA_MEMORY_POOL_MAX_SIZE) {
+        LOG(WARNING) << "rdma_memory_pool_increase_size_mb("
+                     << FLAGS_rdma_memory_pool_increase_size_mb << ") not in ["
+                     << RDMA_MEMORY_POOL_MIN_SIZE << ","
+                     << RDMA_MEMORY_POOL_MAX_SIZE << "]!";
+        errno = EINVAL;
+        return NULL;
+    }
+    if (FLAGS_rdma_memory_pool_buckets < RDMA_MEMORY_POOL_MIN_BUCKETS ||
+        FLAGS_rdma_memory_pool_buckets > RDMA_MEMORY_POOL_MAX_BUCKETS) {
+        LOG(WARNING) << "rdma_memory_pool_buckets("
+                     << FLAGS_rdma_memory_pool_buckets << ") not in ["
+                     << RDMA_MEMORY_POOL_MIN_BUCKETS << ","
+                     << RDMA_MEMORY_POOL_MAX_BUCKETS << "]!";
+        errno = EINVAL;
+        return NULL;
+    }
+    g_buckets = FLAGS_rdma_memory_pool_buckets;
+
+    g_info = new (std::nothrow) GlobalInfo;
+    if (!g_info) {
+        return NULL;
+    }
+
+    for (int i = 0; i < BLOCK_SIZE_COUNT; ++i) {
+        g_info->idle_list[i].resize(g_buckets, NULL);
+        if (g_info->idle_list[i].size() != g_buckets) {
+            return NULL;
+        }
+        g_info->lock[i].resize(g_buckets, NULL);
+        if (g_info->lock[i].size() != g_buckets) {
+            return NULL;
+        }
+        g_info->idle_size[i].resize(g_buckets, 0);
+        if (g_info->idle_size[i].size() != g_buckets) {
+            return NULL;
+        }
+        for (size_t j = 0; j < g_buckets; ++j) {
+            g_info->lock[i][j] = new (std::nothrow) butil::Mutex;
+            if (!g_info->lock[i][j]) {
+                return NULL;
+            }
+        }
+    }
+
+    g_dump_mutex = new butil::Mutex;
+    g_tls_info_mutex = new butil::Mutex;
+
+    return ExtendBlockPool(FLAGS_rdma_memory_pool_initial_size_mb,
+                           BLOCK_DEFAULT);
+}
+
+static void* AllocBlockFrom(int block_type) {
+    bool locked = false;
+    if (BAIDU_UNLIKELY(g_dump_enable)) {
+        g_dump_mutex->lock();
+        locked = true;
+    }
+    void* ptr = NULL;
+    if (block_type == 0 && tls_idle_list != NULL){
+        CHECK(tls_idle_num > 0);
+        IdleNode* n = tls_idle_list;
+        tls_idle_list = n->next;
+        ptr = n->start;
+        butil::return_object<IdleNode>(n);
+        tls_idle_num--;
+        if (locked) {
+            g_dump_mutex->unlock();
+        }
+        return ptr;
+    }
+
+    uint64_t index = butil::fast_rand() % g_buckets;
+    BAIDU_SCOPED_LOCK(*g_info->lock[block_type][index]);
+    IdleNode* node = g_info->idle_list[block_type][index];
+    if (!node) {
+        BAIDU_SCOPED_LOCK(g_info->extend_lock);
+        node = g_info->idle_list[block_type][index];
+        if (!node) {
+            // There is no block left, extend a new region
+            if (!ExtendBlockPool(FLAGS_rdma_memory_pool_increase_size_mb,
+                                 block_type)) {
+                LOG_EVERY_SECOND(ERROR) << "Fail to extend new region. "
+                                        << "You can set the size of memory pool larger. "
+                                        << "Refer to the help message of these flags: "
+                                        << "rdma_memory_pool_initial_size_mb, "
+                                        << "rdma_memory_pool_increase_size_mb, "
+                                        << "rdma_memory_pool_max_regions.";
+                if (locked) {
+                    g_dump_mutex->unlock();
+                }
+                return NULL;
+            }
+            node = g_info->idle_list[block_type][index];
+        }
+    }
+    if (node) {
+        ptr = node->start;
+        if (node->len > g_block_size[block_type]) {
+            node->start = (char*)node->start + g_block_size[block_type];
+            node->len -= g_block_size[block_type];
+        } else {
+            g_info->idle_list[block_type][index] = node->next;
+            butil::return_object<IdleNode>(node);
+        }
+        g_info->idle_size[block_type][index] -= g_block_size[block_type];
+    } else {
+        if (locked) {
+            g_dump_mutex->unlock();
+        }
+        return NULL;
+    }
+
+    // Move more blocks from global list to tls list
+    if (block_type == 0) {
+        node = g_info->idle_list[0][index];
+        tls_idle_list = node;

Review Comment:
   IOBuf already keeps a thread-local (TLS) block cache internally. How is the tls_idle_list here different from that cache?
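
   For readers unfamiliar with the pattern in question, below is a minimal,
   self-contained sketch of a thread-local free list of fixed-size blocks;
   the names and the 8KB block size are simplified assumptions for
   illustration, not the patch's actual code.

    // Minimal sketch of a thread-local (TLS) block cache, illustrative only.
    #include <cstddef>
    #include <cstdlib>

    struct Node { Node* next; };               // a freed block doubles as a list node
    static const size_t kBlockSize = 8192;     // assumed fixed block size
    static __thread Node* tls_free = NULL;     // per-thread idle list
    static __thread size_t tls_free_num = 0;

    void* AllocBlock() {
        if (tls_free != NULL) {                // fast path: pop from the TLS list, no lock
            Node* n = tls_free;
            tls_free = n->next;
            --tls_free_num;
            return n;
        }
        return malloc(kBlockSize);             // slow path: stands in for the global pool
    }

    void FreeBlock(void* p) {                  // push the block back onto the TLS list
        Node* n = static_cast<Node*>(p);
        n->next = tls_free;
        tls_free = n;
        ++tls_free_num;
    }

   One plausible difference, not spelled out in the thread, is that this pool
   must hand out memory registered through the region-register callback,
   which IOBuf's own TLS cache knows nothing about.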



##########
src/brpc/rdma/block_pool.cpp:
##########
@@ -0,0 +1,564 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#if BRPC_WITH_RDMA
+
+#include <errno.h>
+#include <stdlib.h>
+#include <vector>
+#include <gflags/gflags.h>
+#include "butil/fast_rand.h"
+#include "butil/iobuf.h"
+#include "butil/object_pool.h"
+#include "butil/thread_local.h"
+#include "bthread/bthread.h"
+#include "brpc/rdma/block_pool.h"
+
+
+namespace brpc {
+namespace rdma {
+
+DEFINE_int32(rdma_memory_pool_initial_size_mb, 1024,
+             "Initial size of memory pool for RDMA (MB)");
+DEFINE_int32(rdma_memory_pool_increase_size_mb, 1024,
+             "Increased size of memory pool for RDMA (MB)");
+DEFINE_int32(rdma_memory_pool_max_regions, 4, "Max number of regions");
+DEFINE_int32(rdma_memory_pool_buckets, 4, "Number of buckets to reduce race");
+DEFINE_int32(rdma_memory_pool_tls_cache_num, 128, "Number of cached block in tls");
+
+// This callback is used when extending a new region
+// Generally, it is a memory region register call
+typedef uint32_t (*Callback)(void*, size_t);
+static Callback g_cb = NULL;
+
+// Number of bytes in 1MB
+static const size_t BYTES_IN_MB = 1048576;
+
+static const int BLOCK_DEFAULT = 0; // 8KB
+static const int BLOCK_LARGE = 1;  // 64KB
+static const int BLOCK_HUGE = 2;  // 2MB
+static const int BLOCK_SIZE_COUNT = 3;
+static size_t g_block_size[BLOCK_SIZE_COUNT] = { 8192, 65536, 2 * BYTES_IN_MB };
+
+struct IdleNode {
+    void* start;
+    size_t len;
+    IdleNode* next;
+};
+
+struct Region {
+    Region() { start = 0; }
+    uintptr_t start;
+    size_t size;
+    uint32_t block_type;
+    uint32_t id;  // lkey
+};
+
+static const int32_t RDMA_MEMORY_POOL_MIN_REGIONS = 1;
+static const int32_t RDMA_MEMORY_POOL_MAX_REGIONS = 16;
+static Region g_regions[RDMA_MEMORY_POOL_MAX_REGIONS];
+static int g_region_num = 0;
+
+static const int32_t RDMA_MEMORY_POOL_MIN_SIZE = 32;  // 16MB
+static const int32_t RDMA_MEMORY_POOL_MAX_SIZE = 1048576;  // 1TB
+
+static const int32_t RDMA_MEMORY_POOL_MIN_BUCKETS = 1;
+static const int32_t RDMA_MEMORY_POOL_MAX_BUCKETS = 16;
+static size_t g_buckets = 1;
+
+static bool g_dump_enable = false;
+static butil::Mutex* g_dump_mutex = NULL;
+
+// Only for default block size
+static __thread IdleNode* tls_idle_list = NULL;
+static __thread size_t tls_idle_num = 0;
+static __thread bool tls_inited = false;
+static butil::Mutex* g_tls_info_mutex = NULL;
+static size_t g_tls_info_cnt = 0;
+static size_t* g_tls_info[1024];
+
+// For each block size, there are some buckets of idle list to reduce race.
+struct GlobalInfo {
+    std::vector<IdleNode*> idle_list[BLOCK_SIZE_COUNT];
+    std::vector<butil::Mutex*> lock[BLOCK_SIZE_COUNT];
+    std::vector<size_t> idle_size[BLOCK_SIZE_COUNT];
+    butil::Mutex extend_lock;
+};
+static GlobalInfo* g_info = NULL;
+
+static inline Region* GetRegion(const void* buf) {
+    if (!buf) {
+        errno = EINVAL;
+        return NULL;
+    }
+    Region* r = NULL;
+    uintptr_t addr = (uintptr_t)buf;
+    for (int i = 0; i < FLAGS_rdma_memory_pool_max_regions; ++i) {
+        if (g_regions[i].start == 0) {
+            break;
+        }
+        if (addr >= g_regions[i].start &&
+            addr < g_regions[i].start + g_regions[i].size) {
+            r = &g_regions[i];
+            break;
+        }
+    }
+    return r;
+}
+
+uint32_t GetRegionId(const void* buf) {
+    Region* r = GetRegion(buf);
+    if (!r) {
+        return 0;
+    }
+    return r->id;
+}
+
+// Extend the block pool with a new region (with different region ID)
+static void* ExtendBlockPool(size_t region_size, int block_type) {
+    if (region_size < 1) {
+        errno = EINVAL;
+        return NULL;
+    }
+
+    if (g_region_num == FLAGS_rdma_memory_pool_max_regions) {
+        LOG(INFO) << "Memory pool reaches max regions";
+        errno = ENOMEM;
+        return NULL;
+    }
+
+    // Regularize region size
+    region_size = region_size * BYTES_IN_MB / g_block_size[block_type] / g_buckets;
+    region_size *= g_block_size[block_type] * g_buckets;
+
+    LOG(INFO) << "Start extend rdma memory " << region_size / BYTES_IN_MB << "MB";
+
+    void* region_base = NULL;
+    if (posix_memalign(&region_base, 4096, region_size) != 0) {
+        PLOG_EVERY_SECOND(ERROR) << "Memory not enough";
+        return NULL;
+    }
+
+    uint32_t id = g_cb(region_base, region_size);
+    if (id == 0) {
+        free(region_base);
+        return NULL;
+    }
+
+    IdleNode* node[g_buckets];
+    for (size_t i = 0; i < g_buckets; ++i) {
+        node[i] = butil::get_object<IdleNode>();
+        if (!node[i]) {
+            PLOG_EVERY_SECOND(ERROR) << "Memory not enough";
+            for (size_t j = 0; j < i; ++j) {
+                butil::return_object<IdleNode>(node[j]);
+            }
+            free(region_base);
+            return NULL;
+        }
+    }
+ 
+    Region* region = &g_regions[g_region_num++];
+    region->start = (uintptr_t)region_base;
+    region->size = region_size;
+    region->id = id;
+    region->block_type = block_type;
+
+    for (size_t i = 0; i < g_buckets; ++i) {
+        node[i]->start = (void*)(region->start + i * (region_size / g_buckets));
+        node[i]->len = region_size / g_buckets;
+        node[i]->next = NULL;
+        g_info->idle_list[block_type][i] = node[i];
+        g_info->idle_size[block_type][i] += node[i]->len;
+    }
+
+    return region_base;
+}
+
+void* InitBlockPool(Callback cb) {
+    if (!cb) {
+        errno = EINVAL;
+        return NULL;
+    }
+    if (g_cb) {
+        LOG(WARNING) << "Do not initialize block pool repeatedly";
+        errno = EINVAL;
+        return NULL;
+    }
+    g_cb = cb;
+    if (FLAGS_rdma_memory_pool_max_regions < RDMA_MEMORY_POOL_MIN_REGIONS ||
+        FLAGS_rdma_memory_pool_max_regions > RDMA_MEMORY_POOL_MAX_REGIONS) {
+        LOG(WARNING) << "rdma_memory_pool_max_regions("
+                     << FLAGS_rdma_memory_pool_max_regions << ") not in ["
+                     << RDMA_MEMORY_POOL_MIN_REGIONS << ","
+                     << RDMA_MEMORY_POOL_MAX_REGIONS << "]!";
+        errno = EINVAL;
+        return NULL;
+    }
+    if (FLAGS_rdma_memory_pool_initial_size_mb < RDMA_MEMORY_POOL_MIN_SIZE ||
+        FLAGS_rdma_memory_pool_initial_size_mb > RDMA_MEMORY_POOL_MAX_SIZE) {
+        LOG(WARNING) << "rdma_memory_pool_initial_size_mb("
+                     << FLAGS_rdma_memory_pool_initial_size_mb << ") not in ["
+                     << RDMA_MEMORY_POOL_MIN_SIZE << ","
+                     << RDMA_MEMORY_POOL_MAX_SIZE << "]!";
+        errno = EINVAL;
+        return NULL;
+    }
+    if (FLAGS_rdma_memory_pool_increase_size_mb < RDMA_MEMORY_POOL_MIN_SIZE ||
+        FLAGS_rdma_memory_pool_increase_size_mb > RDMA_MEMORY_POOL_MAX_SIZE) {
+        LOG(WARNING) << "rdma_memory_pool_increase_size_mb("
+                     << FLAGS_rdma_memory_pool_increase_size_mb << ") not in ["
+                     << RDMA_MEMORY_POOL_MIN_SIZE << ","
+                     << RDMA_MEMORY_POOL_MAX_SIZE << "]!";
+        errno = EINVAL;
+        return NULL;
+    }
+    if (FLAGS_rdma_memory_pool_buckets < RDMA_MEMORY_POOL_MIN_BUCKETS ||
+        FLAGS_rdma_memory_pool_buckets > RDMA_MEMORY_POOL_MAX_BUCKETS) {
+        LOG(WARNING) << "rdma_memory_pool_buckets("
+                     << FLAGS_rdma_memory_pool_buckets << ") not in ["
+                     << RDMA_MEMORY_POOL_MIN_BUCKETS << ","
+                     << RDMA_MEMORY_POOL_MAX_BUCKETS << "]!";
+        errno = EINVAL;
+        return NULL;
+    }
+    g_buckets = FLAGS_rdma_memory_pool_buckets;
+
+    g_info = new (std::nothrow) GlobalInfo;
+    if (!g_info) {
+        return NULL;
+    }
+
+    for (int i = 0; i < BLOCK_SIZE_COUNT; ++i) {
+        g_info->idle_list[i].resize(g_buckets, NULL);
+        if (g_info->idle_list[i].size() != g_buckets) {
+            return NULL;
+        }
+        g_info->lock[i].resize(g_buckets, NULL);
+        if (g_info->lock[i].size() != g_buckets) {
+            return NULL;
+        }
+        g_info->idle_size[i].resize(g_buckets, 0);
+        if (g_info->idle_size[i].size() != g_buckets) {
+            return NULL;
+        }
+        for (size_t j = 0; j < g_buckets; ++j) {
+            g_info->lock[i][j] = new (std::nothrow) butil::Mutex;
+            if (!g_info->lock[i][j]) {
+                return NULL;
+            }
+        }
+    }
+
+    g_dump_mutex = new butil::Mutex;
+    g_tls_info_mutex = new butil::Mutex;
+
+    return ExtendBlockPool(FLAGS_rdma_memory_pool_initial_size_mb,
+                           BLOCK_DEFAULT);
+}
+
+static void* AllocBlockFrom(int block_type) {
+    bool locked = false;
+    if (BAIDU_UNLIKELY(g_dump_enable)) {
+        g_dump_mutex->lock();
+        locked = true;
+    }
+    void* ptr = NULL;
+    if (block_type == 0 && tls_idle_list != NULL){
+        CHECK(tls_idle_num > 0);
+        IdleNode* n = tls_idle_list;
+        tls_idle_list = n->next;
+        ptr = n->start;
+        butil::return_object<IdleNode>(n);
+        tls_idle_num--;
+        if (locked) {
+            g_dump_mutex->unlock();
+        }
+        return ptr;
+    }
+
+    uint64_t index = butil::fast_rand() % g_buckets;
+    BAIDU_SCOPED_LOCK(*g_info->lock[block_type][index]);
+    IdleNode* node = g_info->idle_list[block_type][index];
+    if (!node) {
+        BAIDU_SCOPED_LOCK(g_info->extend_lock);
+        node = g_info->idle_list[block_type][index];
+        if (!node) {
+            // There is no block left, extend a new region
+            if (!ExtendBlockPool(FLAGS_rdma_memory_pool_increase_size_mb,

Review Comment:
   If nothing is found in the current bucket, why extend a new region right away instead of trying another bucket first?
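
   The sketch below illustrates the alternative this question points at:
   probing the remaining buckets before paying for a region extension. It
   reuses the patch's g_buckets/g_info/g_block_size names, but it is only an
   illustration of the suggestion, not the patch's behavior.

    // Illustrative only (reuses the patch's globals): probe every bucket
    // before falling back to extending a new region.
    static void* AllocFromAnyBucket(int block_type, uint64_t start_index) {
        for (size_t k = 0; k < g_buckets; ++k) {
            size_t i = (start_index + k) % g_buckets;   // begin at the random bucket
            BAIDU_SCOPED_LOCK(*g_info->lock[block_type][i]);
            IdleNode* node = g_info->idle_list[block_type][i];
            if (node == NULL) {
                continue;                               // this bucket is empty, try the next
            }
            void* ptr = node->start;                    // carve one block, as the patch does
            if (node->len > g_block_size[block_type]) {
                node->start = (char*)node->start + g_block_size[block_type];
                node->len -= g_block_size[block_type];
            } else {
                g_info->idle_list[block_type][i] = node->next;
                butil::return_object<IdleNode>(node);
            }
            g_info->idle_size[block_type][i] -= g_block_size[block_type];
            return ptr;
        }
        return NULL;  // all buckets empty: caller would extend a region under extend_lock
    }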



##########
src/brpc/rdma/block_pool.cpp:
##########
@@ -0,0 +1,564 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#if BRPC_WITH_RDMA
+
+#include <errno.h>
+#include <stdlib.h>
+#include <vector>
+#include <gflags/gflags.h>
+#include "butil/fast_rand.h"
+#include "butil/iobuf.h"
+#include "butil/object_pool.h"
+#include "butil/thread_local.h"
+#include "bthread/bthread.h"
+#include "brpc/rdma/block_pool.h"
+
+
+namespace brpc {
+namespace rdma {
+
+DEFINE_int32(rdma_memory_pool_initial_size_mb, 1024,
+             "Initial size of memory pool for RDMA (MB)");
+DEFINE_int32(rdma_memory_pool_increase_size_mb, 1024,
+             "Increased size of memory pool for RDMA (MB)");
+DEFINE_int32(rdma_memory_pool_max_regions, 4, "Max number of regions");
+DEFINE_int32(rdma_memory_pool_buckets, 4, "Number of buckets to reduce race");
+DEFINE_int32(rdma_memory_pool_tls_cache_num, 128, "Number of cached block in tls");
+
+// This callback is used when extending a new region
+// Generally, it is a memory region register call
+typedef uint32_t (*Callback)(void*, size_t);

Review Comment:
   Wouldn't it be more appropriate to define this in the header file?
   Also, the name does not make it easy to understand what this callback is for.
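
   A possible shape of that suggestion (the placement and the name below are
   hypothetical, just to make the intent concrete):

    // In block_pool.h (hypothetical placement and name):
    // Called when the pool extends a new region; expected to register the
    // region for RDMA and return its lkey, or 0 on failure.
    typedef uint32_t (*RegisterMemoryCallback)(void* region_base, size_t region_size);

    void* InitBlockPool(RegisterMemoryCallback register_region_cb);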



##########
src/brpc/rdma/rdma_endpoint.cpp:
##########
@@ -0,0 +1,1468 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#if BRPC_WITH_RDMA
+
+#include <gflags/gflags.h>
+#include "butil/fd_utility.h"
+#include "butil/logging.h"                   // CHECK, LOG
+#include "butil/sys_byteorder.h"             // HostToNet,NetToHost
+#include "bthread/bthread.h"
+#include "brpc/errno.pb.h"
+#include "brpc/event_dispatcher.h"
+#include "brpc/input_messenger.h"
+#include "brpc/socket.h"
+#include "brpc/reloadable_flags.h"
+#include "brpc/rdma/block_pool.h"
+#include "brpc/rdma/rdma_helper.h"
+#include "brpc/rdma/rdma_endpoint.h"
+
+
+namespace brpc {
+namespace rdma {
+
+extern ibv_cq* (*IbvCreateCq)(ibv_context*, int, void*, ibv_comp_channel*, int);
+extern int (*IbvDestroyCq)(ibv_cq*);
+extern ibv_comp_channel* (*IbvCreateCompChannel)(ibv_context*);
+extern int (*IbvDestroyCompChannel)(ibv_comp_channel*);
+extern int (*IbvGetCqEvent)(ibv_comp_channel*, ibv_cq**, void**);
+extern void (*IbvAckCqEvents)(ibv_cq*, unsigned int);
+extern ibv_qp* (*IbvCreateQp)(ibv_pd*, ibv_qp_init_attr*);
+extern int (*IbvModifyQp)(ibv_qp*, ibv_qp_attr*, ibv_qp_attr_mask);
+extern int (*IbvQueryQp)(ibv_qp*, ibv_qp_attr*, ibv_qp_attr_mask, ibv_qp_init_attr*);
+extern int (*IbvDestroyQp)(ibv_qp*);
+extern bool g_skip_rdma_init;
+
+DEFINE_int32(rdma_sq_size, 128, "SQ size for RDMA");
+DEFINE_int32(rdma_rq_size, 128, "RQ size for RDMA");
+DEFINE_bool(rdma_recv_zerocopy, true, "Enable zerocopy for receive side");
+DEFINE_int32(rdma_zerocopy_min_size, 512, "The minimal size for receive zerocopy");
+DEFINE_string(rdma_recv_block_type, "default", "Default size type for recv WR: "
+              "default(8KB - 32B)/large(64KB - 32B)/huge(2MB - 32B)");
+DEFINE_int32(rdma_cqe_poll_once, 32, "The maximum of cqe number polled once.");
+DEFINE_int32(rdma_prepared_qp_size, 128, "SQ and RQ size for prepared QP.");
+DEFINE_int32(rdma_prepared_qp_cnt, 1024, "Initial count of prepared QP.");
+DEFINE_bool(rdma_trace_verbose, false, "Print log message verbosely");
+BRPC_VALIDATE_GFLAG(rdma_trace_verbose, brpc::PassValidate);
+
+static const size_t IOBUF_BLOCK_HEADER_LEN = 32; // implementation-dependent
+static const size_t IOBUF_BLOCK_DEFAULT_PAYLOAD =
+        butil::IOBuf::DEFAULT_BLOCK_SIZE - IOBUF_BLOCK_HEADER_LEN;
+
+// DO NOT change this value unless you know the safe value!!!
+// This is the number of reserved WRs in SQ/RQ for pure ACK.
+static const size_t RESERVED_WR_NUM = 3;
+
+// magic string RDMA (4B)
+// message length (2B)
+// hello version (2B)
+// impl version (2B): 0 means should use tcp
+// block size (2B)
+// sq size (2B)
+// rq size (2B)
+// GID (16B)
+// QP number (4B)
+static const char* MAGIC_STR = "RDMA";
+static const size_t MAGIC_STR_LEN = 4;
+static const size_t HELLO_MSG_LEN_MIN = 38;
+static const size_t HELLO_MSG_LEN_MAX = 4096;
+static const size_t ACK_MSG_LEN = 4;
+static uint16_t g_rdma_hello_msg_len = 38;  // In Byte
+static uint16_t g_rdma_hello_version = 1;
+static uint16_t g_rdma_impl_version = 1;
+static uint16_t g_rdma_recv_block_size = 0;
+
+static const uint32_t MAX_INLINE_DATA = 64;
+static const uint8_t MAX_HOP_LIMIT = 16;
+static const uint8_t TIMEOUT = 14;
+static const uint8_t RETRY_CNT = 7;
+static const uint16_t MIN_QP_SIZE = 16;
+static const uint16_t MIN_BLOCK_SIZE = 1024;
+static const uint32_t ACK_MSG_RDMA_OK = 0x1;
+
+static butil::Mutex* g_rdma_resource_mutex = NULL;
+static RdmaResource* g_rdma_resource_list = NULL;
+
+struct HelloMessage {

Review Comment:
   Calling this HelloMessage may lead people to mistake it for test code.
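
   For example (the replacement name below is only a hypothetical
   illustration of the concern, not a name proposed in the thread):

    // A more self-describing name would make the handshake role explicit:
    struct RdmaHandshakeHelloMessage {  // hypothetical rename of HelloMessage
        void Serialize(void* data) const;
        void Deserialize(void* data);
        // ... same fields as HelloMessage ...
    };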



##########
src/brpc/rdma/rdma_endpoint.cpp:
##########
@@ -0,0 +1,1468 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#if BRPC_WITH_RDMA
+
+#include <gflags/gflags.h>
+#include "butil/fd_utility.h"
+#include "butil/logging.h"                   // CHECK, LOG
+#include "butil/sys_byteorder.h"             // HostToNet,NetToHost
+#include "bthread/bthread.h"
+#include "brpc/errno.pb.h"
+#include "brpc/event_dispatcher.h"
+#include "brpc/input_messenger.h"
+#include "brpc/socket.h"
+#include "brpc/reloadable_flags.h"
+#include "brpc/rdma/block_pool.h"
+#include "brpc/rdma/rdma_helper.h"
+#include "brpc/rdma/rdma_endpoint.h"
+
+
+namespace brpc {
+namespace rdma {
+
+extern ibv_cq* (*IbvCreateCq)(ibv_context*, int, void*, ibv_comp_channel*, int);
+extern int (*IbvDestroyCq)(ibv_cq*);
+extern ibv_comp_channel* (*IbvCreateCompChannel)(ibv_context*);
+extern int (*IbvDestroyCompChannel)(ibv_comp_channel*);
+extern int (*IbvGetCqEvent)(ibv_comp_channel*, ibv_cq**, void**);
+extern void (*IbvAckCqEvents)(ibv_cq*, unsigned int);
+extern ibv_qp* (*IbvCreateQp)(ibv_pd*, ibv_qp_init_attr*);
+extern int (*IbvModifyQp)(ibv_qp*, ibv_qp_attr*, ibv_qp_attr_mask);
+extern int (*IbvQueryQp)(ibv_qp*, ibv_qp_attr*, ibv_qp_attr_mask, ibv_qp_init_attr*);
+extern int (*IbvDestroyQp)(ibv_qp*);
+extern bool g_skip_rdma_init;
+
+DEFINE_int32(rdma_sq_size, 128, "SQ size for RDMA");
+DEFINE_int32(rdma_rq_size, 128, "RQ size for RDMA");
+DEFINE_bool(rdma_recv_zerocopy, true, "Enable zerocopy for receive side");
+DEFINE_int32(rdma_zerocopy_min_size, 512, "The minimal size for receive zerocopy");
+DEFINE_string(rdma_recv_block_type, "default", "Default size type for recv WR: "
+              "default(8KB - 32B)/large(64KB - 32B)/huge(2MB - 32B)");
+DEFINE_int32(rdma_cqe_poll_once, 32, "The maximum of cqe number polled once.");
+DEFINE_int32(rdma_prepared_qp_size, 128, "SQ and RQ size for prepared QP.");
+DEFINE_int32(rdma_prepared_qp_cnt, 1024, "Initial count of prepared QP.");
+DEFINE_bool(rdma_trace_verbose, false, "Print log message verbosely");
+BRPC_VALIDATE_GFLAG(rdma_trace_verbose, brpc::PassValidate);
+
+static const size_t IOBUF_BLOCK_HEADER_LEN = 32; // implementation-dependent
+static const size_t IOBUF_BLOCK_DEFAULT_PAYLOAD =
+        butil::IOBuf::DEFAULT_BLOCK_SIZE - IOBUF_BLOCK_HEADER_LEN;
+
+// DO NOT change this value unless you know the safe value!!!
+// This is the number of reserved WRs in SQ/RQ for pure ACK.
+static const size_t RESERVED_WR_NUM = 3;
+
+// magic string RDMA (4B)
+// message length (2B)
+// hello version (2B)
+// impl version (2B): 0 means should use tcp
+// block size (2B)
+// sq size (2B)
+// rq size (2B)
+// GID (16B)
+// QP number (4B)
+static const char* MAGIC_STR = "RDMA";
+static const size_t MAGIC_STR_LEN = 4;
+static const size_t HELLO_MSG_LEN_MIN = 38;
+static const size_t HELLO_MSG_LEN_MAX = 4096;
+static const size_t ACK_MSG_LEN = 4;
+static uint16_t g_rdma_hello_msg_len = 38;  // In Byte
+static uint16_t g_rdma_hello_version = 1;
+static uint16_t g_rdma_impl_version = 1;
+static uint16_t g_rdma_recv_block_size = 0;
+
+static const uint32_t MAX_INLINE_DATA = 64;
+static const uint8_t MAX_HOP_LIMIT = 16;
+static const uint8_t TIMEOUT = 14;
+static const uint8_t RETRY_CNT = 7;
+static const uint16_t MIN_QP_SIZE = 16;
+static const uint16_t MIN_BLOCK_SIZE = 1024;
+static const uint32_t ACK_MSG_RDMA_OK = 0x1;
+
+static butil::Mutex* g_rdma_resource_mutex = NULL;
+static RdmaResource* g_rdma_resource_list = NULL;
+
+struct HelloMessage {
+    void Serialize(void* data);
+    void Deserialize(void* data);
+
+    uint16_t msg_len;
+    uint16_t hello_ver;
+    uint16_t impl_ver;
+    uint16_t block_size;
+    uint16_t sq_size;
+    uint16_t rq_size;
+    uint16_t lid;
+    ibv_gid gid;
+    uint32_t qp_num;
+};
+
+void HelloMessage::Serialize(void* data) {
+    // Note serialization does include magic str
+    memcpy(data, MAGIC_STR, 4);

Review Comment:
   Deserialize does not include this MAGIC_STR, so wouldn't it be more consistent for Serialize not to include it either?
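
   A compact sketch of the symmetric layout the comment suggests: Serialize
   writes only the fields and the caller frames the message with the magic
   string. The revised hunk quoted further down this thread appears to take
   exactly this shape; the code below just restates it for clarity.

    // Illustrative: keep MAGIC_STR out of both Serialize and Deserialize.
    void HelloMessage::Serialize(void* data) const {
        uint16_t* p = (uint16_t*)data;
        *(p++) = butil::HostToNet16(msg_len);
        *(p++) = butil::HostToNet16(hello_ver);
        *(p++) = butil::HostToNet16(impl_ver);
        *(p++) = butil::HostToNet16(block_size);
        *(p++) = butil::HostToNet16(sq_size);
        *(p++) = butil::HostToNet16(rq_size);
        *(p++) = butil::HostToNet16(lid);
        memcpy(p, gid.raw, 16);
        *(uint32_t*)((char*)p + 16) = butil::HostToNet32(qp_num);
    }

    // The caller then frames the message itself:
    //   memcpy(data, MAGIC_STR, 4);
    //   local_msg.Serialize((char*)data + 4);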





[GitHub] [incubator-brpc] zyearn commented on pull request #1836: enable brpc use rdma

Posted by GitBox <gi...@apache.org>.
zyearn commented on PR #1836:
URL: https://github.com/apache/incubator-brpc/pull/1836#issuecomment-1233276139

   @Tuvie There are conflicts; please merge master and resubmit.




[GitHub] [incubator-brpc] zyearn commented on pull request #1836: enable brpc use rdma

Posted by GitBox <gi...@apache.org>.
zyearn commented on PR #1836:
URL: https://github.com/apache/incubator-brpc/pull/1836#issuecomment-1233275545

   LGTM.
   
   @Tuvie Have there been any bugfixes since the last submission? If not, let's wait for @wwbmmm to see how stably it runs in production; if nothing comes up, we can consider merging it.




[GitHub] [incubator-brpc] tsyw1987 commented on pull request #1836: enable brpc use rdma

Posted by GitBox <gi...@apache.org>.
tsyw1987 commented on PR #1836:
URL: https://github.com/apache/incubator-brpc/pull/1836#issuecomment-1208924101

   Will it later support starting multiple brpc servers in the same process, each bound to a different RDMA NIC?




[GitHub] [incubator-brpc] Tuvie commented on a diff in pull request #1836: enable brpc use rdma

Posted by GitBox <gi...@apache.org>.
Tuvie commented on code in PR #1836:
URL: https://github.com/apache/incubator-brpc/pull/1836#discussion_r977807489


##########
src/brpc/rdma/rdma_endpoint.cpp:
##########
@@ -0,0 +1,1467 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#if BRPC_WITH_RDMA
+
+#include <gflags/gflags.h>
+#include "butil/fd_utility.h"
+#include "butil/logging.h"                   // CHECK, LOG
+#include "butil/sys_byteorder.h"             // HostToNet,NetToHost
+#include "bthread/bthread.h"
+#include "brpc/errno.pb.h"
+#include "brpc/event_dispatcher.h"
+#include "brpc/input_messenger.h"
+#include "brpc/socket.h"
+#include "brpc/reloadable_flags.h"
+#include "brpc/rdma/block_pool.h"
+#include "brpc/rdma/rdma_helper.h"
+#include "brpc/rdma/rdma_endpoint.h"
+
+
+namespace brpc {
+namespace rdma {
+
+extern ibv_cq* (*IbvCreateCq)(ibv_context*, int, void*, ibv_comp_channel*, int);
+extern int (*IbvDestroyCq)(ibv_cq*);
+extern ibv_comp_channel* (*IbvCreateCompChannel)(ibv_context*);
+extern int (*IbvDestroyCompChannel)(ibv_comp_channel*);
+extern int (*IbvGetCqEvent)(ibv_comp_channel*, ibv_cq**, void**);
+extern void (*IbvAckCqEvents)(ibv_cq*, unsigned int);
+extern ibv_qp* (*IbvCreateQp)(ibv_pd*, ibv_qp_init_attr*);
+extern int (*IbvModifyQp)(ibv_qp*, ibv_qp_attr*, ibv_qp_attr_mask);
+extern int (*IbvQueryQp)(ibv_qp*, ibv_qp_attr*, ibv_qp_attr_mask, ibv_qp_init_attr*);
+extern int (*IbvDestroyQp)(ibv_qp*);
+extern bool g_skip_rdma_init;
+
+DEFINE_int32(rdma_sq_size, 128, "SQ size for RDMA");
+DEFINE_int32(rdma_rq_size, 128, "RQ size for RDMA");
+DEFINE_bool(rdma_recv_zerocopy, true, "Enable zerocopy for receive side");
+DEFINE_int32(rdma_zerocopy_min_size, 512, "The minimal size for receive zerocopy");
+DEFINE_string(rdma_recv_block_type, "default", "Default size type for recv WR: "
+              "default(8KB - 32B)/large(64KB - 32B)/huge(2MB - 32B)");
+DEFINE_int32(rdma_cqe_poll_once, 32, "The maximum of cqe number polled once.");
+DEFINE_int32(rdma_prepared_qp_size, 128, "SQ and RQ size for prepared QP.");
+DEFINE_int32(rdma_prepared_qp_cnt, 1024, "Initial count of prepared QP.");
+DEFINE_bool(rdma_trace_verbose, false, "Print log message verbosely");
+BRPC_VALIDATE_GFLAG(rdma_trace_verbose, brpc::PassValidate);
+
+static const size_t IOBUF_BLOCK_HEADER_LEN = 32; // implementation-dependent
+static const size_t IOBUF_BLOCK_DEFAULT_PAYLOAD =
+        butil::IOBuf::DEFAULT_BLOCK_SIZE - IOBUF_BLOCK_HEADER_LEN;
+
+// DO NOT change this value unless you know the safe value!!!
+// This is the number of reserved WRs in SQ/RQ for pure ACK.
+static const size_t RESERVED_WR_NUM = 3;
+
+// magic string RDMA (4B)
+// message length (2B)
+// hello version (2B)
+// impl version (2B): 0 means should use tcp
+// block size (2B)
+// sq size (2B)
+// rq size (2B)
+// GID (16B)
+// QP number (4B)
+static const char* MAGIC_STR = "RDMA";
+static const size_t MAGIC_STR_LEN = 4;
+static const size_t HELLO_MSG_LEN_MIN = 38;
+static const size_t HELLO_MSG_LEN_MAX = 4096;
+static const size_t ACK_MSG_LEN = 4;
+static uint16_t g_rdma_hello_msg_len = 38;  // In Byte
+static uint16_t g_rdma_hello_version = 1;
+static uint16_t g_rdma_impl_version = 1;
+static uint16_t g_rdma_recv_block_size = 0;
+
+static const uint32_t MAX_INLINE_DATA = 64;
+static const uint8_t MAX_HOP_LIMIT = 16;
+static const uint8_t TIMEOUT = 14;
+static const uint8_t RETRY_CNT = 7;
+static const uint16_t MIN_QP_SIZE = 16;
+static const uint16_t MIN_BLOCK_SIZE = 1024;
+static const uint32_t ACK_MSG_RDMA_OK = 0x1;
+
+static butil::Mutex* g_rdma_resource_mutex = NULL;
+static RdmaResource* g_rdma_resource_list = NULL;
+
+struct HelloMessage {
+    void Serialize(void* data) const;
+    void Deserialize(void* data);
+
+    uint16_t msg_len;
+    uint16_t hello_ver;
+    uint16_t impl_ver;
+    uint16_t block_size;
+    uint16_t sq_size;
+    uint16_t rq_size;
+    uint16_t lid;
+    ibv_gid gid;
+    uint32_t qp_num;
+};
+
+void HelloMessage::Serialize(void* data) const {
+    uint16_t* current_pos = (uint16_t*)data;
+    *(current_pos++) = butil::HostToNet16(msg_len);
+    *(current_pos++) = butil::HostToNet16(hello_ver);
+    *(current_pos++) = butil::HostToNet16(impl_ver);
+    *(current_pos++) = butil::HostToNet16(block_size);
+    *(current_pos++) = butil::HostToNet16(sq_size);
+    *(current_pos++) = butil::HostToNet16(rq_size);
+    *(current_pos++) = butil::HostToNet16(lid);
+    memcpy(current_pos, gid.raw, 16);
+    uint32_t* qp_num_pos = (uint32_t*)((char*)current_pos + 16);
+    *qp_num_pos = butil::HostToNet32(qp_num);
+}
+
+void HelloMessage::Deserialize(void* data) {
+    uint16_t* current_pos = (uint16_t*)data;
+    msg_len = butil::NetToHost16(*current_pos++);
+    hello_ver = butil::NetToHost16(*current_pos++);
+    impl_ver = butil::NetToHost16(*current_pos++);
+    block_size = butil::NetToHost16(*current_pos++);
+    sq_size = butil::NetToHost16(*current_pos++);
+    rq_size = butil::NetToHost16(*current_pos++);
+    lid = butil::NetToHost16(*current_pos++);
+    memcpy(gid.raw, current_pos, 16);
+    qp_num = butil::NetToHost32(*(uint32_t*)((char*)current_pos + 16));
+}
+
+RdmaResource::RdmaResource() 
+    : qp(NULL)
+    , cq(NULL)
+    , comp_channel(NULL)
+    , next(NULL) { }
+
+RdmaResource::~RdmaResource() {
+    if (qp) {
+        IbvDestroyQp(qp);
+        qp = NULL;
+    }
+    if (cq) {
+        IbvDestroyCq(cq);
+        cq = NULL;
+    }
+    if (comp_channel) {
+        IbvDestroyCompChannel(comp_channel);
+        comp_channel = NULL;
+    }
+}
+
+RdmaEndpoint::RdmaEndpoint(Socket* s)
+    : _socket(s)
+    , _state(UNINIT)
+    , _resource(NULL)
+    , _cq_events(0)
+    , _cq_sid(INVALID_SOCKET_ID)
+    , _sq_size(FLAGS_rdma_sq_size)
+    , _rq_size(FLAGS_rdma_rq_size)
+    , _sbuf()
+    , _rbuf()
+    , _rbuf_data()
+    , _remote_recv_block_size(0)
+    , _accumulated_ack(0)
+    , _unsolicited(0)
+    , _unsolicited_bytes(0)
+    , _sq_current(0)
+    , _sq_unsignaled(0)
+    , _sq_sent(0)
+    , _rq_received(0)
+    , _local_window_capacity(0)
+    , _remote_window_capacity(0)
+    , _window_size(0)
+    , _new_rq_wrs(0)
+{
+    if (_sq_size < MIN_QP_SIZE) {
+        _sq_size = MIN_QP_SIZE;
+    }
+    if (_rq_size < MIN_QP_SIZE) {
+        _rq_size = MIN_QP_SIZE;
+    }
+    _read_butex = bthread::butex_create_checked<butil::atomic<int> >();
+}
+
+RdmaEndpoint::~RdmaEndpoint() {
+    Reset();
+    bthread::butex_destroy(_read_butex);
+}
+
+void RdmaEndpoint::Reset() {
+    DeallocateResources();
+
+    _cq_events = 0;
+    _cq_sid = INVALID_SOCKET_ID;
+    _state = UNINIT;
+    _sbuf.clear();
+    _rbuf.clear();
+    _rbuf_data.clear();
+    _accumulated_ack = 0;
+    _unsolicited = 0;
+    _sq_current = 0;
+    _sq_unsignaled = 0;
+    _local_window_capacity = 0;
+    _remote_window_capacity = 0;
+    _window_size.store(0, butil::memory_order_relaxed);
+    _new_rq_wrs = 0;
+    _sq_sent = 0;
+    _rq_received = 0;
+}
+
+void RdmaConnect::StartConnect(const Socket* socket,
+                               void (*done)(int err, void* data),
+                               void* data) {
+    CHECK(socket->_rdma_ep != NULL);
+    SocketUniquePtr s;
+    if (Socket::Address(socket->id(), &s) != 0) {
+        return;
+    }
+    if (!IsRdmaAvailable()) {
+        socket->_rdma_ep->_state = RdmaEndpoint::FALLBACK_TCP;
+        s->_rdma_state = Socket::RDMA_OFF;
+        done(0, data);
+        return;
+    }
+    _done = done;
+    _data = data;
+    bthread_t tid;
+    if (bthread_start_background(&tid, &BTHREAD_ATTR_NORMAL,
+                RdmaEndpoint::ProcessHandshakeAtClient, socket->_rdma_ep) < 0) {
+        LOG(FATAL) << "Fail to start handshake bthread";
+    } else {
+        s.release();
+    }
+}
+
+void RdmaConnect::StopConnect(Socket* socket) { }
+
+void RdmaConnect::Run() {
+    _done(errno, _data);
+}
+
+static void TryReadOnTcpDuringRdmaEst(Socket* s) {
+    int progress = Socket::PROGRESS_INIT;
+    while (true) {
+        uint8_t tmp;
+        ssize_t nr = read(s->fd(), &tmp, 1);
+        if (nr < 0) {
+            if (errno != EAGAIN) {
+                const int saved_errno = errno;
+                PLOG(WARNING) << "Fail to read from " << s;
+                s->SetFailed(saved_errno, "Fail to read from %s: %s",
+                        s->description().c_str(), berror(saved_errno));
+                return;
+            }
+            if (!s->MoreReadEvents(&progress)) {
+                break;
+            }
+        } else if (nr == 0) {
+            s->SetEOF();
+            return;
+        } else {
+            LOG(WARNING) << "Read unexpected data from " << s;
+            s->SetFailed(EPROTO, "Read unexpected data from %s",
+                    s->description().c_str());
+            return;
+        }
+    }
+}
+
+void RdmaEndpoint::OnNewDataFromTcp(Socket* m) {
+    RdmaEndpoint* ep = m->_rdma_ep;
+    CHECK(ep != NULL);
+
+    int progress = Socket::PROGRESS_INIT;
+    while (true) {
+        if (ep->_state == UNINIT) {
+            if (!m->CreatedByConnect()) {
+                if (!IsRdmaAvailable()) {
+                    ep->_state = FALLBACK_TCP;
+                    m->_rdma_state = Socket::RDMA_OFF;
+                    continue;
+                }
+                bthread_t tid;
+                ep->_state = S_HELLO_WAIT;
+                SocketUniquePtr s;
+                m->ReAddress(&s);
+                if (bthread_start_background(&tid, &BTHREAD_ATTR_NORMAL,
+                            ProcessHandshakeAtServer, ep) < 0) {
+                    ep->_state = UNINIT;
+                    LOG(FATAL) << "Fail to start handshake bthread";
+                } else {
+                    s.release();
+                }
+            } else {
+                // The connection may be closed or reset before the client
+                // starts handshake. This will be handled by client handshake.
+                // Ignore the exception here.
+            }
+        } else if (ep->_state < ESTABLISHED) {  // during handshake
+            ep->_read_butex->fetch_add(1, butil::memory_order_release);
+            bthread::butex_wake(ep->_read_butex);
+        } else if (ep->_state == FALLBACK_TCP){  // handshake finishes
+            InputMessenger::OnNewMessages(m);
+            return;
+        } else if (ep->_state == ESTABLISHED) {
+            TryReadOnTcpDuringRdmaEst(ep->_socket);
+            return;
+        }
+        if (!m->MoreReadEvents(&progress)) {
+            break;
+        }
+    }
+}
+
+bool HelloNegotiationValid(HelloMessage& msg) {
+    if (msg.hello_ver == g_rdma_hello_version &&
+        msg.impl_ver == g_rdma_impl_version &&
+        msg.block_size >= MIN_BLOCK_SIZE &&
+        msg.sq_size >= MIN_QP_SIZE &&
+        msg.rq_size >= MIN_QP_SIZE) {
+        // This can be modified for future compatibility
+        return true;
+    }
+    return false;
+}
+
+static const int WAIT_TIMEOUT_MS = 50;
+
+int RdmaEndpoint::ReadFromFd(void* data, size_t len) {
+    CHECK(data != NULL);
+    int nr = 0;
+    size_t received = 0;
+    do {
+        const int expected_val = _read_butex->load(butil::memory_order_acquire);
+        const timespec duetime = butil::milliseconds_from_now(WAIT_TIMEOUT_MS);
+        nr = read(_socket->fd(), (uint8_t*)data + received, len - received);
+        if (nr < 0) {
+            if (errno == EAGAIN) {
+                if (bthread::butex_wait(_read_butex, expected_val, &duetime) < 0) {
+                    if (errno != EWOULDBLOCK && errno != ETIMEDOUT) {
+                        return -1;
+                    }
+                }
+            } else {
+                return -1;
+            }
+        } else if (nr == 0) {  // Got EOF
+            errno = EEOF;
+            return -1;
+        } else {
+            received += nr;
+        }
+    } while (received < len);
+    return 0;
+}
+
+int RdmaEndpoint::WriteToFd(void* data, size_t len) {
+    CHECK(data != NULL);
+    int nw = 0;
+    size_t written = 0;
+    do {
+        const timespec duetime = butil::milliseconds_from_now(WAIT_TIMEOUT_MS);
+        nw = write(_socket->fd(), (uint8_t*)data + written, len - written);
+        if (nw < 0) {
+            if (errno == EAGAIN) {
+                if (_socket->WaitEpollOut(_socket->fd(), true, &duetime) < 0) {
+                    if (errno != ETIMEDOUT) {
+                        return -1;
+                    }
+                }
+            } else {
+                return -1;
+            }
+        } else {
+            written += nw;
+        }
+    } while (written < len);
+    return 0;
+}
+
+inline void RdmaEndpoint::TryReadOnTcp() {
+    if (_socket->_nevent.fetch_add(1, butil::memory_order_acq_rel) == 0) {
+        if (_state == FALLBACK_TCP) {
+            InputMessenger::OnNewMessages(_socket);
+        } else if (_state == ESTABLISHED) {
+            TryReadOnTcpDuringRdmaEst(_socket);
+        }
+    }
+}
+
+void* RdmaEndpoint::ProcessHandshakeAtClient(void* arg) {
+    RdmaEndpoint* ep = static_cast<RdmaEndpoint*>(arg);
+    SocketUniquePtr s(ep->_socket);
+    RdmaConnect::RunGuard rg((RdmaConnect*)s->_app_connect.get());
+
+    LOG_IF(INFO, FLAGS_rdma_trace_verbose) 
+        << "Start handshake on " << s->_local_side;
+
+    void* data = malloc(g_rdma_hello_msg_len);
+    if (!data) {
+        const int saved_errno = errno;
+        PLOG(WARNING) << "Fail to send hello message to server:" << s->description();
+        s->SetFailed(saved_errno, "Fail to complete rdma handshake from %s: %s",
+                s->description().c_str(), berror(saved_errno));
+        ep->_state = FAILED;
+        return NULL;
+    }
+
+    // First initialize CQ and QP resources
+    ep->_state = C_ALLOC_QPCQ;
+    if (ep->AllocateResources() < 0) {
+        LOG(WARNING) << "Fallback to tcp:" << s->description();
+        s->_rdma_state = Socket::RDMA_OFF;
+        ep->_state = FALLBACK_TCP;
+        return NULL;
+    }
+
+    // Send hello message to server
+    ep->_state = C_HELLO_SEND;
+    HelloMessage local_msg;
+    local_msg.msg_len = g_rdma_hello_msg_len;
+    local_msg.hello_ver = g_rdma_hello_version;
+    local_msg.impl_ver = g_rdma_impl_version;
+    local_msg.block_size = g_rdma_recv_block_size;
+    local_msg.sq_size = ep->_sq_size;
+    local_msg.rq_size = ep->_rq_size;
+    local_msg.lid = GetRdmaLid();
+    local_msg.gid = GetRdmaGid();
+    if (BAIDU_LIKELY(ep->_resource)) {
+        local_msg.qp_num = ep->_resource->qp->qp_num;
+    } else {
+        // Only happens in UT
+        local_msg.qp_num = 0;
+    }
+    memcpy(data, MAGIC_STR, 4);
+    local_msg.Serialize((char*)data + 4);
+    if (ep->WriteToFd(data, g_rdma_hello_msg_len) < 0) {
+        const int saved_errno = errno;
+        PLOG(WARNING) << "Fail to send hello message to server:" << s->description();
+        s->SetFailed(saved_errno, "Fail to complete rdma handshake from %s: %s",
+                s->description().c_str(), berror(saved_errno));
+        ep->_state = FAILED;
+        return NULL;
+    }
+
+    // Check magic str
+    ep->_state = C_HELLO_WAIT;
+    if (ep->ReadFromFd(data, MAGIC_STR_LEN) < 0) {
+        const int saved_errno = errno;
+        PLOG(WARNING) << "Fail to get hello message from server:" << s->description();
+        s->SetFailed(saved_errno, "Fail to complete rdma handshake from %s: %s",
+                s->description().c_str(), berror(saved_errno));
+        ep->_state = FAILED;
+        return NULL;
+    }
+    if (memcmp(data, MAGIC_STR, MAGIC_STR_LEN) != 0) {
+        LOG(WARNING) << "Read unexpected data during handshake:" << s->description();
+        s->SetFailed(EPROTO, "Fail to complete rdma handshake from %s: %s",
+                s->description().c_str(), berror(EPROTO));
+        ep->_state = FAILED;
+        return NULL;
+    }
+
+    // Read hello message from server
+    if (ep->ReadFromFd(data, HELLO_MSG_LEN_MIN - MAGIC_STR_LEN) < 0) {
+        const int saved_errno = errno;
+        PLOG(WARNING) << "Fail to get Hello Message from server:" << s->description();
+        s->SetFailed(saved_errno, "Fail to complete rdma handshake from %s: %s",
+                s->description().c_str(), berror(saved_errno));
+        ep->_state = FAILED;
+        return NULL;
+    }
+    HelloMessage remote_msg;
+    remote_msg.Deserialize(data);
+    if (remote_msg.msg_len < HELLO_MSG_LEN_MIN) {
+        LOG(WARNING) << "Fail to parse Hello Message length from server:"
+                     << s->description();
+        s->SetFailed(EPROTO, "Fail to complete rdma handshake from %s: %s",
+                s->description().c_str(), berror(EPROTO));
+        ep->_state = FAILED;
+        return NULL;
+    }
+
+    if (remote_msg.msg_len > HELLO_MSG_LEN_MIN) {
+        // TODO: Read Hello Message customized data
+        // Just for future use, should not happen now
+    }
+
+    if (!HelloNegotiationValid(remote_msg)) {
+        LOG(WARNING) << "Fail to negotiate with server, fallback to tcp:"
+                     << s->description();
+        s->_rdma_state = Socket::RDMA_OFF;
+    } else {
+        ep->_remote_recv_block_size = remote_msg.block_size;
+        ep->_local_window_capacity = 
+            std::min(ep->_sq_size, remote_msg.rq_size) - RESERVED_WR_NUM;
+        ep->_remote_window_capacity = 
+            std::min(ep->_rq_size, remote_msg.sq_size) - RESERVED_WR_NUM;
+        ep->_window_size.store(ep->_local_window_capacity, butil::memory_order_relaxed);
+
+        ep->_state = C_BRINGUP_QP;
+        if (ep->BringUpQp(remote_msg.lid, remote_msg.gid, remote_msg.qp_num) < 0) {
+            LOG(WARNING) << "Fail to bringup QP, fallback to tcp:" << s->description();
+            s->_rdma_state = Socket::RDMA_OFF;
+        } else {
+            s->_rdma_state = Socket::RDMA_ON;
+        }
+    }
+
+    // Send ACK message to server
+    ep->_state = C_ACK_SEND;
+    uint32_t flags = 0;
+    if (s->_rdma_state != Socket::RDMA_OFF) {
+        flags |= ACK_MSG_RDMA_OK;
+    }
+    *(uint32_t*)data = butil::HostToNet32(flags);
+    if (ep->WriteToFd(data, ACK_MSG_LEN) < 0) {
+        const int saved_errno = errno;
+        PLOG(WARNING) << "Fail to send Ack Message to server:" << s->description();
+        s->SetFailed(saved_errno, "Fail to complete rdma handshake from %s: %s",
+                s->description().c_str(), berror(saved_errno));
+        ep->_state = FAILED;
+        return NULL;
+    }
+
+    if (s->_rdma_state == Socket::RDMA_ON) {
+        ep->_state = ESTABLISHED;
+        LOG_IF(INFO, FLAGS_rdma_trace_verbose) 
+            << "Handshake ends (use rdma) on " << s->description();
+    } else {
+        ep->_state = FALLBACK_TCP;
+        LOG_IF(INFO, FLAGS_rdma_trace_verbose) 
+            << "Handshake ends (use tcp) on " << s->description();
+    }
+
+    errno = 0;
+
+    return NULL;
+}
+
+void* RdmaEndpoint::ProcessHandshakeAtServer(void* arg) {
+    RdmaEndpoint* ep = static_cast<RdmaEndpoint*>(arg);
+    SocketUniquePtr s(ep->_socket);
+
+    LOG_IF(INFO, FLAGS_rdma_trace_verbose) 
+        << "Start handshake on " << s->description();
+
+    void* data = malloc(g_rdma_hello_msg_len);
+    if (!data) {
+        const int saved_errno = errno;
+        PLOG(WARNING) << "Fail to recv hello message from client:" << s->description();
+        s->SetFailed(saved_errno, "Fail to complete rdma handshake from %s: %s",
+                s->description().c_str(), berror(saved_errno));
+        ep->_state = FAILED;
+        return NULL;
+    }
+
+    ep->_state = S_HELLO_WAIT;
+    if (ep->ReadFromFd(data, MAGIC_STR_LEN) < 0) {
+        const int saved_errno = errno;
+        PLOG(WARNING) << "Fail to read Hello Message from client:" << s->description() << " " << s->_remote_side;
+        s->SetFailed(saved_errno, "Fail to complete rdma handshake from %s: %s",
+                s->description().c_str(), berror(saved_errno));
+        ep->_state = FAILED;
+        return NULL;
+    }
+
+    if (memcmp(data, MAGIC_STR, MAGIC_STR_LEN) != 0) {
+        LOG_IF(INFO, FLAGS_rdma_trace_verbose) << "It seems that the "
+            << "client does not use RDMA, fallback to TCP:"
+            << s->description();
+        // we need to copy data read back to _socket->_read_buf
+        s->_read_buf.append(data, MAGIC_STR_LEN);
+        ep->_state = FALLBACK_TCP;
+        s->_rdma_state = Socket::RDMA_OFF;
+        ep->TryReadOnTcp();
+        return NULL;
+    }
+
+    if (ep->ReadFromFd(data, g_rdma_hello_msg_len - MAGIC_STR_LEN) < 0) {
+        const int saved_errno = errno;
+        PLOG(WARNING) << "Fail to read Hello Message from client:" << s->description();
+        s->SetFailed(saved_errno, "Fail to complete rdma handshake from %s: %s",
+                s->description().c_str(), berror(saved_errno));
+        ep->_state = FAILED;
+        return NULL;
+    }
+
+    HelloMessage remote_msg;
+    remote_msg.Deserialize(data);
+    if (remote_msg.msg_len < HELLO_MSG_LEN_MIN) {
+        LOG(WARNING) << "Fail to parse Hello Message length from client:"
+                     << s->description();
+        s->SetFailed(EPROTO, "Fail to complete rdma handshake from %s: %s",
+                s->description().c_str(), berror(EPROTO));
+        ep->_state = FAILED;
+        return NULL;
+    }
+    if (remote_msg.msg_len > HELLO_MSG_LEN_MIN) {
+        // TODO: Read Hello Message customized header
+        // Just for future use, should not happen now
+    }
+
+    if (!HelloNegotiationValid(remote_msg)) {
+        LOG(WARNING) << "Fail to negotiate with client, fallback to tcp:"
+                     << s->description();
+        s->_rdma_state = Socket::RDMA_OFF;
+    } else {
+        ep->_remote_recv_block_size = remote_msg.block_size;
+        ep->_local_window_capacity = 
+            std::min(ep->_sq_size, remote_msg.rq_size) - RESERVED_WR_NUM;
+        ep->_remote_window_capacity = 
+            std::min(ep->_rq_size, remote_msg.sq_size) - RESERVED_WR_NUM;
+        ep->_window_size.store(ep->_local_window_capacity, butil::memory_order_relaxed);
+
+        ep->_state = S_ALLOC_QPCQ;
+        if (ep->AllocateResources() < 0) {
+            LOG(WARNING) << "Fail to allocate rdma resources, fallback to tcp:"
+                         << s->description();
+            s->_rdma_state = Socket::RDMA_OFF;
+        } else {
+            ep->_state = S_BRINGUP_QP;
+            if (ep->BringUpQp(remote_msg.lid, remote_msg.gid, remote_msg.qp_num) < 0) {
+                LOG(WARNING) << "Fail to bringup QP, fallback to tcp:"
+                             << s->description();
+                s->_rdma_state = Socket::RDMA_OFF;
+            }
+        }
+    }
+
+    // Send hello message to client
+    ep->_state = S_HELLO_SEND;
+    HelloMessage local_msg;
+    local_msg.msg_len = g_rdma_hello_msg_len;
+    if (s->_rdma_state == Socket::RDMA_OFF) {
+        local_msg.impl_ver = 0;
+        local_msg.hello_ver = 0;
+    } else {
+        local_msg.lid = GetRdmaLid();
+        local_msg.gid = GetRdmaGid();
+        local_msg.block_size = g_rdma_recv_block_size;
+        local_msg.sq_size = ep->_sq_size;
+        local_msg.rq_size = ep->_rq_size;
+        local_msg.hello_ver = g_rdma_hello_version;
+        local_msg.impl_ver = g_rdma_impl_version;
+        if (BAIDU_LIKELY(ep->_resource)) {
+            local_msg.qp_num = ep->_resource->qp->qp_num;
+        } else {
+            // Only happens in UT
+            local_msg.qp_num = 0;
+        }
+    }
+    memcpy(data, MAGIC_STR, 4);
+    local_msg.Serialize((char*)data + 4);
+    if (ep->WriteToFd(data, g_rdma_hello_msg_len) < 0) {
+        const int saved_errno = errno;
+        PLOG(WARNING) << "Fail to send Hello Message to client:" << s->description();
+        s->SetFailed(saved_errno, "Fail to complete rdma handshake from %s: %s",
+                s->description().c_str(), berror(saved_errno));
+        ep->_state = FAILED;
+        return NULL;
+    }
+
+    // Recv ACK Message
+    ep->_state = S_ACK_WAIT;
+    if (ep->ReadFromFd(data, ACK_MSG_LEN) < 0) {
+        const int saved_errno = errno;
+        PLOG(WARNING) << "Fail to read ack message from client:" << s->description();
+        s->SetFailed(saved_errno, "Fail to complete rdma handshake from %s: %s",
+                s->description().c_str(), berror(saved_errno));
+        ep->_state = FAILED;
+        return NULL;
+    }
+
+    // Check RDMA enable flag
+    uint32_t flags = butil::NetToHost32(*(uint32_t*)data);
+    if (flags & ACK_MSG_RDMA_OK) {
+        if (s->_rdma_state == Socket::RDMA_OFF) {
+            LOG(WARNING) << "Fail to parse Hello Message length from client:"
+                         << s->description();
+            s->SetFailed(EPROTO, "Fail to complete rdma handshake from %s: %s",
+                    s->description().c_str(), berror(EPROTO));
+            ep->_state = FAILED;
+            return NULL;
+        } else {
+            s->_rdma_state = Socket::RDMA_ON;
+            ep->_state = ESTABLISHED;
+            LOG_IF(INFO, FLAGS_rdma_trace_verbose) 
+                << "Handshake ends (use rdma) on " << s->description();
+        }
+    } else {
+        s->_rdma_state = Socket::RDMA_OFF;
+        ep->_state = FALLBACK_TCP;
+        LOG_IF(INFO, FLAGS_rdma_trace_verbose) 
+            << "Handshake ends (use tcp) on " << s->description();
+    }
+ 
+    ep->TryReadOnTcp();
+
+    return NULL;
+}
+
+bool RdmaEndpoint::IsWritable() const {
+    if (BAIDU_UNLIKELY(g_skip_rdma_init)) {
+        // Just for UT
+        return false;
+    }
+
+    return _window_size.load(butil::memory_order_relaxed) > 0;
+}
+
+// RdmaIOBuf inherits from IOBuf to provide a new function.
+// The reason is that we need to use some protected member function of IOBuf.
+class RdmaIOBuf : public butil::IOBuf {
+friend class RdmaEndpoint;
+private:
+    // Cut the current IOBuf to ibv_sge list and `to' for at most first max_sge
+    // blocks or first max_len bytes.
+    // Return: the bytes included in the sglist, or -1 if failed
+    ssize_t cut_into_sglist_and_iobuf(ibv_sge* sglist, size_t* sge_index,
+            butil::IOBuf* to, size_t max_sge, size_t max_len) {
+        size_t len = 0;
+        while (*sge_index < max_sge) {
+            if (len == max_len || _ref_num() == 0) {
+                break;
+            }
+            butil::IOBuf::BlockRef const& r = _ref_at(0);
+            CHECK(r.length > 0);
+            const void* start = fetch1();
+            uint32_t lkey = GetLKey((char*)start - r.offset);
+            if (lkey == 0) {
+                LOG(WARNING) << "Memory not registered for rdma. "
+                             << "Is this iobuf allocated before calling "
+                             << "GlobalRdmaInitializeOrDie? Or just forget to "
+                             << "call RegisterMemoryForRdma for your own buffer?";
+                errno = ERDMAMEM;
+                return -1;
+            }
+            size_t i = *sge_index;
+            if (len + r.length > max_len) {
+                // Split the block to comply with size for receiving
+                sglist[i].length = max_len - len;
+                len = max_len;
+            } else {
+                sglist[i].length = r.length;
+                len += r.length;
+            }
+            sglist[i].addr = (uint64_t)start;
+            sglist[i].lkey = lkey;
+            cutn(to, sglist[i].length);
+            (*sge_index)++;
+        }
+        return len;
+    }
+};
+
+// Note this function is coupled with the implementation of IOBuf
+ssize_t RdmaEndpoint::CutFromIOBufList(butil::IOBuf** from, size_t ndata) {
+    if (BAIDU_UNLIKELY(g_skip_rdma_init)) {
+        // Just for UT
+        errno = EAGAIN;
+        return -1;
+    }
+
+    CHECK(from != NULL);
+    CHECK(ndata > 0);
+
+    size_t total_len = 0;
+    size_t current = 0;
+    uint32_t window = 0;
+    ibv_send_wr wr;
+    int max_sge = GetRdmaMaxSge();
+    ibv_sge sglist[max_sge];
+    while (current < ndata) {
+        window = _window_size.load(butil::memory_order_relaxed);
+        if (window == 0) {
+            if (total_len > 0) {
+                break;
+            } else {
+                errno = EAGAIN;
+                return -1;
+            }
+        }
+        butil::IOBuf* to = &_sbuf[_sq_current];
+        size_t this_len = 0;
+
+        memset(&wr, 0, sizeof(wr));
+        wr.sg_list = sglist;
+        wr.opcode = IBV_WR_SEND_WITH_IMM;
+
+        RdmaIOBuf* data = (RdmaIOBuf*)from[current];
+        size_t sge_index = 0;
+        while (sge_index < (uint32_t)max_sge &&
+                this_len < _remote_recv_block_size) {
+            if (data->size() == 0) {
+                // The current IOBuf is empty, find next one
+                ++current;
+                if (current == ndata) {
+                    break;
+                }
+                data = (RdmaIOBuf*)from[current];
+                continue;
+            }
+
+            ssize_t len = data->cut_into_sglist_and_iobuf(
+                    sglist, &sge_index, to, max_sge,
+                    _remote_recv_block_size - this_len);
+            if (len < 0) {
+                return -1;
+            }
+            CHECK(len > 0);
+            this_len += len;
+            total_len += len;
+        }
+        if (this_len == 0) {
+            continue;
+        }
+
+        wr.num_sge = sge_index;
+
+        uint32_t imm = _new_rq_wrs.exchange(0, butil::memory_order_relaxed);
+        wr.imm_data = butil::HostToNet32(imm);
+        // Avoid too many recv completion events to reduce the CPU overhead
+        bool solicited = false;
+        if (window == 1 || current + 1 >= ndata) {
+            // Only the last message in the write queue or the last message in
+            // the current window is flagged as solicited.
+            solicited = true;
+        } else {
+            if (_unsolicited > _local_window_capacity / 4) {
+                // Make sure the recv side can be signaled to return ack
+                solicited = true;
+            } else if (_accumulated_ack > _remote_window_capacity / 4) {
+                // Make sure the recv side can be signaled to handle ack
+                solicited = true;
+            } else if (_unsolicited_bytes > 1048576) {
+                // Make sure the recv side can be signaled when it receives enough data
+                solicited = true;
+            } else {
+                ++_unsolicited;
+                _unsolicited_bytes += this_len;
+                _accumulated_ack += imm;
+            }
+        }
+        if (solicited) {
+            wr.send_flags |= IBV_SEND_SOLICITED;
+            _unsolicited = 0;
+            _unsolicited_bytes = 0;
+            _accumulated_ack = 0;
+        }
+
+        // Avoid generating too many send completion events to reduce CPU overhead
+        ++_sq_unsignaled;
+        if (_sq_unsignaled >= _local_window_capacity / 4) {
+            // Refer to:
+            // http://www.rdmamojo.com/2014/06/30/working-unsignaled-completions/
+            wr.send_flags |= IBV_SEND_SIGNALED;
+            _sq_unsignaled = 0;
+        }
+
+        ibv_send_wr* bad = NULL;
+        if (ibv_post_send(_resource->qp, &wr, &bad) < 0) {
+            // We use other means to guarantee that the Send Queue is not full,
+            // so we treat this error as unrecoverable.
+            PLOG(WARNING) << "Fail to ibv_post_send";
+            return -1;
+        }
+
+        ++_sq_current;
+        if (_sq_current == _sq_size - RESERVED_WR_NUM) {
+            _sq_current = 0;
+        }
+
+        // Update _window_size. Note that _window_size can never become
+        // negative, because at most one thread can enter this function for
+        // each Socket, and the HandleCompletion thread only increases this
+        // counter.
+        _window_size.fetch_sub(1, butil::memory_order_relaxed);
+    }
+
+    return total_len;
+}
+
+int RdmaEndpoint::SendAck(int num) {
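+    // Accumulate acks and only flush them with an explicit zero-payload
+    // SEND_WITH_IMM once more than half of the remote window capacity is
+    // pending, to avoid acking every single message.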
+    if (_new_rq_wrs.fetch_add(num, butil::memory_order_relaxed) > _remote_window_capacity / 2) {
+        return SendImm(_new_rq_wrs.exchange(0, butil::memory_order_relaxed));
+    }
+    return 0;
+}
+
+int RdmaEndpoint::SendImm(uint32_t imm) {
+    if (imm == 0) {
+        return 0;
+    }
+
+    ibv_send_wr wr;
+    memset(&wr, 0, sizeof(wr));
+    wr.opcode = IBV_WR_SEND_WITH_IMM;
+    wr.imm_data = butil::HostToNet32(imm);
+    wr.send_flags |= IBV_SEND_SOLICITED;
+    wr.send_flags |= IBV_SEND_SIGNALED;
+
+    ibv_send_wr* bad = NULL;
+    if (ibv_post_send(_resource->qp, &wr, &bad) < 0) {
+        // We use other means to guarantee that the Send Queue is not full,
+        // so we treat this error as unrecoverable.
+        PLOG(WARNING) << "Fail to ibv_post_send";
+        return -1;
+    }
+    return 0;
+}
+
+ssize_t RdmaEndpoint::HandleCompletion(ibv_wc& wc) {
+    bool zerocopy = FLAGS_rdma_recv_zerocopy;
+    switch (wc.opcode) {
+    case IBV_WC_SEND: {  // send completion
+        // Do nothing
+        break;
+    }
+    case IBV_WC_RECV: {  // recv completion
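+        // A recv completion carries both payload (wc.byte_len) and piggybacked
+        // acks from the peer (wc.imm_data).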
+        // Please note that only the first wc.byte_len bytes are valid
+        if (wc.byte_len > 0) {
+            if (wc.byte_len < (uint32_t)FLAGS_rdma_zerocopy_min_size) {
+                zerocopy = false;
+            }
+            CHECK(_state != FALLBACK_TCP);
+            if (zerocopy) {
+                butil::IOBuf tmp;
+                _rbuf[_rq_received].cutn(&tmp, wc.byte_len);
+                _socket->_read_buf.append(tmp);
+            } else {
+                // Copy the data when the received payload is below the zerocopy threshold
+                _socket->_read_buf.append(_rbuf_data[_rq_received], wc.byte_len);
+            }
+        }
+        if (wc.imm_data > 0) {
+            // Clear sbuf here because we ignore event wakeup for send completions
+            uint32_t acks = butil::NetToHost32(wc.imm_data);
+            uint32_t num = acks;
+            while (num > 0) {
+                _sbuf[_sq_sent++].clear();
+                if (_sq_sent == _sq_size - RESERVED_WR_NUM) {
+                    _sq_sent = 0;
+                }
+                --num;
+            }
+            butil::subtle::MemoryBarrier();
+
+            // Update window
+            uint32_t wnd_thresh = _local_window_capacity / 8;
+            if (_window_size.fetch_add(acks, butil::memory_order_relaxed) >= wnd_thresh
+                    || acks >= wnd_thresh) {
+                // Do not wake up the writing thread right after _window_size
+                // becomes positive. Otherwise the writing thread may switch to
+                // background too quickly.
+                _socket->WakeAsEpollOut();
+            }
+        }
+        // We must re-post recv WR
+        if (PostRecv(1, zerocopy) < 0) {
+            return -1;
+        }
+        if (wc.byte_len > 0) {
+            SendAck(1);
+        }
+        return wc.byte_len;
+    }
+    default:
+        CHECK(false) << "This should not happen";

Review Comment:
   Under what circumstances will this mlx driver bug be triggered? If it really is a serious problem, merely emitting a WARNING should not be acceptable either.
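   For illustration only, a minimal sketch of how such an unexpected completion could be surfaced as an error instead of only a warning, so that the caller can tear the connection down. This is not the PR's actual code; the helper name and the errno value are assumptions, and it relies on the headers already included in rdma_endpoint.cpp:

       // Hedged sketch (not part of this PR): report an unexpected work
       // completion as an unrecoverable error so the caller closes the Socket,
       // instead of only emitting a WARNING.
       ssize_t HandleUnexpectedCompletion(const ibv_wc& wc) {
           LOG(ERROR) << "Unexpected work completion, opcode=" << wc.opcode
                      << ", status=" << wc.status;
           errno = EPROTO;  // assumed errno; brpc may prefer its own error code
           return -1;       // a negative return means the connection should be dropped
       }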


