You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by mi...@apache.org on 2023/12/14 03:26:50 UTC

(impala) 01/02: IMPALA-12229: Support soft-delete Kudu table

This is an automated email from the ASF dual-hosted git repository.

michaelsmith pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 45682c132ffb6c5511e40e949a2c8d4008f17e7d
Author: zhangyifan27 <ch...@163.com>
AuthorDate: Tue Dec 12 10:17:06 2023 +0800

    IMPALA-12229: Support soft-delete Kudu table
    
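    Adds the 'kudu_table_reserve_seconds' query option, which sets how long
    (in seconds) a dropped Impala-managed Kudu table is reserved in Kudu
    before it is permanently deleted. The default value is 0, which deletes
    the table immediately. During the reservation period the table can be
    recovered with Kudu's 'recall table' API (see KUDU-3326), so this option
    can protect important Kudu tables from being deleted by mistake.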
    
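    Testing:
    - Added e2e tests that run DROP TABLE and DROP DATABASE ... CASCADE with
      'kudu_table_reserve_seconds' set, and verify that the underlying Kudu
      tables are soft-deleted and can be recalled.
    - Bumped kudu-python from 1.14.0 to 1.17.0; the new tests use its
      list_soft_deleted_tables() and recall_table() APIs.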
    
    Change-Id: I3020567bb6cfe4dd48ef17906f8de674f37217e7
    Reviewed-on: http://gerrit.cloudera.org:8080/20773
    Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 be/src/service/query-options.cc                    |  7 +++
 be/src/service/query-options.h                     |  4 +-
 common/thrift/CatalogService.thrift                |  5 ++
 common/thrift/ImpalaService.thrift                 |  5 ++
 common/thrift/Query.thrift                         |  3 ++
 .../apache/impala/service/CatalogOpExecutor.java   | 33 +++++++-----
 .../java/org/apache/impala/service/Frontend.java   |  2 +
 .../impala/service/KuduCatalogOpExecutor.java      |  9 ++--
 infra/python/deps/kudu-requirements.txt            |  2 +-
 tests/query_test/test_kudu.py                      | 59 ++++++++++++++++++++++
 10 files changed, 112 insertions(+), 17 deletions(-)

diff --git a/be/src/service/query-options.cc b/be/src/service/query-options.cc
index 062b9b6c2..4678a7d19 100644
--- a/be/src/service/query-options.cc
+++ b/be/src/service/query-options.cc
@@ -1166,6 +1166,13 @@ Status impala::SetQueryOption(const string& key, const string& value,
         query_options->__set_codegen_opt_level(enum_type);
         break;
       }
+      case TImpalaQueryOptions::KUDU_TABLE_RESERVE_SECONDS: {
+        int32_t int32_t_val = 0;
+        RETURN_IF_ERROR(QueryOptionParser::ParseAndCheckNonNegative<int32_t>(
+            option, value, &int32_t_val));
+        query_options->__set_kudu_table_reserve_seconds(int32_t_val);
+        break;
+      }
       default:
         if (IsRemovedQueryOption(key)) {
           LOG(WARNING) << "Ignoring attempt to set removed query option '" << key << "'";
diff --git a/be/src/service/query-options.h b/be/src/service/query-options.h
index 68337d907..c420e0743 100644
--- a/be/src/service/query-options.h
+++ b/be/src/service/query-options.h
@@ -50,7 +50,7 @@ typedef std::unordered_map<string, beeswax::TQueryOptionLevel::type>
 // time we add or remove a query option to/from the enum TImpalaQueryOptions.
 #define QUERY_OPTS_TABLE                                                                 \
   DCHECK_EQ(_TImpalaQueryOptions_VALUES_TO_NAMES.size(),                                 \
-      TImpalaQueryOptions::CODEGEN_OPT_LEVEL + 1);                                       \
+      TImpalaQueryOptions::KUDU_TABLE_RESERVE_SECONDS + 1);                              \
   REMOVED_QUERY_OPT_FN(abort_on_default_limit_exceeded, ABORT_ON_DEFAULT_LIMIT_EXCEEDED) \
   QUERY_OPT_FN(abort_on_error, ABORT_ON_ERROR, TQueryOptionLevel::REGULAR)               \
   REMOVED_QUERY_OPT_FN(allow_unsupported_formats, ALLOW_UNSUPPORTED_FORMATS)             \
@@ -311,6 +311,8 @@ typedef std::unordered_map<string, beeswax::TQueryOptionLevel::type>
   QUERY_OPT_FN(hdfs_scanner_non_reserved_bytes, HDFS_SCANNER_NON_RESERVED_BYTES,         \
       TQueryOptionLevel::ADVANCED)                                                       \
   QUERY_OPT_FN(codegen_opt_level, CODEGEN_OPT_LEVEL, TQueryOptionLevel::ADVANCED)        \
+  QUERY_OPT_FN(kudu_table_reserve_seconds, KUDU_TABLE_RESERVE_SECONDS,                   \
+      TQueryOptionLevel::ADVANCED)                                                       \
   ;
 
 /// Enforce practical limits on some query options to avoid undesired query state.
diff --git a/common/thrift/CatalogService.thrift b/common/thrift/CatalogService.thrift
index 7d2282d3f..99d210d06 100644
--- a/common/thrift/CatalogService.thrift
+++ b/common/thrift/CatalogService.thrift
@@ -108,6 +108,11 @@ struct TDdlQueryOptions {
 
   // Maximum wait time on an HMS ACID lock in seconds.
   3: optional i32 lock_max_wait_time_s
+
+  // The reservation time (in seconds) for deleted Impala-managed Kudu tables.
+  // During this time deleted Kudu tables can be recovered by Kudu's 'recall table' API.
+  // See KUDU-3326 for details.
+  4: optional i32 kudu_table_reserve_seconds
 }
 
 // Request for executing a DDL operation (CREATE, ALTER, DROP).
diff --git a/common/thrift/ImpalaService.thrift b/common/thrift/ImpalaService.thrift
index 13c6c4e34..d2fb3a046 100644
--- a/common/thrift/ImpalaService.thrift
+++ b/common/thrift/ImpalaService.thrift
@@ -873,6 +873,11 @@ enum TImpalaQueryOptions {
   // Select codegen optimization level from O0, O1, Os, O2, or O3. Higher levels will
   // overwrite existing codegen cache entries.
   CODEGEN_OPT_LEVEL = 167
+
+  // The reservation time (in seconds) for deleted Impala-managed Kudu tables.
+  // During this time deleted Kudu tables can be recovered by Kudu's 'recall table' API.
+  // See KUDU-3326 for details.
+  KUDU_TABLE_RESERVE_SECONDS = 168
 }
 
 // The summary of a DML statement.
diff --git a/common/thrift/Query.thrift b/common/thrift/Query.thrift
index 39ba7f896..e6e5ded5a 100644
--- a/common/thrift/Query.thrift
+++ b/common/thrift/Query.thrift
@@ -678,6 +678,9 @@ struct TQueryOptions {
 
   // See comment in ImpalaService.thrift
   168: optional TCodeGenOptLevel codegen_opt_level = TCodeGenOptLevel.O2
+
+  // See comment in ImpalaService.thrift
+  169: optional i32 kudu_table_reserve_seconds = 0;
 }
 
 // Impala currently has three types of sessions: Beeswax, HiveServer2 and external
diff --git a/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
index f89bfd40f..75fac245a 100644
--- a/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
+++ b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
@@ -520,7 +520,8 @@ public class CatalogOpExecutor {
           TDropDbParams drop_db_params = ddlRequest.getDrop_db_params();
           tTableName = Optional.of(new TTableName(drop_db_params.getDb(), ""));
           catalogOpTracker_.increment(ddlRequest, tTableName);
-          dropDatabase(drop_db_params, response);
+          dropDatabase(drop_db_params, response,
+              ddlRequest.getQuery_options().getKudu_table_reserve_seconds());
           break;
         case DROP_TABLE:
         case DROP_VIEW:
@@ -531,7 +532,8 @@ public class CatalogOpExecutor {
           // Dropped tables and views are already returned as minimal results, so don't
           // need to pass down wantMinimalResult here.
           dropTableOrView(drop_table_or_view_params, response,
-              ddlRequest.getQuery_options().getLock_max_wait_time_s());
+              ddlRequest.getQuery_options().getLock_max_wait_time_s(),
+              ddlRequest.getQuery_options().getKudu_table_reserve_seconds());
           break;
         case TRUNCATE_TABLE:
           TTruncateParams truncate_params = ddlRequest.getTruncate_params();
@@ -2652,8 +2654,8 @@ public class CatalogOpExecutor {
    * internal cache. Attempts to remove the HDFS cache directives of the underlying
    * tables. Re-throws any HMS exceptions encountered during the drop.
    */
-  private void dropDatabase(TDropDbParams params, TDdlExecResponse resp)
-      throws ImpalaException {
+  private void dropDatabase(TDropDbParams params, TDdlExecResponse resp,
+      int kudu_table_reserve_seconds) throws ImpalaException {
     Preconditions.checkNotNull(params);
     String dbName = params.getDb();
     Preconditions.checkState(dbName != null && !dbName.isEmpty(),
@@ -2677,7 +2679,9 @@ public class CatalogOpExecutor {
     getMetastoreDdlLock().lock();
     try {
       // Remove all the Kudu tables of 'db' from the Kudu storage engine.
-      if (db != null && params.cascade) dropTablesFromKudu(db);
+      if (db != null && params.cascade) {
+        dropTablesFromKudu(db, kudu_table_reserve_seconds);
+      }
 
       // The Kudu tables in the HMS should have been dropped at this point
       // with the Hive Metastore integration enabled.
@@ -2764,7 +2768,8 @@ public class CatalogOpExecutor {
    * metadata for Kudu tables cannot be loaded from HMS or if an error occurs while
    * trying to drop a table from Kudu.
    */
-  private void dropTablesFromKudu(Db db) throws ImpalaException {
+  private void dropTablesFromKudu(Db db, int kudu_table_reserve_seconds)
+      throws ImpalaException {
     // If the table format isn't available, because the table hasn't been loaded yet,
     // the metadata must be fetched from the Hive Metastore.
     List<String> incompleteTableNames = Lists.newArrayList();
@@ -2793,7 +2798,8 @@ public class CatalogOpExecutor {
       // some reason Kudu is permanently stuck in a non-functional state, the user is
       // expected to ALTER TABLE to either set the table to UNMANAGED or set the format
       // to something else.
-      KuduCatalogOpExecutor.dropTable(msTable, /*if exists*/ true);
+      KuduCatalogOpExecutor.dropTable(
+        msTable, /*if exists*/ true, kudu_table_reserve_seconds);
     }
   }
 
@@ -2837,7 +2843,7 @@ public class CatalogOpExecutor {
    * executing the drop operation.
    */
   private void dropTableOrView(TDropTableOrViewParams params, TDdlExecResponse resp,
-      int lockMaxWaitTime) throws ImpalaException {
+      int lockMaxWaitTime, int kudu_table_reserve_seconds) throws ImpalaException {
     TableName tableName = TableName.fromThrift(params.getTable_name());
     Preconditions.checkState(tableName != null && tableName.isFullyQualified());
     Preconditions.checkState(!catalog_.isBlacklistedTable(tableName) || params.if_exists,
@@ -2882,7 +2888,7 @@ public class CatalogOpExecutor {
     }
 
     try {
-      dropTableOrViewInternal(params, tableName, resp);
+      dropTableOrViewInternal(params, tableName, resp, kudu_table_reserve_seconds);
     } finally {
       if (lockId > 0) catalog_.releaseTableLock(lockId);
     }
@@ -2892,7 +2898,8 @@ public class CatalogOpExecutor {
    * Helper function for dropTableOrView().
    */
   private void dropTableOrViewInternal(TDropTableOrViewParams params,
-      TableName tableName, TDdlExecResponse resp) throws ImpalaException {
+      TableName tableName, TDdlExecResponse resp, int kudu_table_reserve_seconds)
+      throws ImpalaException {
     TCatalogObject removedObject = new TCatalogObject();
     getMetastoreDdlLock().lock();
     try {
@@ -2950,7 +2957,8 @@ public class CatalogOpExecutor {
       boolean isSynchronizedKuduTable = msTbl != null &&
               KuduTable.isKuduTable(msTbl) && KuduTable.isSynchronizedTable(msTbl);
       if (isSynchronizedKuduTable) {
-        KuduCatalogOpExecutor.dropTable(msTbl, /* if exists */ true);
+        KuduCatalogOpExecutor.dropTable(
+            msTbl, /* if exists */ true, kudu_table_reserve_seconds);
       }
 
       long eventId;
@@ -3654,7 +3662,8 @@ public class CatalogOpExecutor {
       try {
         // Error creating the table in HMS, drop the synchronized table from Kudu.
         if (!KuduTable.isSynchronizedTable(newTable)) {
-          KuduCatalogOpExecutor.dropTable(newTable, false);
+          KuduCatalogOpExecutor.dropTable(
+            newTable, /* if exists */ false, /* kudu_table_reserve_seconds */ 0);
         }
       } catch (Exception logged) {
         String kuduTableName = newTable.getParameters().get(KuduTable.KEY_TABLE_NAME);
diff --git a/fe/src/main/java/org/apache/impala/service/Frontend.java b/fe/src/main/java/org/apache/impala/service/Frontend.java
index 543c756a4..c0633b83a 100644
--- a/fe/src/main/java/org/apache/impala/service/Frontend.java
+++ b/fe/src/main/java/org/apache/impala/service/Frontend.java
@@ -885,6 +885,8 @@ public class Frontend {
       }
       ddlQueryOpts.setLock_max_wait_time_s(
           result.getQuery_options().lock_max_wait_time_s);
+      ddlQueryOpts.setKudu_table_reserve_seconds(
+          result.getQuery_options().kudu_table_reserve_seconds);
       ddl.getDdl_params().setQuery_options(ddlQueryOpts);
     } else if (ddl.getOp_type() == TCatalogOpType.RESET_METADATA) {
       ddl.getReset_metadata_params().setSync_ddl(ddl.isSync_ddl());
diff --git a/fe/src/main/java/org/apache/impala/service/KuduCatalogOpExecutor.java b/fe/src/main/java/org/apache/impala/service/KuduCatalogOpExecutor.java
index 738e21fbf..f725ab0a9 100644
--- a/fe/src/main/java/org/apache/impala/service/KuduCatalogOpExecutor.java
+++ b/fe/src/main/java/org/apache/impala/service/KuduCatalogOpExecutor.java
@@ -324,10 +324,13 @@ public class KuduCatalogOpExecutor {
   /**
    * Drops the table in Kudu. If the table does not exist and 'ifExists' is false, a
    * TableNotFoundException is thrown. If the table exists and could not be dropped,
-   * an ImpalaRuntimeException is thrown.
+   * an ImpalaRuntimeException is thrown. If 'kudu_table_reserve_seconds' is 0, the
+   * table is deleted immediately; otherwise it is reserved (soft-deleted) in the Kudu
+   * cluster for 'kudu_table_reserve_seconds' seconds.
    */
   public static void dropTable(org.apache.hadoop.hive.metastore.api.Table msTbl,
-      boolean ifExists) throws ImpalaRuntimeException, TableNotFoundException {
+      boolean ifExists, int kudu_table_reserve_seconds)
+      throws ImpalaRuntimeException, TableNotFoundException {
     Preconditions.checkState(KuduTable.isSynchronizedTable(msTbl));
     String tableName = msTbl.getParameters().get(KuduTable.KEY_TABLE_NAME);
     String masterHosts = msTbl.getParameters().get(KuduTable.KEY_MASTER_HOSTS);
@@ -341,7 +344,7 @@ public class KuduCatalogOpExecutor {
       // TODO: The IF EXISTS case should be handled by Kudu to ensure atomicity.
       // (see KUDU-1710).
       if (kudu.tableExists(tableName)) {
-        kudu.deleteTable(tableName);
+        kudu.deleteTable(tableName, kudu_table_reserve_seconds);
       } else if (!ifExists) {
         throw new TableNotFoundException(String.format(
             "Table '%s' does not exist in Kudu master(s) '%s'.", tableName, masterHosts));
diff --git a/infra/python/deps/kudu-requirements.txt b/infra/python/deps/kudu-requirements.txt
index bb5e2c429..893e9658b 100644
--- a/infra/python/deps/kudu-requirements.txt
+++ b/infra/python/deps/kudu-requirements.txt
@@ -19,4 +19,4 @@
 # and also depends on Cython being installed into the virtualenv, so it must be installed
 # after the toolchain is bootstrapped and all requirements in requirements.txt and
 # compiled-requirements.txt are installed into the virtualenv.
-kudu-python==1.14.0
+kudu-python==1.17.0
diff --git a/tests/query_test/test_kudu.py b/tests/query_test/test_kudu.py
index 1472bd130..ea3c3388b 100644
--- a/tests/query_test/test_kudu.py
+++ b/tests/query_test/test_kudu.py
@@ -1258,6 +1258,40 @@ class TestDropDb(KuduTestSuite):
       assert kudu_client.table_exists(kudu_table.name)
       assert not kudu_client.table_exists(managed_table_name)
 
+  @SkipIfKudu.hms_integration_enabled
+  def test_soft_drop_db_cascade(self, unique_cursor, kudu_client):
+    """Check that an attempt to drop a database will succeed but the managed Kudu tables
+       are not removed immediately if 'kudu_table_reserve_seconds' is greater than 0.
+       These Kudu tables are in 'soft_deleted' state and can be recalled during the
+       reservation period.
+    """
+    db_name = unique_cursor.conn.db_name
+    table_name_pattern = "managed_kudu_table_"
+    for i in range(10):
+      managed_table_name = table_name_pattern + str(i)
+      unique_cursor.execute("""
+          CREATE TABLE %s (a INT PRIMARY KEY) PARTITION BY HASH (a) PARTITIONS 3
+          STORED AS KUDU""" % managed_table_name)
+      kudu_tbl_name = KuduTestSuite.to_kudu_table_name(db_name, managed_table_name)
+      assert kudu_client.table_exists(kudu_tbl_name)
+
+    unique_cursor.execute("set kudu_table_reserve_seconds=300")
+    unique_cursor.execute("USE DEFAULT")
+    unique_cursor.execute("DROP DATABASE %s CASCADE" % db_name)
+    unique_cursor.execute("SHOW DATABASES")
+    assert (db_name, '') not in unique_cursor.fetchall()
+
+    for i in range(10):
+      kudu_tbl_name = \
+          KuduTestSuite.to_kudu_table_name(db_name, table_name_pattern + str(i))
+      assert kudu_client.table_exists(kudu_tbl_name)
+      assert kudu_tbl_name not in kudu_client.list_tables()
+      assert kudu_tbl_name in kudu_client.list_soft_deleted_tables()
+      table = kudu_client.table(kudu_tbl_name)
+      kudu_client.recall_table(table.id)
+      assert kudu_tbl_name in kudu_client.list_tables()
+      assert kudu_tbl_name not in kudu_client.list_soft_deleted_tables()
+
 class TestImpalaKuduIntegration(KuduTestSuite):
   @SkipIfKudu.hms_integration_enabled
   def test_replace_kudu_table(self, cursor, kudu_client):
@@ -1338,6 +1372,31 @@ class TestImpalaKuduIntegration(KuduTestSuite):
     cursor.execute("SHOW TABLES IN %s" % unique_database)
     assert (impala_tbl_name,) not in cursor.fetchall()
 
+  @SkipIfKudu.hms_integration_enabled
+  def test_soft_delete_kudu_table(self, cursor, kudu_client, unique_database):
+    """Check that the query option 'kudu_table_reserve_seconds' works for managed Kudu
+    tables. If it is greater than 0, the underlying Kudu table is not deleted
+    immediately. During the reservation period, the Kudu table can be recalled."""
+    impala_tbl_name = "foo"
+    cursor.execute("""CREATE TABLE %s.%s (a INT PRIMARY KEY) PARTITION BY HASH (a)
+        PARTITIONS 3 STORED AS KUDU""" % (unique_database, impala_tbl_name))
+    kudu_tbl_name = KuduTestSuite.to_kudu_table_name(unique_database, impala_tbl_name)
+    assert kudu_client.table_exists(kudu_tbl_name)
+
+    cursor.execute("set kudu_table_reserve_seconds=300")
+    cursor.execute("DROP TABLE %s.%s" % (unique_database, impala_tbl_name))
+    cursor.execute("SHOW TABLES IN %s" % unique_database)
+    assert (impala_tbl_name,) not in cursor.fetchall()
+
+    assert kudu_client.table_exists(kudu_tbl_name)
+    assert kudu_tbl_name not in kudu_client.list_tables()
+    assert kudu_tbl_name in kudu_client.list_soft_deleted_tables()
+
+    table = kudu_client.table(kudu_tbl_name)
+    kudu_client.recall_table(table.id)
+    assert kudu_tbl_name in kudu_client.list_tables()
+    assert kudu_tbl_name not in kudu_client.list_soft_deleted_tables()
+
 @SkipIfNotHdfsMinicluster.tuned_for_minicluster
 class TestKuduMemLimits(KuduTestSuite):