You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by ta...@apache.org on 2017/09/05 22:26:46 UTC

[1/9] incubator-impala git commit: KUDU-1942. Kerberos fails to log in on hostnames with capital letters

Repository: incubator-impala
Updated Branches:
  refs/heads/master b20f45962 -> 796db0fce


KUDU-1942. Kerberos fails to log in on hostnames with capital letters

This ensures that servers canonicalize their FQDNs to lower-case before
generating Kerberos principal names.

With this change I was able to set up a working cluster on my laptop
with a capitalized hostname, where before it would fail as described in
the JIRA.

I also verified that I was able to connect from both C++ and Java
clients.

Change-Id: I5ef65dd827459476a2d225d8e3f7c80ff2fdf627
Reviewed-on: http://gerrit.cloudera.org:8080/7693
Tested-by: Kudu Jenkins
Reviewed-by: Alexey Serbin <as...@cloudera.com>
Reviewed-on: http://gerrit.cloudera.org:8080/7893
Reviewed-by: Sailesh Mukil <sa...@cloudera.com>
Tested-by: Sailesh Mukil <sa...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/d1239a9b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/d1239a9b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/d1239a9b

Branch: refs/heads/master
Commit: d1239a9b49131825f2949671f1f9b85cce199c68
Parents: b20f459
Author: Todd Lipcon <to...@apache.org>
Authored: Wed Aug 16 19:08:14 2017 -0700
Committer: Sailesh Mukil <sa...@cloudera.com>
Committed: Fri Sep 1 03:08:51 2017 +0000

----------------------------------------------------------------------
 be/src/kudu/security/init.cc | 3 +++
 1 file changed, 3 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d1239a9b/be/src/kudu/security/init.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/security/init.cc b/be/src/kudu/security/init.cc
index dfbb25c..c1e94ed 100644
--- a/be/src/kudu/security/init.cc
+++ b/be/src/kudu/security/init.cc
@@ -17,6 +17,7 @@
 
 #include "kudu/security/init.h"
 
+#include <ctype.h>
 #include <krb5/krb5.h>
 
 #include <algorithm>
@@ -394,6 +395,8 @@ Status GetConfiguredPrincipal(string* principal) {
   if (!GetFQDN(&hostname).ok()) {
     RETURN_NOT_OK(GetHostname(&hostname));
   }
+  // Hosts in principal names are canonicalized to lower-case.
+  std::transform(hostname.begin(), hostname.end(), hostname.begin(), tolower);
   GlobalReplaceSubstring("_HOST", hostname, &p);
   *principal = p;
   return Status::OK();


[4/9] incubator-impala git commit: rpc: some small cleanup in ConnectionId

Posted by ta...@apache.org.
rpc: some small cleanup in ConnectionId

Change-Id: I0788052f8c943ef102f3f551a85a8b219c65c361
Reviewed-on: http://gerrit.cloudera.org:8080/7686
Reviewed-by: Alexey Serbin <as...@cloudera.com>
Tested-by: Kudu Jenkins
Reviewed-on: http://gerrit.cloudera.org:8080/7896
Reviewed-by: Sailesh Mukil <sa...@cloudera.com>
Tested-by: Sailesh Mukil <sa...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/1b70eb66
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/1b70eb66
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/1b70eb66

Branch: refs/heads/master
Commit: 1b70eb661a04ff1642d7862df9cd30a3e7d9ab6f
Parents: 7d41b96
Author: Todd Lipcon <to...@apache.org>
Authored: Tue Aug 15 15:38:04 2017 -0700
Committer: Sailesh Mukil <sa...@cloudera.com>
Committed: Fri Sep 1 03:12:49 2017 +0000

----------------------------------------------------------------------
 be/src/kudu/rpc/connection_id.cc    | 26 ++++----------------------
 be/src/kudu/rpc/connection_id.h     | 12 +++---------
 be/src/kudu/rpc/proxy.cc            |  5 +++--
 be/src/kudu/rpc/user_credentials.cc |  4 ++--
 be/src/kudu/rpc/user_credentials.h  |  2 +-
 5 files changed, 13 insertions(+), 36 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/1b70eb66/be/src/kudu/rpc/connection_id.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/rpc/connection_id.cc b/be/src/kudu/rpc/connection_id.cc
index e4a4dba..a17b783 100644
--- a/be/src/kudu/rpc/connection_id.cc
+++ b/be/src/kudu/rpc/connection_id.cc
@@ -19,7 +19,7 @@
 
 #include <boost/functional/hash.hpp>
 
-#include "kudu/gutil/stringprintf.h"
+#include "kudu/gutil/strings/substitute.h"
 
 using std::string;
 
@@ -28,37 +28,19 @@ namespace rpc {
 
 ConnectionId::ConnectionId() {}
 
-ConnectionId::ConnectionId(const ConnectionId& other) {
-  DoCopyFrom(other);
-}
-
 ConnectionId::ConnectionId(const Sockaddr& remote, UserCredentials user_credentials) {
   remote_ = remote;
   user_credentials_ = std::move(user_credentials);
 }
 
-void ConnectionId::set_remote(const Sockaddr& remote) {
-  remote_ = remote;
-}
-
 void ConnectionId::set_user_credentials(UserCredentials user_credentials) {
   user_credentials_ = std::move(user_credentials);
 }
 
-void ConnectionId::CopyFrom(const ConnectionId& other) {
-  DoCopyFrom(other);
-}
-
 string ConnectionId::ToString() const {
-  // Does not print the password.
-  return StringPrintf("{remote=%s, user_credentials=%s}",
-      remote_.ToString().c_str(),
-      user_credentials_.ToString().c_str());
-}
-
-void ConnectionId::DoCopyFrom(const ConnectionId& other) {
-  remote_ = other.remote_;
-  user_credentials_ = other.user_credentials_;
+  return strings::Substitute("{remote=$0, user_credentials=$1}",
+                             remote_.ToString(),
+                             user_credentials_.ToString());
 }
 
 size_t ConnectionId::HashCode() const {

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/1b70eb66/be/src/kudu/rpc/connection_id.h
----------------------------------------------------------------------
diff --git a/be/src/kudu/rpc/connection_id.h b/be/src/kudu/rpc/connection_id.h
index ae34b29..09f1738 100644
--- a/be/src/kudu/rpc/connection_id.h
+++ b/be/src/kudu/rpc/connection_id.h
@@ -32,19 +32,18 @@ class ConnectionId {
   ConnectionId();
 
   // Copy constructor required for use with STL unordered_map.
-  ConnectionId(const ConnectionId& other);
+  ConnectionId(const ConnectionId& other) = default;
 
   // Convenience constructor.
   ConnectionId(const Sockaddr& remote, UserCredentials user_credentials);
 
   // The remote address.
-  void set_remote(const Sockaddr& remote);
   const Sockaddr& remote() const { return remote_; }
 
   // The credentials of the user associated with this connection, if any.
   void set_user_credentials(UserCredentials user_credentials);
+
   const UserCredentials& user_credentials() const { return user_credentials_; }
-  UserCredentials* mutable_user_credentials() { return &user_credentials_; }
 
   // Copy state from another object to this one.
   void CopyFrom(const ConnectionId& other);
@@ -58,13 +57,8 @@ class ConnectionId {
  private:
   // Remember to update HashCode() and Equals() when new fields are added.
   Sockaddr remote_;
-  UserCredentials user_credentials_;
 
-  // Implementation of CopyFrom that can be shared with copy constructor.
-  void DoCopyFrom(const ConnectionId& other);
-
-  // Disable assignment operator.
-  void operator=(const ConnectionId&);
+  UserCredentials user_credentials_;
 };
 
 class ConnectionIdHash {

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/1b70eb66/be/src/kudu/rpc/proxy.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/rpc/proxy.cc b/be/src/kudu/rpc/proxy.cc
index 3ec907d..0d946ed 100644
--- a/be/src/kudu/rpc/proxy.cc
+++ b/be/src/kudu/rpc/proxy.cc
@@ -64,8 +64,9 @@ Proxy::Proxy(std::shared_ptr<Messenger> messenger,
         << s.ToString() << " before connecting to remote: " << remote.ToString();
   }
 
-  conn_id_.set_remote(remote);
-  conn_id_.mutable_user_credentials()->set_real_user(real_user);
+  UserCredentials creds;
+  creds.set_real_user(std::move(real_user));
+  conn_id_ = ConnectionId(remote, std::move(creds));
 }
 
 Proxy::~Proxy() {

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/1b70eb66/be/src/kudu/rpc/user_credentials.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/rpc/user_credentials.cc b/be/src/kudu/rpc/user_credentials.cc
index fdc3ac2..0debd01 100644
--- a/be/src/kudu/rpc/user_credentials.cc
+++ b/be/src/kudu/rpc/user_credentials.cc
@@ -32,8 +32,8 @@ bool UserCredentials::has_real_user() const {
   return !real_user_.empty();
 }
 
-void UserCredentials::set_real_user(const string& real_user) {
-  real_user_ = real_user;
+void UserCredentials::set_real_user(string real_user) {
+  real_user_ = std::move(real_user);
 }
 
 string UserCredentials::ToString() const {

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/1b70eb66/be/src/kudu/rpc/user_credentials.h
----------------------------------------------------------------------
diff --git a/be/src/kudu/rpc/user_credentials.h b/be/src/kudu/rpc/user_credentials.h
index 56af70a..8cb68cc 100644
--- a/be/src/kudu/rpc/user_credentials.h
+++ b/be/src/kudu/rpc/user_credentials.h
@@ -29,7 +29,7 @@ class UserCredentials {
  public:
   // Real user.
   bool has_real_user() const;
-  void set_real_user(const std::string& real_user);
+  void set_real_user(std::string real_user);
   const std::string& real_user() const { return real_user_; }
 
   // Returns a string representation of the object.


[8/9] incubator-impala git commit: IMPALA-5849: Remove compile-time checks for OpenSSL > 1.0.0

Posted by ta...@apache.org.
IMPALA-5849: Remove compile-time checks for OpenSSL > 1.0.0

(Prerequisite knowledge: Impala must support building and linking
against OpenSSL 1.0.0 and 1.0.1. 1.0.0 only supports TLS1.0. 1.0.1
supports TLS1.1 and 1.2 as well).

Rather than check for OpenSSL w/TLSv1.1+ support at compile time, we
altered Thrift-0.9.0-p11 to compile on all versions, and fail with an
error if an unsupported version is requested at runtime. This patch
is the Impala-side counterpart to that work, and also works around a bug.

The bug is in OpenSSL. When disabling, say, TLS1.1 we pass an option
bitmask to SSL_CTX_set_options() that includes SSL_OP_NO_TLSv1_1. In
OpenSSL 1.0.1 this is defined as 0x10000000L but in 1.0.0 it is not
defined. So to support compilation on all OpenSSL versions, we added
definitions of that constant to Thrift if it was not found at compile
time.

However, in OpenSSL 1.0.0 *another* option for SSL_CTX_set_options() has
the *same* value (0x10000000L). This means that passing
SSL_OP_NO_TLSv1_1 to OpenSSL 1.0.0 actually does something completely
different. This lack of backwards compatibility is the bug.

To work around it, we use the SSLeay() function, which, although
cryptically named, returns the value of OPENSSL_VERSION_NUMBER in the
version of OpenSSL currently linked against (I have tested that it
behaves differently when dynamically linked against different versions
of OpenSSL, but compiled against the same one). We use that method to
tell us which of the TLS protocol versions are actually supported, and
raise errors if they are not.

An alternative approach to doing this inside ThriftClient and
ThriftServer would be to do so in Thrift; this might be a reasonable
future option but for now it is too unwieldy to test a toolchain build
that is linked against different OpenSSL versions. To reduce risk, and
the number of ways things can go wrong, we do all the protocol version
whitelisting in the Impala code.

To simplify the code further, we also remove the values of
SSLProtoVersions that allowed us to restrict the TLS protocol to
exactly *one* value, rather than specify the minimum (e.g. we remove
TLSv1_1 but retain TLSv1_1_plus). The more restrictive options were not
intended for production use, and using them now will raise an error.

Error messages
--------------

Passing a valid, but unsupported version to --ssl_minimum_version:

  "TLS (6) version not supported (linked OpenSSL version is 1234567)"

Passing an invalid version to --ssl_minimum_version:

  "Unknown TLS version: 'tls_4_2'"

Testing
-------

We assume that tests will be run on the same machine that built Impala,
so still execute slightly different tests depending on the detected
OpenSSL version.

For thrift-server-test, we check to see if a protocol version is
supported at runtime, and use that to determine the expected behaviour
of a test which starts all combinations of clients and server versions.

For the webserver-test, we determine at compile-time the set of
supported TLS protocols, and change the test's expected behaviour based
on that.

Tests have been run when compiling and running against OpenSSL 1.0.0 and
OpenSSL 1.0.1.

Also include corresponding changes to Squeasel from this commit:

https://github.com/henryr/squeasel/commit/eded53

Testing: New webserver and thrift-server tests.

Change-Id: I20c5d39c0c4ae9c5445dd3ee3b175fe337a5728d
Reviewed-on: http://gerrit.cloudera.org:8080/7866
Tested-by: Impala Public Jenkins
Reviewed-by: Henry Robinson <he...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/ac8a72f3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/ac8a72f3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/ac8a72f3

Branch: refs/heads/master
Commit: ac8a72f3041688316a20c9a85eda5a5cb9af1578
Parents: e4a2d22
Author: Henry Robinson <he...@cloudera.com>
Authored: Fri Aug 25 11:39:20 2017 -0700
Committer: Henry Robinson <he...@cloudera.com>
Committed: Sat Sep 2 18:39:21 2017 +0000

----------------------------------------------------------------------
 be/src/rpc/thrift-client.cc           |  6 ++++++
 be/src/rpc/thrift-server-test.cc      | 32 +++++++++++++-----------------
 be/src/rpc/thrift-server.cc           | 21 +++++++++++++++-----
 be/src/rpc/thrift-server.h            |  6 +++++-
 be/src/service/impala-server.cc       |  6 +-----
 be/src/thirdparty/squeasel/squeasel.c | 28 ++++++++++++++++++++++----
 be/src/util/webserver-test.cc         |  9 +++++++++
 bin/impala-config.sh                  |  4 ++--
 8 files changed, 77 insertions(+), 35 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ac8a72f3/be/src/rpc/thrift-client.cc
----------------------------------------------------------------------
diff --git a/be/src/rpc/thrift-client.cc b/be/src/rpc/thrift-client.cc
index f8da136..d4d246b 100644
--- a/be/src/rpc/thrift-client.cc
+++ b/be/src/rpc/thrift-client.cc
@@ -44,6 +44,12 @@ ThriftClientImpl::ThriftClientImpl(const std::string& ipaddress, int port, bool
     SSLProtocol version;
     init_status_ =
         SSLProtoVersions::StringToProtocol(FLAGS_ssl_minimum_version, &version);
+    if (init_status_.ok() && !SSLProtoVersions::IsSupported(version)) {
+      string err =
+          Substitute("TLS ($0) version not supported (linked OpenSSL version is $1)",
+              version, SSLeay());
+      init_status_ = Status(err);
+    }
     if (!init_status_.ok()) return;
     ssl_factory_.reset(new TSSLSocketFactory(version));
   }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ac8a72f3/be/src/rpc/thrift-server-test.cc
----------------------------------------------------------------------
diff --git a/be/src/rpc/thrift-server-test.cc b/be/src/rpc/thrift-server-test.cc
index 67e149a..6ceaefd 100644
--- a/be/src/rpc/thrift-server-test.cc
+++ b/be/src/rpc/thrift-server-test.cc
@@ -263,21 +263,12 @@ TEST(SslTest, MismatchedCiphers) {
 // equivalents.
 TEST(SslTest, StringToProtocol) {
   SSLProtocol version;
-#if OPENSSL_VERSION_NUMBER < 0x10001000L
-  // No TLSv1.1+ support in OpenSSL v1.0.0.
-  EXPECT_FALSE(SSLProtoVersions::StringToProtocol("tlsv1.2", &version).ok());
-  EXPECT_FALSE(SSLProtoVersions::StringToProtocol("tlsv1.1", &version).ok());
-  EXPECT_OK(SSLProtoVersions::StringToProtocol("tlsv1", &version));
-  EXPECT_EQ(TLSv1_0_plus, version);
-#else
-  map<string, SSLProtocol> TEST_CASES = {{"tlsv1", TLSv1_0_plus},
-      {"tlsv1.1", TLSv1_1_plus}, {"tlsv1.2", TLSv1_2_plus}, {"tlsv1_only", TLSv1_0},
-      {"tlsv1.1_only", TLSv1_1}, {"tlsv1.2_only", TLSv1_2}};
+  map<string, SSLProtocol> TEST_CASES = {
+      {"tlsv1", TLSv1_0_plus}, {"tlsv1.1", TLSv1_1_plus}, {"tlsv1.2", TLSv1_2_plus}};
   for (auto p : TEST_CASES) {
     EXPECT_OK(SSLProtoVersions::StringToProtocol(p.first, &version));
     EXPECT_EQ(p.second, version) << "TLS version: " << p.first;
   }
-#endif
 }
 
 TEST(SslTest, TLSVersionControl) {
@@ -293,10 +284,9 @@ TEST(SslTest, TLSVersionControl) {
     set<SSLProtocol> whitelist;
   };
 
-#if OPENSSL_VERSION_NUMBER < 0x10001000L
-  vector<Config> configs = {
-      {TLSv1_0, {TLSv1_0, TLSv1_0_plus}}, {TLSv1_0_plus, {TLSv1_0, TLSv1_0_plus}}};
-#else
+  // Test all configurations supported by Thrift, even if some won't work with the linked
+  // OpenSSL(). We catch those by checking IsSupported() for both the client and ther
+  // server.
   vector<Config> configs = {{TLSv1_0, {TLSv1_0, TLSv1_0_plus}},
       {TLSv1_0_plus,
           {TLSv1_0, TLSv1_1, TLSv1_2, TLSv1_0_plus, TLSv1_1_plus, TLSv1_2_plus}},
@@ -304,7 +294,6 @@ TEST(SslTest, TLSVersionControl) {
       {TLSv1_1_plus, {TLSv1_1, TLSv1_2, TLSv1_0_plus, TLSv1_1_plus, TLSv1_2_plus}},
       {TLSv1_2, {TLSv1_2, TLSv1_0_plus, TLSv1_1_plus, TLSv1_2_plus}},
       {TLSv1_2_plus, {TLSv1_2, TLSv1_0_plus, TLSv1_1_plus, TLSv1_2_plus}}};
-#endif
 
   for (const auto& config : configs) {
     // For each config, start a server with the requested protocol spec, and then try to
@@ -317,17 +306,24 @@ TEST(SslTest, TLSVersionControl) {
                   .ssl(SERVER_CERT, PRIVATE_KEY)
                   .ssl_version(config.server_version)
                   .Build(&server));
-    EXPECT_OK(server->Start());
+    if (!SSLProtoVersions::IsSupported(config.server_version)) {
+      EXPECT_FALSE(server->Start().ok());
+      continue;
+    }
+    ASSERT_OK(server->Start());
 
     for (auto client_version : SSLProtoVersions::PROTO_MAP) {
       auto s = ScopedFlagSetter<string>::Make(
           &FLAGS_ssl_minimum_version, client_version.first);
       ThriftClient<StatestoreServiceClientWrapper> ssl_client(
           "localhost", port, "", nullptr, true);
+      if (!SSLProtoVersions::IsSupported(client_version.second)) {
+        EXPECT_FALSE(ssl_client.Open().ok());
+        continue;
+      }
       EXPECT_OK(ssl_client.Open());
       bool send_done = false;
       TRegisterSubscriberResponse resp;
-
       if (config.whitelist.find(client_version.second) == config.whitelist.end()) {
         EXPECT_THROW(ssl_client.iface()->RegisterSubscriber(
                          resp, TRegisterSubscriberRequest(), &send_done),

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ac8a72f3/be/src/rpc/thrift-server.cc
----------------------------------------------------------------------
diff --git a/be/src/rpc/thrift-server.cc b/be/src/rpc/thrift-server.cc
index 35a1852..467c004 100644
--- a/be/src/rpc/thrift-server.cc
+++ b/be/src/rpc/thrift-server.cc
@@ -77,11 +77,7 @@ namespace impala {
 // that one (e.g. TLSv1.1 enables v1.1 and v1.2). Specifying TLSv1.1_only enables only
 // v1.1.
 map<string, SSLProtocol> SSLProtoVersions::PROTO_MAP = {
-#if OPENSSL_VERSION_NUMBER >= 0x10001000L
-    {"tlsv1.2", TLSv1_2_plus}, {"tlsv1.2_only", TLSv1_2}, {"tlsv1.1", TLSv1_1_plus},
-    {"tlsv1.1_only", TLSv1_1},
-#endif
-    {"tlsv1", TLSv1_0_plus}, {"tlsv1_only", TLSv1_0}};
+    {"tlsv1.2", TLSv1_2_plus}, {"tlsv1.1", TLSv1_1_plus}, {"tlsv1", TLSv1_0_plus}};
 
 Status SSLProtoVersions::StringToProtocol(const string& in, SSLProtocol* protocol) {
   for (const auto& proto : SSLProtoVersions::PROTO_MAP) {
@@ -94,6 +90,16 @@ Status SSLProtoVersions::StringToProtocol(const string& in, SSLProtocol* protoco
   return Status(Substitute("Unknown TLS version: '$0'", in));
 }
 
+#define OPENSSL_MIN_VERSION_WITH_TLS_1_1 0x10001000L
+
+bool SSLProtoVersions::IsSupported(const SSLProtocol& protocol) {
+  bool is_openssl_1_0_0_or_lower = (SSLeay() < OPENSSL_MIN_VERSION_WITH_TLS_1_1);
+  if (is_openssl_1_0_0_or_lower) return (protocol == TLSv1_0_plus);
+
+  // All other versions supported by OpenSSL 1.0.1 and later.
+  return true;
+}
+
 bool EnableInternalSslConnections() {
   // Enable SSL between servers only if both the client validation certificate and the
   // server certificate are specified. 'Client' here means clients that are used by Impala
@@ -374,6 +380,11 @@ class ImpalaSslSocketFactory : public TSSLSocketFactory {
 }
 Status ThriftServer::CreateSocket(boost::shared_ptr<TServerTransport>* socket) {
   if (ssl_enabled()) {
+    if (!SSLProtoVersions::IsSupported(version_)) {
+      return Status(TErrorCode::SSL_SOCKET_CREATION_FAILED,
+          Substitute("TLS ($0) version not supported (linked OpenSSL version is $1)",
+                        version_, SSLeay()));
+    }
     try {
       // This 'factory' is only called once, since CreateSocket() is only called from
       // Start(). The c'tor may throw if there is an error initializing the SSL context.

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ac8a72f3/be/src/rpc/thrift-server.h
----------------------------------------------------------------------
diff --git a/be/src/rpc/thrift-server.h b/be/src/rpc/thrift-server.h
index e486e37..2002f7f 100644
--- a/be/src/rpc/thrift-server.h
+++ b/be/src/rpc/thrift-server.h
@@ -340,7 +340,7 @@ class ThriftServerBuilder {
 
   bool enable_ssl_ = false;
   apache::thrift::transport::SSLProtocol version_ =
-      apache::thrift::transport::SSLProtocol::TLSv1_0;
+      apache::thrift::transport::SSLProtocol::TLSv1_0_plus;
   std::string certificate_;
   std::string private_key_;
   std::string pem_password_cmd_;
@@ -355,6 +355,10 @@ struct SSLProtoVersions {
   /// one cannot be found. Matching is case-insensitive.
   static Status StringToProtocol(
       const std::string& in, apache::thrift::transport::SSLProtocol* protocol);
+
+  /// Returns true if 'protocol' is supported by the version of OpenSSL this binary is
+  /// linked to.
+  static bool IsSupported(const apache::thrift::transport::SSLProtocol& protocol);
 };
 
 // Returns true if, per the process configuration flags, server<->server communications

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ac8a72f3/be/src/service/impala-server.cc
----------------------------------------------------------------------
diff --git a/be/src/service/impala-server.cc b/be/src/service/impala-server.cc
index 4d55c85..18582da 100644
--- a/be/src/service/impala-server.cc
+++ b/be/src/service/impala-server.cc
@@ -183,11 +183,7 @@ DEFINE_string(ssl_cipher_list, "",
 
 const string SSL_MIN_VERSION_HELP = "The minimum SSL/TLS version that Thrift "
     "services should use for both client and server connections. Supported versions are "
-#if OPENSSL_VERSION_NUMBER >= 0x10001000L
-    "TLSv1.0, TLSv1.1 and TLSv1.2";
-#else
-    "TLSv1.0";
-#endif
+    "TLSv1.0, TLSv1.1 and TLSv1.2 (as long as the system OpenSSL library supports them)";
 DEFINE_string(ssl_minimum_version, "tlsv1", SSL_MIN_VERSION_HELP.c_str());
 
 DEFINE_int32(idle_session_timeout, 0, "The time, in seconds, that a session may be idle"

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ac8a72f3/be/src/thirdparty/squeasel/squeasel.c
----------------------------------------------------------------------
diff --git a/be/src/thirdparty/squeasel/squeasel.c b/be/src/thirdparty/squeasel/squeasel.c
index 055a959..d461b24 100644
--- a/be/src/thirdparty/squeasel/squeasel.c
+++ b/be/src/thirdparty/squeasel/squeasel.c
@@ -150,10 +150,20 @@ typedef int socklen_t;
 
 static const char *http_500_error = "Internal Server Error";
 
+#include <openssl/crypto.h>
 #include <openssl/ssl.h>
 #include <openssl/err.h>
 
-#define OPENSSL_VERSION_HAS_TLS_1_1 0x10001000L
+// If these constants aren't defined, we still need them to compile and maintain backwards
+// compatibility with pre-1.0.1 OpenSSL.
+#ifndef SSL_OP_NO_TLSv1
+#define SSL_OP_NO_TLSv1 0x04000000U
+#endif
+#ifndef SSL_OP_NO_TLSv1_1
+#define SSL_OP_NO_TLSv1_1 0x10000000U
+#endif
+
+#define OPENSSL_MIN_VERSION_WITH_TLS_1_1 0x10001000L
 
 static const char *month_names[] = {
   "Jan", "Feb", "Mar", "Apr", "May", "Jun",
@@ -4224,12 +4234,18 @@ static int set_ssl_option(struct sq_context *ctx) {
   int options = SSL_OP_NO_SSLv2 | SSL_OP_NO_SSLv3;
   if (sq_strcasecmp(ssl_version, "tlsv1") == 0) {
     // No-op - don't exclude any TLS protocols.
-#if OPENSSL_VERSION_NUMBER >= OPENSSL_VERSION_HAS_TLS_1_1
   } else if (sq_strcasecmp(ssl_version, "tlsv1.1") == 0) {
+    if (SSLeay() < OPENSSL_MIN_VERSION_WITH_TLS_1_1) {
+      cry(fc(ctx), "Unsupported TLS version: %s", ssl_version);
+      return 0;
+    }
     options |= SSL_OP_NO_TLSv1;
   } else if (sq_strcasecmp(ssl_version, "tlsv1.2") == 0) {
+    if (SSLeay() < OPENSSL_MIN_VERSION_WITH_TLS_1_1) {
+      cry(fc(ctx), "Unsupported TLS version: %s", ssl_version);
+      return 0;
+    }
     options |= (SSL_OP_NO_TLSv1 | SSL_OP_NO_TLSv1_1);
-#endif
   } else {
     cry(fc(ctx), "%s: unknown SSL version: %s", __func__, ssl_version);
     return 0;
@@ -4251,7 +4267,11 @@ static int set_ssl_option(struct sq_context *ctx) {
     return 0;
   }
 
-  SSL_CTX_set_options(ctx->ssl_ctx, options);
+  if ((SSL_CTX_set_options(ctx->ssl_ctx, options) & options) != options) {
+    cry(fc(ctx), "SSL_CTX_set_options (server) error: could not set options (%d)",
+        options);
+    return 0;
+  }
 
   if (ctx->config[SSL_PRIVATE_KEY_PASSWORD] != NULL) {
     SSL_CTX_set_default_passwd_cb(ctx->ssl_ctx, ssl_password_callback);

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ac8a72f3/be/src/util/webserver-test.cc
----------------------------------------------------------------------
diff --git a/be/src/util/webserver-test.cc b/be/src/util/webserver-test.cc
index ba6f276..78934c6 100644
--- a/be/src/util/webserver-test.cc
+++ b/be/src/util/webserver-test.cc
@@ -297,8 +297,10 @@ TEST(Webserver, SslGoodTlsVersion) {
 
 #if OPENSSL_VERSION_NUMBER >= 0x10001000L
   auto versions = {"tlsv1", "tlsv1.1", "tlsv1.2"};
+  vector<string> unsupported_versions = {};
 #else
   auto versions = {"tlsv1"};
+  auto unsupported_versions = {"tlsv1.1", "tlsv1.2"};
 #endif
   for (auto v: versions) {
     auto ssl_version = ScopedFlagSetter<string>::Make(
@@ -307,6 +309,13 @@ TEST(Webserver, SslGoodTlsVersion) {
     Webserver webserver(FLAGS_webserver_port);
     ASSERT_OK(webserver.Start());
   }
+
+  for (auto v : unsupported_versions) {
+    auto ssl_version = ScopedFlagSetter<string>::Make(&FLAGS_ssl_minimum_version, v);
+
+    Webserver webserver(FLAGS_webserver_port);
+    EXPECT_FALSE(webserver.Start().ok()) << "Version: " << v;
+  }
 }
 
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ac8a72f3/bin/impala-config.sh
----------------------------------------------------------------------
diff --git a/bin/impala-config.sh b/bin/impala-config.sh
index 15951fd..f3354f1 100755
--- a/bin/impala-config.sh
+++ b/bin/impala-config.sh
@@ -72,7 +72,7 @@ fi
 # moving to a different build of the toolchain, e.g. when a version is bumped or a
 # compile option is changed. The build id can be found in the output of the toolchain
 # build jobs, it is constructed from the build number and toolchain git hash prefix.
-export IMPALA_TOOLCHAIN_BUILD_ID=457-22902e9fb8
+export IMPALA_TOOLCHAIN_BUILD_ID=459-0157f69796
 # Versions of toolchain dependencies.
 # -----------------------------------
 export IMPALA_AVRO_VERSION=1.7.4-p4
@@ -107,7 +107,7 @@ export IMPALA_SQUEASEL_VERSION=3.3
 # TPC utilities used for test/benchmark data generation.
 export IMPALA_TPC_DS_VERSION=2.1.0
 export IMPALA_TPC_H_VERSION=2.17.0
-export IMPALA_THRIFT_VERSION=0.9.0-p10
+export IMPALA_THRIFT_VERSION=0.9.0-p11
 export IMPALA_THRIFT_JAVA_VERSION=0.9.0
 export IMPALA_ZLIB_VERSION=1.2.8
 


[2/9] incubator-impala git commit: security: only lookup hostname if _HOST substitution is required

Posted by ta...@apache.org.
security: only lookup hostname if _HOST substitution is required

The Kerberos principal configuration uses the special token '_HOST' to
indicate that the FQDN of the host should be specified. Previously we
would always lookup the FQDN even if the substitution was not required,
which might mean that startup would fail if there was no FQDN available,
even if no _HOST substitution was required.

Now, we only lookup the FQDN if FLAGS_principal contains the
substitution token. This provides the possibility of a workaround of
explicit principal configuration on machines with no FQDN.

Change-Id: I5de8647d6cf63ea70d880fa530fa289e8bae24fe
Reviewed-on: http://gerrit.cloudera.org:8080/7694
Tested-by: Kudu Jenkins
Reviewed-by: Alexey Serbin <as...@cloudera.com>
Reviewed-on: http://gerrit.cloudera.org:8080/7894
Reviewed-by: Sailesh Mukil <sa...@cloudera.com>
Tested-by: Sailesh Mukil <sa...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/e7bd0ce5
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/e7bd0ce5
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/e7bd0ce5

Branch: refs/heads/master
Commit: e7bd0ce5b9f2d44bc0d429672924d19a0142c2b1
Parents: d1239a9
Author: Todd Lipcon <to...@apache.org>
Authored: Wed Aug 16 19:12:44 2017 -0700
Committer: Sailesh Mukil <sa...@cloudera.com>
Committed: Fri Sep 1 03:09:25 2017 +0000

----------------------------------------------------------------------
 be/src/kudu/security/init.cc | 17 ++++++++++-------
 1 file changed, 10 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/e7bd0ce5/be/src/kudu/security/init.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/security/init.cc b/be/src/kudu/security/init.cc
index c1e94ed..aff20e9 100644
--- a/be/src/kudu/security/init.cc
+++ b/be/src/kudu/security/init.cc
@@ -390,14 +390,17 @@ Status KinitContext::Kinit(const string& keytab_path, const string& principal) {
 
 Status GetConfiguredPrincipal(string* principal) {
   string p = FLAGS_principal;
-  string hostname;
-  // Try to fill in either the FQDN or hostname.
-  if (!GetFQDN(&hostname).ok()) {
-    RETURN_NOT_OK(GetHostname(&hostname));
+  const auto& kHostToken = "_HOST";
+  if (p.find(kHostToken) != string::npos) {
+    string hostname;
+    // Try to fill in either the FQDN or hostname.
+    if (!GetFQDN(&hostname).ok()) {
+      RETURN_NOT_OK(GetHostname(&hostname));
+    }
+    // Hosts in principal names are canonicalized to lower-case.
+    std::transform(hostname.begin(), hostname.end(), hostname.begin(), tolower);
+    GlobalReplaceSubstring(kHostToken, hostname, &p);
   }
-  // Hosts in principal names are canonicalized to lower-case.
-  std::transform(hostname.begin(), hostname.end(), hostname.begin(), tolower);
-  GlobalReplaceSubstring("_HOST", hostname, &p);
   *principal = p;
   return Status::OK();
 }


[5/9] incubator-impala git commit: IMPALA-5871: KuduPartitionExpr incorrectly handles its child types

Posted by ta...@apache.org.
IMPALA-5871: KuduPartitionExpr incorrectly handles its child types

KuduPartitionExpr takes input rows and sends them to Kudu to determine
the partition the rows correspond to in a particular table's
partitioning scheme. This is then used to partition and sort rows
before sending them to Kudu when performing an INSERT.

If the input types are not the same as (but are compatible with) the
types of the columns in the table, we need to cast the input rows.
KuduPartitionExpr.analyze() actually already does this, but the casts
are dropped for the sort step during the INSERT in most cases.

As a result, attempting to insert a string value into a Kudu timestamp
column causes a crash.

Inserting a numeric value into a different but compatibly typed col
(eg. tinyint into an int col) will cause the sort during a Kudu INSERT
to operate on garbage values, potentially degrading performance and
causing INSERTs to fail due to Kudu timeouts (see IMPALA-3742).

Testing:
- Added an e2e test in kudu_insert.test

Change-Id: I44cf31e46a77f3e7c92cf6b9112653808a001705
Reviewed-on: http://gerrit.cloudera.org:8080/7922
Reviewed-by: Alex Behm <al...@cloudera.com>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/64e28021
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/64e28021
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/64e28021

Branch: refs/heads/master
Commit: 64e28021957f6993aea5ceb3ad626fb577597107
Parents: 1b70eb6
Author: Thomas Tauber-Marshall <tm...@cloudera.com>
Authored: Thu Aug 31 09:15:22 2017 -0700
Committer: Impala Public Jenkins <im...@gerrit.cloudera.org>
Committed: Fri Sep 1 21:31:55 2017 +0000

----------------------------------------------------------------------
 be/src/exprs/kudu-partition-expr.cc             |  1 +
 .../impala/analysis/KuduPartitionExpr.java      |  4 +--
 .../queries/QueryTest/kudu_insert.test          | 27 +++++++++++++++-----
 3 files changed, 23 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/64e28021/be/src/exprs/kudu-partition-expr.cc
----------------------------------------------------------------------
diff --git a/be/src/exprs/kudu-partition-expr.cc b/be/src/exprs/kudu-partition-expr.cc
index 6f35268..2cc96f0 100644
--- a/be/src/exprs/kudu-partition-expr.cc
+++ b/be/src/exprs/kudu-partition-expr.cc
@@ -71,6 +71,7 @@ IntVal KuduPartitionExpr::GetIntVal(ScalarExprEvaluator* eval,
     int col = tkudu_partition_expr_.referenced_columns[i];
     const ColumnDescriptor& col_desc = table_desc_->col_descs()[col];
     PrimitiveType type = col_desc.type().type;
+    DCHECK_EQ(GetChild(i)->type().type, type);
     Status s = WriteKuduValue(col, type, val, false, row_.get());
     // This can only fail if we set a col to an incorect type, which would be a bug in
     // planning, so we can DCHECK.

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/64e28021/fe/src/main/java/org/apache/impala/analysis/KuduPartitionExpr.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/analysis/KuduPartitionExpr.java b/fe/src/main/java/org/apache/impala/analysis/KuduPartitionExpr.java
index cc42804..8d52d59 100644
--- a/fe/src/main/java/org/apache/impala/analysis/KuduPartitionExpr.java
+++ b/fe/src/main/java/org/apache/impala/analysis/KuduPartitionExpr.java
@@ -73,8 +73,8 @@ public class KuduPartitionExpr extends Expr {
     // IMPALA-5294: If one of the children is a NullLiteral, it has to be cast to a type
     // to be passed to the BE.
     for (int i = 0; i < children_.size(); ++i) {
-      children_.get(i).castTo(
-          targetTable_.getColumns().get(partitionColPos_.get(i)).getType());
+      children_.set(i, children_.get(i).castTo(
+          targetTable_.getColumns().get(partitionColPos_.get(i)).getType()));
     }
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/64e28021/testdata/workloads/functional-query/queries/QueryTest/kudu_insert.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-query/queries/QueryTest/kudu_insert.test b/testdata/workloads/functional-query/queries/QueryTest/kudu_insert.test
index 76ad779..1150898 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/kudu_insert.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/kudu_insert.test
@@ -287,22 +287,35 @@ NumRowErrors: 7299
 ---- QUERY
 # Table with all supported types as primary key and distribution columns
 create table allkeytypes (i1 tinyint, i2 smallint, i3 int, i4 bigint, name string,
-  valf float, vald double, primary key (i1, i2, i3, i4, name)) partition by
-  hash partitions 3, range (partition value = (1,1,1,1,'1'),
-  partition value = (2,2,2,2,'2'), partition value = (3,3,3,3,'3')) stored as kudu
+  valt timestamp, valf float, vald double, primary key (i1, i2, i3, i4, name, valt))
+  partition by hash partitions 3, range
+  (partition value = (1,1,1,1,'1','2009-01-01 00:01:00'),
+  partition value = (2,2,2,2,'2','2009-01-01 00:02:00.100000000'),
+  partition value = (3,3,3,3,'3','2009-01-01 00:03:00.300000000')) stored as kudu
 ---- RESULTS
 ====
 ---- QUERY
 insert into allkeytypes select cast(id as tinyint), smallint_col, int_col,
-  cast (bigint_col/10 as bigint), string_col, float_col, double_col
-  from functional.alltypes where id > 0 and id < 10
+  cast (bigint_col/10 as bigint), string_col, timestamp_col, float_col, double_col
+  from functional.alltypes where id > 1 and id < 10
 ---- RESULTS
-: 3
+: 2
 ---- RUNTIME_PROFILE
-NumModifiedRows: 3
+NumModifiedRows: 2
 NumRowErrors: 6
 ====
 ---- QUERY
+# IMPALA-5871 - test that a cast is correctly added when inserting a string into a Kudu
+# timestamp partition column with distributed exec.
+set exec_single_node_rows_threshold=0;
+insert into allkeytypes values (1,1,1,1,'1','2009-01-01 00:01:00',null,null)
+---- RESULTS
+: 1
+---- RUNTIME_PROFILE
+NumModifiedRows: 1
+NumRowErrors: 0
+====
+---- QUERY
 # Table with default values
 create table tbl_with_defaults (a int primary key, b int null default 10,
   c int not null default 100, d int default 1000, e int null, f int not null,


[6/9] incubator-impala git commit: [security] avoid kerberos ticket renewal and only reacquire

Posted by ta...@apache.org.
[security] avoid kerberos ticket renewal and only reacquire

It was found that if we use a file based credential cache that is
shared between the C++ side and the java side of a process, and we
encounter the specific edge case where we renew a ticket that has
less than 'ticket_lifetime' left before its 'renew_lifetime' expires,
the ticket is set to have a NULL 'renew_till' timestamp.

Eg:
ticket_lifetime = 10m
renew_lifetime = 100m

[current ticket being renewed] at '15:30:00'
endtime = '15:30:30'
renew_till = '15:31:00'

This ticket will be renewed and the renewed ticket will have the
following values:
endtime = '15:31:00'
renew_till = null

The Java krb5 library refuses to read these kinds of tickets which
have the RENEWABLE flag set but no 'renew_till' set, causing
unexpected failures.

Currently, the only way to work around this is to not renew tickets
at all and only always reacquire them. The reason for this is that
the Java side of a process or even another process may be running
its own renewal thread on the same credential cache for the same
principal(s). So even if we were to avoid renewing in this window,
the Java side could renew in this window, causing the above problem.
If we always reacquire the tickets, we're forcefully reseting this
window for that principal, thereby not allowing the Java side to hit
this bug.
The scenario where this bug played out is when using the kudu renewal
code in tandem with a hadoop process that use the same principals.

Also, currently there is no advantage we gain from just renewing the
tickets vs. reacquiring them, either in terms of security or
performance, since we login from a keytab.

Tracked on the Java side by:
http://bugs.java.com/bugdatabase/view_bug.do?bug_id=JDK-8186576

Change-Id: I8e5225de332ba785e3a73014b8418cfd4059fe07
Reviewed-on: http://gerrit.cloudera.org:8080/7810
Reviewed-by: Todd Lipcon <to...@apache.org>
Tested-by: Kudu Jenkins
Reviewed-on: http://gerrit.cloudera.org:8080/7898
Reviewed-by: Sailesh Mukil <sa...@cloudera.com>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/2774d80f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/2774d80f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/2774d80f

Branch: refs/heads/master
Commit: 2774d80f6b604a4e6256606c6f50c9d21f715fa4
Parents: 64e2802
Author: Sailesh Mukil <sa...@apache.org>
Authored: Mon Aug 21 22:05:39 2017 -0700
Committer: Impala Public Jenkins <im...@gerrit.cloudera.org>
Committed: Sat Sep 2 03:22:00 2017 +0000

----------------------------------------------------------------------
 be/src/kudu/security/init.cc | 82 +++++++++++++--------------------------
 be/src/kudu/security/init.h  |  4 +-
 2 files changed, 29 insertions(+), 57 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/2774d80f/be/src/kudu/security/init.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/security/init.cc b/be/src/kudu/security/init.cc
index aff20e9..9678373 100644
--- a/be/src/kudu/security/init.cc
+++ b/be/src/kudu/security/init.cc
@@ -68,12 +68,12 @@ class KinitContext;
 // Global context for usage of the Krb5 library.
 krb5_context g_krb5_ctx;
 
-// Global instance of the context used by the kinit/renewal thread.
+// Global instance of the context used by the kinit/reacquire thread.
 KinitContext* g_kinit_ctx;
 
-// This lock is used to avoid a race while renewing the kerberos ticket.
+// This lock is used to avoid a race while reacquiring the kerberos ticket.
 // The race can occur between the time we reinitialize the cache and the
-// time when we actually store the renewed credential back in the cache.
+// time when we actually store the new credentials back in the cache.
 RWMutex* g_kerberos_reinit_lock;
 
 class KinitContext {
@@ -184,15 +184,15 @@ void RenewThread() {
 int32_t KinitContext::GetNextRenewInterval(uint32_t num_retries) {
   int32_t time_remaining = ticket_end_timestamp_ - time(nullptr);
 
-  // If the last ticket renewal was a failure, we back off our retry attempts exponentially.
+  // If the last ticket reacqusition was a failure, we back off our retry attempts exponentially.
   if (num_retries > 0) return GetBackedOffRenewInterval(time_remaining, num_retries);
 
   // If the time remaining between now and ticket expiry is:
-  // * > 10 minutes:   We attempt to renew the ticket between 5 seconds and 5 minutes before the
+  // * > 10 minutes:   We attempt to reacquire the ticket between 5 seconds and 5 minutes before the
   //                   ticket expires.
-  // * 5 - 10 minutes: We attempt to renew the ticket betwen 5 seconds and 1 minute before the
+  // * 5 - 10 minutes: We attempt to reacquire the ticket betwen 5 seconds and 1 minute before the
   //                   ticket expires.
-  // * < 5 minutes:    Attempt to renew the ticket every 'time_remaining'.
+  // * < 5 minutes:    Attempt to reacquire the ticket every 'time_remaining'.
   // The jitter is added to make sure that every server doesn't flood the KDC at the same time.
   random_device rd;
   mt19937 generator(rd());
@@ -266,7 +266,7 @@ Status KinitContext::DoRenewal() {
         krb5_free_cred_contents(g_krb5_ctx, &creds); });
     if (krb5_is_config_principal(g_krb5_ctx, creds.server)) continue;
 
-    // We only want to renew the TGT (Ticket Granting Ticket). Ignore all other tickets.
+    // We only want to reacquire the TGT (Ticket Granting Ticket). Ignore all other tickets.
     // This follows the same format as is_local_tgt() from krb5:src/clients/klist/klist.c
     if (creds.server->length != 2 ||
         data_eq(creds.server->data[1], principal_->realm) == 0 ||
@@ -275,58 +275,30 @@ Status KinitContext::DoRenewal() {
       continue;
     }
 
-    time_t now = time(nullptr);
-    time_t ticket_expiry = creds.times.endtime;
-    time_t renew_till = creds.times.renew_till;
-    time_t renew_deadline = renew_till - 30;
-
     krb5_creds new_creds;
     memset(&new_creds, 0, sizeof(krb5_creds));
     auto cleanup_new_creds = MakeScopedCleanup([&]() {
         krb5_free_cred_contents(g_krb5_ctx, &new_creds); });
-    // If the ticket has already expired or if there's only a short period before which the
-    // renew window closes, we acquire a new ticket.
-    if (ticket_expiry < now || renew_deadline < now) {
-      // Acquire a new ticket using the keytab. This ticket will automatically be put into the
-      // credential cache.
-      {
-        std::lock_guard<RWMutex> l(*g_kerberos_reinit_lock);
-        KRB5_RETURN_NOT_OK_PREPEND(krb5_get_init_creds_keytab(g_krb5_ctx, &new_creds, principal_,
-                                                              keytab_, 0 /* valid from now */,
-                                                              nullptr /* TKT service name */,
-                                                              opts_),
-                                   "Reacquire error: unable to login from keytab");
+    // Acquire a new ticket using the keytab. This ticket will automatically be put into the
+    // credential cache.
+    {
+      std::lock_guard<RWMutex> l(*g_kerberos_reinit_lock);
+      KRB5_RETURN_NOT_OK_PREPEND(krb5_get_init_creds_keytab(g_krb5_ctx, &new_creds, principal_,
+                                                            keytab_, 0 /* valid from now */,
+                                                            nullptr /* TKT service name */,
+                                                            opts_),
+                                 "Reacquire error: unable to login from keytab");
 #ifndef HAVE_KRB5_GET_INIT_CREDS_OPT_SET_OUT_CCACHE
-        // Heimdal krb5 doesn't have the 'krb5_get_init_creds_opt_set_out_ccache' option,
-        // so use this alternate route.
-        KRB5_RETURN_NOT_OK_PREPEND(krb5_cc_initialize(g_krb5_ctx, ccache_, principal_),
-                                   "Reacquire error: could not init ccache");
+      // Heimdal krb5 doesn't have the 'krb5_get_init_creds_opt_set_out_ccache' option,
+      // so use this alternate route.
+      KRB5_RETURN_NOT_OK_PREPEND(krb5_cc_initialize(g_krb5_ctx, ccache_, principal_),
+                                 "Reacquire error: could not init ccache");
 
-        KRB5_RETURN_NOT_OK_PREPEND(krb5_cc_store_cred(g_krb5_ctx, ccache_, &creds),
-                                   "Reacquire error: could not store creds in cache");
+      KRB5_RETURN_NOT_OK_PREPEND(krb5_cc_store_cred(g_krb5_ctx, ccache_, &creds),
+                                 "Reacquire error: could not store creds in cache");
 #endif
-      }
-      LOG(INFO) << "Successfully reacquired a new kerberos TGT";
-    } else {
-      // Renew existing ticket.
-      KRB5_RETURN_NOT_OK_PREPEND(krb5_get_renewed_creds(g_krb5_ctx, &new_creds, principal_,
-                                                        ccache_, nullptr),
-                                 "Failed to renew ticket");
-
-      {
-        // Take the write lock here so that any connections undergoing negotiation have to wait
-        // until the new credentials are placed in the cache.
-        std::lock_guard<RWMutex> l(*g_kerberos_reinit_lock);
-        // Clear existing credentials in cache.
-        KRB5_RETURN_NOT_OK_PREPEND(krb5_cc_initialize(g_krb5_ctx, ccache_, principal_),
-                                   "Failed to re-initialize ccache");
-
-        // Store the new credentials in the cache.
-        KRB5_RETURN_NOT_OK_PREPEND(krb5_cc_store_cred(g_krb5_ctx, ccache_, &new_creds),
-                                   "Failed to store credentials in ccache");
-      }
-      LOG(INFO) << "Successfully renewed kerberos TGT";
     }
+    LOG(INFO) << "Successfully reacquired a new kerberos TGT";
     ticket_end_timestamp_ = new_creds.times.endtime;
     break;
   }
@@ -500,9 +472,9 @@ Status InitKerberosForServer() {
   RETURN_NOT_OK_PREPEND(g_kinit_ctx->Kinit(FLAGS_keytab_file, principal), "unable to kinit");
 
   g_kerberos_reinit_lock = new RWMutex(RWMutex::Priority::PREFER_WRITING);
-  scoped_refptr<Thread> renew_thread;
-  // Start the renewal thread.
-  RETURN_NOT_OK(Thread::Create("kerberos", "renewal thread", &RenewThread, &renew_thread));
+  scoped_refptr<Thread> reacquire_thread;
+  // Start the reacquire thread.
+  RETURN_NOT_OK(Thread::Create("kerberos", "reacquire thread", &RenewThread, &reacquire_thread));
 
   return Status::OK();
 }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/2774d80f/be/src/kudu/security/init.h
----------------------------------------------------------------------
diff --git a/be/src/kudu/security/init.h b/be/src/kudu/security/init.h
index 61c9577..60a5a5e 100644
--- a/be/src/kudu/security/init.h
+++ b/be/src/kudu/security/init.h
@@ -32,8 +32,8 @@ namespace security {
 Status InitKerberosForServer();
 
 // Returns the process lock 'kerberos_reinit_lock'
-// This lock is acquired in write mode while the ticket is being renewed, and
-// acquired in read mode before using the SASL library which might require a ticket.
+// This lock is taken in write mode while the ticket is being reacquired, and
+// taken in read mode before using the SASL library which might require a ticket.
 RWMutex* KerberosReinitLock();
 
 // Return the full principal (user/host@REALM) that the server has used to


[7/9] incubator-impala git commit: KUDU-2032 (part 1): pass pre-resolution hostname into RPC proxies

Posted by ta...@apache.org.
KUDU-2032 (part 1): pass pre-resolution hostname into RPC proxies

This modifies the constructor of RPC proxies (generated and otherwise)
to take the remote hostname in addition to the existing resolved
Sockaddr parameter. The hostname is then passed into the ConnectionId
object, and plumbed through to the SASL client in place of the IP
address that was used previously.

The patch changes all of the construction sites of Proxy to fit the new
interface. In most of the test cases, we don't have real hostnames, so
we just use the dotted-decimal string form of the remote Sockaddr, which
matches the existing behavior.

In the real call sites, we have actual host names typically specified by
the user, and in those cases we'll need to pass those into the proxy. In
a few cases, they were conveniently available in the same function that
creates the proxy. In others, they are relatively far away, so this
patch just uses the dotted-decimal string and leaves TODOs.

In the case that Kerberos is not configured, this change should have no
effect since the hostname is ignored by SASL "plain". In the case that
Kerberos is configured with 'rdns=true', they also have no effect,
because the krb5 library will resolve and reverse the hostname from the
IP as it did before. In the case that 'rdns=false', this moves us one
step closer to fixing KUDU-2032 by getting a hostname into the SASL
library.

I verified that, if I set 'rdns = false' on a Kerberized client, I'm now
able to run  'kudu master status <host>' successfully where it would not
before. This tool uses a direct proxy instantiation where the hostname
was easy to plumb in. 'kudu table list <host>' still does not work because
it uses the client, which wasn't convenient to plumb quite yet.

Given that this makes incremental improvement towards fixing the issue
without any regression, and is already a fairly wide patch, I hope to
commit this and then address the remaining plumbing in a separate patch.

Change-Id: I96fb3c73382f0be6e30e29ae2e7176be42f3bb98
Reviewed-on: http://gerrit.cloudera.org:8080/7687
Tested-by: Kudu Jenkins
Reviewed-by: Alexey Serbin <as...@cloudera.com>
Reviewed-on: http://gerrit.cloudera.org:8080/7897
Reviewed-by: Sailesh Mukil <sa...@cloudera.com>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/e4a2d226
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/e4a2d226
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/e4a2d226

Branch: refs/heads/master
Commit: e4a2d226a7649b29c346b3f4f072984a938af787
Parents: 2774d80
Author: Todd Lipcon <to...@apache.org>
Authored: Tue Aug 15 18:20:48 2017 -0700
Committer: Impala Public Jenkins <im...@gerrit.cloudera.org>
Committed: Sat Sep 2 08:21:21 2017 +0000

----------------------------------------------------------------------
 be/src/kudu/rpc/connection.h             | 15 +++---
 be/src/kudu/rpc/connection_id.cc         | 25 ++++++---
 be/src/kudu/rpc/connection_id.h          | 10 +++-
 be/src/kudu/rpc/exactly_once_rpc-test.cc |  3 +-
 be/src/kudu/rpc/mt-rpc-test.cc           |  6 ++-
 be/src/kudu/rpc/negotiation.cc           |  9 ++--
 be/src/kudu/rpc/protoc-gen-krpc.cc       | 11 ++--
 be/src/kudu/rpc/proxy.cc                 |  6 ++-
 be/src/kudu/rpc/proxy.h                  |  5 +-
 be/src/kudu/rpc/reactor.cc               |  5 +-
 be/src/kudu/rpc/rpc-bench.cc             |  4 +-
 be/src/kudu/rpc/rpc-test.cc              | 73 ++++++++++++++++++---------
 be/src/kudu/rpc/rpc_stub-test.cc         | 47 +++++++++--------
 be/src/kudu/util/net/net_util-test.cc    |  1 +
 be/src/kudu/util/net/sockaddr.cc         |  4 +-
 be/src/kudu/util/net/sockaddr.h          |  3 ++
 16 files changed, 142 insertions(+), 85 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/e4a2d226/be/src/kudu/rpc/connection.h
----------------------------------------------------------------------
diff --git a/be/src/kudu/rpc/connection.h b/be/src/kudu/rpc/connection.h
index 7f16b7b..9a78c14 100644
--- a/be/src/kudu/rpc/connection.h
+++ b/be/src/kudu/rpc/connection.h
@@ -131,15 +131,17 @@ class Connection : public RefCountedThreadSafe<Connection> {
   const Sockaddr &remote() const { return remote_; }
 
   // Set the user credentials for an outbound connection.
-  void set_local_user_credentials(UserCredentials creds) {
+  void set_outbound_connection_id(ConnectionId conn_id) {
     DCHECK_EQ(direction_, CLIENT);
-    local_user_credentials_ = std::move(creds);
+    DCHECK(!outbound_connection_id_);
+    outbound_connection_id_ = std::move(conn_id);
   }
 
   // Get the user credentials which will be used to log in.
-  const UserCredentials& local_user_credentials() const {
+  const ConnectionId& outbound_connection_id() const {
     DCHECK_EQ(direction_, CLIENT);
-    return local_user_credentials_;
+    DCHECK(outbound_connection_id_);
+    return *outbound_connection_id_;
   }
 
   // Credentials policy to start connection negotiation.
@@ -288,8 +290,9 @@ class Connection : public RefCountedThreadSafe<Connection> {
   // The socket we're communicating on.
   std::unique_ptr<Socket> socket_;
 
-  // The credentials of the user operating on this connection (if a client user).
-  UserCredentials local_user_credentials_;
+  // The ConnectionId that serves as a key into the client connection map
+  // within this reactor. Only set in the case of outbound connections.
+  boost::optional<ConnectionId> outbound_connection_id_;
 
   // The authenticated remote user (if this is an inbound connection on the server).
   RemoteUser remote_user_;

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/e4a2d226/be/src/kudu/rpc/connection_id.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/rpc/connection_id.cc b/be/src/kudu/rpc/connection_id.cc
index a17b783..5e24086 100644
--- a/be/src/kudu/rpc/connection_id.cc
+++ b/be/src/kudu/rpc/connection_id.cc
@@ -28,9 +28,13 @@ namespace rpc {
 
 ConnectionId::ConnectionId() {}
 
-ConnectionId::ConnectionId(const Sockaddr& remote, UserCredentials user_credentials) {
-  remote_ = remote;
-  user_credentials_ = std::move(user_credentials);
+ConnectionId::ConnectionId(const Sockaddr& remote,
+                           std::string hostname,
+                           UserCredentials user_credentials)
+    : remote_(remote),
+      hostname_(std::move(hostname)),
+      user_credentials_(std::move(user_credentials)) {
+  CHECK(!hostname_.empty());
 }
 
 void ConnectionId::set_user_credentials(UserCredentials user_credentials) {
@@ -38,21 +42,30 @@ void ConnectionId::set_user_credentials(UserCredentials user_credentials) {
 }
 
 string ConnectionId::ToString() const {
+  string remote;
+  if (hostname_ != remote_.host()) {
+    remote = strings::Substitute("$0 ($1)", remote_.ToString(), hostname_);
+  } else {
+    remote = remote_.ToString();
+  }
+
   return strings::Substitute("{remote=$0, user_credentials=$1}",
-                             remote_.ToString(),
+                             remote,
                              user_credentials_.ToString());
 }
 
 size_t ConnectionId::HashCode() const {
   size_t seed = 0;
   boost::hash_combine(seed, remote_.HashCode());
+  boost::hash_combine(seed, hostname_);
   boost::hash_combine(seed, user_credentials_.HashCode());
   return seed;
 }
 
 bool ConnectionId::Equals(const ConnectionId& other) const {
-  return (remote() == other.remote()
-       && user_credentials().Equals(other.user_credentials()));
+  return remote() == other.remote() &&
+      hostname_ == other.hostname_ &&
+      user_credentials().Equals(other.user_credentials());
 }
 
 size_t ConnectionIdHash::operator() (const ConnectionId& conn_id) const {

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/e4a2d226/be/src/kudu/rpc/connection_id.h
----------------------------------------------------------------------
diff --git a/be/src/kudu/rpc/connection_id.h b/be/src/kudu/rpc/connection_id.h
index 09f1738..195b47c 100644
--- a/be/src/kudu/rpc/connection_id.h
+++ b/be/src/kudu/rpc/connection_id.h
@@ -35,11 +35,15 @@ class ConnectionId {
   ConnectionId(const ConnectionId& other) = default;
 
   // Convenience constructor.
-  ConnectionId(const Sockaddr& remote, UserCredentials user_credentials);
+  ConnectionId(const Sockaddr& remote,
+               std::string hostname,
+               UserCredentials user_credentials);
 
   // The remote address.
   const Sockaddr& remote() const { return remote_; }
 
+  const std::string& hostname() const { return hostname_; }
+
   // The credentials of the user associated with this connection, if any.
   void set_user_credentials(UserCredentials user_credentials);
 
@@ -58,6 +62,10 @@ class ConnectionId {
   // Remember to update HashCode() and Equals() when new fields are added.
   Sockaddr remote_;
 
+  // The original host name before it was resolved to 'remote_'.
+  // This must be retained since it is used to compute Kerberos Service Principal Names (SPNs).
+  std::string hostname_;
+
   UserCredentials user_credentials_;
 };
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/e4a2d226/be/src/kudu/rpc/exactly_once_rpc-test.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/rpc/exactly_once_rpc-test.cc b/be/src/kudu/rpc/exactly_once_rpc-test.cc
index 388919d..e16681d 100644
--- a/be/src/kudu/rpc/exactly_once_rpc-test.cc
+++ b/be/src/kudu/rpc/exactly_once_rpc-test.cc
@@ -166,7 +166,8 @@ class ExactlyOnceRpcTest : public RpcTestBase {
     // Set up server.
     StartTestServerWithGeneratedCode(&server_addr_);
     client_messenger_ = CreateMessenger("Client");
-    proxy_.reset(new CalculatorServiceProxy(client_messenger_, server_addr_));
+    proxy_.reset(new CalculatorServiceProxy(
+        client_messenger_, server_addr_, server_addr_.host()));
     test_picker_.reset(new TestServerPicker(proxy_.get()));
     request_tracker_.reset(new RequestTracker(kClientId));
     attempt_nos_ = 0;

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/e4a2d226/be/src/kudu/rpc/mt-rpc-test.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/rpc/mt-rpc-test.cc b/be/src/kudu/rpc/mt-rpc-test.cc
index 73e3a13..10056ae 100644
--- a/be/src/kudu/rpc/mt-rpc-test.cc
+++ b/be/src/kudu/rpc/mt-rpc-test.cc
@@ -45,7 +45,8 @@ class MultiThreadedRpcTest : public RpcTestBase {
                   Status* result, CountDownLatch* latch) {
     LOG(INFO) << "Connecting to " << server_addr.ToString();
     shared_ptr<Messenger> client_messenger(CreateMessenger("ClientSC"));
-    Proxy p(client_messenger, server_addr, GenericCalculatorService::static_service_name());
+    Proxy p(client_messenger, server_addr, server_addr.host(),
+            GenericCalculatorService::static_service_name());
     *result = DoTestSyncCall(p, method_name);
     latch->CountDown();
   }
@@ -61,7 +62,8 @@ class MultiThreadedRpcTest : public RpcTestBase {
       Sockaddr server_addr, const char* method_name, Status* last_result,
       const shared_ptr<Messenger>& messenger) {
     LOG(INFO) << "Connecting to " << server_addr.ToString();
-    Proxy p(messenger, server_addr, GenericCalculatorService::static_service_name());
+    Proxy p(messenger, server_addr, server_addr.host(),
+            GenericCalculatorService::static_service_name());
 
     int i = 0;
     while (true) {

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/e4a2d226/be/src/kudu/rpc/negotiation.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/rpc/negotiation.cc b/be/src/kudu/rpc/negotiation.cc
index 66a0112..0c5ca3f 100644
--- a/be/src/kudu/rpc/negotiation.cc
+++ b/be/src/kudu/rpc/negotiation.cc
@@ -169,11 +169,7 @@ static Status DoClientNegotiation(Connection* conn,
                                        authn_token,
                                        encryption);
 
-  // Note that the fqdn is an IP address here: we've already lost whatever DNS
-  // name the client was attempting to use. Unless krb5 is configured with 'rdns
-  // = false', it will automatically take care of reversing this address to its
-  // canonical hostname to determine the expected server principal.
-  client_negotiation.set_server_fqdn(conn->remote().host());
+  client_negotiation.set_server_fqdn(conn->outbound_connection_id().hostname());
 
   if (authentication != RpcAuthentication::DISABLED) {
     Status s = client_negotiation.EnableGSSAPI();
@@ -202,7 +198,8 @@ static Status DoClientNegotiation(Connection* conn,
   }
 
   if (authentication != RpcAuthentication::REQUIRED) {
-    RETURN_NOT_OK(client_negotiation.EnablePlain(conn->local_user_credentials().real_user(), ""));
+    const auto& creds = conn->outbound_connection_id().user_credentials();
+    RETURN_NOT_OK(client_negotiation.EnablePlain(creds.real_user(), ""));
   }
 
   client_negotiation.set_deadline(deadline);

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/e4a2d226/be/src/kudu/rpc/protoc-gen-krpc.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/rpc/protoc-gen-krpc.cc b/be/src/kudu/rpc/protoc-gen-krpc.cc
index e892897..7873d08 100644
--- a/be/src/kudu/rpc/protoc-gen-krpc.cc
+++ b/be/src/kudu/rpc/protoc-gen-krpc.cc
@@ -576,8 +576,9 @@ class CodeGenerator : public ::google::protobuf::compiler::CodeGenerator {
       Print(printer, *subs,
         "class $service_name$Proxy : public ::kudu::rpc::Proxy {\n"
         " public:\n"
-        "  $service_name$Proxy(const std::shared_ptr< ::kudu::rpc::Messenger>\n"
-        "                &messenger, const ::kudu::Sockaddr &sockaddr);\n"
+        "  $service_name$Proxy(std::shared_ptr<::kudu::rpc::Messenger>\n"
+        "                messenger, const ::kudu::Sockaddr &sockaddr,"
+        "                std::string hostname);\n"
         "  ~$service_name$Proxy();\n"
         "\n"
         );
@@ -631,9 +632,9 @@ class CodeGenerator : public ::google::protobuf::compiler::CodeGenerator {
       subs->PushService(service);
       Print(printer, *subs,
         "$service_name$Proxy::$service_name$Proxy(\n"
-        "   const std::shared_ptr< ::kudu::rpc::Messenger> &messenger,\n"
-        "   const ::kudu::Sockaddr &remote)\n"
-        "  : Proxy(messenger, remote, \"$full_service_name$\") {\n"
+        "   std::shared_ptr< ::kudu::rpc::Messenger> messenger,\n"
+        "   const ::kudu::Sockaddr &remote, std::string hostname)\n"
+        "  : Proxy(std::move(messenger), remote, std::move(hostname), \"$full_service_name$\") {\n"
         "}\n"
         "\n"
         "$service_name$Proxy::~$service_name$Proxy() {\n"

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/e4a2d226/be/src/kudu/rpc/proxy.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/rpc/proxy.cc b/be/src/kudu/rpc/proxy.cc
index 0d946ed..4374cb1 100644
--- a/be/src/kudu/rpc/proxy.cc
+++ b/be/src/kudu/rpc/proxy.cc
@@ -48,7 +48,9 @@ namespace kudu {
 namespace rpc {
 
 Proxy::Proxy(std::shared_ptr<Messenger> messenger,
-             const Sockaddr& remote, string service_name)
+             const Sockaddr& remote,
+             string hostname,
+             string service_name)
     : service_name_(std::move(service_name)),
       messenger_(std::move(messenger)),
       is_started_(false) {
@@ -66,7 +68,7 @@ Proxy::Proxy(std::shared_ptr<Messenger> messenger,
 
   UserCredentials creds;
   creds.set_real_user(std::move(real_user));
-  conn_id_ = ConnectionId(remote, std::move(creds));
+  conn_id_ = ConnectionId(remote, std::move(hostname), std::move(creds));
 }
 
 Proxy::~Proxy() {

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/e4a2d226/be/src/kudu/rpc/proxy.h
----------------------------------------------------------------------
diff --git a/be/src/kudu/rpc/proxy.h b/be/src/kudu/rpc/proxy.h
index 6e43e93..a3d852e 100644
--- a/be/src/kudu/rpc/proxy.h
+++ b/be/src/kudu/rpc/proxy.h
@@ -55,8 +55,11 @@ class Messenger;
 // After initialization, multiple threads may make calls using the same proxy object.
 class Proxy {
  public:
-  Proxy(std::shared_ptr<Messenger> messenger, const Sockaddr& remote,
+  Proxy(std::shared_ptr<Messenger> messenger,
+        const Sockaddr& remote,
+        std::string hostname,
         std::string service_name);
+
   ~Proxy();
 
   // Call a remote method asynchronously.

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/e4a2d226/be/src/kudu/rpc/reactor.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/rpc/reactor.cc b/be/src/kudu/rpc/reactor.cc
index f55cca0..b935e68 100644
--- a/be/src/kudu/rpc/reactor.cc
+++ b/be/src/kudu/rpc/reactor.cc
@@ -448,7 +448,7 @@ Status ReactorThread::FindOrStartConnection(const ConnectionId& conn_id,
   // Register the new connection in our map.
   *conn = new Connection(
       this, conn_id.remote(), std::move(new_socket), Connection::CLIENT, cred_policy);
-  (*conn)->set_local_user_credentials(conn_id.user_credentials());
+  (*conn)->set_outbound_connection_id(conn_id);
 
   // Kick off blocking client connection negotiation.
   Status s = StartConnectionNegotiation(*conn);
@@ -546,8 +546,7 @@ void ReactorThread::DestroyConnection(Connection *conn,
 
   // Unlink connection from lists.
   if (conn->direction() == Connection::CLIENT) {
-    ConnectionId conn_id(conn->remote(), conn->local_user_credentials());
-    const auto range = client_conns_.equal_range(conn_id);
+    const auto range = client_conns_.equal_range(conn->outbound_connection_id());
     CHECK(range.first != range.second) << "Couldn't find connection " << conn->ToString();
     // The client_conns_ container is a multi-map.
     for (auto it = range.first; it != range.second;) {

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/e4a2d226/be/src/kudu/rpc/rpc-bench.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/rpc/rpc-bench.cc b/be/src/kudu/rpc/rpc-bench.cc
index d569ea1..47be939 100644
--- a/be/src/kudu/rpc/rpc-bench.cc
+++ b/be/src/kudu/rpc/rpc-bench.cc
@@ -129,7 +129,7 @@ class ClientThread {
   void Run() {
     shared_ptr<Messenger> client_messenger = bench_->CreateMessenger("Client");
 
-    CalculatorServiceProxy p(client_messenger, bench_->server_addr_);
+    CalculatorServiceProxy p(client_messenger, bench_->server_addr_, "localhost");
 
     AddRequestPB req;
     AddResponsePB resp;
@@ -182,7 +182,7 @@ class ClientAsyncWorkload {
       messenger_(std::move(messenger)),
       request_count_(0) {
     controller_.set_timeout(MonoDelta::FromSeconds(10));
-    proxy_.reset(new CalculatorServiceProxy(messenger_, bench_->server_addr_));
+    proxy_.reset(new CalculatorServiceProxy(messenger_, bench_->server_addr_, "localhost"));
   }
 
   void CallOneRpc() {

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/e4a2d226/be/src/kudu/rpc/rpc-test.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/rpc/rpc-test.cc b/be/src/kudu/rpc/rpc-test.cc
index 3989558..a7865ec 100644
--- a/be/src/kudu/rpc/rpc-test.cc
+++ b/be/src/kudu/rpc/rpc-test.cc
@@ -130,7 +130,8 @@ TEST_P(TestRpc, TestNegotiationDeadlock) {
   Sockaddr server_addr;
   StartTestServerWithCustomMessenger(&server_addr, messenger, enable_ssl);
 
-  Proxy p(messenger, server_addr, GenericCalculatorService::static_service_name());
+  Proxy p(messenger, server_addr, server_addr.host(),
+          GenericCalculatorService::static_service_name());
   ASSERT_OK(DoTestSyncCall(p, GenericCalculatorService::kAddMethodName));
 }
 
@@ -144,7 +145,8 @@ TEST_P(TestRpc, TestCall) {
   // Set up client.
   LOG(INFO) << "Connecting to " << server_addr.ToString();
   shared_ptr<Messenger> client_messenger(CreateMessenger("Client", 1, enable_ssl));
-  Proxy p(client_messenger, server_addr, GenericCalculatorService::static_service_name());
+  Proxy p(client_messenger, server_addr, server_addr.host(),
+          GenericCalculatorService::static_service_name());
   ASSERT_STR_CONTAINS(p.ToString(), strings::Substitute("kudu.rpc.GenericCalculatorService@"
                                                             "{remote=$0, user_credentials=",
                                                         server_addr.ToString()));
@@ -170,7 +172,8 @@ TEST_P(TestRpc, TestCallWithChainCert) {
   // Set up client.
   SCOPED_TRACE(strings::Substitute("Connecting to $0", server_addr.ToString()));
   shared_ptr<Messenger> client_messenger(CreateMessenger("Client", 1, enable_ssl));
-  Proxy p(client_messenger, server_addr, GenericCalculatorService::static_service_name());
+  Proxy p(client_messenger, server_addr, server_addr.host(),
+          GenericCalculatorService::static_service_name());
   ASSERT_STR_CONTAINS(p.ToString(), strings::Substitute("kudu.rpc.GenericCalculatorService@"
                                                             "{remote=$0, user_credentials=",
                                                         server_addr.ToString()));
@@ -199,7 +202,8 @@ TEST_P(TestRpc, TestCallWithPasswordProtectedKey) {
   // Set up client.
   SCOPED_TRACE(strings::Substitute("Connecting to $0", server_addr.ToString()));
   shared_ptr<Messenger> client_messenger(CreateMessenger("Client", 1, enable_ssl));
-  Proxy p(client_messenger, server_addr, GenericCalculatorService::static_service_name());
+  Proxy p(client_messenger, server_addr, server_addr.host(),
+          GenericCalculatorService::static_service_name());
   ASSERT_STR_CONTAINS(p.ToString(), strings::Substitute("kudu.rpc.GenericCalculatorService@"
                                                             "{remote=$0, user_credentials=",
                                                         server_addr.ToString()));
@@ -234,7 +238,8 @@ TEST_P(TestRpc, TestCallToBadServer) {
   shared_ptr<Messenger> client_messenger(CreateMessenger("Client", 1, GetParam()));
   Sockaddr addr;
   addr.set_port(0);
-  Proxy p(client_messenger, addr, GenericCalculatorService::static_service_name());
+  Proxy p(client_messenger, addr, addr.host(),
+          GenericCalculatorService::static_service_name());
 
   // Loop a few calls to make sure that we properly set up and tear down
   // the connections.
@@ -255,7 +260,8 @@ TEST_P(TestRpc, TestInvalidMethodCall) {
   // Set up client.
   LOG(INFO) << "Connecting to " << server_addr.ToString();
   shared_ptr<Messenger> client_messenger(CreateMessenger("Client", 1, enable_ssl));
-  Proxy p(client_messenger, server_addr, GenericCalculatorService::static_service_name());
+  Proxy p(client_messenger, server_addr, server_addr.host(),
+          GenericCalculatorService::static_service_name());
 
   // Call the method which fails.
   Status s = DoTestSyncCall(p, "ThisMethodDoesNotExist");
@@ -273,7 +279,7 @@ TEST_P(TestRpc, TestWrongService) {
 
   // Set up client with the wrong service name.
   shared_ptr<Messenger> client_messenger(CreateMessenger("Client", 1, enable_ssl));
-  Proxy p(client_messenger, server_addr, "WrongServiceName");
+  Proxy p(client_messenger, server_addr, "localhost", "WrongServiceName");
 
   // Call the method which fails.
   Status s = DoTestSyncCall(p, "ThisMethodDoesNotExist");
@@ -308,7 +314,8 @@ TEST_P(TestRpc, TestHighFDs) {
   bool enable_ssl = GetParam();
   StartTestServer(&server_addr, enable_ssl);
   shared_ptr<Messenger> client_messenger(CreateMessenger("Client", 1, enable_ssl));
-  Proxy p(client_messenger, server_addr, GenericCalculatorService::static_service_name());
+  Proxy p(client_messenger, server_addr, server_addr.host(),
+          GenericCalculatorService::static_service_name());
   ASSERT_OK(DoTestSyncCall(p, GenericCalculatorService::kAddMethodName));
 }
 
@@ -327,7 +334,8 @@ TEST_P(TestRpc, TestConnectionKeepalive) {
   // Set up client.
   LOG(INFO) << "Connecting to " << server_addr.ToString();
   shared_ptr<Messenger> client_messenger(CreateMessenger("Client", 1, enable_ssl));
-  Proxy p(client_messenger, server_addr, GenericCalculatorService::static_service_name());
+  Proxy p(client_messenger, server_addr, server_addr.host(),
+          GenericCalculatorService::static_service_name());
 
   ASSERT_OK(DoTestSyncCall(p, GenericCalculatorService::kAddMethodName));
 
@@ -374,7 +382,8 @@ TEST_P(TestRpc, TestReopenOutboundConnections) {
   // Set up client.
   LOG(INFO) << "Connecting to " << server_addr.ToString();
   shared_ptr<Messenger> client_messenger(CreateMessenger("Client", 1, enable_ssl));
-  Proxy p(client_messenger, server_addr, GenericCalculatorService::static_service_name());
+  Proxy p(client_messenger, server_addr, server_addr.host(),
+          GenericCalculatorService::static_service_name());
 
   // Verify the initial counters.
   ReactorMetrics metrics;
@@ -415,7 +424,8 @@ TEST_P(TestRpc, TestCredentialsPolicy) {
   // Set up client.
   LOG(INFO) << "Connecting to " << server_addr.ToString();
   shared_ptr<Messenger> client_messenger(CreateMessenger("Client", 1, enable_ssl));
-  Proxy p(client_messenger, server_addr, GenericCalculatorService::static_service_name());
+  Proxy p(client_messenger, server_addr, server_addr.host(),
+          GenericCalculatorService::static_service_name());
 
   // Verify the initial counters.
   ReactorMetrics metrics;
@@ -483,7 +493,8 @@ TEST_P(TestRpc, TestCallLongerThanKeepalive) {
 
   // Set up client.
   shared_ptr<Messenger> client_messenger(CreateMessenger("Client", 1, enable_ssl));
-  Proxy p(client_messenger, server_addr, GenericCalculatorService::static_service_name());
+  Proxy p(client_messenger, server_addr, server_addr.host(),
+          GenericCalculatorService::static_service_name());
 
   // Make a call which sleeps longer than the keepalive.
   RpcController controller;
@@ -504,7 +515,8 @@ TEST_P(TestRpc, TestRpcSidecar) {
 
   // Set up client.
   shared_ptr<Messenger> client_messenger(CreateMessenger("Client", 1, GetParam()));
-  Proxy p(client_messenger, server_addr, GenericCalculatorService::static_service_name());
+  Proxy p(client_messenger, server_addr, server_addr.host(),
+          GenericCalculatorService::static_service_name());
 
   // Test a zero-length sidecar
   DoTestSidecar(p, 0, 0);
@@ -543,7 +555,8 @@ TEST_P(TestRpc, TestRpcSidecarLimits) {
 
     // Set up client.
     shared_ptr<Messenger> client_messenger(CreateMessenger("Client", 1, GetParam()));
-    Proxy p(client_messenger, server_addr, GenericCalculatorService::static_service_name());
+    Proxy p(client_messenger, server_addr, server_addr.host(),
+            GenericCalculatorService::static_service_name());
 
     RpcController controller;
     string s(FLAGS_rpc_max_message_size + 1, 'a');
@@ -574,7 +587,8 @@ TEST_P(TestRpc, TestCallTimeout) {
   bool enable_ssl = GetParam();
   StartTestServer(&server_addr, enable_ssl);
   shared_ptr<Messenger> client_messenger(CreateMessenger("Client", 1, enable_ssl));
-  Proxy p(client_messenger, server_addr, GenericCalculatorService::static_service_name());
+  Proxy p(client_messenger, server_addr, server_addr.host(),
+          GenericCalculatorService::static_service_name());
 
   // Test a very short timeout - we expect this will time out while the
   // call is still trying to connect, or in the send queue. This was triggering ASAN failures
@@ -602,7 +616,8 @@ TEST_P(TestRpc, TestCallTimeoutDoesntAffectNegotiation) {
   bool enable_ssl = GetParam();
   StartTestServer(&server_addr, enable_ssl);
   shared_ptr<Messenger> client_messenger(CreateMessenger("Client", 1, enable_ssl));
-  Proxy p(client_messenger, server_addr, GenericCalculatorService::static_service_name());
+  Proxy p(client_messenger, server_addr, server_addr.host(),
+          GenericCalculatorService::static_service_name());
 
   FLAGS_rpc_negotiation_inject_delay_ms = 500;
   ASSERT_NO_FATAL_FAILURE(DoTestExpectTimeout(p, MonoDelta::FromMilliseconds(50)));
@@ -645,7 +660,8 @@ TEST_F(TestRpc, TestNegotiationTimeout) {
 
   // Set up client.
   shared_ptr<Messenger> client_messenger(CreateMessenger("Client"));
-  Proxy p(client_messenger, server_addr, GenericCalculatorService::static_service_name());
+  Proxy p(client_messenger, server_addr, server_addr.host(),
+          GenericCalculatorService::static_service_name());
 
   bool is_negotiation_error = false;
   ASSERT_NO_FATAL_FAILURE(DoTestExpectTimeout(
@@ -666,7 +682,8 @@ TEST_F(TestRpc, TestServerShutsDown) {
   // Set up client.
   LOG(INFO) << "Connecting to " << server_addr.ToString();
   shared_ptr<Messenger> client_messenger(CreateMessenger("Client"));
-  Proxy p(client_messenger, server_addr, GenericCalculatorService::static_service_name());
+  Proxy p(client_messenger, server_addr, server_addr.host(),
+          GenericCalculatorService::static_service_name());
 
   // Send a call.
   AddRequestPB req;
@@ -750,7 +767,8 @@ TEST_P(TestRpc, TestRpcHandlerLatencyMetric) {
 
   // Set up client.
   shared_ptr<Messenger> client_messenger(CreateMessenger("Client", 1, enable_ssl));
-  Proxy p(client_messenger, server_addr, CalculatorService::static_service_name());
+  Proxy p(client_messenger, server_addr, server_addr.host(),
+          CalculatorService::static_service_name());
 
   RpcController controller;
   SleepRequestPB req;
@@ -798,7 +816,7 @@ TEST_P(TestRpc, TestRpcCallbackDestroysMessenger) {
   RpcController controller;
   controller.set_timeout(MonoDelta::FromMilliseconds(1));
   {
-    Proxy p(client_messenger, bad_addr, "xxx");
+    Proxy p(client_messenger, bad_addr, "xxx-host", "xxx-service");
     p.AsyncRequest("my-fake-method", req, &resp, &controller,
                    boost::bind(&DestroyMessengerCallback, &client_messenger, &latch));
   }
@@ -817,7 +835,8 @@ TEST_P(TestRpc, TestRpcContextClientDeadline) {
 
   // Set up client.
   shared_ptr<Messenger> client_messenger(CreateMessenger("Client", 1, enable_ssl));
-  Proxy p(client_messenger, server_addr, CalculatorService::static_service_name());
+  Proxy p(client_messenger, server_addr, server_addr.host(),
+          CalculatorService::static_service_name());
 
   SleepRequestPB req;
   req.set_sleep_micros(sleep_micros);
@@ -843,7 +862,8 @@ TEST_P(TestRpc, TestApplicationFeatureFlag) {
 
   // Set up client.
   shared_ptr<Messenger> client_messenger(CreateMessenger("Client", 1, enable_ssl));
-  Proxy p(client_messenger, server_addr, CalculatorService::static_service_name());
+  Proxy p(client_messenger, server_addr, server_addr.host(),
+          CalculatorService::static_service_name());
 
   { // Supported flag
     AddRequestPB req;
@@ -884,7 +904,8 @@ TEST_P(TestRpc, TestApplicationFeatureFlagUnsupportedServer) {
 
   // Set up client.
   shared_ptr<Messenger> client_messenger(CreateMessenger("Client", 1, enable_ssl));
-  Proxy p(client_messenger, server_addr, CalculatorService::static_service_name());
+  Proxy p(client_messenger, server_addr, server_addr.host(),
+          CalculatorService::static_service_name());
 
   { // Required flag
     AddRequestPB req;
@@ -919,7 +940,8 @@ TEST_P(TestRpc, TestCancellation) {
   // Set up client.
   LOG(INFO) << "Connecting to " << server_addr.ToString();
   shared_ptr<Messenger> client_messenger(CreateMessenger("Client", 1, enable_ssl));
-  Proxy p(client_messenger, server_addr, GenericCalculatorService::static_service_name());
+  Proxy p(client_messenger, server_addr, server_addr.host(),
+          GenericCalculatorService::static_service_name());
 
   for (int i = OutboundCall::READY; i <= OutboundCall::FINISHED_SUCCESS; ++i) {
     FLAGS_rpc_inject_cancellation_state = i;
@@ -981,7 +1003,8 @@ TEST_P(TestRpc, TestCancellationAsync) {
   // Set up client.
   LOG(INFO) << "Connecting to " << server_addr.ToString();
   shared_ptr<Messenger> client_messenger(CreateMessenger("Client", 1, enable_ssl));
-  Proxy p(client_messenger, server_addr, GenericCalculatorService::static_service_name());
+  Proxy p(client_messenger, server_addr, server_addr.host(),
+          GenericCalculatorService::static_service_name());
 
   RpcController controller;
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/e4a2d226/be/src/kudu/rpc/rpc_stub-test.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/rpc/rpc_stub-test.cc b/be/src/kudu/rpc/rpc_stub-test.cc
index 2fe0708..ccb4fd1 100644
--- a/be/src/kudu/rpc/rpc_stub-test.cc
+++ b/be/src/kudu/rpc/rpc_stub-test.cc
@@ -60,7 +60,7 @@ class RpcStubTest : public RpcTestBase {
   }
  protected:
   void SendSimpleCall() {
-    CalculatorServiceProxy p(client_messenger_, server_addr_);
+    CalculatorServiceProxy p(client_messenger_, server_addr_, server_addr_.host());
 
     RpcController controller;
     AddRequestPB req;
@@ -84,7 +84,7 @@ TEST_F(RpcStubTest, TestSimpleCall) {
 // reads and then makes a number of calls.
 TEST_F(RpcStubTest, TestShortRecvs) {
   FLAGS_socket_inject_short_recvs = true;
-  CalculatorServiceProxy p(client_messenger_, server_addr_);
+  CalculatorServiceProxy p(client_messenger_, server_addr_, server_addr_.host());
 
   for (int i = 0; i < 100; i++) {
     NO_FATALS(SendSimpleCall());
@@ -102,7 +102,7 @@ TEST_F(RpcStubTest, TestBigCallData) {
   string data;
   data.resize(kMessageSize);
 
-  CalculatorServiceProxy p(client_messenger_, server_addr_);
+  CalculatorServiceProxy p(client_messenger_, server_addr_, server_addr_.host());
 
   EchoRequestPB req;
   req.set_data(data);
@@ -127,7 +127,7 @@ TEST_F(RpcStubTest, TestBigCallData) {
 }
 
 TEST_F(RpcStubTest, TestRespondDeferred) {
-  CalculatorServiceProxy p(client_messenger_, server_addr_);
+  CalculatorServiceProxy p(client_messenger_, server_addr_, server_addr_.host());
 
   RpcController controller;
   SleepRequestPB req;
@@ -139,7 +139,7 @@ TEST_F(RpcStubTest, TestRespondDeferred) {
 
 // Test that the default user credentials are propagated to the server.
 TEST_F(RpcStubTest, TestDefaultCredentialsPropagated) {
-  CalculatorServiceProxy p(client_messenger_, server_addr_);
+  CalculatorServiceProxy p(client_messenger_, server_addr_, server_addr_.host());
 
   string expected;
   ASSERT_OK(GetLoggedInUser(&expected));
@@ -155,7 +155,7 @@ TEST_F(RpcStubTest, TestDefaultCredentialsPropagated) {
 // Test that the user can specify other credentials.
 TEST_F(RpcStubTest, TestCustomCredentialsPropagated) {
   const char* const kFakeUserName = "some fake user";
-  CalculatorServiceProxy p(client_messenger_, server_addr_);
+  CalculatorServiceProxy p(client_messenger_, server_addr_, server_addr_.host());
 
   UserCredentials creds;
   creds.set_real_user(kFakeUserName);
@@ -172,7 +172,7 @@ TEST_F(RpcStubTest, TestCustomCredentialsPropagated) {
 TEST_F(RpcStubTest, TestAuthorization) {
   // First test calling WhoAmI() as user "alice", who is disallowed.
   {
-    CalculatorServiceProxy p(client_messenger_, server_addr_);
+    CalculatorServiceProxy p(client_messenger_, server_addr_, server_addr_.host());
     UserCredentials creds;
     creds.set_real_user("alice");
     p.set_user_credentials(creds);
@@ -189,7 +189,7 @@ TEST_F(RpcStubTest, TestAuthorization) {
 
   // Try some calls as "bob".
   {
-    CalculatorServiceProxy p(client_messenger_, server_addr_);
+    CalculatorServiceProxy p(client_messenger_, server_addr_, server_addr_.host());
     UserCredentials creds;
     creds.set_real_user("bob");
     p.set_user_credentials(creds);
@@ -217,7 +217,7 @@ TEST_F(RpcStubTest, TestAuthorization) {
 
 // Test that the user's remote address is accessible to the server.
 TEST_F(RpcStubTest, TestRemoteAddress) {
-  CalculatorServiceProxy p(client_messenger_, server_addr_);
+  CalculatorServiceProxy p(client_messenger_, server_addr_, server_addr_.host());
 
   RpcController controller;
   WhoAmIRequestPB req;
@@ -233,7 +233,8 @@ TEST_F(RpcStubTest, TestRemoteAddress) {
 // Test sending a PB parameter with a missing field, where the client
 // thinks it has sent a full PB. (eg due to version mismatch)
 TEST_F(RpcStubTest, TestCallWithInvalidParam) {
-  Proxy p(client_messenger_, server_addr_, CalculatorService::static_service_name());
+  Proxy p(client_messenger_, server_addr_, server_addr_.host(),
+          CalculatorService::static_service_name());
 
   rpc_test::AddRequestPartialPB req;
   req.set_x(rand());
@@ -258,7 +259,7 @@ static void DoIncrement(Atomic32* count) {
 // This also ensures that the async callback is only called once
 // (regression test for a previously-encountered bug).
 TEST_F(RpcStubTest, TestCallWithMissingPBFieldClientSide) {
-  CalculatorServiceProxy p(client_messenger_, server_addr_);
+  CalculatorServiceProxy p(client_messenger_, server_addr_, server_addr_.host());
 
   RpcController controller;
   AddRequestPB req;
@@ -278,7 +279,7 @@ TEST_F(RpcStubTest, TestCallWithMissingPBFieldClientSide) {
 }
 
 TEST_F(RpcStubTest, TestResponseWithMissingField) {
-  CalculatorServiceProxy p(client_messenger_, server_addr_);
+  CalculatorServiceProxy p(client_messenger_, server_addr_, server_addr_.host());
 
   RpcController rpc;
   TestInvalidResponseRequestPB req;
@@ -293,7 +294,7 @@ TEST_F(RpcStubTest, TestResponseWithMissingField) {
 // configured RPC message size. The server should send the response, but the client
 // will reject it.
 TEST_F(RpcStubTest, TestResponseLargerThanFrameSize) {
-  CalculatorServiceProxy p(client_messenger_, server_addr_);
+  CalculatorServiceProxy p(client_messenger_, server_addr_, server_addr_.host());
 
   RpcController rpc;
   TestInvalidResponseRequestPB req;
@@ -305,7 +306,8 @@ TEST_F(RpcStubTest, TestResponseLargerThanFrameSize) {
 
 // Test sending a call which isn't implemented by the server.
 TEST_F(RpcStubTest, TestCallMissingMethod) {
-  Proxy p(client_messenger_, server_addr_, CalculatorService::static_service_name());
+  Proxy p(client_messenger_, server_addr_, server_addr_.host(),
+          CalculatorService::static_service_name());
 
   Status s = DoTestSyncCall(p, "DoesNotExist");
   ASSERT_TRUE(s.IsRemoteError()) << "Bad status: " << s.ToString();
@@ -313,7 +315,7 @@ TEST_F(RpcStubTest, TestCallMissingMethod) {
 }
 
 TEST_F(RpcStubTest, TestApplicationError) {
-  CalculatorServiceProxy p(client_messenger_, server_addr_);
+  CalculatorServiceProxy p(client_messenger_, server_addr_, server_addr_.host());
 
   RpcController controller;
   SleepRequestPB req;
@@ -377,7 +379,7 @@ TEST_F(RpcStubTest, TestRpcPanic) {
     CHECK_OK(env_->DeleteRecursively(test_dir_));
 
     // Make an RPC which causes the server to abort.
-    CalculatorServiceProxy p(client_messenger_, server_addr_);
+    CalculatorServiceProxy p(client_messenger_, server_addr_, server_addr_.host());
     RpcController controller;
     PanicRequestPB req;
     PanicResponsePB resp;
@@ -395,7 +397,7 @@ struct AsyncSleep {
 };
 
 TEST_F(RpcStubTest, TestDontHandleTimedOutCalls) {
-  CalculatorServiceProxy p(client_messenger_, server_addr_);
+  CalculatorServiceProxy p(client_messenger_, server_addr_, server_addr_.host());
   vector<AsyncSleep*> sleeps;
   ElementDeleter d(&sleeps);
 
@@ -459,7 +461,8 @@ TEST_F(RpcStubTest, TestEarliestDeadlineFirstQueue) {
   for (int thread_id = 0; thread_id < num_client_threads; thread_id++) {
     threads.emplace_back([&, thread_id] {
         Random rng(thread_id);
-        CalculatorServiceProxy p(client_messenger_, server_addr_);
+        CalculatorServiceProxy p(
+            client_messenger_, server_addr_, server_addr_.host());
         while (!done.load()) {
           // Set a deadline in the future. We'll keep using this same deadline
           // on each of our retries.
@@ -517,7 +520,7 @@ TEST_F(RpcStubTest, TestEarliestDeadlineFirstQueue) {
 }
 
 TEST_F(RpcStubTest, TestDumpCallsInFlight) {
-  CalculatorServiceProxy p(client_messenger_, server_addr_);
+  CalculatorServiceProxy p(client_messenger_, server_addr_, server_addr_.host());
   AsyncSleep sleep;
   sleep.req.set_sleep_micros(100 * 1000); // 100ms
   p.SleepAsync(sleep.req, &sleep.resp, &sleep.rpc,
@@ -561,7 +564,7 @@ TEST_F(RpcStubTest, TestDumpCallsInFlight) {
 }
 
 TEST_F(RpcStubTest, TestDumpSampledCalls) {
-  CalculatorServiceProxy p(client_messenger_, server_addr_);
+  CalculatorServiceProxy p(client_messenger_, server_addr_, server_addr_.host());
 
   // Issue two calls that fall into different latency buckets.
   AsyncSleep sleeps[2];
@@ -618,7 +621,7 @@ void MyTestCallback(CountDownLatch* latch, scoped_refptr<RefCountedTest> my_refp
 // is held. This is important when the callback holds a refcounted ptr,
 // since we expect to be able to release that pointer when the call is done.
 TEST_F(RpcStubTest, TestCallbackClearedAfterRunning) {
-  CalculatorServiceProxy p(client_messenger_, server_addr_);
+  CalculatorServiceProxy p(client_messenger_, server_addr_, server_addr_.host());
 
   CountDownLatch latch(1);
   scoped_refptr<RefCountedTest> my_refptr(new RefCountedTest);
@@ -647,7 +650,7 @@ TEST_F(RpcStubTest, DontTimeOutWhenReactorIsBlocked) {
       << "This test requires only a single reactor. Otherwise the injected sleep might "
       << "be scheduled on a different reactor than the RPC call.";
 
-  CalculatorServiceProxy p(client_messenger_, server_addr_);
+  CalculatorServiceProxy p(client_messenger_, server_addr_, server_addr_.host());
 
   // Schedule a 1-second sleep on the reactor thread.
   //

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/e4a2d226/be/src/kudu/util/net/net_util-test.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/net/net_util-test.cc b/be/src/kudu/util/net/net_util-test.cc
index c77b054..106a020 100644
--- a/be/src/kudu/util/net/net_util-test.cc
+++ b/be/src/kudu/util/net/net_util-test.cc
@@ -53,6 +53,7 @@ TEST(SockaddrTest, Test) {
   Sockaddr addr;
   ASSERT_OK(addr.ParseString("1.1.1.1:12345", 12345));
   ASSERT_EQ(12345, addr.port());
+  ASSERT_EQ("1.1.1.1", addr.host());
 }
 
 TEST_F(NetUtilTest, TestParseAddresses) {

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/e4a2d226/be/src/kudu/util/net/sockaddr.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/net/sockaddr.cc b/be/src/kudu/util/net/sockaddr.cc
index ed249c7..a462c05 100644
--- a/be/src/kudu/util/net/sockaddr.cc
+++ b/be/src/kudu/util/net/sockaddr.cc
@@ -98,9 +98,7 @@ const struct sockaddr_in& Sockaddr::addr() const {
 }
 
 std::string Sockaddr::ToString() const {
-  char str[INET_ADDRSTRLEN];
-  ::inet_ntop(AF_INET, &addr_.sin_addr, str, INET_ADDRSTRLEN);
-  return StringPrintf("%s:%d", str, port());
+  return Substitute("$0:$1", host(), port());
 }
 
 bool Sockaddr::IsWildcard() const {

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/e4a2d226/be/src/kudu/util/net/sockaddr.h
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/net/sockaddr.h b/be/src/kudu/util/net/sockaddr.h
index 09777f3..01506e7 100644
--- a/be/src/kudu/util/net/sockaddr.h
+++ b/be/src/kudu/util/net/sockaddr.h
@@ -54,11 +54,14 @@ class Sockaddr {
 
   uint32_t HashCode() const;
 
+  // Returns the dotted-decimal string '1.2.3.4' of the host component of this address.
   std::string host() const;
 
   void set_port(int port);
   int port() const;
   const struct sockaddr_in& addr() const;
+
+  // Returns the stringified address in '1.2.3.4:<port>' format.
   std::string ToString() const;
 
   // Returns true if the address is 0.0.0.0



[3/9] incubator-impala git commit: rpc: move ConnectionId to its own file

Posted by ta...@apache.org.
rpc: move ConnectionId to its own file

This class was previously implemented inside of outbound_call.{cc,h}
where it didn't quite belong. In the several years since the code was
initially written we've moved more towards a "single class per header"
model unless the classes are truly trivial or really tightly coupled.
Neither is the case here.

Change-Id: I5343c2f6ad305b5927d71457d5d19e3799052ec9
Reviewed-on: http://gerrit.cloudera.org:8080/7685
Reviewed-by: Alexey Serbin <as...@cloudera.com>
Tested-by: Todd Lipcon <to...@apache.org>
Reviewed-on: http://gerrit.cloudera.org:8080/7895
Reviewed-by: Sailesh Mukil <sa...@cloudera.com>
Tested-by: Sailesh Mukil <sa...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/7d41b96b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/7d41b96b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/7d41b96b

Branch: refs/heads/master
Commit: 7d41b96b23c2a084247f38dcdf068053e46e432c
Parents: e7bd0ce
Author: Todd Lipcon <to...@apache.org>
Authored: Tue Aug 15 15:06:35 2017 -0700
Committer: Sailesh Mukil <sa...@cloudera.com>
Committed: Fri Sep 1 03:11:43 2017 +0000

----------------------------------------------------------------------
 be/src/kudu/rpc/CMakeLists.txt   |  1 +
 be/src/kudu/rpc/connection_id.cc | 85 +++++++++++++++++++++++++++++++++++
 be/src/kudu/rpc/connection_id.h  | 81 +++++++++++++++++++++++++++++++++
 be/src/kudu/rpc/outbound_call.cc | 63 +-------------------------
 be/src/kudu/rpc/outbound_call.h  | 60 ++-----------------------
 be/src/kudu/rpc/proxy.h          |  2 +-
 be/src/kudu/rpc/reactor.h        |  2 +
 be/src/kudu/rpc/rpc_context.cc   |  1 -
 8 files changed, 175 insertions(+), 120 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/7d41b96b/be/src/kudu/rpc/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/be/src/kudu/rpc/CMakeLists.txt b/be/src/kudu/rpc/CMakeLists.txt
index 643b854..f15e017 100644
--- a/be/src/kudu/rpc/CMakeLists.txt
+++ b/be/src/kudu/rpc/CMakeLists.txt
@@ -45,6 +45,7 @@ set(KRPC_SRCS
     blocking_ops.cc
     client_negotiation.cc
     connection.cc
+    connection_id.cc
     constants.cc
     inbound_call.cc
     messenger.cc

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/7d41b96b/be/src/kudu/rpc/connection_id.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/rpc/connection_id.cc b/be/src/kudu/rpc/connection_id.cc
new file mode 100644
index 0000000..e4a4dba
--- /dev/null
+++ b/be/src/kudu/rpc/connection_id.cc
@@ -0,0 +1,85 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "kudu/rpc/connection_id.h"
+
+#include <boost/functional/hash.hpp>
+
+#include "kudu/gutil/stringprintf.h"
+
+using std::string;
+
+namespace kudu {
+namespace rpc {
+
+ConnectionId::ConnectionId() {}
+
+ConnectionId::ConnectionId(const ConnectionId& other) {
+  DoCopyFrom(other);
+}
+
+ConnectionId::ConnectionId(const Sockaddr& remote, UserCredentials user_credentials) {
+  remote_ = remote;
+  user_credentials_ = std::move(user_credentials);
+}
+
+void ConnectionId::set_remote(const Sockaddr& remote) {
+  remote_ = remote;
+}
+
+void ConnectionId::set_user_credentials(UserCredentials user_credentials) {
+  user_credentials_ = std::move(user_credentials);
+}
+
+void ConnectionId::CopyFrom(const ConnectionId& other) {
+  DoCopyFrom(other);
+}
+
+string ConnectionId::ToString() const {
+  // Does not print the password.
+  return StringPrintf("{remote=%s, user_credentials=%s}",
+      remote_.ToString().c_str(),
+      user_credentials_.ToString().c_str());
+}
+
+void ConnectionId::DoCopyFrom(const ConnectionId& other) {
+  remote_ = other.remote_;
+  user_credentials_ = other.user_credentials_;
+}
+
+size_t ConnectionId::HashCode() const {
+  size_t seed = 0;
+  boost::hash_combine(seed, remote_.HashCode());
+  boost::hash_combine(seed, user_credentials_.HashCode());
+  return seed;
+}
+
+bool ConnectionId::Equals(const ConnectionId& other) const {
+  return (remote() == other.remote()
+       && user_credentials().Equals(other.user_credentials()));
+}
+
+size_t ConnectionIdHash::operator() (const ConnectionId& conn_id) const {
+  return conn_id.HashCode();
+}
+
+bool ConnectionIdEqual::operator() (const ConnectionId& cid1, const ConnectionId& cid2) const {
+  return cid1.Equals(cid2);
+}
+
+} // namespace rpc
+} // namespace kudu

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/7d41b96b/be/src/kudu/rpc/connection_id.h
----------------------------------------------------------------------
diff --git a/be/src/kudu/rpc/connection_id.h b/be/src/kudu/rpc/connection_id.h
new file mode 100644
index 0000000..ae34b29
--- /dev/null
+++ b/be/src/kudu/rpc/connection_id.h
@@ -0,0 +1,81 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#pragma once
+
+#include <string>
+
+#include "kudu/rpc/user_credentials.h"
+#include "kudu/util/net/sockaddr.h"
+
+namespace kudu {
+namespace rpc {
+
+// Used to key on Connection information.
+// For use as a key in an unordered STL collection, use ConnectionIdHash and ConnectionIdEqual.
+// This class is copyable for STL compatibility, but not assignable (use CopyFrom() for that).
+class ConnectionId {
+ public:
+  ConnectionId();
+
+  // Copy constructor required for use with STL unordered_map.
+  ConnectionId(const ConnectionId& other);
+
+  // Convenience constructor.
+  ConnectionId(const Sockaddr& remote, UserCredentials user_credentials);
+
+  // The remote address.
+  void set_remote(const Sockaddr& remote);
+  const Sockaddr& remote() const { return remote_; }
+
+  // The credentials of the user associated with this connection, if any.
+  void set_user_credentials(UserCredentials user_credentials);
+  const UserCredentials& user_credentials() const { return user_credentials_; }
+  UserCredentials* mutable_user_credentials() { return &user_credentials_; }
+
+  // Copy state from another object to this one.
+  void CopyFrom(const ConnectionId& other);
+
+  // Returns a string representation of the object, not including the password field.
+  std::string ToString() const;
+
+  size_t HashCode() const;
+  bool Equals(const ConnectionId& other) const;
+
+ private:
+  // Remember to update HashCode() and Equals() when new fields are added.
+  Sockaddr remote_;
+  UserCredentials user_credentials_;
+
+  // Implementation of CopyFrom that can be shared with copy constructor.
+  void DoCopyFrom(const ConnectionId& other);
+
+  // Disable assignment operator.
+  void operator=(const ConnectionId&);
+};
+
+class ConnectionIdHash {
+ public:
+  std::size_t operator() (const ConnectionId& conn_id) const;
+};
+
+class ConnectionIdEqual {
+ public:
+  bool operator() (const ConnectionId& cid1, const ConnectionId& cid2) const;
+};
+
+} // namespace rpc
+} // namespace kudu

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/7d41b96b/be/src/kudu/rpc/outbound_call.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/rpc/outbound_call.cc b/be/src/kudu/rpc/outbound_call.cc
index bcc39c3..8e248d2 100644
--- a/be/src/kudu/rpc/outbound_call.cc
+++ b/be/src/kudu/rpc/outbound_call.cc
@@ -16,7 +16,6 @@
 // under the License.
 
 #include <algorithm>
-#include <boost/functional/hash.hpp>
 #include <gflags/gflags.h>
 #include <memory>
 #include <mutex>
@@ -24,10 +23,11 @@
 #include <unordered_set>
 #include <vector>
 
+#include "kudu/gutil/stringprintf.h"
 #include "kudu/gutil/strings/substitute.h"
 #include "kudu/gutil/walltime.h"
-#include "kudu/rpc/outbound_call.h"
 #include "kudu/rpc/constants.h"
+#include "kudu/rpc/outbound_call.h"
 #include "kudu/rpc/rpc_controller.h"
 #include "kudu/rpc/rpc_introspection.pb.h"
 #include "kudu/rpc/rpc_sidecar.h"
@@ -481,65 +481,6 @@ void OutboundCall::DumpPB(const DumpRunningRpcsRequestPB& req,
 }
 
 ///
-/// ConnectionId
-///
-
-ConnectionId::ConnectionId() {}
-
-ConnectionId::ConnectionId(const ConnectionId& other) {
-  DoCopyFrom(other);
-}
-
-ConnectionId::ConnectionId(const Sockaddr& remote, UserCredentials user_credentials) {
-  remote_ = remote;
-  user_credentials_ = std::move(user_credentials);
-}
-
-void ConnectionId::set_remote(const Sockaddr& remote) {
-  remote_ = remote;
-}
-
-void ConnectionId::set_user_credentials(UserCredentials user_credentials) {
-  user_credentials_ = std::move(user_credentials);
-}
-
-void ConnectionId::CopyFrom(const ConnectionId& other) {
-  DoCopyFrom(other);
-}
-
-string ConnectionId::ToString() const {
-  // Does not print the password.
-  return StringPrintf("{remote=%s, user_credentials=%s}",
-      remote_.ToString().c_str(),
-      user_credentials_.ToString().c_str());
-}
-
-void ConnectionId::DoCopyFrom(const ConnectionId& other) {
-  remote_ = other.remote_;
-  user_credentials_ = other.user_credentials_;
-}
-
-size_t ConnectionId::HashCode() const {
-  size_t seed = 0;
-  boost::hash_combine(seed, remote_.HashCode());
-  boost::hash_combine(seed, user_credentials_.HashCode());
-  return seed;
-}
-
-bool ConnectionId::Equals(const ConnectionId& other) const {
-  return (remote() == other.remote()
-       && user_credentials().Equals(other.user_credentials()));
-}
-
-size_t ConnectionIdHash::operator() (const ConnectionId& conn_id) const {
-  return conn_id.HashCode();
-}
-
-bool ConnectionIdEqual::operator() (const ConnectionId& cid1, const ConnectionId& cid2) const {
-  return cid1.Equals(cid2);
-}
-
-///
 /// CallResponse
 ///
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/7d41b96b/be/src/kudu/rpc/outbound_call.h
----------------------------------------------------------------------
diff --git a/be/src/kudu/rpc/outbound_call.h b/be/src/kudu/rpc/outbound_call.h
index 221c368..0ab1553 100644
--- a/be/src/kudu/rpc/outbound_call.h
+++ b/be/src/kudu/rpc/outbound_call.h
@@ -25,16 +25,15 @@
 
 #include "kudu/gutil/gscoped_ptr.h"
 #include "kudu/gutil/macros.h"
+#include "kudu/rpc/connection_id.h"
 #include "kudu/rpc/constants.h"
-#include "kudu/rpc/rpc_header.pb.h"
-#include "kudu/rpc/rpc_sidecar.h"
 #include "kudu/rpc/remote_method.h"
 #include "kudu/rpc/response_callback.h"
+#include "kudu/rpc/rpc_header.pb.h"
+#include "kudu/rpc/rpc_sidecar.h"
 #include "kudu/rpc/transfer.h"
-#include "kudu/rpc/user_credentials.h"
 #include "kudu/util/locks.h"
 #include "kudu/util/monotime.h"
-#include "kudu/util/net/sockaddr.h"
 #include "kudu/util/slice.h"
 #include "kudu/util/status.h"
 
@@ -57,59 +56,6 @@ class RpcCallInProgressPB;
 class RpcController;
 class RpcSidecar;
 
-// Used to key on Connection information.
-// For use as a key in an unordered STL collection, use ConnectionIdHash and ConnectionIdEqual.
-// This class is copyable for STL compatibility, but not assignable (use CopyFrom() for that).
-class ConnectionId {
- public:
-  ConnectionId();
-
-  // Copy constructor required for use with STL unordered_map.
-  ConnectionId(const ConnectionId& other);
-
-  // Convenience constructor.
-  ConnectionId(const Sockaddr& remote, UserCredentials user_credentials);
-
-  // The remote address.
-  void set_remote(const Sockaddr& remote);
-  const Sockaddr& remote() const { return remote_; }
-
-  // The credentials of the user associated with this connection, if any.
-  void set_user_credentials(UserCredentials user_credentials);
-  const UserCredentials& user_credentials() const { return user_credentials_; }
-  UserCredentials* mutable_user_credentials() { return &user_credentials_; }
-
-  // Copy state from another object to this one.
-  void CopyFrom(const ConnectionId& other);
-
-  // Returns a string representation of the object, not including the password field.
-  std::string ToString() const;
-
-  size_t HashCode() const;
-  bool Equals(const ConnectionId& other) const;
-
- private:
-  // Remember to update HashCode() and Equals() when new fields are added.
-  Sockaddr remote_;
-  UserCredentials user_credentials_;
-
-  // Implementation of CopyFrom that can be shared with copy constructor.
-  void DoCopyFrom(const ConnectionId& other);
-
-  // Disable assignment operator.
-  void operator=(const ConnectionId&);
-};
-
-class ConnectionIdHash {
- public:
-  std::size_t operator() (const ConnectionId& conn_id) const;
-};
-
-class ConnectionIdEqual {
- public:
-  bool operator() (const ConnectionId& cid1, const ConnectionId& cid2) const;
-};
-
 // Tracks the status of a call on the client side.
 //
 // This is an internal-facing class -- clients interact with the

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/7d41b96b/be/src/kudu/rpc/proxy.h
----------------------------------------------------------------------
diff --git a/be/src/kudu/rpc/proxy.h b/be/src/kudu/rpc/proxy.h
index ddbbe60..6e43e93 100644
--- a/be/src/kudu/rpc/proxy.h
+++ b/be/src/kudu/rpc/proxy.h
@@ -22,7 +22,7 @@
 #include <string>
 
 #include "kudu/gutil/atomicops.h"
-#include "kudu/rpc/outbound_call.h"
+#include "kudu/rpc/connection_id.h"
 #include "kudu/rpc/response_callback.h"
 #include "kudu/rpc/rpc_controller.h"
 #include "kudu/rpc/rpc_header.pb.h"

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/7d41b96b/be/src/kudu/rpc/reactor.h
----------------------------------------------------------------------
diff --git a/be/src/kudu/rpc/reactor.h b/be/src/kudu/rpc/reactor.h
index dcdc8a2..2c1bfc8 100644
--- a/be/src/kudu/rpc/reactor.h
+++ b/be/src/kudu/rpc/reactor.h
@@ -32,6 +32,8 @@
 
 #include "kudu/gutil/ref_counted.h"
 #include "kudu/rpc/connection.h"
+#include "kudu/rpc/connection_id.h"
+#include "kudu/rpc/messenger.h"
 #include "kudu/rpc/transfer.h"
 #include "kudu/util/thread.h"
 #include "kudu/util/locks.h"

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/7d41b96b/be/src/kudu/rpc/rpc_context.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/rpc/rpc_context.cc b/be/src/kudu/rpc/rpc_context.cc
index 06fd8c5..d6d872e 100644
--- a/be/src/kudu/rpc/rpc_context.cc
+++ b/be/src/kudu/rpc/rpc_context.cc
@@ -21,7 +21,6 @@
 #include <ostream>
 #include <sstream>
 
-#include "kudu/rpc/outbound_call.h"
 #include "kudu/rpc/inbound_call.h"
 #include "kudu/rpc/remote_user.h"
 #include "kudu/rpc/result_tracker.h"


[9/9] incubator-impala git commit: IMPALA-5891: fix PeriodicCounterUpdater initialization

Posted by ta...@apache.org.
IMPALA-5891: fix PeriodicCounterUpdater initialization

Avoid running static destructors and constructors to avoid the potential
for startup and teardown races and hard-to-understand behaviour.

Testing:
Ran core tests.

Change-Id: Ieede9fa194605fb53033033959110f3ef12f18c3
Reviewed-on: http://gerrit.cloudera.org:8080/7942
Reviewed-by: Matthew Jacobs <mj...@cloudera.com>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/796db0fc
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/796db0fc
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/796db0fc

Branch: refs/heads/master
Commit: 796db0fce374c9aff0ec6e1e82862db012b8854a
Parents: ac8a72f
Author: Tim Armstrong <ta...@cloudera.com>
Authored: Fri Sep 1 14:36:23 2017 -0700
Committer: Impala Public Jenkins <im...@gerrit.cloudera.org>
Committed: Tue Sep 5 21:35:20 2017 +0000

----------------------------------------------------------------------
 be/src/common/init.cc                   |  3 ++
 be/src/util/periodic-counter-updater.cc | 58 ++++++++++++++--------------
 be/src/util/periodic-counter-updater.h  | 10 ++---
 3 files changed, 38 insertions(+), 33 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/796db0fc/be/src/common/init.cc
----------------------------------------------------------------------
diff --git a/be/src/common/init.cc b/be/src/common/init.cc
index 6b48bb8..f2df173 100644
--- a/be/src/common/init.cc
+++ b/be/src/common/init.cc
@@ -45,6 +45,7 @@
 #include "util/network-util.h"
 #include "util/openssl-util.h"
 #include "util/os-info.h"
+#include "util/periodic-counter-updater.h"
 #include "util/pretty-printer.h"
 #include "util/redactor.h"
 #include "util/test-info.h"
@@ -209,6 +210,8 @@ void impala::InitCommonRuntime(int argc, char** argv, bool init_jvm,
 
   pause_monitor.reset(new Thread("common", "pause-monitor", &PauseMonitorLoop));
 
+  PeriodicCounterUpdater::Init();
+
   LOG(INFO) << impala::GetVersionString();
   LOG(INFO) << "Using hostname: " << FLAGS_hostname;
   impala::LogCommandLineFlags();

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/796db0fc/be/src/util/periodic-counter-updater.cc
----------------------------------------------------------------------
diff --git a/be/src/util/periodic-counter-updater.cc b/be/src/util/periodic-counter-updater.cc
index 0a8a8f7..b1c755a 100644
--- a/be/src/util/periodic-counter-updater.cc
+++ b/be/src/util/periodic-counter-updater.cc
@@ -32,12 +32,14 @@ namespace impala {
 DEFINE_int32(periodic_counter_update_period_ms, 500, "Period to update rate counters and"
     " sampling counters in ms");
 
-PeriodicCounterUpdater PeriodicCounterUpdater::state_;
-
-PeriodicCounterUpdater::PeriodicCounterUpdater() {
-  DCHECK_EQ(this, &state_);
-  state_.update_thread_.reset(
-      new thread(&PeriodicCounterUpdater::UpdateLoop, this));
+PeriodicCounterUpdater* PeriodicCounterUpdater::instance_ = nullptr;
+
+void PeriodicCounterUpdater::Init() {
+  DCHECK(instance_ == nullptr);
+  // Create the singleton, which will live until the process terminates.
+  instance_ = new PeriodicCounterUpdater;
+  instance_->update_thread_.reset(
+      new thread(&PeriodicCounterUpdater::UpdateLoop, instance_));
 }
 
 void PeriodicCounterUpdater::RegisterPeriodicCounter(
@@ -52,8 +54,8 @@ void PeriodicCounterUpdater::RegisterPeriodicCounter(
       counter.src_counter = src_counter;
       counter.sample_fn = sample_fn;
       counter.elapsed_ms = 0;
-      lock_guard<SpinLock> ratelock(state_.rate_lock_);
-      state_.rate_counters_[dst_counter] = counter;
+      lock_guard<SpinLock> ratelock(instance_->rate_lock_);
+      instance_->rate_counters_[dst_counter] = counter;
       break;
     }
     case SAMPLING_COUNTER: {
@@ -62,8 +64,8 @@ void PeriodicCounterUpdater::RegisterPeriodicCounter(
       counter.sample_fn = sample_fn;
       counter.num_sampled = 0;
       counter.total_sampled_value = 0;
-      lock_guard<SpinLock> samplinglock(state_.sampling_lock_);
-      state_.sampling_counters_[dst_counter] = counter;
+      lock_guard<SpinLock> samplinglock(instance_->sampling_lock_);
+      instance_->sampling_counters_[dst_counter] = counter;
       break;
     }
     default:
@@ -72,13 +74,13 @@ void PeriodicCounterUpdater::RegisterPeriodicCounter(
 }
 
 void PeriodicCounterUpdater::StopRateCounter(RuntimeProfile::Counter* counter) {
-  lock_guard<SpinLock> ratelock(state_.rate_lock_);
-  state_.rate_counters_.erase(counter);
+  lock_guard<SpinLock> ratelock(instance_->rate_lock_);
+  instance_->rate_counters_.erase(counter);
 }
 
 void PeriodicCounterUpdater::StopSamplingCounter(RuntimeProfile::Counter* counter) {
-  lock_guard<SpinLock> samplinglock(state_.sampling_lock_);
-  state_.sampling_counters_.erase(counter);
+  lock_guard<SpinLock> samplinglock(instance_->sampling_lock_);
+  instance_->sampling_counters_.erase(counter);
 }
 
 void PeriodicCounterUpdater::RegisterBucketingCounters(
@@ -86,20 +88,20 @@ void PeriodicCounterUpdater::RegisterBucketingCounters(
   BucketCountersInfo info;
   info.src_counter = src_counter;
   info.num_sampled = 0;
-  lock_guard<SpinLock> bucketinglock(state_.bucketing_lock_);
-  state_.bucketing_counters_[buckets] = info;
+  lock_guard<SpinLock> bucketinglock(instance_->bucketing_lock_);
+  instance_->bucketing_counters_[buckets] = info;
 }
 
 void PeriodicCounterUpdater::StopBucketingCounters(
     vector<RuntimeProfile::Counter*>* buckets, bool convert) {
   int64_t num_sampled = 0;
   {
-    lock_guard<SpinLock> bucketinglock(state_.bucketing_lock_);
+    lock_guard<SpinLock> bucketinglock(instance_->bucketing_lock_);
     BucketCountersMap::iterator itr =
-        state_.bucketing_counters_.find(buckets);
-    if (itr != state_.bucketing_counters_.end()) {
+        instance_->bucketing_counters_.find(buckets);
+    if (itr != instance_->bucketing_counters_.end()) {
       num_sampled = itr->second.num_sampled;
-      state_.bucketing_counters_.erase(itr);
+      instance_->bucketing_counters_.erase(itr);
     }
   }
 
@@ -114,14 +116,14 @@ void PeriodicCounterUpdater::StopBucketingCounters(
 
 void PeriodicCounterUpdater::RegisterTimeSeriesCounter(
     RuntimeProfile::TimeSeriesCounter* counter) {
-  lock_guard<SpinLock> timeserieslock(state_.time_series_lock_);
-  state_.time_series_counters_.insert(counter);
+  lock_guard<SpinLock> timeserieslock(instance_->time_series_lock_);
+  instance_->time_series_counters_.insert(counter);
 }
 
 void PeriodicCounterUpdater::StopTimeSeriesCounter(
     RuntimeProfile::TimeSeriesCounter* counter) {
-  lock_guard<SpinLock> timeserieslock(state_.time_series_lock_);
-  state_.time_series_counters_.erase(counter);
+  lock_guard<SpinLock> timeserieslock(instance_->time_series_lock_);
+  instance_->time_series_counters_.erase(counter);
 }
 
 void PeriodicCounterUpdater::UpdateLoop() {
@@ -132,7 +134,7 @@ void PeriodicCounterUpdater::UpdateLoop() {
     int elapsed_ms = elapsed.total_milliseconds();
 
     {
-      lock_guard<SpinLock> ratelock(state_.rate_lock_);
+      lock_guard<SpinLock> ratelock(instance_->rate_lock_);
       for (RateCounterMap::iterator it = rate_counters_.begin();
            it != rate_counters_.end(); ++it) {
         it->second.elapsed_ms += elapsed_ms;
@@ -149,7 +151,7 @@ void PeriodicCounterUpdater::UpdateLoop() {
     }
 
     {
-      lock_guard<SpinLock> samplinglock(state_.sampling_lock_);
+      lock_guard<SpinLock> samplinglock(instance_->sampling_lock_);
       for (SamplingCounterMap::iterator it = sampling_counters_.begin();
            it != sampling_counters_.end(); ++it) {
         ++it->second.num_sampled;
@@ -168,7 +170,7 @@ void PeriodicCounterUpdater::UpdateLoop() {
     }
 
     {
-      lock_guard<SpinLock> bucketinglock(state_.bucketing_lock_);
+      lock_guard<SpinLock> bucketinglock(instance_->bucketing_lock_);
       for (BucketCountersMap::iterator it = bucketing_counters_.begin();
            it != bucketing_counters_.end(); ++it) {
         int64_t val = it->second.src_counter->value();
@@ -179,7 +181,7 @@ void PeriodicCounterUpdater::UpdateLoop() {
     }
 
     {
-      lock_guard<SpinLock> timeserieslock(state_.time_series_lock_);
+      lock_guard<SpinLock> timeserieslock(instance_->time_series_lock_);
       for (TimeSeriesCounters::iterator it = time_series_counters_.begin();
            it != time_series_counters_.end(); ++it) {
         (*it)->AddSample(elapsed_ms);

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/796db0fc/be/src/util/periodic-counter-updater.h
----------------------------------------------------------------------
diff --git a/be/src/util/periodic-counter-updater.h b/be/src/util/periodic-counter-updater.h
index 893b887..c603522 100644
--- a/be/src/util/periodic-counter-updater.h
+++ b/be/src/util/periodic-counter-updater.h
@@ -44,6 +44,10 @@ class PeriodicCounterUpdater {
     SAMPLING_COUNTER,
   };
 
+  // Sets up data structures and starts the counter update thread. Should only be called
+  // once during process startup and must be called before other methods.
+  static void Init();
+
   /// Registers a periodic counter to be updated by the update thread.
   /// Either sample_fn or dst_counter must be non-NULL.  When the periodic counter
   /// is updated, it either gets the value from the dst_counter or calls the sample
@@ -96,10 +100,6 @@ class PeriodicCounterUpdater {
     /// TODO: customize bucketing
   };
 
-  // Starts the counter update thread. We only have a single static object, so this
-  // is executed automatically when the process starts up.
-  PeriodicCounterUpdater();
-
   /// Loop for periodic counter update thread.  This thread wakes up once in a while
   /// and updates all the added rate counters and sampling counters.
   [[noreturn]] void UpdateLoop();
@@ -140,7 +140,7 @@ class PeriodicCounterUpdater {
 
   /// Singleton object that keeps track of all rate counters and the thread
   /// for updating them.
-  static PeriodicCounterUpdater state_;
+  static PeriodicCounterUpdater* instance_;
 };
 
 }