Posted to commits@impala.apache.org by st...@apache.org on 2021/09/27 09:51:49 UTC

[impala] 06/07: IMPALA-5476: Fix catalogd restart brings stale metadata

This is an automated email from the ASF dual-hosted git repository.

stigahuang pushed a commit to branch branch-4.0.1
in repository https://gitbox.apache.org/repos/asf/impala.git

commit eb595e5c27182a46b7cf110530c2e076b4f77081
Author: liuyao <54...@163.com>
AuthorDate: Sat Jul 3 19:04:58 2021 +0800

    IMPALA-5476: Fix catalogd restart brings stale metadata
    
    ImpaladCatalog#updateCatalog() doesn't trigger a full topic update
    request when it detects a catalogServiceId change. It just updates the
    local catalogServiceId and throws an exception to abort applying the
    DDL/DML results. This causes a problem when catalogd is restarted and
    the DDL/DML is executed on the restarted instance. In this case, only
    the local catalogServiceId is updated to the latest value; the local
    catalog remains stale. When handling subsequent updates from the
    statestore, the catalogServiceId always matches, so the updates are
    applied without exceptions. However, the catalog objects usually won't
    be updated since they have higher versions (from the old catalogd
    instance) than those in the update. This leaves the local catalog out
    of sync until the catalog version of the new catalogd grows large
    enough.
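
    Illustrative sketch only (not code from this patch): the impalad cache
    keeps whichever copy of a catalog object has the higher version,
    roughly as below, so deltas from a freshly restarted catalogd, whose
    versions start low again, are silently dropped. The struct and helper
    names here are hypothetical.

        #include <cstdint>

        struct CatalogObject { int64_t catalog_version; };

        // Hypothetical helper mirroring the "keep the higher version" rule.
        bool ShouldApplyUpdate(const CatalogObject& cached,
            const CatalogObject& incoming) {
          // Objects cached from the old catalogd instance carry large
          // versions, so objects from the restarted catalogd (with small
          // versions) fail this check and the stale cache entry is kept.
          return incoming.catalog_version > cached.catalog_version;
        }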
    
    Note that when handling catalog updates from the statestore, impalad
    requests a full topic update if the catalogServiceId doesn't match.
    See ImpalaServer::CatalogUpdateCallback() for details.
    
    This patch fixes the issue by checking the catalogServiceId before
    invoking the FE's UpdateCatalogCache(). If the local catalogServiceId
    doesn't match the one in the DDL/DML result, the thread waits until it
    changes. The next update from the statestore changes it and unblocks
    the DDL/DML thread.
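
    The waiting logic added to ImpalaServer::ProcessCatalogUpdateResult()
    has roughly the following shape (simplified from the diff below; error
    handling and logging are omitted):

        TUniqueId cur_service_id;
        {
          unique_lock<mutex> ver_lock(catalog_version_lock_);
          cur_service_id = catalog_update_info_.catalog_service_id;
          if (cur_service_id != catalog_service_id) {
            // Wait for the next statestore update to install a new service id.
            while (cur_service_id == catalog_update_info_.catalog_service_id) {
              catalog_version_update_cv_.Wait(ver_lock);
            }
            cur_service_id = catalog_update_info_.catalog_service_id;
          }
        }
        // The DDL/DML result is applied only if the service id now matches;
        // otherwise it is ignored (see the second case in the diff below).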
    
    Testing
    
    Add several tests in
    tests/custom_cluster/test_restart_services.py
    
    Change-Id: I9fe25f5a2a42fb432e306ef08ae35750c8f3c50c
    Reviewed-on: http://gerrit.cloudera.org:8080/17645
    Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 be/src/service/impala-server.cc               |  59 +++++++++++----
 tests/custom_cluster/test_restart_services.py | 104 ++++++++++++++++++++++++++
 2 files changed, 150 insertions(+), 13 deletions(-)

diff --git a/be/src/service/impala-server.cc b/be/src/service/impala-server.cc
index a48f7fe..df5541f 100644
--- a/be/src/service/impala-server.cc
+++ b/be/src/service/impala-server.cc
@@ -2126,19 +2126,52 @@ Status ImpalaServer::ProcessCatalogUpdateResult(
       WaitForCatalogUpdateTopicPropagation(catalog_service_id);
     }
   } else {
-    CatalogUpdateResultIterator callback_ctx(catalog_update_result);
-    TUpdateCatalogCacheRequest update_req;
-    update_req.__set_is_delta(true);
-    update_req.__set_native_iterator_ptr(reinterpret_cast<int64_t>(&callback_ctx));
-    // The catalog version is updated in WaitForCatalogUpdate below. So we need a
-    // standalone field in the request to update the service ID without touching the
-    // catalog version.
-    update_req.__set_catalog_service_id(catalog_update_result.catalog_service_id);
-    // Apply the changes to the local catalog cache.
-    TUpdateCatalogCacheResponse resp;
-    Status status = exec_env_->frontend()->UpdateCatalogCache(update_req, &resp);
-    if (!status.ok()) LOG(ERROR) << status.GetDetail();
-    RETURN_IF_ERROR(status);
+    TUniqueId cur_service_id;
+    {
+      unique_lock<mutex> ver_lock(catalog_version_lock_);
+      cur_service_id = catalog_update_info_.catalog_service_id;
+      if (catalog_update_info_.catalog_service_id != catalog_service_id) {
+        LOG(INFO) << "Catalog service ID mismatch. Current ID: "
+            << PrintId(cur_service_id) << ". ID in response: "
+            << PrintId(catalog_service_id) << ". Catalogd may have been restarted."
+            " Waiting for a new catalog update from statestore.";
+        // The catalog service ID has changed, so impalad will request a full topic
+        // update. Once impalad finishes applying that full update, this loop exits.
+        while (cur_service_id == catalog_update_info_.catalog_service_id) {
+          catalog_version_update_cv_.Wait(ver_lock);
+        }
+        cur_service_id = catalog_update_info_.catalog_service_id;
+      }
+    }
+
+    if (cur_service_id == catalog_service_id) {
+      CatalogUpdateResultIterator callback_ctx(catalog_update_result);
+      TUpdateCatalogCacheRequest update_req;
+      update_req.__set_is_delta(true);
+      update_req.__set_native_iterator_ptr(reinterpret_cast<int64_t>(&callback_ctx));
+      // The catalog version is updated in WaitForCatalogUpdate below. So we need a
+      // standalone field in the request to update the service ID without touching the
+      // catalog version.
+      update_req.__set_catalog_service_id(catalog_update_result.catalog_service_id);
+      // Apply the changes to the local catalog cache.
+      TUpdateCatalogCacheResponse resp;
+      Status status = exec_env_->frontend()->UpdateCatalogCache(update_req, &resp);
+      if (!status.ok()) LOG(ERROR) << status.GetDetail();
+      RETURN_IF_ERROR(status);
+    } else {
+      // We can't apply updates from another service id, because the local catalog is
+      // still inconsistent with the catalogd that executed the DDL. Catalogd may be
+      // restarted more than once within a single statestore update cycle, so
+      // 'cur_service_id' could belong to 1) a stale update from a previously restarted
+      // catalogd, or 2) a newer update from the next restarted catalogd. It's safe to
+      // ignore the DDL result in the second case. However, in the first case clients
+      // may see a stale catalog until the expected catalog topic update arrives.
+      // TODO: handle the first case in IMPALA-10875.
+      LOG(WARNING) << "Ignoring catalog update result of catalog service ID: "
+          << PrintId(catalog_service_id) << ". Current catalog service ID: "
+          << PrintId(cur_service_id) << ". Catalogd may have been restarted more"
+          " than once.";
+    }
     if (!wait_for_all_subscribers) return Status::OK();
     // Wait until we receive and process the catalog update that covers the effects
     // (catalog objects) of this operation.
diff --git a/tests/custom_cluster/test_restart_services.py b/tests/custom_cluster/test_restart_services.py
index 771c686..beccc27 100644
--- a/tests/custom_cluster/test_restart_services.py
+++ b/tests/custom_cluster/test_restart_services.py
@@ -163,6 +163,110 @@ class TestRestart(CustomClusterTestSuite):
     thread.join()
     self.wait_for_state(query_handle[0], QueryState.FINISHED, 30000)
 
+  @pytest.mark.execute_serially
+  @CustomClusterTestSuite.with_args(
+    statestored_args="--statestore_update_frequency_ms=5000")
+  def test_restart_catalogd(self):
+    self.execute_query_expect_success(self.client, "drop table if exists join_aa")
+    self.execute_query_expect_success(self.client, "create table join_aa(id int)")
+    # Make the catalog object version grow large enough
+    self.execute_query_expect_success(self.client, "invalidate metadata")
+
+    # It doesn't matter whether these DDLs succeed; they are only run to make
+    # the local catalog cache of this impalad go out of sync
+    for i in range(0, 10):
+      try:
+        query = "alter table join_aa add columns (age" + str(i) + " int)"
+        self.execute_query_async(query)
+      except Exception as e:
+        LOG.info(str(e))
+      if i == 5:
+        self.cluster.catalogd.restart()
+
+    self.execute_query_expect_success(self.client,
+        "alter table join_aa add columns (name string)")
+    self.execute_query_expect_success(self.client, "select name from join_aa")
+    self.execute_query_expect_success(self.client, "drop table join_aa")
+
+  @pytest.mark.execute_serially
+  @CustomClusterTestSuite.with_args(
+    statestored_args="--statestore_update_frequency_ms=5000")
+  def test_restart_catalogd_sync_ddl(self):
+    self.execute_query_expect_success(self.client, "drop table if exists join_aa")
+    self.execute_query_expect_success(self.client, "create table join_aa(id int)")
+    # Make the catalog object version grow large enough
+    self.execute_query_expect_success(self.client, "invalidate metadata")
+    query_options = {"sync_ddl": "true"}
+
+    # It doesn't matter whether these DDLs succeed; they are only run to make
+    # the local catalog cache of this impalad go out of sync
+    for i in range(0, 10):
+      try:
+        query = "alter table join_aa add columns (age" + str(i) + " int)"
+        self.execute_query_async(query, query_options)
+      except Exception as e:
+        LOG.info(str(e))
+      if i == 5:
+        self.cluster.catalogd.restart()
+
+    self.execute_query_expect_success(self.client,
+        "alter table join_aa add columns (name string)", query_options)
+    self.execute_query_expect_success(self.client, "select name from join_aa")
+    self.execute_query_expect_success(self.client, "drop table join_aa")
+
+  UPDATE_FREQUENCY_S = 10
+
+  @pytest.mark.execute_serially
+  @CustomClusterTestSuite.with_args(
+    statestored_args="--statestore_update_frequency_ms={frequency_ms}"
+    .format(frequency_ms=(UPDATE_FREQUENCY_S * 1000)))
+  def test_restart_catalogd_twice(self):
+    self.execute_query_expect_success(self.client, "drop table if exists join_aa")
+    self.cluster.catalogd.restart()
+    query = "create table join_aa(id int)"
+    query_handle = []
+
+    def execute_query_async():
+      query_handle.append(self.execute_query(query))
+
+    thread = threading.Thread(target=execute_query_async)
+    thread.start()
+    sleep(self.UPDATE_FREQUENCY_S - 5)
+    self.cluster.catalogd.restart()
+    thread.join()
+    self.execute_query_expect_success(self.client,
+        "alter table join_aa add columns (name string)")
+    self.execute_query_expect_success(self.client, "select name from join_aa")
+    self.execute_query_expect_success(self.client, "drop table join_aa")
+
+  @pytest.mark.execute_serially
+  @CustomClusterTestSuite.with_args(
+      impalad_args="--use_local_catalog=true",
+      catalogd_args="--catalog_topic_mode=minimal",
+      statestored_args="--statestore_update_frequency_ms=5000")
+  def test_restart_catalogd_with_local_catalog(self):
+    self.execute_query_expect_success(self.client, "drop table if exists join_aa")
+    self.execute_query_expect_success(self.client, "create table join_aa(id int)")
+    # Make the catalog object version grow large enough
+    self.execute_query_expect_success(self.client, "invalidate metadata")
+
+    # It doesn't matter whether these DDLs succeed; they are only run to make
+    # the local catalog cache of this impalad go out of sync
+    for i in range(0, 10):
+      try:
+        query = "alter table join_aa add columns (age" + str(i) + " int)"
+        self.execute_query_async(query)
+      except Exception as e:
+        LOG.info(str(e))
+      if i == 5:
+        self.cluster.catalogd.restart()
+
+    self.execute_query_expect_success(self.client,
+        "alter table join_aa add columns (name string)")
+    self.execute_query_expect_success(self.client, "select name from join_aa")
+    self.execute_query_expect_success(self.client, "select age0 from join_aa")
+    self.execute_query_expect_success(self.client, "drop table join_aa")
+
   SUBSCRIBER_TIMEOUT_S = 2
   CANCELLATION_GRACE_PERIOD_S = 5