You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by zi...@apache.org on 2023/03/13 17:04:39 UTC

[gobblin] branch master updated: [GOBBLIN-1795] Make Manifest based copy to support facl (#3654)

This is an automated email from the ASF dual-hosted git repository.

zihanli58 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new be6d46c2a [GOBBLIN-1795] Make Manifest based copy to support facl (#3654)
be6d46c2a is described below

commit be6d46c2a117b1248d88d22cc64db5a12b802d16
Author: Zihan Li <zi...@linkedin.com>
AuthorDate: Mon Mar 13 10:04:30 2023 -0700

    [GOBBLIN-1795] Make Manifest based copy to support facl (#3654)
    
    * address comments
    
    * use connectionmanager when httpclient is not closeable
    
    * [GOBBLIN-1795] Make Manifest based copy to support facl
    
    * fix code style
    
    ---------
    
    Co-authored-by: Zihan Li <zi...@zihli-mn2.linkedin.biz>
---
 .../gobblin/data/management/copy/ManifestBasedDataset.java   | 12 ++++++++----
 .../gobblin/data/management/copy/OwnerAndPermission.java     |  9 +++++++--
 .../copy/writer/FileAwareInputStreamDataWriter.java          | 10 +++++++---
 .../apache/gobblin/util/filesystem/FileSystemDecorator.java  | 11 +++++++++++
 4 files changed, 33 insertions(+), 9 deletions(-)

diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/ManifestBasedDataset.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/ManifestBasedDataset.java
index 27ef5d059..17de89458 100644
--- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/ManifestBasedDataset.java
+++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/ManifestBasedDataset.java
@@ -46,16 +46,20 @@ import org.apache.hadoop.fs.Path;
 public class ManifestBasedDataset implements IterableCopyableDataset {
 
   private static final String DELETE_FILE_NOT_EXIST_ON_SOURCE = ManifestBasedDatasetFinder.CONFIG_PREFIX + ".deleteFileNotExistOnSource";
+  private static final String COMMON_FILES_PARENT = ManifestBasedDatasetFinder.CONFIG_PREFIX + ".commonFilesParent";
+  private static final String DEFAULT_COMMON_FILES_PARENT = "/";
   private final FileSystem fs;
   private final Path manifestPath;
   private final Properties properties;
   private final boolean deleteFileThatNotExistOnSource;
+  private final String commonFilesParent;
 
   public ManifestBasedDataset(final FileSystem fs, Path manifestPath, Properties properties) {
     this.fs = fs;
     this.manifestPath = manifestPath;
     this.properties = properties;
     this.deleteFileThatNotExistOnSource = Boolean.parseBoolean(properties.getProperty(DELETE_FILE_NOT_EXIST_ON_SOURCE, "false"));
+    this.commonFilesParent = properties.getProperty(COMMON_FILES_PARENT, DEFAULT_COMMON_FILES_PARENT);
   }
 
   @Override
@@ -87,14 +91,14 @@ public class ManifestBasedDataset implements IterableCopyableDataset {
         if (this.fs.exists(fileToCopy)) {
           boolean existOnTarget = targetFs.exists(fileToCopy);
           FileStatus srcFile = this.fs.getFileStatus(fileToCopy);
-          if (!existOnTarget || shouldCopy(this.fs, srcFile, targetFs.getFileStatus(fileToCopy), configuration)) {
+          if (!existOnTarget || shouldCopy(this.fs, targetFs, srcFile, targetFs.getFileStatus(fileToCopy), configuration)) {
             CopyableFile copyableFile =
                 CopyableFile.fromOriginAndDestination(this.fs, srcFile, fileToCopy, configuration)
                     .fileSet(datasetURN())
                     .datasetOutputPath(fileToCopy.toString())
                     .ancestorsOwnerAndPermission(CopyableFile
                         .resolveReplicatedOwnerAndPermissionsRecursively(this.fs, fileToCopy.getParent(),
-                            new Path("/"), configuration))
+                            new Path(this.commonFilesParent), configuration))
                     .build();
             copyableFile.setFsDatasets(this.fs, targetFs);
             copyEntities.add(copyableFile);
@@ -130,12 +134,12 @@ public class ManifestBasedDataset implements IterableCopyableDataset {
     return Collections.singleton(new FileSet.Builder<>(datasetURN(), this).add(copyEntities).build()).iterator();
   }
 
-  private static boolean shouldCopy(FileSystem srcFs, FileStatus fileInSource, FileStatus fileInTarget, CopyConfiguration copyConfiguration)
+  private static boolean shouldCopy(FileSystem srcFs, FileSystem targetFs, FileStatus fileInSource, FileStatus fileInTarget, CopyConfiguration copyConfiguration)
       throws IOException {
     if (fileInSource.isDirectory() || fileInSource.getModificationTime() == fileInTarget.getModificationTime()) {
       // if source is dir or source and dst has same version, we compare the permission to determine whether it needs another sync
       OwnerAndPermission replicatedPermission = CopyableFile.resolveReplicatedOwnerAndPermission(srcFs, fileInSource, copyConfiguration);
-      return !replicatedPermission.hasSameOwnerAndPermission(fileInTarget);
+      return !replicatedPermission.hasSameOwnerAndPermission(targetFs, fileInTarget);
     }
     return fileInSource.getModificationTime() > fileInTarget.getModificationTime();
   }
diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/OwnerAndPermission.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/OwnerAndPermission.java
index f6b5a1370..03251a373 100644
--- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/OwnerAndPermission.java
+++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/OwnerAndPermission.java
@@ -27,6 +27,7 @@ import java.io.DataOutput;
 import java.io.IOException;
 import java.util.List;
 import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.permission.AclEntry;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.io.Text;
@@ -82,8 +83,8 @@ public class OwnerAndPermission implements Writable {
    * @param file the file status that need to be evaluated
    * @return true if the metadata for the file match the current owner and permission
    */
-  public boolean hasSameOwnerAndPermission(FileStatus file) {
-    return this.hasSameFSPermission(file) && this.hasSameGroup(file) && this.hasSameOwner(file);
+  public boolean hasSameOwnerAndPermission(FileSystem fs, FileStatus file) throws IOException {
+    return this.hasSameFSPermission(file) && this.hasSameGroup(file) && this.hasSameOwner(file) && this.hasSameAcls(fs.getAclStatus(file.getPath()).getEntries());
   }
 
   private boolean hasSameGroup(FileStatus file) {
@@ -97,4 +98,8 @@ public class OwnerAndPermission implements Writable {
   private boolean hasSameFSPermission(FileStatus file) {
     return this.fsPermission == null || file.getPermission().equals(this.fsPermission);
   }
+
+  private boolean hasSameAcls(List<AclEntry> acls) {
+    return this.aclEntries.isEmpty() || acls.equals(aclEntries);
+  }
 }
diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/writer/FileAwareInputStreamDataWriter.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/writer/FileAwareInputStreamDataWriter.java
index 3ae5c3c92..ed6c754d5 100644
--- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/writer/FileAwareInputStreamDataWriter.java
+++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/writer/FileAwareInputStreamDataWriter.java
@@ -511,9 +511,13 @@ public class FileAwareInputStreamDataWriter extends InstrumentedDataWriter<FileA
       String group = ownerAndPermission.getGroup();
       String owner = ownerAndPermission.getOwner();
       List<AclEntry> aclEntries = ownerAndPermission.getAclEntries();
-      if (group != null || owner != null) {
-        log.debug("Applying owner {} and group {} to path {}.", owner, group, path);
-        fs.setOwner(path, owner, group);
+      try {
+        if (group != null || owner != null) {
+          log.debug("Applying owner {} and group {} to path {}.", owner, group, path);
+          fs.setOwner(path, owner, group);
+        }
+      } catch (IOException ioe) {
+        log.warn("Failed to set owner and/or group for path " + path + " to " + owner + ":" + group, ioe);
       }
       if (!aclEntries.isEmpty()) {
         fs.setAcl(path, aclEntries);
diff --git a/gobblin-utility/src/main/java/org/apache/gobblin/util/filesystem/FileSystemDecorator.java b/gobblin-utility/src/main/java/org/apache/gobblin/util/filesystem/FileSystemDecorator.java
index d8b07b6bb..6ae706baa 100644
--- a/gobblin-utility/src/main/java/org/apache/gobblin/util/filesystem/FileSystemDecorator.java
+++ b/gobblin-utility/src/main/java/org/apache/gobblin/util/filesystem/FileSystemDecorator.java
@@ -21,6 +21,7 @@ import java.io.IOException;
 import java.net.URI;
 import java.util.EnumSet;
 
+import java.util.List;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.BlockLocation;
 import org.apache.hadoop.fs.ContentSummary;
@@ -37,6 +38,8 @@ import org.apache.hadoop.fs.Options;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.PathFilter;
 import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.hadoop.fs.permission.AclEntry;
+import org.apache.hadoop.fs.permission.AclStatus;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.security.token.Token;
@@ -100,6 +103,14 @@ class FileSystemDecorator extends FileSystem implements Decorator {
         this.underlyingScheme, this.replacementScheme);
   }
 
+  public AclStatus getAclStatus(Path path) throws IOException {
+    return this.underlyingFs.getAclStatus(path);
+  }
+
+  public void setAcl(Path path, List<AclEntry> aclSpec) throws IOException {
+    this.underlyingFs.setAcl(path, aclSpec);
+  }
+
   public FsStatus getStatus() throws java.io.IOException {
     return this.underlyingFs.getStatus();
   }