You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by ta...@apache.org on 2019/06/13 14:51:56 UTC

[impala] branch master updated: IMPALA-8659: Allow self-RPCs for KRPC to go via loopback

This is an automated email from the ASF dual-hosted git repository.

tarmstrong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git


The following commit(s) were added to refs/heads/master by this push:
     new 8ee18c3  IMPALA-8659: Allow self-RPCs for KRPC to go via loopback
8ee18c3 is described below

commit 8ee18c3b7736e02ef76507162ed2592ba11d16e3
Author: Tim Armstrong <ta...@cloudera.com>
AuthorDate: Tue Jun 11 17:27:11 2019 -0700

    IMPALA-8659: Allow self-RPCs for KRPC to go via loopback
    
    Adds a flag --rpc_use_loopback that causes two differences
    in behaviour when enabled:
    1. KRPC will listen on all interfaces, i.e. bind the socket
       to INADDR_ANY.
    2. KRPC RPCs to this daemon's own --hostname are sent to
       127.0.0.1 instead of the (possibly external) IP that
       --hostname resolves to.
    
    Default behaviour is unchanged, except in the Docker images
    (coord_exec, coordinator and executor), which now enable this flag.
    
    Testing:
    * Added a custom cluster test, run only with the exhaustive
      exploration strategy, as a sanity test for the flag's behaviour.
    
    Change-Id: I9dbd477769ed49c05e624f06da4e51afaaf1670d
    Reviewed-on: http://gerrit.cloudera.org:8080/13592
    Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 be/src/rpc/rpc-mgr.cc                     | 12 +++++++-
 be/src/rpc/rpc-mgr.inline.h               | 12 +++++++-
 be/src/util/network-util.cc               |  4 +--
 be/src/util/network-util.h                |  2 ++
 docker/coord_exec/Dockerfile              |  2 +-
 docker/coordinator/Dockerfile             |  3 +-
 docker/executor/Dockerfile                |  2 +-
 tests/custom_cluster/test_krpc_options.py | 50 +++++++++++++++++++++++++++++++
 8 files changed, 80 insertions(+), 7 deletions(-)

diff --git a/be/src/rpc/rpc-mgr.cc b/be/src/rpc/rpc-mgr.cc
index cda7161..e2028e3 100644
--- a/be/src/rpc/rpc-mgr.cc
+++ b/be/src/rpc/rpc-mgr.cc
@@ -82,6 +82,9 @@ DEFINE_int32(rpc_negotiation_timeout_ms, 300000,
     "Time in milliseconds of waiting for a negotiation to complete before timing out.");
 DEFINE_int32(rpc_negotiation_thread_count, 64,
     "Maximum number of threads dedicated to handling RPC connection negotiations.");
+DEFINE_bool(rpc_use_loopback, false,
+    "Always use loopback for local connections. This requires binding to all addresses, "
+    "not just the KRPC address.");
 
 namespace impala {
 
@@ -187,7 +190,14 @@ Status RpcMgr::StartServices(const TNetworkAddress& address) {
   // Convert 'address' to Kudu's Sockaddr
   DCHECK(IsResolvedAddress(address));
   Sockaddr sockaddr;
-  RETURN_IF_ERROR(TNetworkAddressToSockaddr(address, &sockaddr));
+  if (FLAGS_rpc_use_loopback) {
+    // Listen on all addresses, including loopback.
+    sockaddr.set_port(address.port);
+    DCHECK(sockaddr.IsWildcard()) << sockaddr.ToString();
+  } else {
+    // Only listen on the canonical address for KRPC.
+    RETURN_IF_ERROR(TNetworkAddressToSockaddr(address, &sockaddr));
+  }
 
   // Call the messenger to create an AcceptorPool for us.
   KUDU_RETURN_IF_ERROR(messenger_->AddAcceptorPool(sockaddr, &acceptor_pool_),
diff --git a/be/src/rpc/rpc-mgr.inline.h b/be/src/rpc/rpc-mgr.inline.h
index 76ee34d..e519c9f 100644
--- a/be/src/rpc/rpc-mgr.inline.h
+++ b/be/src/rpc/rpc-mgr.inline.h
@@ -20,13 +20,17 @@
 
 #include "rpc/rpc-mgr.h"
 
+#include <gflags/gflags.h>
+
 #include "exec/kudu-util.h"
 #include "kudu/rpc/messenger.h"
 #include "kudu/rpc/rpc_header.pb.h"
 #include "kudu/rpc/service_pool.h"
 #include "kudu/rpc/user_credentials.h"
+#include "runtime/exec-env.h"
 #include "util/network-util.h"
 
+DECLARE_bool(rpc_use_loopback);
 namespace impala {
 
 /// Always inline to avoid having to provide a definition for each use type P.
@@ -36,8 +40,14 @@ Status RpcMgr::GetProxy(const TNetworkAddress& address, const std::string& hostn
   DCHECK(proxy != nullptr);
   DCHECK(is_inited()) << "Must call Init() before GetProxy()";
   DCHECK(IsResolvedAddress(address));
+  TNetworkAddress address_to_use = address;
+  // Talk to self via loopback.
+  if (FLAGS_rpc_use_loopback &&
+      address_to_use.hostname == ExecEnv::GetInstance()->krpc_address().hostname) {
+    address_to_use.__set_hostname(LOCALHOST_IP_STR);
+  }
   kudu::Sockaddr sockaddr;
-  RETURN_IF_ERROR(TNetworkAddressToSockaddr(address, &sockaddr));
+  RETURN_IF_ERROR(TNetworkAddressToSockaddr(address_to_use, &sockaddr));
   proxy->reset(new P(messenger_, sockaddr, hostname));
 
   // Always set the user credentials as Proxy ctor may fail in GetLoggedInUser().
diff --git a/be/src/util/network-util.cc b/be/src/util/network-util.cc
index e4b4234..d29e3e9 100644
--- a/be/src/util/network-util.cc
+++ b/be/src/util/network-util.cc
@@ -49,7 +49,7 @@ using std::random_device;
 
 namespace impala {
 
-static const string LOCALHOST("127.0.0.1");
+const string LOCALHOST_IP_STR("127.0.0.1");
 
 Status GetHostname(string* hostname) {
   char name[HOST_NAME_MAX];
@@ -122,7 +122,7 @@ bool IsResolvedAddress(const TNetworkAddress& addr) {
 
 bool FindFirstNonLocalhost(const vector<string>& addresses, string* addr) {
   for (const string& candidate: addresses) {
-    if (candidate != LOCALHOST) {
+    if (candidate != LOCALHOST_IP_STR) {
       *addr = candidate;
       return true;
     }
diff --git a/be/src/util/network-util.h b/be/src/util/network-util.h
index 4fe9e04..947ea03 100644
--- a/be/src/util/network-util.h
+++ b/be/src/util/network-util.h
@@ -73,4 +73,6 @@ Status TNetworkAddressToSockaddr(const TNetworkAddress& address,
 /// a free ephemeral port can't be found after 100 tries.
 int FindUnusedEphemeralPort();
 
+extern const std::string LOCALHOST_IP_STR;
+
 } // namespace impala
diff --git a/docker/coord_exec/Dockerfile b/docker/coord_exec/Dockerfile
index eeb1163..271fee5 100644
--- a/docker/coord_exec/Dockerfile
+++ b/docker/coord_exec/Dockerfile
@@ -31,4 +31,4 @@ ENTRYPOINT ["/opt/impala/bin/daemon_entrypoint.sh", "/opt/impala/bin/impalad",\
      "-log_dir=/opt/impala/logs",\
      "-abort_on_config_error=false", "-state_store_host=statestored",\
      "-catalog_service_host=catalogd", "-mem_limit_includes_jvm=true",\
-     "-use_local_catalog=false"]
+     "-use_local_catalog=false", "--rpc_use_loopback=true"]
diff --git a/docker/coordinator/Dockerfile b/docker/coordinator/Dockerfile
index 22ea7d7..8251659 100644
--- a/docker/coordinator/Dockerfile
+++ b/docker/coordinator/Dockerfile
@@ -31,4 +31,5 @@ ENTRYPOINT ["/opt/impala/bin/daemon_entrypoint.sh", "/opt/impala/bin/impalad",\
      "-log_dir=/opt/impala/logs",\
      "-abort_on_config_error=false", "-state_store_host=statestored",\
      "-catalog_service_host=catalogd", "-is_executor=false", \
-     "-mem_limit_includes_jvm=true", "-use_local_catalog=false"]
+     "-mem_limit_includes_jvm=true", "-use_local_catalog=false", \
+     "--rpc_use_loopback=true"]
diff --git a/docker/executor/Dockerfile b/docker/executor/Dockerfile
index 5d03bc6..a3fab6a 100644
--- a/docker/executor/Dockerfile
+++ b/docker/executor/Dockerfile
@@ -25,4 +25,4 @@ ENTRYPOINT ["/opt/impala/bin/daemon_entrypoint.sh", "/opt/impala/bin/impalad",\
      "-log_dir=/opt/impala/logs",\
      "-abort_on_config_error=false", "-state_store_host=statestored",\
      "-catalog_service_host=catalogd", "-is_coordinator=false",\
-     "-mem_limit_includes_jvm=true"]
+     "-mem_limit_includes_jvm=true", "--rpc_use_loopback=true"]
diff --git a/tests/custom_cluster/test_krpc_options.py b/tests/custom_cluster/test_krpc_options.py
new file mode 100644
index 0000000..b949621
--- /dev/null
+++ b/tests/custom_cluster/test_krpc_options.py
@@ -0,0 +1,50 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import pytest
+import socket
+from tests.common.custom_cluster_test_suite import CustomClusterTestSuite
+from tests.common.impala_cluster import DEFAULT_KRPC_PORT
+
+
+class TestKrpcOptions(CustomClusterTestSuite):
+  """Tests for KRPC-related startup flags; runs only in exhaustive."""
+
+  @classmethod
+  def get_workload(self):
+    return 'functional-query'
+
+  @classmethod
+  def setup_class(cls):
+    if cls.exploration_strategy() != 'exhaustive':
+      pytest.skip('runs only in exhaustive')
+    super(TestKrpcOptions, cls).setup_class()
+
+  @pytest.mark.execute_serially
+  @CustomClusterTestSuite.with_args("--rpc_use_loopback=true")
+  def test_krpc_use_loopback(self, vector):
+    """Sanity test for the --rpc_use_loopback flag."""
+    # Run a query that executes on multiple hosts with the flag enabled.
+    self.client.execute("select min(int_col) from functional_parquet.alltypes")
+
+    # The flag binds KRPC to all interfaces, so both loopback and hostname connect.
+    sock = socket.socket()
+    sock.connect(("127.0.0.1", DEFAULT_KRPC_PORT))
+    sock.close()
+    sock = socket.socket()
+    sock.connect((socket.gethostname(), DEFAULT_KRPC_PORT))
+    sock.close()