You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by ta...@apache.org on 2019/06/13 14:51:56 UTC
[impala] branch master updated: IMPALA-8659: Allow self-RPCs for KRPC to go via loopback
This is an automated email from the ASF dual-hosted git repository.
tarmstrong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git
The following commit(s) were added to refs/heads/master by this push:
new 8ee18c3 IMPALA-8659: Allow self-RPCs for KRPC to go via loopback
8ee18c3 is described below
commit 8ee18c3b7736e02ef76507162ed2592ba11d16e3
Author: Tim Armstrong <ta...@cloudera.com>
AuthorDate: Tue Jun 11 17:27:11 2019 -0700
IMPALA-8659: Allow self-RPCs for KRPC to go via loopback
Adds a flag --rpc_use_loopback that causes two differences
in behaviour when enabled:
1. KRPC will listen on all interfaces, i.e. bind the socket
to INADDR_ANY.
2. KRPC RPCs to --hostname are sent to 127.0.0.1 instead of
the (possibly external) IP that --hostname resolves to.
There is no change in default behaviour, except in containers,
where this flag is enabled by default.
Testing:
* Added a custom cluster test, which runs only in exhaustive mode,
as a sanity test for the behaviour of the flag.
Change-Id: I9dbd477769ed49c05e624f06da4e51afaaf1670d
Reviewed-on: http://gerrit.cloudera.org:8080/13592
Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
be/src/rpc/rpc-mgr.cc | 12 +++++++-
be/src/rpc/rpc-mgr.inline.h | 12 +++++++-
be/src/util/network-util.cc | 4 +--
be/src/util/network-util.h | 2 ++
docker/coord_exec/Dockerfile | 2 +-
docker/coordinator/Dockerfile | 3 +-
docker/executor/Dockerfile | 2 +-
tests/custom_cluster/test_krpc_options.py | 50 +++++++++++++++++++++++++++++++
8 files changed, 80 insertions(+), 7 deletions(-)
diff --git a/be/src/rpc/rpc-mgr.cc b/be/src/rpc/rpc-mgr.cc
index cda7161..e2028e3 100644
--- a/be/src/rpc/rpc-mgr.cc
+++ b/be/src/rpc/rpc-mgr.cc
@@ -82,6 +82,9 @@ DEFINE_int32(rpc_negotiation_timeout_ms, 300000,
"Time in milliseconds of waiting for a negotiation to complete before timing out.");
DEFINE_int32(rpc_negotiation_thread_count, 64,
"Maximum number of threads dedicated to handling RPC connection negotiations.");
+DEFINE_bool(rpc_use_loopback, false,
+ "Always use loopback for local connections. This requires binding to all addresses, "
+ "not just the KRPC address.");
namespace impala {
@@ -187,7 +190,14 @@ Status RpcMgr::StartServices(const TNetworkAddress& address) {
// Convert 'address' to Kudu's Sockaddr
DCHECK(IsResolvedAddress(address));
Sockaddr sockaddr;
- RETURN_IF_ERROR(TNetworkAddressToSockaddr(address, &sockaddr));
+ if (FLAGS_rpc_use_loopback) {
+ // Listen on all addresses, including loopback.
+ sockaddr.set_port(address.port);
+ DCHECK(sockaddr.IsWildcard()) << sockaddr.ToString();
+ } else {
+ // Only listen on the canonical address for KRPC.
+ RETURN_IF_ERROR(TNetworkAddressToSockaddr(address, &sockaddr));
+ }
// Call the messenger to create an AcceptorPool for us.
KUDU_RETURN_IF_ERROR(messenger_->AddAcceptorPool(sockaddr, &acceptor_pool_),
diff --git a/be/src/rpc/rpc-mgr.inline.h b/be/src/rpc/rpc-mgr.inline.h
index 76ee34d..e519c9f 100644
--- a/be/src/rpc/rpc-mgr.inline.h
+++ b/be/src/rpc/rpc-mgr.inline.h
@@ -20,13 +20,17 @@
#include "rpc/rpc-mgr.h"
+#include <gflags/gflags.h>
+
#include "exec/kudu-util.h"
#include "kudu/rpc/messenger.h"
#include "kudu/rpc/rpc_header.pb.h"
#include "kudu/rpc/service_pool.h"
#include "kudu/rpc/user_credentials.h"
+#include "runtime/exec-env.h"
#include "util/network-util.h"
+DECLARE_bool(rpc_use_loopback);
namespace impala {
/// Always inline to avoid having to provide a definition for each use type P.
@@ -36,8 +40,14 @@ Status RpcMgr::GetProxy(const TNetworkAddress& address, const std::string& hostn
DCHECK(proxy != nullptr);
DCHECK(is_inited()) << "Must call Init() before GetProxy()";
DCHECK(IsResolvedAddress(address));
+ TNetworkAddress address_to_use = address;
+ // Talk to self via loopback.
+ if (FLAGS_rpc_use_loopback &&
+ address_to_use.hostname == ExecEnv::GetInstance()->krpc_address().hostname) {
+ address_to_use.__set_hostname(LOCALHOST_IP_STR);
+ }
kudu::Sockaddr sockaddr;
- RETURN_IF_ERROR(TNetworkAddressToSockaddr(address, &sockaddr));
+ RETURN_IF_ERROR(TNetworkAddressToSockaddr(address_to_use, &sockaddr));
proxy->reset(new P(messenger_, sockaddr, hostname));
// Always set the user credentials as Proxy ctor may fail in GetLoggedInUser().
diff --git a/be/src/util/network-util.cc b/be/src/util/network-util.cc
index e4b4234..d29e3e9 100644
--- a/be/src/util/network-util.cc
+++ b/be/src/util/network-util.cc
@@ -49,7 +49,7 @@ using std::random_device;
namespace impala {
-static const string LOCALHOST("127.0.0.1");
+const string LOCALHOST_IP_STR("127.0.0.1");
Status GetHostname(string* hostname) {
char name[HOST_NAME_MAX];
@@ -122,7 +122,7 @@ bool IsResolvedAddress(const TNetworkAddress& addr) {
bool FindFirstNonLocalhost(const vector<string>& addresses, string* addr) {
for (const string& candidate: addresses) {
- if (candidate != LOCALHOST) {
+ if (candidate != LOCALHOST_IP_STR) {
*addr = candidate;
return true;
}
diff --git a/be/src/util/network-util.h b/be/src/util/network-util.h
index 4fe9e04..947ea03 100644
--- a/be/src/util/network-util.h
+++ b/be/src/util/network-util.h
@@ -73,4 +73,6 @@ Status TNetworkAddressToSockaddr(const TNetworkAddress& address,
/// a free ephemeral port can't be found after 100 tries.
int FindUnusedEphemeralPort();
+extern const std::string LOCALHOST_IP_STR;
+
} // namespace impala
diff --git a/docker/coord_exec/Dockerfile b/docker/coord_exec/Dockerfile
index eeb1163..271fee5 100644
--- a/docker/coord_exec/Dockerfile
+++ b/docker/coord_exec/Dockerfile
@@ -31,4 +31,4 @@ ENTRYPOINT ["/opt/impala/bin/daemon_entrypoint.sh", "/opt/impala/bin/impalad",\
"-log_dir=/opt/impala/logs",\
"-abort_on_config_error=false", "-state_store_host=statestored",\
"-catalog_service_host=catalogd", "-mem_limit_includes_jvm=true",\
- "-use_local_catalog=false"]
+ "-use_local_catalog=false", "--rpc_use_loopback=true"]
diff --git a/docker/coordinator/Dockerfile b/docker/coordinator/Dockerfile
index 22ea7d7..8251659 100644
--- a/docker/coordinator/Dockerfile
+++ b/docker/coordinator/Dockerfile
@@ -31,4 +31,5 @@ ENTRYPOINT ["/opt/impala/bin/daemon_entrypoint.sh", "/opt/impala/bin/impalad",\
"-log_dir=/opt/impala/logs",\
"-abort_on_config_error=false", "-state_store_host=statestored",\
"-catalog_service_host=catalogd", "-is_executor=false", \
- "-mem_limit_includes_jvm=true", "-use_local_catalog=false"]
+ "-mem_limit_includes_jvm=true", "-use_local_catalog=false", \
+ "--rpc_use_loopback=true"]
diff --git a/docker/executor/Dockerfile b/docker/executor/Dockerfile
index 5d03bc6..a3fab6a 100644
--- a/docker/executor/Dockerfile
+++ b/docker/executor/Dockerfile
@@ -25,4 +25,4 @@ ENTRYPOINT ["/opt/impala/bin/daemon_entrypoint.sh", "/opt/impala/bin/impalad",\
"-log_dir=/opt/impala/logs",\
"-abort_on_config_error=false", "-state_store_host=statestored",\
"-catalog_service_host=catalogd", "-is_coordinator=false",\
- "-mem_limit_includes_jvm=true"]
+ "-mem_limit_includes_jvm=true", "--rpc_use_loopback=true"]
diff --git a/tests/custom_cluster/test_krpc_options.py b/tests/custom_cluster/test_krpc_options.py
new file mode 100644
index 0000000..b949621
--- /dev/null
+++ b/tests/custom_cluster/test_krpc_options.py
@@ -0,0 +1,50 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import pytest
+import socket
+from tests.common.custom_cluster_test_suite import CustomClusterTestSuite
+from tests.common.impala_cluster import DEFAULT_KRPC_PORT
+
+
+class TestKrpcOptions(CustomClusterTestSuite):
+ """Test for different options when using KRPC."""
+
+ @classmethod
+ def get_workload(self):
+ return 'functional-query'
+
+ @classmethod
+ def setup_class(cls):
+ if cls.exploration_strategy() != 'exhaustive':
+ pytest.skip('runs only in exhaustive')
+ super(TestKrpcOptions, cls).setup_class()
+
+ @pytest.mark.execute_serially
+ @CustomClusterTestSuite.with_args("--rpc_use_loopback=true")
+ def test_krpc_use_loopback(self, vector):
+ """Sanity test for the --rpc_use_loopback flag."""
+ # Run a query that will execute on multiple hosts.
+ self.client.execute("select min(int_col) from functional_parquet.alltypes")
+
+ # Check that we can connect on multiple interfaces.
+ sock = socket.socket()
+ sock.connect(("127.0.0.1", DEFAULT_KRPC_PORT))
+ sock.close()
+ sock = socket.socket()
+ sock.connect((socket.gethostname(), DEFAULT_KRPC_PORT))
+ sock.close()