You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by st...@apache.org on 2022/11/15 01:16:55 UTC
[impala] 01/02: IMPALA-11721: Impala query keep being retried over frequently updated iceberg table
This is an automated email from the ASF dual-hosted git repository.
stigahuang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git
commit af46585801387a41153d4a957336d6f0101d09f7
Author: Zoltan Borok-Nagy <bo...@cloudera.com>
AuthorDate: Fri Nov 11 16:11:14 2022 +0100
IMPALA-11721: Impala query keep being retried over frequently updated iceberg table
Iceberg table loading can fail in local catalog mode if the table gets
updated frequently. This is what happens during table loading in local
catalog mode: every query starts with its own empty local catalog.
Table metadata is fetched in multiple requests via a MetaProvider which
is always a CatalogdMetaProvider. CatalogdMetaProvider caches requests
and the cache key also includes the table's catalog version.
The Iceberg table is loaded by the following requests:
1 CatalogdMetaProvider.loadTable()
2 CatalogdMetaProvider.loadIcebergTable()
3 CatalogdMetaProvider.loadIcebergApiTable() # This actually directly
# loads the Iceberg table
# via Iceberg API
# (no CatalogD involved)
4 CatalogdMetaProvider.loadTableColumnStatistics()
5 CatalogdMetaProvider.loadPartitionList()
6 CatalogdMetaProvider.loadPartitionsByRefs()
Steps 1-4 happen during table loading, steps 5-6 happen during
planning. We cannot really reorder these invocations, but since
CatalogdMetaProvider caches these, only the very first invocations need
to reach out to CatalogD and check the table's catalog version.
Subsequent invocations, i.e. subsequent queries that use the Iceberg
table, can use the cached metadata, and there is no need to check the
catalog version of the cached metadata: the cache key also includes
the catalog version, so the cached entries correspond to that version.
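The problem is that step 3 does not go through CatalogD. If the table is
updated frequently, CatalogD might refresh the Iceberg table while
loadIcebergApiTable() is running. The subsequent load*() calls via the
metaprovider (steps 4-6) then fail with InconsistentMetadataFetchException.
The Frontend retries the query, but under constant updates the retries
can keep failing the same way.
This patch resolves the issue by pre-warming the metaprovider's cache
before issuing loadIcebergApiTable() so the CatalogdMetaProvider.load*()
operations can be served from cache.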
So what happens when the metaprovider's cache gets invalidated due to
concurrent updates to the table and we are still processing the query?
No problem, only the top-level TableCacheKey gets invalidated. The
cache will still be able to answer the fine-grained load requests that
are keyed by the now outdated catalog version. E.g. ColStatsCacheKey
hashes db name, table name, catalog version, and column name as a key
in the cache. Therefore the current query processing can be finished
using a consistent state of the metadata. Subsequent queries will use
a newer version of the table.
Testing:
* modified test_insert_stress.py so it won't tolerate inconsistent
metadata fetch exceptions (Frontend already tolerates them
to some degree)
Change-Id: Iac28224b2b6d67725eeb17f3e9d813ba622edb43
Reviewed-on: http://gerrit.cloudera.org:8080/19234
Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
.../impala/catalog/local/LocalIcebergTable.java | 42 ++++++++++++++++++----
tests/stress/test_insert_stress.py | 26 ++++----------
2 files changed, 42 insertions(+), 26 deletions(-)
diff --git a/fe/src/main/java/org/apache/impala/catalog/local/LocalIcebergTable.java b/fe/src/main/java/org/apache/impala/catalog/local/LocalIcebergTable.java
index 1813ce00e..219da5e85 100644
--- a/fe/src/main/java/org/apache/impala/catalog/local/LocalIcebergTable.java
+++ b/fe/src/main/java/org/apache/impala/catalog/local/LocalIcebergTable.java
@@ -17,6 +17,7 @@
package org.apache.impala.catalog.local;
+import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
@@ -33,8 +34,8 @@ import org.apache.impala.catalog.FeFsPartition;
import org.apache.impala.catalog.FeFsTable;
import org.apache.impala.catalog.FeIcebergTable;
import org.apache.impala.catalog.IcebergContentFileStore;
-import org.apache.impala.catalog.HdfsPartition.FileDescriptor;
import org.apache.impala.catalog.TableLoadingException;
+import org.apache.impala.catalog.local.MetaProvider.TableMetaRef;
import org.apache.impala.thrift.TCompressionCodec;
import org.apache.impala.thrift.THdfsPartition;
import org.apache.impala.thrift.THdfsTable;
@@ -87,6 +88,8 @@ public class LocalIcebergTable extends LocalTable implements FeIcebergTable {
TableParams tableParams = new TableParams(msTable);
TPartialTableInfo tableInfo = db.getCatalog().getMetaProvider()
.loadIcebergTable(ref);
+ LocalFsTable fsTable = LocalFsTable.load(db, msTable, ref);
+ warmupMetaProviderCache(db, msTable, ref, fsTable);
org.apache.iceberg.Table icebergApiTable = db.getCatalog().getMetaProvider()
.loadIcebergApiTable(ref, tableParams, msTable);
List<Column> iceColumns = IcebergSchemaConverter.convertToImpalaSchema(
@@ -96,8 +99,11 @@ public class LocalIcebergTable extends LocalTable implements FeIcebergTable {
/*numClusteringCols=*/ 0,
db.getName() + "." + msTable.getTableName(),
/*isFullAcidSchema=*/false);
- return new LocalIcebergTable(db, msTable, ref, colMap, tableInfo, tableParams,
- icebergApiTable);
+ return new LocalIcebergTable(db, msTable, ref, fsTable, colMap, tableInfo,
+ tableParams, icebergApiTable);
+ } catch (InconsistentMetadataFetchException e) {
+ // Just rethrow this so the query can be retried by the Frontend.
+ throw e;
} catch (Exception e) {
String fullTableName = msTable.getDbName() + "." + msTable.getTableName();
throw new TableLoadingException(
@@ -105,13 +111,37 @@ public class LocalIcebergTable extends LocalTable implements FeIcebergTable {
}
}
+ /**
+ * Eagerly warmup metaprovider cache before calling loadIcebergApiTable(). So
+ * later they can be served from cache when really needed. Otherwise, if there are
+ * frequent updates to the table, CatalogD might refresh the Iceberg table during
+ * loadIcebergApiTable().
+ * If that happens, subsequent load*() calls via the metaprovider would
+ * fail due to InconsistentMetadataFetchException.
+ */
+ private static void warmupMetaProviderCache(LocalDb db, Table msTable, TableMetaRef ref,
+ LocalFsTable fsTable) throws Exception {
+ db.getCatalog().getMetaProvider().loadTableColumnStatistics(ref,
+ getHmsColumnNames(msTable));
+ FeCatalogUtils.loadAllPartitions(fsTable);
+ }
+
+ private static List<String> getHmsColumnNames(Table msTable) {
+ List<String> ret = new ArrayList<>();
+ for (FieldSchema fs : msTable.getSd().getCols()) {
+ ret.add(fs.getName());
+ }
+ return ret;
+ }
+
private LocalIcebergTable(LocalDb db, Table msTable, MetaProvider.TableMetaRef ref,
- ColumnMap cmap, TPartialTableInfo tableInfo, TableParams tableParams,
- org.apache.iceberg.Table icebergApiTable) throws TableLoadingException {
+ LocalFsTable fsTable, ColumnMap cmap, TPartialTableInfo tableInfo,
+ TableParams tableParams, org.apache.iceberg.Table icebergApiTable)
+ throws TableLoadingException {
super(db, msTable, ref, cmap);
Preconditions.checkNotNull(tableInfo);
- localFsTable_ = LocalFsTable.load(db, msTable, ref);
+ localFsTable_ = fsTable;
tableParams_ = tableParams;
fileStore_ = IcebergContentFileStore.fromThrift(
tableInfo.getIceberg_table().getContent_files(),
diff --git a/tests/stress/test_insert_stress.py b/tests/stress/test_insert_stress.py
index 5caf3db7c..fe43a38c9 100644
--- a/tests/stress/test_insert_stress.py
+++ b/tests/stress/test_insert_stress.py
@@ -48,16 +48,9 @@ class TestInsertStress(ImpalaTestSuite):
try:
insert_cnt = 0
while insert_cnt < num_inserts:
- try:
- impalad_client.execute("insert into table %s values (%i, %i)" % (
- tbl_name, wid, insert_cnt))
- insert_cnt += 1
- except Exception as e:
- # It's possible that the Iceberg table is concurrently updated in CatalogD
- # during data load in local catalog.
- if "InconsistentMetadataFetchException" in str(e):
- continue
- raise e
+ impalad_client.execute("insert into table %s values (%i, %i)" % (
+ tbl_name, wid, insert_cnt))
+ insert_cnt += 1
finally:
with counter.get_lock():
counter.value += 1
@@ -79,16 +72,9 @@ class TestInsertStress(ImpalaTestSuite):
impalad_client = ImpalaTestSuite.create_client_for_nth_impalad(target_impalad)
try:
while counter.value != writers:
- try:
- result = impalad_client.execute("select * from %s" % tbl_name)
- verify_result_set(result)
- time.sleep(random.random())
- except Exception as e:
- # It's possible that the Iceberg table is concurrently updated in CatalogD
- # during data load in local catalog.
- if "InconsistentMetadataFetchException" in str(e):
- continue
- raise e
+ result = impalad_client.execute("select * from %s" % tbl_name)
+ verify_result_set(result)
+ time.sleep(random.random())
finally:
impalad_client.close()