You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by mi...@apache.org on 2023/12/14 03:26:50 UTC
(impala) 01/02: IMPALA-12229: Support soft-delete Kudu table
This is an automated email from the ASF dual-hosted git repository.
michaelsmith pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git
commit 45682c132ffb6c5511e40e949a2c8d4008f17e7d
Author: zhangyifan27 <ch...@163.com>
AuthorDate: Tue Dec 12 10:17:06 2023 +0800
IMPALA-12229: Support soft-delete Kudu table
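Adds the 'kudu_table_reserve_seconds' query option, which sets how long (in
seconds) a dropped Impala-managed Kudu table is reserved in Kudu before it is
permanently deleted. The default value is 0, which deletes the table immediately.
A non-zero value lets users recall important Kudu tables that were dropped by mistake.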
Testing:
- Added e2e tests.
Change-Id: I3020567bb6cfe4dd48ef17906f8de674f37217e7
Reviewed-on: http://gerrit.cloudera.org:8080/20773
Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
be/src/service/query-options.cc | 7 +++
be/src/service/query-options.h | 4 +-
common/thrift/CatalogService.thrift | 5 ++
common/thrift/ImpalaService.thrift | 5 ++
common/thrift/Query.thrift | 3 ++
.../apache/impala/service/CatalogOpExecutor.java | 33 +++++++-----
.../java/org/apache/impala/service/Frontend.java | 2 +
.../impala/service/KuduCatalogOpExecutor.java | 9 ++--
infra/python/deps/kudu-requirements.txt | 2 +-
tests/query_test/test_kudu.py | 59 ++++++++++++++++++++++
10 files changed, 112 insertions(+), 17 deletions(-)
diff --git a/be/src/service/query-options.cc b/be/src/service/query-options.cc
index 062b9b6c2..4678a7d19 100644
--- a/be/src/service/query-options.cc
+++ b/be/src/service/query-options.cc
@@ -1166,6 +1166,13 @@ Status impala::SetQueryOption(const string& key, const string& value,
query_options->__set_codegen_opt_level(enum_type);
break;
}
+ case TImpalaQueryOptions::KUDU_TABLE_RESERVE_SECONDS: {
+ int32_t int32_t_val = 0;
+ RETURN_IF_ERROR(QueryOptionParser::ParseAndCheckNonNegative<int32_t>(
+ option, value, &int32_t_val));
+ query_options->__set_kudu_table_reserve_seconds(int32_t_val);
+ break;
+ }
default:
if (IsRemovedQueryOption(key)) {
LOG(WARNING) << "Ignoring attempt to set removed query option '" << key << "'";
diff --git a/be/src/service/query-options.h b/be/src/service/query-options.h
index 68337d907..c420e0743 100644
--- a/be/src/service/query-options.h
+++ b/be/src/service/query-options.h
@@ -50,7 +50,7 @@ typedef std::unordered_map<string, beeswax::TQueryOptionLevel::type>
// time we add or remove a query option to/from the enum TImpalaQueryOptions.
#define QUERY_OPTS_TABLE \
DCHECK_EQ(_TImpalaQueryOptions_VALUES_TO_NAMES.size(), \
- TImpalaQueryOptions::CODEGEN_OPT_LEVEL + 1); \
+ TImpalaQueryOptions::KUDU_TABLE_RESERVE_SECONDS + 1); \
REMOVED_QUERY_OPT_FN(abort_on_default_limit_exceeded, ABORT_ON_DEFAULT_LIMIT_EXCEEDED) \
QUERY_OPT_FN(abort_on_error, ABORT_ON_ERROR, TQueryOptionLevel::REGULAR) \
REMOVED_QUERY_OPT_FN(allow_unsupported_formats, ALLOW_UNSUPPORTED_FORMATS) \
@@ -311,6 +311,8 @@ typedef std::unordered_map<string, beeswax::TQueryOptionLevel::type>
QUERY_OPT_FN(hdfs_scanner_non_reserved_bytes, HDFS_SCANNER_NON_RESERVED_BYTES, \
TQueryOptionLevel::ADVANCED) \
QUERY_OPT_FN(codegen_opt_level, CODEGEN_OPT_LEVEL, TQueryOptionLevel::ADVANCED) \
+ QUERY_OPT_FN(kudu_table_reserve_seconds, KUDU_TABLE_RESERVE_SECONDS, \
+ TQueryOptionLevel::ADVANCED) \
;
/// Enforce practical limits on some query options to avoid undesired query state.
diff --git a/common/thrift/CatalogService.thrift b/common/thrift/CatalogService.thrift
index 7d2282d3f..99d210d06 100644
--- a/common/thrift/CatalogService.thrift
+++ b/common/thrift/CatalogService.thrift
@@ -108,6 +108,11 @@ struct TDdlQueryOptions {
// Maximum wait time on an HMS ACID lock in seconds.
3: optional i32 lock_max_wait_time_s
+
+ // The reservation time (in seconds) for deleted Impala-managed Kudu tables.
+ // During this time deleted Kudu tables can be recovered by Kudu's 'recall table' API.
+ // See KUDU-3326 for details.
+ 4: optional i32 kudu_table_reserve_seconds
}
// Request for executing a DDL operation (CREATE, ALTER, DROP).
diff --git a/common/thrift/ImpalaService.thrift b/common/thrift/ImpalaService.thrift
index 13c6c4e34..d2fb3a046 100644
--- a/common/thrift/ImpalaService.thrift
+++ b/common/thrift/ImpalaService.thrift
@@ -873,6 +873,11 @@ enum TImpalaQueryOptions {
// Select codegen optimization level from O0, O1, Os, O2, or O3. Higher levels will
// overwrite existing codegen cache entries.
CODEGEN_OPT_LEVEL = 167
+
+ // The reservation time (in seconds) for deleted impala-managed Kudu tables.
+ // During this time deleted Kudu tables can be recovered by Kudu's 'recall table' API.
+ // See KUDU-3326 for details.
+ KUDU_TABLE_RESERVE_SECONDS = 168
}
// The summary of a DML statement.
diff --git a/common/thrift/Query.thrift b/common/thrift/Query.thrift
index 39ba7f896..e6e5ded5a 100644
--- a/common/thrift/Query.thrift
+++ b/common/thrift/Query.thrift
@@ -678,6 +678,9 @@ struct TQueryOptions {
// See comment in ImpalaService.thrift
168: optional TCodeGenOptLevel codegen_opt_level = TCodeGenOptLevel.O2
+
+ // See comment in ImpalaService.thrift
+ 169: optional i32 kudu_table_reserve_seconds = 0;
}
// Impala currently has three types of sessions: Beeswax, HiveServer2 and external
diff --git a/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
index f89bfd40f..75fac245a 100644
--- a/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
+++ b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
@@ -520,7 +520,8 @@ public class CatalogOpExecutor {
TDropDbParams drop_db_params = ddlRequest.getDrop_db_params();
tTableName = Optional.of(new TTableName(drop_db_params.getDb(), ""));
catalogOpTracker_.increment(ddlRequest, tTableName);
- dropDatabase(drop_db_params, response);
+ dropDatabase(drop_db_params, response,
+ ddlRequest.getQuery_options().getKudu_table_reserve_seconds());
break;
case DROP_TABLE:
case DROP_VIEW:
@@ -531,7 +532,8 @@ public class CatalogOpExecutor {
// Dropped tables and views are already returned as minimal results, so don't
// need to pass down wantMinimalResult here.
dropTableOrView(drop_table_or_view_params, response,
- ddlRequest.getQuery_options().getLock_max_wait_time_s());
+ ddlRequest.getQuery_options().getLock_max_wait_time_s(),
+ ddlRequest.getQuery_options().getKudu_table_reserve_seconds());
break;
case TRUNCATE_TABLE:
TTruncateParams truncate_params = ddlRequest.getTruncate_params();
@@ -2652,8 +2654,8 @@ public class CatalogOpExecutor {
* internal cache. Attempts to remove the HDFS cache directives of the underlying
* tables. Re-throws any HMS exceptions encountered during the drop.
*/
- private void dropDatabase(TDropDbParams params, TDdlExecResponse resp)
- throws ImpalaException {
+ private void dropDatabase(TDropDbParams params, TDdlExecResponse resp,
+ int kudu_table_reserve_seconds) throws ImpalaException {
Preconditions.checkNotNull(params);
String dbName = params.getDb();
Preconditions.checkState(dbName != null && !dbName.isEmpty(),
@@ -2677,7 +2679,9 @@ public class CatalogOpExecutor {
getMetastoreDdlLock().lock();
try {
// Remove all the Kudu tables of 'db' from the Kudu storage engine.
- if (db != null && params.cascade) dropTablesFromKudu(db);
+ if (db != null && params.cascade) {
+ dropTablesFromKudu(db, kudu_table_reserve_seconds);
+ }
// The Kudu tables in the HMS should have been dropped at this point
// with the Hive Metastore integration enabled.
@@ -2764,7 +2768,8 @@ public class CatalogOpExecutor {
* metadata for Kudu tables cannot be loaded from HMS or if an error occurs while
* trying to drop a table from Kudu.
*/
- private void dropTablesFromKudu(Db db) throws ImpalaException {
+ private void dropTablesFromKudu(Db db, int kudu_table_reserve_seconds)
+ throws ImpalaException {
// If the table format isn't available, because the table hasn't been loaded yet,
// the metadata must be fetched from the Hive Metastore.
List<String> incompleteTableNames = Lists.newArrayList();
@@ -2793,7 +2798,8 @@ public class CatalogOpExecutor {
// some reason Kudu is permanently stuck in a non-functional state, the user is
// expected to ALTER TABLE to either set the table to UNMANAGED or set the format
// to something else.
- KuduCatalogOpExecutor.dropTable(msTable, /*if exists*/ true);
+ KuduCatalogOpExecutor.dropTable(
+ msTable, /*if exists*/ true, kudu_table_reserve_seconds);
}
}
@@ -2837,7 +2843,7 @@ public class CatalogOpExecutor {
* executing the drop operation.
*/
private void dropTableOrView(TDropTableOrViewParams params, TDdlExecResponse resp,
- int lockMaxWaitTime) throws ImpalaException {
+ int lockMaxWaitTime, int kudu_table_reserve_seconds) throws ImpalaException {
TableName tableName = TableName.fromThrift(params.getTable_name());
Preconditions.checkState(tableName != null && tableName.isFullyQualified());
Preconditions.checkState(!catalog_.isBlacklistedTable(tableName) || params.if_exists,
@@ -2882,7 +2888,7 @@ public class CatalogOpExecutor {
}
try {
- dropTableOrViewInternal(params, tableName, resp);
+ dropTableOrViewInternal(params, tableName, resp, kudu_table_reserve_seconds);
} finally {
if (lockId > 0) catalog_.releaseTableLock(lockId);
}
@@ -2892,7 +2898,8 @@ public class CatalogOpExecutor {
* Helper function for dropTableOrView().
*/
private void dropTableOrViewInternal(TDropTableOrViewParams params,
- TableName tableName, TDdlExecResponse resp) throws ImpalaException {
+ TableName tableName, TDdlExecResponse resp, int kudu_table_reserve_seconds)
+ throws ImpalaException {
TCatalogObject removedObject = new TCatalogObject();
getMetastoreDdlLock().lock();
try {
@@ -2950,7 +2957,8 @@ public class CatalogOpExecutor {
boolean isSynchronizedKuduTable = msTbl != null &&
KuduTable.isKuduTable(msTbl) && KuduTable.isSynchronizedTable(msTbl);
if (isSynchronizedKuduTable) {
- KuduCatalogOpExecutor.dropTable(msTbl, /* if exists */ true);
+ KuduCatalogOpExecutor.dropTable(
+ msTbl, /* if exists */ true, kudu_table_reserve_seconds);
}
long eventId;
@@ -3654,7 +3662,8 @@ public class CatalogOpExecutor {
try {
// Error creating the table in HMS, drop the synchronized table from Kudu.
if (!KuduTable.isSynchronizedTable(newTable)) {
- KuduCatalogOpExecutor.dropTable(newTable, false);
+ KuduCatalogOpExecutor.dropTable(
+ newTable, /* if exists */ false, /* kudu_table_reserve_seconds */ 0);
}
} catch (Exception logged) {
String kuduTableName = newTable.getParameters().get(KuduTable.KEY_TABLE_NAME);
diff --git a/fe/src/main/java/org/apache/impala/service/Frontend.java b/fe/src/main/java/org/apache/impala/service/Frontend.java
index 543c756a4..c0633b83a 100644
--- a/fe/src/main/java/org/apache/impala/service/Frontend.java
+++ b/fe/src/main/java/org/apache/impala/service/Frontend.java
@@ -885,6 +885,8 @@ public class Frontend {
}
ddlQueryOpts.setLock_max_wait_time_s(
result.getQuery_options().lock_max_wait_time_s);
+ ddlQueryOpts.setKudu_table_reserve_seconds(
+ result.getQuery_options().kudu_table_reserve_seconds);
ddl.getDdl_params().setQuery_options(ddlQueryOpts);
} else if (ddl.getOp_type() == TCatalogOpType.RESET_METADATA) {
ddl.getReset_metadata_params().setSync_ddl(ddl.isSync_ddl());
diff --git a/fe/src/main/java/org/apache/impala/service/KuduCatalogOpExecutor.java b/fe/src/main/java/org/apache/impala/service/KuduCatalogOpExecutor.java
index 738e21fbf..f725ab0a9 100644
--- a/fe/src/main/java/org/apache/impala/service/KuduCatalogOpExecutor.java
+++ b/fe/src/main/java/org/apache/impala/service/KuduCatalogOpExecutor.java
@@ -324,10 +324,13 @@ public class KuduCatalogOpExecutor {
/**
* Drops the table in Kudu. If the table does not exist and 'ifExists' is false, a
* TableNotFoundException is thrown. If the table exists and could not be dropped,
- * an ImpalaRuntimeException is thrown.
+ * an ImpalaRuntimeException is thrown. If 'kudu_table_reserve_seconds' is 0, the
+ * table is deleted immediately; otherwise the table is reserved in the Kudu
+ * cluster for 'kudu_table_reserve_seconds' seconds.
*/
public static void dropTable(org.apache.hadoop.hive.metastore.api.Table msTbl,
- boolean ifExists) throws ImpalaRuntimeException, TableNotFoundException {
+ boolean ifExists, int kudu_table_reserve_seconds)
+ throws ImpalaRuntimeException, TableNotFoundException {
Preconditions.checkState(KuduTable.isSynchronizedTable(msTbl));
String tableName = msTbl.getParameters().get(KuduTable.KEY_TABLE_NAME);
String masterHosts = msTbl.getParameters().get(KuduTable.KEY_MASTER_HOSTS);
@@ -341,7 +344,7 @@ public class KuduCatalogOpExecutor {
// TODO: The IF EXISTS case should be handled by Kudu to ensure atomicity.
// (see KUDU-1710).
if (kudu.tableExists(tableName)) {
- kudu.deleteTable(tableName);
+ kudu.deleteTable(tableName, kudu_table_reserve_seconds);
} else if (!ifExists) {
throw new TableNotFoundException(String.format(
"Table '%s' does not exist in Kudu master(s) '%s'.", tableName, masterHosts));
diff --git a/infra/python/deps/kudu-requirements.txt b/infra/python/deps/kudu-requirements.txt
index bb5e2c429..893e9658b 100644
--- a/infra/python/deps/kudu-requirements.txt
+++ b/infra/python/deps/kudu-requirements.txt
@@ -19,4 +19,4 @@
# and also depends on Cython being installed into the virtualenv, so it must be installed
# after the toolchain is bootstrapped and all requirements in requirements.txt and
# compiled-requirements.txt are installed into the virtualenv.
-kudu-python==1.14.0
+kudu-python==1.17.0
diff --git a/tests/query_test/test_kudu.py b/tests/query_test/test_kudu.py
index 1472bd130..ea3c3388b 100644
--- a/tests/query_test/test_kudu.py
+++ b/tests/query_test/test_kudu.py
@@ -1258,6 +1258,40 @@ class TestDropDb(KuduTestSuite):
assert kudu_client.table_exists(kudu_table.name)
assert not kudu_client.table_exists(managed_table_name)
+ @SkipIfKudu.hms_integration_enabled
+ def test_soft_drop_db_cascade(self, unique_cursor, kudu_client):
+ """Check that an attempt to drop a database will succeed but the managed Kudu tables
+ are not removed immediately if 'kudu_table_reserve_seconds' is greater than 0.
+ These Kudu tables are in 'soft_deleted' state and can be recalled during the
+ reservation period.
+ """
+ db_name = unique_cursor.conn.db_name
+ table_name_pattern = "managed_kudu_table_"
+ for i in range(10):
+ managed_table_name = table_name_pattern + str(i)
+ unique_cursor.execute("""
+ CREATE TABLE %s (a INT PRIMARY KEY) PARTITION BY HASH (a) PARTITIONS 3
+ STORED AS KUDU""" % managed_table_name)
+ kudu_tbl_name = KuduTestSuite.to_kudu_table_name(db_name, managed_table_name)
+ assert kudu_client.table_exists(kudu_tbl_name)
+
+ unique_cursor.execute("set kudu_table_reserve_seconds=300")
+ unique_cursor.execute("USE DEFAULT")
+ unique_cursor.execute("DROP DATABASE %s CASCADE" % db_name)
+ unique_cursor.execute("SHOW DATABASES")
+ assert (db_name, '') not in unique_cursor.fetchall()
+
+ for i in range(10):
+ kudu_tbl_name = \
+ KuduTestSuite.to_kudu_table_name(db_name, table_name_pattern + str(i))
+ assert kudu_client.table_exists(kudu_tbl_name)
+ assert kudu_tbl_name not in kudu_client.list_tables()
+ assert kudu_tbl_name in kudu_client.list_soft_deleted_tables()
+ table = kudu_client.table(kudu_tbl_name)
+ kudu_client.recall_table(table.id)
+ assert kudu_tbl_name in kudu_client.list_tables()
+ assert kudu_tbl_name not in kudu_client.list_soft_deleted_tables()
+
class TestImpalaKuduIntegration(KuduTestSuite):
@SkipIfKudu.hms_integration_enabled
def test_replace_kudu_table(self, cursor, kudu_client):
@@ -1338,6 +1372,31 @@ class TestImpalaKuduIntegration(KuduTestSuite):
cursor.execute("SHOW TABLES IN %s" % unique_database)
assert (impala_tbl_name,) not in cursor.fetchall()
+ @SkipIfKudu.hms_integration_enabled
+ def test_soft_delete_kudu_table(self, cursor, kudu_client, unique_database):
+ """Check that the query option 'kudu_table_reserve_seconds' works for managed Kudu
+ tables. If it is greater than 0, the underlying Kudu table is not deleted
+ immediately. During the reservation period, the Kudu table can be recalled."""
+ impala_tbl_name = "foo"
+ cursor.execute("""CREATE TABLE %s.%s (a INT PRIMARY KEY) PARTITION BY HASH (a)
+ PARTITIONS 3 STORED AS KUDU""" % (unique_database, impala_tbl_name))
+ kudu_tbl_name = KuduTestSuite.to_kudu_table_name(unique_database, impala_tbl_name)
+ assert kudu_client.table_exists(kudu_tbl_name)
+
+ cursor.execute("set kudu_table_reserve_seconds=300")
+ cursor.execute("DROP TABLE %s.%s" % (unique_database, impala_tbl_name))
+ cursor.execute("SHOW TABLES IN %s" % unique_database)
+ assert (impala_tbl_name,) not in cursor.fetchall()
+
+ assert kudu_client.table_exists(kudu_tbl_name)
+ assert kudu_tbl_name not in kudu_client.list_tables()
+ assert kudu_tbl_name in kudu_client.list_soft_deleted_tables()
+
+ table = kudu_client.table(kudu_tbl_name)
+ kudu_client.recall_table(table.id)
+ assert kudu_tbl_name in kudu_client.list_tables()
+ assert kudu_tbl_name not in kudu_client.list_soft_deleted_tables()
+
@SkipIfNotHdfsMinicluster.tuned_for_minicluster
class TestKuduMemLimits(KuduTestSuite):