Posted to commits@hbase.apache.org by bu...@apache.org on 2017/08/20 21:30:06 UTC

[09/50] [abbrv] hbase git commit: HBASE-14135 Merge backup images (Vladimir Rodionov)

http://git-wip-us.apache.org/repos/asf/hbase/blob/05e6e569/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/HFileSplitterJob.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/HFileSplitterJob.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/HFileSplitterJob.java
deleted file mode 100644
index ba1b65e..0000000
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/HFileSplitterJob.java
+++ /dev/null
@@ -1,181 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.backup.mapreduce;
-
-import java.io.IOException;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.conf.Configured;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.CellUtil;
-import org.apache.hadoop.hbase.HBaseConfiguration;
-import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.hbase.KeyValue.Type;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.client.Connection;
-import org.apache.hadoop.hbase.client.ConnectionFactory;
-import org.apache.hadoop.hbase.client.RegionLocator;
-import org.apache.hadoop.hbase.client.Table;
-import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
-import org.apache.hadoop.hbase.mapreduce.HFileInputFormat;
-import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2;
-import org.apache.hadoop.hbase.mapreduce.KeyValueSortReducer;
-import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
-import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.Mapper;
-import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
-import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
-import org.apache.hadoop.util.Tool;
-import org.apache.hadoop.util.ToolRunner;
-
-/**
- * A tool to split HFiles into new region boundaries as a MapReduce job. The tool generates HFiles
- * for later bulk importing.
- */
-@InterfaceAudience.Private
-public class HFileSplitterJob extends Configured implements Tool {
-  private static final Log LOG = LogFactory.getLog(HFileSplitterJob.class);
-  final static String NAME = "HFileSplitterJob";
-  public final static String BULK_OUTPUT_CONF_KEY = "hfile.bulk.output";
-  public final static String TABLES_KEY = "hfile.input.tables";
-  public final static String TABLE_MAP_KEY = "hfile.input.tablesmap";
-  private final static String JOB_NAME_CONF_KEY = "mapreduce.job.name";
-
-  public HFileSplitterJob() {
-  }
-
-  protected HFileSplitterJob(final Configuration c) {
-    super(c);
-  }
-
-  /**
-   * A mapper that just writes out cells. This one can be used together with
-   * {@link KeyValueSortReducer}
-   */
-  static class HFileCellMapper extends
-      Mapper<NullWritable, KeyValue, ImmutableBytesWritable, KeyValue> {
-
-    @Override
-    public void map(NullWritable key, KeyValue value, Context context) throws IOException,
-        InterruptedException {
-      // Convert value to KeyValue if subclass
-      if (!value.getClass().equals(KeyValue.class)) {
-        value =
-            new KeyValue(value.getRowArray(), value.getRowOffset(), value.getRowLength(),
-                value.getFamilyArray(), value.getFamilyOffset(), value.getFamilyLength(),
-                value.getQualifierArray(), value.getQualifierOffset(), value.getQualifierLength(),
-                value.getTimestamp(), Type.codeToType(value.getTypeByte()), value.getValueArray(),
-                value.getValueOffset(), value.getValueLength());
-      }
-      context.write(new ImmutableBytesWritable(CellUtil.cloneRow(value)), value);
-    }
-
-    @Override
-    public void setup(Context context) throws IOException {
-      // do nothing
-    }
-  }
-
-  /**
-   * Sets up the actual job.
-   * @param args The command line parameters.
-   * @return The newly created job.
-   * @throws IOException When setting up the job fails.
-   */
-  public Job createSubmittableJob(String[] args) throws IOException {
-    Configuration conf = getConf();
-    String inputDirs = args[0];
-    String tabName = args[1];
-    conf.setStrings(TABLES_KEY, tabName);
-    conf.set(FileInputFormat.INPUT_DIR, inputDirs);
-    Job job =
-        Job.getInstance(conf,
-          conf.get(JOB_NAME_CONF_KEY, NAME + "_" + EnvironmentEdgeManager.currentTime()));
-    job.setJarByClass(HFileSplitterJob.class);
-    job.setInputFormatClass(HFileInputFormat.class);
-    job.setMapOutputKeyClass(ImmutableBytesWritable.class);
-    String hfileOutPath = conf.get(BULK_OUTPUT_CONF_KEY);
-    if (hfileOutPath != null) {
-      LOG.debug("add incremental job :" + hfileOutPath + " from " + inputDirs);
-      TableName tableName = TableName.valueOf(tabName);
-      job.setMapperClass(HFileCellMapper.class);
-      job.setReducerClass(KeyValueSortReducer.class);
-      Path outputDir = new Path(hfileOutPath);
-      FileOutputFormat.setOutputPath(job, outputDir);
-      job.setMapOutputValueClass(KeyValue.class);
-      try (Connection conn = ConnectionFactory.createConnection(conf);
-          Table table = conn.getTable(tableName);
-          RegionLocator regionLocator = conn.getRegionLocator(tableName)) {
-        HFileOutputFormat2.configureIncrementalLoad(job, table.getDescriptor(), regionLocator);
-      }
-      LOG.debug("success configuring load incremental job");
-
-      TableMapReduceUtil.addDependencyJars(job.getConfiguration(),
-        org.apache.hadoop.hbase.shaded.com.google.common.base.Preconditions.class);
-    } else {
-      throw new IOException("No bulk output directory specified");
-    }
-    return job;
-  }
-
-  /**
-   * Print usage
-   * @param errorMsg Error message. Can be null.
-   */
-  private void usage(final String errorMsg) {
-    if (errorMsg != null && errorMsg.length() > 0) {
-      System.err.println("ERROR: " + errorMsg);
-    }
-    System.err.println("Usage: " + NAME + " [options] <HFile inputdir(s)> <table>");
-    System.err.println("Read all HFile's for <table> and split them to <table> region boundaries.");
-    System.err.println("<table>  table to load.\n");
-    System.err.println("To generate HFiles for a bulk data load, pass the option:");
-    System.err.println("  -D" + BULK_OUTPUT_CONF_KEY + "=/path/for/output");
-    System.err.println("Other options:");
-    System.err.println("   -D " + JOB_NAME_CONF_KEY
-        + "=jobName - use the specified mapreduce job name for the HFile splitter");
-    System.err.println("For performance also consider the following options:\n"
-        + "  -Dmapreduce.map.speculative=false\n" + "  -Dmapreduce.reduce.speculative=false");
-  }
-
-  /**
-   * Main entry point.
-   * @param args The command line parameters.
-   * @throws Exception When running the job fails.
-   */
-  public static void main(String[] args) throws Exception {
-    int ret = ToolRunner.run(new HFileSplitterJob(HBaseConfiguration.create()), args);
-    System.exit(ret);
-  }
-
-  @Override
-  public int run(String[] args) throws Exception {
-    if (args.length < 2) {
-      usage("Wrong number of arguments: " + args.length);
-      System.exit(-1);
-    }
-    Job job = createSubmittableJob(args);
-    int result = job.waitForCompletion(true) ? 0 : 1;
-    return result;
-  }
-}

http://git-wip-us.apache.org/repos/asf/hbase/blob/05e6e569/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceBackupMergeJob.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceBackupMergeJob.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceBackupMergeJob.java
new file mode 100644
index 0000000..00c5b83
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceBackupMergeJob.java
@@ -0,0 +1,321 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.backup.mapreduce;
+
+import static org.apache.hadoop.hbase.backup.util.BackupUtils.succeeded;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.backup.BackupInfo;
+import org.apache.hadoop.hbase.backup.BackupMergeJob;
+import org.apache.hadoop.hbase.backup.BackupRestoreConstants;
+import org.apache.hadoop.hbase.backup.HBackupFileSystem;
+import org.apache.hadoop.hbase.backup.impl.BackupManifest;
+import org.apache.hadoop.hbase.backup.impl.BackupSystemTable;
+import org.apache.hadoop.hbase.backup.util.BackupUtils;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.util.Pair;
+import org.apache.hadoop.util.Tool;
+
+/**
+ * MapReduce implementation of {@link BackupMergeJob}.
+ * Must be initialized with the configuration of a backup destination cluster.
+ *
+ */
+
+@InterfaceAudience.Private
+public class MapReduceBackupMergeJob implements BackupMergeJob {
+  public static final Log LOG = LogFactory.getLog(MapReduceBackupMergeJob.class);
+
+  protected Tool player;
+  protected Configuration conf;
+
+  public MapReduceBackupMergeJob() {
+  }
+
+  @Override
+  public Configuration getConf() {
+    return conf;
+  }
+
+  @Override
+  public void setConf(Configuration conf) {
+    this.conf = conf;
+  }
+
+  @Override
+  public void run(String[] backupIds) throws IOException {
+    String bulkOutputConfKey;
+
+    // TODO : run player on remote cluster
+    player = new MapReduceHFileSplitterJob();
+    bulkOutputConfKey = MapReduceHFileSplitterJob.BULK_OUTPUT_CONF_KEY;
+    // Player reads all files in arbitrary directory structure and creates
+    // a Map task for each file
+    String bids = StringUtils.join(backupIds, ",");
+
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Merge backup images " + bids);
+    }
+
+    List<Pair<TableName, Path>> processedTableList = new ArrayList<Pair<TableName, Path>>();
+    boolean finishedTables = false;
+    Connection conn = ConnectionFactory.createConnection(getConf());
+    BackupSystemTable table = new BackupSystemTable(conn);
+    FileSystem fs = FileSystem.get(getConf());
+
+    try {
+
+      // Get exclusive lock on backup system
+      table.startBackupExclusiveOperation();
+      // Start merge operation
+      table.startMergeOperation(backupIds);
+
+      // Select most recent backup id
+      String mergedBackupId = findMostRecentBackupId(backupIds);
+
+      TableName[] tableNames = getTableNamesInBackupImages(backupIds);
+      String backupRoot = null;
+
+      BackupInfo bInfo = table.readBackupInfo(backupIds[0]);
+      backupRoot = bInfo.getBackupRootDir();
+
+      for (int i = 0; i < tableNames.length; i++) {
+
+        LOG.info("Merge backup images for " + tableNames[i]);
+
+        // Find input directories for table
+
+        Path[] dirPaths = findInputDirectories(fs, backupRoot, tableNames[i], backupIds);
+        String dirs = StringUtils.join(dirPaths, ",");
+        Path bulkOutputPath =
+            BackupUtils.getBulkOutputDir(BackupUtils.getFileNameCompatibleString(tableNames[i]),
+              getConf(), false);
+        // Delete content if exists
+        if (fs.exists(bulkOutputPath)) {
+          if (!fs.delete(bulkOutputPath, true)) {
+            LOG.warn("Can not delete: " + bulkOutputPath);
+          }
+        }
+        Configuration conf = getConf();
+        conf.set(bulkOutputConfKey, bulkOutputPath.toString());
+        String[] playerArgs = { dirs, tableNames[i].getNameAsString() };
+
+        int result = 0;
+
+        player.setConf(getConf());
+        result = player.run(playerArgs);
+        if (!succeeded(result)) {
+          throw new IOException("Can not merge backup images for " + dirs
+              + " (check Hadoop/MR and HBase logs). Player return code =" + result);
+        }
+        // Add to processed table list
+        processedTableList.add(new Pair<TableName, Path>(tableNames[i], bulkOutputPath));
+        LOG.debug("Merge Job finished:" + result);
+      }
+      List<TableName> tableList = toTableNameList(processedTableList);
+      table.updateProcessedTablesForMerge(tableList);
+      finishedTables = true;
+
+      // Move data
+      for (Pair<TableName, Path> tn : processedTableList) {
+        moveData(fs, backupRoot, tn.getSecond(), tn.getFirst(), mergedBackupId);
+      }
+
+      // Delete old data and update manifest
+      List<String> backupsToDelete = getBackupIdsToDelete(backupIds, mergedBackupId);
+      deleteBackupImages(backupsToDelete, conn, fs, backupRoot);
+      updateBackupManifest(backupRoot, mergedBackupId, backupsToDelete);
+      // Finish merge session
+      table.finishMergeOperation();
+      // Release lock
+      table.finishBackupExclusiveOperation();
+    } catch (RuntimeException e) {
+
+      throw e;
+    } catch (Exception e) {
+      LOG.error(e);
+      if (!finishedTables) {
+        // cleanup bulk directories and finish merge
+        // merge MUST be repeated (no need for repair)
+        cleanupBulkLoadDirs(fs, toPathList(processedTableList));
+        table.finishMergeOperation();
+        table.finishBackupExclusiveOperation();
+        throw new IOException("Backup merge operation failed, you should try it again", e);
+      } else {
+        // backup repair must be run
+        throw new IOException(
+            "Backup merge operation failed, run backup repair tool to restore system's integrity",
+            e);
+      }
+    } finally {
+      table.close();
+      conn.close();
+    }
+  }
+
+  protected List<Path> toPathList(List<Pair<TableName, Path>> processedTableList) {
+    ArrayList<Path> list = new ArrayList<Path>();
+    for (Pair<TableName, Path> p : processedTableList) {
+      list.add(p.getSecond());
+    }
+    return list;
+  }
+
+  protected List<TableName> toTableNameList(List<Pair<TableName, Path>> processedTableList) {
+    ArrayList<TableName> list = new ArrayList<TableName>();
+    for (Pair<TableName, Path> p : processedTableList) {
+      list.add(p.getFirst());
+    }
+    return list;
+  }
+
+  protected void cleanupBulkLoadDirs(FileSystem fs, List<Path> pathList) throws IOException {
+    for (Path path : pathList) {
+
+      if (!fs.delete(path, true)) {
+        LOG.warn("Can't delete " + path);
+      }
+    }
+  }
+
+  protected void updateBackupManifest(String backupRoot, String mergedBackupId,
+      List<String> backupsToDelete) throws IllegalArgumentException, IOException {
+
+    BackupManifest manifest =
+        HBackupFileSystem.getManifest(conf, new Path(backupRoot), mergedBackupId);
+    manifest.getBackupImage().removeAncestors(backupsToDelete);
+    // save back
+    manifest.store(conf);
+
+  }
+
+  protected void deleteBackupImages(List<String> backupIds, Connection conn, FileSystem fs,
+      String backupRoot) throws IOException {
+
+    // Delete from backup system table
+    try (BackupSystemTable table = new BackupSystemTable(conn);) {
+      for (String backupId : backupIds) {
+        table.deleteBackupInfo(backupId);
+      }
+    }
+
+    // Delete from file system
+    for (String backupId : backupIds) {
+      Path backupDirPath = HBackupFileSystem.getBackupPath(backupRoot, backupId);
+
+      if (!fs.delete(backupDirPath, true)) {
+        LOG.warn("Could not delete " + backupDirPath);
+      }
+    }
+  }
+
+  protected List<String> getBackupIdsToDelete(String[] backupIds, String mergedBackupId) {
+    List<String> list = new ArrayList<String>();
+    for (String id : backupIds) {
+      if (id.equals(mergedBackupId)) {
+        continue;
+      }
+      list.add(id);
+    }
+    return list;
+  }
+
+  protected void moveData(FileSystem fs, String backupRoot, Path bulkOutputPath, TableName tableName,
+      String mergedBackupId) throws IllegalArgumentException, IOException {
+
+    Path dest =
+        new Path(HBackupFileSystem.getTableBackupDataDir(backupRoot, mergedBackupId, tableName));
+
+    // Delete all in dest
+    if (!fs.delete(dest, true)) {
+      throw new IOException("Could not delete " + dest);
+    }
+
+    FileStatus[] fsts = fs.listStatus(bulkOutputPath);
+    for (FileStatus fst : fsts) {
+      if (fst.isDirectory()) {
+        fs.rename(fst.getPath().getParent(), dest);
+      }
+    }
+
+  }
+
+  protected String findMostRecentBackupId(String[] backupIds) {
+    long recentTimestamp = Long.MIN_VALUE;
+    for (String backupId : backupIds) {
+      long ts = Long.parseLong(backupId.split("_")[1]);
+      if (ts > recentTimestamp) {
+        recentTimestamp = ts;
+      }
+    }
+    return BackupRestoreConstants.BACKUPID_PREFIX + recentTimestamp;
+  }
+
+  protected TableName[] getTableNamesInBackupImages(String[] backupIds) throws IOException {
+
+    Set<TableName> allSet = new HashSet<TableName>();
+
+    try (Connection conn = ConnectionFactory.createConnection(conf);
+        BackupSystemTable table = new BackupSystemTable(conn);) {
+      for (String backupId : backupIds) {
+        BackupInfo bInfo = table.readBackupInfo(backupId);
+
+        allSet.addAll(bInfo.getTableNames());
+      }
+    }
+
+    TableName[] ret = new TableName[allSet.size()];
+    return allSet.toArray(ret);
+  }
+
+  protected Path[] findInputDirectories(FileSystem fs, String backupRoot, TableName tableName,
+      String[] backupIds) throws IOException {
+
+    List<Path> dirs = new ArrayList<Path>();
+
+    for (String backupId : backupIds) {
+      Path fileBackupDirPath =
+          new Path(HBackupFileSystem.getTableBackupDataDir(backupRoot, backupId, tableName));
+      if (fs.exists(fileBackupDirPath)) {
+        dirs.add(fileBackupDirPath);
+      } else {
+        if (LOG.isTraceEnabled()) {
+          LOG.trace("File: " + fileBackupDirPath + " does not exist.");
+        }
+      }
+    }
+    Path[] ret = new Path[dirs.size()];
+    return dirs.toArray(ret);
+  }
+
+}
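
For orientation, a minimal sketch (not part of the patch) of how this merge job is normally driven: the new TestIncrementalBackupMergeWithFailures further down goes through BackupAdmin.mergeBackups(), which presumably resolves the configured BackupMergeJob implementation and invokes its run(). The backup ids below are placeholders, and the fragment assumes the same imports as that test class.

    // Sketch: merge two incremental backup images via the admin API.
    try (Connection conn = ConnectionFactory.createConnection(HBaseConfiguration.create());
        BackupAdmin admin = new BackupAdminImpl(conn)) {
      // placeholder backup ids of the images to merge
      admin.mergeBackups(new String[] { "backup_1503264000000", "backup_1503264600000" });
    }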

http://git-wip-us.apache.org/repos/asf/hbase/blob/05e6e569/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceHFileSplitterJob.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceHFileSplitterJob.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceHFileSplitterJob.java
new file mode 100644
index 0000000..49e8c75
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceHFileSplitterJob.java
@@ -0,0 +1,181 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.backup.mapreduce;
+
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.KeyValue.Type;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.RegionLocator;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.mapreduce.HFileInputFormat;
+import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2;
+import org.apache.hadoop.hbase.mapreduce.KeyValueSortReducer;
+import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+
+/**
+ * A tool to split HFiles into new region boundaries as a MapReduce job. The tool generates HFiles
+ * for later bulk importing.
+ */
+@InterfaceAudience.Private
+public class MapReduceHFileSplitterJob extends Configured implements Tool {
+  private static final Log LOG = LogFactory.getLog(MapReduceHFileSplitterJob.class);
+  final static String NAME = "HFileSplitterJob";
+  public final static String BULK_OUTPUT_CONF_KEY = "hfile.bulk.output";
+  public final static String TABLES_KEY = "hfile.input.tables";
+  public final static String TABLE_MAP_KEY = "hfile.input.tablesmap";
+  private final static String JOB_NAME_CONF_KEY = "mapreduce.job.name";
+
+  public MapReduceHFileSplitterJob() {
+  }
+
+  protected MapReduceHFileSplitterJob(final Configuration c) {
+    super(c);
+  }
+
+  /**
+   * A mapper that just writes out cells. This one can be used together with
+   * {@link KeyValueSortReducer}
+   */
+  static class HFileCellMapper extends
+      Mapper<NullWritable, KeyValue, ImmutableBytesWritable, KeyValue> {
+
+    @Override
+    public void map(NullWritable key, KeyValue value, Context context) throws IOException,
+        InterruptedException {
+      // Convert value to KeyValue if subclass
+      if (!value.getClass().equals(KeyValue.class)) {
+        value =
+            new KeyValue(value.getRowArray(), value.getRowOffset(), value.getRowLength(),
+                value.getFamilyArray(), value.getFamilyOffset(), value.getFamilyLength(),
+                value.getQualifierArray(), value.getQualifierOffset(), value.getQualifierLength(),
+                value.getTimestamp(), Type.codeToType(value.getTypeByte()), value.getValueArray(),
+                value.getValueOffset(), value.getValueLength());
+      }
+      context.write(new ImmutableBytesWritable(CellUtil.cloneRow(value)), value);
+    }
+
+    @Override
+    public void setup(Context context) throws IOException {
+      // do nothing
+    }
+  }
+
+  /**
+   * Sets up the actual job.
+   * @param args The command line parameters.
+   * @return The newly created job.
+   * @throws IOException When setting up the job fails.
+   */
+  public Job createSubmittableJob(String[] args) throws IOException {
+    Configuration conf = getConf();
+    String inputDirs = args[0];
+    String tabName = args[1];
+    conf.setStrings(TABLES_KEY, tabName);
+    conf.set(FileInputFormat.INPUT_DIR, inputDirs);
+    Job job =
+        Job.getInstance(conf,
+          conf.get(JOB_NAME_CONF_KEY, NAME + "_" + EnvironmentEdgeManager.currentTime()));
+    job.setJarByClass(MapReduceHFileSplitterJob.class);
+    job.setInputFormatClass(HFileInputFormat.class);
+    job.setMapOutputKeyClass(ImmutableBytesWritable.class);
+    String hfileOutPath = conf.get(BULK_OUTPUT_CONF_KEY);
+    if (hfileOutPath != null) {
+      LOG.debug("add incremental job :" + hfileOutPath + " from " + inputDirs);
+      TableName tableName = TableName.valueOf(tabName);
+      job.setMapperClass(HFileCellMapper.class);
+      job.setReducerClass(KeyValueSortReducer.class);
+      Path outputDir = new Path(hfileOutPath);
+      FileOutputFormat.setOutputPath(job, outputDir);
+      job.setMapOutputValueClass(KeyValue.class);
+      try (Connection conn = ConnectionFactory.createConnection(conf);
+          Table table = conn.getTable(tableName);
+          RegionLocator regionLocator = conn.getRegionLocator(tableName)) {
+        HFileOutputFormat2.configureIncrementalLoad(job, table.getDescriptor(), regionLocator);
+      }
+      LOG.debug("success configuring load incremental job");
+
+      TableMapReduceUtil.addDependencyJars(job.getConfiguration(),
+        org.apache.hadoop.hbase.shaded.com.google.common.base.Preconditions.class);
+    } else {
+      throw new IOException("No bulk output directory specified");
+    }
+    return job;
+  }
+
+  /**
+   * Print usage
+   * @param errorMsg Error message. Can be null.
+   */
+  private void usage(final String errorMsg) {
+    if (errorMsg != null && errorMsg.length() > 0) {
+      System.err.println("ERROR: " + errorMsg);
+    }
+    System.err.println("Usage: " + NAME + " [options] <HFile inputdir(s)> <table>");
+    System.err.println("Read all HFile's for <table> and split them to <table> region boundaries.");
+    System.err.println("<table>  table to load.\n");
+    System.err.println("To generate HFiles for a bulk data load, pass the option:");
+    System.err.println("  -D" + BULK_OUTPUT_CONF_KEY + "=/path/for/output");
+    System.err.println("Other options:");
+    System.err.println("   -D " + JOB_NAME_CONF_KEY
+        + "=jobName - use the specified mapreduce job name for the HFile splitter");
+    System.err.println("For performance also consider the following options:\n"
+        + "  -Dmapreduce.map.speculative=false\n" + "  -Dmapreduce.reduce.speculative=false");
+  }
+
+  /**
+   * Main entry point.
+   * @param args The command line parameters.
+   * @throws Exception When running the job fails.
+   */
+  public static void main(String[] args) throws Exception {
+    int ret = ToolRunner.run(new MapReduceHFileSplitterJob(HBaseConfiguration.create()), args);
+    System.exit(ret);
+  }
+
+  @Override
+  public int run(String[] args) throws Exception {
+    if (args.length < 2) {
+      usage("Wrong number of arguments: " + args.length);
+      System.exit(-1);
+    }
+    Job job = createSubmittableJob(args);
+    int result = job.waitForCompletion(true) ? 0 : 1;
+    return result;
+  }
+}
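
As a quick usage illustration (not part of the patch), the splitter can be driven through ToolRunner the same way its main() does. The input directory, output directory and table name below are placeholders; the command-line form simply follows the usage() banner above.

    // Sketch: split the HFiles of one backup image along the current region
    // boundaries of ns1:table1, staging the output under hfile.bulk.output.
    Configuration conf = HBaseConfiguration.create();
    conf.set(MapReduceHFileSplitterJob.BULK_OUTPUT_CONF_KEY, "/tmp/hfile-split-out");
    String[] args = { "/backup/backup_1503264000000/ns1/table1", "ns1:table1" };
    int rc = ToolRunner.run(conf, new MapReduceHFileSplitterJob(), args); // 0 on success
    // Command-line equivalent (per usage()):
    //   HFileSplitterJob -Dhfile.bulk.output=/tmp/hfile-split-out \
    //       /backup/backup_1503264000000/ns1/table1 ns1:table1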

http://git-wip-us.apache.org/repos/asf/hbase/blob/05e6e569/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceRestoreJob.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceRestoreJob.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceRestoreJob.java
index 4161ca9..1209e7c 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceRestoreJob.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceRestoreJob.java
@@ -17,31 +17,31 @@
  */
 package org.apache.hadoop.hbase.backup.mapreduce;
 
+import static org.apache.hadoop.hbase.backup.util.BackupUtils.failed;
+import static org.apache.hadoop.hbase.backup.util.BackupUtils.succeeded;
+
 import java.io.IOException;
 
 import org.apache.commons.lang.StringUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.backup.BackupRestoreConstants;
 import org.apache.hadoop.hbase.backup.RestoreJob;
+import org.apache.hadoop.hbase.backup.util.BackupUtils;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;
-import org.apache.hadoop.hbase.mapreduce.WALPlayer;
-import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.util.Tool;
 
+
 /**
  * MapReduce implementation of {@link RestoreJob}
  *
- * For full backup restore, it runs {@link HFileSplitterJob} job and creates
+ * For backup restore, it runs {@link MapReduceHFileSplitterJob} job and creates
  * HFiles which are aligned with a region boundaries of a table being
- * restored, for incremental backup restore it runs {@link WALPlayer} in
- * bulk load mode (creates HFiles from WAL edits).
+ * restored.
  *
  * The resulting HFiles then are loaded using HBase bulk load tool
  * {@link LoadIncrementalHFiles}
@@ -62,8 +62,8 @@ public class MapReduceRestoreJob implements RestoreJob {
 
     String bulkOutputConfKey;
 
-    player = new HFileSplitterJob();
-    bulkOutputConfKey = HFileSplitterJob.BULK_OUTPUT_CONF_KEY;
+    player = new MapReduceHFileSplitterJob();
+    bulkOutputConfKey = MapReduceHFileSplitterJob.BULK_OUTPUT_CONF_KEY;
     // Player reads all files in arbitrary directory structure and creates
     // a Map task for each file
     String dirs = StringUtils.join(dirPaths, ",");
@@ -71,8 +71,8 @@ public class MapReduceRestoreJob implements RestoreJob {
     if (LOG.isDebugEnabled()) {
       LOG.debug("Restore " + (fullBackupRestore ? "full" : "incremental")
           + " backup from directory " + dirs + " from hbase tables "
-          + StringUtils.join(tableNames, BackupRestoreConstants.TABLENAME_DELIMITER_IN_COMMAND) +
-          " to tables "
+          + StringUtils.join(tableNames, BackupRestoreConstants.TABLENAME_DELIMITER_IN_COMMAND)
+          + " to tables "
           + StringUtils.join(newTableNames, BackupRestoreConstants.TABLENAME_DELIMITER_IN_COMMAND));
     }
 
@@ -80,13 +80,16 @@ public class MapReduceRestoreJob implements RestoreJob {
 
       LOG.info("Restore " + tableNames[i] + " into " + newTableNames[i]);
 
-      Path bulkOutputPath = getBulkOutputDir(getFileNameCompatibleString(newTableNames[i]));
+      Path bulkOutputPath =
+          BackupUtils.getBulkOutputDir(BackupUtils.getFileNameCompatibleString(newTableNames[i]),
+            getConf());
       Configuration conf = getConf();
       conf.set(bulkOutputConfKey, bulkOutputPath.toString());
       String[] playerArgs =
-        { dirs,
-          fullBackupRestore? newTableNames[i].getNameAsString():tableNames[i].getNameAsString()
-        };
+          {
+              dirs,
+              fullBackupRestore ? newTableNames[i].getNameAsString() : tableNames[i]
+                  .getNameAsString() };
 
       int result = 0;
       int loaderResult = 0;
@@ -96,7 +99,7 @@ public class MapReduceRestoreJob implements RestoreJob {
         result = player.run(playerArgs);
         if (succeeded(result)) {
           // do bulk load
-          LoadIncrementalHFiles loader = createLoader(getConf());
+          LoadIncrementalHFiles loader = BackupUtils.createLoader(getConf());
           if (LOG.isDebugEnabled()) {
             LOG.debug("Restoring HFiles from directory " + bulkOutputPath);
           }
@@ -113,58 +116,11 @@ public class MapReduceRestoreJob implements RestoreJob {
         }
         LOG.debug("Restore Job finished:" + result);
       } catch (Exception e) {
+        LOG.error(e);
         throw new IOException("Can not restore from backup directory " + dirs
             + " (check Hadoop and HBase logs) ", e);
       }
-
-    }
-  }
-
-  private String getFileNameCompatibleString(TableName table) {
-    return table.getNamespaceAsString() + "-" + table.getQualifierAsString();
-  }
-
-  private boolean failed(int result) {
-    return result != 0;
-  }
-
-  private boolean succeeded(int result) {
-    return result == 0;
-  }
-
-  public static LoadIncrementalHFiles createLoader(Configuration config) throws IOException {
-    // set configuration for restore:
-    // LoadIncrementalHFile needs more time
-    // <name>hbase.rpc.timeout</name> <value>600000</value>
-    // calculates
-    Integer milliSecInHour = 3600000;
-    Configuration conf = new Configuration(config);
-    conf.setInt(HConstants.HBASE_RPC_TIMEOUT_KEY, milliSecInHour);
-
-    // By default, it is 32 and loader will fail if # of files in any region exceed this
-    // limit. Bad for snapshot restore.
-    conf.setInt(LoadIncrementalHFiles.MAX_FILES_PER_REGION_PER_FAMILY, Integer.MAX_VALUE);
-    conf.set(LoadIncrementalHFiles.IGNORE_UNMATCHED_CF_CONF_KEY, "yes");
-    LoadIncrementalHFiles loader = null;
-    try {
-      loader = new LoadIncrementalHFiles(conf);
-    } catch (Exception e) {
-      throw new IOException(e);
     }
-    return loader;
-  }
-
-  private Path getBulkOutputDir(String tableName) throws IOException {
-    Configuration conf = getConf();
-    FileSystem fs = FileSystem.get(conf);
-    String tmp =
-        conf.get(HConstants.TEMPORARY_FS_DIRECTORY_KEY,
-          HConstants.DEFAULT_TEMPORARY_HDFS_DIRECTORY);
-    Path path =
-        new Path(tmp + Path.SEPARATOR + "bulk_output-" + tableName + "-"
-            + EnvironmentEdgeManager.currentTime());
-    fs.deleteOnExit(path);
-    return path;
   }
 
   @Override

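To place this job in the overall flow (illustration only, not part of the patch): a restore is requested through the admin API, which builds a RestoreRequest and hands it to the configured RestoreJob implementation, presumably this class by default. Backup root, backup id and table names below are placeholders; the request builder call matches BackupUtils.createRestoreRequest() shown further down, and the fragment relies on the backup client classes imported by the new test below.

    // Sketch: restore ns1:table1 from a backup image into ns1:table1_restore.
    try (Connection conn = ConnectionFactory.createConnection(HBaseConfiguration.create())) {
      BackupAdmin admin = new BackupAdminImpl(conn);
      admin.restore(BackupUtils.createRestoreRequest(
          "hdfs:///backup/root", "backup_1503264600000", false /* check only */,
          new TableName[] { TableName.valueOf("ns1:table1") },
          new TableName[] { TableName.valueOf("ns1:table1_restore") },
          true /* overwrite */));
    }
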
http://git-wip-us.apache.org/repos/asf/hbase/blob/05e6e569/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/util/BackupUtils.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/util/BackupUtils.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/util/BackupUtils.java
index e32853d..ce77645 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/util/BackupUtils.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/util/BackupUtils.java
@@ -56,7 +56,9 @@ import org.apache.hadoop.hbase.backup.impl.BackupManifest;
 import org.apache.hadoop.hbase.backup.impl.BackupManifest.BackupImage;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;
 import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.util.FSTableDescriptors;
 import org.apache.hadoop.hbase.util.FSUtils;
 import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
@@ -68,14 +70,15 @@ import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
 public final class BackupUtils {
   protected static final Log LOG = LogFactory.getLog(BackupUtils.class);
   public static final String LOGNAME_SEPARATOR = ".";
+  public static final int MILLISEC_IN_HOUR = 3600000;
 
   private BackupUtils() {
     throw new AssertionError("Instantiating utility class...");
   }
 
   /**
-   * Loop through the RS log timestamp map for the tables, for each RS, find the min timestamp
-   * value for the RS among the tables.
+   * Loop through the RS log timestamp map for the tables, for each RS, find the min timestamp value
+   * for the RS among the tables.
    * @param rsLogTimestampMap timestamp map
    * @return the min timestamp of each RS
    */
@@ -114,16 +117,17 @@ public final class BackupUtils {
   }
 
   /**
-   * copy out Table RegionInfo into incremental backup image need to consider move this
-   * logic into HBackupFileSystem
+   * copy out Table RegionInfo into incremental backup image need to consider move this logic into
+   * HBackupFileSystem
    * @param conn connection
    * @param backupInfo backup info
    * @param conf configuration
    * @throws IOException exception
    * @throws InterruptedException exception
    */
-  public static void copyTableRegionInfo(Connection conn, BackupInfo backupInfo,
-      Configuration conf) throws IOException, InterruptedException {
+  public static void
+      copyTableRegionInfo(Connection conn, BackupInfo backupInfo, Configuration conf)
+          throws IOException, InterruptedException {
     Path rootDir = FSUtils.getRootDir(conf);
     FileSystem fs = rootDir.getFileSystem(conf);
 
@@ -152,10 +156,8 @@ public final class BackupUtils {
       LOG.debug("Starting to write region info for table " + table);
       for (HRegionInfo regionInfo : regions) {
         Path regionDir =
-            HRegion.getRegionDir(new Path(backupInfo.getTableBackupDir(table)),
-              regionInfo);
-        regionDir =
-            new Path(backupInfo.getTableBackupDir(table), regionDir.getName());
+            HRegion.getRegionDir(new Path(backupInfo.getTableBackupDir(table)), regionInfo);
+        regionDir = new Path(backupInfo.getTableBackupDir(table), regionDir.getName());
         writeRegioninfoOnFilesystem(conf, targetFs, regionDir, regionInfo);
       }
       LOG.debug("Finished writing region info for table " + table);
@@ -301,7 +303,6 @@ public final class BackupUtils {
     return ret;
   }
 
-
   /**
    * Check whether the backup path exist
    * @param backupStr backup
@@ -431,8 +432,7 @@ public final class BackupUtils {
    * @param conf configuration
    * @throws IOException exception
    */
-  private static void cleanupHLogDir(BackupInfo backupInfo, Configuration conf)
-      throws IOException {
+  private static void cleanupHLogDir(BackupInfo backupInfo, Configuration conf) throws IOException {
 
     String logDir = backupInfo.getHLogTargetDir();
     if (logDir == null) {
@@ -452,7 +452,6 @@ public final class BackupUtils {
     }
   }
 
-
   private static void cleanupTargetDir(BackupInfo backupInfo, Configuration conf) {
     try {
       // clean up the data at target directory
@@ -498,8 +497,8 @@ public final class BackupUtils {
    * @param tableName table name
    * @return backupPath String for the particular table
    */
-  public static String getTableBackupDir(String backupRootDir, String backupId,
-      TableName tableName) {
+  public static String
+      getTableBackupDir(String backupRootDir, String backupId, TableName tableName) {
     return backupRootDir + Path.SEPARATOR + backupId + Path.SEPARATOR
         + tableName.getNamespaceAsString() + Path.SEPARATOR + tableName.getQualifierAsString()
         + Path.SEPARATOR;
@@ -523,7 +522,6 @@ public final class BackupUtils {
     return list;
   }
 
-
   /**
    * Calls fs.listStatus() and treats FileNotFoundException as non-fatal This accommodates
    * differences between hadoop versions, where hadoop 1 does not throw a FileNotFoundException, and
@@ -655,19 +653,16 @@ public final class BackupUtils {
    * @param backupId backup id
    * @param check check only
    * @param fromTables table list from
-   * @param toTables   table list to
+   * @param toTables table list to
    * @param isOverwrite overwrite data
   * @return request object
    */
   public static RestoreRequest createRestoreRequest(String backupRootDir, String backupId,
       boolean check, TableName[] fromTables, TableName[] toTables, boolean isOverwrite) {
     RestoreRequest.Builder builder = new RestoreRequest.Builder();
-    RestoreRequest request = builder.withBackupRootDir(backupRootDir)
-                                    .withBackupId(backupId)
-                                    .withCheck(check)
-                                    .withFromTables(fromTables)
-                                    .withToTables(toTables)
-                                    .withOvewrite(isOverwrite).build();
+    RestoreRequest request =
+        builder.withBackupRootDir(backupRootDir).withBackupId(backupId).withCheck(check)
+            .withFromTables(fromTables).withToTables(toTables).withOvewrite(isOverwrite).build();
     return request;
   }
 
@@ -699,4 +694,54 @@ public final class BackupUtils {
     return isValid;
   }
 
+  public static Path getBulkOutputDir(String tableName, Configuration conf, boolean deleteOnExit)
+      throws IOException {
+    FileSystem fs = FileSystem.get(conf);
+    String tmp =
+        conf.get(HConstants.TEMPORARY_FS_DIRECTORY_KEY, HConstants.DEFAULT_TEMPORARY_HDFS_DIRECTORY);
+    Path path =
+        new Path(tmp + Path.SEPARATOR + "bulk_output-" + tableName + "-"
+            + EnvironmentEdgeManager.currentTime());
+    if (deleteOnExit) {
+      fs.deleteOnExit(path);
+    }
+    return path;
+  }
+
+  public static Path getBulkOutputDir(String tableName, Configuration conf) throws IOException {
+    return getBulkOutputDir(tableName, conf, true);
+  }
+
+  public static String getFileNameCompatibleString(TableName table) {
+    return table.getNamespaceAsString() + "-" + table.getQualifierAsString();
+  }
+
+  public static boolean failed(int result) {
+    return result != 0;
+  }
+
+  public static boolean succeeded(int result) {
+    return result == 0;
+  }
+
+  public static LoadIncrementalHFiles createLoader(Configuration config) throws IOException {
+    // set configuration for restore:
+    // LoadIncrementalHFile needs more time
+    // <name>hbase.rpc.timeout</name> <value>600000</value>
+    // calculates
+    Configuration conf = new Configuration(config);
+    conf.setInt(HConstants.HBASE_RPC_TIMEOUT_KEY, MILLISEC_IN_HOUR);
+
+    // By default, it is 32 and loader will fail if # of files in any region exceed this
+    // limit. Bad for snapshot restore.
+    conf.setInt(LoadIncrementalHFiles.MAX_FILES_PER_REGION_PER_FAMILY, Integer.MAX_VALUE);
+    conf.set(LoadIncrementalHFiles.IGNORE_UNMATCHED_CF_CONF_KEY, "yes");
+    LoadIncrementalHFiles loader = null;
+    try {
+      loader = new LoadIncrementalHFiles(conf);
+    } catch (Exception e) {
+      throw new IOException(e);
+    }
+    return loader;
+  }
 }
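
The helpers added here (getBulkOutputDir, getFileNameCompatibleString, succeeded/failed, createLoader) are the pieces MapReduceRestoreJob used to carry privately, hoisted so the new merge job can reuse them. A minimal sketch of how they compose, with the table name as a placeholder and assuming LoadIncrementalHFiles' Tool-style run(dir, table) entry point:

    // Sketch: stage HFiles into a temporary bulk-output directory, then bulk-load them.
    Configuration conf = HBaseConfiguration.create();
    TableName table = TableName.valueOf("ns1:table1"); // placeholder
    Path bulkOut = BackupUtils.getBulkOutputDir(
        BackupUtils.getFileNameCompatibleString(table), conf, true /* deleteOnExit */);
    conf.set(MapReduceHFileSplitterJob.BULK_OUTPUT_CONF_KEY, bulkOut.toString());
    // ... run the HFile splitter (or WALPlayer) so HFiles land under bulkOut, then:
    LoadIncrementalHFiles loader = BackupUtils.createLoader(conf);
    String[] loaderArgs = { bulkOut.toString(), table.getNameAsString() };
    int loaderResult = loader.run(loaderArgs); // Tool.run(); throws Exception
    // BackupUtils.succeeded(loaderResult) is true when the bulk load completed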

http://git-wip-us.apache.org/repos/asf/hbase/blob/05e6e569/hbase-server/src/test/java/org/apache/hadoop/hbase/backup/TestIncrementalBackupMergeWithFailures.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/backup/TestIncrementalBackupMergeWithFailures.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/backup/TestIncrementalBackupMergeWithFailures.java
new file mode 100644
index 0000000..7011ed3
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/backup/TestIncrementalBackupMergeWithFailures.java
@@ -0,0 +1,336 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.backup;
+
+import static org.apache.hadoop.hbase.backup.util.BackupUtils.succeeded;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.backup.impl.BackupAdminImpl;
+import org.apache.hadoop.hbase.backup.impl.BackupSystemTable;
+import org.apache.hadoop.hbase.backup.mapreduce.MapReduceBackupMergeJob;
+import org.apache.hadoop.hbase.backup.mapreduce.MapReduceHFileSplitterJob;
+import org.apache.hadoop.hbase.backup.util.BackupUtils;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.testclassification.LargeTests;
+import org.apache.hadoop.hbase.util.Pair;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import com.google.common.collect.Lists;
+
+@Category(LargeTests.class)
+public class TestIncrementalBackupMergeWithFailures extends TestBackupBase {
+  private static final Log LOG = LogFactory.getLog(TestIncrementalBackupMergeWithFailures.class);
+
+  static enum FailurePhase {
+    PHASE1, PHASE2, PHASE3, PHASE4
+  }
+  public final static String FAILURE_PHASE_KEY = "failurePhase";
+
+  static class BackupMergeJobWithFailures extends MapReduceBackupMergeJob {
+
+    FailurePhase failurePhase;
+
+    @Override
+    public void setConf(Configuration conf) {
+      super.setConf(conf);
+      String val = conf.get(FAILURE_PHASE_KEY);
+      if (val != null) {
+        failurePhase = FailurePhase.valueOf(val);
+      } else {
+        Assert.fail("Failure phase is not set");
+      }
+    }
+
+
+    /**
+     * This is the exact copy of parent's run() with injections
+     * of different types of failures
+     */
+    @Override
+    public void run(String[] backupIds) throws IOException {
+      String bulkOutputConfKey;
+
+      // TODO : run player on remote cluster
+      player = new MapReduceHFileSplitterJob();
+      bulkOutputConfKey = MapReduceHFileSplitterJob.BULK_OUTPUT_CONF_KEY;
+      // Player reads all files in arbitrary directory structure and creates
+      // a Map task for each file
+      String bids = StringUtils.join(backupIds, ",");
+
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Merge backup images " + bids);
+      }
+
+      List<Pair<TableName, Path>> processedTableList = new ArrayList<Pair<TableName, Path>>();
+      boolean finishedTables = false;
+      Connection conn = ConnectionFactory.createConnection(getConf());
+      BackupSystemTable table = new BackupSystemTable(conn);
+      FileSystem fs = FileSystem.get(getConf());
+
+      try {
+
+        // Start backup exclusive operation
+        table.startBackupExclusiveOperation();
+        // Start merge operation
+        table.startMergeOperation(backupIds);
+
+        // Select most recent backup id
+        String mergedBackupId = findMostRecentBackupId(backupIds);
+
+        TableName[] tableNames = getTableNamesInBackupImages(backupIds);
+        String backupRoot = null;
+
+        BackupInfo bInfo = table.readBackupInfo(backupIds[0]);
+        backupRoot = bInfo.getBackupRootDir();
+        // PHASE 1
+        checkFailure(FailurePhase.PHASE1);
+
+        for (int i = 0; i < tableNames.length; i++) {
+
+          LOG.info("Merge backup images for " + tableNames[i]);
+
+          // Find input directories for table
+
+          Path[] dirPaths = findInputDirectories(fs, backupRoot, tableNames[i], backupIds);
+          String dirs = StringUtils.join(dirPaths, ",");
+          Path bulkOutputPath =
+              BackupUtils.getBulkOutputDir(BackupUtils.getFileNameCompatibleString(tableNames[i]),
+                getConf(), false);
+          // Delete content if exists
+          if (fs.exists(bulkOutputPath)) {
+            if (!fs.delete(bulkOutputPath, true)) {
+              LOG.warn("Can not delete: " + bulkOutputPath);
+            }
+          }
+          Configuration conf = getConf();
+          conf.set(bulkOutputConfKey, bulkOutputPath.toString());
+          String[] playerArgs = { dirs, tableNames[i].getNameAsString() };
+
+          int result = 0;
+          // PHASE 2
+          checkFailure(FailurePhase.PHASE2);
+          player.setConf(getConf());
+          result = player.run(playerArgs);
+          if (succeeded(result)) {
+            // Add to processed table list
+            processedTableList.add(new Pair<TableName, Path>(tableNames[i], bulkOutputPath));
+          } else {
+            throw new IOException("Can not merge backup images for " + dirs
+                + " (check Hadoop/MR and HBase logs). Player return code =" + result);
+          }
+          LOG.debug("Merge Job finished:" + result);
+        }
+        List<TableName> tableList = toTableNameList(processedTableList);
+        // PHASE 3
+        checkFailure(FailurePhase.PHASE3);
+        table.updateProcessedTablesForMerge(tableList);
+        finishedTables = true;
+
+        // Move data
+        for (Pair<TableName, Path> tn : processedTableList) {
+          moveData(fs, backupRoot, tn.getSecond(), tn.getFirst(), mergedBackupId);
+        }
+        // PHASE 4
+        checkFailure(FailurePhase.PHASE4);
+        // Delete old data and update manifest
+        List<String> backupsToDelete = getBackupIdsToDelete(backupIds, mergedBackupId);
+        deleteBackupImages(backupsToDelete, conn, fs, backupRoot);
+        updateBackupManifest(backupRoot, mergedBackupId, backupsToDelete);
+        // Finish merge session
+        table.finishMergeOperation();
+      } catch (RuntimeException e) {
+        throw e;
+      } catch (Exception e) {
+        LOG.error(e);
+        if (!finishedTables) {
+          // cleanup bulk directories and finish merge
+          // merge MUST be repeated (no need for repair)
+          cleanupBulkLoadDirs(fs, toPathList(processedTableList));
+          table.finishMergeOperation();
+          table.finishBackupExclusiveOperation();
+          throw new IOException("Backup merge operation failed, you should try it again", e);
+        } else {
+          // backup repair must be run
+          throw new IOException(
+              "Backup merge operation failed, run backup repair tool to restore system's integrity",
+              e);
+        }
+      } finally {
+        table.close();
+        conn.close();
+      }
+
+    }
+
+    private void checkFailure(FailurePhase phase) throws IOException {
+      if ( failurePhase != null && failurePhase == phase) {
+        throw new IOException (phase.toString());
+      }
+    }
+
+  }
+
+
+  @Test
+  public void TestIncBackupMergeRestore() throws Exception {
+
+    int ADD_ROWS = 99;
+    // #1 - create full backup for all tables
+    LOG.info("create full backup image for all tables");
+
+    List<TableName> tables = Lists.newArrayList(table1, table2);
+    // Set custom Merge Job implementation
+    conf1.setClass(BackupRestoreFactory.HBASE_BACKUP_MERGE_IMPL_CLASS,
+      BackupMergeJobWithFailures.class, BackupMergeJob.class);
+
+    Connection conn = ConnectionFactory.createConnection(conf1);
+
+    HBaseAdmin admin = null;
+    admin = (HBaseAdmin) conn.getAdmin();
+    BackupAdminImpl client = new BackupAdminImpl(conn);
+
+    BackupRequest request = createBackupRequest(BackupType.FULL, tables, BACKUP_ROOT_DIR);
+    String backupIdFull = client.backupTables(request);
+
+    assertTrue(checkSucceeded(backupIdFull));
+
+    // #2 - insert some data to table1
+    HTable t1 = insertIntoTable(conn, table1, famName, 1, ADD_ROWS);
+    LOG.debug("writing " + ADD_ROWS + " rows to " + table1);
+
+    Assert.assertEquals(TEST_UTIL.countRows(t1), NB_ROWS_IN_BATCH + ADD_ROWS);
+    t1.close();
+    LOG.debug("written " + ADD_ROWS + " rows to " + table1);
+
+    HTable t2 = insertIntoTable(conn, table2, famName, 1, ADD_ROWS);
+
+    Assert.assertEquals(TEST_UTIL.countRows(t2), NB_ROWS_IN_BATCH + ADD_ROWS);
+    t2.close();
+    LOG.debug("written " + ADD_ROWS + " rows to " + table2);
+
+    // #3 - incremental backup for multiple tables
+    tables = Lists.newArrayList(table1, table2);
+    request = createBackupRequest(BackupType.INCREMENTAL, tables, BACKUP_ROOT_DIR);
+    String backupIdIncMultiple = client.backupTables(request);
+
+    assertTrue(checkSucceeded(backupIdIncMultiple));
+
+    t1 = insertIntoTable(conn, table1, famName, 2, ADD_ROWS);
+    t1.close();
+
+    t2 = insertIntoTable(conn, table2, famName, 2, ADD_ROWS);
+    t2.close();
+
+    // #3 - incremental backup for multiple tables
+    request = createBackupRequest(BackupType.INCREMENTAL, tables, BACKUP_ROOT_DIR);
+    String backupIdIncMultiple2 = client.backupTables(request);
+    assertTrue(checkSucceeded(backupIdIncMultiple2));
+
+    // #4 Merge backup images with failures
+
+    for ( FailurePhase phase : FailurePhase.values()) {
+      Configuration conf = conn.getConfiguration();
+
+      conf.set(FAILURE_PHASE_KEY, phase.toString());
+
+      try (BackupAdmin bAdmin = new BackupAdminImpl(conn);)
+      {
+        String[] backups = new String[] { backupIdIncMultiple, backupIdIncMultiple2 };
+        bAdmin.mergeBackups(backups);
+        Assert.fail("Expected IOException");
+      } catch (IOException e) {
+        BackupSystemTable table = new BackupSystemTable(conn);
+        if(phase.ordinal() < FailurePhase.PHASE4.ordinal()) {
+          // No need to repair:
+          // Both Merge and backup exclusive operations are finished
+          assertFalse(table.isMergeInProgress());
+          try {
+            table.finishBackupExclusiveOperation();
+            Assert.fail("IOException is expected");
+          } catch(IOException ee) {
+            // Expected
+          }
+        } else {
+          // Repair is required
+          assertTrue(table.isMergeInProgress());
+          try {
+            table.startBackupExclusiveOperation();
+            Assert.fail("IOException is expected");
+          } catch(IOException ee) {
+            // Expected - clean up before proceeding
+            table.finishMergeOperation();
+            table.finishBackupExclusiveOperation();
+          }
+        }
+        table.close();
+        LOG.debug("Expected :"+ e.getMessage());
+      }
+    }
+
+    // Now merge w/o failures
+    Configuration conf = conn.getConfiguration();
+    conf.unset(FAILURE_PHASE_KEY);
+    conf.unset(BackupRestoreFactory.HBASE_BACKUP_MERGE_IMPL_CLASS);
+
+    try (BackupAdmin bAdmin = new BackupAdminImpl(conn);) {
+      String[] backups = new String[] { backupIdIncMultiple, backupIdIncMultiple2 };
+      bAdmin.mergeBackups(backups);
+    }
+
+    // #6 - restore incremental backup for multiple tables, with overwrite
+    TableName[] tablesRestoreIncMultiple = new TableName[] { table1, table2 };
+    TableName[] tablesMapIncMultiple = new TableName[] { table1_restore, table2_restore };
+    client.restore(BackupUtils.createRestoreRequest(BACKUP_ROOT_DIR, backupIdIncMultiple2, false,
+      tablesRestoreIncMultiple, tablesMapIncMultiple, true));
+
+    Table hTable = conn.getTable(table1_restore);
+    LOG.debug("After incremental restore: " + hTable.getTableDescriptor());
+    LOG.debug("f1 has " + TEST_UTIL.countRows(hTable, famName) + " rows");
+    Assert.assertEquals(TEST_UTIL.countRows(hTable, famName), NB_ROWS_IN_BATCH + 2 * ADD_ROWS);
+
+    hTable.close();
+
+    hTable = conn.getTable(table2_restore);
+    Assert.assertEquals(TEST_UTIL.countRows(hTable), NB_ROWS_IN_BATCH + 2 * ADD_ROWS);
+    hTable.close();
+
+    admin.close();
+    conn.close();
+
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/05e6e569/hbase-server/src/test/java/org/apache/hadoop/hbase/backup/TestRepairAfterFailedDelete.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/backup/TestRepairAfterFailedDelete.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/backup/TestRepairAfterFailedDelete.java
index 9c47641..556521f 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/backup/TestRepairAfterFailedDelete.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/backup/TestRepairAfterFailedDelete.java
@@ -74,7 +74,7 @@ public class TestRepairAfterFailedDelete extends TestBackupBase {
     admin.restoreSnapshot(snapshotName);
     admin.enableTable(BackupSystemTable.getTableName(conf1));
     // Start backup session
-    table.startBackupSession();
+    table.startBackupExclusiveOperation();
     // Start delete operation
     table.startDeleteOperation(backupIds);