Posted to commits@gobblin.apache.org by zi...@apache.org on 2022/10/14 17:52:25 UTC

[gobblin] branch master updated: [GOBBLIN-1707] Enhance `IcebergDataset` to detect when files already at dest then proceed with only delta (#3575)

This is an automated email from the ASF dual-hosted git repository.

zihanli58 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 585298fb5 [GOBBLIN-1707] Enhance `IcebergDataset` to detect when files already at dest then proceed with only delta (#3575)
585298fb5 is described below

commit 585298fb5ebc074f69c1b9db87de6186c4855b26
Author: Kip Kohn <ck...@linkedin.com>
AuthorDate: Fri Oct 14 10:52:18 2022 -0700

    [GOBBLIN-1707] Enhance `IcebergDataset` to detect when files already at dest then proceed with only delta (#3575)
    
    * Enhance `IcebergDataset` to detect when files already at dest and proceed with only delta
    
    * fixup: minor doc/comment mods
    
    * Allow not finding a file mentioned by iceberg metadata to be a non-fatal error
    
    * Have `IcebergDataset` check more thoroughly at dest for previously-copied files that could be skipped
    
    * Add short-circuiting of `IcebergDataset` file scan when (root) metadata file already replicated to dest
    
    * Abbreviate `IcebergDataset` logging when source file not found, refactor for code reuse, and round out testing
    
    * Improve javadoc as suggested during review
    
    * Streamline `IcebergDataset` logging when source path not found
    
    * Skip `IcebergDataset` per-data-file check, falling back to per-manifest-file checking
    
    * minor comment change
    
    * Add `IcebergDataset` logging to indicate volume of filepaths accumulated
    
    * Extend and refactor `IcebergDataset` logging to indicate volume of filepaths accumulated
    
    * improve comments
    
    * Add logging for running count of source paths not found
---
 .../management/copy/iceberg/IcebergDataset.java    | 160 +++++++++++++--
 .../copy/iceberg/IcebergSnapshotInfo.java          |   8 +-
 .../data/management/copy/iceberg/IcebergTable.java |  37 +++-
 .../copy/iceberg/IcebergDatasetTest.java           | 227 ++++++++++++++++++---
 .../service/monitoring/FsJobStatusRetriever.java   |   2 +-
 .../util/function/CheckedExceptionFunction.java    |  44 +++-
 .../util/measurement/GrowthMilestoneTracker.java   |  64 ++++++
 7 files changed, 475 insertions(+), 67 deletions(-)

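The change hinges on one property spelled out in the ALGO comment within the first diff below: an iceberg's files form a four-level tree (metadata.json -> manifest-list -> manifest -> data) of immutable, uniquely-named files, and replication commits atomically, so any file already present at the destination implies its entire subtree was fully copied before. A minimal standalone sketch of that skip rule (all names hypothetical; not the actual implementation):

    import java.util.ArrayList;
    import java.util.List;
    import java.util.function.Predicate;

    class DeltaSkipSketch {
      /** Collect only paths still needing copy; a manifest-list already at dest
       *  short-circuits its entire subtree, while an absent manifest prompts adding
       *  it (and, in the real change, its listed data files as well). */
      static List<String> missingPaths(String manifestListPath, List<String> manifestPaths,
          Predicate<String> isPresentOnTarget) {
        List<String> missing = new ArrayList<>();
        if (!isPresentOnTarget.test(manifestListPath)) {
          missing.add(manifestListPath);
          for (String manifestPath : manifestPaths) {
            if (!isPresentOnTarget.test(manifestPath)) {
              missing.add(manifestPath);
            }
          }
        }
        return missing;
      }
    }
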
diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDataset.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDataset.java
index 1886689a1..006e9c1c6 100644
--- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDataset.java
+++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDataset.java
@@ -17,9 +17,11 @@
 
 package org.apache.gobblin.data.management.copy.iceberg;
 
+import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.net.URI;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.Comparator;
 import java.util.Iterator;
 import java.util.List;
@@ -27,9 +29,14 @@ import java.util.Map;
 import java.util.Optional;
 import java.util.Properties;
 
+import java.util.function.Function;
+import javax.annotation.concurrent.NotThreadSafe;
 import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
 
+import org.apache.gobblin.util.function.CheckedExceptionFunction;
+import org.apache.gobblin.util.measurement.GrowthMilestoneTracker;
+
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
@@ -61,6 +68,7 @@ public class IcebergDataset implements PrioritizedCopyableDataset {
   private final IcebergTable icebergTable;
   protected final Properties properties;
   protected final FileSystem sourceFs;
+  private final boolean shouldTolerateMissingSourceFiles = true; // TODO: make parameterizable, if desired
 
   private final Optional<URI> sourceCatalogMetastoreURI;
   private final Optional<URI> targetCatalogMetastoreURI;
@@ -127,15 +135,14 @@ public class IcebergDataset implements PrioritizedCopyableDataset {
   Collection<CopyEntity> generateCopyEntities(FileSystem targetFs, CopyConfiguration copyConfig) throws IOException {
     String fileSet = this.getFileSetId();
     List<CopyEntity> copyEntities = Lists.newArrayList();
-    Map<Path, FileStatus> pathToFileStatus = getFilePathsToFileStatus();
-    log.info("{}.{} - found {} candidate source paths", dbName, inputTableName, pathToFileStatus.size());
+    Map<Path, FileStatus> pathToFileStatus = getFilePathsToFileStatus(targetFs, copyConfig);
+    log.info("~{}.{}~ found {} candidate source paths", dbName, inputTableName, pathToFileStatus.size());
 
     Configuration defaultHadoopConfiguration = new Configuration();
     for (Map.Entry<Path, FileStatus> entry : pathToFileStatus.entrySet()) {
       Path srcPath = entry.getKey();
       FileStatus srcFileStatus = entry.getValue();
-      // TODO: determine whether unnecessarily expensive to repeatedly re-create what should be the same FS: could it
-      // instead be created once and reused thereafter?
+      // TODO: should be the same FS each time; try creating once, reusing thereafter, to not recreate wastefully
       FileSystem actualSourceFs = getSourceFileSystemFromFileStatus(srcFileStatus, defaultHadoopConfiguration);
 
       // TODO: Add preservation of ancestor ownership and permissions!
@@ -149,34 +156,151 @@ public class IcebergDataset implements PrioritizedCopyableDataset {
       fileEntity.setDestinationData(getDestinationDataset(targetFs));
       copyEntities.add(fileEntity);
     }
-    log.info("{}.{} - generated {} copy entities", dbName, inputTableName, copyEntities.size());
+    log.info("~{}.{}~ generated {} copy entities", dbName, inputTableName, copyEntities.size());
     return copyEntities;
   }
 
   /**
    * Finds all files of the Iceberg's current snapshot
-   * Returns a map of path, file status for each file that needs to be copied
+   * @return a map of path, file status for each file that needs to be copied
    */
-  protected Map<Path, FileStatus> getFilePathsToFileStatus() throws IOException {
-    Map<Path, FileStatus> result = Maps.newHashMap();
+  protected Map<Path, FileStatus> getFilePathsToFileStatus(FileSystem targetFs, CopyConfiguration copyConfig) throws IOException {
     IcebergTable icebergTable = this.getIcebergTable();
+    /** @return whether `pathStr` is present on `targetFs`, caching results while tunneling checked exceptions outward */
+    Function<String, Boolean> isPresentOnTarget = CheckedExceptionFunction.wrapToTunneled(pathStr ->
+      // omit considering timestamp (or other markers of freshness), as files should be immutable
+      // ATTENTION: `CopyContext.getFileStatus()`, to partake in caching
+      copyConfig.getCopyContext().getFileStatus(targetFs, new Path(pathStr)).isPresent()
+    );
+
+    // check first for case of nothing to replicate, to avoid needless scanning of a potentially massive iceberg
+    IcebergSnapshotInfo currentSnapshotOverview = icebergTable.getCurrentSnapshotInfoOverviewOnly();
+    if (currentSnapshotOverview.getMetadataPath().map(isPresentOnTarget).orElse(false) &&
+        isPresentOnTarget.apply(currentSnapshotOverview.getManifestListPath())) {
+      log.info("~{}.{}~ skipping entire iceberg, since snapshot '{}' at '{}' and metadata '{}' both present on target",
+          dbName, inputTableName, currentSnapshotOverview.getSnapshotId(),
+          currentSnapshotOverview.getManifestListPath(),
+          currentSnapshotOverview.getMetadataPath().orElse("<<ERROR: MISSING!>>"));
+      return Maps.newHashMap();
+    }
     Iterator<IcebergSnapshotInfo> icebergIncrementalSnapshotInfos = icebergTable.getIncrementalSnapshotInfosIterator();
     Iterator<String> filePathsIterator = Iterators.concat(
         Iterators.transform(icebergIncrementalSnapshotInfos, snapshotInfo -> {
-          // TODO: decide: is it too much to print for every snapshot--instead use `.debug`?
-          log.info("{}.{} - loaded snapshot '{}' from metadata path: '{}'", dbName, inputTableName,
-              snapshotInfo.getSnapshotId(), snapshotInfo.getMetadataPath().orElse("<<inherited>>"));
-          return snapshotInfo.getAllPaths().iterator();
+          // log each snapshot, for context, in case of `FileNotFoundException` during `FileSystem.getFileStatus()`
+          String manListPath = snapshotInfo.getManifestListPath();
+          log.info("~{}.{}~ loaded snapshot '{}' at '{}' from metadata path: '{}'", dbName, inputTableName,
+              snapshotInfo.getSnapshotId(), manListPath, snapshotInfo.getMetadataPath().orElse("<<inherited>>"));
+          // ALGO: an iceberg's files form a tree of four levels: metadata.json -> manifest-list -> manifest -> data;
+          // most critically, all are presumed immutable and uniquely named, although any may be replaced.  we depend
+          // also on incremental copy being run always atomically: to commit each iceberg only upon its full success.
+          // thus established, the presence of a file at dest (identified by path/name) guarantees its entire subtree is
+          // already copied--and, given immutability, completion of a prior copy naturally renders that file up-to-date.
+          // hence, its entire subtree may be short-circuited.  nevertheless, absence of a file at dest cannot imply
+          // its entire subtree necessarily requires copying, because it is possible, even likely in practice, that some
+          // metadata files would have been replaced (e.g. during snapshot compaction).  in such instances, at least
+          // some of the children pointed to within could have been copied prior, when they previously appeared as a
+          // child of the current file's predecessor (which this new meta file now replaces).
+          if (!isPresentOnTarget.apply(manListPath)) {
+            List<String> missingPaths = snapshotInfo.getSnapshotApexPaths();
+            for (IcebergSnapshotInfo.ManifestFileInfo mfi : snapshotInfo.getManifestFiles()) {
+              if (!isPresentOnTarget.apply(mfi.getManifestFilePath())) {
+                missingPaths.add(mfi.getManifestFilePath());
+                // being incremental info, no listed paths would have appeared prior w/ other snapshots, so add all now.
+                // skip verification despite corner case of a snapshot having reorganized/rebalanced manifest contents
+                // during a period where replication fell so far behind that no snapshots listed among current metadata
+                // are yet at dest.  since the consequence of unnecessary copy is merely wasted data transfer and
+                // compute--and overall, potential is small--prefer sidestepping expense of exhaustive checking, since
+                // file count may run into 100k+ (even beyond!)
+                missingPaths.addAll(mfi.getListedFilePaths());
+              }
+            }
+            log.info("~{}.{}~ snapshot '{}': collected {} additional source paths",
+                dbName, inputTableName, snapshotInfo.getSnapshotId(), missingPaths.size());
+            return missingPaths.iterator();
+          } else {
+            log.info("~{}.{}~ snapshot '{}' already present on target... skipping (including contents)",
+                dbName, inputTableName, snapshotInfo.getSnapshotId());
+            // IMPORTANT: separately consider metadata path, to handle case of 'metadata-only' snapshot reusing mf-list
+            Optional<String> metadataPath = snapshotInfo.getMetadataPath();
+            Optional<String> nonReplicatedMetadataPath = metadataPath.filter(p -> !isPresentOnTarget.apply(p));
+            metadataPath.ifPresent(ignore ->
+                log.info("~{}.{}~ metadata IS {} already present on target", dbName, inputTableName,
+                    nonReplicatedMetadataPath.isPresent() ? "NOT" : "ALSO")
+            );
+            return nonReplicatedMetadataPath.map(p -> Lists.newArrayList(p).iterator()).orElse(Collections.emptyIterator());
+          }
         })
     );
+
+    Map<Path, FileStatus> results = Maps.newHashMap();
+    long numSourceFilesNotFound = 0L;
     Iterable<String> filePathsIterable = () -> filePathsIterator;
-    // TODO: investigate whether streaming initialization of `Map` preferable--`getFileStatus` network calls would
-    // likely benefit from parallelism
-    for (String pathString : filePathsIterable) {
-      Path path = new Path(pathString);
-      result.put(path, this.sourceFs.getFileStatus(path));
+    try {
+      // TODO: investigate whether streaming initialization of `Map` preferable--`getFileStatus()` network calls likely
+      // to benefit from parallelism
+      GrowthMilestoneTracker growthTracker = new GrowthMilestoneTracker();
+      PathErrorConsolidator errorConsolidator = new PathErrorConsolidator();
+      for (String pathString : filePathsIterable) {
+        Path path = new Path(pathString);
+        try {
+          results.put(path, this.sourceFs.getFileStatus(path));
+          if (growthTracker.isAnotherMilestone(results.size())) {
+            log.info("~{}.{}~ collected file status on '{}' source paths", dbName, inputTableName, results.size());
+          }
+        } catch (FileNotFoundException fnfe) {
+          if (!shouldTolerateMissingSourceFiles) {
+            throw fnfe;
+          } else {
+            // log, but otherwise swallow... to continue on
+            String total = ++numSourceFilesNotFound + " total";
+            String speculation = "either premature deletion broke time-travel or metadata read interleaved among delete";
+            errorConsolidator.prepLogMsg(path).ifPresent(msg ->
+                log.warn("~{}.{}~ source {} ({}... {})", dbName, inputTableName, msg, speculation, total)
+            );
+          }
+        }
+      }
+    } catch (CheckedExceptionFunction.WrappedIOException wrapper) {
+      wrapper.rethrowWrapped();
+    }
+    return results;
+  }
+
+  /**
+   * Stateful object to consolidate error messages (e.g. for logging), per a {@link Path} consolidation strategy.
+   * OVERVIEW: to avoid run-away logging into the 1000s of lines, consolidate to parent (directory) level:
+   * 1. on the first path within the dir, log that specific path
+   * 2. on the second path within the dir, log the dir path as a summarization (with ellipsis)
+   * 3. thereafter, skip, logging nothing
+   * The parent directory path is the default consolidation strategy, yet it may be overridden.
+   */
+  @NotThreadSafe
+  protected static class PathErrorConsolidator {
+    private final Map<Path, Boolean> consolidatedPathToWhetherErrorLogged = Maps.newHashMap();
+
+    /** @return consolidated message to log, iff appropriate; else `Optional.empty()` when logging should be inhibited */
+    public Optional<String> prepLogMsg(Path path) {
+      Path consolidatedPath = calcPathConsolidation(path);
+      Boolean hadAlreadyLoggedConsolidation = this.consolidatedPathToWhetherErrorLogged.get(consolidatedPath);
+      if (!Boolean.valueOf(true).equals(hadAlreadyLoggedConsolidation)) {
+        boolean shouldLogConsolidationNow = hadAlreadyLoggedConsolidation != null;
+        consolidatedPathToWhetherErrorLogged.put(consolidatedPath, shouldLogConsolidationNow);
+        String pathLogString = shouldLogConsolidationNow ? (consolidatedPath.toString() + "/...") : path.toString();
+        return Optional.of("path" + (shouldLogConsolidationNow ? "s" : " ") + " not found: '" + pathLogString + "'");
+      } else {
+        return Optional.empty();
+      }
+    }
+
+    /** @return a {@link Path} to consolidate around; default is: {@link Path#getParent()} */
+    protected Path calcPathConsolidation(Path path) {
+      return path.getParent();
     }
-    return result;
+  }
+
+  @VisibleForTesting
+  static PathErrorConsolidator createPathErrorConsolidator() {
+    return new PathErrorConsolidator();
   }
 
   /** Add layer of indirection to permit test mocking by working around `FileSystem.get()` `static` method */
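
Since `java.util.function.Function` cannot declare checked exceptions, the `isPresentOnTarget` lambda above tunnels `IOException` through `wrapToTunneled` and unwraps it at the method boundary. A self-contained sketch of the same pattern, substituting a plain `FileSystem.exists()` probe (an assumption, for brevity) for the caching `CopyContext.getFileStatus()` call used in the actual code:

    import java.io.IOException;
    import java.util.function.Function;
    import org.apache.gobblin.util.function.CheckedExceptionFunction;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    class PresenceCheckSketch {
      /** Count how many of `paths` already exist on `fs`. */
      static long countPresent(FileSystem fs, Iterable<String> paths) throws IOException {
        Function<String, Boolean> isPresent =
            CheckedExceptionFunction.wrapToTunneled(pathStr -> fs.exists(new Path(pathStr)));
        long count = 0;
        try {
          for (String p : paths) {
            if (isPresent.apply(p)) { // may throw (unchecked) WrappedIOException
              ++count;
            }
          }
        } catch (CheckedExceptionFunction.WrappedIOException wrapper) {
          wrapper.rethrowWrapped(); // restore the checked IOException at the method boundary
        }
        return count;
      }
    }
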
diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergSnapshotInfo.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergSnapshotInfo.java
index b4214017f..42b871302 100644
--- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergSnapshotInfo.java
+++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergSnapshotInfo.java
@@ -56,9 +56,15 @@ public class IcebergSnapshotInfo {
     return manifestFiles.stream().map(ManifestFileInfo::getListedFilePaths).flatMap(List::stream).collect(Collectors.toList());
   }
 
-  public List<String> getAllPaths() {
+  /** @return the `manifestListPath` and `metadataPath`, if present */
+  public List<String> getSnapshotApexPaths() {
     List<String> result = metadataPath.map(Lists::newArrayList).orElse(Lists.newArrayList());
     result.add(manifestListPath);
+    return result;
+  }
+
+  public List<String> getAllPaths() {
+    List<String> result = getSnapshotApexPaths();
     result.addAll(getManifestFilePaths());
     result.addAll(getAllDataFilePaths());
     return result;
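
The refactoring makes `getAllPaths()` a strict superset of the new `getSnapshotApexPaths()`: apex paths (metadata, when present, plus manifest-list) first, then manifests, then data files. Restated as a hypothetical standalone class (illustrative values, not the actual one):

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;
    import java.util.Optional;

    class SnapshotPathsSketch {
      Optional<String> metadataPath = Optional.of("/meta/v2.json"); // only the current snapshot carries one
      String manifestListPath = "/meta/manifest-list.avro";
      List<String> manifestFilePaths = Arrays.asList("/meta/manifest.a");
      List<String> allDataFilePaths = Arrays.asList("/data/p0/a", "/data/p0/b");

      List<String> getSnapshotApexPaths() {
        List<String> result = metadataPath.map(p -> new ArrayList<String>(Arrays.asList(p)))
            .orElseGet(ArrayList::new);
        result.add(manifestListPath);
        return result;
      }

      List<String> getAllPaths() { // apex paths, then manifests, then data files
        List<String> result = getSnapshotApexPaths();
        result.addAll(manifestFilePaths);
        result.addAll(allDataFilePaths);
        return result;
      }
    }
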
diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTable.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTable.java
index a9fbe05cc..fbd924845 100644
--- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTable.java
+++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTable.java
@@ -72,6 +72,12 @@ public class IcebergTable {
     return createSnapshotInfo(current.currentSnapshot(), Optional.of(current.metadataFileLocation()));
   }
 
+  /** @return metadata info for most recent snapshot, wherein manifests and their child data files ARE NOT listed */
+  public IcebergSnapshotInfo getCurrentSnapshotInfoOverviewOnly() throws IOException {
+    TableMetadata current = accessTableMetadata();
+    return createSnapshotInfo(current.currentSnapshot(), Optional.of(current.metadataFileLocation()), true);
+  }
+
   /** @return metadata info for all known snapshots, ordered historically, with *most recent last* */
   public Iterator<IcebergSnapshotInfo> getAllSnapshotInfosIterator() throws IOException {
     TableMetadata current = accessTableMetadata();
@@ -90,23 +96,31 @@ public class IcebergTable {
   }
 
   /**
-   * @return metadata info for all known snapshots, but incrementally, so content overlap between snapshots appears
-   * only within the first as they're ordered historically, with *most recent last*
+   * @return metadata info for all known snapshots, but incrementally, so overlapping entries within snapshots appear
+   * only with the first as they're ordered historically, with *most recent last*.
+   *
+   * This means the {@link IcebergSnapshotInfo#getManifestFiles()} for the (n+1)-th element of the iterator will omit
+   * all manifest files and listed data files, already reflected in a {@link IcebergSnapshotInfo#getManifestFiles()}
+   * from the n-th or prior elements.  Given the order of the {@link Iterator<IcebergSnapshotInfo>} returned, this
+   * mirrors the snapshot-to-file dependencies: each file is returned exactly once with the (oldest) snapshot from
+   * which it first becomes reachable.
+   *
+   * Only the final {@link IcebergSnapshotInfo#getMetadataPath()} is present (for the snapshot it itself deems current).
    */
   public Iterator<IcebergSnapshotInfo> getIncrementalSnapshotInfosIterator() throws IOException {
     // TODO: investigate using `.addedFiles()`, `.deletedFiles()` to calc this
-    Set<String> knownManifestListFilePaths = Sets.newHashSet();
-    Set<String> knownManifestFilePaths = Sets.newHashSet();
-    Set<String> knownListedFilePaths = Sets.newHashSet();
+    Set<String> knownFilePaths = Sets.newHashSet(); // as absolute paths are clearly unique, use a single set for all
     return Iterators.filter(Iterators.transform(getAllSnapshotInfosIterator(), snapshotInfo -> {
-      if (false == knownManifestListFilePaths.add(snapshotInfo.getManifestListPath())) { // already known manifest list!
+      log.info("~{}~ before snapshot '{}' - '{}' total known iceberg paths",
+          tableId, snapshotInfo.getSnapshotId(), knownFilePaths.size());
+      if (false == knownFilePaths.add(snapshotInfo.getManifestListPath())) { // already known manifest list!
         return snapshotInfo.toBuilder().manifestListPath(null).build(); // use `null` as marker to surrounding `filter`
       }
       List<IcebergSnapshotInfo.ManifestFileInfo> novelManifestInfos = Lists.newArrayList();
       for (ManifestFileInfo mfi : snapshotInfo.getManifestFiles()) {
-        if (true == knownManifestFilePaths.add(mfi.getManifestFilePath())) { // heretofore unknown
+        if (true == knownFilePaths.add(mfi.getManifestFilePath())) { // heretofore unknown
           List<String> novelListedPaths = mfi.getListedFilePaths().stream()
-              .filter(fpath -> true == knownListedFilePaths.add(fpath)) // heretofore unknown
+              .filter(fpath -> true == knownFilePaths.add(fpath)) // heretofore unknown
               .collect(Collectors.toList());
           if (novelListedPaths.size() == mfi.getListedFilePaths().size()) { // nothing filtered
             novelManifestInfos.add(mfi); // reuse orig
@@ -130,15 +144,18 @@ public class IcebergTable {
   }
 
   protected IcebergSnapshotInfo createSnapshotInfo(Snapshot snapshot, Optional<String> metadataFileLocation) throws IOException {
+    return createSnapshotInfo(snapshot, metadataFileLocation, false);
+  }
+
+  protected IcebergSnapshotInfo createSnapshotInfo(Snapshot snapshot, Optional<String> metadataFileLocation, boolean skipManifestFileInfo) throws IOException {
     // TODO: verify correctness, even when handling 'delete manifests'!
-    List<ManifestFile> manifests = snapshot.allManifests();
     return new IcebergSnapshotInfo(
         snapshot.snapshotId(),
         Instant.ofEpochMilli(snapshot.timestampMillis()),
         metadataFileLocation,
         snapshot.manifestListLocation(),
         // NOTE: unable to `.stream().map(m -> calcManifestFileInfo(m, tableOps.io()))` due to checked exception
-        calcAllManifestFileInfos(manifests, tableOps.io())
+        skipManifestFileInfo ? Lists.newArrayList() : calcAllManifestFileInfos(snapshot.allManifests(), tableOps.io())
       );
   }
 
diff --git a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDatasetTest.java b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDatasetTest.java
index 61aaf6851..159eca34c 100644
--- a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDatasetTest.java
+++ b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDatasetTest.java
@@ -25,15 +25,16 @@ import java.time.Instant;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
-import java.util.HashSet;
-import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Properties;
 import java.util.Set;
+import java.util.function.BiFunction;
 import java.util.stream.Collectors;
 
+import java.util.stream.IntStream;
+import java.util.stream.Stream;
 import lombok.Data;
 
 import org.apache.hadoop.conf.Configuration;
@@ -79,7 +80,7 @@ public class IcebergDatasetTest {
   private static final String MANIFEST_PATH_0 = ROOT_PATH + "metadata/manifest.a";
   private static final String MANIFEST_DATA_PATH_0A = ROOT_PATH + "data/p0/a";
   private static final String MANIFEST_DATA_PATH_0B = ROOT_PATH + "data/p0/b";
-  private static final MockedIcebergTable.SnapshotPaths SNAPSHOT_PATHS_0 = new MockedIcebergTable.SnapshotPaths(
+  private static final MockIcebergTable.SnapshotPaths SNAPSHOT_PATHS_0 = new MockIcebergTable.SnapshotPaths(
       Optional.of(METADATA_PATH), MANIFEST_LIST_PATH_0,
       Arrays.asList(new IcebergSnapshotInfo.ManifestFileInfo(
           MANIFEST_PATH_0, Arrays.asList(MANIFEST_DATA_PATH_0A, MANIFEST_DATA_PATH_0B)))
@@ -88,7 +89,7 @@ public class IcebergDatasetTest {
   private static final String MANIFEST_PATH_1 = MANIFEST_PATH_0.replaceAll("\\.a$", ".b");
   private static final String MANIFEST_DATA_PATH_1A = MANIFEST_DATA_PATH_0A.replaceAll("/p0/", "/p1/");
   private static final String MANIFEST_DATA_PATH_1B = MANIFEST_DATA_PATH_0B.replaceAll("/p0/", "/p1/");
-  private static final MockedIcebergTable.SnapshotPaths SNAPSHOT_PATHS_1 = new MockedIcebergTable.SnapshotPaths(
+  private static final MockIcebergTable.SnapshotPaths SNAPSHOT_PATHS_1 = new MockIcebergTable.SnapshotPaths(
       Optional.empty(), MANIFEST_LIST_PATH_1,
       Arrays.asList(new IcebergSnapshotInfo.ManifestFileInfo(
           MANIFEST_PATH_1, Arrays.asList(MANIFEST_DATA_PATH_1A, MANIFEST_DATA_PATH_1B)))
@@ -104,23 +105,109 @@ public class IcebergDatasetTest {
   }
 
   @Test
-  public void testGetFilePaths() throws IOException {
-    IcebergTable icebergTable = Mockito.mock(IcebergTable.class);
-    IcebergSnapshotInfo icebergSnapshotInfo = SNAPSHOT_PATHS_0.asSnapshotInfo();
-    Mockito.when(icebergTable.getIncrementalSnapshotInfosIterator()).thenReturn(Arrays.asList(icebergSnapshotInfo).iterator());
+  public void testGetFilePathsWhenDestEmpty() throws IOException {
+    List<MockIcebergTable.SnapshotPaths> icebergSnapshots = Lists.newArrayList(SNAPSHOT_PATHS_1, SNAPSHOT_PATHS_0);
+    List<String> existingDestPaths = Lists.newArrayList();
+    Set<Path> expectedResultPaths = withAllSnapshotPaths(Sets.newHashSet(), SNAPSHOT_PATHS_1, SNAPSHOT_PATHS_0);
+    validateGetFilePathsGivenDestState(icebergSnapshots, existingDestPaths, expectedResultPaths);
+  }
 
-    FileSystem sourceFs = Mockito.mock(FileSystem.class);
-    IcebergDataset icebergDataset = new IcebergDataset("test_db_name", "test_tbl_name", icebergTable, new Properties(), sourceFs);
+  @Test
+  public void testGetFilePathsWhenOneManifestListAtDest() throws IOException {
+    List<MockIcebergTable.SnapshotPaths> icebergSnapshots = Lists.newArrayList(SNAPSHOT_PATHS_1, SNAPSHOT_PATHS_0);
+    List<String> existingDestPaths = Lists.newArrayList(MANIFEST_LIST_PATH_1);
+    Set<Path> expectedResultPaths = withAllSnapshotPaths(Sets.newHashSet(), SNAPSHOT_PATHS_0);
+    validateGetFilePathsGivenDestState(icebergSnapshots, existingDestPaths, expectedResultPaths);
+  }
 
-    Set<Path> expectedPaths = Sets.newHashSet();
-    for (String p : icebergSnapshotInfo.getAllPaths()) {
-      expectedPaths.add(new Path(p));
-    }
+  @Test
+  public void testGetFilePathsWhenOneManifestAtDest() throws IOException {
+    List<MockIcebergTable.SnapshotPaths> icebergSnapshots = Lists.newArrayList(SNAPSHOT_PATHS_1, SNAPSHOT_PATHS_0);
+    List<String> existingDestPaths = Lists.newArrayList(MANIFEST_PATH_1);
+    Set<Path> expectedResultPaths = withAllSnapshotPaths(Sets.newHashSet(), SNAPSHOT_PATHS_0);
+    expectedResultPaths.add(new Path(MANIFEST_LIST_PATH_1)); // expect manifest's parent, despite manifest subtree skip
+    validateGetFilePathsGivenDestState(icebergSnapshots, existingDestPaths, expectedResultPaths);
+  }
+
+  @Test
+  public void testGetFilePathsWhenSomeDataFilesAtDest() throws IOException {
+    List<MockIcebergTable.SnapshotPaths> icebergSnapshots = Lists.newArrayList(SNAPSHOT_PATHS_1, SNAPSHOT_PATHS_0);
+    List<String> existingDestPaths = Lists.newArrayList(MANIFEST_DATA_PATH_1B, MANIFEST_DATA_PATH_0A);
+    Set<Path> expectedResultPaths = withAllSnapshotPaths(Sets.newHashSet(), SNAPSHOT_PATHS_1, SNAPSHOT_PATHS_0);
+    // despite already existing on target, expect anyway: per-file check skipped for optimization's sake
+    // expectedResultPaths.remove(new Path(MANIFEST_DATA_PATH_1B));
+    // expectedResultPaths.remove(new Path(MANIFEST_DATA_PATH_0A));
+    validateGetFilePathsGivenDestState(icebergSnapshots, existingDestPaths, expectedResultPaths);
+  }
+
+  @Test
+  public void testGetFilePathsWillSkipMissingSourceFile() throws IOException {
+    List<MockIcebergTable.SnapshotPaths> icebergSnapshots = Lists.newArrayList(SNAPSHOT_PATHS_1, SNAPSHOT_PATHS_0);
+    // pretend this path doesn't exist on source:
+    Path missingPath = new Path(MANIFEST_DATA_PATH_0A);
+    Set<Path> existingSourcePaths = withAllSnapshotPaths(Sets.newHashSet(), SNAPSHOT_PATHS_1, SNAPSHOT_PATHS_0);
+    existingSourcePaths.remove(missingPath);
+    List<String> existingDestPaths = Lists.newArrayList(MANIFEST_LIST_PATH_1);
+    Set<Path> expectedResultPaths = withAllSnapshotPaths(Sets.newHashSet(), SNAPSHOT_PATHS_0);
+    expectedResultPaths.remove(missingPath);
+    validateGetFilePathsGivenDestState(
+        icebergSnapshots,
+        Optional.of(existingSourcePaths.stream().map(Path::toString).collect(Collectors.toList())),
+        existingDestPaths,
+        expectedResultPaths);
+  }
+
+  @Test
+  public void testGetFilePathsWhenManifestListsAtDestButNotMetadata() throws IOException {
+    List<MockIcebergTable.SnapshotPaths> icebergSnapshots = Lists.newArrayList(SNAPSHOT_PATHS_1, SNAPSHOT_PATHS_0);
+    List<String> existingDestPaths = Lists.newArrayList(MANIFEST_LIST_PATH_1, MANIFEST_LIST_PATH_0);
+    Set<Path> expectedResultPaths = Sets.newHashSet();
+    expectedResultPaths.add(new Path(METADATA_PATH));
+    validateGetFilePathsGivenDestState(icebergSnapshots, existingDestPaths, expectedResultPaths);
+  }
 
-    Map<Path, FileStatus> filePathsToFileStatus = icebergDataset.getFilePathsToFileStatus();
-    Assert.assertEquals(filePathsToFileStatus.keySet(), expectedPaths);
-    // verify all values `null` (because `sourceFs.getFileStatus` not mocked)
-    Assert.assertEquals(Sets.newHashSet(filePathsToFileStatus.values()), new HashSet<>(Arrays.asList(new FileStatus[] { null })));
+  @Test
+  public void testGetFilePathsWhenAllAtDest() throws IOException {
+    List<MockIcebergTable.SnapshotPaths> icebergSnapshots = Lists.newArrayList(SNAPSHOT_PATHS_1, SNAPSHOT_PATHS_0);
+    List<String> existingDestPaths = Lists.newArrayList(METADATA_PATH, MANIFEST_LIST_PATH_1, MANIFEST_LIST_PATH_0);
+    Set<Path> expectedResultPaths = Sets.newHashSet(); // not expecting any delta
+    IcebergTable mockTable = validateGetFilePathsGivenDestState(icebergSnapshots, existingDestPaths, expectedResultPaths);
+    // ensure short-circuiting was able to avert iceberg manifests scan
+    Mockito.verify(mockTable, Mockito.times(1)).getCurrentSnapshotInfoOverviewOnly();
+    Mockito.verifyNoMoreInteractions(mockTable);
+  }
+
+  /** Exception wrapping is used internally--ensure that doesn't lapse into silently swallowing errors */
+  @Test(expectedExceptions = IOException.class)
+  public void testGetFilePathsDoesNotSwallowDestFileSystemException() throws IOException {
+    IcebergTable icebergTable = MockIcebergTable.withSnapshots(Lists.newArrayList(SNAPSHOT_PATHS_0));
+
+    MockFileSystemBuilder sourceFsBuilder = new MockFileSystemBuilder(SRC_FS_URI);
+    FileSystem sourceFs = sourceFsBuilder.build();
+    IcebergDataset icebergDataset = new IcebergDataset(testDbName, testTblName, icebergTable, new Properties(), sourceFs);
+
+    MockFileSystemBuilder destFsBuilder = new MockFileSystemBuilder(DEST_FS_URI);
+    FileSystem destFs = destFsBuilder.build();
+    Mockito.doThrow(new IOException("Ha - not so fast!")).when(destFs).getFileStatus(new Path(SNAPSHOT_PATHS_0.manifestListPath));
+
+    CopyConfiguration copyConfiguration = createEmptyCopyConfiguration(destFs);
+    icebergDataset.getFilePathsToFileStatus(destFs, copyConfiguration);
+  }
+
+  /** Validate error consolidation used to streamline logging. */
+  @Test
+  public void testPathErrorConsolidator() {
+    IcebergDataset.PathErrorConsolidator pec = IcebergDataset.createPathErrorConsolidator();
+    Optional<String> msg0 = pec.prepLogMsg(new Path("/a/b/c/file0"));
+    Assert.assertTrue(msg0.isPresent());
+    Assert.assertEquals(msg0.get(), "path  not found: '/a/b/c/file0'");
+    Optional<String> msg1 = pec.prepLogMsg(new Path("/a/b/c/file1"));
+    Assert.assertTrue(msg1.isPresent());
+    Assert.assertEquals(msg1.get(), "paths not found: '/a/b/c/...'");
+    Optional<String> msg2 = pec.prepLogMsg(new Path("/a/b/c/file2"));
+    Assert.assertFalse(msg2.isPresent());
+    Optional<String> msg3 = pec.prepLogMsg(new Path("/a/b/c-other/file0"));
+    Assert.assertTrue(msg3.isPresent());
   }
 
   /**
@@ -137,7 +224,7 @@ public class IcebergDatasetTest {
     sourceBuilder.addPaths(expectedPaths);
     FileSystem sourceFs = sourceBuilder.build();
 
-    IcebergTable icebergTable = new MockedIcebergTable(Arrays.asList(SNAPSHOT_PATHS_0));
+    IcebergTable icebergTable = MockIcebergTable.withSnapshots(Arrays.asList(SNAPSHOT_PATHS_0));
     IcebergDataset icebergDataset = new TrickIcebergDataset(testDbName, testTblName, icebergTable, new Properties(), sourceFs);
 
     MockFileSystemBuilder destBuilder = new MockFileSystemBuilder(DEST_FS_URI);
@@ -162,7 +249,7 @@ public class IcebergDatasetTest {
     sourceBuilder.addPaths(expectedPaths);
     FileSystem sourceFs = sourceBuilder.build();
 
-    IcebergTable icebergTable = new MockedIcebergTable(Arrays.asList(SNAPSHOT_PATHS_1, SNAPSHOT_PATHS_0));
+    IcebergTable icebergTable = MockIcebergTable.withSnapshots(Arrays.asList(SNAPSHOT_PATHS_1, SNAPSHOT_PATHS_0));
     IcebergDataset icebergDataset = new TrickIcebergDataset(testDbName, testTblName, icebergTable, new Properties(), sourceFs);
 
     MockFileSystemBuilder destBuilder = new MockFileSystemBuilder(DEST_FS_URI);
@@ -176,7 +263,64 @@ public class IcebergDatasetTest {
     verifyCopyEntities(copyEntities, expectedPaths);
   }
 
-  private void verifyCopyEntities(Collection<CopyEntity> copyEntities, List<String> expected) {
+  /**
+   *  Exercise {@link IcebergDataset#getFilePathsToFileStatus} and validate the result
+   *  @return {@link IcebergTable} (mock!), for behavioral verification
+   */
+  protected IcebergTable validateGetFilePathsGivenDestState(
+      List<MockIcebergTable.SnapshotPaths> sourceSnapshotPathSets,
+      List<String> existingDestPaths,
+      Set<Path> expectedResultPaths) throws IOException {
+    return validateGetFilePathsGivenDestState(sourceSnapshotPathSets, Optional.empty(), existingDestPaths, expectedResultPaths);
+  }
+
+  /**
+   *  Exercise {@link IcebergDataset#getFilePathsToFileStatus} and validate the result
+   *  @return {@link IcebergTable} (mock!), for behavioral verification
+   */
+  protected IcebergTable validateGetFilePathsGivenDestState(
+      List<MockIcebergTable.SnapshotPaths> sourceSnapshotPathSets,
+      Optional<List<String>> optExistingSourcePaths,
+      List<String> existingDestPaths,
+      Set<Path> expectedResultPaths) throws IOException {
+    IcebergTable icebergTable = MockIcebergTable.withSnapshots(sourceSnapshotPathSets);
+
+    MockFileSystemBuilder sourceFsBuilder = new MockFileSystemBuilder(SRC_FS_URI, !optExistingSourcePaths.isPresent());
+    optExistingSourcePaths.ifPresent(sourceFsBuilder::addPaths);
+    FileSystem sourceFs = sourceFsBuilder.build();
+    IcebergDataset icebergDataset = new IcebergDataset(testDbName, testTblName, icebergTable, new Properties(), sourceFs);
+
+    MockFileSystemBuilder destFsBuilder = new MockFileSystemBuilder(DEST_FS_URI);
+    destFsBuilder.addPaths(existingDestPaths);
+    FileSystem destFs = destFsBuilder.build();
+    CopyConfiguration copyConfiguration = createEmptyCopyConfiguration(destFs);
+
+    Map<Path, FileStatus> filePathsToFileStatus = icebergDataset.getFilePathsToFileStatus(destFs, copyConfiguration);
+    Assert.assertEquals(filePathsToFileStatus.keySet(), expectedResultPaths);
+    // verify solely the path portion of the `FileStatus`, since that's all mock sets up
+    Assert.assertEquals(
+        filePathsToFileStatus.values().stream().map(FileStatus::getPath).collect(Collectors.toSet()),
+        expectedResultPaths);
+    return icebergTable;
+  }
+
+  /** @return `paths` after adding to it all paths of every one of `snapshotDefs` */
+  protected static Set<Path> withAllSnapshotPaths(Set<Path> paths, MockIcebergTable.SnapshotPaths... snapshotDefs) {
+    Arrays.stream(snapshotDefs).flatMap(snapshotDef ->
+        snapshotDef.asSnapshotInfo().getAllPaths().stream()
+    ).forEach(p ->
+        paths.add(new Path(p))
+    );
+    return paths;
+  }
+
+  private CopyConfiguration createEmptyCopyConfiguration(FileSystem fs) {
+    return CopyConfiguration.builder(fs, copyConfigProperties)
+        .copyContext(new CopyContext())
+        .build();
+  }
+
+  private static void verifyCopyEntities(Collection<CopyEntity> copyEntities, List<String> expected) {
     List<String> actual = new ArrayList<>();
     for (CopyEntity copyEntity : copyEntities) {
       String json = copyEntity.toString();
@@ -218,6 +362,7 @@ public class IcebergDatasetTest {
     public MockFileSystemBuilder(URI fsURI) {
       this(fsURI, false);
     }
+
     public MockFileSystemBuilder(URI fsURI, boolean shouldRepresentEveryPath) {
       this.fsURI = fsURI;
       this.optPaths = shouldRepresentEveryPath ? Optional.empty() : Optional.of(Sets.newHashSet());
@@ -256,6 +401,8 @@ public class IcebergDatasetTest {
         Mockito.when(fs.getFileStatus(any(Path.class))).thenAnswer(invocation ->
             createEmptyFileStatus(invocation.getArgumentAt(0, Path.class).toString()));
       } else {
+        // WARNING: order is critical--specific paths *after* `any(Path)`; in addition, since we're further
+        // mocking an already-mocked instance, `.doReturn/.when` is needed (vs. `.when/.thenReturn`)
         Mockito.when(fs.getFileStatus(any(Path.class))).thenThrow(new FileNotFoundException());
         for (Path p : this.optPaths.get()) {
           Mockito.doReturn(createEmptyFileStatus(p.toString())).when(fs).getFileStatus(p);
@@ -273,7 +420,7 @@ public class IcebergDatasetTest {
   }
 
 
-  private static class MockedIcebergTable extends IcebergTable {
+  private static class MockIcebergTable {
 
     @Data
     public static class SnapshotPaths {
@@ -282,7 +429,12 @@ public class IcebergDatasetTest {
       private final List<IcebergSnapshotInfo.ManifestFileInfo> manifestFiles;
 
       public IcebergSnapshotInfo asSnapshotInfo() {
-        return asSnapshotInfo(0L, Instant.ofEpochMilli(0L));
+        return asSnapshotInfo(0L);
+      }
+
+      /** @param snapshotIdIndex used both as snapshot ID and as snapshot (epoch) timestamp */
+      public IcebergSnapshotInfo asSnapshotInfo(long snapshotIdIndex) {
+        return asSnapshotInfo(snapshotIdIndex, Instant.ofEpochMilli(snapshotIdIndex));
       }
 
       public IcebergSnapshotInfo asSnapshotInfo(Long snapshotId, Instant timestamp) {
@@ -290,19 +442,26 @@ public class IcebergDatasetTest {
       }
     }
 
-    private final List<SnapshotPaths> snapshotPathsList;
-
-    public MockedIcebergTable(List<SnapshotPaths> snapshotPathsList) {
-      super(null, null);
-      this.snapshotPathsList = Lists.newCopyOnWriteArrayList(snapshotPathsList);
+    public static IcebergTable withSnapshots(List<SnapshotPaths> snapshotPathSets) throws IOException {
+      IcebergTable table = Mockito.mock(IcebergTable.class);
+      int lastIndex = snapshotPathSets.size() - 1;
+      Mockito.when(table.getCurrentSnapshotInfoOverviewOnly()).thenReturn(
+          snapshotPathSets.get(lastIndex).asSnapshotInfo(lastIndex));
+      // ADMISSION: this is strictly more analogous to `IcebergTable.getAllSnapshotInfosIterator()`, as it doesn't
+      // filter only the delta... nonetheless, it should work fine for the tests herein
+      Mockito.when(table.getIncrementalSnapshotInfosIterator()).thenReturn(
+          IndexingStreams.transformWithIndex(snapshotPathSets.stream(),
+              (pathSet, i) -> pathSet.asSnapshotInfo(i)).iterator());
+      return table;
     }
+  }
 
-    @Override
-    public Iterator<IcebergSnapshotInfo> getAllSnapshotInfosIterator() {
-      List<IcebergSnapshotInfo> snapshotInfos = snapshotPathsList.stream()
-          .map(SnapshotPaths::asSnapshotInfo)
-          .collect(Collectors.toList());
-      return snapshotInfos.iterator();
+  public static class IndexingStreams {
+    /** @return {@link Stream} equivalent of `inputs.zipWithIndex.map(f)` in scala */
+    public static <T, R> Stream<R> transformWithIndex(Stream<T> inputs, BiFunction<T, Integer, R> f) {
+      // given sketchy import, sequester for now within enclosing test class, rather than adding to `gobblin-utility`
+      return org.apache.iceberg.relocated.com.google.common.collect.Streams.zip(
+          inputs, IntStream.iterate(0, i -> i + 1).boxed(), f);
     }
   }
 }
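
`IndexingStreams.transformWithIndex` is what lets `MockIcebergTable.withSnapshots` assign each snapshot a distinct id/timestamp from its list position. A quick usage sketch, assuming the helper above is accessible:

    import java.util.List;
    import java.util.stream.Collectors;
    import java.util.stream.Stream;

    class IndexingStreamsExample {
      public static void main(String[] args) {
        List<String> labeled = IcebergDatasetTest.IndexingStreams
            .transformWithIndex(Stream.of("a", "b", "c"), (s, i) -> i + ":" + s)
            .collect(Collectors.toList());
        System.out.println(labeled); // [0:a, 1:b, 2:c]
      }
    }
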
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/FsJobStatusRetriever.java b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/FsJobStatusRetriever.java
index 9b5c40362..613dc7f69 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/FsJobStatusRetriever.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/FsJobStatusRetriever.java
@@ -123,7 +123,7 @@ public class FsJobStatusRetriever extends JobStatusRetriever {
     try {
       String storeNamePrefix = KafkaJobStatusMonitor.jobStatusStoreName(flowGroup, "");
       List<String> storeNamesForFlowGroup = stateStore.getStoreNames(storeName -> storeName.startsWith(storeNamePrefix));
-      List<State> flowGroupExecutionsStates = storeNamesForFlowGroup.stream().flatMap(CheckedExceptionFunction.wrapUnchecked(storeName ->
+      List<State> flowGroupExecutionsStates = storeNamesForFlowGroup.stream().flatMap(CheckedExceptionFunction.wrapToUnchecked(storeName ->
           stateStore.getAll(storeName).stream()
       )).collect(Collectors.toList());
       return asFlowStatuses(groupByFlowExecutionAndRetainLatest(flowGroup, flowGroupExecutionsStates, countJobStatusesPerFlowName));
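
The rename to `wrapToUnchecked` (mirroring the new `wrapToTunneled`) matters here because `Stream.flatMap` takes a `java.util.function.Function`, which cannot declare checked exceptions, so a throwing lambda needs an adapter. A self-contained sketch using a standard-library call that throws `IOException` (file names hypothetical):

    import java.nio.file.Files;
    import java.nio.file.Paths;
    import java.util.List;
    import java.util.stream.Collectors;
    import java.util.stream.Stream;
    import org.apache.gobblin.util.function.CheckedExceptionFunction;

    class WrapToUncheckedExample {
      public static void main(String[] args) {
        // Files.readAllLines throws IOException, so it cannot be passed to flatMap bare;
        // wrapToUnchecked adapts it, rethrowing any failure as a RuntimeException.
        List<String> lines = Stream.of("a.txt", "b.txt")
            .flatMap(CheckedExceptionFunction.wrapToUnchecked(
                f -> Files.readAllLines(Paths.get(f)).stream()))
            .collect(Collectors.toList());
        System.out.println(lines.size());
      }
    }
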
diff --git a/gobblin-utility/src/main/java/org/apache/gobblin/util/function/CheckedExceptionFunction.java b/gobblin-utility/src/main/java/org/apache/gobblin/util/function/CheckedExceptionFunction.java
index 96c6ab45f..115a43d35 100644
--- a/gobblin-utility/src/main/java/org/apache/gobblin/util/function/CheckedExceptionFunction.java
+++ b/gobblin-utility/src/main/java/org/apache/gobblin/util/function/CheckedExceptionFunction.java
@@ -17,18 +17,40 @@
 
 package org.apache.gobblin.util.function;
 
+import java.io.IOException;
 import java.util.function.Function;
+import lombok.Getter;
+import lombok.RequiredArgsConstructor;
+
 
 /**
- * Alternative to {@link java.util.function.Function} that handles wrapping one checked `Exception`.
+ * Alternative to {@link Function} that handles wrapping (or tunneling) a single checked {@link Exception} derived class.
  * Inspired by: https://dzone.com/articles/how-to-handle-checked-exception-in-lambda-expressi
  */
 @FunctionalInterface
 public interface CheckedExceptionFunction<T, R, E extends Exception> {
+  /**
+   * Wrapper to tunnel {@link IOException} as an unchecked exception that would later be unwrapped via
+   * {@link WrappedIOException#rethrowWrapped()}.  If no expectation of unwrapping, this wrapper may simply add
+   * unnecessary obfuscation: instead use {@link CheckedExceptionFunction#wrapToUnchecked(CheckedExceptionFunction)}
+   *
+   * BUMMER: specific {@link IOException} hard-coded because: "generic class may not extend 'java.lang.Throwable'"
+   */
+  @RequiredArgsConstructor
+  public static class WrappedIOException extends RuntimeException {
+    @Getter
+    private final IOException wrappedException;
+
+    /** CAUTION: if this be your intent, DO NOT FORGET!  Being unchecked, the compiler WILL NOT remind you. */
+    public void rethrowWrapped() throws IOException {
+      throw wrappedException;
+    }
+  }
+
   R apply(T arg) throws E;
 
-  /** @return a `Function` that will invoke {@link #apply} and catch any instance of Exception, `E`, rethrowing it wrapped as {@link RuntimeException}. */
-  static <T, R, E extends Exception> Function<T, R> wrapUnchecked(CheckedExceptionFunction<T, R, E> f) {
+  /** @return {@link Function} proxy that catches any instance of {@link Exception} and rethrows it wrapped as {@link RuntimeException} */
+  static <T, R, E extends Exception> Function<T, R> wrapToUnchecked(CheckedExceptionFunction<T, R, E> f) {
     return a -> {
       try {
         return f.apply(a);
@@ -39,4 +61,20 @@ public interface CheckedExceptionFunction<T, R, E extends Exception> {
       }
     };
   }
+
+  /**
+   * @return {@link Function} proxy that catches any instance of {@link IOException}, and rethrows it wrapped as {@link WrappedIOException},
+   * for easy unwrapping via {@link WrappedIOException#rethrowWrapped()}
+   */
+  static <T, R, E extends IOException> Function<T, R> wrapToTunneled(CheckedExceptionFunction<T, R, E> f) {
+    return a -> {
+      try {
+        return f.apply(a);
+      } catch (RuntimeException re) {
+        throw re; // no double wrapping
+      } catch (IOException ioe) {
+        throw new WrappedIOException(ioe);
+      }
+    };
+  }
 }
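
The two adapters differ at the recovery point: `wrapToUnchecked` abandons the checked contract outright, while `wrapToTunneled` lets the caller restore it via `rethrowWrapped()`. A round-trip sketch of the tunneled variant (names hypothetical):

    import java.io.IOException;
    import java.util.function.Function;
    import org.apache.gobblin.util.function.CheckedExceptionFunction;

    class TunnelingExample {
      static int lengthOf(String path) throws IOException {
        if (path.isEmpty()) {
          throw new IOException("no path given");
        }
        return path.length();
      }

      static int measure(String path) throws IOException {
        Function<String, Integer> f =
            CheckedExceptionFunction.wrapToTunneled(TunnelingExample::lengthOf);
        try {
          return f.apply(path); // may throw (unchecked) WrappedIOException
        } catch (CheckedExceptionFunction.WrappedIOException wrapper) {
          wrapper.rethrowWrapped(); // rethrow the original checked IOException
          throw new AssertionError("unreachable: rethrowWrapped() always throws");
        }
      }
    }
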
diff --git a/gobblin-utility/src/main/java/org/apache/gobblin/util/measurement/GrowthMilestoneTracker.java b/gobblin-utility/src/main/java/org/apache/gobblin/util/measurement/GrowthMilestoneTracker.java
new file mode 100644
index 000000000..99dbd82fb
--- /dev/null
+++ b/gobblin-utility/src/main/java/org/apache/gobblin/util/measurement/GrowthMilestoneTracker.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.util.measurement;
+
+import java.util.Iterator;
+import java.util.Optional;
+import java.util.stream.LongStream;
+
+
+/** Stateful class to track growth/accumulation/"high watermark" against milestones */
+public class GrowthMilestoneTracker {
+  private final Iterator<Long> milestoneSequence = createMilestoneSequence();
+  private Long nextMilestone = milestoneSequence.next();
+
+  /** @return whether `n >=` the next monotonically increasing milestone (with no effort to handle wrap-around) */
+  public final boolean isAnotherMilestone(long n) {
+    return this.calcLargestNewMilestone(n).isPresent();
+  }
+
+  /** @return largest monotonically increasing milestone iff `n >=` some new one (no effort to handle wrap-around) */
+  public final Optional<Long> calcLargestNewMilestone(long n) {
+    if (n < this.nextMilestone) {
+      return Optional.empty();
+    }
+    Long largestMilestoneAchieved;
+    do {
+      largestMilestoneAchieved = this.nextMilestone;
+      this.nextMilestone = this.milestoneSequence.hasNext() ? this.milestoneSequence.next() : Long.MAX_VALUE;
+    } while (n >= this.nextMilestone);
+    return Optional.of(largestMilestoneAchieved);
+  }
+
+  /**
+   * @return positive monotonically increasing milestones, for {@link GrowthMilestoneTracker#isAnotherMilestone(long)}
+   * to track against; if/whenever exhausted, {@link Long#MAX_VALUE} becomes stand-in thereafter
+   * DEFAULT SEQ: [1, 10, 100, 1000, 10k, 15k, 20k, 25k, 30k, ..., 50k, 75k, 100k, 125k, ..., 250k, 300k, 350k, ... )
+   */
+  protected Iterator<Long> createMilestoneSequence() {
+    LongStream initially = LongStream.iterate(1L, i -> i * 10).limit(4); // 1, 10, 100, 1000
+    LongStream next = LongStream.rangeClosed(2L, 9L).map(i -> i * 5000); // 10k - 45k
+    LongStream then = LongStream.rangeClosed(2L, 9L).map(i -> i * 25000); // 50k - 225k
+    LongStream thereafter = LongStream.iterate(250000L, i -> i + 50000);
+    return
+        LongStream.concat(initially,
+            LongStream.concat(next,
+                LongStream.concat(then, thereafter))
+        ).iterator();
+  }
+}