Posted to commits@hbase.apache.org by te...@apache.org on 2018/05/18 13:42:19 UTC

hbase git commit: HBASE-20579 Improve snapshot manifest copy in ExportSnapshot

Repository: hbase
Updated Branches:
  refs/heads/master 0836b0719 -> c9f8c3436


HBASE-20579 Improve snapshot manifest copy in ExportSnapshot

Signed-off-by: tedyu <yu...@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/c9f8c343
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/c9f8c343
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/c9f8c343

Branch: refs/heads/master
Commit: c9f8c3436f6e38b5c7807677c5c3e7fc3e19e071
Parents: 0836b07
Author: jingyuntian <ti...@gmail.com>
Authored: Thu May 17 11:32:49 2018 +0800
Committer: tedyu <yu...@gmail.com>
Committed: Fri May 18 06:42:12 2018 -0700

----------------------------------------------------------------------
 .../hadoop/hbase/snapshot/ExportSnapshot.java   | 82 +++++++++++++-------
 .../org/apache/hadoop/hbase/util/FSUtils.java   | 43 ++++++++++
 .../apache/hadoop/hbase/util/TestFSUtils.java   | 27 +++++++
 3 files changed, 125 insertions(+), 27 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/c9f8c343/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/snapshot/ExportSnapshot.java
----------------------------------------------------------------------
diff --git a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/snapshot/ExportSnapshot.java b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/snapshot/ExportSnapshot.java
index ef67b7b..4af7dfb 100644
--- a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/snapshot/ExportSnapshot.java
+++ b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/snapshot/ExportSnapshot.java
@@ -29,6 +29,11 @@ import java.util.Collections;
 import java.util.Comparator;
 import java.util.LinkedList;
 import java.util.List;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.function.BiConsumer;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataInputStream;
@@ -36,7 +41,6 @@ import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileChecksum;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.hbase.HBaseConfiguration;
@@ -109,6 +113,10 @@ public class ExportSnapshot extends AbstractHBaseTool implements Tool {
   private static final String CONF_BANDWIDTH_MB = "snapshot.export.map.bandwidth.mb";
   private static final String CONF_MR_JOB_NAME = "mapreduce.job.name";
   protected static final String CONF_SKIP_TMP = "snapshot.export.skip.tmp";
+  private static final String CONF_COPY_MANIFEST_THREADS =
+      "snapshot.export.copy.references.threads";
+  private static final int DEFAULT_COPY_MANIFEST_THREADS =
+      Runtime.getRuntime().availableProcessors();
 
   static class Testing {
     static final String CONF_TEST_FAILURE = "test.snapshot.export.failure";
@@ -842,35 +850,52 @@ public class ExportSnapshot extends AbstractHBaseTool implements Tool {
     SnapshotReferenceUtil.verifySnapshot(conf, fs, snapshotDir, snapshotDesc);
   }
 
-  /**
-   * Set path ownership.
-   */
-  private void setOwner(final FileSystem fs, final Path path, final String user,
-      final String group, final boolean recursive) throws IOException {
-    if (user != null || group != null) {
-      if (recursive && fs.isDirectory(path)) {
-        for (FileStatus child : fs.listStatus(path)) {
-          setOwner(fs, child.getPath(), user, group, recursive);
-        }
+  private void setConfigParallel(FileSystem outputFs, List<Path> traversedPath,
+      BiConsumer<FileSystem, Path> task, Configuration conf) throws IOException {
+    ExecutorService pool = Executors
+        .newFixedThreadPool(conf.getInt(CONF_COPY_MANIFEST_THREADS, DEFAULT_COPY_MANIFEST_THREADS));
+    List<Future<Void>> futures = new ArrayList<>();
+    for (Path dstPath : traversedPath) {
+      Future<Void> future = (Future<Void>) pool.submit(() -> task.accept(outputFs, dstPath));
+      futures.add(future);
+    }
+    try {
+      for (Future<Void> future : futures) {
+        future.get();
       }
-      fs.setOwner(path, user, group);
+    } catch (InterruptedException | ExecutionException e) {
+      throw new IOException(e);
+    } finally {
+      pool.shutdownNow();
     }
   }
 
-  /**
-   * Set path permission.
-   */
-  private void setPermission(final FileSystem fs, final Path path, final short filesMode,
-      final boolean recursive) throws IOException {
-    if (filesMode > 0) {
-      FsPermission perm = new FsPermission(filesMode);
-      if (recursive && fs.isDirectory(path)) {
-        for (FileStatus child : fs.listStatus(path)) {
-          setPermission(fs, child.getPath(), filesMode, recursive);
-        }
+  private void setOwnerParallel(FileSystem outputFs, String filesUser, String filesGroup,
+      Configuration conf, List<Path> traversedPath) throws IOException {
+    setConfigParallel(outputFs, traversedPath, (fs, path) -> {
+      try {
+        fs.setOwner(path, filesUser, filesGroup);
+      } catch (IOException e) {
+        throw new RuntimeException(
+            "set owner for file " + path + " to " + filesUser + ":" + filesGroup + " failed", e);
       }
-      fs.setPermission(path, perm);
+    }, conf);
+  }
+
+  private void setPermissionParallel(final FileSystem outputFs, final short filesMode,
+      final List<Path> traversedPath, final Configuration conf) throws IOException {
+    if (filesMode <= 0) {
+      return;
     }
+    FsPermission perm = new FsPermission(filesMode);
+    setConfigParallel(outputFs, traversedPath, (fs, path) -> {
+      try {
+        fs.setPermission(path, perm);
+      } catch (IOException e) {
+        throw new RuntimeException(
+            "set permission for file " + path + " to " + filesMode + " failed", e);
+      }
+    }, conf);
   }
 
   private boolean verifyTarget = true;
@@ -1001,9 +1026,12 @@ public class ExportSnapshot extends AbstractHBaseTool implements Tool {
     // Step 1 - Copy fs1:/.snapshot/<snapshot> to  fs2:/.snapshot/.tmp/<snapshot>
     // The snapshot references must be copied before the hfiles otherwise the cleaner
     // will remove them because they are unreferenced.
+    List<Path> traversedPaths = new ArrayList<>();
     try {
       LOG.info("Copy Snapshot Manifest");
-      FileUtil.copy(inputFs, snapshotDir, outputFs, initialOutputSnapshotDir, false, false, conf);
+      traversedPaths =
+          FSUtils.copyFilesParallel(inputFs, snapshotDir, outputFs, initialOutputSnapshotDir, conf,
+              conf.getInt(CONF_COPY_MANIFEST_THREADS, DEFAULT_COPY_MANIFEST_THREADS));
     } catch (IOException e) {
       throw new ExportSnapshotException("Failed to copy the snapshot directory: from=" +
         snapshotDir + " to=" + initialOutputSnapshotDir, e);
@@ -1013,11 +1041,11 @@ public class ExportSnapshot extends AbstractHBaseTool implements Tool {
             + filesUser)
             + (filesGroup == null ? "" : ", Change the group of " + needSetOwnerDir + " to "
             + filesGroup));
-        setOwner(outputFs, needSetOwnerDir, filesUser, filesGroup, true);
+        setOwnerParallel(outputFs, filesUser, filesGroup, conf, traversedPaths);
       }
       if (filesMode > 0) {
         LOG.warn("Change the permission of " + needSetOwnerDir + " to " + filesMode);
-        setPermission(outputFs, needSetOwnerDir, (short)filesMode, true);
+        setPermissionParallel(outputFs, (short)filesMode, traversedPaths, conf);
       }
     }
 

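The manifest copy now fans out over a fixed thread pool sized by the new snapshot.export.copy.references.threads property, defaulting to Runtime.getRuntime().availableProcessors(); ownership and permissions are then applied in parallel over the traversed paths through the shared setConfigParallel helper. A minimal sketch of overriding the thread count from a driver class follows; the property name and default come from the patch above, while the driver class itself is illustrative and not part of the patch:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.snapshot.ExportSnapshot;
    import org.apache.hadoop.util.ToolRunner;

    public class ExportWithMoreCopyThreads {
      public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        // Use 8 manifest-copy threads instead of the per-core default.
        conf.setInt("snapshot.export.copy.references.threads", 8);
        // ExportSnapshot is a Hadoop Tool, so ToolRunner drives it as usual.
        System.exit(ToolRunner.run(conf, new ExportSnapshot(), args));
      }
    }

Because ExportSnapshot runs through ToolRunner, the same override should also be reachable from the command line as a generic option, e.g. -D snapshot.export.copy.references.threads=8.
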
http://git-wip-us.apache.org/repos/asf/hbase/blob/c9f8c343/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java
index b106a31..53db140 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java
@@ -43,6 +43,7 @@ import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.FutureTask;
 import java.util.concurrent.ThreadPoolExecutor;
@@ -54,6 +55,7 @@ import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.PathFilter;
 import org.apache.hadoop.fs.permission.FsAction;
@@ -1741,4 +1743,45 @@ public abstract class FSUtils extends CommonFSUtils {
     }
   }
 
+  public static List<Path> copyFilesParallel(FileSystem srcFS, Path src, FileSystem dstFS, Path dst,
+      Configuration conf, int threads) throws IOException {
+    ExecutorService pool = Executors.newFixedThreadPool(threads);
+    List<Future<Void>> futures = new ArrayList<>();
+    List<Path> traversedPaths;
+    try {
+      traversedPaths = copyFiles(srcFS, src, dstFS, dst, conf, pool, futures);
+      for (Future<Void> future : futures) {
+        future.get();
+      }
+    } catch (ExecutionException | InterruptedException | IOException e) {
+      throw new IOException("copy snapshot reference files failed", e);
+    } finally {
+      pool.shutdownNow();
+    }
+    return traversedPaths;
+  }
+
+  private static List<Path> copyFiles(FileSystem srcFS, Path src, FileSystem dstFS, Path dst,
+      Configuration conf, ExecutorService pool, List<Future<Void>> futures) throws IOException {
+    List<Path> traversedPaths = new ArrayList<>();
+    traversedPaths.add(dst);
+    FileStatus currentFileStatus = srcFS.getFileStatus(src);
+    if (currentFileStatus.isDirectory()) {
+      if (!dstFS.mkdirs(dst)) {
+        throw new IOException("create dir failed: " + dst);
+      }
+      FileStatus[] subPaths = srcFS.listStatus(src);
+      for (FileStatus subPath : subPaths) {
+        traversedPaths.addAll(copyFiles(srcFS, subPath.getPath(), dstFS,
+          new Path(dst, subPath.getPath().getName()), conf, pool, futures));
+      }
+    } else {
+      Future<Void> future = pool.submit(() -> {
+        FileUtil.copy(srcFS, src, dstFS, dst, false, false, conf);
+        return null;
+      });
+      futures.add(future);
+    }
+    return traversedPaths;
+  }
 }

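For reference, a hedged usage sketch of the new helper (mirroring the test below; the file system setup and paths are illustrative): copyFilesParallel walks the source tree on the calling thread, creates directories synchronously, submits each file copy to a fixed pool of the given size, and returns every destination path it created, directories included.

    import java.util.List;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hbase.util.FSUtils;

    public class CopyTreeExample {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(conf);
        // Copy /src recursively to /dst with 4 copier threads. The returned
        // list is what ExportSnapshot later feeds to the parallel
        // chown/chmod helpers.
        List<Path> traversed =
            FSUtils.copyFilesParallel(fs, new Path("/src"), fs, new Path("/dst"), conf, 4);
        System.out.println("Created " + traversed.size() + " paths under /dst");
      }
    }
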
http://git-wip-us.apache.org/repos/asf/hbase/blob/c9f8c343/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestFSUtils.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestFSUtils.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestFSUtils.java
index 2718120..a862c8c 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestFSUtils.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestFSUtils.java
@@ -26,6 +26,7 @@ import static org.junit.Assert.assertTrue;
 
 import java.io.File;
 import java.io.IOException;
+import java.util.List;
 import java.util.Random;
 import java.util.UUID;
 import org.apache.hadoop.conf.Configuration;
@@ -412,6 +413,32 @@ public class TestFSUtils {
     }
   }
 
+
+  @Test
+  public void testCopyFilesParallel() throws Exception {
+    MiniDFSCluster cluster = htu.startMiniDFSCluster(1);
+    cluster.waitActive();
+    FileSystem fs = cluster.getFileSystem();
+    Path src = new Path("/src");
+    fs.mkdirs(src);
+    for (int i = 0; i < 50; i++) {
+      WriteDataToHDFS(fs, new Path(src, String.valueOf(i)), 1024);
+    }
+    Path sub = new Path(src, "sub");
+    fs.mkdirs(sub);
+    for (int i = 0; i < 50; i++) {
+      WriteDataToHDFS(fs, new Path(sub, String.valueOf(i)), 1024);
+    }
+    Path dst = new Path("/dst");
+    List<Path> allFiles = FSUtils.copyFilesParallel(fs, src, fs, dst, conf, 4);
+
+    assertEquals(102, allFiles.size());
+    FileStatus[] list = fs.listStatus(dst);
+    assertEquals(51, list.length);
+    FileStatus[] sublist = fs.listStatus(new Path(dst, "sub"));
+    assertEquals(50, sublist.length);
+  }
+
   // Below is taken from TestPread over in HDFS.
   static final int blockSize = 4096;
   static final long seed = 0xDEADBEEFL;
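The assertion counts in testCopyFilesParallel above follow directly from copyFilesParallel returning directories as well as files; as a comment-only breakdown:

    // allFiles.size() == 102: the 100 copied files plus the /dst root
    //   directory and the /dst/sub directory, all recorded as traversed paths.
    // fs.listStatus(dst).length == 51: 50 top-level files plus the "sub" dir.
    // fs.listStatus(new Path(dst, "sub")).length == 50: the files under sub.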