You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by sa...@apache.org on 2018/07/03 16:18:24 UTC

[1/4] impala git commit: [DOCS] Corrected the supported values for parquet_array_resolution

Repository: impala
Updated Branches:
  refs/heads/master e07fbc1b6 -> 2b6d71fee


[DOCS] Corrected the supported values for parquet_array_resolution

Change-Id: Icfc1b7209d0f6b28c814be4149b0bacfebad2356
Reviewed-on: http://gerrit.cloudera.org:8080/10840
Reviewed-by: Alex Rodoni <ar...@cloudera.com>
Tested-by: Impala Public Jenkins <im...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/f20ecadb
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/f20ecadb
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/f20ecadb

Branch: refs/heads/master
Commit: f20ecadb0598166a747b2d89c479976cd8e9bedb
Parents: e07fbc1
Author: Alex Rodoni <ar...@cloudera.com>
Authored: Thu Jun 28 10:13:34 2018 -0700
Committer: Impala Public Jenkins <im...@cloudera.com>
Committed: Mon Jul 2 23:06:52 2018 +0000

----------------------------------------------------------------------
 docs/topics/impala_parquet_array_resolution.xml | 5 +++--
 1 file changed, 3 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/f20ecadb/docs/topics/impala_parquet_array_resolution.xml
----------------------------------------------------------------------
diff --git a/docs/topics/impala_parquet_array_resolution.xml b/docs/topics/impala_parquet_array_resolution.xml
index 62c78d2..e48555e 100644
--- a/docs/topics/impala_parquet_array_resolution.xml
+++ b/docs/topics/impala_parquet_array_resolution.xml
@@ -100,8 +100,9 @@ under the License.
     </p>
 
     <p>
-      <b>Type:</b> Enum of <codeph>ONE_LEVEL</codeph>, <codeph>TWO_LEVEL</codeph>,
-      <codeph>THREE_LEVEL</codeph>
+      <b>Type:</b> Enum of <codeph>TWO_LEVEL</codeph>,
+        <codeph>TWO_LEVEL_THEN_THREE_LEVEL</codeph>, and
+        <codeph>THREE_LEVEL</codeph>
     </p>
 
     <p>


[4/4] impala git commit: IMPALA-7224. Improve performance of UpdateCatalogMetrics

Posted by sa...@apache.org.
IMPALA-7224. Improve performance of UpdateCatalogMetrics

This function is called after every DDL query, and was implemented by
fetching the entire list of table names, even though only the length
of that list was needed. In workloads with millions of tables, this
could add several seconds of overhead following even simple requests
like 'USE' or 'DESCRIBE'.

I tested a backported version of this patch against one such workload.
It reduced the time taken for a simple DESCRIBE query from 12-14 seconds
down to about 40 ms. I also verified locally that the metrics on the
impalad were still updated by DDL operations.

Change-Id: Ic5467adbce1e760ff93996925db5611748efafc0
Reviewed-on: http://gerrit.cloudera.org:8080/10846
Reviewed-by: Vuk Ercegovac <ve...@cloudera.com>
Reviewed-by: Tim Armstrong <ta...@cloudera.com>
Tested-by: Impala Public Jenkins <im...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/2b6d71fe
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/2b6d71fe
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/2b6d71fe

Branch: refs/heads/master
Commit: 2b6d71fee779088af54cc416ee25027bbd415954
Parents: 0a47016
Author: Todd Lipcon <to...@cloudera.com>
Authored: Fri Jun 29 15:38:14 2018 -0700
Committer: Tim Armstrong <ta...@cloudera.com>
Committed: Tue Jul 3 15:37:27 2018 +0000

----------------------------------------------------------------------
 be/src/service/frontend.cc                           |  5 +++++
 be/src/service/frontend.h                            |  4 ++++
 be/src/service/impala-server.cc                      | 15 ++++-----------
 common/thrift/Frontend.thrift                        |  6 ++++++
 .../java/org/apache/impala/service/Frontend.java     | 10 ++++++++++
 .../java/org/apache/impala/service/JniFrontend.java  | 11 +++++++++++
 6 files changed, 40 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/2b6d71fe/be/src/service/frontend.cc
----------------------------------------------------------------------
diff --git a/be/src/service/frontend.cc b/be/src/service/frontend.cc
index 592a1dd..9ae9f90 100644
--- a/be/src/service/frontend.cc
+++ b/be/src/service/frontend.cc
@@ -82,6 +82,7 @@ Frontend::Frontend() {
     {"checkConfiguration", "()Ljava/lang/String;", &check_config_id_},
     {"updateCatalogCache", "([B)[B", &update_catalog_cache_id_},
     {"updateMembership", "([B)V", &update_membership_id_},
+    {"getCatalogMetrics", "()[B", &get_catalog_metrics_id_},
     {"getTableNames", "([B)[B", &get_table_names_id_},
     {"describeDb", "([B)[B", &describe_db_id_},
     {"describeTable", "([B)[B", &describe_table_id_},
@@ -152,6 +153,10 @@ Status Frontend::ShowCreateFunction(const TGetFunctionsParams& params, string* r
   return JniUtil::CallJniMethod(fe_, show_create_function_id_, params, response);
 }
 
+Status Frontend::GetCatalogMetrics(TGetCatalogMetricsResult* resp) {
+  return JniUtil::CallJniMethod(fe_, get_catalog_metrics_id_, resp);
+}
+
 Status Frontend::GetTableNames(const string& db, const string* pattern,
     const TSessionState* session, TGetTablesResult* table_names) {
   TGetTablesParams params;

http://git-wip-us.apache.org/repos/asf/impala/blob/2b6d71fe/be/src/service/frontend.h
----------------------------------------------------------------------
diff --git a/be/src/service/frontend.h b/be/src/service/frontend.h
index 08123b3..77836ab 100644
--- a/be/src/service/frontend.h
+++ b/be/src/service/frontend.h
@@ -55,6 +55,9 @@ class Frontend {
   /// Call FE to get TExecRequest.
   Status GetExecRequest(const TQueryCtx& query_ctx, TExecRequest* result);
 
+  /// Get the metrics from the catalog used by this frontend.
+  Status GetCatalogMetrics(TGetCatalogMetricsResult* resp);
+
   /// Returns all matching table names, per Hive's "SHOW TABLES <pattern>". Each
   /// table name returned is unqualified.
   /// If pattern is NULL, match all tables otherwise match only those tables that
@@ -194,6 +197,7 @@ class Frontend {
   jmethodID check_config_id_; // JniFrontend.checkConfiguration()
   jmethodID update_catalog_cache_id_; // JniFrontend.updateCatalogCache(byte[][])
   jmethodID update_membership_id_; // JniFrontend.updateMembership()
+  jmethodID get_catalog_metrics_id_; // JniFrontend.getCatalogMetrics()
   jmethodID get_table_names_id_; // JniFrontend.getTableNames
   jmethodID describe_db_id_; // JniFrontend.describeDb
   jmethodID describe_table_id_; // JniFrontend.describeTable

http://git-wip-us.apache.org/repos/asf/impala/blob/2b6d71fe/be/src/service/impala-server.cc
----------------------------------------------------------------------
diff --git a/be/src/service/impala-server.cc b/be/src/service/impala-server.cc
index e9ddbe4..c1f00e3 100644
--- a/be/src/service/impala-server.cc
+++ b/be/src/service/impala-server.cc
@@ -1154,17 +1154,10 @@ Status ImpalaServer::UnregisterQuery(const TUniqueId& query_id, bool check_infli
 }
 
 Status ImpalaServer::UpdateCatalogMetrics() {
-  TGetDbsResult dbs;
-  RETURN_IF_ERROR(exec_env_->frontend()->GetDbs(nullptr, nullptr, &dbs));
-  ImpaladMetrics::CATALOG_NUM_DBS->SetValue(dbs.dbs.size());
-  ImpaladMetrics::CATALOG_NUM_TABLES->SetValue(0L);
-  for (const TDatabase& db: dbs.dbs) {
-    TGetTablesResult table_names;
-    RETURN_IF_ERROR(exec_env_->frontend()->GetTableNames(db.db_name, nullptr, nullptr,
-        &table_names));
-    ImpaladMetrics::CATALOG_NUM_TABLES->Increment(table_names.tables.size());
-  }
-
+  TGetCatalogMetricsResult metrics;
+  RETURN_IF_ERROR(exec_env_->frontend()->GetCatalogMetrics(&metrics));
+  ImpaladMetrics::CATALOG_NUM_DBS->SetValue(metrics.num_dbs);
+  ImpaladMetrics::CATALOG_NUM_TABLES->SetValue(metrics.num_tables);
   return Status::OK();
 }
 

http://git-wip-us.apache.org/repos/asf/impala/blob/2b6d71fe/common/thrift/Frontend.thrift
----------------------------------------------------------------------
diff --git a/common/thrift/Frontend.thrift b/common/thrift/Frontend.thrift
index 8d32b53..6cbbf79 100644
--- a/common/thrift/Frontend.thrift
+++ b/common/thrift/Frontend.thrift
@@ -98,6 +98,12 @@ struct TGetTableMetricsResponse {
   1: required string metrics
 }
 
+// Response from a call to getCatalogMetrics.
+struct TGetCatalogMetricsResult {
+  1: required i32 num_dbs
+  2: required i32 num_tables
+}
+
 // Arguments to getDbs, which returns a list of dbs that match an optional pattern
 struct TGetDbsParams {
   // If not set, match every database

http://git-wip-us.apache.org/repos/asf/impala/blob/2b6d71fe/fe/src/main/java/org/apache/impala/service/Frontend.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/service/Frontend.java b/fe/src/main/java/org/apache/impala/service/Frontend.java
index ab330fd..9208184 100644
--- a/fe/src/main/java/org/apache/impala/service/Frontend.java
+++ b/fe/src/main/java/org/apache/impala/service/Frontend.java
@@ -114,6 +114,7 @@ import org.apache.impala.thrift.TExecRequest;
 import org.apache.impala.thrift.TExplainResult;
 import org.apache.impala.thrift.TFinalizeParams;
 import org.apache.impala.thrift.TFunctionCategory;
+import org.apache.impala.thrift.TGetCatalogMetricsResult;
 import org.apache.impala.thrift.TGrantRevokePrivParams;
 import org.apache.impala.thrift.TGrantRevokeRoleParams;
 import org.apache.impala.thrift.TLineageGraph;
@@ -614,6 +615,15 @@ public class Frontend {
     return stringBuilder.toString();
   }
 
+  public TGetCatalogMetricsResult getCatalogMetrics() throws ImpalaException {
+    TGetCatalogMetricsResult resp = new TGetCatalogMetricsResult();
+    for (FeDb db : getCatalog().getDbs(PatternMatcher.MATCHER_MATCH_ALL)) {
+      resp.num_dbs++;
+      resp.num_tables += db.getAllTableNames().size();
+    }
+    return resp;
+  }
+
   /**
    * Returns all tables in database 'dbName' that match the pattern of 'matcher' and are
    * accessible to 'user'.

http://git-wip-us.apache.org/repos/asf/impala/blob/2b6d71fe/fe/src/main/java/org/apache/impala/service/JniFrontend.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/service/JniFrontend.java b/fe/src/main/java/org/apache/impala/service/JniFrontend.java
index 25dee1c..ad8b165 100644
--- a/fe/src/main/java/org/apache/impala/service/JniFrontend.java
+++ b/fe/src/main/java/org/apache/impala/service/JniFrontend.java
@@ -68,6 +68,7 @@ import org.apache.impala.thrift.TDescriptorTable;
 import org.apache.impala.thrift.TExecRequest;
 import org.apache.impala.thrift.TFunctionCategory;
 import org.apache.impala.thrift.TGetAllHadoopConfigsResponse;
+import org.apache.impala.thrift.TGetCatalogMetricsResult;
 import org.apache.impala.thrift.TGetDataSrcsParams;
 import org.apache.impala.thrift.TGetDataSrcsResult;
 import org.apache.impala.thrift.TGetDbsParams;
@@ -224,6 +225,16 @@ public class JniFrontend {
     return plan;
   }
 
+  public byte[] getCatalogMetrics() throws ImpalaException {
+    TGetCatalogMetricsResult metrics = frontend_.getCatalogMetrics();
+    TSerializer serializer = new TSerializer(protocolFactory_);
+    try {
+      return serializer.serialize(metrics);
+    } catch (TException e) {
+      throw new InternalException(e.getMessage());
+    }
+  }
+
   /**
    * Implement Hive's pattern-matching semantics for "SHOW TABLE [[LIKE] 'pattern']", and
    * return a list of table names matching an optional pattern.


[2/4] impala git commit: IMPALA-6352: Dump backtrace on failure of TestTableSample

Posted by sa...@apache.org.
IMPALA-6352: Dump backtrace on failure of TestTableSample

TestTableSample is a flaky test that has failed on rare occasions,
possibly due to a hung thread. This patch therefore adds a timeout to the
test and logs the backtraces of all impalads if the timeout expires, so we
can get more information on the state of those threads.

Change-Id: I73fcdd30863cee105584c947bb0c48cf872809c1
Reviewed-on: http://gerrit.cloudera.org:8080/10851
Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
Tested-by: Impala Public Jenkins <im...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/f3b1c4bc
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/f3b1c4bc
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/f3b1c4bc

Branch: refs/heads/master
Commit: f3b1c4bc65031899ca9d7e41ab6c7da79b18777d
Parents: f20ecad
Author: Bikramjeet Vig <bi...@cloudera.com>
Authored: Mon Jul 2 14:27:09 2018 -0700
Committer: Impala Public Jenkins <im...@cloudera.com>
Committed: Tue Jul 3 02:48:29 2018 +0000

----------------------------------------------------------------------
 tests/beeswax/impala_beeswax.py      | 26 +++++++++++++++++++++++---
 tests/common/impala_connection.py    |  5 +++++
 tests/query_test/test_tablesample.py | 23 +++++++++++++++++++++--
 3 files changed, 49 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/f3b1c4bc/tests/beeswax/impala_beeswax.py
----------------------------------------------------------------------
diff --git a/tests/beeswax/impala_beeswax.py b/tests/beeswax/impala_beeswax.py
index e21f896..9489ed4 100644
--- a/tests/beeswax/impala_beeswax.py
+++ b/tests/beeswax/impala_beeswax.py
@@ -342,7 +342,7 @@ class ImpalaBeeswaxClient(object):
     """Executes a query and waits for completion"""
     handle = self.execute_query_async(query_string, user=user)
     # Wait for the query to finish execution.
-    self.wait_for_completion(handle)
+    self.wait_for_finished(handle)
     return handle
 
   def cancel_query(self, query_id):
@@ -351,8 +351,9 @@ class ImpalaBeeswaxClient(object):
   def close_query(self, handle):
     self.__do_rpc(lambda: self.imp_service.close(handle))
 
-  def wait_for_completion(self, query_handle):
-    """Given a query handle, polls the coordinator waiting for the query to complete"""
+  def wait_for_finished(self, query_handle):
+    """Given a query handle, polls the coordinator waiting for the query to transition to
+       'FINISHED' state"""
     while True:
       query_state = self.get_state(query_handle)
       # if the rpc succeeded, the output is the query state
@@ -367,6 +368,25 @@ class ImpalaBeeswaxClient(object):
           self.close_query(query_handle)
       time.sleep(0.05)
 
+  def wait_for_finished_timeout(self, query_handle, timeout=10):
+    """Given a query handle and a timeout, polls the coordinator waiting for the query to
+       transition to 'FINISHED' state till 'timeout' seconds"""
+    start_time = time.time()
+    while (time.time() - start_time < timeout):
+      query_state = self.get_state(query_handle)
+      # if the rpc succeeded, the output is the query state
+      if query_state == self.query_states["FINISHED"]:
+        return True
+      elif query_state == self.query_states["EXCEPTION"]:
+        try:
+          error_log = self.__do_rpc(
+            lambda: self.imp_service.get_log(query_handle.log_context))
+          raise ImpalaBeeswaxException("Query aborted:" + error_log, None)
+        finally:
+          self.close_query(query_handle)
+      time.sleep(0.05)
+    return False
+
   def wait_for_admission_control(self, query_handle):
     """Given a query handle, polls the coordinator waiting for it to complete
       admission control processing of the query"""

http://git-wip-us.apache.org/repos/asf/impala/blob/f3b1c4bc/tests/common/impala_connection.py
----------------------------------------------------------------------
diff --git a/tests/common/impala_connection.py b/tests/common/impala_connection.py
index 84495dc..b075506 100644
--- a/tests/common/impala_connection.py
+++ b/tests/common/impala_connection.py
@@ -179,6 +179,11 @@ class BeeswaxConnection(ImpalaConnection):
     LOG.info("-- getting runtime profile operation: %s" % operation_handle)
     return self.__beeswax_client.get_runtime_profile(operation_handle.get_handle())
 
+  def wait_for_finished_timeout(self, operation_handle, timeout):
+    LOG.info("-- waiting for query to reach FINISHED state: %s" % operation_handle)
+    return self.__beeswax_client.wait_for_finished_timeout(
+      operation_handle.get_handle(), timeout)
+
   def wait_for_admission_control(self, operation_handle):
     LOG.info("-- waiting for completion of the admission control processing of the "
         "query: %s" % operation_handle)

http://git-wip-us.apache.org/repos/asf/impala/blob/f3b1c4bc/tests/query_test/test_tablesample.py
----------------------------------------------------------------------
diff --git a/tests/query_test/test_tablesample.py b/tests/query_test/test_tablesample.py
index f3eaaaa..4bc7e1f 100644
--- a/tests/query_test/test_tablesample.py
+++ b/tests/query_test/test_tablesample.py
@@ -18,6 +18,7 @@
 # Tests the TABLESAMPLE clause.
 
 import pytest
+import subprocess
 
 from tests.common.impala_test_suite import ImpalaTestSuite
 from tests.common.test_vector import ImpalaTestDimension
@@ -54,8 +55,26 @@ class TestTableSample(ImpalaTestSuite):
     for perc in [5, 20, 50]:
       rep_sql = ""
       if repeatable: rep_sql = " repeatable(1)"
-      result = self.client.execute(
-        "select count(*) from alltypes tablesample system(%s)%s" % (perc, rep_sql))
+      sql_stmt = "select count(*) from alltypes tablesample system(%s)%s" \
+                 % (perc, rep_sql)
+      handle = self.client.execute_async(sql_stmt)
+      # IMPALA-6352: flaky test, possibly due to a hung thread. Wait for 500 sec before
+      # failing and logging the backtraces of all impalads.
+      is_finished = self.client.wait_for_finished_timeout(handle, 500)
+      assert is_finished, 'Query Timed out. Dumping backtrace of all threads in ' \
+                          'impalads:\nthreads in the impalad1: %s \nthreads in the ' \
+                          'impalad2: %s \nthreads in the impalad3: %s' % \
+                        (subprocess.check_output(
+                          "gdb -ex \"set pagination 0\" -ex \"thread apply all bt\"  "
+                          "--batch -p $(pgrep impalad | sed -n 1p)", shell=True),
+                         subprocess.check_output(
+                          "gdb -ex \"set pagination 0\" -ex \"thread apply all bt\"  "
+                          "--batch -p $(pgrep impalad | sed -n 2p)", shell=True),
+                         subprocess.check_output(
+                          "gdb -ex \"set pagination 0\" -ex \"thread apply all bt\"  "
+                          "--batch -p $(pgrep impalad | sed -n 3p)", shell=True))
+      result = self.client.fetch(sql_stmt, handle)
+      self.client.close_query(handle)
       count = int(result.data[0])
       assert count < baseline_count
       if prev_count and repeatable:


[3/4] impala git commit: IMPALA-4784: Remove InProcessStatestore

Posted by sa...@apache.org.
IMPALA-4784: Remove InProcessStatestore

InProcessStatestore was only used by statestore-test, expr-test and
session-expiry-test. With a slight refactor of the Statestore class,
InProcessStatestore becomes obsolete.

This patch moves the ownership of the ThriftServer into the Statestore
class and Statestore::Init() now takes a 'port' parameter instead of
using the FLAGS_state_store_port directly.

We also remove the InProcessStatestore completely. A follow-on patch will
get rid of the InProcessImpalaServer too (IMPALA-6013).

Testing: Ran 'core' tests.

Change-Id: I2621873e593b36c9612a6402ac6c5d8e3b49cde9
Reviewed-on: http://gerrit.cloudera.org:8080/10843
Reviewed-by: Sailesh Mukil <sa...@cloudera.com>
Tested-by: Impala Public Jenkins <im...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/0a470168
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/0a470168
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/0a470168

Branch: refs/heads/master
Commit: 0a470168138b5f3254d7604a120eb2376a91c20c
Parents: f3b1c4b
Author: Sailesh Mukil <sa...@apache.org>
Authored: Tue Jun 26 10:15:26 2018 -0700
Committer: Impala Public Jenkins <im...@cloudera.com>
Committed: Tue Jul 3 08:21:48 2018 +0000

----------------------------------------------------------------------
 be/src/exprs/expr-test.cc             | 23 +++++++++++-----
 be/src/service/session-expiry-test.cc | 11 +++++---
 be/src/statestore/statestore-test.cc  | 43 +++++++++++++++++++-----------
 be/src/statestore/statestore.cc       | 31 ++++++++++++++++++++-
 be/src/statestore/statestore.h        | 14 ++++++++--
 be/src/statestore/statestored-main.cc | 24 +----------------
 be/src/testutil/in-process-servers.cc | 36 -------------------------
 be/src/testutil/in-process-servers.h  | 36 -------------------------
 8 files changed, 94 insertions(+), 124 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/0a470168/be/src/exprs/expr-test.cc
----------------------------------------------------------------------
diff --git a/be/src/exprs/expr-test.cc b/be/src/exprs/expr-test.cc
index 999f41a..e03f458 100644
--- a/be/src/exprs/expr-test.cc
+++ b/be/src/exprs/expr-test.cc
@@ -28,7 +28,6 @@
 #include <boost/scoped_ptr.hpp>
 #include <boost/unordered_map.hpp>
 
-#include "testutil/gtest-util.h"
 #include "codegen/llvm-codegen.h"
 #include "common/init.h"
 #include "common/object-pool.h"
@@ -36,33 +35,36 @@
 #include "exprs/like-predicate.h"
 #include "exprs/literal.h"
 #include "exprs/null-literal.h"
-#include "exprs/scalar-expr.h"
 #include "exprs/scalar-expr-evaluator.h"
+#include "exprs/scalar-expr.h"
 #include "exprs/string-functions.h"
 #include "exprs/timestamp-functions.h"
 #include "exprs/timezone_db.h"
 #include "gen-cpp/Exprs_types.h"
+#include "gen-cpp/ImpalaInternalService_types.h"
 #include "gen-cpp/hive_metastore_types.h"
 #include "rpc/thrift-client.h"
 #include "rpc/thrift-server.h"
-#include "runtime/runtime-state.h"
 #include "runtime/mem-pool.h"
 #include "runtime/mem-tracker.h"
 #include "runtime/raw-value.inline.h"
+#include "runtime/runtime-state.h"
 #include "runtime/string-value.h"
 #include "runtime/timestamp-parse-util.h"
 #include "runtime/timestamp-value.h"
 #include "runtime/timestamp-value.inline.h"
 #include "service/fe-support.h"
 #include "service/impala-server.h"
+#include "statestore/statestore.h"
+#include "testutil/gtest-util.h"
 #include "testutil/impalad-query-executor.h"
 #include "testutil/in-process-servers.h"
 #include "udf/udf-test-harness.h"
+#include "util/asan.h"
 #include "util/debug-util.h"
 #include "util/string-parser.h"
 #include "util/string-util.h"
 #include "util/test-info.h"
-#include "gen-cpp/ImpalaInternalService_types.h"
 
 #include "common/names.h"
 
@@ -83,6 +85,8 @@ using namespace impala;
 
 namespace impala {
 ImpaladQueryExecutor* executor_;
+scoped_ptr<MetricGroup> statestore_metrics(new MetricGroup("statestore_metrics"));
+Statestore* statestore;
 bool disable_codegen_;
 bool enable_expr_rewrites_;
 
@@ -8798,11 +8802,16 @@ int main(int argc, char** argv) {
   FLAGS_abort_on_config_error = false;
   VLOG_CONNECTION << "creating test env";
   VLOG_CONNECTION << "starting backends";
-  InProcessStatestore* ips;
-  ABORT_IF_ERROR(InProcessStatestore::StartWithEphemeralPorts(&ips));
+  statestore = new Statestore(statestore_metrics.get());
+  IGNORE_LEAKING_OBJECT(statestore);
+
+  // Pass in 0 to have the statestore use an ephemeral port for the service.
+  ABORT_IF_ERROR(statestore->Init(0));
   InProcessImpalaServer* impala_server;
   ABORT_IF_ERROR(InProcessImpalaServer::StartWithEphemeralPorts(
-      FLAGS_hostname, ips->port(), &impala_server));
+      FLAGS_hostname, statestore->port(), &impala_server));
+  IGNORE_LEAKING_OBJECT(impala_server);
+
   executor_ = new ImpaladQueryExecutor(FLAGS_hostname, impala_server->GetBeeswaxPort());
   ABORT_IF_ERROR(executor_->Setup());
 

http://git-wip-us.apache.org/repos/asf/impala/blob/0a470168/be/src/service/session-expiry-test.cc
----------------------------------------------------------------------
diff --git a/be/src/service/session-expiry-test.cc b/be/src/service/session-expiry-test.cc
index b227c1f..89ae842 100644
--- a/be/src/service/session-expiry-test.cc
+++ b/be/src/service/session-expiry-test.cc
@@ -21,8 +21,10 @@
 #include "rpc/thrift-client.h"
 #include "service/fe-support.h"
 #include "service/impala-server.h"
+#include "statestore/statestore.h"
 #include "testutil/gtest-util.h"
 #include "testutil/in-process-servers.h"
+#include "util/asan.h"
 #include "util/impalad-metrics.h"
 #include "util/time.h"
 
@@ -48,11 +50,14 @@ TEST(SessionTest, TestExpiry) {
   FLAGS_idle_session_timeout = 1;
   // Skip validation checks for in-process backend.
   FLAGS_abort_on_config_error = false;
-  InProcessStatestore* ips;
-  ASSERT_OK(InProcessStatestore::StartWithEphemeralPorts(&ips));
+  scoped_ptr<MetricGroup> metrics(new MetricGroup("statestore"));
+  Statestore* statestore = new Statestore(metrics.get());
+  IGNORE_LEAKING_OBJECT(statestore);
+  // Pass in 0 to have the statestore use an ephemeral port for the service.
+  ABORT_IF_ERROR(statestore->Init(0));
   InProcessImpalaServer* impala;
   ASSERT_OK(InProcessImpalaServer::StartWithEphemeralPorts(
-      "localhost", ips->port(), &impala));
+      "localhost", statestore->port(), &impala));
   IntCounter* expired_metric =
       impala->metrics()->FindMetricForTesting<IntCounter>(
           ImpaladMetricKeys::NUM_SESSIONS_EXPIRED);

http://git-wip-us.apache.org/repos/asf/impala/blob/0a470168/be/src/statestore/statestore-test.cc
----------------------------------------------------------------------
diff --git a/be/src/statestore/statestore-test.cc b/be/src/statestore/statestore-test.cc
index b481a63..a9ee095 100644
--- a/be/src/statestore/statestore-test.cc
+++ b/be/src/statestore/statestore-test.cc
@@ -15,11 +15,11 @@
 // specific language governing permissions and limitations
 // under the License.
 
-#include "testutil/gtest-util.h"
-#include "testutil/in-process-servers.h"
 #include "common/init.h"
-#include "util/metrics.h"
 #include "statestore/statestore-subscriber.h"
+#include "testutil/gtest-util.h"
+#include "util/asan.h"
+#include "util/metrics.h"
 
 #include "common/names.h"
 
@@ -37,23 +37,29 @@ namespace impala {
 TEST(StatestoreTest, SmokeTest) {
   // All allocations done by 'new' to avoid problems shutting down Thrift servers
   // gracefully.
-  InProcessStatestore* ips;
-  ASSERT_OK(InProcessStatestore::StartWithEphemeralPorts(&ips));
-  ASSERT_TRUE(ips != NULL) << "Could not start Statestore";
+  scoped_ptr<MetricGroup> metrics(new MetricGroup("statestore"));
+  Statestore* statestore = new Statestore(metrics.get());
+  // Thrift will internally pick an ephemeral port if we pass in 0 as the port.
+  int statestore_port = 0;
+  IGNORE_LEAKING_OBJECT(statestore);
+  ASSERT_OK(statestore->Init(statestore_port));
+
+  scoped_ptr<MetricGroup> metrics_2(new MetricGroup("statestore_2"));
   // Port already in use
-  InProcessStatestore* statestore_wont_start =
-      new InProcessStatestore(ips->port(), ips->port() + 10);
-  ASSERT_FALSE(statestore_wont_start->Start().ok());
+  Statestore* statestore_wont_start = new Statestore(metrics_2.get());
+  ASSERT_FALSE(statestore_wont_start->Init(statestore->port()).ok());
 
-  StatestoreSubscriber* sub_will_start = new StatestoreSubscriber("sub1",
-      MakeNetworkAddress("localhost", 0),
-      MakeNetworkAddress("localhost", ips->port()), new MetricGroup(""));
+  StatestoreSubscriber* sub_will_start =
+      new StatestoreSubscriber("sub1", MakeNetworkAddress("localhost", 0),
+          MakeNetworkAddress("localhost", statestore->port()), new MetricGroup(""));
+  IGNORE_LEAKING_OBJECT(sub_will_start);
   ASSERT_OK(sub_will_start->Start());
 
   // Confirm that a subscriber trying to use an in-use port will fail to start.
   StatestoreSubscriber* sub_will_not_start = new StatestoreSubscriber("sub3",
       MakeNetworkAddress("localhost", sub_will_start->heartbeat_port()),
-      MakeNetworkAddress("localhost", ips->port()), new MetricGroup(""));
+      MakeNetworkAddress("localhost", statestore->port()), new MetricGroup(""));
+  IGNORE_LEAKING_OBJECT(sub_will_not_start);
   ASSERT_FALSE(sub_will_not_start->Start().ok());
 }
 
@@ -67,13 +73,17 @@ TEST(StatestoreSslTest, SmokeTest) {
   server_key << impala_home << "/be/src/testutil/server-key.pem";
   FLAGS_ssl_private_key = server_key.str();
 
-  InProcessStatestore* statestore;
-  ASSERT_OK(InProcessStatestore::StartWithEphemeralPorts(&statestore));
-  if (statestore == NULL) FAIL() << "Unable to start Statestore";
+  // Thrift will internally pick an ephemeral port if we pass in 0 as the port.
+  int statestore_port = 0;
+  scoped_ptr<MetricGroup> metrics(new MetricGroup("statestore"));
+  Statestore* statestore = new Statestore(metrics.get());
+  IGNORE_LEAKING_OBJECT(statestore);
+  ASSERT_OK(statestore->Init(statestore_port));
 
   StatestoreSubscriber* sub_will_start = new StatestoreSubscriber("smoke_sub1",
       MakeNetworkAddress("localhost", 0),
       MakeNetworkAddress("localhost", statestore->port()), new MetricGroup(""));
+  IGNORE_LEAKING_OBJECT(sub_will_start);
   ASSERT_OK(sub_will_start->Start());
 
   stringstream invalid_server_cert;
@@ -83,6 +93,7 @@ TEST(StatestoreSslTest, SmokeTest) {
   StatestoreSubscriber* sub_will_not_start = new StatestoreSubscriber("smoke_sub2",
       MakeNetworkAddress("localhost", 0),
       MakeNetworkAddress("localhost", statestore->port()), new MetricGroup(""));
+  IGNORE_LEAKING_OBJECT(sub_will_not_start);
   ASSERT_FALSE(sub_will_not_start->Start().ok());
 }
 

http://git-wip-us.apache.org/repos/asf/impala/blob/0a470168/be/src/statestore/statestore.cc
----------------------------------------------------------------------
diff --git a/be/src/statestore/statestore.cc b/be/src/statestore/statestore.cc
index a58aec1..a208d97 100644
--- a/be/src/statestore/statestore.cc
+++ b/be/src/statestore/statestore.cc
@@ -28,6 +28,7 @@
 
 #include "common/status.h"
 #include "gen-cpp/StatestoreService_types.h"
+#include "rpc/rpc-trace.h"
 #include "rpc/thrift-util.h"
 #include "statestore/failure-detector.h"
 #include "statestore/statestore-subscriber-client-wrapper.h"
@@ -98,6 +99,12 @@ DEFINE_int32(statestore_update_tcp_timeout_seconds, 300, "(Advanced) The time af
     "badly hung machines that are not able to respond to the update RPC in short "
     "order.");
 
+DECLARE_string(ssl_server_certificate);
+DECLARE_string(ssl_private_key);
+DECLARE_string(ssl_private_key_password_cmd);
+DECLARE_string(ssl_cipher_list);
+DECLARE_string(ssl_minimum_version);
+
 // Metric keys
 // TODO: Replace 'backend' with 'subscriber' when we can coordinate a change with CM
 const string STATESTORE_LIVE_SUBSCRIBERS = "statestore.live-backends";
@@ -408,6 +415,7 @@ Statestore::Statestore(MetricGroup* metrics)
         FLAGS_statestore_max_missed_heartbeats / 2)) {
 
   DCHECK(metrics != NULL);
+  metrics_ = metrics;
   num_subscribers_metric_ = metrics->AddGauge(STATESTORE_LIVE_SUBSCRIBERS, 0);
   subscriber_set_metric_ = SetMetric<string>::CreateAndRegister(metrics,
       STATESTORE_LIVE_SUBSCRIBERS_LIST, set<string>());
@@ -426,10 +434,31 @@ Statestore::Statestore(MetricGroup* metrics)
   heartbeat_client_cache_->InitMetrics(metrics, "subscriber-heartbeat");
 }
 
-Status Statestore::Init() {
+Status Statestore::Init(int32_t state_store_port) {
+  boost::shared_ptr<TProcessor> processor(new StatestoreServiceProcessor(thrift_iface()));
+  boost::shared_ptr<TProcessorEventHandler> event_handler(
+      new RpcEventHandler("statestore", metrics_));
+  processor->setEventHandler(event_handler);
+  ThriftServerBuilder builder("StatestoreService", processor, state_store_port);
+  if (IsInternalTlsConfigured()) {
+    SSLProtocol ssl_version;
+    RETURN_IF_ERROR(
+        SSLProtoVersions::StringToProtocol(FLAGS_ssl_minimum_version, &ssl_version));
+    LOG(INFO) << "Enabling SSL for Statestore";
+    builder.ssl(FLAGS_ssl_server_certificate, FLAGS_ssl_private_key)
+        .pem_password_cmd(FLAGS_ssl_private_key_password_cmd)
+        .ssl_version(ssl_version)
+        .cipher_list(FLAGS_ssl_cipher_list);
+  }
+  ThriftServer* server;
+  RETURN_IF_ERROR(builder.metrics(metrics_).Build(&server));
+  thrift_server_.reset(server);
+  RETURN_IF_ERROR(thrift_server_->Start());
+
   RETURN_IF_ERROR(subscriber_topic_update_threadpool_.Init());
   RETURN_IF_ERROR(subscriber_priority_topic_update_threadpool_.Init());
   RETURN_IF_ERROR(subscriber_heartbeat_threadpool_.Init());
+
   return Status::OK();
 }
 

http://git-wip-us.apache.org/repos/asf/impala/blob/0a470168/be/src/statestore/statestore.h
----------------------------------------------------------------------
diff --git a/be/src/statestore/statestore.h b/be/src/statestore/statestore.h
index 71e1ade..1d7f1a2 100644
--- a/be/src/statestore/statestore.h
+++ b/be/src/statestore/statestore.h
@@ -123,9 +123,10 @@ class Statestore : public CacheLineAligned {
   /// The only constructor; initialises member variables only.
   Statestore(MetricGroup* metrics);
 
+  /// Initialize and start the backing ThriftServer with port 'state_store_port'.
   /// Initialize the ThreadPools used for updates and heartbeats. Returns an error if
-  /// ThreadPool initialization fails.
-  Status Init() WARN_UNUSED_RESULT;
+  /// any of the above initialization steps fails.
+  Status Init(int32_t state_store_port) WARN_UNUSED_RESULT;
 
   /// Registers a new subscriber with the given unique subscriber ID, running a subscriber
   /// service at the given location, with the provided list of topic subscriptions.
@@ -158,6 +159,9 @@ class Statestore : public CacheLineAligned {
   static const std::string IMPALA_MEMBERSHIP_TOPIC;
   /// Topic tracking the state of admission control on all coordinators.
   static const std::string IMPALA_REQUEST_QUEUE_TOPIC;
+
+  int32_t port() { return thrift_server_->port(); }
+
  private:
   /// A TopicEntry is a single entry in a topic, and logically is a <string, byte string>
   /// pair.
@@ -526,6 +530,12 @@ class Statestore : public CacheLineAligned {
   /// of time.
   boost::scoped_ptr<StatestoreSubscriberClientCache> heartbeat_client_cache_;
 
+  /// Container for the internal statestore service.
+  boost::scoped_ptr<ThriftServer> thrift_server_;
+
+  /// Pointer to the MetricGroup for this statestore. Not owned.
+  MetricGroup* metrics_;
+
   /// Thrift API implementation which proxies requests onto this Statestore
   boost::shared_ptr<StatestoreServiceIf> thrift_iface_;
 

http://git-wip-us.apache.org/repos/asf/impala/blob/0a470168/be/src/statestore/statestored-main.cc
----------------------------------------------------------------------
diff --git a/be/src/statestore/statestored-main.cc b/be/src/statestore/statestored-main.cc
index 633d449..d7db794 100644
--- a/be/src/statestore/statestored-main.cc
+++ b/be/src/statestore/statestored-main.cc
@@ -31,7 +31,6 @@
 #include "util/common-metrics.h"
 #include "util/debug-util.h"
 #include "util/metrics.h"
-#include "util/openssl-util.h"
 #include "util/memory-metrics.h"
 #include "util/webserver.h"
 #include "util/default-path-handlers.h"
@@ -39,12 +38,6 @@
 DECLARE_int32(state_store_port);
 DECLARE_int32(webserver_port);
 DECLARE_bool(enable_webserver);
-DECLARE_string(principal);
-DECLARE_string(ssl_server_certificate);
-DECLARE_string(ssl_private_key);
-DECLARE_string(ssl_private_key_password_cmd);
-DECLARE_string(ssl_cipher_list);
-DECLARE_string(ssl_minimum_version);
 
 #include "common/names.h"
 
@@ -82,7 +75,7 @@ int StatestoredMain(int argc, char** argv) {
   CommonMetrics::InitCommonMetrics(metrics.get());
 
   Statestore statestore(metrics.get());
-  ABORT_IF_ERROR(statestore.Init());
+  ABORT_IF_ERROR(statestore.Init(FLAGS_state_store_port));
   statestore.RegisterWebpages(webserver.get());
   boost::shared_ptr<TProcessor> processor(
       new StatestoreServiceProcessor(statestore.thrift_iface()));
@@ -90,21 +83,6 @@ int StatestoredMain(int argc, char** argv) {
       new RpcEventHandler("statestore", metrics.get()));
   processor->setEventHandler(event_handler);
 
-  ThriftServer* server;
-  ThriftServerBuilder builder("StatestoreService", processor, FLAGS_state_store_port);
-  if (IsInternalTlsConfigured()) {
-    SSLProtocol ssl_version;
-    ABORT_IF_ERROR(
-        SSLProtoVersions::StringToProtocol(FLAGS_ssl_minimum_version, &ssl_version));
-    LOG(INFO) << "Enabling SSL for Statestore";
-    builder.ssl(FLAGS_ssl_server_certificate, FLAGS_ssl_private_key)
-        .pem_password_cmd(FLAGS_ssl_private_key_password_cmd)
-        .ssl_version(ssl_version)
-        .cipher_list(FLAGS_ssl_cipher_list);
-  }
-  ABORT_IF_ERROR(builder.metrics(metrics.get()).Build(&server));
-  ABORT_IF_ERROR(server->Start());
-
   statestore.MainLoop();
 
   return 0;

http://git-wip-us.apache.org/repos/asf/impala/blob/0a470168/be/src/testutil/in-process-servers.cc
----------------------------------------------------------------------
diff --git a/be/src/testutil/in-process-servers.cc b/be/src/testutil/in-process-servers.cc
index 031a07e..8a786e0 100644
--- a/be/src/testutil/in-process-servers.cc
+++ b/be/src/testutil/in-process-servers.cc
@@ -105,39 +105,3 @@ int InProcessImpalaServer::GetBeeswaxPort() const {
 int InProcessImpalaServer::GetHS2Port() const {
   return impala_server_->GetHS2Port();
 }
-
-Status InProcessStatestore::StartWithEphemeralPorts(InProcessStatestore** statestore) {
-  *statestore = new InProcessStatestore(0, 0);
-  return (*statestore)->Start();
-}
-
-InProcessStatestore::InProcessStatestore(int statestore_port, int webserver_port)
-    : webserver_(new Webserver(webserver_port)),
-      metrics_(new MetricGroup("statestore")),
-      statestore_port_(statestore_port),
-      statestore_(new Statestore(metrics_.get())) {
-  AddDefaultUrlCallbacks(webserver_.get());
-  statestore_->RegisterWebpages(webserver_.get());
-}
-
-Status InProcessStatestore::Start() {
-  RETURN_IF_ERROR(statestore_->Init());
-  RETURN_IF_ERROR(webserver_->Start());
-  boost::shared_ptr<TProcessor> processor(
-      new StatestoreServiceProcessor(statestore_->thrift_iface()));
-
-  ThriftServerBuilder builder("StatestoreService", processor, statestore_port_);
-  if (IsInternalTlsConfigured()) {
-    LOG(INFO) << "Enabling SSL for Statestore";
-    builder.ssl(FLAGS_ssl_server_certificate, FLAGS_ssl_private_key);
-  }
-  ThriftServer* server;
-  ABORT_IF_ERROR(builder.metrics(metrics_.get()).Build(&server));
-  statestore_server_.reset(server);
-  RETURN_IF_ERROR(Thread::Create("statestore", "main-loop",
-      &Statestore::MainLoop, statestore_.get(), &statestore_main_loop_));
-
-  RETURN_IF_ERROR(statestore_server_->Start());
-  statestore_port_ = statestore_server_->port();
-  return WaitForServer("localhost", statestore_port_, 10, 100);
-}

http://git-wip-us.apache.org/repos/asf/impala/blob/0a470168/be/src/testutil/in-process-servers.h
----------------------------------------------------------------------
diff --git a/be/src/testutil/in-process-servers.h b/be/src/testutil/in-process-servers.h
index 6ac9734..f863650 100644
--- a/be/src/testutil/in-process-servers.h
+++ b/be/src/testutil/in-process-servers.h
@@ -93,42 +93,6 @@ class InProcessImpalaServer {
   boost::scoped_ptr<ExecEnv> exec_env_;
 };
 
-/// An in-process statestore, with webserver and metrics.
-class InProcessStatestore {
- public:
-
-  // Creates and starts an InProcessStatestore with ports chosen from the ephemeral port
-  // range. Returns OK and sets *statestore on success. On failure, an error is
-  /// returned and *statestore may or may not be set but is always invalid to use.
-  static Status StartWithEphemeralPorts(InProcessStatestore** statestore);
-
-  /// Constructs but does not start the statestore.
-  InProcessStatestore(int statestore_port, int webserver_port);
-
-  /// Starts the statestore server, and the processing thread.
-  Status Start();
-
-  uint32_t port() { return statestore_port_; }
-
- private:
-  /// Websever object to serve debug pages through.
-  boost::scoped_ptr<Webserver> webserver_;
-
-  /// MetricGroup object
-  boost::scoped_ptr<MetricGroup> metrics_;
-
-  /// Port to start the statestore on.
-  uint32_t statestore_port_;
-
-  /// The statestore instance
-  boost::scoped_ptr<Statestore> statestore_;
-
-  /// Statestore Thrift server
-  boost::scoped_ptr<ThriftServer> statestore_server_;
-
-  std::unique_ptr<Thread> statestore_main_loop_;
-};
-
 }
 
 #endif