Posted to commits@hbase.apache.org by bu...@apache.org on 2017/08/20 21:30:06 UTC
[09/50] [abbrv] hbase git commit: HBASE-14135 Merge backup images (Vladimir Rodionov)
http://git-wip-us.apache.org/repos/asf/hbase/blob/05e6e569/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/HFileSplitterJob.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/HFileSplitterJob.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/HFileSplitterJob.java
deleted file mode 100644
index ba1b65e..0000000
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/HFileSplitterJob.java
+++ /dev/null
@@ -1,181 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.backup.mapreduce;
-
-import java.io.IOException;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.conf.Configured;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.CellUtil;
-import org.apache.hadoop.hbase.HBaseConfiguration;
-import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.hbase.KeyValue.Type;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.client.Connection;
-import org.apache.hadoop.hbase.client.ConnectionFactory;
-import org.apache.hadoop.hbase.client.RegionLocator;
-import org.apache.hadoop.hbase.client.Table;
-import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
-import org.apache.hadoop.hbase.mapreduce.HFileInputFormat;
-import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2;
-import org.apache.hadoop.hbase.mapreduce.KeyValueSortReducer;
-import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
-import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.Mapper;
-import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
-import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
-import org.apache.hadoop.util.Tool;
-import org.apache.hadoop.util.ToolRunner;
-
-/**
- * A tool to split HFiles into new region boundaries as a MapReduce job. The tool generates HFiles
- * for later bulk importing.
- */
-@InterfaceAudience.Private
-public class HFileSplitterJob extends Configured implements Tool {
- private static final Log LOG = LogFactory.getLog(HFileSplitterJob.class);
- final static String NAME = "HFileSplitterJob";
- public final static String BULK_OUTPUT_CONF_KEY = "hfile.bulk.output";
- public final static String TABLES_KEY = "hfile.input.tables";
- public final static String TABLE_MAP_KEY = "hfile.input.tablesmap";
- private final static String JOB_NAME_CONF_KEY = "mapreduce.job.name";
-
- public HFileSplitterJob() {
- }
-
- protected HFileSplitterJob(final Configuration c) {
- super(c);
- }
-
- /**
- * A mapper that just writes out cells. This one can be used together with
- * {@link KeyValueSortReducer}
- */
- static class HFileCellMapper extends
- Mapper<NullWritable, KeyValue, ImmutableBytesWritable, KeyValue> {
-
- @Override
- public void map(NullWritable key, KeyValue value, Context context) throws IOException,
- InterruptedException {
- // Convert value to KeyValue if subclass
- if (!value.getClass().equals(KeyValue.class)) {
- value =
- new KeyValue(value.getRowArray(), value.getRowOffset(), value.getRowLength(),
- value.getFamilyArray(), value.getFamilyOffset(), value.getFamilyLength(),
- value.getQualifierArray(), value.getQualifierOffset(), value.getQualifierLength(),
- value.getTimestamp(), Type.codeToType(value.getTypeByte()), value.getValueArray(),
- value.getValueOffset(), value.getValueLength());
- }
- context.write(new ImmutableBytesWritable(CellUtil.cloneRow(value)), value);
- }
-
- @Override
- public void setup(Context context) throws IOException {
- // do nothing
- }
- }
-
- /**
- * Sets up the actual job.
- * @param args The command line parameters.
- * @return The newly created job.
- * @throws IOException When setting up the job fails.
- */
- public Job createSubmittableJob(String[] args) throws IOException {
- Configuration conf = getConf();
- String inputDirs = args[0];
- String tabName = args[1];
- conf.setStrings(TABLES_KEY, tabName);
- conf.set(FileInputFormat.INPUT_DIR, inputDirs);
- Job job =
- Job.getInstance(conf,
- conf.get(JOB_NAME_CONF_KEY, NAME + "_" + EnvironmentEdgeManager.currentTime()));
- job.setJarByClass(HFileSplitterJob.class);
- job.setInputFormatClass(HFileInputFormat.class);
- job.setMapOutputKeyClass(ImmutableBytesWritable.class);
- String hfileOutPath = conf.get(BULK_OUTPUT_CONF_KEY);
- if (hfileOutPath != null) {
- LOG.debug("add incremental job :" + hfileOutPath + " from " + inputDirs);
- TableName tableName = TableName.valueOf(tabName);
- job.setMapperClass(HFileCellMapper.class);
- job.setReducerClass(KeyValueSortReducer.class);
- Path outputDir = new Path(hfileOutPath);
- FileOutputFormat.setOutputPath(job, outputDir);
- job.setMapOutputValueClass(KeyValue.class);
- try (Connection conn = ConnectionFactory.createConnection(conf);
- Table table = conn.getTable(tableName);
- RegionLocator regionLocator = conn.getRegionLocator(tableName)) {
- HFileOutputFormat2.configureIncrementalLoad(job, table.getDescriptor(), regionLocator);
- }
- LOG.debug("success configuring load incremental job");
-
- TableMapReduceUtil.addDependencyJars(job.getConfiguration(),
- org.apache.hadoop.hbase.shaded.com.google.common.base.Preconditions.class);
- } else {
- throw new IOException("No bulk output directory specified");
- }
- return job;
- }
-
- /**
- * Print usage
- * @param errorMsg Error message. Can be null.
- */
- private void usage(final String errorMsg) {
- if (errorMsg != null && errorMsg.length() > 0) {
- System.err.println("ERROR: " + errorMsg);
- }
- System.err.println("Usage: " + NAME + " [options] <HFile inputdir(s)> <table>");
- System.err.println("Read all HFile's for <table> and split them to <table> region boundaries.");
- System.err.println("<table> table to load.\n");
- System.err.println("To generate HFiles for a bulk data load, pass the option:");
- System.err.println(" -D" + BULK_OUTPUT_CONF_KEY + "=/path/for/output");
- System.err.println("Other options:");
- System.err.println(" -D " + JOB_NAME_CONF_KEY
- + "=jobName - use the specified mapreduce job name for the HFile splitter");
- System.err.println("For performance also consider the following options:\n"
- + " -Dmapreduce.map.speculative=false\n" + " -Dmapreduce.reduce.speculative=false");
- }
-
- /**
- * Main entry point.
- * @param args The command line parameters.
- * @throws Exception When running the job fails.
- */
- public static void main(String[] args) throws Exception {
- int ret = ToolRunner.run(new HFileSplitterJob(HBaseConfiguration.create()), args);
- System.exit(ret);
- }
-
- @Override
- public int run(String[] args) throws Exception {
- if (args.length < 2) {
- usage("Wrong number of arguments: " + args.length);
- System.exit(-1);
- }
- Job job = createSubmittableJob(args);
- int result = job.waitForCompletion(true) ? 0 : 1;
- return result;
- }
-}
http://git-wip-us.apache.org/repos/asf/hbase/blob/05e6e569/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceBackupMergeJob.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceBackupMergeJob.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceBackupMergeJob.java
new file mode 100644
index 0000000..00c5b83
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceBackupMergeJob.java
@@ -0,0 +1,321 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.backup.mapreduce;
+
+import static org.apache.hadoop.hbase.backup.util.BackupUtils.succeeded;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.backup.BackupInfo;
+import org.apache.hadoop.hbase.backup.BackupMergeJob;
+import org.apache.hadoop.hbase.backup.BackupRestoreConstants;
+import org.apache.hadoop.hbase.backup.HBackupFileSystem;
+import org.apache.hadoop.hbase.backup.impl.BackupManifest;
+import org.apache.hadoop.hbase.backup.impl.BackupSystemTable;
+import org.apache.hadoop.hbase.backup.util.BackupUtils;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.util.Pair;
+import org.apache.hadoop.util.Tool;
+
+/**
+ * MapReduce implementation of {@link BackupMergeJob}
+ * Must be initialized with configuration of a backup destination cluster
+ *
+ */
+
+@InterfaceAudience.Private
+public class MapReduceBackupMergeJob implements BackupMergeJob {
+ public static final Log LOG = LogFactory.getLog(MapReduceBackupMergeJob.class);
+
+ protected Tool player;
+ protected Configuration conf;
+
+ public MapReduceBackupMergeJob() {
+ }
+
+ @Override
+ public Configuration getConf() {
+ return conf;
+ }
+
+ @Override
+ public void setConf(Configuration conf) {
+ this.conf = conf;
+ }
+
+ @Override
+ public void run(String[] backupIds) throws IOException {
+ String bulkOutputConfKey;
+
+ // TODO : run player on remote cluster
+ player = new MapReduceHFileSplitterJob();
+ bulkOutputConfKey = MapReduceHFileSplitterJob.BULK_OUTPUT_CONF_KEY;
+ // Player reads all files in arbitrary directory structure and creates
+ // a Map task for each file
+ String bids = StringUtils.join(backupIds, ",");
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Merge backup images " + bids);
+ }
+
+ List<Pair<TableName, Path>> processedTableList = new ArrayList<Pair<TableName, Path>>();
+ boolean finishedTables = false;
+ Connection conn = ConnectionFactory.createConnection(getConf());
+ BackupSystemTable table = new BackupSystemTable(conn);
+ FileSystem fs = FileSystem.get(getConf());
+
+ try {
+
+ // Get exclusive lock on backup system
+ table.startBackupExclusiveOperation();
+ // Start merge operation
+ table.startMergeOperation(backupIds);
+
+ // Select most recent backup id
+ String mergedBackupId = findMostRecentBackupId(backupIds);
+
+ TableName[] tableNames = getTableNamesInBackupImages(backupIds);
+ String backupRoot = null;
+
+ BackupInfo bInfo = table.readBackupInfo(backupIds[0]);
+ backupRoot = bInfo.getBackupRootDir();
+
+ for (int i = 0; i < tableNames.length; i++) {
+
+ LOG.info("Merge backup images for " + tableNames[i]);
+
+ // Find input directories for table
+
+ Path[] dirPaths = findInputDirectories(fs, backupRoot, tableNames[i], backupIds);
+ String dirs = StringUtils.join(dirPaths, ",");
+ Path bulkOutputPath =
+ BackupUtils.getBulkOutputDir(BackupUtils.getFileNameCompatibleString(tableNames[i]),
+ getConf(), false);
+ // Delete content if exists
+ if (fs.exists(bulkOutputPath)) {
+ if (!fs.delete(bulkOutputPath, true)) {
+ LOG.warn("Can not delete: " + bulkOutputPath);
+ }
+ }
+ Configuration conf = getConf();
+ conf.set(bulkOutputConfKey, bulkOutputPath.toString());
+ String[] playerArgs = { dirs, tableNames[i].getNameAsString() };
+
+ int result = 0;
+
+ player.setConf(getConf());
+ result = player.run(playerArgs);
+ if (!succeeded(result)) {
+ throw new IOException("Can not merge backup images for " + dirs
+ + " (check Hadoop/MR and HBase logs). Player return code =" + result);
+ }
+ // Add to processed table list
+ processedTableList.add(new Pair<TableName, Path>(tableNames[i], bulkOutputPath));
+ LOG.debug("Merge Job finished:" + result);
+ }
+ List<TableName> tableList = toTableNameList(processedTableList);
+ table.updateProcessedTablesForMerge(tableList);
+ finishedTables = true;
+
+ // Move data
+ for (Pair<TableName, Path> tn : processedTableList) {
+ moveData(fs, backupRoot, tn.getSecond(), tn.getFirst(), mergedBackupId);
+ }
+
+ // Delete old data and update manifest
+ List<String> backupsToDelete = getBackupIdsToDelete(backupIds, mergedBackupId);
+ deleteBackupImages(backupsToDelete, conn, fs, backupRoot);
+ updateBackupManifest(backupRoot, mergedBackupId, backupsToDelete);
+ // Finish merge session
+ table.finishMergeOperation();
+ // Release lock
+ table.finishBackupExclusiveOperation();
+ } catch (RuntimeException e) {
+
+ throw e;
+ } catch (Exception e) {
+ LOG.error(e);
+ if (!finishedTables) {
+ // cleanup bulk directories and finish merge
+ // merge MUST be repeated (no need for repair)
+ cleanupBulkLoadDirs(fs, toPathList(processedTableList));
+ table.finishMergeOperation();
+ table.finishBackupExclusiveOperation();
+ throw new IOException("Backup merge operation failed, you should try it again", e);
+ } else {
+ // backup repair must be run
+ throw new IOException(
+ "Backup merge operation failed, run backup repair tool to restore system's integrity",
+ e);
+ }
+ } finally {
+ table.close();
+ conn.close();
+ }
+ }
+
+ protected List<Path> toPathList(List<Pair<TableName, Path>> processedTableList) {
+ ArrayList<Path> list = new ArrayList<Path>();
+ for (Pair<TableName, Path> p : processedTableList) {
+ list.add(p.getSecond());
+ }
+ return list;
+ }
+
+ protected List<TableName> toTableNameList(List<Pair<TableName, Path>> processedTableList) {
+ ArrayList<TableName> list = new ArrayList<TableName>();
+ for (Pair<TableName, Path> p : processedTableList) {
+ list.add(p.getFirst());
+ }
+ return list;
+ }
+
+ protected void cleanupBulkLoadDirs(FileSystem fs, List<Path> pathList) throws IOException {
+ for (Path path : pathList) {
+
+ if (!fs.delete(path, true)) {
+ LOG.warn("Can't delete " + path);
+ }
+ }
+ }
+
+ protected void updateBackupManifest(String backupRoot, String mergedBackupId,
+ List<String> backupsToDelete) throws IllegalArgumentException, IOException {
+
+ BackupManifest manifest =
+ HBackupFileSystem.getManifest(conf, new Path(backupRoot), mergedBackupId);
+ manifest.getBackupImage().removeAncestors(backupsToDelete);
+ // save back
+ manifest.store(conf);
+
+ }
+
+ protected void deleteBackupImages(List<String> backupIds, Connection conn, FileSystem fs,
+ String backupRoot) throws IOException {
+
+ // Delete from backup system table
+ try (BackupSystemTable table = new BackupSystemTable(conn);) {
+ for (String backupId : backupIds) {
+ table.deleteBackupInfo(backupId);
+ }
+ }
+
+ // Delete from file system
+ for (String backupId : backupIds) {
+ Path backupDirPath = HBackupFileSystem.getBackupPath(backupRoot, backupId);
+
+ if (!fs.delete(backupDirPath, true)) {
+ LOG.warn("Could not delete " + backupDirPath);
+ }
+ }
+ }
+
+ protected List<String> getBackupIdsToDelete(String[] backupIds, String mergedBackupId) {
+ List<String> list = new ArrayList<String>();
+ for (String id : backupIds) {
+ if (id.equals(mergedBackupId)) {
+ continue;
+ }
+ list.add(id);
+ }
+ return list;
+ }
+
+ protected void moveData(FileSystem fs, String backupRoot, Path bulkOutputPath, TableName tableName,
+ String mergedBackupId) throws IllegalArgumentException, IOException {
+
+ Path dest =
+ new Path(HBackupFileSystem.getTableBackupDataDir(backupRoot, mergedBackupId, tableName));
+
+ // Delete all in dest
+ if (!fs.delete(dest, true)) {
+ throw new IOException("Could not delete " + dest);
+ }
+
+ FileStatus[] fsts = fs.listStatus(bulkOutputPath);
+ for (FileStatus fst : fsts) {
+ if (fst.isDirectory()) {
+ fs.rename(fst.getPath().getParent(), dest);
+ }
+ }
+
+ }
+
+ protected String findMostRecentBackupId(String[] backupIds) {
+ long recentTimestamp = Long.MIN_VALUE;
+ for (String backupId : backupIds) {
+ long ts = Long.parseLong(backupId.split("_")[1]);
+ if (ts > recentTimestamp) {
+ recentTimestamp = ts;
+ }
+ }
+ return BackupRestoreConstants.BACKUPID_PREFIX + recentTimestamp;
+ }
+
+ protected TableName[] getTableNamesInBackupImages(String[] backupIds) throws IOException {
+
+ Set<TableName> allSet = new HashSet<TableName>();
+
+ try (Connection conn = ConnectionFactory.createConnection(conf);
+ BackupSystemTable table = new BackupSystemTable(conn);) {
+ for (String backupId : backupIds) {
+ BackupInfo bInfo = table.readBackupInfo(backupId);
+
+ allSet.addAll(bInfo.getTableNames());
+ }
+ }
+
+ TableName[] ret = new TableName[allSet.size()];
+ return allSet.toArray(ret);
+ }
+
+ protected Path[] findInputDirectories(FileSystem fs, String backupRoot, TableName tableName,
+ String[] backupIds) throws IOException {
+
+ List<Path> dirs = new ArrayList<Path>();
+
+ for (String backupId : backupIds) {
+ Path fileBackupDirPath =
+ new Path(HBackupFileSystem.getTableBackupDataDir(backupRoot, backupId, tableName));
+ if (fs.exists(fileBackupDirPath)) {
+ dirs.add(fileBackupDirPath);
+ } else {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("File: " + fileBackupDirPath + " does not exist.");
+ }
+ }
+ }
+ Path[] ret = new Path[dirs.size()];
+ return dirs.toArray(ret);
+ }
+
+}
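
For context, the merge implemented above is normally driven through the backup admin API rather than by instantiating the job directly. Below is a minimal sketch, not part of this patch, of the BackupAdminImpl.mergeBackups() entry point that the test later in this commit exercises; the backup ids are placeholders following the backup_<timestamp> pattern that findMostRecentBackupId() parses.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.backup.BackupAdmin;
import org.apache.hadoop.hbase.backup.impl.BackupAdminImpl;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;

public class MergeBackupsExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    try (Connection conn = ConnectionFactory.createConnection(conf);
        BackupAdmin admin = new BackupAdminImpl(conn)) {
      // Placeholder ids; real ids follow the backup_<timestamp> pattern.
      String[] backupIds = { "backup_1503262206000", "backup_1503262300000" };
      // Delegates to the configured BackupMergeJob implementation
      // (MapReduceBackupMergeJob by default).
      admin.mergeBackups(backupIds);
    }
  }
}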
http://git-wip-us.apache.org/repos/asf/hbase/blob/05e6e569/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceHFileSplitterJob.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceHFileSplitterJob.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceHFileSplitterJob.java
new file mode 100644
index 0000000..49e8c75
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceHFileSplitterJob.java
@@ -0,0 +1,181 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.backup.mapreduce;
+
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.KeyValue.Type;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.RegionLocator;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.mapreduce.HFileInputFormat;
+import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2;
+import org.apache.hadoop.hbase.mapreduce.KeyValueSortReducer;
+import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+
+/**
+ * A tool to split HFiles into new region boundaries as a MapReduce job. The tool generates HFiles
+ * for later bulk importing.
+ */
+@InterfaceAudience.Private
+public class MapReduceHFileSplitterJob extends Configured implements Tool {
+ private static final Log LOG = LogFactory.getLog(MapReduceHFileSplitterJob.class);
+ final static String NAME = "HFileSplitterJob";
+ public final static String BULK_OUTPUT_CONF_KEY = "hfile.bulk.output";
+ public final static String TABLES_KEY = "hfile.input.tables";
+ public final static String TABLE_MAP_KEY = "hfile.input.tablesmap";
+ private final static String JOB_NAME_CONF_KEY = "mapreduce.job.name";
+
+ public MapReduceHFileSplitterJob() {
+ }
+
+ protected MapReduceHFileSplitterJob(final Configuration c) {
+ super(c);
+ }
+
+ /**
+ * A mapper that just writes out cells. This one can be used together with
+ * {@link KeyValueSortReducer}
+ */
+ static class HFileCellMapper extends
+ Mapper<NullWritable, KeyValue, ImmutableBytesWritable, KeyValue> {
+
+ @Override
+ public void map(NullWritable key, KeyValue value, Context context) throws IOException,
+ InterruptedException {
+ // Convert value to KeyValue if subclass
+ if (!value.getClass().equals(KeyValue.class)) {
+ value =
+ new KeyValue(value.getRowArray(), value.getRowOffset(), value.getRowLength(),
+ value.getFamilyArray(), value.getFamilyOffset(), value.getFamilyLength(),
+ value.getQualifierArray(), value.getQualifierOffset(), value.getQualifierLength(),
+ value.getTimestamp(), Type.codeToType(value.getTypeByte()), value.getValueArray(),
+ value.getValueOffset(), value.getValueLength());
+ }
+ context.write(new ImmutableBytesWritable(CellUtil.cloneRow(value)), value);
+ }
+
+ @Override
+ public void setup(Context context) throws IOException {
+ // do nothing
+ }
+ }
+
+ /**
+ * Sets up the actual job.
+ * @param args The command line parameters.
+ * @return The newly created job.
+ * @throws IOException When setting up the job fails.
+ */
+ public Job createSubmittableJob(String[] args) throws IOException {
+ Configuration conf = getConf();
+ String inputDirs = args[0];
+ String tabName = args[1];
+ conf.setStrings(TABLES_KEY, tabName);
+ conf.set(FileInputFormat.INPUT_DIR, inputDirs);
+ Job job =
+ Job.getInstance(conf,
+ conf.get(JOB_NAME_CONF_KEY, NAME + "_" + EnvironmentEdgeManager.currentTime()));
+ job.setJarByClass(MapReduceHFileSplitterJob.class);
+ job.setInputFormatClass(HFileInputFormat.class);
+ job.setMapOutputKeyClass(ImmutableBytesWritable.class);
+ String hfileOutPath = conf.get(BULK_OUTPUT_CONF_KEY);
+ if (hfileOutPath != null) {
+ LOG.debug("add incremental job :" + hfileOutPath + " from " + inputDirs);
+ TableName tableName = TableName.valueOf(tabName);
+ job.setMapperClass(HFileCellMapper.class);
+ job.setReducerClass(KeyValueSortReducer.class);
+ Path outputDir = new Path(hfileOutPath);
+ FileOutputFormat.setOutputPath(job, outputDir);
+ job.setMapOutputValueClass(KeyValue.class);
+ try (Connection conn = ConnectionFactory.createConnection(conf);
+ Table table = conn.getTable(tableName);
+ RegionLocator regionLocator = conn.getRegionLocator(tableName)) {
+ HFileOutputFormat2.configureIncrementalLoad(job, table.getDescriptor(), regionLocator);
+ }
+ LOG.debug("success configuring load incremental job");
+
+ TableMapReduceUtil.addDependencyJars(job.getConfiguration(),
+ org.apache.hadoop.hbase.shaded.com.google.common.base.Preconditions.class);
+ } else {
+ throw new IOException("No bulk output directory specified");
+ }
+ return job;
+ }
+
+ /**
+ * Print usage
+ * @param errorMsg Error message. Can be null.
+ */
+ private void usage(final String errorMsg) {
+ if (errorMsg != null && errorMsg.length() > 0) {
+ System.err.println("ERROR: " + errorMsg);
+ }
+ System.err.println("Usage: " + NAME + " [options] <HFile inputdir(s)> <table>");
+ System.err.println("Read all HFile's for <table> and split them to <table> region boundaries.");
+ System.err.println("<table> table to load.\n");
+ System.err.println("To generate HFiles for a bulk data load, pass the option:");
+ System.err.println(" -D" + BULK_OUTPUT_CONF_KEY + "=/path/for/output");
+ System.err.println("Other options:");
+ System.err.println(" -D " + JOB_NAME_CONF_KEY
+ + "=jobName - use the specified mapreduce job name for the HFile splitter");
+ System.err.println("For performance also consider the following options:\n"
+ + " -Dmapreduce.map.speculative=false\n" + " -Dmapreduce.reduce.speculative=false");
+ }
+
+ /**
+ * Main entry point.
+ * @param args The command line parameters.
+ * @throws Exception When running the job fails.
+ */
+ public static void main(String[] args) throws Exception {
+ int ret = ToolRunner.run(new MapReduceHFileSplitterJob(HBaseConfiguration.create()), args);
+ System.exit(ret);
+ }
+
+ @Override
+ public int run(String[] args) throws Exception {
+ if (args.length < 2) {
+ usage("Wrong number of arguments: " + args.length);
+ System.exit(-1);
+ }
+ Job job = createSubmittableJob(args);
+ int result = job.waitForCompletion(true) ? 0 : 1;
+ return result;
+ }
+}
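
The splitter is a standard Hadoop Tool, so besides the command-line usage printed by usage() above it can be launched programmatically. A minimal sketch, not part of this patch, with placeholder input/output paths and table name; the bulk output directory is passed through the public BULK_OUTPUT_CONF_KEY just as -D hfile.bulk.output would do:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.backup.mapreduce.MapReduceHFileSplitterJob;
import org.apache.hadoop.util.ToolRunner;

public class HFileSplitterExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    // Same effect as passing -D hfile.bulk.output=... on the command line.
    conf.set(MapReduceHFileSplitterJob.BULK_OUTPUT_CONF_KEY, "/tmp/hfile-splitter-out");
    // First argument: comma-separated HFile input dir(s); second: target table.
    int rc = ToolRunner.run(conf, new MapReduceHFileSplitterJob(),
        new String[] { "/backup/tmp/hfiles", "default:usertable" });
    System.exit(rc);
  }
}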
http://git-wip-us.apache.org/repos/asf/hbase/blob/05e6e569/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceRestoreJob.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceRestoreJob.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceRestoreJob.java
index 4161ca9..1209e7c 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceRestoreJob.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceRestoreJob.java
@@ -17,31 +17,31 @@
*/
package org.apache.hadoop.hbase.backup.mapreduce;
+import static org.apache.hadoop.hbase.backup.util.BackupUtils.failed;
+import static org.apache.hadoop.hbase.backup.util.BackupUtils.succeeded;
+
import java.io.IOException;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.backup.BackupRestoreConstants;
import org.apache.hadoop.hbase.backup.RestoreJob;
+import org.apache.hadoop.hbase.backup.util.BackupUtils;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;
-import org.apache.hadoop.hbase.mapreduce.WALPlayer;
-import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.util.Tool;
+
/**
* MapReduce implementation of {@link RestoreJob}
*
- * For full backup restore, it runs {@link HFileSplitterJob} job and creates
+ * For backup restore, it runs {@link MapReduceHFileSplitterJob} job and creates
* HFiles which are aligned with a region boundaries of a table being
- * restored, for incremental backup restore it runs {@link WALPlayer} in
- * bulk load mode (creates HFiles from WAL edits).
+ * restored.
*
* The resulting HFiles then are loaded using HBase bulk load tool
* {@link LoadIncrementalHFiles}
@@ -62,8 +62,8 @@ public class MapReduceRestoreJob implements RestoreJob {
String bulkOutputConfKey;
- player = new HFileSplitterJob();
- bulkOutputConfKey = HFileSplitterJob.BULK_OUTPUT_CONF_KEY;
+ player = new MapReduceHFileSplitterJob();
+ bulkOutputConfKey = MapReduceHFileSplitterJob.BULK_OUTPUT_CONF_KEY;
// Player reads all files in arbitrary directory structure and creates
// a Map task for each file
String dirs = StringUtils.join(dirPaths, ",");
@@ -71,8 +71,8 @@ public class MapReduceRestoreJob implements RestoreJob {
if (LOG.isDebugEnabled()) {
LOG.debug("Restore " + (fullBackupRestore ? "full" : "incremental")
+ " backup from directory " + dirs + " from hbase tables "
- + StringUtils.join(tableNames, BackupRestoreConstants.TABLENAME_DELIMITER_IN_COMMAND) +
- " to tables "
+ + StringUtils.join(tableNames, BackupRestoreConstants.TABLENAME_DELIMITER_IN_COMMAND)
+ + " to tables "
+ StringUtils.join(newTableNames, BackupRestoreConstants.TABLENAME_DELIMITER_IN_COMMAND));
}
@@ -80,13 +80,16 @@ public class MapReduceRestoreJob implements RestoreJob {
LOG.info("Restore " + tableNames[i] + " into " + newTableNames[i]);
- Path bulkOutputPath = getBulkOutputDir(getFileNameCompatibleString(newTableNames[i]));
+ Path bulkOutputPath =
+ BackupUtils.getBulkOutputDir(BackupUtils.getFileNameCompatibleString(newTableNames[i]),
+ getConf());
Configuration conf = getConf();
conf.set(bulkOutputConfKey, bulkOutputPath.toString());
String[] playerArgs =
- { dirs,
- fullBackupRestore? newTableNames[i].getNameAsString():tableNames[i].getNameAsString()
- };
+ {
+ dirs,
+ fullBackupRestore ? newTableNames[i].getNameAsString() : tableNames[i]
+ .getNameAsString() };
int result = 0;
int loaderResult = 0;
@@ -96,7 +99,7 @@ public class MapReduceRestoreJob implements RestoreJob {
result = player.run(playerArgs);
if (succeeded(result)) {
// do bulk load
- LoadIncrementalHFiles loader = createLoader(getConf());
+ LoadIncrementalHFiles loader = BackupUtils.createLoader(getConf());
if (LOG.isDebugEnabled()) {
LOG.debug("Restoring HFiles from directory " + bulkOutputPath);
}
@@ -113,58 +116,11 @@ public class MapReduceRestoreJob implements RestoreJob {
}
LOG.debug("Restore Job finished:" + result);
} catch (Exception e) {
+ LOG.error(e);
throw new IOException("Can not restore from backup directory " + dirs
+ " (check Hadoop and HBase logs) ", e);
}
-
- }
- }
-
- private String getFileNameCompatibleString(TableName table) {
- return table.getNamespaceAsString() + "-" + table.getQualifierAsString();
- }
-
- private boolean failed(int result) {
- return result != 0;
- }
-
- private boolean succeeded(int result) {
- return result == 0;
- }
-
- public static LoadIncrementalHFiles createLoader(Configuration config) throws IOException {
- // set configuration for restore:
- // LoadIncrementalHFile needs more time
- // <name>hbase.rpc.timeout</name> <value>600000</value>
- // calculates
- Integer milliSecInHour = 3600000;
- Configuration conf = new Configuration(config);
- conf.setInt(HConstants.HBASE_RPC_TIMEOUT_KEY, milliSecInHour);
-
- // By default, it is 32 and loader will fail if # of files in any region exceed this
- // limit. Bad for snapshot restore.
- conf.setInt(LoadIncrementalHFiles.MAX_FILES_PER_REGION_PER_FAMILY, Integer.MAX_VALUE);
- conf.set(LoadIncrementalHFiles.IGNORE_UNMATCHED_CF_CONF_KEY, "yes");
- LoadIncrementalHFiles loader = null;
- try {
- loader = new LoadIncrementalHFiles(conf);
- } catch (Exception e) {
- throw new IOException(e);
}
- return loader;
- }
-
- private Path getBulkOutputDir(String tableName) throws IOException {
- Configuration conf = getConf();
- FileSystem fs = FileSystem.get(conf);
- String tmp =
- conf.get(HConstants.TEMPORARY_FS_DIRECTORY_KEY,
- HConstants.DEFAULT_TEMPORARY_HDFS_DIRECTORY);
- Path path =
- new Path(tmp + Path.SEPARATOR + "bulk_output-" + tableName + "-"
- + EnvironmentEdgeManager.currentTime());
- fs.deleteOnExit(path);
- return path;
}
@Override
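
For context, the restore path above is reached through BackupAdmin.restore() with a request built by BackupUtils.createRestoreRequest(), whose builder-based form appears in the BackupUtils diff below. A minimal sketch, not part of this patch, with placeholder backup root, backup id, and table names:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.backup.BackupAdmin;
import org.apache.hadoop.hbase.backup.impl.BackupAdminImpl;
import org.apache.hadoop.hbase.backup.util.BackupUtils;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;

public class RestoreRequestExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    try (Connection conn = ConnectionFactory.createConnection(conf);
        BackupAdmin admin = new BackupAdminImpl(conn)) {
      TableName[] fromTables = { TableName.valueOf("ns1:usertable") };
      TableName[] toTables = { TableName.valueOf("ns1:usertable_restore") };
      // check=false performs the restore; isOverwrite=true replaces existing table data.
      admin.restore(BackupUtils.createRestoreRequest(
          "hdfs://namenode/backup", "backup_1503262206000", false,
          fromTables, toTables, true));
    }
  }
}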
http://git-wip-us.apache.org/repos/asf/hbase/blob/05e6e569/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/util/BackupUtils.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/util/BackupUtils.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/util/BackupUtils.java
index e32853d..ce77645 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/util/BackupUtils.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/util/BackupUtils.java
@@ -56,7 +56,9 @@ import org.apache.hadoop.hbase.backup.impl.BackupManifest;
import org.apache.hadoop.hbase.backup.impl.BackupManifest.BackupImage;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;
import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.FSTableDescriptors;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
@@ -68,14 +70,15 @@ import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
public final class BackupUtils {
protected static final Log LOG = LogFactory.getLog(BackupUtils.class);
public static final String LOGNAME_SEPARATOR = ".";
+ public static final int MILLISEC_IN_HOUR = 3600000;
private BackupUtils() {
throw new AssertionError("Instantiating utility class...");
}
/**
- * Loop through the RS log timestamp map for the tables, for each RS, find the min timestamp
- * value for the RS among the tables.
+ * Loop through the RS log timestamp map for the tables, for each RS, find the min timestamp value
+ * for the RS among the tables.
* @param rsLogTimestampMap timestamp map
* @return the min timestamp of each RS
*/
@@ -114,16 +117,17 @@ public final class BackupUtils {
}
/**
- * copy out Table RegionInfo into incremental backup image need to consider move this
- * logic into HBackupFileSystem
+ * copy out Table RegionInfo into incremental backup image need to consider move this logic into
+ * HBackupFileSystem
* @param conn connection
* @param backupInfo backup info
* @param conf configuration
* @throws IOException exception
* @throws InterruptedException exception
*/
- public static void copyTableRegionInfo(Connection conn, BackupInfo backupInfo,
- Configuration conf) throws IOException, InterruptedException {
+ public static void
+ copyTableRegionInfo(Connection conn, BackupInfo backupInfo, Configuration conf)
+ throws IOException, InterruptedException {
Path rootDir = FSUtils.getRootDir(conf);
FileSystem fs = rootDir.getFileSystem(conf);
@@ -152,10 +156,8 @@ public final class BackupUtils {
LOG.debug("Starting to write region info for table " + table);
for (HRegionInfo regionInfo : regions) {
Path regionDir =
- HRegion.getRegionDir(new Path(backupInfo.getTableBackupDir(table)),
- regionInfo);
- regionDir =
- new Path(backupInfo.getTableBackupDir(table), regionDir.getName());
+ HRegion.getRegionDir(new Path(backupInfo.getTableBackupDir(table)), regionInfo);
+ regionDir = new Path(backupInfo.getTableBackupDir(table), regionDir.getName());
writeRegioninfoOnFilesystem(conf, targetFs, regionDir, regionInfo);
}
LOG.debug("Finished writing region info for table " + table);
@@ -301,7 +303,6 @@ public final class BackupUtils {
return ret;
}
-
/**
* Check whether the backup path exist
* @param backupStr backup
@@ -431,8 +432,7 @@ public final class BackupUtils {
* @param conf configuration
* @throws IOException exception
*/
- private static void cleanupHLogDir(BackupInfo backupInfo, Configuration conf)
- throws IOException {
+ private static void cleanupHLogDir(BackupInfo backupInfo, Configuration conf) throws IOException {
String logDir = backupInfo.getHLogTargetDir();
if (logDir == null) {
@@ -452,7 +452,6 @@ public final class BackupUtils {
}
}
-
private static void cleanupTargetDir(BackupInfo backupInfo, Configuration conf) {
try {
// clean up the data at target directory
@@ -498,8 +497,8 @@ public final class BackupUtils {
* @param tableName table name
* @return backupPath String for the particular table
*/
- public static String getTableBackupDir(String backupRootDir, String backupId,
- TableName tableName) {
+ public static String
+ getTableBackupDir(String backupRootDir, String backupId, TableName tableName) {
return backupRootDir + Path.SEPARATOR + backupId + Path.SEPARATOR
+ tableName.getNamespaceAsString() + Path.SEPARATOR + tableName.getQualifierAsString()
+ Path.SEPARATOR;
@@ -523,7 +522,6 @@ public final class BackupUtils {
return list;
}
-
/**
* Calls fs.listStatus() and treats FileNotFoundException as non-fatal This accommodates
* differences between hadoop versions, where hadoop 1 does not throw a FileNotFoundException, and
@@ -655,19 +653,16 @@ public final class BackupUtils {
* @param backupId backup id
* @param check check only
* @param fromTables table list from
- * @param toTables table list to
+ * @param toTables table list to
* @param isOverwrite overwrite data
   * @return request object
*/
public static RestoreRequest createRestoreRequest(String backupRootDir, String backupId,
boolean check, TableName[] fromTables, TableName[] toTables, boolean isOverwrite) {
RestoreRequest.Builder builder = new RestoreRequest.Builder();
- RestoreRequest request = builder.withBackupRootDir(backupRootDir)
- .withBackupId(backupId)
- .withCheck(check)
- .withFromTables(fromTables)
- .withToTables(toTables)
- .withOvewrite(isOverwrite).build();
+ RestoreRequest request =
+ builder.withBackupRootDir(backupRootDir).withBackupId(backupId).withCheck(check)
+ .withFromTables(fromTables).withToTables(toTables).withOvewrite(isOverwrite).build();
return request;
}
@@ -699,4 +694,54 @@ public final class BackupUtils {
return isValid;
}
+ public static Path getBulkOutputDir(String tableName, Configuration conf, boolean deleteOnExit)
+ throws IOException {
+ FileSystem fs = FileSystem.get(conf);
+ String tmp =
+ conf.get(HConstants.TEMPORARY_FS_DIRECTORY_KEY, HConstants.DEFAULT_TEMPORARY_HDFS_DIRECTORY);
+ Path path =
+ new Path(tmp + Path.SEPARATOR + "bulk_output-" + tableName + "-"
+ + EnvironmentEdgeManager.currentTime());
+ if (deleteOnExit) {
+ fs.deleteOnExit(path);
+ }
+ return path;
+ }
+
+ public static Path getBulkOutputDir(String tableName, Configuration conf) throws IOException {
+ return getBulkOutputDir(tableName, conf, true);
+ }
+
+ public static String getFileNameCompatibleString(TableName table) {
+ return table.getNamespaceAsString() + "-" + table.getQualifierAsString();
+ }
+
+ public static boolean failed(int result) {
+ return result != 0;
+ }
+
+ public static boolean succeeded(int result) {
+ return result == 0;
+ }
+
+ public static LoadIncrementalHFiles createLoader(Configuration config) throws IOException {
+ // set configuration for restore:
+ // LoadIncrementalHFile needs more time
+ // <name>hbase.rpc.timeout</name> <value>600000</value>
+ // calculates
+ Configuration conf = new Configuration(config);
+ conf.setInt(HConstants.HBASE_RPC_TIMEOUT_KEY, MILLISEC_IN_HOUR);
+
+ // By default, it is 32 and loader will fail if # of files in any region exceed this
+ // limit. Bad for snapshot restore.
+ conf.setInt(LoadIncrementalHFiles.MAX_FILES_PER_REGION_PER_FAMILY, Integer.MAX_VALUE);
+ conf.set(LoadIncrementalHFiles.IGNORE_UNMATCHED_CF_CONF_KEY, "yes");
+ LoadIncrementalHFiles loader = null;
+ try {
+ loader = new LoadIncrementalHFiles(conf);
+ } catch (Exception e) {
+ throw new IOException(e);
+ }
+ return loader;
+ }
}
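
The helpers added above are what MapReduceRestoreJob now delegates to: getBulkOutputDir() picks a temporary HFile output directory and createLoader() returns a LoadIncrementalHFiles instance with a one-hour RPC timeout and no per-family file limit. A minimal sketch, not part of this patch, of loading a bulk output directory produced by the splitter; the table name and directory contents are placeholders.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.backup.util.BackupUtils;
import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;

public class BulkOutputLoadExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    TableName table = TableName.valueOf("default:usertable");
    // Temporary directory of the form <tmp>/bulk_output-<ns>-<table>-<timestamp>,
    // registered for delete-on-exit by default.
    Path bulkOut =
        BackupUtils.getBulkOutputDir(BackupUtils.getFileNameCompatibleString(table), conf);
    // Loader pre-configured with the longer RPC timeout and relaxed file-count limit.
    LoadIncrementalHFiles loader = BackupUtils.createLoader(conf);
    int rc = loader.run(new String[] { bulkOut.toString(), table.getNameAsString() });
    System.out.println("Bulk load " + (BackupUtils.succeeded(rc) ? "succeeded" : "failed"));
  }
}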
http://git-wip-us.apache.org/repos/asf/hbase/blob/05e6e569/hbase-server/src/test/java/org/apache/hadoop/hbase/backup/TestIncrementalBackupMergeWithFailures.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/backup/TestIncrementalBackupMergeWithFailures.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/backup/TestIncrementalBackupMergeWithFailures.java
new file mode 100644
index 0000000..7011ed3
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/backup/TestIncrementalBackupMergeWithFailures.java
@@ -0,0 +1,336 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.backup;
+
+import static org.apache.hadoop.hbase.backup.util.BackupUtils.succeeded;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.backup.impl.BackupAdminImpl;
+import org.apache.hadoop.hbase.backup.impl.BackupSystemTable;
+import org.apache.hadoop.hbase.backup.mapreduce.MapReduceBackupMergeJob;
+import org.apache.hadoop.hbase.backup.mapreduce.MapReduceHFileSplitterJob;
+import org.apache.hadoop.hbase.backup.util.BackupUtils;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.testclassification.LargeTests;
+import org.apache.hadoop.hbase.util.Pair;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import com.google.common.collect.Lists;
+
+@Category(LargeTests.class)
+public class TestIncrementalBackupMergeWithFailures extends TestBackupBase {
+ private static final Log LOG = LogFactory.getLog(TestIncrementalBackupMergeWithFailures.class);
+
+ static enum FailurePhase {
+ PHASE1, PHASE2, PHASE3, PHASE4
+ }
+ public final static String FAILURE_PHASE_KEY = "failurePhase";
+
+ static class BackupMergeJobWithFailures extends MapReduceBackupMergeJob {
+
+ FailurePhase failurePhase;
+
+ @Override
+ public void setConf(Configuration conf) {
+ super.setConf(conf);
+ String val = conf.get(FAILURE_PHASE_KEY);
+ if (val != null) {
+ failurePhase = FailurePhase.valueOf(val);
+ } else {
+ Assert.fail("Failure phase is not set");
+ }
+ }
+
+
+ /**
+ * This is the exact copy of parent's run() with injections
+ * of different types of failures
+ */
+ @Override
+ public void run(String[] backupIds) throws IOException {
+ String bulkOutputConfKey;
+
+ // TODO : run player on remote cluster
+ player = new MapReduceHFileSplitterJob();
+ bulkOutputConfKey = MapReduceHFileSplitterJob.BULK_OUTPUT_CONF_KEY;
+ // Player reads all files in arbitrary directory structure and creates
+ // a Map task for each file
+ String bids = StringUtils.join(backupIds, ",");
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Merge backup images " + bids);
+ }
+
+ List<Pair<TableName, Path>> processedTableList = new ArrayList<Pair<TableName, Path>>();
+ boolean finishedTables = false;
+ Connection conn = ConnectionFactory.createConnection(getConf());
+ BackupSystemTable table = new BackupSystemTable(conn);
+ FileSystem fs = FileSystem.get(getConf());
+
+ try {
+
+ // Start backup exclusive operation
+ table.startBackupExclusiveOperation();
+ // Start merge operation
+ table.startMergeOperation(backupIds);
+
+ // Select most recent backup id
+ String mergedBackupId = findMostRecentBackupId(backupIds);
+
+ TableName[] tableNames = getTableNamesInBackupImages(backupIds);
+ String backupRoot = null;
+
+ BackupInfo bInfo = table.readBackupInfo(backupIds[0]);
+ backupRoot = bInfo.getBackupRootDir();
+ // PHASE 1
+ checkFailure(FailurePhase.PHASE1);
+
+ for (int i = 0; i < tableNames.length; i++) {
+
+ LOG.info("Merge backup images for " + tableNames[i]);
+
+ // Find input directories for table
+
+ Path[] dirPaths = findInputDirectories(fs, backupRoot, tableNames[i], backupIds);
+ String dirs = StringUtils.join(dirPaths, ",");
+ Path bulkOutputPath =
+ BackupUtils.getBulkOutputDir(BackupUtils.getFileNameCompatibleString(tableNames[i]),
+ getConf(), false);
+ // Delete content if exists
+ if (fs.exists(bulkOutputPath)) {
+ if (!fs.delete(bulkOutputPath, true)) {
+ LOG.warn("Can not delete: " + bulkOutputPath);
+ }
+ }
+ Configuration conf = getConf();
+ conf.set(bulkOutputConfKey, bulkOutputPath.toString());
+ String[] playerArgs = { dirs, tableNames[i].getNameAsString() };
+
+ int result = 0;
+ // PHASE 2
+ checkFailure(FailurePhase.PHASE2);
+ player.setConf(getConf());
+ result = player.run(playerArgs);
+ if (succeeded(result)) {
+ // Add to processed table list
+ processedTableList.add(new Pair<TableName, Path>(tableNames[i], bulkOutputPath));
+ } else {
+ throw new IOException("Can not merge backup images for " + dirs
+ + " (check Hadoop/MR and HBase logs). Player return code =" + result);
+ }
+ LOG.debug("Merge Job finished:" + result);
+ }
+ List<TableName> tableList = toTableNameList(processedTableList);
+ // PHASE 3
+ checkFailure(FailurePhase.PHASE3);
+ table.updateProcessedTablesForMerge(tableList);
+ finishedTables = true;
+
+ // Move data
+ for (Pair<TableName, Path> tn : processedTableList) {
+ moveData(fs, backupRoot, tn.getSecond(), tn.getFirst(), mergedBackupId);
+ }
+ // PHASE 4
+ checkFailure(FailurePhase.PHASE4);
+ // Delete old data and update manifest
+ List<String> backupsToDelete = getBackupIdsToDelete(backupIds, mergedBackupId);
+ deleteBackupImages(backupsToDelete, conn, fs, backupRoot);
+ updateBackupManifest(backupRoot, mergedBackupId, backupsToDelete);
+ // Finish merge session
+ table.finishMergeOperation();
+ } catch (RuntimeException e) {
+ throw e;
+ } catch (Exception e) {
+ LOG.error(e);
+ if (!finishedTables) {
+ // cleanup bulk directories and finish merge
+ // merge MUST be repeated (no need for repair)
+ cleanupBulkLoadDirs(fs, toPathList(processedTableList));
+ table.finishMergeOperation();
+ table.finishBackupExclusiveOperation();
+ throw new IOException("Backup merge operation failed, you should try it again", e);
+ } else {
+ // backup repair must be run
+ throw new IOException(
+ "Backup merge operation failed, run backup repair tool to restore system's integrity",
+ e);
+ }
+ } finally {
+ table.close();
+ conn.close();
+ }
+
+ }
+
+ private void checkFailure(FailurePhase phase) throws IOException {
+ if ( failurePhase != null && failurePhase == phase) {
+ throw new IOException (phase.toString());
+ }
+ }
+
+ }
+
+
+ @Test
+ public void TestIncBackupMergeRestore() throws Exception {
+
+ int ADD_ROWS = 99;
+ // #1 - create full backup for all tables
+ LOG.info("create full backup image for all tables");
+
+ List<TableName> tables = Lists.newArrayList(table1, table2);
+ // Set custom Merge Job implementation
+ conf1.setClass(BackupRestoreFactory.HBASE_BACKUP_MERGE_IMPL_CLASS,
+ BackupMergeJobWithFailures.class, BackupMergeJob.class);
+
+ Connection conn = ConnectionFactory.createConnection(conf1);
+
+ HBaseAdmin admin = null;
+ admin = (HBaseAdmin) conn.getAdmin();
+ BackupAdminImpl client = new BackupAdminImpl(conn);
+
+ BackupRequest request = createBackupRequest(BackupType.FULL, tables, BACKUP_ROOT_DIR);
+ String backupIdFull = client.backupTables(request);
+
+ assertTrue(checkSucceeded(backupIdFull));
+
+ // #2 - insert some data to table1
+ HTable t1 = insertIntoTable(conn, table1, famName, 1, ADD_ROWS);
+ LOG.debug("writing " + ADD_ROWS + " rows to " + table1);
+
+ Assert.assertEquals(TEST_UTIL.countRows(t1), NB_ROWS_IN_BATCH + ADD_ROWS);
+ t1.close();
+ LOG.debug("written " + ADD_ROWS + " rows to " + table1);
+
+ HTable t2 = insertIntoTable(conn, table2, famName, 1, ADD_ROWS);
+
+ Assert.assertEquals(TEST_UTIL.countRows(t2), NB_ROWS_IN_BATCH + ADD_ROWS);
+ t2.close();
+ LOG.debug("written " + ADD_ROWS + " rows to " + table2);
+
+ // #3 - incremental backup for multiple tables
+ tables = Lists.newArrayList(table1, table2);
+ request = createBackupRequest(BackupType.INCREMENTAL, tables, BACKUP_ROOT_DIR);
+ String backupIdIncMultiple = client.backupTables(request);
+
+ assertTrue(checkSucceeded(backupIdIncMultiple));
+
+ t1 = insertIntoTable(conn, table1, famName, 2, ADD_ROWS);
+ t1.close();
+
+ t2 = insertIntoTable(conn, table2, famName, 2, ADD_ROWS);
+ t2.close();
+
+ // #3 - incremental backup for multiple tables
+ request = createBackupRequest(BackupType.INCREMENTAL, tables, BACKUP_ROOT_DIR);
+ String backupIdIncMultiple2 = client.backupTables(request);
+ assertTrue(checkSucceeded(backupIdIncMultiple2));
+
+ // #4 Merge backup images with failures
+
+ for ( FailurePhase phase : FailurePhase.values()) {
+ Configuration conf = conn.getConfiguration();
+
+ conf.set(FAILURE_PHASE_KEY, phase.toString());
+
+ try (BackupAdmin bAdmin = new BackupAdminImpl(conn);)
+ {
+ String[] backups = new String[] { backupIdIncMultiple, backupIdIncMultiple2 };
+ bAdmin.mergeBackups(backups);
+ Assert.fail("Expected IOException");
+ } catch (IOException e) {
+ BackupSystemTable table = new BackupSystemTable(conn);
+ if(phase.ordinal() < FailurePhase.PHASE4.ordinal()) {
+ // No need to repair:
+ // Both Merge and backup exclusive operations are finished
+ assertFalse(table.isMergeInProgress());
+ try {
+ table.finishBackupExclusiveOperation();
+ Assert.fail("IOException is expected");
+ } catch(IOException ee) {
+ // Expected
+ }
+ } else {
+ // Repair is required
+ assertTrue(table.isMergeInProgress());
+ try {
+ table.startBackupExclusiveOperation();
+ Assert.fail("IOException is expected");
+ } catch(IOException ee) {
+ // Expected - clean up before proceeding
+ table.finishMergeOperation();
+ table.finishBackupExclusiveOperation();
+ }
+ }
+ table.close();
+ LOG.debug("Expected :"+ e.getMessage());
+ }
+ }
+
+ // Now merge w/o failures
+ Configuration conf = conn.getConfiguration();
+ conf.unset(FAILURE_PHASE_KEY);
+ conf.unset(BackupRestoreFactory.HBASE_BACKUP_MERGE_IMPL_CLASS);
+
+ try (BackupAdmin bAdmin = new BackupAdminImpl(conn);) {
+ String[] backups = new String[] { backupIdIncMultiple, backupIdIncMultiple2 };
+ bAdmin.mergeBackups(backups);
+ }
+
+ // #6 - restore incremental backup for multiple tables, with overwrite
+ TableName[] tablesRestoreIncMultiple = new TableName[] { table1, table2 };
+ TableName[] tablesMapIncMultiple = new TableName[] { table1_restore, table2_restore };
+ client.restore(BackupUtils.createRestoreRequest(BACKUP_ROOT_DIR, backupIdIncMultiple2, false,
+ tablesRestoreIncMultiple, tablesMapIncMultiple, true));
+
+ Table hTable = conn.getTable(table1_restore);
+ LOG.debug("After incremental restore: " + hTable.getTableDescriptor());
+ LOG.debug("f1 has " + TEST_UTIL.countRows(hTable, famName) + " rows");
+ Assert.assertEquals(TEST_UTIL.countRows(hTable, famName), NB_ROWS_IN_BATCH + 2 * ADD_ROWS);
+
+ hTable.close();
+
+ hTable = conn.getTable(table2_restore);
+ Assert.assertEquals(TEST_UTIL.countRows(hTable), NB_ROWS_IN_BATCH + 2 * ADD_ROWS);
+ hTable.close();
+
+ admin.close();
+ conn.close();
+
+ }
+
+}
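
The test above swaps the merge implementation through BackupRestoreFactory.HBASE_BACKUP_MERGE_IMPL_CLASS and selects the injected failure point through FAILURE_PHASE_KEY. A minimal sketch, not part of this patch, of that configuration; it assumes the caller lives in the same test package so the package-private nested classes are visible.

package org.apache.hadoop.hbase.backup;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;

public class MergeFailureInjectionExample {
  public static Configuration failAtPhase2() {
    Configuration conf = HBaseConfiguration.create();
    // Swap the default MapReduceBackupMergeJob for the failure-injecting subclass.
    conf.setClass(BackupRestoreFactory.HBASE_BACKUP_MERGE_IMPL_CLASS,
        TestIncrementalBackupMergeWithFailures.BackupMergeJobWithFailures.class,
        BackupMergeJob.class);
    // Throw the injected IOException just before the splitter job runs (PHASE1..PHASE4).
    conf.set(TestIncrementalBackupMergeWithFailures.FAILURE_PHASE_KEY, "PHASE2");
    return conf;
  }
}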
http://git-wip-us.apache.org/repos/asf/hbase/blob/05e6e569/hbase-server/src/test/java/org/apache/hadoop/hbase/backup/TestRepairAfterFailedDelete.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/backup/TestRepairAfterFailedDelete.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/backup/TestRepairAfterFailedDelete.java
index 9c47641..556521f 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/backup/TestRepairAfterFailedDelete.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/backup/TestRepairAfterFailedDelete.java
@@ -74,7 +74,7 @@ public class TestRepairAfterFailedDelete extends TestBackupBase {
admin.restoreSnapshot(snapshotName);
admin.enableTable(BackupSystemTable.getTableName(conf1));
// Start backup session
- table.startBackupSession();
+ table.startBackupExclusiveOperation();
// Start delete operation
table.startDeleteOperation(backupIds);