You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by zi...@apache.org on 2023/05/03 21:17:38 UTC

[gobblin] branch master updated: [GOBBLIN-1824]Improving the Efficiency of Work Planning in Manifest-Based DistCp Jobs (#3686)

This is an automated email from the ASF dual-hosted git repository.

zihanli58 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 7bbf6761c [GOBBLIN-1824]Improving the Efficiency of Work Planning in Manifest-Based DistCp Jobs (#3686)
7bbf6761c is described below

commit 7bbf6761c63055eb9ecc9f2756d4b3d68b7a1b08
Author: Zihan Li <zi...@linkedin.com>
AuthorDate: Wed May 3 14:17:31 2023 -0700

    [GOBBLIN-1824]Improving the Efficiency of Work Planning in Manifest-Based DistCp Jobs (#3686)
    
    * address comments
    
    * use connectionmanager when httpclient is not closable
    
    * [GOBBLIN-1810] Support general iceberg catalog in icebergMetadataWriter
    
    * [GOBBLIN-1824]Improving the Efficiency of Work Planning in Manifest-Based DistCp Jobs
    
    * remove un-intended change
    
    * address comments and remove the multi-threading since it does not improve performance much
    
    * address comments
    
    * make cache TTL configurable
    
    * add comments to describe the difference between resolveReplicatedOwnerAndPermissionsRecursivelyWithCache and resolveReplicatedOwnerAndPermissionsRecursively
    
    ---------
    
    Co-authored-by: Zihan Li <zi...@zihli-mn2.linkedin.biz>
---
 .../gobblin/data/management/copy/CopyableFile.java | 34 ++++++++++++++++++
 .../data/management/copy/ManifestBasedDataset.java | 41 ++++++++++++++--------
 2 files changed, 61 insertions(+), 14 deletions(-)

diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopyableFile.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopyableFile.java
index 23c27b61d..85fa80f0f 100644
--- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopyableFile.java
+++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopyableFile.java
@@ -17,10 +17,13 @@
 
 package org.apache.gobblin.data.management.copy;
 
+import com.google.common.cache.Cache;
 import java.io.IOException;
 import java.util.List;
 import java.util.Map;
 
+import java.util.concurrent.ExecutionException;
+import lombok.extern.slf4j.Slf4j;
 import org.apache.hadoop.fs.FileChecksum;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
@@ -58,6 +61,7 @@ import org.apache.gobblin.util.guid.Guid;
 @Setter
 @NoArgsConstructor(access = AccessLevel.PROTECTED)
 @EqualsAndHashCode(callSuper = true)
+@Slf4j
 public class CopyableFile extends CopyEntity implements File {
   private static final byte[] EMPTY_CHECKSUM = new byte[0];
 
@@ -375,6 +379,36 @@ public class CopyableFile extends CopyEntity implements File {
     return ownerAndPermissions;
   }
 
+  /**
+   * Compute the correct {@link OwnerAndPermission} obtained from replicating source owner and permissions and applying
+   * the {@link PreserveAttributes} rules for fromPath and every ancestor up to but excluding toPath. Unlike
+   * resolveReplicatedOwnerAndPermissionsRecursively(), each result is memoized in permissionMap (keyed by the path string),
+   * so ancestors shared by many files are resolved once while their cache entry is live; prefer it for large file batches.
+   * @return The computed {@link OwnerAndPermission}s, ordered from fromPath upward, excluding toPath.
+   * @throws IOException if toPath is not an ancestor of fromPath.
+   * @throws ExecutionException if permissionMap's loader, resolveReplicatedOwnerAndPermission(), throws a checked exception.
+   */
+  public static List<OwnerAndPermission> resolveReplicatedOwnerAndPermissionsRecursivelyWithCache(FileSystem sourceFs, Path fromPath,
+      Path toPath, CopyConfiguration copyConfiguration, Cache<String, OwnerAndPermission> permissionMap)
+      throws IOException, ExecutionException {
+
+    if (!PathUtils.isAncestor(toPath, fromPath)) {
+      throw new IOException(String.format("toPath %s must be an ancestor of fromPath %s.", toPath, fromPath));
+    }
+
+    List<OwnerAndPermission> ownerAndPermissions = Lists.newArrayList();
+    Path currentPath = fromPath; // walk upward from fromPath toward (but not including) toPath
+
+    while (currentPath.getParent() != null && PathUtils.isAncestor(toPath, currentPath.getParent())) {
+      Path finalCurrentPath = currentPath; // effectively-final copy captured by the loader lambda below
+      ownerAndPermissions.add(permissionMap.get(finalCurrentPath.toString(), () -> resolveReplicatedOwnerAndPermission(sourceFs,
+          finalCurrentPath, copyConfiguration))); // on a cache miss, resolve via sourceFs and store the result
+      currentPath = currentPath.getParent();
+    }
+
+    return ownerAndPermissions;
+  }
+
   public static Map<String, OwnerAndPermission> resolveReplicatedAncestorOwnerAndPermissionsRecursively(FileSystem sourceFs, Path fromPath,
       Path toPath, CopyConfiguration copyConfiguration) throws IOException {
     Preconditions.checkArgument(sourceFs.getFileStatus(fromPath).isDirectory(), "Source path must be a directory.");
diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/ManifestBasedDataset.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/ManifestBasedDataset.java
index 17de89458..fbb88640b 100644
--- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/ManifestBasedDataset.java
+++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/ManifestBasedDataset.java
@@ -18,6 +18,8 @@
 package org.apache.gobblin.data.management.copy;
 
 import com.google.common.base.Optional;
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.gson.JsonIOException;
@@ -27,6 +29,7 @@ import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Properties;
+import java.util.concurrent.TimeUnit;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.gobblin.commit.CommitStep;
 import org.apache.gobblin.data.management.copy.entities.PrePublishStep;
@@ -47,12 +50,15 @@ public class ManifestBasedDataset implements IterableCopyableDataset {
 
   private static final String DELETE_FILE_NOT_EXIST_ON_SOURCE = ManifestBasedDatasetFinder.CONFIG_PREFIX + ".deleteFileNotExistOnSource";
   private static final String COMMON_FILES_PARENT = ManifestBasedDatasetFinder.CONFIG_PREFIX + ".commonFilesParent";
+  private static final String PERMISSION_CACHE_TTL_SECONDS = ManifestBasedDatasetFinder.CONFIG_PREFIX + ".permission.cache.ttl.seconds";
+  private static final String DEFAULT_PERMISSION_CACHE_TTL_SECONDS = "30";
   private static final String DEFAULT_COMMON_FILES_PARENT = "/";
   private final FileSystem fs;
   private final Path manifestPath;
   private final Properties properties;
   private final boolean deleteFileThatNotExistOnSource;
   private final String commonFilesParent;
+  private final int permissionCacheTTLSeconds;
 
   public ManifestBasedDataset(final FileSystem fs, Path manifestPath, Properties properties) {
     this.fs = fs;
@@ -60,6 +66,7 @@ public class ManifestBasedDataset implements IterableCopyableDataset {
     this.properties = properties;
     this.deleteFileThatNotExistOnSource = Boolean.parseBoolean(properties.getProperty(DELETE_FILE_NOT_EXIST_ON_SOURCE, "false"));
     this.commonFilesParent = properties.getProperty(COMMON_FILES_PARENT, DEFAULT_COMMON_FILES_PARENT);
+    this.permissionCacheTTLSeconds = Integer.parseInt(properties.getProperty(PERMISSION_CACHE_TTL_SECONDS, DEFAULT_PERMISSION_CACHE_TTL_SECONDS));
   }
 
   @Override
@@ -82,25 +89,31 @@ public class ManifestBasedDataset implements IterableCopyableDataset {
     List<FileStatus> toDelete = Lists.newArrayList();
     //todo: put permission preserve logic here?
     try {
+      long startTime = System.currentTimeMillis();
       manifests = CopyManifest.getReadIterator(this.fs, this.manifestPath);
+      Cache<String, OwnerAndPermission> permissionMap = CacheBuilder.newBuilder().expireAfterAccess(permissionCacheTTLSeconds, TimeUnit.SECONDS).build();
+      int numFiles = 0;
       while (manifests.hasNext()) {
+        numFiles++;
+        CopyManifest.CopyableUnit file = manifests.next();
         //todo: We can use fileSet to partition the data in case of some softbound issue
         //todo: After partition, change this to directly return iterator so that we can save time if we meet resources limitation
-        CopyManifest.CopyableUnit file = manifests.next();
         Path fileToCopy = new Path(file.fileName);
-        if (this.fs.exists(fileToCopy)) {
+        if (fs.exists(fileToCopy)) {
           boolean existOnTarget = targetFs.exists(fileToCopy);
-          FileStatus srcFile = this.fs.getFileStatus(fileToCopy);
-          if (!existOnTarget || shouldCopy(this.fs, targetFs, srcFile, targetFs.getFileStatus(fileToCopy), configuration)) {
-            CopyableFile copyableFile =
-                CopyableFile.fromOriginAndDestination(this.fs, srcFile, fileToCopy, configuration)
+          FileStatus srcFile = fs.getFileStatus(fileToCopy);
+          OwnerAndPermission replicatedPermission = CopyableFile.resolveReplicatedOwnerAndPermission(fs, srcFile, configuration);
+          if (!existOnTarget || shouldCopy(targetFs, srcFile, targetFs.getFileStatus(fileToCopy), replicatedPermission)) {
+            CopyableFile.Builder copyableFileBuilder =
+                CopyableFile.fromOriginAndDestination(fs, srcFile, fileToCopy, configuration)
                     .fileSet(datasetURN())
                     .datasetOutputPath(fileToCopy.toString())
-                    .ancestorsOwnerAndPermission(CopyableFile
-                        .resolveReplicatedOwnerAndPermissionsRecursively(this.fs, fileToCopy.getParent(),
-                            new Path(this.commonFilesParent), configuration))
-                    .build();
-            copyableFile.setFsDatasets(this.fs, targetFs);
+                    .ancestorsOwnerAndPermission(
+                        CopyableFile.resolveReplicatedOwnerAndPermissionsRecursivelyWithCache(fs, fileToCopy.getParent(),
+                            new Path(commonFilesParent), configuration, permissionMap))
+                    .destinationOwnerAndPermission(replicatedPermission);
+            CopyableFile copyableFile = copyableFileBuilder.build();
+            copyableFile.setFsDatasets(fs, targetFs);
             copyEntities.add(copyableFile);
             if (existOnTarget && srcFile.isFile()) {
               // this is to match the existing publishing behavior where we won't rewrite the target when it's already existed
@@ -108,7 +121,7 @@ public class ManifestBasedDataset implements IterableCopyableDataset {
               toDelete.add(targetFs.getFileStatus(fileToCopy));
             }
           }
-        } else if (this.deleteFileThatNotExistOnSource && targetFs.exists(fileToCopy)){
+        } else if (deleteFileThatNotExistOnSource && targetFs.exists(fileToCopy)) {
           toDelete.add(targetFs.getFileStatus(fileToCopy));
         }
       }
@@ -117,6 +130,7 @@ public class ManifestBasedDataset implements IterableCopyableDataset {
         CommitStep step = new DeleteFileCommitStep(targetFs, toDelete, this.properties, Optional.<Path>absent());
         copyEntities.add(new PrePublishStep(datasetURN(), Maps.newHashMap(), step, 1));
       }
+      log.info(String.format("Workunits calculation took %s milliseconds to process %s files", System.currentTimeMillis() - startTime, numFiles));
     } catch (JsonIOException| JsonSyntaxException e) {
       //todo: update error message to point to a sample json file instead of schema which is hard to understand
       log.warn(String.format("Failed to read Manifest path %s on filesystem %s, please make sure it's in correct json format with schema"
@@ -134,11 +148,10 @@ public class ManifestBasedDataset implements IterableCopyableDataset {
     return Collections.singleton(new FileSet.Builder<>(datasetURN(), this).add(copyEntities).build()).iterator();
   }
 
-  private static boolean shouldCopy(FileSystem srcFs, FileSystem targetFs, FileStatus fileInSource, FileStatus fileInTarget, CopyConfiguration copyConfiguration)
+  private static boolean shouldCopy(FileSystem targetFs, FileStatus fileInSource, FileStatus fileInTarget, OwnerAndPermission replicatedPermission)
       throws IOException {
     if (fileInSource.isDirectory() || fileInSource.getModificationTime() == fileInTarget.getModificationTime()) {
       // if source is dir or source and dst has same version, we compare the permission to determine whether it needs another sync
-      OwnerAndPermission replicatedPermission = CopyableFile.resolveReplicatedOwnerAndPermission(srcFs, fileInSource, copyConfiguration);
       return !replicatedPermission.hasSameOwnerAndPermission(targetFs, fileInTarget);
     }
     return fileInSource.getModificationTime() > fileInTarget.getModificationTime();