You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pegasus.apache.org by la...@apache.org on 2023/05/08 11:07:22 UTC
[incubator-pegasus] branch master updated: feat(FQDN): Implemention of struct dns_resolver (#1464)
This is an automated email from the ASF dual-hosted git repository.
laiyingchun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pegasus.git
The following commit(s) were added to refs/heads/master by this push:
new ee03ffee8 feat(FQDN): Implemention of struct dns_resolver (#1464)
ee03ffee8 is described below
commit ee03ffee8ab965d5179e47c8356ac8e822809e21
Author: liguohao <48...@users.noreply.github.com>
AuthorDate: Mon May 8 19:07:15 2023 +0800
feat(FQDN): Implemention of struct dns_resolver (#1464)
issue: https://github.com/apache/incubator-pegasus/issues/1404
Implement the `dns_resolver` class, which translates a `host_port` into an `rpc_address`.
An `rpc_address` is what is actually used for RPC communication.
---
src/runtime/rpc/dns_resolver.cpp | 110 ++++++++++++++++++++++++++++++++++++
src/runtime/rpc/dns_resolver.h | 54 ++++++++++++++++++
src/runtime/rpc/rpc_address.cpp | 9 +++
src/runtime/rpc/rpc_address.h | 4 ++
src/runtime/rpc/rpc_host_port.cpp | 85 ++++++++++++++++++++++++++++
src/runtime/rpc/rpc_host_port.h | 6 ++
src/runtime/test/host_port_test.cpp | 59 +++++++++++++++++++
7 files changed, 327 insertions(+)
diff --git a/src/runtime/rpc/dns_resolver.cpp b/src/runtime/rpc/dns_resolver.cpp
new file mode 100644
index 000000000..0cc9a0616
--- /dev/null
+++ b/src/runtime/rpc/dns_resolver.cpp
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <algorithm>
+#include <utility>
+
+#include "fmt/format.h"
+#include "runtime/rpc/dns_resolver.h"
+#include "runtime/rpc/group_address.h"
+#include "runtime/rpc/group_host_port.h"
+#include "utils/fmt_logging.h"
+
+namespace dsn {
+
+// Records a known host_port -> rpc_address mapping in the cache while holding the
+// write lock. If 'hp' already has a cached entry, that entry is kept unchanged.
+void dns_resolver::add_item(const host_port &hp, const rpc_address &addr)
+{
+    utils::auto_write_lock guard(_lock);
+    _dsn_cache.emplace(hp, addr);
+}
+
+// Looks 'hp' up in the cache while holding the read lock.
+// On a hit, 'addresses' is replaced by a single-element vector holding the cached
+// address and true is returned. On a miss, 'addresses' is left untouched and false
+// is returned.
+bool dns_resolver::get_cached_addresses(const host_port &hp, std::vector<rpc_address> &addresses)
+{
+    utils::auto_read_lock guard(_lock);
+    const auto it = _dsn_cache.find(hp);
+    if (it != _dsn_cache.end()) {
+        addresses.assign(1, it->second);
+        return true;
+    }
+    return false;
+}
+
+// Resolves 'hp' into 'addresses', which must be empty on entry.
+//
+// - Cache hit: 'addresses' receives the single cached address.
+// - Cache miss: 'hp' is resolved via host_port::resolve_addresses(); the first
+//   resolved address is cached and 'addresses' receives all resolved addresses.
+//
+// Returns the error of host_port::resolve_addresses() when the resolution fails, in
+// which case neither the cache nor 'addresses' is modified.
+error_s dns_resolver::resolve_addresses(const host_port &hp, std::vector<rpc_address> &addresses)
+{
+    CHECK(addresses.empty(), "invalid addresses, not empty");
+    if (get_cached_addresses(hp, addresses)) {
+        return error_s::ok();
+    }
+
+    std::vector<rpc_address> resolved_addresses;
+    RETURN_NOT_OK(hp.resolve_addresses(resolved_addresses));
+    // Make the invariant explicit before indexing element 0 below.
+    CHECK(!resolved_addresses.empty(), "host_port '{}' can not be resolved to any address", hp);
+
+    // Log before taking the lock so that formatting the message does not lengthen
+    // the exclusive critical section.
+    if (resolved_addresses.size() > 1) {
+        LOG_DEBUG("host_port '{}' resolves to {} different addresses {}, using the first one {}.",
+                  hp,
+                  resolved_addresses.size(),
+                  fmt::join(resolved_addresses, ","),
+                  resolved_addresses[0]);
+    }
+
+    {
+        utils::auto_write_lock l(_lock);
+        // An entry inserted concurrently by another thread is kept as is.
+        _dsn_cache.emplace(hp, resolved_addresses[0]);
+    }
+
+    addresses = std::move(resolved_addresses);
+    return error_s::ok();
+}
+
+// Resolves 'hp' to exactly one rpc_address.
+//
+// - HOST_TYPE_GROUP: builds a group rpc_address with the same name, resolves every
+//   member and the leader recursively, and copies the "update leader automatically"
+//   flag.
+// - HOST_TYPE_IPV4: resolves through the cache / DNS and returns the first address.
+//   The process aborts (CHECK) when 'hp' can not be resolved.
+// - Any other type: returns a default-constructed rpc_address.
+rpc_address dns_resolver::resolve_address(const host_port &hp)
+{
+    switch (hp.type()) {
+    case HOST_TYPE_GROUP: {
+        rpc_address addr;
+        auto group_hp = hp.group_host_port();
+        addr.assign_group(group_hp->name());
+
+        // NOTE(review): add() returning false aborts the process here; presumably that
+        // happens when two members resolve to the same address -- confirm.
+        for (const auto &member : group_hp->members()) {
+            CHECK_TRUE(addr.group_address()->add(resolve_address(member)));
+        }
+        addr.group_address()->set_update_leader_automatically(
+            group_hp->is_update_leader_automatically());
+        addr.group_address()->set_leader(resolve_address(group_hp->leader()));
+        return addr;
+    }
+    case HOST_TYPE_IPV4: {
+        std::vector<rpc_address> addresses;
+        CHECK_OK(resolve_addresses(hp, addresses), "host_port '{}' can not be resolved", hp);
+        CHECK(!addresses.empty(), "host_port '{}' can not be resolved to any address", hp);
+
+        if (addresses.size() > 1) {
+            LOG_WARNING("host_port '{}' resolves to {} different addresses, using the first one {}",
+                        hp,
+                        addresses.size(),
+                        addresses[0]);
+        }
+        return addresses[0];
+    }
+    default:
+        return rpc_address();
+    }
+}
+
+} // namespace dsn
diff --git a/src/runtime/rpc/dns_resolver.h b/src/runtime/rpc/dns_resolver.h
new file mode 100644
index 000000000..7cf47895c
--- /dev/null
+++ b/src/runtime/rpc/dns_resolver.h
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#pragma once
+
+#include <unordered_map>
+#include <vector>
+
+#include "runtime/rpc/rpc_address.h"
+#include "runtime/rpc/rpc_host_port.h"
+#include "utils/errors.h"
+#include "utils/synchronize.h"
+
+namespace dsn {
+
+// This class provides a way to resolve a host_port to an rpc_address.
+// Resolved results are kept in an in-memory cache that is guarded by a
+// read-write lock.
+class dns_resolver
+{
+public:
+    explicit dns_resolver() = default;
+
+    // Add a host_port -> rpc_address mapping to the cache. If 'hp' is already
+    // cached, the existing entry is kept.
+    void add_item(const host_port &hp, const rpc_address &addr);
+
+    // Resolve this host_port to a unique rpc_address.
+    // A group host_port is resolved member by member into a group rpc_address.
+    rpc_address resolve_address(const host_port &hp);
+
+private:
+    // Fill 'addresses' with the cached address of 'hp' and return true if there is
+    // one, otherwise return false.
+    bool get_cached_addresses(const host_port &hp, std::vector<rpc_address> &addresses);
+
+    // Resolve 'hp' into 'addresses' (which must be empty), using the cache when
+    // possible and updating it otherwise.
+    error_s resolve_addresses(const host_port &hp, std::vector<rpc_address> &addresses);
+
+    mutable utils::rw_lock_nr _lock;
+    std::unordered_map<host_port, rpc_address> _dsn_cache;
+};
+
+} // namespace dsn
diff --git a/src/runtime/rpc/rpc_address.cpp b/src/runtime/rpc/rpc_address.cpp
index 5fce00098..fd3987536 100644
--- a/src/runtime/rpc/rpc_address.cpp
+++ b/src/runtime/rpc/rpc_address.cpp
@@ -240,4 +240,13 @@ const char *rpc_address::to_string() const
return (const char *)p;
}
+
+// Builds an IPv4 rpc_address from a socket address. The ip and port stored in 'addr'
+// are converted with ntohl()/ntohs() before being stored.
+rpc_address::rpc_address(const struct sockaddr_in &addr)
+{
+    set_invalid();
+    const uint32_t host_order_ip = ntohl(addr.sin_addr.s_addr);
+    const uint16_t host_order_port = ntohs(addr.sin_port);
+    _addr.v4.type = HOST_TYPE_IPV4;
+    _addr.v4.ip = host_order_ip;
+    _addr.v4.port = host_order_port;
+}
+
} // namespace dsn
diff --git a/src/runtime/rpc/rpc_address.h b/src/runtime/rpc/rpc_address.h
index 1750f9152..7b8cc6820 100644
--- a/src/runtime/rpc/rpc_address.h
+++ b/src/runtime/rpc/rpc_address.h
@@ -26,6 +26,8 @@
#pragma once
+#include <arpa/inet.h> // IWYU pragma: keep
+
#include <cstddef>
#include <cstdint>
#include <functional>
@@ -78,6 +80,8 @@ public:
rpc_address(const char *host, uint16_t port) { assign_ipv4(host, port); }
+ explicit rpc_address(const struct sockaddr_in &addr);
+
void assign_ipv4(uint32_t ip, uint16_t port)
{
set_invalid();
diff --git a/src/runtime/rpc/rpc_host_port.cpp b/src/runtime/rpc/rpc_host_port.cpp
index 97b75b6ea..13c6462b7 100644
--- a/src/runtime/rpc/rpc_host_port.cpp
+++ b/src/runtime/rpc/rpc_host_port.cpp
@@ -17,18 +17,51 @@
* under the License.
*/
+#include <errno.h>
+#include <netdb.h>
#include <netinet/in.h>
+#include <string.h>
+#include <sys/socket.h>
+#include <memory>
+#include <unordered_set>
#include <utility>
#include "fmt/core.h"
#include "runtime/rpc/group_host_port.h"
#include "runtime/rpc/rpc_host_port.h"
+#include "utils/error_code.h"
+#include "utils/safe_strerror_posix.h"
#include "utils/utils.h"
namespace dsn {
const host_port host_port::s_invalid_host_port;
+namespace {
+
+// Owning handle for a getaddrinfo() result list.
+using AddrInfo = std::unique_ptr<addrinfo, std::function<void(addrinfo *)>>;
+
+// Wrapper around getaddrinfo() that reports failures as error_s.
+//
+// hostname: the node name to look up (no service name is passed).
+// hints:    forwarded to getaddrinfo() unchanged.
+// info:     if not null, takes ownership of the result list on success.
+//
+// Returns ERR_NETWORK_FAILURE carrying the system error text for EAI_SYSTEM, or the
+// gai_strerror() text for any other getaddrinfo() failure.
+error_s GetAddrInfo(const std::string &hostname, const addrinfo &hints, AddrInfo *info)
+{
+    addrinfo *raw_list = nullptr;
+    const int rc = getaddrinfo(hostname.c_str(), nullptr, &hints, &raw_list);
+    // Save errno immediately, before any later call gets a chance to overwrite it.
+    const int saved_errno = errno;
+    // Take ownership right away so that every return path releases the list.
+    AddrInfo owner(raw_list, ::freeaddrinfo);
+
+    if (rc == EAI_SYSTEM) {
+        return error_s::make(ERR_NETWORK_FAILURE, utils::safe_strerror(saved_errno));
+    }
+    if (rc != 0) {
+        return error_s::make(ERR_NETWORK_FAILURE, gai_strerror(rc));
+    }
+
+    if (info != nullptr) {
+        info->swap(owner);
+    }
+    return error_s::ok();
+}
+}
+
host_port::host_port(std::string host, uint16_t port)
: _host(std::move(host)), _port(port), _type(HOST_TYPE_IPV4)
{
@@ -113,4 +146,56 @@ void host_port::assign_group(const char *name)
_group_host_port->add_ref();
}
+// Resolves this host_port into one or more IPv4 rpc_addresses.
+//
+// addresses: output, must be empty on entry. It is only modified on success.
+//
+// Returns:
+// - ERR_INVALID_STATE for HOST_TYPE_INVALID and HOST_TYPE_GROUP host_ports;
+// - ERR_NETWORK_FAILURE when the lookup fails or yields no address;
+// - ok otherwise, with 'addresses' holding the unique addresses in lookup order.
+error_s host_port::resolve_addresses(std::vector<rpc_address> &addresses) const
+{
+    CHECK(addresses.empty(), "invalid addresses, not empty");
+
+    switch (type()) {
+    case HOST_TYPE_INVALID:
+        return error_s::make(dsn::ERR_INVALID_STATE, "invalid host_port type: HOST_TYPE_INVALID");
+    case HOST_TYPE_GROUP:
+        return error_s::make(dsn::ERR_INVALID_STATE, "invalid host_port type: HOST_TYPE_GROUP");
+    case HOST_TYPE_IPV4:
+        break;
+    }
+
+    // First let rpc_address parse the string form of this host_port directly
+    // (presumably "host:port", e.g. "192.168.0.1:8080"); only fall back to
+    // getaddrinfo() when that fails.
+    rpc_address parsed_addr;
+    if (parsed_addr.from_string_ipv4(this->to_string().c_str())) {
+        addresses.emplace_back(parsed_addr);
+        return error_s::ok();
+    }
+
+    struct addrinfo hints;
+    memset(&hints, 0, sizeof(hints));
+    hints.ai_family = AF_INET;
+    hints.ai_socktype = SOCK_STREAM;
+    AddrInfo result;
+    RETURN_NOT_OK(GetAddrInfo(_host, hints, &result));
+
+    // DNS may return the same host multiple times. We want to return only the unique
+    // addresses, but in the same order as DNS returned them. To do so, we keep track
+    // of the already-inserted elements in a set.
+    std::unordered_set<rpc_address> inserted;
+    std::vector<rpc_address> result_addresses;
+    for (const addrinfo *ai = result.get(); ai != nullptr; ai = ai->ai_next) {
+        CHECK_EQ(AF_INET, ai->ai_family);
+        // Work on a copy so that the list owned by 'result' is not modified. The
+        // lookup is done without a service name, so the port is filled in here.
+        sockaddr_in sock_addr = *reinterpret_cast<const sockaddr_in *>(ai->ai_addr);
+        sock_addr.sin_port = htons(_port);
+        const rpc_address resolved_addr(sock_addr);
+        LOG_INFO("resolved address {} for host_port {}", resolved_addr, to_string());
+        if (inserted.insert(resolved_addr).second) {
+            result_addresses.emplace_back(resolved_addr);
+        }
+    }
+
+    if (result_addresses.empty()) {
+        return error_s::make(dsn::ERR_NETWORK_FAILURE,
+                             fmt::format("can not resolve host_port {}.", to_string()));
+    }
+
+    addresses = std::move(result_addresses);
+    return error_s::ok();
+}
+
} // namespace dsn
diff --git a/src/runtime/rpc/rpc_host_port.h b/src/runtime/rpc/rpc_host_port.h
index 0a82afde0..033ba089c 100644
--- a/src/runtime/rpc/rpc_host_port.h
+++ b/src/runtime/rpc/rpc_host_port.h
@@ -25,9 +25,11 @@
#include <functional>
#include <iosfwd>
#include <string>
+#include <vector>
#include "runtime/rpc/rpc_address.h"
#include "utils/autoref_ptr.h"
+#include "utils/errors.h"
#include "utils/fmt_logging.h"
namespace dsn {
@@ -68,6 +70,10 @@ public:
}
void assign_group(const char *name);
+ // Resolve this host_port to rpc_addresses.
+ // There may be multiple rpc_addresses for one host_port.
+ error_s resolve_addresses(std::vector<rpc_address> &addresses) const;
+
private:
std::string _host;
uint16_t _port = 0;
diff --git a/src/runtime/test/host_port_test.cpp b/src/runtime/test/host_port_test.cpp
index dfa48d2dd..617c7aba3 100644
--- a/src/runtime/test/host_port_test.cpp
+++ b/src/runtime/test/host_port_test.cpp
@@ -20,12 +20,17 @@
#include <gtest/gtest-message.h>
#include <gtest/gtest-test-part.h>
#include <gtest/gtest.h>
+#include <string.h>
#include <string>
#include <vector>
+#include "runtime/rpc/dns_resolver.h"
+#include "runtime/rpc/group_address.h"
#include "runtime/rpc/group_host_port.h"
#include "runtime/rpc/rpc_address.h"
#include "runtime/rpc/rpc_host_port.h"
+#include "utils/error_code.h"
+#include "utils/errors.h"
namespace dsn {
@@ -143,4 +148,58 @@ TEST(host_port_test, rpc_group_host_port)
ASSERT_EQ(invalid_hp, g->leader());
}
+// Covers host_port::resolve_addresses(): a resolvable IPv4 host_port and the two
+// host_port types that are rejected.
+TEST(host_port_test, transfer_rpc_address)
+{
+    {
+        std::vector<rpc_address> addresses;
+        host_port hp("localhost", 8080);
+        ASSERT_EQ(hp.resolve_addresses(addresses), error_s::ok());
+        // Guard the element access below.
+        ASSERT_FALSE(addresses.empty());
+        // "localhost" maps to either loopback address depending on the machine setup.
+        ASSERT_TRUE(rpc_address("127.0.0.1", 8080) == addresses[0] ||
+                    rpc_address("127.0.1.1", 8080) == addresses[0]);
+    }
+    {
+        std::vector<rpc_address> addresses;
+        // A default-constructed host_port is of type HOST_TYPE_INVALID.
+        host_port hp;
+        ASSERT_EQ(
+            hp.resolve_addresses(addresses),
+            error_s::make(dsn::ERR_INVALID_STATE, "invalid host_port type: HOST_TYPE_INVALID"));
+
+        hp.assign_group("test_group");
+        ASSERT_EQ(hp.resolve_addresses(addresses),
+                  error_s::make(dsn::ERR_INVALID_STATE, "invalid host_port type: HOST_TYPE_GROUP"));
+    }
+}
+
+// Covers dns_resolver::resolve_address() for a plain IPv4 host_port and for a group
+// host_port with one member and a leader.
+TEST(host_port_test, dns_resolver)
+{
+    dns_resolver resolver;
+
+    // Plain host_port: "localhost" maps to one of the two loopback addresses.
+    {
+        host_port local_hp("localhost", 8080);
+        auto resolved = resolver.resolve_address(local_hp);
+        ASSERT_TRUE(rpc_address("127.0.0.1", 8080) == resolved ||
+                    rpc_address("127.0.1.1", 8080) == resolved);
+    }
+
+    // Group host_port: name, member count, leader and the auto-update flag must all
+    // carry over to the resolved group address.
+    {
+        host_port group_hp;
+        group_hp.assign_group("test_group");
+        rpc_group_host_port *host_group = group_hp.group_host_port();
+
+        host_port member_hp("localhost", 8080);
+        ASSERT_TRUE(host_group->add(member_hp));
+        host_port leader_hp("localhost", 8081);
+        host_group->set_leader(leader_hp);
+
+        auto resolved_grp = resolver.resolve_address(group_hp);
+        auto addr_group = resolved_grp.group_address();
+
+        ASSERT_EQ(addr_group->is_update_leader_automatically(),
+                  host_group->is_update_leader_automatically());
+        ASSERT_EQ(strcmp(addr_group->name(), host_group->name()), 0);
+        ASSERT_EQ(addr_group->count(), host_group->count());
+        ASSERT_EQ(host_port(addr_group->leader()), host_group->leader());
+    }
+}
+
} // namespace dsn
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pegasus.apache.org
For additional commands, e-mail: commits-help@pegasus.apache.org