You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by st...@apache.org on 2023/12/12 00:55:19 UTC

(impala) branch master updated: IMPALA-12426: Adds the backend InternalServer class.

This is an automated email from the ASF dual-hosted git repository.

stigahuang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git


The following commit(s) were added to refs/heads/master by this push:
     new 408c606dd IMPALA-12426: Adds the backend InternalServer class.
408c606dd is described below

commit 408c606dd9b891b3abc7f5ce95c95a92334abb8b
Author: jasonmfehr <jf...@cloudera.com>
AuthorDate: Fri Sep 29 14:42:03 2023 -0700

    IMPALA-12426: Adds the backend InternalServer class.
    
    This class is intended for use by backend code within an
    Impala coordinator to submit queries to itself. It works
    by directly calling backend methods on the ImpalaServer
    instance. By directly calling these methods, user
    authentication is bypassed. However, authorization (such as
    Ranger rules) is applied. Additionally, overhead associated
    with protocol management (Beeswax and HS2) is eliminated.
    
    This code is not yet in use.
    
    Testing consists of a new type of backend ctest tests.
    These tests join an existing, running Impala cluster in
    order to execute queries, inserts, dmls, and ddls using
    the new InternalServer class. Test cases ensure multiple
    sql statements can be run in the same session and also in
    separate sessions. Negative testing also ensures timeouts
    and invalid query options are handled correctly.
    
    Change-Id: I27686aa563fac87429657e4980b29b0da91eb9e1
    Reviewed-on: http://gerrit.cloudera.org:8080/20524
    Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 be/src/service/CMakeLists.txt          |   2 +
 be/src/service/impala-server.cc        |  27 +-
 be/src/service/impala-server.h         |  37 ++-
 be/src/service/internal-server-test.cc | 554 +++++++++++++++++++++++++++++++++
 be/src/service/internal-server.cc      | 272 ++++++++++++++++
 be/src/service/internal-server.h       | 216 +++++++++++++
 be/src/testutil/http-util.h            | 104 +++++++
 be/src/util/test-info.h                |  10 +-
 be/src/util/webserver-test.cc          |  74 +----
 9 files changed, 1210 insertions(+), 86 deletions(-)

diff --git a/be/src/service/CMakeLists.txt b/be/src/service/CMakeLists.txt
index adc282b36..56cfaa384 100644
--- a/be/src/service/CMakeLists.txt
+++ b/be/src/service/CMakeLists.txt
@@ -40,6 +40,7 @@ add_library(Service
   impala-http-handler.cc
   impalad-main.cc
   impala-server.cc
+  internal-server.cc
   query-options.cc
   query-result-set.cc
 )
@@ -139,3 +140,4 @@ ADD_BE_TEST(session-expiry-test session-expiry-test.cc) # TODO: this leaks thrif
 ADD_UNIFIED_BE_LSAN_TEST(hs2-util-test "StitchNullsTest.*:PrintTColumnValueTest.*")
 ADD_UNIFIED_BE_LSAN_TEST(query-options-test QueryOptions.*)
 ADD_UNIFIED_BE_LSAN_TEST(impala-server-test ImpalaServerTest.*)
+ADD_BE_LSAN_TEST(internal-server-test)
diff --git a/be/src/service/impala-server.cc b/be/src/service/impala-server.cc
index ece04cd05..6e89c78d6 100644
--- a/be/src/service/impala-server.cc
+++ b/be/src/service/impala-server.cc
@@ -456,6 +456,7 @@ const uint32_t MAX_CANCELLATION_QUEUE_SIZE = 65536;
 const string ImpalaServer::BEESWAX_SERVER_NAME = "beeswax-frontend";
 const string ImpalaServer::HS2_SERVER_NAME = "hiveserver2-frontend";
 const string ImpalaServer::HS2_HTTP_SERVER_NAME = "hiveserver2-http-frontend";
+const string ImpalaServer::INTERNAL_SERVER_NAME = "internal-server";
 const string ImpalaServer::EXTERNAL_FRONTEND_SERVER_NAME = "external-frontend";
 
 const string ImpalaServer::DEFAULT_EXECUTOR_GROUP_NAME = "default";
@@ -549,7 +550,7 @@ ImpalaServer::ImpalaServer(ExecEnv* exec_env)
   ABORT_IF_ERROR(ExternalDataSourceExecutor::InitJNI(exec_env_->metrics()));
 
   // Register the catalog update callback if running in a real cluster as a coordinator.
-  if (!TestInfo::is_test() && FLAGS_is_coordinator) {
+  if ((!TestInfo::is_test() || TestInfo::is_be_cluster_test()) && FLAGS_is_coordinator) {
     auto catalog_cb = [this] (const StatestoreSubscriber::TopicDeltaMap& state,
         vector<TTopicDelta>* topic_updates) {
       this->CatalogUpdateCallback(state, topic_updates);
@@ -2579,19 +2580,14 @@ bool ImpalaServer::QueryStateRecordLessThan::operator() (
 
 void ImpalaServer::ConnectionStart(
     const ThriftServer::ConnectionContext& connection_context) {
-  if (connection_context.server_name == BEESWAX_SERVER_NAME) {
+  if (connection_context.server_name == BEESWAX_SERVER_NAME ||
+      connection_context.server_name == INTERNAL_SERVER_NAME) {
     // Beeswax only allows for one session per connection, so we can share the session ID
     // with the connection ID
     const TUniqueId& session_id = connection_context.connection_id;
     // Generate a secret per Beeswax session so that the HS2 secret validation mechanism
     // prevent accessing of Beeswax sessions from HS2.
-    uuid secret_uuid;
-    {
-      lock_guard<mutex> l(uuid_lock_);
-      secret_uuid = crypto_uuid_generator_();
-    }
-    TUniqueId secret;
-    UUIDToTUniqueId(secret_uuid, &secret);
+    TUniqueId secret = this->RandomUniqueID();
     shared_ptr<SessionState> session_state =
         std::make_shared<SessionState>(this, session_id, secret);
     session_state->closed = false;
@@ -2655,6 +2651,7 @@ void ImpalaServer::ConnectionEnd(
             << " associated session(s).";
 
   bool close = connection_context.server_name == BEESWAX_SERVER_NAME
+      || connection_context.server_name == INTERNAL_SERVER_NAME
       || FLAGS_disconnected_session_timeout <= 0;
   if (close) {
     for (const TUniqueId& session_id : disconnected_sessions) {
@@ -3465,4 +3462,16 @@ void ImpalaServer::GetAllConnectionContexts(
     external_fe_server_->GetConnectionContextList(connection_contexts);
   }
 }
+
+TUniqueId ImpalaServer::RandomUniqueID() {
+  // Hold uuid_lock_ only for the generator call; the conversion below needs no lock.
+  uuid random_uuid;
+  {
+    lock_guard<mutex> generator_guard(uuid_lock_);
+    random_uuid = crypto_uuid_generator_();
+  }
+  TUniqueId unique_id;
+  UUIDToTUniqueId(random_uuid, &unique_id);
+  return unique_id;
 }
+} // namespace impala
diff --git a/be/src/service/impala-server.h b/be/src/service/impala-server.h
index 61f593a7c..02bdd4581 100644
--- a/be/src/service/impala-server.h
+++ b/be/src/service/impala-server.h
@@ -37,6 +37,7 @@
 #include "kudu/util/random.h"
 #include "rpc/thrift-server.h"
 #include "runtime/types.h"
+#include "service/internal-server.h"
 #include "service/query-options.h"
 #include "statestore/statestore-subscriber.h"
 #include "util/condition-variable.h"
@@ -197,6 +198,7 @@ class TQueryExecRequest;
 class ImpalaServer : public ImpalaServiceIf,
                      public ImpalaHiveServer2ServiceIf,
                      public ThriftServer::ConnectionHandlerIf,
+                     public InternalServer,
                      public std::enable_shared_from_this<ImpalaServer>,
                      public CacheLineAligned {
  public:
@@ -382,7 +384,7 @@ class ImpalaServer : public ImpalaServiceIf,
   static void PrepareQueryContext(const std::string& hostname,
       const NetworkAddressPB& krpc_addr, TQueryCtx* query_ctx);
 
-  /// SessionHandlerIf methods
+  /// ThriftServer::ConnectionHandlerIf methods
 
   /// Called when a Beeswax or HS2 connection starts. For Beeswax, registers a new
   /// SessionState associated with the new connection. For HS2, this is a no-op (HS2
@@ -398,6 +400,27 @@ class ImpalaServer : public ImpalaServiceIf,
   /// Called when a client has been inactive for --idle_client_poll_period_s seconds.
   virtual bool IsIdleConnection(const ThriftServer::ConnectionContext& session_context);
 
+  /// InternalServer methods, see internal-server.h for details
+  virtual Status OpenSession(const std::string& user_name, TUniqueId& new_session_id,
+      const TQueryOptions& query_opts = TQueryOptions());
+  virtual bool CloseSession(const impala::TUniqueId& session_id);
+  virtual Status ExecuteIgnoreResults(const std::string& user_name,
+      const std::string& sql, TUniqueId* query_id = nullptr);
+  virtual Status ExecuteAndFetchAllText(const std::string& user_name,
+      const std::string& sql, query_results& results, results_columns* columns = nullptr,
+      TUniqueId* query_id = nullptr);
+  virtual Status SubmitAndWait(const std::string& user_name, const std::string& sql,
+      TUniqueId& new_session_id, TUniqueId& new_query_id);
+  virtual Status WaitForResults(TUniqueId& query_id);
+  virtual Status SubmitQuery(const std::string& sql, const impala::TUniqueId& session_id,
+      TUniqueId& new_query_id);
+  virtual Status FetchAllRows(const TUniqueId& query_id, query_results& results,
+      results_columns* columns = nullptr);
+  virtual void CloseQuery(const TUniqueId& query_id);
+  virtual void GetConnectionContextList(
+      ThriftServer::ConnectionContextList* connection_contexts);
+  /// end of InternalServer methods
+
   void CatalogUpdateCallback(const StatestoreSubscriber::TopicDeltaMap& topic_deltas,
       std::vector<TTopicDelta>* topic_updates);
 
@@ -668,6 +691,7 @@ class ImpalaServer : public ImpalaServiceIf,
   static const string BEESWAX_SERVER_NAME;
   static const string HS2_SERVER_NAME;
   static const string HS2_HTTP_SERVER_NAME;
+  static const string INTERNAL_SERVER_NAME;
   // Used to identify external frontend RPC calls
   static const string EXTERNAL_FRONTEND_SERVER_NAME;
 
@@ -1268,6 +1292,9 @@ class ImpalaServer : public ImpalaServiceIf,
   void WaitForNewCatalogServiceId(TUniqueId cur_service_id,
       std::unique_lock<std::mutex>* ver_lock);
 
+  /// Random `impala::TUniqueId` generator. Use wherever a new `TUniqueId` is needed.
+  TUniqueId RandomUniqueID();
+
   /// Logger for writing encoded query profiles, one per line with the following format:
   /// <ms-since-epoch> <query-id> <thrift query profile URL encoded and gzipped>
   boost::scoped_ptr<SimpleLogger> profile_logger_;
@@ -1488,6 +1515,14 @@ class ImpalaServer : public ImpalaServiceIf,
   typedef boost::unordered_map<TUniqueId, std::set<TUniqueId>> ConnectionToSessionMap;
   ConnectionToSessionMap connection_to_sessions_map_;
 
+  /// Map storing connections opened by the InternalServer functions. Key is the session
+  /// id, value is a shared pointer holding the connection's ConnectionContext. Use the
+  /// `internal_server_connections_lock_` mutex whenever accessing this map
+  typedef std::map<TUniqueId, std::shared_ptr<ThriftServer::ConnectionContext>>
+      SessionToConnectionContext;
+  SessionToConnectionContext internal_server_connections_;
+  std::mutex internal_server_connections_lock_;
+
   /// Returns session state for given session_id.
   /// If not found or validation of 'secret' against the stored secret in the
   /// SessionState fails, session_state will be NULL and an error status will be returned.
diff --git a/be/src/service/internal-server-test.cc b/be/src/service/internal-server-test.cc
new file mode 100644
index 000000000..dc0759515
--- /dev/null
+++ b/be/src/service/internal-server-test.cc
@@ -0,0 +1,554 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <iostream>
+#include <list>
+#include <memory>
+
+#include "catalog/catalog-util.h"
+#include "codegen/llvm-codegen.h"
+#include "common/status.h"
+#include "common/thread-debug-info.h"
+#include "gtest/gtest.h"
+#include "gutil/strings/strcat.h"
+#include "gutil/walltime.h"
+#include "rapidjson/rapidjson.h"
+#include "runtime/exec-env.h"
+#include "runtime/query-driver.h"
+#include "service/client-request-state.h"
+#include "service/fe-support.h"
+#include "service/frontend.h"
+#include "service/internal-server.h"
+#include "service/impala-server.h"
+#include "testutil/http-util.h"
+#include "testutil/gtest-util.h"
+#include "testutil/scoped-flag-setter.h"
+#include "util/debug-util.h"
+#include "util/jni-util.h"
+
+DECLARE_string(log_dir);
+DECLARE_string(debug_actions);
+DECLARE_string(jni_frontend_class);
+DECLARE_string(hostname);
+DECLARE_string(state_store_host);
+DECLARE_int32(state_store_port);
+DECLARE_string(catalog_service_host);
+DECLARE_int32(catalog_service_port);
+DECLARE_bool(enable_webserver);
+DECLARE_int32(beeswax_port);
+DECLARE_int32(hs2_port);
+DECLARE_int32(hs2_http_port);
+DECLARE_int32(krpc_port);
+DECLARE_int32(state_store_subscriber_port);
+DECLARE_int32(webserver_port);
+
+using namespace std;
+using namespace impala;
+using namespace rapidjson;
+
+shared_ptr<ImpalaServer> impala_server_; // Set up in main(), shared by all tests.
+
+const static string QUERY_STATE_SUCCESS = "FINISHED"; // Expected "state" on success.
+const static string QUERY_STATE_FAILED  = "EXCEPTION"; // Expected "state" on failure.
+
+namespace impala {
+namespace internalservertest {
+
+// Fetches the coordinator's "/queries?json" debug page and asserts that the completed
+// query identified by 'query_id' is reported in 'expected_state'.
+void assertQueryState(const TUniqueId& query_id, const string& expected_state) {
+  // Give the Impala web server a second to refresh its completed queries list.
+  SleepForMs(1000);
+
+  // Retrieve the list of queries from the test's coordinator. Results contain the raw
+  // http response including headers.
+  Document queries;
+  stringstream contents;
+  ASSERT_OK(HttpGet("localhost", FLAGS_webserver_port, "/queries?json", &contents));
+  const string contents_str = contents.str();
+
+  // Locate the blank line ("\r\n\r\n") that separates the headers from the body.
+  const size_t header_end = contents_str.find("\r\n\r\n");
+  ASSERT_NE(string::npos, header_end);
+
+  // Parse the json body, which starts right after the 4 character separator.
+  ASSERT_FALSE(queries.Parse(
+      contents_str.substr(header_end + 4).c_str()).HasParseError());
+  ASSERT_TRUE(queries.IsObject() && queries.HasMember("completed_queries"));
+
+  // Locate the completed query matching the provided query_id.
+  const Value& completed_queries = queries["completed_queries"];
+  const string query_id_str = PrintId(query_id);
+  string found_state;
+  ASSERT_TRUE(completed_queries.IsArray());
+  for (SizeType i = 0; i < completed_queries.Size(); ++i) {
+    const Value& query = completed_queries[i];
+    ASSERT_TRUE(query.IsObject());
+    if (query_id_str == query["query_id"].GetString()) {
+      found_state = query["state"].GetString();
+      break;
+    }
+  }
+
+  // Assert the completed query has the expected state. If found_state is still empty
+  // here, no completed query matched query_id.
+  ASSERT_EQ(expected_state, found_state);
+} // assertQueryState
+
+// Helper class to set up a uniquely named database. Not every test will need its own
+// database, thus an instance of this class must be instantiated in every test that needs
+// its own database.
+//
+// Upon construction, instances of this class create a database consisting of the
+// name_prefix parameter with the current time appended and, if create_table is true, a
+// "products" table in it holding record_count rows. Upon destruction, instances of this
+// class drop cascade the database. DO NOT provide a value for record_count that is
+// less than 5.
+class DatabaseTest {
+  public:
+    DatabaseTest(const shared_ptr<ImpalaServer> impala_server, const string name_prefix,
+        const bool create_table = false, const int record_count = 5000) {
+      this->impala_server_ = impala_server;
+      this->database_name_ = StrCat(name_prefix, "_", GetCurrentTimeMicros());
+      TUniqueId query_id;
+      EXPECT_OK(this->impala_server_->ExecuteIgnoreResults("impala", StrCat("create "
+          "database ", this->database_name_), &query_id));
+      assertQueryState(query_id, QUERY_STATE_SUCCESS);
+
+      if (create_table) {
+        table_name_ = StrCat(database_name_, ".", "products");
+        EXPECT_OK(this->impala_server_->ExecuteIgnoreResults("impala", StrCat("create "
+            "table ", table_name_, "(id INT, name STRING, first_sold TIMESTAMP, "
+            "last_sold TIMESTAMP, price DECIMAL(30, 2)) partitioned by (category INT)"),
+            &query_id));
+        assertQueryState(query_id, QUERY_STATE_SUCCESS);
+
+        // Insert the even-numbered products, all of which have a last_sold time.
+        string sql1 = StrCat("insert into ", table_name_ , "(id,category,name,"
+            "first_sold,last_sold,price) values ");
+
+        const int64_t secs_per_year = 31536000; // Seconds in 1 year.
+        for (int i=0; i<record_count; i+=2) {
+          int cat = (i % category_count_) + 1;
+          double price = cat * .01;
+          // First sold offset. 64-bit: secs_per_year * cat^2 overflows int for cat >= 9.
+          int64_t first_sold = secs_per_year * (cat * cat);
+          // Last sold offset; 'cat' years smaller, so at least 1 year after first_sold.
+          int64_t last_sold = first_sold - (secs_per_year * cat);
+          sql1 += StrCat("(", i, ",", cat, ",'prod_", i,
+              "',seconds_sub(now(),", first_sold, "),seconds_sub(now(),", last_sold,
+              "),cast(", price, " as DECIMAL(30, 2)))");
+
+          if (i < record_count - 2) {
+            sql1 += ",";
+          }
+        }
+
+        EXPECT_OK(this->impala_server_->ExecuteIgnoreResults("impala", sql1, &query_id));
+        assertQueryState(query_id, QUERY_STATE_SUCCESS);
+
+        // Insert the odd-numbered products, which do not have a last_sold time.
+        string sql2 = StrCat("insert into ", table_name_, "(id, category,name,first_sold,"
+            "price) values ");
+
+        for (int i=1; i<record_count; i+=2) {
+          int cat = (i % category_count_) + 1;
+          double price = cat * .01;
+          // Sold offset, 64-bit for the same overflow reason as first_sold above.
+          int64_t sold = secs_per_year * (cat * cat);
+          sql2 += StrCat("(", i, ",", cat, ",'prod_", i,
+              "',seconds_sub(now(),", sold, "),cast(", price, " as DECIMAL(30, 2)))");
+
+          if (i < record_count - 2) {
+            sql2 += ",";
+          }
+        }
+
+        EXPECT_OK(this->impala_server_->ExecuteIgnoreResults("impala", sql2,
+            &query_id));
+        assertQueryState(query_id, QUERY_STATE_SUCCESS);
+      }
+    }
+
+    ~DatabaseTest() {
+      RETURN_VOID_IF_ERROR(this->impala_server_->ExecuteIgnoreResults("impala",
+          "drop database if exists " + this->database_name_ + " cascade"));
+    }
+
+    const string GetDbName() const {
+      return this->database_name_;
+    }
+
+    const string GetTableName() const {
+      return this->table_name_;
+    }
+
+    int GetCategoryCount() const {
+      return this->category_count_;
+    }
+
+  private:
+    const int category_count_ = 10;
+    string database_name_;
+    string table_name_;
+    shared_ptr<ImpalaServer> impala_server_;
+}; // class DatabaseTest
+
+// Asserts that a timeout error is returned if the query times out during execution.
+// This test must select from a table because selecting directly from the sleep() function
+// does not cause a timeout.
+TEST(InternalServerTest, QueryTimeout) {
+  DatabaseTest db_test = DatabaseTest(impala_server_, "query_timeout", true, 5);
+  InternalServer* fixture = impala_server_.get();
+
+  TQueryOptions query_opts;
+  query_opts.__set_fetch_rows_timeout_ms(1);
+
+  TUniqueId session_id;
+  TUniqueId query_id;
+
+  ASSERT_OK(fixture->OpenSession("impala", session_id, query_opts));
+
+  // Run a query that sleeps far longer than the 1ms fetch_rows_timeout_ms set above.
+  ASSERT_OK(fixture->SubmitQuery(StrCat("select * from ", db_test.GetTableName(),
+      " where id=sleep(2000000)"), session_id, query_id));
+
+  Status stat = fixture->WaitForResults(query_id);
+
+  // Assert the expected timeout error was returned.
+  EXPECT_FALSE(stat.ok());
+  EXPECT_EQ(TErrorCode::GENERAL, stat.code());
+  EXPECT_EQ("query timed out waiting for results", stat.msg().msg());
+
+  fixture->CloseQuery(query_id);
+  fixture->CloseSession(session_id);
+
+  assertQueryState(query_id, QUERY_STATE_FAILED);
+} // TEST QueryTimeout
+
+// Opening a session with an invalid query option value must fail with a parse error.
+TEST(InternalServerTest, InvalidQueryOption) {
+  // -2 is the MEM_LIMIT_EXECUTORS value this test expects to be rejected.
+  TQueryOptions bad_opts;
+  bad_opts.__set_mem_limit_executors(-2);
+
+  InternalServer* server = impala_server_.get();
+  TUniqueId session_id;
+  Status open_status = server->OpenSession("impala", session_id, bad_opts);
+  ASSERT_FALSE(open_status.ok());
+  ASSERT_EQ("Failed to parse query option 'MEM_LIMIT_EXECUTORS': -2",
+      open_status.msg().msg());
+} // TEST InvalidQueryOption
+
+// Asserts that executing multiple queries over multiple sessions against the same
+// internal server instance works correctly.
+TEST(InternalServerTest, MultipleQueriesMultipleSessions) {
+  query_results results = make_shared<vector<string>>();
+  InternalServer* fixture = impala_server_.get();
+  DatabaseTest db_test = DatabaseTest(impala_server_,
+      "multiple_queries_multiple_sessions");
+  const string test_table_name = StrCat(db_test.GetDbName(), ".test_table_a");
+
+  // Set up a test table using a new session.
+  TUniqueId query_id;
+  ASSERT_OK(fixture->ExecuteAndFetchAllText("impala",
+      StrCat("create table if not exists ", test_table_name,
+      "(id INT, first_name STRING, last_name STRING)"), results, nullptr, &query_id));
+  ASSERT_EQ(1, results->size());
+  ASSERT_EQ(results->at(0), "Table has been created.");
+  assertQueryState(query_id, QUERY_STATE_SUCCESS);
+
+  // Insert a record into the test table using a new session.
+  ASSERT_OK(fixture->ExecuteIgnoreResults("impala", StrCat("insert into ",
+      test_table_name, "(id,first_name,last_name) VALUES (1,'test','person1')"),
+      &query_id));
+  assertQueryState(query_id, QUERY_STATE_SUCCESS);
+
+  // Select a record from the test table using a new session.
+  results->clear();
+  results_columns columns;
+  ASSERT_OK(fixture->ExecuteAndFetchAllText("impala", StrCat("select id,first_name,"
+      "last_name FROM ", test_table_name), results, &columns, &query_id));
+  assertQueryState(query_id, QUERY_STATE_SUCCESS);
+  // Each entry in columns is a (name, type) pair, in select-list order.
+  ASSERT_EQ(3, columns.size());
+  EXPECT_EQ("id", columns.at(0).first);
+  EXPECT_EQ("int", columns.at(0).second);
+  EXPECT_EQ("first_name", columns.at(1).first);
+  EXPECT_EQ("string", columns.at(1).second);
+  EXPECT_EQ("last_name", columns.at(2).first);
+  EXPECT_EQ("string", columns.at(2).second);
+  // The single inserted row comes back as one tab separated string.
+  ASSERT_EQ(1, results->size());
+  EXPECT_EQ(results->at(0), "1\ttest\tperson1");
+} // TEST MultipleQueriesMultipleSessions
+
+// Simulates an RPC failure which causes the coordinator to automatically retry the query.
+TEST(InternalServerTest, RetryFailedQuery) {
+  InternalServer* fixture = impala_server_.get();
+
+  TUniqueId session_id;
+  TUniqueId query_id;
+  TUniqueId orig_query_id;
+
+  // Simulate a krpc failure which will cause the coordinator to automatically retry
+  // the query.
+  const ScopedFlagSetter sfs = ScopedFlagSetter<string>::Make(&FLAGS_debug_actions,
+      StrCat("IMPALA_SERVICE_POOL:127.0.0.1:",FLAGS_krpc_port,
+      ":ExecQueryFInstances:FAIL"));
+
+  TQueryOptions query_opts;
+  query_opts.__set_retry_failed_queries(true);
+
+  ASSERT_OK(fixture->OpenSession("impala", session_id, query_opts));
+
+  // Run a query that will fail and get automatically retried.
+  ASSERT_OK(fixture->SubmitQuery("select 1", session_id, query_id));
+  orig_query_id = query_id;
+
+  Status wait_status = fixture->WaitForResults(query_id);
+  ASSERT_OK(wait_status);
+  // query_id is passed by reference above and is expected to come back changed.
+  EXPECT_TRUE(orig_query_id != query_id);
+
+  fixture->CloseQuery(query_id);
+  fixture->CloseSession(session_id);
+  // NOTE(review): asserts the failed state for the post-wait id; confirm intended.
+  assertQueryState(query_id, QUERY_STATE_FAILED);
+} // TEST RetryFailedQuery
+
+// Asserts that executing multiple queries in one session against the same internal
+// server instance works correctly.
+TEST(InternalServerTest, MultipleQueriesOneSession) {
+  query_results results = make_shared<vector<string>>();
+  InternalServer* fixture = impala_server_.get();
+  DatabaseTest db_test = DatabaseTest(impala_server_, "multiple_queries_one_session");
+  const string test_table_name = StrCat(db_test.GetDbName(), ".test_table_1");
+
+  // Open a new session that will be used to run all queries.
+  TUniqueId session_id;
+  ASSERT_OK(fixture->OpenSession("impala", session_id));
+
+  // Create a test table.
+  TUniqueId query_id1;
+  ASSERT_OK(fixture->SubmitQuery(StrCat("create table if not exists ", test_table_name,
+      "(id INT,name STRING)"), session_id, query_id1));
+  ASSERT_OK(fixture->WaitForResults(query_id1));
+  ASSERT_OK(fixture->FetchAllRows(query_id1, results));
+
+  // Assert the test table was created.
+  ASSERT_EQ(1, results->size());
+  EXPECT_EQ(results->at(0), "Table has been created.");
+  results->clear();
+  fixture->CloseQuery(query_id1);
+
+  // In the same session, insert into the newly created test table.
+  TUniqueId query_id2;
+  ASSERT_OK(fixture->SubmitQuery(StrCat("insert into ", test_table_name,
+      " (id, name) VALUES (1, 'one'), (2, 'two')"), session_id, query_id2));
+  ASSERT_OK(fixture->WaitForResults(query_id2));
+  ASSERT_OK(fixture->FetchAllRows(query_id2, results));
+
+  // Assert the insert succeeded; an insert is expected to return no rows.
+  ASSERT_EQ(0, results->size());
+  fixture->CloseQuery(query_id2);
+
+  // Still in the same session, select from the test table.
+  TUniqueId query_id3;
+  results_columns columns;
+
+  ASSERT_OK(fixture->SubmitQuery(StrCat("select name,id,name from ", test_table_name,
+      " order by id asc"), session_id, query_id3));
+  ASSERT_OK(fixture->WaitForResults(query_id3));
+  ASSERT_OK(fixture->FetchAllRows(query_id3, results, &columns));
+
+  // Assert the expected columns were returned, including the repeated name column.
+  ASSERT_EQ(3, columns.size());
+  EXPECT_EQ("name", columns.at(0).first);
+  EXPECT_EQ("string", columns.at(0).second);
+  EXPECT_EQ("id", columns.at(1).first);
+  EXPECT_EQ("int", columns.at(1).second);
+  EXPECT_EQ("name", columns.at(2).first);
+  EXPECT_EQ("string", columns.at(2).second);
+
+  // Assert the expected rows were returned, ordered by id as the query requested.
+  ASSERT_EQ(2, results->size());
+  EXPECT_EQ(results->at(0), "one\t1\tone");
+  EXPECT_EQ(results->at(1), "two\t2\ttwo");
+  fixture->CloseQuery(query_id3);
+  assertQueryState(query_id3, QUERY_STATE_SUCCESS);
+
+  fixture->CloseSession(session_id);
+
+  // Assert the session was properly closed: submitting to it must now fail.
+  TUniqueId query_id4;
+  Status expect_fail = fixture->SubmitQuery( "select 1", session_id, query_id4);
+  EXPECT_EQ(PrintId(session_id), expect_fail.msg().msg());
+  EXPECT_EQ(2, expect_fail.code()); // NOTE(review): presumably TErrorCode::GENERAL.
+} // TEST MultipleQueriesOneSession
+
+// A statement with an unterminated string literal must fail with a ParseException and
+// leave the query_id output at its default value.
+TEST(InternalServerTest, MissingClosingQuote) {
+  InternalServer* server = impala_server_.get();
+  DatabaseTest test_db(impala_server_, "missing_quote", true, 5);
+  const string expected_prefix = "ParseException: Unmatched string literal";
+  TUniqueId query_id;
+  Status result = server->ExecuteIgnoreResults("impala",
+      StrCat("select * from ", test_db.GetTableName(), " where name = 'foo"), &query_id);
+  EXPECT_EQ(TErrorCode::GENERAL, result.code());
+  EXPECT_EQ(expected_prefix, result.msg().msg().substr(0, expected_prefix.length()));
+  EXPECT_EQ(TUniqueId(), query_id);
+} // TEST MissingClosingQuote
+
+// Extra text after the ';' statement terminator must fail with a ParseException and
+// leave the query_id output at its default value.
+TEST(InternalServerTest, SyntaxError) {
+  InternalServer* server = impala_server_.get();
+  DatabaseTest test_db(impala_server_, "syntax_error", true, 5);
+  const string expected_prefix = "ParseException: Syntax error in line 1";
+  TUniqueId query_id;
+  Status result = server->ExecuteIgnoreResults("impala",
+      StrCat("select * from ", test_db.GetTableName(), "; select"), &query_id);
+  EXPECT_EQ(TErrorCode::GENERAL, result.code());
+  EXPECT_EQ(expected_prefix, result.msg().msg().substr(0, expected_prefix.length()));
+  EXPECT_EQ(TUniqueId(), query_id);
+} // TEST SyntaxError
+
+// An unterminated block comment must fail with a ParseException and leave the
+// query_id output at its default value.
+TEST(InternalServerTest, UnclosedComment) {
+  InternalServer* server = impala_server_.get();
+  const string expected_prefix = "ParseException: Syntax error in line 1";
+  TUniqueId query_id;
+  Status result = server->ExecuteIgnoreResults("impala", "select 1 /*foo", &query_id);
+  EXPECT_EQ(TErrorCode::GENERAL, result.code());
+  EXPECT_EQ(expected_prefix, result.msg().msg().substr(0, expected_prefix.length()));
+  EXPECT_EQ(TUniqueId(), query_id);
+} // TEST UnclosedComment
+
+// Selecting from a table that was just dropped must fail with an AnalysisException.
+TEST(InternalServerTest, TableNotExist) {
+  InternalServer* server = impala_server_.get();
+  DatabaseTest test_db(impala_server_, "table_not_exist", true, 5);
+  const string dropped_table = test_db.GetTableName();
+  const string expected_prefix = "AnalysisException: Could not resolve table reference:"
+      " '" + dropped_table + "'";
+  ASSERT_OK(server->ExecuteIgnoreResults("impala",
+      StrCat("drop table ", dropped_table, " purge")));
+  TUniqueId query_id;
+  Status result = server->ExecuteIgnoreResults("impala",
+      StrCat("select * from ", dropped_table), &query_id);
+  EXPECT_EQ(TErrorCode::GENERAL, result.code());
+  EXPECT_EQ(expected_prefix, result.msg().msg().substr(0, expected_prefix.length()));
+  EXPECT_EQ(TUniqueId(), query_id);
+} // TEST TableNotExist
+
+// Thread body for the SimultaneousMultipleQueriesOneSession test: runs a fixed number of
+// select queries, one after another, in the session identified by 'session_id'.
+void runTestQueries(const int thread_num, InternalServer* server,
+    const DatabaseTest* const db_test, const TUniqueId* session_id) {
+  const int total_queries = 100;
+  TUniqueId query_id;
+
+  for (int query_num = 0; query_num < total_queries; ++query_num) {
+    // The trailing SQL comment tags each statement with its thread and query number.
+    const int category = query_num % db_test->GetCategoryCount();
+    const string sql = StrCat("select * from ", db_test->GetTableName(),
+        " where category=", category, " --thread '", thread_num, "' query '", query_num,
+        "'");
+    ASSERT_OK(server->SubmitQuery(sql, *session_id, query_id));
+    ASSERT_OK(server->WaitForResults(query_id));
+    server->CloseQuery(query_id);
+    assertQueryState(query_id, QUERY_STATE_SUCCESS);
+  }
+} // runTestQueries
+
+// Runs select queries from several threads at once, all sharing a single session.
+TEST(InternalServerTest, SimultaneousMultipleQueriesOneSession) {
+  const int thread_count = 10;
+  list<unique_ptr<Thread>> workers;
+  TUniqueId session_id;
+  InternalServer* server = impala_server_.get();
+  DatabaseTest test_db(impala_server_, "smqos", true);
+
+  // Open the one session that every worker thread submits its queries to.
+  ASSERT_OK(server->OpenSession("impala", session_id));
+
+  // Start the workers; each Thread handle is stored in its own 'workers' element.
+  for (int thread_num = 0; thread_num < thread_count; ++thread_num) {
+    workers.emplace_back();
+    ABORT_IF_ERROR(Thread::Create("internal-server-test", "thread",
+        &runTestQueries, thread_num, server, &test_db, &session_id, &workers.back()));
+  }
+
+  // Wait for every worker to finish before closing the shared session.
+  for (const unique_ptr<Thread>& worker : workers) {
+    worker->Join();
+  }
+  server->CloseSession(session_id);
+} // TEST SimultaneousMultipleQueriesOneSession
+
+} // namespace internalservertest
+} // namespace impala
+
+int main(int argc, char** argv) {
+  FLAGS_jni_frontend_class = "org/apache/impala/service/JniFrontend";
+  // The statestore and catalog service are looked up on the local host.
+  FLAGS_hostname = "localhost";
+  FLAGS_state_store_host = FLAGS_hostname;
+  FLAGS_catalog_service_host = FLAGS_hostname;
+  FLAGS_enable_webserver = true;
+  // NOTE(review): ports presumably picked to avoid the running cluster's; confirm.
+  FLAGS_beeswax_port = 21005;
+  FLAGS_hs2_port = 21055;
+  FLAGS_hs2_http_port = 28005;
+  FLAGS_krpc_port = 27005;
+  FLAGS_state_store_subscriber_port = 23005;
+  FLAGS_webserver_port = 25005;
+
+  // Provides information about the current thread in a minidump if one is generated.
+  ThreadDebugInfo debugInfo;
+
+  testing::InitGoogleTest(&argc, argv);
+  InitCommonRuntime(argc, argv, true, TestInfo::BE_CLUSTER_TEST);
+  ABORT_IF_ERROR(LlvmCodeGen::InitializeLlvm(argv[0]));
+  JniUtil::InitLibhdfs();
+  ABORT_IF_ERROR(JniCatalogCacheUpdateIterator::InitJNI());
+  InitFeSupport();
+
+  ExecEnv exec_env;
+  ABORT_IF_ERROR(exec_env.Init());
+  // Start the ImpalaServer instance that every test reaches through impala_server_.
+  impala_server_ = make_shared<ImpalaServer>(&exec_env);
+  ABORT_IF_ERROR(impala_server_->Start(FLAGS_beeswax_port, FLAGS_hs2_port,
+      FLAGS_hs2_http_port, 0));
+
+  EXPECT_TRUE(impala_server_->IsCoordinator());
+  EXPECT_TRUE(impala_server_->IsExecutor());
+  exec_env.frontend()->WaitForCatalog();
+
+  ABORT_IF_ERROR(WaitForServer(FLAGS_hostname, FLAGS_beeswax_port, 10, 100));
+  // Run the tests first, then shut the server down before returning their result.
+  int test_ret_val = RUN_ALL_TESTS();
+
+  ShutdownStatusPB shutdown_status;
+  ABORT_IF_ERROR(impala_server_->StartShutdown(5, &shutdown_status));
+  impala_server_->Join();
+
+  return test_ret_val;
+}
diff --git a/be/src/service/internal-server.cc b/be/src/service/internal-server.cc
new file mode 100644
index 000000000..7e9c62c58
--- /dev/null
+++ b/be/src/service/internal-server.cc
@@ -0,0 +1,272 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <memory>
+#include <mutex>
+
+#include "common/status.h"
+#include "gen-cpp/ErrorCodes_types.h"
+#include "gen-cpp/Query_types.h"
+#include "gen-cpp/Types_types.h"
+#include "rpc/thrift-server.h"
+#include "runtime/query-driver.h"
+#include "service/client-request-state.h"
+#include "service/impala-server.h"
+#include "util/debug-util.h"
+#include "util/uid-util.h"
+
+using namespace std;
+
+namespace impala {
+
+// Creates an in-memory connection for 'user_name', registers it with the server and
+// applies 'query_opts' to the session belonging to that connection.
+//
+// On success 'new_session_id' holds the id of the new session, the connection is tracked
+// in 'internal_server_connections_' and the session has been marked active.
+//
+// If applying one of the query options fails, the connection that was just registered is
+// removed from 'internal_server_connections_' and ended again before the error is
+// returned, so that a failed open does not leave a connection/session behind. The session
+// is not marked active in that case.
+Status ImpalaServer::OpenSession(const string& user_name, TUniqueId& new_session_id,
+    const TQueryOptions& query_opts) {
+  shared_ptr<ThriftServer::ConnectionContext> conn_ctx =
+      make_shared<ThriftServer::ConnectionContext>();
+  conn_ctx->connection_id = this->RandomUniqueID();
+  conn_ctx->server_name = ImpalaServer::INTERNAL_SERVER_NAME;
+  conn_ctx->username = user_name;
+  conn_ctx->network_address.hostname = "in-memory.localhost";
+
+  this->ConnectionStart(*conn_ctx);
+
+  {
+    // NOTE(review): this relies on ConnectionStart() having registered a session for the
+    // new connection; the first session id of the connection is taken.
+    lock_guard<mutex> l(this->connection_to_sessions_map_lock_);
+    new_session_id = *this->
+        connection_to_sessions_map_[conn_ctx->connection_id].cbegin();
+  }
+
+  {
+    lock_guard<mutex> l(this->internal_server_connections_lock_);
+    this->internal_server_connections_.insert(make_pair(new_session_id, conn_ctx));
+  }
+
+  shared_ptr<ImpalaServer::SessionState> session_state;
+  Status opts_status = Status::OK();
+  {
+    lock_guard<mutex> l(this->session_state_map_lock_);
+    session_state = this->session_state_map_[new_session_id];
+    std::map<string, string> query_opts_map;
+    TQueryOptionsToMap(query_opts, &query_opts_map);
+    for (const auto& opt : query_opts_map) {
+      // Options with an empty value are skipped.
+      if (opt.second.empty()) continue;
+      opts_status = SetQueryOption(opt.first, opt.second,
+          &session_state->set_query_options, &session_state->set_query_options_mask);
+      // Stop at the first failure; cleanup happens below, outside of this lock.
+      if (!opts_status.ok()) break;
+    }
+  }
+
+  if (!opts_status.ok()) {
+    // Undo the registration done above. This is the same teardown CloseSession() does,
+    // minus MarkSessionInactive() since the session was never marked active.
+    {
+      lock_guard<mutex> l(this->internal_server_connections_lock_);
+      this->internal_server_connections_.erase(new_session_id);
+    }
+    this->ConnectionEnd(*conn_ctx);
+    return opts_status;
+  }
+
+  this->MarkSessionActive(session_state);
+
+  return Status::OK();
+} // ImpalaServer::OpenSession
+
+// Closes the session 'session_id' that was opened through OpenSession().
+//
+// Returns false without doing anything else if the session id is not present in
+// 'session_state_map_'. Otherwise the session is marked inactive, the connection tracked
+// for the session (if any) is ended and removed from 'internal_server_connections_', and
+// true is returned.
+bool ImpalaServer::CloseSession(const TUniqueId& session_id) {
+  {
+    lock_guard<mutex> l(this->session_state_map_lock_);
+
+    auto iter = this->session_state_map_.find(session_id);
+    if (iter == this->session_state_map_.end()) {
+      return false;
+    }
+
+    this->MarkSessionInactive(iter->second);
+  }
+
+  {
+    // unique_lock takes the mutex on construction and allows it to be released around
+    // the ConnectionEnd() call below while still guaranteeing it is released on exit.
+    std::unique_lock<std::mutex> l(this->internal_server_connections_lock_);
+
+    const auto iter = this->internal_server_connections_.find(session_id);
+    if (iter != this->internal_server_connections_.end()) {
+      // Copy the connection context while the lock is held so that neither the map entry
+      // nor the iterator is touched while the lock is released.
+      const auto conn_ctx = iter->second;
+
+      // ConnectionEnd() is called without holding 'internal_server_connections_lock_',
+      // as before.
+      l.unlock();
+      this->ConnectionEnd(*conn_ctx);
+      l.lock();
+
+      // Erase by key: the iterator obtained before unlocking may no longer be valid if
+      // another thread closed the same session in the meantime.
+      this->internal_server_connections_.erase(session_id);
+    }
+  }
+
+  return true;
+} // ImpalaServer::CloseSession
+
+// Opens a new session for 'user_name', runs 'sql' in it, waits for the query to have
+// results available and then closes the query and the session without fetching rows.
+//
+// If 'query_id' is not null it is set to the id of the executed query once the query has
+// been submitted and waited for successfully.
+//
+// The session opened here is closed on every path after a successful OpenSession(), and
+// the query is closed whenever SubmitQuery() assigned it an id, so a failing submit or
+// wait does not leave the query or session behind. The first error encountered is
+// returned.
+Status ImpalaServer::ExecuteIgnoreResults(const string& user_name, const string& sql,
+    TUniqueId* query_id) {
+  TUniqueId session_id;
+  // Same steps as SubmitAndWait(), issued individually so that cleanup can be matched to
+  // the step that failed. If opening the session fails there is nothing to close here.
+  RETURN_IF_ERROR(this->OpenSession(user_name, session_id));
+
+  TUniqueId internal_query_id;
+  Status status = this->SubmitQuery(sql, session_id, internal_query_id);
+  if (status.ok()) status = this->WaitForResults(internal_query_id);
+
+  if (status.ok() && query_id != nullptr) {
+    *query_id = internal_query_id;
+  }
+
+  // SubmitQuery() only assigns the query id once the query has been created, so a
+  // default-constructed id means there is no query to close.
+  if (internal_query_id != TUniqueId()) {
+    this->CloseQuery(internal_query_id);
+  }
+
+  this->CloseSession(session_id);
+
+  return status;
+} //ImpalaServer::ExecuteIgnoreResults
+
+// Opens a new session for 'user_name', runs 'sql' in it, waits for results, appends all
+// result rows to 'results' (and column name/type pairs to 'columns' if it is not null),
+// then closes the query and the session.
+//
+// If 'query_id' is not null it is set to the id of the executed query once the query has
+// been submitted and waited for successfully, before rows are fetched.
+//
+// The session opened here is closed on every path after a successful OpenSession(), and
+// the query is closed whenever SubmitQuery() assigned it an id, so a failing submit, wait
+// or fetch does not leave the query or session behind. The first error encountered is
+// returned.
+Status ImpalaServer::ExecuteAndFetchAllText(const std::string& user_name,
+    const std::string& sql, query_results& results, results_columns* columns,
+    TUniqueId* query_id){
+  TUniqueId session_id;
+  // Same steps as SubmitAndWait(), issued individually so that cleanup can be matched to
+  // the step that failed. If opening the session fails there is nothing to close here.
+  RETURN_IF_ERROR(this->OpenSession(user_name, session_id));
+
+  TUniqueId internal_query_id;
+  Status status = this->SubmitQuery(sql, session_id, internal_query_id);
+  if (status.ok()) status = this->WaitForResults(internal_query_id);
+
+  if (status.ok()) {
+    if (query_id != nullptr) {
+      *query_id = internal_query_id;
+    }
+
+    status = this->FetchAllRows(internal_query_id, results, columns);
+  }
+
+  // SubmitQuery() only assigns the query id once the query has been created, so a
+  // default-constructed id means there is no query to close.
+  if (internal_query_id != TUniqueId()) {
+    this->CloseQuery(internal_query_id);
+  }
+  this->CloseSession(session_id);
+
+  return status;
+} // ImpalaServer::ExecuteAndFetchAllText
+
+// Opens a session for 'user_name', submits 'sql' in that session and waits until the
+// query has results available. The three steps run in order and the first failing step
+// ends the sequence; its status is returned. 'new_session_id' and 'new_query_id' are
+// filled in by the respective steps.
+Status ImpalaServer::SubmitAndWait(const string& user_name, const string& sql,
+    TUniqueId& new_session_id, TUniqueId& new_query_id) {
+  Status status = this->OpenSession(user_name, new_session_id);
+  if (status.ok()) {
+    status = this->SubmitQuery(sql, new_session_id, new_query_id);
+  }
+  if (status.ok()) {
+    status = this->WaitForResults(new_query_id);
+  }
+  return status;
+} // ImpalaServer::SubmitAndWait
+
+// Waits until the query identified by 'query_id' has results available.
+//
+// 'query_id' is an in/out parameter: after a successful wait it is overwritten with the
+// query id reported by the query handle, which may differ from the id passed in.
+//
+// Returns an error if the query handle cannot be obtained, if either wait step fails, or
+// an expected "timed out" status if the wait reports a timeout.
+Status ImpalaServer::WaitForResults(TUniqueId& query_id) {
+  QueryHandle query_handle;
+  RETURN_IF_ERROR(GetActiveQueryHandle(query_id, &query_handle));
+
+  RETURN_IF_ERROR(query_handle->WaitAsync());
+
+  // Initialized so that 'timed_out' is never read with an indeterminate value should the
+  // callee return OK without writing to its output parameters.
+  int64_t block_wait_time = 0;
+  bool timed_out = false;
+  RETURN_IF_ERROR(this->WaitForResults(query_handle->query_id(), &query_handle,
+      &block_wait_time, &timed_out));
+  query_id = query_handle->query_id();
+
+  if (timed_out) {
+    return Status::Expected("query timed out waiting for results");
+  }
+
+  return Status::OK();
+} // ImpalaServer::WaitForResults
+
+// Starts 'sql' running in the already-open session 'session_id' and marks the query as
+// in-flight. 'new_query_id' is set to the id of the new query once Execute() succeeded.
+// Returns a GENERAL error carrying the printed session id if the session is unknown.
+Status ImpalaServer::SubmitQuery(const string& sql, const TUniqueId& session_id,
+    TUniqueId& new_query_id) {
+  // Look up the session first; the lock is only held for the lookup itself.
+  shared_ptr<SessionState> session_state;
+  {
+    lock_guard<mutex> l(this->session_state_map_lock_);
+
+    const auto session_it = this->session_state_map_.find(session_id);
+    if (session_it == this->session_state_map_.end()) {
+      return Status::Expected(TErrorCode::GENERAL, PrintId(session_id));
+    }
+    session_state = session_it->second;
+  }
+
+  // Build the query context from the statement and the session's state.
+  TQueryCtx query_context;
+  query_context.client_request.stmt = sql;
+  session_state->ToThrift(session_state->session_id, &query_context.session);
+  query_context.client_request.query_options = session_state->QueryOptions();
+
+  // The pool configuration is added with the complement of the mask of options that
+  // were explicitly set on the session.
+  const QueryOptionsMask session_set_mask = session_state->set_query_options_mask;
+  this->AddPoolConfiguration(&query_context, ~session_set_mask);
+
+  QueryHandle query_handle;
+  RETURN_IF_ERROR(this->Execute(&query_context, session_state, &query_handle, nullptr));
+  new_query_id = query_handle->query_id();
+
+  return this->SetQueryInflight(session_state, query_handle);
+} // ImpalaServer::SubmitQuery
+
+
+
+// Fetches all result rows of the query 'query_id' as text and appends them to '*results'.
+// If 'columns' is not null, one (column name, type string) pair per result column is
+// appended to it before any rows are fetched.
+//
+// Rows are fetched in batches until the query handle reports end-of-stream. The first
+// failing fetch ends the loop and its status is returned. The result set object created
+// for the fetch is owned by this function and released on every return path.
+Status ImpalaServer::FetchAllRows(const TUniqueId& query_id, query_results& results,
+    results_columns* columns) {
+  // Declared before 'result_set' so that it outlives the result set, which is handed a
+  // pointer to this vector.
+  vector<string> row_set;
+  // The pointer returned by CreateAsciiQueryResultSet() was previously never freed by
+  // this function; holding it in a unique_ptr releases it on success and on error.
+  unique_ptr<QueryResultSet> result_set;
+
+  QueryHandle query_handle;
+  RETURN_IF_ERROR(GetActiveQueryHandle(query_id, &query_handle));
+
+  {
+    lock_guard<mutex> l1(*query_handle->fetch_rows_lock());
+    lock_guard<mutex> l2(*query_handle->lock());
+
+    if (query_handle->num_rows_fetched() == 0) {
+      query_handle->set_fetched_rows();
+    }
+
+    const TResultSetMetadata* results_metadata = query_handle->result_metadata();
+
+    // populate column vector if provided by the user
+    if (columns != nullptr) {
+      for (const auto& column : results_metadata->columns) {
+        // TODO: As of today, the ODBC driver does not support boolean and timestamp data
+        // type but it should. This is tracked by ODBC-189. We should verify that our
+        // boolean and timestamp type are correctly recognized when ODBC-189 is closed.
+        // TODO: Handle complex types.
+        const TColumnType& type = column.columnType;
+        columns->emplace_back(make_pair(column.columnName,
+            ColumnTypeToBeeswaxTypeString(type)));
+      }
+    }
+
+    result_set.reset(QueryResultSet::CreateAsciiQueryResultSet(
+        *results_metadata, &row_set, true));
+  }
+
+  // NOTE(review): presumably microseconds (i.e. 30 seconds) -- confirm against
+  // FetchRows().
+  int64_t block_wait_time = 30000000;
+  while (!query_handle->eos()) {
+    lock_guard<mutex> l1(*query_handle->fetch_rows_lock());
+    lock_guard<mutex> l2(*query_handle->lock());
+
+    // 'row_set' is the buffer the result set writes into; it is emptied before each
+    // batch and its contents are appended to the caller's vector afterwards.
+    row_set.clear();
+
+    RETURN_IF_ERROR(query_handle->FetchRows(10, result_set.get(), block_wait_time));
+    results->insert(results->cend(), row_set.cbegin(), row_set.cend());
+  }
+  return Status::OK();
+} // ImpalaServer::FetchAllRows
+
+// Closes the query 'query_id' by unregistering it; the outcome of the unregistration is
+// discarded. The session that ran the query is not touched.
+void ImpalaServer::CloseQuery(const TUniqueId& query_id) {
+  UnregisterQueryDiscardResult(query_id, false);
+} // ImpalaServer::CloseQuery
+
+// Appends the connection context of every connection currently tracked in
+// 'internal_server_connections_' to 'connection_contexts'. Existing elements of the
+// list are left in place. The map is read under 'internal_server_connections_lock_'.
+void ImpalaServer::GetConnectionContextList(
+    ThriftServer::ConnectionContextList* connection_contexts) {
+  lock_guard<mutex> l(this->internal_server_connections_lock_);
+
+  for (const auto& session_and_conn : this->internal_server_connections_) {
+    connection_contexts->push_back(session_and_conn.second);
+  }
+} // ImpalaServer::GetConnectionContextList
+
+} // namespace impala
diff --git a/be/src/service/internal-server.h b/be/src/service/internal-server.h
new file mode 100644
index 000000000..b6bbf9405
--- /dev/null
+++ b/be/src/service/internal-server.h
@@ -0,0 +1,216 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <memory>
+#include <string>
+#include <vector>
+
+#include "gen-cpp/Query_types.h"
+#include "gen-cpp/Types_types.h"
+#include "common/status.h"
+
+namespace impala {
+
+  /// Container for the result rows of a query: a shared vector with one string per
+  /// element.
+  typedef std::shared_ptr<std::vector<std::string>> query_results;
+  /// Description of the result columns of a query: one (column name, column type) pair
+  /// per column.
+  typedef std::vector<std::pair<std::string, std::string>> results_columns;
+
+  /// Enables Impala coordinators to submit queries to themselves.
+  ///
+  /// Internally, this class directly calls the methods on ImpalaServer that are called by
+  /// the Beeswax and HS2 servers. Thus, it does not strictly adhere to either protocol.
+  /// Since Impala requires sessions to have a defined type, sessions created by
+  /// InternalServer show up as Beeswax sessions.
+  ///
+  /// Since this class directly calls ImpalaServer methods, it bypasses all authentication
+  /// methods.
+  ///
+  /// Usage:
+  ///   The easiest way to use this class is to call the `ExecuteAndFetchAllText` function
+  ///   which runs the provided sql, returns all the results, and closes the query and
+  ///   session. This function is useful for running create/insert/update queries that do
+  ///   not return many results.
+  ///
+  ///   The next way is to call the `SubmitAndWait` function. This function runs the
+  ///   provided sql and blocks until results become available. Retrieving the results can
+  ///   be done by leveraging the query id returned by this method. The `CloseQuery` and
+  ///   `CloseSession` functions must be called by clients once all results are read.
+  ///
+  ///   The lowest level function is `SubmitQuery`. This function starts a query running
+  ///   and returns immediately. Opening a connection/session, waiting for results,
+  ///   retrieving results, and closing the query/session is left up to the client.
+  class InternalServer {
+    public:
+      virtual ~InternalServer() {}
+
+      /// Creates and registers a new connection and session.
+      ///
+      /// Parameters:
+      ///   `user_name`      Specifies the username that will be reported as running this
+      ///                    query.
+      ///   `new_session_id` Output parameter that will be populated with the id of the
+      ///                    newly created session.
+      ///   `query_opts`     Optional, contains query options that will apply to all
+      ///                    queries executed during this session.
+      ///
+      /// Return:
+      ///   `impala::Status` Indicates the result of opening the new session.
+      virtual Status OpenSession(const std::string& user_name, TUniqueId& new_session_id,
+          const TQueryOptions& query_opts = TQueryOptions()) = 0;
+
+      /// Closes a given session cleaning up all associated resources.
+      ///
+      /// Parameters:
+      ///   `session_id` Id of the session that will be closed.
+      ///
+      /// Return:
+      ///   `bool` Indicates if the provided session id was for a known session created
+      ///          by the `OpenSession` function.
+      virtual bool CloseSession(const impala::TUniqueId& session_id) = 0;
+
+      /// Executes a given query cleaning up after the query has completed. Results are
+      /// never retrieved.
+      ///
+      /// Parameters:
+      ///   `user_name` Specifies the username that will be reported as running this
+      ///               query.
+      ///   `sql`       Text of the sql query/ddl/dml to run.
+      ///   `query_id`  Optional output parameter, if specified, it will be overwritten
+      ///               with the id of the query that was executed. Since the query is
+      ///               closed by this function, the query id is informational only.
+      ///
+      /// Return:
+      ///   `impala::Status` Indicates the result of submitting the query and waiting for
+      ///                    it to return.
+      virtual Status ExecuteIgnoreResults(const std::string& user_name,
+          const std::string& sql, TUniqueId* query_id = nullptr) = 0;
+
+      /// Creates a new session under the specified user and submits a query under that
+      /// session. No authentication is performed. Blocks until result rows are available.
+      /// Then, populates all result rows. Finally, cleans up the query and session.
+      ///
+      /// Intended for use as a convenience method when query results are small.
+      ///
+      /// Parameters:
+      ///   `user_name` Specifies the username that will be reported as running this
+      ///               query.
+      ///   `sql`       Text of the sql query/ddl/dml to run.
+      ///   `results`   Output parameter containing all result rows from the query. If
+      ///               this vector has existing elements, they will be left in place with
+      ///               result rows added at the end of the vector.
+      ///   `columns`   Optional output parameter where each element is a pair with the
+      ///               first element being the name of the column and the second element
+      ///               being the column type. Existing elements in the vector will be
+      ///               left in place with column pairs appended to the end of the vector.
+      ///               If this parameter is `nullptr`, then the list of columns is not
+      ///               generated and this parameter's value will remain `nullptr`.
+      ///   `query_id`  Optional output parameter, if specified, it will be overwritten
+      ///               with the id of the query that was executed. Since the query is
+      ///               closed by this function, the query id is informational only.
+      ///
+      /// Return:
+      ///   `impala::Status` indicating the result of submitting the query and waiting for
+      ///   it to return.
+      virtual Status ExecuteAndFetchAllText(const std::string& user_name,
+          const std::string& sql, query_results& results,
+          results_columns* columns = nullptr, TUniqueId* query_id = nullptr) = 0;
+
+      /// Creates a new session under the specified user and submits a query under that
+      /// session. No authentication is performed. Blocks until result rows are available.
+      ///
+      /// After retrieving the results, clients must call `CloseQuery` and `CloseSession`
+      /// to properly close and clean up the query and session.
+      ///
+      /// Parameters:
+      ///   `user_name`      Specifies the username that will be reported as running this
+      ///                    query.
+      ///   `sql`            Text of the sql query/ddl/dml to run.
+      ///   `new_session_id` Output parameter that will be set to the id of the newly
+      ///                    created session.
+      ///   `new_query_id`   Output parameter that will be set to the id of the newly
+      ///                    started query.
+      ///
+      /// Return:
+      ///   `impala::Status` Indicates the result of submitting and waiting for the query.
+      virtual Status SubmitAndWait(const std::string& user_name,
+          const std::string& sql, TUniqueId& new_session_id, TUniqueId& new_query_id) = 0;
+
+      /// Waits until the given query has results available.
+      ///
+      /// Parameters:
+      ///   `query_id` Input/output parameter that contains the id of a query to wait for
+      ///              it to produce results. The value of this parameter will change if
+      ///              the query was automatically retried by the coordinator.
+      ///
+      /// Return:
+      ///   `impala::Status` Indicates the result of waiting for the query.
+      virtual Status WaitForResults(TUniqueId& query_id) = 0;
+
+      /// Submits a query to the current Impala coordinator under the provided session and
+      /// sets the query as in-flight before returning.
+      ///
+      /// Parameters:
+      ///   `sql`          Text of the sql query/ddl/dml to run.
+      ///   `session_id`   Id of the session that will run the query.
+      ///   `new_query_id` Output parameter that will be set to the id of the newly
+      ///                  started query.
+      ///
+      /// Return:
+      ///   `impala::Status` Indicates the result of submitting the query.
+      virtual Status SubmitQuery(const std::string& sql,
+          const impala::TUniqueId& session_id, TUniqueId& new_query_id) = 0;
+
+      /// Retrieves all result rows for a given query. The query must have already been
+      /// submitted and one of the Wait methods called on the query to ensure results are
+      /// available.
+      ///
+      /// Note: Assumes the query represented by `query_id` was successful as this
+      ///       function does not check that the query status is a successful status.
+      ///
+      /// Parameters:
+      ///   `query_id` Id of a query that was submitted and has results available.
+      ///   `results`  Output parameter containing all result rows from the query. If
+      ///              this vector has existing elements, they will be left in place
+      ///              with result rows added at the end of the vector.
+      ///   `columns`  Optional output parameter where each element is a pair with
+      ///              the first element being the name of the column and the second
+      ///              element being the column type. Existing elements in the vector
+      ///              will be left in place with column pairs appended to the end of
+      ///              the vector. If this parameter is `nullptr`, then the list of
+      ///              columns is not generated and this parameter's value will
+      ///              remain `nullptr`.
+      /// Return:
+      ///   `impala::Status` Indicates the result of fetching rows.
+      virtual Status FetchAllRows(const TUniqueId& query_id, query_results& results,
+          results_columns* columns = nullptr) = 0;
+
+      /// Closes the given query. The session that ran the query is left open and must
+      /// be closed separately by calling `CloseSession`.
+      ///
+      /// Parameters:
+      ///   `query_id` Query that was submitted and has finished.
+      virtual void CloseQuery(const TUniqueId& query_id) = 0;
+
+      /// Populates the provided list with all connections currently managed by the
+      /// internal server.
+      ///
+      /// NOTE(review): `ThriftServer` is not declared by any header included from this
+      /// file; this relies on the including file pulling in "rpc/thrift-server.h" first
+      /// -- confirm.
+      ///
+      /// Parameters:
+      ///   `connection_contexts` Input/output parameter that will have all internal
+      ///                         server connections added to the end.
+      virtual void GetConnectionContextList(
+        ThriftServer::ConnectionContextList* connection_contexts) = 0;
+
+  }; // InternalServer class
+
+} // namespace impala
diff --git a/be/src/testutil/http-util.h b/be/src/testutil/http-util.h
new file mode 100644
index 000000000..09442f1c4
--- /dev/null
+++ b/be/src/testutil/http-util.h
@@ -0,0 +1,104 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <boost/asio.hpp>
+#include <boost/lexical_cast.hpp>
+#include <gutil/strings/substitute.h>
+#include <iosfwd>
+#include <map>
+#include <string>
+
+#include "common/status.h"
+#include "gflags/gflags_declare.h"
+
+DECLARE_int32(webserver_port);
+
+using boost::asio::ip::tcp;
+
+namespace impala {
+
+// Minimal blocking HTTP/1.1 client for use in tests. Fill in the fields that differ from
+// the defaults and call Get(), Post() or Do().
+struct HttpRequest {
+  // Path placed on the request line.
+  std::string url_path = "/";
+  // Host to connect to; also sent in the "Host" header.
+  std::string host = "localhost";
+  // Port to connect to; defaults to the value of the webserver_port flag.
+  int32_t port = FLAGS_webserver_port;
+  // Extra request headers, written after the fixed headers.
+  std::map<std::string, std::string> headers = {};
+
+  // Sends a 'method' request for 'url_path' to 'host':'port' and checks the status code
+  // of the response against 'expected_code'.
+  //
+  // On success everything the server sent after the status line is written to '*out' and
+  // OK is returned. An error status is returned if the connection cannot be made, no
+  // response arrives, the status line is malformed, the status code differs from
+  // 'expected_code', or an exception is thrown along the way.
+  //
+  // Adapted from:
+  // http://stackoverflow.com/questions/10982717/get-html-without-header-with-boostasio
+  Status Do(std::ostream* out, int expected_code, const std::string& method) {
+    try {
+      tcp::iostream request_stream;
+      request_stream.connect(host, boost::lexical_cast<std::string>(port));
+      if (!request_stream) return Status("Could not connect request_stream");
+
+      request_stream << method << " " << url_path << " HTTP/1.1\r\n";
+      request_stream << "Host: " << host << ":" << port <<  "\r\n";
+      request_stream << "Accept: */*\r\n";
+      request_stream << "Cache-Control: no-cache\r\n";
+      // POST requests are sent without a body.
+      if (method == "POST") {
+        request_stream << "Content-Length: 0\r\n";
+      }
+      for (const auto& header : headers) {
+        request_stream << header.first << ": " << header.second << "\r\n";
+      }
+
+      request_stream << "Connection: close\r\n\r\n";
+      request_stream.flush();
+
+      // Only the status line is parsed: "<http version> <status code> <message>".
+      std::string line1;
+      std::getline(request_stream, line1);
+      if (!request_stream) return Status("No response");
+
+      std::stringstream response_stream(line1);
+      std::string http_version;
+      response_stream >> http_version;
+
+      unsigned int status_code = 0;
+      response_stream >> status_code;
+
+      std::string status_message;
+      std::getline(response_stream, status_message);
+      if (!response_stream || http_version.substr(0,5) != "HTTP/") {
+        return Status("Malformed response");
+      }
+
+      if (status_code != expected_code) {
+        return Status(strings::Substitute("Unexpected status code: $0", status_code));
+      }
+
+      // Hand the rest of the response to the caller unparsed.
+      (*out) << request_stream.rdbuf();
+      return Status::OK();
+    } catch (const std::exception& e){
+      return Status(e.what());
+    }
+  }
+
+  // Convenience wrapper: Do() with the "GET" method.
+  Status Get(std::ostream* out, int expected_code = 200) {
+    return Do(out, expected_code, "GET");
+  }
+
+  // Convenience wrapper: Do() with the "POST" method.
+  Status Post(std::ostream* out, int expected_code = 200) {
+    return Do(out, expected_code, "POST");
+  }
+}; // struct HttpRequest
+
+// Sends a single 'method' request (GET by default) for 'url_path' to 'host':'port' with
+// no extra headers and writes the response to '*out'. Returns an error status if the
+// request fails or the response status code differs from 'expected_code'.
+//
+// Declared inline because this function is defined in a header: without it, including
+// the header from more than one translation unit of the same binary would produce
+// multiple definitions.
+inline Status HttpGet(const std::string& host, const int32_t& port,
+    const std::string& url_path, std::ostream* out, int expected_code = 200,
+    const std::string& method = "GET") {
+  return HttpRequest{url_path, host, port}.Do(out, expected_code, method);
+}
+} // namespace impala
diff --git a/be/src/util/test-info.h b/be/src/util/test-info.h
index b1ec64da0..b988b4530 100644
--- a/be/src/util/test-info.h
+++ b/be/src/util/test-info.h
@@ -26,8 +26,10 @@ class TestInfo {
  public:
   enum Mode {
     NON_TEST, // Not a test, one of the main daemons
-    BE_TEST,
-    FE_TEST,
+    BE_TEST,         // backend test
+    BE_CLUSTER_TEST, // backend test that instantiates an Impala coordinator which joins
+                     // an existing, running cluster
+    FE_TEST,         // frontend test
   };
 
   /// Called in InitCommonRuntime().
@@ -35,7 +37,9 @@ class TestInfo {
 
   static bool is_be_test() { return mode_ == BE_TEST; }
   static bool is_fe_test() { return mode_ == FE_TEST; }
-  static bool is_test() { return mode_ == BE_TEST || mode_ == FE_TEST; }
+  static bool is_be_cluster_test() { return mode_ == BE_CLUSTER_TEST; }
+  static bool is_test() { return mode_ == BE_TEST || mode_ == FE_TEST ||
+      mode_ == BE_CLUSTER_TEST; }
 
  private:
   static Mode mode_;
diff --git a/be/src/util/webserver-test.cc b/be/src/util/webserver-test.cc
index 824ff6ce5..e10fe3dec 100644
--- a/be/src/util/webserver-test.cc
+++ b/be/src/util/webserver-test.cc
@@ -18,16 +18,14 @@
 #include <array>
 #include <memory>
 #include <string>
-#include <boost/asio.hpp>
 #include <boost/bind.hpp>
 #include <boost/filesystem.hpp>
-#include <boost/lexical_cast.hpp>
 #include <gutil/strings/substitute.h>
-#include <map>
 #include <openssl/ssl.h>
 #include <regex>
 
 #include "common/init.h"
+#include "testutil/http-util.h"
 #include "testutil/gtest-util.h"
 #include "testutil/scoped-flag-setter.h"
 
@@ -68,76 +66,6 @@ const string TO_ESCAPE_KEY = "ToEscape";
 const string TO_ESCAPE_VALUE = "<script language='javascript'>";
 const string ESCAPED_VALUE = "&lt;script language=&apos;javascript&apos;&gt;";
 
-struct HttpRequest {
-  string url_path = "/";
-  string host = "localhost";
-  int32_t port = FLAGS_webserver_port;
-  map<string, string> headers = {};
-
-  // Adapted from:
-  // http://stackoverflow.com/questions/10982717/get-html-without-header-with-boostasio
-  Status Do(ostream* out, int expected_code, const string& method) {
-    try {
-      tcp::iostream request_stream;
-      request_stream.connect(host, lexical_cast<string>(port));
-      if (!request_stream) return Status("Could not connect request_stream");
-
-      request_stream << method << " " << url_path << " HTTP/1.1\r\n";
-      request_stream << "Host: " << host << ":" << port <<  "\r\n";
-      request_stream << "Accept: */*\r\n";
-      request_stream << "Cache-Control: no-cache\r\n";
-      if (method == "POST") {
-        request_stream << "Content-Length: 0\r\n";
-      }
-      for (const auto& header : headers) {
-        request_stream << header.first << ": " << header.second << "\r\n";
-      }
-
-      request_stream << "Connection: close\r\n\r\n";
-      request_stream.flush();
-
-      string line1;
-      getline(request_stream, line1);
-      if (!request_stream) return Status("No response");
-
-      stringstream response_stream(line1);
-      string http_version;
-      response_stream >> http_version;
-
-      unsigned int status_code;
-      response_stream >> status_code;
-
-      string status_message;
-      getline(response_stream, status_message);
-      if (!response_stream || http_version.substr(0,5) != "HTTP/") {
-        return Status("Malformed response");
-      }
-
-      if (status_code != expected_code) {
-        return Status(Substitute("Unexpected status code: $0", status_code));
-      }
-
-      (*out) << request_stream.rdbuf();
-      return Status::OK();
-    } catch (const std::exception& e){
-      return Status(e.what());
-    }
-  }
-
-  Status Get(ostream* out, int expected_code = 200) {
-    return Do(out, expected_code, "GET");
-  }
-
-  Status Post(ostream* out, int expected_code = 200) {
-    return Do(out, expected_code, "POST");
-  }
-};
-
-Status HttpGet(const string& host, const int32_t& port, const string& url_path,
-    ostream* out, int expected_code = 200, const string& method = "GET") {
-  return HttpRequest{url_path, host, port}.Do(out, expected_code, method);
-}
-
 string exec(const char* cmd) {
     std::array<char, 1024> buffer;
     string result;