Posted to dev@gobblin.apache.org by "ZihanLi58 (via GitHub)" <gi...@apache.org> on 2023/04/22 00:06:44 UTC

[GitHub] [gobblin] ZihanLi58 opened a new pull request, #3686: [GOBBLIN-1824]Improving the Efficiency of Work Planning in Manifest-Based DistCp Jobs

ZihanLi58 opened a new pull request, #3686:
URL: https://github.com/apache/gobblin/pull/3686

   Dear Gobblin maintainers,
   
   Please accept this PR. I understand that it will not be reviewed until I have checked off all the steps below!
   
   
   ### JIRA
   - [ ] My PR addresses the following [Gobblin JIRA](https://issues.apache.org/jira/browse/GOBBLIN/) issues and references them in the PR title. For example, "[GOBBLIN-XXX] My Gobblin PR"
       - https://issues.apache.org/jira/browse/GOBBLIN-1824
   
   
   ### Description
   - [ ] Here are some details about my PR, including screenshots (if applicable):
   
   Optimizing Permission Calculation and Introducing Multithreading in Manifest-Based DistCp Work Planning
   
   
   ### Tests
   - [ ] My PR adds the following unit tests __OR__ does not need testing for this extremely good reason:
   Unit test 
   
   ### Commits
   - [ ] My commits all reference JIRA issues in their subject lines, and I have squashed multiple commits if they address the same issue. In addition, my commits follow the guidelines from "[How to write a good git commit message](http://chris.beams.io/posts/git-commit/)":
       1. Subject is separated from body by a blank line
       2. Subject is limited to 50 characters
       3. Subject does not end with a period
       4. Subject uses the imperative mood ("add", not "adding")
       5. Body wraps at 72 characters
       6. Body explains "what" and "why", not "how"
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@gobblin.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [gobblin] phet commented on a diff in pull request #3686: [GOBBLIN-1824]Improving the Efficiency of Work Planning in Manifest-Based DistCp Jobs

Posted by "phet (via GitHub)" <gi...@apache.org>.
phet commented on code in PR #3686:
URL: https://github.com/apache/gobblin/pull/3686#discussion_r1175911613


##########
gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/ManifestBasedDataset.java:
##########
@@ -78,45 +89,71 @@ public Iterator<FileSet<CopyEntity>> getFileSetIterator(FileSystem targetFs, Cop
           + "%s, you can specify multi locations split by '',", manifestPath.toString(), fs.getUri().toString(), ManifestBasedDatasetFinder.MANIFEST_LOCATION));
     }
     CopyManifest.CopyableUnitIterator manifests = null;
-    List<CopyEntity> copyEntities = Lists.newArrayList();
-    List<FileStatus> toDelete = Lists.newArrayList();
+    List<CopyEntity> copyEntities = Collections.synchronizedList(Lists.newArrayList());
+    List<FileStatus> toDelete = Collections.synchronizedList(Lists.newArrayList());
     //todo: put permission preserve logic here?
+    ExecutorService queueExecutor = null;
     try {
+      queueExecutor = Executors.newFixedThreadPool(planningThreadsPoolSize, ExecutorsUtils.newThreadFactory(Optional.of(log), Optional.of("ManifestBasedDatasetPlanningThread-%d")));
+      long startTime = System.currentTimeMillis();
       manifests = CopyManifest.getReadIterator(this.fs, this.manifestPath);
+      List<Future> pendingTask = new ArrayList<>();
+      Map<String, OwnerAndPermission> permissionMap = new ConcurrentHashMap<>();
       while (manifests.hasNext()) {
-        //todo: We can use fileSet to partition the data in case of some softbound issue
-        //todo: After partition, change this to directly return iterator so that we can save time if we meet resources limitation
         CopyManifest.CopyableUnit file = manifests.next();
-        Path fileToCopy = new Path(file.fileName);
-        if (this.fs.exists(fileToCopy)) {
-          boolean existOnTarget = targetFs.exists(fileToCopy);
-          FileStatus srcFile = this.fs.getFileStatus(fileToCopy);
-          if (!existOnTarget || shouldCopy(this.fs, targetFs, srcFile, targetFs.getFileStatus(fileToCopy), configuration)) {
-            CopyableFile copyableFile =
-                CopyableFile.fromOriginAndDestination(this.fs, srcFile, fileToCopy, configuration)
-                    .fileSet(datasetURN())
-                    .datasetOutputPath(fileToCopy.toString())
-                    .ancestorsOwnerAndPermission(CopyableFile
-                        .resolveReplicatedOwnerAndPermissionsRecursively(this.fs, fileToCopy.getParent(),
-                            new Path(this.commonFilesParent), configuration))
-                    .build();
-            copyableFile.setFsDatasets(this.fs, targetFs);
-            copyEntities.add(copyableFile);
-            if (existOnTarget && srcFile.isFile()) {
-              // this is to match the existing publishing behavior where we won't rewrite the target when it's already existed
-              // todo: Change the publish behavior to support overwrite destination file during rename, instead of relying on this delete step which is needed if we want to support task level publish
+        Future future = queueExecutor.submit(new Runnable() {
+          @Override
+          public void run() {
+            try{
+            //todo: We can use fileSet to partition the data in case of some softbound issue
+            //todo: After partition, change this to directly return iterator so that we can save time if we meet resources limitation
+            Path fileToCopy = new Path(file.fileName);
+            if (fs.exists(fileToCopy)) {
+              boolean existOnTarget = false;
+              try {
+                existOnTarget = targetFs.exists(fileToCopy);
+              } catch (IOException e) {
+                e.printStackTrace();
+              }
+              FileStatus srcFile = fs.getFileStatus(fileToCopy);
+              OwnerAndPermission replicatedPermission = CopyableFile.resolveReplicatedOwnerAndPermission(fs, srcFile, configuration);
+              if (!existOnTarget || shouldCopy(targetFs, srcFile, targetFs.getFileStatus(fileToCopy), replicatedPermission)) {
+                CopyableFile.Builder copyableFileBuilder =
+                    CopyableFile.fromOriginAndDestination(fs, srcFile, fileToCopy, configuration)
+                        .fileSet(datasetURN())
+                        .datasetOutputPath(fileToCopy.toString())
+                        .ancestorsOwnerAndPermission(CopyableFile
+                            .resolveReplicatedOwnerAndPermissionsRecursivelyWithCache(fs, fileToCopy.getParent(),
+                                new Path(commonFilesParent), configuration, permissionMap))
+                        .destinationOwnerAndPermission(replicatedPermission);
+                CopyableFile copyableFile = copyableFileBuilder.build();
+                copyableFile.setFsDatasets(fs, targetFs);
+                copyEntities.add(copyableFile);
+                if (existOnTarget && srcFile.isFile()) {
+                  // this is to match the existing publishing behavior where we won't rewrite the target when it's already existed
+                  // todo: Change the publish behavior to support overwrite destination file during rename, instead of relying on this delete step which is needed if we want to support task level publish
+                  toDelete.add(targetFs.getFileStatus(fileToCopy));
+                }
+              }
+            } else if (deleteFileThatNotExistOnSource && targetFs.exists(fileToCopy)){
               toDelete.add(targetFs.getFileStatus(fileToCopy));
             }
+          } catch (IOException e) {
+            log.error("meet exception:", e);
           }
-        } else if (this.deleteFileThatNotExistOnSource && targetFs.exists(fileToCopy)){
-          toDelete.add(targetFs.getFileStatus(fileToCopy));
-        }
+        }}
+        );
+        pendingTask.add(future);
+      }
+      for (Future f: pendingTask) {
+        f.get();
       }

Review Comment:
   NBD as presently formulated, but if you want to detect system-level issues (such as a fatal problem with the target FS) early, rather than pointlessly creating thousands of futures, you could use a `CompletionService` to poll for results in between submitting additional tasks:
   https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/ExecutorCompletionService.html
   
   Just maintain a count of how many futures you're expecting, so you know when they're all done.
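
A minimal sketch of the polling pattern suggested above, using only `java.util.concurrent`. The names `FailFastPlanner`, `EntryPlanner`, and `planAll` are hypothetical stand-ins and not part of the PR; the per-entry work is reduced to a function from `String` to `String` so the block stays self-contained.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class FailFastPlanner {
  /** Hypothetical stand-in for the per-manifest-entry planning work. */
  interface EntryPlanner {
    String plan(String entry) throws IOException;
  }

  static List<String> planAll(List<String> entries, EntryPlanner planner, int threads)
      throws IOException, InterruptedException {
    ExecutorService executor = Executors.newFixedThreadPool(threads);
    CompletionService<String> completion = new ExecutorCompletionService<>(executor);
    List<String> results = new ArrayList<>();
    int pending = 0; // how many submitted tasks have not been collected yet
    try {
      for (String entry : entries) {
        completion.submit(() -> planner.plan(entry));
        pending++;
        // Between submissions, drain whatever has already finished so that
        // an early failure stops us from submitting the rest of the manifest.
        Future<String> done;
        while ((done = completion.poll()) != null) {
          pending--;
          results.add(unwrap(done));
        }
      }
      // Everything is submitted; block until the remaining tasks complete.
      while (pending > 0) {
        Future<String> done = completion.take();
        pending--;
        results.add(unwrap(done));
      }
      return results; // note: completion order, not submission order
    } finally {
      executor.shutdownNow(); // cancels outstanding work if we bailed out early
    }
  }

  private static String unwrap(Future<String> done) throws IOException, InterruptedException {
    try {
      return done.get();
    } catch (ExecutionException e) {
      Throwable cause = e.getCause();
      if (cause instanceof IOException) {
        throw (IOException) cause;
      }
      throw new IOException("Planning task failed", cause);
    }
  }
}
```

How early a failure is noticed depends on timing: `poll()` only sees tasks that have already completed, so some further submissions may still happen before the first exception surfaces.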





[GitHub] [gobblin] ZihanLi58 commented on a diff in pull request #3686: [GOBBLIN-1824]Improving the Efficiency of Work Planning in Manifest-Based DistCp Jobs

Posted by "ZihanLi58 (via GitHub)" <gi...@apache.org>.
ZihanLi58 commented on code in PR #3686:
URL: https://github.com/apache/gobblin/pull/3686#discussion_r1179445911


##########
gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopyableFile.java:
##########
@@ -375,6 +379,35 @@ public static List<OwnerAndPermission> resolveReplicatedOwnerAndPermissionsRecur
     return ownerAndPermissions;
   }
 
+  /**
+   * Compute the correct {@link OwnerAndPermission} obtained from replicating source owner and permissions and applying
+   * the {@link PreserveAttributes} rules for fromPath and every ancestor up to but excluding toPath.
+   * Use permissionMap as a cache to reduce the call to hdfs
+   *
+   * @return A list of the computed {@link OwnerAndPermission}s starting from fromPath, up to but excluding toPath.
+   * @throws IOException if toPath is not an ancestor of fromPath.
+   */
+  public static List<OwnerAndPermission> resolveReplicatedOwnerAndPermissionsRecursivelyWithCache(FileSystem sourceFs, Path fromPath,
+      Path toPath, CopyConfiguration copyConfiguration, Cache<String, OwnerAndPermission> permissionMap)

Review Comment:
   I left the previous method in place for backward compatibility, since some other places still call it. Where a caller is not doing the calculation for a bunch of files sharing the same flow, introducing a cache is not necessary.





[GitHub] [gobblin] phet commented on a diff in pull request #3686: [GOBBLIN-1824]Improving the Efficiency of Work Planning in Manifest-Based DistCp Jobs

Posted by "phet (via GitHub)" <gi...@apache.org>.
phet commented on code in PR #3686:
URL: https://github.com/apache/gobblin/pull/3686#discussion_r1178773725


##########
gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/ManifestBasedDataset.java:
##########
@@ -117,6 +126,7 @@ public Iterator<FileSet<CopyEntity>> getFileSetIterator(FileSystem targetFs, Cop
         CommitStep step = new DeleteFileCommitStep(targetFs, toDelete, this.properties, Optional.<Path>absent());
         copyEntities.add(new PrePublishStep(datasetURN(), Maps.newHashMap(), step, 1));
       }
+      log.info(String.format("Workunits calculation take %s milliseconds to process %s files", System.currentTimeMillis() - startTime, numFiles));

Review Comment:
   nit: "take" => "took"
   
   BTW, I really like this timing! Do we want a metric in addition to the logged line?



##########
gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/ManifestBasedDataset.java:
##########
@@ -78,37 +81,43 @@ public Iterator<FileSet<CopyEntity>> getFileSetIterator(FileSystem targetFs, Cop
           + "%s, you can specify multi locations split by '',", manifestPath.toString(), fs.getUri().toString(), ManifestBasedDatasetFinder.MANIFEST_LOCATION));
     }
     CopyManifest.CopyableUnitIterator manifests = null;
-    List<CopyEntity> copyEntities = Lists.newArrayList();
-    List<FileStatus> toDelete = Lists.newArrayList();
+    List<CopyEntity> copyEntities = Collections.synchronizedList(Lists.newArrayList());
+    List<FileStatus> toDelete = Collections.synchronizedList(Lists.newArrayList());

Review Comment:
   do these need to be synchronized?



##########
gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopyableFile.java:
##########
@@ -375,6 +379,35 @@ public static List<OwnerAndPermission> resolveReplicatedOwnerAndPermissionsRecur
     return ownerAndPermissions;
   }
 
+  /**
+   * Compute the correct {@link OwnerAndPermission} obtained from replicating source owner and permissions and applying
+   * the {@link PreserveAttributes} rules for fromPath and every ancestor up to but excluding toPath.
+   * Use permissionMap as a cache to reduce the call to hdfs
+   *
+   * @return A list of the computed {@link OwnerAndPermission}s starting from fromPath, up to but excluding toPath.
+   * @throws IOException if toPath is not an ancestor of fromPath.
+   */
+  public static List<OwnerAndPermission> resolveReplicatedOwnerAndPermissionsRecursivelyWithCache(FileSystem sourceFs, Path fromPath,
+      Path toPath, CopyConfiguration copyConfiguration, Cache<String, OwnerAndPermission> permissionMap)

Review Comment:
   Why would a caller use this one rather than `resolveReplicatedAncestorOwnerAndPermissionsRecursively`? Do we need both? They're not exactly identical, but there seems to be material overlap. And why return a `Map` there but a `List` here?



##########
gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/ManifestBasedDataset.java:
##########
@@ -78,37 +81,43 @@ public Iterator<FileSet<CopyEntity>> getFileSetIterator(FileSystem targetFs, Cop
           + "%s, you can specify multi locations split by '',", manifestPath.toString(), fs.getUri().toString(), ManifestBasedDatasetFinder.MANIFEST_LOCATION));
     }
     CopyManifest.CopyableUnitIterator manifests = null;
-    List<CopyEntity> copyEntities = Lists.newArrayList();
-    List<FileStatus> toDelete = Lists.newArrayList();
+    List<CopyEntity> copyEntities = Collections.synchronizedList(Lists.newArrayList());
+    List<FileStatus> toDelete = Collections.synchronizedList(Lists.newArrayList());
     //todo: put permission preserve logic here?
     try {
+      long startTime = System.currentTimeMillis();
       manifests = CopyManifest.getReadIterator(this.fs, this.manifestPath);
+      Cache<String, OwnerAndPermission> permissionMap = CacheBuilder.newBuilder().expireAfterAccess(30, TimeUnit.SECONDS).build();

Review Comment:
   How did you arrive at a 30s TTL? That seems short; I'd think more in the neighborhood of 5-15 mins.





[GitHub] [gobblin] ZihanLi58 merged pull request #3686: [GOBBLIN-1824]Improving the Efficiency of Work Planning in Manifest-Based DistCp Jobs

Posted by "ZihanLi58 (via GitHub)" <gi...@apache.org>.
ZihanLi58 merged PR #3686:
URL: https://github.com/apache/gobblin/pull/3686




[GitHub] [gobblin] phet commented on a diff in pull request #3686: [GOBBLIN-1824]Improving the Efficiency of Work Planning in Manifest-Based DistCp Jobs

Posted by "phet (via GitHub)" <gi...@apache.org>.
phet commented on code in PR #3686:
URL: https://github.com/apache/gobblin/pull/3686#discussion_r1175884690


##########
gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/ManifestBasedDataset.java:
##########
@@ -78,45 +89,71 @@ public Iterator<FileSet<CopyEntity>> getFileSetIterator(FileSystem targetFs, Cop
           + "%s, you can specify multi locations split by '',", manifestPath.toString(), fs.getUri().toString(), ManifestBasedDatasetFinder.MANIFEST_LOCATION));
     }
     CopyManifest.CopyableUnitIterator manifests = null;
-    List<CopyEntity> copyEntities = Lists.newArrayList();
-    List<FileStatus> toDelete = Lists.newArrayList();
+    List<CopyEntity> copyEntities = Collections.synchronizedList(Lists.newArrayList());
+    List<FileStatus> toDelete = Collections.synchronizedList(Lists.newArrayList());
     //todo: put permission preserve logic here?
+    ExecutorService queueExecutor = null;
     try {
+      queueExecutor = Executors.newFixedThreadPool(planningThreadsPoolSize, ExecutorsUtils.newThreadFactory(Optional.of(log), Optional.of("ManifestBasedDatasetPlanningThread-%d")));
+      long startTime = System.currentTimeMillis();
       manifests = CopyManifest.getReadIterator(this.fs, this.manifestPath);
+      List<Future> pendingTask = new ArrayList<>();
+      Map<String, OwnerAndPermission> permissionMap = new ConcurrentHashMap<>();
       while (manifests.hasNext()) {
-        //todo: We can use fileSet to partition the data in case of some softbound issue
-        //todo: After partition, change this to directly return iterator so that we can save time if we meet resources limitation
         CopyManifest.CopyableUnit file = manifests.next();
-        Path fileToCopy = new Path(file.fileName);
-        if (this.fs.exists(fileToCopy)) {
-          boolean existOnTarget = targetFs.exists(fileToCopy);
-          FileStatus srcFile = this.fs.getFileStatus(fileToCopy);
-          if (!existOnTarget || shouldCopy(this.fs, targetFs, srcFile, targetFs.getFileStatus(fileToCopy), configuration)) {
-            CopyableFile copyableFile =
-                CopyableFile.fromOriginAndDestination(this.fs, srcFile, fileToCopy, configuration)
-                    .fileSet(datasetURN())
-                    .datasetOutputPath(fileToCopy.toString())
-                    .ancestorsOwnerAndPermission(CopyableFile
-                        .resolveReplicatedOwnerAndPermissionsRecursively(this.fs, fileToCopy.getParent(),
-                            new Path(this.commonFilesParent), configuration))
-                    .build();
-            copyableFile.setFsDatasets(this.fs, targetFs);
-            copyEntities.add(copyableFile);
-            if (existOnTarget && srcFile.isFile()) {
-              // this is to match the existing publishing behavior where we won't rewrite the target when it's already existed
-              // todo: Change the publish behavior to support overwrite destination file during rename, instead of relying on this delete step which is needed if we want to support task level publish
+        Future future = queueExecutor.submit(new Runnable() {
+          @Override
+          public void run() {
+            try{
+            //todo: We can use fileSet to partition the data in case of some softbound issue
+            //todo: After partition, change this to directly return iterator so that we can save time if we meet resources limitation
+            Path fileToCopy = new Path(file.fileName);
+            if (fs.exists(fileToCopy)) {
+              boolean existOnTarget = false;
+              try {
+                existOnTarget = targetFs.exists(fileToCopy);
+              } catch (IOException e) {
+                e.printStackTrace();
+              }

Review Comment:
   a. What sorts of `IOException` are swallowed here?
   b. Is it appropriate to print every one (e.g. rather than logging at a particular log level that could be adjusted by config)?



##########
gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/ManifestBasedDataset.java:
##########
@@ -78,45 +89,71 @@ public Iterator<FileSet<CopyEntity>> getFileSetIterator(FileSystem targetFs, Cop
           + "%s, you can specify multi locations split by '',", manifestPath.toString(), fs.getUri().toString(), ManifestBasedDatasetFinder.MANIFEST_LOCATION));
     }
     CopyManifest.CopyableUnitIterator manifests = null;
-    List<CopyEntity> copyEntities = Lists.newArrayList();
-    List<FileStatus> toDelete = Lists.newArrayList();
+    List<CopyEntity> copyEntities = Collections.synchronizedList(Lists.newArrayList());
+    List<FileStatus> toDelete = Collections.synchronizedList(Lists.newArrayList());
     //todo: put permission preserve logic here?
+    ExecutorService queueExecutor = null;
     try {
+      queueExecutor = Executors.newFixedThreadPool(planningThreadsPoolSize, ExecutorsUtils.newThreadFactory(Optional.of(log), Optional.of("ManifestBasedDatasetPlanningThread-%d")));
+      long startTime = System.currentTimeMillis();
       manifests = CopyManifest.getReadIterator(this.fs, this.manifestPath);
+      List<Future> pendingTask = new ArrayList<>();
+      Map<String, OwnerAndPermission> permissionMap = new ConcurrentHashMap<>();
       while (manifests.hasNext()) {
-        //todo: We can use fileSet to partition the data in case of some softbound issue
-        //todo: After partition, change this to directly return iterator so that we can save time if we meet resources limitation
         CopyManifest.CopyableUnit file = manifests.next();
-        Path fileToCopy = new Path(file.fileName);
-        if (this.fs.exists(fileToCopy)) {
-          boolean existOnTarget = targetFs.exists(fileToCopy);
-          FileStatus srcFile = this.fs.getFileStatus(fileToCopy);
-          if (!existOnTarget || shouldCopy(this.fs, targetFs, srcFile, targetFs.getFileStatus(fileToCopy), configuration)) {
-            CopyableFile copyableFile =
-                CopyableFile.fromOriginAndDestination(this.fs, srcFile, fileToCopy, configuration)
-                    .fileSet(datasetURN())
-                    .datasetOutputPath(fileToCopy.toString())
-                    .ancestorsOwnerAndPermission(CopyableFile
-                        .resolveReplicatedOwnerAndPermissionsRecursively(this.fs, fileToCopy.getParent(),
-                            new Path(this.commonFilesParent), configuration))
-                    .build();
-            copyableFile.setFsDatasets(this.fs, targetFs);
-            copyEntities.add(copyableFile);
-            if (existOnTarget && srcFile.isFile()) {
-              // this is to match the existing publishing behavior where we won't rewrite the target when it's already existed
-              // todo: Change the publish behavior to support overwrite destination file during rename, instead of relying on this delete step which is needed if we want to support task level publish
+        Future future = queueExecutor.submit(new Runnable() {
+          @Override
+          public void run() {
+            try{
+            //todo: We can use fileSet to partition the data in case of some softbound issue
+            //todo: After partition, change this to directly return iterator so that we can save time if we meet resources limitation
+            Path fileToCopy = new Path(file.fileName);
+            if (fs.exists(fileToCopy)) {
+              boolean existOnTarget = false;
+              try {
+                existOnTarget = targetFs.exists(fileToCopy);
+              } catch (IOException e) {
+                e.printStackTrace();
+              }
+              FileStatus srcFile = fs.getFileStatus(fileToCopy);
+              OwnerAndPermission replicatedPermission = CopyableFile.resolveReplicatedOwnerAndPermission(fs, srcFile, configuration);
+              if (!existOnTarget || shouldCopy(targetFs, srcFile, targetFs.getFileStatus(fileToCopy), replicatedPermission)) {
+                CopyableFile.Builder copyableFileBuilder =
+                    CopyableFile.fromOriginAndDestination(fs, srcFile, fileToCopy, configuration)
+                        .fileSet(datasetURN())
+                        .datasetOutputPath(fileToCopy.toString())
+                        .ancestorsOwnerAndPermission(CopyableFile
+                            .resolveReplicatedOwnerAndPermissionsRecursivelyWithCache(fs, fileToCopy.getParent(),
+                                new Path(commonFilesParent), configuration, permissionMap))
+                        .destinationOwnerAndPermission(replicatedPermission);
+                CopyableFile copyableFile = copyableFileBuilder.build();
+                copyableFile.setFsDatasets(fs, targetFs);
+                copyEntities.add(copyableFile);
+                if (existOnTarget && srcFile.isFile()) {
+                  // this is to match the existing publishing behavior where we won't rewrite the target when it's already existed
+                  // todo: Change the publish behavior to support overwrite destination file during rename, instead of relying on this delete step which is needed if we want to support task level publish
+                  toDelete.add(targetFs.getFileStatus(fileToCopy));
+                }
+              }
+            } else if (deleteFileThatNotExistOnSource && targetFs.exists(fileToCopy)){
               toDelete.add(targetFs.getFileStatus(fileToCopy));
             }
+          } catch (IOException e) {
+            log.error("meet exception:", e);

Review Comment:
   In the case of a general FS misconfiguration (e.g. an issue with the target FS), does this mean we'll now log one error message for every manifest entry? Before, with synchronous processing, the first failure made us bail out and go no further through the manifest.
   
   Even so, swallowing the exception may hide issues (e.g. from the automatic troubleshooter). Why not allow them to percolate as `java.util.concurrent.ExecutionException`s, and unwrap back to the underlying exception at a higher layer?
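
As an illustration of letting failures percolate, here is a hedged sketch in which each task is a `Callable` that is free to throw, and the caller unwraps the `ExecutionException` from `Future.get()`. `PropagatingPlanner` and `planAll` are hypothetical names, and the tasks return plain strings instead of the PR's copy entities.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class PropagatingPlanner {
  static List<String> planAll(List<Callable<String>> tasks, int threads) throws IOException {
    ExecutorService executor = Executors.newFixedThreadPool(threads);
    try {
      List<Future<String>> pending = new ArrayList<>();
      for (Callable<String> task : tasks) {
        // No try/catch inside the task: an exception is captured by the Future.
        pending.add(executor.submit(task));
      }
      List<String> planned = new ArrayList<>();
      for (Future<String> future : pending) {
        try {
          planned.add(future.get());
        } catch (ExecutionException e) {
          // Unwrap so callers see the original failure, not the wrapper.
          Throwable cause = e.getCause();
          if (cause instanceof IOException) {
            throw (IOException) cause;
          }
          throw new IOException("Planning task failed", cause);
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt(); // preserve the interrupt flag
          throw new IOException("Interrupted while planning", e);
        }
      }
      return planned; // submission order, because futures are read in order
    } finally {
      executor.shutdownNow(); // stop remaining tasks after the first failure
    }
  }
}
```

With this shape the first failing entry aborts planning with a single exception, instead of producing one logged error per manifest entry.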



##########
gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopyableFile.java:
##########
@@ -375,6 +377,35 @@ public static List<OwnerAndPermission> resolveReplicatedOwnerAndPermissionsRecur
     return ownerAndPermissions;
   }
 
+  /**
+   * Compute the correct {@link OwnerAndPermission} obtained from replicating source owner and permissions and applying
+   * the {@link PreserveAttributes} rules for fromPath and every ancestor up to but excluding toPath.
+   * Use permissionMap as a cache to reduce the call to hdfs
+   *
+   * @return A list of the computed {@link OwnerAndPermission}s starting from fromPath, up to but excluding toPath.
+   * @throws IOException if toPath is not an ancestor of fromPath.
+   */
+  public static List<OwnerAndPermission> resolveReplicatedOwnerAndPermissionsRecursivelyWithCache(FileSystem sourceFs, Path fromPath,
+      Path toPath, CopyConfiguration copyConfiguration, Map<String, OwnerAndPermission> permissionMap) throws IOException {
+
+    if (!PathUtils.isAncestor(toPath, fromPath)) {
+      throw new IOException(String.format("toPath %s must be an ancestor of fromPath %s.", toPath, fromPath));
+    }
+
+    List<OwnerAndPermission> ownerAndPermissions = Lists.newArrayList();
+    Path currentPath = fromPath;
+
+    while (currentPath.getParent() != null && PathUtils.isAncestor(toPath, currentPath.getParent())) {
+      if (!permissionMap.containsKey(currentPath.toString())) {
+        permissionMap.put(currentPath.toString(), resolveReplicatedOwnerAndPermission(sourceFs, currentPath, copyConfiguration));

Review Comment:
   A Guava `LoadingCache<K, V>` would better encapsulate the memoization here.
   
   E.g. have the caller implement `CacheLoader<K, V>.load` and pass in a `LoadingCache`:
   https://guava.dev/releases/20.0/api/docs/com/google/common/cache/CacheLoader.html#load-K-
   
   That would mean reworking this static method to take a `Function<K, V>`. Even better, however, would be to create a class for `OwnerAndPermission` resolution, constructed with the `FileSystem` and `CopyConfiguration`, so that they stay unchanged throughout.
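
A rough sketch of the resolver-class idea, under several stated assumptions. To keep it dependency-free it swaps Guava's `LoadingCache` for `ConcurrentHashMap.computeIfAbsent`, which gives similar load-on-miss behavior but no expiry. `CachingAncestorResolver` and `resolveUpTo` are hypothetical names, paths are plain slash-separated strings with no trailing slash (and `toPath` is not the root), and the loader is a `Function` that cannot throw checked exceptions, whereas Guava's `CacheLoader.load` can.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.Function;

class CachingAncestorResolver<V> {
  // Fixed at construction, so every lookup uses the same resolution logic.
  // In the real code this is where the FileSystem and CopyConfiguration
  // would be captured (assumption: here they are folded into the loader).
  private final Function<String, V> loader;
  private final ConcurrentMap<String, V> cache = new ConcurrentHashMap<>();

  CachingAncestorResolver(Function<String, V> loader) {
    this.loader = loader;
  }

  /** Resolves fromPath and every ancestor, up to but excluding toPath. */
  List<V> resolveUpTo(String fromPath, String toPath) {
    if (!fromPath.startsWith(toPath + "/")) {
      throw new IllegalArgumentException(
          String.format("toPath %s must be an ancestor of fromPath %s.", toPath, fromPath));
    }
    List<V> resolved = new ArrayList<>();
    String current = fromPath;
    while (!current.equals(toPath)) {
      // Load on first access, reuse afterwards; the loader must not return null.
      resolved.add(cache.computeIfAbsent(current, loader));
      current = current.substring(0, current.lastIndexOf('/'));
    }
    return resolved;
  }
}
```

Callers then never touch the map directly, which removes the `containsKey`/`put` pair from the loop and the race between them.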



##########
gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/ManifestBasedDataset.java:
##########
@@ -78,45 +89,71 @@ public Iterator<FileSet<CopyEntity>> getFileSetIterator(FileSystem targetFs, Cop
           + "%s, you can specify multi locations split by '',", manifestPath.toString(), fs.getUri().toString(), ManifestBasedDatasetFinder.MANIFEST_LOCATION));
     }
     CopyManifest.CopyableUnitIterator manifests = null;
-    List<CopyEntity> copyEntities = Lists.newArrayList();
-    List<FileStatus> toDelete = Lists.newArrayList();
+    List<CopyEntity> copyEntities = Collections.synchronizedList(Lists.newArrayList());
+    List<FileStatus> toDelete = Collections.synchronizedList(Lists.newArrayList());
     //todo: put permission preserve logic here?
+    ExecutorService queueExecutor = null;
     try {
+      queueExecutor = Executors.newFixedThreadPool(planningThreadsPoolSize, ExecutorsUtils.newThreadFactory(Optional.of(log), Optional.of("ManifestBasedDatasetPlanningThread-%d")));
+      long startTime = System.currentTimeMillis();
       manifests = CopyManifest.getReadIterator(this.fs, this.manifestPath);
+      List<Future> pendingTask = new ArrayList<>();
+      Map<String, OwnerAndPermission> permissionMap = new ConcurrentHashMap<>();
       while (manifests.hasNext()) {
-        //todo: We can use fileSet to partition the data in case of some softbound issue
-        //todo: After partition, change this to directly return iterator so that we can save time if we meet resources limitation
         CopyManifest.CopyableUnit file = manifests.next();
-        Path fileToCopy = new Path(file.fileName);
-        if (this.fs.exists(fileToCopy)) {
-          boolean existOnTarget = targetFs.exists(fileToCopy);
-          FileStatus srcFile = this.fs.getFileStatus(fileToCopy);
-          if (!existOnTarget || shouldCopy(this.fs, targetFs, srcFile, targetFs.getFileStatus(fileToCopy), configuration)) {
-            CopyableFile copyableFile =
-                CopyableFile.fromOriginAndDestination(this.fs, srcFile, fileToCopy, configuration)
-                    .fileSet(datasetURN())
-                    .datasetOutputPath(fileToCopy.toString())
-                    .ancestorsOwnerAndPermission(CopyableFile
-                        .resolveReplicatedOwnerAndPermissionsRecursively(this.fs, fileToCopy.getParent(),
-                            new Path(this.commonFilesParent), configuration))
-                    .build();
-            copyableFile.setFsDatasets(this.fs, targetFs);
-            copyEntities.add(copyableFile);
-            if (existOnTarget && srcFile.isFile()) {
-              // this is to match the existing publishing behavior where we won't rewrite the target when it's already existed
-              // todo: Change the publish behavior to support overwrite destination file during rename, instead of relying on this delete step which is needed if we want to support task level publish
+        Future future = queueExecutor.submit(new Runnable() {
+          @Override
+          public void run() {
+            try{
+            //todo: We can use fileSet to partition the data in case of some softbound issue
+            //todo: After partition, change this to directly return iterator so that we can save time if we meet resources limitation
+            Path fileToCopy = new Path(file.fileName);
+            if (fs.exists(fileToCopy)) {
+              boolean existOnTarget = false;
+              try {
+                existOnTarget = targetFs.exists(fileToCopy);
+              } catch (IOException e) {
+                e.printStackTrace();
+              }
+              FileStatus srcFile = fs.getFileStatus(fileToCopy);
+              OwnerAndPermission replicatedPermission = CopyableFile.resolveReplicatedOwnerAndPermission(fs, srcFile, configuration);
+              if (!existOnTarget || shouldCopy(targetFs, srcFile, targetFs.getFileStatus(fileToCopy), replicatedPermission)) {
+                CopyableFile.Builder copyableFileBuilder =
+                    CopyableFile.fromOriginAndDestination(fs, srcFile, fileToCopy, configuration)
+                        .fileSet(datasetURN())
+                        .datasetOutputPath(fileToCopy.toString())
+                        .ancestorsOwnerAndPermission(CopyableFile
+                            .resolveReplicatedOwnerAndPermissionsRecursivelyWithCache(fs, fileToCopy.getParent(),
+                                new Path(commonFilesParent), configuration, permissionMap))
+                        .destinationOwnerAndPermission(replicatedPermission);
+                CopyableFile copyableFile = copyableFileBuilder.build();
+                copyableFile.setFsDatasets(fs, targetFs);
+                copyEntities.add(copyableFile);
+                if (existOnTarget && srcFile.isFile()) {
+                  // this is to match the existing publishing behavior where we won't rewrite the target when it's already existed
+                  // todo: Change the publish behavior to support overwrite destination file during rename, instead of relying on this delete step which is needed if we want to support task level publish
+                  toDelete.add(targetFs.getFileStatus(fileToCopy));
+                }
+              }
+            } else if (deleteFileThatNotExistOnSource && targetFs.exists(fileToCopy)){
               toDelete.add(targetFs.getFileStatus(fileToCopy));
             }
+          } catch (IOException e) {
+            log.error("meet exception:", e);
           }
-        } else if (this.deleteFileThatNotExistOnSource && targetFs.exists(fileToCopy)){
-          toDelete.add(targetFs.getFileStatus(fileToCopy));
-        }
+        }}
+        );
+        pendingTask.add(future);
+      }
+      for (Future f: pendingTask) {
+        f.get();
       }
       if (!toDelete.isEmpty()) {
         //todo: add support sync for empty dir
         CommitStep step = new DeleteFileCommitStep(targetFs, toDelete, this.properties, Optional.<Path>absent());
         copyEntities.add(new PrePublishStep(datasetURN(), Maps.newHashMap(), step, 1));
       }
+      log.info(String.format("Calculate workunits take %s milliseconds", System.currentTimeMillis() - startTime));

Review Comment:
   nit: probably should be past tense, so:
   ```
   "WorkUnit calculation took %s ms"
   ```
   or
   ```
   "Calculating WorkUnits took %s ms"
   ```



##########
gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/ManifestBasedDataset.java:
##########
@@ -78,45 +89,71 @@ public Iterator<FileSet<CopyEntity>> getFileSetIterator(FileSystem targetFs, Cop
           + "%s, you can specify multi locations split by '',", manifestPath.toString(), fs.getUri().toString(), ManifestBasedDatasetFinder.MANIFEST_LOCATION));
     }
     CopyManifest.CopyableUnitIterator manifests = null;
-    List<CopyEntity> copyEntities = Lists.newArrayList();
-    List<FileStatus> toDelete = Lists.newArrayList();
+    List<CopyEntity> copyEntities = Collections.synchronizedList(Lists.newArrayList());
+    List<FileStatus> toDelete = Collections.synchronizedList(Lists.newArrayList());
     //todo: put permission preserve logic here?
+    ExecutorService queueExecutor = null;
     try {
+      queueExecutor = Executors.newFixedThreadPool(planningThreadsPoolSize, ExecutorsUtils.newThreadFactory(Optional.of(log), Optional.of("ManifestBasedDatasetPlanningThread-%d")));
+      long startTime = System.currentTimeMillis();
       manifests = CopyManifest.getReadIterator(this.fs, this.manifestPath);
+      List<Future> pendingTask = new ArrayList<>();
+      Map<String, OwnerAndPermission> permissionMap = new ConcurrentHashMap<>();
       while (manifests.hasNext()) {
-        //todo: We can use fileSet to partition the data in case of some softbound issue
-        //todo: After partition, change this to directly return iterator so that we can save time if we meet resources limitation
         CopyManifest.CopyableUnit file = manifests.next();
-        Path fileToCopy = new Path(file.fileName);
-        if (this.fs.exists(fileToCopy)) {
-          boolean existOnTarget = targetFs.exists(fileToCopy);
-          FileStatus srcFile = this.fs.getFileStatus(fileToCopy);
-          if (!existOnTarget || shouldCopy(this.fs, targetFs, srcFile, targetFs.getFileStatus(fileToCopy), configuration)) {
-            CopyableFile copyableFile =
-                CopyableFile.fromOriginAndDestination(this.fs, srcFile, fileToCopy, configuration)
-                    .fileSet(datasetURN())
-                    .datasetOutputPath(fileToCopy.toString())
-                    .ancestorsOwnerAndPermission(CopyableFile
-                        .resolveReplicatedOwnerAndPermissionsRecursively(this.fs, fileToCopy.getParent(),
-                            new Path(this.commonFilesParent), configuration))
-                    .build();
-            copyableFile.setFsDatasets(this.fs, targetFs);
-            copyEntities.add(copyableFile);
-            if (existOnTarget && srcFile.isFile()) {
-              // this is to match the existing publishing behavior where we won't rewrite the target when it's already existed
-              // todo: Change the publish behavior to support overwrite destination file during rename, instead of relying on this delete step which is needed if we want to support task level publish
+        Future future = queueExecutor.submit(new Runnable() {
+          @Override
+          public void run() {
+            try{
+            //todo: We can use fileSet to partition the data in case of some softbound issue
+            //todo: After partition, change this to directly return iterator so that we can save time if we meet resources limitation
+            Path fileToCopy = new Path(file.fileName);
+            if (fs.exists(fileToCopy)) {
+              boolean existOnTarget = false;
+              try {
+                existOnTarget = targetFs.exists(fileToCopy);
+              } catch (IOException e) {
+                e.printStackTrace();
+              }
+              FileStatus srcFile = fs.getFileStatus(fileToCopy);
+              OwnerAndPermission replicatedPermission = CopyableFile.resolveReplicatedOwnerAndPermission(fs, srcFile, configuration);
+              if (!existOnTarget || shouldCopy(targetFs, srcFile, targetFs.getFileStatus(fileToCopy), replicatedPermission)) {
+                CopyableFile.Builder copyableFileBuilder =
+                    CopyableFile.fromOriginAndDestination(fs, srcFile, fileToCopy, configuration)
+                        .fileSet(datasetURN())
+                        .datasetOutputPath(fileToCopy.toString())
+                        .ancestorsOwnerAndPermission(CopyableFile
+                            .resolveReplicatedOwnerAndPermissionsRecursivelyWithCache(fs, fileToCopy.getParent(),
+                                new Path(commonFilesParent), configuration, permissionMap))
+                        .destinationOwnerAndPermission(replicatedPermission);
+                CopyableFile copyableFile = copyableFileBuilder.build();
+                copyableFile.setFsDatasets(fs, targetFs);
+                copyEntities.add(copyableFile);
+                if (existOnTarget && srcFile.isFile()) {
+                  // this is to match the existing publishing behavior where we won't rewrite the target when it's already existed
+                  // todo: Change the publish behavior to support overwrite destination file during rename, instead of relying on this delete step which is needed if we want to support task level publish
+                  toDelete.add(targetFs.getFileStatus(fileToCopy));
+                }
+              }
+            } else if (deleteFileThatNotExistOnSource && targetFs.exists(fileToCopy)){
               toDelete.add(targetFs.getFileStatus(fileToCopy));
             }
+          } catch (IOException e) {
+            log.error("meet exception:", e);
           }
-        } else if (this.deleteFileThatNotExistOnSource && targetFs.exists(fileToCopy)){
-          toDelete.add(targetFs.getFileStatus(fileToCopy));
-        }
+        }}
+        );
+        pendingTask.add(future);
+      }
+      for (Future f: pendingTask) {
+        f.get();
       }

Review Comment:
   NBD as presently formulated... but if you wanted to detect system-level issues, such as a fatal problem with the target FS, early on rather than pointlessly creating thousands of futures, you could use a `CompletionService` to poll for results in between submitting additional tasks:
   https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/ExecutorCompletionService.html
   just maintain a count of how many futures you're expecting, so you know when they're all done.
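
A minimal sketch of the `CompletionService` idea suggested above, not the PR's code. The class `FailFastPlanner`, the method `planAll`, and the `Callable<String>` tasks are hypothetical stand-ins for the per-file planning work. It polls for finished tasks between submissions so a failure can surface early, and keeps a count of outstanding futures to know when all are done.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical helper; names are illustrative and not part of Gobblin.
final class FailFastPlanner {
  /**
   * Runs every task on a fixed pool and returns their results (in completion order).
   * Throws ExecutionException once a collected task turns out to have failed.
   */
  static List<String> planAll(List<Callable<String>> tasks, int threads)
      throws InterruptedException, ExecutionException {
    ExecutorService pool = Executors.newFixedThreadPool(threads);
    CompletionService<String> completion = new ExecutorCompletionService<>(pool);
    List<String> results = new ArrayList<>();
    int pending = 0; // submitted but not yet collected
    try {
      for (Callable<String> task : tasks) {
        completion.submit(task);
        pending++;
        // Non-blocking drain: collect whatever has finished so far.
        Future<String> done;
        while ((done = completion.poll()) != null) {
          results.add(done.get()); // rethrows the task's failure, if any
          pending--;
        }
      }
      // Everything is submitted; block for the remainder.
      while (pending > 0) {
        results.add(completion.take().get());
        pending--;
      }
      return results;
    } finally {
      pool.shutdownNow(); // after a failure this also drops queued work
    }
  }

  public static void main(String[] args) throws Exception {
    List<Callable<String>> tasks = new ArrayList<>();
    for (int i = 0; i < 5; i++) {
      final int n = i;
      tasks.add(() -> "file-" + n);
    }
    System.out.println(planAll(tasks, 2).size() + " tasks planned");
  }
}
```

Compared with collecting every `Future` in a list and calling `get()` at the end, this stops submitting once a failed task has been observed, at the cost of a little extra bookkeeping.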





[GitHub] [gobblin] codecov-commenter commented on pull request #3686: [GOBBLIN-1824]Improving the Efficiency of Work Planning in Manifest-Based DistCp Jobs

Posted by "codecov-commenter (via GitHub)" <gi...@apache.org>.
codecov-commenter commented on PR #3686:
URL: https://github.com/apache/gobblin/pull/3686#issuecomment-1522704632

   ## [Codecov](https://codecov.io/gh/apache/gobblin/pull/3686?src=pr&el=h1&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) Report
   > Merging [#3686](https://codecov.io/gh/apache/gobblin/pull/3686?src=pr&el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (325ada3) into [master](https://codecov.io/gh/apache/gobblin/commit/203f5e64696a3a3aa0f89d6e029e2a35929c16fa?el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (203f5e6) will **decrease** coverage by `2.09%`.
   > The diff coverage is `n/a`.
   
   ```diff
   @@             Coverage Diff              @@
   ##             master    #3686      +/-   ##
   ============================================
   - Coverage     46.85%   44.77%   -2.09%     
   + Complexity    10757     2092    -8665     
   ============================================
     Files          2138      411    -1727     
     Lines         84022    17721   -66301     
     Branches       9338     2159    -7179     
   ============================================
   - Hits          39367     7934   -31433     
   + Misses        41059     8929   -32130     
   + Partials       3596      858    -2738     
   ```
   
   
   [see 1733 files with indirect coverage changes](https://codecov.io/gh/apache/gobblin/pull/3686/indirect-changes?src=pr&el=tree-more&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)
   
   




[GitHub] [gobblin] phet commented on a diff in pull request #3686: [GOBBLIN-1824]Improving the Efficiency of Work Planning in Manifest-Based DistCp Jobs

Posted by "phet (via GitHub)" <gi...@apache.org>.
phet commented on code in PR #3686:
URL: https://github.com/apache/gobblin/pull/3686#discussion_r1183348834


##########
gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopyableFile.java:
##########
@@ -375,6 +379,35 @@ public static List<OwnerAndPermission> resolveReplicatedOwnerAndPermissionsRecur
     return ownerAndPermissions;
   }
 
+  /**
+   * Compute the correct {@link OwnerAndPermission} obtained from replicating source owner and permissions and applying
+   * the {@link PreserveAttributes} rules for fromPath and every ancestor up to but excluding toPath.
+   * Use permissionMap as a cache to reduce the call to hdfs
+   *
+   * @return A list of the computed {@link OwnerAndPermission}s starting from fromPath, up to but excluding toPath.
+   * @throws IOException if toPath is not an ancestor of fromPath.
+   */
+  public static List<OwnerAndPermission> resolveReplicatedOwnerAndPermissionsRecursivelyWithCache(FileSystem sourceFs, Path fromPath,
+      Path toPath, CopyConfiguration copyConfiguration, Cache<String, OwnerAndPermission> permissionMap)

Review Comment:
   I won't strongly suggest refactoring these into a single impl, but I do highly recommend you either add a TODO about doing that later or else provide a javadoc comment describing how the two relate and who should use each.
   
   BTW, from your reply above about backward compatibility, I almost wonder whether your stance is that the existing method should get `@Deprecated`





[GitHub] [gobblin] Will-Lo commented on a diff in pull request #3686: [GOBBLIN-1824]Improving the Efficiency of Work Planning in Manifest-Based DistCp Jobs

Posted by "Will-Lo (via GitHub)" <gi...@apache.org>.
Will-Lo commented on code in PR #3686:
URL: https://github.com/apache/gobblin/pull/3686#discussion_r1183018237


##########
gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/ManifestBasedDataset.java:
##########
@@ -78,37 +81,43 @@ public Iterator<FileSet<CopyEntity>> getFileSetIterator(FileSystem targetFs, Cop
           + "%s, you can specify multi locations split by '',", manifestPath.toString(), fs.getUri().toString(), ManifestBasedDatasetFinder.MANIFEST_LOCATION));
     }
     CopyManifest.CopyableUnitIterator manifests = null;
-    List<CopyEntity> copyEntities = Lists.newArrayList();
-    List<FileStatus> toDelete = Lists.newArrayList();
+    List<CopyEntity> copyEntities = Collections.synchronizedList(Lists.newArrayList());
+    List<FileStatus> toDelete = Collections.synchronizedList(Lists.newArrayList());
     //todo: put permission preserve logic here?
     try {
+      long startTime = System.currentTimeMillis();
       manifests = CopyManifest.getReadIterator(this.fs, this.manifestPath);
+      Cache<String, OwnerAndPermission> permissionMap = CacheBuilder.newBuilder().expireAfterAccess(30, TimeUnit.SECONDS).build();

Review Comment:
   Could we make this configurable?



##########
gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/ManifestBasedDataset.java:
##########
@@ -117,6 +126,7 @@ public Iterator<FileSet<CopyEntity>> getFileSetIterator(FileSystem targetFs, Cop
         CommitStep step = new DeleteFileCommitStep(targetFs, toDelete, this.properties, Optional.<Path>absent());
         copyEntities.add(new PrePublishStep(datasetURN(), Maps.newHashMap(), step, 1));
       }
+      log.info(String.format("Workunits calculation took %s milliseconds to process %s files", System.currentTimeMillis() - startTime, numFiles));

Review Comment:
   Should we measure this in nanoseconds? Typically that's what is used when instrumenting java code. Can convert units afterwards.
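
A small sketch of the pattern suggested above: measure with `System.nanoTime()` and convert units afterwards. `System.nanoTime()` is a monotonic clock intended for elapsed-time measurement, whereas `System.currentTimeMillis()` is wall-clock time and can jump if the system clock is adjusted. The class `PlanningTimer` and the method `elapsedMillis` are hypothetical names, and the sleep only stands in for work-unit planning.

```java
import java.util.concurrent.TimeUnit;

// Hypothetical helper; names are illustrative and not part of Gobblin.
final class PlanningTimer {
  /** Runs the work and returns the elapsed time in whole milliseconds. */
  static long elapsedMillis(Runnable work) {
    long start = System.nanoTime(); // monotonic; only differences are meaningful
    work.run();
    long elapsedNanos = System.nanoTime() - start;
    return TimeUnit.NANOSECONDS.toMillis(elapsedNanos); // convert after measuring
  }

  public static void main(String[] args) {
    int numFiles = 3; // placeholder count for the log line
    long ms = elapsedMillis(() -> {
      try {
        Thread.sleep(20); // stand-in for work-unit planning
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
    });
    System.out.println(String.format(
        "WorkUnit calculation took %s ms to process %s files", ms, numFiles));
  }
}
```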





[GitHub] [gobblin] Will-Lo commented on a diff in pull request #3686: [GOBBLIN-1824]Improving the Efficiency of Work Planning in Manifest-Based DistCp Jobs

Posted by "Will-Lo (via GitHub)" <gi...@apache.org>.
Will-Lo commented on code in PR #3686:
URL: https://github.com/apache/gobblin/pull/3686#discussion_r1183109184


##########
gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/ManifestBasedDataset.java:
##########
@@ -117,6 +126,7 @@ public Iterator<FileSet<CopyEntity>> getFileSetIterator(FileSystem targetFs, Cop
         CommitStep step = new DeleteFileCommitStep(targetFs, toDelete, this.properties, Optional.<Path>absent());
         copyEntities.add(new PrePublishStep(datasetURN(), Maps.newHashMap(), step, 1));
       }
+      log.info(String.format("Workunits calculation took %s milliseconds to process %s files", System.currentTimeMillis() - startTime, numFiles));

Review Comment:
   Oh I see, in regular distcp it takes a few milliseconds on avg, which is why the granularity helps, but manifest is different.





[GitHub] [gobblin] ZihanLi58 commented on a diff in pull request #3686: [GOBBLIN-1824]Improving the Efficiency of Work Planning in Manifest-Based DistCp Jobs

Posted by "ZihanLi58 (via GitHub)" <gi...@apache.org>.
ZihanLi58 commented on code in PR #3686:
URL: https://github.com/apache/gobblin/pull/3686#discussion_r1183040061


##########
gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/ManifestBasedDataset.java:
##########
@@ -117,6 +126,7 @@ public Iterator<FileSet<CopyEntity>> getFileSetIterator(FileSystem targetFs, Cop
         CommitStep step = new DeleteFileCommitStep(targetFs, toDelete, this.properties, Optional.<Path>absent());
         copyEntities.add(new PrePublishStep(datasetURN(), Maps.newHashMap(), step, 1));
       }
+      log.info(String.format("Workunits calculation took %s milliseconds to process %s files", System.currentTimeMillis() - startTime, numFiles));

Review Comment:
   I don't have a strong preference here, but calling nanoTime seems a little expensive. Also, in this case, where our computing time usually takes several seconds to several minutes, do we need that precision?





[GitHub] [gobblin] ZihanLi58 commented on a diff in pull request #3686: [GOBBLIN-1824]Improving the Efficiency of Work Planning in Manifest-Based DistCp Jobs

Posted by "ZihanLi58 (via GitHub)" <gi...@apache.org>.
ZihanLi58 commented on code in PR #3686:
URL: https://github.com/apache/gobblin/pull/3686#discussion_r1179450955


##########
gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/ManifestBasedDataset.java:
##########
@@ -78,37 +81,43 @@ public Iterator<FileSet<CopyEntity>> getFileSetIterator(FileSystem targetFs, Cop
           + "%s, you can specify multi locations split by '',", manifestPath.toString(), fs.getUri().toString(), ManifestBasedDatasetFinder.MANIFEST_LOCATION));
     }
     CopyManifest.CopyableUnitIterator manifests = null;
-    List<CopyEntity> copyEntities = Lists.newArrayList();
-    List<FileStatus> toDelete = Lists.newArrayList();
+    List<CopyEntity> copyEntities = Collections.synchronizedList(Lists.newArrayList());
+    List<FileStatus> toDelete = Collections.synchronizedList(Lists.newArrayList());
     //todo: put permission preserve logic here?
     try {
+      long startTime = System.currentTimeMillis();
       manifests = CopyManifest.getReadIterator(this.fs, this.manifestPath);
+      Cache<String, OwnerAndPermission> permissionMap = CacheBuilder.newBuilder().expireAfterAccess(30, TimeUnit.SECONDS).build();

Review Comment:
   The TTL here is access-based: an entry expires only if we haven't tried to get it within the last 30s. I tested with 30k files, and planning takes around 30 sec. Also, given the assumption that files sharing the same parent usually sit near each other in the manifest file, I think 30 sec should be enough for us. It also bounds the memory we use here.
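
To illustrate the locality argument above, here is a dependency-free sketch of resolving ancestors through a shared cache. It is an assumption-laden simplification: plain strings stand in for `Path` and `OwnerAndPermission`, a `ConcurrentHashMap` with no expiry stands in for the Guava `Cache`, and `AncestorPermissionCache`, `resolveWithCache`, and `parentOf` are hypothetical names rather than the PR's API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

// Hypothetical helper; names are illustrative and not part of Gobblin.
final class AncestorPermissionCache {
  /**
   * Resolves a value for fromPath and every ancestor up to but excluding toPath,
   * consulting the cache before calling the (expensive) loader.
   */
  static List<String> resolveWithCache(String fromPath, String toPath,
      Map<String, String> cache, Function<String, String> loader) {
    List<String> resolved = new ArrayList<>();
    String current = fromPath;
    while (current != null && !current.equals(toPath)) {
      resolved.add(cache.computeIfAbsent(current, loader)); // loader runs on a miss only
      current = parentOf(current);
    }
    if (current == null) {
      throw new IllegalArgumentException(toPath + " is not an ancestor of " + fromPath);
    }
    return resolved;
  }

  /** Returns the parent of an absolute, slash-separated path, or null for the root. */
  static String parentOf(String path) {
    int slash = path.lastIndexOf('/');
    if (slash < 0 || path.equals("/")) {
      return null;
    }
    return slash == 0 ? "/" : path.substring(0, slash);
  }

  public static void main(String[] args) {
    Map<String, String> cache = new ConcurrentHashMap<>();
    AtomicInteger loaderCalls = new AtomicInteger(); // counts simulated file system calls
    Function<String, String> loader = p -> {
      loaderCalls.incrementAndGet();
      return "perm:" + p;
    };
    // Two sibling directories under the same table share all ancestors but one.
    resolveWithCache("/data/db/t1/p=1", "/data", cache, loader);
    resolveWithCache("/data/db/t1/p=2", "/data", cache, loader);
    System.out.println("loader calls: " + loaderCalls.get());
  }
}
```

When neighbouring manifest entries share ancestors, only the first entry pays for the shared lookups; an expiring cache trades some of that reuse for bounded memory.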





[GitHub] [gobblin] ZihanLi58 commented on a diff in pull request #3686: [GOBBLIN-1824]Improving the Efficiency of Work Planning in Manifest-Based DistCp Jobs

Posted by "ZihanLi58 (via GitHub)" <gi...@apache.org>.
ZihanLi58 commented on code in PR #3686:
URL: https://github.com/apache/gobblin/pull/3686#discussion_r1179458045


##########
gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/ManifestBasedDataset.java:
##########
@@ -117,6 +126,7 @@ public Iterator<FileSet<CopyEntity>> getFileSetIterator(FileSystem targetFs, Cop
         CommitStep step = new DeleteFileCommitStep(targetFs, toDelete, this.properties, Optional.<Path>absent());
         copyEntities.add(new PrePublishStep(datasetURN(), Maps.newHashMap(), step, 1));
       }
+      log.info(String.format("Workunits calculation take %s milliseconds to process %s files", System.currentTimeMillis() - startTime, numFiles));

Review Comment:
   We already have an event emitted for the workunit creation time (the event name is WorkUnitsCreationTimer). I added a log here just for good observability of a single benchmark job.


