You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by ta...@apache.org on 2018/07/19 14:53:03 UTC

[3/4] impala git commit: IMPALA-7298: Stop passing IP address as hostname in Kerberos principal

IMPALA-7298: Stop passing IP address as hostname in Kerberos principal

Previously, we passed the resolved IP address of a KRPC destination
host as the hostname when creating a proxy for making KRPC calls.
This may lead to connection negotiation failure in KRPC when Kerberos
is enabled. In particular, if reverse DNS isn't enabled in Kerberos,
the KDC may fail to look up the principal of the destination host if
that principal is registered with the hostname rather than the
resolved IP address.

This change fixes the problem above by passing the actual hostname
of the destination host when calling RpcMgr::GetProxy().

rpc-mgr-kerberized-test.cc is also updated to use the hostname
instead of the resolved IP address in the Kerberos principal.

Change-Id: I3e3e978746cf03766eee151835aad5877d9ed63e
Reviewed-on: http://gerrit.cloudera.org:8080/10980
Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
Tested-by: Impala Public Jenkins <im...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/c7d2c2ec
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/c7d2c2ec
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/c7d2c2ec

Branch: refs/heads/master
Commit: c7d2c2ec73a5e2073de22e10742faa0553c5018d
Parents: 940d536
Author: Michael Ho <kw...@cloudera.com>
Authored: Tue Jul 17 16:46:55 2018 -0700
Committer: Impala Public Jenkins <im...@cloudera.com>
Committed: Thu Jul 19 05:00:10 2018 +0000

----------------------------------------------------------------------
 be/src/rpc/rpc-mgr-kerberized-test.cc      |  2 +-
 be/src/rpc/rpc-mgr-test-base.h             |  6 ++++--
 be/src/rpc/rpc-mgr-test.cc                 |  5 +++--
 be/src/rpc/rpc-mgr.h                       | 10 +++++-----
 be/src/rpc/rpc-mgr.inline.h                |  5 +++--
 be/src/runtime/data-stream-sender.cc       |  2 +-
 be/src/runtime/data-stream-test.cc         |  6 +++---
 be/src/runtime/krpc-data-stream-sender.cc  | 14 ++++++++------
 be/src/scheduling/scheduler.cc             |  4 ++--
 common/thrift/ImpalaInternalService.thrift |  6 +++---
 10 files changed, 33 insertions(+), 27 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/c7d2c2ec/be/src/rpc/rpc-mgr-kerberized-test.cc
----------------------------------------------------------------------
diff --git a/be/src/rpc/rpc-mgr-kerberized-test.cc b/be/src/rpc/rpc-mgr-kerberized-test.cc
index 6fdb1de..0f1f3bb 100644
--- a/be/src/rpc/rpc-mgr-kerberized-test.cc
+++ b/be/src/rpc/rpc-mgr-kerberized-test.cc
@@ -89,7 +89,7 @@ int main(int argc, char** argv) {
   impala::IpAddr ip;
   impala::Status status = impala::HostnameToIpAddr(FLAGS_hostname, &ip);
   DCHECK(status.ok());
-  kdc_principal = Substitute("impala-test/$0", ip);
+  kdc_principal = Substitute("impala-test/$0", FLAGS_hostname);
   kdc_realm = "KRBTEST.COM";
 
   int port = impala::FindUnusedEphemeralPort();

http://git-wip-us.apache.org/repos/asf/impala/blob/c7d2c2ec/be/src/rpc/rpc-mgr-test-base.h
----------------------------------------------------------------------
diff --git a/be/src/rpc/rpc-mgr-test-base.h b/be/src/rpc/rpc-mgr-test-base.h
index bbc92a0..258bd4b 100644
--- a/be/src/rpc/rpc-mgr-test-base.h
+++ b/be/src/rpc/rpc-mgr-test-base.h
@@ -246,10 +246,12 @@ Status RunMultipleServicesTestTemplate(RpcMgrTestBase<T>* test_base,
   RETURN_IF_ERROR(rpc_mgr->StartServices(krpc_address));
 
   unique_ptr<PingServiceProxy> ping_proxy;
-  RETURN_IF_ERROR(rpc_mgr->GetProxy<PingServiceProxy>(krpc_address, &ping_proxy));
+  RETURN_IF_ERROR(rpc_mgr->GetProxy<PingServiceProxy>(krpc_address, FLAGS_hostname,
+      &ping_proxy));
 
   unique_ptr<ScanMemServiceProxy> scan_mem_proxy;
-  RETURN_IF_ERROR(rpc_mgr->GetProxy<ScanMemServiceProxy>(krpc_address, &scan_mem_proxy));
+  RETURN_IF_ERROR(rpc_mgr->GetProxy<ScanMemServiceProxy>(krpc_address, FLAGS_hostname,
+      &scan_mem_proxy));
 
   RpcController controller;
   srand(0);

http://git-wip-us.apache.org/repos/asf/impala/blob/c7d2c2ec/be/src/rpc/rpc-mgr-test.cc
----------------------------------------------------------------------
diff --git a/be/src/rpc/rpc-mgr-test.cc b/be/src/rpc/rpc-mgr-test.cc
index 1e11099..6a774bd 100644
--- a/be/src/rpc/rpc-mgr-test.cc
+++ b/be/src/rpc/rpc-mgr-test.cc
@@ -191,7 +191,7 @@ TEST_F(RpcMgrTest, SlowCallback) {
   ASSERT_OK(rpc_mgr_.StartServices(krpc_address_));
 
   unique_ptr<PingServiceProxy> proxy;
-  ASSERT_OK(rpc_mgr_.GetProxy<PingServiceProxy>(krpc_address_, &proxy));
+  ASSERT_OK(rpc_mgr_.GetProxy<PingServiceProxy>(krpc_address_, FLAGS_hostname, &proxy));
 
   PingRequestPB request;
   PingResponsePB response;
@@ -211,7 +211,8 @@ TEST_F(RpcMgrTest, AsyncCall) {
       static_cast<ScanMemServiceImpl*>(scan_mem_impl)->mem_tracker()));
 
   unique_ptr<ScanMemServiceProxy> scan_mem_proxy;
-  ASSERT_OK(rpc_mgr_.GetProxy<ScanMemServiceProxy>(krpc_address_, &scan_mem_proxy));
+  ASSERT_OK(rpc_mgr_.GetProxy<ScanMemServiceProxy>(krpc_address_, FLAGS_hostname,
+      &scan_mem_proxy));
 
   FLAGS_num_acceptor_threads = 2;
   FLAGS_num_reactor_threads = 10;

http://git-wip-us.apache.org/repos/asf/impala/blob/c7d2c2ec/be/src/rpc/rpc-mgr.h
----------------------------------------------------------------------
diff --git a/be/src/rpc/rpc-mgr.h b/be/src/rpc/rpc-mgr.h
index c7107d2..c25f754 100644
--- a/be/src/rpc/rpc-mgr.h
+++ b/be/src/rpc/rpc-mgr.h
@@ -133,12 +133,12 @@ class RpcMgr {
       kudu::rpc::GeneratedServiceIf* service_ptr, MemTracker* service_mem_tracker)
       WARN_UNUSED_RESULT;
 
-  /// Creates a new proxy for a remote service of type P at location 'address', and places
-  /// it in 'proxy'. 'P' must descend from kudu::rpc::ServiceIf. Note that 'address' must
-  /// be a resolved IP address.
+  /// Creates a new proxy for a remote service of type P at location 'address' with
+  /// hostname 'hostname' and places it in 'proxy'. 'P' must descend from
+  /// kudu::rpc::ServiceIf. Note that 'address' must be a resolved IP address.
   template <typename P>
-  Status GetProxy(const TNetworkAddress& address, std::unique_ptr<P>* proxy)
-      WARN_UNUSED_RESULT;
+  Status GetProxy(const TNetworkAddress& address, const std::string& hostname,
+      std::unique_ptr<P>* proxy) WARN_UNUSED_RESULT;
 
   /// Shut down all previously registered services. All service pools are shut down.
   /// All acceptor and reactor threads within the messenger are also shut down.

http://git-wip-us.apache.org/repos/asf/impala/blob/c7d2c2ec/be/src/rpc/rpc-mgr.inline.h
----------------------------------------------------------------------
diff --git a/be/src/rpc/rpc-mgr.inline.h b/be/src/rpc/rpc-mgr.inline.h
index 474ac45..934b28b 100644
--- a/be/src/rpc/rpc-mgr.inline.h
+++ b/be/src/rpc/rpc-mgr.inline.h
@@ -30,13 +30,14 @@ namespace impala {
 
 /// Always inline to avoid having to provide a definition for each use type P.
 template <typename P>
-Status RpcMgr::GetProxy(const TNetworkAddress& address, std::unique_ptr<P>* proxy) {
+Status RpcMgr::GetProxy(const TNetworkAddress& address, const std::string& hostname,
+    std::unique_ptr<P>* proxy) {
   DCHECK(proxy != nullptr);
   DCHECK(is_inited()) << "Must call Init() before GetProxy()";
   DCHECK(IsResolvedAddress(address));
   kudu::Sockaddr sockaddr;
   RETURN_IF_ERROR(TNetworkAddressToSockaddr(address, &sockaddr));
-  proxy->reset(new P(messenger_, sockaddr, address.hostname));
+  proxy->reset(new P(messenger_, sockaddr, hostname));
   return Status::OK();
 }
 

http://git-wip-us.apache.org/repos/asf/impala/blob/c7d2c2ec/be/src/runtime/data-stream-sender.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/data-stream-sender.cc b/be/src/runtime/data-stream-sender.cc
index a37c3c1..ad500e9 100644
--- a/be/src/runtime/data-stream-sender.cc
+++ b/be/src/runtime/data-stream-sender.cc
@@ -353,7 +353,7 @@ DataStreamSender::DataStreamSender(int sender_id, const RowDescriptor* row_desc,
   // TODO: use something like google3's linked_ptr here (scoped_ptr isn't copyable)
   for (int i = 0; i < destinations.size(); ++i) {
     channels_.push_back(
-        new Channel(this, row_desc, destinations[i].server,
+        new Channel(this, row_desc, destinations[i].thrift_backend,
             destinations[i].fragment_instance_id, sink.dest_node_id,
             per_channel_buffer_size));
   }

http://git-wip-us.apache.org/repos/asf/impala/blob/c7d2c2ec/be/src/runtime/data-stream-test.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/data-stream-test.cc b/be/src/runtime/data-stream-test.cc
index 8851d1a..9f866ba 100644
--- a/be/src/runtime/data-stream-test.cc
+++ b/be/src/runtime/data-stream-test.cc
@@ -358,10 +358,10 @@ class DataStreamTest : public DataStreamTestBase<testing::TestWithParam<KrpcSwit
     dest_.push_back(TPlanFragmentDestination());
     TPlanFragmentDestination& dest = dest_.back();
     dest.fragment_instance_id = next_instance_id_;
-    dest.server.hostname = "127.0.0.1";
-    dest.server.port = FLAGS_port;
+    dest.thrift_backend.hostname = "localhost";
+    dest.thrift_backend.port = FLAGS_port;
     if (GetParam() == USE_KRPC) {
-      dest.__set_krpc_server(krpc_address_);
+      dest.__set_krpc_backend(krpc_address_);
     }
     *instance_id = next_instance_id_;
     ++next_instance_id_.lo;

http://git-wip-us.apache.org/repos/asf/impala/blob/c7d2c2ec/be/src/runtime/krpc-data-stream-sender.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/krpc-data-stream-sender.cc b/be/src/runtime/krpc-data-stream-sender.cc
index b362758..e6cea1f 100644
--- a/be/src/runtime/krpc-data-stream-sender.cc
+++ b/be/src/runtime/krpc-data-stream-sender.cc
@@ -105,10 +105,11 @@ class KrpcDataStreamSender::Channel : public CacheLineAligned {
   // data is getting accumulated before being sent; it only applies when data is added via
   // AddRow() and not sent directly via SendBatch().
   Channel(KrpcDataStreamSender* parent, const RowDescriptor* row_desc,
-      const TNetworkAddress& destination, const TUniqueId& fragment_instance_id,
-      PlanNodeId dest_node_id, int buffer_size)
+      const std::string& hostname, const TNetworkAddress& destination,
+      const TUniqueId& fragment_instance_id, PlanNodeId dest_node_id, int buffer_size)
     : parent_(parent),
       row_desc_(row_desc),
+      hostname_(hostname),
       address_(destination),
       fragment_instance_id_(fragment_instance_id),
       dest_node_id_(dest_node_id) {
@@ -160,6 +161,7 @@ class KrpcDataStreamSender::Channel : public CacheLineAligned {
   const RowDescriptor* row_desc_;
 
   // The triplet of IP-address:port/finst-id/node-id uniquely identifies the receiver.
+  const std::string hostname_;
   const TNetworkAddress address_;
   const TUniqueId fragment_instance_id_;
   const PlanNodeId dest_node_id_;
@@ -305,7 +307,7 @@ Status KrpcDataStreamSender::Channel::Init(RuntimeState* state) {
 
   // Create a DataStreamService proxy to the destination.
   RpcMgr* rpc_mgr = ExecEnv::GetInstance()->rpc_mgr();
-  RETURN_IF_ERROR(rpc_mgr->GetProxy(address_, &proxy_));
+  RETURN_IF_ERROR(rpc_mgr->GetProxy(address_, hostname_, &proxy_));
   return Status::OK();
 }
 
@@ -593,9 +595,9 @@ KrpcDataStreamSender::KrpcDataStreamSender(int sender_id, const RowDescriptor* r
 
   for (int i = 0; i < destinations.size(); ++i) {
     channels_.push_back(
-        new Channel(this, row_desc, destinations[i].krpc_server,
-            destinations[i].fragment_instance_id, sink.dest_node_id,
-            per_channel_buffer_size));
+        new Channel(this, row_desc, destinations[i].thrift_backend.hostname,
+            destinations[i].krpc_backend, destinations[i].fragment_instance_id,
+            sink.dest_node_id, per_channel_buffer_size));
   }
 
   if (partition_type_ == TPartitionType::UNPARTITIONED ||

http://git-wip-us.apache.org/repos/asf/impala/blob/c7d2c2ec/be/src/scheduling/scheduler.cc
----------------------------------------------------------------------
diff --git a/be/src/scheduling/scheduler.cc b/be/src/scheduling/scheduler.cc
index 7c0af6f..2baffbb 100644
--- a/be/src/scheduling/scheduler.cc
+++ b/be/src/scheduling/scheduler.cc
@@ -345,12 +345,12 @@ void Scheduler::ComputeFragmentExecParams(
         TPlanFragmentDestination& dest = src_params->destinations[i];
         dest.__set_fragment_instance_id(dest_params->instance_exec_params[i].instance_id);
         const TNetworkAddress& host = dest_params->instance_exec_params[i].host;
-        dest.__set_server(host);
+        dest.__set_thrift_backend(host);
         if (FLAGS_use_krpc) {
           const TBackendDescriptor& desc = LookUpBackendDesc(executor_config, host);
           DCHECK(desc.__isset.krpc_address);
           DCHECK(IsResolvedAddress(desc.krpc_address));
-          dest.__set_krpc_server(desc.krpc_address);
+          dest.__set_krpc_backend(desc.krpc_address);
         }
       }
 

http://git-wip-us.apache.org/repos/asf/impala/blob/c7d2c2ec/common/thrift/ImpalaInternalService.thrift
----------------------------------------------------------------------
diff --git a/common/thrift/ImpalaInternalService.thrift b/common/thrift/ImpalaInternalService.thrift
index 120aebc..4797eba 100644
--- a/common/thrift/ImpalaInternalService.thrift
+++ b/common/thrift/ImpalaInternalService.thrift
@@ -431,11 +431,11 @@ struct TPlanFragmentDestination {
   // the globally unique fragment instance id
   1: required Types.TUniqueId fragment_instance_id
 
-  // IP address + port of the thrift based ImpalaInteralService on the destination
-  2: required Types.TNetworkAddress server
+  // hostname + port of the Thrift based ImpalaInternalService on the destination
+  2: required Types.TNetworkAddress thrift_backend
 
   // IP address + port of the KRPC based ImpalaInternalService on the destination
-  3: optional Types.TNetworkAddress krpc_server
+  3: optional Types.TNetworkAddress krpc_backend
 }
 
 // Context to collect information, which is shared among all instances of that plan