You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by jr...@apache.org on 2017/11/13 22:11:22 UTC

[1/6] incubator-impala git commit: IMPALA-6173: Fix SHOW CREATE TABLE for unpartitioned Kudu tables

Repository: incubator-impala
Updated Branches:
  refs/heads/master b4ea57a7e -> 6a2b7a64f


IMPALA-6173: Fix SHOW CREATE TABLE for unpartitioned Kudu tables

IMPALA-5546 added the ability to create unpartitioned Kudu tables, but
when SHOW CREATE TABLE is run on such a table it still prints
'PARTITION BY', just without a partition clause. This patch removes the
'PARTITION BY' from the output in that case.

Testing:
- Added test that runs SHOW CREATE on an unpartitioned Kudu table.

Change-Id: Icc327266cfb8b5c05efec97348528cea6904bb20
Reviewed-on: http://gerrit.cloudera.org:8080/8506
Reviewed-by: Alex Behm <al...@cloudera.com>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/3a1073c8
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/3a1073c8
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/3a1073c8

Branch: refs/heads/master
Commit: 3a1073c87c6ea08d9e8e2367851775748495300f
Parents: b4ea57a
Author: Thomas Tauber-Marshall <tm...@cloudera.com>
Authored: Wed Nov 8 14:41:38 2017 -0800
Committer: Impala Public Jenkins <im...@gerrit.cloudera.org>
Committed: Thu Nov 9 23:59:13 2017 +0000

----------------------------------------------------------------------
 .../java/org/apache/impala/analysis/ToSqlUtils.java     |  2 +-
 tests/query_test/test_kudu.py                           | 12 ++++++++++++
 2 files changed, 13 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/3a1073c8/fe/src/main/java/org/apache/impala/analysis/ToSqlUtils.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/analysis/ToSqlUtils.java b/fe/src/main/java/org/apache/impala/analysis/ToSqlUtils.java
index 471947c..facebfd 100644
--- a/fe/src/main/java/org/apache/impala/analysis/ToSqlUtils.java
+++ b/fe/src/main/java/org/apache/impala/analysis/ToSqlUtils.java
@@ -320,7 +320,7 @@ public class ToSqlUtils {
           Joiner.on(", \n  ").join(partitionColumnsSql)));
     }
 
-    if (kuduPartitionByParams != null) {
+    if (kuduPartitionByParams != null && !kuduPartitionByParams.equals("")) {
       sb.append("PARTITION BY " + kuduPartitionByParams + "\n");
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/3a1073c8/tests/query_test/test_kudu.py
----------------------------------------------------------------------
diff --git a/tests/query_test/test_kudu.py b/tests/query_test/test_kudu.py
index b951237..27ee757 100644
--- a/tests/query_test/test_kudu.py
+++ b/tests/query_test/test_kudu.py
@@ -805,6 +805,18 @@ class TestShowCreateTable(KuduTestSuite):
         STORED AS KUDU
         TBLPROPERTIES ('kudu.master_addresses'='{kudu_addr}')""".format(
             db=cursor.conn.db_name, kudu_addr=KUDU_MASTER_HOSTS))
+    self.assert_show_create_equals(cursor,
+        """
+        CREATE TABLE {table} (c INT PRIMARY KEY) STORED AS KUDU""",
+        """
+        CREATE TABLE {db}.{{table}} (
+          c INT NOT NULL ENCODING AUTO_ENCODING COMPRESSION DEFAULT_COMPRESSION,
+          PRIMARY KEY (c)
+        )
+        STORED AS KUDU
+        TBLPROPERTIES ('kudu.master_addresses'='{kudu_addr}')""".format(
+            db=cursor.conn.db_name, kudu_addr=KUDU_MASTER_HOSTS))
+
 
   def test_timestamp_default_value(self, cursor):
     create_sql_fmt = """


[6/6] incubator-impala git commit: IMPALA-4704: Turns on client connections when local catalog initialized.

Posted by jr...@apache.org.
IMPALA-4704: Turns on client connections when local catalog initialized.

Currently, impalad starts beeswax and hs2 servers even if the
catalog has not yet been initialized. As a result, client
connections see an error message stating that the impalad
is not yet ready.

This patch changes the impalad startup sequence to wait
until the catalog is received before opening beeswax and hs2 ports
and starting their servers.

Testing:
- python e2e tests that start a cluster without a catalog
  and check that client connections are rejected as expected.

Change-Id: I52b881cba18a7e4533e21a78751c2e35c3d4c8a6
Reviewed-on: http://gerrit.cloudera.org:8080/8202
Reviewed-by: Alex Behm <al...@cloudera.com>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/6a2b7a64
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/6a2b7a64
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/6a2b7a64

Branch: refs/heads/master
Commit: 6a2b7a64fb1fd710fe2e3c2a106a3bf589fefe76
Parents: 11bbc26
Author: Vuk Ercegovac <ve...@cloudera.com>
Authored: Fri Sep 29 18:51:02 2017 -0700
Committer: Impala Public Jenkins <im...@gerrit.cloudera.org>
Committed: Mon Nov 13 21:14:14 2017 +0000

----------------------------------------------------------------------
 be/src/benchmarks/expr-benchmark.cc             |   2 +-
 be/src/common/global-flags.cc                   |   3 +
 be/src/runtime/exec-env.cc                      |  14 +-
 be/src/runtime/exec-env.h                       |   8 +-
 be/src/service/frontend.cc                      |  26 +++-
 be/src/service/frontend.h                       |  16 ++-
 be/src/service/impala-server.cc                 | 144 ++++++++++---------
 be/src/service/impala-server.h                  |  41 ++++--
 be/src/service/impalad-main.cc                  |  12 +-
 be/src/testutil/in-process-servers.cc           |  16 +--
 be/src/testutil/in-process-servers.h            |   2 +-
 bin/start-impala-cluster.py                     |  70 ++++++---
 .../org/apache/impala/service/Frontend.java     |  28 +++-
 .../org/apache/impala/service/JniFrontend.java  |   4 +-
 .../org/apache/impala/service/FrontendTest.java |  64 ---------
 tests/common/custom_cluster_test_suite.py       |  23 ++-
 tests/common/impala_cluster.py                  |   6 +-
 tests/common/impala_service.py                  |   2 +-
 tests/custom_cluster/test_catalog_wait.py       |  79 ++++++++++
 tests/custom_cluster/test_coordinators.py       |   2 +
 20 files changed, 347 insertions(+), 215 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6a2b7a64/be/src/benchmarks/expr-benchmark.cc
----------------------------------------------------------------------
diff --git a/be/src/benchmarks/expr-benchmark.cc b/be/src/benchmarks/expr-benchmark.cc
index f1208f8..b10a70f 100644
--- a/be/src/benchmarks/expr-benchmark.cc
+++ b/be/src/benchmarks/expr-benchmark.cc
@@ -65,7 +65,7 @@ using namespace impala;
 class Planner {
  public:
   Planner() {
-    ABORT_IF_ERROR(frontend_.SetCatalogInitialized());
+    frontend_.SetCatalogIsReady();
     ABORT_IF_ERROR(exec_env_.InitForFeTests());
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6a2b7a64/be/src/common/global-flags.cc
----------------------------------------------------------------------
diff --git a/be/src/common/global-flags.cc b/be/src/common/global-flags.cc
index 29c16a5..c2b8e42 100644
--- a/be/src/common/global-flags.cc
+++ b/be/src/common/global-flags.cc
@@ -140,6 +140,9 @@ DEFINE_int32(stress_scratch_write_delay_ms, 0, "A stress option which causes wri
 DEFINE_bool(thread_creation_fault_injection, false, "A fault injection option that "
     " causes calls to Thread::Create() to fail randomly 1% of the time on eligible "
     " codepaths. Effective in debug builds only.");
+DEFINE_int32(stress_catalog_init_delay_ms, 0, "A stress option that injects extra delay"
+    " in milliseconds when initializing an impalad's local catalog replica. Delay <= 0"
+    " inject no delay.");
 #endif
 
 // Used for testing the path where the Kudu client is stubbed.

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6a2b7a64/be/src/runtime/exec-env.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/exec-env.cc b/be/src/runtime/exec-env.cc
index 0ceb636..999b56a 100644
--- a/be/src/runtime/exec-env.cc
+++ b/be/src/runtime/exec-env.cc
@@ -382,8 +382,8 @@ Status ExecEnv::Init() {
   return Status::OK();
 }
 
-Status ExecEnv::StartServices() {
-  LOG(INFO) << "Starting global services";
+Status ExecEnv::StartStatestoreSubscriberService() {
+  LOG(INFO) << "Starting statestore subscriber service";
 
   // Must happen after all topic registrations / callbacks are done
   if (statestore_subscriber_.get() != nullptr) {
@@ -394,8 +394,14 @@ Status ExecEnv::StartServices() {
     }
   }
 
-  // Start this last so everything is in place before accepting the first call.
-  if (FLAGS_use_krpc) RETURN_IF_ERROR(rpc_mgr_->StartServices(krpc_address_));
+  return Status::OK();
+}
+
+Status ExecEnv::StartKrpcService() {
+  if (FLAGS_use_krpc) {
+    LOG(INFO) << "Starting KRPC service";
+    RETURN_IF_ERROR(rpc_mgr_->StartServices(krpc_address_));
+  }
   return Status::OK();
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6a2b7a64/be/src/runtime/exec-env.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/exec-env.h b/be/src/runtime/exec-env.h
index df0d926..8fafdc5 100644
--- a/be/src/runtime/exec-env.h
+++ b/be/src/runtime/exec-env.h
@@ -91,8 +91,12 @@ class ExecEnv {
   /// subsystems like the webserver, scheduler etc.
   Status Init();
 
-  /// Starts any dependent services in their correct order
-  Status StartServices() WARN_UNUSED_RESULT;
+  /// Starts the service to subscribe to the statestore.
+  Status StartStatestoreSubscriberService() WARN_UNUSED_RESULT;
+
+  /// Starts krpc, if needed. Start this last so everything is in place before accepting
+  /// the first call.
+  Status StartKrpcService() WARN_UNUSED_RESULT;
 
   /// TODO: Should ExecEnv own the ImpalaServer as well?
   void SetImpalaServer(ImpalaServer* server) { impala_server_ = server; }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6a2b7a64/be/src/service/frontend.cc
----------------------------------------------------------------------
diff --git a/be/src/service/frontend.cc b/be/src/service/frontend.cc
index e48cb1e..df39114 100644
--- a/be/src/service/frontend.cc
+++ b/be/src/service/frontend.cc
@@ -24,9 +24,14 @@
 #include "rpc/jni-thrift-util.h"
 #include "util/backend-gflag-util.h"
 #include "util/jni-util.h"
+#include "util/time.h"
 
 #include "common/names.h"
 
+#ifndef NDEBUG
+DECLARE_int32(stress_catalog_init_delay_ms);
+#endif
+
 using namespace impala;
 
 // Authorization related flags. Must be set to valid values to properly configure
@@ -77,7 +82,8 @@ Frontend::Frontend() {
     {"getRoles", "([B)[B", &show_roles_id_},
     {"getRolePrivileges", "([B)[B", &get_role_privileges_id_},
     {"execHiveServer2MetadataOp", "([B)[B", &exec_hs2_metadata_op_id_},
-    {"setCatalogInitialized", "()V", &set_catalog_initialized_id_},
+    {"setCatalogIsReady", "()V", &set_catalog_is_ready_id_},
+    {"waitForCatalog", "()V", &wait_for_catalog_id_},
     {"loadTableData", "([B)[B", &load_table_data_id_},
     {"getTableFiles", "([B)[B", &get_table_files_id_},
     {"showCreateFunction", "([B)Ljava/lang/String;", &show_create_function_id_},
@@ -238,13 +244,19 @@ bool Frontend::IsAuthorizationError(const Status& status) {
   return !status.ok() && status.GetDetail().find("AuthorizationException") == 0;
 }
 
-Status Frontend::SetCatalogInitialized() {
+void Frontend::SetCatalogIsReady() {
   JNIEnv* jni_env = getJNIEnv();
-  JniLocalFrame jni_frame;
-  RETURN_IF_ERROR(jni_frame.push(jni_env));
-  jni_env->CallObjectMethod(fe_, set_catalog_initialized_id_);
-  RETURN_ERROR_IF_EXC(jni_env);
-  return Status::OK();
+  jni_env->CallVoidMethod(fe_, set_catalog_is_ready_id_);
+}
+
+void Frontend::WaitForCatalog() {
+#ifndef NDEBUG
+  if (FLAGS_stress_catalog_init_delay_ms > 0) {
+    SleepForMs(FLAGS_stress_catalog_init_delay_ms);
+  }
+#endif
+  JNIEnv* jni_env = getJNIEnv();
+  jni_env->CallVoidMethod(fe_, wait_for_catalog_id_);
 }
 
 Status Frontend::GetTableFiles(const TShowFilesParams& params, TResultSet* result) {

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6a2b7a64/be/src/service/frontend.h
----------------------------------------------------------------------
diff --git a/be/src/service/frontend.h b/be/src/service/frontend.h
index c5c4895..4881220 100644
--- a/be/src/service/frontend.h
+++ b/be/src/service/frontend.h
@@ -159,11 +159,14 @@ class Frontend {
   /// Returns true if the error returned by the FE was due to an AuthorizationException.
   static bool IsAuthorizationError(const Status& status);
 
-  /// Sets the FE catalog to be initialized. This is only used for testing in
-  /// conjunction with InProcessImpalaServer. This sets the FE catalog to
-  /// be initialized, ready to receive queries without needing a catalog
-  /// server.
-  Status SetCatalogInitialized();
+  /// Sets the frontend's catalog in the ready state. This is only used for testing in
+  /// conjunction with InProcessImpalaServer. This sets the frontend's catalog as
+  /// ready, so can receive queries without needing a catalog server.
+  void SetCatalogIsReady();
+
+  /// Waits for the FE catalog to be initialized and ready to receive queries.
+  /// There is no bound on the wait time.
+  void WaitForCatalog();
 
   /// Call FE to get files info for a table or partition.
   Status GetTableFiles(const TShowFilesParams& params, TResultSet* result);
@@ -197,7 +200,8 @@ class Frontend {
   jmethodID get_role_privileges_id_; // JniFrontend.getRolePrivileges
   jmethodID exec_hs2_metadata_op_id_; // JniFrontend.execHiveServer2MetadataOp
   jmethodID load_table_data_id_; // JniFrontend.loadTableData
-  jmethodID set_catalog_initialized_id_; // JniFrontend.setCatalogInitialized
+  jmethodID set_catalog_is_ready_id_; // JniFrontend.setCatalogIsReady
+  jmethodID wait_for_catalog_id_; // JniFrontend.waitForCatalog
   jmethodID get_table_files_id_; // JniFrontend.getTableFiles
   jmethodID show_create_function_id_; // JniFrontend.showCreateFunction
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6a2b7a64/be/src/service/impala-server.cc
----------------------------------------------------------------------
diff --git a/be/src/service/impala-server.cc b/be/src/service/impala-server.cc
index 4c540b7..0aecece 100644
--- a/be/src/service/impala-server.cc
+++ b/be/src/service/impala-server.cc
@@ -274,7 +274,8 @@ class CancellationWork {
 
 ImpalaServer::ImpalaServer(ExecEnv* exec_env)
     : exec_env_(exec_env),
-      thrift_serializer_(false) {
+      thrift_serializer_(false),
+      services_started_(false) {
   // Initialize default config
   InitializeConfigVariables();
 
@@ -1468,7 +1469,7 @@ Status ImpalaServer::ProcessCatalogUpdateResult(
       update_req.__set_removed_objects(catalog_update_result.removed_catalog_objects);
     }
 
-     // Apply the changes to the local catalog cache.
+    // Apply the changes to the local catalog cache.
     TUpdateCatalogCacheResponse resp;
     Status status = exec_env_->frontend()->UpdateCatalogCache(
         vector<TUpdateCatalogCacheRequest>{update_req}, &resp);
@@ -1540,7 +1541,8 @@ void ImpalaServer::MembershipCallback(
     }
 
     // Register the local backend in the statestore and update the list of known backends.
-    AddLocalBackendToStatestore(subscriber_topic_updates);
+    // Only register if all ports have been opened and are ready.
+    if (services_started_.load()) AddLocalBackendToStatestore(subscriber_topic_updates);
 
     // Create a set of known backend network addresses. Used to test for cluster
     // membership by network address.
@@ -1924,21 +1926,28 @@ void ImpalaServer::RegisterSessionTimeout(int32_t session_timeout) {
   }
 }
 
-Status ImpalaServer::Init(int32_t thrift_be_port, int32_t beeswax_port, int32_t hs2_port) {
+Status ImpalaServer::Start(int32_t thrift_be_port, int32_t beeswax_port,
+   int32_t hs2_port) {
   exec_env_->SetImpalaServer(this);
-  boost::shared_ptr<ImpalaServer> handler = shared_from_this();
 
   if (!FLAGS_is_coordinator && !FLAGS_is_executor) {
     return Status("Impala does not have a valid role configured. "
         "Either --is_coordinator or --is_executor must be set to true.");
   }
 
+  // Subscribe with the statestore. Coordinators need to subscribe to the catalog topic
+  // then wait for the initial catalog update.
+  RETURN_IF_ERROR(exec_env_->StartStatestoreSubscriberService());
+
+  if (FLAGS_is_coordinator) exec_env_->frontend()->WaitForCatalog();
+
   SSLProtocol ssl_version = SSLProtocol::TLSv1_0;
   if (!FLAGS_ssl_server_certificate.empty() || EnableInternalSslConnections()) {
     RETURN_IF_ERROR(
         SSLProtoVersions::StringToProtocol(FLAGS_ssl_minimum_version, &ssl_version));
   }
 
+  // Start the internal service.
   if (thrift_be_port > 0) {
     boost::shared_ptr<ImpalaInternalService> thrift_if(new ImpalaInternalService());
     boost::shared_ptr<TProcessor> be_processor(
@@ -1962,88 +1971,85 @@ Status ImpalaServer::Init(int32_t thrift_be_port, int32_t beeswax_port, int32_t
   }
 
   if (!FLAGS_is_coordinator) {
-    // We don't start the Beeswax and HS2 servers if this impala daemon is just an
-    // executor.
-    LOG(INFO) << "Started executor Impala server on "
+    LOG(INFO) << "Initialized executor Impala server on "
               << ExecEnv::GetInstance()->backend_address();
-    return Status::OK();
-  }
-
-  // Start the Beeswax and HS2 servers.
-  if (beeswax_port > 0) {
-    boost::shared_ptr<TProcessor> beeswax_processor(new ImpalaServiceProcessor(handler));
-    boost::shared_ptr<TProcessorEventHandler> event_handler(
-        new RpcEventHandler("beeswax", exec_env_->metrics()));
-    beeswax_processor->setEventHandler(event_handler);
-    ThriftServerBuilder builder(BEESWAX_SERVER_NAME, beeswax_processor, beeswax_port);
+  } else {
+    // Initialize the client servers.
+    boost::shared_ptr<ImpalaServer> handler = shared_from_this();
+    if (beeswax_port > 0) {
+      boost::shared_ptr<TProcessor> beeswax_processor(
+          new ImpalaServiceProcessor(handler));
+      boost::shared_ptr<TProcessorEventHandler> event_handler(
+          new RpcEventHandler("beeswax", exec_env_->metrics()));
+      beeswax_processor->setEventHandler(event_handler);
+      ThriftServerBuilder builder(BEESWAX_SERVER_NAME, beeswax_processor, beeswax_port);
+
+      if (!FLAGS_ssl_server_certificate.empty()) {
+        LOG(INFO) << "Enabling SSL for Beeswax";
+        builder.ssl(FLAGS_ssl_server_certificate, FLAGS_ssl_private_key)
+              .pem_password_cmd(FLAGS_ssl_private_key_password_cmd)
+              .ssl_version(ssl_version)
+              .cipher_list(FLAGS_ssl_cipher_list);
+      }
 
-    if (!FLAGS_ssl_server_certificate.empty()) {
-      LOG(INFO) << "Enabling SSL for Beeswax";
-      builder.ssl(FLAGS_ssl_server_certificate, FLAGS_ssl_private_key)
-          .pem_password_cmd(FLAGS_ssl_private_key_password_cmd)
-          .ssl_version(ssl_version)
-          .cipher_list(FLAGS_ssl_cipher_list);
+      ThriftServer* server;
+      RETURN_IF_ERROR(
+          builder.auth_provider(AuthManager::GetInstance()->GetExternalAuthProvider())
+          .metrics(exec_env_->metrics())
+          .max_concurrent_connections(FLAGS_fe_service_threads)
+          .Build(&server));
+      beeswax_server_.reset(server);
+      beeswax_server_->SetConnectionHandler(this);
     }
 
-    ThriftServer* server;
-    RETURN_IF_ERROR(
-        builder.auth_provider(AuthManager::GetInstance()->GetExternalAuthProvider())
-            .metrics(exec_env_->metrics())
-            .max_concurrent_connections(FLAGS_fe_service_threads)
-            .Build(&server));
-    beeswax_server_.reset(server);
-    beeswax_server_->SetConnectionHandler(this);
-  }
-
-  if (hs2_port > 0) {
-    boost::shared_ptr<TProcessor> hs2_fe_processor(
-        new ImpalaHiveServer2ServiceProcessor(handler));
-    boost::shared_ptr<TProcessorEventHandler> event_handler(
-        new RpcEventHandler("hs2", exec_env_->metrics()));
-    hs2_fe_processor->setEventHandler(event_handler);
-
-    ThriftServerBuilder builder(HS2_SERVER_NAME, hs2_fe_processor, hs2_port);
+    if (hs2_port > 0) {
+      boost::shared_ptr<TProcessor> hs2_fe_processor(
+          new ImpalaHiveServer2ServiceProcessor(handler));
+      boost::shared_ptr<TProcessorEventHandler> event_handler(
+          new RpcEventHandler("hs2", exec_env_->metrics()));
+      hs2_fe_processor->setEventHandler(event_handler);
+
+      ThriftServerBuilder builder(HS2_SERVER_NAME, hs2_fe_processor, hs2_port);
+
+      if (!FLAGS_ssl_server_certificate.empty()) {
+        LOG(INFO) << "Enabling SSL for HiveServer2";
+        builder.ssl(FLAGS_ssl_server_certificate, FLAGS_ssl_private_key)
+              .pem_password_cmd(FLAGS_ssl_private_key_password_cmd)
+              .ssl_version(ssl_version)
+              .cipher_list(FLAGS_ssl_cipher_list);
+      }
 
-    if (!FLAGS_ssl_server_certificate.empty()) {
-      LOG(INFO) << "Enabling SSL for HiveServer2";
-      builder.ssl(FLAGS_ssl_server_certificate, FLAGS_ssl_private_key)
-          .pem_password_cmd(FLAGS_ssl_private_key_password_cmd)
-          .ssl_version(ssl_version)
-          .cipher_list(FLAGS_ssl_cipher_list);
+      ThriftServer* server;
+      RETURN_IF_ERROR(
+          builder.auth_provider(AuthManager::GetInstance()->GetExternalAuthProvider())
+          .metrics(exec_env_->metrics())
+          .max_concurrent_connections(FLAGS_fe_service_threads)
+          .Build(&server));
+      hs2_server_.reset(server);
+      hs2_server_->SetConnectionHandler(this);
     }
-
-    ThriftServer* server;
-    RETURN_IF_ERROR(
-        builder.auth_provider(AuthManager::GetInstance()->GetExternalAuthProvider())
-            .metrics(exec_env_->metrics())
-            .max_concurrent_connections(FLAGS_fe_service_threads)
-            .Build(&server));
-    hs2_server_.reset(server);
-    hs2_server_->SetConnectionHandler(this);
-
   }
+  LOG(INFO) << "Initialized coordinator/executor Impala server on "
+      << ExecEnv::GetInstance()->backend_address();
 
-  LOG(INFO) << "Started coordinator/executor Impala server on "
-            << ExecEnv::GetInstance()->backend_address();
-
-  return Status::OK();
-}
-
-Status ImpalaServer::Start() {
-  RETURN_IF_ERROR(exec_env_->StartServices());
+  // Start the RPC services.
+  RETURN_IF_ERROR(exec_env_->StartKrpcService());
   if (thrift_be_server_.get()) {
     RETURN_IF_ERROR(thrift_be_server_->Start());
     LOG(INFO) << "Impala InternalService listening on " << thrift_be_server_->port();
   }
-
   if (hs2_server_.get()) {
     RETURN_IF_ERROR(hs2_server_->Start());
-    LOG(INFO) << "Impala HiveServer2 Service listening on " << beeswax_server_->port();
+    LOG(INFO) << "Impala HiveServer2 Service listening on " << hs2_server_->port();
   }
   if (beeswax_server_.get()) {
     RETURN_IF_ERROR(beeswax_server_->Start());
-    LOG(INFO) << "Impala Beeswax Service listening on " << hs2_server_->port();
+    LOG(INFO) << "Impala Beeswax Service listening on " << beeswax_server_->port();
   }
+  services_started_ = true;
+  ImpaladMetrics::IMPALA_SERVER_READY->set_value(true);
+  LOG(INFO) << "Impala has started.";
+
   return Status::OK();
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6a2b7a64/be/src/service/impala-server.h
----------------------------------------------------------------------
diff --git a/be/src/service/impala-server.h b/be/src/service/impala-server.h
index 81ec929..c808c35 100644
--- a/be/src/service/impala-server.h
+++ b/be/src/service/impala-server.h
@@ -18,6 +18,7 @@
 #ifndef IMPALA_SERVICE_IMPALA_SERVER_H
 #define IMPALA_SERVICE_IMPALA_SERVER_H
 
+#include <atomic>
 #include <boost/thread/mutex.hpp>
 #include <boost/shared_ptr.hpp>
 #include <boost/scoped_ptr.hpp>
@@ -75,6 +76,30 @@ class ClientRequestState;
 /// An ImpalaServer contains both frontend and backend functionality;
 /// it implements ImpalaService (Beeswax), ImpalaHiveServer2Service (HiveServer2)
 /// and ImpalaInternalService APIs.
+/// ImpalaServer can be started in 1 of 3 roles: executor, coordinator, or both executor
+/// and coordinator. All roles start ImpalaInternalService API's. The
+/// coordinator role additionally starts client API's (Beeswax and HiveServer2).
+///
+/// Startup Sequence
+/// ----------------
+/// The startup sequence opens and starts all services so that they are ready to be used
+/// by clients at the same time. The Impala server is considered 'ready' only when it can
+/// process requests with all of its specified roles. Avoiding states where some roles are
+/// ready and some are not makes it easier to reason about the state of the server.
+///
+/// Main thread (caller code), after instantiating the server, must call Start().
+/// Start() does the following:
+///    - Start internal services
+///    - Wait (indefinitely) for local catalog to be initialized from statestore
+///      (if coordinator)
+///    - Open ImpalaInternalService ports
+///    - Open client ports (if coordinator)
+///    - Start ImpalaInternalService API
+///    - Start client service API's (if coordinator)
+///    - Set services_started_ flag
+///
+/// Internally, the Membership callback thread also participates in startup:
+///    - If services_started_, then register to the statestore as an executor.
 ///
 /// Locking
 /// -------
@@ -118,14 +143,10 @@ class ImpalaServer : public ImpalaServiceIf,
   ImpalaServer(ExecEnv* exec_env);
   ~ImpalaServer();
 
-  /// Initializes RPC services and other subsystems (like audit logging). Returns an error
-  /// if initialization failed. If any ports are <= 0, their respective service will not
-  /// be started.
-  Status Init(int32_t thrift_be_port, int32_t beeswax_port, int32_t hs2_port);
-
-  /// Starts client and internal services. Does not block. Returns an error if any service
-  /// failed to start.
-  Status Start();
+  /// Initializes and starts RPC services and other subsystems (like audit logging).
+  /// Returns an error if starting any services failed. If the port is <= 0, their
+  ///respective service will not be started.
+  Status Start(int32_t thrift_be_port, int32_t beeswax_port, int32_t hs2_port);
 
   /// Blocks until the server shuts down (by calling Shutdown()).
   void Join();
@@ -1015,6 +1036,10 @@ class ImpalaServer : public ImpalaServiceIf,
   boost::scoped_ptr<ThriftServer> hs2_server_;
   boost::scoped_ptr<ThriftServer> thrift_be_server_;
 
+  /// Flag that records if backend and/or client services have been started. The flag is
+  /// set after all services required for the server have been started.
+  std::atomic_bool services_started_;
+
   /// Set to true when this ImpalaServer should shut down.
   Promise<bool> shutdown_promise_;
 };

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6a2b7a64/be/src/service/impalad-main.cc
----------------------------------------------------------------------
diff --git a/be/src/service/impalad-main.cc b/be/src/service/impalad-main.cc
index c9627d9..a7f6abd 100644
--- a/be/src/service/impalad-main.cc
+++ b/be/src/service/impalad-main.cc
@@ -84,21 +84,15 @@ int ImpaladMain(int argc, char** argv) {
   InitRpcEventTracing(exec_env.webserver());
 
   boost::shared_ptr<ImpalaServer> impala_server(new ImpalaServer(&exec_env));
-  ABORT_IF_ERROR(impala_server->Init(FLAGS_be_port, FLAGS_beeswax_port, FLAGS_hs2_port));
-
-  DCHECK(exec_env.process_mem_tracker() != nullptr)
-      << "ExecEnv::StartServices() must be called before starting RPC services";
-  Status status = impala_server->Start();
+  Status status =
+      impala_server->Start(FLAGS_be_port, FLAGS_beeswax_port, FLAGS_hs2_port);
   if (!status.ok()) {
     LOG(ERROR) << "Impalad services did not start correctly, exiting.  Error: "
-               << status.GetDetail();
+        << status.GetDetail();
     ShutdownLogging();
     exit(1);
   }
 
-  ImpaladMetrics::IMPALA_SERVER_READY->set_value(true);
-  LOG(INFO) << "Impala has started.";
-
   impala_server->Join();
 
   return 0;

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6a2b7a64/be/src/testutil/in-process-servers.cc
----------------------------------------------------------------------
diff --git a/be/src/testutil/in-process-servers.cc b/be/src/testutil/in-process-servers.cc
index 64d681c..4817d7f 100644
--- a/be/src/testutil/in-process-servers.cc
+++ b/be/src/testutil/in-process-servers.cc
@@ -72,11 +72,9 @@ InProcessImpalaServer* InProcessImpalaServer::StartWithEphemeralPorts(
     // Start the daemon and check if it works, if not delete the current server object and
     // pick a new set of ports
     Status started = impala->StartWithClientServers(beeswax_port, hs2_port);
-    if (started.ok()) {
-      const Status status = impala->SetCatalogInitialized();
-      if (!status.ok()) LOG(WARNING) << status.GetDetail();
-      return impala;
-    }
+    if (started.ok()) return impala;
+    LOG(WARNING) << started.GetDetail();
+
     delete impala;
   }
   DCHECK(false) << "Could not find port to start Impalad.";
@@ -94,9 +92,9 @@ InProcessImpalaServer::InProcessImpalaServer(const string& hostname, int backend
           webserver_port, statestore_host, statestore_port)) {
 }
 
-Status InProcessImpalaServer::SetCatalogInitialized() {
+void InProcessImpalaServer::SetCatalogIsReady() {
   DCHECK(impala_server_ != NULL) << "Call Start*() first.";
-  return exec_env_->frontend()->SetCatalogInitialized();
+  exec_env_->frontend()->SetCatalogIsReady();
 }
 
 Status InProcessImpalaServer::StartWithClientServers(int beeswax_port, int hs2_port) {
@@ -105,8 +103,8 @@ Status InProcessImpalaServer::StartWithClientServers(int beeswax_port, int hs2_p
   hs2_port_ = hs2_port;
 
   impala_server_.reset(new ImpalaServer(exec_env_.get()));
-  RETURN_IF_ERROR(impala_server_->Init(backend_port_, beeswax_port, hs2_port));
-  RETURN_IF_ERROR(impala_server_->Start());
+  SetCatalogIsReady();
+  RETURN_IF_ERROR(impala_server_->Start(backend_port_, beeswax_port, hs2_port));
 
   // Wait for up to 1s for the backend server to start
   RETURN_IF_ERROR(WaitForServer(hostname_, backend_port_, 10, 100));

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6a2b7a64/be/src/testutil/in-process-servers.h
----------------------------------------------------------------------
diff --git a/be/src/testutil/in-process-servers.h b/be/src/testutil/in-process-servers.h
index 6842255..fb69135 100644
--- a/be/src/testutil/in-process-servers.h
+++ b/be/src/testutil/in-process-servers.h
@@ -70,7 +70,7 @@ class InProcessImpalaServer {
 
   /// Sets the catalog on this impalad to be initialized. If we don't
   /// start up a catalogd, then there is no one to initialize it otherwise.
-  Status SetCatalogInitialized();
+  void SetCatalogIsReady();
 
   uint32_t beeswax_port() const { return beeswax_port_; }
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6a2b7a64/bin/start-impala-cluster.py
----------------------------------------------------------------------
diff --git a/bin/start-impala-cluster.py b/bin/start-impala-cluster.py
index 670e693..e7dcea4 100755
--- a/bin/start-impala-cluster.py
+++ b/bin/start-impala-cluster.py
@@ -25,7 +25,8 @@ import psutil
 import sys
 from getpass import getuser
 from time import sleep, time
-from optparse import OptionParser
+from optparse import OptionParser, SUPPRESS_HELP
+from testdata.common import cgroups
 
 KUDU_MASTER_HOSTS = os.getenv('KUDU_MASTER_HOSTS', '127.0.0.1')
 DEFAULT_IMPALA_MAX_LOG_FILES = os.environ.get('IMPALA_MAX_LOG_FILES', 10)
@@ -69,9 +70,6 @@ parser.add_option('--max_log_files', default=DEFAULT_IMPALA_MAX_LOG_FILES,
                   help='Max number of log files before rotation occurs.')
 parser.add_option("-v", "--verbose", dest="verbose", action="store_true", default=False,
                   help="Prints all output to stderr/stdout.")
-parser.add_option("--wait_for_cluster", dest="wait_for_cluster", action="store_true",
-                  default=False, help="Wait until the cluster is ready to accept "
-                  "queries before returning.")
 parser.add_option("--log_level", type="int", dest="log_level", default=1,
                    help="Set the impalad backend logging level")
 parser.add_option("--jvm_args", dest="jvm_args", default="",
@@ -80,6 +78,11 @@ parser.add_option("--kudu_master_hosts", default=KUDU_MASTER_HOSTS,
                   help="The host name or address of the Kudu master. Multiple masters "
                       "can be specified using a comma separated list.")
 
+# For testing: list of comma-separated delays, in milliseconds, that delay impalad catalog
+# replica initialization. The ith delay is applied to the ith impalad.
+parser.add_option("--catalog_init_delays", dest="catalog_init_delays", default="",
+                  help=SUPPRESS_HELP)
+
 options, args = parser.parse_args()
 
 IMPALA_HOME = os.environ['IMPALA_HOME']
@@ -217,6 +220,10 @@ def start_impalad_instances(cluster_size, num_coordinators, use_exclusive_coordi
   # --impalad_args flag. virtual_memory().total returns the total physical memory.
   mem_limit = int(0.8 * psutil.virtual_memory().total / cluster_size)
 
+  delay_list = []
+  if options.catalog_init_delays != "":
+    delay_list = [delay.strip() for delay in options.catalog_init_delays.split(",")]
+
   # Start each impalad instance and optionally redirect the output to a log file.
   for i in range(cluster_size):
     if i == 0:
@@ -248,6 +255,9 @@ def start_impalad_instances(cluster_size, num_coordinators, use_exclusive_coordi
       # Coordinator instance that doesn't execute non-coordinator fragments
       args = "-is_executor=false %s" % (args)
 
+    if i < len(delay_list):
+      args = "-stress_catalog_init_delay_ms=%s %s" % (delay_list[i], args)
+
     stderr_log_file_path = os.path.join(options.log_dir, '%s-error.log' % service_name)
     exec_impala_process(IMPALAD_PATH, args, stderr_log_file_path)
 
@@ -281,37 +291,53 @@ def wait_for_cluster_web(timeout_in_seconds=CLUSTER_WAIT_TIMEOUT_IN_SECONDS):
   A cluster is deemed "ready" if:
     - All backends are registered with the statestore.
     - Each impalad knows about all other impalads.
+    - Each coordinator impalad's catalog cache is ready.
   This information is retrieved by querying the statestore debug webpage
   and each individual impalad's metrics webpage.
   """
   impala_cluster = ImpalaCluster()
   # impalad processes may take a while to come up.
   wait_for_impala_process_count(impala_cluster)
+
+  # TODO: fix this for coordinator-only nodes as well.
+  expected_num_backends = options.cluster_size
+  if options.catalog_init_delays != "":
+    for delay in options.catalog_init_delays.split(","):
+      if int(delay.strip()) != 0: expected_num_backends -= 1
+
   for impalad in impala_cluster.impalads:
-    impalad.service.wait_for_num_known_live_backends(options.cluster_size,
+    impalad.service.wait_for_num_known_live_backends(expected_num_backends,
         timeout=CLUSTER_WAIT_TIMEOUT_IN_SECONDS, interval=2)
-    if impalad._get_arg_value('is_coordinator', default='true') == 'true':
-      wait_for_catalog(impalad, timeout_in_seconds=CLUSTER_WAIT_TIMEOUT_IN_SECONDS)
+    if impalad._get_arg_value('is_coordinator', default='true') == 'true' and \
+       impalad._get_arg_value('stress_catalog_init_delay_ms', default=0) == 0:
+      wait_for_catalog(impalad)
 
-def wait_for_catalog(impalad, timeout_in_seconds):
-  """Waits for the impalad catalog to become ready"""
+def wait_for_catalog(impalad, timeout_in_seconds=CLUSTER_WAIT_TIMEOUT_IN_SECONDS):
+  """Waits for a catalog copy to be received by the impalad. When its received,
+     additionally waits for client ports to be opened."""
   start_time = time()
-  catalog_ready = False
-  attempt = 0
-  while (time() - start_time < timeout_in_seconds and not catalog_ready):
+  client_beeswax = None
+  client_hs2 = None
+  num_dbs = 0
+  num_tbls = 0
+  while (time() - start_time < timeout_in_seconds):
     try:
       num_dbs = impalad.service.get_metric_value('catalog.num-databases')
       num_tbls = impalad.service.get_metric_value('catalog.num-tables')
-      catalog_ready = impalad.service.get_metric_value('catalog.ready')
-      if catalog_ready or attempt % 4 == 0:
-          print 'Waiting for Catalog... Status: %s DBs / %s tables (ready=%s)' %\
-              (num_dbs, num_tbls, catalog_ready)
-      attempt += 1
-    except Exception, e:
-      print e
+      client_beeswax = impalad.service.create_beeswax_client()
+      client_hs2 = impalad.service.create_hs2_client()
+      break
+    except Exception as e:
+      print 'Client services not ready.'
+      print 'Waiting for catalog cache: (%s DBs / %s tables). Trying again ...' %\
+        (num_dbs, num_tbls)
+    finally:
+      if client_beeswax is not None: client_beeswax.close()
     sleep(0.5)
-  if not catalog_ready:
-    raise RuntimeError('Catalog was not initialized in expected time period.')
+
+  if client_beeswax is None or client_hs2 is None:
+    raise RuntimeError('Unable to open client ports within %s seconds.'\
+                       % timeout_in_seconds)
 
 def wait_for_cluster_cmdline(timeout_in_seconds=CLUSTER_WAIT_TIMEOUT_IN_SECONDS):
   """Checks if the cluster is "ready" by executing a simple query in a loop"""
@@ -389,6 +415,8 @@ if __name__ == "__main__":
                             options.use_exclusive_coordinators)
     # Sleep briefly to reduce log spam: the cluster takes some time to start up.
     sleep(3)
+
+    # Check for the cluster to be ready.
     wait_for_cluster()
   except Exception, e:
     print 'Error starting cluster: %s' % e

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6a2b7a64/fe/src/main/java/org/apache/impala/service/Frontend.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/service/Frontend.java b/fe/src/main/java/org/apache/impala/service/Frontend.java
index 63941c1..f9a29f4 100644
--- a/fe/src/main/java/org/apache/impala/service/Frontend.java
+++ b/fe/src/main/java/org/apache/impala/service/Frontend.java
@@ -856,6 +856,28 @@ public class Frontend {
   }
 
   /**
+   * Waits indefinitely for the local catalog to be ready. The catalog is "ready" after
+   * the first catalog update is received from the statestore.
+   *
+   * @see ImpaladCatalog.isReady
+   */
+  public void waitForCatalog() {
+    LOG.info("Waiting for first catalog update from the statestore.");
+    int numTries = 0;
+    long startTimeMs = System.currentTimeMillis();
+    while (true) {
+      if (getCatalog().isReady()) {
+        LOG.info("Local catalog initialized after: " +
+            (System.currentTimeMillis() - startTimeMs) + " ms.");
+        return;
+      }
+      LOG.info("Waiting for local catalog to be initialized, attempt: " + numTries);
+      getCatalog().waitForCatalogUpdate(MAX_CATALOG_UPDATE_WAIT_TIME_MS);
+      ++numTries;
+    }
+  }
+
+  /**
    * Overload of requestTblLoadAndWait that uses the default timeout.
    */
   public boolean requestTblLoadAndWait(Set<TableName> requestedTbls)
@@ -879,10 +901,8 @@ public class Frontend {
    */
   private AnalysisContext.AnalysisResult analyzeStmt(TQueryCtx queryCtx)
       throws AnalysisException, InternalException, AuthorizationException {
-    if (!impaladCatalog_.get().isReady()) {
-      throw new AnalysisException("This Impala daemon is not ready to accept user " +
-          "requests. Status: Waiting for catalog update from the StateStore.");
-    }
+    Preconditions.checkState(getCatalog().isReady(),
+        "Local catalog has not been initialized. Aborting query analysis.");
 
     AnalysisContext analysisCtx = new AnalysisContext(impaladCatalog_.get(), queryCtx,
         authzConfig_);

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6a2b7a64/fe/src/main/java/org/apache/impala/service/JniFrontend.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/service/JniFrontend.java b/fe/src/main/java/org/apache/impala/service/JniFrontend.java
index cfd83a5..688bd0e 100644
--- a/fe/src/main/java/org/apache/impala/service/JniFrontend.java
+++ b/fe/src/main/java/org/apache/impala/service/JniFrontend.java
@@ -580,10 +580,12 @@ public class JniFrontend {
     }
   }
 
-  public void setCatalogInitialized() {
+  public void setCatalogIsReady() {
     frontend_.getCatalog().setIsReady(true);
   }
 
+  public void waitForCatalog() { frontend_.waitForCatalog(); }
+
   // Caching this saves ~50ms per call to getHadoopConfigAsHtml
   private static final Configuration CONF = new Configuration();
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6a2b7a64/fe/src/test/java/org/apache/impala/service/FrontendTest.java
----------------------------------------------------------------------
diff --git a/fe/src/test/java/org/apache/impala/service/FrontendTest.java b/fe/src/test/java/org/apache/impala/service/FrontendTest.java
index deab9eb..c87882d 100644
--- a/fe/src/test/java/org/apache/impala/service/FrontendTest.java
+++ b/fe/src/test/java/org/apache/impala/service/FrontendTest.java
@@ -66,70 +66,6 @@ import com.google.common.collect.Sets;
 public class FrontendTest extends FrontendTestBase {
 
   @Test
-  public void TestCatalogReadiness() throws ImpalaException {
-    // Test different authorization configurations.
-    List<AuthorizationConfig> authzConfigs = Lists.newArrayList();
-    authzConfigs.add(AuthorizationConfig.createAuthDisabledConfig());
-    authzConfigs.add(AuthorizationTest.createPolicyFileAuthzConfig());
-    authzConfigs.add(AuthorizationTest.createSentryServiceAuthzConfig());
-    // Test the behavior with different stmt types.
-    List<String> testStmts = Lists.newArrayList();
-    testStmts.add("select * from functional.alltypesagg");
-    testStmts.add("select 1");
-    testStmts.add("show tables in tpch");
-    testStmts.add("create table tpch.ready_test (i int)");
-    testStmts.add("insert into functional.alltypes partition (year, month) " +
-        "select * from functional.alltypestiny");
-    for (AuthorizationConfig authzConfig: authzConfigs) {
-      ImpaladTestCatalog catalog = new ImpaladTestCatalog(authzConfig);
-      Frontend fe = new Frontend(authzConfig, catalog);
-
-      // When the catalog is ready, all stmts should pass analysis.
-      Preconditions.checkState(catalog.isReady());
-      for (String stmt: testStmts) testCatalogIsReady(stmt, fe);
-
-      // When the catalog is not ready, all stmts should fail analysis.
-      catalog.setIsReady(false);
-      for (String stmt: testStmts) testCatalogIsNotReady(stmt, fe);
-    }
-  }
-
-  /**
-   * Creates an exec request from 'stmt' using the given 'fe'.
-   * Expects that no exception is thrown.
-   */
-  private void testCatalogIsReady(String stmt, Frontend fe) {
-    System.out.println(stmt);
-    TQueryCtx queryCtx = TestUtils.createQueryContext(
-        Catalog.DEFAULT_DB, AuthorizationTest.USER.getName());
-    queryCtx.client_request.setStmt(stmt);
-    try {
-      fe.createExecRequest(queryCtx, new StringBuilder());
-    } catch (Exception e) {
-      fail("Failed to create exec request due to: " + ExceptionUtils.getStackTrace(e));
-    }
-  }
-
-  /**
-   * Creates an exec request from 'stmt' using the given 'fe'.
-   * Expects that the stmt fails to analyze because the catalog is not ready.
-   */
-  private void testCatalogIsNotReady(String stmt, Frontend fe) {
-    TQueryCtx queryCtx = TestUtils.createQueryContext(
-        Catalog.DEFAULT_DB, AuthorizationTest.USER.getName());
-    queryCtx.client_request.setStmt(stmt);
-    try {
-      fe.createExecRequest(queryCtx, new StringBuilder());
-      fail("Expected failure to due uninitialized catalog.");
-    } catch (AnalysisException e) {
-      assertEquals("This Impala daemon is not ready to accept user requests. " +
-          "Status: Waiting for catalog update from the StateStore.", e.getMessage());
-    } catch (Exception e) {
-      fail("Failed to create exec request due to: " + ExceptionUtils.getStackTrace(e));
-    }
-  }
-
-  @Test
   public void TestGetTypeInfo() throws ImpalaException {
     // Verify that the correct number of types are returned.
     TMetadataOpRequest getInfoReq = new TMetadataOpRequest();

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6a2b7a64/tests/common/custom_cluster_test_suite.py
----------------------------------------------------------------------
diff --git a/tests/common/custom_cluster_test_suite.py b/tests/common/custom_cluster_test_suite.py
index a48fd0d..c264d0e 100644
--- a/tests/common/custom_cluster_test_suite.py
+++ b/tests/common/custom_cluster_test_suite.py
@@ -31,12 +31,13 @@ from time import sleep
 IMPALA_HOME = os.environ['IMPALA_HOME']
 CLUSTER_SIZE = 3
 NUM_COORDINATORS = CLUSTER_SIZE
-# The number of statestore subscribers is CLUSTER_SIZE (# of impalad) + 1 (for catalogd).
-NUM_SUBSCRIBERS = CLUSTER_SIZE + 1
 
+# Additional args passed to respective daemon command line.
 IMPALAD_ARGS = 'impalad_args'
 STATESTORED_ARGS = 'state_store_args'
 CATALOGD_ARGS = 'catalogd_args'
+# Additional args passed to the start-impala-cluster script.
+START_ARGS = 'start_args'
 
 class CustomClusterTestSuite(ImpalaTestSuite):
   """Every test in a test suite deriving from this class gets its own Impala cluster.
@@ -81,7 +82,7 @@ class CustomClusterTestSuite(ImpalaTestSuite):
     pass
 
   @staticmethod
-  def with_args(impalad_args=None, statestored_args=None, catalogd_args=None):
+  def with_args(impalad_args=None, statestored_args=None, catalogd_args=None, start_args=None):
     """Records arguments to be passed to a cluster by adding them to the decorated
     method's func_dict"""
     def decorate(func):
@@ -91,6 +92,8 @@ class CustomClusterTestSuite(ImpalaTestSuite):
         func.func_dict[STATESTORED_ARGS] = statestored_args
       if catalogd_args is not None:
         func.func_dict[CATALOGD_ARGS] = catalogd_args
+      if start_args is not None:
+        func.func_dict[START_ARGS] = start_args
       return func
     return decorate
 
@@ -99,6 +102,9 @@ class CustomClusterTestSuite(ImpalaTestSuite):
     for arg in [IMPALAD_ARGS, STATESTORED_ARGS, CATALOGD_ARGS]:
       if arg in method.func_dict:
         cluster_args.append("--%s=\"%s\" " % (arg, method.func_dict[arg]))
+    if START_ARGS in method.func_dict:
+      cluster_args.append(method.func_dict[START_ARGS])
+
     # Start a clean new cluster before each test
     self._start_impala_cluster(cluster_args)
     super(CustomClusterTestSuite, self).setup_class()
@@ -117,7 +123,7 @@ class CustomClusterTestSuite(ImpalaTestSuite):
   @classmethod
   def _start_impala_cluster(cls, options, log_dir=os.getenv('LOG_DIR', "/tmp/"),
       cluster_size=CLUSTER_SIZE, num_coordinators=NUM_COORDINATORS,
-      use_exclusive_coordinators=False, log_level=1):
+      use_exclusive_coordinators=False, log_level=1, expected_num_executors=CLUSTER_SIZE):
     cls.impala_log_dir = log_dir
     cmd = [os.path.join(IMPALA_HOME, 'bin/start-impala-cluster.py'),
            '--cluster_size=%d' % cluster_size,
@@ -133,9 +139,14 @@ class CustomClusterTestSuite(ImpalaTestSuite):
     statestored = cls.cluster.statestored
     if statestored is None:
       raise Exception("statestored was not found")
-    statestored.service.wait_for_live_subscribers(NUM_SUBSCRIBERS, timeout=60)
+
+    # The number of statestore subscribers is
+    # cluster_size (# of impalad) + 1 (for catalogd).
+    expected_subscribers = cluster_size + 1
+
+    statestored.service.wait_for_live_subscribers(expected_subscribers, timeout=60)
     for impalad in cls.cluster.impalads:
-      impalad.service.wait_for_num_known_live_backends(CLUSTER_SIZE, timeout=60)
+      impalad.service.wait_for_num_known_live_backends(expected_num_executors, timeout=60)
 
   def assert_impalad_log_contains(self, level, line_regex, expected_count=1):
     """

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6a2b7a64/tests/common/impala_cluster.py
----------------------------------------------------------------------
diff --git a/tests/common/impala_cluster.py b/tests/common/impala_cluster.py
index d27b89a..3fbcacf 100644
--- a/tests/common/impala_cluster.py
+++ b/tests/common/impala_cluster.py
@@ -169,6 +169,7 @@ class Process(object):
       assert 0, "No processes %s found" % self.cmd
     LOG.info('Killing: %s (PID: %d) with signal %s'  % (' '.join(self.cmd), pid, signal))
     exec_process("kill -%d %d" % (signal, pid))
+
     return pid
 
   def restart(self):
@@ -222,8 +223,9 @@ class ImpaladProcess(BaseImpalaProcess):
   def start(self, wait_until_ready=True):
     """Starts the impalad and waits until the service is ready to accept connections."""
     super(ImpaladProcess, self).start()
-    self.service.wait_for_metric_value('impala-server.ready',
-        expected_value=1, timeout=30)
+    if wait_until_ready:
+      self.service.wait_for_metric_value('impala-server.ready',
+                                         expected_value=1, timeout=30)
 
 
 # Represents a statestored process

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6a2b7a64/tests/common/impala_service.py
----------------------------------------------------------------------
diff --git a/tests/common/impala_service.py b/tests/common/impala_service.py
index 3fb73bc..be33328 100644
--- a/tests/common/impala_service.py
+++ b/tests/common/impala_service.py
@@ -78,7 +78,7 @@ class BaseImpalaService(object):
         LOG.error(e)
 
       if value == expected_value:
-        LOG.info("Metric '%s' has reach desired value: %s" % (metric_name, value))
+        LOG.info("Metric '%s' has reached desired value: %s" % (metric_name, value))
         return value
       else:
         LOG.info("Waiting for metric value '%s'=%s. Current value: %s" %

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6a2b7a64/tests/custom_cluster/test_catalog_wait.py
----------------------------------------------------------------------
diff --git a/tests/custom_cluster/test_catalog_wait.py b/tests/custom_cluster/test_catalog_wait.py
new file mode 100644
index 0000000..307e8a9
--- /dev/null
+++ b/tests/custom_cluster/test_catalog_wait.py
@@ -0,0 +1,79 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import pytest
+
+from time import sleep
+from tests.common.custom_cluster_test_suite import CustomClusterTestSuite
+
+class TestCatalogWait(CustomClusterTestSuite):
+  """Impalad coordinators must wait for their local replica of the catalog to be
+     initialized from the statestore prior to opening up client ports.
+     This test simulates a failed or slow catalog on impalad startup."""
+
+  def expect_connection(self, impalad):
+    impalad.service.create_beeswax_client()
+    impalad.service.create_hs2_client()
+
+  def expect_no_connection(self, impalad):
+    with pytest.raises(Exception) as e:
+      impalad.service.create_beeswax_client()
+      assert 'Could not connect to' in str(e.value)
+
+    with pytest.raises(Exception) as e:
+      impalad.service.create_hs2_client()
+      assert 'Could not connect to' in str(e.value)
+
+  @pytest.mark.execute_serially
+  def test_delayed_catalog(self):
+    """ Tests client interactions with the cluster when one of the daemons,
+        impalad[2], is delayed in initializing its local catalog replica."""
+
+    # On startup, expect only two executors to be registered.
+    self._start_impala_cluster(["--catalog_init_delays=0,0,200000"],
+                               expected_num_executors=2)
+
+    # Expect that impalad[2] is not ready.
+    self.cluster.impalads[2].service.wait_for_metric_value('impala-server.ready', 0);
+
+    # Expect that impalad[0,1] are both ready and with initialized catalog.
+    self.cluster.impalads[0].service.wait_for_metric_value('impala-server.ready', 1);
+    self.cluster.impalads[0].service.wait_for_metric_value('catalog.ready', 1);
+    self.cluster.impalads[1].service.wait_for_metric_value('impala-server.ready', 1);
+    self.cluster.impalads[1].service.wait_for_metric_value('catalog.ready', 1);
+
+    # Expect that connections can be made to impalads[0,1], but not to impalads[2].
+    self.expect_connection(self.cluster.impalads[0])
+    self.expect_connection(self.cluster.impalads[1])
+    self.expect_no_connection(self.cluster.impalads[2])
+
+    # Issues a query to check that impalad[2] does not evaluate any fragments
+    # and does not prematurely register itself as an executor. The former is
+    # verified via query fragment metrics and the latter would fail if registered
+    # but unable to process fragments.
+    client0 = self.cluster.impalads[0].service.create_beeswax_client()
+    client1 = self.cluster.impalads[1].service.create_beeswax_client()
+
+    self.execute_query_expect_success(client0, "select * from functional.alltypes");
+    self.execute_query_expect_success(client1, "select * from functional.alltypes");
+
+    # Check that fragments were run on impalad[0,1] and none on impalad[2].
+    # Each ready impalad runs a fragment per query and one coordinator fragment. With
+    # two queries, one coordinated per ready impalad, that should be 3 total fragments.
+    self.cluster.impalads[0].service.wait_for_metric_value('impala-server.num-fragments', 3);
+    self.cluster.impalads[1].service.wait_for_metric_value('impala-server.num-fragments', 3);
+    self.cluster.impalads[2].service.wait_for_metric_value('impala-server.num-fragments', 0);

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6a2b7a64/tests/custom_cluster/test_coordinators.py
----------------------------------------------------------------------
diff --git a/tests/custom_cluster/test_coordinators.py b/tests/custom_cluster/test_coordinators.py
index 4d0b814..eb5a125 100644
--- a/tests/custom_cluster/test_coordinators.py
+++ b/tests/custom_cluster/test_coordinators.py
@@ -93,6 +93,7 @@ class TestCoordinators(CustomClusterTestSuite):
       """Connects to the coordinator node, runs a query and verifies that certain
         operators are executed on 'expected_num_of_executors' nodes."""
       coordinator = self.cluster.impalads[0]
+      client = None
       try:
         client = coordinator.service.create_beeswax_client()
         assert client is not None
@@ -106,6 +107,7 @@ class TestCoordinators(CustomClusterTestSuite):
           elif rows['operator'] == '01:AGGREGATE':
             assert rows['num_hosts'] == expected_num_of_executors
       finally:
+        assert client is not None
         client.close()
 
     # Cluster config where the coordinator can execute query fragments


[2/6] incubator-impala git commit: IMPALA-6170: Remove broken backend test from llvm-codegen-test

Posted by jr...@apache.org.
IMPALA-6170: Remove broken backend test from llvm-codegen-test

Remove the backend test that expects the HDFS service to be running, which
is not an expectation a backend test should have. This caused test runs
that use the local filesystem as their default filesystem to fail.

Also the code path exercised by that test is already covered in the end
to end tests, like in test_udfs.py for cases where the functions used in
the query live in the same lib file.

Change-Id: Iaed0109f5163343427015d571d6d24233b9d3fdc
Reviewed-on: http://gerrit.cloudera.org:8080/8505
Reviewed-by: Tim Armstrong <ta...@cloudera.com>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/5ab07f03
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/5ab07f03
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/5ab07f03

Branch: refs/heads/master
Commit: 5ab07f032941537755a3d8bc8e58caa6c8ac8233
Parents: 3a1073c
Author: Bikramjeet Vig <bi...@cloudera.com>
Authored: Wed Nov 8 13:59:46 2017 -0800
Committer: Impala Public Jenkins <im...@gerrit.cloudera.org>
Committed: Fri Nov 10 01:21:45 2017 +0000

----------------------------------------------------------------------
 be/src/codegen/llvm-codegen-test.cc | 11 -----------
 1 file changed, 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/5ab07f03/be/src/codegen/llvm-codegen-test.cc
----------------------------------------------------------------------
diff --git a/be/src/codegen/llvm-codegen-test.cc b/be/src/codegen/llvm-codegen-test.cc
index e42b041..68a5ff8 100644
--- a/be/src/codegen/llvm-codegen-test.cc
+++ b/be/src/codegen/llvm-codegen-test.cc
@@ -479,17 +479,6 @@ TEST_F(LlvmCodeGenTest, HandleLinkageError) {
   codegen->Close();
 }
 
-// Test that Impala does not return error when trying to link the same lib file twice.
-TEST_F(LlvmCodeGenTest, LinkageTest) {
-  scoped_ptr<LlvmCodeGen> codegen;
-  ASSERT_OK(LlvmCodeGen::CreateImpalaCodegen(runtime_state_, nullptr, "test", &codegen));
-  EXPECT_TRUE(codegen.get() != nullptr);
-  string hdfs_file_path("/test-warehouse/test-udfs.ll");
-  ASSERT_OK(LinkModuleFromHdfs(codegen.get(), hdfs_file_path));
-  ASSERT_OK(LinkModuleFromHdfs(codegen.get(), hdfs_file_path));
-  codegen->Close();
-}
-
 }
 
 int main(int argc, char **argv) {


[4/6] incubator-impala git commit: IMPALA-6164: Fix stale query profile in TestAlwaysFalseFilter

Posted by jr...@apache.org.
IMPALA-6164: Fix stale query profile in TestAlwaysFalseFilter

TestAlwaysFalseFilter gets the query profile without fetching all the
rows, resulting in a stale query profile and failing the test. With
this patch all the rows are fetched before getting the query profile.
This is enough to get the final profile because the query profile
finalization is performed in Coordinator::GetNext after we hit eos.
A bug in Base64Decode related to query profile decoding is also fixed.
Currently Base64Decode may produce an incorrect output length if the output
parameter is not initialized to 0.

Testing: TestAlwaysFalseFilter is run and passes 1000 times. It doesn't
pass 1000 times consecutively without this patch.

Change-Id: I04bb76d20541fa035d88167b593d1b8bc3873e89
Reviewed-on: http://gerrit.cloudera.org:8080/8498
Reviewed-by: Alex Behm <al...@cloudera.com>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/fdf94a40
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/fdf94a40
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/fdf94a40

Branch: refs/heads/master
Commit: fdf94a400341c175943272d823b73d4b23f3919d
Parents: 2212a88
Author: Tianyi Wang <tw...@cloudera.com>
Authored: Mon Nov 6 16:36:50 2017 -0800
Committer: Impala Public Jenkins <im...@gerrit.cloudera.org>
Committed: Fri Nov 10 04:27:59 2017 +0000

----------------------------------------------------------------------
 be/src/util/coding-util.cc                       | 4 +++-
 tests/custom_cluster/test_always_false_filter.py | 5 ++++-
 2 files changed, 7 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/fdf94a40/be/src/util/coding-util.cc
----------------------------------------------------------------------
diff --git a/be/src/util/coding-util.cc b/be/src/util/coding-util.cc
index 78b9ebd..6a8ea2e 100644
--- a/be/src/util/coding-util.cc
+++ b/be/src/util/coding-util.cc
@@ -195,9 +195,11 @@ bool Base64DecodeBufLen(const char* in, int64_t in_len, int64_t* out_max) {
 
 bool Base64Decode(const char* in, int64_t in_len, int64_t out_max, char* out,
     int64_t* out_len) {
+  uint32_t out_len_u32 = 0;
   if (UNLIKELY((in_len & 3) != 0)) return false;
   const int decode_result = sasl_decode64(in, static_cast<unsigned>(in_len), out,
-      static_cast<unsigned>(out_max), reinterpret_cast<unsigned*>(out_len));
+      static_cast<unsigned>(out_max), &out_len_u32);
+  *out_len = out_len_u32;
   if (UNLIKELY(decode_result != SASL_OK || *out_len != out_max - 1)) return false;
   return true;
 }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/fdf94a40/tests/custom_cluster/test_always_false_filter.py
----------------------------------------------------------------------
diff --git a/tests/custom_cluster/test_always_false_filter.py b/tests/custom_cluster/test_always_false_filter.py
index 461c128..e02c64a 100644
--- a/tests/custom_cluster/test_always_false_filter.py
+++ b/tests/custom_cluster/test_always_false_filter.py
@@ -42,13 +42,16 @@ class TestAlwaysFalseFilter(CustomClusterTestSuite):
     for table_suffix in ['_avro', '_rc', '_seq']:
       cursor.execute("use functional" + table_suffix)
       cursor.execute(query)
+      # Fetch all rows to finalize the query profile.
+      cursor.fetchall()
       profile = cursor.get_profile()
       assert re.search("Files rejected: [^0] \([^0]\)", profile) is None
       assert re.search("Splits rejected: [^0] \([^0]\)", profile) is None
     for table_suffix in ['', '_parquet']:
       cursor.execute("use functional" + table_suffix)
       cursor.execute(query)
+      # Fetch all rows to finalize the query profile.
+      cursor.fetchall()
       profile = cursor.get_profile()
       assert re.search("Files rejected: [^0] \([^0]\)", profile) is None
       assert re.search("Splits rejected: 8 \(8\)", profile) is not None
-


[5/6] incubator-impala git commit: [DOCS] Remove unnecessary 'incubator' from URLs

Posted by jr...@apache.org.
[DOCS] Remove unnecessary 'incubator' from URLs

URLs like https://impala.incubator.apache.org/ are aliases for
https://impala.apache.org/, so we can use the latter and avoid making
any changes if or when Impala graduates from the incubator.

Change-Id: If0f8d51b147094e629e60c4a8c5aecbb6cdb6a8e
Reviewed-on: http://gerrit.cloudera.org:8080/8524
Reviewed-by: Michael Brown <mi...@cloudera.com>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/11bbc26d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/11bbc26d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/11bbc26d

Branch: refs/heads/master
Commit: 11bbc26d09d0fddb2f680a1ef279f528f82912a5
Parents: fdf94a4
Author: Jim Apple <jb...@apache.org>
Authored: Sat Nov 11 11:32:32 2017 -0800
Committer: Impala Public Jenkins <im...@gerrit.cloudera.org>
Committed: Mon Nov 13 18:59:48 2017 +0000

----------------------------------------------------------------------
 docs/impala_keydefs.ditamap                | 8 ++++----
 docs/topics/impala_langref_unsupported.xml | 2 +-
 2 files changed, 5 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/11bbc26d/docs/impala_keydefs.ditamap
----------------------------------------------------------------------
diff --git a/docs/impala_keydefs.ditamap b/docs/impala_keydefs.ditamap
index 86112e9..24b621f 100644
--- a/docs/impala_keydefs.ditamap
+++ b/docs/impala_keydefs.ditamap
@@ -116,7 +116,7 @@ under the License.
 
   <keydef href="https://issues.apache.org/jira/browse/HIVE-656" scope="external" format="html" keys="HIVE-656"/>
 
-  <keydef href="http://sentry.incubator.apache.org/" scope="external" format="html" keys="sentry"/>
+  <keydef href="http://sentry.apache.org/" scope="external" format="html" keys="sentry"/>
 
   <!-- Download page that leads to ODBC drivers. -->
   <keydef href="https://www.cloudera.com/downloads/connectors/impala/odbc.html" scope="external" format="html" keys="odbc_driver_download"/>
@@ -10586,9 +10586,9 @@ under the License.
   <keydef keys="impala13_full"><topicmeta><keywords><keyword>Impala 1.3</keyword></keywords></topicmeta></keydef>
 
 <!-- Pointers to changelog pages -->
-  <keydef keys="changelog_210" href="https://impala.incubator.apache.org/docs/changelog-2.10.html" scope="external" format="html"/>
-  <keydef keys="changelog_29" href="https://impala.incubator.apache.org/docs/changelog-2.9.html" scope="external" format="html"/>
-  <keydef keys="changelog_28" href="https://impala.incubator.apache.org/docs/changelog-2.8.html" scope="external" format="html"/>
+  <keydef keys="changelog_210" href="https://impala.apache.org/docs/changelog-2.10.html" scope="external" format="html"/>
+  <keydef keys="changelog_29" href="https://impala.apache.org/docs/changelog-2.9.html" scope="external" format="html"/>
+  <keydef keys="changelog_28" href="https://impala.apache.org/docs/changelog-2.8.html" scope="external" format="html"/>
 
 <!-- Indirect addresses for all the impala*.xml files. The key is the ID of the first topic within each file. -->
   <keydef href="topics/impala_concepts.xml" keys="concepts"/>

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/11bbc26d/docs/topics/impala_langref_unsupported.xml
----------------------------------------------------------------------
diff --git a/docs/topics/impala_langref_unsupported.xml b/docs/topics/impala_langref_unsupported.xml
index b04fd13..e1bcc68 100644
--- a/docs/topics/impala_langref_unsupported.xml
+++ b/docs/topics/impala_langref_unsupported.xml
@@ -204,7 +204,7 @@ under the License.
       </p>
 
       <p>
-        Impala utilizes the <xref href="http://sentry.incubator.apache.org/" scope="external" format="html">Apache
+        Impala utilizes the <xref keyref="sentry">Apache
         Sentry </xref> authorization framework, which provides fine-grained role-based access control
         to protect data against unauthorized access or tampering.
       </p>


[3/6] incubator-impala git commit: IMPALA-6148: Specifying thirdparty deps as URLs

Posted by jr...@apache.org.
IMPALA-6148: Specifying thirdparty deps as URLs

If the environment variable $IMPALA_<NAME>_URL is configured in
impala-config-branch.sh or impala-config-local.sh for a thirdparty
dependency, use that to download it instead of the s3://native-toolchain
bucket. This makes testing against arbitrary versions of the
dependencies easier.

I did a little bit of refactoring while here, creating a small class for
a Package to handle reading the environment variables. I also changed
bootstrap_toolchain.py to use Python logging, which cleans up the output
during the multi-threaded downloading.

I tested this both with customized URLs and by running the regular
build (pre-review-test, without most of the slow test suites).

Change-Id: I4628d86022d4bd8b762313f7056d76416a58b422
Reviewed-on: http://gerrit.cloudera.org:8080/8456
Reviewed-by: Joe McDonnell <jo...@cloudera.com>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/2212a889
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/2212a889
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/2212a889

Branch: refs/heads/master
Commit: 2212a8897e66e1a5c548132d09a8e33e275a0e3b
Parents: 5ab07f0
Author: Philip Zeyliger <ph...@cloudera.com>
Authored: Thu Nov 2 15:43:10 2017 -0700
Committer: Impala Public Jenkins <im...@gerrit.cloudera.org>
Committed: Fri Nov 10 02:42:16 2017 +0000

----------------------------------------------------------------------
 bin/bootstrap_toolchain.py | 150 +++++++++++++++++++++++++---------------
 bin/impala-config.sh       |  45 ++++++++++++
 2 files changed, 138 insertions(+), 57 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/2212a889/bin/bootstrap_toolchain.py
----------------------------------------------------------------------
diff --git a/bin/bootstrap_toolchain.py b/bin/bootstrap_toolchain.py
index bea3a99..8494c6c 100755
--- a/bin/bootstrap_toolchain.py
+++ b/bin/bootstrap_toolchain.py
@@ -25,9 +25,16 @@
 # the CDH components (i.e. Hadoop, Hive, HBase and Sentry) into
 # CDH_COMPONENTS_HOME.
 #
+# By default, packages are downloaded from an S3 bucket named native-toolchain.
+# The exact URL is based on IMPALA_<PACKAGE>_VERSION environment variables
+# (configured in impala-config.sh) as well as the OS version being built on.
+# The URL can be overridden with an IMPALA_<PACKAGE>_URL environment variable
+# set in impala-config-{local,branch}.sh.
+#
 # The script is called as follows without any additional parameters:
 #
 #     python bootstrap_toolchain.py
+import logging
 import os
 import random
 import re
@@ -57,6 +64,28 @@ OS_MAPPING = {
   "ubuntu16.04" : "ec2-package-ubuntu-16-04",
 }
 
+class Package(object):
+  """
+  Represents a package to be downloaded. A version, if not specified
+  explicitly, is retrieved from the environment variable IMPALA_<NAME>_VERSION.
+  URLs are retrieved from IMPALA_<NAME>_URL, but are optional.
+  """
+  def __init__(self, name, version=None, url=None):
+    self.name = name
+    self.version = version
+    self.url = url
+    package_env_name = name.replace("-", "_").upper()
+    if self.version is None:
+      version_env_var = "IMPALA_{0}_VERSION".format(package_env_name)
+
+      self.version = os.environ.get(version_env_var)
+      if not self.version:
+        raise Exception("Could not find version for {0} in environment var {1}".format(
+          name, version_env_var))
+    if self.url is None:
+      url_env_var = "IMPALA_{0}_URL".format(package_env_name)
+      self.url = os.environ.get(url_env_var)
+
 def try_get_platform_release_label():
   """Gets the right package label from the OS version. Return None if not found."""
   try:
@@ -64,24 +93,35 @@ def try_get_platform_release_label():
   except:
     return None
 
+# Cache "lsb_release -irs" to avoid excessive logging from sh, and
+# to shave a little bit of time.
+lsb_release_cache = None
+
 def get_platform_release_label(release=None):
   """Gets the right package label from the OS version. Raise exception if not found.
      'release' can be provided to override the underlying OS version.
   """
+  global lsb_release_cache
   if not release:
-    release = "".join(map(lambda x: x.lower(), sh.lsb_release("-irs").split()))
+    if lsb_release_cache:
+      release = lsb_release_cache
+    else:
+      release = "".join(map(lambda x: x.lower(), sh.lsb_release("-irs").split()))
+      lsb_release_cache = release
   for k, v in OS_MAPPING.iteritems():
     if re.search(k, release):
       return v
 
   raise Exception("Could not find package label for OS version: {0}.".format(release))
 
-
 def wget_and_unpack_package(download_path, file_name, destination, wget_no_clobber):
-  print "URL {0}".format(download_path)
+  if not download_path.endswith("/" + file_name):
+    raise Exception("URL {0} does not match with expected file_name {1}"
+        .format(download_path, file_name))
   NUM_ATTEMPTS = 3
   for attempt in range(1, NUM_ATTEMPTS + 1):
-    print "Downloading {0} to {1} (attempt {2})".format(file_name, destination, attempt)
+    logging.info("Downloading {0} to {1}/{2} (attempt {3})".format(
+      download_path, destination, file_name, attempt))
     # --no-clobber avoids downloading the file if a file with the name already exists
     try:
       sh.wget(download_path, directory_prefix=destination, no_clobber=wget_no_clobber)
@@ -89,24 +129,27 @@ def wget_and_unpack_package(download_path, file_name, destination, wget_no_clobb
     except Exception, e:
       if attempt == NUM_ATTEMPTS:
         raise
-      print "Download failed; retrying after sleep: " + str(e)
+      logging.error("Download failed; retrying after sleep: " + str(e))
       time.sleep(10 + random.random() * 5) # Sleep between 10 and 15 seconds.
-  print "Extracting {0}".format(file_name)
+  logging.info("Extracting {0}".format(file_name))
   sh.tar(z=True, x=True, f=os.path.join(destination, file_name), directory=destination)
   sh.rm(os.path.join(destination, file_name))
 
-def download_package(destination, product, version, compiler, platform_release=None):
-  remove_existing_package(destination, product, version)
+def download_package(destination, package, compiler, platform_release=None):
+  remove_existing_package(destination, package.name, package.version)
 
   toolchain_build_id = os.environ["IMPALA_TOOLCHAIN_BUILD_ID"]
   label = get_platform_release_label(release=platform_release)
-  format_params = {'product': product, 'version': version, 'compiler': compiler,
-      'label': label, 'toolchain_build_id': toolchain_build_id}
+  format_params = {'product': package.name, 'version': package.version,
+      'compiler': compiler, 'label': label, 'toolchain_build_id': toolchain_build_id}
   file_name = "{product}-{version}-{compiler}-{label}.tar.gz".format(**format_params)
   format_params['file_name'] = file_name
-  url_path = "/{toolchain_build_id}/{product}/{version}-{compiler}/{file_name}".format(
-      **format_params)
-  download_path = HOST + url_path
+  if package.url is None:
+    url_path = "/{toolchain_build_id}/{product}/{version}-{compiler}/{file_name}".format(
+        **format_params)
+    download_path = HOST + url_path
+  else:
+    download_path = package.url
 
   wget_and_unpack_package(download_path, file_name, destination, True)
 
@@ -122,14 +165,13 @@ def bootstrap(toolchain_root, packages):
   compiler = "gcc-{0}".format(os.environ["IMPALA_GCC_VERSION"])
 
   def handle_package(p):
-    pkg_name, pkg_version = unpack_name_and_version(p)
-    if check_for_existing_package(toolchain_root, pkg_name, pkg_version, compiler):
+    if check_for_existing_package(toolchain_root, p.name, p.version, compiler):
       return
-    if pkg_name != "kudu" or os.environ["KUDU_IS_SUPPORTED"] == "true":
-      download_package(toolchain_root, pkg_name, pkg_version, compiler)
+    if p.name != "kudu" or os.environ["KUDU_IS_SUPPORTED"] == "true":
+      download_package(toolchain_root, p, compiler)
     else:
-      build_kudu_stub(toolchain_root, pkg_version, compiler)
-    write_version_file(toolchain_root, pkg_name, pkg_version, compiler,
+      build_kudu_stub(toolchain_root, p.version, compiler)
+    write_version_file(toolchain_root, p.name, p.version, compiler,
         get_platform_release_label())
   execute_many(handle_package, packages)
 
@@ -156,18 +198,18 @@ def version_file_path(toolchain_root, pkg_name, pkg_version):
 def check_custom_toolchain(toolchain_root, packages):
   missing = []
   for p in packages:
-    pkg_name, pkg_version = unpack_name_and_version(p)
-    pkg_dir = package_directory(toolchain_root, pkg_name, pkg_version)
+    pkg_dir = package_directory(toolchain_root, p.name, p.version)
     if not os.path.isdir(pkg_dir):
       missing.append((p, pkg_dir))
 
   if missing:
-    print("The following packages are not in their expected locations.")
+    msg = "The following packages are not in their expected locations.\n"
     for p, pkg_dir in missing:
-      print("  %s (expected directory %s to exist)" % (p, pkg_dir))
-    print("Pre-built toolchain archives not available for your platform.")
-    print("Clone and build native toolchain from source using this repository:")
-    print("    https://github.com/cloudera/native-toolchain")
+      msg += "  %s (expected directory %s to exist)\n" % (p, pkg_dir)
+    msg += "Pre-built toolchain archives not available for your platform.\n"
+    msg += "Clone and build native toolchain from source using this repository:\n"
+    msg += "    https://github.com/cloudera/native-toolchain\n"
+    logging.error(msg)
     raise Exception("Toolchain bootstrap failed: required packages were missing")
 
 def check_for_existing_package(toolchain_root, pkg_name, pkg_version, compiler):
@@ -190,27 +232,13 @@ def write_version_file(toolchain_root, pkg_name, pkg_version, compiler, label):
 def remove_existing_package(toolchain_root, pkg_name, pkg_version):
   dir_path = package_directory(toolchain_root, pkg_name, pkg_version)
   if os.path.exists(dir_path):
-    print "Removing existing package directory {0}".format(dir_path)
+    logging.info("Removing existing package directory {0}".format(dir_path))
     shutil.rmtree(dir_path)
 
-def unpack_name_and_version(package):
-  """A package definition is either a string where the version is fetched from the
-  environment or a tuple where the package name and the package version are fully
-  specified.
-  """
-  if isinstance(package, basestring):
-    env_var = "IMPALA_{0}_VERSION".format(package).replace("-", "_").upper()
-    try:
-      return package, os.environ[env_var]
-    except KeyError:
-      raise Exception("Could not find version for {0} in environment var {1}".format(
-        package, env_var))
-  return package[0], package[1]
-
 def build_kudu_stub(toolchain_root, kudu_version, compiler):
   # When Kudu isn't supported, the CentOS 7 package will be downloaded and the client
   # lib will be replaced with a stubbed client.
-  download_package(toolchain_root, "kudu", kudu_version, compiler,
+  download_package(toolchain_root, Package("kudu", kudu_version), compiler,
       platform_release="centos7")
 
   # Find the client lib files in the extracted dir. There may be several files with
@@ -330,11 +358,11 @@ def execute_many(f, args):
 
 def download_cdh_components(toolchain_root, cdh_components):
   """Downloads and unpacks the CDH components into $CDH_COMPONENTS_HOME if not found."""
-  cdh_components_home = os.getenv("CDH_COMPONENTS_HOME")
+  cdh_components_home = os.environ.get("CDH_COMPONENTS_HOME")
   if not cdh_components_home:
-    print("Impala environment not set up correctly, make sure "
+    logging.error("Impala environment not set up correctly, make sure "
           "$CDH_COMPONENTS_HOME is present.")
-    return
+    sys.exit(1)
 
   # Create the directory where CDH components live if necessary.
   if not os.path.exists(cdh_components_home):
@@ -343,16 +371,18 @@ def download_cdh_components(toolchain_root, cdh_components):
   # The URL prefix of where CDH components live in S3.
   download_path_prefix = HOST + "/cdh_components/"
 
-
   def download(component):
-    pkg_name, pkg_version = unpack_name_and_version(component)
-    pkg_directory = package_directory(cdh_components_home, pkg_name, pkg_version)
+    pkg_directory = package_directory(cdh_components_home, component.name,
+        component.version)
     if os.path.isdir(pkg_directory):
       return
 
     # Download the package if it doesn't exist
-    file_name = "{0}-{1}.tar.gz".format(pkg_name, pkg_version)
-    download_path = download_path_prefix + file_name
+    file_name = "{0}-{1}.tar.gz".format(component.name, component.version)
+    if component.url is None:
+      download_path = download_path_prefix + file_name
+    else:
+      download_path = component.url
     wget_and_unpack_package(download_path, file_name, cdh_components_home, False)
 
   execute_many(download, cdh_components)
@@ -366,15 +396,20 @@ if __name__ == "__main__":
   the CDH components (i.e. hadoop, hbase, hive, llama, llama-minikidc and sentry) into the
   directory specified by $CDH_COMPONENTS_HOME.
   """
-  if not os.getenv("IMPALA_HOME"):
-    print("Impala environment not set up correctly, make sure "
+  logging.basicConfig(level=logging.INFO,
+      format='%(asctime)s %(threadName)s %(levelname)s: %(message)s')
+  # 'sh' module logs at every execution, which is too noisy
+  logging.getLogger("sh").setLevel(logging.WARNING)
+
+  if not os.environ.get("IMPALA_HOME"):
+    logging.error("Impala environment not set up correctly, make sure "
           "impala-config.sh is sourced.")
     sys.exit(1)
 
   # Create the destination directory if necessary
-  toolchain_root = os.getenv("IMPALA_TOOLCHAIN")
+  toolchain_root = os.environ.get("IMPALA_TOOLCHAIN")
   if not toolchain_root:
-    print("Impala environment not set up correctly, make sure "
+    logging.error("Impala environment not set up correctly, make sure "
           "$IMPALA_TOOLCHAIN is present.")
     sys.exit(1)
 
@@ -383,14 +418,15 @@ if __name__ == "__main__":
 
   # LLVM and Kudu are the largest packages. Sort them first so that
   # their download starts as soon as possible.
-  packages = ["llvm", ("llvm", "3.9.1-asserts"), "kudu",
+  packages = map(Package, ["llvm", "kudu",
       "avro", "binutils", "boost", "breakpad", "bzip2", "cmake", "crcutil",
       "flatbuffers", "gcc", "gflags", "glog", "gperftools", "gtest", "libev",
       "lz4", "openldap", "openssl", "protobuf",
-      "rapidjson", "re2", "snappy", "thrift", "tpc-h", "tpc-ds", "zlib"]
+      "rapidjson", "re2", "snappy", "thrift", "tpc-h", "tpc-ds", "zlib"])
+  packages.insert(0, Package("llvm", "3.9.1-asserts"))
   bootstrap(toolchain_root, packages)
 
   # Download the CDH components if necessary.
   if os.getenv("DOWNLOAD_CDH_COMPONENTS", "false") == "true":
-    cdh_components = ["hadoop", "hbase", "hive", "llama-minikdc", "sentry"]
+    cdh_components = map(Package, ["hadoop", "hbase", "hive", "llama-minikdc", "sentry"])
     download_cdh_components(toolchain_root, cdh_components)

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/2212a889/bin/impala-config.sh
----------------------------------------------------------------------
diff --git a/bin/impala-config.sh b/bin/impala-config.sh
index eb04be0..0fd976b 100755
--- a/bin/impala-config.sh
+++ b/bin/impala-config.sh
@@ -76,51 +76,90 @@ export IMPALA_TOOLCHAIN_BUILD_ID=474-6c406b4a88
 # Versions of toolchain dependencies.
 # -----------------------------------
 export IMPALA_AVRO_VERSION=1.7.4-p4
+unset IMPALA_AVRO_URL
 export IMPALA_BINUTILS_VERSION=2.26.1
+unset IMPALA_BINUTILS_URL
 export IMPALA_BOOST_VERSION=1.57.0-p3
+unset IMPALA_BOOST_URL
 export IMPALA_BREAKPAD_VERSION=1b704857f1e78a864e6942e613457e55f1aecb60-p3
+unset IMPALA_BREAKPAD_URL
 export IMPALA_BZIP2_VERSION=1.0.6-p2
+unset IMPALA_BZIP2_URL
 export IMPALA_CMAKE_VERSION=3.8.2-p1
+unset IMPALA_CMAKE_URL
 export IMPALA_CRCUTIL_VERSION=440ba7babeff77ffad992df3a10c767f184e946e-p1
+unset IMPALA_CRCUTIL_URL
 export IMPALA_CYRUS_SASL_VERSION=2.1.23
+unset IMPALA_CYRUS_SASL_URL
 export IMPALA_FLATBUFFERS_VERSION=1.6.0
+unset IMPALA_FLATBUFFERS_URL
 export IMPALA_GCC_VERSION=4.9.2
+unset IMPALA_GCC_URL
 export IMPALA_GFLAGS_VERSION=2.2.0-p1
+unset IMPALA_GFLAGS_URL
 export IMPALA_GLOG_VERSION=0.3.4-p2
+unset IMPALA_GLOG_URL
 export IMPALA_GPERFTOOLS_VERSION=2.5
+unset IMPALA_GPERFTOOLS_URL
 export IMPALA_GTEST_VERSION=1.6.0
+unset IMPALA_GTEST_URL
 export IMPALA_LIBEV_VERSION=4.20
+unset IMPALA_LIBEV_URL
 export IMPALA_LLVM_VERSION=3.9.1
+unset IMPALA_LLVM_URL
 export IMPALA_LLVM_ASAN_VERSION=3.9.1
+unset IMPALA_LLVM_ASAN_URL
+
 # Debug builds should use the release+asserts build to get additional coverage.
 # Don't use the LLVM debug build because the binaries are too large to distribute.
 export IMPALA_LLVM_DEBUG_VERSION=3.9.1-asserts
+unset IMPALA_LLVM_DEBUG_URL
 export IMPALA_LZ4_VERSION=1.7.5
+unset IMPALA_LZ4_URL
 export IMPALA_OPENLDAP_VERSION=2.4.25
+unset IMPALA_OPENLDAP_URL
 export IMPALA_OPENSSL_VERSION=1.0.2l
+unset IMPALA_OPENSSL_URL
 export IMPALA_PROTOBUF_VERSION=2.6.1
+unset IMPALA_PROTOBUF_URL
 export IMPALA_POSTGRES_JDBC_DRIVER_VERSION=9.0-801
+unset IMPALA_POSTGRES_JDBC_DRIVER_URL
 export IMPALA_RAPIDJSON_VERSION=0.11
+unset IMPALA_RAPIDJSON_URL
 export IMPALA_RE2_VERSION=20130115-p1
+unset IMPALA_RE2_URL
 export IMPALA_SNAPPY_VERSION=1.1.4
+unset IMPALA_SNAPPY_URL
 export IMPALA_SQUEASEL_VERSION=3.3
+unset IMPALA_SQUEASEL_URL
 # TPC utilities used for test/benchmark data generation.
 export IMPALA_TPC_DS_VERSION=2.1.0
+unset IMPALA_TPC_DS_URL
 export IMPALA_TPC_H_VERSION=2.17.0
+unset IMPALA_TPC_H_URL
 export IMPALA_THRIFT_VERSION=0.9.0-p11
+unset IMPALA_THRIFT_URL
 export IMPALA_THRIFT_JAVA_VERSION=0.9.0
+unset IMPALA_THRIFT_JAVA_URL
 export IMPALA_ZLIB_VERSION=1.2.8
+unset IMPALA_ZLIB_URL
 
 if [[ $OSTYPE == "darwin"* ]]; then
   IMPALA_CYRUS_SASL_VERSION=2.1.26
+  unset IMPALA_CYRUS_SASL_URL
   IMPALA_GPERFTOOLS_VERSION=2.3
+  unset IMPALA_GPERFTOOLS_URL
   IMPALA_OPENSSL_VERSION=1.0.1p
+  unset IMPALA_OPENSSL_URL
   IMPALA_THRIFT_VERSION=0.9.2
+  unset IMPALA_THRIFT_URL
   IMPALA_THRIFT_JAVA_VERSION=0.9.2
+  unset IMPALA_THRIFT_JAVA_URL
 fi
 
 # Kudu version in the toolchain; provides libkudu_client.so and minicluster binaries.
 export IMPALA_KUDU_VERSION=bec2a24
+unset IMPALA_KUDU_URL
 
 # Kudu version used to identify Java client jar from maven
 export KUDU_JAVA_VERSION=1.6.0-cdh5.14.0-SNAPSHOT
@@ -129,11 +168,17 @@ export KUDU_JAVA_VERSION=1.6.0-cdh5.14.0-SNAPSHOT
 # ------------------------------------------
 export CDH_MAJOR_VERSION=5
 export IMPALA_HADOOP_VERSION=2.6.0-cdh5.14.0-SNAPSHOT
+unset IMPALA_HADOOP_URL
 export IMPALA_HBASE_VERSION=1.2.0-cdh5.14.0-SNAPSHOT
+unset IMPALA_HBASE_URL
 export IMPALA_HIVE_VERSION=1.1.0-cdh5.14.0-SNAPSHOT
+unset IMPALA_HIVE_URL
 export IMPALA_SENTRY_VERSION=1.5.1-cdh5.14.0-SNAPSHOT
+unset IMPALA_SENTRY_URL
 export IMPALA_PARQUET_VERSION=1.5.0-cdh5.14.0-SNAPSHOT
+unset IMPALA_PARQUET_URL
 export IMPALA_LLAMA_MINIKDC_VERSION=1.0.0
+unset IMPALA_LLAMA_MINIKDC_URL
 
 # Source the branch and local config override files here to override any
 # variables above or any variables below that allow overriding via environment