You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by zi...@apache.org on 2022/09/27 01:40:02 UTC

[gobblin] branch master updated: [GOBBLIN-1707] Update `IcebergDataset` to incorporate all snapshots, not only the current one (#3569)

This is an automated email from the ASF dual-hosted git repository.

zihanli58 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new e385ea46b [GOBBLIN-1707] Update `IcebergDataset` to incorporate all snapshots, not only the current one (#3569)
e385ea46b is described below

commit e385ea46b078175c443d10e8c97dd1840d5f46d8
Author: Kip Kohn <ck...@linkedin.com>
AuthorDate: Mon Sep 26 18:39:56 2022 -0700

    [GOBBLIN-1707] Update `IcebergDataset` to incorporate all snapshots, not only the current one (#3569)
    
    * Add `IcebergTableTest` unit test
    
    * Fixup comment and indentation
    
    * Minor correction of `Long` => `Integer`
    
    * Correct comment
    
    * [GOBBLIN-1711] Replace Jcenter with maven central (#3566)
    
    * Minor rename of local var
    
    * Extend `IcebergTable` to collect Iceberg metadata across all snapshots
    
    * [GOBBLIN-1697]Have a separate resource handler to rely on CDC stream to do message forwarding (#3549)
    
    * address comments
    
    * use connectionmanager when httpclient is not closeable
    
    * fix test case to test orchestrator as one listener of flow spec
    
    * remove unintentional change
    
    * [GOBBLIN-1697]Have a separate resource handler to rely on CDC stream to do message forwarding
    
    * fix compilation error
    
    * address comments
    
    * address comments
    
    * address comments
    
    * update outdated javadoc
    
    Co-authored-by: Zihan Li <zi...@zihli-mn2.linkedin.biz>
    
    * [GOBBLIN-1709] Create Iceberg Datasets Finder, Iceberg Dataset and FileSet to generate Copy Entities to support Distcp for Iceberg (#3560)
    
    * initial commit for iceberg distcp.
    
    * adding copy entity helper and iceberg distcp template and test case.
    
    * Adding unit tests and refactoring method definitions for an Iceberg dataset.
    
    * resolve conflicts after cleaning history
    
    * update iceberg dataset and finder to include javadoc
    
    * addressed comments on PR and aligned code check style
    
    * renamed vars, added logging and updated javadoc
    
    * update dataset descriptor with ternary operation and rename fs to sourceFs
    
    * added source and target fs and update iceberg dataset finder constructor
    
    * Update source and dest dataset methods as protected and add req args constructor
    
    * change the order of attributes for iceberg dataset finder ctor
    
    * update iceberg dataset methods with correct source and target fs
    
    Co-authored-by: Meeth Gala <mg...@linkedin.com>
    
    * Update `IcebergDataset` to use `IcebergTable.getIncrementalSnapshotInfosIterator` rather than `.getCurrentSnapshotInfo`
    
    * Augment `IcebergDatasetTest` unit test to exercise multi-snapshot icebergs
    
    * Minor javadoc Update
    
    * Throw `IcebergTable.TableNotFoundException` when no such table found
    
    Co-authored-by: Matthew Ho <ho...@gmail.com>
    Co-authored-by: Zihan Li <zi...@linkedin.com>
    Co-authored-by: Zihan Li <zi...@zihli-mn2.linkedin.biz>
    Co-authored-by: meethngala <me...@gmail.com>
    Co-authored-by: Meeth Gala <mg...@linkedin.com>
---
 .../copy/iceberg/IcebergCatalogFactory.java        |   9 +-
 .../management/copy/iceberg/IcebergDataset.java    |  86 ++++++-----
 .../copy/iceberg/IcebergHiveCatalog.java           |   3 +-
 .../copy/iceberg/IcebergSnapshotInfo.java          |   9 +-
 .../data/management/copy/iceberg/IcebergTable.java | 101 +++++++++++--
 .../copy/iceberg/IcebergDatasetTest.java           | 159 ++++++++++++++-------
 .../management/copy/iceberg/IcebergTableTest.java  |  95 +++++++++++-
 7 files changed, 352 insertions(+), 110 deletions(-)

diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergCatalogFactory.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergCatalogFactory.java
index 43dff9fc6..a3e8464c3 100644
--- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergCatalogFactory.java
+++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergCatalogFactory.java
@@ -18,7 +18,9 @@
 package org.apache.gobblin.data.management.copy.iceberg;
 
 import org.apache.hadoop.conf.Configuration;
-import org.apache.iceberg.hive.HiveCatalogs;
+import org.apache.iceberg.hive.HiveCatalog;
+
+import com.google.common.collect.Maps;
 
 
 /**
@@ -26,6 +28,9 @@ import org.apache.iceberg.hive.HiveCatalogs;
  */
 public class IcebergCatalogFactory {
   public static IcebergCatalog create(Configuration configuration) {
-    return new IcebergHiveCatalog(HiveCatalogs.loadCatalog(configuration));
+    HiveCatalog hcat = new HiveCatalog();
+    hcat.setConf(configuration);
+    hcat.initialize("hive", Maps.newHashMap());
+    return new IcebergHiveCatalog(hcat);
   }
 }
diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDataset.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDataset.java
index 8d905c8e6..ae6e1aaea 100644
--- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDataset.java
+++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDataset.java
@@ -24,6 +24,7 @@ import java.util.Comparator;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.Properties;
 
 import org.apache.hadoop.conf.Configuration;
@@ -33,7 +34,6 @@ import org.apache.hadoop.fs.Path;
 import org.jetbrains.annotations.NotNull;
 
 import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Optional;
 import com.google.common.collect.Iterators;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
@@ -64,8 +64,8 @@ public class IcebergDataset implements PrioritizedCopyableDataset {
   protected final Properties properties;
   protected final FileSystem sourceFs;
 
-  private final Optional<String> sourceMetastoreURI;
-  private final Optional<String> targetMetastoreURI;
+  private final Optional<URI> sourceCatalogMetastoreURI;
+  private final Optional<URI> targetCatalogMetastoreURI;
 
   /** Target metastore URI */
   public static final String TARGET_METASTORE_URI_KEY =
@@ -79,10 +79,8 @@ public class IcebergDataset implements PrioritizedCopyableDataset {
     this.icebergTable = icebergTbl;
     this.properties = properties;
     this.sourceFs = sourceFs;
-    this.sourceMetastoreURI =
-        Optional.fromNullable(this.properties.getProperty(IcebergDatasetFinder.ICEBERG_HIVE_CATALOG_METASTORE_URI_KEY));
-    this.targetMetastoreURI =
-        Optional.fromNullable(this.properties.getProperty(TARGET_METASTORE_URI_KEY));
+    this.sourceCatalogMetastoreURI = getAsOptionalURI(this.properties, IcebergDatasetFinder.ICEBERG_HIVE_CATALOG_METASTORE_URI_KEY);
+    this.targetCatalogMetastoreURI = getAsOptionalURI(this.properties, TARGET_METASTORE_URI_KEY);
   }
 
   /**
@@ -96,42 +94,49 @@ public class IcebergDataset implements PrioritizedCopyableDataset {
 
   @Override
   public String datasetURN() {
-    // TODO: verify!
-    return this.dbName + "." + this.inputTableName;
+    return this.getFileSetId();
   }
 
   /**
    * Finds all files read by the table and generates CopyableFiles.
-   * For the specific semantics see {@link #getCopyEntities}.
+   * For the specific semantics see {@link #createFileSets}.
    */
   @Override
   public Iterator<FileSet<CopyEntity>> getFileSetIterator(FileSystem targetFs, CopyConfiguration configuration) {
-    return getCopyEntities(targetFs, configuration);
+    return createFileSets(targetFs, configuration);
   }
   /**
    * Finds all files read by the table and generates CopyableFiles.
-   * For the specific semantics see {@link #getCopyEntities}.
+   * For the specific semantics see {@link #createFileSets}.
    */
   @Override
   public Iterator<FileSet<CopyEntity>> getFileSetIterator(FileSystem targetFs, CopyConfiguration configuration,
       Comparator<FileSet<CopyEntity>> prioritizer, PushDownRequestor<FileSet<CopyEntity>> requestor) {
     // TODO: Implement PushDownRequestor and priority based copy entity iteration
-    return getCopyEntities(targetFs, configuration);
+    return createFileSets(targetFs, configuration);
+  }
+
+  /** @return unique ID for this dataset, usable as a {@link CopyEntity}.fileset, for atomic publication grouping */
+  protected String getFileSetId() {
+    return this.dbName + "." + this.inputTableName;
   }
 
   /**
-   * Finds all files read by the table and generates {@link CopyEntity}s for duplicating the table.
+   * Generates {@link FileSet}s, being themselves able to generate {@link CopyEntity}s for all files, data and metadata,
+   * comprising the iceberg/table, so as to fully specify remaining table replication.
    */
-  Iterator<FileSet<CopyEntity>> getCopyEntities(FileSystem targetFs, CopyConfiguration configuration) {
+  protected Iterator<FileSet<CopyEntity>> createFileSets(FileSystem targetFs, CopyConfiguration configuration) {
     FileSet<CopyEntity> fileSet = new IcebergTableFileSet(this.getInputTableName(), this, targetFs, configuration);
-    return Iterators.singletonIterator(fileSet);  }
+    return Iterators.singletonIterator(fileSet);
+  }
 
   /**
-   * Finds all files read by the table file set and generates {@link CopyEntity}s for duplicating the table.
+   * Finds all files, data and metadata, as {@link CopyEntity}s that comprise the table and fully specify remaining
+   * table replication.
    */
   @VisibleForTesting
   Collection<CopyEntity> generateCopyEntities(FileSystem targetFs, CopyConfiguration configuration) throws IOException {
-    String fileSet = this.getInputTableName();
+    String fileSet = this.getFileSetId();
     List<CopyEntity> copyEntities = Lists.newArrayList();
     Map<Path, FileStatus> pathToFileStatus = getFilePathsToFileStatus();
     log.info("{}.{} - found {} candidate source paths", dbName, inputTableName, pathToFileStatus.size());
@@ -177,36 +182,45 @@ public class IcebergDataset implements PrioritizedCopyableDataset {
   protected Map<Path, FileStatus> getFilePathsToFileStatus() throws IOException {
     Map<Path, FileStatus> result = Maps.newHashMap();
     IcebergTable icebergTable = this.getIcebergTable();
-    IcebergSnapshotInfo icebergSnapshotInfo = icebergTable.getCurrentSnapshotInfo();
-
-    log.info("{}.{} - loaded snapshot '{}' from metadata path: '{}'", dbName, inputTableName,
-        icebergSnapshotInfo.getSnapshotId(), icebergSnapshotInfo.getMetadataPath());
-    List<String> pathsToCopy = icebergSnapshotInfo.getAllPaths();
-
-    for (String pathString : pathsToCopy) {
+    Iterator<IcebergSnapshotInfo> icebergIncrementalSnapshotInfos = icebergTable.getIncrementalSnapshotInfosIterator();
+    Iterator<String> filePathsIterator = Iterators.concat(
+        Iterators.transform(icebergIncrementalSnapshotInfos, snapshotInfo -> {
+          // TODO: decide: is it too much to print for every snapshot--instead use `.debug`?
+          log.info("{}.{} - loaded snapshot '{}' from metadata path: '{}'", dbName, inputTableName,
+              snapshotInfo.getSnapshotId(), snapshotInfo.getMetadataPath().orElse("<<inherited>>"));
+          return snapshotInfo.getAllPaths().iterator();
+        })
+    );
+    Iterable<String> filePathsIterable = () -> filePathsIterator;
+    // TODO: investigate whether streaming initialization of `Map` preferable--`getFileStatus` network calls would
+    // likely benefit from parallelism
+    for (String pathString : filePathsIterable) {
       Path path = new Path(pathString);
       result.put(path, this.sourceFs.getFileStatus(path));
     }
     return result;
   }
 
+  protected static Optional<URI> getAsOptionalURI(Properties props, String key) {
+    return Optional.ofNullable(props.getProperty(key)).map(URI::create);
+  }
+
   protected DatasetDescriptor getSourceDataset(FileSystem sourceFs) {
-    return getDatasetDescriptor(sourceMetastoreURI, sourceFs);
+    return getDatasetDescriptor(sourceCatalogMetastoreURI, sourceFs);
   }
 
   protected DatasetDescriptor getDestinationDataset(FileSystem targetFs) {
-    return getDatasetDescriptor(targetMetastoreURI, targetFs);
+    return getDatasetDescriptor(targetCatalogMetastoreURI, targetFs);
   }
 
   @NotNull
-  private DatasetDescriptor getDatasetDescriptor(Optional<String> stringMetastoreURI, FileSystem fs) {
-    String currentTable = this.getDbName() + "." + this.getInputTableName();
-
-    URI hiveMetastoreURI = stringMetastoreURI.isPresent() ? URI.create(stringMetastoreURI.get()) : null;
-
-    DatasetDescriptor currentDataset =
-        new DatasetDescriptor(DatasetConstants.PLATFORM_ICEBERG, hiveMetastoreURI, currentTable);
-    currentDataset.addMetadata(DatasetConstants.FS_URI, fs.getUri().toString());
-    return currentDataset;
+  private DatasetDescriptor getDatasetDescriptor(Optional<URI> catalogMetastoreURI, FileSystem fs) {
+    DatasetDescriptor descriptor = new DatasetDescriptor(
+        DatasetConstants.PLATFORM_ICEBERG,
+        catalogMetastoreURI.orElse(null),
+        this.getFileSetId()
+    );
+    descriptor.addMetadata(DatasetConstants.FS_URI, fs.getUri().toString());
+    return descriptor;
   }
 }
diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergHiveCatalog.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergHiveCatalog.java
index d8ffdb799..0af012ec5 100644
--- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergHiveCatalog.java
+++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergHiveCatalog.java
@@ -35,6 +35,7 @@ public class IcebergHiveCatalog implements IcebergCatalog {
 
   @Override
   public IcebergTable openTable(String dbName, String tableName) {
-    return new IcebergTable(hc.newTableOps(TableIdentifier.of(dbName, tableName)));
+    TableIdentifier tableId = TableIdentifier.of(dbName, tableName);
+    return new IcebergTable(tableId, hc.newTableOps(tableId));
   }
 }
diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergSnapshotInfo.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergSnapshotInfo.java
index c51c1a27d..b4214017f 100644
--- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergSnapshotInfo.java
+++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergSnapshotInfo.java
@@ -19,8 +19,10 @@ package org.apache.gobblin.data.management.copy.iceberg;
 
 import java.time.Instant;
 import java.util.List;
+import java.util.Optional;
 import java.util.stream.Collectors;
 
+import lombok.Builder;
 import lombok.Data;
 
 import com.google.common.collect.Lists;
@@ -29,6 +31,7 @@ import com.google.common.collect.Lists;
 /**
  * Information about the metadata file and data file paths of a single Iceberg Snapshot.
  */
+@Builder(toBuilder = true)
 @Data
 public class IcebergSnapshotInfo {
 
@@ -40,7 +43,8 @@ public class IcebergSnapshotInfo {
 
   private final Long snapshotId;
   private final Instant timestamp;
-  private final String metadataPath;
+  /** only for the current snapshot, being whom the metadata file 'belongs to'; `isEmpty()` for all other snapshots */
+  private final Optional<String> metadataPath;
   private final String manifestListPath;
   private final List<ManifestFileInfo> manifestFiles;
 
@@ -53,7 +57,8 @@ public class IcebergSnapshotInfo {
   }
 
   public List<String> getAllPaths() {
-    List<String> result = Lists.newArrayList(metadataPath, manifestListPath);
+    List<String> result = metadataPath.map(Lists::newArrayList).orElse(Lists.newArrayList());
+    result.add(manifestListPath);
     result.addAll(getManifestFilePaths());
     result.addAll(getAllDataFilePaths());
     return result;
diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTable.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTable.java
index f6ff42698..a9fbe05cc 100644
--- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTable.java
+++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTable.java
@@ -19,9 +19,14 @@ package org.apache.gobblin.data.management.copy.iceberg;
 
 import java.io.IOException;
 import java.time.Instant;
+import java.util.Iterator;
 import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
 
 import lombok.AllArgsConstructor;
+import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
 
 import org.apache.iceberg.ManifestFile;
@@ -29,11 +34,13 @@ import org.apache.iceberg.ManifestFiles;
 import org.apache.iceberg.Snapshot;
 import org.apache.iceberg.TableMetadata;
 import org.apache.iceberg.TableOperations;
+import org.apache.iceberg.catalog.TableIdentifier;
 import org.apache.iceberg.io.CloseableIterable;
 import org.apache.iceberg.io.FileIO;
 
-import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.Iterators;
 import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
 
 import static org.apache.gobblin.data.management.copy.iceberg.IcebergSnapshotInfo.ManifestFileInfo;
 
@@ -44,24 +51,98 @@ import static org.apache.gobblin.data.management.copy.iceberg.IcebergSnapshotInf
 @Slf4j
 @AllArgsConstructor
 public class IcebergTable {
+
+  /** Indicate the table identified by `tableId` does not (or does no longer) exist in the catalog */
+  public static class TableNotFoundException extends IOException {
+    @Getter
+    private final TableIdentifier tableId; // stored purely for logging / diagnostics
+
+    public TableNotFoundException(TableIdentifier tableId) {
+      super("Not found: '" + tableId + "'");
+      this.tableId = tableId;
+    }
+  }
+
+  private final TableIdentifier tableId;
   private final TableOperations tableOps;
 
+  /** @return metadata info limited to the most recent (current) snapshot */
   public IcebergSnapshotInfo getCurrentSnapshotInfo() throws IOException {
-    TableMetadata current = tableOps.current();
-    Snapshot snapshot = current.currentSnapshot();
+    TableMetadata current = accessTableMetadata();
+    return createSnapshotInfo(current.currentSnapshot(), Optional.of(current.metadataFileLocation()));
+  }
+
+  /** @return metadata info for all known snapshots, ordered historically, with *most recent last* */
+  public Iterator<IcebergSnapshotInfo> getAllSnapshotInfosIterator() throws IOException {
+    TableMetadata current = accessTableMetadata();
+    long currentSnapshotId = current.currentSnapshot().snapshotId();
+    List<Snapshot> snapshots = current.snapshots();
+    return Iterators.transform(snapshots.iterator(), snapshot -> {
+      try {
+        return IcebergTable.this.createSnapshotInfo(
+            snapshot,
+            currentSnapshotId == snapshot.snapshotId() ? Optional.of(current.metadataFileLocation()) : Optional.empty()
+        );
+      } catch (IOException e) {
+        throw new RuntimeException(e);
+      }
+    });
+  }
+
+  /**
+   * @return metadata info for all known snapshots, but incrementally, so content overlap between snapshots appears
+   * only within the first as they're ordered historically, with *most recent last*
+   */
+  public Iterator<IcebergSnapshotInfo> getIncrementalSnapshotInfosIterator() throws IOException {
+    // TODO: investigate using `.addedFiles()`, `.deletedFiles()` to calc this
+    Set<String> knownManifestListFilePaths = Sets.newHashSet();
+    Set<String> knownManifestFilePaths = Sets.newHashSet();
+    Set<String> knownListedFilePaths = Sets.newHashSet();
+    return Iterators.filter(Iterators.transform(getAllSnapshotInfosIterator(), snapshotInfo -> {
+      if (false == knownManifestListFilePaths.add(snapshotInfo.getManifestListPath())) { // already known manifest list!
+        return snapshotInfo.toBuilder().manifestListPath(null).build(); // use `null` as marker to surrounding `filter`
+      }
+      List<IcebergSnapshotInfo.ManifestFileInfo> novelManifestInfos = Lists.newArrayList();
+      for (ManifestFileInfo mfi : snapshotInfo.getManifestFiles()) {
+        if (true == knownManifestFilePaths.add(mfi.getManifestFilePath())) { // heretofore unknown
+          List<String> novelListedPaths = mfi.getListedFilePaths().stream()
+              .filter(fpath -> true == knownListedFilePaths.add(fpath)) // heretofore unknown
+              .collect(Collectors.toList());
+          if (novelListedPaths.size() == mfi.getListedFilePaths().size()) { // nothing filtered
+            novelManifestInfos.add(mfi); // reuse orig
+          } else {
+            novelManifestInfos.add(new ManifestFileInfo(mfi.getManifestFilePath(), novelListedPaths));
+          }
+        } // else, whenever recognized manifest file, skip w/ all its listed paths--which also all would be recognized
+      }
+      if (novelManifestInfos.size() == snapshotInfo.getManifestFiles().size()) { // nothing filtered
+        return snapshotInfo; // reuse orig
+      } else {
+        return snapshotInfo.toBuilder().manifestFiles(novelManifestInfos).build(); // replace manifestFiles
+      }
+    }), snapshotInfo -> snapshotInfo.getManifestListPath() != null); // remove marked-as-repeat-manifest-list snapshots
+  }
+
+  /** @throws {@link IcebergTable.TableNotFoundException} when table does not exist */
+  protected TableMetadata accessTableMetadata() throws TableNotFoundException {
+    TableMetadata current = this.tableOps.current();
+    return Optional.ofNullable(current).orElseThrow(() -> new TableNotFoundException(this.tableId));
+  }
+
+  protected IcebergSnapshotInfo createSnapshotInfo(Snapshot snapshot, Optional<String> metadataFileLocation) throws IOException {
+    // TODO: verify correctness, even when handling 'delete manifests'!
     List<ManifestFile> manifests = snapshot.allManifests();
     return new IcebergSnapshotInfo(
         snapshot.snapshotId(),
         Instant.ofEpochMilli(snapshot.timestampMillis()),
-        current.metadataFileLocation(),
+        metadataFileLocation,
         snapshot.manifestListLocation(),
         // NOTE: unable to `.stream().map(m -> calcManifestFileInfo(m, tableOps.io()))` due to checked exception
-        calcAllManifestFileInfo(manifests, tableOps.io())
+        calcAllManifestFileInfos(manifests, tableOps.io())
       );
   }
 
-  @VisibleForTesting
-  static List<ManifestFileInfo> calcAllManifestFileInfo(List<ManifestFile> manifests, FileIO io) throws IOException {
+  protected static List<IcebergSnapshotInfo.ManifestFileInfo> calcAllManifestFileInfos(List<ManifestFile> manifests, FileIO io) throws IOException {
     List<ManifestFileInfo> result = Lists.newArrayList();
     for (ManifestFile manifest : manifests) {
       result.add(calcManifestFileInfo(manifest, io));
@@ -69,13 +150,11 @@ public class IcebergTable {
     return result;
   }
 
-  @VisibleForTesting
-  static IcebergSnapshotInfo.ManifestFileInfo calcManifestFileInfo(ManifestFile manifest, FileIO io) throws IOException {
+  protected static IcebergSnapshotInfo.ManifestFileInfo calcManifestFileInfo(ManifestFile manifest, FileIO io) throws IOException {
     return new ManifestFileInfo(manifest.path(), discoverDataFilePaths(manifest, io));
   }
 
-  @VisibleForTesting
-  static List<String> discoverDataFilePaths(ManifestFile manifest, FileIO io) throws IOException {
+  protected static List<String> discoverDataFilePaths(ManifestFile manifest, FileIO io) throws IOException {
     CloseableIterable<String> manifestPathsIterable = ManifestFiles.readPaths(manifest, io);
     try {
       return Lists.newArrayList(manifestPathsIterable);
diff --git a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDatasetTest.java b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDatasetTest.java
index f409ee67a..0f7fd491d 100644
--- a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDatasetTest.java
+++ b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDatasetTest.java
@@ -24,19 +24,26 @@ import java.time.Instant;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.Properties;
+import java.util.stream.Collectors;
+
+import lombok.Data;
 
-import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+
 import org.mockito.Mockito;
 import org.testng.Assert;
+import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
 
 import com.google.api.client.util.Maps;
+import com.google.common.collect.Lists;
 import com.google.gson.Gson;
 import com.google.gson.JsonObject;
 
@@ -46,31 +53,57 @@ import org.apache.gobblin.data.management.copy.CopyEntity;
 import org.apache.gobblin.data.management.copy.PreserveAttributes;
 
 
-public class
-IcebergDatasetTest {
-
-  static final String METADATA_PATH = "/root/iceberg/test/metadata";
-  static final String MANIFEST_PATH = "/root/iceberg/test/metadata/test_manifest";
-  static final String MANIFEST_LIST_PATH = "/root/iceberg/test/metadata/test_manifest/data";
-  static final String MANIFEST_FILE_PATH1 = "/root/iceberg/test/metadata/test_manifest/data/a";
-  static final String MANIFEST_FILE_PATH2 = "/root/iceberg/test/metadata/test_manifest/data/b";
+public class IcebergDatasetTest {
+
+  private static final String ROOT_PATH = "/root/iceberg/test/";
+  private static final String METADATA_PATH = ROOT_PATH + "metadata/metadata.json";
+  private static final String MANIFEST_LIST_PATH_0 = ROOT_PATH + "metadata/manifest_list.x";
+  private static final String MANIFEST_PATH_0 = ROOT_PATH + "metadata/manifest.a";
+  private static final String MANIFEST_DATA_PATH_0A = ROOT_PATH + "data/p0/a";
+  private static final String MANIFEST_DATA_PATH_0B = ROOT_PATH + "data/p0/b";
+  private static final MockedIcebergTable.SnapshotPaths SNAPSHOT_PATHS_0 = new MockedIcebergTable.SnapshotPaths(
+      Optional.of(METADATA_PATH), MANIFEST_LIST_PATH_0,
+      Arrays.asList(new IcebergSnapshotInfo.ManifestFileInfo(
+          MANIFEST_PATH_0, Arrays.asList(MANIFEST_DATA_PATH_0A, MANIFEST_DATA_PATH_0B)))
+  );
+  private static final String MANIFEST_LIST_PATH_1 = MANIFEST_LIST_PATH_0.replaceAll("\\.x$", ".y");
+  private static final String MANIFEST_PATH_1 = MANIFEST_PATH_0.replaceAll("\\.a$", ".b");
+  private static final String MANIFEST_DATA_PATH_1A = MANIFEST_DATA_PATH_0A.replaceAll("/p0/", "/p1/");
+  private static final String MANIFEST_DATA_PATH_1B = MANIFEST_DATA_PATH_0B.replaceAll("/p0/", "/p1/");
+  private static final MockedIcebergTable.SnapshotPaths SNAPSHOT_PATHS_1 = new MockedIcebergTable.SnapshotPaths(
+      Optional.empty(), MANIFEST_LIST_PATH_1,
+      Arrays.asList(new IcebergSnapshotInfo.ManifestFileInfo(
+          MANIFEST_PATH_1, Arrays.asList(MANIFEST_DATA_PATH_1A, MANIFEST_DATA_PATH_1B)))
+  );
+
+  private final String test_db_name = "test_db_name";
+  private final String test_table_name = "test_tbl_name";
+  private final String test_qualified_path = "/root/iceberg/test/destination/sub_path_destination";
+  private final String test_uri_path = "/root/iceberg/test/output";
+  private final Properties properties = new Properties();
+
+  @BeforeClass
+  public void setUp() throws Exception {
+    properties.setProperty("data.publisher.final.dir", "/test");
+  }
 
   @Test
   public void testGetFilePaths() throws IOException {
 
-    List<String> pathsToCopy = new ArrayList<>();
-    pathsToCopy.add(MANIFEST_FILE_PATH1);
-    pathsToCopy.add(MANIFEST_FILE_PATH2);
+    List<String> pathsToCopy = Lists.newArrayList(MANIFEST_DATA_PATH_0A, MANIFEST_DATA_PATH_0B);
     Map<Path, FileStatus> expected = Maps.newHashMap();
-    expected.put(new Path(MANIFEST_FILE_PATH1), null);
-    expected.put(new Path(MANIFEST_FILE_PATH2), null);
+    expected.put(new Path(MANIFEST_DATA_PATH_0A), null);
+    expected.put(new Path(MANIFEST_DATA_PATH_0B), null);
 
     IcebergTable icebergTable = Mockito.mock(IcebergTable.class);
     FileSystem fs = Mockito.mock(FileSystem.class);
     IcebergSnapshotInfo icebergSnapshotInfo = Mockito.mock(IcebergSnapshotInfo.class);
 
-    Mockito.when(icebergTable.getCurrentSnapshotInfo()).thenReturn(icebergSnapshotInfo);
+    Mockito.when(icebergTable.getIncrementalSnapshotInfosIterator()).thenReturn(Arrays.asList(icebergSnapshotInfo).iterator());
     Mockito.when(icebergSnapshotInfo.getAllPaths()).thenReturn(pathsToCopy);
+    Mockito.when(icebergSnapshotInfo.getSnapshotId()).thenReturn(98765L);
+    Mockito.when(icebergSnapshotInfo.getMetadataPath()).thenReturn(Optional.of("path for log message"));
+
     IcebergDataset icebergDataset = new IcebergDataset("test_db_name", "test_tbl_name", icebergTable, new Properties(), fs);
 
     Map<Path, FileStatus> actual = icebergDataset.getFilePathsToFileStatus();
@@ -84,52 +117,61 @@ IcebergDatasetTest {
    */
   @Test
   public void testGenerateCopyEntitiesWhenDestEmpty() throws IOException, URISyntaxException {
+    List<String> expectedPaths = Arrays.asList(METADATA_PATH, MANIFEST_LIST_PATH_0,
+        MANIFEST_PATH_0, MANIFEST_DATA_PATH_0A, MANIFEST_DATA_PATH_0B);
 
     FileSystem fs = Mockito.mock(FileSystem.class);
-    String test_db_name = "test_db_name";
-    String test_table_name = "test_tbl_name";
-    String test_qualified_path = "/root/iceberg/test/destination/sub_path_destination";
-    String test_uri_path = "/root/iceberg/test/output";
-    Properties properties = new Properties();
-    properties.setProperty("data.publisher.final.dir", "/test");
-    List<String> expected = new ArrayList<>(Arrays.asList(METADATA_PATH, MANIFEST_PATH, MANIFEST_LIST_PATH, MANIFEST_FILE_PATH1, MANIFEST_FILE_PATH2));
+    IcebergTable icebergTable = new MockedIcebergTable(Arrays.asList(SNAPSHOT_PATHS_0));
+    IcebergDataset icebergDataset = new IcebergDataset(test_db_name, test_table_name, icebergTable, new Properties(), fs);
+    DestinationFileSystem destinationFileSystem = new DestinationFileSystem();
+    destinationFileSystem.addPaths(expectedPaths);
+
+    mockFileSystemMethodCalls(fs, destinationFileSystem.pathToFileStatus, test_qualified_path, test_uri_path);
 
     CopyConfiguration copyConfiguration = CopyConfiguration.builder(null, properties)
         .preserve(PreserveAttributes.fromMnemonicString(""))
         .copyContext(new CopyContext())
         .build();
+    Collection<CopyEntity> copyEntities = icebergDataset.generateCopyEntities(fs, copyConfiguration);
+    verifyCopyEntities(copyEntities, expectedPaths);
+  }
+
+  /** Test generating copy entities for a multi-snapshot iceberg table; given an empty destination, the src-dest delta is the entirety of both snapshots' paths */
+  @Test
+  public void testGenerateCopyEntitiesMultiSnapshotWhenDestEmpty() throws IOException, URISyntaxException {
+    List<String> expectedPaths = Arrays.asList(METADATA_PATH,
+        MANIFEST_LIST_PATH_0, MANIFEST_PATH_0, MANIFEST_DATA_PATH_0A, MANIFEST_DATA_PATH_0B,
+        MANIFEST_LIST_PATH_1, MANIFEST_PATH_1, MANIFEST_DATA_PATH_1A, MANIFEST_DATA_PATH_1B);
 
-    List<String> listedManifestFilePaths = Arrays.asList(MANIFEST_FILE_PATH1, MANIFEST_FILE_PATH2);
-    IcebergSnapshotInfo.ManifestFileInfo manifestFileInfo = new IcebergSnapshotInfo.ManifestFileInfo(MANIFEST_LIST_PATH, listedManifestFilePaths);
-    List<IcebergSnapshotInfo.ManifestFileInfo> manifestFiles = Arrays.asList(manifestFileInfo);
-    IcebergTable icebergTable = new MockedIcebergTable(METADATA_PATH, MANIFEST_PATH, manifestFiles);
+    FileSystem fs = Mockito.mock(FileSystem.class);
+    IcebergTable icebergTable = new MockedIcebergTable(Arrays.asList(SNAPSHOT_PATHS_1, SNAPSHOT_PATHS_0));
     IcebergDataset icebergDataset = new IcebergDataset(test_db_name, test_table_name, icebergTable, new Properties(), fs);
     DestinationFileSystem destinationFileSystem = new DestinationFileSystem();
-    destinationFileSystem.addPath(METADATA_PATH);
-    destinationFileSystem.addPath(MANIFEST_PATH);
-    destinationFileSystem.addPath(MANIFEST_LIST_PATH);
-    destinationFileSystem.addPath(MANIFEST_FILE_PATH1);
-    destinationFileSystem.addPath(MANIFEST_FILE_PATH2);
+    destinationFileSystem.addPaths(expectedPaths);
 
     mockFileSystemMethodCalls(fs, destinationFileSystem.pathToFileStatus, test_qualified_path, test_uri_path);
 
+    CopyConfiguration copyConfiguration = CopyConfiguration.builder(null, properties)
+        .preserve(PreserveAttributes.fromMnemonicString(""))
+        .copyContext(new CopyContext())
+        .build();
     Collection<CopyEntity> copyEntities = icebergDataset.generateCopyEntities(fs, copyConfiguration);
-    verifyCopyEntities(copyEntities, expected);
-
+    verifyCopyEntities(copyEntities, expectedPaths);
   }
 
   private void verifyCopyEntities(Collection<CopyEntity> copyEntities, List<String> expected) {
     List<String> actual = new ArrayList<>();
     for (CopyEntity copyEntity : copyEntities) {
       String json = copyEntity.toString();
-      JsonObject jsonObject = new Gson().fromJson(json, JsonObject.class);
-      JsonObject objectData =
-          jsonObject.getAsJsonObject("object-data").getAsJsonObject("origin").getAsJsonObject("object-data");
-      JsonObject pathObject = objectData.getAsJsonObject("path").getAsJsonObject("object-data").getAsJsonObject("uri");
-      String filepath = pathObject.getAsJsonPrimitive("object-data").getAsString();
+      // unwrap the nested "object-data" envelopes of the entity's JSON form to reach origin -> path -> uri
+      String filepath = new Gson().fromJson(json, JsonObject.class)
+          .getAsJsonObject("object-data").getAsJsonObject("origin").getAsJsonObject("object-data")
+          .getAsJsonObject("path").getAsJsonObject("object-data").getAsJsonObject("uri")
+          .getAsJsonPrimitive("object-data").getAsString();
       actual.add(filepath);
     }
-    Assert.assertEquals(actual.size(), expected.size());
+    Assert.assertEquals(actual.size(), expected.size(),
+        "Set" + actual + " vs Set" + expected);
     Assert.assertEqualsNoOrder(actual.toArray(), expected.toArray());
   }
 
@@ -147,22 +189,32 @@ IcebergDatasetTest {
 
   private static class MockedIcebergTable extends IcebergTable {
 
-    String metadataPath;
-    String manifestListPath;
-    List<IcebergSnapshotInfo.ManifestFileInfo> manifestFiles;
+    @Data
+    public static class SnapshotPaths {
+      private final Optional<String> metadataPath;  // may be absent for a given snapshot
+      private final String manifestListPath;
+      private final List<IcebergSnapshotInfo.ManifestFileInfo> manifestFiles;
+    }
+    /** paths of each snapshot to mock, copied at construction and replayed in the same order by the iterator */
+    private final List<SnapshotPaths> snapshotPathsList;
 
-    public MockedIcebergTable(String metadataPath, String manifestListPath, List<IcebergSnapshotInfo.ManifestFileInfo> manifestFiles) {
-      super(null);
-      this.metadataPath = metadataPath;
-      this.manifestListPath = manifestListPath;
-      this.manifestFiles = manifestFiles;
+    public MockedIcebergTable(List<SnapshotPaths> snapshotPathsList) {
+      super(null, null);
+      this.snapshotPathsList = Lists.newCopyOnWriteArrayList(snapshotPathsList);
     }
 
     @Override
-    public IcebergSnapshotInfo getCurrentSnapshotInfo() {
+    public Iterator<IcebergSnapshotInfo> getAllSnapshotInfosIterator() {  // every info carries the same fixed ID (0) and timestamp (epoch)
       Long snapshotId = 0L;
       Instant timestamp  = Instant.ofEpochMilli(0L);
-      return new IcebergSnapshotInfo(snapshotId, timestamp, metadataPath, manifestListPath, manifestFiles);
+      List<IcebergSnapshotInfo> snapshotInfos = snapshotPathsList.stream()
+          .map(snapshotPaths -> createSnapshotInfo(snapshotPaths, snapshotId, timestamp))
+          .collect(Collectors.toList());
+      return snapshotInfos.iterator();
+    }
+    /** combine one snapshot's paths with the given ID and timestamp into an {@link IcebergSnapshotInfo} */
+    private IcebergSnapshotInfo createSnapshotInfo(SnapshotPaths snapshotPaths, Long snapshotId, Instant timestamp) {
+      return new IcebergSnapshotInfo(snapshotId, timestamp, snapshotPaths.metadataPath, snapshotPaths.manifestListPath, snapshotPaths.manifestFiles);
     }
   }
 
@@ -173,10 +225,13 @@ IcebergDatasetTest {
       this.pathToFileStatus = Maps.newHashMap();
     }
 
-    public void addPath(String pathString) {
-      if (StringUtils.isBlank(pathString)) {
-        throw new IllegalArgumentException("Missing path value for the file system");
+    public void addPaths(List<String> pathStrings) {
+      for (String pathString : pathStrings) {
+        addPath(pathString);
       }
+    }
+
+    public void addPath(String pathString) {
       Path path  = new Path(pathString);
       FileStatus fileStatus = new FileStatus();
       fileStatus.setPath(path);
diff --git a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTableTest.java b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTableTest.java
index 9cf62ca98..fd0b3cf35 100644
--- a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTableTest.java
+++ b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTableTest.java
@@ -21,6 +21,7 @@ import java.io.File;
 import java.io.IOException;
 import java.net.URI;
 import java.net.URISyntaxException;
+import java.util.Arrays;
 import java.util.Collection;
 import java.util.List;
 import java.util.Optional;
@@ -100,12 +101,93 @@ public class IcebergTableTest extends HiveMetastoreTest {
     );
 
     initializeSnapshots(table, perSnapshotFilesets);
-    IcebergSnapshotInfo snapshotInfo = new IcebergTable(catalog.newTableOps(tableId)).getCurrentSnapshotInfo();
+    IcebergSnapshotInfo snapshotInfo = new IcebergTable(tableId, catalog.newTableOps(tableId)).getCurrentSnapshotInfo();
+    verifySnapshotInfo(snapshotInfo, perSnapshotFilesets, perSnapshotFilesets.size());
+  }
+
+  /** Verify that requesting current snapshot info for a non-existent table fails with {@link IcebergTable.TableNotFoundException} */
+  @Test(expectedExceptions = IcebergTable.TableNotFoundException.class)
+  public void testGetCurrentSnapshotInfoOnBogusTable() throws IOException {
+    TableIdentifier bogusTableId = TableIdentifier.of(dbName, tableName + "_BOGUS");
+    new IcebergTable(bogusTableId, catalog.newTableOps(bogusTableId)).getCurrentSnapshotInfo();  // must throw; result is never used
+    Assert.fail("expected an exception when using table ID '" + bogusTableId + "'");
+  }
+
+  /** Verify every full snapshot info covers the data files of its own snapshot plus those of all preceding snapshots */
+  @Test
+  public void testGetAllSnapshotInfosIterator() throws IOException {
+    List<List<String>> filesetsBySnapshot = Lists.newArrayList(
+        Lists.newArrayList("/path/to/data-a0.orc"),
+        Lists.newArrayList("/path/to/data-b0.orc", "/path/to/data-b1.orc"),
+        Lists.newArrayList("/path/to/data-c0.orc", "/path/to/data-c1.orc", "/path/to/data-c2.orc"),
+        Lists.newArrayList("/path/to/data-d0.orc")
+    );
 
+    initializeSnapshots(table, filesetsBySnapshot);
+    IcebergTable icebergTable = new IcebergTable(tableId, catalog.newTableOps(tableId));
+    List<IcebergSnapshotInfo> fullInfos = Lists.newArrayList(icebergTable.getAllSnapshotInfosIterator());
+    Assert.assertEquals(fullInfos.size(), filesetsBySnapshot.size(), "num snapshots");
+    for (int idx = 0; idx < fullInfos.size(); ++idx) {
+      System.err.println("verifying snapshotInfo[" + idx + "]");
+      verifySnapshotInfo(fullInfos.get(idx), filesetsBySnapshot.subList(0, idx + 1), fullInfos.size());  // cumulative prefix
+    }
+  }
+
+  /** Verify every incremental snapshot info covers only the data files added by its own snapshot */
+  @Test
+  public void testGetIncrementalSnapshotInfosIterator() throws IOException {
+    List<List<String>> filesetsBySnapshot = Lists.newArrayList(
+        Lists.newArrayList("/path/to/data-a0.orc"),
+        Lists.newArrayList("/path/to/data-b0.orc", "/path/to/data-b1.orc"),
+        Lists.newArrayList("/path/to/data-c0.orc", "/path/to/data-c1.orc", "/path/to/data-c2.orc"),
+        Lists.newArrayList("/path/to/data-d0.orc")
+    );
+
+    initializeSnapshots(table, filesetsBySnapshot);
+    IcebergTable icebergTable = new IcebergTable(tableId, catalog.newTableOps(tableId));
+    List<IcebergSnapshotInfo> deltaInfos = Lists.newArrayList(icebergTable.getIncrementalSnapshotInfosIterator());
+    Assert.assertEquals(deltaInfos.size(), filesetsBySnapshot.size(), "num snapshots");
+    for (int idx = 0; idx < deltaInfos.size(); ++idx) {
+      System.err.println("verifying snapshotInfo[" + idx + "]");
+      verifySnapshotInfo(deltaInfos.get(idx), Arrays.asList(filesetsBySnapshot.get(idx)), deltaInfos.size());
+    }
+  }
+
+  /** Verify incremental snapshot infos omit any data file already reported by an earlier snapshot */
+  @Test
+  public void testGetIncrementalSnapshotInfosIteratorRepeatedFiles() throws IOException {
+    List<List<String>> filesetsBySnapshot = Lists.newArrayList(
+        Lists.newArrayList("/path/to/data-a0.orc"),
+        Lists.newArrayList("/path/to/data-b0.orc", "/path/to/data-b1.orc", "/path/to/data-a0.orc"),
+        Lists.newArrayList("/path/to/data-a0.orc", "/path/to/data-c0.orc", "/path/to/data-b1.orc", "/path/to/data-c1.orc", "/path/to/data-c2.orc"),
+        Lists.newArrayList("/path/to/data-d0.orc")
+    );
+
+    initializeSnapshots(table, filesetsBySnapshot);
+    IcebergTable icebergTable = new IcebergTable(tableId, catalog.newTableOps(tableId));
+    List<IcebergSnapshotInfo> deltaInfos = Lists.newArrayList(icebergTable.getIncrementalSnapshotInfosIterator());
+    Assert.assertEquals(deltaInfos.size(), filesetsBySnapshot.size(), "num snapshots");
+    for (int idx = 0; idx < deltaInfos.size(); ++idx) {
+      IcebergSnapshotInfo deltaInfo = deltaInfos.get(idx);
+      System.err.println("verifying snapshotInfo[" + idx + "] - " + deltaInfo);
+      // expect only files first introduced by snapshot `idx`: those bearing the idx-th letter ('a', 'b', ...) right after the prefix
+      String firstSeenPrefix = "/path/to/data-" + (char) ('a' + idx);
+      List<String> firstSeenFiles = filesetsBySnapshot.get(idx).stream()
+          .filter(name -> name.startsWith(firstSeenPrefix))
+          .collect(Collectors.toList());
+      verifySnapshotInfo(deltaInfo, Arrays.asList(firstSeenFiles), deltaInfos.size());
+    }
+  }
+
+  /** full validation for a particular {@link IcebergSnapshotInfo} */
+  protected void verifySnapshotInfo(IcebergSnapshotInfo snapshotInfo, List<List<String>> perSnapshotFilesets, int overallNumSnapshots) {
     // verify metadata file
-    Optional<File> optMetadataFile = extractSomeMetadataFilepath(snapshotInfo.getMetadataPath(), metadataBasePath, IcebergTableTest::doesResembleMetadataFilename);
-    Assert.assertTrue(optMetadataFile.isPresent(), "has metadata filepath");
-    verifyMetadataFile(optMetadataFile.get(), Optional.of(perSnapshotFilesets.size()));
+    snapshotInfo.getMetadataPath().ifPresent(metadataPath -> {
+          Optional<File> optMetadataFile = extractSomeMetadataFilepath(metadataPath, metadataBasePath, IcebergTableTest::doesResembleMetadataFilename);
+          Assert.assertTrue(optMetadataFile.isPresent(), "has metadata filepath");
+          verifyMetadataFile(optMetadataFile.get(), Optional.of(overallNumSnapshots));
+        }
+    );
     // verify manifest list file
     Optional<File> optManifestListFile = extractSomeMetadataFilepath(snapshotInfo.getManifestListPath(), metadataBasePath, IcebergTableTest::doesResembleManifestListFilename);
     Assert.assertTrue(optManifestListFile.isPresent(), "has manifest list filepath");
@@ -115,7 +197,8 @@ public class IcebergTableTest extends HiveMetastoreTest {
     verifyManifestFiles(manifestFileInfos, snapshotInfo.getManifestFilePaths(), perSnapshotFilesets);
     verifyAnyOrder(snapshotInfo.getAllDataFilePaths(), flatten(perSnapshotFilesets), "data filepaths");
     // verify all aforementioned paths collectively equal `getAllPaths()`
-    List<String> allPathsExpected = Lists.newArrayList(snapshotInfo.getMetadataPath(), snapshotInfo.getManifestListPath());
+    List<String> allPathsExpected = Lists.newArrayList(snapshotInfo.getManifestListPath());
+    snapshotInfo.getMetadataPath().ifPresent(allPathsExpected::add);
     allPathsExpected.addAll(snapshotInfo.getManifestFilePaths());
     allPathsExpected.addAll(snapshotInfo.getAllDataFilePaths());
     verifyAnyOrder(snapshotInfo.getAllPaths(), allPathsExpected, "all paths, metadata and data");
@@ -131,7 +214,7 @@ public class IcebergTableTest extends HiveMetastoreTest {
     return basePath;
   }
 
-  /** Add one snapshot per sub-list of `perSnapshotFilesets`, in order, with the sub-list contents as its data files */
+  /** Add one snapshot per sub-list of `perSnapshotFilesets`, in order, with the sub-list contents as the data files */
   protected static void initializeSnapshots(Table table, List<List<String>> perSnapshotFilesets) {
     for (List<String> snapshotFileset : perSnapshotFilesets) {
       AppendFiles append = table.newAppend();