Posted to common-issues@hadoop.apache.org by GitBox <gi...@apache.org> on 2020/04/21 17:13:05 UTC

[GitHub] [hadoop] steveloughran commented on a change in pull request #1916: MAPREDUCE-7267 Merge paths with multi threads during commit job in FileOutputCommitter

steveloughran commented on a change in pull request #1916:
URL: https://github.com/apache/hadoop/pull/1916#discussion_r412321478



##########
File path: hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/FileOutputCommitter.java
##########
@@ -454,6 +471,27 @@ protected void commitJobInternal(JobContext context) throws IOException {
    */
   private void mergePaths(FileSystem fs, final FileStatus from,
       final Path to, JobContext context) throws IOException {
+    final List<Future<Void>> futures = new LinkedList<>();
+    final ExecutorService pool = mergeThreadNum > 1 ?
+      Executors.newFixedThreadPool(Math.min(mergeThreadNum, 128)) : null;
+
+    try {
+      doMergePaths(fs, from, to, context, pool, futures);
+      if (null != pool) {
+        for (Future<Void> future: futures) {
+          FutureIOSupport.awaitFuture(future);

Review comment:
       CompletableFuture.allOf() gives you an aggregate future you can block on, so there's no need to wait in a specific order. Not sure if that makes a difference performance-wise. We're still evolving our understanding of how best to use futures, so any suggestions are welcome.
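
   A minimal sketch of the aggregate-wait idea, using only java.util.concurrent. Note that `CompletableFuture.allOf()` accepts `CompletableFuture` instances rather than plain `Future`s, so the sketch assumes the work is submitted with `CompletableFuture.runAsync(task, pool)` instead of `pool.submit(...)`. The class name `AllOfSketch`, the `demo` method and the trivial counter tasks are hypothetical stand-ins for the rename work, not code from the patch.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

public class AllOfSketch {

  /** Runs every task on the pool and blocks until all of them have finished. */
  static void runAll(List<Runnable> tasks, ExecutorService pool) {
    List<CompletableFuture<Void>> futures = new ArrayList<>();
    for (Runnable task : tasks) {
      futures.add(CompletableFuture.runAsync(task, pool));
    }
    // One aggregate future; join() throws CompletionException if any task failed.
    CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
  }

  /** Hypothetical demo: returns how many of n trivial tasks completed. */
  static int demo(int n) {
    ExecutorService pool = Executors.newFixedThreadPool(4);
    AtomicInteger done = new AtomicInteger();
    List<Runnable> tasks = new ArrayList<>();
    for (int i = 0; i < n; i++) {
      tasks.add(done::incrementAndGet);
    }
    try {
      runAll(tasks, pool);
    } finally {
      pool.shutdown();
    }
    return done.get();
  }

  public static void main(String[] args) {
    System.out.println(demo(10)); // prints 10
  }
}
```

   The aggregate future completes only after every task has completed, so a failure in one task does not cut the wait short; it surfaces when `join()` is called.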

##########
File path: hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/FileOutputCommitter.java
##########
@@ -466,31 +504,50 @@ private void mergePaths(FileSystem fs, final FileStatus from,
     }
 
     if (from.isFile()) {
-      if (toStat != null) {
-        if (!fs.delete(to, true)) {
-          throw new IOException("Failed to delete " + to);
+      if (null != pool) {
+        FileStatus finalToStat = toStat;
+        futures.add(pool.submit(new Callable<Void>() {
+          @Override
+          public Void call() throws Exception {
+            if (finalToStat != null) {
+              if (!fs.delete(to, true)) {
+                throw new IOException("Failed to delete " + to);
+              }
+            }
+
+            if (!fs.rename(from.getPath(), to)) {
+              throw new IOException("Failed to rename " + from + " to " + to);
+            }
+            return null;
+          }
+        }));
+      } else {
+        if (toStat != null) {
+          if (!fs.delete(to, true)) {
+            throw new IOException("Failed to delete " + to);
+          }
         }
-      }
 
-      if (!fs.rename(from.getPath(), to)) {
-        throw new IOException("Failed to rename " + from + " to " + to);
+        if (!fs.rename(from.getPath(), to)) {
+          throw new IOException("Failed to rename " + from + " to " + to);

Review comment:
       Though rename/2 is broken that way.
   
   Now, if we moved to FileContext, you'd get a proper rename.

##########
File path: hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/FileOutputCommitter.java
##########
@@ -466,31 +504,50 @@ private void mergePaths(FileSystem fs, final FileStatus from,
     }
 
     if (from.isFile()) {
-      if (toStat != null) {
-        if (!fs.delete(to, true)) {
-          throw new IOException("Failed to delete " + to);
+      if (null != pool) {
+        FileStatus finalToStat = toStat;
+        futures.add(pool.submit(new Callable<Void>() {
+          @Override
+          public Void call() throws Exception {
+            if (finalToStat != null) {
+              if (!fs.delete(to, true)) {
+                throw new IOException("Failed to delete " + to);
+              }
+            }
+
+            if (!fs.rename(from.getPath(), to)) {
+              throw new IOException("Failed to rename " + from + " to " + to);
+            }
+            return null;
+          }
+        }));
+      } else {
+        if (toStat != null) {
+          if (!fs.delete(to, true)) {
+            throw new IOException("Failed to delete " + to);

Review comment:
       delete() only returns false if `to` isn't found, so you don't need this safety check.

##########
File path: hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/FileOutputCommitter.java
##########
@@ -99,11 +106,18 @@
   public static final boolean
       FILEOUTPUTCOMMITTER_TASK_CLEANUP_ENABLED_DEFAULT = false;
 
+  // The thread num use to merge paths during commitJob. If it is bigger than 1,
+  // a thread pool would be created to merge paths, which has better performance.
+  public static final String FILEOUTPUTCOMMITTER_MERGE_THREADS =

Review comment:
       this is going to need some docs in the markdown too, I'm afraid

##########
File path: hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/FileOutputCommitter.java
##########
@@ -454,6 +471,27 @@ protected void commitJobInternal(JobContext context) throws IOException {
    */
   private void mergePaths(FileSystem fs, final FileStatus from,
       final Path to, JobContext context) throws IOException {
+    final List<Future<Void>> futures = new LinkedList<>();
+    final ExecutorService pool = mergeThreadNum > 1 ?
+      Executors.newFixedThreadPool(Math.min(mergeThreadNum, 128)) : null;

Review comment:
       why 128? Also: make an explicit (private) constant.
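
   A minimal sketch of what the explicit constant could look like. The names `MAX_MERGE_THREADS`, `effectiveThreads` and `createPool` are hypothetical; the value 128 is simply the cap used in the patch, and the sketch does not answer why that value was chosen.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class MergePoolSketch {

  /** Hypothetical name; 128 is the cap hard-coded in the patch under review. */
  private static final int MAX_MERGE_THREADS = 128;

  /** Clamps the configured thread count to the cap. */
  static int effectiveThreads(int configured) {
    return Math.min(configured, MAX_MERGE_THREADS);
  }

  /** Returns a pool when more than one thread is requested, otherwise null. */
  static ExecutorService createPool(int configured) {
    return configured > 1
        ? Executors.newFixedThreadPool(effectiveThreads(configured))
        : null;
  }

  public static void main(String[] args) {
    System.out.println(effectiveThreads(200)); // prints 128
    System.out.println(effectiveThreads(8));   // prints 8
  }
}
```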

##########
File path: hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/FileOutputCommitter.java
##########
@@ -502,16 +559,28 @@ private void reportProgress(JobContext context) {
   }
 
   private void renameOrMerge(FileSystem fs, FileStatus from, Path to,
-      JobContext context) throws IOException {
+      JobContext context, ExecutorService pool, List<Future<Void>> futures) throws IOException {
     if (algorithmVersion == 1) {
-      if (!fs.rename(from.getPath(), to)) {
-        throw new IOException("Failed to rename " + from + " to " + to);
+      if (null != pool) {
+        futures.add(pool.submit(new Callable<Void>() {
+          @Override
+          public Void call() throws Exception {
+            if (!fs.rename(from.getPath(), to)) {

Review comment:
       How about factoring rename-throwing-an-exception out into a method? It is being used enough.
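
   A minimal sketch of such a helper. To keep it runnable without Hadoop on the classpath, the `Renamer` interface is a hypothetical stand-in for the two-argument `FileSystem.rename()`, and plain strings stand in for `Path`/`FileStatus`; the names `renameOrThrow` and `tryRename` are assumptions too.

```java
import java.io.IOException;

public class RenameHelperSketch {

  /** Hypothetical stand-in for the boolean-returning FileSystem.rename(). */
  interface Renamer {
    boolean rename(String from, String to) throws IOException;
  }

  /** Renames, converting a false return value into an IOException. */
  static void renameOrThrow(Renamer fs, String from, String to)
      throws IOException {
    if (!fs.rename(from, to)) {
      throw new IOException("Failed to rename " + from + " to " + to);
    }
  }

  /** Demo wrapper: returns the failure message, or null on success. */
  static String tryRename(Renamer fs, String from, String to) {
    try {
      renameOrThrow(fs, from, to);
      return null;
    } catch (IOException e) {
      return e.getMessage();
    }
  }

  public static void main(String[] args) {
    // A renamer that always reports failure triggers the exception path.
    System.out.println(tryRename((a, b) -> false, "a", "b"));
  }
}
```

   With a helper like this, both the pooled `Callable` bodies and the single-threaded branches could call the same method instead of repeating the check.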



