You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@impala.apache.org by "Quanlong Huang (Jira)" <ji...@apache.org> on 2024/03/12 13:37:00 UTC

[jira] [Resolved] (IMPALA-12831) HdfsTable.toMinimalTCatalogObject() should hold table read lock to generate incremental updates

     [ https://issues.apache.org/jira/browse/IMPALA-12831?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Quanlong Huang resolved IMPALA-12831.
-------------------------------------
    Fix Version/s: Impala 4.4.0
       Resolution: Fixed

> HdfsTable.toMinimalTCatalogObject() should hold table read lock to generate incremental updates
> -----------------------------------------------------------------------------------------------
>
>                 Key: IMPALA-12831
>                 URL: https://issues.apache.org/jira/browse/IMPALA-12831
>             Project: IMPALA
>          Issue Type: Bug
>          Components: Catalog
>            Reporter: Quanlong Huang
>            Assignee: Quanlong Huang
>            Priority: Critical
>             Fix For: Impala 4.4.0
>
>
> When enable_incremental_metadata_updates=true (default), catalogd sends incremental partition updates to coordinators, which goes into HdfsTable.toMinimalTCatalogObject():
> {code:java}
>   public TCatalogObject toMinimalTCatalogObject() {
>     TCatalogObject catalogObject = super.toMinimalTCatalogObject();
>     if (!BackendConfig.INSTANCE.isIncrementalMetadataUpdatesEnabled()) {
>       return catalogObject;
>     }    
>     catalogObject.getTable().setTable_type(TTableType.HDFS_TABLE);
>     THdfsTable hdfsTable = new THdfsTable(hdfsBaseDir_, getColumnNames(),
>         nullPartitionKeyValue_, nullColumnValue_,
>         /*idToPartition=*/ new HashMap<>(),
>         /*prototypePartition=*/ new THdfsPartition());
>     for (HdfsPartition part : partitionMap_.values()) {
>       hdfsTable.partitions.put(part.getId(), part.toMinimalTHdfsPartition());
>     }    
>     hdfsTable.setHas_full_partitions(false);
>     // The minimal catalog object of partitions contain the partition names.
>     hdfsTable.setHas_partition_names(true);
>     catalogObject.getTable().setHdfs_table(hdfsTable);
>     return catalogObject;
>   }{code}
> Accessing table fields without holding the table read lock might be failed by concurrent DDLs. All workloads that use this method (e.g. INVALIDATE commands) could hit this issue. We've saw event-processor failed in processing a RELOAD event that want to invalidates an HdfsTable:
> {noformat}
> E0216 16:23:44.283689   253 MetastoreEventsProcessor.java:899] Unexpected exception received while processing event
> Java exception follows:
> java.util.ConcurrentModificationException
> 	at java.util.ArrayList$Itr.checkForComodification(ArrayList.java:911)
> 	at java.util.ArrayList$Itr.next(ArrayList.java:861)
> 	at org.apache.impala.catalog.Column.toColumnNames(Column.java:148)
> 	at org.apache.impala.catalog.Table.getColumnNames(Table.java:844)
> 	at org.apache.impala.catalog.HdfsTable.toMinimalTCatalogObject(HdfsTable.java:2132)
> 	at org.apache.impala.catalog.CatalogServiceCatalog.addIncompleteTable(CatalogServiceCatalog.java:2221)
> 	at org.apache.impala.catalog.CatalogServiceCatalog.addIncompleteTable(CatalogServiceCatalog.java:2202)
> 	at org.apache.impala.catalog.CatalogServiceCatalog.invalidateTable(CatalogServiceCatalog.java:2797)
> 	at org.apache.impala.catalog.events.MetastoreEvents$ReloadEvent.processTableInvalidate(MetastoreEvents.java:2734)
> 	at org.apache.impala.catalog.events.MetastoreEvents$ReloadEvent.process(MetastoreEvents.java:2656)
> 	at org.apache.impala.catalog.events.MetastoreEvents$MetastoreEvent.processIfEnabled(MetastoreEvents.java:522)
> 	at org.apache.impala.catalog.events.MetastoreEventsProcessor.processEvents(MetastoreEventsProcessor.java:1052)
> 	at org.apache.impala.catalog.events.MetastoreEventsProcessor.processEvents(MetastoreEventsProcessor.java:881)
> 	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> 	at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
> 	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
> 	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
> 	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> 	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> 	at java.lang.Thread.run(Thread.java:750){noformat}
> I can reproduce the issue using the following test:
> {code:python}
>   @CustomClusterTestSuite.with_args(
>     catalogd_args="--enable_incremental_metadata_updates=true")
>   def test_concurrent_invalidate_metadata_with_refresh(self, unique_database):
>     # Create a wide table with some partitions
>     tbl = unique_database + ".wide_tbl"
>     create_stmt = "create table {} (".format(tbl)
>     for i in range(600):
>       create_stmt += "col{} int, ".format(i)
>     create_stmt += "col600 int) partitioned by (p int) stored as textfile"
>     self.execute_query(create_stmt)
>     for i in range(10):
>       self.execute_query("alter table {} add partition (p={})".format(tbl, i))
>     refresh_stmt = "refresh " + tbl
>     handle = self.client.execute_async(refresh_stmt)
>     for i in range(10):
>       self.execute_query("invalidate metadata " + tbl)
>       # Always keep a concurrent REFRESH statement running
>       if self.client.get_state(handle) == self.client.QUERY_STATES['FINISHED']:
>         handle = self.client.execute_async(refresh_stmt){code}
> and see a similar exception:
> {noformat}
> E0222 10:44:40.912338  6833 JniUtil.java:183] da4099ef24bb1f03:01c8f5d200000000] Error in INVALIDATE TABLE test_concurrent_invalidate_metadata_with_refresh_65c57cb0.wide_tbl issued by quanlong. Time spent: 32ms 
> I0222 10:44:40.912528  6833 jni-util.cc:302] da4099ef24bb1f03:01c8f5d200000000] java.util.ConcurrentModificationException
>         at java.util.ArrayList$Itr.checkForComodification(ArrayList.java:911)
>         at java.util.ArrayList$Itr.next(ArrayList.java:861)
>         at org.apache.impala.catalog.Column.toColumnNames(Column.java:148)
>         at org.apache.impala.catalog.Table.getColumnNames(Table.java:875)
>         at org.apache.impala.catalog.HdfsTable.toMinimalTCatalogObject(HdfsTable.java:2132)
>         at org.apache.impala.catalog.CatalogServiceCatalog.addIncompleteTable(CatalogServiceCatalog.java:2264)
>         at org.apache.impala.catalog.CatalogServiceCatalog.addIncompleteTable(CatalogServiceCatalog.java:2245)
>         at org.apache.impala.catalog.CatalogServiceCatalog.invalidateTable(CatalogServiceCatalog.java:2840)
>         at org.apache.impala.service.CatalogOpExecutor.execResetMetadataImpl(CatalogOpExecutor.java:6676)
>         at org.apache.impala.service.CatalogOpExecutor.execResetMetadata(CatalogOpExecutor.java:6612)
>         at org.apache.impala.service.JniCatalog.lambda$resetMetadata$4(JniCatalog.java:327)
>         at org.apache.impala.service.JniCatalogOp.lambda$execAndSerialize$1(JniCatalogOp.java:90)
>         at org.apache.impala.service.JniCatalogOp.execOp(JniCatalogOp.java:58)
>         at org.apache.impala.service.JniCatalogOp.execAndSerialize(JniCatalogOp.java:89)
>         at org.apache.impala.service.JniCatalogOp.execAndSerialize(JniCatalogOp.java:100)
>         at org.apache.impala.service.JniCatalog.execAndSerialize(JniCatalog.java:243)
>         at org.apache.impala.service.JniCatalog.execAndSerialize(JniCatalog.java:257)
>         at org.apache.impala.service.JniCatalog.resetMetadata(JniCatalog.java:326){noformat}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)