You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by st...@apache.org on 2024/01/19 06:39:57 UTC
(impala) branch master updated (b372f87b6 -> 5dfcdf1c9)
This is an automated email from the ASF dual-hosted git repository.
stigahuang pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git
from b372f87b6 IMPALA-12708: An UPDATE creates 2 new snapshots in Iceberg tables
new 526d6eb68 IMPALA-12719: Reload filemetadata for AlterTable event of type truncate
new 5dfcdf1c9 IMPALA-12716: Fix timeout thresholds in test_catalog_operations_with_rpc_retry
The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails. The revisions
listed as "add" were already present in the repository and have only
been added to this reference.
Summary of changes:
.../impala/catalog/events/MetastoreEvents.java | 53 ++++++++++++++++----
tests/custom_cluster/test_events_custom_configs.py | 58 ++++++++++++++++++++++
tests/custom_cluster/test_web_pages.py | 8 +--
3 files changed, 106 insertions(+), 13 deletions(-)
(impala) 01/02: IMPALA-12719: Reload filemetadata for AlterTable event of type truncate
Posted by st...@apache.org.
This is an automated email from the ASF dual-hosted git repository.
stigahuang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git
commit 526d6eb68f202c5172d608f661979a40a92c2261
Author: Sai Hemanth Gantasala <sa...@cloudera.com>
AuthorDate: Thu Jan 11 14:01:31 2024 -0800
IMPALA-12719: Reload filemetadata for AlterTable event of type truncate
When the event processor receives an alter table event that was
generated by a truncate operation, file metadata is currently not
reloaded. This patch addresses the issue by checking whether the alter
table event comes from a truncate operation and, if so, reloading the
file metadata.
Note: For an alter table event generated by a truncate operation from
Impala on an external table, it cannot be determined whether the event
is a truncate op or not. This becomes an issue in multi-cluster Impala
environments where events generated by one Impala cluster are consumed
by other Impala clusters. Truncate operations in Impala on replicated
tables will generate an alter event with the 'isTruncateOp' field set
to true.
Testing:
Added an end-to-end test to verify whether file metadata is reloaded
for the above scenario.
Change-Id: I53bb80c294623eec7c79d9f30f410771386c6b75
Reviewed-on: http://gerrit.cloudera.org:8080/20887
Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
.../impala/catalog/events/MetastoreEvents.java | 53 ++++++++++++++++----
tests/custom_cluster/test_events_custom_configs.py | 58 ++++++++++++++++++++++
2 files changed, 101 insertions(+), 10 deletions(-)
diff --git a/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEvents.java b/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEvents.java
index 6dcd86420..887404d36 100644
--- a/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEvents.java
+++ b/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEvents.java
@@ -1451,6 +1451,8 @@ public class MetastoreEvents {
private final Boolean eventSyncAfterFlag_;
// value of the db flag at the time of event creation
private final boolean dbFlagVal;
+ // true if this alter event was due to a truncate operation in metastore
+ private final boolean isTruncateOp_;
/**
* Prevent instantiation from outside should use MetastoreEventFactory instead
@@ -1467,6 +1469,7 @@ public class MetastoreEvents {
msTbl_ = Preconditions.checkNotNull(alterTableMessage.getTableObjBefore());
tableAfter_ = Preconditions.checkNotNull(alterTableMessage.getTableObjAfter());
tableBefore_ = Preconditions.checkNotNull(alterTableMessage.getTableObjBefore());
+ isTruncateOp_ = alterTableMessage.getIsTruncateOp();
} catch (Exception e) {
throw new MetastoreNotificationException(
debugString("Unable to parse the alter table message"), e);
@@ -1563,7 +1566,8 @@ public class MetastoreEvents {
+ "which can be ignored.");
return;
}
- skipFileMetadataReload_ = canSkipFileMetadataReload(tableBefore_, tableAfter_);
+ skipFileMetadataReload_ = !isTruncateOp_ && canSkipFileMetadataReload(tableBefore_,
+ tableAfter_);
long startNs = System.nanoTime();
if (wasEventSyncTurnedOn()) {
handleEventSyncTurnedOn();
@@ -1716,6 +1720,11 @@ public class MetastoreEvents {
// event by setting those parameters equal before and after the event and
// comparing the objects.
+ // alter table event from truncate ops always can't be skipped.
+ if (isTruncateOp_) {
+ return false;
+ }
+
// Avoid modifying the object from event.
org.apache.hadoop.hive.metastore.api.Table tblAfter = tableAfter_.deepCopy();
setTrivialParameters(tableBefore_.getParameters(), tblAfter.getParameters());
@@ -2190,6 +2199,8 @@ public class MetastoreEvents {
private final long versionNumberFromEvent_;
// the service id from the partition parameters of the event.
private final String serviceIdFromEvent_;
+ // true if this alter event was due to a truncate operation in metastore
+ private final boolean isTruncateOp_;
/**
* Prevent instantiation from outside should use MetastoreEventFactory instead
@@ -2208,6 +2219,7 @@ public class MetastoreEvents {
Preconditions.checkNotNull(alterPartitionMessage.getPtnObjBefore());
partitionAfter_ =
Preconditions.checkNotNull(alterPartitionMessage.getPtnObjAfter());
+ isTruncateOp_ = alterPartitionMessage.getIsTruncateOp();
msTbl_ = alterPartitionMessage.getTableObj();
Map<String, String> parameters = partitionAfter_.getParameters();
versionNumberFromEvent_ = Long.parseLong(
@@ -2302,9 +2314,13 @@ public class MetastoreEvents {
partitionAfter_);
try {
// load file metadata only if storage descriptor of partitionAfter_ differs
- // from sd of HdfsPartition
- reloadPartitions(Arrays.asList(partitionAfter_),
- FileMetadataLoadOpts.LOAD_IF_SD_CHANGED, "ALTER_PARTITION event");
+ // from sd of HdfsPartition. If the alter_partition event type is of truncate
+ // then force load the file metadata.
+ FileMetadataLoadOpts fileMetadataLoadOpts =
+ isTruncateOp_ ? FileMetadataLoadOpts.FORCE_LOAD :
+ FileMetadataLoadOpts.LOAD_IF_SD_CHANGED;
+ reloadPartitions(Arrays.asList(partitionAfter_), fileMetadataLoadOpts,
+ "ALTER_PARTITION event");
} catch (CatalogException e) {
throw new MetastoreNotificationNeedsInvalidateException(
debugString("Refresh partition on table {} partition {} failed. Event " +
@@ -2323,6 +2339,11 @@ public class MetastoreEvents {
// event by setting those parameters equal before and after the event and
// comparing the objects.
+ // alter partition event from truncate ops always can't be skipped.
+ if (isTruncateOp_) {
+ return false;
+ }
+
// Avoid modifying the object from event.
Partition afterPartition = partitionAfter_.deepCopy();
setTrivialParameters(partitionBefore_.getParameters(),
@@ -2426,17 +2447,22 @@ public class MetastoreEvents {
// Ignore the event if this is a trivial event. See javadoc for
// isTrivialAlterPartitionEvent() for examples.
List<T> eventsToProcess = new ArrayList<>();
+ List<Partition> partitionEventsToForceReload = new ArrayList<>();
for (T event : batchedEvents_) {
if (isOlderEvent(event.getPartitionForBatching())) {
infoLog("Not processing the current event id {} as it is an older event",
event.getEventId());
continue;
}
- if (!event.canBeSkipped()) {
+ boolean isTruncateOp = (event instanceof AlterPartitionEvent &&
+ ((AlterPartitionEvent)event).isTruncateOp_);
+ if (isTruncateOp) {
+ partitionEventsToForceReload.add(event.getPartitionForBatching());
+ } else if (!event.canBeSkipped()){
eventsToProcess.add(event);
}
}
- if (eventsToProcess.isEmpty()) {
+ if (eventsToProcess.isEmpty() && partitionEventsToForceReload.isEmpty()) {
LOG.info(
"Ignoring events from event id {} to {} since they modify parameters "
+ " which can be ignored", getFirstEventId(), getLastEventId());
@@ -2459,10 +2485,17 @@ public class MetastoreEvents {
reloadPartitions(partitions, FileMetadataLoadOpts.FORCE_LOAD,
getEventType().toString() + " event");
} else {
- // alter partition event. Reload file metadata of only those partitions
- // for which sd has changed
- reloadPartitions(partitions, FileMetadataLoadOpts.LOAD_IF_SD_CHANGED,
- getEventType().toString() + " event");
+ if (!partitionEventsToForceReload.isEmpty()) {
+ // force reload truncated partitions
+ reloadPartitions(partitionEventsToForceReload,
+ FileMetadataLoadOpts.FORCE_LOAD, getEventType().toString() + " event");
+ }
+ if (!partitions.isEmpty()) {
+ // alter partition event. Reload file metadata of only those partitions
+ // for which sd has changed
+ reloadPartitions(partitions, FileMetadataLoadOpts.LOAD_IF_SD_CHANGED,
+ getEventType().toString() + " event");
+ }
}
} catch (CatalogException e) {
throw new MetastoreNotificationNeedsInvalidateException(String.format(
diff --git a/tests/custom_cluster/test_events_custom_configs.py b/tests/custom_cluster/test_events_custom_configs.py
index aaebde2b9..09ab84b58 100644
--- a/tests/custom_cluster/test_events_custom_configs.py
+++ b/tests/custom_cluster/test_events_custom_configs.py
@@ -1086,3 +1086,61 @@ class TestEventProcessingCustomConfigs(CustomClusterTestSuite):
assert parts_added_before == parts_added_after
assert parts_refreshed_before == parts_refreshed_after
assert events_skipped_after > events_skipped_before
+
+ @CustomClusterTestSuite.with_args(catalogd_args="--hms_event_polling_interval_s=5")
+ def test_truncate_table_from_hive(self, unique_database):
+ """IMPALA-12636: verify truncate table from hive reloads file metadata in Impala"""
+ hive_tbl = "tbl_in_hive"
+ values = "values (10),(20),(30)"
+
+ def verify_truncate_op_in_hive(tbl_name, is_transactional, is_partitioned,
+ is_batched):
+ create_query = " ".join(["create", "table `{}`.`{}` (i int)",
+ " partitioned by (year int) " if is_partitioned else '',
+ self.__get_transactional_tblproperties(is_transactional)])
+ self.execute_query(create_query.format(unique_database, tbl_name))
+ insert_query = " ".join(["insert into `{}`.`{}`", "partition (year=2024)"
+ if is_partitioned else '', values])
+ self.run_stmt_in_hive(insert_query.format(unique_database, tbl_name))
+ EventProcessorUtils.wait_for_event_processing(self)
+ self.client.execute("refresh {}.{}".format(unique_database, tbl_name))
+ truncate_query = " ".join(["truncate table `{}`.`{}`", "partition (year=2024)"
+ if is_partitioned else ''])
+ self.run_stmt_in_hive(truncate_query.format(unique_database, tbl_name))
+ if is_batched:
+ self.run_stmt_in_hive(
+ "insert into {}.{} partition (year=2024) values (1),(2)"
+ .format(unique_database, tbl_name))
+ EventProcessorUtils.wait_for_event_processing(self)
+ data = int(self.execute_scalar("select count(*) from {0}.{1}".format(
+ unique_database, tbl_name)))
+ assert data == 2 if is_batched else data == 0
+ self.client.execute("drop table {}.{}".format(unique_database, tbl_name))
+ # Case-I: truncate single partition
+ verify_truncate_op_in_hive(hive_tbl, False, False, False)
+ verify_truncate_op_in_hive(hive_tbl, True, False, False)
+ verify_truncate_op_in_hive(hive_tbl, False, True, False)
+ verify_truncate_op_in_hive(hive_tbl, False, True, True)
+ verify_truncate_op_in_hive(hive_tbl, True, True, False)
+ verify_truncate_op_in_hive(hive_tbl, True, True, True)
+
+ # Case-II: truncate partition in multi partition
+ hive_tbl = "multi_part_tbl"
+ self.client.execute("create table {}.{} (i int) partitioned by "
+ "(p int, q int)".format(unique_database, hive_tbl))
+ self.client.execute("insert into {}.{} partition(p, q) values "
+ "(0,0,0), (0,0,1), (0,0,2)".format(unique_database, hive_tbl))
+ self.client.execute("insert into {}.{} partition(p, q) values "
+ "(0,1,0), (0,1,1)".format(unique_database, hive_tbl))
+ self.run_stmt_in_hive("truncate table {}.{} partition(p=0)"
+ .format(unique_database, hive_tbl))
+ EventProcessorUtils.wait_for_event_processing(self)
+ data = int(self.execute_scalar("select count(*) from {0}.{1}".format(
+ unique_database, hive_tbl)))
+ assert data == 2
+ self.run_stmt_in_hive("truncate table {}.{}"
+ .format(unique_database, hive_tbl))
+ EventProcessorUtils.wait_for_event_processing(self)
+ data = int(self.execute_scalar("select count(*) from {0}.{1}".format(
+ unique_database, hive_tbl)))
+ assert data == 0
(impala) 02/02: IMPALA-12716: Fix timeout thresholds in test_catalog_operations_with_rpc_retry
Posted by st...@apache.org.
This is an automated email from the ASF dual-hosted git repository.
stigahuang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git
commit 5dfcdf1c9538b69e25b9bde1e86c209595b218b5
Author: stiga-huang <hu...@gmail.com>
AuthorDate: Fri Jan 19 07:36:19 2024 +0800
IMPALA-12716: Fix timeout thresholds in test_catalog_operations_with_rpc_retry
test_catalog_operations_with_rpc_retry uses a short timeout which could
lead to failures in the first DESCRIBE statement. What the test expects
is that the subsequent REFRESH statement fails due to the RPC timeout.
The DESCRIBE statement triggers a catalog RPC of PrioritizeLoad on the
table. The RPC usually finishes in 40ms. This patch bumps the RPC
timeout of the test to be 100ms so it's long enough for DESCRIBE to
succeed. Also bumps the sleep time in the REFRESH statement so its
ExecDdl RPC will always time out in 100ms.
Tests:
- Ran the test till night (2700 times) and all passed.
Change-Id: Ibbfa79d7f7530af4cfbb6f8ebc55e8267e3d3261
Reviewed-on: http://gerrit.cloudera.org:8080/20924
Reviewed-by: Wenzhe Zhou <wz...@cloudera.com>
Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
tests/custom_cluster/test_web_pages.py | 8 +++++---
1 file changed, 5 insertions(+), 3 deletions(-)
diff --git a/tests/custom_cluster/test_web_pages.py b/tests/custom_cluster/test_web_pages.py
index 2c5b87015..2bd40764a 100644
--- a/tests/custom_cluster/test_web_pages.py
+++ b/tests/custom_cluster/test_web_pages.py
@@ -330,17 +330,19 @@ class TestWebPage(CustomClusterTestSuite):
assert op["target_name"] == unique_database
@CustomClusterTestSuite.with_args(
- impalad_args="--catalog_client_rpc_timeout_ms=10 "
+ impalad_args="--catalog_client_rpc_timeout_ms=100 "
"--catalog_client_rpc_retry_interval_ms=10 "
"--catalog_client_connection_num_retries=2")
def test_catalog_operations_with_rpc_retry(self):
"""Test that catalog RPC retries are all shown in the /operations page"""
# Run a DESCRIBE to ensure the table is loaded. So the first RPC attempt will
- # time out in its real work.
+ # time out in its real work. This triggers a PrioritizeLoad RPC which usually
+ # finishes in 40ms. So 100ms for catalog RPC timeout is enough.
self.execute_query("describe functional.alltypes")
try:
+ # This runs around 600ms with the debug action so the catalog RPC will timeout.
self.execute_query("refresh functional.alltypes", {
- "debug_action": "catalogd_refresh_hdfs_listing_delay:SLEEP@30"
+ "debug_action": "catalogd_refresh_hdfs_listing_delay:SLEEP@100"
})
except ImpalaBeeswaxException as e:
assert "RPC recv timed out" in str(e)