You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by jm...@apache.org on 2013/02/13 19:11:39 UTC
svn commit: r1445787 - in /hbase/branches/hbase-7290/hbase-server/src:
main/java/org/apache/hadoop/hbase/snapshot/
main/java/org/apache/hadoop/hbase/snapshot/exception/
main/java/org/apache/hadoop/hbase/snapshot/tool/
test/java/org/apache/hadoop/hbase/...
Author: jmhsieh
Date: Wed Feb 13 18:11:38 2013
New Revision: 1445787
URL: http://svn.apache.org/r1445787
Log:
HBASE-6802 Export Snapshot (Matteo Bertozzi)
Added:
hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/exception/ExportSnapshotException.java
hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/tool/
hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/tool/ExportSnapshot.java
hbase/branches/hbase-7290/hbase-server/src/test/java/org/apache/hadoop/hbase/snapshot/tool/
hbase/branches/hbase-7290/hbase-server/src/test/java/org/apache/hadoop/hbase/snapshot/tool/TestExportSnapshot.java
Modified:
hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/SnapshotDescriptionUtils.java
Modified: hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/SnapshotDescriptionUtils.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/SnapshotDescriptionUtils.java?rev=1445787&r1=1445786&r2=1445787&view=diff
==============================================================================
--- hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/SnapshotDescriptionUtils.java (original)
+++ hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/SnapshotDescriptionUtils.java Wed Feb 13 18:11:38 2013
@@ -244,6 +244,17 @@ public class SnapshotDescriptionUtils {
}
/**
+ * Get the directory to build a snapshot, before it is finalized
+ * @param snapshotName name of the snapshot
+ * @param rootDir root directory of the hbase installation
+ * @return {@link Path} where one can build a snapshot
+ */
+ public static Path getWorkingSnapshotDir(String snapshotName, final Path rootDir) {
+   // Resolve the snapshot directory relative to the temporary (in-progress) snapshots folder
+   final Path workingRoot = new Path(getSnapshotsDir(rootDir), SNAPSHOT_TMP_DIR_NAME);
+   return getCompletedSnapshotDir(workingRoot, snapshotName);
+ }
+
+ /**
* Get the directory to store the snapshot instance
* @param snapshotsDir hbase-global directory for storing all snapshots
* @param snapshotName name of the snapshot to take
Added: hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/exception/ExportSnapshotException.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/exception/ExportSnapshotException.java?rev=1445787&view=auto
==============================================================================
--- hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/exception/ExportSnapshotException.java (added)
+++ hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/exception/ExportSnapshotException.java Wed Feb 13 18:11:38 2013
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.snapshot.exception;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+
+/**
+ * Thrown when a snapshot could not be exported due to an error during the operation.
+ * <p>
+ * Both constructors simply delegate to the matching {@link HBaseSnapshotException} constructor.
+ */
+@InterfaceAudience.Public
+@SuppressWarnings("serial")
+public class ExportSnapshotException extends HBaseSnapshotException {
+
+ /**
+ * Create the exception with a description of the failure.
+ * @param msg message describing the exception
+ */
+ public ExportSnapshotException(String msg) {
+ super(msg);
+ }
+
+ /**
+ * Create the exception with a description of the failure and the error that caused it.
+ * @param message message describing the exception
+ * @param e cause
+ */
+ public ExportSnapshotException(String message, Exception e) {
+ super(message, e);
+ }
+}
Added: hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/tool/ExportSnapshot.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/tool/ExportSnapshot.java?rev=1445787&view=auto
==============================================================================
--- hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/tool/ExportSnapshot.java (added)
+++ hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/tool/ExportSnapshot.java Wed Feb 13 18:11:38 2013
@@ -0,0 +1,714 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.snapshot.tool;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.LinkedList;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileChecksum;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
+import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.io.HFileLink;
+import org.apache.hadoop.hbase.io.HLogLink;
+import org.apache.hadoop.hbase.io.Reference;
+import org.apache.hadoop.hbase.mapreduce.JobUtil;
+import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.SnapshotDescription;
+import org.apache.hadoop.hbase.snapshot.SnapshotDescriptionUtils;
+import org.apache.hadoop.hbase.snapshot.SnapshotReferenceUtil;
+import org.apache.hadoop.hbase.snapshot.exception.ExportSnapshotException;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.hbase.util.FSUtils;
+import org.apache.hadoop.hbase.util.Pair;
+
+/**
+ * Export the specified snapshot to a given FileSystem.
+ *
+ * The .snapshot/name folder is copied to the destination cluster first,
+ * and then all the hfiles/hlogs are copied to the .archive/ location using a Map-Reduce job.
+ * When everything is done, the second cluster can restore the snapshot.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public final class ExportSnapshot extends Configured implements Tool {
+ private static final Log LOG = LogFactory.getLog(ExportSnapshot.class);
+
+ // Configuration keys used to hand the export options over to the mappers:
+ // runCopyJob() stores them in the job configuration, ExportMapper.setup() reads them back.
+ private static final String CONF_FILES_USER = "snapshot.export.files.attributes.user";
+ private static final String CONF_FILES_GROUP = "snapshot.export.files.attributes.group";
+ private static final String CONF_FILES_MODE = "snapshot.export.files.attributes.mode";
+ private static final String CONF_CHECKSUM_VERIFY = "snapshot.export.checksum.verify";
+ private static final String CONF_OUTPUT_ROOT = "snapshot.export.output.root";
+ private static final String CONF_INPUT_ROOT = "snapshot.export.input.root";
+
+ // Name prefix of the folder (under the job staging dir) that holds the MR job input files;
+ // getInputFolderPath() appends the current time in milliseconds to it.
+ private static final String INPUT_FOLDER_PREFIX = "export-files.";
+
+ // Export Map-Reduce Counters, to keep track of the progress
+ public enum Counter { MISSING_FILES, COPY_FAILED, BYTES_EXPECTED, BYTES_COPIED };
+
+ /**
+ * Mapper that copies files from the source to the destination file-system.
+ * Each input key is the path of one file to copy; the input value is not used
+ * and the mapper emits no output records.
+ */
+ private static class ExportMapper extends Mapper<Text, NullWritable, NullWritable, NullWritable> {
+ // Number of copied bytes after which copyData() updates the counter and the task status
+ final static int REPORT_SIZE = 1 * 1024 * 1024;
+ // Size of the buffer used by copyData() to move the data
+ final static int BUFFER_SIZE = 64 * 1024;
+
+ // Copy options, read from the job configuration in setup()
+ private boolean verifyChecksum;
+ private String filesGroup;
+ private String filesUser;
+ private short filesMode;
+
+ // Destination file-system, with its root and archive directories
+ private FileSystem outputFs;
+ private Path outputArchive;
+ private Path outputRoot;
+
+ // Source file-system, with its root and archive directories
+ private FileSystem inputFs;
+ private Path inputArchive;
+ private Path inputRoot;
+
+ /**
+  * Reads the export options from the job configuration and opens the source and
+  * destination file-systems.
+  * @param context mapper context providing the job configuration
+  * @throws RuntimeException if one of the two file-systems cannot be obtained
+  */
+ @Override
+ public void setup(Context context) {
+   final Configuration jobConf = context.getConfiguration();
+
+   // Copy options
+   verifyChecksum = jobConf.getBoolean(CONF_CHECKSUM_VERIFY, true);
+   filesGroup = jobConf.get(CONF_FILES_GROUP);
+   filesUser = jobConf.get(CONF_FILES_USER);
+   filesMode = (short) jobConf.getInt(CONF_FILES_MODE, 0);
+
+   // Root directories of the destination and of the source
+   outputRoot = new Path(jobConf.get(CONF_OUTPUT_ROOT));
+   inputRoot = new Path(jobConf.get(CONF_INPUT_ROOT));
+
+   // Archive directories, relative to the two roots
+   inputArchive = new Path(inputRoot, HConstants.HFILE_ARCHIVE_DIRECTORY);
+   outputArchive = new Path(outputRoot, HConstants.HFILE_ARCHIVE_DIRECTORY);
+
+   try {
+     inputFs = FileSystem.get(inputRoot.toUri(), jobConf);
+   } catch (IOException ioe) {
+     throw new RuntimeException("Could not get the input FileSystem with root=" + inputRoot, ioe);
+   }
+
+   try {
+     outputFs = FileSystem.get(outputRoot.toUri(), jobConf);
+   } catch (IOException ioe) {
+     throw new RuntimeException("Could not get the output FileSystem with root="+ outputRoot, ioe);
+   }
+ }
+
+ @Override
+ public void cleanup(Context context) {
+ if (outputFs != null) {
+ try {
+ outputFs.close();
+ } catch (IOException e) {
+ LOG.error("Error closing output FileSystem", e);
+ }
+ }
+
+ if (inputFs != null) {
+ try {
+ inputFs.close();
+ } catch (IOException e) {
+ LOG.error("Error closing input FileSystem", e);
+ }
+ }
+ }
+
+ /**
+  * Copies the file named by the input key to its location in the destination file-system.
+  * @param key path of the file to copy
+  * @param value not used
+  * @param context mapper context, used by the copy to report counters and status
+  */
+ @Override
+ public void map(Text key, NullWritable value, Context context)
+     throws InterruptedException, IOException {
+   final Path source = new Path(key.toString());
+   final Path target = getOutputPath(source);
+
+   LOG.info("copy file input=" + source + " output=" + target);
+   final boolean copied = copyFile(context, source, target);
+   if (copied) {
+     LOG.info("copy completed for input=" + source + " output=" + target);
+   }
+ }
+
+ /**
+ * Returns the location where the inputPath will be copied.
+ * - hfiles are encoded as hfile links hfile-region-table
+ * - logs are encoded as serverName/logName
+ */
+ private Path getOutputPath(final Path inputPath) throws IOException {
+ Path path;
+ if (HFileLink.isHFileLink(inputPath)) {
+ String family = inputPath.getParent().getName();
+ String table = HFileLink.getReferencedTableName(inputPath.getName());
+ String region = HFileLink.getReferencedRegionName(inputPath.getName());
+ String hfile = HFileLink.getReferencedHFileName(inputPath.getName());
+ path = new Path(table, new Path(region, new Path(family, hfile)));
+ } else if (isHLogLinkPath(inputPath)) {
+ String logName = inputPath.getName();
+ path = new Path(new Path(outputRoot, HConstants.HREGION_OLDLOGDIR_NAME), logName);
+ } else {
+ path = inputPath;
+ }
+ return new Path(outputArchive, path);
+ }
+
+ private boolean copyFile(final Context context, final Path inputPath, final Path outputPath)
+ throws IOException {
+ FSDataInputStream in = openSourceFile(inputPath);
+ if (in == null) {
+ context.getCounter(Counter.MISSING_FILES).increment(1);
+ return false;
+ }
+
+ try {
+ // Verify if the input file exists
+ FileStatus inputStat = getFileStatus(inputFs, inputPath);
+ if (inputStat == null) return false;
+
+ // Verify if the output file exists and is the same that we want to copy
+ FileStatus outputStat = getFileStatus(outputFs, outputPath);
+ if (outputStat != null && sameFile(inputStat, outputStat)) {
+ LOG.info("Skip copy " + inputPath + " to " + outputPath + ", same file.");
+ return true;
+ }
+
+ context.getCounter(Counter.BYTES_EXPECTED).increment(inputStat.getLen());
+
+ // Ensure that the output folder is there and copy the file
+ outputFs.mkdirs(outputPath.getParent());
+ FSDataOutputStream out = outputFs.create(outputPath, true);
+ try {
+ if (!copyData(context, inputPath, in, outputPath, out, inputStat.getLen()))
+ return false;
+ } finally {
+ out.close();
+ }
+
+ // Preserve attributes
+ return preserveAttributes(outputPath, inputStat);
+ } finally {
+ in.close();
+ }
+ }
+
+ /**
+  * Set the attributes of the copied file: the ones selected by the user, or the ones
+  * of the source file for those the user did not specify.
+  * <p>
+  * A mode requested by the user (filesMode &gt; 0) always wins over the source permission;
+  * a filesMode of 0 means "not specified". The file-system is only asked to change
+  * an attribute when the current value differs from the wanted one.
+  * @param path the copied file, in the output file-system
+  * @param refStat status of the source file, used for the attributes not specified by the user
+  * @return true if the attributes are in place, false if they could not be read or set
+  */
+ private boolean preserveAttributes(final Path path, final FileStatus refStat) {
+   FileStatus stat;
+   try {
+     stat = outputFs.getFileStatus(path);
+   } catch (IOException e) {
+     LOG.warn("Unable to get the status for file=" + path, e);
+     return false;
+   }
+
+   try {
+     if (filesMode > 0) {
+       // The user asked for a specific mode: never fall back to the source permission,
+       // otherwise a file that already has the requested mode would be reset to the source one.
+       if (stat.getPermission().toShort() != filesMode) {
+         outputFs.setPermission(path, new FsPermission(filesMode));
+       }
+     } else if (!stat.getPermission().equals(refStat.getPermission())) {
+       outputFs.setPermission(path, refStat.getPermission());
+     }
+   } catch (IOException e) {
+     LOG.error("Unable to set the permission for file=" + path, e);
+     return false;
+   }
+
+   try {
+     String user = (filesUser != null) ? filesUser : refStat.getOwner();
+     String group = (filesGroup != null) ? filesGroup : refStat.getGroup();
+     if (!(user.equals(stat.getOwner()) && group.equals(stat.getGroup()))) {
+       outputFs.setOwner(path, user, group);
+     }
+   } catch (IOException e) {
+     LOG.error("Unable to set the owner/group for file=" + path, e);
+     return false;
+   }
+
+   return true;
+ }
+
+ /**
+  * Copy the content of the input stream to the output stream, reporting the progress.
+  * <p>
+  * The BYTES_COPIED counter and the task status are updated every REPORT_SIZE bytes and once
+  * more at the end of the copy. The status shows the total amount copied so far, both in
+  * bytes and as a percentage of the expected file size.
+  * @param context mapper context used to update the counters and the task status
+  * @param inputPath source file, used only for the messages
+  * @param in stream to read from
+  * @param outputPath destination file, used only for the messages
+  * @param out stream to write to
+  * @param inputFileSize expected number of bytes to copy
+  * @return true if exactly inputFileSize bytes were copied, false on I/O error or size
+  *         mismatch (in both cases COPY_FAILED is incremented)
+  */
+ private boolean copyData(final Context context,
+     final Path inputPath, final FSDataInputStream in,
+     final Path outputPath, final FSDataOutputStream out,
+     final long inputFileSize) {
+   final String statusMessage = "copied %s/" + StringUtils.humanReadableInt(inputFileSize) +
+       " (%.3f%%) from " + inputPath + " to " + outputPath;
+
+   try {
+     byte[] buffer = new byte[BUFFER_SIZE];
+     long totalBytesWritten = 0;
+     int reportBytes = 0;
+     int bytesRead;
+
+     while ((bytesRead = in.read(buffer)) > 0) {
+       out.write(buffer, 0, bytesRead);
+       totalBytesWritten += bytesRead;
+       reportBytes += bytesRead;
+
+       if (reportBytes >= REPORT_SIZE) {
+         context.getCounter(Counter.BYTES_COPIED).increment(reportBytes);
+         context.setStatus(String.format(statusMessage,
+             StringUtils.humanReadableInt(totalBytesWritten),
+             percentCopied(totalBytesWritten, inputFileSize)));
+         reportBytes = 0;
+       }
+     }
+
+     // Account for the bytes copied since the last report
+     context.getCounter(Counter.BYTES_COPIED).increment(reportBytes);
+     context.setStatus(String.format(statusMessage,
+         StringUtils.humanReadableInt(totalBytesWritten),
+         percentCopied(totalBytesWritten, inputFileSize)));
+
+     // Verify that the written size match
+     if (totalBytesWritten != inputFileSize) {
+       LOG.error("number of bytes copied not matching copied=" + totalBytesWritten +
+           " expected=" + inputFileSize + " for file=" + inputPath);
+       context.getCounter(Counter.COPY_FAILED).increment(1);
+       return false;
+     }
+
+     return true;
+   } catch (IOException e) {
+     LOG.error("Error copying " + inputPath + " to " + outputPath, e);
+     context.getCounter(Counter.COPY_FAILED).increment(1);
+     return false;
+   }
+ }
+
+ /**
+  * Returns the percentage (0-100) of the file copied so far.
+  * An empty file is reported as fully copied, instead of dividing by zero.
+  */
+ private static float percentCopied(final long bytesCopied, final long fileSize) {
+   if (fileSize <= 0) {
+     return 100.0f;
+   }
+   return (bytesCopied / (float) fileSize) * 100.0f;
+ }
+
+ private FSDataInputStream openSourceFile(final Path path) {
+ try {
+ if (HFileLink.isHFileLink(path)) {
+ return new HFileLink(inputRoot, inputArchive, path).open(inputFs);
+ } else if (isHLogLinkPath(path)) {
+ String serverName = path.getParent().getName();
+ String logName = path.getName();
+ return new HLogLink(inputRoot, serverName, logName).open(inputFs);
+ }
+ return inputFs.open(path);
+ } catch (IOException e) {
+ LOG.error("Unable to open source file=" + path, e);
+ return null;
+ }
+ }
+
+ private FileStatus getFileStatus(final FileSystem fs, final Path path) {
+ try {
+ if (HFileLink.isHFileLink(path)) {
+ Path refPath = HFileLink.getReferencedPath(fs, inputRoot, inputArchive, path);
+ return fs.getFileStatus(refPath);
+ } else if (isHLogLinkPath(path)) {
+ String serverName = path.getParent().getName();
+ String logName = path.getName();
+ return new HLogLink(inputRoot, serverName, logName).getFileStatus(fs);
+ }
+ return fs.getFileStatus(path);
+ } catch (IOException e) {
+ LOG.warn("Unable to get the status for file=" + path);
+ return null;
+ }
+ }
+
+ /**
+  * Returns the checksum of the given file as computed by the file-system,
+  * or null if the request fails (a warning is logged).
+  */
+ private FileChecksum getFileChecksum(final FileSystem fs, final Path path) {
+   FileChecksum checksum;
+   try {
+     checksum = fs.getFileChecksum(path);
+   } catch (IOException ioe) {
+     LOG.warn("Unable to get checksum for file=" + path, ioe);
+     checksum = null;
+   }
+   return checksum;
+ }
+
+ /**
+ * Check if the two files are equal by looking at the file length,
+ * and at the checksum (if user has specified the verifyChecksum flag).
+ */
+ private boolean sameFile(final FileStatus inputStat, final FileStatus outputStat) {
+ // Not matching length
+ if (inputStat.getLen() != outputStat.getLen()) return false;
+
+ // Mark files as equals, since user asked for no checksum verification
+ if (!verifyChecksum) return true;
+
+ // If checksums are not available, files are not the same.
+ FileChecksum inChecksum = getFileChecksum(inputFs, inputStat.getPath());
+ if (inChecksum == null) return false;
+
+ FileChecksum outChecksum = getFileChecksum(outputFs, outputStat.getPath());
+ if (outChecksum == null) return false;
+
+ return inChecksum.equals(outChecksum);
+ }
+
+ /**
+ * HLog files are encoded as serverName/logName
+ * and since all the other files should be in /hbase/table/..path..
+ * we can rely on the depth, for now.
+ */
+ private static boolean isHLogLinkPath(final Path path) {
+ return path.depth() == 2;
+ }
+ }
+
+ /**
+ * Extract the list of files (HFiles/HLogs) to copy using Map-Reduce.
+ * @return list of files referenced by the snapshot (pair of path and size)
+ */
+ private List<Pair<Path, Long>> getSnapshotFiles(final FileSystem fs, final Path snapshotDir) throws IOException {
+ SnapshotDescription snapshotDesc = SnapshotDescriptionUtils.readSnapshotInfo(fs, snapshotDir);
+
+ final List<Pair<Path, Long>> files = new ArrayList<Pair<Path, Long>>();
+ final String table = snapshotDesc.getTable();
+ final Configuration conf = getConf();
+
+ // Get snapshot files
+ SnapshotReferenceUtil.visitReferencedFiles(fs, snapshotDir,
+ new SnapshotReferenceUtil.FileVisitor() {
+ public void storeFile (final String region, final String family, final String hfile)
+ throws IOException {
+ Path path = new Path(family, HFileLink.createHFileLinkName(table, region, hfile));
+ long size = fs.getFileStatus(HFileLink.getReferencedPath(conf, fs, path)).getLen();
+ files.add(new Pair<Path, Long>(path, size));
+ }
+
+ public void recoveredEdits (final String region, final String logfile)
+ throws IOException {
+ // copied with the snapshot referenecs
+ }
+
+ public void logFile (final String server, final String logfile)
+ throws IOException {
+ long size = new HLogLink(conf, server, logfile).getFileStatus(fs).getLen();
+ files.add(new Pair<Path, Long>(new Path(server, logfile), size));
+ }
+ });
+
+ return files;
+ }
+
+ /**
+  * Given a list of file paths and sizes, create around ngroups in as balanced a way as possible.
+  * The groups created will have similar amounts of bytes.
+  * <p>
+  * The algorithm used is pretty straightforward; the file list is sorted by size,
+  * and then each group fetches the biggest file available, iterating through groups
+  * alternating the direction.
+  * <p>
+  * Note that the given list is sorted in place, and that fewer than ngroups groups
+  * are returned when there are fewer files than groups.
+  * @param files files to distribute (pair of path and size); sorted by this method
+  * @param ngroups maximum number of groups to create; expected to be positive
+  * @return the groups of files, each one to be processed by a different mapper
+  */
+ static List<List<Path>> getBalancedSplits(final List<Pair<Path, Long>> files, int ngroups) {
+   // Sort files by size, from small to big.
+   // Long.compareTo() is used instead of a subtraction, which may overflow.
+   Collections.sort(files, new Comparator<Pair<Path, Long>>() {
+     public int compare(Pair<Path, Long> a, Pair<Path, Long> b) {
+       return a.getSecond().compareTo(b.getSecond());
+     }
+   });
+
+   // create balanced groups
+   // (array-backed lists: the groups are looked up by index on every iteration)
+   List<List<Path>> fileGroups = new ArrayList<List<Path>>();
+   long[] sizeGroups = new long[ngroups];
+   int hi = files.size() - 1;
+   int lo = 0;
+
+   List<Path> group;
+   int dir = 1;
+   int g = 0;
+
+   while (hi >= lo) {
+     if (g == fileGroups.size()) {
+       group = new ArrayList<Path>();
+       fileGroups.add(group);
+     } else {
+       group = fileGroups.get(g);
+     }
+
+     Pair<Path, Long> fileInfo = files.get(hi--);
+
+     // add the hi one
+     sizeGroups[g] += fileInfo.getSecond();
+     group.add(fileInfo.getFirst());
+
+     // change direction when at the end or the beginning;
+     // the group at the boundary gets two files in a row
+     g += dir;
+     if (g == ngroups) {
+       dir = -1;
+       g = ngroups - 1;
+     } else if (g < 0) {
+       dir = 1;
+       g = 0;
+     }
+   }
+
+   if (LOG.isDebugEnabled()) {
+     for (int i = 0; i < sizeGroups.length; ++i) {
+       LOG.debug("export split=" + i + " size=" + StringUtils.humanReadableInt(sizeGroups[i]));
+     }
+   }
+
+   return fileGroups;
+ }
+
+ private static Path getInputFolderPath(Configuration conf)
+ throws IOException, InterruptedException {
+ Path stagingDir = JobUtil.getStagingDir(conf);
+ return new Path(stagingDir, INPUT_FOLDER_PREFIX +
+ String.valueOf(EnvironmentEdgeManager.currentTimeMillis()));
+ }
+
+ /**
+ * Create the input files, with the path to copy, for the MR job.
+ * Each input files contains n files, and each input file has a similar amount data to copy.
+ * The number of input files created are based on the number of mappers provided as argument
+ * and the number of the files to copy.
+ */
+ private static Path[] createInputFiles(final Configuration conf,
+ final List<Pair<Path, Long>> snapshotFiles, int mappers)
+ throws IOException, InterruptedException {
+ Path inputFolderPath = getInputFolderPath(conf);
+ FileSystem fs = inputFolderPath.getFileSystem(conf);
+ LOG.debug("Input folder location: " + inputFolderPath);
+
+ List<List<Path>> splits = getBalancedSplits(snapshotFiles, mappers);
+ Path[] inputFiles = new Path[splits.size()];
+
+ Text key = new Text();
+ for (int i = 0; i < inputFiles.length; i++) {
+ List<Path> files = splits.get(i);
+ inputFiles[i] = new Path(inputFolderPath, String.format("export-%d.seq", i));
+ SequenceFile.Writer writer = SequenceFile.createWriter(fs, conf, inputFiles[i],
+ Text.class, NullWritable.class);
+ LOG.debug("Input split: " + i);
+ try {
+ for (Path file: files) {
+ LOG.debug(file.toString());
+ key.set(file.toString());
+ writer.append(key, NullWritable.get());
+ }
+ } finally {
+ writer.close();
+ }
+ }
+
+ return inputFiles;
+ }
+
+ /**
+ * Run Map-Reduce Job to perform the files copy.
+ */
+ private boolean runCopyJob(final Path inputRoot, final Path outputRoot,
+ final List<Pair<Path, Long>> snapshotFiles, final boolean verifyChecksum,
+ final String filesUser, final String filesGroup, final int filesMode,
+ final int mappers) throws IOException, InterruptedException, ClassNotFoundException {
+ Configuration conf = getConf();
+ if (filesGroup != null) conf.set(CONF_FILES_GROUP, filesGroup);
+ if (filesUser != null) conf.set(CONF_FILES_USER, filesUser);
+ conf.setInt(CONF_FILES_MODE, filesMode);
+ conf.setBoolean(CONF_CHECKSUM_VERIFY, verifyChecksum);
+ conf.set(CONF_OUTPUT_ROOT, outputRoot.toString());
+ conf.set(CONF_INPUT_ROOT, inputRoot.toString());
+ conf.setInt("mapreduce.job.maps", mappers);
+
+ Job job = new Job(conf);
+ job.setJobName("ExportSnapshot");
+ job.setJarByClass(ExportSnapshot.class);
+ job.setMapperClass(ExportMapper.class);
+ job.setInputFormatClass(SequenceFileInputFormat.class);
+ job.setOutputFormatClass(NullOutputFormat.class);
+ job.setMapSpeculativeExecution(false);
+ job.setNumReduceTasks(0);
+ for (Path path: createInputFiles(conf, snapshotFiles, mappers)) {
+ LOG.debug("Add Input Path=" + path);
+ SequenceFileInputFormat.addInputPath(job, path);
+ }
+
+ return job.waitForCompletion(true);
+ }
+
+ /**
+  * Execute the export snapshot by copying the snapshot metadata, hfiles and hlogs.
+  * <p>
+  * The snapshot metadata is first copied to a temporary directory of the destination,
+  * then the referenced files are copied by a Map-Reduce job and, at the end, the temporary
+  * directory is renamed to the final snapshot directory. If any of these steps fails,
+  * the directories created in the destination are removed, so that the export can be retried.
+  * @param args command line arguments, see the usage message for the supported options
+  * @return 0 on success, and != 0 upon failure.
+  * @throws Exception if the file-systems or the snapshot to export cannot be accessed
+  */
+ @Override
+ public int run(String[] args) throws Exception {
+   boolean verifyChecksum = true;
+   String snapshotName = null;
+   String filesGroup = null;
+   String filesUser = null;
+   Path outputRoot = null;
+   int filesMode = 0;
+   int mappers = getConf().getInt("mapreduce.job.maps", 1);
+
+   // Process command line args
+   for (int i = 0; i < args.length; i++) {
+     String cmd = args[i];
+     try {
+       if (cmd.equals("-snapshot")) {
+         snapshotName = args[++i];
+       } else if (cmd.equals("-copy-to")) {
+         outputRoot = new Path(args[++i]);
+       } else if (cmd.equals("-no-checksum-verify")) {
+         verifyChecksum = false;
+       } else if (cmd.equals("-mappers")) {
+         mappers = Integer.parseInt(args[++i]);
+       } else if (cmd.equals("-chuser")) {
+         filesUser = args[++i];
+       } else if (cmd.equals("-chgroup")) {
+         filesGroup = args[++i];
+       } else if (cmd.equals("-chmod")) {
+         // the mode is given in octal, e.g. 700
+         filesMode = Integer.parseInt(args[++i], 8);
+       } else if (cmd.equals("-h") || cmd.equals("--help")) {
+         printUsageAndExit();
+       } else {
+         System.err.println("UNEXPECTED: " + cmd);
+         printUsageAndExit();
+       }
+     } catch (Exception e) {
+       // missing or malformed option value
+       printUsageAndExit();
+     }
+   }
+
+   // Check user options
+   if (snapshotName == null) {
+     System.err.println("Snapshot name not provided.");
+     printUsageAndExit();
+   }
+
+   if (outputRoot == null) {
+     System.err.println("Destination file-system not provided.");
+     printUsageAndExit();
+   }
+
+   Configuration conf = getConf();
+   Path inputRoot = FSUtils.getRootDir(conf);
+   FileSystem inputFs = FileSystem.get(conf);
+   // Use the tool configuration, as the mappers do, so that the options
+   // provided by the user are applied to the destination file-system too.
+   FileSystem outputFs = FileSystem.get(outputRoot.toUri(), conf);
+
+   Path snapshotDir = SnapshotDescriptionUtils.getCompletedSnapshotDir(snapshotName, inputRoot);
+   Path snapshotTmpDir = SnapshotDescriptionUtils.getWorkingSnapshotDir(snapshotName, outputRoot);
+   Path outputSnapshotDir =
+       SnapshotDescriptionUtils.getCompletedSnapshotDir(snapshotName, outputRoot);
+
+   // Check if the snapshot already exists
+   if (outputFs.exists(outputSnapshotDir)) {
+     System.err.println("The snapshot '" + snapshotName +
+         "' already exists in the destination: " + outputSnapshotDir);
+     return 1;
+   }
+
+   // Check if the snapshot already in-progress
+   if (outputFs.exists(snapshotTmpDir)) {
+     System.err.println("A snapshot with the same name '" + snapshotName + "' is in-progress");
+     return 1;
+   }
+
+   // Step 0 - Extract snapshot files to copy
+   final List<Pair<Path, Long>> files = getSnapshotFiles(inputFs, snapshotDir);
+
+   // Step 1 - Copy fs1:/.snapshot/<snapshot> to fs2:/.snapshot/.tmp/<snapshot>
+   // The snapshot references must be copied before the hfiles otherwise the cleaner
+   // will remove them because they are unreferenced.
+   try {
+     FileUtil.copy(inputFs, snapshotDir, outputFs, snapshotTmpDir, false, false, conf);
+   } catch (IOException e) {
+     System.err.println("Failed to copy the snapshot directory: from=" + snapshotDir +
+         " to=" + snapshotTmpDir);
+     e.printStackTrace(System.err);
+     // Remove the partial copy, otherwise the next export is refused as "in-progress"
+     deleteExportDir(outputFs, snapshotTmpDir);
+     return 1;
+   }
+
+   // Step 2 - Start MR Job to copy files
+   // The snapshot references must be copied before the files otherwise the files gets removed
+   // by the HFileArchiver, since they have no references.
+   try {
+     if (!runCopyJob(inputRoot, outputRoot, files, verifyChecksum,
+         filesUser, filesGroup, filesMode, mappers)) {
+       throw new ExportSnapshotException("Snapshot export failed!");
+     }
+
+     // Step 3 - Rename fs2:/.snapshot/.tmp/<snapshot> fs2:/.snapshot/<snapshot>
+     if (!outputFs.rename(snapshotTmpDir, outputSnapshotDir)) {
+       System.err.println("Snapshot export failed!");
+       System.err.println("Unable to rename snapshot directory from=" +
+           snapshotTmpDir + " to=" + outputSnapshotDir);
+       deleteExportDir(outputFs, snapshotTmpDir);
+       return 1;
+     }
+
+     return 0;
+   } catch (Exception e) {
+     System.err.println("Snapshot export failed!");
+     e.printStackTrace(System.err);
+     // The failure may have happened before or after the rename: remove both directories
+     deleteExportDir(outputFs, snapshotTmpDir);
+     deleteExportDir(outputFs, outputSnapshotDir);
+     return 1;
+   }
+ }
+
+ /**
+  * Recursively remove a directory created by a failed export.
+  * This is a best-effort cleanup: a failure is reported on stderr and does not hide
+  * the error that caused the export to fail.
+  */
+ private static void deleteExportDir(final FileSystem fs, final Path dir) {
+   try {
+     fs.delete(dir, true);
+   } catch (IOException e) {
+     System.err.println("Unable to remove the directory: " + dir);
+     e.printStackTrace(System.err);
+   }
+ }
+
+ /**
+  * Print the usage message of the tool on stderr and terminate the JVM with exit code 1.
+  */
+ private void printUsageAndExit() {
+   System.err.printf("Usage: bin/hbase %s [options]\n", getClass().getName());
+   System.err.println(" where [options] are:");
+   System.err.println(" -h|--help Show this help and exit.");
+   System.err.println(" -snapshot NAME Snapshot to export.");
+   System.err.println(" -copy-to NAME Remote destination hdfs://");
+   System.err.println(" -no-checksum-verify Do not verify checksum.");
+   System.err.println(" -chuser USERNAME Change the owner of the files to the specified one.");
+   System.err.println(" -chgroup GROUP Change the group of the files to the specified one.");
+   System.err.println(" -chmod MODE Change the permission of the files to the specified one.");
+   System.err.println(" -mappers Number of mappers to use during the copy (mapreduce.job.maps).");
+   System.err.println();
+   System.err.println("Examples:");
+   // getName() avoids the "class " prefix added by Class.toString()
+   System.err.println(" hbase " + getClass().getName() + " \\");
+   System.err.println(" -snapshot MySnapshot -copy-to hdfs://srv2:8082/hbase \\");
+   System.err.println(" -chuser MyUser -chgroup MyGroup -chmod 700 -mappers 16");
+   System.exit(1);
+ }
+
+ /**
+ * The guts of the {@link #main} method.
+ * Call this method to avoid the {@link #main(String[])} System.exit.
+ * @param args
+ * @return errCode
+ * @throws Exception
+ */
+ static int innerMain(final Configuration conf, final String [] args) throws Exception {
+ return ToolRunner.run(conf, new ExportSnapshot(), args);
+ }
+
+ /**
+  * Command line entry point: runs the export with a new HBase configuration
+  * and exits with the code returned by the tool.
+  */
+ public static void main(String[] args) throws Exception {
+   final int exitCode = innerMain(HBaseConfiguration.create(), args);
+   System.exit(exitCode);
+ }
+}
Added: hbase/branches/hbase-7290/hbase-server/src/test/java/org/apache/hadoop/hbase/snapshot/tool/TestExportSnapshot.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-7290/hbase-server/src/test/java/org/apache/hadoop/hbase/snapshot/tool/TestExportSnapshot.java?rev=1445787&view=auto
==============================================================================
--- hbase/branches/hbase-7290/hbase-server/src/test/java/org/apache/hadoop/hbase/snapshot/tool/TestExportSnapshot.java (added)
+++ hbase/branches/hbase-7290/hbase-server/src/test/java/org/apache/hadoop/hbase/snapshot/tool/TestExportSnapshot.java Wed Feb 13 18:11:38 2013
@@ -0,0 +1,254 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.snapshot.tool;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.HashSet;
+import java.util.Set;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.MediumTests;
+import org.apache.hadoop.hbase.MiniHBaseCluster;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.FSUtils;
+import org.apache.hadoop.hbase.util.Pair;
+import org.apache.hadoop.hbase.snapshot.SnapshotReferenceUtil;
+import org.apache.hadoop.mapreduce.Job;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+/**
+ * Test Export Snapshot Tool
+ */
+@Category(MediumTests.class)
+public class TestExportSnapshot {
+ private static final Log LOG = LogFactory.getLog(TestExportSnapshot.class);
+
+ private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+
+ private final static byte[] FAMILY = Bytes.toBytes("cf");
+
+ private byte[] snapshotName;
+ private byte[] tableName;
+ private HBaseAdmin admin;
+
+ /** Tune the test configuration and start the mini cluster once for the whole class. */
+ @BeforeClass
+ public static void setUpBeforeClass() throws Exception {
+ TEST_UTIL.getConfiguration().setInt("hbase.regionserver.msginterval", 100);
+ TEST_UTIL.getConfiguration().setInt("hbase.client.pause", 250);
+ TEST_UTIL.getConfiguration().setInt("hbase.client.retries.number", 6);
+ TEST_UTIL.getConfiguration().setBoolean("hbase.master.enabletable.roundrobin", true);
+ TEST_UTIL.startMiniCluster(3);
+ }
+
+ /** Shut down the mini cluster started by setUpBeforeClass(). */
+ @AfterClass
+ public static void tearDownAfterClass() throws Exception {
+ TEST_UTIL.shutdownMiniCluster();
+ }
+
+ /**
+ * Create a table and take a snapshot of the table used by the export test.
+ */
+ @Before
+ public void setUp() throws Exception {
+ this.admin = TEST_UTIL.getHBaseAdmin();
+
+ long tid = System.currentTimeMillis();
+ tableName = Bytes.toBytes("testtb-" + tid);
+ snapshotName = Bytes.toBytes("snaptb0-" + tid);
+
+ // create Table
+ HTableDescriptor htd = new HTableDescriptor(tableName);
+ htd.addFamily(new HColumnDescriptor(FAMILY));
+ admin.createTable(htd, null);
+ HTable table = new HTable(TEST_UTIL.getConfiguration(), tableName);
+ TEST_UTIL.loadTable(table, FAMILY);
+ table.close(); // only needed for loading; do not leak the client table
+
+ // take a snapshot
+ admin.disableTable(tableName);
+ admin.snapshot(snapshotName, tableName);
+ admin.enableTable(tableName);
+ }
+
+ /** Close the admin obtained in setUp(). */
+ @After
+ public void tearDown() throws Exception {
+ this.admin.close();
+ }
+
+ /**
+ * Verify the result of the getBalancedSplits() method.
+ * The result is a list of groups of files, used as input list for the "export" mappers.
+ * All the groups should have a similar amount of data.
+ *
+ * The input list is made of pairs of file path and length.
+ * The getBalancedSplits() function sorts it by length,
+ * and assigns a file to each group, going back and forth through the groups.
+ */
+ @Test
+ public void testBalanceSplit() throws Exception {
+ // Create a list of files
+ List<Pair<Path, Long>> files = new ArrayList<Pair<Path, Long>>();
+ for (long i = 0; i <= 20; i++) {
+ files.add(new Pair<Path, Long>(new Path("file-" + i), i));
+ }
+
+ // Create 5 groups (total size 210)
+ // group 0: 20, 11, 10, 1, 0 (total size: 42)
+ // group 1: 19, 12, 9, 2 (total size: 42)
+ // group 2: 18, 13, 8, 3 (total size: 42)
+ // group 3: 17, 14, 7, 4 (total size: 42)
+ // group 4: 16, 15, 6, 5 (total size: 42)
+ List<List<Path>> splits = ExportSnapshot.getBalancedSplits(files, 5);
+ assertEquals(5, splits.size());
+ assertEquals(Arrays.asList(new Path("file-20"), new Path("file-11"),
+ new Path("file-10"), new Path("file-1"), new Path("file-0")), splits.get(0));
+ assertEquals(Arrays.asList(new Path("file-19"), new Path("file-12"),
+ new Path("file-9"), new Path("file-2")), splits.get(1));
+ assertEquals(Arrays.asList(new Path("file-18"), new Path("file-13"),
+ new Path("file-8"), new Path("file-3")), splits.get(2));
+ assertEquals(Arrays.asList(new Path("file-17"), new Path("file-14"),
+ new Path("file-7"), new Path("file-4")), splits.get(3));
+ assertEquals(Arrays.asList(new Path("file-16"), new Path("file-15"),
+ new Path("file-6"), new Path("file-5")), splits.get(4));
+ }
+
+ /**
+ * Verify that the exported snapshot and the copied files match the original ones.
+ */
+ @Test
+ public void testExportFileSystemState() throws Exception {
+ Path copyDir = TEST_UTIL.getDataTestDir("export-" + System.currentTimeMillis());
+ URI hdfsUri = FileSystem.get(TEST_UTIL.getConfiguration()).getUri();
+ FileSystem fs = FileSystem.get(copyDir.toUri(), new Configuration());
+ copyDir = copyDir.makeQualified(fs);
+
+ // Export Snapshot
+ int res = ExportSnapshot.innerMain(TEST_UTIL.getConfiguration(), new String[] {
+ "-snapshot", Bytes.toString(snapshotName),
+ "-copy-to", copyDir.toString()
+ });
+ assertEquals(0, res);
+
+ // Verify File-System state
+ FileStatus[] rootFiles = fs.listStatus(copyDir);
+ assertEquals(2, rootFiles.length);
+ for (FileStatus fileStatus: rootFiles) {
+ String name = fileStatus.getPath().getName();
+ assertTrue(fileStatus.isDir());
+ assertTrue(name.equals(".snapshot") || name.equals(".archive"));
+ }
+
+ // compare the snapshot metadata and verify the hfiles
+ final FileSystem hdfs = FileSystem.get(hdfsUri, TEST_UTIL.getConfiguration());
+ final Path snapshotDir = new Path(".snapshot", Bytes.toString(snapshotName));
+ verifySnapshot(hdfs, new Path(TEST_UTIL.getDefaultRootDirPath(), snapshotDir),
+ fs, new Path(copyDir, snapshotDir));
+ verifyArchive(fs, copyDir, Bytes.toString(snapshotName));
+
+ // Remove the exported dir
+ fs.delete(copyDir, true);
+ }
+
+ /** Assert that the files under root1 on fs1 match, by relative path, those under root2 on fs2. */
+ private void verifySnapshot(final FileSystem fs1, final Path root1,
+ final FileSystem fs2, final Path root2) throws IOException {
+ assertEquals(listFiles(fs1, root1, root1), listFiles(fs2, root2, root2));
+ }
+
+ /** Verify that every file referenced by the exported snapshot exists and is not empty. */
+ private void verifyArchive(final FileSystem fs, final Path rootDir, final String snapshotName)
+ throws IOException {
+ final Path exportedSnapshot = new Path(rootDir, new Path(".snapshot", snapshotName));
+ final Path exportedArchive = new Path(rootDir, ".archive");
+ LOG.debug(listFiles(fs, exportedArchive, exportedArchive));
+ SnapshotReferenceUtil.visitReferencedFiles(fs, exportedSnapshot,
+ new SnapshotReferenceUtil.FileVisitor() {
+ public void storeFile (final String region, final String family, final String hfile)
+ throws IOException {
+ verifyNonEmptyFile(new Path(exportedArchive,
+ new Path(Bytes.toString(tableName), new Path(region, new Path(family, hfile)))));
+ }
+
+ public void recoveredEdits (final String region, final String logfile)
+ throws IOException {
+ verifyNonEmptyFile(new Path(exportedSnapshot,
+ new Path(Bytes.toString(tableName), new Path(region, logfile))));
+ }
+
+ public void logFile (final String server, final String logfile)
+ throws IOException {
+ verifyNonEmptyFile(new Path(exportedSnapshot, new Path(server, logfile)));
+ }
+
+ private void verifyNonEmptyFile(final Path path) throws IOException {
+ LOG.debug(path);
+ assertTrue(fs.exists(path));
+ assertTrue(fs.getFileStatus(path).getLen() > 0);
+ }
+ });
+ }
+
+ /** Recursively collect the files found under dir, as paths with the root prefix stripped. */
+ private Set<String> listFiles(final FileSystem fs, final Path root, final Path dir)
+ throws IOException {
+ Set<String> files = new HashSet<String>();
+ int rootPrefix = root.toString().length();
+ FileStatus[] list = FSUtils.listStatus(fs, dir);
+ if (list != null) {
+ for (FileStatus fstat: list) {
+ LOG.debug(fstat.getPath());
+ if (fstat.isDir()) {
+ files.addAll(listFiles(fs, root, fstat.getPath()));
+ } else {
+ files.add(fstat.getPath().toString().substring(rootPrefix));
+ }
+ }
+ }
+ return files;
+ }
+}
+