You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by zi...@apache.org on 2023/03/13 17:04:39 UTC
[gobblin] branch master updated: [GOBBLIN-1795] Make Manifest based copy to support facl (#3654)
This is an automated email from the ASF dual-hosted git repository.
zihanli58 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new be6d46c2a [GOBBLIN-1795] Make Manifest based copy to support facl (#3654)
be6d46c2a is described below
commit be6d46c2a117b1248d88d22cc64db5a12b802d16
Author: Zihan Li <zi...@linkedin.com>
AuthorDate: Mon Mar 13 10:04:30 2023 -0700
[GOBBLIN-1795] Make Manifest based copy to support facl (#3654)
* address comments
* use connectionmanager when httpclient is not closeable
* [GOBBLIN-1795] Make Manifest based copy to support facl
* fix code style
---------
Co-authored-by: Zihan Li <zi...@zihli-mn2.linkedin.biz>
---
.../gobblin/data/management/copy/ManifestBasedDataset.java | 12 ++++++++----
.../gobblin/data/management/copy/OwnerAndPermission.java | 9 +++++++--
.../copy/writer/FileAwareInputStreamDataWriter.java | 10 +++++++---
.../apache/gobblin/util/filesystem/FileSystemDecorator.java | 11 +++++++++++
4 files changed, 33 insertions(+), 9 deletions(-)
diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/ManifestBasedDataset.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/ManifestBasedDataset.java
index 27ef5d059..17de89458 100644
--- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/ManifestBasedDataset.java
+++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/ManifestBasedDataset.java
@@ -46,16 +46,20 @@ import org.apache.hadoop.fs.Path;
public class ManifestBasedDataset implements IterableCopyableDataset {
private static final String DELETE_FILE_NOT_EXIST_ON_SOURCE = ManifestBasedDatasetFinder.CONFIG_PREFIX + ".deleteFileNotExistOnSource";
+ private static final String COMMON_FILES_PARENT = ManifestBasedDatasetFinder.CONFIG_PREFIX + ".commonFilesParent";
+ private static final String DEFAULT_COMMON_FILES_PARENT = "/";
private final FileSystem fs;
private final Path manifestPath;
private final Properties properties;
private final boolean deleteFileThatNotExistOnSource;
+ private final String commonFilesParent;
public ManifestBasedDataset(final FileSystem fs, Path manifestPath, Properties properties) {
this.fs = fs;
this.manifestPath = manifestPath;
this.properties = properties;
this.deleteFileThatNotExistOnSource = Boolean.parseBoolean(properties.getProperty(DELETE_FILE_NOT_EXIST_ON_SOURCE, "false"));
+ this.commonFilesParent = properties.getProperty(COMMON_FILES_PARENT, DEFAULT_COMMON_FILES_PARENT);
}
@Override
@@ -87,14 +91,14 @@ public class ManifestBasedDataset implements IterableCopyableDataset {
if (this.fs.exists(fileToCopy)) {
boolean existOnTarget = targetFs.exists(fileToCopy);
FileStatus srcFile = this.fs.getFileStatus(fileToCopy);
- if (!existOnTarget || shouldCopy(this.fs, srcFile, targetFs.getFileStatus(fileToCopy), configuration)) {
+ if (!existOnTarget || shouldCopy(this.fs, targetFs, srcFile, targetFs.getFileStatus(fileToCopy), configuration)) {
CopyableFile copyableFile =
CopyableFile.fromOriginAndDestination(this.fs, srcFile, fileToCopy, configuration)
.fileSet(datasetURN())
.datasetOutputPath(fileToCopy.toString())
.ancestorsOwnerAndPermission(CopyableFile
.resolveReplicatedOwnerAndPermissionsRecursively(this.fs, fileToCopy.getParent(),
- new Path("/"), configuration))
+ new Path(this.commonFilesParent), configuration))
.build();
copyableFile.setFsDatasets(this.fs, targetFs);
copyEntities.add(copyableFile);
@@ -130,12 +134,12 @@ public class ManifestBasedDataset implements IterableCopyableDataset {
return Collections.singleton(new FileSet.Builder<>(datasetURN(), this).add(copyEntities).build()).iterator();
}
- private static boolean shouldCopy(FileSystem srcFs, FileStatus fileInSource, FileStatus fileInTarget, CopyConfiguration copyConfiguration)
+ private static boolean shouldCopy(FileSystem srcFs, FileSystem targetFs, FileStatus fileInSource, FileStatus fileInTarget, CopyConfiguration copyConfiguration)
throws IOException {
if (fileInSource.isDirectory() || fileInSource.getModificationTime() == fileInTarget.getModificationTime()) {
// if source is dir or source and dst has same version, we compare the permission to determine whether it needs another sync
OwnerAndPermission replicatedPermission = CopyableFile.resolveReplicatedOwnerAndPermission(srcFs, fileInSource, copyConfiguration);
- return !replicatedPermission.hasSameOwnerAndPermission(fileInTarget);
+ return !replicatedPermission.hasSameOwnerAndPermission(targetFs, fileInTarget);
}
return fileInSource.getModificationTime() > fileInTarget.getModificationTime();
}
diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/OwnerAndPermission.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/OwnerAndPermission.java
index f6b5a1370..03251a373 100644
--- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/OwnerAndPermission.java
+++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/OwnerAndPermission.java
@@ -27,6 +27,7 @@ import java.io.DataOutput;
import java.io.IOException;
import java.util.List;
import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.permission.AclEntry;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.io.Text;
@@ -82,8 +83,8 @@ public class OwnerAndPermission implements Writable {
* @param file the file status that need to be evaluated
* @return true if the metadata for the file match the current owner and permission
*/
- public boolean hasSameOwnerAndPermission(FileStatus file) {
- return this.hasSameFSPermission(file) && this.hasSameGroup(file) && this.hasSameOwner(file);
+ public boolean hasSameOwnerAndPermission(FileSystem fs, FileStatus file) throws IOException {
+ return this.hasSameFSPermission(file) && this.hasSameGroup(file) && this.hasSameOwner(file) && this.hasSameAcls(fs.getAclStatus(file.getPath()).getEntries());
}
private boolean hasSameGroup(FileStatus file) {
@@ -97,4 +98,8 @@ public class OwnerAndPermission implements Writable {
private boolean hasSameFSPermission(FileStatus file) {
return this.fsPermission == null || file.getPermission().equals(this.fsPermission);
}
+
+ private boolean hasSameAcls(List<AclEntry> acls) {
+ return this.aclEntries.isEmpty() || acls.equals(aclEntries);
+ }
}
diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/writer/FileAwareInputStreamDataWriter.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/writer/FileAwareInputStreamDataWriter.java
index 3ae5c3c92..ed6c754d5 100644
--- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/writer/FileAwareInputStreamDataWriter.java
+++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/writer/FileAwareInputStreamDataWriter.java
@@ -511,9 +511,13 @@ public class FileAwareInputStreamDataWriter extends InstrumentedDataWriter<FileA
String group = ownerAndPermission.getGroup();
String owner = ownerAndPermission.getOwner();
List<AclEntry> aclEntries = ownerAndPermission.getAclEntries();
- if (group != null || owner != null) {
- log.debug("Applying owner {} and group {} to path {}.", owner, group, path);
- fs.setOwner(path, owner, group);
+ try {
+ if (group != null || owner != null) {
+ log.debug("Applying owner {} and group {} to path {}.", owner, group, path);
+ fs.setOwner(path, owner, group);
+ }
+ } catch (IOException ioe) {
+ log.warn("Failed to set owner and/or group for path " + path + " to " + owner + ":" + group, ioe);
}
if (!aclEntries.isEmpty()) {
fs.setAcl(path, aclEntries);
diff --git a/gobblin-utility/src/main/java/org/apache/gobblin/util/filesystem/FileSystemDecorator.java b/gobblin-utility/src/main/java/org/apache/gobblin/util/filesystem/FileSystemDecorator.java
index d8b07b6bb..6ae706baa 100644
--- a/gobblin-utility/src/main/java/org/apache/gobblin/util/filesystem/FileSystemDecorator.java
+++ b/gobblin-utility/src/main/java/org/apache/gobblin/util/filesystem/FileSystemDecorator.java
@@ -21,6 +21,7 @@ import java.io.IOException;
import java.net.URI;
import java.util.EnumSet;
+import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.ContentSummary;
@@ -37,6 +38,8 @@ import org.apache.hadoop.fs.Options;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.hadoop.fs.permission.AclEntry;
+import org.apache.hadoop.fs.permission.AclStatus;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.token.Token;
@@ -100,6 +103,14 @@ class FileSystemDecorator extends FileSystem implements Decorator {
this.underlyingScheme, this.replacementScheme);
}
+ public AclStatus getAclStatus(Path path) throws IOException {
+ return this.underlyingFs.getAclStatus(path);
+ }
+
+ public void setAcl(Path path, List<AclEntry> aclSpec) throws IOException {
+ this.underlyingFs.setAcl(path, aclSpec);
+ }
+
public FsStatus getStatus() throws java.io.IOException {
return this.underlyingFs.getStatus();
}