Posted to commits@impala.apache.org by st...@apache.org on 2022/11/15 01:16:54 UTC

[impala] branch master updated (97a506c65 -> 86fdbfcc9)

This is an automated email from the ASF dual-hosted git repository.

stigahuang pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git


    from 97a506c65 IMPALA-11537: Query option validation for numeric types
     new af4658580 IMPALA-11721: Impala query keeps being retried over frequently updated Iceberg table
     new 86fdbfcc9 IMPALA-11720: Deflake FileMetadataLoaderTest due to FileSystem closed

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../impala/catalog/local/LocalIcebergTable.java    | 42 ++++++++++++++++++----
 .../org/apache/impala/common/FileSystemUtil.java   | 29 +++++++++------
 .../apache/impala/common/FileSystemUtilTest.java   |  6 ++--
 tests/stress/test_insert_stress.py                 | 26 ++++----------
 4 files changed, 63 insertions(+), 40 deletions(-)


[impala] 01/02: IMPALA-11721: Impala query keeps being retried over frequently updated Iceberg table

Posted by st...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

stigahuang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit af46585801387a41153d4a957336d6f0101d09f7
Author: Zoltan Borok-Nagy <bo...@cloudera.com>
AuthorDate: Fri Nov 11 16:11:14 2022 +0100

    IMPALA-11721: Impala query keeps being retried over frequently updated Iceberg table
    
    Iceberg table loading can fail in local catalog mode if the table gets
    updated frequently. This is what happens during table loading in local
    catalog mode: every query starts with its own empty local catalog.
    Table metadata is fetched in multiple requests via a MetaProvider, which
    is always a CatalogdMetaProvider. CatalogdMetaProvider caches these
    requests, and each cache key also includes the table's catalog version.
    
    The Iceberg table is loaded by the following requests:
    
    1 CatalogdMetaProvider.loadTable()
    2 CatalogdMetaProvider.loadIcebergTable()
    3 CatalogdMetaProvider.loadIcebergApiTable() # This actually directly
                                                 # loads the Iceberg table
                                                 # via Iceberg API
                                                 # (no CatalogD involved)
    4 CatalogdMetaProvider.loadTableColumnStatistics()
    5 CatalogdMetaProvider.loadPartitionList()
    6 CatalogdMetaProvider.loadPartitionsByRefs()
    
    Steps 1-4 happen during table loading, steps 5-6 during planning. We
    cannot really reorder these invocations, but since CatalogdMetaProvider
    caches them, only the very first invocation of each needs to reach out
    to CatalogD and check the table's catalog version. Subsequent
    invocations, i.e. subsequent queries that use the Iceberg table, can
    use the cached metadata. There is no need to re-check the catalog
    version of the cached metadata, because the cache key already includes
    that version, so any cache hit is guaranteed to be consistent with it.
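
    To illustrate the idea, here is a minimal sketch of a version-aware
    cache key (a hypothetical class in the style of the ColStatsCacheKey
    mentioned below, not the actual Impala code): keys built for the same
    catalog version are equal and share cached entries, while a table
    update produces a new version and therefore distinct keys.

      import java.util.Objects;

      // Hypothetical sketch; the real keys live inside CatalogdMetaProvider.
      final class ColStatsKey {
        final String dbName, tableName, columnName;
        final long catalogVersion;

        ColStatsKey(String db, String tbl, long version, String col) {
          dbName = db; tableName = tbl; catalogVersion = version; columnName = col;
        }

        @Override
        public boolean equals(Object o) {
          if (!(o instanceof ColStatsKey)) return false;
          ColStatsKey k = (ColStatsKey) o;
          return catalogVersion == k.catalogVersion && dbName.equals(k.dbName)
              && tableName.equals(k.tableName) && columnName.equals(k.columnName);
        }

        @Override
        public int hashCode() {
          return Objects.hash(dbName, tableName, catalogVersion, columnName);
        }
      }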
    
    This patch resolves the issue by pre-warming the metaprovider's cache
    before issuing loadIcebergApiTable() so the CatalogdMetaProvider.load*()
    operations can be served from cache.
    
    So what happens when the metaprovider's cache gets invalidated by
    concurrent updates to the table while we are still processing the
    query? No problem: only the top-level TableCacheKey gets invalidated.
    The cache can still answer the fine-grained load requests that are
    keyed by the now-outdated catalog version. E.g. ColStatsCacheKey
    hashes db name, table name, catalog version, and column name as its
    key in the cache. Therefore the current query can finish against a
    consistent state of the metadata, while subsequent queries will use
    a newer version of the table.
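
    A minimal demonstration of this behavior with a plain in-memory cache
    (a sketch using a Guava cache with made-up key shapes; the real cache
    is internal to CatalogdMetaProvider): invalidating the coarse
    table-level entry leaves the fine-grained, version-keyed entries
    untouched, so an in-flight query pinned to the old catalog version
    still gets cache hits.

      import com.google.common.cache.Cache;
      import com.google.common.cache.CacheBuilder;
      import java.util.Arrays;
      import java.util.List;

      public class StaleVersionReads {
        public static void main(String[] args) {
          Cache<List<Object>, Object> cache = CacheBuilder.newBuilder().build();
          // Fine-grained entry keyed by (db, table, catalogVersion, column).
          cache.put(Arrays.asList("db1", "ice_tbl", 7L, "col_a"), "stats@v7");
          // Coarse table-level entry keyed without a version.
          cache.put(Arrays.asList("db1", "ice_tbl"), "table@v7");
          // A concurrent update invalidates only the table-level key ...
          cache.invalidate(Arrays.asList("db1", "ice_tbl"));
          // ... while the in-flight query, still planning against version 7,
          // keeps hitting the version-keyed entry. Prints "stats@v7".
          System.out.println(cache.getIfPresent(
              Arrays.asList("db1", "ice_tbl", 7L, "col_a")));
        }
      }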
    
    Testing:
     * modified test_insert_stress.py so it no longer tolerates inconsistent
       metadata fetch exceptions (the Frontend already tolerates them
       to some degree)
    
    Change-Id: Iac28224b2b6d67725eeb17f3e9d813ba622edb43
    Reviewed-on: http://gerrit.cloudera.org:8080/19234
    Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 .../impala/catalog/local/LocalIcebergTable.java    | 42 ++++++++++++++++++----
 tests/stress/test_insert_stress.py                 | 26 ++++----------
 2 files changed, 42 insertions(+), 26 deletions(-)

diff --git a/fe/src/main/java/org/apache/impala/catalog/local/LocalIcebergTable.java b/fe/src/main/java/org/apache/impala/catalog/local/LocalIcebergTable.java
index 1813ce00e..219da5e85 100644
--- a/fe/src/main/java/org/apache/impala/catalog/local/LocalIcebergTable.java
+++ b/fe/src/main/java/org/apache/impala/catalog/local/LocalIcebergTable.java
@@ -17,6 +17,7 @@
 
 package org.apache.impala.catalog.local;
 
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.List;
@@ -33,8 +34,8 @@ import org.apache.impala.catalog.FeFsPartition;
 import org.apache.impala.catalog.FeFsTable;
 import org.apache.impala.catalog.FeIcebergTable;
 import org.apache.impala.catalog.IcebergContentFileStore;
-import org.apache.impala.catalog.HdfsPartition.FileDescriptor;
 import org.apache.impala.catalog.TableLoadingException;
+import org.apache.impala.catalog.local.MetaProvider.TableMetaRef;
 import org.apache.impala.thrift.TCompressionCodec;
 import org.apache.impala.thrift.THdfsPartition;
 import org.apache.impala.thrift.THdfsTable;
@@ -87,6 +88,8 @@ public class LocalIcebergTable extends LocalTable implements FeIcebergTable {
       TableParams tableParams = new TableParams(msTable);
       TPartialTableInfo tableInfo = db.getCatalog().getMetaProvider()
           .loadIcebergTable(ref);
+      LocalFsTable fsTable = LocalFsTable.load(db, msTable, ref);
+      warmupMetaProviderCache(db, msTable, ref, fsTable);
       org.apache.iceberg.Table icebergApiTable = db.getCatalog().getMetaProvider()
           .loadIcebergApiTable(ref, tableParams, msTable);
       List<Column> iceColumns = IcebergSchemaConverter.convertToImpalaSchema(
@@ -96,8 +99,11 @@ public class LocalIcebergTable extends LocalTable implements FeIcebergTable {
           /*numClusteringCols=*/ 0,
           db.getName() + "." + msTable.getTableName(),
           /*isFullAcidSchema=*/false);
-      return new LocalIcebergTable(db, msTable, ref, colMap, tableInfo, tableParams,
-          icebergApiTable);
+      return new LocalIcebergTable(db, msTable, ref, fsTable, colMap, tableInfo,
+          tableParams, icebergApiTable);
+    } catch (InconsistentMetadataFetchException e) {
+      // Just rethrow this so the query can be retried by the Frontend.
+      throw e;
     } catch (Exception e) {
       String fullTableName = msTable.getDbName() + "." + msTable.getTableName();
       throw new TableLoadingException(
@@ -105,13 +111,37 @@ public class LocalIcebergTable extends LocalTable implements FeIcebergTable {
     }
   }
 
+  /**
+   * Eagerly warm up the metaprovider cache before calling loadIcebergApiTable(),
+   * so later load*() requests can be served from the cache when really needed.
+   * Otherwise, if there are frequent updates to the table, CatalogD might refresh
+   * the Iceberg table during loadIcebergApiTable().
+   * If that happens, subsequent load*() calls via the metaprovider would
+   * fail due to InconsistentMetadataFetchException.
+   */
+  private static void warmupMetaProviderCache(LocalDb db, Table msTable, TableMetaRef ref,
+      LocalFsTable fsTable) throws Exception {
+    db.getCatalog().getMetaProvider().loadTableColumnStatistics(ref,
+        getHmsColumnNames(msTable));
+    FeCatalogUtils.loadAllPartitions(fsTable);
+  }
+
+  private static List<String> getHmsColumnNames(Table msTable) {
+    List<String> ret = new ArrayList<>();
+    for (FieldSchema fs : msTable.getSd().getCols()) {
+      ret.add(fs.getName());
+    }
+    return ret;
+  }
+
   private LocalIcebergTable(LocalDb db, Table msTable, MetaProvider.TableMetaRef ref,
-      ColumnMap cmap, TPartialTableInfo tableInfo, TableParams tableParams,
-      org.apache.iceberg.Table icebergApiTable) throws TableLoadingException {
+      LocalFsTable fsTable, ColumnMap cmap, TPartialTableInfo tableInfo,
+      TableParams tableParams, org.apache.iceberg.Table icebergApiTable)
+      throws TableLoadingException {
     super(db, msTable, ref, cmap);
 
     Preconditions.checkNotNull(tableInfo);
-    localFsTable_ = LocalFsTable.load(db, msTable, ref);
+    localFsTable_ = fsTable;
     tableParams_ = tableParams;
     fileStore_ = IcebergContentFileStore.fromThrift(
         tableInfo.getIceberg_table().getContent_files(),
diff --git a/tests/stress/test_insert_stress.py b/tests/stress/test_insert_stress.py
index 5caf3db7c..fe43a38c9 100644
--- a/tests/stress/test_insert_stress.py
+++ b/tests/stress/test_insert_stress.py
@@ -48,16 +48,9 @@ class TestInsertStress(ImpalaTestSuite):
     try:
       insert_cnt = 0
       while insert_cnt < num_inserts:
-        try:
-          impalad_client.execute("insert into table %s values (%i, %i)" % (
-              tbl_name, wid, insert_cnt))
-          insert_cnt += 1
-        except Exception as e:
-          # It's possible that the Iceberg table is concurrently updated in CatalogD
-          # during data load in local catalog.
-          if "InconsistentMetadataFetchException" in str(e):
-            continue
-          raise e
+        impalad_client.execute("insert into table %s values (%i, %i)" % (
+            tbl_name, wid, insert_cnt))
+        insert_cnt += 1
     finally:
       with counter.get_lock():
         counter.value += 1
@@ -79,16 +72,9 @@ class TestInsertStress(ImpalaTestSuite):
     impalad_client = ImpalaTestSuite.create_client_for_nth_impalad(target_impalad)
     try:
       while counter.value != writers:
-        try:
-          result = impalad_client.execute("select * from %s" % tbl_name)
-          verify_result_set(result)
-          time.sleep(random.random())
-        except Exception as e:
-          # It's possible that the Iceberg table is concurrently updated in CatalogD
-          # during data load in local catalog.
-          if "InconsistentMetadataFetchException" in str(e):
-            continue
-          raise e
+        result = impalad_client.execute("select * from %s" % tbl_name)
+        verify_result_set(result)
+        time.sleep(random.random())
     finally:
       impalad_client.close()
 


[impala] 02/02: IMPALA-11720: Deflake FileMetadataLoaderTest due to FileSystem closed

Posted by st...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

stigahuang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 86fdbfcc995a22a5836c6625fd5c86080fa7764a
Author: stiga-huang <hu...@gmail.com>
AuthorDate: Fri Nov 11 19:41:55 2022 +0800

    IMPALA-11720: Deflake FileMetadataLoaderTest due to FileSystem closed
    
    IMPALA-11699 recently reverted the changes in FileMetadataLoaderTest
    so that it no longer extends FrontendTestBase. This causes tests in
    FileMetadataLoaderTest to become flaky when run individually.
    
    The flakiness comes from a subtle bug in FileMetadataLoader#load():
    we first call FileSystem#getFileSystem() to get a FileSystem object,
    and then call methods of FileSystemUtil, which triggers class loading
    of FileSystemUtil and runs its static code.
    
    FileSystem#getFileSystem() creates a FileSystem object on the first
    get and caches it for follow-up usage. Without extending
    FrontendTestBase, BackendConfig.INSTANCE is not initialized when the
    test is run. So the static code in FileSystemUtil lazily initializes
    BackendConfig.INSTANCE, which initializes FeSupport and finally calls
    JniUtil::InitLibhdfs(). In this method, a FileSystem object is
    obtained and closed. Due to the cache, this is exactly the FileSystem
    object created in FileMetadataLoader#load(), so any subsequent use of
    it fails with an IOException of "Filesystem closed".
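
    The hazard can be reproduced in a few lines (a sketch assuming
    fs.defaultFS points at an HDFS cluster; the class name is made up):
    Hadoop's FileSystem cache hands both call sites the same instance,
    so one caller's close() poisons the other.

      import java.io.IOException;
      import org.apache.hadoop.conf.Configuration;
      import org.apache.hadoop.fs.FileSystem;
      import org.apache.hadoop.fs.Path;

      public class SharedFsCloseDemo {
        public static void main(String[] args) throws IOException {
          Configuration conf = new Configuration();
          // Both gets return the SAME cached instance, keyed by
          // (scheme, authority, current user).
          FileSystem fs1 = FileSystem.get(conf);
          FileSystem fs2 = FileSystem.get(conf);
          fs2.close();  // what JniUtil::InitLibhdfs() effectively did
          // On HDFS (DistributedFileSystem) this now fails with
          // IOException("Filesystem closed").
          fs1.exists(new Path("/"));
        }
      }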
    
    The purpose of JniUtil::InitLibhdfs() is to make a simple call into
    libhdfs so that libhdfs initializes the JVM. This is crucial when
    impalad and catalogd are launched from C++ code and need to init the
    embedded JVM, so we should keep it unchanged.
    
    The fix for this bug is to leave the state of BackendConfig.INSTANCE
    unchanged in the static code of FileSystemUtil (i.e., keep it
    uninitialized or initialized as found), which avoids the subtle side
    effect.
    
    Tests:
     - Verified that tests in FileMetadataLoaderTest pass when run
       individually
    
    Change-Id: Ib6f96950210c9a0124fe03696ef334cb00b057ab
    Reviewed-on: http://gerrit.cloudera.org:8080/19233
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
    Reviewed-by: Michael Smith <mi...@cloudera.com>
    Reviewed-by: Qifan Chen <qf...@hotmail.com>
---
 .../org/apache/impala/common/FileSystemUtil.java   | 29 ++++++++++++++--------
 .../apache/impala/common/FileSystemUtilTest.java   |  6 ++---
 2 files changed, 21 insertions(+), 14 deletions(-)

diff --git a/fe/src/main/java/org/apache/impala/common/FileSystemUtil.java b/fe/src/main/java/org/apache/impala/common/FileSystemUtil.java
index ec178b643..82cf9cfcf 100644
--- a/fe/src/main/java/org/apache/impala/common/FileSystemUtil.java
+++ b/fe/src/main/java/org/apache/impala/common/FileSystemUtil.java
@@ -37,7 +37,6 @@ import org.apache.hadoop.hdfs.client.HdfsAdmin;
 import org.apache.hadoop.hdfs.protocol.EncryptionZone;
 import org.apache.impala.catalog.HdfsCompression;
 import org.apache.impala.service.BackendConfig;
-import org.apache.impala.thrift.TBackendGflags;
 import org.apache.impala.util.DebugUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -860,21 +859,31 @@ public class FileSystemUtil {
     return false;
   }
 
+  public static final String DOT = ".";
+  public static final String HIVE_TEMP_FILE_PREFIX = "_tmp.";
+  public static final String SPARK_TEMP_FILE_PREFIX = "_spark_metadata";
+
   /**
    * Prefix string used by tools like hive/spark/flink to write certain temporary or
    * "non-data" files in the table location
    */
   private static final List<String> TMP_DIR_PREFIX_LIST = new ArrayList<>();
   static {
-    if (BackendConfig.INSTANCE == null) {
-      BackendConfig.create(new TBackendGflags());
-      LOG.warn("Initialized BackendConfig.INSTANCE lazily. This should only happen in" +
-          " tests.");
-    }
-    String s = BackendConfig.INSTANCE.getIgnoredDirPrefixList();
-    for (String prefix : s.split(",")) {
-      if (!prefix.isEmpty()) {
-        TMP_DIR_PREFIX_LIST.add(prefix);
+    // Use hard-coded prefix-list if BackendConfig is uninitialized. Note that
+    // getIgnoredDirPrefixList() could return null if BackendConfig is created with
+    // initialize=false in external FE (IMPALA-10515).
+    if (BackendConfig.INSTANCE == null
+        || BackendConfig.INSTANCE.getIgnoredDirPrefixList() == null) {
+      TMP_DIR_PREFIX_LIST.add(DOT);
+      TMP_DIR_PREFIX_LIST.add(HIVE_TEMP_FILE_PREFIX);
+      TMP_DIR_PREFIX_LIST.add(SPARK_TEMP_FILE_PREFIX);
+      LOG.warn("BackendConfig.INSTANCE uninitialized. Use hard-coded prefix-list.");
+    } else {
+      String s = BackendConfig.INSTANCE.getIgnoredDirPrefixList();
+      for (String prefix : s.split(",")) {
+        if (!prefix.isEmpty()) {
+          TMP_DIR_PREFIX_LIST.add(prefix);
+        }
       }
     }
     LOG.info("Prefix list of ignored dirs: " + TMP_DIR_PREFIX_LIST);
diff --git a/fe/src/test/java/org/apache/impala/common/FileSystemUtilTest.java b/fe/src/test/java/org/apache/impala/common/FileSystemUtilTest.java
index 38ab46ef5..e3b167ba8 100644
--- a/fe/src/test/java/org/apache/impala/common/FileSystemUtilTest.java
+++ b/fe/src/test/java/org/apache/impala/common/FileSystemUtilTest.java
@@ -17,6 +17,8 @@
 
 package org.apache.impala.common;
 
+import static org.apache.impala.common.FileSystemUtil.HIVE_TEMP_FILE_PREFIX;
+import static org.apache.impala.common.FileSystemUtil.SPARK_TEMP_FILE_PREFIX;
 import static org.apache.impala.common.FileSystemUtil.isIgnoredDir;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
@@ -25,8 +27,6 @@ import static org.junit.Assert.assertEquals;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.impala.service.BackendConfig;
-import org.apache.impala.thrift.TBackendGflags;
 import org.junit.Test;
 import org.mockito.Mockito;
 
@@ -39,8 +39,6 @@ import java.util.List;
  * Tests for the various util methods in FileSystemUtil class
  */
 public class FileSystemUtilTest {
-  private static final String HIVE_TEMP_FILE_PREFIX = "_tmp.";
-  private static final String SPARK_TEMP_FILE_PREFIX = "_spark_metadata";
   private static final Path TEST_TABLE_PATH = new Path("/test-warehouse/foo"
       + ".db/filesystem-util-test");