You are viewing a plain text version of this content. The canonical HTML version is available at the original mailing-list archive link.
Posted to commits@hbase.apache.org by zh...@apache.org on 2018/05/21 03:37:12 UTC
[05/32] hbase git commit: HBASE-20579 Improve snapshot manifest copy in ExportSnapshot
HBASE-20579 Improve snapshot manifest copy in ExportSnapshot
Signed-off-by: tedyu <yu...@gmail.com>
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/c9f8c343
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/c9f8c343
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/c9f8c343
Branch: refs/heads/HBASE-19064
Commit: c9f8c3436f6e38b5c7807677c5c3e7fc3e19e071
Parents: 0836b07
Author: jingyuntian <ti...@gmail.com>
Authored: Thu May 17 11:32:49 2018 +0800
Committer: tedyu <yu...@gmail.com>
Committed: Fri May 18 06:42:12 2018 -0700
----------------------------------------------------------------------
.../hadoop/hbase/snapshot/ExportSnapshot.java | 82 +++++++++++++-------
.../org/apache/hadoop/hbase/util/FSUtils.java | 43 ++++++++++
.../apache/hadoop/hbase/util/TestFSUtils.java | 27 +++++++
3 files changed, 125 insertions(+), 27 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/c9f8c343/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/snapshot/ExportSnapshot.java
----------------------------------------------------------------------
diff --git a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/snapshot/ExportSnapshot.java b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/snapshot/ExportSnapshot.java
index ef67b7b..4af7dfb 100644
--- a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/snapshot/ExportSnapshot.java
+++ b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/snapshot/ExportSnapshot.java
@@ -29,6 +29,11 @@ import java.util.Collections;
import java.util.Comparator;
import java.util.LinkedList;
import java.util.List;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.function.BiConsumer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
@@ -36,7 +41,6 @@ import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileChecksum;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hbase.HBaseConfiguration;
@@ -109,6 +113,10 @@ public class ExportSnapshot extends AbstractHBaseTool implements Tool {
private static final String CONF_BANDWIDTH_MB = "snapshot.export.map.bandwidth.mb";
private static final String CONF_MR_JOB_NAME = "mapreduce.job.name";
protected static final String CONF_SKIP_TMP = "snapshot.export.skip.tmp";
+ private static final String CONF_COPY_MANIFEST_THREADS =
+ "snapshot.export.copy.references.threads";
+ private static final int DEFAULT_COPY_MANIFEST_THREADS =
+ Runtime.getRuntime().availableProcessors();
static class Testing {
static final String CONF_TEST_FAILURE = "test.snapshot.export.failure";
@@ -842,35 +850,52 @@ public class ExportSnapshot extends AbstractHBaseTool implements Tool {
SnapshotReferenceUtil.verifySnapshot(conf, fs, snapshotDir, snapshotDesc);
}
- /**
- * Set path ownership.
- */
- private void setOwner(final FileSystem fs, final Path path, final String user,
- final String group, final boolean recursive) throws IOException {
- if (user != null || group != null) {
- if (recursive && fs.isDirectory(path)) {
- for (FileStatus child : fs.listStatus(path)) {
- setOwner(fs, child.getPath(), user, group, recursive);
- }
+ private void setConfigParallel(FileSystem outputFs, List<Path> traversedPath,
+ BiConsumer<FileSystem, Path> task, Configuration conf) throws IOException {
+ ExecutorService pool = Executors
+ .newFixedThreadPool(conf.getInt(CONF_COPY_MANIFEST_THREADS, DEFAULT_COPY_MANIFEST_THREADS));
+ List<Future<Void>> futures = new ArrayList<>();
+ for (Path dstPath : traversedPath) {
+ Future<Void> future = (Future<Void>) pool.submit(() -> task.accept(outputFs, dstPath));
+ futures.add(future);
+ }
+ try {
+ for (Future<Void> future : futures) {
+ future.get();
}
- fs.setOwner(path, user, group);
+ } catch (InterruptedException | ExecutionException e) {
+ throw new IOException(e);
+ } finally {
+ pool.shutdownNow();
}
}
- /**
- * Set path permission.
- */
- private void setPermission(final FileSystem fs, final Path path, final short filesMode,
- final boolean recursive) throws IOException {
- if (filesMode > 0) {
- FsPermission perm = new FsPermission(filesMode);
- if (recursive && fs.isDirectory(path)) {
- for (FileStatus child : fs.listStatus(path)) {
- setPermission(fs, child.getPath(), filesMode, recursive);
- }
+ private void setOwnerParallel(FileSystem outputFs, String filesUser, String filesGroup,
+ Configuration conf, List<Path> traversedPath) throws IOException {
+ setConfigParallel(outputFs, traversedPath, (fs, path) -> {
+ try {
+ fs.setOwner(path, filesUser, filesGroup);
+ } catch (IOException e) {
+ throw new RuntimeException(
+ "set owner for file " + path + " to " + filesUser + ":" + filesGroup + " failed");
}
- fs.setPermission(path, perm);
+ }, conf);
+ }
+
+ private void setPermissionParallel(final FileSystem outputFs, final short filesMode,
+ final List<Path> traversedPath, final Configuration conf) throws IOException {
+ if (filesMode <= 0) {
+ return;
}
+ FsPermission perm = new FsPermission(filesMode);
+ setConfigParallel(outputFs, traversedPath, (fs, path) -> {
+ try {
+ fs.setPermission(path, perm);
+ } catch (IOException e) {
+ throw new RuntimeException(
+ "set permission for file " + path + " to " + filesMode + " failed");
+ }
+ }, conf);
}
private boolean verifyTarget = true;
@@ -1001,9 +1026,12 @@ public class ExportSnapshot extends AbstractHBaseTool implements Tool {
// Step 1 - Copy fs1:/.snapshot/<snapshot> to fs2:/.snapshot/.tmp/<snapshot>
// The snapshot references must be copied before the hfiles otherwise the cleaner
// will remove them because they are unreferenced.
+ List<Path> travesedPaths = new ArrayList<>();
try {
LOG.info("Copy Snapshot Manifest");
- FileUtil.copy(inputFs, snapshotDir, outputFs, initialOutputSnapshotDir, false, false, conf);
+ travesedPaths =
+ FSUtils.copyFilesParallel(inputFs, snapshotDir, outputFs, initialOutputSnapshotDir, conf,
+ conf.getInt(CONF_COPY_MANIFEST_THREADS, DEFAULT_COPY_MANIFEST_THREADS));
} catch (IOException e) {
throw new ExportSnapshotException("Failed to copy the snapshot directory: from=" +
snapshotDir + " to=" + initialOutputSnapshotDir, e);
@@ -1013,11 +1041,11 @@ public class ExportSnapshot extends AbstractHBaseTool implements Tool {
+ filesUser)
+ (filesGroup == null ? "" : ", Change the group of " + needSetOwnerDir + " to "
+ filesGroup));
- setOwner(outputFs, needSetOwnerDir, filesUser, filesGroup, true);
+ setOwnerParallel(outputFs, filesUser, filesGroup, conf, travesedPaths);
}
if (filesMode > 0) {
LOG.warn("Change the permission of " + needSetOwnerDir + " to " + filesMode);
- setPermission(outputFs, needSetOwnerDir, (short)filesMode, true);
+ setPermissionParallel(outputFs, (short)filesMode, travesedPaths, conf);
}
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/c9f8c343/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java
index b106a31..53db140 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java
@@ -43,6 +43,7 @@ import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
import java.util.concurrent.ThreadPoolExecutor;
@@ -54,6 +55,7 @@ import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.fs.permission.FsAction;
@@ -1741,4 +1743,45 @@ public abstract class FSUtils extends CommonFSUtils {
}
}
+ public static List<Path> copyFilesParallel(FileSystem srcFS, Path src, FileSystem dstFS, Path dst,
+ Configuration conf, int threads) throws IOException {
+ ExecutorService pool = Executors.newFixedThreadPool(threads);
+ List<Future<Void>> futures = new ArrayList<>();
+ List<Path> traversedPaths;
+ try {
+ traversedPaths = copyFiles(srcFS, src, dstFS, dst, conf, pool, futures);
+ for (Future<Void> future : futures) {
+ future.get();
+ }
+ } catch (ExecutionException | InterruptedException | IOException e) {
+ throw new IOException("copy snapshot reference files failed", e);
+ } finally {
+ pool.shutdownNow();
+ }
+ return traversedPaths;
+ }
+
+ private static List<Path> copyFiles(FileSystem srcFS, Path src, FileSystem dstFS, Path dst,
+ Configuration conf, ExecutorService pool, List<Future<Void>> futures) throws IOException {
+ List<Path> traversedPaths = new ArrayList<>();
+ traversedPaths.add(dst);
+ FileStatus currentFileStatus = srcFS.getFileStatus(src);
+ if (currentFileStatus.isDirectory()) {
+ if (!dstFS.mkdirs(dst)) {
+ throw new IOException("create dir failed: " + dst);
+ }
+ FileStatus[] subPaths = srcFS.listStatus(src);
+ for (FileStatus subPath : subPaths) {
+ traversedPaths.addAll(copyFiles(srcFS, subPath.getPath(), dstFS,
+ new Path(dst, subPath.getPath().getName()), conf, pool, futures));
+ }
+ } else {
+ Future<Void> future = pool.submit(() -> {
+ FileUtil.copy(srcFS, src, dstFS, dst, false, false, conf);
+ return null;
+ });
+ futures.add(future);
+ }
+ return traversedPaths;
+ }
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/c9f8c343/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestFSUtils.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestFSUtils.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestFSUtils.java
index 2718120..a862c8c 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestFSUtils.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestFSUtils.java
@@ -26,6 +26,7 @@ import static org.junit.Assert.assertTrue;
import java.io.File;
import java.io.IOException;
+import java.util.List;
import java.util.Random;
import java.util.UUID;
import org.apache.hadoop.conf.Configuration;
@@ -412,6 +413,32 @@ public class TestFSUtils {
}
}
+
+ @Test
+ public void testCopyFilesParallel() throws Exception {
+ MiniDFSCluster cluster = htu.startMiniDFSCluster(1);
+ cluster.waitActive();
+ FileSystem fs = cluster.getFileSystem();
+ Path src = new Path("/src");
+ fs.mkdirs(src);
+ for (int i = 0; i < 50; i++) {
+ WriteDataToHDFS(fs, new Path(src, String.valueOf(i)), 1024);
+ }
+ Path sub = new Path(src, "sub");
+ fs.mkdirs(sub);
+ for (int i = 0; i < 50; i++) {
+ WriteDataToHDFS(fs, new Path(sub, String.valueOf(i)), 1024);
+ }
+ Path dst = new Path("/dst");
+ List<Path> allFiles = FSUtils.copyFilesParallel(fs, src, fs, dst, conf, 4);
+
+ assertEquals(102, allFiles.size());
+ FileStatus[] list = fs.listStatus(dst);
+ assertEquals(51, list.length);
+ FileStatus[] sublist = fs.listStatus(new Path(dst, "sub"));
+ assertEquals(50, sublist.length);
+ }
+
// Below is taken from TestPread over in HDFS.
static final int blockSize = 4096;
static final long seed = 0xDEADBEEFL;