You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by mi...@apache.org on 2017/10/06 15:31:57 UTC
[2/2] incubator-impala git commit: IMPALA-4670: Introduces RpcMgr
class
IMPALA-4670: Introduces RpcMgr class
This patch introduces a new class, RpcMgr which is the abstraction
layer around KRPC core mechanics. It provides an interface
RegisterService() for various services to register themselves.
Kudu RPC is invoked via an auto-generated interface called proxy.
This change implements an inline wrapper for KRPC client to obtain
a proxy for a particular service exported by remote server.
Last but not least, the RpcMgr will start all registered services
if FLAGS_use_krpc is true. This patch hasn't yet added any service
except for some test services in rpc-mgr-test.
This patch is based on an abandoned patch by Henry Robinson.
Testing done: a new backend test is added to exercise the code
and demonstrate the way to interact with KRPC framework.
Change-Id: I8adb10ae375d7bf945394c38a520f12d29cf7b46
Reviewed-on: http://gerrit.cloudera.org:8080/7901
Reviewed-by: Michael Ho <kw...@cloudera.com>
Tested-by: Impala Public Jenkins
Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/dd4c6be8
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/dd4c6be8
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/dd4c6be8
Branch: refs/heads/master
Commit: dd4c6be8e082d4dd42e099e9a03ed23fdae9c13b
Parents: e6594bf
Author: Michael Ho <kw...@cloudera.com>
Authored: Sun Aug 20 13:53:34 2017 -0700
Committer: Impala Public Jenkins <im...@gerrit.cloudera.org>
Committed: Fri Oct 6 07:09:55 2017 +0000
----------------------------------------------------------------------
CMakeLists.txt | 3 +-
be/src/exec/kudu-util.h | 13 ++
be/src/rpc/CMakeLists.txt | 12 ++
be/src/rpc/rpc-mgr-test.cc | 251 ++++++++++++++++++++++++++
be/src/rpc/rpc-mgr.cc | 115 ++++++++++++
be/src/rpc/rpc-mgr.h | 181 +++++++++++++++++++
be/src/rpc/rpc-mgr.inline.h | 45 +++++
be/src/runtime/exec-env.cc | 23 ++-
be/src/runtime/exec-env.h | 22 ++-
be/src/scheduling/scheduler-test-util.cc | 12 +-
be/src/scheduling/scheduler.cc | 27 +--
be/src/scheduling/scheduler.h | 8 +-
be/src/service/impala-server.cc | 16 +-
be/src/util/counting-barrier.h | 3 +
be/src/util/network-util.cc | 11 ++
be/src/util/network-util.h | 12 +-
cmake_modules/FindKRPC.cmake | 4 +-
common/protobuf/CMakeLists.txt | 31 ++++
common/protobuf/rpc_test.proto | 42 +++++
19 files changed, 780 insertions(+), 51 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/dd4c6be8/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/CMakeLists.txt b/CMakeLists.txt
index d60487f..865f7b2 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -347,12 +347,13 @@ add_subdirectory(common/function-registry)
add_subdirectory(common/thrift)
add_subdirectory(common/fbs)
add_subdirectory(common/yarn-extras)
+add_subdirectory(common/protobuf)
add_subdirectory(be)
add_subdirectory(fe)
add_subdirectory(ext-data-source)
# Build target for all generated files which most backend code depends on
-add_custom_target(gen-deps ALL DEPENDS thrift-deps)
+add_custom_target(gen-deps ALL DEPENDS thrift-deps proto-deps)
add_custom_target(tarballs ALL DEPENDS shell_tarball)
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/dd4c6be8/be/src/exec/kudu-util.h
----------------------------------------------------------------------
diff --git a/be/src/exec/kudu-util.h b/be/src/exec/kudu-util.h
index 28c6b27..6113401 100644
--- a/be/src/exec/kudu-util.h
+++ b/be/src/exec/kudu-util.h
@@ -81,5 +81,18 @@ Status WriteKuduValue(int col, PrimitiveType type, const void* value,
/// Takes a Kudu client DataType and returns the corresponding Impala ColumnType.
ColumnType KuduDataTypeToColumnType(kudu::client::KuduColumnSchema::DataType type);
+/// Utility function for creating an Impala Status object based on a kudu::Status object.
+/// 'k_status' is the kudu::Status object.
+/// 'prepend' is an optional string; when non-empty, the Impala Status message is
+/// "<prepend>: <details of k_status>", otherwise it is just the details of 'k_status'.
+/// It is taken by const reference so that no string is copied on the common OK path.
+/// Returns Status::OK() if 'k_status' is ok.
+/// Note that we don't translate the kudu::Status error code to Impala error code
+/// so the returned status' type is always of TErrorCode::GENERAL.
+inline Status FromKuduStatus(
+    const kudu::Status& k_status, const std::string& prepend = "") {
+  if (LIKELY(k_status.ok())) return Status::OK();
+  if (prepend.empty()) return Status(k_status.ToString());
+  return Status(strings::Substitute("$0: $1", prepend, k_status.ToString()));
+}
+
+
} /// namespace impala
#endif
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/dd4c6be8/be/src/rpc/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/be/src/rpc/CMakeLists.txt b/be/src/rpc/CMakeLists.txt
index d837f6c..95be43f 100644
--- a/be/src/rpc/CMakeLists.txt
+++ b/be/src/rpc/CMakeLists.txt
@@ -21,8 +21,12 @@ set(LIBRARY_OUTPUT_PATH "${BUILD_OUTPUT_ROOT_DIRECTORY}/rpc")
# where to put generated binaries
set(EXECUTABLE_OUTPUT_PATH "${BUILD_OUTPUT_ROOT_DIRECTORY}/rpc")
+# Mark the protobuf files as generated
+set_source_files_properties(${RPC_TEST_PROTO_SRCS} PROPERTIES GENERATED TRUE)
+
add_library(Rpc
authentication.cc
+ rpc-mgr.cc
rpc-trace.cc
TAcceptQueueServer.cpp
thrift-util.cc
@@ -35,3 +39,11 @@ add_dependencies(Rpc gen-deps)
ADD_BE_TEST(thrift-util-test)
ADD_BE_TEST(thrift-server-test)
ADD_BE_TEST(authentication-test)
+
+ADD_BE_TEST(rpc-mgr-test)
+add_dependencies(rpc-mgr-test rpc_test_proto)
+target_link_libraries(rpc-mgr-test rpc_test_proto)
+
+add_library(rpc_test_proto ${RPC_TEST_PROTO_SRCS})
+add_dependencies(rpc_test_proto rpc_test_proto_tgt krpc)
+target_link_libraries(rpc_test_proto krpc)
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/dd4c6be8/be/src/rpc/rpc-mgr-test.cc
----------------------------------------------------------------------
diff --git a/be/src/rpc/rpc-mgr-test.cc b/be/src/rpc/rpc-mgr-test.cc
new file mode 100644
index 0000000..3eb0d92
--- /dev/null
+++ b/be/src/rpc/rpc-mgr-test.cc
@@ -0,0 +1,251 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "rpc/rpc-mgr.inline.h"
+
+#include "common/init.h"
+#include "exec/kudu-util.h"
+#include "kudu/rpc/rpc_context.h"
+#include "kudu/rpc/rpc_controller.h"
+#include "kudu/rpc/rpc_header.pb.h"
+#include "kudu/rpc/rpc_sidecar.h"
+#include "kudu/util/monotime.h"
+#include "kudu/util/status.h"
+#include "testutil/gtest-util.h"
+#include "util/counting-barrier.h"
+#include "util/network-util.h"
+#include "util/test-info.h"
+
+#include "gen-cpp/rpc_test.proxy.h"
+#include "gen-cpp/rpc_test.service.h"
+
+#include "common/names.h"
+
+using kudu::rpc::ErrorStatusPB;
+using kudu::rpc::ServiceIf;
+using kudu::rpc::RpcController;
+using kudu::rpc::RpcContext;
+using kudu::rpc::RpcSidecar;
+using kudu::MonoDelta;
+using kudu::Slice;
+
+using namespace std;
+
+DECLARE_int32(num_reactor_threads);
+DECLARE_int32(num_acceptor_threads);
+DECLARE_string(hostname);
+
+namespace impala {
+
+static int32_t SERVICE_PORT = FindUnusedEphemeralPort(nullptr);
+
+#define PAYLOAD_SIZE (4096)
+
+/// Test fixture owning the RpcMgr under test. SetUp() resolves FLAGS_hostname to an IP
+/// address, combines it with SERVICE_PORT into 'krpc_address_' and calls Init() on
+/// 'rpc_mgr_'. TearDown() calls Shutdown() on 'rpc_mgr_'.
+class RpcMgrTest : public testing::Test {
+ protected:
+  TNetworkAddress krpc_address_;
+  RpcMgr rpc_mgr_;
+
+  virtual void SetUp() {
+    IpAddr ip;
+    ASSERT_OK(HostnameToIpAddr(FLAGS_hostname, &ip));
+    krpc_address_ = MakeNetworkAddress(ip, SERVICE_PORT);
+    ASSERT_OK(rpc_mgr_.Init());
+  }
+
+  virtual void TearDown() {
+    rpc_mgr_.Shutdown();
+  }
+
+  // Utility function to initialize the parameter for ScanMem RPC.
+  // Picks a random value and fills 'payload_' with it. Adds 'payload_' as a sidecar
+  // to 'controller'. Also sets up 'request' with the random value and index of the
+  // sidecar.
+  void SetupScanMemRequest(ScanMemRequestPB* request, RpcController* controller) {
+    int32_t pattern = random();
+    // PAYLOAD_SIZE is a byte count, so the number of elements is a fraction of it.
+    for (size_t i = 0; i < PAYLOAD_SIZE / sizeof(int32_t); ++i) payload_[i] = pattern;
+    int idx;
+    Slice slice(reinterpret_cast<const uint8_t*>(payload_), PAYLOAD_SIZE);
+    controller->AddOutboundSidecar(RpcSidecar::FromSlice(slice), &idx);
+    request->set_pattern(pattern);
+    request->set_sidecar_idx(idx);
+  }
+
+ private:
+  // Backing storage for the sidecar. Sized to hold exactly PAYLOAD_SIZE bytes, which
+  // is the amount that SetupScanMemRequest() fills and sends.
+  int32_t payload_[PAYLOAD_SIZE / sizeof(int32_t)];
+};
+
+typedef std::function<void(RpcContext*)> ServiceCB;
+
+/// Ping service used by the tests. Ping() always stores 42 in the response and then
+/// leaves it to a callback supplied at construction time to complete the call.
+class PingServiceImpl : public PingServiceIf {
+ public:
+  // 'cb' receives the RpcContext of every Ping() so that a test can control how and
+  // when the call is answered. The default callback responds with success right away.
+  PingServiceImpl(const scoped_refptr<kudu::MetricEntity>& entity,
+      const scoped_refptr<kudu::rpc::ResultTracker> tracker,
+      ServiceCB cb = [](RpcContext* ctx) { ctx->RespondSuccess(); })
+    : PingServiceIf(entity, tracker), respond_cb_(cb) {}
+
+  virtual void Ping(
+      const PingRequestPB* request, PingResponsePB* response, RpcContext* context) {
+    // Fill in the fixed answer before handing the context over to the callback.
+    response->set_int_response(42);
+    respond_cb_(context);
+  }
+
+ private:
+  // Decides how each Ping() call is completed.
+  ServiceCB respond_cb_;
+};
+
+/// Test implementation of the ScanMem service. It checks that the sidecar payload
+/// attached to a request consists solely of the pattern named in the request.
+class ScanMemServiceImpl : public ScanMemServiceIf {
+ public:
+  ScanMemServiceImpl(const scoped_refptr<kudu::MetricEntity>& entity,
+      const scoped_refptr<kudu::rpc::ResultTracker> tracker)
+    : ScanMemServiceIf(entity, tracker) {
+  }
+
+  // The request comes with an int 'pattern' and a payload of int array sent with
+  // sidecar. Scan the array to make sure every element matches 'pattern'. Responds
+  // with a Corruption failure on the first mismatch and with success otherwise.
+  virtual void ScanMem(const ScanMemRequestPB* request, ScanMemResponsePB* response,
+      RpcContext* context) {
+    int32_t pattern = request->pattern();
+    Slice payload;
+    ASSERT_OK(
+        FromKuduStatus(context->GetInboundSidecar(request->sidecar_idx(), &payload)));
+    ASSERT_EQ(payload.size() % sizeof(int32_t), 0);
+
+    const int32_t* v = reinterpret_cast<const int32_t*>(payload.data());
+    const size_t num_vals = payload.size() / sizeof(int32_t);
+    for (size_t i = 0; i < num_vals; ++i) {
+      int32_t val = v[i];
+      if (val != pattern) {
+        // Substitute() placeholders are 0-based: $0 is 'pattern' and $1 is 'val'.
+        context->RespondFailure(kudu::Status::Corruption(
+            Substitute("Expecting $0; Found $1", pattern, val)));
+        return;
+      }
+    }
+    context->RespondSuccess();
+  }
+};
+
+// Registers two services on one RpcMgr and checks that both can be called through
+// their proxies.
+TEST_F(RpcMgrTest, MultipleServices) {
+  // Test that a service can be started, and will respond to requests.
+  unique_ptr<ServiceIf> ping_impl(
+      new PingServiceImpl(rpc_mgr_.metric_entity(), rpc_mgr_.result_tracker()));
+  ASSERT_OK(rpc_mgr_.RegisterService(10, 10, move(ping_impl)));
+
+  // Test that a second service, that verifies the RPC payload is not corrupted,
+  // can be started.
+  unique_ptr<ServiceIf> scan_mem_impl(
+      new ScanMemServiceImpl(rpc_mgr_.metric_entity(), rpc_mgr_.result_tracker()));
+  ASSERT_OK(rpc_mgr_.RegisterService(10, 10, move(scan_mem_impl)));
+
+  FLAGS_num_acceptor_threads = 2;
+  FLAGS_num_reactor_threads = 10;
+  ASSERT_OK(rpc_mgr_.StartServices(krpc_address_));
+
+  unique_ptr<PingServiceProxy> ping_proxy;
+  ASSERT_OK(rpc_mgr_.GetProxy<PingServiceProxy>(krpc_address_, &ping_proxy));
+
+  unique_ptr<ScanMemServiceProxy> scan_mem_proxy;
+  ASSERT_OK(rpc_mgr_.GetProxy<ScanMemServiceProxy>(krpc_address_, &scan_mem_proxy));
+
+  RpcController controller;
+  // The sequence below is drawn with random(), whose seed is set with srandom();
+  // srand() is only specified to seed rand().
+  srandom(0);
+  // Randomly invoke either service to make sure an RpcMgr can host multiple
+  // services at the same time.
+  for (int i = 0; i < 100; ++i) {
+    controller.Reset();
+    if (random() % 2 == 0) {
+      PingRequestPB request;
+      PingResponsePB response;
+      kudu::Status status = ping_proxy->Ping(request, &response, &controller);
+      ASSERT_TRUE(status.ok()) << status.ToString();
+      ASSERT_EQ(response.int_response(), 42);
+    } else {
+      ScanMemRequestPB request;
+      ScanMemResponsePB response;
+      SetupScanMemRequest(&request, &controller);
+      kudu::Status status = scan_mem_proxy->ScanMem(request, &response, &controller);
+      ASSERT_TRUE(status.ok()) << status.ToString();
+    }
+  }
+}
+
+// Calls a deliberately slow, single-threaded service with a short client timeout and
+// expects every call to fail with either a timeout or a "server too busy" error.
+TEST_F(RpcMgrTest, SlowCallback) {
+  // Handler which waits 300ms before acknowledging the RPC.
+  auto delayed_success = [](RpcContext* ctx) {
+    SleepForMs(300);
+    ctx->RespondSuccess();
+  };
+
+  // One service thread and a short queue, so the service saturates quickly.
+  const int num_service_threads = 1;
+  const int queue_size = 3;
+  unique_ptr<ServiceIf> impl(new PingServiceImpl(
+      rpc_mgr_.metric_entity(), rpc_mgr_.result_tracker(), delayed_success));
+  ASSERT_OK(rpc_mgr_.RegisterService(num_service_threads, queue_size, move(impl)));
+
+  // NOTE(review): SetUp() has already called rpc_mgr_.Init(), which is where
+  // FLAGS_num_reactor_threads is read, so this assignment looks like it comes too late
+  // to affect this RpcMgr - confirm.
+  FLAGS_num_acceptor_threads = 2;
+  FLAGS_num_reactor_threads = 10;
+  ASSERT_OK(rpc_mgr_.StartServices(krpc_address_));
+
+  unique_ptr<PingServiceProxy> proxy;
+  ASSERT_OK(rpc_mgr_.GetProxy<PingServiceProxy>(krpc_address_, &proxy));
+
+  // The client gives up after 50ms, well before the handler replies.
+  PingRequestPB request;
+  PingResponsePB response;
+  RpcController controller;
+  for (int attempt = 0; attempt < 100; ++attempt) {
+    controller.Reset();
+    controller.set_timeout(MonoDelta::FromMilliseconds(50));
+    kudu::Status status = proxy->Ping(request, &response, &controller);
+    ASSERT_TRUE(status.IsTimedOut() || RpcMgr::IsServerTooBusy(controller));
+  }
+}
+
+// Issues ScanMem RPCs through the asynchronous proxy interface and waits for each
+// completion callback before checking the controller's status.
+TEST_F(RpcMgrTest, AsyncCall) {
+  unique_ptr<ServiceIf> scan_mem_impl(
+      new ScanMemServiceImpl(rpc_mgr_.metric_entity(), rpc_mgr_.result_tracker()));
+  ASSERT_OK(rpc_mgr_.RegisterService(10, 10, move(scan_mem_impl)));
+
+  unique_ptr<ScanMemServiceProxy> scan_mem_proxy;
+  ASSERT_OK(rpc_mgr_.GetProxy<ScanMemServiceProxy>(krpc_address_, &scan_mem_proxy));
+
+  FLAGS_num_acceptor_threads = 2;
+  FLAGS_num_reactor_threads = 10;
+  ASSERT_OK(rpc_mgr_.StartServices(krpc_address_));
+
+  RpcController controller;
+  // SetupScanMemRequest() draws its pattern with random(), whose seed is set with
+  // srandom(); srand() is only specified to seed rand().
+  srandom(0);
+  for (int i = 0; i < 10; ++i) {
+    controller.Reset();
+    ScanMemRequestPB request;
+    ScanMemResponsePB response;
+    SetupScanMemRequest(&request, &controller);
+    // The completion callback releases the barrier that this thread waits on below.
+    CountingBarrier barrier(1);
+    scan_mem_proxy->ScanMemAsync(request, &response, &controller,
+        [barrier_ptr = &barrier]() { barrier_ptr->Notify(); });
+    // TODO: Inject random cancellation here.
+    barrier.Wait();
+    ASSERT_TRUE(controller.status().ok()) << controller.status().ToString();
+  }
+}
+
+} // namespace impala
+
+IMPALA_TEST_MAIN();
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/dd4c6be8/be/src/rpc/rpc-mgr.cc
----------------------------------------------------------------------
diff --git a/be/src/rpc/rpc-mgr.cc b/be/src/rpc/rpc-mgr.cc
new file mode 100644
index 0000000..f6491be
--- /dev/null
+++ b/be/src/rpc/rpc-mgr.cc
@@ -0,0 +1,115 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "rpc/rpc-mgr.h"
+
+#include "exec/kudu-util.h"
+#include "kudu/rpc/acceptor_pool.h"
+#include "kudu/rpc/rpc_controller.h"
+#include "kudu/rpc/service_if.h"
+#include "kudu/util/net/net_util.h"
+#include "util/cpu-info.h"
+#include "util/network-util.h"
+
+#include "common/names.h"
+
+using kudu::rpc::MessengerBuilder;
+using kudu::rpc::Messenger;
+using kudu::rpc::AcceptorPool;
+using kudu::rpc::RpcController;
+using kudu::rpc::ServiceIf;
+using kudu::rpc::ServicePool;
+using kudu::Sockaddr;
+using kudu::HostPort;
+using kudu::MetricEntity;
+
+DECLARE_string(hostname);
+
+DEFINE_int32(num_acceptor_threads, 2,
+ "Number of threads dedicated to accepting connection requests for RPC services");
+DEFINE_int32(num_reactor_threads, 0,
+ "Number of threads dedicated to managing network IO for RPC services. If left at "
+ "default value 0, it will be set to number of CPU cores.");
+
+namespace impala {
+
+// Builds the messenger named "impala-server". The number of reactors is
+// FLAGS_num_reactor_threads if that is positive and CpuInfo::num_cores() otherwise.
+// The messenger's metric entity "krpc-metrics" is instantiated from 'registry_'.
+// Returns an error status if the messenger could not be built.
+Status RpcMgr::Init() {
+  MessengerBuilder bld("impala-server");
+  const scoped_refptr<MetricEntity> entity(
+      METRIC_ENTITY_server.Instantiate(&registry_, "krpc-metrics"));
+  int num_reactor_threads =
+      FLAGS_num_reactor_threads > 0 ? FLAGS_num_reactor_threads : CpuInfo::num_cores();
+  bld.set_num_reactors(num_reactor_threads).set_metric_entity(entity);
+  KUDU_RETURN_IF_ERROR(bld.Build(&messenger_), "Could not build messenger");
+  return Status::OK();
+}
+
+// Wraps 'service_ptr' in a ServicePool with a queue of 'service_queue_depth' entries,
+// starts 'num_service_threads' threads for it and registers the pool with the
+// messenger under the pool's service name. The pool is remembered in 'service_pools_'
+// only if every step succeeded.
+Status RpcMgr::RegisterService(int32_t num_service_threads, int32_t service_queue_depth,
+    unique_ptr<ServiceIf> service_ptr) {
+  DCHECK(is_inited()) << "Must call Init() before RegisterService()";
+  DCHECK(!services_started_) << "Cannot call RegisterService() after StartServices()";
+
+  // Ownership of the service implementation passes from 'service_ptr' to the pool.
+  scoped_refptr<ServicePool> pool =
+      new ServicePool(gscoped_ptr<ServiceIf>(service_ptr.release()),
+          messenger_->metric_entity(), service_queue_depth);
+
+  // The threads are started ahead of the registration so that a start-up failure
+  // never leaves a registered service behind.
+  KUDU_RETURN_IF_ERROR(pool->Init(num_service_threads), "Service pool failed to start");
+  KUDU_RETURN_IF_ERROR(messenger_->RegisterService(pool->service_name(), pool),
+      "Could not register service");
+
+  service_pools_.push_back(pool);
+  return Status::OK();
+}
+
+// Asks the messenger for an acceptor pool bound to 'address' and starts
+// FLAGS_num_acceptor_threads acceptor threads on it. 'services_started_' is set only
+// after both steps succeeded.
+Status RpcMgr::StartServices(const TNetworkAddress& address) {
+  DCHECK(is_inited()) << "Must call Init() before StartServices()";
+  DCHECK(!services_started_) << "May not call StartServices() twice";
+
+  // KRPC works with Kudu's Sockaddr, so translate the (already resolved) 'address'.
+  DCHECK(IsResolvedAddress(address));
+  Sockaddr bind_addr;
+  RETURN_IF_ERROR(TNetworkAddressToSockaddr(address, &bind_addr));
+
+  // The messenger creates the AcceptorPool on our behalf.
+  shared_ptr<AcceptorPool> acceptors;
+  KUDU_RETURN_IF_ERROR(
+      messenger_->AddAcceptorPool(bind_addr, &acceptors), "Failed to add acceptor pool");
+  KUDU_RETURN_IF_ERROR(
+      acceptors->Start(FLAGS_num_acceptor_threads), "Acceptor pool failed to start");
+  VLOG_QUERY << "Started " << FLAGS_num_acceptor_threads << " acceptor threads";
+
+  services_started_ = true;
+  return Status::OK();
+}
+
+// Shuts down every registered service pool, then unregisters all services from the
+// messenger and shuts the messenger down. Does nothing if Init() never set
+// 'messenger_'. 'service_pools_' is empty afterwards, which is what ~RpcMgr() checks.
+void RpcMgr::Shutdown() {
+  if (messenger_.get() == nullptr) return;
+  // Iterate by reference: copying a scoped_refptr would bump the reference count of
+  // every pool for no benefit.
+  for (const auto& service_pool : service_pools_) service_pool->Shutdown();
+
+  messenger_->UnregisterAllServices();
+  messenger_->Shutdown();
+  service_pools_.clear();
+}
+
+bool RpcMgr::IsServerTooBusy(const RpcController& rpc_controller) {
+ const kudu::Status status = rpc_controller.status();
+ const kudu::rpc::ErrorStatusPB* err = rpc_controller.error_response();
+ return status.IsRemoteError() && err != nullptr && err->has_code() &&
+ err->code() == kudu::rpc::ErrorStatusPB::ERROR_SERVER_TOO_BUSY;
+}
+
+} // namespace impala
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/dd4c6be8/be/src/rpc/rpc-mgr.h
----------------------------------------------------------------------
diff --git a/be/src/rpc/rpc-mgr.h b/be/src/rpc/rpc-mgr.h
new file mode 100644
index 0000000..d414bb6
--- /dev/null
+++ b/be/src/rpc/rpc-mgr.h
@@ -0,0 +1,181 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#ifndef IMPALA_RPC_RPC_MGR_H
+#define IMPALA_RPC_RPC_MGR_H
+
+#include "common/status.h"
+#include "kudu/rpc/messenger.h"
+#include "kudu/rpc/result_tracker.h"
+#include "kudu/rpc/service_pool.h"
+#include "kudu/util/metrics.h"
+
+#include "gen-cpp/Types_types.h"
+
+namespace kudu {
+namespace rpc {
+class RpcController;
+class ServiceIf;
+} // rpc
+} // kudu
+
+namespace impala {
+
+/// Singleton class which manages all KRPC services and proxies.
+///
+/// SERVICES
+/// --------
+///
+/// An RpcMgr manages 0 or more services: RPC interfaces that are a collection of remotely
+/// accessible methods. A new service is registered by calling RegisterService(). All
+/// services are served on the same port; the underlying RPC layer takes care of
+/// de-multiplexing RPC calls to their respective endpoints.
+///
+/// Services are made available to remote clients when RpcMgr::StartServices() is called;
+/// before this method no service method will be called.
+///
+/// Services may only be registered and started after RpcMgr::Init() is called.
+///
+/// PROXIES
+/// -------
+///
+/// A proxy is a client-side interface to a remote service. Remote methods exported by
+/// that service may be called through a proxy as though they were local methods.
+///
+/// A proxy can be obtained by calling GetProxy(). Proxies implement local methods which
+/// call remote service methods, e.g. proxy->Foo(request, &response) will call the Foo()
+/// service method on the service that 'proxy' points to.
+///
+/// Proxies may only be created after RpcMgr::Init() is called.
+///
+/// For example usage of proxies, please see rpc-mgr-test.cc
+///
+/// LIFECYCLE
+/// ---------
+///
+/// RpcMgr resides inside the singleton ExecEnv class.
+///
+/// Before any proxy or service interactions, RpcMgr::Init() must be called exactly once
+/// to start the reactor threads that service network events. Services must be registered
+/// with RpcMgr::RegisterService() before RpcMgr::StartServices() is called. When shutting
+/// down, RpcMgr::Shutdown() must be called to ensure that all services are cleanly
+/// terminated. RpcMgr::Init() and RpcMgr::Shutdown() are not thread safe.
+///
+/// KRPC INTERNALS
+/// --------------
+///
+/// Each service and proxy interacts with the network via a shared pool of 'reactor'
+/// threads which respond to incoming and outgoing RPC events. The number of 'reactor'
+/// threads is configurable via FLAGS_num_reactor_threads. By default, it's set to the number
+/// of cpu cores. Incoming events are passed immediately to one of two thread pools: new
+/// connections are handled by an 'acceptor' pool, and RPC request events are handled by
+/// a per-service 'service' pool. The size of a 'service' pool is specified when calling
+/// RegisterService().
+///
+/// All incoming RPC requests are placed into a per-service pool's fixed-size queue.
+/// The service threads will dequeue from this queue and process the requests. If the
+/// queue becomes full, the RPC will fail at the caller. The function IsServerTooBusy()
+/// below will return true for this case. The size of the queue is specified when calling
+/// RegisterService().
+///
+/// Inbound connection set-up is handled by a small fixed-size pool of 'acceptor'
+/// threads. The number of threads that accept new TCP connection requests to the service
+/// port is configurable via FLAGS_num_acceptor_threads.
+class RpcMgr {
+ public:
+ /// Initializes the reactor threads, and prepares for sending outbound RPC requests.
+ /// Returns an error status if the underlying messenger could not be built.
+ Status Init() WARN_UNUSED_RESULT;
+
+ /// Returns true if 'messenger_' has been set, i.e. Init() has succeeded.
+ bool is_inited() const { return messenger_.get() != nullptr; }
+
+ /// Start the acceptor threads which listen on 'address', making KRPC services
+ /// available. 'address' has to be a resolved IP address. Before this method is called,
+ /// remote clients will get a 'connection refused' error when trying to invoke an RPC
+ /// on this host.
+ Status StartServices(const TNetworkAddress& address) WARN_UNUSED_RESULT;
+
+ /// Register a new service.
+ ///
+ /// 'num_service_threads' is the number of threads that should be started to execute RPC
+ /// handlers for the new service.
+ ///
+ /// 'service_queue_depth' is the maximum number of requests that may be queued for this
+ /// service before clients begin to see rejection errors.
+ ///
+ /// 'service_ptr' contains an interface implementation that will handle RPCs. Note that
+ /// the service name has to be unique within an Impala instance or the registration will
+ /// fail.
+ ///
+ /// It is an error to call this after StartServices() has been called.
+ Status RegisterService(int32_t num_service_threads, int32_t service_queue_depth,
+ std::unique_ptr<kudu::rpc::ServiceIf> service_ptr) WARN_UNUSED_RESULT;
+
+ /// Creates a new proxy for a remote service of type P at location 'address', and places
+ /// it in 'proxy'. 'P' must descend from kudu::rpc::ServiceIf. Note that 'address' must
+ /// be a resolved IP address.
+ /// NOTE(review): the types passed as 'P' in rpc-mgr-test.cc are generated '*Proxy'
+ /// classes rather than services; confirm that ServiceIf is the intended base class.
+ template <typename P>
+ Status GetProxy(const TNetworkAddress& address, std::unique_ptr<P>* proxy)
+ WARN_UNUSED_RESULT;
+
+ /// Shut down all previously registered services. All service pools are shut down.
+ /// All acceptor and reactor threads within the messenger are also shut down.
+ /// All unprocessed incoming requests will be replied with error messages.
+ void Shutdown();
+
+ /// Returns true if the last RPC of 'rpc_controller' failed because the remote
+ /// service's queue filled up and couldn't accept more incoming requests.
+ /// 'rpc_controller' should contain the status of the last RPC call.
+ static bool IsServerTooBusy(const kudu::rpc::RpcController& rpc_controller);
+
+ /// Returns the result tracker which is handed to services when they are constructed.
+ const scoped_refptr<kudu::rpc::ResultTracker> result_tracker() const {
+ return tracker_;
+ }
+
+ /// Returns the metric entity of the messenger. 'messenger_' is dereferenced
+ /// unconditionally, so Init() must have succeeded before this is called.
+ scoped_refptr<kudu::MetricEntity> metric_entity() const {
+ return messenger_->metric_entity();
+ }
+
+ /// Checks (in debug builds) that no service pool is left, which is the state that
+ /// Shutdown() establishes.
+ ~RpcMgr() {
+ DCHECK_EQ(service_pools_.size(), 0)
+ << "Must call Shutdown() before destroying RpcMgr";
+ }
+
+ private:
+ /// One pool per registered service. scoped_refptr<> is dictated by the Kudu interface.
+ std::vector<scoped_refptr<kudu::rpc::ServicePool>> service_pools_;
+
+ /// Required Kudu boilerplate for constructing the MetricEntity passed
+ /// to c'tor of ServiceIf when creating a service.
+ /// TODO(KRPC): Integrate with Impala MetricGroup.
+ kudu::MetricRegistry registry_;
+
+ /// Used when creating a new service. Shared across all services which don't really
+ /// track results for idempotent RPC calls.
+ /// Declared const and never assigned by this class, so it keeps its
+ /// default-constructed value (presumably a null pointer - confirm).
+ const scoped_refptr<kudu::rpc::ResultTracker> tracker_;
+
+ /// Container for reactor threads which run event loops for RPC services, plus acceptor
+ /// threads which manage connection setup. Has to be a shared_ptr as required by
+ /// MessengerBuilder::Build().
+ std::shared_ptr<kudu::rpc::Messenger> messenger_;
+
+ /// True after StartServices() completes.
+ bool services_started_ = false;
+};
+
+} // namespace impala
+
+#endif
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/dd4c6be8/be/src/rpc/rpc-mgr.inline.h
----------------------------------------------------------------------
diff --git a/be/src/rpc/rpc-mgr.inline.h b/be/src/rpc/rpc-mgr.inline.h
new file mode 100644
index 0000000..474ac45
--- /dev/null
+++ b/be/src/rpc/rpc-mgr.inline.h
@@ -0,0 +1,45 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#ifndef IMPALA_RPC_RPC_MGR_INLINE_H
+#define IMPALA_RPC_RPC_MGR_INLINE_H
+
+#include "rpc/rpc-mgr.h"
+
+#include "exec/kudu-util.h"
+#include "kudu/rpc/messenger.h"
+#include "kudu/rpc/rpc_header.pb.h"
+#include "kudu/rpc/service_pool.h"
+#include "util/network-util.h"
+
+namespace impala {
+
+/// Defined in this inline header so that no explicit definition is needed for each
+/// type P it is used with. Converts 'address' to a kudu::Sockaddr and stores a newly
+/// allocated P, built from the messenger, that socket address and 'address.hostname',
+/// in 'proxy'. Returns an error if the address conversion fails.
+template <typename P>
+Status RpcMgr::GetProxy(const TNetworkAddress& address, std::unique_ptr<P>* proxy) {
+  DCHECK(proxy != nullptr);
+  DCHECK(is_inited()) << "Must call Init() before GetProxy()";
+  DCHECK(IsResolvedAddress(address));
+
+  kudu::Sockaddr remote_addr;
+  RETURN_IF_ERROR(TNetworkAddressToSockaddr(address, &remote_addr));
+
+  // Any proxy previously held by 'proxy' is destroyed by the assignment.
+  *proxy = std::unique_ptr<P>(new P(messenger_, remote_addr, address.hostname));
+  return Status::OK();
+}
+
+} // namespace impala
+
+#endif
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/dd4c6be8/be/src/runtime/exec-env.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/exec-env.cc b/be/src/runtime/exec-env.cc
index 8942007..94d2ca6 100644
--- a/be/src/runtime/exec-env.cc
+++ b/be/src/runtime/exec-env.cc
@@ -28,6 +28,7 @@
#include "common/object-pool.h"
#include "exec/kudu-util.h"
#include "gen-cpp/ImpalaInternalService.h"
+#include "rpc/rpc-mgr.h"
#include "runtime/backend-client.h"
#include "runtime/bufferpool/buffer-pool.h"
#include "runtime/bufferpool/reservation-tracker.h"
@@ -168,10 +169,12 @@ ExecEnv::ExecEnv(const string& hostname, int backend_port, int krpc_port,
async_rpc_pool_(new CallableThreadPool("rpc-pool", "async-rpc-sender", 8, 10000)),
query_exec_mgr_(new QueryExecMgr()),
enable_webserver_(FLAGS_enable_webserver && webserver_port > 0),
- backend_address_(MakeNetworkAddress(hostname, backend_port)),
- krpc_port_(krpc_port) {
+ backend_address_(MakeNetworkAddress(hostname, backend_port)) {
if (FLAGS_use_krpc) {
+ // KRPC relies on resolved IP address. It's set in StartServices().
+ krpc_address_.__set_port(krpc_port);
+ rpc_mgr_.reset(new RpcMgr());
stream_mgr_.reset(new KrpcDataStreamMgr(metrics_.get()));
} else {
stream_mgr_.reset(new DataStreamMgr(metrics_.get()));
@@ -204,6 +207,7 @@ ExecEnv::ExecEnv(const string& hostname, int backend_port, int krpc_port,
ExecEnv::~ExecEnv() {
if (buffer_reservation_ != nullptr) buffer_reservation_->Close();
+ if (rpc_mgr_ != nullptr) rpc_mgr_->Shutdown();
disk_io_mgr_.reset(); // Need to tear down before mem_tracker_.
}
@@ -281,6 +285,15 @@ Status ExecEnv::Init() {
RETURN_IF_ERROR(RegisterMemoryMetrics(
metrics_.get(), true, buffer_reservation_.get(), buffer_pool_.get()));
+ // Resolve hostname to IP address.
+ RETURN_IF_ERROR(HostnameToIpAddr(backend_address_.hostname, &ip_address_));
+
+ // Initialize the RPCMgr before allowing services registration.
+ if (FLAGS_use_krpc) {
+ krpc_address_.__set_hostname(ip_address_);
+ RETURN_IF_ERROR(rpc_mgr_->Init());
+ }
+
mem_tracker_.reset(
new MemTracker(AggregateMemoryMetrics::TOTAL_USED, bytes_limit, "Process"));
// Add BufferPool MemTrackers for cached memory that is not tracked against queries
@@ -334,7 +347,7 @@ Status ExecEnv::Init() {
}
if (scheduler_ != nullptr) {
- RETURN_IF_ERROR(scheduler_->Init(backend_address_, krpc_port_));
+ RETURN_IF_ERROR(scheduler_->Init(backend_address_, krpc_address_, ip_address_));
}
if (admission_controller_ != nullptr) RETURN_IF_ERROR(admission_controller_->Init());
@@ -364,6 +377,8 @@ Status ExecEnv::StartServices() {
}
}
+ // Start this last so everything is in place before accepting the first call.
+ if (FLAGS_use_krpc) RETURN_IF_ERROR(rpc_mgr_->StartServices(krpc_address_));
return Status::OK();
}
@@ -402,4 +417,4 @@ KrpcDataStreamMgr* ExecEnv::KrpcStreamMgr() {
return dynamic_cast<KrpcDataStreamMgr*>(stream_mgr_.get());
}
-}
+} // namespace impala
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/dd4c6be8/be/src/runtime/exec-env.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/exec-env.h b/be/src/runtime/exec-env.h
index b8a271d..31532df 100644
--- a/be/src/runtime/exec-env.h
+++ b/be/src/runtime/exec-env.h
@@ -27,9 +27,14 @@
#include "common/status.h"
#include "runtime/client-cache-types.h"
#include "util/hdfs-bulk-ops-defs.h" // For declaration of HdfsOpThreadPool
+#include "util/network-util.h"
#include "util/spinlock.h"
-namespace kudu { namespace client { class KuduClient; } }
+namespace kudu {
+namespace client {
+class KuduClient;
+} // namespace client
+} // namespace kudu
namespace impala {
@@ -53,6 +58,7 @@ class ObjectPool;
class QueryResourceMgr;
class RequestPoolService;
class ReservationTracker;
+class RpcMgr;
class Scheduler;
class StatestoreSubscriber;
class ThreadResourceMgr;
@@ -131,7 +137,9 @@ class ExecEnv {
const TNetworkAddress& backend_address() const { return backend_address_; }
- int krpc_port() const { return krpc_port_; }
+ const IpAddr& ip_address() const { return ip_address_; }
+
+ const TNetworkAddress& krpc_address() const { return krpc_address_; }
/// Initializes the exec env for running FE tests.
Status InitForFeTests() WARN_UNUSED_RESULT;
@@ -173,6 +181,7 @@ class ExecEnv {
boost::scoped_ptr<CallableThreadPool> exec_rpc_thread_pool_;
boost::scoped_ptr<CallableThreadPool> async_rpc_pool_;
boost::scoped_ptr<QueryExecMgr> query_exec_mgr_;
+ boost::scoped_ptr<RpcMgr> rpc_mgr_;
/// Query-wide buffer pool and the root reservation tracker for the pool. The
/// reservation limit is equal to the maximum capacity of the pool. Created in
@@ -191,11 +200,14 @@ class ExecEnv {
static ExecEnv* exec_env_;
bool is_fe_tests_ = false;
- /// Address of the Impala backend server instance
+ /// Address of the thrift based ImpalaInternalService
TNetworkAddress backend_address_;
- /// Port number on which all KRPC-based services are exported.
- int krpc_port_;
+ /// Resolved IP address of the host name.
+ IpAddr ip_address_;
+
+ /// Address of the KRPC-based ImpalaInternalService
+ TNetworkAddress krpc_address_;
/// fs.defaultFs value set in core-site.xml
std::string default_fs_;
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/dd4c6be8/be/src/scheduling/scheduler-test-util.cc
----------------------------------------------------------------------
diff --git a/be/src/scheduling/scheduler-test-util.cc b/be/src/scheduling/scheduler-test-util.cc
index 2d85a9a..05cfc42 100644
--- a/be/src/scheduling/scheduler-test-util.cc
+++ b/be/src/scheduling/scheduler-test-util.cc
@@ -20,7 +20,6 @@
#include <boost/unordered_set.hpp>
#include "common/names.h"
-#include "runtime/exec-env.h"
#include "scheduling/scheduler.h"
using namespace impala;
@@ -509,13 +508,14 @@ void SchedulerWrapper::InitializeScheduler() {
<< "hosts.";
const Host& scheduler_host = plan_.cluster().hosts()[0];
string scheduler_backend_id = scheduler_host.ip;
- TNetworkAddress scheduler_backend_address;
- scheduler_backend_address.hostname = scheduler_host.ip;
- scheduler_backend_address.port = scheduler_host.be_port;
-
+ TNetworkAddress scheduler_backend_address =
+ MakeNetworkAddress(scheduler_host.ip, scheduler_host.be_port);
+ TNetworkAddress scheduler_krpc_address =
+ MakeNetworkAddress(scheduler_host.ip, FLAGS_krpc_port);
scheduler_.reset(new Scheduler(nullptr, scheduler_backend_id,
&metrics_, nullptr, nullptr));
- const Status status = scheduler_->Init(scheduler_backend_address, FLAGS_krpc_port);
+ const Status status = scheduler_->Init(scheduler_backend_address,
+ scheduler_krpc_address, scheduler_host.ip);
DCHECK(status.ok()) << "Scheduler init failed in test";
// Initialize the scheduler backend maps.
SendFullMembershipMap();
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/dd4c6be8/be/src/scheduling/scheduler.cc
----------------------------------------------------------------------
diff --git a/be/src/scheduling/scheduler.cc b/be/src/scheduling/scheduler.cc
index adac41f..5cf0f01 100644
--- a/be/src/scheduling/scheduler.cc
+++ b/be/src/scheduling/scheduler.cc
@@ -65,29 +65,20 @@ Scheduler::Scheduler(StatestoreSubscriber* subscriber, const string& backend_id,
request_pool_service_(request_pool_service) {
}
-Status Scheduler::Init(const TNetworkAddress& backend_address, int krpc_port) {
+Status Scheduler::Init(const TNetworkAddress& backend_address,
+ const TNetworkAddress& krpc_address, const IpAddr& ip) {
LOG(INFO) << "Starting scheduler";
-
- // Figure out what our IP address is, so that each subscriber doesn't have to resolve
- // it on every heartbeat. KRPC also assumes that the address is resolved already.
- // May as well do it up front to avoid frequent DNS requests.
local_backend_descriptor_.address = backend_address;
- IpAddr ip;
- const Hostname& hostname = backend_address.hostname;
- Status status = HostnameToIpAddr(hostname, &ip);
- if (!status.ok()) {
- VLOG(1) << status.GetDetail();
- status.AddDetail("Scheduler failed to start");
- return status;
- }
-
+ // Store our IP address so that each subscriber doesn't have to resolve
+ // it on every heartbeat. May as well do it up front to avoid frequent DNS
+ // requests.
local_backend_descriptor_.ip_address = ip;
LOG(INFO) << "Scheduler using " << ip << " as IP address";
-
if (FLAGS_use_krpc) {
- // KRPC expects address to have been resolved already.
- TNetworkAddress krpc_svc_addr = MakeNetworkAddress(ip, krpc_port);
- local_backend_descriptor_.__set_krpc_address(krpc_svc_addr);
+ // KRPC relies on resolved IP address.
+ DCHECK(IsResolvedAddress(krpc_address));
+ DCHECK_EQ(krpc_address.hostname, ip);
+ local_backend_descriptor_.__set_krpc_address(krpc_address);
}
coord_only_backend_config_.AddBackend(local_backend_descriptor_);
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/dd4c6be8/be/src/scheduling/scheduler.h
----------------------------------------------------------------------
diff --git a/be/src/scheduling/scheduler.h b/be/src/scheduling/scheduler.h
index d1a22f8..2fe90b8 100644
--- a/be/src/scheduling/scheduler.h
+++ b/be/src/scheduling/scheduler.h
@@ -86,9 +86,11 @@ class Scheduler {
/// decisions once this method returns. Register with the subscription manager if
/// required. Also initializes the local backend descriptor. Returns error status
/// on failure. 'backend_address' is the address of thrift based ImpalaInternalService
- /// of this backend. 'krpc_port' is the port on which KRPC based ImpalaInternalService
- /// is exported.
- Status Init(const TNetworkAddress& backend_address, int krpc_port);
+ /// of this backend. If FLAGS_use_krpc is true, 'krpc_address' contains IP-address:port
+ /// on which KRPC based ImpalaInternalService is exported. 'ip' is the resolved
+ /// IP address of this backend.
+ Status Init(const TNetworkAddress& backend_address,
+ const TNetworkAddress& krpc_address, const IpAddr& ip);
/// Populates given query schedule and assigns fragments to hosts based on scan
/// ranges in the query exec request.
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/dd4c6be8/be/src/service/impala-server.cc
----------------------------------------------------------------------
diff --git a/be/src/service/impala-server.cc b/be/src/service/impala-server.cc
index d9d2629..fabc8fa 100644
--- a/be/src/service/impala-server.cc
+++ b/be/src/service/impala-server.cc
@@ -1629,18 +1629,10 @@ void ImpalaServer::AddLocalBackendToStatestore(
local_backend_descriptor.__set_is_coordinator(FLAGS_is_coordinator);
local_backend_descriptor.__set_is_executor(FLAGS_is_executor);
local_backend_descriptor.__set_address(exec_env_->backend_address());
- IpAddr ip;
- const Hostname& hostname = local_backend_descriptor.address.hostname;
- Status status = HostnameToIpAddr(hostname, &ip);
- if (!status.ok()) {
- // TODO: Should we do something about this failure?
- LOG(WARNING) << "Failed to convert hostname " << hostname << " to IP address: "
- << status.GetDetail();
- return;
- }
- local_backend_descriptor.ip_address = ip;
+ local_backend_descriptor.ip_address = exec_env_->ip_address();
if (FLAGS_use_krpc) {
- TNetworkAddress krpc_address = MakeNetworkAddress(ip, exec_env_->krpc_port());
+ const TNetworkAddress& krpc_address = exec_env_->krpc_address();
+ DCHECK(IsResolvedAddress(krpc_address));
local_backend_descriptor.__set_krpc_address(krpc_address);
}
subscriber_topic_updates->emplace_back(TTopicDelta());
@@ -1650,7 +1642,7 @@ void ImpalaServer::AddLocalBackendToStatestore(
TTopicItem& item = update.topic_entries.back();
item.key = local_backend_id;
- status = thrift_serializer_.Serialize(&local_backend_descriptor, &item.value);
+ Status status = thrift_serializer_.Serialize(&local_backend_descriptor, &item.value);
if (!status.ok()) {
LOG(WARNING) << "Failed to serialize Impala backend descriptor for statestore topic:"
<< " " << status.GetDetail();
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/dd4c6be8/be/src/util/counting-barrier.h
----------------------------------------------------------------------
diff --git a/be/src/util/counting-barrier.h b/be/src/util/counting-barrier.h
index 76dbe35..49b0bde 100644
--- a/be/src/util/counting-barrier.h
+++ b/be/src/util/counting-barrier.h
@@ -18,6 +18,9 @@
#ifndef IMPALA_UTIL_COUNTING_BARRIER_H
#define IMPALA_UTIL_COUNTING_BARRIER_H
+#include "common/atomic.h"
+#include "util/promise.h"
+
namespace impala {
/// Allows clients to wait for the arrival of a fixed number of notifications before they
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/dd4c6be8/be/src/util/network-util.cc
----------------------------------------------------------------------
diff --git a/be/src/util/network-util.cc b/be/src/util/network-util.cc
index 1a9ce53..7a10965 100644
--- a/be/src/util/network-util.cc
+++ b/be/src/util/network-util.cc
@@ -28,6 +28,7 @@
#include <vector>
#include <boost/algorithm/string.hpp>
+#include "exec/kudu-util.h"
#include "kudu/util/net/sockaddr.h"
#include "util/debug-util.h"
#include "util/error-util.h"
@@ -212,4 +213,14 @@ int FindUnusedEphemeralPort(vector<int>* used_ports) {
close(sockfd);
return -1;
}
+
+Status TNetworkAddressToSockaddr(const TNetworkAddress& address,
+ kudu::Sockaddr* sockaddr) {
+ DCHECK(IsResolvedAddress(address));
+ KUDU_RETURN_IF_ERROR(
+ sockaddr->ParseString(TNetworkAddressToString(address), address.port),
+ "Failed to parse address to Kudu Sockaddr.");
+ return Status::OK();
+}
+
}
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/dd4c6be8/be/src/util/network-util.h
----------------------------------------------------------------------
diff --git a/be/src/util/network-util.h b/be/src/util/network-util.h
index e964f5c..5b108dc 100644
--- a/be/src/util/network-util.h
+++ b/be/src/util/network-util.h
@@ -20,6 +20,10 @@
#include "gen-cpp/Types_types.h"
#include <vector>
+namespace kudu {
+class Sockaddr;
+} // namespace kudu
+
namespace impala {
/// Type to store hostnames, which can be rfc1123 hostnames or IPv4 addresses.
@@ -64,6 +68,11 @@ bool IsWildcardAddress(const std::string& ipaddress);
/// Utility method to print address as address:port
std::string TNetworkAddressToString(const TNetworkAddress& address);
+/// Utility method to convert TNetworkAddress to Kudu sock addr.
+/// Note that 'address' has to contain a resolved IP address.
+Status TNetworkAddressToSockaddr(const TNetworkAddress& address,
+ kudu::Sockaddr* sockaddr);
+
/// Prints a hostport as ipaddress:port
std::ostream& operator<<(std::ostream& out, const TNetworkAddress& hostport);
@@ -71,4 +80,5 @@ std::ostream& operator<<(std::ostream& out, const TNetworkAddress& hostport);
/// a free ephemeral port can't be found after 100 tries. If 'used_ports' is non-NULL,
/// does not select those ports and adds the selected port to 'used_ports'.
int FindUnusedEphemeralPort(std::vector<int>* used_ports);
-}
+
+} // namespace impala
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/dd4c6be8/cmake_modules/FindKRPC.cmake
----------------------------------------------------------------------
diff --git a/cmake_modules/FindKRPC.cmake b/cmake_modules/FindKRPC.cmake
index 593edcd..1547c20 100644
--- a/cmake_modules/FindKRPC.cmake
+++ b/cmake_modules/FindKRPC.cmake
@@ -101,7 +101,9 @@ function(KRPC_GENERATE SRCS HDRS TGTS)
# This custom target enforces that there's just one invocation of protoc
# when there are multiple consumers of the generated files. The target name
# must be unique; adding parts of the filename helps ensure this.
- set(TGT_NAME ${REL_DIR}${FIL})
+ # Adding the prefix "KRPC_" to avoid conflation with the input proto file
+ # when ninja is used. Otherwise, ninja will flag a false circular dependency.
+ set(TGT_NAME KRPC_${REL_DIR}${FIL})
string(REPLACE "/" "-" TGT_NAME ${TGT_NAME})
add_custom_target(${TGT_NAME}
DEPENDS "${SERVICE_CC}" "${SERVICE_H}"
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/dd4c6be8/common/protobuf/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/common/protobuf/CMakeLists.txt b/common/protobuf/CMakeLists.txt
new file mode 100644
index 0000000..4d5f121
--- /dev/null
+++ b/common/protobuf/CMakeLists.txt
@@ -0,0 +1,31 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+
+cmake_minimum_required(VERSION 2.6)
+
+set(PROTOBUF_OUTPUT_DIR ${CMAKE_SOURCE_DIR}/be/generated-sources/gen-cpp/)
+
+KRPC_GENERATE(RPC_TEST_PROTO_SRCS RPC_TEST_PROTO_HDRS
+ RPC_TEST_PROTO_TGTS
+ SOURCE_ROOT ${CMAKE_CURRENT_SOURCE_DIR}
+ BINARY_ROOT ${PROTOBUF_OUTPUT_DIR}
+ PROTO_FILES rpc_test.proto)
+add_custom_target(rpc_test_proto_tgt DEPENDS ${RPC_TEST_PROTO_TGTS})
+set(RPC_TEST_PROTO_SRCS ${RPC_TEST_PROTO_SRCS} PARENT_SCOPE)
+
+add_custom_target(proto-deps ALL DEPENDS token_proto rpc_header_proto)
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/dd4c6be8/common/protobuf/rpc_test.proto
----------------------------------------------------------------------
diff --git a/common/protobuf/rpc_test.proto b/common/protobuf/rpc_test.proto
new file mode 100644
index 0000000..fd22331
--- /dev/null
+++ b/common/protobuf/rpc_test.proto
@@ -0,0 +1,42 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+package impala;
+
+// Definitions for the services used by rpc-mgr-test.
+message PingRequestPB {
+}
+
+message PingResponsePB {
+ required int32 int_response = 1;
+}
+
+service PingService {
+ rpc Ping(PingRequestPB) returns (PingResponsePB);
+}
+
+message ScanMemRequestPB {
+ required int32 pattern = 1;
+ required int32 sidecar_idx = 2;
+}
+
+message ScanMemResponsePB {
+}
+
+service ScanMemService {
+ rpc ScanMem(ScanMemRequestPB) returns (ScanMemResponsePB);
+}