You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by st...@apache.org on 2024/01/05 14:01:22 UTC

(impala) branch master updated: IMPALA-12670: getIfPresent should throw the cause of error

This is an automated email from the ASF dual-hosted git repository.

stigahuang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git


The following commit(s) were added to refs/heads/master by this push:
     new 5f434e846 IMPALA-12670: getIfPresent should throw the cause of error
5f434e846 is described below

commit 5f434e84678e4401c26172a7121c8e3c70ab664f
Author: stiga-huang <hu...@gmail.com>
AuthorDate: Thu Jan 4 16:01:36 2024 +0800

    IMPALA-12670: getIfPresent should throw the cause of error
    
    CatalogdMetaProvider maintains a map (a Guava cache) as its local
    catalog cache. It has a piggyback mechanism for loading metadata from
    catalogd: when concurrent threads want to load the same content
    (identified by the same key) from catalogd, only one of them actually
    sends the request and loads the result into the cache. Other threads
    wait and get the result when the work is done.
    
    The piggyback mechanism is implemented by putting a Future object as the
    value when the key doesn't exist in the cache. The Future object handles
    the loading. Other threads that want the same value just invoke
    Future.get() to wait. See more in the comments in loadWithCaching().
    
    If there are any errors thrown in the loading process, Future.get() will
    encapsulate the error into an ExecutionException and throw it instead.
    The cause could be an InconsistentMetadataFetchException which indicates
    FE should retry the planning. It's handled in Frontend#getTExecRequest().
    
    In loadWithCaching(), we try to throw the cause of the exception thrown
    from Future.get(), so the InconsistentMetadataFetchException can be
    handled as expected. However, in getIfPresent(), the error handling is
    inconsistent: it tries to throw the current exception (the
    ExecutionException wrapper) instead of its cause. As a result, retriable
    failures can't be retried. Note that this is an existing bug, but it
    became easier to hit after IMPALA-11501 because getIfPresent() is now
    used in LocalDb#getTableIfCached(), which is used in many places.
    
    This patch fixes getIfPresent() to use the Future object (including
    error handling) with the same logic as loadWithCaching(). It also adds
    more logging on both the catalogd and impalad sides when the lookup
    status is abnormal.
    
    In order to test the loading error more easily, this patch adds a hidden
    flag, inject_failure_ratio_in_catalog_fetch, to randomly inject
    retriable errors.
    
    Tests
     - Ran test_local_catalog_ddls_with_invalidate_metadata 700 times.
     - Added an e2e test that fails easily without this fix.
    
    Change-Id: I74268ba2bb700988107780e13ffbdbb4c767d09d
    Reviewed-on: http://gerrit.cloudera.org:8080/20853
    Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 be/src/exec/catalog-op-executor.cc                 |  8 +++++
 .../impala/catalog/CatalogServiceCatalog.java      | 20 ++++++++-----
 .../impala/catalog/local/CatalogdMetaProvider.java | 14 +++++----
 tests/custom_cluster/test_local_catalog.py         | 34 ++++++++++++++++++++++
 4 files changed, 64 insertions(+), 12 deletions(-)

diff --git a/be/src/exec/catalog-op-executor.cc b/be/src/exec/catalog-op-executor.cc
index 074d828e3..4039a31f9 100644
--- a/be/src/exec/catalog-op-executor.cc
+++ b/be/src/exec/catalog-op-executor.cc
@@ -57,6 +57,10 @@ DEFINE_int32_hidden(inject_latency_before_catalog_fetch_ms, 0,
     "Latency (ms) to be injected before fetching catalog data from the catalogd");
 DEFINE_int32_hidden(inject_latency_after_catalog_fetch_ms, 0,
     "Latency (ms) to be injected after fetching catalog data from the catalogd");
+DEFINE_double_hidden(inject_failure_ratio_in_catalog_fetch, -1,
+    "Ratio to randomly fail the GetPartialCatalogObject RPC with TABLE_NOT_LOADED "
+    "status. 0.1 means fail it in a possibility of 10%. Negative values disable this. "
+    "values >= 1 will always make the RPC fail");
 
 /// Used purely for debug actions. The DEBUG_ACTION is only executed on the first RPC
 /// attempt.
@@ -377,6 +381,10 @@ Status CatalogOpExecutor::GetPartialCatalogObject(
   if (FLAGS_inject_latency_after_catalog_fetch_ms > 0) {
     SleepForMs(FLAGS_inject_latency_after_catalog_fetch_ms);
   }
+  if (FLAGS_inject_failure_ratio_in_catalog_fetch > 0
+      && rand() < FLAGS_inject_failure_ratio_in_catalog_fetch * (RAND_MAX + 1L)) {
+    resp->lookup_status = CatalogLookupStatus::TABLE_NOT_LOADED;
+  }
   return Status::OK();
 }
 
diff --git a/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java b/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java
index 6e6fae5ae..5ffe33e93 100644
--- a/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java
+++ b/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java
@@ -3646,7 +3646,8 @@ public class CatalogServiceCatalog extends Catalog {
       try {
         Db db = getDb(dbDesc.getDb_name());
         if (db == null) {
-          return createGetPartialCatalogObjectError(CatalogLookupStatus.DB_NOT_FOUND);
+          return createGetPartialCatalogObjectError(req,
+              CatalogLookupStatus.DB_NOT_FOUND);
         }
 
         return db.getPartialInfo(req);
@@ -3672,14 +3673,16 @@ public class CatalogServiceCatalog extends Catalog {
             objectDesc.getTable().getDb_name(), objectDesc.getTable().getTbl_name(),
             tableLoadReason, writeIdList, tableId);
       } catch (DatabaseNotFoundException e) {
-        return createGetPartialCatalogObjectError(CatalogLookupStatus.DB_NOT_FOUND);
+        return createGetPartialCatalogObjectError(req, CatalogLookupStatus.DB_NOT_FOUND);
       }
       if (table == null) {
-        return createGetPartialCatalogObjectError(CatalogLookupStatus.TABLE_NOT_FOUND);
+        return createGetPartialCatalogObjectError(req,
+            CatalogLookupStatus.TABLE_NOT_FOUND);
       } else if (!table.isLoaded()) {
         // Table can still remain in an incomplete state if there was a concurrent
         // invalidate request.
-        return createGetPartialCatalogObjectError(CatalogLookupStatus.TABLE_NOT_LOADED);
+        return createGetPartialCatalogObjectError(req,
+            CatalogLookupStatus.TABLE_NOT_LOADED);
       }
       Map<HdfsPartition, TPartialPartitionInfo> missingPartialInfos;
       TGetPartialCatalogObjectResponse resp;
@@ -3706,12 +3709,13 @@ public class CatalogServiceCatalog extends Catalog {
       try {
         Db db = getDb(objectDesc.fn.name.db_name);
         if (db == null) {
-          return createGetPartialCatalogObjectError(CatalogLookupStatus.DB_NOT_FOUND);
+          return createGetPartialCatalogObjectError(req,
+              CatalogLookupStatus.DB_NOT_FOUND);
         }
 
         List<Function> funcs = db.getFunctions(objectDesc.fn.name.function_name);
         if (funcs.isEmpty()) {
-          return createGetPartialCatalogObjectError(
+          return createGetPartialCatalogObjectError(req,
               CatalogLookupStatus.FUNCTION_NOT_FOUND);
         }
         TGetPartialCatalogObjectResponse resp = new TGetPartialCatalogObjectResponse();
@@ -3854,9 +3858,11 @@ public class CatalogServiceCatalog extends Catalog {
   }
 
   private static TGetPartialCatalogObjectResponse createGetPartialCatalogObjectError(
-      CatalogLookupStatus status) {
+      TGetPartialCatalogObjectRequest req, CatalogLookupStatus status) {
     TGetPartialCatalogObjectResponse resp = new TGetPartialCatalogObjectResponse();
     resp.setLookup_status(status);
+    LOG.warn("Fetching {} failed: {}. Could not find {}", req.object_desc.type,
+        status, req.object_desc);
     return resp;
   }
 
diff --git a/fe/src/main/java/org/apache/impala/catalog/local/CatalogdMetaProvider.java b/fe/src/main/java/org/apache/impala/catalog/local/CatalogdMetaProvider.java
index 2a7b611b1..fb80adb2b 100644
--- a/fe/src/main/java/org/apache/impala/catalog/local/CatalogdMetaProvider.java
+++ b/fe/src/main/java/org/apache/impala/catalog/local/CatalogdMetaProvider.java
@@ -464,8 +464,8 @@ public class CatalogdMetaProvider implements MetaProvider {
       case DATA_SOURCE_NOT_FOUND:
         invalidateCacheForObject(req.object_desc);
         throw new InconsistentMetadataFetchException(
-            String.format("Fetching %s failed. Could not find %s",
-                req.object_desc.type.name(), req.object_desc.toString()));
+            String.format("Fetching %s failed: %s. Could not find %s",
+                req.object_desc.type, resp.lookup_status, req.object_desc));
       default: break;
     }
     Preconditions.checkState(resp.lookup_status == CatalogLookupStatus.OK);
@@ -897,9 +897,13 @@ public class CatalogdMetaProvider implements MetaProvider {
     if (existing == null) return null;
     if (!(existing instanceof Future)) return existing;
     try {
-      return ((Future<Object>)existing).get();
-    } catch (InterruptedException | ExecutionException e) {
-      Throwables.propagateIfPossible(e, TException.class);
+      return Uninterruptibles.getUninterruptibly((Future<Object>) existing);
+    } catch (ExecutionException e) {
+      // Try throwing the cause which is the error in the loading thread.
+      // This is consistent with the error handling in loadWithCaching().
+      // So failures like InconsistentMetadataFetchException can be caught
+      // and retry in Frontend#getTExecRequest().
+      Throwables.propagateIfPossible(e.getCause(), TException.class);
       throw new RuntimeException(e);
     }
   }
diff --git a/tests/custom_cluster/test_local_catalog.py b/tests/custom_cluster/test_local_catalog.py
index 349958a9a..d958cd765 100644
--- a/tests/custom_cluster/test_local_catalog.py
+++ b/tests/custom_cluster/test_local_catalog.py
@@ -449,6 +449,40 @@ class TestLocalCatalogRetries(CustomClusterTestSuite):
     for i in t.imap_unordered(do_table, range(NUM_ITERS)):
       pass
 
+  @CustomClusterTestSuite.with_args(
+      impalad_args="--use_local_catalog=true "
+                   "--inject_failure_ratio_in_catalog_fetch=0.1 "
+                   "--inject_latency_after_catalog_fetch_ms=100",
+      catalogd_args="--catalog_topic_mode=minimal")
+  def test_fetch_metadata_retry_in_piggybacked_failures(self, unique_database):
+    test_self = self
+
+    class ThreadLocalClient(threading.local):
+      def __init__(self):
+        self.c = test_self.create_impala_client()
+
+    NUM_THREADS = 8
+    t = ThreadPool(processes=NUM_THREADS)
+    tls = ThreadLocalClient()
+
+    self.execute_query(
+        "create table {0}.tbl (i int) partitioned by (p int)".format(unique_database))
+    self.execute_query(
+        "insert into {0}.tbl partition(p) values (0,0)".format(unique_database))
+
+    def read_part(i):
+      self.execute_query_expect_success(
+          tls.c, "select * from {0}.tbl where p=0".format(unique_database))
+
+    # Prior to fixing IMPALA-12670, this test would fail within 20 iterations,
+    # so 100 should be quite reliable as a regression test.
+    NUM_ITERS = 100
+    for k in range(NUM_ITERS):
+      # Read the same partition in concurrent queries so requests can be piggybacked.
+      for i in t.imap_unordered(read_part, range(NUM_THREADS)):
+        pass
+      # Refresh to invalidate the partition in local catalog cache
+      self.execute_query("refresh {0}.tbl partition(p=0)".format(unique_database))
 
 class TestLocalCatalogObservability(CustomClusterTestSuite):
   def get_catalog_cache_metrics(self, impalad):