You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by st...@apache.org on 2022/11/15 01:16:55 UTC

[impala] 01/02: IMPALA-11721: Impala query keep being retried over frequently updated iceberg table

This is an automated email from the ASF dual-hosted git repository.

stigahuang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit af46585801387a41153d4a957336d6f0101d09f7
Author: Zoltan Borok-Nagy <bo...@cloudera.com>
AuthorDate: Fri Nov 11 16:11:14 2022 +0100

    IMPALA-11721: Impala query keep being retried over frequently updated iceberg table
    
    Iceberg table loading can fail in local catalog mode if the table gets
    updated frequently. This is what happens during table loading in local
    catalog mode: every query starts with its own empty local catalog.
    Table metadata is fetched in multiple requests via a MetaProvider, which
    is always a CatalogdMetaProvider. CatalogdMetaProvider caches requests,
    and the cache key also includes the table's catalog version.
    
    The Iceberg table is loaded by the following requests:
    
    1 CatalogdMetaProvider.loadTable()
    2 CatalogdMetaProvider.loadIcebergTable()
    3 CatalogdMetaProvider.loadIcebergApiTable() # This actually directly
                                                 # loads the Iceberg table
                                                 # via Iceberg API
                                                 # (no CatalogD involved)
    4 CatalogdMetaProvider.loadTableColumnStatistics()
    5 CatalogdMetaProvider.loadPartitionList()
    6 CatalogdMetaProvider.loadPartitionsByRefs()
    
    Steps 1-4 happen during table loading; steps 5-6 happen during
    planning. We cannot really reorder these invocations, but since
    CatalogdMetaProvider caches their results, only the very first
    invocations need to reach out to CatalogD and check the table's catalog
    version. Subsequent invocations, i.e. subsequent queries that use the
    Iceberg table, can use the cached metadata, and there is no need to
    check the catalog version of the cached metadata: the cache key also
    includes the catalog version, so whatever is found in the cache belongs
    to that version.
    
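    The problem lies in those first, uncached invocations: step 3 does not
    go through CatalogD, so if CatalogD refreshes the Iceberg table while
    loadIcebergApiTable() is running, the subsequent load*() requests via
    the metaprovider fail with InconsistentMetadataFetchException and the
    query has to be retried. If the table is updated frequently, this can
    happen again on every retry.
    
    This patch resolves the issue by pre-warming the metaprovider's cache
    (column statistics and partitions) before issuing loadIcebergApiTable(),
    so the later CatalogdMetaProvider.load*() operations can be served from
    the cache.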
    
    So what happens when the metaprovider's cache gets invalidated due to
    concurrent updates to the table while we are still processing the
    query? That is not a problem: only the top-level TableCacheKey gets
    invalidated. The cache will still be able to answer the fine-grained
    load requests that are keyed by the now outdated catalog version. E.g.
    ColStatsCacheKey hashes the db name, table name, catalog version, and
    column name into a key in the cache. Therefore the current query can be
    finished using a consistent state of the metadata. Subsequent queries
    will use a newer version of the table.
    
    Testing:
     * modified test_insert_stress.py so that it no longer tolerates
       InconsistentMetadataFetchException errors (the Frontend already
       tolerates them to some degree by retrying the query)
    
    Change-Id: Iac28224b2b6d67725eeb17f3e9d813ba622edb43
    Reviewed-on: http://gerrit.cloudera.org:8080/19234
    Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 .../impala/catalog/local/LocalIcebergTable.java    | 42 ++++++++++++++++++----
 tests/stress/test_insert_stress.py                 | 26 ++++----------
 2 files changed, 42 insertions(+), 26 deletions(-)

diff --git a/fe/src/main/java/org/apache/impala/catalog/local/LocalIcebergTable.java b/fe/src/main/java/org/apache/impala/catalog/local/LocalIcebergTable.java
index 1813ce00e..219da5e85 100644
--- a/fe/src/main/java/org/apache/impala/catalog/local/LocalIcebergTable.java
+++ b/fe/src/main/java/org/apache/impala/catalog/local/LocalIcebergTable.java
@@ -17,6 +17,7 @@
 
 package org.apache.impala.catalog.local;
 
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.List;
@@ -33,8 +34,8 @@ import org.apache.impala.catalog.FeFsPartition;
 import org.apache.impala.catalog.FeFsTable;
 import org.apache.impala.catalog.FeIcebergTable;
 import org.apache.impala.catalog.IcebergContentFileStore;
-import org.apache.impala.catalog.HdfsPartition.FileDescriptor;
 import org.apache.impala.catalog.TableLoadingException;
+import org.apache.impala.catalog.local.MetaProvider.TableMetaRef;
 import org.apache.impala.thrift.TCompressionCodec;
 import org.apache.impala.thrift.THdfsPartition;
 import org.apache.impala.thrift.THdfsTable;
@@ -87,6 +88,8 @@ public class LocalIcebergTable extends LocalTable implements FeIcebergTable {
       TableParams tableParams = new TableParams(msTable);
       TPartialTableInfo tableInfo = db.getCatalog().getMetaProvider()
           .loadIcebergTable(ref);
+      LocalFsTable fsTable = LocalFsTable.load(db, msTable, ref);
+      warmupMetaProviderCache(db, msTable, ref, fsTable);
       org.apache.iceberg.Table icebergApiTable = db.getCatalog().getMetaProvider()
           .loadIcebergApiTable(ref, tableParams, msTable);
       List<Column> iceColumns = IcebergSchemaConverter.convertToImpalaSchema(
@@ -96,8 +99,11 @@ public class LocalIcebergTable extends LocalTable implements FeIcebergTable {
           /*numClusteringCols=*/ 0,
           db.getName() + "." + msTable.getTableName(),
           /*isFullAcidSchema=*/false);
-      return new LocalIcebergTable(db, msTable, ref, colMap, tableInfo, tableParams,
-          icebergApiTable);
+      return new LocalIcebergTable(db, msTable, ref, fsTable, colMap, tableInfo,
+          tableParams, icebergApiTable);
+    } catch (InconsistentMetadataFetchException e) {
+      // Just rethrow this so the query can be retried by the Frontend.
+      throw e;
     } catch (Exception e) {
       String fullTableName = msTable.getDbName() + "." + msTable.getTableName();
       throw new TableLoadingException(
@@ -105,13 +111,37 @@ public class LocalIcebergTable extends LocalTable implements FeIcebergTable {
     }
   }
 
+  /**
+   * Eagerly warmup metaprovider cache before calling loadIcebergApiTable(). So
+   * later they can be served from cache when really needed. Otherwise, if there are
+   * frequent updates to the table, CatalogD might refresh the Iceberg table during
+   * loadIcebergApiTable().
+   * If that happens, subsequent load*() calls via the metaprovider would
+   * fail due to InconsistentMetadataFetchException.
+   */
+  private static void warmupMetaProviderCache(LocalDb db, Table msTable, TableMetaRef ref,
+      LocalFsTable fsTable) throws Exception {
+    db.getCatalog().getMetaProvider().loadTableColumnStatistics(ref,
+        getHmsColumnNames(msTable));
+    FeCatalogUtils.loadAllPartitions(fsTable);
+  }
+
+  private static List<String> getHmsColumnNames(Table msTable) {
+    List<String> ret = new ArrayList<>();
+    for (FieldSchema fs : msTable.getSd().getCols()) {
+      ret.add(fs.getName());
+    }
+    return ret;
+  }
+
   private LocalIcebergTable(LocalDb db, Table msTable, MetaProvider.TableMetaRef ref,
-      ColumnMap cmap, TPartialTableInfo tableInfo, TableParams tableParams,
-      org.apache.iceberg.Table icebergApiTable) throws TableLoadingException {
+      LocalFsTable fsTable, ColumnMap cmap, TPartialTableInfo tableInfo,
+      TableParams tableParams, org.apache.iceberg.Table icebergApiTable)
+      throws TableLoadingException {
     super(db, msTable, ref, cmap);
 
     Preconditions.checkNotNull(tableInfo);
-    localFsTable_ = LocalFsTable.load(db, msTable, ref);
+    localFsTable_ = fsTable;
     tableParams_ = tableParams;
     fileStore_ = IcebergContentFileStore.fromThrift(
         tableInfo.getIceberg_table().getContent_files(),
diff --git a/tests/stress/test_insert_stress.py b/tests/stress/test_insert_stress.py
index 5caf3db7c..fe43a38c9 100644
--- a/tests/stress/test_insert_stress.py
+++ b/tests/stress/test_insert_stress.py
@@ -48,16 +48,9 @@ class TestInsertStress(ImpalaTestSuite):
     try:
       insert_cnt = 0
       while insert_cnt < num_inserts:
-        try:
-          impalad_client.execute("insert into table %s values (%i, %i)" % (
-              tbl_name, wid, insert_cnt))
-          insert_cnt += 1
-        except Exception as e:
-          # It's possible that the Iceberg table is concurrently updated in CatalogD
-          # during data load in local catalog.
-          if "InconsistentMetadataFetchException" in str(e):
-            continue
-          raise e
+        impalad_client.execute("insert into table %s values (%i, %i)" % (
+            tbl_name, wid, insert_cnt))
+        insert_cnt += 1
     finally:
       with counter.get_lock():
         counter.value += 1
@@ -79,16 +72,9 @@ class TestInsertStress(ImpalaTestSuite):
     impalad_client = ImpalaTestSuite.create_client_for_nth_impalad(target_impalad)
     try:
       while counter.value != writers:
-        try:
-          result = impalad_client.execute("select * from %s" % tbl_name)
-          verify_result_set(result)
-          time.sleep(random.random())
-        except Exception as e:
-          # It's possible that the Iceberg table is concurrently updated in CatalogD
-          # during data load in local catalog.
-          if "InconsistentMetadataFetchException" in str(e):
-            continue
-          raise e
+        result = impalad_client.execute("select * from %s" % tbl_name)
+        verify_result_set(result)
+        time.sleep(random.random())
     finally:
       impalad_client.close()