You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by if...@apache.org on 2021/03/24 09:06:11 UTC
[rocketmq-client-cpp] 03/04: feat: support ipv6
This is an automated email from the ASF dual-hosted git repository.
ifplusor pushed a commit to branch re_dev
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-cpp.git
commit 40882dc35e8f6bc62edd2fb03864ef031e72c8b3
Author: James Yin <yw...@hotmail.com>
AuthorDate: Thu Mar 11 15:59:04 2021 +0800
feat: support ipv6
---
src/ClientRemotingProcessor.cpp | 6 +-
src/MQClientConfigImpl.hpp | 3 +-
src/common/UtilAll.cpp | 53 +------
src/common/UtilAll.h | 8 --
src/consumer/DefaultMQPushConsumerImpl.cpp | 4 +-
src/message/MessageBatch.cpp | 3 +-
src/message/MessageClientIDSetter.cpp | 40 ++++--
src/message/MessageClientIDSetter.h | 2 +-
src/message/MessageDecoder.cpp | 42 +++---
src/message/MessageDecoder.h | 7 +-
src/message/MessageExtImpl.cpp | 26 ++--
src/message/MessageExtImpl.h | 4 +-
src/message/MessageId.h | 16 +--
src/transport/EventLoop.cpp | 6 +-
src/transport/SocketUtil.cpp | 217 ++++++++++++++++++-----------
src/transport/SocketUtil.h | 45 ++++--
16 files changed, 258 insertions(+), 224 deletions(-)
diff --git a/src/ClientRemotingProcessor.cpp b/src/ClientRemotingProcessor.cpp
index cf97306..32366d6 100644
--- a/src/ClientRemotingProcessor.cpp
+++ b/src/ClientRemotingProcessor.cpp
@@ -16,9 +16,9 @@
*/
#include "ClientRemotingProcessor.h"
-#include "MessageDecoder.h"
#include "MQProtos.h"
#include "MessageAccessor.hpp"
+#include "MessageDecoder.h"
#include "MessageSysFlag.h"
#include "RequestFutureTable.h"
#include "SocketUtil.h"
@@ -153,11 +153,11 @@ RemotingCommand* ClientRemotingProcessor::receiveReplyMessage(RemotingCommand* r
msg->set_store_timestamp(requestHeader->store_timestamp());
if (!requestHeader->born_host().empty()) {
- msg->set_born_host(string2SocketAddress(requestHeader->born_host()));
+ msg->set_born_host(StringToSockaddr(requestHeader->born_host()));
}
if (!requestHeader->store_host().empty()) {
- msg->set_store_host(string2SocketAddress(requestHeader->store_host()));
+ msg->set_store_host(StringToSockaddr(requestHeader->store_host()));
}
auto body = request->body();
diff --git a/src/MQClientConfigImpl.hpp b/src/MQClientConfigImpl.hpp
index 59b4601..b9ad503 100644
--- a/src/MQClientConfigImpl.hpp
+++ b/src/MQClientConfigImpl.hpp
@@ -22,6 +22,7 @@
#include "MQClientConfig.h"
#include "NamespaceUtil.h"
+#include "SocketUtil.h"
#include "UtilAll.h"
namespace rocketmq {
@@ -45,7 +46,7 @@ class MQClientConfigImpl : virtual public MQClientConfig {
std::string buildMQClientId() const override {
std::string clientId;
- clientId.append(UtilAll::getLocalAddress()); // clientIP
+ clientId.append(GetLocalAddress()); // clientIP
clientId.append("@");
clientId.append(instance_name_); // instanceName
if (!unit_name_.empty()) {
diff --git a/src/common/UtilAll.cpp b/src/common/UtilAll.cpp
index 2cbe00a..731d8aa 100644
--- a/src/common/UtilAll.cpp
+++ b/src/common/UtilAll.cpp
@@ -40,9 +40,6 @@
namespace rocketmq {
-std::string UtilAll::sLocalHostName;
-std::string UtilAll::sLocalIpAddress;
-
bool UtilAll::try_lock_for(std::timed_mutex& mutex, long timeout) {
auto now = std::chrono::steady_clock::now();
auto deadline = now + std::chrono::milliseconds(timeout);
@@ -157,7 +154,7 @@ bool UtilAll::isBlank(const std::string& str) {
}
bool UtilAll::SplitURL(const std::string& serverURL, std::string& addr, short& nPort) {
- auto pos = serverURL.find(':');
+ auto pos = serverURL.find_last_of(':');
if (pos == std::string::npos) {
return false;
}
@@ -228,54 +225,6 @@ int UtilAll::Split(std::vector<std::string>& ret_, const std::string& strIn, con
return ret_.size();
}
-std::string UtilAll::getLocalHostName() {
- if (sLocalHostName.empty()) {
- char name[1024];
- if (::gethostname(name, sizeof(name)) != 0) {
- return null;
- }
- sLocalHostName.append(name, strlen(name));
- }
- return sLocalHostName;
-}
-
-std::string UtilAll::getLocalAddress() {
- if (sLocalIpAddress.empty()) {
- auto hostname = getLocalHostName();
- if (!hostname.empty()) {
- try {
- sLocalIpAddress = socketAddress2String(lookupNameServers(hostname));
- } catch (std::exception& e) {
- LOG_WARN(e.what());
- sLocalIpAddress = "127.0.0.1";
- }
- }
- }
- return sLocalIpAddress;
-}
-
-uint32_t UtilAll::getIP() {
- std::string ip = UtilAll::getLocalAddress();
- if (ip.empty()) {
- return 0;
- }
-
- char* ip_str = new char[ip.length() + 1];
- std::strncpy(ip_str, ip.c_str(), ip.length());
- ip_str[ip.length()] = '\0';
-
- int i = 3;
- uint32_t nResult = 0;
- for (char* token = std::strtok(ip_str, "."); token != nullptr && i >= 0; token = std::strtok(nullptr, ".")) {
- uint32_t n = std::atoi(token);
- nResult |= n << (8 * i--);
- }
-
- delete[] ip_str;
-
- return nResult;
-}
-
std::string UtilAll::getHomeDirectory() {
#ifndef WIN32
char* home_env = std::getenv("HOME");
diff --git a/src/common/UtilAll.h b/src/common/UtilAll.h
index 8ad49b0..63989f7 100644
--- a/src/common/UtilAll.h
+++ b/src/common/UtilAll.h
@@ -112,10 +112,6 @@ class UtilAll {
static int Split(std::vector<std::string>& ret_, const std::string& strIn, const char sep);
static int Split(std::vector<std::string>& ret_, const std::string& strIn, const std::string& sep);
- static std::string getLocalHostName();
- static std::string getLocalAddress();
- static uint32_t getIP();
-
static std::string getHomeDirectory();
static void createDirectory(std::string const& dir);
static bool existDirectory(std::string const& dir);
@@ -138,10 +134,6 @@ class UtilAll {
// Returns true on success.
// Returns false on failure..
static bool ReplaceFile(const std::string& from_path, const std::string& to_path);
-
- private:
- static std::string sLocalHostName;
- static std::string sLocalIpAddress;
};
template <typename T>
diff --git a/src/consumer/DefaultMQPushConsumerImpl.cpp b/src/consumer/DefaultMQPushConsumerImpl.cpp
index 11a12a1..bf00cc2 100644
--- a/src/consumer/DefaultMQPushConsumerImpl.cpp
+++ b/src/consumer/DefaultMQPushConsumerImpl.cpp
@@ -536,8 +536,8 @@ bool DefaultMQPushConsumerImpl::sendMessageBack(MessageExtPtr msg, int delay_lev
try {
msg->set_topic(NamespaceUtil::wrapNamespace(client_config_->name_space(), msg->topic()));
- std::string brokerAddr = brokerName.empty() ? socketAddress2String(msg->store_host())
- : client_instance_->findBrokerAddressInPublish(brokerName);
+ std::string brokerAddr =
+ brokerName.empty() ? msg->store_host_string() : client_instance_->findBrokerAddressInPublish(brokerName);
client_instance_->getMQClientAPIImpl()->consumerSendMessageBack(
brokerAddr, msg, getDefaultMQPushConsumerConfig()->group_name(), delay_level, 5000,
diff --git a/src/message/MessageBatch.cpp b/src/message/MessageBatch.cpp
index 4030fd6..2ba0c18 100644
--- a/src/message/MessageBatch.cpp
+++ b/src/message/MessageBatch.cpp
@@ -16,8 +16,9 @@
*/
#include "MessageBatch.h"
-#include "MessageDecoder.h"
+#include "MQException.h"
#include "MessageClientIDSetter.h"
+#include "MessageDecoder.h"
namespace rocketmq {
diff --git a/src/message/MessageClientIDSetter.cpp b/src/message/MessageClientIDSetter.cpp
index dda45df..5560ba1 100644
--- a/src/message/MessageClientIDSetter.cpp
+++ b/src/message/MessageClientIDSetter.cpp
@@ -25,7 +25,8 @@
#include <unistd.h>
#endif
-#include "ByteOrder.h"
+#include "ByteBuffer.hpp"
+#include "SocketUtil.h"
#include "UtilAll.h"
namespace rocketmq {
@@ -33,16 +34,28 @@ namespace rocketmq {
MessageClientIDSetter::MessageClientIDSetter() {
std::srand((uint32_t)std::time(NULL));
- uint32_t pid = ByteOrderUtil::NorminalBigEndian(static_cast<uint32_t>(UtilAll::getProcessId()));
- uint32_t ip = ByteOrderUtil::NorminalBigEndian(UtilAll::getIP());
- uint32_t random_num = ByteOrderUtil::NorminalBigEndian(static_cast<uint32_t>(std::rand()));
-
- char bin_buf[10];
- std::memcpy(bin_buf + 2, &pid, 4);
- std::memcpy(bin_buf, &ip, 4);
- std::memcpy(bin_buf + 6, &random_num, 4);
+ std::unique_ptr<ByteBuffer> buffer;  // stays null until an IPv4/IPv6 address has been written
+ sockaddr* addr = GetSelfIP();
+ if (addr != nullptr) {
+ if (addr->sa_family == AF_INET) {
+ auto* sin = reinterpret_cast<sockaddr_in*>(addr);
+ buffer.reset(ByteBuffer::allocate(kIPv4AddrSize + 2 + 4));  // ip + 2-byte pid + 4-byte random
+ buffer->put(ByteArray(reinterpret_cast<char*>(&sin->sin_addr), kIPv4AddrSize));
+ } else if (addr->sa_family == AF_INET6) {
+ auto* sin6 = reinterpret_cast<sockaddr_in6*>(addr);
+ buffer.reset(ByteBuffer::allocate(kIPv6AddrSize + 2 + 4));  // ip + 2-byte pid + 4-byte random
+ buffer->put(ByteArray(reinterpret_cast<char*>(&sin6->sin6_addr), kIPv6AddrSize));
+ }
+ // any other family: nothing was allocated, so buffer is still null and the fallback below runs
+ }
+ if (buffer == nullptr) {
+ buffer.reset(ByteBuffer::allocate(4 + 2 + 4));
+ buffer->putInt(UtilAll::currentTimeMillis());  // no usable IP: a 4-byte timestamp takes its place
+ }
+ buffer->putShort(UtilAll::getProcessId());
+ buffer->putInt(std::rand());
- fix_string_ = UtilAll::bytes2string(bin_buf, 10);
+ fixed_string_ = UtilAll::bytes2string(buffer->array(), buffer->position());
setStartTime(UtilAll::currentTimeMillis());
@@ -93,11 +106,8 @@ std::string MessageClientIDSetter::createUniqueID() {
uint32_t period = ByteOrderUtil::NorminalBigEndian(static_cast<uint32_t>(current - start_time_));
uint16_t seqid = ByteOrderUtil::NorminalBigEndian(counter_++);
- char bin_buf[6];
- std::memcpy(bin_buf, &period, 4);
- std::memcpy(bin_buf + 4, &seqid, 2);
-
- return fix_string_ + UtilAll::bytes2string(bin_buf, 6);
+ return fixed_string_ + UtilAll::bytes2string(reinterpret_cast<char*>(&period), sizeof(period)) +
+ UtilAll::bytes2string(reinterpret_cast<char*>(&seqid), sizeof(seqid));
}
} // namespace rocketmq
diff --git a/src/message/MessageClientIDSetter.h b/src/message/MessageClientIDSetter.h
index 222115e..e688786 100644
--- a/src/message/MessageClientIDSetter.h
+++ b/src/message/MessageClientIDSetter.h
@@ -68,7 +68,7 @@ class MessageClientIDSetter {
uint64_t next_start_time_;
std::atomic<uint16_t> counter_;
- std::string fix_string_;
+ std::string fixed_string_;
};
} // namespace rocketmq
diff --git a/src/message/MessageDecoder.cpp b/src/message/MessageDecoder.cpp
index a36d3bf..08df407 100644
--- a/src/message/MessageDecoder.cpp
+++ b/src/message/MessageDecoder.cpp
@@ -20,14 +20,18 @@
#include <sstream> // std::stringstream
#ifndef WIN32
-#include <netinet/in.h> // struct sockaddr, sockaddr_in, sockaddr_in6
+#include <arpa/inet.h> // htons
+#include <netinet/in.h> // sockaddr_in, sockaddr_in6
+#else
+#include "Winsock2.h"
#endif
#include "ByteOrder.h"
#include "Logging.h"
-#include "MessageExtImpl.h"
#include "MessageAccessor.hpp"
+#include "MessageExtImpl.h"
#include "MessageSysFlag.h"
+#include "SocketUtil.h"
#include "UtilAll.h"
static const char NAME_VALUE_SEPARATOR = 1;
@@ -36,37 +40,35 @@ static const char PROPERTY_SEPARATOR = 2;
namespace rocketmq {
std::string MessageDecoder::createMessageId(const struct sockaddr* sa, int64_t offset) {
- int msgIDLength = sa->sa_family == AF_INET ? 16 : 28;
- std::unique_ptr<ByteBuffer> byteBuffer(ByteBuffer::allocate(msgIDLength));
+ int msgIdLength = IpaddrSize(sa) + /* port field size */ 4 + sizeof(offset);  // layout: <ip bytes><4-byte port><8-byte offset>
+ std::unique_ptr<ByteBuffer> byteBuffer(ByteBuffer::allocate(msgIdLength));
if (sa->sa_family == AF_INET) {
struct sockaddr_in* sin = (struct sockaddr_in*)sa;
- byteBuffer->put(ByteArray((char*)&sin->sin_addr, 4));
- byteBuffer->putInt(ByteOrderUtil::NorminalBigEndian(sin->sin_port));
+ byteBuffer->put(ByteArray(reinterpret_cast<char*>(&sin->sin_addr), kIPv4AddrSize));
+ byteBuffer->putInt(ntohs(sin->sin_port));  // sin_port is stored in network order; hand putInt the host-order value
} else {
struct sockaddr_in6* sin6 = (struct sockaddr_in6*)sa;
- byteBuffer->put(ByteArray((char*)&sin6->sin6_addr, 16));
- byteBuffer->putInt(ByteOrderUtil::NorminalBigEndian(sin6->sin6_port));
+ byteBuffer->put(ByteArray(reinterpret_cast<char*>(&sin6->sin6_addr), kIPv6AddrSize));  // every non-AF_INET family is treated as AF_INET6 here
+ byteBuffer->putInt(ntohs(sin6->sin6_port));
}
byteBuffer->putLong(offset);
byteBuffer->flip();
- return UtilAll::bytes2string(byteBuffer->array(), msgIDLength);
+ return UtilAll::bytes2string(byteBuffer->array(), msgIdLength);  // text form of all msgIdLength bytes written above
}
MessageId MessageDecoder::decodeMessageId(const std::string& msgId) {
- size_t ip_length = msgId.length() == 32 ? 4 * 2 : 16 * 2;
+ size_t ip_length = msgId.length() == 32 ? kIPv4AddrSize * 2 : kIPv6AddrSize * 2;
ByteArray byteArray(ip_length / 2);
std::string ip = msgId.substr(0, ip_length);
UtilAll::string2bytes(byteArray.array(), ip);
std::string port = msgId.substr(ip_length, 8);
- // uint32_t portInt = ByteOrderUtil::NorminalBigEndian<uint32_t>(std::stoul(port, nullptr, 16));
uint32_t portInt = std::stoul(port, nullptr, 16);
- auto* sin = ipPort2SocketAddress(byteArray, portInt);
+ auto* sin = IPPortToSockaddr(byteArray, portInt);
std::string offset = msgId.substr(ip_length + 8);
- // uint64_t offsetInt = ByteOrderUtil::NorminalBigEndian<uint64_t>(std::stoull(offset, nullptr, 16));
uint64_t offsetInt = std::stoull(offset, nullptr, 16);
return MessageId(sin, offsetInt);
@@ -123,22 +125,22 @@ MessageExtPtr MessageDecoder::decode(ByteBuffer& byteBuffer, bool readBody, bool
msgExt->set_born_timestamp(bornTimeStamp);
// 10 BORNHOST
- int bornhostIPLength = (sysFlag & MessageSysFlag::BORNHOST_V6_FLAG) == 0 ? 4 : 16;
- ByteArray bornHost(bornhostIPLength);
- byteBuffer.get(bornHost, 0, bornhostIPLength);
+ int bornHostLength = (sysFlag & MessageSysFlag::BORNHOST_V6_FLAG) == 0 ? kIPv4AddrSize : kIPv6AddrSize;
+ ByteArray bornHost(bornHostLength);
+ byteBuffer.get(bornHost, 0, bornHostLength);
int32_t bornPort = byteBuffer.getInt();
- msgExt->set_born_host(ipPort2SocketAddress(bornHost, bornPort));
+ msgExt->set_born_host(IPPortToSockaddr(bornHost, bornPort));
// 11 STORETIMESTAMP
int64_t storeTimestamp = byteBuffer.getLong();
msgExt->set_store_timestamp(storeTimestamp);
// 12 STOREHOST
- int storehostIPLength = (sysFlag & MessageSysFlag::STOREHOST_V6_FLAG) == 0 ? 4 : 16;
- ByteArray storeHost(bornhostIPLength);
+ int storehostIPLength = (sysFlag & MessageSysFlag::STOREHOST_V6_FLAG) == 0 ? kIPv4AddrSize : kIPv6AddrSize;
+ ByteArray storeHost(storehostIPLength);  // size by the STORE host length: IPPortToSockaddr picks the family from ip.size()
byteBuffer.get(storeHost, 0, storehostIPLength);
int32_t storePort = byteBuffer.getInt();
- msgExt->set_store_host(ipPort2SocketAddress(storeHost, storePort));
+ msgExt->set_store_host(IPPortToSockaddr(storeHost, storePort));
// 13 RECONSUMETIMES
int32_t reconsumeTimes = byteBuffer.getInt();
diff --git a/src/message/MessageDecoder.h b/src/message/MessageDecoder.h
index ad0b82d..5c45d8e 100644
--- a/src/message/MessageDecoder.h
+++ b/src/message/MessageDecoder.h
@@ -17,8 +17,13 @@
#ifndef ROCKETMQ_MESSAGE_MESSAGEDECODER_H_
#define ROCKETMQ_MESSAGE_MESSAGEDECODER_H_
+#ifndef WIN32
+#include <sys/socket.h> // sockaddr
+#else
+#include <Winsock2.h>
+#endif
+
#include "ByteBuffer.hpp"
-#include "MQException.h"
#include "MQMessageExt.h"
#include "MessageId.h"
diff --git a/src/message/MessageExtImpl.cpp b/src/message/MessageExtImpl.cpp
index f43ac35..cfd6b99 100644
--- a/src/message/MessageExtImpl.cpp
+++ b/src/message/MessageExtImpl.cpp
@@ -44,20 +44,14 @@ MessageExtImpl::MessageExtImpl(int queueId,
commit_log_offset_(0),
sys_flag_(0),
born_timestamp_(bornTimestamp),
- born_host_(nullptr),
+ born_host_(SockaddrToStorage(bornHost)),
store_timestamp_(storeTimestamp),
- store_host_(nullptr),
+ store_host_(SockaddrToStorage(storeHost)),
reconsume_times_(3),
prepared_transaction_offset_(0),
- msg_id_(msgId) {
- born_host_ = copySocketAddress(born_host_, bornHost);
- store_host_ = copySocketAddress(store_host_, storeHost);
-}
+ msg_id_(msgId) {}
-MessageExtImpl::~MessageExtImpl() {
- free(born_host_);
- free(store_host_);
-}
+MessageExtImpl::~MessageExtImpl() = default;
TopicFilterType MessageExtImpl::parseTopicFilterType(int32_t sysFlag) {
if ((sysFlag & MessageSysFlag::MULTI_TAGS_FLAG) == MessageSysFlag::MULTI_TAGS_FLAG) {
@@ -123,15 +117,15 @@ void MessageExtImpl::set_born_timestamp(int64_t bornTimestamp) {
}
const struct sockaddr* MessageExtImpl::born_host() const {
- return born_host_;
+ return reinterpret_cast<sockaddr*>(born_host_.get());  // non-owning view of the owned sockaddr_storage; nullptr when unset
}
std::string MessageExtImpl::born_host_string() const {
- return socketAddress2String(born_host_);
+ return SockaddrToString(born_host());  // text form of the born host address
}
void MessageExtImpl::set_born_host(const struct sockaddr* bornHost) {
- born_host_ = copySocketAddress(born_host_, bornHost);
+ born_host_ = SockaddrToStorage(bornHost);  // replaces the previously held storage with the result for bornHost
}
int64_t MessageExtImpl::store_timestamp() const {
@@ -143,15 +137,15 @@ void MessageExtImpl::set_store_timestamp(int64_t storeTimestamp) {
}
const struct sockaddr* MessageExtImpl::store_host() const {
- return store_host_;
+ return reinterpret_cast<sockaddr*>(store_host_.get());  // non-owning view of the owned sockaddr_storage; nullptr when unset
}
std::string MessageExtImpl::store_host_string() const {
- return socketAddress2String(store_host_);
+ return SockaddrToString(store_host());  // text form of the store host address
}
void MessageExtImpl::set_store_host(const struct sockaddr* storeHost) {
- store_host_ = copySocketAddress(store_host_, storeHost);
+ store_host_ = SockaddrToStorage(storeHost);  // replaces the previously held storage with the result for storeHost
}
const std::string& MessageExtImpl::msg_id() const {
diff --git a/src/message/MessageExtImpl.h b/src/message/MessageExtImpl.h
index 5d85cea..fc44471 100644
--- a/src/message/MessageExtImpl.h
+++ b/src/message/MessageExtImpl.h
@@ -94,9 +94,9 @@ class MessageExtImpl : public MessageImpl, // base
int64_t commit_log_offset_;
int32_t sys_flag_;
int64_t born_timestamp_;
- struct sockaddr* born_host_;
+ std::unique_ptr<sockaddr_storage> born_host_;
int64_t store_timestamp_;
- struct sockaddr* store_host_;
+ std::unique_ptr<sockaddr_storage> store_host_;
int32_t reconsume_times_;
int64_t prepared_transaction_offset_;
std::string msg_id_;
diff --git a/src/message/MessageId.h b/src/message/MessageId.h
index 64e4603..e9a501b 100644
--- a/src/message/MessageId.h
+++ b/src/message/MessageId.h
@@ -27,29 +27,29 @@ namespace rocketmq {
class MessageId {
public:
MessageId() : MessageId(nullptr, 0) {}
- MessageId(struct sockaddr* address, int64_t offset) : address_(nullptr), offset_(offset) { setAddress(address); }
+ MessageId(const struct sockaddr* address, int64_t offset) : address_(SockaddrToStorage(address)), offset_(offset) {}
- MessageId(const MessageId& other) : MessageId(other.address_, other.offset_) {}
- MessageId(MessageId&& other) : address_(other.address_), offset_(other.offset_) { other.address_ = nullptr; }
+ MessageId(const MessageId& other) : MessageId(other.getAddress(), other.offset_) {}  // deep copy via the sockaddr view
+ MessageId(MessageId&& other) noexcept : address_(std::move(other.address_)), offset_(other.offset_) {}  // noexcept: lets containers move instead of copy
- virtual ~MessageId() { std::free(address_); }
+ virtual ~MessageId() = default;  // address_ is a unique_ptr, so no manual free is needed
MessageId& operator=(const MessageId& other) {
if (&other != this) {
- setAddress(other.address_);
+ setAddress(other.getAddress());
this->offset_ = other.offset_;
}
return *this;
}
- const struct sockaddr* getAddress() const { return address_; }
- void setAddress(struct sockaddr* address) { address_ = copySocketAddress(address_, address); }
+ const struct sockaddr* getAddress() const { return reinterpret_cast<sockaddr*>(address_.get()); }  // non-owning; nullptr when unset
+ void setAddress(const struct sockaddr* address) { address_ = SockaddrToStorage(address); }  // replaces the held storage
int64_t getOffset() const { return offset_; }
void setOffset(int64_t offset) { offset_ = offset; }
private:
- struct sockaddr* address_;
+ std::unique_ptr<sockaddr_storage> address_;  // owned address storage (may be null)
int64_t offset_;
};
diff --git a/src/transport/EventLoop.cpp b/src/transport/EventLoop.cpp
index 85ea113..5dce2d1 100644
--- a/src/transport/EventLoop.cpp
+++ b/src/transport/EventLoop.cpp
@@ -207,9 +207,9 @@ int BufferEvent::connect(const std::string& addr) {
}
try {
- auto* sa = string2SocketAddress(addr); // resolve domain
- peer_addr_port_ = socketAddress2String(sa);
- return bufferevent_socket_connect(buffer_event_, sa, sockaddr_size(sa));
+ auto* sa = StringToSockaddr(addr); // resolve domain
+ peer_addr_port_ = SockaddrToString(sa);
+ return bufferevent_socket_connect(buffer_event_, sa, SockaddrSize(sa));
} catch (const std::exception& e) {
LOG_ERROR_NEW("can not connect to {}, {}", addr, e.what());
return -1;
diff --git a/src/transport/SocketUtil.cpp b/src/transport/SocketUtil.cpp
index 0470088..f207263 100644
--- a/src/transport/SocketUtil.cpp
+++ b/src/transport/SocketUtil.cpp
@@ -16,73 +16,115 @@
*/
#include "SocketUtil.h"
-#include <cstdlib> // std::realloc
-#include <cstring> // std::memset, std::memcpy
+#include <cstdlib> // std::abort
+#include <cstring> // std::memcpy, std::memset
-#include <sstream>
-#include <stdexcept>
-
-#include <event2/event.h>
+#include <iostream>
+#include <stdexcept> // std::invalid_argument, std::runtime_error
+#include <string>
#ifndef WIN32
-#include <netdb.h>
-#include <unistd.h>
+#include <arpa/inet.h> // htons
+#include <unistd.h> // gethostname
+#else
+#include <Winsock2.h>
#endif
-#include "ByteOrder.h"
+#include <event2/event.h>
+
#include "MQException.h"
-#include "UtilAll.h"
namespace rocketmq {
-union sockaddr_union {
- struct sockaddr_in sin;
- struct sockaddr_in6 sin6;
-};
+std::unique_ptr<sockaddr_storage> SockaddrToStorage(const sockaddr* src) {  // heap copy of *src, or null for a null src
+ if (src == nullptr) {
+ return nullptr;
+ }
+ std::unique_ptr<sockaddr_storage> ss(new sockaddr_storage);  // not value-initialized: bytes past the copied prefix are indeterminate
+ std::memcpy(ss.get(), src, SockaddrSize(src));  // copies only sizeof(sockaddr_in) or sizeof(sockaddr_in6) bytes
+ return ss;
+}
-thread_local static sockaddr_union sin_buf;
+thread_local sockaddr_storage ss_buffer;
-struct sockaddr* ipPort2SocketAddress(const ByteArray& ip, uint16_t port) {
- if (ip.size() == 4) {
- struct sockaddr_in* sin = &sin_buf.sin;
+sockaddr* IPPortToSockaddr(const ByteArray& ip, uint16_t port) {  // builds a sockaddr from raw IP bytes + host-order port
+ sockaddr_storage* ss = &ss_buffer;  // thread_local scratch buffer: the result is overwritten by the next writer on this thread
+ if (ip.size() == kIPv4AddrSize) {
+ auto* sin = reinterpret_cast<sockaddr_in*>(ss);
sin->sin_family = AF_INET;
- sin->sin_port = ByteOrderUtil::NorminalBigEndian<uint16_t>(port);
- ByteOrderUtil::Read<decltype(sin->sin_addr)>(&sin->sin_addr, ip.array());
- return (struct sockaddr*)sin;
- } else if (ip.size() == 16) {
- struct sockaddr_in6* sin6 = &sin_buf.sin6;
+ sin->sin_port = htons(port);
+ std::memcpy(&sin->sin_addr, ip.array(), kIPv4AddrSize);
+ } else if (ip.size() == kIPv6AddrSize) {
+ auto* sin6 = reinterpret_cast<sockaddr_in6*>(ss);  // cast the buffer itself; `&ss` would alias the local pointer variable
sin6->sin6_family = AF_INET6;
- sin6->sin6_port = ByteOrderUtil::NorminalBigEndian<uint16_t>(port);
- ByteOrderUtil::Read<decltype(sin6->sin6_addr)>(&sin6->sin6_addr, ip.array());
- return (struct sockaddr*)sin6;
+ sin6->sin6_port = htons(port);
+ std::memcpy(&sin6->sin6_addr, ip.array(), kIPv6AddrSize);
+ } else {
+ throw std::invalid_argument("invalid ip size");  // only 4-byte and 16-byte addresses are accepted
}
- return nullptr;
+ return reinterpret_cast<sockaddr*>(ss);
}
-struct sockaddr* string2SocketAddress(const std::string& addr) {
+sockaddr* StringToSockaddr(const std::string& addr) {  // parses "[/]host[:port]" or "[/][ipv6][:port]"; throws on malformed input
+ if (addr.empty()) {
+ throw std::invalid_argument("invalid address");
+ }
+
std::string::size_type start_pos = addr[0] == '/' ? 1 : 0;
- auto colon_pos = addr.find_last_of(":");
- std::string host = addr.substr(start_pos, colon_pos - start_pos);
- std::string port = addr.substr(colon_pos + 1, addr.length() - colon_pos);
- auto* sa = lookupNameServers(host);
- if (sa != nullptr) {
- if (sa->sa_family == AF_INET) {
- auto* sin = (struct sockaddr_in*)sa;
- sin->sin_port = htons((uint16_t)std::stoi(port));
- } else {
- auto* sin6 = (struct sockaddr_in6*)sa;
- sin6->sin6_port = htons((uint16_t)std::stoi(port));
+ auto colon_pos = addr.find_last_of(':');
+ auto bracket_pos = addr.find_last_of(']');
+ if (bracket_pos != std::string::npos) {
+ // bracketed (IPv6 literal) form: "[host]" or "[host]:port"
+ if (addr.at(start_pos) != '[') {
+ throw std::invalid_argument("invalid address");
+ }
+ if (colon_pos == std::string::npos) {
+ throw std::invalid_argument("invalid address");
}
+ if (colon_pos < bracket_pos) {
+ // last ':' is inside the brackets, so there is no port; ']' must be the final character
+ if (bracket_pos != addr.size() - 1) {
+ throw std::invalid_argument("invalid address");
+ }
+ colon_pos = addr.size();
+ } else if (colon_pos != bracket_pos + 1) {
+ throw std::invalid_argument("invalid address");
+ }
+ } else if (colon_pos == std::string::npos) {
+ // no port given
+ colon_pos = addr.size();
}
+
+ decltype(bracket_pos) fix_bracket = bracket_pos == std::string::npos ? 0 : 1;  // 1 when '[' and ']' must be stripped
+ std::string host = addr.substr(start_pos + fix_bracket, colon_pos - start_pos - fix_bracket * 2);
+ auto* sa = LookupNameServers(host);
+
+ std::string port = colon_pos >= addr.size() ? "0" : addr.substr(colon_pos + 1);  // a missing port is treated as 0
+ unsigned long n = std::stoul(port);  // keep the full width so oversized values are not truncated before the check
+ if (n > UINT16_MAX) {
+ throw std::out_of_range("port is too large");
+ }
+ uint16_t port_num = htons(static_cast<uint16_t>(n));
+
+ if (sa->sa_family == AF_INET) {
+ auto* sin = reinterpret_cast<sockaddr_in*>(sa);
+ sin->sin_port = port_num;
+ } else if (sa->sa_family == AF_INET6) {
+ auto* sin6 = reinterpret_cast<sockaddr_in6*>(sa);
+ sin6->sin6_port = port_num;
+ } else {
+ throw std::runtime_error("don't support non-inet address families");
+ }
+
return sa;
}
/**
- * converts an address from network format to presentation format (a.b.c.d)
+ * converts an address from network format to presentation format
*/
-std::string socketAddress2String(const struct sockaddr* addr) {
+std::string SockaddrToString(const sockaddr* addr) {
if (nullptr == addr) {
- return "127.0.0.1";
+ return std::string();
}
char buf[128];
@@ -90,38 +132,40 @@ std::string socketAddress2String(const struct sockaddr* addr) {
uint16_t port = 0;
if (addr->sa_family == AF_INET) {
- auto* sin = (struct sockaddr_in*)addr;
- if (nullptr != evutil_inet_ntop(AF_INET, &sin->sin_addr, buf, sizeof(buf))) {
- address = buf;
+ const auto* sin = reinterpret_cast<const sockaddr_in*>(addr);
+ if (nullptr == evutil_inet_ntop(AF_INET, &sin->sin_addr, buf, sizeof(buf))) {
+ throw std::runtime_error("can not convert AF_INET address to text form");
}
+ address = buf;
port = ntohs(sin->sin_port);
} else if (addr->sa_family == AF_INET6) {
- auto* sin6 = (struct sockaddr_in6*)addr;
- if (nullptr != evutil_inet_ntop(AF_INET6, &sin6->sin6_addr, buf, sizeof(buf))) {
- address = buf;
+ const auto* sin6 = reinterpret_cast<const sockaddr_in6*>(addr);
+ if (nullptr == evutil_inet_ntop(AF_INET6, &sin6->sin6_addr, buf, sizeof(buf))) {
+ throw std::runtime_error("can not convert AF_INET6 address to text form");
}
+ address = buf;
port = ntohs(sin6->sin6_port);
} else {
- throw std::runtime_error("don't support non-inet Address families.");
+ throw std::runtime_error("don't support non-inet address families");
}
- if (!address.empty() && port != 0) {
- if (addr->sa_family == AF_INET6) {
- address = "[" + address + "]";
- }
- address += ":" + UtilAll::to_string(port);
+ if (addr->sa_family == AF_INET6) {
+ address = "[" + address + "]";
+ }
+ if (port != 0) {
+ address += ":" + std::to_string(port);
}
return address;
}
-struct sockaddr* lookupNameServers(const std::string& hostname) {
+sockaddr* LookupNameServers(const std::string& hostname) {
if (hostname.empty()) {
- return nullptr;
+ throw std::invalid_argument("invalid hostname");
}
- struct evutil_addrinfo hints;
- struct evutil_addrinfo* answer = NULL;
+ evutil_addrinfo hints;
+ evutil_addrinfo* answer = nullptr;
/* Build the hints to tell getaddrinfo how to act. */
std::memset(&hints, 0, sizeof(hints));
@@ -131,48 +175,63 @@ struct sockaddr* lookupNameServers(const std::string& hostname) {
hints.ai_flags = EVUTIL_AI_ADDRCONFIG; /* Only return addresses we can use. */
// Look up the hostname.
- int err = evutil_getaddrinfo(hostname.c_str(), NULL, &hints, &answer);
+ int err = evutil_getaddrinfo(hostname.c_str(), nullptr, &hints, &answer);
if (err != 0) {
- std::string info = "Failed to resolve host name(" + hostname + "): " + evutil_gai_strerror(err);
+ std::string info = "Failed to resolve hostname(" + hostname + "): " + evutil_gai_strerror(err);
THROW_MQEXCEPTION(UnknownHostException, info, -1);
}
- struct sockaddr* sin = nullptr;
+ sockaddr_storage* ss = &ss_buffer;
- for (struct evutil_addrinfo* ai = answer; ai != NULL; ai = ai->ai_next) {
+ bool hit = false;
+ for (struct evutil_addrinfo* ai = answer; ai != nullptr; ai = ai->ai_next) {
auto* ai_addr = ai->ai_addr;
if (ai_addr->sa_family != AF_INET && ai_addr->sa_family != AF_INET6) {
continue;
}
- sin = (struct sockaddr*)&sin_buf;
- std::memcpy(sin, ai_addr, sockaddr_size(ai_addr));
+ std::memcpy(ss, ai_addr, SockaddrSize(ai_addr));
+ hit = true;
break;
}
evutil_freeaddrinfo(answer);
- return sin;
+ if (!hit) {
+ throw std::runtime_error("hostname is non-inet address family");
+ }
+
+ return reinterpret_cast<sockaddr*>(ss);
}
-struct sockaddr* copySocketAddress(struct sockaddr* dst, const struct sockaddr* src) {
- if (src != nullptr) {
- if (dst == nullptr || dst->sa_family != src->sa_family) {
- dst = (struct sockaddr*)std::realloc(dst, sizeof(union sockaddr_union));
- }
- std::memcpy(dst, src, sockaddr_size(src));
- } else {
- free(dst);
- dst = nullptr;
+sockaddr* GetSelfIP() {  // resolves the local hostname, falling back to "localhost"; the fallback lookup may still throw
+ try {
+ return LookupNameServers(GetLocalHostname());
+ } catch (const std::exception&) {  // also covers an empty hostname (invalid_argument) and non-inet results (runtime_error)
+ return LookupNameServers("localhost");
}
- return dst;
}
-uint64_t h2nll(uint64_t v) {
- return ByteOrderUtil::NorminalBigEndian(v);
+const std::string& GetLocalHostname() {  // hostname from gethostname(), computed once; empty string if the call fails
+ static std::string local_hostname = []() {
+ char name[1024] = {0};  // zero-filled so the buffer is always NUL-terminated
+ if (::gethostname(name, sizeof(name) - 1) != 0) {  // keep the last byte as terminator in case the name is truncated
+ return std::string();
+ }
+ return std::string(name);
+ }();
+ return local_hostname;
}
-uint64_t n2hll(uint64_t v) {
- return ByteOrderUtil::NorminalBigEndian(v);
+const std::string& GetLocalAddress() {  // text form of GetSelfIP(), computed once on first call
+ static std::string local_address = []() {
+ try {
+ return SockaddrToString(GetSelfIP());
+ } catch (std::exception& e) {
+ std::cerr << e.what() << std::endl;  // report the reason before terminating
+ std::abort();  // no fallback address: the process is terminated if the local address cannot be determined
+ }
+ }();
+ return local_address;
}
} // namespace rocketmq
diff --git a/src/transport/SocketUtil.h b/src/transport/SocketUtil.h
index be18a38..6bfe535 100644
--- a/src/transport/SocketUtil.h
+++ b/src/transport/SocketUtil.h
@@ -17,13 +17,14 @@
#ifndef ROCKETMQ_TRANSPORT_SOCKETUTIL_H_
#define ROCKETMQ_TRANSPORT_SOCKETUTIL_H_
-#include <cstdint>
+#include <cstddef> // size_t
+#include <cstdint> // uint16_t
#include <string>
#ifndef WIN32
-#include <arpa/inet.h>
-#include <sys/socket.h>
+#include <netinet/in.h> // sockaddr_in, AF_INET, sockaddr_in6, AF_INET6
+#include <sys/socket.h> // sockaddr, sockaddr_storage
#else
#include <Winsock2.h>
#pragma comment(lib, "ws2_32.lib")
@@ -33,21 +34,41 @@
namespace rocketmq {
-static inline size_t sockaddr_size(const struct sockaddr* sa) {
- return sa->sa_family == AF_INET ? sizeof(struct sockaddr_in) : sizeof(struct sockaddr_in6);
+const size_t kIPv4AddrSize = 4;
+const size_t kIPv6AddrSize = 16;
+
+static inline size_t IpaddrSize(const sockaddr* sa) {  // byte length of the raw IP address held in sa (4 or 16)
+ assert(sa != nullptr);
+ assert(sa->sa_family == AF_INET || sa->sa_family == AF_INET6);
+ return sa->sa_family == AF_INET6 ? kIPv6AddrSize : kIPv4AddrSize;  // with asserts disabled, any non-AF_INET6 family yields the IPv4 size
+}
+
+static inline size_t IpaddrSize(const sockaddr_storage* ss) {  // sockaddr_storage overload
+ return IpaddrSize(reinterpret_cast<const sockaddr*>(ss));  // forwards to the sockaddr overload
+}
+
+static inline size_t SockaddrSize(const sockaddr* sa) {  // size of the concrete sockaddr_in / sockaddr_in6 structure behind sa
+ assert(sa != nullptr);
+ assert(sa->sa_family == AF_INET || sa->sa_family == AF_INET6);
+ return sa->sa_family == AF_INET6 ? sizeof(sockaddr_in6) : sizeof(sockaddr_in);  // with asserts disabled, any non-AF_INET6 family yields sizeof(sockaddr_in)
+}
+
+static inline size_t SockaddrSize(const sockaddr_storage* ss) {  // sockaddr_storage overload
+ return SockaddrSize(reinterpret_cast<const sockaddr*>(ss));  // forwards to the sockaddr overload
}
-struct sockaddr* ipPort2SocketAddress(const ByteArray& ip, uint16_t port);
+std::unique_ptr<sockaddr_storage> SockaddrToStorage(const sockaddr* src);
-struct sockaddr* string2SocketAddress(const std::string& addr);
-std::string socketAddress2String(const struct sockaddr* addr);
+sockaddr* IPPortToSockaddr(const ByteArray& ip, uint16_t port);
-struct sockaddr* lookupNameServers(const std::string& hostname);
+sockaddr* StringToSockaddr(const std::string& addr);
+std::string SockaddrToString(const sockaddr* addr);
-struct sockaddr* copySocketAddress(struct sockaddr* dst, const struct sockaddr* src);
+sockaddr* LookupNameServers(const std::string& hostname);
-uint64_t h2nll(uint64_t v);
-uint64_t n2hll(uint64_t v);
+sockaddr* GetSelfIP();
+const std::string& GetLocalHostname();
+const std::string& GetLocalAddress();
} // namespace rocketmq