Posted to dev@gobblin.apache.org by GitBox <gi...@apache.org> on 2022/10/01 00:46:59 UTC

[GitHub] [gobblin] phet opened a new pull request, #3575: Enhance `IcebergDataset` to detect when files already at dest then proceed with only delta

phet opened a new pull request, #3575:
URL: https://github.com/apache/gobblin/pull/3575

   Dear Gobblin maintainers,
   
   Please accept this PR. I understand that it will not be reviewed until I have checked off all the steps below!
   
   
   ### JIRA
   - [ ] My PR addresses the following [Gobblin JIRA](https://issues.apache.org/jira/browse/GOBBLIN/) issues and references them in the PR title. For example, "[GOBBLIN-XXX] My Gobblin PR"
       - https://issues.apache.org/jira/browse/GOBBLIN-1707
   
   ### Description
   - [ ] Here are some details about my PR, including screenshots (if applicable):
   
   Enhance `IcebergDataset` to detect when files already at dest then proceed with only delta
   
   ### Tests
   - [ ] My PR adds the following unit tests __OR__ does not need testing for this extremely good reason:
   
   unit tests included
   
   ### Commits
   - [ ] My commits all reference JIRA issues in their subject lines, and I have squashed multiple commits if they address the same issue. In addition, my commits follow the guidelines from "[How to write a good git commit message](http://chris.beams.io/posts/git-commit/)":
       1. Subject is separated from body by a blank line
       2. Subject is limited to 50 characters
       3. Subject does not end with a period
       4. Subject uses the imperative mood ("add", not "adding")
       5. Body wraps at 72 characters
       6. Body explains "what" and "why", not "how"
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@gobblin.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [gobblin] ZihanLi58 commented on a diff in pull request #3575: [GOBBLIN-1707] Enhance `IcebergDataset` to detect when files already at dest then proceed with only delta

Posted by GitBox <gi...@apache.org>.
ZihanLi58 commented on code in PR #3575:
URL: https://github.com/apache/gobblin/pull/3575#discussion_r990545035


##########
gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDataset.java:
##########
@@ -153,30 +156,105 @@ Collection<CopyEntity> generateCopyEntities(FileSystem targetFs, CopyConfigurati
     return copyEntities;
   }
 
+  /** Not intended to escape this class... yet `public` visibility in case it somehow does */
+  @RequiredArgsConstructor
+  public static class WrappedIOException extends RuntimeException {
+    @Getter
+    private final IOException wrappedException;
+
+    public void rethrowWrapped() throws IOException {
+      throw wrappedException;
+    }
+  }
+
   /**
    * Finds all files of the Iceberg's current snapshot
    * Returns a map of path, file status for each file that needs to be copied
    */
-  protected Map<Path, FileStatus> getFilePathsToFileStatus() throws IOException {
-    Map<Path, FileStatus> result = Maps.newHashMap();
+  protected Map<Path, FileStatus> getFilePathsToFileStatus(FileSystem targetFs, CopyConfiguration copyConfig) throws IOException {
+    Map<Path, FileStatus> results = Maps.newHashMap();
     IcebergTable icebergTable = this.getIcebergTable();
+    // check first for case of nothing to replicate, to avoid needless scanning of a potentially massive iceberg
+    IcebergSnapshotInfo currentSnapshotOverview = icebergTable.getCurrentSnapshotInfoOverviewOnly();
+    if (currentSnapshotOverview.getMetadataPath().map(p -> isPathPresentOnTarget(new Path(p), targetFs, copyConfig)).orElse(false) &&
+        isPathPresentOnTarget(new Path(currentSnapshotOverview.getManifestListPath()), targetFs, copyConfig)) {
+      log.info("{}.{} - skipping entire iceberg, since snapshot '{}' at '{}' and metadata '{}' both present on target",
+          dbName, inputTableName, currentSnapshotOverview.getSnapshotId(),
+          currentSnapshotOverview.getManifestListPath(),
+          currentSnapshotOverview.getMetadataPath().orElse("<<ERROR: MISSING!>>"));
+      return results;
+    }
     Iterator<IcebergSnapshotInfo> icebergIncrementalSnapshotInfos = icebergTable.getIncrementalSnapshotInfosIterator();
     Iterator<String> filePathsIterator = Iterators.concat(
         Iterators.transform(icebergIncrementalSnapshotInfos, snapshotInfo -> {
-          // TODO: decide: is it too much to print for every snapshot--instead use `.debug`?
-          log.info("{}.{} - loaded snapshot '{}' from metadata path: '{}'", dbName, inputTableName,
-              snapshotInfo.getSnapshotId(), snapshotInfo.getMetadataPath().orElse("<<inherited>>"));
-          return snapshotInfo.getAllPaths().iterator();
+          // log each snapshot, for context, in case of `FileNotFoundException` during `FileSystem.getFileStatus()`
+          String manListPath = snapshotInfo.getManifestListPath();
+          log.info("{}.{} - loaded snapshot '{}' at '{}' from metadata path: '{}'", dbName, inputTableName,
+              snapshotInfo.getSnapshotId(), manListPath, snapshotInfo.getMetadataPath().orElse("<<inherited>>"));
+          // ALGO: an iceberg's files form a tree of four levels: metadata.json -> manifest-list -> manifest -> data;
+          // most critically, all are presumed immutable and uniquely named, although any may be replaced.  we depend
+          // also on incremental copy being run always atomically: to commit each iceberg only upon its full success.
+          // thus established, the presence of a file at dest (identified by path/name) guarantees its entire subtree is
+          // already copied--and, given immutability, completion of a prior copy naturally renders that file up-to-date.
+          // hence, its entire subtree may be short-circuited.  nevertheless, absence of a file at dest cannot imply
+          // its entire subtree necessarily requires copying, because it is possible, even likely in practice, that some
+          // metadata files would have been replaced (e.g. during snapshot compaction).  in such instances, at least
+          // some of the children pointed to within could have been copied prior, when they previously appeared as a
+          // child of the current file's predecessor (which this new meta file now replaces).
+          if (!isPathPresentOnTarget(new Path(manListPath), targetFs, copyConfig)) {
+            List<String> missingPaths = snapshotInfo.getSnapshotApexPaths();
+            for (IcebergSnapshotInfo.ManifestFileInfo mfi : snapshotInfo.getManifestFiles()) {

Review Comment:
   Or we could add more clarity to IcebergTable.getIncrementalSnapshotInfosIterator(). At least, I was missing the assumption that it returns snapshots from oldest to latest, and that each file appears only once, in the earliest snapshot that contains it. Might be just me though...





[GitHub] [gobblin] phet commented on a diff in pull request #3575: [GOBBLIN-1707] Enhance `IcebergDataset` to detect when files already at dest then proceed with only delta

Posted by GitBox <gi...@apache.org>.
phet commented on code in PR #3575:
URL: https://github.com/apache/gobblin/pull/3575#discussion_r990566896


##########
gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDataset.java:
##########
@@ -153,30 +156,105 @@ Collection<CopyEntity> generateCopyEntities(FileSystem targetFs, CopyConfigurati
     return copyEntities;
   }
 
+  /** Not intended to escape this class... yet `public` visibility in case it somehow does */
+  @RequiredArgsConstructor
+  public static class WrappedIOException extends RuntimeException {
+    @Getter
+    private final IOException wrappedException;
+
+    public void rethrowWrapped() throws IOException {
+      throw wrappedException;
+    }
+  }
+
   /**
    * Finds all files of the Iceberg's current snapshot
    * Returns a map of path, file status for each file that needs to be copied
    */
-  protected Map<Path, FileStatus> getFilePathsToFileStatus() throws IOException {
-    Map<Path, FileStatus> result = Maps.newHashMap();
+  protected Map<Path, FileStatus> getFilePathsToFileStatus(FileSystem targetFs, CopyConfiguration copyConfig) throws IOException {
+    Map<Path, FileStatus> results = Maps.newHashMap();
     IcebergTable icebergTable = this.getIcebergTable();
+    // check first for case of nothing to replicate, to avoid needless scanning of a potentially massive iceberg
+    IcebergSnapshotInfo currentSnapshotOverview = icebergTable.getCurrentSnapshotInfoOverviewOnly();
+    if (currentSnapshotOverview.getMetadataPath().map(p -> isPathPresentOnTarget(new Path(p), targetFs, copyConfig)).orElse(false) &&
+        isPathPresentOnTarget(new Path(currentSnapshotOverview.getManifestListPath()), targetFs, copyConfig)) {
+      log.info("{}.{} - skipping entire iceberg, since snapshot '{}' at '{}' and metadata '{}' both present on target",
+          dbName, inputTableName, currentSnapshotOverview.getSnapshotId(),
+          currentSnapshotOverview.getManifestListPath(),
+          currentSnapshotOverview.getMetadataPath().orElse("<<ERROR: MISSING!>>"));
+      return results;
+    }
     Iterator<IcebergSnapshotInfo> icebergIncrementalSnapshotInfos = icebergTable.getIncrementalSnapshotInfosIterator();
     Iterator<String> filePathsIterator = Iterators.concat(
         Iterators.transform(icebergIncrementalSnapshotInfos, snapshotInfo -> {
-          // TODO: decide: is it too much to print for every snapshot--instead use `.debug`?
-          log.info("{}.{} - loaded snapshot '{}' from metadata path: '{}'", dbName, inputTableName,
-              snapshotInfo.getSnapshotId(), snapshotInfo.getMetadataPath().orElse("<<inherited>>"));
-          return snapshotInfo.getAllPaths().iterator();
+          // log each snapshot, for context, in case of `FileNotFoundException` during `FileSystem.getFileStatus()`
+          String manListPath = snapshotInfo.getManifestListPath();
+          log.info("{}.{} - loaded snapshot '{}' at '{}' from metadata path: '{}'", dbName, inputTableName,
+              snapshotInfo.getSnapshotId(), manListPath, snapshotInfo.getMetadataPath().orElse("<<inherited>>"));
+          // ALGO: an iceberg's files form a tree of four levels: metadata.json -> manifest-list -> manifest -> data;
+          // most critically, all are presumed immutable and uniquely named, although any may be replaced.  we depend
+          // also on incremental copy being run always atomically: to commit each iceberg only upon its full success.
+          // thus established, the presence of a file at dest (identified by path/name) guarantees its entire subtree is
+          // already copied--and, given immutability, completion of a prior copy naturally renders that file up-to-date.
+          // hence, its entire subtree may be short-circuited.  nevertheless, absence of a file at dest cannot imply
+          // its entire subtree necessarily requires copying, because it is possible, even likely in practice, that some
+          // metadata files would have been replaced (e.g. during snapshot compaction).  in such instances, at least
+          // some of the children pointed to within could have been copied prior, when they previously appeared as a
+          // child of the current file's predecessor (which this new meta file now replaces).
+          if (!isPathPresentOnTarget(new Path(manListPath), targetFs, copyConfig)) {
+            List<String> missingPaths = snapshotInfo.getSnapshotApexPaths();
+            for (IcebergSnapshotInfo.ManifestFileInfo mfi : snapshotInfo.getManifestFiles()) {

Review Comment:
   sure thing.  I added this:
   >  This means the {@link IcebergSnapshotInfo#getManifestFiles()} for the (n+1)-th element of the iterator will omit
   >  all manifest files and listed data files, already reflected in a {@link IcebergSnapshotInfo#getManifestFiles()}
   >  from the n-th or prior elements.  Given the order of the {@link Iterator<IcebergSnapshotInfo>} returned, this
   >  mirrors the snapshot-to-file dependencies: each file is returned exactly once with the (oldest) snapshot from
   >  which it first becomes reachable.
   >  
   >  Only the final {@link IcebergSnapshotInfo#getMetadataPath()} is present (for the snapshot it itself deems current).
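
The ordering contract described in that Javadoc can be illustrated with a minimal sketch. It assumes snapshots are supplied oldest-first as a map from snapshot id to every path reachable from that snapshot; `IncrementalSnapshotSketch` and `incrementalPaths` are hypothetical names for illustration only, not Gobblin's `IcebergTable` or `IcebergSnapshotInfo` API.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Hypothetical stand-in for illustration; not part of Gobblin. */
class IncrementalSnapshotSketch {

  /**
   * @param snapshotsOldestFirst snapshot id -> all paths reachable from that snapshot, iterated oldest first
   * @return snapshot id -> only the paths not already reported with an older snapshot
   */
  static Map<String, List<String>> incrementalPaths(Map<String, List<String>> snapshotsOldestFirst) {
    Set<String> alreadyReported = new HashSet<>();
    Map<String, List<String>> result = new LinkedHashMap<>();
    for (Map.Entry<String, List<String>> snapshot : snapshotsOldestFirst.entrySet()) {
      List<String> newlyReachable = new ArrayList<>();
      for (String path : snapshot.getValue()) {
        // `add` returns false when an older snapshot already reported this path
        if (alreadyReported.add(path)) {
          newlyReachable.add(path);
        }
      }
      result.put(snapshot.getKey(), newlyReachable);
    }
    return result;
  }

  public static void main(String[] args) {
    Map<String, List<String>> snapshots = new LinkedHashMap<>();
    snapshots.put("s1", Arrays.asList("manifest-a", "data-1", "data-2"));
    snapshots.put("s2", Arrays.asList("manifest-a", "data-1", "data-2", "manifest-b", "data-3"));
    // prints {s1=[manifest-a, data-1, data-2], s2=[manifest-b, data-3]}
    System.out.println(incrementalPaths(snapshots));
  }
}
```

Under these assumptions, each path is attributed exactly once, to the oldest snapshot from which it becomes reachable, so a consumer that has already copied an older snapshot only sees the additions made by newer ones.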
   





[GitHub] [gobblin] ZihanLi58 commented on a diff in pull request #3575: [GOBBLIN-1707] Enhance `IcebergDataset` to detect when files already at dest then proceed with only delta

Posted by GitBox <gi...@apache.org>.
ZihanLi58 commented on code in PR #3575:
URL: https://github.com/apache/gobblin/pull/3575#discussion_r990330125


##########
gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDataset.java:
##########
@@ -153,30 +156,105 @@ Collection<CopyEntity> generateCopyEntities(FileSystem targetFs, CopyConfigurati
     return copyEntities;
   }
 
+  /** Not intended to escape this class... yet `public` visibility in case it somehow does */
+  @RequiredArgsConstructor
+  public static class WrappedIOException extends RuntimeException {
+    @Getter
+    private final IOException wrappedException;
+
+    public void rethrowWrapped() throws IOException {
+      throw wrappedException;
+    }
+  }
+
   /**
    * Finds all files of the Iceberg's current snapshot
    * Returns a map of path, file status for each file that needs to be copied
    */
-  protected Map<Path, FileStatus> getFilePathsToFileStatus() throws IOException {
-    Map<Path, FileStatus> result = Maps.newHashMap();
+  protected Map<Path, FileStatus> getFilePathsToFileStatus(FileSystem targetFs, CopyConfiguration copyConfig) throws IOException {
+    Map<Path, FileStatus> results = Maps.newHashMap();
     IcebergTable icebergTable = this.getIcebergTable();
+    // check first for case of nothing to replicate, to avoid needless scanning of a potentially massive iceberg
+    IcebergSnapshotInfo currentSnapshotOverview = icebergTable.getCurrentSnapshotInfoOverviewOnly();
+    if (currentSnapshotOverview.getMetadataPath().map(p -> isPathPresentOnTarget(new Path(p), targetFs, copyConfig)).orElse(false) &&
+        isPathPresentOnTarget(new Path(currentSnapshotOverview.getManifestListPath()), targetFs, copyConfig)) {
+      log.info("{}.{} - skipping entire iceberg, since snapshot '{}' at '{}' and metadata '{}' both present on target",
+          dbName, inputTableName, currentSnapshotOverview.getSnapshotId(),
+          currentSnapshotOverview.getManifestListPath(),
+          currentSnapshotOverview.getMetadataPath().orElse("<<ERROR: MISSING!>>"));
+      return results;
+    }
     Iterator<IcebergSnapshotInfo> icebergIncrementalSnapshotInfos = icebergTable.getIncrementalSnapshotInfosIterator();
     Iterator<String> filePathsIterator = Iterators.concat(
         Iterators.transform(icebergIncrementalSnapshotInfos, snapshotInfo -> {
-          // TODO: decide: is it too much to print for every snapshot--instead use `.debug`?
-          log.info("{}.{} - loaded snapshot '{}' from metadata path: '{}'", dbName, inputTableName,
-              snapshotInfo.getSnapshotId(), snapshotInfo.getMetadataPath().orElse("<<inherited>>"));
-          return snapshotInfo.getAllPaths().iterator();
+          // log each snapshot, for context, in case of `FileNotFoundException` during `FileSystem.getFileStatus()`
+          String manListPath = snapshotInfo.getManifestListPath();
+          log.info("{}.{} - loaded snapshot '{}' at '{}' from metadata path: '{}'", dbName, inputTableName,
+              snapshotInfo.getSnapshotId(), manListPath, snapshotInfo.getMetadataPath().orElse("<<inherited>>"));
+          // ALGO: an iceberg's files form a tree of four levels: metadata.json -> manifest-list -> manifest -> data;
+          // most critically, all are presumed immutable and uniquely named, although any may be replaced.  we depend
+          // also on incremental copy being run always atomically: to commit each iceberg only upon its full success.
+          // thus established, the presence of a file at dest (identified by path/name) guarantees its entire subtree is
+          // already copied--and, given immutability, completion of a prior copy naturally renders that file up-to-date.
+          // hence, its entire subtree may be short-circuited.  nevertheless, absence of a file at dest cannot imply
+          // its entire subtree necessarily requires copying, because it is possible, even likely in practice, that some
+          // metadata files would have been replaced (e.g. during snapshot compaction).  in such instances, at least
+          // some of the children pointed to within could have been copied prior, when they previously appeared as a
+          // child of the current file's predecessor (which this new meta file now replaces).
+          if (!isPathPresentOnTarget(new Path(manListPath), targetFs, copyConfig)) {
+            List<String> missingPaths = snapshotInfo.getSnapshotApexPaths();
+            for (IcebergSnapshotInfo.ManifestFileInfo mfi : snapshotInfo.getManifestFiles()) {
+              if (!isPathPresentOnTarget(new Path(mfi.getManifestFilePath()), targetFs, copyConfig)) {
+                missingPaths.add(mfi.getManifestFilePath());
+                mfi.getListedFilePaths().stream().filter(p ->
+                    !isPathPresentOnTarget(new Path(p), targetFs, copyConfig)
+                ).forEach(missingPaths::add);
+              }
+            }
+            return missingPaths.iterator();
+          } else {
+            log.info("{}.{} - snapshot '{}' already present on target... skipping (with contents)",
+                dbName, inputTableName, snapshotInfo.getSnapshotId());
+            // IMPORTANT: separately consider metadata path, to handle case of 'metadata-only' snapshot reusing mf-list
+            Optional<String> nonReplicatedMetadataPath = snapshotInfo.getMetadataPath().filter(p ->
+                !isPathPresentOnTarget(new Path(p), targetFs, copyConfig));
+            log.info("{}.{} - metadata is {}already present on target", dbName, inputTableName, nonReplicatedMetadataPath.isPresent() ? "NOT " : "");
+            return nonReplicatedMetadataPath.map(p -> Lists.newArrayList(p).iterator()).orElse(Collections.emptyIterator());
+          }
         })
     );
     Iterable<String> filePathsIterable = () -> filePathsIterator;
-    // TODO: investigate whether streaming initialization of `Map` preferable--`getFileStatus` network calls would
-    // likely benefit from parallelism
-    for (String pathString : filePathsIterable) {
-      Path path = new Path(pathString);
-      result.put(path, this.sourceFs.getFileStatus(path));
+    try {
+      // TODO: investigate whether streaming initialization of `Map` preferable--`getFileStatus()` network calls likely
+      // to benefit from parallelism
+      for (String pathString : filePathsIterable) {
+        try {
+          Path path = new Path(pathString);
+          results.put(path, this.sourceFs.getFileStatus(path));
+        } catch (FileNotFoundException fnfe) {
+          if (!shouldTolerateMissingSourceFiles) {
+            throw fnfe;
+          } else {
+            // log, but otherwise swallow... to continue on
+            log.warn("MIA source file... did premature deletion subvert time-travel or maybe metadata read interleaved with delete?", fnfe);
+          }
+        }
+      }
+    } catch (WrappedIOException wrapper) {
+      wrapper.rethrowWrapped();
+    }
+    return results;
+  }
+
+  /** @return whether `path` is present on `targetFs`, tunneling checked exceptions and caching results throughout */
+  protected static boolean isPathPresentOnTarget(Path path, FileSystem targetFs, CopyConfiguration copyConfig) {
+    try {
+      // omit considering timestamp (or other markers of freshness), as files should be immutable

Review Comment:
   Not sure whether this is true for all files: metadata files are immutable, but might data files be updated?



##########
gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDataset.java:
##########
@@ -153,30 +156,105 @@ Collection<CopyEntity> generateCopyEntities(FileSystem targetFs, CopyConfigurati
     return copyEntities;
   }
 
+  /** Not intended to escape this class... yet `public` visibility in case it somehow does */
+  @RequiredArgsConstructor
+  public static class WrappedIOException extends RuntimeException {
+    @Getter
+    private final IOException wrappedException;
+
+    public void rethrowWrapped() throws IOException {
+      throw wrappedException;
+    }
+  }
+
   /**
    * Finds all files of the Iceberg's current snapshot
    * Returns a map of path, file status for each file that needs to be copied
    */
-  protected Map<Path, FileStatus> getFilePathsToFileStatus() throws IOException {
-    Map<Path, FileStatus> result = Maps.newHashMap();
+  protected Map<Path, FileStatus> getFilePathsToFileStatus(FileSystem targetFs, CopyConfiguration copyConfig) throws IOException {
+    Map<Path, FileStatus> results = Maps.newHashMap();
     IcebergTable icebergTable = this.getIcebergTable();
+    // check first for case of nothing to replicate, to avoid needless scanning of a potentially massive iceberg
+    IcebergSnapshotInfo currentSnapshotOverview = icebergTable.getCurrentSnapshotInfoOverviewOnly();
+    if (currentSnapshotOverview.getMetadataPath().map(p -> isPathPresentOnTarget(new Path(p), targetFs, copyConfig)).orElse(false) &&
+        isPathPresentOnTarget(new Path(currentSnapshotOverview.getManifestListPath()), targetFs, copyConfig)) {
+      log.info("{}.{} - skipping entire iceberg, since snapshot '{}' at '{}' and metadata '{}' both present on target",
+          dbName, inputTableName, currentSnapshotOverview.getSnapshotId(),
+          currentSnapshotOverview.getManifestListPath(),
+          currentSnapshotOverview.getMetadataPath().orElse("<<ERROR: MISSING!>>"));
+      return results;
+    }
     Iterator<IcebergSnapshotInfo> icebergIncrementalSnapshotInfos = icebergTable.getIncrementalSnapshotInfosIterator();
     Iterator<String> filePathsIterator = Iterators.concat(
         Iterators.transform(icebergIncrementalSnapshotInfos, snapshotInfo -> {
-          // TODO: decide: is it too much to print for every snapshot--instead use `.debug`?
-          log.info("{}.{} - loaded snapshot '{}' from metadata path: '{}'", dbName, inputTableName,
-              snapshotInfo.getSnapshotId(), snapshotInfo.getMetadataPath().orElse("<<inherited>>"));
-          return snapshotInfo.getAllPaths().iterator();
+          // log each snapshot, for context, in case of `FileNotFoundException` during `FileSystem.getFileStatus()`
+          String manListPath = snapshotInfo.getManifestListPath();
+          log.info("{}.{} - loaded snapshot '{}' at '{}' from metadata path: '{}'", dbName, inputTableName,
+              snapshotInfo.getSnapshotId(), manListPath, snapshotInfo.getMetadataPath().orElse("<<inherited>>"));
+          // ALGO: an iceberg's files form a tree of four levels: metadata.json -> manifest-list -> manifest -> data;
+          // most critically, all are presumed immutable and uniquely named, although any may be replaced.  we depend
+          // also on incremental copy being run always atomically: to commit each iceberg only upon its full success.
+          // thus established, the presence of a file at dest (identified by path/name) guarantees its entire subtree is
+          // already copied--and, given immutability, completion of a prior copy naturally renders that file up-to-date.
+          // hence, its entire subtree may be short-circuited.  nevertheless, absence of a file at dest cannot imply
+          // its entire subtree necessarily requires copying, because it is possible, even likely in practice, that some
+          // metadata files would have been replaced (e.g. during snapshot compaction).  in such instances, at least
+          // some of the children pointed to within could have been copied prior, when they previously appeared as a
+          // child of the current file's predecessor (which this new meta file now replaces).
+          if (!isPathPresentOnTarget(new Path(manListPath), targetFs, copyConfig)) {
+            List<String> missingPaths = snapshotInfo.getSnapshotApexPaths();
+            for (IcebergSnapshotInfo.ManifestFileInfo mfi : snapshotInfo.getManifestFiles()) {

Review Comment:
   Can we add a comment here mentioning that only the oldest snapshot (which most likely has already been copied) contains all the old files, while the mfi from a newer snapshot contains only the files newly added in those manifests?





[GitHub] [gobblin] phet commented on pull request #3575: Enhance `IcebergDataset` to detect when files already at dest then proceed with only delta

Posted by GitBox <gi...@apache.org>.
phet commented on PR #3575:
URL: https://github.com/apache/gobblin/pull/3575#issuecomment-1271153373

   > it seems that as long as one new snapshot is generated on source, we will go through all the data files available on source to do the copy, even if only one new file was added.
   
   actually we'll always go through the complete metadata on source to list every file reachable from at least one snapshot.  it's true we do that even when there's not any 'new', unreplicated snapshot.  actual copy however only happens for files that are not present on the destination.  further, we need not examine every file, to determine whether it exists on dest.  rather, thanks to the immutability of iceberg files, we may short-circuit evaluation of an entire subtree of the iceberg metadata, when the root (e.g. manifest-list or manifest) is found already to exist at dest.  for details, I've added the comment `// ALGO:` in `IcebergDataset.getFilePathsToFileStatus()`
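
The short-circuit evaluation described above can be sketched roughly as follows. This is a simplified illustration, assuming a single snapshot whose manifests map to the data files they list, and an existence check supplied as a `Predicate`; `ShortCircuitSketch`, `findMissingPaths`, and `presentOnTarget` are hypothetical names, not the PR's actual signatures.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Predicate;

/** Hypothetical stand-in for illustration; not part of Gobblin. */
class ShortCircuitSketch {

  /**
   * @param manifestListPath root of one snapshot's subtree
   * @param manifestToDataFiles manifest path -> data file paths listed within that manifest
   * @param presentOnTarget assumed existence check against the destination (by path only)
   * @return the paths that still need copying
   */
  static List<String> findMissingPaths(String manifestListPath, Map<String, List<String>> manifestToDataFiles,
      Predicate<String> presentOnTarget) {
    List<String> missing = new ArrayList<>();
    if (presentOnTarget.test(manifestListPath)) {
      return missing; // root present: the entire subtree is presumed already copied
    }
    missing.add(manifestListPath);
    for (Map.Entry<String, List<String>> manifest : manifestToDataFiles.entrySet()) {
      if (presentOnTarget.test(manifest.getKey())) {
        continue; // manifest present: skip its data files without checking any of them
      }
      missing.add(manifest.getKey());
      for (String dataFile : manifest.getValue()) {
        // an absent manifest does not imply absent children, so each one is checked individually
        if (!presentOnTarget.test(dataFile)) {
          missing.add(dataFile);
        }
      }
    }
    return missing;
  }

  public static void main(String[] args) {
    Set<String> target = new HashSet<>(Arrays.asList("manifest-a", "data-1"));
    Map<String, List<String>> manifests = new LinkedHashMap<>();
    manifests.put("manifest-a", Arrays.asList("data-1", "data-2"));
    manifests.put("manifest-b", Arrays.asList("data-1", "data-3"));
    // prints [snap-2.avro, manifest-b, data-3]
    System.out.println(findMissingPaths("snap-2.avro", manifests, target::contains));
  }
}
```

In the example, `data-2` is never checked because its manifest is already present, whereas `data-1` under the absent `manifest-b` is checked and found to have been copied earlier.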
   
   > How do we plan to handle the file deletion on source? i.e. expire snapshot operation?
   
   good question!  the answer is: distcp is not responsible.  instead we will expect reachability analysis and orphan file deletion to happen elsewhere.  a good candidate would be the destination catalog with which we'll eventually register the copied 'metadata.json' file.  e.g. that catalog would hold the metadata version prior to the registration and could easily determine which snapshots 'expire' from the act of replacing the older metadata file with the newer one (which replication has copied from source)
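
The thread leaves orphan cleanup to some other component, so the following is only one possible shape for that reachability analysis, assuming the set of paths reachable from each metadata version has already been computed. `OrphanFileSketch` and `findOrphans` are hypothetical names; nothing here reflects an existing Gobblin or Iceberg catalog API.

```java
import java.util.Arrays;
import java.util.Set;
import java.util.TreeSet;

/** Hypothetical stand-in for illustration; not part of Gobblin. */
class OrphanFileSketch {

  /** @return paths reachable from the previous metadata version but no longer reachable from the new one */
  static Set<String> findOrphans(Set<String> reachableFromOldMetadata, Set<String> reachableFromNewMetadata) {
    Set<String> orphans = new TreeSet<>(reachableFromOldMetadata); // copy, so the inputs stay untouched
    orphans.removeAll(reachableFromNewMetadata);
    return orphans;
  }

  public static void main(String[] args) {
    Set<String> oldReachable = new TreeSet<>(Arrays.asList("snap-1", "manifest-a", "data-1", "data-2"));
    Set<String> newReachable = new TreeSet<>(Arrays.asList("snap-2", "manifest-b", "data-2", "data-3"));
    // prints [data-1, manifest-a, snap-1]
    System.out.println(findOrphans(oldReachable, newReachable));
  }
}
```

A set difference like this would only be safe to act on once the new metadata file has been registered, since until then the older version is still the one readers resolve.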




[GitHub] [gobblin] phet commented on a diff in pull request #3575: [GOBBLIN-1707] Enhance `IcebergDataset` to detect when files already at dest then proceed with only delta

Posted by GitBox <gi...@apache.org>.
phet commented on code in PR #3575:
URL: https://github.com/apache/gobblin/pull/3575#discussion_r990566896


##########
gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDataset.java:
##########
@@ -153,30 +156,105 @@ Collection<CopyEntity> generateCopyEntities(FileSystem targetFs, CopyConfigurati
     return copyEntities;
   }
 
+  /** Not intended to escape this class... yet `public` visibility in case it somehow does */
+  @RequiredArgsConstructor
+  public static class WrappedIOException extends RuntimeException {
+    @Getter
+    private final IOException wrappedException;
+
+    public void rethrowWrapped() throws IOException {
+      throw wrappedException;
+    }
+  }
+
   /**
    * Finds all files of the Iceberg's current snapshot
    * Returns a map of path, file status for each file that needs to be copied
    */
-  protected Map<Path, FileStatus> getFilePathsToFileStatus() throws IOException {
-    Map<Path, FileStatus> result = Maps.newHashMap();
+  protected Map<Path, FileStatus> getFilePathsToFileStatus(FileSystem targetFs, CopyConfiguration copyConfig) throws IOException {
+    Map<Path, FileStatus> results = Maps.newHashMap();
     IcebergTable icebergTable = this.getIcebergTable();
+    // check first for case of nothing to replicate, to avoid needless scanning of a potentially massive iceberg
+    IcebergSnapshotInfo currentSnapshotOverview = icebergTable.getCurrentSnapshotInfoOverviewOnly();
+    if (currentSnapshotOverview.getMetadataPath().map(p -> isPathPresentOnTarget(new Path(p), targetFs, copyConfig)).orElse(false) &&
+        isPathPresentOnTarget(new Path(currentSnapshotOverview.getManifestListPath()), targetFs, copyConfig)) {
+      log.info("{}.{} - skipping entire iceberg, since snapshot '{}' at '{}' and metadata '{}' both present on target",
+          dbName, inputTableName, currentSnapshotOverview.getSnapshotId(),
+          currentSnapshotOverview.getManifestListPath(),
+          currentSnapshotOverview.getMetadataPath().orElse("<<ERROR: MISSING!>>"));
+      return results;
+    }
     Iterator<IcebergSnapshotInfo> icebergIncrementalSnapshotInfos = icebergTable.getIncrementalSnapshotInfosIterator();
     Iterator<String> filePathsIterator = Iterators.concat(
         Iterators.transform(icebergIncrementalSnapshotInfos, snapshotInfo -> {
-          // TODO: decide: is it too much to print for every snapshot--instead use `.debug`?
-          log.info("{}.{} - loaded snapshot '{}' from metadata path: '{}'", dbName, inputTableName,
-              snapshotInfo.getSnapshotId(), snapshotInfo.getMetadataPath().orElse("<<inherited>>"));
-          return snapshotInfo.getAllPaths().iterator();
+          // log each snapshot, for context, in case of `FileNotFoundException` during `FileSystem.getFileStatus()`
+          String manListPath = snapshotInfo.getManifestListPath();
+          log.info("{}.{} - loaded snapshot '{}' at '{}' from metadata path: '{}'", dbName, inputTableName,
+              snapshotInfo.getSnapshotId(), manListPath, snapshotInfo.getMetadataPath().orElse("<<inherited>>"));
+          // ALGO: an iceberg's files form a tree of four levels: metadata.json -> manifest-list -> manifest -> data;
+          // most critically, all are presumed immutable and uniquely named, although any may be replaced.  we depend
+          // also on incremental copy being run always atomically: to commit each iceberg only upon its full success.
+          // thus established, the presence of a file at dest (identified by path/name) guarantees its entire subtree is
+          // already copied--and, given immutability, completion of a prior copy naturally renders that file up-to-date.
+          // hence, its entire subtree may be short-circuited.  nevertheless, absence of a file at dest cannot imply
+          // its entire subtree necessarily requires copying, because it is possible, even likely in practice, that some
+          // metadata files would have been replaced (e.g. during snapshot compaction).  in such instances, at least
+          // some of the children pointed to within could have been copied prior, when they previously appeared as a
+          // child of the current file's predecessor (which this new meta file now replaces).
+          if (!isPathPresentOnTarget(new Path(manListPath), targetFs, copyConfig)) {
+            List<String> missingPaths = snapshotInfo.getSnapshotApexPaths();
+            for (IcebergSnapshotInfo.ManifestFileInfo mfi : snapshotInfo.getManifestFiles()) {

Review Comment:
   sure thing.  I added this:
   >  This means the {@link IcebergSnapshotInfo#getManifestFiles()} for the (n+1)-th element of the iterator will omit all manifest files and listed data files, already reflected in a {@link IcebergSnapshotInfo#getManifestFiles()} from the n-th or prior elements.  Given the order of the {@link Iterator<IcebergSnapshotInfo>} returned, this mirrors the snapshot-to-file dependencies: each file is returned exactly once with the (oldest) snapshot from which it first becomes reachable.
   > 
   >  Only the final {@link IcebergSnapshotInfo#getMetadataPath()} is present (for the snapshot it itself deems current).
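
To make the quoted javadoc concrete, here is a minimal sketch of the "each file is returned exactly once with the (oldest) snapshot from which it first becomes reachable" semantics. It is not Gobblin code: the class `IncrementalSnapshotSketch`, the method `firstReachable`, and the map-based input are all hypothetical stand-ins, and the input is assumed to be ordered oldest snapshot first.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Hypothetical illustration only; not part of Gobblin or Iceberg. */
public class IncrementalSnapshotSketch {

  /**
   * @param reachableBySnapshot snapshot id -> every file reachable from that snapshot;
   *                            iteration order is assumed to be oldest snapshot first
   * @return snapshot id -> only those files that first become reachable at that snapshot
   */
  public static Map<String, List<String>> firstReachable(Map<String, List<String>> reachableBySnapshot) {
    Set<String> seen = new HashSet<>();
    Map<String, List<String>> result = new LinkedHashMap<>();
    for (Map.Entry<String, List<String>> entry : reachableBySnapshot.entrySet()) {
      List<String> fresh = new ArrayList<>();
      for (String file : entry.getValue()) {
        // `add` returns true only the first time a file is encountered,
        // so later snapshots omit files already attributed to an earlier one
        if (seen.add(file)) {
          fresh.add(file);
        }
      }
      result.put(entry.getKey(), fresh);
    }
    return result;
  }
}
```

With two snapshots where the second also reaches everything from the first, the second entry of the result would hold only the files introduced by the second snapshot.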
   






-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@gobblin.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [gobblin] ZihanLi58 commented on a diff in pull request #3575: [GOBBLIN-1707] Enhance `IcebergDataset` to detect when files already at dest then proceed with only delta

Posted by GitBox <gi...@apache.org>.
ZihanLi58 commented on code in PR #3575:
URL: https://github.com/apache/gobblin/pull/3575#discussion_r990545312


##########
gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDataset.java:
##########
@@ -153,30 +156,105 @@ Collection<CopyEntity> generateCopyEntities(FileSystem targetFs, CopyConfigurati
     return copyEntities;
   }
 
+  /** Not intended to escape this class... yet `public` visibility in case it somehow does */
+  @RequiredArgsConstructor
+  public static class WrappedIOException extends RuntimeException {
+    @Getter
+    private final IOException wrappedException;
+
+    public void rethrowWrapped() throws IOException {
+      throw wrappedException;
+    }
+  }
+
   /**
    * Finds all files of the Iceberg's current snapshot
    * Returns a map of path, file status for each file that needs to be copied
    */
-  protected Map<Path, FileStatus> getFilePathsToFileStatus() throws IOException {
-    Map<Path, FileStatus> result = Maps.newHashMap();
+  protected Map<Path, FileStatus> getFilePathsToFileStatus(FileSystem targetFs, CopyConfiguration copyConfig) throws IOException {
+    Map<Path, FileStatus> results = Maps.newHashMap();
     IcebergTable icebergTable = this.getIcebergTable();
+    // check first for case of nothing to replicate, to avoid needless scanning of a potentially massive iceberg
+    IcebergSnapshotInfo currentSnapshotOverview = icebergTable.getCurrentSnapshotInfoOverviewOnly();
+    if (currentSnapshotOverview.getMetadataPath().map(p -> isPathPresentOnTarget(new Path(p), targetFs, copyConfig)).orElse(false) &&
+        isPathPresentOnTarget(new Path(currentSnapshotOverview.getManifestListPath()), targetFs, copyConfig)) {
+      log.info("{}.{} - skipping entire iceberg, since snapshot '{}' at '{}' and metadata '{}' both present on target",
+          dbName, inputTableName, currentSnapshotOverview.getSnapshotId(),
+          currentSnapshotOverview.getManifestListPath(),
+          currentSnapshotOverview.getMetadataPath().orElse("<<ERROR: MISSING!>>"));
+      return results;
+    }
     Iterator<IcebergSnapshotInfo> icebergIncrementalSnapshotInfos = icebergTable.getIncrementalSnapshotInfosIterator();
     Iterator<String> filePathsIterator = Iterators.concat(
         Iterators.transform(icebergIncrementalSnapshotInfos, snapshotInfo -> {
-          // TODO: decide: is it too much to print for every snapshot--instead use `.debug`?
-          log.info("{}.{} - loaded snapshot '{}' from metadata path: '{}'", dbName, inputTableName,
-              snapshotInfo.getSnapshotId(), snapshotInfo.getMetadataPath().orElse("<<inherited>>"));
-          return snapshotInfo.getAllPaths().iterator();
+          // log each snapshot, for context, in case of `FileNotFoundException` during `FileSystem.getFileStatus()`
+          String manListPath = snapshotInfo.getManifestListPath();
+          log.info("{}.{} - loaded snapshot '{}' at '{}' from metadata path: '{}'", dbName, inputTableName,
+              snapshotInfo.getSnapshotId(), manListPath, snapshotInfo.getMetadataPath().orElse("<<inherited>>"));
+          // ALGO: an iceberg's files form a tree of four levels: metadata.json -> manifest-list -> manifest -> data;
+          // most critically, all are presumed immutable and uniquely named, although any may be replaced.  we depend
+          // also on incremental copy being run always atomically: to commit each iceberg only upon its full success.
+          // thus established, the presence of a file at dest (identified by path/name) guarantees its entire subtree is
+          // already copied--and, given immutability, completion of a prior copy naturally renders that file up-to-date.
+          // hence, its entire subtree may be short-circuited.  nevertheless, absence of a file at dest cannot imply
+          // its entire subtree necessarily requires copying, because it is possible, even likely in practice, that some
+          // metadata files would have been replaced (e.g. during snapshot compaction).  in such instances, at least
+          // some of the children pointed to within could have been copied prior, when they previously appeared as a
+          // child of the current file's predecessor (which this new meta file now replaces).
+          if (!isPathPresentOnTarget(new Path(manListPath), targetFs, copyConfig)) {
+            List<String> missingPaths = snapshotInfo.getSnapshotApexPaths();
+            for (IcebergSnapshotInfo.ManifestFileInfo mfi : snapshotInfo.getManifestFiles()) {
+              if (!isPathPresentOnTarget(new Path(mfi.getManifestFilePath()), targetFs, copyConfig)) {
+                missingPaths.add(mfi.getManifestFilePath());
+                mfi.getListedFilePaths().stream().filter(p ->
+                    !isPathPresentOnTarget(new Path(p), targetFs, copyConfig)
+                ).forEach(missingPaths::add);
+              }
+            }
+            return missingPaths.iterator();
+          } else {
+            log.info("{}.{} - snapshot '{}' already present on target... skipping (with contents)",
+                dbName, inputTableName, snapshotInfo.getSnapshotId());
+            // IMPORTANT: separately consider metadata path, to handle case of 'metadata-only' snapshot reusing mf-list
+            Optional<String> nonReplicatedMetadataPath = snapshotInfo.getMetadataPath().filter(p ->
+                !isPathPresentOnTarget(new Path(p), targetFs, copyConfig));
+            log.info("{}.{} - metadata is {}already present on target", dbName, inputTableName, nonReplicatedMetadataPath.isPresent() ? "NOT " : "");
+            return nonReplicatedMetadataPath.map(p -> Lists.newArrayList(p).iterator()).orElse(Collections.emptyIterator());
+          }
         })
     );
     Iterable<String> filePathsIterable = () -> filePathsIterator;
-    // TODO: investigate whether streaming initialization of `Map` preferable--`getFileStatus` network calls would
-    // likely benefit from parallelism
-    for (String pathString : filePathsIterable) {
-      Path path = new Path(pathString);
-      result.put(path, this.sourceFs.getFileStatus(path));
+    try {
+      // TODO: investigate whether streaming initialization of `Map` preferable--`getFileStatus()` network calls likely
+      // to benefit from parallelism
+      for (String pathString : filePathsIterable) {
+        try {
+          Path path = new Path(pathString);
+          results.put(path, this.sourceFs.getFileStatus(path));
+        } catch (FileNotFoundException fnfe) {
+          if (!shouldTolerateMissingSourceFiles) {
+            throw fnfe;
+          } else {
+            // log, but otherwise swallow... to continue on
+            log.warn("MIA source file... did premature deletion subvert time-travel or maybe metadata read interleaved with delete?", fnfe);
+          }
+        }
+      }
+    } catch (WrappedIOException wrapper) {
+      wrapper.rethrowWrapped();
+    }
+    return results;
+  }
+
+  /** @returns whether `path` is present on `targetFs`, tunneling checked exceptions and caching results throughout */
+  protected static boolean isPathPresentOnTarget(Path path, FileSystem targetFs, CopyConfiguration copyConfig) {
+    try {
+      // omit considering timestamp (or other markers of freshness), as files should be immutable

Review Comment:
   Makes sense. Given that the code comment already mentions this behavior, it should be fine.
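
For readers following the thread, the name-only presence check and subtree short-circuit described in the `ALGO` comment of the hunk above could be sketched roughly as follows. This is a simplified illustration under stated assumptions, not the PR's implementation: `DeltaCopySketch`, `missingPaths`, and the in-memory `Set` standing in for the target file system are hypothetical, and the separate handling of `metadata.json` is left out.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Hypothetical illustration only; an in-memory stand-in for the target file system. */
public class DeltaCopySketch {

  /**
   * @param manifestListPath path of the snapshot's manifest-list
   * @param manifests        manifest path -> data file paths listed by that manifest
   * @param targetPaths      paths already present on the destination (presence judged by name alone)
   * @return paths that still need copying
   */
  public static List<String> missingPaths(String manifestListPath, Map<String, List<String>> manifests,
      Set<String> targetPaths) {
    List<String> missing = new ArrayList<>();
    if (targetPaths.contains(manifestListPath)) {
      // files are presumed immutable and prior copies atomic, so the whole subtree is presumed copied
      return missing;
    }
    missing.add(manifestListPath);
    for (Map.Entry<String, List<String>> manifest : manifests.entrySet()) {
      if (targetPaths.contains(manifest.getKey())) {
        continue; // manifest present: skip it together with its listed data files
      }
      missing.add(manifest.getKey());
      // a missing manifest does not imply missing children: some may have been copied earlier
      // under a predecessor manifest, so each data file is checked individually
      for (String dataFile : manifest.getValue()) {
        if (!targetPaths.contains(dataFile)) {
          missing.add(dataFile);
        }
      }
    }
    return missing;
  }
}
```

The asymmetry is the point of the sketch: presence of a parent prunes its subtree, while absence of a parent still requires checking each child.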





[GitHub] [gobblin] phet commented on a diff in pull request #3575: [GOBBLIN-1707] Enhance `IcebergDataset` to detect when files already at dest then proceed with only delta

Posted by GitBox <gi...@apache.org>.
phet commented on code in PR #3575:
URL: https://github.com/apache/gobblin/pull/3575#discussion_r990534636


##########
gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDataset.java:
##########
@@ -153,30 +156,105 @@ Collection<CopyEntity> generateCopyEntities(FileSystem targetFs, CopyConfigurati
     return copyEntities;
   }
 
+  /** Not intended to escape this class... yet `public` visibility in case it somehow does */
+  @RequiredArgsConstructor
+  public static class WrappedIOException extends RuntimeException {
+    @Getter
+    private final IOException wrappedException;
+
+    public void rethrowWrapped() throws IOException {
+      throw wrappedException;
+    }
+  }
+
   /**
    * Finds all files of the Iceberg's current snapshot
    * Returns a map of path, file status for each file that needs to be copied
    */
-  protected Map<Path, FileStatus> getFilePathsToFileStatus() throws IOException {
-    Map<Path, FileStatus> result = Maps.newHashMap();
+  protected Map<Path, FileStatus> getFilePathsToFileStatus(FileSystem targetFs, CopyConfiguration copyConfig) throws IOException {
+    Map<Path, FileStatus> results = Maps.newHashMap();
     IcebergTable icebergTable = this.getIcebergTable();
+    // check first for case of nothing to replicate, to avoid needless scanning of a potentially massive iceberg
+    IcebergSnapshotInfo currentSnapshotOverview = icebergTable.getCurrentSnapshotInfoOverviewOnly();
+    if (currentSnapshotOverview.getMetadataPath().map(p -> isPathPresentOnTarget(new Path(p), targetFs, copyConfig)).orElse(false) &&
+        isPathPresentOnTarget(new Path(currentSnapshotOverview.getManifestListPath()), targetFs, copyConfig)) {
+      log.info("{}.{} - skipping entire iceberg, since snapshot '{}' at '{}' and metadata '{}' both present on target",
+          dbName, inputTableName, currentSnapshotOverview.getSnapshotId(),
+          currentSnapshotOverview.getManifestListPath(),
+          currentSnapshotOverview.getMetadataPath().orElse("<<ERROR: MISSING!>>"));
+      return results;
+    }
     Iterator<IcebergSnapshotInfo> icebergIncrementalSnapshotInfos = icebergTable.getIncrementalSnapshotInfosIterator();
     Iterator<String> filePathsIterator = Iterators.concat(
         Iterators.transform(icebergIncrementalSnapshotInfos, snapshotInfo -> {
-          // TODO: decide: is it too much to print for every snapshot--instead use `.debug`?
-          log.info("{}.{} - loaded snapshot '{}' from metadata path: '{}'", dbName, inputTableName,
-              snapshotInfo.getSnapshotId(), snapshotInfo.getMetadataPath().orElse("<<inherited>>"));
-          return snapshotInfo.getAllPaths().iterator();
+          // log each snapshot, for context, in case of `FileNotFoundException` during `FileSystem.getFileStatus()`
+          String manListPath = snapshotInfo.getManifestListPath();
+          log.info("{}.{} - loaded snapshot '{}' at '{}' from metadata path: '{}'", dbName, inputTableName,
+              snapshotInfo.getSnapshotId(), manListPath, snapshotInfo.getMetadataPath().orElse("<<inherited>>"));
+          // ALGO: an iceberg's files form a tree of four levels: metadata.json -> manifest-list -> manifest -> data;
+          // most critically, all are presumed immutable and uniquely named, although any may be replaced.  we depend
+          // also on incremental copy being run always atomically: to commit each iceberg only upon its full success.
+          // thus established, the presence of a file at dest (identified by path/name) guarantees its entire subtree is
+          // already copied--and, given immutability, completion of a prior copy naturally renders that file up-to-date.
+          // hence, its entire subtree may be short-circuited.  nevertheless, absence of a file at dest cannot imply
+          // its entire subtree necessarily requires copying, because it is possible, even likely in practice, that some
+          // metadata files would have been replaced (e.g. during snapshot compaction).  in such instances, at least
+          // some of the children pointed to within could have been copied prior, when they previously appeared as a
+          // child of the current file's predecessor (which this new meta file now replaces).
+          if (!isPathPresentOnTarget(new Path(manListPath), targetFs, copyConfig)) {
+            List<String> missingPaths = snapshotInfo.getSnapshotApexPaths();
+            for (IcebergSnapshotInfo.ManifestFileInfo mfi : snapshotInfo.getManifestFiles()) {

Review Comment:
   those are definitely the semantics of `IcebergTable.getIncrementalSnapshotInfosIterator()`; it's in the javadoc there.  where over here would you like me to repeat it?





[GitHub] [gobblin] phet commented on a diff in pull request #3575: [GOBBLIN-1707] Enhance `IcebergDataset` to detect when files already at dest then proceed with only delta

Posted by GitBox <gi...@apache.org>.
phet commented on code in PR #3575:
URL: https://github.com/apache/gobblin/pull/3575#discussion_r990533651


##########
gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDataset.java:
##########
@@ -153,30 +156,105 @@ Collection<CopyEntity> generateCopyEntities(FileSystem targetFs, CopyConfigurati
     return copyEntities;
   }
 
+  /** Not intended to escape this class... yet `public` visibility in case it somehow does */
+  @RequiredArgsConstructor
+  public static class WrappedIOException extends RuntimeException {
+    @Getter
+    private final IOException wrappedException;
+
+    public void rethrowWrapped() throws IOException {
+      throw wrappedException;
+    }
+  }
+
   /**
    * Finds all files of the Iceberg's current snapshot
    * Returns a map of path, file status for each file that needs to be copied
    */
-  protected Map<Path, FileStatus> getFilePathsToFileStatus() throws IOException {
-    Map<Path, FileStatus> result = Maps.newHashMap();
+  protected Map<Path, FileStatus> getFilePathsToFileStatus(FileSystem targetFs, CopyConfiguration copyConfig) throws IOException {
+    Map<Path, FileStatus> results = Maps.newHashMap();
     IcebergTable icebergTable = this.getIcebergTable();
+    // check first for case of nothing to replicate, to avoid needless scanning of a potentially massive iceberg
+    IcebergSnapshotInfo currentSnapshotOverview = icebergTable.getCurrentSnapshotInfoOverviewOnly();
+    if (currentSnapshotOverview.getMetadataPath().map(p -> isPathPresentOnTarget(new Path(p), targetFs, copyConfig)).orElse(false) &&
+        isPathPresentOnTarget(new Path(currentSnapshotOverview.getManifestListPath()), targetFs, copyConfig)) {
+      log.info("{}.{} - skipping entire iceberg, since snapshot '{}' at '{}' and metadata '{}' both present on target",
+          dbName, inputTableName, currentSnapshotOverview.getSnapshotId(),
+          currentSnapshotOverview.getManifestListPath(),
+          currentSnapshotOverview.getMetadataPath().orElse("<<ERROR: MISSING!>>"));
+      return results;
+    }
     Iterator<IcebergSnapshotInfo> icebergIncrementalSnapshotInfos = icebergTable.getIncrementalSnapshotInfosIterator();
     Iterator<String> filePathsIterator = Iterators.concat(
         Iterators.transform(icebergIncrementalSnapshotInfos, snapshotInfo -> {
-          // TODO: decide: is it too much to print for every snapshot--instead use `.debug`?
-          log.info("{}.{} - loaded snapshot '{}' from metadata path: '{}'", dbName, inputTableName,
-              snapshotInfo.getSnapshotId(), snapshotInfo.getMetadataPath().orElse("<<inherited>>"));
-          return snapshotInfo.getAllPaths().iterator();
+          // log each snapshot, for context, in case of `FileNotFoundException` during `FileSystem.getFileStatus()`
+          String manListPath = snapshotInfo.getManifestListPath();
+          log.info("{}.{} - loaded snapshot '{}' at '{}' from metadata path: '{}'", dbName, inputTableName,
+              snapshotInfo.getSnapshotId(), manListPath, snapshotInfo.getMetadataPath().orElse("<<inherited>>"));
+          // ALGO: an iceberg's files form a tree of four levels: metadata.json -> manifest-list -> manifest -> data;
+          // most critically, all are presumed immutable and uniquely named, although any may be replaced.  we depend
+          // also on incremental copy being run always atomically: to commit each iceberg only upon its full success.
+          // thus established, the presence of a file at dest (identified by path/name) guarantees its entire subtree is
+          // already copied--and, given immutability, completion of a prior copy naturally renders that file up-to-date.
+          // hence, its entire subtree may be short-circuited.  nevertheless, absence of a file at dest cannot imply
+          // its entire subtree necessarily requires copying, because it is possible, even likely in practice, that some
+          // metadata files would have been replaced (e.g. during snapshot compaction).  in such instances, at least
+          // some of the children pointed to within could have been copied prior, when they previously appeared as a
+          // child of the current file's predecessor (which this new meta file now replaces).
+          if (!isPathPresentOnTarget(new Path(manListPath), targetFs, copyConfig)) {
+            List<String> missingPaths = snapshotInfo.getSnapshotApexPaths();
+            for (IcebergSnapshotInfo.ManifestFileInfo mfi : snapshotInfo.getManifestFiles()) {
+              if (!isPathPresentOnTarget(new Path(mfi.getManifestFilePath()), targetFs, copyConfig)) {
+                missingPaths.add(mfi.getManifestFilePath());
+                mfi.getListedFilePaths().stream().filter(p ->
+                    !isPathPresentOnTarget(new Path(p), targetFs, copyConfig)
+                ).forEach(missingPaths::add);
+              }
+            }
+            return missingPaths.iterator();
+          } else {
+            log.info("{}.{} - snapshot '{}' already present on target... skipping (with contents)",
+                dbName, inputTableName, snapshotInfo.getSnapshotId());
+            // IMPORTANT: separately consider metadata path, to handle case of 'metadata-only' snapshot reusing mf-list
+            Optional<String> nonReplicatedMetadataPath = snapshotInfo.getMetadataPath().filter(p ->
+                !isPathPresentOnTarget(new Path(p), targetFs, copyConfig));
+            log.info("{}.{} - metadata is {}already present on target", dbName, inputTableName, nonReplicatedMetadataPath.isPresent() ? "NOT " : "");
+            return nonReplicatedMetadataPath.map(p -> Lists.newArrayList(p).iterator()).orElse(Collections.emptyIterator());
+          }
         })
     );
     Iterable<String> filePathsIterable = () -> filePathsIterator;
-    // TODO: investigate whether streaming initialization of `Map` preferable--`getFileStatus` network calls would
-    // likely benefit from parallelism
-    for (String pathString : filePathsIterable) {
-      Path path = new Path(pathString);
-      result.put(path, this.sourceFs.getFileStatus(path));
+    try {
+      // TODO: investigate whether streaming initialization of `Map` preferable--`getFileStatus()` network calls likely
+      // to benefit from parallelism
+      for (String pathString : filePathsIterable) {
+        try {
+          Path path = new Path(pathString);
+          results.put(path, this.sourceFs.getFileStatus(path));
+        } catch (FileNotFoundException fnfe) {
+          if (!shouldTolerateMissingSourceFiles) {
+            throw fnfe;
+          } else {
+            // log, but otherwise swallow... to continue on
+            log.warn("MIA source file... did premature deletion subvert time-travel or maybe metadata read interleaved with delete?", fnfe);
+          }
+        }
+      }
+    } catch (WrappedIOException wrapper) {
+      wrapper.rethrowWrapped();
+    }
+    return results;
+  }
+
+  /** @returns whether `path` is present on `targetFs`, tunneling checked exceptions and caching results throughout */
+  protected static boolean isPathPresentOnTarget(Path path, FileSystem targetFs, CopyConfiguration copyConfig) {
+    try {
+      // omit considering timestamp (or other markers of freshness), as files should be immutable

Review Comment:
   it's technically possible to change files in place, but doing so breaks the iceberg's repeatability.  it's not something we should ever encourage... instead, write new files and create a snapshot w/ those that replaces the original ones!  the real issue w/ in-place mods to data files is that every delta copy must then devolve into a full comparison of the file status (between source and dest) for every file in the iceberg table.  that's a huge amount of effort in some cases... all because of misbehaving writers/updaters.
   
   I suggest that if we do find we're working w/ writers that do this, we return here to add the necessary complexity, likely controlled via configuration so it's not always on.
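
As a rough sketch of what such an opt-in check might look like if it were ever needed: everything below is hypothetical (the class `FreshnessCheckSketch`, the `FileMeta` stand-in, the `compareFileStatus` flag, and the choice of length and modification time as the comparison criteria are assumptions, not anything in the PR or in Gobblin's configuration).

```java
import java.util.Optional;

/** Hypothetical illustration only; not part of Gobblin. */
public class FreshnessCheckSketch {

  /** Minimal stand-in for the parts of a file status a comparison might use. */
  public static final class FileMeta {
    final long length;
    final long modificationTime;

    public FileMeta(long length, long modificationTime) {
      this.length = length;
      this.modificationTime = modificationTime;
    }
  }

  /**
   * @param source            status of the file on the source
   * @param dest              status of the same path on the destination, if present
   * @param compareFileStatus hypothetical config switch; false keeps the name-only behavior
   * @return whether the file should be (re)copied
   */
  public static boolean needsCopy(FileMeta source, Optional<FileMeta> dest, boolean compareFileStatus) {
    if (!dest.isPresent()) {
      return true; // absent on dest: always copy
    }
    if (!compareFileStatus) {
      return false; // default: trust immutability, presence by name is enough
    }
    FileMeta d = dest.get();
    // assumed criteria: a size mismatch or a newer source suggests an in-place modification
    return source.length != d.length || source.modificationTime > d.modificationTime;
  }
}
```

Keeping the switch off by default preserves the cheap path for well-behaved writers; turning it on would cost one status comparison per file across the whole table.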





[GitHub] [gobblin] phet commented on a diff in pull request #3575: [GOBBLIN-1707] Enhance `IcebergDataset` to detect when files already at dest then proceed with only delta

Posted by GitBox <gi...@apache.org>.
phet commented on code in PR #3575:
URL: https://github.com/apache/gobblin/pull/3575#discussion_r990566896


##########
gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDataset.java:
##########
@@ -153,30 +156,105 @@ Collection<CopyEntity> generateCopyEntities(FileSystem targetFs, CopyConfigurati
     return copyEntities;
   }
 
+  /** Not intended to escape this class... yet `public` visibility in case it somehow does */
+  @RequiredArgsConstructor
+  public static class WrappedIOException extends RuntimeException {
+    @Getter
+    private final IOException wrappedException;
+
+    public void rethrowWrapped() throws IOException {
+      throw wrappedException;
+    }
+  }
+
   /**
    * Finds all files of the Iceberg's current snapshot
    * Returns a map of path, file status for each file that needs to be copied
    */
-  protected Map<Path, FileStatus> getFilePathsToFileStatus() throws IOException {
-    Map<Path, FileStatus> result = Maps.newHashMap();
+  protected Map<Path, FileStatus> getFilePathsToFileStatus(FileSystem targetFs, CopyConfiguration copyConfig) throws IOException {
+    Map<Path, FileStatus> results = Maps.newHashMap();
     IcebergTable icebergTable = this.getIcebergTable();
+    // check first for case of nothing to replicate, to avoid needless scanning of a potentially massive iceberg
+    IcebergSnapshotInfo currentSnapshotOverview = icebergTable.getCurrentSnapshotInfoOverviewOnly();
+    if (currentSnapshotOverview.getMetadataPath().map(p -> isPathPresentOnTarget(new Path(p), targetFs, copyConfig)).orElse(false) &&
+        isPathPresentOnTarget(new Path(currentSnapshotOverview.getManifestListPath()), targetFs, copyConfig)) {
+      log.info("{}.{} - skipping entire iceberg, since snapshot '{}' at '{}' and metadata '{}' both present on target",
+          dbName, inputTableName, currentSnapshotOverview.getSnapshotId(),
+          currentSnapshotOverview.getManifestListPath(),
+          currentSnapshotOverview.getMetadataPath().orElse("<<ERROR: MISSING!>>"));
+      return results;
+    }
     Iterator<IcebergSnapshotInfo> icebergIncrementalSnapshotInfos = icebergTable.getIncrementalSnapshotInfosIterator();
     Iterator<String> filePathsIterator = Iterators.concat(
         Iterators.transform(icebergIncrementalSnapshotInfos, snapshotInfo -> {
-          // TODO: decide: is it too much to print for every snapshot--instead use `.debug`?
-          log.info("{}.{} - loaded snapshot '{}' from metadata path: '{}'", dbName, inputTableName,
-              snapshotInfo.getSnapshotId(), snapshotInfo.getMetadataPath().orElse("<<inherited>>"));
-          return snapshotInfo.getAllPaths().iterator();
+          // log each snapshot, for context, in case of `FileNotFoundException` during `FileSystem.getFileStatus()`
+          String manListPath = snapshotInfo.getManifestListPath();
+          log.info("{}.{} - loaded snapshot '{}' at '{}' from metadata path: '{}'", dbName, inputTableName,
+              snapshotInfo.getSnapshotId(), manListPath, snapshotInfo.getMetadataPath().orElse("<<inherited>>"));
+          // ALGO: an iceberg's files form a tree of four levels: metadata.json -> manifest-list -> manifest -> data;
+          // most critically, all are presumed immutable and uniquely named, although any may be replaced.  we depend
+          // also on incremental copy being run always atomically: to commit each iceberg only upon its full success.
+          // thus established, the presence of a file at dest (identified by path/name) guarantees its entire subtree is
+          // already copied--and, given immutability, completion of a prior copy naturally renders that file up-to-date.
+          // hence, its entire subtree may be short-circuited.  nevertheless, absence of a file at dest cannot imply
+          // its entire subtree necessarily requires copying, because it is possible, even likely in practice, that some
+          // metadata files would have been replaced (e.g. during snapshot compaction).  in such instances, at least
+          // some of the children pointed to within could have been copied prior, when they previously appeared as a
+          // child of the current file's predecessor (which this new meta file now replaces).
+          if (!isPathPresentOnTarget(new Path(manListPath), targetFs, copyConfig)) {
+            List<String> missingPaths = snapshotInfo.getSnapshotApexPaths();
+            for (IcebergSnapshotInfo.ManifestFileInfo mfi : snapshotInfo.getManifestFiles()) {

Review Comment:
   sure thing.  I added this:
   >  * This means the {@link IcebergSnapshotInfo#getManifestFiles()} for the (n+1)-th element of the iterator will omit
   >  * all manifest files and listed data files, already reflected in a {@link IcebergSnapshotInfo#getManifestFiles()}
   >  * from the n-th or prior elements.  Given the order of the {@link Iterator<IcebergSnapshotInfo>} returned, this
   >  * mirrors the snapshot-to-file dependencies: each file is returned exactly once with the (oldest) snapshot from
   >  * which it first becomes reachable.
   >  *
   >  * Only the final {@link IcebergSnapshotInfo#getMetadataPath()} is present (for the snapshot it itself deems current).
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@gobblin.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [gobblin] phet commented on a diff in pull request #3575: [GOBBLIN-1707] Enhance `IcebergDataset` to detect when files already at dest then proceed with only delta

Posted by GitBox <gi...@apache.org>.
phet commented on code in PR #3575:
URL: https://github.com/apache/gobblin/pull/3575#discussion_r990533651


##########
gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDataset.java:
##########
@@ -153,30 +156,105 @@ Collection<CopyEntity> generateCopyEntities(FileSystem targetFs, CopyConfigurati
     return copyEntities;
   }
 
+  /** Not intended to escape this class... yet `public` visibility in case it somehow does */
+  @RequiredArgsConstructor
+  public static class WrappedIOException extends RuntimeException {
+    @Getter
+    private final IOException wrappedException;
+
+    public void rethrowWrapped() throws IOException {
+      throw wrappedException;
+    }
+  }
+
   /**
    * Finds all files of the Iceberg's current snapshot
    * Returns a map of path, file status for each file that needs to be copied
    */
-  protected Map<Path, FileStatus> getFilePathsToFileStatus() throws IOException {
-    Map<Path, FileStatus> result = Maps.newHashMap();
+  protected Map<Path, FileStatus> getFilePathsToFileStatus(FileSystem targetFs, CopyConfiguration copyConfig) throws IOException {
+    Map<Path, FileStatus> results = Maps.newHashMap();
     IcebergTable icebergTable = this.getIcebergTable();
+    // check first for case of nothing to replicate, to avoid needless scanning of a potentially massive iceberg
+    IcebergSnapshotInfo currentSnapshotOverview = icebergTable.getCurrentSnapshotInfoOverviewOnly();
+    if (currentSnapshotOverview.getMetadataPath().map(p -> isPathPresentOnTarget(new Path(p), targetFs, copyConfig)).orElse(false) &&
+        isPathPresentOnTarget(new Path(currentSnapshotOverview.getManifestListPath()), targetFs, copyConfig)) {
+      log.info("{}.{} - skipping entire iceberg, since snapshot '{}' at '{}' and metadata '{}' both present on target",
+          dbName, inputTableName, currentSnapshotOverview.getSnapshotId(),
+          currentSnapshotOverview.getManifestListPath(),
+          currentSnapshotOverview.getMetadataPath().orElse("<<ERROR: MISSING!>>"));
+      return results;
+    }
     Iterator<IcebergSnapshotInfo> icebergIncrementalSnapshotInfos = icebergTable.getIncrementalSnapshotInfosIterator();
     Iterator<String> filePathsIterator = Iterators.concat(
         Iterators.transform(icebergIncrementalSnapshotInfos, snapshotInfo -> {
-          // TODO: decide: is it too much to print for every snapshot--instead use `.debug`?
-          log.info("{}.{} - loaded snapshot '{}' from metadata path: '{}'", dbName, inputTableName,
-              snapshotInfo.getSnapshotId(), snapshotInfo.getMetadataPath().orElse("<<inherited>>"));
-          return snapshotInfo.getAllPaths().iterator();
+          // log each snapshot, for context, in case of `FileNotFoundException` during `FileSystem.getFileStatus()`
+          String manListPath = snapshotInfo.getManifestListPath();
+          log.info("{}.{} - loaded snapshot '{}' at '{}' from metadata path: '{}'", dbName, inputTableName,
+              snapshotInfo.getSnapshotId(), manListPath, snapshotInfo.getMetadataPath().orElse("<<inherited>>"));
+          // ALGO: an iceberg's files form a tree of four levels: metadata.json -> manifest-list -> manifest -> data;
+          // most critically, all are presumed immutable and uniquely named, although any may be replaced.  we depend
+          // also on incremental copy being run always atomically: to commit each iceberg only upon its full success.
+          // thus established, the presence of a file at dest (identified by path/name) guarantees its entire subtree is
+          // already copied--and, given immutability, completion of a prior copy naturally renders that file up-to-date.
+          // hence, its entire subtree may be short-circuited.  nevertheless, absence of a file at dest cannot imply
+          // its entire subtree necessarily requires copying, because it is possible, even likely in practice, that some
+          // metadata files would have been replaced (e.g. during snapshot compaction).  in such instances, at least
+          // some of the children pointed to within could have been copied prior, when they previously appeared as a
+          // child of the current file's predecessor (which this new meta file now replaces).
+          if (!isPathPresentOnTarget(new Path(manListPath), targetFs, copyConfig)) {
+            List<String> missingPaths = snapshotInfo.getSnapshotApexPaths();
+            for (IcebergSnapshotInfo.ManifestFileInfo mfi : snapshotInfo.getManifestFiles()) {
+              if (!isPathPresentOnTarget(new Path(mfi.getManifestFilePath()), targetFs, copyConfig)) {
+                missingPaths.add(mfi.getManifestFilePath());
+                mfi.getListedFilePaths().stream().filter(p ->
+                    !isPathPresentOnTarget(new Path(p), targetFs, copyConfig)
+                ).forEach(missingPaths::add);
+              }
+            }
+            return missingPaths.iterator();
+          } else {
+            log.info("{}.{} - snapshot '{}' already present on target... skipping (with contents)",
+                dbName, inputTableName, snapshotInfo.getSnapshotId());
+            // IMPORTANT: separately consider metadata path, to handle case of 'metadata-only' snapshot reusing mf-list
+            Optional<String> nonReplicatedMetadataPath = snapshotInfo.getMetadataPath().filter(p ->
+                !isPathPresentOnTarget(new Path(p), targetFs, copyConfig));
+            log.info("{}.{} - metadata is {}already present on target", dbName, inputTableName, nonReplicatedMetadataPath.isPresent() ? "NOT " : "");
+            return nonReplicatedMetadataPath.map(p -> Lists.newArrayList(p).iterator()).orElse(Collections.emptyIterator());
+          }
         })
     );
     Iterable<String> filePathsIterable = () -> filePathsIterator;
-    // TODO: investigate whether streaming initialization of `Map` preferable--`getFileStatus` network calls would
-    // likely benefit from parallelism
-    for (String pathString : filePathsIterable) {
-      Path path = new Path(pathString);
-      result.put(path, this.sourceFs.getFileStatus(path));
+    try {
+      // TODO: investigate whether streaming initialization of `Map` preferable--`getFileStatus()` network calls likely
+      // to benefit from parallelism
+      for (String pathString : filePathsIterable) {
+        try {
+          Path path = new Path(pathString);
+          results.put(path, this.sourceFs.getFileStatus(path));
+        } catch (FileNotFoundException fnfe) {
+          if (!shouldTolerateMissingSourceFiles) {
+            throw fnfe;
+          } else {
+            // log, but otherwise swallow... to continue on
+            log.warn("MIA source file... did premature deletion subvert time-travel or maybe metadata read interleaved with delete?", fnfe);
+          }
+        }
+      }
+    } catch (WrappedIOException wrapper) {
+      wrapper.rethrowWrapped();
+    }
+    return results;
+  }
+
+  /** @return whether `path` is present on `targetFs`, tunneling checked exceptions and caching results throughout */
+  protected static boolean isPathPresentOnTarget(Path path, FileSystem targetFs, CopyConfiguration copyConfig) {
+    try {
+      // omit considering timestamp (or other markers of freshness), as files should be immutable

Review Comment:
   it's technically possible to change files in place, but doing so breaks the iceberg's repeatability.  it's not something we should ever encourage... instead, write new files and create a snapshot that substitutes those in, and hence replaces the original ones!
   
   for distcp specifically, the real issue w/ in-place mods to data files is that every delta copy must devolve into a full comparison of the file status (between source and dest) for the entire iceberg table.  that's a huge amount of effort in some cases... all because of misbehaving writers/updaters.
   
   I suggest that if we do learn we're working w/ writers that do this, we return here later to add the necessary complexity.  we'd likely control it via configuration, so it's not always on.
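
To make the trade-off concrete, here is a minimal sketch, not the PR's code. It models each filesystem as an in-memory map from path to a hypothetical `FileMeta` (length and modification time). The names `DeltaCheckSketch`, `needsCopy`, and the `compareStatus` flag are assumptions for illustration only, standing in for whatever configuration might later control this behavior.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical sketch; none of these names are Gobblin APIs. */
class DeltaCheckSketch {
  /** Minimal stand-in for a file's status: length and modification time. */
  static final class FileMeta {
    final long length;
    final long modTime;

    FileMeta(long length, long modTime) {
      this.length = length;
      this.modTime = modTime;
    }
  }

  /**
   * Decides whether `path` (assumed to exist in `source`) must be copied to the target.
   * With compareStatus == false, presence on target suffices, since files are presumed immutable.
   * With compareStatus == true, the length must match and the source must not be newer.
   */
  static boolean needsCopy(String path, Map<String, FileMeta> source, Map<String, FileMeta> target,
      boolean compareStatus) {
    FileMeta onTarget = target.get(path);
    if (onTarget == null) {
      return true; // absent on target: always copy
    }
    if (!compareStatus) {
      return false; // present and presumed immutable: skip without reading source status
    }
    FileMeta onSource = source.get(path);
    return onSource.length != onTarget.length || onSource.modTime > onTarget.modTime;
  }

  public static void main(String[] args) {
    Map<String, FileMeta> source = new HashMap<>();
    Map<String, FileMeta> target = new HashMap<>();
    source.put("/data/f1.parquet", new FileMeta(100L, 10L));
    target.put("/data/f1.parquet", new FileMeta(90L, 10L)); // modified in place at source
    System.out.println("existence-only: " + needsCopy("/data/f1.parquet", source, target, false));
    System.out.println("status compare: " + needsCopy("/data/f1.parquet", source, target, true));
  }
}
```

The existence-only branch never touches the source map, which is what keeps a delta copy cheap. The status-comparing branch needs a source lookup for every file that is already present on the target, which corresponds to the full comparison described above.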





[GitHub] [gobblin] ZihanLi58 merged pull request #3575: [GOBBLIN-1707] Enhance `IcebergDataset` to detect when files already at dest then proceed with only delta

Posted by GitBox <gi...@apache.org>.
ZihanLi58 merged PR #3575:
URL: https://github.com/apache/gobblin/pull/3575




[GitHub] [gobblin] codecov-commenter commented on pull request #3575: Enhance `IcebergDataset` to detect when files already at dest then proceed with only delta

Posted by GitBox <gi...@apache.org>.
codecov-commenter commented on PR #3575:
URL: https://github.com/apache/gobblin/pull/3575#issuecomment-1264179900

   # [Codecov](https://codecov.io/gh/apache/gobblin/pull/3575?src=pr&el=h1&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) Report
   > Merging [#3575](https://codecov.io/gh/apache/gobblin/pull/3575?src=pr&el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (499f47f) into [master](https://codecov.io/gh/apache/gobblin/commit/71da34b5d42e7b2c159ef241368b644a08de875d?el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (71da34b) will **decrease** coverage by `3.00%`.
   > The diff coverage is `n/a`.
   
   ```diff
   @@             Coverage Diff              @@
   ##             master    #3575      +/-   ##
   ============================================
   - Coverage     46.90%   43.90%   -3.01%     
   + Complexity    10610     2057    -8553     
   ============================================
     Files          2111      406    -1705     
     Lines         82533    17545   -64988     
     Branches       9178     2143    -7035     
   ============================================
   - Hits          38714     7703   -31011     
   + Misses        40261     8986   -31275     
   + Partials       3558      856    -2702     
   ```
   
   
   | [Impacted Files](https://codecov.io/gh/apache/gobblin/pull/3575?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | Coverage Δ | |
   |---|---|---|
   | [...lin/util/filesystem/FileSystemInstrumentation.java](https://codecov.io/gh/apache/gobblin/pull/3575/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-Z29iYmxpbi11dGlsaXR5L3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL3V0aWwvZmlsZXN5c3RlbS9GaWxlU3lzdGVtSW5zdHJ1bWVudGF0aW9uLmphdmE=) | `92.85% <0.00%> (-7.15%)` | :arrow_down: |
   | [...n/data/management/copy/iceberg/IcebergDataset.java](https://codecov.io/gh/apache/gobblin/pull/3575/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-Z29iYmxpbi1kYXRhLW1hbmFnZW1lbnQvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2dvYmJsaW4vZGF0YS9tYW5hZ2VtZW50L2NvcHkvaWNlYmVyZy9JY2ViZXJnRGF0YXNldC5qYXZh) | | |
   | [...apache/gobblin/metrics/ConsoleReporterFactory.java](https://codecov.io/gh/apache/gobblin/pull/3575/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-Z29iYmxpbi1tZXRyaWNzLWxpYnMvZ29iYmxpbi1tZXRyaWNzLWJhc2Uvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2dvYmJsaW4vbWV0cmljcy9Db25zb2xlUmVwb3J0ZXJGYWN0b3J5LmphdmE=) | | |
   | [...blin/runtime/scheduler/QuartzJobSpecScheduler.java](https://codecov.io/gh/apache/gobblin/pull/3575/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-Z29iYmxpbi1ydW50aW1lL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL3J1bnRpbWUvc2NoZWR1bGVyL1F1YXJ0ekpvYlNwZWNTY2hlZHVsZXIuamF2YQ==) | | |
   | [...source/extractor/filebased/FileBasedExtractor.java](https://codecov.io/gh/apache/gobblin/pull/3575/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-Z29iYmxpbi1jb3JlL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL3NvdXJjZS9leHRyYWN0b3IvZmlsZWJhc2VkL0ZpbGVCYXNlZEV4dHJhY3Rvci5qYXZh) | | |
   | [...he/gobblin/runtime/JobExecutionEventSubmitter.java](https://codecov.io/gh/apache/gobblin/pull/3575/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-Z29iYmxpbi1ydW50aW1lL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL3J1bnRpbWUvSm9iRXhlY3V0aW9uRXZlbnRTdWJtaXR0ZXIuamF2YQ==) | | |
   | [...ain/java/org/apache/gobblin/hive/HiveRegProps.java](https://codecov.io/gh/apache/gobblin/pull/3575/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-Z29iYmxpbi1oaXZlLXJlZ2lzdHJhdGlvbi9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvZ29iYmxpbi9oaXZlL0hpdmVSZWdQcm9wcy5qYXZh) | | |
   | [...obblin/util/limiter/RestliServiceBasedLimiter.java](https://codecov.io/gh/apache/gobblin/pull/3575/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-Z29iYmxpbi1yZXN0bGkvZ29iYmxpbi10aHJvdHRsaW5nLXNlcnZpY2UvZ29iYmxpbi10aHJvdHRsaW5nLXNlcnZpY2UtY2xpZW50L3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL3V0aWwvbGltaXRlci9SZXN0bGlTZXJ2aWNlQmFzZWRMaW1pdGVyLmphdmE=) | | |
   | [...blin/source/extractor/resultset/RecordSetList.java](https://codecov.io/gh/apache/gobblin/pull/3575/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-Z29iYmxpbi1jb3JlL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL3NvdXJjZS9leHRyYWN0b3IvcmVzdWx0c2V0L1JlY29yZFNldExpc3QuamF2YQ==) | | |
   | [...source/extractor/filebased/GZIPFileDownloader.java](https://codecov.io/gh/apache/gobblin/pull/3575/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-Z29iYmxpbi1jb3JlL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL3NvdXJjZS9leHRyYWN0b3IvZmlsZWJhc2VkL0daSVBGaWxlRG93bmxvYWRlci5qYXZh) | | |
   | ... and [1698 more](https://codecov.io/gh/apache/gobblin/pull/3575/diff?src=pr&el=tree-more&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | |
   
   




[GitHub] [gobblin] phet commented on a diff in pull request #3575: [GOBBLIN-1707] Enhance `IcebergDataset` to detect when files already at dest then proceed with only delta

Posted by GitBox <gi...@apache.org>.
phet commented on code in PR #3575:
URL: https://github.com/apache/gobblin/pull/3575#discussion_r990534636


##########
gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDataset.java:
##########
@@ -153,30 +156,105 @@ Collection<CopyEntity> generateCopyEntities(FileSystem targetFs, CopyConfigurati
     return copyEntities;
   }
 
+  /** Not intended to escape this class... yet `public` visibility in case it somehow does */
+  @RequiredArgsConstructor
+  public static class WrappedIOException extends RuntimeException {
+    @Getter
+    private final IOException wrappedException;
+
+    public void rethrowWrapped() throws IOException {
+      throw wrappedException;
+    }
+  }
+
   /**
    * Finds all files of the Iceberg's current snapshot
    * Returns a map of path, file status for each file that needs to be copied
    */
-  protected Map<Path, FileStatus> getFilePathsToFileStatus() throws IOException {
-    Map<Path, FileStatus> result = Maps.newHashMap();
+  protected Map<Path, FileStatus> getFilePathsToFileStatus(FileSystem targetFs, CopyConfiguration copyConfig) throws IOException {
+    Map<Path, FileStatus> results = Maps.newHashMap();
     IcebergTable icebergTable = this.getIcebergTable();
+    // check first for case of nothing to replicate, to avoid needless scanning of a potentially massive iceberg
+    IcebergSnapshotInfo currentSnapshotOverview = icebergTable.getCurrentSnapshotInfoOverviewOnly();
+    if (currentSnapshotOverview.getMetadataPath().map(p -> isPathPresentOnTarget(new Path(p), targetFs, copyConfig)).orElse(false) &&
+        isPathPresentOnTarget(new Path(currentSnapshotOverview.getManifestListPath()), targetFs, copyConfig)) {
+      log.info("{}.{} - skipping entire iceberg, since snapshot '{}' at '{}' and metadata '{}' both present on target",
+          dbName, inputTableName, currentSnapshotOverview.getSnapshotId(),
+          currentSnapshotOverview.getManifestListPath(),
+          currentSnapshotOverview.getMetadataPath().orElse("<<ERROR: MISSING!>>"));
+      return results;
+    }
     Iterator<IcebergSnapshotInfo> icebergIncrementalSnapshotInfos = icebergTable.getIncrementalSnapshotInfosIterator();
     Iterator<String> filePathsIterator = Iterators.concat(
         Iterators.transform(icebergIncrementalSnapshotInfos, snapshotInfo -> {
-          // TODO: decide: is it too much to print for every snapshot--instead use `.debug`?
-          log.info("{}.{} - loaded snapshot '{}' from metadata path: '{}'", dbName, inputTableName,
-              snapshotInfo.getSnapshotId(), snapshotInfo.getMetadataPath().orElse("<<inherited>>"));
-          return snapshotInfo.getAllPaths().iterator();
+          // log each snapshot, for context, in case of `FileNotFoundException` during `FileSystem.getFileStatus()`
+          String manListPath = snapshotInfo.getManifestListPath();
+          log.info("{}.{} - loaded snapshot '{}' at '{}' from metadata path: '{}'", dbName, inputTableName,
+              snapshotInfo.getSnapshotId(), manListPath, snapshotInfo.getMetadataPath().orElse("<<inherited>>"));
+          // ALGO: an iceberg's files form a tree of four levels: metadata.json -> manifest-list -> manifest -> data;
+          // most critically, all are presumed immutable and uniquely named, although any may be replaced.  we depend
+          // also on incremental copy being run always atomically: to commit each iceberg only upon its full success.
+          // thus established, the presence of a file at dest (identified by path/name) guarantees its entire subtree is
+          // already copied--and, given immutability, completion of a prior copy naturally renders that file up-to-date.
+          // hence, its entire subtree may be short-circuited.  nevertheless, absence of a file at dest cannot imply
+          // its entire subtree necessarily requires copying, because it is possible, even likely in practice, that some
+          // metadata files would have been replaced (e.g. during snapshot compaction).  in such instances, at least
+          // some of the children pointed to within could have been copied prior, when they previously appeared as a
+          // child of the current file's predecessor (which this new meta file now replaces).
+          if (!isPathPresentOnTarget(new Path(manListPath), targetFs, copyConfig)) {
+            List<String> missingPaths = snapshotInfo.getSnapshotApexPaths();
+            for (IcebergSnapshotInfo.ManifestFileInfo mfi : snapshotInfo.getManifestFiles()) {

Review Comment:
   those are definitely the semantics of using `IcebergTable.getIncrementalSnapshotInfosIterator()`.  it's in the javadoc there.  where would you like me to repeat it over here?  or should I note those semantics just above, where I call that method?
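
For readers following the hunk above, the short-circuit over a snapshot's file tree can be sketched roughly as follows. This is an illustration under assumptions, not the PR's implementation: `Snapshot`, `Manifest`, and `findMissingPaths` are hypothetical names, the target filesystem is modeled as a plain set of present paths, and the separate handling of the metadata path is omitted.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Hypothetical sketch; none of these names are Gobblin APIs. */
class ShortCircuitSketch {
  /** A manifest file and the data files it lists. */
  static final class Manifest {
    final String path;
    final List<String> dataFiles;

    Manifest(String path, List<String> dataFiles) {
      this.path = path;
      this.dataFiles = dataFiles;
    }
  }

  /** A snapshot: its manifest-list path plus the manifests reachable from it. */
  static final class Snapshot {
    final String manifestListPath;
    final List<Manifest> manifests;

    Snapshot(String manifestListPath, List<Manifest> manifests) {
      this.manifestListPath = manifestListPath;
      this.manifests = manifests;
    }
  }

  /** Returns the paths still to copy, skipping any subtree whose root is already on the target. */
  static List<String> findMissingPaths(Snapshot snapshot, Set<String> presentOnTarget) {
    List<String> missing = new ArrayList<>();
    if (presentOnTarget.contains(snapshot.manifestListPath)) {
      return missing; // manifest list present: the whole snapshot subtree is presumed copied
    }
    missing.add(snapshot.manifestListPath);
    for (Manifest manifest : snapshot.manifests) {
      if (presentOnTarget.contains(manifest.path)) {
        continue; // manifest present: its data files are presumed copied too
      }
      missing.add(manifest.path);
      // an absent manifest does NOT imply absent children, so check each data file
      for (String dataFile : manifest.dataFiles) {
        if (!presentOnTarget.contains(dataFile)) {
          missing.add(dataFile);
        }
      }
    }
    return missing;
  }

  public static void main(String[] args) {
    Snapshot snapshot = new Snapshot("ml-2", Arrays.asList(
        new Manifest("m1", Arrays.asList("d1")),
        new Manifest("m2", Arrays.asList("d2", "d3"))));
    Set<String> presentOnTarget = new HashSet<>(Arrays.asList("m1", "d1", "d2"));
    System.out.println(findMissingPaths(snapshot, presentOnTarget));
  }
}
```

The asymmetry is deliberate: presence of a parent prunes its whole subtree, while absence of a parent still requires a per-child check, since a replaced manifest may list data files that were copied earlier under its predecessor.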





[GitHub] [gobblin] phet commented on a diff in pull request #3575: [GOBBLIN-1707] Enhance `IcebergDataset` to detect when files already at dest then proceed with only delta

Posted by GitBox <gi...@apache.org>.
phet commented on code in PR #3575:
URL: https://github.com/apache/gobblin/pull/3575#discussion_r990566896


##########
gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDataset.java:
##########
@@ -153,30 +156,105 @@ Collection<CopyEntity> generateCopyEntities(FileSystem targetFs, CopyConfigurati
     return copyEntities;
   }
 
+  /** Not intended to escape this class... yet `public` visibility in case it somehow does */
+  @RequiredArgsConstructor
+  public static class WrappedIOException extends RuntimeException {
+    @Getter
+    private final IOException wrappedException;
+
+    public void rethrowWrapped() throws IOException {
+      throw wrappedException;
+    }
+  }
+
   /**
    * Finds all files of the Iceberg's current snapshot
    * Returns a map of path, file status for each file that needs to be copied
    */
-  protected Map<Path, FileStatus> getFilePathsToFileStatus() throws IOException {
-    Map<Path, FileStatus> result = Maps.newHashMap();
+  protected Map<Path, FileStatus> getFilePathsToFileStatus(FileSystem targetFs, CopyConfiguration copyConfig) throws IOException {
+    Map<Path, FileStatus> results = Maps.newHashMap();
     IcebergTable icebergTable = this.getIcebergTable();
+    // check first for case of nothing to replicate, to avoid needless scanning of a potentially massive iceberg
+    IcebergSnapshotInfo currentSnapshotOverview = icebergTable.getCurrentSnapshotInfoOverviewOnly();
+    if (currentSnapshotOverview.getMetadataPath().map(p -> isPathPresentOnTarget(new Path(p), targetFs, copyConfig)).orElse(false) &&
+        isPathPresentOnTarget(new Path(currentSnapshotOverview.getManifestListPath()), targetFs, copyConfig)) {
+      log.info("{}.{} - skipping entire iceberg, since snapshot '{}' at '{}' and metadata '{}' both present on target",
+          dbName, inputTableName, currentSnapshotOverview.getSnapshotId(),
+          currentSnapshotOverview.getManifestListPath(),
+          currentSnapshotOverview.getMetadataPath().orElse("<<ERROR: MISSING!>>"));
+      return results;
+    }
     Iterator<IcebergSnapshotInfo> icebergIncrementalSnapshotInfos = icebergTable.getIncrementalSnapshotInfosIterator();
     Iterator<String> filePathsIterator = Iterators.concat(
         Iterators.transform(icebergIncrementalSnapshotInfos, snapshotInfo -> {
-          // TODO: decide: is it too much to print for every snapshot--instead use `.debug`?
-          log.info("{}.{} - loaded snapshot '{}' from metadata path: '{}'", dbName, inputTableName,
-              snapshotInfo.getSnapshotId(), snapshotInfo.getMetadataPath().orElse("<<inherited>>"));
-          return snapshotInfo.getAllPaths().iterator();
+          // log each snapshot, for context, in case of `FileNotFoundException` during `FileSystem.getFileStatus()`
+          String manListPath = snapshotInfo.getManifestListPath();
+          log.info("{}.{} - loaded snapshot '{}' at '{}' from metadata path: '{}'", dbName, inputTableName,
+              snapshotInfo.getSnapshotId(), manListPath, snapshotInfo.getMetadataPath().orElse("<<inherited>>"));
+          // ALGO: an iceberg's files form a tree of four levels: metadata.json -> manifest-list -> manifest -> data;
+          // most critically, all are presumed immutable and uniquely named, although any may be replaced.  we depend
+          // also on incremental copy being run always atomically: to commit each iceberg only upon its full success.
+          // thus established, the presence of a file at dest (identified by path/name) guarantees its entire subtree is
+          // already copied--and, given immutability, completion of a prior copy naturally renders that file up-to-date.
+          // hence, its entire subtree may be short-circuited.  nevertheless, absence of a file at dest cannot imply
+          // its entire subtree necessarily requires copying, because it is possible, even likely in practice, that some
+          // metadata files would have been replaced (e.g. during snapshot compaction).  in such instances, at least
+          // some of the children pointed to within could have been copied prior, when they previously appeared as a
+          // child of the current file's predecessor (which this new meta file now replaces).
+          if (!isPathPresentOnTarget(new Path(manListPath), targetFs, copyConfig)) {
+            List<String> missingPaths = snapshotInfo.getSnapshotApexPaths();
+            for (IcebergSnapshotInfo.ManifestFileInfo mfi : snapshotInfo.getManifestFiles()) {

Review Comment:
   sure thing.  I added this:
   >  This means the {@link IcebergSnapshotInfo#getManifestFiles()} for the (n+1)-th element of the iterator will omit all manifest files and listed data files, already reflected in a {@link IcebergSnapshotInfo#getManifestFiles()} from the n-th or prior elements.  Given the order of the {@link Iterator\<IcebergSnapshotInfo\>} returned, this mirrors the snapshot-to-file dependencies: each file is returned exactly once with the (oldest) snapshot from which it first becomes reachable.
   > 
   >  Only the final {@link IcebergSnapshotInfo#getMetadataPath()} is present (for the snapshot it itself deems current).
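
As a rough illustration of the iterator semantics quoted above, the sketch below attributes each file to the first snapshot from which it becomes reachable. It is an assumption-laden model rather than the actual `IcebergTable` code: snapshots are plain lists of reachable paths, assumed to be ordered oldest first, and `incrementalPaths` is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Hypothetical sketch; none of these names are Gobblin APIs. */
class IncrementalSnapshotsSketch {
  /**
   * For each snapshot (oldest first), keeps only the paths not already
   * reachable from an earlier snapshot, so every path appears exactly once.
   */
  static List<List<String>> incrementalPaths(List<List<String>> snapshotsOldestFirst) {
    Set<String> seen = new HashSet<>();
    List<List<String>> result = new ArrayList<>();
    for (List<String> reachable : snapshotsOldestFirst) {
      List<String> firstSeenHere = new ArrayList<>();
      for (String path : reachable) {
        if (seen.add(path)) { // add() returns false when the path was already seen
          firstSeenHere.add(path);
        }
      }
      result.add(firstSeenHere);
    }
    return result;
  }

  public static void main(String[] args) {
    List<List<String>> snapshots = Arrays.asList(
        Arrays.asList("m1", "d1"),
        Arrays.asList("m1", "d1", "m2", "d2"),
        Arrays.asList("m2", "d2", "m3"));
    System.out.println(incrementalPaths(snapshots));
  }
}
```

Under this model, a consumer that walks the snapshots in order and skips one already present on the target does not lose files, because anything first reachable from that snapshot was attributed to it alone.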
   


