You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by mi...@apache.org on 2017/10/06 15:31:57 UTC

[2/2] incubator-impala git commit: IMPALA-4670: Introduces RpcMgr class

IMPALA-4670: Introduces RpcMgr class

This patch introduces a new class, RpcMgr which is the abstraction
layer around KRPC core mechanics. It provides an interface
RegisterService() for various services to register themselves.

Kudu RPC is invoked via an auto-generated interface called proxy.
This change implements an inline wrapper for KRPC client to obtain
a proxy for a particular service exported by remote server.

Last but not least, the RpcMgr will start all registered services
if FLAGS_use_krpc is true. This patch hasn't yet added any service
except for some test services in rpc-mgr-test.

This patch is based on an abandoned patch by Henry Robinson.

Testing done: a new backend test is added to exercise the code
and demonstrate the way to interact with KRPC framework.

Change-Id: I8adb10ae375d7bf945394c38a520f12d29cf7b46
Reviewed-on: http://gerrit.cloudera.org:8080/7901
Reviewed-by: Michael Ho <kw...@cloudera.com>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/dd4c6be8
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/dd4c6be8
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/dd4c6be8

Branch: refs/heads/master
Commit: dd4c6be8e082d4dd42e099e9a03ed23fdae9c13b
Parents: e6594bf
Author: Michael Ho <kw...@cloudera.com>
Authored: Sun Aug 20 13:53:34 2017 -0700
Committer: Impala Public Jenkins <im...@gerrit.cloudera.org>
Committed: Fri Oct 6 07:09:55 2017 +0000

----------------------------------------------------------------------
 CMakeLists.txt                           |   3 +-
 be/src/exec/kudu-util.h                  |  13 ++
 be/src/rpc/CMakeLists.txt                |  12 ++
 be/src/rpc/rpc-mgr-test.cc               | 251 ++++++++++++++++++++++++++
 be/src/rpc/rpc-mgr.cc                    | 115 ++++++++++++
 be/src/rpc/rpc-mgr.h                     | 181 +++++++++++++++++++
 be/src/rpc/rpc-mgr.inline.h              |  45 +++++
 be/src/runtime/exec-env.cc               |  23 ++-
 be/src/runtime/exec-env.h                |  22 ++-
 be/src/scheduling/scheduler-test-util.cc |  12 +-
 be/src/scheduling/scheduler.cc           |  27 +--
 be/src/scheduling/scheduler.h            |   8 +-
 be/src/service/impala-server.cc          |  16 +-
 be/src/util/counting-barrier.h           |   3 +
 be/src/util/network-util.cc              |  11 ++
 be/src/util/network-util.h               |  12 +-
 cmake_modules/FindKRPC.cmake             |   4 +-
 common/protobuf/CMakeLists.txt           |  31 ++++
 common/protobuf/rpc_test.proto           |  42 +++++
 19 files changed, 780 insertions(+), 51 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/dd4c6be8/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/CMakeLists.txt b/CMakeLists.txt
index d60487f..865f7b2 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -347,12 +347,13 @@ add_subdirectory(common/function-registry)
 add_subdirectory(common/thrift)
 add_subdirectory(common/fbs)
 add_subdirectory(common/yarn-extras)
+add_subdirectory(common/protobuf)
 add_subdirectory(be)
 add_subdirectory(fe)
 add_subdirectory(ext-data-source)
 
 # Build target for all generated files which most backend code depends on
-add_custom_target(gen-deps ALL DEPENDS thrift-deps)
+add_custom_target(gen-deps ALL DEPENDS thrift-deps proto-deps)
 
 add_custom_target(tarballs ALL DEPENDS shell_tarball)
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/dd4c6be8/be/src/exec/kudu-util.h
----------------------------------------------------------------------
diff --git a/be/src/exec/kudu-util.h b/be/src/exec/kudu-util.h
index 28c6b27..6113401 100644
--- a/be/src/exec/kudu-util.h
+++ b/be/src/exec/kudu-util.h
@@ -81,5 +81,18 @@ Status WriteKuduValue(int col, PrimitiveType type, const void* value,
 /// Takes a Kudu client DataType and returns the corresponding Impala ColumnType.
 ColumnType KuduDataTypeToColumnType(kudu::client::KuduColumnSchema::DataType type);
 
+/// Utility function for creating an Impala Status object based on a kudu::Status object.
+/// 'k_status' is the kudu::Status object.
+/// 'prepend' is a string to be prepended to details of 'k_status' when creating the
+/// Impala Status object.
+/// Note that we don't translate the kudu::Status error code to Impala error code
+/// so the returned status' type is always of TErrorCode::GENERAL.
+inline Status FromKuduStatus(
+    const kudu::Status& k_status, const std::string prepend = "") {
+  if (LIKELY(k_status.ok())) return Status::OK();
+  if (prepend.empty()) return Status(k_status.ToString());
+  return Status(strings::Substitute("$0: $1", prepend, k_status.ToString()));
+}
+
 } /// namespace impala
 #endif

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/dd4c6be8/be/src/rpc/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/be/src/rpc/CMakeLists.txt b/be/src/rpc/CMakeLists.txt
index d837f6c..95be43f 100644
--- a/be/src/rpc/CMakeLists.txt
+++ b/be/src/rpc/CMakeLists.txt
@@ -21,8 +21,12 @@ set(LIBRARY_OUTPUT_PATH "${BUILD_OUTPUT_ROOT_DIRECTORY}/rpc")
 # where to put generated binaries
 set(EXECUTABLE_OUTPUT_PATH "${BUILD_OUTPUT_ROOT_DIRECTORY}/rpc")
 
+# Mark the protobuf files as generated
+set_source_files_properties(${RPC_TEST_PROTO_SRCS} PROPERTIES GENERATED TRUE)
+
 add_library(Rpc
   authentication.cc
+  rpc-mgr.cc
   rpc-trace.cc
   TAcceptQueueServer.cpp
   thrift-util.cc
@@ -35,3 +39,11 @@ add_dependencies(Rpc gen-deps)
 ADD_BE_TEST(thrift-util-test)
 ADD_BE_TEST(thrift-server-test)
 ADD_BE_TEST(authentication-test)
+
+ADD_BE_TEST(rpc-mgr-test)
+add_dependencies(rpc-mgr-test rpc_test_proto)
+target_link_libraries(rpc-mgr-test rpc_test_proto)
+
+add_library(rpc_test_proto ${RPC_TEST_PROTO_SRCS})
+add_dependencies(rpc_test_proto rpc_test_proto_tgt krpc)
+target_link_libraries(rpc_test_proto krpc)

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/dd4c6be8/be/src/rpc/rpc-mgr-test.cc
----------------------------------------------------------------------
diff --git a/be/src/rpc/rpc-mgr-test.cc b/be/src/rpc/rpc-mgr-test.cc
new file mode 100644
index 0000000..3eb0d92
--- /dev/null
+++ b/be/src/rpc/rpc-mgr-test.cc
@@ -0,0 +1,251 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "rpc/rpc-mgr.inline.h"
+
+#include "common/init.h"
+#include "exec/kudu-util.h"
+#include "kudu/rpc/rpc_context.h"
+#include "kudu/rpc/rpc_controller.h"
+#include "kudu/rpc/rpc_header.pb.h"
+#include "kudu/rpc/rpc_sidecar.h"
+#include "kudu/util/monotime.h"
+#include "kudu/util/status.h"
+#include "testutil/gtest-util.h"
+#include "util/counting-barrier.h"
+#include "util/network-util.h"
+#include "util/test-info.h"
+
+#include "gen-cpp/rpc_test.proxy.h"
+#include "gen-cpp/rpc_test.service.h"
+
+#include "common/names.h"
+
+using kudu::rpc::ErrorStatusPB;
+using kudu::rpc::ServiceIf;
+using kudu::rpc::RpcController;
+using kudu::rpc::RpcContext;
+using kudu::rpc::RpcSidecar;
+using kudu::MonoDelta;
+using kudu::Slice;
+
+using namespace std;
+
+DECLARE_int32(num_reactor_threads);
+DECLARE_int32(num_acceptor_threads);
+DECLARE_string(hostname);
+
+namespace impala {
+
+static int32_t SERVICE_PORT = FindUnusedEphemeralPort(nullptr);
+
+#define PAYLOAD_SIZE (4096)
+
+class RpcMgrTest : public testing::Test {
+ protected:
+  TNetworkAddress krpc_address_;
+  RpcMgr rpc_mgr_;
+
+  virtual void SetUp() {
+    IpAddr ip;
+    ASSERT_OK(HostnameToIpAddr(FLAGS_hostname, &ip));
+    krpc_address_ = MakeNetworkAddress(ip, SERVICE_PORT);
+    ASSERT_OK(rpc_mgr_.Init());
+  }
+
+  virtual void TearDown() {
+    rpc_mgr_.Shutdown();
+  }
+
+  // Utility function to initialize the parameter for ScanMem RPC.
+  // Picks a random value and fills 'payload_' with it. Adds 'payload_' as a sidecar
+  // to 'controller'. Also sets up 'request' with the random value and index of the
+  // sidecar.
+  void SetupScanMemRequest(ScanMemRequestPB* request, RpcController* controller) {
+    int32_t pattern = random();
+    for (int i = 0; i < PAYLOAD_SIZE / sizeof(int32_t); ++i) payload_[i] = pattern;
+    int idx;
+    Slice slice(reinterpret_cast<const uint8_t*>(payload_), PAYLOAD_SIZE);
+    controller->AddOutboundSidecar(RpcSidecar::FromSlice(slice), &idx);
+    request->set_pattern(pattern);
+    request->set_sidecar_idx(idx);
+  }
+
+ private:
+  int32_t payload_[PAYLOAD_SIZE];
+};
+
+typedef std::function<void(RpcContext*)> ServiceCB;
+
+class PingServiceImpl : public PingServiceIf {
+ public:
+  // 'cb' is a callback used by tests to inject custom behaviour into the RPC handler.
+  PingServiceImpl(const scoped_refptr<kudu::MetricEntity>& entity,
+      const scoped_refptr<kudu::rpc::ResultTracker> tracker,
+      ServiceCB cb = [](RpcContext* ctx) { ctx->RespondSuccess(); })
+    : PingServiceIf(entity, tracker), cb_(cb) {}
+
+  virtual void Ping(
+      const PingRequestPB* request, PingResponsePB* response, RpcContext* context) {
+    response->set_int_response(42);
+    cb_(context);
+  }
+
+ private:
+  ServiceCB cb_;
+};
+
+class ScanMemServiceImpl : public ScanMemServiceIf {
+ public:
+  ScanMemServiceImpl(const scoped_refptr<kudu::MetricEntity>& entity,
+      const scoped_refptr<kudu::rpc::ResultTracker> tracker)
+    : ScanMemServiceIf(entity, tracker) {
+  }
+
+  // The request comes with an int 'pattern' and a payload of int array sent with
+  // sidecar. Scan the array to make sure every element matches 'pattern'.
+  virtual void ScanMem(const ScanMemRequestPB* request, ScanMemResponsePB* response,
+      RpcContext* context) {
+    int32_t pattern = request->pattern();
+    Slice payload;
+    ASSERT_OK(
+        FromKuduStatus(context->GetInboundSidecar(request->sidecar_idx(), &payload)));
+    ASSERT_EQ(payload.size() % sizeof(int32_t), 0);
+
+    const int32_t* v = reinterpret_cast<const int32_t*>(payload.data());
+    for (int i = 0; i < payload.size() / sizeof(int32_t); ++i) {
+      int32_t val = v[i];
+      if (val != pattern) {
+        context->RespondFailure(kudu::Status::Corruption(
+            Substitute("Expecting $1; Found $2", pattern, val)));
+        return;
+      }
+    }
+    context->RespondSuccess();
+  }
+};
+
+TEST_F(RpcMgrTest, MultipleServices) {
+  // Test that a service can be started, and will respond to requests.
+  unique_ptr<ServiceIf> ping_impl(
+      new PingServiceImpl(rpc_mgr_.metric_entity(), rpc_mgr_.result_tracker()));
+  ASSERT_OK(rpc_mgr_.RegisterService(10, 10, move(ping_impl)));
+
+  // Test that a second service, that verifies the RPC payload is not corrupted,
+  // can be started.
+  unique_ptr<ServiceIf> scan_mem_impl(
+      new ScanMemServiceImpl(rpc_mgr_.metric_entity(), rpc_mgr_.result_tracker()));
+  ASSERT_OK(rpc_mgr_.RegisterService(10, 10, move(scan_mem_impl)));
+
+  FLAGS_num_acceptor_threads = 2;
+  FLAGS_num_reactor_threads = 10;
+  ASSERT_OK(rpc_mgr_.StartServices(krpc_address_));
+
+  unique_ptr<PingServiceProxy> ping_proxy;
+  ASSERT_OK(rpc_mgr_.GetProxy<PingServiceProxy>(krpc_address_, &ping_proxy));
+
+  unique_ptr<ScanMemServiceProxy> scan_mem_proxy;
+  ASSERT_OK(rpc_mgr_.GetProxy<ScanMemServiceProxy>(krpc_address_, &scan_mem_proxy));
+
+  RpcController controller;
+  srand(0);
+  // Randomly invoke either services to make sure a RpcMgr can host multiple
+  // services at the same time.
+  for (int i = 0; i < 100; ++i) {
+    controller.Reset();
+    if (random() % 2 == 0) {
+      PingRequestPB request;
+      PingResponsePB response;
+      kudu::Status status = ping_proxy->Ping(request, &response, &controller);
+      ASSERT_TRUE(status.ok());
+      ASSERT_EQ(response.int_response(), 42);
+    } else {
+      ScanMemRequestPB request;
+      ScanMemResponsePB response;
+      SetupScanMemRequest(&request, &controller);
+      kudu::Status status = scan_mem_proxy->ScanMem(request, &response, &controller);
+      ASSERT_TRUE(status.ok());
+    }
+  }
+}
+
+TEST_F(RpcMgrTest, SlowCallback) {
+
+  // Use a callback which is slow to respond.
+  auto slow_cb = [](RpcContext* ctx) {
+    SleepForMs(300);
+    ctx->RespondSuccess();
+  };
+
+  // Test a service which is slow to respond and has a short queue.
+  // Set a timeout on the client side. Expect either a client timeout
+  // or the service queue filling up.
+  unique_ptr<ServiceIf> impl(
+      new PingServiceImpl(rpc_mgr_.metric_entity(), rpc_mgr_.result_tracker(), slow_cb));
+  const int num_service_threads = 1;
+  const int queue_size = 3;
+  ASSERT_OK(rpc_mgr_.RegisterService(num_service_threads, queue_size, move(impl)));
+
+  FLAGS_num_acceptor_threads = 2;
+  FLAGS_num_reactor_threads = 10;
+  ASSERT_OK(rpc_mgr_.StartServices(krpc_address_));
+
+  unique_ptr<PingServiceProxy> proxy;
+  ASSERT_OK(rpc_mgr_.GetProxy<PingServiceProxy>(krpc_address_, &proxy));
+
+  PingRequestPB request;
+  PingResponsePB response;
+  RpcController controller;
+  for (int i = 0; i < 100; ++i) {
+    controller.Reset();
+    controller.set_timeout(MonoDelta::FromMilliseconds(50));
+    kudu::Status status = proxy->Ping(request, &response, &controller);
+    ASSERT_TRUE(status.IsTimedOut() || RpcMgr::IsServerTooBusy(controller));
+  }
+}
+
+TEST_F(RpcMgrTest, AsyncCall) {
+  unique_ptr<ServiceIf> scan_mem_impl(
+      new ScanMemServiceImpl(rpc_mgr_.metric_entity(), rpc_mgr_.result_tracker()));
+  ASSERT_OK(rpc_mgr_.RegisterService(10, 10, move(scan_mem_impl)));
+
+  unique_ptr<ScanMemServiceProxy> scan_mem_proxy;
+  ASSERT_OK(rpc_mgr_.GetProxy<ScanMemServiceProxy>(krpc_address_, &scan_mem_proxy));
+
+  FLAGS_num_acceptor_threads = 2;
+  FLAGS_num_reactor_threads = 10;
+  ASSERT_OK(rpc_mgr_.StartServices(krpc_address_));
+
+  RpcController controller;
+  srand(0);
+  for (int i = 0; i < 10; ++i) {
+    controller.Reset();
+    ScanMemRequestPB request;
+    ScanMemResponsePB response;
+    SetupScanMemRequest(&request, &controller);
+    CountingBarrier barrier(1);
+    scan_mem_proxy->ScanMemAsync(request, &response, &controller,
+        [barrier_ptr = &barrier]() { barrier_ptr->Notify(); });
+    // TODO: Inject random cancellation here.
+    barrier.Wait();
+    ASSERT_TRUE(controller.status().ok()) << controller.status().ToString();
+  }
+}
+
+} // namespace impala
+
+IMPALA_TEST_MAIN();

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/dd4c6be8/be/src/rpc/rpc-mgr.cc
----------------------------------------------------------------------
diff --git a/be/src/rpc/rpc-mgr.cc b/be/src/rpc/rpc-mgr.cc
new file mode 100644
index 0000000..f6491be
--- /dev/null
+++ b/be/src/rpc/rpc-mgr.cc
@@ -0,0 +1,115 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "rpc/rpc-mgr.h"
+
+#include "exec/kudu-util.h"
+#include "kudu/rpc/acceptor_pool.h"
+#include "kudu/rpc/rpc_controller.h"
+#include "kudu/rpc/service_if.h"
+#include "kudu/util/net/net_util.h"
+#include "util/cpu-info.h"
+#include "util/network-util.h"
+
+#include "common/names.h"
+
+using kudu::rpc::MessengerBuilder;
+using kudu::rpc::Messenger;
+using kudu::rpc::AcceptorPool;
+using kudu::rpc::RpcController;
+using kudu::rpc::ServiceIf;
+using kudu::rpc::ServicePool;
+using kudu::Sockaddr;
+using kudu::HostPort;
+using kudu::MetricEntity;
+
+DECLARE_string(hostname);
+
+DEFINE_int32(num_acceptor_threads, 2,
+    "Number of threads dedicated to accepting connection requests for RPC services");
+DEFINE_int32(num_reactor_threads, 0,
+    "Number of threads dedicated to managing network IO for RPC services. If left at "
+    "default value 0, it will be set to number of CPU cores.");
+
+namespace impala {
+
+Status RpcMgr::Init() {
+  MessengerBuilder bld("impala-server");
+  const scoped_refptr<MetricEntity> entity(
+      METRIC_ENTITY_server.Instantiate(&registry_, "krpc-metrics"));
+  int num_reactor_threads =
+      FLAGS_num_reactor_threads > 0 ? FLAGS_num_reactor_threads : CpuInfo::num_cores();
+  bld.set_num_reactors(num_reactor_threads).set_metric_entity(entity);
+  KUDU_RETURN_IF_ERROR(bld.Build(&messenger_), "Could not build messenger");
+  return Status::OK();
+}
+
+Status RpcMgr::RegisterService(int32_t num_service_threads, int32_t service_queue_depth,
+    unique_ptr<ServiceIf> service_ptr) {
+  DCHECK(is_inited()) << "Must call Init() before RegisterService()";
+  DCHECK(!services_started_) << "Cannot call RegisterService() after StartServices()";
+  scoped_refptr<ServicePool> service_pool =
+      new ServicePool(gscoped_ptr<ServiceIf>(service_ptr.release()),
+          messenger_->metric_entity(), service_queue_depth);
+  // Start the thread pool first before registering the service in case the startup fails.
+  KUDU_RETURN_IF_ERROR(
+      service_pool->Init(num_service_threads), "Service pool failed to start");
+  KUDU_RETURN_IF_ERROR(
+      messenger_->RegisterService(service_pool->service_name(), service_pool),
+      "Could not register service");
+  service_pools_.push_back(service_pool);
+
+  return Status::OK();
+}
+
+Status RpcMgr::StartServices(const TNetworkAddress& address) {
+  DCHECK(is_inited()) << "Must call Init() before StartServices()";
+  DCHECK(!services_started_) << "May not call StartServices() twice";
+
+  // Convert 'address' to Kudu's Sockaddr
+  DCHECK(IsResolvedAddress(address));
+  Sockaddr sockaddr;
+  RETURN_IF_ERROR(TNetworkAddressToSockaddr(address, &sockaddr));
+
+  // Call the messenger to create an AcceptorPool for us.
+  shared_ptr<AcceptorPool> acceptor_pool;
+  KUDU_RETURN_IF_ERROR(messenger_->AddAcceptorPool(sockaddr, &acceptor_pool),
+      "Failed to add acceptor pool");
+  KUDU_RETURN_IF_ERROR(acceptor_pool->Start(FLAGS_num_acceptor_threads),
+      "Acceptor pool failed to start");
+  VLOG_QUERY << "Started " << FLAGS_num_acceptor_threads << " acceptor threads";
+  services_started_ = true;
+  return Status::OK();
+}
+
+void RpcMgr::Shutdown() {
+  if (messenger_.get() == nullptr) return;
+  for (auto service_pool : service_pools_) service_pool->Shutdown();
+
+  messenger_->UnregisterAllServices();
+  messenger_->Shutdown();
+  service_pools_.clear();
+}
+
+bool RpcMgr::IsServerTooBusy(const RpcController& rpc_controller) {
+  const kudu::Status status = rpc_controller.status();
+  const kudu::rpc::ErrorStatusPB* err = rpc_controller.error_response();
+  return status.IsRemoteError() && err != nullptr && err->has_code() &&
+      err->code() == kudu::rpc::ErrorStatusPB::ERROR_SERVER_TOO_BUSY;
+}
+
+} // namespace impala

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/dd4c6be8/be/src/rpc/rpc-mgr.h
----------------------------------------------------------------------
diff --git a/be/src/rpc/rpc-mgr.h b/be/src/rpc/rpc-mgr.h
new file mode 100644
index 0000000..d414bb6
--- /dev/null
+++ b/be/src/rpc/rpc-mgr.h
@@ -0,0 +1,181 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#ifndef IMPALA_RPC_RPC_MGR_H
+#define IMPALA_RPC_RPC_MGR_H
+
+#include "common/status.h"
+#include "kudu/rpc/messenger.h"
+#include "kudu/rpc/result_tracker.h"
+#include "kudu/rpc/service_pool.h"
+#include "kudu/util/metrics.h"
+
+#include "gen-cpp/Types_types.h"
+
+namespace kudu {
+namespace rpc {
+class RpcController;
+class ServiceIf;
+} // rpc
+} // kudu
+
+namespace impala {
+
+/// Singleton class which manages all KRPC services and proxies.
+///
+/// SERVICES
+/// --------
+///
+/// An RpcMgr manages 0 or more services: RPC interfaces that are a collection of remotely
+/// accessible methods. A new service is registered by calling RegisterService(). All
+/// services are served on the same port; the underlying RPC layer takes care of
+/// de-multiplexing RPC calls to their respective endpoints.
+///
+/// Services are made available to remote clients when RpcMgr::StartServices() is called;
+/// before this method no service method will be called.
+///
+/// Services may only be registered and started after RpcMgr::Init() is called.
+///
+/// PROXIES
+/// -------
+///
+/// A proxy is a client-side interface to a remote service. Remote methods exported by
+/// that service may be called through a proxy as though they were local methods.
+///
+/// A proxy can be obtained by calling GetProxy(). Proxies implement local methods which
+/// call remote service methods, e.g. proxy->Foo(request, &response) will call the Foo()
+/// service method on the service that 'proxy' points to.
+///
+/// Proxies may only be created after RpcMgr::Init() is called.
+///
+/// For example usage of proxies, please see rpc-mgr-test.cc
+///
+/// LIFECYCLE
+/// ---------
+///
+/// RpcMgr resides inside the singleton ExecEnv class.
+///
+/// Before any proxy or service interactions, RpcMgr::Init() must be called exactly once
+/// to start the reactor threads that service network events. Services must be registered
+/// with RpcMgr::RegisterService() before RpcMgr::StartServices() is called. When shutting
+/// down, RpcMgr::Shutdown() must be called to ensure that all services are cleanly
+/// terminated. RpcMgr::Init() and RpcMgr::Shutdown() are not thread safe.
+///
+/// KRPC INTERNALS
+/// --------------
+///
+/// Each service and proxy interacts with the network via a shared pool of 'reactor'
+/// threads which respond to incoming and outgoing RPC events. The number of 'reactor'
+/// threads are configurable via FLAGS_reactor_thread. By default, it's set to the number
+/// of cpu cores. Incoming events are passed immediately to one of two thread pools: new
+/// connections are handled by an 'acceptor' pool, and RPC request events are handled by
+/// a per-service 'service' pool. The size of a 'service' pool is specified when calling
+/// RegisterService().
+///
+/// All incoming RPC requests are placed into a per-service pool's fixed-size queue.
+/// The service threads will dequeue from this queue and process the requests. If the
+/// queue becomes full, the RPC will fail at the caller. The function IsServerTooBusy()
+/// below will return true for this case. The size of the queue is specified when calling
+/// RegisterService().
+///
+/// Inbound connection set-up is handled by a small fixed-size pool of 'acceptor'
+/// threads. The number of threads that accept new TCP connection requests to the service
+/// port is configurable via FLAGS_acceptor_threads.
+class RpcMgr {
+ public:
+  /// Initializes the reactor threads, and prepares for sending outbound RPC requests.
+  Status Init() WARN_UNUSED_RESULT;
+
+  bool is_inited() const { return messenger_.get() != nullptr; }
+
+  /// Start the acceptor threads which listen on 'address', making KRPC services
+  /// available. 'address' has to be a resolved IP address. Before this method is called,
+  /// remote clients will get a 'connection refused' error when trying to invoke an RPC
+  /// on this host.
+  Status StartServices(const TNetworkAddress& address) WARN_UNUSED_RESULT;
+
+  /// Register a new service.
+  ///
+  /// 'num_service_threads' is the number of threads that should be started to execute RPC
+  /// handlers for the new service.
+  ///
+  /// 'service_queue_depth' is the maximum number of requests that may be queued for this
+  /// service before clients begin to see rejection errors.
+  ///
+  /// 'service_ptr' contains an interface implementation that will handle RPCs. Note that
+  /// the service name has to be unique within an Impala instance or the registration will
+  /// fail.
+  ///
+  /// It is an error to call this after StartServices() has been called.
+  Status RegisterService(int32_t num_service_threads, int32_t service_queue_depth,
+      std::unique_ptr<kudu::rpc::ServiceIf> service_ptr) WARN_UNUSED_RESULT;
+
+  /// Creates a new proxy for a remote service of type P at location 'address', and places
+  /// it in 'proxy'. 'P' must descend from kudu::rpc::ServiceIf. Note that 'address' must
+  /// be a resolved IP address.
+  template <typename P>
+  Status GetProxy(const TNetworkAddress& address, std::unique_ptr<P>* proxy)
+      WARN_UNUSED_RESULT;
+
+  /// Shut down all previously registered services. All service pools are shut down.
+  /// All acceptor and reactor threads within the messenger are also shut down.
+  /// All unprocessed incoming requests will be replied with error messages.
+  void Shutdown();
+
+  /// Returns true if the last RPC of 'rpc_controller' failed because the remote
+  /// service's queue filled up and couldn't accept more incoming requests.
+  /// 'rpc_controller' should contain the status of the last RPC call.
+  static bool IsServerTooBusy(const kudu::rpc::RpcController& rpc_controller);
+
+  const scoped_refptr<kudu::rpc::ResultTracker> result_tracker() const {
+    return tracker_;
+  }
+
+  scoped_refptr<kudu::MetricEntity> metric_entity() const {
+    return messenger_->metric_entity();
+  }
+
+  ~RpcMgr() {
+    DCHECK_EQ(service_pools_.size(), 0)
+        << "Must call Shutdown() before destroying RpcMgr";
+  }
+
+ private:
+  /// One pool per registered service. scoped_refptr<> is dictated by the Kudu interface.
+  std::vector<scoped_refptr<kudu::rpc::ServicePool>> service_pools_;
+
+  /// Required Kudu boilerplate for constructing the MetricEntity passed
+  /// to c'tor of ServiceIf when creating a service.
+  /// TODO(KRPC): Integrate with Impala MetricGroup.
+  kudu::MetricRegistry registry_;
+
+  /// Used when creating a new service. Shared across all services which don't really
+  /// track results for idempotent RPC calls.
+  const scoped_refptr<kudu::rpc::ResultTracker> tracker_;
+
+  /// Container for reactor threads which run event loops for RPC services, plus acceptor
+  /// threads which manage connection setup. Has to be a shared_ptr as required by
+  /// MessangerBuilder::Build().
+  std::shared_ptr<kudu::rpc::Messenger> messenger_;
+
+  /// True after StartServices() completes.
+  bool services_started_ = false;
+};
+
+} // namespace impala
+
+#endif

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/dd4c6be8/be/src/rpc/rpc-mgr.inline.h
----------------------------------------------------------------------
diff --git a/be/src/rpc/rpc-mgr.inline.h b/be/src/rpc/rpc-mgr.inline.h
new file mode 100644
index 0000000..474ac45
--- /dev/null
+++ b/be/src/rpc/rpc-mgr.inline.h
@@ -0,0 +1,45 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#ifndef IMPALA_RPC_RPC_MGR_INLINE_H
+#define IMPALA_RPC_RPC_MGR_INLINE_H
+
+#include "rpc/rpc-mgr.h"
+
+#include "exec/kudu-util.h"
+#include "kudu/rpc/messenger.h"
+#include "kudu/rpc/rpc_header.pb.h"
+#include "kudu/rpc/service_pool.h"
+#include "util/network-util.h"
+
+namespace impala {
+
+/// Always inline to avoid having to provide a definition for each use type P.
+template <typename P>
+Status RpcMgr::GetProxy(const TNetworkAddress& address, std::unique_ptr<P>* proxy) {
+  DCHECK(proxy != nullptr);
+  DCHECK(is_inited()) << "Must call Init() before GetProxy()";
+  DCHECK(IsResolvedAddress(address));
+  kudu::Sockaddr sockaddr;
+  RETURN_IF_ERROR(TNetworkAddressToSockaddr(address, &sockaddr));
+  proxy->reset(new P(messenger_, sockaddr, address.hostname));
+  return Status::OK();
+}
+
+} // namespace impala
+
+#endif

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/dd4c6be8/be/src/runtime/exec-env.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/exec-env.cc b/be/src/runtime/exec-env.cc
index 8942007..94d2ca6 100644
--- a/be/src/runtime/exec-env.cc
+++ b/be/src/runtime/exec-env.cc
@@ -28,6 +28,7 @@
 #include "common/object-pool.h"
 #include "exec/kudu-util.h"
 #include "gen-cpp/ImpalaInternalService.h"
+#include "rpc/rpc-mgr.h"
 #include "runtime/backend-client.h"
 #include "runtime/bufferpool/buffer-pool.h"
 #include "runtime/bufferpool/reservation-tracker.h"
@@ -168,10 +169,12 @@ ExecEnv::ExecEnv(const string& hostname, int backend_port, int krpc_port,
     async_rpc_pool_(new CallableThreadPool("rpc-pool", "async-rpc-sender", 8, 10000)),
     query_exec_mgr_(new QueryExecMgr()),
     enable_webserver_(FLAGS_enable_webserver && webserver_port > 0),
-    backend_address_(MakeNetworkAddress(hostname, backend_port)),
-    krpc_port_(krpc_port) {
+    backend_address_(MakeNetworkAddress(hostname, backend_port)) {
 
   if (FLAGS_use_krpc) {
+    // KRPC relies on resolved IP address. It's set in StartServices().
+    krpc_address_.__set_port(krpc_port);
+    rpc_mgr_.reset(new RpcMgr());
     stream_mgr_.reset(new KrpcDataStreamMgr(metrics_.get()));
   } else {
     stream_mgr_.reset(new DataStreamMgr(metrics_.get()));
@@ -204,6 +207,7 @@ ExecEnv::ExecEnv(const string& hostname, int backend_port, int krpc_port,
 
 ExecEnv::~ExecEnv() {
   if (buffer_reservation_ != nullptr) buffer_reservation_->Close();
+  if (rpc_mgr_ != nullptr) rpc_mgr_->Shutdown();
   disk_io_mgr_.reset(); // Need to tear down before mem_tracker_.
 }
 
@@ -281,6 +285,15 @@ Status ExecEnv::Init() {
   RETURN_IF_ERROR(RegisterMemoryMetrics(
       metrics_.get(), true, buffer_reservation_.get(), buffer_pool_.get()));
 
+  // Resolve hostname to IP address.
+  RETURN_IF_ERROR(HostnameToIpAddr(backend_address_.hostname, &ip_address_));
+
+  // Initialize the RPCMgr before allowing services registration.
+  if (FLAGS_use_krpc) {
+    krpc_address_.__set_hostname(ip_address_);
+    RETURN_IF_ERROR(rpc_mgr_->Init());
+  }
+
   mem_tracker_.reset(
       new MemTracker(AggregateMemoryMetrics::TOTAL_USED, bytes_limit, "Process"));
   // Add BufferPool MemTrackers for cached memory that is not tracked against queries
@@ -334,7 +347,7 @@ Status ExecEnv::Init() {
   }
 
   if (scheduler_ != nullptr) {
-    RETURN_IF_ERROR(scheduler_->Init(backend_address_, krpc_port_));
+    RETURN_IF_ERROR(scheduler_->Init(backend_address_, krpc_address_, ip_address_));
   }
   if (admission_controller_ != nullptr) RETURN_IF_ERROR(admission_controller_->Init());
 
@@ -364,6 +377,8 @@ Status ExecEnv::StartServices() {
     }
   }
 
+  // Start this last so everything is in place before accepting the first call.
+  if (FLAGS_use_krpc) RETURN_IF_ERROR(rpc_mgr_->StartServices(krpc_address_));
   return Status::OK();
 }
 
@@ -402,4 +417,4 @@ KrpcDataStreamMgr* ExecEnv::KrpcStreamMgr() {
   return dynamic_cast<KrpcDataStreamMgr*>(stream_mgr_.get());
 }
 
-}
+} // namespace impala

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/dd4c6be8/be/src/runtime/exec-env.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/exec-env.h b/be/src/runtime/exec-env.h
index b8a271d..31532df 100644
--- a/be/src/runtime/exec-env.h
+++ b/be/src/runtime/exec-env.h
@@ -27,9 +27,14 @@
 #include "common/status.h"
 #include "runtime/client-cache-types.h"
 #include "util/hdfs-bulk-ops-defs.h" // For declaration of HdfsOpThreadPool
+#include "util/network-util.h"
 #include "util/spinlock.h"
 
-namespace kudu { namespace client { class KuduClient; } }
+namespace kudu {
+namespace client {
+class KuduClient;
+} // namespace client
+} // namespace kudu
 
 namespace impala {
 
@@ -53,6 +58,7 @@ class ObjectPool;
 class QueryResourceMgr;
 class RequestPoolService;
 class ReservationTracker;
+class RpcMgr;
 class Scheduler;
 class StatestoreSubscriber;
 class ThreadResourceMgr;
@@ -131,7 +137,9 @@ class ExecEnv {
 
   const TNetworkAddress& backend_address() const { return backend_address_; }
 
-  int krpc_port() const { return krpc_port_; }
+  const IpAddr& ip_address() const { return ip_address_; }
+
+  const TNetworkAddress& krpc_address() const { return krpc_address_; }
 
   /// Initializes the exec env for running FE tests.
   Status InitForFeTests() WARN_UNUSED_RESULT;
@@ -173,6 +181,7 @@ class ExecEnv {
   boost::scoped_ptr<CallableThreadPool> exec_rpc_thread_pool_;
   boost::scoped_ptr<CallableThreadPool> async_rpc_pool_;
   boost::scoped_ptr<QueryExecMgr> query_exec_mgr_;
+  boost::scoped_ptr<RpcMgr> rpc_mgr_;
 
   /// Query-wide buffer pool and the root reservation tracker for the pool. The
   /// reservation limit is equal to the maximum capacity of the pool. Created in
@@ -191,11 +200,14 @@ class ExecEnv {
   static ExecEnv* exec_env_;
   bool is_fe_tests_ = false;
 
-  /// Address of the Impala backend server instance
+  /// Address of the thrift based ImpalaInternalService
   TNetworkAddress backend_address_;
 
-  /// Port number on which all KRPC-based services are exported.
-  int krpc_port_;
+  /// Resolved IP address of the host name.
+  IpAddr ip_address_;
+
+  /// Address of the KRPC-based ImpalaInternalService
+  TNetworkAddress krpc_address_;
 
   /// fs.defaultFs value set in core-site.xml
   std::string default_fs_;

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/dd4c6be8/be/src/scheduling/scheduler-test-util.cc
----------------------------------------------------------------------
diff --git a/be/src/scheduling/scheduler-test-util.cc b/be/src/scheduling/scheduler-test-util.cc
index 2d85a9a..05cfc42 100644
--- a/be/src/scheduling/scheduler-test-util.cc
+++ b/be/src/scheduling/scheduler-test-util.cc
@@ -20,7 +20,6 @@
 #include <boost/unordered_set.hpp>
 
 #include "common/names.h"
-#include "runtime/exec-env.h"
 #include "scheduling/scheduler.h"
 
 using namespace impala;
@@ -509,13 +508,14 @@ void SchedulerWrapper::InitializeScheduler() {
                                            << "hosts.";
   const Host& scheduler_host = plan_.cluster().hosts()[0];
   string scheduler_backend_id = scheduler_host.ip;
-  TNetworkAddress scheduler_backend_address;
-  scheduler_backend_address.hostname = scheduler_host.ip;
-  scheduler_backend_address.port = scheduler_host.be_port;
-
+  TNetworkAddress scheduler_backend_address =
+      MakeNetworkAddress(scheduler_host.ip, scheduler_host.be_port);
+  TNetworkAddress scheduler_krpc_address =
+      MakeNetworkAddress(scheduler_host.ip, FLAGS_krpc_port);
   scheduler_.reset(new Scheduler(nullptr, scheduler_backend_id,
       &metrics_, nullptr, nullptr));
-  const Status status = scheduler_->Init(scheduler_backend_address, FLAGS_krpc_port);
+  const Status status = scheduler_->Init(scheduler_backend_address,
+      scheduler_krpc_address, scheduler_host.ip);
   DCHECK(status.ok()) << "Scheduler init failed in test";
   // Initialize the scheduler backend maps.
   SendFullMembershipMap();

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/dd4c6be8/be/src/scheduling/scheduler.cc
----------------------------------------------------------------------
diff --git a/be/src/scheduling/scheduler.cc b/be/src/scheduling/scheduler.cc
index adac41f..5cf0f01 100644
--- a/be/src/scheduling/scheduler.cc
+++ b/be/src/scheduling/scheduler.cc
@@ -65,29 +65,20 @@ Scheduler::Scheduler(StatestoreSubscriber* subscriber, const string& backend_id,
     request_pool_service_(request_pool_service) {
 }
 
-Status Scheduler::Init(const TNetworkAddress& backend_address, int krpc_port) {
+Status Scheduler::Init(const TNetworkAddress& backend_address,
+    const TNetworkAddress& krpc_address, const IpAddr& ip) {
   LOG(INFO) << "Starting scheduler";
-
-  // Figure out what our IP address is, so that each subscriber doesn't have to resolve
-  // it on every heartbeat. KRPC also assumes that the address is resolved already.
-  // May as well do it up front to avoid frequent DNS requests.
   local_backend_descriptor_.address = backend_address;
-  IpAddr ip;
-  const Hostname& hostname = backend_address.hostname;
-  Status status = HostnameToIpAddr(hostname, &ip);
-  if (!status.ok()) {
-    VLOG(1) << status.GetDetail();
-    status.AddDetail("Scheduler failed to start");
-    return status;
-  }
-
+  // Store our IP address so that each subscriber doesn't have to resolve
+  // it on every heartbeat. May as well do it up front to avoid frequent DNS
+  // requests.
   local_backend_descriptor_.ip_address = ip;
   LOG(INFO) << "Scheduler using " << ip << " as IP address";
-
   if (FLAGS_use_krpc) {
-    // KRPC expects address to have been resolved already.
-    TNetworkAddress krpc_svc_addr = MakeNetworkAddress(ip, krpc_port);
-    local_backend_descriptor_.__set_krpc_address(krpc_svc_addr);
+    // KRPC relies on resolved IP address.
+    DCHECK(IsResolvedAddress(krpc_address));
+    DCHECK_EQ(krpc_address.hostname, ip);
+    local_backend_descriptor_.__set_krpc_address(krpc_address);
   }
 
   coord_only_backend_config_.AddBackend(local_backend_descriptor_);

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/dd4c6be8/be/src/scheduling/scheduler.h
----------------------------------------------------------------------
diff --git a/be/src/scheduling/scheduler.h b/be/src/scheduling/scheduler.h
index d1a22f8..2fe90b8 100644
--- a/be/src/scheduling/scheduler.h
+++ b/be/src/scheduling/scheduler.h
@@ -86,9 +86,11 @@ class Scheduler {
   /// decisions once this method returns. Register with the subscription manager if
   /// required. Also initializes the local backend descriptor. Returns error status
   /// on failure. 'backend_address' is the address of thrift based ImpalaInternalService
-  /// of this backend. 'krpc_port' is the port on which KRPC based ImpalaInternalService
-  /// is exported.
-  Status Init(const TNetworkAddress& backend_address, int krpc_port);
+  /// of this backend. If FLAGS_use_krpc is true, 'krpc_address' contains IP-address:port
+  /// on which KRPC based ImpalaInternalService is exported. 'ip' is the resolved
+  /// IP address of this backend.
+  Status Init(const TNetworkAddress& backend_address,
+      const TNetworkAddress& krpc_address, const IpAddr& ip);
 
   /// Populates given query schedule and assigns fragments to hosts based on scan
   /// ranges in the query exec request.

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/dd4c6be8/be/src/service/impala-server.cc
----------------------------------------------------------------------
diff --git a/be/src/service/impala-server.cc b/be/src/service/impala-server.cc
index d9d2629..fabc8fa 100644
--- a/be/src/service/impala-server.cc
+++ b/be/src/service/impala-server.cc
@@ -1629,18 +1629,10 @@ void ImpalaServer::AddLocalBackendToStatestore(
   local_backend_descriptor.__set_is_coordinator(FLAGS_is_coordinator);
   local_backend_descriptor.__set_is_executor(FLAGS_is_executor);
   local_backend_descriptor.__set_address(exec_env_->backend_address());
-  IpAddr ip;
-  const Hostname& hostname = local_backend_descriptor.address.hostname;
-  Status status = HostnameToIpAddr(hostname, &ip);
-  if (!status.ok()) {
-    // TODO: Should we do something about this failure?
-    LOG(WARNING) << "Failed to convert hostname " << hostname << " to IP address: "
-                 << status.GetDetail();
-    return;
-  }
-  local_backend_descriptor.ip_address = ip;
+  local_backend_descriptor.ip_address = exec_env_->ip_address();
   if (FLAGS_use_krpc) {
-    TNetworkAddress krpc_address = MakeNetworkAddress(ip, exec_env_->krpc_port());
+    const TNetworkAddress& krpc_address = exec_env_->krpc_address();
+    DCHECK(IsResolvedAddress(krpc_address));
     local_backend_descriptor.__set_krpc_address(krpc_address);
   }
   subscriber_topic_updates->emplace_back(TTopicDelta());
@@ -1650,7 +1642,7 @@ void ImpalaServer::AddLocalBackendToStatestore(
 
   TTopicItem& item = update.topic_entries.back();
   item.key = local_backend_id;
-  status = thrift_serializer_.Serialize(&local_backend_descriptor, &item.value);
+  Status status = thrift_serializer_.Serialize(&local_backend_descriptor, &item.value);
   if (!status.ok()) {
     LOG(WARNING) << "Failed to serialize Impala backend descriptor for statestore topic:"
                  << " " << status.GetDetail();

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/dd4c6be8/be/src/util/counting-barrier.h
----------------------------------------------------------------------
diff --git a/be/src/util/counting-barrier.h b/be/src/util/counting-barrier.h
index 76dbe35..49b0bde 100644
--- a/be/src/util/counting-barrier.h
+++ b/be/src/util/counting-barrier.h
@@ -18,6 +18,9 @@
 #ifndef IMPALA_UTIL_COUNTING_BARRIER_H
 #define IMPALA_UTIL_COUNTING_BARRIER_H
 
+#include "common/atomic.h"
+#include "util/promise.h"
+
 namespace impala {
 
 /// Allows clients to wait for the arrival of a fixed number of notifications before they

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/dd4c6be8/be/src/util/network-util.cc
----------------------------------------------------------------------
diff --git a/be/src/util/network-util.cc b/be/src/util/network-util.cc
index 1a9ce53..7a10965 100644
--- a/be/src/util/network-util.cc
+++ b/be/src/util/network-util.cc
@@ -28,6 +28,7 @@
 #include <vector>
 #include <boost/algorithm/string.hpp>
 
+#include "exec/kudu-util.h"
 #include "kudu/util/net/sockaddr.h"
 #include "util/debug-util.h"
 #include "util/error-util.h"
@@ -212,4 +213,14 @@ int FindUnusedEphemeralPort(vector<int>* used_ports) {
   close(sockfd);
   return -1;
 }
+
+Status TNetworkAddressToSockaddr(const TNetworkAddress& address,
+    kudu::Sockaddr* sockaddr) {
+  DCHECK(IsResolvedAddress(address));
+  KUDU_RETURN_IF_ERROR(
+      sockaddr->ParseString(TNetworkAddressToString(address), address.port),
+      "Failed to parse address to Kudu Sockaddr.");
+  return Status::OK();
+}
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/dd4c6be8/be/src/util/network-util.h
----------------------------------------------------------------------
diff --git a/be/src/util/network-util.h b/be/src/util/network-util.h
index e964f5c..5b108dc 100644
--- a/be/src/util/network-util.h
+++ b/be/src/util/network-util.h
@@ -20,6 +20,10 @@
 #include "gen-cpp/Types_types.h"
 #include <vector>
 
+namespace kudu {
+class Sockaddr;
+} // namespace kudu
+
 namespace impala {
 
 /// Type to store hostnames, which can be rfc1123 hostnames or IPv4 addresses.
@@ -64,6 +68,11 @@ bool IsWildcardAddress(const std::string& ipaddress);
 /// Utility method to print address as address:port
 std::string TNetworkAddressToString(const TNetworkAddress& address);
 
+/// Utility method to convert TNetworkAddress to Kudu sock addr.
+/// Note that 'address' has to contain a resolved IP address.
+Status TNetworkAddressToSockaddr(const TNetworkAddress& address,
+    kudu::Sockaddr* sockaddr);
+
 /// Prints a hostport as ipaddress:port
 std::ostream& operator<<(std::ostream& out, const TNetworkAddress& hostport);
 
@@ -71,4 +80,5 @@ std::ostream& operator<<(std::ostream& out, const TNetworkAddress& hostport);
 /// a free ephemeral port can't be found after 100 tries. If 'used_ports' is non-NULL,
 /// does not select those ports and adds the selected port to 'used_ports'.
 int FindUnusedEphemeralPort(std::vector<int>* used_ports);
-}
+
+} // namespace impala

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/dd4c6be8/cmake_modules/FindKRPC.cmake
----------------------------------------------------------------------
diff --git a/cmake_modules/FindKRPC.cmake b/cmake_modules/FindKRPC.cmake
index 593edcd..1547c20 100644
--- a/cmake_modules/FindKRPC.cmake
+++ b/cmake_modules/FindKRPC.cmake
@@ -101,7 +101,9 @@ function(KRPC_GENERATE SRCS HDRS TGTS)
     # This custom target enforces that there's just one invocation of protoc
     # when there are multiple consumers of the generated files. The target name
     # must be unique; adding parts of the filename helps ensure this.
-    set(TGT_NAME ${REL_DIR}${FIL})
+    # Adding the prefix "KRPC_" to avoid conflation with the input proto file
+    # when ninja is used. Otherwise, ninja will flag a false circular dependency.
+    set(TGT_NAME KRPC_${REL_DIR}${FIL})
     string(REPLACE "/" "-" TGT_NAME ${TGT_NAME})
     add_custom_target(${TGT_NAME}
       DEPENDS "${SERVICE_CC}" "${SERVICE_H}"

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/dd4c6be8/common/protobuf/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/common/protobuf/CMakeLists.txt b/common/protobuf/CMakeLists.txt
new file mode 100644
index 0000000..4d5f121
--- /dev/null
+++ b/common/protobuf/CMakeLists.txt
@@ -0,0 +1,31 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+
+cmake_minimum_required(VERSION 2.6)
+
+set(PROTOBUF_OUTPUT_DIR ${CMAKE_SOURCE_DIR}/be/generated-sources/gen-cpp/)
+
+KRPC_GENERATE(RPC_TEST_PROTO_SRCS RPC_TEST_PROTO_HDRS
+  RPC_TEST_PROTO_TGTS
+  SOURCE_ROOT ${CMAKE_CURRENT_SOURCE_DIR}
+  BINARY_ROOT ${PROTOBUF_OUTPUT_DIR}
+  PROTO_FILES rpc_test.proto)
+add_custom_target(rpc_test_proto_tgt DEPENDS ${RPC_TEST_PROTO_TGTS})
+set(RPC_TEST_PROTO_SRCS ${RPC_TEST_PROTO_SRCS} PARENT_SCOPE)
+
+add_custom_target(proto-deps ALL DEPENDS token_proto rpc_header_proto)
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/dd4c6be8/common/protobuf/rpc_test.proto
----------------------------------------------------------------------
diff --git a/common/protobuf/rpc_test.proto b/common/protobuf/rpc_test.proto
new file mode 100644
index 0000000..fd22331
--- /dev/null
+++ b/common/protobuf/rpc_test.proto
@@ -0,0 +1,42 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+package impala;
+
+// Definitions for service used for rpc-mgr-test.
+message PingRequestPB {
+}
+
+message PingResponsePB {
+  required int32 int_response = 1;
+}
+
+service PingService {
+  rpc Ping(PingRequestPB) returns (PingResponsePB);
+}
+
+message ScanMemRequestPB {
+  required int32 pattern = 1;
+  required int32 sidecar_idx = 2;
+}
+
+message ScanMemResponsePB {
+}
+
+service ScanMemService {
+  rpc ScanMem(ScanMemRequestPB) returns (ScanMemResponsePB);
+}