Posted to commits@impala.apache.org by jo...@apache.org on 2021/10/23 00:08:58 UTC

[impala] branch master updated: IMPALA-10811 RPC to submit query getting stuck for AWS NLB forever

This is an automated email from the ASF dual-hosted git repository.

joemcdonnell pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git


The following commit(s) were added to refs/heads/master by this push:
     new 975883c  IMPALA-10811 RPC to submit query getting stuck for AWS NLB forever
975883c is described below

commit 975883c47035843398ee99a21fa132f67a0d4954
Author: Qifan Chen <qc...@cloudera.com>
AuthorDate: Fri Oct 8 11:29:21 2021 -0400

    IMPALA-10811 RPC to submit query getting stuck for AWS NLB forever
    
    This patch addresses an Impala client hang caused by the AWS Network
    Load Balancer (NLB) idle timeout, which is fixed at 350s. When a
    long-running DDL operation is executing and the timeout expires, AWS
    silently drops the connection and the Impala client hangs.
    
    The fix preserves the existing TCLIService protocol between the client
    and the Impala server. It applies to the following Impala clients,
    which issue the Thrift RPC ExecuteStatement() followed by repeated
    calls to GetOperationStatus() (HS2, Impyla and HUE) or a variant of it
    (Beeswax) to the Impala backend:
    
      1. HS2
      2. Beeswax
      3. Impyla
      4. HUE
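
    The client-side pattern these clients share can be sketched as below.
    This is a minimal sketch under stated assumptions: FakeOperationServer,
    execute_statement(), get_operation_status() and execute_and_poll() are
    hypothetical names that only mirror the RPCs. They are not the real
    TCLIService/Impyla API. The state strings follow the HS2 naming used in
    the tests of this patch.

```python
import time


class FakeOperationServer:
    """Hypothetical stand-in for an Impala endpoint (NOT the real TCLIService API).

    execute_statement() returns a handle immediately; the operation then reports
    PENDING/RUNNING for a fixed number of status calls before FINISHED.
    """

    def __init__(self, pending_polls, running_polls):
        self._script = (["PENDING_STATE"] * pending_polls +
                        ["RUNNING_STATE"] * running_polls)
        self._calls = {}

    def execute_statement(self, stmt):
        handle = len(self._calls)
        self._calls[handle] = 0
        return handle

    def get_operation_status(self, handle):
        i = self._calls[handle]
        self._calls[handle] = i + 1
        return self._script[i] if i < len(self._script) else "FINISHED_STATE"


TERMINAL_STATES = {"FINISHED_STATE", "ERROR_STATE", "CANCELED_STATE"}


def execute_and_poll(server, stmt, poll_interval_s=0.0):
    """One ExecuteStatement()-style call, then status polls until a terminal state.

    With a real server, each call in this loop is short-lived, which is what
    keeps the connection from sitting idle inside one long blocking RPC.
    Returns (final_state, list_of_states_seen).
    """
    handle = server.execute_statement(stmt)
    seen = []
    while True:
        state = server.get_operation_status(handle)
        seen.append(state)
        if state in TERMINAL_STATES:
            return state, seen
        time.sleep(poll_interval_s)
```

    For example, execute_and_poll(FakeOperationServer(2, 1), "alter table
    t recover partitions") observes two PENDING_STATE polls and one
    RUNNING_STATE poll before FINISHED_STATE.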
    
    With the fix, the backend method ClientRequestState::ExecDdlRequest()
    can start a new thread, 'async_exec_thread_', that runs
    ExecDdlRequestImpl(), which executes most DDLs asynchronously. The
    wait thread, 'wait_thread_', waits for this thread. Since the wait
    thread also runs asynchronously, DDL execution no longer makes the
    Impala client wait. The client can thus keep checking the execution
    status via GetOperationStatus() without any single call blocking for a
    long time (e.g. more than 350s).
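
    The two-thread arrangement can be modelled roughly as follows. This is
    a hypothetical Python model, not Impala's C++ code. DdlRequestStateSketch,
    exec_ddl() and the sleep that stands in for the catalog operation are
    all assumptions. exec_ddl() plays the role of ExecDdlRequest(),
    _exec_ddl_impl() plays ExecDdlRequestImpl(), and _wait() plays the
    wait thread.

```python
import threading
import time


class DdlRequestStateSketch:
    """Hypothetical model of the async DDL path (not Impala's actual code)."""

    def __init__(self, ddl_seconds):
        self.ddl_seconds = ddl_seconds
        self.state = "INITIALIZED"
        self._lock = threading.Lock()
        self.async_exec_thread = None  # plays 'async_exec_thread_'
        self.wait_thread = None        # plays 'wait_thread_'

    def _set_state(self, new_state):
        with self._lock:
            self.state = new_state

    def get_state(self):
        with self._lock:
            return self.state

    def _exec_ddl_impl(self, exec_in_worker_thread):
        # In the worker thread, move PENDING -> RUNNING before the slow work.
        if exec_in_worker_thread:
            self._set_state("RUNNING")
        time.sleep(self.ddl_seconds)  # stands in for the catalog operation

    def exec_ddl(self, async_enabled):
        if async_enabled:
            # Enter PENDING before returning, then hand the work to a worker.
            self._set_state("PENDING")
            self.async_exec_thread = threading.Thread(
                target=self._exec_ddl_impl, args=(True,))
            self.async_exec_thread.start()
        else:
            # Synchronous mode: the caller is blocked for the whole DDL.
            self._exec_ddl_impl(False)
        # The wait thread runs asynchronously in both modes.
        self.wait_thread = threading.Thread(target=self._wait)
        self.wait_thread.start()

    def _wait(self):
        # Wait for the worker (if any), then mark the request finished.
        # Simplified: real state handling (errors, cancellation) is omitted.
        if self.async_exec_thread is not None:
            self.async_exec_thread.join()
        self._set_state("FINISHED")
```

    In async mode exec_ddl() returns almost immediately while the state
    is PENDING or RUNNING. In sync mode it returns only after the
    simulated DDL has completed, which corresponds to the long blocking
    RPC that the NLB timeout cuts off.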
    
    As an optimization, the asynchronous mode is not applied to certain
    DDLs that carry a very low risk of long execution:

      1. Operations that do not access the catalog service;
      2. COMPUTE STATS, as the stats computation queries already run
         asynchronously.
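
    The selection rule above amounts to a small predicate, shown here as a
    Python sketch of ShouldRunExecDdlAsync(). The CatalogOpType enum lists
    only an illustrative subset of the Thrift enum's values. Passing DDL
    types as plain strings is an assumption made for brevity.

```python
from enum import Enum


class CatalogOpType(Enum):
    # Illustrative subset of TCatalogOpType; not the full Thrift enum.
    SHOW_TABLES = "SHOW_TABLES"
    DESCRIBE_TABLE = "DESCRIBE_TABLE"
    DDL = "DDL"
    RESET_METADATA = "RESET_METADATA"


def should_run_exec_ddl_async(catalog_op_type, ddl_type=None):
    """Sketch of ShouldRunExecDdlAsync(): True means the DDL may run in a worker."""
    # Local catalog ops (no catalog service access) always run synchronously.
    if catalog_op_type not in (CatalogOpType.DDL, CatalogOpType.RESET_METADATA):
        return False
    # The exec-DDL part of COMPUTE STATS runs synchronously; its child
    # stats queries already run asynchronously.
    if ddl_type == "COMPUTE_STATS":
        return False
    return True
```

    Because this check runs before the query option is consulted, a
    statement such as SHOW TABLES stays synchronous even when
    asynchronous execution is enabled.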
    
    External behavior change:
      1. A new field, "DDL execution mode", is added to the summary
         section of the runtime profile, next to "DDL Type". Its value is
         either 'asynchronous' or 'synchronous'.
      2. A new query option, 'enable_async_ddl_execution' (default true),
         is added. Setting it to false restores the previous, synchronous
         DDL execution.
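
    The two new profile entries can be sketched as below. The function
    ddl_summary_entries() is hypothetical. 'async_ddl' is assumed to be
    the result of the per-statement check, and 'enable_async_ddl_execution'
    the query option, which a client can change with, for example, "set
    enable_async_ddl_execution=false". The statement runs asynchronously
    only when both are true.

```python
def ddl_summary_entries(catalog_op_type, ddl_type, async_ddl,
                        enable_async_ddl_execution=True):
    """Hypothetical sketch of the summary info strings added by ExecDdlRequest().

    'catalog_op_type' and 'ddl_type' are enum names given as strings.
    """
    # "DDL Type" shows the DDL type for DDLs, else the catalog op type.
    op_type = ddl_type if catalog_op_type == "DDL" else catalog_op_type
    # Asynchronous only if the statement qualifies AND the option is on.
    exec_mode = ("asynchronous" if async_ddl and enable_async_ddl_execution
                 else "synchronous")
    return {"DDL Type": op_type, "DDL execution mode": exec_mode}
```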
    
    Limitations:
      This patch does not handle potential AWS NLB-style timeouts for LOAD
      DATA (IMPALA-10967).
    
    Testing:
      1. Added new async DDL tests for the HS2, HS2-HTTP, Beeswax and JDBC
         clients.
      2. Ran core tests successfully.
    
    Change-Id: Ib57e86926a233ef13d27a9ec8d9c36d33a88a44e
    Reviewed-on: http://gerrit.cloudera.org:8080/17872
    Reviewed-by: Joe McDonnell <jo...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 be/src/service/client-request-state.cc             | 112 ++++++++--
 be/src/service/client-request-state.h              |  28 ++-
 be/src/service/query-options.cc                    |   4 +
 be/src/service/query-options.h                     |   4 +-
 common/thrift/ImpalaService.thrift                 |   3 +
 common/thrift/Query.thrift                         |   3 +
 .../queries/QueryTest/async_ddl.test               |  16 ++
 tests/common/impala_test_suite.py                  |  16 ++
 tests/metadata/test_ddl.py                         | 226 +++++++++++++++++++++
 9 files changed, 386 insertions(+), 26 deletions(-)

diff --git a/be/src/service/client-request-state.cc b/be/src/service/client-request-state.cc
index 656c417..8edf8fa 100644
--- a/be/src/service/client-request-state.cc
+++ b/be/src/service/client-request-state.cc
@@ -239,7 +239,8 @@ Status ClientRequestState::Exec() {
     case TStmtType::QUERY:
     case TStmtType::DML:
       DCHECK(exec_request_->__isset.query_exec_request);
-      RETURN_IF_ERROR(ExecAsyncQueryOrDmlRequest(exec_request_->query_exec_request));
+      RETURN_IF_ERROR(
+          ExecQueryOrDmlRequest(exec_request_->query_exec_request, true /*async*/));
       break;
     case TStmtType::EXPLAIN: {
       request_result_set_.reset(new vector<TResultRow>(
@@ -512,8 +513,8 @@ Status ClientRequestState::ExecLocalCatalogOp(
   }
 }
 
-Status ClientRequestState::ExecAsyncQueryOrDmlRequest(
-    const TQueryExecRequest& query_exec_request) {
+Status ClientRequestState::ExecQueryOrDmlRequest(
+    const TQueryExecRequest& query_exec_request, bool isAsync) {
   // we always need at least one plan fragment
   DCHECK(query_exec_request.plan_exec_info.size() > 0);
 
@@ -574,11 +575,18 @@ Status ClientRequestState::ExecAsyncQueryOrDmlRequest(
     // Don't start executing the query if Cancel() was called concurrently with Exec().
     if (is_cancelled_) return Status::CANCELLED;
   }
-  // Don't transition to PENDING inside the FinishExecQueryOrDmlRequest thread because
-  // the query should be in the PENDING state before the Exec RPC returns.
-  UpdateNonErrorExecState(ExecState::PENDING);
-  RETURN_IF_ERROR(Thread::Create("query-exec-state", "async-exec-thread",
-      &ClientRequestState::FinishExecQueryOrDmlRequest, this, &async_exec_thread_, true));
+  if (isAsync) {
+    // Don't transition to PENDING inside the FinishExecQueryOrDmlRequest thread because
+    // the query should be in the PENDING state before the Exec RPC returns.
+    UpdateNonErrorExecState(ExecState::PENDING);
+    RETURN_IF_ERROR(Thread::Create("query-exec-state", "async-exec-thread",
+        &ClientRequestState::FinishExecQueryOrDmlRequest, this, &async_exec_thread_,
+        true));
+  } else {
+    // Update query_status_ as necessary.
+    FinishExecQueryOrDmlRequest();
+    return query_status_;
+  }
   return Status::OK();
 }
 
@@ -640,10 +648,7 @@ void ClientRequestState::FinishExecQueryOrDmlRequest() {
   UpdateNonErrorExecState(ExecState::RUNNING);
 }
 
-Status ClientRequestState::ExecDdlRequest() {
-  string op_type = catalog_op_type() == TCatalogOpType::DDL ?
-      PrintThriftEnum(ddl_type()) : PrintThriftEnum(catalog_op_type());
-  summary_profile_->AddInfoString("DDL Type", op_type);
+Status ClientRequestState::ExecDdlRequestImplSync() {
 
   if (catalog_op_type() != TCatalogOpType::DDL &&
       catalog_op_type() != TCatalogOpType::RESET_METADATA) {
@@ -683,12 +688,34 @@ Status ClientRequestState::ExecDdlRequest() {
     return Status::OK();
   }
 
+  DCHECK(false) << "Not handled sync exec ddl request.";
+  return Status::OK();
+}
+
+void ClientRequestState::ExecDdlRequestImpl(bool exec_in_worker_thread) {
+  bool is_CTAS = (catalog_op_type() == TCatalogOpType::DDL
+      && ddl_type() == TDdlType::CREATE_TABLE_AS_SELECT);
+
   catalog_op_executor_.reset(
       new CatalogOpExecutor(ExecEnv::GetInstance(), frontend_, server_profile_));
+
+  // Indirectly check if running in thread async_exec_thread_.
+  if (exec_in_worker_thread) {
+    DCHECK(exec_state() == ExecState::PENDING);
+
+    // 1. For any non-CTAS DDLs, transition to RUNNING
+    // 2. For CTAS DDLs, transition to RUNNING during FinishExecQueryOrDmlRequest()
+    //    called by ExecQueryOrDmlRequest().
+    if (!is_CTAS) UpdateNonErrorExecState(ExecState::RUNNING);
+  }
+
+  // Optionally wait with a debug action before Exec() below.
+  DebugActionNoFail(exec_request_->query_options, "CRS_DELAY_BEFORE_CATALOG_OP_EXEC");
+
   Status status = catalog_op_executor_->Exec(exec_request_->catalog_op_request);
   {
     lock_guard<mutex> l(lock_);
-    RETURN_IF_ERROR(UpdateQueryStatus(status));
+    RETURN_VOID_IF_ERROR(UpdateQueryStatus(status));
   }
 
   // If this is a CTAS request, there will usually be more work to do
@@ -700,27 +727,70 @@ Status ClientRequestState::ExecDdlRequest() {
       !catalog_op_executor_->ddl_exec_response()->new_table_created) {
     DCHECK(exec_request_->catalog_op_request.
         ddl_params.create_table_params.if_not_exists);
-    return Status::OK();
+    return;
   }
 
   // Add newly created table to catalog cache.
-  RETURN_IF_ERROR(parent_server_->ProcessCatalogUpdateResult(
+  status = parent_server_->ProcessCatalogUpdateResult(
       *catalog_op_executor_->update_catalog_result(),
-      exec_request_->query_options.sync_ddl));
+      exec_request_->query_options.sync_ddl);
+  {
+    lock_guard<mutex> l(lock_);
+    RETURN_VOID_IF_ERROR(UpdateQueryStatus(status));
+  }
 
-  if (catalog_op_type() == TCatalogOpType::DDL &&
-      ddl_type() == TDdlType::CREATE_TABLE_AS_SELECT) {
+  if (is_CTAS) {
     // At this point, the remainder of the CTAS request executes
     // like a normal DML request. As with other DML requests, it will
     // wait for another catalog update if any partitions were altered as a result
     // of the operation.
     DCHECK(exec_request_->__isset.query_exec_request);
-    RETURN_IF_ERROR(ExecAsyncQueryOrDmlRequest(exec_request_->query_exec_request));
+    RETURN_VOID_IF_ERROR(
+        ExecQueryOrDmlRequest(exec_request_->query_exec_request, !exec_in_worker_thread));
   }
 
   // Set the results to be reported to the client.
   SetResultSet(catalog_op_executor_->ddl_exec_response());
-  return Status::OK();
+}
+
+bool ClientRequestState::ShouldRunExecDdlAsync() {
+  // Local catalog op DDL will run synchronously.
+  if (catalog_op_type() != TCatalogOpType::DDL
+      && catalog_op_type() != TCatalogOpType::RESET_METADATA) {
+    return false;
+  }
+
+  // The exec DDL part of compute stats will run synchronously.
+  if (ddl_type() == TDdlType::COMPUTE_STATS) return false;
+
+  return true;
+}
+
+Status ClientRequestState::ExecDdlRequest() {
+  string op_type = catalog_op_type() == TCatalogOpType::DDL ?
+      PrintThriftEnum(ddl_type()) : PrintThriftEnum(catalog_op_type());
+  bool async_ddl = ShouldRunExecDdlAsync();
+  bool async_ddl_enabled = exec_request_->query_options.enable_async_ddl_execution;
+  string exec_mode = (async_ddl && async_ddl_enabled) ? "asynchronous" : "synchronous";
+
+  summary_profile_->AddInfoString("DDL Type", op_type);
+  summary_profile_->AddInfoString("DDL execution mode", exec_mode);
+  VLOG_QUERY << "DDL exec mode=" << exec_mode;
+
+  if (!async_ddl) return ExecDdlRequestImplSync();
+
+  if (async_ddl_enabled) {
+    // Transition the exec state out of INITIALIZED to PENDING to make available the
+    // runtime profile for the DDL. Later on in ExecDdlRequestImpl(), the state
+    // further transitions to RUNNING.
+    UpdateNonErrorExecState(ExecState::PENDING);
+    return Thread::Create("impala-server", "async_exec_thread_",
+        &ClientRequestState::ExecDdlRequestImpl, this, true /*exec in a worker thread*/,
+        &async_exec_thread_);
+  } else {
+    ExecDdlRequestImpl(false /*exec in the same thread as the caller*/);
+    return query_status_;
+  }
 }
 
 Status ClientRequestState::ExecShutdownRequest() {
@@ -1014,7 +1084,7 @@ Status ClientRequestState::RestartFetch() {
 void ClientRequestState::UpdateNonErrorExecState(ExecState new_state) {
   lock_guard<mutex> l(lock_);
   ExecState old_state = exec_state();
-  static string error_msg = "Illegal state transition: $0 -> $1, query_id=$3";
+  static string error_msg = "Illegal state transition: $0 -> $1, query_id=$2";
   switch (new_state) {
     case ExecState::PENDING:
       DCHECK(old_state == ExecState::INITIALIZED)
diff --git a/be/src/service/client-request-state.h b/be/src/service/client-request-state.h
index d7361d5..da6f52a 100644
--- a/be/src/service/client-request-state.h
+++ b/be/src/service/client-request-state.h
@@ -494,6 +494,9 @@ class ClientRequestState {
   /// execution in the following cases:
   /// 1. exec_request().stmt_type == QUERY or DML.
   /// 2. CTAS query for creating a table that does not exist.
+  /// 3. DDLs that are not the following:
+  ///    a. Local catalog operations requiring no catalog service access;
+  ///    b. Compute Stats.
   std::unique_ptr<Thread> async_exec_thread_;
 
   /// Set by the async-exec-thread after successful admission. Accessed through
@@ -672,11 +675,13 @@ class ClientRequestState {
   /// actively processed. Takes expiration_data_lock_.
   void MarkActive();
 
-  /// Sets up profile and pre-execution counters, creates the query schedule, and spawns
-  /// a thread that calls FinishExecQueryOrDmlRequest() which contains the core logic of
-  /// executing a QUERY or DML execution request.
+  /// Sets up profile and pre-execution counters, creates the query schedule, and calls
+  /// FinishExecQueryOrDmlRequest() which contains the core logic of executing a QUERY or
+  /// DML execution request. When 'async' is true, spawn a thread to run
+  /// FinishExecQueryOrDmlRequest(). Otherwise, the method runs in the same thread as the
+  /// caller.
   /// Non-blocking.
-  Status ExecAsyncQueryOrDmlRequest(const TQueryExecRequest& query_exec_request)
+  Status ExecQueryOrDmlRequest(const TQueryExecRequest& query_exec_request, bool async)
       WARN_UNUSED_RESULT;
 
   /// Submits the exec request to the admission controller and on successful admission,
@@ -689,6 +694,21 @@ class ClientRequestState {
   /// queries (e.g., compute stats) or dml (e.g., create table as select)
   Status ExecDdlRequest() WARN_UNUSED_RESULT;
 
+  /// A helper function to execute certain ddl requests synchronously.
+  Status ExecDdlRequestImplSync() WARN_UNUSED_RESULT;
+
+  /// A helper function to execute certain ddl requests optionally
+  /// asynchronously. 'exec_in_worker_thread' indicates whether the execution of
+  /// DDL is done in a worker thread or not. This flag is set to true when the
+  /// execution of the DDL request runs in 'async_exec_thread_', and set to false
+  /// otherwise.
+  void ExecDdlRequestImpl(bool exec_in_worker_thread);
+
+  /// Decide whether to call ExecDdlRequestImpl() or
+  /// ExecDdlRequestImplSync() for ExecDdlRequest(). Return true to call
+  /// ExecDdlRequestImpl() and false to call ExecDdlRequestImplSync().
+  bool ShouldRunExecDdlAsync();
+
   /// Executes a shut down request.
   Status ExecShutdownRequest() WARN_UNUSED_RESULT;
 
diff --git a/be/src/service/query-options.cc b/be/src/service/query-options.cc
index db34c66..3f3b07c 100644
--- a/be/src/service/query-options.cc
+++ b/be/src/service/query-options.cc
@@ -1121,6 +1121,10 @@ Status impala::SetQueryOption(const string& key, const string& value,
       case TImpalaQueryOptions::MINMAX_FILTER_PARTITION_COLUMNS:
         query_options->__set_minmax_filter_partition_columns(IsTrue(value));
         break;
+      case TImpalaQueryOptions::ENABLE_ASYNC_DDL_EXECUTION: {
+        query_options->__set_enable_async_ddl_execution(IsTrue(value));
+        break;
+      }
       default:
         if (IsRemovedQueryOption(key)) {
           LOG(WARNING) << "Ignoring attempt to set removed query option '" << key << "'";
diff --git a/be/src/service/query-options.h b/be/src/service/query-options.h
index fb1b670..715b421 100644
--- a/be/src/service/query-options.h
+++ b/be/src/service/query-options.h
@@ -47,7 +47,7 @@ typedef std::unordered_map<string, beeswax::TQueryOptionLevel::type>
 // time we add or remove a query option to/from the enum TImpalaQueryOptions.
 #define QUERY_OPTS_TABLE\
   DCHECK_EQ(_TImpalaQueryOptions_VALUES_TO_NAMES.size(),\
-      TImpalaQueryOptions::ORC_READ_STATISTICS+ 1);\
+      TImpalaQueryOptions::ENABLE_ASYNC_DDL_EXECUTION+ 1);\
   REMOVED_QUERY_OPT_FN(abort_on_default_limit_exceeded, ABORT_ON_DEFAULT_LIMIT_EXCEEDED)\
   QUERY_OPT_FN(abort_on_error, ABORT_ON_ERROR, TQueryOptionLevel::REGULAR)\
   REMOVED_QUERY_OPT_FN(allow_unsupported_formats, ALLOW_UNSUPPORTED_FORMATS)\
@@ -260,6 +260,8 @@ typedef std::unordered_map<string, beeswax::TQueryOptionLevel::type>
   QUERY_OPT_FN(parquet_bloom_filter_write, PARQUET_BLOOM_FILTER_WRITE,\
       TQueryOptionLevel::ADVANCED)\
   QUERY_OPT_FN(orc_read_statistics, ORC_READ_STATISTICS,\
+      TQueryOptionLevel::ADVANCED)\
+  QUERY_OPT_FN(enable_async_ddl_execution, ENABLE_ASYNC_DDL_EXECUTION,\
       TQueryOptionLevel::ADVANCED)
   ;
 
diff --git a/common/thrift/ImpalaService.thrift b/common/thrift/ImpalaService.thrift
index 4d560d3..2d079dd 100644
--- a/common/thrift/ImpalaService.thrift
+++ b/common/thrift/ImpalaService.thrift
@@ -696,6 +696,9 @@ enum TImpalaQueryOptions {
 
   // Indicates whether to use ORC's search argument to push down predicates.
   ORC_READ_STATISTICS = 135
+
+  // Indicates whether to run most of ddl requests in async mode.
+  ENABLE_ASYNC_DDL_EXECUTION = 136
 }
 
 // The summary of a DML statement.
diff --git a/common/thrift/Query.thrift b/common/thrift/Query.thrift
index 849944d..8f824c2 100644
--- a/common/thrift/Query.thrift
+++ b/common/thrift/Query.thrift
@@ -549,6 +549,9 @@ struct TQueryOptions {
 
   // Indicates whether to use ORC's search argument to push down predicates.
   136: optional bool orc_read_statistics = true;
+
+  // Allow ddl exec request to run in a separate thread
+  137: optional bool enable_async_ddl_execution = true;
 }
 
 // Impala currently has three types of sessions: Beeswax, HiveServer2 and external
diff --git a/testdata/workloads/functional-query/queries/QueryTest/async_ddl.test b/testdata/workloads/functional-query/queries/QueryTest/async_ddl.test
new file mode 100644
index 0000000..f0f7678
--- /dev/null
+++ b/testdata/workloads/functional-query/queries/QueryTest/async_ddl.test
@@ -0,0 +1,16 @@
+====
+---- QUERY
+# Clone a table and then recover partitions for it with a delay of 15s
+# for the asynchronous execution of the DDL. Expect the client to issue
+# multiple GetOperationStatus() calls during the delay and to receive the
+# successful completion status at the end.
+drop table if exists alltypes_clone;
+create external table alltypes_clone like functional_parquet.alltypes
+location '$FILESYSTEM_PREFIX/test-warehouse/alltypes_parquet';
+set debug_action="CRS_DELAY_BEFORE_CATALOG_OP_EXEC:SLEEP@15000";
+alter table alltypes_clone recover partitions;
+---- RESULTS
+'Partitions have been recovered.'
+---- TYPES
+STRING
+====
diff --git a/tests/common/impala_test_suite.py b/tests/common/impala_test_suite.py
index d6b9070..df8eadc 100644
--- a/tests/common/impala_test_suite.py
+++ b/tests/common/impala_test_suite.py
@@ -921,6 +921,22 @@ class ImpalaTestSuite(BaseTestSuite):
     assert (impala_results is not None) and (hive_results is not None)
     assert compare(impala_results, hive_results)
 
+  def exec_with_jdbc(self, stmt):
+    """Pass 'stmt' to IMPALA via Impala JDBC client and execute it"""
+    # execute_using_jdbc expects a Query object. Convert the query string into a Query
+    # object
+    query = Query()
+    query.query_str = stmt
+    # Run the statement targeting Impala
+    exec_opts = JdbcQueryExecConfig(impalad=IMPALAD_HS2_HOST_PORT, transport='NOSASL')
+    return execute_using_jdbc(query, exec_opts).data
+
+  def exec_with_jdbc_and_compare_result(self, stmt, expected):
+    """Execute 'stmt' via Impala JDBC client and compare the result with 'expected'"""
+    result = self.exec_with_jdbc(stmt)
+    # Check the results
+    assert (result is not None) and (result == expected)
+
   def load_query_test_file(self, workload, file_name, valid_section_names=None,
       encoding=None):
     """
diff --git a/tests/metadata/test_ddl.py b/tests/metadata/test_ddl.py
index 6518388..739ba4f 100644
--- a/tests/metadata/test_ddl.py
+++ b/tests/metadata/test_ddl.py
@@ -21,6 +21,8 @@ import pytest
 import re
 import time
 
+from beeswaxd.BeeswaxService import QueryState
+from copy import deepcopy
 from test_ddl_base import TestDdlBase
 from tests.beeswax.impala_beeswax import ImpalaBeeswaxException
 from tests.common.environ import (HIVE_MAJOR_VERSION)
@@ -30,7 +32,11 @@ from tests.common.parametrize import UniqueDatabase
 from tests.common.skip import (SkipIf, SkipIfABFS, SkipIfADLS, SkipIfKudu, SkipIfLocal,
                                SkipIfCatalogV2, SkipIfHive2, SkipIfS3, SkipIfGCS)
 from tests.common.test_dimensions import create_single_exec_option_dimension
+from tests.common.test_dimensions import (create_exec_option_dimension,
+    create_client_protocol_dimension)
+from tests.common.test_vector import ImpalaTestDimension
 from tests.util.filesystem_utils import (
+    get_fs_path,
     WAREHOUSE,
     IS_HDFS,
     IS_S3,
@@ -39,6 +45,7 @@ from tests.util.filesystem_utils import (
 from tests.common.impala_cluster import ImpalaCluster
 from tests.util.filesystem_utils import FILESYSTEM_PREFIX
 
+
 # Validates DDL statements (create, drop)
 class TestDdlStatements(TestDdlBase):
   @SkipIfLocal.hdfs_client
@@ -897,6 +904,225 @@ class TestDdlStatements(TestDdlBase):
     comment = self._get_column_comment(table, 'j')
     assert "comment4" == comment
 
+
+# IMPALA-10811: RPC to submit query getting stuck for AWS NLB forever
+# Test HS2, Beeswax and HS2-HTTP three clients.
+class TestAsyncDDL(TestDdlBase):
+  @classmethod
+  def get_workload(self):
+    return 'functional-query'
+
+  @classmethod
+  def add_test_dimensions(cls):
+    super(TestAsyncDDL, cls).add_test_dimensions()
+    cls.ImpalaTestMatrix.add_dimension(create_client_protocol_dimension())
+    cls.ImpalaTestMatrix.add_dimension(create_exec_option_dimension(
+        sync_ddl=[0], disable_codegen_options=[False]))
+
+  def test_async_ddl(self, vector, unique_database):
+    self.run_test_case('QueryTest/async_ddl', vector, use_db=unique_database)
+
+  def test_async_ddl_with_JDBC(self, vector, unique_database):
+    self.exec_with_jdbc("drop table if exists {0}.test_table".format(unique_database))
+    self.exec_with_jdbc_and_compare_result(
+        "create table {0}.test_table(a int)".format(unique_database),
+        "'Table has been created.'")
+
+    self.exec_with_jdbc("drop table if exists {0}.alltypes_clone".format(unique_database))
+    self.exec_with_jdbc_and_compare_result(
+        "create table {0}.alltypes_clone as select * from\
+        functional_parquet.alltypes".format(unique_database),
+        "'Inserted 7300 row(s)'")
+
+  @classmethod
+  def test_get_operation_status_for_client(self, client, unique_database,
+          init_state, pending_state, running_state):
+    # Setup
+    client.execute("drop table if exists {0}.alltypes_clone".format(unique_database))
+    client.execute("select count(*) from functional_parquet.alltypes")
+    client.execute("set enable_async_ddl_execution=true")
+    client.execute("set debug_action=\"CRS_DELAY_BEFORE_CATALOG_OP_EXEC:SLEEP@10000\"")
+
+    # Run the test query which will only compile the DDL in execute_statement()
+    # and measure the time spent. Should be less than 3s.
+    start = time.time()
+    handle = client.execute_async(
+        "create table {0}.alltypes_clone as select * from \
+        functional_parquet.alltypes".format(unique_database))
+    end = time.time()
+    assert (end - start <= 3)
+
+    # The table creation and population part will be done in a separate thread.
+    # The repeated call below to get_operation_status() finds out the number of
+    # times that each state is reached in BE for that part of the work.
+    num_times_in_initialized_state = 0
+    num_times_in_pending_state = 0
+    num_times_in_running_state = 0
+    while not client.state_is_finished(handle):
+
+      state = client.get_state(handle)
+
+      if (state == init_state):
+        num_times_in_initialized_state += 1
+
+      if (state == pending_state):
+        num_times_in_pending_state += 1
+
+      if (state == running_state):
+        num_times_in_running_state += 1
+
+    # The query must reach INITIALIZED_STATE 0 time and PENDING_STATE at least
+    # once. The number of times in PENDING_STATE is a function of the length of
+    # the delay. The query reaches RUNNING_STATE when it populates the new table.
+    assert num_times_in_initialized_state == 0
+    assert num_times_in_pending_state > 1
+    assert num_times_in_running_state > 0
+
+  def test_get_operation_status_for_async_ddl(self, vector, unique_database):
+    """Tests that for an asynchronously executed DDL with delay, GetOperationStatus
+    must be issued repeatedly. Test client hs2-http, hs2 and beeswax"""
+
+    if vector.get_value('protocol') == 'hs2-http':
+      self.test_get_operation_status_for_client(self.hs2_http_client, unique_database,
+      "INITIALIZED_STATE", "PENDING_STATE", "RUNNING_STATE")
+
+    if vector.get_value('protocol') == 'hs2':
+      self.test_get_operation_status_for_client(self.hs2_client, unique_database,
+      "INITIALIZED_STATE", "PENDING_STATE", "RUNNING_STATE")
+
+    if vector.get_value('protocol') == 'beeswax':
+      self.test_get_operation_status_for_client(self.client, unique_database,
+      QueryState.INITIALIZED, QueryState.COMPILED, QueryState.RUNNING)
+
+
+class TestAsyncDDLTiming(TestDdlBase):
+  @classmethod
+  def get_workload(self):
+    return 'functional-query'
+
+  @classmethod
+  def add_test_dimensions(cls):
+    super(TestAsyncDDLTiming, cls).add_test_dimensions()
+    cls.ImpalaTestMatrix.add_dimension(create_client_protocol_dimension())
+    cls.ImpalaTestMatrix.add_dimension(create_exec_option_dimension(
+        sync_ddl=[0], disable_codegen_options=[False]))
+    cls.ImpalaTestMatrix.add_dimension(
+        ImpalaTestDimension('enable_async_ddl_execution', True, False))
+
+  def test_alter_table_recover(self, vector, unique_database):
+    enable_async_ddl = vector.get_value('enable_async_ddl_execution')
+    client = self.create_impala_client(protocol=vector.get_value('protocol'))
+    is_hs2 = vector.get_value('protocol') in ['hs2', 'hs2-http']
+    pending_state = "PENDING_STATE" if is_hs2 else QueryState.COMPILED
+    running_state = "RUNNING_STATE" if is_hs2 else QueryState.RUNNING
+    finished_state = "FINISHED_STATE" if is_hs2 else QueryState.FINISHED
+
+    try:
+      # Setup for the alter table case (create table that points to an existing
+      # location)
+      alltypes_location = get_fs_path("/test-warehouse/alltypes_parquet")
+      source_tbl = "functional_parquet.alltypes"
+      dest_tbl = "{0}.alltypes_clone".format(unique_database)
+      create_table_stmt = 'create external table {0} like {1} location "{2}"'.format(
+          dest_tbl, source_tbl, alltypes_location)
+      self.execute_query_expect_success(client, create_table_stmt)
+
+      # Describe the table to fetch its metadata
+      self.execute_query_expect_success(client, "describe {0}".format(dest_tbl))
+
+      # Configure whether to use async DDL and add appropriate delays
+      new_vector = deepcopy(vector)
+      new_vector.get_value('exec_option')['enable_async_ddl_execution'] = enable_async_ddl
+      new_vector.get_value('exec_option')['debug_action'] = \
+          "CRS_DELAY_BEFORE_CATALOG_OP_EXEC:SLEEP@10000"
+      exec_start = time.time()
+      alter_stmt = "alter table {0} recover partitions".format(dest_tbl)
+      handle = self.execute_query_async_using_client(client, alter_stmt, new_vector)
+      exec_end = time.time()
+      exec_time = exec_end - exec_start
+      state = client.get_state(handle)
+      if enable_async_ddl:
+        assert state == pending_state or state == running_state
+      else:
+        assert state == running_state or state == finished_state
+
+      # Wait for the statement to finish with a timeout of 20 seconds
+      wait_start = time.time()
+      self.wait_for_state(handle, finished_state, 20, client=client)
+      wait_end = time.time()
+      wait_time = wait_end - wait_start
+      self.close_query_using_client(client, handle)
+      # In sync mode:
+      #  The entire DDL is processed in the exec step with delay. exec_time should be
+      #  more than 10 seconds.
+      #
+      # In async mode:
+      #  The compilation of DDL is processed in the exec step without delay. And the
+      #  processing of the DDL plan is in wait step with delay. The wait time should
+      #  definitely take more time than 10 seconds.
+      if enable_async_ddl:
+        assert(wait_time >= 10)
+      else:
+        assert(exec_time >= 10)
+    finally:
+      client.close()
+
+  def test_ctas(self, vector, unique_database):
+    enable_async_ddl = vector.get_value('enable_async_ddl_execution')
+    client = self.create_impala_client(protocol=vector.get_value('protocol'))
+    is_hs2 = vector.get_value('protocol') in ['hs2', 'hs2-http']
+    pending_state = "PENDING_STATE" if is_hs2 else QueryState.COMPILED
+    finished_state = "FINISHED_STATE" if is_hs2 else QueryState.FINISHED
+
+    try:
+      # The CTAS is going to need the metadata of the source table in the
+      # select. To avoid flakiness about metadata loading, this selects from
+      # that source table first to get the metadata loaded.
+      self.execute_query_expect_success(client,
+          "select count(*) from functional_parquet.alltypes")
+
+      # Configure whether to use async DDL and add appropriate delays
+      new_vector = deepcopy(vector)
+      new_vector.get_value('exec_option')['enable_async_ddl_execution'] = enable_async_ddl
+      create_delay = "CRS_DELAY_BEFORE_CATALOG_OP_EXEC:SLEEP@10000"
+      insert_delay = "CRS_BEFORE_COORD_STARTS:SLEEP@2000"
+      new_vector.get_value('exec_option')['debug_action'] = \
+          "{0}|{1}".format(create_delay, insert_delay)
+      dest_tbl = "{0}.ctas_test".format(unique_database)
+      source_tbl = "functional_parquet.alltypes"
+      ctas_stmt = 'create external table {0} as select * from {1}'.format(
+          dest_tbl, source_tbl)
+      exec_start = time.time()
+      handle = self.execute_query_async_using_client(client, ctas_stmt, new_vector)
+      exec_end = time.time()
+      exec_time = exec_end - exec_start
+      # The CRS_BEFORE_COORD_STARTS delay postpones the transition from PENDING
+      # to RUNNING, so the sync case should be in PENDING state at the end of
+      # the execute call. This means that the sync and async cases are the same.
+      assert client.get_state(handle) == pending_state
+
+      # Wait for the statement to finish with a timeout of 20 seconds
+      wait_start = time.time()
+      self.wait_for_state(handle, finished_state, 20, client=client)
+      wait_end = time.time()
+      wait_time = wait_end - wait_start
+      self.close_query_using_client(client, handle)
+      # In sync mode:
+      #  The entire CTAS is processed in the exec step with delay. exec_time should be
+      #  more than 10 seconds.
+      #
+      # In async mode:
+      #  The compilation of CTAS is processed in the exec step without delay. And the
+      #  processing of the CTAS plan is in wait step with delay. The wait time should
+      #  definitely take more time than 10 seconds.
+      if enable_async_ddl:
+        assert(wait_time >= 10)
+      else:
+        assert(exec_time >= 10)
+    finally:
+      client.close()
+
+
 # IMPALA-2002: Tests repeated adding/dropping of .jar and .so in the lib cache.
 class TestLibCache(TestDdlBase):
   @classmethod