You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by cs...@apache.org on 2021/10/28 12:59:02 UTC

[impala] 01/02: IMPALA-10967 Load data should handle AWS NLB-type timeout

This is an automated email from the ASF dual-hosted git repository.

csringhofer pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 78ce235db6d5b720f3e3319ff571a2da054a2602
Author: Qifan Chen <qc...@cloudera.com>
AuthorDate: Tue Oct 19 13:25:06 2021 -0400

    IMPALA-10967 Load data should handle AWS NLB-type timeout
    
    This patch addresses an Impala client hang caused by the AWS Network Load
    Balancer (NLB) idle timeout, which is fixed at 350 seconds. When a
    long-running data loading operation is executing and the timeout fires,
    AWS silently drops the connection and the Impala client hangs.
    
    The fix keeps the current TCLIService protocol between the client and
    the Impala server, and uses a separate thread to run the data loading
    and metadata refresh operations. Since that thread is waited on by a
    wait thread that runs asynchronously, executing the whole operation
    does not block the Impala client, which can instead poll the status of
    the operation with repeated GetOperationStatus() calls.
    
    External behavior change:
      1. A new query option, 'enable_async_load_data_execution', is added
         with a default of true. Setting it to false disables asynchronous
         execution and restores the previous synchronous LOAD DATA behavior.
    
    Testing:
      1. Added a new test in test_load.py to verify that asynchronous
         execution in the backend (BE) keeps the session alive for all three
         clients: hs2, hs2-http and beeswax;
      2. Ran core tests successfully.
    
    Change-Id: I8c2437e9894510204303ec07710cad60102c8821
    Reviewed-on: http://gerrit.cloudera.org:8080/17955
    Reviewed-by: Joe McDonnell <jo...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 be/src/service/client-request-state.cc |  98 ++++++++++++++++++++++---------
 be/src/service/client-request-state.h  |   7 +++
 be/src/service/query-options.cc        |   4 ++
 be/src/service/query-options.h         |   4 +-
 common/thrift/ImpalaService.thrift     |   3 +
 common/thrift/Query.thrift             |   3 +
 tests/metadata/test_load.py            | 104 ++++++++++++++++++++++++++++++++-
 7 files changed, 192 insertions(+), 31 deletions(-)

diff --git a/be/src/service/client-request-state.cc b/be/src/service/client-request-state.cc
index 8edf8fa..7939c5f 100644
--- a/be/src/service/client-request-state.cc
+++ b/be/src/service/client-request-state.cc
@@ -259,35 +259,7 @@ Status ClientRequestState::Exec() {
     }
     case TStmtType::LOAD: {
       DCHECK(exec_request_->__isset.load_data_request);
-      TLoadDataResp response;
-      RETURN_IF_ERROR(
-          frontend_->LoadData(exec_request_->load_data_request, &response));
-      request_result_set_.reset(new vector<TResultRow>);
-      request_result_set_->push_back(response.load_summary);
-
-      // Now refresh the table metadata.
-      TCatalogOpRequest reset_req;
-      reset_req.__set_sync_ddl(exec_request_->query_options.sync_ddl);
-      reset_req.__set_op_type(TCatalogOpType::RESET_METADATA);
-      reset_req.__set_reset_metadata_params(TResetMetadataRequest());
-      reset_req.reset_metadata_params.__set_header(TCatalogServiceRequestHeader());
-      reset_req.reset_metadata_params.header.__set_want_minimal_response(
-          FLAGS_use_local_catalog);
-      reset_req.reset_metadata_params.__set_is_refresh(true);
-      reset_req.reset_metadata_params.__set_table_name(
-          exec_request_->load_data_request.table_name);
-      if (exec_request_->load_data_request.__isset.partition_spec) {
-        reset_req.reset_metadata_params.__set_partition_spec(
-            exec_request_->load_data_request.partition_spec);
-      }
-      reset_req.reset_metadata_params.__set_sync_ddl(
-          exec_request_->query_options.sync_ddl);
-      catalog_op_executor_.reset(
-          new CatalogOpExecutor(ExecEnv::GetInstance(), frontend_, server_profile_));
-      RETURN_IF_ERROR(catalog_op_executor_->Exec(reset_req));
-      RETURN_IF_ERROR(parent_server_->ProcessCatalogUpdateResult(
-          *catalog_op_executor_->update_catalog_result(),
-          exec_request_->query_options.sync_ddl));
+      LOG_AND_RETURN_IF_ERROR(ExecLoadDataRequest());
       break;
     }
     case TStmtType::SET: {
@@ -793,6 +765,74 @@ Status ClientRequestState::ExecDdlRequest() {
   }
 }
 
+void ClientRequestState::ExecLoadDataRequestImpl(bool exec_in_worker_thread) {
+  if (exec_in_worker_thread) {
+    DCHECK(exec_state() == ExecState::PENDING);
+    UpdateNonErrorExecState(ExecState::RUNNING);
+  }
+  DebugActionNoFail(
+      exec_request_->query_options, "CRS_DELAY_BEFORE_LOAD_DATA");
+  // Step 1: run the load in the frontend; errors are recorded under lock_.
+  TLoadDataResp response;
+  Status status = frontend_->LoadData(exec_request_->load_data_request, &response);
+  {
+    lock_guard<mutex> l(lock_);
+    RETURN_VOID_IF_ERROR(UpdateQueryStatus(status));
+  }
+  // Publish the load summary as the statement's single result row.
+  request_result_set_.reset(new vector<TResultRow>);
+  request_result_set_->push_back(response.load_summary);
+  // Step 2: refresh the metadata of the loaded table, restricted to the target
+  // partition when the LOAD DATA statement specified one.
+  TCatalogOpRequest reset_req;
+  reset_req.__set_sync_ddl(exec_request_->query_options.sync_ddl);
+  reset_req.__set_op_type(TCatalogOpType::RESET_METADATA);
+  reset_req.__set_reset_metadata_params(TResetMetadataRequest());
+  reset_req.reset_metadata_params.__set_header(TCatalogServiceRequestHeader());
+  reset_req.reset_metadata_params.header.__set_want_minimal_response(
+      FLAGS_use_local_catalog);
+  reset_req.reset_metadata_params.__set_is_refresh(true);
+  reset_req.reset_metadata_params.__set_table_name(
+      exec_request_->load_data_request.table_name);
+  if (exec_request_->load_data_request.__isset.partition_spec) {
+    reset_req.reset_metadata_params.__set_partition_spec(
+        exec_request_->load_data_request.partition_spec);
+  }
+  reset_req.reset_metadata_params.__set_sync_ddl(
+      exec_request_->query_options.sync_ddl);
+  catalog_op_executor_.reset(
+      new CatalogOpExecutor(ExecEnv::GetInstance(), frontend_, server_profile_));
+  // Run the RESET_METADATA request through the catalog op executor.
+  status = catalog_op_executor_->Exec(reset_req);
+  {
+    lock_guard<mutex> l(lock_);
+    RETURN_VOID_IF_ERROR(UpdateQueryStatus(status));
+  }
+  // Hand the catalog update result to the parent server, passing sync_ddl along.
+  status = parent_server_->ProcessCatalogUpdateResult(
+      *catalog_op_executor_->update_catalog_result(),
+      exec_request_->query_options.sync_ddl);
+  {
+    lock_guard<mutex> l(lock_);
+    RETURN_VOID_IF_ERROR(UpdateQueryStatus(status));
+  }
+}
+
+// Runs ExecLoadDataRequestImpl() on async_exec_thread_ or, if disabled, inline.
+Status ClientRequestState::ExecLoadDataRequest() {
+  if (exec_request_->query_options.enable_async_load_data_execution) {
+    // Move from INITIALIZED to PENDING so that the runtime profile is available for
+    // the LOAD DATA statement. The worker thread records any load errors itself.
+    UpdateNonErrorExecState(ExecState::PENDING);
+    return Thread::Create("impala-server", "async_exec_thread_",
+        &ClientRequestState::ExecLoadDataRequestImpl, this, true, &async_exec_thread_);
+  }
+
+  // Synchronous execution: run inline and return the status it recorded.
+  ExecLoadDataRequestImpl(false /* exec_in_worker_thread */);
+  return query_status_;
+}
+
 Status ClientRequestState::ExecShutdownRequest() {
   const TShutdownParams& request = exec_request_->admin_request.shutdown_params;
   bool backend_port_specified = request.__isset.backend && request.backend.port != 0;
diff --git a/be/src/service/client-request-state.h b/be/src/service/client-request-state.h
index da6f52a..29aa984 100644
--- a/be/src/service/client-request-state.h
+++ b/be/src/service/client-request-state.h
@@ -709,6 +709,13 @@ class ClientRequestState {
   /// ExecDdlRequestImpl() and false to call ExecDdlRequestImplSync().
   bool ShouldRunExecDdlAsync();
 
+  /// The logic of executing a load data statement which runs ExecLoadDataRequestImpl()
+  /// in async_exec_thread_ by default.
+  Status ExecLoadDataRequest() WARN_UNUSED_RESULT;
+
+  /// Core logic of executing a load data statement.
+  void ExecLoadDataRequestImpl(bool exec_in_worker_thread);
+
   /// Executes a shut down request.
   Status ExecShutdownRequest() WARN_UNUSED_RESULT;
 
diff --git a/be/src/service/query-options.cc b/be/src/service/query-options.cc
index 3f3b07c..f7eadce 100644
--- a/be/src/service/query-options.cc
+++ b/be/src/service/query-options.cc
@@ -1125,6 +1125,10 @@ Status impala::SetQueryOption(const string& key, const string& value,
         query_options->__set_enable_async_ddl_execution(IsTrue(value));
         break;
       }
+      case TImpalaQueryOptions::ENABLE_ASYNC_LOAD_DATA_EXECUTION: {
+        query_options->__set_enable_async_load_data_execution(IsTrue(value));
+        break;
+      }
       default:
         if (IsRemovedQueryOption(key)) {
           LOG(WARNING) << "Ignoring attempt to set removed query option '" << key << "'";
diff --git a/be/src/service/query-options.h b/be/src/service/query-options.h
index 715b421..7e9b270 100644
--- a/be/src/service/query-options.h
+++ b/be/src/service/query-options.h
@@ -47,7 +47,7 @@ typedef std::unordered_map<string, beeswax::TQueryOptionLevel::type>
 // time we add or remove a query option to/from the enum TImpalaQueryOptions.
 #define QUERY_OPTS_TABLE\
   DCHECK_EQ(_TImpalaQueryOptions_VALUES_TO_NAMES.size(),\
-      TImpalaQueryOptions::ENABLE_ASYNC_DDL_EXECUTION+ 1);\
+      TImpalaQueryOptions::ENABLE_ASYNC_LOAD_DATA_EXECUTION+ 1);\
   REMOVED_QUERY_OPT_FN(abort_on_default_limit_exceeded, ABORT_ON_DEFAULT_LIMIT_EXCEEDED)\
   QUERY_OPT_FN(abort_on_error, ABORT_ON_ERROR, TQueryOptionLevel::REGULAR)\
   REMOVED_QUERY_OPT_FN(allow_unsupported_formats, ALLOW_UNSUPPORTED_FORMATS)\
@@ -262,6 +262,8 @@ typedef std::unordered_map<string, beeswax::TQueryOptionLevel::type>
   QUERY_OPT_FN(orc_read_statistics, ORC_READ_STATISTICS,\
       TQueryOptionLevel::ADVANCED)\
   QUERY_OPT_FN(enable_async_ddl_execution, ENABLE_ASYNC_DDL_EXECUTION,\
+      TQueryOptionLevel::ADVANCED) \
+  QUERY_OPT_FN(enable_async_load_data_execution, ENABLE_ASYNC_LOAD_DATA_EXECUTION,\
       TQueryOptionLevel::ADVANCED)
   ;
 
diff --git a/common/thrift/ImpalaService.thrift b/common/thrift/ImpalaService.thrift
index 2d079dd..334ef19 100644
--- a/common/thrift/ImpalaService.thrift
+++ b/common/thrift/ImpalaService.thrift
@@ -699,6 +699,9 @@ enum TImpalaQueryOptions {
 
   // Indicates whether to run most of ddl requests in async mode.
   ENABLE_ASYNC_DDL_EXECUTION = 136
+
+  // Indicates whether to run load data requests in async mode.
+  ENABLE_ASYNC_LOAD_DATA_EXECUTION = 137
 }
 
 // The summary of a DML statement.
diff --git a/common/thrift/Query.thrift b/common/thrift/Query.thrift
index 8f824c2..130c9db 100644
--- a/common/thrift/Query.thrift
+++ b/common/thrift/Query.thrift
@@ -552,6 +552,9 @@ struct TQueryOptions {
 
   // Allow ddl exec request to run in a separate thread
   137: optional bool enable_async_ddl_execution = true;
+
+  // Allow load data exec request to run in a separate thread
+  138: optional bool enable_async_load_data_execution = true;
 }
 
 // Impala currently has three types of sessions: Beeswax, HiveServer2 and external
diff --git a/tests/metadata/test_load.py b/tests/metadata/test_load.py
index 5c0a5e5..bf2aa77 100644
--- a/tests/metadata/test_load.py
+++ b/tests/metadata/test_load.py
@@ -17,12 +17,18 @@
 
 # Functional tests for LOAD DATA statements.
 
+import time
+from beeswaxd.BeeswaxService import QueryState
+from copy import deepcopy
 from tests.common.impala_test_suite import ImpalaTestSuite
 from tests.common.test_dimensions import (
+    create_client_protocol_dimension,
+    create_exec_option_dimension,
     create_single_exec_option_dimension,
     create_uncompressed_text_dimension)
 from tests.common.skip import SkipIfLocal
-from tests.util.filesystem_utils import WAREHOUSE
+from tests.common.test_vector import ImpalaTestDimension
+from tests.util.filesystem_utils import (WAREHOUSE)
 
 TEST_TBL_PART = "test_load"
 TEST_TBL_NOPART = "test_load_nopart"
@@ -99,3 +105,99 @@ class TestLoadData(ImpalaTestSuite):
     # The hidden files should not have been moved as part of the load operation.
     for file_ in HIDDEN_FILES:
       assert self.filesystem_client.exists(file_), "{0} does not exist".format(file_)
+
+
+@SkipIfLocal.hdfs_client
+class TestAsyncLoadData(ImpalaTestSuite):
+  """Tests async vs. sync LOAD DATA execution for every client protocol."""
+  @classmethod
+  def get_workload(self):
+    return 'functional-query'
+
+  @classmethod
+  def add_test_dimensions(cls):
+    super(TestAsyncLoadData, cls).add_test_dimensions()
+    cls.ImpalaTestMatrix.add_dimension(
+        create_uncompressed_text_dimension(cls.get_workload()))
+    # Test all clients: hs2, hs2-http and beeswax
+    cls.ImpalaTestMatrix.add_dimension(create_client_protocol_dimension())
+    # Test both the async and the sync exec mode for each client
+    cls.ImpalaTestMatrix.add_dimension(
+        ImpalaTestDimension('enable_async_load_data_execution', True, False))
+    # Only test with codegen enabled (disable_codegen=False)
+    cls.ImpalaTestMatrix.add_dimension(create_exec_option_dimension(
+        disable_codegen_options=[False]))
+
+  def test_async_load(self, vector, unique_database):
+    enable_async_load_data = vector.get_value('enable_async_load_data_execution')
+    protocol = vector.get_value('protocol')
+    client = self.create_impala_client(protocol=protocol)
+    is_hs2 = protocol in ['hs2', 'hs2-http']
+    running_state = "RUNNING_STATE" if is_hs2 else QueryState.RUNNING
+    finished_state = "FINISHED_STATE" if is_hs2 else QueryState.FINISHED
+
+    # Build a fully qualified table name, spelling 'hs2-http' as 'hs2http' since
+    # '-' is not allowed in Impala table names even when delimited with ``.
+    qualified_table_name = '{0}.{1}_{2}_{3}'.format(unique_database, TEST_TBL_NOPART,
+        protocol if protocol != 'hs2-http' else 'hs2http', enable_async_load_data)
+
+    # Make the staging path depend on protocol and enable_async_load_data so that
+    # tests running in parallel each create their own distinct HDFS directory.
+    staging_path = "{0}_{1}_{2}".format(STAGING_PATH, protocol, enable_async_load_data)
+
+    # Stage one data file in a freshly created staging directory
+    self.filesystem_client.delete_file_dir(staging_path, recursive=True)
+    self.filesystem_client.make_dir(staging_path, permission=777)
+    self.filesystem_client.copy(ALLTYPES_PATH, "{0}/100101.txt".format(staging_path))
+
+    # Create a table whose location is the staging path
+    self.client.execute("create table {0} like functional.alltypesnopart \
+        location \'/{1}\'".format(qualified_table_name, staging_path))
+
+    try:
+
+      # LOAD DATA needs the metadata of the table. To avoid flakiness caused by
+      # metadata loading, select from the table first so that its metadata is
+      # already loaded before the timed LOAD DATA statement runs.
+      self.execute_query_expect_success(client,
+          "select count(*) from {0}".format(qualified_table_name))
+
+      # Choose sync or async LOAD DATA and inject a 3 second delay before the load
+      new_vector = deepcopy(vector)
+      new_vector.get_value('exec_option')['enable_async_load_data_execution'] = \
+           enable_async_load_data
+      delay = "CRS_DELAY_BEFORE_LOAD_DATA:SLEEP@3000"
+      new_vector.get_value('exec_option')['debug_action'] = "{0}".format(delay)
+      load_stmt = "load data inpath \'/{1}\' \
+          into table {0}".format(qualified_table_name, staging_path)
+      exec_start = time.time()
+      handle = self.execute_query_async_using_client(client, load_stmt, new_vector)
+      exec_end = time.time()
+      exec_time = exec_end - exec_start
+      exec_end_state = client.get_state(handle)
+
+      # Wait up to 10 seconds for the statement to reach the finished state
+      wait_start = time.time()
+      self.wait_for_state(handle, finished_state, 10, client=client)
+      wait_end = time.time()
+      wait_time = wait_end - wait_start
+      self.close_query_using_client(client, handle)
+      # In sync mode:
+      #  The entire LOAD runs, including the injected delay, during the exec step,
+      #  so exec_time should be at least 3 seconds.
+      #
+      # In async mode:
+      #  The exec step only starts the load on a separate thread and returns without
+      #  waiting for the delay, so the delay is observed in the wait step instead
+      #  and wait_time should be at least 3 seconds.
+      if enable_async_load_data:
+        assert(exec_end_state == running_state)
+        assert(wait_time >= 3)
+      else:
+        assert(exec_end_state == finished_state)
+        assert(exec_time >= 3)
+    finally:
+      client.close()
+    # NOTE(review): not inside 'finally', so this cleanup is skipped on failure.
+    self.client.execute("drop table if exists {0}".format(qualified_table_name))
+    self.filesystem_client.delete_file_dir(staging_path, recursive=True)