You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by to...@apache.org on 2017/05/18 01:10:23 UTC

[1/2] kudu git commit: log: change default retention to one segment

Repository: kudu
Updated Branches:
  refs/heads/master 30d759274 -> c178563f6


log: change default retention to one segment

We've been testing this on various clusters recently and it doesn't
seem to have any adverse effects. Additionally, it reduces the amount
of space used by the WAL for idle tablets, and it reduces startup time
since there are fewer WALs to replay. So, it's probably a good idea to
change the default.

Change-Id: I3094f36949b3e518e872993b3026f8314e271a5e
Reviewed-on: http://gerrit.cloudera.org:8080/6907
Reviewed-by: David Ribeiro Alves <da...@gmail.com>
Tested-by: Kudu Jenkins


Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/7dd30b1c
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/7dd30b1c
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/7dd30b1c

Branch: refs/heads/master
Commit: 7dd30b1c9c2e362efe2b81eb3acccd5c3b003e78
Parents: 30d7592
Author: Todd Lipcon <to...@cloudera.com>
Authored: Tue May 16 16:35:27 2017 -0700
Committer: Todd Lipcon <to...@apache.org>
Committed: Wed May 17 21:53:56 2017 +0000

----------------------------------------------------------------------
 src/kudu/consensus/log-test.cc                     | 1 +
 src/kudu/consensus/log.cc                          | 2 +-
 src/kudu/integration-tests/raft_consensus-itest.cc | 8 ++++----
 3 files changed, 6 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kudu/blob/7dd30b1c/src/kudu/consensus/log-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/log-test.cc b/src/kudu/consensus/log-test.cc
index 12394a9..7101af9 100644
--- a/src/kudu/consensus/log-test.cc
+++ b/src/kudu/consensus/log-test.cc
@@ -481,6 +481,7 @@ TEST_F(LogTest, TestWriteAndReadToAndFromInProgressSegment) {
 
 // Tests that segments can be GC'd while the log is running.
 TEST_P(LogTestOptionalCompression, TestGCWithLogRunning) {
+  FLAGS_log_min_segments_to_retain = 2;
   ASSERT_OK(BuildLog());
 
   vector<LogAnchor*> anchors;

http://git-wip-us.apache.org/repos/asf/kudu/blob/7dd30b1c/src/kudu/consensus/log.cc
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/log.cc b/src/kudu/consensus/log.cc
index 8acc3cf..23f80e2 100644
--- a/src/kudu/consensus/log.cc
+++ b/src/kudu/consensus/log.cc
@@ -56,7 +56,7 @@
 
 // Log retention configuration.
 // -----------------------------
-DEFINE_int32(log_min_segments_to_retain, 2,
+DEFINE_int32(log_min_segments_to_retain, 1,
              "The minimum number of past log segments to keep at all times,"
              " regardless of what is required for durability. "
              "Must be at least 1.");

http://git-wip-us.apache.org/repos/asf/kudu/blob/7dd30b1c/src/kudu/integration-tests/raft_consensus-itest.cc
----------------------------------------------------------------------
diff --git a/src/kudu/integration-tests/raft_consensus-itest.cc b/src/kudu/integration-tests/raft_consensus-itest.cc
index 8d453b4..d461451 100644
--- a/src/kudu/integration-tests/raft_consensus-itest.cc
+++ b/src/kudu/integration-tests/raft_consensus-itest.cc
@@ -631,10 +631,10 @@ TEST_F(RaftConsensusITest, TestCatchupAfterOpsEvicted) {
     // Use short and synchronous rolls so that we can test log segment retention.
     "--log_segment_size_mb=1",
     "--log_async_preallocate_segments=false",
-    // Run the maintenance manager frequently so that we don't have to wait
-    // long for GC.
+    // Run the maintenance manager frequently and flush quickly,
+    // so that we don't have to wait long for GC.
     "--maintenance_manager_polling_interval_ms=100",
-    "--log_target_replay_size_mb=1",
+    "--flush_threshold_secs=3",
     // We write 128KB cells in this test, so bump the limit.
     "--max_cell_size_bytes=1000000",
     // And disable WAL compression so the 128KB cells don't get compressed away.
@@ -680,7 +680,7 @@ TEST_F(RaftConsensusITest, TestCatchupAfterOpsEvicted) {
       for (int i = 0; i < cluster_->num_tablet_servers(); i++) {
         SCOPED_TRACE(Substitute("TS $0", i));
         int num_wals = inspect_->CountFilesInWALDirForTS(i, tablet_id_, "wal-*");
-        ASSERT_EQ(2, num_wals);
+        ASSERT_EQ(1, num_wals);
       }
     });
 }


[2/2] kudu git commit: KUDU-1875: Refuse unauthenticated connections from publicly routable IP addrs

Posted by to...@apache.org.
KUDU-1875: Refuse unauthenticated connections from publicly routable
IP addrs

This rejects unauthenticated connections from publicly routable IPs,
even if authentication and encryption are not configured.

An advanced flag 'trusted_subnets' is provided to whitelist
trusted subnets. If this flag is set explicitly, all unauthenticated
or unencrypted connections are prohibited except the ones from the
specified address blocks. Otherwise, private network (127.0.0.0/8,
etc.) and local subnets of all local network interfaces will be used.
Setting it to '0.0.0.0/0' allows unauthenticated/unencrypted connections
from all remote IP addresses. However, if network access is not
otherwise restricted by a firewall, malicious users may be able to
gain unauthorized access.

Change-Id: I6c3fbb5491785874c5701d6c9d866949cfac905e
Reviewed-on: http://gerrit.cloudera.org:8080/6514
Tested-by: Kudu Jenkins
Reviewed-by: Dan Burkert <da...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/c178563f
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/c178563f
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/c178563f

Branch: refs/heads/master
Commit: c178563f646f1b0e9a9a562d02a252cbe20d820d
Parents: 7dd30b1
Author: hahao <ha...@cloudera.com>
Authored: Thu Mar 30 01:08:33 2017 -0700
Committer: Dan Burkert <da...@apache.org>
Committed: Thu May 18 00:25:11 2017 +0000

----------------------------------------------------------------------
 src/kudu/rpc/negotiation-test.cc   | 103 +++++++++++++++++++++++++++++++-
 src/kudu/rpc/server_negotiation.cc | 102 ++++++++++++++++++++++++++++++-
 src/kudu/rpc/server_negotiation.h  |   3 +
 src/kudu/util/net/net_util-test.cc |  25 ++++++++
 src/kudu/util/net/net_util.cc      |  76 ++++++++++++++++++++++-
 src/kudu/util/net/net_util.h       |  32 +++++++++-
 src/kudu/util/net/socket.h         |   3 +-
 7 files changed, 337 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kudu/blob/c178563f/src/kudu/rpc/negotiation-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/rpc/negotiation-test.cc b/src/kudu/rpc/negotiation-test.cc
index 329a2b5..68185bb 100644
--- a/src/kudu/rpc/negotiation-test.cc
+++ b/src/kudu/rpc/negotiation-test.cc
@@ -122,6 +122,8 @@ struct NegotiationDescriptor {
   EndpointConfig client;
   EndpointConfig server;
 
+  bool use_test_socket;
+
   bool rpc_encrypt_loopback;
 
   // The expected client status from negotiating.
@@ -146,6 +148,14 @@ std::ostream& operator<<(std::ostream& o, NegotiationDescriptor c) {
   return o;
 }
 
+class NegotiationTestSocket : public Socket {
+ public:
+  // Return an arbitrary public IP
+  Status GetPeerAddress(Sockaddr *cur_addr) const override {
+    return cur_addr->ParseString("8.8.8.8:12345", 0);
+  }
+};
+
 class TestNegotiation : public RpcTestBase,
                         public ::testing::WithParamInterface<NegotiationDescriptor> {
  public:
@@ -207,7 +217,10 @@ TEST_P(TestNegotiation, TestNegotiation) {
   ASSERT_OK(client_socket->Init(0));
   client_socket->Connect(server_addr);
 
-  unique_ptr<Socket> server_socket(new Socket());
+  unique_ptr<Socket> server_socket(desc.use_test_socket ?
+                                   new NegotiationTestSocket() :
+                                   new Socket());
+
   Sockaddr client_addr;
   CHECK_OK(listening_socket.Accept(server_socket.get(), &client_addr, 0));
 
@@ -270,9 +283,20 @@ TEST_P(TestNegotiation, TestNegotiation) {
   Status client_status;
   Status server_status;
   thread client_thread([&] () {
+      scoped_refptr<Trace> t(new Trace());
+      ADOPT_TRACE(t.get());
       client_status = client_negotiation.Negotiate();
       // Close the socket so that the server will not block forever on error.
       client_negotiation.socket()->Close();
+
+      if (FLAGS_rpc_trace_negotiation || !client_status.ok()) {
+        string msg = Trace::CurrentTrace()->DumpToString();
+        if (!client_status.ok()) {
+          LOG(WARNING) << "Failed client RPC negotiation. Client trace:\n" << msg;
+        } else {
+          LOG(INFO) << "RPC negotiation tracing enabled. Client trace:\n" << msg;
+        }
+      }
   });
   thread server_thread([&] () {
       scoped_refptr<Trace> t(new Trace());
@@ -284,9 +308,9 @@ TEST_P(TestNegotiation, TestNegotiation) {
       if (FLAGS_rpc_trace_negotiation || !server_status.ok()) {
         string msg = Trace::CurrentTrace()->DumpToString();
         if (!server_status.ok()) {
-          LOG(WARNING) << "Failed RPC negotiation. Trace:\n" << msg;
+          LOG(WARNING) << "Failed server RPC negotiation. Server trace:\n" << msg;
         } else {
-          LOG(INFO) << "RPC negotiation tracing enabled. Trace:\n" << msg;
+          LOG(INFO) << "RPC negotiation tracing enabled. Server trace:\n" << msg;
         }
       }
   });
@@ -367,6 +391,7 @@ INSTANTIATE_TEST_CASE_P(NegotiationCombinations,
             RpcEncryption::OPTIONAL,
           },
           false,
+          false,
           Status::NotAuthorized(".*client is not configured with an authentication type"),
           Status::NetworkError(""),
           AuthenticationType::INVALID,
@@ -390,6 +415,7 @@ INSTANTIATE_TEST_CASE_P(NegotiationCombinations,
             RpcEncryption::OPTIONAL,
           },
           false,
+          false,
           Status::NotAuthorized(".* server mechanism list is empty"),
           Status::NotAuthorized(".* server mechanism list is empty"),
           AuthenticationType::INVALID,
@@ -413,6 +439,7 @@ INSTANTIATE_TEST_CASE_P(NegotiationCombinations,
             RpcEncryption::DISABLED,
           },
           false,
+          false,
           Status::OK(),
           Status::OK(),
           AuthenticationType::SASL,
@@ -436,6 +463,7 @@ INSTANTIATE_TEST_CASE_P(NegotiationCombinations,
             RpcEncryption::DISABLED,
           },
           false,
+          false,
           Status::OK(),
           Status::OK(),
           AuthenticationType::SASL,
@@ -459,6 +487,7 @@ INSTANTIATE_TEST_CASE_P(NegotiationCombinations,
             RpcEncryption::DISABLED,
           },
           false,
+          false,
           Status::OK(),
           Status::OK(),
           AuthenticationType::SASL,
@@ -482,6 +511,7 @@ INSTANTIATE_TEST_CASE_P(NegotiationCombinations,
             RpcEncryption::DISABLED,
           },
           false,
+          false,
           Status::OK(),
           Status::OK(),
           AuthenticationType::SASL,
@@ -505,6 +535,7 @@ INSTANTIATE_TEST_CASE_P(NegotiationCombinations,
             RpcEncryption::DISABLED,
           },
           false,
+          false,
           Status::NotAuthorized(".*client does not have Kerberos enabled"),
           Status::NetworkError(""),
           AuthenticationType::INVALID,
@@ -528,6 +559,7 @@ INSTANTIATE_TEST_CASE_P(NegotiationCombinations,
             false,
             RpcEncryption::OPTIONAL,
           },
+          false,
           true,
           Status::OK(),
           Status::OK(),
@@ -554,6 +586,7 @@ INSTANTIATE_TEST_CASE_P(NegotiationCombinations,
             RpcEncryption::OPTIONAL,
           },
           false,
+          false,
           Status::OK(),
           Status::OK(),
           AuthenticationType::SASL,
@@ -577,6 +610,7 @@ INSTANTIATE_TEST_CASE_P(NegotiationCombinations,
             RpcEncryption::OPTIONAL,
           },
           false,
+          false,
           Status::OK(),
           Status::OK(),
           AuthenticationType::SASL,
@@ -600,6 +634,7 @@ INSTANTIATE_TEST_CASE_P(NegotiationCombinations,
             RpcEncryption::OPTIONAL,
           },
           false,
+          false,
           Status::OK(),
           Status::OK(),
           AuthenticationType::CERTIFICATE,
@@ -623,6 +658,7 @@ INSTANTIATE_TEST_CASE_P(NegotiationCombinations,
             RpcEncryption::OPTIONAL,
           },
           false,
+          false,
           Status::OK(),
           Status::OK(),
           AuthenticationType::TOKEN,
@@ -649,6 +685,7 @@ INSTANTIATE_TEST_CASE_P(NegotiationCombinations,
             RpcEncryption::OPTIONAL,
           },
           false,
+          false,
           Status::OK(),
           Status::OK(),
           AuthenticationType::SASL,
@@ -672,6 +709,7 @@ INSTANTIATE_TEST_CASE_P(NegotiationCombinations,
             RpcEncryption::OPTIONAL,
           },
           false,
+          false,
           Status::OK(),
           Status::OK(),
           AuthenticationType::CERTIFICATE,
@@ -695,6 +733,7 @@ INSTANTIATE_TEST_CASE_P(NegotiationCombinations,
             RpcEncryption::REQUIRED,
           },
           false,
+          false,
           Status::NotAuthorized(".*client does not support required TLS encryption"),
           Status::NotAuthorized(".*client does not support required TLS encryption"),
           AuthenticationType::SASL,
@@ -718,6 +757,7 @@ INSTANTIATE_TEST_CASE_P(NegotiationCombinations,
             RpcEncryption::DISABLED,
           },
           false,
+          false,
           Status::NotAuthorized(".*server does not support required TLS encryption"),
           Status::NetworkError(""),
           AuthenticationType::SASL,
@@ -741,6 +781,7 @@ INSTANTIATE_TEST_CASE_P(NegotiationCombinations,
             RpcEncryption::REQUIRED,
           },
           false,
+          false,
           Status::OK(),
           Status::OK(),
           AuthenticationType::SASL,
@@ -764,6 +805,7 @@ INSTANTIATE_TEST_CASE_P(NegotiationCombinations,
             RpcEncryption::REQUIRED,
           },
           false,
+          false,
           Status::OK(),
           Status::OK(),
           AuthenticationType::SASL,
@@ -787,6 +829,7 @@ INSTANTIATE_TEST_CASE_P(NegotiationCombinations,
             RpcEncryption::REQUIRED,
           },
           false,
+          false,
           Status::OK(),
           Status::OK(),
           AuthenticationType::SASL,
@@ -810,6 +853,7 @@ INSTANTIATE_TEST_CASE_P(NegotiationCombinations,
             RpcEncryption::REQUIRED,
           },
           false,
+          false,
           Status::OK(),
           Status::OK(),
           AuthenticationType::SASL,
@@ -833,6 +877,7 @@ INSTANTIATE_TEST_CASE_P(NegotiationCombinations,
             RpcEncryption::REQUIRED,
           },
           false,
+          false,
           Status::NotAuthorized(".*client does not support required TLS encryption"),
           Status::NotAuthorized(".*client does not support required TLS encryption"),
           AuthenticationType::SASL,
@@ -856,11 +901,63 @@ INSTANTIATE_TEST_CASE_P(NegotiationCombinations,
             RpcEncryption::REQUIRED,
           },
           false,
+          false,
           Status::OK(),
           Status::OK(),
           AuthenticationType::SASL,
           SaslMechanism::GSSAPI,
           true,
+        },
+
+        // client: PLAIN
+        // server: PLAIN
+        // connection from public routable IP
+        NegotiationDescriptor {
+            EndpointConfig {
+                PkiConfig::NONE,
+                { SaslMechanism::PLAIN },
+                false,
+                RpcEncryption::OPTIONAL
+            },
+            EndpointConfig {
+                PkiConfig::NONE,
+                { SaslMechanism::PLAIN },
+                false,
+                RpcEncryption::OPTIONAL
+            },
+            true,
+            false,
+            Status::NotAuthorized(".*unencrypted connections from publicly routable IPs"),
+            Status::NotAuthorized(".*unencrypted connections from publicly routable IPs"),
+            AuthenticationType::SASL,
+            SaslMechanism::PLAIN,
+            false,
+        },
+
+        // client: GSSAPI, TLS required, externally-signed cert
+        // server: GSSAPI, TLS required, externally-signed cert
+        // connection from public routable IP
+        NegotiationDescriptor {
+            EndpointConfig {
+                PkiConfig::EXTERNALLY_SIGNED,
+                { SaslMechanism::GSSAPI },
+                false,
+                RpcEncryption::REQUIRED,
+            },
+            EndpointConfig {
+                PkiConfig::EXTERNALLY_SIGNED,
+                { SaslMechanism::GSSAPI },
+                false,
+                RpcEncryption::REQUIRED,
+            },
+            true,
+            // true as no longer a loopback connection.
+            true,
+            Status::OK(),
+            Status::OK(),
+            AuthenticationType::SASL,
+            SaslMechanism::GSSAPI,
+            true,
         }
 ));
 

http://git-wip-us.apache.org/repos/asf/kudu/blob/c178563f/src/kudu/rpc/server_negotiation.cc
----------------------------------------------------------------------
diff --git a/src/kudu/rpc/server_negotiation.cc b/src/kudu/rpc/server_negotiation.cc
index 7cebf8a..5e6d070 100644
--- a/src/kudu/rpc/server_negotiation.cc
+++ b/src/kudu/rpc/server_negotiation.cc
@@ -30,6 +30,7 @@
 #include "kudu/gutil/casts.h"
 #include "kudu/gutil/endian.h"
 #include "kudu/gutil/map-util.h"
+#include "kudu/gutil/strings/split.h"
 #include "kudu/gutil/strings/substitute.h"
 #include "kudu/rpc/blocking_ops.h"
 #include "kudu/rpc/constants.h"
@@ -45,6 +46,7 @@
 #include "kudu/util/fault_injection.h"
 #include "kudu/util/flag_tags.h"
 #include "kudu/util/logging.h"
+#include "kudu/util/net/net_util.h"
 #include "kudu/util/net/sockaddr.h"
 #include "kudu/util/net/socket.h"
 #include "kudu/util/scoped_cleanup.h"
@@ -65,10 +67,45 @@ TAG_FLAG(rpc_inject_invalid_authn_token_ratio, unsafe);
 
 DECLARE_bool(rpc_encrypt_loopback_connections);
 
+DEFINE_string(trusted_subnets,
+              "127.0.0.0/8,10.0.0.0/8,172.16.0.0/12,192.168.0.0/16,169.254.0.0/16",
+              "A trusted subnet whitelist. If set explicitly, all unauthenticated "
+              "or unencrypted connections are prohibited except the ones from the "
+              "specified address blocks. Otherwise, private network (127.0.0.0/8, etc.) "
+              "and local subnets of all local network interfaces will be used. Set it "
+              "to '0.0.0.0/0' to allow unauthenticated/unencrypted connections from all "
+              "remote IP addresses. However, if network access is not otherwise restricted "
+              "by a firewall, malicious users may be able to gain unauthorized access.");
+TAG_FLAG(trusted_subnets, advanced);
+TAG_FLAG(trusted_subnets, evolving);
+
+static bool ValidateTrustedSubnets(const char* /*flagname*/, const string& value) {
+  if (value.empty()) {
+    return true;
+  }
+
+  for (const auto& t : strings::Split(value, ",", strings::SkipEmpty())) {
+    kudu::Network network;
+    kudu::Status s = network.ParseCIDRString(t.ToString());
+    if (!s.ok()) {
+      LOG(ERROR) << "Invalid subnet address: " << t
+                 << ". Subnet must be specified in CIDR notation.";
+      return false;
+    }
+  }
+
+  return true;
+}
+
+DEFINE_validator(trusted_subnets, &ValidateTrustedSubnets);
 
 namespace kudu {
 namespace rpc {
 
+namespace {
+vector<Network>* g_trusted_subnets = nullptr;
+} // anonymous namespace
+
 static int ServerNegotiationGetoptCb(ServerNegotiation* server_negotiation,
                                      const char* plugin_name,
                                      const char* option,
@@ -174,6 +211,30 @@ Status ServerNegotiation::Negotiate() {
     tls_negotiated_ = true;
   }
 
+  // Rejects any connection from public routable IPs if encryption
+  // is disabled. See KUDU-1875.
+  if (!tls_negotiated_) {
+    Sockaddr addr;
+    RETURN_NOT_OK(socket_->GetPeerAddress(&addr));
+
+    if (!IsTrustedConnection(addr)) {
+      // Receives client response before sending error
+      // message, even though the response is never used,
+      // to avoid risk condition that connection gets
+      // closed before client receives server's error
+      // message.
+      NegotiatePB request;
+      RETURN_NOT_OK(RecvNegotiatePB(&request, &recv_buf));
+
+      Status s = Status::NotAuthorized("unencrypted connections from publicly routable "
+                                       "IPs are prohibited. See --trusted_subnets flag "
+                                       "for more information.",
+                                       addr.ToString());
+      RETURN_NOT_OK(SendError(ErrorStatusPB::FATAL_UNAUTHORIZED, s));
+      return s;
+    }
+  }
+
   // Step 4: Authentication
   switch (negotiated_authn_) {
     case AuthenticationType::SASL:
@@ -284,7 +345,7 @@ Status ServerNegotiation::SendError(ErrorStatusPB::RpcErrorCodePB code, const St
   msg.set_code(code);
   msg.set_message(err.ToString());
 
-  TRACE("Sending RPC error: $0", ErrorStatusPB::RpcErrorCodePB_Name(code));
+  TRACE("Sending RPC error: $0: $1", ErrorStatusPB::RpcErrorCodePB_Name(code), err.ToString());
   RETURN_NOT_OK(SendFramedMessageBlocking(socket(), header, msg, deadline_));
 
   return Status::OK();
@@ -702,6 +763,22 @@ Status ServerNegotiation::HandleSaslInitiate(const NegotiatePB& request) {
 
   negotiated_mech_ = SaslMechanism::value_of(mechanism);
 
+  // Rejects any connection from public routable IPs if authentication mechanism
+  // is plain. See KUDU-1875.
+  if (negotiated_mech_ == SaslMechanism::PLAIN) {
+    Sockaddr addr;
+    RETURN_NOT_OK(socket_->GetPeerAddress(&addr));
+
+    if (!IsTrustedConnection(addr)) {
+      Status s = Status::NotAuthorized("unauthenticated connections from publicly "
+                                       "routable IPs are prohibited. See "
+                                       "--trusted_subnets flag for more information.",
+                                       addr.ToString());
+      RETURN_NOT_OK(SendError(ErrorStatusPB::FATAL_UNAUTHORIZED, s));
+      return s;
+    }
+  }
+
   // If the negotiated mechanism is GSSAPI (Kerberos), configure SASL to use
   // integrity protection so that the channel bindings and nonce can be
   // verified.
@@ -876,5 +953,28 @@ int ServerNegotiation::PlainAuthCb(sasl_conn_t* /*conn*/,
   return SASL_OK;
 }
 
+bool ServerNegotiation::IsTrustedConnection(const Sockaddr& addr) {
+  static std::once_flag once;
+  std::call_once(once, [] {
+    g_trusted_subnets = new vector<Network>();
+    CHECK_OK(Network::ParseCIDRStrings(FLAGS_trusted_subnets, g_trusted_subnets));
+
+    // If --trusted_subnets is not set explicitly, local subnets of all local network
+    // interfaces as well as the default private subnets will be used.
+    if (google::GetCommandLineFlagInfoOrDie("trusted_subnets").is_default) {
+      std::vector<Network> local_networks;
+      WARN_NOT_OK(GetLocalNetworks(&local_networks),
+                  "Unable to get local networks.");
+
+      g_trusted_subnets->insert(g_trusted_subnets->end(),
+                                local_networks.begin(),
+                                local_networks.end());
+    }
+  });
+
+  return std::any_of(g_trusted_subnets->begin(), g_trusted_subnets->end(),
+                     [&](const Network& t) { return t.WithinNetwork(addr); });
+}
+
 } // namespace rpc
 } // namespace kudu

http://git-wip-us.apache.org/repos/asf/kudu/blob/c178563f/src/kudu/rpc/server_negotiation.h
----------------------------------------------------------------------
diff --git a/src/kudu/rpc/server_negotiation.h b/src/kudu/rpc/server_negotiation.h
index 07e0057..e9e945a 100644
--- a/src/kudu/rpc/server_negotiation.h
+++ b/src/kudu/rpc/server_negotiation.h
@@ -204,6 +204,9 @@ class ServerNegotiation {
   // Receive and validate the ConnectionContextPB.
   Status RecvConnectionContext(faststring* recv_buf) WARN_UNUSED_RESULT;
 
+  // Returns true if connection is from trusted subnets or local networks.
+  bool IsTrustedConnection(const Sockaddr& addr);
+
   // The socket to the remote client.
   std::unique_ptr<Socket> socket_;
 

http://git-wip-us.apache.org/repos/asf/kudu/blob/c178563f/src/kudu/util/net/net_util-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/util/net/net_util-test.cc b/src/kudu/util/net/net_util-test.cc
index b1c33ef..c77b054 100644
--- a/src/kudu/util/net/net_util-test.cc
+++ b/src/kudu/util/net/net_util-test.cc
@@ -92,6 +92,31 @@ TEST_F(NetUtilTest, TestResolveAddresses) {
   ASSERT_OK(hp.ResolveAddresses(nullptr));
 }
 
+TEST_F(NetUtilTest, TestWithinNetwork) {
+  Sockaddr addr;
+  Network network;
+
+  ASSERT_OK(addr.ParseString("10.0.23.0:12345", 0));
+  ASSERT_OK(network.ParseCIDRString("10.0.0.0/8"));
+  EXPECT_TRUE(network.WithinNetwork(addr));
+
+  ASSERT_OK(addr.ParseString("172.28.3.4:0", 0));
+  ASSERT_OK(network.ParseCIDRString("172.16.0.0/12"));
+  EXPECT_TRUE(network.WithinNetwork(addr));
+
+  ASSERT_OK(addr.ParseString("192.168.0.23", 0));
+  ASSERT_OK(network.ParseCIDRString("192.168.1.14/16"));
+  EXPECT_TRUE(network.WithinNetwork(addr));
+
+  ASSERT_OK(addr.ParseString("8.8.8.8:0", 0));
+  ASSERT_OK(network.ParseCIDRString("0.0.0.0/0"));
+  EXPECT_TRUE(network.WithinNetwork(addr));
+
+  ASSERT_OK(addr.ParseString("192.169.0.23", 0));
+  ASSERT_OK(network.ParseCIDRString("192.168.0.0/16"));
+  EXPECT_FALSE(network.WithinNetwork(addr));
+}
+
 // Ensure that we are able to do a reverse DNS lookup on various IP addresses.
 // The reverse lookups should never fail, but may return numeric strings.
 TEST_F(NetUtilTest, TestReverseLookup) {

http://git-wip-us.apache.org/repos/asf/kudu/blob/c178563f/src/kudu/util/net/net_util.cc
----------------------------------------------------------------------
diff --git a/src/kudu/util/net/net_util.cc b/src/kudu/util/net/net_util.cc
index e1f26cb..ac5c67f 100644
--- a/src/kudu/util/net/net_util.cc
+++ b/src/kudu/util/net/net_util.cc
@@ -16,9 +16,10 @@
 // under the License.
 
 #include <arpa/inet.h>
+#include <ifaddrs.h>
+#include <netdb.h>
 #include <sys/types.h>
 #include <sys/socket.h>
-#include <netdb.h>
 
 #include <algorithm>
 #include <boost/functional/hash.hpp>
@@ -27,6 +28,7 @@
 #include <utility>
 #include <vector>
 
+#include "kudu/gutil/endian.h"
 #include "kudu/gutil/gscoped_ptr.h"
 #include "kudu/gutil/map-util.h"
 #include "kudu/gutil/strings/join.h"
@@ -41,6 +43,7 @@
 #include "kudu/util/flag_tags.h"
 #include "kudu/util/net/net_util.h"
 #include "kudu/util/net/sockaddr.h"
+#include "kudu/util/scoped_cleanup.h"
 #include "kudu/util/stopwatch.h"
 #include "kudu/util/subprocess.h"
 #include "kudu/util/trace.h"
@@ -173,6 +176,50 @@ string HostPort::ToCommaSeparatedString(const vector<HostPort>& hostports) {
   return JoinStrings(hostport_strs, ",");
 }
 
+Network::Network()
+  : addr_(0),
+    netmask_(0) {
+}
+
+Network::Network(uint32_t addr, uint32_t netmask)
+  : addr_(addr), netmask_(netmask) {}
+
+bool Network::WithinNetwork(const Sockaddr& addr) const {
+  return ((addr.addr().sin_addr.s_addr & netmask_) ==
+          (addr_ & netmask_));
+}
+
+Status Network::ParseCIDRString(const string& addr) {
+  std::pair<string, string> p = strings::Split(addr, strings::delimiter::Limit("/", 1));
+
+  kudu::Sockaddr sockaddr;
+  Status s = sockaddr.ParseString(p.first, 0);
+
+  uint32_t bits;
+  bool success = SimpleAtoi(p.second, &bits);
+
+  if (!s.ok() || !success || bits > 32) {
+    return Status::NetworkError("Unable to parse CIDR address", addr);
+  }
+
+  // Netmask in network byte order
+  uint32_t netmask = NetworkByteOrder::FromHost32(~(0xffffffff >> bits));
+  addr_ = sockaddr.addr().sin_addr.s_addr;
+  netmask_ = netmask;
+  return Status::OK();
+}
+
+Status Network::ParseCIDRStrings(const string& comma_sep_addrs,
+                                 vector<Network>* res) {
+  vector<string> addr_strings = strings::Split(comma_sep_addrs, ",", strings::SkipEmpty());
+  for (const string& addr_string : addr_strings) {
+    Network network;
+    RETURN_NOT_OK(network.ParseCIDRString(addr_string));
+    res->push_back(network);
+  }
+  return Status::OK();
+}
+
 bool IsPrivilegedPort(uint16_t port) {
   return port <= 1024 && port != 0;
 }
@@ -215,6 +262,33 @@ Status GetHostname(string* hostname) {
   return Status::OK();
 }
 
+Status GetLocalNetworks(std::vector<Network>* net) {
+  struct ifaddrs *ifap = nullptr;
+
+  int ret = getifaddrs(&ifap);
+  auto cleanup = MakeScopedCleanup([&]() {
+    if (ifap) freeifaddrs(ifap);
+  });
+
+  if (ret != 0) {
+    return Status::NetworkError("Unable to determine local network addresses",
+                                ErrnoToString(errno),
+                                errno);
+  }
+
+  net->clear();
+  for (struct ifaddrs *ifa = ifap; ifa; ifa = ifa->ifa_next) {
+    if (ifa->ifa_addr->sa_family == AF_INET) {
+      Sockaddr addr(*reinterpret_cast<struct sockaddr_in*>(ifa->ifa_addr));
+      Sockaddr mask(*reinterpret_cast<struct sockaddr_in*>(ifa->ifa_netmask));
+      Network network(addr.addr().sin_addr.s_addr, mask.addr().sin_addr.s_addr);
+      net->push_back(network);
+    }
+  }
+
+  return Status::OK();
+}
+
 Status GetFQDN(string* hostname) {
   TRACE_EVENT0("net", "GetFQDN");
   // Start with the non-qualified hostname

http://git-wip-us.apache.org/repos/asf/kudu/blob/c178563f/src/kudu/util/net/net_util.h
----------------------------------------------------------------------
diff --git a/src/kudu/util/net/net_util.h b/src/kudu/util/net/net_util.h
index 863d0f3..b246c5e 100644
--- a/src/kudu/util/net/net_util.h
+++ b/src/kudu/util/net/net_util.h
@@ -91,6 +91,33 @@ struct HostPortEqualityPredicate {
   }
 };
 
+// A container for addr:mask pair.
+// Both addr and netmask are in big-endian byte order
+// (same as network byte order).
+class Network {
+ public:
+  Network();
+  Network(uint32_t addr, uint32_t netmask);
+
+  uint32_t addr() const { return addr_; }
+
+  uint32_t netmask() const { return netmask_; }
+
+  // Returns true if the address is within network.
+  bool WithinNetwork(const Sockaddr& addr) const;
+
+  // Parses a "addr/netmask" (CIDR notation) pair into this object.
+  Status ParseCIDRString(const std::string& addr);
+
+  // Parses a comma separated list of "addr/netmask" (CIDR notation)
+  // pairs into a vector of Network objects.
+  static Status ParseCIDRStrings(
+      const std::string& comma_sep_addrs, std::vector<Network>* res);
+ private:
+  uint32_t addr_;
+  uint32_t netmask_;
+};
+
 // Parse and resolve the given comma-separated list of addresses.
 //
 // The resulting addresses will be resolved, made unique, and added to
@@ -107,8 +134,11 @@ bool IsPrivilegedPort(uint16_t port);
 // Return the local machine's hostname.
 Status GetHostname(std::string* hostname);
 
+// Returns local subnets of all local network interfaces.
+Status GetLocalNetworks(std::vector<Network>* net);
+
 // Return the local machine's FQDN.
-Status GetFQDN(std::string* fqdn);
+Status GetFQDN(std::string* hostname);
 
 // Returns a single socket address from a HostPort.
 // If the hostname resolves to multiple addresses, returns the first in the

http://git-wip-us.apache.org/repos/asf/kudu/blob/c178563f/src/kudu/util/net/socket.h
----------------------------------------------------------------------
diff --git a/src/kudu/util/net/socket.h b/src/kudu/util/net/socket.h
index 1362e67..ce5b7bb 100644
--- a/src/kudu/util/net/socket.h
+++ b/src/kudu/util/net/socket.h
@@ -97,7 +97,8 @@ class Socket {
   Status GetSocketAddress(Sockaddr *cur_addr) const;
 
   // Call getpeername to get the address of the connected peer.
-  Status GetPeerAddress(Sockaddr *cur_addr) const;
+  // It is virtual so that tests can override.
+  virtual Status GetPeerAddress(Sockaddr *cur_addr) const;
 
   // Return true if this socket is determined to be a loopback connection
   // (i.e. the local and remote peer share an IP address).