You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by zi...@apache.org on 2023/05/03 21:17:38 UTC
[gobblin] branch master updated: [GOBBLIN-1824]Improving the Efficiency of Work Planning in Manifest-Based DistCp Jobs (#3686)
This is an automated email from the ASF dual-hosted git repository.
zihanli58 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 7bbf6761c [GOBBLIN-1824]Improving the Efficiency of Work Planning in Manifest-Based DistCp Jobs (#3686)
7bbf6761c is described below
commit 7bbf6761c63055eb9ecc9f2756d4b3d68b7a1b08
Author: Zihan Li <zi...@linkedin.com>
AuthorDate: Wed May 3 14:17:31 2023 -0700
[GOBBLIN-1824]Improving the Efficiency of Work Planning in Manifest-Based DistCp Jobs (#3686)
* address comments
* use the connection manager when the HTTP client is not closeable
* [GOBBLIN-1810] Support general iceberg catalog in icebergMetadataWriter
* [GOBBLIN-1824]Improving the Efficiency of Work Planning in Manifest-Based DistCp Jobs
* remove unintended change
* address comments and remove the multithreading, since it does not improve performance much
* address comments
* make cache TTL configurable
* add comments to describe the difference between resolveReplicatedOwnerAndPermissionsRecursivelyWithCache and resolveReplicatedOwnerAndPermissionsRecursively
---------
Co-authored-by: Zihan Li <zi...@zihli-mn2.linkedin.biz>
---
.../gobblin/data/management/copy/CopyableFile.java | 34 ++++++++++++++++++
.../data/management/copy/ManifestBasedDataset.java | 41 ++++++++++++++--------
2 files changed, 61 insertions(+), 14 deletions(-)
diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopyableFile.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopyableFile.java
index 23c27b61d..85fa80f0f 100644
--- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopyableFile.java
+++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopyableFile.java
@@ -17,10 +17,13 @@
package org.apache.gobblin.data.management.copy;
+import com.google.common.cache.Cache;
import java.io.IOException;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.ExecutionException;
+import lombok.extern.slf4j.Slf4j;
import org.apache.hadoop.fs.FileChecksum;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
@@ -58,6 +61,7 @@ import org.apache.gobblin.util.guid.Guid;
@Setter
@NoArgsConstructor(access = AccessLevel.PROTECTED)
@EqualsAndHashCode(callSuper = true)
+@Slf4j
public class CopyableFile extends CopyEntity implements File {
private static final byte[] EMPTY_CHECKSUM = new byte[0];
@@ -375,6 +379,36 @@ public class CopyableFile extends CopyEntity implements File {
return ownerAndPermissions;
}
+ /**
+ * Compute the {@link OwnerAndPermission} obtained by replicating the source owner and permissions and applying the {@link PreserveAttributes} rules, for fromPath and every ancestor up to but excluding toPath.
+ * Unlike resolveReplicatedOwnerAndPermissionsRecursively(), this method uses permissionMap as a cache to minimize calls to the source FileSystem; prefer it when resolving permissions for many files that share the same ancestors.
+ * Cache keys are only {@code Path#toString()} (sourceFs and copyConfiguration are not part of the key), so a given permissionMap should not be shared across different source file systems or copy configurations.
+ * On a cache miss the entry is computed with resolveReplicatedOwnerAndPermission(sourceFs, path, copyConfiguration) and stored in permissionMap.
+ *
+ * @return A list of the computed {@link OwnerAndPermission}s starting from fromPath, up to but excluding toPath (empty when fromPath equals toPath).
+ * @throws IOException if toPath is not an ancestor of fromPath.
+ * @throws ExecutionException if computing an uncached entry throws a checked exception (Guava {@link Cache#get(Object, java.util.concurrent.Callable)} wraps it).
+ */
+ public static List<OwnerAndPermission> resolveReplicatedOwnerAndPermissionsRecursivelyWithCache(FileSystem sourceFs, Path fromPath,
+ Path toPath, CopyConfiguration copyConfiguration, Cache<String, OwnerAndPermission> permissionMap)
+ throws IOException, ExecutionException {
+ // Reject inputs where walking up from fromPath would never reach toPath.
+ if (!PathUtils.isAncestor(toPath, fromPath)) {
+ throw new IOException(String.format("toPath %s must be an ancestor of fromPath %s.", toPath, fromPath));
+ }
+
+ List<OwnerAndPermission> ownerAndPermissions = Lists.newArrayList();
+ Path currentPath = fromPath;
+ // Walk upward from fromPath; stop once the parent is no longer under toPath, so toPath itself is never added.
+ while (currentPath.getParent() != null && PathUtils.isAncestor(toPath, currentPath.getParent())) {
+ Path finalCurrentPath = currentPath; // effectively-final copy so the loader lambda can capture it
+ ownerAndPermissions.add(permissionMap.get(finalCurrentPath.toString(), () -> resolveReplicatedOwnerAndPermission(sourceFs,
+ finalCurrentPath, copyConfiguration))); // loader runs only when the path is not already cached
+ currentPath = currentPath.getParent();
+ }
+
+ return ownerAndPermissions;
+ }
+
public static Map<String, OwnerAndPermission> resolveReplicatedAncestorOwnerAndPermissionsRecursively(FileSystem sourceFs, Path fromPath,
Path toPath, CopyConfiguration copyConfiguration) throws IOException {
Preconditions.checkArgument(sourceFs.getFileStatus(fromPath).isDirectory(), "Source path must be a directory.");
diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/ManifestBasedDataset.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/ManifestBasedDataset.java
index 17de89458..fbb88640b 100644
--- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/ManifestBasedDataset.java
+++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/ManifestBasedDataset.java
@@ -18,6 +18,8 @@
package org.apache.gobblin.data.management.copy;
import com.google.common.base.Optional;
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.gson.JsonIOException;
@@ -27,6 +29,7 @@ import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Properties;
+import java.util.concurrent.TimeUnit;
import lombok.extern.slf4j.Slf4j;
import org.apache.gobblin.commit.CommitStep;
import org.apache.gobblin.data.management.copy.entities.PrePublishStep;
@@ -47,12 +50,15 @@ public class ManifestBasedDataset implements IterableCopyableDataset {
private static final String DELETE_FILE_NOT_EXIST_ON_SOURCE = ManifestBasedDatasetFinder.CONFIG_PREFIX + ".deleteFileNotExistOnSource";
private static final String COMMON_FILES_PARENT = ManifestBasedDatasetFinder.CONFIG_PREFIX + ".commonFilesParent";
+ private static final String PERMISSION_CACHE_TTL_SECONDS = ManifestBasedDatasetFinder.CONFIG_PREFIX + ".permission.cache.ttl.seconds";
+ private static final String DEFAULT_PERMISSION_CACHE_TTL_SECONDS = "30";
private static final String DEFAULT_COMMON_FILES_PARENT = "/";
private final FileSystem fs;
private final Path manifestPath;
private final Properties properties;
private final boolean deleteFileThatNotExistOnSource;
private final String commonFilesParent;
+ private final int permissionCacheTTLSeconds;
public ManifestBasedDataset(final FileSystem fs, Path manifestPath, Properties properties) {
this.fs = fs;
@@ -60,6 +66,7 @@ public class ManifestBasedDataset implements IterableCopyableDataset {
this.properties = properties;
this.deleteFileThatNotExistOnSource = Boolean.parseBoolean(properties.getProperty(DELETE_FILE_NOT_EXIST_ON_SOURCE, "false"));
this.commonFilesParent = properties.getProperty(COMMON_FILES_PARENT, DEFAULT_COMMON_FILES_PARENT);
+ this.permissionCacheTTLSeconds = Integer.parseInt(properties.getProperty(PERMISSION_CACHE_TTL_SECONDS, DEFAULT_PERMISSION_CACHE_TTL_SECONDS));
}
@Override
@@ -82,25 +89,31 @@ public class ManifestBasedDataset implements IterableCopyableDataset {
List<FileStatus> toDelete = Lists.newArrayList();
//todo: put permission preserve logic here?
try {
+ long startTime = System.currentTimeMillis();
manifests = CopyManifest.getReadIterator(this.fs, this.manifestPath);
+ Cache<String, OwnerAndPermission> permissionMap = CacheBuilder.newBuilder().expireAfterAccess(permissionCacheTTLSeconds, TimeUnit.SECONDS).build();
+ int numFiles = 0;
while (manifests.hasNext()) {
+ numFiles++;
+ CopyManifest.CopyableUnit file = manifests.next();
//todo: We can use fileSet to partition the data in case of some softbound issue
//todo: After partition, change this to directly return iterator so that we can save time if we meet resources limitation
- CopyManifest.CopyableUnit file = manifests.next();
Path fileToCopy = new Path(file.fileName);
- if (this.fs.exists(fileToCopy)) {
+ if (fs.exists(fileToCopy)) {
boolean existOnTarget = targetFs.exists(fileToCopy);
- FileStatus srcFile = this.fs.getFileStatus(fileToCopy);
- if (!existOnTarget || shouldCopy(this.fs, targetFs, srcFile, targetFs.getFileStatus(fileToCopy), configuration)) {
- CopyableFile copyableFile =
- CopyableFile.fromOriginAndDestination(this.fs, srcFile, fileToCopy, configuration)
+ FileStatus srcFile = fs.getFileStatus(fileToCopy);
+ OwnerAndPermission replicatedPermission = CopyableFile.resolveReplicatedOwnerAndPermission(fs, srcFile, configuration);
+ if (!existOnTarget || shouldCopy(targetFs, srcFile, targetFs.getFileStatus(fileToCopy), replicatedPermission)) {
+ CopyableFile.Builder copyableFileBuilder =
+ CopyableFile.fromOriginAndDestination(fs, srcFile, fileToCopy, configuration)
.fileSet(datasetURN())
.datasetOutputPath(fileToCopy.toString())
- .ancestorsOwnerAndPermission(CopyableFile
- .resolveReplicatedOwnerAndPermissionsRecursively(this.fs, fileToCopy.getParent(),
- new Path(this.commonFilesParent), configuration))
- .build();
- copyableFile.setFsDatasets(this.fs, targetFs);
+ .ancestorsOwnerAndPermission(
+ CopyableFile.resolveReplicatedOwnerAndPermissionsRecursivelyWithCache(fs, fileToCopy.getParent(),
+ new Path(commonFilesParent), configuration, permissionMap))
+ .destinationOwnerAndPermission(replicatedPermission);
+ CopyableFile copyableFile = copyableFileBuilder.build();
+ copyableFile.setFsDatasets(fs, targetFs);
copyEntities.add(copyableFile);
if (existOnTarget && srcFile.isFile()) {
// this is to match the existing publishing behavior where we won't rewrite the target when it's already existed
@@ -108,7 +121,7 @@ public class ManifestBasedDataset implements IterableCopyableDataset {
toDelete.add(targetFs.getFileStatus(fileToCopy));
}
}
- } else if (this.deleteFileThatNotExistOnSource && targetFs.exists(fileToCopy)){
+ } else if (deleteFileThatNotExistOnSource && targetFs.exists(fileToCopy)) {
toDelete.add(targetFs.getFileStatus(fileToCopy));
}
}
@@ -117,6 +130,7 @@ public class ManifestBasedDataset implements IterableCopyableDataset {
CommitStep step = new DeleteFileCommitStep(targetFs, toDelete, this.properties, Optional.<Path>absent());
copyEntities.add(new PrePublishStep(datasetURN(), Maps.newHashMap(), step, 1));
}
+ log.info(String.format("Workunits calculation took %s milliseconds to process %s files", System.currentTimeMillis() - startTime, numFiles));
} catch (JsonIOException| JsonSyntaxException e) {
//todo: update error message to point to a sample json file instead of schema which is hard to understand
log.warn(String.format("Failed to read Manifest path %s on filesystem %s, please make sure it's in correct json format with schema"
@@ -134,11 +148,10 @@ public class ManifestBasedDataset implements IterableCopyableDataset {
return Collections.singleton(new FileSet.Builder<>(datasetURN(), this).add(copyEntities).build()).iterator();
}
- private static boolean shouldCopy(FileSystem srcFs, FileSystem targetFs, FileStatus fileInSource, FileStatus fileInTarget, CopyConfiguration copyConfiguration)
+ private static boolean shouldCopy(FileSystem targetFs, FileStatus fileInSource, FileStatus fileInTarget, OwnerAndPermission replicatedPermission)
throws IOException {
if (fileInSource.isDirectory() || fileInSource.getModificationTime() == fileInTarget.getModificationTime()) {
// if source is dir or source and dst has same version, we compare the permission to determine whether it needs another sync
- OwnerAndPermission replicatedPermission = CopyableFile.resolveReplicatedOwnerAndPermission(srcFs, fileInSource, copyConfiguration);
return !replicatedPermission.hasSameOwnerAndPermission(targetFs, fileInTarget);
}
return fileInSource.getModificationTime() > fileInTarget.getModificationTime();