You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by di...@apache.org on 2019/06/12 13:13:07 UTC
[rocketmq-client-cpp] branch master updated: Update: update network
interface. (#60)
This is an automated email from the ASF dual-hosted git repository.
dinglei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-cpp.git
The following commit(s) were added to refs/heads/master by this push:
new 5689607 Update: update network interface. (#60)
5689607 is described below
commit 5689607f6d4ddea6001cfdc262580004c7dded9a
Author: James <yw...@hotmail.com>
AuthorDate: Wed Jun 12 21:13:02 2019 +0800
Update: update network interface. (#60)
* update network interface.
- feature: use only one event loop for all TcpTransport.
- update: network components.
* remove boost mutex, timed_mutex and condition_variable in TcpRemotingClient, TcpTransport and ReponseFunture.
---
src/common/AsyncCallbackWrap.cpp | 386 ++++++++++-----------
src/common/noncopyable.h | 33 ++
src/transport/EventLoop.cpp | 241 ++++++++++++++
src/transport/EventLoop.h | 117 +++++++
src/transport/ResponseFuture.cpp | 159 ++++-----
src/transport/ResponseFuture.h | 66 ++--
src/transport/TcpRemotingClient.cpp | 647 +++++++++++++++++-------------------
src/transport/TcpRemotingClient.h | 262 ++++++++-------
src/transport/TcpTransport.cpp | 343 +++++++------------
src/transport/TcpTransport.h | 88 ++---
10 files changed, 1288 insertions(+), 1054 deletions(-)
diff --git a/src/common/AsyncCallbackWrap.cpp b/src/common/AsyncCallbackWrap.cpp
old mode 100644
new mode 100755
index 52e0374..cb26fda
--- a/src/common/AsyncCallbackWrap.cpp
+++ b/src/common/AsyncCallbackWrap.cpp
@@ -1,193 +1,193 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#include "AsyncCallbackWrap.h"
-#include "Logging.h"
-#include "MQClientAPIImpl.h"
-#include "MQDecoder.h"
-#include "MQMessageQueue.h"
-#include "MQProtos.h"
-#include "PullAPIWrapper.h"
-#include "PullResultExt.h"
-#include "ResponseFuture.h"
-
-namespace rocketmq {
-//<!***************************************************************************
-AsyncCallbackWrap::AsyncCallbackWrap(AsyncCallback* pAsyncCallback, MQClientAPIImpl* pclientAPI)
- : m_pAsyncCallBack(pAsyncCallback), m_pClientAPI(pclientAPI) {}
-
-AsyncCallbackWrap::~AsyncCallbackWrap() {
- m_pAsyncCallBack = NULL;
- m_pClientAPI = NULL;
-}
-
-//<!************************************************************************
-SendCallbackWrap::SendCallbackWrap(const string& brokerName,
- const MQMessage& msg,
- AsyncCallback* pAsyncCallback,
- MQClientAPIImpl* pclientAPI)
- : AsyncCallbackWrap(pAsyncCallback, pclientAPI), m_msg(msg), m_brokerName(brokerName) {}
-
-void SendCallbackWrap::onException() {
- if (m_pAsyncCallBack == NULL)
- return;
-
- SendCallback* pCallback = static_cast<SendCallback*>(m_pAsyncCallBack);
- if (pCallback) {
- unique_ptr<MQException> exception(
- new MQException("send msg failed due to wait response timeout or network error", -1, __FILE__, __LINE__));
- pCallback->onException(*exception);
- if (pCallback->getSendCallbackType() == autoDeleteSendCallback) {
- deleteAndZero(pCallback);
- }
- }
-}
-
-void SendCallbackWrap::operationComplete(ResponseFuture* pResponseFuture, bool bProducePullRequest) {
- unique_ptr<RemotingCommand> pResponse(pResponseFuture->getCommand());
-
- if (m_pAsyncCallBack == NULL) {
- return;
- }
- int opaque = pResponseFuture->getOpaque();
- SendCallback* pCallback = static_cast<SendCallback*>(m_pAsyncCallBack);
-
- if (!pResponse) {
- string err = "unknow reseaon";
- if (!pResponseFuture->isSendRequestOK()) {
- err = "send request failed";
-
- } else if (pResponseFuture->isTimeOut()) {
- // pResponseFuture->setAsyncResponseFlag();
- err = "wait response timeout";
- }
- if (pCallback) {
- MQException exception(err, -1, __FILE__, __LINE__);
- pCallback->onException(exception);
- }
- LOG_ERROR("send failed of:%d", pResponseFuture->getOpaque());
- } else {
- try {
- SendResult ret = m_pClientAPI->processSendResponse(m_brokerName, m_msg, pResponse.get());
- if (pCallback) {
- LOG_DEBUG("operationComplete: processSendResponse success, opaque:%d, maxRetryTime:%d, retrySendTimes:%d",
- opaque, pResponseFuture->getMaxRetrySendTimes(), pResponseFuture->getRetrySendTimes());
- pCallback->onSuccess(ret);
- }
- } catch (MQException& e) {
- LOG_ERROR("operationComplete: processSendResponse exception: %s", e.what());
-
- // broker may return exception, need consider retry send
- int maxRetryTimes = pResponseFuture->getMaxRetrySendTimes();
- int retryTimes = pResponseFuture->getRetrySendTimes();
- if (pResponseFuture->getASyncFlag() && retryTimes < maxRetryTimes && maxRetryTimes > 1) {
- int64 left_timeout_ms = pResponseFuture->leftTime();
- string brokerAddr = pResponseFuture->getBrokerAddr();
- const RemotingCommand& requestCommand = pResponseFuture->getRequestCommand();
- retryTimes += 1;
- LOG_WARN("retry send, opaque:%d, sendTimes:%d, maxRetryTimes:%d, left_timeout:%lld, brokerAddr:%s, msg:%s",
- opaque, retryTimes, maxRetryTimes, left_timeout_ms, brokerAddr.data(), m_msg.toString().data());
-
- bool exception_flag = false;
- try {
- m_pClientAPI->sendMessageAsync(pResponseFuture->getBrokerAddr(), m_brokerName, m_msg,
- (RemotingCommand&)requestCommand, pCallback, left_timeout_ms, maxRetryTimes,
- retryTimes);
- } catch (MQClientException& e) {
- LOG_ERROR("retry send exception:%s, opaque:%d, retryTimes:%d, msg:%s, not retry send again", e.what(), opaque,
- retryTimes, m_msg.toString().data());
- exception_flag = true;
- }
-
- if (exception_flag == false) {
- return; // send retry again, here need return
- }
- }
-
- if (pCallback) {
- MQException exception("process send response error", -1, __FILE__, __LINE__);
- pCallback->onException(exception);
- }
- }
- }
- if (pCallback && pCallback->getSendCallbackType() == autoDeleteSendCallback) {
- deleteAndZero(pCallback);
- }
-}
-
-//<!************************************************************************
-PullCallbackWarp::PullCallbackWarp(AsyncCallback* pAsyncCallback, MQClientAPIImpl* pclientAPI, void* pArg)
- : AsyncCallbackWrap(pAsyncCallback, pclientAPI) {
- m_pArg = *static_cast<AsyncArg*>(pArg);
-}
-
-PullCallbackWarp::~PullCallbackWarp() {}
-
-void PullCallbackWarp::onException() {
- if (m_pAsyncCallBack == NULL)
- return;
-
- PullCallback* pCallback = static_cast<PullCallback*>(m_pAsyncCallBack);
- if (pCallback) {
- MQException exception("wait response timeout", -1, __FILE__, __LINE__);
- pCallback->onException(exception);
- } else {
- LOG_ERROR("PullCallback is NULL, AsyncPull could not continue");
- }
-}
-
-void PullCallbackWarp::operationComplete(ResponseFuture* pResponseFuture, bool bProducePullRequest) {
- unique_ptr<RemotingCommand> pResponse(pResponseFuture->getCommand());
- if (m_pAsyncCallBack == NULL) {
- LOG_ERROR("m_pAsyncCallBack is NULL, AsyncPull could not continue");
- return;
- }
- PullCallback* pCallback = static_cast<PullCallback*>(m_pAsyncCallBack);
- if (!pResponse) {
- string err = "unknow reseaon";
- if (!pResponseFuture->isSendRequestOK()) {
- err = "send request failed";
-
- } else if (pResponseFuture->isTimeOut()) {
- // pResponseFuture->setAsyncResponseFlag();
- err = "wait response timeout";
- }
- MQException exception(err, -1, __FILE__, __LINE__);
- LOG_ERROR("Async pull exception of opaque:%d", pResponseFuture->getOpaque());
- if (pCallback && bProducePullRequest)
- pCallback->onException(exception);
- } else {
- try {
- if (m_pArg.pPullWrapper) {
- unique_ptr<PullResult> pullResult(m_pClientAPI->processPullResponse(pResponse.get()));
- PullResult result = m_pArg.pPullWrapper->processPullResult(m_pArg.mq, pullResult.get(), &m_pArg.subData);
- if (pCallback)
- pCallback->onSuccess(m_pArg.mq, result, bProducePullRequest);
- } else {
- LOG_ERROR("pPullWrapper had been destroyed with consumer");
- }
- } catch (MQException& e) {
- LOG_ERROR(e.what());
- MQException exception("pullResult error", -1, __FILE__, __LINE__);
- if (pCallback && bProducePullRequest)
- pCallback->onException(exception);
- }
- }
-}
-
-//<!***************************************************************************
-} //<!end namespace;
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "AsyncCallbackWrap.h"
+#include "Logging.h"
+#include "MQClientAPIImpl.h"
+#include "MQDecoder.h"
+#include "MQMessageQueue.h"
+#include "MQProtos.h"
+#include "PullAPIWrapper.h"
+#include "PullResultExt.h"
+#include "ResponseFuture.h"
+
+namespace rocketmq {
+//<!***************************************************************************
+AsyncCallbackWrap::AsyncCallbackWrap(AsyncCallback* pAsyncCallback, MQClientAPIImpl* pclientAPI)
+ : m_pAsyncCallBack(pAsyncCallback), m_pClientAPI(pclientAPI) {}
+
+AsyncCallbackWrap::~AsyncCallbackWrap() {
+ m_pAsyncCallBack = NULL;
+ m_pClientAPI = NULL;
+}
+
+//<!************************************************************************
+SendCallbackWrap::SendCallbackWrap(const string& brokerName,
+ const MQMessage& msg,
+ AsyncCallback* pAsyncCallback,
+ MQClientAPIImpl* pclientAPI)
+ : AsyncCallbackWrap(pAsyncCallback, pclientAPI), m_msg(msg), m_brokerName(brokerName) {}
+
+void SendCallbackWrap::onException() {
+ if (m_pAsyncCallBack == NULL)
+ return;
+
+ SendCallback* pCallback = static_cast<SendCallback*>(m_pAsyncCallBack);
+ if (pCallback) {
+ unique_ptr<MQException> exception(
+ new MQException("send msg failed due to wait response timeout or network error", -1, __FILE__, __LINE__));
+ pCallback->onException(*exception);
+ if (pCallback->getSendCallbackType() == autoDeleteSendCallback) {
+ deleteAndZero(pCallback);
+ }
+ }
+}
+
+void SendCallbackWrap::operationComplete(ResponseFuture* pResponseFuture, bool bProducePullRequest) {
+ unique_ptr<RemotingCommand> pResponse(pResponseFuture->getCommand());
+
+ if (m_pAsyncCallBack == NULL) {
+ return;
+ }
+ int opaque = pResponseFuture->getOpaque();
+ SendCallback* pCallback = static_cast<SendCallback*>(m_pAsyncCallBack);
+
+ if (!pResponse) {
+ string err = "unknow reseaon";
+ if (!pResponseFuture->isSendRequestOK()) {
+ err = "send request failed";
+
+ } else if (pResponseFuture->isTimeOut()) {
+ // pResponseFuture->setAsyncResponseFlag();
+ err = "wait response timeout";
+ }
+ if (pCallback) {
+ MQException exception(err, -1, __FILE__, __LINE__);
+ pCallback->onException(exception);
+ }
+ LOG_ERROR("send failed of:%d", pResponseFuture->getOpaque());
+ } else {
+ try {
+ SendResult ret = m_pClientAPI->processSendResponse(m_brokerName, m_msg, pResponse.get());
+ if (pCallback) {
+ LOG_DEBUG("operationComplete: processSendResponse success, opaque:%d, maxRetryTime:%d, retrySendTimes:%d",
+ opaque, pResponseFuture->getMaxRetrySendTimes(), pResponseFuture->getRetrySendTimes());
+ pCallback->onSuccess(ret);
+ }
+ } catch (MQException& e) {
+ LOG_ERROR("operationComplete: processSendResponse exception: %s", e.what());
+
+ // broker may return exception, need consider retry send
+ int maxRetryTimes = pResponseFuture->getMaxRetrySendTimes();
+ int retryTimes = pResponseFuture->getRetrySendTimes();
+ if (pResponseFuture->getAsyncFlag() && retryTimes < maxRetryTimes && maxRetryTimes > 1) {
+ int64 left_timeout_ms = pResponseFuture->leftTime();
+ string brokerAddr = pResponseFuture->getBrokerAddr();
+ const RemotingCommand& requestCommand = pResponseFuture->getRequestCommand();
+ retryTimes += 1;
+ LOG_WARN("retry send, opaque:%d, sendTimes:%d, maxRetryTimes:%d, left_timeout:%lld, brokerAddr:%s, msg:%s",
+ opaque, retryTimes, maxRetryTimes, left_timeout_ms, brokerAddr.data(), m_msg.toString().data());
+
+ bool exception_flag = false;
+ try {
+ m_pClientAPI->sendMessageAsync(pResponseFuture->getBrokerAddr(), m_brokerName, m_msg,
+ (RemotingCommand&)requestCommand, pCallback, left_timeout_ms, maxRetryTimes,
+ retryTimes);
+ } catch (MQClientException& e) {
+ LOG_ERROR("retry send exception:%s, opaque:%d, retryTimes:%d, msg:%s, not retry send again", e.what(), opaque,
+ retryTimes, m_msg.toString().data());
+ exception_flag = true;
+ }
+
+ if (exception_flag == false) {
+ return; // send retry again, here need return
+ }
+ }
+
+ if (pCallback) {
+ MQException exception("process send response error", -1, __FILE__, __LINE__);
+ pCallback->onException(exception);
+ }
+ }
+ }
+ if (pCallback && pCallback->getSendCallbackType() == autoDeleteSendCallback) {
+ deleteAndZero(pCallback);
+ }
+}
+
+//<!************************************************************************
+PullCallbackWarp::PullCallbackWarp(AsyncCallback* pAsyncCallback, MQClientAPIImpl* pclientAPI, void* pArg)
+ : AsyncCallbackWrap(pAsyncCallback, pclientAPI) {
+ m_pArg = *static_cast<AsyncArg*>(pArg);
+}
+
+PullCallbackWarp::~PullCallbackWarp() {}
+
+void PullCallbackWarp::onException() {
+ if (m_pAsyncCallBack == NULL)
+ return;
+
+ PullCallback* pCallback = static_cast<PullCallback*>(m_pAsyncCallBack);
+ if (pCallback) {
+ MQException exception("wait response timeout", -1, __FILE__, __LINE__);
+ pCallback->onException(exception);
+ } else {
+ LOG_ERROR("PullCallback is NULL, AsyncPull could not continue");
+ }
+}
+
+void PullCallbackWarp::operationComplete(ResponseFuture* pResponseFuture, bool bProducePullRequest) {
+ unique_ptr<RemotingCommand> pResponse(pResponseFuture->getCommand());
+ if (m_pAsyncCallBack == NULL) {
+ LOG_ERROR("m_pAsyncCallBack is NULL, AsyncPull could not continue");
+ return;
+ }
+ PullCallback* pCallback = static_cast<PullCallback*>(m_pAsyncCallBack);
+ if (!pResponse) {
+ string err = "unknow reseaon";
+ if (!pResponseFuture->isSendRequestOK()) {
+ err = "send request failed";
+
+ } else if (pResponseFuture->isTimeOut()) {
+ // pResponseFuture->setAsyncResponseFlag();
+ err = "wait response timeout";
+ }
+ MQException exception(err, -1, __FILE__, __LINE__);
+ LOG_ERROR("Async pull exception of opaque:%d", pResponseFuture->getOpaque());
+ if (pCallback && bProducePullRequest)
+ pCallback->onException(exception);
+ } else {
+ try {
+ if (m_pArg.pPullWrapper) {
+ unique_ptr<PullResult> pullResult(m_pClientAPI->processPullResponse(pResponse.get()));
+ PullResult result = m_pArg.pPullWrapper->processPullResult(m_pArg.mq, pullResult.get(), &m_pArg.subData);
+ if (pCallback)
+ pCallback->onSuccess(m_pArg.mq, result, bProducePullRequest);
+ } else {
+ LOG_ERROR("pPullWrapper had been destroyed with consumer");
+ }
+ } catch (MQException& e) {
+ LOG_ERROR(e.what());
+ MQException exception("pullResult error", -1, __FILE__, __LINE__);
+ if (pCallback && bProducePullRequest)
+ pCallback->onException(exception);
+ }
+ }
+}
+
+//<!***************************************************************************
+} // namespace rocketmq
diff --git a/src/common/noncopyable.h b/src/common/noncopyable.h
new file mode 100644
index 0000000..f52f988
--- /dev/null
+++ b/src/common/noncopyable.h
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef __NONCOPYABLE_H__
+#define __NONCOPYABLE_H__
+
+namespace rocketmq {
+
+class noncopyable {
+ protected:
+ noncopyable() = default;
+ ~noncopyable() = default;
+
+ noncopyable(const noncopyable&) = delete;
+ noncopyable& operator=(const noncopyable&) = delete;
+};
+
+} // namespace rocketmq
+
+#endif //__NONCOPYABLE_H__
diff --git a/src/transport/EventLoop.cpp b/src/transport/EventLoop.cpp
new file mode 100644
index 0000000..3d67405
--- /dev/null
+++ b/src/transport/EventLoop.cpp
@@ -0,0 +1,241 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "EventLoop.h"
+
+#if !defined(WIN32) && !defined(__APPLE__)
+#include <sys/prctl.h>
+#endif
+
+#include <event2/thread.h>
+
+#include "Logging.h"
+#include "UtilAll.h"
+
+namespace rocketmq {
+
+EventLoop* EventLoop::GetDefaultEventLoop() {
+ static EventLoop defaultEventLoop;
+ return &defaultEventLoop;
+}
+
+EventLoop::EventLoop(const struct event_config* config, bool run_immediately)
+ : m_eventBase(nullptr), m_loopThread(nullptr), _is_running(false) {
+ // tell libevent support multi-threads
+#ifdef WIN32
+ evthread_use_windows_threads();
+#else
+ evthread_use_pthreads();
+#endif
+
+ if (config == nullptr) {
+ m_eventBase = event_base_new();
+ } else {
+ m_eventBase = event_base_new_with_config(config);
+ }
+
+ if (m_eventBase == nullptr) {
+ // failure...
+ LOG_ERROR("Failed to create event base!");
+ return;
+ }
+
+ evthread_make_base_notifiable(m_eventBase);
+
+ if (run_immediately) {
+ start();
+ }
+}
+
+EventLoop::~EventLoop() {
+ stop();
+
+ if (m_eventBase != nullptr) {
+ event_base_free(m_eventBase);
+ m_eventBase = nullptr;
+ }
+}
+
+void EventLoop::start() {
+ if (m_loopThread == nullptr) {
+ // start event loop
+#if !defined(WIN32) && !defined(__APPLE__)
+ string taskName = UtilAll::getProcessName();
+ prctl(PR_SET_NAME, "EventLoop", 0, 0, 0);
+#endif
+ m_loopThread = new std::thread(&EventLoop::runLoop, this);
+#if !defined(WIN32) && !defined(__APPLE__)
+ prctl(PR_SET_NAME, taskName.c_str(), 0, 0, 0);
+#endif
+ }
+}
+
+void EventLoop::stop() {
+ if (m_loopThread != nullptr /*&& m_loopThread.joinable()*/) {
+ _is_running = false;
+ m_loopThread->join();
+
+ delete m_loopThread;
+ m_loopThread = nullptr;
+ }
+}
+
+void EventLoop::runLoop() {
+ _is_running = true;
+
+ while (_is_running) {
+ int ret;
+
+ ret = event_base_dispatch(m_eventBase);
+ // ret = event_base_loop(m_eventBase, EVLOOP_NONBLOCK);
+
+ if (ret == 1) {
+ // no event
+ std::this_thread::sleep_for(std::chrono::milliseconds(1));
+ }
+ }
+}
+
+#define OPT_UNLOCK_CALLBACKS (BEV_OPT_DEFER_CALLBACKS | BEV_OPT_UNLOCK_CALLBACKS)
+
+BufferEvent* EventLoop::createBufferEvent(socket_t fd, int options) {
+ struct bufferevent* event = bufferevent_socket_new(m_eventBase, fd, options);
+ if (event == nullptr) {
+ return nullptr;
+ }
+
+ bool unlock = (options & OPT_UNLOCK_CALLBACKS) == OPT_UNLOCK_CALLBACKS;
+
+ return new BufferEvent(event, unlock);
+}
+
+BufferEvent::BufferEvent(struct bufferevent* event, bool unlockCallbacks)
+ : m_bufferEvent(event),
+ m_unlockCallbacks(unlockCallbacks),
+ m_readCallback(nullptr),
+ m_writeCallback(nullptr),
+ m_eventCallback(nullptr),
+ m_callbackTransport() {
+#ifdef ROCKETMQ_BUFFEREVENT_PROXY_ALL_CALLBACK
+ if (m_bufferEvent != nullptr) {
+ bufferevent_setcb(m_bufferEvent, read_callback, write_callback, event_callback, this);
+ }
+#endif // ROCKETMQ_BUFFEREVENT_PROXY_ALL_CALLBACK
+}
+
+BufferEvent::~BufferEvent() {
+ if (m_bufferEvent != nullptr) {
+ // free function will set all callbacks to NULL first.
+ bufferevent_free(m_bufferEvent);
+ m_bufferEvent = nullptr;
+ }
+}
+
+void BufferEvent::setCallback(BufferEventDataCallback readCallback,
+ BufferEventDataCallback writeCallback,
+ BufferEventEventCallback eventCallback,
+ std::shared_ptr<TcpTransport> transport) {
+ // use lock in bufferevent
+ bufferevent_lock(m_bufferEvent);
+
+ // wrap callback
+ m_readCallback = readCallback;
+ m_writeCallback = writeCallback;
+ m_eventCallback = eventCallback;
+ m_callbackTransport = transport;
+
+#ifndef ROCKETMQ_BUFFEREVENT_PROXY_ALL_CALLBACK
+ bufferevent_data_cb readcb = readCallback != nullptr ? read_callback : nullptr;
+ bufferevent_data_cb writecb = writeCallback != nullptr ? write_callback : nullptr;
+ bufferevent_event_cb eventcb = eventCallback != nullptr ? event_callback : nullptr;
+
+ bufferevent_setcb(m_bufferEvent, readcb, writecb, eventcb, this);
+#endif // ROCKETMQ_BUFFEREVENT_PROXY_ALL_CALLBACK
+
+ bufferevent_unlock(m_bufferEvent);
+}
+
+void BufferEvent::read_callback(struct bufferevent* bev, void* ctx) {
+ auto event = static_cast<BufferEvent*>(ctx);
+
+ if (event->m_unlockCallbacks)
+ bufferevent_lock(event->m_bufferEvent);
+
+ BufferEventDataCallback callback = event->m_readCallback;
+ std::shared_ptr<TcpTransport> transport = event->m_callbackTransport.lock();
+
+ if (event->m_unlockCallbacks)
+ bufferevent_unlock(event->m_bufferEvent);
+
+ if (callback) {
+ callback(event, transport.get());
+ }
+}
+
+void BufferEvent::write_callback(struct bufferevent* bev, void* ctx) {
+ auto event = static_cast<BufferEvent*>(ctx);
+
+ if (event->m_unlockCallbacks)
+ bufferevent_lock(event->m_bufferEvent);
+
+ BufferEventDataCallback callback = event->m_writeCallback;
+ std::shared_ptr<TcpTransport> transport = event->m_callbackTransport.lock();
+
+ if (event->m_unlockCallbacks)
+ bufferevent_unlock(event->m_bufferEvent);
+
+ if (callback) {
+ callback(event, transport.get());
+ }
+}
+
+static std::string buildPeerAddrPort(socket_t fd) {
+ sockaddr_in addr;
+ socklen_t len = sizeof(addr);
+
+ getpeername(fd, (struct sockaddr*)&addr, &len);
+
+ LOG_DEBUG("socket: %d, addr: %s, port: %d", fd, inet_ntoa(addr.sin_addr), ntohs(addr.sin_port));
+ std::string addrPort(inet_ntoa(addr.sin_addr));
+ addrPort.append(":");
+ addrPort.append(UtilAll::to_string(ntohs(addr.sin_port)));
+
+ return addrPort;
+}
+
+void BufferEvent::event_callback(struct bufferevent* bev, short what, void* ctx) {
+ auto event = static_cast<BufferEvent*>(ctx);
+
+ if (what & BEV_EVENT_CONNECTED) {
+ socket_t fd = event->getfd();
+ event->m_peerAddrPort = buildPeerAddrPort(fd);
+ }
+
+ if (event->m_unlockCallbacks)
+ bufferevent_lock(event->m_bufferEvent);
+
+ BufferEventEventCallback callback = event->m_eventCallback;
+ std::shared_ptr<TcpTransport> transport = event->m_callbackTransport.lock();
+
+ if (event->m_unlockCallbacks)
+ bufferevent_unlock(event->m_bufferEvent);
+
+ if (callback) {
+ callback(event, what, transport.get());
+ }
+}
+
+} // namespace rocketmq
diff --git a/src/transport/EventLoop.h b/src/transport/EventLoop.h
new file mode 100644
index 0000000..c974479
--- /dev/null
+++ b/src/transport/EventLoop.h
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef __EVENTLOOP_H__
+#define __EVENTLOOP_H__
+
+#include <memory>
+#include <thread>
+
+#include <event2/buffer.h>
+#include <event2/bufferevent.h>
+#include <event2/event.h>
+
+#include "noncopyable.h"
+
+using socket_t = evutil_socket_t;
+
+namespace rocketmq {
+
+class BufferEvent;
+
+class EventLoop : public noncopyable {
+ public:
+ static EventLoop* GetDefaultEventLoop();
+
+ public:
+ explicit EventLoop(const struct event_config* config = nullptr, bool run_immediately = true);
+ virtual ~EventLoop();
+
+ void start();
+ void stop();
+
+ BufferEvent* createBufferEvent(socket_t fd, int options);
+
+ private:
+ void runLoop();
+
+ private:
+ struct event_base* m_eventBase;
+ std::thread* m_loopThread;
+
+ bool _is_running; // aotmic is unnecessary
+};
+
+class TcpTransport;
+
+using BufferEventDataCallback = void (*)(BufferEvent* event, TcpTransport* transport);
+using BufferEventEventCallback = void (*)(BufferEvent* event, short what, TcpTransport* transport);
+
+class BufferEvent : public noncopyable {
+ public:
+ virtual ~BufferEvent();
+
+ void setCallback(BufferEventDataCallback readCallback,
+ BufferEventDataCallback writeCallback,
+ BufferEventEventCallback eventCallback,
+ std::shared_ptr<TcpTransport> transport);
+
+ void setWatermark(short events, size_t lowmark, size_t highmark) {
+ bufferevent_setwatermark(m_bufferEvent, events, lowmark, highmark);
+ }
+
+ int enable(short event) { return bufferevent_enable(m_bufferEvent, event); }
+
+ int connect(const struct sockaddr* addr, int socklen) {
+ return bufferevent_socket_connect(m_bufferEvent, (struct sockaddr*)addr, socklen);
+ }
+
+ int write(const void* data, size_t size) { return bufferevent_write(m_bufferEvent, data, size); }
+
+ size_t read(void* data, size_t size) { return bufferevent_read(m_bufferEvent, data, size); }
+
+ struct evbuffer* getInput() {
+ return bufferevent_get_input(m_bufferEvent);
+ }
+
+ socket_t getfd() const { return bufferevent_getfd(m_bufferEvent); }
+
+ std::string getPeerAddrPort() const { return m_peerAddrPort; }
+
+ private:
+ BufferEvent(struct bufferevent* event, bool unlockCallbacks);
+ friend EventLoop;
+
+ static void read_callback(struct bufferevent* bev, void* ctx);
+ static void write_callback(struct bufferevent* bev, void* ctx);
+ static void event_callback(struct bufferevent* bev, short what, void* ctx);
+
+ private:
+ struct bufferevent* m_bufferEvent;
+ const bool m_unlockCallbacks;
+
+ BufferEventDataCallback m_readCallback;
+ BufferEventDataCallback m_writeCallback;
+ BufferEventEventCallback m_eventCallback;
+ std::weak_ptr<TcpTransport> m_callbackTransport; // avoid reference cycle
+
+ // cache properties
+ std::string m_peerAddrPort;
+};
+
+} // namespace rocketmq
+
+#endif //__EVENTLOOP_H__
diff --git a/src/transport/ResponseFuture.cpp b/src/transport/ResponseFuture.cpp
old mode 100644
new mode 100755
index cc10daa..b0b2613
--- a/src/transport/ResponseFuture.cpp
+++ b/src/transport/ResponseFuture.cpp
@@ -15,46 +15,42 @@
* limitations under the License.
*/
#include "ResponseFuture.h"
+
+#include <chrono>
+
#include "Logging.h"
#include "TcpRemotingClient.h"
namespace rocketmq {
+
//<!************************************************************************
ResponseFuture::ResponseFuture(int requestCode,
int opaque,
TcpRemotingClient* powner,
int64 timeout,
- bool bAsync /* = false */,
- AsyncCallbackWrap* pcall /* = NULL */) {
- m_bAsync.store(bAsync);
- m_requestCode = requestCode;
- m_opaque = opaque;
- m_timeout = timeout;
- m_pCallbackWrap = pcall;
- m_pResponseCommand = NULL;
- m_sendRequestOK = false;
- m_maxRetrySendTimes = 1;
- m_retrySendTimes = 1;
+ bool bAsync,
+ AsyncCallbackWrap* pCallback)
+ : m_requestCode(requestCode),
+ m_opaque(opaque),
+ m_timeout(timeout),
+ m_bAsync(bAsync),
+ m_pCallbackWrap(pCallback),
+ m_asyncCallbackStatus(ASYNC_CALLBACK_STATUS_INIT),
+ m_haveResponse(false),
+ m_sendRequestOK(false),
+ m_pResponseCommand(nullptr),
+ m_maxRetrySendTimes(1),
+ m_retrySendTimes(1) {
m_brokerAddr = "";
m_beginTimestamp = UtilAll::currentTimeMillis();
- m_asyncCallbackStatus = asyncCallBackStatus_init;
- if (getASyncFlag()) {
- m_asyncResponse.store(false);
- m_syncResponse.store(true);
- } else {
- m_asyncResponse.store(true);
- m_syncResponse.store(false);
- }
}
ResponseFuture::~ResponseFuture() {
deleteAndZero(m_pCallbackWrap);
/*
- do not set m_pResponseCommand to NULL when destruct, as m_pResponseCommand
- is used by MQClientAPIImpl concurrently, and will be released by producer or
- consumer;
- m_pResponseCommand = NULL;
- */
+ do not delete m_pResponseCommand when destruct, as m_pResponseCommand
+ is used by MQClientAPIImpl concurrently, and will be released by producer or consumer;
+ */
}
void ResponseFuture::releaseThreadCondition() {
@@ -62,55 +58,41 @@ void ResponseFuture::releaseThreadCondition() {
}
RemotingCommand* ResponseFuture::waitResponse(int timeoutMillis) {
- boost::unique_lock<boost::mutex> lk(m_defaultEventLock);
- if (!m_defaultEvent.timed_wait(lk, boost::posix_time::milliseconds(timeoutMillis))) {
- LOG_WARN("waitResponse of code:%d with opaque:%d timeout", m_requestCode, m_opaque);
- m_syncResponse.store(true);
+ std::unique_lock<std::mutex> eventLock(m_defaultEventLock);
+ if (!m_haveResponse) {
+ if (timeoutMillis <= 0) {
+ timeoutMillis = m_timeout;
+ }
+ if (m_defaultEvent.wait_for(eventLock, std::chrono::milliseconds(timeoutMillis)) == std::cv_status::timeout) {
+ LOG_WARN("waitResponse of code:%d with opaque:%d timeout", m_requestCode, m_opaque);
+ m_haveResponse = true;
+ }
}
return m_pResponseCommand;
}
-void ResponseFuture::setResponse(RemotingCommand* pResponseCommand) {
- // LOG_DEBUG("setResponse of opaque:%d",m_opaque);
- m_pResponseCommand = pResponseCommand;
+bool ResponseFuture::setResponse(RemotingCommand* pResponseCommand) {
+ std::unique_lock<std::mutex> eventLock(m_defaultEventLock);
- if (!getASyncFlag()) {
- if (m_syncResponse.load() == false) {
- m_defaultEvent.notify_all();
- m_syncResponse.store(true);
- }
+ if (m_haveResponse) {
+ return false;
}
-}
-const bool ResponseFuture::getSyncResponseFlag() {
- if (m_syncResponse.load() == true) {
- return true;
- }
- return false;
-}
+ m_pResponseCommand = pResponseCommand;
+ m_haveResponse = true;
-const bool ResponseFuture::getAsyncResponseFlag() {
- if (m_asyncResponse.load() == true) {
- // LOG_DEBUG("ASYNC flag is TRUE,opaque is:%d",getOpaque() );
- return true;
+ if (!getAsyncFlag()) {
+ m_defaultEvent.notify_all();
}
- return false;
-}
-
-void ResponseFuture::setAsyncResponseFlag() {
- m_asyncResponse.store(true);
+ return true;
}
-const bool ResponseFuture::getASyncFlag() {
- if (m_bAsync.load() == true) {
- // LOG_DEBUG("ASYNC flag is TRUE,opaque is:%d",getOpaque() );
- return true;
- }
- return false;
+const bool ResponseFuture::getAsyncFlag() {
+ return m_bAsync;
}
-bool ResponseFuture::isSendRequestOK() {
+bool ResponseFuture::isSendRequestOK() const {
return m_sendRequestOK;
}
@@ -126,58 +108,32 @@ int ResponseFuture::getRequestCode() const {
return m_requestCode;
}
-void ResponseFuture::setAsyncCallBackStatus(asyncCallBackStatus asyncCallbackStatus) {
- boost::lock_guard<boost::mutex> lock(m_asyncCallbackLock);
- if (m_asyncCallbackStatus == asyncCallBackStatus_init) {
- m_asyncCallbackStatus = asyncCallbackStatus;
- }
-}
-
-void ResponseFuture::executeInvokeCallback() {
- if (m_pCallbackWrap == NULL) {
+void ResponseFuture::invokeCompleteCallback() {
+ if (m_pCallbackWrap == nullptr) {
deleteAndZero(m_pResponseCommand);
return;
} else {
- if (m_asyncCallbackStatus == asyncCallBackStatus_response) {
- m_pCallbackWrap->operationComplete(this, true);
- } else {
- if (m_pResponseCommand)
- deleteAndZero(m_pResponseCommand); // the responseCommand from
- // RemotingCommand::Decode(mem) will
- // only deleted by operationComplete
- // automatically
- LOG_WARN(
- "timeout and response incoming concurrently of opaque:%d, and "
- "executeInvokeCallbackException was called earlier",
- m_opaque);
- }
+ m_pCallbackWrap->operationComplete(this, true);
}
}
-void ResponseFuture::executeInvokeCallbackException() {
- if (m_pCallbackWrap == NULL) {
+void ResponseFuture::invokeExceptionCallback() {
+ if (m_pCallbackWrap == nullptr) {
LOG_ERROR("m_pCallbackWrap is NULL, critical error");
return;
} else {
- if (m_asyncCallbackStatus == asyncCallBackStatus_timeout) {
- // here no need retrySendTimes process because of it have timeout
- LOG_ERROR("send msg, callback timeout, opaque:%d, sendTimes:%d, maxRetryTimes:%d", getOpaque(),
- getRetrySendTimes(), getMaxRetrySendTimes());
-
- m_pCallbackWrap->onException();
- } else {
- LOG_WARN(
- "timeout and response incoming concurrently of opaque:%d, and "
- "executeInvokeCallback was called earlier",
- m_opaque);
- }
+ // here no need retrySendTimes process because of it have timeout
+ LOG_ERROR("send msg, callback timeout, opaque:%d, sendTimes:%d, maxRetryTimes:%d", getOpaque(), getRetrySendTimes(),
+ getMaxRetrySendTimes());
+
+ m_pCallbackWrap->onException();
}
}
bool ResponseFuture::isTimeOut() const {
int64 diff = UtilAll::currentTimeMillis() - m_beginTimestamp;
//<!only async;
- return m_bAsync.load() == 1 && diff > m_timeout;
+ return m_bAsync && diff > m_timeout;
}
int ResponseFuture::getMaxRetrySendTimes() const {
@@ -197,16 +153,17 @@ void ResponseFuture::setRetrySendTimes(int retryTimes) {
void ResponseFuture::setBrokerAddr(const std::string& brokerAddr) {
m_brokerAddr = brokerAddr;
}
+
+std::string ResponseFuture::getBrokerAddr() const {
+ return m_brokerAddr;
+}
+
void ResponseFuture::setRequestCommand(const RemotingCommand& requestCommand) {
m_requestCommand = requestCommand;
}
-
const RemotingCommand& ResponseFuture::getRequestCommand() {
return m_requestCommand;
}
-std::string ResponseFuture::getBrokerAddr() const {
- return m_brokerAddr;
-}
int64 ResponseFuture::leftTime() const {
int64 diff = UtilAll::currentTimeMillis() - m_beginTimestamp;
@@ -222,4 +179,4 @@ AsyncCallbackWrap* ResponseFuture::getAsyncCallbackWrap() {
}
//<!************************************************************************
-} //<!end namespace;
+} // namespace rocketmq
diff --git a/src/transport/ResponseFuture.h b/src/transport/ResponseFuture.h
old mode 100644
new mode 100755
index d6f6b7a..66be663
--- a/src/transport/ResponseFuture.h
+++ b/src/transport/ResponseFuture.h
@@ -16,19 +16,21 @@
*/
#ifndef __RESPONSEFUTURE_H__
#define __RESPONSEFUTURE_H__
-#include <boost/atomic.hpp>
-#include <boost/thread/condition_variable.hpp>
+
+#include <atomic>
+#include <condition_variable>
+
#include "AsyncCallbackWrap.h"
#include "RemotingCommand.h"
#include "UtilAll.h"
namespace rocketmq {
-typedef enum asyncCallBackStatus {
- asyncCallBackStatus_init = 0,
- asyncCallBackStatus_response = 1,
- asyncCallBackStatus_timeout = 2
-} asyncCallBackStatus;
+typedef enum AsyncCallbackStatus {
+ ASYNC_CALLBACK_STATUS_INIT = 0,
+ ASYNC_CALLBACK_STATUS_RESPONSE = 1,
+ ASYNC_CALLBACK_STATUS_TIMEOUT = 2
+} AsyncCallbAackStatus;
class TcpRemotingClient;
//<!***************************************************************************
@@ -39,32 +41,30 @@ class ResponseFuture {
TcpRemotingClient* powner,
int64 timeoutMilliseconds,
bool bAsync = false,
- AsyncCallbackWrap* pcall = NULL);
+ AsyncCallbackWrap* pCallback = nullptr);
virtual ~ResponseFuture();
+
void releaseThreadCondition();
- RemotingCommand* waitResponse(int timeoutMillis);
+ RemotingCommand* waitResponse(int timeoutMillis = 0);
RemotingCommand* getCommand() const;
- void setResponse(RemotingCommand* pResponseCommand);
- bool isSendRequestOK();
+ bool setResponse(RemotingCommand* pResponseCommand);
+
+ bool isSendRequestOK() const;
void setSendRequestOK(bool sendRequestOK);
int getRequestCode() const;
int getOpaque() const;
//<!callback;
- void executeInvokeCallback();
- void executeInvokeCallbackException();
+ void invokeCompleteCallback();
+ void invokeExceptionCallback();
bool isTimeOut() const;
int getMaxRetrySendTimes() const;
int getRetrySendTimes() const;
int64 leftTime() const;
- // bool isTimeOutMoreThan30s() const;
- const bool getASyncFlag();
- void setAsyncResponseFlag();
- const bool getAsyncResponseFlag();
- const bool getSyncResponseFlag();
+ const bool getAsyncFlag();
AsyncCallbackWrap* getAsyncCallbackWrap();
- void setAsyncCallBackStatus(asyncCallBackStatus asyncCallbackStatus);
+
void setMaxRetrySendTimes(int maxRetryTimes);
void setRetrySendTimes(int retryTimes);
void setBrokerAddr(const std::string& brokerAddr);
@@ -75,26 +75,30 @@ class ResponseFuture {
private:
int m_requestCode;
int m_opaque;
- bool m_sendRequestOK;
- boost::mutex m_defaultEventLock;
- boost::condition_variable_any m_defaultEvent;
- int64 m_beginTimestamp;
int64 m_timeout; // ms
- boost::atomic<bool> m_bAsync;
- RemotingCommand* m_pResponseCommand; //<!delete outside;
+
+ const bool m_bAsync;
AsyncCallbackWrap* m_pCallbackWrap;
- boost::mutex m_asyncCallbackLock;
- asyncCallBackStatus m_asyncCallbackStatus;
- boost::atomic<bool> m_asyncResponse;
- boost::atomic<bool> m_syncResponse;
+
+ AsyncCallbackStatus m_asyncCallbackStatus;
+ std::mutex m_asyncCallbackLock;
+
+ bool m_haveResponse;
+ std::mutex m_defaultEventLock;
+ std::condition_variable m_defaultEvent;
+
+ int64 m_beginTimestamp;
+ bool m_sendRequestOK;
+ RemotingCommand* m_pResponseCommand; //<!delete outside;
int m_maxRetrySendTimes;
int m_retrySendTimes;
+
std::string m_brokerAddr;
RemotingCommand m_requestCommand;
- // TcpRemotingClient* m_tcpRemoteClient;
};
+
//<!************************************************************************
-} //<!end namespace;
+} // namespace rocketmq
#endif
diff --git a/src/transport/TcpRemotingClient.cpp b/src/transport/TcpRemotingClient.cpp
old mode 100644
new mode 100755
index 8256391..2f2ab46
--- a/src/transport/TcpRemotingClient.cpp
+++ b/src/transport/TcpRemotingClient.cpp
@@ -19,6 +19,7 @@
#if !defined(WIN32) && !defined(__APPLE__)
#include <sys/prctl.h>
#endif
+
#include "Logging.h"
#include "MemoryOutputStream.h"
#include "TopAddressing.h"
@@ -35,27 +36,31 @@ TcpRemotingClient::TcpRemotingClient(int pullThreadNum, uint64_t tcpConnectTimeo
m_ioServiceWork(m_ioService) {
#if !defined(WIN32) && !defined(__APPLE__)
string taskName = UtilAll::getProcessName();
- prctl(PR_SET_NAME, "networkTP", 0, 0, 0);
+ prctl(PR_SET_NAME, "NetworkTP", 0, 0, 0);
#endif
- for (int i = 0; i != pullThreadNum; ++i) {
+ for (int i = 0; i != m_pullThreadNum; ++i) {
m_threadpool.create_thread(boost::bind(&boost::asio::io_service::run, &m_ioService));
}
#if !defined(WIN32) && !defined(__APPLE__)
prctl(PR_SET_NAME, taskName.c_str(), 0, 0, 0);
#endif
- LOG_INFO(
- "m_tcpConnectTimeout:%ju, m_tcpTransportTryLockTimeout:%ju, "
- "m_pullThreadNum:%d",
- m_tcpConnectTimeout, m_tcpTransportTryLockTimeout, m_pullThreadNum);
+
+ LOG_INFO("m_tcpConnectTimeout:%ju, m_tcpTransportTryLockTimeout:%ju, m_pullThreadNum:%d", m_tcpConnectTimeout,
+ m_tcpTransportTryLockTimeout, m_pullThreadNum);
+
m_async_service_thread.reset(new boost::thread(boost::bind(&TcpRemotingClient::boost_asio_work, this)));
}
void TcpRemotingClient::boost_asio_work() {
- LOG_INFO("TcpRemotingClient::boost asio async service runing");
- boost::asio::io_service::work work(m_async_ioService); // avoid async io
- // service stops after
- // first timer timeout
- // callback
+ LOG_INFO("TcpRemotingClient::boost asio async service running");
+
+#if !defined(WIN32) && !defined(__APPLE__)
+ prctl(PR_SET_NAME, "RemotingAsioT", 0, 0, 0);
+#endif
+
+ // avoid async io service stops after first timer timeout callback
+ boost::asio::io_service::work work(m_async_ioService);
+
m_async_ioService.run();
}
@@ -69,15 +74,15 @@ TcpRemotingClient::~TcpRemotingClient() {
void TcpRemotingClient::stopAllTcpTransportThread() {
LOG_DEBUG("TcpRemotingClient::stopAllTcpTransportThread Begin");
+
m_async_ioService.stop();
m_async_service_thread->interrupt();
m_async_service_thread->join();
removeAllTimerCallback();
{
- TcpMap::iterator it = m_tcpTable.begin();
- for (; it != m_tcpTable.end(); ++it) {
- it->second->disconnect(it->first);
+ for (const auto& trans : m_tcpTable) {
+ trans.second->disconnect(trans.first);
}
m_tcpTable.clear();
}
@@ -86,62 +91,66 @@ void TcpRemotingClient::stopAllTcpTransportThread() {
m_threadpool.join_all();
{
- boost::lock_guard<boost::mutex> lock(m_futureTableMutex);
- for (ResMap::iterator it = m_futureTable.begin(); it != m_futureTable.end(); ++it) {
- if (it->second)
- it->second->releaseThreadCondition();
+ std::lock_guard<std::mutex> lock(m_futureTableLock);
+ for (const auto& future : m_futureTable) {
+ if (future.second)
+ future.second->releaseThreadCondition();
}
}
+
LOG_DEBUG("TcpRemotingClient::stopAllTcpTransportThread End");
}
void TcpRemotingClient::updateNameServerAddressList(const string& addrs) {
LOG_INFO("updateNameServerAddressList: [%s]", addrs.c_str());
- if (!addrs.empty()) {
- boost::unique_lock<boost::timed_mutex> lock(m_namesrvlock, boost::try_to_lock);
- if (!lock.owns_lock()) {
- if (!lock.timed_lock(boost::get_system_time() + boost::posix_time::seconds(10))) {
- LOG_ERROR("updateNameServerAddressList get timed_mutex timeout");
- return;
- }
+
+ if (addrs.empty()) {
+ return;
+ }
+
+ std::unique_lock<std::timed_mutex> lock(m_namesrvLock, std::try_to_lock);
+ if (!lock.owns_lock()) {
+ if (!lock.try_lock_for(std::chrono::seconds(10))) {
+ LOG_ERROR("updateNameServerAddressList get timed_mutex timeout");
+ return;
}
- // clear first;
- m_namesrvAddrList.clear();
-
- vector<string> out;
- UtilAll::Split(out, addrs, ";");
- for (size_t i = 0; i < out.size(); i++) {
- string addr = out[i];
- UtilAll::Trim(addr);
-
- string hostName;
- short portNumber;
- if (UtilAll::SplitURL(addr, hostName, portNumber)) {
- LOG_INFO("update Namesrv:%s", addr.c_str());
- m_namesrvAddrList.push_back(addr);
- } else {
- LOG_INFO("This may be invalid namer server: [%s]", addr.c_str());
- }
+ }
+
+ // clear first;
+ m_namesrvAddrList.clear();
+
+ vector<string> out;
+ UtilAll::Split(out, addrs, ";");
+ for (auto addr : out) {
+ UtilAll::Trim(addr);
+
+ string hostName;
+ short portNumber;
+ if (UtilAll::SplitURL(addr, hostName, portNumber)) {
+ LOG_INFO("update Namesrv:%s", addr.c_str());
+ m_namesrvAddrList.push_back(addr);
+ } else {
+ LOG_INFO("This may be invalid namer server: [%s]", addr.c_str());
}
- out.clear();
}
+ out.clear();
}
-bool TcpRemotingClient::invokeHeartBeat(const string& addr, RemotingCommand& request) {
- boost::shared_ptr<TcpTransport> pTcp = GetTransport(addr, true);
- if (pTcp != NULL) {
+bool TcpRemotingClient::invokeHeartBeat(const string& addr, RemotingCommand& request, int timeoutMillis) {
+ std::shared_ptr<TcpTransport> pTcp = GetTransport(addr, true);
+ if (pTcp != nullptr) {
int code = request.getCode();
int opaque = request.getOpaque();
- boost::shared_ptr<ResponseFuture> responseFuture(new ResponseFuture(code, opaque, this, 3000, false, NULL));
+
+ std::shared_ptr<ResponseFuture> responseFuture(new ResponseFuture(code, opaque, this, timeoutMillis));
addResponseFuture(opaque, responseFuture);
- // LOG_INFO("invokeHeartbeat success, addr:%s, code:%d, opaque:%d,
- // timeoutms:%d", addr.c_str(), code, opaque, 3000);
if (SendCommand(pTcp, request)) {
responseFuture->setSendRequestOK(true);
- unique_ptr<RemotingCommand> pRsp(responseFuture->waitResponse(3000));
- if (pRsp == NULL) {
+ unique_ptr<RemotingCommand> pRsp(responseFuture->waitResponse());
+ if (pRsp == nullptr) {
LOG_ERROR("wait response timeout of heartbeat, so closeTransport of addr:%s", addr.c_str());
+ // avoid responseFuture leak;
findAndDeleteResponseFuture(opaque);
CloseTransport(addr, pTcp);
return false;
@@ -152,6 +161,7 @@ bool TcpRemotingClient::invokeHeartBeat(const string& addr, RemotingCommand& req
return false;
}
} else {
+ // avoid responseFuture leak;
findAndDeleteResponseFuture(opaque);
CloseTransport(addr, pTcp);
}
@@ -159,34 +169,28 @@ bool TcpRemotingClient::invokeHeartBeat(const string& addr, RemotingCommand& req
return false;
}
-RemotingCommand* TcpRemotingClient::invokeSync(const string& addr,
- RemotingCommand& request,
- int timeoutMillis /* = 3000 */) {
+RemotingCommand* TcpRemotingClient::invokeSync(const string& addr, RemotingCommand& request, int timeoutMillis) {
LOG_DEBUG("InvokeSync:", addr.c_str());
- boost::shared_ptr<TcpTransport> pTcp = GetTransport(addr, true);
- if (pTcp != NULL) {
+ std::shared_ptr<TcpTransport> pTcp = GetTransport(addr, true);
+ if (pTcp != nullptr) {
int code = request.getCode();
int opaque = request.getOpaque();
- boost::shared_ptr<ResponseFuture> responseFuture(
- new ResponseFuture(code, opaque, this, timeoutMillis, false, NULL));
+
+ std::shared_ptr<ResponseFuture> responseFuture(new ResponseFuture(code, opaque, this, timeoutMillis));
addResponseFuture(opaque, responseFuture);
if (SendCommand(pTcp, request)) {
- // LOG_INFO("invokeSync success, addr:%s, code:%d, opaque:%d,
- // timeoutms:%d", addr.c_str(), code, opaque, timeoutMillis);
responseFuture->setSendRequestOK(true);
- RemotingCommand* pRsp = responseFuture->waitResponse(timeoutMillis);
- if (pRsp == NULL) {
+ RemotingCommand* pRsp = responseFuture->waitResponse();
+ if (pRsp == nullptr) {
if (code != GET_CONSUMER_LIST_BY_GROUP) {
- LOG_WARN(
- "wait response timeout or get NULL response of code:%d, so "
- "closeTransport of addr:%s",
- code, addr.c_str());
+ LOG_WARN("wait response timeout or get NULL response of code:%d, so closeTransport of addr:%s", code,
+ addr.c_str());
CloseTransport(addr, pTcp);
}
// avoid responseFuture leak;
findAndDeleteResponseFuture(opaque);
- return NULL;
+ return nullptr;
} else {
return pRsp;
}
@@ -197,137 +201,130 @@ RemotingCommand* TcpRemotingClient::invokeSync(const string& addr,
}
}
LOG_DEBUG("InvokeSync [%s] Failed: Cannot Get Transport.", addr.c_str());
- return NULL;
+ return nullptr;
}
bool TcpRemotingClient::invokeAsync(const string& addr,
RemotingCommand& request,
- AsyncCallbackWrap* cbw,
- int64 timeoutMilliseconds,
+ AsyncCallbackWrap* callback,
+ int64 timeoutMillis,
int maxRetrySendTimes,
int retrySendTimes) {
- boost::shared_ptr<TcpTransport> pTcp = GetTransport(addr, true);
- if (pTcp != NULL) {
- //<!not delete, for callback to delete;
+ std::shared_ptr<TcpTransport> pTcp = GetTransport(addr, true);
+ if (pTcp != nullptr) {
int code = request.getCode();
int opaque = request.getOpaque();
- boost::shared_ptr<ResponseFuture> responseFuture(
- new ResponseFuture(code, opaque, this, timeoutMilliseconds, true, cbw));
+
+ // delete in callback
+ std::shared_ptr<ResponseFuture> responseFuture(
+ new ResponseFuture(code, opaque, this, timeoutMillis, true, callback));
responseFuture->setMaxRetrySendTimes(maxRetrySendTimes);
responseFuture->setRetrySendTimes(retrySendTimes);
responseFuture->setBrokerAddr(addr);
responseFuture->setRequestCommand(request);
addAsyncResponseFuture(opaque, responseFuture);
- if (cbw) {
+
+ if (callback) {
boost::asio::deadline_timer* t =
- new boost::asio::deadline_timer(m_async_ioService, boost::posix_time::milliseconds(timeoutMilliseconds));
+ new boost::asio::deadline_timer(m_async_ioService, boost::posix_time::milliseconds(timeoutMillis));
addTimerCallback(t, opaque);
- boost::system::error_code e;
- t->async_wait(boost::bind(&TcpRemotingClient::handleAsyncPullForResponseTimeout, this, e, opaque));
+ t->async_wait(
+ boost::bind(&TcpRemotingClient::handleAsyncRequestTimeout, this, boost::asio::placeholders::error, opaque));
}
- if (SendCommand(pTcp, request)) // Even if send failed, asyncTimerThread
- // will trigger next pull request or report
- // send msg failed
- {
+ // Even if send failed, asyncTimerThread will trigger next pull request or report send msg failed
+ if (SendCommand(pTcp, request)) {
LOG_DEBUG("invokeAsync success, addr:%s, code:%d, opaque:%d", addr.c_str(), code, opaque);
responseFuture->setSendRequestOK(true);
}
return true;
}
+
LOG_ERROR("invokeAsync failed of addr:%s", addr.c_str());
return false;
}
void TcpRemotingClient::invokeOneway(const string& addr, RemotingCommand& request) {
//<!not need callback;
- boost::shared_ptr<TcpTransport> pTcp = GetTransport(addr, true);
- if (pTcp != NULL) {
+ std::shared_ptr<TcpTransport> pTcp = GetTransport(addr, true);
+ if (pTcp != nullptr) {
request.markOnewayRPC();
- LOG_DEBUG("invokeOneway success, addr:%s, code:%d", addr.c_str(), request.getCode());
- SendCommand(pTcp, request);
+ if (SendCommand(pTcp, request)) {
+ LOG_DEBUG("invokeOneway success. addr:%s, code:%d", addr.c_str(), request.getCode());
+ } else {
+ LOG_WARN("invokeOneway failed. addr:%s, code:%d", addr.c_str(), request.getCode());
+ }
+ } else {
+ LOG_WARN("invokeOneway failed: NULL transport. addr:%s, code:%d", addr.c_str(), request.getCode());
}
}
-boost::shared_ptr<TcpTransport> TcpRemotingClient::GetTransport(const string& addr, bool needRespons) {
+std::shared_ptr<TcpTransport> TcpRemotingClient::GetTransport(const string& addr, bool needResponse) {
if (addr.empty()) {
LOG_DEBUG("GetTransport of NameServer");
- return CreateNameserverTransport(needRespons);
+ return CreateNameServerTransport(needResponse);
}
- return CreateTransport(addr, needRespons);
+ return CreateTransport(addr, needResponse);
}
-boost::shared_ptr<TcpTransport> TcpRemotingClient::CreateTransport(const string& addr, bool needRespons) {
- boost::shared_ptr<TcpTransport> tts;
+std::shared_ptr<TcpTransport> TcpRemotingClient::CreateTransport(const string& addr, bool needResponse) {
+ std::shared_ptr<TcpTransport> tts;
+
{
// try get m_tcpLock util m_tcpTransportTryLockTimeout to avoid blocking
- // long
- // time, if could not get m_tcpLock, return NULL
- bool bGetMutex = false;
- boost::unique_lock<boost::timed_mutex> lock(m_tcpLock, boost::try_to_lock);
+ // long time, if could not get m_tcpLock, return NULL
+ std::unique_lock<std::timed_mutex> lock(m_tcpTableLock, std::try_to_lock);
if (!lock.owns_lock()) {
- if (!lock.timed_lock(boost::get_system_time() + boost::posix_time::seconds(m_tcpTransportTryLockTimeout))) {
+ if (!lock.try_lock_for(std::chrono::seconds(m_tcpTransportTryLockTimeout))) {
LOG_ERROR("GetTransport of:%s get timed_mutex timeout", addr.c_str());
- boost::shared_ptr<TcpTransport> pTcp;
+ std::shared_ptr<TcpTransport> pTcp;
return pTcp;
- } else {
- bGetMutex = true;
}
- } else {
- bGetMutex = true;
}
- if (bGetMutex) {
- if (m_tcpTable.find(addr) != m_tcpTable.end()) {
- boost::weak_ptr<TcpTransport> weakPtcp(m_tcpTable[addr]);
- boost::shared_ptr<TcpTransport> tcp = weakPtcp.lock();
- if (tcp) {
- tcpConnectStatus connectStatus = tcp->getTcpConnectStatus();
- if (connectStatus == e_connectWaitResponse) {
- boost::shared_ptr<TcpTransport> pTcp;
- return pTcp;
- } else if (connectStatus == e_connectFail) {
- LOG_ERROR("tcpTransport with server disconnected, erase server:%s", addr.c_str());
- tcp->disconnect(addr); // avoid coredump when connection with broker was broken
- m_tcpTable.erase(addr);
- } else if (connectStatus == e_connectSuccess) {
- return tcp;
- } else {
- LOG_ERROR(
- "go to fault state, erase:%s from tcpMap, and reconnect "
- "it",
- addr.c_str());
- m_tcpTable.erase(addr);
- }
+
+ // check for reuse
+ if (m_tcpTable.find(addr) != m_tcpTable.end()) {
+ std::shared_ptr<TcpTransport> tcp = m_tcpTable[addr];
+
+ if (tcp) {
+ TcpConnectStatus connectStatus = tcp->getTcpConnectStatus();
+ if (connectStatus == TCP_CONNECT_STATUS_SUCCESS) {
+ return tcp;
+ } else if (connectStatus == TCP_CONNECT_STATUS_WAIT) {
+ std::shared_ptr<TcpTransport> pTcp;
+ return pTcp;
+ } else if (connectStatus == TCP_CONNECT_STATUS_FAILED) {
+ LOG_ERROR("tcpTransport with server disconnected, erase server:%s", addr.c_str());
+ tcp->disconnect(addr); // avoid coredump when connection with broker was broken
+ m_tcpTable.erase(addr);
+ } else {
+ LOG_ERROR("go to fault state, erase:%s from tcpMap, and reconnect it", addr.c_str());
+ m_tcpTable.erase(addr);
}
}
+ }
- //<!callback;
- READ_CALLBACK callback = needRespons ? &TcpRemotingClient::static_messageReceived : NULL;
+ //<!callback;
+ TcpTransportReadCallback callback = needResponse ? &TcpRemotingClient::static_messageReceived : nullptr;
- tts.reset(new TcpTransport(this, callback));
- tcpConnectStatus connectStatus = tts->connect(addr, m_tcpConnectTimeout);
- if (connectStatus != e_connectWaitResponse) {
- LOG_WARN("can not connect to :%s", addr.c_str());
- tts->disconnect(addr);
- boost::shared_ptr<TcpTransport> pTcp;
- return pTcp;
- } else {
- m_tcpTable[addr] = tts; // even if connecting failed finally, this
- // server transport will be erased by next
- // CreateTransport
- }
- } else {
- LOG_WARN("get tcpTransport mutex failed :%s", addr.c_str());
- boost::shared_ptr<TcpTransport> pTcp;
+ tts = TcpTransport::CreateTransport(this, callback);
+ TcpConnectStatus connectStatus = tts->connect(addr, 0); // use non-block
+ if (connectStatus != TCP_CONNECT_STATUS_WAIT) {
+ LOG_WARN("can not connect to:%s", addr.c_str());
+ tts->disconnect(addr);
+ std::shared_ptr<TcpTransport> pTcp;
return pTcp;
+ } else {
+ // even if connecting failed finally, this server transport will be erased by next CreateTransport
+ m_tcpTable[addr] = tts;
}
}
- tcpConnectStatus connectStatus = tts->waitTcpConnectEvent(m_tcpConnectTimeout);
- if (connectStatus != e_connectSuccess) {
+ TcpConnectStatus connectStatus = tts->waitTcpConnectEvent(static_cast<int>(m_tcpConnectTimeout));
+ if (connectStatus != TCP_CONNECT_STATUS_SUCCESS) {
LOG_WARN("can not connect to server:%s", addr.c_str());
tts->disconnect(addr);
- boost::shared_ptr<TcpTransport> pTcp;
+ std::shared_ptr<TcpTransport> pTcp;
return pTcp;
} else {
LOG_INFO("connect server with addr:%s success", addr.c_str());
@@ -335,165 +332,122 @@ boost::shared_ptr<TcpTransport> TcpRemotingClient::CreateTransport(const string&
}
}
-boost::shared_ptr<TcpTransport> TcpRemotingClient::CreateNameserverTransport(bool needRespons) {
+std::shared_ptr<TcpTransport> TcpRemotingClient::CreateNameServerTransport(bool needResponse) {
// m_namesrvLock was added to avoid operation of nameServer was blocked by
// m_tcpLock, it was used by single Thread mostly, so no performance impact
- // try get m_tcpLock util m_tcpTransportTryLockTimeout to avoid blocking long
+ // try get m_tcpLock until m_tcpTransportTryLockTimeout to avoid blocking long
// time, if could not get m_namesrvlock, return NULL
LOG_DEBUG("--CreateNameserverTransport--");
- bool bGetMutex = false;
- boost::unique_lock<boost::timed_mutex> lock(m_namesrvlock, boost::try_to_lock);
+ std::unique_lock<std::timed_mutex> lock(m_namesrvLock, std::try_to_lock);
if (!lock.owns_lock()) {
- if (!lock.timed_lock(boost::get_system_time() + boost::posix_time::seconds(m_tcpTransportTryLockTimeout))) {
+ if (!lock.try_lock_for(std::chrono::seconds(m_tcpTransportTryLockTimeout))) {
LOG_ERROR("CreateNameserverTransport get timed_mutex timeout");
- boost::shared_ptr<TcpTransport> pTcp;
+ std::shared_ptr<TcpTransport> pTcp;
return pTcp;
- } else {
- bGetMutex = true;
}
- } else {
- bGetMutex = true;
}
- if (bGetMutex) {
- if (!m_namesrvAddrChoosed.empty()) {
- boost::shared_ptr<TcpTransport> pTcp = GetTransport(m_namesrvAddrChoosed, true);
- if (pTcp)
- return pTcp;
- else
- m_namesrvAddrChoosed.clear();
- }
+ if (!m_namesrvAddrChoosed.empty()) {
+ std::shared_ptr<TcpTransport> pTcp = CreateTransport(m_namesrvAddrChoosed, true);
+ if (pTcp)
+ return pTcp;
+ else
+ m_namesrvAddrChoosed.clear();
+ }
- vector<string>::iterator itp = m_namesrvAddrList.begin();
- for (; itp != m_namesrvAddrList.end(); ++itp) {
- unsigned int index = m_namesrvIndex % m_namesrvAddrList.size();
- if (m_namesrvIndex == numeric_limits<unsigned int>::max())
- m_namesrvIndex = 0;
- m_namesrvIndex++;
- LOG_INFO("namesrvIndex is:%d, index:%d, namesrvaddrlist size:" SIZET_FMT "", m_namesrvIndex, index,
- m_namesrvAddrList.size());
- boost::shared_ptr<TcpTransport> pTcp = GetTransport(m_namesrvAddrList[index], true);
- if (pTcp) {
- m_namesrvAddrChoosed = m_namesrvAddrList[index];
- return pTcp;
- }
+ for (int i = 0; i < m_namesrvAddrList.size(); i++) {
+ unsigned int index = m_namesrvIndex++ % m_namesrvAddrList.size();
+ LOG_INFO("namesrvIndex is:%d, index:%d, namesrvaddrlist size:" SIZET_FMT "", m_namesrvIndex, index,
+ m_namesrvAddrList.size());
+ std::shared_ptr<TcpTransport> pTcp = CreateTransport(m_namesrvAddrList[index], true);
+ if (pTcp) {
+ m_namesrvAddrChoosed = m_namesrvAddrList[index];
+ return pTcp;
}
- boost::shared_ptr<TcpTransport> pTcp;
- return pTcp;
- } else {
- LOG_WARN("get nameServer tcpTransport mutex failed");
- boost::shared_ptr<TcpTransport> pTcp;
- return pTcp;
}
+
+ std::shared_ptr<TcpTransport> pTcp;
+ return pTcp;
}
-void TcpRemotingClient::CloseTransport(const string& addr, boost::shared_ptr<TcpTransport> pTcp) {
+bool TcpRemotingClient::CloseTransport(const string& addr, std::shared_ptr<TcpTransport> pTcp) {
if (addr.empty()) {
return CloseNameServerTransport(pTcp);
}
- bool bGetMutex = false;
- boost::unique_lock<boost::timed_mutex> lock(m_tcpLock, boost::try_to_lock);
+ std::unique_lock<std::timed_mutex> lock(m_tcpTableLock, std::try_to_lock);
if (!lock.owns_lock()) {
- if (!lock.timed_lock(boost::get_system_time() + boost::posix_time::seconds(m_tcpTransportTryLockTimeout))) {
+ if (!lock.try_lock_for(std::chrono::seconds(m_tcpTransportTryLockTimeout))) {
LOG_ERROR("CloseTransport of:%s get timed_mutex timeout", addr.c_str());
- return;
- } else {
- bGetMutex = true;
+ return false;
}
- } else {
- bGetMutex = true;
}
+
LOG_ERROR("CloseTransport of:%s", addr.c_str());
- if (bGetMutex) {
- bool removeItemFromTable = true;
- if (m_tcpTable.find(addr) != m_tcpTable.end()) {
- if (m_tcpTable[addr]->getStartTime() != pTcp->getStartTime()) {
- LOG_INFO(
- "tcpTransport with addr:%s has been closed before, and has been "
- "created again, nothing to do",
- addr.c_str());
- removeItemFromTable = false;
- }
- } else {
- LOG_INFO("tcpTransport with addr:%s had been removed from tcpTable before", addr.c_str());
- removeItemFromTable = false;
- }
- if (removeItemFromTable == true) {
- LOG_WARN("closeTransport: disconnect broker:%s with state:%d", addr.c_str(),
- m_tcpTable[addr]->getTcpConnectStatus());
- if (m_tcpTable[addr]->getTcpConnectStatus() == e_connectSuccess)
- m_tcpTable[addr]->disconnect(addr); // avoid coredump when connection with server was broken
- LOG_WARN("closeTransport: erase broker: %s", addr.c_str());
- m_tcpTable.erase(addr);
+ bool removeItemFromTable = true;
+ if (m_tcpTable.find(addr) != m_tcpTable.end()) {
+ if (m_tcpTable[addr]->getStartTime() != pTcp->getStartTime()) {
+ LOG_INFO("tcpTransport with addr:%s has been closed before, and has been created again, nothing to do",
+ addr.c_str());
+ removeItemFromTable = false;
}
} else {
- LOG_WARN("CloseTransport::get tcpTransport mutex failed:%s", addr.c_str());
- return;
+ LOG_INFO("tcpTransport with addr:%s had been removed from tcpTable before", addr.c_str());
+ removeItemFromTable = false;
}
+
+ if (removeItemFromTable) {
+ LOG_WARN("closeTransport: disconnect:%s with state:%d", addr.c_str(), m_tcpTable[addr]->getTcpConnectStatus());
+ if (m_tcpTable[addr]->getTcpConnectStatus() == TCP_CONNECT_STATUS_SUCCESS)
+ m_tcpTable[addr]->disconnect(addr); // avoid coredump when connection with server was broken
+ LOG_WARN("closeTransport: erase broker: %s", addr.c_str());
+ m_tcpTable.erase(addr);
+ }
+
LOG_ERROR("CloseTransport of:%s end", addr.c_str());
+
+ return removeItemFromTable;
}
-void TcpRemotingClient::CloseNameServerTransport(boost::shared_ptr<TcpTransport> pTcp) {
- bool bGetMutex = false;
- boost::unique_lock<boost::timed_mutex> lock(m_namesrvlock, boost::try_to_lock);
+bool TcpRemotingClient::CloseNameServerTransport(std::shared_ptr<TcpTransport> pTcp) {
+ std::unique_lock<std::timed_mutex> lock(m_namesrvLock, std::try_to_lock);
if (!lock.owns_lock()) {
- if (!lock.timed_lock(boost::get_system_time() + boost::posix_time::seconds(m_tcpTransportTryLockTimeout))) {
- LOG_ERROR("CreateNameserverTransport get timed_mutex timeout");
- return;
- } else {
- bGetMutex = true;
+ if (!lock.try_lock_for(std::chrono::seconds(m_tcpTransportTryLockTimeout))) {
+ LOG_ERROR("CreateNameServerTransport get timed_mutex timeout");
+ return false;
}
- } else {
- bGetMutex = true;
}
- if (bGetMutex) {
- string addr = m_namesrvAddrChoosed;
- bool removeItemFromTable = true;
- if (m_tcpTable.find(addr) != m_tcpTable.end()) {
- if (m_tcpTable[addr]->getStartTime() != pTcp->getStartTime()) {
- LOG_INFO(
- "tcpTransport with addr:%s has been closed before, and has been "
- "created again, nothing to do",
- addr.c_str());
- removeItemFromTable = false;
- }
- } else {
- LOG_INFO("tcpTransport with addr:%s had been removed from tcpTable before", addr.c_str());
- removeItemFromTable = false;
- }
- if (removeItemFromTable == true) {
- m_tcpTable[addr]->disconnect(addr); // avoid coredump when connection with server was broken
- LOG_WARN("closeTransport: erase broker: %s", addr.c_str());
- m_tcpTable.erase(addr);
- m_namesrvAddrChoosed.clear();
- }
- } else {
- LOG_WARN("CloseNameServerTransport::get tcpTransport mutex failed:%s", m_namesrvAddrChoosed.c_str());
- return;
+ string addr = m_namesrvAddrChoosed;
+
+ bool removeItemFromTable = CloseTransport(addr, pTcp);
+ if (removeItemFromTable) {
+ m_namesrvAddrChoosed.clear();
}
+
+ return removeItemFromTable;
}
-bool TcpRemotingClient::SendCommand(boost::shared_ptr<TcpTransport> pTts, RemotingCommand& msg) {
- const MemoryBlock* phead = msg.GetHead();
- const MemoryBlock* pbody = msg.GetBody();
+bool TcpRemotingClient::SendCommand(std::shared_ptr<TcpTransport> pTts, RemotingCommand& msg) {
+ const MemoryBlock* pHead = msg.GetHead();
+ const MemoryBlock* pBody = msg.GetBody();
- unique_ptr<MemoryOutputStream> result(new MemoryOutputStream(1024));
- if (phead->getData()) {
- result->write(phead->getData(), phead->getSize());
+ unique_ptr<MemoryOutputStream> buffer(new MemoryOutputStream(1024));
+ if (pHead->getSize() > 0) {
+ buffer->write(pHead->getData(), static_cast<size_t>(pHead->getSize()));
}
- if (pbody->getData()) {
- result->write(pbody->getData(), pbody->getSize());
+ if (pBody->getSize() > 0) {
+ buffer->write(pBody->getData(), static_cast<size_t>(pBody->getSize()));
}
- const char* pData = static_cast<const char*>(result->getData());
- int len = result->getDataSize();
+
+ const char* pData = static_cast<const char*>(buffer->getData());
+ size_t len = buffer->getDataSize();
return pTts->sendMessage(pData, len);
}
void TcpRemotingClient::static_messageReceived(void* context, const MemoryBlock& mem, const string& addr) {
- TcpRemotingClient* pTcpRemotingClient = (TcpRemotingClient*)context;
+ auto* pTcpRemotingClient = reinterpret_cast<TcpRemotingClient*>(context);
if (pTcpRemotingClient)
pTcpRemotingClient->messageReceived(mem, addr);
}
@@ -503,11 +457,11 @@ void TcpRemotingClient::messageReceived(const MemoryBlock& mem, const string& ad
}
void TcpRemotingClient::ProcessData(const MemoryBlock& mem, const string& addr) {
- RemotingCommand* pRespondCmd = NULL;
+ RemotingCommand* pRespondCmd = nullptr;
try {
pRespondCmd = RemotingCommand::Decode(mem);
} catch (...) {
- LOG_ERROR("processData_error");
+ LOG_ERROR("processData error");
return;
}
@@ -515,43 +469,58 @@ void TcpRemotingClient::ProcessData(const MemoryBlock& mem, const string& addr)
//<!process self;
if (pRespondCmd->isResponseType()) {
- boost::shared_ptr<ResponseFuture> pFuture(findAndDeleteAsyncResponseFuture(opaque));
+ std::shared_ptr<ResponseFuture> pFuture = findAndDeleteAsyncResponseFuture(opaque);
if (!pFuture) {
pFuture = findAndDeleteResponseFuture(opaque);
- if (pFuture) {
- if (pFuture->getSyncResponseFlag()) {
- LOG_WARN("waitResponse already timeout of opaque:%d", opaque);
- deleteAndZero(pRespondCmd);
- return;
- }
- LOG_DEBUG("find_response opaque:%d", opaque);
- } else {
+ if (!pFuture) {
LOG_DEBUG("responseFuture was deleted by timeout of opaque:%d", opaque);
deleteAndZero(pRespondCmd);
return;
}
}
+
+ LOG_DEBUG("find_response opaque:%d", opaque);
processResponseCommand(pRespondCmd, pFuture);
} else {
processRequestCommand(pRespondCmd, addr);
}
}
-void TcpRemotingClient::processResponseCommand(RemotingCommand* pCmd, boost::shared_ptr<ResponseFuture> pfuture) {
- int code = pfuture->getRequestCode();
+void TcpRemotingClient::processResponseCommand(RemotingCommand* pCmd, std::shared_ptr<ResponseFuture> pFuture) {
+ int code = pFuture->getRequestCode();
+ pCmd->SetExtHeader(code); // set head, for response use
+
int opaque = pCmd->getOpaque();
- LOG_DEBUG("processResponseCommand, code:%d,opaque:%d, maxRetryTimes:%d, retrySendTimes:%d", code, opaque,
- pfuture->getMaxRetrySendTimes(), pfuture->getRetrySendTimes());
- pCmd->SetExtHeader(code); // set head , for response use
-
- pfuture->setResponse(pCmd);
-
- if (pfuture->getASyncFlag()) {
- if (!pfuture->getAsyncResponseFlag()) {
- pfuture->setAsyncResponseFlag();
- pfuture->setAsyncCallBackStatus(asyncCallBackStatus_response);
- cancelTimerCallback(opaque);
- pfuture->executeInvokeCallback();
+ LOG_DEBUG("processResponseCommand, code:%d, opaque:%d, maxRetryTimes:%d, retrySendTimes:%d", code, opaque,
+ pFuture->getMaxRetrySendTimes(), pFuture->getRetrySendTimes());
+
+ if (!pFuture->setResponse(pCmd)) {
+ // this branch is unreachable normally.
+ LOG_WARN("response already timeout of opaque:%d", opaque);
+ deleteAndZero(pCmd);
+ return;
+ }
+
+ if (pFuture->getAsyncFlag()) {
+ cancelTimerCallback(opaque);
+ pFuture->invokeCompleteCallback();
+ }
+}
+
+void TcpRemotingClient::handleAsyncRequestTimeout(const boost::system::error_code& e, int opaque) {
+ if (e == boost::asio::error::operation_aborted) {
+ LOG_DEBUG("handleAsyncRequestTimeout aborted opaque:%d, e_code:%d, msg:%s", opaque, e.value(), e.message().data());
+ return;
+ }
+
+ LOG_DEBUG("handleAsyncRequestTimeout opaque:%d, e_code:%d, msg:%s", opaque, e.value(), e.message().data());
+
+ std::shared_ptr<ResponseFuture> pFuture(findAndDeleteAsyncResponseFuture(opaque));
+ if (pFuture) {
+ LOG_ERROR("no response got for opaque:%d", opaque);
+ eraseTimerCallback(opaque);
+ if (pFuture->getAsyncCallbackWrap()) {
+ pFuture->invokeExceptionCallback();
}
}
}
@@ -575,16 +544,16 @@ void TcpRemotingClient::processRequestCommand(RemotingCommand* pCmd, const strin
}
}
-void TcpRemotingClient::addResponseFuture(int opaque, boost::shared_ptr<ResponseFuture> pfuture) {
- boost::lock_guard<boost::mutex> lock(m_futureTableMutex);
- m_futureTable[opaque] = pfuture;
+void TcpRemotingClient::addResponseFuture(int opaque, std::shared_ptr<ResponseFuture> pFuture) {
+ std::lock_guard<std::mutex> lock(m_futureTableLock);
+ m_futureTable[opaque] = pFuture;
}
// Note: after call this function, shared_ptr of m_syncFutureTable[opaque] will
// be erased, so caller must ensure the life cycle of returned shared_ptr;
-boost::shared_ptr<ResponseFuture> TcpRemotingClient::findAndDeleteResponseFuture(int opaque) {
- boost::lock_guard<boost::mutex> lock(m_futureTableMutex);
- boost::shared_ptr<ResponseFuture> pResponseFuture;
+std::shared_ptr<ResponseFuture> TcpRemotingClient::findAndDeleteResponseFuture(int opaque) {
+ std::lock_guard<std::mutex> lock(m_futureTableLock);
+ std::shared_ptr<ResponseFuture> pResponseFuture;
if (m_futureTable.find(opaque) != m_futureTable.end()) {
pResponseFuture = m_futureTable[opaque];
m_futureTable.erase(opaque);
@@ -592,42 +561,20 @@ boost::shared_ptr<ResponseFuture> TcpRemotingClient::findAndDeleteResponseFuture
return pResponseFuture;
}
-void TcpRemotingClient::handleAsyncPullForResponseTimeout(const boost::system::error_code& e, int opaque) {
- if (e == boost::asio::error::operation_aborted) {
- LOG_INFO("handleAsyncPullForResponseTimeout aborted opaque:%d, e_code:%d, msg:%s", opaque, e.value(),
- e.message().data());
- return;
- }
-
- LOG_DEBUG("handleAsyncPullForResponseTimeout opaque:%d, e_code:%d, msg:%s", opaque, e.value(), e.message().data());
- boost::shared_ptr<ResponseFuture> pFuture(findAndDeleteAsyncResponseFuture(opaque));
- if (pFuture && pFuture->getASyncFlag() && (pFuture->getAsyncCallbackWrap())) {
- if ((pFuture->getAsyncResponseFlag() != true)) // if no response received, then check timeout or not
- {
- LOG_ERROR("no response got for opaque:%d", opaque);
- pFuture->setAsyncCallBackStatus(asyncCallBackStatus_timeout);
- pFuture->executeInvokeCallbackException();
- }
- }
-
- eraseTimerCallback(opaque);
-}
-
-void TcpRemotingClient::addAsyncResponseFuture(int opaque, boost::shared_ptr<ResponseFuture> pfuture) {
- boost::lock_guard<boost::mutex> lock(m_asyncFutureLock);
- m_asyncFutureTable[opaque] = pfuture;
+void TcpRemotingClient::addAsyncResponseFuture(int opaque, std::shared_ptr<ResponseFuture> pFuture) {
+ std::lock_guard<std::mutex> lock(m_asyncFutureTableLock);
+ m_asyncFutureTable[opaque] = pFuture;
}
// Note: after call this function, shared_ptr of m_asyncFutureTable[opaque] will
// be erased, so caller must ensure the life cycle of returned shared_ptr;
-boost::shared_ptr<ResponseFuture> TcpRemotingClient::findAndDeleteAsyncResponseFuture(int opaque) {
- boost::lock_guard<boost::mutex> lock(m_asyncFutureLock);
- boost::shared_ptr<ResponseFuture> pResponseFuture;
+std::shared_ptr<ResponseFuture> TcpRemotingClient::findAndDeleteAsyncResponseFuture(int opaque) {
+ std::lock_guard<std::mutex> lock(m_asyncFutureTableLock);
+ std::shared_ptr<ResponseFuture> pResponseFuture;
if (m_asyncFutureTable.find(opaque) != m_asyncFutureTable.end()) {
pResponseFuture = m_asyncFutureTable[opaque];
m_asyncFutureTable.erase(opaque);
}
-
return pResponseFuture;
}
@@ -638,56 +585,64 @@ void TcpRemotingClient::registerProcessor(MQRequestCode requestCode, ClientRemot
}
void TcpRemotingClient::addTimerCallback(boost::asio::deadline_timer* t, int opaque) {
- boost::lock_guard<boost::mutex> lock(m_timerMapMutex);
- if (m_async_timer_map.find(opaque) != m_async_timer_map.end()) {
+ std::lock_guard<std::mutex> lock(m_asyncTimerTableLock);
+ if (m_asyncTimerTable.find(opaque) != m_asyncTimerTable.end()) {
LOG_DEBUG("addTimerCallback:erase timerCallback opaque:%lld", opaque);
- boost::asio::deadline_timer* old_t = m_async_timer_map[opaque];
- old_t->cancel();
+ boost::asio::deadline_timer* old_t = m_asyncTimerTable[opaque];
+ m_asyncTimerTable.erase(opaque);
+ try {
+ old_t->cancel();
+ } catch (const std::exception& ec) {
+ LOG_WARN("encounter exception when cancel old timer: %s", ec.what());
+ }
delete old_t;
- old_t = NULL;
- m_async_timer_map.erase(opaque);
}
- m_async_timer_map[opaque] = t;
+ m_asyncTimerTable[opaque] = t;
}
void TcpRemotingClient::eraseTimerCallback(int opaque) {
- boost::lock_guard<boost::mutex> lock(m_timerMapMutex);
- if (m_async_timer_map.find(opaque) != m_async_timer_map.end()) {
+ std::lock_guard<std::mutex> lock(m_asyncTimerTableLock);
+ if (m_asyncTimerTable.find(opaque) != m_asyncTimerTable.end()) {
LOG_DEBUG("eraseTimerCallback: opaque:%lld", opaque);
- boost::asio::deadline_timer* t = m_async_timer_map[opaque];
+ boost::asio::deadline_timer* t = m_asyncTimerTable[opaque];
+ m_asyncTimerTable.erase(opaque);
delete t;
- t = NULL;
- m_async_timer_map.erase(opaque);
}
}
void TcpRemotingClient::cancelTimerCallback(int opaque) {
- boost::lock_guard<boost::mutex> lock(m_timerMapMutex);
- if (m_async_timer_map.find(opaque) != m_async_timer_map.end()) {
+ std::lock_guard<std::mutex> lock(m_asyncTimerTableLock);
+ if (m_asyncTimerTable.find(opaque) != m_asyncTimerTable.end()) {
LOG_DEBUG("cancelTimerCallback: opaque:%lld", opaque);
- boost::asio::deadline_timer* t = m_async_timer_map[opaque];
- t->cancel();
+ boost::asio::deadline_timer* t = m_asyncTimerTable[opaque];
+ m_asyncTimerTable.erase(opaque);
+ try {
+ t->cancel();
+ } catch (const std::exception& ec) {
+ LOG_WARN("encounter exception when cancel timer: %s", ec.what());
+ }
delete t;
- t = NULL;
- m_async_timer_map.erase(opaque);
}
}
void TcpRemotingClient::removeAllTimerCallback() {
- boost::lock_guard<boost::mutex> lock(m_timerMapMutex);
- for (asyncTimerMap::iterator it = m_async_timer_map.begin(); it != m_async_timer_map.end(); ++it) {
- boost::asio::deadline_timer* t = it->second;
- t->cancel();
+ std::lock_guard<std::mutex> lock(m_asyncTimerTableLock);
+ for (const auto& timer : m_asyncTimerTable) {
+ boost::asio::deadline_timer* t = timer.second;
+ try {
+ t->cancel();
+ } catch (const std::exception& ec) {
+ LOG_WARN("encounter exception when cancel timer: %s", ec.what());
+ }
delete t;
- t = NULL;
}
- m_async_timer_map.clear();
+ m_asyncTimerTable.clear();
}
void TcpRemotingClient::deleteOpaqueForDropPullRequest(const MQMessageQueue& mq, int opaque) {
- // delete the map record of opaque<->ResponseFuture, so the answer for the pull request will discard when receive it
- // later
- boost::shared_ptr<ResponseFuture> pFuture(findAndDeleteAsyncResponseFuture(opaque));
+ // delete the map record of opaque<->ResponseFuture, so the answer for the pull request will
+ // discard when receive it later
+ std::shared_ptr<ResponseFuture> pFuture(findAndDeleteAsyncResponseFuture(opaque));
if (!pFuture) {
pFuture = findAndDeleteResponseFuture(opaque);
if (pFuture) {
@@ -695,9 +650,9 @@ void TcpRemotingClient::deleteOpaqueForDropPullRequest(const MQMessageQueue& mq,
}
} else {
LOG_DEBUG("succ deleted the async pullrequest for opaque:%d, mq:%s", opaque, mq.toString().data());
+ // delete the timeout timer for opaque for pullrequest
+ cancelTimerCallback(opaque);
}
- // delete the timeout timer for opaque for pullrequest
- cancelTimerCallback(opaque);
}
//<!************************************************************************
diff --git a/src/transport/TcpRemotingClient.h b/src/transport/TcpRemotingClient.h
old mode 100644
new mode 100755
index 6085f7e..82f1155
--- a/src/transport/TcpRemotingClient.h
+++ b/src/transport/TcpRemotingClient.h
@@ -1,127 +1,135 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#ifndef __TCPREMOTINGCLIENT_H__
-#define __TCPREMOTINGCLIENT_H__
-
-#include <boost/asio.hpp>
-#include <boost/asio/io_service.hpp>
-#include <boost/bind.hpp>
-#include <boost/date_time/posix_time/posix_time.hpp>
-#include <boost/weak_ptr.hpp>
-#include <map>
-#include "ClientRemotingProcessor.h"
-#include "RemotingCommand.h"
-#include "ResponseFuture.h"
-#include "SocketUtil.h"
-#include "TcpTransport.h"
-
-namespace rocketmq {
-//<!************************************************************************
-
-class TcpRemotingClient {
- public:
- TcpRemotingClient(int pullThreadNum, uint64_t tcpConnectTimeout, uint64_t tcpTransportTryLockTimeout);
- virtual ~TcpRemotingClient();
- void stopAllTcpTransportThread();
- void updateNameServerAddressList(const string& addrs);
-
- //<!delete outsite;
- RemotingCommand* invokeSync(const string& addr, RemotingCommand& request, int timeoutMillis = 3000);
-
- bool invokeHeartBeat(const string& addr, RemotingCommand& request);
-
- bool invokeAsync(const string& addr,
- RemotingCommand& request,
- AsyncCallbackWrap* cbw,
- int64 timeoutMilliseconds,
- int maxRetrySendTimes = 1,
- int retrySendTimes = 1);
- void invokeOneway(const string& addr, RemotingCommand& request);
-
- void ProcessData(const MemoryBlock& mem, const string& addr);
-
- void registerProcessor(MQRequestCode requestCode, ClientRemotingProcessor* clientRemotingProcessor);
-
- void boost_asio_work();
- void handleAsyncPullForResponseTimeout(const boost::system::error_code& e, int opaque);
- void deleteOpaqueForDropPullRequest(const MQMessageQueue& mq, int opaque);
-
- private:
- static void static_messageReceived(void* context, const MemoryBlock& mem, const string& addr);
- void messageReceived(const MemoryBlock& mem, const string& addr);
- boost::shared_ptr<TcpTransport> GetTransport(const string& addr, bool needRespons);
- boost::shared_ptr<TcpTransport> CreateTransport(const string& addr, bool needRespons);
- boost::shared_ptr<TcpTransport> CreateNameserverTransport(bool needRespons);
- void CloseTransport(const string& addr, boost::shared_ptr<TcpTransport> pTcp);
- void CloseNameServerTransport(boost::shared_ptr<TcpTransport> pTcp);
- bool SendCommand(boost::shared_ptr<TcpTransport> pTts, RemotingCommand& msg);
- void processRequestCommand(RemotingCommand* pCmd, const string& addr);
- void processResponseCommand(RemotingCommand* pCmd, boost::shared_ptr<ResponseFuture> pfuture);
-
- void addResponseFuture(int opaque, boost::shared_ptr<ResponseFuture> pfuture);
- boost::shared_ptr<ResponseFuture> findAndDeleteResponseFuture(int opaque);
-
- void addAsyncResponseFuture(int opaque, boost::shared_ptr<ResponseFuture> pfuture);
- boost::shared_ptr<ResponseFuture> findAndDeleteAsyncResponseFuture(int opaque);
-
- void addTimerCallback(boost::asio::deadline_timer* t, int opaque);
- void eraseTimerCallback(int opaque);
- void cancelTimerCallback(int opaque);
- void removeAllTimerCallback();
-
- private:
- typedef map<string, boost::shared_ptr<TcpTransport>> TcpMap;
- typedef map<int, boost::shared_ptr<ResponseFuture>> ResMap;
-
- typedef map<int, ClientRemotingProcessor*> RequestMap;
- RequestMap m_requestTable;
-
- boost::mutex m_futureTableMutex;
- ResMap m_futureTable; //<! id->future;
-
- ResMap m_asyncFutureTable;
- boost::mutex m_asyncFutureLock;
-
- TcpMap m_tcpTable; //<! ip->tcp;
- boost::timed_mutex m_tcpLock;
-
- // ThreadPool m_threadpool;
- int m_pullThreadNum;
- uint64_t m_tcpConnectTimeout; // ms
- uint64_t m_tcpTransportTryLockTimeout; // s
-
- //<! Nameserver
- boost::timed_mutex m_namesrvlock;
- vector<string> m_namesrvAddrList;
- string m_namesrvAddrChoosed;
- unsigned int m_namesrvIndex;
- boost::asio::io_service m_ioService;
- boost::thread_group m_threadpool;
- boost::asio::io_service::work m_ioServiceWork;
-
- boost::asio::io_service m_async_ioService;
- unique_ptr<boost::thread> m_async_service_thread;
-
- typedef map<int, boost::asio::deadline_timer*> asyncTimerMap;
- boost::mutex m_timerMapMutex;
- asyncTimerMap m_async_timer_map;
-};
-
-//<!************************************************************************
-} //<!end namespace;
-
-#endif
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef __TCPREMOTINGCLIENT_H__
+#define __TCPREMOTINGCLIENT_H__
+
+#include <map>
+#include <mutex>
+
+#include <boost/asio.hpp>
+#include <boost/asio/io_service.hpp>
+#include <boost/bind.hpp>
+#include <boost/date_time/posix_time/posix_time.hpp>
+
+#include "ClientRemotingProcessor.h"
+#include "RemotingCommand.h"
+#include "ResponseFuture.h"
+#include "SocketUtil.h"
+#include "TcpTransport.h"
+
+namespace rocketmq {
+//<!************************************************************************
+
+class TcpRemotingClient {
+ public:
+ TcpRemotingClient(int pullThreadNum, uint64_t tcpConnectTimeout, uint64_t tcpTransportTryLockTimeout);
+ virtual ~TcpRemotingClient();
+
+ void stopAllTcpTransportThread();
+ void updateNameServerAddressList(const string& addrs);
+
+ bool invokeHeartBeat(const string& addr, RemotingCommand& request, int timeoutMillis = 3000);
+
+ // delete outsite;
+ RemotingCommand* invokeSync(const string& addr, RemotingCommand& request, int timeoutMillis = 3000);
+
+ bool invokeAsync(const string& addr,
+ RemotingCommand& request,
+ AsyncCallbackWrap* cbw,
+ int64 timeoutMilliseconds,
+ int maxRetrySendTimes = 1,
+ int retrySendTimes = 1);
+
+ void invokeOneway(const string& addr, RemotingCommand& request);
+
+ void registerProcessor(MQRequestCode requestCode, ClientRemotingProcessor* clientRemotingProcessor);
+
+ void deleteOpaqueForDropPullRequest(const MQMessageQueue& mq, int opaque);
+
+ private:
+ static void static_messageReceived(void* context, const MemoryBlock& mem, const string& addr);
+
+ void messageReceived(const MemoryBlock& mem, const string& addr);
+ void ProcessData(const MemoryBlock& mem, const string& addr);
+ void processRequestCommand(RemotingCommand* pCmd, const string& addr);
+ void processResponseCommand(RemotingCommand* pCmd, std::shared_ptr<ResponseFuture> pFuture);
+ void handleAsyncRequestTimeout(const boost::system::error_code& e, int opaque);
+
+ std::shared_ptr<TcpTransport> GetTransport(const string& addr, bool needResponse);
+ std::shared_ptr<TcpTransport> CreateTransport(const string& addr, bool needResponse);
+ std::shared_ptr<TcpTransport> CreateNameServerTransport(bool needResponse);
+
+ bool CloseTransport(const string& addr, std::shared_ptr<TcpTransport> pTcp);
+ bool CloseNameServerTransport(std::shared_ptr<TcpTransport> pTcp);
+
+ bool SendCommand(std::shared_ptr<TcpTransport> pTts, RemotingCommand& msg);
+
+ void addResponseFuture(int opaque, std::shared_ptr<ResponseFuture> pFuture);
+ std::shared_ptr<ResponseFuture> findAndDeleteResponseFuture(int opaque);
+
+ void addAsyncResponseFuture(int opaque, std::shared_ptr<ResponseFuture> pFuture);
+ std::shared_ptr<ResponseFuture> findAndDeleteAsyncResponseFuture(int opaque);
+
+ void addTimerCallback(boost::asio::deadline_timer* t, int opaque);
+ void eraseTimerCallback(int opaque);
+ void cancelTimerCallback(int opaque);
+ void removeAllTimerCallback();
+
+ void boost_asio_work();
+
+ private:
+ using RequestMap = map<int, ClientRemotingProcessor*>;
+ using TcpMap = map<string, std::shared_ptr<TcpTransport>>;
+ using ResMap = map<int, std::shared_ptr<ResponseFuture>>;
+ using AsyncTimerMap = map<int, boost::asio::deadline_timer*>;
+
+ RequestMap m_requestTable;
+
+ TcpMap m_tcpTable; //<! addr->tcp;
+ std::timed_mutex m_tcpTableLock;
+
+ ResMap m_futureTable; //<! id->future;
+ std::mutex m_futureTableLock;
+
+ ResMap m_asyncFutureTable;
+ std::mutex m_asyncFutureTableLock;
+
+ AsyncTimerMap m_asyncTimerTable;
+ std::mutex m_asyncTimerTableLock;
+
+ int m_pullThreadNum;
+ uint64_t m_tcpConnectTimeout; // ms
+ uint64_t m_tcpTransportTryLockTimeout; // s
+
+ //<! NameServer
+ std::timed_mutex m_namesrvLock;
+ vector<string> m_namesrvAddrList;
+ string m_namesrvAddrChoosed;
+ unsigned int m_namesrvIndex;
+
+ boost::asio::io_service m_ioService;
+ boost::asio::io_service::work m_ioServiceWork;
+ boost::thread_group m_threadpool;
+
+ boost::asio::io_service m_async_ioService;
+ unique_ptr<boost::thread> m_async_service_thread;
+};
+
+//<!************************************************************************
+} // namespace rocketmq
+
+#endif
diff --git a/src/transport/TcpTransport.cpp b/src/transport/TcpTransport.cpp
index 701cd8a..c56e5d1 100644
--- a/src/transport/TcpTransport.cpp
+++ b/src/transport/TcpTransport.cpp
@@ -15,11 +15,15 @@
* limitations under the License.
*/
#include "TcpTransport.h"
+
+#include <chrono>
+
#ifndef WIN32
#include <arpa/inet.h> // for sockaddr_in and inet_ntoa...
#include <netinet/tcp.h>
#include <sys/socket.h> // for socket(), bind(), and connect()...
#endif
+
#include "Logging.h"
#include "TcpRemotingClient.h"
#include "UtilAll.h"
@@ -27,99 +31,56 @@
namespace rocketmq {
//<!************************************************************************
-TcpTransport::TcpTransport(TcpRemotingClient* pTcpRemointClient, READ_CALLBACK handle /* = NULL */)
- : m_tcpConnectStatus(e_connectInit),
- m_event_base_status(false),
- m_event_base_mtx(),
- m_event_base_cv(),
- m_ReadDatathread(NULL),
- m_readcallback(handle),
+TcpTransport::TcpTransport(TcpRemotingClient* pTcpRemointClient, TcpTransportReadCallback handle)
+ : m_event(nullptr),
+ m_tcpConnectStatus(TCP_CONNECT_STATUS_INIT),
+ m_connectEventLock(),
+ m_connectEvent(),
+ m_readCallback(handle),
m_tcpRemotingClient(pTcpRemointClient) {
m_startTime = UtilAll::currentTimeMillis();
-#ifdef WIN32
- evthread_use_windows_threads();
-#else
- evthread_use_pthreads();
-#endif
- m_eventBase = NULL;
- m_bufferEvent = NULL;
}
+
TcpTransport::~TcpTransport() {
- m_readcallback = NULL;
- m_bufferEvent = NULL;
- m_eventBase = NULL;
+ freeBufferEvent();
+ m_readCallback = nullptr;
}
-tcpConnectStatus TcpTransport::connect(const string& strServerURL, int timeOutMillisecs /* = 3000 */) {
- string hostName;
- short portNumber;
- LOG_DEBUG("connect to [%s].", strServerURL.c_str());
- if (!UtilAll::SplitURL(strServerURL, hostName, portNumber)) {
- LOG_INFO("connect to [%s] failed, Invalid url.", strServerURL.c_str());
- return e_connectFail;
- }
-
- boost::lock_guard<boost::mutex> lock(m_socketLock);
-
- struct sockaddr_in sin;
- memset(&sin, 0, sizeof(sin));
- sin.sin_family = AF_INET;
- sin.sin_addr.s_addr = getInetAddr(hostName);
-
- sin.sin_port = htons(portNumber);
-
- m_eventBase = event_base_new();
- m_bufferEvent = bufferevent_socket_new(m_eventBase, -1, BEV_OPT_CLOSE_ON_FREE | BEV_OPT_THREADSAFE);
- bufferevent_setcb(m_bufferEvent, readNextMessageIntCallback, NULL, eventcb, this);
- bufferevent_enable(m_bufferEvent, EV_READ | EV_WRITE);
- bufferevent_setwatermark(m_bufferEvent, EV_READ, 4, 0);
-
- setTcpConnectStatus(e_connectWaitResponse);
- if (bufferevent_socket_connect(m_bufferEvent, (struct sockaddr*)&sin, sizeof(sin)) < 0) {
- LOG_INFO("connect to fd:%d failed", bufferevent_getfd(m_bufferEvent));
- setTcpConnectStatus(e_connectFail);
- freeBufferEvent();
- return e_connectFail;
- } else {
- int fd = bufferevent_getfd(m_bufferEvent);
- LOG_INFO("try to connect to fd:%d, addr:%s", fd, (hostName.c_str()));
-
- evthread_make_base_notifiable(m_eventBase);
-
- m_ReadDatathread = new boost::thread(boost::bind(&TcpTransport::runThread, this));
-
- while (!m_event_base_status) {
- LOG_INFO("Wait till event base is looping");
- boost::system_time const timeout = boost::get_system_time() + boost::posix_time::milliseconds(1000);
- boost::unique_lock<boost::mutex> lock(m_event_base_mtx);
- m_event_base_cv.timed_wait(lock, timeout);
- }
+void TcpTransport::freeBufferEvent() {
+ // freeBufferEvent is idempotent.
- return e_connectWaitResponse;
+ // first, unlink BufferEvent
+ if (m_event != nullptr) {
+ m_event->setCallback(nullptr, nullptr, nullptr, nullptr);
}
+
+ // then, release BufferEvent
+ m_event.reset();
}
-void TcpTransport::setTcpConnectStatus(tcpConnectStatus connectStatus) {
+void TcpTransport::setTcpConnectStatus(TcpConnectStatus connectStatus) {
m_tcpConnectStatus = connectStatus;
}
-tcpConnectStatus TcpTransport::getTcpConnectStatus() {
+TcpConnectStatus TcpTransport::getTcpConnectStatus() {
return m_tcpConnectStatus;
}
-tcpConnectStatus TcpTransport::waitTcpConnectEvent(int timeoutMillisecs) {
- boost::unique_lock<boost::mutex> lk(m_connectEventLock);
- if (!m_connectEvent.timed_wait(lk, boost::posix_time::milliseconds(timeoutMillisecs))) {
- LOG_INFO("connect timeout");
+TcpConnectStatus TcpTransport::waitTcpConnectEvent(int timeoutMillis) {
+ std::unique_lock<std::mutex> eventLock(m_connectEventLock);
+ if (m_tcpConnectStatus == TCP_CONNECT_STATUS_WAIT) {
+ if (m_connectEvent.wait_for(eventLock, std::chrono::milliseconds(timeoutMillis)) == std::cv_status::timeout) {
+ LOG_INFO("connect timeout");
+ }
}
- return getTcpConnectStatus();
+ return m_tcpConnectStatus;
}
-void TcpTransport::setTcpConnectEvent(tcpConnectStatus connectStatus) {
- tcpConnectStatus baseStatus(getTcpConnectStatus());
- setTcpConnectStatus(connectStatus);
- if (baseStatus == e_connectWaitResponse) {
- LOG_INFO("received libevent callback event");
+// internal method
+void TcpTransport::setTcpConnectEvent(TcpConnectStatus connectStatus) {
+ TcpConnectStatus baseStatus = m_tcpConnectStatus.exchange(connectStatus, std::memory_order_relaxed);
+ if (baseStatus == TCP_CONNECT_STATUS_WAIT) {
+ std::unique_lock<std::mutex> eventLock(m_connectEventLock);
m_connectEvent.notify_all();
}
}
@@ -165,129 +126,109 @@ u_long TcpTransport::getInetAddr(string& hostname) {
}
void TcpTransport::disconnect(const string& addr) {
- boost::lock_guard<boost::mutex> lock(m_socketLock);
- if (getTcpConnectStatus() != e_connectInit) {
- clearBufferEventCallback();
- LOG_INFO("disconnect:%s start", addr.c_str());
- m_connectEvent.notify_all();
- setTcpConnectStatus(e_connectInit);
- if (m_ReadDatathread) {
- m_ReadDatathread->interrupt();
- exitBaseDispatch();
- while (m_ReadDatathread->timed_join(boost::posix_time::seconds(1)) == false) {
- LOG_WARN("join readDataThread fail, retry");
- m_ReadDatathread->interrupt();
- exitBaseDispatch();
- }
- delete m_ReadDatathread;
- m_ReadDatathread = NULL;
- }
+ // disconnect is idempotent.
+ std::lock_guard<std::mutex> lock(m_eventLock);
+ if (getTcpConnectStatus() != TCP_CONNECT_STATUS_INIT) {
+ LOG_INFO("disconnect:%s start. event:%p", addr.c_str(), m_event.get());
freeBufferEvent();
+ setTcpConnectEvent(TCP_CONNECT_STATUS_INIT);
LOG_INFO("disconnect:%s completely", addr.c_str());
}
}
-void TcpTransport::clearBufferEventCallback() {
- if (m_bufferEvent) {
- // Bufferevents are internally reference-counted, so if the bufferevent has
- // pending deferred callbacks when you free it, it won't be deleted until
- // the callbacks are done.
- // so just empty callback to avoid future callback by libevent
- bufferevent_setcb(m_bufferEvent, NULL, NULL, NULL, NULL);
+TcpConnectStatus TcpTransport::connect(const string& strServerURL, int timeoutMillis) {
+ string hostname;
+ short port;
+ LOG_DEBUG("connect to [%s].", strServerURL.c_str());
+ if (!UtilAll::SplitURL(strServerURL, hostname, port)) {
+ LOG_INFO("connect to [%s] failed, Invalid url.", strServerURL.c_str());
+ return TCP_CONNECT_STATUS_FAILED;
}
-}
-void TcpTransport::freeBufferEvent() {
- if (m_bufferEvent) {
- bufferevent_free(m_bufferEvent);
- m_bufferEvent = NULL;
- }
- if (m_eventBase) {
- event_base_free(m_eventBase);
- m_eventBase = NULL;
- }
-}
-void TcpTransport::exitBaseDispatch() {
- if (m_eventBase) {
- event_base_loopbreak(m_eventBase);
- // event_base_loopexit(m_eventBase, NULL); //Note: memory leak will be
- // occured when timer callback was not done;
+ {
+ std::lock_guard<std::mutex> lock(m_eventLock);
+
+ struct sockaddr_in sin;
+ memset(&sin, 0, sizeof(sin));
+ sin.sin_family = AF_INET;
+ sin.sin_addr.s_addr = getInetAddr(hostname);
+ sin.sin_port = htons(port);
+
+ m_event.reset(EventLoop::GetDefaultEventLoop()->createBufferEvent(-1, BEV_OPT_CLOSE_ON_FREE | BEV_OPT_THREADSAFE));
+ m_event->setCallback(readNextMessageIntCallback, nullptr, eventCallback, shared_from_this());
+ m_event->setWatermark(EV_READ, 4, 0);
+ m_event->enable(EV_READ | EV_WRITE);
+
+ setTcpConnectStatus(TCP_CONNECT_STATUS_WAIT);
+ if (m_event->connect((struct sockaddr*)&sin, sizeof(sin)) < 0) {
+ LOG_INFO("connect to fd:%d failed", m_event->getfd());
+ freeBufferEvent();
+ setTcpConnectStatus(TCP_CONNECT_STATUS_FAILED);
+ return TCP_CONNECT_STATUS_FAILED;
+ }
}
-}
-void TcpTransport::runThread() {
- if (m_eventBase != NULL) {
- if (!m_event_base_status) {
- boost::mutex::scoped_lock lock(m_event_base_mtx);
- m_event_base_status.store(true);
- m_event_base_cv.notify_all();
- LOG_INFO("Notify on event_base_dispatch");
- }
- event_base_dispatch(m_eventBase);
- // event_base_loop(m_eventBase, EVLOOP_ONCE);//EVLOOP_NONBLOCK should not
- // be used, as could not callback event immediatly
+ if (timeoutMillis <= 0) {
+ LOG_INFO("try to connect to fd:%d, addr:%s", m_event->getfd(), hostname.c_str());
+ return TCP_CONNECT_STATUS_WAIT;
}
- LOG_INFO("event_base_dispatch exit once");
- boost::this_thread::sleep(boost::posix_time::milliseconds(1));
- if (getTcpConnectStatus() != e_connectSuccess)
- return;
-}
-void TcpTransport::timeoutcb(evutil_socket_t fd, short what, void* arg) {
- LOG_INFO("timeoutcb: received event:%d on fd:%d", what, fd);
- TcpTransport* tcpTrans = (TcpTransport*)arg;
- if (tcpTrans->getTcpConnectStatus() != e_connectSuccess) {
- LOG_INFO("timeoutcb: after connect time, tcp was not established on fd:%d", fd);
- tcpTrans->setTcpConnectStatus(e_connectFail);
- } else {
- LOG_INFO("timeoutcb: after connect time, tcp was established on fd:%d", fd);
+ TcpConnectStatus connectStatus = waitTcpConnectEvent(timeoutMillis);
+ if (connectStatus != TCP_CONNECT_STATUS_SUCCESS) {
+ LOG_WARN("can not connect to server:%s", strServerURL.c_str());
+
+ std::lock_guard<std::mutex> lock(m_eventLock);
+ freeBufferEvent();
+ setTcpConnectStatus(TCP_CONNECT_STATUS_FAILED);
+ return TCP_CONNECT_STATUS_FAILED;
}
+
+ return TCP_CONNECT_STATUS_SUCCESS;
}
-void TcpTransport::eventcb(struct bufferevent* bev, short what, void* ctx) {
- evutil_socket_t fd = bufferevent_getfd(bev);
- TcpTransport* tcpTrans = (TcpTransport*)ctx;
+void TcpTransport::eventCallback(BufferEvent* event, short what, TcpTransport* transport) {
+ socket_t fd = event->getfd();
LOG_INFO("eventcb: received event:%x on fd:%d", what, fd);
if (what & BEV_EVENT_CONNECTED) {
+ LOG_INFO("eventcb: connect to fd:%d successfully", fd);
+
+ // disable Nagle
int val = 1;
- setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, (char*)&val, sizeof(val));
- LOG_INFO("eventcb:connect to fd:%d successfully", fd);
- tcpTrans->setTcpConnectEvent(e_connectSuccess);
+ setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, (void*)&val, sizeof(val));
+ transport->setTcpConnectEvent(TCP_CONNECT_STATUS_SUCCESS);
} else if (what & (BEV_EVENT_ERROR | BEV_EVENT_EOF | BEV_EVENT_READING | BEV_EVENT_WRITING)) {
- LOG_INFO("eventcb:rcv error event cb:%x on fd:%d", what, fd);
- tcpTrans->setTcpConnectEvent(e_connectFail);
- bufferevent_setcb(bev, NULL, NULL, NULL, NULL);
- // bufferevent_disable(bev, EV_READ|EV_WRITE);
- // bufferevent_free(bev);
+ LOG_INFO("eventcb: received error event cb:%x on fd:%d", what, fd);
+ // if error, stop callback.
+ event->setCallback(nullptr, nullptr, nullptr, nullptr);
+ transport->setTcpConnectEvent(TCP_CONNECT_STATUS_FAILED);
} else {
LOG_ERROR("eventcb: received error event:%d on fd:%d", what, fd);
}
}
-void TcpTransport::readNextMessageIntCallback(struct bufferevent* bev, void* ctx) {
+void TcpTransport::readNextMessageIntCallback(BufferEvent* event, TcpTransport* transport) {
/* This callback is invoked when there is data to read on bev. */
// protocol: <length> <header length> <header data> <body data>
- // 1 2 3 4
+ // 1 2 3 4
// rocketmq protocol contains 4 parts as following:
// 1, big endian 4 bytes int, its length is sum of 2,3 and 4
// 2, big endian 4 bytes int, its length is 3
// 3, use json to serialization data
- // 4, application could self-defination binary data
+ // 4, application could self-defined binary data
- struct evbuffer* input = bufferevent_get_input(bev);
+ struct evbuffer* input = event->getInput();
while (1) {
struct evbuffer_iovec v[4];
int n = evbuffer_peek(input, 4, NULL, v, sizeof(v) / sizeof(v[0]));
- int idx = 0;
char hdr[4];
char* p = hdr;
- unsigned int needed = 4;
+ size_t needed = 4;
- for (idx = 0; idx < n; idx++) {
- if (needed) {
- unsigned int tmp = needed < v[idx].iov_len ? needed : v[idx].iov_len;
+ for (int idx = 0; idx < n; idx++) {
+ if (needed > 0) {
+ size_t tmp = needed < v[idx].iov_len ? needed : v[idx].iov_len;
memcpy(p, v[idx].iov_base, tmp);
p += tmp;
needed -= tmp;
@@ -296,80 +237,54 @@ void TcpTransport::readNextMessageIntCallback(struct bufferevent* bev, void* ctx
}
}
- if (needed) {
- LOG_DEBUG(" too little data received with sum = %d ", 4 - needed);
+ if (needed > 0) {
+ LOG_DEBUG("too little data received with sum = %d", 4 - needed);
return;
}
- uint32 totalLenOfOneMsg = *(uint32*)hdr; // first 4 bytes, which indicates 1st part of protocol
- uint32 bytesInMessage = ntohl(totalLenOfOneMsg);
- LOG_DEBUG("fd:%d, totalLen:" SIZET_FMT ", bytesInMessage:%d", bufferevent_getfd(bev), v[0].iov_len, bytesInMessage);
- uint32 len = evbuffer_get_length(input);
- if (len >= bytesInMessage + 4) {
- LOG_DEBUG("had received all data with len:%d from fd:%d", len, bufferevent_getfd(bev));
+ uint32 totalLenOfOneMsg = *(uint32*)hdr; // first 4 bytes, which indicates 1st part of protocol
+ uint32 msgLen = ntohl(totalLenOfOneMsg);
+ size_t recvLen = evbuffer_get_length(input);
+ if (recvLen >= msgLen + 4) {
+ LOG_DEBUG("had received all data. msgLen:%d, from:%d, recvLen:%d", msgLen, event->getfd(), recvLen);
} else {
- LOG_DEBUG("didn't received whole bytesInMessage:%d, from fd:%d, totalLen:%d", bytesInMessage,
- bufferevent_getfd(bev), len);
+ LOG_DEBUG("didn't received whole. msgLen:%d, from:%d, recvLen:%d", msgLen, event->getfd(), recvLen);
return; // consider large data which was not received completely by now
}
- if (bytesInMessage > 0) {
- MemoryBlock messageData(bytesInMessage, true);
- uint32 bytesRead = 0;
- char* data = messageData.getData() + bytesRead;
- bufferevent_read(bev, data, 4);
- bytesRead = bufferevent_read(bev, data, bytesInMessage);
+ if (msgLen > 0) {
+ MemoryBlock msg(msgLen, true);
- TcpTransport* tcpTrans = (TcpTransport*)ctx;
- tcpTrans->messageReceived(messageData);
+ event->read(hdr, 4); // skip length field
+ size_t bytesRead = event->read(msg.getData(), msgLen);
+
+ transport->messageReceived(msg, event->getPeerAddrPort());
}
}
}
-bool TcpTransport::sendMessage(const char* pData, int len) {
- boost::lock_guard<boost::mutex> lock(m_socketLock);
- if (getTcpConnectStatus() != e_connectSuccess) {
- return false;
- }
-
- int bytes_left = len;
- int bytes_written = 0;
- const char* ptr = pData;
-
- /*NOTE:
- 1. do not need to consider large data which could not send by once, as
- bufferevent could handle this case;
- */
- if (m_bufferEvent) {
- bytes_written = bufferevent_write(m_bufferEvent, ptr, bytes_left);
- if (bytes_written == 0)
- return true;
- else
- return false;
+void TcpTransport::messageReceived(const MemoryBlock& mem, const std::string& addr) {
+ if (m_readCallback != nullptr) {
+ m_readCallback(m_tcpRemotingClient, mem, addr);
}
- return false;
}
-void TcpTransport::messageReceived(const MemoryBlock& mem) {
- if (m_readcallback) {
- m_readcallback(m_tcpRemotingClient, mem, getPeerAddrAndPort());
+bool TcpTransport::sendMessage(const char* pData, size_t len) {
+ std::lock_guard<std::mutex> lock(m_eventLock);
+ if (getTcpConnectStatus() != TCP_CONNECT_STATUS_SUCCESS) {
+ return false;
}
+
+ /* NOTE:
+ do not need to consider large data which could not send by once, as
+ bufferevent could handle this case;
+ */
+ return m_event != nullptr && m_event->write(pData, len) == 0;
}
const string TcpTransport::getPeerAddrAndPort() {
- struct sockaddr_in broker;
- socklen_t cLen = sizeof(broker);
-
- // getsockname(m_socket->getRawSocketHandle(), (struct sockaddr*) &s, &sLen);
- // // ! use connectSock here.
- getpeername(bufferevent_getfd(m_bufferEvent), (struct sockaddr*)&broker, &cLen); // ! use connectSock here.
- LOG_DEBUG("broker addr: %s, broker port: %d", inet_ntoa(broker.sin_addr), ntohs(broker.sin_port));
- string brokerAddr(inet_ntoa(broker.sin_addr));
- brokerAddr.append(":");
- string brokerPort(UtilAll::to_string(ntohs(broker.sin_port)));
- brokerAddr.append(brokerPort);
- LOG_DEBUG("brokerAddr:%s", brokerAddr.c_str());
- return brokerAddr;
+ std::lock_guard<std::mutex> lock(m_eventLock);
+ return m_event ? m_event->getPeerAddrPort() : "";
}
const uint64_t TcpTransport::getStartTime() const {
diff --git a/src/transport/TcpTransport.h b/src/transport/TcpTransport.h
old mode 100644
new mode 100755
index cda03ca..bff23dd
--- a/src/transport/TcpTransport.h
+++ b/src/transport/TcpTransport.h
@@ -17,73 +17,77 @@
#ifndef __TCPTRANSPORT_H__
#define __TCPTRANSPORT_H__
-#include <boost/atomic.hpp>
-#include <boost/thread/condition_variable.hpp>
-#include <boost/thread/mutex.hpp>
-#include <boost/thread/thread.hpp>
-#include "dataBlock.h"
+#include <atomic>
+#include <condition_variable>
+#include <mutex>
-extern "C" {
-#include "event2/buffer.h"
-#include "event2/bufferevent.h"
-#include "event2/event.h"
-#include "event2/thread.h"
-}
+#include "EventLoop.h"
+#include "dataBlock.h"
namespace rocketmq {
+
//<!***************************************************************************
-typedef enum { e_connectInit = 0, e_connectWaitResponse = 1, e_connectSuccess = 2, e_connectFail = 3 } tcpConnectStatus;
+typedef enum TcpConnectStatus {
+ TCP_CONNECT_STATUS_INIT = 0,
+ TCP_CONNECT_STATUS_WAIT = 1,
+ TCP_CONNECT_STATUS_SUCCESS = 2,
+ TCP_CONNECT_STATUS_FAILED = 3
+} TcpConnectStatus;
+
+using TcpTransportReadCallback = void (*)(void* context, const MemoryBlock&, const std::string&);
-typedef void (*READ_CALLBACK)(void* context, const MemoryBlock&, const std::string&);
class TcpRemotingClient;
-class TcpTransport {
+
+class TcpTransport : public std::enable_shared_from_this<TcpTransport> {
public:
- TcpTransport(TcpRemotingClient* pTcpRemointClient, READ_CALLBACK handle = NULL);
+ static std::shared_ptr<TcpTransport> CreateTransport(TcpRemotingClient* pTcpRemotingClient,
+ TcpTransportReadCallback handle = nullptr) {
+ // transport must be managed by smart pointer
+ std::shared_ptr<TcpTransport> transport(new TcpTransport(pTcpRemotingClient, handle));
+ return transport;
+ }
+
virtual ~TcpTransport();
- tcpConnectStatus connect(const std::string& strServerURL, int timeOutMillisecs = 3000);
void disconnect(const std::string& addr);
- tcpConnectStatus waitTcpConnectEvent(int timeoutMillisecs = 3000);
- void setTcpConnectStatus(tcpConnectStatus connectStatus);
- tcpConnectStatus getTcpConnectStatus();
- bool sendMessage(const char* pData, int len);
+ TcpConnectStatus connect(const std::string& strServerURL, int timeoutMillis = 3000);
+ TcpConnectStatus waitTcpConnectEvent(int timeoutMillis = 3000);
+ TcpConnectStatus getTcpConnectStatus();
+
+ bool sendMessage(const char* pData, size_t len);
const std::string getPeerAddrAndPort();
const uint64_t getStartTime() const;
private:
- void messageReceived(const MemoryBlock& mem);
- static void readNextMessageIntCallback(struct bufferevent* bev, void* ctx);
- static void eventcb(struct bufferevent* bev, short what, void* ctx);
- static void timeoutcb(evutil_socket_t fd, short what, void* arg);
- void runThread();
- void clearBufferEventCallback();
- void freeBufferEvent();
- void exitBaseDispatch();
- void setTcpConnectEvent(tcpConnectStatus connectStatus);
+ TcpTransport(TcpRemotingClient* pTcpRemotingClient, TcpTransportReadCallback handle = nullptr);
+
+ static void readNextMessageIntCallback(BufferEvent* event, TcpTransport* transport);
+ static void eventCallback(BufferEvent* event, short what, TcpTransport* transport);
+
+ void messageReceived(const MemoryBlock& mem, const std::string& addr);
+ void freeBufferEvent(); // not thread-safe
+
+ void setTcpConnectEvent(TcpConnectStatus connectStatus);
+ void setTcpConnectStatus(TcpConnectStatus connectStatus);
+
u_long getInetAddr(std::string& hostname);
private:
uint64_t m_startTime;
- boost::mutex m_socketLock;
- struct event_base* m_eventBase;
- struct bufferevent* m_bufferEvent;
- boost::atomic<tcpConnectStatus> m_tcpConnectStatus;
- boost::mutex m_connectEventLock;
- boost::condition_variable_any m_connectEvent;
- boost::atomic<bool> m_event_base_status;
- boost::mutex m_event_base_mtx;
- boost::condition_variable_any m_event_base_cv;
+ std::shared_ptr<BufferEvent> m_event; // NOTE: use m_event in callback is unsafe.
+ std::mutex m_eventLock;
+ std::atomic<TcpConnectStatus> m_tcpConnectStatus;
- //<!read data thread
- boost::thread* m_ReadDatathread;
+ std::mutex m_connectEventLock;
+ std::condition_variable m_connectEvent;
//<! read data callback
- READ_CALLBACK m_readcallback;
+ TcpTransportReadCallback m_readCallback;
TcpRemotingClient* m_tcpRemotingClient;
};
//<!************************************************************************
-} //<!end namespace;
+} // namespace rocketmq
#endif