You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by sa...@apache.org on 2017/10/03 03:38:17 UTC

[4/4] incubator-impala git commit: IMPALA-4786: Clean up how ImpalaServers are created

IMPALA-4786: Clean up how ImpalaServers are created

ImpalaServer had to be created via an awkward CreateImpalaServer()
method that took a lot of arguments. This patch refactors that code (in
anticipation of some KRPC changes) to follow a more normal lifecycle:

1. ImpalaServer* server = new ImpalaServer(ExecEnv*)
2. RETURN_IF_ERROR(server->Init()) // for error-returning init operations
3. RETURN_IF_ERROR(server->Start())
4. server->Join()

Also add ExecEnv::Init(), and move calls to ExecEnv::StartServices() to
ImpalaServer::StartServices(). This captures a dependency that KRPC will
rely on - where initialization of both ExecEnv and ImpalaServer need to
happen before services are started.

This sets up a clean-up of InProcessImpalaServer, which is too heavy for
the work that it does. That work is deferred to a follow-on patch.

This is a slightly cleaned up version of Henry's abandoned patch:
https://gerrit.cloudera.org/#/c/7673/

Change-Id: If388c5618258a9c4529cd1d63e956566b92bd0d8
Reviewed-on: http://gerrit.cloudera.org:8080/8076
Reviewed-by: Sailesh Mukil <sa...@cloudera.com>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/fed75810
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/fed75810
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/fed75810

Branch: refs/heads/master
Commit: fed75810bc8eba0195e5c55845ae04715ea75203
Parents: ef990cf
Author: Sailesh Mukil <sa...@apache.org>
Authored: Thu Sep 14 13:54:25 2017 -0700
Committer: Impala Public Jenkins <im...@gerrit.cloudera.org>
Committed: Tue Oct 3 02:20:17 2017 +0000

----------------------------------------------------------------------
 be/src/exprs/expr-test.cc             |  2 +-
 be/src/runtime/exec-env.cc            | 14 +++--
 be/src/runtime/exec-env.h             |  4 ++
 be/src/scheduling/scheduler.cc        |  4 +-
 be/src/service/impala-server.cc       | 95 +++++++++++++++++++-----------
 be/src/service/impala-server.h        | 49 +++++++++------
 be/src/service/impalad-main.cc        | 40 +++----------
 be/src/testutil/in-process-servers.cc | 38 +++---------
 be/src/testutil/in-process-servers.h  | 17 +-----
 be/src/util/hdfs-util-test.cc         |  4 +-
 10 files changed, 129 insertions(+), 138 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/fed75810/be/src/exprs/expr-test.cc
----------------------------------------------------------------------
diff --git a/be/src/exprs/expr-test.cc b/be/src/exprs/expr-test.cc
index 6a63ad5..2065c80 100644
--- a/be/src/exprs/expr-test.cc
+++ b/be/src/exprs/expr-test.cc
@@ -7538,7 +7538,7 @@ int main(int argc, char** argv) {
   VLOG_CONNECTION << "starting backends";
   InProcessStatestore* ips = InProcessStatestore::StartWithEphemeralPorts();
   InProcessImpalaServer* impala_server =
-      InProcessImpalaServer::StartWithEphemeralPorts("localhost", ips->port());
+      InProcessImpalaServer::StartWithEphemeralPorts(FLAGS_hostname, ips->port());
   executor_ = new ImpaladQueryExecutor(impala_server->hostname(),
       impala_server->beeswax_port());
   ABORT_IF_ERROR(executor_->Setup());

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/fed75810/be/src/runtime/exec-env.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/exec-env.cc b/be/src/runtime/exec-env.cc
index 3652280..8942007 100644
--- a/be/src/runtime/exec-env.cc
+++ b/be/src/runtime/exec-env.cc
@@ -213,9 +213,7 @@ Status ExecEnv::InitForFeTests() {
   return Status::OK();
 }
 
-Status ExecEnv::StartServices() {
-  LOG(INFO) << "Starting global services";
-
+Status ExecEnv::Init() {
   // Initialize thread pools
   RETURN_IF_ERROR(exec_rpc_thread_pool_->Init());
   RETURN_IF_ERROR(async_rpc_pool_->Init());
@@ -340,8 +338,7 @@ Status ExecEnv::StartServices() {
   }
   if (admission_controller_ != nullptr) RETURN_IF_ERROR(admission_controller_->Init());
 
-  // Get the fs.defaultFS value set in core-site.xml and assign it to
-  // configured_defaultFs
+  // Get the fs.defaultFS value set in core-site.xml and assign it to configured_defaultFs
   TGetHadoopConfigRequest config_request;
   config_request.__set_name(DEFAULT_FS);
   TGetHadoopConfigResponse config_response;
@@ -351,6 +348,13 @@ Status ExecEnv::StartServices() {
   } else {
     default_fs_ = "hdfs://";
   }
+
+  return Status::OK();
+}
+
+Status ExecEnv::StartServices() {
+  LOG(INFO) << "Starting global services";
+
   // Must happen after all topic registrations / callbacks are done
   if (statestore_subscriber_.get() != nullptr) {
     Status status = statestore_subscriber_->Start();

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/fed75810/be/src/runtime/exec-env.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/exec-env.h b/be/src/runtime/exec-env.h
index 25d1055..b8a271d 100644
--- a/be/src/runtime/exec-env.h
+++ b/be/src/runtime/exec-env.h
@@ -81,6 +81,10 @@ class ExecEnv {
   /// Destructor - only used in backend tests that create new environment per test.
   ~ExecEnv();
 
+  /// Initialize the exec environment, including parsing memory limits and initializing
+  /// subsystems like the webserver, scheduler etc.
+  Status Init();
+
   /// Starts any dependent services in their correct order
   Status StartServices() WARN_UNUSED_RESULT;
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/fed75810/be/src/scheduling/scheduler.cc
----------------------------------------------------------------------
diff --git a/be/src/scheduling/scheduler.cc b/be/src/scheduling/scheduler.cc
index 5c2f907..adac41f 100644
--- a/be/src/scheduling/scheduler.cc
+++ b/be/src/scheduling/scheduler.cc
@@ -192,7 +192,9 @@ void Scheduler::UpdateMembership(
       // will try to re-register (i.e. overwrite their subscription), but there is
       // likely a configuration problem.
       LOG_EVERY_N(WARNING, 30) << "Duplicate subscriber registration from address: "
-                               << be_desc.address;
+                               << be_desc.address
+                               << " (we are: " << local_backend_descriptor_.address
+                               << ")";
       continue;
     }
     if (be_desc.is_executor) {

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/fed75810/be/src/service/impala-server.cc
----------------------------------------------------------------------
diff --git a/be/src/service/impala-server.cc b/be/src/service/impala-server.cc
index bde2288..ac5b3a9 100644
--- a/be/src/service/impala-server.cc
+++ b/be/src/service/impala-server.cc
@@ -104,7 +104,6 @@ using namespace beeswax;
 using namespace rapidjson;
 using namespace strings;
 
-DECLARE_int32(be_port);
 DECLARE_string(nn);
 DECLARE_int32(nn_port);
 DECLARE_string(authorized_proxy_user_config);
@@ -1929,35 +1928,30 @@ void ImpalaServer::RegisterSessionTimeout(int32_t session_timeout) {
   }
 }
 
-Status CreateImpalaServer(ExecEnv* exec_env, int beeswax_port, int hs2_port, int be_port,
-    ThriftServer** beeswax_server, ThriftServer** hs2_server, ThriftServer** be_server,
-    boost::shared_ptr<ImpalaServer>* impala_server) {
-  DCHECK((beeswax_port == 0) == (beeswax_server == nullptr));
-  DCHECK((hs2_port == 0) == (hs2_server == nullptr));
-  DCHECK((be_port == 0) == (be_server == nullptr));
+Status ImpalaServer::Init(int32_t thrift_be_port, int32_t beeswax_port, int32_t hs2_port) {
+  exec_env_->SetImpalaServer(this);
+  boost::shared_ptr<ImpalaServer> handler = shared_from_this();
 
   if (!FLAGS_is_coordinator && !FLAGS_is_executor) {
     return Status("Impala does not have a valid role configured. "
         "Either --is_coordinator or --is_executor must be set to true.");
   }
 
-  impala_server->reset(new ImpalaServer(exec_env));
-
   SSLProtocol ssl_version = SSLProtocol::TLSv1_0;
   if (!FLAGS_ssl_server_certificate.empty() || EnableInternalSslConnections()) {
     RETURN_IF_ERROR(
         SSLProtoVersions::StringToProtocol(FLAGS_ssl_minimum_version, &ssl_version));
   }
 
-  if (be_port != 0 && be_server != nullptr) {
+  if (thrift_be_port > 0) {
     boost::shared_ptr<ImpalaInternalService> thrift_if(new ImpalaInternalService());
     boost::shared_ptr<TProcessor> be_processor(
         new ImpalaInternalServiceProcessor(thrift_if));
     boost::shared_ptr<TProcessorEventHandler> event_handler(
-        new RpcEventHandler("backend", exec_env->metrics()));
+        new RpcEventHandler("backend", exec_env_->metrics()));
     be_processor->setEventHandler(event_handler);
 
-    ThriftServerBuilder be_builder("backend", be_processor, be_port);
+    ThriftServerBuilder be_builder("backend", be_processor, thrift_be_port);
 
     if (EnableInternalSslConnections()) {
       LOG(INFO) << "Enabling SSL for backend";
@@ -1966,24 +1960,24 @@ Status CreateImpalaServer(ExecEnv* exec_env, int beeswax_port, int hs2_port, int
           .ssl_version(ssl_version)
           .cipher_list(FLAGS_ssl_cipher_list);
     }
-    RETURN_IF_ERROR(be_builder.metrics(exec_env->metrics()).Build(be_server));
-    LOG(INFO) << "ImpalaInternalService listening on " << be_port;
+    ThriftServer* server;
+    RETURN_IF_ERROR(be_builder.metrics(exec_env_->metrics()).Build(&server));
+    thrift_be_server_.reset(server);
   }
 
   if (!FLAGS_is_coordinator) {
+    // We don't start the Beeswax and HS2 servers if this impala daemon is just an
+    // executor.
     LOG(INFO) << "Started executor Impala server on "
               << ExecEnv::GetInstance()->backend_address();
     return Status::OK();
   }
 
-  // Initialize the HS2 and Beeswax services.
-  if (beeswax_port != 0 && beeswax_server != nullptr) {
-    // Beeswax FE must be a TThreadPoolServer because ODBC and Hue only support
-    // TThreadPoolServer.
-    boost::shared_ptr<TProcessor> beeswax_processor(
-        new ImpalaServiceProcessor(*impala_server));
+  // Start the Beeswax and HS2 servers.
+  if (beeswax_port > 0) {
+    boost::shared_ptr<TProcessor> beeswax_processor(new ImpalaServiceProcessor(handler));
     boost::shared_ptr<TProcessorEventHandler> event_handler(
-        new RpcEventHandler("beeswax", exec_env->metrics()));
+        new RpcEventHandler("beeswax", exec_env_->metrics()));
     beeswax_processor->setEventHandler(event_handler);
     ThriftServerBuilder builder(BEESWAX_SERVER_NAME, beeswax_processor, beeswax_port);
 
@@ -1994,22 +1988,22 @@ Status CreateImpalaServer(ExecEnv* exec_env, int beeswax_port, int hs2_port, int
           .ssl_version(ssl_version)
           .cipher_list(FLAGS_ssl_cipher_list);
     }
+
+    ThriftServer* server;
     RETURN_IF_ERROR(
         builder.auth_provider(AuthManager::GetInstance()->GetExternalAuthProvider())
-            .metrics(exec_env->metrics())
+            .metrics(exec_env_->metrics())
             .thread_pool(FLAGS_fe_service_threads)
-            .Build(beeswax_server));
-    (*beeswax_server)->SetConnectionHandler(impala_server->get());
-
-    LOG(INFO) << "Impala Beeswax Service listening on " << beeswax_port;
+            .Build(&server));
+    beeswax_server_.reset(server);
+    beeswax_server_->SetConnectionHandler(this);
   }
 
-  if (hs2_port != 0 && hs2_server != nullptr) {
-    // HiveServer2 JDBC driver does not support non-blocking server.
+  if (hs2_port > 0) {
     boost::shared_ptr<TProcessor> hs2_fe_processor(
-        new ImpalaHiveServer2ServiceProcessor(*impala_server));
+        new ImpalaHiveServer2ServiceProcessor(handler));
     boost::shared_ptr<TProcessorEventHandler> event_handler(
-        new RpcEventHandler("hs2", exec_env->metrics()));
+        new RpcEventHandler("hs2", exec_env_->metrics()));
     hs2_fe_processor->setEventHandler(event_handler);
 
     ThriftServerBuilder builder(HS2_SERVER_NAME, hs2_fe_processor, hs2_port);
@@ -2022,21 +2016,54 @@ Status CreateImpalaServer(ExecEnv* exec_env, int beeswax_port, int hs2_port, int
           .cipher_list(FLAGS_ssl_cipher_list);
     }
 
+    ThriftServer* server;
     RETURN_IF_ERROR(
         builder.auth_provider(AuthManager::GetInstance()->GetExternalAuthProvider())
-            .metrics(exec_env->metrics())
+            .metrics(exec_env_->metrics())
             .thread_pool(FLAGS_fe_service_threads)
-            .Build(hs2_server));
-    (*hs2_server)->SetConnectionHandler(impala_server->get());
+            .Build(&server));
+    hs2_server_.reset(server);
+    hs2_server_->SetConnectionHandler(this);
 
-    LOG(INFO) << "Impala HiveServer2 Service listening on " << hs2_port;
   }
 
   LOG(INFO) << "Started coordinator/executor Impala server on "
             << ExecEnv::GetInstance()->backend_address();
+
+  return Status::OK();
+}
+
+Status ImpalaServer::Start() {
+  RETURN_IF_ERROR(exec_env_->StartServices());
+  if (thrift_be_server_.get()) {
+    RETURN_IF_ERROR(thrift_be_server_->Start());
+    LOG(INFO) << "Impala InternalService listening on " << thrift_be_server_->port();
+  }
+
+  if (hs2_server_.get()) {
+    RETURN_IF_ERROR(hs2_server_->Start());
+    LOG(INFO) << "Impala HiveServer2 Service listening on " << beeswax_server_->port();
+  }
+  if (beeswax_server_.get()) {
+    RETURN_IF_ERROR(beeswax_server_->Start());
+    LOG(INFO) << "Impala Beeswax Service listening on " << hs2_server_->port();
+  }
   return Status::OK();
 }
 
+void ImpalaServer::Join() {
+  thrift_be_server_->Join();
+  thrift_be_server_.reset();
+
+  if (FLAGS_is_coordinator) {
+    beeswax_server_->Join();
+    hs2_server_->Join();
+    beeswax_server_.reset();
+    hs2_server_.reset();
+  }
+  shutdown_promise_.Get();
+}
+
 shared_ptr<ClientRequestState> ImpalaServer::GetClientRequestState(
     const TUniqueId& query_id) {
   lock_guard<mutex> l(client_request_state_map_lock_);

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/fed75810/be/src/service/impala-server.h
----------------------------------------------------------------------
diff --git a/be/src/service/impala-server.h b/be/src/service/impala-server.h
index eb3251c..baca128 100644
--- a/be/src/service/impala-server.h
+++ b/be/src/service/impala-server.h
@@ -109,12 +109,29 @@ class ClientRequestState;
 /// TODO: The same doesn't apply to the execution state of an individual plan
 /// fragment: the originating coordinator might die, but we can get notified of
 /// that via the statestore. This still needs to be implemented.
-class ImpalaServer : public ImpalaServiceIf, public ImpalaHiveServer2ServiceIf,
-                     public ThriftServer::ConnectionHandlerIf {
+class ImpalaServer : public ImpalaServiceIf,
+                     public ImpalaHiveServer2ServiceIf,
+                     public ThriftServer::ConnectionHandlerIf,
+                     public boost::enable_shared_from_this<ImpalaServer> {
  public:
   ImpalaServer(ExecEnv* exec_env);
   ~ImpalaServer();
 
+  /// Initializes RPC services and other subsystems (like audit logging). Returns an error
+  /// if initialization failed. If any ports are <= 0, their respective service will not
+  /// be started.
+  Status Init(int32_t thrift_be_port, int32_t beeswax_port, int32_t hs2_port);
+
+  /// Starts client and internal services. Does not block. Returns an error if any service
+  /// failed to start.
+  Status Start();
+
+  /// Blocks until the server shuts down (by calling Shutdown()).
+  void Join();
+
+  /// Triggers service shutdown, by unblocking Join().
+  void Shutdown() { shutdown_promise_.Set(true); }
+
   /// ImpalaService rpcs: Beeswax API (implemented in impala-beeswax-server.cc)
   virtual void query(beeswax::QueryHandle& query_handle, const beeswax::Query& query);
   virtual void executeAndWait(beeswax::QueryHandle& query_handle,
@@ -977,25 +994,19 @@ class ImpalaServer : public ImpalaServiceIf, public ImpalaHiveServer2ServiceIf,
 
   /// True if this ImpalaServer can execute query fragments.
   bool is_executor_;
+
+  /// Containers for client and internal services. May not be set if the ports passed to
+  /// Init() were <= 0.
+  /// Note that these hold a shared pointer to 'this', and so need to be reset()
+  /// explicitly.
+  boost::scoped_ptr<ThriftServer> beeswax_server_;
+  boost::scoped_ptr<ThriftServer> hs2_server_;
+  boost::scoped_ptr<ThriftServer> thrift_be_server_;
+
+  /// Set to true when this ImpalaServer should shut down.
+  Promise<bool> shutdown_promise_;
 };
 
-/// Create an ImpalaServer and Thrift servers.
-/// If beeswax_port != 0 (and fe_server != NULL), creates a ThriftServer exporting
-/// ImpalaService (Beeswax) on beeswax_port (returned via beeswax_server).
-/// If hs2_port != 0 (and hs2_server != NULL), creates a ThriftServer exporting
-/// ImpalaHiveServer2Service on hs2_port (returned via hs2_server).
-/// ImpalaService and ImpalaHiveServer2Service are initialized only if this
-/// Impala server is a coordinator (indicated by the is_coordinator flag).
-/// If be_port != 0 (and be_server != NULL), create a ThriftServer exporting
-/// ImpalaInternalService on be_port (returned via be_server).
-/// Returns created ImpalaServer. The caller owns fe_server and be_server.
-/// The returned ImpalaServer is referenced by both of these via shared_ptrs and will be
-/// deleted automatically.
-/// Returns OK unless there was some error creating the servers, in
-/// which case none of the output parameters can be assumed to be valid.
-Status CreateImpalaServer(ExecEnv* exec_env, int beeswax_port, int hs2_port,
-    int be_port, ThriftServer** beeswax_server, ThriftServer** hs2_server,
-    ThriftServer** be_server, boost::shared_ptr<ImpalaServer>* impala_server);
 
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/fed75810/be/src/service/impalad-main.cc
----------------------------------------------------------------------
diff --git a/be/src/service/impalad-main.cc b/be/src/service/impalad-main.cc
index 53b7d3e..c9627d9 100644
--- a/be/src/service/impalad-main.cc
+++ b/be/src/service/impalad-main.cc
@@ -75,53 +75,31 @@ int ImpaladMain(int argc, char** argv) {
     LOG(WARNING) << "*****************************************************************";
   }
 
-  // start backend service for the coordinator on be_port
   ExecEnv exec_env;
+  ABORT_IF_ERROR(exec_env.Init());
+  CommonMetrics::InitCommonMetrics(exec_env.metrics());
+  ABORT_IF_ERROR(StartMemoryMaintenanceThread()); // Memory metrics are created in Init().
   ABORT_IF_ERROR(
       StartThreadInstrumentation(exec_env.metrics(), exec_env.webserver(), true));
   InitRpcEventTracing(exec_env.webserver());
 
-  CommonMetrics::InitCommonMetrics(exec_env.metrics());
-
-  ThriftServer* beeswax_server = NULL;
-  ThriftServer* hs2_server = NULL;
-  ThriftServer* be_server = NULL;
-  boost::shared_ptr<ImpalaServer> server;
-  ABORT_IF_ERROR(CreateImpalaServer(&exec_env, FLAGS_beeswax_port, FLAGS_hs2_port,
-      FLAGS_be_port, &beeswax_server, &hs2_server, &be_server, &server));
+  boost::shared_ptr<ImpalaServer> impala_server(new ImpalaServer(&exec_env));
+  ABORT_IF_ERROR(impala_server->Init(FLAGS_be_port, FLAGS_beeswax_port, FLAGS_hs2_port));
 
-  Status status = exec_env.StartServices();
+  DCHECK(exec_env.process_mem_tracker() != nullptr)
+      << "ExecEnv::StartServices() must be called before starting RPC services";
+  Status status = impala_server->Start();
   if (!status.ok()) {
     LOG(ERROR) << "Impalad services did not start correctly, exiting.  Error: "
                << status.GetDetail();
     ShutdownLogging();
     exit(1);
   }
-  // Memory metrics are created in StartServices().
-  ABORT_IF_ERROR(StartMemoryMaintenanceThread());
-
-  DCHECK(exec_env.process_mem_tracker() != nullptr)
-      << "ExecEnv::StartServices() must be called before starting RPC services";
-  ABORT_IF_ERROR(be_server->Start());
-
-  if (FLAGS_is_coordinator) {
-    ABORT_IF_ERROR(beeswax_server->Start());
-    ABORT_IF_ERROR(hs2_server->Start());
-  }
 
   ImpaladMetrics::IMPALA_SERVER_READY->set_value(true);
   LOG(INFO) << "Impala has started.";
 
-  be_server->Join();
-  delete be_server;
-
-  if (FLAGS_is_coordinator) {
-    // this blocks until the beeswax and hs2 servers terminate
-    beeswax_server->Join();
-    hs2_server->Join();
-    delete beeswax_server;
-    delete hs2_server;
-  }
+  impala_server->Join();
 
   return 0;
 }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/fed75810/be/src/testutil/in-process-servers.cc
----------------------------------------------------------------------
diff --git a/be/src/testutil/in-process-servers.cc b/be/src/testutil/in-process-servers.cc
index d0036af..64d681c 100644
--- a/be/src/testutil/in-process-servers.cc
+++ b/be/src/testutil/in-process-servers.cc
@@ -66,9 +66,9 @@ InProcessImpalaServer* InProcessImpalaServer::StartWithEphemeralPorts(
     int hs2_port = FindUnusedEphemeralPort(&used_ports);
     if (hs2_port == -1) continue;
 
-    InProcessImpalaServer* impala =
-        new InProcessImpalaServer("localhost", backend_port, krpc_port, subscriber_port,
-            webserver_port, statestore_host, statestore_port);
+    InProcessImpalaServer* impala = new InProcessImpalaServer(FLAGS_hostname,
+        backend_port, krpc_port, subscriber_port, webserver_port, statestore_host,
+        statestore_port);
     // Start the daemon and check if it works, if not delete the current server object and
     // pick a new set of ports
     Status started = impala->StartWithClientServers(beeswax_port, hs2_port);
@@ -100,41 +100,21 @@ Status InProcessImpalaServer::SetCatalogInitialized() {
 }
 
 Status InProcessImpalaServer::StartWithClientServers(int beeswax_port, int hs2_port) {
-  RETURN_IF_ERROR(exec_env_->StartServices());
-
+  RETURN_IF_ERROR(exec_env_->Init());
   beeswax_port_ = beeswax_port;
   hs2_port_ = hs2_port;
-  ThriftServer* be_server;
-  ThriftServer* hs2_server;
-  ThriftServer* beeswax_server;
-  RETURN_IF_ERROR(CreateImpalaServer(exec_env_.get(), beeswax_port, hs2_port,
-                                     backend_port_, &beeswax_server, &hs2_server,
-                                     &be_server, &impala_server_));
-  be_server_.reset(be_server);
-  hs2_server_.reset(hs2_server);
-  beeswax_server_.reset(beeswax_server);
-
-  RETURN_IF_ERROR(be_server_->Start());
-  RETURN_IF_ERROR(hs2_server_->Start());
-  RETURN_IF_ERROR(beeswax_server_->Start());
+
+  impala_server_.reset(new ImpalaServer(exec_env_.get()));
+  RETURN_IF_ERROR(impala_server_->Init(backend_port_, beeswax_port, hs2_port));
+  RETURN_IF_ERROR(impala_server_->Start());
 
   // Wait for up to 1s for the backend server to start
   RETURN_IF_ERROR(WaitForServer(hostname_, backend_port_, 10, 100));
   return Status::OK();
 }
 
-Status InProcessImpalaServer::StartAsBackendOnly() {
-  RETURN_IF_ERROR(exec_env_->StartServices());
-  ThriftServer* be_server;
-  RETURN_IF_ERROR(CreateImpalaServer(exec_env_.get(), 0, 0, backend_port_, NULL, NULL,
-                                     &be_server, &impala_server_));
-  be_server_.reset(be_server);
-  RETURN_IF_ERROR(be_server_->Start());
-  return Status::OK();
-}
-
 Status InProcessImpalaServer::Join() {
-  be_server_->Join();
+  impala_server_->Join();
   return Status::OK();
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/fed75810/be/src/testutil/in-process-servers.h
----------------------------------------------------------------------
diff --git a/be/src/testutil/in-process-servers.h b/be/src/testutil/in-process-servers.h
index 9e3b2f5..6842255 100644
--- a/be/src/testutil/in-process-servers.h
+++ b/be/src/testutil/in-process-servers.h
@@ -60,10 +60,6 @@ class InProcessImpalaServer {
   /// servers.
   Status StartWithClientServers(int beeswax_port, int hs2_port);
 
-  /// Starts only the backend server; useful when running a cluster of
-  /// InProcessImpalaServers and only one is to serve client requests.
-  Status StartAsBackendOnly();
-
   /// Blocks until the backend server exits. Returns Status::OK unless
   /// there was an error joining.
   Status Join();
@@ -91,22 +87,11 @@ class InProcessImpalaServer {
 
   uint32_t hs2_port_;
 
-  /// The ImpalaServer that handles client and backend requests. Ownership is shared via
-  /// shared_ptrs with the ThriftServers. See CreateImpalaServer for details.
+  /// The ImpalaServer that handles client and backend requests.
   boost::shared_ptr<ImpalaServer> impala_server_;
 
   /// ExecEnv holds much of the per-service state
   boost::scoped_ptr<ExecEnv> exec_env_;
-
-  /// Backend Thrift server
-  boost::scoped_ptr<ThriftServer> be_server_;
-
-  /// Frontend HiveServer2 server
-  boost::scoped_ptr<ThriftServer> hs2_server_;
-
-  /// Frontend Beeswax server.
-  boost::scoped_ptr<ThriftServer> beeswax_server_;
-
 };
 
 /// An in-process statestore, with webserver and metrics.

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/fed75810/be/src/util/hdfs-util-test.cc
----------------------------------------------------------------------
diff --git a/be/src/util/hdfs-util-test.cc b/be/src/util/hdfs-util-test.cc
index b389864..bf8c643 100644
--- a/be/src/util/hdfs-util-test.cc
+++ b/be/src/util/hdfs-util-test.cc
@@ -34,8 +34,8 @@ TEST(HdfsUtilTest, CheckFilesystemsMatch) {
   ExecEnv* exec_env = new ExecEnv();
 
   // We do this to retrieve the default FS from the frontend.
-  // It doesn't matter if starting the services fails.
-  discard_result(exec_env->StartServices());
+  // It doesn't matter if initializing the ExecEnv fails.
+  discard_result(exec_env->Init());
 
   // Tests with both paths qualified.
   EXPECT_TRUE(FilesystemsMatch("s3a://dummybucket/temp_dir/temp_path",