You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by ad...@apache.org on 2018/11/08 19:02:53 UTC

kudu git commit: Add "network_plane" as part of ConnectionId

Repository: kudu
Updated Branches:
  refs/heads/master 3906d94a4 -> 208369547


Add "network_plane" as part of ConnectionId

The motivation for doing so is to allow N services on the same host to
be multiplexed on M different connections. For instance, a server may
host multiple KRPC-based services: one for control commands and one for
data transfer. Separating the connections between the control channel
and the data channel prevents unnecessary delays of the control commands
due to being stuck behind large data transfers from client to server.

By default, the network_plane of a new ConnectionId is not set (empty).
A user can change it to a different value by calling
Proxy::set_network_plane() on the owning Proxy before any request starts.

Change-Id: I6767e631fd9530ea54f5ed63ff4c8c179ab216b2
Reviewed-on: http://gerrit.cloudera.org:8080/11681
Reviewed-by: Adar Dembo <ad...@cloudera.com>
Tested-by: Kudu Jenkins


Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/20836954
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/20836954
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/20836954

Branch: refs/heads/master
Commit: 2083695473ee1635e0298049044a9f28b3459644
Parents: 3906d94
Author: Michael Ho <kw...@cloudera.com>
Authored: Sat Oct 13 00:08:19 2018 -0700
Committer: Adar Dembo <ad...@cloudera.com>
Committed: Thu Nov 8 19:02:10 2018 +0000

----------------------------------------------------------------------
 src/kudu/rpc/connection_id.cc |  20 +++++--
 src/kudu/rpc/connection_id.h  |  10 +++-
 src/kudu/rpc/messenger.h      |   1 +
 src/kudu/rpc/proxy.cc         |  10 +++-
 src/kudu/rpc/proxy.h          |  13 ++++-
 src/kudu/rpc/reactor.h        |   3 +-
 src/kudu/rpc/rpc-test.cc      | 112 ++++++++++++++++++++++++++++++-------
 7 files changed, 138 insertions(+), 31 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kudu/blob/20836954/src/kudu/rpc/connection_id.cc
----------------------------------------------------------------------
diff --git a/src/kudu/rpc/connection_id.cc b/src/kudu/rpc/connection_id.cc
index 6720807..ab98715 100644
--- a/src/kudu/rpc/connection_id.cc
+++ b/src/kudu/rpc/connection_id.cc
@@ -33,7 +33,7 @@ namespace rpc {
 ConnectionId::ConnectionId() {}
 
 ConnectionId::ConnectionId(const Sockaddr& remote,
-                           std::string hostname,
+                           string hostname,
                            UserCredentials user_credentials)
     : remote_(remote),
       hostname_(std::move(hostname)),
@@ -46,6 +46,10 @@ void ConnectionId::set_user_credentials(UserCredentials user_credentials) {
   user_credentials_ = std::move(user_credentials);
 }
 
+void ConnectionId::set_network_plane(string network_plane) {
+  network_plane_ = std::move(network_plane);
+}
+
 string ConnectionId::ToString() const {
   string remote;
   if (hostname_ != remote_.host()) {
@@ -54,9 +58,15 @@ string ConnectionId::ToString() const {
     remote = remote_.ToString();
   }
 
-  return strings::Substitute("{remote=$0, user_credentials=$1}",
+  string network_plane;
+  if (!network_plane_.empty()) {
+    network_plane = strings::Substitute(", network_plane=$0", network_plane_);
+  }
+
+  return strings::Substitute("{remote=$0, user_credentials=$1$2}",
                              remote,
-                             user_credentials_.ToString());
+                             user_credentials_.ToString(),
+                             network_plane);
 }
 
 size_t ConnectionId::HashCode() const {
@@ -64,13 +74,15 @@ size_t ConnectionId::HashCode() const {
   boost::hash_combine(seed, remote_.HashCode());
   boost::hash_combine(seed, hostname_);
   boost::hash_combine(seed, user_credentials_.HashCode());
+  boost::hash_combine(seed, network_plane_);
   return seed;
 }
 
 bool ConnectionId::Equals(const ConnectionId& other) const {
   return remote() == other.remote() &&
       hostname_ == other.hostname_ &&
-      user_credentials().Equals(other.user_credentials());
+      user_credentials().Equals(other.user_credentials()) &&
+      network_plane_ == other.network_plane_;
 }
 
 size_t ConnectionIdHash::operator() (const ConnectionId& conn_id) const {

http://git-wip-us.apache.org/repos/asf/kudu/blob/20836954/src/kudu/rpc/connection_id.h
----------------------------------------------------------------------
diff --git a/src/kudu/rpc/connection_id.h b/src/kudu/rpc/connection_id.h
index 67a4786..6ec98f7 100644
--- a/src/kudu/rpc/connection_id.h
+++ b/src/kudu/rpc/connection_id.h
@@ -50,8 +50,10 @@ class ConnectionId {
 
   const UserCredentials& user_credentials() const { return user_credentials_; }
 
-  // Copy state from another object to this one.
-  void CopyFrom(const ConnectionId& other);
+  // The network plane associated with this connection.
+  void set_network_plane(std::string network_plane);
+
+  const std::string& network_plane() const { return network_plane_; }
 
   // Returns a string representation of the object, not including the password field.
   std::string ToString() const;
@@ -68,6 +70,10 @@ class ConnectionId {
   std::string hostname_;
 
   UserCredentials user_credentials_;
+
+  // The name of the network plane adopted by this connection. Please see header comments
+  // in proxy.h for details.
+  std::string network_plane_;
 };
 
 class ConnectionIdHash {

http://git-wip-us.apache.org/repos/asf/kudu/blob/20836954/src/kudu/rpc/messenger.h
----------------------------------------------------------------------
diff --git a/src/kudu/rpc/messenger.h b/src/kudu/rpc/messenger.h
index 64a804b..b3a78e0 100644
--- a/src/kudu/rpc/messenger.h
+++ b/src/kudu/rpc/messenger.h
@@ -329,6 +329,7 @@ class Messenger {
   FRIEND_TEST(TestRpc, TestConnectionAlwaysKeepalive);
   FRIEND_TEST(TestRpc, TestClientConnectionsMetrics);
   FRIEND_TEST(TestRpc, TestCredentialsPolicy);
+  FRIEND_TEST(TestRpc, TestConnectionNetworkPlane);
   FRIEND_TEST(TestRpc, TestReopenOutboundConnections);
 
   explicit Messenger(const MessengerBuilder &bld);

http://git-wip-us.apache.org/repos/asf/kudu/blob/20836954/src/kudu/rpc/proxy.cc
----------------------------------------------------------------------
diff --git a/src/kudu/rpc/proxy.cc b/src/kudu/rpc/proxy.cc
index 54b8085..24668ab 100644
--- a/src/kudu/rpc/proxy.cc
+++ b/src/kudu/rpc/proxy.cc
@@ -102,10 +102,16 @@ Status Proxy::SyncRequest(const string& method,
   return controller->status();
 }
 
-void Proxy::set_user_credentials(const UserCredentials& user_credentials) {
+void Proxy::set_user_credentials(UserCredentials user_credentials) {
   CHECK(base::subtle::NoBarrier_Load(&is_started_) == false)
     << "It is illegal to call set_user_credentials() after request processing has started";
-  conn_id_.set_user_credentials(user_credentials);
+  conn_id_.set_user_credentials(std::move(user_credentials));
+}
+
+void Proxy::set_network_plane(string network_plane) {
+  CHECK(base::subtle::NoBarrier_Load(&is_started_) == false)
+    << "It is illegal to call set_network_plane() after request processing has started";
+  conn_id_.set_network_plane(std::move(network_plane));
 }
 
 std::string Proxy::ToString() const {

http://git-wip-us.apache.org/repos/asf/kudu/blob/20836954/src/kudu/rpc/proxy.h
----------------------------------------------------------------------
diff --git a/src/kudu/rpc/proxy.h b/src/kudu/rpc/proxy.h
index 92b3079..641c514 100644
--- a/src/kudu/rpc/proxy.h
+++ b/src/kudu/rpc/proxy.h
@@ -50,6 +50,11 @@ class UserCredentials;
 // re-established as necessary by the messenger. Additionally, the messenger is
 // likely to multiplex many Proxy objects on the same connection.
 //
+// A proxy object can optionally specify the "network plane" it uses. This allows
+// proxies of N services to be multiplexed on M TCP connections so that a higher priority
+// service (e.g. a control channel) may use a different connection than other services,
+// avoiding the chance of being blocked by traffic of other services.
+//
 // Proxy objects are thread-safe after initialization only.
 // Setters on the Proxy are not thread-safe, and calling a setter after any RPC
 // request has started will cause a fatal error.
@@ -104,11 +109,17 @@ class Proxy {
                      RpcController* controller) const;
 
   // Set the user credentials which should be used to log in.
-  void set_user_credentials(const UserCredentials& user_credentials);
+  void set_user_credentials(UserCredentials user_credentials);
 
   // Get the user credentials which should be used to log in.
   const UserCredentials& user_credentials() const { return conn_id_.user_credentials(); }
 
+  // Set the network plane which this proxy uses.
+  void set_network_plane(std::string network_plane);
+
+  // Get the network plane which this proxy uses.
+  const std::string& network_plane() const { return conn_id_.network_plane(); }
+
   std::string ToString() const;
 
  private:

http://git-wip-us.apache.org/repos/asf/kudu/blob/20836954/src/kudu/rpc/reactor.h
----------------------------------------------------------------------
diff --git a/src/kudu/rpc/reactor.h b/src/kudu/rpc/reactor.h
index 8884f54..ce251c1 100644
--- a/src/kudu/rpc/reactor.h
+++ b/src/kudu/rpc/reactor.h
@@ -137,7 +137,8 @@ class ReactorThread {
   friend class Connection;
 
   // Client-side connection map. Multiple connections could be open to a remote
-  // server if multiple credential policies are used for individual RPCs.
+  // server if multiple credential policies or different network planes are used
+  // for individual RPCs.
   typedef std::unordered_multimap<ConnectionId, scoped_refptr<Connection>,
                                   ConnectionIdHash, ConnectionIdEqual>
       conn_multimap_t;

http://git-wip-us.apache.org/repos/asf/kudu/blob/20836954/src/kudu/rpc/rpc-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/rpc/rpc-test.cc b/src/kudu/rpc/rpc-test.cc
index 1cffdfd..2af84a1 100644
--- a/src/kudu/rpc/rpc-test.cc
+++ b/src/kudu/rpc/rpc-test.cc
@@ -630,13 +630,13 @@ TEST_P(TestRpc, TestCredentialsPolicy) {
   // Make an RPC call with ANY_CREDENTIALS policy.
   ASSERT_OK(DoTestSyncCall(p, GenericCalculatorService::kAddMethodName));
   ASSERT_OK(server_messenger_->reactors_[0]->GetMetrics(&metrics));
-  EXPECT_EQ(0, metrics.total_client_connections_);
-  EXPECT_EQ(1, metrics.total_server_connections_);
-  EXPECT_EQ(1, metrics.num_server_connections_);
-  EXPECT_OK(client_messenger->reactors_[0]->GetMetrics(&metrics));
-  EXPECT_EQ(1, metrics.total_client_connections_);
-  EXPECT_EQ(0, metrics.total_server_connections_);
-  EXPECT_EQ(1, metrics.num_client_connections_);
+  ASSERT_EQ(0, metrics.total_client_connections_);
+  ASSERT_EQ(1, metrics.total_server_connections_);
+  ASSERT_EQ(1, metrics.num_server_connections_);
+  ASSERT_OK(client_messenger->reactors_[0]->GetMetrics(&metrics));
+  ASSERT_EQ(1, metrics.total_client_connections_);
+  ASSERT_EQ(0, metrics.total_server_connections_);
+  ASSERT_EQ(1, metrics.num_client_connections_);
 
   // This is to allow all the data to be sent so the connection becomes idle.
   SleepFor(MonoDelta::FromMilliseconds(5));
@@ -647,13 +647,13 @@ TEST_P(TestRpc, TestCredentialsPolicy) {
   ASSERT_OK(DoTestSyncCall(p, GenericCalculatorService::kAddMethodName,
                            CredentialsPolicy::PRIMARY_CREDENTIALS));
   ASSERT_OK(server_messenger_->reactors_[0]->GetMetrics(&metrics));
-  EXPECT_EQ(0, metrics.total_client_connections_);
-  EXPECT_EQ(2, metrics.total_server_connections_);
-  EXPECT_EQ(1, metrics.num_server_connections_);
-  EXPECT_OK(client_messenger->reactors_[0]->GetMetrics(&metrics));
-  EXPECT_EQ(2, metrics.total_client_connections_);
-  EXPECT_EQ(0, metrics.total_server_connections_);
-  EXPECT_EQ(1, metrics.num_client_connections_);
+  ASSERT_EQ(0, metrics.total_client_connections_);
+  ASSERT_EQ(2, metrics.total_server_connections_);
+  ASSERT_EQ(1, metrics.num_server_connections_);
+  ASSERT_OK(client_messenger->reactors_[0]->GetMetrics(&metrics));
+  ASSERT_EQ(2, metrics.total_client_connections_);
+  ASSERT_EQ(0, metrics.total_server_connections_);
+  ASSERT_EQ(1, metrics.num_client_connections_);
 
   // Make another RPC call with ANY_CREDENTIALS policy. The already established
   // connection with PRIMARY_CREDENTIALS policy should be re-used because
@@ -661,13 +661,83 @@ TEST_P(TestRpc, TestCredentialsPolicy) {
   // the currently open connection has been established with.
   ASSERT_OK(DoTestSyncCall(p, GenericCalculatorService::kAddMethodName));
   ASSERT_OK(server_messenger_->reactors_[0]->GetMetrics(&metrics));
-  EXPECT_EQ(0, metrics.total_client_connections_);
-  EXPECT_EQ(2, metrics.total_server_connections_);
-  EXPECT_EQ(1, metrics.num_server_connections_);
-  EXPECT_OK(client_messenger->reactors_[0]->GetMetrics(&metrics));
-  EXPECT_EQ(2, metrics.total_client_connections_);
-  EXPECT_EQ(0, metrics.total_server_connections_);
-  EXPECT_EQ(1, metrics.num_client_connections_);
+  ASSERT_EQ(0, metrics.total_client_connections_);
+  ASSERT_EQ(2, metrics.total_server_connections_);
+  ASSERT_EQ(1, metrics.num_server_connections_);
+  ASSERT_OK(client_messenger->reactors_[0]->GetMetrics(&metrics));
+  ASSERT_EQ(2, metrics.total_client_connections_);
+  ASSERT_EQ(0, metrics.total_server_connections_);
+  ASSERT_EQ(1, metrics.num_client_connections_);
+}
+
+// Test that proxies with different network planes open separate connections to the server.
+TEST_P(TestRpc, TestConnectionNetworkPlane) {
+  // Only run one reactor per messenger, so we can grab the metrics from that
+  // one without having to check all.
+  n_server_reactor_threads_ = 1;
+
+  // Keep the connection alive all the time.
+  keepalive_time_ms_ = -1;
+
+  // Set up server.
+  Sockaddr server_addr;
+  bool enable_ssl = GetParam();
+  ASSERT_OK(StartTestServer(&server_addr, enable_ssl));
+
+  // Set up clients with default and non-default network planes.
+  LOG(INFO) << "Connecting to " << server_addr.ToString();
+  shared_ptr<Messenger> client_messenger;
+  ASSERT_OK(CreateMessenger("Client", &client_messenger, 1, enable_ssl));
+  Proxy p1(client_messenger, server_addr, server_addr.host(),
+           GenericCalculatorService::static_service_name());
+  Proxy p2(client_messenger, server_addr, server_addr.host(),
+           GenericCalculatorService::static_service_name());
+  p2.set_network_plane("control-channel");
+
+  // Verify the initial counters.
+  ReactorMetrics metrics;
+  ASSERT_OK(server_messenger_->reactors_[0]->GetMetrics(&metrics));
+  ASSERT_EQ(0, metrics.total_client_connections_);
+  ASSERT_EQ(0, metrics.total_server_connections_);
+  ASSERT_EQ(0, metrics.num_server_connections_);
+  ASSERT_OK(client_messenger->reactors_[0]->GetMetrics(&metrics));
+  ASSERT_EQ(0, metrics.total_client_connections_);
+  ASSERT_EQ(0, metrics.total_server_connections_);
+  ASSERT_EQ(0, metrics.num_client_connections_);
+
+  // Make an RPC call with the default network plane.
+  ASSERT_OK(DoTestSyncCall(p1, GenericCalculatorService::kAddMethodName));
+  ASSERT_OK(server_messenger_->reactors_[0]->GetMetrics(&metrics));
+  ASSERT_EQ(0, metrics.total_client_connections_);
+  ASSERT_EQ(1, metrics.total_server_connections_);
+  ASSERT_EQ(1, metrics.num_server_connections_);
+  ASSERT_OK(client_messenger->reactors_[0]->GetMetrics(&metrics));
+  ASSERT_EQ(1, metrics.total_client_connections_);
+  ASSERT_EQ(0, metrics.total_server_connections_);
+  ASSERT_EQ(1, metrics.num_client_connections_);
+
+  // Make an RPC call with the non-default network plane.
+  ASSERT_OK(DoTestSyncCall(p2, GenericCalculatorService::kAddMethodName));
+  ASSERT_OK(server_messenger_->reactors_[0]->GetMetrics(&metrics));
+  ASSERT_EQ(0, metrics.total_client_connections_);
+  ASSERT_EQ(2, metrics.total_server_connections_);
+  ASSERT_EQ(2, metrics.num_server_connections_);
+  ASSERT_OK(client_messenger->reactors_[0]->GetMetrics(&metrics));
+  ASSERT_EQ(2, metrics.total_client_connections_);
+  ASSERT_EQ(0, metrics.total_server_connections_);
+  ASSERT_EQ(2, metrics.num_client_connections_);
+
+  // Make an RPC call with the default network plane again and verify that
+  // there are no new connections.
+  ASSERT_OK(DoTestSyncCall(p1, GenericCalculatorService::kAddMethodName));
+  ASSERT_OK(server_messenger_->reactors_[0]->GetMetrics(&metrics));
+  ASSERT_EQ(0, metrics.total_client_connections_);
+  ASSERT_EQ(2, metrics.total_server_connections_);
+  ASSERT_EQ(2, metrics.num_server_connections_);
+  ASSERT_OK(client_messenger->reactors_[0]->GetMetrics(&metrics));
+  ASSERT_EQ(2, metrics.total_client_connections_);
+  ASSERT_EQ(0, metrics.total_server_connections_);
+  ASSERT_EQ(2, metrics.num_client_connections_);
 }
 
 // Test that a call which takes longer than the keepalive time