You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by bo...@apache.org on 2023/08/01 13:47:45 UTC
[impala] branch master updated: IMPALA-12298: Improve incremental load of Iceberg tables
This is an automated email from the ASF dual-hosted git repository.
boroknagyz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git
The following commit(s) were added to refs/heads/master by this push:
new c84221369 IMPALA-12298: Improve incremental load of Iceberg tables
c84221369 is described below
commit c8422136962b8d08e5f44d8351fb4fe7cdb675b8
Author: Zoltan Borok-Nagy <bo...@cloudera.com>
AuthorDate: Wed Jul 26 19:28:45 2023 +0200
IMPALA-12298: Improve incremental load of Iceberg tables
Currently Impala reloads the whole table with all its metadata
when a table is updated, even if no files were modified or only a
few files were added. This hurts performance for large tables,
especially when Hadoop RPC encryption is enabled. See HADOOP-14558 and
HADOOP-10768 for details.
This patch adds an optimization that loads only the newly added files
if their number does not exceed a threshold. The threshold can be set by
the backend flag 'iceberg_reload_new_files_threshold' (100 by default).
If there are more new files than the threshold, we fall back to the old
behavior of reloading all file metadata.
Testing:
* added unit tests
* manually checked the TRACE logs of IcebergFileMetadataLoader
Change-Id: Icf643798a93e74ae7b0f37ceeab0a8052fb2699d
Reviewed-on: http://gerrit.cloudera.org:8080/20271
Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
be/src/common/global-flags.cc | 5 +
be/src/util/backend-gflag-util.cc | 2 +
common/thrift/BackendGflags.thrift | 2 +
.../org/apache/impala/catalog/FeIcebergTable.java | 8 +-
.../apache/impala/catalog/FileMetadataLoader.java | 2 +-
.../impala/catalog/IcebergFileMetadataLoader.java | 147 ++++++++++++-
.../org/apache/impala/catalog/IcebergTable.java | 4 +-
.../catalog/iceberg/GroupedContentFiles.java | 5 +
.../org/apache/impala/common/FileSystemUtil.java | 17 +-
.../org/apache/impala/service/BackendConfig.java | 4 +
.../impala/catalog/FileMetadataLoaderTest.java | 227 +++++++++++++++++++--
11 files changed, 393 insertions(+), 30 deletions(-)
diff --git a/be/src/common/global-flags.cc b/be/src/common/global-flags.cc
index 09e981c96..d58b56c42 100644
--- a/be/src/common/global-flags.cc
+++ b/be/src/common/global-flags.cc
@@ -408,6 +408,11 @@ DEFINE_int64(update_catalogd_rpc_resend_interval_ms, 100, "(Advanced) Interval (
"with which the statestore resends the update catalogd RPC to a subscriber if the "
"statestore has failed to send the RPC to the subscriber.");
+DEFINE_int32(iceberg_reload_new_files_threshold, 100, "(Advanced) If during a table "
+ "refresh the number of new files are greater than this, catalogd will completely "
+ "reload all file metadata. If number of new files are less or equal to this, "
+ "catalogd will only load the metadata of the newly added files.");
+
// TGeospatialLibrary's values are mapped here as constants
static const string geo_lib_none = "NONE";
static const string geo_lib_hive_esri = "HIVE_ESRI";
diff --git a/be/src/util/backend-gflag-util.cc b/be/src/util/backend-gflag-util.cc
index 965547515..c10f472e4 100644
--- a/be/src/util/backend-gflag-util.cc
+++ b/be/src/util/backend-gflag-util.cc
@@ -107,6 +107,7 @@ DECLARE_string(geospatial_library);
DECLARE_int32(thrift_rpc_max_message_size);
DECLARE_string(file_metadata_reload_properties);
DECLARE_string(java_weigher);
+DECLARE_int32(iceberg_reload_new_files_threshold);
// HS2 SAML2.0 configuration
// Defined here because TAG_FLAG caused issues in global-flags.cc
@@ -421,6 +422,7 @@ Status PopulateThriftBackendGflags(TBackendGflags& cfg) {
cfg.__set_thrift_rpc_max_message_size(FLAGS_thrift_rpc_max_message_size);
cfg.__set_scan_range_cost_factor(FLAGS_scan_range_cost_factor);
cfg.__set_use_jamm_weigher(FLAGS_java_weigher == "jamm");
+ cfg.__set_iceberg_reload_new_files_threshold(FLAGS_iceberg_reload_new_files_threshold);
return Status::OK();
}
diff --git a/common/thrift/BackendGflags.thrift b/common/thrift/BackendGflags.thrift
index 5e4868082..6cee3ab7a 100644
--- a/common/thrift/BackendGflags.thrift
+++ b/common/thrift/BackendGflags.thrift
@@ -258,4 +258,6 @@ struct TBackendGflags {
113: required double scan_range_cost_factor
114: required bool use_jamm_weigher
+
+ 115: required i32 iceberg_reload_new_files_threshold
}
diff --git a/fe/src/main/java/org/apache/impala/catalog/FeIcebergTable.java b/fe/src/main/java/org/apache/impala/catalog/FeIcebergTable.java
index 1d579cac5..c55c9f7fa 100644
--- a/fe/src/main/java/org/apache/impala/catalog/FeIcebergTable.java
+++ b/fe/src/main/java/org/apache/impala/catalog/FeIcebergTable.java
@@ -698,18 +698,16 @@ public interface FeIcebergTable extends FeFsTable {
FileStatus fileStatus, FeIcebergTable table) throws IOException {
Reference<Long> numUnknownDiskIds = new Reference<>(0L);
- String relPath = null;
String absPath = null;
Path tableLoc = new Path(table.getIcebergTableLocation());
- URI relUri = tableLoc.toUri().relativize(fileStatus.getPath().toUri());
- if (relUri.isAbsolute() || relUri.getPath().startsWith(Path.SEPARATOR)) {
+ String relPath = FileSystemUtil.relativizePathNoThrow(
+ fileStatus.getPath(), tableLoc);
+ if (relPath == null) {
if (Utils.requiresDataFilesInTableLocation(table)) {
throw new RuntimeException(fileStatus.getPath()
+ " is outside of the Iceberg table location " + tableLoc);
}
absPath = fileStatus.getPath().toString();
- } else {
- relPath = relUri.getPath();
}
if (!FileSystemUtil.supportsStorageIds(fs)) {
diff --git a/fe/src/main/java/org/apache/impala/catalog/FileMetadataLoader.java b/fe/src/main/java/org/apache/impala/catalog/FileMetadataLoader.java
index 9a514b05d..16720e01e 100644
--- a/fe/src/main/java/org/apache/impala/catalog/FileMetadataLoader.java
+++ b/fe/src/main/java/org/apache/impala/catalog/FileMetadataLoader.java
@@ -67,7 +67,7 @@ public class FileMetadataLoader {
protected boolean forceRefreshLocations = false;
- private List<FileDescriptor> loadedFds_;
+ protected List<FileDescriptor> loadedFds_;
private List<FileDescriptor> loadedInsertDeltaFds_;
private List<FileDescriptor> loadedDeleteDeltaFds_;
protected LoadStats loadStats_;
diff --git a/fe/src/main/java/org/apache/impala/catalog/IcebergFileMetadataLoader.java b/fe/src/main/java/org/apache/impala/catalog/IcebergFileMetadataLoader.java
index c365e9cac..e0540baa7 100644
--- a/fe/src/main/java/org/apache/impala/catalog/IcebergFileMetadataLoader.java
+++ b/fe/src/main/java/org/apache/impala/catalog/IcebergFileMetadataLoader.java
@@ -17,13 +17,17 @@
package org.apache.impala.catalog;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import java.io.IOException;
-import java.net.URI;
+import java.util.ArrayList;
import java.util.List;
import java.util.Map;
+
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -36,13 +40,32 @@ import org.apache.impala.catalog.HdfsPartition.FileDescriptor;
import org.apache.impala.catalog.iceberg.GroupedContentFiles;
import org.apache.impala.common.FileSystemUtil;
import org.apache.impala.common.Reference;
+import org.apache.impala.service.BackendConfig;
import org.apache.impala.thrift.TNetworkAddress;
import org.apache.impala.util.ListMap;
+import org.apache.impala.util.ThreadNameAnnotator;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
/**
* Utility for loading the content files metadata of the Iceberg tables.
*/
public class IcebergFileMetadataLoader extends FileMetadataLoader {
+ private final static Logger LOG = LoggerFactory.getLogger(
+ IcebergFileMetadataLoader.class);
+
+ private static final Configuration CONF = new Configuration();
+
+ // Default value of 'newFilesThreshold_' if the given parameter or startup flag have
+ // invalid value.
+ private final int NEW_FILES_THRESHOLD_DEFAULT = 100;
+
+ // If there are more new files than 'newFilesThreshold_', we should fall back
+ // to regular file metadata loading.
+ private final int newFilesThreshold_;
+
private final GroupedContentFiles icebergFiles_;
private final boolean canDataBeOutsideOfTableLocation_;
@@ -50,10 +73,82 @@ public class IcebergFileMetadataLoader extends FileMetadataLoader {
List<FileDescriptor> oldFds, ListMap<TNetworkAddress> hostIndex,
ValidTxnList validTxnList, ValidWriteIdList writeIds,
GroupedContentFiles icebergFiles, boolean canDataBeOutsideOfTableLocation) {
+ this(partDir, recursive, oldFds, hostIndex, validTxnList, writeIds,
+ icebergFiles, canDataBeOutsideOfTableLocation,
+ BackendConfig.INSTANCE.icebergReloadNewFilesThreshold());
+ }
+
+ public IcebergFileMetadataLoader(Path partDir, boolean recursive,
+ List<FileDescriptor> oldFds, ListMap<TNetworkAddress> hostIndex,
+ ValidTxnList validTxnList, ValidWriteIdList writeIds,
+ GroupedContentFiles icebergFiles, boolean canDataBeOutsideOfTableLocation,
+ int newFilesThresholdParam) {
super(partDir, recursive, oldFds, hostIndex, validTxnList, writeIds,
HdfsFileFormat.ICEBERG);
icebergFiles_ = icebergFiles;
canDataBeOutsideOfTableLocation_ = canDataBeOutsideOfTableLocation;
+ if (newFilesThresholdParam >= 0) {
+ newFilesThreshold_ = newFilesThresholdParam;
+ } else {
+ newFilesThreshold_ = NEW_FILES_THRESHOLD_DEFAULT;
+ LOG.warn("Ignoring invalid new files threshold: {} " +
+ "using value: {}", newFilesThresholdParam, newFilesThreshold_);
+ }
+ }
+
+ @Override
+ public void load() throws CatalogException, IOException {
+ if (!shouldReuseOldFds()) {
+ super.load();
+ } else {
+ reloadWithOldFds();
+ }
+ }
+
+ /**
+ * Iceberg tables are a collection of immutable, uniquely identifiable data files,
+ * which means we can safely reuse the old FDs.
+ */
+ private void reloadWithOldFds() throws IOException {
+ loadStats_ = new LoadStats(partDir_);
+ FileSystem fs = partDir_.getFileSystem(CONF);
+
+ String msg = String.format("Refreshing Iceberg file metadata from path %s " +
+ "while reusing old file descriptors", partDir_);
+ LOG.trace(msg);
+ try (ThreadNameAnnotator tna = new ThreadNameAnnotator(msg)) {
+ loadedFds_ = new ArrayList<>();
+ Reference<Long> numUnknownDiskIds = new Reference<>(0L);
+ for (ContentFile<?> contentFile : icebergFiles_.getAllContentFiles()) {
+ FileDescriptor fd = getOldFd(contentFile);
+ if (fd == null) {
+ fd = getFileDescriptor(fs, contentFile, numUnknownDiskIds);
+ } else {
+ ++loadStats_.skippedFiles;
+ }
+ loadedFds_.add(Preconditions.checkNotNull(fd));
+ }
+ Preconditions.checkState(loadStats_.loadedFiles <= newFilesThreshold_);
+ loadStats_.unknownDiskIds += numUnknownDiskIds.getRef();
+ if (LOG.isTraceEnabled()) {
+ LOG.trace(loadStats_.debugString());
+ }
+ }
+ }
+
+ private FileDescriptor getFileDescriptor(FileSystem fs, ContentFile<?> contentFile,
+ Reference<Long> numUnknownDiskIds) throws IOException {
+ Path fileLoc = FileSystemUtil.createFullyQualifiedPath(
+ new Path(contentFile.path().toString()));
+ FileStatus stat;
+ if (FileSystemUtil.supportsStorageIds(fs)) {
+ stat = Utils.createLocatedFileStatus(fileLoc, fs);
+ } else {
+ // For OSS service (e.g. S3A, COS, OSS, etc), we create FileStatus ourselves.
+ stat = Utils.createFileStatus(contentFile, fileLoc);
+ }
+ return getFileDescriptor(fs, FileSystemUtil.supportsStorageIds(fs),
+ numUnknownDiskIds, stat);
}
/**
@@ -63,18 +158,15 @@ public class IcebergFileMetadataLoader extends FileMetadataLoader {
@Override
protected FileDescriptor getFileDescriptor(FileSystem fs, boolean listWithLocations,
Reference<Long> numUnknownDiskIds, FileStatus fileStatus) throws IOException {
- String relPath = null;
String absPath = null;
- URI relUri = partDir_.toUri().relativize(fileStatus.getPath().toUri());
- if (relUri.isAbsolute() || relUri.getPath().startsWith(Path.SEPARATOR)) {
+ String relPath = FileSystemUtil.relativizePathNoThrow(fileStatus.getPath(), partDir_);
+ if (relPath == null) {
if (canDataBeOutsideOfTableLocation_) {
absPath = fileStatus.getPath().toString();
} else {
throw new IOException(String.format("Failed to load Iceberg datafile %s, because "
+ "it's outside of the table location", fileStatus.getPath().toUri()));
}
- } else {
- relPath = relUri.getPath();
}
String path = Strings.isNullOrEmpty(relPath) ? absPath : relPath;
@@ -136,4 +228,47 @@ public class IcebergFileMetadataLoader extends FileMetadataLoader {
}
return stats;
}
+
+ @VisibleForTesting
+ boolean shouldReuseOldFds() throws IOException {
+ if (oldFdsByPath_ == null || oldFdsByPath_.isEmpty()) return false;
+ if (forceRefreshLocations) return false;
+
+ int oldFdsSize = oldFdsByPath_.size();
+ int iceContentFilesSize = icebergFiles_.size();
+
+ if (iceContentFilesSize - oldFdsSize > newFilesThreshold_) {
+ LOG.trace("There are at least {} new files under path {}.",
+ iceContentFilesSize - oldFdsSize, partDir_);
+ return false;
+ }
+
+ int newFiles = 0;
+ for (ContentFile<?> contentFile : icebergFiles_.getAllContentFiles()) {
+ if (getOldFd(contentFile) == null) {
+ ++newFiles;
+ if (newFiles > newFilesThreshold_) {
+ LOG.trace("There are at least {} new files under path {}.", newFiles, partDir_);
+ return false;
+ }
+ }
+ }
+ LOG.trace("There are only {} new files under path {}.", newFiles, partDir_);
+ return true;
+ }
+
+ FileDescriptor getOldFd(ContentFile<?> contentFile) throws IOException {
+ Path contentFilePath = FileSystemUtil.createFullyQualifiedPath(
+ new Path(contentFile.path().toString()));
+ String lookupPath = FileSystemUtil.relativizePathNoThrow(contentFilePath, partDir_);
+ if (lookupPath == null) {
+ if (canDataBeOutsideOfTableLocation_) {
+ lookupPath = contentFilePath.toString();
+ } else {
+ throw new IOException(String.format("Failed to load Iceberg datafile %s, because "
+ + "it's outside of the table location", contentFilePath));
+ }
+ }
+ return oldFdsByPath_.get(lookupPath);
+ }
}
diff --git a/fe/src/main/java/org/apache/impala/catalog/IcebergTable.java b/fe/src/main/java/org/apache/impala/catalog/IcebergTable.java
index 91eefeee9..4c83acf02 100644
--- a/fe/src/main/java/org/apache/impala/catalog/IcebergTable.java
+++ b/fe/src/main/java/org/apache/impala/catalog/IcebergTable.java
@@ -334,7 +334,7 @@ public class IcebergTable extends Table implements FeIcebergTable {
* propagate alterations made to the Iceberg table to HMS.
*/
@Override
- public void load(boolean dummy /* not used */, IMetaStoreClient msClient,
+ public void load(boolean reuseMetadata, IMetaStoreClient msClient,
org.apache.hadoop.hive.metastore.api.Table msTbl, String reason)
throws TableLoadingException {
final Timer.Context context =
@@ -362,7 +362,7 @@ public class IcebergTable extends Table implements FeIcebergTable {
hdfsTable_.setIcebergFiles(icebergFiles);
hdfsTable_.setCanDataBeOutsideOfTableLocation(
!Utils.requiresDataFilesInTableLocation(this));
- hdfsTable_.load(msClient, msTable_, reason);
+ hdfsTable_.load(reuseMetadata, msClient, msTable_, reason);
fileStore_ = Utils.loadAllPartition(this, icebergFiles);
partitionStats_ = Utils.loadPartitionStats(this, icebergFiles);
setIcebergTableStats();
diff --git a/fe/src/main/java/org/apache/impala/catalog/iceberg/GroupedContentFiles.java b/fe/src/main/java/org/apache/impala/catalog/iceberg/GroupedContentFiles.java
index da3aac4da..a01f43c10 100644
--- a/fe/src/main/java/org/apache/impala/catalog/iceberg/GroupedContentFiles.java
+++ b/fe/src/main/java/org/apache/impala/catalog/iceberg/GroupedContentFiles.java
@@ -57,6 +57,11 @@ public class GroupedContentFiles {
return Iterables.concat(dataFilesWithoutDeletes, dataFilesWithDeletes, deleteFiles);
}
+ public int size() {
+ return dataFilesWithDeletes.size() + dataFilesWithoutDeletes.size() +
+ deleteFiles.size();
+ }
+
public boolean isEmpty() {
return Iterables.isEmpty(getAllContentFiles());
}
diff --git a/fe/src/main/java/org/apache/impala/common/FileSystemUtil.java b/fe/src/main/java/org/apache/impala/common/FileSystemUtil.java
index bd88f4a98..519449237 100644
--- a/fe/src/main/java/org/apache/impala/common/FileSystemUtil.java
+++ b/fe/src/main/java/org/apache/impala/common/FileSystemUtil.java
@@ -965,13 +965,28 @@ public class FileSystemUtil {
*/
public static String relativizePath(Path path, Path startPath) {
URI relUri = startPath.toUri().relativize(path.toUri());
- if (relUri.isAbsolute() || relUri.getPath().startsWith("/")) {
+ if (!isRelative(relUri)) {
throw new RuntimeException("FileSystem returned an unexpected path " +
path + " for a file within " + startPath);
}
return relUri.getPath();
}
+ /**
+ * Return the path of 'path' relative to the startPath. This may
+ * differ from simply the file name in the case of recursive listings.
+ * Instead of throwing an exception, it returns null when cannot relativize 'path'.
+ */
+ public static String relativizePathNoThrow(Path path, Path startPath) {
+ URI relUri = startPath.toUri().relativize(path.toUri());
+ if (isRelative(relUri)) return relUri.getPath();
+ return null;
+ }
+
+ private static boolean isRelative(URI uri) {
+ return !(uri.isAbsolute() || uri.getPath().startsWith(Path.SEPARATOR));
+ }
+
/**
* Util method to check if the given file status relative to its parent is contained
* in a ignored directory. This is useful to ignore the files which seemingly are valid
diff --git a/fe/src/main/java/org/apache/impala/service/BackendConfig.java b/fe/src/main/java/org/apache/impala/service/BackendConfig.java
index ac7f62715..29352d908 100644
--- a/fe/src/main/java/org/apache/impala/service/BackendConfig.java
+++ b/fe/src/main/java/org/apache/impala/service/BackendConfig.java
@@ -415,4 +415,8 @@ public class BackendConfig {
public boolean useJammWeigher() {
return backendCfg_.use_jamm_weigher;
}
+
+ public int icebergReloadNewFilesThreshold() {
+ return backendCfg_.iceberg_reload_new_files_threshold;
+ }
}
diff --git a/fe/src/test/java/org/apache/impala/catalog/FileMetadataLoaderTest.java b/fe/src/test/java/org/apache/impala/catalog/FileMetadataLoaderTest.java
index 4f278850c..d84e04ff2 100644
--- a/fe/src/test/java/org/apache/impala/catalog/FileMetadataLoaderTest.java
+++ b/fe/src/test/java/org/apache/impala/catalog/FileMetadataLoaderTest.java
@@ -22,6 +22,7 @@ import static org.junit.Assert.*;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
+import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
@@ -31,8 +32,11 @@ import org.apache.hadoop.hive.common.ValidReadTxnList;
import org.apache.hadoop.hive.common.ValidWriteIdList;
import org.apache.hadoop.hive.metastore.api.MetaException;
import org.apache.impala.catalog.HdfsPartition.FileDescriptor;
+import org.apache.impala.catalog.iceberg.GroupedContentFiles;
import org.apache.impala.compat.MetastoreShim;
+import org.apache.impala.testutil.CatalogServiceTestCatalog;
import org.apache.impala.thrift.TNetworkAddress;
+import org.apache.impala.util.IcebergUtil;
import org.apache.impala.util.ListMap;
import org.junit.Test;
@@ -113,15 +117,15 @@ public class FileMetadataLoaderTest {
@Test
public void testIcebergLoading() throws IOException, CatalogException {
- ListMap<TNetworkAddress> hostIndex = new ListMap<>();
- Path table1Path = new Path(
- "hdfs://localhost:20500/test-warehouse/iceberg_test/iceberg_partitioned");
- FileMetadataLoader fml1 = new FileMetadataLoader(table1Path, /* recursive=*/true,
- /* oldFds = */ Collections.emptyList(), hostIndex, null, null,
- HdfsFileFormat.ICEBERG);
+ CatalogServiceCatalog catalog = CatalogServiceTestCatalog.create();
+ IcebergFileMetadataLoader fml1 = getLoaderForIcebergTable(catalog,
+ "functional_parquet", "iceberg_partitioned",
+ /* oldFds = */ Collections.emptyList(),
+ /* canDataBeOutsideOfTableLocation = */ false);
fml1.load();
- assertEquals(25, fml1.getStats().loadedFiles);
- assertEquals(25, fml1.getLoadedFds().size());
+ assertEquals(20, fml1.getStats().loadedFiles);
+ assertEquals(0, fml1.getStats().skippedFiles);
+ assertEquals(20, fml1.getLoadedFds().size());
ArrayList<String> relPaths = new ArrayList<>(
Collections2.transform(fml1.getLoadedFds(), FileDescriptor::getRelativePath));
@@ -129,14 +133,14 @@ public class FileMetadataLoaderTest {
assertEquals("data/event_time_hour=2020-01-01-08/action=view/" +
"00001-1-b975a171-0911-47c2-90c8-300f23c28772-00000.parquet", relPaths.get(0));
- Path table2Path = new Path(
- "hdfs://localhost:20500/test-warehouse/iceberg_test/iceberg_non_partitioned");
- FileMetadataLoader fml2 = new FileMetadataLoader(table2Path, /* recursive=*/true,
- /* oldFds = */ Collections.emptyList(), hostIndex, null, null,
- HdfsFileFormat.ICEBERG);
+ IcebergFileMetadataLoader fml2 = getLoaderForIcebergTable(catalog,
+ "functional_parquet", "iceberg_non_partitioned",
+ /* oldFds = */ Collections.emptyList(),
+ /* canDataBeOutsideOfTableLocation = */ false);
fml2.load();
- assertEquals(25, fml2.getStats().loadedFiles);
- assertEquals(25, fml2.getLoadedFds().size());
+ assertEquals(20, fml2.getStats().loadedFiles);
+ assertEquals(0, fml2.getStats().skippedFiles);
+ assertEquals(20, fml2.getLoadedFds().size());
relPaths = new ArrayList<>(
Collections2.transform(fml2.getLoadedFds(), FileDescriptor::getRelativePath));
@@ -145,6 +149,199 @@ public class FileMetadataLoaderTest {
relPaths.get(0));
}
+ @Test
+ public void testIcebergRefresh() throws IOException, CatalogException {
+ CatalogServiceCatalog catalog = CatalogServiceTestCatalog.create();
+ IcebergFileMetadataLoader fml1 = getLoaderForIcebergTable(catalog,
+ "functional_parquet", "iceberg_partitioned",
+ /* oldFds = */ Collections.emptyList(),
+ /* canDataBeOutsideOfTableLocation = */ false);
+ fml1.load();
+
+ IcebergFileMetadataLoader fml1Refresh = getLoaderForIcebergTable(catalog,
+ "functional_parquet", "iceberg_partitioned",
+ /* oldFds = */ fml1.getLoadedFds(),
+ /* canDataBeOutsideOfTableLocation = */ false);
+ assertTrue(fml1Refresh.shouldReuseOldFds());
+ fml1Refresh.load();
+ assertEquals(0, fml1Refresh.getStats().loadedFiles);
+ assertEquals(20, fml1Refresh.getStats().skippedFiles);
+ assertEquals(20, fml1Refresh.getLoadedFds().size());
+
+ List<String> relPaths = new ArrayList<>(
+ Collections2.transform(fml1Refresh.getLoadedFds(),
+ FileDescriptor::getRelativePath));
+ Collections.sort(relPaths);
+ assertEquals("data/event_time_hour=2020-01-01-08/action=view/" +
+ "00001-1-b975a171-0911-47c2-90c8-300f23c28772-00000.parquet", relPaths.get(0));
+
+ IcebergFileMetadataLoader fml2 = getLoaderForIcebergTable(catalog,
+ "functional_parquet", "iceberg_non_partitioned",
+ /* oldFds = */ Collections.emptyList(),
+ /* canDataBeOutsideOfTableLocation = */ false);
+ fml2.load();
+
+ IcebergFileMetadataLoader fml2Refresh = getLoaderForIcebergTable(catalog,
+ "functional_parquet", "iceberg_non_partitioned",
+ /* oldFds = */ fml2.getLoadedFds(),
+ /* canDataBeOutsideOfTableLocation = */ false);
+ assertTrue(fml2Refresh.shouldReuseOldFds());
+ fml2Refresh.load();
+ assertEquals(0, fml2Refresh.getStats().loadedFiles);
+ assertEquals(20, fml2Refresh.getStats().skippedFiles);
+ assertEquals(20, fml2Refresh.getLoadedFds().size());
+
+ relPaths = new ArrayList<>(
+ Collections2.transform(fml2Refresh.getLoadedFds(),
+ FileDescriptor::getRelativePath));
+ Collections.sort(relPaths);
+ assertEquals("data/00001-1-5dbd44ad-18bc-40f2-9dd6-aeb2cc23457c-00000.parquet",
+ relPaths.get(0));
+ }
+
+ @Test
+ public void testIcebergPartialRefresh() throws IOException, CatalogException {
+ CatalogServiceCatalog catalog = CatalogServiceTestCatalog.create();
+ IcebergFileMetadataLoader fml1 = getLoaderForIcebergTable(catalog,
+ "functional_parquet", "iceberg_partitioned",
+ /* oldFds = */ Collections.emptyList(),
+ /* canDataBeOutsideOfTableLocation = */ false);
+ fml1.load();
+
+ IcebergFileMetadataLoader fml1Refresh = getLoaderForIcebergTable(catalog,
+ "functional_parquet", "iceberg_partitioned",
+ /* oldFds = */ fml1.getLoadedFds().subList(0, 10),
+ /* canDataBeOutsideOfTableLocation = */ false);
+ assertTrue(fml1Refresh.shouldReuseOldFds());
+ fml1Refresh.load();
+ assertEquals(10, fml1Refresh.getStats().loadedFiles);
+ assertEquals(10, fml1Refresh.getStats().skippedFiles);
+ assertEquals(20, fml1Refresh.getLoadedFds().size());
+
+ IcebergFileMetadataLoader fml2 = getLoaderForIcebergTable(catalog,
+ "functional_parquet", "iceberg_non_partitioned",
+ /* oldFds = */ Collections.emptyList(),
+ /* canDataBeOutsideOfTableLocation = */ false);
+ fml2.load();
+
+ IcebergFileMetadataLoader fml2Refresh = getLoaderForIcebergTable(catalog,
+ "functional_parquet", "iceberg_non_partitioned",
+ /* oldFds = */ fml2.getLoadedFds().subList(0, 10),
+ /* canDataBeOutsideOfTableLocation = */ false);
+ assertTrue(fml2Refresh.shouldReuseOldFds());
+ fml2Refresh.load();
+ assertEquals(10, fml2Refresh.getStats().loadedFiles);
+ assertEquals(10, fml2Refresh.getStats().skippedFiles);
+ assertEquals(20, fml2Refresh.getLoadedFds().size());
+ }
+
+ @Test
+ public void testIcebergNewFilesThreshold() throws IOException, CatalogException {
+ CatalogServiceCatalog catalog = CatalogServiceTestCatalog.create();
+ IcebergFileMetadataLoader fml1 = getLoaderForIcebergTable(catalog,
+ "functional_parquet", "iceberg_partitioned",
+ /* oldFds = */ Collections.emptyList(),
+ /* canDataBeOutsideOfTableLocation = */ false);
+ fml1.load();
+
+ IcebergFileMetadataLoader fml1Refresh = getLoaderForIcebergTable(catalog,
+ "functional_parquet", "iceberg_partitioned",
+ /* oldFds = */ fml1.getLoadedFds().subList(0, 10),
+ /* canDataBeOutsideOfTableLocation = */ false);
+ assertTrue(fml1Refresh.shouldReuseOldFds());
+ fml1Refresh.setForceRefreshBlockLocations(true);
+ assertFalse(fml1Refresh.shouldReuseOldFds());
+ fml1Refresh.setForceRefreshBlockLocations(false);
+ assertTrue(fml1Refresh.shouldReuseOldFds());
+
+ IcebergFileMetadataLoader fml1Refresh10 = getLoaderForIcebergTable(catalog,
+ "functional_parquet", "iceberg_partitioned",
+ /* oldFds = */ fml1.getLoadedFds().subList(0, 10),
+ /* canDataBeOutsideOfTableLocation = */ false, 10);
+ assertTrue(fml1Refresh10.shouldReuseOldFds());
+ IcebergFileMetadataLoader fml1Refresh9 = getLoaderForIcebergTable(catalog,
+ "functional_parquet", "iceberg_partitioned",
+ /* oldFds = */ fml1.getLoadedFds().subList(0, 10),
+ /* canDataBeOutsideOfTableLocation = */ false, 9);
+ assertFalse(fml1Refresh9.shouldReuseOldFds());
+
+
+ IcebergFileMetadataLoader fml2 = getLoaderForIcebergTable(catalog,
+ "functional_parquet", "iceberg_non_partitioned",
+ /* oldFds = */ Collections.emptyList(),
+ /* canDataBeOutsideOfTableLocation = */ false);
+ fml2.load();
+
+ IcebergFileMetadataLoader fml2Refresh = getLoaderForIcebergTable(catalog,
+ "functional_parquet", "iceberg_non_partitioned",
+ /* oldFds = */ fml2.getLoadedFds().subList(0, 10),
+ /* canDataBeOutsideOfTableLocation = */ false);
+ IcebergFileMetadataLoader fml2Refresh10 = getLoaderForIcebergTable(catalog,
+ "functional_parquet", "iceberg_non_partitioned",
+ /* oldFds = */ fml2.getLoadedFds().subList(0, 10),
+ /* canDataBeOutsideOfTableLocation = */ false, 10);
+ assertTrue(fml2Refresh10.shouldReuseOldFds());
+ IcebergFileMetadataLoader fml2Refresh9 = getLoaderForIcebergTable(catalog,
+ "functional_parquet", "iceberg_non_partitioned",
+ /* oldFds = */ fml2.getLoadedFds().subList(0, 10),
+ /* canDataBeOutsideOfTableLocation = */ false, 9);
+ assertFalse(fml2Refresh9.shouldReuseOldFds());
+ }
+
+ @Test
+ public void testIcebergMultipleStorageLocations() throws IOException, CatalogException {
+ CatalogServiceCatalog catalog = CatalogServiceTestCatalog.create();
+ IcebergFileMetadataLoader fml1 = getLoaderForIcebergTable(catalog,
+ "functional_parquet", "iceberg_multiple_storage_locations",
+ /* oldFds = */ Collections.emptyList(),
+ /* canDataBeOutsideOfTableLocation = */ true);
+ fml1.load();
+
+ IcebergFileMetadataLoader fml1Refresh1 = getLoaderForIcebergTable(catalog,
+ "functional_parquet", "iceberg_multiple_storage_locations",
+ /* oldFds = */ fml1.getLoadedFds().subList(0, 1),
+ /* canDataBeOutsideOfTableLocation = */ true);
+ assertTrue(fml1Refresh1.shouldReuseOldFds());
+ fml1Refresh1.load();
+ assertEquals(5, fml1Refresh1.getStats().loadedFiles);
+ assertEquals(1, fml1Refresh1.getStats().skippedFiles);
+ assertEquals(6, fml1Refresh1.getLoadedFds().size());
+
+ IcebergFileMetadataLoader fml1Refresh5 = getLoaderForIcebergTable(catalog,
+ "functional_parquet", "iceberg_multiple_storage_locations",
+ /* oldFds = */ fml1.getLoadedFds().subList(0, 5),
+ /* canDataBeOutsideOfTableLocation = */ true);
+ assertTrue(fml1Refresh5.shouldReuseOldFds());
+ fml1Refresh5.load();
+ assertEquals(1, fml1Refresh5.getStats().loadedFiles);
+ assertEquals(5, fml1Refresh5.getStats().skippedFiles);
+ assertEquals(6, fml1Refresh5.getLoadedFds().size());
+ }
+
+ private IcebergFileMetadataLoader getLoaderForIcebergTable(
+ CatalogServiceCatalog catalog, String dbName, String tblName,
+ List<FileDescriptor> oldFds, boolean canDataBeOutsideOfTableLocation)
+ throws CatalogException {
+ return getLoaderForIcebergTable(catalog, dbName, tblName, oldFds,
+ canDataBeOutsideOfTableLocation, -1);
+ }
+
+ private IcebergFileMetadataLoader getLoaderForIcebergTable(
+ CatalogServiceCatalog catalog, String dbName, String tblName,
+ List<FileDescriptor> oldFds, boolean canDataBeOutsideOfTableLocation,
+ int newFilesThreshold)
+ throws CatalogException {
+ ListMap<TNetworkAddress> hostIndex = new ListMap<>();
+ FeIcebergTable iceT = (FeIcebergTable)catalog.getOrLoadTable(
+ dbName, tblName, "test", null);
+ Path location = new Path(iceT.getLocation());
+ GroupedContentFiles iceFiles = IcebergUtil.getIcebergFiles(iceT,
+ /*predicates=*/Collections.emptyList(), /*timeTravelSpec=*/null);
+ return new IcebergFileMetadataLoader(location, /* recursive=*/true,
+ oldFds, hostIndex, null, null, iceFiles, canDataBeOutsideOfTableLocation,
+ newFilesThreshold);
+ }
+
private FileMetadataLoader getLoaderForAcidTable(
String validWriteIdString, String path, HdfsFileFormat format)
throws IOException, CatalogException {