You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by cs...@apache.org on 2021/10/28 12:59:01 UTC

[impala] branch master updated (975883c -> 9d2ef85)

This is an automated email from the ASF dual-hosted git repository.

csringhofer pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git.


    from 975883c  IMPALA-10811 RPC to submit query getting stuck for AWS NLB forever
     new 78ce235  IMPALA-10967 Load data should handle AWS NLB-type timeout
     new 9d2ef85  IMPALA-10212. Support ofs scheme.

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 be/src/service/client-request-state.cc             |  98 +++++++++++++------
 be/src/service/client-request-state.h              |   7 ++
 be/src/service/query-options.cc                    |   4 +
 be/src/service/query-options.h                     |   4 +-
 be/src/util/hdfs-util.cc                           |   6 +-
 common/thrift/ImpalaService.thrift                 |   3 +
 common/thrift/Query.thrift                         |   3 +
 .../org/apache/impala/common/FileSystemUtil.java   |   9 +-
 .../apache/impala/common/FileSystemUtilTest.java   |   7 ++
 tests/metadata/test_load.py                        | 104 ++++++++++++++++++++-
 10 files changed, 211 insertions(+), 34 deletions(-)

[impala] 02/02: IMPALA-10212. Support ofs scheme.

Posted by cs...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

csringhofer pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 9d2ef8564786d858db7786ad338b7daa5386eb20
Author: Wei-Chiu Chuang <we...@apache.org>
AuthorDate: Fri Oct 22 15:49:55 2021 +0800

    IMPALA-10212. Support ofs scheme.
    
    OFS is the new file system implementation for Ozone.
    The biggest difference compared to o3fs is that ofs supports operations across all
    volumes and buckets and provides a full view of all volumes and buckets.
    
    It uses the same transport as o3fs and therefore shares the o3fs thread pool.
    
    How it was tested:
    The patch was tested manually on a CDPD cluster: loaded TPC-DS data, ran the TPC-DS queries, and ran a 'load data inpath' command.
    
    Change-Id: I69908f65c97f40ff01b25d6d6db53c37a9e978ba
    Reviewed-on: http://gerrit.cloudera.org:8080/17963
    Reviewed-by: Joe McDonnell <jo...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 be/src/util/hdfs-util.cc                                         | 6 +++++-
 fe/src/main/java/org/apache/impala/common/FileSystemUtil.java    | 9 +++++++--
 .../test/java/org/apache/impala/common/FileSystemUtilTest.java   | 7 +++++++
 3 files changed, 19 insertions(+), 3 deletions(-)

diff --git a/be/src/util/hdfs-util.cc b/be/src/util/hdfs-util.cc
index 9b560fe..b2d9193 100644
--- a/be/src/util/hdfs-util.cc
+++ b/be/src/util/hdfs-util.cc
@@ -36,6 +36,7 @@ const char* FILESYS_PREFIX_ABFS_SEC = "abfss://";
 const char* FILESYS_PREFIX_ADL = "adl://";
 const char* FILESYS_PREFIX_GCS = "gs://";
 const char* FILESYS_PREFIX_OZONE = "o3fs://";
+const char* FILESYS_PREFIX_OFS = "ofs://";
 const char* FILESYS_PREFIX_OSS = "oss://";
 const char* FILESYS_PREFIX_JINDOFS = "jfs://";
 
@@ -120,8 +121,11 @@ bool IsGcsPath(const char* path, bool check_default_fs) {
   return IsSpecificPath(path, FILESYS_PREFIX_GCS, check_default_fs);
 }
 
+// o3fs and ofs uses the same transport implementation, so they should share
+// the same thread pool.
 bool IsOzonePath(const char* path, bool check_default_fs) {
-  return IsSpecificPath(path, FILESYS_PREFIX_OZONE, check_default_fs);
+  return IsSpecificPath(path, FILESYS_PREFIX_OZONE, check_default_fs)
+      || IsSpecificPath(path, FILESYS_PREFIX_OFS, check_default_fs);
 }
 
 // Returns the length of the filesystem name in 'path' which is the length of the
diff --git a/fe/src/main/java/org/apache/impala/common/FileSystemUtil.java b/fe/src/main/java/org/apache/impala/common/FileSystemUtil.java
index 9393009..7efc124 100644
--- a/fe/src/main/java/org/apache/impala/common/FileSystemUtil.java
+++ b/fe/src/main/java/org/apache/impala/common/FileSystemUtil.java
@@ -65,6 +65,7 @@ public class FileSystemUtil {
   public static final String SCHEME_HDFS = "hdfs";
   public static final String SCHEME_S3A = "s3a";
   public static final String SCHEME_O3FS = "o3fs";
+  public static final String SCHEME_OFS = "ofs";
   public static final String SCHEME_ALLUXIO = "alluxio";
   public static final String SCHEME_GCS = "gs";
 
@@ -76,6 +77,7 @@ public class FileSystemUtil {
       ImmutableSet.<String>builder()
           .add(SCHEME_HDFS)
           .add(SCHEME_O3FS)
+          .add(SCHEME_OFS)
           .add(SCHEME_ALLUXIO)
           .build();
 
@@ -91,6 +93,7 @@ public class FileSystemUtil {
           .add(SCHEME_HDFS)
           .add(SCHEME_S3A)
           .add(SCHEME_O3FS)
+          .add(SCHEME_OFS)
           .add(SCHEME_GCS)
           .build();
 
@@ -118,6 +121,7 @@ public class FileSystemUtil {
           .add(SCHEME_HDFS)
           .add(SCHEME_S3A)
           .add(SCHEME_O3FS)
+          .add(SCHEME_OFS)
           .add(SCHEME_GCS)
           .build();
 
@@ -472,14 +476,14 @@ public class FileSystemUtil {
    * Returns true iff the filesystem is a OzoneFileSystem.
    */
   public static boolean isOzoneFileSystem(FileSystem fs) {
-    return hasScheme(fs, SCHEME_O3FS);
+    return hasScheme(fs, SCHEME_O3FS) || hasScheme(fs, SCHEME_OFS);
   }
 
   /**
    * Returns true iff the path is on OzoneFileSystem.
    */
   public static boolean isOzoneFileSystem(Path path) throws IOException {
-    return hasScheme(path, SCHEME_O3FS);
+    return hasScheme(path, SCHEME_O3FS) || hasScheme(path, SCHEME_OFS);
   }
 
   /**
@@ -526,6 +530,7 @@ public class FileSystemUtil {
             .put(SCHEME_HDFS, HDFS)
             .put(SCHEME_S3A, S3)
             .put(SCHEME_O3FS, OZONE)
+            .put(SCHEME_OFS, OZONE)
             .put(SCHEME_ALLUXIO, ALLUXIO)
             .put(SCHEME_GCS, GCS)
             .build();
diff --git a/fe/src/test/java/org/apache/impala/common/FileSystemUtilTest.java b/fe/src/test/java/org/apache/impala/common/FileSystemUtilTest.java
index 3105690..adbefe9 100644
--- a/fe/src/test/java/org/apache/impala/common/FileSystemUtilTest.java
+++ b/fe/src/test/java/org/apache/impala/common/FileSystemUtilTest.java
@@ -100,6 +100,7 @@ public class FileSystemUtilTest {
     testFsType(mockLocation(FileSystemUtil.SCHEME_HDFS), FileSystemUtil.FsType.HDFS);
     testFsType(mockLocation(FileSystemUtil.SCHEME_S3A), FileSystemUtil.FsType.S3);
     testFsType(mockLocation(FileSystemUtil.SCHEME_O3FS), FileSystemUtil.FsType.OZONE);
+    testFsType(mockLocation(FileSystemUtil.SCHEME_OFS), FileSystemUtil.FsType.OZONE);
     testFsType(
         mockLocation(FileSystemUtil.SCHEME_ALLUXIO), FileSystemUtil.FsType.ALLUXIO);
   }
@@ -118,6 +119,7 @@ public class FileSystemUtilTest {
     // in impala mini cluster.
     // TODO: enable following tests if we add them into impala mini cluster.
     // testIsSupportStorageIds(mockLocation(FileSystemUtil.SCHEME_O3FS), true);
+    // testIsSupportStorageIds(mockLocation(FileSystemUtil.SCHEME_OFS), true);
     // testIsSupportStorageIds(mockLocation(FileSystemUtil.SCHEME_ALLUXIO), true);
   }
 
@@ -132,6 +134,7 @@ public class FileSystemUtilTest {
     testIsWritableByImpala(mockLocation(FileSystemUtil.SCHEME_HDFS), true);
     testIsWritableByImpala(mockLocation(FileSystemUtil.SCHEME_S3A), true);
     testIsWritableByImpala(mockLocation(FileSystemUtil.SCHEME_O3FS), true);
+    testIsWritableByImpala(mockLocation(FileSystemUtil.SCHEME_OFS), true);
   }
 
   @Test
@@ -148,6 +151,7 @@ public class FileSystemUtilTest {
     // in impala mini cluster.
     // TODO: enable following tests if we add them into impala mini cluster.
     // testIsSupportedDefaultFs(mockLocation(FileSystemUtil.SCHEME_O3FS), false);
+    // testIsSupportedDefaultFs(mockLocation(FileSystemUtil.SCHEME_OFS), false);
     // testIsSupportedDefaultFs(mockLocation(FileSystemUtil.SCHEME_ALLUXIO), false);
   }
 
@@ -165,6 +169,7 @@ public class FileSystemUtilTest {
     // in impala mini cluster.
     // TODO: enable following tests if we add them into impala mini cluster.
     // testValidLoadDataInpath(mockLocation(FileSystemUtil.SCHEME_O3FS), true);
+    // testValidLoadDataInpath(mockLocation(FileSystemUtil.SCHEME_OFS), true);
     // testValidLoadDataInpath(mockLocation(FileSystemUtil.SCHEME_ALLUXIO), false);
     // Also extend testIsPathOnFileSystem().
   }
@@ -230,6 +235,8 @@ public class FileSystemUtilTest {
         return "s3a://dummy-bucket/dummy-part-6";
       case FileSystemUtil.SCHEME_O3FS:
         return "o3fs://bucket.volume/key";
+      case FileSystemUtil.SCHEME_OFS:
+        return "ofs://svc1:9876/volume1/bucket2/dir3/";
       case FileSystemUtil.SCHEME_ALLUXIO:
         return "alluxio://zk@zk-1:2181,zk-2:2181,zk-3:2181/path/";
       default:

[impala] 01/02: IMPALA-10967 Load data should handle AWS NLB-type timeout

Posted by cs...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

csringhofer pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 78ce235db6d5b720f3e3319ff571a2da054a2602
Author: Qifan Chen <qc...@cloudera.com>
AuthorDate: Tue Oct 19 13:25:06 2021 -0400

    IMPALA-10967 Load data should handle AWS NLB-type timeout
    
    This patch addresses an Impala client hang caused by the AWS Network Load
    Balancer (NLB) idle timeout, which is fixed at 350s. When a long-running
    data loading operation is executing and the timeout fires, AWS silently
    drops the connection and the Impala client hangs.
    
    The fix keeps the current TCLIService protocol between the client and
    the Impala server and uses a separate thread to run the data loading
    and metadata refresh operations. Since this thread is waited on by a
    wait thread that runs asynchronously, the client is not blocked while
    the entire operation executes. Instead, the Impala client can check the
    status of the operation via repeated GetOperationStatus() calls.
    
    External behavior change:
      1. A new query option 'enable_async_load_data_execution', defaulting
         to true, is added. It can be set to false to disable the
         asynchronous execution.
    
    Testing:
      1. Added a new test in test_load.py to verify that the asynchronous
         execution in the BE keeps the session alive for all three
         clients: hs2, hs2-http and beeswax;
      2. Ran core tests successfully.
    
    Change-Id: I8c2437e9894510204303ec07710cad60102c8821
    Reviewed-on: http://gerrit.cloudera.org:8080/17955
    Reviewed-by: Joe McDonnell <jo...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 be/src/service/client-request-state.cc |  98 ++++++++++++++++++++++---------
 be/src/service/client-request-state.h  |   7 +++
 be/src/service/query-options.cc        |   4 ++
 be/src/service/query-options.h         |   4 +-
 common/thrift/ImpalaService.thrift     |   3 +
 common/thrift/Query.thrift             |   3 +
 tests/metadata/test_load.py            | 104 ++++++++++++++++++++++++++++++++-
 7 files changed, 192 insertions(+), 31 deletions(-)

diff --git a/be/src/service/client-request-state.cc b/be/src/service/client-request-state.cc
index 8edf8fa..7939c5f 100644
--- a/be/src/service/client-request-state.cc
+++ b/be/src/service/client-request-state.cc
@@ -259,35 +259,7 @@ Status ClientRequestState::Exec() {
     }
     case TStmtType::LOAD: {
       DCHECK(exec_request_->__isset.load_data_request);
-      TLoadDataResp response;
-      RETURN_IF_ERROR(
-          frontend_->LoadData(exec_request_->load_data_request, &response));
-      request_result_set_.reset(new vector<TResultRow>);
-      request_result_set_->push_back(response.load_summary);
-
-      // Now refresh the table metadata.
-      TCatalogOpRequest reset_req;
-      reset_req.__set_sync_ddl(exec_request_->query_options.sync_ddl);
-      reset_req.__set_op_type(TCatalogOpType::RESET_METADATA);
-      reset_req.__set_reset_metadata_params(TResetMetadataRequest());
-      reset_req.reset_metadata_params.__set_header(TCatalogServiceRequestHeader());
-      reset_req.reset_metadata_params.header.__set_want_minimal_response(
-          FLAGS_use_local_catalog);
-      reset_req.reset_metadata_params.__set_is_refresh(true);
-      reset_req.reset_metadata_params.__set_table_name(
-          exec_request_->load_data_request.table_name);
-      if (exec_request_->load_data_request.__isset.partition_spec) {
-        reset_req.reset_metadata_params.__set_partition_spec(
-            exec_request_->load_data_request.partition_spec);
-      }
-      reset_req.reset_metadata_params.__set_sync_ddl(
-          exec_request_->query_options.sync_ddl);
-      catalog_op_executor_.reset(
-          new CatalogOpExecutor(ExecEnv::GetInstance(), frontend_, server_profile_));
-      RETURN_IF_ERROR(catalog_op_executor_->Exec(reset_req));
-      RETURN_IF_ERROR(parent_server_->ProcessCatalogUpdateResult(
-          *catalog_op_executor_->update_catalog_result(),
-          exec_request_->query_options.sync_ddl));
+      LOG_AND_RETURN_IF_ERROR(ExecLoadDataRequest());
       break;
     }
     case TStmtType::SET: {
@@ -793,6 +765,74 @@ Status ClientRequestState::ExecDdlRequest() {
   }
 }
 
+void ClientRequestState::ExecLoadDataRequestImpl(bool exec_in_worker_thread) {
+  if (exec_in_worker_thread) {
+    DCHECK(exec_state() == ExecState::PENDING);
+    UpdateNonErrorExecState(ExecState::RUNNING);
+  }
+  DebugActionNoFail(
+      exec_request_->query_options, "CRS_DELAY_BEFORE_LOAD_DATA");
+
+  TLoadDataResp response;
+  Status status = frontend_->LoadData(exec_request_->load_data_request, &response);
+  {
+    lock_guard<mutex> l(lock_);
+    RETURN_VOID_IF_ERROR(UpdateQueryStatus(status));
+  }
+
+  request_result_set_.reset(new vector<TResultRow>);
+  request_result_set_->push_back(response.load_summary);
+
+  // Now refresh the table metadata.
+  TCatalogOpRequest reset_req;
+  reset_req.__set_sync_ddl(exec_request_->query_options.sync_ddl);
+  reset_req.__set_op_type(TCatalogOpType::RESET_METADATA);
+  reset_req.__set_reset_metadata_params(TResetMetadataRequest());
+  reset_req.reset_metadata_params.__set_header(TCatalogServiceRequestHeader());
+  reset_req.reset_metadata_params.header.__set_want_minimal_response(
+      FLAGS_use_local_catalog);
+  reset_req.reset_metadata_params.__set_is_refresh(true);
+  reset_req.reset_metadata_params.__set_table_name(
+      exec_request_->load_data_request.table_name);
+  if (exec_request_->load_data_request.__isset.partition_spec) {
+    reset_req.reset_metadata_params.__set_partition_spec(
+        exec_request_->load_data_request.partition_spec);
+  }
+  reset_req.reset_metadata_params.__set_sync_ddl(
+      exec_request_->query_options.sync_ddl);
+  catalog_op_executor_.reset(
+      new CatalogOpExecutor(ExecEnv::GetInstance(), frontend_, server_profile_));
+
+  status = catalog_op_executor_->Exec(reset_req);
+  {
+    lock_guard<mutex> l(lock_);
+    RETURN_VOID_IF_ERROR(UpdateQueryStatus(status));
+  }
+
+  status = parent_server_->ProcessCatalogUpdateResult(
+      *catalog_op_executor_->update_catalog_result(),
+      exec_request_->query_options.sync_ddl);
+  {
+    lock_guard<mutex> l(lock_);
+    RETURN_VOID_IF_ERROR(UpdateQueryStatus(status));
+  }
+}
+
+
+Status ClientRequestState::ExecLoadDataRequest() {
+  if (exec_request_->query_options.enable_async_load_data_execution) {
+    // Transition the exec state out of INITIALIZED to PENDING to make available the
+    // runtime profile for the DDL.
+    UpdateNonErrorExecState(ExecState::PENDING);
+    return Thread::Create("impala-server", "async_exec_thread_",
+        &ClientRequestState::ExecLoadDataRequestImpl, this, true, &async_exec_thread_);
+  }
+
+  // sync exection
+  ExecLoadDataRequestImpl(false /* not use a worker thread */);
+  return query_status_;
+}
+
 Status ClientRequestState::ExecShutdownRequest() {
   const TShutdownParams& request = exec_request_->admin_request.shutdown_params;
   bool backend_port_specified = request.__isset.backend && request.backend.port != 0;
diff --git a/be/src/service/client-request-state.h b/be/src/service/client-request-state.h
index da6f52a..29aa984 100644
--- a/be/src/service/client-request-state.h
+++ b/be/src/service/client-request-state.h
@@ -709,6 +709,13 @@ class ClientRequestState {
   /// ExecDdlRequestImpl() and false to call ExecDdlRequestImplSync().
   bool ShouldRunExecDdlAsync();
 
+  /// The logic of executing a load data statement which runs ExecLoadDataRequestImpl()
+  /// in async_exec_thread_ by default.
+  Status ExecLoadDataRequest() WARN_UNUSED_RESULT;
+
+  /// Core logic of executing a load data statement.
+  void ExecLoadDataRequestImpl(bool exec_in_worker_thread);
+
   /// Executes a shut down request.
   Status ExecShutdownRequest() WARN_UNUSED_RESULT;
 
diff --git a/be/src/service/query-options.cc b/be/src/service/query-options.cc
index 3f3b07c..f7eadce 100644
--- a/be/src/service/query-options.cc
+++ b/be/src/service/query-options.cc
@@ -1125,6 +1125,10 @@ Status impala::SetQueryOption(const string& key, const string& value,
         query_options->__set_enable_async_ddl_execution(IsTrue(value));
         break;
       }
+      case TImpalaQueryOptions::ENABLE_ASYNC_LOAD_DATA_EXECUTION: {
+        query_options->__set_enable_async_load_data_execution(IsTrue(value));
+        break;
+      }
       default:
         if (IsRemovedQueryOption(key)) {
           LOG(WARNING) << "Ignoring attempt to set removed query option '" << key << "'";
diff --git a/be/src/service/query-options.h b/be/src/service/query-options.h
index 715b421..7e9b270 100644
--- a/be/src/service/query-options.h
+++ b/be/src/service/query-options.h
@@ -47,7 +47,7 @@ typedef std::unordered_map<string, beeswax::TQueryOptionLevel::type>
 // time we add or remove a query option to/from the enum TImpalaQueryOptions.
 #define QUERY_OPTS_TABLE\
   DCHECK_EQ(_TImpalaQueryOptions_VALUES_TO_NAMES.size(),\
-      TImpalaQueryOptions::ENABLE_ASYNC_DDL_EXECUTION+ 1);\
+      TImpalaQueryOptions::ENABLE_ASYNC_LOAD_DATA_EXECUTION+ 1);\
   REMOVED_QUERY_OPT_FN(abort_on_default_limit_exceeded, ABORT_ON_DEFAULT_LIMIT_EXCEEDED)\
   QUERY_OPT_FN(abort_on_error, ABORT_ON_ERROR, TQueryOptionLevel::REGULAR)\
   REMOVED_QUERY_OPT_FN(allow_unsupported_formats, ALLOW_UNSUPPORTED_FORMATS)\
@@ -262,6 +262,8 @@ typedef std::unordered_map<string, beeswax::TQueryOptionLevel::type>
   QUERY_OPT_FN(orc_read_statistics, ORC_READ_STATISTICS,\
       TQueryOptionLevel::ADVANCED)\
   QUERY_OPT_FN(enable_async_ddl_execution, ENABLE_ASYNC_DDL_EXECUTION,\
+      TQueryOptionLevel::ADVANCED) \
+  QUERY_OPT_FN(enable_async_load_data_execution, ENABLE_ASYNC_LOAD_DATA_EXECUTION,\
       TQueryOptionLevel::ADVANCED)
   ;
 
diff --git a/common/thrift/ImpalaService.thrift b/common/thrift/ImpalaService.thrift
index 2d079dd..334ef19 100644
--- a/common/thrift/ImpalaService.thrift
+++ b/common/thrift/ImpalaService.thrift
@@ -699,6 +699,9 @@ enum TImpalaQueryOptions {
 
   // Indicates whether to run most of ddl requests in async mode.
   ENABLE_ASYNC_DDL_EXECUTION = 136
+
+  // Indicates whether to run load data requests in async mode.
+  ENABLE_ASYNC_LOAD_DATA_EXECUTION = 137
 }
 
 // The summary of a DML statement.
diff --git a/common/thrift/Query.thrift b/common/thrift/Query.thrift
index 8f824c2..130c9db 100644
--- a/common/thrift/Query.thrift
+++ b/common/thrift/Query.thrift
@@ -552,6 +552,9 @@ struct TQueryOptions {
 
   // Allow ddl exec request to run in a separate thread
   137: optional bool enable_async_ddl_execution = true;
+
+  // Allow load data exec request to run in a separate thread
+  138: optional bool enable_async_load_data_execution = true;
 }
 
 // Impala currently has three types of sessions: Beeswax, HiveServer2 and external
diff --git a/tests/metadata/test_load.py b/tests/metadata/test_load.py
index 5c0a5e5..bf2aa77 100644
--- a/tests/metadata/test_load.py
+++ b/tests/metadata/test_load.py
@@ -17,12 +17,18 @@
 
 # Functional tests for LOAD DATA statements.
 
+import time
+from beeswaxd.BeeswaxService import QueryState
+from copy import deepcopy
 from tests.common.impala_test_suite import ImpalaTestSuite
 from tests.common.test_dimensions import (
+    create_client_protocol_dimension,
+    create_exec_option_dimension,
     create_single_exec_option_dimension,
     create_uncompressed_text_dimension)
 from tests.common.skip import SkipIfLocal
-from tests.util.filesystem_utils import WAREHOUSE
+from tests.common.test_vector import ImpalaTestDimension
+from tests.util.filesystem_utils import (WAREHOUSE)
 
 TEST_TBL_PART = "test_load"
 TEST_TBL_NOPART = "test_load_nopart"
@@ -99,3 +105,99 @@ class TestLoadData(ImpalaTestSuite):
     # The hidden files should not have been moved as part of the load operation.
     for file_ in HIDDEN_FILES:
       assert self.filesystem_client.exists(file_), "{0} does not exist".format(file_)
+
+
+@SkipIfLocal.hdfs_client
+class TestAsyncLoadData(ImpalaTestSuite):
+
+  @classmethod
+  def get_workload(self):
+    return 'functional-query'
+
+  @classmethod
+  def add_test_dimensions(cls):
+    super(TestAsyncLoadData, cls).add_test_dimensions()
+    cls.ImpalaTestMatrix.add_dimension(
+        create_uncompressed_text_dimension(cls.get_workload()))
+    # Test all clients: hs2, hs2-http and beeswax
+    cls.ImpalaTestMatrix.add_dimension(create_client_protocol_dimension())
+    # Test two exec modes per client
+    cls.ImpalaTestMatrix.add_dimension(
+        ImpalaTestDimension('enable_async_load_data_execution', True, False))
+    # Disable codegen = false
+    cls.ImpalaTestMatrix.add_dimension(create_exec_option_dimension(
+        disable_codegen_options=[False]))
+
+  def test_async_load(self, vector, unique_database):
+    enable_async_load_data = vector.get_value('enable_async_load_data_execution')
+    protocol = vector.get_value('protocol')
+    client = self.create_impala_client(protocol=protocol)
+    is_hs2 = protocol in ['hs2', 'hs2-http']
+    running_state = "RUNNING_STATE" if is_hs2 else QueryState.RUNNING
+    finished_state = "FINISHED_STATE" if is_hs2 else QueryState.FINISHED
+
+    # Form a fully qualified table name with '-' in protocol 'hs2-http' dropped as
+    # '-' is not allowed in Impala table name even delimited with ``.
+    qualified_table_name = '{0}.{1}_{2}_{3}'.format(unique_database, TEST_TBL_NOPART,
+        protocol if protocol != 'hs2-http' else 'hs2http', enable_async_load_data)
+
+    # Form a staging path that is protocol and enable_async_load_data dependent to
+    # allow parallel creating distinct HDFS directories for each test object.
+    staging_path = "{0}_{1}_{2}".format(STAGING_PATH, protocol, enable_async_load_data)
+
+    # Put some data into the staging path
+    self.filesystem_client.delete_file_dir(staging_path, recursive=True)
+    self.filesystem_client.make_dir(staging_path, permission=777)
+    self.filesystem_client.copy(ALLTYPES_PATH, "{0}/100101.txt".format(staging_path))
+
+    # Create a table with the staging path
+    self.client.execute("create table {0} like functional.alltypesnopart \
+        location \'/{1}\'".format(qualified_table_name, staging_path))
+
+    try:
+
+      # The load data is going to need the metadata of the table. To avoid flakiness
+      # about metadata loading, this selects from the table first to get the metadata
+      # loaded.
+      self.execute_query_expect_success(client,
+          "select count(*) from {0}".format(qualified_table_name))
+
+      # Configure whether to use async LOAD and add an appropriate delay of 3 seconds
+      new_vector = deepcopy(vector)
+      new_vector.get_value('exec_option')['enable_async_load_data_execution'] = \
+           enable_async_load_data
+      delay = "CRS_DELAY_BEFORE_LOAD_DATA:SLEEP@3000"
+      new_vector.get_value('exec_option')['debug_action'] = "{0}".format(delay)
+      load_stmt = "load data inpath \'/{1}\' \
+          into table {0}".format(qualified_table_name, staging_path)
+      exec_start = time.time()
+      handle = self.execute_query_async_using_client(client, load_stmt, new_vector)
+      exec_end = time.time()
+      exec_time = exec_end - exec_start
+      exec_end_state = client.get_state(handle)
+
+      # Wait for the statement to finish with a timeout of 10 seconds
+      wait_start = time.time()
+      self.wait_for_state(handle, finished_state, 10, client=client)
+      wait_end = time.time()
+      wait_time = wait_end - wait_start
+      self.close_query_using_client(client, handle)
+      # In sync mode:
+      #  The entire LOAD is processed in the exec step with delay. exec_time should be
+      #  more than 3 seconds.
+      #
+      # In async mode:
+      #  The compilation of LOAD is processed in the exec step without delay. And the
+      #  processing of the LOAD plan is in wait step with delay. The wait time should
+      #  definitely take more time than 3 seconds.
+      if enable_async_load_data:
+        assert(exec_end_state == running_state)
+        assert(wait_time >= 3)
+      else:
+        assert(exec_end_state == finished_state)
+        assert(exec_time >= 3)
+    finally:
+      client.close()
+
+    self.client.execute("drop table if exists {0}".format(qualified_table_name))
+    self.filesystem_client.delete_file_dir(staging_path, recursive=True)