You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by as...@apache.org on 2023/08/18 18:30:09 UTC

[impala] branch master updated: IMPALA-12228: Simulate the failure of an iceberg transaction.

This is an automated email from the ASF dual-hosted git repository.

asherman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git


The following commit(s) were added to refs/heads/master by this push:
     new e155ab35f IMPALA-12228: Simulate the failure of an iceberg transaction.
e155ab35f is described below

commit e155ab35f5823f766bb8d8e3a2b9d13e0622fee3
Author: Andrew Sherman <as...@cloudera.com>
AuthorDate: Wed May 31 15:03:48 2023 -0700

    IMPALA-12228: Simulate the failure of an iceberg transaction.
    
    The commit of an Iceberg transaction is done by the Iceberg catalog. In
    the common case for Impala the Iceberg catalog is HiveCatalog, and the
    actual commit is performed by HMS. This means the commit can fail
    because of activity outside of Impala. It is therefore useful to be
    able to simulate what happens when an Iceberg commit fails.
    
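    Extend Java DebugAction to allow it to throw an exception, using the
    format EXCEPTION@<exception_class>@<message>. For now this is limited
    to unchecked exceptions, and CommitFailedException is the only class
    implemented, which is all that is needed for this patch.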
    
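    Add a debug action label, catalogd_iceberg_commit, that is checked at
    both points where an Iceberg transaction is about to commit (the ALTER
    TABLE path and the catalog-update path used by INSERT), so that an
    Iceberg CommitFailedException can be thrown there.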
    
    Add a new test that uses this debug action to abort an INSERT and an
    ALTER TABLE ... ADD COLUMN.
    
    Change-Id: Iafdacc3377a0a946ead9331ba6a63a1fbb11f0eb
    Reviewed-on: http://gerrit.cloudera.org:8080/20306
    Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 .../apache/impala/service/CatalogOpExecutor.java   | 13 +++++-
 .../java/org/apache/impala/util/DebugUtils.java    | 32 +++++++++++++--
 .../org/apache/impala/util/DebugUtilsTest.java     | 38 +++++++++++++++++-
 tests/query_test/test_iceberg.py                   | 46 ++++++++++++++++++++++
 4 files changed, 123 insertions(+), 6 deletions(-)

diff --git a/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
index 5d8391845..f569dfbd8 100644
--- a/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
+++ b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
@@ -1034,7 +1034,7 @@ public class CatalogOpExecutor {
       } else if (tbl instanceof IcebergTable &&
           altersIcebergTable(params.getAlter_type())) {
         boolean needToUpdateHms = alterIcebergTable(params, response, (IcebergTable)tbl,
-            newCatalogVersion, wantMinimalResult);
+            newCatalogVersion, wantMinimalResult, debugAction);
         if (!needToUpdateHms) return;
       }
       switch (params.getAlter_type()) {
@@ -1340,7 +1340,8 @@ public class CatalogOpExecutor {
    * Executes the ALTER TABLE command for an Iceberg table and reloads its metadata.
    */
   private boolean alterIcebergTable(TAlterTableParams params, TDdlExecResponse response,
-      IcebergTable tbl, long newCatalogVersion, boolean wantMinimalResult)
+      IcebergTable tbl, long newCatalogVersion, boolean wantMinimalResult,
+      String debugAction)
       throws ImpalaException {
     Preconditions.checkState(tbl.isWriteLockedByCurrentThread());
     boolean needsToUpdateHms = !isIcebergHmsIntegrationEnabled(tbl.getMetaStoreTable());
@@ -1422,6 +1423,9 @@ public class CatalogOpExecutor {
           IcebergCatalogOpExecutor.addCatalogVersionToTxn(iceTxn,
               catalog_.getCatalogServiceId(), newCatalogVersion);
         }
+        if (debugAction != null) {
+          DebugUtils.executeDebugAction(debugAction, DebugUtils.ICEBERG_COMMIT);
+        }
         iceTxn.commitTransaction();
       }
     } catch (IllegalArgumentException ex) {
@@ -6865,6 +6869,11 @@ public class CatalogOpExecutor {
               catalog_.getCatalogServiceId(), newCatalogVersion);
           catalog_.addVersionsForInflightEvents(false, table, newCatalogVersion);
         }
+
+        if (update.isSetDebug_action()) {
+          String debugAction = update.getDebug_action();
+          DebugUtils.executeDebugAction(debugAction, DebugUtils.ICEBERG_COMMIT);
+        }
         iceTxn.commitTransaction();
       }
 
diff --git a/fe/src/main/java/org/apache/impala/util/DebugUtils.java b/fe/src/main/java/org/apache/impala/util/DebugUtils.java
index 45c5eede7..b265f46ff 100644
--- a/fe/src/main/java/org/apache/impala/util/DebugUtils.java
+++ b/fe/src/main/java/org/apache/impala/util/DebugUtils.java
@@ -22,6 +22,8 @@ import com.google.common.base.Splitter;
 import com.google.common.base.Strings;
 import java.util.List;
 import java.util.Random;
+
+import org.apache.iceberg.exceptions.CommitFailedException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -54,6 +56,9 @@ public class DebugUtils {
   // CatalogOpExecutor#updateCatalog() finishes.
   public static final String INSERT_FINISH_DELAY = "catalogd_insert_finish_delay";
 
+  // debug action label for Iceberg transaction commit.
+  public static final String ICEBERG_COMMIT = "catalogd_iceberg_commit";
+
   // debug action label for throwing an exception during loadFileMetadataForPartitions.
   public static final String LOAD_FILE_METADATA_THROW_EXCEPTION =
       "catalogd_load_file_metadata_throw_exception";
@@ -86,8 +91,8 @@ public class DebugUtils {
    * For example, if the debug action configuration is:
    * CATALOGD_HDFS_LISTING_DELAY:SLEEP@100|CATALOGD_HMS_RPC_DELAY:JITTER@100@0.2
    * Then a when a label "CATALOGD_HDFS_LISTING_DELAY" is provided, this method will sleep
-   * for 100 milli-seconds. If the label CATALOGD_HMS_RPC_DELAY is provided, this method
-   * will sleep for a random value between 1-100 milli-seconds with a probability of 0.2.
+   * for 100 milliseconds. If the label CATALOGD_HMS_RPC_DELAY is provided, this method
+   * will sleep for a random value between 1-100 milliseconds with a probability of 0.2.
    *
    * @param debugActions the debug actions with the format given in the description
    *                     above.
@@ -108,7 +113,7 @@ public class DebugUtils {
           "Invalid debug action " + action);
       List<String> actionParams = Splitter.on('@').splitToList(components.get(1));
       Preconditions.checkState(actionParams.size() > 1,
-          "Illegal debug action format found in " + debugActions + " for label"
+          "Illegal debug action format found in " + debugActions + " for label "
               + label);
       switch (actionParams.get(0)) {
         case "SLEEP":
@@ -147,6 +152,27 @@ public class DebugUtils {
             LOG.warn("Sleep interrupted for the debug action {}", label);
           }
           break;
+        case "EXCEPTION":
+          // the EXCEPTION debug action is of format EXCEPTION@<exception_type>@parameter
+          Preconditions.checkState(actionParams.size() == 3,
+              "EXCEPTION debug action needs 3 action params");
+          String exceptionClazz = actionParams.get(1);
+          String param = actionParams.get(2);
+          RuntimeException exceptionToThrow = null;
+          switch (exceptionClazz.toLowerCase()) {
+            case "commitfailedexception":
+              exceptionToThrow = new CommitFailedException(param);
+              break;
+            default:
+              LOG.error("Debug action exception class {} is not implemented",
+                  exceptionClazz);
+              break;
+          }
+          if (exceptionToThrow != null) {
+            LOG.info("Throwing DebugAction exception of class {}", exceptionClazz);
+            throw exceptionToThrow;
+          }
+          break;
         default:
           LOG.error("Debug action {} is not implemented", actionParams.get(0));
       }
diff --git a/fe/src/test/java/org/apache/impala/util/DebugUtilsTest.java b/fe/src/test/java/org/apache/impala/util/DebugUtilsTest.java
index 9a19c844b..4aaba9b37 100644
--- a/fe/src/test/java/org/apache/impala/util/DebugUtilsTest.java
+++ b/fe/src/test/java/org/apache/impala/util/DebugUtilsTest.java
@@ -64,4 +64,40 @@ public class DebugUtilsTest {
     DebugUtils.executeDebugAction("TEST_JITTER_ACTION@JITTER:1", "test_jitter_action");
     DebugUtils.executeDebugAction("TEST_JITTER_ACTION:JITTER", "test_jitter_action");
   }
-}
+
+  /**
+   * Test that the EXCEPTION DebugAction throws the requested exception with its text.
+   */
+  @Test
+  public void testException() {
+    try {
+      DebugUtils.executeDebugAction(
+          "TEST_FAIL_ACTION:EXCEPTION@CommitFailedException@some text",
+          "test_fail_action");
+      Assert.fail("should have got exception");
+    } catch (Exception e) {
+      Assert.assertTrue(e.getClass().getName().contains("CommitFailedException"));
+      Assert.assertTrue(e.getMessage().contains("some text"));
+    }
+  }
+
+  /**
+   * Negative tests for the EXCEPTION DebugAction.
+   */
+  @Test
+  public void testExceptionNegative() {
+    // Unimplemented Exception type. This logs an error but does not fail.
+    DebugUtils.executeDebugAction(
+        "TEST_FAIL_ACTION:EXCEPTION@LocalCatalogException@some text",
+        "test_fail_action");
+    try {
+      // No exception text specified in debug action string.
+      DebugUtils.executeDebugAction(
+          "TEST_FAIL_ACTION:EXCEPTION@CommitFailedException", "test_fail_action");
+      Assert.fail("should have got IllegalStateException");
+    } catch (IllegalStateException e) {
+      Assert.assertTrue(
+          e.getMessage().contains("EXCEPTION debug action needs 3 action params"));
+    }
+  }
+}
diff --git a/tests/query_test/test_iceberg.py b/tests/query_test/test_iceberg.py
index 45ae3bdb1..43f7974ba 100644
--- a/tests/query_test/test_iceberg.py
+++ b/tests/query_test/test_iceberg.py
@@ -28,6 +28,7 @@ import re
 import time
 
 from subprocess import check_call
+# noinspection PyUnresolvedReferences
 from parquet.ttypes import ConvertedType
 
 from avro.datafile import DataFileReader
@@ -1099,6 +1100,51 @@ class TestIcebergTable(IcebergTestSuite):
       self.run_test_case('QueryTest/iceberg-migrate-from-external-hdfs-tables',
                          vector, unique_database)
 
+  def test_abort_transaction(self, unique_database):
+    """Test that an INSERT and an ALTER TABLE ADD COLUMN fail cleanly when the Iceberg
+    transaction commit fails, and that the failed operation leaves no visible effect."""
+    tbl_name = unique_database + ".abort_iceberg_transaction"
+    # Query options that make catalogd throw CommitFailedException at Iceberg commit.
+    abort_ice_transaction_options = {'debug_action':
+                       'CATALOGD_ICEBERG_COMMIT:EXCEPTION@'
+                       'CommitFailedException@'
+                       'simulated commit failure'}
+    # Create an iceberg table and insert a row.
+    self.client.execute("""create table {0} (i int)
+        stored as iceberg""".format(tbl_name))
+    self.execute_query_expect_success(self.client,
+        "insert into {0} values (1);".format(tbl_name))
+
+    # Run a query that would insert a row, but pass the query options that
+    # will cause the iceberg transaction to abort.
+    err = self.execute_query_expect_failure(self.client,
+        "insert into {0} values (2);".format(tbl_name),
+        query_options=abort_ice_transaction_options)
+    # Check that the simulated commit failure is reported to the client.
+    result = str(err)
+    assert "Query aborted:CommitFailedException: simulated commit failure" in result
+    # Check that the aborted insert left no data: only the original row remains.
+    data = self.execute_query_expect_success(self.client,
+        "select * from {0}".format(tbl_name))
+    assert data.column_labels == ['I']
+    assert len(data.data) == 1
+    assert data.data[0] == '1'
+
+    # Run a query that would add a column to the table, but pass the query options that
+    # will cause the iceberg transaction to abort.
+    ddl_err = self.execute_query_expect_failure(self.client,
+        "alter table {0} add column {1} bigint"
+        .format(tbl_name, "j"), query_options=abort_ice_transaction_options)
+    ddl_result = str(ddl_err)
+    # Check that the simulated commit failure is reported to the client.
+    assert "Query aborted:CommitFailedException: simulated commit failure" in ddl_result
+    # Check that no column was added: the only column is still 'I'.
+    data = self.execute_query_expect_success(self.client,
+        "select * from {0}".format(tbl_name))
+    assert data.column_labels == ['I']
+    assert len(data.data) == 1
+    assert data.data[0] == '1'
+
 
 class TestIcebergV2Table(IcebergTestSuite):
   """Tests related to Iceberg V2 tables."""