You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by as...@apache.org on 2023/03/27 21:56:38 UTC

[impala] branch master updated: IMPALA-11509: Prevent queries hanging when Iceberg metadata is missing.

This is an automated email from the ASF dual-hosted git repository.

asherman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git


The following commit(s) were added to refs/heads/master by this push:
     new 2c779939d IMPALA-11509: Prevent queries hanging when Iceberg metadata is missing.
2c779939d is described below

commit 2c779939dc302be9ee5dd97ddf374bb043040891
Author: Andrew Sherman <as...@cloudera.com>
AuthorDate: Tue Aug 2 17:53:36 2022 -0700

    IMPALA-11509: Prevent queries hanging when Iceberg metadata is missing.
    
    Traditionally table metadata is loaded by the catalog and sent as thrift
    to the Impala daemons. With Iceberg tables, some metadata, for example
    the org.apache.iceberg.Table, is loaded in the Coordinator at the same
    time as the thrift description is being deserialized. If the loading of
    the org.apache.iceberg.Table fails, perhaps because of missing Iceberg
    metadata, then the loading of the table fails. This can cause an
    infinite loop as StmtMetadataLoader.loadTables() waits hopefully for
    the catalog to send a new version of the table.
    
    Change some Iceberg table loading methods to throw
    IcebergTableLoadingException when a failure occurs. Prevent the hang by
    substituting in an IncompleteTable if an IcebergTableLoadingException
    occurs.
    
    The test test_drop_incomplete_table had previously been disabled because
    of IMPALA-11509. Re-enabling it required a second change. The way
    that DROP TABLE is executed on an iceberg table depends on which
    Iceberg catalog is being used. If this Iceberg catalog is not a Hive
    catalog then the execution happens in two parts, first the Iceberg
    table is dropped, then the table is dropped in HMS. In this case, if
    the drop fails in Iceberg, we should still continue on to perform the
    drop in HMS.
    
    TESTING
    
    - Add a new test, originally developed for IMPALA-11330, which tests
      failures after deleting Iceberg metadata.
    - Re-enable test_drop_incomplete_table().
    
    Change-Id: I695559e21c510615918a51a4b5057bc616ee5421
    Reviewed-on: http://gerrit.cloudera.org:8080/19509
    Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 .../main/java/org/apache/impala/catalog/Table.java | 12 +++++--
 .../impala/catalog/iceberg/IcebergCatalog.java     |  3 +-
 .../impala/catalog/iceberg/IcebergCatalogs.java    |  3 +-
 .../catalog/iceberg/IcebergHadoopCatalog.java      |  9 ++---
 .../catalog/iceberg/IcebergHadoopTables.java       |  9 ++---
 .../impala/catalog/iceberg/IcebergHiveCatalog.java |  7 ++--
 .../apache/impala/service/CatalogOpExecutor.java   | 31 ++++++++++++----
 .../java/org/apache/impala/util/IcebergUtil.java   | 11 +++---
 .../queries/QueryTest/iceberg-negative.test        |  2 +-
 tests/query_test/test_iceberg.py                   | 42 ++++++++++++++++++----
 10 files changed, 95 insertions(+), 34 deletions(-)

diff --git a/fe/src/main/java/org/apache/impala/catalog/Table.java b/fe/src/main/java/org/apache/impala/catalog/Table.java
index 21b766ce8..2ada744f7 100644
--- a/fe/src/main/java/org/apache/impala/catalog/Table.java
+++ b/fe/src/main/java/org/apache/impala/catalog/Table.java
@@ -538,7 +538,15 @@ public abstract class Table extends CatalogObjectImpl implements FeTable {
           IncompleteTable.createUninitializedTable(parentDb, thriftTable.getTbl_name(),
               tblType, MetadataOp.getTableComment(thriftTable.getMetastore_table()));
     }
-    newTable.loadFromThrift(thriftTable);
+    try {
+      newTable.loadFromThrift(thriftTable);
+    } catch (IcebergTableLoadingException e) {
+      LOG.warn(String.format("The table %s in database %s could not be loaded.",
+                   thriftTable.getTbl_name(), parentDb.getName()),
+          e);
+      newTable = IncompleteTable.createFailedMetadataLoadTable(
+          parentDb, thriftTable.getTbl_name(), e);
+    }
     newTable.validate();
     return newTable;
   }
@@ -1046,4 +1054,4 @@ public abstract class Table extends CatalogObjectImpl implements FeTable {
       setLastSyncedEventId(eventId);
     }
   }
-}
+}
\ No newline at end of file
diff --git a/fe/src/main/java/org/apache/impala/catalog/iceberg/IcebergCatalog.java b/fe/src/main/java/org/apache/impala/catalog/iceberg/IcebergCatalog.java
index 1aa239ba2..18bc7c981 100644
--- a/fe/src/main/java/org/apache/impala/catalog/iceberg/IcebergCatalog.java
+++ b/fe/src/main/java/org/apache/impala/catalog/iceberg/IcebergCatalog.java
@@ -23,6 +23,7 @@ import org.apache.iceberg.Schema;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.catalog.TableIdentifier;
 import org.apache.impala.catalog.FeIcebergTable;
+import org.apache.impala.catalog.IcebergTableLoadingException;
 import org.apache.impala.catalog.TableLoadingException;
 import org.apache.impala.common.ImpalaRuntimeException;
 
@@ -57,7 +58,7 @@ public interface IcebergCatalog {
    *     is being used.
    */
    Table loadTable(TableIdentifier tableId, String tableLocation,
-      Map<String, String> properties) throws TableLoadingException;
+      Map<String, String> properties) throws IcebergTableLoadingException;
 
   /**
    * Drops the table from this catalog.
diff --git a/fe/src/main/java/org/apache/impala/catalog/iceberg/IcebergCatalogs.java b/fe/src/main/java/org/apache/impala/catalog/iceberg/IcebergCatalogs.java
index e4fa00284..99beb6794 100644
--- a/fe/src/main/java/org/apache/impala/catalog/iceberg/IcebergCatalogs.java
+++ b/fe/src/main/java/org/apache/impala/catalog/iceberg/IcebergCatalogs.java
@@ -37,6 +37,7 @@ import org.apache.iceberg.mr.Catalogs;
 import org.apache.iceberg.mr.InputFormatConfig;
 import org.apache.impala.catalog.FeIcebergTable;
 import org.apache.impala.catalog.IcebergTable;
+import org.apache.impala.catalog.IcebergTableLoadingException;
 import org.apache.impala.catalog.TableLoadingException;
 import org.apache.impala.common.ImpalaRuntimeException;
 import org.apache.impala.thrift.TIcebergCatalog;
@@ -116,7 +117,7 @@ public class IcebergCatalogs implements IcebergCatalog {
 
   @Override
   public Table loadTable(TableIdentifier tableId, String tableLocation,
-      Map<String, String> tableProps) throws TableLoadingException {
+      Map<String, String> tableProps) throws IcebergTableLoadingException {
     setContextClassLoader();
     Properties properties = createPropsForCatalogs(tableId, tableLocation, tableProps);
     return Catalogs.loadTable(configuration_, properties);
diff --git a/fe/src/main/java/org/apache/impala/catalog/iceberg/IcebergHadoopCatalog.java b/fe/src/main/java/org/apache/impala/catalog/iceberg/IcebergHadoopCatalog.java
index 80d542dee..e7bf4dd63 100644
--- a/fe/src/main/java/org/apache/impala/catalog/iceberg/IcebergHadoopCatalog.java
+++ b/fe/src/main/java/org/apache/impala/catalog/iceberg/IcebergHadoopCatalog.java
@@ -29,6 +29,7 @@ import org.apache.iceberg.catalog.TableIdentifier;
 import org.apache.iceberg.exceptions.NoSuchTableException;
 import org.apache.iceberg.hadoop.HadoopCatalog;
 import org.apache.impala.catalog.FeIcebergTable;
+import org.apache.impala.catalog.IcebergTableLoadingException;
 import org.apache.impala.catalog.TableLoadingException;
 import org.apache.impala.common.FileSystemUtil;
 import org.apache.impala.thrift.TIcebergCatalog;
@@ -77,7 +78,7 @@ public class IcebergHadoopCatalog implements IcebergCatalog {
 
   @Override
   public Table loadTable(TableIdentifier tableId, String tableLocation,
-      Map<String, String> properties) throws TableLoadingException {
+      Map<String, String> properties) throws IcebergTableLoadingException {
     Preconditions.checkState(tableId != null);
     final int MAX_ATTEMPTS = 5;
     final int SLEEP_MS = 500;
@@ -86,11 +87,11 @@ public class IcebergHadoopCatalog implements IcebergCatalog {
       try {
         return hadoopCatalog.loadTable(tableId);
       } catch (NoSuchTableException e) {
-        throw new TableLoadingException(e.getMessage());
+        throw new IcebergTableLoadingException(e.getMessage());
       } catch (NullPointerException | UncheckedIOException e) {
         if (attempt == MAX_ATTEMPTS - 1) {
           // Throw exception on last attempt.
-          throw new TableLoadingException(String.format(
+          throw new IcebergTableLoadingException(String.format(
               "Could not load Iceberg table %s", tableId), (Exception)e);
         }
         LOG.warn("Caught Exception during Iceberg table loading: {}: {}", tableId, e);
@@ -103,7 +104,7 @@ public class IcebergHadoopCatalog implements IcebergCatalog {
       }
     }
     // We shouldn't really get there, but to make the compiler happy:
-    throw new TableLoadingException(
+    throw new IcebergTableLoadingException(
         String.format("Failed to load Iceberg table with id: %s", tableId));
   }
 
diff --git a/fe/src/main/java/org/apache/impala/catalog/iceberg/IcebergHadoopTables.java b/fe/src/main/java/org/apache/impala/catalog/iceberg/IcebergHadoopTables.java
index 5a99c0b56..a0c6d2df0 100644
--- a/fe/src/main/java/org/apache/impala/catalog/iceberg/IcebergHadoopTables.java
+++ b/fe/src/main/java/org/apache/impala/catalog/iceberg/IcebergHadoopTables.java
@@ -28,6 +28,7 @@ import org.apache.iceberg.catalog.TableIdentifier;
 import org.apache.iceberg.exceptions.NoSuchTableException;
 import org.apache.iceberg.hadoop.HadoopTables;
 import org.apache.impala.catalog.FeIcebergTable;
+import org.apache.impala.catalog.IcebergTableLoadingException;
 import org.apache.impala.catalog.TableLoadingException;
 import org.apache.impala.common.FileSystemUtil;
 import org.apache.impala.thrift.TIcebergCatalog;
@@ -76,7 +77,7 @@ public class IcebergHadoopTables implements IcebergCatalog {
 
   @Override
   public Table loadTable(TableIdentifier tableId, String tableLocation,
-      Map<String, String> properties) throws TableLoadingException {
+      Map<String, String> properties) throws IcebergTableLoadingException {
     Preconditions.checkState(tableLocation != null);
     final int MAX_ATTEMPTS = 5;
     final int SLEEP_MS = 500;
@@ -85,11 +86,11 @@ public class IcebergHadoopTables implements IcebergCatalog {
       try {
         return hadoopTables.load(tableLocation);
       } catch (NoSuchTableException e) {
-        throw new TableLoadingException(e.getMessage());
+        throw new IcebergTableLoadingException(e.getMessage());
       } catch (NullPointerException | UncheckedIOException e) {
         if (attempt == MAX_ATTEMPTS - 1) {
           // Throw exception on last attempt.
-          throw new TableLoadingException(String.format(
+          throw new IcebergTableLoadingException(String.format(
               "Could not load Iceberg table at location: %s", tableLocation),
               (Exception)e);
         }
@@ -104,7 +105,7 @@ public class IcebergHadoopTables implements IcebergCatalog {
       }
     }
     // We shouldn't really get there, but to make the compiler happy:
-    throw new TableLoadingException(String.format(
+    throw new IcebergTableLoadingException(String.format(
         "Could not load Iceberg table at location: %s", tableLocation));
   }
 
diff --git a/fe/src/main/java/org/apache/impala/catalog/iceberg/IcebergHiveCatalog.java b/fe/src/main/java/org/apache/impala/catalog/iceberg/IcebergHiveCatalog.java
index d0263ca32..ed1757b4a 100644
--- a/fe/src/main/java/org/apache/impala/catalog/iceberg/IcebergHiveCatalog.java
+++ b/fe/src/main/java/org/apache/impala/catalog/iceberg/IcebergHiveCatalog.java
@@ -21,7 +21,7 @@ import static org.apache.impala.catalog.Table.TBL_PROP_EXTERNAL_TABLE_PURGE;
 import static org.apache.impala.catalog.Table.TBL_PROP_EXTERNAL_TABLE_PURGE_DEFAULT;
 
 import java.util.Map;
-import java.util.HashMap;
+
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.Schema;
@@ -30,6 +30,7 @@ import org.apache.iceberg.catalog.TableIdentifier;
 import org.apache.iceberg.hadoop.ConfigProperties;
 import org.apache.iceberg.hive.HiveCatalog;
 import org.apache.impala.catalog.FeIcebergTable;
+import org.apache.impala.catalog.IcebergTableLoadingException;
 import org.apache.impala.catalog.TableLoadingException;
 import org.apache.impala.thrift.TIcebergCatalog;
 import org.apache.impala.util.IcebergUtil;
@@ -83,12 +84,12 @@ public class IcebergHiveCatalog implements IcebergCatalog {
 
   @Override
   public Table loadTable(TableIdentifier tableId, String tableLocation,
-      Map<String, String> properties) throws TableLoadingException {
+      Map<String, String> properties) throws IcebergTableLoadingException {
     Preconditions.checkState(tableId != null);
     try {
       return hiveCatalog_.loadTable(tableId);
     } catch (Exception e) {
-      throw new TableLoadingException(String.format(
+      throw new IcebergTableLoadingException(String.format(
           "Failed to load Iceberg table with id: %s", tableId), e);
     }
   }
diff --git a/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
index 6e2a3f478..e2030b251 100644
--- a/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
+++ b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
@@ -2822,18 +2822,35 @@ public class CatalogOpExecutor {
       boolean isSynchronizedIcebergTable = msTbl != null &&
           IcebergTable.isIcebergTable(msTbl) &&
           IcebergTable.isSynchronizedTable(msTbl);
-      if (!(existingTbl instanceof IncompleteTable) && isSynchronizedIcebergTable) {
-        Preconditions.checkState(existingTbl instanceof IcebergTable);
-        IcebergCatalogOpExecutor.dropTable((IcebergTable)existingTbl, params.if_exists);
-      }
-
       // When HMS integration is automatic, the table is dropped automatically. In all
       // other cases, we need to drop the HMS table entry ourselves.
       boolean isSynchronizedTable = isSynchronizedKuduTable || isSynchronizedIcebergTable;
       boolean needsHmsDropTable =
           (existingTbl instanceof IncompleteTable && isSynchronizedIcebergTable) ||
-          !isSynchronizedTable ||
-          !isHmsIntegrationAutomatic(msTbl);
+              !isSynchronizedTable ||
+              !isHmsIntegrationAutomatic(msTbl);
+
+      if (!(existingTbl instanceof IncompleteTable) && isSynchronizedIcebergTable) {
+        Preconditions.checkState(existingTbl instanceof IcebergTable);
+        try {
+          IcebergCatalogOpExecutor.dropTable((IcebergTable)existingTbl, params.if_exists);
+        } catch (TableNotFoundException e) {
+          // This is unusual as normally this would have already shown up as
+          // (existingTbl instanceof IncompleteTable), but this can happen if
+          // for example the Iceberg metadata is removed.
+          if (!needsHmsDropTable) {
+            // There is no more work to be done, so throw exception.
+            throw e;
+          }
+          // Although dropTable() failed in Iceberg we need to also drop the table in
+          // HMS, so we continue here.
+          LOG.warn(String.format("Could not drop Iceberg table %s.%s " +
+                  "proceeding to drop table in HMS", tableName.getDb(),
+              tableName.getTbl()), e);
+        }
+      }
+
+
       if (needsHmsDropTable) {
         try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
           msClient.getHiveClient().dropTable(
diff --git a/fe/src/main/java/org/apache/impala/util/IcebergUtil.java b/fe/src/main/java/org/apache/impala/util/IcebergUtil.java
index c6595a968..c0dc4c3e6 100644
--- a/fe/src/main/java/org/apache/impala/util/IcebergUtil.java
+++ b/fe/src/main/java/org/apache/impala/util/IcebergUtil.java
@@ -43,8 +43,6 @@ import java.util.Set;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.iceberg.CatalogProperties;
 import org.apache.iceberg.ContentFile;
-import org.apache.iceberg.DataFile;
-import org.apache.iceberg.DeleteFile;
 import org.apache.iceberg.FileFormat;
 import org.apache.iceberg.FileScanTask;
 import org.apache.iceberg.PartitionField;
@@ -75,6 +73,7 @@ import org.apache.impala.catalog.FeIcebergTable;
 import org.apache.impala.catalog.HdfsFileFormat;
 import org.apache.impala.catalog.IcebergColumn;
 import org.apache.impala.catalog.IcebergTable;
+import org.apache.impala.catalog.IcebergTableLoadingException;
 import org.apache.impala.catalog.TableLoadingException;
 import org.apache.impala.catalog.iceberg.GroupedContentFiles;
 import org.apache.impala.catalog.iceberg.IcebergCatalog;
@@ -134,7 +133,8 @@ public class IcebergUtil {
   /**
    * Helper method to load native Iceberg table for 'feTable'.
    */
-  public static Table loadTable(FeIcebergTable feTable) throws TableLoadingException {
+  public static Table loadTable(FeIcebergTable feTable)
+      throws IcebergTableLoadingException {
     return loadTable(feTable.getIcebergCatalog(), getIcebergTableIdentifier(feTable),
         feTable.getIcebergCatalogLocation(), feTable.getMetaStoreTable().getParameters());
   }
@@ -143,12 +143,13 @@ public class IcebergUtil {
    * Helper method to load native Iceberg table.
    */
   public static Table loadTable(TIcebergCatalog catalog, TableIdentifier tableId,
-      String location, Map<String, String> tableProps) throws TableLoadingException {
+      String location, Map<String, String> tableProps)
+      throws IcebergTableLoadingException {
     try {
       IcebergCatalog cat = getIcebergCatalog(catalog, location);
       return cat.loadTable(tableId, location, tableProps);
     } catch (ImpalaRuntimeException e) {
-      throw new TableLoadingException(String.format(
+      throw new IcebergTableLoadingException(String.format(
           "Failed to load Iceberg table: %s at location: %s",
           tableId, location), e);
     }
diff --git a/testdata/workloads/functional-query/queries/QueryTest/iceberg-negative.test b/testdata/workloads/functional-query/queries/QueryTest/iceberg-negative.test
index a4c1836f4..8cebc653a 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/iceberg-negative.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/iceberg-negative.test
@@ -103,7 +103,7 @@ TBLPROPERTIES('iceberg.catalog'='hadoop.catalog',
     'iceberg.table_identifier'='fake_db.fake_table');
 SHOW CREATE TABLE fake_iceberg_table_hadoop_catalog;
 ---- CATCH
-row_regex:.*CAUSED BY: TableLoadingException: Table does not exist: fake_db.fake_table*
+row_regex:.*CAUSED BY: IcebergTableLoadingException: Table does not exist: fake_db.fake_table*
 ====
 ---- QUERY
 CREATE TABLE iceberg_overwrite_bucket (i int, j int)
diff --git a/tests/query_test/test_iceberg.py b/tests/query_test/test_iceberg.py
index a8e05baf5..5895a78f8 100644
--- a/tests/query_test/test_iceberg.py
+++ b/tests/query_test/test_iceberg.py
@@ -130,16 +130,10 @@ class TestIcebergTable(IcebergTestSuite):
   def test_truncate_iceberg_tables(self, vector, unique_database):
     self.run_test_case('QueryTest/iceberg-truncate', vector, use_db=unique_database)
 
-  # With IMPALA-11429 there is an extra "ALTER TABLE SET OWNER" right after executing
-  # "CREATE TABLE". As a result dropping the table location right after CREATE TABLE will
-  # trigger a known bug: IMPALA-11509. Hence, turning this test off until there is a fix
-  # for this issue. Note, we could add a sleep right after table creation that could
-  # workaround the above mentioned bug but then we would hit another issue: IMPALA-11502.
   @SkipIf.not_dfs
   def test_drop_incomplete_table(self, vector, unique_database):
     """Test DROP TABLE when the underlying directory is deleted. In that case table
     loading fails, but we should be still able to drop the table from Impala."""
-    pytest.skip("Gets into a metadata update loop")
     tbl_name = unique_database + ".synchronized_iceberg_tbl"
     cat_location = get_fs_path("/test-warehouse/" + unique_database)
     self.client.execute("""create table {0} (i int) stored as iceberg
@@ -148,6 +142,42 @@ class TestIcebergTable(IcebergTestSuite):
     self.filesystem_client.delete_file_dir(cat_location, True)
     self.execute_query_expect_success(self.client, """drop table {0}""".format(tbl_name))
 
+  @SkipIf.not_dfs(reason="Dfs required as test to directly delete files.")
+  def test_drop_corrupt_table(self, unique_database):
+    self._do_test_drop_corrupt_table(unique_database, do_invalidate=False)
+
+  @SkipIf.not_dfs(reason="Dfs required as test to directly delete files.")
+  def test_drop_corrupt_table_with_invalidate(self, unique_database):
+    self._do_test_drop_corrupt_table(unique_database, do_invalidate=True)
+
+  def _do_test_drop_corrupt_table(self, unique_database, do_invalidate):
+    """Test that if the underlying iceberg metadata directory is deleted, then a query
+      fails with a reasonable error message, and the table can be dropped successfully."""
+    table = "corrupt_iceberg_tbl"
+    full_table_name = unique_database + "." + table
+    self.client.execute("""create table {0} (i int) stored as iceberg""".
+                        format(full_table_name))
+    metadata_location = get_fs_path("""/test-warehouse/{0}.db/{1}/metadata""".format(
+      unique_database, table))
+    assert self.filesystem_client.exists(metadata_location)
+    status = self.filesystem_client.delete_file_dir(metadata_location, True)
+    assert status, "Delete failed with {0}".format(status)
+    assert not self.hdfs_client.exists(metadata_location)
+
+    if do_invalidate:
+      # Invalidate so that table loading problems will happen in the catalog.
+      self.client.execute("invalidate metadata {0}".format(full_table_name))
+
+    # Query should now fail.
+    err = self.execute_query_expect_failure(self.client, """select * from {0}""".
+                                            format(full_table_name))
+    result = str(err)
+    assert "AnalysisException: Failed to load metadata for table" in result
+    assert ("Failed to load metadata for table" in result  # local catalog
+            or "Error loading metadata for Iceberg table" in result)  # default catalog
+    self.execute_query_expect_success(self.client, """drop table {0}""".
+                                      format(full_table_name))
+
   def test_insert(self, vector, unique_database):
     self.run_test_case('QueryTest/iceberg-insert', vector, use_db=unique_database)