You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pegasus.apache.org by la...@apache.org on 2023/05/08 11:07:22 UTC

[incubator-pegasus] branch master updated: feat(FQDN): Implemention of struct dns_resolver (#1464)

This is an automated email from the ASF dual-hosted git repository.

laiyingchun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pegasus.git


The following commit(s) were added to refs/heads/master by this push:
     new ee03ffee8 feat(FQDN): Implemention of struct dns_resolver (#1464)
ee03ffee8 is described below

commit ee03ffee8ab965d5179e47c8356ac8e822809e21
Author: liguohao <48...@users.noreply.github.com>
AuthorDate: Mon May 8 19:07:15 2023 +0800

    feat(FQDN): Implemention of struct dns_resolver (#1464)
    
    issue: https://github.com/apache/incubator-pegasus/issues/1404
    
    Implement struct `dns_resolver` in order to convert a `host_port` into an `rpc_address`.
    `rpc_address` is what RPC communication uses.
---
 src/runtime/rpc/dns_resolver.cpp    | 110 ++++++++++++++++++++++++++++++++++++
 src/runtime/rpc/dns_resolver.h      |  54 ++++++++++++++++++
 src/runtime/rpc/rpc_address.cpp     |   9 +++
 src/runtime/rpc/rpc_address.h       |   4 ++
 src/runtime/rpc/rpc_host_port.cpp   |  85 ++++++++++++++++++++++++++++
 src/runtime/rpc/rpc_host_port.h     |   6 ++
 src/runtime/test/host_port_test.cpp |  59 +++++++++++++++++++
 7 files changed, 327 insertions(+)

diff --git a/src/runtime/rpc/dns_resolver.cpp b/src/runtime/rpc/dns_resolver.cpp
new file mode 100644
index 000000000..0cc9a0616
--- /dev/null
+++ b/src/runtime/rpc/dns_resolver.cpp
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <algorithm>
+#include <utility>
+
+#include "fmt/format.h"
+#include "runtime/rpc/dns_resolver.h"
+#include "runtime/rpc/group_address.h"
+#include "runtime/rpc/group_host_port.h"
+#include "utils/fmt_logging.h"
+
+namespace dsn {
+
+// Records a known host_port -> rpc_address mapping in the cache, under the write lock.
+// If 'hp' is already cached, the existing entry is kept and 'addr' is ignored.
+void dns_resolver::add_item(const host_port &hp, const rpc_address &addr)
+{
+    utils::auto_write_lock guard(_lock);
+    _dsn_cache.emplace(hp, addr);
+}
+
+// Looks up 'hp' in the cache while holding the read lock.
+// On a hit, 'addresses' is replaced by a one-element vector holding the cached address
+// and true is returned. On a miss, 'addresses' is left unmodified and false is returned.
+bool dns_resolver::get_cached_addresses(const host_port &hp, std::vector<rpc_address> &addresses)
+{
+    utils::auto_read_lock guard(_lock);
+    const auto iter = _dsn_cache.find(hp);
+    if (iter != _dsn_cache.end()) {
+        addresses = {iter->second};
+        return true;
+    }
+
+    return false;
+}
+
+// Resolves 'hp' into 'addresses', consulting the cache first.
+//
+// - 'addresses' must be empty on entry (enforced by CHECK).
+// - On a cache hit, 'addresses' receives exactly the one cached address.
+// - On a cache miss, hp.resolve_addresses() is called. If it fails, its error is returned
+//   and 'addresses' stays empty. If it succeeds, the first resolved address is cached and
+//   'addresses' receives every resolved address.
+error_s dns_resolver::resolve_addresses(const host_port &hp, std::vector<rpc_address> &addresses)
+{
+    CHECK(addresses.empty(), "invalid addresses, not empty");
+    if (get_cached_addresses(hp, addresses)) {
+        return error_s::ok();
+    }
+
+    // host_port::resolve_addresses() only reports success with a non-empty result, which
+    // is what makes indexing element 0 below valid.
+    std::vector<rpc_address> resolved_addresses;
+    RETURN_NOT_OK(hp.resolve_addresses(resolved_addresses));
+
+    // Log before taking the lock: building the message never touches the cache, so there
+    // is no reason to keep other readers and writers waiting while it is formatted.
+    if (resolved_addresses.size() > 1) {
+        LOG_DEBUG("host_port '{}' resolves to {} different addresses {}, using the first one {}.",
+                  hp,
+                  resolved_addresses.size(),
+                  fmt::join(resolved_addresses, ","),
+                  resolved_addresses[0]);
+    }
+
+    {
+        utils::auto_write_lock l(_lock);
+        // insert() keeps an entry that is already present, so if another caller cached
+        // 'hp' in the meantime, that entry stays.
+        _dsn_cache.insert(std::make_pair(hp, resolved_addresses[0]));
+    }
+
+    addresses = std::move(resolved_addresses);
+    return error_s::ok();
+}
+
+// Resolves 'hp' to a single rpc_address.
+//
+// - HOST_TYPE_GROUP: builds a group rpc_address carrying the same group name, resolves
+//   every member and the leader through this same function, and copies the
+//   'update leader automatically' flag.
+// - HOST_TYPE_IPV4: resolves through resolve_addresses() and returns the first address.
+//   A failed or empty resolution trips CHECK_OK / CHECK.
+// - Any other type: returns a default-constructed rpc_address.
+rpc_address dns_resolver::resolve_address(const host_port &hp)
+{
+    switch (hp.type()) {
+    case HOST_TYPE_GROUP: {
+        rpc_address addr;
+        auto hp_group = hp.group_host_port();
+        addr.assign_group(hp_group->name());
+
+        // The loop variable is deliberately not called 'hp', to avoid shadowing the
+        // function parameter.
+        for (const auto &member : hp_group->members()) {
+            CHECK_TRUE(addr.group_address()->add(resolve_address(member)));
+        }
+        addr.group_address()->set_update_leader_automatically(
+            hp_group->is_update_leader_automatically());
+        addr.group_address()->set_leader(resolve_address(hp_group->leader()));
+        return addr;
+    }
+    case HOST_TYPE_IPV4: {
+        std::vector<rpc_address> addresses;
+        CHECK_OK(resolve_addresses(hp, addresses), "host_port '{}' can not be resolved", hp);
+        CHECK(!addresses.empty(), "host_port '{}' can not be resolved to any address", hp);
+
+        if (addresses.size() > 1) {
+            LOG_WARNING("host_port '{}' resolves to {} different addresses, using the first one {}",
+                        hp,
+                        addresses.size(),
+                        addresses[0]);
+        }
+        return addresses[0];
+    }
+    default:
+        return rpc_address();
+    }
+}
+
+} // namespace dsn
diff --git a/src/runtime/rpc/dns_resolver.h b/src/runtime/rpc/dns_resolver.h
new file mode 100644
index 000000000..7cf47895c
--- /dev/null
+++ b/src/runtime/rpc/dns_resolver.h
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#pragma once
+
+#include <unordered_map>
+#include <vector>
+
+#include "runtime/rpc/rpc_address.h"
+#include "runtime/rpc/rpc_host_port.h"
+#include "utils/errors.h"
+#include "utils/synchronize.h"
+
+namespace dsn {
+
+// This class provides a way to resolve a host_port to an rpc_address.
+//
+// Resolved addresses are kept in an in-memory cache keyed by host_port. Every access to
+// the cache made by this class goes through '_lock'.
+class dns_resolver
+{
+public:
+    explicit dns_resolver() = default;
+
+    // Add a known host_port -> rpc_address mapping to the cache.
+    // An entry that already exists for 'hp' is kept unchanged.
+    void add_item(const host_port &hp, const rpc_address &addr);
+
+    // Resolve this host_port to a unique rpc_address.
+    rpc_address resolve_address(const host_port &hp);
+
+private:
+    // Return true and fill 'addresses' with the cached address if 'hp' is cached,
+    // otherwise return false.
+    bool get_cached_addresses(const host_port &hp, std::vector<rpc_address> &addresses);
+
+    // Resolve 'hp' into 'addresses', using the cache when possible and updating it
+    // after a successful lookup. 'addresses' must be empty on entry.
+    error_s resolve_addresses(const host_port &hp, std::vector<rpc_address> &addresses);
+
+    // Guards '_dsn_cache'.
+    mutable utils::rw_lock_nr _lock;
+    std::unordered_map<host_port, rpc_address> _dsn_cache;
+};
+
+} // namespace dsn
diff --git a/src/runtime/rpc/rpc_address.cpp b/src/runtime/rpc/rpc_address.cpp
index 5fce00098..fd3987536 100644
--- a/src/runtime/rpc/rpc_address.cpp
+++ b/src/runtime/rpc/rpc_address.cpp
@@ -240,4 +240,13 @@ const char *rpc_address::to_string() const
 
     return (const char *)p;
 }
+
+// Builds an IPv4 rpc_address from a socket address. The IP and the port are converted
+// from network byte order to host byte order before being stored.
+rpc_address::rpc_address(const struct sockaddr_in &addr)
+{
+    const auto host_order_ip = static_cast<uint32_t>(ntohl(addr.sin_addr.s_addr));
+    const auto host_order_port = ntohs(addr.sin_port);
+
+    set_invalid();
+    _addr.v4.type = HOST_TYPE_IPV4;
+    _addr.v4.ip = host_order_ip;
+    _addr.v4.port = host_order_port;
+}
+
 } // namespace dsn
diff --git a/src/runtime/rpc/rpc_address.h b/src/runtime/rpc/rpc_address.h
index 1750f9152..7b8cc6820 100644
--- a/src/runtime/rpc/rpc_address.h
+++ b/src/runtime/rpc/rpc_address.h
@@ -26,6 +26,8 @@
 
 #pragma once
 
+#include <arpa/inet.h> // IWYU pragma: keep
+
 #include <cstddef>
 #include <cstdint>
 #include <functional>
@@ -78,6 +80,8 @@ public:
 
     rpc_address(const char *host, uint16_t port) { assign_ipv4(host, port); }
 
+    explicit rpc_address(const struct sockaddr_in &addr);
+
     void assign_ipv4(uint32_t ip, uint16_t port)
     {
         set_invalid();
diff --git a/src/runtime/rpc/rpc_host_port.cpp b/src/runtime/rpc/rpc_host_port.cpp
index 97b75b6ea..13c6462b7 100644
--- a/src/runtime/rpc/rpc_host_port.cpp
+++ b/src/runtime/rpc/rpc_host_port.cpp
@@ -17,18 +17,51 @@
  * under the License.
  */
 
+#include <errno.h>
+#include <netdb.h>
 #include <netinet/in.h>
+#include <string.h>
+#include <sys/socket.h>
+#include <memory>
+#include <unordered_set>
 #include <utility>
 
 #include "fmt/core.h"
 #include "runtime/rpc/group_host_port.h"
 #include "runtime/rpc/rpc_host_port.h"
+#include "utils/error_code.h"
+#include "utils/safe_strerror_posix.h"
 #include "utils/utils.h"
 
 namespace dsn {
 
 const host_port host_port::s_invalid_host_port;
 
+namespace {
+
+using AddrInfo = std::unique_ptr<addrinfo, std::function<void(addrinfo *)>>;
+
+// Thin wrapper around getaddrinfo() that reports failures as error_s.
+//
+// 'hostname' is the node to look up; no service name is passed. On success the resulting
+// list is swapped into '*info' when 'info' is not null. The list is released with
+// freeaddrinfo() by whichever AddrInfo ends up owning it. On failure ERR_NETWORK_FAILURE
+// is returned, carrying the errno text when getaddrinfo() reports EAI_SYSTEM and the
+// gai_strerror() text otherwise.
+error_s GetAddrInfo(const std::string &hostname, const addrinfo &hints, AddrInfo *info)
+{
+    addrinfo *raw_list = nullptr;
+    const int gai_rc = getaddrinfo(hostname.c_str(), nullptr, &hints, &raw_list);
+    // Capture errno right away, before any later call gets a chance to overwrite it.
+    const int saved_errno = errno;
+    // Take ownership immediately so the list is released on every return path.
+    AddrInfo owned_list(raw_list, ::freeaddrinfo);
+
+    if (gai_rc == EAI_SYSTEM) {
+        return error_s::make(ERR_NETWORK_FAILURE, utils::safe_strerror(saved_errno));
+    }
+    if (gai_rc != 0) {
+        return error_s::make(ERR_NETWORK_FAILURE, gai_strerror(gai_rc));
+    }
+
+    if (info != nullptr) {
+        info->swap(owned_list);
+    }
+    return error_s::ok();
+}
+}
+
 host_port::host_port(std::string host, uint16_t port)
     : _host(std::move(host)), _port(port), _type(HOST_TYPE_IPV4)
 {
@@ -113,4 +146,56 @@ void host_port::assign_group(const char *name)
     _group_host_port->add_ref();
 }
 
+// Resolves this host_port into one or more rpc_addresses.
+//
+// - 'addresses' must be empty on entry (enforced by CHECK).
+// - HOST_TYPE_INVALID and HOST_TYPE_GROUP are rejected with ERR_INVALID_STATE.
+// - If the "host:port" string can be turned into an address directly by
+//   rpc_address::from_string_ipv4(), that single address is returned.
+// - Otherwise the host is looked up with getaddrinfo() (AF_INET, SOCK_STREAM) and every
+//   distinct address is returned in the order the lookup produced it, each combined
+//   with this host_port's port.
+// - Returns ERR_NETWORK_FAILURE if the lookup fails or produces no address.
+//
+// On success 'addresses' is never empty; on failure it is left empty.
+error_s host_port::resolve_addresses(std::vector<rpc_address> &addresses) const
+{
+    CHECK(addresses.empty(), "invalid addresses, not empty");
+
+    switch (type()) {
+    case HOST_TYPE_INVALID:
+        return error_s::make(dsn::ERR_INVALID_STATE, "invalid host_port type: HOST_TYPE_INVALID");
+    case HOST_TYPE_GROUP:
+        return error_s::make(dsn::ERR_INVALID_STATE, "invalid host_port type: HOST_TYPE_GROUP");
+    case HOST_TYPE_IPV4:
+        break;
+    }
+
+    rpc_address parsed_addr;
+    // Resolve hostname like "localhost:80" or "192.168.0.1:8080".
+    if (parsed_addr.from_string_ipv4(this->to_string().c_str())) {
+        addresses.emplace_back(parsed_addr);
+        return error_s::ok();
+    }
+
+    struct addrinfo hints;
+    memset(&hints, 0, sizeof(hints));
+    hints.ai_family = AF_INET;
+    hints.ai_socktype = SOCK_STREAM;
+    AddrInfo result;
+    RETURN_NOT_OK(GetAddrInfo(_host, hints, &result));
+
+    // DNS may return the same host multiple times. We want to return only the unique
+    // addresses, but in the same order as DNS returned them. To do so, we keep track
+    // of the already-inserted elements in a set.
+    std::unordered_set<rpc_address> inserted;
+    std::vector<rpc_address> result_addresses;
+    for (const addrinfo *ai = result.get(); ai != nullptr; ai = ai->ai_next) {
+        CHECK_EQ(AF_INET, ai->ai_family);
+        // Work on a copy so that the list owned by 'result' is never written to.
+        sockaddr_in sock_addr = *reinterpret_cast<const sockaddr_in *>(ai->ai_addr);
+        // The lookup was made without a service name, so fill in our own port.
+        sock_addr.sin_port = htons(_port);
+        const rpc_address resolved(sock_addr);
+        LOG_INFO("resolved address {} for host_port {}", resolved, to_string());
+        if (inserted.insert(resolved).second) {
+            result_addresses.emplace_back(resolved);
+        }
+    }
+
+    if (result_addresses.empty()) {
+        return error_s::make(dsn::ERR_NETWORK_FAILURE,
+                             fmt::format("can not resolve host_port {}.", to_string()));
+    }
+
+    addresses = std::move(result_addresses);
+    return error_s::ok();
+}
+
 } // namespace dsn
diff --git a/src/runtime/rpc/rpc_host_port.h b/src/runtime/rpc/rpc_host_port.h
index 0a82afde0..033ba089c 100644
--- a/src/runtime/rpc/rpc_host_port.h
+++ b/src/runtime/rpc/rpc_host_port.h
@@ -25,9 +25,11 @@
 #include <functional>
 #include <iosfwd>
 #include <string>
+#include <vector>
 
 #include "runtime/rpc/rpc_address.h"
 #include "utils/autoref_ptr.h"
+#include "utils/errors.h"
 #include "utils/fmt_logging.h"
 
 namespace dsn {
@@ -68,6 +70,10 @@ public:
     }
     void assign_group(const char *name);
 
+    // Resolve host_port to rpc_addresses.
+    // There may be multiple rpc_addresses for one host_port.
+    error_s resolve_addresses(std::vector<rpc_address> &addresses) const;
+
 private:
     std::string _host;
     uint16_t _port = 0;
diff --git a/src/runtime/test/host_port_test.cpp b/src/runtime/test/host_port_test.cpp
index dfa48d2dd..617c7aba3 100644
--- a/src/runtime/test/host_port_test.cpp
+++ b/src/runtime/test/host_port_test.cpp
@@ -20,12 +20,17 @@
 #include <gtest/gtest-message.h>
 #include <gtest/gtest-test-part.h>
 #include <gtest/gtest.h>
+#include <string.h>
 #include <string>
 #include <vector>
 
+#include "runtime/rpc/dns_resolver.h"
+#include "runtime/rpc/group_address.h"
 #include "runtime/rpc/group_host_port.h"
 #include "runtime/rpc/rpc_address.h"
 #include "runtime/rpc/rpc_host_port.h"
+#include "utils/error_code.h"
+#include "utils/errors.h"
 
 namespace dsn {
 
@@ -143,4 +148,58 @@ TEST(host_port_test, rpc_group_host_port)
     ASSERT_EQ(invalid_hp, g->leader());
 }
 
+// Checks host_port::resolve_addresses(): a resolvable host yields a loopback address,
+// while invalid and group host_ports are rejected with ERR_INVALID_STATE.
+TEST(host_port_test, transfer_rpc_address)
+{
+    {
+        std::vector<rpc_address> addresses;
+        host_port hp("localhost", 8080);
+        ASSERT_EQ(hp.resolve_addresses(addresses), error_s::ok());
+        // Fail cleanly instead of indexing into an empty vector below.
+        ASSERT_FALSE(addresses.empty());
+        ASSERT_TRUE(rpc_address("127.0.0.1", 8080) == addresses[0] ||
+                    rpc_address("127.0.1.1", 8080) == addresses[0]);
+    }
+    {
+        // 'addresses' stays empty across both calls because each one is expected to fail.
+        std::vector<rpc_address> addresses;
+        host_port hp;
+        ASSERT_EQ(
+            hp.resolve_addresses(addresses),
+            error_s::make(dsn::ERR_INVALID_STATE, "invalid host_port type: HOST_TYPE_INVALID"));
+
+        hp.assign_group("test_group");
+        ASSERT_EQ(hp.resolve_addresses(addresses),
+                  error_s::make(dsn::ERR_INVALID_STATE, "invalid host_port type: HOST_TYPE_GROUP"));
+    }
+}
+
+// Checks dns_resolver::resolve_address() for a plain host_port and for a group host_port.
+TEST(host_port_test, dns_resolver)
+{
+    dns_resolver resolver;
+
+    // A plain host_port resolves to one of the two accepted loopback addresses.
+    {
+        host_port local_hp("localhost", 8080);
+        auto resolved = resolver.resolve_address(local_hp);
+        ASSERT_TRUE(rpc_address("127.0.0.1", 8080) == resolved ||
+                    rpc_address("127.0.1.1", 8080) == resolved);
+    }
+
+    // A group host_port resolves to a group address with the same flag, name, member
+    // count and leader.
+    {
+        host_port group_hp;
+        group_hp.assign_group("test_group");
+        rpc_group_host_port *hp_members = group_hp.group_host_port();
+
+        host_port member_hp("localhost", 8080);
+        ASSERT_TRUE(hp_members->add(member_hp));
+        host_port leader_hp("localhost", 8081);
+        hp_members->set_leader(leader_hp);
+
+        auto group_addr = resolver.resolve_address(group_hp);
+
+        ASSERT_EQ(group_addr.group_address()->is_update_leader_automatically(),
+                  hp_members->is_update_leader_automatically());
+        ASSERT_EQ(strcmp(group_addr.group_address()->name(), hp_members->name()), 0);
+        ASSERT_EQ(group_addr.group_address()->count(), hp_members->count());
+        ASSERT_EQ(host_port(group_addr.group_address()->leader()), hp_members->leader());
+    }
+}
+
 } // namespace dsn


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pegasus.apache.org
For additional commands, e-mail: commits-help@pegasus.apache.org