Posted to commits@impala.apache.org by st...@apache.org on 2023/07/26 07:44:18 UTC

[impala] 01/02: IMPALA-12267: DMLs/DDLs can hang as a result of catalogd restart

This is an automated email from the ASF dual-hosted git repository.

stigahuang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 98059b029afc4950c7934e9a6edcdbbb99bc890b
Author: Daniel Becker <da...@cloudera.com>
AuthorDate: Wed Jul 12 17:20:34 2023 +0200

    IMPALA-12267: DMLs/DDLs can hang as a result of catalogd restart
    
    IMPALA-5476 added handling for changes in catalogd service ID during the
    lifetime of a DML/DDL query, but the loop that waits for a new catalogd
    service ID can wait indefinitely if the DML/DDL was handled by the
    previous catalogd and the restart happened just after it replied to the
    coordinator:
    https://github.com/apache/impala/blob/d0fe4c604f72d41019832513ebf65cfe8f469953/be/src/service/impala-server.cc#L2204
    
    This change adds two new startup flags. The first one is
    '--wait_for_new_catalog_service_id_timeout_sec', which sets an upper
    limit on the waiting time (in seconds). The second one is
    '--wait_for_new_catalog_service_id_max_iterations', which limits how
    many valid catalog updates without a new catalog service ID the
    coordinator receives before it gives up and stops waiting.
    
    For both startup flags, negative values and zero turn the feature off.
    
    If the coordinator gives up waiting because of either of the new startup
    flags, the local catalog cache is not updated and a warning is logged.
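
    For example, a deployment could cap the wait at two minutes or at most
    five catalog updates by starting impalad with the following flags
    (illustrative values; the defaults are 300 seconds and 10 updates):

      impalad ... --wait_for_new_catalog_service_id_timeout_sec=120 \
                  --wait_for_new_catalog_service_id_max_iterations=5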
    
    Testing:
     - added a custom cluster test that reproduces the above condition using
       a debug action and verifies that the waiting loop times out.
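
       The debug action is injected through the 'debug_action' query option;
       assuming the usual SET syntax for query options, the same action could
       be reproduced from an impala-shell session, e.g. (a 10 second sleep
       before the coordinator processes the catalog update RPC result, as in
       the test):

         SET DEBUG_ACTION="WAIT_BEFORE_PROCESSING_CATALOG_UPDATE:SLEEP@10000";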
    
    Change-Id: Ib71bec8f67f80b0bdfe0a6cc46a16ef624163d8b
    Reviewed-on: http://gerrit.cloudera.org:8080/20192
    Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 be/src/service/client-request-state.cc             |   8 +-
 be/src/service/impala-server.cc                    | 134 ++++++++++++++++++---
 be/src/service/impala-server.h                     |  15 ++-
 .../org/apache/impala/catalog/ImpaladCatalog.java  |  12 +-
 tests/custom_cluster/test_restart_services.py      | 120 +++++++++++++++++-
 5 files changed, 256 insertions(+), 33 deletions(-)

diff --git a/be/src/service/client-request-state.cc b/be/src/service/client-request-state.cc
index 64b3a4fc7..f6c0fbb2b 100644
--- a/be/src/service/client-request-state.cc
+++ b/be/src/service/client-request-state.cc
@@ -725,7 +725,7 @@ void ClientRequestState::ExecDdlRequestImpl(bool exec_in_worker_thread) {
   // Add newly created table to catalog cache.
   status = parent_server_->ProcessCatalogUpdateResult(
       *catalog_op_executor_->update_catalog_result(),
-      exec_request_->query_options.sync_ddl);
+      exec_request_->query_options.sync_ddl, query_options());
   {
     lock_guard<mutex> l(lock_);
     RETURN_VOID_IF_ERROR(UpdateQueryStatus(status));
@@ -842,7 +842,7 @@ void ClientRequestState::ExecLoadDataRequestImpl(bool exec_in_worker_thread) {
 
   status = parent_server_->ProcessCatalogUpdateResult(
       resp.result,
-      exec_request_->query_options.sync_ddl);
+      exec_request_->query_options.sync_ddl, query_options());
   {
     lock_guard<mutex> l(lock_);
     RETURN_VOID_IF_ERROR(UpdateQueryStatus(status));
@@ -1594,7 +1594,7 @@ Status ClientRequestState::UpdateCatalog() {
         query_events_->MarkEvent("Transaction committed");
       }
       RETURN_IF_ERROR(parent_server_->ProcessCatalogUpdateResult(resp.result,
-          exec_request_->query_options.sync_ddl));
+          exec_request_->query_options.sync_ddl, query_options()));
     }
   } else if (InKuduTransaction()) {
     // Commit the Kudu transaction. Clear transaction state if it's successful.
@@ -1754,7 +1754,7 @@ Status ClientRequestState::UpdateTableAndColumnStats(
   }
   RETURN_IF_ERROR(parent_server_->ProcessCatalogUpdateResult(
       *catalog_op_executor_->update_catalog_result(),
-      exec_request_->query_options.sync_ddl));
+      exec_request_->query_options.sync_ddl, query_options()));
 
   // Set the results to be reported to the client.
   SetResultSet(catalog_op_executor_->ddl_exec_response());
diff --git a/be/src/service/impala-server.cc b/be/src/service/impala-server.cc
index d2fe8f37d..58fa78ab4 100644
--- a/be/src/service/impala-server.cc
+++ b/be/src/service/impala-server.cc
@@ -379,6 +379,28 @@ DEFINE_bool(auto_check_compaction, false,
     "additional RPCs to hive metastore for each table in a query during the query "
     "compilation.");
 
+DEFINE_int32(wait_for_new_catalog_service_id_timeout_sec, 5 * 60,
+    "During DDL/DML queries, if there is a mismatch between the catalog service ID that"
+    "the coordinator knows of and the one in the RPC response from the catalogd, the "
+    "coordinator waits for a statestore update with a new catalog service ID in order to "
+    "catch up with the one in the RPC response. However, in rare cases the service ID "
+    "the coordinator knows of is the more recent one, in which case it could wait "
+    "infinitely - to avoid this, this flag can be set to a positive value (in seconds) "
+    "to limit the waiting time. Negative values and zero have no effect. See also "
+    "'--wait_for_new_catalog_service_id_max_iterations,'.");
+
+DEFINE_int32(wait_for_new_catalog_service_id_max_iterations, 10,
+    "This flag is used in the same situation as described at the "
+    "'--wait_for_new_catalog_service_id_timeout_sec' flag. Instead of limiting the "
+    "waiting time, the effect of this flag is that the coordinator gives up waiting "
+    "after receiving the set number of valid catalog updates that do not change the "
+    "catalog service ID. Negative values and zero have no effect. If both this flag and "
+    "'--wait_for_new_catalog_service_id_timeout_sec' are set, the coordinator stops "
+    "waiting when the stop condition of either of them is met. Note that it is possible "
+    "that the coordinator does not receive any catalog update from the statestore and in "
+    "this case it will wait indefinitely if "
+    "'--wait_for_new_catalog_service_id_timeout_sec' is not set.");
+
 // Flags for JWT token based authentication.
 DECLARE_bool(jwt_token_auth);
 DECLARE_bool(jwt_validate_signature);
@@ -885,6 +907,67 @@ Status ImpalaServer::DecompressToProfile(TRuntimeProfileFormat::type format,
   return Status::OK();
 }
 
+void ImpalaServer::WaitForNewCatalogServiceId(TUniqueId cur_service_id,
+    unique_lock<mutex>* ver_lock) {
+  DCHECK(ver_lock != nullptr);
+  // The catalog service ID of 'catalog_update_result' does not match the current catalog
+  // service ID. It is possible that catalogd has been restarted and
+  // 'catalog_update_result' contains the new service ID but we haven't received the
+  // statestore update about the new catalogd yet. We'll wait until we receive an update
+  // with a new catalog service ID or we give up (if
+  // --wait_for_new_catalog_service_id_timeout_sec is set and we time out OR if
+  // --wait_for_new_catalog_service_id_max_iterations is set and we reach the max number
+  // of updates without a new service ID). The timeout is useful in case the service ID of
+  // 'catalog_update_result' is actually older than the current catalog service ID. This
+  // is possible if the RPC response came from the old catalogd and we have already
+  // received the statestore update about the new one (see IMPALA-12267).
+  const bool timeout_set = FLAGS_wait_for_new_catalog_service_id_timeout_sec > 0;
+  const int64_t timeout_ms =
+      FLAGS_wait_for_new_catalog_service_id_timeout_sec * MILLIS_PER_SEC;
+  timespec wait_end_time;
+  if (timeout_set) TimeFromNowMillis(timeout_ms, &wait_end_time);
+
+  const bool max_statestore_updates_set =
+      FLAGS_wait_for_new_catalog_service_id_max_iterations > 0;
+
+  bool timed_out = false;
+  int num_statestore_updates = 0;
+
+  int64_t old_catalog_version = catalog_update_info_.catalog_version;
+  while (catalog_update_info_.catalog_service_id == cur_service_id) {
+    if (max_statestore_updates_set
+        && catalog_update_info_.catalog_version != old_catalog_version) {
+      old_catalog_version = catalog_update_info_.catalog_version;
+      ++num_statestore_updates;
+      if (num_statestore_updates < FLAGS_wait_for_new_catalog_service_id_max_iterations) {
+        LOG(INFO) << "Received " << num_statestore_updates << " non-empty catalog "
+            << "updates from the statestore while waiting for an update with a new "
+            << "catalog service ID but the catalog service ID has not changed. Going to "
+            << "give up waiting after "
+            << FLAGS_wait_for_new_catalog_service_id_max_iterations
+            << " such updates in total.";
+      } else {
+        LOG(WARNING) << "Received " << num_statestore_updates << " non-empty catalog "
+            << "updates from the statestore while waiting for an update with a new "
+            << "catalog service ID but the catalog service ID has not changed. "
+            << "Giving up waiting.";
+        break;
+      }
+    }
+
+    if (timeout_set) {
+      timed_out = !catalog_version_update_cv_.WaitUntil(*ver_lock, wait_end_time);
+      if (timed_out) {
+        LOG(WARNING) << "Waiting for catalog update with a new "
+            << "catalog service ID timed out.";
+        break;
+      }
+    } else {
+      catalog_version_update_cv_.Wait(*ver_lock);
+    }
+  }
+}
+
 Status ImpalaServer::GetExecSummary(const TUniqueId& query_id, const string& user,
     TExecSummary* result, TExecSummary* original_result, bool* was_retried) {
   if (was_retried != nullptr) *was_retried = false;
@@ -2171,7 +2254,8 @@ void ImpalaServer::WaitForMinCatalogUpdate(const int64_t min_req_catalog_object_
 }
 
 Status ImpalaServer::ProcessCatalogUpdateResult(
-    const TCatalogUpdateResult& catalog_update_result, bool wait_for_all_subscribers) {
+    const TCatalogUpdateResult& catalog_update_result, bool wait_for_all_subscribers,
+    const TQueryOptions& query_options) {
   const TUniqueId& catalog_service_id = catalog_update_result.catalog_service_id;
   if (!catalog_update_result.__isset.updated_catalog_objects &&
       !catalog_update_result.__isset.removed_catalog_objects) {
@@ -2192,18 +2276,17 @@ Status ImpalaServer::ProcessCatalogUpdateResult(
   } else {
     TUniqueId cur_service_id;
     {
+      Status status = DebugAction(query_options, "WAIT_BEFORE_PROCESSING_CATALOG_UPDATE");
+      DCHECK(status.ok());
+
       unique_lock<mutex> ver_lock(catalog_version_lock_);
       cur_service_id = catalog_update_info_.catalog_service_id;
-      if (catalog_update_info_.catalog_service_id != catalog_service_id) {
+      if (cur_service_id != catalog_service_id) {
         LOG(INFO) << "Catalog service ID mismatch. Current ID: "
             << PrintId(cur_service_id) << ". ID in response: "
-            << PrintId(catalog_service_id) << ". Catalogd may be restarted. Waiting for"
-            " new catalog update from statestore.";
-        // Catalog service ID has been changed, and impalad request a full topic update.
-        // When impalad completes the full topic update, it will exit this loop.
-        while (cur_service_id == catalog_update_info_.catalog_service_id) {
-          catalog_version_update_cv_.Wait(ver_lock);
-        }
+            << PrintId(catalog_service_id) << ". Catalogd may have been restarted. "
+            "Waiting for new catalog update from statestore.";
+        WaitForNewCatalogServiceId(cur_service_id, &ver_lock);
         cur_service_id = catalog_update_info_.catalog_service_id;
       }
     }
@@ -2224,17 +2307,30 @@ Status ImpalaServer::ProcessCatalogUpdateResult(
       RETURN_IF_ERROR(status);
     } else {
       // We can't apply updates on another service id, because the local catalog is still
-      // inconsistent with the catalogd that executes the DDL. Catalogd may be restarted
-      // more than once inside a statestore update cycle. 'cur_service_id' could belong
-      // to 1) a stale update from the previous restarted catalogd, or 2) a newer update
-      // from next restarted catalogd. We are good to ignore the DDL result at the second
-      // case. However, in the first case clients may see stale catalog until the
-      // expected catalog topic update comes.
+      // inconsistent with the catalogd that executes the DDL/DML.
+      //
+      // 'cur_service_id' could belong to
+      // 1) a stale update about a previous catalogd; this is possible if
+      //     a) catalogd was restarted more than once (for example inside a statestore
+      //        update cycle) and we only got the updates about some but not all restarts
+      //        - the update about the catalogd that has 'catalog_service_id' has not
+      //        arrived yet OR
+      //     b) we gave up waiting (timed out or got a certain number of updates) before
+      //        getting the update about the new catalogd
+      // 2) an update about a restarted catalogd that is newer than the one with
+      //    'catalog_service_id' (in this case we also gave up waiting for an update)
+      //
+      // We are good to ignore the DDL/DML result in the second case. However, in the
+      // first case clients may see a stale catalog until the expected catalog topic
+      // update arrives.
       // TODO: handle the first case in IMPALA-10875.
-      LOG(WARNING) << "Ignoring catalog update result of catalog service ID: "
-          << PrintId(catalog_service_id) << ". The expected catalog service ID: "
-          << PrintId(catalog_service_id) << ". Current catalog service ID: "
-          << PrintId(cur_service_id) <<". Catalogd may be restarted more than once.";
+      LOG(WARNING) << "Ignoring catalog update result of catalog service ID "
+          << PrintId(catalog_service_id)
+          << " because it does not match with current catalog service ID "
+          << PrintId(cur_service_id)
+          << ". The current catalog service ID may be stale (this may be caused by the "
+          << "catalogd having been restarted more than once) or newer than the catalog "
+          << "service ID of the update result.";
     }
     if (!wait_for_all_subscribers) return Status::OK();
     // Wait until we receive and process the catalog update that covers the effects
diff --git a/be/src/service/impala-server.h b/be/src/service/impala-server.h
index 6b40630d4..19941f80a 100644
--- a/be/src/service/impala-server.h
+++ b/be/src/service/impala-server.h
@@ -417,11 +417,14 @@ class ImpalaServer : public ImpalaServiceIf,
   /// been updated from a statestore heartbeat that includes this catalog
   /// update's version.
   ///
-  /// If wait_for_all_subscribers is true, this function also
-  /// waits for all other catalog topic subscribers to process this update by checking the
-  /// current min_subscriber_topic_version included in each state store heartbeat.
+  /// If 'wait_for_all_subscribers' is true, this function also waits for all other
+  /// catalog topic subscribers to process this update by checking the current
+  /// min_subscriber_topic_version included in each state store heartbeat.
+  ///
+  /// 'query_options' is used for running debug actions.
   Status ProcessCatalogUpdateResult(const TCatalogUpdateResult& catalog_update_result,
-      bool wait_for_all_subscribers) WARN_UNUSED_RESULT;
+      bool wait_for_all_subscribers, const TQueryOptions& query_options)
+      WARN_UNUSED_RESULT;
 
   /// Wait until the catalog update with version 'catalog_update_version' is
   /// received and applied in the local catalog cache or until the catalog
@@ -1253,6 +1256,9 @@ class ImpalaServer : public ImpalaServiceIf,
   Status DecompressToProfile(TRuntimeProfileFormat::type format,
       std::shared_ptr<QueryStateRecord> query_record, RuntimeProfileOutput* profile);
 
+  void WaitForNewCatalogServiceId(TUniqueId cur_service_id,
+      std::unique_lock<std::mutex>* ver_lock);
+
   /// Logger for writing encoded query profiles, one per line with the following format:
   /// <ms-since-epoch> <query-id> <thrift query profile URL encoded and gzipped>
   boost::scoped_ptr<SimpleLogger> profile_logger_;
@@ -1549,6 +1555,7 @@ class ImpalaServer : public ImpalaServiceIf,
       catalog_topic_version(0L),
       catalog_object_version_lower_bound(0L) {
     }
+
     /// Update the metrics to store the current version of catalog, current topic and
     /// current service id used by impalad.
     void  UpdateCatalogVersionMetrics();
diff --git a/fe/src/main/java/org/apache/impala/catalog/ImpaladCatalog.java b/fe/src/main/java/org/apache/impala/catalog/ImpaladCatalog.java
index 2d389f8ef..430736a07 100644
--- a/fe/src/main/java/org/apache/impala/catalog/ImpaladCatalog.java
+++ b/fe/src/main/java/org/apache/impala/catalog/ImpaladCatalog.java
@@ -169,15 +169,17 @@ public class ImpaladCatalog extends Catalog implements FeCatalog {
   /**
    * Update the catalog service Id. Trigger a full update if the service ID changes.
    */
-  private void setCatalogServiceId(TUniqueId catalog_service_id) throws CatalogException {
+  private void setCatalogServiceId(TUniqueId catalogServiceId) throws CatalogException {
     // Check for changes in the catalog service ID.
-    if (!catalogServiceId_.equals(catalog_service_id)) {
+    if (!catalogServiceId_.equals(catalogServiceId)) {
       boolean firstRun = catalogServiceId_.equals(INITIAL_CATALOG_SERVICE_ID);
-      catalogServiceId_ = catalog_service_id;
+      TUniqueId oldCatalogServiceId = catalogServiceId_;
+      catalogServiceId_ = catalogServiceId;
       if (!firstRun) {
         // Throw an exception which will trigger a full topic update request.
-        throw new CatalogException("Detected catalog service ID change. Aborting " +
-            "updateCatalog()");
+        throw new CatalogException("Detected catalog service ID change from " +
+            TUniqueIdUtil.PrintId(oldCatalogServiceId) + " to " +
+            TUniqueIdUtil.PrintId(catalogServiceId) + ". Aborting updateCatalog()");
       }
     }
   }
diff --git a/tests/custom_cluster/test_restart_services.py b/tests/custom_cluster/test_restart_services.py
index 1e1dae001..56533b2c8 100644
--- a/tests/custom_cluster/test_restart_services.py
+++ b/tests/custom_cluster/test_restart_services.py
@@ -176,7 +176,7 @@ class TestRestart(CustomClusterTestSuite):
     self.execute_query_expect_success(self.client, "invalidate metadata")
 
     # No need to care whether the dll is executed successfully, it is just to make
-    # the local catalog catche of impalad out of sync
+    # the local catalog cache of impalad out of sync
     for i in range(0, 10):
       try:
         query = "alter table join_aa add columns (age" + str(i) + " int)"
@@ -191,6 +191,124 @@ class TestRestart(CustomClusterTestSuite):
     self.execute_query_expect_success(self.client, "select name from join_aa")
     self.execute_query_expect_success(self.client, "drop table join_aa")
 
+  WAIT_FOR_CATALOG_UPDATE_TIMEOUT_SEC = 5
+
+  @pytest.mark.execute_serially
+  @CustomClusterTestSuite.with_args(cluster_size=1,
+    statestored_args="--statestore_update_frequency_ms=2000",
+    impalad_args=("--wait_for_new_catalog_service_id_timeout_sec={} \
+                  --wait_for_new_catalog_service_id_max_iterations=-1"
+                  .format(WAIT_FOR_CATALOG_UPDATE_TIMEOUT_SEC)))
+  def test_restart_catalogd_while_handling_rpc_response_with_timeout(self):
+    """Regression test for IMPALA-12267. We'd like to cause a situation where
+         - The coordinator issues a DDL or DML query
+         - Catalogd sends a response RPC
+         - Catalogd is restarted and gets a new catalog service ID
+         - The coordinator receives the update about the new catalogd from the statestore
+           before processing the RPC from the old catalogd.
+    Before IMPALA-12267 the coordinator hung indefinitely in this situation, waiting for
+    a statestore update with a new catalog service ID on the assumption that its own
+    service ID was stale, when in fact it already had the most recent one."""
+    self.execute_query_expect_success(self.client, "drop table if exists join_aa")
+    self.execute_query_expect_success(self.client, "create table join_aa(id int)")
+    # Make the catalog object version grow large enough
+    self.execute_query_expect_success(self.client, "invalidate metadata")
+
+    debug_action_sleep_time_sec = 10
+    DEBUG_ACTION = ("WAIT_BEFORE_PROCESSING_CATALOG_UPDATE:SLEEP@{}"
+                    .format(debug_action_sleep_time_sec * 1000))
+
+    query = "alter table join_aa add columns (age" + " int)"
+    handle = self.execute_query_async(query, query_options={"debug_action": DEBUG_ACTION})
+
+    # Wait a bit so the RPC from the catalogd arrives at the coordinator.
+    time.sleep(0.5)
+
+    self.cluster.catalogd.restart()
+
+    # Wait for the query to finish.
+    max_wait_time = (debug_action_sleep_time_sec
+        + self.WAIT_FOR_CATALOG_UPDATE_TIMEOUT_SEC + 10)
+    self.wait_for_state(handle, self.client.QUERY_STATES["FINISHED"], max_wait_time)
+
+    self.assert_impalad_log_contains("WARNING",
+        "Waiting for catalog update with a new catalog service ID timed out.")
+    self.assert_impalad_log_contains("WARNING",
+        "Ignoring catalog update result of catalog service ID")
+
+    self.execute_query_expect_success(self.client, "select age from join_aa")
+
+    self.execute_query_expect_success(self.client,
+        "alter table join_aa add columns (name string)")
+    self.execute_query_expect_success(self.client, "select name from join_aa")
+    self.execute_query_expect_success(self.client, "drop table join_aa")
+
+  WAIT_FOR_CATALOG_UPDATE_MAX_ITERATIONS = 3
+  STATESTORE_UPDATE_FREQ_SEC = 2
+
+  @pytest.mark.execute_serially
+  @CustomClusterTestSuite.with_args(cluster_size=1,
+    statestored_args="--statestore_update_frequency_ms={}".format(
+        STATESTORE_UPDATE_FREQ_SEC * 1000),
+    impalad_args=("--wait_for_new_catalog_service_id_timeout_sec=-1 \
+                  --wait_for_new_catalog_service_id_max_iterations={}"
+                  .format(WAIT_FOR_CATALOG_UPDATE_MAX_ITERATIONS)))
+  def test_restart_catalogd_while_handling_rpc_response_with_max_iters(self):
+    """We create the same situation as described in
+    'test_restart_catalogd_while_handling_rpc_response_with_timeout()' but we get out of
+    it not by timing out but by giving up waiting after receiving
+    'WAIT_FOR_CATALOG_UPDATE_MAX_ITERATIONS' updates from the statestore that don't change
+    the catalog service ID."""
+    self.execute_query_expect_success(self.client, "drop table if exists join_aa")
+    self.execute_query_expect_success(self.client, "create table join_aa(id int)")
+    # Make the catalog object version grow large enough
+    self.execute_query_expect_success(self.client, "invalidate metadata")
+
+    debug_action_sleep_time_sec = 10
+    DEBUG_ACTION = ("WAIT_BEFORE_PROCESSING_CATALOG_UPDATE:SLEEP@{}"
+                    .format(debug_action_sleep_time_sec * 1000))
+
+    query = "alter table join_aa add columns (age" + " int)"
+    handle = self.execute_query_async(query, query_options={"debug_action": DEBUG_ACTION})
+
+    # Wait a bit so the RPC from the catalogd arrives at the coordinator.
+    time.sleep(0.5)
+
+    self.cluster.catalogd.restart()
+
+    # Sleep until the coordinator is done with the debug action sleep and it starts
+    # waiting for catalog updates.
+    time.sleep(debug_action_sleep_time_sec + 0.5)
+
+    # Issue DDL queries so that the coordinator receives catalog updates.
+    for i in range(self.WAIT_FOR_CATALOG_UPDATE_MAX_ITERATIONS):
+      try:
+        query = "alter table join_aa add columns (age" + str(i) + " int)"
+        self.execute_query_async(query)
+        time.sleep(self.STATESTORE_UPDATE_FREQ_SEC)
+      except Exception as e:
+        LOG.info(str(e))
+
+    # Wait for the query to finish.
+    max_wait_time = 10
+    self.wait_for_state(handle, self.client.QUERY_STATES["FINISHED"], max_wait_time)
+
+    expected_log_msg = "Received {} non-empty catalog updates from the statestore " \
+        "while waiting for an update with a new catalog service ID but the catalog " \
+        "service ID has not changed. Giving up waiting.".format(
+            self.WAIT_FOR_CATALOG_UPDATE_MAX_ITERATIONS)
+
+    self.assert_impalad_log_contains("INFO", expected_log_msg)
+    self.assert_impalad_log_contains("WARNING",
+        "Ignoring catalog update result of catalog service ID")
+
+    self.execute_query_expect_success(self.client, "select age from join_aa")
+
+    self.execute_query_expect_success(self.client,
+        "alter table join_aa add columns (name string)")
+    self.execute_query_expect_success(self.client, "select name from join_aa")
+    self.execute_query_expect_success(self.client, "drop table join_aa")
+
   @pytest.mark.execute_serially
   @CustomClusterTestSuite.with_args(
     statestored_args="--statestore_update_frequency_ms=5000")