You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by st...@apache.org on 2024/01/22 12:35:11 UTC

(impala) branch master updated: IMPALA-12711: Fix DDL errors are not shown in impalad logs

This is an automated email from the ASF dual-hosted git repository.

stigahuang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git


The following commit(s) were added to refs/heads/master by this push:
     new b4ea4b94c IMPALA-12711: Fix DDL errors are not shown in impalad logs
b4ea4b94c is described below

commit b4ea4b94c67d2ca264b8106379bb413b2776e595
Author: stiga-huang <hu...@gmail.com>
AuthorDate: Mon Jan 15 13:51:30 2024 +0800

    IMPALA-12711: Fix DDL errors are not shown in impalad logs
    
    We use LOG_AND_RETURN_IF_ERROR to log the non-ok status of executing the
    DDL. However, when enable_async_ddl_execution is true (the default), the
    statuses returned by ExecDdlRequest() and ExecLoadDataRequest() only
    reflect whether the async thread was created, not whether the statement
    itself succeeded. So if the DDL fails, no error is shown in impalad logs.
    
    This patch fixes it by logging the error when UpdateQueryStatus() is
    invoked by the async exec thread. A new parameter, 'log_error', is added
    to this method to control the logging behavior. It's false by default
    and only set to true when used in the async thread.
    
    Tests
     - Add an e2e test to verify that the error appears in impalad logs
    
    Change-Id: I8f02f22fa8ebbd2dea722d5586899bf57b66cf40
    Reviewed-on: http://gerrit.cloudera.org:8080/20925
    Reviewed-by: Wenzhe Zhou <wz...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 be/src/service/client-request-state.cc | 17 ++++++++++-------
 be/src/service/client-request-state.h  |  9 ++++++++-
 tests/metadata/test_ddl.py             | 23 +++++++++++++++++++++--
 3 files changed, 39 insertions(+), 10 deletions(-)

diff --git a/be/src/service/client-request-state.cc b/be/src/service/client-request-state.cc
index 13429e7cc..f1eb16a36 100644
--- a/be/src/service/client-request-state.cc
+++ b/be/src/service/client-request-state.cc
@@ -695,6 +695,7 @@ void ClientRequestState::ExecDdlRequestImpl(bool exec_in_worker_thread) {
 
   // Indirectly check if running in thread async_exec_thread_.
   if (exec_in_worker_thread) {
+    VLOG_QUERY << "Running in worker thread";
     DCHECK(exec_state() == ExecState::PENDING);
 
     // 1. For any non-CTAS DDLs, transition to RUNNING
@@ -710,7 +711,7 @@ void ClientRequestState::ExecDdlRequestImpl(bool exec_in_worker_thread) {
   query_events_->MarkEvent("CatalogDdlRequest finished");
   {
     lock_guard<mutex> l(lock_);
-    RETURN_VOID_IF_ERROR(UpdateQueryStatus(status));
+    RETURN_VOID_IF_ERROR(UpdateQueryStatus(status, exec_in_worker_thread));
   }
 
   if (catalog_op_executor_->ddl_exec_response() != nullptr &&
@@ -739,7 +740,7 @@ void ClientRequestState::ExecDdlRequestImpl(bool exec_in_worker_thread) {
       exec_request_->query_options.sync_ddl, query_options(), query_events_);
   {
     lock_guard<mutex> l(lock_);
-    RETURN_VOID_IF_ERROR(UpdateQueryStatus(status));
+    RETURN_VOID_IF_ERROR(UpdateQueryStatus(status, exec_in_worker_thread));
   }
 
   if (is_CTAS) {
@@ -802,6 +803,7 @@ Status ClientRequestState::ExecDdlRequest() {
 
 void ClientRequestState::ExecLoadDataRequestImpl(bool exec_in_worker_thread) {
   if (exec_in_worker_thread) {
+    VLOG_QUERY << "Running in worker thread";
     DCHECK(exec_state() == ExecState::PENDING);
     UpdateNonErrorExecState(ExecState::RUNNING);
   }
@@ -815,7 +817,7 @@ void ClientRequestState::ExecLoadDataRequestImpl(bool exec_in_worker_thread) {
   }
   {
     lock_guard<mutex> l(lock_);
-    RETURN_VOID_IF_ERROR(UpdateQueryStatus(status));
+    RETURN_VOID_IF_ERROR(UpdateQueryStatus(status, exec_in_worker_thread));
   }
 
   request_result_set_.reset(new vector<TResultRow>);
@@ -840,7 +842,7 @@ void ClientRequestState::ExecLoadDataRequestImpl(bool exec_in_worker_thread) {
       *ExecEnv::GetInstance()->GetCatalogdAddress().get(), &status);
   {
     lock_guard<mutex> l(lock_);
-    RETURN_VOID_IF_ERROR(UpdateQueryStatus(status));
+    RETURN_VOID_IF_ERROR(UpdateQueryStatus(status, exec_in_worker_thread));
   }
 
   TUpdateCatalogResponse resp;
@@ -848,7 +850,7 @@ void ClientRequestState::ExecLoadDataRequestImpl(bool exec_in_worker_thread) {
       &CatalogServiceClientWrapper::UpdateCatalog, catalog_update, &resp);
   {
     lock_guard<mutex> l(lock_);
-    RETURN_VOID_IF_ERROR(UpdateQueryStatus(status));
+    RETURN_VOID_IF_ERROR(UpdateQueryStatus(status, exec_in_worker_thread));
   }
 
   status = parent_server_->ProcessCatalogUpdateResult(
@@ -856,7 +858,7 @@ void ClientRequestState::ExecLoadDataRequestImpl(bool exec_in_worker_thread) {
       exec_request_->query_options.sync_ddl, query_options(), query_events_);
   {
     lock_guard<mutex> l(lock_);
-    RETURN_VOID_IF_ERROR(UpdateQueryStatus(status));
+    RETURN_VOID_IF_ERROR(UpdateQueryStatus(status, exec_in_worker_thread));
   }
 }
 
@@ -1332,12 +1334,13 @@ void ClientRequestState::MarkAsRetrying(const Status& status) {
   summary_profile_->AddInfoStringRedacted("Retry Cause", query_status_.GetDetail());
 }
 
-Status ClientRequestState::UpdateQueryStatus(const Status& status) {
+Status ClientRequestState::UpdateQueryStatus(const Status& status, bool log_error) {
   // Preserve the first non-ok status
   if (!status.ok() && query_status_.ok()) {
     UpdateExecState(ExecState::ERROR);
     query_status_ = status;
     summary_profile_->AddInfoStringRedacted(QUERY_STATUS_KEY, query_status_.GetDetail());
+    if (log_error) VLOG_QUERY << status.GetDetail();
   }
 
   return status;
diff --git a/be/src/service/client-request-state.h b/be/src/service/client-request-state.h
index 7b618f072..459fce310 100644
--- a/be/src/service/client-request-state.h
+++ b/be/src/service/client-request-state.h
@@ -167,11 +167,18 @@ class ClientRequestState {
   /// If current status is already != ok, no update is made (we preserve the first error)
   /// If called with a non-ok argument, the expectation is that the query will be aborted
   /// quickly.
+  /// If 'log_error' is true, log the error when query status becomes non-ok. This is used
+  /// when running DDLs in async threads. The async thread should take care of the logging
+  /// since the main thread only checks whether the thread creation succeeds.
+  /// E.g. ExecDdlRequest() returns the query status in sync mode, but returns the thread
+  /// creation status in async mode. So LOG_AND_RETURN_IF_ERROR(ExecDdlRequest()) doesn't
+  /// log the query failure in async mode.
   /// Returns the status argument (so we can write
   /// RETURN_IF_ERROR(UpdateQueryStatus(SomeOperation())).
   /// Does not take lock_, but requires it: caller must ensure lock_
   /// is taken before calling UpdateQueryStatus
-  Status UpdateQueryStatus(const Status& status) WARN_UNUSED_RESULT;
+  Status UpdateQueryStatus(const Status& status, bool log_error=false)
+      WARN_UNUSED_RESULT;
 
   /// Cancels the child queries and the coordinator with the given cause.
   /// If cause is NULL, it assumes this was deliberately cancelled by the user while in
diff --git a/tests/metadata/test_ddl.py b/tests/metadata/test_ddl.py
index fb2730ca3..394afd0b9 100644
--- a/tests/metadata/test_ddl.py
+++ b/tests/metadata/test_ddl.py
@@ -32,10 +32,10 @@ from tests.common.file_utils import create_table_from_orc
 from tests.common.impala_test_suite import LOG
 from tests.common.parametrize import UniqueDatabase
 from tests.common.skip import (SkipIf, SkipIfFS, SkipIfKudu, SkipIfLocal,
-                               SkipIfCatalogV2, SkipIfHive2)
+                               SkipIfCatalogV2, SkipIfHive2, SkipIfDockerizedCluster)
 from tests.common.test_dimensions import create_single_exec_option_dimension
 from tests.common.test_dimensions import (create_exec_option_dimension,
-    create_client_protocol_dimension)
+    create_client_protocol_dimension, create_exec_option_dimension_from_dict)
 from tests.common.test_vector import ImpalaTestDimension
 from tests.util.filesystem_utils import (
     get_fs_path,
@@ -1142,6 +1142,25 @@ class TestAsyncDDLTiming(TestDdlBase):
       client.close()
 
 
+class TestDdlLogs(TestDdlBase):
+  @classmethod
+  def add_test_dimensions(cls):
+    super(TestDdlLogs, cls).add_test_dimensions()
+    cls.ImpalaTestMatrix.add_dimension(create_exec_option_dimension_from_dict({
+        'enable_async_ddl_execution': [True, False]}))
+
+  @SkipIfDockerizedCluster.daemon_logs_not_exposed
+  def test_error_logs(self, vector, unique_database):
+    query_opts = vector.get_value('exec_option')
+    tbl_name = 'test_async' if query_opts['enable_async_ddl_execution'] else 'test_sync'
+    tbl_name = unique_database + '.' + tbl_name
+    result = self.execute_query_expect_failure(
+        self.client, 'invalidate metadata ' + tbl_name, query_opts)
+    err = "TableNotFoundException: Table not found: " + tbl_name
+    assert err in str(result)
+    self.assert_impalad_log_contains('INFO', err)
+
+
 # IMPALA-2002: Tests repeated adding/dropping of .jar and .so in the lib cache.
 class TestLibCache(TestDdlBase):
   @classmethod