You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kvrocks.apache.org by hu...@apache.org on 2023/06/12 11:07:23 UTC
[incubator-kvrocks] branch unstable updated: Don't allow the instance replication of itself and it's own replicas (#1488)
This is an automated email from the ASF dual-hosted git repository.
hulk pushed a commit to branch unstable
in repository https://gitbox.apache.org/repos/asf/incubator-kvrocks.git
The following commit(s) were added to refs/heads/unstable by this push:
new 517864ad Don't allow the instance replication of itself and it's own replicas (#1488)
517864ad is described below
commit 517864adaa244d7477769bec526177c417b2e0f5
Author: Uddeshya Singh <si...@gmail.com>
AuthorDate: Mon Jun 12 16:37:16 2023 +0530
Don't allow the instance replication of itself and it's own replicas (#1488)
Co-authored-by: git-hulk <hu...@gmail.com>
Co-authored-by: Twice <tw...@apache.org>
---
src/commands/cmd_server.cc | 25 ++++++++++++++++++-
src/common/io_util.cc | 27 ++++++++++++++++++++
src/common/io_util.h | 1 +
src/server/server.cc | 13 ++++++++++
src/server/server.h | 1 +
.../integration/replication/replication_test.go | 29 ++++++++++++++++++++++
6 files changed, 95 insertions(+), 1 deletion(-)
diff --git a/src/commands/cmd_server.cc b/src/commands/cmd_server.cc
index 4002aa00..cc0b2f22 100644
--- a/src/commands/cmd_server.cc
+++ b/src/commands/cmd_server.cc
@@ -20,6 +20,7 @@
#include "commander.h"
#include "commands/scan_base.h"
+#include "common/io_util.h"
#include "config/config.h"
#include "error_constants.h"
#include "server/redis_connection.h"
@@ -867,6 +868,24 @@ class CommandFlushBackup : public Commander {
class CommandSlaveOf : public Commander {
public:
+ static Status IsTryingToReplicateItself(Server *svr, const std::string &host, uint32_t port) {
+ auto ip_addresses = util::LookupHostByName(host);
+ if (!ip_addresses) {
+ return {Status::NotOK, "Can not resolve hostname: " + host};
+ }
+ for (auto &ip : *ip_addresses) {
+ if (util::MatchListeningIP(svr->GetConfig()->binds, ip) && port == svr->GetConfig()->port) {
+ return {Status::NotOK, "can't replicate itself"};
+ }
+ for (std::pair<std::string, uint32_t> &host_port_pair : svr->GetSlaveHostAndPort()) {
+ if (host_port_pair.first == ip && host_port_pair.second == port) {
+ return {Status::NotOK, "can't replicate your own replicas"};
+ }
+ }
+ }
+ return Status::OK();
+ }
+
Status Parse(const std::vector<std::string> &args) override {
host_ = args[1];
const auto &port = args[2];
@@ -914,7 +933,11 @@ class CommandSlaveOf : public Commander {
return Status::OK();
}
- auto s = svr->AddMaster(host_, port_, false);
+ auto s = IsTryingToReplicateItself(svr, host_, port_);
+ if (!s.IsOK()) {
+ return {Status::RedisExecErr, s.Msg()};
+ }
+ s = svr->AddMaster(host_, port_, false);
if (s.IsOK()) {
*output = redis::SimpleString("OK");
LOG(WARNING) << "SLAVE OF " << host_ << ":" << port_ << " enabled (user request from '" << conn->GetAddr()
diff --git a/src/common/io_util.cc b/src/common/io_util.cc
index a56854d6..cb30a480 100644
--- a/src/common/io_util.cc
+++ b/src/common/io_util.cc
@@ -99,6 +99,33 @@ Status SockSetTcpKeepalive(int fd, int interval) {
return Status::OK();
}
+// Lookup IP addresses by hostname
+StatusOr<std::vector<std::string>> LookupHostByName(const std::string &host) {
+ addrinfo hints = {}, *servinfo = nullptr;
+
+ hints.ai_family = AF_UNSPEC;
+ hints.ai_socktype = SOCK_STREAM;
+
+ if (int rv = getaddrinfo(host.c_str(), nullptr, &hints, &servinfo); rv != 0) {
+ return {Status::NotOK, gai_strerror(rv)};
+ }
+
+ auto exit = MakeScopeExit([servinfo] { freeaddrinfo(servinfo); });
+
+ std::vector<std::string> ips;
+ for (auto p = servinfo; p != nullptr; p = p->ai_next) {
+ char ip[INET6_ADDRSTRLEN] = {};
+ if (p->ai_family == AF_INET) {
+ inet_ntop(p->ai_family, &((struct sockaddr_in *)p->ai_addr)->sin_addr, ip, sizeof(ip));
+ } else {
+ inet_ntop(p->ai_family, &((struct sockaddr_in6 *)p->ai_addr)->sin6_addr, ip, sizeof(ip));
+ }
+ ips.emplace_back(ip);
+ }
+
+ return ips;
+}
+
StatusOr<int> SockConnect(const std::string &host, uint32_t port, int conn_timeout, int timeout) {
addrinfo hints = {}, *servinfo = nullptr;
diff --git a/src/common/io_util.h b/src/common/io_util.h
index e0d4ade1..8391d2cc 100644
--- a/src/common/io_util.h
+++ b/src/common/io_util.h
@@ -27,6 +27,7 @@
namespace util {
sockaddr_in NewSockaddrInet(const std::string &host, uint32_t port);
+StatusOr<std::vector<std::string>> LookupHostByName(const std::string &host);
StatusOr<int> SockConnect(const std::string &host, uint32_t port, int conn_timeout = 0, int timeout = 0);
Status SockSetTcpNoDelay(int fd, int val);
Status SockSetTcpKeepalive(int fd, int interval);
diff --git a/src/server/server.cc b/src/server/server.cc
index 21d299a0..6be0aae7 100644
--- a/src/server/server.cc
+++ b/src/server/server.cc
@@ -1751,3 +1751,16 @@ void Server::ResetWatchedKeys(redis::Connection *conn) {
watched_key_size_ = watched_key_map_.size();
}
}
+
+std::list<std::pair<std::string, uint32_t>> Server::GetSlaveHostAndPort() {
+ std::list<std::pair<std::string, uint32_t>> result;
+ slave_threads_mu_.lock();
+ for (const auto &slave : slave_threads_) {
+ if (slave->IsStopped()) continue;
+ std::pair<std::string, int> host_port_pair = {slave->GetConn()->GetAnnounceIP(),
+ slave->GetConn()->GetListeningPort()};
+ result.emplace_back(host_port_pair);
+ }
+ slave_threads_mu_.unlock();
+ return result;
+}
diff --git a/src/server/server.h b/src/server/server.h
index 79f7d228..a34d8cdb 100644
--- a/src/server/server.h
+++ b/src/server/server.h
@@ -229,6 +229,7 @@ class Server {
void WatchKey(redis::Connection *conn, const std::vector<std::string> &keys);
static bool IsWatchedKeysModified(redis::Connection *conn);
void ResetWatchedKeys(redis::Connection *conn);
+ std::list<std::pair<std::string, uint32_t>> GetSlaveHostAndPort();
#ifdef ENABLE_OPENSSL
UniqueSSLContext ssl_ctx;
diff --git a/tests/gocase/integration/replication/replication_test.go b/tests/gocase/integration/replication/replication_test.go
index 0ade5936..c38e1fee 100644
--- a/tests/gocase/integration/replication/replication_test.go
+++ b/tests/gocase/integration/replication/replication_test.go
@@ -422,3 +422,32 @@ func TestReplicationAnnounceIP(t *testing.T) {
require.Equal(t, "1234", slave0port)
})
}
+
+func TestShouldNotReplicate(t *testing.T) {
+ master := util.StartServer(t, map[string]string{})
+ defer master.Close()
+ masterClient := master.NewClient()
+ defer func() { require.NoError(t, masterClient.Close()) }()
+
+ ctx := context.Background()
+
+ slave := util.StartServer(t, map[string]string{})
+ defer slave.Close()
+ slaveClient := slave.NewClient()
+ defer func() { require.NoError(t, slaveClient.Close()) }()
+
+ t.Run("Setting server as replica of itself should throw error", func(t *testing.T) {
+ err := slaveClient.SlaveOf(ctx, slave.Host(), fmt.Sprintf("%d", slave.Port())).Err()
+ require.Equal(t, "ERR can't replicate itself", err.Error())
+ require.Equal(t, "master", util.FindInfoEntry(slaveClient, "role"))
+ })
+
+ t.Run("Master should not be able to replicate slave", func(t *testing.T) {
+ util.SlaveOf(t, slaveClient, master)
+ util.WaitForSync(t, slaveClient)
+ require.Equal(t, "slave", util.FindInfoEntry(slaveClient, "role"))
+ err := masterClient.SlaveOf(ctx, slave.Host(), fmt.Sprintf("%d", slave.Port())).Err()
+ require.EqualErrorf(t, err, "ERR can't replicate your own replicas", err.Error())
+ require.Equal(t, "master", util.FindInfoEntry(masterClient, "role"))
+ })
+}