Posted to dev@gobblin.apache.org by GitBox <gi...@apache.org> on 2022/10/07 22:31:58 UTC

[GitHub] [gobblin] phet commented on a diff in pull request #3575: [GOBBLIN-1707] Enhance `IcebergDataset` to detect when files already at dest then proceed with only delta

phet commented on code in PR #3575:
URL: https://github.com/apache/gobblin/pull/3575#discussion_r990533651


##########
gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDataset.java:
##########
@@ -153,30 +156,105 @@ Collection<CopyEntity> generateCopyEntities(FileSystem targetFs, CopyConfigurati
     return copyEntities;
   }
 
+  /** Not intended to escape this class... yet `public` visibility in case it somehow does */
+  @RequiredArgsConstructor
+  public static class WrappedIOException extends RuntimeException {
+    @Getter
+    private final IOException wrappedException;
+
+    public void rethrowWrapped() throws IOException {
+      throw wrappedException;
+    }
+  }
+
   /**
    * Finds all files of the Iceberg's current snapshot
    * Returns a map of path, file status for each file that needs to be copied
    */
-  protected Map<Path, FileStatus> getFilePathsToFileStatus() throws IOException {
-    Map<Path, FileStatus> result = Maps.newHashMap();
+  protected Map<Path, FileStatus> getFilePathsToFileStatus(FileSystem targetFs, CopyConfiguration copyConfig) throws IOException {
+    Map<Path, FileStatus> results = Maps.newHashMap();
     IcebergTable icebergTable = this.getIcebergTable();
+    // check first for case of nothing to replicate, to avoid needless scanning of a potentially massive iceberg
+    IcebergSnapshotInfo currentSnapshotOverview = icebergTable.getCurrentSnapshotInfoOverviewOnly();
+    if (currentSnapshotOverview.getMetadataPath().map(p -> isPathPresentOnTarget(new Path(p), targetFs, copyConfig)).orElse(false) &&
+        isPathPresentOnTarget(new Path(currentSnapshotOverview.getManifestListPath()), targetFs, copyConfig)) {
+      log.info("{}.{} - skipping entire iceberg, since snapshot '{}' at '{}' and metadata '{}' both present on target",
+          dbName, inputTableName, currentSnapshotOverview.getSnapshotId(),
+          currentSnapshotOverview.getManifestListPath(),
+          currentSnapshotOverview.getMetadataPath().orElse("<<ERROR: MISSING!>>"));
+      return results;
+    }
     Iterator<IcebergSnapshotInfo> icebergIncrementalSnapshotInfos = icebergTable.getIncrementalSnapshotInfosIterator();
     Iterator<String> filePathsIterator = Iterators.concat(
         Iterators.transform(icebergIncrementalSnapshotInfos, snapshotInfo -> {
-          // TODO: decide: is it too much to print for every snapshot--instead use `.debug`?
-          log.info("{}.{} - loaded snapshot '{}' from metadata path: '{}'", dbName, inputTableName,
-              snapshotInfo.getSnapshotId(), snapshotInfo.getMetadataPath().orElse("<<inherited>>"));
-          return snapshotInfo.getAllPaths().iterator();
+          // log each snapshot, for context, in case of `FileNotFoundException` during `FileSystem.getFileStatus()`
+          String manListPath = snapshotInfo.getManifestListPath();
+          log.info("{}.{} - loaded snapshot '{}' at '{}' from metadata path: '{}'", dbName, inputTableName,
+              snapshotInfo.getSnapshotId(), manListPath, snapshotInfo.getMetadataPath().orElse("<<inherited>>"));
+          // ALGO: an iceberg's files form a tree of four levels: metadata.json -> manifest-list -> manifest -> data;
+          // most critically, all are presumed immutable and uniquely named, although any may be replaced.  we depend
+          // also on incremental copy being run always atomically: to commit each iceberg only upon its full success.
+          // thus established, the presence of a file at dest (identified by path/name) guarantees its entire subtree is
+          // already copied--and, given immutability, completion of a prior copy naturally renders that file up-to-date.
+          // hence, its entire subtree may be short-circuited.  nevertheless, absence of a file at dest cannot imply
+          // its entire subtree necessarily requires copying, because it is possible, even likely in practice, that some
+          // metadata files would have been replaced (e.g. during snapshot compaction).  in such instances, at least
+          // some of the children pointed to within could have been copied prior, when they previously appeared as a
+          // child of the current file's predecessor (which this new meta file now replaces).
+          if (!isPathPresentOnTarget(new Path(manListPath), targetFs, copyConfig)) {
+            List<String> missingPaths = snapshotInfo.getSnapshotApexPaths();
+            for (IcebergSnapshotInfo.ManifestFileInfo mfi : snapshotInfo.getManifestFiles()) {
+              if (!isPathPresentOnTarget(new Path(mfi.getManifestFilePath()), targetFs, copyConfig)) {
+                missingPaths.add(mfi.getManifestFilePath());
+                mfi.getListedFilePaths().stream().filter(p ->
+                    !isPathPresentOnTarget(new Path(p), targetFs, copyConfig)
+                ).forEach(missingPaths::add);
+              }
+            }
+            return missingPaths.iterator();
+          } else {
+            log.info("{}.{} - snapshot '{}' already present on target... skipping (with contents)",
+                dbName, inputTableName, snapshotInfo.getSnapshotId());
+            // IMPORTANT: separately consider metadata path, to handle case of 'metadata-only' snapshot reusing mf-list
+            Optional<String> nonReplicatedMetadataPath = snapshotInfo.getMetadataPath().filter(p ->
+                !isPathPresentOnTarget(new Path(p), targetFs, copyConfig));
+            log.info("{}.{} - metadata is {}already present on target", dbName, inputTableName, nonReplicatedMetadataPath.isPresent() ? "NOT " : "");
+            return nonReplicatedMetadataPath.map(p -> Lists.newArrayList(p).iterator()).orElse(Collections.emptyIterator());
+          }
         })
     );
     Iterable<String> filePathsIterable = () -> filePathsIterator;
-    // TODO: investigate whether streaming initialization of `Map` preferable--`getFileStatus` network calls would
-    // likely benefit from parallelism
-    for (String pathString : filePathsIterable) {
-      Path path = new Path(pathString);
-      result.put(path, this.sourceFs.getFileStatus(path));
+    try {
+      // TODO: investigate whether streaming initialization of `Map` preferable--`getFileStatus()` network calls likely
+      // to benefit from parallelism
+      for (String pathString : filePathsIterable) {
+        try {
+          Path path = new Path(pathString);
+          results.put(path, this.sourceFs.getFileStatus(path));
+        } catch (FileNotFoundException fnfe) {
+          if (!shouldTolerateMissingSourceFiles) {
+            throw fnfe;
+          } else {
+            // log, but otherwise swallow... to continue on
+            log.warn("MIA source file... did premature deletion subvert time-travel or maybe metadata read interleaved with delete?", fnfe);
+          }
+        }
+      }
+    } catch (WrappedIOException wrapper) {
+      wrapper.rethrowWrapped();
+    }
+    return results;
+  }
+
+  /** @return whether `path` is present on `targetFs`, tunneling checked exceptions and caching results throughout */
+  protected static boolean isPathPresentOnTarget(Path path, FileSystem targetFs, CopyConfiguration copyConfig) {
+    try {
+      // omit considering timestamp (or other markers of freshness), as files should be immutable
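
The pruning rule from the `ALGO` comment in the diff above can be modeled in a few lines. What follows is a minimal, self-contained sketch, not Gobblin code. `IcebergDeltaSketch`, `Snapshot`, and `Manifest` are hypothetical stand-ins for `IcebergSnapshotInfo` and its `ManifestFileInfo`, and the `Predicate<String>` stands in for `isPathPresentOnTarget`. It also assumes that a snapshot's "apex" paths (what `getSnapshotApexPaths()` returns) are its metadata.json, if any, plus its manifest-list.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.function.Predicate;

/** Hypothetical model of the delta-planning rule; not Gobblin's actual classes. */
public class IcebergDeltaSketch {

  /** A manifest file plus the data files it lists. */
  public static final class Manifest {
    final String path;
    final List<String> dataFilePaths;

    public Manifest(String path, List<String> dataFilePaths) {
      this.path = path;
      this.dataFilePaths = dataFilePaths;
    }
  }

  /** A snapshot: optional metadata.json, its manifest-list, and that list's manifests. */
  public static final class Snapshot {
    final Optional<String> metadataPath;
    final String manifestListPath;
    final List<Manifest> manifests;

    public Snapshot(Optional<String> metadataPath, String manifestListPath, List<Manifest> manifests) {
      this.metadataPath = metadataPath;
      this.manifestListPath = manifestListPath;
      this.manifests = manifests;
    }
  }

  /** Early exit: nothing to copy when the current metadata.json and manifest-list are both on target. */
  public static boolean isTableUpToDate(Snapshot current, Predicate<String> presentOnTarget) {
    return current.metadataPath.map(presentOnTarget::test).orElse(false)
        && presentOnTarget.test(current.manifestListPath);
  }

  /** Paths of `snapshot` missing from the target, skipping any subtree whose root is already there. */
  public static List<String> missingPaths(Snapshot snapshot, Predicate<String> presentOnTarget) {
    List<String> missing = new ArrayList<>();
    if (presentOnTarget.test(snapshot.manifestListPath)) {
      // manifest-list present => its whole subtree was copied; still check metadata.json on its own,
      // since a metadata-only snapshot may reuse an existing manifest-list
      snapshot.metadataPath.filter(presentOnTarget.negate()).ifPresent(missing::add);
      return missing;
    }
    // ASSUMPTION: the "apex" paths are the metadata.json (if any) plus the manifest-list
    snapshot.metadataPath.ifPresent(missing::add);
    missing.add(snapshot.manifestListPath);
    for (Manifest manifest : snapshot.manifests) {
      if (presentOnTarget.test(manifest.path)) {
        continue; // manifest present => every data file it lists was copied already
      }
      missing.add(manifest.path);
      // a new (e.g. compacted) manifest may list data files copied earlier under its predecessor
      for (String dataPath : manifest.dataFilePaths) {
        if (!presentOnTarget.test(dataPath)) {
          missing.add(dataPath);
        }
      }
    }
    return missing;
  }
}
```

A present manifest-list prunes its entire subtree, while a missing manifest still has each listed data file checked individually, since the missing manifest may have replaced one whose data files were already copied.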

Review Comment:
   It's technically possible to change files in place, but doing so breaks the iceberg's repeatability. It's not something we should ever encourage... instead, write new files and create a snapshot with those that replaces the original ones! The real issue with in-place modifications to data files is that every delta copy must then devolve into a full comparison of the `FileStatus` (between source and dest) for every file in the iceberg table. That's a huge amount of effort in some cases... all because of misbehaving writers/updaters.
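To make that cost concrete, here is a minimal sketch of what delta planning would have to do if in-place rewrites were allowed. `FullComparisonSketch` and `FileMeta` are hypothetical (the latter stands in for Hadoop's `FileStatus`), and the staleness criterion (length differs, or source modified after the destination copy) is an assumption for illustration only; comparing modification times across filesystems can be fragile in practice. The key point is the loop: every file of the table needs a status lookup on both sides, and no subtree can be skipped.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;

/** Hypothetical sketch of the full source-vs-dest comparison that in-place rewrites would force. */
public class FullComparisonSketch {

  /** Stand-in for Hadoop's FileStatus: just the two fields this sketch compares. */
  public static final class FileMeta {
    final long length;
    final long modificationTime;

    public FileMeta(long length, long modificationTime) {
      this.length = length;
      this.modificationTime = modificationTime;
    }
  }

  /** Returns, sorted, every source path that is missing or (by an assumed criterion) stale at dest. */
  public static List<String> staleOrMissing(Map<String, FileMeta> source, Map<String, FileMeta> dest) {
    List<String> result = new ArrayList<>();
    // no pruning is possible: every file of the table needs a status lookup on both sides
    for (Map.Entry<String, FileMeta> entry : source.entrySet()) {
      FileMeta src = entry.getValue();
      FileMeta dst = dest.get(entry.getKey());
      boolean stale = dst == null
          || dst.length != src.length
          || dst.modificationTime < src.modificationTime; // ASSUMPTION: clocks are comparable
      if (stale) {
        result.add(entry.getKey());
      }
    }
    Collections.sort(result);
    return result;
  }
}
```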
   
   If we do find we're working with writers that do this, I suggest we return here to add the necessary complexity, likely controlled via configuration so it's not always on.
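One possible shape for such a configuration-gated check, sketched under assumptions: `TargetPresenceChecker`, its `Stat` interface, and the `verifyLength` flag are hypothetical names, not existing Gobblin classes or config keys. With the flag off, existence on the target suffices (relying on immutability, as in the diff); with it on, the target length must also match the source. Results are cached per path, and checked `IOException`s are tunneled out of the caching lambda via the JDK's `UncheckedIOException`, swapped in here for the diff's own `WrappedIOException`.

```java
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical config-gated, caching presence check; names are illustrative, not Gobblin's. */
public class TargetPresenceChecker {

  /** Stand-in for a filesystem stat call: returns the file's length, or throws FileNotFoundException. */
  @FunctionalInterface
  public interface Stat {
    long length(String path) throws IOException;
  }

  private final Stat targetStat;
  private final Stat sourceStat;
  private final boolean verifyLength; // hypothetical toggle: off => trust immutability
  // caching negative results is only safe while planning, before any file is actually copied
  private final Map<String, Boolean> cache = new ConcurrentHashMap<>();

  public TargetPresenceChecker(Stat targetStat, Stat sourceStat, boolean verifyLength) {
    this.targetStat = targetStat;
    this.sourceStat = sourceStat;
    this.verifyLength = verifyLength;
  }

  /** Unchecked, so usable inside lambdas; a checked IOException surfaces as UncheckedIOException. */
  public boolean isPresentOnTarget(String path) {
    return cache.computeIfAbsent(path, this::check);
  }

  private boolean check(String path) {
    long targetLength;
    try {
      targetLength = targetStat.length(path);
    } catch (FileNotFoundException fnfe) {
      return false; // absent on target
    } catch (IOException ioe) {
      throw new UncheckedIOException(ioe); // tunnel out of computeIfAbsent's Function
    }
    if (!verifyLength) {
      return true; // files are immutable, so existence alone means up to date
    }
    try {
      return targetLength == sourceStat.length(path);
    } catch (IOException ioe) {
      throw new UncheckedIOException(ioe);
    }
  }
}
```

A caller can recover the original checked exception with `catch (UncheckedIOException e) { throw e.getCause(); }`, analogous to `WrappedIOException.rethrowWrapped()` in the diff. Keeping the flag off by default preserves the cheap, existence-only path for well-behaved writers.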



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
