You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by al...@apache.org on 2023/01/11 16:21:51 UTC
[kudu] branch master updated: KUDU-3357 proxied RPCs for multi-master Kudu clusters
This is an automated email from the ASF dual-hosted git repository.
alexey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git
The following commit(s) were added to refs/heads/master by this push:
new cdc97be9a KUDU-3357 proxied RPCs for multi-master Kudu clusters
cdc97be9a is described below
commit cdc97be9aaecb3232f174786ad60d9e30d6a78dd
Author: Alexey Serbin <al...@apache.org>
AuthorDate: Fri Jan 6 16:03:37 2023 -0800
KUDU-3357 proxied RPCs for multi-master Kudu clusters
With this patch, it's now possible to run a multi-master Kudu cluster in
a local area network while processing RPC requests sent from external
networks. The RPC requests are assumed to be forwarded by a TCP proxy
which advertises RPC endpoints for the masters and the tablet servers
of the multi-master Kudu cluster.
Refactored and unified test scenarios in client-proxied-rpc-test provide
the coverage for the newly added functionality.
This is a follow-up to 3f29b5da5f59ea96cfec0608226d5c35740884a6.
Change-Id: Id834d5fae8dcc2230de9f6b6429334c24052011a
Reviewed-on: http://gerrit.cloudera.org:8080/19404
Reviewed-by: Attila Bukor <ab...@apache.org>
Reviewed-by: Zoltan Chovan <zc...@cloudera.com>
Tested-by: Alexey Serbin <al...@apache.org>
---
.../integration-tests/client-proxied-rpc-test.cc | 517 +++++++++++++--------
src/kudu/master/master.cc | 109 ++++-
src/kudu/master/master.h | 15 +-
src/kudu/master/master_service.cc | 8 +-
src/kudu/mini-cluster/external_mini_cluster.cc | 27 +-
src/kudu/mini-cluster/external_mini_cluster.h | 32 +-
6 files changed, 490 insertions(+), 218 deletions(-)
diff --git a/src/kudu/integration-tests/client-proxied-rpc-test.cc b/src/kudu/integration-tests/client-proxied-rpc-test.cc
index 932a8b853..05d0ad32f 100644
--- a/src/kudu/integration-tests/client-proxied-rpc-test.cc
+++ b/src/kudu/integration-tests/client-proxied-rpc-test.cc
@@ -15,13 +15,17 @@
// specific language governing permissions and limitations
// under the License.
+#include <algorithm>
+#include <array>
#include <csignal>
+#include <cstddef>
#include <cstdint>
+#include <functional>
#include <memory>
#include <ostream>
+#include <set>
#include <string>
#include <type_traits>
-#include <utility>
#include <vector>
#include <glog/logging.h>
@@ -34,6 +38,8 @@
#include "kudu/client/write_op.h"
#include "kudu/common/wire_protocol-test-util.h"
#include "kudu/gutil/stl_util.h"
+#include "kudu/gutil/strings/join.h"
+#include "kudu/gutil/strings/split.h"
#include "kudu/gutil/strings/substitute.h"
#include "kudu/integration-tests/data_gen_util.h"
#include "kudu/integration-tests/test_workload.h"
@@ -61,44 +67,87 @@ using kudu::client::KuduTableAlterer;
using kudu::client::KuduTabletServer;
using kudu::cluster::ExternalMiniCluster;
using kudu::cluster::ExternalMiniClusterOptions;
+using std::array;
+using std::function;
+using std::set;
using std::string;
using std::unique_ptr;
using std::vector;
+using strings::Split;
using strings::Substitute;
namespace kudu {
+// Class template for test scenarios running against external mini-cluster
+// with M masters and T tablet servers.
+template<size_t M, size_t T>
class ClientProxiedRpcTest : public KuduTest {
public:
void SetUp() override {
+ SKIP_IF_SLOW_NOT_ALLOWED();
+
+ auto s = FindExecutable("nc", {"/bin", "/usr/bin", "/usr/local/bin"}, &nc_);
+ if (s.IsNotFound()) {
+ LOG(WARNING) << "test is skipped: could not find netcat utility (nc)";
+ GTEST_SKIP();
+ }
+ ASSERT_OK(s);
KuduTest::SetUp();
- ASSERT_OK(GetRandomPort(kIpAddr, &m_proxy_advertised_port_));
- m_proxy_advertised_addr_ = HostPort(kIpAddr, m_proxy_advertised_port_);
- ASSERT_OK(GetRandomPort(kIpAddr, &m_proxied_port_));
- m_proxied_addr_ = HostPort(kIpAddr, m_proxied_port_);
+ ExternalMiniClusterOptions opts;
+ opts.num_masters = M;
+ opts.num_tablet_servers = T;
+
+ vector<string> master_addrs;
+ for (size_t i = 0; i < M; ++i) {
+ auto& a_port = m_proxy_advertised_ports_[i];
+ ASSERT_OK(GetRandomPort(kIpAddr, &a_port));
+ auto& a_addr = m_proxy_advertised_addrs_[i];
+ a_addr = HostPort(kIpAddr, a_port);
+
+ auto& p_port = m_proxied_ports_[i];
+ ASSERT_OK(GetRandomPort(kIpAddr, &p_port));
+ auto& p_addr = m_proxied_addrs_[i];
+ p_addr = HostPort(kIpAddr, p_port);
+
+ vector<string> flags = {
+ Substitute("--rpc_proxy_advertised_addresses=$0", a_addr.ToString()),
+ Substitute("--rpc_proxied_addresses=$0", p_addr.ToString()),
+ };
+ opts.m_custom_flags.emplace_back(std::move(flags));
+
+ master_addrs.emplace_back(a_addr.ToString());
+ }
- ASSERT_OK(GetRandomPort(kIpAddr, &t_proxy_advertised_port_));
- t_proxy_advertised_addr_ = HostPort(kIpAddr, t_proxy_advertised_port_);
- ASSERT_OK(GetRandomPort(kIpAddr, &t_proxied_port_));
- t_proxied_addr_ = HostPort(kIpAddr, t_proxied_port_);
+ if (M > 1) {
+ const auto flag = Substitute("--master_rpc_proxy_advertised_addresses=$0",
+ JoinStrings(master_addrs, ","));
+ for (size_t i = 0; i < M; ++i) {
+ opts.m_custom_flags[i].push_back(flag);
+ }
+ }
- ExternalMiniClusterOptions opts;
- opts.extra_master_flags = {
- Substitute("--rpc_proxy_advertised_addresses=$0",
- m_proxy_advertised_addr_.ToString()),
- Substitute("--rpc_proxied_addresses=$0",
- m_proxied_addr_.ToString()),
- };
- opts.extra_tserver_flags = {
- Substitute("--rpc_proxy_advertised_addresses=$0",
- t_proxy_advertised_addr_.ToString()),
- Substitute("--rpc_proxied_addresses=$0",
- t_proxied_addr_.ToString()),
- };
+ for (size_t i = 0; i < T; ++i) {
+ auto& a_port = t_proxy_advertised_ports_[i];
+ ASSERT_OK(GetRandomPort(kIpAddr, &a_port));
+ auto& a_addr = t_proxy_advertised_addrs_[i];
+ a_addr = HostPort(kIpAddr, a_port);
+
+ auto& p_port = t_proxied_ports_[i];
+ ASSERT_OK(GetRandomPort(kIpAddr, &p_port));
+ auto& p_addr = t_proxied_addrs_[i];
+ p_addr = HostPort(kIpAddr, p_port);
+
+ vector<string> flags = {
+ Substitute("--rpc_proxy_advertised_addresses=$0", a_addr.ToString()),
+ Substitute("--rpc_proxied_addresses=$0", p_addr.ToString()),
+ };
+ opts.t_custom_flags.emplace_back(std::move(flags));
+ }
cluster_.reset(new ExternalMiniCluster(std::move(opts)));
+
ASSERT_OK(cluster_->Start());
}
@@ -109,191 +158,275 @@ class ClientProxiedRpcTest : public KuduTest {
KuduTest::TearDown();
}
- protected:
- static constexpr const char* const kIpAddr = "127.0.0.1";
+ // Verify basic functionality when RPC connections to Kudu masters and tablet
+ // servers are forwarded via a TCP proxy.
+ void Run() {
+ ASSERT_FALSE(nc_.empty());
+
+ const auto kTimeout = MonoDelta::FromSeconds(5);
+ const char* const kTableName = CURRENT_TEST_NAME();
+ const auto schema = KuduSchema::FromSchema(GetSimpleTestSchema());
+ TestWorkload w(cluster_.get());
+ w.set_schema(schema);
+ w.set_table_name(kTableName);
+ w.set_num_replicas(1);
+ w.Setup();
+
+ vector<unique_ptr<Fifo>> m_fifos(M);
+ for (auto i = 0; i < M; ++i) {
+ const auto fname = Substitute("m.fifo.$0", i);
+ ASSERT_OK(env_->NewFifo(JoinPathSegments(test_dir_, fname), &m_fifos[i]));
+ }
- uint16_t m_proxied_port_;
- HostPort m_proxied_addr_;
- uint16_t m_proxy_advertised_port_;
- HostPort m_proxy_advertised_addr_;
+ vector<unique_ptr<Fifo>> t_fifos(T);
+ for (auto i = 0; i < T; ++i) {
+ const auto fname = Substitute("t.fifo.$0", i);
+ ASSERT_OK(env_->NewFifo(JoinPathSegments(test_dir_, fname), &t_fifos[i]));
+ }
- uint16_t t_proxied_port_;
- HostPort t_proxied_addr_;
- uint16_t t_proxy_advertised_port_;
- HostPort t_proxy_advertised_addr_;
+ // Run TCP proxies for Kudu masters' connections.
+ vector<unique_ptr<Subprocess>> m_proxies;
+ m_proxies.reserve(M);
+ vector<ScopedCleanup<function<void(void)>>> m_proxy_cleanups;
+ m_proxy_cleanups.reserve(M);
+ for (auto i = 0; i < M; ++i) {
+ const auto proxy_cmd_str = Substitute(
+ kProxyCmdPattern,
+ nc_,
+ kIpAddr,
+ m_proxy_advertised_ports_[i],
+ m_proxied_ports_[i],
+ m_fifos[i]->filename());
+ m_proxies.emplace_back(new Subprocess({"/bin/bash", "-c", proxy_cmd_str}));
+
+ auto* proxy = m_proxies.back().get();
+ function<void(void)> cleanup = [proxy] {
+ if (proxy->IsStarted()) {
+ WARN_NOT_OK(proxy->KillAndWait(SIGTERM),
+ Substitute("PID $0: could not stop process", proxy->pid()));
+ }
+ };
+
+ m_proxy_cleanups.emplace_back(std::move(cleanup));
+ }
+ for (auto& p : m_proxies) {
+ ASSERT_OK(p->Start());
+ }
- unique_ptr<ExternalMiniCluster> cluster_;
-};
+ // Run TCP proxies for Kudu tablet servers' connections.
+ vector<unique_ptr<Subprocess>> t_proxies;
+ t_proxies.reserve(T);
+ vector<ScopedCleanup<function<void(void)>>> t_proxy_cleanups;
+ t_proxy_cleanups.reserve(T);
+ for (auto i = 0; i < T; ++i) {
+ const auto proxy_cmd_str = Substitute(
+ kProxyCmdPattern,
+ nc_,
+ kIpAddr,
+ t_proxy_advertised_ports_[i],
+ t_proxied_ports_[i],
+ t_fifos[i]->filename());
+ t_proxies.emplace_back(new Subprocess({"/bin/bash", "-c", proxy_cmd_str}));
+
+ auto* proxy = t_proxies.back().get();
+ function<void(void)> cleanup = [proxy] {
+ if (proxy->IsStarted()) {
+ WARN_NOT_OK(proxy->KillAndWait(SIGTERM),
+ Substitute("PID $0: could not stop process", proxy->pid()));
+ }
+ };
+ t_proxy_cleanups.emplace_back(std::move(cleanup));
+ }
+ for (auto& p : t_proxies) {
+ ASSERT_OK(p->Start());
+ }
-// Verify basic functionality when RPC connections to Kudu master and tablet
-// server are forwarded via a simple TCP proxy.
-TEST_F(ClientProxiedRpcTest, Basic) {
- SKIP_IF_SLOW_NOT_ALLOWED();
+ // Wait for the TCP proxies to start up.
+ for (auto port : m_proxy_advertised_ports_) {
+ ASSERT_OK(WaitForTcpBindAtPort({ kIpAddr }, port, kTimeout));
+ }
+ for (auto port : t_proxy_advertised_ports_) {
+ ASSERT_OK(WaitForTcpBindAtPort({ kIpAddr }, port, kTimeout));
+ }
- string nc;
- {
- auto s = FindExecutable("nc", {"/bin", "/usr/bin", "/usr/local/bin"}, &nc);
- if (s.IsNotFound()) {
- LOG(WARNING) << "test is skipped: could not find netcat utility (nc)";
- GTEST_SKIP();
+ // Build a client to send requests via RPC endpoints advertised by proxy.
+ client::sp::shared_ptr<client::KuduClient> client;
+ {
+ client::KuduClientBuilder b;
+ for (auto i = 0; i < M; ++i) {
+ b.add_master_server_addr(m_proxy_advertised_addrs_[i].ToString());
+ }
+ b.default_admin_operation_timeout(kTimeout);
+ b.default_rpc_timeout(kTimeout);
+ ASSERT_OK(b.Build(&client));
}
- ASSERT_OK(s);
- }
- ASSERT_FALSE(nc.empty());
- const auto kTimeout = MonoDelta::FromSeconds(5);
- constexpr const char* const kTableName = "proxied_rpc_test";
- constexpr const char* const kProxyCmdPattern =
- "trap \"kill %1\" EXIT; $0 -knv -l $1 $2 <$4 | $0 -nv $1 $3 >$4";
- const auto schema = KuduSchema::FromSchema(GetSimpleTestSchema());
- TestWorkload w(cluster_.get());
- w.set_schema(schema);
- w.set_table_name(kTableName);
- w.set_num_replicas(1);
- w.Setup();
-
- unique_ptr<Fifo> m_fifo;
- ASSERT_OK(env_->NewFifo(JoinPathSegments(test_dir_, "m.fifo"), &m_fifo));
-
- unique_ptr<Fifo> t_fifo;
- ASSERT_OK(env_->NewFifo(JoinPathSegments(test_dir_, "t.fifo"), &t_fifo));
-
- // Run TCP proxy for Kudu master connections.
- const auto m_proxy_cmd_str = Substitute(
- kProxyCmdPattern, nc,
- kIpAddr, m_proxy_advertised_port_, m_proxied_port_, m_fifo->filename());
- Subprocess m_proxy({"/bin/bash", "-c", m_proxy_cmd_str});
- ASSERT_OK(m_proxy.Start());
- auto m_proxy_cleanup = MakeScopedCleanup([&] {
- m_proxy.KillAndWait(SIGTERM);
- });
-
- // Run TCP proxy for Kudu tablet server connections.
- const auto t_proxy_cmd_str = Substitute(
- kProxyCmdPattern, nc,
- kIpAddr, t_proxy_advertised_port_, t_proxied_port_, t_fifo->filename());
- Subprocess t_proxy({"/bin/bash", "-c", t_proxy_cmd_str});
- ASSERT_OK(t_proxy.Start());
- auto t_proxy_cleanup = MakeScopedCleanup([&] {
- t_proxy.KillAndWait(SIGTERM);
- });
-
- // Wait for the TCP proxies to start up.
- ASSERT_OK(WaitForTcpBindAtPort({ kIpAddr }, m_proxy_advertised_port_, kTimeout));
- ASSERT_OK(WaitForTcpBindAtPort({ kIpAddr }, t_proxy_advertised_port_, kTimeout));
-
- // Build a client to send requests via the proxied RPC endpoint.
- client::sp::shared_ptr<client::KuduClient> client;
- {
- client::KuduClientBuilder b;
- b.add_master_server_addr(m_proxy_advertised_addr_.ToString());
- b.default_admin_operation_timeout(kTimeout);
- b.default_rpc_timeout(kTimeout);
- ASSERT_OK(b.Build(&client));
- }
+ // Make sure the client receives the addresses advertised by proxy since
+ // the request came through the proxied RPC address.
+ const vector<string> master_addresses(Split(client->GetMasterAddresses(), ","));
+ for (const auto& hp : m_proxy_advertised_addrs_) {
+ ASSERT_TRUE(std::any_of(master_addresses.begin(), master_addresses.end(),
+ [&hp](const string& e) { return e == hp.ToString(); }));
+ }
+ ASSERT_EQ(m_proxy_advertised_addrs_.size(), master_addresses.size());
+ if (M > 1) {
+ ASSERT_TRUE(client->IsMultiMaster());
+ } else {
+ ASSERT_FALSE(client->IsMultiMaster());
+ }
- // Make sure the client receives proxy advertised addresses since the request
- // came to the proxied RPC address.
- const auto& master_addresses = client->GetMasterAddresses();
- ASSERT_EQ(m_proxy_advertised_addr_.ToString(), master_addresses);
- // Just a sanity check: multiple RPC endpoints shouldn't be treated as
- // a presence of multiple masters in the cluster.
- ASSERT_FALSE(client->IsMultiMaster());
-
- // Check that client sees RPC addresses advertised by TCP proxy for the
- // tablet server.
- vector<KuduTabletServer*> tss;
- ElementDeleter deleter(&tss);
- ASSERT_OK(client->ListTabletServers(&tss));
- ASSERT_EQ(1, tss.size());
- ASSERT_EQ(kIpAddr, tss[0]->hostname());
- ASSERT_EQ(t_proxy_advertised_port_, tss[0]->port());
-
- client::sp::shared_ptr<KuduTable> table;
- ASSERT_OK(client->OpenTable(kTableName, &table));
-
- // Create a session and explicitly set the flush mode to AUTO_FLUSH_SYNC
- // to send every operation when calling Apply().
- client::sp::shared_ptr<KuduSession> session(client->NewSession());
- ASSERT_OK(session->SetFlushMode(KuduSession::AUTO_FLUSH_SYNC));
- ThreadSafeRandom rng(SeedRandom());
- for (auto i = 0; i < 10; ++i) {
- unique_ptr<KuduInsert> insert(table->NewInsert());
- auto* row = insert->mutable_row();
- GenerateDataForRow(schema, i, &rng, row);
- ASSERT_OK(session->Apply(insert.release()));
- }
- // Call Flush() just in case, but it's a no-op effectively since the chosen
- // session flush mode.
- ASSERT_OK(session->Flush());
-
- // Read the data back.
- {
- KuduScanner scanner(table.get());
- ASSERT_OK(scanner.SetFaultTolerant());
- ASSERT_OK(scanner.Open());
- ASSERT_TRUE(scanner.HasMoreRows());
- KuduScanBatch batch;
-
- int32_t idx = 0;
- while (scanner.HasMoreRows()) {
- ASSERT_OK(scanner.NextBatch(&batch));
- for (const auto& row : batch) {
- int32_t value;
- ASSERT_OK(row.GetInt32(0, &value));
- ASSERT_EQ(idx++, value);
+ // Check that client sees RPC addresses advertised by TCP proxy for the
+ // tablet server.
+ vector<KuduTabletServer*> tss;
+ ElementDeleter deleter(&tss);
+ ASSERT_OK(client->ListTabletServers(&tss));
+ ASSERT_EQ(T, tss.size());
+ for (auto i = 0; i < T; ++i) {
+ ASSERT_EQ(kIpAddr, tss[i]->hostname());
+ }
+ {
+ set<uint16_t> ports;
+ for (auto i = 0; i < T; ++i) {
+ const auto port = tss[i]->port();
+ ports.emplace(port);
+ ASSERT_TRUE(std::any_of(t_proxy_advertised_ports_.begin(),
+ t_proxy_advertised_ports_.end(),
+ [&port](uint16_t e) { return e == port; }));
}
+ // Make sure all ports are different.
+ ASSERT_EQ(T, ports.size());
}
- ASSERT_EQ(10, idx);
- }
- // Make sure the client indeed works through the RPC address advertised by
- // proxy: stop the proxy and check if client could write any data to the table.
- t_proxy_cleanup.cancel();
- ASSERT_OK(t_proxy.KillAndWait(SIGTERM));
- {
- unique_ptr<KuduInsert> insert(table->NewInsert());
- auto* row = insert->mutable_row();
- GenerateDataForRow(schema, 100, &rng, row);
- const auto s = session->Apply(insert.release());
- ASSERT_TRUE(s.IsIOError()) << s.ToString();
- ASSERT_STR_CONTAINS(s.ToString(), "failed to flush data");
- }
+ client::sp::shared_ptr<KuduTable> table;
+ ASSERT_OK(client->OpenTable(kTableName, &table));
+
+ // Create a session and explicitly set the flush mode to AUTO_FLUSH_SYNC
+ // to send every operation when calling Apply().
+ client::sp::shared_ptr<KuduSession> session(client->NewSession());
+ ASSERT_OK(session->SetFlushMode(KuduSession::AUTO_FLUSH_SYNC));
+ ThreadSafeRandom rng(SeedRandom());
+ for (auto i = 0; i < 10; ++i) {
+ unique_ptr<KuduInsert> insert(table->NewInsert());
+ auto* row = insert->mutable_row();
+ GenerateDataForRow(schema, i, &rng, row);
+ ASSERT_OK(session->Apply(insert.release()));
+ }
+ // Call Flush() just in case, but it's a no-op effectively since the chosen
+ // session flush mode.
+ ASSERT_OK(session->Flush());
+
+ // Read the data back.
+ {
+ KuduScanner scanner(table.get());
+ ASSERT_OK(scanner.SetTimeoutMillis(kTimeout.ToMilliseconds()));
+ ASSERT_OK(scanner.Open());
+ ASSERT_TRUE(scanner.HasMoreRows());
+ KuduScanBatch batch;
+
+ int32_t idx = 0;
+ while (scanner.HasMoreRows()) {
+ ASSERT_OK(scanner.NextBatch(&batch));
+ for (const auto& row : batch) {
+ int32_t value;
+ ASSERT_OK(row.GetInt32(0, &value));
+ ASSERT_EQ(idx++, value);
+ }
+ }
+ ASSERT_EQ(10, idx);
+ }
- // Try reading the data now: this expected to fail since the client works only
- // through the advertised addressed.
- {
- KuduScanner scanner(table.get());
- ASSERT_OK(scanner.SetFaultTolerant());
- ASSERT_OK(scanner.SetTimeoutMillis(kTimeout.ToMilliseconds()));
- const auto s = scanner.Open();
- ASSERT_TRUE(s.IsTimedOut()) << s.ToString();
- ASSERT_STR_MATCHES(s.ToString(),
- "(timed out after deadline expired|exceeded configured scan timeout)");
- }
+ // Make sure the client indeed works through the RPC addresses advertised by
+ // proxy: stop the proxies and check that the client fails to write any data
+ // to the table.
+ for (auto i = 0; i < T; ++i) {
+ t_proxy_cleanups[i].cancel();
+ ASSERT_OK(t_proxies[i]->KillAndWait(SIGTERM));
+ }
+ {
+ unique_ptr<KuduInsert> insert(table->NewInsert());
+ auto* row = insert->mutable_row();
+ GenerateDataForRow(schema, 100, &rng, row);
+ const auto s = session->Apply(insert.release());
+ ASSERT_TRUE(s.IsIOError()) << s.ToString();
+ ASSERT_STR_CONTAINS(s.ToString(), "failed to flush data");
+ }
- // Meanwhile, DDL operations should be still possible: connections to master
- // are still being proxied as needed, and master and tablet servers
- // communicate via standard, non-proxied RPC endpoints.
- {
- unique_ptr<KuduTableAlterer> alt(client->NewTableAlterer(kTableName));
- alt->AlterColumn("string_val")->RenameTo("str_val");
- ASSERT_OK(alt->Alter());
- }
+ // Try reading the data now: this is expected to fail since the client
+ // works only through the advertised addresses.
+ {
+ KuduScanner scanner(table.get());
+ ASSERT_OK(scanner.SetTimeoutMillis(kTimeout.ToMilliseconds()));
+ const auto s = scanner.Open();
+ ASSERT_TRUE(s.IsTimedOut()) << s.ToString();
+ ASSERT_STR_MATCHES(s.ToString(),
+ "(timed out after deadline expired|exceeded configured scan timeout)");
+ }
- // Make sure the client communicates with master via the advertised addresses:
- // once the TCP proxy is shut down, client should not be able to reach master
- // to perform a DDL operation.
- m_proxy_cleanup.cancel();
- ASSERT_OK(m_proxy.KillAndWait(SIGTERM));
- {
- unique_ptr<KuduTableAlterer> alt(client->NewTableAlterer(kTableName));
- alt->AlterColumn("str_val")->RenameTo("string_val");
- const auto s = alt->Alter();
- ASSERT_TRUE(s.IsTimedOut()) << s.ToString();
- ASSERT_STR_CONTAINS(s.ToString(), "AlterTable passed its deadline");
- ASSERT_STR_CONTAINS(s.ToString(), "Client connection negotiation failed");
+ // Meanwhile, DDL operations should be still possible: connections to
+ // masters are still being proxied as needed, and masters and tablet servers
+ // communicate via standard, non-proxied RPC endpoints.
+ {
+ unique_ptr<KuduTableAlterer> alt(client->NewTableAlterer(kTableName));
+ alt->AlterColumn("string_val")->RenameTo("str_val");
+ ASSERT_OK(alt->Alter());
+ }
+
+ // Make sure the client communicates with masters via the advertised
+ // addresses: once the corresponding TCP proxy is shut down, the client
+ // should not be able to reach the master to perform a DDL operation.
+ for (auto i = 0; i < M; ++i) {
+ m_proxy_cleanups[i].cancel();
+ ASSERT_OK(m_proxies[i]->KillAndWait(SIGTERM));
+ }
+ {
+ unique_ptr<KuduTableAlterer> alt(client->NewTableAlterer(kTableName));
+ alt->AlterColumn("str_val")->RenameTo("string_val");
+ const auto s = alt->Alter();
+ ASSERT_TRUE(s.IsTimedOut()) << s.ToString();
+ ASSERT_STR_CONTAINS(s.ToString(), "AlterTable passed its deadline");
+ ASSERT_STR_CONTAINS(s.ToString(), "Client connection negotiation failed");
+ }
}
+
+ protected:
+ static constexpr const char* const kIpAddr = "127.0.0.1";
+ static constexpr const char* const kProxyCmdPattern =
+ "trap \"kill %1\" EXIT; $0 -knv -l $1 $2 <$4 | $0 -nv $1 $3 >$4";
+
+ // Full path to the nc/netcat utility (if present).
+ string nc_;
+
+ array<uint16_t, M> m_proxied_ports_;
+ array<HostPort, M> m_proxied_addrs_;
+ array<uint16_t, M> m_proxy_advertised_ports_;
+ array<HostPort, M> m_proxy_advertised_addrs_;
+
+ array<uint16_t, T> t_proxied_ports_;
+ array<HostPort, T> t_proxied_addrs_;
+ array<uint16_t, T> t_proxy_advertised_ports_;
+ array<HostPort, T> t_proxy_advertised_addrs_;
+
+ unique_ptr<ExternalMiniCluster> cluster_;
+};
+
+typedef ClientProxiedRpcTest<1, 1> ClientProxiedRpc1M1Test;
+TEST_F(ClientProxiedRpc1M1Test, Basic) {
+ NO_FATALS(Run());
+}
+
+typedef ClientProxiedRpcTest<1, 3> ClientProxiedRpc1M3Test;
+TEST_F(ClientProxiedRpc1M3Test, Basic) {
+ NO_FATALS(Run());
+}
+
+typedef ClientProxiedRpcTest<3, 1> ClientProxiedRpc3M1Test;
+TEST_F(ClientProxiedRpc3M1Test, Basic) {
+ NO_FATALS(Run());
+}
+
+typedef ClientProxiedRpcTest<3, 3> ClientProxiedRpc3M3Test;
+TEST_F(ClientProxiedRpc3M3Test, Basic) {
+ NO_FATALS(Run());
}
} // namespace kudu
diff --git a/src/kudu/master/master.cc b/src/kudu/master/master.cc
index 296765b21..79c83ee7c 100644
--- a/src/kudu/master/master.cc
+++ b/src/kudu/master/master.cc
@@ -38,6 +38,7 @@
#include "kudu/fs/fs_manager.h"
#include "kudu/gutil/ref_counted.h"
#include "kudu/gutil/strings/join.h"
+#include "kudu/gutil/strings/split.h"
#include "kudu/gutil/strings/substitute.h"
#include "kudu/hms/hms_catalog.h"
#include "kudu/master/catalog_manager.h"
@@ -60,6 +61,7 @@
#include "kudu/tserver/tablet_service.h"
#include "kudu/util/countdown_latch.h"
#include "kudu/util/flag_tags.h"
+#include "kudu/util/flag_validators.h"
#include "kudu/util/logging.h"
#include "kudu/util/maintenance_manager.h"
#include "kudu/util/monotime.h"
@@ -112,8 +114,18 @@ DEFINE_string(location_mapping_cmd, "",
"characters from the set [a-zA-Z0-9_-.]. If the cluster is not "
"using location awareness features this flag should not be set.");
+DEFINE_string(master_rpc_proxy_advertised_addresses, "",
+ "RPC endpoints of all masters in this cluster exposed to the "
+ "external network via a TCP proxy. This is a comma-separated "
+ "list of the masters' proxy-advertised RPC addresses, each "
+ "specified via the --rpc_proxy_advertised_addresses "
+ "flag for a single master.");
+TAG_FLAG(master_rpc_proxy_advertised_addresses, experimental);
+
DECLARE_bool(txn_manager_lazily_initialized);
DECLARE_bool(txn_manager_enabled);
+DECLARE_string(master_addresses);
+DECLARE_string(rpc_proxy_advertised_addresses);
using kudu::consensus::RaftPeerPB;
using kudu::fs::ErrorHandlerType;
@@ -129,6 +141,7 @@ using std::shared_ptr;
using std::string;
using std::unique_ptr;
using std::vector;
+using strings::Split;
using strings::Substitute;
namespace kudu {
@@ -140,7 +153,40 @@ class RpcContext;
namespace kudu {
namespace master {
+
namespace {
+
+// This validator issues a warning (not an error) to allow for temporarily
+// inconsistent configurations when adding a new master.
+bool ValidateMultiMasterProxiedRpcFlags() {
+ if (FLAGS_rpc_proxy_advertised_addresses.empty()) {
+ return true;
+ }
+ const vector<string> master_addrs = Split(FLAGS_master_addresses, ",");
+ if (master_addrs.size() <= 1) {
+ return true;
+ }
+ // Make sure --master_rpc_proxy_advertised_addresses is set if a multi-master
+ // configuration is detected and --rpc_proxy_advertised_addresses is set as well.
+ // Also, check whether the number of masters in the internal network
+ // corresponds to the number of masters advertised for clients in external
+ // networks.
+ if (FLAGS_master_rpc_proxy_advertised_addresses.empty()) {
+ LOG(WARNING) << Substitute(
+ "--master_rpc_proxy_advertised_addresses should be set as well "
+ "when configuring RPC proxying in a multi-master cluster");
+ }
+ const vector<string> master_proxy_addrs =
+ Split(FLAGS_master_rpc_proxy_advertised_addresses, ",");
+ if (master_proxy_addrs.size() != master_addrs.size()) {
+ LOG(WARNING) << Substitute(
+ "--master_rpc_proxy_advertised_addresses and --master_addresses "
+ "should have same number of elements");
+ }
+ return true;
+}
+GROUP_FLAG_VALIDATOR(multi_master_pp, &ValidateMultiMasterProxiedRpcFlags);
+
constexpr const char* kReplaceMasterMessage =
"this master may return incorrect results and should be replaced";
void CrashMasterOnDiskError(const string& uuid) {
@@ -236,6 +282,17 @@ Status Master::Init() {
FLAGS_tsk_rotation_seconds,
messenger_->shared_token_verifier()));
+ if (!FLAGS_master_rpc_proxy_advertised_addresses.empty()) {
+ vector<HostPort> host_ports;
+ RETURN_NOT_OK(HostPort::ParseStrings(
+ FLAGS_master_rpc_proxy_advertised_addresses, kDefaultPort, &host_ports));
+ if (host_ports.empty()) {
+ return Status::InvalidArgument(
+ "the set of RPC advertised master addresses is empty");
+ }
+ master_rpc_proxy_advertised_hostports_ = std::move(host_ports);
+ }
+
state_ = kInitialized;
return Status::OK();
}
@@ -544,26 +601,37 @@ Status Master::ListMasters(vector<ServerEntryPB>* masters,
}
// For distributed master configuration.
- // TODO(aserbin): update this to work with proxied RPCs
for (const auto& peer : config.peers()) {
- HostPort hp = HostPortFromPB(peer.last_known_addr());
+ const auto hp = HostPortFromPB(peer.last_known_addr());
ServerEntryPB peer_entry;
- Status s = GetMasterEntryForHost(messenger_, hp, &peer_entry);
- if (!s.ok()) {
- s = s.CloneAndPrepend(
- Substitute("Unable to get registration information for peer $0 ($1)",
- peer.permanent_uuid(),
- hp.ToString()));
+ if (auto s = GetMasterEntryForHost(messenger_, hp, &peer_entry); !s.ok()) {
+ const auto errmsg = Substitute("unable to get registration information for peer $0",
+ peer.permanent_uuid());
+ s = s.CloneAndPrepend(errmsg);
+ if (use_external_addr) {
+ StatusToPB(Status::IllegalState(errmsg), peer_entry.mutable_error());
+ } else {
+ StatusToPB(s, peer_entry.mutable_error());
+ }
LOG(WARNING) << s.ToString();
- StatusToPB(s, peer_entry.mutable_error());
} else if (peer_entry.instance_id().permanent_uuid() != peer.permanent_uuid()) {
- StatusToPB(Status::IllegalState(
- Substitute("mismatched UUIDs: expected UUID $0 from master at $1, but got UUID $2",
- peer.permanent_uuid(),
- hp.ToString(),
- peer_entry.instance_id().permanent_uuid())),
+ StatusToPB(Status::IllegalState(Substitute("UUID mismatch: $0 vs $1 expected",
+ peer_entry.instance_id().permanent_uuid(),
+ peer.permanent_uuid())),
peer_entry.mutable_error());
+ LOG(WARNING) << Substitute(
+ "UUID mismatch: $0 vs $1 expected for master at $2",
+ peer_entry.instance_id().permanent_uuid(), peer.permanent_uuid(), hp.ToString());
+ }
+ if (use_external_addr && peer_entry.has_registration()) {
+ auto* reg = peer_entry.mutable_registration();
+ reg->mutable_rpc_addresses()->Swap(reg->mutable_rpc_proxy_addresses());
+ reg->clear_rpc_proxy_addresses();
+
+ reg->mutable_http_addresses()->Swap(reg->mutable_http_proxy_addresses());
+ reg->clear_http_proxy_addresses();
}
+
masters->emplace_back(std::move(peer_entry));
}
return Status::OK();
@@ -724,5 +792,18 @@ Status Master::RemoveMaster(const HostPort& hp, const string& uuid, rpc::RpcCont
matching_uuid, rpc);
}
+const vector<HostPort>& Master::GetProxyAdvertisedHostPorts() const {
+ // In case of multi-master configuration or a single-master configuration
+ // where --master_rpc_proxy_advertised_addresses is specified, get the list
+ // of proxy advertised <host, port> pairs from that flag.
+ DCHECK_NE(kStopped, state_);
+ if (!master_rpc_proxy_advertised_hostports_.empty()) {
+ return master_rpc_proxy_advertised_hostports_;
+ }
+
+ // Otherwise, return whatever is set for --rpc_proxy_advertised_addresses
+ return rpc_server()->GetProxyAdvertisedHostPorts();
+}
+
} // namespace master
} // namespace kudu
diff --git a/src/kudu/master/master.h b/src/kudu/master/master.h
index 49b6541fc..38752e032 100644
--- a/src/kudu/master/master.h
+++ b/src/kudu/master/master.h
@@ -29,12 +29,12 @@
#include "kudu/kserver/kserver.h"
#include "kudu/master/master_options.h"
#include "kudu/server/rpc_server.h"
+#include "kudu/util/net/net_util.h"
#include "kudu/util/promise.h"
#include "kudu/util/status.h"
namespace kudu {
-class HostPort;
class MaintenanceManager;
class MonoDelta;
class MonoTime;
@@ -150,11 +150,15 @@ class Master : public kserver::KuduServer {
}
// A shortcut to get addresses this master server is configured with
- // for processing RPCs proxied from an external network.
+ // for processing RPCs proxied from external networks.
const std::vector<Sockaddr>& rpc_proxied_addresses() const {
return rpc_server()->GetRpcProxiedAddresses();
}
+ // Return addresses advertised at a TCP proxy for clients connecting from
+ // external networks.
+ const std::vector<HostPort>& GetProxyAdvertisedHostPorts() const;
+
private:
friend class MasterTest;
friend class CatalogManager;
@@ -225,6 +229,13 @@ class Master : public kserver::KuduServer {
scoped_refptr<Thread> expired_reserved_tables_deleter_thread_;
+ // RPC endpoints of all masters in this cluster in the external network.
+ // These are sourced from the --master_rpc_proxy_advertised_addresses flag.
+ // These are assumed to be proxied/forwarded to the corresponding RPC
+ // endpoints of this cluster's masters, where the latter are specified by
+ // --rpc_proxied_addresses for each master correspondingly.
+ std::vector<HostPort> master_rpc_proxy_advertised_hostports_;
+
DISALLOW_COPY_AND_ASSIGN(Master);
};
diff --git a/src/kudu/master/master_service.cc b/src/kudu/master/master_service.cc
index 7af0bd754..234008826 100644
--- a/src/kudu/master/master_service.cc
+++ b/src/kudu/master/master_service.cc
@@ -51,7 +51,6 @@
#include "kudu/security/token.pb.h"
#include "kudu/security/token_signer.h"
#include "kudu/security/token_verifier.h"
-#include "kudu/server/rpc_server.h"
#include "kudu/server/server_base.h"
#include "kudu/util/debug/trace_event.h"
#include "kudu/util/flag_tags.h"
@@ -814,11 +813,10 @@ void MasterServiceImpl::ConnectToMaster(const ConnectToMasterRequestPB* /*req*/,
// Set the info about the other masters, so that the client can verify
// it has the full set of info.
{
- // Check if the request came to the dedicated RPC endpoint meaning
- // it's been proxied from the outside network.
+ // Check if the request came through the dedicated RPC endpoint meaning
+ // it's been proxied from an external network.
if (IsAddrOneOf(rpc->local_address(), server_->rpc_proxied_addresses())) {
- // TODO(aserbin): adapt this to multi-master configuration
- for (const auto& hp : server_->rpc_server()->GetProxyAdvertisedHostPorts()) {
+ for (const auto& hp : server_->GetProxyAdvertisedHostPorts()) {
*resp->add_master_addrs() = HostPortToPB(hp);
}
} else {
diff --git a/src/kudu/mini-cluster/external_mini_cluster.cc b/src/kudu/mini-cluster/external_mini_cluster.cc
index 480581adb..2cb5187dd 100644
--- a/src/kudu/mini-cluster/external_mini_cluster.cc
+++ b/src/kudu/mini-cluster/external_mini_cluster.cc
@@ -26,6 +26,7 @@
#include <memory>
#include <string>
#include <thread>
+#include <type_traits>
#include <unordered_set>
#include <utility>
@@ -97,6 +98,7 @@ using kudu::tserver::ListTabletsRequestPB;
using kudu::tserver::ListTabletsResponsePB;
using kudu::tserver::TabletServerAdminServiceProxy;
using kudu::tserver::TabletServerServiceProxy;
+using std::back_inserter;
using std::copy;
using std::map;
using std::pair;
@@ -633,7 +635,16 @@ Status ExternalMiniCluster::AddTabletServer() {
vector<string> extra_flags;
RETURN_NOT_OK(AddTimeSourceFlags(idx, &extra_flags));
auto flags = SubstituteInFlags(opts_.extra_tserver_flags, idx);
- copy(flags.begin(), flags.end(), std::back_inserter(extra_flags));
+ copy(flags.begin(), flags.end(), back_inserter(extra_flags));
+
+ // Add custom tablet server flags, specific to this particular tablet server
+ // instance.
+ if (!opts_.t_custom_flags.empty()) {
+ CHECK_EQ(opts_.num_tablet_servers, opts_.t_custom_flags.size());
+ const auto& custom_flags = opts_.t_custom_flags[idx];
+ copy(custom_flags.begin(), custom_flags.end(), back_inserter(extra_flags));
+ }
+
opts.extra_flags = extra_flags;
if (!opts_.tserver_alias_prefix.empty()) {
opts.extra_flags.emplace_back(
@@ -657,7 +668,8 @@ Status ExternalMiniCluster::AddTabletServer() {
return Status::OK();
}
-Status ExternalMiniCluster::CreateMaster(const vector<HostPort>& master_rpc_addrs, int idx,
+Status ExternalMiniCluster::CreateMaster(const vector<HostPort>& master_rpc_addrs,
+ size_t idx,
scoped_refptr<ExternalMaster>* master) {
DCHECK_LT(idx, master_rpc_addrs.size());
vector<string> flags;
@@ -720,9 +732,16 @@ Status ExternalMiniCluster::CreateMaster(const vector<HostPort>& master_rpc_addr
flags.emplace_back(Substitute("--ranger_kms_url=$0", ranger_kms_->url()));
}
}
- // Add custom master flags.
+ // Add extra master flags, common for all masters in the cluster.
copy(opts_.extra_master_flags.begin(), opts_.extra_master_flags.end(),
- std::back_inserter(flags));
+ back_inserter(flags));
+
+ // Add custom master flags, specific to this particular master instance.
+ if (!opts_.m_custom_flags.empty()) {
+ CHECK_EQ(opts_.num_masters, opts_.m_custom_flags.size());
+ const auto& custom_flags = opts_.m_custom_flags[idx];
+ copy(custom_flags.begin(), custom_flags.end(), back_inserter(flags));
+ }
string daemon_id = Substitute("master-$0", idx);
diff --git a/src/kudu/mini-cluster/external_mini_cluster.h b/src/kudu/mini-cluster/external_mini_cluster.h
index b06e66ccb..e1ab188b0 100644
--- a/src/kudu/mini-cluster/external_mini_cluster.h
+++ b/src/kudu/mini-cluster/external_mini_cluster.h
@@ -19,6 +19,7 @@
#include <sys/types.h>
+#include <cstddef>
#include <cstdint>
#include <functional>
#include <map>
@@ -48,6 +49,7 @@ class Env;
class NodeInstancePB;
class Sockaddr;
class Subprocess;
+
namespace ranger {
class MiniRanger;
} // namespace ranger
@@ -140,6 +142,9 @@ enum class BuiltinNtpConfigMode {
};
#endif
+// TODO(aserbin): maybe turn this struct into a class template based on the
+// number of masters and tablet servers, changing std::vector to
+// std::array for {m,t}_custom_flags
struct ExternalMiniClusterOptions {
ExternalMiniClusterOptions();
@@ -192,6 +197,30 @@ struct ExternalMiniClusterOptions {
// If unset, addresses are assigned automatically.
std::vector<HostPort> master_rpc_addresses;
+ // Custom flags for masters in the cluster. These are per-instance,
+ // i.e. not common across all masters in the cluster:
+ // 'm_custom_flags[i]' are the flags for master at index 'i'.
+ //
+ // If not empty, must contain the same number of elements as the number
+ // of masters in the cluster. In other words, the invariant is
+ //
+ // m_custom_flags.empty() || m_custom_flags.size() == num_masters
+ //
+ // Default: empty
+ std::vector<std::vector<std::string>> m_custom_flags;
+
+ // Custom flags for tablet servers in the cluster. These are per-instance,
+ // i.e. not common across all tablet servers in the cluster:
+ // 't_custom_flags[i]' are the flags for the tablet server at index 'i'.
+ //
+ // If not empty, must contain the same number of elements as the number
+ // of tablet servers in the cluster. In other words, the invariant is
+ //
+ // t_custom_flags.empty() || t_custom_flags.size() == num_tablet_servers
+ //
+ // Default: empty
+ std::vector<std::vector<std::string>> t_custom_flags;
+
// Options to configure the MiniKdc before starting it up.
// Only used when 'enable_kerberos' is 'true'.
MiniKdcOptions mini_kdc_options;
@@ -559,7 +588,8 @@ class ExternalMiniCluster : public MiniCluster {
//
// It's expected that the port for the master at 'idx' is reserved, and that
// the master can be run with the --rpc_reuseport flag.
- Status CreateMaster(const std::vector<HostPort>& master_rpc_addrs, int idx,
+ Status CreateMaster(const std::vector<HostPort>& master_rpc_addrs,
+ size_t idx,
scoped_refptr<ExternalMaster>* master);
private: